use std::borrow::Cow;
use std::ops::{AddAssign, Range};
use std::sync::Arc;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use crate::io::commit::commit_transaction;
use crate::Result;
use crate::{format::Fragment, Dataset};
use super::fragment::FileFragment;
use super::transaction::{Operation, RewriteGroup, Transaction};
use super::{write_fragments, WriteMode, WriteParams};
/// Options controlling how [`compact_files`] plans and performs a compaction.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompactionOptions {
    /// Desired number of rows per fragment. Fragments with fewer rows are
    /// candidates for merging with neighbours, and rewritten files are capped
    /// at this many rows.
    pub target_rows_per_fragment: usize,
    /// Maximum number of rows per row group in rewritten files.
    pub max_rows_per_group: usize,
    /// Whether fragments with many deleted rows should be rewritten to drop them.
    pub materialize_deletions: bool,
    /// Deleted-row fraction above which a fragment is rewritten to materialize
    /// its deletions (only used when `materialize_deletions` is set).
    pub materialize_deletions_threshold: f32,
    /// Maximum number of rewrite tasks [`compact_files`] runs concurrently.
    pub num_threads: usize,
}

impl Default for CompactionOptions {
    fn default() -> Self {
        let num_threads = num_cpus::get();
        Self {
            num_threads,
            materialize_deletions_threshold: 0.1,
            materialize_deletions: true,
            max_rows_per_group: 1024,
            // 1024 * 1024 rows.
            target_rows_per_fragment: 1 << 20,
        }
    }
}

impl CompactionOptions {
    /// Normalizes the options before they are used for planning.
    ///
    /// For consistent fragment metadata the deleted fraction is at most 1.0.
    /// A threshold of 1.0 or more can therefore never be exceeded, so deletion
    /// materialization is switched off in that case.
    pub fn validate(&mut self) {
        if self.materialize_deletions_threshold >= 1.0 {
            self.materialize_deletions = false;
        }
    }
}
/// Counts of what a compaction removed and added.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactionMetrics {
    /// Number of fragments replaced by the compaction.
    pub fragments_removed: usize,
    /// Number of new fragments written by the compaction.
    pub fragments_added: usize,
    /// Files (data files plus deletion files) belonging to the removed fragments.
    pub files_removed: usize,
    /// Files (data files plus deletion files) belonging to the new fragments.
    pub files_added: usize,
}

impl AddAssign for CompactionMetrics {
    /// Adds every counter of `rhs` into `self`.
    fn add_assign(&mut self, rhs: Self) {
        // Exhaustive destructuring: adding a field to the struct without
        // accumulating it here becomes a compile error.
        let Self {
            fragments_removed,
            fragments_added,
            files_removed,
            files_added,
        } = rhs;
        self.fragments_removed += fragments_removed;
        self.fragments_added += fragments_added;
        self.files_removed += files_removed;
        self.files_added += files_added;
    }
}
/// Compacts the files of `dataset` and commits the result as a new version.
///
/// The function runs in three steps:
/// 1. Validate `options` and plan with [`plan_compaction`].
/// 2. Rewrite every planned task, keeping up to `options.num_threads` rewrites
///    in flight (at least one).
/// 3. Commit all rewritten fragments in a single transaction via
///    [`commit_compaction`], which updates `dataset` to the new manifest.
///
/// If the plan contains no tasks, nothing is written or committed and default
/// (all-zero) metrics are returned.
///
/// # Errors
/// Returns the first error raised while planning, rewriting or committing.
pub async fn compact_files(
    dataset: &mut Dataset,
    mut options: CompactionOptions,
) -> Result<CompactionMetrics> {
    options.validate();

    let compaction_plan: CompactionPlan = plan_compaction(dataset, &options).await?;
    if compaction_plan.tasks().is_empty() {
        return Ok(CompactionMetrics::default());
    }

    // A shared reborrow is all the rewrites need; no clone of the dataset is
    // required. The borrow ends once the stream below has been collected,
    // before `dataset` is mutated by the commit.
    let dataset_ref: &Dataset = dataset;
    let options_ref = &options;
    // `buffer_unordered(0)` either never polls anything or means "unbounded",
    // depending on the `futures` version. Always allow at least one rewrite.
    let concurrency = options.num_threads.max(1);
    let completed_tasks: Vec<RewriteResult> = futures::stream::iter(compaction_plan.tasks)
        .map(|task| rewrite_files(Cow::Borrowed(dataset_ref), task, options_ref))
        .buffer_unordered(concurrency)
        .try_collect()
        .await?;

    commit_compaction(dataset, completed_tasks).await
}
/// Row statistics for a single fragment, used to decide whether it should be compacted.
#[derive(Debug)]
struct FragmentMetrics {
    /// Number of rows stored in the fragment, including rows marked as deleted.
    pub fragment_length: usize,
    /// Number of rows marked as deleted.
    pub num_deletions: usize,
}

impl FragmentMetrics {
    /// Fraction (not a percentage) of the fragment's rows that are deleted.
    ///
    /// For consistent metadata the value lies in `[0, 1]`. An empty fragment
    /// reports `0.0` rather than dividing by zero.
    fn deletion_percentage(&self) -> f32 {
        if self.fragment_length > 0 {
            self.num_deletions as f32 / self.fragment_length as f32
        } else {
            0.0
        }
    }

    /// Number of live (non-deleted) rows.
    ///
    /// The result saturates at zero if the deletion count exceeds the fragment
    /// length. Inconsistent metadata therefore cannot panic (debug) or wrap to a
    /// huge row count (release), which would otherwise corrupt the size-based
    /// bin splitting.
    fn num_rows(&self) -> usize {
        self.fragment_length.saturating_sub(self.num_deletions)
    }
}
/// Gathers the stored-row count and deletion count of `fragment`.
///
/// Both lookups are awaited jointly; the first error from either is returned.
async fn collect_metrics(fragment: &FileFragment) -> Result<FragmentMetrics> {
    let joined = futures::future::try_join(fragment.fragment_length(), fragment.count_deletions());
    let (fragment_length, num_deletions) = joined.await?;
    Ok(FragmentMetrics {
        fragment_length,
        num_deletions,
    })
}
/// A compaction plan: the groups of fragments to rewrite, the dataset version
/// they were selected from, and the options to rewrite them with.
///
/// Produced by [`plan_compaction`]. Each group can be executed on its own via
/// [`CompactionPlan::compaction_tasks`] and [`CompactionTask::execute`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionPlan {
    tasks: Vec<TaskData>,
    read_version: u64,
    options: CompactionOptions,
}

impl CompactionPlan {
    /// Yields one self-contained [`CompactionTask`] per planned group.
    ///
    /// Each task carries a copy of the plan's read version and options.
    pub fn compaction_tasks(&self) -> impl Iterator<Item = CompactionTask> + '_ {
        self.tasks.iter().map(move |data| CompactionTask {
            task: data.clone(),
            read_version: self.read_version,
            options: self.options.clone(),
        })
    }

    /// Number of tasks in the plan.
    pub fn num_tasks(&self) -> usize {
        self.tasks.len()
    }

    /// Dataset version the plan was computed against.
    pub fn read_version(&self) -> u64 {
        self.read_version
    }

    /// Options the plan was computed with.
    pub fn options(&self) -> &CompactionOptions {
        &self.options
    }
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TaskData {
pub fragments: Vec<Fragment>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CompactionTask {
pub task: TaskData,
pub read_version: u64,
options: CompactionOptions,
}
impl CompactionTask {
pub async fn execute(&self, dataset: &Dataset) -> Result<RewriteResult> {
let dataset = if dataset.manifest.version == self.read_version {
Cow::Borrowed(dataset)
} else {
Cow::Owned(dataset.checkout_version(self.read_version).await?)
};
rewrite_files(dataset, self.task.clone(), &self.options).await
}
}
impl CompactionPlan {
    /// Creates a plan with no tasks for dataset version `read_version`.
    fn new(read_version: u64, options: CompactionOptions) -> Self {
        Self {
            read_version,
            options,
            tasks: Vec::new(),
        }
    }

    /// Appends `tasks` to the plan, keeping their order.
    fn extend_tasks(&mut self, tasks: impl IntoIterator<Item = TaskData>) {
        self.tasks.extend(tasks.into_iter());
    }

    /// The planned tasks as a slice.
    fn tasks(&self) -> &[TaskData] {
        self.tasks.as_slice()
    }
}
/// Why a fragment was selected as a compaction candidate.
#[derive(Debug, Clone)]
enum CompactionCandidacy {
    /// The fragment is smaller than the target size. Rewriting it is only
    /// useful together with neighbouring candidates.
    CompactWithNeighbors,
    /// The fragment's deleted fraction exceeds the materialization threshold,
    /// so rewriting it is useful even on its own.
    CompactItself,
}

/// A run of adjacent candidate fragments that may be rewritten together.
///
/// `fragments`, `candidacy` and `row_counts` are parallel vectors with one entry
/// per fragment. `row_counts` holds live (non-deleted) rows. `pos_range` is the
/// span of positions these fragments occupy in the dataset's fragment list.
struct CandidateBin {
    pub fragments: Vec<Fragment>,
    pub pos_range: Range<usize>,
    pub candidacy: Vec<CompactionCandidacy>,
    pub row_counts: Vec<usize>,
}

impl CandidateBin {
    /// Returns `true` if rewriting this bin would accomplish nothing.
    ///
    /// This holds for an empty bin, and for a lone fragment that was only
    /// selected for being small, since it has no neighbours to merge with.
    fn is_noop(&self) -> bool {
        match self.fragments.len() {
            0 => true,
            1 => matches!(self.candidacy[0], CompactionCandidacy::CompactWithNeighbors),
            _ => false,
        }
    }

    /// Splits the bin into consecutive bins holding at least `min_num_rows` rows each.
    ///
    /// How fragments are assigned:
    /// - Fragments are taken greedily in order until the current bin reaches
    ///   `min_num_rows`.
    /// - A bin is split off only if the fragments after it can still reach
    ///   `min_num_rows` on their own; otherwise they all stay in the final bin.
    ///
    /// Every returned bin holds at least one fragment (unless `self` was empty).
    /// With `min_num_rows == 0` this yields one bin per fragment; the old
    /// implementation looped forever pushing empty bins in that case.
    fn split_for_size(mut self, min_num_rows: usize) -> Vec<Self> {
        let mut bins = Vec::new();
        // Rows not yet handed out to an emitted bin. Tracking this incrementally
        // avoids re-summing the tail on every iteration (previously O(n^2)).
        let mut unassigned_rows: usize = self.row_counts.iter().sum();

        loop {
            let available = self.row_counts.len();
            let mut bin_len = 0;
            let mut bin_row_count = 0;
            // Always take at least one fragment so each iteration makes progress.
            while bin_len < available && (bin_len == 0 || bin_row_count < min_num_rows) {
                bin_row_count += self.row_counts[bin_len];
                bin_len += 1;
            }

            let rows_after = unassigned_rows - bin_row_count;
            if bin_len < available && rows_after >= min_num_rows {
                let split_at = self.pos_range.start + bin_len;
                bins.push(Self {
                    fragments: self.fragments.drain(..bin_len).collect(),
                    pos_range: self.pos_range.start..split_at,
                    candidacy: self.candidacy.drain(..bin_len).collect(),
                    row_counts: self.row_counts.drain(..bin_len).collect(),
                });
                self.pos_range.start = split_at;
                unassigned_rows = rows_after;
            } else {
                // Not enough left over for another bin: keep everything together.
                bins.push(self);
                break;
            }
        }

        bins
    }
}
/// Decides which fragments should be rewritten, without touching any data.
///
/// Fragments are examined in dataset order. A fragment is a candidate if either:
/// - deletion materialization is enabled and its deleted fraction exceeds
///   `materialize_deletions_threshold`, or
/// - it stores fewer than `target_rows_per_fragment` rows.
///
/// Runs of adjacent candidates are grouped into bins. A bin holding a single
/// fragment that was only selected for being small is dropped. The remaining
/// bins are split into tasks aiming for at least `target_rows_per_fragment` live
/// rows each.
///
/// # Errors
/// Returns an error if collecting any fragment's metrics fails.
pub async fn plan_compaction(
    dataset: &Dataset,
    options: &CompactionOptions,
) -> Result<CompactionPlan> {
    // Collect per-fragment metrics concurrently; `buffered` keeps fragment order.
    let mut metrics_stream = futures::stream::iter(dataset.get_fragments())
        .map(|fragment| async move {
            let outcome = collect_metrics(&fragment).await;
            outcome.map(|metrics| (fragment.metadata, metrics))
        })
        .buffered(num_cpus::get() * 2)
        .enumerate();

    let mut bins: Vec<CandidateBin> = Vec::new();
    let mut open_bin: Option<CandidateBin> = None;

    while let Some((position, item)) = metrics_stream.next().await {
        let (fragment, metrics) = item?;
        let candidacy = if options.materialize_deletions
            && metrics.deletion_percentage() > options.materialize_deletions_threshold
        {
            Some(CompactionCandidacy::CompactItself)
        } else if metrics.fragment_length < options.target_rows_per_fragment {
            Some(CompactionCandidacy::CompactWithNeighbors)
        } else {
            None
        };

        match candidacy {
            Some(candidacy) => {
                // Start a new run at this position or extend the open one.
                let bin = open_bin.get_or_insert_with(|| CandidateBin {
                    fragments: Vec::new(),
                    pos_range: position..position,
                    candidacy: Vec::new(),
                    row_counts: Vec::new(),
                });
                bin.fragments.push(fragment);
                bin.pos_range.end += 1;
                bin.candidacy.push(candidacy);
                bin.row_counts.push(metrics.num_rows());
            }
            // A non-candidate ends the current run, if any.
            None => bins.extend(open_bin.take()),
        }
    }
    bins.extend(open_bin);

    let tasks = bins
        .into_iter()
        .filter(|bin| !bin.is_noop())
        .flat_map(|bin| bin.split_for_size(options.target_rows_per_fragment))
        .map(|bin| TaskData {
            fragments: bin.fragments,
        });

    let mut plan = CompactionPlan::new(dataset.manifest.version, options.clone());
    plan.extend_tasks(tasks);
    Ok(plan)
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RewriteResult {
pub metrics: CompactionMetrics,
pub new_fragments: Vec<Fragment>,
pub read_version: u64,
pub original_fragments: Vec<Fragment>,
}
async fn rewrite_files(
dataset: Cow<'_, Dataset>,
task: TaskData,
options: &CompactionOptions,
) -> Result<RewriteResult> {
let mut metrics = CompactionMetrics::default();
if task.fragments.is_empty() {
return Ok(RewriteResult {
metrics,
new_fragments: Vec::new(),
read_version: dataset.manifest.version,
original_fragments: task.fragments,
});
}
let fragments = task.fragments.to_vec();
let mut scanner = dataset.scan();
scanner.with_fragments(fragments);
let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?);
let params = WriteParams {
max_rows_per_file: options.target_rows_per_fragment,
max_rows_per_group: options.max_rows_per_group,
mode: WriteMode::Append,
..Default::default()
};
let new_fragments = write_fragments(
dataset.object_store.clone(),
&dataset.base,
dataset.schema(),
data,
params,
)
.await?;
metrics.files_removed = task
.fragments
.iter()
.map(|f| f.files.len() + f.deletion_file.is_some() as usize)
.sum();
metrics.fragments_removed = task.fragments.len();
metrics.fragments_added = new_fragments.len();
metrics.files_added = new_fragments
.iter()
.map(|f| f.files.len() + f.deletion_file.is_some() as usize)
.sum();
Ok(RewriteResult {
metrics,
new_fragments,
read_version: dataset.manifest.version,
original_fragments: task.fragments,
})
}
pub async fn commit_compaction(
dataset: &mut Dataset,
completed_tasks: Vec<RewriteResult>,
) -> Result<CompactionMetrics> {
if completed_tasks.is_empty() {
return Ok(CompactionMetrics::default());
}
let mut rewrite_groups = Vec::with_capacity(completed_tasks.len());
let mut metrics = CompactionMetrics::default();
for task in completed_tasks {
metrics += task.metrics;
let rewrite_group = RewriteGroup {
old_fragments: task.original_fragments,
new_fragments: task.new_fragments,
};
rewrite_groups.push(rewrite_group);
}
let transaction = Transaction::new(
dataset.manifest.version,
Operation::Rewrite {
groups: rewrite_groups,
},
None,
);
let manifest = commit_transaction(
dataset,
dataset.object_store(),
&transaction,
&Default::default(),
&Default::default(),
)
.await?;
dataset.manifest = Arc::new(manifest);
Ok(metrics)
}
#[cfg(test)]
// Tests for compaction planning, task execution and committing.
mod tests {
use arrow_array::{Float32Array, Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
use arrow_select::concat::concat_batches;
use futures::TryStreamExt;
use tempfile::tempdir;
use super::*;
// Checks `CandidateBin::is_noop` and `CandidateBin::split_for_size` on hand-built bins.
#[test]
fn test_candidate_bin() {
// An empty bin has nothing to compact.
let empty_bin = CandidateBin {
fragments: vec![],
pos_range: 0..0,
candidacy: vec![],
row_counts: vec![],
};
assert!(empty_bin.is_noop());
let fragment = Fragment {
id: 0,
files: vec![],
deletion_file: None,
};
// A lone fragment selected only for being small has no neighbours to merge with.
let single_bin = CandidateBin {
fragments: vec![fragment.clone()],
pos_range: 0..1,
candidacy: vec![CompactionCandidacy::CompactWithNeighbors],
row_counts: vec![100],
};
assert!(single_bin.is_noop());
// A lone fragment selected for its deletions is still worth rewriting.
let single_bin = CandidateBin {
fragments: vec![fragment.clone()],
pos_range: 0..1,
candidacy: vec![CompactionCandidacy::CompactItself],
row_counts: vec![100],
};
assert!(!single_bin.is_noop());
let big_bin = CandidateBin {
fragments: std::iter::repeat(fragment.clone()).take(8).collect(),
pos_range: 0..8,
candidacy: std::iter::repeat(CompactionCandidacy::CompactItself)
.take(8)
.collect(),
row_counts: vec![100, 400, 200, 200, 400, 300, 300, 100],
};
assert!(!big_bin.is_noop());
// Greedy split at 500 rows:
// - [100, 400] = 500 rows
// - [200, 200, 400] = 800 rows
// - [300, 300, 100] keeps the 100-row tail because it cannot form its own bin.
let split = big_bin.split_for_size(500);
assert_eq!(split.len(), 3);
assert_eq!(split[0].pos_range, 0..2);
assert_eq!(split[1].pos_range, 2..5);
assert_eq!(split[2].pos_range, 5..8);
}
// A single non-nullable Int64 column `a` holding 0..10_000.
fn sample_data() -> RecordBatch {
let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(Int64Array::from_iter_values(0..10_000))],
)
.unwrap()
}
// A dataset written from an empty reader yields no tasks, and compaction
// returns zero metrics without creating a new version.
#[tokio::test]
async fn test_compact_empty() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), Arc::new(schema));
let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
let plan = plan_compaction(&dataset, &CompactionOptions::default())
.await
.unwrap();
assert_eq!(plan.tasks().len(), 0);
let metrics = compact_files(&mut dataset, CompactionOptions::default())
.await
.unwrap();
assert_eq!(metrics, CompactionMetrics::default());
assert_eq!(dataset.manifest.version, 1);
}
// Layouts that need no compaction produce an empty plan.
#[tokio::test]
async fn test_compact_all_good() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = sample_data();
// One fragment of 10_000 rows. It is below the default target, but it is
// alone in its bin, so the bin is a no-op.
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
let write_params = WriteParams {
max_rows_per_file: 10_000,
..Default::default()
};
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let plan = plan_compaction(&dataset, &CompactionOptions::default())
.await
.unwrap();
assert_eq!(plan.tasks().len(), 0);
// Fragments of 3000, 3000, 3000 and 1000 rows with a 3000-row target. Only
// the last fragment is small, and it has no small neighbour.
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
let write_params = WriteParams {
max_rows_per_file: 3_000,
max_rows_per_group: 1_000,
mode: WriteMode::Overwrite,
..Default::default()
};
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 3_000,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 0);
}
// A mixed layout where one full-size fragment splits the candidates into two tasks.
#[tokio::test]
async fn test_compact_many() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = sample_data();
// Fragments 0-2: 400 rows each (a = 0..1200).
let reader = RecordBatchIterator::new(vec![Ok(data.slice(0, 1200))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 400,
..Default::default()
};
Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
// Fragments 3-4: 1000 rows each (a = 1200..3200).
let reader = RecordBatchIterator::new(vec![Ok(data.slice(1200, 2000))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 1000,
mode: WriteMode::Append,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
// Fragment 3 loses 1 row (0.1%, below the default 10% threshold).
// Fragment 4 (a = 2200..3200) loses 200 rows (20%).
dataset.delete("a = 1300").await.unwrap();
dataset.delete("a >= 2400 AND a < 2600").await.unwrap();
// Fragments 5-6: 300 rows each (a = 3200..3800).
let reader = RecordBatchIterator::new(vec![Ok(data.slice(3200, 600))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 300,
mode: WriteMode::Append,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let options = CompactionOptions {
target_rows_per_fragment: 1000,
..Default::default()
};
// Fragment 3 is full-size with few deletions, so it is not a candidate and
// separates the runs [0, 1, 2] and [4, 5, 6].
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 2);
assert_eq!(plan.tasks()[0].fragments.len(), 3);
assert_eq!(plan.tasks()[1].fragments.len(), 3);
assert_eq!(
plan.tasks()[0]
.fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
vec![0, 1, 2]
);
assert_eq!(
plan.tasks()[1]
.fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
vec![4, 5, 6]
);
let metrics = compact_files(&mut dataset, options).await.unwrap();
// Each task writes 2 new fragments: 1200 rows and 1400 live rows, each capped
// at 1000 rows per file.
assert_eq!(metrics.fragments_removed, 6);
assert_eq!(metrics.fragments_added, 4);
// Removed: 6 data files plus fragment 4's deletion file. Added: 4 data files.
assert_eq!(metrics.files_removed, 7); assert_eq!(metrics.files_added, 4);
let fragment_ids = dataset
.get_fragments()
.iter()
.map(|f| f.id())
.collect::<Vec<_>>();
// Fragment 3 is untouched and sits after the two fragments that replace 0-2;
// every other fragment has a fresh id.
assert_eq!(fragment_ids[2], 3);
assert!(fragment_ids.iter().all(|id| *id > 6 || *id == 3));
dataset.validate().await.unwrap();
}
// Compaction folds the extra data files added by `merge` back into single files.
#[tokio::test]
async fn test_compact_data_files() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = sample_data();
// Two fragments of 5000 rows each.
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
let write_params = WriteParams {
max_rows_per_file: 5_000,
max_rows_per_group: 1_000,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("x", DataType::Float32, false),
]);
let data = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(Int64Array::from_iter_values(0..10_000)),
Arc::new(Float32Array::from_iter_values(
(0..10_000).map(|x| x as f32 * std::f32::consts::PI),
)),
],
)
.unwrap();
// Merging column `x` on `a` gives each fragment a second data file.
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema());
dataset.merge(reader, "a", "a").await.unwrap();
let plan = plan_compaction(&dataset, &CompactionOptions::default())
.await
.unwrap();
assert_eq!(plan.tasks().len(), 1);
assert_eq!(plan.tasks()[0].fragments.len(), 2);
let metrics = compact_files(&mut dataset, CompactionOptions::default())
.await
.unwrap();
// 2 fragments x 2 data files are replaced by 1 fragment with 1 data file.
assert_eq!(metrics.files_removed, 4); assert_eq!(metrics.files_added, 1); assert_eq!(metrics.fragments_removed, 2);
assert_eq!(metrics.fragments_added, 1);
// Contents, including the merged column, survive the rewrite unchanged.
let scanner = dataset.scan();
let batches = scanner
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let scanned_data = concat_batches(&batches[0].schema(), &batches).unwrap();
assert_eq!(scanned_data, data);
}
// Deletion materialization respects the threshold and the on/off switch.
#[tokio::test]
async fn test_compact_deletions() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = sample_data();
let reader = RecordBatchIterator::new(vec![Ok(data.slice(0, 1000))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 1000,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
// Deletes 501 of 1000 rows (a = 0..=500), about 50%.
dataset.delete("a <= 500").await.unwrap();
// 50% is below the 0.8 threshold, so the fragment is only a lone small
// candidate and the plan is empty.
let mut options = CompactionOptions {
materialize_deletions_threshold: 0.8,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 0);
// With materialization disabled, a low threshold has no effect.
options.materialize_deletions_threshold = 0.1;
options.materialize_deletions = false;
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 0);
// Enabled with a 0.1 threshold, the fragment is rewritten on its own.
options.materialize_deletions = true;
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 1);
let metrics = compact_files(&mut dataset, options).await.unwrap();
assert_eq!(metrics.fragments_removed, 1);
// The data file and the deletion file are both removed.
assert_eq!(metrics.files_removed, 2);
assert_eq!(metrics.fragments_added, 1);
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert!(fragments[0].metadata.deletion_file.is_none());
}
// Tasks can be executed one by one and committed in separate transactions.
#[tokio::test]
async fn test_compact_distributed() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = sample_data();
// Nine fragments of 1000 rows each.
let reader = RecordBatchIterator::new(vec![Ok(data.slice(0, 9000))], data.schema());
let write_params = WriteParams {
max_rows_per_file: 1000,
..Default::default()
};
let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
// With a 3000-row target the single run of nine is split into three tasks
// of three fragments each.
let options = CompactionOptions {
target_rows_per_fragment: 3_000,
..Default::default()
};
let plan = plan_compaction(&dataset, &options).await.unwrap();
assert_eq!(plan.tasks().len(), 3);
// `then` runs the tasks sequentially, so results keep plan order.
let dataset_ref = &dataset;
let mut results = futures::stream::iter(plan.compaction_tasks())
.then(|task| async move { task.execute(dataset_ref).await.unwrap() })
.collect::<Vec<_>>()
.await;
assert_eq!(results.len(), 3);
assert_eq!(
results[0]
.original_fragments
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
vec![0, 1, 2]
);
assert_eq!(results[0].metrics.files_removed, 3);
assert_eq!(results[0].metrics.files_added, 1);
// Committing the last result alone creates version 2; the other two
// results are committed together as version 3.
commit_compaction(&mut dataset, vec![results.pop().unwrap()])
.await
.unwrap();
assert_eq!(dataset.manifest.version, 2);
commit_compaction(&mut dataset, results).await.unwrap();
assert_eq!(dataset.manifest.version, 3);
}
}