use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use super::config::{Storage, TreeConfig};
use super::errors::{Error, Result};
use super::stats::OpenStats;
use super::stats::{BlobStats, CheckpointerStats, JournalStats, RouteCacheStats, TreeStats};
use super::view::View;
use crate::concurrency::{CommitGate, EndpointLocks, Gate};
use crate::engine;
use crate::engine::{KeyRangeBuilder, KeyRangeEntry, RangeBuilder};
use crate::journal::codec::{
encode_erase_record, encode_insert_record, encode_rename_object_record,
encoded_erase_record_len, encoded_insert_record_len, encoded_rename_object_record_len,
BatchEncoder, RECORD_FOOTER_SIZE, RECORD_HEADER_SIZE,
};
use crate::journal::group_commit::Journal;
use crate::journal::reader::replay;
use crate::journal::wal_op::WalOp;
use crate::layout::{BlobGuid, DATA_AREA_START, PAGE_SIZE, ROOT_BLOB_GUID};
use crate::store::blob_store::{AlignedBlobBuf, BlobStore, FileBlobStore, MemoryBlobStore};
use crate::store::{
BlobFrame, BlobFrameRef, BufferManager, CachedBlob, DirtySnapshotEntry, WriteThroughEntry,
};
use super::atomic::{AtomicBatch, BatchOp, Record, RecordVersion};
// Tuning constants. Their consumers are not visible in this part of the file.
// NOTE(review): judging by the names, the two budgets bound how many blobs / parents an online
// compaction or merge pass touches, and the two thresholds classify a child blob's fill level
// (in per-mille) as underfilled or overfull — confirm against the call sites.
const ONLINE_COMPACT_BLOB_BUDGET: usize = 256;
const ONLINE_MERGE_PARENT_BUDGET: usize = 256;
const SHAPE_UNDERFILLED_CHILD_FILL_PER_MILLE: u32 = 350;
const SHAPE_OVERFULL_CHILD_FILL_PER_MILLE: u32 = 850;
/// Projected effect of the batch ops examined so far during preflight: a key maps to
/// `Some(record)` when the batch would leave a record there and to `None` when the batch
/// would delete it. Keys absent from the map fall through to the stored tree.
type BatchOverlay = HashMap<Vec<u8>, Option<Record>>;
/// Blob GUID mapped to the sequence number captured with it in a dirty / pending-delete
/// snapshot.
type CheckpointMap = HashMap<BlobGuid, u64>;
/// Blob images staged for a checkpoint write: `(guid, expected sequence, image bytes)`.
type CheckpointBytes = Vec<(BlobGuid, u64, AlignedBlobBuf)>;
/// Result of writing a checkpoint's dirty blob images through to the store.
struct DirtyWriteOutcome {
// True when at least one entry was handed to the store, whether or not the write succeeded.
wrote_any: bool,
// Entries (guid -> expected sequence) whose batch write failed and must be marked dirty again.
failed: CheckpointMap,
// The error reported by the failed batch write, if any.
first_err: Option<Error>,
}
/// Result of executing a snapshot of pending blob deletes.
struct PendingDeleteOutcome {
// Deletes (guid -> sequence) that failed and must be queued again.
failed: CheckpointMap,
// The first error encountered while executing the deletes, if any.
first_err: Option<Error>,
}
/// Per-blob statistics plus running totals accumulated over all blobs.
///
/// NOTE(review): the code that fills this struct is not visible here; the field notes below
/// are inferred from names and from how `Tree::stats` copies them out — confirm.
struct BlobStatsAggregate {
// One entry per visited blob; `Tree::stats` reports its length as the blob count.
blobs: Vec<BlobStats>,
total_space_used: u64,
total_gap_space: u64,
total_slots: u64,
total_compactions: u64,
total_tombstones: u64,
total_blob_edges: u64,
leaf_blob_count: u32,
max_blob_depth: u32,
total_blob_depth: u64,
// Presumably expressed in per-mille like `blob_fill_per_mille` — TODO confirm.
max_blob_fill_per_mille: u32,
underfilled_child_blobs: u32,
overfull_child_blobs: u32,
}
/// Computes how full a blob's data area is, in per-mille (parts per thousand).
///
/// `space_used` is reduced by `DATA_AREA_START` (saturating at zero) before being scaled, so
/// usage below that offset counts as an empty data area.
///
/// Returns `0` when `blob_data_capacity` is zero. The result saturates at `u32::MAX` instead
/// of wrapping when the ratio does not fit in a `u32` (only possible with a very small
/// capacity).
fn blob_fill_per_mille(space_used: u32, blob_data_capacity: u64) -> u32 {
    if blob_data_capacity == 0 {
        return 0;
    }
    let data_used = u64::from(space_used).saturating_sub(DATA_AREA_START as u64);
    let per_mille = data_used.saturating_mul(1000) / blob_data_capacity;
    // A plain `as u32` would silently keep only the low 32 bits; clamp instead.
    u32::try_from(per_mille).unwrap_or(u32::MAX)
}
/// Handle to an open tree. Cloning is cheap: shared state is held behind `Arc`s, so clones
/// operate on the same store, locks, sequence counter and journal.
#[derive(Clone)]
pub struct Tree {
// Configuration supplied when the tree was opened.
cfg: TreeConfig,
// Buffer manager through which all blob reads and writes go.
store: Arc<BufferManager>,
// GUID of the root blob (set to `ROOT_BLOB_GUID` at open).
root_guid: BlobGuid,
// Pin on the root blob obtained at open; passed to every engine call.
root_pin: Arc<CachedBlob>,
// Per-key locks taken by single-key writes and renames.
endpoint_locks: Arc<EndpointLocks>,
// Route cache handed to engine lookups and mutations.
route_cache: Arc<engine::RouteCache>,
// Entered shared by point writes and checkpoints, exclusively by atomic batches and view
// capture.
maintenance_gate: Arc<Gate>,
// Source of sequence numbers for mutations.
next_seq: Arc<AtomicU64>,
// Entered as writer by journaled mutations and as checkpoint while capturing checkpoint
// intent or truncating the journal.
commit_gate: Arc<CommitGate>,
// Write-ahead journal; `None` when no WAL is attached.
journal: Option<Arc<Journal>>,
// Cache handed to key-range builders.
prefix_list_cache: Arc<engine::PrefixListCache>,
// Background checkpointer handle, when `Checkpointer::spawn` returned one.
checkpointer: Option<Arc<crate::checkpoint::Checkpointer>>,
// Statistics gathered while opening (WAL replay time, record count, ...).
open_stats: OpenStats,
}
/// Compact debug rendering: only the storage configuration and root GUID are shown; the
/// remaining fields are elided with `..`.
impl std::fmt::Debug for Tree {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendering = f.debug_struct("Tree");
        rendering.field("storage", &self.cfg.storage);
        rendering.field("root_guid", &self.root_guid);
        rendering.finish_non_exhaustive()
    }
}
/// Computes the number of bytes needed to encode `ops` as a single batch journal record.
///
/// The total is the record header and footer, a 12-byte body prefix, and a per-op
/// contribution:
/// * a run of two or more consecutive insert-like ops with identical key and value lengths
///   shares one run header and then stores only the raw key/value bytes of each member;
/// * a lone insert-like op stores its own header plus length-prefixed key and value;
/// * deletes and renames store their keys; assertion ops contribute nothing.
fn encoded_batch_record_len(ops: &[BatchOp]) -> usize {
    let mut total = RECORD_HEADER_SIZE + (8 + 4) + RECORD_FOOTER_SIZE;
    let mut cursor = 0usize;
    while let Some(op) = ops.get(cursor) {
        let (encoded, consumed) = match batch_insert_parts(op) {
            Some((key, value)) => {
                let run = same_shape_insert_run_len(ops, cursor);
                let encoded = if run > 1 {
                    1 + 8 + 4 + 4 + 4 + run * (key.len() + value.len())
                } else {
                    1 + 8 + 4 + key.len() + 4 + value.len()
                };
                (encoded, run)
            }
            None => {
                let encoded = match op {
                    BatchOp::Delete { key } | BatchOp::DeleteIfVersion { key, .. } => {
                        1 + 8 + 4 + key.len()
                    }
                    BatchOp::Rename { src, dst, .. } => 1 + 8 + 4 + src.len() + 4 + dst.len() + 1,
                    BatchOp::AssertVersion { .. } | BatchOp::AssertPrefixEmpty { .. } => 0,
                    BatchOp::Put { .. }
                    | BatchOp::PutIfAbsent { .. }
                    | BatchOp::CompareAndPut { .. } => {
                        unreachable!("insert-like ops handled above")
                    }
                };
                (encoded, 1)
            }
        };
        total += encoded;
        cursor += consumed;
    }
    total
}
/// Returns the `(key, value)` pair of an insert-like batch op (`Put`, `PutIfAbsent`,
/// `CompareAndPut`), or `None` for every other kind of op.
fn batch_insert_parts(op: &BatchOp) -> Option<(&[u8], &[u8])> {
    let (key, value): (&[u8], &[u8]) = match op {
        BatchOp::Put { key, value } => (key, value),
        BatchOp::PutIfAbsent { key, value } => (key, value),
        BatchOp::CompareAndPut { key, value, .. } => (key, value),
        BatchOp::Delete { .. }
        | BatchOp::DeleteIfVersion { .. }
        | BatchOp::AssertVersion { .. }
        | BatchOp::AssertPrefixEmpty { .. }
        | BatchOp::Rename { .. } => return None,
    };
    Some((key, value))
}
fn same_shape_insert_run_len(ops: &[BatchOp], start: usize) -> usize {
let Some((first_key, first_value)) = batch_insert_parts(&ops[start]) else {
return 0;
};
let mut end = start + 1;
while end < ops.len() {
match batch_insert_parts(&ops[end]) {
Some((key, value))
if key.len() == first_key.len() && value.len() == first_value.len() =>
{
end += 1;
}
_ => break,
}
}
end - start
}
impl Tree {
/// Opens a tree on the storage backend selected by `cfg.storage`.
///
/// `Storage::Memory` wraps a fresh `MemoryBlobStore`; `Storage::File` opens a `FileBlobStore`
/// in `dir`. On Linux with the `io-uring` feature the buffer manager is additionally given an
/// allocator closure that obtains uninitialised blob buffers from the file store. The
/// resulting buffer manager is handed to `open_inner` with WAL attachment enabled.
///
/// # Errors
/// Propagates failures from opening the file store and from `open_inner`.
pub fn open(cfg: TreeConfig) -> Result<Self> {
let bm = match &cfg.storage {
Storage::Memory => {
let store: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
Arc::new(BufferManager::new(store, cfg.buffer_pool_size))
}
Storage::File { dir } => {
#[cfg(all(target_os = "linux", feature = "io-uring"))]
{
let store = Arc::new(FileBlobStore::open_with_buffer_pool_hint(
dir,
cfg.buffer_pool_size,
)?);
// Keep the concrete store for the allocator closure and a type-erased handle for
// the buffer manager.
let store_dyn: Arc<dyn BlobStore> = store.clone();
let alloc_store = Arc::clone(&store);
Arc::new(BufferManager::new_with_uninit_allocator(
store_dyn,
cfg.buffer_pool_size,
move || {
// SAFETY: NOTE(review): the contract of `alloc_blob_buf_uninit` is not visible in
// this file — presumably the buffer manager fully writes the buffer before any
// read; confirm against the store's documentation.
unsafe { alloc_store.alloc_blob_buf_uninit() }
},
))
}
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
{
let store: Arc<dyn BlobStore> = Arc::new(FileBlobStore::open(dir)?);
Arc::new(BufferManager::new(store, cfg.buffer_pool_size))
}
}
};
Self::open_inner(cfg, bm, true)
}
/// Opens a tree on a caller-supplied blob store.
///
/// The store is wrapped in a buffer manager sized by `cfg.buffer_pool_size`. No WAL is
/// attached on this path.
pub fn open_with_blob_store(cfg: TreeConfig, store: Arc<dyn BlobStore>) -> Result<Self> {
    let pool_size = cfg.buffer_pool_size;
    let manager = BufferManager::new(store, pool_size);
    Self::open_inner(cfg, Arc::new(manager), false)
}
/// Shared tail of the open paths: prepares the root blob, optionally replays and attaches
/// the WAL, pins the root and spawns the checkpointer.
///
/// * `bm` — buffer manager over the chosen blob store.
/// * `attach_wal` — when `false`, no journal is opened and sequence numbers start at 1.
///
/// # Errors
/// Propagates store, replay and journal errors.
fn open_inner(cfg: TreeConfig, bm: Arc<BufferManager>, attach_wal: bool) -> Result<Self> {
let root_guid = ROOT_BLOB_GUID;
// First open of an empty store: write an initialised root blob and flush it.
if !bm.has_blob(root_guid)? {
let mut scratch = bm.alloc_blob_buf_zeroed();
BlobFrame::init(scratch.as_mut_slice(), root_guid)?;
bm.write_blob(root_guid, &scratch)?;
bm.flush()?;
}
let mut open_stats = OpenStats::default();
let (journal, next_seq) = if attach_wal {
match cfg.wal_path() {
None => (None, 1u64),
Some(path) => {
// Replay an existing WAL file, recording timing and size for `open_stats`; the
// replay yields the next sequence number to hand out.
let next_seq = if path.exists() {
let start = std::time::Instant::now();
let (next_seq, replay_stats) = replay_wal(&path, &bm, root_guid)?;
open_stats.wal_replay_micros = start.elapsed().as_micros() as u64;
open_stats.wal_replay_records = replay_stats.records_seen;
open_stats.wal_torn_tail = replay_stats.torn_tail_at.is_some();
// Best effort: a metadata failure leaves `wal_replay_bytes` at its default.
if let Ok(meta) = std::fs::metadata(&path) {
open_stats.wal_replay_bytes = meta.len();
}
next_seq
} else {
1
};
let journal = Journal::open_or_create(&path, 0)?;
(Some(Arc::new(journal)), next_seq)
}
}
} else {
(None, 1u64)
};
let root_pin = bm.pin(root_guid)?;
let maintenance_gate = Arc::new(Gate::new());
let commit_gate = Arc::new(CommitGate::new());
// `spawn` yields an optional handle; it shares the store, journal and both gates.
let checkpointer = crate::checkpoint::Checkpointer::spawn(
Arc::clone(&bm),
journal.clone(),
Arc::clone(&maintenance_gate),
Arc::clone(&commit_gate),
cfg.checkpoint.clone(),
)
.map(Arc::new);
Ok(Self {
cfg,
store: bm,
root_guid,
root_pin,
endpoint_locks: Arc::new(EndpointLocks::new()),
route_cache: Arc::new(engine::RouteCache::new()),
maintenance_gate,
next_seq: Arc::new(AtomicU64::new(next_seq)),
commit_gate,
journal,
prefix_list_cache: Arc::new(engine::PrefixListCache::new()),
checkpointer,
open_stats,
})
}
/// Returns a copy of the value stored under `key`, or `None` when no record is found.
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
    let record = self.lookup_record_unlocked(key)?;
    Ok(record.map(|found| found.value))
}
/// Returns the full record (value and version) stored under `key`, or `None` when no
/// record is found.
pub fn get_record(&self, key: &[u8]) -> Result<Option<Record>> {
    let record = self.lookup_record_unlocked(key)?;
    Ok(record)
}
/// Returns only the version of the record stored under `key`, without copying its value.
/// The version is built from the sequence number of the matching entry.
pub fn get_version(&self, key: &[u8]) -> Result<Option<RecordVersion>> {
    engine::lookup_multi_with(
        &self.store,
        &self.root_pin,
        Some(&self.route_cache),
        engine::SearchKey::user(key),
        |found| RecordVersion::new(found.seq),
    )
}
/// Looks up `key` without taking any of the tree's gates or endpoint locks and, on a hit,
/// copies the value and sequence number into an owned `Record`.
fn lookup_record_unlocked(&self, key: &[u8]) -> Result<Option<Record>> {
    engine::lookup_multi_with(
        &self.store,
        &self.root_pin,
        Some(&self.route_cache),
        engine::SearchKey::user(key),
        |found| {
            let value = found.value.to_vec();
            let version = RecordVersion::new(found.seq);
            Record { value, version }
        },
    )
}
/// Unconditionally stores `value` under `key`.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
    let _outcome = self.put_inner_conditional(key, value, engine::InsertCondition::Always)?;
    Ok(())
}
/// Stores `value` under `key` using the engine's `IfAbsent` condition.
///
/// Returns whether the tree was mutated.
pub fn put_if_absent(&self, key: &[u8], value: &[u8]) -> Result<bool> {
    let outcome = self.put_inner_conditional(key, value, engine::InsertCondition::IfAbsent)?;
    Ok(outcome.mutated)
}
/// Stores `value` under `key` using the engine's `IfVersion` condition with
/// `expected_version`.
///
/// Returns whether the tree was mutated.
pub fn compare_and_put(
    &self,
    key: &[u8],
    expected_version: RecordVersion,
    value: &[u8],
) -> Result<bool> {
    let condition = engine::InsertCondition::IfVersion(expected_version.as_u64());
    let outcome = self.put_inner_conditional(key, value, condition)?;
    Ok(outcome.mutated)
}
/// Core of every single-key insert: applies `condition`, updates the tree and, when a
/// journal is attached, appends an insert record.
///
/// Locking, in acquisition order: maintenance gate (shared), endpoint lock for `key`, then —
/// on the journaled path only — the commit gate as writer. All of them are released before
/// waiting for the journal acknowledgement.
///
/// A sequence number is consumed even when the condition rejects the write.
fn put_inner_conditional(
&self,
key: &[u8],
value: &[u8],
condition: engine::InsertCondition,
) -> Result<engine::InsertOutcome> {
let search = engine::SearchKey::user(key);
let (outcome, journal_ack) = {
let _mutation = self.maintenance_gate.enter_shared();
let _endpoint = self.endpoint_locks.lock_key(key);
let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
if let Some(journal) = &self.journal {
// Journaled path: mutate and submit the WAL record while holding the commit gate.
let _commit = self.commit_gate.enter_writer();
let outcome = engine::insert_multi_conditional(
&self.store,
&self.root_pin,
Some(&self.route_cache),
search,
value,
seq,
condition,
)?;
if outcome.mutated {
if outcome.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
let mut record =
journal.record_buffer(encoded_insert_record_len(key.len(), value.len()));
encode_insert_record(&mut record, seq, 0, key, value);
let ack = journal.submit(record, self.cfg.wal_sync)?;
(outcome, ack)
} else {
// Condition not met: nothing to journal.
(outcome, None)
}
} else {
// No journal attached.
let outcome = engine::insert_multi_conditional(
&self.store,
&self.root_pin,
Some(&self.route_cache),
search,
value,
seq,
condition,
)?;
// NOTE(review): unlike the journaled branch above and `delete_inner_conditional`, the
// root is marked dirty here without checking `outcome.mutated` — confirm this is
// intended.
if outcome.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
// Optional write-through: flush dirty blobs and pending deletes before returning.
if outcome.mutated && self.cfg.memory_flush_on_write {
self.flush_dirty_inline()?;
self.flush_pending_deletes_inline()?;
}
(outcome, None)
}
};
// Wait for the journal acknowledgement (if one was returned) outside the locks.
if let Some(ack) = journal_ack {
ack.wait()?;
}
Ok(outcome)
}
/// Unconditionally deletes `key`.
///
/// Returns whether the tree was mutated.
pub fn delete(&self, key: &[u8]) -> Result<bool> {
    let outcome = self.delete_inner_conditional(key, engine::EraseCondition::Always)?;
    Ok(outcome.mutated)
}
/// Deletes `key` using the engine's `IfVersion` condition with `expected_version`.
///
/// Returns whether the tree was mutated.
pub fn delete_if_version(&self, key: &[u8], expected_version: RecordVersion) -> Result<bool> {
    let condition = engine::EraseCondition::IfVersion(expected_version.as_u64());
    let outcome = self.delete_inner_conditional(key, condition)?;
    Ok(outcome.mutated)
}
/// Core of every single-key delete: applies `condition`, updates the tree and, when a
/// journal is attached, appends an erase record.
///
/// Locking mirrors `put_inner_conditional`: maintenance gate (shared), endpoint lock for
/// `key`, then the commit gate as writer on the journaled path. The journal acknowledgement
/// is awaited after all of them are released.
///
/// A sequence number is consumed even when nothing is deleted.
fn delete_inner_conditional(
&self,
key: &[u8],
condition: engine::EraseCondition,
) -> Result<engine::EraseOutcome> {
let search = engine::SearchKey::user(key);
let (outcome, journal_ack) = {
let _mutation = self.maintenance_gate.enter_shared();
let _endpoint = self.endpoint_locks.lock_key(key);
let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
if let Some(journal) = &self.journal {
// Journaled path: mutate and submit the WAL record while holding the commit gate.
let _commit = self.commit_gate.enter_writer();
let outcome = engine::erase_multi_conditional(
&self.store,
&self.root_pin,
Some(&self.route_cache),
search,
seq,
condition,
)?;
if outcome.mutated {
if outcome.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
let mut record = journal.record_buffer(encoded_erase_record_len(key.len()));
encode_erase_record(&mut record, seq, 0, key);
let ack = journal.submit(record, self.cfg.wal_sync)?;
(outcome, ack)
} else {
// Nothing was erased: nothing to journal.
(outcome, None)
}
} else {
// No journal attached.
let outcome = engine::erase_multi_conditional(
&self.store,
&self.root_pin,
Some(&self.route_cache),
search,
seq,
condition,
)?;
if outcome.mutated && outcome.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
// Optional write-through: flush dirty blobs and pending deletes before returning.
if outcome.mutated && self.cfg.memory_flush_on_write {
self.flush_dirty_inline()?;
self.flush_pending_deletes_inline()?;
}
(outcome, None)
}
};
// Wait for the journal acknowledgement (if one was returned) outside the locks.
if let Some(ack) = journal_ack {
ack.wait()?;
}
Ok(outcome)
}
/// Moves the record stored under `src` to `dst`.
///
/// The source is erased and its value inserted at `dst`, both with the same sequence number.
/// With a journal attached, one rename record is appended.
///
/// * `force` — when `false`, an existing record at `dst` makes the call fail.
///
/// Renaming a key onto itself succeeds without mutating anything, provided the key exists.
///
/// # Errors
/// * `Error::NotFound` — no record under `src`.
/// * `Error::DstExists` — `force` is `false` and `dst` already holds a record.
/// * Engine, journal and flush errors are propagated.
pub fn rename(&self, src: &[u8], dst: &[u8], force: bool) -> Result<()> {
let src_search = engine::SearchKey::user(src);
let dst_search = engine::SearchKey::user(dst);
let journal_ack = {
let _mutation = self.maintenance_gate.enter_shared();
// Lock both endpoints for the whole check-then-mutate sequence.
let _endpoints = self.endpoint_locks.lock_pair(src, dst);
// The sequence number is allocated up front, so it is consumed even on early return.
let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
let Some(value) = engine::lookup_multi_with(
&self.store,
&self.root_pin,
Some(&self.route_cache),
src_search,
|hit| hit.value.to_vec(),
)?
else {
return Err(Error::NotFound);
};
if src == dst {
return Ok(());
}
if !force
&& engine::lookup_multi_with(
&self.store,
&self.root_pin,
Some(&self.route_cache),
dst_search,
|_| (),
)?
.is_some()
{
return Err(Error::DstExists);
}
if let Some(journal) = &self.journal {
// Journaled path: erase + insert + WAL submit under the commit gate.
let _commit = self.commit_gate.enter_writer();
// NOTE(review): if the insert below fails after the erase succeeded, the source record
// is already gone and no WAL record is written — confirm how callers recover.
let erase_out = engine::erase_multi(
&self.store,
&self.root_pin,
Some(&self.route_cache),
src_search,
seq,
)?;
let insert_out = engine::insert_multi(
&self.store,
&self.root_pin,
Some(&self.route_cache),
dst_search,
&value,
seq,
)?;
if erase_out.root_dirty || insert_out.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
let mut record =
journal.record_buffer(encoded_rename_object_record_len(src.len(), dst.len()));
encode_rename_object_record(&mut record, seq, 0, src, dst, force);
journal.submit(record, self.cfg.wal_sync)?
} else {
// No journal attached.
let erase_out = engine::erase_multi(
&self.store,
&self.root_pin,
Some(&self.route_cache),
src_search,
seq,
)?;
let insert_out = engine::insert_multi(
&self.store,
&self.root_pin,
Some(&self.route_cache),
dst_search,
&value,
seq,
)?;
if erase_out.root_dirty || insert_out.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
// Optional write-through: flush dirty blobs and pending deletes before returning.
if self.cfg.memory_flush_on_write {
self.flush_dirty_inline()?;
self.flush_pending_deletes_inline()?;
}
None
}
};
// Wait for the journal acknowledgement (if one was returned) outside the locks.
if let Some(ack) = journal_ack {
ack.wait()?;
}
Ok(())
}
/// Builds an atomic batch with `build` and applies it.
///
/// An empty batch succeeds immediately with `Ok(true)`. Otherwise the result is whatever
/// `apply_batch` reports: `Ok(false)` when a guard in the batch was not satisfied,
/// `Ok(true)` when the batch was applied.
pub fn atomic<F>(&self, build: F) -> Result<bool>
where
    F: FnOnce(&mut AtomicBatch),
{
    let mut batch = AtomicBatch::default();
    build(&mut batch);
    let ops = batch.pending;
    if ops.is_empty() {
        Ok(true)
    } else {
        self.apply_batch(ops)
    }
}
/// Validates and applies a batch of ops while holding the maintenance gate exclusively.
///
/// Steps:
/// 1. Reserve one sequence number per op that emits a WAL entry.
/// 2. Run the preflight; if a guard fails, return `Ok(false)` without mutating the tree
///    (the reserved sequence numbers are not returned).
/// 3. If no op emits a WAL entry there is nothing to apply.
/// 4. Apply the ops — with a journal, encode them into one batch record submitted under the
///    commit gate and wait for its acknowledgement; without one, optionally flush inline.
fn apply_batch(&self, pending: Vec<BatchOp>) -> Result<bool> {
let _maintenance = self.maintenance_gate.enter_exclusive();
let count = pending.iter().filter(|op| op.emits_wal()).count() as u64;
let base_seq = self.next_seq.fetch_add(count, Ordering::Relaxed);
if !self.preflight_batch(&pending, base_seq)? {
return Ok(false);
}
if count == 0 {
return Ok(true);
}
if let Some(journal) = &self.journal {
let ack = {
let _commit = self.commit_gate.enter_writer();
let mut record = journal.record_buffer(encoded_batch_record_len(&pending));
let mut enc = BatchEncoder::begin(&mut record, base_seq, 0);
self.apply_batch_walker_inline(&pending, base_seq, Some(&mut enc))?;
let _n = enc.finish();
journal.submit(record, self.cfg.wal_sync)?
};
// The commit gate is released before waiting; the maintenance gate is still held.
if let Some(ack) = ack {
ack.wait()?;
}
} else {
self.apply_batch_walker_inline(&pending, base_seq, None)?;
if self.cfg.memory_flush_on_write {
self.flush_dirty_inline()?;
self.flush_pending_deletes_inline()?;
}
}
Ok(true)
}
/// Dry-runs a batch against the current tree state without mutating it.
///
/// Batches made only of plain puts and deletes just have their insert shapes validated.
/// Otherwise each op is checked in order against an overlay of the batch's own earlier
/// effects. Every op is given the sequence number `base_seq` plus the count of WAL-emitting
/// ops before it.
///
/// Returns `Ok(false)` as soon as a guard is not satisfied, `Ok(true)` otherwise.
fn preflight_batch(&self, pending: &[BatchOp], base_seq: u64) -> Result<bool> {
    if Self::batch_is_guard_free(pending) {
        return Self::preflight_guard_free_batch(pending).map(|()| true);
    }
    let mut overlay = BatchOverlay::new();
    let mut next_seq = base_seq;
    for op in pending {
        let seq = next_seq;
        if op.emits_wal() {
            next_seq += 1;
        }
        if !self.preflight_batch_op(&mut overlay, op, seq)? {
            return Ok(false);
        }
    }
    Ok(true)
}
/// Checks one batch op against the projected state (`overlay` first, stored tree second)
/// and records the op's effect in `overlay`.
///
/// Returns `Ok(false)` when a guard (absence, version or empty-prefix check) is not
/// satisfied. Shape violations and rename failures are returned as errors instead.
fn preflight_batch_op(
&self,
overlay: &mut BatchOverlay,
op: &BatchOp,
seq: u64,
) -> Result<bool> {
match op {
BatchOp::Put { key, value } => {
Self::validate_insert_shape(key, value)?;
Self::overlay_put(overlay, key, value, seq);
}
BatchOp::PutIfAbsent { key, value } => {
Self::validate_insert_shape(key, value)?;
if self.projected_record(overlay, key)?.is_some() {
return Ok(false);
}
Self::overlay_put(overlay, key, value, seq);
}
BatchOp::CompareAndPut {
key,
expected,
value,
} => {
Self::validate_insert_shape(key, value)?;
// A missing record fails the guard just like a version mismatch.
match self.projected_record(overlay, key)? {
Some(record) if record.version == *expected => {
Self::overlay_put(overlay, key, value, seq);
}
_ => return Ok(false),
}
}
BatchOp::Delete { key } => {
// `None` marks the key as deleted by this batch.
overlay.insert(key.clone(), None);
}
BatchOp::DeleteIfVersion { key, expected } => {
match self.projected_record(overlay, key)? {
Some(record) if record.version == *expected => {
overlay.insert(key.clone(), None);
}
_ => return Ok(false),
}
}
BatchOp::AssertVersion { key, expected } => {
match self.projected_record(overlay, key)? {
Some(record) if record.version == *expected => {}
_ => return Ok(false),
}
}
BatchOp::AssertPrefixEmpty { prefix } => {
if !self.projected_prefix_empty(overlay, prefix)? {
return Ok(false);
}
}
BatchOp::Rename { src, dst, force } => {
// Rename failures (`NotFound`, `DstExists`) surface as errors, not as `Ok(false)`.
self.preflight_rename_op(overlay, src, dst, *force, seq)?;
}
}
Ok(true)
}
/// Preflight for a rename inside a batch: validates it against the projected state and
/// records the move in `overlay` (source deleted, destination holding the source's value
/// with version `seq`).
///
/// A rename onto the same key only checks that the key exists and leaves `overlay` untouched.
///
/// # Errors
/// * `Error::NotFound` — no projected record under `src`.
/// * `Error::DstExists` — `force` is `false` and `dst` has a projected record.
/// * Shape errors when the moved value does not fit under `dst`.
fn preflight_rename_op(
&self,
overlay: &mut BatchOverlay,
src: &[u8],
dst: &[u8],
force: bool,
seq: u64,
) -> Result<()> {
let Some(src_record) = self.projected_record(overlay, src)? else {
return Err(Error::NotFound);
};
if src == dst {
return Ok(());
}
if !force && self.projected_record(overlay, dst)?.is_some() {
return Err(Error::DstExists);
}
Self::validate_insert_shape(dst, &src_record.value)?;
overlay.insert(src.to_vec(), None);
overlay.insert(
dst.to_vec(),
Some(Record {
value: src_record.value,
version: RecordVersion::new(seq),
}),
);
Ok(())
}
/// Records in `overlay` that the batch leaves `value` under `key`, versioned with `seq`.
/// Any earlier overlay entry for the key is replaced.
fn overlay_put(overlay: &mut BatchOverlay, key: &[u8], value: &[u8], seq: u64) {
    let projected = Record {
        value: value.to_vec(),
        version: RecordVersion::new(seq),
    };
    overlay.insert(key.to_vec(), Some(projected));
}
/// Returns `true` when every op in `pending` is a plain `Put` or `Delete`, i.e. the batch
/// carries no conditional or assertion ops and no renames. An empty batch is guard-free.
fn batch_is_guard_free(pending: &[BatchOp]) -> bool {
    for op in pending {
        match op {
            BatchOp::Put { .. } | BatchOp::Delete { .. } => {}
            BatchOp::PutIfAbsent { .. }
            | BatchOp::CompareAndPut { .. }
            | BatchOp::DeleteIfVersion { .. }
            | BatchOp::AssertVersion { .. }
            | BatchOp::AssertPrefixEmpty { .. }
            | BatchOp::Rename { .. } => return false,
        }
    }
    true
}
/// Preflight for batches without guards: validates the key/value shape of every `Put` and
/// ignores all other ops. Stops at the first shape error.
fn preflight_guard_free_batch(pending: &[BatchOp]) -> Result<()> {
    pending.iter().try_for_each(|op| match op {
        BatchOp::Put { key, value } => Self::validate_insert_shape(key, value),
        _ => Ok(()),
    })
}
/// Returns the record `key` would hold after the batch ops seen so far: the overlay entry
/// when there is one (`None` meaning deleted by the batch), otherwise the stored record.
fn projected_record(&self, overlay: &BatchOverlay, key: &[u8]) -> Result<Option<Record>> {
    if let Some(projected) = overlay.get(key) {
        return Ok(projected.clone());
    }
    self.lookup_record_unlocked(key)
}
/// Reports whether no key starting with `prefix` would exist once the overlay's effects are
/// applied on top of the stored tree.
///
/// Returns `Ok(false)` when the overlay itself puts a record under the prefix, when a stored
/// key under the prefix is not deleted by the overlay, or when the key scan yields a
/// `CommonPrefix` entry.
fn projected_prefix_empty(&self, overlay: &BatchOverlay, prefix: &[u8]) -> Result<bool> {
// A record the batch itself writes under the prefix makes it non-empty.
if overlay
.iter()
.any(|(key, record)| record.is_some() && key.starts_with(prefix))
{
return Ok(false);
}
// NOTE(review): `next_unlocked` presumably skips the maintenance gate because the callers
// of this method already hold it — confirm against the iterator's implementation.
let mut iter = self.scan_keys(prefix).into_iter();
while let Some(entry) = iter.next_unlocked().transpose()? {
match entry {
KeyRangeEntry::Key { key, .. } => match overlay.get(&key) {
// Stored key deleted by the batch: does not count.
Some(None) => {}
Some(Some(_)) | None => return Ok(false),
},
KeyRangeEntry::CommonPrefix(_) => return Ok(false),
}
}
Ok(true)
}
fn validate_insert_shape(key: &[u8], value: &[u8]) -> Result<()> {
let key_len = key.len().saturating_add(1);
if key_len > u16::MAX as usize {
return Err(Error::KeyTooLong { len: key_len });
}
if value.len() > u16::MAX as usize {
return Err(Error::ValueTooLong { len: value.len() });
}
Ok(())
}
/// Applies the ops of an already preflighted batch to the tree, in order, and mirrors each
/// mutation into `enc` when a batch encoder is supplied.
///
/// Sequence numbers are `base_seq` plus the number of WAL-emitting ops applied before the
/// current one. Consecutive insert-like ops with identical key/value lengths are applied and
/// encoded as one run. Assertion ops do nothing here.
///
/// # Errors
/// Propagates engine errors; returns `Error::Internal` when a `DeleteIfVersion` guard that
/// passed preflight no longer matches.
#[allow(clippy::too_many_lines)] fn apply_batch_walker_inline(
&self,
pending: &[BatchOp],
base_seq: u64,
mut enc: Option<&mut crate::journal::codec::BatchEncoder<'_>>,
) -> Result<()> {
let mut seq_offset = 0u64;
let mut i = 0usize;
while i < pending.len() {
// Insert-like ops: handle the whole same-shape run starting here in one go.
if batch_insert_parts(&pending[i]).is_some() {
let run_len = same_shape_insert_run_len(pending, i);
let first_seq = base_seq + seq_offset;
self.apply_batch_insert_run_walker(&pending[i..i + run_len], first_seq)?;
seq_offset += run_len as u64;
if let Some(enc) = enc.as_deref_mut() {
let (key, value) = batch_insert_parts(&pending[i])
.expect("insert run begins with insert-like op");
enc.push_insert_run(
0,
run_len,
key.len(),
value.len(),
pending[i..i + run_len]
.iter()
.map(|op| batch_insert_parts(op).expect("same-shape insert run")),
);
}
i += run_len;
continue;
}
let op = &pending[i];
// Only WAL-emitting ops advance the sequence offset.
let seq = if op.emits_wal() {
let seq = base_seq + seq_offset;
seq_offset += 1;
seq
} else {
base_seq + seq_offset
};
match op {
BatchOp::Put { .. }
| BatchOp::PutIfAbsent { .. }
| BatchOp::CompareAndPut { .. } => {
unreachable!("insert-like ops are handled by the run path");
}
BatchOp::Delete { key } => {
let search = engine::SearchKey::user(key);
let outcome = engine::erase_multi(
&self.store,
&self.root_pin,
Some(&self.route_cache),
search,
seq,
)?;
if outcome.mutated && outcome.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
// The erase is encoded whether or not a record was actually removed.
if let Some(enc) = enc.as_deref_mut() {
enc.push_erase(0, key);
}
}
BatchOp::DeleteIfVersion { key, expected } => {
let search = engine::SearchKey::user(key);
let outcome = engine::erase_multi_conditional(
&self.store,
&self.root_pin,
Some(&self.route_cache),
search,
seq,
engine::EraseCondition::IfVersion(expected.as_u64()),
)?;
// Preflight already accepted this guard, so a miss here is an internal error.
if !outcome.mutated {
return Err(Error::Internal(
"atomic preflight missed delete_if_version guard",
));
}
if outcome.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
if let Some(enc) = enc.as_deref_mut() {
enc.push_erase(0, key);
}
}
// Assertions were fully evaluated during preflight.
BatchOp::AssertVersion { .. } | BatchOp::AssertPrefixEmpty { .. } => {}
BatchOp::Rename { src, dst, force } => {
self.apply_batch_rename_walker(src, dst, *force, seq)?;
if let Some(enc) = enc.as_deref_mut() {
enc.push_rename_object(0, src, dst, *force);
}
}
}
i += 1;
}
Ok(())
}
/// Applies a run of insert-like batch ops, giving the op at index `idx` the sequence number
/// `first_seq + idx`.
///
/// The engine's batch insert may apply only part of the slice per call, so it is invoked
/// repeatedly on the remaining items until all are applied.
///
/// # Errors
/// Propagates engine errors; returns `Error::Internal` if a call applies zero items.
///
/// # Panics
/// Hits `unreachable!` if `ops` contains an op that is not insert-like.
fn apply_batch_insert_run_walker(&self, ops: &[BatchOp], first_seq: u64) -> Result<()> {
let mut items = Vec::with_capacity(ops.len());
for (idx, op) in ops.iter().enumerate() {
let seq = first_seq + idx as u64;
// Translate each op into the matching engine insert condition.
let (key, value, condition) = match op {
BatchOp::Put { key, value } => (key, value, engine::InsertCondition::Always),
BatchOp::PutIfAbsent { key, value } => {
(key, value, engine::InsertCondition::IfAbsent)
}
BatchOp::CompareAndPut {
key,
expected,
value,
} => (
key,
value,
engine::InsertCondition::IfVersion(expected.as_u64()),
),
BatchOp::Delete { .. }
| BatchOp::DeleteIfVersion { .. }
| BatchOp::AssertVersion { .. }
| BatchOp::AssertPrefixEmpty { .. }
| BatchOp::Rename { .. } => unreachable!("not an insert-like batch op"),
};
items.push(engine::InsertBatchItem::new(
engine::SearchKey::user(key),
value,
seq,
condition,
));
}
let mut applied = 0usize;
while applied < items.len() {
let outcome = engine::insert_multi_batch_conditional(
&self.store,
&self.root_pin,
Some(&self.route_cache),
&items[applied..],
)?;
// Guard against looping forever if the engine reports no progress.
if outcome.applied == 0 {
return Err(Error::Internal("insert batch walker made no progress"));
}
// The root is marked dirty with the sequence number of the first item of this chunk.
if outcome.root_dirty {
self.store.mark_dirty_cached(
self.root_guid,
items[applied].seq,
self.root_pin.as_ref(),
);
}
applied += outcome.applied;
}
Ok(())
}
/// Applies one rename from a batch: erases `src` and inserts its value at `dst`, both with
/// sequence number `seq`.
///
/// Takes no gates, endpoint locks or journal actions itself. A rename onto the same key is a
/// no-op once the key is known to exist.
///
/// # Errors
/// * `Error::NotFound` — no record under `src`.
/// * `Error::DstExists` — `force` is `false` and `dst` already holds a record.
/// * Engine errors are propagated.
fn apply_batch_rename_walker(
&self,
src: &[u8],
dst: &[u8],
force: bool,
seq: u64,
) -> Result<()> {
let src_search = engine::SearchKey::user(src);
let dst_search = engine::SearchKey::user(dst);
let Some(value) = engine::lookup_multi_with(
&self.store,
&self.root_pin,
Some(&self.route_cache),
src_search,
|hit| hit.value.to_vec(),
)?
else {
return Err(Error::NotFound);
};
if src == dst {
return Ok(());
}
if !force
&& engine::lookup_multi_with(
&self.store,
&self.root_pin,
Some(&self.route_cache),
dst_search,
|_| (),
)?
.is_some()
{
return Err(Error::DstExists);
}
let erase_out = engine::erase_multi(
&self.store,
&self.root_pin,
Some(&self.route_cache),
src_search,
seq,
)?;
let insert_out = engine::insert_multi(
&self.store,
&self.root_pin,
Some(&self.route_cache),
dst_search,
&value,
seq,
)?;
// Either half of the move may have touched the root.
if erase_out.root_dirty || insert_out.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
Ok(())
}
/// Starts building a range scan over the tree. The builder receives its own handles to the
/// store, the pinned root and the maintenance gate.
pub fn range(&self) -> RangeBuilder {
    let store = Arc::clone(&self.store);
    let root_pin = Arc::clone(&self.root_pin);
    let gate = Arc::clone(&self.maintenance_gate);
    RangeBuilder::new(store, root_pin, self.root_guid, gate)
}
/// Starts building a range scan restricted to `prefix`.
pub fn scan(&self, prefix: &[u8]) -> RangeBuilder {
    let builder = self.range();
    builder.prefix(prefix)
}
/// Starts building a keys-only range scan, wired to the tree's prefix-list cache and
/// sequence counter.
pub fn range_keys(&self) -> KeyRangeBuilder {
    let prefix_cache = Arc::clone(&self.prefix_list_cache);
    let seq_source = Arc::clone(&self.next_seq);
    KeyRangeBuilder::new(self.range()).with_prefix_list_cache(prefix_cache, seq_source)
}
/// Starts building a keys-only range scan restricted to `prefix`.
pub fn scan_keys(&self, prefix: &[u8]) -> KeyRangeBuilder {
    let builder = self.range_keys();
    builder.prefix(prefix)
}
/// Captures a view of the blobs reachable under `prefix` and runs `read` against it.
///
/// Returns whatever `read` returns; a failure to capture the view is reported before `read`
/// is invoked.
pub fn view<F, R>(&self, prefix: &[u8], read: F) -> Result<R>
where
    F: FnOnce(&View) -> Result<R>,
{
    self.capture_view(prefix)
        .and_then(|snapshot| read(&snapshot))
}
/// Builds a `View` backed by an in-memory copy of every blob in the topology collected for
/// `prefix`.
///
/// The blob images are copied while the maintenance gate is held exclusively; the gate is
/// released before the snapshot's buffer manager and root pin are created.
fn capture_view(&self, prefix: &[u8]) -> Result<View> {
let scope = prefix.to_vec();
let (snapshot_store, blob_count) = {
let _maintenance = self.maintenance_gate.enter_exclusive();
let topology =
engine::collect_prefix_blob_topology_silent(&self.store, self.root_guid, prefix)?;
let snapshot_store = Arc::new(MemoryBlobStore::new());
for entry in &topology {
let bytes = self.store.snapshot_blob_image(entry.guid)?;
snapshot_store.write_blob(entry.guid, &bytes)?;
}
(snapshot_store, topology.len())
};
// Size the snapshot's buffer pool to the number of copied blobs (at least one).
let snapshot_bm = Arc::new(BufferManager::new(snapshot_store, blob_count.max(1)));
let root_pin = snapshot_bm.pin(self.root_guid)?;
Ok(View::new(scope, snapshot_bm, self.root_guid, root_pin))
}
pub fn is_prefix_empty(&self, prefix: &[u8]) -> Result<bool> {
let _maintenance = self.maintenance_gate.enter_shared();
self.projected_prefix_empty(&std::collections::HashMap::new(), prefix)
}
fn flush_dirty_inline(&self) -> Result<()> {
let snap = self.store.snapshot_dirty();
let mut failed: std::collections::HashMap<BlobGuid, u64> = std::collections::HashMap::new();
let mut first_err: Option<Error> = None;
let mut entries = Vec::with_capacity(snap.len());
for (guid, expected_seq) in snap {
if let Some(bytes) = self.store.snapshot_bytes(guid) {
entries.push(WriteThroughEntry {
guid,
bytes,
expected_seq,
});
} else {
failed.insert(guid, expected_seq);
first_err.get_or_insert(Error::Internal(
"flush_dirty_inline: dirty entry lost cache image — invariant I1 violated",
));
}
}
if !entries.is_empty() {
let expected: Vec<_> = entries
.iter()
.map(|entry| (entry.guid, entry.expected_seq))
.collect();
if let Err(e) = self.store.write_through_batch(&entries) {
for (guid, expected_seq) in expected {
failed.insert(guid, expected_seq);
}
if first_err.is_none() {
first_err = Some(e);
}
}
}
if !failed.is_empty() {
self.store.restore_dirty(failed);
}
if let Some(e) = first_err {
return Err(e);
}
Ok(())
}
fn flush_pending_deletes_inline(&self) -> Result<()> {
let pending = self.store.snapshot_pending_deletes();
let mut failed: std::collections::HashMap<BlobGuid, u64> = std::collections::HashMap::new();
let mut first_err: Option<Error> = None;
for (guid, seq) in pending {
if let Err(e) = self.store.execute_pending_delete(guid) {
failed.insert(guid, seq);
if first_err.is_none() {
first_err = Some(e);
}
}
}
if !failed.is_empty() {
self.store.restore_pending_deletes(failed);
}
if let Some(e) = first_err {
return Err(e);
}
Ok(())
}
/// Runs one synchronous checkpoint while holding the maintenance gate in shared mode.
///
/// Each attempt:
/// 1. captures the dirty set, the pending deletes and (with a journal) the queued WAL
///    position;
/// 2. flushes the journal up to that position;
/// 3. clones the dirty blob images at the captured content versions — if any blob changed
///    in the meantime, the captured sets are restored and the attempt is retried;
/// 4. writes the images and applies the pending deletes.
///
/// On any error the captured dirty and pending-delete sets are handed back to the store
/// before the error is returned.
pub fn checkpoint(&self) -> Result<()> {
let _maintenance = self.maintenance_gate.enter_shared();
loop {
let (snap_dirty, snap_pending, versioned_snap, wal_up_to) =
self.capture_checkpoint_intent()?;
if let (Some(journal), Some(up_to)) = (&self.journal, wal_up_to) {
if let Err(e) = journal.flush_up_to(up_to) {
self.store.restore_pending_deletes(snap_pending);
self.store.restore_dirty(snap_dirty);
return Err(e);
}
}
let snap_bytes = match self.clone_checkpoint_bytes(&versioned_snap) {
Ok(Some(bytes)) => bytes,
// A blob no longer matches its captured version: put everything back and retry.
Ok(None) => {
self.store.restore_pending_deletes(snap_pending);
self.store.restore_dirty(snap_dirty);
continue;
}
Err(e) => {
self.store.restore_pending_deletes(snap_pending);
self.store.restore_dirty(snap_dirty);
return Err(e);
}
};
return self.finish_checkpoint_snapshot(snap_pending, snap_bytes);
}
}
/// Copies the image of every blob in `versioned_snap`, provided each blob is still at the
/// content version recorded in the snapshot.
///
/// Returns `Ok(None)` as soon as one blob's image is not available at its recorded version;
/// the caller is expected to retry with a fresh snapshot.
fn clone_checkpoint_bytes(
    &self,
    versioned_snap: &[DirtySnapshotEntry],
) -> Result<Option<CheckpointBytes>> {
    let mut images = Vec::with_capacity(versioned_snap.len());
    for entry in versioned_snap {
        let image = self
            .store
            .snapshot_bytes_if_version(entry.guid, entry.content_version)?;
        let Some(bytes) = image else {
            return Ok(None);
        };
        images.push((entry.guid, entry.expected_seq, bytes));
    }
    Ok(Some(images))
}
/// Captures what a checkpoint has to persist: the dirty set, the pending deletes, the
/// versioned form of the dirty set and, with a journal, the queued WAL position.
///
/// With a journal attached the three captures happen together under the commit gate in
/// checkpoint mode; the gate is released before the dirty versions are resolved.
///
/// # Errors
/// If resolving the dirty versions fails, the captured sets are handed back to the store and
/// the error is returned.
fn capture_checkpoint_intent(
&self,
) -> Result<(
CheckpointMap,
CheckpointMap,
Vec<DirtySnapshotEntry>,
Option<u64>,
)> {
let (snap_dirty, snap_pending, wal_up_to) = if let Some(journal) = &self.journal {
let _commit = self.commit_gate.enter_checkpoint();
let snap_dirty = self.store.snapshot_dirty();
let snap_pending = self.store.snapshot_pending_deletes();
let wal_up_to = journal.queued_work();
(snap_dirty, snap_pending, Some(wal_up_to))
} else {
let snap_dirty = self.store.snapshot_dirty();
let snap_pending = self.store.snapshot_pending_deletes();
(snap_dirty, snap_pending, None)
};
let versioned_snap = match self.store.snapshot_dirty_versions(&snap_dirty) {
Ok(versioned) => versioned,
Err(e) => {
self.store.restore_pending_deletes(snap_pending);
self.store.restore_dirty(snap_dirty);
return Err(e);
}
};
Ok((snap_dirty, snap_pending, versioned_snap, wal_up_to))
}
/// Second half of a checkpoint: writes the cloned blob images, flushes the store, applies
/// the pending deletes, flushes again and finally tries to truncate the journal.
///
/// Failure handling:
/// * entries of a failed batch write are marked dirty again right away;
/// * pending deletes are handed back whenever the function bails out before applying them;
/// * deletes that fail individually are handed back, and applied ones are handed back too
///   if the flush that follows them fails.
fn finish_checkpoint_snapshot(
&self,
snap_pending: CheckpointMap,
snap_bytes: CheckpointBytes,
) -> Result<()> {
let DirtyWriteOutcome {
wrote_any,
failed: dirty_failed,
first_err: first_dirty_err,
} = self.write_checkpoint_bytes(snap_bytes);
let had_dirty_failure = !dirty_failed.is_empty();
if had_dirty_failure {
self.store.restore_dirty(dirty_failed);
}
// Nothing written, nothing to delete and nothing to flush: only the journal may need work.
if !wrote_any && snap_pending.is_empty() && !self.store.needs_flush() {
self.maybe_truncate_journal()?;
return Ok(());
}
if let Err(e) = self.store.flush() {
self.store.restore_pending_deletes(snap_pending);
return Err(e);
}
// The write error is reported only after the flush above has been attempted.
if had_dirty_failure {
self.store.restore_pending_deletes(snap_pending);
return Err(first_dirty_err.expect("had_dirty_failure ⇒ first_dirty_err set"));
}
let PendingDeleteOutcome {
failed: pending_failed,
first_err: first_pending_err,
} = self.apply_pending_deletes(&snap_pending);
if !pending_failed.is_empty() {
self.store.restore_pending_deletes(pending_failed.clone());
}
self.sync_applied_deletes(&snap_pending, &pending_failed)?;
if let Some(e) = first_pending_err {
return Err(e);
}
self.maybe_truncate_journal()
}
/// Writes the cloned checkpoint images through to the store as one batch.
///
/// The returned outcome has `wrote_any` set whenever there was at least one image to write.
/// If the batch write fails, every entry of the batch is reported in `failed` together with
/// the error.
fn write_checkpoint_bytes(&self, snap_bytes: CheckpointBytes) -> DirtyWriteOutcome {
    let mut entries = Vec::with_capacity(snap_bytes.len());
    for (guid, expected_seq, bytes) in snap_bytes {
        entries.push(WriteThroughEntry {
            guid,
            bytes,
            expected_seq,
        });
    }
    let wrote_any = !entries.is_empty();
    let mut outcome = DirtyWriteOutcome {
        wrote_any,
        failed: CheckpointMap::new(),
        first_err: None,
    };
    if wrote_any {
        if let Err(e) = self.store.write_through_batch(&entries) {
            outcome
                .failed
                .extend(entries.iter().map(|entry| (entry.guid, entry.expected_seq)));
            outcome.first_err = Some(e);
        }
    }
    outcome
}
/// Executes every delete in `snap_pending`, continuing past failures.
///
/// The outcome lists the deletes that failed and carries the first error seen.
fn apply_pending_deletes(&self, snap_pending: &CheckpointMap) -> PendingDeleteOutcome {
    let mut outcome = PendingDeleteOutcome {
        failed: CheckpointMap::new(),
        first_err: None,
    };
    for (&guid, &seq) in snap_pending {
        if let Err(e) = self.store.execute_pending_delete(guid) {
            outcome.failed.insert(guid, seq);
            outcome.first_err.get_or_insert(e);
        }
    }
    outcome
}
/// Flushes the store after pending deletes have been applied.
///
/// Nothing happens when no delete succeeded. If the flush fails, the successfully applied
/// deletes (those in `snap_pending` but not in `pending_failed`) are handed back to the
/// store as pending and the flush error is returned.
fn sync_applied_deletes(
    &self,
    snap_pending: &CheckpointMap,
    pending_failed: &CheckpointMap,
) -> Result<()> {
    let applied_count = snap_pending.len() - pending_failed.len();
    if applied_count == 0 {
        return Ok(());
    }
    let Err(e) = self.store.flush() else {
        return Ok(());
    };
    let restore_applied: CheckpointMap = snap_pending
        .iter()
        .filter(|(guid, _)| !pending_failed.contains_key(*guid))
        .map(|(&guid, &seq)| (guid, seq))
        .collect();
    self.store.restore_pending_deletes(restore_applied);
    Err(e)
}
/// Truncates the journal when it asks for a checkpoint and the store has nothing left to
/// persist.
///
/// The dirty and pending-delete counts are checked, and the truncation performed, while
/// holding the commit gate in checkpoint mode. Without a journal this is a no-op.
fn maybe_truncate_journal(&self) -> Result<()> {
    let Some(journal) = &self.journal else {
        return Ok(());
    };
    if !journal.needs_checkpoint() {
        return Ok(());
    }
    let _commit = self.commit_gate.enter_checkpoint();
    let store_is_clean =
        self.store.dirty_count() == 0 && self.store.pending_delete_count() == 0;
    if store_is_clean {
        journal.truncate()?;
    }
    Ok(())
}
pub fn stats(&self) -> Result<TreeStats> {
let _maintenance = self.maintenance_gate.enter_shared();
let aggregate = self.collect_blob_stats_silent()?;
let bm_dirty_count = self.store.dirty_count();
let bm_pending_delete_count = self.store.pending_delete_count();
let bm_cache_hits = self.store.cache_hits();
let bm_cache_misses = self.store.cache_misses();
let bm_optimistic_restarts = self.store.optimistic_restarts();
let bm_range_restarts = self.store.range_restarts();
let bm_walker_ops = self.store.walker_ops();
let bm_walker_blob_hops = self.store.walker_blob_hops();
let bm_max_blob_hops = self.store.max_blob_hops();
let bm_max_cross_blob_depth = self.store.max_cross_blob_depth();
let bm_spillovers = self.store.spillover_count();
let bm_merges = self.store.merge_count();
let bm_route_resident_count = self.store.route_resident_count();
let bm_route_resident_demotions = self.store.route_resident_demotions();
let bm_cache_evictions = self.store.cache_evictions();
let bm_eviction_skips_protected = self.store.eviction_skips_protected();
let bm_eviction_skips_route_resident = self.store.eviction_skips_route_resident();
let bm_admission_protects = self.store.admission_protects();
let route = self.route_cache.stats();
let route_cache = RouteCacheStats {
entries: route.entries,
hits: route.hits,
misses: route.misses,
learns: route.learns,
evictions: route.evictions,
invalidations: route.invalidations,
};
let journal = self.journal.as_ref().map(|j| {
let s = j.stats();
JournalStats {
appends: s.appends,
batches: s.batches,
syncs: s.syncs,
queued_work: s.queued_work,
written_work: s.written_work,
flushed_work: s.flushed_work,
checkpointed_work: s.checkpointed_work,
pending_work: s.pending_work,
checkpoint_debt: s.checkpoint_debt,
}
});
let checkpointer = self.checkpointer.as_ref().map(|ck| CheckpointerStats {
rounds_attempted: ck.rounds_attempted(),
rounds_succeeded: ck.rounds_succeeded(),
rounds_failed: ck.rounds_failed(),
blobs_flushed: ck.blobs_flushed(),
merges_total: ck.merges_total(),
truncates: ck.truncates(),
evictions: ck.evictions(),
last_dirty_count: ck.last_dirty_count(),
last_pending_delete_count: ck.last_pending_delete_count(),
last_round_micros: ck.last_round_micros(),
});
Ok(TreeStats {
blob_count: aggregate.blobs.len() as u32,
total_space_used: aggregate.total_space_used,
total_gap_space: aggregate.total_gap_space,
total_slots: aggregate.total_slots,
total_compactions: aggregate.total_compactions,
total_tombstones: aggregate.total_tombstones,
total_blob_edges: aggregate.total_blob_edges,
leaf_blob_count: aggregate.leaf_blob_count,
max_blob_depth: aggregate.max_blob_depth,
total_blob_depth: aggregate.total_blob_depth,
max_blob_fill_per_mille: aggregate.max_blob_fill_per_mille,
underfilled_child_blobs: aggregate.underfilled_child_blobs,
overfull_child_blobs: aggregate.overfull_child_blobs,
blobs: aggregate.blobs,
bm_dirty_count,
bm_pending_delete_count,
bm_cache_hits,
bm_cache_misses,
bm_optimistic_restarts,
bm_range_restarts,
bm_walker_ops,
bm_walker_blob_hops,
bm_max_blob_hops,
bm_max_cross_blob_depth,
bm_spillovers,
bm_merges,
bm_route_resident_count,
bm_route_resident_demotions,
bm_cache_evictions,
bm_eviction_skips_protected,
bm_eviction_skips_route_resident,
bm_admission_protects,
route_cache,
open: self.open_stats,
journal,
checkpointer,
})
}
/// Walks the blob topology from the root and aggregates per-blob header
/// statistics.
///
/// Every blob returned by `collect_blob_topology_silent` is pinned with
/// `pin_silent`, read-latched, and its header fields are copied into a
/// `BlobStats` that is folded into the aggregate together with the blob's
/// depth.
///
/// # Errors
/// Propagates errors from the topology walk and from pinning a blob.
fn collect_blob_stats_silent(&self) -> Result<BlobStatsAggregate> {
    let topology = engine::collect_blob_topology_silent(&self.store, self.root_guid)?;
    // Bytes of a page that lie past the start of the data area.
    let blob_data_capacity = u64::from(PAGE_SIZE - DATA_AREA_START);

    let mut aggregate = BlobStatsAggregate {
        blobs: Vec::with_capacity(topology.len()),
        total_space_used: 0,
        total_gap_space: 0,
        total_slots: 0,
        total_compactions: 0,
        total_tombstones: 0,
        total_blob_edges: 0,
        leaf_blob_count: 0,
        max_blob_depth: 0,
        total_blob_depth: 0,
        max_blob_fill_per_mille: 0,
        underfilled_child_blobs: 0,
        overfull_child_blobs: 0,
    };

    for node in &topology {
        let pin = self.store.pin_silent(node.guid)?;
        let latch = pin.read();
        let frame = BlobFrameRef::wrap(latch.as_slice());
        let header = frame.header();
        let blob = BlobStats {
            guid: node.guid,
            space_used: header.space_used,
            gap_space: header.gap_space,
            num_slots: header.num_slots,
            num_ext_blobs: header.num_ext_blobs,
            compact_times: header.compact_times,
            tombstone_leaf_cnt: header.tombstone_leaf_cnt,
        };
        Self::accumulate_blob_stats(&mut aggregate, blob, node.depth, blob_data_capacity);
    }

    Ok(aggregate)
}
/// Folds one blob's statistics into `aggregate` and appends it to
/// `aggregate.blobs`.
///
/// * Running totals are widened to `u64` before being added.
/// * A blob with no external blob edges (`num_ext_blobs == 0`) is counted as
///   a leaf blob.
/// * The fill ratio is computed by `blob_fill_per_mille` from `space_used`
///   and `blob_data_capacity`.
/// * Only blobs at non-zero `depth` are classified as under-filled (below
///   `SHAPE_UNDERFILLED_CHILD_FILL_PER_MILLE`) or over-full (above
///   `SHAPE_OVERFULL_CHILD_FILL_PER_MILLE`).
fn accumulate_blob_stats(
    aggregate: &mut BlobStatsAggregate,
    stats: BlobStats,
    depth: u32,
    blob_data_capacity: u64,
) {
    // Running totals.
    aggregate.total_space_used += u64::from(stats.space_used);
    aggregate.total_gap_space += u64::from(stats.gap_space);
    aggregate.total_slots += u64::from(stats.num_slots);
    aggregate.total_compactions += u64::from(stats.compact_times);
    aggregate.total_tombstones += u64::from(stats.tombstone_leaf_cnt);
    aggregate.total_blob_edges += u64::from(stats.num_ext_blobs);

    let is_leaf = stats.num_ext_blobs == 0;
    if is_leaf {
        aggregate.leaf_blob_count += 1;
    }

    // Depth profile.
    if depth > aggregate.max_blob_depth {
        aggregate.max_blob_depth = depth;
    }
    aggregate.total_blob_depth += u64::from(depth);

    // Fill profile.
    let fill = blob_fill_per_mille(stats.space_used, blob_data_capacity);
    if fill > aggregate.max_blob_fill_per_mille {
        aggregate.max_blob_fill_per_mille = fill;
    }
    let is_child = depth != 0;
    if is_child && fill < SHAPE_UNDERFILLED_CHILD_FILL_PER_MILLE {
        aggregate.underfilled_child_blobs += 1;
    } else if is_child && fill > SHAPE_OVERFULL_CHILD_FILL_PER_MILLE {
        aggregate.overfull_child_blobs += 1;
    }

    aggregate.blobs.push(stats);
}
/// Runs one round of online maintenance: blob compaction followed by child
/// merges.
///
/// 1. If both candidate queues are empty, they are seeded by scanning the
///    tree.
/// 2. Compaction candidates are popped with a budget of
///    `ONLINE_COMPACT_BLOB_BUDGET` and processed one by one.
/// 3. If any blob was compacted and the merge queue is empty, the queues are
///    seeded again.
/// 4. Merge candidates are popped with a budget of
///    `ONLINE_MERGE_PARENT_BUDGET` and processed one by one.
///
/// # Errors
/// Stops at, and returns, the first error from seeding, compacting or
/// merging.
pub fn compact(&self) -> Result<()> {
    let queues_empty = self.store.compaction_candidate_count() == 0
        && self.store.merge_candidate_count() == 0;
    if queues_empty {
        self.seed_maintenance_candidates()?;
    }

    let mut compacted_any = false;
    for blob in self
        .store
        .pop_compaction_candidates(ONLINE_COMPACT_BLOB_BUDGET)
    {
        if self.compact_candidate_blob(blob)? {
            compacted_any = true;
        }
    }

    if compacted_any && self.store.merge_candidate_count() == 0 {
        self.seed_maintenance_candidates()?;
    }

    for parent in self.store.pop_merge_candidates(ONLINE_MERGE_PARENT_BUDGET) {
        self.merge_candidate_parent(parent)?;
    }
    Ok(())
}
/// Scans every blob reachable from the root and queues maintenance work.
///
/// Under the shared maintenance gate, each blob is pinned and read-latched.
/// A blob for which `engine::blob_needs_compaction` is true is noted as a
/// compaction candidate, and a blob whose header reports external blobs
/// (`num_ext_blobs != 0`) is noted as a merge candidate.
///
/// # Errors
/// Propagates errors from collecting the blob guids and from pinning a blob.
fn seed_maintenance_candidates(&self) -> Result<()> {
    let _maintenance = self.maintenance_gate.enter_shared();
    for guid in engine::collect_blob_guids(&self.store, self.root_guid)? {
        let pin = self.store.pin(guid)?;
        let latch = pin.read();
        let frame = BlobFrameRef::wrap(latch.as_slice());
        // Read the header flag before the frame is handed to the compaction check.
        let has_children = frame.header().num_ext_blobs != 0;
        if engine::blob_needs_compaction(frame) {
            self.store.note_compaction_candidate(guid);
        }
        if has_children {
            self.store.note_merge_candidate(guid);
        }
    }
    Ok(())
}
/// Compacts a single candidate blob if it still needs it.
///
/// Returns `Ok(true)` when the blob was compacted and marked dirty with
/// `STRUCTURAL_SEQ`. Returns `Ok(false)` when the blob no longer exists or
/// does not need compaction.
///
/// The need for compaction is checked twice: first under a read latch, then
/// again under the write latch after the commit gate has been entered (the
/// gate is entered only when a journal is configured).
///
/// # Errors
/// Propagates errors from `has_blob`, `pin` and `engine::compact_blob`.
fn compact_candidate_blob(&self, guid: BlobGuid) -> Result<bool> {
    use crate::store::STRUCTURAL_SEQ;
    let _maintenance = self.maintenance_gate.enter_shared();
    let blob_exists = self.store.has_blob(guid)?;
    if !blob_exists {
        return Ok(false);
    }
    let pin = self.store.pin(guid)?;

    // Precheck under the read latch; it is released at the end of this block.
    let worth_write_latching = {
        let latch = pin.read();
        let frame = BlobFrameRef::wrap(latch.as_slice());
        engine::blob_needs_compaction(frame)
    };
    if !worth_write_latching {
        return Ok(false);
    }

    let _commit = if self.journal.is_some() {
        Some(self.commit_gate.enter_writer())
    } else {
        None
    };

    let mut latch = pin.write();
    // Re-check under the write latch before rewriting the blob.
    let compacted = {
        let frame = latch.frame();
        engine::blob_needs_compaction(frame.as_ref())
    };
    if compacted {
        engine::compact_blob(&mut latch)?;
    }
    // Release the write latch before touching the dirty set.
    drop(latch);

    if compacted {
        self.store.mark_dirty(guid, STRUCTURAL_SEQ);
    }
    // Unpin explicitly so the pin goes away before the gate guards do.
    drop(pin);
    Ok(compacted)
}
/// Attempts to merge children into a single candidate parent blob.
///
/// Runs under the exclusive maintenance gate and, when a journal is
/// configured, inside the commit gate as a writer. If
/// `engine::try_merge_children` reports at least one merge, the parent is
/// marked dirty with `STRUCTURAL_SEQ`, the merge counter is bumped, and the
/// parent is re-queued as a merge candidate while it still has external
/// blobs.
///
/// A guid that no longer exists in the store is silently skipped.
///
/// # Errors
/// Propagates errors from `has_blob`, `pin` and `engine::try_merge_children`.
fn merge_candidate_parent(&self, guid: BlobGuid) -> Result<()> {
    use crate::store::STRUCTURAL_SEQ;
    let _maintenance = self.maintenance_gate.enter_exclusive();
    let parent_exists = self.store.has_blob(guid)?;
    if !parent_exists {
        return Ok(());
    }
    let _commit = if self.journal.is_some() {
        Some(self.commit_gate.enter_writer())
    } else {
        None
    };
    let pin = self.store.pin(guid)?;

    // The write latch only lives for the duration of this block.
    let (outcome, still_has_children) = {
        let mut latch = pin.write();
        let mut frame = latch.frame();
        let outcome = engine::try_merge_children(&self.store, &mut frame, STRUCTURAL_SEQ)?;
        let still_has_children = frame.header().num_ext_blobs != 0;
        (outcome, still_has_children)
    };

    if outcome.merged != 0 {
        self.store.mark_dirty(guid, STRUCTURAL_SEQ);
        self.store.note_merges(u64::from(outcome.merged));
        if still_has_children {
            // More children remain: let a later round look at this parent again.
            self.store.note_merge_candidate(guid);
        }
    }
    drop(pin);
    Ok(())
}
/// Returns a reference to the `TreeConfig` stored in this tree's `cfg` field.
#[must_use]
pub fn config(&self) -> &TreeConfig {
&self.cfg
}
/// Returns the compile-time `PAGE_SIZE` constant from the layout module.
///
/// This is an associated `const fn`: it needs no tree instance and can be
/// used in constant contexts.
#[must_use]
pub const fn page_size() -> u32 {
PAGE_SIZE
}
}
/// Replays the write-ahead log at `path` into the tree rooted at `root_guid`.
///
/// The root blob is pinned once and every replayed operation is applied
/// through it. Whenever an operation reports `root_dirty`, the root is marked
/// dirty with that record's sequence number.
///
/// Handling per record kind:
/// * `Insert` / `Erase` are applied directly.
/// * `RenameObject` is skipped when the source key is absent, or when the
///   destination already exists and `force` is false; otherwise the source
///   is erased and its value inserted under the destination key.
/// * `Batch` is not applied by this callback.
///   NOTE(review): presumably the reader hands the batch's inner operations
///   to the callback individually; confirm against `journal::reader::replay`.
///
/// Returns `(highest replayed seq + 1, replay stats)`; the first element is
/// `1` when the log reported no highest sequence number.
///
/// # Errors
/// Propagates errors from pinning the root, from the reader, and from any
/// engine operation run during replay.
fn replay_wal(
    path: &std::path::Path,
    bm: &Arc<BufferManager>,
    root_guid: BlobGuid,
) -> Result<(u64, crate::journal::reader::ReplayStats)> {
    let root_pin = bm.pin(root_guid)?;
    let (_header, stats) = replay(path, |op, seq, _off| {
        let root_dirty = match op {
            WalOp::Insert { key, value } => {
                let search = engine::SearchKey::user(key);
                engine::insert_multi(bm, &root_pin, None, search, value, seq)?.root_dirty
            }
            WalOp::Erase { key } => {
                let search = engine::SearchKey::user(key);
                engine::erase_multi(bm, &root_pin, None, search, seq)?.root_dirty
            }
            WalOp::RenameObject {
                src_key,
                dst_key,
                force,
                ..
            } => {
                let src_search = engine::SearchKey::user(src_key);
                let dst_search = engine::SearchKey::user(dst_key);
                // One lookup answers both "does the source exist?" and "what
                // is its value?", so no default value is ever substituted.
                let value = match engine::lookup_multi_with(
                    bm,
                    &root_pin,
                    None,
                    src_search,
                    |hit| hit.value.to_vec(),
                )? {
                    Some(value) => value,
                    None => return Ok(()),
                };
                if !force
                    && engine::lookup_multi_with(bm, &root_pin, None, dst_search, |_| ())?
                        .is_some()
                {
                    return Ok(());
                }
                let erase_out = engine::erase_multi(bm, &root_pin, None, src_search, seq)?;
                let insert_out =
                    engine::insert_multi(bm, &root_pin, None, dst_search, &value, seq)?;
                erase_out.root_dirty || insert_out.root_dirty
            }
            WalOp::Batch { ops: _ } => false,
        };
        if root_dirty {
            bm.mark_dirty(root_guid, seq);
        }
        Ok(())
    })?;
    debug_assert!(
        stats.records_seen > 0 || stats.highest_seq.is_none(),
        "an empty WAL replay cannot report a highest seq",
    );
    debug_assert!(
        stats.torn_tail_at.is_none() || stats.records_seen > 0 || stats.highest_seq.is_none(),
        "a torn tail without complete records must not report a highest seq",
    );
    Ok((stats.highest_seq.unwrap_or(0) + 1, stats))
}
#[cfg(test)]
mod tests {
    use crate::TreeBuilder;
    use std::sync::mpsc::sync_channel;
    use std::thread;
    use std::time::Duration;

    /// A key that is a strict prefix of another key is stored, read and
    /// deleted independently of the longer key.
    #[test]
    fn strict_prefix_point_keys_round_trip_through_public_api() {
        let tree = TreeBuilder::new("ignored").memory().open().unwrap();
        tree.put(b"abc", b"short").unwrap();
        tree.put(b"abcdef", b"long").unwrap();
        assert_eq!(tree.get(b"abc").unwrap().as_deref(), Some(&b"short"[..]));
        assert_eq!(tree.get(b"abcdef").unwrap().as_deref(), Some(&b"long"[..]));
        assert!(tree.delete(b"abc").unwrap());
        assert_eq!(tree.get(b"abc").unwrap(), None);
        assert_eq!(tree.get(b"abcdef").unwrap().as_deref(), Some(&b"long"[..]));
    }

    /// A point `get` completes while another thread holds the maintenance
    /// gate exclusively.
    #[test]
    fn point_get_does_not_enter_maintenance_gate() {
        let tree = TreeBuilder::new("ignored").memory().open().unwrap();
        tree.put(b"hot/key", b"value").unwrap();
        let exclusive = tree.maintenance_gate.enter_exclusive();
        let worker_tree = tree.clone();
        // Capacity 1, not a rendezvous channel: if the receive below times
        // out, the worker must still be able to finish its `send` so that
        // `join` returns and the assertion reports the failure instead of
        // the test hanging.
        let (done_tx, done_rx) = sync_channel(1);
        let handle = thread::spawn(move || {
            let got = worker_tree.get(b"hot/key").unwrap();
            done_tx.send(got).unwrap();
        });
        let got = done_rx.recv_timeout(Duration::from_secs(1));
        drop(exclusive);
        handle.join().unwrap();
        assert_eq!(got.unwrap().as_deref(), Some(&b"value"[..]));
    }

    /// `put`, `delete` and `rename` make no progress while the maintenance
    /// gate is held exclusively, and complete once it is released.
    #[test]
    fn single_key_writes_wait_behind_maintenance_exclusive() {
        let tree = TreeBuilder::new("ignored").memory().open().unwrap();
        tree.put(b"src", b"old").unwrap();
        let exclusive = tree.maintenance_gate.enter_exclusive();
        let worker_tree = tree.clone();
        // Capacity 1 so a late worker can still deliver its result and exit;
        // with a rendezvous channel a timed-out second receive would leave
        // the worker blocked in `send` and deadlock `join`.
        let (done_tx, done_rx) = sync_channel(1);
        let handle = thread::spawn(move || {
            worker_tree.put(b"k1", b"v1").unwrap();
            assert!(worker_tree.delete(b"src").unwrap());
            worker_tree.put(b"rename-src", b"v2").unwrap();
            worker_tree
                .rename(b"rename-src", b"rename-dst", false)
                .unwrap();
            let k1 = worker_tree.get(b"k1").unwrap();
            let renamed = worker_tree.get(b"rename-dst").unwrap();
            done_tx.send((k1, renamed)).unwrap();
        });
        let got = done_rx.recv_timeout(Duration::from_secs(1));
        assert!(
            got.is_err(),
            "single-key writers must wait behind an exclusive mutation gate"
        );
        drop(exclusive);
        let got = done_rx.recv_timeout(Duration::from_secs(1));
        handle.join().unwrap();
        let (k1, renamed) = got.unwrap();
        assert_eq!(k1.as_deref(), Some(&b"v1"[..]));
        assert_eq!(renamed.as_deref(), Some(&b"v2"[..]));
    }

    /// A `put` blocks while the endpoint lock for the same key is held and
    /// completes after it is released.
    #[test]
    fn single_key_writes_take_endpoint_shard() {
        let tree = TreeBuilder::new("ignored").memory().open().unwrap();
        let endpoint = tree.endpoint_locks.lock_key(b"same/key");
        let worker_tree = tree.clone();
        let (done_tx, done_rx) = sync_channel(0);
        let handle = thread::spawn(move || {
            worker_tree.put(b"same/key", b"value").unwrap();
            done_tx.send(()).unwrap();
        });
        assert!(
            done_rx.recv_timeout(Duration::from_millis(50)).is_err(),
            "put must wait behind the key endpoint shard"
        );
        drop(endpoint);
        done_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        handle.join().unwrap();
        assert_eq!(
            tree.get(b"same/key").unwrap().as_deref(),
            Some(&b"value"[..])
        );
    }

    /// `compact` does not finish while a shared maintenance guard is held
    /// and completes after that guard is dropped.
    #[test]
    fn compact_waits_for_maintenance_read_guard() {
        let tree = TreeBuilder::new("ignored")
            .memory()
            .buffer_pool_size(16)
            .open()
            .unwrap();
        // Fill the tree with large values, then delete most of them, so the
        // tree spans more than one blob (asserted below).
        let big = vec![0xCDu8; 4 * 1024];
        for i in 0..256u32 {
            tree.put(format!("k{i:08}").as_bytes(), &big).unwrap();
        }
        for i in 0..248u32 {
            tree.delete(format!("k{i:08}").as_bytes()).unwrap();
        }
        assert!(
            tree.stats().unwrap().blob_count > 1,
            "test precondition: compact must have a BlobNode merge phase"
        );
        let read_guard = tree.maintenance_gate.enter_shared();
        let worker_tree = tree.clone();
        // Rendezvous handshake: the worker is known to be running once this
        // receive returns.
        let (started_tx, started_rx) = sync_channel(0);
        let (done_tx, done_rx) = sync_channel(0);
        let handle = thread::spawn(move || {
            started_tx.send(()).unwrap();
            worker_tree.compact().unwrap();
            done_tx.send(()).unwrap();
        });
        started_rx.recv().unwrap();
        assert!(
            done_rx.recv_timeout(Duration::from_millis(50)).is_err(),
            "compact must wait behind active shared maintenance readers"
        );
        drop(read_guard);
        done_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        handle.join().unwrap();
    }
}