use std::{
collections::{BTreeMap, HashMap},
mem,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Instant,
};
use bytes::Bytes;
use chrono::Utc;
use futures::future::join_all;
use roaring::RoaringBitmap;
use tokio::time;
use uuid::Uuid;
use crate::{
Supertable,
config::CompactionSettings,
runtime_bridge::bridge_on_runtime,
superfile::builder::SuperfileBuilder,
supertable::{
BuildError, CommitError, SuperfileEntry,
error::CompactionError,
query::dispatch::open_reader,
wal::{
SealRecord, WalStore,
tombstones_admin::{self, TombstonesAdminError},
},
writer::{
PreparedSuperfile, ShardOutput, backoff_delay, prepare_superfile, try_commit_attempt,
},
},
};
struct CompactionSlot<'a>(&'a AtomicBool);
impl Drop for CompactionSlot<'_> {
fn drop(&mut self) {
self.0.store(false, Ordering::Release);
}
}
const MIB: u64 = 1024 * 1024;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SuperfileStats {
pub superfile_id: Uuid,
pub partition_key: Vec<u8>,
pub size_bytes: u64,
pub n_docs: u64,
pub tombstoned_docs: u64,
pub sealed_by_other: bool,
}
impl SuperfileStats {
fn live_docs(&self) -> u64 {
self.n_docs.saturating_sub(self.tombstoned_docs)
}
fn live_bytes(&self) -> u64 {
if self.n_docs == 0 {
return 0;
}
(self.size_bytes as u128 * self.live_docs() as u128 / self.n_docs as u128) as u64
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionJob {
pub partition_key: Vec<u8>,
pub inputs: Vec<Uuid>,
pub estimated_output_bytes: u64,
}
pub fn select(superfiles: &[SuperfileStats], cfg: &CompactionSettings) -> Vec<CompactionJob> {
let target_bytes = cfg.target_superfile_size_mb.saturating_mul(MIB);
let min_output_bytes =
(target_bytes as u128 * cfg.min_fill_percent.clamp(1, 100) as u128 / 100) as u64;
let mut by_partition: BTreeMap<&[u8], Vec<&SuperfileStats>> = BTreeMap::new();
for s in superfiles {
by_partition.entry(&s.partition_key).or_default().push(s);
}
let mut jobs = Vec::new();
for (key, segs) in by_partition {
pack_partition(key, segs, target_bytes, min_output_bytes, &mut jobs);
}
jobs
}
fn pack_partition(
key: &[u8],
segs: Vec<&SuperfileStats>,
target_bytes: u64,
min_output_bytes: u64,
jobs: &mut Vec<CompactionJob>,
) {
let mut candidates: Vec<&SuperfileStats> = segs
.into_iter()
.filter(|s| !s.sealed_by_other && s.size_bytes < target_bytes)
.collect();
candidates.sort_by(|a, b| {
let lhs = a.tombstoned_docs as u128 * b.n_docs.max(1) as u128;
let rhs = b.tombstoned_docs as u128 * a.n_docs.max(1) as u128;
rhs.cmp(&lhs)
.then(a.size_bytes.cmp(&b.size_bytes))
.then(a.superfile_id.cmp(&b.superfile_id))
});
let mut pending = PendingJob::default();
for s in candidates {
if !pending.fits(s, target_bytes) {
pending.emit(key, min_output_bytes, jobs);
}
pending.push(s);
}
pending.emit(key, min_output_bytes, jobs);
}
#[derive(Default)]
struct PendingJob {
inputs: Vec<Uuid>,
live_bytes: u64,
}
impl PendingJob {
fn fits(&self, s: &SuperfileStats, target_bytes: u64) -> bool {
self.live_bytes + s.live_bytes() <= target_bytes
}
fn push(&mut self, s: &SuperfileStats) {
self.inputs.push(s.superfile_id);
self.live_bytes += s.live_bytes();
}
fn emit(&mut self, key: &[u8], min_output_bytes: u64, jobs: &mut Vec<CompactionJob>) {
if self.inputs.len() >= 2 && self.live_bytes >= min_output_bytes {
jobs.push(CompactionJob {
partition_key: key.to_vec(),
inputs: mem::take(&mut self.inputs),
estimated_output_bytes: self.live_bytes,
});
}
*self = PendingJob::default();
}
}
impl Supertable {
pub(crate) fn compact(&self, cfg: &CompactionSettings) -> Result<(), CompactionError> {
bridge_on_runtime(self.compact_async(cfg), &self.inner().query_runtime())
}
pub(crate) async fn compact_async(
&self,
cfg: &CompactionSettings,
) -> Result<(), CompactionError> {
let inner = self.inner();
match inner.compaction_outstanding.compare_exchange(
false,
true,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {}
Err(_) => return Err(CompactionError::AlreadyCompacting),
}
let _slot = CompactionSlot(&inner.compaction_outstanding);
let manifest = inner.manifest.load_full();
let superfile_ids: Vec<Uuid> = manifest
.get_all_superfiles()
.iter()
.map(|e| e.superfile_id)
.collect();
let sidecar_map: HashMap<Uuid, (Arc<RoaringBitmap>, Option<SealRecord>)> =
if let Some(cache) = &inner.tombstone_cache {
let now = Instant::now();
cache.prefetch(&superfile_ids, now).await;
superfile_ids
.iter()
.filter_map(|id| match cache.sidecar_for(*id, now) {
Ok((bitmap, seal)) => Some((*id, (bitmap, seal))),
Err(_) => None,
})
.collect()
} else {
HashMap::new()
};
let stats: Vec<SuperfileStats> = manifest
.get_all_superfiles()
.iter()
.map(|entry| {
let (bitmap, seal) = sidecar_map
.get(&entry.superfile_id)
.cloned()
.unwrap_or_else(|| (Arc::new(RoaringBitmap::new()), None));
let tombstoned_docs = bitmap.len();
let sealed_by_other = seal.is_some();
SuperfileStats {
superfile_id: entry.superfile_id,
partition_key: entry.partition_key.clone(),
size_bytes: entry
.subsection_offsets
.as_ref()
.map(|o| o.total_size)
.unwrap_or(0),
n_docs: entry.n_docs,
tombstoned_docs,
sealed_by_other,
}
})
.collect();
let jobs = select(&stats, cfg);
for job in jobs {
self.run_compaction_job(job).await?;
self.refresh()
.await
.map_err(|e| CompactionError::Refresh(e.to_string()))?;
}
Ok(())
}
pub(crate) async fn merge_superfiles(
&self,
superfiles: &[Arc<SuperfileEntry>],
) -> Result<PreparedSuperfile, BuildError> {
let manifest = { self.inner().manifest.load().clone() };
let store = manifest.options.store.clone();
let disk_cache = manifest.options.disk_cache.clone();
let storage = manifest.options.storage.clone();
let tombstone_cache = self.inner().tombstone_cache.clone();
let mut superfile_readers_fut = Vec::with_capacity(superfiles.len());
for entry in superfiles {
let open_fut = async {
let r = open_reader(&store, disk_cache.as_ref(), storage.as_ref(), entry).await;
(entry.superfile_id, r)
};
superfile_readers_fut.push(open_fut);
}
let readers = join_all(superfile_readers_fut).await;
let now = Instant::now();
if let Some(tombstone_cache) = &tombstone_cache {
let superfile_ids = superfiles
.iter()
.map(|entry| entry.superfile_id)
.collect::<Vec<_>>();
tombstone_cache.prefetch(&superfile_ids, now).await;
}
let mut readers_with_tombstones = Vec::with_capacity(readers.len());
for (superfile_id, reader) in readers {
let bitmap = tombstone_cache
.as_ref()
.map(|t| t.bitmap_for(superfile_id, now))
.transpose()
.map_err(|e| BuildError::Store(e.to_string()))?;
let reader = reader.map_err(|e| BuildError::Store(e.to_string()))?;
readers_with_tombstones.push((reader.clone(), bitmap));
}
let (merged_bytes, superfile_stats) =
SuperfileBuilder::build_from_readers(&readers_with_tombstones)?;
let merged_bytes = Bytes::from(merged_bytes);
let shard = ShardOutput::new_with_params(
merged_bytes,
superfile_stats.n_docs,
superfile_stats.id_min,
superfile_stats.id_max,
superfile_stats.scalar_stats,
);
let prepared_superfile = prepare_superfile(self.inner().as_ref(), shard)?;
prepared_superfile.ok_or(BuildError::NoDocsToBuild)
}
pub(crate) async fn run_compaction_job(
&self,
job: CompactionJob,
) -> Result<(), CompactionError> {
let inner = self.inner();
let manifest = inner.manifest.load_full();
let storage = manifest
.options
.storage
.as_ref()
.ok_or(CompactionError::NoStorage)?
.clone();
let wal_store = WalStore::new(storage.clone());
let inputs: Vec<Arc<SuperfileEntry>> = job
.inputs
.iter()
.map(|id| {
manifest
.get_all_superfiles()
.iter()
.find(|e| e.superfile_id == *id)
.cloned()
.ok_or(CompactionError::SuperfileNotFound(*id))
})
.collect::<Result<_, _>>()?;
let compaction_id = Uuid::new_v4();
let sealed_at = Utc::now();
for entry in &inputs {
loop {
match tombstones_admin::seal(
&wal_store,
entry.superfile_id,
compaction_id,
sealed_at,
)
.await
{
Ok(_) => break,
Err(TombstonesAdminError::CasLost { .. }) => {
continue;
}
Err(TombstonesAdminError::AlreadySealed {
superfile_id,
existing_compaction_id,
}) => {
return Err(CompactionError::SidecarConflict {
superfile_id,
existing_compaction_id,
});
}
Err(TombstonesAdminError::WalStore(e)) => {
return Err(CompactionError::Seal(e.to_string()));
}
}
}
}
let merged_segment = self
.merge_superfiles(&inputs)
.await
.map_err(|e| CompactionError::Build(e.to_string()))?;
let new_entries = vec![merged_segment.entry];
let mut pending_storage_writes = vec![
merged_segment
.bytes_for_storage
.ok_or(CompactionError::EmptyMergedSuperfile)?,
];
let opts = Arc::clone(&inner.options);
let max_retries = opts.max_commit_retries.max(1);
for attempt in 0..max_retries {
let current = inner.manifest.load_full();
let entries_to_remove: Vec<Arc<SuperfileEntry>> = job
.inputs
.iter()
.filter_map(|id| {
current
.get_all_superfiles()
.iter()
.find(|e| e.superfile_id == *id)
.cloned()
})
.collect();
if entries_to_remove.len() != job.inputs.len() {
return Ok(());
}
match try_commit_attempt(
storage.clone(),
Arc::clone(&opts),
current,
&new_entries,
&entries_to_remove,
&mut pending_storage_writes,
)
.await
{
Ok(_) => return Ok(()),
Err(CommitError::WriteContentionExhausted) if attempt + 1 < max_retries => {
self.refresh()
.await
.map_err(|e| CompactionError::Refresh(e.to_string()))?;
time::sleep(backoff_delay(attempt)).await;
}
Err(e) => return Err(CompactionError::Commit(e.to_string())),
}
}
Err(CompactionError::Commit(
"commit retries exhausted".to_string(),
))
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashSet, mem, str, sync::Arc};
use arrow_array::LargeStringArray;
use tempfile::TempDir;
use tokio::task;
use super::*;
use crate::{
BoolMode, Supertable,
supertable::{
error::CompactionError,
storage::{LocalFsStorageProvider, StorageProvider},
},
test_helpers::{build_title_batch, default_supertable_options},
};
fn mib(n: u64) -> u64 {
n * MIB
}
fn seg(id: u128, size_mib: u64, n_docs: u64, tombstoned: u64) -> SuperfileStats {
SuperfileStats {
superfile_id: Uuid::from_u128(id),
partition_key: Vec::new(),
size_bytes: mib(size_mib),
n_docs,
tombstoned_docs: tombstoned,
sealed_by_other: false,
}
}
fn default_cfg() -> CompactionSettings {
CompactionSettings::default() }
#[test]
fn empty_input_yields_no_jobs() {
assert!(select(&[], &default_cfg()).is_empty());
}
#[test]
fn below_fill_floor_skips() {
let segs = vec![seg(1, 200, 1000, 0), seg(2, 200, 1000, 0)];
assert!(select(&segs, &default_cfg()).is_empty());
}
#[test]
fn packs_one_job_and_leaves_remainder() {
let segs: Vec<_> = (0..6).map(|i| seg(i, 200, 1000, 0)).collect();
let jobs = select(&segs, &default_cfg());
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].inputs.len(), 5);
assert_eq!(jobs[0].estimated_output_bytes, mib(1000));
}
#[test]
fn splits_many_superfiles_into_multiple_jobs() {
let segs: Vec<_> = (0..12).map(|i| seg(i, 200, 1000, 0)).collect();
let jobs = select(&segs, &default_cfg());
assert_eq!(jobs.len(), 2);
assert!(jobs.iter().all(|j| j.inputs.len() == 5));
}
#[test]
fn already_target_sized_superfile_is_never_re_compacted() {
let big = seg(99, 1024, 1_000_000, 0);
let mut segs = vec![big.clone()];
segs.extend((0..5).map(|i| seg(i, 200, 1000, 0)));
let jobs = select(&segs, &default_cfg());
assert_eq!(jobs.len(), 1);
assert!(!jobs[0].inputs.contains(&big.superfile_id));
}
#[test]
fn output_estimate_uses_live_bytes() {
let segs: Vec<_> = (0..5).map(|i| seg(i, 400, 1000, 500)).collect();
let jobs = select(&segs, &default_cfg());
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].inputs.len(), 5);
assert_eq!(jobs[0].estimated_output_bytes, mib(1000));
}
#[test]
fn prefers_most_deleted_first() {
let mut segs: Vec<_> = (0..9).map(|i| seg(i, 100, 1000, 0)).collect();
let dead_heavy = seg(100, 100, 1000, 900);
segs.push(dead_heavy.clone());
let jobs = select(&segs, &default_cfg());
assert_eq!(jobs[0].inputs[0], dead_heavy.superfile_id);
}
#[test]
fn sealed_by_other_is_excluded() {
let mut owned = seg(1, 200, 1000, 0);
owned.sealed_by_other = true;
let segs = vec![owned, seg(2, 200, 1000, 0), seg(3, 200, 1000, 0)];
for job in select(&segs, &default_cfg()) {
assert!(!job.inputs.contains(&Uuid::from_u128(1)));
}
}
#[test]
fn fewer_than_two_candidates_skips() {
assert!(select(&[seg(1, 200, 1000, 0)], &default_cfg()).is_empty());
}
#[test]
fn live_docs_subtracts_tombstones_and_saturates() {
let s = seg(1, 100, 1000, 250);
assert_eq!(s.live_docs(), 750);
let over = seg(2, 100, 100, 200);
assert_eq!(over.live_docs(), 0);
}
#[test]
fn live_bytes_scales_by_live_fraction() {
let s = seg(1, 100, 1000, 500);
assert_eq!(s.live_bytes(), mib(100) / 2);
}
#[test]
fn live_bytes_zero_docs_is_zero() {
let s = seg(1, 100, 0, 0);
assert_eq!(s.live_bytes(), 0);
}
#[test]
fn pending_job_fits_until_target_exceeded() {
let target = mib(100);
let mut p = PendingJob::default();
let a = seg(1, 60, 1000, 0); assert!(p.fits(&a, target));
p.push(&a);
assert_eq!(p.live_bytes, mib(60));
assert_eq!(p.inputs.len(), 1);
let b = seg(2, 60, 1000, 0);
assert!(!p.fits(&b, target));
let c = seg(3, 40, 1000, 0);
assert!(p.fits(&c, target));
}
#[test]
fn pending_job_emit_requires_two_inputs() {
let mut jobs = Vec::new();
let mut p = PendingJob::default();
p.push(&seg(1, 200, 1000, 0));
p.emit(&[], 0, &mut jobs);
assert!(jobs.is_empty(), "single-input job must not emit");
assert_eq!(p.inputs.len(), 0);
assert_eq!(p.live_bytes, 0);
}
#[tokio::test(flavor = "multi_thread")]
async fn run_compaction_job_unknown_input_surfaces_not_found() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
commit_titles(&st, &["alpha first", "alpha second"]);
let bogus = Uuid::from_u128(0xDEAD_BEEF);
let job = CompactionJob {
partition_key: Vec::new(),
inputs: vec![bogus],
estimated_output_bytes: 0,
};
let err = st
.run_compaction_job(job)
.await
.expect_err("must error on unknown input");
assert!(
matches!(err, CompactionError::SuperfileNotFound(id) if id == bogus),
"{err:?}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn compact_sync_wrapper_runs_jobs() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
for titles in [
["alpha first", "alpha second"],
["bravo first", "bravo second"],
["charlie first", "charlie second"],
["delta first", "delta second"],
["echo first", "echo second"],
["foxtrot first", "foxtrot second"],
["golf first", "golf second"],
["hotel first", "hotel second"],
["india first", "india second"],
["juliet first", "juliet second"],
] {
commit_titles(&st, &titles);
}
let before = st.manifest_id();
let cfg = small_compact_cfg();
task::spawn_blocking(move || st.compact(&cfg).map(|_| st.manifest_id()))
.await
.expect("join")
.map(|after| {
assert!(after > before, "sync compact must have run a job");
})
.expect("compact");
}
#[test]
fn partitions_packed_independently() {
let mut segs = Vec::new();
for i in 0..5 {
let mut s = seg(i, 200, 1000, 0);
s.partition_key = vec![0xA];
segs.push(s);
}
for i in 5..10 {
let mut s = seg(i, 200, 1000, 0);
s.partition_key = vec![0xB];
segs.push(s);
}
let jobs = select(&segs, &default_cfg());
assert_eq!(jobs.len(), 2);
let a = jobs
.iter()
.find(|j| j.partition_key == vec![0xA])
.expect("partition A job");
assert!(a.inputs.iter().all(|id| id.as_u128() < 5));
}
#[tokio::test(flavor = "multi_thread")]
async fn merge_superfiles_merges_two_superfiles() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create supertable");
{
let mut w = st.writer().expect("writer");
let batch = build_title_batch(&["first doc", "second doc"]);
w.append(&batch).expect("append");
w.commit().expect("commit");
}
{
let mut w = st.writer().expect("writer");
let batch = build_title_batch(&["third doc", "fourth doc"]);
w.append(&batch).expect("append");
w.commit().expect("commit");
}
let reader = st.reader();
let superfiles: Vec<Arc<SuperfileEntry>> = reader
.manifest()
.get_all_superfiles()
.iter()
.take(2)
.cloned()
.collect();
assert_eq!(superfiles.len(), 2, "should have 2 superfiles");
let _merged_superfile = st
.merge_superfiles(&superfiles)
.await
.expect("merge_superfiles should succeed");
}
#[tokio::test(flavor = "multi_thread")]
async fn merge_superfiles_preserves_scalar_stats() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create supertable");
{
let mut w = st.writer().expect("writer");
let batch = build_title_batch(&["apple", "banana"]);
w.append(&batch).expect("append");
w.commit().expect("commit");
}
{
let mut w = st.writer().expect("writer");
let batch = build_title_batch(&["cherry", "date"]);
w.append(&batch).expect("append");
w.commit().expect("commit");
}
let reader = st.reader();
let superfiles: Vec<Arc<SuperfileEntry>> = reader
.manifest()
.get_all_superfiles()
.iter()
.take(2)
.cloned()
.collect();
let expected_n_docs: u64 = superfiles.iter().map(|sf| sf.n_docs).sum();
let expected_id_min = superfiles
.iter()
.map(|sf| sf.id_min)
.min()
.unwrap_or(i128::MAX);
let expected_id_max = superfiles
.iter()
.map(|sf| sf.id_max)
.max()
.unwrap_or(i128::MIN);
let merged_superfile = st
.merge_superfiles(&superfiles)
.await
.expect("merge_superfiles should succeed");
assert_eq!(
merged_superfile.entry.n_docs, expected_n_docs,
"n_docs should be sum of input superfiles"
);
assert_eq!(
merged_superfile.entry.id_min, expected_id_min,
"id_min should be minimum across all superfiles"
);
assert_eq!(
merged_superfile.entry.id_max, expected_id_max,
"id_max should be maximum across all superfiles"
);
let title_stats = merged_superfile
.entry
.scalar_stats
.get("title")
.expect("merged entry should have title column stats");
let title_min_arr = title_stats
.min
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("title column should be LargeStringArray");
let title_max_arr = title_stats
.max
.as_any()
.downcast_ref::<LargeStringArray>()
.expect("title column should be LargeStringArray");
let min_value = title_min_arr.value(0);
let max_value = title_max_arr.value(0);
assert_eq!(min_value, "apple", "minimum title should be 'apple'");
assert_eq!(max_value, "date", "maximum title should be 'date'");
}
#[tokio::test(flavor = "multi_thread")]
async fn merge_superfiles_combines_multiple_superfiles() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create supertable");
let batch_titles = [
["alpha first", "alpha second"],
["beta first", "beta second"],
["gamma first", "gamma second"],
];
for titles in &batch_titles {
let mut w = st.writer().expect("writer");
let batch = build_title_batch(titles);
w.append(&batch).expect("append");
w.commit().expect("commit");
}
let reader = st.reader();
let superfiles: Vec<Arc<SuperfileEntry>> = reader
.manifest()
.get_all_superfiles()
.iter()
.take(3)
.cloned()
.collect();
assert_eq!(superfiles.len(), 3, "should have 3 superfiles");
let merged_superfile = st
.merge_superfiles(&superfiles)
.await
.expect("merge_superfiles should succeed");
assert_eq!(
merged_superfile.entry.n_docs, 6,
"merged superfile should have 6 documents (3 files × 2 docs each)"
);
let source_id_min = superfiles
.iter()
.map(|sf| sf.id_min)
.min()
.unwrap_or(i128::MAX);
let source_id_max = superfiles
.iter()
.map(|sf| sf.id_max)
.max()
.unwrap_or(i128::MIN);
assert_eq!(merged_superfile.entry.id_min, source_id_min);
assert_eq!(merged_superfile.entry.id_max, source_id_max);
let merged_reader = merged_superfile
.open_reader()
.expect("merged superfile should have bytes")
.expect("open reader on merged superfile");
assert_eq!(merged_reader.n_docs(), 6, "reader should report 6 docs");
for term in &["alpha", "beta", "gamma"] {
let hits = merged_reader
.token_match("title", &[*term], BoolMode::And)
.await
.unwrap_or_else(|_| panic!("token_match for '{term}'"));
assert_eq!(hits.len(), 2, "term '{term}' should match exactly 2 docs");
}
}
#[tokio::test(flavor = "multi_thread")]
async fn merge_superfiles_single_superfile() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let st =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create supertable");
{
let mut w = st.writer().expect("writer");
let batch = build_title_batch(&["only doc", "second doc"]);
w.append(&batch).expect("append");
w.commit().expect("commit");
}
let reader = st.reader();
let superfiles: Vec<Arc<SuperfileEntry>> = reader
.manifest()
.get_all_superfiles()
.iter()
.take(1)
.cloned()
.collect();
assert_eq!(superfiles.len(), 1, "should have 1 superfile");
let merged_superfile = st
.merge_superfiles(&superfiles)
.await
.expect("merge_superfiles should succeed");
assert_eq!(
merged_superfile.entry.n_docs, 2,
"merged superfile should have 2 documents"
);
let source_id_min = superfiles
.iter()
.map(|sf| sf.id_min)
.min()
.unwrap_or(i128::MAX);
let source_id_max = superfiles
.iter()
.map(|sf| sf.id_max)
.max()
.unwrap_or(i128::MIN);
assert_eq!(merged_superfile.entry.id_min, source_id_min);
assert_eq!(merged_superfile.entry.id_max, source_id_max);
let merged_reader = merged_superfile
.open_reader()
.expect("merged superfile should have bytes")
.expect("open reader on merged superfile");
assert_eq!(merged_reader.n_docs(), 2, "reader should report 2 docs");
let only_hits = merged_reader
.token_match("title", &["only"], BoolMode::And)
.await
.expect("token_match for 'only'");
assert_eq!(
only_hits.len(),
1,
"should find exactly 1 doc matching 'only'"
);
let second_hits = merged_reader
.token_match("title", &["second"], BoolMode::And)
.await
.expect("token_match for 'second'");
assert_eq!(
second_hits.len(),
1,
"should find exactly 1 doc matching 'second'"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn compact_in_memory_table_takes_empty_sidecar_fallback() {
let st =
Supertable::create(default_supertable_options()).expect("create in-memory supertable");
{
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["alpha first", "alpha second"]))
.expect("append");
w.commit().expect("commit");
}
let before = st.manifest_id();
st.compact_async(&small_compact_cfg())
.await
.expect("in-memory compact is a no-op, not an error");
assert_eq!(
st.manifest_id(),
before,
"single superfile yields no compaction job"
);
}
fn make_st(dir: &TempDir) -> Supertable {
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create supertable")
}
fn small_compact_cfg() -> CompactionSettings {
CompactionSettings {
target_superfile_size_mb: 1,
min_fill_percent: 1,
..CompactionSettings::default()
}
}
fn commit_titles(st: &Supertable, titles: &[&str]) {
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(titles)).expect("append");
w.commit().expect("commit");
}
#[tokio::test(flavor = "multi_thread")]
async fn compact_rejects_concurrent_call_while_slot_held() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
st.inner()
.compaction_outstanding
.store(true, Ordering::Release);
let err = st
.compact_async(&small_compact_cfg())
.await
.expect_err("must reject while slot held");
assert!(
matches!(err, CompactionError::AlreadyCompacting),
"expected AlreadyCompacting, got {err:?}"
);
st.inner()
.compaction_outstanding
.store(false, Ordering::Release);
}
#[tokio::test(flavor = "multi_thread")]
async fn compact_slot_released_after_completion() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
commit_titles(&st, &["alpha first", "alpha second"]);
st.compact_async(&small_compact_cfg())
.await
.expect("first compact");
st.compact_async(&small_compact_cfg())
.await
.expect("second compact after slot release");
}
#[tokio::test(flavor = "multi_thread")]
async fn compact_succeeds_when_concurrent_writer_commits_during_compaction() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
for title in &[
["alpha first", "alpha second"],
["bravo first", "bravo second"],
["charlie first", "charlie second"],
["delta first", "delta second"],
["echo first", "echo second"],
["foxtrot first", "foxtrot second"],
["golf first", "golf second"],
["hotel first", "hotel second"],
["india first", "india second"],
["juliet first", "juliet second"],
] {
commit_titles(&st, title);
}
let before_docs = st.reader().n_docs_total();
let st2 = st.clone();
let writer_handle = task::spawn_blocking(move || {
commit_titles(&st2, &["kilo first", "kilo second"]);
});
st.compact_async(&small_compact_cfg())
.await
.expect("compact must succeed despite concurrent writer");
writer_handle.await.expect("writer task");
st.refresh().await.expect("refresh");
let after_docs = st.reader().n_docs_total();
assert_eq!(
after_docs,
before_docs + 2,
"writer's 2 docs must survive alongside compacted data"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn compact_reduces_superfile_count() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
commit_titles(&st, &["alpha cherry", "alpha mango"]);
commit_titles(&st, &["bravo cherry", "bravo mango"]);
commit_titles(&st, &["charlie delta", "charlie echo"]);
commit_titles(&st, &["foxtrot golf", "foxtrot hotel"]);
commit_titles(&st, &["india first", "india second"]);
commit_titles(&st, &["lima first", "lima second"]);
commit_titles(&st, &["november first", "november second"]);
commit_titles(&st, &["quebec first", "quebec second"]);
commit_titles(&st, &["romeo first", "romeo second"]);
commit_titles(&st, &["sierra first", "sierra second"]);
let before = st.reader();
let before_manifest_id = before.manifest_id();
let before_n_superfiles = before.n_superfiles();
let input_ids: HashSet<Uuid> = before
.manifest()
.superfiles
.iter()
.map(|s| s.superfile_id)
.collect();
let expected_docs = before.n_docs_total();
let expected_id_min = before
.manifest()
.superfiles
.iter()
.map(|s| s.id_min)
.min()
.expect("at least one superfile before compaction");
let expected_id_max = before
.manifest()
.superfiles
.iter()
.map(|s| s.id_max)
.max()
.expect("at least one superfile before compaction");
st.compact_async(&small_compact_cfg())
.await
.expect("compact");
let after = st.reader();
let sfs = &after.manifest().superfiles;
assert!(
after.manifest_id() == before_manifest_id + 1,
"no compaction jobs ran; adjust small_compact_cfg() if superfiles exceed \
min_output_bytes"
);
assert!(
sfs.len() < before_n_superfiles,
"superfile count should decrease after compaction"
);
assert!(
!sfs.iter().any(|s| input_ids.contains(&s.superfile_id)),
"original superfile IDs must not appear after compaction"
);
assert_eq!(after.n_docs_total(), expected_docs);
let merged_min = sfs
.iter()
.map(|s| s.id_min)
.min()
.expect("at least one superfile after compaction");
let merged_max = sfs
.iter()
.map(|s| s.id_max)
.max()
.expect("at least one superfile after compaction");
assert!(merged_min == expected_id_min);
assert!(merged_max == expected_id_max);
assert!(sfs.iter().all(|s| s.partition_key == sfs[0].partition_key));
let fts = sfs[0]
.fts_summary
.get("title")
.expect("fts summary present");
for term in &[
b"alpha" as &[u8],
b"bravo",
b"charlie",
b"foxtrot",
b"india",
b"lima",
b"november",
b"quebec",
b"romeo",
b"sierra",
] {
assert!(
fts.may_contain(term),
"bloom missing term '{}'",
str::from_utf8(term).expect("term literal is valid utf-8")
);
}
mem::forget(dir);
}
#[tokio::test(flavor = "multi_thread")]
async fn compact_no_op_when_single_superfile() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
commit_titles(&st, &["only doc", "second doc"]);
let before_manifest_id = st.manifest_id();
let before_n = st.reader().n_superfiles();
st.compact_async(&small_compact_cfg())
.await
.expect("compact");
assert_eq!(
st.manifest_id(),
before_manifest_id,
"manifest_id must not change: a single superfile cannot form a merge job"
);
assert_eq!(st.reader().n_superfiles(), before_n);
}
#[tokio::test(flavor = "multi_thread")]
async fn compact_no_op_when_below_fill_floor() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
commit_titles(&st, &["alpha first", "alpha second"]);
commit_titles(&st, &["beta first", "beta second"]);
let before_manifest_id = st.manifest_id();
let cfg = CompactionSettings {
target_superfile_size_mb: 1024,
min_fill_percent: 100,
..CompactionSettings::default()
};
st.compact_async(&cfg).await.expect("compact");
assert_eq!(
st.manifest_id(),
before_manifest_id,
"manifest must not change when combined size is below the fill floor"
);
assert_eq!(st.reader().n_superfiles(), 2);
}
#[tokio::test(flavor = "multi_thread")]
async fn reader_pinned_before_compact_sees_old_state() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
commit_titles(&st, &["alpha first", "alpha second"]);
commit_titles(&st, &["bravo first", "bravo second"]);
commit_titles(&st, &["charlie first", "charlie second"]);
commit_titles(&st, &["delta first", "delta second"]);
commit_titles(&st, &["echo first", "echo second"]);
commit_titles(&st, &["foxtrot first", "foxtrot second"]);
commit_titles(&st, &["golf first", "golf second"]);
commit_titles(&st, &["hotel first", "hotel second"]);
commit_titles(&st, &["india first", "india second"]);
commit_titles(&st, &["juliet first", "juliet second"]);
let reader_before = st.reader();
let before_n = reader_before.n_superfiles();
let before_manifest_id = reader_before.manifest_id();
st.compact_async(&small_compact_cfg())
.await
.expect("compact");
let reader_after = st.reader();
assert_eq!(reader_before.n_superfiles(), before_n);
assert_eq!(reader_before.manifest_id(), before_manifest_id);
assert!(
reader_after.manifest_id() > before_manifest_id,
"compact must have run for snapshot isolation to be observable; \
adjust small_compact_cfg() if needed"
);
assert!(reader_after.n_superfiles() < before_n);
}
#[tokio::test(flavor = "multi_thread")]
async fn fts_search_returns_correct_results_after_compact() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
commit_titles(&st, &["alpha first", "alpha second"]);
commit_titles(&st, &["bravo first", "bravo second"]);
commit_titles(&st, &["charlie first", "charlie second"]);
commit_titles(&st, &["delta first", "delta second"]);
commit_titles(&st, &["echo first", "echo second"]);
commit_titles(&st, &["foxtrot first", "foxtrot second"]);
commit_titles(&st, &["golf first", "golf second"]);
commit_titles(&st, &["hotel first", "hotel second"]);
commit_titles(&st, &["india first", "india second"]);
commit_titles(&st, &["juliet first", "juliet second"]);
let before_manifest_id = st.manifest_id();
st.compact_async(&small_compact_cfg())
.await
.expect("compact");
assert!(
st.manifest_id() == before_manifest_id + 1,
"compact must have run; adjust small_compact_cfg() if needed"
);
for term in &["alpha", "bravo", "charlie"] {
let n: usize = st
.token_match("title", term, BoolMode::And, None)
.unwrap_or_else(|e| panic!("token_match for '{term}': {e}"))
.iter()
.map(|b| b.num_rows())
.sum();
assert_eq!(n, 2, "term '{term}' should match 2 docs after compact");
}
let n_first: usize = st
.token_match("title", "first", BoolMode::And, None)
.expect("token_match for 'first'")
.iter()
.map(|b| b.num_rows())
.sum();
assert_eq!(n_first, 10, "'first' should match 10 docs");
}
#[tokio::test(flavor = "multi_thread")]
async fn fts_bloom_filter_covers_all_terms_after_compact() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
commit_titles(&st, &["alpha first", "alpha second"]);
commit_titles(&st, &["bravo first", "bravo second"]);
commit_titles(&st, &["charlie first", "charlie second"]);
commit_titles(&st, &["delta first", "delta second"]);
commit_titles(&st, &["echo first", "echo second"]);
commit_titles(&st, &["foxtrot first", "foxtrot second"]);
commit_titles(&st, &["golf first", "golf second"]);
commit_titles(&st, &["hotel first", "hotel second"]);
commit_titles(&st, &["india first", "india second"]);
commit_titles(&st, &["juliet first", "juliet second"]);
let before_manifest_id = st.manifest_id();
st.compact_async(&small_compact_cfg())
.await
.expect("compact");
assert!(
st.manifest_id() == before_manifest_id + 1,
"compact must have run; adjust small_compact_cfg() if needed"
);
let r = st.reader();
let sfs = &r.manifest().superfiles;
assert!(sfs.len() < 10, "superfile count should have decreased");
let fts = sfs[0]
.fts_summary
.get("title")
.expect("fts summary present");
for term in &[
b"alpha" as &[u8],
b"bravo",
b"charlie",
b"delta",
b"echo",
b"foxtrot",
b"golf",
b"hotel",
b"india",
b"juliet",
] {
assert!(
fts.may_contain(term),
"bloom missing term '{}'",
str::from_utf8(term).expect("term literal is valid utf-8")
);
}
}
#[tokio::test(flavor = "multi_thread")]
async fn second_compact_is_no_op_after_full_merge() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
commit_titles(&st, &["alpha first", "alpha second"]);
commit_titles(&st, &["bravo first", "bravo second"]);
commit_titles(&st, &["charlie first", "charlie second"]);
commit_titles(&st, &["delta first", "delta second"]);
commit_titles(&st, &["echo first", "echo second"]);
commit_titles(&st, &["foxtrot first", "foxtrot second"]);
commit_titles(&st, &["golf first", "golf second"]);
commit_titles(&st, &["hotel first", "hotel second"]);
commit_titles(&st, &["india first", "india second"]);
commit_titles(&st, &["juliet first", "juliet second"]);
let before_first_compact = st.manifest_id();
st.compact_async(&small_compact_cfg())
.await
.expect("first compact");
assert!(
st.manifest_id() == before_first_compact + 1,
"first compact must have run; adjust small_compact_cfg() if needed"
);
assert_eq!(st.inner().manifest.load_full().superfiles.len(), 1);
let after_first_manifest_id = st.manifest_id();
let after_first_n = st.reader().n_superfiles();
st.compact_async(&small_compact_cfg())
.await
.expect("second compact");
assert_eq!(
st.manifest_id(),
after_first_manifest_id,
"second compact should produce no jobs"
);
assert_eq!(st.reader().n_superfiles(), after_first_n);
}
#[tokio::test(flavor = "multi_thread")]
async fn compact_runs_multiple_compactions_on_separate_file_sets() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
commit_titles(&st, &["alpha first", "alpha second"]);
commit_titles(&st, &["bravo first", "bravo second"]);
commit_titles(&st, &["charlie first", "charlie second"]);
commit_titles(&st, &["delta first", "delta second"]);
commit_titles(&st, &["echo first", "echo second"]);
commit_titles(&st, &["foxtrot first", "foxtrot second"]);
commit_titles(&st, &["golf first", "golf second"]);
commit_titles(&st, &["hotel first", "hotel second"]);
commit_titles(&st, &["india first", "india second"]);
commit_titles(&st, &["juliet first", "juliet second"]);
let before_first_compact = st.manifest_id();
st.compact_async(&small_compact_cfg())
.await
.expect("first compact");
let manifest_id_after_first_compact = st.manifest_id();
assert_eq!(manifest_id_after_first_compact, before_first_compact + 1);
assert_eq!(
st.reader().n_docs_total(),
20,
"batch A should have 20 docs"
);
commit_titles(&st, &["kilo first", "kilo second"]);
commit_titles(&st, &["lima first", "lima second"]);
commit_titles(&st, &["mike first", "mike second"]);
commit_titles(&st, &["november first", "november second"]);
commit_titles(&st, &["oscar first", "oscar second"]);
commit_titles(&st, &["papa first", "papa second"]);
commit_titles(&st, &["quebec first", "quebec second"]);
commit_titles(&st, &["romeo first", "romeo second"]);
commit_titles(&st, &["sierra first", "sierra second"]);
commit_titles(&st, &["tango first", "tango second"]);
st.compact_async(&small_compact_cfg())
.await
.expect("second compact");
assert!(
st.manifest_id() == manifest_id_after_first_compact + 10 + 1,
"second compact must have run a job on the batch-B superfiles"
);
let r = st.reader();
assert_eq!(r.n_docs_total(), 40, "all docs must be preserved");
assert!(
r.n_superfiles() < 8,
"overall superfile count must have decreased from original 20"
);
let sfs = &r.manifest().superfiles;
let total_from_manifest: u64 = sfs.iter().map(|s| s.n_docs).sum();
assert_eq!(total_from_manifest, 40);
for sf in sfs.iter() {
assert!(sf.id_min <= sf.id_max);
}
drop(r);
for term in &[
"alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", "india",
"juliet", "kilo", "lima", "mike", "november", "oscar", "papa", "quebec", "romeo",
"sierra", "tango",
] {
let n: usize = st
.token_match("title", term, BoolMode::And, None)
.unwrap_or_else(|e| panic!("token_match for '{term}': {e}"))
.iter()
.map(|b| b.num_rows())
.sum();
assert_eq!(n, 2, "term '{term}' should match exactly 2 docs");
}
}
#[tokio::test(flavor = "multi_thread")]
async fn compact_runs_multiple_compactions_on_separate_file_sets_in_same_job() {
let dir = TempDir::new().expect("tempdir");
let st = make_st(&dir);
let commit_bulk = |titles: &[&str]| {
let mut w = st.writer().expect("writer");
for _ in 0..4096 {
w.append(&build_title_batch(titles)).expect("append");
}
w.commit().expect("commit");
};
commit_bulk(&["alpha first", "alpha second"]);
commit_bulk(&["bravo first", "bravo second"]);
commit_bulk(&["charlie first", "charlie second"]);
commit_bulk(&["delta first", "delta second"]);
commit_bulk(&["echo first", "echo second"]);
commit_bulk(&["foxtrot first", "foxtrot second"]);
commit_bulk(&["golf first", "golf second"]);
commit_bulk(&["hotel first", "hotel second"]);
commit_bulk(&["india first", "india second"]);
commit_bulk(&["juliet first", "juliet second"]);
for _ in 0..2 {
commit_bulk(&["kilo first", "kilo second"]);
commit_bulk(&["lima first", "lima second"]);
commit_bulk(&["mike first", "mike second"]);
commit_bulk(&["november first", "november second"]);
commit_bulk(&["oscar first", "oscar second"]);
commit_bulk(&["papa first", "papa second"]);
commit_bulk(&["quebec first", "quebec second"]);
commit_bulk(&["romeo first", "romeo second"]);
commit_bulk(&["sierra first", "sierra second"]);
commit_bulk(&["tango first", "tango second"]);
}
let manifest_id_before_first_compact = st.manifest_id();
st.compact_async(&small_compact_cfg())
.await
.expect("second compact");
assert!(
st.manifest_id() == manifest_id_before_first_compact + 2,
"compact must have run two jobs, one per file set"
);
let r = st.reader();
assert_eq!(r.n_docs_total(), 245760, "all docs must be preserved");
assert!(
r.n_superfiles() == 2,
"overall superfile count must have decreased from original 30"
);
let sfs = &r.manifest().superfiles;
let total_from_manifest: u64 = sfs.iter().map(|s| s.n_docs).sum();
assert_eq!(total_from_manifest, 245760);
for sf in sfs.iter() {
assert!(sf.id_min <= sf.id_max);
}
drop(r);
for term in &[
"alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", "india",
"juliet",
] {
let n: usize = st
.token_match("title", term, BoolMode::And, None)
.unwrap_or_else(|e| panic!("token_match for '{term}': {e}"))
.iter()
.map(|b| b.num_rows())
.sum();
assert_eq!(n, 8192, "term '{term}' should match exactly 8192 docs");
}
for term in &[
"kilo", "lima", "mike", "november", "oscar", "papa", "quebec", "romeo", "sierra",
"tango",
] {
let n: usize = st
.token_match("title", term, BoolMode::And, None)
.unwrap_or_else(|e| panic!("token_match for '{term}': {e}"))
.iter()
.map(|b| b.num_rows())
.sum();
assert_eq!(n, 16384, "term '{term}' should match exactly 16384 docs");
}
}
}