use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use super::config::{Storage, TreeConfig};
use super::errors::{Error, Result};
use super::stats::{BlobStats, CheckpointerStats, JournalStats, RouteCacheStats, TreeStats};
use crate::concurrency::{CommitGate, MaintenanceGate};
use crate::engine;
use crate::engine::{KeyRangeBuilder, KeyRangeEntry, RangeBuilder};
use crate::journal::codec::{
encode_erase_record, encode_insert_record, encode_rename_object_record,
encoded_erase_record_len, encoded_insert_record_len, encoded_rename_object_record_len,
BatchEncoder, RECORD_FOOTER_SIZE, RECORD_HEADER_SIZE,
};
use crate::journal::group_commit::Journal;
use crate::journal::reader::replay;
use crate::journal::wal_op::WalOp;
use crate::layout::{BlobGuid, DATA_AREA_START, PAGE_SIZE};
use crate::store::blob_store::{BlobStore, FileBlobStore, MemoryBlobStore};
use crate::store::buffer_manager::WriteThroughEntry;
use crate::store::{BlobFrame, BlobFrameRef, BufferManager, CachedBlob};
use super::atomic::{AtomicBatch, BatchOp, Record, RecordVersion};
/// Maximum number of compaction candidates popped per `Tree::compact` call.
const ONLINE_COMPACT_BLOB_BUDGET: usize = 256;
/// Maximum number of merge-candidate parents popped per `Tree::compact` call.
const ONLINE_MERGE_PARENT_BUDGET: usize = 256;
/// Staging map used while preflighting an atomic batch: `Some(record)` is a
/// staged write of that key, `None` a staged delete.
type BatchOverlay = HashMap<Vec<u8>, Option<Record>>;
/// Running totals gathered while walking every blob for `Tree::stats`.
struct BlobStatsAggregate {
    /// Per-blob header snapshots, in the order the topology walk visited them.
    blobs: Vec<BlobStats>,
    /// Sum of every blob's `space_used`.
    total_space_used: u64,
    /// Sum of every blob's `gap_space`.
    total_gap_space: u64,
    /// Sum of every blob's `num_slots`.
    total_slots: u64,
    /// Sum of every blob's `compact_times`.
    total_compactions: u64,
    /// Sum of every blob's `tombstone_leaf_cnt`.
    total_tombstones: u64,
    /// Sum of every blob's `num_ext_blobs` (presumably edges to child blobs).
    total_blob_edges: u64,
    /// Sum of the topology depth of every visited blob.
    total_blob_depth: u64,
    /// Number of blobs whose `num_ext_blobs` is zero.
    leaf_blob_count: u32,
    /// Deepest topology depth seen.
    max_blob_depth: u32,
    /// Highest per-blob fill ratio seen, in parts per thousand.
    max_blob_fill_per_mille: u32,
}
/// Returns how full a blob's data area is, in parts per thousand.
///
/// `space_used` is the blob header's used-space counter; the first
/// `DATA_AREA_START` bytes are subtracted so only the data area is counted.
/// `blob_data_capacity` is the size of the data area; a zero capacity yields 0.
///
/// The result saturates at `u32::MAX` instead of being truncated by an `as`
/// cast: a header that reports far more space than the capacity must not wrap
/// around and masquerade as a nearly empty blob.
fn blob_fill_per_mille(space_used: u32, blob_data_capacity: u64) -> u32 {
    if blob_data_capacity == 0 {
        return 0;
    }
    let data_used = u64::from(space_used).saturating_sub(DATA_AREA_START as u64);
    let per_mille = data_used.saturating_mul(1000) / blob_data_capacity;
    // Lossless after the clamp.
    per_mille.min(u64::from(u32::MAX)) as u32
}
/// Handle to a blob-backed key/value tree.
///
/// Every field is either configuration, a plain GUID, or an `Arc`, so clones
/// produced by `#[derive(Clone)]` share the same buffer manager, gates,
/// sequence counter, journal and checkpointer.
#[derive(Clone)]
pub struct Tree {
/// Configuration the tree was opened with.
cfg: TreeConfig,
/// Buffer manager built over the underlying `BlobStore` in `open_inner`.
store: Arc<BufferManager>,
/// GUID of the root blob (`ROOT_BLOB_GUID` when opened via `open_inner`).
root_guid: BlobGuid,
/// Pinned handle to the root blob, passed to every engine lookup/insert/erase.
root_pin: Arc<CachedBlob>,
/// Serializes `rename` calls against each other; point writes do not take it.
rename_lock: Arc<Mutex<()>>,
/// Route cache passed to the engine's insert/erase walkers.
route_cache: Arc<engine::RouteCache>,
/// Entered shared by point operations and exclusive by atomic batches.
maintenance_gate: Arc<MaintenanceGate>,
/// Next sequence number to hand out; seeded from WAL replay on open.
next_seq: Arc<AtomicU64>,
/// Journaled writers enter via `enter_writer`, checkpoints via `enter_checkpoint`.
commit_gate: Arc<CommitGate>,
/// Write-ahead log; `None` when no WAL path is configured or the store was
/// supplied through `open_with_blob_store`.
journal: Option<Arc<Journal>>,
/// Checkpointer returned by `Checkpointer::spawn`, if one was created.
checkpointer: Option<Arc<crate::checkpoint::Checkpointer>>,
}
/// Debug output shows only the storage backend and the root GUID; the remaining
/// (shared, runtime) state is elided via `finish_non_exhaustive`.
impl std::fmt::Debug for Tree {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Tree");
        builder.field("storage", &self.cfg.storage);
        builder.field("root_guid", &self.root_guid);
        builder.finish_non_exhaustive()
    }
}
/// GUID of the tree's root blob (all zero bytes); `open_inner` creates the blob
/// when the store does not contain it yet.
pub(crate) const ROOT_BLOB_GUID: BlobGuid = [0; 16];
/// Estimates the encoded byte length of an atomic-batch WAL record for `ops`.
///
/// `apply_batch` uses it as the `Vec::with_capacity` hint before encoding.
/// Consecutive insert-like ops whose keys and values have identical lengths are
/// costed as a single "insert run", matching how `apply_batch_walker_inline`
/// groups them for the encoder. Assertion ops contribute nothing.
fn encoded_batch_record_len(ops: &[BatchOp]) -> usize {
    // 8 + 4 byte body prefix between the record header and the op payloads.
    const BODY_PREFIX_LEN: usize = 8 + 4;
    let mut total = RECORD_HEADER_SIZE + BODY_PREFIX_LEN + RECORD_FOOTER_SIZE;
    let mut idx = 0usize;
    while idx < ops.len() {
        let op = &ops[idx];
        let Some((key, value)) = batch_insert_parts(op) else {
            total += match op {
                BatchOp::Delete { key } | BatchOp::DeleteIfVersion { key, .. } => {
                    1 + 8 + 4 + key.len()
                }
                BatchOp::Rename { src, dst, .. } => 1 + 8 + 4 + src.len() + 4 + dst.len() + 1,
                BatchOp::AssertVersion { .. } | BatchOp::AssertPrefixEmpty { .. } => 0,
                BatchOp::Put { .. } | BatchOp::PutIfAbsent { .. } | BatchOp::CompareAndPut { .. } => {
                    unreachable!("insert-like ops handled above")
                }
            };
            idx += 1;
            continue;
        };
        // An insert-like op always starts a run of at least one.
        let run = same_shape_insert_run_len(ops, idx);
        total += if run == 1 {
            1 + 8 + 4 + key.len() + 4 + value.len()
        } else {
            1 + 8 + 4 + 4 + 4 + run * (key.len() + value.len())
        };
        idx += run;
    }
    total
}
/// Returns `(key, value)` for insert-like ops (`Put`, `PutIfAbsent`,
/// `CompareAndPut`) and `None` for every other op.
fn batch_insert_parts(op: &BatchOp) -> Option<(&[u8], &[u8])> {
    let (key, value) = match op {
        BatchOp::Delete { .. }
        | BatchOp::DeleteIfVersion { .. }
        | BatchOp::AssertVersion { .. }
        | BatchOp::AssertPrefixEmpty { .. }
        | BatchOp::Rename { .. } => return None,
        BatchOp::Put { key, value }
        | BatchOp::PutIfAbsent { key, value }
        | BatchOp::CompareAndPut { key, value, .. } => (key, value),
    };
    Some((&key[..], &value[..]))
}
/// Length of the run of insert-like ops starting at `start` whose key and value
/// lengths all equal those of `ops[start]`.
///
/// Returns 0 when `ops[start]` is not insert-like; panics if `start` is out of
/// bounds.
fn same_shape_insert_run_len(ops: &[BatchOp], start: usize) -> usize {
    match batch_insert_parts(&ops[start]) {
        None => 0,
        Some((head_key, head_value)) => {
            let followers = ops[start + 1..]
                .iter()
                .take_while(|op| {
                    matches!(
                        batch_insert_parts(op),
                        Some((key, value))
                            if key.len() == head_key.len() && value.len() == head_value.len()
                    )
                })
                .count();
            1 + followers
        }
    }
}
impl Tree {
/// Opens (or creates) a tree on the storage backend named by `cfg.storage`.
///
/// `Storage::Memory` gets a fresh in-memory blob store; `Storage::File` opens a
/// file-backed store in `dir`. Trees opened here attach the WAL when `cfg`
/// provides a WAL path.
///
/// # Errors
/// Propagates failures from opening the blob store and from `open_inner`.
pub fn open(cfg: TreeConfig) -> Result<Self> {
    let blob_store: Arc<dyn BlobStore> = match &cfg.storage {
        Storage::Memory => Arc::new(MemoryBlobStore::new()),
        // Linux + io-uring builds pass the configured buffer-pool size as a hint.
        #[cfg(all(target_os = "linux", feature = "io-uring"))]
        Storage::File { dir } => Arc::new(FileBlobStore::open_with_buffer_pool_hint(
            dir,
            cfg.buffer_pool_size,
        )?),
        #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
        Storage::File { dir } => Arc::new(FileBlobStore::open(dir)?),
    };
    Self::open_inner(cfg, blob_store, true)
}
/// Opens a tree on a caller-supplied blob store.
///
/// No WAL is attached on this path, so the tree's `journal` is always `None`.
pub fn open_with_blob_store(cfg: TreeConfig, store: Arc<dyn BlobStore>) -> Result<Self> {
    const ATTACH_WAL: bool = false;
    Self::open_inner(cfg, store, ATTACH_WAL)
}
/// Shared constructor behind `open` and `open_with_blob_store`.
///
/// Wraps `store` in a buffer manager and writes an initialized root blob when
/// none exists yet. When `attach_wal` is set and `cfg` has a WAL path, it
/// replays an existing log to recover the next sequence number and then opens
/// the journal. It finally pins the root and spawns the checkpointer.
fn open_inner(cfg: TreeConfig, store: Arc<dyn BlobStore>, attach_wal: bool) -> Result<Self> {
    let buffers = Arc::new(BufferManager::new(store, cfg.buffer_pool_size));
    let root_guid = ROOT_BLOB_GUID;

    // First open of this store: persist an empty, initialized root blob.
    if !buffers.has_blob(root_guid)? {
        let mut root_image = buffers.alloc_blob_buf_zeroed();
        BlobFrame::init(root_image.as_mut_slice(), root_guid)?;
        buffers.write_blob(root_guid, &root_image)?;
        buffers.flush()?;
    }

    // `wal_path()` is only consulted when the caller asked for a WAL.
    let wal_path = if attach_wal { cfg.wal_path() } else { None };
    let (journal, next_seq) = match wal_path {
        None => (None, 1u64),
        Some(path) => {
            let recovered_seq = if path.exists() {
                replay_wal(&path, &buffers, root_guid)?
            } else {
                1
            };
            let opened = Journal::open_or_create(&path, 0)?;
            (Some(Arc::new(opened)), recovered_seq)
        }
    };

    let root_pin = buffers.pin(root_guid)?;
    let maintenance_gate = Arc::new(MaintenanceGate::new());
    let commit_gate = Arc::new(CommitGate::new());
    let checkpointer = crate::checkpoint::Checkpointer::spawn(
        Arc::clone(&buffers),
        journal.clone(),
        Arc::clone(&maintenance_gate),
        Arc::clone(&commit_gate),
        cfg.checkpoint.clone(),
    )
    .map(Arc::new);

    Ok(Self {
        cfg,
        store: buffers,
        root_guid,
        root_pin,
        rename_lock: Arc::new(Mutex::new(())),
        route_cache: Arc::new(engine::RouteCache::new()),
        maintenance_gate,
        next_seq: Arc::new(AtomicU64::new(next_seq)),
        commit_gate,
        journal,
        checkpointer,
    })
}
/// Returns the value stored under `key`, or `None` when the key is absent.
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
    // `get_record` enters the shared maintenance gate itself.
    Ok(self.get_record(key)?.map(|record| record.value))
}
/// Returns the value and version stored under `key`, or `None` when absent.
pub fn get_record(&self, key: &[u8]) -> Result<Option<Record>> {
    let _shared = self.maintenance_gate.enter_shared();
    let found = self.lookup_record_unlocked(key)?;
    Ok(found)
}
/// Returns only the version (the sequence number of the last write) of `key`,
/// without copying its value.
pub fn get_version(&self, key: &[u8]) -> Result<Option<RecordVersion>> {
    let _shared = self.maintenance_gate.enter_shared();
    engine::lookup_multi_with(
        &self.store,
        &self.root_pin,
        engine::SearchKey::user(key),
        |hit| RecordVersion::new(hit.seq),
    )
}
/// Looks up `key` without touching the maintenance gate; the caller must
/// already hold it (shared or exclusive).
fn lookup_record_unlocked(&self, key: &[u8]) -> Result<Option<Record>> {
    engine::lookup_multi_with(
        &self.store,
        &self.root_pin,
        engine::SearchKey::user(key),
        |hit| {
            let value = hit.value.to_vec();
            Record {
                value,
                version: RecordVersion::new(hit.seq),
            }
        },
    )
}
/// Unconditionally stores `value` under `key`.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
    self.put_inner_conditional(key, value, false, engine::InsertCondition::Always)?;
    Ok(())
}
/// Stores `value` under `key` only when the key is absent.
///
/// Returns `true` if the write happened.
pub fn put_if_absent(&self, key: &[u8], value: &[u8]) -> Result<bool> {
    let outcome =
        self.put_inner_conditional(key, value, false, engine::InsertCondition::IfAbsent)?;
    Ok(outcome.mutated)
}
/// Stores `value` under `key` only when the key's current version equals
/// `expected_version`.
///
/// Returns `true` if the write happened.
pub fn compare_and_put(
    &self,
    key: &[u8],
    expected_version: RecordVersion,
    value: &[u8],
) -> Result<bool> {
    let condition = engine::InsertCondition::IfVersion(expected_version.as_u64());
    let outcome = self.put_inner_conditional(key, value, false, condition)?;
    Ok(outcome.mutated)
}
/// Unconditionally stores `value` under `key` and returns the previous value,
/// if any.
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
    let outcome =
        self.put_inner_conditional(key, value, true, engine::InsertCondition::Always)?;
    Ok(outcome.previous)
}
/// Shared implementation of every single-key insert.
///
/// Allocates one sequence number and runs the engine insert under the shared
/// maintenance gate.
/// - With a journal, the insert runs while the commit gate's writer side is
///   held. Mutating inserts also mark the root dirty when the engine reports
///   it, and submit an insert record. The journal ack is awaited after the
///   gate is released.
/// - Without a journal, the root is marked dirty whenever the engine reports
///   it. Mutations are flushed inline when `memory_flush_on_write` is set.
fn put_inner_conditional(
    &self,
    key: &[u8],
    value: &[u8],
    wants_prev: bool,
    condition: engine::InsertCondition,
) -> Result<engine::InsertOutcome> {
    let _maintenance = self.maintenance_gate.enter_shared();
    let search = engine::SearchKey::user(key);
    let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
    // Both branches run exactly this engine call once; only the bookkeeping differs.
    let apply = || {
        engine::insert_multi_conditional(
            &self.store,
            &self.root_pin,
            Some(&self.route_cache),
            search,
            value,
            seq,
            wants_prev,
            condition,
        )
    };

    let mut journal_ack = None;
    let outcome = match &self.journal {
        Some(journal) => {
            let _commit = self.commit_gate.enter_writer();
            let outcome = apply()?;
            if outcome.mutated {
                if outcome.root_dirty {
                    self.store
                        .mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
                }
                let mut record =
                    Vec::with_capacity(encoded_insert_record_len(key.len(), value.len()));
                encode_insert_record(&mut record, seq, 0, key, value);
                journal_ack = journal.submit(record, self.cfg.wal_commit)?;
            }
            outcome
        }
        None => {
            let outcome = apply()?;
            if outcome.root_dirty {
                self.store
                    .mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
            }
            if outcome.mutated && self.cfg.memory_flush_on_write {
                self.flush_dirty_inline()?;
                self.flush_pending_deletes_inline()?;
            }
            outcome
        }
    };

    // Wait for durability only after the commit gate has been released.
    if let Some(ack) = journal_ack {
        ack.wait()?;
    }
    Ok(outcome)
}
/// Removes `key` unconditionally.
///
/// Returns `true` if a record was removed.
pub fn delete(&self, key: &[u8]) -> Result<bool> {
    let outcome = self.delete_inner_conditional(key, false, engine::EraseCondition::Always)?;
    Ok(outcome.mutated)
}
/// Removes `key` only when its current version equals `expected_version`.
///
/// Returns `true` if a record was removed.
pub fn delete_if_version(&self, key: &[u8], expected_version: RecordVersion) -> Result<bool> {
    let condition = engine::EraseCondition::IfVersion(expected_version.as_u64());
    let outcome = self.delete_inner_conditional(key, false, condition)?;
    Ok(outcome.mutated)
}
/// Removes `key` unconditionally and returns the value it held, if any.
pub fn remove(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
    let outcome = self.delete_inner_conditional(key, true, engine::EraseCondition::Always)?;
    Ok(outcome.previous)
}
/// Shared implementation of every single-key erase.
///
/// Allocates one sequence number and runs the engine erase under the shared
/// maintenance gate.
/// - With a journal, the erase runs while the commit gate's writer side is
///   held. Mutating erases mark the root dirty when the engine reports it and
///   submit an erase record. The journal ack is awaited after the gate is
///   released.
/// - Without a journal, only mutating erases mark the root dirty. They are
///   flushed inline when `memory_flush_on_write` is set.
fn delete_inner_conditional(
    &self,
    key: &[u8],
    wants_prev: bool,
    condition: engine::EraseCondition,
) -> Result<engine::EraseOutcome> {
    let _maintenance = self.maintenance_gate.enter_shared();
    let search = engine::SearchKey::user(key);
    let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
    // Both branches run exactly this engine call once; only the bookkeeping differs.
    let erase = || {
        engine::erase_multi_conditional(
            &self.store,
            &self.root_pin,
            Some(&self.route_cache),
            search,
            seq,
            wants_prev,
            condition,
        )
    };

    let mut journal_ack = None;
    let outcome = match &self.journal {
        Some(journal) => {
            let _commit = self.commit_gate.enter_writer();
            let outcome = erase()?;
            if outcome.mutated {
                if outcome.root_dirty {
                    self.store
                        .mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
                }
                let mut record = Vec::with_capacity(encoded_erase_record_len(key.len()));
                encode_erase_record(&mut record, seq, 0, key);
                journal_ack = journal.submit(record, self.cfg.wal_commit)?;
            }
            outcome
        }
        None => {
            let outcome = erase()?;
            if outcome.mutated && outcome.root_dirty {
                self.store
                    .mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
            }
            if outcome.mutated && self.cfg.memory_flush_on_write {
                self.flush_dirty_inline()?;
                self.flush_pending_deletes_inline()?;
            }
            outcome
        }
    };

    // Wait for durability only after the commit gate has been released.
    if let Some(ack) = journal_ack {
        ack.wait()?;
    }
    Ok(outcome)
}
/// Moves the value stored under `src` to `dst`, journaling a rename record
/// when a WAL is attached.
///
/// Renames are serialized against each other by `rename_lock`. When
/// `src == dst`, the call succeeds without writing once `src` is known to exist.
///
/// # Errors
/// - `Error::NotFound` if `src` is absent.
/// - `Error::DstExists` if `dst` is present and `force` is `false`.
/// - Any engine, journal or inline-flush error.
pub fn rename(&self, src: &[u8], dst: &[u8], force: bool) -> Result<()> {
    let _maintenance = self.maintenance_gate.enter_shared();
    let src_search = engine::SearchKey::user(src);
    let dst_search = engine::SearchKey::user(dst);
    let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
    // The mutex protects no data (`()`), so poisoning by a panicked rename carries
    // no broken invariant of its own. Recover the guard instead of making every
    // later rename panic.
    let _rename = self
        .rename_lock
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    let Some(value) =
        engine::lookup_multi_with(&self.store, &self.root_pin, src_search, |hit| {
            hit.value.to_vec()
        })?
    else {
        return Err(Error::NotFound);
    };
    if src == dst {
        return Ok(());
    }
    if !force
        && engine::lookup_multi_with(&self.store, &self.root_pin, dst_search, |_| ())?.is_some()
    {
        return Err(Error::DstExists);
    }

    // Erase `src`, insert `dst` with the same seq, and mark the root dirty if either
    // walk touched it. Shared by the journaled and in-memory paths below.
    let move_record = || -> Result<()> {
        let erase_out = engine::erase_multi(
            &self.store,
            &self.root_pin,
            Some(&self.route_cache),
            src_search,
            seq,
            false,
        )?;
        let insert_out = engine::insert_multi(
            &self.store,
            &self.root_pin,
            Some(&self.route_cache),
            dst_search,
            &value,
            seq,
            false,
        )?;
        if erase_out.root_dirty || insert_out.root_dirty {
            self.store
                .mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
        }
        Ok(())
    };

    let journal_ack = match &self.journal {
        Some(journal) => {
            let _commit = self.commit_gate.enter_writer();
            move_record()?;
            let mut record =
                Vec::with_capacity(encoded_rename_object_record_len(src.len(), dst.len()));
            encode_rename_object_record(&mut record, seq, 0, src, dst, force);
            journal.submit(record, self.cfg.wal_commit)?
        }
        None => {
            move_record()?;
            if self.cfg.memory_flush_on_write {
                self.flush_dirty_inline()?;
                self.flush_pending_deletes_inline()?;
            }
            None
        }
    };

    // Wait for durability only after the commit gate has been released.
    if let Some(ack) = journal_ack {
        ack.wait()?;
    }
    Ok(())
}
/// Builds a batch with `build` and applies it atomically.
///
/// Returns `Ok(true)` when the batch was applied, or was empty. Returns
/// `Ok(false)` when one of its guards (`PutIfAbsent`, `CompareAndPut`,
/// `DeleteIfVersion`, assertions) failed, in which case nothing is applied.
pub fn atomic<F>(&self, build: F) -> Result<bool>
where
    F: FnOnce(&mut AtomicBatch),
{
    let mut batch = AtomicBatch::default();
    build(&mut batch);
    let ops = batch.pending;
    if ops.is_empty() {
        Ok(true)
    } else {
        self.apply_batch(ops)
    }
}
/// Validates and applies a batch of ops under the exclusive maintenance gate.
///
/// Sequence numbers for every WAL-emitting op are reserved up front. They stay
/// consumed even if a guard later fails. `preflight_batch` checks all guards
/// before anything is applied, and `Ok(false)` is returned if one fails. With a
/// journal, the ops are applied and encoded into one batch record while the
/// commit gate's writer side is held; the ack is awaited after release.
fn apply_batch(&self, pending: Vec<BatchOp>) -> Result<bool> {
    let _exclusive = self.maintenance_gate.enter_exclusive();
    let wal_ops = pending.iter().filter(|op| op.emits_wal()).count() as u64;
    let base_seq = self.next_seq.fetch_add(wal_ops, Ordering::Relaxed);

    let guards_hold = self.preflight_batch(&pending, base_seq)?;
    if !guards_hold {
        return Ok(false);
    }
    if wal_ops == 0 {
        // No op emits a WAL record (presumably an assertion-only batch): nothing to write.
        return Ok(true);
    }

    match &self.journal {
        None => {
            self.apply_batch_walker_inline(&pending, base_seq, None)?;
            if self.cfg.memory_flush_on_write {
                self.flush_dirty_inline()?;
                self.flush_pending_deletes_inline()?;
            }
        }
        Some(journal) => {
            let ack = {
                let _commit = self.commit_gate.enter_writer();
                let mut record = Vec::with_capacity(encoded_batch_record_len(&pending));
                let mut encoder = BatchEncoder::begin(&mut record, base_seq, 0);
                self.apply_batch_walker_inline(&pending, base_seq, Some(&mut encoder))?;
                let _written = encoder.finish();
                journal.submit(record, self.cfg.wal_commit)?
            };
            if let Some(ack) = ack {
                ack.wait()?;
            }
        }
    }
    Ok(true)
}
/// Checks every guard in `pending` against the tree plus the staged effects of
/// earlier ops in the same batch. Only a local overlay is written, never the tree.
///
/// Batches of plain `Put`/`Delete` ops skip the overlay and are only
/// shape-validated. Otherwise each op is preflighted in order with the seq it
/// will be applied with, and `Ok(false)` is returned at the first failing guard.
fn preflight_batch(&self, pending: &[BatchOp], base_seq: u64) -> Result<bool> {
    if Self::batch_is_guard_free(pending) {
        Self::preflight_guard_free_batch(pending)?;
        return Ok(true);
    }
    let mut overlay = BatchOverlay::new();
    let mut emitted = 0u64;
    for op in pending {
        // Non-emitting ops observe the seq the next emitting op will use.
        let seq = base_seq + emitted;
        if op.emits_wal() {
            emitted += 1;
        }
        if !self.preflight_batch_op(&mut overlay, op, seq)? {
            return Ok(false);
        }
    }
    Ok(true)
}
/// Preflights a single batch op against the projected state (overlay first,
/// then the tree), staging its effect in `overlay` when its guard holds.
///
/// Returns `Ok(false)` when the op's guard fails. Returns `Err` for malformed
/// inserts and for rename preconditions (`NotFound` / `DstExists`).
fn preflight_batch_op(
    &self,
    overlay: &mut BatchOverlay,
    op: &BatchOp,
    seq: u64,
) -> Result<bool> {
    let guard_holds = match op {
        BatchOp::Put { key, value } => {
            Self::validate_insert_shape(key, value)?;
            Self::overlay_put(overlay, key, value, seq);
            true
        }
        BatchOp::PutIfAbsent { key, value } => {
            Self::validate_insert_shape(key, value)?;
            let absent = self.projected_record(overlay, key)?.is_none();
            if absent {
                Self::overlay_put(overlay, key, value, seq);
            }
            absent
        }
        BatchOp::CompareAndPut {
            key,
            expected,
            value,
        } => {
            Self::validate_insert_shape(key, value)?;
            let matched = matches!(
                self.projected_record(overlay, key)?,
                Some(record) if record.version == *expected
            );
            if matched {
                Self::overlay_put(overlay, key, value, seq);
            }
            matched
        }
        BatchOp::Delete { key } => {
            overlay.insert(key.clone(), None);
            true
        }
        BatchOp::DeleteIfVersion { key, expected } => {
            let matched = matches!(
                self.projected_record(overlay, key)?,
                Some(record) if record.version == *expected
            );
            if matched {
                overlay.insert(key.clone(), None);
            }
            matched
        }
        BatchOp::AssertVersion { key, expected } => matches!(
            self.projected_record(overlay, key)?,
            Some(record) if record.version == *expected
        ),
        BatchOp::AssertPrefixEmpty { prefix } => self.projected_prefix_empty(overlay, prefix)?,
        BatchOp::Rename { src, dst, force } => {
            self.preflight_rename_op(overlay, src, dst, *force, seq)?;
            true
        }
    };
    Ok(guard_holds)
}
fn preflight_rename_op(
&self,
overlay: &mut BatchOverlay,
src: &[u8],
dst: &[u8],
force: bool,
seq: u64,
) -> Result<()> {
let Some(src_record) = self.projected_record(overlay, src)? else {
return Err(Error::NotFound);
};
if src == dst {
return Ok(());
}
if !force && self.projected_record(overlay, dst)?.is_some() {
return Err(Error::DstExists);
}
Self::validate_insert_shape(dst, &src_record.value)?;
overlay.insert(src.to_vec(), None);
overlay.insert(
dst.to_vec(),
Some(Record {
value: src_record.value,
version: RecordVersion::new(seq),
}),
);
Ok(())
}
/// Stages a write of `value` under `key` in `overlay`, versioned with `seq`.
fn overlay_put(overlay: &mut BatchOverlay, key: &[u8], value: &[u8], seq: u64) {
    let staged = Record {
        value: value.to_vec(),
        version: RecordVersion::new(seq),
    };
    overlay.insert(key.to_vec(), Some(staged));
}
/// `true` when the batch contains only plain `Put` and `Delete` ops, so no
/// guard can fail and no overlay is needed during preflight.
fn batch_is_guard_free(pending: &[BatchOp]) -> bool {
    !pending
        .iter()
        .any(|op| !matches!(op, BatchOp::Put { .. } | BatchOp::Delete { .. }))
}
/// Preflight for guard-free batches: only validates the shape of every `Put`.
fn preflight_guard_free_batch(pending: &[BatchOp]) -> Result<()> {
    let puts = pending.iter().filter_map(|op| {
        if let BatchOp::Put { key, value } = op {
            Some((key, value))
        } else {
            None
        }
    });
    for (key, value) in puts {
        Self::validate_insert_shape(key, value)?;
    }
    Ok(())
}
/// Returns the record `key` would hold after the staged batch ops. The overlay
/// wins (a staged `None` means "deleted"); otherwise the tree is consulted
/// without re-entering the maintenance gate.
fn projected_record(&self, overlay: &BatchOverlay, key: &[u8]) -> Result<Option<Record>> {
    if let Some(staged) = overlay.get(key) {
        return Ok(staged.clone());
    }
    self.lookup_record_unlocked(key)
}
/// Returns whether no key starting with `prefix` would exist after the staged
/// batch ops.
///
/// A staged live write under the prefix makes it non-empty. Tree keys under the
/// prefix count unless the overlay stages them as deleted. Any common-prefix
/// entry from the key scan is treated as non-empty. Scans with `next_unlocked`,
/// so the caller must already hold the maintenance gate.
fn projected_prefix_empty(&self, overlay: &BatchOverlay, prefix: &[u8]) -> Result<bool> {
    let staged_live = overlay
        .iter()
        .filter(|(_, record)| record.is_some())
        .any(|(key, _)| key.starts_with(prefix));
    if staged_live {
        return Ok(false);
    }
    let mut keys = self.scan_keys(prefix).into_iter();
    while let Some(step) = keys.next_unlocked() {
        match step? {
            KeyRangeEntry::Key { key, .. } => {
                let staged_delete = matches!(overlay.get(&key), Some(None));
                if !staged_delete {
                    return Ok(false);
                }
            }
            KeyRangeEntry::CommonPrefix(_) => return Ok(false),
        }
    }
    Ok(true)
}
fn validate_insert_shape(key: &[u8], value: &[u8]) -> Result<()> {
let key_len = key.len().saturating_add(1);
if key_len > u16::MAX as usize {
return Err(Error::KeyTooLong { len: key_len });
}
if value.len() > u16::MAX as usize {
return Err(Error::ValueTooLong { len: value.len() });
}
Ok(())
}
// Applies a preflighted batch to the tree, in order, optionally appending each
// op to the WAL batch encoder `enc`.
//
// Sequence numbers: every insert-like op and every op whose `emits_wal()` is true
// consumes one seq, starting at `base_seq`. Ops that do not emit reuse the seq the
// next emitting op will get, mirroring `preflight_batch`.
// NOTE(review): the insert-run path advances `seq_offset` by the whole run length,
// which assumes every insert-like op reports `emits_wal()`; otherwise it would
// disagree with the count reserved in `apply_batch`. Confirm.
//
// Consecutive insert-like ops with identical key/value lengths (see
// `same_shape_insert_run_len`) are applied together by
// `apply_batch_insert_run_walker` and encoded as one insert run.
// The lint allow covers the length of this single, order-sensitive dispatch loop.
#[allow(clippy::too_many_lines)] fn apply_batch_walker_inline(
&self,
pending: &[BatchOp],
base_seq: u64,
mut enc: Option<&mut crate::journal::codec::BatchEncoder<'_>>,
) -> Result<()> {
let mut seq_offset = 0u64;
let mut i = 0usize;
while i < pending.len() {
// Insert-like op: apply and encode the whole same-shape run starting here.
if batch_insert_parts(&pending[i]).is_some() {
let run_len = same_shape_insert_run_len(pending, i);
let first_seq = base_seq + seq_offset;
self.apply_batch_insert_run_walker(&pending[i..i + run_len], first_seq)?;
seq_offset += run_len as u64;
// `as_deref_mut` reborrows the encoder so it stays usable on later iterations.
if let Some(enc) = enc.as_deref_mut() {
let (key, value) = batch_insert_parts(&pending[i])
.expect("insert run begins with insert-like op");
enc.push_insert_run(
0,
run_len,
key.len(),
value.len(),
pending[i..i + run_len]
.iter()
.map(|op| batch_insert_parts(op).expect("same-shape insert run")),
);
}
i += run_len;
continue;
}
let op = &pending[i];
// Only WAL-emitting ops advance the offset; others see the next seq to be used.
let seq = if op.emits_wal() {
let seq = base_seq + seq_offset;
seq_offset += 1;
seq
} else {
base_seq + seq_offset
};
match op {
BatchOp::Put { .. }
| BatchOp::PutIfAbsent { .. }
| BatchOp::CompareAndPut { .. } => {
unreachable!("insert-like ops are handled by the run path");
}
BatchOp::Delete { key } => {
// A delete of an absent key is not an error; it is still encoded into the WAL.
let search = engine::SearchKey::user(key);
let outcome = engine::erase_multi(
&self.store,
&self.root_pin,
Some(&self.route_cache),
search,
seq,
false,
)?;
if outcome.mutated && outcome.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
if let Some(enc) = enc.as_deref_mut() {
enc.push_erase(0, key);
}
}
BatchOp::DeleteIfVersion { key, expected } => {
let search = engine::SearchKey::user(key);
let outcome = engine::erase_multi_conditional(
&self.store,
&self.root_pin,
Some(&self.route_cache),
search,
seq,
false,
engine::EraseCondition::IfVersion(expected.as_u64()),
)?;
// Preflight already verified this guard under the exclusive gate, so a miss
// here is an internal inconsistency rather than a normal guard failure.
if !outcome.mutated {
return Err(Error::Internal(
"atomic preflight missed delete_if_version guard",
));
}
if outcome.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
// Encoded as a plain erase: the version guard is not written to the WAL.
if let Some(enc) = enc.as_deref_mut() {
enc.push_erase(0, key);
}
}
// Assertions were fully checked during preflight; nothing to apply.
BatchOp::AssertVersion { .. } | BatchOp::AssertPrefixEmpty { .. } => {}
BatchOp::Rename { src, dst, force } => {
self.apply_batch_rename_walker(src, dst, *force, seq)?;
if let Some(enc) = enc.as_deref_mut() {
enc.push_rename_object(0, src, dst, *force);
}
}
}
i += 1;
}
Ok(())
}
/// Applies a run of insert-like ops via the engine's batched insert.
///
/// Op `i` of the run is applied with seq `first_seq + i` and its own condition
/// (`Always`, `IfAbsent` or `IfVersion`). The engine may apply only a prefix of
/// the remaining items per call, so this loops until all items are applied. If a
/// call applies nothing, it fails with an internal error. The root is marked
/// dirty with the seq of the first item of the call that dirtied it.
fn apply_batch_insert_run_walker(&self, ops: &[BatchOp], first_seq: u64) -> Result<()> {
    let items: Vec<_> = ops
        .iter()
        .enumerate()
        .map(|(idx, op)| {
            let (key, value, condition) = match op {
                BatchOp::Put { key, value } => (key, value, engine::InsertCondition::Always),
                BatchOp::PutIfAbsent { key, value } => {
                    (key, value, engine::InsertCondition::IfAbsent)
                }
                BatchOp::CompareAndPut {
                    key,
                    expected,
                    value,
                } => (
                    key,
                    value,
                    engine::InsertCondition::IfVersion(expected.as_u64()),
                ),
                BatchOp::Delete { .. }
                | BatchOp::DeleteIfVersion { .. }
                | BatchOp::AssertVersion { .. }
                | BatchOp::AssertPrefixEmpty { .. }
                | BatchOp::Rename { .. } => unreachable!("not an insert-like batch op"),
            };
            engine::InsertBatchItem::new(
                engine::SearchKey::user(key),
                value,
                first_seq + idx as u64,
                condition,
            )
        })
        .collect();

    let mut cursor = 0usize;
    while cursor < items.len() {
        let remaining = &items[cursor..];
        let outcome = engine::insert_multi_batch_conditional(
            &self.store,
            &self.root_pin,
            Some(&self.route_cache),
            remaining,
        )?;
        if outcome.applied == 0 {
            return Err(Error::Internal("insert batch walker made no progress"));
        }
        if outcome.root_dirty {
            self.store.mark_dirty_cached(
                self.root_guid,
                remaining[0].seq,
                self.root_pin.as_ref(),
            );
        }
        cursor += outcome.applied;
    }
    Ok(())
}
fn apply_batch_rename_walker(
&self,
src: &[u8],
dst: &[u8],
force: bool,
seq: u64,
) -> Result<()> {
let src_search = engine::SearchKey::user(src);
let dst_search = engine::SearchKey::user(dst);
let Some(value) =
engine::lookup_multi_with(&self.store, &self.root_pin, src_search, |hit| {
hit.value.to_vec()
})?
else {
return Err(Error::NotFound);
};
if src == dst {
return Ok(());
}
if !force
&& engine::lookup_multi_with(&self.store, &self.root_pin, dst_search, |_| ())?.is_some()
{
return Err(Error::DstExists);
}
let erase_out = engine::erase_multi(
&self.store,
&self.root_pin,
Some(&self.route_cache),
src_search,
seq,
false,
)?;
let insert_out = engine::insert_multi(
&self.store,
&self.root_pin,
Some(&self.route_cache),
dst_search,
&value,
seq,
false,
)?;
if erase_out.root_dirty || insert_out.root_dirty {
self.store
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
Ok(())
}
/// Starts a range query over the whole tree; narrow it with the builder's methods.
pub fn range(&self) -> RangeBuilder {
    let store = Arc::clone(&self.store);
    let gate = Arc::clone(&self.maintenance_gate);
    RangeBuilder::new(store, self.root_guid, gate)
}
/// Range query restricted to keys starting with `prefix`.
pub fn scan_prefix(&self, prefix: &[u8]) -> RangeBuilder {
    let builder = self.range();
    builder.prefix(prefix)
}
/// Key-only variant of [`Tree::range`].
pub fn range_keys(&self) -> KeyRangeBuilder {
    let values = self.range();
    KeyRangeBuilder::new(values)
}
/// Key-only scan restricted to keys starting with `prefix`.
pub fn scan_keys(&self, prefix: &[u8]) -> KeyRangeBuilder {
    let builder = self.range_keys();
    builder.prefix(prefix)
}
pub fn is_prefix_empty(&self, prefix: &[u8]) -> Result<bool> {
let _maintenance = self.maintenance_gate.enter_shared();
self.projected_prefix_empty(&std::collections::HashMap::new(), prefix)
}
fn flush_dirty_inline(&self) -> Result<()> {
let snap = self.store.snapshot_dirty();
let mut failed: std::collections::HashMap<BlobGuid, u64> = std::collections::HashMap::new();
let mut first_err: Option<Error> = None;
let mut entries = Vec::with_capacity(snap.len());
for (guid, expected_seq) in snap {
if let Some(bytes) = self.store.snapshot_bytes(guid) {
entries.push(WriteThroughEntry {
guid,
bytes,
expected_seq,
});
} else {
failed.insert(guid, expected_seq);
first_err.get_or_insert(Error::Internal(
"flush_dirty_inline: dirty entry lost cache image — invariant I1 violated",
));
}
}
if !entries.is_empty() {
let expected: Vec<_> = entries
.iter()
.map(|entry| (entry.guid, entry.expected_seq))
.collect();
if let Err(e) = self.store.write_through_batch(&entries) {
for (guid, expected_seq) in expected {
failed.insert(guid, expected_seq);
}
if first_err.is_none() {
first_err = Some(e);
}
}
}
if !failed.is_empty() {
self.store.restore_dirty(failed);
}
if let Some(e) = first_err {
return Err(e);
}
Ok(())
}
fn flush_pending_deletes_inline(&self) -> Result<()> {
let pending = self.store.snapshot_pending_deletes();
let mut failed: std::collections::HashMap<BlobGuid, u64> = std::collections::HashMap::new();
let mut first_err: Option<Error> = None;
for (guid, seq) in pending {
if let Err(e) = self.store.execute_pending_delete(guid) {
failed.insert(guid, seq);
if first_err.is_none() {
first_err = Some(e);
}
}
}
if !failed.is_empty() {
self.store.restore_pending_deletes(failed);
}
if let Some(e) = first_err {
return Err(e);
}
Ok(())
}
#[allow(clippy::too_many_lines)]
pub fn checkpoint(&self) -> Result<()> {
use std::collections::HashMap;
let _maintenance = self.maintenance_gate.enter_shared();
let (_snap_dirty, snap_pending, snap_bytes) = if let Some(journal) = &self.journal {
let _commit = self.commit_gate.enter_checkpoint();
let snap_dirty = self.store.snapshot_dirty();
let snap_pending = self.store.snapshot_pending_deletes();
if let Err(e) = journal.flush() {
self.store.restore_pending_deletes(snap_pending);
self.store.restore_dirty(snap_dirty);
return Err(e);
}
let mut snap_bytes = Vec::with_capacity(snap_dirty.len());
for (guid, expected_seq) in &snap_dirty {
let Some(bytes) = self.store.snapshot_bytes(*guid) else {
self.store.restore_pending_deletes(snap_pending);
self.store.restore_dirty(snap_dirty);
return Err(Error::Internal(
"checkpoint: dirty entry lost cache image — invariant I1 violated",
));
};
snap_bytes.push((*guid, *expected_seq, bytes));
}
(snap_dirty, snap_pending, snap_bytes)
} else {
let snap_dirty = self.store.snapshot_dirty();
let snap_pending = self.store.snapshot_pending_deletes();
let mut snap_bytes = Vec::with_capacity(snap_dirty.len());
for (guid, expected_seq) in &snap_dirty {
let Some(bytes) = self.store.snapshot_bytes(*guid) else {
self.store.restore_pending_deletes(snap_pending);
self.store.restore_dirty(snap_dirty);
return Err(Error::Internal(
"checkpoint: dirty entry lost cache image — invariant I1 violated",
));
};
snap_bytes.push((*guid, *expected_seq, bytes));
}
(snap_dirty, snap_pending, snap_bytes)
};
let mut dirty_failed: HashMap<BlobGuid, u64> = HashMap::new();
let mut first_dirty_err: Option<Error> = None;
let entries: Vec<_> = snap_bytes
.into_iter()
.map(|(guid, expected_seq, bytes)| WriteThroughEntry {
guid,
bytes,
expected_seq,
})
.collect();
if !entries.is_empty() {
let expected: Vec<_> = entries
.iter()
.map(|entry| (entry.guid, entry.expected_seq))
.collect();
if let Err(e) = self.store.write_through_batch(&entries) {
for (guid, expected_seq) in expected {
dirty_failed.insert(guid, expected_seq);
}
if first_dirty_err.is_none() {
first_dirty_err = Some(e);
}
}
}
let had_dirty_failure = !dirty_failed.is_empty();
if had_dirty_failure {
self.store.restore_dirty(dirty_failed);
}
if entries.is_empty() && snap_pending.is_empty() && !self.store.needs_flush() {
if let Some(journal) = &self.journal {
if journal.needs_checkpoint() {
let _commit = self.commit_gate.enter_checkpoint();
if self.store.dirty_count() == 0 && self.store.pending_delete_count() == 0 {
journal.truncate()?;
}
}
}
return Ok(());
}
if let Err(e) = self.store.flush() {
self.store.restore_pending_deletes(snap_pending);
return Err(e);
}
if had_dirty_failure {
self.store.restore_pending_deletes(snap_pending);
return Err(first_dirty_err.expect("had_dirty_failure ⇒ first_dirty_err set"));
}
let mut pending_failed: HashMap<BlobGuid, u64> = HashMap::new();
let mut first_pending_err: Option<Error> = None;
for (guid, seq) in &snap_pending {
if let Err(e) = self.store.execute_pending_delete(*guid) {
pending_failed.insert(*guid, *seq);
if first_pending_err.is_none() {
first_pending_err = Some(e);
}
}
}
if !pending_failed.is_empty() {
self.store.restore_pending_deletes(pending_failed.clone());
}
let applied_deletes = snap_pending.len() - pending_failed.len();
if applied_deletes > 0 {
if let Err(e) = self.store.flush() {
let restore_applied: HashMap<BlobGuid, u64> = snap_pending
.iter()
.filter(|(g, _)| !pending_failed.contains_key(*g))
.map(|(g, s)| (*g, *s))
.collect();
self.store.restore_pending_deletes(restore_applied);
return Err(e);
}
}
if let Some(e) = first_pending_err {
return Err(e);
}
if let Some(journal) = &self.journal {
let _commit = self.commit_gate.enter_checkpoint();
if self.store.dirty_count() == 0 && self.store.pending_delete_count() == 0 {
journal.truncate()?;
}
}
Ok(())
}
pub fn stats(&self) -> Result<TreeStats> {
let _maintenance = self.maintenance_gate.enter_shared();
let aggregate = self.collect_blob_stats_silent()?;
let bm_dirty_count = self.store.dirty_count();
let bm_pending_delete_count = self.store.pending_delete_count();
let bm_cache_hits = self.store.cache_hits();
let bm_cache_misses = self.store.cache_misses();
let bm_optimistic_restarts = self.store.optimistic_restarts();
let bm_range_restarts = self.store.range_restarts();
let bm_walker_ops = self.store.walker_ops();
let bm_walker_blob_hops = self.store.walker_blob_hops();
let bm_max_blob_hops = self.store.max_blob_hops();
let bm_max_cross_blob_depth = self.store.max_cross_blob_depth();
let bm_spillovers = self.store.spillover_count();
let bm_merges = self.store.merge_count();
let route = self.route_cache.stats();
let route_cache = RouteCacheStats {
entries: route.entries,
hits: route.hits,
misses: route.misses,
learns: route.learns,
evictions: route.evictions,
invalidations: route.invalidations,
};
let journal = self.journal.as_ref().map(|j| {
let s = j.stats();
JournalStats {
appends: s.appends,
batches: s.batches,
syncs: s.syncs,
}
});
let checkpointer = self.checkpointer.as_ref().map(|ck| CheckpointerStats {
rounds_attempted: ck.rounds_attempted(),
rounds_succeeded: ck.rounds_succeeded(),
blobs_flushed: ck.blobs_flushed(),
merges_total: ck.merges_total(),
truncates: ck.truncates(),
evictions: ck.evictions(),
});
Ok(TreeStats {
blob_count: aggregate.blobs.len() as u32,
total_space_used: aggregate.total_space_used,
total_gap_space: aggregate.total_gap_space,
total_slots: aggregate.total_slots,
total_compactions: aggregate.total_compactions,
total_tombstones: aggregate.total_tombstones,
total_blob_edges: aggregate.total_blob_edges,
leaf_blob_count: aggregate.leaf_blob_count,
max_blob_depth: aggregate.max_blob_depth,
total_blob_depth: aggregate.total_blob_depth,
max_blob_fill_per_mille: aggregate.max_blob_fill_per_mille,
blobs: aggregate.blobs,
bm_dirty_count,
bm_pending_delete_count,
bm_cache_hits,
bm_cache_misses,
bm_optimistic_restarts,
bm_range_restarts,
bm_walker_ops,
bm_walker_blob_hops,
bm_max_blob_hops,
bm_max_cross_blob_depth,
bm_spillovers,
bm_merges,
route_cache,
journal,
checkpointer,
})
}
/// Walks the blob topology (via the "silent" engine/pin variants) and folds
/// each blob's header counters into a [`BlobStatsAggregate`].
fn collect_blob_stats_silent(&self) -> Result<BlobStatsAggregate> {
    let topology = engine::collect_blob_topology_silent(&self.store, self.root_guid)?;
    let data_capacity = (PAGE_SIZE - DATA_AREA_START) as u64;
    let empty = BlobStatsAggregate {
        blobs: Vec::with_capacity(topology.len()),
        total_space_used: 0,
        total_gap_space: 0,
        total_slots: 0,
        total_compactions: 0,
        total_tombstones: 0,
        total_blob_edges: 0,
        leaf_blob_count: 0,
        max_blob_depth: 0,
        total_blob_depth: 0,
        max_blob_fill_per_mille: 0,
    };
    topology
        .iter()
        .try_fold(empty, |mut aggregate, entry| -> Result<BlobStatsAggregate> {
            let pinned = self.store.pin_silent(entry.guid)?;
            let read_guard = pinned.read();
            let frame = BlobFrameRef::wrap(read_guard.as_slice());
            let header = frame.header();
            let blob = BlobStats {
                guid: entry.guid,
                space_used: header.space_used,
                gap_space: header.gap_space,
                num_slots: header.num_slots,
                num_ext_blobs: header.num_ext_blobs,
                compact_times: header.compact_times,
                tombstone_leaf_cnt: header.tombstone_leaf_cnt,
            };
            Self::accumulate_blob_stats(&mut aggregate, blob, entry.depth, data_capacity);
            Ok(aggregate)
        })
}
/// Folds one blob's statistics into the running aggregate.
///
/// Adds the blob's counters to the totals. A blob with no external child blobs is counted
/// as a leaf. The maximum depth and maximum fill (per mille of `blob_data_capacity`) are
/// updated, and finally `stats` itself is appended to `aggregate.blobs`.
fn accumulate_blob_stats(
    aggregate: &mut BlobStatsAggregate,
    stats: BlobStats,
    depth: u32,
    blob_data_capacity: u64,
) {
    let fill = blob_fill_per_mille(stats.space_used, blob_data_capacity);
    let is_leaf = stats.num_ext_blobs == 0;

    aggregate.total_space_used += u64::from(stats.space_used);
    aggregate.total_gap_space += u64::from(stats.gap_space);
    aggregate.total_slots += u64::from(stats.num_slots);
    aggregate.total_compactions += u64::from(stats.compact_times);
    aggregate.total_tombstones += u64::from(stats.tombstone_leaf_cnt);
    aggregate.total_blob_edges += u64::from(stats.num_ext_blobs);
    aggregate.leaf_blob_count += u32::from(is_leaf);

    aggregate.total_blob_depth += u64::from(depth);
    aggregate.max_blob_depth = depth.max(aggregate.max_blob_depth);
    aggregate.max_blob_fill_per_mille = fill.max(aggregate.max_blob_fill_per_mille);

    aggregate.blobs.push(stats);
}
/// Runs one bounded round of online maintenance: blob compaction first, then child-blob
/// merging.
///
/// When both candidate queues are empty, the tree is first scanned to seed them
/// (`seed_maintenance_candidates`). Then up to `ONLINE_COMPACT_BLOB_BUDGET` compaction
/// candidates and up to `ONLINE_MERGE_PARENT_BUDGET` merge candidates are popped and
/// processed.
///
/// If processing a candidate fails, the candidates popped in the same batch but not yet
/// attempted are put back on their queue before the error is returned. Otherwise they would
/// be lost until the next full reseed, which only happens once both queues drain. The
/// candidate that failed is not re-queued.
///
/// # Errors
/// Returns the first error from seeding, compacting a blob, or merging a parent's children.
pub fn compact(&self) -> Result<()> {
    if self.store.compaction_candidate_count() == 0 && self.store.merge_candidate_count() == 0 {
        self.seed_maintenance_candidates()?;
    }

    let mut compact_guids = self
        .store
        .pop_compaction_candidates(ONLINE_COMPACT_BLOB_BUDGET)
        .into_iter();
    while let Some(guid) = compact_guids.next() {
        if let Err(err) = self.compact_candidate_blob(guid) {
            // Hand the untouched rest of this batch back to the queue.
            for unattempted in compact_guids {
                self.store.note_compaction_candidate(unattempted);
            }
            return Err(err);
        }
    }

    let mut merge_guids = self
        .store
        .pop_merge_candidates(ONLINE_MERGE_PARENT_BUDGET)
        .into_iter();
    while let Some(guid) = merge_guids.next() {
        if let Err(err) = self.merge_candidate_parent(guid) {
            for unattempted in merge_guids {
                self.store.note_merge_candidate(unattempted);
            }
            return Err(err);
        }
    }
    Ok(())
}
/// Scans every blob returned by `engine::collect_blob_guids` for this tree and queues it
/// as a maintenance candidate.
///
/// A blob is queued for compaction when `engine::blob_needs_compaction` says so. It is
/// queued as a merge candidate when its header reports at least one external child blob.
/// The shared maintenance gate is held for the whole scan.
///
/// # Errors
/// Propagates failures from collecting blob guids or pinning a blob.
fn seed_maintenance_candidates(&self) -> Result<()> {
    let _gate = self.maintenance_gate.enter_shared();
    for guid in engine::collect_blob_guids(&self.store, self.root_guid)? {
        let pinned = self.store.pin(guid)?;
        let read_guard = pinned.read();
        let frame = BlobFrameRef::wrap(read_guard.as_slice());
        let has_child_blobs = frame.header().num_ext_blobs != 0;
        if engine::blob_needs_compaction(frame) {
            self.store.note_compaction_candidate(guid);
        }
        if has_child_blobs {
            self.store.note_merge_candidate(guid);
        }
    }
    Ok(())
}
/// Compacts blob `guid` in place if it still needs compaction.
///
/// Holds the shared side of the maintenance gate for the whole call, so it does not run
/// concurrently with holders of the exclusive side (such as `merge_candidate_parent`).
/// Whether the blob needs compaction is checked under a read lock and then re-checked under
/// the write lock, because the read lock is released in between. A blob that no longer
/// exists, or no longer needs compaction, is left untouched.
///
/// # Errors
/// Propagates errors from `has_blob`, `pin`, and `engine::compact_blob`.
fn compact_candidate_blob(&self, guid: BlobGuid) -> Result<()> {
use crate::store::buffer_manager::STRUCTURAL_SEQ;
let _maintenance = self.maintenance_gate.enter_shared();
// Skip candidates whose blob no longer exists.
if !self.store.has_blob(guid)? {
return Ok(());
}
let pin = self.store.pin(guid)?;
// First check under a read lock; the read guard is dropped at the end of this block.
let needs_compaction = {
let guard = pin.read();
engine::blob_needs_compaction(BlobFrameRef::wrap(guard.as_slice()))
};
if !needs_compaction {
return Ok(());
}
// Only when a journal is configured: enter the commit gate as writer before taking the
// blob's write lock. Without a journal, `_commit` is `None` and no gate is entered.
let _commit = self
.journal
.as_ref()
.map(|_| self.commit_gate.enter_writer());
let compacted = {
let mut guard = pin.write();
// Re-check under the write lock: the blob may have changed after the read guard was released.
let still_needs_compaction = {
let frame = guard.frame();
engine::blob_needs_compaction(frame.as_ref())
};
if still_needs_compaction {
engine::compact_blob(&mut guard)?;
}
still_needs_compaction
};
// The compaction is marked dirty under `STRUCTURAL_SEQ`, not a WAL record sequence.
if compacted {
self.store.mark_dirty(guid, STRUCTURAL_SEQ);
}
// Release the pin explicitly, before `_commit` and `_maintenance` are dropped at return.
drop(pin);
Ok(())
}
/// Tries to merge child blobs of parent blob `guid` back into it.
///
/// Holds the exclusive side of the maintenance gate for the whole call. A parent that no
/// longer exists is skipped. When at least one child was merged, the parent is marked dirty,
/// the merge count is recorded, and the parent is re-queued as a merge candidate if it still
/// has external child blobs. A parent where nothing merged is not re-queued.
///
/// # Errors
/// Propagates errors from `has_blob`, `pin`, and `engine::try_merge_children`.
fn merge_candidate_parent(&self, guid: BlobGuid) -> Result<()> {
use crate::store::buffer_manager::STRUCTURAL_SEQ;
let _maintenance = self.maintenance_gate.enter_exclusive();
// Skip candidates whose blob no longer exists.
if !self.store.has_blob(guid)? {
return Ok(());
}
// Only when a journal is configured: enter the commit gate as writer. Note that here the
// gate is entered before pinning, unlike in `compact_candidate_blob`.
let _commit = self
.journal
.as_ref()
.map(|_| self.commit_gate.enter_writer());
let pin = self.store.pin(guid)?;
// Merge under the parent's write lock and read the remaining child-blob count afterwards.
let (merged, has_children) = {
let mut guard = pin.write();
let mut frame = guard.frame();
let merged = engine::try_merge_children(&self.store, &mut frame, STRUCTURAL_SEQ)?;
(merged, frame.header().num_ext_blobs != 0)
};
if merged.merged > 0 {
self.store.mark_dirty(guid, STRUCTURAL_SEQ);
self.store.note_merges(u64::from(merged.merged));
// Progress was made but children remain: queue the parent for another round.
if has_children {
self.store.note_merge_candidate(guid);
}
}
// Release the pin explicitly, before `_commit` and `_maintenance` are dropped at return.
drop(pin);
Ok(())
}
#[must_use]
pub fn config(&self) -> &TreeConfig {
&self.cfg
}
/// Returns the crate-wide blob page size, `PAGE_SIZE`.
///
/// This is a `const fn`, so it can be used in constant contexts.
#[must_use]
pub const fn page_size() -> u32 {
    PAGE_SIZE
}
}
/// Re-applies the write-ahead log at `path` to the tree rooted at `root_guid`.
///
/// Every record handed over by `replay` is applied with the sequence number it was logged
/// under. When an operation reports that it changed the root blob, the root is marked dirty
/// at that sequence.
///
/// Returns the sequence number to use for the next record: one more than the highest
/// sequence `replay` reported, or `1` if it reported none.
///
/// # Errors
/// Propagates errors from pinning the root blob, from `replay` itself, and from the engine
/// operations applied for each record.
///
/// # Panics
/// Panics if the highest replayed sequence is `u64::MAX`, since no next sequence exists.
/// Plain `+ 1` would wrap to 0 in release builds and silently reuse sequence numbers.
fn replay_wal(path: &std::path::Path, bm: &Arc<BufferManager>, root_guid: BlobGuid) -> Result<u64> {
    let root_pin = bm.pin(root_guid)?;
    let (_header, stats) = replay(path, |op, seq, _off| {
        let root_dirty = match op {
            WalOp::Insert { key, value } => {
                let search = engine::SearchKey::user(key);
                engine::insert_multi(bm, &root_pin, None, search, value, seq, false)?.root_dirty
            }
            WalOp::Erase { key } => {
                let search = engine::SearchKey::user(key);
                engine::erase_multi(bm, &root_pin, None, search, seq, false)?.root_dirty
            }
            WalOp::RenameObject {
                src_key,
                dst_key,
                force,
                ..
            } => {
                let src_search = engine::SearchKey::user(src_key);
                let dst_search = engine::SearchKey::user(dst_key);
                // A single lookup both checks that the source exists and copies its value.
                // A missing source makes the rename a no-op.
                let value = match engine::lookup_multi_with(bm, &root_pin, src_search, |hit| {
                    hit.value.to_vec()
                })? {
                    Some(value) => value,
                    None => return Ok(()),
                };
                // Without `force`, an existing destination also makes the rename a no-op.
                if !force && engine::lookup_multi_with(bm, &root_pin, dst_search, |_| ())?.is_some()
                {
                    return Ok(());
                }
                let erase_out = engine::erase_multi(bm, &root_pin, None, src_search, seq, false)?;
                let insert_out =
                    engine::insert_multi(bm, &root_pin, None, dst_search, &value, seq, false)?;
                erase_out.root_dirty || insert_out.root_dirty
            }
            // NOTE(review): `Batch` records are skipped entirely during replay. That is only
            // correct if batched operations also reach this callback as individual records
            // (or batches are never delivered here). Confirm against the journal reader,
            // otherwise committed batch writes are lost on recovery.
            WalOp::Batch { ops: _ } => false,
        };
        if root_dirty {
            bm.mark_dirty(root_guid, seq);
        }
        Ok(())
    })?;
    // This also covers a torn tail: with no complete record, no highest seq may be reported.
    debug_assert!(
        stats.records_seen > 0 || stats.highest_seq.is_none(),
        "an empty WAL replay cannot report a highest seq",
    );
    Ok(stats.highest_seq.map_or(1, |highest| {
        highest
            .checked_add(1)
            .expect("WAL sequence numbers exhausted: highest replayed seq is u64::MAX")
    }))
}
#[cfg(test)]
mod tests {
    use crate::TreeBuilder;
    use std::sync::mpsc::{sync_channel, RecvTimeoutError};
    use std::thread;
    use std::time::Duration;

    /// Re-raises the panic of a worker thread that exited without signalling completion.
    ///
    /// A `Disconnected` receive means the worker dropped its sender. Joining surfaces the
    /// worker's own panic message instead of an opaque channel error.
    fn resume_worker_panic(handle: thread::JoinHandle<()>) -> ! {
        match handle.join() {
            Err(payload) => std::panic::resume_unwind(payload),
            Ok(()) => panic!("compact worker exited without signalling completion"),
        }
    }

    #[test]
    fn strict_prefix_point_keys_round_trip_through_public_api() {
        let tree = TreeBuilder::new("ignored").memory().open().unwrap();
        tree.put(b"abc", b"short").unwrap();
        tree.put(b"abcdef", b"long").unwrap();
        assert_eq!(tree.get(b"abc").unwrap().as_deref(), Some(&b"short"[..]));
        assert_eq!(tree.get(b"abcdef").unwrap().as_deref(), Some(&b"long"[..]));
        assert!(tree.delete(b"abc").unwrap());
        assert_eq!(tree.get(b"abc").unwrap(), None);
        assert_eq!(tree.get(b"abcdef").unwrap().as_deref(), Some(&b"long"[..]));
    }

    #[test]
    fn compact_waits_for_maintenance_read_guard() {
        let tree = TreeBuilder::new("ignored")
            .memory()
            .buffer_pool_size(16)
            .open()
            .unwrap();
        let big = vec![0xCDu8; 4 * 1024];
        for i in 0..256u32 {
            tree.put(format!("k{i:08}").as_bytes(), &big).unwrap();
        }
        for i in 0..248u32 {
            tree.delete(format!("k{i:08}").as_bytes()).unwrap();
        }
        assert!(
            tree.stats().unwrap().blob_count > 1,
            "test precondition: compact must have a BlobNode merge phase"
        );
        let read_guard = tree.maintenance_gate.enter_shared();
        let worker_tree = tree.clone();
        let (started_tx, started_rx) = sync_channel(0);
        let (done_tx, done_rx) = sync_channel(0);
        let handle = thread::spawn(move || {
            started_tx.send(()).unwrap();
            worker_tree.compact().unwrap();
            done_tx.send(()).unwrap();
        });
        started_rx.recv().unwrap();
        // Only `Timeout` proves compact is blocked. `Disconnected` means the worker already
        // died, which must not be mistaken for "still waiting".
        match done_rx.recv_timeout(Duration::from_millis(50)) {
            Err(RecvTimeoutError::Timeout) => {}
            Ok(()) => panic!("compact must wait behind active shared maintenance readers"),
            Err(RecvTimeoutError::Disconnected) => resume_worker_panic(handle),
        }
        drop(read_guard);
        match done_rx.recv_timeout(Duration::from_secs(1)) {
            Ok(()) => handle.join().unwrap(),
            Err(RecvTimeoutError::Disconnected) => resume_worker_panic(handle),
            Err(RecvTimeoutError::Timeout) => {
                panic!("compact did not finish after the maintenance read guard was released")
            }
        }
    }
}