use futures::AsyncWrite;
pub use super::common::WriteOptions;
use super::common::{encoded_batch, DictionaryTracker, EncodedData};
use super::common_async::{write_continuation, write_message};
use super::schema_to_bytes;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
pub struct StreamWriter<W: AsyncWrite + Unpin + Send> {
writer: W,
write_options: WriteOptions,
finished: bool,
dictionary_tracker: DictionaryTracker,
}
impl<W: AsyncWrite + Unpin + Send> StreamWriter<W> {
pub fn new(writer: W, write_options: WriteOptions) -> Self {
Self {
writer,
write_options,
finished: false,
dictionary_tracker: DictionaryTracker::new(false),
}
}
pub async fn start(&mut self, schema: &Schema) -> Result<()> {
let encoded_message = EncodedData {
ipc_message: schema_to_bytes(schema),
arrow_data: vec![],
};
write_message(&mut self.writer, encoded_message).await?;
Ok(())
}
pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
if self.finished {
return Err(ArrowError::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Cannot write to a finished stream".to_string(),
)));
}
let (encoded_dictionaries, encoded_message) =
encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)?;
for encoded_dictionary in encoded_dictionaries {
write_message(&mut self.writer, encoded_dictionary).await?;
}
write_message(&mut self.writer, encoded_message).await?;
Ok(())
}
pub async fn finish(&mut self) -> Result<()> {
write_continuation(&mut self.writer, 0).await?;
self.finished = true;
Ok(())
}
pub fn into_inner(self) -> W {
self.writer
}
}