use std::sync::Arc;
use lance::dataset::cleanup::RemovalStats;
use lance::dataset::optimize::{CompactionMetrics, IndexRemapperOptions, compact_files};
use lance_index::DatasetIndexExt;
use lance_index::optimize::OptimizeOptions;
use log::info;
pub use chrono::Duration;
pub use lance::dataset::optimize::CompactionOptions;
use super::NativeTable;
use crate::error::Result;
#[derive(Default)]
pub enum OptimizeAction {
#[default]
All,
Compact {
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
},
Prune {
older_than: Option<Duration>,
delete_unverified: Option<bool>,
error_if_tagged_old_versions: Option<bool>,
},
Index(OptimizeOptions),
}
#[derive(Debug, Default)]
pub struct OptimizeStats {
pub compaction: Option<CompactionMetrics>,
pub prune: Option<RemovalStats>,
}
pub(crate) async fn optimize_indices(table: &NativeTable, options: &OptimizeOptions) -> Result<()> {
info!("LanceDB: optimizing indices: {:?}", options);
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
dataset.optimize_indices(options).await?;
table.dataset.update(dataset);
Ok(())
}
pub(crate) async fn cleanup_old_versions(
table: &NativeTable,
older_than: Duration,
delete_unverified: Option<bool>,
error_if_tagged_old_versions: Option<bool>,
) -> Result<RemovalStats> {
table.dataset.ensure_mutable()?;
let dataset = table.dataset.get().await?;
Ok(dataset
.cleanup_old_versions(older_than, delete_unverified, error_if_tagged_old_versions)
.await?)
}
pub(crate) async fn compact_files_impl(
table: &NativeTable,
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
) -> Result<CompactionMetrics> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
let metrics = compact_files(&mut dataset, options, remap_options).await?;
table.dataset.update(dataset);
Ok(metrics)
}
pub(crate) async fn execute_optimize(
table: &NativeTable,
action: OptimizeAction,
) -> Result<OptimizeStats> {
let mut stats = OptimizeStats {
compaction: None,
prune: None,
};
match action {
OptimizeAction::All => {
stats.compaction =
Some(compact_files_impl(table, CompactionOptions::default(), None).await?);
stats.prune = Some(
cleanup_old_versions(
table,
Duration::try_days(7).expect("valid delta"),
None,
None,
)
.await?,
);
optimize_indices(table, &OptimizeOptions::default()).await?;
}
OptimizeAction::Compact {
options,
remap_options,
} => {
stats.compaction = Some(compact_files_impl(table, options, remap_options).await?);
}
OptimizeAction::Prune {
older_than,
delete_unverified,
error_if_tagged_old_versions,
} => {
stats.prune = Some(
cleanup_old_versions(
table,
older_than.unwrap_or(Duration::try_days(7).expect("valid delta")),
delete_unverified,
error_if_tagged_old_versions,
)
.await?,
);
}
OptimizeAction::Index(options) => {
optimize_indices(table, &options).await?;
}
}
Ok(stats)
}
#[cfg(test)]
mod tests {
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use rstest::rstest;
use std::sync::Arc;
use crate::connect;
use crate::index::{Index, scalar::BTreeIndexBuilder};
use crate::query::ExecutableQuery;
use crate::table::{CompactionOptions, OptimizeAction, OptimizeStats};
use futures::TryStreamExt;
#[tokio::test]
async fn test_optimize_compact_simple() {
let conn = connect("memory://").execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..100))],
)
.unwrap();
let table = conn
.create_table("test_compact", batch)
.execute()
.await
.unwrap();
for i in 0..5 {
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(
(i * 100 + 100)..((i + 1) * 100 + 100),
))],
)
.unwrap();
table.add(batch).execute().await.unwrap();
}
let initial_row_count = table.count_rows(None).await.unwrap();
assert_eq!(initial_row_count, 600);
let stats = table
.optimize(OptimizeAction::Compact {
options: CompactionOptions {
target_rows_per_fragment: 1000,
..Default::default()
},
remap_options: None,
})
.await
.unwrap();
assert!(stats.compaction.is_some());
let compaction_metrics = stats.compaction.unwrap();
assert!(compaction_metrics.fragments_removed > 0);
let final_row_count = table.count_rows(None).await.unwrap();
assert_eq!(final_row_count, 600);
let batches = table
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 600);
let mut all_values: Vec<i32> = Vec::new();
for batch in &batches {
let array = batch["i"].as_any().downcast_ref::<Int32Array>().unwrap();
all_values.extend(array.values().iter().copied());
}
all_values.sort();
let expected: Vec<i32> = (0..600).collect();
assert_eq!(all_values, expected);
}
#[tokio::test]
async fn test_optimize_prune_versions() {
let conn = connect("memory://").execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..10))],
)
.unwrap();
let table = conn
.create_table("test_prune", batch)
.execute()
.await
.unwrap();
for i in 0..5 {
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(
(i * 10 + 10)..((i + 1) * 10 + 10),
))],
)
.unwrap();
table.add(batch).execute().await.unwrap();
}
let versions = table.list_versions().await.unwrap();
assert!(versions.len() > 1);
let stats = table
.optimize(OptimizeAction::Prune {
older_than: Some(chrono::Duration::try_days(0).unwrap()),
delete_unverified: Some(true),
error_if_tagged_old_versions: None,
})
.await
.unwrap();
assert!(stats.compaction.is_none());
let prune_stats = stats.prune.unwrap();
assert!(prune_stats.bytes_removed > 0);
assert_eq!(prune_stats.old_versions, 5);
let final_row_count = table.count_rows(None).await.unwrap();
assert_eq!(final_row_count, 60);
let batches = table
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let mut all_values: Vec<i32> = Vec::new();
for batch in &batches {
let array = batch["i"].as_any().downcast_ref::<Int32Array>().unwrap();
all_values.extend(array.values().iter().copied());
}
all_values.sort();
let expected: Vec<i32> = (0..60).collect();
assert_eq!(all_values, expected);
}
#[tokio::test]
async fn test_optimize_index() {
let conn = connect("memory://").execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..100))],
)
.unwrap();
let table = conn
.create_table("test_index_optimize", batch)
.execute()
.await
.unwrap();
table
.create_index(&["i"], Index::BTree(BTreeIndexBuilder::default()))
.execute()
.await
.unwrap();
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(100..200))],
)
.unwrap();
table.add(batch).execute().await.unwrap();
let indices = table.list_indices().await.unwrap();
assert_eq!(indices.len(), 1);
let index_name = indices[0].name.clone();
let stats_before = table.index_stats(&index_name).await.unwrap().unwrap();
assert_eq!(stats_before.num_indexed_rows, 100);
assert_eq!(stats_before.num_unindexed_rows, 100);
let stats = table
.optimize(OptimizeAction::Index(Default::default()))
.await
.unwrap();
assert!(stats.compaction.is_none());
assert!(stats.prune.is_none());
let stats_after = table.index_stats(&index_name).await.unwrap().unwrap();
assert_eq!(stats_after.num_indexed_rows, 200);
assert_eq!(stats_after.num_unindexed_rows, 0);
assert!(stats_after.num_indices.is_some());
let final_row_count = table.count_rows(None).await.unwrap();
assert_eq!(final_row_count, 200);
}
#[tokio::test]
async fn test_optimize_all() {
let conn = connect("memory://").execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..100))],
)
.unwrap();
let table = conn
.create_table("test_optimize_all", batch)
.execute()
.await
.unwrap();
for i in 0..3 {
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(
(i * 100 + 100)..((i + 1) * 100 + 100),
))],
)
.unwrap();
table.add(batch).execute().await.unwrap();
}
let stats = table.optimize(OptimizeAction::All).await.unwrap();
assert!(stats.compaction.is_some());
assert!(stats.prune.is_some());
let final_row_count = table.count_rows(None).await.unwrap();
assert_eq!(final_row_count, 400);
let batches = table
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let mut all_values: Vec<i32> = Vec::new();
for batch in &batches {
let array = batch["i"].as_any().downcast_ref::<Int32Array>().unwrap();
all_values.extend(array.values().iter().copied());
}
all_values.sort();
let expected: Vec<i32> = (0..400).collect();
assert_eq!(all_values, expected);
}
#[tokio::test]
async fn test_optimize_default_action() {
let action: OptimizeAction = Default::default();
assert!(matches!(action, OptimizeAction::All));
}
#[tokio::test]
async fn test_optimize_stats_default() {
let stats: OptimizeStats = Default::default();
assert!(stats.compaction.is_none());
assert!(stats.prune.is_none());
}
#[tokio::test]
async fn test_compact_with_deferred_index_remap() {
let conn = connect("memory://").execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..100))],
)
.unwrap();
let table = conn
.create_table("test_deferred_remap", batch.clone())
.execute()
.await
.unwrap();
table.add(batch).execute().await.unwrap();
table
.create_index(&["id"], Index::BTree(BTreeIndexBuilder::default()))
.execute()
.await
.unwrap();
let stats = table
.optimize(OptimizeAction::Compact {
options: CompactionOptions {
target_rows_per_fragment: 2000,
defer_index_remap: true,
..Default::default()
},
remap_options: None,
})
.await
.unwrap();
assert!(stats.compaction.is_some());
let final_row_count = table.count_rows(None).await.unwrap();
assert_eq!(final_row_count, 200);
let batches = table
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let mut all_values: Vec<i32> = Vec::new();
for batch in &batches {
let array = batch["id"].as_any().downcast_ref::<Int32Array>().unwrap();
all_values.extend(array.values().iter().copied());
}
all_values.sort();
let mut expected: Vec<i32> = (0..100).chain(0..100).collect();
expected.sort();
assert_eq!(all_values, expected);
}
#[tokio::test]
async fn test_compaction_preserves_schema() {
let conn = connect("memory://").execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(0..10)),
Arc::new(StringArray::from(
(0..10).map(|i| format!("name_{}", i)).collect::<Vec<_>>(),
)),
],
)
.unwrap();
let original_schema = batch.schema();
let table = conn
.create_table("test_schema_preserved", batch.clone())
.execute()
.await
.unwrap();
table.add(batch).execute().await.unwrap();
table
.optimize(OptimizeAction::Compact {
options: CompactionOptions::default(),
remap_options: None,
})
.await
.unwrap();
let current_schema = table.schema().await.unwrap();
assert_eq!(current_schema, original_schema);
let batches = table
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 20);
}
#[tokio::test]
async fn test_optimize_empty_table() {
let conn = connect("memory://").execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..10))],
)
.unwrap();
let table = conn
.create_table("test_empty_optimize", batch)
.execute()
.await
.unwrap();
table.delete("true").await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 0);
let stats = table.optimize(OptimizeAction::All).await.unwrap();
assert!(stats.compaction.is_some());
assert!(stats.prune.is_some());
assert_eq!(table.count_rows(None).await.unwrap(), 0);
let current_schema = table.schema().await.unwrap();
assert_eq!(current_schema, schema);
}
#[rstest]
#[case::all(OptimizeAction::All)]
#[case::compact(OptimizeAction::Compact {
options: CompactionOptions::default(),
remap_options: None,
})]
#[case::prune(OptimizeAction::Prune {
older_than: Some(chrono::Duration::try_days(0).unwrap()),
delete_unverified: Some(true),
error_if_tagged_old_versions: None,
})]
#[case::index(OptimizeAction::Index(Default::default()))]
#[tokio::test]
async fn test_optimize_fails_on_checked_out_table(#[case] action: OptimizeAction) {
let conn = connect("memory://").execute().await.unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..10))],
)
.unwrap();
let table = conn
.create_table("test_checkout_optimize", batch.clone())
.execute()
.await
.unwrap();
table.add(batch).execute().await.unwrap();
table.checkout(1).await.unwrap();
let result = table.optimize(action).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("cannot be modified when a specific version is checked out"),
"Expected error message about checked out table, got: {}",
err_msg
);
}
}