use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{
DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use datafusion::common::Result as DFResult;
use datafusion::error::DataFusionError;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
execute_input_stream,
};
use futures::StreamExt;
use iceberg::arrow::FieldMatchMode;
use iceberg::spec::{DataFileFormat, serialize_data_file_to_json};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::{Error, ErrorKind};
use parquet::file::properties::WriterProperties;
use uuid::Uuid;
use crate::physical_plan::DATA_FILES_COL_NAME;
use crate::task_writer::TaskWriter;
use crate::to_datafusion_error;
/// Physical plan node that writes the record batches produced by `input`
/// into data files for an Iceberg `table`.
///
/// Executing a partition yields a single record batch whose only column
/// (`DATA_FILES_COL_NAME`) holds one JSON-serialized data file per row.
#[derive(Debug)]
pub(crate) struct IcebergWriteExec {
/// Table that is written to; also the source of the schema, partition spec,
/// table properties and file IO used by `execute`.
table: Table,
/// Child plan producing the rows to write.
input: Arc<dyn ExecutionPlan>,
/// Schema of the batch emitted by `execute` (a single non-null Utf8 column).
result_schema: ArrowSchemaRef,
/// Plan properties computed once in `new` and handed out by `properties`.
plan_properties: PlanProperties,
}
impl IcebergWriteExec {
    /// Creates a write node that sends the output of `input` to `table`.
    ///
    /// `schema` only seeds the equivalence properties of the plan; the schema
    /// of the emitted batches is always the single-column schema built by
    /// [`Self::make_result_schema`].
    pub fn new(table: Table, input: Arc<dyn ExecutionPlan>, schema: ArrowSchemaRef) -> Self {
        let result_schema = Self::make_result_schema();
        let plan_properties = Self::compute_properties(&input, schema);
        Self {
            table,
            input,
            result_schema,
            plan_properties,
        }
    }

    /// Derives the plan properties: same partition count as `input` (with
    /// unknown partitioning), final emission, bounded.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        schema: ArrowSchemaRef,
    ) -> PlanProperties {
        let partition_count = input.output_partitioning().partition_count();
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(partition_count),
            EmissionType::Final,
            Boundedness::Bounded,
        )
    }

    /// Wraps the serialized data files into a one-column record batch.
    ///
    /// # Errors
    /// Returns `DataFusionError::ArrowError` if the batch cannot be assembled.
    fn make_result_batch(data_files: Vec<String>) -> DFResult<RecordBatch> {
        let files_column = Arc::new(StringArray::from(data_files)) as ArrayRef;
        match RecordBatch::try_new(Self::make_result_schema(), vec![files_column]) {
            Ok(batch) => Ok(batch),
            Err(arrow_err) => Err(DataFusionError::ArrowError(
                Box::new(arrow_err),
                Some("Failed to make result batch".to_string()),
            )),
        }
    }

    /// Schema of the result batch: one non-nullable Utf8 column named
    /// `DATA_FILES_COL_NAME`.
    fn make_result_schema() -> ArrowSchemaRef {
        let data_files_field = Field::new(DATA_FILES_COL_NAME, DataType::Utf8, false);
        Arc::new(ArrowSchema::new(vec![data_files_field]))
    }
}
impl DisplayAs for IcebergWriteExec {
    /// Renders this node for plan display.
    ///
    /// The default and tree renderings show only the table identifier; the
    /// verbose rendering additionally shows the result schema.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        let table_ident = self.table.identifier();
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                write!(f, "IcebergWriteExec: table={}", table_ident)
            }
            DisplayFormatType::Verbose => write!(
                f,
                "IcebergWriteExec: table={}, result_schema={:?}",
                table_ident, self.result_schema
            ),
        }
    }
}
impl ExecutionPlan for IcebergWriteExec {
fn name(&self) -> &str {
"IcebergWriteExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![false]
}
fn maintains_input_order(&self) -> Vec<bool> {
vec![true; self.children().len()]
}
fn properties(&self) -> &PlanProperties {
&self.plan_properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
if children.len() != 1 {
return Err(DataFusionError::Internal(format!(
"IcebergWriteExec expects exactly one child, but provided {}",
children.len()
)));
}
Ok(Arc::new(Self::new(
self.table.clone(),
Arc::clone(&children[0]),
self.schema(),
)))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let partition_type = self.table.metadata().default_partition_type().clone();
let format_version = self.table.metadata().format_version();
let table_props = self
.table
.metadata()
.table_properties()
.map_err(to_datafusion_error)?;
let file_format = DataFileFormat::from_str(&table_props.write_format_default)
.map_err(to_datafusion_error)?;
if file_format != DataFileFormat::Parquet {
return Err(to_datafusion_error(Error::new(
ErrorKind::FeatureUnsupported,
format!("File format {file_format} is not supported for insert_into yet!"),
)));
}
let parquet_file_writer_builder = ParquetWriterBuilder::new_with_match_mode(
WriterProperties::default(),
self.table.metadata().current_schema().clone(),
FieldMatchMode::Name,
);
let target_file_size = table_props.write_target_file_size_bytes;
let file_io = self.table.file_io().clone();
let location_generator = DefaultLocationGenerator::new(self.table.metadata().clone())
.map_err(to_datafusion_error)?;
let file_name_generator =
DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format);
let rolling_writer_builder = RollingFileWriterBuilder::new(
parquet_file_writer_builder,
target_file_size,
file_io,
location_generator,
file_name_generator,
);
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
let fanout_enabled = table_props.write_datafusion_fanout_enabled;
let schema = self.table.metadata().current_schema().clone();
let partition_spec = self.table.metadata().default_partition_spec().clone();
let task_writer = TaskWriter::try_new(
data_file_writer_builder,
fanout_enabled,
schema.clone(),
partition_spec,
)
.map_err(to_datafusion_error)?;
let data = execute_input_stream(
Arc::clone(&self.input),
self.input.schema(), partition,
Arc::clone(&context),
)?;
let stream = futures::stream::once(async move {
let mut task_writer = task_writer;
let mut input_stream = data;
while let Some(batch) = input_stream.next().await {
let batch = batch?;
task_writer
.write(batch)
.await
.map_err(to_datafusion_error)?;
}
let data_files = task_writer.close().await.map_err(to_datafusion_error)?;
let data_files_strs: Vec<String> = data_files
.into_iter()
.map(|data_file| {
serialize_data_file_to_json(data_file, &partition_type, format_version)
.map_err(to_datafusion_error)
})
.collect::<DFResult<Vec<String>>>()?;
Self::make_result_batch(data_files_strs)
})
.boxed();
Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.result_schema),
stream,
)))
}
}
#[cfg(test)]
mod tests {
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{
DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use datafusion::common::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::{StreamExt, stream};
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::{
DataFileFormat, NestedField, PrimitiveType, Schema, Type, deserialize_data_file_from_json,
};
use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use tempfile::TempDir;
use super::*;
/// Leaf execution plan for tests that replays a fixed list of record batches.
struct MockExecutionPlan {
/// Schema reported for the replayed stream.
schema: ArrowSchemaRef,
/// Batches handed out, in order, by every call to `execute`.
batches: Vec<RecordBatch>,
/// Plan properties built in `new` (one partition, bounded, final emission).
properties: PlanProperties,
}
impl MockExecutionPlan {
fn new(schema: ArrowSchemaRef, batches: Vec<RecordBatch>) -> Self {
let properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Final,
Boundedness::Bounded,
);
Self {
schema,
batches,
properties,
}
}
}
impl Debug for MockExecutionPlan {
    /// Prints only the type name; the schema and batches are not shown.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("MockExecutionPlan")
    }
}
impl DisplayAs for MockExecutionPlan {
    /// Every display format renders the same fixed name.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        // The match stays exhaustive so a new format variant fails to compile.
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => f.write_str("MockExecutionPlan"),
        }
    }
}
impl ExecutionPlan for MockExecutionPlan {
fn name(&self) -> &str {
"MockExecutionPlan"
}
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let batches = self.batches.clone();
let stream = stream::iter(batches.into_iter().map(Ok));
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
stream.boxed(),
)))
}
}
/// Returns the path of a freshly created temporary directory as a `String`.
///
/// NOTE(review): the `TempDir` guard is dropped before this function returns,
/// which per `tempfile`'s documented behavior removes the directory again.
/// Callers therefore get a unique path, not a live directory — confirm that
/// this is intended.
///
/// Panics if the directory cannot be created or its path is not valid UTF-8.
fn temp_path() -> String {
    TempDir::new().unwrap().path().to_str().unwrap().to_string()
}
/// Loads a memory catalog named `"memory"` whose warehouse property is set
/// to a fresh temporary path.
///
/// Panics if the catalog cannot be loaded.
async fn get_iceberg_catalog() -> MemoryCatalog {
    let catalog_props = HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), temp_path())]);
    let loaded = MemoryCatalogBuilder::default()
        .load("memory", catalog_props)
        .await;
    loaded.unwrap()
}
/// Builds the Iceberg schema used by the test: schema id 0 with two required
/// fields, `id` (int, field id 1) and `name` (string, field id 2).
fn get_test_schema() -> Result<Schema> {
    let fields = vec![
        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
    ];
    Schema::builder()
        .with_schema_id(0)
        .with_fields(fields)
        .build()
}
/// Assembles a `TableCreation` request for `name` at `location` with the
/// given `schema` and an empty property map.
fn get_table_creation(
    location: impl ToString,
    name: impl ToString,
    schema: Schema,
) -> TableCreation {
    let table_location = location.to_string();
    let table_name = name.to_string();
    TableCreation::builder()
        .location(table_location)
        .name(table_name)
        .properties(HashMap::new())
        .schema(schema)
        .build()
}
#[tokio::test]
async fn test_iceberg_write_exec() -> Result<()> {
let iceberg_catalog = get_iceberg_catalog().await;
let namespace = NamespaceIdent::new("test_namespace".to_string());
iceberg_catalog
.create_namespace(&namespace, HashMap::new())
.await?;
let schema = get_test_schema()?;
let table_name = "test_table";
let table_location = temp_path();
let creation = get_table_creation(table_location, table_name, schema);
let table = iceberg_catalog.create_table(&namespace, creation).await?;
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
]));
let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
let name_array = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) as ArrayRef;
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![id_array, name_array])
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to create record batch: {e}"),
)
})?;
let input_plan = Arc::new(MockExecutionPlan::new(arrow_schema.clone(), vec![
batch.clone(),
]));
let write_exec = IcebergWriteExec::new(table.clone(), input_plan, arrow_schema);
let task_ctx = Arc::new(TaskContext::default());
let stream = write_exec.execute(0, task_ctx).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to execute plan: {e}"),
)
})?;
let mut results = vec![];
let mut stream = stream;
while let Some(batch) = stream.next().await {
results.push(batch.map_err(|e| {
Error::new(ErrorKind::Unexpected, format!("Failed to get batch: {e}"))
})?);
}
assert_eq!(results.len(), 1, "Expected one result batch");
let result_batch = &results[0];
assert_eq!(
result_batch.schema().as_ref(),
&ArrowSchema::new(vec![Field::new(DATA_FILES_COL_NAME, DataType::Utf8, false)])
);
assert_eq!(result_batch.num_rows(), 1, "Expected one data file");
let data_file_json = result_batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.expect("Expected StringArray")
.value(0);
let partition_type = table.metadata().default_partition_type();
let spec_id = table.metadata().default_partition_spec_id();
let schema = table.metadata().current_schema();
let data_file =
deserialize_data_file_from_json(data_file_json, spec_id, partition_type, schema)
.expect("Failed to deserialize data file JSON");
assert_eq!(
data_file.record_count(),
3,
"Expected 3 records in the data file"
);
assert!(
data_file.file_size_in_bytes() > 0,
"File size should be greater than 0"
);
assert_eq!(
data_file.file_format(),
DataFileFormat::Parquet,
"Expected Parquet file format"
);
assert!(
data_file.column_sizes().get(&1).unwrap() > &0,
"Column 1 size should be greater than 0"
);
assert!(
data_file.column_sizes().get(&2).unwrap() > &0,
"Column 2 size should be greater than 0"
);
assert_eq!(
*data_file.value_counts().get(&1).unwrap(),
3,
"Expected 3 values for column 1"
);
assert_eq!(
*data_file.value_counts().get(&2).unwrap(),
3,
"Expected 3 values for column 2"
);
assert!(
data_file.lower_bounds().contains_key(&1) || data_file.lower_bounds().contains_key(&2),
"Expected lower bounds to contain at least one column"
);
assert!(
data_file.upper_bounds().contains_key(&1) || data_file.upper_bounds().contains_key(&2),
"Expected upper bounds to contain at least one column"
);
let file_path = data_file.file_path();
assert!(!file_path.is_empty(), "File path should not be empty");
let file_io = table.file_io();
assert!(file_io.exists(file_path).await?, "Data file should exist");
Ok(())
}
}