use std::{
collections::HashSet,
fs,
future::Future,
pin::Pin,
sync::{
Arc, Mutex as StdMutex,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
},
time::{Duration, SystemTime},
};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use fusio::{
DynFs, Error as FusioError, Write,
disk::LocalFs,
dynamic::{MaybeSend, MaybeSendFuture, MaybeSync},
executor::{Executor, Instant, NoopExecutor, Timer, tokio::TokioExecutor},
fs::{CasCondition, FileMeta, FileSystemTag, Fs, FsCas, OpenOptions},
mem::fs::InMemoryFs,
path::Path,
};
use fusio_manifest::ObjectHead;
use futures::{StreamExt, future::AbortHandle};
use tokio::{sync::Mutex, time::sleep};
use typed_arrow_dyn::{DynCell, DynRow};
use super::common::workspace_temp_dir;
use crate::{
compaction::{
CompactionHandle, CompactionWorkerConfig,
executor::{
CompactionError, CompactionExecutor, CompactionJob, CompactionOutcome,
LocalCompactionExecutor,
},
metrics::{CompactionMetrics, CompactionQueueDropContext, CompactionQueueDropReason},
orchestrator,
planner::{
CompactionInput, CompactionPlanner, CompactionSnapshot, CompactionTask,
LeveledCompactionPlanner, LeveledPlannerConfig,
},
},
db::{
BackpressureDecision, CasBackoffConfig, CascadeConfig, DB, DbInner, L0BackpressureConfig,
L0Stats, L0StatsRefreshGuard,
},
extractor::projection_for_columns,
id::FileIdGenerator,
inmem::{
immutable::memtable::{
DeleteSidecar, ImmutableIndexEntry, ImmutableMemTable, bundle_mvcc_sidecar,
},
policy::BatchesThreshold,
},
key::KeyTsViewRaw,
manifest::{
ManifestError, SstEntry, TableId, TonboManifest, VersionEdit, VersionState, WalSegmentRef,
},
mvcc::Timestamp,
ondisk::sstable::{SsTableBuilder, SsTableConfig, SsTableDescriptor, SsTableId, SsTableReader},
schema::SchemaBuilder,
test::{build_batch, config_with_pk},
};
/// Async callback that `RecordingExecutor::sleep` awaits (after logging the
/// requested duration) instead of actually sleeping; tests install one to
/// mutate state while a backpressure sleep is "in progress".
type SleepHook = Arc<dyn Fn(Duration) -> Pin<Box<dyn MaybeSendFuture<Output = ()>>> + Send + Sync>;
/// Local-disk filesystem wrapper that rejects the first `remove` of one
/// specific path with an injected I/O error and forwards everything else to
/// [`LocalFs`].
#[derive(Clone)]
struct FailOnceRemoveFs {
    inner: LocalFs,
    /// String form of the path whose first removal should fail.
    fail_path: Arc<String>,
    /// Set once the injected failure has fired; shared by all clones.
    failed_once: Arc<AtomicBool>,
}
impl FailOnceRemoveFs {
    /// Builds a wrapper that will reject the first removal of `fail_path`.
    fn new(fail_path: impl Into<String>) -> Self {
        let target: String = fail_path.into();
        let fired = AtomicBool::new(false);
        Self {
            inner: LocalFs {},
            fail_path: Arc::new(target),
            failed_once: Arc::new(fired),
        }
    }

    /// Reports whether the injected removal failure has already been returned.
    fn has_failed_once(&self) -> bool {
        let fired = &self.failed_once;
        fired.load(Ordering::SeqCst)
    }
}
/// Forwards every `Fs` operation to the wrapped `LocalFs`, except that `remove`
/// returns an injected I/O error the first time it is asked to delete
/// `fail_path`.
///
/// NOTE(review): calls use fully qualified `Fs::...` syntax, presumably to
/// avoid method-name ambiguity with other in-scope filesystem traits (e.g.
/// `DynFs`) — confirm before switching to method-call syntax.
impl Fs for FailOnceRemoveFs {
type File = <LocalFs as Fs>::File;
fn file_system(&self) -> FileSystemTag {
Fs::file_system(&self.inner)
}
async fn open_options(
&self,
path: &Path,
options: OpenOptions,
) -> Result<Self::File, FusioError> {
Fs::open_options(&self.inner, path, options).await
}
// Associated function (no receiver): forwards to `LocalFs`'s impl by type.
async fn create_dir_all(path: &Path) -> Result<(), FusioError> {
<LocalFs as Fs>::create_dir_all(path).await
}
async fn list(
&self,
path: &Path,
) -> Result<
impl futures::Stream<Item = Result<FileMeta, FusioError>> + fusio::dynamic::MaybeSend,
FusioError,
> {
Fs::list(&self.inner, path).await
}
async fn exists(&self, path: &Path) -> Result<bool, FusioError> {
Fs::exists(&self.inner, path).await
}
async fn remove(&self, path: &Path) -> Result<(), FusioError> {
// `&&` short-circuits, so the flag is only touched for `fail_path`. `swap`
// returns the previous value, so only the first matching call sees `false`
// and gets the injected error; later calls fall through to a real delete.
if path.as_ref() == self.fail_path.as_str()
&& !self.failed_once.swap(true, Ordering::SeqCst)
{
return Err(std::io::Error::other("injected sweep delete failure").into());
}
Fs::remove(&self.inner, path).await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<(), FusioError> {
Fs::copy(&self.inner, from, to).await
}
async fn link(&self, from: &Path, to: &Path) -> Result<(), FusioError> {
Fs::link(&self.inner, from, to).await
}
}
/// Conditional (compare-and-swap) object operations are forwarded unchanged to
/// the wrapped `LocalFs`; no failures are injected here.
impl FsCas for FailOnceRemoveFs {
fn load_with_tag(
&self,
path: &Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Option<(Vec<u8>, String)>, FusioError>> + '_>>
{
self.inner.load_with_tag(path)
}
fn put_conditional(
&self,
path: &Path,
payload: &[u8],
content_type: Option<&str>,
metadata: Option<Vec<(String, String)>>,
condition: CasCondition,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<String, FusioError>> + '_>> {
self.inner
.put_conditional(path, payload, content_type, metadata, condition)
}
}
/// Object metadata lookups are forwarded unchanged to the wrapped `LocalFs`.
impl ObjectHead for FailOnceRemoveFs {
fn head_metadata<'a>(
&'a self,
path: &'a Path,
) -> Pin<
Box<
dyn MaybeSendFuture<
Output = Result<Option<std::collections::HashMap<String, String>>, FusioError>,
> + 'a,
>,
> {
self.inner.head_metadata(path)
}
}
/// Test executor that hands spawning and locks to [`TokioExecutor`] while its
/// `Timer` impl logs every requested sleep duration and, if installed, awaits
/// a [`SleepHook`] instead of really sleeping.
#[derive(Clone)]
struct RecordingExecutor {
    inner: TokioExecutor,
    /// Durations passed to `sleep`, in call order; shared by all clones.
    sleep_calls: Arc<StdMutex<Vec<Duration>>>,
    /// Hook awaited by each `sleep` call, if one has been installed.
    sleep_hook: Arc<StdMutex<Option<SleepHook>>>,
}
impl RecordingExecutor {
    /// Creates an executor with an empty sleep log and no hook.
    fn new() -> Self {
        let sleep_calls = Arc::new(StdMutex::new(Vec::new()));
        let sleep_hook = Arc::new(StdMutex::new(None));
        Self {
            inner: TokioExecutor::default(),
            sleep_calls,
            sleep_hook,
        }
    }

    /// Installs (or replaces) the hook that subsequently created sleep futures await.
    fn set_hook(&self, hook: SleepHook) {
        *self.sleep_hook.lock().expect("sleep hook lock") = Some(hook);
    }

    /// Returns a copy of the sleep durations recorded so far.
    fn sleep_calls(&self) -> Vec<Duration> {
        let recorded = self.sleep_calls.lock().expect("sleep calls lock");
        recorded.to_vec()
    }
}
/// Pure delegation: task spawning and lock construction (and their associated
/// handle/lock types) are exactly those of `TokioExecutor`. Only the `Timer`
/// impl for `RecordingExecutor` adds recording behaviour.
impl Executor for RecordingExecutor {
type JoinHandle<R>
= <TokioExecutor as Executor>::JoinHandle<R>
where
R: MaybeSend;
type Mutex<T>
= <TokioExecutor as Executor>::Mutex<T>
where
T: MaybeSend + MaybeSync;
type RwLock<T>
= <TokioExecutor as Executor>::RwLock<T>
where
T: MaybeSend + MaybeSync;
fn spawn<F>(&self, future: F) -> Self::JoinHandle<F::Output>
where
F: Future + MaybeSend + 'static,
F::Output: MaybeSend,
{
self.inner.spawn(future)
}
fn mutex<T>(value: T) -> Self::Mutex<T>
where
T: MaybeSend + MaybeSync,
{
<TokioExecutor as Executor>::mutex(value)
}
fn rw_lock<T>(value: T) -> Self::RwLock<T>
where
T: MaybeSend + MaybeSync,
{
<TokioExecutor as Executor>::rw_lock(value)
}
}
/// Builds the storage path of an SST object by joining `relative` onto the
/// DB's SST root with a single `/` separator.
fn local_sst_path<FS, E>(db: &DbInner<FS, E>, relative: &str) -> Path
where
    FS: crate::manifest::ManifestFs<E>,
    E: Executor + Timer + Clone + 'static,
    <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
    let root = db.sst_root.as_ref();
    let joined = format!("{root}/{relative}");
    Path::from(joined)
}
/// Writes `bytes` to the local file at `path`. Missing parent directories are
/// created first, and any existing file is truncated.
///
/// # Errors
/// Fails when `path` has no parent directory, or when creating the
/// directories, opening, writing, or closing the file fails.
async fn write_local_sst_object(
    path: &Path,
    bytes: &[u8],
) -> Result<(), Box<dyn std::error::Error>> {
    let Some(parent) = std::path::Path::new(path.as_ref()).parent() else {
        return Err("sst object missing parent directory".into());
    };
    fs::create_dir_all(parent)?;
    let local = LocalFs {};
    let options = OpenOptions::default().create(true).truncate(true);
    let mut file = Fs::open_options(&local, path, options).await?;
    // fusio's `write_all` takes the buffer by value and hands it back with the result.
    let (write_res, _) = file.write_all(bytes.to_vec()).await;
    write_res?;
    file.close().await?;
    Ok(())
}
/// Returns `true` when the local file at `path` can be opened. Any open error
/// (not just "not found") is reported as `false`.
async fn local_sst_exists(path: &Path) -> bool {
    let local = LocalFs {};
    let opened = Fs::open(&local, path).await;
    opened.is_ok()
}
/// Recursively counts the files with a `.parquet` extension under `root`.
/// Directories are descended into but never counted themselves.
///
/// # Errors
/// Propagates the first I/O error from reading a directory or an entry's type.
fn count_local_sst_objects(root: &std::path::Path) -> Result<u64, Box<dyn std::error::Error>> {
    /// Returns the number of parquet files in `dir` and all of its subdirectories.
    fn walk(dir: &std::path::Path) -> Result<u64, std::io::Error> {
        let mut total = 0u64;
        for item in fs::read_dir(dir)? {
            let item = item?;
            let is_dir = item.file_type()?.is_dir();
            let child = item.path();
            if is_dir {
                total = total.saturating_add(walk(&child)?);
            } else if child.extension().is_some_and(|ext| ext == "parquet") {
                total = total.saturating_add(1);
            }
        }
        Ok(total)
    }
    walk(root).map_err(Into::into)
}
/// Sleeps are recorded, never really waited on: each sleep future logs its
/// duration and then awaits the installed hook, if any. Clock readings come
/// straight from the host clock.
impl Timer for RecordingExecutor {
    fn sleep(&self, dur: Duration) -> Pin<Box<dyn MaybeSendFuture<Output = ()>>> {
        // The hook is captured when the future is created, so a hook installed
        // afterwards does not affect this sleep.
        let installed = self.sleep_hook.lock().expect("sleep hook lock").clone();
        let log = Arc::clone(&self.sleep_calls);
        Box::pin(async move {
            log.lock().expect("sleep calls lock").push(dur);
            let Some(run_hook) = installed else {
                return;
            };
            run_hook(dur).await;
        })
    }
    fn now(&self) -> Instant {
        Instant::now()
    }
    fn system_time(&self) -> SystemTime {
        SystemTime::now()
    }
}
/// Builds a placeholder compaction handle around a fresh abort handle, with
/// both optional components left as `None`.
fn dummy_compaction_handle<E: Executor>() -> CompactionHandle<E> {
    let (abort_handle, _registration) = AbortHandle::new_pair();
    CompactionHandle::new(abort_handle, None, None)
}
/// With file-count thresholds (2, 4) and zero bytes: 1 L0 file proceeds,
/// 2 files slow down, and 4 files stall.
#[test]
fn l0_backpressure_thresholds_select_actions() {
    let config = L0BackpressureConfig::new(2, 4);
    let with_files = |file_count| L0Stats {
        file_count,
        total_bytes: Some(0),
    };
    assert!(matches!(config.decision(with_files(1)), BackpressureDecision::Proceed));
    assert!(matches!(config.decision(with_files(2)), BackpressureDecision::Slowdown(_)));
    assert!(matches!(config.decision(with_files(4)), BackpressureDecision::Stall(_)));
}
/// Byte thresholds act independently of file counts: a single L0 file of
/// 150 bytes slows down (slowdown at 100), and one of 250 bytes stalls
/// (stop at 200).
#[test]
fn l0_backpressure_bytes_can_trigger() {
    let config = L0BackpressureConfig::new(10, 20)
        .slowdown_bytes(100)
        .stop_bytes(200);
    let single_file_of = |bytes| L0Stats {
        file_count: 1,
        total_bytes: Some(bytes),
    };
    assert!(matches!(config.decision(single_file_of(150)), BackpressureDecision::Slowdown(_)));
    assert!(matches!(config.decision(single_file_of(250)), BackpressureDecision::Stall(_)));
}
/// One L0 SST against a slowdown threshold of 1 (stop threshold 3) should make
/// `apply_l0_backpressure` sleep exactly once, for the 7 ms slowdown delay,
/// and increment the `backpressure_slowdown` metric once.
#[tokio::test(flavor = "current_thread")]
async fn l0_backpressure_applies_slowdown_delay() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
.primary_key("id")
.build()
.expect("schema builder");
// The DB gets a clone; both share the `Arc`'d sleep log, so the test can read
// what the DB requested. `RecordingExecutor` never actually sleeps.
let executor = RecordingExecutor::new();
let mut db: DbInner<InMemoryFs, RecordingExecutor> =
DB::new(mode_cfg, Arc::new(executor.clone()))
.await?
.into_inner();
let metrics = Arc::new(CompactionMetrics::new());
db.compaction_metrics = Some(Arc::clone(&metrics));
db.l0_backpressure = Some(
L0BackpressureConfig::new(1, 3)
.slowdown_delay(Duration::from_millis(7))
.stop_delay(Duration::from_millis(11)),
);
// NOTE(review): a placeholder compaction handle is installed; presumably
// backpressure is only applied when a compaction worker exists — confirm.
db.compaction_worker = Some(dummy_compaction_handle());
// Register a single L0 SST in the manifest only; no data file is written.
let entry = SstEntry::new(
SsTableId::new(1),
None,
None,
Path::from("L0/1.parquet"),
None,
);
db.manifest
.apply_version_edits(
db.manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![entry],
}],
)
.await?;
db.apply_l0_backpressure().await?;
let sleeps = executor.sleep_calls();
assert_eq!(sleeps, vec![Duration::from_millis(7)]);
let snapshot = metrics.snapshot();
assert_eq!(snapshot.backpressure_slowdown, 1);
Ok(())
}
/// Two L0 SSTs reach the stop threshold (2), so backpressure first stalls.
/// During that stall sleep the hook removes one SST, so the recheck sees a
/// single file and settles for one slowdown sleep. Expected: sleeps of
/// [9 ms, 3 ms], and one stall plus one slowdown recorded in the metrics.
#[tokio::test(flavor = "current_thread")]
async fn l0_backpressure_stall_rechecks_until_reduced() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
.primary_key("id")
.build()
.expect("schema builder");
let executor = RecordingExecutor::new();
let mut db: DbInner<InMemoryFs, RecordingExecutor> =
DB::new(mode_cfg, Arc::new(executor.clone()))
.await?
.into_inner();
let metrics = Arc::new(CompactionMetrics::new());
db.compaction_metrics = Some(Arc::clone(&metrics));
db.l0_backpressure = Some(
L0BackpressureConfig::new(1, 2)
.slowdown_delay(Duration::from_millis(3))
.stop_delay(Duration::from_millis(9)),
);
db.compaction_worker = Some(dummy_compaction_handle());
let sst_id_a = SsTableId::new(1);
let sst_id_b = SsTableId::new(2);
let entry_a = SstEntry::new(
sst_id_a.clone(),
None,
None,
Path::from("L0/1.parquet"),
None,
);
let entry_b = SstEntry::new(
sst_id_b.clone(),
None,
None,
Path::from("L0/2.parquet"),
None,
);
db.manifest
.apply_version_edits(
db.manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![entry_a, entry_b],
}],
)
.await?;
// Hook: on the first sleep only (counter was 0), drop SST 2 from L0 via the
// shared manifest handle. Subsequent sleeps leave the manifest alone.
let hook_calls = Arc::new(AtomicUsize::new(0));
let manifest = db.manifest.clone();
let table = db.manifest_table;
let remove_id = sst_id_b.clone();
let hook_calls_clone = Arc::clone(&hook_calls);
executor.set_hook(Arc::new(move |_delay| {
let manifest = manifest.clone();
let hook_calls = Arc::clone(&hook_calls_clone);
let remove_id = remove_id.clone();
Box::pin(async move {
if hook_calls.fetch_add(1, Ordering::SeqCst) == 0 {
manifest
.apply_version_edits(
table,
&[VersionEdit::RemoveSsts {
level: 0,
sst_ids: vec![remove_id],
}],
)
.await
.expect("remove sst");
}
})
}));
db.apply_l0_backpressure().await?;
// Stop delay first, then the slowdown delay after the L0 count dropped.
let sleeps = executor.sleep_calls();
assert_eq!(
sleeps,
vec![Duration::from_millis(9), Duration::from_millis(3)]
);
let snapshot = metrics.snapshot();
assert_eq!(snapshot.backpressure_stall, 1);
assert_eq!(snapshot.backpressure_slowdown, 1);
Ok(())
}
/// `l0_backpressure_decision` serves its answer from cached L0 stats. After a
/// second L0 SST is added, a non-forced call still says Slowdown (stale), and
/// only a forced call observes the Stall.
#[tokio::test(flavor = "current_thread")]
async fn l0_backpressure_cache_stale_until_forced_refresh() -> Result<(), Box<dyn std::error::Error>>
{
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
.primary_key("id")
.build()
.expect("schema builder");
let executor = RecordingExecutor::new();
let mut db: DbInner<InMemoryFs, RecordingExecutor> =
DB::new(mode_cfg, Arc::new(executor.clone()))
.await?
.into_inner();
let backpressure = L0BackpressureConfig::new(1, 2)
.slowdown_delay(Duration::from_millis(1))
.stop_delay(Duration::from_secs(5));
db.l0_backpressure = Some(backpressure.clone());
db.compaction_worker = Some(dummy_compaction_handle());
let entry_a = SstEntry::new(
SsTableId::new(1),
None,
None,
Path::from("L0/1.parquet"),
None,
);
db.manifest
.apply_version_edits(
db.manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![entry_a],
}],
)
.await?;
// NOTE(review): the boolean argument appears to mean "force a stats refresh"
// (per the test name) — confirm against `l0_backpressure_decision`.
let decision = db.l0_backpressure_decision(&backpressure, false).await?;
assert!(matches!(decision, BackpressureDecision::Slowdown(_)));
let entry_b = SstEntry::new(
SsTableId::new(2),
None,
None,
Path::from("L0/2.parquet"),
None,
);
db.manifest
.apply_version_edits(
db.manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![entry_b],
}],
)
.await?;
// Manifest now holds two L0 SSTs (the stop threshold), but the cached stats
// still reflect one.
let cached = db.l0_backpressure_decision(&backpressure, false).await?;
assert!(matches!(cached, BackpressureDecision::Slowdown(_)));
let refreshed = db.l0_backpressure_decision(&backpressure, true).await?;
assert!(matches!(refreshed, BackpressureDecision::Stall(_)));
Ok(())
}
/// Dropping an `L0StatsRefreshGuard` must clear the in-progress flag it was
/// created with.
#[test]
fn l0_stats_refresh_guard_clears_on_drop() {
    let refreshing = AtomicBool::new(true);
    let guard = L0StatsRefreshGuard::new(&refreshing);
    drop(guard);
    assert!(!refreshing.load(Ordering::Acquire));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn plan_compaction_returns_task() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let config = SchemaBuilder::from_schema(schema)
.primary_key("id")
.with_metadata()
.build()
.expect("schema builder");
let schema = Arc::clone(&config.schema);
let executor = Arc::new(TokioExecutor::default());
let policy = Arc::new(BatchesThreshold { batches: 1 });
let db: DbInner<InMemoryFs, TokioExecutor> =
DB::new_with_policy(config, Arc::clone(&executor), policy)
.await?
.into_inner();
let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
let sst_cfg = Arc::new(SsTableConfig::new(
Arc::clone(&schema),
fs,
Path::from("/tmp/plan-compaction"),
));
for pass in 0..2 {
let rows = vec![vec![
Some(DynCell::Str(format!("comp-{pass}"))),
Some(DynCell::I32(pass)),
]];
let batch = build_batch(Arc::clone(&schema), rows).expect("batch");
db.ingest(batch).await.expect("ingest");
let descriptor = SsTableDescriptor::new(SsTableId::new(pass as u64 + 1), 0);
db.flush_immutables_with_descriptor(Arc::clone(&sst_cfg), descriptor)
.await
.expect("flush");
}
let planner = LeveledCompactionPlanner::new(LeveledPlannerConfig {
l0_trigger: 1,
l0_max_inputs: 2,
l0_max_bytes: None,
level_thresholds: vec![usize::MAX],
level_max_bytes: Vec::new(),
max_inputs_per_task: 2,
max_task_bytes: None,
});
let task = db
.plan_compaction_task(&planner)
.await?
.expect("compaction task");
assert_eq!(task.source_level, 0);
assert_eq!(task.target_level, 1);
assert_eq!(task.input.len(), 2);
Ok(())
}
/// A freshly created in-memory DB has an empty manifest, so the default
/// leveled planner must not produce a compaction task.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn plan_compaction_empty_manifest_is_none() -> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ])))
    .primary_key("id")
    .with_metadata()
    .build()
    .expect("schema builder");
    let db: DB<InMemoryFs, TokioExecutor> =
        DB::new(config, Arc::new(TokioExecutor::default())).await?;
    let planner = LeveledCompactionPlanner::new(LeveledPlannerConfig::default());
    let plan = db.inner().plan_compaction_task(&planner).await?;
    assert!(plan.is_none());
    Ok(())
}
/// On-disk DB with minor compaction enabled and a one-batch seal policy: a
/// single ingest should seal and flush immediately. Afterwards no immutable
/// segments remain and SST 1 exists under `<db_root>/sst/L0/`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn minor_compaction_flushes_after_seal() -> Result<(), Box<dyn std::error::Error>> {
// NOTE(review): `db_root` is never removed by this test.
let db_root = workspace_temp_dir("minor-compaction-flush");
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let config = SchemaBuilder::from_schema(schema.clone())
.primary_key("id")
.with_metadata()
.build()
.expect("schema builder");
let batch_schema = Arc::clone(&config.schema);
// NOTE(review): the meaning of `with_minor_compaction(1, 0)`'s arguments is
// not visible here (presumably a trigger count and a target level) — confirm.
let mut db: DbInner<LocalFs, TokioExecutor> = DB::<LocalFs, TokioExecutor>::builder(config)
.on_disk(&db_root)?
.with_minor_compaction(1, 0)
.build()
.await?
.into_inner();
db.set_seal_policy(Arc::new(BatchesThreshold { batches: 1 }));
let rows = vec![DynRow(vec![
Some(DynCell::Str("k".into())),
Some(DynCell::I32(1)),
])];
let batch = build_batch(batch_schema, rows).expect("batch");
db.ingest(batch)
.await
.expect("ingest triggers seal + flush");
assert_eq!(db.num_immutable_segments(), 0);
// The first SST id is rendered as a zero-padded 20-digit file name.
let sst_path = db_root
.join("sst")
.join("L0")
.join("00000000000000000001.parquet");
assert!(
sst_path.exists(),
"expected SST at {:?} to be created",
sst_path
);
Ok(())
}
/// If any SST that remains after compaction has no WAL metadata (SST 1 here),
/// both orchestrator helpers must return `None`, leaving the manifest's WAL
/// segment set untouched.
#[test]
fn wal_segments_after_compaction_preserves_manifest_when_metadata_missing() {
let generator = FileIdGenerator::default();
let table_id = TableId::new(&generator);
let mut version = VersionState::empty(table_id);
let wal_a = WalSegmentRef::new(1, generator.generate(), 0, 0);
let wal_b = WalSegmentRef::new(2, generator.generate(), 0, 0);
let entry_missing_wal = SstEntry::new(SsTableId::new(1), None, None, Path::default(), None);
let entry_with_wal = SstEntry::new(
SsTableId::new(2),
None,
Some(vec![*wal_b.file_id()]),
Path::default(),
None,
);
version
.apply_edits(&[
VersionEdit::AddSsts {
level: 0,
entries: vec![entry_missing_wal, entry_with_wal],
},
VersionEdit::SetWalSegments {
segments: vec![wal_a.clone(), wal_b.clone()],
},
])
.expect("apply edits");
// Nothing removed and nothing added: both SSTs remain.
let wal_ids = orchestrator::wal_ids_for_remaining_ssts(&version, &HashSet::new(), &[]);
assert!(
wal_ids.is_none(),
"missing wal metadata on a remaining SST should preserve the manifest wal set"
);
let filtered = orchestrator::wal_segments_after_compaction(&version, &[], &[]);
assert!(
filtered.is_none(),
"compaction should not rewrite wal segments when metadata is absent"
);
}
/// Both manifest SSTs carry WAL ids, but the added output descriptor (SST 3)
/// is built without any. The WAL helpers therefore return `None`. Separately,
/// `gc_plan_from_outcome` must carry the outcome's obsolete WAL segments into
/// the GC plan.
#[test]
fn wal_segments_after_compaction_filters_and_tracks_obsolete() {
let generator = FileIdGenerator::default();
let table_id = TableId::new(&generator);
let mut version = VersionState::empty(table_id);
let wal_a = WalSegmentRef::new(1, generator.generate(), 0, 0);
let wal_b = WalSegmentRef::new(2, generator.generate(), 0, 0);
let wal_c = WalSegmentRef::new(3, generator.generate(), 0, 0);
let entry_a = SstEntry::new(
SsTableId::new(1),
None,
Some(vec![*wal_a.file_id(), *wal_b.file_id()]),
Path::default(),
None,
);
let entry_b = SstEntry::new(
SsTableId::new(2),
None,
Some(vec![*wal_c.file_id()]),
Path::default(),
None,
);
version
.apply_edits(&[
VersionEdit::AddSsts {
level: 0,
entries: vec![entry_a, entry_b],
},
VersionEdit::SetWalSegments {
segments: vec![wal_a.clone(), wal_b.clone(), wal_c.clone()],
},
])
.expect("apply edits");
// Compaction replaces SST 1 with SST 3. SST 3 has no `with_wal_ids(...)`,
// presumably leaving its WAL metadata absent — it is the SST that lacks it.
let removed = vec![
SsTableDescriptor::new(SsTableId::new(1), 0)
.with_storage_paths(Path::from("L0/1.parquet"), None),
];
let added = vec![SsTableDescriptor::new(SsTableId::new(3), 0)];
// Sanity checks: the existing manifest entries do carry WAL metadata.
assert_eq!(version.wal_segments().len(), 3);
assert!(
version.ssts()[0][0].wal_segments().is_some(),
"entry should carry wal segments"
);
assert!(
version.ssts()[0][1].wal_segments().is_some(),
"second entry should carry wal segments"
);
let removed_ids: HashSet<SsTableId> = removed.iter().map(|d| d.id().clone()).collect();
let wal_ids = orchestrator::wal_ids_for_remaining_ssts(&version, &removed_ids, &added);
assert!(
wal_ids.is_none(),
"wal ids should be None when any SST lacks wal metadata"
);
let filtered = orchestrator::wal_segments_after_compaction(&version, &removed, &added);
assert!(
filtered.is_none(),
"filtered wal segments should be None when any SST lacks wal metadata"
);
// Hand-built outcome: the GC plan should surface exactly these obsolete WALs.
let outcome = CompactionOutcome {
add_ssts: Vec::new(),
remove_ssts: removed,
target_level: 0,
wal_segments: None,
tombstone_watermark: None,
outputs: added.clone(),
obsolete_sst_ids: Vec::new(),
wal_floor: None,
obsolete_wal_segments: vec![wal_a.clone(), wal_b.clone()],
};
let plan = orchestrator::gc_plan_from_outcome(&outcome)
.expect("gc plan")
.expect("plan present");
assert_eq!(plan.obsolete_wal_segments.len(), 2);
assert!(plan.obsolete_wal_segments.contains(&wal_a));
assert!(plan.obsolete_wal_segments.contains(&wal_b));
}
/// Planner that ignores the snapshot and returns a clone of the same task on
/// every call.
#[derive(Clone)]
struct StaticPlanner {
    task: CompactionTask,
}
impl CompactionPlanner for StaticPlanner {
    fn plan(&self, _snapshot: &CompactionSnapshot) -> Option<CompactionTask> {
        let task = self.task.clone();
        Some(task)
    }
}
/// Planner that returns its task on the first `plan` call only and `None`
/// afterwards. The call counter is shared, so clones count as one planner.
#[derive(Clone)]
struct OneShotPlanner {
    task: CompactionTask,
    /// Number of `plan` calls seen so far (across all clones).
    planned: Arc<AtomicUsize>,
}
impl OneShotPlanner {
    /// Wraps `task` in a planner that has not been called yet.
    fn new(task: CompactionTask) -> Self {
        let planned = Arc::new(AtomicUsize::new(0));
        Self { task, planned }
    }
}
impl CompactionPlanner for OneShotPlanner {
    fn plan(&self, _snapshot: &CompactionSnapshot) -> Option<CompactionTask> {
        // `fetch_add` returns the previous count: only the very first call sees 0.
        let is_first_call = self.planned.fetch_add(1, Ordering::SeqCst) == 0;
        is_first_call.then(|| self.task.clone())
    }
}
/// Compaction executor that performs no I/O. It builds an outcome for the
/// job's inputs from a fixed set of output descriptors, WAL segments, and
/// target level.
#[derive(Clone)]
struct StaticExecutor {
    outputs: Vec<SsTableDescriptor>,
    wal_segments: Vec<WalSegmentRef>,
    target_level: u32,
}
impl CompactionExecutor for StaticExecutor {
    fn execute(
        &self,
        job: CompactionJob,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<CompactionOutcome, CompactionError>> + '_>>
    {
        let Self {
            outputs,
            wal_segments,
            target_level,
        } = self.clone();
        Box::pin(async move {
            let mut outcome = CompactionOutcome::from_outputs(
                outputs.clone(),
                job.inputs.clone(),
                target_level,
                Some(wal_segments),
            )?;
            // Overwrite with the configured descriptors verbatim.
            outcome.outputs = outputs;
            Ok(outcome)
        })
    }
    /// Nothing was written, so there is nothing to clean up.
    fn cleanup_outputs<'a>(
        &'a self,
        _outputs: &'a [SsTableDescriptor],
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), CompactionError>> + 'a>> {
        Box::pin(async { Ok(()) })
    }
}
/// Compaction executor that reports a single fixed output descriptor at the
/// task's target level and passes no WAL segments (`None`) to the outcome.
#[derive(Clone)]
struct NoStatsExecutor {
    output: SsTableDescriptor,
}
impl CompactionExecutor for NoStatsExecutor {
    fn execute(
        &self,
        job: CompactionJob,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<CompactionOutcome, CompactionError>> + '_>>
    {
        let single_output = vec![self.output.clone()];
        Box::pin(async move {
            let level = job.task.target_level as u32;
            CompactionOutcome::from_outputs(single_output, job.inputs.clone(), level, None)
        })
    }
    /// Nothing was written, so there is nothing to clean up.
    fn cleanup_outputs<'a>(
        &'a self,
        _outputs: &'a [SsTableDescriptor],
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), CompactionError>> + 'a>> {
        Box::pin(async { Ok(()) })
    }
}
/// Compaction executor whose every job fails with `CompactionError::NoInputs`;
/// cleanup always succeeds.
#[derive(Clone)]
struct FailingExecutor;
impl CompactionExecutor for FailingExecutor {
    fn execute(
        &self,
        _job: CompactionJob,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<CompactionOutcome, CompactionError>> + '_>>
    {
        Box::pin(async {
            let failure = CompactionError::NoInputs;
            Err(failure)
        })
    }
    fn cleanup_outputs<'a>(
        &'a self,
        _outputs: &'a [SsTableDescriptor],
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), CompactionError>> + 'a>> {
        Box::pin(async { Ok(()) })
    }
}
/// `run_compaction_task` with a no-I/O executor compacts L0 SSTs 1 and 2 into
/// an L1 output that references only `wal_b`. Expected effects:
/// - the manifest WAL set shrinks to `wal_b` and `wal_a` is reported obsolete;
/// - the staged GC plan lists `wal_a` but no SSTs, because a historical
///   snapshot is pinned.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compaction_updates_manifest_wal_and_records_gc_plan()
-> Result<(), Box<dyn std::error::Error>> {
let generator = FileIdGenerator::default();
let wal_a = WalSegmentRef::new(1, generator.generate(), 0, 0);
let wal_b = WalSegmentRef::new(2, generator.generate(), 0, 0);
let entry_a = SstEntry::new(
SsTableId::new(1),
None,
Some(vec![*wal_a.file_id(), *wal_b.file_id()]),
Path::from("L0/1.parquet"),
None,
);
let entry_b = SstEntry::new(
SsTableId::new(2),
None,
Some(vec![*wal_b.file_id()]),
Path::from("L0/2.parquet"),
None,
);
let db: DB<InMemoryFs, NoopExecutor> = DB::new(
SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
])))
.primary_key("id")
.build()
.expect("schema builder"),
Arc::new(NoopExecutor),
)
.await
.expect("db init");
// Seed the manifest directly: two L0 SSTs plus a WAL set of {wal_a, wal_b}.
db.inner()
.manifest
.apply_version_edits(
db.inner().manifest_table,
&[
VersionEdit::AddSsts {
level: 0,
entries: vec![entry_a, entry_b],
},
VersionEdit::SetWalSegments {
segments: vec![wal_a.clone(), wal_b.clone()],
},
],
)
.await
.expect("apply edits");
let task = CompactionTask {
source_level: 0,
target_level: 1,
input: vec![
CompactionInput {
level: 0,
sst_id: SsTableId::new(1),
},
CompactionInput {
level: 0,
sst_id: SsTableId::new(2),
},
],
key_range: None,
};
let planner = OneShotPlanner::new(task);
// The single output only references `wal_b`, so `wal_a` becomes unreferenced.
let output_desc = SsTableDescriptor::new(SsTableId::new(3), 1)
.with_wal_ids(Some(vec![*wal_b.file_id()]))
.with_storage_paths(Path::from("L1/3.parquet"), None);
let executor = StaticExecutor {
outputs: vec![output_desc],
wal_segments: vec![wal_b.clone()],
target_level: 1,
};
let outcome = db
.inner()
.run_compaction_task(&planner, &executor)
.await?
.expect("compaction outcome");
let snapshot = db
.inner()
.manifest
.snapshot_latest(db.inner().manifest_table)
.await?;
let latest = snapshot.latest_version.expect("latest version");
assert_eq!(latest.wal_segments().len(), 1);
assert_eq!(latest.wal_segments()[0].file_id(), wal_b.file_id());
assert_eq!(
outcome.obsolete_wal_segments,
vec![wal_a.clone()],
"should surface obsolete wal segments"
);
// Hold a historical snapshot so its pin is included in the sweep below.
let _historical = db.snapshot_at(Timestamp::new(1)).await?;
let plan = db
.inner()
.manifest
.take_gc_plan_for_authorized_sweep_with_pins(
db.inner().manifest_table,
&db.inner().active_snapshot_pins(),
)
.await?
.expect("gc plan recorded");
assert_eq!(plan.obsolete_wal_segments.len(), 1);
assert_eq!(plan.obsolete_wal_segments[0].file_id(), wal_a.file_id());
assert_eq!(
plan.obsolete_ssts.len(),
0,
"active historical snapshots should keep replaced SSTs protected",
);
Ok(())
}
/// SST 1 has no WAL metadata and is not part of the compaction. Compacting
/// SST 2 must therefore leave the manifest WAL set {wal_a, wal_b} unchanged and
/// report no obsolete WAL segments. With a historical snapshot pinned, no GC
/// plan is returned for the authorized sweep.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compaction_preserves_manifest_wal_when_metadata_missing()
-> Result<(), Box<dyn std::error::Error>> {
let generator = FileIdGenerator::default();
let wal_a = WalSegmentRef::new(1, generator.generate(), 0, 0);
let wal_b = WalSegmentRef::new(2, generator.generate(), 0, 0);
let entry_missing_wal = SstEntry::new(
SsTableId::new(1),
None,
None,
Path::from("L0/1.parquet"),
None,
);
let entry_with_wal = SstEntry::new(
SsTableId::new(2),
None,
Some(vec![*wal_b.file_id()]),
Path::from("L0/2.parquet"),
None,
);
let db: DB<InMemoryFs, NoopExecutor> = DB::new(
SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
])))
.primary_key("id")
.build()
.expect("schema builder"),
Arc::new(NoopExecutor),
)
.await
.expect("db init");
db.inner()
.manifest
.apply_version_edits(
db.inner().manifest_table,
&[
VersionEdit::AddSsts {
level: 0,
entries: vec![entry_missing_wal, entry_with_wal],
},
VersionEdit::SetWalSegments {
segments: vec![wal_a.clone(), wal_b.clone()],
},
],
)
.await
.expect("apply edits");
// Only SST 2 is compacted; SST 1 (no WAL metadata) remains in L0.
let task = CompactionTask {
source_level: 0,
target_level: 1,
input: vec![CompactionInput {
level: 0,
sst_id: SsTableId::new(2),
}],
key_range: None,
};
let planner = OneShotPlanner::new(task);
let output_desc = SsTableDescriptor::new(SsTableId::new(3), 1)
.with_wal_ids(Some(vec![*wal_b.file_id()]))
.with_storage_paths(Path::from("L1/3.parquet"), None);
let executor = StaticExecutor {
outputs: vec![output_desc],
wal_segments: vec![wal_b.clone()],
target_level: 1,
};
let outcome = db
.inner()
.run_compaction_task(&planner, &executor)
.await?
.expect("compaction outcome");
let snapshot = db
.inner()
.manifest
.snapshot_latest(db.inner().manifest_table)
.await?;
let latest = snapshot.latest_version.expect("latest version");
assert_eq!(
latest.wal_segments(),
&[wal_a.clone(), wal_b.clone()],
"manifest wal set should remain unchanged when wal metadata is missing",
);
assert!(
outcome.obsolete_wal_segments.is_empty(),
"should not surface obsolete wal segments when manifest set is preserved"
);
let _historical = db.snapshot_at(Timestamp::new(1)).await?;
assert!(
db.inner()
.manifest
.take_gc_plan_for_authorized_sweep_with_pins(
db.inner().manifest_table,
&db.inner().active_snapshot_pins(),
)
.await?
.is_none(),
"historically pinned SSTs should consume the staged GC plan when no WAL segments are \
obsolete",
);
Ok(())
}
/// End-to-end: writes two real single-row L0 SSTs to local disk (one tagged
/// with `wal_a`, one with `wal_b`) and compacts them with
/// `LocalCompactionExecutor` into one L1 SST. Expected results:
/// - the output aggregates both WAL ids;
/// - the manifest keeps both WAL segments, with the floor at `wal_a`, and
///   nothing is reported obsolete;
/// - no GC plan is handed to the sweep while a historical snapshot is pinned.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compaction_e2e_merges_and_advances_wal_floor() -> Result<(), Box<dyn std::error::Error>> {
let temp_root = workspace_temp_dir("compaction-e2e-wal");
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
// Key extractor over column 0 (`id`), attached to the SST config below.
let extractor =
crate::extractor::projection_for_field(Arc::clone(&schema), 0).expect("extractor");
let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
.primary_key("id")
.build()
.expect("schema builder");
// SST data lives on local disk under `temp_root/sst`; the DB manifest itself
// is in memory (`InMemoryFs`).
let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
let sst_root = temp_root.join("sst");
fs::create_dir_all(&sst_root)?;
let sst_root = Path::from_filesystem_path(&sst_root)?;
let sst_cfg = Arc::new(
SsTableConfig::new(Arc::clone(&schema), fs, sst_root).with_key_extractor(extractor.into()),
);
let wal_gen = FileIdGenerator::default();
let wal_a = WalSegmentRef::new(10, wal_gen.generate(), 0, 0);
let wal_b = WalSegmentRef::new(11, wal_gen.generate(), 0, 0);
// Build SST 1 from a single row ("a", 1) and tag it with `wal_a`.
let batch_a = build_batch(
Arc::clone(&schema),
vec![DynRow(vec![
Some(DynCell::Str("a".into())),
Some(DynCell::I32(1)),
])],
)?;
let imm_a = crate::inmem::immutable::memtable::segment_from_batch_with_key_name(batch_a, "id")?;
let mut builder_a = SsTableBuilder::new(
Arc::clone(&sst_cfg),
SsTableDescriptor::new(SsTableId::new(1), 0),
);
builder_a.add_immutable(&imm_a)?;
let sst_a = builder_a.finish(NoopExecutor).await?;
let desc_a = sst_a
.descriptor()
.clone()
.with_wal_ids(Some(vec![*wal_a.file_id()]));
// Build SST 2 from a single row ("b", 2) and tag it with `wal_b`.
let batch_b = build_batch(
Arc::clone(&schema),
vec![DynRow(vec![
Some(DynCell::Str("b".into())),
Some(DynCell::I32(2)),
])],
)?;
let imm_b = crate::inmem::immutable::memtable::segment_from_batch_with_key_name(batch_b, "id")?;
let mut builder_b = SsTableBuilder::new(
Arc::clone(&sst_cfg),
SsTableDescriptor::new(SsTableId::new(2), 0),
);
builder_b.add_immutable(&imm_b)?;
let sst_b = builder_b.finish(NoopExecutor).await?;
let desc_b = sst_b
.descriptor()
.clone()
.with_wal_ids(Some(vec![*wal_b.file_id()]));
let db: DB<InMemoryFs, NoopExecutor> = DB::new(mode_cfg, Arc::new(NoopExecutor)).await?;
// Mirror each built descriptor into a manifest entry (id, stats, WAL ids,
// data path, delete path).
let entry_a = SstEntry::new(
desc_a.id().clone(),
desc_a.stats().cloned(),
desc_a.wal_ids().map(|ids| ids.to_vec()),
desc_a
.data_path()
.expect("input descriptor missing data path")
.clone(),
desc_a.delete_path().cloned(),
);
let entry_b = SstEntry::new(
desc_b.id().clone(),
desc_b.stats().cloned(),
desc_b.wal_ids().map(|ids| ids.to_vec()),
desc_b
.data_path()
.expect("input descriptor missing data path")
.clone(),
desc_b.delete_path().cloned(),
);
db.inner()
.manifest
.apply_version_edits(
db.inner().manifest_table,
&[
VersionEdit::AddSsts {
level: 0,
entries: vec![entry_a, entry_b],
},
VersionEdit::SetWalSegments {
segments: vec![wal_a.clone(), wal_b.clone()],
},
],
)
.await?;
let task = CompactionTask {
source_level: 0,
target_level: 1,
input: vec![
CompactionInput {
level: 0,
sst_id: desc_a.id().clone(),
},
CompactionInput {
level: 0,
sst_id: desc_b.id().clone(),
},
],
key_range: None,
};
let planner = OneShotPlanner::new(task);
let executor = LocalCompactionExecutor::new(Arc::clone(&sst_cfg), 100)
.with_max_output_bytes(8 * 1024 * 1024);
let outcome = db
.inner()
.run_compaction_task(&planner, &executor)
.await?
.expect("compaction outcome");
assert_eq!(outcome.remove_ssts.len(), 2);
assert_eq!(outcome.add_ssts.len(), 1);
assert_eq!(
outcome
.add_ssts
.first()
.and_then(|e| e.wal_segments())
.map(|ids| ids.len()),
Some(2),
"output should aggregate wal ids"
);
assert_eq!(
outcome.wal_segments.as_ref().map(|segments| segments.len()),
Some(2),
"manifest wal set should retain gap when inputs have discontinuity"
);
assert_eq!(
outcome.wal_floor.as_ref().map(|w| w.seq()),
Some(wal_a.seq()),
"wal floor should reflect first retained segment when gaps exist"
);
assert_eq!(
outcome.obsolete_wal_segments.len(),
0,
"no wal segments should be obsolete when gaps are retained"
);
// The committed manifest must agree with the outcome.
let snapshot = db
.inner()
.manifest
.snapshot_latest(db.inner().manifest_table)
.await?;
let latest = snapshot.latest_version.expect("latest version");
assert_eq!(latest.wal_segments().len(), 2);
let wal_ids: HashSet<_> = latest
.wal_segments()
.iter()
.map(|seg| *seg.file_id())
.collect();
assert!(wal_ids.contains(wal_a.file_id()));
assert!(wal_ids.contains(wal_b.file_id()));
assert_eq!(
latest.wal_floor().as_ref().map(|w| w.file_id()),
Some(wal_a.file_id()),
"wal floor should align to first retained segment"
);
let _historical = db.snapshot_at(Timestamp::new(1)).await?;
assert!(
db.inner()
.manifest
.take_gc_plan_for_authorized_sweep_with_pins(
db.inner().manifest_table,
&db.inner().active_snapshot_pins(),
)
.await?
.is_none(),
"active historical snapshots should keep replaced SSTs out of the authorized sweep set",
);
// NOTE(review): only reached on success; an earlier `?` or failed assertion
// leaves `temp_root` behind.
fs::remove_dir_all(&temp_root)?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn blocked_gc_candidates_survive_later_compactions_that_stage_more_work()
-> Result<(), Box<dyn std::error::Error>> {
let db: DB<InMemoryFs, NoopExecutor> = DB::new(
SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
])))
.primary_key("id")
.build()
.expect("schema builder"),
Arc::new(NoopExecutor),
)
.await
.expect("db init");
db.inner()
.manifest
.apply_version_edits(
db.inner().manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![
SstEntry::new(
SsTableId::new(1),
None,
None,
Path::from("L0/1.parquet"),
None,
),
SstEntry::new(
SsTableId::new(2),
None,
None,
Path::from("L0/2.parquet"),
None,
),
],
}],
)
.await?;
let _historical = db.snapshot_at(Timestamp::new(1)).await?;
let first = db
.inner()
.run_compaction_task(
&OneShotPlanner::new(CompactionTask {
source_level: 0,
target_level: 1,
input: vec![CompactionInput {
level: 0,
sst_id: SsTableId::new(1),
}],
key_range: None,
}),
&StaticExecutor {
outputs: vec![
SsTableDescriptor::new(SsTableId::new(3), 1)
.with_storage_paths(Path::from("L1/3.parquet"), None),
],
wal_segments: Vec::new(),
target_level: 1,
},
)
.await?
.expect("first compaction outcome");
assert_eq!(first.remove_ssts.len(), 1);
let second = db
.inner()
.run_compaction_task(
&OneShotPlanner::new(CompactionTask {
source_level: 0,
target_level: 1,
input: vec![CompactionInput {
level: 0,
sst_id: SsTableId::new(2),
}],
key_range: None,
}),
&StaticExecutor {
outputs: vec![
SsTableDescriptor::new(SsTableId::new(4), 1)
.with_storage_paths(Path::from("L1/4.parquet"), None),
],
wal_segments: Vec::new(),
target_level: 1,
},
)
.await?
.expect("second compaction outcome");
assert_eq!(second.remove_ssts.len(), 1);
let inspection = db
.inspect_sst_gc_plan()
.await?
.expect("blocked gc plan should remain staged");
assert_eq!(inspection.staged_sst_candidates, 2);
assert_eq!(inspection.authorized_sst_candidates, 0);
assert_eq!(inspection.blocked_sst_candidates, 2);
assert_eq!(
inspection
.candidates
.iter()
.map(|candidate| candidate.sst_id)
.collect::<Vec<_>>(),
vec![1, 2]
);
Ok(())
}
/// Root-set filtering of a staged plan drops the candidate that is still live
/// in the current version (SST 1). Only SST 99, which this test never adds to
/// any version, is handed back for the authorized sweep.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn authorized_gc_plan_for_sweep_filters_reachable_ssts_from_current_root_set()
-> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ])))
    .primary_key("id")
    .build()
    .expect("schema builder");
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("db init");
    let inner = db.inner();
    let table = inner.manifest_table;

    // SST 1 becomes live at L1.
    let live_entry = SstEntry::new(
        SsTableId::new(1),
        None,
        None,
        Path::from("L1/1.parquet"),
        None,
    );
    inner
        .manifest
        .apply_version_edits(
            table,
            &[VersionEdit::AddSsts {
                level: 1,
                entries: vec![live_entry],
            }],
        )
        .await?;

    // Stage the live SST 1 next to the unregistered SST 99.
    let live_candidate = crate::manifest::GcSstRef {
        id: SsTableId::new(1),
        level: 1,
        data_path: Path::from("L1/1.parquet"),
        delete_path: None,
    };
    let orphan_candidate = crate::manifest::GcSstRef {
        id: SsTableId::new(99),
        level: 0,
        data_path: Path::from("L0/99.parquet"),
        delete_path: None,
    };
    let staged = crate::manifest::GcPlanState {
        obsolete_ssts: vec![live_candidate, orphan_candidate],
        obsolete_wal_segments: Vec::new(),
    };
    inner.manifest.record_gc_plan(table, staged).await?;

    let plan = inner
        .manifest
        .take_gc_plan_for_authorized_sweep(table)
        .await?
        .expect("filtered gc plan");
    assert_eq!(plan.obsolete_ssts.len(), 1);
    assert_eq!(plan.obsolete_ssts[0].id, SsTableId::new(99));
    Ok(())
}
/// A staged plan whose only SST candidate is live in the current version must
/// not be authorized. Once that candidate is filtered out nothing remains, so
/// no plan is returned.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn authorized_gc_plan_for_sweep_drops_sst_only_plan_when_candidate_is_live()
-> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ])))
    .primary_key("id")
    .build()
    .expect("schema builder");
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("db init");
    let inner = db.inner();
    let table = inner.manifest_table;
    let live_path = Path::from("L2/7.parquet");

    // SST 7 is live at L2...
    let live_entry = SstEntry::new(SsTableId::new(7), None, None, live_path.clone(), None);
    inner
        .manifest
        .apply_version_edits(
            table,
            &[VersionEdit::AddSsts {
                level: 2,
                entries: vec![live_entry],
            }],
        )
        .await?;

    // ...and is also the plan's sole obsolete SST.
    let staged = crate::manifest::GcPlanState {
        obsolete_ssts: vec![crate::manifest::GcSstRef {
            id: SsTableId::new(7),
            level: 2,
            data_path: live_path,
            delete_path: None,
        }],
        obsolete_wal_segments: Vec::new(),
    };
    inner.manifest.record_gc_plan(table, staged).await?;

    let authorized = inner
        .manifest
        .take_gc_plan_for_authorized_sweep(table)
        .await?;
    assert!(
        authorized.is_none(),
        "root-set filtering should not authorize deleting a live SST",
    );
    Ok(())
}
/// The root set is evaluated when the sweep is authorized, not when the plan
/// is staged. SST 42 is staged first and only afterwards added to the live
/// version, and authorization must still reject it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn authorized_gc_plan_for_sweep_rechecks_root_set_after_candidate_becomes_live()
-> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ])))
    .primary_key("id")
    .build()
    .expect("schema builder");
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("db init");
    let inner = db.inner();
    let table = inner.manifest_table;
    let shared_path = Path::from("L1/42.parquet");

    // Step 1: stage SST 42 while no version references it.
    let staged = crate::manifest::GcPlanState {
        obsolete_ssts: vec![crate::manifest::GcSstRef {
            id: SsTableId::new(42),
            level: 1,
            data_path: shared_path.clone(),
            delete_path: None,
        }],
        obsolete_wal_segments: Vec::new(),
    };
    inner.manifest.record_gc_plan(table, staged).await?;

    // Step 2: commit SST 42 into L1, which makes the staged candidate live.
    let now_live = SstEntry::new(SsTableId::new(42), None, None, shared_path, None);
    inner
        .manifest
        .apply_version_edits(
            table,
            &[VersionEdit::AddSsts {
                level: 1,
                entries: vec![now_live],
            }],
        )
        .await?;

    let authorized = inner
        .manifest
        .take_gc_plan_for_authorized_sweep(table)
        .await?;
    assert!(
        authorized.is_none(),
        "sweep authorization must reject a staged SST candidate that is live in the current root \
         set",
    );
    Ok(())
}
/// SST 5 is removed from the head version while a snapshot pins the older
/// version that still contains it. The pinned root set must protect both of
/// SST 5's objects (data + delete), so a staged plan naming SST 5 yields
/// nothing authorized.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn authorized_gc_plan_for_sweep_preserves_historically_pinned_sst_objects()
-> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ])))
    .primary_key("id")
    .build()
    .expect("schema builder");
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("db init");
    let inner = db.inner();
    let table = inner.manifest_table;
    let historical_data = Path::from("L0/5.parquet");
    let historical_delete = Path::from("L0/5.delete.parquet");
    let head_data = Path::from("L1/6.parquet");

    // First commit: SST 5 at L0 with a data object and a delete object.
    let historical_entry = SstEntry::new(
        SsTableId::new(5),
        None,
        None,
        historical_data.clone(),
        Some(historical_delete.clone()),
    );
    inner
        .manifest
        .apply_version_edits(
            table,
            &[VersionEdit::AddSsts {
                level: 0,
                entries: vec![historical_entry],
            }],
        )
        .await?;

    // Second commit: SST 5 leaves L0 and SST 6 appears at L1.
    let replacement = [
        VersionEdit::RemoveSsts {
            level: 0,
            sst_ids: vec![SsTableId::new(5)],
        },
        VersionEdit::AddSsts {
            level: 1,
            entries: vec![SstEntry::new(SsTableId::new(6), None, None, head_data, None)],
        },
    ];
    inner.manifest.apply_version_edits(table, &replacement).await?;

    // With timestamp 1 pinned, two versions and three objects are protected.
    let _historical = db.snapshot_at(Timestamp::new(1)).await?;
    let protected = inner
        .manifest
        .current_root_set_with_pins(table, &inner.active_snapshot_pins())
        .await?;
    assert_eq!(protected.protected_version_count(), 2);
    assert_eq!(protected.protected_object_count(), 3);

    // Stage SST 5; the pin must keep it out of the authorized sweep.
    let staged = crate::manifest::GcPlanState {
        obsolete_ssts: vec![crate::manifest::GcSstRef {
            id: SsTableId::new(5),
            level: 0,
            data_path: historical_data,
            delete_path: Some(historical_delete),
        }],
        obsolete_wal_segments: Vec::new(),
    };
    inner.manifest.record_gc_plan(table, staged).await?;

    let authorized = inner
        .manifest
        .take_gc_plan_for_authorized_sweep_with_pins(table, &inner.active_snapshot_pins())
        .await?;
    assert!(
        authorized.is_none(),
        "active historical snapshots must keep their SST objects protected",
    );
    Ok(())
}
/// Requesting a snapshot at a timestamp earlier than any committed version
/// must fail with `ManifestError::VersionUnavailable`. The error carries the
/// requested timestamp (0) and the oldest available one (asserted to be 1).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_at_reports_oldest_available_version_when_request_predates_history()
-> Result<(), Box<dyn std::error::Error>> {
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(
        SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ])))
        .primary_key("id")
        .build()
        .expect("schema builder"),
        Arc::new(NoopExecutor),
    )
    .await
    .expect("db init");
    // Three commits, one SST each, placed at L0, L1 and L2.
    for (sst_id, level) in [(1u64, 0u32), (2, 1), (3, 2)] {
        db.inner()
            .manifest
            .apply_version_edits(
                db.inner().manifest_table,
                &[VersionEdit::AddSsts {
                    level,
                    entries: vec![SstEntry::new(
                        SsTableId::new(sst_id),
                        None,
                        None,
                        Path::from(format!("L{level}/{sst_id}.parquet")),
                        None,
                    )],
                }],
            )
            .await?;
    }
    let err = db
        .snapshot_at(Timestamp::new(0))
        .await
        .expect_err("timestamp before the first committed version should be unavailable");
    // Match on a reference so `err` is not partially moved and can still be
    // printed when the pattern does not match.
    assert!(
        matches!(
            &err,
            crate::transaction::SnapshotError::Manifest(ManifestError::VersionUnavailable {
                requested,
                oldest_available,
            }) if *requested == Timestamp::new(0) && *oldest_available == Timestamp::new(1)
        ),
        "expected VersionUnavailable {{ requested: 0, oldest_available: 1 }}, got {err:?}"
    );
    Ok(())
}
/// A snapshot taken at timestamp 1 keeps reporting that version after a later
/// commit replaces its SST, and it remains the only active pin.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn historical_snapshot_remains_stable_after_newer_manifest_commits()
-> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ])))
    .primary_key("id")
    .build()
    .expect("schema builder");
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("db init");
    let inner = db.inner();
    let table = inner.manifest_table;

    // First commit: SST 1 at L0.
    let first_entry = SstEntry::new(
        SsTableId::new(1),
        None,
        None,
        Path::from("L0/1.parquet"),
        None,
    );
    inner
        .manifest
        .apply_version_edits(
            table,
            &[VersionEdit::AddSsts {
                level: 0,
                entries: vec![first_entry],
            }],
        )
        .await?;
    let historical = db.snapshot_at(Timestamp::new(1)).await?;

    // Later commit: SST 1 is replaced by SST 2 at L1.
    let successor = [
        VersionEdit::RemoveSsts {
            level: 0,
            sst_ids: vec![SsTableId::new(1)],
        },
        VersionEdit::AddSsts {
            level: 1,
            entries: vec![SstEntry::new(
                SsTableId::new(2),
                None,
                None,
                Path::from("L1/2.parquet"),
                None,
            )],
        },
    ];
    inner.manifest.apply_version_edits(table, &successor).await?;

    let pinned_commit = historical
        .latest_version()
        .map(|version| version.commit_timestamp());
    assert_eq!(pinned_commit, Some(Timestamp::new(1)));
    assert_eq!(inner.active_snapshot_pins(), vec![Timestamp::new(1)]);
    Ok(())
}
/// A historical snapshot at timestamp 1 and a head snapshot taken after two
/// commits are held at the same time. The pinned root set must protect two
/// versions, both attributed to active snapshots.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multiple_concurrent_snapshots_pin_multiple_manifest_versions()
-> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ])))
    .primary_key("id")
    .build()
    .expect("schema builder");
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("db init");
    let inner = db.inner();
    let table = inner.manifest_table;

    // Two separate commits: SST 1 at L0, then SST 2 at L1.
    for (level, sst_id) in [(0u32, 1u64), (1, 2)] {
        let entry = SstEntry::new(
            SsTableId::new(sst_id),
            None,
            None,
            Path::from(format!("L{level}/{sst_id}.parquet")),
            None,
        );
        inner
            .manifest
            .apply_version_edits(
                table,
                &[VersionEdit::AddSsts {
                    level,
                    entries: vec![entry],
                }],
            )
            .await?;
    }

    let _v1 = db.snapshot_at(Timestamp::new(1)).await?;
    let _v2 = db.begin_snapshot().await?;

    let pins = inner.active_snapshot_pins();
    let protected = inner
        .manifest
        .current_root_set_with_pins(table, &pins)
        .await?;
    assert_eq!(protected.protected_version_count(), 2);
    assert_eq!(protected.active_snapshot_version_count(), 2);
    Ok(())
}
/// `begin_snapshot` pins the current head version (timestamp 1 after one
/// commit) and releases the pin when the snapshot is dropped.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn latest_snapshot_pinning_tracks_head_without_regressing_reads()
-> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ])))
    .primary_key("id")
    .build()
    .expect("schema builder");
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("db init");
    let inner = db.inner();

    // One commit: SST 1 at L0.
    let only_entry = SstEntry::new(
        SsTableId::new(1),
        None,
        None,
        Path::from("L0/1.parquet"),
        None,
    );
    inner
        .manifest
        .apply_version_edits(
            inner.manifest_table,
            &[VersionEdit::AddSsts {
                level: 0,
                entries: vec![only_entry],
            }],
        )
        .await?;

    let latest = db.begin_snapshot().await?;
    let observed = latest
        .latest_version()
        .map(|version| version.commit_timestamp());
    assert_eq!(observed, Some(Timestamp::new(1)));
    assert_eq!(inner.active_snapshot_pins(), vec![Timestamp::new(1)]);

    // Releasing the snapshot must release its pin.
    drop(latest);
    assert!(inner.active_snapshot_pins().is_empty());
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sweep_manifest_ssts_reclaims_obsolete_candidate_after_snapshot_pin_drops()
-> Result<(), Box<dyn std::error::Error>> {
let root_dir = workspace_temp_dir("snapshot-pin-sst-reclaim");
let root_str = root_dir.to_string_lossy().into_owned();
let executor = Arc::new(TokioExecutor::default());
let db = DB::<LocalFs, TokioExecutor>::builder(config_with_pk(
vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
],
&["id"],
))
.on_disk(root_str.clone())?
.open_with_executor(Arc::clone(&executor))
.await?
.into_inner();
let obsolete_path = local_sst_path(&db, "L0/obsolete.parquet");
let head_path = local_sst_path(&db, "L1/head.parquet");
write_local_sst_object(&obsolete_path, b"obsolete").await?;
write_local_sst_object(&head_path, b"head").await?;
db.manifest
.apply_version_edits(
db.manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![SstEntry::new(
SsTableId::new(5),
None,
None,
Path::from("L0/obsolete.parquet"),
None,
)],
}],
)
.await?;
db.manifest
.apply_version_edits(
db.manifest_table,
&[
VersionEdit::RemoveSsts {
level: 0,
sst_ids: vec![SsTableId::new(5)],
},
VersionEdit::AddSsts {
level: 1,
entries: vec![SstEntry::new(
SsTableId::new(6),
None,
None,
Path::from("L1/head.parquet"),
None,
)],
},
],
)
.await?;
let historical = db.snapshot_at(Timestamp::new(1)).await?;
let root_set = db
.manifest
.current_root_set_with_pins(db.manifest_table, &db.active_snapshot_pins())
.await?;
assert_eq!(root_set.protected_version_count(), 2);
assert_eq!(root_set.active_snapshot_version_count(), 1);
assert!(root_set.contains_path(&Path::from("L0/obsolete.parquet")));
assert!(root_set.contains_path(&Path::from("L1/head.parquet")));
db.manifest
.record_gc_plan(
db.manifest_table,
crate::manifest::GcPlanState {
obsolete_ssts: vec![crate::manifest::GcSstRef {
id: SsTableId::new(5),
level: 0,
data_path: Path::from("L0/obsolete.parquet"),
delete_path: None,
}],
obsolete_wal_segments: Vec::new(),
},
)
.await?;
let blocked = db.sweep_manifest_ssts().await?;
assert_eq!(blocked.deleted_objects, 0);
assert_eq!(blocked.deleted_bytes, 0);
assert!(local_sst_exists(&obsolete_path).await);
drop(historical);
let root_set = db
.manifest
.current_root_set_with_pins(db.manifest_table, &db.active_snapshot_pins())
.await?;
assert_eq!(root_set.protected_version_count(), 1);
assert!(!root_set.contains_path(&Path::from("L0/obsolete.parquet")));
let summary = db.sweep_manifest_ssts().await?;
assert_eq!(summary.deleted_objects, 1);
assert_eq!(summary.deleted_bytes, 8);
assert_eq!(summary.delete_failures, 0);
assert!(!local_sst_exists(&obsolete_path).await);
assert!(local_sst_exists(&head_path).await);
assert!(
db.manifest
.take_gc_plan_for_authorized_sweep(db.manifest_table)
.await?
.is_none(),
"reclaimed candidate should fully drain the staged plan",
);
if let Err(err) = fs::remove_dir_all(&root_dir) {
eprintln!("failed to clean temp dir {:?}: {err}", &root_dir);
}
Ok(())
}
/// A candidate blocked only by a historical snapshot survives a sweep that
/// deletes nothing. `sst_gc_status` must then report it as staged but
/// blocked, with two protected versions and one active-snapshot version.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sweep_manifest_ssts_requeues_historically_blocked_candidates_and_reports_status()
-> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ])))
    .primary_key("id")
    .build()
    .expect("schema builder");
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("db init");
    let inner = db.inner();
    let table = inner.manifest_table;
    let data_path = Path::from("L0/5.parquet");
    let delete_path = Path::from("L0/5.delete.parquet");

    // First commit: SST 5 at L0 with data and delete objects.
    let original = SstEntry::new(
        SsTableId::new(5),
        None,
        None,
        data_path.clone(),
        Some(delete_path.clone()),
    );
    inner
        .manifest
        .apply_version_edits(
            table,
            &[VersionEdit::AddSsts {
                level: 0,
                entries: vec![original],
            }],
        )
        .await?;

    // Second commit: SST 5 is replaced by SST 6 at L1.
    let replacement = [
        VersionEdit::RemoveSsts {
            level: 0,
            sst_ids: vec![SsTableId::new(5)],
        },
        VersionEdit::AddSsts {
            level: 1,
            entries: vec![SstEntry::new(
                SsTableId::new(6),
                None,
                None,
                Path::from("L1/6.parquet"),
                None,
            )],
        },
    ];
    inner.manifest.apply_version_edits(table, &replacement).await?;

    // Stage SST 5, then pin timestamp 1 before sweeping.
    let staged = crate::manifest::GcPlanState {
        obsolete_ssts: vec![crate::manifest::GcSstRef {
            id: SsTableId::new(5),
            level: 0,
            data_path,
            delete_path: Some(delete_path),
        }],
        obsolete_wal_segments: Vec::new(),
    };
    inner.manifest.record_gc_plan(table, staged).await?;
    let _historical = db.snapshot_at(Timestamp::new(1)).await?;

    let summary = inner.sweep_manifest_ssts().await?;
    assert_eq!(summary.deleted_objects, 0);
    assert_eq!(summary.deleted_bytes, 0);
    assert_eq!(summary.delete_failures, 0);

    let status = db
        .sst_gc_status()
        .await?
        .expect("blocked gc candidates should stay staged");
    assert_eq!(status.staged_sst_candidates, 1);
    assert_eq!(status.authorized_sst_candidates, 0);
    assert_eq!(status.blocked_sst_candidates, 1);
    assert_eq!(status.protected_versions, 2);
    assert_eq!(status.active_snapshot_versions, 1);
    Ok(())
}
/// A plan that names one live SST and one unreachable SST. The sweep must
/// delete only the unreachable SST's data and delete objects (13 + 15 = 28
/// bytes) and mirror the result into the attached compaction metrics.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sweep_manifest_ssts_deletes_only_unreachable_objects_and_records_metrics()
-> Result<(), Box<dyn std::error::Error>> {
    let temp_root = workspace_temp_dir("sst-sweep-live-vs-unreachable");
    let mut db = DB::<LocalFs, TokioExecutor>::builder(config_with_pk(
        vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ],
        &["id"],
    ))
    .on_disk(temp_root.to_string_lossy().into_owned())?
    .open_with_executor(Arc::new(TokioExecutor::default()))
    .await?
    .into_inner();
    let metrics = Arc::new(CompactionMetrics::new());
    db.compaction_metrics = Some(Arc::clone(&metrics));
    let table = db.manifest_table;

    // SST 1 is live at L1.
    let live_entry = SstEntry::new(
        SsTableId::new(1),
        None,
        None,
        Path::from("L1/live.parquet"),
        None,
    );
    db.manifest
        .apply_version_edits(
            table,
            &[VersionEdit::AddSsts {
                level: 1,
                entries: vec![live_entry],
            }],
        )
        .await?;

    // Physical objects: 4-byte live file, 13-byte data and 15-byte delete file.
    let live_path = local_sst_path(&db, "L1/live.parquet");
    let obsolete_data = local_sst_path(&db, "L0/obsolete.parquet");
    let obsolete_delete = local_sst_path(&db, "L0/obsolete.delete.parquet");
    write_local_sst_object(&live_path, b"live").await?;
    write_local_sst_object(&obsolete_data, b"obsolete-data").await?;
    write_local_sst_object(&obsolete_delete, b"obsolete-delete").await?;

    let live_candidate = crate::manifest::GcSstRef {
        id: SsTableId::new(1),
        level: 1,
        data_path: Path::from("L1/live.parquet"),
        delete_path: None,
    };
    let unreachable_candidate = crate::manifest::GcSstRef {
        id: SsTableId::new(9),
        level: 0,
        data_path: Path::from("L0/obsolete.parquet"),
        delete_path: Some(Path::from("L0/obsolete.delete.parquet")),
    };
    db.manifest
        .record_gc_plan(
            table,
            crate::manifest::GcPlanState {
                obsolete_ssts: vec![live_candidate, unreachable_candidate],
                obsolete_wal_segments: Vec::new(),
            },
        )
        .await?;

    let summary = db.sweep_manifest_ssts().await?;
    assert_eq!(summary.deleted_objects, 2);
    assert_eq!(summary.deleted_bytes, 28);
    assert_eq!(summary.delete_failures, 0);
    assert!(local_sst_exists(&live_path).await);
    assert!(!local_sst_exists(&obsolete_data).await);
    assert!(!local_sst_exists(&obsolete_delete).await);

    let recorded = metrics.snapshot();
    assert_eq!(recorded.sst_sweep_runs, 1);
    assert_eq!(recorded.sst_deleted_objects, 2);
    assert_eq!(recorded.sst_deleted_bytes, 28);
    assert_eq!(recorded.sst_delete_failures, 0);
    assert!(recorded.sst_sweep_duration_ms_total >= summary.duration_ms);

    fs::remove_dir_all(temp_root)?;
    Ok(())
}
/// Re-recording and re-sweeping a plan whose objects are already gone must be
/// harmless. The replay deletes nothing and reports no failures.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sweep_manifest_ssts_is_idempotent_when_candidates_are_replayed()
-> Result<(), Box<dyn std::error::Error>> {
    let temp_root = workspace_temp_dir("sst-sweep-idempotent");
    let db = DB::<LocalFs, TokioExecutor>::builder(config_with_pk(
        vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ],
        &["id"],
    ))
    .on_disk(temp_root.to_string_lossy().into_owned())?
    .open_with_executor(Arc::new(TokioExecutor::default()))
    .await?
    .into_inner();

    // SST 11's objects: 7-byte data file and 6-byte delete file.
    let data_object = local_sst_path(&db, "L0/replay.parquet");
    let delete_object = local_sst_path(&db, "L0/replay.delete.parquet");
    write_local_sst_object(&data_object, b"payload").await?;
    write_local_sst_object(&delete_object, b"delete").await?;

    let replayed = crate::manifest::GcPlanState {
        obsolete_ssts: vec![crate::manifest::GcSstRef {
            id: SsTableId::new(11),
            level: 0,
            data_path: Path::from("L0/replay.parquet"),
            delete_path: Some(Path::from("L0/replay.delete.parquet")),
        }],
        obsolete_wal_segments: Vec::new(),
    };

    // Pass 1 deletes both objects (7 + 6 = 13 bytes). Pass 2 replays the same
    // plan against objects that no longer exist.
    for (expected_objects, expected_bytes) in [(2, 13), (0, 0)] {
        db.manifest
            .record_gc_plan(db.manifest_table, replayed.clone())
            .await?;
        let summary = db.sweep_manifest_ssts().await?;
        assert_eq!(summary.deleted_objects, expected_objects);
        assert_eq!(summary.deleted_bytes, expected_bytes);
        assert_eq!(summary.delete_failures, 0);
    }

    fs::remove_dir_all(temp_root)?;
    Ok(())
}
/// A plan mixing a live SST and an unreachable SST is recorded and swept
/// twice. The live object must survive both passes, and only the first pass
/// deletes anything (the 8-byte unreachable object).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sweep_manifest_ssts_preserves_live_objects_across_repeated_replays()
-> Result<(), Box<dyn std::error::Error>> {
    let temp_root = workspace_temp_dir("sst-sweep-live-replayed");
    let db = DB::<LocalFs, TokioExecutor>::builder(config_with_pk(
        vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ],
        &["id"],
    ))
    .on_disk(temp_root.to_string_lossy().into_owned())?
    .open_with_executor(Arc::new(TokioExecutor::default()))
    .await?
    .into_inner();
    let table = db.manifest_table;

    // SST 1 is live at L1.
    let live_entry = SstEntry::new(
        SsTableId::new(1),
        None,
        None,
        Path::from("L1/live.parquet"),
        None,
    );
    db.manifest
        .apply_version_edits(
            table,
            &[VersionEdit::AddSsts {
                level: 1,
                entries: vec![live_entry],
            }],
        )
        .await?;

    let live_path = local_sst_path(&db, "L1/live.parquet");
    let obsolete_path = local_sst_path(&db, "L0/obsolete.parquet");
    write_local_sst_object(&live_path, b"live").await?;
    write_local_sst_object(&obsolete_path, b"obsolete").await?;

    let live_candidate = crate::manifest::GcSstRef {
        id: SsTableId::new(1),
        level: 1,
        data_path: Path::from("L1/live.parquet"),
        delete_path: None,
    };
    let unreachable_candidate = crate::manifest::GcSstRef {
        id: SsTableId::new(9),
        level: 0,
        data_path: Path::from("L0/obsolete.parquet"),
        delete_path: None,
    };
    let replayed_gc_plan = crate::manifest::GcPlanState {
        obsolete_ssts: vec![live_candidate, unreachable_candidate],
        obsolete_wal_segments: Vec::new(),
    };

    // First pass: only the unreachable 8-byte object goes away.
    db.manifest
        .record_gc_plan(table, replayed_gc_plan.clone())
        .await?;
    let initial = db.sweep_manifest_ssts().await?;
    assert_eq!(initial.deleted_objects, 1);
    assert_eq!(initial.deleted_bytes, 8);
    assert_eq!(initial.delete_failures, 0);
    assert!(local_sst_exists(&live_path).await);
    assert!(!local_sst_exists(&obsolete_path).await);

    // Replay: nothing left to delete, and the live object is still untouched.
    db.manifest.record_gc_plan(table, replayed_gc_plan).await?;
    let replay = db.sweep_manifest_ssts().await?;
    assert_eq!(replay.deleted_objects, 0);
    assert_eq!(replay.deleted_bytes, 0);
    assert_eq!(replay.delete_failures, 0);
    assert!(local_sst_exists(&live_path).await);

    fs::remove_dir_all(temp_root)?;
    Ok(())
}
/// Merging a second GC plan into a persisted one keeps the earlier candidates
/// and adds new ones without duplicating SST 2, which appears in both plans.
/// It also carries over the new plan's obsolete WAL segment.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn merge_gc_plan_preserves_prior_candidates_and_deduplicates()
-> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ])))
    .primary_key("id")
    .build()
    .expect("schema builder");
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("db init");
    let inner = db.inner();
    let table = inner.manifest_table;

    // SST 2 is staged by both merges; build an identical ref each time.
    let sst_two = || crate::manifest::GcSstRef {
        id: SsTableId::new(2),
        level: 1,
        data_path: Path::from("L1/2.parquet"),
        delete_path: Some(Path::from("L1/2.delete.parquet")),
    };

    // First merge: SSTs 1 and 2.
    let opening = crate::manifest::GcPlanState {
        obsolete_ssts: vec![
            crate::manifest::GcSstRef {
                id: SsTableId::new(1),
                level: 0,
                data_path: Path::from("L0/1.parquet"),
                delete_path: None,
            },
            sst_two(),
        ],
        obsolete_wal_segments: Vec::new(),
    };
    inner.manifest.merge_gc_plan(table, opening).await?;

    let first = db
        .inspect_sst_gc_plan()
        .await?
        .expect("first persisted plan");
    assert_eq!(first.staged_sst_candidates, 2);
    let first_ids: Vec<_> = first.candidates.iter().map(|c| c.sst_id).collect();
    assert_eq!(first_ids, vec![1, 2]);

    // Second merge: SST 2 again, SST 9, plus one obsolete WAL segment.
    let follow_up = crate::manifest::GcPlanState {
        obsolete_ssts: vec![
            sst_two(),
            crate::manifest::GcSstRef {
                id: SsTableId::new(9),
                level: 2,
                data_path: Path::from("L2/9.parquet"),
                delete_path: None,
            },
        ],
        obsolete_wal_segments: vec![WalSegmentRef::new(
            7,
            FileIdGenerator::default().generate(),
            0,
            0,
        )],
    };
    inner.manifest.merge_gc_plan(table, follow_up).await?;

    let second = db
        .inspect_sst_gc_plan()
        .await?
        .expect("second persisted plan");
    assert_eq!(second.staged_sst_candidates, 3);
    let second_ids: Vec<_> = second.candidates.iter().map(|c| c.sst_id).collect();
    assert_eq!(second_ids, vec![1, 2, 9]);
    assert_eq!(second.obsolete_wal_segments, 1);
    Ok(())
}
/// Documents a gap between physical storage and manifest state. An SST object
/// written to disk but never referenced by the manifest (`L0/stale.parquet`)
/// is not visible through any persisted GC plan.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn physical_stale_sst_debt_can_exist_while_persisted_gc_plan_is_empty()
-> Result<(), Box<dyn std::error::Error>> {
    let root_dir = workspace_temp_dir("physical-stale-debt-plan-empty");
    let db = DB::<LocalFs, TokioExecutor>::builder(config_with_pk(
        vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ],
        &["id"],
    ))
    .on_disk(root_dir.to_string_lossy().into_owned())?
    .open_with_executor(Arc::new(TokioExecutor::default()))
    .await?
    .into_inner();
    let table = db.manifest_table;

    // Only SST 7 is registered in the manifest.
    let registered = SstEntry::new(
        SsTableId::new(7),
        None,
        None,
        Path::from("L1/live.parquet"),
        None,
    );
    db.manifest
        .apply_version_edits(
            table,
            &[VersionEdit::AddSsts {
                level: 1,
                entries: vec![registered],
            }],
        )
        .await?;

    // Two objects are on disk: the registered one and an orphan.
    let live_path = local_sst_path(&db, "L1/live.parquet");
    let stale_path = local_sst_path(&db, "L0/stale.parquet");
    write_local_sst_object(&live_path, b"live").await?;
    write_local_sst_object(&stale_path, b"stale").await?;

    let physical_sst_objects = count_local_sst_objects(&root_dir)?;
    let latest = db
        .manifest
        .snapshot_latest(table)
        .await?
        .latest_version
        .expect("latest version");
    let live_sst_objects: usize = latest.ssts().iter().map(|level| level.len()).sum();

    assert_eq!(physical_sst_objects, 2);
    assert_eq!(live_sst_objects, 1);
    assert!(
        db.inspect_sst_gc_plan().await?.is_none(),
        "no persisted GC plan should still be visible in this scenario",
    );

    fs::remove_dir_all(root_dir)?;
    Ok(())
}
// Runs one compaction of L0 SSTs 1 and 2 into L1 SST 3 with no snapshot pinned.
// The assertions pin that, by the time the caller inspects the GC plan, the
// compaction path has already staged, taken, and swept the two replaced inputs:
// the persisted plan is empty, both input objects are gone, and the metrics show
// exactly one plan write, one plan take, and one sweep run.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compaction_can_drain_gc_plan_before_later_observation_point()
-> Result<(), Box<dyn std::error::Error>> {
let root_dir = workspace_temp_dir("compaction-drains-gc-plan-early");
let root_str = root_dir.to_string_lossy().into_owned();
let mut inner = DB::<LocalFs, TokioExecutor>::builder(config_with_pk(
vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
],
&["id"],
))
.on_disk(root_str)?
.open_with_executor(Arc::new(TokioExecutor::default()))
.await?
.into_inner();
// Attach metrics so the GC-plan and sweep counters can be observed afterwards.
let metrics = Arc::new(CompactionMetrics::new());
inner.compaction_metrics = Some(Arc::clone(&metrics));
// One commit registering the two L0 inputs.
inner
.manifest
.apply_version_edits(
inner.manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![
SstEntry::new(
SsTableId::new(1),
None,
None,
Path::from("L0/1.parquet"),
None,
),
SstEntry::new(
SsTableId::new(2),
None,
None,
Path::from("L0/2.parquet"),
None,
),
],
}],
)
.await?;
// Physical objects for both inputs; the compaction below is expected to delete them.
let obsolete_one = local_sst_path(&inner, "L0/1.parquet");
let obsolete_two = local_sst_path(&inner, "L0/2.parquet");
write_local_sst_object(&obsolete_one, b"one").await?;
write_local_sst_object(&obsolete_two, b"two").await?;
let planner = OneShotPlanner::new(CompactionTask {
source_level: 0,
target_level: 1,
input: vec![
CompactionInput {
level: 0,
sst_id: SsTableId::new(1),
},
CompactionInput {
level: 0,
sst_id: SsTableId::new(2),
},
],
key_range: None,
});
// NOTE(review): StaticExecutor is defined elsewhere in this file; presumably it
// reports the listed output descriptor without reading the inputs — confirm.
let executor = StaticExecutor {
outputs: vec![
SsTableDescriptor::new(SsTableId::new(3), 1)
.with_storage_paths(Path::from("L1/3.parquet"), None),
],
wal_segments: Vec::new(),
target_level: 1,
};
inner
.run_compaction_task(&planner, &executor)
.await?
.expect("compaction outcome");
// No snapshot is held in this test, so nothing should remain staged or blocked.
assert!(
inner.inspect_sst_gc_plan().await?.is_none(),
"the compaction path should have already staged and drained the plan",
);
assert!(!local_sst_exists(&obsolete_one).await);
assert!(!local_sst_exists(&obsolete_two).await);
// Exactly one write, one take, and one sweep, covering both replaced inputs.
let snapshot = metrics.snapshot();
assert_eq!(snapshot.gc_plan_write_runs, 1);
assert_eq!(snapshot.gc_plan_written_sst_candidates, 2);
assert_eq!(snapshot.gc_plan_take_runs, 1);
assert_eq!(snapshot.gc_plan_taken_sst_candidates, 2);
assert_eq!(snapshot.gc_plan_authorized_sst_candidates, 2);
assert_eq!(snapshot.gc_plan_blocked_sst_candidates, 0);
assert_eq!(snapshot.sst_sweep_runs, 1);
fs::remove_dir_all(root_dir)?;
Ok(())
}
// Fault-injection test. The durable FS is a FailOnceRemoveFs targeting SST 17's
// delete sidecar. The first sweep removes the 12-byte data object but fails on
// the 14-byte sidecar, and must re-queue SST 17 as a still-authorized candidate.
// The retry sweep removes the sidecar and drains the plan. Metrics accumulate
// across both runs (2 runs, 2 objects, 26 bytes, 1 failure).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sweep_manifest_ssts_requeues_partial_delete_failures_and_recovers_on_retry()
-> Result<(), Box<dyn std::error::Error>> {
let temp_root = workspace_temp_dir("sst-sweep-partial-retry");
let root_str = temp_root.to_string_lossy().into_owned();
// Absolute filesystem path of the delete sidecar. This assumes SST objects live
// under `<root>/sst/...`, which presumably matches local_sst_path — confirm.
let failing_delete_path = temp_root
.join("sst")
.join("L0")
.join("partial.delete.parquet");
let failing_delete_path = Path::from_filesystem_path(&failing_delete_path)?;
// Per the `has_failed_once` assertion below, removing this path fails on the
// first attempt and later attempts go through.
let failing_fs = Arc::new(FailOnceRemoveFs::new(
failing_delete_path.as_ref().to_string(),
));
let mut db = DB::<FailOnceRemoveFs, TokioExecutor>::builder(config_with_pk(
vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
],
&["id"],
))
.on_durable_fs(Arc::clone(&failing_fs), root_str)?
.open_with_executor(Arc::new(TokioExecutor::default()))
.await?
.into_inner();
let metrics = Arc::new(CompactionMetrics::new());
db.compaction_metrics = Some(Arc::clone(&metrics));
// Data object: "partial-data" (12 bytes); sidecar: "partial-delete" (14 bytes).
let obsolete_data = local_sst_path(&db, "L0/partial.parquet");
let obsolete_delete = local_sst_path(&db, "L0/partial.delete.parquet");
write_local_sst_object(&obsolete_data, b"partial-data").await?;
write_local_sst_object(&obsolete_delete, b"partial-delete").await?;
// SST 17 never appears in any manifest version, so it is not in the root set.
let gc_plan = crate::manifest::GcPlanState {
obsolete_ssts: vec![crate::manifest::GcSstRef {
id: SsTableId::new(17),
level: 0,
data_path: Path::from("L0/partial.parquet"),
delete_path: Some(Path::from("L0/partial.delete.parquet")),
}],
obsolete_wal_segments: Vec::new(),
};
db.manifest
.record_gc_plan(db.manifest_table, gc_plan.clone())
.await?;
// First sweep: the data object goes, the sidecar delete fails.
let first = db.sweep_manifest_ssts().await?;
assert_eq!(first.deleted_objects, 1);
assert_eq!(first.deleted_bytes, 12);
assert_eq!(first.delete_failures, 1);
assert!(failing_fs.has_failed_once());
assert!(!local_sst_exists(&obsolete_data).await);
assert!(local_sst_exists(&obsolete_delete).await);
// The partially deleted SST must be re-staged and remain authorized.
let residual = db.inspect_sst_gc_plan().await?.expect("re-queued gc plan");
assert_eq!(residual.staged_sst_candidates, 1);
assert_eq!(residual.authorized_sst_candidates, 1);
assert_eq!(residual.blocked_sst_candidates, 0);
assert_eq!(residual.obsolete_wal_segments, 0);
assert_eq!(residual.candidates[0].sst_id, 17);
assert!(residual.candidates[0].authorized);
// Retry: only the sidecar remains, and this time its delete succeeds.
let second = db.sweep_manifest_ssts().await?;
assert_eq!(second.deleted_objects, 1);
assert_eq!(second.deleted_bytes, 14);
assert_eq!(second.delete_failures, 0);
assert!(!local_sst_exists(&obsolete_data).await);
assert!(!local_sst_exists(&obsolete_delete).await);
assert!(
db.inspect_sst_gc_plan().await?.is_none(),
"retry should fully drain the staged SST candidate once the delete succeeds",
);
// Metrics are cumulative across both sweeps (12 + 14 = 26 bytes).
let snapshot = metrics.snapshot();
assert_eq!(snapshot.sst_sweep_runs, 2);
assert_eq!(snapshot.sst_deleted_objects, 2);
assert_eq!(snapshot.sst_deleted_bytes, 26);
assert_eq!(snapshot.sst_delete_failures, 1);
fs::remove_dir_all(temp_root)?;
Ok(())
}
/// Verifies that a sweep with no authorized SST candidates deletes nothing and leaves the
/// blocked candidate recorded in the GC plan.
///
/// SST 44 is first added to L1 of the live version and then staged for GC. The plan
/// reports it as blocked, presumably because the live version still references it. The
/// sweep must report zero deletions and zero failures, and the persisted plan must still
/// hold exactly that candidate, unauthorized.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sweep_manifest_ssts_keeps_blocked_candidates_durable_when_nothing_is_authorized()
-> Result<(), Box<dyn std::error::Error>> {
    // Propagate DB initialization errors through the test's `Result` rather than
    // panicking, matching how the other tests in this module open their DB.
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(
        SchemaBuilder::from_schema(Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ])))
        .primary_key("id")
        .build()
        .expect("schema builder"),
        Arc::new(NoopExecutor),
    )
    .await?;
    // Make SST 44 part of the live version at L1.
    db.inner()
        .manifest
        .apply_version_edits(
            db.inner().manifest_table,
            &[VersionEdit::AddSsts {
                level: 1,
                entries: vec![SstEntry::new(
                    SsTableId::new(44),
                    None,
                    None,
                    Path::from("L1/44.parquet"),
                    None,
                )],
            }],
        )
        .await?;
    // Stage the same SST as an obsolete GC candidate.
    db.inner()
        .manifest
        .record_gc_plan(
            db.inner().manifest_table,
            crate::manifest::GcPlanState {
                obsolete_ssts: vec![crate::manifest::GcSstRef {
                    id: SsTableId::new(44),
                    level: 1,
                    data_path: Path::from("L1/44.parquet"),
                    delete_path: None,
                }],
                obsolete_wal_segments: Vec::new(),
            },
        )
        .await?;
    let summary = db.inner().sweep_manifest_ssts().await?;
    assert_eq!(summary.deleted_objects, 0);
    assert_eq!(summary.deleted_bytes, 0);
    assert_eq!(summary.delete_failures, 0);
    let persisted = db
        .inspect_sst_gc_plan()
        .await?
        .expect("blocked candidate should remain durable");
    assert_eq!(persisted.staged_sst_candidates, 1);
    assert_eq!(persisted.authorized_sst_candidates, 0);
    assert_eq!(persisted.blocked_sst_candidates, 1);
    // The surviving candidate must be the one staged above, and still unauthorized;
    // counts alone would not catch a different candidate being persisted.
    assert_eq!(persisted.candidates[0].sst_id, 44);
    assert!(!persisted.candidates[0].authorized);
    Ok(())
}
/// Verifies how a sweep handles SST candidates whose files do not exist on disk.
///
/// The sweep must treat them as already reclaimed: no deletions and no failures are
/// reported, and the SST candidate is dropped from the GC plan. The obsolete WAL segment
/// staged alongside it must be written back as the only remaining work.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sweep_manifest_ssts_treats_missing_files_as_already_reclaimed_and_requeues_wal()
-> Result<(), Box<dyn std::error::Error>> {
    let temp_root = workspace_temp_dir("sst-sweep-missing");
    let root_str = temp_root.to_string_lossy().into_owned();
    let db = DB::<LocalFs, TokioExecutor>::builder(config_with_pk(
        vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ],
        &["id"],
    ))
    .on_disk(root_str)?
    .open_with_executor(Arc::new(TokioExecutor::default()))
    .await?
    .into_inner();
    // An obsolete WAL segment reference to stage next to the SST candidate.
    let file_ids = FileIdGenerator::default();
    let wal_ref = WalSegmentRef::new(7, file_ids.generate(), 0, 0);
    // Stage SST 77; its data and delete files are never written to disk.
    db.manifest
        .record_gc_plan(
            db.manifest_table,
            crate::manifest::GcPlanState {
                obsolete_ssts: vec![crate::manifest::GcSstRef {
                    id: SsTableId::new(77),
                    level: 0,
                    data_path: Path::from("L0/missing.parquet"),
                    delete_path: Some(Path::from("L0/missing.delete.parquet")),
                }],
                obsolete_wal_segments: vec![wal_ref.clone()],
            },
        )
        .await?;
    // Missing files count neither as deletions nor as failures.
    let summary = db.sweep_manifest_ssts().await?;
    assert_eq!(summary.deleted_objects, 0);
    assert_eq!(summary.deleted_bytes, 0);
    assert_eq!(summary.delete_failures, 0);
    // The SST candidate is gone; only the WAL segment was written back to the plan.
    let residual = db
        .manifest
        .take_gc_plan_for_authorized_sweep(db.manifest_table)
        .await?
        .expect("wal candidates should be re-queued");
    assert!(residual.obsolete_ssts.is_empty());
    assert_eq!(residual.obsolete_wal_segments, vec![wal_ref]);
    fs::remove_dir_all(temp_root)?;
    Ok(())
}
/// Verifies that the sweep still resolves and deletes a GC candidate recorded with an
/// absolute path.
///
/// The candidate's `data_path` is the full local path returned by `local_sst_path`. This
/// is the "legacy absolute" form named by this test, as opposed to the root-relative
/// `L0/...` paths used by the other sweep tests. The 6-byte object must be deleted.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sweep_manifest_ssts_reclaims_legacy_absolute_candidate_paths()
-> Result<(), Box<dyn std::error::Error>> {
    let temp_root = workspace_temp_dir("sst-sweep-legacy-absolute-paths");
    let root_str = temp_root.to_string_lossy().into_owned();
    let db = DB::<LocalFs, TokioExecutor>::builder(config_with_pk(
        vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ],
        &["id"],
    ))
    .on_disk(root_str)?
    .open_with_executor(Arc::new(TokioExecutor::default()))
    .await?
    .into_inner();
    let obsolete_path = local_sst_path(&db, "L0/legacy-absolute.parquet");
    write_local_sst_object(&obsolete_path, b"legacy").await?;
    // Record the candidate using the full local path rather than a root-relative one.
    db.manifest
        .record_gc_plan(
            db.manifest_table,
            crate::manifest::GcPlanState {
                obsolete_ssts: vec![crate::manifest::GcSstRef {
                    id: SsTableId::new(99),
                    level: 0,
                    data_path: obsolete_path.clone(),
                    delete_path: None,
                }],
                obsolete_wal_segments: Vec::new(),
            },
        )
        .await?;
    let summary = db.sweep_manifest_ssts().await?;
    assert_eq!(summary.deleted_objects, 1);
    assert_eq!(summary.deleted_bytes, 6);
    assert_eq!(summary.delete_failures, 0);
    assert!(!local_sst_exists(&obsolete_path).await);
    fs::remove_dir_all(temp_root)?;
    Ok(())
}
/// Exercises the sweep hooks on the `DB` wrapper rather than on `DbInner`.
///
/// One 4-byte SST is staged for GC on the inner handle, which is then wrapped with
/// `DB::from_inner`. `DB::sweep_sst_objects` must delete the SST, and
/// `DB::compaction_metrics_snapshot` must reflect exactly that one sweep run.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hidden_db_gc_hooks_expose_sweep_summary_and_metrics()
-> Result<(), Box<dyn std::error::Error>> {
    let temp_root = workspace_temp_dir("sst-sweep-hidden-db-hooks");
    let root_str = temp_root.to_string_lossy().into_owned();
    let mut inner = DB::<LocalFs, TokioExecutor>::builder(config_with_pk(
        vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ],
        &["id"],
    ))
    .on_disk(root_str)?
    .open_with_executor(Arc::new(TokioExecutor::default()))
    .await?
    .into_inner();
    // Metrics must be installed on the inner handle before it is wrapped in an `Arc`.
    let metrics = Arc::new(CompactionMetrics::new());
    inner.compaction_metrics = Some(Arc::clone(&metrics));
    let obsolete_path = local_sst_path(&inner, "L0/hook.parquet");
    write_local_sst_object(&obsolete_path, b"hook").await?;
    inner
        .manifest
        .record_gc_plan(
            inner.manifest_table,
            crate::manifest::GcPlanState {
                obsolete_ssts: vec![crate::manifest::GcSstRef {
                    id: SsTableId::new(88),
                    level: 0,
                    data_path: Path::from("L0/hook.parquet"),
                    delete_path: None,
                }],
                obsolete_wal_segments: Vec::new(),
            },
        )
        .await?;
    // From here on, only the wrapper's API is used.
    let db = DB::from_inner(Arc::new(inner));
    let summary = db.sweep_sst_objects().await?;
    assert_eq!(summary.deleted_objects, 1);
    assert_eq!(summary.deleted_bytes, 4);
    assert_eq!(summary.delete_failures, 0);
    let snapshot = db
        .compaction_metrics_snapshot()
        .expect("compaction metrics snapshot");
    assert_eq!(snapshot.sst_sweep_runs, 1);
    assert_eq!(snapshot.sst_deleted_objects, 1);
    assert_eq!(snapshot.sst_deleted_bytes, 4);
    assert_eq!(snapshot.sst_delete_failures, 0);
    fs::remove_dir_all(temp_root)?;
    Ok(())
}
/// Compacts three overlapping L0 SSTs into L1 and checks that, for each key, the version
/// with the newest commit wins, including tombstones.
///
/// Inputs:
/// - SST 1 holds `k = 1` at commit 10.
/// - SST 2 holds `k = 2` at commit 20.
/// - SST 3 holds a tombstone for `k` at commit 30, plus `z = 9` at commit 25.
///
/// Reading every output at `Timestamp::MAX` must yield only `z = 9` as live data, and the
/// delete side must still carry `k`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compaction_merges_overlap_heavy_inputs() -> Result<(), Box<dyn std::error::Error>> {
    let temp_root = workspace_temp_dir("compaction-overlap-heavy");
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    // Key extractor over column 0 (`id`).
    let extractor =
        crate::extractor::projection_for_field(Arc::clone(&schema), 0).expect("extractor");
    let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
        .primary_key("id")
        .build()
        .expect("schema builder");
    // SST files live on local disk under `<temp_root>/sst`; the DB below (and so its
    // manifest) uses an in-memory filesystem.
    let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
    let sst_root = temp_root.join("sst");
    fs::create_dir_all(&sst_root)?;
    let sst_root = Path::from_filesystem_path(&sst_root)?;
    let sst_cfg = Arc::new(
        SsTableConfig::new(Arc::clone(&schema), fs, sst_root).with_key_extractor(extractor.into()),
    );
    // Row tuples are (key, value, commit_ts, is_tombstone).
    let seg_a = segment_with_commits(Arc::clone(&schema), vec![("k".to_string(), 1, 10, false)]);
    let seg_b = segment_with_commits(Arc::clone(&schema), vec![("k".to_string(), 2, 20, false)]);
    let seg_c = segment_with_commits(
        Arc::clone(&schema),
        vec![
            ("k".to_string(), 0, 30, true),
            ("z".to_string(), 9, 25, false),
        ],
    );
    let mut builder_a = SsTableBuilder::new(
        Arc::clone(&sst_cfg),
        SsTableDescriptor::new(SsTableId::new(1), 0),
    );
    builder_a.add_immutable(&seg_a)?;
    let sst_a = builder_a.finish(NoopExecutor).await?;
    let mut builder_b = SsTableBuilder::new(
        Arc::clone(&sst_cfg),
        SsTableDescriptor::new(SsTableId::new(2), 0),
    );
    builder_b.add_immutable(&seg_b)?;
    let sst_b = builder_b.finish(NoopExecutor).await?;
    let mut builder_c = SsTableBuilder::new(
        Arc::clone(&sst_cfg),
        SsTableDescriptor::new(SsTableId::new(3), 0),
    );
    builder_c.add_immutable(&seg_c)?;
    let sst_c = builder_c.finish(NoopExecutor).await?;
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(mode_cfg, Arc::new(NoopExecutor)).await?;
    // Register each built SST at L0 so the compaction can resolve its inputs.
    for sst in [&sst_a, &sst_b, &sst_c] {
        let desc = sst.descriptor();
        let entry = SstEntry::new(
            desc.id().clone(),
            desc.stats().cloned(),
            desc.wal_ids().map(|ids| ids.to_vec()),
            desc.data_path()
                .expect("input descriptor missing data path")
                .clone(),
            desc.delete_path().cloned(),
        );
        db.inner()
            .manifest
            .apply_version_edits(
                db.inner().manifest_table,
                &[VersionEdit::AddSsts {
                    level: 0,
                    entries: vec![entry],
                }],
            )
            .await?;
    }
    // Compact all three L0 inputs into L1 in a single task.
    let task = CompactionTask {
        source_level: 0,
        target_level: 1,
        input: vec![
            CompactionInput {
                level: 0,
                sst_id: SsTableId::new(1),
            },
            CompactionInput {
                level: 0,
                sst_id: SsTableId::new(2),
            },
            CompactionInput {
                level: 0,
                sst_id: SsTableId::new(3),
            },
        ],
        key_range: None,
    };
    // `OneShotPlanner` is defined elsewhere in this file; presumably it yields `task` once.
    let planner = OneShotPlanner::new(task);
    let executor = LocalCompactionExecutor::new(Arc::clone(&sst_cfg), 100);
    let outcome = db
        .inner()
        .run_compaction_task(&planner, &executor)
        .await?
        .expect("compaction outcome");
    // Collect live (key, value) pairs and tombstoned keys across every output SST.
    let mut data_keys = Vec::new();
    let mut data_vals = Vec::new();
    let mut delete_keys = Vec::new();
    for desc in &outcome.outputs {
        let reader = SsTableReader::open(Arc::clone(&sst_cfg), desc.clone()).await?;
        let mut stream = reader
            .into_stream(Timestamp::MAX, None, NoopExecutor)
            .await?;
        while let Some(batch) = stream.next().await {
            let batch = batch?;
            if batch.data.num_rows() > 0 {
                let ids = batch
                    .data
                    .column(0)
                    .as_any()
                    .downcast_ref::<arrow_array::StringArray>()
                    .expect("string ids");
                let vals = batch
                    .data
                    .column(1)
                    .as_any()
                    .downcast_ref::<arrow_array::Int32Array>()
                    .expect("int vals");
                for i in 0..batch.data.num_rows() {
                    data_keys.push(ids.value(i).to_string());
                    data_vals.push(vals.value(i));
                }
            }
            if let Some(delete) = batch.delete.as_ref()
                && delete.num_rows() > 0
            {
                let ids = delete
                    .column(0)
                    .as_any()
                    .downcast_ref::<arrow_array::StringArray>()
                    .expect("string delete ids");
                for i in 0..delete.num_rows() {
                    delete_keys.push(ids.value(i).to_string());
                }
            }
        }
    }
    // `k` was deleted at commit 30, after both of its puts, so only `z` survives as data.
    assert_eq!(data_keys, vec!["z"]);
    assert_eq!(data_vals, vec![9]);
    assert!(delete_keys.contains(&"k".to_string()));
    fs::remove_dir_all(&temp_root)?;
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compaction_self_kick_advances_without_periodic_tick()
-> Result<(), Box<dyn std::error::Error>> {
let db_root = workspace_temp_dir("compaction-self-kick");
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let config = SchemaBuilder::from_schema(Arc::clone(&schema))
.primary_key("id")
.with_metadata()
.build()
.expect("schema builder");
let db: DbInner<LocalFs, TokioExecutor> = DB::<LocalFs, TokioExecutor>::builder(config)
.on_disk(&db_root)?
.disable_minor_compaction()
.build()
.await?
.into_inner();
let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
let sst_root = db_root.join("sst");
fs::create_dir_all(&sst_root)?;
let sst_root = Path::from_filesystem_path(&sst_root)?;
let sst_cfg = Arc::new(
SsTableConfig::new(Arc::clone(&db.schema), fs, sst_root)
.with_key_extractor(Arc::clone(db.extractor())),
);
let executor = (*db.executor).clone();
let mut entries = Vec::new();
for (id, key, value) in [(1_u64, "a", 1_i32), (2_u64, "b", 2_i32)] {
let batch = build_batch(
Arc::clone(&db.schema),
vec![DynRow(vec![
Some(DynCell::Str(key.into())),
Some(DynCell::I32(value)),
])],
)?;
let immutable =
crate::inmem::immutable::memtable::segment_from_batch_with_key_name(batch, "id")?;
let mut builder = SsTableBuilder::new(
Arc::clone(&sst_cfg),
SsTableDescriptor::new(SsTableId::new(id), 0),
);
builder.add_immutable(&immutable)?;
let sst = builder.finish(executor.clone()).await?;
let desc = sst.descriptor().clone();
let entry = SstEntry::new(
desc.id().clone(),
desc.stats().cloned(),
desc.wal_ids().map(|ids| ids.to_vec()),
desc.data_path()
.expect("input descriptor missing data path")
.clone(),
desc.delete_path().cloned(),
);
entries.push(entry);
}
db.manifest
.apply_version_edits(
db.manifest_table,
&[VersionEdit::AddSsts { level: 0, entries }],
)
.await?;
let planner = LeveledCompactionPlanner::new(LeveledPlannerConfig {
l0_trigger: 1,
l0_max_inputs: 2,
l0_max_bytes: None,
level_thresholds: vec![0],
level_max_bytes: Vec::new(),
max_inputs_per_task: 2,
max_task_bytes: None,
});
let id_allocator = Arc::new(AtomicU64::new(10));
let executor = LocalCompactionExecutor::with_id_allocator(Arc::clone(&sst_cfg), id_allocator);
let driver = Arc::new(db.compaction_driver());
let worker_config =
CompactionWorkerConfig::new(None, 1, 1, CascadeConfig::new(0, Duration::from_millis(0)));
let handle = driver.spawn_worker(Arc::clone(&db.executor), planner, executor, worker_config);
handle.kick();
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let snapshot = db.manifest.snapshot_latest(db.manifest_table).await?;
if let Some(version) = snapshot.latest_version.as_ref()
&& version.ssts().len() > 2
&& !version.ssts()[2].is_empty()
{
break;
}
if Instant::now() >= deadline {
break;
}
sleep(Duration::from_millis(20)).await;
}
let snapshot = db.manifest.snapshot_latest(db.manifest_table).await?;
let latest = snapshot.latest_version.expect("latest version");
assert!(
latest.ssts().len() > 2 && !latest.ssts()[2].is_empty(),
"expected L2 compaction to complete without new writes"
);
handle.shutdown().await;
fs::remove_dir_all(&db_root)?;
Ok(())
}
/// Checks that a one-row output cap splits a compaction into several SSTs.
///
/// Two L0 SSTs (rows `a`, `b` in SST 1 and row `c` in SST 2) are compacted into L1 by an
/// executor configured with `with_max_output_rows(1)`. The result must contain more than
/// one output, and every output's stats must report at most one row.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compaction_splits_outputs_by_row_cap() -> Result<(), Box<dyn std::error::Error>> {
    let temp_root = workspace_temp_dir("compaction-split-cap");
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    // Key extractor over column 0 (`id`).
    let extractor =
        crate::extractor::projection_for_field(Arc::clone(&schema), 0).expect("extractor");
    let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
        .primary_key("id")
        .build()
        .expect("schema builder");
    // SST files live on local disk under `<temp_root>/sst`.
    let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
    let sst_root = temp_root.join("sst");
    fs::create_dir_all(&sst_root)?;
    let sst_root = Path::from_filesystem_path(&sst_root)?;
    let sst_cfg = Arc::new(
        SsTableConfig::new(Arc::clone(&schema), fs, sst_root).with_key_extractor(extractor.into()),
    );
    // SST 1: two rows.
    let batch_a = build_batch(
        Arc::clone(&schema),
        vec![
            DynRow(vec![Some(DynCell::Str("a".into())), Some(DynCell::I32(1))]),
            DynRow(vec![Some(DynCell::Str("b".into())), Some(DynCell::I32(2))]),
        ],
    )?;
    let imm_a = crate::inmem::immutable::memtable::segment_from_batch_with_key_name(batch_a, "id")?;
    let mut builder_a = SsTableBuilder::new(
        Arc::clone(&sst_cfg),
        SsTableDescriptor::new(SsTableId::new(1), 0),
    );
    builder_a.add_immutable(&imm_a)?;
    let sst_a = builder_a.finish(NoopExecutor).await?;
    // SST 2: one row.
    let batch_b = build_batch(
        Arc::clone(&schema),
        vec![DynRow(vec![
            Some(DynCell::Str("c".into())),
            Some(DynCell::I32(3)),
        ])],
    )?;
    let imm_b = crate::inmem::immutable::memtable::segment_from_batch_with_key_name(batch_b, "id")?;
    let mut builder_b = SsTableBuilder::new(
        Arc::clone(&sst_cfg),
        SsTableDescriptor::new(SsTableId::new(2), 0),
    );
    builder_b.add_immutable(&imm_b)?;
    let sst_b = builder_b.finish(NoopExecutor).await?;
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(mode_cfg, Arc::new(NoopExecutor)).await?;
    // Register both SSTs at L0 so the task's inputs resolve.
    for sst in [&sst_a, &sst_b] {
        let desc = sst.descriptor();
        let entry = SstEntry::new(
            desc.id().clone(),
            desc.stats().cloned(),
            desc.wal_ids().map(|ids| ids.to_vec()),
            desc.data_path()
                .expect("input descriptor missing data path")
                .clone(),
            desc.delete_path().cloned(),
        );
        db.inner()
            .manifest
            .apply_version_edits(
                db.inner().manifest_table,
                &[VersionEdit::AddSsts {
                    level: 0,
                    entries: vec![entry],
                }],
            )
            .await?;
    }
    let task = CompactionTask {
        source_level: 0,
        target_level: 1,
        input: vec![
            CompactionInput {
                level: 0,
                sst_id: SsTableId::new(1),
            },
            CompactionInput {
                level: 0,
                sst_id: SsTableId::new(2),
            },
        ],
        key_range: None,
    };
    let planner = StaticPlanner { task };
    // Cap each output SST at a single row.
    let executor = LocalCompactionExecutor::new(Arc::clone(&sst_cfg), 100).with_max_output_rows(1);
    let outcome = db
        .inner()
        .run_compaction_task(&planner, &executor)
        .await?
        .expect("compaction outcome");
    assert!(
        outcome.outputs.len() > 1,
        "expected output splitting with row cap"
    );
    for output in &outcome.outputs {
        let stats = output.stats().expect("output stats");
        assert!(stats.rows <= 1);
    }
    fs::remove_dir_all(&temp_root)?;
    Ok(())
}
/// Checks that a one-byte output cap splits a compaction into more than one SST.
///
/// Two L0 SSTs (rows `a`, `b` in SST 1 and row `c` in SST 2) are compacted into L1 by an
/// executor configured with `with_max_output_bytes(1)`. The result must contain more than
/// one output.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compaction_splits_outputs_by_byte_cap() -> Result<(), Box<dyn std::error::Error>> {
    let temp_root = workspace_temp_dir("compaction-split-bytes");
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    // Key extractor over column 0 (`id`).
    let extractor =
        crate::extractor::projection_for_field(Arc::clone(&schema), 0).expect("extractor");
    let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
        .primary_key("id")
        .build()
        .expect("schema builder");
    // SST files live on local disk under `<temp_root>/sst`.
    let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
    let sst_root = temp_root.join("sst");
    fs::create_dir_all(&sst_root)?;
    let sst_root = Path::from_filesystem_path(&sst_root)?;
    let sst_cfg = Arc::new(
        SsTableConfig::new(Arc::clone(&schema), fs, sst_root).with_key_extractor(extractor.into()),
    );
    // SST 1: two rows.
    let batch_a = build_batch(
        Arc::clone(&schema),
        vec![
            DynRow(vec![Some(DynCell::Str("a".into())), Some(DynCell::I32(1))]),
            DynRow(vec![Some(DynCell::Str("b".into())), Some(DynCell::I32(2))]),
        ],
    )?;
    let imm_a = crate::inmem::immutable::memtable::segment_from_batch_with_key_name(batch_a, "id")?;
    let mut builder_a = SsTableBuilder::new(
        Arc::clone(&sst_cfg),
        SsTableDescriptor::new(SsTableId::new(1), 0),
    );
    builder_a.add_immutable(&imm_a)?;
    let sst_a = builder_a.finish(NoopExecutor).await?;
    // SST 2: one row.
    let batch_b = build_batch(
        Arc::clone(&schema),
        vec![DynRow(vec![
            Some(DynCell::Str("c".into())),
            Some(DynCell::I32(3)),
        ])],
    )?;
    let imm_b = crate::inmem::immutable::memtable::segment_from_batch_with_key_name(batch_b, "id")?;
    let mut builder_b = SsTableBuilder::new(
        Arc::clone(&sst_cfg),
        SsTableDescriptor::new(SsTableId::new(2), 0),
    );
    builder_b.add_immutable(&imm_b)?;
    let sst_b = builder_b.finish(NoopExecutor).await?;
    let db: DB<InMemoryFs, NoopExecutor> = DB::new(mode_cfg, Arc::new(NoopExecutor)).await?;
    // Register both SSTs at L0 so the task's inputs resolve.
    for sst in [&sst_a, &sst_b] {
        let desc = sst.descriptor();
        let entry = SstEntry::new(
            desc.id().clone(),
            desc.stats().cloned(),
            desc.wal_ids().map(|ids| ids.to_vec()),
            desc.data_path()
                .expect("input descriptor missing data path")
                .clone(),
            desc.delete_path().cloned(),
        );
        db.inner()
            .manifest
            .apply_version_edits(
                db.inner().manifest_table,
                &[VersionEdit::AddSsts {
                    level: 0,
                    entries: vec![entry],
                }],
            )
            .await?;
    }
    let task = CompactionTask {
        source_level: 0,
        target_level: 1,
        input: vec![
            CompactionInput {
                level: 0,
                sst_id: SsTableId::new(1),
            },
            CompactionInput {
                level: 0,
                sst_id: SsTableId::new(2),
            },
        ],
        key_range: None,
    };
    let planner = StaticPlanner { task };
    // A one-byte cap is smaller than any real row, so every row should force a split.
    let executor = LocalCompactionExecutor::new(Arc::clone(&sst_cfg), 100).with_max_output_bytes(1);
    let outcome = db
        .inner()
        .run_compaction_task(&planner, &executor)
        .await?
        .expect("compaction outcome");
    assert!(
        outcome.outputs.len() > 1,
        "expected output splitting with byte cap"
    );
    fs::remove_dir_all(&temp_root)?;
    Ok(())
}
/// Test executor that wraps a real [`LocalCompactionExecutor`].
///
/// After every successful inner execution it commits an unrelated manifest edit
/// (`SetWalSegments`), so the caller's subsequent compaction commit is intended to run
/// into a concurrent manifest change on every attempt. It also keeps a copy of the
/// outputs from the most recent `execute` call and counts `execute` and
/// `cleanup_outputs` invocations.
#[derive(Clone)]
struct ConflictingExecutor {
    inner: LocalCompactionExecutor,
    manifest: TonboManifest<InMemoryFs, TokioExecutor>,
    table: TableId,
    /// Outputs of the latest `execute` call; each call replaces the previous contents.
    outputs: Arc<Mutex<Vec<SsTableDescriptor>>>,
    /// Number of times `execute` has been entered.
    execute_calls: Arc<AtomicUsize>,
    /// Number of times `cleanup_outputs` has finished delegating to the inner executor.
    cleanup_calls: Arc<AtomicUsize>,
}
impl ConflictingExecutor {
    fn new(
        inner: LocalCompactionExecutor,
        manifest: TonboManifest<InMemoryFs, TokioExecutor>,
        table: TableId,
        outputs: Arc<Mutex<Vec<SsTableDescriptor>>>,
        execute_calls: Arc<AtomicUsize>,
        cleanup_calls: Arc<AtomicUsize>,
    ) -> Self {
        Self {
            cleanup_calls,
            execute_calls,
            outputs,
            table,
            manifest,
            inner,
        }
    }
}
impl CompactionExecutor for ConflictingExecutor {
    fn execute(
        &self,
        job: CompactionJob,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<CompactionOutcome, CompactionError>> + '_>>
    {
        let delegate = self.inner.clone();
        let manifest = self.manifest.clone();
        let table = self.table;
        let recorded = Arc::clone(&self.outputs);
        let calls = Arc::clone(&self.execute_calls);
        Box::pin(async move {
            calls.fetch_add(1, Ordering::SeqCst);
            let outcome = delegate.execute(job).await?;
            // Snapshot this attempt's outputs; the guard is released at the end of the
            // statement, before the manifest write below.
            *recorded.lock().await = outcome.outputs.iter().cloned().collect();
            // Move the manifest forward so the caller's commit sees a concurrent change.
            let bump = [VersionEdit::SetWalSegments {
                segments: Vec::new(),
            }];
            manifest
                .apply_version_edits(table, &bump)
                .await
                .map_err(CompactionError::Manifest)?;
            Ok(outcome)
        })
    }
    fn cleanup_outputs<'a>(
        &'a self,
        outputs: &'a [SsTableDescriptor],
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), CompactionError>> + 'a>> {
        let delegate = self.inner.clone();
        let calls = Arc::clone(&self.cleanup_calls);
        Box::pin(async move {
            let cleaned = delegate.cleanup_outputs(outputs).await;
            // Count after delegation regardless of success, then surface the result.
            calls.fetch_add(1, Ordering::SeqCst);
            cleaned
        })
    }
}
/// Test executor that does no real compaction work and always reports the same pre-built
/// `outputs` for the job it receives.
///
/// On the very first `execute` call it commits an unrelated `SetWalSegments` edit to the
/// manifest before returning, so the caller's first commit attempt sees the manifest move
/// underneath it. Every later call returns the outputs directly. `cleanup_outputs` is a
/// no-op.
#[derive(Clone)]
struct CasConflictExecutor {
    manifest: TonboManifest<InMemoryFs, RecordingExecutor>,
    table: TableId,
    /// Descriptors handed back from every `execute` call.
    outputs: Vec<SsTableDescriptor>,
    /// Count of `execute` calls so far; only the call that observes 0 injects the edit.
    conflict_once: Arc<AtomicUsize>,
}
impl CasConflictExecutor {
    fn new(
        manifest: TonboManifest<InMemoryFs, RecordingExecutor>,
        table: TableId,
        outputs: Vec<SsTableDescriptor>,
    ) -> Self {
        let conflict_once = Arc::new(AtomicUsize::new(0));
        Self {
            conflict_once,
            outputs,
            table,
            manifest,
        }
    }
}
impl CompactionExecutor for CasConflictExecutor {
    fn execute(
        &self,
        job: CompactionJob,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<CompactionOutcome, CompactionError>> + '_>>
    {
        let manifest = self.manifest.clone();
        let table = self.table;
        let outputs = self.outputs.clone();
        let attempts = Arc::clone(&self.conflict_once);
        Box::pin(async move {
            let is_first_attempt = attempts.fetch_add(1, Ordering::SeqCst) == 0;
            if is_first_attempt {
                let bump = [VersionEdit::SetWalSegments {
                    segments: Vec::new(),
                }];
                manifest
                    .apply_version_edits(table, &bump)
                    .await
                    .map_err(CompactionError::Manifest)?;
            }
            let target_level = job.task.target_level as u32;
            CompactionOutcome::from_outputs(outputs, job.inputs.clone(), target_level, None)
        })
    }
    fn cleanup_outputs<'a>(
        &'a self,
        _outputs: &'a [SsTableDescriptor],
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), CompactionError>> + 'a>> {
        // Nothing was written to storage, so there is nothing to remove.
        Box::pin(async move { Ok(()) })
    }
}
/// Checks that a compaction whose manifest commit keeps losing the CAS race eventually
/// aborts and cleans up what it wrote.
///
/// Two L0 SSTs are produced by ingest + flush. The compaction then runs through
/// `ConflictingExecutor`, which performs real compaction but commits an unrelated
/// manifest edit after every execution. The test expects:
/// - the task to fail with a CAS-conflict error after retrying at least once;
/// - `cleanup_outputs` to be called once per `execute` attempt;
/// - the last attempt's output files to be gone from disk;
/// - metrics to record the retries, one CAS abort, and one job failure.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compaction_cas_conflict_cleans_outputs() -> Result<(), Box<dyn std::error::Error>> {
    let temp_root = workspace_temp_dir("compaction-cas-cleanup");
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
        .primary_key("id")
        .build()
        .expect("schema builder");
    let extractor = Arc::clone(&mode_cfg.extractor);
    let schema = Arc::clone(&mode_cfg.schema);
    let executor = Arc::new(TokioExecutor::default());
    // NOTE(review): a one-batch threshold presumably seals the memtable after every ingest,
    // giving `flush_immutables_with_descriptor` something to flush each iteration.
    let policy = Arc::new(BatchesThreshold { batches: 1 });
    let mut db: DbInner<InMemoryFs, TokioExecutor> =
        DB::new_with_policy(mode_cfg, Arc::clone(&executor), policy)
            .await?
            .into_inner();
    let metrics = Arc::new(CompactionMetrics::new());
    db.compaction_metrics = Some(Arc::clone(&metrics));
    // SST files live on local disk under `<temp_root>/sst`; the DB manifest is in memory.
    let sst_root = temp_root.join("sst");
    fs::create_dir_all(&sst_root)?;
    let sst_root = Path::from_filesystem_path(&sst_root)?;
    let sst_fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
    let sst_cfg = Arc::new(
        SsTableConfig::new(Arc::clone(&schema), sst_fs.clone(), sst_root)
            .with_key_extractor(extractor),
    );
    // Ingest and flush one row at a time, producing L0 SSTs with ids 1 and 2.
    for idx in 0..2 {
        let rows = vec![vec![
            Some(DynCell::Str(format!("ck-{idx}"))),
            Some(DynCell::I32(idx)),
        ]];
        let batch = build_batch(Arc::clone(&schema), rows)?;
        db.ingest(batch).await?;
        let descriptor = SsTableDescriptor::new(SsTableId::new(idx as u64 + 1), 0);
        db.flush_immutables_with_descriptor(Arc::clone(&sst_cfg), descriptor)
            .await?;
    }
    let planner = LeveledCompactionPlanner::new(LeveledPlannerConfig {
        l0_trigger: 1,
        l0_max_inputs: 2,
        l0_max_bytes: None,
        level_thresholds: vec![usize::MAX],
        level_max_bytes: Vec::new(),
        max_inputs_per_task: 2,
        max_task_bytes: None,
    });
    let recorded_outputs = Arc::new(Mutex::new(Vec::new()));
    let execute_calls = Arc::new(AtomicUsize::new(0));
    let cleanup_calls = Arc::new(AtomicUsize::new(0));
    let conflicting_executor = ConflictingExecutor::new(
        LocalCompactionExecutor::new(Arc::clone(&sst_cfg), 100),
        db.manifest.clone(),
        db.manifest_table,
        Arc::clone(&recorded_outputs),
        Arc::clone(&execute_calls),
        Arc::clone(&cleanup_calls),
    );
    let result = db
        .run_compaction_task(&planner, &conflicting_executor)
        .await;
    // Either CAS-conflict representation is accepted.
    match result {
        Err(CompactionError::Manifest(ManifestError::CasConflict(_)))
        | Err(CompactionError::CasConflict) => {}
        other => panic!("expected CAS conflict, got {other:?}"),
    }
    // Outputs recorded by the last `execute` attempt.
    let outputs = recorded_outputs.lock().await.clone();
    assert!(
        !outputs.is_empty(),
        "compaction should have produced outputs before CAS conflict"
    );
    let execute_count = execute_calls.load(Ordering::SeqCst);
    let cleanup_count = cleanup_calls.load(Ordering::SeqCst);
    assert!(
        execute_count >= 2,
        "expected CAS retry attempts, got {execute_count}"
    );
    assert_eq!(
        cleanup_count, execute_count,
        "cleanup should run per failed attempt"
    );
    // Every data/delete file from the last attempt must have been removed.
    for desc in outputs {
        if let Some(path) = desc.data_path() {
            assert!(
                sst_fs.open(path).await.is_err(),
                "data file should be cleaned up"
            );
        }
        if let Some(path) = desc.delete_path() {
            assert!(
                sst_fs.open(path).await.is_err(),
                "delete file should be cleaned up"
            );
        }
    }
    let snapshot = metrics.snapshot();
    assert!(snapshot.cas_retries >= 1);
    assert_eq!(snapshot.cas_aborts, 1);
    assert_eq!(snapshot.job_failures, 1);
    fs::remove_dir_all(&temp_root)?;
    Ok(())
}
/// Checks that a single CAS conflict during the compaction commit triggers exactly one
/// backoff sleep, of the initial backoff duration, before the retry succeeds.
///
/// `CasConflictExecutor` moves the manifest only on its first `execute` call, so the
/// second attempt commits cleanly. `RecordingExecutor` records the requested sleep
/// durations, which must be exactly `[5 ms]`.
#[tokio::test(flavor = "current_thread")]
async fn compaction_cas_backoff_sleeps_on_conflict() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
        .primary_key("id")
        .build()
        .expect("schema builder");
    let executor = RecordingExecutor::new();
    let mut db: DbInner<InMemoryFs, RecordingExecutor> =
        DB::new(mode_cfg, Arc::new(executor.clone()))
            .await?
            .into_inner();
    // Backoff configured with 5 ms and 20 ms; the single expected sleep is the 5 ms one.
    db.cas_backoff = CasBackoffConfig::new(Duration::from_millis(5), Duration::from_millis(20));
    // Register one L0 input. Its file is never read because the executor returns canned
    // outputs.
    let entry = SstEntry::new(
        SsTableId::new(1),
        None,
        None,
        Path::from("L0/001.parquet"),
        None,
    );
    db.manifest
        .apply_version_edits(
            db.manifest_table,
            &[VersionEdit::AddSsts {
                level: 0,
                entries: vec![entry],
            }],
        )
        .await?;
    let task = CompactionTask {
        source_level: 0,
        target_level: 1,
        input: vec![CompactionInput {
            level: 0,
            sst_id: SsTableId::new(1),
        }],
        key_range: None,
    };
    let planner = StaticPlanner { task };
    let output_desc = SsTableDescriptor::new(SsTableId::new(100), 1)
        .with_storage_paths(Path::from("L1/100.parquet"), None);
    let executor_with_conflict =
        CasConflictExecutor::new(db.manifest.clone(), db.manifest_table, vec![output_desc]);
    let outcome = db
        .run_compaction_task(&planner, &executor_with_conflict)
        .await?
        .expect("compaction outcome");
    assert_eq!(outcome.add_ssts.len(), 1);
    let sleeps = executor.sleep_calls();
    assert_eq!(sleeps, vec![Duration::from_millis(5)]);
    Ok(())
}
/// Checks that an executor error is surfaced to the caller and recorded in the compaction
/// metrics.
///
/// `FailingExecutor` (defined elsewhere in this file) makes the task fail; the error is
/// asserted below to be `CompactionError::NoInputs`. The metrics must count one job
/// failure, no completed jobs, and no CAS aborts, and must still record a `last_job`.
#[tokio::test(flavor = "current_thread")]
async fn compaction_executor_failure_records_metrics() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
    let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
        .primary_key("id")
        .build()
        .expect("schema builder");
    let executor = Arc::new(TokioExecutor::default());
    let mut db: DbInner<InMemoryFs, TokioExecutor> =
        DB::new(mode_cfg, Arc::clone(&executor)).await?.into_inner();
    let metrics = Arc::new(CompactionMetrics::new());
    db.compaction_metrics = Some(Arc::clone(&metrics));
    // Register one L0 input in the manifest only; its file is never written.
    let entry = SstEntry::new(
        SsTableId::new(1),
        None,
        None,
        Path::from("L0/001.parquet"),
        None,
    );
    db.manifest
        .apply_version_edits(
            db.manifest_table,
            &[VersionEdit::AddSsts {
                level: 0,
                entries: vec![entry],
            }],
        )
        .await?;
    let task = CompactionTask {
        source_level: 0,
        target_level: 1,
        input: vec![CompactionInput {
            level: 0,
            sst_id: SsTableId::new(1),
        }],
        key_range: None,
    };
    let planner = StaticPlanner { task };
    let err = db
        .run_compaction_task(&planner, &FailingExecutor)
        .await
        .expect_err("expected compaction failure");
    assert!(matches!(err, CompactionError::NoInputs));
    let snapshot = metrics.snapshot();
    assert_eq!(snapshot.job_count, 0);
    assert_eq!(snapshot.job_failures, 1);
    assert_eq!(snapshot.cas_aborts, 0);
    assert!(snapshot.last_job.is_some());
    Ok(())
}
/// Checks that a compaction whose input and output descriptors carry no stats still
/// succeeds, and leaves the byte, row, and tombstone metrics at zero.
///
/// The last-job record must flag both its input and output accounting as incomplete.
/// The L0 input entry is registered with `None` stats, and `NoStatsExecutor` returns a
/// descriptor built without stats.
#[tokio::test(flavor = "current_thread")]
async fn compaction_metrics_ignore_missing_stats() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
    let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
        .primary_key("id")
        .build()
        .expect("schema builder");
    let executor = Arc::new(TokioExecutor::default());
    let mut db: DbInner<InMemoryFs, TokioExecutor> =
        DB::new(mode_cfg, Arc::clone(&executor)).await?.into_inner();
    let metrics = Arc::new(CompactionMetrics::new());
    db.compaction_metrics = Some(Arc::clone(&metrics));
    // Input entry without stats (second argument is `None`).
    let entry = SstEntry::new(
        SsTableId::new(1),
        None,
        None,
        Path::from("L0/001.parquet"),
        None,
    );
    db.manifest
        .apply_version_edits(
            db.manifest_table,
            &[VersionEdit::AddSsts {
                level: 0,
                entries: vec![entry],
            }],
        )
        .await?;
    let task = CompactionTask {
        source_level: 0,
        target_level: 1,
        input: vec![CompactionInput {
            level: 0,
            sst_id: SsTableId::new(1),
        }],
        key_range: None,
    };
    let planner = StaticPlanner { task };
    // Output descriptor with storage paths but no stats attached.
    let output_desc = SsTableDescriptor::new(SsTableId::new(99), 1)
        .with_storage_paths(Path::from("L1/099.parquet"), None);
    let executor = NoStatsExecutor {
        output: output_desc,
    };
    let outcome = db
        .run_compaction_task(&planner, &executor)
        .await?
        .expect("compaction outcome");
    assert_eq!(outcome.add_ssts.len(), 1);
    let snapshot = metrics.snapshot();
    assert_eq!(snapshot.job_count, 1);
    assert_eq!(snapshot.bytes_in, 0);
    assert_eq!(snapshot.bytes_out, 0);
    assert_eq!(snapshot.rows_in, 0);
    assert_eq!(snapshot.rows_out, 0);
    assert_eq!(snapshot.tombstones_in, 0);
    assert_eq!(snapshot.tombstones_out, 0);
    let last_job = snapshot.last_job.expect("last job");
    assert!(!last_job.input.complete);
    assert!(!last_job.output.complete);
    Ok(())
}
/// Test planner that always compacts exactly one file.
///
/// - `plan` promotes the first L0 file into L1.
/// - `plan_with_min_level` promotes the first L1 file into L2, as long as `min_level`
///   is at most 1; otherwise it plans nothing.
///
/// Both return `None` when the relevant level is missing or empty.
#[derive(Clone)]
struct CascadePlanner;
impl CompactionPlanner for CascadePlanner {
    fn plan(&self, snapshot: &CompactionSnapshot) -> Option<CompactionTask> {
        snapshot.level(0)?.files().first().map(|file| CompactionTask {
            source_level: 0,
            target_level: 1,
            input: vec![CompactionInput {
                level: 0,
                sst_id: file.sst_id.clone(),
            }],
            key_range: None,
        })
    }
    fn plan_with_min_level(
        &self,
        snapshot: &CompactionSnapshot,
        min_level: usize,
    ) -> Option<CompactionTask> {
        // Only the L1 -> L2 follow-up step is modelled.
        if min_level <= 1 {
            snapshot.level(1)?.files().first().map(|file| CompactionTask {
                source_level: 1,
                target_level: 2,
                input: vec![CompactionInput {
                    level: 1,
                    sst_id: file.sst_id.clone(),
                }],
                key_range: None,
            })
        } else {
            None
        }
    }
}
/// Test executor that performs no I/O.
///
/// For every job it appends `(source_level, target_level)` to a shared log. It then
/// fabricates a single output descriptor at the target level with:
/// - id: the first input's id plus `100 * target_level` (the first input id is 0 when the
///   job has no inputs);
/// - path: `L{target_level}/{id}.parquet`.
///
/// `cleanup_outputs` is a no-op.
#[derive(Clone)]
struct CountingExecutor {
    /// Shared log of `(source_level, target_level)` pairs, one entry per `execute` call.
    executed: Arc<StdMutex<Vec<(usize, usize)>>>,
}
impl CountingExecutor {
    fn new(executed: Arc<StdMutex<Vec<(usize, usize)>>>) -> Self {
        CountingExecutor { executed }
    }
}
impl CompactionExecutor for CountingExecutor {
    fn execute(
        &self,
        job: CompactionJob,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<CompactionOutcome, CompactionError>> + '_>>
    {
        let log = Arc::clone(&self.executed);
        Box::pin(async move {
            let (source_level, target_level) = (job.task.source_level, job.task.target_level);
            // The std mutex guard is a temporary of this statement; no `.await` follows.
            log.lock()
                .expect("executed lock")
                .push((source_level, target_level));
            let base_id = match job.inputs.first() {
                Some(desc) => desc.id().raw(),
                None => 0,
            };
            let output_id = SsTableId::new(base_id + target_level as u64 * 100);
            let output_path = Path::from(format!(
                "L{target_level}/{id}.parquet",
                id = output_id.raw()
            ));
            let produced = SsTableDescriptor::new(output_id, target_level)
                .with_storage_paths(output_path, None);
            let inputs = job.inputs.clone();
            CompactionOutcome::from_outputs(vec![produced], inputs, target_level as u32, None)
        })
    }
    fn cleanup_outputs<'a>(
        &'a self,
        _outputs: &'a [SsTableDescriptor],
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), CompactionError>> + 'a>> {
        // Nothing is ever written, so cleanup is trivially successful.
        Box::pin(async move { Ok(()) })
    }
}
/// Polls `executed` every 5ms until it holds at least `expected` entries.
///
/// Returns `true` once the threshold is reached, or `false` if one second passes first.
/// The lock guard is dropped before each sleep, so the worker can keep appending.
async fn wait_for_executions(
    executed: &Arc<StdMutex<Vec<(usize, usize)>>>,
    expected: usize,
) -> bool {
    let deadline = Instant::now() + Duration::from_secs(1);
    loop {
        let recorded = executed.lock().expect("executed lock").len();
        if recorded >= expected {
            break true;
        }
        if Instant::now() >= deadline {
            break false;
        }
        tokio::time::sleep(Duration::from_millis(5)).await;
    }
}
/// Polls `metrics` every 5ms until `predicate` accepts a snapshot.
///
/// Returns `true` as soon as the predicate holds, or `false` once `timeout` has elapsed
/// without it holding. The predicate is checked before the deadline, so it runs at least
/// once even with a zero timeout.
async fn wait_for_metrics<F>(
    metrics: &Arc<CompactionMetrics>,
    timeout: Duration,
    mut predicate: F,
) -> bool
where
    F: FnMut(&crate::compaction::metrics::CompactionMetricsSnapshot) -> bool,
{
    let deadline = Instant::now() + timeout;
    loop {
        if predicate(&metrics.snapshot()) {
            break true;
        }
        if Instant::now() >= deadline {
            break false;
        }
        tokio::time::sleep(Duration::from_millis(5)).await;
    }
}
/// Builds an [`ImmutableMemTable`] segment from `(key, value, commit_ts, tombstone)` tuples.
///
/// Live rows go into the data batch, with their commit timestamps bundled as the MVCC
/// sidecar. Tombstones go into a key-only [`DeleteSidecar`]. Both kinds are indexed in the
/// composite `(key, commit_ts)` map:
/// - live rows as [`ImmutableIndexEntry::Row`], pointing at their row in the data batch;
/// - tombstones as [`ImmutableIndexEntry::Delete`].
///
/// Live rows are encoded as `(Str, I32)` cells and the key is projected from column 0, so
/// `schema` presumably has to be `(Utf8 key, Int32 value)`. The delete sidecar always uses a
/// nullable Utf8 `id` column, regardless of `schema`.
///
/// # Panics
///
/// Panics if building any batch, the MVCC sidecar, or a key projection fails.
fn segment_with_commits(
    schema: SchemaRef,
    rows: Vec<(String, i32, u64, bool)>,
) -> ImmutableMemTable {
    let mut data_rows = Vec::new();
    let mut data_commits = Vec::new();
    let mut delete_rows = Vec::new();
    let mut delete_commits = Vec::new();
    for (key, value, commit, tombstone) in rows {
        let ts = Timestamp::new(commit);
        if tombstone {
            delete_rows.push(DynRow(vec![Some(DynCell::Str(key))]));
            delete_commits.push(ts);
        } else {
            data_rows.push(DynRow(vec![
                Some(DynCell::Str(key)),
                Some(DynCell::I32(value)),
            ]));
            data_commits.push(ts);
        }
    }

    let batch = if data_rows.is_empty() {
        RecordBatch::new_empty(schema.clone())
    } else {
        build_batch(schema.clone(), data_rows).expect("data batch")
    };
    // Rows in the data batch are never tombstones; deletes live in the sidecar instead.
    let tombstone_flags = vec![false; batch.num_rows()];
    // `data_commits` is not needed afterwards, so hand it over instead of cloning it.
    let (batch, mvcc) = bundle_mvcc_sidecar(batch, data_commits, tombstone_flags).expect("mvcc");

    let delete_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, true)]));
    // Only materialise a delete batch when there is something to delete. The empty case
    // needs nothing beyond the schema.
    let delete_sidecar = if delete_rows.is_empty() {
        DeleteSidecar::empty(&delete_schema)
    } else {
        let delete_batch = build_batch(delete_schema, delete_rows).expect("delete batch");
        // Cloned because the timestamps are read again below when indexing the deletes.
        DeleteSidecar::new(delete_batch, delete_commits.clone())
    };

    let extractor = crate::extractor::projection_for_field(schema, 0).expect("extractor");
    let mut composite = std::collections::BTreeMap::new();
    let row_indices: Vec<usize> = (0..batch.num_rows()).collect();
    let key_rows = extractor
        .project_view(&batch, &row_indices)
        .expect("project view");
    for (row, key_row) in key_rows.into_iter().enumerate() {
        composite.insert(
            KeyTsViewRaw::new(key_row, mvcc.commit_ts[row]),
            ImmutableIndexEntry::Row(row as u32),
        );
    }

    if !delete_sidecar.is_empty() {
        // Index the tombstone keys through an identity projection over the sidecar's schema.
        let key_schema = delete_sidecar.key_batch().schema().clone();
        let indices: Vec<usize> = (0..key_schema.fields().len()).collect();
        let projection =
            projection_for_columns(key_schema, indices).expect("identity projection");
        let delete_row_indices: Vec<usize> = (0..delete_sidecar.key_batch().num_rows()).collect();
        let delete_key_rows = projection
            .project_view(delete_sidecar.key_batch(), &delete_row_indices)
            .expect("delete keys");
        for (row, key_row) in delete_key_rows.into_iter().enumerate() {
            let ts = delete_commits[row];
            composite.insert(KeyTsViewRaw::new(key_row, ts), ImmutableIndexEntry::Delete);
        }
    }
    ImmutableMemTable::new(batch, composite, mvcc, delete_sidecar)
}
/// A worker with a 5ms periodic tick, and never kicked, must run the planned L0 -> L1 job.
/// The wake-ups must be attributed to the periodic trigger, with no kick triggers recorded.
#[tokio::test(flavor = "current_thread")]
async fn compaction_periodic_trigger_records_metrics() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
        .primary_key("id")
        .build()
        .expect("schema builder");
    let runtime = Arc::new(TokioExecutor::default());
    let mut inner: DbInner<InMemoryFs, TokioExecutor> =
        DB::new(mode_cfg, Arc::clone(&runtime)).await?.into_inner();
    let metrics = Arc::new(CompactionMetrics::new());
    inner.compaction_metrics = Some(Arc::clone(&metrics));

    // Register one L0 SST in the manifest; this test never writes a file for it.
    let seed_id = SsTableId::new(1);
    let seed = SstEntry::new(
        seed_id.clone(),
        None,
        None,
        Path::from("L0/001.parquet"),
        None,
    );
    let seed_edit = VersionEdit::AddSsts {
        level: 0,
        entries: vec![seed],
    };
    inner
        .manifest
        .apply_version_edits(inner.manifest_table, &[seed_edit])
        .await?;

    let task = CompactionTask {
        source_level: 0,
        target_level: 1,
        input: vec![CompactionInput {
            level: 0,
            sst_id: seed_id,
        }],
        key_range: None,
    };
    let executed = Arc::new(StdMutex::new(Vec::new()));
    let compaction_executor = CountingExecutor::new(Arc::clone(&executed));
    let driver = Arc::new(inner.compaction_driver());
    let worker_config = CompactionWorkerConfig::new(
        Some(Duration::from_millis(5)),
        2,
        1,
        CascadeConfig::new(0, Duration::from_millis(0)),
    );
    let handle = driver.spawn_worker(
        Arc::clone(&inner.executor),
        OneShotPlanner::new(task),
        compaction_executor,
        worker_config,
    );

    let saw_periodic = wait_for_metrics(&metrics, Duration::from_secs(1), |snap| {
        snap.trigger_periodic >= 1
    })
    .await;
    assert!(saw_periodic, "expected periodic trigger metrics");
    let ran = wait_for_executions(&executed, 1).await;
    assert!(ran, "expected compaction execution");
    handle.shutdown().await;

    let final_snapshot = metrics.snapshot();
    assert!(final_snapshot.trigger_periodic >= 1);
    assert_eq!(final_snapshot.trigger_kick, 0);
    Ok(())
}
/// Planner queue drops are counted per reason. On a fresh metrics instance, one `Full` drop
/// and one `Closed` drop must each be counted exactly once.
///
/// The body is fully synchronous, so a plain `#[test]` is enough; no async runtime is needed.
#[test]
fn compaction_planner_queue_drop_records_metrics() -> Result<(), Box<dyn std::error::Error>> {
    let metrics = CompactionMetrics::new();
    metrics.record_queue_drop(
        CompactionQueueDropContext::Planner,
        CompactionQueueDropReason::Full,
    );
    metrics.record_queue_drop(
        CompactionQueueDropContext::Planner,
        CompactionQueueDropReason::Closed,
    );
    let snapshot = metrics.snapshot();
    // The metrics start empty, so each counter must equal exactly the single drop recorded.
    assert_eq!(snapshot.queue_drops_planner_full, 1);
    assert_eq!(snapshot.queue_drops_planner_closed, 1);
    Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn compaction_kick_triggers_without_periodic_tick() -> Result<(), Box<dyn std::error::Error>>
{
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
.primary_key("id")
.build()
.expect("schema builder");
let executor = Arc::new(TokioExecutor::default());
let mut inner: DbInner<InMemoryFs, TokioExecutor> =
DB::new(mode_cfg, Arc::clone(&executor)).await?.into_inner();
let metrics = Arc::new(CompactionMetrics::new());
inner.compaction_metrics = Some(Arc::clone(&metrics));
let entry = SstEntry::new(
SsTableId::new(1),
None,
None,
Path::from("L0/001.parquet"),
None,
);
inner
.manifest
.apply_version_edits(
inner.manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![entry],
}],
)
.await?;
let task = CompactionTask {
source_level: 0,
target_level: 1,
input: vec![CompactionInput {
level: 0,
sst_id: SsTableId::new(1),
}],
key_range: None,
};
let planner = OneShotPlanner::new(task);
let executed = Arc::new(StdMutex::new(Vec::new()));
let executor = CountingExecutor::new(Arc::clone(&executed));
let driver = Arc::new(inner.compaction_driver());
let worker_config =
CompactionWorkerConfig::new(None, 2, 1, CascadeConfig::new(0, Duration::from_millis(0)));
let handle = driver.spawn_worker(
Arc::clone(&inner.executor),
planner,
executor,
worker_config,
);
handle.kick();
assert!(
wait_for_executions(&executed, 1).await,
"expected compaction work after kick"
);
handle.shutdown().await;
let snapshot = metrics.snapshot();
assert_eq!(snapshot.trigger_kick, 1);
assert_eq!(snapshot.trigger_periodic, 0);
assert!(snapshot.job_count >= 1);
Ok(())
}
/// With a worker queue of size 1 and a cascade budget of 1, one kick must still run the
/// L0 -> L1 job plus exactly one follow-up L1 -> L2 cascade.
#[tokio::test(flavor = "current_thread")]
async fn cascade_scheduling_queue_size_one() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
        .primary_key("id")
        .build()
        .expect("schema builder");
    let runtime = Arc::new(TokioExecutor::default());
    let mut inner: DbInner<InMemoryFs, TokioExecutor> =
        DB::new(mode_cfg, Arc::clone(&runtime)).await?.into_inner();
    let metrics = Arc::new(CompactionMetrics::new());
    inner.compaction_metrics = Some(Arc::clone(&metrics));

    // A single L0 SST: the planner moves it to L1, then the cascade moves it on to L2.
    let seed = SstEntry::new(
        SsTableId::new(1),
        None,
        None,
        Path::from("L0/001.parquet"),
        None,
    );
    let seed_edit = VersionEdit::AddSsts {
        level: 0,
        entries: vec![seed],
    };
    inner
        .manifest
        .apply_version_edits(inner.manifest_table, &[seed_edit])
        .await?;

    let executed = Arc::new(StdMutex::new(Vec::new()));
    let compaction_executor = CountingExecutor::new(Arc::clone(&executed));
    let driver = Arc::new(inner.compaction_driver());
    // Presumably: no periodic tick, queue capacity 1, and a cascade budget of 1 with no
    // cooldown. TODO confirm the argument order against `CompactionWorkerConfig::new`.
    let worker_config =
        CompactionWorkerConfig::new(None, 1, 1, CascadeConfig::new(1, Duration::from_millis(0)));
    let handle = driver.spawn_worker(
        Arc::clone(&inner.executor),
        CascadePlanner,
        compaction_executor,
        worker_config,
    );
    handle.kick();
    let both_ran = wait_for_executions(&executed, 2).await;
    assert!(
        both_ran,
        "expected L0->L1 and L1->L2 executions with queue size 1"
    );
    handle.shutdown().await;

    let tasks = executed.lock().expect("executed lock").clone();
    let count_hops = |from: usize, to: usize| {
        tasks
            .iter()
            .filter(|&&(source, target)| source == from && target == to)
            .count()
    };
    assert_eq!(count_hops(0, 1), 1);
    assert_eq!(count_hops(1, 2), 1);
    Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn cascade_scheduling_respects_budget() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
.primary_key("id")
.build()
.expect("schema builder");
let executor = Arc::new(TokioExecutor::default());
let mut inner: DbInner<InMemoryFs, TokioExecutor> =
DB::new(mode_cfg, Arc::clone(&executor)).await?.into_inner();
let metrics = Arc::new(CompactionMetrics::new());
inner.compaction_metrics = Some(Arc::clone(&metrics));
let entry = SstEntry::new(
SsTableId::new(1),
None,
None,
Path::from("L0/001.parquet"),
None,
);
inner
.manifest
.apply_version_edits(
inner.manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![entry],
}],
)
.await?;
let executed = Arc::new(StdMutex::new(Vec::new()));
let planner = CascadePlanner;
let executor = CountingExecutor::new(Arc::clone(&executed));
let driver = Arc::new(inner.compaction_driver());
let worker_config =
CompactionWorkerConfig::new(None, 2, 1, CascadeConfig::new(0, Duration::from_millis(0)));
let handle = driver.spawn_worker(
Arc::clone(&inner.executor),
planner,
executor,
worker_config,
);
handle.kick();
assert!(
wait_for_executions(&executed, 1).await,
"expected initial compaction execution"
);
handle.shutdown().await;
let tasks = executed.lock().expect("executed lock").clone();
let l0_to_l1 = tasks
.iter()
.filter(|(source, target)| *source == 0 && *target == 1)
.count();
let l1_to_l2 = tasks
.iter()
.filter(|(source, target)| *source == 1 && *target == 2)
.count();
assert_eq!(l0_to_l1, 1);
assert_eq!(l1_to_l2, 0);
let snapshot = metrics.snapshot();
assert_eq!(snapshot.cascades_scheduled, 0);
assert!(snapshot.cascades_blocked_budget >= 1);
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn cascade_scheduling_respects_cooldown() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
.primary_key("id")
.build()
.expect("schema builder");
let executor = Arc::new(TokioExecutor::default());
let mut inner: DbInner<InMemoryFs, TokioExecutor> =
DB::new(mode_cfg, Arc::clone(&executor)).await?.into_inner();
let metrics = Arc::new(CompactionMetrics::new());
inner.compaction_metrics = Some(Arc::clone(&metrics));
let entry_a = SstEntry::new(
SsTableId::new(1),
None,
None,
Path::from("L0/001.parquet"),
None,
);
let entry_b = SstEntry::new(
SsTableId::new(2),
None,
None,
Path::from("L0/002.parquet"),
None,
);
inner
.manifest
.apply_version_edits(
inner.manifest_table,
&[VersionEdit::AddSsts {
level: 0,
entries: vec![entry_a, entry_b],
}],
)
.await?;
let executed = Arc::new(StdMutex::new(Vec::new()));
let planner = CascadePlanner;
let executor = CountingExecutor::new(Arc::clone(&executed));
let driver = Arc::new(inner.compaction_driver());
let worker_config = CompactionWorkerConfig::new(
Some(Duration::from_millis(5)),
4,
1,
CascadeConfig::new(1, Duration::from_secs(60)),
);
let handle = driver.spawn_worker(
Arc::clone(&inner.executor),
planner,
executor,
worker_config,
);
handle.kick();
assert!(
wait_for_executions(&executed, 3).await,
"expected two L0->L1 executions and one cascade"
);
handle.shutdown().await;
let tasks = executed.lock().expect("executed lock").clone();
let l0_to_l1 = tasks
.iter()
.filter(|(source, target)| *source == 0 && *target == 1)
.count();
let l1_to_l2 = tasks
.iter()
.filter(|(source, target)| *source == 1 && *target == 2)
.count();
assert_eq!(l0_to_l1, 2);
assert_eq!(l1_to_l2, 1);
let snapshot = metrics.snapshot();
assert_eq!(snapshot.cascades_scheduled, 1);
assert!(snapshot.cascades_blocked_cooldown >= 1);
Ok(())
}
/// `resolve_inputs` must resolve each input at the level recorded in its `CompactionInput`.
///
/// The fixture has one L0 input and one L1 input. They must come back as levels 0 and 1, in
/// input order, even though the task's `source_level` is 0.
#[test]
fn resolve_compaction_inputs_keeps_levels() {
    let file_ids = FileIdGenerator::default();
    let mut version = VersionState::empty(TableId::new(&file_ids));
    let (l0_id, l1_id) = (SsTableId::new(1), SsTableId::new(2));
    let l0_entry = SstEntry::new(
        l0_id.clone(),
        None,
        None,
        Path::from("L0/000.parquet"),
        None,
    );
    let l1_entry = SstEntry::new(
        l1_id.clone(),
        None,
        None,
        Path::from("L1/001.parquet"),
        None,
    );
    let edits = vec![
        VersionEdit::AddSsts {
            level: 0,
            entries: vec![l0_entry],
        },
        VersionEdit::AddSsts {
            level: 1,
            entries: vec![l1_entry],
        },
    ];
    version.apply_edits(&edits).expect("apply edits");

    let mixed_level_task = CompactionTask {
        source_level: 0,
        target_level: 1,
        input: vec![
            CompactionInput {
                level: 0,
                sst_id: l0_id,
            },
            CompactionInput {
                level: 1,
                sst_id: l1_id,
            },
        ],
        key_range: None,
    };
    let resolved = orchestrator::resolve_inputs(&Path::default(), &version, &mixed_level_task)
        .expect("resolve");
    let levels: Vec<_> = resolved.iter().map(|desc| desc.level()).collect();
    assert_eq!(levels, vec![0, 1]);
}