use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use super::config::{Storage, TreeConfig};
use super::errors::{Error, Result};
use crate::engine;
use crate::engine::RangeBuilder;
use crate::journal::reader::replay;
use crate::journal::txn_op::TxnOp;
use crate::journal::writer::WalWriter;
use crate::layout::{BlobGuid, PAGE_SIZE};
use crate::store::backend::{AlignedBlobBuf, Backend, MemoryBackend, PersistentBackend};
use crate::store::{BlobFrame, BlobFrameRef, BufferManager};
use super::txn::{BatchOp, TxnBatch};
/// Handle to a key/value tree whose nodes live in blobs managed by a [`BufferManager`].
///
/// Clones share the buffer manager, the sequence counter, the rename lock and the WAL writer
/// through `Arc`s, so every clone reads and mutates the same tree.
#[derive(Clone)]
pub struct Tree {
    /// Configuration the tree was opened with.
    cfg: TreeConfig,
    /// GUID of the tree's root blob.
    root_guid: BlobGuid,
    /// Buffer manager through which all blob reads and writes go.
    backend: Arc<BufferManager>,
    /// Next sequence number to hand out to a mutation.
    next_seq: Arc<AtomicU64>,
    /// Serializes renames and batches, whose check-then-modify steps must not interleave.
    rename_lock: Arc<Mutex<()>>,
    /// Write-ahead log writer; `None` when no WAL is attached.
    wal: Option<Arc<Mutex<WalWriter>>>,
}
impl std::fmt::Debug for Tree {
    /// Prints the storage mode and root blob GUID; all other fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { cfg, root_guid, .. } = self;
        let mut out = f.debug_struct("Tree");
        out.field("storage", &cfg.storage);
        out.field("root_guid", root_guid);
        out.finish_non_exhaustive()
    }
}
/// Fixed GUID used for a tree's root blob: all sixteen elements are zero.
pub(crate) const ROOT_BLOB_GUID: BlobGuid = [0x00; 16];
/// Header counters of a single blob, as collected by [`Tree::stats`].
#[derive(Clone, Copy, Debug)]
pub struct BlobStats {
    /// GUID of the blob these counters were read from.
    pub guid: BlobGuid,
    /// Copy of the blob header's `space_used` field.
    pub space_used: u32,
    /// Copy of the blob header's `gap_space` field.
    pub gap_space: u32,
    /// Copy of the blob header's `num_slots` field.
    pub num_slots: u16,
    /// Copy of the blob header's `num_ext_blobs` field.
    pub num_ext_blobs: u16,
    /// Copy of the blob header's `compact_times` field.
    pub compact_times: u32,
    /// Copy of the blob header's `tombstone_leaf_cnt` field.
    pub tombstone_leaf_cnt: u32,
}
/// Tree-wide totals plus the per-blob detail they were summed from, as returned by
/// [`Tree::stats`].
#[derive(Clone, Debug)]
pub struct TreeStats {
    /// Number of blobs visited (the length of `blobs`).
    pub blob_count: u32,
    /// Sum of `space_used` over all blobs.
    pub total_space_used: u64,
    /// Sum of `gap_space` over all blobs.
    pub total_gap_space: u64,
    /// Sum of `num_slots` over all blobs.
    pub total_slots: u64,
    /// Sum of `compact_times` over all blobs.
    pub total_compactions: u64,
    /// Sum of `tombstone_leaf_cnt` over all blobs.
    pub total_tombstones: u64,
    /// Per-blob counters, in the order the blobs were visited.
    pub blobs: Vec<BlobStats>,
}
/// Returns a copy of `key` with one `0` byte appended.
///
/// This is the form in which keys are passed to the engine. WAL records carry the unpadded
/// key, so replay pads each key again before applying it.
#[inline]
fn pad_key(key: &[u8]) -> Vec<u8> {
    key.iter().copied().chain(std::iter::once(0)).collect()
}
impl Tree {
    /// Opens a tree on the backend selected by `cfg.storage`.
    ///
    /// A write-ahead log is attached whenever `cfg.wal_path()` returns a path. If that file
    /// already exists, its records are replayed into the tree before the tree is returned.
    ///
    /// # Errors
    ///
    /// Propagates failures from opening the persistent backend, creating the root blob,
    /// replaying the WAL, or opening the WAL writer.
    pub fn open(cfg: TreeConfig) -> Result<Self> {
        let backend: Arc<dyn Backend> = match &cfg.storage {
            Storage::Memory => Arc::new(MemoryBackend::new()),
            Storage::Persistent { dir } => Arc::new(PersistentBackend::open(dir)?),
        };
        Self::open_inner(cfg, backend, true)
    }

    /// Opens a tree on a caller-supplied backend.
    ///
    /// No write-ahead log is attached or replayed, regardless of `cfg.wal_path()`.
    ///
    /// # Errors
    ///
    /// Propagates failures from checking for, creating, or flushing the root blob.
    pub fn open_with_backend(cfg: TreeConfig, backend: Arc<dyn Backend>) -> Result<Self> {
        Self::open_inner(cfg, backend, false)
    }

    /// Shared constructor behind [`Tree::open`] and [`Tree::open_with_backend`].
    ///
    /// Wraps `backend` in a [`BufferManager`]. If the backend has no root blob yet, it writes
    /// and flushes an initialised one. When `attach_wal` is set and the config names a WAL
    /// path, it replays any existing log and then opens the log for appending.
    fn open_inner(cfg: TreeConfig, backend: Arc<dyn Backend>, attach_wal: bool) -> Result<Self> {
        let bm: Arc<BufferManager> = Arc::new(BufferManager::new(backend, cfg.buffer_pool_size));
        let root_guid = ROOT_BLOB_GUID;
        if !bm.has_blob(root_guid)? {
            let mut scratch = AlignedBlobBuf::zeroed();
            BlobFrame::init(scratch.as_mut_slice(), root_guid)?;
            bm.write_blob(root_guid, &scratch)?;
            bm.flush()?;
        }
        let (wal, next_seq) = if attach_wal {
            match cfg.wal_path() {
                None => (None, 1u64),
                Some(path) => {
                    // Existing records are re-applied first; numbering resumes one past the
                    // highest sequence number found in the log.
                    let next_seq = if path.exists() {
                        replay_wal(&path, &bm, root_guid)?
                    } else {
                        1
                    };
                    let writer = WalWriter::open_or_create(&path, 0)?;
                    (Some(Arc::new(Mutex::new(writer))), next_seq)
                }
            }
        } else {
            (None, 1u64)
        };
        Ok(Self {
            cfg,
            backend: bm,
            root_guid,
            rename_lock: Arc::new(Mutex::new(())),
            next_seq: Arc::new(AtomicU64::new(next_seq)),
            wal,
        })
    }

    /// Returns the value stored under `key`, or `None` if the key is absent.
    ///
    /// # Errors
    ///
    /// Propagates engine lookup failures.
    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let padded = pad_key(key);
        engine::lookup_multi(&self.backend, self.root_guid, &padded)
    }

    /// Stores `value` under `key`, returning the value it replaced, if any.
    ///
    /// Every call consumes one sequence number. With a WAL attached, the insert is logged, and
    /// the log is flushed when `cfg.wal_sync_on_commit` is set. Without a WAL, the root blob is
    /// committed when `cfg.flush_on_write` is set.
    ///
    /// # Errors
    ///
    /// Propagates engine, WAL and commit failures. If logging fails after the engine insert
    /// succeeded, the insert stays visible in the tree.
    ///
    /// # Panics
    ///
    /// Panics if the WAL mutex is poisoned.
    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
        let padded = pad_key(key);
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        let outcome = engine::insert_multi(&self.backend, self.root_guid, &padded, value, seq)?;
        if let Some(wal) = &self.wal {
            let mut w = wal.lock().unwrap();
            // The WAL stores the unpadded key; replay pads it again.
            w.append_insert(seq, 0, key, value, outcome.previous.as_deref())?;
            if self.cfg.wal_sync_on_commit {
                w.flush()?;
            }
        } else if self.cfg.flush_on_write {
            self.backend.commit(self.root_guid)?;
        }
        Ok(outcome.previous)
    }

    /// Removes `key`, returning the value it held, if any.
    ///
    /// With a WAL attached, only a delete that actually removed a value is logged, and only
    /// such a delete consumes a sequence number. Without a WAL, the root blob is committed when
    /// `cfg.flush_on_write` is set, even if nothing was removed.
    ///
    /// # Errors
    ///
    /// Propagates engine, WAL and commit failures.
    ///
    /// # Panics
    ///
    /// Panics if the WAL mutex is poisoned.
    pub fn delete(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let padded = pad_key(key);
        let outcome = engine::erase_multi(&self.backend, self.root_guid, &padded)?;
        if let Some(wal) = &self.wal {
            if let Some(prev) = &outcome.previous {
                let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
                let mut w = wal.lock().unwrap();
                w.append_erase(seq, 0, key, prev)?;
                if self.cfg.wal_sync_on_commit {
                    w.flush()?;
                }
            }
        } else if self.cfg.flush_on_write {
            self.backend.commit(self.root_guid)?;
        }
        Ok(outcome.previous)
    }

    /// Moves the value stored under `src` to `dst`.
    ///
    /// Renames are serialized with each other and with batches by `rename_lock`. Renaming a
    /// key onto itself succeeds without changing anything, once `src` is confirmed to exist.
    /// With `force`, an existing value at `dst` is overwritten.
    ///
    /// # Errors
    ///
    /// - [`Error::NotFound`] if `src` has no value.
    /// - [`Error::DstExists`] if `dst` already has a value and `force` is false.
    /// - Engine, WAL and commit failures are propagated.
    ///
    /// # Panics
    ///
    /// Panics if the rename lock or the WAL mutex is poisoned.
    pub fn rename(&self, src: &[u8], dst: &[u8], force: bool) -> Result<()> {
        // Take the lock before anything else, as `apply_batch` does. This keeps the checks
        // below and the mutation atomic with respect to other renames and batches.
        let _rename_guard = self.rename_lock.lock().unwrap();
        let src_padded = pad_key(src);
        let Some(value) = engine::lookup_multi(&self.backend, self.root_guid, &src_padded)? else {
            return Err(Error::NotFound);
        };
        if src == dst {
            return Ok(());
        }
        let dst_padded = pad_key(dst);
        if !force && engine::lookup_multi(&self.backend, self.root_guid, &dst_padded)?.is_some() {
            return Err(Error::DstExists);
        }
        // Allocate the sequence number only once the rename will proceed, and while holding
        // the lock. Rejected renames then do not burn a number, and renames are stamped in the
        // order they are applied.
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        engine::erase_multi(&self.backend, self.root_guid, &src_padded)?;
        engine::insert_multi(&self.backend, self.root_guid, &dst_padded, &value, seq)?;
        if let Some(wal) = &self.wal {
            let mut w = wal.lock().unwrap();
            w.append_rename_object(seq, 0, src, dst, force)?;
            if self.cfg.wal_sync_on_commit {
                w.flush()?;
            }
        } else if self.cfg.flush_on_write {
            self.backend.commit(self.root_guid)?;
        }
        Ok(())
    }

    /// Collects operations through `build` and applies them as a single batch.
    ///
    /// An empty batch is a no-op and touches neither the WAL nor the backend.
    ///
    /// # Errors
    ///
    /// See [`Tree::apply_batch`].
    pub fn txn<F>(&self, build: F) -> Result<()>
    where
        F: FnOnce(&mut TxnBatch),
    {
        let mut batch = TxnBatch::default();
        build(&mut batch);
        if batch.pending.is_empty() {
            return Ok(());
        }
        self.apply_batch(batch.pending)
    }

    /// Applies `pending` in order under `rename_lock`, then logs it as one `TxnOp::Batch`.
    ///
    /// The batch reserves `pending.len()` consecutive sequence numbers, and operation `i` uses
    /// `base_seq + i`. Deletes of missing keys are applied but not included in the logged
    /// envelope.
    ///
    /// NOTE(review): if an operation fails, the operations before it remain applied to the
    /// tree but nothing is written to the WAL for them. Confirm whether callers expect
    /// all-or-nothing batches.
    fn apply_batch(&self, pending: Vec<BatchOp>) -> Result<()> {
        let count = pending.len() as u64;
        let _r = self.rename_lock.lock().unwrap();
        let base_seq = self.next_seq.fetch_add(count, Ordering::SeqCst);
        let mut wal_ops: Vec<TxnOp> = Vec::with_capacity(pending.len());
        for (i, op) in pending.into_iter().enumerate() {
            let seq = base_seq + i as u64;
            match op {
                BatchOp::Put { key, value } => {
                    let entry = self.apply_put_inner(&key, &value, seq)?;
                    wal_ops.push(entry);
                }
                BatchOp::Delete { key } => {
                    if let Some(entry) = self.apply_delete_inner(&key, seq)? {
                        wal_ops.push(entry);
                    }
                }
                BatchOp::Rename { src, dst, force } => {
                    let entry = self.apply_rename_inner(&src, &dst, force, seq)?;
                    wal_ops.push(entry);
                }
            }
        }
        if let Some(wal) = &self.wal {
            let mut w = wal.lock().unwrap();
            let envelope = TxnOp::Batch {
                tree_id: 0,
                ops: wal_ops,
            };
            w.append(&envelope, base_seq)?;
            if self.cfg.wal_sync_on_commit {
                w.flush()?;
            }
        } else if self.cfg.flush_on_write {
            self.backend.commit(self.root_guid)?;
        }
        Ok(())
    }

    /// Returns a builder for range scans over this tree.
    pub fn range(&self) -> RangeBuilder {
        RangeBuilder::new(Arc::clone(&self.backend), self.root_guid)
    }

    /// Batch step for a put: inserts `value` under `key` and returns the matching WAL record.
    fn apply_put_inner(&self, key: &[u8], value: &[u8], seq: u64) -> Result<TxnOp> {
        let padded = pad_key(key);
        let outcome = engine::insert_multi(&self.backend, self.root_guid, &padded, value, seq)?;
        Ok(TxnOp::Insert {
            tree_id: 0,
            seq,
            key: key.to_vec(),
            value: value.to_vec(),
            prev_value: outcome.previous,
        })
    }

    /// Batch step for a delete: removes `key` and returns a WAL record only if a value was
    /// actually removed.
    fn apply_delete_inner(&self, key: &[u8], seq: u64) -> Result<Option<TxnOp>> {
        let padded = pad_key(key);
        let outcome = engine::erase_multi(&self.backend, self.root_guid, &padded)?;
        Ok(outcome.previous.map(|prev| TxnOp::Erase {
            tree_id: 0,
            seq,
            key: key.to_vec(),
            value: prev,
        }))
    }

    /// Batch step for a rename, with the same checks and errors as [`Tree::rename`].
    ///
    /// Unlike `rename`, a rename of a key onto itself still produces a `RenameObject` record.
    fn apply_rename_inner(&self, src: &[u8], dst: &[u8], force: bool, seq: u64) -> Result<TxnOp> {
        let src_padded = pad_key(src);
        let dst_padded = pad_key(dst);
        let Some(value) = engine::lookup_multi(&self.backend, self.root_guid, &src_padded)? else {
            return Err(Error::NotFound);
        };
        if src != dst {
            if !force && engine::lookup_multi(&self.backend, self.root_guid, &dst_padded)?.is_some()
            {
                return Err(Error::DstExists);
            }
            engine::erase_multi(&self.backend, self.root_guid, &src_padded)?;
            engine::insert_multi(&self.backend, self.root_guid, &dst_padded, &value, seq)?;
        }
        Ok(TxnOp::RenameObject {
            tree_id: 0,
            seq,
            src_key: src.to_vec(),
            dst_key: dst.to_vec(),
            force,
        })
    }

    /// Flushes the WAL, commits and flushes the backend, then truncates the WAL.
    ///
    /// # Errors
    ///
    /// Propagates WAL flush/truncate and backend commit/flush failures. On failure, the WAL is
    /// not truncated.
    ///
    /// # Panics
    ///
    /// Panics if the WAL mutex is poisoned.
    pub fn checkpoint(&self) -> Result<()> {
        // Hold the WAL lock for the whole checkpoint. Writers mutate the tree first and take
        // this lock afterwards to append their record. If the lock were released between the
        // backend flush and the truncate, a record for a change made after the flush could be
        // appended in that window and then truncated away, losing the change on recovery.
        let mut wal_guard = self.wal.as_ref().map(|wal| wal.lock().unwrap());
        if let Some(w) = wal_guard.as_mut() {
            w.flush()?;
        }
        self.backend.commit(self.root_guid)?;
        self.backend.flush()?;
        if let Some(w) = wal_guard.as_mut() {
            w.truncate()?;
        }
        Ok(())
    }

    /// Reads the header counters of every blob reachable from the root and sums them.
    ///
    /// # Errors
    ///
    /// Propagates failures from collecting blob GUIDs or pinning a blob.
    pub fn stats(&self) -> Result<TreeStats> {
        let guids = engine::collect_blob_guids(&self.backend, self.root_guid)?;
        let mut blobs: Vec<BlobStats> = Vec::with_capacity(guids.len());
        let mut total_space_used: u64 = 0;
        let mut total_gap_space: u64 = 0;
        let mut total_slots: u64 = 0;
        let mut total_compactions: u64 = 0;
        let mut total_tombstones: u64 = 0;
        for guid in &guids {
            let pin = self.backend.pin(*guid)?;
            let guard = pin.read();
            let frame = BlobFrameRef::wrap(guard.as_slice());
            let h = frame.header();
            let s = BlobStats {
                guid: *guid,
                space_used: h.space_used,
                gap_space: h.gap_space,
                num_slots: h.num_slots,
                num_ext_blobs: h.num_ext_blobs,
                compact_times: h.compact_times,
                tombstone_leaf_cnt: h.tombstone_leaf_cnt,
            };
            total_space_used += u64::from(s.space_used);
            total_gap_space += u64::from(s.gap_space);
            total_slots += u64::from(s.num_slots);
            total_compactions += u64::from(s.compact_times);
            total_tombstones += u64::from(s.tombstone_leaf_cnt);
            blobs.push(s);
        }
        Ok(TreeStats {
            blob_count: blobs.len() as u32,
            total_space_used,
            total_gap_space,
            total_slots,
            total_compactions,
            total_tombstones,
            blobs,
        })
    }

    /// Compacts every blob reachable from the root, then tries to merge children into parents.
    ///
    /// Each blob is committed after it is compacted. Node pointers are then refreshed and the
    /// blob list is collected again. Blobs no longer present in the backend are skipped, and a
    /// parent is committed only if at least one child was merged into it.
    ///
    /// # Errors
    ///
    /// Propagates engine, pin and commit failures.
    pub fn compact(&self) -> Result<()> {
        let guids = engine::collect_blob_guids(&self.backend, self.root_guid)?;
        for guid in &guids {
            let pin = self.backend.pin(*guid)?;
            {
                let mut guard = pin.write();
                engine::compact_blob(&mut guard)?;
            }
            // Release the pin before committing the blob it refers to.
            drop(pin);
            self.backend.commit(*guid)?;
        }
        engine::refresh_blob_node_pointers(&self.backend, self.root_guid)?;
        let parents = engine::collect_blob_guids(&self.backend, self.root_guid)?;
        for guid in parents {
            // An earlier merge in this loop may already have removed this blob.
            if !self.backend.has_blob(guid)? {
                continue;
            }
            let pin = self.backend.pin(guid)?;
            let merged = {
                let mut guard = pin.write();
                let mut frame = BlobFrame::wrap(guard.as_mut_slice());
                engine::try_merge_children(&self.backend, &mut frame)?
            };
            drop(pin);
            if merged.merged > 0 {
                self.backend.commit(guid)?;
            }
        }
        Ok(())
    }

    /// Returns the configuration this tree was opened with.
    #[must_use]
    pub fn config(&self) -> &TreeConfig {
        &self.cfg
    }

    /// Returns the storage page size, [`PAGE_SIZE`].
    #[must_use]
    pub const fn page_size() -> u32 {
        PAGE_SIZE
    }
}
fn replay_wal(path: &std::path::Path, bm: &Arc<BufferManager>, root_guid: BlobGuid) -> Result<u64> {
let mut highest = 0u64;
let _ = replay(path, |op, seq, _off| {
match op {
TxnOp::Insert { key, value, .. } => {
let padded = pad_key(key);
engine::insert_multi(bm, root_guid, &padded, value, seq)?;
}
TxnOp::Erase { key, .. } => {
let padded = pad_key(key);
engine::erase_multi(bm, root_guid, &padded)?;
}
TxnOp::RenameObject {
src_key,
dst_key,
force,
..
} => {
let src_padded = pad_key(src_key);
let dst_padded = pad_key(dst_key);
if engine::lookup_multi(bm, root_guid, &src_padded)?.is_none() {
return Ok(());
}
if !force && engine::lookup_multi(bm, root_guid, &dst_padded)?.is_some() {
return Ok(());
}
let value = engine::lookup_multi(bm, root_guid, &src_padded)?.unwrap_or_default();
engine::erase_multi(bm, root_guid, &src_padded)?;
engine::insert_multi(bm, root_guid, &dst_padded, &value, seq)?;
}
TxnOp::Split { .. }
| TxnOp::Merge { .. }
| TxnOp::Compact { .. }
| TxnOp::Rename { .. }
| TxnOp::NewTree { .. }
| TxnOp::RmTree { .. }
| TxnOp::MemMarker { .. }
| TxnOp::Batch { .. } => {}
}
highest = highest.max(seq);
Ok(())
})?;
Ok(highest + 1)
}