1mod stream;
28pub use stream::*;
29
30use arrow_select::concat;
31
32use flatbuffers::{VectorIter, VerifierOptions};
33use std::collections::{HashMap, VecDeque};
34use std::fmt;
35use std::io::{BufReader, Read, Seek, SeekFrom};
36use std::sync::Arc;
37
38use arrow_array::*;
39use arrow_buffer::{
40 ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
41};
42use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
43use arrow_schema::*;
44
45use crate::compression::{CompressionCodec, DecompressionContext};
46use crate::r#gen::Message::{self};
47use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
48use DataType::*;
49
50fn read_buffer(
60 buf: &crate::Buffer,
61 a_data: &Buffer,
62 compression_codec: Option<CompressionCodec>,
63 decompression_context: &mut DecompressionContext,
64) -> Result<Buffer, ArrowError> {
65 let start_offset = buf.offset() as usize;
66 let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
67 match (buf_data.is_empty(), compression_codec) {
69 (true, _) | (_, None) => Ok(buf_data),
70 (false, Some(decompressor)) => {
71 decompressor.decompress_to_buffer(&buf_data, decompression_context)
72 }
73 }
74}
75impl RecordBatchDecoder<'_> {
    /// Decodes the next column as an array of `field`'s data type, consuming
    /// the field nodes and buffers that belong to it (recursively for nested
    /// types).
    ///
    /// `variadic_counts` supplies, in field order, the number of data buffers
    /// for each view-typed (`Utf8View`/`BinaryView`) column.
    fn create_array(
        &mut self,
        field: &Field,
        variadic_counts: &mut VecDeque<i64>,
    ) -> Result<ArrayRef, ArrowError> {
        let data_type = field.data_type();
        match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                let field_node = self.next_node(field)?;
                // Validity, offsets, and values buffers.
                let buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            BinaryView | Utf8View => {
                let count = variadic_counts
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {data_type} column"
                    )))?;
                // The variadic count covers only the data buffers; add two
                // more for the validity and views buffers.
                let count = count + 2;
                let buffers = (0..count)
                    .map(|_| self.next_buffer())
                    .collect::<Result<Vec<_>, _>>()?;
                let field_node = self.next_node(field)?;
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            FixedSizeBinary(_) => {
                let field_node = self.next_node(field)?;
                // Validity and values buffers.
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                let list_node = self.next_node(field)?;
                // Validity and offsets buffers, then the child values.
                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            ListView(list_field) | LargeListView(list_field) => {
                let list_node = self.next_node(field)?;
                // Validity, offsets, and sizes buffers, then the child values.
                let list_buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_view_array(list_node, data_type, &list_buffers, values)
            }
            FixedSizeList(list_field, _) => {
                let list_node = self.next_node(field)?;
                // Only a validity buffer: fixed-size lists have no offsets.
                let list_buffers = [self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            Struct(struct_fields) => {
                let struct_node = self.next_node(field)?;
                let null_buffer = self.next_buffer()?;

                // Decode each child in declaration order; this consumes the
                // children's nodes and buffers.
                let mut struct_arrays = vec![];
                for struct_field in struct_fields {
                    let child = self.create_array(struct_field, variadic_counts)?;
                    struct_arrays.push(child);
                }
                self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
            }
            RunEndEncoded(run_ends_field, values_field) => {
                let run_node = self.next_node(field)?;
                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
                let values = self.create_array(values_field, variadic_counts)?;

                // Run-end-encoded arrays have no buffers of their own; both
                // children are attached as child data.
                let run_array_length = run_node.length() as usize;
                let builder = ArrayData::builder(data_type.clone())
                    .len(run_array_length)
                    .offset(0)
                    .add_child_data(run_ends.into_data())
                    .add_child_data(values.into_data())
                    .null_count(run_node.null_count() as usize);

                self.create_array_from_builder(builder)
            }
            Dictionary(_, _) => {
                let index_node = self.next_node(field)?;
                // Validity and keys buffers.
                let index_buffers = [self.next_buffer()?, self.next_buffer()?];

                #[allow(deprecated)]
                let dict_id = field.dict_id().ok_or_else(|| {
                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
                })?;

                // Fall back to an empty values array when no dictionary batch
                // with this id has been read yet.
                let value_array = match self.dictionaries_by_id.get(&dict_id) {
                    Some(array) => array.clone(),
                    None => {
                        if let Dictionary(_, value_type) = data_type {
                            arrow_array::new_empty_array(value_type.as_ref())
                        } else {
                            unreachable!()
                        }
                    }
                };

                self.create_dictionary_array(index_node, data_type, &index_buffers, value_array)
            }
            Union(fields, mode) => {
                let union_node = self.next_node(field)?;
                let len = union_node.length() as usize;

                // Versions before V5 encode an extra leading buffer here; it
                // is consumed and ignored.
                if self.version < MetadataVersion::V5 {
                    self.next_buffer()?;
                }

                let type_ids: ScalarBuffer<i8> =
                    self.next_buffer()?.slice_with_length(0, len).into();

                // Dense unions carry a 4-byte offset per slot; sparse unions
                // have no offsets buffer.
                let value_offsets = match mode {
                    UnionMode::Dense => {
                        let offsets: ScalarBuffer<i32> =
                            self.next_buffer()?.slice_with_length(0, len * 4).into();
                        Some(offsets)
                    }
                    UnionMode::Sparse => None,
                };

                let mut children = Vec::with_capacity(fields.len());

                for (_id, field) in fields.iter() {
                    let child = self.create_array(field, variadic_counts)?;
                    children.push(child);
                }

                let array = if self.skip_validation.get() {
                    // SAFETY: the caller opted out of validation via the
                    // decoder's `skip_validation` flag.
                    unsafe {
                        UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
                    }
                } else {
                    UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
                };
                Ok(Arc::new(array))
            }
            Null => {
                let node = self.next_node(field)?;
                let length = node.length();
                let null_count = node.null_count();

                // Every slot of a NullArray is null by definition.
                if length != null_count {
                    return Err(ArrowError::SchemaError(format!(
                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                    )));
                }

                let builder = ArrayData::builder(data_type.clone())
                    .len(length as usize)
                    .offset(0);
                self.create_array_from_builder(builder)
            }
            // All remaining types are fixed-width primitives: validity and
            // values buffers.
            _ => {
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
        }
    }
261
262 fn create_primitive_array(
265 &self,
266 field_node: &FieldNode,
267 data_type: &DataType,
268 buffers: &[Buffer],
269 ) -> Result<ArrayRef, ArrowError> {
270 let length = field_node.length() as usize;
271 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
272 let mut builder = match data_type {
273 Utf8 | Binary | LargeBinary | LargeUtf8 => {
274 ArrayData::builder(data_type.clone())
276 .len(length)
277 .buffers(buffers[1..3].to_vec())
278 .null_bit_buffer(null_buffer)
279 }
280 BinaryView | Utf8View => ArrayData::builder(data_type.clone())
281 .len(length)
282 .buffers(buffers[1..].to_vec())
283 .null_bit_buffer(null_buffer),
284 _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
285 ArrayData::builder(data_type.clone())
287 .len(length)
288 .add_buffer(buffers[1].clone())
289 .null_bit_buffer(null_buffer)
290 }
291 t => unreachable!("Data type {:?} either unsupported or not primitive", t),
292 };
293
294 builder = builder.null_count(field_node.null_count() as usize);
295
296 self.create_array_from_builder(builder)
297 }
298
299 fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
301 let mut builder = builder.align_buffers(!self.require_alignment);
302 if self.skip_validation.get() {
303 unsafe { builder = builder.skip_validation(true) }
305 };
306 Ok(make_array(builder.build()?))
307 }
308
309 fn create_list_array(
312 &self,
313 field_node: &FieldNode,
314 data_type: &DataType,
315 buffers: &[Buffer],
316 child_array: ArrayRef,
317 ) -> Result<ArrayRef, ArrowError> {
318 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
319 let length = field_node.length() as usize;
320 let child_data = child_array.into_data();
321 let mut builder = match data_type {
322 List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
323 .len(length)
324 .add_buffer(buffers[1].clone())
325 .add_child_data(child_data)
326 .null_bit_buffer(null_buffer),
327
328 FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
329 .len(length)
330 .add_child_data(child_data)
331 .null_bit_buffer(null_buffer),
332
333 _ => unreachable!("Cannot create list or map array from {:?}", data_type),
334 };
335
336 builder = builder.null_count(field_node.null_count() as usize);
337
338 self.create_array_from_builder(builder)
339 }
340
341 fn create_list_view_array(
342 &self,
343 field_node: &FieldNode,
344 data_type: &DataType,
345 buffers: &[Buffer],
346 child_array: ArrayRef,
347 ) -> Result<ArrayRef, ArrowError> {
348 assert!(matches!(data_type, ListView(_) | LargeListView(_)));
349
350 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
351 let length = field_node.length() as usize;
352 let child_data = child_array.into_data();
353
354 self.create_array_from_builder(
355 ArrayData::builder(data_type.clone())
356 .len(length)
357 .add_buffer(buffers[1].clone()) .add_buffer(buffers[2].clone()) .add_child_data(child_data)
360 .null_bit_buffer(null_buffer)
361 .null_count(field_node.null_count() as usize),
362 )
363 }
364
    /// Assembles a [`StructArray`] from an already-read validity buffer and
    /// decoded child arrays, validating the declared null count unless
    /// validation is disabled.
    fn create_struct_array(
        &self,
        struct_node: &FieldNode,
        null_buffer: Buffer,
        struct_fields: &Fields,
        struct_arrays: Vec<ArrayRef>,
    ) -> Result<ArrayRef, ArrowError> {
        let null_count = struct_node.null_count() as usize;
        let len = struct_node.length() as usize;
        let skip_validation = self.skip_validation.get();

        let nulls = if null_count > 0 {
            let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
            let null_buffer = if skip_validation {
                // SAFETY: the caller opted out of validation; the null count
                // declared in the metadata is trusted as-is.
                unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
            } else {
                // Recompute the null count from the bitmap and check it
                // against the metadata's declared value.
                let null_buffer = NullBuffer::new(validity_buffer);

                if null_buffer.null_count() != null_count {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "null_count value ({}) doesn't match actual number of nulls in array ({})",
                        null_count,
                        null_buffer.null_count()
                    )));
                }

                null_buffer
            };

            Some(null_buffer)
        } else {
            None
        };
        // A struct with no child fields still has a well-defined length.
        if struct_arrays.is_empty() {
            return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
        }

        let struct_array = if skip_validation {
            // SAFETY: the caller opted out of validation.
            unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
        } else {
            StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
        };

        Ok(Arc::new(struct_array))
    }
414
415 fn create_dictionary_array(
418 &self,
419 field_node: &FieldNode,
420 data_type: &DataType,
421 buffers: &[Buffer],
422 value_array: ArrayRef,
423 ) -> Result<ArrayRef, ArrowError> {
424 if let Dictionary(_, _) = *data_type {
425 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
426 let builder = ArrayData::builder(data_type.clone())
427 .len(field_node.length() as usize)
428 .add_buffer(buffers[1].clone())
429 .add_child_data(value_array.into_data())
430 .null_bit_buffer(null_buffer)
431 .null_count(field_node.null_count() as usize);
432 self.create_array_from_builder(builder)
433 } else {
434 unreachable!("Cannot create dictionary array from {:?}", data_type)
435 }
436 }
437}
438
/// State for decoding a single IPC record batch message into a
/// [`RecordBatch`].
pub struct RecordBatchDecoder<'a> {
    /// The flatbuffers record batch metadata being decoded.
    batch: crate::RecordBatch<'a>,
    /// The output schema.
    schema: SchemaRef,
    /// Previously-read dictionaries, keyed by dictionary id.
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Compression codec declared by the batch, if any.
    compression: Option<CompressionCodec>,
    /// Reusable scratch state for decompression.
    decompression_context: DecompressionContext,
    /// IPC metadata version of the message.
    version: MetadataVersion,
    /// The message body containing the flattened array buffers.
    data: &'a Buffer,
    /// Iterator over field nodes, consumed in depth-first field order.
    nodes: VectorIter<'a, FieldNode>,
    /// Iterator over buffer descriptors, consumed alongside `nodes`.
    buffers: VectorIter<'a, crate::Buffer>,
    /// Optional projection: indices of the top-level columns to decode.
    projection: Option<&'a [usize]>,
    /// If true, buffers are not realigned by the decoder before building.
    require_alignment: bool,
    /// Unsafe opt-out of validation of the decoded data.
    skip_validation: UnsafeFlag,
}
474
475impl<'a> RecordBatchDecoder<'a> {
476 fn try_new(
478 buf: &'a Buffer,
479 batch: crate::RecordBatch<'a>,
480 schema: SchemaRef,
481 dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
482 metadata: &'a MetadataVersion,
483 ) -> Result<Self, ArrowError> {
484 let buffers = batch.buffers().ok_or_else(|| {
485 ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
486 })?;
487 let field_nodes = batch.nodes().ok_or_else(|| {
488 ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
489 })?;
490
491 let batch_compression = batch.compression();
492 let compression = batch_compression
493 .map(|batch_compression| batch_compression.codec().try_into())
494 .transpose()?;
495
496 Ok(Self {
497 batch,
498 schema,
499 dictionaries_by_id,
500 compression,
501 decompression_context: DecompressionContext::new(),
502 version: *metadata,
503 data: buf,
504 nodes: field_nodes.iter(),
505 buffers: buffers.iter(),
506 projection: None,
507 require_alignment: false,
508 skip_validation: UnsafeFlag::new(),
509 })
510 }
511
512 pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
517 self.projection = projection;
518 self
519 }
520
521 pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
527 self.require_alignment = require_alignment;
528 self
529 }
530
531 pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
543 self.skip_validation = skip_validation;
544 self
545 }
546
547 fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
549 let mut variadic_counts: VecDeque<i64> = self
550 .batch
551 .variadicBufferCounts()
552 .into_iter()
553 .flatten()
554 .collect();
555
556 let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
557
558 let schema = Arc::clone(&self.schema);
559 if let Some(projection) = self.projection {
560 let mut arrays = vec![];
561 for (idx, field) in schema.fields().iter().enumerate() {
563 if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
565 let child = self.create_array(field, &mut variadic_counts)?;
566 arrays.push((proj_idx, child));
567 } else {
568 self.skip_field(field, &mut variadic_counts)?;
569 }
570 }
571
572 arrays.sort_by_key(|t| t.0);
573
574 let schema = Arc::new(schema.project(projection)?);
575 let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();
576
577 if self.skip_validation.get() {
578 unsafe {
580 Ok(RecordBatch::new_unchecked(
581 schema,
582 columns,
583 self.batch.length() as usize,
584 ))
585 }
586 } else {
587 assert!(variadic_counts.is_empty());
588 RecordBatch::try_new_with_options(schema, columns, &options)
589 }
590 } else {
591 let mut children = vec![];
592 for field in schema.fields() {
594 let child = self.create_array(field, &mut variadic_counts)?;
595 children.push(child);
596 }
597
598 if self.skip_validation.get() {
599 unsafe {
601 Ok(RecordBatch::new_unchecked(
602 schema,
603 children,
604 self.batch.length() as usize,
605 ))
606 }
607 } else {
608 assert!(variadic_counts.is_empty());
609 RecordBatch::try_new_with_options(schema, children, &options)
610 }
611 }
612 }
613
614 fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
615 let buffer = self.buffers.next().ok_or_else(|| {
616 ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
617 })?;
618 read_buffer(
619 buffer,
620 self.data,
621 self.compression,
622 &mut self.decompression_context,
623 )
624 }
625
    /// Discards the next buffer descriptor without reading its data.
    ///
    /// NOTE(review): panics if the metadata declares fewer buffers than the
    /// schema requires — presumably treated as an upstream invariant since
    /// `next_buffer` handles the same case with an error; confirm.
    fn skip_buffer(&mut self) {
        self.buffers.next().unwrap();
    }
629
630 fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
631 self.nodes.next().ok_or_else(|| {
632 ArrowError::SchemaError(format!(
633 "Invalid data for schema. {field} refers to node not found in schema",
634 ))
635 })
636 }
637
    /// Advances past the nodes and buffers of a column excluded by a
    /// projection, without materializing any data.
    ///
    /// Must consume exactly the same nodes, buffers, and variadic counts
    /// that `create_array` would for the same field, so that subsequent
    /// columns stay aligned.
    fn skip_field(
        &mut self,
        field: &Field,
        variadic_count: &mut VecDeque<i64>,
    ) -> Result<(), ArrowError> {
        self.next_node(field)?;

        match field.data_type() {
            // Validity, offsets, and values buffers.
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                for _ in 0..3 {
                    self.skip_buffer()
                }
            }
            Utf8View | BinaryView => {
                let count = variadic_count
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {} column",
                        field.data_type()
                    )))?;
                // The variadic count covers only the data buffers; add two
                // more for the validity and views buffers.
                let count = count + 2;
                for _i in 0..count {
                    self.skip_buffer()
                }
            }
            // Validity and values buffers.
            FixedSizeBinary(_) => {
                self.skip_buffer();
                self.skip_buffer();
            }
            // Validity and offsets buffers, then the child field.
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                self.skip_buffer();
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            // Validity, offsets, and sizes buffers, then the child field.
            ListView(list_field) | LargeListView(list_field) => {
                self.skip_buffer();
                self.skip_buffer();
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            // Validity buffer only, then the child field.
            FixedSizeList(list_field, _) => {
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            Struct(struct_fields) => {
                // Validity buffer, then every child field in order.
                self.skip_buffer();

                for struct_field in struct_fields {
                    self.skip_field(struct_field, variadic_count)?
                }
            }
            // No buffers of its own; both children follow.
            RunEndEncoded(run_ends_field, values_field) => {
                self.skip_field(run_ends_field, variadic_count)?;
                self.skip_field(values_field, variadic_count)?;
            }
            // Validity and keys buffers.
            Dictionary(_, _) => {
                self.skip_buffer();
                self.skip_buffer();
            }
            Union(fields, mode) => {
                // Versions before V5 encode an extra leading buffer.
                if self.version < MetadataVersion::V5 {
                    self.skip_buffer();
                }
                // Type-ids buffer.
                self.skip_buffer();
                match mode {
                    // Dense unions additionally carry an offsets buffer.
                    UnionMode::Dense => self.skip_buffer(),
                    UnionMode::Sparse => {}
                };

                for (_, field) in fields.iter() {
                    self.skip_field(field, variadic_count)?
                }
            }
            // Null arrays have no buffers at all.
            Null => {}
            // Fixed-width primitives: validity and values buffers.
            Boolean
            | Int8
            | Int16
            | Int32
            | Int64
            | UInt8
            | UInt16
            | UInt32
            | UInt64
            | Float16
            | Float32
            | Float64
            | Timestamp(_, _)
            | Date32
            | Date64
            | Time32(_)
            | Time64(_)
            | Duration(_)
            | Interval(_)
            | Decimal32(_, _)
            | Decimal64(_, _)
            | Decimal128(_, _)
            | Decimal256(_, _) => {
                self.skip_buffer();
                self.skip_buffer();
            }
        };
        Ok(())
    }
746}
747
748pub fn read_record_batch(
759 buf: &Buffer,
760 batch: crate::RecordBatch,
761 schema: SchemaRef,
762 dictionaries_by_id: &HashMap<i64, ArrayRef>,
763 projection: Option<&[usize]>,
764 metadata: &MetadataVersion,
765) -> Result<RecordBatch, ArrowError> {
766 RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
767 .with_projection(projection)
768 .with_require_alignment(false)
769 .read_record_batch()
770}
771
772pub fn read_dictionary(
775 buf: &Buffer,
776 batch: crate::DictionaryBatch,
777 schema: &Schema,
778 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
779 metadata: &MetadataVersion,
780) -> Result<(), ArrowError> {
781 read_dictionary_impl(
782 buf,
783 batch,
784 schema,
785 dictionaries_by_id,
786 metadata,
787 false,
788 UnsafeFlag::new(),
789 )
790}
791
792fn read_dictionary_impl(
793 buf: &Buffer,
794 batch: crate::DictionaryBatch,
795 schema: &Schema,
796 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
797 metadata: &MetadataVersion,
798 require_alignment: bool,
799 skip_validation: UnsafeFlag,
800) -> Result<(), ArrowError> {
801 let id = batch.id();
802
803 let dictionary_values = get_dictionary_values(
804 buf,
805 batch,
806 schema,
807 dictionaries_by_id,
808 metadata,
809 require_alignment,
810 skip_validation,
811 )?;
812
813 update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;
814
815 Ok(())
816}
817
818fn update_dictionaries(
827 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
828 is_delta: bool,
829 dict_id: i64,
830 dict_values: ArrayRef,
831) -> Result<(), ArrowError> {
832 if !is_delta {
833 dictionaries_by_id.insert(dict_id, dict_values.clone());
837 return Ok(());
838 }
839
840 let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
841 ArrowError::InvalidArgumentError(format!(
842 "No existing dictionary for delta dictionary with id '{dict_id}'"
843 ))
844 })?;
845
846 let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
847 ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
848 })?;
849
850 dictionaries_by_id.insert(dict_id, combined);
851
852 Ok(())
853}
854
855fn get_dictionary_values(
859 buf: &Buffer,
860 batch: crate::DictionaryBatch,
861 schema: &Schema,
862 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
863 metadata: &MetadataVersion,
864 require_alignment: bool,
865 skip_validation: UnsafeFlag,
866) -> Result<ArrayRef, ArrowError> {
867 let id = batch.id();
868 #[allow(deprecated)]
869 let fields_using_this_dictionary = schema.fields_with_dict_id(id);
870 let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
871 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
872 })?;
873
874 let dictionary_values: ArrayRef = match first_field.data_type() {
878 DataType::Dictionary(_, value_type) => {
879 let value = value_type.as_ref().clone();
881 let schema = Schema::new(vec![Field::new("", value, true)]);
882 let record_batch = RecordBatchDecoder::try_new(
884 buf,
885 batch.data().unwrap(),
886 Arc::new(schema),
887 dictionaries_by_id,
888 metadata,
889 )?
890 .with_require_alignment(require_alignment)
891 .with_skip_validation(skip_validation)
892 .read_record_batch()?;
893
894 Some(record_batch.column(0).clone())
895 }
896 _ => None,
897 }
898 .ok_or_else(|| {
899 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
900 })?;
901
902 Ok(dictionary_values)
903}
904
905fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
907 reader.seek(SeekFrom::Start(block.offset() as u64))?;
908 let body_len = block.bodyLength().to_usize().unwrap();
909 let metadata_len = block.metaDataLength().to_usize().unwrap();
910 let total_len = body_len.checked_add(metadata_len).unwrap();
911
912 let mut buf = MutableBuffer::from_len_zeroed(total_len);
913 reader.read_exact(&mut buf)?;
914 Ok(buf.into())
915}
916
917fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
921 let buf = match buf[..4] == CONTINUATION_MARKER {
922 true => &buf[8..],
923 false => &buf[4..],
924 };
925 crate::root_as_message(buf)
926 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
927}
928
929pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
933 if buf[4..] != super::ARROW_MAGIC {
934 return Err(ArrowError::ParseError(
935 "Arrow file does not contain correct footer".to_string(),
936 ));
937 }
938
939 let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
941 footer_len
942 .try_into()
943 .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
944}
945
/// Decodes [`RecordBatch`]es from the blocks of an Arrow IPC file, given the
/// file's schema and metadata version.
#[derive(Debug)]
pub struct FileDecoder {
    /// The file's schema.
    schema: SchemaRef,
    /// Dictionaries read so far, keyed by dictionary id.
    dictionaries: HashMap<i64, ArrayRef>,
    /// IPC metadata version expected for each message.
    version: MetadataVersion,
    /// Optional indices of the top-level columns to decode.
    projection: Option<Vec<usize>>,
    /// If true, buffers are not realigned by the decoder before building.
    require_alignment: bool,
    /// Unsafe opt-out of validation of decoded data.
    skip_validation: UnsafeFlag,
}
1019
1020impl FileDecoder {
1021 pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
1023 Self {
1024 schema,
1025 version,
1026 dictionaries: Default::default(),
1027 projection: None,
1028 require_alignment: false,
1029 skip_validation: UnsafeFlag::new(),
1030 }
1031 }
1032
1033 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1035 self.projection = Some(projection);
1036 self
1037 }
1038
1039 pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
1052 self.require_alignment = require_alignment;
1053 self
1054 }
1055
    /// Disables (or re-enables) validation of decoded array data.
    ///
    /// # Safety
    ///
    /// When `skip_validation` is true, the caller must guarantee the IPC
    /// data being decoded is valid; decoding invalid data without validation
    /// is undefined behavior.
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        // SAFETY: forwards the caller's guarantee to the inner flag.
        unsafe { self.skip_validation.set(skip_validation) };
        self
    }
1070
1071 fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
1072 let message = parse_message(buf)?;
1073
1074 if self.version != MetadataVersion::V1 && message.version() != self.version {
1076 return Err(ArrowError::IpcError(
1077 "Could not read IPC message as metadata versions mismatch".to_string(),
1078 ));
1079 }
1080 Ok(message)
1081 }
1082
    /// Reads the dictionary batch stored in `block`/`buf` and registers its
    /// values for use by subsequently-read record batches.
    ///
    /// `buf` contains the message metadata followed by the message body; the
    /// body starts `block.metaDataLength()` bytes in.
    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().unwrap();
                read_dictionary_impl(
                    // Skip past the metadata to the message body.
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    &self.schema,
                    &mut self.dictionaries,
                    &message.version(),
                    self.require_alignment,
                    self.skip_validation.clone(),
                )
            }
            t => Err(ArrowError::ParseError(format!(
                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
            ))),
        }
    }
1104
    /// Decodes the record batch stored in `block`/`buf`.
    ///
    /// Returns `Ok(None)` for an empty (`NONE`) message header; schema and
    /// other message types are rejected.
    pub fn read_record_batch(
        &self,
        block: &Block,
        buf: &Buffer,
    ) -> Result<Option<RecordBatch>, ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            crate::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;
                RecordBatchDecoder::try_new(
                    // The message body begins after the metadata.
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    self.schema.clone(),
                    &self.dictionaries,
                    &message.version(),
                )?
                .with_projection(self.projection.as_deref())
                .with_require_alignment(self.require_alignment)
                .with_skip_validation(self.skip_validation.clone())
                .read_record_batch()
                .map(Some)
            }
            crate::MessageHeader::NONE => Ok(None),
            t => Err(ArrowError::InvalidArgumentError(format!(
                "Reading types other than record batches not yet supported, unable to read {t:?}"
            ))),
        }
    }
1140}
1141
/// Builder for [`FileReader`], allowing configuration of a projection and of
/// the flatbuffers verification limits used when parsing the file footer.
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection: indices of the top-level columns to read.
    projection: Option<Vec<usize>>,
    /// Flatbuffers verifier table-count limit for footer parsing.
    max_footer_fb_tables: usize,
    /// Flatbuffers verifier recursion-depth limit for footer parsing.
    max_footer_fb_depth: usize,
}
1152
1153impl Default for FileReaderBuilder {
1154 fn default() -> Self {
1155 let verifier_options = VerifierOptions::default();
1156 Self {
1157 max_footer_fb_tables: verifier_options.max_tables,
1158 max_footer_fb_depth: verifier_options.max_depth,
1159 projection: None,
1160 }
1161 }
1162}
1163
1164impl FileReaderBuilder {
    /// Creates a builder with default verification limits and no projection.
    pub fn new() -> Self {
        Self::default()
    }
1171
1172 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1174 self.projection = Some(projection);
1175 self
1176 }
1177
1178 pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
1191 self.max_footer_fb_tables = max_footer_fb_tables;
1192 self
1193 }
1194
1195 pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
1208 self.max_footer_fb_depth = max_footer_fb_depth;
1209 self
1210 }
1211
    /// Opens `reader` as an Arrow IPC file: verifies the footer, parses the
    /// schema and custom metadata, and eagerly loads all dictionaries.
    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
        // The file ends with a 4-byte footer length and the 6-byte magic.
        let mut buffer = [0; 10];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut buffer)?;

        let footer_len = read_footer_length(buffer)?;

        // The footer flatbuffer immediately precedes the trailing 10 bytes.
        let mut footer_data = vec![0; footer_len];
        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
        reader.read_exact(&mut footer_data)?;

        // Verify the footer with the configured flatbuffers limits.
        let verifier_options = VerifierOptions {
            max_tables: self.max_footer_fb_tables,
            max_depth: self.max_footer_fb_depth,
            ..Default::default()
        };
        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
        )?;

        let blocks = footer.recordBatches().ok_or_else(|| {
            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
        })?;

        let total_blocks = blocks.len();

        // Cross-endian files are not supported.
        let ipc_schema = footer.schema().unwrap();
        if !ipc_schema.endianness().equals_to_target_endianness() {
            return Err(ArrowError::IpcError(
                "the endianness of the source system does not match the endianness of the target system.".to_owned()
            ));
        }

        let schema = crate::convert::fb_to_schema(ipc_schema);

        let mut custom_metadata = HashMap::new();
        if let Some(fb_custom_metadata) = footer.custom_metadata() {
            for kv in fb_custom_metadata.into_iter() {
                custom_metadata.insert(
                    kv.key().unwrap().to_string(),
                    kv.value().unwrap().to_string(),
                );
            }
        }

        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
        if let Some(projection) = self.projection {
            decoder = decoder.with_projection(projection)
        }

        // Load every dictionary up front so any batch can reference them.
        if let Some(dictionaries) = footer.dictionaries() {
            for block in dictionaries {
                let buf = read_block(&mut reader, block)?;
                decoder.read_dictionary(block, &buf)?;
            }
        }

        Ok(FileReader {
            reader,
            blocks: blocks.iter().copied().collect(),
            current_block: 0,
            total_blocks,
            decoder,
            custom_metadata,
        })
    }
1282}
1283
/// Reads record batches from an Arrow IPC file; yields them via its
/// [`Iterator`] implementation.
pub struct FileReader<R> {
    /// The underlying (seekable) reader.
    reader: R,

    /// Decoder holding the schema, dictionaries, and decode settings.
    decoder: FileDecoder,

    /// Location of each record-batch block, from the file footer.
    blocks: Vec<Block>,

    /// Index of the next block to read.
    current_block: usize,

    /// Total number of record-batch blocks in the file.
    total_blocks: usize,

    /// User-defined metadata from the file footer.
    custom_metadata: HashMap<String, String>,
}
1349
1350impl<R> fmt::Debug for FileReader<R> {
1351 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1352 f.debug_struct("FileReader<R>")
1353 .field("decoder", &self.decoder)
1354 .field("blocks", &self.blocks)
1355 .field("current_block", &self.current_block)
1356 .field("total_blocks", &self.total_blocks)
1357 .finish_non_exhaustive()
1358 }
1359}
1360
1361impl<R: Read + Seek> FileReader<BufReader<R>> {
1362 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1366 Self::try_new(BufReader::new(reader), projection)
1367 }
1368}
1369
1370impl<R: Read + Seek> FileReader<R> {
1371 pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1382 let builder = FileReaderBuilder {
1383 projection,
1384 ..Default::default()
1385 };
1386 builder.build(reader)
1387 }
1388
    /// Returns the user-defined metadata stored in the file footer.
    pub fn custom_metadata(&self) -> &HashMap<String, String> {
        &self.custom_metadata
    }
1393
    /// Returns the total number of record batches in the file.
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }
1398
1399 pub fn schema(&self) -> SchemaRef {
1401 self.decoder.schema.clone()
1402 }
1403
1404 pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1408 if index >= self.total_blocks {
1409 Err(ArrowError::InvalidArgumentError(format!(
1410 "Cannot set batch to index {} from {} total batches",
1411 index, self.total_blocks
1412 )))
1413 } else {
1414 self.current_block = index;
1415 Ok(())
1416 }
1417 }
1418
    /// Reads the record batch at `current_block` and advances the cursor.
    ///
    /// Callers must ensure `current_block < total_blocks`; the block lookup
    /// panics otherwise.
    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let block = &self.blocks[self.current_block];
        self.current_block += 1;

        // Read the whole block (metadata + body) and decode it.
        let buffer = read_block(&mut self.reader, block)?;
        self.decoder.read_record_batch(block, &buffer)
    }
1427
    /// Returns a shared reference to the underlying reader.
    pub fn get_ref(&self) -> &R {
        &self.reader
    }
1434
    /// Returns a mutable reference to the underlying reader.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.reader
    }
1441
    /// Disables validation of decoded data for all subsequently-read batches.
    ///
    /// # Safety
    ///
    /// The caller must guarantee the file contains valid Arrow IPC data;
    /// reading invalid data without validation is undefined behavior.
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        // SAFETY: forwards the caller's guarantee to the decoder.
        self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
        self
    }
1451}
1452
1453impl<R: Read + Seek> Iterator for FileReader<R> {
1454 type Item = Result<RecordBatch, ArrowError>;
1455
1456 fn next(&mut self) -> Option<Self::Item> {
1457 if self.current_block < self.total_blocks {
1459 self.maybe_next().transpose()
1460 } else {
1461 None
1462 }
1463 }
1464}
1465
1466impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
1467 fn schema(&self) -> SchemaRef {
1468 self.schema()
1469 }
1470}
1471
/// Arrow Stream reader: reads the IPC streaming format from `R`.
pub struct StreamReader<R> {
    /// Length-prefixed message reader over the underlying byte stream.
    reader: MessageReader<R>,

    /// The schema read from the stream's first (schema) message.
    schema: SchemaRef,

    /// Dictionaries decoded so far, keyed by dictionary id; updated as
    /// dictionary batches arrive in the stream.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// Set once end-of-stream is observed; later reads return `None` cheaply.
    finished: bool,

    /// Optional column projection: the requested indices plus the projected schema.
    projection: Option<(Vec<usize>, Schema)>,

    /// Whether to skip array-data validation (set only via the unsafe
    /// `with_skip_validation` API).
    skip_validation: UnsafeFlag,
}
1530
1531impl<R> fmt::Debug for StreamReader<R> {
1532 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1533 f.debug_struct("StreamReader<R>")
1534 .field("reader", &"R")
1535 .field("schema", &self.schema)
1536 .field("dictionaries_by_id", &self.dictionaries_by_id)
1537 .field("finished", &self.finished)
1538 .field("projection", &self.projection)
1539 .finish()
1540 }
1541}
1542
1543impl<R: Read> StreamReader<BufReader<R>> {
1544 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1548 Self::try_new(BufReader::new(reader), projection)
1549 }
1550}
1551
impl<R: Read> StreamReader<R> {
    /// Try to create a new stream reader.
    ///
    /// Reads the first message from `reader` — which must be a `Schema`
    /// message — and records the optional column `projection` together with
    /// the projected schema.
    ///
    /// # Errors
    /// Returns an error if the stream is empty, the first message is not a
    /// schema, or a projection index is out of bounds.
    pub fn try_new(
        reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<StreamReader<R>, ArrowError> {
        let mut msg_reader = MessageReader::new(reader);
        let message = msg_reader.maybe_next()?;
        let Some((message, _)) = message else {
            return Err(ArrowError::IpcError(
                "Expected schema message, found empty stream.".to_string(),
            ));
        };

        if message.header_type() != Message::MessageHeader::Schema {
            return Err(ArrowError::IpcError(format!(
                "Expected a schema as the first message in the stream, got: {:?}",
                message.header_type()
            )));
        }

        let schema = message.header_as_schema().ok_or_else(|| {
            ArrowError::ParseError("Failed to parse schema from message header".to_string())
        })?;
        let schema = crate::convert::fb_to_schema(schema);

        // Dictionaries are populated lazily as dictionary batches arrive.
        let dictionaries_by_id = HashMap::new();

        // Project the schema eagerly; the indices are reused per batch.
        let projection = match projection {
            Some(projection_indices) => {
                let schema = schema.project(&projection_indices)?;
                Some((projection_indices, schema))
            }
            _ => None,
        };

        Ok(Self {
            reader: msg_reader,
            schema: Arc::new(schema),
            finished: false,
            dictionaries_by_id,
            projection,
            skip_validation: UnsafeFlag::new(),
        })
    }

    /// Deprecated alias for [`StreamReader::try_new`].
    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
    pub fn try_new_unbuffered(
        reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<Self, ArrowError> {
        Self::try_new(reader, projection)
    }

    /// Return the schema of the stream.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Check if the stream is finished.
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    /// Read messages until the next record batch, skipping (but applying)
    /// any intervening dictionary batches.
    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        if self.finished {
            return Ok(None);
        }

        loop {
            let message = self.next_ipc_message()?;
            let Some(message) = message else {
                // End of stream: remember it so subsequent calls are cheap.
                self.finished = true;
                return Ok(None);
            };

            match message {
                IpcMessage::Schema(_) => {
                    return Err(ArrowError::IpcError(
                        "Expected a record batch, but found a schema".to_string(),
                    ));
                }
                IpcMessage::RecordBatch(record_batch) => {
                    return Ok(Some(record_batch));
                }
                IpcMessage::DictionaryBatch { .. } => {
                    // Dictionary state was already updated inside
                    // `next_ipc_message`; keep reading for a record batch.
                    continue;
                }
            };
        }
    }

    /// Read and decode the next IPC message.
    ///
    /// For dictionary batches this also updates `dictionaries_by_id` as a
    /// side effect before returning the decoded values.
    pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
        let message = self.reader.maybe_next()?;
        let Some((message, body)) = message else {
            return Ok(None);
        };

        let ipc_message = match message.header_type() {
            Message::MessageHeader::Schema => {
                let schema = message.header_as_schema().ok_or_else(|| {
                    ArrowError::ParseError("Failed to parse schema from message header".to_string())
                })?;
                let arrow_schema = crate::convert::fb_to_schema(schema);
                IpcMessage::Schema(arrow_schema)
            }
            Message::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;

                let version = message.version();
                let schema = self.schema.clone();
                // Decode against the full schema, then project if requested.
                let record_batch = RecordBatchDecoder::try_new(
                    &body.into(),
                    batch,
                    schema,
                    &self.dictionaries_by_id,
                    &version,
                )?
                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
                .with_require_alignment(false)
                .with_skip_validation(self.skip_validation.clone())
                .read_record_batch()?;
                IpcMessage::RecordBatch(record_batch)
            }
            Message::MessageHeader::DictionaryBatch => {
                let dict = message.header_as_dictionary_batch().ok_or_else(|| {
                    ArrowError::ParseError(
                        "Failed to parse dictionary batch from message header".to_string(),
                    )
                })?;

                let version = message.version();
                let dict_values = get_dictionary_values(
                    &body.into(),
                    dict,
                    &self.schema,
                    &mut self.dictionaries_by_id,
                    &version,
                    false,
                    self.skip_validation.clone(),
                )?;

                // Merge (delta) or replace the stored dictionary for this id.
                update_dictionaries(
                    &mut self.dictionaries_by_id,
                    dict.isDelta(),
                    dict.id(),
                    dict_values.clone(),
                )?;

                IpcMessage::DictionaryBatch {
                    id: dict.id(),
                    is_delta: (dict.isDelta()),
                    values: (dict_values),
                }
            }
            x => {
                return Err(ArrowError::ParseError(format!(
                    "Unsupported message header type in IPC stream: '{x:?}'"
                )));
            }
        };

        Ok(Some(ipc_message))
    }

    /// Borrow the underlying reader.
    pub fn get_ref(&self) -> &R {
        self.reader.inner()
    }

    /// Mutably borrow the underlying reader.
    ///
    /// Reading from it directly may desynchronize the stream position.
    pub fn get_mut(&mut self) -> &mut R {
        self.reader.inner_mut()
    }

    /// Specifies whether or not array data validation is skipped when decoding.
    ///
    /// # Safety
    /// Skipping validation is only sound when the input is trusted, valid
    /// Arrow IPC data; otherwise behavior is undefined.
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        unsafe { self.skip_validation.set(skip_validation) };
        self
    }
}
1765
1766impl<R: Read> Iterator for StreamReader<R> {
1767 type Item = Result<RecordBatch, ArrowError>;
1768
1769 fn next(&mut self) -> Option<Self::Item> {
1770 self.maybe_next().transpose()
1771 }
1772}
1773
1774impl<R: Read> RecordBatchReader for StreamReader<R> {
1775 fn schema(&self) -> SchemaRef {
1776 self.schema.clone()
1777 }
1778}
1779
/// A single decoded message from an Arrow IPC stream.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum IpcMessage {
    /// A schema message (normally the stream prefix).
    Schema(arrow_schema::Schema),
    /// A decoded record batch.
    RecordBatch(RecordBatch),
    /// A decoded dictionary batch.
    DictionaryBatch {
        /// The dictionary id this batch applies to.
        id: i64,
        /// When `true`, `values` extend (rather than replace) the stored dictionary.
        is_delta: bool,
        /// The decoded dictionary values.
        values: ArrayRef,
    },
}
1796
/// Reads length-prefixed IPC messages from `R`, reusing `buf` for the
/// flatbuffer metadata of each message.
struct MessageReader<R> {
    reader: R,
    // Scratch buffer holding the current message's metadata bytes.
    buf: Vec<u8>,
}
1803
1804impl<R: Read> MessageReader<R> {
1805 fn new(reader: R) -> Self {
1806 Self {
1807 reader,
1808 buf: Vec::new(),
1809 }
1810 }
1811
1812 fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1823 let meta_len = self.read_meta_len()?;
1824 let Some(meta_len) = meta_len else {
1825 return Ok(None);
1826 };
1827
1828 self.buf.resize(meta_len, 0);
1829 self.reader.read_exact(&mut self.buf)?;
1830
1831 let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1832 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1833 })?;
1834
1835 let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1836 self.reader.read_exact(&mut buf)?;
1837
1838 Ok(Some((message, buf)))
1839 }
1840
1841 fn inner_mut(&mut self) -> &mut R {
1843 &mut self.reader
1844 }
1845
1846 fn inner(&self) -> &R {
1848 &self.reader
1849 }
1850
1851 pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1860 let mut meta_len: [u8; 4] = [0; 4];
1861 match self.reader.read_exact(&mut meta_len) {
1862 Ok(_) => {}
1863 Err(e) => {
1864 return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1865 Ok(None)
1869 } else {
1870 Err(ArrowError::from(e))
1871 };
1872 }
1873 };
1874
1875 let meta_len = {
1876 if meta_len == CONTINUATION_MARKER {
1879 self.reader.read_exact(&mut meta_len)?;
1880 }
1881
1882 i32::from_le_bytes(meta_len)
1883 };
1884
1885 if meta_len == 0 {
1886 return Ok(None);
1887 }
1888
1889 let meta_len = usize::try_from(meta_len)
1890 .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1891
1892 Ok(Some(meta_len))
1893 }
1894}
1895
1896#[cfg(test)]
1897mod tests {
1898 use std::io::Cursor;
1899
1900 use crate::convert::fb_to_schema;
1901 use crate::writer::{
1902 DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1903 };
1904
1905 use super::*;
1906
1907 use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1908 use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1909 use arrow_array::types::*;
1910 use arrow_buffer::{NullBuffer, OffsetBuffer};
1911 use arrow_data::ArrayDataBuilder;
1912
    // Builds a 14-column schema (f0..f13) mixing primitive, list, union,
    // struct, run-end-encoded and dictionary types for the projection tests.
    fn create_test_projection_schema() -> Schema {
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));

        let fixed_size_list_data_type =
            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);

        let union_fields = UnionFields::from_fields(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Float64, false),
        ]);

        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);

        let struct_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
        ]);
        let struct_data_type = DataType::Struct(struct_fields);

        let run_encoded_data_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int16, false)),
            Arc::new(Field::new("values", DataType::Int32, true)),
        );

        Schema::new(vec![
            Field::new("f0", DataType::UInt32, false),
            Field::new("f1", DataType::Utf8, false),
            Field::new("f2", DataType::Boolean, false),
            Field::new("f3", union_data_type, true),
            Field::new("f4", DataType::Null, true),
            Field::new("f5", DataType::Float64, true),
            Field::new("f6", list_data_type, false),
            Field::new("f7", DataType::FixedSizeBinary(3), true),
            Field::new("f8", fixed_size_list_data_type, false),
            Field::new("f9", struct_data_type, false),
            Field::new("f10", run_encoded_data_type, false),
            Field::new("f11", DataType::Boolean, false),
            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
            Field::new("f13", DataType::Utf8, false),
        ])
    }
1956
    // Builds a 3-row batch matching `create_test_projection_schema`,
    // one array per field f0..f13.
    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
        let array0 = UInt32Array::from(vec![1, 2, 3]);
        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
        let array2 = BooleanArray::from(vec![true, false, true]);

        let mut union_builder = UnionBuilder::new_dense();
        union_builder.append::<Int32Type>("a", 1).unwrap();
        union_builder.append::<Float64Type>("b", 10.1).unwrap();
        union_builder.append_null::<Float64Type>("b").unwrap();
        let array3 = union_builder.build().unwrap();

        let array4 = NullArray::new(3);
        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
        let array6_values = vec![
            Some(vec![Some(10), Some(10), Some(10)]),
            Some(vec![Some(20), Some(20), Some(20)]),
            Some(vec![Some(30), Some(30)]),
        ];
        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();

        // FixedSizeList built from raw ArrayData: 9 child values, 3 lists of 3.
        let array8_values = ArrayData::builder(DataType::Int32)
            .len(9)
            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
            .build()
            .unwrap();
        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
            .len(3)
            .add_child_data(array8_values)
            .build()
            .unwrap();
        let array8 = FixedSizeListArray::from(array8_data);

        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
        let array9_list: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
                Some(vec![Some(-10)]),
                Some(vec![Some(-20), Some(-20), Some(-20)]),
                Some(vec![Some(-30)]),
            ]));
        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
            .add_child_data(array9_id.into_data())
            .add_child_data(array9_list.into_data())
            .len(3)
            .build()
            .unwrap();
        let array9 = StructArray::from(array9);

        let array10_input = vec![Some(1_i32), None, None];
        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        array10_builder.extend(array10_input);
        let array10 = array10_builder.finish();

        let array11 = BooleanArray::from(vec![false, false, true]);

        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));

        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);

        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array0),
                Arc::new(array1),
                Arc::new(array2),
                Arc::new(array3),
                Arc::new(array4),
                Arc::new(array5),
                Arc::new(array6),
                Arc::new(array7),
                Arc::new(array8),
                Arc::new(array9),
                Arc::new(array10),
                Arc::new(array11),
                Arc::new(array12),
                Arc::new(array13),
            ],
        )
        .unwrap()
    }
2042
    #[test]
    fn test_negative_meta_len_start_stream() {
        // A continuation marker followed by a negative metadata length must
        // surface as a parse error rather than a panic or huge allocation.
        let bytes = i32::to_le_bytes(-1);
        let mut buf = vec![];
        buf.extend(CONTINUATION_MARKER);
        buf.extend(bytes);

        let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
        assert!(reader_err.is_some());
        assert_eq!(
            reader_err.unwrap().to_string(),
            "Parser error: Invalid metadata length: -1"
        );
    }

    #[test]
    fn test_negative_meta_len_mid_stream() {
        // Same corruption, but appearing after one valid batch: the first
        // read succeeds, the second must fail with the same parse error.
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let mut buf = Vec::new();
        {
            let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
            let batch =
                RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
                    .unwrap();
            writer.write(&batch).unwrap();
        }

        let bytes = i32::to_le_bytes(-1);
        buf.extend(CONTINUATION_MARKER);
        buf.extend(bytes);

        let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
        assert!(reader.maybe_next().is_ok());
        let batch_err = reader.maybe_next().err();
        assert!(batch_err.is_some());
        assert_eq!(
            batch_err.unwrap().to_string(),
            "Parser error: Invalid metadata length: -1"
        );
    }
2085
    #[test]
    fn test_missing_buffer_metadata_error() {
        // Hand-builds a record batch message declaring only one buffer for a
        // nullable Int32 column; decoding must report the mismatch.
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));

        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
        let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 2,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        let data_buffer = Buffer::from(vec![0u8; 8]);
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder = RecordBatchDecoder::try_new(
            &data_buffer,
            batch,
            schema.clone(),
            &dictionaries,
            &metadata,
        )
        .unwrap();

        let result = decoder.read_record_batch();

        match result {
            Err(ArrowError::IpcError(msg)) => {
                assert_eq!(msg, "Buffer count mismatched with metadata");
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
2134
    #[test]
    fn test_read_legacy_empty_list_without_offsets_buffer() {
        // Legacy writers may emit empty list columns whose offsets buffer has
        // zero length; reading such a batch must still succeed.
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "items",
            Field::new_list_field(DataType::Int32, true),
            true,
        )]));

        let mut fbb = FlatBufferBuilder::new();
        // One node for the list, one for its Int32 child.
        let nodes = fbb.create_vector(&[
            FieldNode::new(0, 0), FieldNode::new(0, 0), ]);
        // All four buffers (validity/offsets for the list, validity/values
        // for the child) are declared empty.
        let buffers = fbb.create_vector(&[
            crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), ]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 0,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        let body = Buffer::from(Vec::<u8>::new());
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder =
            RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
                .unwrap();

        let read_batch = decoder.read_record_batch().unwrap();
        assert_eq!(read_batch.num_rows(), 0);

        let list = read_batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(list.len(), 0);
        assert_eq!(list.values().len(), 0);
    }

    #[test]
    fn test_read_legacy_empty_utf8_and_binary_without_offsets_buffer() {
        // Same as above for Utf8 and Binary columns: empty offsets buffers
        // must decode to empty arrays with a single zero offset.
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("payload", DataType::Binary, true),
        ]));

        let mut fbb = FlatBufferBuilder::new();
        // One node per column.
        let nodes = fbb.create_vector(&[
            FieldNode::new(0, 0), FieldNode::new(0, 0), ]);
        // Three empty buffers (validity/offsets/values) per column.
        let buffers = fbb.create_vector(&[
            crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), ]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 0,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        let body = Buffer::from(Vec::<u8>::new());
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder =
            RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
                .unwrap();

        let read_batch = decoder.read_record_batch().unwrap();
        assert_eq!(read_batch.num_rows(), 0);

        let utf8 = read_batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(utf8.len(), 0);
        assert_eq!(utf8.value_offsets(), [0]);

        let binary = read_batch
            .column(1)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(binary.len(), 0);
        assert_eq!(binary.value_offsets(), [0]);
    }
2261
    #[test]
    fn test_projection_array_values() {
        // Write a batch with exotic column types, then verify single-column
        // and shuffled multi-column projections round-trip correctly.
        let schema = create_test_projection_schema();

        let batch = create_test_projection_batch_data(&schema);

        let mut buf = Vec::new();
        {
            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        // NOTE(review): only indices 0..12 are projected individually; the
        // dictionary column f12 and f13 are excluded here — presumably
        // deliberate, confirm.
        for index in 0..12 {
            let projection = vec![index];
            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
            let read_batch = reader.unwrap().next().unwrap().unwrap();
            let projected_column = read_batch.column(0);
            let expected_column = batch.column(index);

            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
        }

        {
            // Projection with reordered indices.
            let reader =
                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
            let read_batch = reader.unwrap().next().unwrap().unwrap();
            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
            assert_eq!(read_batch, expected_batch);
        }
    }
2299
    #[test]
    fn test_arrow_single_float_row() {
        // Stream a single-row batch through a temp file and read it back,
        // both unprojected and with a [0, 3] projection.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
        ]);
        let arrays = vec![
            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
        ];
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
        let mut file = tempfile::tempfile().unwrap();
        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
        stream_writer.write(&batch).unwrap();
        stream_writer.finish().unwrap();

        drop(stream_writer);

        file.rewind().unwrap();

        let reader = StreamReader::try_new(&mut file, None).unwrap();

        // Both float columns must come back non-zero.
        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert!(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
            assert!(
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
        });

        file.rewind().unwrap();

        // Re-read with projection [0, 3]: batches carry only those columns.
        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();

        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().fields().len(), 2);
            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
        });
    }
2362
    // Serialize a batch to the IPC file format.
    fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        buf
    }

    // Read the first batch back from an IPC file buffer.
    fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
        reader.next().unwrap()
    }

    // Same as `read_ipc`, but with array-data validation disabled.
    fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
        let mut reader = unsafe {
            FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
        };
        reader.next().unwrap()
    }

    // Write then read a single batch through the IPC file format.
    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
        let buf = write_ipc(rb);
        read_ipc(&buf).unwrap()
    }

    // Read an IPC file buffer via the low-level `FileDecoder` API.
    fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
        read_ipc_with_decoder_inner(buf, false)
    }

    // As above, with validation skipped.
    fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
        read_ipc_with_decoder_inner(buf, true)
    }

    // Parses footer + dictionaries by hand, then decodes the single batch.
    fn read_ipc_with_decoder_inner(
        buf: Vec<u8>,
        skip_validation: bool,
    ) -> Result<RecordBatch, ArrowError> {
        let buffer = Buffer::from_vec(buf);
        // Trailer: 4-byte footer length + 6-byte "ARROW1" magic.
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;

        let schema = fb_to_schema(footer.schema().unwrap());

        let mut decoder = unsafe {
            FileDecoder::new(Arc::new(schema), footer.version())
                .with_skip_validation(skip_validation)
        };
        // Load all dictionary blocks before decoding the record batch.
        for block in footer.dictionaries().iter().flatten() {
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);
            decoder.read_dictionary(block, &data)?
        }

        let batches = footer.recordBatches().unwrap();
        // This helper only supports files containing exactly one batch.
        assert_eq!(batches.len(), 1); let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);
        Ok(decoder.read_record_batch(block, &data)?.unwrap())
    }

    // Serialize a batch to the IPC streaming format.
    fn write_stream(rb: &RecordBatch) -> Vec<u8> {
        let mut buf = Vec::new();
        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        buf
    }

    // Read the first batch back from an IPC stream buffer.
    fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
        reader.next().unwrap()
    }

    // Same as `read_stream`, but with array-data validation disabled.
    fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
        let mut reader = unsafe {
            StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
        };
        reader.next().unwrap()
    }

    // Write then read a single batch through the IPC streaming format.
    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
        let buf = write_stream(rb);
        read_stream(&buf).unwrap()
    }
2465
    #[test]
    fn test_roundtrip_with_custom_metadata() {
        // Custom key/value metadata written to the footer must be readable
        // back via `custom_metadata()`.
        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
        let mut test_metadata = HashMap::new();
        test_metadata.insert("abc".to_string(), "abc".to_string());
        test_metadata.insert("def".to_string(), "def".to_string());
        for (k, v) in &test_metadata {
            writer.write_metadata(k, v);
        }
        writer.finish().unwrap();
        drop(writer);

        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
        assert_eq!(reader.custom_metadata(), &test_metadata);
    }

    #[test]
    fn test_roundtrip_nested_dict() {
        // A dictionary nested inside a struct must survive a file roundtrip.
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        assert_eq!(batch, roundtrip_ipc(&batch));
    }

    #[test]
    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
        // Same nested-dictionary roundtrip, written with explicit default
        // `IpcWriteOptions`.
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new_with_options(
            &mut buf,
            batch.schema_ref(),
            IpcWriteOptions::default(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();

        assert_eq!(batch, reader.next().unwrap().unwrap());
    }

    // Shared body for the dense/sparse union roundtrip tests below.
    fn check_union_with_builder(mut builder: UnionBuilder) {
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        builder.append::<Int64Type>("d", 11).unwrap();
        let union = builder.build().unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let union_array = Arc::new(union) as ArrayRef;

        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
        let rb2 = roundtrip_ipc(&rb);
        // Compare schema, shape, and the union column itself.
        assert_eq!(rb.schema(), rb2.schema());
        assert_eq!(rb.num_columns(), rb2.num_columns());
        assert_eq!(rb.num_rows(), rb2.num_rows());
        let union1 = rb.column(0);
        let union2 = rb2.column(0);

        assert_eq!(union1, union2);
    }

    #[test]
    fn test_roundtrip_dense_union() {
        check_union_with_builder(UnionBuilder::new_dense());
    }

    #[test]
    fn test_roundtrip_sparse_union() {
        check_union_with_builder(UnionBuilder::new_sparse());
    }
2579
    #[test]
    fn test_roundtrip_struct_empty_fields() {
        // A struct with zero child fields but a validity mask must roundtrip.
        let nulls = NullBuffer::from(&[true, true, false]);
        let rb = RecordBatch::try_from_iter([(
            "",
            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
        )])
        .unwrap();
        let rb2 = roundtrip_ipc(&rb);
        assert_eq!(rb, rb2);
    }

    #[test]
    fn test_roundtrip_stream_run_array_sliced() {
        // A sliced run-end-encoded array is re-encoded ("unsliced") by the
        // writer; the reader should produce that canonical form.
        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
            .into_iter()
            .collect();
        let run_array_1_sliced = run_array_1.slice(2, 5);

        let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2), Some(2)];
        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        run_array_2_builder.extend(run_array_2_inupt);
        let run_array_2 = run_array_2_builder.finish();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "run_array_1_sliced",
                run_array_1_sliced.data_type().clone(),
                false,
            ),
            Field::new("run_array_2", run_array_2.data_type().clone(), false),
        ]));
        let input_batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
        )
        .unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);

        // The unsliced column roundtrips unchanged.
        assert_eq!(input_batch.column(1), output_batch.column(1));

        // The sliced column comes back in unsliced form.
        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
    }

    #[test]
    fn test_roundtrip_stream_nested_dict() {
        // A dictionary nested inside a struct must survive a stream roundtrip.
        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
        let dict = Arc::new(
            xs.clone()
                .into_iter()
                .collect::<DictionaryArray<Int8Type>>(),
        );
        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
                string_array,
            ),
            (
                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
                dict.clone() as ArrayRef,
            ),
        ]);
        let schema = Arc::new(Schema::new(vec![
            Field::new("f1_string", DataType::Utf8, false),
            Field::new("f2_struct", struct_array.data_type().clone(), false),
        ]));
        let input_batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(xs.clone())),
                Arc::new(struct_array),
            ],
        )
        .unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2662
    #[test]
    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
        // Round-trips Dictionary<Int8, Map<Dictionary<Int8, Utf8>,
        // Dictionary<Int8, Utf8>>> through the IPC stream format, i.e.
        // dictionaries nested on both the key and value side of a map, itself
        // wrapped in an outer dictionary.
        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
        let values = Arc::new(values) as ArrayRef;
        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());

        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, values);

        // `Field::new_dict` with explicit dictionary ids is deprecated, but is
        // used here on purpose so the reader path for such schemas stays covered.
        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true, 1,
            false,
        ));
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        ));
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // Three map entries spanning offsets [0,2), [2,4), [4,6).
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2723
    /// Shared body for the dict-of-list-of-dict round-trip tests: builds a
    /// `Dictionary<Int8, List<Dictionary<Int8, Utf8>>>` column using the given
    /// list `DataType` and raw offsets, then round-trips it via the IPC
    /// stream format and asserts equality.
    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
        OffsetSize: OffsetSizeTrait,
        U: ArrowNativeType,
    >(
        list_data_type: DataType,
        offsets: &[U; 5],
    ) {
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let value_offsets = Buffer::from_slice_ref(offsets);

        // Assemble the list array manually so the caller-provided offsets are
        // used verbatim (the i32 and i64 variants pass different offsets).
        let list_data = ArrayData::builder(list_data_type)
            .len(4)
            .add_buffer(value_offsets)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = GenericListArray::<OffsetSize>::from(list_data);

        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2758
    #[test]
    fn test_roundtrip_stream_dict_of_list_of_dict() {
        // List<Dictionary<Int8, Utf8>> with i32 offsets.
        #[allow(deprecated)]
        let list_data_type = DataType::List(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);

        // LargeList<Dictionary<Int8, Utf8>> with i64 offsets.
        #[allow(deprecated)]
        let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
    }
2785
    #[test]
    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
        // Round-trips Dictionary<Int8, FixedSizeList<Dictionary<Int8, Utf8>, 3>>
        // through the IPC stream format.
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.into_data();

        // `Field::new_dict` with an explicit dictionary id is deprecated but
        // exercised intentionally to keep that reader path covered.
        #[allow(deprecated)]
        let list_data_type = DataType::FixedSizeList(
            Arc::new(Field::new_dict(
                "item",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
                1,
                false,
            )),
            3,
        );
        // 3 lists of fixed size 3 consume the 9 dictionary keys above.
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = FixedSizeListArray::from(list_data);

        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2823
    /// A string longer than 12 bytes so that view arrays must store it
    /// out-of-line in a data buffer instead of inlining it in the view.
    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view array handles it";
2826
2827 #[test]
2828 fn test_roundtrip_view_types() {
2829 let schema = Schema::new(vec![
2830 Field::new("field_1", DataType::BinaryView, true),
2831 Field::new("field_2", DataType::Utf8, true),
2832 Field::new("field_3", DataType::Utf8View, true),
2833 ]);
2834 let bin_values: Vec<Option<&[u8]>> = vec![
2835 Some(b"foo"),
2836 None,
2837 Some(b"bar"),
2838 Some(LONG_TEST_STRING.as_bytes()),
2839 ];
2840 let utf8_values: Vec<Option<&str>> =
2841 vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2842 let bin_view_array = BinaryViewArray::from_iter(bin_values);
2843 let utf8_array = StringArray::from_iter(utf8_values.iter());
2844 let utf8_view_array = StringViewArray::from_iter(utf8_values);
2845 let record_batch = RecordBatch::try_new(
2846 Arc::new(schema.clone()),
2847 vec![
2848 Arc::new(bin_view_array),
2849 Arc::new(utf8_array),
2850 Arc::new(utf8_view_array),
2851 ],
2852 )
2853 .unwrap();
2854
2855 assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2856 assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2857
2858 let sliced_batch = record_batch.slice(1, 2);
2859 assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2860 assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2861 }
2862
    #[test]
    fn test_roundtrip_view_types_nested_dict() {
        // Round-trips view types (Utf8View / BinaryView) as dictionary values
        // nested inside a map, itself wrapped in an outer dictionary, through
        // both IPC formats — full and sliced.
        let bin_values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
            Some(b"field"),
        ];
        let utf8_values: Vec<Option<&str>> = vec![
            Some("foo"),
            None,
            Some("bar"),
            Some(LONG_TEST_STRING),
            Some("field"),
        ];
        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));

        // Map keys: Dictionary<Int8, Utf8View>.
        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
            true,
            1,
            false,
        ));

        // Map values: Dictionary<Int8, BinaryView>.
        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
            true,
            2,
            false,
        ));
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );
        // Three map entries spanning offsets [0,2), [2,4), [4,7).
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        assert_eq!(batch, roundtrip_ipc(&batch));
        assert_eq!(batch, roundtrip_ipc_stream(&batch));

        // Slicing exercises the non-zero-offset code paths as well.
        let sliced_batch = batch.slice(1, 2);
        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
    }
2940
2941 #[test]
2942 fn test_no_columns_batch() {
2943 let schema = Arc::new(Schema::empty());
2944 let options = RecordBatchOptions::new()
2945 .with_match_field_names(true)
2946 .with_row_count(Some(10));
2947 let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2948 let output_batch = roundtrip_ipc_stream(&input_batch);
2949 assert_eq!(input_batch, output_batch);
2950 }
2951
    #[test]
    fn test_unaligned() {
        // Decoding a record batch whose body buffer is deliberately misaligned
        // (shifted by one byte) must still succeed when `require_alignment` is
        // false.
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let r#gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        let (_, encoded) = r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Prepend one byte and then slice it back off: the resulting buffer
        // has identical contents but is no longer 8-byte aligned.
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let roundtrip = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(false)
        .read_record_batch()
        .unwrap();
        assert_eq!(batch, roundtrip);
    }
2994
    #[test]
    fn test_unaligned_throws_error_with_require_alignment() {
        // With `require_alignment` enabled, the same misaligned body buffer as
        // in `test_unaligned` must be rejected with a descriptive error.
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let r#gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        let (_, encoded) = r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Prepend one byte and slice it off again to break 8-byte alignment.
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let result = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(true)
        .read_record_batch();

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
             offset from expected alignment of 4 by 1"
        );
    }
3042
3043 #[test]
3044 fn test_file_with_massive_column_count() {
3045 let limit = 600_000;
3047
3048 let fields = (0..limit)
3049 .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
3050 .collect::<Vec<_>>();
3051 let schema = Arc::new(Schema::new(fields));
3052 let batch = RecordBatch::new_empty(schema);
3053
3054 let mut buf = Vec::new();
3055 let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
3056 writer.write(&batch).unwrap();
3057 writer.finish().unwrap();
3058 drop(writer);
3059
3060 let mut reader = FileReaderBuilder::new()
3061 .with_max_footer_fb_tables(1_500_000)
3062 .build(std::io::Cursor::new(buf))
3063 .unwrap();
3064 let roundtrip_batch = reader.next().unwrap().unwrap();
3065
3066 assert_eq!(batch, roundtrip_batch);
3067 }
3068
3069 #[test]
3070 fn test_file_with_deeply_nested_columns() {
3071 let limit = 61;
3073
3074 let fields = (0..limit).fold(
3075 vec![Field::new("leaf", DataType::Boolean, false)],
3076 |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
3077 );
3078 let schema = Arc::new(Schema::new(fields));
3079 let batch = RecordBatch::new_empty(schema);
3080
3081 let mut buf = Vec::new();
3082 let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
3083 writer.write(&batch).unwrap();
3084 writer.finish().unwrap();
3085 drop(writer);
3086
3087 let mut reader = FileReaderBuilder::new()
3088 .with_max_footer_fb_depth(65)
3089 .build(std::io::Cursor::new(buf))
3090 .unwrap();
3091 let roundtrip_batch = reader.next().unwrap().unwrap();
3092
3093 assert_eq!(batch, roundtrip_batch);
3094 }
3095
3096 #[test]
3097 fn test_invalid_struct_array_ipc_read_errors() {
3098 let a_field = Field::new("a", DataType::Int32, false);
3099 let b_field = Field::new("b", DataType::Int32, false);
3100 let struct_fields = Fields::from(vec![a_field.clone(), b_field.clone()]);
3101
3102 let a_array_data = ArrayData::builder(a_field.data_type().clone())
3103 .len(4)
3104 .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
3105 .build()
3106 .unwrap();
3107 let b_array_data = ArrayData::builder(b_field.data_type().clone())
3108 .len(3)
3109 .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
3110 .build()
3111 .unwrap();
3112
3113 let invalid_struct_arr = unsafe {
3114 StructArray::new_unchecked(
3115 struct_fields,
3116 vec![make_array(a_array_data), make_array(b_array_data)],
3117 None,
3118 )
3119 };
3120
3121 expect_ipc_validation_error(
3122 Arc::new(invalid_struct_arr),
3123 "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
3124 );
3125 }
3126
    #[test]
    fn test_invalid_nested_array_ipc_read_errors() {
        // An invalid child array (a StringArray holding non-UTF-8 bytes)
        // nested inside a struct must be caught by IPC read validation.
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Utf8, false);

        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "s",
            vec![a_field.clone(), b_field.clone()],
            false,
        )]));

        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        let b_array_data = {
            let valid: &[u8] = b" ";
            let mut invalid = vec![];
            invalid.extend_from_slice(b"ValidString");
            invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
            let binary_array =
                BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
            // SAFETY: deliberately reinterprets non-UTF-8 bytes as a
            // StringArray without validation — the invalid state under test.
            let array = unsafe {
                StringArray::new_unchecked(
                    binary_array.offsets().clone(),
                    binary_array.values().clone(),
                    binary_array.nulls().cloned(),
                )
            };
            array.into_data()
        };
        let struct_data_type = schema.field(0).data_type();

        // SAFETY: `build_unchecked` skips child validation so the malformed
        // string child survives into the struct.
        let invalid_struct_arr = unsafe {
            make_array(
                ArrayData::builder(struct_data_type.clone())
                    .len(4)
                    .add_child_data(a_array_data)
                    .add_child_data(b_array_data)
                    .build_unchecked(),
            )
        };
        expect_ipc_validation_error(
            invalid_struct_arr,
            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
        );
    }
3177
    #[test]
    fn test_same_dict_id_without_preserve() {
        // Two dictionary columns declared with the same (deprecated) dict id 0
        // must still round-trip through the stream writer/reader when ids are
        // not preserved.
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(
                ["a", "b"]
                    .iter()
                    .map(|name| {
                        #[allow(deprecated)]
                        Field::new_dict(
                            name.to_string(),
                            DataType::Dictionary(
                                Box::new(DataType::Int32),
                                Box::new(DataType::Utf8),
                            ),
                            true,
                            0,
                            false,
                        )
                    })
                    .collect::<Vec<Field>>(),
            )),
            vec![
                Arc::new(
                    vec![Some("c"), Some("d")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
                Arc::new(
                    vec![Some("e"), Some("f")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
            ],
        )
        .expect("Failed to create RecordBatch");

        let mut buf = vec![];
        {
            let mut writer = crate::writer::StreamWriter::try_new_with_options(
                &mut buf,
                batch.schema().as_ref(),
                crate::writer::IpcWriteOptions::default(),
            )
            .expect("Failed to create StreamWriter");
            writer.write(&batch).expect("Failed to write RecordBatch");
            writer.finish().expect("Failed to finish StreamWriter");
        }

        // Every decoded batch must equal the original.
        StreamReader::try_new(std::io::Cursor::new(buf), None)
            .expect("Failed to create StreamReader")
            .for_each(|decoded_batch| {
                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
            });
    }
3233
3234 #[test]
3235 fn test_validation_of_invalid_list_array() {
3236 let array = unsafe {
3238 let values = Int32Array::from(vec![1, 2, 3]);
3239 let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]); let offsets = OffsetBuffer::new_unchecked(bad_offsets); let field = Field::new_list_field(DataType::Int32, true);
3242 let nulls = None;
3243 ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
3244 };
3245
3246 expect_ipc_validation_error(
3247 Arc::new(array),
3248 "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
3249 );
3250 }
3251
3252 #[test]
3253 fn test_validation_of_invalid_string_array() {
3254 let valid: &[u8] = b" ";
3255 let mut invalid = vec![];
3256 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3257 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3258 let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3259 let array = unsafe {
3262 StringArray::new_unchecked(
3263 binary_array.offsets().clone(),
3264 binary_array.values().clone(),
3265 binary_array.nulls().cloned(),
3266 )
3267 };
3268 expect_ipc_validation_error(
3269 Arc::new(array),
3270 "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
3271 );
3272 }
3273
3274 #[test]
3275 fn test_validation_of_invalid_string_view_array() {
3276 let valid: &[u8] = b" ";
3277 let mut invalid = vec![];
3278 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3279 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3280 let binary_view_array =
3281 BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3282 let array = unsafe {
3285 StringViewArray::new_unchecked(
3286 binary_view_array.views().clone(),
3287 binary_view_array.data_buffers().to_vec(),
3288 binary_view_array.nulls().cloned(),
3289 )
3290 };
3291 expect_ipc_validation_error(
3292 Arc::new(array),
3293 "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
3294 );
3295 }
3296
3297 #[test]
3300 fn test_validation_of_invalid_dictionary_array() {
3301 let array = unsafe {
3302 let values = StringArray::from_iter_values(["a", "b", "c"]);
3303 let keys = Int32Array::from(vec![1, 200]); DictionaryArray::new_unchecked(keys, Arc::new(values))
3305 };
3306
3307 expect_ipc_validation_error(
3308 Arc::new(array),
3309 "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
3310 );
3311 }
3312
3313 #[test]
3314 fn test_validation_of_invalid_union_array() {
3315 let array = unsafe {
3316 let fields = UnionFields::try_new(
3317 vec![1, 3], vec![
3319 Field::new("a", DataType::Int32, false),
3320 Field::new("b", DataType::Utf8, false),
3321 ],
3322 )
3323 .unwrap();
3324 let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]); let offsets = None;
3326 let children: Vec<ArrayRef> = vec![
3327 Arc::new(Int32Array::from(vec![10, 20, 30])),
3328 Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
3329 ];
3330
3331 UnionArray::new_unchecked(fields, type_ids, offsets, children)
3332 };
3333
3334 expect_ipc_validation_error(
3335 Arc::new(array),
3336 "Invalid argument error: Type Ids values must match one of the field type ids",
3337 );
3338 }
3339
3340 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3343
    /// Writes `array` as a single-column batch via the IPC stream format, the
    /// file format, and the low-level decoder, asserting that each validated
    /// read fails with exactly `expected_err` while the skip-validation
    /// readers accept the same (invalid) bytes.
    fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
        let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();

        // Stream format: readable without validation, errors with it.
        let buf = write_stream(&rb); read_stream_skip_validation(&buf).unwrap();
        let err = read_stream(&buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);

        // File format: same expectation.
        let buf = write_ipc(&rb); read_ipc_skip_validation(&buf).unwrap();
        let err = read_ipc(&buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);

        // Low-level decoder: same expectation.
        read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
        let err = read_ipc_with_decoder(buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);
    }
3365
    #[test]
    fn test_roundtrip_schema() {
        // Serializes a schema message and checks it parses back identically;
        // also verifies that size-prefixed parsing of this (non-size-prefixed)
        // message fails.
        let schema = Schema::new(vec![
            Field::new(
                "a",
                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new(
                "b",
                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
                false,
            ),
        ]);

        let options = IpcWriteOptions::default();
        let data_gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        let encoded_data =
            data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options);
        let mut schema_bytes = vec![];
        write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");

        // Skip the continuation marker, if the writer emitted one.
        let begin_offset: usize = if schema_bytes[0..4].eq(&CONTINUATION_MARKER) {
            4
        } else {
            0
        };

        // The message is not size-prefixed, so this must fail.
        size_prefixed_root_as_message(&schema_bytes[begin_offset..])
            .expect_err("size_prefixed_root_as_message");

        let msg = parse_message(&schema_bytes).expect("parse_message");
        let ipc_schema = msg.header_as_schema().expect("header_as_schema");
        let new_schema = fb_to_schema(ipc_schema);

        assert_eq!(schema, new_schema);
    }
3404
3405 #[test]
3406 fn test_negative_meta_len() {
3407 let bytes = i32::to_le_bytes(-1);
3408 let mut buf = vec![];
3409 buf.extend(CONTINUATION_MARKER);
3410 buf.extend(bytes);
3411
3412 let reader = StreamReader::try_new(Cursor::new(buf), None);
3413 assert!(reader.is_err());
3414 }
3415
    #[test]
    fn test_read_null_dict_without_dictionary_batch() {
        // A dictionary column that is entirely null can be read even when the
        // stream contains no DictionaryBatch message: write a valid stream,
        // strip the dictionary batches out of it, and read it back.
        let keys = Int32Array::new_null(4);
        let values: ArrayRef = new_empty_array(&DataType::Utf8);
        let dict_array = DictionaryArray::new(keys, values);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "d",
            dict_array.data_type().clone(),
            true,
        )]));
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap();

        let full_stream = write_stream(&batch);

        // Re-assemble the stream message by message, dropping DictionaryBatch
        // messages and keeping everything else (schema, record batches, EOS).
        let mut stripped = Vec::new();
        let mut cursor = Cursor::new(&full_stream);
        loop {
            let mut header = [0u8; 4];
            if cursor.read_exact(&mut header).is_err() {
                break;
            }
            // If this was the continuation marker, the next 4 bytes hold the
            // metadata length.
            if header == CONTINUATION_MARKER && cursor.read_exact(&mut header).is_err() {
                break;
            }
            let meta_len = u32::from_le_bytes(header) as usize;
            if meta_len == 0 {
                // End-of-stream marker: re-emit it and stop.
                stripped.extend_from_slice(&CONTINUATION_MARKER);
                stripped.extend_from_slice(&0u32.to_le_bytes());
                break;
            }
            let mut meta_buf = vec![0u8; meta_len];
            cursor.read_exact(&mut meta_buf).unwrap();

            let message = root_as_message(&meta_buf).unwrap();
            let body_len = message.bodyLength() as usize;
            let mut body_buf = vec![0u8; body_len];
            cursor.read_exact(&mut body_buf).unwrap();

            if message.header_type() == crate::MessageHeader::DictionaryBatch {
                // Skip dictionary batches — this is the condition under test.
                continue;
            }
            stripped.extend_from_slice(&CONTINUATION_MARKER);
            stripped.extend_from_slice(&(meta_len as u32).to_le_bytes());
            stripped.extend_from_slice(&meta_buf);
            stripped.extend_from_slice(&body_buf);
        }

        // The dictionary-less stream must still decode to 4 all-null rows.
        let result = read_stream(&stripped).unwrap();
        assert_eq!(result.num_rows(), 4);
        assert_eq!(result.num_columns(), 1);

        let col = result.column(0);
        assert_eq!(col.null_count(), 4);
        assert_eq!(col.len(), 4);
        assert!(matches!(col.data_type(), DataType::Dictionary(_, _)));
    }
3490
    #[test]
    fn test_projection_skip_list_view() {
        // Reading a file with a projection that excludes a ListView column
        // must correctly skip over its buffers and return only the projected
        // column.
        use crate::reader::FileReader;
        use crate::writer::FileWriter;
        use arrow_array::{
            GenericListViewArray, Int32Array, RecordBatch,
            builder::{GenericListViewBuilder, UInt32Builder},
        };
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;

        let mut builder = GenericListViewBuilder::<i32, _>::new(UInt32Builder::new());

        // Row 0: [1, 2]
        builder.values().append_value(1);
        builder.values().append_value(2);
        builder.append(true);

        // Row 1: null
        builder.append(false);

        // Row 2: [3, 4]
        builder.values().append_value(3);
        builder.values().append_value(4);
        builder.append(true);

        let list_view: GenericListViewArray<i32> = builder.finish();

        let values = Int32Array::from(vec![10, 20, 30]);

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", list_view.data_type().clone(), true),
            Field::new("b", DataType::Int32, false),
        ]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(list_view), Arc::new(values.clone())])
                .unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        // Project only column 1 ("b"), skipping the list-view column.
        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
        let read_batch = reader.next().unwrap().unwrap();

        assert_eq!(read_batch.num_columns(), 1);
        assert_eq!(read_batch.column(0).as_ref(), &values);
    }
3548
    #[test]
    fn test_projection_skip_union_v4() {
        // Reading a V4-metadata file with a projection that excludes a union
        // column must correctly skip over the union's buffers and return only
        // the projected column.
        use crate::MetadataVersion;
        use crate::reader::FileReader;
        use crate::writer::{FileWriter, IpcWriteOptions};
        use arrow_array::{
            ArrayRef, Int32Array, RecordBatch, builder::UnionBuilder, types::Int32Type,
        };
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;

        let mut builder = UnionBuilder::new_dense();
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append::<Int32Type>("a", 2).unwrap();
        builder.append::<Int32Type>("a", 3).unwrap();
        let union = builder.build().unwrap();

        let values = Int32Array::from(vec![10, 20, 30]);

        let schema = Arc::new(Schema::new(vec![
            Field::new("union", union.data_type().clone(), false),
            Field::new("values", DataType::Int32, false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(union) as ArrayRef, Arc::new(values.clone())],
        )
        .unwrap();

        // Write with metadata version V4 explicitly.
        let mut buf = Vec::new();
        {
            let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap();
            let mut writer =
                FileWriter::try_new_with_options(&mut buf, &batch.schema(), options).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        // Project only column 1 ("values"), skipping the union column.
        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
        let read_batch = reader.next().unwrap().unwrap();

        assert_eq!(read_batch.num_columns(), 1);
        assert_eq!(read_batch.column(0).as_ref(), &values);
    }
3602
    #[test]
    fn test_projection_skip_fixed_width_types() {
        // For every fixed-width data type, a projection that excludes a column
        // of that type must skip it correctly and return the projected column.
        use std::sync::Arc;

        use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, make_array};
        use arrow_buffer::Buffer;
        use arrow_data::ArrayData;
        use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit};

        use crate::reader::FileReader;
        use crate::writer::FileWriter;

        // Builds a 3-element, all-zero array of the given fixed-width type
        // (Boolean gets a dedicated path since it has no primitive width).
        fn make_array_for_type(data_type: DataType) -> ArrayRef {
            let len = 3;

            if matches!(data_type, DataType::Boolean) {
                return Arc::new(BooleanArray::from(vec![true, false, true]));
            }

            let width = data_type.primitive_width().unwrap();
            let data = ArrayData::builder(data_type)
                .len(len)
                .add_buffer(Buffer::from(vec![0_u8; len * width]))
                .build()
                .unwrap();

            make_array(data)
        }

        let data_types = vec![
            DataType::Boolean,
            DataType::Int8,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::UInt8,
            DataType::UInt16,
            DataType::UInt32,
            DataType::UInt64,
            DataType::Float16,
            DataType::Float32,
            DataType::Float64,
            DataType::Timestamp(TimeUnit::Second, None),
            DataType::Date32,
            DataType::Date64,
            DataType::Time32(TimeUnit::Second),
            DataType::Time64(TimeUnit::Microsecond),
            DataType::Duration(TimeUnit::Second),
            DataType::Interval(IntervalUnit::YearMonth),
            DataType::Interval(IntervalUnit::DayTime),
            DataType::Interval(IntervalUnit::MonthDayNano),
            DataType::Decimal32(9, 2),
            DataType::Decimal64(18, 2),
            DataType::Decimal128(38, 2),
            DataType::Decimal256(76, 2),
        ];

        for data_type in data_types {
            // Column 0 ("skipped") will be projected away; column 1 is kept.
            let skipped = make_array_for_type(data_type.clone());
            let values = Int32Array::from(vec![10, 20, 30]);

            let schema = Arc::new(Schema::new(vec![
                Field::new("skipped", data_type, false),
                Field::new("values", DataType::Int32, false),
            ]));

            let batch =
                RecordBatch::try_new(schema, vec![skipped, Arc::new(values.clone())]).unwrap();

            let mut buf = Vec::new();
            {
                let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
                writer.write(&batch).unwrap();
                writer.finish().unwrap();
            }

            let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
            let read_batch = reader.next().unwrap().unwrap();

            assert_eq!(read_batch.num_columns(), 1);
            assert_eq!(read_batch.column(0).as_ref(), &values);
        }
    }
3698}