polars_arrow/io/ipc/read/
stream.rs

1use std::io::Read;
2
3use arrow_format::ipc::planus::ReadAsRoot;
4use polars_error::{PolarsError, PolarsResult, polars_bail, polars_err};
5
6use super::super::CONTINUATION_MARKER;
7use super::common::*;
8use super::schema::deserialize_stream_metadata;
9use super::{Dictionaries, OutOfSpecKind};
10use crate::array::Array;
11use crate::datatypes::{ArrowSchema, Metadata};
12use crate::io::ipc::IpcSchema;
13use crate::record_batch::RecordBatchT;
14
15/// Metadata of an Arrow IPC stream, written at the start of the stream
16#[derive(Debug, Clone)]
17pub struct StreamMetadata {
18    /// The schema that is read from the stream's first message
19    pub schema: ArrowSchema,
20
21    /// The custom metadata that is read from the schema
22    pub custom_schema_metadata: Option<Metadata>,
23
24    /// The IPC version of the stream
25    pub version: arrow_format::ipc::MetadataVersion,
26
27    /// The IPC fields tracking dictionaries
28    pub ipc_schema: IpcSchema,
29}
30
31/// Reads the metadata of the stream
32pub fn read_stream_metadata(reader: &mut dyn std::io::Read) -> PolarsResult<StreamMetadata> {
33    // determine metadata length
34    let mut meta_size: [u8; 4] = [0; 4];
35    reader.read_exact(&mut meta_size)?;
36    let meta_length = {
37        // If a continuation marker is encountered, skip over it and read
38        // the size from the next four bytes.
39        if meta_size == CONTINUATION_MARKER {
40            reader.read_exact(&mut meta_size)?;
41        }
42        i32::from_le_bytes(meta_size)
43    };
44
45    let length: usize = meta_length
46        .try_into()
47        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
48
49    let mut buffer = vec![];
50    buffer.try_reserve(length)?;
51    reader.take(length as u64).read_to_end(&mut buffer)?;
52
53    deserialize_stream_metadata(&buffer)
54}
55
56/// Encodes the stream's status after each read.
57///
58/// A stream is an iterator, and an iterator returns `Option<Item>`. The `Item`
59/// type in the [`StreamReader`] case is `StreamState`, which means that an Arrow
60/// stream may yield one of three values: (1) `None`, which signals that the stream
61/// is done; (2) [`StreamState::Some`], which signals that there was
62/// data waiting in the stream and we read it; and finally (3)
63/// [`Some(StreamState::Waiting)`], which means that the stream is still "live", it
64/// just doesn't hold any data right now.
65pub enum StreamState {
66    /// A live stream without data
67    Waiting,
68    /// Next item in the stream
69    Some(RecordBatchT<Box<dyn Array>>),
70}
71
72impl StreamState {
73    /// Return the data inside this wrapper.
74    ///
75    /// # Panics
76    ///
77    /// If the `StreamState` was `Waiting`.
78    pub fn unwrap(self) -> RecordBatchT<Box<dyn Array>> {
79        if let StreamState::Some(batch) = self {
80            batch
81        } else {
82            panic!("The batch is not available")
83        }
84    }
85}
86
87/// Reads the next item, yielding `None` if the stream is done,
88/// and a [`StreamState`] otherwise.
89fn read_next<R: Read>(
90    reader: &mut R,
91    metadata: &StreamMetadata,
92    dictionaries: &mut Dictionaries,
93    message_buffer: &mut Vec<u8>,
94    data_buffer: &mut Vec<u8>,
95    projection: &Option<ProjectionInfo>,
96    scratch: &mut Vec<u8>,
97) -> PolarsResult<Option<StreamState>> {
98    // determine metadata length
99    let mut meta_length: [u8; 4] = [0; 4];
100
101    match reader.read_exact(&mut meta_length) {
102        Ok(()) => (),
103        Err(e) => {
104            return if e.kind() == std::io::ErrorKind::UnexpectedEof {
105                // Handle EOF without the "0xFFFFFFFF 0x00000000"
106                // valid according to:
107                // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
108                Ok(Some(StreamState::Waiting))
109            } else {
110                Err(PolarsError::from(e))
111            };
112        },
113    }
114
115    let meta_length = {
116        // If a continuation marker is encountered, skip over it and read
117        // the size from the next four bytes.
118        if meta_length == CONTINUATION_MARKER {
119            reader.read_exact(&mut meta_length)?;
120        }
121        i32::from_le_bytes(meta_length)
122    };
123
124    let meta_length: usize = meta_length
125        .try_into()
126        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
127
128    if meta_length == 0 {
129        // the stream has ended, mark the reader as finished
130        return Ok(None);
131    }
132
133    message_buffer.clear();
134    message_buffer.try_reserve(meta_length)?;
135    reader
136        .by_ref()
137        .take(meta_length as u64)
138        .read_to_end(message_buffer)?;
139
140    let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref())
141        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
142
143    let header = message
144        .header()
145        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
146        .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
147
148    let block_length: usize = message
149        .body_length()
150        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBodyLength(err)))?
151        .try_into()
152        .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
153
154    match header {
155        arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
156            data_buffer.clear();
157            data_buffer.try_reserve(block_length)?;
158            reader
159                .by_ref()
160                .take(block_length as u64)
161                .read_to_end(data_buffer)?;
162
163            let file_size = data_buffer.len() as u64;
164
165            let mut reader = std::io::Cursor::new(data_buffer);
166
167            let chunk = read_record_batch(
168                batch,
169                &metadata.schema,
170                &metadata.ipc_schema,
171                projection.as_ref().map(|x| x.columns.as_ref()),
172                None,
173                dictionaries,
174                metadata.version,
175                &mut reader,
176                0,
177                file_size,
178                scratch,
179            );
180
181            if let Some(ProjectionInfo { map, .. }) = projection {
182                // re-order according to projection
183                chunk
184                    .map(|chunk| apply_projection(chunk, map))
185                    .map(|x| Some(StreamState::Some(x)))
186            } else {
187                chunk.map(|x| Some(StreamState::Some(x)))
188            }
189        },
190        arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
191            data_buffer.clear();
192            data_buffer.try_reserve(block_length)?;
193            reader
194                .by_ref()
195                .take(block_length as u64)
196                .read_to_end(data_buffer)?;
197
198            let file_size = data_buffer.len() as u64;
199            let mut dict_reader = std::io::Cursor::new(&data_buffer);
200
201            read_dictionary(
202                batch,
203                &metadata.schema,
204                &metadata.ipc_schema,
205                dictionaries,
206                &mut dict_reader,
207                0,
208                file_size,
209                scratch,
210            )?;
211
212            // read the next message until we encounter a RecordBatch message
213            read_next(
214                reader,
215                metadata,
216                dictionaries,
217                message_buffer,
218                data_buffer,
219                projection,
220                scratch,
221            )
222        },
223        _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
224    }
225}
226
227/// Arrow Stream reader.
228///
229/// An [`Iterator`] over an Arrow stream that yields a result of [`StreamState`]s.
230/// This is the recommended way to read an arrow stream (by iterating over its data).
231///
232/// For a more thorough walkthrough consult [this example](https://github.com/jorgecarleitao/polars_arrow/tree/main/examples/ipc_pyarrow).
233pub struct StreamReader<R: Read> {
234    reader: R,
235    metadata: StreamMetadata,
236    dictionaries: Dictionaries,
237    finished: bool,
238    data_buffer: Vec<u8>,
239    message_buffer: Vec<u8>,
240    projection: Option<ProjectionInfo>,
241    scratch: Vec<u8>,
242}
243
244impl<R: Read> StreamReader<R> {
245    /// Try to create a new stream reader
246    ///
247    /// The first message in the stream is the schema, the reader will fail if it does not
248    /// encounter a schema.
249    /// To check if the reader is done, use `is_finished(self)`
250    pub fn new(reader: R, metadata: StreamMetadata, projection: Option<Vec<usize>>) -> Self {
251        let projection =
252            projection.map(|projection| prepare_projection(&metadata.schema, projection));
253
254        Self {
255            reader,
256            metadata,
257            dictionaries: Default::default(),
258            finished: false,
259            data_buffer: Default::default(),
260            message_buffer: Default::default(),
261            projection,
262            scratch: Default::default(),
263        }
264    }
265
266    /// Return the schema of the stream
267    pub fn metadata(&self) -> &StreamMetadata {
268        &self.metadata
269    }
270
271    /// Return the schema of the file
272    pub fn schema(&self) -> &ArrowSchema {
273        self.projection
274            .as_ref()
275            .map(|x| &x.schema)
276            .unwrap_or(&self.metadata.schema)
277    }
278
279    /// Check if the stream is finished
280    pub fn is_finished(&self) -> bool {
281        self.finished
282    }
283
284    fn maybe_next(&mut self) -> PolarsResult<Option<StreamState>> {
285        if self.finished {
286            return Ok(None);
287        }
288        let batch = read_next(
289            &mut self.reader,
290            &self.metadata,
291            &mut self.dictionaries,
292            &mut self.message_buffer,
293            &mut self.data_buffer,
294            &self.projection,
295            &mut self.scratch,
296        )?;
297        if batch.is_none() {
298            self.finished = true;
299        }
300        Ok(batch)
301    }
302}
303
304impl<R: Read> Iterator for StreamReader<R> {
305    type Item = PolarsResult<StreamState>;
306
307    fn next(&mut self) -> Option<Self::Item> {
308        self.maybe_next().transpose()
309    }
310}