use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use super::config::{Storage, TreeConfig};
use super::errors::{Error, Result};
use super::stats::{BlobStats, CheckpointerStats, TreeStats};
use crate::engine;
use crate::engine::RangeBuilder;
use crate::journal::reader::replay;
use crate::journal::txn_op::TxnOp;
use crate::journal::writer::WalWriter;
use crate::layout::{BlobGuid, PAGE_SIZE};
use crate::store::backend::{AlignedBlobBuf, Backend, MemoryBackend, PersistentBackend};
use crate::store::{BlobFrame, BlobFrameRef, BufferManager, CachedBlob};
use super::txn::{BatchOp, TxnBatch};
/// Handle to a key/value tree whose nodes live in blobs managed by a
/// [`BufferManager`].
///
/// `Clone` is derived and every shared component (buffer manager, root pin,
/// locks, sequence counter, WAL writer, checkpointer) is held behind an
/// `Arc`, so all clones operate on the same underlying tree state.
///
/// NOTE(review): field declaration order is also drop order; keep it stable
/// unless the shutdown interaction between the checkpointer, the WAL writer,
/// and the buffer manager has been checked.
#[derive(Clone)]
pub struct Tree {
/// Configuration the tree was opened with.
cfg: TreeConfig,
/// Buffer manager through which every blob read, write, and flush goes.
backend: Arc<BufferManager>,
/// Identifier of the root blob (`ROOT_BLOB_GUID` when built by `open_inner`).
root_guid: BlobGuid,
/// Pin on the root blob taken at open time; every engine call starts from it.
root_pin: Arc<CachedBlob>,
/// Serialises `rename` and batched transactions (`apply_batch`) with each other.
rename_lock: Arc<Mutex<()>>,
/// Next sequence number to hand out; shared by all clones.
next_seq: Arc<AtomicU64>,
/// WAL writer; `Some` only when a WAL was attached at open time.
wal: Option<Arc<Mutex<WalWriter>>>,
/// Checkpointer returned by `Checkpointer::spawn`, if it produced one.
checkpointer: Option<Arc<crate::checkpoint::Checkpointer>>,
}
impl std::fmt::Debug for Tree {
    /// Prints the storage kind and the root blob id; all other fields are
    /// elided and the output is marked non-exhaustive (`..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Tree");
        out.field("storage", &self.cfg.storage);
        out.field("root_guid", &self.root_guid);
        out.finish_non_exhaustive()
    }
}
/// Identifier of the tree's root blob: sixteen zero bytes.
pub(crate) const ROOT_BLOB_GUID: BlobGuid = [0x00; 16];
/// Returns a copy of `key` with one trailing `0x00` byte appended.
///
/// Every engine call in this module goes through this helper, so the engine
/// only ever sees keys in this terminated form.
#[inline]
fn pad_key(key: &[u8]) -> Vec<u8> {
    [key, &[0u8][..]].concat()
}
impl Tree {
/// Opens a tree on the backend selected by `cfg.storage`.
///
/// `Storage::Memory` gets a fresh `MemoryBackend`. `Storage::Persistent`
/// opens a `PersistentBackend` on the configured directory. A WAL is attached
/// whenever `cfg.wal_path()` yields a path.
///
/// # Errors
/// Propagates failures from opening the backend or from `open_inner`.
pub fn open(cfg: TreeConfig) -> Result<Self> {
    let backend = match &cfg.storage {
        Storage::Memory => Arc::new(MemoryBackend::new()) as Arc<dyn Backend>,
        Storage::Persistent { dir } => {
            let persistent = PersistentBackend::open(dir)?;
            Arc::new(persistent) as Arc<dyn Backend>
        }
    };
    let attach_wal = true;
    Self::open_inner(cfg, backend, attach_wal)
}
/// Opens a tree on a caller-supplied backend.
///
/// No WAL is ever attached on this path, even if `cfg` names a WAL path.
///
/// # Errors
/// Propagates failures from `open_inner`.
pub fn open_with_backend(cfg: TreeConfig, backend: Arc<dyn Backend>) -> Result<Self> {
    let attach_wal = false;
    Self::open_inner(cfg, backend, attach_wal)
}
/// Shared constructor behind `open` and `open_with_backend`.
///
/// Steps, in order:
/// 1. wrap `backend` in a `BufferManager` sized by `cfg.buffer_pool_size`;
/// 2. if the root blob does not exist yet, initialise an empty one, write it,
///    and flush the buffer manager;
/// 3. if `attach_wal` is set and `cfg.wal_path()` is `Some`, replay any
///    existing log file (which also yields the next sequence number) and open
///    the WAL writer; otherwise no WAL is used and sequence numbers start at 1;
/// 4. pin the root blob and spawn the checkpointer.
///
/// # Errors
/// Propagates any failure from the steps above.
fn open_inner(cfg: TreeConfig, backend: Arc<dyn Backend>, attach_wal: bool) -> Result<Self> {
    let bm: Arc<BufferManager> = Arc::new(BufferManager::new(backend, cfg.buffer_pool_size));
    let root_guid = ROOT_BLOB_GUID;

    let root_missing = !bm.has_blob(root_guid)?;
    if root_missing {
        let mut scratch = AlignedBlobBuf::zeroed();
        BlobFrame::init(scratch.as_mut_slice(), root_guid)?;
        bm.write_blob(root_guid, &scratch)?;
        bm.flush()?;
    }

    // Only consult the configured WAL path when the caller asked for a WAL.
    let wal_path = if attach_wal { cfg.wal_path() } else { None };
    let (wal, next_seq) = match wal_path {
        Some(path) => {
            let recovered_next = if path.exists() {
                replay_wal(&path, &bm, root_guid)?
            } else {
                1
            };
            let writer = WalWriter::open_or_create(&path, 0)?;
            (Some(Arc::new(Mutex::new(writer))), recovered_next)
        }
        None => (None, 1u64),
    };

    let root_pin = bm.pin(root_guid)?;
    let checkpointer = crate::checkpoint::Checkpointer::spawn(
        Arc::clone(&bm),
        wal.clone(),
        root_guid,
        cfg.checkpoint.clone(),
    )
    .map(Arc::new);

    Ok(Self {
        cfg,
        backend: bm,
        root_guid,
        root_pin,
        rename_lock: Arc::new(Mutex::new(())),
        next_seq: Arc::new(AtomicU64::new(next_seq)),
        wal,
        checkpointer,
    })
}
/// Looks up `key` and returns a copy of its value, or `None` if it is absent.
///
/// # Errors
/// Propagates engine lookup failures.
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
    engine::lookup_multi(&self.backend, &self.root_pin, &pad_key(key))
}
/// Inserts `value` under `key`, overwriting any existing value.
///
/// Returns the value previously stored under `key`, if any.
///
/// With a WAL attached, the sequence number is allocated *while holding the
/// WAL mutex*. As a result, WAL records are appended in sequence-number order,
/// and the seq passed to `mark_dirty` for the root never goes backwards
/// between writers. Allocating before taking the lock (as before) let two
/// writers append and mark dirty out of order. Without a WAL, the write is
/// optionally flushed inline when `memory_flush_on_write` is set.
///
/// # Errors
/// Propagates engine, WAL append/flush, or inline-flush failures.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
    let padded = pad_key(key);
    let outcome = if let Some(wal) = &self.wal {
        let mut w = wal.lock().unwrap();
        // Allocate under the WAL lock: seq order == apply order == WAL order.
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        let outcome =
            engine::insert_multi(&self.backend, &self.root_pin, &padded, value, seq)?;
        self.backend.mark_dirty(self.root_guid, seq);
        w.append_insert(seq, 0, key, value, outcome.previous.as_deref())?;
        if self.cfg.wal_sync_on_commit {
            w.flush()?;
        }
        outcome
    } else {
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        let outcome =
            engine::insert_multi(&self.backend, &self.root_pin, &padded, value, seq)?;
        self.backend.mark_dirty(self.root_guid, seq);
        if self.cfg.memory_flush_on_write {
            self.flush_dirty_inline()?;
            self.flush_pending_deletes_inline()?;
        }
        outcome
    };
    Ok(outcome.previous)
}
/// Removes `key` and returns the value it held, or `None` if it was absent.
///
/// The root is marked dirty (and, with a WAL, an erase record is appended)
/// only when something was actually removed. With a WAL attached, the
/// sequence number is allocated while holding the WAL mutex so that records
/// reach the log in sequence-number order. Without a WAL, an inline flush runs
/// when `memory_flush_on_write` is set, whether or not anything was removed.
///
/// # Errors
/// Propagates engine, WAL append/flush, or inline-flush failures.
pub fn delete(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
    let padded = pad_key(key);
    let outcome = if let Some(wal) = &self.wal {
        let mut w = wal.lock().unwrap();
        // Allocate under the WAL lock: seq order == apply order == WAL order.
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        let outcome = engine::erase_multi(&self.backend, &self.root_pin, &padded, seq)?;
        if let Some(prev) = &outcome.previous {
            self.backend.mark_dirty(self.root_guid, seq);
            w.append_erase(seq, 0, key, prev)?;
            if self.cfg.wal_sync_on_commit {
                w.flush()?;
            }
        }
        outcome
    } else {
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        let outcome = engine::erase_multi(&self.backend, &self.root_pin, &padded, seq)?;
        if outcome.previous.is_some() {
            self.backend.mark_dirty(self.root_guid, seq);
        }
        if self.cfg.memory_flush_on_write {
            self.flush_dirty_inline()?;
            self.flush_pending_deletes_inline()?;
        }
        outcome
    };
    Ok(outcome.previous)
}
/// Moves the value stored under `src` to `dst`.
///
/// Renaming a key onto itself is a successful no-op (nothing is logged).
/// Unless `force` is set, an existing `dst` is an error.
///
/// Locking: `rename_lock` is taken first, then, when a WAL is attached, the
/// WAL mutex. Both are held across validation, mutation, and the WAL append.
/// `put`/`delete`/`txn` mutate the tree while holding the WAL mutex, so in
/// WAL mode none of them can slip in between the lookups and the move.
/// Previously the lookups ran outside the WAL lock, so a concurrent `put` to
/// `src` could make the live tree move a stale value while WAL replay (which
/// re-reads `src`) moved the new one. The sequence number is also allocated
/// under the WAL mutex to keep WAL order equal to sequence order.
///
/// NOTE(review): without a WAL, `put`/`delete` take no lock at all, so the
/// lookups here can still race with them in that mode.
///
/// # Errors
/// `Error::NotFound` if `src` is absent; `Error::DstExists` if `dst` is
/// occupied and `force` is false; otherwise propagates engine, WAL, or
/// inline-flush failures.
pub fn rename(&self, src: &[u8], dst: &[u8], force: bool) -> Result<()> {
    let src_padded = pad_key(src);
    let dst_padded = pad_key(dst);
    // Lock order (rename_lock, then WAL) matches `apply_batch`.
    let _rename_guard = self.rename_lock.lock().unwrap();
    let mut wal_guard = self.wal.as_ref().map(|wal| wal.lock().unwrap());
    let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);

    let Some(value) = engine::lookup_multi(&self.backend, &self.root_pin, &src_padded)? else {
        return Err(Error::NotFound);
    };
    if src == dst {
        return Ok(());
    }
    if !force && engine::lookup_multi(&self.backend, &self.root_pin, &dst_padded)?.is_some() {
        return Err(Error::DstExists);
    }

    engine::erase_multi(&self.backend, &self.root_pin, &src_padded, seq)?;
    engine::insert_multi(&self.backend, &self.root_pin, &dst_padded, &value, seq)?;
    self.backend.mark_dirty(self.root_guid, seq);

    if let Some(w) = wal_guard.as_mut() {
        w.append_rename_object(seq, 0, src, dst, force)?;
        if self.cfg.wal_sync_on_commit {
            w.flush()?;
        }
    } else if self.cfg.memory_flush_on_write {
        self.flush_dirty_inline()?;
        self.flush_pending_deletes_inline()?;
    }
    Ok(())
}
/// Collects operations via `build` into a `TxnBatch` and applies them as one
/// batch.
///
/// If `build` queues nothing, this returns `Ok(())` without consuming any
/// sequence numbers or touching the WAL.
///
/// # Errors
/// Propagates any failure from applying the batch.
pub fn txn<F>(&self, build: F) -> Result<()>
where
    F: FnOnce(&mut TxnBatch),
{
    let mut batch = TxnBatch::default();
    build(&mut batch);
    if batch.pending.is_empty() {
        Ok(())
    } else {
        self.apply_batch(batch.pending)
    }
}
fn apply_batch(&self, pending: Vec<BatchOp>) -> Result<()> {
let count = pending.len() as u64;
let _r = self.rename_lock.lock().unwrap();
let base_seq = self.next_seq.fetch_add(count, Ordering::SeqCst);
if let Some(wal) = &self.wal {
let mut w = wal.lock().unwrap();
let mut wal_ops: Vec<TxnOp> = Vec::with_capacity(pending.len());
for (i, op) in pending.into_iter().enumerate() {
let seq = base_seq + i as u64;
match op {
BatchOp::Put { key, value } => {
wal_ops.push(self.apply_put_inner(&key, &value, seq)?);
}
BatchOp::Delete { key } => {
if let Some(entry) = self.apply_delete_inner(&key, seq)? {
wal_ops.push(entry);
}
}
BatchOp::Rename { src, dst, force } => {
wal_ops.push(self.apply_rename_inner(&src, &dst, force, seq)?);
}
}
}
let envelope = TxnOp::Batch {
tree_id: 0,
ops: wal_ops,
};
w.append(&envelope, base_seq)?;
if self.cfg.wal_sync_on_commit {
w.flush()?;
}
} else {
for (i, op) in pending.into_iter().enumerate() {
let seq = base_seq + i as u64;
match op {
BatchOp::Put { key, value } => {
let _ = self.apply_put_inner(&key, &value, seq)?;
}
BatchOp::Delete { key } => {
let _ = self.apply_delete_inner(&key, seq)?;
}
BatchOp::Rename { src, dst, force } => {
let _ = self.apply_rename_inner(&src, &dst, force, seq)?;
}
}
}
if self.cfg.memory_flush_on_write {
self.flush_dirty_inline()?;
self.flush_pending_deletes_inline()?;
}
}
Ok(())
}
/// Starts a range query over this tree.
///
/// The returned builder holds its own `Arc` to the buffer manager together
/// with the root blob id.
pub fn range(&self) -> RangeBuilder {
    let backend = Arc::clone(&self.backend);
    RangeBuilder::new(backend, self.root_guid)
}
/// Shorthand for a range query restricted to keys starting with `prefix`;
/// equivalent to `self.range().prefix(prefix)`.
pub fn scan_prefix(&self, prefix: &[u8]) -> RangeBuilder {
    let builder = self.range();
    builder.prefix(prefix)
}
fn flush_dirty_inline(&self) -> Result<()> {
let snap = self.backend.snapshot_dirty();
let mut failed: std::collections::HashMap<BlobGuid, u64> =
std::collections::HashMap::new();
let mut first_err: Option<Error> = None;
for (guid, expected_seq) in snap {
if let Some(bytes) = self.backend.snapshot_bytes(guid) {
if let Err(e) = self.backend.write_through(guid, &bytes, expected_seq) {
failed.insert(guid, expected_seq);
if first_err.is_none() {
first_err = Some(e);
}
}
}
}
if !failed.is_empty() {
self.backend.restore_dirty(failed);
}
if let Some(e) = first_err {
return Err(e);
}
Ok(())
}
fn flush_pending_deletes_inline(&self) -> Result<()> {
let pending = self.backend.snapshot_pending_deletes();
let mut failed: std::collections::HashMap<BlobGuid, u64> =
std::collections::HashMap::new();
let mut first_err: Option<Error> = None;
for (guid, seq) in pending {
if let Err(e) = self.backend.execute_pending_delete(guid) {
failed.insert(guid, seq);
if first_err.is_none() {
first_err = Some(e);
}
}
}
if !failed.is_empty() {
self.backend.restore_pending_deletes(failed);
}
if let Some(e) = first_err {
return Err(e);
}
Ok(())
}
/// Inserts `value` under `key` at `seq`, marks the root dirty, and returns
/// the matching `TxnOp::Insert` record (carrying the overwritten value, if
/// any).
///
/// # Errors
/// Propagates engine insert failures.
fn apply_put_inner(&self, key: &[u8], value: &[u8], seq: u64) -> Result<TxnOp> {
    let prev_value =
        engine::insert_multi(&self.backend, &self.root_pin, &pad_key(key), value, seq)?.previous;
    self.backend.mark_dirty(self.root_guid, seq);
    let record = TxnOp::Insert {
        tree_id: 0,
        seq,
        key: key.to_vec(),
        value: value.to_vec(),
        prev_value,
    };
    Ok(record)
}
/// Erases `key` at `seq`.
///
/// If something was removed, marks the root dirty and returns a
/// `TxnOp::Erase` record holding the removed value. Otherwise returns `None`
/// and leaves the dirty set alone.
///
/// # Errors
/// Propagates engine erase failures.
fn apply_delete_inner(&self, key: &[u8], seq: u64) -> Result<Option<TxnOp>> {
    let removed =
        engine::erase_multi(&self.backend, &self.root_pin, &pad_key(key), seq)?.previous;
    let Some(prev) = removed else {
        return Ok(None);
    };
    self.backend.mark_dirty(self.root_guid, seq);
    Ok(Some(TxnOp::Erase {
        tree_id: 0,
        seq,
        key: key.to_vec(),
        value: prev,
    }))
}
fn apply_rename_inner(&self, src: &[u8], dst: &[u8], force: bool, seq: u64) -> Result<TxnOp> {
let src_padded = pad_key(src);
let dst_padded = pad_key(dst);
let Some(value) = engine::lookup_multi(&self.backend, &self.root_pin, &src_padded)? else {
return Err(Error::NotFound);
};
if src != dst {
if !force && engine::lookup_multi(&self.backend, &self.root_pin, &dst_padded)?.is_some()
{
return Err(Error::DstExists);
}
engine::erase_multi(&self.backend, &self.root_pin, &src_padded, seq)?;
engine::insert_multi(&self.backend, &self.root_pin, &dst_padded, &value, seq)?;
self.backend.mark_dirty(self.root_guid, seq);
}
Ok(TxnOp::RenameObject {
tree_id: 0,
seq,
src_key: src.to_vec(),
dst_key: dst.to_vec(),
force,
})
}
/// Writes all dirty blobs, executes all pending blob deletes, and truncates
/// the WAL once nothing is left dirty or pending.
///
/// Phases, in order:
/// 1. Snapshot the dirty set and the pending-delete set. With a WAL attached,
///    this happens under the WAL mutex and the WAL is flushed before the lock
///    is released. If that flush fails, both snapshots are restored.
/// 2. Write each snapshotted dirty blob through to the backend. Failed writes
///    are restored into the dirty set.
/// 3. Flush the backend. Pending deletes are attempted only if this flush and
///    every phase-2 write succeeded; otherwise the snapshot is restored.
/// 4. Execute the pending deletes and restore the ones that failed. If at
///    least one delete was applied, flush again; if that flush fails, the
///    applied deletes are restored as well.
/// 5. With a WAL attached, truncate it (under the WAL mutex), but only if the
///    buffer manager now reports zero dirty blobs and zero pending deletes.
///
/// # Errors
/// Returns the error of the first phase that fails. The WAL is never
/// truncated on any error path.
pub fn checkpoint(&self) -> Result<()> {
use std::collections::HashMap;
// Phase 1. NOTE(review): flushing the WAL before any blob write presumably
// keeps the log ahead of the data it describes (write-ahead ordering).
let (snap_dirty, snap_pending) = if let Some(wal) = &self.wal {
let mut w = wal.lock().unwrap();
let snap_dirty = self.backend.snapshot_dirty();
let snap_pending = self.backend.snapshot_pending_deletes();
if let Err(e) = w.flush() {
self.backend.restore_dirty(snap_dirty);
self.backend.restore_pending_deletes(snap_pending);
return Err(e);
}
(snap_dirty, snap_pending)
} else {
(
self.backend.snapshot_dirty(),
self.backend.snapshot_pending_deletes(),
)
};
// Phase 2: write dirty blobs through, remembering failures to restore.
let mut dirty_failed: HashMap<BlobGuid, u64> = HashMap::new();
let mut first_dirty_err: Option<Error> = None;
for (guid, expected_seq) in &snap_dirty {
// NOTE(review): when `snapshot_bytes` returns None the entry is neither
// written nor restored; presumably the blob no longer exists — confirm.
if let Some(bytes) = self.backend.snapshot_bytes(*guid) {
if let Err(e) = self.backend.write_through(*guid, &bytes, *expected_seq) {
dirty_failed.insert(*guid, *expected_seq);
if first_dirty_err.is_none() {
first_dirty_err = Some(e);
}
}
}
}
let had_dirty_failure = !dirty_failed.is_empty();
if had_dirty_failure {
self.backend.restore_dirty(dirty_failed);
}
// Phase 3: make the phase-2 writes durable before deleting anything.
// NOTE(review): if this flush fails, blobs whose write_through succeeded
// are not put back into the dirty set; confirm write_through alone is
// durable enough, otherwise a later checkpoint could truncate the WAL early.
if let Err(e) = self.backend.flush() {
self.backend.restore_pending_deletes(snap_pending);
return Err(e);
}
// Any failed dirty write: skip deletes entirely and keep them pending.
if had_dirty_failure {
self.backend.restore_pending_deletes(snap_pending);
return Err(first_dirty_err.expect("had_dirty_failure ⇒ first_dirty_err set"));
}
// Phase 4: execute pending deletes; failures go back into the pending set.
let mut pending_failed: HashMap<BlobGuid, u64> = HashMap::new();
let mut first_pending_err: Option<Error> = None;
for (guid, seq) in &snap_pending {
if let Err(e) = self.backend.execute_pending_delete(*guid) {
pending_failed.insert(*guid, *seq);
if first_pending_err.is_none() {
first_pending_err = Some(e);
}
}
}
if !pending_failed.is_empty() {
self.backend.restore_pending_deletes(pending_failed.clone());
}
// `pending_failed` keys are a subset of `snap_pending`, so this cannot underflow.
let applied_deletes = snap_pending.len() - pending_failed.len();
if applied_deletes > 0 {
// Flush the applied deletes; on failure, re-queue exactly those.
if let Err(e) = self.backend.flush() {
let restore_applied: HashMap<BlobGuid, u64> = snap_pending
.iter()
.filter(|(g, _)| !pending_failed.contains_key(*g))
.map(|(g, s)| (*g, *s))
.collect();
self.backend.restore_pending_deletes(restore_applied);
return Err(e);
}
}
if let Some(e) = first_pending_err {
return Err(e);
}
// Phase 5: truncate only if nothing (including concurrent writers'
// changes made since phase 1) is still dirty or pending.
if let Some(wal) = &self.wal {
let mut w = wal.lock().unwrap();
if self.backend.dirty_count() == 0 && self.backend.pending_delete_count() == 0 {
w.truncate()?;
}
}
Ok(())
}
pub fn stats(&self) -> Result<TreeStats> {
let guids = engine::collect_blob_guids_silent(&self.backend, self.root_guid)?;
let mut blobs: Vec<BlobStats> = Vec::with_capacity(guids.len());
let mut total_space_used: u64 = 0;
let mut total_gap_space: u64 = 0;
let mut total_slots: u64 = 0;
let mut total_compactions: u64 = 0;
let mut total_tombstones: u64 = 0;
for guid in &guids {
let pin = self.backend.pin_silent(*guid)?;
let guard = pin.read();
let frame = BlobFrameRef::wrap(guard.as_slice());
let h = frame.header();
let s = BlobStats {
guid: *guid,
space_used: h.space_used,
gap_space: h.gap_space,
num_slots: h.num_slots,
num_ext_blobs: h.num_ext_blobs,
compact_times: h.compact_times,
tombstone_leaf_cnt: h.tombstone_leaf_cnt,
};
total_space_used += u64::from(s.space_used);
total_gap_space += u64::from(s.gap_space);
total_slots += u64::from(s.num_slots);
total_compactions += u64::from(s.compact_times);
total_tombstones += u64::from(s.tombstone_leaf_cnt);
blobs.push(s);
}
let bm_dirty_count = self.backend.dirty_count();
let bm_pending_delete_count = self.backend.pending_delete_count();
let bm_cache_hits = self.backend.cache_hits();
let bm_cache_misses = self.backend.cache_misses();
let bm_optimistic_restarts = self.backend.optimistic_restarts();
let checkpointer = self.checkpointer.as_ref().map(|ck| CheckpointerStats {
rounds_attempted: ck.rounds_attempted(),
rounds_succeeded: ck.rounds_succeeded(),
blobs_flushed: ck.blobs_flushed(),
merges_total: ck.merges_total(),
truncates: ck.truncates(),
evictions: ck.evictions(),
});
Ok(TreeStats {
blob_count: blobs.len() as u32,
total_space_used,
total_gap_space,
total_slots,
total_compactions,
total_tombstones,
blobs,
bm_dirty_count,
bm_pending_delete_count,
bm_cache_hits,
bm_cache_misses,
bm_optimistic_restarts,
checkpointer,
})
}
/// Compacts the tree in three passes, marking touched blobs dirty at
/// `STRUCTURAL_SEQ`.
///
/// 1. Compact every reachable blob in place and mark each one dirty.
/// 2. Refresh the blob-node pointers from the root.
/// 3. Re-enumerate the blobs and, for each one that still exists, try to merge
///    its children; mark it dirty if any merge happened.
///
/// # Errors
/// Propagates enumeration, pinning, compaction, refresh, or merge failures.
pub fn compact(&self) -> Result<()> {
    use crate::store::buffer_manager::STRUCTURAL_SEQ;

    // Pass 1: in-place compaction of every blob.
    for blob in engine::collect_blob_guids(&self.backend, self.root_guid)? {
        let handle = self.backend.pin(blob)?;
        {
            let mut writer = handle.write();
            engine::compact_blob(&mut writer)?;
        }
        drop(handle);
        self.backend.mark_dirty(blob, STRUCTURAL_SEQ);
    }

    // Pass 2: fix up node pointers after compaction.
    engine::refresh_blob_node_pointers(&self.backend, self.root_guid)?;

    // Pass 3: merge children where possible.
    for parent in engine::collect_blob_guids(&self.backend, self.root_guid)? {
        // Skip blobs that no longer exist (presumably removed by an earlier
        // merge in this pass).
        if !self.backend.has_blob(parent)? {
            continue;
        }
        let handle = self.backend.pin(parent)?;
        let report = {
            let mut writer = handle.write();
            let mut frame = BlobFrame::wrap(writer.as_mut_slice());
            engine::try_merge_children(&self.backend, &mut frame, STRUCTURAL_SEQ)?
        };
        drop(handle);
        if report.merged > 0 {
            self.backend.mark_dirty(parent, STRUCTURAL_SEQ);
        }
    }
    Ok(())
}
#[must_use]
pub fn config(&self) -> &TreeConfig {
&self.cfg
}
#[must_use]
pub const fn page_size() -> u32 {
PAGE_SIZE
}
}
fn replay_wal(path: &std::path::Path, bm: &Arc<BufferManager>, root_guid: BlobGuid) -> Result<u64> {
let root_pin = bm.pin(root_guid)?;
let mut highest = 0u64;
let _ = replay(path, |op, seq, _off| {
highest = highest.max(seq);
let touched_root = match op {
TxnOp::Insert { key, value, .. } => {
let padded = pad_key(key);
engine::insert_multi(bm, &root_pin, &padded, value, seq)?;
true
}
TxnOp::Erase { key, .. } => {
let padded = pad_key(key);
let outcome = engine::erase_multi(bm, &root_pin, &padded, seq)?;
outcome.previous.is_some()
}
TxnOp::RenameObject {
src_key,
dst_key,
force,
..
} => {
let src_padded = pad_key(src_key);
let dst_padded = pad_key(dst_key);
if engine::lookup_multi(bm, &root_pin, &src_padded)?.is_none() {
return Ok(());
}
if !force && engine::lookup_multi(bm, &root_pin, &dst_padded)?.is_some() {
return Ok(());
}
let value = engine::lookup_multi(bm, &root_pin, &src_padded)?.unwrap_or_default();
engine::erase_multi(bm, &root_pin, &src_padded, seq)?;
engine::insert_multi(bm, &root_pin, &dst_padded, &value, seq)?;
true
}
TxnOp::Split { .. }
| TxnOp::Merge { .. }
| TxnOp::Compact { .. }
| TxnOp::Rename { .. }
| TxnOp::NewTree { .. }
| TxnOp::RmTree { .. }
| TxnOp::MemMarker { .. }
| TxnOp::Batch { .. } => false,
};
if touched_root {
bm.mark_dirty(root_guid, seq);
}
Ok(())
})?;
Ok(highest + 1)
}