Skip to main content

arrow_ipc/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Readers
19//!
20//! # Notes
21//!
22//! The [`FileReader`] and [`StreamReader`] have similar interfaces,
23//! however the [`FileReader`] expects a reader that supports [`Seek`]ing
24//!
25//! [`Seek`]: std::io::Seek
26
27mod stream;
28pub use stream::*;
29
30use arrow_select::concat;
31
32use flatbuffers::{VectorIter, VerifierOptions};
33use std::collections::{HashMap, VecDeque};
34use std::fmt;
35use std::io::{BufReader, Read, Seek, SeekFrom};
36use std::sync::Arc;
37
38use arrow_array::*;
39use arrow_buffer::{
40    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
41};
42use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
43use arrow_schema::*;
44
45use crate::compression::{CompressionCodec, DecompressionContext};
46use crate::r#gen::Message::{self};
47use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
48use DataType::*;
49
50/// Read a buffer based on offset and length
51/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
52/// Each constituent buffer is first compressed with the indicated
53/// compressor, and then written with the uncompressed length in the first 8
54/// bytes as a 64-bit little-endian signed integer followed by the compressed
55/// buffer bytes (and then padding as required by the protocol). The
56/// uncompressed length may be set to -1 to indicate that the data that
57/// follows is not compressed, which can be useful for cases where
58/// compression does not yield appreciable savings.
59fn read_buffer(
60    buf: &crate::Buffer,
61    a_data: &Buffer,
62    compression_codec: Option<CompressionCodec>,
63    decompression_context: &mut DecompressionContext,
64) -> Result<Buffer, ArrowError> {
65    let start_offset = buf.offset() as usize;
66    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
67    // corner case: empty buffer
68    match (buf_data.is_empty(), compression_codec) {
69        (true, _) | (_, None) => Ok(buf_data),
70        (false, Some(decompressor)) => {
71            decompressor.decompress_to_buffer(&buf_data, decompression_context)
72        }
73    }
74}
75impl RecordBatchDecoder<'_> {
76    /// Coordinates reading arrays based on data types.
77    ///
78    /// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView)
79    /// When encounter such types, we pop from the front of the queue to get the number of buffers to read.
80    ///
81    /// Notes:
82    /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
83    /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
84    ///   We thus:
85    ///     - check if the bit width of non-64-bit numbers is 64, and
86    ///     - read the buffer as 64-bit (signed integer or float), and
87    ///     - cast the 64-bit array to the appropriate data type
88    fn create_array(
89        &mut self,
90        field: &Field,
91        variadic_counts: &mut VecDeque<i64>,
92    ) -> Result<ArrayRef, ArrowError> {
93        let data_type = field.data_type();
94        match data_type {
95            Utf8 | Binary | LargeBinary | LargeUtf8 => {
96                let field_node = self.next_node(field)?;
97                let buffers = [
98                    self.next_buffer()?,
99                    self.next_buffer()?,
100                    self.next_buffer()?,
101                ];
102                self.create_primitive_array(field_node, data_type, &buffers)
103            }
104            BinaryView | Utf8View => {
105                let count = variadic_counts
106                    .pop_front()
107                    .ok_or(ArrowError::IpcError(format!(
108                        "Missing variadic count for {data_type} column"
109                    )))?;
110                let count = count + 2; // view and null buffer.
111                let buffers = (0..count)
112                    .map(|_| self.next_buffer())
113                    .collect::<Result<Vec<_>, _>>()?;
114                let field_node = self.next_node(field)?;
115                self.create_primitive_array(field_node, data_type, &buffers)
116            }
117            FixedSizeBinary(_) => {
118                let field_node = self.next_node(field)?;
119                let buffers = [self.next_buffer()?, self.next_buffer()?];
120                self.create_primitive_array(field_node, data_type, &buffers)
121            }
122            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
123                let list_node = self.next_node(field)?;
124                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
125                let values = self.create_array(list_field, variadic_counts)?;
126                self.create_list_array(list_node, data_type, &list_buffers, values)
127            }
128            ListView(list_field) | LargeListView(list_field) => {
129                let list_node = self.next_node(field)?;
130                let list_buffers = [
131                    self.next_buffer()?, // null buffer
132                    self.next_buffer()?, // offsets
133                    self.next_buffer()?, // sizes
134                ];
135                let values = self.create_array(list_field, variadic_counts)?;
136                self.create_list_view_array(list_node, data_type, &list_buffers, values)
137            }
138            FixedSizeList(list_field, _) => {
139                let list_node = self.next_node(field)?;
140                let list_buffers = [self.next_buffer()?];
141                let values = self.create_array(list_field, variadic_counts)?;
142                self.create_list_array(list_node, data_type, &list_buffers, values)
143            }
144            Struct(struct_fields) => {
145                let struct_node = self.next_node(field)?;
146                let null_buffer = self.next_buffer()?;
147
148                // read the arrays for each field
149                let mut struct_arrays = vec![];
150                // TODO investigate whether just knowing the number of buffers could
151                // still work
152                for struct_field in struct_fields {
153                    let child = self.create_array(struct_field, variadic_counts)?;
154                    struct_arrays.push(child);
155                }
156                self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
157            }
158            RunEndEncoded(run_ends_field, values_field) => {
159                let run_node = self.next_node(field)?;
160                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
161                let values = self.create_array(values_field, variadic_counts)?;
162
163                let run_array_length = run_node.length() as usize;
164                let builder = ArrayData::builder(data_type.clone())
165                    .len(run_array_length)
166                    .offset(0)
167                    .add_child_data(run_ends.into_data())
168                    .add_child_data(values.into_data())
169                    .null_count(run_node.null_count() as usize);
170
171                self.create_array_from_builder(builder)
172            }
173            // Create dictionary array from RecordBatch
174            Dictionary(_, _) => {
175                let index_node = self.next_node(field)?;
176                let index_buffers = [self.next_buffer()?, self.next_buffer()?];
177
178                #[allow(deprecated)]
179                let dict_id = field.dict_id().ok_or_else(|| {
180                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
181                })?;
182
183                let value_array = match self.dictionaries_by_id.get(&dict_id) {
184                    Some(array) => array.clone(),
185                    None => {
186                        // Per the IPC spec, dictionary batches may be omitted when all
187                        // values in the column are null. In that case we synthesize an
188                        // empty values array so decoding can proceed.
189                        if let Dictionary(_, value_type) = data_type {
190                            arrow_array::new_empty_array(value_type.as_ref())
191                        } else {
192                            unreachable!()
193                        }
194                    }
195                };
196
197                self.create_dictionary_array(index_node, data_type, &index_buffers, value_array)
198            }
199            Union(fields, mode) => {
200                let union_node = self.next_node(field)?;
201                let len = union_node.length() as usize;
202
203                // In V4, union types has validity bitmap
204                // In V5 and later, union types have no validity bitmap
205                if self.version < MetadataVersion::V5 {
206                    self.next_buffer()?;
207                }
208
209                let type_ids: ScalarBuffer<i8> =
210                    self.next_buffer()?.slice_with_length(0, len).into();
211
212                let value_offsets = match mode {
213                    UnionMode::Dense => {
214                        let offsets: ScalarBuffer<i32> =
215                            self.next_buffer()?.slice_with_length(0, len * 4).into();
216                        Some(offsets)
217                    }
218                    UnionMode::Sparse => None,
219                };
220
221                let mut children = Vec::with_capacity(fields.len());
222
223                for (_id, field) in fields.iter() {
224                    let child = self.create_array(field, variadic_counts)?;
225                    children.push(child);
226                }
227
228                let array = if self.skip_validation.get() {
229                    // safety: flag can only be set via unsafe code
230                    unsafe {
231                        UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
232                    }
233                } else {
234                    UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
235                };
236                Ok(Arc::new(array))
237            }
238            Null => {
239                let node = self.next_node(field)?;
240                let length = node.length();
241                let null_count = node.null_count();
242
243                if length != null_count {
244                    return Err(ArrowError::SchemaError(format!(
245                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
246                    )));
247                }
248
249                let builder = ArrayData::builder(data_type.clone())
250                    .len(length as usize)
251                    .offset(0);
252                self.create_array_from_builder(builder)
253            }
254            _ => {
255                let field_node = self.next_node(field)?;
256                let buffers = [self.next_buffer()?, self.next_buffer()?];
257                self.create_primitive_array(field_node, data_type, &buffers)
258            }
259        }
260    }
261
262    /// Reads the correct number of buffers based on data type and null_count, and creates a
263    /// primitive array ref
264    fn create_primitive_array(
265        &self,
266        field_node: &FieldNode,
267        data_type: &DataType,
268        buffers: &[Buffer],
269    ) -> Result<ArrayRef, ArrowError> {
270        let length = field_node.length() as usize;
271        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
272        let mut builder = match data_type {
273            Utf8 | Binary | LargeBinary | LargeUtf8 => {
274                // read 3 buffers: null buffer (optional), offsets buffer and data buffer
275                ArrayData::builder(data_type.clone())
276                    .len(length)
277                    .buffers(buffers[1..3].to_vec())
278                    .null_bit_buffer(null_buffer)
279            }
280            BinaryView | Utf8View => ArrayData::builder(data_type.clone())
281                .len(length)
282                .buffers(buffers[1..].to_vec())
283                .null_bit_buffer(null_buffer),
284            _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
285                // read 2 buffers: null buffer (optional) and data buffer
286                ArrayData::builder(data_type.clone())
287                    .len(length)
288                    .add_buffer(buffers[1].clone())
289                    .null_bit_buffer(null_buffer)
290            }
291            t => unreachable!("Data type {:?} either unsupported or not primitive", t),
292        };
293
294        builder = builder.null_count(field_node.null_count() as usize);
295
296        self.create_array_from_builder(builder)
297    }
298
299    /// Update the ArrayDataBuilder based on settings in this decoder
300    fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
301        let mut builder = builder.align_buffers(!self.require_alignment);
302        if self.skip_validation.get() {
303            // SAFETY: flag can only be set via unsafe code
304            unsafe { builder = builder.skip_validation(true) }
305        };
306        Ok(make_array(builder.build()?))
307    }
308
309    /// Reads the correct number of buffers based on list type and null_count, and creates a
310    /// list array ref
311    fn create_list_array(
312        &self,
313        field_node: &FieldNode,
314        data_type: &DataType,
315        buffers: &[Buffer],
316        child_array: ArrayRef,
317    ) -> Result<ArrayRef, ArrowError> {
318        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
319        let length = field_node.length() as usize;
320        let child_data = child_array.into_data();
321        let mut builder = match data_type {
322            List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
323                .len(length)
324                .add_buffer(buffers[1].clone())
325                .add_child_data(child_data)
326                .null_bit_buffer(null_buffer),
327
328            FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
329                .len(length)
330                .add_child_data(child_data)
331                .null_bit_buffer(null_buffer),
332
333            _ => unreachable!("Cannot create list or map array from {:?}", data_type),
334        };
335
336        builder = builder.null_count(field_node.null_count() as usize);
337
338        self.create_array_from_builder(builder)
339    }
340
341    fn create_list_view_array(
342        &self,
343        field_node: &FieldNode,
344        data_type: &DataType,
345        buffers: &[Buffer],
346        child_array: ArrayRef,
347    ) -> Result<ArrayRef, ArrowError> {
348        assert!(matches!(data_type, ListView(_) | LargeListView(_)));
349
350        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
351        let length = field_node.length() as usize;
352        let child_data = child_array.into_data();
353
354        self.create_array_from_builder(
355            ArrayData::builder(data_type.clone())
356                .len(length)
357                .add_buffer(buffers[1].clone()) // offsets
358                .add_buffer(buffers[2].clone()) // sizes
359                .add_child_data(child_data)
360                .null_bit_buffer(null_buffer)
361                .null_count(field_node.null_count() as usize),
362        )
363    }
364
365    fn create_struct_array(
366        &self,
367        struct_node: &FieldNode,
368        null_buffer: Buffer,
369        struct_fields: &Fields,
370        struct_arrays: Vec<ArrayRef>,
371    ) -> Result<ArrayRef, ArrowError> {
372        let null_count = struct_node.null_count() as usize;
373        let len = struct_node.length() as usize;
374        let skip_validation = self.skip_validation.get();
375
376        let nulls = if null_count > 0 {
377            let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
378            let null_buffer = if skip_validation {
379                // safety: flag can only be set via unsafe code
380                unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
381            } else {
382                let null_buffer = NullBuffer::new(validity_buffer);
383
384                if null_buffer.null_count() != null_count {
385                    return Err(ArrowError::InvalidArgumentError(format!(
386                        "null_count value ({}) doesn't match actual number of nulls in array ({})",
387                        null_count,
388                        null_buffer.null_count()
389                    )));
390                }
391
392                null_buffer
393            };
394
395            Some(null_buffer)
396        } else {
397            None
398        };
399        if struct_arrays.is_empty() {
400            // `StructArray::from` can't infer the correct row count
401            // if we have zero fields
402            return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
403        }
404
405        let struct_array = if skip_validation {
406            // safety: flag can only be set via unsafe code
407            unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
408        } else {
409            StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
410        };
411
412        Ok(Arc::new(struct_array))
413    }
414
415    /// Reads the correct number of buffers based on list type and null_count, and creates a
416    /// list array ref
417    fn create_dictionary_array(
418        &self,
419        field_node: &FieldNode,
420        data_type: &DataType,
421        buffers: &[Buffer],
422        value_array: ArrayRef,
423    ) -> Result<ArrayRef, ArrowError> {
424        if let Dictionary(_, _) = *data_type {
425            let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
426            let builder = ArrayData::builder(data_type.clone())
427                .len(field_node.length() as usize)
428                .add_buffer(buffers[1].clone())
429                .add_child_data(value_array.into_data())
430                .null_bit_buffer(null_buffer)
431                .null_count(field_node.null_count() as usize);
432            self.create_array_from_builder(builder)
433        } else {
434            unreachable!("Cannot create dictionary array from {:?}", data_type)
435        }
436    }
437}
438
/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to
/// [`RecordBatch`]
///
/// The decoder walks the field nodes and buffer descriptors of the encoded batch
/// in order, consuming one node (and the buffers belonging to it) per decoded or
/// skipped field.
///
/// [IPC RecordBatch]: crate::RecordBatch
///
pub struct RecordBatchDecoder<'a> {
    /// The flatbuffers encoded record batch
    batch: crate::RecordBatch<'a>,
    /// The output schema
    schema: SchemaRef,
    /// Decoded dictionaries indexed by dictionary id
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec, taken from the batch's compression metadata
    compression: Option<CompressionCodec>,
    /// Decompression context for reusing zstd decompressor state
    decompression_context: DecompressionContext,
    /// The format version
    version: MetadataVersion,
    /// The raw data buffer that the buffer descriptors point into
    data: &'a Buffer,
    /// Iterator over the field nodes of the batch; advanced once per decoded or skipped field
    nodes: VectorIter<'a, FieldNode>,
    /// Iterator over the buffer descriptors of the batch; advanced as buffers are read or skipped
    buffers: VectorIter<'a, crate::Buffer>,
    /// Projection (subset of columns) to read, if any
    /// See [`RecordBatchDecoder::with_projection`] for details
    projection: Option<&'a [usize]>,
    /// Are buffers required to already be aligned? See
    /// [`RecordBatchDecoder::with_require_alignment`] for details
    require_alignment: bool,
    /// Should validation be skipped when reading data? Defaults to false.
    ///
    /// See [`FileDecoder::with_skip_validation`] for details.
    skip_validation: UnsafeFlag,
}
474
475impl<'a> RecordBatchDecoder<'a> {
476    /// Create a reader for decoding arrays from an encoded [`RecordBatch`]
477    fn try_new(
478        buf: &'a Buffer,
479        batch: crate::RecordBatch<'a>,
480        schema: SchemaRef,
481        dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
482        metadata: &'a MetadataVersion,
483    ) -> Result<Self, ArrowError> {
484        let buffers = batch.buffers().ok_or_else(|| {
485            ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
486        })?;
487        let field_nodes = batch.nodes().ok_or_else(|| {
488            ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
489        })?;
490
491        let batch_compression = batch.compression();
492        let compression = batch_compression
493            .map(|batch_compression| batch_compression.codec().try_into())
494            .transpose()?;
495
496        Ok(Self {
497            batch,
498            schema,
499            dictionaries_by_id,
500            compression,
501            decompression_context: DecompressionContext::new(),
502            version: *metadata,
503            data: buf,
504            nodes: field_nodes.iter(),
505            buffers: buffers.iter(),
506            projection: None,
507            require_alignment: false,
508            skip_validation: UnsafeFlag::new(),
509        })
510    }
511
512    /// Set the projection (default: None)
513    ///
514    /// If set, the projection is the list  of column indices
515    /// that will be read
516    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
517        self.projection = projection;
518        self
519    }
520
521    /// Set require_alignment (default: false)
522    ///
523    /// If true, buffers must be aligned appropriately or error will
524    /// result. If false, buffers will be copied to aligned buffers
525    /// if necessary.
526    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
527        self.require_alignment = require_alignment;
528        self
529    }
530
531    /// Specifies if validation should be skipped when reading data (defaults to `false`)
532    ///
533    /// Note this API is somewhat "funky" as it allows the caller to skip validation
534    /// without having to use `unsafe` code. If this is ever made public
535    /// it should be made clearer that this is a potentially unsafe by
536    /// using an `unsafe` function that takes a boolean flag.
537    ///
538    /// # Safety
539    ///
540    /// Relies on the caller only passing a flag with `true` value if they are
541    /// certain that the data is valid
542    pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
543        self.skip_validation = skip_validation;
544        self
545    }
546
547    /// Read the record batch, consuming the reader
548    fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
549        let mut variadic_counts: VecDeque<i64> = self
550            .batch
551            .variadicBufferCounts()
552            .into_iter()
553            .flatten()
554            .collect();
555
556        let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
557
558        let schema = Arc::clone(&self.schema);
559        if let Some(projection) = self.projection {
560            let mut arrays = vec![];
561            // project fields
562            for (idx, field) in schema.fields().iter().enumerate() {
563                // Create array for projected field
564                if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
565                    let child = self.create_array(field, &mut variadic_counts)?;
566                    arrays.push((proj_idx, child));
567                } else {
568                    self.skip_field(field, &mut variadic_counts)?;
569                }
570            }
571
572            arrays.sort_by_key(|t| t.0);
573
574            let schema = Arc::new(schema.project(projection)?);
575            let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();
576
577            if self.skip_validation.get() {
578                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
579                unsafe {
580                    Ok(RecordBatch::new_unchecked(
581                        schema,
582                        columns,
583                        self.batch.length() as usize,
584                    ))
585                }
586            } else {
587                assert!(variadic_counts.is_empty());
588                RecordBatch::try_new_with_options(schema, columns, &options)
589            }
590        } else {
591            let mut children = vec![];
592            // keep track of index as lists require more than one node
593            for field in schema.fields() {
594                let child = self.create_array(field, &mut variadic_counts)?;
595                children.push(child);
596            }
597
598            if self.skip_validation.get() {
599                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
600                unsafe {
601                    Ok(RecordBatch::new_unchecked(
602                        schema,
603                        children,
604                        self.batch.length() as usize,
605                    ))
606                }
607            } else {
608                assert!(variadic_counts.is_empty());
609                RecordBatch::try_new_with_options(schema, children, &options)
610            }
611        }
612    }
613
614    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
615        let buffer = self.buffers.next().ok_or_else(|| {
616            ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
617        })?;
618        read_buffer(
619            buffer,
620            self.data,
621            self.compression,
622            &mut self.decompression_context,
623        )
624    }
625
    /// Advances past the next buffer descriptor without reading its data.
    ///
    /// # Panics
    ///
    /// Panics if no buffer descriptor remains.
    fn skip_buffer(&mut self) {
        self.buffers.next().unwrap();
    }
629
630    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
631        self.nodes.next().ok_or_else(|| {
632            ArrowError::SchemaError(format!(
633                "Invalid data for schema. {field} refers to node not found in schema",
634            ))
635        })
636    }
637
638    fn skip_field(
639        &mut self,
640        field: &Field,
641        variadic_count: &mut VecDeque<i64>,
642    ) -> Result<(), ArrowError> {
643        self.next_node(field)?;
644
645        match field.data_type() {
646            Utf8 | Binary | LargeBinary | LargeUtf8 => {
647                for _ in 0..3 {
648                    self.skip_buffer()
649                }
650            }
651            Utf8View | BinaryView => {
652                let count = variadic_count
653                    .pop_front()
654                    .ok_or(ArrowError::IpcError(format!(
655                        "Missing variadic count for {} column",
656                        field.data_type()
657                    )))?;
658                let count = count + 2; // view and null buffer.
659                for _i in 0..count {
660                    self.skip_buffer()
661                }
662            }
663            FixedSizeBinary(_) => {
664                self.skip_buffer();
665                self.skip_buffer();
666            }
667            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
668                self.skip_buffer();
669                self.skip_buffer();
670                self.skip_field(list_field, variadic_count)?;
671            }
672            ListView(list_field) | LargeListView(list_field) => {
673                self.skip_buffer(); // Null buffer
674                self.skip_buffer(); // Offsets
675                self.skip_buffer(); // Sizes
676                self.skip_field(list_field, variadic_count)?;
677            }
678            FixedSizeList(list_field, _) => {
679                self.skip_buffer();
680                self.skip_field(list_field, variadic_count)?;
681            }
682            Struct(struct_fields) => {
683                self.skip_buffer();
684
685                // skip for each field
686                for struct_field in struct_fields {
687                    self.skip_field(struct_field, variadic_count)?
688                }
689            }
690            RunEndEncoded(run_ends_field, values_field) => {
691                self.skip_field(run_ends_field, variadic_count)?;
692                self.skip_field(values_field, variadic_count)?;
693            }
694            Dictionary(_, _) => {
695                self.skip_buffer(); // Nulls
696                self.skip_buffer(); // Indices
697            }
698            Union(fields, mode) => {
699                if self.version < MetadataVersion::V5 {
700                    self.skip_buffer(); // Null buffer
701                }
702                self.skip_buffer(); // Type ids
703
704                match mode {
705                    UnionMode::Dense => self.skip_buffer(), // Offsets
706                    UnionMode::Sparse => {}
707                };
708
709                for (_, field) in fields.iter() {
710                    self.skip_field(field, variadic_count)?
711                }
712            }
713            // Null has no buffers to skip
714            Null => {}
715
716            // Fixed-width and boolean types: skip null buffer + values buffer
717            Boolean
718            | Int8
719            | Int16
720            | Int32
721            | Int64
722            | UInt8
723            | UInt16
724            | UInt32
725            | UInt64
726            | Float16
727            | Float32
728            | Float64
729            | Timestamp(_, _)
730            | Date32
731            | Date64
732            | Time32(_)
733            | Time64(_)
734            | Duration(_)
735            | Interval(_)
736            | Decimal32(_, _)
737            | Decimal64(_, _)
738            | Decimal128(_, _)
739            | Decimal256(_, _) => {
740                self.skip_buffer();
741                self.skip_buffer();
742            }
743        };
744        Ok(())
745    }
746}
747
748/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
749///
750/// If `require_alignment` is true, this function will return an error if any array data in the
751/// input `buf` is not properly aligned.
752/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct [`arrow_data::ArrayData`].
753///
754/// If `require_alignment` is false, this function will automatically allocate a new aligned buffer
755/// and copy over the data if any array data in the input `buf` is not properly aligned.
756/// (Properly aligned array data will remain zero-copy.)
757/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct [`arrow_data::ArrayData`].
758pub fn read_record_batch(
759    buf: &Buffer,
760    batch: crate::RecordBatch,
761    schema: SchemaRef,
762    dictionaries_by_id: &HashMap<i64, ArrayRef>,
763    projection: Option<&[usize]>,
764    metadata: &MetadataVersion,
765) -> Result<RecordBatch, ArrowError> {
766    RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
767        .with_projection(projection)
768        .with_require_alignment(false)
769        .read_record_batch()
770}
771
772/// Read the dictionary from the buffer and provided metadata,
773/// updating the `dictionaries_by_id` with the resulting dictionary
774pub fn read_dictionary(
775    buf: &Buffer,
776    batch: crate::DictionaryBatch,
777    schema: &Schema,
778    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
779    metadata: &MetadataVersion,
780) -> Result<(), ArrowError> {
781    read_dictionary_impl(
782        buf,
783        batch,
784        schema,
785        dictionaries_by_id,
786        metadata,
787        false,
788        UnsafeFlag::new(),
789    )
790}
791
792fn read_dictionary_impl(
793    buf: &Buffer,
794    batch: crate::DictionaryBatch,
795    schema: &Schema,
796    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
797    metadata: &MetadataVersion,
798    require_alignment: bool,
799    skip_validation: UnsafeFlag,
800) -> Result<(), ArrowError> {
801    let id = batch.id();
802
803    let dictionary_values = get_dictionary_values(
804        buf,
805        batch,
806        schema,
807        dictionaries_by_id,
808        metadata,
809        require_alignment,
810        skip_validation,
811    )?;
812
813    update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;
814
815    Ok(())
816}
817
818/// Updates the `dictionaries_by_id` with the provided dictionary values and id.
819///
820/// # Errors
821/// - If `is_delta` is true and there is no existing dictionary for the given
822///   `dict_id`
823/// - If `is_delta` is true and the concatenation of the existing and new
824///   dictionary fails. This usually signals a type mismatch between the old and
825///   new values.
826fn update_dictionaries(
827    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
828    is_delta: bool,
829    dict_id: i64,
830    dict_values: ArrayRef,
831) -> Result<(), ArrowError> {
832    if !is_delta {
833        // We don't currently record the isOrdered field. This could be general
834        // attributes of arrays.
835        // Add (possibly multiple) array refs to the dictionaries array.
836        dictionaries_by_id.insert(dict_id, dict_values.clone());
837        return Ok(());
838    }
839
840    let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
841        ArrowError::InvalidArgumentError(format!(
842            "No existing dictionary for delta dictionary with id '{dict_id}'"
843        ))
844    })?;
845
846    let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
847        ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
848    })?;
849
850    dictionaries_by_id.insert(dict_id, combined);
851
852    Ok(())
853}
854
855/// Given a dictionary batch IPC message/body along with the full state of a
856/// stream including schema, dictionary cache, metadata, and other flags, this
857/// function will parse the buffer into an array of dictionary values.
858fn get_dictionary_values(
859    buf: &Buffer,
860    batch: crate::DictionaryBatch,
861    schema: &Schema,
862    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
863    metadata: &MetadataVersion,
864    require_alignment: bool,
865    skip_validation: UnsafeFlag,
866) -> Result<ArrayRef, ArrowError> {
867    let id = batch.id();
868    #[allow(deprecated)]
869    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
870    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
871        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
872    })?;
873
874    // As the dictionary batch does not contain the type of the
875    // values array, we need to retrieve this from the schema.
876    // Get an array representing this dictionary's values.
877    let dictionary_values: ArrayRef = match first_field.data_type() {
878        DataType::Dictionary(_, value_type) => {
879            // Make a fake schema for the dictionary batch.
880            let value = value_type.as_ref().clone();
881            let schema = Schema::new(vec![Field::new("", value, true)]);
882            // Read a single column
883            let record_batch = RecordBatchDecoder::try_new(
884                buf,
885                batch.data().unwrap(),
886                Arc::new(schema),
887                dictionaries_by_id,
888                metadata,
889            )?
890            .with_require_alignment(require_alignment)
891            .with_skip_validation(skip_validation)
892            .read_record_batch()?;
893
894            Some(record_batch.column(0).clone())
895        }
896        _ => None,
897    }
898    .ok_or_else(|| {
899        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
900    })?;
901
902    Ok(dictionary_values)
903}
904
905/// Read the data for a given block
906fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
907    reader.seek(SeekFrom::Start(block.offset() as u64))?;
908    let body_len = block.bodyLength().to_usize().unwrap();
909    let metadata_len = block.metaDataLength().to_usize().unwrap();
910    let total_len = body_len.checked_add(metadata_len).unwrap();
911
912    let mut buf = MutableBuffer::from_len_zeroed(total_len);
913    reader.read_exact(&mut buf)?;
914    Ok(buf.into())
915}
916
917/// Parse an encapsulated message
918///
919/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
920fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
921    let buf = match buf[..4] == CONTINUATION_MARKER {
922        true => &buf[8..],
923        false => &buf[4..],
924    };
925    crate::root_as_message(buf)
926        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
927}
928
929/// Read the footer length from the last 10 bytes of an Arrow IPC file
930///
931/// Expects a 4 byte footer length followed by `b"ARROW1"`
932pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
933    if buf[4..] != super::ARROW_MAGIC {
934        return Err(ArrowError::ParseError(
935            "Arrow file does not contain correct footer".to_string(),
936        ));
937    }
938
939    // read footer length
940    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
941    footer_len
942        .try_into()
943        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
944}
945
946/// A low-level, push-based interface for reading an IPC file
947///
948/// For a higher-level interface see [`FileReader`]
949///
950/// For an example of using this API with `mmap` see the [`zero_copy_ipc`] example.
951///
952/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
953///
954/// ```
955/// # use std::sync::Arc;
956/// # use arrow_array::*;
957/// # use arrow_array::types::Int32Type;
958/// # use arrow_buffer::Buffer;
959/// # use arrow_ipc::convert::fb_to_schema;
960/// # use arrow_ipc::reader::{FileDecoder, read_footer_length};
961/// # use arrow_ipc::root_as_footer;
962/// # use arrow_ipc::writer::FileWriter;
963/// // Write an IPC file
964///
965/// let batch = RecordBatch::try_from_iter([
966///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
967///     ("b", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
968///     ("c", Arc::new(DictionaryArray::<Int32Type>::from_iter(["hello", "hello", "world"])) as _),
969/// ]).unwrap();
970///
971/// let schema = batch.schema();
972///
973/// let mut out = Vec::with_capacity(1024);
974/// let mut writer = FileWriter::try_new(&mut out, schema.as_ref()).unwrap();
975/// writer.write(&batch).unwrap();
976/// writer.finish().unwrap();
977///
978/// drop(writer);
979///
980/// // Read IPC file
981///
982/// let buffer = Buffer::from_vec(out);
983/// let trailer_start = buffer.len() - 10;
984/// let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
985/// let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
986///
987/// let back = fb_to_schema(footer.schema().unwrap());
988/// assert_eq!(&back, schema.as_ref());
989///
990/// let mut decoder = FileDecoder::new(schema, footer.version());
991///
992/// // Read dictionaries
993/// for block in footer.dictionaries().iter().flatten() {
994///     let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
995///     let data = buffer.slice_with_length(block.offset() as _, block_len);
996///     decoder.read_dictionary(&block, &data).unwrap();
997/// }
998///
999/// // Read record batch
1000/// let batches = footer.recordBatches().unwrap();
1001/// assert_eq!(batches.len(), 1); // Only wrote a single batch
1002///
1003/// let block = batches.get(0);
1004/// let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
1005/// let data = buffer.slice_with_length(block.offset() as _, block_len);
1006/// let back = decoder.read_record_batch(block, &data).unwrap().unwrap();
1007///
1008/// assert_eq!(batch, back);
1009/// ```
#[derive(Debug)]
pub struct FileDecoder {
    /// Schema used to decode record batches and to resolve dictionary value types
    schema: SchemaRef,
    /// Dictionary values read so far, keyed by dictionary id
    dictionaries: HashMap<i64, ArrayRef>,
    /// Metadata version each message is checked against when it is read
    version: MetadataVersion,
    /// Optional zero-based column indices to decode; `None` decodes every column
    projection: Option<Vec<usize>>,
    /// Whether array data in input buffers must already be properly aligned
    require_alignment: bool,
    /// Whether validation is skipped when reading data
    skip_validation: UnsafeFlag,
}
1019
1020impl FileDecoder {
1021    /// Create a new [`FileDecoder`] with the given schema and version
1022    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
1023        Self {
1024            schema,
1025            version,
1026            dictionaries: Default::default(),
1027            projection: None,
1028            require_alignment: false,
1029            skip_validation: UnsafeFlag::new(),
1030        }
1031    }
1032
1033    /// Specify a projection
1034    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1035        self.projection = Some(projection);
1036        self
1037    }
1038
1039    /// Specifies if the array data in input buffers is required to be properly aligned.
1040    ///
1041    /// If `require_alignment` is true, this decoder will return an error if any array data in the
1042    /// input `buf` is not properly aligned.
1043    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct
1044    /// [`arrow_data::ArrayData`].
1045    ///
1046    /// If `require_alignment` is false (the default), this decoder will automatically allocate a
1047    /// new aligned buffer and copy over the data if any array data in the input `buf` is not
1048    /// properly aligned. (Properly aligned array data will remain zero-copy.)
1049    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct
1050    /// [`arrow_data::ArrayData`].
1051    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
1052        self.require_alignment = require_alignment;
1053        self
1054    }
1055
    /// Specifies if validation should be skipped when reading data (defaults to `false`)
    ///
    /// # Safety
    ///
    /// This flag must only be set to `true` when you trust the input data and are sure the data you are
    /// reading is a valid Arrow IPC file, otherwise undefined behavior may
    /// result.
    ///
    /// For example, some programs may wish to trust reading IPC files written
    /// by the same process that created the files.
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        // SAFETY: the caller of this `unsafe fn` has accepted the contract documented
        // above, which is what allows the flag to be changed here.
        unsafe { self.skip_validation.set(skip_validation) };
        self
    }
1070
1071    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
1072        let message = parse_message(buf)?;
1073
1074        // some old test data's footer metadata is not set, so we account for that
1075        if self.version != MetadataVersion::V1 && message.version() != self.version {
1076            return Err(ArrowError::IpcError(
1077                "Could not read IPC message as metadata versions mismatch".to_string(),
1078            ));
1079        }
1080        Ok(message)
1081    }
1082
1083    /// Read the dictionary with the given block and data buffer
1084    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
1085        let message = self.read_message(buf)?;
1086        match message.header_type() {
1087            crate::MessageHeader::DictionaryBatch => {
1088                let batch = message.header_as_dictionary_batch().unwrap();
1089                read_dictionary_impl(
1090                    &buf.slice(block.metaDataLength() as _),
1091                    batch,
1092                    &self.schema,
1093                    &mut self.dictionaries,
1094                    &message.version(),
1095                    self.require_alignment,
1096                    self.skip_validation.clone(),
1097                )
1098            }
1099            t => Err(ArrowError::ParseError(format!(
1100                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
1101            ))),
1102        }
1103    }
1104
1105    /// Read the RecordBatch with the given block and data buffer
1106    pub fn read_record_batch(
1107        &self,
1108        block: &Block,
1109        buf: &Buffer,
1110    ) -> Result<Option<RecordBatch>, ArrowError> {
1111        let message = self.read_message(buf)?;
1112        match message.header_type() {
1113            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
1114                "Not expecting a schema when messages are read".to_string(),
1115            )),
1116            crate::MessageHeader::RecordBatch => {
1117                let batch = message.header_as_record_batch().ok_or_else(|| {
1118                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1119                })?;
1120                // read the block that makes up the record batch into a buffer
1121                RecordBatchDecoder::try_new(
1122                    &buf.slice(block.metaDataLength() as _),
1123                    batch,
1124                    self.schema.clone(),
1125                    &self.dictionaries,
1126                    &message.version(),
1127                )?
1128                .with_projection(self.projection.as_deref())
1129                .with_require_alignment(self.require_alignment)
1130                .with_skip_validation(self.skip_validation.clone())
1131                .read_record_batch()
1132                .map(Some)
1133            }
1134            crate::MessageHeader::NONE => Ok(None),
1135            t => Err(ArrowError::InvalidArgumentError(format!(
1136                "Reading types other than record batches not yet supported, unable to read {t:?}"
1137            ))),
1138        }
1139    }
1140}
1141
/// Build an Arrow [`FileReader`] with custom options.
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Limit on the number of flatbuffer tables accepted when verifying the footer;
    /// passed through as [`VerifierOptions::max_tables`]
    max_footer_fb_tables: usize,
    /// Limit on the nesting depth accepted when verifying the footer;
    /// passed through as [`VerifierOptions::max_depth`]
    max_footer_fb_depth: usize,
}
1152
1153impl Default for FileReaderBuilder {
1154    fn default() -> Self {
1155        let verifier_options = VerifierOptions::default();
1156        Self {
1157            max_footer_fb_tables: verifier_options.max_tables,
1158            max_footer_fb_depth: verifier_options.max_depth,
1159            projection: None,
1160        }
1161    }
1162}
1163
1164impl FileReaderBuilder {
1165    /// Options for creating a new [`FileReader`].
1166    ///
1167    /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
1168    pub fn new() -> Self {
1169        Self::default()
1170    }
1171
1172    /// Optional projection for which columns to load (zero-based column indices).
1173    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1174        self.projection = Some(projection);
1175        self
1176    }
1177
1178    /// Flatbuffers option for parsing the footer. Controls the max number of fields and
1179    /// metadata key-value pairs that can be parsed from the schema of the footer.
1180    ///
1181    /// By default this is set to `1_000_000` which roughly translates to a schema with
1182    /// no metadata key-value pairs but 499,999 fields.
1183    ///
1184    /// This default limit is enforced to protect against malicious files with a massive
1185    /// amount of flatbuffer tables which could cause a denial of service attack.
1186    ///
1187    /// If you need to ingest a trusted file with a massive number of fields and/or
1188    /// metadata key-value pairs and are facing the error `"Unable to get root as
1189    /// footer: TooManyTables"` then increase this parameter as necessary.
1190    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
1191        self.max_footer_fb_tables = max_footer_fb_tables;
1192        self
1193    }
1194
1195    /// Flatbuffers option for parsing the footer. Controls the max depth for schemas with
1196    /// nested fields parsed from the footer.
1197    ///
1198    /// By default this is set to `64` which roughly translates to a schema with
1199    /// a field nested 60 levels down through other struct fields.
1200    ///
1201    /// This default limit is enforced to protect against malicious files with a extremely
1202    /// deep flatbuffer structure which could cause a denial of service attack.
1203    ///
1204    /// If you need to ingest a trusted file with a deeply nested field and are facing the
1205    /// error `"Unable to get root as footer: DepthLimitReached"` then increase this
1206    /// parameter as necessary.
1207    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
1208        self.max_footer_fb_depth = max_footer_fb_depth;
1209        self
1210    }
1211
1212    /// Build [`FileReader`] with given reader.
1213    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
1214        // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
1215        let mut buffer = [0; 10];
1216        reader.seek(SeekFrom::End(-10))?;
1217        reader.read_exact(&mut buffer)?;
1218
1219        let footer_len = read_footer_length(buffer)?;
1220
1221        // read footer
1222        let mut footer_data = vec![0; footer_len];
1223        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
1224        reader.read_exact(&mut footer_data)?;
1225
1226        let verifier_options = VerifierOptions {
1227            max_tables: self.max_footer_fb_tables,
1228            max_depth: self.max_footer_fb_depth,
1229            ..Default::default()
1230        };
1231        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
1232            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
1233        )?;
1234
1235        let blocks = footer.recordBatches().ok_or_else(|| {
1236            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
1237        })?;
1238
1239        let total_blocks = blocks.len();
1240
1241        let ipc_schema = footer.schema().unwrap();
1242        if !ipc_schema.endianness().equals_to_target_endianness() {
1243            return Err(ArrowError::IpcError(
1244                "the endianness of the source system does not match the endianness of the target system.".to_owned()
1245            ));
1246        }
1247
1248        let schema = crate::convert::fb_to_schema(ipc_schema);
1249
1250        let mut custom_metadata = HashMap::new();
1251        if let Some(fb_custom_metadata) = footer.custom_metadata() {
1252            for kv in fb_custom_metadata.into_iter() {
1253                custom_metadata.insert(
1254                    kv.key().unwrap().to_string(),
1255                    kv.value().unwrap().to_string(),
1256                );
1257            }
1258        }
1259
1260        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
1261        if let Some(projection) = self.projection {
1262            decoder = decoder.with_projection(projection)
1263        }
1264
1265        // Create an array of optional dictionary value arrays, one per field.
1266        if let Some(dictionaries) = footer.dictionaries() {
1267            for block in dictionaries {
1268                let buf = read_block(&mut reader, block)?;
1269                decoder.read_dictionary(block, &buf)?;
1270            }
1271        }
1272
1273        Ok(FileReader {
1274            reader,
1275            blocks: blocks.iter().copied().collect(),
1276            current_block: 0,
1277            total_blocks,
1278            decoder,
1279            custom_metadata,
1280        })
1281    }
1282}
1283
1284/// Arrow File Reader
1285///
1286/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC File Format],
1287/// providing random access to the record batches.
1288///
1289/// # See Also
1290///
1291/// * [`Self::set_index`] for random access
1292/// * [`StreamReader`] for reading streaming data
1293///
1294/// # Example: Reading from a `File`
1295/// ```
1296/// # use std::io::Cursor;
1297/// use arrow_array::record_batch;
1298/// # use arrow_ipc::reader::FileReader;
1299/// # use arrow_ipc::writer::FileWriter;
1300/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1301/// # let mut file = vec![]; // mimic a stream for the example
1302/// # {
1303/// #  let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
1304/// #  writer.write(&batch).unwrap();
1305/// #  writer.write(&batch).unwrap();
1306/// #  writer.finish().unwrap();
1307/// # }
1308/// # let mut file = Cursor::new(&file);
1309/// let projection = None; // read all columns
1310/// let mut reader = FileReader::try_new(&mut file, projection).unwrap();
1311/// // Position the reader to the second batch
1312/// reader.set_index(1).unwrap();
1313/// // read batches from the reader using the Iterator trait
1314/// let mut num_rows = 0;
1315/// for batch in reader {
1316///    let batch = batch.unwrap();
1317///    num_rows += batch.num_rows();
1318/// }
1319/// assert_eq!(num_rows, 3);
1320/// ```
1321/// # Example: Reading from `mmap`ed file
1322///
1323/// For an example creating Arrays without copying using  memory mapped (`mmap`)
1324/// files see the [`zero_copy_ipc`] example.
1325///
1326/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
1327/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
pub struct FileReader<R> {
    /// File reader that supports reading and seeking
    reader: R,

    /// The decoder that turns the bytes of a block into a [`RecordBatch`];
    /// it also holds the schema and the dictionaries read from the file
    decoder: FileDecoder,

    /// The blocks in the file
    ///
    /// A block indicates the regions in the file to read to get data
    blocks: Vec<Block>,

    /// A counter to keep track of the current block that should be read
    current_block: usize,

    /// The total number of blocks, which may contain record batches and other types
    total_blocks: usize,

    /// User defined metadata, as key/value string pairs
    custom_metadata: HashMap<String, String>,
}
1349
1350impl<R> fmt::Debug for FileReader<R> {
1351    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1352        f.debug_struct("FileReader<R>")
1353            .field("decoder", &self.decoder)
1354            .field("blocks", &self.blocks)
1355            .field("current_block", &self.current_block)
1356            .field("total_blocks", &self.total_blocks)
1357            .finish_non_exhaustive()
1358    }
1359}
1360
1361impl<R: Read + Seek> FileReader<BufReader<R>> {
1362    /// Try to create a new file reader with the reader wrapped in a BufReader.
1363    ///
1364    /// See [`FileReader::try_new`] for an unbuffered version.
1365    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1366        Self::try_new(BufReader::new(reader), projection)
1367    }
1368}
1369
1370impl<R: Read + Seek> FileReader<R> {
1371    /// Try to create a new file reader.
1372    ///
1373    /// There is no internal buffering. If buffered reads are needed you likely want to use
1374    /// [`FileReader::try_new_buffered`] instead.
1375    ///
1376    /// # Errors
1377    ///
1378    /// An ['Err'](Result::Err) may be returned if:
1379    /// - the file does not meet the Arrow Format footer requirements, or
1380    /// - file endianness does not match the target endianness.
1381    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1382        let builder = FileReaderBuilder {
1383            projection,
1384            ..Default::default()
1385        };
1386        builder.build(reader)
1387    }
1388
    /// Returns the user-defined custom metadata of the file as key/value pairs
    pub fn custom_metadata(&self) -> &HashMap<String, String> {
        &self.custom_metadata
    }
1393
    /// Returns the number of batches in the file, i.e. the total number of blocks
    /// this reader can be positioned at
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }
1398
1399    /// Return the schema of the file
1400    pub fn schema(&self) -> SchemaRef {
1401        self.decoder.schema.clone()
1402    }
1403
1404    /// See to a specific [`RecordBatch`]
1405    ///
1406    /// Sets the current block to the index, allowing random reads
1407    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1408        if index >= self.total_blocks {
1409            Err(ArrowError::InvalidArgumentError(format!(
1410                "Cannot set batch to index {} from {} total batches",
1411                index, self.total_blocks
1412            )))
1413        } else {
1414            self.current_block = index;
1415            Ok(())
1416        }
1417    }
1418
1419    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1420        let block = &self.blocks[self.current_block];
1421        self.current_block += 1;
1422
1423        // read length
1424        let buffer = read_block(&mut self.reader, block)?;
1425        self.decoder.read_record_batch(block, &buffer)
1426    }
1427
    /// Gets a shared reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_ref(&self) -> &R {
        &self.reader
    }
1434
    /// Gets a mutable reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from, or seek, the underlying reader.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.reader
    }
1441
    /// Specifies if validation should be skipped when reading data (defaults to `false`)
    ///
    /// # Safety
    ///
    /// See [`FileDecoder::with_skip_validation`]
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        // SAFETY: this method carries the same contract as
        // `FileDecoder::with_skip_validation`, so the caller's guarantee is forwarded as is.
        self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
        self
    }
1451}
1452
1453impl<R: Read + Seek> Iterator for FileReader<R> {
1454    type Item = Result<RecordBatch, ArrowError>;
1455
1456    fn next(&mut self) -> Option<Self::Item> {
1457        // get current block
1458        if self.current_block < self.total_blocks {
1459            self.maybe_next().transpose()
1460        } else {
1461            None
1462        }
1463    }
1464}
1465
1466impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
1467    fn schema(&self) -> SchemaRef {
1468        self.schema()
1469    }
1470}
1471
1472/// Arrow Stream Reader
1473///
1474/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC Streaming Format].
1475///
1476/// # See Also
1477///
1478/// * [`FileReader`] for random access.
1479///
1480/// # Example
1481/// ```
1482/// # use arrow_array::record_batch;
1483/// # use arrow_ipc::reader::StreamReader;
1484/// # use arrow_ipc::writer::StreamWriter;
1485/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1486/// # let mut stream = vec![]; // mimic a stream for the example
1487/// # {
1488/// #  let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1489/// #  writer.write(&batch).unwrap();
1490/// #  writer.finish().unwrap();
1491/// # }
1492/// # let stream = stream.as_slice();
1493/// let projection = None; // read all columns
1494/// let mut reader = StreamReader::try_new(stream, projection).unwrap();
1495/// // read batches from the reader using the Iterator trait
1496/// let mut num_rows = 0;
1497/// for batch in reader {
1498///    let batch = batch.unwrap();
1499///    num_rows += batch.num_rows();
1500/// }
1501/// assert_eq!(num_rows, 3);
1502/// ```
1503///
1504/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamReader<R> {
    /// Reader that yields the IPC messages of the stream
    reader: MessageReader<R>,

    /// The schema that is read from the stream's first message
    schema: SchemaRef,

    /// Optional dictionaries for each schema field, keyed by dictionary id.
    ///
    /// Dictionaries may be appended to in the streaming format.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// An indicator of whether the stream is complete.
    ///
    /// This value is set to `true` the first time the reader's `next()` returns `None`.
    finished: bool,

    /// Optional projection: the selected column indices together with the schema
    /// projected onto those columns
    projection: Option<(Vec<usize>, Schema)>,

    /// Should validation be skipped when reading data? Defaults to false.
    ///
    /// See [`FileDecoder::with_skip_validation`] for details.
    skip_validation: UnsafeFlag,
}
1530
1531impl<R> fmt::Debug for StreamReader<R> {
1532    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1533        f.debug_struct("StreamReader<R>")
1534            .field("reader", &"R")
1535            .field("schema", &self.schema)
1536            .field("dictionaries_by_id", &self.dictionaries_by_id)
1537            .field("finished", &self.finished)
1538            .field("projection", &self.projection)
1539            .finish()
1540    }
1541}
1542
1543impl<R: Read> StreamReader<BufReader<R>> {
1544    /// Try to create a new stream reader with the reader wrapped in a BufReader.
1545    ///
1546    /// See [`StreamReader::try_new`] for an unbuffered version.
1547    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1548        Self::try_new(BufReader::new(reader), projection)
1549    }
1550}
1551
1552impl<R: Read> StreamReader<R> {
1553    /// Try to create a new stream reader.
1554    ///
1555    /// To check if the reader is done, use [`is_finished(self)`](StreamReader::is_finished).
1556    ///
1557    /// There is no internal buffering. If buffered reads are needed you likely want to use
1558    /// [`StreamReader::try_new_buffered`] instead.
1559    ///
1560    /// # Errors
1561    ///
1562    /// An ['Err'](Result::Err) may be returned if the reader does not encounter a schema
1563    /// as the first message in the stream.
1564    pub fn try_new(
1565        reader: R,
1566        projection: Option<Vec<usize>>,
1567    ) -> Result<StreamReader<R>, ArrowError> {
1568        let mut msg_reader = MessageReader::new(reader);
1569        let message = msg_reader.maybe_next()?;
1570        let Some((message, _)) = message else {
1571            return Err(ArrowError::IpcError(
1572                "Expected schema message, found empty stream.".to_string(),
1573            ));
1574        };
1575
1576        if message.header_type() != Message::MessageHeader::Schema {
1577            return Err(ArrowError::IpcError(format!(
1578                "Expected a schema as the first message in the stream, got: {:?}",
1579                message.header_type()
1580            )));
1581        }
1582
1583        let schema = message.header_as_schema().ok_or_else(|| {
1584            ArrowError::ParseError("Failed to parse schema from message header".to_string())
1585        })?;
1586        let schema = crate::convert::fb_to_schema(schema);
1587
1588        // Create an array of optional dictionary value arrays, one per field.
1589        let dictionaries_by_id = HashMap::new();
1590
1591        let projection = match projection {
1592            Some(projection_indices) => {
1593                let schema = schema.project(&projection_indices)?;
1594                Some((projection_indices, schema))
1595            }
1596            _ => None,
1597        };
1598
1599        Ok(Self {
1600            reader: msg_reader,
1601            schema: Arc::new(schema),
1602            finished: false,
1603            dictionaries_by_id,
1604            projection,
1605            skip_validation: UnsafeFlag::new(),
1606        })
1607    }
1608
1609    /// Deprecated, use [`StreamReader::try_new`] instead.
1610    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1611    pub fn try_new_unbuffered(
1612        reader: R,
1613        projection: Option<Vec<usize>>,
1614    ) -> Result<Self, ArrowError> {
1615        Self::try_new(reader, projection)
1616    }
1617
1618    /// Return the schema of the stream
1619    pub fn schema(&self) -> SchemaRef {
1620        self.schema.clone()
1621    }
1622
1623    /// Check if the stream is finished
1624    pub fn is_finished(&self) -> bool {
1625        self.finished
1626    }
1627
1628    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1629        if self.finished {
1630            return Ok(None);
1631        }
1632
1633        // Read messages until we get a record batch or end of stream
1634        loop {
1635            let message = self.next_ipc_message()?;
1636            let Some(message) = message else {
1637                // If the message is None, we have reached the end of the stream.
1638                self.finished = true;
1639                return Ok(None);
1640            };
1641
1642            match message {
1643                IpcMessage::Schema(_) => {
1644                    return Err(ArrowError::IpcError(
1645                        "Expected a record batch, but found a schema".to_string(),
1646                    ));
1647                }
1648                IpcMessage::RecordBatch(record_batch) => {
1649                    return Ok(Some(record_batch));
1650                }
1651                IpcMessage::DictionaryBatch { .. } => {
1652                    continue;
1653                }
1654            };
1655        }
1656    }
1657
1658    /// Reads and fully parses the next IPC message from the stream. Whereas
1659    /// [`Self::maybe_next`] is a higher level method focused on reading
1660    /// `RecordBatch`es, this method returns the individual fully parsed IPC
1661    /// messages from the underlying stream.
1662    ///
1663    /// This is useful primarily for testing reader/writer behaviors as it
1664    /// allows a full view into the messages that have been written to a stream.
1665    pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
1666        let message = self.reader.maybe_next()?;
1667        let Some((message, body)) = message else {
1668            // If the message is None, we have reached the end of the stream.
1669            return Ok(None);
1670        };
1671
1672        let ipc_message = match message.header_type() {
1673            Message::MessageHeader::Schema => {
1674                let schema = message.header_as_schema().ok_or_else(|| {
1675                    ArrowError::ParseError("Failed to parse schema from message header".to_string())
1676                })?;
1677                let arrow_schema = crate::convert::fb_to_schema(schema);
1678                IpcMessage::Schema(arrow_schema)
1679            }
1680            Message::MessageHeader::RecordBatch => {
1681                let batch = message.header_as_record_batch().ok_or_else(|| {
1682                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1683                })?;
1684
1685                let version = message.version();
1686                let schema = self.schema.clone();
1687                let record_batch = RecordBatchDecoder::try_new(
1688                    &body.into(),
1689                    batch,
1690                    schema,
1691                    &self.dictionaries_by_id,
1692                    &version,
1693                )?
1694                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
1695                .with_require_alignment(false)
1696                .with_skip_validation(self.skip_validation.clone())
1697                .read_record_batch()?;
1698                IpcMessage::RecordBatch(record_batch)
1699            }
1700            Message::MessageHeader::DictionaryBatch => {
1701                let dict = message.header_as_dictionary_batch().ok_or_else(|| {
1702                    ArrowError::ParseError(
1703                        "Failed to parse dictionary batch from message header".to_string(),
1704                    )
1705                })?;
1706
1707                let version = message.version();
1708                let dict_values = get_dictionary_values(
1709                    &body.into(),
1710                    dict,
1711                    &self.schema,
1712                    &mut self.dictionaries_by_id,
1713                    &version,
1714                    false,
1715                    self.skip_validation.clone(),
1716                )?;
1717
1718                update_dictionaries(
1719                    &mut self.dictionaries_by_id,
1720                    dict.isDelta(),
1721                    dict.id(),
1722                    dict_values.clone(),
1723                )?;
1724
1725                IpcMessage::DictionaryBatch {
1726                    id: dict.id(),
1727                    is_delta: (dict.isDelta()),
1728                    values: (dict_values),
1729                }
1730            }
1731            x => {
1732                return Err(ArrowError::ParseError(format!(
1733                    "Unsupported message header type in IPC stream: '{x:?}'"
1734                )));
1735            }
1736        };
1737
1738        Ok(Some(ipc_message))
1739    }
1740
1741    /// Gets a reference to the underlying reader.
1742    ///
1743    /// It is inadvisable to directly read from the underlying reader.
1744    pub fn get_ref(&self) -> &R {
1745        self.reader.inner()
1746    }
1747
1748    /// Gets a mutable reference to the underlying reader.
1749    ///
1750    /// It is inadvisable to directly read from the underlying reader.
1751    pub fn get_mut(&mut self) -> &mut R {
1752        self.reader.inner_mut()
1753    }
1754
1755    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1756    ///
1757    /// # Safety
1758    ///
1759    /// See [`FileDecoder::with_skip_validation`]
1760    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1761        unsafe { self.skip_validation.set(skip_validation) };
1762        self
1763    }
1764}
1765
1766impl<R: Read> Iterator for StreamReader<R> {
1767    type Item = Result<RecordBatch, ArrowError>;
1768
1769    fn next(&mut self) -> Option<Self::Item> {
1770        self.maybe_next().transpose()
1771    }
1772}
1773
1774impl<R: Read> RecordBatchReader for StreamReader<R> {
1775    fn schema(&self) -> SchemaRef {
1776        self.schema.clone()
1777    }
1778}
1779
1780/// Representation of a fully parsed IpcMessage from the underlying stream.
1781/// Parsing this kind of message is done by higher level constructs such as
1782/// [`StreamReader`], because fully interpreting the messages into a record
1783/// batch or dictionary batch requires access to stream state such as schema
1784/// and the full dictionary cache.
1785#[derive(Debug)]
1786#[allow(dead_code)]
1787pub(crate) enum IpcMessage {
1788    Schema(arrow_schema::Schema),
1789    RecordBatch(RecordBatch),
1790    DictionaryBatch {
1791        id: i64,
1792        is_delta: bool,
1793        values: ArrayRef,
1794    },
1795}
1796
/// Low-level reader that pulls [`Message::Message`]s out of a byte source,
/// keeping one metadata buffer that is re-used from message to message.
/// [`StreamReader`] is built on top of it.
struct MessageReader<R> {
    /// The wrapped byte source
    reader: R,
    /// Scratch buffer holding the metadata of the most recently read message
    buf: Vec<u8>,
}
1803
1804impl<R: Read> MessageReader<R> {
1805    fn new(reader: R) -> Self {
1806        Self {
1807            reader,
1808            buf: Vec::new(),
1809        }
1810    }
1811
1812    /// Reads the entire next message from the underlying reader which includes
1813    /// the metadata length, the metadata, and the body.
1814    ///
1815    /// # Returns
1816    /// - `Ok(None)` if the the reader signals the end of stream with EOF on
1817    ///   the first read
1818    /// - `Err(_)` if the reader returns an error other than on the first
1819    ///   read, or if the metadata length is invalid
1820    /// - `Ok(Some(_))` with the Message and buffer containiner the
1821    ///   body bytes otherwise.
1822    fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1823        let meta_len = self.read_meta_len()?;
1824        let Some(meta_len) = meta_len else {
1825            return Ok(None);
1826        };
1827
1828        self.buf.resize(meta_len, 0);
1829        self.reader.read_exact(&mut self.buf)?;
1830
1831        let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1832            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1833        })?;
1834
1835        let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1836        self.reader.read_exact(&mut buf)?;
1837
1838        Ok(Some((message, buf)))
1839    }
1840
1841    /// Get a mutable reference to the underlying reader.
1842    fn inner_mut(&mut self) -> &mut R {
1843        &mut self.reader
1844    }
1845
1846    /// Get an immutable reference to the underlying reader.
1847    fn inner(&self) -> &R {
1848        &self.reader
1849    }
1850
1851    /// Read the metadata length for the next message from the underlying stream.
1852    ///
1853    /// # Returns
1854    /// - `Ok(None)` if the the reader signals the end of stream with EOF on
1855    ///   the first read
1856    /// - `Err(_)` if the reader returns an error other than on the first
1857    ///   read, or if the metadata length is less than 0.
1858    /// - `Ok(Some(_))` with the length otherwise.
1859    pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1860        let mut meta_len: [u8; 4] = [0; 4];
1861        match self.reader.read_exact(&mut meta_len) {
1862            Ok(_) => {}
1863            Err(e) => {
1864                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1865                    // Handle EOF without the "0xFFFFFFFF 0x00000000"
1866                    // valid according to:
1867                    // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1868                    Ok(None)
1869                } else {
1870                    Err(ArrowError::from(e))
1871                };
1872            }
1873        };
1874
1875        let meta_len = {
1876            // If a continuation marker is encountered, skip over it and read
1877            // the size from the next four bytes.
1878            if meta_len == CONTINUATION_MARKER {
1879                self.reader.read_exact(&mut meta_len)?;
1880            }
1881
1882            i32::from_le_bytes(meta_len)
1883        };
1884
1885        if meta_len == 0 {
1886            return Ok(None);
1887        }
1888
1889        let meta_len = usize::try_from(meta_len)
1890            .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1891
1892        Ok(Some(meta_len))
1893    }
1894}
1895
1896#[cfg(test)]
1897mod tests {
1898    use std::io::Cursor;
1899
1900    use crate::convert::fb_to_schema;
1901    use crate::writer::{
1902        DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1903    };
1904
1905    use super::*;
1906
1907    use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1908    use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1909    use arrow_array::types::*;
1910    use arrow_buffer::{NullBuffer, OffsetBuffer};
1911    use arrow_data::ArrayDataBuilder;
1912
1913    fn create_test_projection_schema() -> Schema {
1914        // define field types
1915        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1916
1917        let fixed_size_list_data_type =
1918            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);
1919
1920        let union_fields = UnionFields::from_fields(vec![
1921            Field::new("a", DataType::Int32, false),
1922            Field::new("b", DataType::Float64, false),
1923        ]);
1924
1925        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);
1926
1927        let struct_fields = Fields::from(vec![
1928            Field::new("id", DataType::Int32, false),
1929            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
1930        ]);
1931        let struct_data_type = DataType::Struct(struct_fields);
1932
1933        let run_encoded_data_type = DataType::RunEndEncoded(
1934            Arc::new(Field::new("run_ends", DataType::Int16, false)),
1935            Arc::new(Field::new("values", DataType::Int32, true)),
1936        );
1937
1938        // define schema
1939        Schema::new(vec![
1940            Field::new("f0", DataType::UInt32, false),
1941            Field::new("f1", DataType::Utf8, false),
1942            Field::new("f2", DataType::Boolean, false),
1943            Field::new("f3", union_data_type, true),
1944            Field::new("f4", DataType::Null, true),
1945            Field::new("f5", DataType::Float64, true),
1946            Field::new("f6", list_data_type, false),
1947            Field::new("f7", DataType::FixedSizeBinary(3), true),
1948            Field::new("f8", fixed_size_list_data_type, false),
1949            Field::new("f9", struct_data_type, false),
1950            Field::new("f10", run_encoded_data_type, false),
1951            Field::new("f11", DataType::Boolean, false),
1952            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
1953            Field::new("f13", DataType::Utf8, false),
1954        ])
1955    }
1956
1957    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
1958        // set test data for each column
1959        let array0 = UInt32Array::from(vec![1, 2, 3]);
1960        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
1961        let array2 = BooleanArray::from(vec![true, false, true]);
1962
1963        let mut union_builder = UnionBuilder::new_dense();
1964        union_builder.append::<Int32Type>("a", 1).unwrap();
1965        union_builder.append::<Float64Type>("b", 10.1).unwrap();
1966        union_builder.append_null::<Float64Type>("b").unwrap();
1967        let array3 = union_builder.build().unwrap();
1968
1969        let array4 = NullArray::new(3);
1970        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
1971        let array6_values = vec![
1972            Some(vec![Some(10), Some(10), Some(10)]),
1973            Some(vec![Some(20), Some(20), Some(20)]),
1974            Some(vec![Some(30), Some(30)]),
1975        ];
1976        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
1977        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
1978        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();
1979
1980        let array8_values = ArrayData::builder(DataType::Int32)
1981            .len(9)
1982            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
1983            .build()
1984            .unwrap();
1985        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
1986            .len(3)
1987            .add_child_data(array8_values)
1988            .build()
1989            .unwrap();
1990        let array8 = FixedSizeListArray::from(array8_data);
1991
1992        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
1993        let array9_list: ArrayRef =
1994            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
1995                Some(vec![Some(-10)]),
1996                Some(vec![Some(-20), Some(-20), Some(-20)]),
1997                Some(vec![Some(-30)]),
1998            ]));
1999        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
2000            .add_child_data(array9_id.into_data())
2001            .add_child_data(array9_list.into_data())
2002            .len(3)
2003            .build()
2004            .unwrap();
2005        let array9 = StructArray::from(array9);
2006
2007        let array10_input = vec![Some(1_i32), None, None];
2008        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
2009        array10_builder.extend(array10_input);
2010        let array10 = array10_builder.finish();
2011
2012        let array11 = BooleanArray::from(vec![false, false, true]);
2013
2014        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
2015        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
2016        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));
2017
2018        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);
2019
2020        // create record batch
2021        RecordBatch::try_new(
2022            Arc::new(schema.clone()),
2023            vec![
2024                Arc::new(array0),
2025                Arc::new(array1),
2026                Arc::new(array2),
2027                Arc::new(array3),
2028                Arc::new(array4),
2029                Arc::new(array5),
2030                Arc::new(array6),
2031                Arc::new(array7),
2032                Arc::new(array8),
2033                Arc::new(array9),
2034                Arc::new(array10),
2035                Arc::new(array11),
2036                Arc::new(array12),
2037                Arc::new(array13),
2038            ],
2039        )
2040        .unwrap()
2041    }
2042
2043    #[test]
2044    fn test_negative_meta_len_start_stream() {
2045        let bytes = i32::to_le_bytes(-1);
2046        let mut buf = vec![];
2047        buf.extend(CONTINUATION_MARKER);
2048        buf.extend(bytes);
2049
2050        let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
2051        assert!(reader_err.is_some());
2052        assert_eq!(
2053            reader_err.unwrap().to_string(),
2054            "Parser error: Invalid metadata length: -1"
2055        );
2056    }
2057
2058    #[test]
2059    fn test_negative_meta_len_mid_stream() {
2060        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2061        let mut buf = Vec::new();
2062        {
2063            let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
2064            let batch =
2065                RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
2066                    .unwrap();
2067            writer.write(&batch).unwrap();
2068        }
2069
2070        let bytes = i32::to_le_bytes(-1);
2071        buf.extend(CONTINUATION_MARKER);
2072        buf.extend(bytes);
2073
2074        let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
2075        // Read the valid value
2076        assert!(reader.maybe_next().is_ok());
2077        // Read the invalid meta len
2078        let batch_err = reader.maybe_next().err();
2079        assert!(batch_err.is_some());
2080        assert_eq!(
2081            batch_err.unwrap().to_string(),
2082            "Parser error: Invalid metadata length: -1"
2083        );
2084    }
2085
2086    #[test]
2087    fn test_missing_buffer_metadata_error() {
2088        use crate::r#gen::Message::*;
2089        use flatbuffers::FlatBufferBuilder;
2090
2091        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));
2092
2093        // create RecordBatch buffer metadata with invalid buffer count
2094        // Int32Array needs 2 buffers (validity + data) but we provide only 1
2095        let mut fbb = FlatBufferBuilder::new();
2096        let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
2097        let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
2098        let batch_offset = RecordBatch::create(
2099            &mut fbb,
2100            &RecordBatchArgs {
2101                length: 2,
2102                nodes: Some(nodes),
2103                buffers: Some(buffers),
2104                compression: None,
2105                variadicBufferCounts: None,
2106            },
2107        );
2108        fbb.finish_minimal(batch_offset);
2109        let batch_bytes = fbb.finished_data().to_vec();
2110        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2111
2112        let data_buffer = Buffer::from(vec![0u8; 8]);
2113        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2114        let metadata = MetadataVersion::V5;
2115
2116        let decoder = RecordBatchDecoder::try_new(
2117            &data_buffer,
2118            batch,
2119            schema.clone(),
2120            &dictionaries,
2121            &metadata,
2122        )
2123        .unwrap();
2124
2125        let result = decoder.read_record_batch();
2126
2127        match result {
2128            Err(ArrowError::IpcError(msg)) => {
2129                assert_eq!(msg, "Buffer count mismatched with metadata");
2130            }
2131            other => panic!("unexpected error: {other:?}"),
2132        }
2133    }
2134
2135    /// Test that the reader can read legacy files where empty list arrays were written with a 0-byte offsets buffer.
2136    #[test]
2137    fn test_read_legacy_empty_list_without_offsets_buffer() {
2138        use crate::r#gen::Message::*;
2139        use flatbuffers::FlatBufferBuilder;
2140
2141        let schema = Arc::new(Schema::new(vec![Field::new_list(
2142            "items",
2143            Field::new_list_field(DataType::Int32, true),
2144            true,
2145        )]));
2146
2147        // Legacy arrow-rs versions wrote empty offsets buffers for empty list arrays.
2148        // Keep reader compatibility with such files by accepting a 0-byte offsets buffer.
2149        let mut fbb = FlatBufferBuilder::new();
2150        let nodes = fbb.create_vector(&[
2151            FieldNode::new(0, 0), // list node
2152            FieldNode::new(0, 0), // child int32 node
2153        ]);
2154        let buffers = fbb.create_vector(&[
2155            crate::Buffer::new(0, 0), // list validity
2156            crate::Buffer::new(0, 0), // list offsets (legacy empty buffer)
2157            crate::Buffer::new(0, 0), // child validity
2158            crate::Buffer::new(0, 0), // child values
2159        ]);
2160        let batch_offset = RecordBatch::create(
2161            &mut fbb,
2162            &RecordBatchArgs {
2163                length: 0,
2164                nodes: Some(nodes),
2165                buffers: Some(buffers),
2166                compression: None,
2167                variadicBufferCounts: None,
2168            },
2169        );
2170        fbb.finish_minimal(batch_offset);
2171        let batch_bytes = fbb.finished_data().to_vec();
2172        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2173
2174        let body = Buffer::from(Vec::<u8>::new());
2175        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2176        let metadata = MetadataVersion::V5;
2177
2178        let decoder =
2179            RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
2180                .unwrap();
2181
2182        let read_batch = decoder.read_record_batch().unwrap();
2183        assert_eq!(read_batch.num_rows(), 0);
2184
2185        let list = read_batch
2186            .column(0)
2187            .as_any()
2188            .downcast_ref::<ListArray>()
2189            .unwrap();
2190        assert_eq!(list.len(), 0);
2191        assert_eq!(list.values().len(), 0);
2192    }
2193
2194    /// Test that the reader can read legacy files where empty Utf8/Binary arrays were written with a 0-byte offsets buffer.
2195    #[test]
2196    fn test_read_legacy_empty_utf8_and_binary_without_offsets_buffer() {
2197        use crate::r#gen::Message::*;
2198        use flatbuffers::FlatBufferBuilder;
2199
2200        let schema = Arc::new(Schema::new(vec![
2201            Field::new("name", DataType::Utf8, true),
2202            Field::new("payload", DataType::Binary, true),
2203        ]));
2204
2205        // Legacy arrow-rs versions wrote empty offsets buffers for empty Utf8/Binary arrays.
2206        // Keep reader compatibility with such files by accepting 0-byte offsets buffers.
2207        let mut fbb = FlatBufferBuilder::new();
2208        let nodes = fbb.create_vector(&[
2209            FieldNode::new(0, 0), // utf8 node
2210            FieldNode::new(0, 0), // binary node
2211        ]);
2212        let buffers = fbb.create_vector(&[
2213            crate::Buffer::new(0, 0), // utf8 validity
2214            crate::Buffer::new(0, 0), // utf8 offsets (legacy empty buffer)
2215            crate::Buffer::new(0, 0), // utf8 values
2216            crate::Buffer::new(0, 0), // binary validity
2217            crate::Buffer::new(0, 0), // binary offsets (legacy empty buffer)
2218            crate::Buffer::new(0, 0), // binary values
2219        ]);
2220        let batch_offset = RecordBatch::create(
2221            &mut fbb,
2222            &RecordBatchArgs {
2223                length: 0,
2224                nodes: Some(nodes),
2225                buffers: Some(buffers),
2226                compression: None,
2227                variadicBufferCounts: None,
2228            },
2229        );
2230        fbb.finish_minimal(batch_offset);
2231        let batch_bytes = fbb.finished_data().to_vec();
2232        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2233
2234        let body = Buffer::from(Vec::<u8>::new());
2235        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2236        let metadata = MetadataVersion::V5;
2237
2238        let decoder =
2239            RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
2240                .unwrap();
2241
2242        let read_batch = decoder.read_record_batch().unwrap();
2243        assert_eq!(read_batch.num_rows(), 0);
2244
2245        let utf8 = read_batch
2246            .column(0)
2247            .as_any()
2248            .downcast_ref::<StringArray>()
2249            .unwrap();
2250        assert_eq!(utf8.len(), 0);
2251        assert_eq!(utf8.value_offsets(), [0]);
2252
2253        let binary = read_batch
2254            .column(1)
2255            .as_any()
2256            .downcast_ref::<BinaryArray>()
2257            .unwrap();
2258        assert_eq!(binary.len(), 0);
2259        assert_eq!(binary.value_offsets(), [0]);
2260    }
2261
2262    #[test]
2263    fn test_projection_array_values() {
2264        // define schema
2265        let schema = create_test_projection_schema();
2266
2267        // create record batch with test data
2268        let batch = create_test_projection_batch_data(&schema);
2269
2270        // write record batch in IPC format
2271        let mut buf = Vec::new();
2272        {
2273            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2274            writer.write(&batch).unwrap();
2275            writer.finish().unwrap();
2276        }
2277
2278        // read record batch with projection
2279        for index in 0..12 {
2280            let projection = vec![index];
2281            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
2282            let read_batch = reader.unwrap().next().unwrap().unwrap();
2283            let projected_column = read_batch.column(0);
2284            let expected_column = batch.column(index);
2285
2286            // check the projected column equals the expected column
2287            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
2288        }
2289
2290        {
2291            // read record batch with reversed projection
2292            let reader =
2293                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
2294            let read_batch = reader.unwrap().next().unwrap().unwrap();
2295            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
2296            assert_eq!(read_batch, expected_batch);
2297        }
2298    }
2299
2300    #[test]
2301    fn test_arrow_single_float_row() {
2302        let schema = Schema::new(vec![
2303            Field::new("a", DataType::Float32, false),
2304            Field::new("b", DataType::Float32, false),
2305            Field::new("c", DataType::Int32, false),
2306            Field::new("d", DataType::Int32, false),
2307        ]);
2308        let arrays = vec![
2309            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
2310            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
2311            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
2312            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
2313        ];
2314        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
2315        // create stream writer
2316        let mut file = tempfile::tempfile().unwrap();
2317        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
2318        stream_writer.write(&batch).unwrap();
2319        stream_writer.finish().unwrap();
2320
2321        drop(stream_writer);
2322
2323        file.rewind().unwrap();
2324
2325        // read stream back
2326        let reader = StreamReader::try_new(&mut file, None).unwrap();
2327
2328        reader.for_each(|batch| {
2329            let batch = batch.unwrap();
2330            assert!(
2331                batch
2332                    .column(0)
2333                    .as_any()
2334                    .downcast_ref::<Float32Array>()
2335                    .unwrap()
2336                    .value(0)
2337                    != 0.0
2338            );
2339            assert!(
2340                batch
2341                    .column(1)
2342                    .as_any()
2343                    .downcast_ref::<Float32Array>()
2344                    .unwrap()
2345                    .value(0)
2346                    != 0.0
2347            );
2348        });
2349
2350        file.rewind().unwrap();
2351
2352        // Read with projection
2353        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
2354
2355        reader.for_each(|batch| {
2356            let batch = batch.unwrap();
2357            assert_eq!(batch.schema().fields().len(), 2);
2358            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
2359            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
2360        });
2361    }
2362
2363    /// Write the record batch to an in-memory buffer in IPC File format
2364    fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
2365        let mut buf = Vec::new();
2366        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2367        writer.write(rb).unwrap();
2368        writer.finish().unwrap();
2369        buf
2370    }
2371
2372    /// Return the first record batch read from the IPC File buffer
2373    fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2374        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
2375        reader.next().unwrap()
2376    }
2377
2378    /// Return the first record batch read from the IPC File buffer, disabling
2379    /// validation
2380    fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2381        let mut reader = unsafe {
2382            FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2383        };
2384        reader.next().unwrap()
2385    }
2386
2387    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
2388        let buf = write_ipc(rb);
2389        read_ipc(&buf).unwrap()
2390    }
2391
2392    /// Return the first record batch read from the IPC File buffer
2393    /// using the FileDecoder API
2394    fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2395        read_ipc_with_decoder_inner(buf, false)
2396    }
2397
2398    /// Return the first record batch read from the IPC File buffer
2399    /// using the FileDecoder API, disabling validation
2400    fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2401        read_ipc_with_decoder_inner(buf, true)
2402    }
2403
2404    fn read_ipc_with_decoder_inner(
2405        buf: Vec<u8>,
2406        skip_validation: bool,
2407    ) -> Result<RecordBatch, ArrowError> {
2408        let buffer = Buffer::from_vec(buf);
2409        let trailer_start = buffer.len() - 10;
2410        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
2411        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
2412            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;
2413
2414        let schema = fb_to_schema(footer.schema().unwrap());
2415
2416        let mut decoder = unsafe {
2417            FileDecoder::new(Arc::new(schema), footer.version())
2418                .with_skip_validation(skip_validation)
2419        };
2420        // Read dictionaries
2421        for block in footer.dictionaries().iter().flatten() {
2422            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2423            let data = buffer.slice_with_length(block.offset() as _, block_len);
2424            decoder.read_dictionary(block, &data)?
2425        }
2426
2427        // Read record batch
2428        let batches = footer.recordBatches().unwrap();
2429        assert_eq!(batches.len(), 1); // Only wrote a single batch
2430
2431        let block = batches.get(0);
2432        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2433        let data = buffer.slice_with_length(block.offset() as _, block_len);
2434        Ok(decoder.read_record_batch(block, &data)?.unwrap())
2435    }
2436
2437    /// Write the record batch to an in-memory buffer in IPC Stream format
2438    fn write_stream(rb: &RecordBatch) -> Vec<u8> {
2439        let mut buf = Vec::new();
2440        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2441        writer.write(rb).unwrap();
2442        writer.finish().unwrap();
2443        buf
2444    }
2445
2446    /// Return the first record batch read from the IPC Stream buffer
2447    fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2448        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
2449        reader.next().unwrap()
2450    }
2451
2452    /// Return the first record batch read from the IPC Stream buffer,
2453    /// disabling validation
2454    fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2455        let mut reader = unsafe {
2456            StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2457        };
2458        reader.next().unwrap()
2459    }
2460
2461    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
2462        let buf = write_stream(rb);
2463        read_stream(&buf).unwrap()
2464    }
2465
2466    #[test]
2467    fn test_roundtrip_with_custom_metadata() {
2468        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
2469        let mut buf = Vec::new();
2470        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2471        let mut test_metadata = HashMap::new();
2472        test_metadata.insert("abc".to_string(), "abc".to_string());
2473        test_metadata.insert("def".to_string(), "def".to_string());
2474        for (k, v) in &test_metadata {
2475            writer.write_metadata(k, v);
2476        }
2477        writer.finish().unwrap();
2478        drop(writer);
2479
2480        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2481        assert_eq!(reader.custom_metadata(), &test_metadata);
2482    }
2483
2484    #[test]
2485    fn test_roundtrip_nested_dict() {
2486        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2487
2488        let array = Arc::new(inner) as ArrayRef;
2489
2490        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2491
2492        let s = StructArray::from(vec![(dctfield, array)]);
2493        let struct_array = Arc::new(s) as ArrayRef;
2494
2495        let schema = Arc::new(Schema::new(vec![Field::new(
2496            "struct",
2497            struct_array.data_type().clone(),
2498            false,
2499        )]));
2500
2501        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2502
2503        assert_eq!(batch, roundtrip_ipc(&batch));
2504    }
2505
2506    #[test]
2507    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
2508        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2509
2510        let array = Arc::new(inner) as ArrayRef;
2511
2512        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2513
2514        let s = StructArray::from(vec![(dctfield, array)]);
2515        let struct_array = Arc::new(s) as ArrayRef;
2516
2517        let schema = Arc::new(Schema::new(vec![Field::new(
2518            "struct",
2519            struct_array.data_type().clone(),
2520            false,
2521        )]));
2522
2523        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2524
2525        let mut buf = Vec::new();
2526        let mut writer = crate::writer::FileWriter::try_new_with_options(
2527            &mut buf,
2528            batch.schema_ref(),
2529            IpcWriteOptions::default(),
2530        )
2531        .unwrap();
2532        writer.write(&batch).unwrap();
2533        writer.finish().unwrap();
2534        drop(writer);
2535
2536        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2537
2538        assert_eq!(batch, reader.next().unwrap().unwrap());
2539    }
2540
2541    fn check_union_with_builder(mut builder: UnionBuilder) {
2542        builder.append::<Int32Type>("a", 1).unwrap();
2543        builder.append_null::<Int32Type>("a").unwrap();
2544        builder.append::<Float64Type>("c", 3.0).unwrap();
2545        builder.append::<Int32Type>("a", 4).unwrap();
2546        builder.append::<Int64Type>("d", 11).unwrap();
2547        let union = builder.build().unwrap();
2548
2549        let schema = Arc::new(Schema::new(vec![Field::new(
2550            "union",
2551            union.data_type().clone(),
2552            false,
2553        )]));
2554
2555        let union_array = Arc::new(union) as ArrayRef;
2556
2557        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
2558        let rb2 = roundtrip_ipc(&rb);
2559        // TODO: equality not yet implemented for union, so we check that the length of the array is
2560        // the same and that all of the buffers are the same instead.
2561        assert_eq!(rb.schema(), rb2.schema());
2562        assert_eq!(rb.num_columns(), rb2.num_columns());
2563        assert_eq!(rb.num_rows(), rb2.num_rows());
2564        let union1 = rb.column(0);
2565        let union2 = rb2.column(0);
2566
2567        assert_eq!(union1, union2);
2568    }
2569
2570    #[test]
2571    fn test_roundtrip_dense_union() {
2572        check_union_with_builder(UnionBuilder::new_dense());
2573    }
2574
2575    #[test]
2576    fn test_roundtrip_sparse_union() {
2577        check_union_with_builder(UnionBuilder::new_sparse());
2578    }
2579
2580    #[test]
2581    fn test_roundtrip_struct_empty_fields() {
2582        let nulls = NullBuffer::from(&[true, true, false]);
2583        let rb = RecordBatch::try_from_iter([(
2584            "",
2585            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
2586        )])
2587        .unwrap();
2588        let rb2 = roundtrip_ipc(&rb);
2589        assert_eq!(rb, rb2);
2590    }
2591
2592    #[test]
2593    fn test_roundtrip_stream_run_array_sliced() {
2594        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
2595            .into_iter()
2596            .collect();
2597        let run_array_1_sliced = run_array_1.slice(2, 5);
2598
2599        let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2), Some(2)];
2600        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
2601        run_array_2_builder.extend(run_array_2_inupt);
2602        let run_array_2 = run_array_2_builder.finish();
2603
2604        let schema = Arc::new(Schema::new(vec![
2605            Field::new(
2606                "run_array_1_sliced",
2607                run_array_1_sliced.data_type().clone(),
2608                false,
2609            ),
2610            Field::new("run_array_2", run_array_2.data_type().clone(), false),
2611        ]));
2612        let input_batch = RecordBatch::try_new(
2613            schema,
2614            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
2615        )
2616        .unwrap();
2617        let output_batch = roundtrip_ipc_stream(&input_batch);
2618
2619        // As partial comparison not yet supported for run arrays, the sliced run array
2620        // has to be unsliced before comparing with the output. the second run array
2621        // can be compared as such.
2622        assert_eq!(input_batch.column(1), output_batch.column(1));
2623
2624        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
2625        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
2626    }
2627
2628    #[test]
2629    fn test_roundtrip_stream_nested_dict() {
2630        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
2631        let dict = Arc::new(
2632            xs.clone()
2633                .into_iter()
2634                .collect::<DictionaryArray<Int8Type>>(),
2635        );
2636        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
2637        let struct_array = StructArray::from(vec![
2638            (
2639                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
2640                string_array,
2641            ),
2642            (
2643                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
2644                dict.clone() as ArrayRef,
2645            ),
2646        ]);
2647        let schema = Arc::new(Schema::new(vec![
2648            Field::new("f1_string", DataType::Utf8, false),
2649            Field::new("f2_struct", struct_array.data_type().clone(), false),
2650        ]));
2651        let input_batch = RecordBatch::try_new(
2652            schema,
2653            vec![
2654                Arc::new(StringArray::from(xs.clone())),
2655                Arc::new(struct_array),
2656            ],
2657        )
2658        .unwrap();
2659        let output_batch = roundtrip_ipc_stream(&input_batch);
2660        assert_eq!(input_batch, output_batch);
2661    }
2662
2663    #[test]
2664    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
2665        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
2666        let values = Arc::new(values) as ArrayRef;
2667        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
2668        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());
2669
2670        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
2671        let key_dict_array = DictionaryArray::new(key_dict_keys, values);
2672
2673        #[allow(deprecated)]
2674        let keys_field = Arc::new(Field::new_dict(
2675            "keys",
2676            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2677            true, // It is technically not legal for this field to be null.
2678            1,
2679            false,
2680        ));
2681        #[allow(deprecated)]
2682        let values_field = Arc::new(Field::new_dict(
2683            "values",
2684            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2685            true,
2686            2,
2687            false,
2688        ));
2689        let entry_struct = StructArray::from(vec![
2690            (keys_field, make_array(key_dict_array.into_data())),
2691            (values_field, make_array(value_dict_array.into_data())),
2692        ]);
2693        let map_data_type = DataType::Map(
2694            Arc::new(Field::new(
2695                "entries",
2696                entry_struct.data_type().clone(),
2697                false,
2698            )),
2699            false,
2700        );
2701
2702        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
2703        let map_data = ArrayData::builder(map_data_type)
2704            .len(3)
2705            .add_buffer(entry_offsets)
2706            .add_child_data(entry_struct.into_data())
2707            .build()
2708            .unwrap();
2709        let map_array = MapArray::from(map_data);
2710
2711        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
2712        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2713
2714        let schema = Arc::new(Schema::new(vec![Field::new(
2715            "f1",
2716            dict_dict_array.data_type().clone(),
2717            false,
2718        )]));
2719        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2720        let output_batch = roundtrip_ipc_stream(&input_batch);
2721        assert_eq!(input_batch, output_batch);
2722    }
2723
2724    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
2725        OffsetSize: OffsetSizeTrait,
2726        U: ArrowNativeType,
2727    >(
2728        list_data_type: DataType,
2729        offsets: &[U; 5],
2730    ) {
2731        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2732        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2733        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2734        let dict_data = dict_array.to_data();
2735
2736        let value_offsets = Buffer::from_slice_ref(offsets);
2737
2738        let list_data = ArrayData::builder(list_data_type)
2739            .len(4)
2740            .add_buffer(value_offsets)
2741            .add_child_data(dict_data)
2742            .build()
2743            .unwrap();
2744        let list_array = GenericListArray::<OffsetSize>::from(list_data);
2745
2746        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
2747        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2748
2749        let schema = Arc::new(Schema::new(vec![Field::new(
2750            "f1",
2751            dict_dict_array.data_type().clone(),
2752            false,
2753        )]));
2754        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2755        let output_batch = roundtrip_ipc_stream(&input_batch);
2756        assert_eq!(input_batch, output_batch);
2757    }
2758
2759    #[test]
2760    fn test_roundtrip_stream_dict_of_list_of_dict() {
2761        // list
2762        #[allow(deprecated)]
2763        let list_data_type = DataType::List(Arc::new(Field::new_dict(
2764            "item",
2765            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2766            true,
2767            1,
2768            false,
2769        )));
2770        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
2771        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
2772
2773        // large list
2774        #[allow(deprecated)]
2775        let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
2776            "item",
2777            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2778            true,
2779            1,
2780            false,
2781        )));
2782        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
2783        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
2784    }
2785
2786    #[test]
2787    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
2788        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2789        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
2790        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2791        let dict_data = dict_array.into_data();
2792
2793        #[allow(deprecated)]
2794        let list_data_type = DataType::FixedSizeList(
2795            Arc::new(Field::new_dict(
2796                "item",
2797                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2798                true,
2799                1,
2800                false,
2801            )),
2802            3,
2803        );
2804        let list_data = ArrayData::builder(list_data_type)
2805            .len(3)
2806            .add_child_data(dict_data)
2807            .build()
2808            .unwrap();
2809        let list_array = FixedSizeListArray::from(list_data);
2810
2811        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2812        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2813
2814        let schema = Arc::new(Schema::new(vec![Field::new(
2815            "f1",
2816            dict_dict_array.data_type().clone(),
2817            false,
2818        )]));
2819        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2820        let output_batch = roundtrip_ipc_stream(&input_batch);
2821        assert_eq!(input_batch, output_batch);
2822    }
2823
    /// Long string value shared by the view type round-trip tests below, where
    /// it is used both as a string and (via `as_bytes`) as a binary value.
    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view array handles it";
2826
2827    #[test]
2828    fn test_roundtrip_view_types() {
2829        let schema = Schema::new(vec![
2830            Field::new("field_1", DataType::BinaryView, true),
2831            Field::new("field_2", DataType::Utf8, true),
2832            Field::new("field_3", DataType::Utf8View, true),
2833        ]);
2834        let bin_values: Vec<Option<&[u8]>> = vec![
2835            Some(b"foo"),
2836            None,
2837            Some(b"bar"),
2838            Some(LONG_TEST_STRING.as_bytes()),
2839        ];
2840        let utf8_values: Vec<Option<&str>> =
2841            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2842        let bin_view_array = BinaryViewArray::from_iter(bin_values);
2843        let utf8_array = StringArray::from_iter(utf8_values.iter());
2844        let utf8_view_array = StringViewArray::from_iter(utf8_values);
2845        let record_batch = RecordBatch::try_new(
2846            Arc::new(schema.clone()),
2847            vec![
2848                Arc::new(bin_view_array),
2849                Arc::new(utf8_array),
2850                Arc::new(utf8_view_array),
2851            ],
2852        )
2853        .unwrap();
2854
2855        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2856        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2857
2858        let sliced_batch = record_batch.slice(1, 2);
2859        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2860        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2861    }
2862
2863    #[test]
2864    fn test_roundtrip_view_types_nested_dict() {
2865        let bin_values: Vec<Option<&[u8]>> = vec![
2866            Some(b"foo"),
2867            None,
2868            Some(b"bar"),
2869            Some(LONG_TEST_STRING.as_bytes()),
2870            Some(b"field"),
2871        ];
2872        let utf8_values: Vec<Option<&str>> = vec![
2873            Some("foo"),
2874            None,
2875            Some("bar"),
2876            Some(LONG_TEST_STRING),
2877            Some("field"),
2878        ];
2879        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
2880        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));
2881
2882        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2883        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
2884        #[allow(deprecated)]
2885        let keys_field = Arc::new(Field::new_dict(
2886            "keys",
2887            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
2888            true,
2889            1,
2890            false,
2891        ));
2892
2893        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
2894        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
2895        #[allow(deprecated)]
2896        let values_field = Arc::new(Field::new_dict(
2897            "values",
2898            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
2899            true,
2900            2,
2901            false,
2902        ));
2903        let entry_struct = StructArray::from(vec![
2904            (keys_field, make_array(key_dict_array.into_data())),
2905            (values_field, make_array(value_dict_array.into_data())),
2906        ]);
2907
2908        let map_data_type = DataType::Map(
2909            Arc::new(Field::new(
2910                "entries",
2911                entry_struct.data_type().clone(),
2912                false,
2913            )),
2914            false,
2915        );
2916        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
2917        let map_data = ArrayData::builder(map_data_type)
2918            .len(3)
2919            .add_buffer(entry_offsets)
2920            .add_child_data(entry_struct.into_data())
2921            .build()
2922            .unwrap();
2923        let map_array = MapArray::from(map_data);
2924
2925        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2926        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2927        let schema = Arc::new(Schema::new(vec![Field::new(
2928            "f1",
2929            dict_dict_array.data_type().clone(),
2930            false,
2931        )]));
2932        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2933        assert_eq!(batch, roundtrip_ipc(&batch));
2934        assert_eq!(batch, roundtrip_ipc_stream(&batch));
2935
2936        let sliced_batch = batch.slice(1, 2);
2937        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2938        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2939    }
2940
2941    #[test]
2942    fn test_no_columns_batch() {
2943        let schema = Arc::new(Schema::empty());
2944        let options = RecordBatchOptions::new()
2945            .with_match_field_names(true)
2946            .with_row_count(Some(10));
2947        let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2948        let output_batch = roundtrip_ipc_stream(&input_batch);
2949        assert_eq!(input_batch, output_batch);
2950    }
2951
2952    #[test]
2953    fn test_unaligned() {
2954        let batch = RecordBatch::try_from_iter(vec![(
2955            "i32",
2956            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2957        )])
2958        .unwrap();
2959
2960        let r#gen = IpcDataGenerator {};
2961        let mut dict_tracker = DictionaryTracker::new(false);
2962        let (_, encoded) = r#gen
2963            .encode(
2964                &batch,
2965                &mut dict_tracker,
2966                &Default::default(),
2967                &mut Default::default(),
2968            )
2969            .unwrap();
2970
2971        let message = root_as_message(&encoded.ipc_message).unwrap();
2972
2973        // Construct an unaligned buffer
2974        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2975        buffer.push(0_u8);
2976        buffer.extend_from_slice(&encoded.arrow_data);
2977        let b = Buffer::from(buffer).slice(1);
2978        assert_ne!(b.as_ptr().align_offset(8), 0);
2979
2980        let ipc_batch = message.header_as_record_batch().unwrap();
2981        let roundtrip = RecordBatchDecoder::try_new(
2982            &b,
2983            ipc_batch,
2984            batch.schema(),
2985            &Default::default(),
2986            &message.version(),
2987        )
2988        .unwrap()
2989        .with_require_alignment(false)
2990        .read_record_batch()
2991        .unwrap();
2992        assert_eq!(batch, roundtrip);
2993    }
2994
2995    #[test]
2996    fn test_unaligned_throws_error_with_require_alignment() {
2997        let batch = RecordBatch::try_from_iter(vec![(
2998            "i32",
2999            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
3000        )])
3001        .unwrap();
3002
3003        let r#gen = IpcDataGenerator {};
3004        let mut dict_tracker = DictionaryTracker::new(false);
3005        let (_, encoded) = r#gen
3006            .encode(
3007                &batch,
3008                &mut dict_tracker,
3009                &Default::default(),
3010                &mut Default::default(),
3011            )
3012            .unwrap();
3013
3014        let message = root_as_message(&encoded.ipc_message).unwrap();
3015
3016        // Construct an unaligned buffer
3017        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
3018        buffer.push(0_u8);
3019        buffer.extend_from_slice(&encoded.arrow_data);
3020        let b = Buffer::from(buffer).slice(1);
3021        assert_ne!(b.as_ptr().align_offset(8), 0);
3022
3023        let ipc_batch = message.header_as_record_batch().unwrap();
3024        let result = RecordBatchDecoder::try_new(
3025            &b,
3026            ipc_batch,
3027            batch.schema(),
3028            &Default::default(),
3029            &message.version(),
3030        )
3031        .unwrap()
3032        .with_require_alignment(true)
3033        .read_record_batch();
3034
3035        let error = result.unwrap_err();
3036        assert_eq!(
3037            error.to_string(),
3038            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
3039             offset from expected alignment of 4 by 1"
3040        );
3041    }
3042
3043    #[test]
3044    fn test_file_with_massive_column_count() {
3045        // 499_999 is upper limit for default settings (1_000_000)
3046        let limit = 600_000;
3047
3048        let fields = (0..limit)
3049            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
3050            .collect::<Vec<_>>();
3051        let schema = Arc::new(Schema::new(fields));
3052        let batch = RecordBatch::new_empty(schema);
3053
3054        let mut buf = Vec::new();
3055        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
3056        writer.write(&batch).unwrap();
3057        writer.finish().unwrap();
3058        drop(writer);
3059
3060        let mut reader = FileReaderBuilder::new()
3061            .with_max_footer_fb_tables(1_500_000)
3062            .build(std::io::Cursor::new(buf))
3063            .unwrap();
3064        let roundtrip_batch = reader.next().unwrap().unwrap();
3065
3066        assert_eq!(batch, roundtrip_batch);
3067    }
3068
3069    #[test]
3070    fn test_file_with_deeply_nested_columns() {
3071        // 60 is upper limit for default settings (64)
3072        let limit = 61;
3073
3074        let fields = (0..limit).fold(
3075            vec![Field::new("leaf", DataType::Boolean, false)],
3076            |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
3077        );
3078        let schema = Arc::new(Schema::new(fields));
3079        let batch = RecordBatch::new_empty(schema);
3080
3081        let mut buf = Vec::new();
3082        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
3083        writer.write(&batch).unwrap();
3084        writer.finish().unwrap();
3085        drop(writer);
3086
3087        let mut reader = FileReaderBuilder::new()
3088            .with_max_footer_fb_depth(65)
3089            .build(std::io::Cursor::new(buf))
3090            .unwrap();
3091        let roundtrip_batch = reader.next().unwrap().unwrap();
3092
3093        assert_eq!(batch, roundtrip_batch);
3094    }
3095
3096    #[test]
3097    fn test_invalid_struct_array_ipc_read_errors() {
3098        let a_field = Field::new("a", DataType::Int32, false);
3099        let b_field = Field::new("b", DataType::Int32, false);
3100        let struct_fields = Fields::from(vec![a_field.clone(), b_field.clone()]);
3101
3102        let a_array_data = ArrayData::builder(a_field.data_type().clone())
3103            .len(4)
3104            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
3105            .build()
3106            .unwrap();
3107        let b_array_data = ArrayData::builder(b_field.data_type().clone())
3108            .len(3)
3109            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
3110            .build()
3111            .unwrap();
3112
3113        let invalid_struct_arr = unsafe {
3114            StructArray::new_unchecked(
3115                struct_fields,
3116                vec![make_array(a_array_data), make_array(b_array_data)],
3117                None,
3118            )
3119        };
3120
3121        expect_ipc_validation_error(
3122            Arc::new(invalid_struct_arr),
3123            "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
3124        );
3125    }
3126
3127    #[test]
3128    fn test_invalid_nested_array_ipc_read_errors() {
3129        // one of the nested arrays has invalid data
3130        let a_field = Field::new("a", DataType::Int32, false);
3131        let b_field = Field::new("b", DataType::Utf8, false);
3132
3133        let schema = Arc::new(Schema::new(vec![Field::new_struct(
3134            "s",
3135            vec![a_field.clone(), b_field.clone()],
3136            false,
3137        )]));
3138
3139        let a_array_data = ArrayData::builder(a_field.data_type().clone())
3140            .len(4)
3141            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
3142            .build()
3143            .unwrap();
3144        // invalid nested child array -- length is correct, but has invalid utf8 data
3145        let b_array_data = {
3146            let valid: &[u8] = b"   ";
3147            let mut invalid = vec![];
3148            invalid.extend_from_slice(b"ValidString");
3149            invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3150            let binary_array =
3151                BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3152            let array = unsafe {
3153                StringArray::new_unchecked(
3154                    binary_array.offsets().clone(),
3155                    binary_array.values().clone(),
3156                    binary_array.nulls().cloned(),
3157                )
3158            };
3159            array.into_data()
3160        };
3161        let struct_data_type = schema.field(0).data_type();
3162
3163        let invalid_struct_arr = unsafe {
3164            make_array(
3165                ArrayData::builder(struct_data_type.clone())
3166                    .len(4)
3167                    .add_child_data(a_array_data)
3168                    .add_child_data(b_array_data)
3169                    .build_unchecked(),
3170            )
3171        };
3172        expect_ipc_validation_error(
3173            invalid_struct_arr,
3174            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
3175        );
3176    }
3177
3178    #[test]
3179    fn test_same_dict_id_without_preserve() {
3180        let batch = RecordBatch::try_new(
3181            Arc::new(Schema::new(
3182                ["a", "b"]
3183                    .iter()
3184                    .map(|name| {
3185                        #[allow(deprecated)]
3186                        Field::new_dict(
3187                            name.to_string(),
3188                            DataType::Dictionary(
3189                                Box::new(DataType::Int32),
3190                                Box::new(DataType::Utf8),
3191                            ),
3192                            true,
3193                            0,
3194                            false,
3195                        )
3196                    })
3197                    .collect::<Vec<Field>>(),
3198            )),
3199            vec![
3200                Arc::new(
3201                    vec![Some("c"), Some("d")]
3202                        .into_iter()
3203                        .collect::<DictionaryArray<Int32Type>>(),
3204                ) as ArrayRef,
3205                Arc::new(
3206                    vec![Some("e"), Some("f")]
3207                        .into_iter()
3208                        .collect::<DictionaryArray<Int32Type>>(),
3209                ) as ArrayRef,
3210            ],
3211        )
3212        .expect("Failed to create RecordBatch");
3213
3214        // serialize the record batch as an IPC stream
3215        let mut buf = vec![];
3216        {
3217            let mut writer = crate::writer::StreamWriter::try_new_with_options(
3218                &mut buf,
3219                batch.schema().as_ref(),
3220                crate::writer::IpcWriteOptions::default(),
3221            )
3222            .expect("Failed to create StreamWriter");
3223            writer.write(&batch).expect("Failed to write RecordBatch");
3224            writer.finish().expect("Failed to finish StreamWriter");
3225        }
3226
3227        StreamReader::try_new(std::io::Cursor::new(buf), None)
3228            .expect("Failed to create StreamReader")
3229            .for_each(|decoded_batch| {
3230                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
3231            });
3232    }
3233
3234    #[test]
3235    fn test_validation_of_invalid_list_array() {
3236        // ListArray with invalid offsets
3237        let array = unsafe {
3238            let values = Int32Array::from(vec![1, 2, 3]);
3239            let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]); // offsets can't go backwards
3240            let offsets = OffsetBuffer::new_unchecked(bad_offsets); // INVALID array created
3241            let field = Field::new_list_field(DataType::Int32, true);
3242            let nulls = None;
3243            ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
3244        };
3245
3246        expect_ipc_validation_error(
3247            Arc::new(array),
3248            "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
3249        );
3250    }
3251
3252    #[test]
3253    fn test_validation_of_invalid_string_array() {
3254        let valid: &[u8] = b"   ";
3255        let mut invalid = vec![];
3256        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3257        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3258        let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3259        // data is not valid utf8 we can not construct a correct StringArray
3260        // safely, so purposely create an invalid StringArray
3261        let array = unsafe {
3262            StringArray::new_unchecked(
3263                binary_array.offsets().clone(),
3264                binary_array.values().clone(),
3265                binary_array.nulls().cloned(),
3266            )
3267        };
3268        expect_ipc_validation_error(
3269            Arc::new(array),
3270            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
3271        );
3272    }
3273
3274    #[test]
3275    fn test_validation_of_invalid_string_view_array() {
3276        let valid: &[u8] = b"   ";
3277        let mut invalid = vec![];
3278        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3279        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3280        let binary_view_array =
3281            BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3282        // data is not valid utf8 we can not construct a correct StringArray
3283        // safely, so purposely create an invalid StringArray
3284        let array = unsafe {
3285            StringViewArray::new_unchecked(
3286                binary_view_array.views().clone(),
3287                binary_view_array.data_buffers().to_vec(),
3288                binary_view_array.nulls().cloned(),
3289            )
3290        };
3291        expect_ipc_validation_error(
3292            Arc::new(array),
3293            "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
3294        );
3295    }
3296
3297    /// return an invalid dictionary array (key is larger than values)
3298    /// ListArray with invalid offsets
3299    #[test]
3300    fn test_validation_of_invalid_dictionary_array() {
3301        let array = unsafe {
3302            let values = StringArray::from_iter_values(["a", "b", "c"]);
3303            let keys = Int32Array::from(vec![1, 200]); // keys are not valid for values
3304            DictionaryArray::new_unchecked(keys, Arc::new(values))
3305        };
3306
3307        expect_ipc_validation_error(
3308            Arc::new(array),
3309            "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
3310        );
3311    }
3312
3313    #[test]
3314    fn test_validation_of_invalid_union_array() {
3315        let array = unsafe {
3316            let fields = UnionFields::try_new(
3317                vec![1, 3], // typeids : type id 2 is not valid
3318                vec![
3319                    Field::new("a", DataType::Int32, false),
3320                    Field::new("b", DataType::Utf8, false),
3321                ],
3322            )
3323            .unwrap();
3324            let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]); // 2 is invalid
3325            let offsets = None;
3326            let children: Vec<ArrayRef> = vec![
3327                Arc::new(Int32Array::from(vec![10, 20, 30])),
3328                Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
3329            ];
3330
3331            UnionArray::new_unchecked(fields, type_ids, offsets, children)
3332        };
3333
3334        expect_ipc_validation_error(
3335            Arc::new(array),
3336            "Invalid argument error: Type Ids values must match one of the field type ids",
3337        );
3338    }
3339
    /// Byte sequence that is not valid UTF-8 starting at its very first byte.
    ///
    /// The tests in this module append it to otherwise valid bytes to build
    /// string data that validation is expected to reject.
    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3343
3344    /// Expect an error when reading the record batch using IPC or IPC Streams
3345    fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
3346        let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();
3347
3348        // IPC Stream format
3349        let buf = write_stream(&rb); // write is ok
3350        read_stream_skip_validation(&buf).unwrap();
3351        let err = read_stream(&buf).unwrap_err();
3352        assert_eq!(err.to_string(), expected_err);
3353
3354        // IPC File format
3355        let buf = write_ipc(&rb); // write is ok
3356        read_ipc_skip_validation(&buf).unwrap();
3357        let err = read_ipc(&buf).unwrap_err();
3358        assert_eq!(err.to_string(), expected_err);
3359
3360        // IPC Format with FileDecoder
3361        read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
3362        let err = read_ipc_with_decoder(buf).unwrap_err();
3363        assert_eq!(err.to_string(), expected_err);
3364    }
3365
3366    #[test]
3367    fn test_roundtrip_schema() {
3368        let schema = Schema::new(vec![
3369            Field::new(
3370                "a",
3371                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3372                false,
3373            ),
3374            Field::new(
3375                "b",
3376                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3377                false,
3378            ),
3379        ]);
3380
3381        let options = IpcWriteOptions::default();
3382        let data_gen = IpcDataGenerator::default();
3383        let mut dict_tracker = DictionaryTracker::new(false);
3384        let encoded_data =
3385            data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options);
3386        let mut schema_bytes = vec![];
3387        write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");
3388
3389        let begin_offset: usize = if schema_bytes[0..4].eq(&CONTINUATION_MARKER) {
3390            4
3391        } else {
3392            0
3393        };
3394
3395        size_prefixed_root_as_message(&schema_bytes[begin_offset..])
3396            .expect_err("size_prefixed_root_as_message");
3397
3398        let msg = parse_message(&schema_bytes).expect("parse_message");
3399        let ipc_schema = msg.header_as_schema().expect("header_as_schema");
3400        let new_schema = fb_to_schema(ipc_schema);
3401
3402        assert_eq!(schema, new_schema);
3403    }
3404
3405    #[test]
3406    fn test_negative_meta_len() {
3407        let bytes = i32::to_le_bytes(-1);
3408        let mut buf = vec![];
3409        buf.extend(CONTINUATION_MARKER);
3410        buf.extend(bytes);
3411
3412        let reader = StreamReader::try_new(Cursor::new(buf), None);
3413        assert!(reader.is_err());
3414    }
3415
3416    /// Per the IPC specification, dictionary batches may be omitted for
3417    /// dictionary-encoded columns where all values are null.  The C++
3418    /// implementation relies on this and does not emit a dictionary batch
3419    /// in that case.  Verify that the Rust reader handles such streams
3420    /// by synthesizing an empty dictionary instead of returning an error.
3421    #[test]
3422    fn test_read_null_dict_without_dictionary_batch() {
3423        // Build an all-null dictionary-encoded column.
3424        let keys = Int32Array::new_null(4);
3425        let values: ArrayRef = new_empty_array(&DataType::Utf8);
3426        let dict_array = DictionaryArray::new(keys, values);
3427
3428        let schema = Arc::new(Schema::new(vec![Field::new(
3429            "d",
3430            dict_array.data_type().clone(),
3431            true,
3432        )]));
3433        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap();
3434
3435        // Write a normal IPC stream (which includes the dictionary batch).
3436        let full_stream = write_stream(&batch);
3437
3438        // Parse the stream into individual messages and reconstruct it
3439        // without the DictionaryBatch message, simulating what C++ emits
3440        // for an all-null dictionary column.
3441        let mut stripped = Vec::new();
3442        let mut cursor = Cursor::new(&full_stream);
3443        loop {
3444            // Each message is: [continuation (4 bytes)] [meta_len (4 bytes)]
3445            //                   [metadata (meta_len bytes)] [body (bodyLength bytes)]
3446            let mut header = [0u8; 4];
3447            if cursor.read_exact(&mut header).is_err() {
3448                break;
3449            }
3450            if header == CONTINUATION_MARKER && cursor.read_exact(&mut header).is_err() {
3451                break;
3452            }
3453            let meta_len = u32::from_le_bytes(header) as usize;
3454            if meta_len == 0 {
3455                // EOS marker — write it through.
3456                stripped.extend_from_slice(&CONTINUATION_MARKER);
3457                stripped.extend_from_slice(&0u32.to_le_bytes());
3458                break;
3459            }
3460            let mut meta_buf = vec![0u8; meta_len];
3461            cursor.read_exact(&mut meta_buf).unwrap();
3462
3463            let message = root_as_message(&meta_buf).unwrap();
3464            let body_len = message.bodyLength() as usize;
3465            let mut body_buf = vec![0u8; body_len];
3466            cursor.read_exact(&mut body_buf).unwrap();
3467
3468            if message.header_type() == crate::MessageHeader::DictionaryBatch {
3469                // Skip the dictionary batch — this is what C++ does for
3470                // all-null dictionary columns.
3471                continue;
3472            }
3473            stripped.extend_from_slice(&CONTINUATION_MARKER);
3474            stripped.extend_from_slice(&(meta_len as u32).to_le_bytes());
3475            stripped.extend_from_slice(&meta_buf);
3476            stripped.extend_from_slice(&body_buf);
3477        }
3478
3479        // Reading the stripped stream must succeed.
3480        let result = read_stream(&stripped).unwrap();
3481        assert_eq!(result.num_rows(), 4);
3482        assert_eq!(result.num_columns(), 1);
3483
3484        let col = result.column(0);
3485        assert_eq!(col.null_count(), 4);
3486        assert_eq!(col.len(), 4);
3487        // The result must be a dictionary-typed array.
3488        assert!(matches!(col.data_type(), DataType::Dictionary(_, _)));
3489    }
3490
3491    // Tests projected reads where a ListView column is skipped before another column.
3492    // This catches cases where skipping the ListView consumes the wrong number of buffers.
3493    #[test]
3494    fn test_projection_skip_list_view() {
3495        use crate::reader::FileReader;
3496        use crate::writer::FileWriter;
3497        use arrow_array::{
3498            GenericListViewArray, Int32Array, RecordBatch,
3499            builder::{GenericListViewBuilder, UInt32Builder},
3500        };
3501        use arrow_schema::{DataType, Field, Schema};
3502        use std::sync::Arc;
3503
3504        // Build a small ListView column with a mix of valid and null entries
3505        let mut builder = GenericListViewBuilder::<i32, _>::new(UInt32Builder::new());
3506
3507        builder.values().append_value(1);
3508        builder.values().append_value(2);
3509        builder.append(true);
3510
3511        builder.append(false);
3512
3513        builder.values().append_value(3);
3514        builder.values().append_value(4);
3515        builder.append(true);
3516
3517        let list_view: GenericListViewArray<i32> = builder.finish();
3518
3519        // Second column with simple values
3520        let values = Int32Array::from(vec![10, 20, 30]);
3521
3522        // Schema: first column is ListView, second is Int32
3523        let schema = Arc::new(Schema::new(vec![
3524            Field::new("a", list_view.data_type().clone(), true),
3525            Field::new("b", DataType::Int32, false),
3526        ]));
3527        // Create a batch with both columns
3528        let batch =
3529            RecordBatch::try_new(schema, vec![Arc::new(list_view), Arc::new(values.clone())])
3530                .unwrap();
3531
3532        // Write the batch to IPC
3533        let mut buf = Vec::new();
3534        {
3535            let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
3536            writer.write(&batch).unwrap();
3537            writer.finish().unwrap();
3538        }
3539
3540        // Skip ListView column and Project only column "b"
3541        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
3542        let read_batch = reader.next().unwrap().unwrap();
3543
3544        // Verify that the projected column is read correctly
3545        assert_eq!(read_batch.num_columns(), 1);
3546        assert_eq!(read_batch.column(0).as_ref(), &values);
3547    }
3548
3549    // Tests reading a column when a preceding V4 Union column is skipped.
3550    // V4 Union columns include a null buffer and type ids (and offsets for dense unions).
3551    #[test]
3552    fn test_projection_skip_union_v4() {
3553        use crate::MetadataVersion;
3554        use crate::reader::FileReader;
3555        use crate::writer::{FileWriter, IpcWriteOptions};
3556        use arrow_array::{
3557            ArrayRef, Int32Array, RecordBatch, builder::UnionBuilder, types::Int32Type,
3558        };
3559        use arrow_schema::{DataType, Field, Schema};
3560        use std::sync::Arc;
3561
3562        // Build a dense Union column with simple Int32 values
3563        let mut builder = UnionBuilder::new_dense();
3564        builder.append::<Int32Type>("a", 1).unwrap();
3565        builder.append::<Int32Type>("a", 2).unwrap();
3566        builder.append::<Int32Type>("a", 3).unwrap();
3567        let union = builder.build().unwrap();
3568
3569        // Second column with known values to verify correctness after projection
3570        let values = Int32Array::from(vec![10, 20, 30]);
3571
3572        // Schema: first column is Union (to be skipped), second is Int32 (to be read)
3573        let schema = Arc::new(Schema::new(vec![
3574            Field::new("union", union.data_type().clone(), false),
3575            Field::new("values", DataType::Int32, false),
3576        ]));
3577
3578        // Create a batch containing both columns
3579        let batch = RecordBatch::try_new(
3580            schema,
3581            vec![Arc::new(union) as ArrayRef, Arc::new(values.clone())],
3582        )
3583        .unwrap();
3584
3585        // Write IPC using V4 metadata to trigger Union null buffer behavior
3586        let mut buf = Vec::new();
3587        {
3588            let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap();
3589            let mut writer =
3590                FileWriter::try_new_with_options(&mut buf, &batch.schema(), options).unwrap();
3591            writer.write(&batch).unwrap();
3592            writer.finish().unwrap();
3593        }
3594        // Read only the second column (skip the Union column)
3595        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
3596        let read_batch = reader.next().unwrap().unwrap();
3597
3598        // Verify that the projected column is read correctly after skipping Union
3599        assert_eq!(read_batch.num_columns(), 1);
3600        assert_eq!(read_batch.column(0).as_ref(), &values);
3601    }
3602
3603    // Tests reading a column when preceding fixed-width and boolean columns are skipped.
3604    // Covers all types that use the same two-buffer layout (null + values).
3605    // Verifies that skipping these types does not affect subsequent column decoding.
3606    #[test]
3607    fn test_projection_skip_fixed_width_types() {
3608        use std::sync::Arc;
3609
3610        use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, make_array};
3611        use arrow_buffer::Buffer;
3612        use arrow_data::ArrayData;
3613        use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit};
3614
3615        use crate::reader::FileReader;
3616        use crate::writer::FileWriter;
3617
3618        // Create a minimal array for a given fixed-width or boolean type
3619        fn make_array_for_type(data_type: DataType) -> ArrayRef {
3620            let len = 3;
3621
3622            if matches!(data_type, DataType::Boolean) {
3623                return Arc::new(BooleanArray::from(vec![true, false, true]));
3624            }
3625
3626            let width = data_type.primitive_width().unwrap();
3627            let data = ArrayData::builder(data_type)
3628                .len(len)
3629                .add_buffer(Buffer::from(vec![0_u8; len * width]))
3630                .build()
3631                .unwrap();
3632
3633            make_array(data)
3634        }
3635
3636        // List of types that follow the same two-buffer layout (null + values)
3637        let data_types = vec![
3638            DataType::Boolean,
3639            DataType::Int8,
3640            DataType::Int16,
3641            DataType::Int32,
3642            DataType::Int64,
3643            DataType::UInt8,
3644            DataType::UInt16,
3645            DataType::UInt32,
3646            DataType::UInt64,
3647            DataType::Float16,
3648            DataType::Float32,
3649            DataType::Float64,
3650            DataType::Timestamp(TimeUnit::Second, None),
3651            DataType::Date32,
3652            DataType::Date64,
3653            DataType::Time32(TimeUnit::Second),
3654            DataType::Time64(TimeUnit::Microsecond),
3655            DataType::Duration(TimeUnit::Second),
3656            DataType::Interval(IntervalUnit::YearMonth),
3657            DataType::Interval(IntervalUnit::DayTime),
3658            DataType::Interval(IntervalUnit::MonthDayNano),
3659            DataType::Decimal32(9, 2),
3660            DataType::Decimal64(18, 2),
3661            DataType::Decimal128(38, 2),
3662            DataType::Decimal256(76, 2),
3663        ];
3664
3665        // For each type:
3666        // - write a batch with [skipped_column, values]
3667        // - read only the second column
3668        // - verify the result is correct
3669        for data_type in data_types {
3670            let skipped = make_array_for_type(data_type.clone());
3671            let values = Int32Array::from(vec![10, 20, 30]);
3672
3673            let schema = Arc::new(Schema::new(vec![
3674                Field::new("skipped", data_type, false),
3675                Field::new("values", DataType::Int32, false),
3676            ]));
3677
3678            let batch =
3679                RecordBatch::try_new(schema, vec![skipped, Arc::new(values.clone())]).unwrap();
3680
3681            // Serialize the batch into IPC format
3682            let mut buf = Vec::new();
3683            {
3684                let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
3685                writer.write(&batch).unwrap();
3686                writer.finish().unwrap();
3687            }
3688
3689            // Read back only the second column (skip the first)
3690            let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
3691            let read_batch = reader.next().unwrap().unwrap();
3692
3693            // Verify that the returned column matches the original values column
3694            assert_eq!(read_batch.num_columns(), 1);
3695            assert_eq!(read_batch.column(0).as_ref(), &values);
3696        }
3697    }
3698}