pub mod ingest;
pub mod inner;
pub mod sealed;
use crate::{
AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
ValueType,
compaction::{CompactionStrategy, drop_range::OwnedBounds, state::CompactionState},
config::Config,
format_version::FormatVersion,
iter_guard::{IterGuard, IterGuardImpl},
key::InternalKey,
manifest::Manifest,
memtable::Memtable,
slice::Slice,
table::Table,
value::InternalValue,
version::{SuperVersion, SuperVersions, Version, recovery::recover},
vlog::BlobFile,
};
use inner::{TreeId, TreeInner};
use std::{
ops::{Bound, RangeBounds},
path::Path,
sync::{Arc, Mutex, RwLock},
};
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
/// Iterator guard wrapping a fallible key-value pair.
///
/// Stores the `Result` produced by the underlying iterator so that read
/// errors only surface when the caller actually consumes the entry.
pub struct Guard(crate::Result<(UserKey, UserValue)>);
impl IterGuard for Guard {
    /// Yields the key together with the value, but only keeps the value
    /// when `pred` accepts the key.
    fn into_inner_if(
        self,
        pred: impl Fn(&UserKey) -> bool,
    ) -> crate::Result<(UserKey, Option<UserValue>)> {
        self.0.map(|(key, value)| {
            let value = if pred(&key) { Some(value) } else { None };
            (key, value)
        })
    }

    /// Consumes the guard, returning only the key.
    fn key(self) -> crate::Result<UserKey> {
        let (key, _value) = self.0?;
        Ok(key)
    }

    /// Consumes the guard, returning the value's length in bytes.
    fn size(self) -> crate::Result<u32> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        self.into_inner().map(|(_, value)| value.len() as u32)
    }

    /// Consumes the guard, returning the raw key-value pair.
    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}
/// Strategy for performing a point lookup against a single table.
///
/// Implemented for plain entries and for entries that also carry the block
/// they were read from (used by pinned reads).
trait TablePointLookup: Sized {
    /// Looks up `key` in `table` as visible at `seqno`; `key_hash` is the
    /// precomputed hash used for the table's filter.
    fn lookup(
        table: &Table,
        key: &[u8],
        seqno: SeqNo,
        key_hash: u64,
    ) -> crate::Result<Option<Self>>;

    /// Sequence number of the entry that was found.
    fn entry_seqno(&self) -> SeqNo;

    /// Returns `None` when the entry is a tombstone, otherwise the entry.
    fn filter_tombstone(self) -> Option<Self>;
}
/// Point-lookup result that carries only the entry itself.
type TableEntry = InternalValue;

impl TablePointLookup for TableEntry {
    fn lookup(
        table: &Table,
        key: &[u8],
        seqno: SeqNo,
        key_hash: u64,
    ) -> crate::Result<Option<Self>> {
        table.get(key, seqno, key_hash)
    }

    fn entry_seqno(&self) -> SeqNo {
        self.key.seqno
    }

    fn filter_tombstone(self) -> Option<Self> {
        ignore_tombstone_value(self)
    }
}
/// Point-lookup result that also carries the data block the entry came
/// from, so the caller can pin the value without copying it.
type TableEntryWithBlock = (InternalValue, crate::table::Block);

impl TablePointLookup for TableEntryWithBlock {
    fn lookup(
        table: &Table,
        key: &[u8],
        seqno: SeqNo,
        key_hash: u64,
    ) -> crate::Result<Option<Self>> {
        table.get_with_block(key, seqno, key_hash)
    }

    fn entry_seqno(&self) -> SeqNo {
        self.0.key.seqno
    }

    fn filter_tombstone(self) -> Option<Self> {
        // Filtering applies to the entry; the block is carried along.
        ignore_tombstone_value(self.0).map(|iv| (iv, self.1))
    }
}
/// Drops tombstone markers, passing through only real values.
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    Some(item).filter(|item| !item.is_tombstone())
}
/// Handle to the tree's shared state; cloning is cheap (bumps an `Arc`).
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
impl std::ops::Deref for Tree {
    type Target = TreeInner;

    // Delegate to the shared inner state so `TreeInner` fields and methods
    // are reachable directly through a `Tree`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
// Marker impl: seals `AbstractTree` so it can only be implemented in-crate.
impl crate::abstract_tree::sealed::Sealed for Tree {}
impl AbstractTree for Tree {
    /// Number of entries currently held by the descriptor table (0 if none
    /// is configured).
    fn table_file_cache_size(&self) -> usize {
        self.config
            .descriptor_table
            .as_ref()
            .map_or(0, |dt| dt.len())
    }

    /// Acquires the version history write lock.
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history.write().expect("lock is poisoned")
    }

    /// Returns the current table ID counter value (without incrementing it).
    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

    // The standard tree keeps no blob files (see `tree_type`).
    fn blob_file_count(&self) -> usize {
        0
    }

    /// Logs every entry for `key` found in the active memtable, the sealed
    /// memtables and all tables — a debugging aid.
    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let key = Slice::from(key);

        // Start at the highest possible seqno for this key so all versions
        // of the key are visited.
        for kv in super_version.active_memtable.range_internal((
            Bound::Included(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)),
            Bound::Unbounded,
        )) {
            log::info!("[Active] {kv:?}");
        }

        for mt in super_version.sealed_memtables.iter().rev() {
            for kv in mt.range_internal((
                Bound::Included(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)),
                Bound::Unbounded,
            )) {
                log::info!("[Sealed #{}] {kv:?}", mt.id());
            }
        }

        for table in super_version
            .version
            .iter_levels()
            .flat_map(|lvl| lvl.iter())
            .filter_map(|run| run.get_for_key_cmp(&key, self.config.comparator.as_ref()))
        {
            for kv in table.range(..) {
                let kv = kv?;
                // Stop once the scan leaves the requested key.
                if kv.key.user_key != key {
                    break;
                }
                log::info!("[Table #{}] {kv:?}", table.id());
            }
        }

        Ok(())
    }

    /// Point lookup returning the raw internal entry (no merge resolution).
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(
            &super_version,
            key,
            seqno,
            self.config.comparator.as_ref(),
        )
    }

    fn current_version(&self) -> Version {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    /// Prefix scan, wrapping each raw item in a `Guard`.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Range scan, wrapping each raw item in a `Guard`.
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Sum of per-table tombstone counts (tables only; memtables excluded).
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

    /// Drops all data in the given key range via a dedicated compaction.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        // An inverted range covers nothing; avoid a pointless compaction.
        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // Take the major compaction lock exclusively so no other major
        // compaction runs concurrently.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)?;

        Ok(())
    }

    /// Replaces the current state with an empty memtable, no sealed
    /// memtables and a fresh (next-id) version — i.e. wipes the tree.
    fn clear(&self) -> crate::Result<()> {
        let mut versions = self.get_version_history_lock();
        versions.upgrade_version(
            &self.config.path,
            |v| {
                let mut copy = v.clone();
                copy.active_memtable = Arc::new(Memtable::new(
                    self.memtable_id_counter.next(),
                    self.config.comparator.clone(),
                ));
                copy.sealed_memtables = Arc::default();
                copy.version = Version::new(v.version.id() + 1, self.tree_type());
                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
            &*self.config.fs,
        )
    }

    #[doc(hidden)]
    fn major_compact(
        &self,
        target_size: u64,
        seqno_threshold: SeqNo,
    ) -> crate::Result<crate::compaction::CompactionResult> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // Exclusive lock: a major compaction must not overlap another one.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    /// Size in bytes of the value for `key`, if present.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .map(u64::from)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

    /// Writes `stream` (plus range tombstones) out as new level-0 tables.
    ///
    /// Returns the recovered `Table` handles; the blob file slot is always
    /// `None` for this (standard) tree.
    fn flush_to_tables_with_rt(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
        range_tombstones: Vec<crate::range_tombstone::RangeTombstone>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::table::multi_writer::MultiWriter;
        use std::time::Instant;

        let start = Instant::now();

        // All policies are evaluated for level 0, the flush target.
        let (folder, level_fs) = self.config.tables_folder_for_level(0);
        let data_block_size = self.config.data_block_size_policy.get(0);
        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);
        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);
        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);
        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}",
            folder.display(),
        );

        // 64 MiB target table size, output level 0.
        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
            level_fs.clone(),
        )?
        .set_comparator(self.config.comparator.clone())
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                // BitsPerKey(0.0) effectively disables the filter.
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        table_writer = table_writer.use_prefix_extractor(self.config.prefix_extractor.clone());
        table_writer = table_writer.use_encryption(self.config.encryption.clone());

        #[cfg(zstd_any)]
        {
            table_writer = table_writer.use_zstd_dictionary(self.config.zstd_dictionary.clone());
        }

        table_writer.set_range_tombstones(range_tombstones);

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Re-open every written table so it is immediately usable for reads.
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    level_fs.clone(),
                    pin_filter,
                    pin_index,
                    self.config.encryption.clone(),
                    #[cfg(zstd_any)]
                    self.config.zstd_dictionary.clone(),
                    self.config.comparator.clone(),
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

    /// Atomically installs `tables` (and optional blob files) as a new L0
    /// run and releases the given sealed memtables, then runs version GC.
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        // Lock order: compaction state first, then the version history.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                let ctx = crate::version::TransformContext::new(self.config.comparator.as_ref());

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                    &ctx,
                );

                // The flushed memtables are now durable as tables.
                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
            &*self.config.fs,
        )?;

        // GC failure is non-fatal: old versions just stay around longer.
        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark, &*self.config.fs)
        {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    /// Drops the active memtable's contents (and all sealed memtables)
    /// without flushing them; a no-op when the active memtable is empty.
    fn clear_active_memtable(&self) {
        use crate::tree::sealed::SealedMemtables;

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");

        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return;
        }

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(
            self.memtable_id_counter.next(),
            self.config.comparator.clone(),
        ));
        copy.sealed_memtables = Arc::new(SealedMemtables::default());
        // Keep the previous snapshot seqno.
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!("cleared active memtable");
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<crate::compaction::CompactionResult> {
        // Shared lock: normal compactions may run in parallel, but are
        // excluded while a major compaction holds the write lock.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable(&self) -> Arc<Memtable> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    /// Seals the current active memtable and replaces it with a fresh one.
    ///
    /// Returns the sealed memtable, or `None` if the active memtable was
    /// empty (nothing to rotate).
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");

        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(
            self.memtable_id_counter.next(),
            self.config.comparator.clone(),
        ));
        // The old active memtable joins the sealed set until it is flushed.
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    /// Item count estimate: memtable lengths plus per-table item counts.
    /// Overcounts shadowed/duplicate keys — it is an upper bound, not an
    /// exact size.
    fn approximate_len(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        #[expect(clippy::expect_used, reason = "result should fit into usize")]
        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("approximate_len too large for usize")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    /// Highest seqno across the active and sealed memtables.
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    /// Highest seqno across all on-disk tables.
    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    /// Point read at snapshot `seqno`, with merge-operator resolution.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        let key = key.as_ref();

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::resolve_or_passthrough(
            &super_version,
            key,
            seqno,
            self.config.merge_operator.as_ref(),
            self.config.comparator.as_ref(),
        )
    }

    /// Like `get`, but returns a `PinnableSlice` that may borrow the value
    /// from the table's data block instead of copying it.
    fn get_pinned<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: SeqNo,
    ) -> crate::Result<Option<crate::PinnableSlice>> {
        let key = key.as_ref();

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::resolve_or_passthrough_pinned(
            &super_version,
            key,
            seqno,
            self.config.merge_operator.as_ref(),
            self.config.comparator.as_ref(),
        )
    }

    /// Multi-key point read; results align positionally with `keys`.
    ///
    /// Keys found in memtables are answered directly; the remaining keys are
    /// sorted and looked up in tables as a batch.
    #[expect(
        clippy::indexing_slicing,
        reason = "indices are generated from 0..n range, always in bounds"
    )]
    fn multi_get<K: AsRef<[u8]>>(
        &self,
        keys: impl IntoIterator<Item = K>,
        seqno: SeqNo,
    ) -> crate::Result<Vec<Option<UserValue>>> {
        let super_version = self.get_version_for_snapshot(seqno);
        let comparator = self.config.comparator.as_ref();
        let merge_operator = self.config.merge_operator.as_ref();

        let keys: Vec<_> = keys.into_iter().collect();
        let n = keys.len();

        if n == 0 {
            return Ok(Vec::new());
        }

        // For tiny batches, per-key lookups are cheaper than batching.
        if n <= 2 {
            return keys
                .iter()
                .map(|key| {
                    Self::resolve_or_passthrough(
                        &super_version,
                        key.as_ref(),
                        seqno,
                        merge_operator,
                        comparator,
                    )
                })
                .collect();
        }

        let mut internal_entries: Vec<Option<InternalValue>> = vec![None; n];
        let mut remaining: Vec<usize> = Vec::with_capacity(n);

        // Pass 1: memtables (active, then sealed). Misses go to `remaining`.
        for idx in 0..n {
            let key = keys[idx].as_ref();

            if let Some(entry) = super_version.active_memtable.get(key, seqno) {
                internal_entries[idx] = Some(entry);
                continue;
            }

            if let Some(entry) =
                Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
            {
                internal_entries[idx] = Some(entry);
                continue;
            }

            remaining.push(idx);
        }

        // Pass 2: tables, with keys sorted by the user comparator and their
        // filter hashes precomputed.
        if !remaining.is_empty() {
            remaining.sort_by(|&a, &b| comparator.compare(keys[a].as_ref(), keys[b].as_ref()));

            let miss_keys: Vec<(usize, u64)> = remaining
                .iter()
                .map(|&idx| {
                    let hash =
                        crate::table::filter::standard_bloom::Builder::get_hash(keys[idx].as_ref());
                    (idx, hash)
                })
                .collect();

            Self::batch_get_from_tables(
                &super_version.version,
                &keys,
                miss_keys,
                seqno,
                comparator,
                &mut internal_entries,
            )?;
        }

        // Pass 3: resolve tombstones / merge operands per entry.
        let mut results = vec![None; n];
        for idx in 0..n {
            let entry = internal_entries[idx].take();
            results[idx] = Self::resolve_entry(
                &super_version,
                keys[idx].as_ref(),
                entry,
                seqno,
                merge_operator,
                comparator,
            )?;
        }

        Ok(results)
    }

    fn apply_batch(&self, batch: crate::WriteBatch, seqno: SeqNo) -> crate::Result<(u64, u64)> {
        if batch.is_empty() {
            return Ok((0, self.active_memtable().size()));
        }
        Ok(self.append_batch(batch.materialize(seqno)?))
    }

    /// Inserts a key-value pair; returns (item size, memtable size).
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    /// Appends a merge operand (resolved lazily at read time).
    fn merge<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        operand: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::new_merge_operand(key, operand, seqno);
        self.append_entry(value)
    }

    /// Deletes a key by writing a tombstone.
    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    /// Deletes a key by writing a weak tombstone.
    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }

    /// Writes a range tombstone covering `[start, end)` into the active
    /// memtable; returns the tombstone's size contribution.
    fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let memtable = Arc::clone(
            &self
                .version_history
                .read()
                .expect("lock is poisoned")
                .latest_version()
                .active_memtable,
        );
        memtable.insert_range_tombstone(start.into(), end.into(), seqno)
    }
}
impl Tree {
/// Resolves a point lookup to a user value.
///
/// Plain values pass through directly. If the newest visible entry is a
/// merge operand and a merge operator is configured, the full merge is
/// resolved via the iterator pipeline; without an operator the raw operand
/// is returned (after a range-tombstone visibility check).
fn resolve_or_passthrough(
    super_version: &SuperVersion,
    key: &[u8],
    seqno: SeqNo,
    merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: &dyn crate::comparator::UserComparator,
) -> crate::Result<Option<UserValue>> {
    let entry = Self::get_internal_entry_from_version(super_version, key, seqno, comparator)?;

    match entry {
        Some(entry) if entry.key.value_type == ValueType::MergeOperand => {
            if let Some(merge_op) = merge_operator {
                // Full merge needs all operands, not just the newest entry.
                Self::resolve_merge_via_pipeline(
                    super_version.clone(),
                    key,
                    seqno,
                    Arc::clone(merge_op),
                )
            } else if Self::is_suppressed_by_range_tombstones(
                super_version,
                key,
                entry.key.seqno,
                seqno,
                comparator,
            ) {
                Ok(None)
            } else {
                // No operator configured: surface the raw operand bytes.
                Ok(Some(entry.value))
            }
        }
        Some(entry) => Ok(Some(entry.value)),
        None => Ok(None),
    }
}
/// Resolves a raw internal entry into a pinnable value.
///
/// Applies tombstone and range-tombstone filtering, resolves merge
/// operands via the pipeline when an operator is configured, and otherwise
/// wraps the plain value using `wrap` (which decides owned vs. pinned).
fn resolve_pinned_entry(
    super_version: &SuperVersion,
    key: &[u8],
    entry: InternalValue,
    seqno: SeqNo,
    merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: &dyn crate::comparator::UserComparator,
    wrap: impl FnOnce(UserValue) -> crate::PinnableSlice,
) -> crate::Result<Option<crate::PinnableSlice>> {
    use crate::PinnableSlice;

    // A point tombstone means the key is deleted at this snapshot.
    let Some(entry) = ignore_tombstone_value(entry) else {
        return Ok(None);
    };

    // A newer, visible range tombstone also hides the entry.
    if Self::is_suppressed_by_range_tombstones(
        super_version,
        key,
        entry.key.seqno,
        seqno,
        comparator,
    ) {
        return Ok(None);
    }

    if entry.key.value_type == ValueType::MergeOperand
        && let Some(merge_op) = merge_operator
    {
        // Merge results are computed values, so they are always owned.
        return Self::resolve_merge_via_pipeline(
            super_version.clone(),
            key,
            seqno,
            Arc::clone(merge_op),
        )
        .map(|opt| opt.map(PinnableSlice::owned));
    }

    Ok(Some(wrap(entry.value)))
}
/// Pinned variant of `resolve_or_passthrough`.
///
/// Memtable hits are returned as owned values; table hits keep the data
/// block alive and pin the value inside it, avoiding a copy.
fn resolve_or_passthrough_pinned(
    super_version: &SuperVersion,
    key: &[u8],
    seqno: SeqNo,
    merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: &dyn crate::comparator::UserComparator,
) -> crate::Result<Option<crate::PinnableSlice>> {
    use crate::PinnableSlice;

    // 1) Active memtable — values live in memory, return owned.
    if let Some(entry) = super_version.active_memtable.get(key, seqno) {
        return Self::resolve_pinned_entry(
            super_version,
            key,
            entry,
            seqno,
            merge_operator,
            comparator,
            PinnableSlice::owned,
        );
    }

    // 2) Sealed memtables, newest first.
    if let Some(entry) =
        Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
    {
        return Self::resolve_pinned_entry(
            super_version,
            key,
            entry,
            seqno,
            merge_operator,
            comparator,
            PinnableSlice::owned,
        );
    }

    // 3) Tables — fetch the entry together with its block so the value can
    // be pinned without copying.
    let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

    if let Some((entry, block)) = Self::get_internal_entry_with_block_from_tables(
        &super_version.version,
        key,
        seqno,
        key_hash,
        comparator,
    )? {
        return Self::resolve_pinned_entry(
            super_version,
            key,
            entry,
            seqno,
            merge_operator,
            comparator,
            |value| PinnableSlice::pinned(block, value),
        );
    }

    Ok(None)
}
/// Table point lookup that also returns the data block containing the
/// entry (for pinned reads). Thin wrapper over `find_in_tables`.
fn get_internal_entry_with_block_from_tables(
    version: &Version,
    key: &[u8],
    seqno: SeqNo,
    key_hash: u64,
    comparator: &dyn crate::comparator::UserComparator,
) -> crate::Result<Option<(InternalValue, crate::table::Block)>> {
    Self::find_in_tables::<TableEntryWithBlock>(version, key, seqno, key_hash, comparator)
}
/// Resolves a merge-operand chain for a single key by running a one-key
/// range read through the iterator pipeline with the merge operator
/// attached; the first yielded item (if any) is the merged value.
pub(crate) fn resolve_merge_via_pipeline(
    version: SuperVersion,
    key: &[u8],
    seqno: SeqNo,
    merge_operator: Arc<dyn crate::merge_operator::MergeOperator>,
) -> crate::Result<Option<UserValue>> {
    use crate::range::{IterState, TreeIter};

    // Hand the pipeline the key's filter hash and key bytes so table-level
    // filters can still be used.
    let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);
    let bloom_key = crate::Slice::from(key);
    let comparator = version.active_memtable.comparator.clone();

    let iter_state = IterState {
        version,
        ephemeral: None,
        merge_operator: Some(merge_operator),
        comparator,
        prefix_hash: None,
        key_hash: Some(key_hash),
        bloom_key: Some(bloom_key),
        #[cfg(feature = "metrics")]
        metrics: None,
    };

    let mut iter = TreeIter::create_range_point(iter_state, key, seqno);

    match iter.next() {
        Some(Ok(entry)) => Ok(Some(entry.value)),
        Some(Err(e)) => Err(e),
        None => Ok(None),
    }
}
/// Creates an internal range iterator over the given super version.
///
/// Convenience wrapper around `create_internal_range_with_prefix_hash`
/// with no prefix hash (so no prefix-filter pruning is applied).
#[doc(hidden)]
pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
    version: SuperVersion,
    range: &'a R,
    seqno: SeqNo,
    ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: crate::comparator::SharedComparator,
) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
    Self::create_internal_range_with_prefix_hash(
        version,
        range,
        seqno,
        ephemeral,
        merge_operator,
        comparator,
        None,
    )
}
/// Creates an internal range iterator over the given super version,
/// optionally carrying a prefix hash for table filter pruning.
///
/// Bounds are cloned into owned `UserKey`s so the returned iterator is
/// `'static` and independent of the borrowed range.
#[doc(hidden)]
pub(crate) fn create_internal_range_with_prefix_hash<
    'a,
    K: AsRef<[u8]> + 'a,
    R: RangeBounds<K> + 'a,
>(
    version: SuperVersion,
    range: &'a R,
    seqno: SeqNo,
    ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: crate::comparator::SharedComparator,
    prefix_hash: Option<u64>,
) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
    use crate::range::{IterState, TreeIter};
    use std::ops::Bound::{self, Excluded, Included, Unbounded};

    // Clone a borrowed bound into an owned `UserKey` bound.
    fn own_bound<K: AsRef<[u8]>>(bound: Bound<&K>) -> Bound<UserKey> {
        match bound {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        }
    }

    let bounds: (Bound<UserKey>, Bound<UserKey>) =
        (own_bound(range.start_bound()), own_bound(range.end_bound()));

    let iter_state = IterState {
        version,
        ephemeral,
        merge_operator,
        comparator,
        prefix_hash,
        key_hash: None,
        bloom_key: None,
        #[cfg(feature = "metrics")]
        metrics: None,
    };

    TreeIter::create_range(iter_state, bounds, seqno)
}
/// Point lookup across the whole super version: active memtable, then
/// sealed memtables (newest first), then on-disk tables.
///
/// Each source is authoritative: the first source that holds any entry for
/// the key decides the outcome, even if that entry turns out to be hidden
/// by a tombstone or range tombstone.
pub(crate) fn get_internal_entry_from_version(
    super_version: &SuperVersion,
    key: &[u8],
    seqno: SeqNo,
    comparator: &dyn crate::comparator::UserComparator,
) -> crate::Result<Option<InternalValue>> {
    // Shared visibility filter for memtable hits: drop point tombstones and
    // entries shadowed by a newer range tombstone visible at `seqno`.
    let visible = |entry: InternalValue| -> Option<InternalValue> {
        let entry = ignore_tombstone_value(entry)?;
        if Self::is_suppressed_by_range_tombstones(
            super_version,
            key,
            entry.key.seqno,
            seqno,
            comparator,
        ) {
            None
        } else {
            Some(entry)
        }
    };

    if let Some(entry) = super_version.active_memtable.get(key, seqno) {
        return Ok(visible(entry));
    }

    if let Some(entry) =
        Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
    {
        return Ok(visible(entry));
    }

    // Table lookups already drop point tombstones (`filter_tombstone`), so
    // only the range-tombstone check remains here.
    let Some(entry) =
        Self::get_internal_entry_from_tables(&super_version.version, key, seqno, comparator)?
    else {
        return Ok(None);
    };

    if Self::is_suppressed_by_range_tombstones(
        super_version,
        key,
        entry.key.seqno,
        seqno,
        comparator,
    ) {
        Ok(None)
    } else {
        Ok(Some(entry))
    }
}
/// Returns `true` if an entry for `key` written at `key_seqno` is hidden by
/// some range tombstone that is visible at `read_seqno`.
///
/// Checks the active memtable, all sealed memtables, and every table whose
/// key range covers `key` and that carries range tombstones.
pub(crate) fn is_suppressed_by_range_tombstones(
    super_version: &SuperVersion,
    key: &[u8],
    key_seqno: SeqNo,
    read_seqno: SeqNo,
    comparator: &dyn crate::comparator::UserComparator,
) -> bool {
    if super_version
        .active_memtable
        .is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno)
    {
        return true;
    }

    for mt in super_version.sealed_memtables.iter().rev() {
        if mt.is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno) {
            return true;
        }
    }

    // Only tables whose key range contains `key` (min <= key <= max, per
    // the user comparator) can hold a relevant range tombstone.
    for table in super_version
        .version
        .iter_levels()
        .flat_map(|lvl| lvl.iter())
        .flat_map(|run| run.iter())
        .filter(|t| !t.range_tombstones().is_empty())
        .filter(|t| {
            let kr = &t.metadata.key_range;
            comparator.compare(kr.min(), key) != std::cmp::Ordering::Greater
                && comparator.compare(key, kr.max()) != std::cmp::Ordering::Greater
        })
    {
        let rts = table.range_tombstones();

        // Tombstones appear ordered by start key here: everything past
        // `candidate_end` starts after `key` and cannot cover it.
        let candidate_end = rts.partition_point(|rt| {
            comparator.compare(&rt.start, key) != std::cmp::Ordering::Greater
        });

        // Suppression requires: tombstone visible at the read snapshot,
        // start <= key < end (end-exclusive), and the entry older than the
        // tombstone.
        for rt in rts.iter().take(candidate_end) {
            if rt.visible_at(read_seqno)
                && comparator.compare(&rt.start, key) != std::cmp::Ordering::Greater
                && comparator.compare(key, &rt.end) == std::cmp::Ordering::Less
                && key_seqno < rt.seqno
            {
                return true;
            }
        }
    }

    false
}
/// Resolves an optional raw entry into a user value.
///
/// Delegates to `resolve_pinned_entry` with owned wrapping, then unwraps
/// the pinnable slice back into a plain `UserValue`.
fn resolve_entry(
    super_version: &SuperVersion,
    key: &[u8],
    entry: Option<InternalValue>,
    seqno: SeqNo,
    merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
    comparator: &dyn crate::comparator::UserComparator,
) -> crate::Result<Option<UserValue>> {
    match entry {
        None => Ok(None),
        Some(entry) => Self::resolve_pinned_entry(
            super_version,
            key,
            entry,
            seqno,
            merge_operator,
            comparator,
            crate::PinnableSlice::owned,
        )
        .map(|opt| opt.map(crate::PinnableSlice::into_value)),
    }
}
/// Batched table lookup for the keys in `miss_keys` (index into `keys`
/// plus precomputed filter hash), writing hits into `results`.
///
/// Keys in `miss_keys` are expected sorted by the user comparator (the
/// level > 0 path re-sorts when it has to re-queue covered misses).
#[expect(
    clippy::indexing_slicing,
    reason = "miss_keys entries carry batch-local indices; callers must pass a results slice aligned with keys"
)]
pub(crate) fn batch_get_from_tables<K: AsRef<[u8]>>(
    version: &Version,
    keys: &[K],
    miss_keys: Vec<(usize, u64)>,
    seqno: SeqNo,
    comparator: &dyn crate::comparator::UserComparator,
    results: &mut [Option<InternalValue>],
) -> crate::Result<()> {
    debug_assert_eq!(results.len(), keys.len());
    debug_assert!(miss_keys.iter().all(|&(i, _)| i < keys.len()));

    let mut still_remaining = miss_keys;

    for (level_idx, level) in version.iter_levels().enumerate() {
        if still_remaining.is_empty() {
            break;
        }

        if level_idx == 0 {
            // L0 runs may overlap, so every run must be consulted and the
            // entry with the highest seqno wins.
            let mut at_ceiling = vec![false; keys.len()];

            for run in level.iter() {
                for &(idx, hash) in &still_remaining {
                    // Already found the newest possible version of this key.
                    if at_ceiling[idx] {
                        continue;
                    }

                    let key = keys[idx].as_ref();
                    if let Some(table) = run.get_for_key_cmp(key, comparator)
                        && let Some(item) = table.get(key, seqno, hash)?
                    {
                        match &results[idx] {
                            // Keep the existing, newer (or equal) entry.
                            Some(current) if current.key.seqno >= item.key.seqno => {}
                            _ => {
                                // seqno - 1 is the newest entry a snapshot
                                // at `seqno` can see; no need to keep looking.
                                if item.key.seqno.checked_add(1) == Some(seqno) {
                                    at_ceiling[idx] = true;
                                }
                                results[idx] = Some(item);
                            }
                        }
                    }
                }
            }

            // Anything answered in L0 shadows deeper levels.
            still_remaining.retain(|&(idx, _)| results[idx].is_none());
        } else {
            // Levels > 0: runs are disjoint, so a key matches at most one
            // table per run. A covered miss (table range contains the key
            // but no entry) is final for this run but must still be tried
            // on the level's other runs.
            let mut covered_miss: Vec<(usize, u64)> = Vec::new();

            for run in level.iter() {
                let mut not_covered = Vec::with_capacity(still_remaining.len());

                for &(idx, hash) in &still_remaining {
                    let key = keys[idx].as_ref();
                    if let Some(table) = run.get_for_key_cmp(key, comparator) {
                        if let Some(item) = table.get(key, seqno, hash)? {
                            results[idx] = Some(item);
                        } else {
                            covered_miss.push((idx, hash));
                        }
                    } else {
                        not_covered.push((idx, hash));
                    }
                }

                still_remaining = not_covered;
            }

            // Re-queue covered misses for the next level and restore the
            // comparator ordering invariant.
            let needs_sort = !covered_miss.is_empty();
            still_remaining.extend(covered_miss);
            if needs_sort {
                still_remaining.sort_by(|&(a, _), &(b, _)| {
                    comparator.compare(keys[a].as_ref(), keys[b].as_ref())
                });
            }
        }
    }

    Ok(())
}
/// Table point lookup for a plain entry: computes the key's filter hash
/// and delegates to `find_in_tables`.
fn get_internal_entry_from_tables(
    version: &Version,
    key: &[u8],
    seqno: SeqNo,
    comparator: &dyn crate::comparator::UserComparator,
) -> crate::Result<Option<InternalValue>> {
    let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);
    Self::find_in_tables::<TableEntry>(version, key, seqno, key_hash, comparator)
}
/// Searches all levels for the newest visible entry for `key`.
///
/// L0 runs may overlap, so every run is checked and the highest-seqno hit
/// wins; deeper levels are disjoint, so the first covering table decides.
/// Tombstones are dropped via `T::filter_tombstone` before returning.
fn find_in_tables<T: TablePointLookup>(
    version: &Version,
    key: &[u8],
    seqno: SeqNo,
    key_hash: u64,
    comparator: &dyn crate::comparator::UserComparator,
) -> crate::Result<Option<T>> {
    for (level_idx, level) in version.iter_levels().enumerate() {
        if level_idx == 0 {
            let mut best: Option<T> = None;

            for run in level.iter() {
                if let Some(table) = run.get_for_key_cmp(key, comparator)
                    && let Some(item) = T::lookup(table, key, seqno, key_hash)?
                {
                    match &best {
                        // Keep the current, newer (or equal) candidate.
                        Some(current) if current.entry_seqno() >= item.entry_seqno() => {}
                        _ => {
                            // An entry at seqno - 1 is the newest a snapshot
                            // at `seqno` can see — stop searching early.
                            if item.entry_seqno().checked_add(1) == Some(seqno) {
                                return Ok(item.filter_tombstone());
                            }
                            best = Some(item);
                        }
                    }
                }
            }

            // An L0 hit shadows everything in deeper levels.
            if let Some(entry) = best {
                return Ok(entry.filter_tombstone());
            }
        } else {
            for run in level.iter() {
                if let Some(table) = run.get_for_key_cmp(key, comparator) {
                    if let Some(item) = T::lookup(table, key, seqno, key_hash)? {
                        return Ok(item.filter_tombstone());
                    }
                    // Run covered the key but had no entry; runs in this
                    // level are disjoint, so move on to the next level.
                    break;
                }
            }
        }
    }

    Ok(None)
}
/// Probes the sealed (immutable) memtables of `super_version` for `key`.
///
/// The memtables are walked in reverse iteration order so the most recently
/// sealed one (presumably at the back of the collection — confirm against the
/// flush path) is consulted first; the first hit wins.
pub(crate) fn get_internal_entry_from_sealed_memtables(
    super_version: &SuperVersion,
    key: &[u8],
    seqno: SeqNo,
) -> Option<InternalValue> {
    super_version
        .sealed_memtables
        .iter()
        .rev()
        .find_map(|memtable| memtable.get(key, seqno))
}
/// Returns the [`SuperVersion`] that a snapshot at `seqno` should read from.
pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let history = self.version_history.read().expect("lock is poisoned");
    history.get_version_for_snapshot(seqno)
}
/// Converts a generic [`RangeBounds`] over byte-like keys into owned bounds.
///
/// Returns the owned bounds plus a flag telling whether the range is
/// trivially empty in raw byte order:
///
/// - both bounds inclusive: empty iff `lo > hi` (a single-point range
///   `lo == hi` still contains one key)
/// - at least one bound exclusive: empty iff `lo >= hi` (e.g. `k..k` or
///   `(Excluded(k), Excluded(k))` contain nothing)
///
/// The second case previously reported `lo == hi` as non-empty, missing
/// genuinely empty ranges.
///
/// NOTE(review): emptiness is decided by lexicographic byte comparison; for
/// trees with a custom comparator this is only a conservative fast path —
/// confirm callers treat the flag as an optimization, not a validity check.
fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
    range: &R,
) -> (OwnedBounds, bool) {
    use Bound::{Excluded, Included, Unbounded};

    let start = match range.start_bound() {
        Included(key) => Included(Slice::from(key.as_ref())),
        Excluded(key) => Excluded(Slice::from(key.as_ref())),
        Unbounded => Unbounded,
    };

    let end = match range.end_bound() {
        Included(key) => Included(Slice::from(key.as_ref())),
        Excluded(key) => Excluded(Slice::from(key.as_ref())),
        Unbounded => Unbounded,
    };

    let is_empty = match (&start, &end) {
        // Fully inclusive: only strictly inverted bounds are empty.
        (Included(lo), Included(hi)) => lo.as_ref() > hi.as_ref(),
        // Any exclusive bound: equal bounds yield an empty range as well.
        (Included(lo) | Excluded(lo), Excluded(hi)) | (Excluded(lo), Included(hi)) => {
            lo.as_ref() >= hi.as_ref()
        }
        // At least one side unbounded: never trivially empty.
        _ => false,
    };

    (OwnedBounds { start, end }, is_empty)
}
/// Opens (or creates) an LSM-tree at the path given in `config`.
///
/// Rejects V1 databases outright, recovers an existing tree when a current
/// version file is present, and bootstraps a fresh tree otherwise.
pub(crate) fn open(config: Config) -> crate::Result<Self> {
    log::debug!("Opening LSM-tree at {}", config.path.display());

    // A top-level "version" file only existed in the V1 on-disk layout.
    if config.fs.exists(&config.path.join("version"))? {
        log::error!(
            "It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated."
        );
        return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
    }

    match crate::version::recovery::get_current_version(&config.path, &*config.fs) {
        Ok(_) => Self::recover(config),
        // No current version on disk -> this is a fresh database.
        Err(crate::Error::Io(e)) if e.kind() == std::io::ErrorKind::NotFound => {
            Self::create_new(config)
        }
        Err(e) => Err(e),
    }
}
/// Returns `true` while any table is hidden by an in-flight compaction.
#[doc(hidden)]
#[must_use]
pub fn is_compacting(&self) -> bool {
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let state = self.compaction_state.lock().expect("lock is poisoned");
    !state.hidden_set().is_empty()
}
/// Runs one compaction using the given `strategy`.
///
/// `mvcc_gc_watermark` is forwarded to the compaction worker via its options.
fn inner_compact(
    &self,
    strategy: Arc<dyn CompactionStrategy>,
    mvcc_gc_watermark: SeqNo,
) -> crate::Result<crate::compaction::CompactionResult> {
    use crate::compaction::worker::{Options, do_compaction};

    let mut options = Options::from_tree(self, strategy);
    options.mvcc_gc_watermark = mvcc_gc_watermark;

    // Log only on success, matching the original `?` + log flow.
    do_compaction(&options).inspect(|_| log::debug!("Compaction run over"))
}
/// Creates a double-ended iterator over the whole keyspace at snapshot
/// `seqno`, optionally overlaying an additional memtable via `ephemeral`.
#[doc(hidden)]
#[must_use]
pub fn create_iter(
    &self,
    seqno: SeqNo,
    ephemeral: Option<(Arc<Memtable>, SeqNo)>,
) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
    // A full scan is just an unbounded range.
    let everything = ..;
    self.create_range::<UserKey, _>(&everything, seqno, ephemeral)
}
/// Creates a double-ended iterator over `range` at snapshot `seqno`,
/// optionally overlaying an additional memtable via `ephemeral`.
#[doc(hidden)]
pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
    &self,
    range: &'a R,
    seqno: SeqNo,
    ephemeral: Option<(Arc<Memtable>, SeqNo)>,
) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
    // Pin the super version this snapshot reads from.
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let super_version = self
        .version_history
        .read()
        .expect("lock is poisoned")
        .get_version_for_snapshot(seqno);

    let iter = Self::create_internal_range(
        super_version,
        range,
        seqno,
        ephemeral,
        self.config.merge_operator.clone(),
        self.config.comparator.clone(),
    );

    // Strip the internal key metadata, yielding (user key, value) pairs.
    iter.map(|item| item.map(|kv| (kv.key.user_key, kv.value)))
}
/// Creates a double-ended iterator over all keys starting with `prefix`
/// at snapshot `seqno`, optionally overlaying an additional memtable via
/// `ephemeral`.
#[doc(hidden)]
pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
    &self,
    prefix: K,
    seqno: SeqNo,
    ephemeral: Option<(Arc<Memtable>, SeqNo)>,
) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
    use crate::prefix::compute_prefix_hash;
    use crate::range::{IterState, TreeIter, prefix_to_range};

    let prefix = prefix.as_ref();

    // Hash via the configured prefix extractor so prefix filters can be probed.
    let prefix_hash = compute_prefix_hash(self.config.prefix_extractor.as_ref(), prefix);
    let range = prefix_to_range(prefix);

    // Pin the super version this snapshot reads from.
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let super_version = self
        .version_history
        .read()
        .expect("lock is poisoned")
        .get_version_for_snapshot(seqno);

    let state = IterState {
        version: super_version,
        ephemeral,
        merge_operator: self.config.merge_operator.clone(),
        comparator: self.config.comparator.clone(),
        prefix_hash,
        key_hash: None,
        bloom_key: None,
        #[cfg(feature = "metrics")]
        metrics: Some(self.0.metrics.clone()),
    };

    // Strip the internal key metadata, yielding (user key, value) pairs.
    TreeIter::create_range(state, range, seqno)
        .map(|item| item.map(|kv| (kv.key.user_key, kv.value)))
}
/// Inserts `value` into the currently active memtable.
///
/// Returns the pair reported by `Memtable::insert` (presumably item size and
/// resulting memtable size — confirm against the memtable implementation).
#[doc(hidden)]
#[must_use]
pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let history = self.version_history.read().expect("lock is poisoned");
    history.latest_version().active_memtable.insert(value)
}
/// Inserts a batch of entries into the currently active memtable.
///
/// Returns the pair reported by `Memtable::insert_batch` (presumably batch
/// size and resulting memtable size — confirm against the memtable
/// implementation).
#[doc(hidden)]
#[must_use]
pub(crate) fn append_batch(&self, items: Vec<InternalValue>) -> (u64, u64) {
    #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
    let history = self.version_history.read().expect("lock is poisoned");
    history.latest_version().active_memtable.insert_batch(items)
}
/// Recovers an existing tree from disk.
///
/// Steps, in order:
/// 1. Decode the current manifest and validate format version and comparator.
/// 2. Recover all levels/tables (see [`Self::recover_levels`]).
/// 3. Verify the on-disk tree type matches the requested configuration.
/// 4. Assemble the in-memory [`TreeInner`] state with fresh ID counters.
fn recover(mut config: Config) -> crate::Result<Self> {
    use crate::stop_signal::StopSignal;
    use inner::get_next_tree_id;

    log::info!("Recovering LSM-tree at {}", config.path.display());

    {
        let version_id =
            crate::version::recovery::get_current_version(&config.path, &*config.fs)?;

        // The manifest lives in the versioned file "v{N}".
        let manifest_path = config.path.join(format!("v{version_id}"));

        let mut manifest_file = config
            .fs
            .open(&manifest_path, &crate::fs::FsOpenOptions::new().read(true))?;
        let reader = sfa::Reader::from_reader(&mut manifest_file)?;
        let manifest = Manifest::decode_from(&manifest_path, &reader, &*config.fs)?;

        // Only V3/V4 format trees can be opened here.
        if !matches!(manifest.version, FormatVersion::V3 | FormatVersion::V4) {
            return Err(crate::Error::InvalidVersion(manifest.version.into()));
        }

        // Opening with a different comparator than the tree was written with
        // would break key ordering, so refuse outright.
        let supplied_name = config.comparator.name();
        if manifest.comparator_name != supplied_name {
            log::warn!(
                "Comparator mismatch: tree was created with {:?} but opened with {:?}",
                manifest.comparator_name,
                supplied_name,
            );
            return Err(crate::Error::ComparatorMismatch {
                stored: manifest.comparator_name,
                supplied: supplied_name,
            });
        }

        // The persisted level count takes precedence over the config value.
        config.level_count = manifest.level_count;
    }

    let tree_id = get_next_tree_id();

    #[cfg(feature = "metrics")]
    let metrics = Arc::new(Metrics::default());

    let version = Self::recover_levels(
        &config.path,
        tree_id,
        &config,
        #[cfg(feature = "metrics")]
        &metrics,
    )?;

    {
        // KV-separation options imply a blob tree; their absence a standard one.
        let requested_tree_type = match config.kv_separation_opts {
            Some(_) => crate::TreeType::Blob,
            None => crate::TreeType::Standard,
        };
        if version.tree_type() != requested_tree_type {
            log::error!(
                "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                version.tree_type(),
            );
            return Err(crate::Error::Unrecoverable);
        }
    }

    // Continue table numbering after the highest recovered table ID.
    let highest_table_id = version
        .iter_tables()
        .map(Table::id)
        .max()
        .unwrap_or_default();

    let comparator = config.comparator.clone();

    let inner = TreeInner {
        id: tree_id,
        memtable_id_counter: SequenceNumberCounter::new(1),
        table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
        blob_file_id_counter: SequenceNumberCounter::default(),
        version_history: Arc::new(RwLock::new(SuperVersions::new(version, &comparator))),
        stop_signal: StopSignal::default(),
        config: Arc::new(config),
        major_compaction_lock: RwLock::default(),
        flush_lock: Mutex::default(),
        compaction_state: Arc::new(Mutex::new(CompactionState::default())),
        #[cfg(feature = "metrics")]
        metrics,
    };

    Ok(Self(Arc::new(inner)))
}
/// Bootstraps a brand-new tree under `config.path`.
///
/// Creates the tree folder plus every configured tables folder, fsyncing
/// each created directory (and up to two ancestors) so the layout survives
/// a crash, then builds the initial in-memory state.
fn create_new(config: Config) -> crate::Result<Self> {
    use crate::file::fsync_directory;

    let path = config.path.clone();
    log::trace!("Creating LSM-tree at {}", path.display());

    (*config.fs).create_dir_all(&path)?;

    for (folder, fs) in config.all_tables_folders() {
        fs.create_dir_all(&folder)?;
        fsync_directory(&folder, &*fs)?;

        // Persist the directory entries of the ancestors as well.
        if let Some(parent) = folder.parent() {
            fsync_directory(parent, &*fs)?;

            if let Some(grandparent) = parent.parent() {
                fsync_directory(grandparent, &*fs)?;
            }
        }
    }

    fsync_directory(&path, &*config.fs)?;

    let inner = TreeInner::create_new(config)?;
    Ok(Self(Arc::new(inner)))
}
/// Recovers the level structure and all tables/blob files of a tree.
///
/// Walks every configured tables folder, matches each table file against the
/// manifest's table map, and recovers it into its level. Files not referenced
/// by the manifest are collected as orphans and deleted at the end (after the
/// recovered version is in place). Fails with `Unrecoverable` (or
/// `RouteMismatch` when level routes no longer cover a previously used level)
/// if any manifest table is missing on disk.
#[expect(
    clippy::too_many_lines,
    reason = "recovery logic is inherently complex"
)]
fn recover_levels<P: AsRef<Path>>(
    tree_path: P,
    tree_id: TreeId,
    config: &Config,
    #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
) -> crate::Result<Version> {
    use crate::{TableId, file::fsync_directory};

    let tree_path = tree_path.as_ref();

    let recovery = recover(tree_path, &*config.fs)?;

    // Manifest view: table ID -> (level index, checksum, global seqno).
    // Entries are removed as files are found; leftovers are missing tables.
    let mut table_map = {
        let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum, SeqNo)> =
            crate::HashMap::default();

        for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
            for run in table_ids {
                for table in run {
                    #[expect(
                        clippy::expect_used,
                        reason = "there are always less than 256 levels"
                    )]
                    result.insert(
                        table.id,
                        (
                            level_idx
                                .try_into()
                                .expect("there are less than 256 levels"),
                            table.checksum,
                            table.global_seqno,
                        ),
                    );
                }
            }
        }

        result
    };

    let cnt = table_map.len();
    log::debug!("Recovering {cnt} tables from {}", tree_path.display());

    // Throttle progress logging for larger table counts.
    let progress_mod = match cnt {
        _ if cnt <= 20 => 1,
        _ if cnt <= 100 => 10,
        _ => 100,
    };

    let mut tables = vec![];
    let mut recovered_table_ids: crate::HashSet<TableId> = crate::HashSet::default();

    // Files on disk that the manifest does not reference; deleted at the end.
    let mut orphaned_tables: Vec<(std::path::PathBuf, Arc<dyn crate::fs::Fs>)> = vec![];

    let all_folders = config.all_tables_folders();

    for (table_base_folder, folder_fs) in &all_folders {
        // Create (and durably persist) any configured folder that is missing.
        if !folder_fs.exists(table_base_folder)? {
            folder_fs.create_dir_all(table_base_folder)?;
            fsync_directory(table_base_folder, &**folder_fs)?;

            if let Some(parent) = table_base_folder.parent() {
                fsync_directory(parent, &**folder_fs)?;

                if let Some(grandparent) = parent.parent() {
                    fsync_directory(grandparent, &**folder_fs)?;
                }
            }
        }

        for dirent in folder_fs.read_dir(table_base_folder)? {
            let crate::fs::FsDirEntry {
                path: table_file_path,
                file_name,
                is_dir,
            } = dirent;

            // Ignore macOS metadata files (.DS_Store, AppleDouble "._" files).
            if file_name == ".DS_Store" {
                continue;
            }
            if file_name.starts_with("._") {
                continue;
            }

            let table_file_name = &file_name;

            if is_dir {
                log::warn!(
                    "Skipping unexpected directory in tables folder: {}",
                    table_file_path.display()
                );
                continue;
            }

            // Table files are named by their numeric table ID.
            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some((level_idx, checksum, global_seqno)) = table_map.remove(&table_id) {
                // Pinning policies depend on the level the table lives in.
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    folder_fs.clone(),
                    pin_filter,
                    pin_index,
                    config.encryption.clone(),
                    #[cfg(zstd_any)]
                    config.zstd_dictionary.clone(),
                    config.comparator.clone(),
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                tables.push(table);
                recovered_table_ids.insert(table_id);

                if tables.len() % progress_mod == 0 {
                    log::debug!("Recovered {}/{cnt} tables", tables.len());
                }
            } else if recovered_table_ids.contains(&table_id) {
                // Same table ID seen in another folder already; keep the first.
                log::warn!(
                    "Skipping duplicate sighting of manifest table {table_id} in {}",
                    table_file_path.display(),
                );
            } else {
                // Not in the manifest: mark for deletion after recovery succeeds.
                orphaned_tables.push((table_file_path, folder_fs.clone()));
            }
        }
    }

    if tables.len() < cnt {
        // Distinguish "level routes no longer cover some levels" from
        // genuinely missing/corrupt files.
        if let Some(routes) = &config.level_routes {
            let all_missing_uncovered = table_map
                .values()
                .all(|(level, _, _)| !routes.iter().any(|r| r.levels.contains(level)));

            if all_missing_uncovered {
                let found = tables.len();
                let missing_ids: Vec<_> = table_map.keys().collect();
                log::error!(
                    "Route mismatch: expected {cnt} tables but found {found} — \
                     level_routes do not cover all previously used levels. \
                     Missing table IDs: {missing_ids:?}",
                );
                return Err(crate::Error::RouteMismatch {
                    expected: cnt,
                    found,
                });
            }
        }

        log::error!(
            "Recovered less tables than expected: {:?}",
            table_map.keys(),
        );
        return Err(crate::Error::Unrecoverable);
    }

    log::debug!("Successfully recovered {} tables", tables.len());

    let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
        &tree_path.join(crate::file::BLOBS_FOLDER),
        &recovery.blob_file_ids,
        tree_id,
        config.descriptor_table.as_ref(),
        &config.fs,
    )?;

    let version = Version::from_recovery(recovery, &tables, &blob_files)?;

    // Drop stale "v{N}" files now that the current version is established.
    Self::cleanup_orphaned_version(tree_path, version.id(), &*config.fs)?;

    // Only delete orphans after recovery fully succeeded.
    for (table_path, orphan_fs) in orphaned_tables {
        log::debug!("Deleting orphaned table {}", table_path.display());
        orphan_fs.remove_file(&table_path)?;
    }

    for blob_file_path in orphaned_blob_files {
        log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
        (*config.fs).remove_file(&blob_file_path)?;
    }

    Ok(version)
}
fn cleanup_orphaned_version(
path: &Path,
latest_version_id: crate::version::VersionId,
fs: &dyn crate::fs::Fs,
) -> crate::Result<()> {
let version_str = format!("v{latest_version_id}");
for dirent in fs.read_dir(path)? {
if dirent.is_dir {
continue;
}
if dirent.file_name.starts_with('v') && dirent.file_name != version_str {
log::trace!("Cleanup orphaned version {}", dirent.file_name);
match fs.remove_file(&dirent.path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(e.into()),
}
}
}
Ok(())
}
}