use std::borrow::Cow;
use std::collections::HashMap;
use std::io::Cursor;
use std::ops::{AddAssign, Range};
use std::sync::Arc;
use super::fragment::FileFragment;
use super::index::DatasetIndexRemapperOptions;
use super::rowids::load_row_id_sequences;
use super::transaction::{
Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder,
};
use super::utils::make_rowid_capture_stream;
use super::{WriteMode, WriteParams, write_fragments_internal};
use crate::Dataset;
use crate::Result;
use crate::dataset::utils::CapturedRowIds;
use crate::io::commit::{commit_transaction, migrate_fragments};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::{StreamExt, TryStreamExt};
use lance_core::Error;
use lance_core::datatypes::BlobHandling;
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::utils::tracing::{DATASET_COMPACTING_EVENT, TRACE_DATASET_EVENTS};
use lance_index::DatasetIndexExt;
use lance_index::frag_reuse::FragReuseGroup;
use lance_table::format::{Fragment, RowIdMeta};
use roaring::{RoaringBitmap, RoaringTreemap};
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
mod binary_copy;
pub mod remapping;
use crate::index::frag_reuse::build_new_frag_reuse_index;
use crate::io::deletion::read_dataset_deletion_file;
use binary_copy::rewrite_files_binary_copy;
pub use remapping::{IgnoreRemap, IndexRemapper, IndexRemapperOptions, RemappedIndex};
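/// Strategy used to rewrite data files during compaction.
///
/// - `Reencode`: read the rows back and re-encode them into new files.
///   Always supported.
/// - `TryBinaryCopy`: copy encoded data byte-for-byte when the source
///   fragments are eligible (matching file versions and column layouts, no
///   deletion files, no blob columns), falling back to re-encoding otherwise.
/// - `ForceBinaryCopy`: like `TryBinaryCopy`, but fail instead of falling
///   back when binary copy is not possible.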
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum CompactionMode {
Reencode,
TryBinaryCopy,
ForceBinaryCopy,
}
impl TryFrom<&str> for CompactionMode {
type Error = Error;
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
match value.to_lowercase().as_str() {
"reencode" => Ok(Self::Reencode),
"try_binary_copy" => Ok(Self::TryBinaryCopy),
"force_binary_copy" => Ok(Self::ForceBinaryCopy),
_ => Err(Error::invalid_input(format!(
"Invalid compaction mode \"{}\". Valid values: \"reencode\", \"try_binary_copy\", \"force_binary_copy\"",
value
))),
}
}
}
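/// Options that control which fragments are selected for compaction and how
/// the replacement files are written.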
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompactionOptions {
pub target_rows_per_fragment: usize,
pub max_rows_per_group: usize,
pub max_bytes_per_file: Option<usize>,
pub materialize_deletions: bool,
pub materialize_deletions_threshold: f32,
pub num_threads: Option<usize>,
pub batch_size: Option<usize>,
pub defer_index_remap: bool,
pub compaction_mode: Option<CompactionMode>,
#[deprecated(note = "Use `compaction_mode` instead")]
pub enable_binary_copy: bool,
#[deprecated(note = "Use `compaction_mode` instead")]
pub enable_binary_copy_force: bool,
pub binary_copy_read_batch_bytes: Option<usize>,
pub max_source_fragments: Option<usize>,
#[serde(skip)]
pub transaction_properties: Option<Arc<HashMap<String, String>>>,
}
#[allow(deprecated)]
impl Default for CompactionOptions {
fn default() -> Self {
Self {
target_rows_per_fragment: 1024 * 1024,
max_rows_per_group: 1024,
materialize_deletions: true,
materialize_deletions_threshold: 0.1,
num_threads: None,
max_bytes_per_file: None,
batch_size: None,
defer_index_remap: false,
compaction_mode: None,
enable_binary_copy: false,
enable_binary_copy_force: false,
binary_copy_read_batch_bytes: Some(16 * 1024 * 1024),
max_source_fragments: None,
transaction_properties: None,
}
}
}
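/// Prefix for compaction settings stored in the dataset config map, e.g.
/// `lance.compaction.target_rows_per_fragment`.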
pub const COMPACTION_CONFIG_PREFIX: &str = "lance.compaction.";
#[allow(deprecated)]
impl CompactionOptions {
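/// Builds options from a dataset config map: starts from the defaults and
/// overrides any `lance.compaction.*` keys present.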
pub fn from_dataset_config(config: &HashMap<String, String>) -> Result<Self> {
let mut opts = Self::default();
opts.apply_dataset_config(config)?;
Ok(opts)
}
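/// Applies `lance.compaction.*` keys from `config` on top of the current
/// values. Unparseable values are an error; unknown keys under the prefix
/// are logged and ignored.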
pub fn apply_dataset_config(&mut self, config: &HashMap<String, String>) -> Result<()> {
for (key, value) in config {
let Some(field) = key.strip_prefix(COMPACTION_CONFIG_PREFIX) else {
continue;
};
match field {
"target_rows_per_fragment" => {
self.target_rows_per_fragment = value.parse().map_err(|_| {
Error::invalid_input(format!(
"Invalid value for {}: '{}' (expected a non-negative integer)",
key, value
))
})?;
}
"max_rows_per_group" => {
self.max_rows_per_group = value.parse().map_err(|_| {
Error::invalid_input(format!(
"Invalid value for {}: '{}' (expected a non-negative integer)",
key, value
))
})?;
}
"max_bytes_per_file" => {
self.max_bytes_per_file = Some(value.parse().map_err(|_| {
Error::invalid_input(format!(
"Invalid value for {}: '{}' (expected a non-negative integer)",
key, value
))
})?);
}
"materialize_deletions" => {
self.materialize_deletions = match value.to_lowercase().as_str() {
"true" => true,
"false" => false,
_ => {
return Err(Error::invalid_input(format!(
"Invalid value for {}: '{}' (expected 'true' or 'false')",
key, value
)));
}
};
}
"materialize_deletions_threshold" => {
self.materialize_deletions_threshold = value.parse().map_err(|_| {
Error::invalid_input(format!(
"Invalid value for {}: '{}' (expected a float between 0.0 and 1.0)",
key, value
))
})?;
}
"defer_index_remap" => {
self.defer_index_remap = match value.to_lowercase().as_str() {
"true" => true,
"false" => false,
_ => {
return Err(Error::invalid_input(format!(
"Invalid value for {}: '{}' (expected 'true' or 'false')",
key, value
)));
}
};
}
"batch_size" => {
self.batch_size = Some(value.parse().map_err(|_| {
Error::invalid_input(format!(
"Invalid value for {}: '{}' (expected a non-negative integer)",
key, value
))
})?);
}
"compaction_mode" => {
self.compaction_mode = Some(CompactionMode::try_from(value.as_str())?);
}
"binary_copy_read_batch_bytes" => {
self.binary_copy_read_batch_bytes = Some(value.parse().map_err(|_| {
Error::invalid_input(format!(
"Invalid value for {}: '{}' (expected a non-negative integer)",
key, value
))
})?);
}
"max_source_fragments" => {
self.max_source_fragments = Some(value.parse().map_err(|_| {
Error::invalid_input(format!(
"Invalid value for {}: '{}' (expected a non-negative integer)",
key, value
))
})?);
}
_ => {
warn!("Ignoring unknown compaction config key: {}", key);
}
}
}
Ok(())
}
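/// Normalizes inconsistent settings: a deletion-materialization threshold of
/// 1.0 or higher can never be exceeded, so deletion materialization is
/// disabled in that case.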
pub fn validate(&mut self) {
if self.materialize_deletions && self.materialize_deletions_threshold >= 1.0 {
self.materialize_deletions = false;
}
}
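/// Resolves the effective mode, preferring `compaction_mode` over the
/// deprecated `enable_binary_copy` / `enable_binary_copy_force` flags.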
pub fn compaction_mode(&self) -> CompactionMode {
if let Some(mode) = self.compaction_mode {
return mode;
}
match (self.enable_binary_copy, self.enable_binary_copy_force) {
(true, true) => CompactionMode::ForceBinaryCopy,
(true, false) => CompactionMode::TryBinaryCopy,
_ => CompactionMode::Reencode,
}
}
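/// Attaches key-value properties to the transaction committed by this
/// compaction.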
pub fn transaction_properties(mut self, properties: HashMap<String, String>) -> Self {
self.transaction_properties = Some(Arc::new(properties));
self
}
}
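/// Returns true if all `fragments` qualify for the byte-for-byte binary copy
/// path. Errors while probing file metadata are logged and treated as
/// "not eligible".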
async fn can_use_binary_copy(
dataset: &Dataset,
options: &CompactionOptions,
fragments: &[Fragment],
) -> bool {
can_use_binary_copy_impl(dataset, options, fragments)
.await
.unwrap_or_else(|err| {
log::warn!("Binary copy disabled due to error: {}", err);
false
})
}
async fn can_use_binary_copy_impl(
dataset: &Dataset,
options: &CompactionOptions,
fragments: &[Fragment],
) -> Result<bool> {
use lance_file::reader::FileReader as LFReader;
use lance_file::version::LanceFileVersion;
use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
if matches!(options.compaction_mode(), CompactionMode::Reencode) {
log::debug!("Binary copy disabled: compaction mode is Reencode");
return Ok(false);
}
let has_blob_columns = dataset
.schema()
.fields_pre_order()
.any(|field| field.is_blob());
if has_blob_columns {
log::debug!("Binary copy disabled: dataset contains blob columns");
return Ok(false);
}
let storage_ok = dataset
.manifest
.data_storage_format
.lance_file_version()
.map(|v| !matches!(v.resolve(), LanceFileVersion::Legacy))
.unwrap_or(false);
if !storage_ok {
log::debug!("Binary copy disabled: dataset uses legacy storage format");
return Ok(false);
}
if fragments.is_empty() {
log::debug!("Binary copy disabled: no fragments to compact");
return Ok(false);
}
let storage_file_version = dataset
.manifest
.data_storage_format
.lance_file_version()?
.resolve();
if fragments[0].files.is_empty() {
log::debug!(
"Binary copy disabled: fragment {} has no data files",
fragments[0].id
);
return Ok(false);
}
let ref_fields = &fragments[0].files[0].fields;
let ref_cols = &fragments[0].files[0].column_indices;
let mut is_same_version = true;
for fragment in fragments {
if fragment.deletion_file.is_some() {
log::debug!(
"Binary copy disabled: fragment {} has a deletion file",
fragment.id
);
return Ok(false);
}
for data_file in &fragment.files {
let version_ok = LanceFileVersion::try_from_major_minor(
data_file.file_major_version,
data_file.file_minor_version,
)
.map(|v| v.resolve())
.is_ok_and(|v| v == storage_file_version);
if !version_ok {
is_same_version = false;
}
if data_file.fields != *ref_fields || data_file.column_indices != *ref_cols {
return Ok(false);
}
let object_store = match data_file.base_id {
Some(base_id) => dataset.object_store_for_base(base_id).await?,
None => dataset.object_store.clone(),
};
let full_path = dataset
.data_file_dir(data_file)?
.child(data_file.path.as_str());
let scan_scheduler = ScanScheduler::new(
object_store.clone(),
SchedulerConfig::max_bandwidth(&object_store),
);
let file_scheduler = scan_scheduler
.open_file_with_priority(&full_path, 0, &data_file.file_size_bytes)
.await?;
let file_meta = LFReader::read_all_metadata(&file_scheduler).await?;
if file_meta.file_buffers.len() > 1 {
log::debug!(
"Binary copy disabled: data file has extra global buffers (len={})",
file_meta.file_buffers.len()
);
return Ok(false);
}
}
}
if !is_same_version {
log::debug!("Binary copy disabled: data files use different file versions");
return Ok(false);
}
Ok(true)
}
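/// Counters describing what a compaction changed: fragments and files removed
/// from and added to the dataset.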
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactionMetrics {
pub fragments_removed: usize,
pub fragments_added: usize,
pub files_removed: usize,
pub files_added: usize,
}
impl AddAssign for CompactionMetrics {
fn add_assign(&mut self, rhs: Self) {
self.fragments_removed += rhs.fragments_removed;
self.fragments_added += rhs.fragments_added;
self.files_removed += rhs.files_removed;
self.files_added += rhs.files_added;
}
}
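/// Decides which fragments should be compacted together. Implement this to
/// customize candidate selection; [`DefaultCompactionPlanner`] is what
/// [`compact_files`] uses.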
#[async_trait::async_trait]
pub trait CompactionPlanner: Send + Sync {
async fn plan(&self, dataset: &Dataset) -> Result<CompactionPlan>;
}
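/// The default planner: bins adjacent fragments that are undersized or carry
/// enough deletions to materialize, splitting bins at index-coverage
/// boundaries and at the target fragment size.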
#[derive(Debug, Clone, Default)]
pub struct DefaultCompactionPlanner {
options: CompactionOptions,
}
impl DefaultCompactionPlanner {
pub fn new(mut options: CompactionOptions) -> Self {
options.validate();
Self { options }
}
}
#[async_trait::async_trait]
impl CompactionPlanner for DefaultCompactionPlanner {
async fn plan(&self, dataset: &Dataset) -> Result<CompactionPlan> {
let fragments = dataset.get_fragments();
debug_assert!(
fragments.windows(2).all(|w| w[0].id() < w[1].id()),
"fragments in manifest are not sorted"
);
let mut fragment_metrics = futures::stream::iter(fragments)
.map(|fragment| async move {
match collect_metrics(&fragment).await {
Ok(metrics) => Ok((fragment.metadata, metrics)),
Err(e) => Err(e),
}
})
.buffered(dataset.object_store().io_parallelism());
let index_fragmaps = load_index_fragmaps(dataset).await?;
let indices_containing_frag = |frag_id: u32| {
index_fragmaps
.iter()
.enumerate()
.filter(|(_, bitmap)| bitmap.contains(frag_id))
.map(|(pos, _)| pos)
.collect::<Vec<_>>()
};
let mut candidate_bins: Vec<CandidateBin> = Vec::new();
let mut current_bin: Option<CandidateBin> = None;
let mut i = 0;
while let Some(res) = fragment_metrics.next().await {
let (fragment, metrics) = res?;
let candidacy = if self.options.materialize_deletions
&& metrics.deletion_percentage() > self.options.materialize_deletions_threshold
{
Some(CompactionCandidacy::CompactItself)
} else if metrics.physical_rows < self.options.target_rows_per_fragment {
Some(CompactionCandidacy::CompactWithNeighbors)
} else {
None
};
let indices = indices_containing_frag(fragment.id as u32);
match (candidacy, &mut current_bin) {
(None, None) => {}
(Some(candidacy), None) => {
current_bin = Some(CandidateBin {
fragments: vec![fragment],
pos_range: i..(i + 1),
candidacy: vec![candidacy],
row_counts: vec![metrics.num_rows()],
indices,
});
}
(Some(candidacy), Some(bin)) => {
if bin.indices == indices {
bin.fragments.push(fragment);
bin.pos_range.end += 1;
bin.candidacy.push(candidacy);
bin.row_counts.push(metrics.num_rows());
} else {
candidate_bins.push(current_bin.take().unwrap());
current_bin = Some(CandidateBin {
fragments: vec![fragment],
pos_range: i..(i + 1),
candidacy: vec![candidacy],
row_counts: vec![metrics.num_rows()],
indices,
});
}
}
(None, Some(_)) => {
candidate_bins.push(current_bin.take().unwrap());
}
}
i += 1;
}
if let Some(bin) = current_bin {
candidate_bins.push(bin);
}
let all_tasks: Vec<TaskData> = candidate_bins
.into_iter()
.filter(|bin| !bin.is_noop())
.flat_map(|bin| bin.split_for_size(self.options.target_rows_per_fragment))
.map(|bin| TaskData {
fragments: bin.fragments,
})
.collect();
let tasks = if let Some(max_frags) = self.options.max_source_fragments {
let mut total_frags = 0;
all_tasks
.into_iter()
.take_while(|task| {
total_frags += task.fragments.len();
total_frags <= max_frags
})
.collect()
} else {
all_tasks
};
let mut compaction_plan =
CompactionPlan::new(dataset.manifest.version, self.options.clone());
compaction_plan.extend_tasks(tasks);
Ok(compaction_plan)
}
}
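/// Compacts the data files in a dataset: merges undersized fragments,
/// materializes deletions where the deleted fraction exceeds the configured
/// threshold, and commits a single `Rewrite` transaction (remapping indices
/// unless `defer_index_remap` is set).
///
/// A minimal sketch (marked `ignore`; assumes an open, writable `dataset`
/// and that this module is reachable as `lance::dataset::optimize`):
///
/// ```ignore
/// use lance::dataset::optimize::{compact_files, CompactionOptions};
///
/// let metrics = compact_files(&mut dataset, CompactionOptions::default(), None).await?;
/// println!(
///     "removed {} fragments, added {}",
///     metrics.fragments_removed, metrics.fragments_added
/// );
/// ```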
pub async fn compact_files(
dataset: &mut Dataset,
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
) -> Result<CompactionMetrics> {
info!(target: TRACE_DATASET_EVENTS, event=DATASET_COMPACTING_EVENT, uri = &dataset.uri);
let planner = DefaultCompactionPlanner::new(options);
compact_files_with_planner(dataset, remap_options, &planner).await
}
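/// Like [`compact_files`], but plans with a caller-supplied
/// [`CompactionPlanner`] instead of the default one.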
pub async fn compact_files_with_planner(
dataset: &mut Dataset,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
planner: &dyn CompactionPlanner,
) -> Result<CompactionMetrics> {
let compaction_plan: CompactionPlan = planner.plan(dataset).await?;
if compaction_plan.tasks().is_empty() {
return Ok(CompactionMetrics::default());
}
let dataset_ref = &dataset.clone();
let result_stream = futures::stream::iter(compaction_plan.tasks.into_iter())
.map(|task| rewrite_files(Cow::Borrowed(dataset_ref), task, &compaction_plan.options))
.buffer_unordered(
compaction_plan
.options
.num_threads
.unwrap_or_else(get_num_compute_intensive_cpus),
);
let completed_tasks: Vec<RewriteResult> = result_stream.try_collect().await?;
let remap_options = remap_options.unwrap_or(Arc::new(DatasetIndexRemapperOptions::default()));
let metrics = commit_compaction(
dataset,
completed_tasks,
remap_options,
&compaction_plan.options,
)
.await?;
Ok(metrics)
}
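/// Per-fragment row statistics used to decide compaction candidacy.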
#[derive(Debug)]
struct FragmentMetrics {
pub physical_rows: usize,
pub num_deletions: usize,
}
impl FragmentMetrics {
fn deletion_percentage(&self) -> f32 {
if self.physical_rows > 0 {
self.num_deletions as f32 / self.physical_rows as f32
} else {
0.0
}
}
fn num_rows(&self) -> usize {
self.physical_rows - self.num_deletions
}
}
async fn collect_metrics(fragment: &FileFragment) -> Result<FragmentMetrics> {
let physical_rows = fragment.physical_rows();
let num_deletions = fragment.count_deletions();
let (physical_rows, num_deletions) =
futures::future::try_join(physical_rows, num_deletions).await?;
Ok(FragmentMetrics {
physical_rows,
num_deletions,
})
}
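/// A plan produced by a [`CompactionPlanner`]: the dataset version it was
/// planned against, the options in effect, and the independent rewrite tasks.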
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CompactionPlan {
pub tasks: Vec<TaskData>,
pub read_version: u64,
pub options: CompactionOptions,
}
impl CompactionPlan {
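/// Splits the plan into self-contained [`CompactionTask`]s that can be
/// serialized and executed on separate workers.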
pub fn compaction_tasks(&self) -> impl Iterator<Item = CompactionTask> + '_ {
let read_version = self.read_version;
let options = self.options.clone();
self.tasks.iter().map(move |task| CompactionTask {
task: task.clone(),
read_version,
options: options.clone(),
})
}
pub fn num_tasks(&self) -> usize {
self.tasks.len()
}
pub fn read_version(&self) -> u64 {
self.read_version
}
pub fn options(&self) -> &CompactionOptions {
&self.options
}
}
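/// Opens an in-order scan over `fragments`, optionally capturing the row ids
/// of the scanned rows so the rewrite can later be remapped.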
async fn prepare_reader(
dataset: &Dataset,
fragments: &[Fragment],
batch_size: Option<usize>,
with_frags: bool,
capture_row_ids: bool,
) -> Result<(
SendableRecordBatchStream,
Option<std::sync::mpsc::Receiver<CapturedRowIds>>,
)> {
let mut scanner = dataset.scan();
let has_blob_columns = dataset
.schema()
.fields_pre_order()
.any(|field| field.is_blob());
if has_blob_columns {
scanner.blob_handling(BlobHandling::AllBinary);
}
if let Some(bs) = batch_size {
scanner.batch_size(bs);
}
if with_frags {
scanner
.with_fragments(fragments.to_vec())
.scan_in_order(true);
}
if capture_row_ids {
scanner.with_row_id();
let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?);
let (data_no_row_ids, rx) =
make_rowid_capture_stream(data, dataset.manifest.uses_stable_row_ids())?;
Ok((data_no_row_ids, Some(rx)))
} else {
Ok((
SendableRecordBatchStream::from(scanner.try_into_stream().await?),
None,
))
}
}
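/// The fragments that one rewrite task will compact together.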
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TaskData {
pub fragments: Vec<Fragment>,
}
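/// A serializable unit of compaction work: one [`TaskData`] plus the read
/// version and options needed to execute it on any process.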
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompactionTask {
pub task: TaskData,
pub read_version: u64,
pub options: CompactionOptions,
}
impl CompactionTask {
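/// Runs the rewrite against the dataset version this task was planned from,
/// checking out that version first if needed.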
pub async fn execute(&self, dataset: &Dataset) -> Result<RewriteResult> {
let dataset = if dataset.manifest.version == self.read_version {
Cow::Borrowed(dataset)
} else {
Cow::Owned(dataset.checkout_version(self.read_version).await?)
};
rewrite_files(dataset, self.task.clone(), &self.options).await
}
}
impl CompactionPlan {
fn new(read_version: u64, options: CompactionOptions) -> Self {
Self {
tasks: Vec::new(),
read_version,
options,
}
}
fn extend_tasks(&mut self, tasks: impl IntoIterator<Item = TaskData>) {
self.tasks.extend(tasks);
}
fn tasks(&self) -> &[TaskData] {
&self.tasks
}
}
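/// Why a fragment is a compaction candidate: it either has enough deletions
/// to be worth compacting on its own, or is undersized and only worth
/// compacting together with neighbors.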
#[derive(Debug, Clone)]
enum CompactionCandidacy {
CompactWithNeighbors,
CompactItself,
}
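/// A run of adjacent candidate fragments covered by the same set of indices.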
struct CandidateBin {
pub fragments: Vec<Fragment>,
pub pos_range: Range<usize>,
pub candidacy: Vec<CompactionCandidacy>,
pub row_counts: Vec<usize>,
pub indices: Vec<usize>,
}
impl CandidateBin {
fn is_noop(&self) -> bool {
if self.fragments.is_empty() {
return true;
}
if self.fragments.len() == 1 {
matches!(self.candidacy[0], CompactionCandidacy::CompactWithNeighbors)
} else {
false
}
}
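/// Splits the bin so that each piece reaches at least `min_num_rows`; the
/// last piece absorbs any remainder that cannot reach the target on its own.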
fn split_for_size(mut self, min_num_rows: usize) -> Vec<Self> {
let mut bins = Vec::new();
loop {
let mut bin_len = 0;
let mut bin_row_count = 0;
while bin_row_count < min_num_rows && bin_len < self.row_counts.len() {
bin_row_count += self.row_counts[bin_len];
bin_len += 1;
}
if self.row_counts[bin_len..].iter().sum::<usize>() >= min_num_rows {
bins.push(Self {
fragments: self.fragments.drain(0..bin_len).collect(),
pos_range: self.pos_range.start..(self.pos_range.start + bin_len),
candidacy: self.candidacy.drain(0..bin_len).collect(),
row_counts: self.row_counts.drain(0..bin_len).collect(),
indices: Vec::new(),
});
self.pos_range.start += bin_len;
} else {
bins.push(self);
break;
}
}
bins
}
}
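/// Loads, for each index, the bitmap of fragment ids it covers. For indices
/// without a stored bitmap, coverage is reconstructed from the dataset
/// version the index was built at.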
async fn load_index_fragmaps(dataset: &Dataset) -> Result<Vec<RoaringBitmap>> {
let indices = dataset.load_indices().await?;
let mut index_fragmaps = Vec::with_capacity(indices.len());
for index in indices.iter() {
if let Some(fragment_bitmap) = index.fragment_bitmap.as_ref() {
index_fragmaps.push(fragment_bitmap.clone());
} else {
let dataset_at_index = dataset.checkout_version(index.dataset_version).await?;
let frags = 0..dataset_at_index.manifest.max_fragment_id.unwrap_or(0);
index_fragmaps.push(RoaringBitmap::from_sorted_iter(frags).unwrap());
}
}
Ok(index_fragmaps)
}
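/// Formulates a compaction plan for the dataset using the default planner.
/// Execute the plan in one process with [`compact_files`], or distribute it
/// by running each [`CompactionTask`] separately and committing the results
/// with [`commit_compaction`].
///
/// A sketch of the distributed flow, assuming every step sees the same
/// dataset version and that `DatasetIndexRemapperOptions` is reachable at
/// the path used here:
///
/// ```ignore
/// use std::sync::Arc;
/// use lance::dataset::index::DatasetIndexRemapperOptions;
/// use lance::dataset::optimize::{commit_compaction, plan_compaction, CompactionOptions};
///
/// let options = CompactionOptions::default();
/// let plan = plan_compaction(&dataset, &options).await?;
/// // Each task is independent; in a real deployment these would be
/// // serialized out to separate workers.
/// let mut results = Vec::new();
/// for task in plan.compaction_tasks() {
///     results.push(task.execute(&dataset).await?);
/// }
/// commit_compaction(
///     &mut dataset,
///     results,
///     Arc::new(DatasetIndexRemapperOptions::default()),
///     &options,
/// )
/// .await?;
/// ```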
pub async fn plan_compaction(
dataset: &Dataset,
options: &CompactionOptions,
) -> Result<CompactionPlan> {
let planner = DefaultCompactionPlanner::new(options.clone());
planner.plan(dataset).await
}
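/// The outcome of a single rewrite task: metrics, the fragments written, the
/// version the task read from, the fragments it replaces, and (when index
/// remapping is needed or deferred) the serialized addresses of the
/// rewritten rows.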
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RewriteResult {
pub metrics: CompactionMetrics,
pub new_fragments: Vec<Fragment>,
pub read_version: u64,
pub original_fragments: Vec<Fragment>,
pub row_addrs: Option<Vec<u8>>,
}
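/// Commits a `ReserveFragments` transaction and assigns the newly reserved
/// ids to the given fragments in order.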
async fn reserve_fragment_ids(
dataset: &Dataset,
fragments: impl ExactSizeIterator<Item = &mut Fragment>,
) -> Result<()> {
let transaction = Transaction::new(
dataset.manifest.version,
Operation::ReserveFragments {
num_fragments: fragments.len() as u32,
},
None,
);
let (manifest, _) = commit_transaction(
dataset,
dataset.object_store(),
dataset.commit_handler.as_ref(),
&transaction,
&Default::default(),
&Default::default(),
dataset.manifest_location.naming_scheme,
None,
)
.await?;
let new_max_exclusive = manifest.max_fragment_id.unwrap_or(0) + 1;
let reserved_ids = (new_max_exclusive - fragments.len() as u32)..(new_max_exclusive);
for (fragment, new_id) in fragments.zip(reserved_ids) {
fragment.id = new_id as u64;
}
Ok(())
}
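/// Rewrites the fragments of one task, via binary copy when eligible or by
/// scanning and re-encoding otherwise. Reports what changed but does not
/// commit anything.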
async fn rewrite_files(
dataset: Cow<'_, Dataset>,
task: TaskData,
options: &CompactionOptions,
) -> Result<RewriteResult> {
let mut metrics = CompactionMetrics::default();
if task.fragments.is_empty() {
return Ok(RewriteResult {
metrics,
new_fragments: Vec::new(),
read_version: dataset.manifest.version,
original_fragments: task.fragments,
row_addrs: None,
});
}
let previous_writer_version = &dataset.manifest.writer_version;
let recompute_stats = previous_writer_version.is_none();
let fragments = migrate_fragments(dataset.as_ref(), &task.fragments, recompute_stats).await?;
let num_rows = fragments
.iter()
.map(|f| f.physical_rows.unwrap() as u64)
.sum::<u64>();
let needs_remapping = !dataset.manifest.uses_stable_row_ids();
let mut new_fragments: Vec<Fragment>;
let task_id = uuid::Uuid::new_v4();
log::info!(
"Compaction task {}: Begin compacting {} rows across {} fragments",
task_id,
num_rows,
fragments.len()
);
let mode = options.compaction_mode();
let can_binary_copy = can_use_binary_copy(dataset.as_ref(), options, &fragments).await;
if !can_binary_copy && matches!(mode, CompactionMode::ForceBinaryCopy) {
return Err(Error::not_supported_source(
format!("compaction task {}: binary copy is not supported", task_id).into(),
));
}
let mut row_ids_rx: Option<std::sync::mpsc::Receiver<CapturedRowIds>> = None;
let mut reader: Option<SendableRecordBatchStream> = None;
if !can_binary_copy {
let (prepared_reader, rx_initial) = prepare_reader(
dataset.as_ref(),
&fragments,
options.batch_size,
true,
needs_remapping,
)
.await?;
row_ids_rx = rx_initial;
let mut rows_read = 0;
let schema = prepared_reader.schema();
let reader_with_progress = prepared_reader.inspect_ok(move |batch| {
rows_read += batch.num_rows();
log::info!(
"Compaction task {}: Read progress {}/{}",
task_id,
rows_read,
num_rows,
);
});
reader = Some(Box::pin(RecordBatchStreamAdapter::new(
schema,
reader_with_progress,
)));
}
let mut params = WriteParams {
max_rows_per_file: options.target_rows_per_fragment,
max_rows_per_group: options.max_rows_per_group,
mode: WriteMode::Append,
..Default::default()
};
if let Some(max_bytes_per_file) = options.max_bytes_per_file {
params.max_bytes_per_file = max_bytes_per_file;
}
if dataset.manifest.uses_stable_row_ids() {
params.enable_stable_row_ids = true;
}
if can_binary_copy {
new_fragments = rewrite_files_binary_copy(
dataset.as_ref(),
&fragments,
&params,
options.binary_copy_read_batch_bytes,
)
.await?;
if new_fragments.is_empty() && matches!(mode, CompactionMode::ForceBinaryCopy) {
return Err(Error::not_supported_source(
format!("compaction task {}: binary copy is not supported", task_id).into(),
));
}
if needs_remapping {
let (tx, rx) = std::sync::mpsc::channel();
let mut addrs = RoaringTreemap::new();
for frag in &fragments {
let frag_id = frag.id as u32;
let count = u64::try_from(frag.physical_rows.unwrap_or(0)).map_err(|_| {
Error::internal(format!(
"Fragment {} has too many physical rows to represent as row addresses",
frag.id
))
})?;
let start = u64::from(lance_core::utils::address::RowAddress::first_row(frag_id));
addrs.insert_range(start..start + count);
}
let captured = CapturedRowIds::AddressStyle(addrs);
let _ = tx.send(captured);
row_ids_rx = Some(rx);
}
} else {
let (frags, _) = write_fragments_internal(
Some(dataset.as_ref()),
dataset.object_store.clone(),
&dataset.base,
dataset.schema().clone(),
reader.expect("reader must be prepared for non-binary-copy path"),
params,
None,
)
.await?;
new_fragments = frags;
}
log::info!("Compaction task {}: file written", task_id);
let row_addrs = if let Some(row_ids_rx) = row_ids_rx {
let captured_ids = row_ids_rx
.try_recv()
.map_err(|err| Error::internal(format!("Failed to receive row ids: {}", err)))?;
let row_addrs = captured_ids.row_addrs(None).into_owned();
let mut serialized = Vec::with_capacity(row_addrs.serialized_size());
row_addrs.serialize_into(&mut serialized)?;
Some(serialized)
} else {
if dataset.manifest.uses_stable_row_ids() {
log::info!("Compaction task {}: rechunking stable row ids", task_id);
rechunk_stable_row_ids(dataset.as_ref(), &mut new_fragments, &fragments).await?;
recalc_versions_for_rewritten_fragments(
dataset.as_ref(),
&mut new_fragments,
&fragments,
)
.await?;
}
None
};
metrics.files_removed = task
.fragments
.iter()
.map(|f| f.files.len() + f.deletion_file.is_some() as usize)
.sum();
metrics.fragments_removed = task.fragments.len();
metrics.fragments_added = new_fragments.len();
metrics.files_added = new_fragments
.iter()
.map(|f| f.files.len() + f.deletion_file.is_some() as usize)
.sum();
log::info!("Compaction task {}: completed", task_id);
Ok(RewriteResult {
metrics,
new_fragments,
read_version: dataset.manifest.version,
original_fragments: fragments,
row_addrs,
})
}
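/// Carries stable row-id sequences over from the old fragments to the new
/// ones: deleted rows are masked out of each old sequence and the surviving
/// ids are rechunked to match the new fragments' row counts.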
async fn rechunk_stable_row_ids(
dataset: &Dataset,
new_fragments: &mut [Fragment],
old_fragments: &[Fragment],
) -> Result<()> {
let mut old_sequences = load_row_id_sequences(dataset, old_fragments)
.try_collect::<Vec<_>>()
.await?;
old_sequences.sort_by_key(|(frag_id, _)| {
old_fragments
.iter()
.position(|frag| frag.id as u32 == *frag_id)
.expect("Fragment not found")
});
futures::stream::iter(old_sequences.iter_mut().zip(old_fragments.iter()))
.map(Ok)
.try_for_each(|((_, seq), frag)| async move {
if let Some(deletion_file) = &frag.deletion_file {
let deletions = read_dataset_deletion_file(dataset, frag.id, deletion_file).await?;
let mut new_seq = seq.as_ref().clone();
new_seq.mask(deletions.to_sorted_iter())?;
*seq = Arc::new(new_seq);
}
Ok::<(), crate::Error>(())
})
.await?;
debug_assert_eq!(
{ old_sequences.iter().map(|(_, seq)| seq.len()).sum::<u64>() },
{
new_fragments
.iter()
.map(|frag| frag.physical_rows.unwrap() as u64)
.sum::<u64>()
},
"{:?}",
old_sequences
);
let new_sequences = lance_table::rowids::rechunk_sequences(
old_sequences
.into_iter()
.map(|(_, seq)| seq.as_ref().clone()),
new_fragments
.iter()
.map(|frag| frag.physical_rows.unwrap() as u64),
false,
)?;
for (fragment, sequence) in new_fragments.iter_mut().zip(new_sequences) {
let serialized = lance_table::rowids::write_row_ids(&sequence);
fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
}
Ok(())
}
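/// Rebuilds the per-row `created_at` / `last_updated_at` version sequences of
/// the rewritten fragments: deleted rows are masked out and the sequences are
/// rechunked to the new fragment boundaries, mirroring
/// `rechunk_stable_row_ids`.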
async fn recalc_versions_for_rewritten_fragments(
dataset: &Dataset,
new_fragments: &mut [Fragment],
old_fragments: &[Fragment],
) -> Result<()> {
let mut old_last_updated_sequences: Vec<lance_table::format::RowDatasetVersionSequence> =
Vec::with_capacity(old_fragments.len());
let mut old_created_at_sequences: Vec<lance_table::format::RowDatasetVersionSequence> =
Vec::with_capacity(old_fragments.len());
for frag in old_fragments.iter() {
let row_count = if let Some(row_id_meta) = &frag.row_id_meta {
match row_id_meta {
RowIdMeta::Inline(data) => lance_table::rowids::read_row_ids(data)?.len(),
RowIdMeta::External(_file) => frag.physical_rows.unwrap_or(0) as u64,
}
} else {
frag.physical_rows.unwrap_or(0) as u64
};
let mut created_at_seq = if let Some(version_meta) = &frag.created_at_version_meta {
version_meta.load_sequence().map_err(|e| {
Error::internal(format!("Failed to load created_at version sequence: {}", e))
})?
} else {
lance_table::format::RowDatasetVersionSequence::from_uniform_row_count(row_count, 1)
};
let mut last_updated_seq = if let Some(version_meta) = &frag.last_updated_at_version_meta {
version_meta.load_sequence().map_err(|e| {
Error::internal(format!(
"Failed to load last_updated_at version sequence: {}",
e
))
})?
} else {
created_at_seq.clone()
};
if let Some(deletion_file) = &frag.deletion_file {
let deletions = read_dataset_deletion_file(dataset, frag.id, deletion_file).await?;
last_updated_seq.mask(deletions.to_sorted_iter())?;
created_at_seq.mask(deletions.to_sorted_iter())?;
}
old_last_updated_sequences.push(last_updated_seq);
old_created_at_sequences.push(created_at_seq);
}
let old_total: u64 = old_last_updated_sequences.iter().map(|s| s.len()).sum();
let new_total: u64 = new_fragments
.iter()
.map(|f| f.physical_rows.unwrap_or(0) as u64)
.sum();
debug_assert_eq!(old_total, new_total);
let chunk_sizes: Vec<u64> = new_fragments
.iter()
.map(|f| f.physical_rows.unwrap_or(0) as u64)
.collect();
let new_last_updated_sequences = lance_table::rowids::version::rechunk_version_sequences(
old_last_updated_sequences,
chunk_sizes.clone(),
false,
)?;
let new_created_at_sequences = lance_table::rowids::version::rechunk_version_sequences(
old_created_at_sequences,
chunk_sizes,
false,
)?;
for ((fragment, last_updated_seq), created_at_seq) in new_fragments
.iter_mut()
.zip(new_last_updated_sequences.into_iter())
.zip(new_created_at_sequences.into_iter())
{
fragment.last_updated_at_version_meta = Some(
lance_table::format::RowDatasetVersionMeta::from_sequence(&last_updated_seq).unwrap(),
);
fragment.created_at_version_meta = Some(
lance_table::format::RowDatasetVersionMeta::from_sequence(&created_at_seq).unwrap(),
);
}
Ok(())
}
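/// Commits the results of completed rewrite tasks as a single `Rewrite`
/// transaction. Depending on the options, affected indices are remapped
/// immediately or the changes are recorded in a fragment-reuse index for
/// deferred remapping. The tasks do not all need to be committed at once.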
pub async fn commit_compaction(
dataset: &mut Dataset,
completed_tasks: Vec<RewriteResult>,
remap_options: Arc<dyn IndexRemapperOptions>,
options: &CompactionOptions,
) -> Result<CompactionMetrics> {
if completed_tasks.is_empty() {
return Ok(CompactionMetrics::default());
}
let needs_remapping = !dataset.manifest.uses_stable_row_ids() && !options.defer_index_remap;
let mut completed_tasks = completed_tasks;
let has_address_style = completed_tasks.iter().any(|t| t.row_addrs.is_some());
if has_address_style {
let frags: Vec<&mut Fragment> = completed_tasks
.iter_mut()
.filter(|t| t.row_addrs.is_some())
.flat_map(|t| t.new_fragments.iter_mut())
.collect();
reserve_fragment_ids(dataset, frags.into_iter()).await?;
}
let mut rewrite_groups = Vec::with_capacity(completed_tasks.len());
let mut metrics = CompactionMetrics::default();
let mut row_id_map: HashMap<u64, Option<u64>> = HashMap::default();
let mut frag_reuse_groups: Vec<FragReuseGroup> = Vec::new();
let mut new_fragment_bitmap: RoaringBitmap = RoaringBitmap::new();
for task in completed_tasks {
metrics += task.metrics;
let rewrite_group = RewriteGroup {
old_fragments: task.original_fragments.clone(),
new_fragments: task.new_fragments.clone(),
};
if needs_remapping {
if let Some(row_addrs_bytes) = task.row_addrs {
let row_addrs =
RoaringTreemap::deserialize_from(&mut Cursor::new(&row_addrs_bytes))?;
let transposed = remapping::transpose_row_addrs(
row_addrs,
&task.original_fragments,
&task.new_fragments,
);
row_id_map.extend(transposed);
}
} else if options.defer_index_remap {
let changed_row_addrs = task.row_addrs.ok_or_else(|| {
Error::internal(
"defer_index_remap requires row_addrs but none were provided".to_string(),
)
})?;
frag_reuse_groups.push(FragReuseGroup {
changed_row_addrs,
old_frags: task.original_fragments.iter().map(|f| f.into()).collect(),
new_frags: task.new_fragments.iter().map(|f| f.into()).collect(),
});
task.new_fragments.iter().for_each(|frag| {
new_fragment_bitmap.insert(frag.id as u32);
});
}
rewrite_groups.push(rewrite_group);
}
let rewritten_indices = if needs_remapping {
let index_remapper = remap_options.create_remapper(dataset)?;
let affected_ids = rewrite_groups
.iter()
.flat_map(|group| group.old_fragments.iter().map(|frag| frag.id))
.collect::<Vec<_>>();
let remapped_indices = index_remapper
.remap_indices(row_id_map, &affected_ids)
.await?;
remapped_indices
.into_iter()
.map(|rewritten| RewrittenIndex {
old_id: rewritten.old_id,
new_id: rewritten.new_id,
new_index_details: rewritten.index_details,
new_index_version: rewritten.index_version,
new_index_files: rewritten.files,
})
.collect()
} else if !options.defer_index_remap && !has_address_style {
let new_fragments = rewrite_groups
.iter_mut()
.flat_map(|group| group.new_fragments.iter_mut())
.collect::<Vec<_>>();
reserve_fragment_ids(dataset, new_fragments.into_iter()).await?;
Vec::new()
} else {
Vec::new()
};
let frag_reuse_index = if options.defer_index_remap {
Some(build_new_frag_reuse_index(dataset, frag_reuse_groups, new_fragment_bitmap).await?)
} else {
None
};
let transaction = TransactionBuilder::new(
dataset.manifest.version,
Operation::Rewrite {
groups: rewrite_groups,
rewritten_indices,
frag_reuse_index,
},
)
.transaction_properties(options.transaction_properties.clone())
.build();
dataset
.apply_commit(transaction, &Default::default(), &Default::default())
.await?;
Ok(metrics)
}
#[cfg(test)]
mod tests {
mod binary_copy;
use self::remapping::RemappedIndex;
use super::*;
use crate::dataset::WriteDestination;
use crate::dataset::index::frag_reuse::cleanup_frag_reuse_index;
use crate::dataset::optimize::remapping::{transpose_row_addrs, transpose_row_ids_from_digest};
use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index};
use crate::index::vector::{StageParams, VectorIndexParams};
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
use arrow_array::types::{Float32Type, Float64Type, Int32Type, Int64Type};
use arrow_array::{
ArrayRef, Float32Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray,
PrimitiveArray, RecordBatch, RecordBatchIterator,
};
use arrow_schema::{DataType, Field, Schema};
use arrow_select::concat::concat_batches;
use async_trait::async_trait;
use lance_arrow::BLOB_META_KEY;
use lance_core::Error;
use lance_core::utils::address::RowAddress;
use lance_core::utils::tempfile::TempStrDir;
use lance_datagen::Dimension;
use lance_file::version::LanceFileVersion;
use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
use lance_index::scalar::{
BuiltinIndexType, FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams,
};
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::pq::PQBuildParams;
use lance_index::{Index, IndexType};
use lance_linalg::distance::{DistanceType, MetricType};
use lance_table::io::manifest::read_manifest_indexes;
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
use rstest::rstest;
use std::collections::HashSet;
use std::io::Cursor;
use std::sync::Arc;
use uuid::Uuid;
#[test]
fn test_candidate_bin() {
let empty_bin = CandidateBin {
fragments: vec![],
pos_range: 0..0,
candidacy: vec![],
row_counts: vec![],
indices: vec![],
};
assert!(empty_bin.is_noop());
let fragment = Fragment {
id: 0,
files: vec![],
deletion_file: None,
row_id_meta: None,
physical_rows: Some(0),
last_updated_at_version_meta: None,
created_at_version_meta: None,
};
let single_bin = CandidateBin {
fragments: vec![fragment.clone()],
pos_range: 0..1,
candidacy: vec![CompactionCandidacy::CompactWithNeighbors],
row_counts: vec![100],
indices: vec![],
};
assert!(single_bin.is_noop());
let single_bin = CandidateBin {
fragments: vec![fragment.clone()],
pos_range: 0..1,
candidacy: vec![CompactionCandidacy::CompactItself],
row_counts: vec![100],
indices: vec![],
};
assert!(!single_bin.is_noop());
let big_bin = CandidateBin {
fragments: std::iter::repeat_n(fragment, 8).collect(),
pos_range: 0..8,
candidacy: std::iter::repeat_n(CompactionCandidacy::CompactItself, 8).collect(),
row_counts: vec![100, 400, 200, 200, 400, 300, 300, 100],
indices: vec![],
};
assert!(!big_bin.is_noop());
let split = big_bin.split_for_size(500);
assert_eq!(split.len(), 3);
assert_eq!(split[0].pos_range, 0..2);
assert_eq!(split[1].pos_range, 2..5);
assert_eq!(split[2].pos_range, 5..8);
}
fn sample_data() -> RecordBatch {
let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(Int64Array::from_iter_values(0..10_000))],
)
.unwrap()
}
#[derive(Debug, Default, Clone, PartialEq)]
struct MockIndexRemapperExpectation {
expected: HashMap<u64, Option<u64>>,
answer: Vec<RemappedIndex>,
}
#[derive(Debug, Default, Clone, PartialEq)]
struct MockIndexRemapper {
expectations: Vec<MockIndexRemapperExpectation>,
}
impl MockIndexRemapper {
fn stringify_map(map: &HashMap<u64, Option<u64>>) -> String {
let mut sorted_keys = map.keys().collect::<Vec<_>>();
sorted_keys.sort();
let mut first_keys = sorted_keys
.into_iter()
.take(10)
.map(|key| {
format!(
"{}:{:?}",
RowAddress::from(*key),
map[key].map(RowAddress::from)
)
})
.collect::<Vec<_>>()
.join(",");
if map.len() > 10 {
first_keys.push_str(", ...");
}
let mut result_str = format!("(len={})", map.len());
result_str.push_str(&first_keys);
result_str
}
fn in_any_order(expectations: &[Self]) -> Self {
let expectations = expectations
.iter()
.flat_map(|item| item.expectations.clone())
.collect::<Vec<_>>();
Self { expectations }
}
}
#[async_trait]
impl IndexRemapper for MockIndexRemapper {
async fn remap_indices(
&self,
index_map: HashMap<u64, Option<u64>>,
_: &[u64],
) -> Result<Vec<RemappedIndex>> {
for expectation in &self.expectations {
if expectation.expected == index_map {
return Ok(expectation.answer.clone());
}
}
panic!(
"Unexpected index map (len={}): {}\n Options: {}",
index_map.len(),
Self::stringify_map(&index_map),
self.expectations
.iter()
.map(|expectation| Self::stringify_map(&expectation.expected))
.collect::<Vec<_>>()
.join("\n ")
);
}
}
impl IndexRemapperOptions for MockIndexRemapper {
fn create_remapper(&self, _: &Dataset) -> Result<Box<dyn IndexRemapper>> {
Ok(Box::new(self.clone()))
}
}
#[rstest]
#[tokio::test]
async fn test_compact_empty(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), Arc::new(schema));
let mut dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
data_storage_version: Some(data_storage_version),
..Default::default()
}),
)
.await
.unwrap();
let plan = plan_compaction(&dataset, &CompactionOptions::default())
.await
.unwrap();
assert_eq!(plan.tasks().len(), 0);
let metrics = compact_files(&mut dataset, CompactionOptions::default(), None)
.await
.unwrap();
assert_eq!(metrics, CompactionMetrics::default());
assert_eq!(dataset.manifest.version, 1);
}
#[rstest]
#[tokio::test]
async fn test_compact_all_good(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let data = sample_data();
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
let write_params = WriteParams {
max_rows_per_file: 10_000,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let plan = plan_compaction(&dataset, &CompactionOptions::default())
.await
.unwrap();
assert_eq!(plan.tasks().len(), 0);
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
let write_params = WriteParams {
max_rows_per_file: 3_000,
max_rows_per_group: 1_000,
data_storage_version: Some(data_storage_version),
mode: WriteMode::Overwrite,
..Default::default()
};
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 3_000,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 0);
}
#[tokio::test]
async fn test_compact_blob_columns() {
let test_dir = TempStrDir::default();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("blob", DataType::LargeBinary, false)
.with_metadata([(BLOB_META_KEY.to_string(), "true".to_string())].into()),
]));
let expected_payload: Vec<Vec<u8>> =
vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9, 10], vec![11]];
let id_column: ArrayRef = Arc::new(Int32Array::from_iter_values(
0..expected_payload.len() as i32,
));
let blob_array: ArrayRef = Arc::new(LargeBinaryArray::from_iter(
expected_payload.iter().map(|value| Some(value.as_slice())),
));
let batch = RecordBatch::try_new(schema.clone(), vec![id_column, blob_array]).unwrap();
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
let mut dataset = Dataset::write(
reader,
&test_dir,
Some(WriteParams {
max_rows_per_file: 1,
..Default::default()
}),
)
.await
.unwrap();
dataset.validate().await.unwrap();
assert!(dataset.get_fragments().len() > 1);
compact_files(&mut dataset, CompactionOptions::default(), None)
.await
.unwrap();
dataset.validate().await.unwrap();
assert_eq!(dataset.get_fragments().len(), 1);
let dataset = Arc::new(dataset);
let row_indices: Vec<u64> = (0..expected_payload.len() as u64).collect();
let blobs = dataset
.take_blobs_by_indices(&row_indices, "blob")
.await
.unwrap();
assert_eq!(blobs.len(), expected_payload.len());
for (blob, expected) in blobs.iter().zip(expected_payload.iter()) {
let bytes = blob.read().await.unwrap();
assert_eq!(bytes.as_ref(), expected.as_slice());
}
}
fn row_addrs(frag_idx: u32, offsets: Range<u32>) -> Range<u64> {
let start = RowAddress::new_from_parts(frag_idx, offsets.start);
let end = RowAddress::new_from_parts(frag_idx, offsets.end);
start.into()..end.into()
}
fn expect_remap(
ranges: &[Vec<(Range<u64>, bool)>],
starting_new_frag_idx: u32,
) -> MockIndexRemapper {
let mut expected_remap: HashMap<u64, Option<u64>> = HashMap::default();
expected_remap.reserve(ranges.iter().map(|r| r.len()).sum());
for (new_frag_offset, new_frag_ranges) in ranges.iter().enumerate() {
let new_frag_idx = starting_new_frag_idx + new_frag_offset as u32;
let mut row_offset = 0;
for (old_id_range, is_found) in new_frag_ranges.iter() {
for old_id in old_id_range.clone() {
if *is_found {
let new_id = RowAddress::new_from_parts(new_frag_idx, row_offset);
expected_remap.insert(old_id, Some(new_id.into()));
row_offset += 1;
} else {
expected_remap.insert(old_id, None);
}
}
}
}
MockIndexRemapper {
expectations: vec![MockIndexRemapperExpectation {
expected: expected_remap,
answer: vec![],
}],
}
}
#[rstest]
#[tokio::test]
async fn test_compact_many(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let data = sample_data();
let reader = RecordBatchIterator::new(vec![Ok(data.slice(0, 1200))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 400,
data_storage_version: Some(data_storage_version),
..Default::default()
};
Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(data.slice(1200, 2000))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 1000,
data_storage_version: Some(data_storage_version),
mode: WriteMode::Append,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
dataset.delete("a = 1300").await.unwrap();
dataset.delete("a >= 2400 AND a < 2600").await.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(data.slice(3200, 600))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 300,
data_storage_version: Some(data_storage_version),
mode: WriteMode::Append,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let first_new_frag_idx = 7;
let remap_a = expect_remap(
&[
vec![
(row_addrs(0, 0..400), true),
(row_addrs(1, 0..400), true),
(row_addrs(2, 0..200), true),
],
vec![(row_addrs(2, 200..400), true)],
vec![
(row_addrs(4, 0..200), true),
(row_addrs(4, 200..400), false),
(row_addrs(4, 400..1000), true),
(row_addrs(5, 0..200), true),
],
vec![(row_addrs(5, 200..300), true), (row_addrs(6, 0..300), true)],
],
first_new_frag_idx,
);
let remap_b = expect_remap(
&[
vec![
(row_addrs(4, 0..200), true),
(row_addrs(4, 200..400), false),
(row_addrs(4, 400..1000), true),
(row_addrs(5, 0..200), true),
],
vec![(row_addrs(5, 200..300), true), (row_addrs(6, 0..300), true)],
vec![
(row_addrs(0, 0..400), true),
(row_addrs(1, 0..400), true),
(row_addrs(2, 0..200), true),
],
vec![(row_addrs(2, 200..400), true)],
],
first_new_frag_idx,
);
let options = CompactionOptions {
target_rows_per_fragment: 1000,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 2);
assert_eq!(plan.tasks()[0].fragments.len(), 3);
assert_eq!(plan.tasks()[1].fragments.len(), 3);
assert_eq!(
plan.tasks()[0]
.fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
vec![0, 1, 2]
);
assert_eq!(
plan.tasks()[1]
.fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
vec![4, 5, 6]
);
let mock_remapper = MockIndexRemapper::in_any_order(&[remap_a, remap_b]);
let metrics = compact_files(&mut dataset, options, Some(Arc::new(mock_remapper)))
.await
.unwrap();
assert_eq!(metrics.fragments_removed, 6);
assert_eq!(metrics.fragments_added, 4);
// 6 data files plus one deletion file
assert_eq!(metrics.files_removed, 7);
assert_eq!(metrics.files_added, 4);
let fragment_ids = dataset
.get_fragments()
.iter()
.map(|f| f.id())
.collect::<Vec<_>>();
assert_eq!(fragment_ids, vec![3, 7, 8, 9, 10]);
}
#[rstest]
#[tokio::test]
async fn test_compact_data_files(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let data = sample_data();
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
let write_params = WriteParams {
max_rows_per_file: 5_000,
max_rows_per_group: 1_000,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("x", DataType::Float32, false),
]);
let data = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(Int64Array::from_iter_values(0..10_000)),
Arc::new(Float32Array::from_iter_values(
(0..10_000).map(|x| x as f32 * std::f32::consts::PI),
)),
],
)
.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
dataset.merge(reader, "a", "a").await.unwrap();
let expected_remap = expect_remap(
&[vec![
(row_addrs(0, 0..5000), true),
(row_addrs(1, 0..5000), true),
]],
2,
);
let plan = plan_compaction(
&dataset,
&CompactionOptions {
..Default::default()
},
)
.await
.unwrap();
assert_eq!(plan.tasks().len(), 1);
assert_eq!(plan.tasks()[0].fragments.len(), 2);
let metrics = compact_files(&mut dataset, plan.options, Some(Arc::new(expected_remap)))
.await
.unwrap();
// two fragments, each with two data files after the merge
assert_eq!(metrics.files_removed, 4);
assert_eq!(metrics.files_added, 1);
assert_eq!(metrics.fragments_removed, 2);
assert_eq!(metrics.fragments_added, 1);
let scanner = dataset.scan();
let batches = scanner
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let scanned_data = concat_batches(&batches[0].schema(), &batches).unwrap();
assert_eq!(scanned_data, data);
}
#[rstest]
#[tokio::test]
async fn test_compact_deletions(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let data = sample_data();
let reader = RecordBatchIterator::new(vec![Ok(data.slice(0, 1000))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 1000,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
dataset.delete("a <= 500").await.unwrap();
let mut options = CompactionOptions {
materialize_deletions_threshold: 0.8,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 0);
options.materialize_deletions_threshold = 0.1;
options.materialize_deletions = false;
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 0);
options.materialize_deletions = true;
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 1);
let metrics = compact_files(&mut dataset, options, None).await.unwrap();
assert_eq!(metrics.fragments_removed, 1);
assert_eq!(metrics.files_removed, 2);
assert_eq!(metrics.fragments_added, 1);
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert!(fragments[0].metadata.deletion_file.is_none());
}
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
struct IgnoreRemap {}
#[async_trait]
impl IndexRemapper for IgnoreRemap {
async fn remap_indices(
&self,
_: HashMap<u64, Option<u64>>,
_: &[u64],
) -> Result<Vec<RemappedIndex>> {
Ok(Vec::new())
}
}
impl IndexRemapperOptions for IgnoreRemap {
fn create_remapper(&self, _: &Dataset) -> Result<Box<dyn IndexRemapper>> {
Ok(Box::new(Self {}))
}
}
#[rstest::rstest]
#[tokio::test]
async fn test_compact_distributed(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
#[values(false, true)] use_stable_row_id: bool,
) {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let data = sample_data();
let reader = RecordBatchIterator::new(vec![Ok(data.slice(0, 9000))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 1000,
data_storage_version: Some(data_storage_version),
enable_stable_row_ids: use_stable_row_id,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 3_000,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 3);
let dataset_ref = &dataset;
let mut results = futures::stream::iter(plan.compaction_tasks())
.then(|task| async move { task.execute(dataset_ref).await.unwrap() })
.collect::<Vec<_>>()
.await;
assert_eq!(results.len(), 3);
assert_eq!(
results[0]
.original_fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
vec![0, 1, 2]
);
assert_eq!(results[0].metrics.files_removed, 3);
assert_eq!(results[0].metrics.files_added, 1);
commit_compaction(
&mut dataset,
vec![results.pop().unwrap()],
Arc::new(IgnoreRemap::default()),
&options,
)
.await
.unwrap();
assert_eq!(dataset.manifest.version, 3);
commit_compaction(
&mut dataset,
results,
Arc::new(IgnoreRemap::default()),
&options,
)
.await
.unwrap();
assert_eq!(dataset.manifest.version, 5);
assert_eq!(dataset.manifest.uses_stable_row_ids(), use_stable_row_id);
}
#[tokio::test]
async fn test_stable_row_indices() {
let mut data_gen = BatchGenerator::new()
.col(Box::new(
RandomVector::new().vec_width(16).named("vec".to_owned()),
))
.col(Box::new(IncrementingInt32::new().named("i".to_owned())));
let mut dataset = Dataset::write(
data_gen.batch(500),
"memory://test/table",
Some(WriteParams {
enable_stable_row_ids: true,
max_rows_per_file: 100,
..Default::default()
}),
)
.await
.unwrap();
dataset.delete("i < 110").await.unwrap();
dataset
.create_index(
&["i"],
IndexType::Scalar,
Some("scalar".into()),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
let params = VectorIndexParams::ivf_pq(1, 8, 1, MetricType::L2, 50);
dataset
.create_index(
&["vec"],
IndexType::Vector,
Some("vector".into()),
&params,
false,
)
.await
.unwrap();
async fn index_set(dataset: &Dataset) -> HashSet<Uuid> {
dataset
.load_indices()
.await
.unwrap()
.iter()
.map(|index| index.uuid)
.collect()
}
let indices = index_set(&dataset).await;
async fn vector_query(dataset: &Dataset) -> RecordBatch {
let mut scanner = dataset.scan();
let query = Float32Array::from(vec![0.0f32; 16]);
scanner
.nearest("vec", &query, 10)
.unwrap()
.project(&["i"])
.unwrap();
scanner.try_into_batch().await.unwrap()
}
async fn scalar_query(dataset: &Dataset) -> RecordBatch {
let mut scanner = dataset.scan();
scanner.filter("i = 100").unwrap().project(&["i"]).unwrap();
scanner.try_into_batch().await.unwrap()
}
let before_vec_result = vector_query(&dataset).await;
let before_scalar_result = scalar_query(&dataset).await;
let options = CompactionOptions {
target_rows_per_fragment: 180,
..Default::default()
};
let _metrics = compact_files(&mut dataset, options, None).await.unwrap();
let current_indices = index_set(&dataset).await;
assert_eq!(indices, current_indices);
let after_vec_result = vector_query(&dataset).await;
assert_eq!(before_vec_result, after_vec_result);
let after_scalar_result = scalar_query(&dataset).await;
assert_eq!(before_scalar_result, after_scalar_result);
}
#[tokio::test]
async fn test_defer_index_remap_large_external_file() {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let num_fragments = 150usize;
let rows_per_fragment = 1000usize;
let total_rows = num_fragments * rows_per_fragment;
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let mut dataset = Dataset::write(
RecordBatchIterator::new(
vec![Ok(RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..total_rows as i32)) as ArrayRef],
)
.unwrap())],
schema.clone(),
),
test_uri,
Some(WriteParams {
max_rows_per_file: rows_per_fragment,
..Default::default()
}),
)
.await
.unwrap();
assert_eq!(dataset.get_fragments().len(), num_fragments);
dataset.delete("i % 1000 = 0").await.unwrap();
compact_files(
&mut dataset,
CompactionOptions {
defer_index_remap: true,
..Default::default()
},
None,
)
.await
.unwrap();
let frag_reuse_meta = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
.expect("fragment reuse index must exist after compaction");
load_frag_reuse_index_details(&dataset, &frag_reuse_meta)
.await
.expect("loading large frag reuse index details must not fail");
}
#[tokio::test]
async fn test_defer_index_remap() {
let mut data_gen = BatchGenerator::new()
.col(Box::new(
RandomVector::new().vec_width(128).named("vec".to_owned()),
))
.col(Box::new(IncrementingInt32::new().named("i".to_owned())));
let mut dataset = Dataset::write(
data_gen.batch(6_000),
"memory://test/table",
Some(WriteParams {
max_rows_per_file: 1_000,
..Default::default()
}),
)
.await
.unwrap();
let mut data_gen2 = BatchGenerator::new()
.col(Box::new(
RandomVector::new().vec_width(128).named("vec".to_owned()),
))
.col(Box::new(IncrementingInt32::new().named("i".to_owned())));
let mut dataset2 = Dataset::write(
data_gen2.batch(6_000),
"memory://test/table",
Some(WriteParams {
max_rows_per_file: 1_000,
..Default::default()
}),
)
.await
.unwrap();
dataset.delete("i < 500").await.unwrap();
dataset2.delete("i < 500").await.unwrap();
dataset
.create_index(
&["i"],
IndexType::Scalar,
Some("scalar".into()),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
let initial_indices = dataset.load_indices().await.unwrap();
assert_eq!(initial_indices.len(), 1);
assert_eq!(initial_indices[0].name, "scalar");
let original_scalar_uuid = initial_indices[0].uuid;
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let options2 = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: false,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
let plan2 = plan_compaction(&dataset2, &options2).await.unwrap();
let mut expected_all_old_frag_ids = Vec::new();
let mut expected_all_new_frag_ids = Vec::new();
let mut expected_all_new_frag_bitmap = RoaringBitmap::new();
let mut expected_all_row_id_map = HashMap::new();
let mut deferred_results = Vec::new();
let mut immediate_results = Vec::new();
for (task, task2) in plan.tasks().iter().zip(plan2.tasks()) {
let deferred_result = rewrite_files(Cow::Borrowed(&dataset), task.clone(), &options)
.await
.unwrap();
let immediate_result =
rewrite_files(Cow::Borrowed(&dataset2), task2.clone(), &options2)
.await
.unwrap();
assert!(deferred_result.row_addrs.is_some());
assert!(!deferred_result.row_addrs.as_ref().unwrap().is_empty());
assert!(!deferred_result.original_fragments.is_empty());
assert!(!deferred_result.new_fragments.is_empty());
assert!(immediate_result.row_addrs.is_some());
assert!(!immediate_result.original_fragments.is_empty());
assert!(!immediate_result.new_fragments.is_empty());
assert_eq!(deferred_result.row_addrs, immediate_result.row_addrs);
deferred_results.push(deferred_result);
immediate_results.push(immediate_result);
}
{
let frags: Vec<&mut Fragment> = immediate_results
.iter_mut()
.flat_map(|r| r.new_fragments.iter_mut())
.collect();
reserve_fragment_ids(&dataset2, frags.into_iter())
.await
.unwrap();
}
for immediate_result in &immediate_results {
let row_addrs_bytes = immediate_result.row_addrs.as_ref().unwrap();
let row_addrs =
RoaringTreemap::deserialize_from(&mut Cursor::new(row_addrs_bytes)).unwrap();
let transposed = transpose_row_addrs(
row_addrs,
&immediate_result.original_fragments,
&immediate_result.new_fragments,
);
expected_all_row_id_map.extend(transposed);
immediate_result.new_fragments.iter().for_each(|frag| {
expected_all_new_frag_bitmap.insert(frag.id as u32);
});
expected_all_new_frag_ids.extend(
immediate_result
.new_fragments
.iter()
.map(|s| s.id)
.collect::<Vec<_>>(),
);
expected_all_old_frag_ids.extend(
immediate_result
.original_fragments
.iter()
.map(|s| s.id)
.collect::<Vec<_>>(),
);
}
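// Committing the deferred results must create the fragment reuse index
// covering exactly the new fragments.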
let first_metrics = commit_compaction(
&mut dataset,
deferred_results.clone(),
Arc::new(DatasetIndexRemapperOptions::default()),
&options,
)
.await
.unwrap();
assert!(first_metrics.fragments_removed > 0);
assert!(first_metrics.fragments_added > 0);
let Some(frag_reuse_index_meta) = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
else {
panic!("Fragment reuse index must be available");
};
assert_eq!(
frag_reuse_index_meta.fragment_bitmap.clone().unwrap(),
expected_all_new_frag_bitmap
);
let frag_reuse_details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
let frag_reuse_index =
open_frag_reuse_index(frag_reuse_index_meta.uuid, frag_reuse_details.as_ref())
.await
.unwrap();
let stats = frag_reuse_index.statistics().unwrap();
assert_eq!(
serde_json::to_string(&stats).unwrap(),
dataset
.index_statistics(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
);
assert_eq!(frag_reuse_index.details.versions.len(), 1);
let compaction_version = &frag_reuse_index.details.versions[0];
assert_eq!(
compaction_version.dataset_version,
frag_reuse_index_meta.dataset_version
);
let mut compacted_all_old_frag_digests = Vec::new();
let mut compacted_all_new_frag_digests = Vec::new();
let mut transposed_map = HashMap::new();
for group in compaction_version.groups.iter() {
let mut cursor = Cursor::new(&group.changed_row_addrs);
let changed_row_addrs = RoaringTreemap::deserialize_from(&mut cursor).unwrap();
compacted_all_old_frag_digests.extend(group.old_frags.clone());
compacted_all_new_frag_digests.extend(group.new_frags.clone());
let group_transposed_map = transpose_row_ids_from_digest(
changed_row_addrs,
&group.old_frags,
&group.new_frags,
);
transposed_map.extend(group_transposed_map);
}
assert_eq!(transposed_map, expected_all_row_id_map);
assert_eq!(
compacted_all_old_frag_digests
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
expected_all_old_frag_ids
);
assert_eq!(
compacted_all_new_frag_digests
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
expected_all_new_frag_ids
);
let Some(current_scalar_index) = dataset.load_index_by_name("scalar").await.unwrap() else {
panic!("scalar index must be available");
};
assert_eq!(current_scalar_index.uuid, original_scalar_uuid);
}
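// Each deferred compaction that commits should append one version to the
// fragment reuse index, keyed by the dataset version the compaction read.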
#[tokio::test]
async fn test_defer_index_remap_multiple_compactions() {
let mut data_gen = BatchGenerator::new()
.col(Box::new(
RandomVector::new().vec_width(128).named("vec".to_owned()),
))
.col(Box::new(IncrementingInt32::new().named("i".to_owned())));
let mut dataset = Dataset::write(
data_gen.batch(6_000),
"memory://test/table",
Some(WriteParams {
max_rows_per_file: 1_000,
..Default::default()
}),
)
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let mut compact_read_versions = Vec::new();
for i in 0..10 {
dataset
.delete(&format!("i < {}", 500 * (i + 1)))
.await
.unwrap();
let read_version = dataset.manifest.version;
compact_files(&mut dataset, options.clone(), None)
.await
.unwrap();
if dataset.manifest.version > read_version {
compact_read_versions.push(read_version);
}
let Some(frag_reuse_index_meta) = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
else {
panic!("Fragment reuse index must be available");
};
let frag_reuse_details =
load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
let frag_reuse_index =
open_frag_reuse_index(frag_reuse_index_meta.uuid, frag_reuse_details.as_ref())
.await
.unwrap();
assert_eq!(
frag_reuse_index
.details
.versions
.iter()
.map(|v| v.dataset_version)
.collect::<Vec<_>>(),
compact_read_versions
);
}
}
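// Remapping a column index is rejected while there is nothing to remap;
// after deferred compactions it must produce a new index UUID whose
// fragment bitmap covers all current fragments.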
#[tokio::test]
async fn test_remap_index_after_compaction() {
let mut data_gen = BatchGenerator::new()
.col(Box::new(
RandomVector::new().vec_width(128).named("vec".to_owned()),
))
.col(Box::new(IncrementingInt32::new().named("i".to_owned())));
let mut dataset = Dataset::write(
data_gen.batch(6_000),
"memory://test/table",
Some(WriteParams {
max_rows_per_file: 1_000,
..Default::default()
}),
)
.await
.unwrap();
let index_name = Some("scalar".into());
dataset
.create_index(
&["i"],
IndexType::Scalar,
index_name.clone(),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let Some(scalar_index) = dataset.load_index_by_name("scalar").await.unwrap() else {
panic!("scalar index must be available");
};
let result = remapping::remap_column_index(&mut dataset, &["i"], index_name.clone()).await;
assert!(matches!(result, Err(Error::NotSupported { .. })));
let plan = plan_compaction(&dataset, &options).await.unwrap();
for task in plan.tasks().iter() {
let rewrite_result = rewrite_files(Cow::Borrowed(&dataset), task.clone(), &options)
.await
.unwrap();
commit_compaction(
&mut dataset,
Vec::from([rewrite_result]),
Arc::new(DatasetIndexRemapperOptions::default()),
&options,
)
.await
.unwrap();
}
let Some(frag_reuse_index_meta) = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
else {
panic!("Fragment reuse index must be available");
};
let frag_reuse_details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
let frag_reuse_index =
open_frag_reuse_index(frag_reuse_index_meta.uuid, frag_reuse_details.as_ref())
.await
.unwrap();
assert_eq!(frag_reuse_index.details.versions.len(), plan.tasks().len());
let mut all_fragment_bitmap = RoaringBitmap::new();
dataset.fragments().iter().for_each(|f| {
all_fragment_bitmap.insert(f.id as u32);
});
let Some(scalar_index_before_remap) = dataset.load_index_by_name("scalar").await.unwrap()
else {
panic!("scalar index must be available");
};
assert_eq!(
scalar_index_before_remap.fragment_bitmap.unwrap(),
all_fragment_bitmap
);
remapping::remap_column_index(&mut dataset, &["i"], index_name.clone())
.await
.unwrap();
let indices = read_manifest_indexes(
&dataset.object_store,
&dataset.manifest_location,
&dataset.manifest,
)
.await
.unwrap();
let Some(remapped_scalar_index) = indices.into_iter().find(|idx| idx.name == "scalar")
else {
panic!("scalar index must be available");
};
assert_ne!(remapped_scalar_index.uuid, scalar_index.uuid);
assert_eq!(
remapped_scalar_index.fragment_bitmap.unwrap(),
all_fragment_bitmap
);
}
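// Compaction with deferred remap commits first, then a concurrent reindex
// from a stale checkout; the rebuilt index must still cover exactly the
// post-compaction fragments.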
#[tokio::test]
async fn test_concurrent_compaction_reindex_compaction_commit_first() {
let mut data_gen = BatchGenerator::new()
.col(Box::new(
RandomVector::new().vec_width(128).named("vec".to_owned()),
))
.col(Box::new(IncrementingInt32::new().named("i".to_owned())));
let mut dataset = Dataset::write(
data_gen.batch(6_000),
"memory://test/table",
Some(WriteParams {
max_rows_per_file: 1_000,
..Default::default()
}),
)
.await
.unwrap();
let index_name = Some("scalar".into());
dataset
.create_index(
&["i"],
IndexType::Scalar,
index_name.clone(),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
Dataset::write(
data_gen.batch(6_000),
WriteDestination::Dataset(Arc::new(dataset.clone())),
Some(WriteParams {
max_rows_per_file: 1_000,
mode: WriteMode::Append,
..Default::default()
}),
)
.await
.unwrap();
dataset.checkout_latest().await.unwrap();
let mut dataset_clone = dataset.clone();
compact_files(
&mut dataset,
CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
},
None,
)
.await
.unwrap();
dataset_clone
.create_index(
&["i"],
IndexType::Scalar,
index_name.clone(),
&ScalarIndexParams::default(),
true,
)
.await
.unwrap();
dataset.checkout_latest().await.unwrap();
let Some(scalar_index) = dataset.load_index_by_name("scalar").await.unwrap() else {
panic!("scalar index must be available");
};
let index_frags = scalar_index
.fragment_bitmap
.unwrap()
.iter()
.collect::<HashSet<_>>();
assert_eq!(
index_frags,
dataset
.fragments()
.iter()
.map(|f| f.id as u32)
.collect::<HashSet<_>>()
)
}
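// Same race as above, but the reindex commits first and the compaction
// (from a stale clone) commits second; index coverage must still be exact.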
#[tokio::test]
async fn test_concurrent_compaction_reindex_reindex_commit_first() {
let mut data_gen = BatchGenerator::new()
.col(Box::new(
RandomVector::new().vec_width(128).named("vec".to_owned()),
))
.col(Box::new(IncrementingInt32::new().named("i".to_owned())));
let mut dataset = Dataset::write(
data_gen.batch(6_000),
"memory://test/table",
Some(WriteParams {
max_rows_per_file: 1_000,
..Default::default()
}),
)
.await
.unwrap();
let index_name = Some("scalar".into());
dataset
.create_index(
&["i"],
IndexType::Scalar,
index_name.clone(),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
Dataset::write(
data_gen.batch(6_000),
WriteDestination::Dataset(Arc::new(dataset.clone())),
Some(WriteParams {
max_rows_per_file: 1_000,
mode: WriteMode::Append,
..Default::default()
}),
)
.await
.unwrap();
dataset.checkout_latest().await.unwrap();
let mut dataset_clone = dataset.clone();
dataset
.create_index(
&["i"],
IndexType::Scalar,
index_name.clone(),
&ScalarIndexParams::default(),
true,
)
.await
.unwrap();
compact_files(
&mut dataset_clone,
CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
},
None,
)
.await
.unwrap();
dataset.checkout_latest().await.unwrap();
let Some(scalar_index) = dataset.load_index_by_name("scalar").await.unwrap() else {
panic!("scalar index must be available");
};
let index_frags = scalar_index
.fragment_bitmap
.unwrap()
.iter()
.collect::<HashSet<_>>();
assert_eq!(
index_frags,
dataset
.fragments()
.iter()
.map(|f| f.id as u32)
.collect::<HashSet<_>>()
)
}
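// Cleanup runs from a checkout taken after the first compaction while two
// more compactions commit in between; the rebased cleanup must drop only
// the first reuse version and keep the two newer ones intact.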
#[tokio::test]
async fn test_concurrent_cleanup_and_compaction_rebase_cleanup() {
let mut dataset = lance_datagen::gen_batch()
.col(
"vec",
lance_datagen::array::rand_vec::<Float32Type>(Dimension::from(128)),
)
.col("i", lance_datagen::array::step::<Int32Type>())
.into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000))
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
let tasks = plan.tasks();
let rewrite_result = rewrite_files(Cow::Borrowed(&dataset), tasks[0].clone(), &options)
.await
.unwrap();
commit_compaction(
&mut dataset,
Vec::from([rewrite_result]),
Arc::new(DatasetIndexRemapperOptions::default()),
&options,
)
.await
.unwrap();
let mut dataset_clone = dataset.clone();
let Some(frag_reuse_index_meta) = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
else {
panic!("Fragment reuse index must be available");
};
let frag_reuse_details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
assert_eq!(frag_reuse_details.versions.len(), 1);
let rewrite_result2 = rewrite_files(Cow::Borrowed(&dataset), tasks[1].clone(), &options)
.await
.unwrap();
let rewritten_frags2 = rewrite_result2
.original_fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>();
commit_compaction(
&mut dataset,
Vec::from([rewrite_result2]),
Arc::new(DatasetIndexRemapperOptions::default()),
&options,
)
.await
.unwrap();
let frag_reuse_index_meta2 = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
.unwrap();
let frag_reuse_details2 = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta2)
.await
.unwrap();
let new_frags2 = frag_reuse_details2.versions.last().unwrap().new_frag_ids();
let rewrite_result3 = rewrite_files(Cow::Borrowed(&dataset), tasks[2].clone(), &options)
.await
.unwrap();
let rewritten_frags3 = rewrite_result3
.original_fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>();
commit_compaction(
&mut dataset,
Vec::from([rewrite_result3]),
Arc::new(DatasetIndexRemapperOptions::default()),
&options,
)
.await
.unwrap();
let frag_reuse_index_meta3 = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
.unwrap();
let frag_reuse_details3 = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta3)
.await
.unwrap();
let new_frags3 = frag_reuse_details3.versions.last().unwrap().new_frag_ids();
cleanup_frag_reuse_index(&mut dataset_clone).await.unwrap();
dataset.checkout_latest().await.unwrap();
let Some(frag_reuse_index_meta) = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
else {
panic!("Fragment reuse index must be available");
};
let frag_reuse_details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
assert_eq!(frag_reuse_details.versions.len(), 2);
assert_eq!(
frag_reuse_details.versions[0].old_frag_ids(),
rewritten_frags2
);
assert_eq!(frag_reuse_details.versions[0].new_frag_ids(), new_frags2);
assert_eq!(
frag_reuse_details.versions[1].old_frag_ids(),
rewritten_frags3
);
assert_eq!(frag_reuse_details.versions[1].new_frag_ids(), new_frags3);
}
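// Cleanup commits first here; a compaction committed from a stale clone
// must be rebased onto the cleaned reuse index, leaving only its own version.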
#[tokio::test]
async fn test_concurrent_cleanup_and_compaction_rebase_compaction() {
let mut dataset = lance_datagen::gen_batch()
.col(
"vec",
lance_datagen::array::rand_vec::<Float32Type>(Dimension::from(128)),
)
.col("i", lance_datagen::array::step::<Int32Type>())
.into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000))
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
let tasks = plan.tasks();
let rewrite_result = rewrite_files(Cow::Borrowed(&dataset), tasks[0].clone(), &options)
.await
.unwrap();
commit_compaction(
&mut dataset,
Vec::from([rewrite_result]),
Arc::new(DatasetIndexRemapperOptions::default()),
&options,
)
.await
.unwrap();
let mut dataset_clone = dataset.clone();
let Some(frag_reuse_index_meta) = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
else {
panic!("Fragment reuse index must be available");
};
let frag_reuse_details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
assert_eq!(frag_reuse_details.versions.len(), 1);
cleanup_frag_reuse_index(&mut dataset).await.unwrap();
dataset.checkout_latest().await.unwrap();
let Some(frag_reuse_index_meta) = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
else {
panic!("Fragment reuse index must be available");
};
let frag_reuse_details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
assert_eq!(frag_reuse_details.versions.len(), 0);
let rewrite_result2 =
rewrite_files(Cow::Borrowed(&dataset_clone), tasks[1].clone(), &options)
.await
.unwrap();
let rewritten_frags2 = rewrite_result2
.original_fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>();
commit_compaction(
&mut dataset_clone,
Vec::from([rewrite_result2]),
Arc::new(DatasetIndexRemapperOptions::default()),
&options,
)
.await
.unwrap();
dataset.checkout_latest().await.unwrap();
let Some(frag_reuse_index_meta) = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
else {
panic!("Fragment reuse index must be available");
};
let frag_reuse_details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
assert_eq!(frag_reuse_details.versions.len(), 1);
assert_eq!(
frag_reuse_details.versions[0].old_frag_ids(),
rewritten_frags2
);
let new_frags2 = frag_reuse_details.versions[0].new_frag_ids();
assert!(new_frags2.iter().all(|id| *id != 0));
}
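// Two compactions with deferred remap race on the same plan; the second
// commit must fail with a retryable conflict rather than corrupt the index.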
#[tokio::test]
async fn test_concurrent_compactions_with_defer_index_remap() {
let mut dataset = lance_datagen::gen_batch()
.col(
"vec",
lance_datagen::array::rand_vec::<Float32Type>(Dimension::from(128)),
)
.col("i", lance_datagen::array::step::<Int32Type>())
.into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000))
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
let tasks = plan.tasks();
let mut dataset_clone = dataset.clone();
let rewrite_result = rewrite_files(Cow::Borrowed(&dataset), tasks[0].clone(), &options)
.await
.unwrap();
commit_compaction(
&mut dataset,
Vec::from([rewrite_result]),
Arc::new(DatasetIndexRemapperOptions::default()),
&options,
)
.await
.unwrap();
let Some(frag_reuse_index_meta) = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
else {
panic!("Fragment reuse index must be available");
};
let frag_reuse_details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
assert_eq!(frag_reuse_details.versions.len(), 1);
let rewrite_result2 =
rewrite_files(Cow::Borrowed(&dataset_clone), tasks[1].clone(), &options)
.await
.unwrap();
let result = commit_compaction(
&mut dataset_clone,
Vec::from([rewrite_result2]),
Arc::new(DatasetIndexRemapperOptions::default()),
&options,
)
.await;
assert!(matches!(result, Err(Error::RetryableCommitConflict { .. })));
}
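// With deferred remap, a bitmap index keeps its UUID across compaction,
// still returns correct filter counts, and is still used by the planner.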
#[tokio::test]
async fn test_read_bitmap_index_with_defer_index_remap() {
let mut dataset = lance_datagen::gen_batch()
.col(
"vec",
lance_datagen::array::rand_vec::<Float32Type>(Dimension::from(128)),
)
.col(
"category",
lance_datagen::array::cycle::<Int32Type>(vec![1, 2, 3]),
)
.into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000))
.await
.unwrap();
let count1 = dataset
.count_rows(Some("category = 1".to_owned()))
.await
.unwrap();
let count2 = dataset
.count_rows(Some("category = 2".to_owned()))
.await
.unwrap();
let count3 = dataset
.count_rows(Some("category = 3".to_owned()))
.await
.unwrap();
let index_name = Some("category_idx".into());
dataset
.create_index(
&["category"],
IndexType::Bitmap,
index_name.clone(),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
let indices = dataset.load_indices().await.unwrap();
let original_index = indices
.iter()
.find(|idx| idx.name == "category_idx")
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let metrics = compact_files(&mut dataset, options, None).await.unwrap();
assert!(metrics.fragments_removed > 0);
assert!(metrics.fragments_added > 0);
let Some(current_index) = dataset.load_index_by_name("category_idx").await.unwrap() else {
panic!("category index must be available");
};
assert_eq!(current_index.uuid, original_index.uuid);
assert_eq!(
dataset
.count_rows(Some("category = 1".to_owned()))
.await
.unwrap(),
count1
);
assert_eq!(
dataset
.count_rows(Some("category = 2".to_owned()))
.await
.unwrap(),
count2
);
assert_eq!(
dataset
.count_rows(Some("category = 3".to_owned()))
.await
.unwrap(),
count3
);
let mut scanner = dataset.scan();
scanner.filter("category = 1").unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let plan = scanner.explain_plan(false).await.unwrap();
assert!(
plan.contains("ScalarIndexQuery: query=[category = 1]@category_idx"),
"Expected index query in plan: {}",
plan
);
}
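// Same check for a BTree index over many fragments: UUID unchanged, range
// filters correct, and the scalar index query still appears in the plan.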
#[tokio::test]
async fn test_read_btree_index_with_defer_index_remap() {
let mut dataset = lance_datagen::gen_batch()
.col(
"vec",
lance_datagen::array::rand_vec::<Float32Type>(Dimension::from(128)),
)
.col("id", lance_datagen::array::step::<Int32Type>())
.into_ram_dataset(FragmentCount::from(110), FragmentRowCount::from(1000))
.await
.unwrap();
let count_low = dataset
.count_rows(Some("id < 1000".to_owned()))
.await
.unwrap();
let count_mid = dataset
.count_rows(Some("id >= 2000 and id < 3000".to_owned()))
.await
.unwrap();
let count_high = dataset
.count_rows(Some("id >= 5000".to_owned()))
.await
.unwrap();
let index_name = Some("id_idx".into());
dataset
.create_index(
&["id"],
IndexType::BTree,
index_name.clone(),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
let indices = dataset.load_indices().await.unwrap();
let original_index = indices.iter().find(|idx| idx.name == "id_idx").unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 50_000,
defer_index_remap: true,
..Default::default()
};
let metrics = compact_files(&mut dataset, options, None).await.unwrap();
assert!(metrics.fragments_removed > 0);
assert!(metrics.fragments_added > 0);
let Some(current_index) = dataset.load_index_by_name("id_idx").await.unwrap() else {
panic!("id index must be available");
};
assert_eq!(current_index.uuid, original_index.uuid);
assert_eq!(
dataset
.count_rows(Some("id < 1000".to_owned()))
.await
.unwrap(),
count_low
);
assert_eq!(
dataset
.count_rows(Some("id >= 2000 and id < 3000".to_owned()))
.await
.unwrap(),
count_mid
);
assert_eq!(
dataset
.count_rows(Some("id >= 5000".to_owned()))
.await
.unwrap(),
count_high
);
let mut scanner = dataset.scan();
scanner.filter("id >= 2000 and id < 3000").unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let plan = scanner.explain_plan(false).await.unwrap();
assert!(
plan.contains("ScalarIndexQuery: query=[id >= 2000 && id < 3000]@id_idx"),
"Expected scalar index query in plan: {}",
plan
);
}
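// Inverted (full-text) index: searches keep working after a deferred-remap
// compaction, and retraining the index reproduces the same match counts.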
#[tokio::test]
async fn test_read_inverted_index_with_defer_index_remap() {
let mut words_gen = lance_datagen::array::random_sentence(1, 100, true);
let doc_col = words_gen
.generate_default(lance_datagen::RowCount::from(6000))
.unwrap();
let batch = RecordBatch::try_new(
Schema::new(vec![Field::new("doc", DataType::LargeUtf8, false)]).into(),
vec![doc_col.clone()],
)
.unwrap();
let schema_ref = batch.schema();
let stream = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema_ref);
let mut dataset = Dataset::write(
stream,
"memory://test/table",
Some(WriteParams {
max_rows_per_file: 1_000,
..Default::default()
}),
)
.await
.unwrap();
let large_string_array = doc_col.as_any().downcast_ref::<LargeStringArray>().unwrap();
let sample_words: Vec<String> = large_string_array
.value(0)
.split_whitespace()
.take(10)
.map(|s| s.to_string())
.collect();
let test_word1 = &sample_words[0];
let test_word2 = &sample_words[1];
let test_word3 = &sample_words[2];
let index_name = Some("doc_idx".into());
dataset
.create_index(
&["doc"],
IndexType::Inverted,
index_name.clone(),
&InvertedIndexParams::default(),
false,
)
.await
.unwrap();
let indices = dataset.load_indices().await.unwrap();
let original_index = indices.iter().find(|idx| idx.name == "doc_idx").unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let metrics = compact_files(&mut dataset, options, None).await.unwrap();
assert!(metrics.fragments_removed > 0);
assert!(metrics.fragments_added > 0);
let Some(current_index) = dataset.load_index_by_name("doc_idx").await.unwrap() else {
panic!("doc index must be available");
};
assert_eq!(current_index.uuid, original_index.uuid);
let mut scanner = dataset.scan();
scanner
.full_text_search(FullTextSearchQuery::new(test_word1.clone()))
.unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let count1 = scanner.count_rows().await.unwrap();
scanner = dataset.scan();
scanner
.full_text_search(FullTextSearchQuery::new(test_word2.clone()))
.unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let count2 = scanner.count_rows().await.unwrap();
scanner = dataset.scan();
scanner
.full_text_search(FullTextSearchQuery::new(test_word3.clone()))
.unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let count3 = scanner.count_rows().await.unwrap();
let mut scanner = dataset.scan();
scanner
.full_text_search(FullTextSearchQuery::new(test_word1.clone()))
.unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let plan = scanner.explain_plan(true).await.unwrap();
assert!(
plan.contains("MatchQuery"),
"Expected inverted index scan in plan: {}",
plan
);
assert!(
!plan.contains("LanceScan"),
"Expected no fragment scan in plan: {}",
plan
);
dataset
.create_index(
&["doc"],
IndexType::Inverted,
index_name.clone(),
&InvertedIndexParams::default(),
true,
)
.await
.unwrap();
let mut scanner = dataset.scan();
scanner
.full_text_search(FullTextSearchQuery::new(test_word1.clone()))
.unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
assert_eq!(scanner.count_rows().await.unwrap(), count1);
scanner = dataset.scan();
scanner
.full_text_search(FullTextSearchQuery::new(test_word2.clone()))
.unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
assert_eq!(scanner.count_rows().await.unwrap(), count2);
scanner = dataset.scan();
scanner
.full_text_search(FullTextSearchQuery::new(test_word3.clone()))
.unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
assert_eq!(scanner.count_rows().await.unwrap(), count3);
}
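// NGram index: contains() filters return the same counts after compaction
// and the query still routes through the scalar index.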
#[tokio::test]
async fn test_read_ngram_index_with_defer_index_remap() {
let mut words_gen = lance_datagen::array::random_sentence(1, 100, true);
let doc_col = words_gen
.generate_default(lance_datagen::RowCount::from(6000))
.unwrap();
let batch = RecordBatch::try_new(
Schema::new(vec![Field::new("doc", DataType::LargeUtf8, false)]).into(),
vec![doc_col.clone()],
)
.unwrap();
let schema_ref = batch.schema();
let stream = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema_ref);
let mut dataset = Dataset::write(
stream,
"memory://test/table",
Some(WriteParams {
max_rows_per_file: 1_000,
..Default::default()
}),
)
.await
.unwrap();
let large_string_array = doc_col.as_any().downcast_ref::<LargeStringArray>().unwrap();
let sample_words: Vec<String> = large_string_array
.value(0)
.split_whitespace()
.take(10)
.map(|s| s.to_string())
.collect();
let test_word1 = &sample_words[0];
let test_word2 = &sample_words[1];
let test_word3 = &sample_words[2];
let index_name = Some("doc_idx".into());
dataset
.create_index(
&["doc"],
IndexType::NGram,
index_name.clone(),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
let indices = dataset.load_indices().await.unwrap();
let original_index = indices.iter().find(|idx| idx.name == "doc_idx").unwrap();
let count1 = dataset
.count_rows(Some(format!("contains(doc, '{}')", test_word1)))
.await
.unwrap();
let count2 = dataset
.count_rows(Some(format!("contains(doc, '{}')", test_word2)))
.await
.unwrap();
let count3 = dataset
.count_rows(Some(format!("contains(doc, '{}')", test_word3)))
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let metrics = compact_files(&mut dataset, options, None).await.unwrap();
assert!(metrics.fragments_removed > 0);
assert!(metrics.fragments_added > 0);
let Some(current_index) = dataset.load_index_by_name("doc_idx").await.unwrap() else {
panic!("doc index must be available");
};
assert_eq!(current_index.uuid, original_index.uuid);
assert_eq!(
dataset
.count_rows(Some(format!("contains(doc, '{}')", test_word1)))
.await
.unwrap(),
count1
);
assert_eq!(
dataset
.count_rows(Some(format!("contains(doc, '{}')", test_word2)))
.await
.unwrap(),
count2
);
assert_eq!(
dataset
.count_rows(Some(format!("contains(doc, '{}')", test_word3)))
.await
.unwrap(),
count3
);
let mut scanner = dataset.scan();
scanner
.filter(&format!("contains(doc, '{}')", test_word1))
.unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let plan = scanner.explain_plan(false).await.unwrap();
assert!(
plan.contains("ScalarIndexQuery: query=[contains(doc, Utf8"),
"Expected scalar index query in plan: {}",
plan
);
}
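// LabelList index: array_has_any() filters stay correct after compaction.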
#[tokio::test]
async fn test_read_label_list_index_with_defer_index_remap() {
let mut dataset = lance_datagen::gen_batch()
.col(
"vec",
lance_datagen::array::rand_vec::<Float32Type>(Dimension::from(128)),
)
.col(
"labels",
lance_datagen::array::rand_list_any(
lance_datagen::array::cycle::<Int64Type>(vec![1, 2, 3, 4, 5]),
false,
),
)
.into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000))
.await
.unwrap();
let count1 = dataset
.count_rows(Some("array_has_any(labels, [1])".to_owned()))
.await
.unwrap();
let count2 = dataset
.count_rows(Some("array_has_any(labels, [5])".to_owned()))
.await
.unwrap();
let count3 = dataset
.count_rows(Some("array_has_any(labels, [10])".to_owned()))
.await
.unwrap();
let index_name = Some("labels_idx".into());
dataset
.create_index(
&["labels"],
IndexType::LabelList,
index_name.clone(),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
let indices = dataset.load_indices().await.unwrap();
let original_index = indices.iter().find(|idx| idx.name == "labels_idx").unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let metrics = compact_files(&mut dataset, options, None).await.unwrap();
assert!(metrics.fragments_removed > 0);
assert!(metrics.fragments_added > 0);
let indices = dataset.load_indices().await.unwrap();
let current_index = indices.iter().find(|idx| idx.name == "labels_idx").unwrap();
assert_eq!(current_index.uuid, original_index.uuid);
assert_eq!(
dataset
.count_rows(Some("array_has_any(labels, [1])".to_owned()))
.await
.unwrap(),
count1
);
assert_eq!(
dataset
.count_rows(Some("array_has_any(labels, [5])".to_owned()))
.await
.unwrap(),
count2
);
assert_eq!(
dataset
.count_rows(Some("array_has_any(labels, [10])".to_owned()))
.await
.unwrap(),
count3
);
let mut scanner = dataset.scan();
scanner.filter("array_has_any(labels, [1])").unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let plan = scanner.explain_plan(false).await.unwrap();
assert!(
plan.contains("ScalarIndexQuery: query=[array_has_any(labels, List([1]))]@labels_idx"),
"Expected scalar index query in plan: {}",
plan
);
}
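// IVF_PQ (v3) vector index: nearest-neighbor queries return the same number
// of results after a deferred-remap compaction and still use ANN search.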
#[tokio::test]
async fn test_read_ivf_pq_index_v3_with_defer_index_remap() {
let mut dataset = lance_datagen::gen_batch()
.col(
"vec",
lance_datagen::array::rand_vec::<Float32Type>(Dimension::from(128)),
)
.into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000))
.await
.unwrap();
let query_vec1: PrimitiveArray<Float32Type> =
PrimitiveArray::from_iter_values(std::iter::repeat_n(0.0, 128));
let query_vec2: PrimitiveArray<Float32Type> =
PrimitiveArray::from_iter_values(std::iter::repeat_n(1.1, 128));
let query_vec3: PrimitiveArray<Float32Type> =
PrimitiveArray::from_iter_values(std::iter::repeat_n(2.2, 128));
let mut scanner = dataset.scan();
scanner.nearest("vec", &query_vec1, 10).unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let results1 = scanner
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let count1 = results1.len();
scanner = dataset.scan();
scanner.nearest("vec", &query_vec2, 10).unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let results2 = scanner
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let count2 = results2.len();
scanner = dataset.scan();
scanner.nearest("vec", &query_vec3, 10).unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let results3 = scanner
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let count3 = results3.len();
let index_name = Some("vec_idx".into());
dataset
.create_index(
&["vec"],
IndexType::Vector,
index_name.clone(),
&VectorIndexParams {
metric_type: DistanceType::L2,
stages: vec![
StageParams::Ivf(IvfBuildParams {
max_iters: 2,
num_partitions: Some(2),
sample_rate: 2,
..Default::default()
}),
StageParams::PQ(PQBuildParams {
max_iters: 2,
num_sub_vectors: 2,
..Default::default()
}),
],
version: crate::index::vector::IndexFileVersion::V3,
skip_transpose: false,
},
false,
)
.await
.unwrap();
let indices = dataset.load_indices().await.unwrap();
let original_index = indices.iter().find(|idx| idx.name == "vec_idx").unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
};
let metrics = compact_files(&mut dataset, options, None).await.unwrap();
assert!(metrics.fragments_removed > 0);
assert!(metrics.fragments_added > 0);
let Some(current_index) = dataset.load_index_by_name("vec_idx").await.unwrap() else {
panic!("vec index must be available");
};
assert_eq!(current_index.uuid, original_index.uuid);
let mut scanner = dataset.scan();
scanner.nearest("vec", &query_vec1, 10).unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let new_results1 = scanner
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(new_results1.len(), count1);
scanner = dataset.scan();
scanner.nearest("vec", &query_vec2, 10).unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let new_results2 = scanner
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(new_results2.len(), count2);
scanner = dataset.scan();
scanner.nearest("vec", &query_vec3, 10).unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let new_results3 = scanner
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(new_results3.len(), count3);
let mut scanner = dataset.scan();
scanner.nearest("vec", &query_vec1, 10).unwrap();
scanner.project::<String>(&[]).unwrap().with_row_id();
let plan = scanner.explain_plan(false).await.unwrap();
assert!(
plan.contains("ANNSubIndex"),
"Expected vector index scan in plan: {}",
plan
);
assert!(
!plan.contains("LanceScan"),
"Expected no fragment scan in plan: {}",
plan
);
}
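// A materialize_deletions_threshold above 1.0 is expected to disable
// deletion materialization in the resulting plan.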
#[tokio::test]
async fn test_default_compaction_planner() {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let data = sample_data();
let schema = data.schema();
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], schema.clone());
let write_params = WriteParams {
max_rows_per_file: 2000,
..Default::default()
};
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
assert_eq!(dataset.get_fragments().len(), 5);
let options = CompactionOptions {
target_rows_per_fragment: 5000,
materialize_deletions_threshold: 2.0,
..Default::default()
};
let planner = DefaultCompactionPlanner::new(options);
let plan = planner.plan(&dataset).await.unwrap();
assert!(!plan.tasks.is_empty());
assert_eq!(plan.read_version, dataset.manifest.version);
assert!(!plan.options.materialize_deletions);
}
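// All supported `lance.compaction.*` keys should parse into the matching
// CompactionOptions fields.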
#[test]
fn test_from_dataset_config() {
let config = HashMap::from([
(
"lance.compaction.target_rows_per_fragment".to_string(),
"500000".to_string(),
),
(
"lance.compaction.max_rows_per_group".to_string(),
"2048".to_string(),
),
(
"lance.compaction.max_bytes_per_file".to_string(),
"1000000".to_string(),
),
(
"lance.compaction.materialize_deletions".to_string(),
"false".to_string(),
),
(
"lance.compaction.materialize_deletions_threshold".to_string(),
"0.25".to_string(),
),
(
"lance.compaction.defer_index_remap".to_string(),
"true".to_string(),
),
(
"lance.compaction.batch_size".to_string(),
"4096".to_string(),
),
(
"lance.compaction.compaction_mode".to_string(),
"try_binary_copy".to_string(),
),
(
"lance.compaction.binary_copy_read_batch_bytes".to_string(),
"8388608".to_string(),
),
]);
let opts = CompactionOptions::from_dataset_config(&config).unwrap();
assert_eq!(opts.target_rows_per_fragment, 500_000);
assert_eq!(opts.max_rows_per_group, 2048);
assert_eq!(opts.max_bytes_per_file, Some(1_000_000));
assert!(!opts.materialize_deletions);
assert!((opts.materialize_deletions_threshold - 0.25).abs() < f32::EPSILON);
assert!(opts.defer_index_remap);
assert_eq!(opts.batch_size, Some(4096));
assert_eq!(opts.compaction_mode, Some(CompactionMode::TryBinaryCopy));
assert_eq!(opts.binary_copy_read_batch_bytes, Some(8_388_608));
}
#[test]
fn test_from_dataset_config_empty() {
let config = HashMap::new();
let opts = CompactionOptions::from_dataset_config(&config).unwrap();
let defaults = CompactionOptions::default();
assert_eq!(
opts.target_rows_per_fragment,
defaults.target_rows_per_fragment
);
assert_eq!(opts.max_rows_per_group, defaults.max_rows_per_group);
assert_eq!(opts.max_bytes_per_file, defaults.max_bytes_per_file);
assert_eq!(opts.materialize_deletions, defaults.materialize_deletions);
assert_eq!(
opts.materialize_deletions_threshold,
defaults.materialize_deletions_threshold
);
assert_eq!(opts.defer_index_remap, defaults.defer_index_remap);
assert_eq!(opts.batch_size, defaults.batch_size);
assert_eq!(opts.compaction_mode, defaults.compaction_mode);
assert_eq!(
opts.binary_copy_read_batch_bytes,
defaults.binary_copy_read_batch_bytes
);
}
#[test]
fn test_from_dataset_config_partial() {
let config = HashMap::from([(
"lance.compaction.target_rows_per_fragment".to_string(),
"500000".to_string(),
)]);
let opts = CompactionOptions::from_dataset_config(&config).unwrap();
assert_eq!(opts.target_rows_per_fragment, 500_000);
let defaults = CompactionOptions::default();
assert_eq!(opts.max_rows_per_group, defaults.max_rows_per_group);
assert_eq!(opts.max_bytes_per_file, defaults.max_bytes_per_file);
assert_eq!(opts.materialize_deletions, defaults.materialize_deletions);
assert_eq!(opts.defer_index_remap, defaults.defer_index_remap);
assert_eq!(opts.batch_size, defaults.batch_size);
assert_eq!(opts.compaction_mode, defaults.compaction_mode);
assert_eq!(
opts.binary_copy_read_batch_bytes,
defaults.binary_copy_read_batch_bytes
);
}
#[test]
fn test_from_dataset_config_ignores_other_keys() {
let config = HashMap::from([
(
"lance.compaction.target_rows_per_fragment".to_string(),
"500000".to_string(),
),
(
"lance.auto_cleanup.interval".to_string(),
"3600".to_string(),
),
("some.other.key".to_string(), "value".to_string()),
]);
let opts = CompactionOptions::from_dataset_config(&config).unwrap();
assert_eq!(opts.target_rows_per_fragment, 500_000);
}
#[test]
fn test_from_dataset_config_invalid_value() {
let config = HashMap::from([(
"lance.compaction.target_rows_per_fragment".to_string(),
"not_a_number".to_string(),
)]);
let result = CompactionOptions::from_dataset_config(&config);
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("target_rows_per_fragment"));
assert!(err_msg.contains("not_a_number"));
}
#[test]
fn test_from_dataset_config_invalid_bool() {
let config = HashMap::from([(
"lance.compaction.materialize_deletions".to_string(),
"yes".to_string(),
)]);
let result = CompactionOptions::from_dataset_config(&config);
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("materialize_deletions"));
assert!(err_msg.contains("yes"));
}
#[test]
fn test_from_dataset_config_unknown_compaction_key() {
let config = HashMap::from([(
"lance.compaction.unknown_key".to_string(),
"value".to_string(),
)]);
let opts = CompactionOptions::from_dataset_config(&config).unwrap();
let defaults = CompactionOptions::default();
assert_eq!(
opts.target_rows_per_fragment,
defaults.target_rows_per_fragment
);
}
#[test]
fn test_from_dataset_config_invalid_compaction_mode() {
let config = HashMap::from([(
"lance.compaction.compaction_mode".to_string(),
"invalid_mode".to_string(),
)]);
let result = CompactionOptions::from_dataset_config(&config);
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("invalid_mode"));
}
#[test]
fn test_apply_dataset_config_overrides() {
let config = HashMap::from([(
"lance.compaction.target_rows_per_fragment".to_string(),
"500000".to_string(),
)]);
let mut opts = CompactionOptions {
max_rows_per_group: 4096,
..Default::default()
};
opts.apply_dataset_config(&config).unwrap();
assert_eq!(opts.target_rows_per_fragment, 500_000);
assert_eq!(opts.max_rows_per_group, 4096);
}
#[test]
fn test_apply_dataset_config_overwrites_matching_field() {
let config = HashMap::from([(
"lance.compaction.max_rows_per_group".to_string(),
"2048".to_string(),
)]);
let mut opts = CompactionOptions {
max_rows_per_group: 4096,
..Default::default()
};
opts.apply_dataset_config(&config).unwrap();
assert_eq!(opts.max_rows_per_group, 2048);
}
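// max_source_fragments bounds how many fragments one compaction pass may
// rewrite, so a large backlog is worked off over repeated passes.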
#[tokio::test]
async fn test_max_source_fragments() {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let data = sample_data();
let schema = data.schema();
let write_params = WriteParams {
max_rows_per_file: 100,
..Default::default()
};
Dataset::write(
RecordBatchIterator::new(vec![Ok(data.slice(0, 100))], schema.clone()),
test_uri,
Some(write_params.clone()),
)
.await
.unwrap();
for i in 1..10 {
let mut append_params = write_params.clone();
append_params.mode = WriteMode::Append;
Dataset::write(
RecordBatchIterator::new(vec![Ok(data.slice(i * 100, 100))], schema.clone()),
test_uri,
Some(append_params),
)
.await
.unwrap();
}
let dataset = Dataset::open(test_uri).await.unwrap();
assert_eq!(dataset.get_fragments().len(), 10);
let opts_no_limit = CompactionOptions {
target_rows_per_fragment: 250,
..Default::default()
};
let plan_all = plan_compaction(&dataset, &opts_no_limit).await.unwrap();
let total_source_frags: usize = plan_all.tasks().iter().map(|t| t.fragments.len()).sum();
assert_eq!(total_source_frags, 10);
assert!(
plan_all.num_tasks() > 2,
"need multiple tasks to test bounding, got {}",
plan_all.num_tasks()
);
let opts_bounded = CompactionOptions {
target_rows_per_fragment: 250,
max_source_fragments: Some(4),
..Default::default()
};
let plan_bounded = plan_compaction(&dataset, &opts_bounded).await.unwrap();
let bounded_source_frags: usize =
plan_bounded.tasks().iter().map(|t| t.fragments.len()).sum();
assert!(
bounded_source_frags <= 4,
"expected at most 4 source fragments, got {bounded_source_frags}"
);
assert!(
bounded_source_frags > 0,
"expected at least 1 source fragment in bounded plan"
);
assert!(
plan_bounded.num_tasks() < plan_all.num_tasks(),
"bounded plan ({}) should have fewer tasks than unbounded ({})",
plan_bounded.num_tasks(),
plan_all.num_tasks()
);
let mut dataset = dataset;
compact_files(&mut dataset, opts_bounded, None)
.await
.unwrap();
let after_first = dataset.get_fragments().len();
assert!(
after_first < 10,
"expected fewer than 10 fragments after first compaction, got {after_first}"
);
assert!(
after_first > 1,
"expected partial compaction (not fully compacted), got {after_first}"
);
let opts_bounded = CompactionOptions {
target_rows_per_fragment: 250,
max_source_fragments: Some(4),
..Default::default()
};
compact_files(&mut dataset, opts_bounded, None)
.await
.unwrap();
let after_second = dataset.get_fragments().len();
assert!(
after_second <= after_first,
"expected progress: {after_second} should be <= {after_first}"
);
}
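// Compaction options stored in the manifest config feed straight into
// plan_compaction.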
#[tokio::test]
async fn test_compaction_uses_manifest_config() {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let data = sample_data();
let schema = data.schema();
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], schema.clone());
let write_params = WriteParams {
max_rows_per_file: 2000,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
assert_eq!(dataset.get_fragments().len(), 5);
dataset
.update_config([
("lance.compaction.target_rows_per_fragment", "5000"),
("lance.compaction.materialize_deletions_threshold", "2.0"),
])
.await
.unwrap();
let opts = CompactionOptions::from_dataset_config(&dataset.manifest.config).unwrap();
assert_eq!(opts.target_rows_per_fragment, 5000);
assert!((opts.materialize_deletions_threshold - 2.0).abs() < f32::EPSILON);
let plan = plan_compaction(&dataset, &opts).await.unwrap();
assert!(!plan.tasks.is_empty());
assert_eq!(plan.options.target_rows_per_fragment, 5000);
assert!(!plan.options.materialize_deletions);
}
}