use arrow::array::RecordBatch;
use arrow_row::{RowConverter, SortField};
use arrow_select::filter::filter_record_batch;
use chrono::{DateTime, Utc};
use derive_builder::Builder;
use futures::{TryStreamExt, future};
use parquet::{
arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder},
basic::Compression,
file::properties::{EnabledStatistics, WriterProperties},
};
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, marker::PhantomData, path::Path, sync::Arc};
use tracing::{info, warn};
use crate::{
Client,
error::{CompactionError, Error, Result},
file_meta::FileMeta,
file_sink::DEFAULT_MAX_SIZE_BYTES,
list_files, put_file, remove_file,
traits::ArrowSchema,
};
use aws_sdk_s3::primitives::ByteStream;
/// Configuration for compacting many small Parquet files under an S3 prefix
/// into fewer, larger files.
///
/// Built via the derived `FileCompactorConfigBuilder` (owned pattern). `T` is
/// only a phantom marker tying the config to a row type implementing
/// `ArrowSchema`; `()` selects the schema-agnostic path.
#[derive(Debug, Clone, Builder)]
#[builder(pattern = "owned")]
pub struct FileCompactorConfig<T> {
    /// S3 client used for all list/get/put/delete calls.
    client: Client,
    /// Bucket holding both the source files and the compacted output.
    bucket: String,
    /// Key prefix whose files are candidates for compaction.
    prefix: String,
    /// Lower bound of the time range; `None` means no lower bound.
    #[builder(default = "None")]
    after_timestamp: Option<DateTime<Utc>>,
    /// Upper bound of the time range to compact (required, no default).
    until_timestamp: DateTime<Utc>,
    /// Soft cap on accumulated in-memory batch bytes before an output file is flushed.
    #[builder(default = "DEFAULT_MAX_SIZE_BYTES")]
    max_bytes_per_file: usize,
    /// Parquet compression codec for the compacted output.
    #[builder(default = "Compression::SNAPPY")]
    compression: Compression,
    /// Maximum rows per Parquet row group in the output.
    #[builder(default = "10_000")]
    row_group_size: usize,
    /// Delete source files (and their `.processed` markers) after a successful upload.
    #[builder(default = "true")]
    delete_originals: bool,
    /// Drop rows whose exact row-encoding hash was already seen during the run.
    #[builder(default = "false")]
    enable_deduplication: bool,
    #[builder(default)]
    _phantom: PhantomData<T>,
}
/// Summary of a compaction run (or a planning dry-run), serializable for reporting.
#[derive(Debug, Clone, Serialize)]
pub struct CompactionResult {
    /// Source files examined, including skipped/empty/unreadable ones.
    pub files_processed: usize,
    /// Compacted output files written (or, in planning mode, that would be written).
    pub files_created: usize,
    /// Rows consolidated into compacted outputs.
    pub records_consolidated: usize,
    /// Original bytes minus compacted bytes (saturating at zero).
    pub bytes_saved: usize,
    /// Rows dropped by deduplication; omitted from JSON when zero.
    // NOTE(review): serde `default` only affects Deserialize, which is not
    // derived here — harmless, but confirm whether Deserialize was intended.
    #[serde(skip_serializing_if = "is_zero", default)]
    pub duplicate_records_eliminated: usize,
    /// Timestamp of the last source file handled, if any.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub last_processed_timestamp: Option<DateTime<Utc>>,
    /// Keys of source files that could not be deleted after upload.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub deletion_failures: Vec<String>,
}
/// Serde helper: true when the counter is zero, so the field is skipped
/// during serialization.
fn is_zero(n: &usize) -> bool {
    matches!(*n, 0)
}
impl CompactionResult {
pub fn empty() -> Self {
Self {
files_processed: 0,
files_created: 0,
records_consolidated: 0,
bytes_saved: 0,
duplicate_records_eliminated: 0,
last_processed_timestamp: None,
deletion_failures: Vec::new(),
}
}
}
/// Per-output-file counters produced by `finalize_and_upload_schema_agnostic`,
/// folded by the caller into the overall `CompactionResult`.
#[derive(Debug)]
struct FinalizeResult {
    // Rows written into this compacted file.
    records_count: usize,
    // Original bytes minus compacted bytes (saturating at zero).
    bytes_saved: usize,
    // Duplicate rows eliminated while accumulating this file's batches.
    duplicate_count: usize,
    // Keys of originals that failed to delete (when deletion is enabled).
    deletion_failures: Vec<String>,
}
/// Buffers record batches between output flushes, optionally dropping rows
/// whose xxh3-128 row-encoding hash was already seen. Dedup is global across
/// the whole run: `seen_hashes` is never cleared, even when batches are drained.
struct DeduplicatingAccumulator {
    // Batches waiting to be written to the next compacted file.
    batches: Vec<RecordBatch>,
    // Hashes of every row kept so far; persists across take_batches().
    seen_hashes: HashSet<u128>,
    // Row count of the currently buffered batches (reset on take_batches()).
    total_records: usize,
    // Duplicates dropped since the last take_batches().
    duplicate_count: usize,
}
impl DeduplicatingAccumulator {
    /// Fresh accumulator with no buffered batches and an empty hash set.
    fn new() -> Self {
        Self {
            batches: Vec::new(),
            seen_hashes: HashSet::new(),
            total_records: 0,
            duplicate_count: 0,
        }
    }

    /// Buffers a batch. With `deduplicate` set, rows whose row hash was seen
    /// before (anywhere in the run) are filtered out first.
    fn add_batch(&mut self, batch: RecordBatch, deduplicate: bool) -> Result<()> {
        if !deduplicate {
            self.total_records += batch.num_rows();
            self.batches.push(batch);
            return Ok(());
        }
        // One bool per row: true = first occurrence (kept), false = duplicate.
        let keep_mask: Vec<bool> = compute_row_hashes(&batch)?
            .into_iter()
            .map(|hash| {
                let first_seen = self.seen_hashes.insert(hash);
                if !first_seen {
                    self.duplicate_count += 1;
                }
                first_seen
            })
            .collect();
        let survivors = filter_batch_by_mask(&batch, &keep_mask)?;
        if survivors.num_rows() > 0 {
            self.total_records += survivors.num_rows();
            self.batches.push(survivors);
        }
        Ok(())
    }

    /// Approximate in-memory footprint of the buffered batches.
    fn estimated_size(&self) -> usize {
        self.batches
            .iter()
            .fold(0usize, |acc, b| acc + b.get_array_memory_size())
    }

    /// Drains the buffered batches along with the duplicate count accumulated
    /// since the last drain. Counters reset, but `seen_hashes` is retained so
    /// deduplication stays global across output files.
    fn take_batches(&mut self) -> (Vec<RecordBatch>, usize) {
        let drained = std::mem::take(&mut self.batches);
        let duplicates = std::mem::replace(&mut self.duplicate_count, 0);
        self.total_records = 0;
        (drained, duplicates)
    }

    /// True when nothing is buffered.
    fn is_empty(&self) -> bool {
        self.batches.is_empty()
    }

    /// Rows currently buffered.
    fn total_records(&self) -> usize {
        self.total_records
    }
}
/// Produces one 128-bit xxh3 hash per row by converting the batch into the
/// arrow-row comparable byte encoding and hashing each row's bytes.
/// Note: a fresh `RowConverter` is built per batch, which assumes every batch
/// shares the schema checked by the caller.
fn compute_row_hashes(batch: &RecordBatch) -> Result<Vec<u128>> {
    let fields = batch
        .schema()
        .fields()
        .iter()
        .map(|f| SortField::new(f.data_type().clone()))
        .collect::<Vec<SortField>>();
    let rows = RowConverter::new(fields)?.convert_columns(batch.columns())?;
    let mut hashes = Vec::with_capacity(batch.num_rows());
    for row in rows.iter() {
        hashes.push(xxhash_rust::xxh3::xxh3_128(row.as_ref()));
    }
    Ok(hashes)
}
/// Returns a batch containing only the rows of `batch` whose position in
/// `keep` is true. `keep` must have one entry per row.
fn filter_batch_by_mask(batch: &RecordBatch, keep: &[bool]) -> Result<RecordBatch> {
    use arrow::array::BooleanArray;
    let mask = BooleanArray::from(keep.iter().map(|&b| Some(b)).collect::<Vec<_>>());
    Ok(filter_record_batch(batch, &mask)?)
}
/// Serializes `batches` into an in-memory Parquet buffer using the same
/// writer settings as the real output, returning the encoded byte count.
/// Used by planning mode to estimate savings without writing to S3.
fn measure_output_size(
    batches: &[RecordBatch],
    schema: &Arc<arrow::datatypes::Schema>,
    compression: Compression,
    row_group_size: usize,
) -> Result<usize> {
    let props = WriterProperties::builder()
        .set_created_by(format!("prestige/{}", env!("CARGO_PKG_VERSION")))
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_write_batch_size(1024)
        .set_max_row_group_size(row_group_size)
        .set_compression(compression)
        .build();
    let mut encoded = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut encoded, schema.clone(), Some(props))?;
    batches.iter().try_for_each(|b| writer.write(b))?;
    writer.close()?;
    Ok(encoded.len())
}
/// Writes an empty `<key>.processed` object next to `source_key` so later
/// runs can skip files already folded into a compacted output.
async fn create_processed_marker(client: &Client, bucket: &str, source_key: &str) -> Result<()> {
    let marker_key = format!("{}.processed", source_key);
    let request = client
        .put_object()
        .bucket(bucket)
        .key(marker_key)
        .content_type("text/plain")
        .body(ByteStream::from_static(b""));
    request
        .send()
        .await
        .map(|_| ())
        .map_err(|e| Error::from(aws_sdk_s3::Error::from(e)))
}
/// True when a `.processed` marker object exists for `source_key`.
///
/// NOTE(review): any HEAD failure — including transient network or permission
/// errors, not just 404 — is reported as "no marker", so such files will be
/// reprocessed. Confirm this best-effort behavior is intended.
async fn has_processed_marker(client: &Client, bucket: &str, source_key: &str) -> bool {
    let marker_key = format!("{}.processed", source_key);
    let head = client
        .head_object()
        .bucket(bucket)
        .key(marker_key)
        .send()
        .await;
    head.is_ok()
}
/// Removes the `.processed` marker object associated with `source_key`.
async fn delete_processed_marker(client: &Client, bucket: &str, source_key: &str) -> Result<()> {
    remove_file(client, bucket, &format!("{}.processed", source_key)).await
}
/// Deletes every file in `files` (and its `.processed` marker) concurrently.
///
/// Marker-deletion failures are deliberately ignored; the returned vector
/// holds the keys of source files that could not be deleted, for reporting.
async fn delete_original_files(client: &Client, bucket: &str, files: &[FileMeta]) -> Vec<String> {
    info!("Deleting {} original files and markers", files.len());
    let deletions = files.iter().map(|file| async move {
        let outcome = remove_file(client, bucket, &file.key).await;
        // Marker cleanup is best-effort: a missing marker is not an error.
        let _ = delete_processed_marker(client, bucket, &file.key).await;
        outcome
    });
    let results = future::join_all(deletions).await;
    let failed_keys: Vec<String> = files
        .iter()
        .zip(results)
        .filter_map(|(file, result)| match result {
            Ok(_) => None,
            Err(e) => {
                warn!("Failed to delete original file {}: {}", file.key, e);
                Some(file.key.clone())
            }
        })
        .collect();
    if failed_keys.is_empty() {
        info!(
            "Successfully deleted all {} original files and markers",
            files.len()
        );
    } else {
        warn!(
            "Failed to delete {} out of {} original files",
            failed_keys.len(),
            files.len()
        );
    }
    failed_keys
}
impl FileCompactorConfigBuilder<()> {
pub async fn execute_schema_agnostic(self) -> Result<CompactionResult> {
let config = self
.build()
.map_err(|e| crate::error::Error::SerdeArrow(format!("Config builder error: {}", e)))?;
execute_compaction_schema_agnostic(config).await
}
pub async fn plan_schema_agnostic(self) -> Result<CompactionResult> {
let config = self
.build()
.map_err(|e| crate::error::Error::SerdeArrow(format!("Config builder error: {}", e)))?;
plan_compaction_schema_agnostic(config).await
}
}
impl<T> FileCompactorConfigBuilder<T>
where
    T: ArrowSchema + Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static,
{
    /// Builds the typed config, erases the row type (the compaction pipeline
    /// itself is schema-agnostic), and runs the compaction.
    pub async fn execute(self) -> Result<CompactionResult> {
        let typed = self
            .build()
            .map_err(|e| crate::error::Error::SerdeArrow(format!("Config builder error: {}", e)))?;
        // Re-wrap every setting with `T = ()`; only the phantom marker changes.
        let erased = FileCompactorConfig::<()> {
            client: typed.client,
            bucket: typed.bucket,
            prefix: typed.prefix,
            after_timestamp: typed.after_timestamp,
            until_timestamp: typed.until_timestamp,
            max_bytes_per_file: typed.max_bytes_per_file,
            compression: typed.compression,
            row_group_size: typed.row_group_size,
            delete_originals: typed.delete_originals,
            enable_deduplication: typed.enable_deduplication,
            _phantom: PhantomData,
        };
        execute_compaction_schema_agnostic(erased).await
    }
}
/// Writes the accumulated batches to a local Parquet file, uploads it to S3,
/// creates `.processed` markers for every source file, and (optionally)
/// deletes the originals.
///
/// Returns per-output counters the caller folds into the overall
/// `CompactionResult`.
///
/// # Errors
/// Fails when `source_files` is empty, on local I/O errors, or when the
/// upload fails (`CompactionError::UploadFailed`). Marker-creation and
/// deletion failures are logged/collected rather than returned.
async fn finalize_and_upload_schema_agnostic(
    batches: Vec<RecordBatch>,
    duplicate_count: usize,
    source_files: Vec<FileMeta>,
    schema: &Arc<arrow::datatypes::Schema>,
    config: &FileCompactorConfig<()>,
    temp_dir: &Path,
) -> Result<FinalizeResult> {
    // The compacted file inherits the newest source timestamp so time-range
    // listings still cover the same interval.
    let latest_timestamp = source_files
        .iter()
        .map(|f| f.timestamp)
        .max()
        .ok_or(CompactionError::NoSourceFiles)?;
    let compacted_meta = FileMeta::as_compacted(config.prefix.clone(), latest_timestamp);
    let total_records: usize = batches.iter().map(|b| b.num_rows()).sum();
    info!(
        "Finalizing {} records from {} source files into {}",
        total_records,
        source_files.len(),
        compacted_meta.key
    );
    let local_path = temp_dir.join(&compacted_meta.key);
    // BUGFIX: the S3 key is built from `config.prefix` and may contain '/'
    // separators; without the matching local directory tree, File::create
    // fails with NotFound. Create the parent directories first.
    if let Some(parent) = local_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let std_file = std::fs::File::create(&local_path)?;
    let props = WriterProperties::builder()
        .set_compression(config.compression)
        .set_max_row_group_size(config.row_group_size)
        .set_write_batch_size(1024)
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_created_by(format!("prestige/{}", env!("CARGO_PKG_VERSION")))
        .build();
    let mut writer = ArrowWriter::try_new(std_file, schema.clone(), Some(props))?;
    for batch in batches {
        writer.write(&batch)?;
    }
    writer.close()?;
    info!("Uploading {} to S3", compacted_meta.key);
    put_file(&config.client, &config.bucket, &local_path)
        .await
        .map_err(|_| CompactionError::UploadFailed {
            file_key: compacted_meta.key.clone(),
        })?;
    info!("Uploaded {} successfully", compacted_meta.key);
    info!(
        "Creating processed markers for {} source files",
        source_files.len()
    );
    // Markers let an interrupted run skip these sources on retry even if the
    // deletions below never happen.
    let marker_futures: Vec<_> = source_files
        .iter()
        .map(|file| create_processed_marker(&config.client, &config.bucket, &file.key))
        .collect();
    let marker_results = future::join_all(marker_futures).await;
    for (idx, result) in marker_results.iter().enumerate() {
        if let Err(e) = result {
            warn!(
                "Failed to create marker for {}: {}",
                source_files[idx].key, e
            );
        }
    }
    let original_bytes: usize = source_files.iter().map(|f| f.size).sum();
    let compacted_bytes = local_path.metadata()?.len() as usize;
    let bytes_saved = original_bytes.saturating_sub(compacted_bytes);
    let deletion_failures = if config.delete_originals {
        delete_original_files(&config.client, &config.bucket, &source_files).await
    } else {
        Vec::new()
    };
    Ok(FinalizeResult {
        records_count: total_records,
        bytes_saved,
        duplicate_count,
        deletion_failures,
    })
}
/// Runs the full compaction pipeline for `config` without a compile-time row
/// type: lists uncompacted files in the configured time range, streams their
/// record batches into an accumulator, and flushes a compacted Parquet file
/// to S3 whenever the buffered size would exceed `max_bytes_per_file`
/// (plus one final flush for the remainder).
///
/// The output schema is taken from the first readable file; files with a
/// different schema, a `.processed` marker, empty bodies, or unreadable
/// Parquet are skipped (each still increments `files_processed`).
async fn execute_compaction_schema_agnostic(
    config: FileCompactorConfig<()>,
) -> Result<CompactionResult> {
    info!(
        "Starting schema-agnostic compaction for prefix '{}' in bucket '{}'",
        config.prefix, config.bucket
    );
    info!(
        "Time range: after {:?}, until {}",
        config.after_timestamp, config.until_timestamp
    );
    // Collect every file in range that is not itself a compacted output.
    let mut uncompacted_files = Vec::new();
    let mut file_stream = list_files(
        &config.client,
        &config.bucket,
        &config.prefix,
        config.after_timestamp,
        Some(config.until_timestamp),
    );
    while let Some(file) = file_stream.try_next().await? {
        if !file.compacted {
            uncompacted_files.push(file);
        }
    }
    if uncompacted_files.is_empty() {
        info!("No uncompacted files found");
        return Ok(CompactionResult::empty());
    }
    info!(
        "Found {} uncompacted files to process",
        uncompacted_files.len()
    );
    // Oldest first, so the compacted file's timestamp (max of its sources)
    // advances monotonically.
    uncompacted_files.sort_by_key(|f| f.timestamp);
    // Local staging area for output files before upload; cleaned up on drop.
    let temp_dir = tempfile::tempdir()?;
    info!("Using temporary directory: {}", temp_dir.path().display());
    let mut accumulator = DeduplicatingAccumulator::new();
    // Files whose batches are currently buffered; deleted/marked when flushed.
    let mut source_files: Vec<FileMeta> = Vec::new();
    // Lazily initialized from the first readable file.
    let mut schema: Option<Arc<arrow::datatypes::Schema>> = None;
    let mut files_processed = 0;
    let mut files_created = 0;
    let mut records_consolidated = 0;
    let mut bytes_saved = 0;
    let mut duplicate_records_eliminated = 0;
    let mut last_processed_timestamp: Option<DateTime<Utc>> = None;
    let mut deletion_failures: Vec<String> = Vec::new();
    for file_meta in uncompacted_files {
        info!("Processing file: {}", file_meta.key);
        // Skip files a previous (possibly interrupted) run already consumed.
        if has_processed_marker(&config.client, &config.bucket, &file_meta.key).await {
            info!(
                "Skipping already-processed file: {} (marker exists)",
                file_meta.key
            );
            files_processed += 1;
            last_processed_timestamp = Some(file_meta.timestamp);
            continue;
        }
        // Download the whole object into memory before parsing.
        let file_content = config
            .client
            .get_object()
            .bucket(&config.bucket)
            .key(&file_meta.key)
            .send()
            .await
            .map_err(|e| Error::from(aws_sdk_s3::Error::from(e)))?;
        let bytes = file_content
            .body
            .collect()
            .await
            .map_err(|e| Error::Io(std::io::Error::other(e)))?
            .into_bytes();
        if bytes.is_empty() {
            // Empty objects contribute no records but are still queued in
            // source_files, so they get marked/deleted at the next flush.
            info!("Skipping empty file: {}", file_meta.key);
            files_processed += 1;
            last_processed_timestamp = Some(file_meta.timestamp);
            source_files.push(file_meta.clone());
            continue;
        }
        let cursor = std::io::Cursor::new(bytes);
        // Unreadable Parquet is skipped (and left in place, not deleted).
        let builder = match ParquetRecordBatchStreamBuilder::new(cursor).await {
            Ok(b) => b,
            Err(e) => {
                warn!("Failed to read parquet file {}: {}", file_meta.key, e);
                files_processed += 1;
                last_processed_timestamp = Some(file_meta.timestamp);
                continue;
            }
        };
        // First readable file fixes the output schema; later mismatches skip.
        if let Some(ref current_schema) = schema {
            if current_schema != builder.schema() {
                warn!("Schema mismatch in file {}, skipping", file_meta.key);
                files_processed += 1;
                last_processed_timestamp = Some(file_meta.timestamp);
                continue;
            }
        } else {
            let new_schema = builder.schema().clone();
            info!("Detected schema with {} fields", new_schema.fields().len());
            schema = Some(new_schema);
        }
        let mut stream = builder.build()?;
        let mut file_had_records = false;
        while let Some(batch_result) = stream.try_next().await? {
            if batch_result.num_rows() == 0 {
                continue;
            }
            file_had_records = true;
            let batch_size = batch_result.get_array_memory_size();
            info!(
                "Loaded batch with {} records ({} bytes) from {}",
                batch_result.num_rows(),
                batch_size,
                file_meta.key
            );
            // Flush before adding this batch if it would push the in-memory
            // estimate past the size cap.
            // NOTE(review): the current file joins source_files only after
            // its whole stream is read, so a mid-file flush can write some of
            // its records while the file itself is marked/deleted with the
            // NEXT flush — confirm this split is acceptable.
            if !accumulator.is_empty()
                && (accumulator.estimated_size() + batch_size) > config.max_bytes_per_file
            {
                info!(
                    "Size limit reached, finalizing current batch ({} bytes, {} records)",
                    accumulator.estimated_size(),
                    accumulator.total_records()
                );
                let (batches, dup_count) = accumulator.take_batches();
                let finalize_result = finalize_and_upload_schema_agnostic(
                    batches,
                    dup_count,
                    source_files,
                    schema.as_ref().ok_or(Error::Internal(
                        "schema not initialized during compaction".into(),
                    ))?,
                    &config,
                    temp_dir.path(),
                )
                .await?;
                files_created += 1;
                records_consolidated += finalize_result.records_count;
                bytes_saved += finalize_result.bytes_saved;
                duplicate_records_eliminated += finalize_result.duplicate_count;
                deletion_failures.extend(finalize_result.deletion_failures);
                source_files = Vec::new();
            }
            accumulator.add_batch(batch_result, config.enable_deduplication)?;
        }
        if !file_had_records {
            info!("Skipping file with no records: {}", file_meta.key);
        }
        source_files.push(file_meta.clone());
        files_processed += 1;
        last_processed_timestamp = Some(file_meta.timestamp);
    }
    // Final flush for whatever remains below the size cap.
    if !accumulator.is_empty() {
        info!(
            "Finalizing remaining batch ({} bytes, {} records)",
            accumulator.estimated_size(),
            accumulator.total_records()
        );
        let (batches, dup_count) = accumulator.take_batches();
        let finalize_result = finalize_and_upload_schema_agnostic(
            batches,
            dup_count,
            source_files,
            schema.as_ref().ok_or(CompactionError::NoSourceFiles)?,
            &config,
            temp_dir.path(),
        )
        .await?;
        files_created += 1;
        records_consolidated += finalize_result.records_count;
        bytes_saved += finalize_result.bytes_saved;
        duplicate_records_eliminated += finalize_result.duplicate_count;
        deletion_failures.extend(finalize_result.deletion_failures);
    }
    info!(
        "Schema-agnostic compaction complete: {} files -> {} files, {} records, {} duplicates eliminated, ~{} bytes saved",
        files_processed,
        files_created,
        records_consolidated,
        duplicate_records_eliminated,
        bytes_saved
    );
    Ok(CompactionResult {
        files_processed,
        files_created,
        records_consolidated,
        bytes_saved,
        duplicate_records_eliminated,
        last_processed_timestamp,
        deletion_failures,
    })
}
/// Dry-run twin of `execute_compaction_schema_agnostic`: walks the same
/// files with the same skip/accumulate logic, but instead of uploading it
/// encodes each would-be output into an in-memory buffer
/// (`measure_output_size`) to estimate bytes saved. Nothing in S3 is
/// created, marked, or deleted. Note that planning still downloads every
/// candidate file to get exact sizes and duplicate counts.
async fn plan_compaction_schema_agnostic(
    config: FileCompactorConfig<()>,
) -> Result<CompactionResult> {
    info!(
        "Planning schema-agnostic compaction for prefix '{}' in bucket '{}'",
        config.prefix, config.bucket
    );
    info!(
        "Time range: after {:?}, until {}",
        config.after_timestamp, config.until_timestamp
    );
    // Collect every file in range that is not itself a compacted output.
    let mut uncompacted_files = Vec::new();
    let mut file_stream = list_files(
        &config.client,
        &config.bucket,
        &config.prefix,
        config.after_timestamp,
        Some(config.until_timestamp),
    );
    while let Some(file) = file_stream.try_next().await? {
        if !file.compacted {
            uncompacted_files.push(file);
        }
    }
    if uncompacted_files.is_empty() {
        info!("No uncompacted files found");
        return Ok(CompactionResult::empty());
    }
    info!(
        "Found {} uncompacted files to process",
        uncompacted_files.len()
    );
    // Same ordering as the real run so the plan groups files identically.
    uncompacted_files.sort_by_key(|f| f.timestamp);
    let mut accumulator = DeduplicatingAccumulator::new();
    // Files whose batches are currently buffered; used only for size accounting.
    let mut source_files: Vec<FileMeta> = Vec::new();
    let mut schema: Option<Arc<arrow::datatypes::Schema>> = None;
    let mut files_processed = 0;
    let mut files_created = 0;
    let mut records_consolidated = 0;
    let mut bytes_saved = 0;
    let mut duplicate_records_eliminated = 0;
    let mut last_processed_timestamp: Option<DateTime<Utc>> = None;
    for file_meta in uncompacted_files {
        info!("Planning file: {}", file_meta.key);
        // Already-consumed files would be skipped by the real run too.
        if has_processed_marker(&config.client, &config.bucket, &file_meta.key).await {
            info!(
                "Skipping already-processed file: {} (marker exists)",
                file_meta.key
            );
            files_processed += 1;
            last_processed_timestamp = Some(file_meta.timestamp);
            continue;
        }
        let file_content = config
            .client
            .get_object()
            .bucket(&config.bucket)
            .key(&file_meta.key)
            .send()
            .await
            .map_err(|e| Error::from(aws_sdk_s3::Error::from(e)))?;
        let bytes = file_content
            .body
            .collect()
            .await
            .map_err(|e| Error::Io(std::io::Error::other(e)))?
            .into_bytes();
        if bytes.is_empty() {
            info!("Skipping empty file: {}", file_meta.key);
            files_processed += 1;
            last_processed_timestamp = Some(file_meta.timestamp);
            source_files.push(file_meta.clone());
            continue;
        }
        let cursor = std::io::Cursor::new(bytes);
        let builder = match ParquetRecordBatchStreamBuilder::new(cursor).await {
            Ok(b) => b,
            Err(e) => {
                warn!("Failed to read parquet file {}: {}", file_meta.key, e);
                files_processed += 1;
                last_processed_timestamp = Some(file_meta.timestamp);
                continue;
            }
        };
        // First readable file fixes the schema; later mismatches are skipped.
        if let Some(ref current_schema) = schema {
            if current_schema != builder.schema() {
                warn!("Schema mismatch in file {}, skipping", file_meta.key);
                files_processed += 1;
                last_processed_timestamp = Some(file_meta.timestamp);
                continue;
            }
        } else {
            let new_schema = builder.schema().clone();
            info!("Detected schema with {} fields", new_schema.fields().len());
            schema = Some(new_schema);
        }
        let mut stream = builder.build()?;
        let mut file_had_records = false;
        while let Some(batch_result) = stream.try_next().await? {
            if batch_result.num_rows() == 0 {
                continue;
            }
            file_had_records = true;
            let batch_size = batch_result.get_array_memory_size();
            // Same flush condition as the real run, but the "flush" only
            // measures what the output file would have weighed.
            if !accumulator.is_empty()
                && (accumulator.estimated_size() + batch_size) > config.max_bytes_per_file
            {
                info!(
                    "Size limit reached, measuring planned output ({} bytes, {} records)",
                    accumulator.estimated_size(),
                    accumulator.total_records()
                );
                let (batches, dup_count) = accumulator.take_batches();
                let original_bytes: usize = source_files.iter().map(|f| f.size).sum();
                let output_bytes = measure_output_size(
                    &batches,
                    schema.as_ref().ok_or(Error::Internal(
                        "schema not initialized during compaction".into(),
                    ))?,
                    config.compression,
                    config.row_group_size,
                )?;
                let record_count: usize = batches.iter().map(|b| b.num_rows()).sum();
                files_created += 1;
                records_consolidated += record_count;
                bytes_saved += original_bytes.saturating_sub(output_bytes);
                duplicate_records_eliminated += dup_count;
                source_files = Vec::new();
            }
            accumulator.add_batch(batch_result, config.enable_deduplication)?;
        }
        if !file_had_records {
            info!("Skipping file with no records: {}", file_meta.key);
        }
        source_files.push(file_meta.clone());
        files_processed += 1;
        last_processed_timestamp = Some(file_meta.timestamp);
    }
    // Measure the final, below-cap remainder.
    if !accumulator.is_empty() {
        let (batches, dup_count) = accumulator.take_batches();
        let original_bytes: usize = source_files.iter().map(|f| f.size).sum();
        let output_bytes = measure_output_size(
            &batches,
            schema.as_ref().ok_or(CompactionError::NoSourceFiles)?,
            config.compression,
            config.row_group_size,
        )?;
        let record_count: usize = batches.iter().map(|b| b.num_rows()).sum();
        files_created += 1;
        records_consolidated += record_count;
        bytes_saved += original_bytes.saturating_sub(output_bytes);
        duplicate_records_eliminated += dup_count;
    }
    info!(
        "Schema-agnostic compaction plan complete: {} files -> {} files, {} records, {} duplicates, ~{} bytes saved",
        files_processed,
        files_created,
        records_consolidated,
        duplicate_records_eliminated,
        bytes_saved
    );
    Ok(CompactionResult {
        files_processed,
        files_created,
        records_consolidated,
        bytes_saved,
        duplicate_records_eliminated,
        last_processed_timestamp,
        // Planning never deletes anything, so there are no failures to report.
        deletion_failures: Vec::new(),
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `empty()` must be all-zero with no timestamp and no failures.
    #[test]
    fn test_compaction_result_empty() {
        let result = CompactionResult::empty();
        assert_eq!(
            (
                result.files_processed,
                result.files_created,
                result.records_consolidated,
                result.bytes_saved,
                result.duplicate_records_eliminated,
            ),
            (0, 0, 0, 0, 0)
        );
        assert!(result.last_processed_timestamp.is_none());
        assert!(result.deletion_failures.is_empty());
    }

    /// Plain construction sanity check for the internal counters struct.
    #[test]
    fn test_finalize_result() {
        let result = FinalizeResult {
            records_count: 100,
            bytes_saved: 1024,
            duplicate_count: 0,
            deletion_failures: Vec::new(),
        };
        assert_eq!(result.records_count, 100);
        assert_eq!(result.bytes_saved, 1024);
        assert_eq!(result.duplicate_count, 0);
        assert!(result.deletion_failures.is_empty());
    }
}