//! Blob tree: an LSM-tree with key-value separation. Large values are stored
//! out of line in blob files, while the index tree holds only small values and
//! blob indirections.

mod gc;
pub mod handle;
pub mod ingest;
#[doc(hidden)]
pub use gc::{FragmentationEntry, FragmentationMap};
use crate::{
Cache, Config, Memtable, SeqNo, TableId, TreeId, UserKey, UserValue,
abstract_tree::{AbstractTree, RangeItem},
coding::Decode,
iter_guard::{IterGuard, IterGuardImpl},
table::Table,
tree::inner::MemtableId,
value::InternalValue,
version::Version,
vlog::{Accessor, BlobFile, BlobFileWriter},
};
use handle::BlobIndirection;
use std::{
ops::RangeBounds,
path::{Path, PathBuf},
sync::{Arc, MutexGuard},
};
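/// Iterator guard for blob tree range and prefix scans.
///
/// Resolving a blob indirection requires an extra blob file read, so the guard
/// defers it: `key()` and `size()` only inspect the inline entry, and the
/// value is fetched from a blob file only when `into_inner` (or a matching
/// `into_inner_if`) is called.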
pub struct Guard {
tree: crate::BlobTree,
version: Version,
kv: crate::Result<InternalValue>,
}
impl IterGuard for Guard {
fn into_inner_if(
self,
pred: impl Fn(&UserKey) -> bool,
) -> crate::Result<(UserKey, Option<UserValue>)> {
let kv = self.kv?;
if pred(&kv.key.user_key) {
resolve_value_handle(
self.tree.id(),
self.tree.blobs_folder.as_path(),
&self.tree.index.config.cache,
&self.version,
kv,
)
.map(|(k, v)| (k, Some(v)))
} else {
Ok((kv.key.user_key, None))
}
}
fn key(self) -> crate::Result<UserKey> {
self.kv.map(|kv| kv.key.user_key)
}
fn size(self) -> crate::Result<u32> {
let kv = self.kv?;
if kv.key.value_type.is_indirection() {
let mut cursor = std::io::Cursor::new(kv.value);
Ok(BlobIndirection::decode_from(&mut cursor)?.size)
} else {
            #[expect(clippy::cast_possible_truncation, reason = "values are at most u32::MAX bytes")]
Ok(kv.value.len() as u32)
}
}
fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
resolve_value_handle(
self.tree.id(),
self.tree.blobs_folder.as_path(),
&self.tree.index.config.cache,
&self.version,
self.kv?,
)
}
}
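/// Resolves an internal entry to its user-visible key-value pair.
///
/// If the entry is a blob indirection, the pointed-to value is fetched from
/// the version's blob files; otherwise the inline value is returned as-is.
/// An indirection that does not resolve to a blob is a bug, hence the panic.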
fn resolve_value_handle(
tree_id: TreeId,
blobs_folder: &Path,
cache: &Cache,
version: &Version,
item: InternalValue,
) -> RangeItem {
if item.key.value_type.is_indirection() {
let mut cursor = std::io::Cursor::new(item.value);
let vptr = BlobIndirection::decode_from(&mut cursor)?;
match Accessor::new(&version.blob_files).get(
tree_id,
blobs_folder,
&item.key.user_key,
&vptr.vhandle,
cache,
) {
Ok(Some(v)) => {
let k = item.key.user_key;
Ok((k, v))
}
Ok(None) => {
panic!(
"value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
item.key.user_key,
vptr.vhandle,
version.id(),
);
}
Err(e) => Err(e),
}
} else {
let k = item.key.user_key;
let v = item.value;
Ok((k, v))
}
}
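/// An LSM-tree with key-value separation.
///
/// Values at or above the configured separation threshold are written to
/// dedicated blob files, and the index tree stores only a small
/// [`BlobIndirection`] pointing at them; smaller values are stored inline.
/// This keeps the index tree compact at the cost of one extra read per
/// separated value.
///
/// Minimal usage sketch (hypothetical; assumes `config` points at a fresh
/// directory and has `kv_separation_opts` set):
///
/// ```ignore
/// let tree = BlobTree::open(config)?;
/// tree.insert("big", vec![0u8; 1_000_000], 0); // may be separated on flush
/// let value = tree.get("big", SeqNo::MAX)?;    // indirection resolved transparently
/// ```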
#[derive(Clone)]
pub struct BlobTree {
#[doc(hidden)]
pub index: crate::Tree,
blobs_folder: Arc<PathBuf>,
}
impl BlobTree {
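    /// Opens or recovers a blob tree from the given config.
    ///
    /// Recovers the index tree, ensures the blobs folder exists and is
    /// fsynced, and resumes the blob file ID counter after the highest
    /// recovered blob file ID.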
pub(crate) fn open(config: Config) -> crate::Result<Self> {
use crate::file::{BLOBS_FOLDER, fsync_directory};
let index = crate::Tree::open(config)?;
let blobs_folder = index.config.path.join(BLOBS_FOLDER);
(*index.config.fs).create_dir_all(&blobs_folder)?;
fsync_directory(&blobs_folder, &*index.config.fs)?;
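        // Continue numbering blob files after the highest recovered ID,
        // or start at 0 if there are none.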
let blob_file_id_to_continue_with = index
.current_version()
.blob_files
.list_ids()
.max()
.map(|x| x + 1)
.unwrap_or_default();
index
.0
.blob_file_id_counter
.set(blob_file_id_to_continue_with);
Ok(Self {
index,
blobs_folder: Arc::new(blobs_folder),
})
}
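    /// Point-read helper: looks up the internal entry for `key` in the given
    /// super version and resolves its blob indirection, if any.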
fn resolve_key(
&self,
super_version: &crate::version::SuperVersion,
key: &[u8],
seqno: SeqNo,
) -> crate::Result<Option<UserValue>> {
let Some(item) = crate::Tree::get_internal_entry_from_version(
super_version,
key,
seqno,
self.index.config.comparator.as_ref(),
)?
else {
return Ok(None);
};
let (_, v) = resolve_value_handle(
self.id(),
self.blobs_folder.as_path(),
&self.index.config.cache,
&super_version.version,
item,
)?;
Ok(Some(v))
}
}
impl crate::abstract_tree::sealed::Sealed for BlobTree {}
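// Most operations delegate to the inner index tree; only reads, flushes, and
// size/space accounting need blob-specific handling.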
impl AbstractTree for BlobTree {
fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
self.index.print_trace(key)
}
fn table_file_cache_size(&self) -> usize {
self.index.table_file_cache_size()
}
fn get_version_history_lock(
&self,
) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
self.index.get_version_history_lock()
}
fn next_table_id(&self) -> TableId {
self.index.next_table_id()
}
fn id(&self) -> crate::TreeId {
self.index.id()
}
fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
self.index.get_internal_entry(key, seqno)
}
fn current_version(&self) -> Version {
self.index.current_version()
}
#[cfg(feature = "metrics")]
fn metrics(&self) -> &Arc<crate::Metrics> {
self.index.metrics()
}
fn version_free_list_len(&self) -> usize {
self.index.version_free_list_len()
}
fn prefix<K: AsRef<[u8]>>(
&self,
prefix: K,
seqno: SeqNo,
index: Option<(Arc<Memtable>, SeqNo)>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
use crate::prefix::compute_prefix_hash;
use crate::range::prefix_to_range;
let prefix_bytes = prefix.as_ref();
let prefix_hash =
compute_prefix_hash(self.index.config.prefix_extractor.as_ref(), prefix_bytes);
let super_version = self.index.get_version_for_snapshot(seqno);
let tree = self.clone();
let range = prefix_to_range(prefix_bytes);
Box::new(
crate::Tree::create_internal_range_with_prefix_hash(
super_version.clone(),
&range,
seqno,
index,
                None,
                self.index.config.comparator.clone(),
prefix_hash,
)
.map(move |kv| {
IterGuardImpl::Blob(Guard {
tree: tree.clone(),
version: super_version.version.clone(),
kv,
})
}),
)
}
fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
&self,
range: R,
seqno: SeqNo,
index: Option<(Arc<Memtable>, SeqNo)>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
let super_version = self.index.get_version_for_snapshot(seqno);
let tree = self.clone();
Box::new(
crate::Tree::create_internal_range(
super_version.clone(),
&range,
seqno,
index,
None,
self.index.config.comparator.clone(),
)
.map(move |kv| {
IterGuardImpl::Blob(Guard {
tree: tree.clone(),
version: super_version.version.clone(),
kv,
})
}),
)
}
fn tombstone_count(&self) -> u64 {
self.index.tombstone_count()
}
fn weak_tombstone_count(&self) -> u64 {
self.index.weak_tombstone_count()
}
fn weak_tombstone_reclaimable_count(&self) -> u64 {
self.index.weak_tombstone_reclaimable_count()
}
fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
self.index.drop_range(range)
}
fn clear(&self) -> crate::Result<()> {
self.index.clear()
}
fn major_compact(
&self,
target_size: u64,
seqno_threshold: SeqNo,
) -> crate::Result<crate::compaction::CompactionResult> {
self.index.major_compact(target_size, seqno_threshold)
}
fn clear_active_memtable(&self) {
self.index.clear_active_memtable();
}
fn l0_run_count(&self) -> usize {
self.index.l0_run_count()
}
fn blob_file_count(&self) -> usize {
self.current_version().blob_file_count()
}
fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
return Ok(None);
};
Ok(Some(if item.key.value_type.is_indirection() {
let mut cursor = std::io::Cursor::new(item.value);
let vptr = BlobIndirection::decode_from(&mut cursor)?;
vptr.size
} else {
            #[expect(clippy::cast_possible_truncation, reason = "values are at most u32::MAX bytes")]
{
item.value.len() as u32
}
}))
}
fn stale_blob_bytes(&self) -> u64 {
self.current_version().gc_stats().stale_bytes()
}
fn filter_size(&self) -> u64 {
self.index.filter_size()
}
fn pinned_filter_size(&self) -> usize {
self.index.pinned_filter_size()
}
fn pinned_block_index_size(&self) -> usize {
self.index.pinned_block_index_size()
}
fn sealed_memtable_count(&self) -> usize {
self.index.sealed_memtable_count()
}
fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
self.index.get_flush_lock()
}
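    /// Flushes a memtable stream to tables, performing key-value separation:
    /// values at or above the configured separation threshold are written to
    /// blob files and replaced in the table by an encoded [`BlobIndirection`].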
#[expect(clippy::too_many_lines, reason = "flush logic is inherently complex")]
fn flush_to_tables_with_rt(
&self,
stream: impl Iterator<Item = crate::Result<InternalValue>>,
range_tombstones: Vec<crate::range_tombstone::RangeTombstone>,
) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
use crate::{coding::Encode, file::BLOBS_FOLDER, table::multi_writer::MultiWriter};
let start = std::time::Instant::now();
let (table_folder, level_fs) = self.index.config.tables_folder_for_level(0);
let data_block_size = self.index.config.data_block_size_policy.get(0);
let data_block_restart_interval =
self.index.config.data_block_restart_interval_policy.get(0);
let index_block_restart_interval =
self.index.config.index_block_restart_interval_policy.get(0);
let data_block_compression = self.index.config.data_block_compression_policy.get(0);
let index_block_compression = self.index.config.index_block_compression_policy.get(0);
let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);
let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);
log::debug!(
"Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}"
);
log::debug!("=> to table(s) in {}", table_folder.display());
log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());
let mut table_writer = MultiWriter::new(
table_folder.clone(),
self.index.table_id_counter.clone(),
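            // 64 MiB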
64 * 1_024 * 1_024,
0,
level_fs.clone(),
)?
.set_comparator(self.index.config.comparator.clone())
.use_data_block_restart_interval(data_block_restart_interval)
.use_index_block_restart_interval(index_block_restart_interval)
.use_data_block_compression(data_block_compression)
.use_index_block_compression(index_block_compression)
.use_data_block_size(data_block_size)
.use_data_block_hash_ratio(data_block_hash_ratio)
.use_bloom_policy({
use crate::config::FilterPolicyEntry::{Bloom, None};
use crate::table::filter::BloomConstructionPolicy;
match self.index.config.filter_policy.get(0) {
Bloom(policy) => policy,
None => BloomConstructionPolicy::BitsPerKey(0.0),
}
});
if index_partitioning {
table_writer = table_writer.use_partitioned_index();
}
if filter_partitioning {
table_writer = table_writer.use_partitioned_filter();
}
table_writer =
table_writer.use_prefix_extractor(self.index.config.prefix_extractor.clone());
table_writer = table_writer.use_encryption(self.index.config.encryption.clone());
#[cfg(zstd_any)]
{
table_writer =
table_writer.use_zstd_dictionary(self.index.config.zstd_dictionary.clone());
}
#[expect(
clippy::expect_used,
reason = "cannot create blob tree without defining kv separation options"
)]
let kv_opts = self
.index
.config
.kv_separation_opts
.as_ref()
.expect("kv separation options should exist");
let mut blob_writer = BlobFileWriter::new(
self.index.0.blob_file_id_counter.clone(),
self.index.config.path.join(BLOBS_FOLDER),
self.id(),
self.index.config.descriptor_table.clone(),
self.index.config.fs.clone(),
)?
.use_target_size(kv_opts.file_target_size)
.use_compression(kv_opts.compression);
let separation_threshold = kv_opts.separation_threshold;
table_writer.set_range_tombstones(range_tombstones);
for item in stream {
let item = item?;
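            // Tombstones carry no value, so they are never separated.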
if item.is_tombstone() {
table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
continue;
}
let value = item.value;
            #[expect(clippy::cast_possible_truncation, reason = "values are at most u32::MAX bytes")]
let value_size = value.len() as u32;
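            // Values at or above the threshold go to the blob file; the table
            // stores an encoded indirection instead of the value itself.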
if value_size >= separation_threshold {
let vhandle = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;
let indirection = BlobIndirection {
vhandle,
size: value_size,
};
table_writer.write({
let mut vptr =
InternalValue::new(item.key.clone(), indirection.encode_into_vec());
vptr.key.value_type = crate::ValueType::Indirection;
vptr
})?;
table_writer.register_blob(indirection);
} else {
table_writer.write(InternalValue::new(item.key, value))?;
}
}
let blob_files = blob_writer.finish()?;
let result = table_writer.finish()?;
log::debug!("Flushed memtable(s) in {:?}", start.elapsed());
let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
let pin_index = self.index.config.index_block_pinning_policy.get(0);
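        // Re-open the freshly written tables as readable `Table` handles.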
let tables = result
.into_iter()
.map(|(table_id, checksum)| -> crate::Result<Table> {
Table::recover(
table_folder.join(table_id.to_string()),
checksum,
0,
self.index.id,
self.index.config.cache.clone(),
self.index.config.descriptor_table.clone(),
level_fs.clone(),
pin_filter,
pin_index,
self.index.config.encryption.clone(),
#[cfg(zstd_any)]
self.index.config.zstd_dictionary.clone(),
self.index.config.comparator.clone(),
#[cfg(feature = "metrics")]
self.index.metrics.clone(),
)
})
.collect::<crate::Result<Vec<_>>>()?;
Ok(Some((tables, Some(blob_files))))
}
fn register_tables(
&self,
tables: &[Table],
blob_files: Option<&[BlobFile]>,
frag_map: Option<FragmentationMap>,
sealed_memtables_to_delete: &[MemtableId],
gc_watermark: SeqNo,
) -> crate::Result<()> {
self.index.register_tables(
tables,
blob_files,
frag_map,
sealed_memtables_to_delete,
gc_watermark,
)
}
fn compact(
&self,
strategy: Arc<dyn crate::compaction::CompactionStrategy>,
seqno_threshold: SeqNo,
) -> crate::Result<crate::compaction::CompactionResult> {
self.index.compact(strategy, seqno_threshold)
}
fn get_next_table_id(&self) -> TableId {
self.index.get_next_table_id()
}
fn tree_config(&self) -> &Config {
&self.index.config
}
fn get_highest_seqno(&self) -> Option<SeqNo> {
self.index.get_highest_seqno()
}
fn active_memtable(&self) -> Arc<Memtable> {
self.index.active_memtable()
}
fn tree_type(&self) -> crate::TreeType {
crate::TreeType::Blob
}
fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
self.index.rotate_memtable()
}
fn table_count(&self) -> usize {
self.index.table_count()
}
fn level_table_count(&self, idx: usize) -> Option<usize> {
self.index.level_table_count(idx)
}
fn approximate_len(&self) -> usize {
self.index.approximate_len()
}
fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
self.index.is_empty(seqno, index)
}
fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
self.index.contains_key(key, seqno)
}
fn contains_prefix<K: AsRef<[u8]>>(
&self,
prefix: K,
seqno: SeqNo,
index: Option<(Arc<Memtable>, SeqNo)>,
) -> crate::Result<bool> {
self.index.contains_prefix(prefix, seqno, index)
}
fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
self.index.len(seqno, index)
}
fn disk_space(&self) -> u64 {
let version = self.current_version();
self.index.disk_space() + version.blob_files.on_disk_size()
}
fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
self.index.get_highest_memtable_seqno()
}
fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
self.index.get_highest_persisted_seqno()
}
fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
value: V,
seqno: SeqNo,
) -> (u64, u64) {
self.index.insert(key, value.into(), seqno)
}
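    // Point reads pin a super version for the snapshot, then resolve any
    // blob indirection through `resolve_key`.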
fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
let super_version = self.index.get_version_for_snapshot(seqno);
self.resolve_key(&super_version, key.as_ref(), seqno)
}
fn multi_get<K: AsRef<[u8]>>(
&self,
keys: impl IntoIterator<Item = K>,
seqno: SeqNo,
) -> crate::Result<Vec<Option<crate::UserValue>>> {
let super_version = self.index.get_version_for_snapshot(seqno);
keys.into_iter()
.map(|key| self.resolve_key(&super_version, key.as_ref(), seqno))
.collect()
}
fn merge<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
operand: V,
seqno: SeqNo,
) -> (u64, u64) {
self.index.merge(key, operand, seqno)
}
fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
self.index.remove(key, seqno)
}
fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
self.index.remove_weak(key, seqno)
}
fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
self.index.remove_range(start, end, seqno)
}
}