mod compression;
mod gc;
pub mod index;
pub mod value;
use crate::{
coding::{Decode, Encode},
compaction::stream::CompactionStream,
file::BLOBS_FOLDER,
r#abstract::{AbstractTree, RangeItem},
tree::inner::MemtableId,
value::InternalValue,
Config, KvPair, Memtable, SegmentId, SeqNo, Slice, Snapshot, UserKey, UserValue, ValueType,
};
use compression::MyCompressor;
use gc::{reader::GcReader, writer::GcWriter};
use index::IndexTree;
use std::{
io::Cursor,
ops::RangeBounds,
sync::{Arc, RwLockWriteGuard},
};
use value::MaybeInlineValue;
use value_log::ValueLog;
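/// Resolves an item yielded by a range iterator.
///
/// Inline values are returned as-is; indirect values are looked up
/// in the value log through their value handle.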
fn resolve_value_handle(vlog: &ValueLog<MyCompressor>, item: RangeItem) -> RangeItem {
match item {
Ok((key, value)) => {
let mut cursor = Cursor::new(value);
            match MaybeInlineValue::decode_from(&mut cursor)? {
MaybeInlineValue::Inline(bytes) => Ok((key, bytes)),
MaybeInlineValue::Indirect { vhandle, .. } => match vlog.get(&vhandle) {
Ok(Some(bytes)) => Ok((key, bytes)),
Err(e) => Err(e.into()),
_ => panic!("value handle did not match any blob - this is a bug"),
},
}
}
Err(e) => Err(e),
}
}
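/// A key-value-separated log-structured merge tree.
///
/// Values that exceed a configurable threshold are written to a value log,
/// while the index tree (an LSM-tree) stores only small value handles
/// pointing into that log. This reduces write amplification for large
/// values, at the cost of one extra read hop and the need for garbage
/// collection of the value log.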
#[derive(Clone)]
pub struct BlobTree {
#[doc(hidden)]
pub index: IndexTree,
#[doc(hidden)]
pub blobs: ValueLog<MyCompressor>,
}
impl BlobTree {
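    /// Opens the blob tree: the index tree at the configured path, plus the
    /// value log in the `BLOBS_FOLDER` subfolder next to it.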
pub(crate) fn open(config: Config) -> crate::Result<Self> {
let path = &config.path;
let vlog_path = path.join(BLOBS_FOLDER);
let vlog_cfg = value_log::Config::<MyCompressor>::default()
.blob_cache(config.blob_cache.clone())
.segment_size_bytes(config.blob_file_target_size)
.compression(MyCompressor(config.blob_compression));
let index: IndexTree = config.open()?.into();
Ok(Self {
index,
blobs: ValueLog::open(vlog_path, vlog_cfg)?,
})
}
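    /// Scans the index tree, collecting statistics about value log fragmentation.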
#[doc(hidden)]
pub fn gc_scan_stats(&self, seqno: SeqNo) -> crate::Result<crate::GcReport> {
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use MaybeInlineValue::{Indirect, Inline};
let _memtable_lock = self.index.read_lock_active_memtable();
let snapshot = self.index.snapshot(seqno);
self.blobs
.scan_for_stats(snapshot.iter().filter_map(|kv| {
let Ok((_, v)) = kv else {
return Some(Err(IoError::new(
IoErrorKind::Other,
"Failed to load KV pair from index tree",
)));
};
let mut cursor = Cursor::new(v);
let value = match MaybeInlineValue::decode_from(&mut cursor) {
Ok(v) => v,
Err(e) => return Some(Err(IoError::new(IoErrorKind::Other, e.to_string()))),
};
match value {
Indirect { vhandle, size } => Some(Ok((vhandle, size))),
Inline(_) => None,
}
}))
.map_err(Into::into)
}
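    /// Applies a garbage collection strategy to the value log, then drops
    /// any blob segments that have become fully stale.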
pub fn apply_gc_strategy(
&self,
strategy: &impl value_log::GcStrategy<MyCompressor>,
seqno: SeqNo,
) -> crate::Result<u64> {
let memtable_lock = self.index.lock_active_memtable();
self.blobs.apply_gc_strategy(
strategy,
&GcReader::new(&self.index, &memtable_lock),
GcWriter::new(seqno, &memtable_lock),
)?;
self.blobs.drop_stale_segments().map_err(Into::into)
}
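    /// Drops all blob segments that no longer contain any live values.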
#[doc(hidden)]
pub fn gc_drop_stale(&self) -> crate::Result<u64> {
let _lock = self.index.lock_active_memtable();
self.blobs.drop_stale_segments().map_err(Into::into)
}
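    /// Rotates the active memtable and flushes it to disk, registering the
    /// resulting segment if one was created.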
#[doc(hidden)]
pub fn flush_active_memtable(
&self,
eviction_seqno: SeqNo,
) -> crate::Result<Option<Arc<crate::Segment>>> {
let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
return Ok(None);
};
let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
else {
return Ok(None);
};
self.register_segments(&[segment.clone()])?;
Ok(Some(segment))
}
}
impl AbstractTree for BlobTree {
fn sealed_memtable_count(&self) -> usize {
self.index.sealed_memtable_count()
}
fn is_first_level_disjoint(&self) -> bool {
self.index.is_first_level_disjoint()
}
#[doc(hidden)]
fn verify(&self) -> crate::Result<usize> {
let index_tree_sum = self.index.verify()?;
let vlog_sum = self.blobs.verify()?;
Ok(index_tree_sum + vlog_sum)
}
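    // Key-only iterators never need to resolve blob indirections,
    // so they can delegate to the index tree directly.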
fn keys_with_seqno(
&self,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
self.index.keys_with_seqno(seqno, index)
}
fn values_with_seqno(
&self,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
Box::new(
self.iter_with_seqno(seqno, index)
.map(|x| x.map(|(_, v)| v)),
)
}
fn keys(&self) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
self.index.keys()
}
    fn values(&self) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
Box::new(self.iter().map(|x| x.map(|(_, v)| v)))
}
fn flush_memtable(
&self,
segment_id: SegmentId,
memtable: &Arc<Memtable>,
eviction_seqno: SeqNo,
) -> crate::Result<Option<Arc<crate::Segment>>> {
use crate::{
file::SEGMENTS_FOLDER,
segment::writer::{Options, Writer as SegmentWriter},
};
use value::MaybeInlineValue;
let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);
log::debug!("flushing memtable & performing key-value separation");
log::debug!("=> to LSM segments in {:?}", lsm_segment_folder);
log::debug!("=> to blob segment at {:?}", self.blobs.path);
let mut segment_writer = SegmentWriter::new(Options {
segment_id,
data_block_size: self.index.config.data_block_size,
index_block_size: self.index.config.index_block_size,
evict_tombstones: false,
folder: lsm_segment_folder,
})?
.use_compression(self.index.config.compression);
#[cfg(feature = "bloom")]
{
segment_writer = segment_writer.use_bloom_policy(
crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
);
}
let mut blob_writer = self.blobs.get_writer()?;
let iter = memtable.iter().map(Ok);
let compaction_filter = CompactionStream::new(iter, eviction_seqno);
for item in compaction_filter {
let item = item?;
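            // Tombstones only need to reach the index tree; there is no blob to write.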
if item.is_tombstone() {
segment_writer.write(InternalValue::new(item.key, vec![]))?;
continue;
}
let mut cursor = Cursor::new(item.value);
let value = MaybeInlineValue::decode_from(&mut cursor)?;
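            // Values that are already indirections (e.g. written into the
            // memtable by the GC writer) are passed through unchanged.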
let value = match value {
MaybeInlineValue::Inline(value) => value,
indirection @ MaybeInlineValue::Indirect { .. } => {
let mut serialized_indirection = vec![];
indirection.encode_into(&mut serialized_indirection)?;
segment_writer
.write(InternalValue::new(item.key.clone(), serialized_indirection))?;
continue;
}
};
#[allow(clippy::cast_possible_truncation)]
let value_size = value.len() as u32;
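            // Key-value separation: values at or above the threshold go to the
            // blob file; only a small value handle is written to the LSM-tree.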
if value_size >= self.index.config.blob_file_separation_threshold {
let vhandle = blob_writer.get_next_value_handle();
let indirection = MaybeInlineValue::Indirect {
vhandle,
size: value_size,
};
let mut serialized_indirection = vec![];
indirection.encode_into(&mut serialized_indirection)?;
segment_writer
.write(InternalValue::new(item.key.clone(), serialized_indirection))?;
blob_writer.write(&item.key.user_key, value)?;
} else {
let direct = MaybeInlineValue::Inline(value);
let serialized_direct = direct.encode_into_vec()?;
segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
}
}
log::trace!("Register blob writer into value log");
self.blobs.register_writer(blob_writer)?;
log::trace!("Creating segment");
self.index.consume_writer(segment_id, segment_writer)
}
fn register_segments(&self, segments: &[Arc<crate::Segment>]) -> crate::Result<()> {
self.index.register_segments(segments)
}
fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Memtable> {
self.index.lock_active_memtable()
}
fn set_active_memtable(&self, memtable: Memtable) {
self.index.set_active_memtable(memtable);
}
fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
self.index.add_sealed_memtable(id, memtable);
}
fn compact(
&self,
strategy: Arc<dyn crate::compaction::CompactionStrategy>,
seqno_threshold: SeqNo,
) -> crate::Result<()> {
self.index.compact(strategy, seqno_threshold)
}
fn get_next_segment_id(&self) -> SegmentId {
self.index.get_next_segment_id()
}
fn tree_config(&self) -> &Config {
&self.index.config
}
fn get_highest_seqno(&self) -> Option<SeqNo> {
self.index.get_highest_seqno()
}
fn active_memtable_size(&self) -> u32 {
self.index.active_memtable_size()
}
fn tree_type(&self) -> crate::TreeType {
crate::TreeType::Blob
}
fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
self.index.rotate_memtable()
}
fn segment_count(&self) -> usize {
self.index.segment_count()
}
fn first_level_segment_count(&self) -> usize {
self.index.first_level_segment_count()
}
fn approximate_len(&self) -> usize {
self.index.approximate_len()
}
fn contains_key<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<bool> {
self.index.contains_key(key)
}
fn contains_key_with_seqno<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
self.index.contains_key_with_seqno(key, seqno)
}
fn len(&self) -> crate::Result<usize> {
self.index.len()
}
#[must_use]
fn disk_space(&self) -> u64 {
self.index.disk_space() + self.blobs.manifest.disk_space_used()
}
fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
self.index.get_highest_memtable_seqno()
}
fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
self.index.get_highest_persisted_seqno()
}
fn snapshot(&self, seqno: SeqNo) -> Snapshot {
use crate::AnyTree::Blob;
Snapshot::new(Blob(self.clone()), seqno)
}
fn iter_with_seqno(
&self,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
self.range_with_seqno::<UserKey, _>(.., seqno, index)
}
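    // Range and prefix iterators resolve indirect values on the fly
    // via `resolve_value_handle`.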
fn range_with_seqno<K: AsRef<[u8]>, R: RangeBounds<K>>(
&self,
range: R,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
let vlog = self.blobs.clone();
Box::new(
self.index
.0
.create_range(&range, Some(seqno), index)
.map(move |item| resolve_value_handle(&vlog, item)),
)
}
fn prefix_with_seqno<K: AsRef<[u8]>>(
&self,
prefix: K,
seqno: SeqNo,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
let vlog = self.blobs.clone();
Box::new(
self.index
.0
.create_prefix(prefix, Some(seqno), index)
.map(move |item| resolve_value_handle(&vlog, item)),
)
}
fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
&self,
range: R,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
let vlog = self.blobs.clone();
Box::new(
self.index
.0
.create_range(&range, None, None)
.map(move |item| resolve_value_handle(&vlog, item)),
)
}
fn prefix<K: AsRef<[u8]>>(
&self,
prefix: K,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
let vlog = self.blobs.clone();
Box::new(
self.index
.0
.create_prefix(prefix, None, None)
.map(move |item| resolve_value_handle(&vlog, item)),
)
}
fn raw_insert_with_lock<K: AsRef<[u8]>, V: AsRef<[u8]>>(
&self,
lock: &RwLockWriteGuard<'_, Memtable>,
key: K,
value: V,
seqno: SeqNo,
r#type: ValueType,
) -> (u32, u32) {
use value::MaybeInlineValue;
let item = MaybeInlineValue::Inline(value.as_ref().into());
let value = item.encode_into_vec().expect("should serialize");
let value = InternalValue::from_components(key.as_ref(), value, seqno, r#type);
lock.insert(value)
}
fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V, seqno: SeqNo) -> (u32, u32) {
use value::MaybeInlineValue;
let item = MaybeInlineValue::Inline(value.as_ref().into());
let value = item.encode_into_vec().expect("should serialize");
self.index.insert(key, value, seqno)
}
fn get_with_seqno<K: AsRef<[u8]>>(
&self,
key: K,
seqno: SeqNo,
) -> crate::Result<Option<crate::UserValue>> {
use value::MaybeInlineValue::{Indirect, Inline};
let Some(value) = self.index.get_internal_with_seqno(key.as_ref(), seqno)? else {
return Ok(None);
};
match value {
Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => Ok(self.blobs.get(&vhandle)?.map(Slice::from)),
}
}
fn get<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<Slice>> {
use value::MaybeInlineValue::{Indirect, Inline};
let Some(value) = self.index.get_internal(key.as_ref())? else {
return Ok(None);
};
match value {
Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => Ok(self.blobs.get(&vhandle)?.map(Slice::from)),
}
}
fn remove<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
self.index.remove(key, seqno)
}
fn remove_weak<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
self.index.remove_weak(key, seqno)
}
}
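
// A minimal round-trip sketch of the key-value separation flow (an added
// example, not part of the original file). It assumes `tempfile` is available
// as a dev-dependency and that the default `blob_file_separation_threshold`
// is below 64 KiB; adjust the config if that assumption does not hold.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn blob_tree_roundtrip_sketch() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let tree = BlobTree::open(Config::new(folder.path()))?;

        // Small values stay inline in the index tree.
        tree.insert("small", "x", 0);

        // Large values are separated into the value log on flush.
        tree.insert("big", "y".repeat(64 * 1024), 1);

        tree.flush_active_memtable(0)?;

        assert_eq!(&*tree.get("small")?.expect("should exist"), b"x");
        assert_eq!(tree.get("big")?.expect("should exist").len(), 64 * 1024);

        Ok(())
    }
}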