1use ahash::AHashMap;
2use std::collections::VecDeque;
3use std::io::{Read, Seek};
4
5use arrow_format;
6
7use crate::array::*;
8use crate::chunk::Chunk;
9use crate::datatypes::{DataType, Field};
10use crate::error::{Error, Result};
11use crate::io::ipc::read::OutOfSpecKind;
12use crate::io::ipc::{IpcField, IpcSchema};
13
14use super::deserialize::{read, skip};
15use super::Dictionaries;
16
/// An item of the wrapped iterator, tagged with whether its position is part of the projection.
#[derive(Debug, Eq, PartialEq, Hash)]
enum ProjectionResult<A> {
    /// The item's position is listed in the projection.
    Selected(A),
    /// The item's position is not listed in the projection.
    NotSelected(A),
}

/// Iterator adapter that walks `iter` and tags every item as selected or not,
/// according to a list of item positions (`projection`).
///
/// The projection must be sorted in strictly increasing order; this is checked
/// lazily while iterating.
struct ProjectionIter<'a, A, I: Iterator<Item = A>> {
    /// Positions that still have to be matched after `current_projection`.
    projection: &'a [usize],
    /// The wrapped iterator.
    iter: I,
    /// Position of the item that the next call to `next` will yield.
    current_count: usize,
    /// The next position to select, or `None` once every position was consumed
    /// (or when the projection was empty to begin with).
    current_projection: Option<usize>,
}

impl<'a, A, I: Iterator<Item = A>> ProjectionIter<'a, A, I> {
    /// Creates a new [`ProjectionIter`] over `iter`, selecting the positions in `projection`.
    ///
    /// An empty `projection` is valid and selects nothing: every item is yielded as
    /// [`ProjectionResult::NotSelected`].
    pub fn new(projection: &'a [usize], iter: I) -> Self {
        // `split_first` avoids the out-of-bounds panic that indexing `projection[0]`
        // causes on an empty slice.
        let (current_projection, projection) = match projection.split_first() {
            Some((&first, rest)) => (Some(first), rest),
            None => (None, projection),
        };
        Self {
            projection,
            iter,
            current_count: 0,
            current_projection,
        }
    }
}

impl<'a, A, I: Iterator<Item = A>> Iterator for ProjectionIter<'a, A, I> {
    type Item = ProjectionResult<A>;

    /// Yields the next item of the wrapped iterator, tagged with its selection status.
    ///
    /// # Panics
    /// Panics if the projection is not strictly increasing.
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.iter.next()?;
        let position = self.current_count;

        let result = if self.current_projection == Some(position) {
            // advance to the next requested position, if any is left
            self.current_projection = match self.projection.split_first() {
                Some((&next, rest)) => {
                    assert!(
                        next > position,
                        "the projection must be strictly increasing"
                    );
                    self.projection = rest;
                    Some(next)
                }
                None => None,
            };
            ProjectionResult::Selected(item)
        } else {
            ProjectionResult::NotSelected(item)
        };

        self.current_count += 1;
        Some(result)
    }

    /// One item is yielded per item of the wrapped iterator, so its hint applies unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}
74
75#[allow(clippy::too_many_arguments)]
79pub fn read_record_batch<R: Read + Seek>(
80 batch: arrow_format::ipc::RecordBatchRef,
81 fields: &[Field],
82 ipc_schema: &IpcSchema,
83 projection: Option<&[usize]>,
84 limit: Option<usize>,
85 dictionaries: &Dictionaries,
86 version: arrow_format::ipc::MetadataVersion,
87 reader: &mut R,
88 block_offset: u64,
89 file_size: u64,
90 scratch: &mut Vec<u8>,
91) -> Result<Chunk<Box<dyn Array>>> {
92 assert_eq!(fields.len(), ipc_schema.fields.len());
93 let buffers = batch
94 .buffers()
95 .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
96 .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?;
97 let mut buffers: VecDeque<arrow_format::ipc::BufferRef> = buffers.iter().collect();
98
99 let buffers_size = buffers
101 .iter()
102 .map(|buffer| {
103 let buffer_size: u64 = buffer
104 .length()
105 .try_into()
106 .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
107 Ok(buffer_size)
108 })
109 .sum::<Result<u64>>()?;
110 if buffers_size > file_size {
111 return Err(Error::from(OutOfSpecKind::InvalidBuffersLength {
112 buffers_size,
113 file_size,
114 }));
115 }
116
117 let field_nodes = batch
118 .nodes()
119 .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferNodes(err)))?
120 .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?;
121 let mut field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
122
123 let columns = if let Some(projection) = projection {
124 let projection =
125 ProjectionIter::new(projection, fields.iter().zip(ipc_schema.fields.iter()));
126
127 projection
128 .map(|maybe_field| match maybe_field {
129 ProjectionResult::Selected((field, ipc_field)) => Ok(Some(read(
130 &mut field_nodes,
131 field,
132 ipc_field,
133 &mut buffers,
134 reader,
135 dictionaries,
136 block_offset,
137 ipc_schema.is_little_endian,
138 batch.compression().map_err(|err| {
139 Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err))
140 })?,
141 limit,
142 version,
143 scratch,
144 )?)),
145 ProjectionResult::NotSelected((field, _)) => {
146 skip(&mut field_nodes, &field.data_type, &mut buffers)?;
147 Ok(None)
148 }
149 })
150 .filter_map(|x| x.transpose())
151 .collect::<Result<Vec<_>>>()?
152 } else {
153 fields
154 .iter()
155 .zip(ipc_schema.fields.iter())
156 .map(|(field, ipc_field)| {
157 read(
158 &mut field_nodes,
159 field,
160 ipc_field,
161 &mut buffers,
162 reader,
163 dictionaries,
164 block_offset,
165 ipc_schema.is_little_endian,
166 batch.compression().map_err(|err| {
167 Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err))
168 })?,
169 limit,
170 version,
171 scratch,
172 )
173 })
174 .collect::<Result<Vec<_>>>()?
175 };
176 Chunk::try_new(columns)
177}
178
179fn find_first_dict_field_d<'a>(
180 id: i64,
181 data_type: &'a DataType,
182 ipc_field: &'a IpcField,
183) -> Option<(&'a Field, &'a IpcField)> {
184 use DataType::*;
185 match data_type {
186 Dictionary(_, inner, _) => find_first_dict_field_d(id, inner.as_ref(), ipc_field),
187 List(field) | LargeList(field) | FixedSizeList(field, ..) | Map(field, ..) => {
188 find_first_dict_field(id, field.as_ref(), &ipc_field.fields[0])
189 }
190 Union(fields, ..) | Struct(fields) => {
191 for (field, ipc_field) in fields.iter().zip(ipc_field.fields.iter()) {
192 if let Some(f) = find_first_dict_field(id, field, ipc_field) {
193 return Some(f);
194 }
195 }
196 None
197 }
198 _ => None,
199 }
200}
201
202fn find_first_dict_field<'a>(
203 id: i64,
204 field: &'a Field,
205 ipc_field: &'a IpcField,
206) -> Option<(&'a Field, &'a IpcField)> {
207 if let Some(field_id) = ipc_field.dictionary_id {
208 if id == field_id {
209 return Some((field, ipc_field));
210 }
211 }
212 find_first_dict_field_d(id, &field.data_type, ipc_field)
213}
214
215pub(crate) fn first_dict_field<'a>(
216 id: i64,
217 fields: &'a [Field],
218 ipc_fields: &'a [IpcField],
219) -> Result<(&'a Field, &'a IpcField)> {
220 assert_eq!(fields.len(), ipc_fields.len());
221 for (field, ipc_field) in fields.iter().zip(ipc_fields.iter()) {
222 if let Some(field) = find_first_dict_field(id, field, ipc_field) {
223 return Ok(field);
224 }
225 }
226 Err(Error::from(OutOfSpecKind::InvalidId { requested_id: id }))
227}
228
229#[allow(clippy::too_many_arguments)]
232pub fn read_dictionary<R: Read + Seek>(
233 batch: arrow_format::ipc::DictionaryBatchRef,
234 fields: &[Field],
235 ipc_schema: &IpcSchema,
236 dictionaries: &mut Dictionaries,
237 reader: &mut R,
238 block_offset: u64,
239 file_size: u64,
240 scratch: &mut Vec<u8>,
241) -> Result<()> {
242 if batch
243 .is_delta()
244 .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferIsDelta(err)))?
245 {
246 return Err(Error::NotYetImplemented(
247 "delta dictionary batches not supported".to_string(),
248 ));
249 }
250
251 let id = batch
252 .id()
253 .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferId(err)))?;
254 let (first_field, first_ipc_field) = first_dict_field(id, fields, &ipc_schema.fields)?;
255
256 let batch = batch
257 .data()
258 .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferData(err)))?
259 .ok_or_else(|| Error::from(OutOfSpecKind::MissingData))?;
260
261 let value_type =
262 if let DataType::Dictionary(_, value_type, _) = first_field.data_type.to_logical_type() {
263 value_type.as_ref()
264 } else {
265 return Err(Error::from(OutOfSpecKind::InvalidIdDataType {
266 requested_id: id,
267 }));
268 };
269
270 let fields = vec![Field::new("", value_type.clone(), false)];
272 let ipc_schema = IpcSchema {
273 fields: vec![first_ipc_field.clone()],
274 is_little_endian: ipc_schema.is_little_endian,
275 };
276 let chunk = read_record_batch(
277 batch,
278 &fields,
279 &ipc_schema,
280 None,
281 None, dictionaries,
283 arrow_format::ipc::MetadataVersion::V5,
284 reader,
285 block_offset,
286 file_size,
287 scratch,
288 )?;
289
290 dictionaries.insert(id, chunk.into_arrays().pop().unwrap());
291
292 Ok(())
293}
294
#[cfg(test)]
mod tests {
    use super::*;

    /// Positions 0, 2 and 4 of a five-item iterator are selected, the others are not.
    #[test]
    fn project_iter() {
        use ProjectionResult::{NotSelected, Selected};

        let expected = vec![
            Selected(1),
            NotSelected(2),
            Selected(3),
            NotSelected(4),
            Selected(5),
        ];
        let projected = ProjectionIter::new(&[0, 2, 4], 1..6).collect::<Vec<_>>();

        assert_eq!(projected, expected)
    }
}
317
318pub fn prepare_projection(
319 fields: &[Field],
320 mut projection: Vec<usize>,
321) -> (Vec<usize>, AHashMap<usize, usize>, Vec<Field>) {
322 let fields = projection.iter().map(|x| fields[*x].clone()).collect();
323
324 let mut indices = (0..projection.len()).collect::<Vec<_>>();
326 indices.sort_unstable_by_key(|&i| &projection[i]);
327 let map = indices.iter().copied().enumerate().fold(
328 AHashMap::default(),
329 |mut acc, (index, new_index)| {
330 acc.insert(index, new_index);
331 acc
332 },
333 );
334 projection.sort_unstable();
335
336 if !projection.is_empty() {
338 let mut previous = projection[0];
339
340 for &i in &projection[1..] {
341 assert!(
342 previous < i,
343 "The projection on IPC must not contain duplicates"
344 );
345 previous = i;
346 }
347 }
348
349 (projection, map, fields)
350}
351
352pub fn apply_projection(
353 chunk: Chunk<Box<dyn Array>>,
354 map: &AHashMap<usize, usize>,
355) -> Chunk<Box<dyn Array>> {
356 let arrays = chunk.into_arrays();
358 let mut new_arrays = arrays.clone();
359
360 map.iter()
361 .for_each(|(old, new)| new_arrays[*new] = arrays[*old].clone());
362
363 Chunk::new(new_arrays)
364}