use crate::batch::{WriteBatch, WriteBatchIterator};
use crate::bytes_range::BytesRange;
use crate::clock::MonotonicClock;
use crate::config::{DurabilityLevel, ReadOptions, ScanOptions};
use crate::db_state::ManifestCore;
use crate::db_stats::DbStats;
use crate::iter::RowEntryIterator;
use crate::mem_table::{ImmutableMemtable, KVTable};
use crate::merge_operator::{instrument_merge_operator, MergeOperatorType};
use crate::oracle::Oracle;
use crate::sorted_run_iterator::SortedRunIterator;
use crate::sst_iter::{SstIterator, SstIteratorOptions};
use crate::tablestore::TableStore;
use crate::types::KeyValue;
use crate::utils::{build_concurrent, compute_max_parallel};
use crate::{db_iter::DbIteratorRangeTracker, error::SlateDBError, DbIterator};
use futures::future::join;
use std::collections::VecDeque;
use std::sync::Arc;
/// Read-only view of the database state a read needs: the active memtable,
/// the frozen (immutable) memtables awaiting flush, and the manifest core
/// describing persisted L0 SSTs and compacted sorted runs.
pub(crate) trait DbStateReader {
    /// Returns the currently writable in-memory table.
    fn memtable(&self) -> Arc<KVTable>;
    /// Returns the queue of frozen memtables awaiting flush.
    fn imm_memtable(&self) -> &VecDeque<Arc<ImmutableMemtable>>;
    /// Returns the manifest core (L0 SSTs and compacted sorted runs).
    fn core(&self) -> &ManifestCore;
}
/// The per-layer iterators that feed a `DbIterator`, in read-priority order:
/// uncommitted write batch, memtables, L0 SSTs, then compacted sorted runs.
struct IteratorSources {
    // Iterator over an (optional) uncommitted write batch; highest priority.
    write_batch_iter: Option<WriteBatchIterator>,
    // Iterators over the active and immutable memtables.
    mem_iters: Vec<Box<dyn RowEntryIterator + 'static>>,
    // One iterator per L0 SST that may contain keys in range.
    l0_iters: VecDeque<Box<dyn RowEntryIterator + 'static>>,
    // One iterator per compacted sorted run; lowest priority.
    sr_iters: VecDeque<Box<dyn RowEntryIterator + 'static>>,
}
/// Serves point lookups and range scans by merging data from all storage
/// layers (write batch, memtables, L0, sorted runs) under sequence-number
/// visibility rules provided by the oracle.
pub(crate) struct Reader {
    // Access to persisted SSTs.
    pub(crate) table_store: Arc<TableStore>,
    // Metrics sink for get/scan counters and merge-operator operand counts.
    pub(crate) db_stats: DbStats,
    #[allow(dead_code)] pub(crate) mono_clock: Arc<MonotonicClock>,
    // Source of last-committed / last-remote-persisted sequence numbers.
    pub(crate) oracle: Arc<dyn Oracle>,
    // Merge operator wrapped with read-path instrumentation (see `new`).
    pub(crate) read_merge_operator: Option<MergeOperatorType>,
}
impl Reader {
    /// Constructs a `Reader`, wrapping any provided merge operator with
    /// instrumentation that records read-path operand counts into `db_stats`.
    pub(crate) fn new(
        table_store: Arc<TableStore>,
        db_stats: DbStats,
        mono_clock: Arc<MonotonicClock>,
        oracle: Arc<dyn Oracle>,
        merge_operator: Option<MergeOperatorType>,
    ) -> Self {
        let read_merge_operator = merge_operator.map(|merge_operator| {
            instrument_merge_operator(
                merge_operator,
                db_stats.merge_operator_read_operands.clone(),
            )
        });
        Self {
            table_store,
            db_stats,
            mono_clock,
            oracle,
            read_merge_operator,
        }
    }

    /// Computes the effective upper bound on visible sequence numbers for a
    /// read, taking the minimum of every cap that applies:
    /// - `DurabilityLevel::Remote` caps at the last remotely persisted seq;
    /// - non-dirty reads cap at the last committed seq;
    /// - a caller-supplied bound (e.g. a snapshot) tightens further.
    ///
    /// Returns `None` when no cap applies, meaning every entry is visible.
    fn prepare_max_seq(
        &self,
        max_seq_by_user: Option<u64>,
        durability_filter: DurabilityLevel,
        dirty: bool,
    ) -> Option<u64> {
        let mut max_seq: Option<u64> = None;
        if matches!(durability_filter, DurabilityLevel::Remote) {
            max_seq = Some(self.oracle.last_remote_persisted_seq());
        }
        if !dirty {
            // Take the min with the committed seq, or start from it if unset.
            max_seq = max_seq
                .map(|seq| seq.min(self.oracle.last_committed_seq()))
                .or(Some(self.oracle.last_committed_seq()));
        }
        if let Some(max_seq_by_user) = max_seq_by_user {
            max_seq = max_seq
                .map(|seq| seq.min(max_seq_by_user))
                .or(Some(max_seq_by_user));
        }
        max_seq
    }

    /// Assembles the per-layer iterators over `range` that feed a
    /// `DbIterator`.
    ///
    /// Point lookups (`range.as_point()` is `Some`) build L0 and sorted-run
    /// iterators synchronously, pruning sorted-run SSTs to those covering
    /// the key; range scans build both concurrently, bounded by
    /// `compute_max_parallel`.
    async fn build_iterator_sources(
        &self,
        range: &BytesRange,
        db_state: &(dyn DbStateReader + Sync),
        write_batch: Option<WriteBatch>,
        sst_iter_options: SstIteratorOptions,
        point_lookup_stats: Option<DbStats>,
    ) -> Result<IteratorSources, SlateDBError> {
        let write_batch_iter = write_batch
            .map(|batch| WriteBatchIterator::new(batch, range.clone(), sst_iter_options.order));
        // Active memtable first, then the immutable memtables in queue order.
        let mut memtables = VecDeque::new();
        memtables.push_back(db_state.memtable());
        for memtable in db_state.imm_memtable() {
            memtables.push_back(memtable.table());
        }
        let mem_iters = memtables
            .iter()
            .map(|table| {
                Box::new(table.range(range.clone(), sst_iter_options.order))
                    as Box<dyn RowEntryIterator + 'static>
            })
            .collect::<Vec<_>>();
        let max_parallel =
            compute_max_parallel(db_state.core().l0.len(), &db_state.core().compacted, 4);
        let (l0_iters, sr_iters) = if let Some(point_key) = range.as_point() {
            let l0 = self.build_point_l0_iters(
                range,
                db_state,
                sst_iter_options,
                point_lookup_stats.clone(),
            )?;
            let sr = self.build_point_sr_iters(
                range,
                point_key.as_ref(),
                db_state,
                sst_iter_options,
                point_lookup_stats,
            )?;
            (l0, sr)
        } else {
            // Range scan: fetch L0 and sorted-run iterators concurrently.
            let l0_future =
                self.build_range_l0_iters(range, db_state, sst_iter_options, max_parallel);
            let sr_future =
                self.build_range_sr_iters(range, db_state, sst_iter_options, max_parallel);
            let (l0_res, sr_res) = join(l0_future, sr_future).await;
            (l0_res?, sr_res?)
        };
        Ok(IteratorSources {
            write_batch_iter,
            mem_iters,
            l0_iters,
            sr_iters,
        })
    }

    /// Builds one iterator per L0 SST for a point lookup, skipping SSTs for
    /// which `SstIterator::new_owned_with_stats` returns `None` (presumably
    /// SSTs that cannot contain the key — confirm against SstIterator docs).
    fn build_point_l0_iters<'a>(
        &self,
        range: &BytesRange,
        db_state: &(dyn DbStateReader + Sync),
        sst_iter_options: SstIteratorOptions,
        db_stats: Option<DbStats>,
    ) -> Result<VecDeque<Box<dyn RowEntryIterator + 'a>>, SlateDBError> {
        let mut iters = VecDeque::new();
        for sst in &db_state.core().l0 {
            let iterator = SstIterator::new_owned_with_stats(
                range.clone(),
                sst.clone(),
                self.table_store.clone(),
                sst_iter_options,
                db_stats.clone(),
            )?;
            if let Some(iterator) = iterator {
                iters.push_back(Box::new(iterator) as Box<dyn RowEntryIterator + 'a>);
            }
        }
        Ok(iters)
    }

    /// Builds sorted-run iterators for a point lookup, considering only the
    /// SSTs whose key range covers `key` in each run.
    fn build_point_sr_iters<'a>(
        &self,
        range: &BytesRange,
        key: &[u8],
        db_state: &(dyn DbStateReader + Sync),
        sst_iter_options: SstIteratorOptions,
        db_stats: Option<DbStats>,
    ) -> Result<VecDeque<Box<dyn RowEntryIterator + 'a>>, SlateDBError> {
        let mut iters = VecDeque::new();
        for sr in &db_state.core().compacted {
            for handle in sr.tables_covering_point_key(key) {
                let iterator = SstIterator::new_owned_with_stats(
                    range.clone(),
                    handle.clone(),
                    self.table_store.clone(),
                    sst_iter_options,
                    db_stats.clone(),
                )?;
                if let Some(iterator) = iterator {
                    iters.push_back(Box::new(iterator) as Box<dyn RowEntryIterator + 'a>);
                }
            }
        }
        Ok(iters)
    }

    /// Builds L0 SST iterators for a range scan, initializing up to
    /// `max_parallel` of them concurrently via `build_concurrent`.
    async fn build_range_l0_iters<'a>(
        &self,
        range: &BytesRange,
        db_state: &(dyn DbStateReader + Sync),
        sst_iter_options: SstIteratorOptions,
        max_parallel: usize,
    ) -> Result<VecDeque<Box<dyn RowEntryIterator + 'a>>, SlateDBError> {
        // Clone so the `move` closure owns its captures across awaits.
        let range_clone = range.clone();
        let table_store = self.table_store.clone();
        build_concurrent(
            db_state.core().l0.iter().cloned(),
            max_parallel,
            move |sst| {
                let table_store = table_store.clone();
                let range = range_clone.clone();
                async move {
                    SstIterator::new_owned_initialized(
                        range.clone(),
                        sst,
                        table_store,
                        sst_iter_options,
                    )
                    .await
                    .map(|maybe_iter| {
                        maybe_iter.map(|iter| Box::new(iter) as Box<dyn RowEntryIterator + 'a>)
                    })
                }
            },
        )
        .await
    }

    /// Builds one iterator per compacted sorted run for a range scan,
    /// initializing up to `max_parallel` of them concurrently.
    async fn build_range_sr_iters<'a>(
        &self,
        range: &BytesRange,
        db_state: &(dyn DbStateReader + Sync),
        sst_iter_options: SstIteratorOptions,
        max_parallel: usize,
    ) -> Result<VecDeque<Box<dyn RowEntryIterator + 'a>>, SlateDBError> {
        let range_clone = range.clone();
        let table_store = self.table_store.clone();
        build_concurrent(
            db_state.core().compacted.iter().cloned(),
            max_parallel,
            move |sr| {
                let table_store = table_store.clone();
                let range = range_clone.clone();
                async move {
                    SortedRunIterator::new_owned_initialized_with_stats(
                        range.clone(),
                        sr,
                        table_store,
                        sst_iter_options,
                        Some(self.db_stats.clone()),
                    )
                    .await
                    .map(|iter| Some(Box::new(iter) as Box<dyn RowEntryIterator + 'a>))
                }
            },
        )
        .await
    }

    /// Looks up the latest visible value for `key`.
    ///
    /// Builds a single-key range, merges all layers through a `DbIterator`
    /// (which applies seq-visibility and merge-operator resolution), and
    /// returns the resulting key-value if any.
    ///
    /// # Errors
    /// Returns `SlateDBError::UnexpectedTombstone` if the merged entry is a
    /// tombstone — the iterator is expected to have resolved tombstones, so
    /// surfacing one here indicates a broken invariant.
    pub(crate) async fn get_key_value_with_options<K: AsRef<[u8]>>(
        &self,
        key: K,
        options: &ReadOptions,
        db_state: &(dyn DbStateReader + Sync + Send),
        write_batch: Option<WriteBatch>,
        max_seq: Option<u64>,
    ) -> Result<Option<KeyValue>, SlateDBError> {
        self.db_stats.get_requests.increment(1);
        let max_seq = self.prepare_max_seq(max_seq, options.durability_filter, options.dirty);
        // A point lookup is expressed as the inclusive range [key, key].
        let key_slice = key.as_ref();
        let range = BytesRange::from_slice(key_slice..=key_slice);
        let sst_iter_options = SstIteratorOptions {
            cache_blocks: options.cache_blocks,
            eager_spawn: true,
            ..SstIteratorOptions::default()
        };
        let IteratorSources {
            write_batch_iter,
            mem_iters,
            l0_iters,
            sr_iters,
        } = self
            .build_iterator_sources(
                &range,
                db_state,
                write_batch,
                sst_iter_options,
                Some(self.db_stats.clone()),
            )
            .await?;
        let mut iterator = DbIterator::new(
            range,
            write_batch_iter,
            mem_iters,
            l0_iters,
            sr_iters,
            max_seq,
            None,
            self.read_merge_operator.clone(),
            sst_iter_options.order,
        )
        .await?;
        iterator
            .next_entry()
            .await?
            .map(|entry| {
                if entry.value.is_tombstone() {
                    Err(SlateDBError::UnexpectedTombstone)
                } else {
                    Ok(KeyValue::from(entry))
                }
            })
            .transpose()
    }

    /// Creates a `DbIterator` over `range` that merges all layers under the
    /// visibility bound computed by `prepare_max_seq`.
    ///
    /// `range_tracker`, when provided, is forwarded to the iterator to
    /// observe scan progress.
    pub(crate) async fn scan_with_options(
        &self,
        range: BytesRange,
        options: &ScanOptions,
        db_state: &(dyn DbStateReader + Sync),
        write_batch: Option<WriteBatch>,
        max_seq: Option<u64>,
        range_tracker: Option<Arc<DbIteratorRangeTracker>>,
    ) -> Result<DbIterator, SlateDBError> {
        self.db_stats.scan_requests.increment(1);
        let max_seq = self.prepare_max_seq(max_seq, options.durability_filter, options.dirty);
        // Translate the configured read-ahead bytes into whole blocks.
        let read_ahead_blocks = self.table_store.bytes_to_blocks(options.read_ahead_bytes);
        let sst_iter_options = SstIteratorOptions {
            max_fetch_tasks: options.max_fetch_tasks,
            blocks_to_fetch: read_ahead_blocks,
            cache_blocks: options.cache_blocks,
            eager_spawn: true,
            order: options.order,
        };
        let IteratorSources {
            write_batch_iter,
            mem_iters,
            l0_iters,
            sr_iters,
        } = self
            .build_iterator_sources(&range, db_state, write_batch, sst_iter_options, None)
            .await?;
        DbIterator::new(
            range,
            write_batch_iter,
            mem_iters,
            l0_iters,
            sr_iters,
            max_seq,
            range_tracker,
            self.read_merge_operator.clone(),
            options.order,
        )
        .await
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::merge_operator::{
MergeOperator, MergeOperatorError, MERGE_OPERATOR_FLUSH_PATH, MERGE_OPERATOR_READ_PATH,
};
use crate::test_utils::lookup_merge_operator_operands;
use crate::types::{RowEntry, ValueDeletable};
use bytes::Bytes;
use rstest::rstest;
use slatedb_common::clock::{MockSystemClock, SystemClock};
use crate::batch::WriteBatch;
use crate::clock::MonotonicClock;
use crate::db_state::{SortedRun, SsTableHandle, SsTableId};
use crate::db_status::DbStatusManager;
use crate::format::sst::SsTableFormat;
use crate::manifest::SsTableView;
use crate::object_stores::ObjectStores;
use crate::oracle::DbReaderOracle;
use crate::tablestore::TableStore;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use slatedb_common::metrics::{
lookup_metric, DefaultMetricsRecorder, MetricLevel, MetricsRecorder, MetricsRecorderHelper,
};
use std::collections::HashMap;
use std::sync::Arc;
use ulid::Ulid;
// Test merge operator that concatenates the operand onto the existing value.
struct StringConcatMergeOperator;
impl MergeOperator for StringConcatMergeOperator {
    /// Appends `operand` after `existing_value`; with no existing value the
    /// operand itself becomes the result. Never fails.
    fn merge(
        &self,
        _key: &Bytes,
        existing_value: Option<Bytes>,
        operand: Bytes,
    ) -> Result<Bytes, MergeOperatorError> {
        let merged = match existing_value {
            None => operand,
            Some(base) => {
                // Preallocate to the exact combined length before copying.
                let mut buf = Vec::with_capacity(base.len() + operand.len());
                buf.extend_from_slice(&base);
                buf.extend_from_slice(&operand);
                Bytes::from(buf)
            }
        };
        Ok(merged)
    }
}
// In-memory stand-in for the database state, letting tests place entries in
// any layer (memtable, immutable memtables, L0, sorted runs).
struct TestDbState {
    // Active writable memtable.
    memtable: Arc<KVTable>,
    // Frozen memtables, in the order they were added.
    imm_memtable: VecDeque<Arc<ImmutableMemtable>>,
    // Manifest core holding L0 SSTs and compacted sorted runs.
    core: ManifestCore,
    // SST storage backed by an in-memory object store (see `new`).
    table_store: Arc<TableStore>,
}
impl TestDbState {
    /// Builds an empty test state backed by an in-memory object store.
    async fn new() -> Self {
        let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let store = Arc::new(TableStore::new(
            ObjectStores::new(object_store, None),
            SsTableFormat::default(),
            Path::from("/test"),
            None,
        ));
        Self {
            memtable: Arc::new(KVTable::new()),
            imm_memtable: VecDeque::new(),
            core: ManifestCore::new(),
            table_store: store,
        }
    }

    /// Inserts every entry into the active memtable.
    fn add_to_memtable(&mut self, entries: Vec<RowEntry>) {
        entries
            .into_iter()
            .for_each(|entry| self.memtable.put(entry));
    }

    /// Freezes `entries` into a new immutable memtable appended to the queue.
    fn add_immutable_memtable(&mut self, entries: Vec<RowEntry>) {
        let staging = crate::mem_table::WritableKVTable::new();
        for entry in entries {
            staging.put(entry);
        }
        self.imm_memtable
            .push_back(Arc::new(ImmutableMemtable::new(staging, 0)));
    }

    /// Persists `entries` as one SST and registers it at the front of L0
    /// (most recent position). No-op for an empty entry list.
    async fn add_to_l0(&mut self, entries: Vec<RowEntry>) -> Result<(), SlateDBError> {
        if !entries.is_empty() {
            let handle = self.build_sst(entries).await?;
            self.core.l0.push_front(SsTableView::identity(handle));
        }
        Ok(())
    }

    /// Persists `entries` as one SST inside sorted run `sr_id`, creating the
    /// run if it does not exist yet. No-op for an empty entry list.
    async fn add_to_sorted_run(
        &mut self,
        sr_id: u32,
        entries: Vec<RowEntry>,
    ) -> Result<(), SlateDBError> {
        if entries.is_empty() {
            return Ok(());
        }
        let handle = self.build_sst(entries).await?;
        match self.core.compacted.iter_mut().find(|run| run.id == sr_id) {
            Some(run) => run.sst_views.push(SsTableView::identity(handle)),
            None => self.core.compacted.push(SortedRun {
                id: sr_id,
                sst_views: vec![SsTableView::identity(handle)],
            }),
        }
        Ok(())
    }

    /// Sorts `entries` by key, encodes them into an SST, and writes it to
    /// the table store under a fresh compacted-table id.
    async fn build_sst(
        &self,
        mut entries: Vec<RowEntry>,
    ) -> Result<SsTableHandle, SlateDBError> {
        // Stable sort: duplicate keys keep their original relative order.
        entries.sort_by(|a, b| a.key.cmp(&b.key));
        let mut builder = self.table_store.table_builder();
        for entry in entries {
            builder.add(entry).await?;
        }
        let encoded = builder.build().await?;
        let id = SsTableId::Compacted(Ulid::new());
        self.table_store.write_sst(&id, encoded, false).await
    }
}
// Exposes the test state's layers through the read-path trait.
impl DbStateReader for TestDbState {
    fn memtable(&self) -> Arc<KVTable> {
        self.memtable.clone()
    }
    fn imm_memtable(&self) -> &VecDeque<Arc<ImmutableMemtable>> {
        &self.imm_memtable
    }
    fn core(&self) -> &ManifestCore {
        &self.core
    }
}
/// Identifies which storage layer a test entry should be written into.
#[derive(Debug, Clone)]
enum LayerLocation {
    /// Uncommitted write batch supplied to the read call.
    WriteBatch,
    /// Active writable memtable.
    Memtable,
    /// Immutable memtable at the given queue index.
    ImmutableMemtable(usize),
    /// L0 SST at the given index (higher index = added later).
    L0Sst(usize),
    /// Compacted sorted run with the given id.
    SortedRun(u32),
}
/// One logical write used to populate a test database state.
#[derive(Debug, Clone)]
struct TestEntry {
    /// Layer the entry is placed in (defaults to the memtable).
    location: LayerLocation,
    /// Entry key.
    key: &'static [u8],
    /// Payload: plain value, tombstone, or merge operand.
    value: ValueDeletable,
    /// Sequence number assigned to the entry.
    seq: u64,
    /// Optional expiration timestamp.
    expire_ts: Option<i64>,
}
impl TestEntry {
fn value(key: &'static [u8], val: &'static [u8], seq: u64) -> Self {
Self {
location: LayerLocation::Memtable,
key,
value: ValueDeletable::Value(Bytes::from_static(val)),
seq,
expire_ts: None,
}
}
fn tombstone(key: &'static [u8], seq: u64) -> Self {
Self {
location: LayerLocation::Memtable,
key,
value: ValueDeletable::Tombstone,
seq,
expire_ts: None,
}
}
fn merge(key: &'static [u8], val: &'static [u8], seq: u64) -> Self {
Self {
location: LayerLocation::Memtable,
key,
value: ValueDeletable::Merge(Bytes::from_static(val)),
seq,
expire_ts: None,
}
}
fn with_location(mut self, location: LayerLocation) -> Self {
self.location = location;
self
}
fn with_expire_ts(mut self, expire_ts: i64) -> Self {
self.expire_ts = Some(expire_ts);
self
}
fn to_row_entry(&self) -> RowEntry {
RowEntry::new(
Bytes::from_static(self.key),
self.value.clone(),
self.seq,
None,
self.expire_ts,
)
}
}
async fn populate_db_state(
test_db_state: &mut TestDbState,
entries: Vec<TestEntry>,
) -> Result<Option<WriteBatch>, SlateDBError> {
let mut wb_batch: Option<WriteBatch> = None;
let mut mem_entries = Vec::new();
let mut imm_entries: HashMap<usize, Vec<RowEntry>> = HashMap::new();
let mut l0_entries: HashMap<usize, Vec<RowEntry>> = HashMap::new();
let mut sr_entries: HashMap<u32, Vec<RowEntry>> = HashMap::new();
for entry in entries {
let row_entry = entry.to_row_entry();
match entry.location {
LayerLocation::WriteBatch => {
if wb_batch.is_none() {
wb_batch = Some(WriteBatch::new());
}
if let Some(ref mut batch) = wb_batch {
match &entry.value {
ValueDeletable::Value(v) => {
batch.put(entry.key, v.as_ref());
}
ValueDeletable::Tombstone => {
batch.delete(entry.key);
}
ValueDeletable::Merge(v) => {
batch.merge(entry.key, v.as_ref());
}
}
}
}
LayerLocation::Memtable => mem_entries.push(row_entry),
LayerLocation::ImmutableMemtable(idx) => {
imm_entries.entry(idx).or_default().push(row_entry);
}
LayerLocation::L0Sst(idx) => {
l0_entries.entry(idx).or_default().push(row_entry);
}
LayerLocation::SortedRun(sr_id) => {
sr_entries.entry(sr_id).or_default().push(row_entry);
}
}
}
if !mem_entries.is_empty() {
test_db_state.add_to_memtable(mem_entries);
}
let mut imm_indices: Vec<_> = imm_entries.keys().copied().collect();
imm_indices.sort();
for idx in imm_indices {
if let Some(entries) = imm_entries.remove(&idx) {
test_db_state.add_immutable_memtable(entries);
}
}
let mut l0_indices: Vec<_> = l0_entries.keys().copied().collect();
l0_indices.sort();
for idx in l0_indices {
if let Some(entries) = l0_entries.remove(&idx) {
test_db_state.add_to_l0(entries).await?;
}
}
let mut sr_ids: Vec<_> = sr_entries.keys().copied().collect();
sr_ids.sort();
sr_ids.reverse();
for sr_id in sr_ids {
if let Some(entries) = sr_entries.remove(&sr_id) {
test_db_state.add_to_sorted_run(sr_id, entries).await?;
}
}
Ok(wb_batch)
}
// One parameterized case for the layer-priority test: entries to stage,
// the key to read back, and the expected outcome under the given
// visibility settings.
struct LayerPriorityTestCase {
    // Entries to distribute across the layers before reading.
    entries: Vec<TestEntry>,
    // Key passed to get_key_value_with_options.
    query_key: &'static [u8],
    // Expected value bytes, or None if the read should find nothing.
    expected: Option<&'static [u8]>,
    // Human-readable label; "[MERGE]" enables the concat merge operator.
    description: &'static str,
    // Whether the read may see uncommitted data.
    dirty: bool,
    // Oracle's last committed seq (defaults to u64::MAX when None).
    last_committed_seq: Option<u64>,
    // Optional snapshot bound passed to the read.
    max_seq: Option<u64>,
}
#[tokio::test]
#[rstest]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"wb_value", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"mem_value", 50),
TestEntry::value(b"key1", b"l0_value", 40).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"wb_value"),
description: "write batch should override memtable and L0", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"mem_value", 50),
TestEntry::value(b"key1", b"l0_value", 40).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"sr_value", 30).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"mem_value"),
description: "memtable should override L0 and sorted run", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::tombstone(b"key1", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"mem_value", 50),
TestEntry::value(b"key1", b"l0_value", 40).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: None,
description: "tombstone in write batch should hide all values", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::tombstone(b"key1", 50),
TestEntry::value(b"key1", b"l0_value", 40).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: None,
description: "tombstone in memtable should hide L0 value", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::tombstone(b"key1", 40).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"sr_value", 30).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: None,
description: "tombstone in L0 should hide sorted run value", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"new_value", 60),
TestEntry::tombstone(b"key1", 50).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"new_value"),
description: "newer value should override older tombstone", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"l0_newer", 45).with_location(LayerLocation::L0Sst(1)),
TestEntry::value(b"key1", b"l0_older", 40).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"l0_newer"),
description: "newer L0 SST should win over older L0 SST", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"l0_value", 40).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"sr_value", 30).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"l0_value"),
description: "L0 value should override sorted run value", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"other_key", b"value", 50),
],
query_key: b"key1",
expected: None,
description: "nonexistent key should return None", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::tombstone(b"key1", 50),
],
query_key: b"key1",
expected: None,
description: "tombstone with no previous value should return None", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"wb", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"mem", 90),
TestEntry::value(b"key1", b"imm", 80).with_location(LayerLocation::ImmutableMemtable(0)),
TestEntry::value(b"key1", b"l0", 70).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"sr", 60).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"wb"),
description: "write batch should win", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"l0_0_val1", 50).with_location(LayerLocation::L0Sst(1)),
TestEntry::value(b"key2", b"l0_0_val2", 51).with_location(LayerLocation::L0Sst(1)),
TestEntry::value(b"key1", b"l0_1_val1", 40).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"l0_0_val1"),
description: "first L0 SST entry should win", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"sr0", 30).with_location(LayerLocation::SortedRun(1)),
TestEntry::value(b"key1", b"sr1", 20).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"sr0"),
description: "first sorted run should win", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"imm0", 60).with_location(LayerLocation::ImmutableMemtable(0)),
TestEntry::value(b"key1", b"imm1", 50).with_location(LayerLocation::ImmutableMemtable(1)),
TestEntry::value(b"key1", b"l0", 40).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"imm0"),
description: "first immutable memtable should win over second and L0", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
// WriteBatch has multiple keys but not key1
TestEntry::value(b"key2", b"wb2", 100).with_location(LayerLocation::WriteBatch),
// Memtable has key1 with high seq
TestEntry::value(b"key1", b"mem", 90),
TestEntry::value(b"key3", b"mem3", 91),
// L0 has older versions
TestEntry::value(b"key1", b"l0_0", 70).with_location(LayerLocation::L0Sst(1)),
TestEntry::value(b"key1", b"l0_1", 60).with_location(LayerLocation::L0Sst(0)),
// SR has oldest
TestEntry::value(b"key1", b"sr", 50).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"mem"),
description: "memtable value should win in complex multi-layer scenario", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::tombstone(b"key1", 60).with_location(LayerLocation::L0Sst(1)),
TestEntry::value(b"key1", b"l0_old_value", 50).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: None,
description: "tombstone should prevent returning older value regardless of TTL", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"mem_value", 50),
],
query_key: b"key1",
expected: Some(b"mem_value"),
description: "value with no expiration should be returned at any time", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"uncommitted", 100),
],
query_key: b"key1",
expected: None,
description: "committed read should not see uncommitted data", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"committed", 40),
],
query_key: b"key1",
expected: Some(b"committed"),
description: "committed read should see data within committed range", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"uncommitted", 100),
TestEntry::value(b"key1", b"committed", 40).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"committed"),
description: "committed read should see older committed value when newer is uncommitted", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"newer", 80),
TestEntry::value(b"key1", b"older", 50).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"older"),
description: "snapshot read should only see values up to max_seq", dirty: true, last_committed_seq: None, max_seq: Some(60),
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"newer", 80),
],
query_key: b"key1",
expected: None,
description: "snapshot should return None when all values exceed max_seq", dirty: true, last_committed_seq: None, max_seq: Some(60),
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"v1", 100),
TestEntry::value(b"key1", b"v2", 70).with_location(LayerLocation::L0Sst(1)),
TestEntry::value(b"key1", b"v3", 40).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"v3"),
description: "should respect both max_seq and committed_seq constraints", dirty: false, last_committed_seq: Some(50), max_seq: Some(60),
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::tombstone(b"key1", 45).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"old_value", 30).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: None,
description: "tombstone within seq bounds should hide older values", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::tombstone(b"key1", 100),
TestEntry::value(b"key1", b"old_value", 40).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"old_value"),
description: "filtered tombstone should not hide visible older value", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"mem", 90),
TestEntry::value(b"key1", b"imm", 70).with_location(LayerLocation::ImmutableMemtable(0)),
TestEntry::value(b"key1", b"l0", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"sr", 30).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"l0"),
description: "sequence filtering should work uniformly across all layers", dirty: false, last_committed_seq: Some(60), max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::value(b"key1", b"uncommitted", 100),
],
query_key: b"key1",
expected: Some(b"uncommitted"),
description: "dirty read should see uncommitted data", dirty: true, last_committed_seq: Some(50), max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"wb_merge", u64::MAX).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"mem_value", 50),
TestEntry::value(b"key1", b"l0_value", 40).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"mem_valuewb_merge"), // Merge("mem_value", "wb_merge") = "mem_valuewb_merge"
description: "[MERGE] write batch merge should merge with memtable value", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"merge1", u64::MAX).with_location(LayerLocation::WriteBatch),
TestEntry::merge(b"key1", b"merge2", u64::MAX).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"base", 50).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"basemerge1merge2"), // Merge(base, merge1, merge2) = "basemerge1merge2"
description: "[MERGE] multiple write batch merges should merge with base value", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"old_merge", u64::MAX).with_location(LayerLocation::WriteBatch),
TestEntry::tombstone(b"key1", 90).with_location(LayerLocation::WriteBatch),
TestEntry::merge(b"key1", b"new_merge", u64::MAX).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"base", 50).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"new_merge"), // Tombstone clears history, only merge after tombstone applies
description: "[MERGE] write batch tombstone should clear merge history", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"old_merge", u64::MAX).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"new_base", 90).with_location(LayerLocation::WriteBatch),
TestEntry::merge(b"key1", b"merge", u64::MAX).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"old_base", 50).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"new_basemerge"), // Value acts as barrier, only Merge("new_base", "merge") applies
description: "[MERGE] write batch value should act as new base for subsequent merges", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"merge1", u64::MAX).with_location(LayerLocation::WriteBatch),
TestEntry::merge(b"key1", b"merge2", u64::MAX).with_location(LayerLocation::WriteBatch),
],
query_key: b"key1",
expected: Some(b"merge1merge2"), // Only merge operands, no base value
description: "[MERGE] write batch merges without base values should merge together", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"a", 50),
],
query_key: b"key1",
expected: Some(b"a"), // Single merge operand with no base returns the operand
description: "[MERGE] single merge operand without base should be returned as-is", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"b", 60),
TestEntry::value(b"key1", b"a", 50).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"ab"), // Merge("a", "b") = "ab" (concatenation)
description: "[MERGE] merge operand should be merged with base value", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"c", 70),
TestEntry::merge(b"key1", b"b", 60).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"a", 50).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"abc"), // Merge(Merge("a", "b"), "c") = "abc"
description: "[MERGE] multiple merge operands should be applied in sequence order", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"d", 80),
TestEntry::tombstone(b"key1", 70).with_location(LayerLocation::L0Sst(1)),
TestEntry::merge(b"key1", b"c", 60).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"ab", 50).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"d"), // Tombstone acts as barrier, only merge after tombstone is applied
description: "[MERGE] tombstone should clear merge history", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"z", 90),
TestEntry::value(b"key1", b"y", 80).with_location(LayerLocation::L0Sst(1)),
TestEntry::merge(b"key1", b"x", 70).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"w", 60).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"yz"), // Value("y") acts as barrier, only Merge("y", "z") is applied
description: "[MERGE] value should act as new base for subsequent merges", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"d", 100),
TestEntry::merge(b"key1", b"c", 90).with_location(LayerLocation::ImmutableMemtable(0)),
TestEntry::merge(b"key1", b"b", 80).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"a", 70).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"abcd"), // Merge(Merge(Merge("a", "b"), "c"), "d") = "abcd"
description: "[MERGE] merges should work across all layers", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"c", 100), // Filtered (uncommitted)
TestEntry::merge(b"key1", b"b", 45), // Visible
TestEntry::value(b"key1", b"a", 40).with_location(LayerLocation::L0Sst(0)), // Visible
],
query_key: b"key1",
expected: Some(b"ab"), description: "[MERGE] merge operands should respect sequence filtering", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"c", 80), // Filtered by snapshot
TestEntry::merge(b"key1", b"b", 55), // Visible in snapshot
TestEntry::value(b"key1", b"a", 50).with_location(LayerLocation::L0Sst(0)), // Visible in snapshot
],
query_key: b"key1",
expected: Some(b"ab"), // Merge("a", "b") = "ab" (seq 80 filtered by snapshot)
description: "[MERGE] snapshot should filter merge operands by max_seq", dirty: true, last_committed_seq: None, max_seq: Some(60),
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"c", 70),
TestEntry::merge(b"key1", b"b", 60).with_location(LayerLocation::L0Sst(0)),
TestEntry::merge(b"key1", b"a", 50).with_location(LayerLocation::SortedRun(0)),
],
query_key: b"key1",
expected: Some(b"abc"), // String concatenation: "" + "a" + "b" + "c" = "abc"
description: "[MERGE] multiple merge operands without base should merge together", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"new", 80),
TestEntry::tombstone(b"key1", 70).with_location(LayerLocation::L0Sst(1)),
TestEntry::value(b"key1", b"old", 60).with_location(LayerLocation::L0Sst(0)),
],
query_key: b"key1",
expected: Some(b"new"), // Tombstone blocks old value, merge becomes new base
description: "[MERGE] tombstone should prevent merge from applying to older value", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(LayerPriorityTestCase {
entries: vec![
TestEntry::merge(b"key1", b"c", 70),
TestEntry::merge(b"key1", b"b", 60),
TestEntry::merge(b"key1", b"a", 50),
],
query_key: b"key1",
expected: Some(b"abc"), // All merges in memtable: Merge(Merge("a", "b"), "c") = "abc"
description: "[MERGE] multiple merge operands in same layer should merge in seq order", dirty: true, last_committed_seq: None, max_seq: None,
})]
// Shared driver for the `#[case]` fixtures above: loads the case's entries
// into a fresh test DB, performs a point lookup through `Reader`, and
// compares the returned value (if any) against the case's expectation.
async fn test_get_with_options_layer_priority(
    #[case] test_case: LayerPriorityTestCase,
) -> Result<(), SlateDBError> {
    let mut test_db_state = TestDbState::new().await;
    let write_batch = populate_db_state(&mut test_db_state, test_case.entries).await?;
    let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
    let db_stats = DbStats::new(&recorder);
    let test_clock = Arc::new(MockSystemClock::new());
    let mono_clock = Arc::new(MonotonicClock::new(test_clock as Arc<dyn SystemClock>, 0));
    // Without an explicit last_committed_seq, every entry counts as committed.
    let last_committed_seq = test_case.last_committed_seq.unwrap_or(u64::MAX);
    let oracle = Arc::new(DbReaderOracle::new(
        last_committed_seq,
        DbStatusManager::new(0),
    ));
    // Match the "[MERGE" prefix (rather than the exact "[MERGE]" tag) so this
    // check is consistent with test_scan_with_options_layer_priority and
    // tolerates tag variants such as "[MERGE SCAN]".
    let merge_operator = if test_case.description.contains("[MERGE") {
        Some(Arc::new(StringConcatMergeOperator) as Arc<dyn MergeOperator + Send + Sync>)
    } else {
        None
    };
    let reader = Reader::new(
        test_db_state.table_store.clone(),
        db_stats,
        mono_clock,
        oracle,
        merge_operator,
    );
    let read_options = ReadOptions::default().with_dirty(test_case.dirty);
    let result = reader
        .get_key_value_with_options(
            test_case.query_key,
            &read_options,
            &test_db_state,
            write_batch,
            test_case.max_seq,
        )
        .await?;
    // Compare raw bytes; render them as lossy UTF-8 in the failure message.
    let actual = result.as_ref().map(|kv| kv.value.as_ref());
    let expected = test_case.expected;
    assert_eq!(
        actual,
        expected,
        "Failed test: {}\nActual: {:?}\nExpected: {:?}",
        test_case.description,
        actual.map(|b| String::from_utf8_lossy(b)),
        expected.map(|b| String::from_utf8_lossy(b))
    );
    Ok(())
}
#[test]
// Verifies that each ScanOptions builder method records its setting.
fn test_scan_options_builder_pattern() {
    let configured = ScanOptions::default()
        .with_max_fetch_tasks(4)
        .with_cache_blocks(true)
        .with_read_ahead_bytes(1024);
    assert!(configured.cache_blocks);
    assert_eq!(configured.max_fetch_tasks, 4);
    assert_eq!(configured.read_ahead_bytes, 1024);
}
// Declarative fixture for one test_scan_with_options_layer_priority case.
struct ScanTestCase {
    // Entries to load into the test DB (memtables, L0 SSTs, sorted runs, write batch).
    entries: Vec<TestEntry>,
    // Inclusive start of the scan range.
    range_start: &'static [u8],
    // Exclusive end of the scan range.
    range_end: &'static [u8],
    // Expected (key, value) pairs, in scan order.
    expected: Vec<(&'static [u8], &'static [u8])>,
    // Human-readable case name; a "[MERGE" tag enables the merge operator.
    description: &'static str,
    // Whether the scan reads uncommitted (dirty) data.
    dirty: bool,
    // Oracle's last committed sequence number; defaults to u64::MAX when None.
    last_committed_seq: Option<u64>,
    // Optional snapshot upper bound on visible sequence numbers.
    max_seq: Option<u64>,
}
#[tokio::test]
#[rstest]
#[case(ScanTestCase {
entries: vec![
TestEntry::value(b"key1", b"val1", 10),
TestEntry::value(b"key2", b"val2", 10),
TestEntry::value(b"key3", b"val3", 10),
],
range_start: b"key1",
range_end: b"key4",
expected: vec![(b"key1", b"val1"), (b"key2", b"val2"), (b"key3", b"val3")],
description: "scan should return keys in order", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::value(b"key1", b"val1", 10),
TestEntry::value(b"key2", b"val2", 10),
TestEntry::value(b"key3", b"val3", 10),
],
range_start: b"key2",
range_end: b"key3",
expected: vec![(b"key2", b"val2")],
description: "scan should respect range boundaries (end exclusive)", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::value(b"key1", b"mem_val", 50),
TestEntry::value(b"key1", b"l0_val", 40).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key2", b"l0_val2", 40).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"mem_val"), (b"key2", b"l0_val2")],
description: "scan should prefer higher layer values", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::value(b"key1", b"val1", 50),
TestEntry::tombstone(b"key2", 50),
TestEntry::value(b"key2", b"old_val2", 40).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key3", b"val3", 50),
],
range_start: b"key1",
range_end: b"key4",
expected: vec![(b"key1", b"val1"), (b"key3", b"val3")],
description: "tombstones should hide values from lower layers", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::value(b"key1", b"committed", 40),
TestEntry::value(b"key2", b"uncommitted", 60),
TestEntry::value(b"key3", b"committed", 30),
],
range_start: b"key1",
range_end: b"key4",
expected: vec![(b"key1", b"committed"), (b"key3", b"committed")],
description: "committed scan should filter uncommitted values", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::value(b"key1", b"uncommitted", 60),
TestEntry::value(b"key1", b"committed", 40).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key2",
expected: vec![(b"key1", b"committed")],
description: "uncommitted value should not hide committed value in scan", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::value(b"key1", b"v1_new", 70),
TestEntry::value(b"key1", b"v1_old", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key2", b"v2", 55),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"v1_old"), (b"key2", b"v2")],
description: "snapshot scan should respect max_seq", dirty: true, last_committed_seq: None, max_seq: Some(60),
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::value(b"key1", b"mem", 80),
TestEntry::value(b"key2", b"imm", 70).with_location(LayerLocation::ImmutableMemtable(0)),
TestEntry::value(b"key3", b"l0_new", 60).with_location(LayerLocation::L0Sst(1)),
TestEntry::value(b"key3", b"l0_old", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key4", b"sr", 40).with_location(LayerLocation::SortedRun(0)),
],
range_start: b"key1",
range_end: b"key5",
expected: vec![(b"key1", b"mem"), (b"key2", b"imm"), (b"key3", b"l0_new"), (b"key4", b"sr")],
description: "scan should properly deduplicate across all layers", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::value(b"key1", b"val1", 10),
],
range_start: b"key5",
range_end: b"key9",
expected: vec![],
description: "scan of empty range should return no results", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::tombstone(b"key1", 50),
TestEntry::tombstone(b"key2", 50),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![],
description: "scan with all tombstones should return empty", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::value(b"key1", b"uncommitted1", 70),
TestEntry::value(b"key1", b"committed1", 40).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key2", b"committed2", 45),
TestEntry::value(b"key3", b"uncommitted3", 80),
],
range_start: b"key1",
range_end: b"key4",
expected: vec![(b"key1", b"committed1"), (b"key2", b"committed2")],
description: "complex committed scan should filter correctly", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::tombstone(b"key1", 45).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"old", 30).with_location(LayerLocation::SortedRun(0)),
TestEntry::value(b"key2", b"val2", 40),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key2", b"val2")],
description: "tombstone in bounds should hide older value", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::tombstone(b"key1", 70),
TestEntry::value(b"key1", b"visible", 40).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key2",
expected: vec![(b"key1", b"visible")],
description: "filtered tombstone should not hide visible value", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"b", 60),
TestEntry::value(b"key1", b"a", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key2", b"x", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"ab"), (b"key2", b"x")], description: "[MERGE SCAN] should merge operands with base values during scan", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"2", 60),
TestEntry::value(b"key1", b"1", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::merge(b"key2", b"b", 60),
TestEntry::value(b"key2", b"a", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::merge(b"key3", b"y", 60),
TestEntry::value(b"key3", b"x", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key4",
expected: vec![(b"key1", b"12"), (b"key2", b"ab"), (b"key3", b"xy")],
description: "[MERGE SCAN] should merge operands for multiple keys", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"new", 70),
TestEntry::tombstone(b"key1", 60).with_location(LayerLocation::L0Sst(1)),
TestEntry::merge(b"key1", b"c", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key2", b"x", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"new"), (b"key2", b"x")], description: "[MERGE SCAN] tombstone should clear merge history in scan", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"b", 60),
TestEntry::merge(b"key1", b"a", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::merge(b"key2", b"y", 60),
TestEntry::merge(b"key2", b"x", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"ab"), (b"key2", b"xy")],
description: "[MERGE SCAN] should merge operands without base values", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"d", 80),
TestEntry::merge(b"key1", b"c", 70).with_location(LayerLocation::ImmutableMemtable(0)),
TestEntry::merge(b"key1", b"b", 60).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key1", b"a", 50).with_location(LayerLocation::SortedRun(0)),
],
range_start: b"key1",
range_end: b"key2",
expected: vec![(b"key1", b"abcd")], description: "[MERGE SCAN] should merge operands across multiple layers", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"c", 100), // Filtered (uncommitted)
TestEntry::merge(b"key1", b"b", 45),
TestEntry::value(b"key1", b"a", 40).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key2", b"x", 40).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"ab"), (b"key2", b"x")], description: "[MERGE SCAN] should filter merge operands by sequence number", dirty: false, last_committed_seq: Some(50), max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"b", 60),
TestEntry::value(b"key1", b"a", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::tombstone(b"key2", 60),
TestEntry::merge(b"key2", b"z", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::merge(b"key3", b"y", 60),
TestEntry::merge(b"key3", b"x", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key4",
expected: vec![(b"key1", b"ab"), (b"key3", b"xy")], description: "[MERGE SCAN] should handle mix of values, merges, and tombstones", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"wb_merge", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"base", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key2", b"normal", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"basewb_merge"), (b"key2", b"normal")], description: "[MERGE SCAN] should merge write batch operands with base values", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"merge1", 80).with_location(LayerLocation::WriteBatch),
TestEntry::merge(b"key1", b"merge2", 90).with_location(LayerLocation::WriteBatch),
TestEntry::merge(b"key1", b"merge3", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"base", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key2", b"normal", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"basemerge1merge2merge3"), (b"key2", b"normal")], description: "[MERGE SCAN] should apply multiple write batch merges in sequence", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"old_merge", 80).with_location(LayerLocation::WriteBatch),
TestEntry::tombstone(b"key1", 90).with_location(LayerLocation::WriteBatch),
TestEntry::merge(b"key1", b"new_merge", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"base", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key2", b"normal", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"new_merge"), (b"key2", b"normal")], description: "[MERGE SCAN] write batch tombstone should clear merge history", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"old_merge", 80).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"new_base", 90).with_location(LayerLocation::WriteBatch),
TestEntry::merge(b"key1", b"final_merge", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"old_base", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::value(b"key2", b"normal", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"new_basefinal_merge"), (b"key2", b"normal")], description: "[MERGE SCAN] write batch value should act as new base for subsequent merges", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"1_merge", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key1", b"1_base", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::merge(b"key2", b"2_merge", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key2", b"2_base", 50).with_location(LayerLocation::L0Sst(0)),
TestEntry::merge(b"key3", b"3_merge", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key3", b"3_base", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key4",
expected: vec![(b"key1", b"1_base1_merge"), (b"key2", b"2_base2_merge"), (b"key3", b"3_base3_merge")],
description: "[MERGE SCAN] should merge write batch operands for multiple keys", dirty: true, last_committed_seq: None, max_seq: None,
})]
#[case(ScanTestCase {
entries: vec![
TestEntry::merge(b"key1", b"merge1", 90).with_location(LayerLocation::WriteBatch),
TestEntry::merge(b"key1", b"merge2", 100).with_location(LayerLocation::WriteBatch),
TestEntry::value(b"key2", b"normal", 50).with_location(LayerLocation::L0Sst(0)),
],
range_start: b"key1",
range_end: b"key3",
expected: vec![(b"key1", b"merge1merge2"), (b"key2", b"normal")], description: "[MERGE SCAN] should merge write batch operands without base values", dirty: true, last_committed_seq: None, max_seq: None,
})]
// Shared driver for the scan `#[case]` fixtures above: loads the case's
// entries into a fresh test DB, scans [range_start, range_end) through
// `Reader`, and compares the yielded (key, value) pairs with the expectation.
async fn test_scan_with_options_layer_priority(
    #[case] test_case: ScanTestCase,
) -> Result<(), SlateDBError> {
    let mut test_db_state = TestDbState::new().await;
    let write_batch = populate_db_state(&mut test_db_state, test_case.entries).await?;
    let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
    let db_stats = DbStats::new(&recorder);
    let test_clock = Arc::new(MockSystemClock::new());
    let mono_clock = Arc::new(MonotonicClock::new(test_clock as Arc<dyn SystemClock>, 0));
    // Without an explicit last_committed_seq, every entry counts as committed.
    let last_committed_seq = test_case.last_committed_seq.unwrap_or(u64::MAX);
    let oracle = Arc::new(DbReaderOracle::new(
        last_committed_seq,
        DbStatusManager::new(0),
    ));
    // The "[MERGE" description tag opts a case into the string-concat merge
    // operator (matches both "[MERGE]" and "[MERGE SCAN]" tags).
    let merge_operator = if test_case.description.contains("[MERGE") {
        Some(Arc::new(StringConcatMergeOperator) as Arc<dyn MergeOperator + Send + Sync>)
    } else {
        None
    };
    let reader = Reader::new(
        test_db_state.table_store.clone(),
        db_stats,
        mono_clock,
        oracle,
        merge_operator,
    );
    let range = BytesRange::from_slice(test_case.range_start..test_case.range_end);
    let scan_options = ScanOptions::default().with_dirty(test_case.dirty);
    let mut iter = reader
        .scan_with_options(
            range,
            &scan_options,
            &test_db_state,
            write_batch,
            test_case.max_seq,
            None,
        )
        .await?;
    // Drain the iterator into owned (key, value) pairs, surfacing iterator
    // errors as SlateDBError so they fail the test via `?`.
    let mut actual = Vec::new();
    while let Some(kv) = iter
        .next()
        .await
        .map_err(|e| SlateDBError::IoError(Arc::new(std::io::Error::other(e))))?
    {
        actual.push((kv.key.to_vec(), kv.value.to_vec()));
    }
    let expected: Vec<(Vec<u8>, Vec<u8>)> = test_case
        .expected
        .into_iter()
        .map(|(k, v)| (k.to_vec(), v.to_vec()))
        .collect();
    // Compare raw bytes; the failure message renders both sides as lossy UTF-8.
    assert_eq!(
        actual,
        expected,
        "Failed test: {}\nActual entries: {:?}\nExpected entries: {:?}",
        test_case.description,
        actual
            .iter()
            .map(|(k, v)| format!(
                "({}, {})",
                String::from_utf8_lossy(k),
                String::from_utf8_lossy(v)
            ))
            .collect::<Vec<_>>(),
        expected
            .iter()
            .map(|(k, v)| format!(
                "({}, {})",
                String::from_utf8_lossy(k),
                String::from_utf8_lossy(v)
            ))
            .collect::<Vec<_>>()
    );
    Ok(())
}
// Constructs a `Reader` over the supplied test DB state. When `with_merge`
// is true the reader uses the string-concatenation merge operator shared by
// these tests; otherwise no merge operator is installed. The oracle's last
// committed sequence number is u64::MAX, so every entry counts as committed.
async fn build_reader(
    test_db_state: &TestDbState,
    db_stats: DbStats,
    with_merge: bool,
) -> Reader {
    let test_clock = Arc::new(MockSystemClock::new());
    let mono_clock = Arc::new(MonotonicClock::new(test_clock as Arc<dyn SystemClock>, 0));
    let oracle = Arc::new(DbReaderOracle::new(u64::MAX, DbStatusManager::new(0)));
    // Idiomatic bool -> Option conversion in place of an if/else block.
    let merge_operator = with_merge
        .then(|| Arc::new(StringConcatMergeOperator) as Arc<dyn MergeOperator + Send + Sync>);
    Reader::new(
        test_db_state.table_store.clone(),
        db_stats,
        mono_clock,
        oracle,
        merge_operator,
    )
}
#[tokio::test]
// Verifies that the merge-operator operand counter for the read path is
// incremented by both point lookups and scans, while the flush-path counter
// stays at zero throughout.
async fn should_record_merge_operator_operands_on_read_path() -> Result<(), SlateDBError> {
    // key1 has two merge operands (merged to "ab"); key2 is a plain value.
    let entries = vec![
        TestEntry::merge(b"key1", b"b", 2),
        TestEntry::merge(b"key1", b"a", 1),
        TestEntry::value(b"key2", b"value2", 1),
    ];
    let mut test_db_state = TestDbState::new().await;
    let write_batch = populate_db_state(&mut test_db_state, entries).await?;
    // Use a real recorder (not noop) so metric values can be read back.
    let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
    let recorder = MetricsRecorderHelper::new(
        metrics_recorder.clone() as Arc<dyn MetricsRecorder>,
        MetricLevel::default(),
    );
    let db_stats = DbStats::new(&recorder);
    let reader = build_reader(&test_db_state, db_stats, true).await;
    // Both counters start at zero before any reads.
    assert_eq!(
        lookup_merge_operator_operands(metrics_recorder.as_ref(), MERGE_OPERATOR_READ_PATH),
        Some(0)
    );
    assert_eq!(
        lookup_merge_operator_operands(metrics_recorder.as_ref(), MERGE_OPERATOR_FLUSH_PATH,),
        Some(0)
    );
    let result = reader
        .get_key_value_with_options(
            b"key1",
            &ReadOptions::default().with_dirty(true),
            &test_db_state,
            write_batch.clone(),
            None,
        )
        .await?;
    assert_eq!(result.unwrap().value, Bytes::from_static(b"ab"));
    // The point lookup counted 3 operands on the read path; flush unchanged.
    assert_eq!(
        lookup_merge_operator_operands(metrics_recorder.as_ref(), MERGE_OPERATOR_READ_PATH),
        Some(3)
    );
    assert_eq!(
        lookup_merge_operator_operands(metrics_recorder.as_ref(), MERGE_OPERATOR_FLUSH_PATH,),
        Some(0)
    );
    // Now scan over both keys and re-check the counters.
    let mut iter = reader
        .scan_with_options(
            BytesRange::from_slice(b"key1".as_ref()..b"key3".as_ref()),
            &ScanOptions::default().with_dirty(true),
            &test_db_state,
            write_batch,
            None,
            None,
        )
        .await?;
    let first = iter
        .next()
        .await
        .map_err(|e| SlateDBError::IoError(Arc::new(std::io::Error::other(e))))?
        .expect("should have key1");
    assert_eq!(first.key.as_ref(), b"key1");
    assert_eq!(first.value.as_ref(), b"ab");
    let second = iter
        .next()
        .await
        .map_err(|e| SlateDBError::IoError(Arc::new(std::io::Error::other(e))))?
        .expect("should have key2");
    assert_eq!(second.key.as_ref(), b"key2");
    assert_eq!(second.value.as_ref(), b"value2");
    assert!(iter
        .next()
        .await
        .map_err(|e| SlateDBError::IoError(Arc::new(std::io::Error::other(e))))?
        .is_none());
    // Scan added 3 more read-path operands (total 6); flush still untouched.
    assert_eq!(
        lookup_merge_operator_operands(metrics_recorder.as_ref(), MERGE_OPERATOR_READ_PATH),
        Some(6)
    );
    assert_eq!(
        lookup_merge_operator_operands(metrics_recorder.as_ref(), MERGE_OPERATOR_FLUSH_PATH,),
        Some(0)
    );
    Ok(())
}
#[tokio::test]
// A point lookup for a key absent from a sorted run should be rejected by
// the bloom filter and recorded as exactly one filter-negative, with no
// positives or false positives.
async fn should_record_bloom_filter_negative_for_sorted_run_point_lookup(
) -> Result<(), SlateDBError> {
    // Seed a sorted run with two keys; "key2" is deliberately missing.
    let seed = vec![
        TestEntry::value(b"key1", b"value1", 50).with_location(LayerLocation::SortedRun(0)),
        TestEntry::value(b"key3", b"value3", 40).with_location(LayerLocation::SortedRun(0)),
    ];
    let mut state = TestDbState::new().await;
    let batch = populate_db_state(&mut state, seed).await?;
    let metrics = Arc::new(DefaultMetricsRecorder::new());
    let helper = MetricsRecorderHelper::new(metrics.clone(), MetricLevel::default());
    let stats = DbStats::new(&helper);
    let reader = build_reader(&state, stats, false).await;
    let lookup = reader
        .get_key_value_with_options(
            b"key2",
            &ReadOptions::default().with_dirty(true),
            &state,
            batch,
            None,
        )
        .await?;
    assert!(lookup.is_none());
    // One negative filter hit; positive and false-positive counters untouched.
    for (metric, expected) in [
        (crate::db_stats::SST_FILTER_NEGATIVE_COUNT, 1),
        (crate::db_stats::SST_FILTER_POSITIVE_COUNT, 0),
        (crate::db_stats::SST_FILTER_FALSE_POSITIVE_COUNT, 0),
    ] {
        assert_eq!(lookup_metric(&metrics, metric), Some(expected));
    }
    Ok(())
}
#[tokio::test]
// Point lookups must surface the expire_ts an entry was written with, and
// None for entries written without one.
async fn should_return_expire_ts_on_get() -> Result<(), SlateDBError> {
    // key1 carries an expiry timestamp; key2 does not.
    let seed = vec![
        TestEntry::value(b"key1", b"value1", 50).with_expire_ts(500),
        TestEntry::value(b"key2", b"value2", 50),
    ];
    let mut state = TestDbState::new().await;
    let batch = populate_db_state(&mut state, seed).await?;
    let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
    let stats = DbStats::new(&recorder);
    let reader = build_reader(&state, stats, false).await;
    // Table of (key, expected value, expected expire_ts) lookups.
    for (key, value, expire_ts) in [
        (b"key1".as_slice(), b"value1".as_slice(), Some(500)),
        (b"key2".as_slice(), b"value2".as_slice(), None),
    ] {
        let found = reader
            .get_key_value_with_options(
                key,
                &ReadOptions::default().with_dirty(true),
                &state,
                batch.clone(),
                None,
            )
            .await?
            .expect("should return value");
        assert_eq!(found.value.as_ref(), value);
        assert_eq!(found.expire_ts, expire_ts);
    }
    Ok(())
}
#[tokio::test]
// Scans must surface each entry's expire_ts: Some(ts) where one was written,
// None otherwise.
async fn should_return_expire_ts_on_scan() -> Result<(), SlateDBError> {
    // key1 and key3 carry expiry timestamps; key2 does not.
    let entries = vec![
        TestEntry::value(b"key1", b"val1", 50).with_expire_ts(100),
        TestEntry::value(b"key2", b"val2", 50),
        TestEntry::value(b"key3", b"val3", 50).with_expire_ts(300),
    ];
    let mut test_db_state = TestDbState::new().await;
    let write_batch = populate_db_state(&mut test_db_state, entries).await?;
    let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
    let db_stats = DbStats::new(&recorder);
    let reader = build_reader(&test_db_state, db_stats, false).await;
    // Scan covers all three keys; end bound "key4" is past the last key.
    let range = BytesRange::from_slice(b"key1".as_ref()..b"key4".as_ref());
    let scan_options = ScanOptions::default().with_dirty(true);
    let mut iter = reader
        .scan_with_options(
            range,
            &scan_options,
            &test_db_state,
            write_batch,
            None,
            None,
        )
        .await?;
    let kv1 = iter
        .next()
        .await
        .map_err(|e| SlateDBError::IoError(Arc::new(std::io::Error::other(e))))?
        .expect("should have key1");
    assert_eq!(kv1.key.as_ref(), b"key1");
    assert_eq!(kv1.expire_ts, Some(100));
    let kv2 = iter
        .next()
        .await
        .map_err(|e| SlateDBError::IoError(Arc::new(std::io::Error::other(e))))?
        .expect("should have key2");
    assert_eq!(kv2.key.as_ref(), b"key2");
    assert_eq!(kv2.expire_ts, None);
    let kv3 = iter
        .next()
        .await
        .map_err(|e| SlateDBError::IoError(Arc::new(std::io::Error::other(e))))?
        .expect("should have key3");
    assert_eq!(kv3.key.as_ref(), b"key3");
    assert_eq!(kv3.expire_ts, Some(300));
    Ok(())
}
#[tokio::test]
// When several merge operands (and a base value) each carry an expire_ts,
// the merged result must report the minimum of those timestamps.
async fn should_return_min_expire_ts_from_merged_entries() -> Result<(), SlateDBError> {
    // Operands span four layers; expiries are 200, 100, (none), and 300 —
    // the minimum present is 100 on the seq-70 operand in L0.
    let entries = vec![
        TestEntry::merge(b"key1", b"d", 80).with_expire_ts(200),
        TestEntry::merge(b"key1", b"c", 70)
            .with_location(LayerLocation::L0Sst(1))
            .with_expire_ts(100),
        TestEntry::merge(b"key1", b"b", 60).with_location(LayerLocation::L0Sst(0)),
        TestEntry::value(b"key1", b"a", 50)
            .with_location(LayerLocation::SortedRun(0))
            .with_expire_ts(300),
    ];
    let mut test_db_state = TestDbState::new().await;
    let write_batch = populate_db_state(&mut test_db_state, entries).await?;
    let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
    let db_stats = DbStats::new(&recorder);
    // Reader built with the string-concat merge operator enabled.
    let reader = build_reader(&test_db_state, db_stats, true).await;
    let result = reader
        .get_key_value_with_options(
            b"key1",
            &ReadOptions::default().with_dirty(true),
            &test_db_state,
            write_batch,
            None,
        )
        .await?;
    let kv = result.expect("should return merged value");
    // Operands concatenate in ascending seq order: "a" + "b" + "c" + "d".
    assert_eq!(kv.value.as_ref(), b"abcd");
    assert_eq!(
        kv.expire_ts,
        Some(100),
        "expire_ts should be the minimum across all merge operands"
    );
    Ok(())
}
#[tokio::test]
// When the base value's expire_ts is earlier than the merge operand's, the
// merged result must carry the base's (earliest) expiry.
async fn should_return_base_expire_ts_when_it_is_earliest() -> Result<(), SlateDBError> {
    // Operand expires at 300; the base value in L0 expires at 50.
    let seed = vec![
        TestEntry::merge(b"key1", b"b", 60).with_expire_ts(300),
        TestEntry::value(b"key1", b"a", 50)
            .with_location(LayerLocation::L0Sst(0))
            .with_expire_ts(50),
    ];
    let mut state = TestDbState::new().await;
    let batch = populate_db_state(&mut state, seed).await?;
    let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
    let stats = DbStats::new(&recorder);
    // Reader built with the string-concat merge operator enabled.
    let reader = build_reader(&state, stats, true).await;
    let kv = reader
        .get_key_value_with_options(
            b"key1",
            &ReadOptions::default().with_dirty(true),
            &state,
            batch,
            None,
        )
        .await?
        .expect("should return merged value");
    assert_eq!(kv.value.as_ref(), b"ab");
    assert_eq!(
        kv.expire_ts,
        Some(50),
        "expire_ts should be the base value's expire_ts since it is the earliest"
    );
    Ok(())
}
}