use crate::{
BlobIndirection, InternalValue, KvSeparationOptions, SeqNo, UserKey, UserValue, ValueType,
coding::{Decode, Encode},
compaction::{
stream::{StreamFilter, StreamFilterVerdict},
worker::Options,
},
key::InternalKey,
version::Version,
vlog::{Accessor, BlobFileWriter, ValueHandle},
};
use std::{panic::RefUnwindSafe, path::Path};
/// Decision returned by a [`CompactionFilter`] for a single item.
#[non_exhaustive]
#[derive(Debug, Default)]
pub enum Verdict {
/// Leaves the item untouched. This is the default verdict.
#[default]
Keep,
/// Replaces the item with a tombstone (carrying an empty value).
Remove,
/// Replaces the item with a weak tombstone (carrying an empty value).
RemoveWeak,
/// Replaces the item's value with the given one.
///
/// If key-value separation is configured and the new value is at least as
/// large as the separation threshold, the value is written to a blob file
/// and the item becomes an indirection to it.
ReplaceValue(UserValue),
/// Drops the item from the stream; unlike [`Verdict::Remove`], no tombstone
/// is emitted in its place.
Destroy,
}
/// Filter that decides, item by item, what happens to data passing through a
/// compaction.
pub trait CompactionFilter: Send {
/// Inspects a single item and returns what should happen to it.
///
/// `item` gives access to the item's key, sequence number and value;
/// `ctx` carries information about the running compaction.
///
/// # Errors
///
/// Any error returned here is propagated to the caller of the stream filter.
fn filter_item(&mut self, item: ItemAccessor<'_>, ctx: &Context) -> crate::Result<Verdict>;
/// Consumes the filter. The default implementation does nothing.
///
/// NOTE(review): presumably invoked once the compaction has finished; the
/// call site is not visible here, so confirm against the caller.
fn finish(self: Box<Self>) {}
}
/// Information handed to a [`Factory`] when creating a filter and to the
/// filter on every [`CompactionFilter::filter_item`] call.
#[non_exhaustive]
#[derive(Debug)]
pub struct Context {
/// NOTE(review): presumably `true` when the compaction writes into the last
/// level of the tree; the value is set by the caller, so confirm there.
pub is_last_level: bool,
}
/// Creates [`CompactionFilter`] instances.
pub trait Factory: Send + Sync + RefUnwindSafe {
/// Returns the name of this factory.
fn name(&self) -> &str;
/// Builds a new filter for the given context.
fn make_filter(&self, ctx: &Context) -> Box<dyn CompactionFilter>;
}
/// References shared by every [`ItemAccessor`] that a [`StreamFilterAdapter`]
/// hands out.
struct AccessorShared<'a> {
/// Compaction options; supplies the tree ID, the cache and the handles
/// needed to create blob file writers.
opts: &'a Options,
/// Version whose `blob_files` are used to resolve blob indirections.
version: &'a Version,
/// Folder passed to the blob accessor and to newly created blob file writers.
blobs_folder: &'a Path,
}
impl AccessorShared<'_> {
    /// Loads the value that `vhandle` points to for `user_key`.
    ///
    /// The lookup goes through a blob [`Accessor`] built over the blob files of
    /// the current version, using the cache from the compaction options.
    ///
    /// # Errors
    ///
    /// Forwards any error reported by the blob accessor.
    fn get_indirect_value(
        &self,
        user_key: &[u8],
        vhandle: &ValueHandle,
    ) -> crate::Result<Option<UserValue>> {
        let opts = self.opts;
        let blob_accessor = Accessor::new(&self.version.blob_files);

        blob_accessor.get(
            opts.tree_id,
            self.blobs_folder,
            user_key,
            vhandle,
            &opts.config.cache,
        )
    }
}
/// Read-only view of a single item, handed to
/// [`CompactionFilter::filter_item`].
pub struct ItemAccessor<'a> {
/// The item under inspection.
item: &'a InternalValue,
/// Shared state needed to load values that live in blob files.
shared: &'a AccessorShared<'a>,
}
impl<'a> ItemAccessor<'a> {
#[must_use]
pub fn key(&self) -> &'a UserKey {
&self.item.key.user_key
}
#[must_use]
pub fn seqno(&self) -> SeqNo {
self.item.key.seqno
}
#[must_use]
#[doc(hidden)]
pub fn is_indirection(&self) -> bool {
self.item.key.value_type.is_indirection()
}
pub fn value(&self) -> crate::Result<UserValue> {
match self.item.key.value_type {
crate::ValueType::Value | crate::ValueType::MergeOperand => Ok(self.item.value.clone()),
crate::ValueType::Indirection => {
let mut reader = &self.item.value[..];
let indirection = BlobIndirection::decode_from(&mut reader)?;
let vhandle = indirection.vhandle;
let value = self
.shared
.get_indirect_value(&self.item.key.user_key, &vhandle)?;
if let Some(value) = value {
Ok(value)
} else {
log::error!(
"failed to read referenced blob file during execution of compaction filter. key: {:?}, vptr: {indirection:?}",
self.item.key,
);
Err(crate::Error::Unrecoverable)
}
}
crate::ValueType::WeakTombstone | crate::ValueType::Tombstone => {
unreachable!("tombstones are filtered out before calling filter")
}
}
}
}
/// Adapts an optional [`CompactionFilter`] to the [`StreamFilter`] interface.
pub(crate) struct StreamFilterAdapter<'a, 'b: 'a> {
/// The wrapped filter; when `None`, every item is kept.
filter: Option<&'a mut (dyn CompactionFilter + 'b)>,
/// State lent to each [`ItemAccessor`] created for the filter.
shared: AccessorShared<'a>,
/// Key-value separation settings, taken from the compaction options;
/// `None` means replacement values are always stored inline.
blob_opts: Option<&'a KvSeparationOptions>,
/// Slot for the blob file writer; filled on first use when a replacement
/// value has to be written to a blob file.
blob_writer: &'a mut Option<BlobFileWriter>,
/// Context passed to the filter on every call.
ctx: &'a Context,
}
impl<'a, 'b: 'a> StreamFilterAdapter<'a, 'b> {
pub fn new(
filter: Option<&'a mut (dyn CompactionFilter + 'b)>,
opts: &'a Options,
version: &'a Version,
blobs_folder: &'a Path,
blob_writer: &'a mut Option<BlobFileWriter>,
ctx: &'a Context,
) -> Self {
Self {
filter,
shared: AccessorShared {
opts,
version,
blobs_folder,
},
blob_opts: opts.config.kv_separation_opts.as_ref(),
blob_writer,
ctx,
}
}
fn handle_write(
&mut self,
prev_key: &InternalKey,
new_value: UserValue,
) -> crate::Result<(ValueType, UserValue)> {
let Some(blob_opts) = self.blob_opts else {
return Ok((ValueType::Value, new_value));
};
#[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
let value_size = new_value.len() as u32;
if value_size < blob_opts.separation_threshold {
return Ok((ValueType::Value, new_value));
}
let writer = if let Some(writer) = self.blob_writer {
writer
} else {
let writer = BlobFileWriter::new(
self.shared.opts.blob_file_id_generator.clone(),
self.shared.blobs_folder,
self.shared.opts.tree_id,
self.shared.opts.config.descriptor_table.clone(),
self.shared.opts.config.fs.clone(),
)?
.use_target_size(blob_opts.file_target_size)
.use_compression(blob_opts.compression);
self.blob_writer.insert(writer)
};
let vhandle = writer.write(&prev_key.user_key, prev_key.seqno, &new_value)?;
let indirection = BlobIndirection {
vhandle,
size: value_size,
};
Ok((ValueType::Indirection, indirection.encode_into_vec().into()))
}
}
impl<'a, 'b: 'a> StreamFilter for StreamFilterAdapter<'a, 'b> {
    /// Runs the wrapped filter on `item` and translates its [`Verdict`] into a
    /// [`StreamFilterVerdict`].
    ///
    /// Without a wrapped filter, every item is kept.
    ///
    /// # Errors
    ///
    /// Forwards errors from the filter and from writing a replacement value.
    fn filter_item(&mut self, item: &InternalValue) -> crate::Result<StreamFilterVerdict> {
        let verdict = match self.filter.as_mut() {
            Some(filter) => {
                let accessor = ItemAccessor {
                    item,
                    shared: &self.shared,
                };
                filter.filter_item(accessor, self.ctx)?
            }
            None => return Ok(StreamFilterVerdict::Keep),
        };

        let outcome = match verdict {
            Verdict::Keep => StreamFilterVerdict::Keep,
            Verdict::Destroy => StreamFilterVerdict::Drop,
            Verdict::Remove => {
                StreamFilterVerdict::Replace((ValueType::Tombstone, UserValue::empty()))
            }
            Verdict::RemoveWeak => {
                StreamFilterVerdict::Replace((ValueType::WeakTombstone, UserValue::empty()))
            }
            Verdict::ReplaceValue(new_value) => {
                // May spill the new value into a blob file.
                StreamFilterVerdict::Replace(self.handle_write(&item.key, new_value)?)
            }
        };

        Ok(outcome)
    }
}