use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::{AddAssign, Range};
use std::sync::{Arc, RwLock};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{StreamExt, TryStreamExt};
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_index::DatasetIndexExt;
use lance_table::io::deletion::read_deletion_file;
use roaring::{RoaringBitmap, RoaringTreemap};
use serde::{Deserialize, Serialize};
use crate::io::commit::{commit_transaction, migrate_fragments};
use crate::Dataset;
use crate::Result;
use lance_table::format::{Fragment, RowIdMeta};
use super::fragment::FileFragment;
use super::index::DatasetIndexRemapperOptions;
use super::rowids::load_row_id_sequences;
use super::transaction::{Operation, RewriteGroup, RewrittenIndex, Transaction};
use super::utils::make_rowid_capture_stream;
use super::{write_fragments_internal, WriteMode, WriteParams};
mod remapping;
pub use remapping::{IgnoreRemap, IndexRemapper, IndexRemapperOptions, RemappedIndex};
/// Settings that control how compaction is planned and executed.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompactionOptions {
/// Row-count target for a fragment. During planning, fragments with fewer
/// physical rows than this are candidates for merging with neighbors; during
/// rewriting it is passed as `WriteParams::max_rows_per_file`.
pub target_rows_per_fragment: usize,
/// Forwarded to `WriteParams::max_rows_per_group` when rewriting files.
pub max_rows_per_group: usize,
/// When set, overrides `WriteParams::max_bytes_per_file` for rewritten files.
pub max_bytes_per_file: Option<usize>,
/// Whether fragments whose deleted fraction exceeds
/// `materialize_deletions_threshold` should be rewritten on their own.
pub materialize_deletions: bool,
/// Fraction of deleted rows above which a fragment is rewritten. `validate`
/// turns `materialize_deletions` off when this is `>= 1.0`.
pub materialize_deletions_threshold: f32,
/// Maximum number of rewrite tasks `compact_files` runs concurrently. When
/// `None`, `get_num_compute_intensive_cpus()` is used.
pub num_threads: Option<usize>,
/// When set, the batch size given to the scanner that reads the old fragments.
pub batch_size: Option<usize>,
}
impl Default for CompactionOptions {
    /// Default options: 1Mi rows per fragment, 1024 rows per group, deletion
    /// materialization enabled at a 10% threshold, and every optional knob unset.
    fn default() -> Self {
        const DEFAULT_TARGET_ROWS: usize = 1024 * 1024;
        const DEFAULT_GROUP_ROWS: usize = 1024;

        Self {
            target_rows_per_fragment: DEFAULT_TARGET_ROWS,
            max_rows_per_group: DEFAULT_GROUP_ROWS,
            max_bytes_per_file: None,
            materialize_deletions: true,
            materialize_deletions_threshold: 0.1,
            num_threads: None,
            batch_size: None,
        }
    }
}
impl CompactionOptions {
    /// Normalizes the options in place.
    ///
    /// A deletion threshold of `1.0` or more can never be exceeded by a
    /// fragment's deleted fraction, so deletion materialization is switched off.
    pub fn validate(&mut self) {
        let threshold_unreachable = self.materialize_deletions_threshold >= 1.0;
        if threshold_unreachable && self.materialize_deletions {
            self.materialize_deletions = false;
        }
    }
}
/// Counters describing what a compaction removed and added.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactionMetrics {
/// Number of original fragments that were rewritten.
pub fragments_removed: usize,
/// Number of fragments produced by the rewrite.
pub fragments_added: usize,
/// Files belonging to the removed fragments; `rewrite_files` counts each
/// fragment's data files plus one for a deletion file, if present.
pub files_removed: usize,
/// Files belonging to the new fragments, counted the same way.
pub files_added: usize,
}
impl AddAssign for CompactionMetrics {
    /// Accumulates every counter of `rhs` into `self`.
    fn add_assign(&mut self, rhs: Self) {
        let Self {
            fragments_removed,
            fragments_added,
            files_removed,
            files_added,
        } = rhs;

        self.fragments_removed += fragments_removed;
        self.fragments_added += fragments_added;
        self.files_removed += files_removed;
        self.files_added += files_added;
    }
}
/// Plans a compaction for `dataset`, rewrites the selected fragments
/// concurrently, and commits the result.
///
/// * `options` - compaction settings; `validate` is applied before planning.
/// * `remap_options` - factory for the index remapper used at commit time.
///   When `None`, `DatasetIndexRemapperOptions::default()` is used.
///
/// Returns the aggregated [`CompactionMetrics`]; all-zero metrics are returned
/// when the plan contains no tasks (nothing is committed in that case).
///
/// # Errors
///
/// Propagates any error from planning, rewriting, or committing.
pub async fn compact_files(
    dataset: &mut Dataset,
    mut options: CompactionOptions,
    remap_options: Option<Arc<dyn IndexRemapperOptions>>,
) -> Result<CompactionMetrics> {
    options.validate();

    let compaction_plan: CompactionPlan = plan_compaction(dataset, &options).await?;
    if compaction_plan.tasks().is_empty() {
        return Ok(CompactionMetrics::default());
    }

    // `buffer_unordered(0)` never polls the underlying stream, so a caller
    // passing `num_threads: Some(0)` would stall forever. Always allow at
    // least one task in flight.
    let concurrency = options
        .num_threads
        .unwrap_or_else(get_num_compute_intensive_cpus)
        .max(1);

    // Rewrites read from a snapshot of the dataset taken here; the caller's
    // `dataset` is only updated by the final commit.
    let dataset_ref = &dataset.clone();

    let result_stream = futures::stream::iter(compaction_plan.tasks.into_iter())
        .map(|task| rewrite_files(Cow::Borrowed(dataset_ref), task, &options))
        .buffer_unordered(concurrency);
    let completed_tasks: Vec<RewriteResult> = result_stream.try_collect().await?;

    let remap_options = remap_options.unwrap_or(Arc::new(DatasetIndexRemapperOptions::default()));
    let metrics = commit_compaction(dataset, completed_tasks, remap_options).await?;
    Ok(metrics)
}
/// Row statistics for one fragment, gathered while planning a compaction.
#[derive(Debug)]
struct FragmentMetrics {
    /// Rows physically stored in the fragment, including deleted ones.
    pub physical_rows: usize,
    /// Rows marked as deleted.
    pub num_deletions: usize,
}

impl FragmentMetrics {
    /// Fraction of physical rows that are deleted; `0.0` for an empty fragment.
    fn deletion_percentage(&self) -> f32 {
        match self.physical_rows {
            0 => 0.0,
            total => self.num_deletions as f32 / total as f32,
        }
    }

    /// Number of rows that are not deleted.
    fn num_rows(&self) -> usize {
        let live = self.physical_rows - self.num_deletions;
        live
    }
}
/// Fetches the physical row count and the deletion count of `fragment`
/// concurrently and bundles them into a [`FragmentMetrics`].
async fn collect_metrics(fragment: &FileFragment) -> Result<FragmentMetrics> {
    let (physical_rows, num_deletions) =
        futures::future::try_join(fragment.physical_rows(), fragment.count_deletions()).await?;

    let metrics = FragmentMetrics {
        physical_rows,
        num_deletions,
    };
    Ok(metrics)
}
/// The output of `plan_compaction`: a list of independent rewrite tasks plus
/// the context needed to execute them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CompactionPlan {
// Groups of fragments to rewrite; each entry becomes one `CompactionTask`.
tasks: Vec<TaskData>,
// Manifest version of the dataset the plan was computed against.
read_version: u64,
// Options the plan was created with; cloned into every task.
options: CompactionOptions,
}
impl CompactionPlan {
    /// Iterates over the plan as self-contained [`CompactionTask`]s, each
    /// carrying its own copy of the fragments, read version, and options.
    pub fn compaction_tasks(&self) -> impl Iterator<Item = CompactionTask> + '_ {
        let version = self.read_version;
        let shared_options = &self.options;
        self.tasks.iter().cloned().map(move |task| CompactionTask {
            task,
            read_version: version,
            options: shared_options.clone(),
        })
    }

    /// Number of tasks in the plan.
    pub fn num_tasks(&self) -> usize {
        self.tasks.len()
    }

    /// Dataset version the plan was computed against.
    pub fn read_version(&self) -> u64 {
        self.read_version
    }

    /// Options the plan was created with.
    pub fn options(&self) -> &CompactionOptions {
        &self.options
    }
}
/// The payload of a single compaction task.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TaskData {
/// The fragments to be rewritten together by this task.
pub fragments: Vec<Fragment>,
}
/// A self-contained unit of compaction work, produced by
/// `CompactionPlan::compaction_tasks`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompactionTask {
/// The fragments to rewrite.
pub task: TaskData,
/// Dataset version the fragments must be read from.
pub read_version: u64,
// Options used when rewriting.
options: CompactionOptions,
}
impl CompactionTask {
    /// Rewrites this task's fragments and returns the [`RewriteResult`].
    ///
    /// `dataset` is used directly when its manifest version equals
    /// `read_version`; otherwise that version is checked out first.
    pub async fn execute(&self, dataset: &Dataset) -> Result<RewriteResult> {
        let at_read_version = dataset.manifest.version == self.read_version;
        let source = match at_read_version {
            true => Cow::Borrowed(dataset),
            false => Cow::Owned(dataset.checkout_version(self.read_version).await?),
        };
        rewrite_files(source, self.task.clone(), &self.options).await
    }
}
impl CompactionPlan {
    /// Creates an empty plan for the given dataset version and options.
    fn new(read_version: u64, options: CompactionOptions) -> Self {
        let tasks = Vec::new();
        Self {
            tasks,
            read_version,
            options,
        }
    }

    /// Appends `tasks` to the plan, preserving their order.
    fn extend_tasks(&mut self, tasks: impl IntoIterator<Item = TaskData>) {
        for task in tasks {
            self.tasks.push(task);
        }
    }

    /// The planned tasks, in order.
    fn tasks(&self) -> &[TaskData] {
        self.tasks.as_slice()
    }
}
/// Why a fragment was selected as a compaction candidate.
#[derive(Debug, Clone)]
enum CompactionCandidacy {
// Fragment has fewer physical rows than the target; only worth rewriting
// together with adjacent candidates (a lone one is a no-op, see
// `CandidateBin::is_noop`).
CompactWithNeighbors,
// Fragment's deleted fraction exceeds the threshold; worth rewriting even
// on its own.
CompactItself,
}
/// A run of consecutive candidate fragments that may be compacted together.
/// `fragments`, `candidacy`, and `row_counts` are parallel vectors.
struct CandidateBin {
// The candidate fragments, in dataset order.
pub fragments: Vec<Fragment>,
// Positions of the fragments within the dataset's fragment list.
pub pos_range: Range<usize>,
// Candidacy reason for each fragment.
pub candidacy: Vec<CompactionCandidacy>,
// Non-deleted row count of each fragment.
pub row_counts: Vec<usize>,
// Positions (within the loaded index list) of the indices covering these
// fragments; a bin only holds fragments with an identical set.
pub indices: Vec<usize>,
}
impl CandidateBin {
    /// Returns `true` when compacting this bin would accomplish nothing: the
    /// bin is empty, or it holds a single fragment that was only a candidate
    /// for merging with neighbors.
    fn is_noop(&self) -> bool {
        match self.fragments.len() {
            0 => true,
            1 => matches!(self.candidacy[0], CompactionCandidacy::CompactWithNeighbors),
            _ => false,
        }
    }

    /// Splits the bin into consecutive bins that each hold at least
    /// `min_num_rows` rows, where possible.
    ///
    /// Fragments are taken greedily from the front until the running row count
    /// reaches `min_num_rows`. A prefix is only split off when the remaining
    /// fragments still hold at least `min_num_rows` rows; otherwise everything
    /// left is returned as the final bin. The split-off bins carry an empty
    /// `indices` list.
    ///
    /// A `min_num_rows` of zero returns the bin unsplit.
    fn split_for_size(mut self, min_num_rows: usize) -> Vec<Self> {
        // With a zero target the greedy scan below would never consume a
        // fragment (`bin_len` stays 0) while the "enough rows remain" check is
        // always true, looping forever and pushing empty bins.
        if min_num_rows == 0 {
            return vec![self];
        }

        let mut bins = Vec::new();

        loop {
            // Take fragments from the front until the target is reached.
            let mut bin_len = 0;
            let mut bin_row_count = 0;
            while bin_row_count < min_num_rows && bin_len < self.row_counts.len() {
                bin_row_count += self.row_counts[bin_len];
                bin_len += 1;
            }

            // Only split if what remains can form another full-sized bin.
            if self.row_counts[bin_len..].iter().sum::<usize>() >= min_num_rows {
                bins.push(Self {
                    fragments: self.fragments.drain(0..bin_len).collect(),
                    pos_range: self.pos_range.start..(self.pos_range.start + bin_len),
                    candidacy: self.candidacy.drain(0..bin_len).collect(),
                    row_counts: self.row_counts.drain(0..bin_len).collect(),
                    indices: Vec::new(),
                });
                self.pos_range.start += bin_len;
            } else {
                // Too little left for another bin: keep the remainder together.
                bins.push(self);
                break;
            }
        }

        bins
    }
}
/// Builds, for every index of `dataset`, the bitmap of fragment ids the index
/// covers. The result is ordered like `dataset.load_indices()`.
///
/// An index that stores a fragment bitmap contributes a clone of it. For an
/// index without one, the dataset version the index was built at is checked
/// out and the bitmap is taken to be `0..max_fragment_id` of that manifest.
// NOTE(review): the range is half-open, so `max_fragment_id` itself is not
// included in the fallback bitmap — confirm this is intended.
async fn load_index_fragmaps(dataset: &Dataset) -> Result<Vec<RoaringBitmap>> {
    let indices = dataset.load_indices().await?;
    let mut fragmaps = Vec::with_capacity(indices.len());

    for index in indices.iter() {
        let fragmap = match index.fragment_bitmap.as_ref() {
            Some(bitmap) => bitmap.clone(),
            None => {
                let dataset_at_index = dataset.checkout_version(index.dataset_version).await?;
                let frags = 0..dataset_at_index.manifest.max_fragment_id;
                RoaringBitmap::from_sorted_iter(frags).unwrap()
            }
        };
        fragmaps.push(fragmap);
    }

    Ok(fragmaps)
}
/// Decides which fragments of `dataset` should be rewritten and groups them
/// into independent tasks.
///
/// Fragments are visited in order. A fragment is a candidate when its deleted
/// fraction exceeds the threshold (and deletion materialization is enabled),
/// or when it has fewer physical rows than `target_rows_per_fragment`.
/// Consecutive candidates covered by the same set of indices are collected
/// into a bin; a non-candidate, or a change in index coverage, closes the
/// current bin. No-op bins are dropped and the rest are split by size, each
/// resulting bin becoming one task.
///
/// Returns a [`CompactionPlan`] tagged with the dataset's current manifest
/// version and a copy of `options`.
pub async fn plan_compaction(
dataset: &Dataset,
options: &CompactionOptions,
) -> Result<CompactionPlan> {
// The binning below relies on fragments being visited in ascending id order.
debug_assert!(
dataset
.get_fragments()
.windows(2)
.all(|w| w[0].id() < w[1].id()),
"fragments in manifest are not sorted"
);
// Collect per-fragment row statistics concurrently; `buffered` yields the
// results in the same order as the input fragments.
let mut fragment_metrics = futures::stream::iter(dataset.get_fragments())
.map(|fragment| async move {
match collect_metrics(&fragment).await {
Ok(metrics) => Ok((fragment.metadata, metrics)),
Err(e) => Err(e),
}
})
.buffered(dataset.object_store().io_parallelism());
let index_fragmaps = load_index_fragmaps(dataset).await?;
// Positions (in `index_fragmaps`) of every index whose bitmap contains `frag_id`.
let indices_containing_frag = |frag_id: u32| {
index_fragmaps
.iter()
.enumerate()
.filter(|(_, bitmap)| bitmap.contains(frag_id))
.map(|(pos, _)| pos)
.collect::<Vec<_>>()
};
let mut candidate_bins: Vec<CandidateBin> = Vec::new();
let mut current_bin: Option<CandidateBin> = None;
// Position of the current fragment within the dataset's fragment list.
let mut i = 0;
while let Some(res) = fragment_metrics.next().await {
let (fragment, metrics) = res?;
// Deletion-heavy fragments take priority over merely-small ones.
let candidacy = if options.materialize_deletions
&& metrics.deletion_percentage() > options.materialize_deletions_threshold
{
Some(CompactionCandidacy::CompactItself)
} else if metrics.physical_rows < options.target_rows_per_fragment {
Some(CompactionCandidacy::CompactWithNeighbors)
} else {
None
};
let indices = indices_containing_frag(fragment.id as u32);
// Arms, in order: no candidate and no open bin (nothing to do); candidate
// with no open bin (open one); candidate with an open bin (append if index
// coverage matches, else close and open a new bin); non-candidate with an
// open bin (close it).
match (candidacy, &mut current_bin) {
(None, None) => {} (Some(candidacy), None) => {
current_bin = Some(CandidateBin {
fragments: vec![fragment],
pos_range: i..(i + 1),
candidacy: vec![candidacy],
row_counts: vec![metrics.num_rows()],
indices,
});
}
(Some(candidacy), Some(bin)) => {
if bin.indices == indices {
bin.fragments.push(fragment);
bin.pos_range.end += 1;
bin.candidacy.push(candidacy);
bin.row_counts.push(metrics.num_rows());
} else {
candidate_bins.push(current_bin.take().unwrap());
current_bin = Some(CandidateBin {
fragments: vec![fragment],
pos_range: i..(i + 1),
candidacy: vec![candidacy],
row_counts: vec![metrics.num_rows()],
indices,
});
}
}
(None, Some(_)) => {
candidate_bins.push(current_bin.take().unwrap());
}
}
i += 1;
}
// Flush the bin that was still open when the fragments ran out.
if let Some(bin) = current_bin {
candidate_bins.push(bin);
}
// Drop bins that would not change anything, then cap bin sizes.
let final_bins = candidate_bins
.into_iter()
.filter(|bin| !bin.is_noop())
.flat_map(|bin| bin.split_for_size(options.target_rows_per_fragment))
.map(|bin| TaskData {
fragments: bin.fragments,
});
let mut compaction_plan = CompactionPlan::new(dataset.manifest.version, options.clone());
compaction_plan.extend_tasks(final_bins);
Ok(compaction_plan)
}
/// The outcome of rewriting one task's fragments; input to `commit_compaction`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RewriteResult {
/// File and fragment counts for this task.
pub metrics: CompactionMetrics,
/// The fragments written by the task.
pub new_fragments: Vec<Fragment>,
/// Manifest version of the dataset the task read from.
pub read_version: u64,
/// The fragments that were rewritten.
pub original_fragments: Vec<Fragment>,
/// Mapping from old row id to new row id. Produced by
/// `remapping::transpose_row_ids`; left empty when the dataset uses
/// move-stable row ids.
// NOTE(review): a `None` value presumably marks a row that no longer
// exists after the rewrite — confirm against `transpose_row_ids`.
pub row_id_map: HashMap<u64, Option<u64>>,
}
/// Reserves one fragment id per item of `fragments` by committing a
/// `ReserveFragments` transaction, then assigns the reserved ids to the
/// fragments in iteration order.
///
/// The reserved ids are the last `fragments.len()` ids up to and including the
/// committed manifest's `max_fragment_id`.
async fn reserve_fragment_ids(
    dataset: &Dataset,
    fragments: impl ExactSizeIterator<Item = &mut Fragment>,
) -> Result<()> {
    let num_fragments = fragments.len() as u32;

    let reservation = Transaction::new(
        dataset.manifest.version,
        Operation::ReserveFragments { num_fragments },
        None,
        None,
    );

    let manifest = commit_transaction(
        dataset,
        dataset.object_store(),
        dataset.commit_handler.as_ref(),
        &reservation,
        &Default::default(),
        &Default::default(),
        dataset.manifest_naming_scheme,
    )
    .await?;

    // `max_fragment_id` is inclusive, ranges are exclusive: hence the `+ 1`.
    let end_exclusive = manifest.max_fragment_id + 1;
    let first_reserved = end_exclusive - num_fragments;

    for (fragment, new_id) in fragments.zip(first_reserved..end_exclusive) {
        fragment.id = new_id as u64;
    }

    Ok(())
}
/// Rewrites the fragments of `task` into new, compacted fragments.
///
/// The old fragments are scanned in order and streamed into
/// `write_fragments_internal`. When the dataset does not use move-stable row
/// ids, the scanned row ids are captured, new fragment ids are reserved, and
/// an old-to-new row id map is produced; otherwise the existing row id
/// sequences are rechunked onto the new fragments and the map is left empty.
///
/// Returns a [`RewriteResult`] with the new fragments, the original fragments,
/// the row id map, and file/fragment counts. An empty task returns an empty
/// result without touching storage.
async fn rewrite_files(
dataset: Cow<'_, Dataset>,
task: TaskData,
options: &CompactionOptions,
) -> Result<RewriteResult> {
let mut metrics = CompactionMetrics::default();
// Nothing to rewrite: report an empty result.
if task.fragments.is_empty() {
return Ok(RewriteResult {
metrics,
new_fragments: Vec::new(),
read_version: dataset.manifest.version,
original_fragments: task.fragments,
row_id_map: HashMap::new(),
});
}
// Statistics are recomputed during migration when the manifest records no
// writer version.
let previous_writer_version = &dataset.manifest.writer_version;
let recompute_stats = previous_writer_version.is_none();
let fragments = migrate_fragments(dataset.as_ref(), &task.fragments, recompute_stats).await?;
// Total physical rows across the input fragments; used for progress logging.
let num_rows = fragments
.iter()
.map(|f| f.physical_rows.unwrap() as u64)
.sum::<u64>();
// Row ids only need remapping when they are not move-stable.
let needs_remapping = !dataset.manifest.uses_move_stable_row_ids();
let mut scanner = dataset.scan();
if let Some(batch_size) = options.batch_size {
scanner.batch_size(batch_size);
}
// Random id used only to correlate this task's log lines.
let task_id = uuid::Uuid::new_v4();
log::info!(
"Compaction task {}: Begin compacting {} rows across {} fragments",
task_id,
num_rows,
fragments.len()
);
scanner
.with_fragments(fragments.clone())
.scan_in_order(true);
// When remapping, scan with row ids and route the stream through
// `make_rowid_capture_stream`, which is handed the shared `row_ids` treemap.
let (row_ids, reader) = if needs_remapping {
let row_ids = Arc::new(RwLock::new(RoaringTreemap::new()));
scanner.with_row_id();
let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?);
let data_no_row_ids = make_rowid_capture_stream(row_ids.clone(), data)?;
(Some(row_ids), data_no_row_ids)
} else {
let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?);
(None, data)
};
// Log read progress as each batch passes through.
let mut rows_read = 0;
let schema = reader.schema();
let reader = reader.inspect_ok(move |batch| {
rows_read += batch.num_rows();
log::info!(
"Compaction task {}: Read progress {}/{}",
task_id,
rows_read,
num_rows,
);
});
let reader = Box::pin(RecordBatchStreamAdapter::new(schema, reader));
let mut params = WriteParams {
max_rows_per_file: options.target_rows_per_fragment,
max_rows_per_group: options.max_rows_per_group,
mode: WriteMode::Append,
..Default::default()
};
if let Some(max_bytes_per_file) = options.max_bytes_per_file {
params.max_bytes_per_file = max_bytes_per_file;
}
if dataset.manifest.uses_move_stable_row_ids() {
params.enable_move_stable_row_ids = true;
}
let new_fragments = write_fragments_internal(
Some(dataset.as_ref()),
dataset.object_store.clone(),
&dataset.base,
dataset.schema().clone(),
reader,
params,
)
.await?;
// Only the default fragment set is handled here; a blob result is unexpected.
assert!(new_fragments.blob.is_none());
let mut new_fragments = new_fragments.default.0;
log::info!("Compaction task {}: file written", task_id);
let row_id_map = if let Some(row_ids) = row_ids {
// The `expect`s assume the capture stream has dropped its handle to
// `row_ids` by now, leaving this the only owner.
let row_ids = Arc::try_unwrap(row_ids)
.expect("Row ids lock still owned")
.into_inner()
.expect("Row ids mutex still locked");
log::info!(
"Compaction task {}: reserving fragment ids and transposing row ids",
task_id
);
// New fragment ids must be final before row ids are mapped onto them.
reserve_fragment_ids(&dataset, new_fragments.iter_mut()).await?;
remapping::transpose_row_ids(row_ids, &fragments, &new_fragments)
} else {
log::info!("Compaction task {}: rechunking stable row ids", task_id);
rechunk_stable_row_ids(dataset.as_ref(), &mut new_fragments, &fragments).await?;
HashMap::new()
};
// A fragment's file count is its data files plus one for a deletion file.
metrics.files_removed = task
.fragments
.iter()
.map(|f| f.files.len() + f.deletion_file.is_some() as usize)
.sum();
metrics.fragments_removed = task.fragments.len();
metrics.fragments_added = new_fragments.len();
metrics.files_added = new_fragments
.iter()
.map(|f| f.files.len() + f.deletion_file.is_some() as usize)
.sum();
log::info!("Compaction task {}: completed", task_id);
Ok(RewriteResult {
metrics,
new_fragments,
read_version: dataset.manifest.version,
original_fragments: task.fragments,
row_id_map,
})
}
/// Redistributes the stable row id sequences of `old_fragments` over
/// `new_fragments`.
///
/// The old sequences are loaded, put into the order of `old_fragments`,
/// masked with each fragment's deletion file (if any), and rechunked so each
/// new fragment receives a sequence matching its `physical_rows`. The result
/// is stored inline in each new fragment's `row_id_meta`.
///
/// # Panics
///
/// Panics if a loaded sequence belongs to a fragment that is not in
/// `old_fragments`, or if a new fragment has no `physical_rows`.
async fn rechunk_stable_row_ids(
    dataset: &Dataset,
    new_fragments: &mut [Fragment],
    old_fragments: &[Fragment],
) -> Result<()> {
    let mut old_sequences = load_row_id_sequences(dataset, old_fragments)
        .try_collect::<Vec<_>>()
        .await?;

    // Bring the sequences into the same order as `old_fragments`, so they can
    // be zipped below. Each key needs a linear scan, so compute it once per
    // element (`sort_by_cached_key`) rather than once per comparison. The sort
    // is stable, like `sort_by_key`.
    old_sequences.sort_by_cached_key(|(frag_id, _)| {
        old_fragments
            .iter()
            .position(|frag| frag.id as u32 == *frag_id)
            .expect("Fragment not found")
    });

    // Drop deleted rows from each sequence.
    futures::stream::iter(old_sequences.iter_mut().zip(old_fragments.iter()))
        .map(Ok)
        .try_for_each(|((_, seq), frag)| async move {
            let deletions = read_deletion_file(&dataset.base, frag, dataset.object_store()).await?;
            if let Some(deletions) = deletions {
                let mut new_seq = seq.as_ref().clone();
                new_seq.mask(deletions.into_iter())?;
                *seq = Arc::new(new_seq);
            }
            Ok::<(), crate::Error>(())
        })
        .await?;

    // After masking, the surviving row ids must exactly fill the new fragments.
    debug_assert_eq!(
        { old_sequences.iter().map(|(_, seq)| seq.len()).sum::<u64>() },
        {
            new_fragments
                .iter()
                .map(|frag| frag.physical_rows.unwrap() as u64)
                .sum::<u64>()
        },
        "{:?}",
        old_sequences
    );

    let new_sequences = lance_table::rowids::rechunk_sequences(
        old_sequences
            .into_iter()
            .map(|(_, seq)| seq.as_ref().clone()),
        new_fragments
            .iter()
            .map(|frag| frag.physical_rows.unwrap() as u64),
    )?;

    for (fragment, sequence) in new_fragments.iter_mut().zip(new_sequences) {
        let serialized = lance_table::rowids::write_row_ids(&sequence);
        fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
    }

    Ok(())
}
pub async fn commit_compaction(
dataset: &mut Dataset,
completed_tasks: Vec<RewriteResult>,
options: Arc<dyn IndexRemapperOptions>,
) -> Result<CompactionMetrics> {
if completed_tasks.is_empty() {
return Ok(CompactionMetrics::default());
}
let needs_remapping = !dataset.manifest.uses_move_stable_row_ids();
let mut rewrite_groups = Vec::with_capacity(completed_tasks.len());
let mut metrics = CompactionMetrics::default();
let mut row_id_map: HashMap<u64, Option<u64>> = HashMap::default();
for task in completed_tasks {
metrics += task.metrics;
let rewrite_group = RewriteGroup {
old_fragments: task.original_fragments,
new_fragments: task.new_fragments,
};
if needs_remapping {
row_id_map.extend(task.row_id_map);
}
rewrite_groups.push(rewrite_group);
}
let rewritten_indices = if needs_remapping {
let index_remapper = options.create_remapper(dataset)?;
let affected_ids = rewrite_groups
.iter()
.flat_map(|group| group.old_fragments.iter().map(|frag| frag.id))
.collect::<Vec<_>>();
let remapped_indices = index_remapper
.remap_indices(row_id_map, &affected_ids)
.await?;
remapped_indices
.iter()
.map(|rewritten| RewrittenIndex {
old_id: rewritten.original,
new_id: rewritten.new,
})
.collect()
} else {
let new_fragments = rewrite_groups
.iter_mut()
.flat_map(|group| group.new_fragments.iter_mut())
.collect::<Vec<_>>();
reserve_fragment_ids(dataset, new_fragments.into_iter()).await?;
Vec::new()
};
let transaction = Transaction::new(
dataset.manifest.version,
Operation::Rewrite {
groups: rewrite_groups,
rewritten_indices,
},
None,
None,
);
let manifest = commit_transaction(
dataset,
dataset.object_store(),
dataset.commit_handler.as_ref(),
&transaction,
&Default::default(),
&Default::default(),
dataset.manifest_naming_scheme,
)
.await?;
dataset.manifest = Arc::new(manifest);
Ok(metrics)
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use arrow_array::{Float32Array, Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
use arrow_select::concat::concat_batches;
use async_trait::async_trait;
use lance_core::utils::address::RowAddress;
use lance_file::version::LanceFileVersion;
use lance_index::scalar::ScalarIndexParams;
use lance_index::IndexType;
use lance_linalg::distance::MetricType;
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
use rstest::rstest;
use tempfile::tempdir;
use uuid::Uuid;
use crate::index::vector::VectorIndexParams;
use self::remapping::RemappedIndex;
use super::*;
/// Checks `CandidateBin::is_noop` for empty, single, and multi-fragment bins,
/// and `CandidateBin::split_for_size` on an eight-fragment bin.
#[test]
fn test_candidate_bin() {
// An empty bin is always a no-op.
let empty_bin = CandidateBin {
fragments: vec![],
pos_range: 0..0,
candidacy: vec![],
row_counts: vec![],
indices: vec![],
};
assert!(empty_bin.is_noop());
let fragment = Fragment {
id: 0,
files: vec![],
deletion_file: None,
row_id_meta: None,
physical_rows: Some(0),
};
// A lone fragment that only wants neighbors has nothing to merge with.
let single_bin = CandidateBin {
fragments: vec![fragment.clone()],
pos_range: 0..1,
candidacy: vec![CompactionCandidacy::CompactWithNeighbors],
row_counts: vec![100],
indices: vec![],
};
assert!(single_bin.is_noop());
// A lone fragment that should be compacted by itself is real work.
let single_bin = CandidateBin {
fragments: vec![fragment.clone()],
pos_range: 0..1,
candidacy: vec![CompactionCandidacy::CompactItself],
row_counts: vec![100],
indices: vec![],
};
assert!(!single_bin.is_noop());
let big_bin = CandidateBin {
fragments: std::iter::repeat(fragment).take(8).collect(),
pos_range: 0..8,
candidacy: std::iter::repeat(CompactionCandidacy::CompactItself)
.take(8)
.collect(),
row_counts: vec![100, 400, 200, 200, 400, 300, 300, 100],
indices: vec![],
};
assert!(!big_bin.is_noop());
// With a 500-row minimum: [100, 400] | [200, 200, 400] | [300, 300, 100].
// The trailing 100 is too small for its own bin and stays with the last one.
let split = big_bin.split_for_size(500);
assert_eq!(split.len(), 3);
assert_eq!(split[0].pos_range, 0..2);
assert_eq!(split[1].pos_range, 2..5);
assert_eq!(split[2].pos_range, 5..8);
}
/// Builds a single-column batch: non-nullable Int64 column `a` holding
/// `0..10_000`.
fn sample_data() -> RecordBatch {
    let field = Field::new("a", DataType::Int64, false);
    let schema = Arc::new(Schema::new(vec![field]));
    let values = Int64Array::from_iter_values(0..10_000);
    RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap()
}
/// One accepted input of the mock remapper and the answer to return for it.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
struct MockIndexRemapperExpectation {
// The exact row id map the remapper must be called with.
expected: HashMap<u64, Option<u64>>,
// What `remap_indices` returns when `expected` matches.
answer: Vec<RemappedIndex>,
}
/// Test remapper that only accepts row id maps it has been told to expect.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
struct MockIndexRemapper {
// Any one of these may match a call to `remap_indices`.
expectations: Vec<MockIndexRemapperExpectation>,
}
impl MockIndexRemapper {
    /// Renders a short, deterministic preview of a row id map for panic
    /// messages: the map length followed by the ten smallest keys and their
    /// values, with a trailing `, ...` when entries were left out.
    fn stringify_map(map: &HashMap<u64, Option<u64>>) -> String {
        let mut keys: Vec<&u64> = map.keys().collect();
        keys.sort();

        let preview: Vec<String> = keys
            .iter()
            .take(10)
            .map(|&&key| {
                format!(
                    "{}:{:?}",
                    RowAddress::from(key),
                    map[&key].map(RowAddress::from)
                )
            })
            .collect();

        let ellipsis = if map.len() > 10 { ", ..." } else { "" };
        format!("(len={}){}{}", map.len(), preview.join(","), ellipsis)
    }

    /// Merges several remappers into one that accepts any of their
    /// expectations.
    fn in_any_order(expectations: &[Self]) -> Self {
        let mut merged = Vec::new();
        for remapper in expectations {
            merged.extend(remapper.expectations.iter().cloned());
        }
        Self {
            expectations: merged,
        }
    }
}
#[async_trait]
impl IndexRemapper for MockIndexRemapper {
    /// Returns the answer of the first expectation whose map equals
    /// `index_map`; panics with a preview of the received and accepted maps
    /// when none matches.
    async fn remap_indices(
        &self,
        index_map: HashMap<u64, Option<u64>>,
        _: &[u64],
    ) -> Result<Vec<RemappedIndex>> {
        let hit = self
            .expectations
            .iter()
            .find(|expectation| expectation.expected == index_map);
        if let Some(expectation) = hit {
            return Ok(expectation.answer.clone());
        }

        let accepted: Vec<String> = self
            .expectations
            .iter()
            .map(|expectation| Self::stringify_map(&expectation.expected))
            .collect();
        panic!(
            "Unexpected index map (len={}): {}\n Options: {}",
            index_map.len(),
            Self::stringify_map(&index_map),
            accepted.join("\n ")
        );
    }
}
impl IndexRemapperOptions for MockIndexRemapper {
    /// The mock acts as its own remapper: hands out a boxed clone of itself.
    fn create_remapper(&self, _: &Dataset) -> Result<Box<dyn IndexRemapper>> {
        let remapper: Box<dyn IndexRemapper> = Box::new(self.clone());
        Ok(remapper)
    }
}
/// Compacting a dataset written from an empty reader plans no tasks, reports
/// default metrics, and leaves the dataset at version 1.
#[rstest]
#[tokio::test]
async fn test_compact_empty(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
// A reader that yields no batches.
let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), Arc::new(schema));
let mut dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
data_storage_version: Some(data_storage_version),
..Default::default()
}),
)
.await
.unwrap();
let plan = plan_compaction(&dataset, &CompactionOptions::default())
.await
.unwrap();
assert_eq!(plan.tasks().len(), 0);
let metrics = compact_files(&mut dataset, CompactionOptions::default(), None)
.await
.unwrap();
assert_eq!(metrics, CompactionMetrics::default());
// No commit happened, so the version is unchanged.
assert_eq!(dataset.manifest.version, 1);
}
/// Datasets that need no compaction produce an empty plan.
#[rstest]
#[tokio::test]
async fn test_compact_all_good(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = sample_data();
// Case 1: all 10,000 rows written with a 10,000-row file limit, planned with
// the default options.
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
let write_params = WriteParams {
max_rows_per_file: 10_000,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let plan = plan_compaction(&dataset, &CompactionOptions::default())
.await
.unwrap();
assert_eq!(plan.tasks().len(), 0);
// Case 2: overwrite with 3,000-row files and plan with a matching 3,000-row
// target; still nothing to do.
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
let write_params = WriteParams {
max_rows_per_file: 3_000,
max_rows_per_group: 1_000,
data_storage_version: Some(data_storage_version),
mode: WriteMode::Overwrite,
..Default::default()
};
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 3_000,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 0);
}
/// Converts a range of row offsets within fragment `frag_idx` into the
/// corresponding range of `u64` row ids.
fn row_ids(frag_idx: u32, offsets: Range<u32>) -> Range<u64> {
    let Range { start, end } = offsets;
    let first: u64 = RowAddress::new_from_parts(frag_idx, start).into();
    let last: u64 = RowAddress::new_from_parts(frag_idx, end).into();
    first..last
}
/// Builds a [`MockIndexRemapper`] with one expectation describing how old row
/// ids map onto new fragments.
///
/// `ranges[k]` lists the old row id ranges that land in new fragment
/// `starting_new_frag_idx + k`. Ranges flagged `true` are assigned consecutive
/// row offsets in that fragment, starting at 0; ranges flagged `false` map to
/// `None`.
fn expect_remap(
    ranges: &[Vec<(Range<u64>, bool)>],
    starting_new_frag_idx: u32,
) -> MockIndexRemapper {
    let capacity = ranges.iter().map(|r| r.len()).sum();
    let mut expected_remap: HashMap<u64, Option<u64>> = HashMap::with_capacity(capacity);

    for (new_frag_offset, new_frag_ranges) in ranges.iter().enumerate() {
        let new_frag_idx = starting_new_frag_idx + new_frag_offset as u32;
        let mut row_offset = 0;
        for (old_id_range, is_found) in new_frag_ranges {
            for old_id in old_id_range.clone() {
                let target = if *is_found {
                    let new_id = RowAddress::new_from_parts(new_frag_idx, row_offset);
                    row_offset += 1;
                    Some(new_id.into())
                } else {
                    None
                };
                expected_remap.insert(old_id, target);
            }
        }
    }

    let expectation = MockIndexRemapperExpectation {
        expected: expected_remap,
        answer: vec![],
    };
    MockIndexRemapper {
        expectations: vec![expectation],
    }
}
/// End-to-end compaction of a dataset with small fragments, one full fragment,
/// and one fragment with deletions; checks the plan, the row id remapping, the
/// metrics, and the resulting fragment ids.
#[rstest]
#[tokio::test]
async fn test_compact_many(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = sample_data();
// 1,200 rows at 400 rows per file: three small fragments.
let reader = RecordBatchIterator::new(vec![Ok(data.slice(0, 1200))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 400,
data_storage_version: Some(data_storage_version),
..Default::default()
};
Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
// 2,000 rows at 1,000 rows per file, appended.
let reader = RecordBatchIterator::new(vec![Ok(data.slice(1200, 2000))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 1000,
data_storage_version: Some(data_storage_version),
mode: WriteMode::Append,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
// A single-row delete, then a 200-row delete.
dataset.delete("a = 1300").await.unwrap();
dataset.delete("a >= 2400 AND a < 2600").await.unwrap();
// 600 rows at 300 rows per file, appended: two more small fragments.
let reader = RecordBatchIterator::new(vec![Ok(data.slice(3200, 600))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 300,
data_storage_version: Some(data_storage_version),
mode: WriteMode::Append,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
// Existing fragments are 0..=6, so new fragments start at id 7.
let first_new_frag_idx = 7;
// The two tasks run concurrently and may reserve fragment ids in either
// order, so both orderings of the expected remapping are accepted.
let remap_a = expect_remap(
&[
vec![
(row_ids(0, 0..400), true),
(row_ids(1, 0..400), true),
(row_ids(2, 0..200), true),
],
vec![(row_ids(2, 200..400), true)],
vec![
(row_ids(4, 0..200), true),
(row_ids(4, 200..400), false),
(row_ids(4, 400..1000), true),
(row_ids(5, 0..200), true),
],
vec![(row_ids(5, 200..300), true), (row_ids(6, 0..300), true)],
],
first_new_frag_idx,
);
let remap_b = expect_remap(
&[
vec![
(row_ids(4, 0..200), true),
(row_ids(4, 200..400), false),
(row_ids(4, 400..1000), true),
(row_ids(5, 0..200), true),
],
vec![(row_ids(5, 200..300), true), (row_ids(6, 0..300), true)],
vec![
(row_ids(0, 0..400), true),
(row_ids(1, 0..400), true),
(row_ids(2, 0..200), true),
],
vec![(row_ids(2, 200..400), true)],
],
first_new_frag_idx,
);
let options = CompactionOptions {
target_rows_per_fragment: 1000,
..Default::default()
};
// Fragment 3 is not part of any task, which splits the candidates into
// two groups.
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 2);
assert_eq!(plan.tasks()[0].fragments.len(), 3);
assert_eq!(plan.tasks()[1].fragments.len(), 3);
assert_eq!(
plan.tasks()[0]
.fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
vec![0, 1, 2]
);
assert_eq!(
plan.tasks()[1]
.fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
vec![4, 5, 6]
);
let mock_remapper = MockIndexRemapper::in_any_order(&[remap_a, remap_b]);
let metrics = compact_files(&mut dataset, options, Some(Arc::new(mock_remapper)))
.await
.unwrap();
assert_eq!(metrics.fragments_removed, 6);
assert_eq!(metrics.fragments_added, 4);
// Removed files: six data files plus one deletion file.
assert_eq!(metrics.files_removed, 7); assert_eq!(metrics.files_added, 4);
let fragment_ids = dataset
.get_fragments()
.iter()
.map(|f| f.id())
.collect::<Vec<_>>();
// The untouched fragment 3 remains, followed by the four new fragments.
assert_eq!(fragment_ids, vec![3, 7, 8, 9, 10]);
}
/// Compacts a dataset whose fragments gained an extra data file through a
/// merge, and checks that the rewritten data still matches the merged data.
#[rstest]
#[tokio::test]
async fn test_compact_data_files(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = sample_data();
// 10,000 rows at 5,000 rows per file: two fragments.
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
let write_params = WriteParams {
max_rows_per_file: 5_000,
max_rows_per_group: 1_000,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
// Merge in a second column `x`, joined on `a`.
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("x", DataType::Float32, false),
]);
let data = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(Int64Array::from_iter_values(0..10_000)),
Arc::new(Float32Array::from_iter_values(
(0..10_000).map(|x| x as f32 * std::f32::consts::PI),
)),
],
)
.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
dataset.merge(reader, "a", "a").await.unwrap();
// Both old fragments are expected to land in a single new fragment, id 2.
let expected_remap = expect_remap(
&[vec![
(row_ids(0, 0..5000), true),
(row_ids(1, 0..5000), true),
]],
2,
);
let plan = plan_compaction(
&dataset,
&CompactionOptions {
..Default::default()
},
)
.await
.unwrap();
assert_eq!(plan.tasks().len(), 1);
assert_eq!(plan.tasks()[0].fragments.len(), 2);
let metrics = compact_files(&mut dataset, plan.options, Some(Arc::new(expected_remap)))
.await
.unwrap();
// Two fragments with two data files each collapse into one fragment with one file.
assert_eq!(metrics.files_removed, 4); assert_eq!(metrics.files_added, 1); assert_eq!(metrics.fragments_removed, 2);
assert_eq!(metrics.fragments_added, 1);
// The compacted dataset must scan back to exactly the merged data.
let scanner = dataset.scan();
let batches = scanner
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let scanned_data = concat_batches(&batches[0].schema(), &batches).unwrap();
assert_eq!(scanned_data, data);
}
/// Checks that deletion materialization respects both the enable flag and the
/// threshold, and that compaction removes the deletion file.
#[rstest]
#[tokio::test]
async fn test_compact_deletions(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = sample_data();
// One fragment of 1,000 rows.
let reader = RecordBatchIterator::new(vec![Ok(data.slice(0, 1000))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 1000,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
// Deletes 501 of the 1,000 rows.
dataset.delete("a <= 500").await.unwrap();
// Deleted fraction is below the 0.8 threshold: nothing to do.
let mut options = CompactionOptions {
materialize_deletions_threshold: 0.8,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 0);
// Threshold is exceeded, but materialization is turned off: still nothing.
options.materialize_deletions_threshold = 0.1;
options.materialize_deletions = false;
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 0);
// Enabled and over threshold: the fragment is rewritten by itself.
options.materialize_deletions = true;
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 1);
let metrics = compact_files(&mut dataset, options, None).await.unwrap();
assert_eq!(metrics.fragments_removed, 1);
// One data file plus the deletion file.
assert_eq!(metrics.files_removed, 2);
assert_eq!(metrics.fragments_added, 1);
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert!(fragments[0].metadata.deletion_file.is_none());
}
/// Test remapper that accepts any row id map and reports no remapped indices.
// NOTE(review): the parent module also re-exports an `IgnoreRemap` from
// `remapping`; this local definition takes precedence over the `use super::*`
// glob import — confirm the duplicate is intentional.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
struct IgnoreRemap {}
#[async_trait]
impl IndexRemapper for IgnoreRemap {
// Ignores both arguments and returns an empty list.
async fn remap_indices(
&self,
_: HashMap<u64, Option<u64>>,
_: &[u64],
) -> Result<Vec<RemappedIndex>> {
Ok(Vec::new())
}
}
impl IndexRemapperOptions for IgnoreRemap {
// Hands out a fresh `IgnoreRemap`; the dataset is not consulted.
fn create_remapper(&self, _: &Dataset) -> Result<Box<dyn IndexRemapper>> {
Ok(Box::new(Self {}))
}
}
/// Runs the plan / execute / commit phases separately, as a distributed caller
/// would, and checks that task results can be committed in more than one
/// `commit_compaction` call.
#[rstest::rstest]
#[tokio::test]
async fn test_compact_distributed(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
#[values(false, true)] use_stable_row_id: bool,
) {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = sample_data();
// 9,000 rows at 1,000 rows per file: nine fragments.
let reader = RecordBatchIterator::new(vec![Ok(data.slice(0, 9000))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 1000,
data_storage_version: Some(data_storage_version),
enable_move_stable_row_ids: use_stable_row_id,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 3_000,
..Default::default()
};
// Three tasks of three fragments each.
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 3);
// Execute the tasks one after another (`then` is sequential).
let dataset_ref = &dataset;
let mut results = futures::stream::iter(plan.compaction_tasks())
.then(|task| async move { task.execute(dataset_ref).await.unwrap() })
.collect::<Vec<_>>()
.await;
assert_eq!(results.len(), 3);
assert_eq!(
results[0]
.original_fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
vec![0, 1, 2]
);
assert_eq!(results[0].metrics.files_removed, 3);
assert_eq!(results[0].metrics.files_added, 1);
// Commit only the last task's result first.
commit_compaction(
&mut dataset,
vec![results.pop().unwrap()],
Arc::new(IgnoreRemap::default()),
)
.await
.unwrap();
// Stable row ids: ids are reserved inside `commit_compaction`, so this call
// adds a reserve commit and the rewrite commit (1 -> 3). Otherwise each of
// the three executed tasks already committed a reservation, plus the
// rewrite commit (1 -> 5).
if use_stable_row_id {
assert_eq!(dataset.manifest.version, 3);
} else {
assert_eq!(dataset.manifest.version, 5);
}
// Commit the remaining two results together.
commit_compaction(&mut dataset, results, Arc::new(IgnoreRemap::default()))
.await
.unwrap();
// Stable row ids: another reserve commit plus the rewrite commit (3 -> 5).
// Otherwise only the rewrite commit (5 -> 6).
if use_stable_row_id {
assert_eq!(dataset.manifest.version, 5);
} else {
assert_eq!(dataset.manifest.version, 6);
}
// Compaction must not change the dataset's row id mode.
assert_eq!(
dataset.manifest.uses_move_stable_row_ids(),
use_stable_row_id,
);
}
/// With move-stable row ids enabled, compaction must leave existing indices
/// untouched: the set of index UUIDs and the results of a vector query and a
/// scalar query are the same before and after compacting.
#[tokio::test]
async fn test_stable_row_indices() {
    let mut data_gen = BatchGenerator::new()
        .col(Box::new(
            RandomVector::new().vec_width(128).named("vec".to_owned()),
        ))
        .col(Box::new(IncrementingInt32::new().named("i".to_owned())));

    // Small files so that there is something to compact.
    let mut dataset = Dataset::write(
        data_gen.batch(5_000),
        "memory://test/table",
        Some(WriteParams {
            enable_move_stable_row_ids: true,
            max_rows_per_file: 1_000,
            ..Default::default()
        }),
    )
    .await
    .unwrap();

    // Delete the first 1,100 rows so deletions are part of the rewrite.
    dataset.delete("i < 1100").await.unwrap();

    // Build a scalar and a vector index before compacting.
    dataset
        .create_index(
            &["i"],
            IndexType::Scalar,
            Some("scalar".into()),
            &ScalarIndexParams::default(),
            false,
        )
        .await
        .unwrap();
    let params = VectorIndexParams::ivf_pq(1, 8, 8, MetricType::L2, 50);
    dataset
        .create_index(
            &["vec"],
            IndexType::Vector,
            Some("vector".into()),
            &params,
            false,
        )
        .await
        .unwrap();

    /// UUIDs of all indices currently on the dataset.
    async fn index_set(dataset: &Dataset) -> HashSet<Uuid> {
        dataset
            .load_indices()
            .await
            .unwrap()
            .iter()
            .map(|index| index.uuid)
            .collect()
    }
    let indices = index_set(&dataset).await;

    /// Ten nearest neighbors of the zero vector, projected to column `i`.
    async fn vector_query(dataset: &Dataset) -> RecordBatch {
        let mut scanner = dataset.scan();
        scanner
            .nearest("vec", &vec![0.0; 128].into(), 10)
            .unwrap()
            .project(&["i"])
            .unwrap();
        scanner.try_into_batch().await.unwrap()
    }

    /// Rows where `i = 1000`, projected to column `i`.
    async fn scalar_query(dataset: &Dataset) -> RecordBatch {
        let mut scanner = dataset.scan();
        scanner.filter("i = 1000").unwrap().project(&["i"]).unwrap();
        scanner.try_into_batch().await.unwrap()
    }

    let before_vec_result = vector_query(&dataset).await;
    let before_scalar_result = scalar_query(&dataset).await;

    let options = CompactionOptions {
        target_rows_per_fragment: 1_800,
        ..Default::default()
    };
    let _metrics = compact_files(&mut dataset, options, None).await.unwrap();

    // No index was rewritten.
    let current_indices = index_set(&dataset).await;
    assert_eq!(indices, current_indices);

    // Query results are unchanged.
    let after_vec_result = vector_query(&dataset).await;
    assert_eq!(before_vec_result, after_vec_result);

    let after_scalar_result = scalar_query(&dataset).await;
    assert_eq!(before_scalar_result, after_scalar_result);
}
}