Skip to main content

arrow_ipc/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Readers
19//!
20//! # Notes
21//!
22//! The [`FileReader`] and [`StreamReader`] have similar interfaces,
23//! however the [`FileReader`] expects a reader that supports [`Seek`]ing
24//!
25//! [`Seek`]: std::io::Seek
26
27mod stream;
28pub use stream::*;
29
30use arrow_select::concat;
31
32use flatbuffers::{VectorIter, VerifierOptions};
33use std::collections::{HashMap, VecDeque};
34use std::fmt;
35use std::io::{BufReader, Read, Seek, SeekFrom};
36use std::sync::Arc;
37
38use arrow_array::*;
39use arrow_buffer::{
40    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
41};
42use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
43use arrow_schema::*;
44
45use crate::compression::{CompressionCodec, DecompressionContext};
46use crate::r#gen::Message::{self};
47use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
48use DataType::*;
49
50/// Read a buffer based on offset and length
51/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
52/// Each constituent buffer is first compressed with the indicated
53/// compressor, and then written with the uncompressed length in the first 8
54/// bytes as a 64-bit little-endian signed integer followed by the compressed
55/// buffer bytes (and then padding as required by the protocol). The
56/// uncompressed length may be set to -1 to indicate that the data that
57/// follows is not compressed, which can be useful for cases where
58/// compression does not yield appreciable savings.
59fn read_buffer(
60    buf: &crate::Buffer,
61    a_data: &Buffer,
62    compression_codec: Option<CompressionCodec>,
63    decompression_context: &mut DecompressionContext,
64) -> Result<Buffer, ArrowError> {
65    let start_offset = buf.offset() as usize;
66    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
67    // corner case: empty buffer
68    match (buf_data.is_empty(), compression_codec) {
69        (true, _) | (_, None) => Ok(buf_data),
70        (false, Some(decompressor)) => {
71            decompressor.decompress_to_buffer(&buf_data, decompression_context)
72        }
73    }
74}
75impl RecordBatchDecoder<'_> {
76    /// Coordinates reading arrays based on data types.
77    ///
78    /// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView)
79    /// When encounter such types, we pop from the front of the queue to get the number of buffers to read.
80    ///
81    /// Notes:
82    /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
83    /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
84    ///   We thus:
85    ///     - check if the bit width of non-64-bit numbers is 64, and
86    ///     - read the buffer as 64-bit (signed integer or float), and
87    ///     - cast the 64-bit array to the appropriate data type
88    fn create_array(
89        &mut self,
90        field: &Field,
91        variadic_counts: &mut VecDeque<i64>,
92    ) -> Result<ArrayRef, ArrowError> {
93        let data_type = field.data_type();
94        match data_type {
95            Utf8 | Binary | LargeBinary | LargeUtf8 => {
96                let field_node = self.next_node(field)?;
97                let buffers = [
98                    self.next_buffer()?,
99                    self.next_buffer()?,
100                    self.next_buffer()?,
101                ];
102                self.create_primitive_array(field_node, data_type, &buffers)
103            }
104            BinaryView | Utf8View => {
105                let count = variadic_counts
106                    .pop_front()
107                    .ok_or(ArrowError::IpcError(format!(
108                        "Missing variadic count for {data_type} column"
109                    )))?;
110                let count = count + 2; // view and null buffer.
111                let buffers = (0..count)
112                    .map(|_| self.next_buffer())
113                    .collect::<Result<Vec<_>, _>>()?;
114                let field_node = self.next_node(field)?;
115                self.create_primitive_array(field_node, data_type, &buffers)
116            }
117            FixedSizeBinary(_) => {
118                let field_node = self.next_node(field)?;
119                let buffers = [self.next_buffer()?, self.next_buffer()?];
120                self.create_primitive_array(field_node, data_type, &buffers)
121            }
122            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
123                let list_node = self.next_node(field)?;
124                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
125                let values = self.create_array(list_field, variadic_counts)?;
126                self.create_list_array(list_node, data_type, &list_buffers, values)
127            }
128            ListView(list_field) | LargeListView(list_field) => {
129                let list_node = self.next_node(field)?;
130                let list_buffers = [
131                    self.next_buffer()?, // null buffer
132                    self.next_buffer()?, // offsets
133                    self.next_buffer()?, // sizes
134                ];
135                let values = self.create_array(list_field, variadic_counts)?;
136                self.create_list_view_array(list_node, data_type, &list_buffers, values)
137            }
138            FixedSizeList(list_field, _) => {
139                let list_node = self.next_node(field)?;
140                let list_buffers = [self.next_buffer()?];
141                let values = self.create_array(list_field, variadic_counts)?;
142                self.create_list_array(list_node, data_type, &list_buffers, values)
143            }
144            Struct(struct_fields) => {
145                let struct_node = self.next_node(field)?;
146                let null_buffer = self.next_buffer()?;
147
148                // read the arrays for each field
149                let mut struct_arrays = vec![];
150                // TODO investigate whether just knowing the number of buffers could
151                // still work
152                for struct_field in struct_fields {
153                    let child = self.create_array(struct_field, variadic_counts)?;
154                    struct_arrays.push(child);
155                }
156                self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
157            }
158            RunEndEncoded(run_ends_field, values_field) => {
159                let run_node = self.next_node(field)?;
160                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
161                let values = self.create_array(values_field, variadic_counts)?;
162
163                let run_array_length = run_node.length() as usize;
164                let builder = ArrayData::builder(data_type.clone())
165                    .len(run_array_length)
166                    .offset(0)
167                    .add_child_data(run_ends.into_data())
168                    .add_child_data(values.into_data())
169                    .null_count(run_node.null_count() as usize);
170
171                self.create_array_from_builder(builder)
172            }
173            // Create dictionary array from RecordBatch
174            Dictionary(_, _) => {
175                let index_node = self.next_node(field)?;
176                let index_buffers = [self.next_buffer()?, self.next_buffer()?];
177
178                #[allow(deprecated)]
179                let dict_id = field.dict_id().ok_or_else(|| {
180                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
181                })?;
182
183                let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
184                    ArrowError::ParseError(format!(
185                        "Cannot find a dictionary batch with dict id: {dict_id}"
186                    ))
187                })?;
188
189                self.create_dictionary_array(
190                    index_node,
191                    data_type,
192                    &index_buffers,
193                    value_array.clone(),
194                )
195            }
196            Union(fields, mode) => {
197                let union_node = self.next_node(field)?;
198                let len = union_node.length() as usize;
199
200                // In V4, union types has validity bitmap
201                // In V5 and later, union types have no validity bitmap
202                if self.version < MetadataVersion::V5 {
203                    self.next_buffer()?;
204                }
205
206                let type_ids: ScalarBuffer<i8> =
207                    self.next_buffer()?.slice_with_length(0, len).into();
208
209                let value_offsets = match mode {
210                    UnionMode::Dense => {
211                        let offsets: ScalarBuffer<i32> =
212                            self.next_buffer()?.slice_with_length(0, len * 4).into();
213                        Some(offsets)
214                    }
215                    UnionMode::Sparse => None,
216                };
217
218                let mut children = Vec::with_capacity(fields.len());
219
220                for (_id, field) in fields.iter() {
221                    let child = self.create_array(field, variadic_counts)?;
222                    children.push(child);
223                }
224
225                let array = if self.skip_validation.get() {
226                    // safety: flag can only be set via unsafe code
227                    unsafe {
228                        UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
229                    }
230                } else {
231                    UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
232                };
233                Ok(Arc::new(array))
234            }
235            Null => {
236                let node = self.next_node(field)?;
237                let length = node.length();
238                let null_count = node.null_count();
239
240                if length != null_count {
241                    return Err(ArrowError::SchemaError(format!(
242                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
243                    )));
244                }
245
246                let builder = ArrayData::builder(data_type.clone())
247                    .len(length as usize)
248                    .offset(0);
249                self.create_array_from_builder(builder)
250            }
251            _ => {
252                let field_node = self.next_node(field)?;
253                let buffers = [self.next_buffer()?, self.next_buffer()?];
254                self.create_primitive_array(field_node, data_type, &buffers)
255            }
256        }
257    }
258
259    /// Reads the correct number of buffers based on data type and null_count, and creates a
260    /// primitive array ref
261    fn create_primitive_array(
262        &self,
263        field_node: &FieldNode,
264        data_type: &DataType,
265        buffers: &[Buffer],
266    ) -> Result<ArrayRef, ArrowError> {
267        let length = field_node.length() as usize;
268        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
269        let mut builder = match data_type {
270            Utf8 | Binary | LargeBinary | LargeUtf8 => {
271                // read 3 buffers: null buffer (optional), offsets buffer and data buffer
272                ArrayData::builder(data_type.clone())
273                    .len(length)
274                    .buffers(buffers[1..3].to_vec())
275                    .null_bit_buffer(null_buffer)
276            }
277            BinaryView | Utf8View => ArrayData::builder(data_type.clone())
278                .len(length)
279                .buffers(buffers[1..].to_vec())
280                .null_bit_buffer(null_buffer),
281            _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
282                // read 2 buffers: null buffer (optional) and data buffer
283                ArrayData::builder(data_type.clone())
284                    .len(length)
285                    .add_buffer(buffers[1].clone())
286                    .null_bit_buffer(null_buffer)
287            }
288            t => unreachable!("Data type {:?} either unsupported or not primitive", t),
289        };
290
291        builder = builder.null_count(field_node.null_count() as usize);
292
293        self.create_array_from_builder(builder)
294    }
295
296    /// Update the ArrayDataBuilder based on settings in this decoder
297    fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
298        let mut builder = builder.align_buffers(!self.require_alignment);
299        if self.skip_validation.get() {
300            // SAFETY: flag can only be set via unsafe code
301            unsafe { builder = builder.skip_validation(true) }
302        };
303        Ok(make_array(builder.build()?))
304    }
305
306    /// Reads the correct number of buffers based on list type and null_count, and creates a
307    /// list array ref
308    fn create_list_array(
309        &self,
310        field_node: &FieldNode,
311        data_type: &DataType,
312        buffers: &[Buffer],
313        child_array: ArrayRef,
314    ) -> Result<ArrayRef, ArrowError> {
315        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
316        let length = field_node.length() as usize;
317        let child_data = child_array.into_data();
318        let mut builder = match data_type {
319            List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
320                .len(length)
321                .add_buffer(buffers[1].clone())
322                .add_child_data(child_data)
323                .null_bit_buffer(null_buffer),
324
325            FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
326                .len(length)
327                .add_child_data(child_data)
328                .null_bit_buffer(null_buffer),
329
330            _ => unreachable!("Cannot create list or map array from {:?}", data_type),
331        };
332
333        builder = builder.null_count(field_node.null_count() as usize);
334
335        self.create_array_from_builder(builder)
336    }
337
338    fn create_list_view_array(
339        &self,
340        field_node: &FieldNode,
341        data_type: &DataType,
342        buffers: &[Buffer],
343        child_array: ArrayRef,
344    ) -> Result<ArrayRef, ArrowError> {
345        assert!(matches!(data_type, ListView(_) | LargeListView(_)));
346
347        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
348        let length = field_node.length() as usize;
349        let child_data = child_array.into_data();
350
351        self.create_array_from_builder(
352            ArrayData::builder(data_type.clone())
353                .len(length)
354                .add_buffer(buffers[1].clone()) // offsets
355                .add_buffer(buffers[2].clone()) // sizes
356                .add_child_data(child_data)
357                .null_bit_buffer(null_buffer)
358                .null_count(field_node.null_count() as usize),
359        )
360    }
361
362    fn create_struct_array(
363        &self,
364        struct_node: &FieldNode,
365        null_buffer: Buffer,
366        struct_fields: &Fields,
367        struct_arrays: Vec<ArrayRef>,
368    ) -> Result<ArrayRef, ArrowError> {
369        let null_count = struct_node.null_count() as usize;
370        let len = struct_node.length() as usize;
371        let skip_validation = self.skip_validation.get();
372
373        let nulls = if null_count > 0 {
374            let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
375            let null_buffer = if skip_validation {
376                // safety: flag can only be set via unsafe code
377                unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
378            } else {
379                let null_buffer = NullBuffer::new(validity_buffer);
380
381                if null_buffer.null_count() != null_count {
382                    return Err(ArrowError::InvalidArgumentError(format!(
383                        "null_count value ({}) doesn't match actual number of nulls in array ({})",
384                        null_count,
385                        null_buffer.null_count()
386                    )));
387                }
388
389                null_buffer
390            };
391
392            Some(null_buffer)
393        } else {
394            None
395        };
396        if struct_arrays.is_empty() {
397            // `StructArray::from` can't infer the correct row count
398            // if we have zero fields
399            return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
400        }
401
402        let struct_array = if skip_validation {
403            // safety: flag can only be set via unsafe code
404            unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
405        } else {
406            StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
407        };
408
409        Ok(Arc::new(struct_array))
410    }
411
412    /// Reads the correct number of buffers based on list type and null_count, and creates a
413    /// list array ref
414    fn create_dictionary_array(
415        &self,
416        field_node: &FieldNode,
417        data_type: &DataType,
418        buffers: &[Buffer],
419        value_array: ArrayRef,
420    ) -> Result<ArrayRef, ArrowError> {
421        if let Dictionary(_, _) = *data_type {
422            let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
423            let builder = ArrayData::builder(data_type.clone())
424                .len(field_node.length() as usize)
425                .add_buffer(buffers[1].clone())
426                .add_child_data(value_array.into_data())
427                .null_bit_buffer(null_buffer)
428                .null_count(field_node.null_count() as usize);
429            self.create_array_from_builder(builder)
430        } else {
431            unreachable!("Cannot create dictionary array from {:?}", data_type)
432        }
433    }
434}
435
436/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to
437/// [`RecordBatch`]
438///
439/// [IPC RecordBatch]: crate::RecordBatch
440///
441pub struct RecordBatchDecoder<'a> {
442    /// The flatbuffers encoded record batch
443    batch: crate::RecordBatch<'a>,
444    /// The output schema
445    schema: SchemaRef,
446    /// Decoded dictionaries indexed by dictionary id
447    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
448    /// Optional compression codec
449    compression: Option<CompressionCodec>,
450    /// Decompression context for reusing zstd decompressor state
451    decompression_context: DecompressionContext,
452    /// The format version
453    version: MetadataVersion,
454    /// The raw data buffer
455    data: &'a Buffer,
456    /// The fields comprising this array
457    nodes: VectorIter<'a, FieldNode>,
458    /// The buffers comprising this array
459    buffers: VectorIter<'a, crate::Buffer>,
460    /// Projection (subset of columns) to read, if any
461    /// See [`RecordBatchDecoder::with_projection`] for details
462    projection: Option<&'a [usize]>,
463    /// Are buffers required to already be aligned? See
464    /// [`RecordBatchDecoder::with_require_alignment`] for details
465    require_alignment: bool,
466    /// Should validation be skipped when reading data? Defaults to false.
467    ///
468    /// See [`FileDecoder::with_skip_validation`] for details.
469    skip_validation: UnsafeFlag,
470}
471
472impl<'a> RecordBatchDecoder<'a> {
473    /// Create a reader for decoding arrays from an encoded [`RecordBatch`]
474    fn try_new(
475        buf: &'a Buffer,
476        batch: crate::RecordBatch<'a>,
477        schema: SchemaRef,
478        dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
479        metadata: &'a MetadataVersion,
480    ) -> Result<Self, ArrowError> {
481        let buffers = batch.buffers().ok_or_else(|| {
482            ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
483        })?;
484        let field_nodes = batch.nodes().ok_or_else(|| {
485            ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
486        })?;
487
488        let batch_compression = batch.compression();
489        let compression = batch_compression
490            .map(|batch_compression| batch_compression.codec().try_into())
491            .transpose()?;
492
493        Ok(Self {
494            batch,
495            schema,
496            dictionaries_by_id,
497            compression,
498            decompression_context: DecompressionContext::new(),
499            version: *metadata,
500            data: buf,
501            nodes: field_nodes.iter(),
502            buffers: buffers.iter(),
503            projection: None,
504            require_alignment: false,
505            skip_validation: UnsafeFlag::new(),
506        })
507    }
508
509    /// Set the projection (default: None)
510    ///
511    /// If set, the projection is the list  of column indices
512    /// that will be read
513    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
514        self.projection = projection;
515        self
516    }
517
518    /// Set require_alignment (default: false)
519    ///
520    /// If true, buffers must be aligned appropriately or error will
521    /// result. If false, buffers will be copied to aligned buffers
522    /// if necessary.
523    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
524        self.require_alignment = require_alignment;
525        self
526    }
527
528    /// Specifies if validation should be skipped when reading data (defaults to `false`)
529    ///
530    /// Note this API is somewhat "funky" as it allows the caller to skip validation
531    /// without having to use `unsafe` code. If this is ever made public
532    /// it should be made clearer that this is a potentially unsafe by
533    /// using an `unsafe` function that takes a boolean flag.
534    ///
535    /// # Safety
536    ///
537    /// Relies on the caller only passing a flag with `true` value if they are
538    /// certain that the data is valid
539    pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
540        self.skip_validation = skip_validation;
541        self
542    }
543
544    /// Read the record batch, consuming the reader
545    fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
546        let mut variadic_counts: VecDeque<i64> = self
547            .batch
548            .variadicBufferCounts()
549            .into_iter()
550            .flatten()
551            .collect();
552
553        let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
554
555        let schema = Arc::clone(&self.schema);
556        if let Some(projection) = self.projection {
557            let mut arrays = vec![];
558            // project fields
559            for (idx, field) in schema.fields().iter().enumerate() {
560                // Create array for projected field
561                if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
562                    let child = self.create_array(field, &mut variadic_counts)?;
563                    arrays.push((proj_idx, child));
564                } else {
565                    self.skip_field(field, &mut variadic_counts)?;
566                }
567            }
568
569            arrays.sort_by_key(|t| t.0);
570
571            let schema = Arc::new(schema.project(projection)?);
572            let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();
573
574            if self.skip_validation.get() {
575                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
576                unsafe {
577                    Ok(RecordBatch::new_unchecked(
578                        schema,
579                        columns,
580                        self.batch.length() as usize,
581                    ))
582                }
583            } else {
584                assert!(variadic_counts.is_empty());
585                RecordBatch::try_new_with_options(schema, columns, &options)
586            }
587        } else {
588            let mut children = vec![];
589            // keep track of index as lists require more than one node
590            for field in schema.fields() {
591                let child = self.create_array(field, &mut variadic_counts)?;
592                children.push(child);
593            }
594
595            if self.skip_validation.get() {
596                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
597                unsafe {
598                    Ok(RecordBatch::new_unchecked(
599                        schema,
600                        children,
601                        self.batch.length() as usize,
602                    ))
603                }
604            } else {
605                assert!(variadic_counts.is_empty());
606                RecordBatch::try_new_with_options(schema, children, &options)
607            }
608        }
609    }
610
611    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
612        let buffer = self.buffers.next().ok_or_else(|| {
613            ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
614        })?;
615        read_buffer(
616            buffer,
617            self.data,
618            self.compression,
619            &mut self.decompression_context,
620        )
621    }
622
623    fn skip_buffer(&mut self) {
624        self.buffers.next().unwrap();
625    }
626
627    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
628        self.nodes.next().ok_or_else(|| {
629            ArrowError::SchemaError(format!(
630                "Invalid data for schema. {field} refers to node not found in schema",
631            ))
632        })
633    }
634
635    fn skip_field(
636        &mut self,
637        field: &Field,
638        variadic_count: &mut VecDeque<i64>,
639    ) -> Result<(), ArrowError> {
640        self.next_node(field)?;
641
642        match field.data_type() {
643            Utf8 | Binary | LargeBinary | LargeUtf8 => {
644                for _ in 0..3 {
645                    self.skip_buffer()
646                }
647            }
648            Utf8View | BinaryView => {
649                let count = variadic_count
650                    .pop_front()
651                    .ok_or(ArrowError::IpcError(format!(
652                        "Missing variadic count for {} column",
653                        field.data_type()
654                    )))?;
655                let count = count + 2; // view and null buffer.
656                for _i in 0..count {
657                    self.skip_buffer()
658                }
659            }
660            FixedSizeBinary(_) => {
661                self.skip_buffer();
662                self.skip_buffer();
663            }
664            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
665                self.skip_buffer();
666                self.skip_buffer();
667                self.skip_field(list_field, variadic_count)?;
668            }
669            FixedSizeList(list_field, _) => {
670                self.skip_buffer();
671                self.skip_field(list_field, variadic_count)?;
672            }
673            Struct(struct_fields) => {
674                self.skip_buffer();
675
676                // skip for each field
677                for struct_field in struct_fields {
678                    self.skip_field(struct_field, variadic_count)?
679                }
680            }
681            RunEndEncoded(run_ends_field, values_field) => {
682                self.skip_field(run_ends_field, variadic_count)?;
683                self.skip_field(values_field, variadic_count)?;
684            }
685            Dictionary(_, _) => {
686                self.skip_buffer(); // Nulls
687                self.skip_buffer(); // Indices
688            }
689            Union(fields, mode) => {
690                self.skip_buffer(); // Nulls
691
692                match mode {
693                    UnionMode::Dense => self.skip_buffer(),
694                    UnionMode::Sparse => {}
695                };
696
697                for (_, field) in fields.iter() {
698                    self.skip_field(field, variadic_count)?
699                }
700            }
701            Null => {} // No buffer increases
702            _ => {
703                self.skip_buffer();
704                self.skip_buffer();
705            }
706        };
707        Ok(())
708    }
709}
710
711/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
712///
713/// If `require_alignment` is true, this function will return an error if any array data in the
714/// input `buf` is not properly aligned.
715/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct [`arrow_data::ArrayData`].
716///
717/// If `require_alignment` is false, this function will automatically allocate a new aligned buffer
718/// and copy over the data if any array data in the input `buf` is not properly aligned.
719/// (Properly aligned array data will remain zero-copy.)
720/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct [`arrow_data::ArrayData`].
721pub fn read_record_batch(
722    buf: &Buffer,
723    batch: crate::RecordBatch,
724    schema: SchemaRef,
725    dictionaries_by_id: &HashMap<i64, ArrayRef>,
726    projection: Option<&[usize]>,
727    metadata: &MetadataVersion,
728) -> Result<RecordBatch, ArrowError> {
729    RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
730        .with_projection(projection)
731        .with_require_alignment(false)
732        .read_record_batch()
733}
734
735/// Read the dictionary from the buffer and provided metadata,
736/// updating the `dictionaries_by_id` with the resulting dictionary
737pub fn read_dictionary(
738    buf: &Buffer,
739    batch: crate::DictionaryBatch,
740    schema: &Schema,
741    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
742    metadata: &MetadataVersion,
743) -> Result<(), ArrowError> {
744    read_dictionary_impl(
745        buf,
746        batch,
747        schema,
748        dictionaries_by_id,
749        metadata,
750        false,
751        UnsafeFlag::new(),
752    )
753}
754
755fn read_dictionary_impl(
756    buf: &Buffer,
757    batch: crate::DictionaryBatch,
758    schema: &Schema,
759    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
760    metadata: &MetadataVersion,
761    require_alignment: bool,
762    skip_validation: UnsafeFlag,
763) -> Result<(), ArrowError> {
764    let id = batch.id();
765
766    let dictionary_values = get_dictionary_values(
767        buf,
768        batch,
769        schema,
770        dictionaries_by_id,
771        metadata,
772        require_alignment,
773        skip_validation,
774    )?;
775
776    update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;
777
778    Ok(())
779}
780
781/// Updates the `dictionaries_by_id` with the provided dictionary values and id.
782///
783/// # Errors
784/// - If `is_delta` is true and there is no existing dictionary for the given
785///   `dict_id`
786/// - If `is_delta` is true and the concatenation of the existing and new
787///   dictionary fails. This usually signals a type mismatch between the old and
788///   new values.
789fn update_dictionaries(
790    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
791    is_delta: bool,
792    dict_id: i64,
793    dict_values: ArrayRef,
794) -> Result<(), ArrowError> {
795    if !is_delta {
796        // We don't currently record the isOrdered field. This could be general
797        // attributes of arrays.
798        // Add (possibly multiple) array refs to the dictionaries array.
799        dictionaries_by_id.insert(dict_id, dict_values.clone());
800        return Ok(());
801    }
802
803    let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
804        ArrowError::InvalidArgumentError(format!(
805            "No existing dictionary for delta dictionary with id '{dict_id}'"
806        ))
807    })?;
808
809    let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
810        ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
811    })?;
812
813    dictionaries_by_id.insert(dict_id, combined);
814
815    Ok(())
816}
817
818/// Given a dictionary batch IPC message/body along with the full state of a
819/// stream including schema, dictionary cache, metadata, and other flags, this
820/// function will parse the buffer into an array of dictionary values.
821fn get_dictionary_values(
822    buf: &Buffer,
823    batch: crate::DictionaryBatch,
824    schema: &Schema,
825    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
826    metadata: &MetadataVersion,
827    require_alignment: bool,
828    skip_validation: UnsafeFlag,
829) -> Result<ArrayRef, ArrowError> {
830    let id = batch.id();
831    #[allow(deprecated)]
832    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
833    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
834        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
835    })?;
836
837    // As the dictionary batch does not contain the type of the
838    // values array, we need to retrieve this from the schema.
839    // Get an array representing this dictionary's values.
840    let dictionary_values: ArrayRef = match first_field.data_type() {
841        DataType::Dictionary(_, value_type) => {
842            // Make a fake schema for the dictionary batch.
843            let value = value_type.as_ref().clone();
844            let schema = Schema::new(vec![Field::new("", value, true)]);
845            // Read a single column
846            let record_batch = RecordBatchDecoder::try_new(
847                buf,
848                batch.data().unwrap(),
849                Arc::new(schema),
850                dictionaries_by_id,
851                metadata,
852            )?
853            .with_require_alignment(require_alignment)
854            .with_skip_validation(skip_validation)
855            .read_record_batch()?;
856
857            Some(record_batch.column(0).clone())
858        }
859        _ => None,
860    }
861    .ok_or_else(|| {
862        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
863    })?;
864
865    Ok(dictionary_values)
866}
867
868/// Read the data for a given block
869fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
870    reader.seek(SeekFrom::Start(block.offset() as u64))?;
871    let body_len = block.bodyLength().to_usize().unwrap();
872    let metadata_len = block.metaDataLength().to_usize().unwrap();
873    let total_len = body_len.checked_add(metadata_len).unwrap();
874
875    let mut buf = MutableBuffer::from_len_zeroed(total_len);
876    reader.read_exact(&mut buf)?;
877    Ok(buf.into())
878}
879
880/// Parse an encapsulated message
881///
882/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
883fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
884    let buf = match buf[..4] == CONTINUATION_MARKER {
885        true => &buf[8..],
886        false => &buf[4..],
887    };
888    crate::root_as_message(buf)
889        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
890}
891
892/// Read the footer length from the last 10 bytes of an Arrow IPC file
893///
894/// Expects a 4 byte footer length followed by `b"ARROW1"`
895pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
896    if buf[4..] != super::ARROW_MAGIC {
897        return Err(ArrowError::ParseError(
898            "Arrow file does not contain correct footer".to_string(),
899        ));
900    }
901
902    // read footer length
903    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
904    footer_len
905        .try_into()
906        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
907}
908
909/// A low-level, push-based interface for reading an IPC file
910///
911/// For a higher-level interface see [`FileReader`]
912///
913/// For an example of using this API with `mmap` see the [`zero_copy_ipc`] example.
914///
915/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
916///
917/// ```
918/// # use std::sync::Arc;
919/// # use arrow_array::*;
920/// # use arrow_array::types::Int32Type;
921/// # use arrow_buffer::Buffer;
922/// # use arrow_ipc::convert::fb_to_schema;
923/// # use arrow_ipc::reader::{FileDecoder, read_footer_length};
924/// # use arrow_ipc::root_as_footer;
925/// # use arrow_ipc::writer::FileWriter;
926/// // Write an IPC file
927///
928/// let batch = RecordBatch::try_from_iter([
929///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
930///     ("b", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
931///     ("c", Arc::new(DictionaryArray::<Int32Type>::from_iter(["hello", "hello", "world"])) as _),
932/// ]).unwrap();
933///
934/// let schema = batch.schema();
935///
936/// let mut out = Vec::with_capacity(1024);
937/// let mut writer = FileWriter::try_new(&mut out, schema.as_ref()).unwrap();
938/// writer.write(&batch).unwrap();
939/// writer.finish().unwrap();
940///
941/// drop(writer);
942///
943/// // Read IPC file
944///
945/// let buffer = Buffer::from_vec(out);
946/// let trailer_start = buffer.len() - 10;
947/// let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
948/// let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
949///
950/// let back = fb_to_schema(footer.schema().unwrap());
951/// assert_eq!(&back, schema.as_ref());
952///
953/// let mut decoder = FileDecoder::new(schema, footer.version());
954///
955/// // Read dictionaries
956/// for block in footer.dictionaries().iter().flatten() {
957///     let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
958///     let data = buffer.slice_with_length(block.offset() as _, block_len);
959///     decoder.read_dictionary(&block, &data).unwrap();
960/// }
961///
962/// // Read record batch
963/// let batches = footer.recordBatches().unwrap();
964/// assert_eq!(batches.len(), 1); // Only wrote a single batch
965///
966/// let block = batches.get(0);
967/// let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
968/// let data = buffer.slice_with_length(block.offset() as _, block_len);
969/// let back = decoder.read_record_batch(block, &data).unwrap().unwrap();
970///
971/// assert_eq!(batch, back);
972/// ```
#[derive(Debug)]
pub struct FileDecoder {
    /// Schema used when decoding record batches and dictionaries
    schema: SchemaRef,
    /// Dictionary values read so far, keyed by dictionary id
    dictionaries: HashMap<i64, ArrayRef>,
    /// Metadata version supplied at construction; decoded messages are checked against it
    version: MetadataVersion,
    /// Optional projection applied when reading record batches
    projection: Option<Vec<usize>>,
    /// Whether array data in input buffers is required to be properly aligned
    require_alignment: bool,
    /// Whether validation should be skipped when reading data
    skip_validation: UnsafeFlag,
}
982
983impl FileDecoder {
984    /// Create a new [`FileDecoder`] with the given schema and version
985    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
986        Self {
987            schema,
988            version,
989            dictionaries: Default::default(),
990            projection: None,
991            require_alignment: false,
992            skip_validation: UnsafeFlag::new(),
993        }
994    }
995
996    /// Specify a projection
997    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
998        self.projection = Some(projection);
999        self
1000    }
1001
1002    /// Specifies if the array data in input buffers is required to be properly aligned.
1003    ///
1004    /// If `require_alignment` is true, this decoder will return an error if any array data in the
1005    /// input `buf` is not properly aligned.
1006    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct
1007    /// [`arrow_data::ArrayData`].
1008    ///
1009    /// If `require_alignment` is false (the default), this decoder will automatically allocate a
1010    /// new aligned buffer and copy over the data if any array data in the input `buf` is not
1011    /// properly aligned. (Properly aligned array data will remain zero-copy.)
1012    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct
1013    /// [`arrow_data::ArrayData`].
1014    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
1015        self.require_alignment = require_alignment;
1016        self
1017    }
1018
1019    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1020    ///
1021    /// # Safety
1022    ///
1023    /// This flag must only be set to `true` when you trust the input data and are sure the data you are
1024    /// reading is a valid Arrow IPC file, otherwise undefined behavior may
1025    /// result.
1026    ///
1027    /// For example, some programs may wish to trust reading IPC files written
1028    /// by the same process that created the files.
1029    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1030        unsafe { self.skip_validation.set(skip_validation) };
1031        self
1032    }
1033
1034    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
1035        let message = parse_message(buf)?;
1036
1037        // some old test data's footer metadata is not set, so we account for that
1038        if self.version != MetadataVersion::V1 && message.version() != self.version {
1039            return Err(ArrowError::IpcError(
1040                "Could not read IPC message as metadata versions mismatch".to_string(),
1041            ));
1042        }
1043        Ok(message)
1044    }
1045
1046    /// Read the dictionary with the given block and data buffer
1047    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
1048        let message = self.read_message(buf)?;
1049        match message.header_type() {
1050            crate::MessageHeader::DictionaryBatch => {
1051                let batch = message.header_as_dictionary_batch().unwrap();
1052                read_dictionary_impl(
1053                    &buf.slice(block.metaDataLength() as _),
1054                    batch,
1055                    &self.schema,
1056                    &mut self.dictionaries,
1057                    &message.version(),
1058                    self.require_alignment,
1059                    self.skip_validation.clone(),
1060                )
1061            }
1062            t => Err(ArrowError::ParseError(format!(
1063                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
1064            ))),
1065        }
1066    }
1067
1068    /// Read the RecordBatch with the given block and data buffer
1069    pub fn read_record_batch(
1070        &self,
1071        block: &Block,
1072        buf: &Buffer,
1073    ) -> Result<Option<RecordBatch>, ArrowError> {
1074        let message = self.read_message(buf)?;
1075        match message.header_type() {
1076            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
1077                "Not expecting a schema when messages are read".to_string(),
1078            )),
1079            crate::MessageHeader::RecordBatch => {
1080                let batch = message.header_as_record_batch().ok_or_else(|| {
1081                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1082                })?;
1083                // read the block that makes up the record batch into a buffer
1084                RecordBatchDecoder::try_new(
1085                    &buf.slice(block.metaDataLength() as _),
1086                    batch,
1087                    self.schema.clone(),
1088                    &self.dictionaries,
1089                    &message.version(),
1090                )?
1091                .with_projection(self.projection.as_deref())
1092                .with_require_alignment(self.require_alignment)
1093                .with_skip_validation(self.skip_validation.clone())
1094                .read_record_batch()
1095                .map(Some)
1096            }
1097            crate::MessageHeader::NONE => Ok(None),
1098            t => Err(ArrowError::InvalidArgumentError(format!(
1099                "Reading types other than record batches not yet supported, unable to read {t:?}"
1100            ))),
1101        }
1102    }
1103}
1104
/// Build an Arrow [`FileReader`] with custom options.
///
/// Options are set with the `with_*` methods and the reader is created by
/// [`FileReaderBuilder::build`].
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Passed through as `max_tables` when constructing [`VerifierOptions`] for the footer
    max_footer_fb_tables: usize,
    /// Passed through as `max_depth` when constructing [`VerifierOptions`] for the footer
    max_footer_fb_depth: usize,
}
1115
1116impl Default for FileReaderBuilder {
1117    fn default() -> Self {
1118        let verifier_options = VerifierOptions::default();
1119        Self {
1120            max_footer_fb_tables: verifier_options.max_tables,
1121            max_footer_fb_depth: verifier_options.max_depth,
1122            projection: None,
1123        }
1124    }
1125}
1126
1127impl FileReaderBuilder {
1128    /// Options for creating a new [`FileReader`].
1129    ///
1130    /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
1131    pub fn new() -> Self {
1132        Self::default()
1133    }
1134
1135    /// Optional projection for which columns to load (zero-based column indices).
1136    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1137        self.projection = Some(projection);
1138        self
1139    }
1140
1141    /// Flatbuffers option for parsing the footer. Controls the max number of fields and
1142    /// metadata key-value pairs that can be parsed from the schema of the footer.
1143    ///
1144    /// By default this is set to `1_000_000` which roughly translates to a schema with
1145    /// no metadata key-value pairs but 499,999 fields.
1146    ///
1147    /// This default limit is enforced to protect against malicious files with a massive
1148    /// amount of flatbuffer tables which could cause a denial of service attack.
1149    ///
1150    /// If you need to ingest a trusted file with a massive number of fields and/or
1151    /// metadata key-value pairs and are facing the error `"Unable to get root as
1152    /// footer: TooManyTables"` then increase this parameter as necessary.
1153    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
1154        self.max_footer_fb_tables = max_footer_fb_tables;
1155        self
1156    }
1157
1158    /// Flatbuffers option for parsing the footer. Controls the max depth for schemas with
1159    /// nested fields parsed from the footer.
1160    ///
1161    /// By default this is set to `64` which roughly translates to a schema with
1162    /// a field nested 60 levels down through other struct fields.
1163    ///
1164    /// This default limit is enforced to protect against malicious files with a extremely
1165    /// deep flatbuffer structure which could cause a denial of service attack.
1166    ///
1167    /// If you need to ingest a trusted file with a deeply nested field and are facing the
1168    /// error `"Unable to get root as footer: DepthLimitReached"` then increase this
1169    /// parameter as necessary.
1170    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
1171        self.max_footer_fb_depth = max_footer_fb_depth;
1172        self
1173    }
1174
1175    /// Build [`FileReader`] with given reader.
1176    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
1177        // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
1178        let mut buffer = [0; 10];
1179        reader.seek(SeekFrom::End(-10))?;
1180        reader.read_exact(&mut buffer)?;
1181
1182        let footer_len = read_footer_length(buffer)?;
1183
1184        // read footer
1185        let mut footer_data = vec![0; footer_len];
1186        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
1187        reader.read_exact(&mut footer_data)?;
1188
1189        let verifier_options = VerifierOptions {
1190            max_tables: self.max_footer_fb_tables,
1191            max_depth: self.max_footer_fb_depth,
1192            ..Default::default()
1193        };
1194        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
1195            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
1196        )?;
1197
1198        let blocks = footer.recordBatches().ok_or_else(|| {
1199            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
1200        })?;
1201
1202        let total_blocks = blocks.len();
1203
1204        let ipc_schema = footer.schema().unwrap();
1205        if !ipc_schema.endianness().equals_to_target_endianness() {
1206            return Err(ArrowError::IpcError(
1207                "the endianness of the source system does not match the endianness of the target system.".to_owned()
1208            ));
1209        }
1210
1211        let schema = crate::convert::fb_to_schema(ipc_schema);
1212
1213        let mut custom_metadata = HashMap::new();
1214        if let Some(fb_custom_metadata) = footer.custom_metadata() {
1215            for kv in fb_custom_metadata.into_iter() {
1216                custom_metadata.insert(
1217                    kv.key().unwrap().to_string(),
1218                    kv.value().unwrap().to_string(),
1219                );
1220            }
1221        }
1222
1223        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
1224        if let Some(projection) = self.projection {
1225            decoder = decoder.with_projection(projection)
1226        }
1227
1228        // Create an array of optional dictionary value arrays, one per field.
1229        if let Some(dictionaries) = footer.dictionaries() {
1230            for block in dictionaries {
1231                let buf = read_block(&mut reader, block)?;
1232                decoder.read_dictionary(block, &buf)?;
1233            }
1234        }
1235
1236        Ok(FileReader {
1237            reader,
1238            blocks: blocks.iter().copied().collect(),
1239            current_block: 0,
1240            total_blocks,
1241            decoder,
1242            custom_metadata,
1243        })
1244    }
1245}
1246
1247/// Arrow File Reader
1248///
1249/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC File Format],
1250/// providing random access to the record batches.
1251///
1252/// # See Also
1253///
1254/// * [`Self::set_index`] for random access
1255/// * [`StreamReader`] for reading streaming data
1256///
1257/// # Example: Reading from a `File`
1258/// ```
1259/// # use std::io::Cursor;
1260/// use arrow_array::record_batch;
1261/// # use arrow_ipc::reader::FileReader;
1262/// # use arrow_ipc::writer::FileWriter;
1263/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1264/// # let mut file = vec![]; // mimic a stream for the example
1265/// # {
1266/// #  let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
1267/// #  writer.write(&batch).unwrap();
1268/// #  writer.write(&batch).unwrap();
1269/// #  writer.finish().unwrap();
1270/// # }
1271/// # let mut file = Cursor::new(&file);
1272/// let projection = None; // read all columns
1273/// let mut reader = FileReader::try_new(&mut file, projection).unwrap();
1274/// // Position the reader to the second batch
1275/// reader.set_index(1).unwrap();
1276/// // read batches from the reader using the Iterator trait
1277/// let mut num_rows = 0;
1278/// for batch in reader {
1279///    let batch = batch.unwrap();
1280///    num_rows += batch.num_rows();
1281/// }
1282/// assert_eq!(num_rows, 3);
1283/// ```
1284/// # Example: Reading from `mmap`ed file
1285///
1286/// For an example creating Arrays without copying using  memory mapped (`mmap`)
1287/// files see the [`zero_copy_ipc`] example.
1288///
1289/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
1290/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
pub struct FileReader<R> {
    /// File reader that supports reading and seeking
    reader: R,

    /// The decoder used to turn block data into record batches
    decoder: FileDecoder,

    /// The record batch blocks listed in the file footer
    ///
    /// A block indicates the regions in the file to read to get data
    blocks: Vec<Block>,

    /// A counter to keep track of the current block that should be read
    current_block: usize,

    /// The total number of blocks, which may contain record batches and other types
    total_blocks: usize,

    /// User defined metadata read from the file footer
    custom_metadata: HashMap<String, String>,
}
1312
1313impl<R> fmt::Debug for FileReader<R> {
1314    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1315        f.debug_struct("FileReader<R>")
1316            .field("decoder", &self.decoder)
1317            .field("blocks", &self.blocks)
1318            .field("current_block", &self.current_block)
1319            .field("total_blocks", &self.total_blocks)
1320            .finish_non_exhaustive()
1321    }
1322}
1323
1324impl<R: Read + Seek> FileReader<BufReader<R>> {
1325    /// Try to create a new file reader with the reader wrapped in a BufReader.
1326    ///
1327    /// See [`FileReader::try_new`] for an unbuffered version.
1328    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1329        Self::try_new(BufReader::new(reader), projection)
1330    }
1331}
1332
1333impl<R: Read + Seek> FileReader<R> {
1334    /// Try to create a new file reader.
1335    ///
1336    /// There is no internal buffering. If buffered reads are needed you likely want to use
1337    /// [`FileReader::try_new_buffered`] instead.
1338    ///
1339    /// # Errors
1340    ///
1341    /// An ['Err'](Result::Err) may be returned if:
1342    /// - the file does not meet the Arrow Format footer requirements, or
1343    /// - file endianness does not match the target endianness.
1344    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1345        let builder = FileReaderBuilder {
1346            projection,
1347            ..Default::default()
1348        };
1349        builder.build(reader)
1350    }
1351
1352    /// Return user defined customized metadata
1353    pub fn custom_metadata(&self) -> &HashMap<String, String> {
1354        &self.custom_metadata
1355    }
1356
1357    /// Return the number of batches in the file
1358    pub fn num_batches(&self) -> usize {
1359        self.total_blocks
1360    }
1361
1362    /// Return the schema of the file
1363    pub fn schema(&self) -> SchemaRef {
1364        self.decoder.schema.clone()
1365    }
1366
1367    /// See to a specific [`RecordBatch`]
1368    ///
1369    /// Sets the current block to the index, allowing random reads
1370    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1371        if index >= self.total_blocks {
1372            Err(ArrowError::InvalidArgumentError(format!(
1373                "Cannot set batch to index {} from {} total batches",
1374                index, self.total_blocks
1375            )))
1376        } else {
1377            self.current_block = index;
1378            Ok(())
1379        }
1380    }
1381
1382    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1383        let block = &self.blocks[self.current_block];
1384        self.current_block += 1;
1385
1386        // read length
1387        let buffer = read_block(&mut self.reader, block)?;
1388        self.decoder.read_record_batch(block, &buffer)
1389    }
1390
1391    /// Gets a reference to the underlying reader.
1392    ///
1393    /// It is inadvisable to directly read from the underlying reader.
1394    pub fn get_ref(&self) -> &R {
1395        &self.reader
1396    }
1397
1398    /// Gets a mutable reference to the underlying reader.
1399    ///
1400    /// It is inadvisable to directly read from the underlying reader.
1401    pub fn get_mut(&mut self) -> &mut R {
1402        &mut self.reader
1403    }
1404
1405    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1406    ///
1407    /// # Safety
1408    ///
1409    /// See [`FileDecoder::with_skip_validation`]
1410    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1411        self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
1412        self
1413    }
1414}
1415
1416impl<R: Read + Seek> Iterator for FileReader<R> {
1417    type Item = Result<RecordBatch, ArrowError>;
1418
1419    fn next(&mut self) -> Option<Self::Item> {
1420        // get current block
1421        if self.current_block < self.total_blocks {
1422            self.maybe_next().transpose()
1423        } else {
1424            None
1425        }
1426    }
1427}
1428
1429impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
1430    fn schema(&self) -> SchemaRef {
1431        self.schema()
1432    }
1433}
1434
1435/// Arrow Stream Reader
1436///
1437/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC Streaming Format].
1438///
1439/// # See Also
1440///
1441/// * [`FileReader`] for random access.
1442///
1443/// # Example
1444/// ```
1445/// # use arrow_array::record_batch;
1446/// # use arrow_ipc::reader::StreamReader;
1447/// # use arrow_ipc::writer::StreamWriter;
1448/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1449/// # let mut stream = vec![]; // mimic a stream for the example
1450/// # {
1451/// #  let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1452/// #  writer.write(&batch).unwrap();
1453/// #  writer.finish().unwrap();
1454/// # }
1455/// # let stream = stream.as_slice();
1456/// let projection = None; // read all columns
1457/// let mut reader = StreamReader::try_new(stream, projection).unwrap();
1458/// // read batches from the reader using the Iterator trait
1459/// let mut num_rows = 0;
1460/// for batch in reader {
1461///    let batch = batch.unwrap();
1462///    num_rows += batch.num_rows();
1463/// }
1464/// assert_eq!(num_rows, 3);
1465/// ```
1466///
1467/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamReader<R> {
    /// Message reader wrapping the underlying stream
    reader: MessageReader<R>,

    /// The schema that is read from the stream's first message
    schema: SchemaRef,

    /// Dictionaries read from the stream so far, keyed by dictionary id.
    ///
    /// Dictionaries may be appended to in the streaming format.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// An indicator of whether the stream is complete.
    ///
    /// This value is set to `true` the first time the reader's `next()` returns `None`.
    finished: bool,

    /// Optional projection: the projected column indices and the projected schema
    projection: Option<(Vec<usize>, Schema)>,

    /// Should validation be skipped when reading data? Defaults to false.
    ///
    /// See [`FileDecoder::with_skip_validation`] for details.
    skip_validation: UnsafeFlag,
}
1493
1494impl<R> fmt::Debug for StreamReader<R> {
1495    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1496        f.debug_struct("StreamReader<R>")
1497            .field("reader", &"R")
1498            .field("schema", &self.schema)
1499            .field("dictionaries_by_id", &self.dictionaries_by_id)
1500            .field("finished", &self.finished)
1501            .field("projection", &self.projection)
1502            .finish()
1503    }
1504}
1505
1506impl<R: Read> StreamReader<BufReader<R>> {
1507    /// Try to create a new stream reader with the reader wrapped in a BufReader.
1508    ///
1509    /// See [`StreamReader::try_new`] for an unbuffered version.
1510    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1511        Self::try_new(BufReader::new(reader), projection)
1512    }
1513}
1514
1515impl<R: Read> StreamReader<R> {
1516    /// Try to create a new stream reader.
1517    ///
1518    /// To check if the reader is done, use [`is_finished(self)`](StreamReader::is_finished).
1519    ///
1520    /// There is no internal buffering. If buffered reads are needed you likely want to use
1521    /// [`StreamReader::try_new_buffered`] instead.
1522    ///
1523    /// # Errors
1524    ///
1525    /// An ['Err'](Result::Err) may be returned if the reader does not encounter a schema
1526    /// as the first message in the stream.
1527    pub fn try_new(
1528        reader: R,
1529        projection: Option<Vec<usize>>,
1530    ) -> Result<StreamReader<R>, ArrowError> {
1531        let mut msg_reader = MessageReader::new(reader);
1532        let message = msg_reader.maybe_next()?;
1533        let Some((message, _)) = message else {
1534            return Err(ArrowError::IpcError(
1535                "Expected schema message, found empty stream.".to_string(),
1536            ));
1537        };
1538
1539        if message.header_type() != Message::MessageHeader::Schema {
1540            return Err(ArrowError::IpcError(format!(
1541                "Expected a schema as the first message in the stream, got: {:?}",
1542                message.header_type()
1543            )));
1544        }
1545
1546        let schema = message.header_as_schema().ok_or_else(|| {
1547            ArrowError::ParseError("Failed to parse schema from message header".to_string())
1548        })?;
1549        let schema = crate::convert::fb_to_schema(schema);
1550
1551        // Create an array of optional dictionary value arrays, one per field.
1552        let dictionaries_by_id = HashMap::new();
1553
1554        let projection = match projection {
1555            Some(projection_indices) => {
1556                let schema = schema.project(&projection_indices)?;
1557                Some((projection_indices, schema))
1558            }
1559            _ => None,
1560        };
1561
1562        Ok(Self {
1563            reader: msg_reader,
1564            schema: Arc::new(schema),
1565            finished: false,
1566            dictionaries_by_id,
1567            projection,
1568            skip_validation: UnsafeFlag::new(),
1569        })
1570    }
1571
1572    /// Deprecated, use [`StreamReader::try_new`] instead.
1573    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1574    pub fn try_new_unbuffered(
1575        reader: R,
1576        projection: Option<Vec<usize>>,
1577    ) -> Result<Self, ArrowError> {
1578        Self::try_new(reader, projection)
1579    }
1580
1581    /// Return the schema of the stream
1582    pub fn schema(&self) -> SchemaRef {
1583        self.schema.clone()
1584    }
1585
    /// Check if the stream is finished.
    ///
    /// Returns the reader's `finished` flag, which is set once the end of the
    /// stream has been observed while reading record batches.
    pub fn is_finished(&self) -> bool {
        self.finished
    }
1590
1591    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1592        if self.finished {
1593            return Ok(None);
1594        }
1595
1596        // Read messages until we get a record batch or end of stream
1597        loop {
1598            let message = self.next_ipc_message()?;
1599            let Some(message) = message else {
1600                // If the message is None, we have reached the end of the stream.
1601                self.finished = true;
1602                return Ok(None);
1603            };
1604
1605            match message {
1606                IpcMessage::Schema(_) => {
1607                    return Err(ArrowError::IpcError(
1608                        "Expected a record batch, but found a schema".to_string(),
1609                    ));
1610                }
1611                IpcMessage::RecordBatch(record_batch) => {
1612                    return Ok(Some(record_batch));
1613                }
1614                IpcMessage::DictionaryBatch { .. } => {
1615                    continue;
1616                }
1617            };
1618        }
1619    }
1620
1621    /// Reads and fully parses the next IPC message from the stream. Whereas
1622    /// [`Self::maybe_next`] is a higher level method focused on reading
1623    /// `RecordBatch`es, this method returns the individual fully parsed IPC
1624    /// messages from the underlying stream.
1625    ///
1626    /// This is useful primarily for testing reader/writer behaviors as it
1627    /// allows a full view into the messages that have been written to a stream.
1628    pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
1629        let message = self.reader.maybe_next()?;
1630        let Some((message, body)) = message else {
1631            // If the message is None, we have reached the end of the stream.
1632            return Ok(None);
1633        };
1634
1635        let ipc_message = match message.header_type() {
1636            Message::MessageHeader::Schema => {
1637                let schema = message.header_as_schema().ok_or_else(|| {
1638                    ArrowError::ParseError("Failed to parse schema from message header".to_string())
1639                })?;
1640                let arrow_schema = crate::convert::fb_to_schema(schema);
1641                IpcMessage::Schema(arrow_schema)
1642            }
1643            Message::MessageHeader::RecordBatch => {
1644                let batch = message.header_as_record_batch().ok_or_else(|| {
1645                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1646                })?;
1647
1648                let version = message.version();
1649                let schema = self.schema.clone();
1650                let record_batch = RecordBatchDecoder::try_new(
1651                    &body.into(),
1652                    batch,
1653                    schema,
1654                    &self.dictionaries_by_id,
1655                    &version,
1656                )?
1657                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
1658                .with_require_alignment(false)
1659                .with_skip_validation(self.skip_validation.clone())
1660                .read_record_batch()?;
1661                IpcMessage::RecordBatch(record_batch)
1662            }
1663            Message::MessageHeader::DictionaryBatch => {
1664                let dict = message.header_as_dictionary_batch().ok_or_else(|| {
1665                    ArrowError::ParseError(
1666                        "Failed to parse dictionary batch from message header".to_string(),
1667                    )
1668                })?;
1669
1670                let version = message.version();
1671                let dict_values = get_dictionary_values(
1672                    &body.into(),
1673                    dict,
1674                    &self.schema,
1675                    &mut self.dictionaries_by_id,
1676                    &version,
1677                    false,
1678                    self.skip_validation.clone(),
1679                )?;
1680
1681                update_dictionaries(
1682                    &mut self.dictionaries_by_id,
1683                    dict.isDelta(),
1684                    dict.id(),
1685                    dict_values.clone(),
1686                )?;
1687
1688                IpcMessage::DictionaryBatch {
1689                    id: dict.id(),
1690                    is_delta: (dict.isDelta()),
1691                    values: (dict_values),
1692                }
1693            }
1694            x => {
1695                return Err(ArrowError::ParseError(format!(
1696                    "Unsupported message header type in IPC stream: '{x:?}'"
1697                )));
1698            }
1699        };
1700
1701        Ok(Some(ipc_message))
1702    }
1703
    /// Gets a reference to the underlying reader.
    ///
    /// The reference is obtained from the internal message reader that owns `R`.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_ref(&self) -> &R {
        self.reader.inner()
    }
1710
    /// Gets a mutable reference to the underlying reader.
    ///
    /// The reference is obtained from the internal message reader that owns `R`.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_mut(&mut self) -> &mut R {
        self.reader.inner_mut()
    }
1717
    /// Specifies if validation should be skipped when reading data (defaults to `false`)
    ///
    /// Consumes the reader and returns it with the flag updated, so it can be
    /// chained after construction.
    ///
    /// # Safety
    ///
    /// See [`FileDecoder::with_skip_validation`]
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        // SAFETY: this is an `unsafe fn`; the caller has accepted the contract
        // referenced in the `# Safety` section above.
        unsafe { self.skip_validation.set(skip_validation) };
        self
    }
1727}
1728
1729impl<R: Read> Iterator for StreamReader<R> {
1730    type Item = Result<RecordBatch, ArrowError>;
1731
1732    fn next(&mut self) -> Option<Self::Item> {
1733        self.maybe_next().transpose()
1734    }
1735}
1736
1737impl<R: Read> RecordBatchReader for StreamReader<R> {
1738    fn schema(&self) -> SchemaRef {
1739        self.schema.clone()
1740    }
1741}
1742
/// Representation of a fully parsed IpcMessage from the underlying stream.
/// Parsing this kind of message is done by higher level constructs such as
/// [`StreamReader`], because fully interpreting the messages into a record
/// batch or dictionary batch requires access to stream state such as schema
/// and the full dictionary cache.
#[derive(Debug)]
// NOTE(review): presumably the payloads are only inspected from tests, hence
// the `dead_code` allowance — confirm before removing it.
#[allow(dead_code)]
pub(crate) enum IpcMessage {
    /// A schema message, converted to an Arrow [`Schema`](arrow_schema::Schema).
    Schema(arrow_schema::Schema),
    /// A record batch message decoded into a [`RecordBatch`].
    RecordBatch(RecordBatch),
    /// A dictionary batch message.
    DictionaryBatch {
        /// Dictionary id carried by the message.
        id: i64,
        /// The message's delta flag.
        is_delta: bool,
        /// Dictionary values decoded from the message body.
        values: ArrayRef,
    },
}
1759
/// A low-level construct that reads [`Message::Message`]s from a reader while
/// re-using a buffer for metadata. This is composed into [`StreamReader`].
struct MessageReader<R> {
    /// The underlying source of IPC bytes.
    reader: R,
    /// Scratch buffer holding the metadata of the most recently read message;
    /// it is resized and overwritten on every read.
    buf: Vec<u8>,
}
1766
1767impl<R: Read> MessageReader<R> {
1768    fn new(reader: R) -> Self {
1769        Self {
1770            reader,
1771            buf: Vec::new(),
1772        }
1773    }
1774
1775    /// Reads the entire next message from the underlying reader which includes
1776    /// the metadata length, the metadata, and the body.
1777    ///
1778    /// # Returns
1779    /// - `Ok(None)` if the the reader signals the end of stream with EOF on
1780    ///   the first read
1781    /// - `Err(_)` if the reader returns an error other than on the first
1782    ///   read, or if the metadata length is invalid
1783    /// - `Ok(Some(_))` with the Message and buffer containiner the
1784    ///   body bytes otherwise.
1785    fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1786        let meta_len = self.read_meta_len()?;
1787        let Some(meta_len) = meta_len else {
1788            return Ok(None);
1789        };
1790
1791        self.buf.resize(meta_len, 0);
1792        self.reader.read_exact(&mut self.buf)?;
1793
1794        let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1795            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1796        })?;
1797
1798        let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1799        self.reader.read_exact(&mut buf)?;
1800
1801        Ok(Some((message, buf)))
1802    }
1803
1804    /// Get a mutable reference to the underlying reader.
1805    fn inner_mut(&mut self) -> &mut R {
1806        &mut self.reader
1807    }
1808
1809    /// Get an immutable reference to the underlying reader.
1810    fn inner(&self) -> &R {
1811        &self.reader
1812    }
1813
1814    /// Read the metadata length for the next message from the underlying stream.
1815    ///
1816    /// # Returns
1817    /// - `Ok(None)` if the the reader signals the end of stream with EOF on
1818    ///   the first read
1819    /// - `Err(_)` if the reader returns an error other than on the first
1820    ///   read, or if the metadata length is less than 0.
1821    /// - `Ok(Some(_))` with the length otherwise.
1822    pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1823        let mut meta_len: [u8; 4] = [0; 4];
1824        match self.reader.read_exact(&mut meta_len) {
1825            Ok(_) => {}
1826            Err(e) => {
1827                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1828                    // Handle EOF without the "0xFFFFFFFF 0x00000000"
1829                    // valid according to:
1830                    // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1831                    Ok(None)
1832                } else {
1833                    Err(ArrowError::from(e))
1834                };
1835            }
1836        };
1837
1838        let meta_len = {
1839            // If a continuation marker is encountered, skip over it and read
1840            // the size from the next four bytes.
1841            if meta_len == CONTINUATION_MARKER {
1842                self.reader.read_exact(&mut meta_len)?;
1843            }
1844
1845            i32::from_le_bytes(meta_len)
1846        };
1847
1848        if meta_len == 0 {
1849            return Ok(None);
1850        }
1851
1852        let meta_len = usize::try_from(meta_len)
1853            .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1854
1855        Ok(Some(meta_len))
1856    }
1857}
1858
1859#[cfg(test)]
1860mod tests {
1861    use std::io::Cursor;
1862
1863    use crate::convert::fb_to_schema;
1864    use crate::writer::{
1865        DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1866    };
1867
1868    use super::*;
1869
1870    use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1871    use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1872    use arrow_array::types::*;
1873    use arrow_buffer::{NullBuffer, OffsetBuffer};
1874    use arrow_data::ArrayDataBuilder;
1875
1876    fn create_test_projection_schema() -> Schema {
1877        // define field types
1878        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1879
1880        let fixed_size_list_data_type =
1881            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);
1882
1883        let union_fields = UnionFields::from_fields(vec![
1884            Field::new("a", DataType::Int32, false),
1885            Field::new("b", DataType::Float64, false),
1886        ]);
1887
1888        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);
1889
1890        let struct_fields = Fields::from(vec![
1891            Field::new("id", DataType::Int32, false),
1892            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
1893        ]);
1894        let struct_data_type = DataType::Struct(struct_fields);
1895
1896        let run_encoded_data_type = DataType::RunEndEncoded(
1897            Arc::new(Field::new("run_ends", DataType::Int16, false)),
1898            Arc::new(Field::new("values", DataType::Int32, true)),
1899        );
1900
1901        // define schema
1902        Schema::new(vec![
1903            Field::new("f0", DataType::UInt32, false),
1904            Field::new("f1", DataType::Utf8, false),
1905            Field::new("f2", DataType::Boolean, false),
1906            Field::new("f3", union_data_type, true),
1907            Field::new("f4", DataType::Null, true),
1908            Field::new("f5", DataType::Float64, true),
1909            Field::new("f6", list_data_type, false),
1910            Field::new("f7", DataType::FixedSizeBinary(3), true),
1911            Field::new("f8", fixed_size_list_data_type, false),
1912            Field::new("f9", struct_data_type, false),
1913            Field::new("f10", run_encoded_data_type, false),
1914            Field::new("f11", DataType::Boolean, false),
1915            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
1916            Field::new("f13", DataType::Utf8, false),
1917        ])
1918    }
1919
1920    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
1921        // set test data for each column
1922        let array0 = UInt32Array::from(vec![1, 2, 3]);
1923        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
1924        let array2 = BooleanArray::from(vec![true, false, true]);
1925
1926        let mut union_builder = UnionBuilder::new_dense();
1927        union_builder.append::<Int32Type>("a", 1).unwrap();
1928        union_builder.append::<Float64Type>("b", 10.1).unwrap();
1929        union_builder.append_null::<Float64Type>("b").unwrap();
1930        let array3 = union_builder.build().unwrap();
1931
1932        let array4 = NullArray::new(3);
1933        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
1934        let array6_values = vec![
1935            Some(vec![Some(10), Some(10), Some(10)]),
1936            Some(vec![Some(20), Some(20), Some(20)]),
1937            Some(vec![Some(30), Some(30)]),
1938        ];
1939        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
1940        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
1941        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();
1942
1943        let array8_values = ArrayData::builder(DataType::Int32)
1944            .len(9)
1945            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
1946            .build()
1947            .unwrap();
1948        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
1949            .len(3)
1950            .add_child_data(array8_values)
1951            .build()
1952            .unwrap();
1953        let array8 = FixedSizeListArray::from(array8_data);
1954
1955        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
1956        let array9_list: ArrayRef =
1957            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
1958                Some(vec![Some(-10)]),
1959                Some(vec![Some(-20), Some(-20), Some(-20)]),
1960                Some(vec![Some(-30)]),
1961            ]));
1962        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
1963            .add_child_data(array9_id.into_data())
1964            .add_child_data(array9_list.into_data())
1965            .len(3)
1966            .build()
1967            .unwrap();
1968        let array9 = StructArray::from(array9);
1969
1970        let array10_input = vec![Some(1_i32), None, None];
1971        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1972        array10_builder.extend(array10_input);
1973        let array10 = array10_builder.finish();
1974
1975        let array11 = BooleanArray::from(vec![false, false, true]);
1976
1977        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
1978        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
1979        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));
1980
1981        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);
1982
1983        // create record batch
1984        RecordBatch::try_new(
1985            Arc::new(schema.clone()),
1986            vec![
1987                Arc::new(array0),
1988                Arc::new(array1),
1989                Arc::new(array2),
1990                Arc::new(array3),
1991                Arc::new(array4),
1992                Arc::new(array5),
1993                Arc::new(array6),
1994                Arc::new(array7),
1995                Arc::new(array8),
1996                Arc::new(array9),
1997                Arc::new(array10),
1998                Arc::new(array11),
1999                Arc::new(array12),
2000                Arc::new(array13),
2001            ],
2002        )
2003        .unwrap()
2004    }
2005
2006    #[test]
2007    fn test_negative_meta_len_start_stream() {
2008        let bytes = i32::to_le_bytes(-1);
2009        let mut buf = vec![];
2010        buf.extend(CONTINUATION_MARKER);
2011        buf.extend(bytes);
2012
2013        let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
2014        assert!(reader_err.is_some());
2015        assert_eq!(
2016            reader_err.unwrap().to_string(),
2017            "Parser error: Invalid metadata length: -1"
2018        );
2019    }
2020
2021    #[test]
2022    fn test_negative_meta_len_mid_stream() {
2023        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2024        let mut buf = Vec::new();
2025        {
2026            let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
2027            let batch =
2028                RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
2029                    .unwrap();
2030            writer.write(&batch).unwrap();
2031        }
2032
2033        let bytes = i32::to_le_bytes(-1);
2034        buf.extend(CONTINUATION_MARKER);
2035        buf.extend(bytes);
2036
2037        let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
2038        // Read the valid value
2039        assert!(reader.maybe_next().is_ok());
2040        // Read the invalid meta len
2041        let batch_err = reader.maybe_next().err();
2042        assert!(batch_err.is_some());
2043        assert_eq!(
2044            batch_err.unwrap().to_string(),
2045            "Parser error: Invalid metadata length: -1"
2046        );
2047    }
2048
2049    #[test]
2050    fn test_missing_buffer_metadata_error() {
2051        use crate::r#gen::Message::*;
2052        use flatbuffers::FlatBufferBuilder;
2053
2054        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));
2055
2056        // create RecordBatch buffer metadata with invalid buffer count
2057        // Int32Array needs 2 buffers (validity + data) but we provide only 1
2058        let mut fbb = FlatBufferBuilder::new();
2059        let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
2060        let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
2061        let batch_offset = RecordBatch::create(
2062            &mut fbb,
2063            &RecordBatchArgs {
2064                length: 2,
2065                nodes: Some(nodes),
2066                buffers: Some(buffers),
2067                compression: None,
2068                variadicBufferCounts: None,
2069            },
2070        );
2071        fbb.finish_minimal(batch_offset);
2072        let batch_bytes = fbb.finished_data().to_vec();
2073        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2074
2075        let data_buffer = Buffer::from(vec![0u8; 8]);
2076        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2077        let metadata = MetadataVersion::V5;
2078
2079        let decoder = RecordBatchDecoder::try_new(
2080            &data_buffer,
2081            batch,
2082            schema.clone(),
2083            &dictionaries,
2084            &metadata,
2085        )
2086        .unwrap();
2087
2088        let result = decoder.read_record_batch();
2089
2090        match result {
2091            Err(ArrowError::IpcError(msg)) => {
2092                assert_eq!(msg, "Buffer count mismatched with metadata");
2093            }
2094            other => panic!("unexpected error: {other:?}"),
2095        }
2096    }
2097
2098    #[test]
2099    fn test_projection_array_values() {
2100        // define schema
2101        let schema = create_test_projection_schema();
2102
2103        // create record batch with test data
2104        let batch = create_test_projection_batch_data(&schema);
2105
2106        // write record batch in IPC format
2107        let mut buf = Vec::new();
2108        {
2109            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2110            writer.write(&batch).unwrap();
2111            writer.finish().unwrap();
2112        }
2113
2114        // read record batch with projection
2115        for index in 0..12 {
2116            let projection = vec![index];
2117            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
2118            let read_batch = reader.unwrap().next().unwrap().unwrap();
2119            let projected_column = read_batch.column(0);
2120            let expected_column = batch.column(index);
2121
2122            // check the projected column equals the expected column
2123            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
2124        }
2125
2126        {
2127            // read record batch with reversed projection
2128            let reader =
2129                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
2130            let read_batch = reader.unwrap().next().unwrap().unwrap();
2131            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
2132            assert_eq!(read_batch, expected_batch);
2133        }
2134    }
2135
2136    #[test]
2137    fn test_arrow_single_float_row() {
2138        let schema = Schema::new(vec![
2139            Field::new("a", DataType::Float32, false),
2140            Field::new("b", DataType::Float32, false),
2141            Field::new("c", DataType::Int32, false),
2142            Field::new("d", DataType::Int32, false),
2143        ]);
2144        let arrays = vec![
2145            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
2146            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
2147            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
2148            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
2149        ];
2150        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
2151        // create stream writer
2152        let mut file = tempfile::tempfile().unwrap();
2153        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
2154        stream_writer.write(&batch).unwrap();
2155        stream_writer.finish().unwrap();
2156
2157        drop(stream_writer);
2158
2159        file.rewind().unwrap();
2160
2161        // read stream back
2162        let reader = StreamReader::try_new(&mut file, None).unwrap();
2163
2164        reader.for_each(|batch| {
2165            let batch = batch.unwrap();
2166            assert!(
2167                batch
2168                    .column(0)
2169                    .as_any()
2170                    .downcast_ref::<Float32Array>()
2171                    .unwrap()
2172                    .value(0)
2173                    != 0.0
2174            );
2175            assert!(
2176                batch
2177                    .column(1)
2178                    .as_any()
2179                    .downcast_ref::<Float32Array>()
2180                    .unwrap()
2181                    .value(0)
2182                    != 0.0
2183            );
2184        });
2185
2186        file.rewind().unwrap();
2187
2188        // Read with projection
2189        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
2190
2191        reader.for_each(|batch| {
2192            let batch = batch.unwrap();
2193            assert_eq!(batch.schema().fields().len(), 2);
2194            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
2195            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
2196        });
2197    }
2198
2199    /// Write the record batch to an in-memory buffer in IPC File format
2200    fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
2201        let mut buf = Vec::new();
2202        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2203        writer.write(rb).unwrap();
2204        writer.finish().unwrap();
2205        buf
2206    }
2207
2208    /// Return the first record batch read from the IPC File buffer
2209    fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2210        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
2211        reader.next().unwrap()
2212    }
2213
2214    /// Return the first record batch read from the IPC File buffer, disabling
2215    /// validation
2216    fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2217        let mut reader = unsafe {
2218            FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2219        };
2220        reader.next().unwrap()
2221    }
2222
2223    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
2224        let buf = write_ipc(rb);
2225        read_ipc(&buf).unwrap()
2226    }
2227
2228    /// Return the first record batch read from the IPC File buffer
2229    /// using the FileDecoder API
2230    fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2231        read_ipc_with_decoder_inner(buf, false)
2232    }
2233
2234    /// Return the first record batch read from the IPC File buffer
2235    /// using the FileDecoder API, disabling validation
2236    fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2237        read_ipc_with_decoder_inner(buf, true)
2238    }
2239
2240    fn read_ipc_with_decoder_inner(
2241        buf: Vec<u8>,
2242        skip_validation: bool,
2243    ) -> Result<RecordBatch, ArrowError> {
2244        let buffer = Buffer::from_vec(buf);
2245        let trailer_start = buffer.len() - 10;
2246        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
2247        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
2248            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;
2249
2250        let schema = fb_to_schema(footer.schema().unwrap());
2251
2252        let mut decoder = unsafe {
2253            FileDecoder::new(Arc::new(schema), footer.version())
2254                .with_skip_validation(skip_validation)
2255        };
2256        // Read dictionaries
2257        for block in footer.dictionaries().iter().flatten() {
2258            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2259            let data = buffer.slice_with_length(block.offset() as _, block_len);
2260            decoder.read_dictionary(block, &data)?
2261        }
2262
2263        // Read record batch
2264        let batches = footer.recordBatches().unwrap();
2265        assert_eq!(batches.len(), 1); // Only wrote a single batch
2266
2267        let block = batches.get(0);
2268        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2269        let data = buffer.slice_with_length(block.offset() as _, block_len);
2270        Ok(decoder.read_record_batch(block, &data)?.unwrap())
2271    }
2272
2273    /// Write the record batch to an in-memory buffer in IPC Stream format
2274    fn write_stream(rb: &RecordBatch) -> Vec<u8> {
2275        let mut buf = Vec::new();
2276        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2277        writer.write(rb).unwrap();
2278        writer.finish().unwrap();
2279        buf
2280    }
2281
2282    /// Return the first record batch read from the IPC Stream buffer
2283    fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2284        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
2285        reader.next().unwrap()
2286    }
2287
2288    /// Return the first record batch read from the IPC Stream buffer,
2289    /// disabling validation
2290    fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2291        let mut reader = unsafe {
2292            StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2293        };
2294        reader.next().unwrap()
2295    }
2296
2297    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
2298        let buf = write_stream(rb);
2299        read_stream(&buf).unwrap()
2300    }
2301
2302    #[test]
2303    fn test_roundtrip_with_custom_metadata() {
2304        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
2305        let mut buf = Vec::new();
2306        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2307        let mut test_metadata = HashMap::new();
2308        test_metadata.insert("abc".to_string(), "abc".to_string());
2309        test_metadata.insert("def".to_string(), "def".to_string());
2310        for (k, v) in &test_metadata {
2311            writer.write_metadata(k, v);
2312        }
2313        writer.finish().unwrap();
2314        drop(writer);
2315
2316        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2317        assert_eq!(reader.custom_metadata(), &test_metadata);
2318    }
2319
2320    #[test]
2321    fn test_roundtrip_nested_dict() {
2322        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2323
2324        let array = Arc::new(inner) as ArrayRef;
2325
2326        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2327
2328        let s = StructArray::from(vec![(dctfield, array)]);
2329        let struct_array = Arc::new(s) as ArrayRef;
2330
2331        let schema = Arc::new(Schema::new(vec![Field::new(
2332            "struct",
2333            struct_array.data_type().clone(),
2334            false,
2335        )]));
2336
2337        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2338
2339        assert_eq!(batch, roundtrip_ipc(&batch));
2340    }
2341
2342    #[test]
2343    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
2344        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2345
2346        let array = Arc::new(inner) as ArrayRef;
2347
2348        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2349
2350        let s = StructArray::from(vec![(dctfield, array)]);
2351        let struct_array = Arc::new(s) as ArrayRef;
2352
2353        let schema = Arc::new(Schema::new(vec![Field::new(
2354            "struct",
2355            struct_array.data_type().clone(),
2356            false,
2357        )]));
2358
2359        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2360
2361        let mut buf = Vec::new();
2362        let mut writer = crate::writer::FileWriter::try_new_with_options(
2363            &mut buf,
2364            batch.schema_ref(),
2365            IpcWriteOptions::default(),
2366        )
2367        .unwrap();
2368        writer.write(&batch).unwrap();
2369        writer.finish().unwrap();
2370        drop(writer);
2371
2372        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2373
2374        assert_eq!(batch, reader.next().unwrap().unwrap());
2375    }
2376
2377    fn check_union_with_builder(mut builder: UnionBuilder) {
2378        builder.append::<Int32Type>("a", 1).unwrap();
2379        builder.append_null::<Int32Type>("a").unwrap();
2380        builder.append::<Float64Type>("c", 3.0).unwrap();
2381        builder.append::<Int32Type>("a", 4).unwrap();
2382        builder.append::<Int64Type>("d", 11).unwrap();
2383        let union = builder.build().unwrap();
2384
2385        let schema = Arc::new(Schema::new(vec![Field::new(
2386            "union",
2387            union.data_type().clone(),
2388            false,
2389        )]));
2390
2391        let union_array = Arc::new(union) as ArrayRef;
2392
2393        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
2394        let rb2 = roundtrip_ipc(&rb);
2395        // TODO: equality not yet implemented for union, so we check that the length of the array is
2396        // the same and that all of the buffers are the same instead.
2397        assert_eq!(rb.schema(), rb2.schema());
2398        assert_eq!(rb.num_columns(), rb2.num_columns());
2399        assert_eq!(rb.num_rows(), rb2.num_rows());
2400        let union1 = rb.column(0);
2401        let union2 = rb2.column(0);
2402
2403        assert_eq!(union1, union2);
2404    }
2405
2406    #[test]
2407    fn test_roundtrip_dense_union() {
2408        check_union_with_builder(UnionBuilder::new_dense());
2409    }
2410
2411    #[test]
2412    fn test_roundtrip_sparse_union() {
2413        check_union_with_builder(UnionBuilder::new_sparse());
2414    }
2415
2416    #[test]
2417    fn test_roundtrip_struct_empty_fields() {
2418        let nulls = NullBuffer::from(&[true, true, false]);
2419        let rb = RecordBatch::try_from_iter([(
2420            "",
2421            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
2422        )])
2423        .unwrap();
2424        let rb2 = roundtrip_ipc(&rb);
2425        assert_eq!(rb, rb2);
2426    }
2427
2428    #[test]
2429    fn test_roundtrip_stream_run_array_sliced() {
2430        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
2431            .into_iter()
2432            .collect();
2433        let run_array_1_sliced = run_array_1.slice(2, 5);
2434
2435        let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2), Some(2)];
2436        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
2437        run_array_2_builder.extend(run_array_2_inupt);
2438        let run_array_2 = run_array_2_builder.finish();
2439
2440        let schema = Arc::new(Schema::new(vec![
2441            Field::new(
2442                "run_array_1_sliced",
2443                run_array_1_sliced.data_type().clone(),
2444                false,
2445            ),
2446            Field::new("run_array_2", run_array_2.data_type().clone(), false),
2447        ]));
2448        let input_batch = RecordBatch::try_new(
2449            schema,
2450            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
2451        )
2452        .unwrap();
2453        let output_batch = roundtrip_ipc_stream(&input_batch);
2454
2455        // As partial comparison not yet supported for run arrays, the sliced run array
2456        // has to be unsliced before comparing with the output. the second run array
2457        // can be compared as such.
2458        assert_eq!(input_batch.column(1), output_batch.column(1));
2459
2460        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
2461        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
2462    }
2463
2464    #[test]
2465    fn test_roundtrip_stream_nested_dict() {
2466        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
2467        let dict = Arc::new(
2468            xs.clone()
2469                .into_iter()
2470                .collect::<DictionaryArray<Int8Type>>(),
2471        );
2472        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
2473        let struct_array = StructArray::from(vec![
2474            (
2475                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
2476                string_array,
2477            ),
2478            (
2479                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
2480                dict.clone() as ArrayRef,
2481            ),
2482        ]);
2483        let schema = Arc::new(Schema::new(vec![
2484            Field::new("f1_string", DataType::Utf8, false),
2485            Field::new("f2_struct", struct_array.data_type().clone(), false),
2486        ]));
2487        let input_batch = RecordBatch::try_new(
2488            schema,
2489            vec![
2490                Arc::new(StringArray::from(xs.clone())),
2491                Arc::new(struct_array),
2492            ],
2493        )
2494        .unwrap();
2495        let output_batch = roundtrip_ipc_stream(&input_batch);
2496        assert_eq!(input_batch, output_batch);
2497    }
2498
2499    #[test]
2500    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
2501        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
2502        let values = Arc::new(values) as ArrayRef;
2503        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
2504        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());
2505
2506        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
2507        let key_dict_array = DictionaryArray::new(key_dict_keys, values);
2508
2509        #[allow(deprecated)]
2510        let keys_field = Arc::new(Field::new_dict(
2511            "keys",
2512            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2513            true, // It is technically not legal for this field to be null.
2514            1,
2515            false,
2516        ));
2517        #[allow(deprecated)]
2518        let values_field = Arc::new(Field::new_dict(
2519            "values",
2520            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2521            true,
2522            2,
2523            false,
2524        ));
2525        let entry_struct = StructArray::from(vec![
2526            (keys_field, make_array(key_dict_array.into_data())),
2527            (values_field, make_array(value_dict_array.into_data())),
2528        ]);
2529        let map_data_type = DataType::Map(
2530            Arc::new(Field::new(
2531                "entries",
2532                entry_struct.data_type().clone(),
2533                false,
2534            )),
2535            false,
2536        );
2537
2538        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
2539        let map_data = ArrayData::builder(map_data_type)
2540            .len(3)
2541            .add_buffer(entry_offsets)
2542            .add_child_data(entry_struct.into_data())
2543            .build()
2544            .unwrap();
2545        let map_array = MapArray::from(map_data);
2546
2547        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
2548        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2549
2550        let schema = Arc::new(Schema::new(vec![Field::new(
2551            "f1",
2552            dict_dict_array.data_type().clone(),
2553            false,
2554        )]));
2555        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2556        let output_batch = roundtrip_ipc_stream(&input_batch);
2557        assert_eq!(input_batch, output_batch);
2558    }
2559
2560    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
2561        OffsetSize: OffsetSizeTrait,
2562        U: ArrowNativeType,
2563    >(
2564        list_data_type: DataType,
2565        offsets: &[U; 5],
2566    ) {
2567        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2568        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2569        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2570        let dict_data = dict_array.to_data();
2571
2572        let value_offsets = Buffer::from_slice_ref(offsets);
2573
2574        let list_data = ArrayData::builder(list_data_type)
2575            .len(4)
2576            .add_buffer(value_offsets)
2577            .add_child_data(dict_data)
2578            .build()
2579            .unwrap();
2580        let list_array = GenericListArray::<OffsetSize>::from(list_data);
2581
2582        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
2583        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2584
2585        let schema = Arc::new(Schema::new(vec![Field::new(
2586            "f1",
2587            dict_dict_array.data_type().clone(),
2588            false,
2589        )]));
2590        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2591        let output_batch = roundtrip_ipc_stream(&input_batch);
2592        assert_eq!(input_batch, output_batch);
2593    }
2594
2595    #[test]
2596    fn test_roundtrip_stream_dict_of_list_of_dict() {
2597        // list
2598        #[allow(deprecated)]
2599        let list_data_type = DataType::List(Arc::new(Field::new_dict(
2600            "item",
2601            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2602            true,
2603            1,
2604            false,
2605        )));
2606        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
2607        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
2608
2609        // large list
2610        #[allow(deprecated)]
2611        let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
2612            "item",
2613            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2614            true,
2615            1,
2616            false,
2617        )));
2618        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
2619        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
2620    }
2621
2622    #[test]
2623    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
2624        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2625        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
2626        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2627        let dict_data = dict_array.into_data();
2628
2629        #[allow(deprecated)]
2630        let list_data_type = DataType::FixedSizeList(
2631            Arc::new(Field::new_dict(
2632                "item",
2633                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2634                true,
2635                1,
2636                false,
2637            )),
2638            3,
2639        );
2640        let list_data = ArrayData::builder(list_data_type)
2641            .len(3)
2642            .add_child_data(dict_data)
2643            .build()
2644            .unwrap();
2645        let list_array = FixedSizeListArray::from(list_data);
2646
2647        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2648        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2649
2650        let schema = Arc::new(Schema::new(vec![Field::new(
2651            "f1",
2652            dict_dict_array.data_type().clone(),
2653            false,
2654        )]));
2655        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2656        let output_batch = roundtrip_ipc_stream(&input_batch);
2657        assert_eq!(input_batch, output_batch);
2658    }
2659
2660    const LONG_TEST_STRING: &str =
2661        "This is a long string to make sure binary view array handles it";
2662
2663    #[test]
2664    fn test_roundtrip_view_types() {
2665        let schema = Schema::new(vec![
2666            Field::new("field_1", DataType::BinaryView, true),
2667            Field::new("field_2", DataType::Utf8, true),
2668            Field::new("field_3", DataType::Utf8View, true),
2669        ]);
2670        let bin_values: Vec<Option<&[u8]>> = vec![
2671            Some(b"foo"),
2672            None,
2673            Some(b"bar"),
2674            Some(LONG_TEST_STRING.as_bytes()),
2675        ];
2676        let utf8_values: Vec<Option<&str>> =
2677            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2678        let bin_view_array = BinaryViewArray::from_iter(bin_values);
2679        let utf8_array = StringArray::from_iter(utf8_values.iter());
2680        let utf8_view_array = StringViewArray::from_iter(utf8_values);
2681        let record_batch = RecordBatch::try_new(
2682            Arc::new(schema.clone()),
2683            vec![
2684                Arc::new(bin_view_array),
2685                Arc::new(utf8_array),
2686                Arc::new(utf8_view_array),
2687            ],
2688        )
2689        .unwrap();
2690
2691        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2692        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2693
2694        let sliced_batch = record_batch.slice(1, 2);
2695        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2696        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2697    }
2698
2699    #[test]
2700    fn test_roundtrip_view_types_nested_dict() {
2701        let bin_values: Vec<Option<&[u8]>> = vec![
2702            Some(b"foo"),
2703            None,
2704            Some(b"bar"),
2705            Some(LONG_TEST_STRING.as_bytes()),
2706            Some(b"field"),
2707        ];
2708        let utf8_values: Vec<Option<&str>> = vec![
2709            Some("foo"),
2710            None,
2711            Some("bar"),
2712            Some(LONG_TEST_STRING),
2713            Some("field"),
2714        ];
2715        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
2716        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));
2717
2718        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2719        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
2720        #[allow(deprecated)]
2721        let keys_field = Arc::new(Field::new_dict(
2722            "keys",
2723            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
2724            true,
2725            1,
2726            false,
2727        ));
2728
2729        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
2730        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
2731        #[allow(deprecated)]
2732        let values_field = Arc::new(Field::new_dict(
2733            "values",
2734            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
2735            true,
2736            2,
2737            false,
2738        ));
2739        let entry_struct = StructArray::from(vec![
2740            (keys_field, make_array(key_dict_array.into_data())),
2741            (values_field, make_array(value_dict_array.into_data())),
2742        ]);
2743
2744        let map_data_type = DataType::Map(
2745            Arc::new(Field::new(
2746                "entries",
2747                entry_struct.data_type().clone(),
2748                false,
2749            )),
2750            false,
2751        );
2752        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
2753        let map_data = ArrayData::builder(map_data_type)
2754            .len(3)
2755            .add_buffer(entry_offsets)
2756            .add_child_data(entry_struct.into_data())
2757            .build()
2758            .unwrap();
2759        let map_array = MapArray::from(map_data);
2760
2761        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2762        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2763        let schema = Arc::new(Schema::new(vec![Field::new(
2764            "f1",
2765            dict_dict_array.data_type().clone(),
2766            false,
2767        )]));
2768        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2769        assert_eq!(batch, roundtrip_ipc(&batch));
2770        assert_eq!(batch, roundtrip_ipc_stream(&batch));
2771
2772        let sliced_batch = batch.slice(1, 2);
2773        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2774        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2775    }
2776
2777    #[test]
2778    fn test_no_columns_batch() {
2779        let schema = Arc::new(Schema::empty());
2780        let options = RecordBatchOptions::new()
2781            .with_match_field_names(true)
2782            .with_row_count(Some(10));
2783        let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2784        let output_batch = roundtrip_ipc_stream(&input_batch);
2785        assert_eq!(input_batch, output_batch);
2786    }
2787
2788    #[test]
2789    fn test_unaligned() {
2790        let batch = RecordBatch::try_from_iter(vec![(
2791            "i32",
2792            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2793        )])
2794        .unwrap();
2795
2796        let r#gen = IpcDataGenerator {};
2797        let mut dict_tracker = DictionaryTracker::new(false);
2798        let (_, encoded) = r#gen
2799            .encode(
2800                &batch,
2801                &mut dict_tracker,
2802                &Default::default(),
2803                &mut Default::default(),
2804            )
2805            .unwrap();
2806
2807        let message = root_as_message(&encoded.ipc_message).unwrap();
2808
2809        // Construct an unaligned buffer
2810        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2811        buffer.push(0_u8);
2812        buffer.extend_from_slice(&encoded.arrow_data);
2813        let b = Buffer::from(buffer).slice(1);
2814        assert_ne!(b.as_ptr().align_offset(8), 0);
2815
2816        let ipc_batch = message.header_as_record_batch().unwrap();
2817        let roundtrip = RecordBatchDecoder::try_new(
2818            &b,
2819            ipc_batch,
2820            batch.schema(),
2821            &Default::default(),
2822            &message.version(),
2823        )
2824        .unwrap()
2825        .with_require_alignment(false)
2826        .read_record_batch()
2827        .unwrap();
2828        assert_eq!(batch, roundtrip);
2829    }
2830
2831    #[test]
2832    fn test_unaligned_throws_error_with_require_alignment() {
2833        let batch = RecordBatch::try_from_iter(vec![(
2834            "i32",
2835            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2836        )])
2837        .unwrap();
2838
2839        let r#gen = IpcDataGenerator {};
2840        let mut dict_tracker = DictionaryTracker::new(false);
2841        let (_, encoded) = r#gen
2842            .encode(
2843                &batch,
2844                &mut dict_tracker,
2845                &Default::default(),
2846                &mut Default::default(),
2847            )
2848            .unwrap();
2849
2850        let message = root_as_message(&encoded.ipc_message).unwrap();
2851
2852        // Construct an unaligned buffer
2853        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2854        buffer.push(0_u8);
2855        buffer.extend_from_slice(&encoded.arrow_data);
2856        let b = Buffer::from(buffer).slice(1);
2857        assert_ne!(b.as_ptr().align_offset(8), 0);
2858
2859        let ipc_batch = message.header_as_record_batch().unwrap();
2860        let result = RecordBatchDecoder::try_new(
2861            &b,
2862            ipc_batch,
2863            batch.schema(),
2864            &Default::default(),
2865            &message.version(),
2866        )
2867        .unwrap()
2868        .with_require_alignment(true)
2869        .read_record_batch();
2870
2871        let error = result.unwrap_err();
2872        assert_eq!(
2873            error.to_string(),
2874            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
2875             offset from expected alignment of 4 by 1"
2876        );
2877    }
2878
2879    #[test]
2880    fn test_file_with_massive_column_count() {
2881        // 499_999 is upper limit for default settings (1_000_000)
2882        let limit = 600_000;
2883
2884        let fields = (0..limit)
2885            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
2886            .collect::<Vec<_>>();
2887        let schema = Arc::new(Schema::new(fields));
2888        let batch = RecordBatch::new_empty(schema);
2889
2890        let mut buf = Vec::new();
2891        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
2892        writer.write(&batch).unwrap();
2893        writer.finish().unwrap();
2894        drop(writer);
2895
2896        let mut reader = FileReaderBuilder::new()
2897            .with_max_footer_fb_tables(1_500_000)
2898            .build(std::io::Cursor::new(buf))
2899            .unwrap();
2900        let roundtrip_batch = reader.next().unwrap().unwrap();
2901
2902        assert_eq!(batch, roundtrip_batch);
2903    }
2904
2905    #[test]
2906    fn test_file_with_deeply_nested_columns() {
2907        // 60 is upper limit for default settings (64)
2908        let limit = 61;
2909
2910        let fields = (0..limit).fold(
2911            vec![Field::new("leaf", DataType::Boolean, false)],
2912            |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
2913        );
2914        let schema = Arc::new(Schema::new(fields));
2915        let batch = RecordBatch::new_empty(schema);
2916
2917        let mut buf = Vec::new();
2918        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
2919        writer.write(&batch).unwrap();
2920        writer.finish().unwrap();
2921        drop(writer);
2922
2923        let mut reader = FileReaderBuilder::new()
2924            .with_max_footer_fb_depth(65)
2925            .build(std::io::Cursor::new(buf))
2926            .unwrap();
2927        let roundtrip_batch = reader.next().unwrap().unwrap();
2928
2929        assert_eq!(batch, roundtrip_batch);
2930    }
2931
2932    #[test]
2933    fn test_invalid_struct_array_ipc_read_errors() {
2934        let a_field = Field::new("a", DataType::Int32, false);
2935        let b_field = Field::new("b", DataType::Int32, false);
2936        let struct_fields = Fields::from(vec![a_field.clone(), b_field.clone()]);
2937
2938        let a_array_data = ArrayData::builder(a_field.data_type().clone())
2939            .len(4)
2940            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
2941            .build()
2942            .unwrap();
2943        let b_array_data = ArrayData::builder(b_field.data_type().clone())
2944            .len(3)
2945            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
2946            .build()
2947            .unwrap();
2948
2949        let invalid_struct_arr = unsafe {
2950            StructArray::new_unchecked(
2951                struct_fields,
2952                vec![make_array(a_array_data), make_array(b_array_data)],
2953                None,
2954            )
2955        };
2956
2957        expect_ipc_validation_error(
2958            Arc::new(invalid_struct_arr),
2959            "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
2960        );
2961    }
2962
2963    #[test]
2964    fn test_invalid_nested_array_ipc_read_errors() {
2965        // one of the nested arrays has invalid data
2966        let a_field = Field::new("a", DataType::Int32, false);
2967        let b_field = Field::new("b", DataType::Utf8, false);
2968
2969        let schema = Arc::new(Schema::new(vec![Field::new_struct(
2970            "s",
2971            vec![a_field.clone(), b_field.clone()],
2972            false,
2973        )]));
2974
2975        let a_array_data = ArrayData::builder(a_field.data_type().clone())
2976            .len(4)
2977            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
2978            .build()
2979            .unwrap();
2980        // invalid nested child array -- length is correct, but has invalid utf8 data
2981        let b_array_data = {
2982            let valid: &[u8] = b"   ";
2983            let mut invalid = vec![];
2984            invalid.extend_from_slice(b"ValidString");
2985            invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
2986            let binary_array =
2987                BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
2988            let array = unsafe {
2989                StringArray::new_unchecked(
2990                    binary_array.offsets().clone(),
2991                    binary_array.values().clone(),
2992                    binary_array.nulls().cloned(),
2993                )
2994            };
2995            array.into_data()
2996        };
2997        let struct_data_type = schema.field(0).data_type();
2998
2999        let invalid_struct_arr = unsafe {
3000            make_array(
3001                ArrayData::builder(struct_data_type.clone())
3002                    .len(4)
3003                    .add_child_data(a_array_data)
3004                    .add_child_data(b_array_data)
3005                    .build_unchecked(),
3006            )
3007        };
3008        expect_ipc_validation_error(
3009            invalid_struct_arr,
3010            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
3011        );
3012    }
3013
3014    #[test]
3015    fn test_same_dict_id_without_preserve() {
3016        let batch = RecordBatch::try_new(
3017            Arc::new(Schema::new(
3018                ["a", "b"]
3019                    .iter()
3020                    .map(|name| {
3021                        #[allow(deprecated)]
3022                        Field::new_dict(
3023                            name.to_string(),
3024                            DataType::Dictionary(
3025                                Box::new(DataType::Int32),
3026                                Box::new(DataType::Utf8),
3027                            ),
3028                            true,
3029                            0,
3030                            false,
3031                        )
3032                    })
3033                    .collect::<Vec<Field>>(),
3034            )),
3035            vec![
3036                Arc::new(
3037                    vec![Some("c"), Some("d")]
3038                        .into_iter()
3039                        .collect::<DictionaryArray<Int32Type>>(),
3040                ) as ArrayRef,
3041                Arc::new(
3042                    vec![Some("e"), Some("f")]
3043                        .into_iter()
3044                        .collect::<DictionaryArray<Int32Type>>(),
3045                ) as ArrayRef,
3046            ],
3047        )
3048        .expect("Failed to create RecordBatch");
3049
3050        // serialize the record batch as an IPC stream
3051        let mut buf = vec![];
3052        {
3053            let mut writer = crate::writer::StreamWriter::try_new_with_options(
3054                &mut buf,
3055                batch.schema().as_ref(),
3056                crate::writer::IpcWriteOptions::default(),
3057            )
3058            .expect("Failed to create StreamWriter");
3059            writer.write(&batch).expect("Failed to write RecordBatch");
3060            writer.finish().expect("Failed to finish StreamWriter");
3061        }
3062
3063        StreamReader::try_new(std::io::Cursor::new(buf), None)
3064            .expect("Failed to create StreamReader")
3065            .for_each(|decoded_batch| {
3066                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
3067            });
3068    }
3069
3070    #[test]
3071    fn test_validation_of_invalid_list_array() {
3072        // ListArray with invalid offsets
3073        let array = unsafe {
3074            let values = Int32Array::from(vec![1, 2, 3]);
3075            let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]); // offsets can't go backwards
3076            let offsets = OffsetBuffer::new_unchecked(bad_offsets); // INVALID array created
3077            let field = Field::new_list_field(DataType::Int32, true);
3078            let nulls = None;
3079            ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
3080        };
3081
3082        expect_ipc_validation_error(
3083            Arc::new(array),
3084            "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
3085        );
3086    }
3087
3088    #[test]
3089    fn test_validation_of_invalid_string_array() {
3090        let valid: &[u8] = b"   ";
3091        let mut invalid = vec![];
3092        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3093        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3094        let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3095        // data is not valid utf8 we can not construct a correct StringArray
3096        // safely, so purposely create an invalid StringArray
3097        let array = unsafe {
3098            StringArray::new_unchecked(
3099                binary_array.offsets().clone(),
3100                binary_array.values().clone(),
3101                binary_array.nulls().cloned(),
3102            )
3103        };
3104        expect_ipc_validation_error(
3105            Arc::new(array),
3106            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
3107        );
3108    }
3109
/// A `StringViewArray` whose last value is not valid UTF-8 must be rejected
/// when it is read back over IPC with validation enabled.
#[test]
fn test_validation_of_invalid_string_view_array() {
    let valid: &[u8] = b"   ";
    // 38 ASCII bytes followed by the invalid sequence, so the first bad byte
    // sits at index 38 of the value.
    // NOTE(review): the literal is presumably longer than 12 bytes so the
    // value is not stored inline in the view -- confirm.
    let invalid = [
        b"ThisStringIsCertainlyLongerThan12Bytes".as_slice(),
        INVALID_UTF8_FIRST_CHAR,
    ]
    .concat();

    // Build the bytes as a (valid) BinaryViewArray first, then reuse its parts.
    let binary_view_array =
        BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(invalid.as_slice())]);
    let views = binary_view_array.views().clone();
    let data_buffers = binary_view_array.data_buffers().to_vec();
    let nulls = binary_view_array.nulls().cloned();

    // The data is not valid UTF-8, so a StringViewArray cannot be constructed
    // through the checked API; purposely create an invalid one.
    let array = unsafe { StringViewArray::new_unchecked(views, data_buffers, nulls) };

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
    );
}
3132
/// A `DictionaryArray` with a key that points past the end of its values
/// (key 200, only three values) must be rejected when it is read back over
/// IPC with validation enabled.
#[test]
fn test_validation_of_invalid_dictionary_array() {
    let values: ArrayRef = Arc::new(StringArray::from_iter_values(["a", "b", "c"]));
    // The second key (200) is not a valid index into the three values.
    let keys = Int32Array::from(vec![1, 200]);

    // The checked constructor would refuse these keys, so purposely create an
    // invalid dictionary through the unchecked one.
    let array = unsafe { DictionaryArray::new_unchecked(keys, values) };

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
    );
}
3148
/// A `UnionArray` whose type id buffer mentions an id that no field declares
/// must be rejected when it is read back over IPC with validation enabled.
#[test]
fn test_validation_of_invalid_union_array() {
    // Only type ids 1 and 3 are declared; type id 2 is not valid.
    let type_id_values = vec![1, 3];
    let union_fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ];
    let fields = UnionFields::try_new(type_id_values, union_fields).unwrap();

    // The middle entry (2) does not match any declared field type id.
    let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]);

    let int_child: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let str_child: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")]));
    let children = vec![int_child, str_child];

    // No offsets buffer is supplied. The unchecked constructor is used
    // because the type ids are deliberately invalid.
    let array = unsafe { UnionArray::new_unchecked(fields, type_ids, None, children) };

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Type Ids values must match one of the field type ids",
    );
}
3175
/// Bytes that are not valid UTF-8 starting from the very first byte
/// (`0xa0` is not a legal leading byte), followed by two ASCII spaces.
///
/// See <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, b' ', b' '];
3179
3180    /// Expect an error when reading the record batch using IPC or IPC Streams
3181    fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
3182        let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();
3183
3184        // IPC Stream format
3185        let buf = write_stream(&rb); // write is ok
3186        read_stream_skip_validation(&buf).unwrap();
3187        let err = read_stream(&buf).unwrap_err();
3188        assert_eq!(err.to_string(), expected_err);
3189
3190        // IPC File format
3191        let buf = write_ipc(&rb); // write is ok
3192        read_ipc_skip_validation(&buf).unwrap();
3193        let err = read_ipc(&buf).unwrap_err();
3194        assert_eq!(err.to_string(), expected_err);
3195
3196        // IPC Format with FileDecoder
3197        read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
3198        let err = read_ipc_with_decoder(buf).unwrap_err();
3199        assert_eq!(err.to_string(), expected_err);
3200    }
3201
/// A schema with two dictionary-encoded columns survives being encoded as an
/// IPC message, written out, parsed back and converted to a `Schema` again.
#[test]
fn test_roundtrip_schema() {
    // Both columns share the same dictionary type.
    let dict_type = DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![
        Field::new("a", dict_type.clone(), false),
        Field::new("b", dict_type, false),
    ]);

    let options = IpcWriteOptions::default();
    let mut dict_tracker = DictionaryTracker::new(false);
    let encoded_data = IpcDataGenerator::default().schema_to_bytes_with_dictionary_tracker(
        &schema,
        &mut dict_tracker,
        &options,
    );

    let mut schema_bytes = Vec::new();
    write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");

    // Skip the 4-byte continuation marker when the writer emitted one.
    let begin_offset: usize = match schema_bytes[..4] == CONTINUATION_MARKER {
        true => 4,
        false => 0,
    };

    // The flatbuffers size-prefixed entry point is expected to reject these
    // bytes, whereas `parse_message` is expected to accept the full buffer.
    size_prefixed_root_as_message(&schema_bytes[begin_offset..])
        .expect_err("size_prefixed_root_as_message");

    let message = parse_message(&schema_bytes).expect("parse_message");
    let new_schema = fb_to_schema(message.header_as_schema().expect("header_as_schema"));

    assert_eq!(schema, new_schema);
}
3240
/// A stream that announces a metadata length of -1 right after the
/// continuation marker must make `StreamReader::try_new` return an error.
#[test]
fn test_negative_meta_len() {
    let negative_len = (-1i32).to_le_bytes();

    // Continuation marker followed by the little-endian encoded length.
    let mut buf: Vec<u8> = Vec::new();
    buf.extend_from_slice(&CONTINUATION_MARKER);
    buf.extend_from_slice(&negative_len);

    assert!(StreamReader::try_new(Cursor::new(buf), None).is_err());
}
3251}