use std::io::{BufWriter, Write};
use super::common::{
encoded_batch, write_continuation, write_message, DictionaryTracker, EncodedData,
IpcWriteOptions,
};
use super::schema_to_bytes;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
/// Writer for the Arrow IPC streaming format.
///
/// The schema message is written when the writer is constructed. Record batches
/// (preceded by any dictionary messages produced while encoding them) are appended
/// with `write`. `finish` writes a zero-length continuation, which marks the end of
/// the stream.
pub struct StreamWriter<W: Write> {
    /// Buffered wrapper around the caller-supplied sink; every message goes through it.
    writer: BufWriter<W>,
    /// Options used when encoding and writing every message (schema, dictionaries, batches).
    write_options: IpcWriteOptions,
    /// Set once `finish` has written the end-of-stream marker; `write` rejects batches afterwards.
    finished: bool,
    /// State handed to `encoded_batch` on every write so dictionaries are tracked across
    /// batches (presumably to avoid re-sending unchanged dictionaries — confirm in `common`).
    dictionary_tracker: DictionaryTracker,
}
impl<W: Write> StreamWriter<W> {
    /// Creates a stream writer with default [`IpcWriteOptions`] and immediately
    /// writes the schema message for `schema` to `writer`.
    ///
    /// # Errors
    ///
    /// Returns an error if the schema message cannot be written.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Creates a stream writer with explicit `write_options` and immediately
    /// writes the schema message for `schema`.
    ///
    /// `writer` is wrapped in a [`BufWriter`]. Output is not guaranteed to reach
    /// the underlying sink until [`StreamWriter::finish`] is called.
    ///
    /// # Errors
    ///
    /// Returns an error if the schema message cannot be written.
    pub fn try_new_with_options(
        writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self> {
        let mut writer = BufWriter::new(writer);
        // The schema message is metadata only, so no data body is attached.
        let encoded_message = EncodedData {
            ipc_message: schema_to_bytes(schema, *write_options.metadata_version()),
            arrow_data: vec![],
        };
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            // `false`: replacing a dictionary is not treated as an error by this writer.
            dictionary_tracker: DictionaryTracker::new(false),
        })
    }

    /// Encodes `batch` and appends it to the stream, preceded by any dictionary
    /// messages produced while encoding it.
    ///
    /// # Errors
    ///
    /// Returns an error if the writer has already been finished, if the batch
    /// cannot be encoded, or if writing to the underlying sink fails.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if self.finished {
            return Err(ArrowError::Ipc(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }
        // Propagate encoding failures to the caller instead of panicking. The
        // function already returns `Result`, and encoding can fail for reasons
        // other than dictionary replacement.
        let (encoded_dictionaries, encoded_message) =
            encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)?;
        // Dictionary messages are emitted before the record batch that uses them.
        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }
        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Writes the end-of-stream marker (a zero-length continuation) and flushes
    /// all buffered output to the underlying sink.
    ///
    /// # Errors
    ///
    /// Returns an error if the writer has already been finished, or if writing
    /// the marker or flushing the buffer fails.
    pub fn finish(&mut self) -> Result<()> {
        if self.finished {
            // Without this check, a second call would append a second
            // end-of-stream marker to the output.
            return Err(ArrowError::Ipc(
                "Cannot finish stream writer as it is already closed".to_string(),
            ));
        }
        write_continuation(&mut self.writer, &self.write_options, 0)?;
        // Mark closed before flushing. If the flush fails, `Drop` must not try
        // to write the marker a second time.
        self.finished = true;
        // Flush explicitly: `BufWriter`'s own drop discards flush errors.
        self.writer.flush()?;
        Ok(())
    }
}
impl<W: Write> Drop for StreamWriter<W> {
    /// Finishes the stream if [`StreamWriter::finish`] was not called explicitly.
    ///
    /// This is best-effort. Any error is discarded instead of being unwrapped,
    /// because a panic raised inside a destructor while the thread is already
    /// unwinding aborts the process. Call `finish` explicitly to observe errors.
    fn drop(&mut self) {
        if !self.finished {
            let _ = self.finish();
        }
    }
}