use std::{io::Write, sync::Arc};
use arrow_format::ipc::planus::Builder;
use super::{
super::IpcField,
super::ARROW_MAGIC,
common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions},
common_sync::{write_continuation, write_message},
default_ipc_fields, schema, schema_to_bytes,
};
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
/// Writes [`Chunk`]s to `writer` in the Arrow IPC *file* format.
///
/// [`FileWriter::try_new`] writes the header (magic bytes plus padding) and the schema message.
/// Each [`FileWriter::write`] appends any dictionary messages followed by one record-batch
/// message. [`FileWriter::finish`] appends the footer that indexes those messages, then the
/// trailing magic bytes.
pub struct FileWriter<W: Write> {
    /// Destination of every byte produced by this writer.
    writer: W,
    /// Options forwarded to `encode_chunk` on every `write`.
    options: WriteOptions,
    /// Schema serialized into the footer by `finish`.
    schema: Schema,
    /// Fields used when `write` is called without explicit fields; also serialized into the
    /// footer together with `schema`.
    ipc_fields: Vec<IpcField>,
    /// Number of bytes this writer has emitted so far. This is the offset (counted from where
    /// `try_new` started writing) at which the next message begins, and it is recorded in
    /// each `Block`.
    block_offsets: usize,
    /// One entry per dictionary message written; moved into the footer by `finish`.
    dictionary_blocks: Vec<arrow_format::ipc::Block>,
    /// One entry per record-batch message written; moved into the footer by `finish`.
    record_blocks: Vec<arrow_format::ipc::Block>,
    /// Set once `finish` has succeeded; `write` returns an error afterwards.
    finished: bool,
    /// Passed to `encode_chunk` on every `write`, so its state persists across batches.
    /// NOTE(review): constructed with `DictionaryTracker::new(true)`; the meaning of that flag
    /// is defined outside this file — confirm before documenting it further.
    dictionary_tracker: DictionaryTracker,
}
impl<W: Write> FileWriter<W> {
pub fn try_new(
mut writer: W,
schema: &Schema,
ipc_fields: Option<Vec<IpcField>>,
options: WriteOptions,
) -> Result<Self> {
writer.write_all(&ARROW_MAGIC[..])?;
writer.write_all(&[0, 0])?;
let ipc_fields = if let Some(ipc_fields) = ipc_fields {
ipc_fields
} else {
default_ipc_fields(&schema.fields)
};
let encoded_message = EncodedData {
ipc_message: schema_to_bytes(schema, &ipc_fields),
arrow_data: vec![],
};
let (meta, data) = write_message(&mut writer, encoded_message)?;
Ok(Self {
writer,
options,
schema: schema.clone(),
ipc_fields,
block_offsets: meta + data + 8,
dictionary_blocks: vec![],
record_blocks: vec![],
finished: false,
dictionary_tracker: DictionaryTracker::new(true),
})
}
pub fn into_inner(self) -> W {
self.writer
}
pub fn write(
&mut self,
columns: &Chunk<Arc<dyn Array>>,
ipc_fields: Option<&[IpcField]>,
) -> Result<()> {
if self.finished {
return Err(ArrowError::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Cannot write to a finished file".to_string(),
)));
}
let ipc_fields = if let Some(ipc_fields) = ipc_fields {
ipc_fields
} else {
self.ipc_fields.as_ref()
};
let (encoded_dictionaries, encoded_message) = encode_chunk(
columns,
ipc_fields,
&mut self.dictionary_tracker,
&self.options,
)?;
for encoded_dictionary in encoded_dictionaries {
let (meta, data) = write_message(&mut self.writer, encoded_dictionary)?;
let block = arrow_format::ipc::Block {
offset: self.block_offsets as i64,
meta_data_length: meta as i32,
body_length: data as i64,
};
self.dictionary_blocks.push(block);
self.block_offsets += meta + data;
}
let (meta, data) = write_message(&mut self.writer, encoded_message)?;
let block = arrow_format::ipc::Block {
offset: self.block_offsets as i64,
meta_data_length: meta as i32, body_length: data as i64,
};
self.record_blocks.push(block);
self.block_offsets += meta + data;
Ok(())
}
pub fn finish(&mut self) -> Result<()> {
write_continuation(&mut self.writer, 0)?;
let schema = schema::serialize_schema(&self.schema, &self.ipc_fields);
let root = arrow_format::ipc::Footer {
version: arrow_format::ipc::MetadataVersion::V5,
schema: Some(Box::new(schema)),
dictionaries: Some(std::mem::take(&mut self.dictionary_blocks)),
record_batches: Some(std::mem::take(&mut self.record_blocks)),
custom_metadata: None,
};
let mut builder = Builder::new();
let footer_data = builder.finish(&root, None);
self.writer.write_all(footer_data)?;
self.writer
.write_all(&(footer_data.len() as i32).to_le_bytes())?;
self.writer.write_all(&ARROW_MAGIC)?;
self.writer.flush()?;
self.finished = true;
Ok(())
}
}