mod compression;
mod gc;
pub mod index;
pub mod value;
use crate::{
coding::{Decode, Encode},
compaction::stream::CompactionStream,
file::BLOBS_FOLDER,
r#abstract::{AbstractTree, RangeItem},
tree::inner::MemtableId,
value::InternalValue,
Config, KvPair, Memtable, Segment, SegmentId, SeqNo, Snapshot, UserKey, UserValue,
};
use compression::MyCompressor;
use gc::{reader::GcReader, writer::GcWriter};
use index::IndexTree;
use std::{
io::Cursor,
ops::{RangeBounds, RangeFull},
sync::{atomic::AtomicUsize, Arc},
};
use value::MaybeInlineValue;
use value_log::ValueLog;
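/// Resolves a single item yielded by a range iterator: inline values are
/// returned as-is, indirect values are fetched from the value log using
/// their value handle.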
fn resolve_value_handle(vlog: &ValueLog<MyCompressor>, item: RangeItem) -> RangeItem {
use MaybeInlineValue::{Indirect, Inline};
match item {
Ok((key, value)) => {
let mut cursor = Cursor::new(value);
match MaybeInlineValue::decode_from(&mut cursor)? {
Inline(bytes) => Ok((key, bytes)),
Indirect { vhandle, .. } => {
match vlog.get(&vhandle) {
Ok(Some(bytes)) => Ok((key, bytes)),
Err(e) => Err(e.into()),
_ => {
panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
}
}
}
}
}
Err(e) => Err(e),
}
}
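/// A key-value-separated log-structured merge tree.
///
/// Large values are stored in a log-structured value log, while keys
/// (and small inline values) live in an index LSM-tree that points into
/// the value log, reducing write amplification for big blobs.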
#[derive(Clone)]
pub struct BlobTree {
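    /// Index tree that maps keys to inline values or value handles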
#[doc(hidden)]
pub index: IndexTree,
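    /// Log-structured value log that stores the large blob values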
#[doc(hidden)]
pub blobs: ValueLog<MyCompressor>,
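    /// Number of segments that have been flushed but not yet registered
    /// with the index tree (see `register_segments`)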
#[doc(hidden)]
pub pending_segments: Arc<AtomicUsize>,
}
impl BlobTree {
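    /// Opens a blob tree in the configured folder, consisting of the
    /// index tree and its value log (kept in the blobs subfolder).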
pub(crate) fn open(config: Config) -> crate::Result<Self> {
let path = &config.path;
let vlog_path = path.join(BLOBS_FOLDER);
let vlog_cfg = value_log::Config::<MyCompressor>::default()
.blob_cache(config.blob_cache.clone())
.segment_size_bytes(config.blob_file_target_size)
.compression(MyCompressor(config.blob_compression));
let index: IndexTree = config.open()?.into();
Ok(Self {
index,
blobs: ValueLog::open(vlog_path, vlog_cfg)?,
pending_segments: Arc::new(AtomicUsize::new(0)),
})
}
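    /// Scans the index tree and collects garbage-collection statistics
    /// for the value log (how many stale blobs each blob file contains).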
#[doc(hidden)]
pub fn gc_scan_stats(
&self,
seqno: SeqNo,
gc_watermark: SeqNo,
) -> crate::Result<crate::gc::Report> {
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use MaybeInlineValue::{Indirect, Inline};
        // Busy-wait until every flushed-but-unregistered segment has been
        // committed to the index tree, otherwise the scan below could miss
        // value handles of in-flight segments
        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {}
        // Lock the active memtable so no new writes slip in during the scan
        let _memtable_lock = self.index.read_lock_active_memtable();
        // Re-check after taking the lock: a flush may have completed while
        // we were waiting for it
        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {}
let iter = self
.index
.create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);
let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();
let result = self
.blobs
.scan_for_stats(iter.filter_map(|kv| {
let Ok(kv) = kv else {
return Some(Err(IoError::new(
IoErrorKind::Other,
"Failed to load KV pair from index tree",
)));
};
let mut cursor = Cursor::new(kv.value);
let value = match MaybeInlineValue::decode_from(&mut cursor) {
Ok(v) => v,
Err(e) => return Some(Err(IoError::new(IoErrorKind::Other, e.to_string()))),
};
match value {
Indirect { vhandle, size } => {
seqno_map
.entry(vhandle.segment_id)
.and_modify(|x| *x = (*x).max(kv.key.seqno))
.or_insert(kv.key.seqno);
Some(Ok((vhandle, size)))
}
Inline(_) => None,
}
}))
.map_err(Into::into);
let mut lock = self
.blobs
.manifest
.segments
.write()
.expect("lock is poisoned");
for (blob_file_id, max_seqno) in seqno_map {
if gc_watermark <= max_seqno {
if let Some(blob_file) = lock.get_mut(&blob_file_id) {
blob_file.gc_stats.set_stale_items(0);
blob_file.gc_stats.set_stale_bytes(0);
}
}
}
result
}
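    /// Applies the given GC strategy to the value log while the active
    /// memtable is locked, then drops blob files that have become fully stale.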
pub fn apply_gc_strategy(
&self,
strategy: &impl value_log::GcStrategy<MyCompressor>,
seqno: SeqNo,
) -> crate::Result<u64> {
let memtable_lock = self.index.lock_active_memtable();
self.blobs.apply_gc_strategy(
strategy,
&GcReader::new(&self.index, &memtable_lock),
GcWriter::new(seqno, &memtable_lock),
)?;
self.blobs.drop_stale_segments().map_err(Into::into)
}
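    /// Drops blob files from the value log that only contain stale values.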
#[doc(hidden)]
pub fn gc_drop_stale(&self) -> crate::Result<u64> {
let _lock = self.index.lock_active_memtable();
self.blobs.drop_stale_segments().map_err(Into::into)
}
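    /// Rotates the active memtable, flushes it into a new disk segment
    /// (performing key-value separation) and registers that segment.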
#[doc(hidden)]
pub fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
return Ok(None);
};
let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
else {
return Ok(None);
};
self.register_segments(&[segment.clone()])?;
Ok(Some(segment))
}
}
impl AbstractTree for BlobTree {
fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
let vhandle = self.index.get_vhandle(key.as_ref(), seqno)?;
Ok(vhandle.map(|x| match x {
MaybeInlineValue::Inline(v) => v.len() as u32,
MaybeInlineValue::Indirect { size, .. } => size,
}))
}
fn bloom_filter_size(&self) -> usize {
self.index.bloom_filter_size()
}
fn sealed_memtable_count(&self) -> usize {
self.index.sealed_memtable_count()
}
fn is_first_level_disjoint(&self) -> bool {
self.index.is_first_level_disjoint()
}
#[doc(hidden)]
fn verify(&self) -> crate::Result<usize> {
let index_tree_sum = self.index.verify()?;
let vlog_sum = self.blobs.verify()?;
Ok(index_tree_sum + vlog_sum)
}
fn keys(
&self,
seqno: Option<SeqNo>,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
self.index.keys(seqno, index)
}
fn values(
&self,
seqno: Option<SeqNo>,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
Box::new(self.iter(seqno, index).map(|x| x.map(|(_, v)| v)))
}
fn flush_memtable(
&self,
segment_id: SegmentId,
memtable: &Arc<Memtable>,
eviction_seqno: SeqNo,
) -> crate::Result<Option<Segment>> {
use crate::{
file::SEGMENTS_FOLDER,
segment::writer::{Options, Writer as SegmentWriter},
};
use value::MaybeInlineValue;
let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);
log::debug!("flushing memtable & performing key-value separation");
log::debug!("=> to LSM segments in {:?}", lsm_segment_folder);
log::debug!("=> to blob segment at {:?}", self.blobs.path);
let mut segment_writer = SegmentWriter::new(Options {
segment_id,
data_block_size: self.index.config.data_block_size,
index_block_size: self.index.config.index_block_size,
folder: lsm_segment_folder,
})?
.use_compression(self.index.config.compression);
segment_writer = segment_writer.use_bloom_policy(
crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
);
let mut blob_writer = self.blobs.get_writer()?;
let iter = memtable.iter().map(Ok);
let compaction_filter = CompactionStream::new(iter, eviction_seqno);
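        // Write each item into the LSM segment; values at or above the
        // separation threshold are spilled into the blob file and replaced
        // by an indirect value handle, tombstones and small values stay inline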
for item in compaction_filter {
let item = item?;
if item.is_tombstone() {
segment_writer.write(InternalValue::new(item.key, vec![]))?;
continue;
}
let mut cursor = Cursor::new(item.value);
let value = MaybeInlineValue::decode_from(&mut cursor)?;
let value = match value {
MaybeInlineValue::Inline(value) => value,
indirection @ MaybeInlineValue::Indirect { .. } => {
let mut serialized_indirection = vec![];
indirection.encode_into(&mut serialized_indirection)?;
segment_writer
.write(InternalValue::new(item.key.clone(), serialized_indirection))?;
continue;
}
};
#[allow(clippy::cast_possible_truncation)]
let value_size = value.len() as u32;
if value_size >= self.index.config.blob_file_separation_threshold {
let vhandle = blob_writer.get_next_value_handle();
let indirection = MaybeInlineValue::Indirect {
vhandle,
size: value_size,
};
let mut serialized_indirection = vec![];
indirection.encode_into(&mut serialized_indirection)?;
segment_writer
.write(InternalValue::new(item.key.clone(), serialized_indirection))?;
blob_writer.write(&item.key.user_key, value)?;
} else {
let direct = MaybeInlineValue::Inline(value);
let serialized_direct = direct.encode_into_vec();
segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
}
}
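        // Hold the active memtable lock while the blob file and the LSM
        // segment are registered, so the value log and index stay in sync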
let _memtable_lock = self.lock_active_memtable();
log::trace!("Register blob writer into value log");
self.blobs.register_writer(blob_writer)?;
log::trace!("Creating LSM-tree segment {segment_id}");
let segment = self.index.consume_writer(segment_id, segment_writer)?;
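        // Remember the freshly flushed segment until `register_segments` commits it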
if segment.is_some() {
self.pending_segments
.fetch_add(1, std::sync::atomic::Ordering::Release);
}
Ok(segment)
}
fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
self.index.register_segments(segments)?;
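        // The segments are now visible in the index tree, so they are no
        // longer pending (gc_scan_stats busy-waits on this counter)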
let count = self
.pending_segments
.load(std::sync::atomic::Ordering::Acquire);
assert!(
count >= segments.len(),
"pending_segments is less than segments to register - this is a bug"
);
self.pending_segments
.fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);
Ok(())
}
fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Memtable> {
self.index.lock_active_memtable()
}
fn set_active_memtable(&self, memtable: Memtable) {
self.index.set_active_memtable(memtable);
}
fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
self.index.add_sealed_memtable(id, memtable);
}
fn compact(
&self,
strategy: Arc<dyn crate::compaction::CompactionStrategy>,
seqno_threshold: SeqNo,
) -> crate::Result<()> {
self.index.compact(strategy, seqno_threshold)
}
fn get_next_segment_id(&self) -> SegmentId {
self.index.get_next_segment_id()
}
fn tree_config(&self) -> &Config {
&self.index.config
}
fn get_highest_seqno(&self) -> Option<SeqNo> {
self.index.get_highest_seqno()
}
fn active_memtable_size(&self) -> u32 {
self.index.active_memtable_size()
}
fn tree_type(&self) -> crate::TreeType {
crate::TreeType::Blob
}
fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
self.index.rotate_memtable()
}
fn segment_count(&self) -> usize {
self.index.segment_count()
}
fn first_level_segment_count(&self) -> usize {
self.index.first_level_segment_count()
}
fn approximate_len(&self) -> usize {
self.index.approximate_len()
}
fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<bool> {
self.index.contains_key(key, seqno)
}
fn len(&self, seqno: Option<SeqNo>, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
self.index.len(seqno, index)
}
#[must_use]
fn disk_space(&self) -> u64 {
self.index.disk_space() + self.blobs.manifest.disk_space_used()
}
fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
self.index.get_highest_memtable_seqno()
}
fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
self.index.get_highest_persisted_seqno()
}
fn snapshot(&self, seqno: SeqNo) -> Snapshot {
use crate::AnyTree::Blob;
Snapshot::new(Blob(self.clone()), seqno)
}
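    // Range and prefix iterators resolve indirect value handles through the
    // value log on the fly (see `resolve_value_handle`)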
fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
&self,
range: R,
seqno: Option<SeqNo>,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
let vlog = self.blobs.clone();
Box::new(
self.index
.0
.create_range(&range, seqno, index)
.map(move |item| resolve_value_handle(&vlog, item)),
)
}
fn prefix<K: AsRef<[u8]>>(
&self,
prefix: K,
seqno: Option<SeqNo>,
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
let vlog = self.blobs.clone();
Box::new(
self.index
.0
.create_prefix(prefix, seqno, index)
.map(move |item| resolve_value_handle(&vlog, item)),
)
}
fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
value: V,
seqno: SeqNo,
) -> (u32, u32) {
use value::MaybeInlineValue;
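        // Values are always written inline into the memtable; key-value
        // separation only happens when the memtable is flushed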
let item = MaybeInlineValue::Inline(value.into());
let value = item.encode_into_vec();
self.index.insert(key, value, seqno)
}
fn get<K: AsRef<[u8]>>(
&self,
key: K,
seqno: Option<SeqNo>,
) -> crate::Result<Option<crate::UserValue>> {
use value::MaybeInlineValue::{Indirect, Inline};
let key = key.as_ref();
let Some(value) = self.index.get_vhandle(key, seqno)? else {
return Ok(None);
};
match value {
Inline(bytes) => Ok(Some(bytes)),
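            // Resolve the indirection through the value log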
Indirect { vhandle, .. } => {
match self.blobs.get(&vhandle)? {
Some(bytes) => Ok(Some(bytes)),
None => {
panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
}
}
}
}
}
fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
self.index.remove(key, seqno)
}
fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
self.index.remove_weak(key, seqno)
}
}