polars_arrow/io/ipc/read/
stream.rs1use std::io::{Read, Seek};
2
3use arrow_format::ipc::planus::ReadAsRoot;
4use polars_error::{PolarsError, PolarsResult, polars_bail, polars_err};
5use polars_utils::bool::UnsafeBool;
6
7use super::super::CONTINUATION_MARKER;
8use super::common::*;
9use super::schema::deserialize_stream_metadata;
10use super::{Dictionaries, OutOfSpecKind};
11use crate::array::Array;
12use crate::datatypes::{ArrowSchema, Metadata};
13use crate::io::ipc::IpcSchema;
14use crate::record_batch::RecordBatchT;
15
16#[derive(Debug, Clone)]
18pub struct StreamMetadata {
19 pub schema: ArrowSchema,
21
22 pub custom_schema_metadata: Option<Metadata>,
24
25 pub version: arrow_format::ipc::MetadataVersion,
27
28 pub ipc_schema: IpcSchema,
30}
31
32pub fn read_stream_metadata(reader: &mut dyn std::io::Read) -> PolarsResult<StreamMetadata> {
34 let mut meta_size: [u8; 4] = [0; 4];
36 reader.read_exact(&mut meta_size)?;
37 let meta_length = {
38 if meta_size == CONTINUATION_MARKER {
41 reader.read_exact(&mut meta_size)?;
42 }
43 i32::from_le_bytes(meta_size)
44 };
45
46 let length: usize = meta_length
47 .try_into()
48 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
49
50 let mut buffer = vec![];
51 buffer.try_reserve(length)?;
52 reader.take(length as u64).read_to_end(&mut buffer)?;
53
54 deserialize_stream_metadata(&buffer)
55}
56
57pub enum StreamState {
67 Waiting,
69 Some(RecordBatchT<Box<dyn Array>>),
71}
72
73impl StreamState {
74 pub fn unwrap(self) -> RecordBatchT<Box<dyn Array>> {
80 if let StreamState::Some(batch) = self {
81 batch
82 } else {
83 panic!("The batch is not available")
84 }
85 }
86}
87
88fn read_next<R: Read + Seek>(
91 reader: &mut R,
92 metadata: &StreamMetadata,
93 dictionaries: &mut Dictionaries,
94 message_buffer: &mut Vec<u8>,
95 projection: &Option<ProjectionInfo>,
96 scratch: &mut Vec<u8>,
97 checked: UnsafeBool,
98) -> PolarsResult<Option<StreamState>> {
99 let mut meta_length: [u8; 4] = [0; 4];
101
102 match reader.read_exact(&mut meta_length) {
103 Ok(()) => (),
104 Err(e) => {
105 return if e.kind() == std::io::ErrorKind::UnexpectedEof {
106 Ok(Some(StreamState::Waiting))
110 } else {
111 Err(PolarsError::from(e))
112 };
113 },
114 }
115
116 let meta_length = {
117 if meta_length == CONTINUATION_MARKER {
120 reader.read_exact(&mut meta_length)?;
121 }
122 i32::from_le_bytes(meta_length)
123 };
124
125 let meta_length: usize = meta_length
126 .try_into()
127 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
128
129 if meta_length == 0 {
130 return Ok(None);
132 }
133
134 message_buffer.clear();
135 message_buffer.try_reserve(meta_length)?;
136 reader
137 .by_ref()
138 .take(meta_length as u64)
139 .read_to_end(message_buffer)?;
140
141 let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref())
142 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
143
144 let header = message
145 .header()
146 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
147 .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;
148
149 let block_length: usize = message
150 .body_length()
151 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBodyLength(err)))?
152 .try_into()
153 .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
154
155 match header {
156 arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
157 let cur_pos = reader.stream_position()?;
158
159 let chunk = read_record_batch(
160 batch,
161 &metadata.schema,
162 &metadata.ipc_schema,
163 projection.as_ref().map(|x| x.columns.as_ref()),
164 None,
165 dictionaries,
166 metadata.version,
167 &mut (&mut *reader).take(block_length as u64),
168 0,
169 scratch,
170 checked,
171 );
172
173 let new_pos = reader.stream_position()?;
174 let read_size = new_pos - cur_pos;
175
176 reader.seek(std::io::SeekFrom::Current(
177 block_length as i64 - read_size as i64,
178 ))?;
179
180 if let Some(ProjectionInfo { map, .. }) = projection {
181 chunk
183 .map(|chunk| apply_projection(chunk, map))
184 .map(|x| Some(StreamState::Some(x)))
185 } else {
186 chunk.map(|x| Some(StreamState::Some(x)))
187 }
188 },
189 arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
190 let cur_pos = reader.stream_position()?;
191
192 read_dictionary(
193 batch,
194 &metadata.schema,
195 &metadata.ipc_schema,
196 dictionaries,
197 &mut (&mut *reader).take(block_length as u64),
198 0,
199 scratch,
200 checked,
201 )?;
202
203 let new_pos = reader.stream_position()?;
204 let read_size = new_pos - cur_pos;
205
206 reader.seek(std::io::SeekFrom::Current(
207 block_length as i64 - read_size as i64,
208 ))?;
209
210 read_next(
212 reader,
213 metadata,
214 dictionaries,
215 message_buffer,
216 projection,
217 scratch,
218 checked,
219 )
220 },
221 _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
222 }
223}
224
225pub struct StreamReader<R: Read> {
232 reader: R,
233 metadata: StreamMetadata,
234 dictionaries: Dictionaries,
235 finished: bool,
236 message_buffer: Vec<u8>,
237 projection: Option<ProjectionInfo>,
238 scratch: Vec<u8>,
239 checked: UnsafeBool,
240}
241
242impl<R: Read + Seek> StreamReader<R> {
243 pub fn new(reader: R, metadata: StreamMetadata, projection: Option<Vec<usize>>) -> Self {
249 let projection =
250 projection.map(|projection| prepare_projection(&metadata.schema, projection));
251
252 Self {
253 reader,
254 metadata,
255 dictionaries: Default::default(),
256 finished: false,
257 message_buffer: Default::default(),
258 projection,
259 scratch: Default::default(),
260 checked: UnsafeBool::default(),
261 }
262 }
263
264 pub unsafe fn unchecked(mut self) -> Self {
268 unsafe {
269 self.checked = UnsafeBool::new_false();
270 }
271 self
272 }
273
274 pub fn metadata(&self) -> &StreamMetadata {
276 &self.metadata
277 }
278
279 pub fn schema(&self) -> &ArrowSchema {
281 self.projection
282 .as_ref()
283 .map(|x| &x.schema)
284 .unwrap_or(&self.metadata.schema)
285 }
286
287 pub fn is_finished(&self) -> bool {
289 self.finished
290 }
291
292 fn maybe_next(&mut self) -> PolarsResult<Option<StreamState>> {
293 if self.finished {
294 return Ok(None);
295 }
296 let batch = read_next(
297 &mut self.reader,
298 &self.metadata,
299 &mut self.dictionaries,
300 &mut self.message_buffer,
301 &self.projection,
302 &mut self.scratch,
303 self.checked,
304 )?;
305 if batch.is_none() {
306 self.finished = true;
307 }
308 Ok(batch)
309 }
310}
311
312impl<R: Read + Seek> Iterator for StreamReader<R> {
313 type Item = PolarsResult<StreamState>;
314
315 fn next(&mut self) -> Option<Self::Item> {
316 self.maybe_next().transpose()
317 }
318}