use std::io::Write;
use arrow_format::ipc;
use arrow_format::ipc::flatbuffers::FlatBufferBuilder;
use super::super::ARROW_MAGIC;
use super::{
super::convert,
common::{
encoded_batch, write_continuation, write_message, DictionaryTracker, EncodedData,
IpcWriteOptions,
},
schema_to_bytes,
};
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
/// Writer that serializes record batches into `writer` using the Arrow IPC
/// file layout: a header and schema message written at construction, one
/// message per dictionary / record batch, and a footer written by `finish`.
pub struct FileWriter<W: Write> {
/// Destination that all bytes are written to.
writer: W,
/// Options passed to every message-encoding and message-writing call.
write_options: IpcWriteOptions,
/// Copy of the schema given at construction; serialized again into the footer.
schema: Schema,
/// Running byte offset at which the next message block will be recorded.
/// Initialized after the header and schema message, then advanced by the
/// metadata + body length of every message written.
block_offsets: usize,
/// Location blocks of the dictionary messages written so far (stored in the footer).
dictionary_blocks: Vec<ipc::File::Block>,
/// Location blocks of the record-batch messages written so far (stored in the footer).
record_blocks: Vec<ipc::File::Block>,
/// Set to `true` by `finish`; once set, `write` refuses further batches.
finished: bool,
/// State handed to `encoded_batch` so it can decide which dictionaries to emit.
dictionary_tracker: DictionaryTracker,
}
impl<W: Write> FileWriter<W> {
pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
let write_options = IpcWriteOptions::default();
Self::try_new_with_options(writer, schema, write_options)
}
pub fn try_new_with_options(
mut writer: W,
schema: &Schema,
write_options: IpcWriteOptions,
) -> Result<Self> {
writer.write_all(&ARROW_MAGIC[..])?;
writer.write_all(&[0, 0])?;
let encoded_message = EncodedData {
ipc_message: schema_to_bytes(schema, *write_options.metadata_version()),
arrow_data: vec![],
};
let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
Ok(Self {
writer,
write_options,
schema: schema.clone(),
block_offsets: meta + data + 8,
dictionary_blocks: vec![],
record_blocks: vec![],
finished: false,
dictionary_tracker: DictionaryTracker::new(true),
})
}
pub fn into_inner(self) -> W {
self.writer
}
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
if self.finished {
return Err(ArrowError::Ipc(
"Cannot write record batch to file writer as it is closed".to_string(),
));
}
let (encoded_dictionaries, encoded_message) =
encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)?;
for encoded_dictionary in encoded_dictionaries {
let (meta, data) =
write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
let block = ipc::File::Block::new(self.block_offsets as i64, meta as i32, data as i64);
self.dictionary_blocks.push(block);
self.block_offsets += meta + data;
}
let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
let block = ipc::File::Block::new(
self.block_offsets as i64,
meta as i32, data as i64,
);
self.record_blocks.push(block);
self.block_offsets += meta + data;
Ok(())
}
pub fn finish(&mut self) -> Result<()> {
write_continuation(&mut self.writer, &self.write_options, 0)?;
let mut fbb = FlatBufferBuilder::new();
let dictionaries = fbb.create_vector(&self.dictionary_blocks);
let record_batches = fbb.create_vector(&self.record_blocks);
let schema = convert::schema_to_fb_offset(&mut fbb, &self.schema);
let root = {
let mut footer_builder = ipc::File::FooterBuilder::new(&mut fbb);
footer_builder.add_version(*self.write_options.metadata_version());
footer_builder.add_schema(schema);
footer_builder.add_dictionaries(dictionaries);
footer_builder.add_recordBatches(record_batches);
footer_builder.finish()
};
fbb.finish(root, None);
let footer_data = fbb.finished_data();
self.writer.write_all(footer_data)?;
self.writer
.write_all(&(footer_data.len() as i32).to_le_bytes())?;
self.writer.write_all(&ARROW_MAGIC)?;
self.writer.flush()?;
self.finished = true;
Ok(())
}
}