use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::OnceLock;
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
use delta_kernel::expressions::Scalar;
use delta_kernel::table_properties::DataSkippingNumIndexedCols;
use futures::{StreamExt, TryStreamExt};
use indexmap::IndexMap;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::async_writer::ParquetObjectWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use tokio::task::JoinSet;
use tracing::*;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, PartitionsExt};
use crate::logstore::ObjectStoreRef;
use crate::parquet_utils::default_writer_properties;
use crate::writer::record_batch::{PartitionResult, divide_by_partition_values};
use crate::writer::stats::create_add;
use crate::writer::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
};
use parquet::file::metadata::ParquetMetaData;
/// Rows handed to the Arrow writer per slice when no write batch size is configured.
const DEFAULT_WRITE_BATCH_SIZE: usize = 1_024;
/// Default, and minimum accepted, multipart upload part size: 5 MiB.
const DEFAULT_UPLOAD_PART_SIZE: usize = 5 * 1024 * 1024;
/// Upload concurrency used when `DELTARS_MAX_CONCURRENCY_TASKS` is unset or unparsable.
const DEFAULT_MAX_CONCURRENCY_TASKS: usize = 10;
/// Returns the multipart upload part size in bytes.
///
/// The value is read once from the `DELTARS_UPLOAD_PART_SIZE` environment variable and cached
/// for the rest of the process. A missing or unparsable value yields
/// `DEFAULT_UPLOAD_PART_SIZE` (5 MiB). A parsed value is kept within [5 MiB, 5 GiB]: values
/// below the minimum fall back to the default, and values above the maximum are capped. Both
/// cases emit a debug log.
fn upload_part_size() -> usize {
    const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5;
    static UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();

    *UPLOAD_SIZE.get_or_init(|| {
        let Some(requested) = std::env::var("DELTARS_UPLOAD_PART_SIZE")
            .ok()
            .and_then(|raw| raw.parse::<usize>().ok())
        else {
            return DEFAULT_UPLOAD_PART_SIZE;
        };

        if requested < DEFAULT_UPLOAD_PART_SIZE {
            debug!("DELTARS_UPLOAD_PART_SIZE must be at least 5MB, therefore falling back on default of 5MB.");
            DEFAULT_UPLOAD_PART_SIZE
        } else if requested > MAX_UPLOAD_PART_SIZE {
            debug!("DELTARS_UPLOAD_PART_SIZE must not be higher than 5GB, therefore capping it at 5GB.");
            MAX_UPLOAD_PART_SIZE
        } else {
            requested
        }
    })
}
/// Returns the maximum number of concurrent part uploads per file.
///
/// The value is read once from `DELTARS_MAX_CONCURRENCY_TASKS` and cached. It falls back to
/// `DEFAULT_MAX_CONCURRENCY_TASKS` when the variable is missing or not a valid `usize`.
fn get_max_concurrency_tasks() -> usize {
    static MAX_CONCURRENCY_TASKS: OnceLock<usize> = OnceLock::new();

    *MAX_CONCURRENCY_TASKS.get_or_init(|| {
        match std::env::var("DELTARS_MAX_CONCURRENCY_TASKS").map(|raw| raw.parse::<usize>()) {
            Ok(Ok(tasks)) => tasks,
            _ => DEFAULT_MAX_CONCURRENCY_TASKS,
        }
    })
}
/// Finishes a Parquet file and completes its upload.
///
/// Returns the destination path, the total number of bytes written, and the Parquet footer
/// metadata. The row count and byte size are recorded on the current tracing span.
#[instrument(skip(arrow_writer), fields(rows = 0, size = 0))]
async fn upload_parquet_file(
    mut arrow_writer: AsyncArrowWriter<ParquetObjectWriter>,
    path: Path,
) -> DeltaResult<(Path, usize, ParquetMetaData)> {
    // `finish` flushes any buffered row group, writes the footer and closes the sink.
    let metadata = arrow_writer.finish().await?;
    let bytes = arrow_writer.bytes_written();

    let span = Span::current();
    span.record("rows", metadata.file_metadata().num_rows());
    span.record("size", bytes);
    debug!("multipart upload completed successfully");

    Ok((path, bytes, metadata))
}
/// Orders completed uploads by destination path.
///
/// Background uploads finish in arbitrary order, so sorting gives a deterministic order for the
/// resulting add actions.
fn sort_completed_writes_by_path<T>(results: &mut [(Path, usize, T)]) {
    results.sort_unstable_by(|(left, ..), (right, ..)| left.cmp(right));
}
/// Errors raised by the writers in this module.
///
/// Converted into [`DeltaTableError`] via the `From` impl below.
#[derive(thiserror::Error, Debug)]
enum WriteError {
    /// A record batch's schema did not equal the writer's expected file schema.
    #[error("Unexpected Arrow schema: got: {schema}, expected: {expected_schema}")]
    SchemaMismatch {
        schema: ArrowSchemaRef,
        expected_schema: ArrowSchemaRef,
    },
    /// Building an `Add` action from a finished file's metadata failed.
    #[error("Error creating add action: {source}")]
    CreateAdd {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
    /// An Arrow operation failed.
    #[error("Error handling Arrow data: {source}")]
    Arrow {
        #[from]
        source: ArrowError,
    },
    /// Splitting a batch by partition values failed.
    ///
    /// Holds only the rendered message of the underlying error.
    #[error("Error partitioning record batch: {0}")]
    Partitioning(String),
}
impl From<WriteError> for DeltaTableError {
    /// Maps a writer error onto the table-level error type.
    ///
    /// - Schema mismatches keep their rendered message.
    /// - Arrow errors are forwarded unchanged.
    /// - All remaining variants are boxed as a generic error.
    fn from(err: WriteError) -> Self {
        // Exhaustive on purpose: adding a `WriteError` variant must force a mapping decision here
        // instead of silently falling into the generic bucket.
        match err {
            WriteError::SchemaMismatch { .. } => DeltaTableError::SchemaMismatch {
                msg: err.to_string(),
            },
            WriteError::Arrow { source } => DeltaTableError::Arrow { source },
            WriteError::CreateAdd { .. } | WriteError::Partitioning(_) => {
                DeltaTableError::GenericError {
                    source: Box::new(err),
                }
            }
        }
    }
}
/// Configuration for a [`DeltaWriter`].
#[derive(Debug, Clone)]
pub struct WriterConfig {
    /// Arrow schema of the table.
    ///
    /// `file_schema` derives the on-disk schema by removing `partition_columns` from it.
    table_schema: ArrowSchemaRef,
    /// Names of the columns used to partition the data.
    partition_columns: Vec<String>,
    /// Parquet writer properties applied to every data file.
    writer_properties: WriterProperties,
    /// When set, each partition writer starts a new file once its estimated size reaches this
    /// many bytes.
    target_file_size: Option<NonZeroU64>,
    /// Maximum number of rows handed to the Parquet writer in one slice.
    write_batch_size: usize,
    /// Forwarded to `create_add` when statistics are collected for add actions.
    num_indexed_cols: DataSkippingNumIndexedCols,
    /// Optional explicit list of columns to collect statistics for; forwarded to `create_add`.
    stats_columns: Option<Vec<String>>,
}
impl WriterConfig {
    /// Builds a writer configuration.
    ///
    /// Defaults applied when an argument is `None`:
    /// - `writer_properties`: `default_writer_properties(Compression::SNAPPY)`.
    /// - `write_batch_size`: `DEFAULT_WRITE_BATCH_SIZE`.
    pub fn new(
        table_schema: ArrowSchemaRef,
        partition_columns: Vec<String>,
        writer_properties: Option<WriterProperties>,
        target_file_size: Option<NonZeroU64>,
        write_batch_size: Option<usize>,
        num_indexed_cols: DataSkippingNumIndexedCols,
        stats_columns: Option<Vec<String>>,
    ) -> Self {
        Self {
            writer_properties: writer_properties
                .unwrap_or_else(|| default_writer_properties(Compression::SNAPPY)),
            write_batch_size: write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE),
            table_schema,
            partition_columns,
            target_file_size,
            num_indexed_cols,
            stats_columns,
        }
    }

    /// Returns the schema of the data files: the table schema without the partition columns.
    pub fn file_schema(&self) -> ArrowSchemaRef {
        let partitions = &self.partition_columns;
        arrow_schema_without_partitions(&self.table_schema, partitions)
    }
}
/// Writes record batches as Parquet data files.
///
/// Input is split by partition value, and one [`PartitionWriter`] is kept per partition path.
pub struct DeltaWriter {
    /// Store the data files are uploaded to.
    object_store: ObjectStoreRef,
    /// Writer configuration shared by all partition writers.
    config: WriterConfig,
    /// Active writers, keyed by the hive-style partition path.
    partition_writers: HashMap<Path, PartitionWriter>,
}
impl DeltaWriter {
pub fn new(object_store: ObjectStoreRef, config: WriterConfig) -> Self {
Self {
object_store,
config,
partition_writers: HashMap::new(),
}
}
pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
self.config.writer_properties = writer_properties;
self
}
fn divide_by_partition_values(
&mut self,
values: &RecordBatch,
) -> DeltaResult<Vec<PartitionResult>> {
Ok(divide_by_partition_values(
self.config.file_schema(),
self.config.partition_columns.clone(),
values,
)
.map_err(|err| WriteError::Partitioning(err.to_string()))?)
}
pub async fn write_partition(
&mut self,
record_batch: RecordBatch,
partition_values: &IndexMap<String, Scalar>,
) -> DeltaResult<()> {
let partition_key = Path::parse(partition_values.hive_partition_path())?;
let record_batch =
record_batch_without_partitions(&record_batch, &self.config.partition_columns)?;
match self.partition_writers.get_mut(&partition_key) {
Some(writer) => {
writer.write(&record_batch).await?;
}
None => {
let config = PartitionWriterConfig::try_new(
self.config.file_schema(),
partition_values.clone(),
Some(self.config.writer_properties.clone()),
self.config.target_file_size,
Some(self.config.write_batch_size),
None,
)?;
let mut writer = PartitionWriter::try_with_config(
self.object_store.clone(),
config,
self.config.num_indexed_cols,
self.config.stats_columns.clone(),
)?;
writer.write(&record_batch).await?;
let _ = self.partition_writers.insert(partition_key, writer);
}
}
Ok(())
}
pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
for result in self.divide_by_partition_values(batch)? {
self.write_partition(result.record_batch, &result.partition_values)
.await?;
}
Ok(())
}
pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
let writers = std::mem::take(&mut self.partition_writers);
let actions = futures::stream::iter(writers)
.map(|(_, writer)| async move {
let writer_actions = writer.close().await?;
Ok::<_, DeltaTableError>(writer_actions)
})
.buffered(num_cpus::get())
.try_fold(Vec::new(), |mut acc, actions| {
acc.extend(actions);
futures::future::ready(Ok(acc))
})
.await?;
Ok(actions)
}
}
/// Configuration for a single [`PartitionWriter`].
#[derive(Debug, Clone)]
pub struct PartitionWriterConfig {
    /// Schema every written batch must match exactly; excludes partition columns.
    file_schema: ArrowSchemaRef,
    /// Hive-style directory prefix derived from `partition_values`.
    prefix: Path,
    /// Partition values recorded on every add action this writer produces.
    partition_values: IndexMap<String, Scalar>,
    /// Parquet writer properties for each data file.
    writer_properties: WriterProperties,
    /// When set, a new file is started once the estimated size reaches this many bytes.
    target_file_size: Option<NonZeroU64>,
    /// Maximum number of rows handed to the Parquet writer in one slice.
    write_batch_size: usize,
    /// Concurrency limit passed to the object-store `BufWriter` for each file.
    max_concurrency_tasks: usize,
}
impl PartitionWriterConfig {
    /// Builds a partition writer configuration.
    ///
    /// Defaults applied when an argument is `None`:
    /// - `writer_properties`: `default_writer_properties(Compression::SNAPPY)`.
    /// - `write_batch_size`: `DEFAULT_WRITE_BATCH_SIZE`.
    /// - `max_concurrency_tasks`: `get_max_concurrency_tasks()`.
    ///
    /// # Errors
    /// Fails if the hive partition path built from `partition_values` is not a valid object
    /// store path.
    pub fn try_new(
        file_schema: ArrowSchemaRef,
        partition_values: IndexMap<String, Scalar>,
        writer_properties: Option<WriterProperties>,
        target_file_size: Option<NonZeroU64>,
        write_batch_size: Option<usize>,
        max_concurrency_tasks: Option<usize>,
    ) -> DeltaResult<Self> {
        // The prefix borrows `partition_values`, so compute it before the map moves into `Self`.
        let prefix = Path::parse(partition_values.hive_partition_path())?;
        Ok(Self {
            file_schema,
            prefix,
            partition_values,
            writer_properties: writer_properties
                .unwrap_or_else(|| default_writer_properties(Compression::SNAPPY)),
            target_file_size,
            write_batch_size: write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE),
            max_concurrency_tasks: max_concurrency_tasks.unwrap_or_else(get_max_concurrency_tasks),
        })
    }
}
/// A Parquet file writer that is only opened when the first batch arrives.
enum LazyArrowWriter {
    /// Nothing written yet.
    ///
    /// Holds everything needed to open the writer for `Path` on the first `write_batch`.
    Initialized(Path, ObjectStoreRef, PartitionWriterConfig),
    /// At least one batch written; holds the active writer for `Path`.
    Writing(Path, AsyncArrowWriter<ParquetObjectWriter>),
}
impl LazyArrowWriter {
async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
match self {
LazyArrowWriter::Initialized(path, object_store, config) => {
let writer = ParquetObjectWriter::from_buf_writer(
BufWriter::with_capacity(
object_store.clone(),
path.clone(),
upload_part_size(),
)
.with_max_concurrency(config.max_concurrency_tasks),
);
let mut arrow_writer = AsyncArrowWriter::try_new(
writer,
config.file_schema.clone(),
Some(config.writer_properties.clone()),
)?;
arrow_writer.write(batch).await?;
*self = LazyArrowWriter::Writing(path.clone(), arrow_writer);
}
LazyArrowWriter::Writing(_, arrow_writer) => {
arrow_writer.write(batch).await?;
}
}
Ok(())
}
fn estimated_size(&self) -> usize {
match self {
LazyArrowWriter::Initialized(_, _, _) => 0,
LazyArrowWriter::Writing(_, arrow_writer) => {
arrow_writer.bytes_written() + arrow_writer.in_progress_size()
}
}
}
}
/// Writes the data files of a single partition.
///
/// When a target file size is configured, output rolls over to a new file once that size is
/// reached, and finished files are uploaded in the background.
pub struct PartitionWriter {
    /// Store the data files are uploaded to.
    object_store: ObjectStoreRef,
    /// Random id embedded in every file path this writer produces.
    writer_id: uuid::Uuid,
    /// Partition-level configuration.
    config: PartitionWriterConfig,
    /// Writer for the file currently being filled.
    writer: LazyArrowWriter,
    /// Index of the current file; incremented on every rollover.
    part_counter: usize,
    /// Forwarded to `create_add` when building add actions.
    num_indexed_cols: DataSkippingNumIndexedCols,
    /// Forwarded to `create_add` when building add actions.
    stats_columns: Option<Vec<String>>,
    /// Background upload tasks for finished files; awaited in `close`.
    in_flight_writers: JoinSet<DeltaResult<(Path, usize, ParquetMetaData)>>,
}
impl PartitionWriter {
pub fn try_with_config(
object_store: ObjectStoreRef,
config: PartitionWriterConfig,
num_indexed_cols: DataSkippingNumIndexedCols,
stats_columns: Option<Vec<String>>,
) -> DeltaResult<Self> {
let writer_id = uuid::Uuid::new_v4();
let first_path = next_data_path(&config.prefix, 0, &writer_id, &config.writer_properties);
let writer = Self::create_writer(object_store.clone(), first_path.clone(), &config)?;
Ok(Self {
object_store,
writer_id,
config,
writer,
part_counter: 0,
num_indexed_cols,
stats_columns,
in_flight_writers: JoinSet::new(),
})
}
fn create_writer(
object_store: ObjectStoreRef,
path: Path,
config: &PartitionWriterConfig,
) -> DeltaResult<LazyArrowWriter> {
let state = LazyArrowWriter::Initialized(path, object_store.clone(), config.clone());
Ok(state)
}
fn next_data_path(&mut self) -> Path {
self.part_counter += 1;
next_data_path(
&self.config.prefix,
self.part_counter,
&self.writer_id,
&self.config.writer_properties,
)
}
fn reset_writer(&mut self) -> DeltaResult<()> {
let next_path = self.next_data_path();
let new_writer = Self::create_writer(self.object_store.clone(), next_path, &self.config)?;
let state = std::mem::replace(&mut self.writer, new_writer);
if let LazyArrowWriter::Writing(path, arrow_writer) = state {
self.in_flight_writers
.spawn(upload_parquet_file(arrow_writer, path));
}
Ok(())
}
pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
if batch.schema() != self.config.file_schema {
return Err(WriteError::SchemaMismatch {
schema: batch.schema(),
expected_schema: self.config.file_schema.clone(),
}
.into());
}
let max_offset = batch.num_rows();
for offset in (0..max_offset).step_by(self.config.write_batch_size) {
let length = usize::min(self.config.write_batch_size, max_offset - offset);
self.writer
.write_batch(&batch.slice(offset, length))
.await?;
if let Some(target_file_size) = self.config.target_file_size {
let estimated_size = self.writer.estimated_size();
if estimated_size as u64 >= target_file_size.get() {
debug!("Writing file with estimated size {estimated_size:?} in background.");
self.reset_writer()?;
}
}
}
Ok(())
}
pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
if let LazyArrowWriter::Writing(path, arrow_writer) = self.writer {
self.in_flight_writers
.spawn(upload_parquet_file(arrow_writer, path));
}
let mut results = Vec::new();
while let Some(result) = self.in_flight_writers.join_next().await {
match result {
Ok(Ok(data)) => results.push(data),
Ok(Err(e)) => {
return Err(e);
}
Err(e) => {
return Err(DeltaTableError::GenericError {
source: Box::new(e),
});
}
}
}
sort_completed_writes_by_path(&mut results);
let adds = results
.into_iter()
.map(|(path, file_size, metadata)| {
create_add(
&self.config.partition_values,
path.to_string(),
file_size as i64,
&metadata,
self.num_indexed_cols,
&self.stats_columns,
)
.map_err(|err| WriteError::CreateAdd {
source: Box::new(err),
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(adds)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DeltaTableBuilder;
    use crate::crate_version;
    use crate::logstore::tests::flatten_list_stream as list;
    use crate::table::config::DEFAULT_NUM_INDEX_COLS;
    use crate::writer::test_utils::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use object_store::ObjectStoreExt as _;
    use parquet::schema::types::ColumnPath;
    use std::sync::Arc;

    /// Builds an unpartitioned `DeltaWriter` whose table schema is `batch`'s schema.
    fn get_delta_writer(
        object_store: ObjectStoreRef,
        batch: &RecordBatch,
        writer_properties: Option<WriterProperties>,
        target_file_size: Option<NonZeroU64>,
        write_batch_size: Option<usize>,
    ) -> DeltaWriter {
        let config = WriterConfig::new(
            batch.schema(),
            vec![],
            writer_properties,
            target_file_size,
            write_batch_size,
            DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEX_COLS),
            None,
        );
        DeltaWriter::new(object_store, config)
    }

    /// Builds a `PartitionWriter` for the root (no partition values) using `batch`'s schema.
    fn get_partition_writer(
        object_store: ObjectStoreRef,
        batch: &RecordBatch,
        writer_properties: Option<WriterProperties>,
        target_file_size: Option<NonZeroU64>,
        write_batch_size: Option<usize>,
    ) -> PartitionWriter {
        let config = PartitionWriterConfig::try_new(
            batch.schema(),
            IndexMap::new(),
            writer_properties,
            target_file_size,
            write_batch_size,
            None,
        )
        .unwrap();
        PartitionWriter::try_with_config(
            object_store,
            config,
            DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEX_COLS),
            None,
        )
        .unwrap()
    }

    /// Asserts the `created_by` field carries the delta-rs version string.
    fn assert_default_created_by(writer_properties: &WriterProperties) {
        assert_eq!(
            writer_properties.created_by(),
            format!("delta-rs version {}", crate_version())
        );
    }

    // Default `WriterConfig` properties: delta-rs `created_by` and SNAPPY compression.
    #[test]
    fn test_writer_config_defaults_include_delta_rs_created_by() {
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "id",
            DataType::Int32,
            true,
        )]));
        let config = WriterConfig::new(
            schema,
            vec![],
            None,
            None,
            None,
            DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEX_COLS),
            None,
        );
        assert_default_created_by(&config.writer_properties);
        assert_eq!(
            config
                .writer_properties
                .compression(&ColumnPath::from("id")),
            Compression::SNAPPY
        );
    }

    // Same defaults, via `PartitionWriterConfig::try_new`.
    #[test]
    fn test_partition_writer_config_defaults_include_delta_rs_created_by() {
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "id",
            DataType::Int32,
            true,
        )]));
        let config =
            PartitionWriterConfig::try_new(schema, IndexMap::new(), None, None, None, None)
                .unwrap();
        assert_default_created_by(&config.writer_properties);
        assert_eq!(
            config
                .writer_properties
                .compression(&ColumnPath::from("id")),
            Compression::SNAPPY
        );
    }

    // No file is visible in the store before `close`; afterwards exactly one file exists and
    // its size matches the add action.
    #[tokio::test]
    async fn test_write_partition() {
        let log_store = DeltaTableBuilder::from_url(url::Url::parse("memory:///").unwrap())
            .unwrap()
            .build_storage()
            .unwrap();
        let object_store = log_store.object_store(None);
        let batch = get_record_batch(None, false);
        let mut writer = get_partition_writer(object_store.clone(), &batch, None, None, None);
        writer.write(&batch).await.unwrap();
        let files = list(object_store.as_ref(), None).await.unwrap();
        assert_eq!(files.len(), 0);
        let adds = writer.close().await.unwrap();
        let files = list(object_store.as_ref(), None).await.unwrap();
        assert_eq!(files.len(), 1);
        assert_eq!(files.len(), adds.len());
        let head = object_store
            .head(&Path::from(adds[0].path.clone()))
            .await
            .unwrap();
        assert_eq!(head.size, adds[0].size as u64)
    }

    // With small row groups and a 10 kB target, output rolls over into several files, all but
    // (at most) the last exceeding the target.
    #[tokio::test]
    async fn test_write_partition_with_parts() {
        let base_int = Arc::new(Int32Array::from((0..10000).collect::<Vec<i32>>()));
        let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ]));
        let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap();
        let object_store = DeltaTableBuilder::from_url(url::Url::parse("memory:///").unwrap())
            .unwrap()
            .build_storage()
            .unwrap()
            .object_store(None);
        let properties = WriterProperties::builder()
            .set_max_row_group_size(1024)
            .build();
        let mut writer = get_partition_writer(
            object_store,
            &batch,
            Some(properties),
            Some(NonZeroU64::new(10_000).unwrap()),
            None,
        );
        writer.write(&batch).await.unwrap();
        let adds = writer.close().await.unwrap();
        assert!(adds.len() > 1);
        let target_file_count = adds
            .iter()
            .fold(0, |acc, add| acc + (add.size > 10_000) as i32);
        assert!(target_file_count >= adds.len() as i32 - 1)
    }

    // Same rollover expectation with default writer properties.
    //
    // The estimated size includes the in-progress (unflushed) row group, so rollover still
    // happens without small row groups.
    #[tokio::test]
    async fn test_unflushed_row_group_size() {
        let base_int = Arc::new(Int32Array::from((0..10000).collect::<Vec<i32>>()));
        let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ]));
        let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap();
        let object_store = DeltaTableBuilder::from_url(url::Url::parse("memory:///").unwrap())
            .unwrap()
            .build_storage()
            .unwrap()
            .object_store(None);
        let mut writer = get_partition_writer(
            object_store,
            &batch,
            None,
            Some(NonZeroU64::new(10_000).unwrap()),
            None,
        );
        writer.write(&batch).await.unwrap();
        let adds = writer.close().await.unwrap();
        assert!(adds.len() > 1);
        let target_file_count = adds
            .iter()
            .fold(0, |acc, add| acc + (add.size > 10_000) as i32);
        assert!(target_file_count >= adds.len() as i32 - 1)
    }

    // A single 10 000-row slice may trigger a rollover right at the end. The fresh, still
    // unopened writer must not produce an extra empty file on close.
    #[tokio::test]
    async fn test_do_not_write_empty_file_on_close() {
        let base_int = Arc::new(Int32Array::from((0..10000_i32).collect::<Vec<i32>>()));
        let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ]));
        let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap();
        let object_store = DeltaTableBuilder::from_url(url::Url::parse("memory:///").unwrap())
            .unwrap()
            .build_storage()
            .unwrap()
            .object_store(None);
        let mut writer = get_partition_writer(
            object_store,
            &batch,
            None,
            Some(NonZeroU64::new(9000).unwrap()),
            Some(10000),
        );
        writer.write(&batch).await.unwrap();
        let adds = writer.close().await.unwrap();
        assert_eq!(adds.len(), 1);
    }

    // Completed writes are ordered by path regardless of their input order.
    #[test]
    fn test_sort_completed_writes_by_path() {
        let mut results = vec![
            (Path::from("part-00002.parquet"), 3, 2_u8),
            (Path::from("part-00000.parquet"), 1, 0_u8),
            (Path::from("part-00001.parquet"), 2, 1_u8),
        ];
        sort_completed_writes_by_path(&mut results);
        let ordered_paths = results
            .iter()
            .map(|(path, _, _)| path.as_ref())
            .collect::<Vec<_>>();
        assert_eq!(
            ordered_paths,
            vec![
                "part-00000.parquet",
                "part-00001.parquet",
                "part-00002.parquet"
            ]
        );
    }

    // A second batch with a different schema is rejected with `DeltaTableError::SchemaMismatch`.
    #[tokio::test]
    async fn test_write_mismatched_schema() {
        let log_store = DeltaTableBuilder::from_url(url::Url::parse("memory:///").unwrap())
            .unwrap()
            .build_storage()
            .unwrap();
        let object_store = log_store.object_store(None);
        let batch = get_record_batch(None, false);
        let mut writer = get_delta_writer(object_store.clone(), &batch, None, None, None);
        writer.write(&batch).await.unwrap();
        let files = list(object_store.as_ref(), None).await.unwrap();
        assert_eq!(files.len(), 0);
        let second_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let second_batch = RecordBatch::try_new(
            second_schema,
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2)])),
                Arc::new(StringArray::from(vec![Some("will"), Some("robert")])),
            ],
        )
        .unwrap();
        let result = writer.write(&second_batch).await;
        assert!(result.is_err());
        match result {
            Ok(_) => {
                panic!("Should not have successfully written");
            }
            Err(e) => {
                match e {
                    DeltaTableError::SchemaMismatch { .. } => {
                        // Expected error variant.
                    }
                    others => {
                        panic!("Got the wrong error: {others:?}");
                    }
                }
            }
        };
    }
}