arrow2/io/ipc/write/
stream.rs

1//! Arrow IPC File and Stream Writers
2//!
3//! The `FileWriter` and `StreamWriter` have similar interfaces,
4//! however the `FileWriter` expects a reader that supports `Seek`ing
5
6use std::io::Write;
7
8use super::super::IpcField;
9use super::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions};
10use super::common_sync::{write_continuation, write_message};
11use super::{default_ipc_fields, schema_to_bytes};
12
13use crate::array::Array;
14use crate::chunk::Chunk;
15use crate::datatypes::*;
16use crate::error::{Error, Result};
17
18/// Arrow stream writer
19///
20/// The data written by this writer must be read in order. To signal that no more
21/// data is arriving through the stream call [`self.finish()`](StreamWriter::finish);
22///
23/// For a usage walkthrough consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow).
24pub struct StreamWriter<W: Write> {
25    /// The object to write to
26    writer: W,
27    /// IPC write options
28    write_options: WriteOptions,
29    /// Whether the stream has been finished
30    finished: bool,
31    /// Keeps track of dictionaries that have been written
32    dictionary_tracker: DictionaryTracker,
33
34    ipc_fields: Option<Vec<IpcField>>,
35}
36
37impl<W: Write> StreamWriter<W> {
38    /// Creates a new [`StreamWriter`]
39    pub fn new(writer: W, write_options: WriteOptions) -> Self {
40        Self {
41            writer,
42            write_options,
43            finished: false,
44            dictionary_tracker: DictionaryTracker {
45                dictionaries: Default::default(),
46                cannot_replace: false,
47            },
48            ipc_fields: None,
49        }
50    }
51
52    /// Starts the stream by writing a Schema message to it.
53    /// Use `ipc_fields` to declare dictionary ids in the schema, for dictionary-reuse
54    pub fn start(&mut self, schema: &Schema, ipc_fields: Option<Vec<IpcField>>) -> Result<()> {
55        self.ipc_fields = Some(if let Some(ipc_fields) = ipc_fields {
56            ipc_fields
57        } else {
58            default_ipc_fields(&schema.fields)
59        });
60
61        let encoded_message = EncodedData {
62            ipc_message: schema_to_bytes(schema, self.ipc_fields.as_ref().unwrap()),
63            arrow_data: vec![],
64        };
65        write_message(&mut self.writer, &encoded_message)?;
66        Ok(())
67    }
68
69    /// Writes [`Chunk`] to the stream
70    pub fn write(
71        &mut self,
72        columns: &Chunk<Box<dyn Array>>,
73        ipc_fields: Option<&[IpcField]>,
74    ) -> Result<()> {
75        if self.finished {
76            return Err(Error::Io(std::io::Error::new(
77                std::io::ErrorKind::UnexpectedEof,
78                "Cannot write to a finished stream".to_string(),
79            )));
80        }
81
82        // we can't make it a closure because it borrows (and it can't borrow mut and non-mut below)
83        #[allow(clippy::or_fun_call)]
84        let fields = ipc_fields.unwrap_or(self.ipc_fields.as_ref().unwrap());
85
86        let (encoded_dictionaries, encoded_message) = encode_chunk(
87            columns,
88            fields,
89            &mut self.dictionary_tracker,
90            &self.write_options,
91        )?;
92
93        for encoded_dictionary in encoded_dictionaries {
94            write_message(&mut self.writer, &encoded_dictionary)?;
95        }
96
97        write_message(&mut self.writer, &encoded_message)?;
98        Ok(())
99    }
100
101    /// Write continuation bytes, and mark the stream as done
102    pub fn finish(&mut self) -> Result<()> {
103        write_continuation(&mut self.writer, 0)?;
104
105        self.finished = true;
106
107        Ok(())
108    }
109
110    /// Consumes itself, returning the inner writer.
111    pub fn into_inner(self) -> W {
112        self.writer
113    }
114}