use std::io::Write;
use super::common::{encoded_batch, DictionaryTracker, EncodedData, WriteOptions};
use super::common_sync::{write_continuation, write_message};
use super::schema_to_bytes;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
pub struct StreamWriter<W: Write> {
writer: W,
write_options: WriteOptions,
finished: bool,
dictionary_tracker: DictionaryTracker,
}
impl<W: Write> StreamWriter<W> {
pub fn try_new(mut writer: W, schema: &Schema, write_options: WriteOptions) -> Result<Self> {
let encoded_message = EncodedData {
ipc_message: schema_to_bytes(schema),
arrow_data: vec![],
};
write_message(&mut writer, encoded_message)?;
Ok(Self {
writer,
write_options,
finished: false,
dictionary_tracker: DictionaryTracker::new(false),
})
}
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
if self.finished {
return Err(ArrowError::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Cannot write to a finished stream".to_string(),
)));
}
let (encoded_dictionaries, encoded_message) =
encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)?;
for encoded_dictionary in encoded_dictionaries {
write_message(&mut self.writer, encoded_dictionary)?;
}
write_message(&mut self.writer, encoded_message)?;
Ok(())
}
pub fn finish(&mut self) -> Result<()> {
write_continuation(&mut self.writer, 0)?;
self.finished = true;
Ok(())
}
pub fn into_inner(self) -> W {
self.writer
}
}