use crate::arrow::format::{create_format_writer, FormatFileWriter};
use crate::io::FileIO;
use crate::spec::stats::BinaryTableStats;
use crate::spec::{DataFileMeta, EMPTY_SERIALIZED_ROW};
use crate::Result;
use arrow_array::RecordBatch;
use chrono::Utc;
use tokio::task::JoinSet;
/// Writes record batches into data files under one table bucket directory.
///
/// A file is opened lazily on the first non-empty batch, rolled once the
/// current writer's `num_bytes()` reaches `target_file_size`, and every
/// finished file is described by a `DataFileMeta` handed out by
/// `prepare_commit`.
pub(crate) struct DataFileWriter {
/// I/O handle used to create the bucket directory and open output files.
file_io: FileIO,
/// Root location of the table; prefix of every bucket directory path.
table_location: String,
/// Path segment placed between the table location and the bucket directory.
/// An empty string means no partition segment is added.
partition_path: String,
/// Bucket number, used to form the `bucket-{n}` directory name.
bucket: i32,
/// Schema id copied into every produced `DataFileMeta`.
schema_id: i64,
/// Roll threshold: compared against the current writer's `num_bytes()`.
target_file_size: i64,
/// Compression setting passed through to `create_format_writer`.
file_compression: String,
/// Zstd level passed through to `create_format_writer`.
file_compression_zstd_level: i32,
/// Flush threshold: compared against the current writer's `in_progress_size()`.
write_buffer_size: i64,
/// Copied unchanged into every produced `DataFileMeta`.
file_source: Option<i32>,
/// Copied unchanged into every produced `DataFileMeta`.
first_row_id: Option<i64>,
/// Copied (cloned) into every produced `DataFileMeta`.
write_cols: Option<Vec<String>>,
/// Metadata of files that are fully closed and not yet returned by
/// `prepare_commit`.
written_files: Vec<DataFileMeta>,
/// Background tasks closing rolled files; each resolves to that file's metadata.
in_flight_closes: JoinSet<Result<DataFileMeta>>,
/// Writer of the currently open file, `None` when no file is open.
current_writer: Option<Box<dyn FormatFileWriter>>,
/// Name (without directory) of the currently open file.
current_file_name: Option<String>,
/// Number of rows written to the currently open file.
current_row_count: i64,
}
impl DataFileWriter {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
file_io: FileIO,
table_location: String,
partition_path: String,
bucket: i32,
schema_id: i64,
target_file_size: i64,
file_compression: String,
file_compression_zstd_level: i32,
write_buffer_size: i64,
file_source: Option<i32>,
first_row_id: Option<i64>,
write_cols: Option<Vec<String>>,
) -> Self {
Self {
file_io,
table_location,
partition_path,
bucket,
schema_id,
target_file_size,
file_compression,
file_compression_zstd_level,
write_buffer_size,
file_source,
first_row_id,
write_cols,
written_files: Vec::new(),
in_flight_closes: JoinSet::new(),
current_writer: None,
current_file_name: None,
current_row_count: 0,
}
}
pub(crate) async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
if batch.num_rows() == 0 {
return Ok(());
}
if self.current_writer.is_none() {
self.open_new_file(batch.schema()).await?;
}
self.current_row_count += batch.num_rows() as i64;
self.current_writer.as_mut().unwrap().write(batch).await?;
if self.current_writer.as_ref().unwrap().num_bytes() as i64 >= self.target_file_size {
self.roll_file();
}
if let Some(w) = self.current_writer.as_mut() {
if w.in_progress_size() as i64 >= self.write_buffer_size {
w.flush().await?;
}
}
Ok(())
}
async fn open_new_file(&mut self, schema: arrow_schema::SchemaRef) -> Result<()> {
let file_name = format!(
"data-{}-{}.parquet",
uuid::Uuid::new_v4(),
self.written_files.len()
);
let bucket_dir = if self.partition_path.is_empty() {
format!("{}/bucket-{}", self.table_location, self.bucket)
} else {
format!(
"{}/{}/bucket-{}",
self.table_location, self.partition_path, self.bucket
)
};
self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;
let file_path = format!("{bucket_dir}/{file_name}");
let output = self.file_io.new_output(&file_path)?;
let writer = create_format_writer(
&output,
schema,
&self.file_compression,
self.file_compression_zstd_level,
)
.await?;
self.current_writer = Some(writer);
self.current_file_name = Some(file_name);
self.current_row_count = 0;
Ok(())
}
pub(crate) async fn close_current_file(&mut self) -> Result<()> {
let writer = match self.current_writer.take() {
Some(w) => w,
None => return Ok(()),
};
let file_name = self.current_file_name.take().unwrap();
let row_count = self.current_row_count;
self.current_row_count = 0;
let file_size = writer.close().await? as i64;
let meta = Self::build_meta(
file_name,
file_size,
row_count,
self.schema_id,
self.file_source,
self.first_row_id,
self.write_cols.clone(),
);
self.written_files.push(meta);
Ok(())
}
fn roll_file(&mut self) {
let writer = match self.current_writer.take() {
Some(w) => w,
None => return,
};
let file_name = self.current_file_name.take().unwrap();
let row_count = self.current_row_count;
self.current_row_count = 0;
let schema_id = self.schema_id;
let file_source = self.file_source;
let first_row_id = self.first_row_id;
let write_cols = self.write_cols.clone();
self.in_flight_closes.spawn(async move {
let file_size = writer.close().await? as i64;
Ok(Self::build_meta(
file_name,
file_size,
row_count,
schema_id,
file_source,
first_row_id,
write_cols,
))
});
}
pub(crate) async fn prepare_commit(&mut self) -> Result<Vec<DataFileMeta>> {
self.close_current_file().await?;
while let Some(result) = self.in_flight_closes.join_next().await {
let meta = result.map_err(|e| crate::Error::DataInvalid {
message: format!("Background file close task panicked: {e}"),
source: None,
})??;
self.written_files.push(meta);
}
Ok(std::mem::take(&mut self.written_files))
}
fn build_meta(
file_name: String,
file_size: i64,
row_count: i64,
schema_id: i64,
file_source: Option<i32>,
first_row_id: Option<i64>,
write_cols: Option<Vec<String>>,
) -> DataFileMeta {
DataFileMeta {
file_name,
file_size,
row_count,
min_key: EMPTY_SERIALIZED_ROW.clone(),
max_key: EMPTY_SERIALIZED_ROW.clone(),
key_stats: BinaryTableStats::new(
EMPTY_SERIALIZED_ROW.clone(),
EMPTY_SERIALIZED_ROW.clone(),
vec![],
),
value_stats: BinaryTableStats::new(
EMPTY_SERIALIZED_ROW.clone(),
EMPTY_SERIALIZED_ROW.clone(),
vec![],
),
min_sequence_number: 0,
max_sequence_number: 0,
schema_id,
level: 0,
extra_files: vec![],
creation_time: Some(Utc::now()),
delete_row_count: Some(0),
embedded_index: None,
file_source,
value_stats_cols: Some(vec![]),
external_path: None,
first_row_id,
write_cols,
}
}
}