arrow2/io/ipc/read/
common.rs

1use ahash::AHashMap;
2use std::collections::VecDeque;
3use std::io::{Read, Seek};
4
5use arrow_format;
6
7use crate::array::*;
8use crate::chunk::Chunk;
9use crate::datatypes::{DataType, Field};
10use crate::error::{Error, Result};
11use crate::io::ipc::read::OutOfSpecKind;
12use crate::io::ipc::{IpcField, IpcSchema};
13
14use super::deserialize::{read, skip};
15use super::Dictionaries;
16
/// Whether an item produced by [`ProjectionIter`] is part of the projection.
#[derive(Debug, Eq, PartialEq, Hash)]
enum ProjectionResult<A> {
    /// The item's position is listed in the projection.
    Selected(A),
    /// The item's position is not listed in the projection.
    NotSelected(A),
}

/// An iterator adapter that tags every item of `iter` as
/// [`ProjectionResult::Selected`] when its position is listed in `projection`,
/// and as [`ProjectionResult::NotSelected`] otherwise.
///
/// It yields exactly one item per item of the inner iterator.
/// # Panics
/// The iterator panics when it reaches a projected position whose successor in
/// `projection` is not strictly greater (i.e. `projection` must be strictly increasing).
struct ProjectionIter<'a, A, I: Iterator<Item = A>> {
    /// Projected positions still to come, excluding `current_projection`.
    projection: &'a [usize],
    iter: I,
    /// Position of the next item `iter` will yield.
    current_count: usize,
    /// Next position to select; `None` once every projected position was selected.
    current_projection: Option<usize>,
}

impl<'a, A, I: Iterator<Item = A>> ProjectionIter<'a, A, I> {
    /// Creates an adapter over `iter` selecting the positions in `projection`.
    ///
    /// An empty `projection` selects nothing: every item is yielded as
    /// [`ProjectionResult::NotSelected`].
    pub fn new(projection: &'a [usize], iter: I) -> Self {
        let (current_projection, projection) = match projection.split_first() {
            Some((&first, rest)) => (Some(first), rest),
            None => (None, projection),
        };
        Self {
            projection,
            iter,
            current_count: 0,
            current_projection,
        }
    }
}

impl<'a, A, I: Iterator<Item = A>> Iterator for ProjectionIter<'a, A, I> {
    type Item = ProjectionResult<A>;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.iter.next()?;
        let position = self.current_count;
        self.current_count += 1;

        if self.current_projection != Some(position) {
            return Some(ProjectionResult::NotSelected(item));
        }

        // Advance to the next projected position, enforcing strict monotonicity:
        // a position at or before this one could never be reached again.
        let remaining = self.projection;
        self.current_projection = match remaining.split_first() {
            Some((&next, rest)) => {
                assert!(next > position, "the projection must be strictly increasing");
                self.projection = rest;
                Some(next)
            }
            None => None,
        };
        Some(ProjectionResult::Selected(item))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // One output item per input item.
        self.iter.size_hint()
    }
}
74
75/// Returns a [`Chunk`] from a reader.
76/// # Panic
77/// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
78#[allow(clippy::too_many_arguments)]
79pub fn read_record_batch<R: Read + Seek>(
80    batch: arrow_format::ipc::RecordBatchRef,
81    fields: &[Field],
82    ipc_schema: &IpcSchema,
83    projection: Option<&[usize]>,
84    limit: Option<usize>,
85    dictionaries: &Dictionaries,
86    version: arrow_format::ipc::MetadataVersion,
87    reader: &mut R,
88    block_offset: u64,
89    file_size: u64,
90    scratch: &mut Vec<u8>,
91) -> Result<Chunk<Box<dyn Array>>> {
92    assert_eq!(fields.len(), ipc_schema.fields.len());
93    let buffers = batch
94        .buffers()
95        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
96        .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?;
97    let mut buffers: VecDeque<arrow_format::ipc::BufferRef> = buffers.iter().collect();
98
99    // check that the sum of the sizes of all buffers is <= than the size of the file
100    let buffers_size = buffers
101        .iter()
102        .map(|buffer| {
103            let buffer_size: u64 = buffer
104                .length()
105                .try_into()
106                .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
107            Ok(buffer_size)
108        })
109        .sum::<Result<u64>>()?;
110    if buffers_size > file_size {
111        return Err(Error::from(OutOfSpecKind::InvalidBuffersLength {
112            buffers_size,
113            file_size,
114        }));
115    }
116
117    let field_nodes = batch
118        .nodes()
119        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferNodes(err)))?
120        .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?;
121    let mut field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
122
123    let columns = if let Some(projection) = projection {
124        let projection =
125            ProjectionIter::new(projection, fields.iter().zip(ipc_schema.fields.iter()));
126
127        projection
128            .map(|maybe_field| match maybe_field {
129                ProjectionResult::Selected((field, ipc_field)) => Ok(Some(read(
130                    &mut field_nodes,
131                    field,
132                    ipc_field,
133                    &mut buffers,
134                    reader,
135                    dictionaries,
136                    block_offset,
137                    ipc_schema.is_little_endian,
138                    batch.compression().map_err(|err| {
139                        Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err))
140                    })?,
141                    limit,
142                    version,
143                    scratch,
144                )?)),
145                ProjectionResult::NotSelected((field, _)) => {
146                    skip(&mut field_nodes, &field.data_type, &mut buffers)?;
147                    Ok(None)
148                }
149            })
150            .filter_map(|x| x.transpose())
151            .collect::<Result<Vec<_>>>()?
152    } else {
153        fields
154            .iter()
155            .zip(ipc_schema.fields.iter())
156            .map(|(field, ipc_field)| {
157                read(
158                    &mut field_nodes,
159                    field,
160                    ipc_field,
161                    &mut buffers,
162                    reader,
163                    dictionaries,
164                    block_offset,
165                    ipc_schema.is_little_endian,
166                    batch.compression().map_err(|err| {
167                        Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err))
168                    })?,
169                    limit,
170                    version,
171                    scratch,
172                )
173            })
174            .collect::<Result<Vec<_>>>()?
175    };
176    Chunk::try_new(columns)
177}
178
179fn find_first_dict_field_d<'a>(
180    id: i64,
181    data_type: &'a DataType,
182    ipc_field: &'a IpcField,
183) -> Option<(&'a Field, &'a IpcField)> {
184    use DataType::*;
185    match data_type {
186        Dictionary(_, inner, _) => find_first_dict_field_d(id, inner.as_ref(), ipc_field),
187        List(field) | LargeList(field) | FixedSizeList(field, ..) | Map(field, ..) => {
188            find_first_dict_field(id, field.as_ref(), &ipc_field.fields[0])
189        }
190        Union(fields, ..) | Struct(fields) => {
191            for (field, ipc_field) in fields.iter().zip(ipc_field.fields.iter()) {
192                if let Some(f) = find_first_dict_field(id, field, ipc_field) {
193                    return Some(f);
194                }
195            }
196            None
197        }
198        _ => None,
199    }
200}
201
202fn find_first_dict_field<'a>(
203    id: i64,
204    field: &'a Field,
205    ipc_field: &'a IpcField,
206) -> Option<(&'a Field, &'a IpcField)> {
207    if let Some(field_id) = ipc_field.dictionary_id {
208        if id == field_id {
209            return Some((field, ipc_field));
210        }
211    }
212    find_first_dict_field_d(id, &field.data_type, ipc_field)
213}
214
215pub(crate) fn first_dict_field<'a>(
216    id: i64,
217    fields: &'a [Field],
218    ipc_fields: &'a [IpcField],
219) -> Result<(&'a Field, &'a IpcField)> {
220    assert_eq!(fields.len(), ipc_fields.len());
221    for (field, ipc_field) in fields.iter().zip(ipc_fields.iter()) {
222        if let Some(field) = find_first_dict_field(id, field, ipc_field) {
223            return Ok(field);
224        }
225    }
226    Err(Error::from(OutOfSpecKind::InvalidId { requested_id: id }))
227}
228
229/// Reads a dictionary from the reader,
230/// updating `dictionaries` with the resulting dictionary
231#[allow(clippy::too_many_arguments)]
232pub fn read_dictionary<R: Read + Seek>(
233    batch: arrow_format::ipc::DictionaryBatchRef,
234    fields: &[Field],
235    ipc_schema: &IpcSchema,
236    dictionaries: &mut Dictionaries,
237    reader: &mut R,
238    block_offset: u64,
239    file_size: u64,
240    scratch: &mut Vec<u8>,
241) -> Result<()> {
242    if batch
243        .is_delta()
244        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferIsDelta(err)))?
245    {
246        return Err(Error::NotYetImplemented(
247            "delta dictionary batches not supported".to_string(),
248        ));
249    }
250
251    let id = batch
252        .id()
253        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferId(err)))?;
254    let (first_field, first_ipc_field) = first_dict_field(id, fields, &ipc_schema.fields)?;
255
256    let batch = batch
257        .data()
258        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferData(err)))?
259        .ok_or_else(|| Error::from(OutOfSpecKind::MissingData))?;
260
261    let value_type =
262        if let DataType::Dictionary(_, value_type, _) = first_field.data_type.to_logical_type() {
263            value_type.as_ref()
264        } else {
265            return Err(Error::from(OutOfSpecKind::InvalidIdDataType {
266                requested_id: id,
267            }));
268        };
269
270    // Make a fake schema for the dictionary batch.
271    let fields = vec![Field::new("", value_type.clone(), false)];
272    let ipc_schema = IpcSchema {
273        fields: vec![first_ipc_field.clone()],
274        is_little_endian: ipc_schema.is_little_endian,
275    };
276    let chunk = read_record_batch(
277        batch,
278        &fields,
279        &ipc_schema,
280        None,
281        None, // we must read the whole dictionary
282        dictionaries,
283        arrow_format::ipc::MetadataVersion::V5,
284        reader,
285        block_offset,
286        file_size,
287        scratch,
288    )?;
289
290    dictionaries.insert(id, chunk.into_arrays().pop().unwrap());
291
292    Ok(())
293}
294
#[cfg(test)]
mod tests {
    use super::ProjectionResult::{NotSelected, Selected};
    use super::*;

    /// Positions 0, 2 and 4 are selected; every other item is passed through as not selected.
    #[test]
    fn project_iter() {
        let expected = vec![
            Selected(1),
            NotSelected(2),
            Selected(3),
            NotSelected(4),
            Selected(5),
        ];
        let actual = ProjectionIter::new(&[0, 2, 4], 1..6).collect::<Vec<_>>();
        assert_eq!(actual, expected)
    }
}
317
318pub fn prepare_projection(
319    fields: &[Field],
320    mut projection: Vec<usize>,
321) -> (Vec<usize>, AHashMap<usize, usize>, Vec<Field>) {
322    let fields = projection.iter().map(|x| fields[*x].clone()).collect();
323
324    // todo: find way to do this more efficiently
325    let mut indices = (0..projection.len()).collect::<Vec<_>>();
326    indices.sort_unstable_by_key(|&i| &projection[i]);
327    let map = indices.iter().copied().enumerate().fold(
328        AHashMap::default(),
329        |mut acc, (index, new_index)| {
330            acc.insert(index, new_index);
331            acc
332        },
333    );
334    projection.sort_unstable();
335
336    // check unique
337    if !projection.is_empty() {
338        let mut previous = projection[0];
339
340        for &i in &projection[1..] {
341            assert!(
342                previous < i,
343                "The projection on IPC must not contain duplicates"
344            );
345            previous = i;
346        }
347    }
348
349    (projection, map, fields)
350}
351
352pub fn apply_projection(
353    chunk: Chunk<Box<dyn Array>>,
354    map: &AHashMap<usize, usize>,
355) -> Chunk<Box<dyn Array>> {
356    // re-order according to projection
357    let arrays = chunk.into_arrays();
358    let mut new_arrays = arrays.clone();
359
360    map.iter()
361        .for_each(|(old, new)| new_arrays[*new] = arrays[*old].clone());
362
363    Chunk::new(new_arrays)
364}