polars_arrow/io/ipc/read/
stream.rs1use std::io::Read;
2
3use arrow_format::ipc::planus::ReadAsRoot;
4use polars_error::{PolarsError, PolarsResult, polars_bail, polars_err};
5
6use super::super::CONTINUATION_MARKER;
7use super::common::*;
8use super::schema::deserialize_stream_metadata;
9use super::{Dictionaries, OutOfSpecKind};
10use crate::array::Array;
11use crate::datatypes::{ArrowSchema, Metadata};
12use crate::io::ipc::IpcSchema;
13use crate::record_batch::RecordBatchT;
14
15#[derive(Debug, Clone)]
17pub struct StreamMetadata {
18 pub schema: ArrowSchema,
20
21 pub custom_schema_metadata: Option<Metadata>,
23
24 pub version: arrow_format::ipc::MetadataVersion,
26
27 pub ipc_schema: IpcSchema,
29}
30
31pub fn read_stream_metadata(reader: &mut dyn std::io::Read) -> PolarsResult<StreamMetadata> {
33 let mut meta_size: [u8; 4] = [0; 4];
35 reader.read_exact(&mut meta_size)?;
36 let meta_length = {
37 if meta_size == CONTINUATION_MARKER {
40 reader.read_exact(&mut meta_size)?;
41 }
42 i32::from_le_bytes(meta_size)
43 };
44
45 let length: usize = meta_length
46 .try_into()
47 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
48
49 let mut buffer = vec![];
50 buffer.try_reserve(length)?;
51 reader.take(length as u64).read_to_end(&mut buffer)?;
52
53 deserialize_stream_metadata(&buffer)
54}
55
56pub enum StreamState {
66 Waiting,
68 Some(RecordBatchT<Box<dyn Array>>),
70}
71
72impl StreamState {
73 pub fn unwrap(self) -> RecordBatchT<Box<dyn Array>> {
79 if let StreamState::Some(batch) = self {
80 batch
81 } else {
82 panic!("The batch is not available")
83 }
84 }
85}
86
87fn read_next<R: Read>(
90 reader: &mut R,
91 metadata: &StreamMetadata,
92 dictionaries: &mut Dictionaries,
93 message_buffer: &mut Vec<u8>,
94 data_buffer: &mut Vec<u8>,
95 projection: &Option<ProjectionInfo>,
96 scratch: &mut Vec<u8>,
97) -> PolarsResult<Option<StreamState>> {
98 let mut meta_length: [u8; 4] = [0; 4];
100
101 match reader.read_exact(&mut meta_length) {
102 Ok(()) => (),
103 Err(e) => {
104 return if e.kind() == std::io::ErrorKind::UnexpectedEof {
105 Ok(Some(StreamState::Waiting))
109 } else {
110 Err(PolarsError::from(e))
111 };
112 },
113 }
114
115 let meta_length = {
116 if meta_length == CONTINUATION_MARKER {
119 reader.read_exact(&mut meta_length)?;
120 }
121 i32::from_le_bytes(meta_length)
122 };
123
124 let meta_length: usize = meta_length
125 .try_into()
126 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
127
128 if meta_length == 0 {
129 return Ok(None);
131 }
132
133 message_buffer.clear();
134 message_buffer.try_reserve(meta_length)?;
135 reader
136 .by_ref()
137 .take(meta_length as u64)
138 .read_to_end(message_buffer)?;
139
140 let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref())
141 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
142
143 let header = message
144 .header()
145 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
146 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
147
148 let block_length: usize = message
149 .body_length()
150 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBodyLength(err)))?
151 .try_into()
152 .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
153
154 match header {
155 arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
156 data_buffer.clear();
157 data_buffer.try_reserve(block_length)?;
158 reader
159 .by_ref()
160 .take(block_length as u64)
161 .read_to_end(data_buffer)?;
162
163 let file_size = data_buffer.len() as u64;
164
165 let mut reader = std::io::Cursor::new(data_buffer);
166
167 let chunk = read_record_batch(
168 batch,
169 &metadata.schema,
170 &metadata.ipc_schema,
171 projection.as_ref().map(|x| x.columns.as_ref()),
172 None,
173 dictionaries,
174 metadata.version,
175 &mut reader,
176 0,
177 file_size,
178 scratch,
179 );
180
181 if let Some(ProjectionInfo { map, .. }) = projection {
182 chunk
184 .map(|chunk| apply_projection(chunk, map))
185 .map(|x| Some(StreamState::Some(x)))
186 } else {
187 chunk.map(|x| Some(StreamState::Some(x)))
188 }
189 },
190 arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
191 data_buffer.clear();
192 data_buffer.try_reserve(block_length)?;
193 reader
194 .by_ref()
195 .take(block_length as u64)
196 .read_to_end(data_buffer)?;
197
198 let file_size = data_buffer.len() as u64;
199 let mut dict_reader = std::io::Cursor::new(&data_buffer);
200
201 read_dictionary(
202 batch,
203 &metadata.schema,
204 &metadata.ipc_schema,
205 dictionaries,
206 &mut dict_reader,
207 0,
208 file_size,
209 scratch,
210 )?;
211
212 read_next(
214 reader,
215 metadata,
216 dictionaries,
217 message_buffer,
218 data_buffer,
219 projection,
220 scratch,
221 )
222 },
223 _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
224 }
225}
226
227pub struct StreamReader<R: Read> {
234 reader: R,
235 metadata: StreamMetadata,
236 dictionaries: Dictionaries,
237 finished: bool,
238 data_buffer: Vec<u8>,
239 message_buffer: Vec<u8>,
240 projection: Option<ProjectionInfo>,
241 scratch: Vec<u8>,
242}
243
244impl<R: Read> StreamReader<R> {
245 pub fn new(reader: R, metadata: StreamMetadata, projection: Option<Vec<usize>>) -> Self {
251 let projection =
252 projection.map(|projection| prepare_projection(&metadata.schema, projection));
253
254 Self {
255 reader,
256 metadata,
257 dictionaries: Default::default(),
258 finished: false,
259 data_buffer: Default::default(),
260 message_buffer: Default::default(),
261 projection,
262 scratch: Default::default(),
263 }
264 }
265
266 pub fn metadata(&self) -> &StreamMetadata {
268 &self.metadata
269 }
270
271 pub fn schema(&self) -> &ArrowSchema {
273 self.projection
274 .as_ref()
275 .map(|x| &x.schema)
276 .unwrap_or(&self.metadata.schema)
277 }
278
279 pub fn is_finished(&self) -> bool {
281 self.finished
282 }
283
284 fn maybe_next(&mut self) -> PolarsResult<Option<StreamState>> {
285 if self.finished {
286 return Ok(None);
287 }
288 let batch = read_next(
289 &mut self.reader,
290 &self.metadata,
291 &mut self.dictionaries,
292 &mut self.message_buffer,
293 &mut self.data_buffer,
294 &self.projection,
295 &mut self.scratch,
296 )?;
297 if batch.is_none() {
298 self.finished = true;
299 }
300 Ok(batch)
301 }
302}
303
304impl<R: Read> Iterator for StreamReader<R> {
305 type Item = PolarsResult<StreamState>;
306
307 fn next(&mut self) -> Option<Self::Item> {
308 self.maybe_next().transpose()
309 }
310}