arrow2/io/ipc/write/
stream.rs1use std::io::Write;
7
8use super::super::IpcField;
9use super::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions};
10use super::common_sync::{write_continuation, write_message};
11use super::{default_ipc_fields, schema_to_bytes};
12
13use crate::array::Array;
14use crate::chunk::Chunk;
15use crate::datatypes::*;
16use crate::error::{Error, Result};
17
18pub struct StreamWriter<W: Write> {
25 writer: W,
27 write_options: WriteOptions,
29 finished: bool,
31 dictionary_tracker: DictionaryTracker,
33
34 ipc_fields: Option<Vec<IpcField>>,
35}
36
37impl<W: Write> StreamWriter<W> {
38 pub fn new(writer: W, write_options: WriteOptions) -> Self {
40 Self {
41 writer,
42 write_options,
43 finished: false,
44 dictionary_tracker: DictionaryTracker {
45 dictionaries: Default::default(),
46 cannot_replace: false,
47 },
48 ipc_fields: None,
49 }
50 }
51
52 pub fn start(&mut self, schema: &Schema, ipc_fields: Option<Vec<IpcField>>) -> Result<()> {
55 self.ipc_fields = Some(if let Some(ipc_fields) = ipc_fields {
56 ipc_fields
57 } else {
58 default_ipc_fields(&schema.fields)
59 });
60
61 let encoded_message = EncodedData {
62 ipc_message: schema_to_bytes(schema, self.ipc_fields.as_ref().unwrap()),
63 arrow_data: vec![],
64 };
65 write_message(&mut self.writer, &encoded_message)?;
66 Ok(())
67 }
68
69 pub fn write(
71 &mut self,
72 columns: &Chunk<Box<dyn Array>>,
73 ipc_fields: Option<&[IpcField]>,
74 ) -> Result<()> {
75 if self.finished {
76 return Err(Error::Io(std::io::Error::new(
77 std::io::ErrorKind::UnexpectedEof,
78 "Cannot write to a finished stream".to_string(),
79 )));
80 }
81
82 #[allow(clippy::or_fun_call)]
84 let fields = ipc_fields.unwrap_or(self.ipc_fields.as_ref().unwrap());
85
86 let (encoded_dictionaries, encoded_message) = encode_chunk(
87 columns,
88 fields,
89 &mut self.dictionary_tracker,
90 &self.write_options,
91 )?;
92
93 for encoded_dictionary in encoded_dictionaries {
94 write_message(&mut self.writer, &encoded_dictionary)?;
95 }
96
97 write_message(&mut self.writer, &encoded_message)?;
98 Ok(())
99 }
100
101 pub fn finish(&mut self) -> Result<()> {
103 write_continuation(&mut self.writer, 0)?;
104
105 self.finished = true;
106
107 Ok(())
108 }
109
110 pub fn into_inner(self) -> W {
112 self.writer
113 }
114}