use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::{Display, Formatter};
use log::{error, info};
use serde::{Deserialize, Serialize};
use ulid::Ulid;
use crate::db_state::{ManifestCore, SortedRun, SsTableHandle, SsTableView};
use crate::error::SlateDBError;
use crate::manifest::Manifest;
use slatedb_txn_obj::DirtyObject;
/// Identifies one input of a compaction: either an existing sorted run,
/// referenced by its `u32` id, or a single L0 SST view, referenced by its `Ulid`.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub enum SourceId {
/// A sorted run in `ManifestCore::compacted`, matched against `SortedRun::id`.
SortedRun(u32),
/// An L0 SST view in `ManifestCore::l0`, matched against `SsTableView::id`.
SstView(Ulid),
}
/// Formats a source for log output.
///
/// A sorted run renders as its numeric id. Every L0 view renders as the
/// literal `l0`; the view's `Ulid` is not included.
impl Display for SourceId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Write straight into the formatter instead of first allocating an
        // intermediate `String` with `format!`.
        match self {
            SourceId::SortedRun(id) => write!(f, "{}", id),
            SourceId::SstView(_) => f.write_str("l0"),
        }
    }
}
impl SourceId {
    /// Returns the sorted-run id carried by this source.
    ///
    /// # Panics
    ///
    /// Panics if the source is an L0 SST view.
    pub(crate) fn unwrap_sorted_run(&self) -> u32 {
        match self.maybe_unwrap_sorted_run() {
            Some(sr_id) => sr_id,
            None => panic!("tried to unwrap SstView as Sorted Run"),
        }
    }

    /// Returns the sorted-run id if this source is a sorted run, else `None`.
    pub(crate) fn maybe_unwrap_sorted_run(&self) -> Option<u32> {
        if let SourceId::SortedRun(sr_id) = *self {
            Some(sr_id)
        } else {
            None
        }
    }

    /// Returns the L0 view id if this source is an L0 SST view, else `None`.
    pub(crate) fn maybe_unwrap_sst_view(&self) -> Option<Ulid> {
        if let SourceId::SstView(view_id) = *self {
            Some(view_id)
        } else {
            None
        }
    }
}
/// Describes what a compaction reads and where it writes: a list of input
/// sources and the id of the sorted run that receives the output.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct CompactionSpec {
/// Inputs to the compaction, in the order they were supplied to `new`.
sources: Vec<SourceId>,
/// Id of the destination sorted run.
destination: u32,
}
impl CompactionSpec {
pub fn new(sources: Vec<SourceId>, destination: u32) -> Self {
Self {
sources,
destination,
}
}
pub fn sources(&self) -> &Vec<SourceId> {
&self.sources
}
pub fn destination(&self) -> u32 {
self.destination
}
pub fn has_l0_sources(&self) -> bool {
self.sources
.iter()
.any(|s| matches!(s, SourceId::SstView(_)))
}
pub fn has_sr_sources(&self) -> bool {
self.sources
.iter()
.any(|s| matches!(s, SourceId::SortedRun(_)))
}
}
/// Renders a spec as `[<sources>] -> SR(<destination>)`.
///
/// Each source is shown in its own `Display` form, and the list is printed
/// with `Debug` formatting, so entries appear quoted (e.g. `["l0", "3"]`).
impl Display for CompactionSpec {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let rendered = self
            .sources
            .iter()
            .map(ToString::to_string)
            .collect::<Vec<String>>();
        write!(f, "{:?} -> SR({})", rendered, self.destination)
    }
}
/// Lifecycle status of a tracked compaction.
///
/// `Submitted` and `Running` count as active; `Completed` and `Failed` count
/// as finished.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
pub enum CompactionStatus {
/// Initial status assigned by `Compaction::new`.
Submitted,
/// Presumably set once execution has started (no setter visible here) — TODO confirm.
Running,
/// Set by `CompactorState::finish_compaction`.
Completed,
/// Terminal status for a compaction that did not complete.
Failed,
}
impl CompactionStatus {
    /// `true` while the compaction has not reached a terminal status, i.e.
    /// it is `Submitted` or `Running`.
    fn active(self) -> bool {
        match self {
            CompactionStatus::Submitted | CompactionStatus::Running => true,
            CompactionStatus::Completed | CompactionStatus::Failed => false,
        }
    }

    /// `true` once the compaction is `Completed` or `Failed`.
    fn finished(self) -> bool {
        match self {
            CompactionStatus::Completed | CompactionStatus::Failed => true,
            CompactionStatus::Submitted | CompactionStatus::Running => false,
        }
    }
}
/// A single compaction tracked by the compactor: its spec plus its progress
/// and outcome.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct Compaction {
/// Unique id; `Compactions::insert` also uses it as the map key.
id: Ulid,
/// Inputs and destination of this compaction.
spec: CompactionSpec,
/// Bytes processed so far; included in the `Display` output when non-zero.
bytes_processed: u64,
/// Current lifecycle status (starts as `Submitted`).
status: CompactionStatus,
/// SSTs written so far; `set_output_ssts` only accepts lists that extend this one.
output_ssts: Vec<SsTableHandle>,
}
impl Compaction {
pub(crate) fn new(id: Ulid, spec: CompactionSpec) -> Self {
Self {
id,
spec,
bytes_processed: 0,
status: CompactionStatus::Submitted,
output_ssts: Vec::new(),
}
}
pub(crate) fn with_status(mut self, status: CompactionStatus) -> Self {
self.status = status;
self
}
pub(crate) fn with_output_ssts(mut self, output_ssts: Vec<SsTableHandle>) -> Self {
self.output_ssts = output_ssts;
self
}
pub(crate) fn get_sorted_runs(&self, db_state: &ManifestCore) -> Vec<SortedRun> {
let srs_by_id: HashMap<u32, &SortedRun> =
db_state.compacted.iter().map(|sr| (sr.id, sr)).collect();
self.spec
.sources()
.iter()
.filter_map(|s| s.maybe_unwrap_sorted_run())
.filter_map(|id| srs_by_id.get(&id).map(|t| (*t).clone()))
.collect()
}
pub(crate) fn get_l0_sst_views(&self, db_state: &ManifestCore) -> Vec<SsTableView> {
let sst_views_by_id: HashMap<Ulid, &SsTableView> =
db_state.l0.iter().map(|view| (view.id, view)).collect();
self.spec
.sources()
.iter()
.filter_map(|s| s.maybe_unwrap_sst_view())
.filter_map(|ulid| sst_views_by_id.get(&ulid).map(|t| (*t).clone()))
.collect()
}
pub fn id(&self) -> Ulid {
self.id
}
pub fn spec(&self) -> &CompactionSpec {
&self.spec
}
pub(crate) fn set_bytes_processed(&mut self, bytes: u64) {
self.bytes_processed = bytes;
}
pub fn bytes_processed(&self) -> u64 {
self.bytes_processed
}
pub fn status(&self) -> CompactionStatus {
self.status
}
pub(crate) fn set_output_ssts(&mut self, output_ssts: Vec<SsTableHandle>) {
assert!(
output_ssts.starts_with(self.output_ssts.as_slice()),
"new output SSTs must always extend previous output SSTs"
);
self.output_ssts = output_ssts;
}
pub fn output_ssts(&self) -> &Vec<SsTableHandle> {
&self.output_ssts
}
pub(crate) fn set_status(&mut self, status: CompactionStatus) {
self.status = status;
}
pub fn active(&self) -> bool {
self.status.active()
}
}
/// Formats a compaction as its spec (`[<sources>] -> SR(<destination>)`).
///
/// Once any bytes have been processed, it appends
/// ` (<human-readable bytes> processed)`.
impl Display for Compaction {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Delegate to `CompactionSpec`'s `Display` so both renderings of the
        // sources/destination stay identical instead of being maintained twice.
        write!(f, "{}", self.spec)?;
        if self.bytes_processed > 0 {
            let human_bytes_processed = crate::utils::format_bytes_si(self.bytes_processed);
            write!(f, " ({} processed)", human_bytes_processed)?;
        }
        Ok(())
    }
}
/// The set of compactions tracked by the compactor, keyed by compaction id.
#[derive(Clone, Debug, Serialize)]
pub struct CompactionsCore {
/// Tracked compactions; being a `BTreeMap`, iteration is in ascending key order.
recent_compactions: BTreeMap<Ulid, Compaction>,
}
impl CompactionsCore {
pub(crate) fn new() -> Self {
Self {
recent_compactions: BTreeMap::new(),
}
}
pub(crate) fn with_compactions(mut self, compactions: BTreeMap<Ulid, Compaction>) -> Self {
self.recent_compactions = compactions;
self
}
pub fn recent_compactions(&self) -> impl Iterator<Item = &Compaction> {
self.recent_compactions.values()
}
}
/// Compactions tracked by a compactor, tagged with a compactor epoch.
#[derive(Clone, Debug, Serialize)]
pub(crate) struct Compactions {
/// Compactor epoch of this set; `CompactorState::new` and `set_compactions`
/// assert it equals the manifest's `compactor_epoch`.
pub(crate) compactor_epoch: u64,
/// The tracked compactions themselves.
pub(crate) core: CompactionsCore,
}
impl Compactions {
pub(crate) fn new(compactor_epoch: u64) -> Self {
Self {
compactor_epoch,
core: CompactionsCore::new(),
}
}
pub(crate) fn with_compactions(mut self, compactions: Vec<Compaction>) -> Self {
let recent_compactions = compactions
.into_iter()
.map(|c| (c.id(), c))
.collect::<BTreeMap<Ulid, Compaction>>();
self.core.recent_compactions = recent_compactions;
self
}
pub(crate) fn insert(&mut self, compaction: Compaction) {
self.core
.recent_compactions
.insert(compaction.id, compaction);
}
pub(crate) fn get(&self, compaction_id: &Ulid) -> Option<&Compaction> {
self.core.recent_compactions.get(compaction_id)
}
pub(crate) fn get_mut(&mut self, compaction_id: &Ulid) -> Option<&mut Compaction> {
self.core.recent_compactions.get_mut(compaction_id)
}
#[cfg(test)]
pub(crate) fn contains(&self, compaction_id: &Ulid) -> bool {
self.core.recent_compactions.contains_key(compaction_id)
}
pub(crate) fn iter(&self) -> impl Iterator<Item = &Compaction> {
self.core.recent_compactions.values()
}
pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Compaction> {
self.core.recent_compactions.values_mut()
}
pub(crate) fn iter_active(&self) -> impl Iterator<Item = &Compaction> {
self.core.recent_compactions.values().filter(|c| c.active())
}
pub(crate) fn iter_with_status(
&self,
status: CompactionStatus,
) -> impl Iterator<Item = &Compaction> {
self.core
.recent_compactions
.values()
.filter(move |c| c.status() == status)
}
pub(crate) fn retain_active_and_last_finished(&mut self) {
let latest_finished = self
.core
.recent_compactions
.iter()
.filter_map(|(_, c)| {
if c.status().finished() {
Some(c.id())
} else {
None
}
})
.max();
self.core
.recent_compactions
.retain(|id, compaction| compaction.active() || Some(id) == latest_finished.as_ref());
}
}
/// The compactor's local view of its state: the manifest plus the set of
/// tracked compactions, each held as a `DirtyObject`.
///
/// Both objects must carry the same `compactor_epoch` (asserted in `new` and
/// `set_compactions`).
pub struct CompactorState {
/// Manifest whose `core` is the db state the compactor reads and updates.
manifest: DirtyObject<Manifest>,
/// Compactions tracked by this compactor.
compactions: DirtyObject<Compactions>,
}
impl CompactorState {
pub(crate) fn new(
manifest: DirtyObject<Manifest>,
compactions: DirtyObject<Compactions>,
) -> Self {
assert_eq!(
manifest.value.compactor_epoch,
compactions.value.compactor_epoch
);
Self {
manifest,
compactions,
}
}
pub(crate) fn db_state(&self) -> &ManifestCore {
&self.manifest.value.core
}
pub(crate) fn manifest(&self) -> &DirtyObject<Manifest> {
&self.manifest
}
pub(crate) fn active_compactions(&self) -> impl Iterator<Item = &Compaction> {
self.compactions.value.iter_active()
}
pub(crate) fn compactions_with_status(
&self,
status: CompactionStatus,
) -> impl Iterator<Item = &Compaction> {
self.compactions.value.iter_with_status(status)
}
pub(crate) fn compactions(&self) -> &DirtyObject<Compactions> {
&self.compactions
}
pub(crate) fn set_compactions(&mut self, compactions: DirtyObject<Compactions>) {
assert_eq!(
self.manifest.value.compactor_epoch,
compactions.value.compactor_epoch
);
self.compactions = compactions;
}
pub(crate) fn merge_remote_compactions(
&mut self,
mut remote_compactions: DirtyObject<Compactions>,
) {
let mut merged = BTreeMap::new();
for compaction in self.compactions.value.iter() {
merged.insert(compaction.id(), compaction.clone());
}
for compaction in remote_compactions.value.iter() {
if let Entry::Vacant(v) = merged.entry(compaction.id()) {
if !matches!(compaction.status(), CompactionStatus::Submitted) {
error!(
"skipping remote commpaction with unexpected (non-Submitted) status [compaction={:?}]",
compaction,
);
continue;
}
v.insert(compaction.clone());
}
}
let mut merged_compactions = Compactions {
compactor_epoch: self.compactions.value.compactor_epoch,
core: CompactionsCore::new().with_compactions(merged),
};
merged_compactions.retain_active_and_last_finished();
remote_compactions.value = merged_compactions;
self.set_compactions(remote_compactions);
}
pub(crate) fn merge_remote_manifest(&mut self, mut remote_manifest: DirtyObject<Manifest>) {
let my_db_state = self.db_state();
let last_compacted_l0 = my_db_state.last_compacted_l0_sst_view_id;
let mut merged_l0s = VecDeque::new();
let writer_l0 = &remote_manifest.value.core.l0;
for writer_l0_sst in writer_l0 {
if match &last_compacted_l0 {
None => true,
Some(last_compacted_l0_id) => writer_l0_sst.id != *last_compacted_l0_id,
} {
merged_l0s.push_back(writer_l0_sst.clone());
} else {
break;
}
}
let merged = ManifestCore {
initialized: remote_manifest.value.core.initialized,
last_compacted_l0_sst_view_id: my_db_state.last_compacted_l0_sst_view_id,
last_compacted_l0_sst_id: my_db_state.last_compacted_l0_sst_id,
l0: merged_l0s,
compacted: my_db_state.compacted.clone(),
next_wal_sst_id: remote_manifest.value.core.next_wal_sst_id,
replay_after_wal_id: remote_manifest.value.core.replay_after_wal_id,
last_l0_clock_tick: remote_manifest.value.core.last_l0_clock_tick,
last_l0_seq: remote_manifest.value.core.last_l0_seq,
checkpoints: remote_manifest.value.core.checkpoints.clone(),
wal_object_store_uri: my_db_state.wal_object_store_uri.clone(),
recent_snapshot_min_seq: remote_manifest.value.core.recent_snapshot_min_seq,
sequence_tracker: remote_manifest.value.core.sequence_tracker,
};
remote_manifest.value.core = merged;
self.manifest = remote_manifest;
}
pub(crate) fn add_compaction(&mut self, compaction: Compaction) -> Result<(), SlateDBError> {
let spec = compaction.spec();
if self
.compactions
.value
.iter_active()
.map(|c| c.spec())
.any(|c| c.destination() == spec.destination())
{
return Err(SlateDBError::InvalidCompaction);
}
if self
.db_state()
.compacted
.iter()
.any(|sr| sr.id == spec.destination())
&& !spec.sources().iter().any(|src| match src {
SourceId::SortedRun(sr) => *sr == spec.destination(),
SourceId::SstView(_) => false,
})
{
return Err(SlateDBError::InvalidCompaction);
}
info!("accepted submitted compaction [compaction={}]", compaction);
self.compactions
.value
.core
.recent_compactions
.insert(compaction.id(), compaction);
Ok(())
}
pub(crate) fn update_compaction<F>(&mut self, compaction_id: &Ulid, f: F)
where
F: FnOnce(&mut Compaction),
{
if let Some(compaction) = self
.compactions
.value
.core
.recent_compactions
.get_mut(compaction_id)
{
f(compaction);
}
self.compactions.value.retain_active_and_last_finished();
}
pub(crate) fn finish_compaction(&mut self, compaction_id: Ulid, output_sr: SortedRun) {
let mut db_state = self.db_state().clone();
if let Some(compaction) = self.compactions.value.get_mut(&compaction_id) {
let spec = compaction.spec();
info!("finished compaction [spec={}]", spec);
let compaction_l0s: HashSet<Ulid> = spec
.sources()
.iter()
.filter_map(|id| id.maybe_unwrap_sst_view())
.collect();
let compaction_srs: HashSet<u32> = spec
.sources()
.iter()
.chain(std::iter::once(&SourceId::SortedRun(spec.destination())))
.filter_map(|id| id.maybe_unwrap_sorted_run())
.collect();
let new_l0: VecDeque<SsTableView> = db_state
.l0
.iter()
.filter(|l0| !compaction_l0s.contains(&l0.id))
.cloned()
.collect();
let mut new_compacted = Vec::new();
let mut inserted = false;
for compacted in db_state.compacted.iter() {
if !inserted && output_sr.id >= compacted.id {
new_compacted.push(output_sr.clone());
inserted = true;
}
if !compaction_srs.contains(&compacted.id) {
new_compacted.push(compacted.clone());
}
}
if !inserted {
new_compacted.push(output_sr);
}
Self::assert_compacted_srs_in_id_order(&new_compacted);
let first_source = spec
.sources()
.first()
.expect("illegal: empty compaction spec");
if let Some(view_id) = first_source.maybe_unwrap_sst_view() {
db_state.last_compacted_l0_sst_view_id = Some(view_id);
db_state.last_compacted_l0_sst_id = db_state
.l0
.iter()
.find(|v| v.id == view_id)
.map(|v| v.sst.id.unwrap_compacted_id());
}
db_state.l0 = new_l0;
db_state.compacted = new_compacted;
self.manifest.value.core = db_state;
self.update_compaction(&compaction_id, |c| {
c.set_status(CompactionStatus::Completed);
});
} else {
error!("compaction not found [compaction_id={}]", compaction_id);
}
}
fn assert_compacted_srs_in_id_order(compacted: &[SortedRun]) {
let mut last_sr_id = u32::MAX;
for sr in compacted.iter() {
assert!(sr.id < last_sr_id);
last_sr_id = sr.id;
}
}
}
/// Unit tests for the compaction bookkeeping types and `CompactorState`.
///
/// Tests that need a real manifest build one by writing to a `Db` backed by an
/// in-memory object store (see `build_test_state`).
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use super::*;
use crate::checkpoint::Checkpoint;
use crate::compactor_state::SourceId::SstView;
use crate::config::Settings;
use crate::db::Db;
use crate::db_state::SsTableId;
use crate::manifest::store::test_utils::new_dirty_manifest;
use crate::manifest::store::{ManifestStore, StoredManifest};
use crate::utils::IdGenerator;
use crate::DbRand;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use slatedb_common::clock::{DefaultSystemClock, SystemClock};
use slatedb_txn_obj::test_utils::new_dirty_object;
use tokio::runtime::{Handle, Runtime};
/// Object-store path under which every test database is created.
const PATH: &str = "/test/db";
/// Trimming keeps active compactions and only the finished compaction with the
/// largest id.
#[test]
fn test_trim_keeps_latest_finished_and_active_compactions() {
let mut compactions = Compactions::new(0);
let oldest_finished = Ulid::from_parts(1, 0);
let latest_finished = Ulid::from_parts(2, 0);
let active = Ulid::from_parts(3, 0);
compactions.insert(compaction_with_status(
oldest_finished,
CompactionStatus::Completed,
));
compactions.insert(compaction_with_status(
latest_finished,
CompactionStatus::Completed,
));
compactions.insert(compaction_with_status(active, CompactionStatus::Submitted));
compactions.retain_active_and_last_finished();
assert!(compactions.contains(&active));
assert!(compactions.contains(&latest_finished));
assert!(!compactions.contains(&oldest_finished));
}
/// `Submitted`/`Running` are active; `Completed`/`Failed` are finished.
#[test]
fn test_compaction_status_active_and_finished() {
assert!(CompactionStatus::Submitted.active());
assert!(CompactionStatus::Running.active());
assert!(!CompactionStatus::Completed.active());
assert!(!CompactionStatus::Failed.active());
assert!(!CompactionStatus::Submitted.finished());
assert!(!CompactionStatus::Running.finished());
assert!(CompactionStatus::Completed.finished());
assert!(CompactionStatus::Failed.finished());
}
/// With no active compactions, only the finished compaction with the largest id
/// survives; `Failed` counts as finished.
#[test]
fn test_trim_keeps_only_most_recent_finished_when_no_active() {
let mut compactions = Compactions::new(0);
let older = Ulid::from_parts(10, 0);
let middle = Ulid::from_parts(20, 0);
let newest = Ulid::from_parts(30, 0);
compactions.insert(compaction_with_status(older, CompactionStatus::Completed));
compactions.insert(compaction_with_status(middle, CompactionStatus::Failed));
compactions.insert(compaction_with_status(newest, CompactionStatus::Completed));
compactions.retain_active_and_last_finished();
assert!(!compactions.contains(&older));
assert!(!compactions.contains(&middle));
assert!(compactions.contains(&newest));
}
/// With no finished compactions, trimming removes nothing.
#[test]
fn test_trim_preserves_all_active_when_no_finished() {
let mut compactions = Compactions::new(0);
let submitted = Ulid::from_parts(1, 0);
let running = Ulid::from_parts(2, 0);
compactions.insert(compaction_with_status(
submitted,
CompactionStatus::Submitted,
));
compactions.insert(compaction_with_status(running, CompactionStatus::Running));
compactions.retain_active_and_last_finished();
assert!(compactions.contains(&submitted));
assert!(compactions.contains(&running));
}
/// Local compactions keep their status through a merge, and an unknown remote
/// `Submitted` compaction is added.
#[test]
fn test_merge_remote_compactions_preserves_local_and_adds_remote() {
let manifest = new_dirty_manifest();
let compactor_epoch = manifest.value.compactor_epoch;
let local_running = Ulid::from_parts(1, 0);
let local_finished = Ulid::from_parts(2, 0);
let mut local_compactions = new_dirty_compactions(compactor_epoch);
local_compactions.value.insert(compaction_with_status(
local_running,
CompactionStatus::Running,
));
local_compactions.value.insert(compaction_with_status(
local_finished,
CompactionStatus::Completed,
));
let mut state = CompactorState::new(manifest, local_compactions);
let remote_submitted = Ulid::from_parts(3, 0);
let mut remote_compactions = new_dirty_compactions(compactor_epoch);
remote_compactions.value.insert(compaction_with_status(
remote_submitted,
CompactionStatus::Submitted,
));
state.merge_remote_compactions(remote_compactions);
let merged = &state.compactions.value;
assert!(merged.contains(&local_running));
assert_eq!(
merged.get(&local_running).expect("not found").status(),
CompactionStatus::Running
);
assert!(merged.contains(&local_finished));
assert_eq!(
merged.get(&local_finished).expect("not found").status(),
CompactionStatus::Completed
);
assert!(merged.contains(&remote_submitted));
assert_eq!(
merged.get(&remote_submitted).expect("not found").status(),
CompactionStatus::Submitted
);
}
/// An unknown remote compaction that is not `Submitted` is not adopted.
#[test]
fn test_merge_remote_compactions_drops_non_submitted_remote() {
let manifest = new_dirty_manifest();
let compactor_epoch = manifest.value.compactor_epoch;
let local_running = Ulid::from_parts(1, 0);
let mut local_compactions = new_dirty_compactions(compactor_epoch);
local_compactions.value.insert(compaction_with_status(
local_running,
CompactionStatus::Running,
));
let mut state = CompactorState::new(manifest, local_compactions);
let remote_running = Ulid::from_parts(3, 0);
let mut remote_compactions = new_dirty_compactions(compactor_epoch);
remote_compactions.value.insert(compaction_with_status(
remote_running,
CompactionStatus::Running,
));
state.merge_remote_compactions(remote_compactions);
let merged = &state.compactions.value;
assert!(merged.contains(&local_running));
assert_eq!(
merged.get(&local_running).expect("not found").status(),
CompactionStatus::Running
);
assert!(!merged.contains(&remote_running));
}
/// A valid L0 compaction is accepted and is the only active compaction.
#[test]
fn test_should_register_compaction() {
let rt = build_runtime();
let (_, _, mut state, system_clock, rand) = build_test_state(rt.handle());
let compaction_id = rand.rng().gen_ulid(system_clock.as_ref());
let spec = build_l0_compaction(&state.db_state().l0, 0);
let compaction = Compaction::new(compaction_id, spec.clone());
state
.add_compaction(compaction)
.expect("failed to add compaction");
let mut compactions = state.active_compactions();
let expected = Compaction::new(compaction_id, spec);
assert_eq!(compactions.next().expect("compaction not found"), &expected);
assert!(compactions.next().is_none());
}
/// Compacting every L0 view into SR 0 empties L0, leaves exactly that sorted
/// run, and records the front L0 view as the last compacted view.
#[test]
fn test_should_update_dbstate_when_compaction_finished() {
let rt = build_runtime();
let (_, _, mut state, system_clock, rand) = build_test_state(rt.handle());
let before_compaction = state.db_state().clone();
let compaction_id = rand.rng().gen_ulid(system_clock.as_ref());
let spec = build_l0_compaction(&before_compaction.l0, 0);
let compaction = Compaction::new(compaction_id, spec);
state
.add_compaction(compaction)
.expect("failed to add compaction");
let compacted_ssts = before_compaction.l0.iter().cloned().collect();
let sr = SortedRun {
id: 0,
sst_views: compacted_ssts,
};
state.finish_compaction(compaction_id, sr.clone());
assert_eq!(
state.db_state().last_compacted_l0_sst_view_id,
Some(before_compaction.l0.front().unwrap().id)
);
assert_eq!(state.db_state().l0.len(), 0);
assert_eq!(state.db_state().compacted.len(), 1);
assert_eq!(state.db_state().compacted.first().unwrap().id, sr.id);
let expected_ids: Vec<SsTableId> = sr.sst_views.iter().map(|h| h.sst.id).collect();
let found_ids: Vec<SsTableId> = state
.db_state()
.compacted
.first()
.unwrap()
.sst_views
.iter()
.map(|h| h.sst.id)
.collect();
assert_eq!(expected_ids, found_ids);
}
/// A finished compaction is no longer reported as active.
#[test]
fn test_should_remove_compaction_when_compaction_finished() {
let rt = build_runtime();
let (_, _, mut state, system_clock, rand) = build_test_state(rt.handle());
let before_compaction = state.db_state().clone();
let compaction_id = rand.rng().gen_ulid(system_clock.as_ref());
let spec = build_l0_compaction(&before_compaction.l0, 0);
let compaction = Compaction::new(compaction_id, spec);
state
.add_compaction(compaction)
.expect("failed to add compaction");
let compacted_ssts = before_compaction.l0.iter().cloned().collect();
let sr = SortedRun {
id: 0,
sst_views: compacted_ssts,
};
state.finish_compaction(compaction_id, sr);
assert_eq!(state.active_compactions().count(), 0)
}
/// With nothing compacted yet, a merge adopts the writer's full L0 list.
#[test]
fn test_should_merge_db_state_correctly_when_never_compacted() {
let rt = build_runtime();
let (os, mut sm, mut state, _, _) = build_test_state(rt.handle());
let db = build_db(os.clone(), rt.handle());
rt.block_on(db.put(&[b'a'; 16], &[b'b'; 48])).unwrap();
rt.block_on(db.put(&[b'j'; 16], &[b'k'; 48])).unwrap();
wait_for_manifest_with_l0_len(&mut sm, rt.handle(), state.db_state().l0.len() + 1);
state.merge_remote_manifest(sm.prepare_dirty().unwrap());
assert!(state.db_state().last_compacted_l0_sst_view_id.is_none());
let expected_merged_l0s: Vec<Ulid> = sm
.manifest()
.core
.l0
.iter()
.map(|t| t.sst.id.unwrap_compacted_id())
.collect();
let merged_l0s: Vec<Ulid> = state
.db_state()
.l0
.iter()
.map(|h| h.sst.id.unwrap_compacted_id())
.collect();
assert_eq!(expected_merged_l0s, merged_l0s);
}
/// After compacting only the back L0 view, merging a writer manifest with one
/// new L0 yields: the new L0 at the front, then the uncompacted original L0s.
/// Local sorted runs are kept, and WAL ids come from the remote manifest.
#[test]
fn test_should_merge_db_state_correctly() {
let rt = build_runtime();
let (os, mut sm, mut state, system_clock, rand) = build_test_state(rt.handle());
let original_l0s = &state.db_state().clone().l0;
let compaction_id = rand.rng().gen_ulid(system_clock.as_ref());
let spec = CompactionSpec::new(vec![SstView(original_l0s.back().unwrap().id)], 0);
let compaction = Compaction::new(compaction_id, spec);
state
.add_compaction(compaction)
.expect("failed to add compaction");
state.finish_compaction(
compaction_id,
SortedRun {
id: 0,
sst_views: vec![original_l0s.back().unwrap().clone()],
},
);
let db = build_db(os.clone(), rt.handle());
rt.block_on(db.put(&[b'a'; 16], &[b'b'; 48])).unwrap();
rt.block_on(db.put(&[b'j'; 16], &[b'k'; 48])).unwrap();
// The writer does not see the local compaction, so its L0 list is the
// original one plus one new view.
wait_for_manifest_with_l0_len(&mut sm, rt.handle(), original_l0s.len() + 1);
let db_state_before_merge = state.db_state().clone();
state.merge_remote_manifest(sm.prepare_dirty().unwrap());
let db_state = state.db_state();
let mut expected_merged_l0s: VecDeque<Ulid> = original_l0s
.iter()
.map(|h| h.sst.id.unwrap_compacted_id())
.collect();
expected_merged_l0s.pop_back();
let new_l0 = sm
.manifest()
.core
.l0
.front()
.unwrap()
.sst
.id
.unwrap_compacted_id();
expected_merged_l0s.push_front(new_l0);
let merged_l0: VecDeque<Ulid> = db_state
.l0
.iter()
.map(|h| h.sst.id.unwrap_compacted_id())
.collect();
assert_eq!(merged_l0, expected_merged_l0s);
assert_eq!(
compacted_to_description(&db_state.compacted),
compacted_to_description(&db_state_before_merge.compacted)
);
assert_eq!(
db_state.replay_after_wal_id,
sm.manifest().core.replay_after_wal_id
);
assert_eq!(db_state.next_wal_sst_id, sm.manifest().core.next_wal_sst_id);
}
/// After compacting all L0 views, a merge keeps only the writer's new L0 view.
#[test]
fn test_should_merge_db_state_correctly_when_all_l0_compacted() {
let rt = build_runtime();
let (os, mut sm, mut state, system_clock, rand) = build_test_state(rt.handle());
let original_l0s = &state.db_state().clone().l0;
let compaction_id = rand.rng().gen_ulid(system_clock.as_ref());
let spec = CompactionSpec::new(original_l0s.iter().map(|h| SstView(h.id)).collect(), 0);
let compaction = Compaction::new(compaction_id, spec);
state
.add_compaction(compaction)
.expect("failed to add compaction");
state.finish_compaction(
compaction_id,
SortedRun {
id: 0,
sst_views: original_l0s.clone().into(),
},
);
assert_eq!(state.db_state().l0.len(), 0);
let db = build_db(os.clone(), rt.handle());
rt.block_on(db.put(&[b'a'; 16], &[b'b'; 48])).unwrap();
rt.block_on(db.put(&[b'j'; 16], &[b'k'; 48])).unwrap();
wait_for_manifest_with_l0_len(&mut sm, rt.handle(), original_l0s.len() + 1);
state.merge_remote_manifest(sm.prepare_dirty().unwrap());
let db_state = state.db_state();
let mut expected_merged_l0s = VecDeque::new();
let new_l0 = sm
.manifest()
.core
.l0
.front()
.unwrap()
.sst
.id
.unwrap_compacted_id();
expected_merged_l0s.push_front(new_l0);
let merged_l0: VecDeque<Ulid> = db_state
.l0
.iter()
.map(|h| h.sst.id.unwrap_compacted_id())
.collect();
assert_eq!(merged_l0, expected_merged_l0s);
}
/// Checkpoints are taken from the remote manifest on merge.
#[test]
fn test_should_merge_db_state_with_new_checkpoints() {
let manifest = new_dirty_manifest();
let compactions = new_dirty_compactions(manifest.value.compactor_epoch);
let mut state = CompactorState::new(manifest, compactions);
let mut dirty = new_dirty_manifest();
let checkpoint = Checkpoint {
id: uuid::Uuid::new_v4(),
manifest_id: 1,
expire_time: None,
create_time: DefaultSystemClock::default().now(),
name: None,
};
dirty.value.core.checkpoints.push(checkpoint.clone());
state.merge_remote_manifest(dirty);
assert_eq!(vec![checkpoint], state.db_state().checkpoints);
}
/// A compaction over a subset of L0 views (every view after the first three)
/// is accepted.
#[test]
fn test_should_submit_correct_compaction() {
let rt = build_runtime();
let (_os, mut _sm, mut state, system_clock, rand) = build_test_state(rt.handle());
let original_l0s = &state.db_state().clone().l0;
let compaction_id = rand.rng().gen_ulid(system_clock.as_ref());
let spec = CompactionSpec::new(
original_l0s
.iter()
.enumerate()
.filter(|(i, _e)| i > &2usize)
.map(|(_i, x)| SstView(x.id))
.collect::<Vec<SourceId>>(),
0,
);
let compaction = Compaction::new(compaction_id, spec);
let result = state.add_compaction(compaction);
assert!(result.is_ok());
}
/// A spec that lists L0 views (after skipping three) followed by at most three
/// sorted runs is accepted.
#[test]
fn test_source_boundary_compaction() {
let rt = build_runtime();
let (_os, mut _sm, mut state, system_clock, rand) = build_test_state(rt.handle());
let original_l0s = &state.db_state().clone().l0;
let original_srs = &state.db_state().clone().compacted;
let l0_sources = original_l0s.iter().skip(3).map(|h| SourceId::SstView(h.id));
let sr_sources = original_srs
.iter()
.take(3)
.map(|sr| SourceId::SortedRun(sr.id));
let sources: Vec<SourceId> = l0_sources.chain(sr_sources).collect();
let compaction_id = rand.rng().gen_ulid(system_clock.as_ref());
let spec = CompactionSpec::new(sources, 0);
let compaction = Compaction::new(compaction_id, spec);
let result = state.add_compaction(compaction);
assert!(result.is_ok());
}
/// Wraps an empty `Compactions` for `compactor_epoch` in a dirty object.
// NOTE(review): the meaning of the leading `1u64` argument (presumably an object
// id/version) is defined by `new_dirty_object` — confirm there.
fn new_dirty_compactions(compactor_epoch: u64) -> DirtyObject<Compactions> {
new_dirty_object(1u64, Compactions::new(compactor_epoch))
}
/// Builds a compaction with an empty spec (no sources, destination 0) and the
/// given status.
fn compaction_with_status(id: Ulid, status: CompactionStatus) -> Compaction {
Compaction::new(id, CompactionSpec::new(vec![], 0)).with_status(status)
}
/// Calls `f` every 100ms until it returns `Some` or `duration` of wall-clock
/// time has elapsed. Returns the first `Some` result, or `None` on timeout.
fn run_for<T, F>(duration: Duration, mut f: F) -> Option<T>
where
F: FnMut() -> Option<T>,
{
let clock = DefaultSystemClock::default();
let start = clock.now();
while clock
.now()
.signed_duration_since(start)
.to_std()
.expect("duration < 0 not allowed")
< duration
{
let maybe_result = f();
if maybe_result.is_some() {
return maybe_result;
}
sleep(Duration::from_millis(100));
}
None
}
/// Comparable summary of a sorted run: its id and the ids of its SSTs.
#[derive(PartialEq, Debug)]
struct SortedRunDescription {
id: u32,
ssts: Vec<SsTableId>,
}
/// Builds a multi-threaded tokio runtime with all drivers enabled.
fn build_runtime() -> Runtime {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
}
/// Summarizes each sorted run in `compacted`, preserving order.
fn compacted_to_description(compacted: &[SortedRun]) -> Vec<SortedRunDescription> {
compacted.iter().map(sorted_run_to_description).collect()
}
/// Summarizes one sorted run as its id plus its SST ids.
fn sorted_run_to_description(sr: &SortedRun) -> SortedRunDescription {
SortedRunDescription {
id: sr.id,
ssts: sr.sst_views.iter().map(|h| h.sst.id).collect(),
}
}
/// Refreshes `stored_manifest` until its L0 list has exactly `len` entries.
///
/// Panics if that does not happen within 30 seconds.
fn wait_for_manifest_with_l0_len(
stored_manifest: &mut StoredManifest,
tokio_handle: &Handle,
len: usize,
) {
run_for(Duration::from_secs(30), || {
let manifest = tokio_handle.block_on(stored_manifest.refresh()).unwrap();
if manifest.core.l0.len() == len {
return Some(manifest.core.clone());
}
None
})
.expect("no manifest found with l0 len");
}
/// Builds a spec that compacts every view in `ssts`, in order, into SR `dst`.
fn build_l0_compaction(ssts: &VecDeque<SsTableView>, dst: u32) -> CompactionSpec {
let sources = ssts.iter().map(|h| SourceId::SstView(h.id)).collect();
CompactionSpec::new(sources, dst)
}
/// Opens a `Db` at `PATH` on `os`, with a 256-byte L0 SST size and no
/// compactor options.
fn build_db(os: Arc<dyn ObjectStore>, tokio_handle: &Handle) -> Db {
let opts = Settings {
l0_sst_size_bytes: 256,
compactor_options: None,
..Default::default()
};
tokio_handle
.block_on(Db::builder(PATH, os.clone()).with_settings(opts).build())
.unwrap()
}
/// Creates an in-memory database with some L0 data and a `CompactorState` for it.
///
/// It writes five rounds of two puts (16-byte keys, 48-byte values), closes the
/// db, and loads the stored manifest. Returns the object store, the stored
/// manifest, the compactor state (with an empty compactions set), a system
/// clock, and a `DbRand`.
#[allow(clippy::type_complexity)]
fn build_test_state(
tokio_handle: &Handle,
) -> (
Arc<dyn ObjectStore>,
StoredManifest,
CompactorState,
Arc<dyn SystemClock>,
Arc<DbRand>,
) {
let os: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db = build_db(os.clone(), tokio_handle);
let l0_count: u64 = 5;
for i in 0..l0_count {
tokio_handle
.block_on(db.put(&[b'a' + i as u8; 16], &[b'b' + i as u8; 48]))
.unwrap();
tokio_handle
.block_on(db.put(&[b'j' + i as u8; 16], &[b'k' + i as u8; 48]))
.unwrap();
}
tokio_handle.block_on(db.close()).unwrap();
let system_clock: Arc<dyn SystemClock> = Arc::new(DefaultSystemClock::new());
let rand: Arc<DbRand> = Arc::new(DbRand::default());
let manifest_store = Arc::new(ManifestStore::new(&Path::from(PATH), os.clone()));
let stored_manifest = tokio_handle
.block_on(StoredManifest::load(
manifest_store,
Arc::new(DefaultSystemClock::new()),
))
.unwrap();
let compactions = new_dirty_compactions(stored_manifest.manifest().compactor_epoch);
let state = CompactorState::new(stored_manifest.prepare_dirty().unwrap(), compactions);
(os, stored_manifest, state, system_clock, rand)
}
}