use crate::bytes_range::BytesRange;
use crate::checkpoint::Checkpoint;
use crate::config::CompressionCodec;
use crate::error::SlateDBError;
use crate::manifest::Manifest;
use crate::mem_table::{ImmutableMemtable, KVTable, WritableKVTable};
use crate::reader::DbStateReader;
use crate::seq_tracker::SequenceTracker;
use crate::wal_id::WalIdStore;
use bytes::Bytes;
use log::debug;
use serde::Serialize;
use slatedb_txn_obj::DirtyObject;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, Range, RangeBounds};
use std::sync::Arc;
use ulid::Ulid;
use uuid::Uuid;
use SsTableId::{Compacted, Wal};
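/// Handle to an SST: its id, on-disk format version, and decoded metadata.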
#[derive(Clone, PartialEq, Serialize)]
pub struct SsTableHandle {
pub id: SsTableId,
pub(crate) format_version: u16,
pub info: SsTableInfo,
}
impl Debug for SsTableHandle {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("SsTableHandle({:?})", self.id))
}
}
impl SsTableHandle {
pub(crate) fn new(id: SsTableId, format_version: u16, info: SsTableInfo) -> Self {
SsTableHandle {
id,
format_version,
info,
}
}
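/// Estimates the table's size in bytes as the end offset of its index
/// (`index_offset + index_len`); any sections written after the index are
/// not counted in the estimate.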
pub fn estimate_size(&self) -> u64 {
self.info.index_offset + self.info.index_len
}
}
impl AsRef<SsTableHandle> for SsTableHandle {
fn as_ref(&self) -> &SsTableHandle {
self
}
}
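/// A possibly range-restricted view over an SST. `visible_range` limits the
/// keys the view exposes, while `effective_range` caches its intersection
/// with the SST's physical first/last-entry range.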
#[derive(Clone, PartialEq, Serialize)]
pub struct SsTableView {
pub id: Ulid,
pub sst: SsTableHandle,
pub(crate) visible_range: Option<BytesRange>,
effective_range: BytesRange,
}
impl Debug for SsTableView {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"SsTableView({:?}, {:?})",
self.sst.id, self.visible_range
))
}
}
impl SsTableView {
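/// Builds a view spanning the entire SST. For WAL tables the view id is a
/// synthetic ULID derived from the WAL id, so every view carries a
/// comparable identifier.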
pub(crate) fn identity(sst: SsTableHandle) -> Self {
let id = match &sst.id {
SsTableId::Compacted(ulid) => *ulid,
SsTableId::Wal(wal_id) => Ulid::from_parts(*wal_id, 0),
};
Self::new(id, sst)
}
pub(crate) fn new(id: Ulid, sst: SsTableHandle) -> Self {
let effective_range = match sst.info.first_entry.clone() {
Some(physical_first_entry) => {
let end_bound = match sst.info.last_entry.clone() {
Some(physical_last_entry) => Included(physical_last_entry),
None => Unbounded,
};
BytesRange::new(Included(physical_first_entry), end_bound)
}
None => BytesRange::new_empty(),
};
SsTableView {
id,
sst,
visible_range: None,
effective_range,
}
}
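/// Builds a view over `sst` restricted to `visible_range`. The effective
/// range is the intersection of the SST's physical first/last-entry range
/// with the visible range; the visible range's start bound must be
/// `Included` or `Unbounded`, and the intersection must be non-empty.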
pub(crate) fn new_projected(
id: Ulid,
sst: SsTableHandle,
visible_range: Option<BytesRange>,
) -> Self {
let mut effective_range = match sst.info.first_entry.clone() {
Some(physical_first_entry) => {
let end_bound = match sst.info.last_entry.clone() {
Some(physical_last_entry) => Included(physical_last_entry),
None => Unbounded,
};
BytesRange::new(Included(physical_first_entry), end_bound)
}
None => {
unreachable!("SST always has a first entry.")
}
};
if let Some(visible_range) = &visible_range {
assert!(
visible_range.is_start_bound_included_or_unbounded(),
"Start bound of the visible range must be either Included or Unbounded."
);
effective_range = effective_range
.intersect(visible_range)
.expect("An intersection of visible and physical range must be non-empty.")
}
SsTableView {
id,
sst,
visible_range,
effective_range,
}
}
pub(crate) fn with_visible_range(&self, visible_range: BytesRange) -> Self {
Self::new_projected(self.id, self.sst.clone(), Some(visible_range))
}
pub fn visible_range(&self) -> Option<impl RangeBounds<Bytes>> {
self.visible_range.clone()
}
pub(crate) fn compacted_effective_start_bound(&self) -> Bound<Bytes> {
assert!(matches!(self.sst.id, Compacted(_)));
self.effective_range.start_bound().cloned()
}
pub(crate) fn compacted_effective_start_key(&self) -> &Bytes {
assert!(matches!(self.sst.id, Compacted(_)));
match self.effective_range.start_bound() {
Included(k) => k,
_ => unreachable!("Invalid start bound"),
}
}
pub(crate) fn compacted_effective_range(&self) -> &BytesRange {
&self.effective_range
}
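/// Intersects this table's key slot with `range`. When `next_view` is given,
/// the slot ends (exclusive) at that view's start key; otherwise the table's
/// full effective range is used.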
pub(crate) fn compacted_intersection(
&self,
next_view: Option<&SsTableView>,
range: &BytesRange,
) -> Option<BytesRange> {
assert!(matches!(self.sst.id, Compacted(_)));
if let Some(next_view) = next_view {
BytesRange::new(
self.compacted_effective_start_bound(),
Excluded(next_view.compacted_effective_start_key().clone()),
)
.intersect(range)
} else {
self.effective_range.intersect(range)
}
}
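/// Returns true if the portion of this view's effective range below
/// `end_bound` (the view's slot boundary within its run) intersects `range`.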
pub(crate) fn intersects_range(&self, end_bound: Bound<Bytes>, range: &BytesRange) -> bool {
let sst_range =
BytesRange::new(Unbounded, end_bound.clone()).intersect(&self.effective_range);
match sst_range {
Some(sst_range) => BytesRange::new(sst_range.start_bound().cloned(), end_bound)
.intersect(range)
.is_some(),
None => false,
}
}
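/// Narrows `range` to what this view can serve: intersect with the visible
/// range if one is set, otherwise with the effective range when the SST
/// records a last entry, otherwise return `range` unchanged.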
pub(crate) fn calculate_view_range(&self, range: BytesRange) -> Option<BytesRange> {
if let Some(visible_range) = &self.visible_range {
return range.intersect(visible_range);
}
if self.sst.info.last_entry.is_some() {
return range.intersect(&self.effective_range);
}
Some(range)
}
pub fn estimate_size(&self) -> u64 {
self.sst.estimate_size()
}
}
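/// Identifier for an SST: either the id of the WAL segment it was written
/// as, or the ULID assigned to a compacted table.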
#[derive(Clone, PartialEq, Hash, Eq, Copy, Serialize)]
pub enum SsTableId {
Wal(u64),
Compacted(Ulid),
}
impl SsTableId {
#[allow(clippy::panic)]
pub fn unwrap_wal_id(&self) -> u64 {
match self {
Wal(wal_id) => *wal_id,
Compacted(_) => panic!("found compacted id when unwrapping WAL ID"),
}
}
#[allow(clippy::panic)]
pub fn unwrap_compacted_id(&self) -> Ulid {
match self {
Wal(_) => panic!("found WAL id when unwrapping compacted ID"),
Compacted(ulid) => *ulid,
}
}
}
impl Debug for SsTableId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Wal(id) => write!(f, "SsTableId::Wal({})", id),
Compacted(id) => write!(f, "SsTableId::Compacted({})", id),
}
}
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, Serialize)]
pub enum SstType {
#[default]
Compacted,
Wal,
}
#[derive(Clone, Debug, PartialEq, Serialize, Default)]
pub struct SsTableInfo {
pub first_entry: Option<Bytes>,
pub last_entry: Option<Bytes>,
pub index_offset: u64,
pub index_len: u64,
pub filter_offset: u64,
pub filter_len: u64,
pub compression_codec: Option<CompressionCodec>,
pub sst_type: SstType,
pub stats_offset: u64,
pub stats_len: u64,
}
pub(crate) trait SsTableInfoCodec: Send + Sync {
fn encode(&self, info: &SsTableInfo) -> Bytes;
fn decode(&self, bytes: &Bytes) -> Result<SsTableInfo, SlateDBError>;
fn clone_box(&self) -> Box<dyn SsTableInfoCodec>;
}
impl Clone for Box<dyn SsTableInfoCodec> {
fn clone(&self) -> Self {
self.as_ref().clone_box()
}
}
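/// A run of SST views ordered by effective start key. Adjacent views may
/// share a boundary key; the slot computations below account for that.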
#[derive(Clone, PartialEq, Serialize, Debug)]
pub struct SortedRun {
pub id: u32,
pub sst_views: Vec<SsTableView>,
}
impl SortedRun {
pub fn estimate_size(&self) -> u64 {
self.sst_views.iter().map(|sst| sst.estimate_size()).sum()
}
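/// Returns the index of the last SST whose effective start key is <= `key`,
/// or `None` if `key` sorts before every SST in the run.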
pub(crate) fn find_last_sst_with_range_covering_key(&self, key: &[u8]) -> Option<usize> {
let first_sst = self
.sst_views
.partition_point(|sst| sst.compacted_effective_start_key() <= key);
if first_sst > 0 {
return Some(first_sst - 1);
}
None
}
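/// Returns the end bound of the key slot owned by the table at `idx`: the
/// next table's start key (`Included` when the current table's effective
/// range also contains that key, `Excluded` otherwise), or `Unbounded` for
/// the last table in the run.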
fn table_end_bound(&self, idx: usize) -> Bound<Bytes> {
let current_sst = &self.sst_views[idx];
if idx + 1 < self.sst_views.len() {
let next_sst = &self.sst_views[idx + 1];
if current_sst
.compacted_effective_range()
.contains(next_sst.compacted_effective_start_key())
{
Included(next_sst.compacted_effective_start_key().clone())
} else {
Excluded(next_sst.compacted_effective_start_key().clone())
}
} else {
Unbounded
}
}
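/// Returns the index range of tables whose slots may contain `key`, starting
/// from the last table whose start key is <= `key` and walking backwards
/// while earlier tables still cover the key (possible when adjacent tables
/// share a boundary key).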
fn point_table_idx_covering_key(&self, key: &[u8]) -> Range<usize> {
let Some(max_idx) = self.find_last_sst_with_range_covering_key(key) else {
return 0..0;
};
let point_range = BytesRange::from_slice(key..=key);
if !self.sst_views[max_idx].intersects_range(self.table_end_bound(max_idx), &point_range) {
return 0..0;
}
let mut min_idx = max_idx;
while min_idx > 0
&& self.sst_views[min_idx - 1]
.intersects_range(self.table_end_bound(min_idx - 1), &point_range)
{
min_idx -= 1;
}
min_idx..(max_idx + 1)
}
fn table_idx_covering_range(&self, range: &BytesRange) -> Range<usize> {
let mut min_idx = None;
let mut max_idx = 0;
for idx in 0..self.sst_views.len() {
let current_sst = &self.sst_views[idx];
if current_sst.intersects_range(self.table_end_bound(idx), range) {
if min_idx.is_none() {
min_idx = Some(idx);
}
max_idx = idx;
}
}
match min_idx {
Some(min_idx) => min_idx..(max_idx + 1),
None => 0..0,
}
}
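/// Returns the views whose key slots intersect `range`, in start-key order.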
pub fn tables_covering_range<R: RangeBounds<Bytes>>(&self, range: R) -> VecDeque<&SsTableView> {
let bytes_range = BytesRange::new(range.start_bound().cloned(), range.end_bound().cloned());
let matching_range = self.table_idx_covering_range(&bytes_range);
self.sst_views[matching_range].iter().collect()
}
pub(crate) fn tables_covering_point_key(&self, key: &[u8]) -> &[SsTableView] {
let matching_range = self.point_table_idx_covering_key(key);
&self.sst_views[matching_range]
}
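/// Consumes the run and returns the views whose key slots intersect `range`.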
pub(crate) fn into_tables_covering_range(
mut self,
range: &BytesRange,
) -> VecDeque<SsTableView> {
let matching_range = self.table_idx_covering_range(range);
self.sst_views.drain(matching_range).collect()
}
}
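/// In-memory database state: the active writable memtable plus a
/// copy-on-write snapshot of everything else, so readers can hold a cheap,
/// consistent `Arc` while writers publish modified copies.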
pub(crate) struct DbState {
memtable: WritableKVTable,
state: Arc<COWDbState>,
}
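/// The copy-on-write portion of the state: immutable memtables awaiting
/// flush and the (possibly dirty) manifest.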
#[derive(Clone)]
pub(crate) struct COWDbState {
pub(crate) imm_memtable: VecDeque<Arc<ImmutableMemtable>>,
pub(crate) manifest: DirtyObject<Manifest>,
}
impl COWDbState {
pub(crate) fn core(&self) -> &ManifestCore {
&self.manifest.value.core
}
}
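/// The durable core of the manifest: L0 tables, sorted runs, WAL watermarks,
/// clock and sequence state, and checkpoints.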
#[derive(Clone, PartialEq, Serialize, Debug)]
pub struct ManifestCore {
pub initialized: bool,
pub last_compacted_l0_sst_view_id: Option<Ulid>,
pub last_compacted_l0_sst_id: Option<Ulid>,
pub l0: VecDeque<SsTableView>,
pub compacted: Vec<SortedRun>,
pub next_wal_sst_id: u64,
pub replay_after_wal_id: u64,
pub last_l0_clock_tick: i64,
pub last_l0_seq: u64,
pub recent_snapshot_min_seq: u64,
pub sequence_tracker: SequenceTracker,
pub checkpoints: Vec<Checkpoint>,
pub wal_object_store_uri: Option<String>,
}
impl ManifestCore {
pub(crate) fn new() -> Self {
Self {
initialized: true,
last_compacted_l0_sst_view_id: None,
last_compacted_l0_sst_id: None,
l0: VecDeque::new(),
compacted: vec![],
next_wal_sst_id: 1,
replay_after_wal_id: 0,
last_l0_clock_tick: i64::MIN,
last_l0_seq: 0,
checkpoints: vec![],
wal_object_store_uri: None,
recent_snapshot_min_seq: 0,
sequence_tracker: SequenceTracker::new(),
}
}
pub(crate) fn new_with_wal_object_store(wal_object_store_uri: Option<String>) -> Self {
let mut this = Self::new();
this.wal_object_store_uri = wal_object_store_uri;
this
}
pub(crate) fn init_clone_db(&self) -> ManifestCore {
let mut clone = self.clone();
clone.initialized = false;
clone.checkpoints.clear();
clone
}
pub(crate) fn log_db_runs(&self) {
let l0s: Vec<_> = self.l0.iter().map(|l0| l0.estimate_size()).collect();
let compacted: Vec<_> = self
.compacted
.iter()
.map(|sr| (sr.id, sr.estimate_size()))
.collect();
debug!("DB Levels:");
debug!("-----------------");
debug!("{:?}", l0s);
debug!("{:?}", compacted);
debug!("-----------------");
}
pub(crate) fn find_checkpoint(&self, checkpoint_id: Uuid) -> Option<&Checkpoint> {
self.checkpoints.iter().find(|c| c.id == checkpoint_id)
}
}
#[derive(Clone)]
pub(crate) struct DbStateView {
pub(crate) memtable: Arc<KVTable>,
pub(crate) state: Arc<COWDbState>,
}
impl DbStateReader for DbStateView {
fn memtable(&self) -> Arc<KVTable> {
Arc::clone(&self.memtable)
}
fn imm_memtable(&self) -> &VecDeque<Arc<ImmutableMemtable>> {
&self.state.imm_memtable
}
fn core(&self) -> &ManifestCore {
self.state.core()
}
}
impl DbState {
pub(crate) fn new(manifest: DirtyObject<Manifest>) -> Self {
Self {
memtable: WritableKVTable::new(),
state: Arc::new(COWDbState {
imm_memtable: VecDeque::new(),
manifest,
}),
}
}
pub(crate) fn state(&self) -> Arc<COWDbState> {
self.state.clone()
}
pub(crate) fn view(&self) -> DbStateView {
DbStateView {
memtable: self.memtable.table().clone(),
state: self.state.clone(),
}
}
pub(crate) fn memtable(&self) -> &WritableKVTable {
&self.memtable
}
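/// Swaps in a fresh active memtable and pushes the old one onto the front of
/// the immutable-memtable queue, tagged with the most recently flushed WAL id.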
pub(crate) fn freeze_memtable(&mut self, recent_flushed_wal_id: u64) {
let old_memtable = std::mem::replace(&mut self.memtable, WritableKVTable::new());
self.modify(|modifier| {
modifier
.state
.imm_memtable
.push_front(Arc::new(ImmutableMemtable::new(
old_memtable,
recent_flushed_wal_id,
)))
});
}
pub(crate) fn replace_memtable(&mut self, memtable: WritableKVTable) {
assert!(self.memtable.is_empty());
self.memtable = memtable;
}
pub(crate) fn merge_remote_manifest(&mut self, remote_manifest: DirtyObject<Manifest>) {
self.modify(|modifier| modifier.merge_remote_manifest(remote_manifest));
}
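/// Runs `fun` against a scratch copy of the shared state and atomically
/// publishes the result; see `StateModifier`.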
pub(crate) fn modify<F, R>(&mut self, fun: F) -> R
where
F: FnOnce(&mut StateModifier<'_>) -> R,
{
let mut modifier = StateModifier::new(self);
let result = fun(&mut modifier);
modifier.finish();
result
}
}
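/// Accumulates mutations against a private clone of the copy-on-write state;
/// `finish` publishes the clone by swapping the `Arc` held in `DbState`.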
pub(crate) struct StateModifier<'a> {
db_state: &'a mut DbState,
pub(crate) state: COWDbState,
}
impl<'a> StateModifier<'a> {
fn new(db_state: &'a mut DbState) -> Self {
let state = db_state.state.as_ref().clone();
Self { db_state, state }
}
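/// Merges a freshly fetched remote manifest into the local state: L0 views
/// the compactor reports as already compacted are dropped, compacted runs
/// and checkpoints are adopted from the remote manifest, and writer-owned
/// fields (WAL ids, clock tick, sequence state) are kept from the local core.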
pub(crate) fn merge_remote_manifest(&mut self, mut remote_manifest: DirtyObject<Manifest>) {
let l0_last_compacted_view_id = &remote_manifest.value.core.last_compacted_l0_sst_view_id;
let l0_last_compacted_sst_id = &remote_manifest.value.core.last_compacted_l0_sst_id;
let new_l0 = if l0_last_compacted_view_id.is_some() || l0_last_compacted_sst_id.is_some() {
self.state
.manifest
.value
.core
.l0
.iter()
.cloned()
.take_while(|view| {
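// L0 is ordered newest-first, so keep views until we reach the newest
// one the compactor reports as compacted; it and everything older have
// already been folded into sorted runs.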
if let Some(view_id) = l0_last_compacted_view_id {
if view.id == *view_id {
return false;
}
}
if let Some(sst_id) = l0_last_compacted_sst_id {
if view.sst.id.unwrap_compacted_id() == *sst_id {
return false;
}
}
true
})
.collect()
} else {
self.state.manifest.value.core.l0.iter().cloned().collect()
};
let my_db_state = self.state.core();
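// Writer-owned fields (WAL ids, clock tick, sequence state) win from the
// local core; compactor-owned fields (compacted runs, L0 compaction
// markers, checkpoints) are taken from the remote manifest.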
remote_manifest.value.core = ManifestCore {
initialized: my_db_state.initialized,
last_compacted_l0_sst_view_id: remote_manifest.value.core.last_compacted_l0_sst_view_id,
last_compacted_l0_sst_id: remote_manifest.value.core.last_compacted_l0_sst_id,
l0: new_l0,
compacted: remote_manifest.value.core.compacted,
next_wal_sst_id: my_db_state.next_wal_sst_id,
replay_after_wal_id: my_db_state.replay_after_wal_id,
last_l0_clock_tick: my_db_state.last_l0_clock_tick,
last_l0_seq: my_db_state.last_l0_seq,
recent_snapshot_min_seq: my_db_state.recent_snapshot_min_seq,
sequence_tracker: my_db_state.sequence_tracker.clone(),
checkpoints: remote_manifest.value.core.checkpoints,
wal_object_store_uri: my_db_state.wal_object_store_uri.clone(),
};
self.state.manifest = remote_manifest;
}
fn finish(self) {
self.db_state.state = Arc::new(self.state);
}
}
impl WalIdStore for parking_lot::RwLock<DbState> {
fn next_wal_id(&self) -> u64 {
let mut state = self.write();
state.modify(|modifier| {
let next_wal_id = modifier.state.manifest.value.core.next_wal_sst_id;
modifier.state.manifest.value.core.next_wal_sst_id += 1;
next_wal_id
})
}
}
#[cfg(test)]
mod tests {
use crate::checkpoint::Checkpoint;
use crate::db_state::{
DbState, SortedRun, SsTableHandle, SsTableId, SsTableInfo, SsTableView, SstType,
};
use crate::format::sst::SST_FORMAT_VERSION_LATEST;
use crate::manifest::store::test_utils::new_dirty_manifest;
use crate::proptest_util::arbitrary;
use crate::seq_tracker::{FindOption, SequenceTracker, TrackedSeq};
use crate::test_utils;
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use proptest::collection::vec;
use proptest::proptest;
use slatedb_common::clock::{DefaultSystemClock, SystemClock};
use std::collections::BTreeSet;
use std::collections::Bound::Included;
use std::ops::RangeBounds;
#[test]
fn test_should_merge_db_state_with_new_checkpoints() {
let mut db_state = DbState::new(new_dirty_manifest());
let mut updated_state = new_dirty_manifest();
updated_state.value.core = db_state.state.core().clone();
let checkpoint = Checkpoint {
id: uuid::Uuid::new_v4(),
manifest_id: 1,
expire_time: None,
create_time: DefaultSystemClock::default().now(),
name: None,
};
updated_state
.value
.core
.checkpoints
.push(checkpoint.clone());
db_state.merge_remote_manifest(updated_state);
assert_eq!(vec![checkpoint], db_state.state.core().checkpoints);
}
#[test]
fn test_should_merge_db_state_with_l0s_up_to_last_compacted() {
let mut db_state = DbState::new(new_dirty_manifest());
add_l0s_to_dbstate(&mut db_state, 4);
let mut compactor_state = new_dirty_manifest();
compactor_state.value.core = db_state.state.core().clone();
let last_compacted = compactor_state.value.core.l0.pop_back().unwrap();
compactor_state.value.core.last_compacted_l0_sst_view_id = Some(last_compacted.id);
db_state.merge_remote_manifest(compactor_state.clone());
let expected: Vec<SsTableId> = compactor_state
.value
.core
.l0
.iter()
.map(|l0| l0.sst.id)
.collect();
let merged: Vec<SsTableId> = db_state
.state
.core()
.l0
.iter()
.map(|l0| l0.sst.id)
.collect();
assert_eq!(expected, merged);
}
#[test]
fn test_should_merge_db_state_with_all_l0s_if_none_compacted() {
let mut db_state = DbState::new(new_dirty_manifest());
add_l0s_to_dbstate(&mut db_state, 4);
let l0s = db_state.state.core().l0.clone();
db_state.merge_remote_manifest(new_dirty_manifest());
let expected: Vec<SsTableId> = l0s.iter().map(|l0| l0.sst.id).collect();
let merged: Vec<SsTableId> = db_state
.state
.core()
.l0
.iter()
.map(|l0| l0.sst.id)
.collect();
assert_eq!(expected, merged);
}
#[test]
fn test_should_keep_local_sequence_tracker_on_merge() {
let mut db_state = DbState::new(new_dirty_manifest());
db_state.modify(|modifier| {
let core = &mut modifier.state.manifest.value.core;
core.last_l0_seq = 3;
core.sequence_tracker.insert(TrackedSeq {
seq: 1,
ts: Utc.timestamp_opt(60, 0).single().unwrap(),
});
core.sequence_tracker.insert(TrackedSeq {
seq: 2,
ts: Utc.timestamp_opt(120, 0).single().unwrap(),
});
core.sequence_tracker.insert(TrackedSeq {
seq: 3,
ts: Utc.timestamp_opt(180, 0).single().unwrap(),
});
});
let mut remote_state = new_dirty_manifest();
remote_state.value.core = db_state.state.core().clone();
remote_state.value.core.sequence_tracker = SequenceTracker::new();
db_state.merge_remote_manifest(remote_state);
let tracker = &db_state.state.core().sequence_tracker;
assert_eq!(
tracker.find_ts(1, FindOption::RoundDown),
Utc.timestamp_opt(60, 0).single()
);
assert_eq!(
tracker.find_ts(2, FindOption::RoundDown),
Utc.timestamp_opt(120, 0).single()
);
assert_eq!(
tracker.find_ts(3, FindOption::RoundDown),
Utc.timestamp_opt(180, 0).single()
);
}
fn add_l0s_to_dbstate(db_state: &mut DbState, n: u32) {
let dummy_info = create_sst_info(None);
for i in 0..n {
db_state.freeze_memtable(i as u64);
let imm = db_state.state.imm_memtable.back().unwrap().clone();
let handle = SsTableHandle::new(
SsTableId::Compacted(ulid::Ulid::from_parts(i as u64, 0)),
SST_FORMAT_VERSION_LATEST,
dummy_info.clone(),
);
let view: SsTableView = SsTableView::identity(handle);
db_state.modify(|modifier| {
modifier.state.manifest.value.core.l0.push_front(view);
modifier.state.manifest.value.core.replay_after_wal_id =
imm.recent_flushed_wal_id();
});
}
}
#[test]
fn test_sorted_run_collect_tables_in_range() {
let max_bytes_len = 5;
proptest!(|(
table_first_keys in vec(arbitrary::nonempty_bytes(max_bytes_len), 1..10),
range in arbitrary::nonempty_range(max_bytes_len),
)| {
let sorted_first_keys: BTreeSet<Bytes> = table_first_keys.into_iter().collect();
let sorted_run = create_sorted_run(0, &sorted_first_keys);
let covering_tables = sorted_run.tables_covering_range(range.clone());
let first_key = sorted_first_keys.first().unwrap().clone();
let range_start_key = test_utils::bound_as_option(range.start_bound())
.cloned()
.unwrap_or_default();
let range_end_key = test_utils::bound_as_option(range.end_bound())
.cloned()
.unwrap_or(vec![u8::MAX; max_bytes_len + 1].into());
if covering_tables.is_empty() {
assert!(range_end_key <= first_key);
} else {
let covering_first_key = covering_tables.front()
.map(|t| t.compacted_effective_start_key().clone())
.unwrap();
if range_start_key < covering_first_key {
assert_eq!(covering_first_key, first_key)
}
let covering_last_key = covering_tables.iter().last()
.map(|t| t.compacted_effective_start_key().clone())
.unwrap();
if covering_last_key == range_end_key {
assert_eq!(Included(range_end_key), range.end_bound().cloned());
} else {
assert!(covering_last_key < range_end_key);
}
}
});
}
#[test]
fn test_sorted_run_collect_tables_for_point_key() {
let sorted_run = SortedRun {
id: 0,
sst_views: vec![
create_compacted_sst_view_with_bounds(b"a", Some(b"k")),
create_compacted_sst_view_with_bounds(b"k", Some(b"k")),
create_compacted_sst_view_with_bounds(b"k", Some(b"m")),
create_compacted_sst_view_with_bounds(b"z", Some(b"z")),
],
};
let covering_tables = sorted_run.tables_covering_point_key(b"k");
assert_eq!(covering_tables.len(), 3);
assert_eq!(
covering_tables[0].compacted_effective_start_key().as_ref(),
b"a"
);
assert_eq!(
covering_tables[1].compacted_effective_start_key().as_ref(),
b"k"
);
assert_eq!(
covering_tables[2].compacted_effective_start_key().as_ref(),
b"k"
);
assert!(sorted_run.tables_covering_point_key(b"0").is_empty());
}
fn create_sorted_run(id: u32, first_keys: &BTreeSet<Bytes>) -> SortedRun {
let mut ssts = Vec::new();
for first_key in first_keys {
ssts.push(create_compacted_sst_view(Some(first_key.clone())));
}
SortedRun {
id,
sst_views: ssts,
}
}
fn create_compacted_sst_view(first_entry: Option<Bytes>) -> SsTableView {
let sst_info = create_sst_info(first_entry);
let sst_id = SsTableId::Compacted(ulid::Ulid::from_parts(0, 0));
let handle = SsTableHandle::new(sst_id, SST_FORMAT_VERSION_LATEST, sst_info);
SsTableView::identity(handle)
}
fn create_compacted_sst_view_with_bounds(
first_entry: &[u8],
last_entry: Option<&[u8]>,
) -> SsTableView {
let sst_info = SsTableInfo {
first_entry: Some(Bytes::copy_from_slice(first_entry)),
last_entry: last_entry.map(Bytes::copy_from_slice),
index_offset: 0,
index_len: 0,
filter_offset: 0,
filter_len: 0,
compression_codec: None,
sst_type: SstType::default(),
stats_offset: 0,
stats_len: 0,
};
let sst_id = SsTableId::Compacted(ulid::Ulid::new());
let handle = SsTableHandle::new(sst_id, SST_FORMAT_VERSION_LATEST, sst_info);
SsTableView::identity(handle)
}
fn create_sst_info(first_entry: Option<Bytes>) -> SsTableInfo {
SsTableInfo {
first_entry,
last_entry: None,
index_offset: 0,
index_len: 0,
filter_offset: 0,
filter_len: 0,
compression_codec: None,
sst_type: SstType::default(),
stats_offset: 0,
stats_len: 0,
}
}
}