Table maintenance for optimizing table layout.
As a table is updated, its layout can become suboptimal. For example, if a series of small streaming appends are performed, eventually there will be a large number of small files. This imposes an overhead to track the large number of files and for very small files can make it harder to read data efficiently. In this case, files can be compacted into fewer larger files.
To compact files in a table, use the compact_files method. It can currently compact a fragment in two cases:
- If a fragment has fewer rows than the target number of rows per fragment. The fragment must also have neighbors that are also candidates for compaction.
- If a fragment has a higher percentage of deleted rows than the provided threshold.
In addition to the rules above, there may be restrictions due to indexes. When a fragment is compacted, its row ids change, and any index that contained that fragment will be remapped. However, indexed fragments cannot be combined with unindexed fragments.
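The two selection rules can be sketched as a small predicate over fragment metadata. This is a toy illustration of the logic only, not lance's internal implementation: the Fragment struct, the is_candidate function, and the parameter names are all made up for the example (the real options live in CompactionOptions).

```rust
// Illustrative sketch of the compaction-candidate rules. `Fragment` and
// `is_candidate` are hypothetical names, not part of the lance API.
struct Fragment {
    physical_rows: u64, // rows stored in the fragment's data files
    deleted_rows: u64,  // rows masked out by deletion vectors
}

fn is_candidate(frag: &Fragment, target_rows: u64, deletion_threshold: f64) -> bool {
    // Rule 1: the fragment holds fewer rows than the target per fragment.
    // (In lance, such a fragment is only compacted if it also has
    // neighboring candidates to merge with.)
    let too_small = frag.physical_rows < target_rows;
    // Rule 2: the fraction of deleted rows exceeds the threshold.
    let deletion_ratio = frag.deleted_rows as f64 / frag.physical_rows as f64;
    too_small || deletion_ratio > deletion_threshold
}

fn main() {
    let small = Fragment { physical_rows: 100, deleted_rows: 0 };
    let full = Fragment { physical_rows: 1_000_000, deleted_rows: 0 };
    let mostly_deleted = Fragment { physical_rows: 1_000_000, deleted_rows: 600_000 };

    assert!(is_candidate(&small, 1_000_000, 0.5));          // rule 1
    assert!(!is_candidate(&full, 1_000_000, 0.5));          // neither rule
    assert!(is_candidate(&mostly_deleted, 1_000_000, 0.5)); // rule 2
    println!("all checks passed");
}
```

The neighbor requirement from rule 1 is deliberately left out of the predicate; deciding whether a small fragment has mergeable neighbors is a property of the whole plan, not of a single fragment.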
```rust
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
use lance::{dataset::optimize::compact_files, dataset::WriteParams, Dataset};
// Remapping indices is ignored in this example.
use lance::dataset::optimize::IgnoreRemap;

let schema = Arc::new(Schema::new(vec![Field::new("test", DataType::Int64, false)]));
let data = RecordBatch::try_new(
    schema.clone(),
    vec![Arc::new(Int64Array::from_iter_values(0..10_000))],
).unwrap();
let reader = RecordBatchIterator::new(vec![Ok(data)], schema);

// Write 100 small files. `uri` is the destination path for the dataset.
let write_params = WriteParams { max_rows_per_file: 100, ..Default::default() };
let mut dataset = Dataset::write(reader, &uri, Some(write_params)).await.unwrap();
assert_eq!(dataset.get_fragments().len(), 100);

// Use compact_files() to consolidate the data to 1 fragment.
let metrics = compact_files(&mut dataset, Default::default(), None).await.unwrap();
assert_eq!(metrics.fragments_removed, 100);
assert_eq!(metrics.fragments_added, 1);
assert_eq!(dataset.get_fragments().len(), 1);
```

§Distributed execution
The compact_files method can use multiple threads internally, but sometimes you might want to run compaction across multiple machines. To do this, use the task API.
```text
                                     ┌──► CompactionTask.execute() ─► RewriteResult ─┐
plan_compaction() ─► CompactionPlan ─┼──► CompactionTask.execute() ─► RewriteResult ─┼─► commit_compaction()
                                     └──► CompactionTask.execute() ─► RewriteResult ─┘
```

plan_compaction() produces a CompactionPlan. This can be split into multiple CompactionTasks, which can be serialized and sent to other machines. Calling CompactionTask::execute() performs the compaction and returns a RewriteResult. The RewriteResult can be sent back to the coordinator, which can then call commit_compaction() to commit the changes to the dataset.
It’s not required that all tasks are passed to commit_compaction. If some didn’t complete successfully or before a deadline, they can be omitted and the successful tasks can be committed. You can also commit in batches if you wish. As long as the tasks don’t rewrite any of the same fragments, they can be committed in any order.
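As a rough sketch, the coordinator/worker flow described above looks like the following. This is a toy simulation of the control flow only: the structs and functions here merely stand in for lance's CompactionPlan, CompactionTask, RewriteResult, and commit_compaction, serialization between machines is elided, and fragments are reduced to bare ids.

```rust
// Toy simulation of the distributed compaction flow. The types stand in
// for lance's CompactionTask / RewriteResult and are not the real API.
#[derive(Debug)]
struct CompactionTask { fragment_ids: Vec<u64> }

#[derive(Debug)]
struct RewriteResult { replaced: Vec<u64>, new_fragment: u64 }

// Coordinator: plan which groups of fragments to rewrite together.
fn plan_compaction(groups: Vec<Vec<u64>>) -> Vec<CompactionTask> {
    groups.into_iter().map(|g| CompactionTask { fragment_ids: g }).collect()
}

// Worker: execute one task, producing a result that names the fragments
// it replaces and the new fragment it wrote.
fn execute(task: &CompactionTask, next_id: u64) -> RewriteResult {
    RewriteResult { replaced: task.fragment_ids.clone(), new_fragment: next_id }
}

// Coordinator: commit whichever results came back. Because each task
// rewrites a disjoint set of fragments, results can be applied in any
// order, and missing results simply leave their fragments untouched.
fn commit_compaction(fragments: &mut Vec<u64>, results: &[RewriteResult]) {
    for r in results {
        fragments.retain(|f| !r.replaced.contains(f));
        fragments.push(r.new_fragment);
    }
}

fn main() {
    let mut fragments: Vec<u64> = (0..6).collect();
    let tasks = plan_compaction(vec![vec![0, 1], vec![2, 3], vec![4, 5]]);
    // Suppose the worker for the last task missed the deadline: commit
    // only the two results that arrived.
    let results: Vec<RewriteResult> = tasks[..2]
        .iter()
        .enumerate()
        .map(|(i, t)| execute(t, 100 + i as u64))
        .collect();
    commit_compaction(&mut fragments, &results);
    assert_eq!(fragments, vec![4, 5, 100, 101]);
    println!("committed {} of {} tasks", results.len(), tasks.len());
}
```

The commit step's only invariant is the one the docs state: no two committed results may rewrite the same fragment, which is what makes partial and out-of-order commits safe.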
Re-exports§
- pub use remapping::IgnoreRemap;
- pub use remapping::IndexRemapper;
- pub use remapping::IndexRemapperOptions;
- pub use remapping::RemappedIndex;
Modules§
- remapping
- Utilities for remapping row ids. Necessary before stable row ids.
Structs§
- CompactionMetrics - Metrics returned by compact_files.
- CompactionOptions - Options to be passed to compact_files.
- CompactionPlan - A plan for what groups of fragments to compact.
- CompactionTask - A standalone task that can be serialized and sent to another machine for execution.
- RewriteResult - The result of a single compaction task.
- TaskData - A single group of fragments to compact, which is a view into the compaction plan. We keep the replace_range indices so we can map the result of the compaction back to the fragments it replaces.
Functions§
- commit_compaction - Commit the results of file compaction.
- compact_files - Compacts the files in the dataset without reordering them.
- plan_compaction - Formulate a plan to compact the files in a dataset.