use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use super::config::{Storage, TreeConfig};
use super::errors::{Error, Result};
use super::stats::{BlobStats, CheckpointerStats, JournalStats, TreeStats};
use crate::concurrency::{CommitGate, MaintenanceGate};
use crate::engine;
use crate::engine::RangeBuilder;
use crate::journal::codec::{
encode_erase_record, encode_insert_record, encode_rename_object_record,
encoded_erase_record_len, encoded_insert_record_len, encoded_rename_object_record_len,
BatchEncoder, RECORD_FOOTER_SIZE, RECORD_HEADER_SIZE,
};
use crate::journal::group_commit::Journal;
use crate::journal::reader::replay;
use crate::journal::txn_op::TxnOp;
use crate::layout::{BlobGuid, PAGE_SIZE};
use crate::store::backend::{Backend, MemoryBackend, PersistentBackend};
use crate::store::buffer_manager::WriteThroughEntry;
use crate::store::{BlobFrame, BlobFrameRef, BufferManager, CachedBlob};
use super::txn::{BatchOp, TxnBatch};
/// Budget passed to `pop_compaction_candidates` by `Tree::compact`; presumably caps how many
/// blobs a single `compact` call will compact — TODO confirm against `BufferManager`.
const ONLINE_COMPACT_BLOB_BUDGET: usize = 256;
/// Budget passed to `pop_merge_candidates` by `Tree::compact`; presumably caps how many
/// parent blobs a single `compact` call will try to merge — TODO confirm.
const ONLINE_MERGE_PARENT_BUDGET: usize = 256;
/// Handle to a key/value tree stored in blobs behind a [`BufferManager`].
///
/// Every field except `cfg` and `root_guid` is an `Arc` (or an `Option` of one), so a
/// cloned `Tree` shares the buffer manager, gates, sequence counter, journal and
/// checkpointer with the handle it was cloned from.
#[derive(Clone)]
pub struct Tree {
/// Configuration supplied at open time; exposed through `config()`.
cfg: TreeConfig,
/// Buffer manager wrapping the storage backend chosen in `open` / `open_with_backend`.
backend: Arc<BufferManager>,
/// GUID of the root blob; `open_inner` always sets this to `ROOT_BLOB_GUID`.
root_guid: BlobGuid,
/// Pin on the root blob taken once in `open_inner` and passed to every engine call.
root_pin: Arc<CachedBlob>,
/// Mutex held by `rename` and `apply_batch` for the duration of the operation.
rename_lock: Arc<Mutex<()>>,
/// Route cache handed to the engine's insert/erase calls on the live write path.
route_cache: Arc<engine::RouteCache>,
/// Gate entered in shared mode by reads, writes and checkpoints, and in exclusive mode
/// by `merge_candidate_parent`.
maintenance_gate: Arc<MaintenanceGate>,
/// Next sequence number to hand out; starts at 1 or at the value returned by WAL replay.
next_seq: Arc<AtomicU64>,
/// Gate entered as "writer" by mutations when a journal is attached and as "checkpoint"
/// by `checkpoint`.
commit_gate: Arc<CommitGate>,
/// Write-ahead journal; `None` when opened via `open_with_backend` or when
/// `cfg.wal_path()` yields no path.
journal: Option<Arc<Journal>>,
/// Checkpointer returned by `Checkpointer::spawn`, if it produced one.
checkpointer: Option<Arc<crate::checkpoint::Checkpointer>>,
}
/// `Debug` prints only the storage setting and the root GUID; all other fields are
/// elided via `finish_non_exhaustive`.
impl std::fmt::Debug for Tree {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Tree");
        out.field("storage", &self.cfg.storage);
        out.field("root_guid", &self.root_guid);
        out.finish_non_exhaustive()
    }
}
/// GUID of the root blob: sixteen zero bytes. `Tree::open_inner` initialises a blob under
/// this GUID when the backend does not already contain one.
pub(crate) const ROOT_BLOB_GUID: BlobGuid = [0; 16];
/// Computes a byte-length estimate for the journal record that will hold `ops`.
///
/// `apply_batch` uses the result only as the initial capacity of the record buffer.
/// The per-op constants (`1`, `8`, `4`) presumably mirror the tag / sequence / length
/// prefixes written by `BatchEncoder` — TODO confirm against the codec.
fn encoded_batch_record_len(ops: &[BatchOp]) -> usize {
    const BODY_PREFIX_LEN: usize = 8 + 4;

    let mut ops_len = 0usize;
    for op in ops {
        ops_len += match op {
            BatchOp::Put { key, value } => 1 + 8 + 4 + key.len() + 4 + value.len(),
            BatchOp::Delete { key } => 1 + 8 + 4 + key.len(),
            BatchOp::Rename { src, dst, .. } => 1 + 8 + 4 + src.len() + 4 + dst.len() + 1,
        };
    }

    RECORD_HEADER_SIZE + BODY_PREFIX_LEN + ops_len + RECORD_FOOTER_SIZE
}
impl Tree {
/// Opens a tree on the storage selected by `cfg.storage`.
///
/// `Storage::Memory` uses a fresh [`MemoryBackend`]. `Storage::Persistent` opens a
/// [`PersistentBackend`] in `dir`; on Linux builds with the `io-uring` feature the
/// buffer-pool size is forwarded as a hint. The tree is then built by `open_inner`
/// with WAL attachment enabled.
///
/// # Errors
/// Propagates any error from opening the persistent backend or from `open_inner`.
pub fn open(cfg: TreeConfig) -> Result<Self> {
    let backend: Arc<dyn Backend> = match &cfg.storage {
        Storage::Memory => Arc::new(MemoryBackend::new()),
        Storage::Persistent { dir } => {
            #[cfg(all(target_os = "linux", feature = "io-uring"))]
            let persistent =
                PersistentBackend::open_with_buffer_pool_hint(dir, cfg.buffer_pool_size)?;
            #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
            let persistent = PersistentBackend::open(dir)?;
            Arc::new(persistent)
        }
    };
    Self::open_inner(cfg, backend, true)
}
/// Opens a tree on a caller-supplied backend.
///
/// No journal is attached on this path (`open_inner` is called with `attach_wal` set to
/// `false`), so no WAL replay happens and sequence numbers start at 1.
///
/// # Errors
/// Propagates any error from `open_inner`.
pub fn open_with_backend(cfg: TreeConfig, backend: Arc<dyn Backend>) -> Result<Self> {
    const ATTACH_WAL: bool = false;
    Self::open_inner(cfg, backend, ATTACH_WAL)
}
/// Shared constructor behind `open` and `open_with_backend`.
///
/// Steps, in order:
/// 1. wrap `backend` in a [`BufferManager`];
/// 2. create, write and flush an initialised root blob if the backend has none;
/// 3. when `attach_wal` is set and `cfg.wal_path()` yields a path, replay an existing
///    WAL file (which also determines the first sequence number) and open the journal;
/// 4. pin the root blob, build the gates and spawn the checkpointer.
///
/// # Errors
/// Propagates errors from the buffer manager, blob initialisation, WAL replay and
/// journal opening.
fn open_inner(cfg: TreeConfig, backend: Arc<dyn Backend>, attach_wal: bool) -> Result<Self> {
    let bm = Arc::new(BufferManager::new(backend, cfg.buffer_pool_size));
    let root_guid = ROOT_BLOB_GUID;

    // Bootstrap an empty root blob on a fresh backend.
    let root_exists = bm.has_blob(root_guid)?;
    if !root_exists {
        let mut scratch = bm.alloc_blob_buf_zeroed();
        BlobFrame::init(scratch.as_mut_slice(), root_guid)?;
        bm.write_blob(root_guid, &scratch)?;
        bm.flush()?;
    }

    // The WAL path is consulted only when the caller asked for a journal.
    let wal_path = if attach_wal { cfg.wal_path() } else { None };
    let (journal, first_seq) = match wal_path {
        Some(path) => {
            // Replay must happen before the journal is opened for appending.
            let resume_seq = if path.exists() {
                replay_wal(&path, &bm, root_guid)?
            } else {
                1
            };
            let opened = Journal::open_or_create(&path, 0)?;
            (Some(Arc::new(opened)), resume_seq)
        }
        None => (None, 1u64),
    };

    let root_pin = bm.pin(root_guid)?;
    let maintenance_gate = Arc::new(MaintenanceGate::new());
    let commit_gate = Arc::new(CommitGate::new());
    let checkpointer = crate::checkpoint::Checkpointer::spawn(
        Arc::clone(&bm),
        journal.clone(),
        Arc::clone(&maintenance_gate),
        Arc::clone(&commit_gate),
        cfg.checkpoint.clone(),
    )
    .map(Arc::new);

    Ok(Self {
        cfg,
        backend: bm,
        root_guid,
        root_pin,
        rename_lock: Arc::new(Mutex::new(())),
        route_cache: Arc::new(engine::RouteCache::new()),
        maintenance_gate,
        next_seq: Arc::new(AtomicU64::new(first_seq)),
        commit_gate,
        journal,
        checkpointer,
    })
}
/// Looks up `key` and returns an owned copy of its value, or `None` when absent.
///
/// The maintenance gate is held in shared mode for the duration of the lookup.
///
/// # Errors
/// Propagates any error from the engine lookup.
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
    let _maintenance = self.maintenance_gate.enter_shared();
    engine::lookup_multi_with(
        &self.backend,
        &self.root_pin,
        engine::SearchKey::user(key),
        <[u8]>::to_vec,
    )
}
/// Stores `value` under `key`, discarding whatever `put_inner` reports as the previous
/// value.
///
/// # Errors
/// Propagates any error from `put_inner`.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
    self.put_inner(key, value, false)?;
    Ok(())
}
/// Stores `value` under `key` and returns the `previous` field of the engine's insert
/// outcome (requested by passing `wants_prev = true` to `put_inner`).
///
/// # Errors
/// Propagates any error from `put_inner`.
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
    const WANTS_PREVIOUS: bool = true;
    self.put_inner(key, value, WANTS_PREVIOUS)
}
/// Shared implementation of `put` and `insert`.
///
/// Allocates one sequence number, applies the insert through the engine and marks the
/// root blob dirty when the engine reports `root_dirty`. With a journal attached the
/// mutation runs under the commit gate's writer guard and an insert record is submitted;
/// the guard is released before waiting on the journal acknowledgement. Without a
/// journal, dirty blobs and pending deletes are flushed inline when
/// `cfg.memory_flush_on_write` is set.
///
/// # Errors
/// Propagates errors from the engine, the journal submit/ack and the inline flushes.
fn put_inner(&self, key: &[u8], value: &[u8], wants_prev: bool) -> Result<Option<Vec<u8>>> {
    let _maintenance = self.maintenance_gate.enter_shared();
    let search = engine::SearchKey::user(key);
    let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);

    let (outcome, journal_ack) = {
        // The writer guard is taken only when a journal exists and is dropped at the end
        // of this block, i.e. before the acknowledgement wait below.
        let _commit = self
            .journal
            .as_ref()
            .map(|_| self.commit_gate.enter_writer());

        let outcome = engine::insert_multi(
            &self.backend,
            &self.root_pin,
            Some(&self.route_cache),
            search,
            value,
            seq,
            wants_prev,
        )?;
        if outcome.root_dirty {
            self.backend
                .mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
        }

        let ack = match &self.journal {
            Some(journal) => {
                let mut record =
                    Vec::with_capacity(encoded_insert_record_len(key.len(), value.len()));
                encode_insert_record(&mut record, seq, 0, key, value);
                journal.submit(record, self.cfg.wal_sync_on_commit)?
            }
            None => {
                if self.cfg.memory_flush_on_write {
                    self.flush_dirty_inline()?;
                    self.flush_pending_deletes_inline()?;
                }
                None
            }
        };
        (outcome, ack)
    };

    if let Some(ack) = journal_ack {
        ack.wait()?;
    }
    Ok(outcome.previous)
}
/// Deletes `key` and returns the `mutated` flag of the engine's erase outcome.
///
/// # Errors
/// Propagates any error from `delete_inner`.
pub fn delete(&self, key: &[u8]) -> Result<bool> {
    let outcome = self.delete_inner(key, false)?;
    Ok(outcome.mutated)
}
/// Deletes `key` and returns the `previous` field of the engine's erase outcome
/// (requested by passing `wants_prev = true` to `delete_inner`).
///
/// # Errors
/// Propagates any error from `delete_inner`.
pub fn remove(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
    let outcome = self.delete_inner(key, true)?;
    Ok(outcome.previous)
}
/// Shared implementation of `delete` and `remove`.
///
/// Allocates one sequence number and applies the erase through the engine. The root is
/// marked dirty only when the erase both mutated the tree and reports `root_dirty`.
/// With a journal attached the erase runs under the commit gate's writer guard and an
/// erase record is submitted only if something was actually removed. Without a journal,
/// inline flushing follows `cfg.memory_flush_on_write` regardless of whether the erase
/// mutated anything.
///
/// # Errors
/// Propagates errors from the engine, the journal submit/ack and the inline flushes.
fn delete_inner(&self, key: &[u8], wants_prev: bool) -> Result<engine::EraseOutcome> {
    let _maintenance = self.maintenance_gate.enter_shared();
    let search = engine::SearchKey::user(key);
    let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);

    let (outcome, journal_ack) = {
        // Writer guard only with a journal; released before the acknowledgement wait.
        let _commit = self
            .journal
            .as_ref()
            .map(|_| self.commit_gate.enter_writer());

        let outcome = engine::erase_multi(
            &self.backend,
            &self.root_pin,
            Some(&self.route_cache),
            search,
            seq,
            wants_prev,
        )?;
        if outcome.mutated && outcome.root_dirty {
            self.backend
                .mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
        }

        let ack = match &self.journal {
            // A no-op erase is not journaled.
            Some(journal) if outcome.mutated => {
                let mut record = Vec::with_capacity(encoded_erase_record_len(key.len()));
                encode_erase_record(&mut record, seq, 0, key);
                journal.submit(record, self.cfg.wal_sync_on_commit)?
            }
            Some(_) => None,
            None => {
                if self.cfg.memory_flush_on_write {
                    self.flush_dirty_inline()?;
                    self.flush_pending_deletes_inline()?;
                }
                None
            }
        };
        (outcome, ack)
    };

    if let Some(ack) = journal_ack {
        ack.wait()?;
    }
    Ok(outcome)
}
/// Moves the value stored under `src` to `dst`.
///
/// The whole operation runs under `rename_lock`. When `src == dst` and the key exists
/// the call is a no-op that is neither applied nor journaled. With `force` set, an
/// existing `dst` is overwritten; otherwise it is an error.
///
/// The sequence number is allocated only after validation succeeds and while
/// `rename_lock` is held (the same order `apply_batch` uses), so failed or no-op renames
/// do not consume one. The destination existence check does not copy the destination
/// value.
///
/// # Errors
/// * `Error::NotFound` — `src` does not exist.
/// * `Error::DstExists` — `dst` exists and `force` is `false`.
/// * Any error propagated from the engine, the journal or the inline flushes.
///
/// # Panics
/// Panics if `rename_lock` is poisoned.
pub fn rename(&self, src: &[u8], dst: &[u8], force: bool) -> Result<()> {
    let _maintenance = self.maintenance_gate.enter_shared();
    let src_search = engine::SearchKey::user(src);
    let dst_search = engine::SearchKey::user(dst);
    let _r = self.rename_lock.lock().unwrap();

    let Some(value) =
        engine::lookup_multi_with(&self.backend, &self.root_pin, src_search, <[u8]>::to_vec)?
    else {
        return Err(Error::NotFound);
    };
    if src == dst {
        return Ok(());
    }
    // Existence probe only: map the hit to `()` instead of cloning the stored value.
    if !force
        && engine::lookup_multi_with(&self.backend, &self.root_pin, dst_search, |_| ())?
            .is_some()
    {
        return Err(Error::DstExists);
    }

    let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);

    let journal_ack = {
        // Writer guard only when a journal is attached; it is released at the end of this
        // block so the acknowledgement wait below does not hold the commit gate.
        let _commit = self
            .journal
            .as_ref()
            .map(|_| self.commit_gate.enter_writer());

        let erase_out = engine::erase_multi(
            &self.backend,
            &self.root_pin,
            Some(&self.route_cache),
            src_search,
            seq,
            false,
        )?;
        let insert_out = engine::insert_multi(
            &self.backend,
            &self.root_pin,
            Some(&self.route_cache),
            dst_search,
            &value,
            seq,
            false,
        )?;
        if erase_out.root_dirty || insert_out.root_dirty {
            self.backend
                .mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
        }

        match &self.journal {
            Some(journal) => {
                let mut record =
                    Vec::with_capacity(encoded_rename_object_record_len(src.len(), dst.len()));
                encode_rename_object_record(&mut record, seq, 0, src, dst, force);
                journal.submit(record, self.cfg.wal_sync_on_commit)?
            }
            None => {
                if self.cfg.memory_flush_on_write {
                    self.flush_dirty_inline()?;
                    self.flush_pending_deletes_inline()?;
                }
                None
            }
        }
    };

    if let Some(ack) = journal_ack {
        ack.wait()?;
    }
    Ok(())
}
/// Builds a batch with `build` and applies it through `apply_batch`.
///
/// `build` receives an empty [`TxnBatch`]; if it leaves the batch empty nothing is
/// applied and `Ok(())` is returned immediately.
///
/// # Errors
/// Propagates any error from `apply_batch`.
pub fn txn<F>(&self, build: F) -> Result<()>
where
    F: FnOnce(&mut TxnBatch),
{
    let mut batch = TxnBatch::default();
    build(&mut batch);

    let ops = batch.pending;
    if ops.is_empty() {
        Ok(())
    } else {
        self.apply_batch(ops)
    }
}
/// Applies a non-empty list of batch operations.
///
/// Holds the maintenance gate (shared) and `rename_lock`, then reserves a contiguous
/// range of `pending.len()` sequence numbers. With a journal, the ops are applied and
/// encoded into one batch record under the commit gate's writer guard; the guard is
/// released before waiting on the journal acknowledgement. Without a journal, the ops
/// are applied directly and flushed inline when `cfg.memory_flush_on_write` is set.
///
/// # Errors
/// Propagates errors from applying the ops, the journal and the inline flushes.
///
/// # Panics
/// Panics if `rename_lock` is poisoned.
fn apply_batch(&self, pending: Vec<BatchOp>) -> Result<()> {
    let _maintenance = self.maintenance_gate.enter_shared();
    let count = pending.len() as u64;
    let _r = self.rename_lock.lock().unwrap();
    let base_seq = self.next_seq.fetch_add(count, Ordering::Relaxed);

    let Some(journal) = &self.journal else {
        // Journal-less path: apply, then optionally flush inline.
        self.apply_batch_walker_inline(pending, base_seq, None)?;
        if self.cfg.memory_flush_on_write {
            self.flush_dirty_inline()?;
            self.flush_pending_deletes_inline()?;
        }
        return Ok(());
    };

    let submitted = {
        let _commit = self.commit_gate.enter_writer();
        let mut record = Vec::with_capacity(encoded_batch_record_len(&pending));
        let mut enc = BatchEncoder::begin(&mut record, base_seq, 0);
        self.apply_batch_walker_inline(pending, base_seq, Some(&mut enc))?;
        let _n = enc.finish();
        journal.submit(record, self.cfg.wal_sync_on_commit)?
    };
    if let Some(ack) = submitted {
        ack.wait()?;
    }
    Ok(())
}
/// Applies each op in `pending` in order, giving op `i` the sequence `base_seq + i`.
///
/// When `enc` is provided, every applied op is also appended to the batch encoder:
/// puts and renames always, deletes only when the erase actually mutated the tree.
/// Processing stops at the first error; ops applied before it are not undone here.
///
/// # Errors
/// Propagates errors from the engine and from `apply_batch_rename_walker`.
fn apply_batch_walker_inline(
    &self,
    pending: Vec<BatchOp>,
    base_seq: u64,
    mut enc: Option<&mut crate::journal::codec::BatchEncoder<'_>>,
) -> Result<()> {
    let mark_root_dirty = |seq: u64| {
        self.backend
            .mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
    };

    for (offset, op) in pending.into_iter().enumerate() {
        let seq = base_seq + offset as u64;
        match op {
            BatchOp::Put { key, value } => {
                let inserted = engine::insert_multi(
                    &self.backend,
                    &self.root_pin,
                    Some(&self.route_cache),
                    engine::SearchKey::user(&key),
                    &value,
                    seq,
                    false,
                )?;
                if inserted.root_dirty {
                    mark_root_dirty(seq);
                }
                if let Some(encoder) = enc.as_deref_mut() {
                    encoder.push_insert(0, &key, &value);
                }
            }
            BatchOp::Delete { key } => {
                let erased = engine::erase_multi(
                    &self.backend,
                    &self.root_pin,
                    Some(&self.route_cache),
                    engine::SearchKey::user(&key),
                    seq,
                    false,
                )?;
                // A delete that removed nothing is neither marked dirty nor encoded.
                if !erased.mutated {
                    continue;
                }
                if erased.root_dirty {
                    mark_root_dirty(seq);
                }
                if let Some(encoder) = enc.as_deref_mut() {
                    encoder.push_erase(0, &key);
                }
            }
            BatchOp::Rename { src, dst, force } => {
                self.apply_batch_rename_walker(&src, &dst, force, seq)?;
                if let Some(encoder) = enc.as_deref_mut() {
                    encoder.push_rename_object(0, &src, &dst, force);
                }
            }
        }
    }
    Ok(())
}
fn apply_batch_rename_walker(
&self,
src: &[u8],
dst: &[u8],
force: bool,
seq: u64,
) -> Result<()> {
let src_search = engine::SearchKey::user(src);
let dst_search = engine::SearchKey::user(dst);
let Some(value) =
engine::lookup_multi_with(&self.backend, &self.root_pin, src_search, <[u8]>::to_vec)?
else {
return Err(Error::NotFound);
};
if src == dst {
return Ok(());
}
if !force
&& engine::lookup_multi_with(&self.backend, &self.root_pin, dst_search, |_| ())?
.is_some()
{
return Err(Error::DstExists);
}
let erase_out = engine::erase_multi(
&self.backend,
&self.root_pin,
Some(&self.route_cache),
src_search,
seq,
false,
)?;
let insert_out = engine::insert_multi(
&self.backend,
&self.root_pin,
Some(&self.route_cache),
dst_search,
&value,
seq,
false,
)?;
if erase_out.root_dirty || insert_out.root_dirty {
self.backend
.mark_dirty_cached(self.root_guid, seq, self.root_pin.as_ref());
}
Ok(())
}
/// Creates a [`RangeBuilder`] over this tree, sharing its buffer manager and
/// maintenance gate and starting from the root blob GUID.
pub fn range(&self) -> RangeBuilder {
    let backend = Arc::clone(&self.backend);
    let gate = Arc::clone(&self.maintenance_gate);
    RangeBuilder::new(backend, self.root_guid, gate)
}
/// Shorthand for `self.range().prefix(prefix)`.
pub fn scan_prefix(&self, prefix: &[u8]) -> RangeBuilder {
    let builder = self.range();
    builder.prefix(prefix)
}
/// Writes every currently dirty blob through to the backend in one batch.
///
/// Takes a snapshot of the dirty set, pairs each entry with its cached bytes and hands
/// the batch to `write_through_batch`. Entries that could not be written are returned to
/// the dirty set via `restore_dirty`:
/// * an entry whose cached bytes are missing is restored and reported as an internal
///   error;
/// * if the batch write fails, every entry in the batch is restored.
///
/// The list of `(guid, expected_seq)` pairs to restore is derived from `entries` only on
/// the failure path, so the success path performs no extra allocation.
///
/// # Errors
/// Returns the first error encountered (missing cache image first, then the batch write
/// error).
fn flush_dirty_inline(&self) -> Result<()> {
    let snap = self.backend.snapshot_dirty();
    let mut failed: std::collections::HashMap<BlobGuid, u64> = std::collections::HashMap::new();
    let mut first_err: Option<Error> = None;
    let mut entries = Vec::with_capacity(snap.len());

    for (guid, expected_seq) in snap {
        match self.backend.snapshot_bytes(guid) {
            Some(bytes) => entries.push(WriteThroughEntry {
                guid,
                bytes,
                expected_seq,
            }),
            None => {
                failed.insert(guid, expected_seq);
                first_err.get_or_insert(Error::Internal(
                    "flush_dirty_inline: dirty entry lost cache image — invariant I1 violated",
                ));
            }
        }
    }

    if !entries.is_empty() {
        if let Err(e) = self.backend.write_through_batch(&entries) {
            // `entries` was only borrowed by the write, so it still describes the batch.
            failed.extend(
                entries
                    .iter()
                    .map(|entry| (entry.guid, entry.expected_seq)),
            );
            // Keeps an earlier error if one was already recorded.
            first_err.get_or_insert(e);
        }
    }

    if !failed.is_empty() {
        self.backend.restore_dirty(failed);
    }
    match first_err {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
/// Executes every pending blob delete currently queued in the backend.
///
/// Deletes that fail are handed back via `restore_pending_deletes`; the remaining ones
/// are still attempted.
///
/// # Errors
/// Returns the first delete error encountered, after all deletes have been attempted.
fn flush_pending_deletes_inline(&self) -> Result<()> {
    let pending = self.backend.snapshot_pending_deletes();
    let mut failed: std::collections::HashMap<BlobGuid, u64> = std::collections::HashMap::new();
    let mut first_err: Option<Error> = None;

    for (guid, seq) in pending {
        if let Err(e) = self.backend.execute_pending_delete(guid) {
            failed.insert(guid, seq);
            first_err.get_or_insert(e);
        }
    }

    if !failed.is_empty() {
        self.backend.restore_pending_deletes(failed);
    }
    first_err.map_or(Ok(()), Err)
}
/// Writes all dirty blobs and pending deletes to the backend and, when everything has
/// been persisted, truncates the journal.
///
/// Phases, in order:
/// 1. snapshot the dirty set and pending deletes (under the commit gate's checkpoint
///    guard and after a journal flush when a journal is attached), and capture the
///    cached bytes of every dirty blob;
/// 2. write the captured blobs through in one batch;
/// 3. flush the backend;
/// 4. execute the pending deletes and flush again if any were applied;
/// 5. truncate the journal if the backend reports no dirty blobs and no pending deletes.
///
/// On every failure path the affected snapshot entries are handed back to the backend
/// via `restore_dirty` / `restore_pending_deletes` before the error is returned.
///
/// # Errors
/// Returns journal, backend write/flush/delete errors, or `Error::Internal` when a dirty
/// blob has no cached image.
///
/// # Panics
/// Panics only if the internal `had_dirty_failure ⇒ first_dirty_err` invariant is broken.
#[allow(clippy::too_many_lines)]
pub fn checkpoint(&self) -> Result<()> {
use std::collections::HashMap;
let _maintenance = self.maintenance_gate.enter_shared();
// Phase 1: take the snapshots. With a journal, this happens under the checkpoint guard,
// which is released at the end of the `if` branch.
let (_snap_dirty, snap_pending, snap_bytes) = if let Some(journal) = &self.journal {
let _commit = self.commit_gate.enter_checkpoint();
let snap_dirty = self.backend.snapshot_dirty();
let snap_pending = self.backend.snapshot_pending_deletes();
// The journal is flushed before any blob bytes are captured; on failure both snapshots
// are handed back untouched.
if let Err(e) = journal.flush() {
self.backend.restore_pending_deletes(snap_pending);
self.backend.restore_dirty(snap_dirty);
return Err(e);
}
let mut snap_bytes = Vec::with_capacity(snap_dirty.len());
for (guid, expected_seq) in &snap_dirty {
// A dirty entry without a cached image aborts the whole checkpoint.
let Some(bytes) = self.backend.snapshot_bytes(*guid) else {
self.backend.restore_pending_deletes(snap_pending);
self.backend.restore_dirty(snap_dirty);
return Err(Error::Internal(
"checkpoint: dirty entry lost cache image — invariant I1 violated",
));
};
snap_bytes.push((*guid, *expected_seq, bytes));
}
(snap_dirty, snap_pending, snap_bytes)
} else {
// Journal-less variant: same snapshotting, without the commit gate or journal flush.
let snap_dirty = self.backend.snapshot_dirty();
let snap_pending = self.backend.snapshot_pending_deletes();
let mut snap_bytes = Vec::with_capacity(snap_dirty.len());
for (guid, expected_seq) in &snap_dirty {
let Some(bytes) = self.backend.snapshot_bytes(*guid) else {
self.backend.restore_pending_deletes(snap_pending);
self.backend.restore_dirty(snap_dirty);
return Err(Error::Internal(
"checkpoint: dirty entry lost cache image — invariant I1 violated",
));
};
snap_bytes.push((*guid, *expected_seq, bytes));
}
(snap_dirty, snap_pending, snap_bytes)
};
// Phase 2: write the captured blob images through in a single batch.
let mut dirty_failed: HashMap<BlobGuid, u64> = HashMap::new();
let mut first_dirty_err: Option<Error> = None;
let entries: Vec<_> = snap_bytes
.into_iter()
.map(|(guid, expected_seq, bytes)| WriteThroughEntry {
guid,
bytes,
expected_seq,
})
.collect();
if !entries.is_empty() {
let expected: Vec<_> = entries
.iter()
.map(|entry| (entry.guid, entry.expected_seq))
.collect();
// A failed batch write marks every entry in the batch as failed.
if let Err(e) = self.backend.write_through_batch(&entries) {
for (guid, expected_seq) in expected {
dirty_failed.insert(guid, expected_seq);
}
if first_dirty_err.is_none() {
first_dirty_err = Some(e);
}
}
}
let had_dirty_failure = !dirty_failed.is_empty();
if had_dirty_failure {
self.backend.restore_dirty(dirty_failed);
}
// Fast path: nothing was snapshotted and the backend has nothing to flush. The journal
// is still truncated if it asks for a checkpoint and, re-checked under the checkpoint
// guard, the backend has no dirty blobs or pending deletes.
if entries.is_empty() && snap_pending.is_empty() && !self.backend.needs_flush() {
if let Some(journal) = &self.journal {
if journal.needs_checkpoint() {
let _commit = self.commit_gate.enter_checkpoint();
if self.backend.dirty_count() == 0 && self.backend.pending_delete_count() == 0 {
journal.truncate()?;
}
}
}
return Ok(());
}
// Phase 3: flush the backend. The pending deletes have not been executed yet, so the
// whole snapshot is handed back on failure.
// NOTE(review): blobs that were written through successfully are not re-marked dirty on
// this path; presumably a later checkpoint retries the flush via `needs_flush()` —
// confirm against `BufferManager`.
if let Err(e) = self.backend.flush() {
self.backend.restore_pending_deletes(snap_pending);
return Err(e);
}
// The write-through failure is reported only after the flush attempt above.
if had_dirty_failure {
self.backend.restore_pending_deletes(snap_pending);
return Err(first_dirty_err.expect("had_dirty_failure ⇒ first_dirty_err set"));
}
// Phase 4: execute the pending deletes; failures are collected and handed back.
let mut pending_failed: HashMap<BlobGuid, u64> = HashMap::new();
let mut first_pending_err: Option<Error> = None;
for (guid, seq) in &snap_pending {
if let Err(e) = self.backend.execute_pending_delete(*guid) {
pending_failed.insert(*guid, *seq);
if first_pending_err.is_none() {
first_pending_err = Some(e);
}
}
}
if !pending_failed.is_empty() {
self.backend.restore_pending_deletes(pending_failed.clone());
}
let applied_deletes = snap_pending.len() - pending_failed.len();
if applied_deletes > 0 {
// If the flush after the deletes fails, the deletes that did execute are handed back as
// well (the failed ones were already restored above).
if let Err(e) = self.backend.flush() {
let restore_applied: HashMap<BlobGuid, u64> = snap_pending
.iter()
.filter(|(g, _)| !pending_failed.contains_key(*g))
.map(|(g, s)| (*g, *s))
.collect();
self.backend.restore_pending_deletes(restore_applied);
return Err(e);
}
}
if let Some(e) = first_pending_err {
return Err(e);
}
// Phase 5: truncate the journal only if, under the checkpoint guard, the backend reports
// no dirty blobs and no pending deletes.
if let Some(journal) = &self.journal {
let _commit = self.commit_gate.enter_checkpoint();
if self.backend.dirty_count() == 0 && self.backend.pending_delete_count() == 0 {
journal.truncate()?;
}
}
Ok(())
}
/// Collects a statistics snapshot for the tree.
///
/// Walks every blob reachable from the root (using the `_silent` engine and pin
/// variants), copies selected header counters into a [`BlobStats`] per blob, sums them
/// into totals, and then reads the buffer manager, journal and checkpointer counters.
///
/// # Errors
/// Propagates errors from collecting blob GUIDs and from pinning blobs.
pub fn stats(&self) -> Result<TreeStats> {
    let _maintenance = self.maintenance_gate.enter_shared();
    let guids = engine::collect_blob_guids_silent(&self.backend, self.root_guid)?;

    let mut blobs: Vec<BlobStats> = Vec::with_capacity(guids.len());
    for guid in &guids {
        let pin = self.backend.pin_silent(*guid)?;
        let guard = pin.read();
        let frame = BlobFrameRef::wrap(guard.as_slice());
        let h = frame.header();
        blobs.push(BlobStats {
            guid: *guid,
            space_used: h.space_used,
            gap_space: h.gap_space,
            num_slots: h.num_slots,
            num_ext_blobs: h.num_ext_blobs,
            compact_times: h.compact_times,
            tombstone_leaf_cnt: h.tombstone_leaf_cnt,
        });
    }

    // Totals are derived from the per-blob records gathered above.
    let total_space_used: u64 = blobs.iter().map(|b| u64::from(b.space_used)).sum();
    let total_gap_space: u64 = blobs.iter().map(|b| u64::from(b.gap_space)).sum();
    let total_slots: u64 = blobs.iter().map(|b| u64::from(b.num_slots)).sum();
    let total_compactions: u64 = blobs.iter().map(|b| u64::from(b.compact_times)).sum();
    let total_tombstones: u64 = blobs
        .iter()
        .map(|b| u64::from(b.tombstone_leaf_cnt))
        .sum();

    let bm = &self.backend;
    let bm_dirty_count = bm.dirty_count();
    let bm_pending_delete_count = bm.pending_delete_count();
    let bm_cache_hits = bm.cache_hits();
    let bm_cache_misses = bm.cache_misses();
    let bm_optimistic_restarts = bm.optimistic_restarts();
    let bm_walker_ops = bm.walker_ops();
    let bm_walker_blob_hops = bm.walker_blob_hops();
    let bm_max_blob_hops = bm.max_blob_hops();
    let bm_max_cross_blob_depth = bm.max_cross_blob_depth();
    let bm_spillovers = bm.spillover_count();
    let bm_merges = bm.merge_count();

    let journal = self.journal.as_ref().map(|j| {
        let counters = j.stats();
        JournalStats {
            appends: counters.appends,
            batches: counters.batches,
            syncs: counters.syncs,
        }
    });
    let checkpointer = self.checkpointer.as_ref().map(|ck| CheckpointerStats {
        rounds_attempted: ck.rounds_attempted(),
        rounds_succeeded: ck.rounds_succeeded(),
        blobs_flushed: ck.blobs_flushed(),
        merges_total: ck.merges_total(),
        truncates: ck.truncates(),
        evictions: ck.evictions(),
    });

    Ok(TreeStats {
        blob_count: blobs.len() as u32,
        total_space_used,
        total_gap_space,
        total_slots,
        total_compactions,
        total_tombstones,
        blobs,
        bm_dirty_count,
        bm_pending_delete_count,
        bm_cache_hits,
        bm_cache_misses,
        bm_optimistic_restarts,
        bm_walker_ops,
        bm_walker_blob_hops,
        bm_max_blob_hops,
        bm_max_cross_blob_depth,
        bm_spillovers,
        bm_merges,
        journal,
        checkpointer,
    })
}
/// Runs one round of online maintenance.
///
/// If neither candidate queue has entries, the queues are first seeded by scanning the
/// tree. Then up to `ONLINE_COMPACT_BLOB_BUDGET` compaction candidates are popped and
/// compacted, followed by up to `ONLINE_MERGE_PARENT_BUDGET` merge candidates.
///
/// # Errors
/// Stops at and returns the first error from seeding, compaction or merging.
pub fn compact(&self) -> Result<()> {
    let queues_empty = self.backend.compaction_candidate_count() == 0
        && self.backend.merge_candidate_count() == 0;
    if queues_empty {
        self.seed_maintenance_candidates()?;
    }

    self.backend
        .pop_compaction_candidates(ONLINE_COMPACT_BLOB_BUDGET)
        .into_iter()
        .try_for_each(|guid| self.compact_candidate_blob(guid))?;

    self.backend
        .pop_merge_candidates(ONLINE_MERGE_PARENT_BUDGET)
        .into_iter()
        .try_for_each(|guid| self.merge_candidate_parent(guid))?;

    Ok(())
}
/// Scans every blob reachable from the root and registers maintenance candidates:
/// blobs for which `engine::blob_needs_compaction` holds become compaction candidates,
/// and blobs whose header reports a non-zero `num_ext_blobs` become merge candidates.
///
/// # Errors
/// Propagates errors from collecting blob GUIDs and from pinning blobs.
fn seed_maintenance_candidates(&self) -> Result<()> {
    let _maintenance = self.maintenance_gate.enter_shared();
    for guid in engine::collect_blob_guids(&self.backend, self.root_guid)? {
        let pin = self.backend.pin(guid)?;
        let guard = pin.read();
        let frame = BlobFrameRef::wrap(guard.as_slice());
        // Read the header field before `frame` is passed by value below.
        let has_ext_blobs = frame.header().num_ext_blobs != 0;
        let wants_compaction = engine::blob_needs_compaction(frame);

        if wants_compaction {
            self.backend.note_compaction_candidate(guid);
        }
        if has_ext_blobs {
            self.backend.note_merge_candidate(guid);
        }
    }
    Ok(())
}
/// Compacts one candidate blob if it still needs it.
///
/// The need is checked twice: once under a read guard, so blobs that no longer qualify
/// skip the commit gate and the write guard, and again under the write guard just before
/// compacting. A compacted blob is marked dirty with `STRUCTURAL_SEQ`. Blobs that no
/// longer exist are skipped.
///
/// # Errors
/// Propagates errors from the existence check, pinning and `engine::compact_blob`.
fn compact_candidate_blob(&self, guid: BlobGuid) -> Result<()> {
    use crate::store::buffer_manager::STRUCTURAL_SEQ;

    let _maintenance = self.maintenance_gate.enter_shared();
    let exists = self.backend.has_blob(guid)?;
    if !exists {
        return Ok(());
    }

    let pin = self.backend.pin(guid)?;
    let worth_locking = {
        let read_guard = pin.read();
        engine::blob_needs_compaction(BlobFrameRef::wrap(read_guard.as_slice()))
    };
    if !worth_locking {
        return Ok(());
    }

    // The writer guard is taken only when a journal is attached.
    let _commit = self
        .journal
        .as_ref()
        .map(|_| self.commit_gate.enter_writer());

    let compacted = {
        let mut guard = pin.write();
        // Re-check under the write guard: the blob may have changed since the read probe.
        let still_needed = {
            let frame = guard.frame();
            engine::blob_needs_compaction(frame.as_ref())
        };
        if still_needed {
            engine::compact_blob(&mut guard)?;
        }
        still_needed
    };
    if compacted {
        self.backend.mark_dirty(guid, STRUCTURAL_SEQ);
    }

    drop(pin);
    Ok(())
}
/// Tries to merge children back into the parent blob `guid`.
///
/// Runs with the maintenance gate held exclusively (and the commit gate's writer guard
/// when a journal is attached). If at least one child was merged, the parent is marked
/// dirty with `STRUCTURAL_SEQ`, the merge counter is bumped, and the parent is re-queued
/// as a merge candidate while its header still reports external blobs. Blobs that no
/// longer exist are skipped.
///
/// # Errors
/// Propagates errors from the existence check, pinning and `engine::try_merge_children`.
fn merge_candidate_parent(&self, guid: BlobGuid) -> Result<()> {
    use crate::store::buffer_manager::STRUCTURAL_SEQ;

    let _maintenance = self.maintenance_gate.enter_exclusive();
    let exists = self.backend.has_blob(guid)?;
    if !exists {
        return Ok(());
    }

    let _commit = self
        .journal
        .as_ref()
        .map(|_| self.commit_gate.enter_writer());
    let pin = self.backend.pin(guid)?;

    let (merged, has_children) = {
        let mut guard = pin.write();
        let mut frame = guard.frame();
        let merged = engine::try_merge_children(&self.backend, &mut frame, STRUCTURAL_SEQ)?;
        let has_children = frame.header().num_ext_blobs != 0;
        (merged, has_children)
    };

    let merged_count = merged.merged;
    if merged_count > 0 {
        self.backend.mark_dirty(guid, STRUCTURAL_SEQ);
        self.backend.note_merges(u64::from(merged_count));
        if has_children {
            // More children remain; let a later round try again.
            self.backend.note_merge_candidate(guid);
        }
    }

    drop(pin);
    Ok(())
}
#[must_use]
pub fn config(&self) -> &TreeConfig {
&self.cfg
}
/// Returns the `PAGE_SIZE` constant from `crate::layout`.
#[must_use]
pub const fn page_size() -> u32 {
PAGE_SIZE
}
}
/// Replays the WAL at `path` into the tree rooted at `root_guid` and returns the next
/// sequence number to use (highest replayed sequence + 1, or 1 for an empty log).
///
/// Insert and erase records are re-applied directly. A `RenameObject` record is
/// re-applied only if the source key still exists and, unless `force` is set, the
/// destination does not; otherwise the record is skipped. The source value is fetched
/// with a single lookup, so a skipped or missing source can never be replaced by an
/// empty value. All other record kinds are ignored here. The root blob is marked dirty
/// with the record's sequence whenever the engine reports `root_dirty`.
///
/// # Errors
/// Propagates errors from pinning the root, from `replay`, and from the engine calls
/// made while applying records.
fn replay_wal(path: &std::path::Path, bm: &Arc<BufferManager>, root_guid: BlobGuid) -> Result<u64> {
    let root_pin = bm.pin(root_guid)?;
    let mut highest = 0u64;
    let _ = replay(path, |op, seq, _off| {
        // Track the highest sequence seen, including records that end up skipped.
        highest = highest.max(seq);
        let root_dirty = match op {
            TxnOp::Insert { key, value, .. } => {
                let search = engine::SearchKey::user(key);
                engine::insert_multi(bm, &root_pin, None, search, value, seq, false)?.root_dirty
            }
            TxnOp::Erase { key, .. } => {
                let search = engine::SearchKey::user(key);
                engine::erase_multi(bm, &root_pin, None, search, seq, false)?.root_dirty
            }
            TxnOp::RenameObject {
                src_key,
                dst_key,
                force,
                ..
            } => {
                let src_search = engine::SearchKey::user(src_key);
                let dst_search = engine::SearchKey::user(dst_key);
                // One lookup both tests for the source and captures its value.
                let Some(value) =
                    engine::lookup_multi_with(bm, &root_pin, src_search, <[u8]>::to_vec)?
                else {
                    return Ok(());
                };
                if !force && engine::lookup_multi_with(bm, &root_pin, dst_search, |_| ())?.is_some()
                {
                    return Ok(());
                }
                let erase_out = engine::erase_multi(bm, &root_pin, None, src_search, seq, false)?;
                let insert_out =
                    engine::insert_multi(bm, &root_pin, None, dst_search, &value, seq, false)?;
                erase_out.root_dirty || insert_out.root_dirty
            }
            // Record kinds that carry nothing to re-apply at this level.
            TxnOp::Split { .. }
            | TxnOp::Merge { .. }
            | TxnOp::Compact { .. }
            | TxnOp::Rename { .. }
            | TxnOp::NewTree { .. }
            | TxnOp::RmTree { .. }
            | TxnOp::MemMarker { .. }
            | TxnOp::Batch { .. } => false,
        };
        if root_dirty {
            bm.mark_dirty(root_guid, seq);
        }
        Ok(())
    })?;
    Ok(highest + 1)
}
// Unit tests exercising `Tree` through an in-memory tree built with `TreeBuilder`.
#[cfg(test)]
mod tests {
use crate::TreeBuilder;
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;
// A key that is a strict prefix of another key must be stored, read and deleted
// independently of the longer key.
#[test]
fn strict_prefix_point_keys_round_trip_through_public_api() {
let tree = TreeBuilder::new("ignored").memory().open().unwrap();
tree.put(b"abc", b"short").unwrap();
tree.put(b"abcdef", b"long").unwrap();
assert_eq!(tree.get(b"abc").unwrap().as_deref(), Some(&b"short"[..]));
assert_eq!(tree.get(b"abcdef").unwrap().as_deref(), Some(&b"long"[..]));
assert!(tree.delete(b"abc").unwrap());
assert_eq!(tree.get(b"abc").unwrap(), None);
assert_eq!(tree.get(b"abcdef").unwrap().as_deref(), Some(&b"long"[..]));
}
// `compact` must not finish while another thread holds the maintenance gate in shared
// mode, and must finish once that guard is dropped.
#[test]
fn compact_waits_for_maintenance_read_guard() {
let tree = TreeBuilder::new("ignored")
.memory()
.buffer_pool_size(16)
.open()
.unwrap();
// Fill the tree with 4 KiB values, then delete most keys so that more than one blob
// exists (asserted below as the test precondition).
let big = vec![0xCDu8; 4 * 1024];
for i in 0..256u32 {
tree.put(format!("k{i:08}").as_bytes(), &big).unwrap();
}
for i in 0..248u32 {
tree.delete(format!("k{i:08}").as_bytes()).unwrap();
}
assert!(
tree.stats().unwrap().blob_count > 1,
"test precondition: compact must have a BlobNode merge phase"
);
// Hold a shared maintenance guard on this thread while a worker runs `compact`.
let read_guard = tree.maintenance_gate.enter_shared();
let worker_tree = tree.clone();
// Zero-capacity channels: each `send` blocks until the matching `recv`.
let (started_tx, started_rx) = sync_channel(0);
let (done_tx, done_rx) = sync_channel(0);
let handle = thread::spawn(move || {
started_tx.send(()).unwrap();
worker_tree.compact().unwrap();
done_tx.send(()).unwrap();
});
started_rx.recv().unwrap();
// While the guard is held, the worker must not report completion within 50 ms.
assert!(
done_rx.recv_timeout(Duration::from_millis(50)).is_err(),
"compact must wait behind active shared maintenance readers"
);
drop(read_guard);
// After the guard is released the worker must finish within one second.
done_rx.recv_timeout(Duration::from_secs(1)).unwrap();
handle.join().unwrap();
}
}