1mod stream;
28pub use stream::*;
29
30use arrow_select::concat;
31
32use flatbuffers::{VectorIter, VerifierOptions};
33use std::collections::{HashMap, VecDeque};
34use std::fmt;
35use std::io::{BufReader, Read, Seek, SeekFrom};
36use std::sync::Arc;
37
38use arrow_array::*;
39use arrow_buffer::{
40 ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
41};
42use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
43use arrow_schema::*;
44
45use crate::compression::{CompressionCodec, DecompressionContext};
46use crate::r#gen::Message::{self};
47use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
48use DataType::*;
49
50fn read_buffer(
60 buf: &crate::Buffer,
61 a_data: &Buffer,
62 compression_codec: Option<CompressionCodec>,
63 decompression_context: &mut DecompressionContext,
64) -> Result<Buffer, ArrowError> {
65 let start_offset = buf.offset() as usize;
66 let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
67 match (buf_data.is_empty(), compression_codec) {
69 (true, _) | (_, None) => Ok(buf_data),
70 (false, Some(decompressor)) => {
71 decompressor.decompress_to_buffer(&buf_data, decompression_context)
72 }
73 }
74}
impl RecordBatchDecoder<'_> {
    /// Decode the next array described by `field`, consuming the matching
    /// field node and data buffers from this decoder in serialization order.
    ///
    /// `variadic_counts` holds the per-column counts of variadic data buffers
    /// (used by the view types); entries are popped in column order.
    fn create_array(
        &mut self,
        field: &Field,
        variadic_counts: &mut VecDeque<i64>,
    ) -> Result<ArrayRef, ArrowError> {
        let data_type = field.data_type();
        match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                let field_node = self.next_node(field)?;
                // validity, offsets, values
                let buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            BinaryView | Utf8View => {
                let count = variadic_counts
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {data_type} column"
                    )))?;
                // plus the validity buffer and the views buffer
                let count = count + 2;
                // NOTE: for view types the buffers are consumed BEFORE the
                // field node — this order must match the writer.
                let buffers = (0..count)
                    .map(|_| self.next_buffer())
                    .collect::<Result<Vec<_>, _>>()?;
                let field_node = self.next_node(field)?;
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            FixedSizeBinary(_) => {
                let field_node = self.next_node(field)?;
                // validity, values
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                let list_node = self.next_node(field)?;
                // validity, offsets
                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
                // child values are serialized after the list's own buffers
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            ListView(list_field) | LargeListView(list_field) => {
                let list_node = self.next_node(field)?;
                // validity, offsets, sizes
                let list_buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_view_array(list_node, data_type, &list_buffers, values)
            }
            FixedSizeList(list_field, _) => {
                let list_node = self.next_node(field)?;
                // validity only; offsets are implied by the fixed size
                let list_buffers = [self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            Struct(struct_fields) => {
                let struct_node = self.next_node(field)?;
                let null_buffer = self.next_buffer()?;

                // Children are serialized depth-first, in declared field order.
                let mut struct_arrays = vec![];
                for struct_field in struct_fields {
                    let child = self.create_array(struct_field, variadic_counts)?;
                    struct_arrays.push(child);
                }
                self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
            }
            RunEndEncoded(run_ends_field, values_field) => {
                // Run-end-encoded arrays carry no buffers of their own;
                // both children are decoded as regular arrays.
                let run_node = self.next_node(field)?;
                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
                let values = self.create_array(values_field, variadic_counts)?;

                let run_array_length = run_node.length() as usize;
                let builder = ArrayData::builder(data_type.clone())
                    .len(run_array_length)
                    .offset(0)
                    .add_child_data(run_ends.into_data())
                    .add_child_data(values.into_data())
                    .null_count(run_node.null_count() as usize);

                self.create_array_from_builder(builder)
            }
            Dictionary(_, _) => {
                let index_node = self.next_node(field)?;
                // validity, keys
                let index_buffers = [self.next_buffer()?, self.next_buffer()?];

                // Dictionary values were decoded earlier from dictionary
                // batches and are looked up here by the field's dict id.
                #[allow(deprecated)]
                let dict_id = field.dict_id().ok_or_else(|| {
                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
                })?;

                let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
                    ArrowError::ParseError(format!(
                        "Cannot find a dictionary batch with dict id: {dict_id}"
                    ))
                })?;

                self.create_dictionary_array(
                    index_node,
                    data_type,
                    &index_buffers,
                    value_array.clone(),
                )
            }
            Union(fields, mode) => {
                let union_node = self.next_node(field)?;
                let len = union_node.length() as usize;

                // Pre-V5 metadata serialized an extra (unused) validity
                // buffer for unions; consume and discard it.
                if self.version < MetadataVersion::V5 {
                    self.next_buffer()?;
                }

                let type_ids: ScalarBuffer<i8> =
                    self.next_buffer()?.slice_with_length(0, len).into();

                // Dense unions additionally carry a 4-byte-per-slot offsets buffer.
                let value_offsets = match mode {
                    UnionMode::Dense => {
                        let offsets: ScalarBuffer<i32> =
                            self.next_buffer()?.slice_with_length(0, len * 4).into();
                        Some(offsets)
                    }
                    UnionMode::Sparse => None,
                };

                let mut children = Vec::with_capacity(fields.len());

                for (_id, field) in fields.iter() {
                    let child = self.create_array(field, variadic_counts)?;
                    children.push(child);
                }

                let array = if self.skip_validation.get() {
                    // SAFETY: the caller explicitly opted out of validation
                    // via the decoder's `skip_validation` flag.
                    unsafe {
                        UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
                    }
                } else {
                    UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
                };
                Ok(Arc::new(array))
            }
            Null => {
                let node = self.next_node(field)?;
                let length = node.length();
                let null_count = node.null_count();

                // A NullArray has no buffers and every slot is null, so the
                // metadata must agree.
                if length != null_count {
                    return Err(ArrowError::SchemaError(format!(
                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                    )));
                }

                let builder = ArrayData::builder(data_type.clone())
                    .len(length as usize)
                    .offset(0);
                self.create_array_from_builder(builder)
            }
            _ => {
                // All remaining types are primitive-like: validity + values.
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
        }
    }

    /// Build a primitive-like array (`buffers[0]` is validity) for
    /// `data_type` from the already-read `buffers`.
    fn create_primitive_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
    ) -> Result<ArrayRef, ArrowError> {
        let length = field_node.length() as usize;
        // Only attach a validity buffer when the node reports nulls.
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let mut builder = match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                // variable-length layout: offsets + values
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..3].to_vec())
                    .null_bit_buffer(null_buffer)
            }
            BinaryView | Utf8View => ArrayData::builder(data_type.clone())
                .len(length)
                // views buffer plus any number of variadic data buffers
                .buffers(buffers[1..].to_vec())
                .null_bit_buffer(null_buffer),
            _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
                // single fixed-width values buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .add_buffer(buffers[1].clone())
                    .null_bit_buffer(null_buffer)
            }
            t => unreachable!("Data type {:?} either unsupported or not primitive", t),
        };

        builder = builder.null_count(field_node.null_count() as usize);

        self.create_array_from_builder(builder)
    }

    /// Apply this decoder's alignment/validation settings to `builder` and
    /// build the resulting array.
    fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
        let mut builder = builder.align_buffers(!self.require_alignment);
        if self.skip_validation.get() {
            // SAFETY: the caller explicitly opted out of validation via the
            // decoder's `skip_validation` flag.
            unsafe { builder = builder.skip_validation(true) }
        };
        Ok(make_array(builder.build()?))
    }

    /// Build a list/map/fixed-size-list array from its buffers and an
    /// already-decoded child array (`buffers[0]` is validity).
    fn create_list_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();
        let mut builder = match data_type {
            // offsets buffer + child values
            List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone())
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            // no offsets buffer: size is fixed
            FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            _ => unreachable!("Cannot create list or map array from {:?}", data_type),
        };

        builder = builder.null_count(field_node.null_count() as usize);

        self.create_array_from_builder(builder)
    }

    /// Build a (large) list-view array from its validity/offsets/sizes
    /// buffers and an already-decoded child array.
    fn create_list_view_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        assert!(matches!(data_type, ListView(_) | LargeListView(_)));

        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();

        self.create_array_from_builder(
            ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone()) // offsets
                .add_buffer(buffers[2].clone()) // sizes
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer)
                .null_count(field_node.null_count() as usize),
        )
    }

    /// Build a struct array from its validity buffer and already-decoded
    /// child arrays, validating the reported null count unless validation
    /// is disabled.
    fn create_struct_array(
        &self,
        struct_node: &FieldNode,
        null_buffer: Buffer,
        struct_fields: &Fields,
        struct_arrays: Vec<ArrayRef>,
    ) -> Result<ArrayRef, ArrowError> {
        let null_count = struct_node.null_count() as usize;
        let len = struct_node.length() as usize;
        let skip_validation = self.skip_validation.get();

        let nulls = if null_count > 0 {
            let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
            let null_buffer = if skip_validation {
                // SAFETY: the caller explicitly opted out of validation and
                // asserts the reported null_count matches the buffer.
                unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
            } else {
                let null_buffer = NullBuffer::new(validity_buffer);

                // Cross-check the metadata's null_count against the buffer.
                if null_buffer.null_count() != null_count {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "null_count value ({}) doesn't match actual number of nulls in array ({})",
                        null_count,
                        null_buffer.null_count()
                    )));
                }

                null_buffer
            };

            Some(null_buffer)
        } else {
            None
        };
        // A struct may legitimately have zero fields.
        if struct_arrays.is_empty() {
            return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
        }

        let struct_array = if skip_validation {
            // SAFETY: the caller explicitly opted out of validation.
            unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
        } else {
            StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
        };

        Ok(Arc::new(struct_array))
    }

    /// Build a dictionary array from its validity/keys buffers and the
    /// previously-decoded `value_array`.
    fn create_dictionary_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        value_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        if let Dictionary(_, _) = *data_type {
            let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
            let builder = ArrayData::builder(data_type.clone())
                .len(field_node.length() as usize)
                .add_buffer(buffers[1].clone()) // keys
                .add_child_data(value_array.into_data())
                .null_bit_buffer(null_buffer)
                .null_count(field_node.null_count() as usize);
            self.create_array_from_builder(builder)
        } else {
            unreachable!("Cannot create dictionary array from {:?}", data_type)
        }
    }
}
435
/// State for decoding the arrays of an IPC-encoded `RecordBatch` message.
pub struct RecordBatchDecoder<'a> {
    /// The flatbuffers-encoded record batch metadata
    batch: crate::RecordBatch<'a>,
    /// The schema the decoded batch will have
    schema: SchemaRef,
    /// Previously decoded dictionaries, keyed by dictionary id
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec declared by the batch
    compression: Option<CompressionCodec>,
    /// Reusable scratch state for decompression
    decompression_context: DecompressionContext,
    /// The IPC metadata version of the message
    version: MetadataVersion,
    /// The message body bytes that the batch's buffers index into
    data: &'a Buffer,
    /// Iterator over the batch's field nodes (length/null-count per array)
    nodes: VectorIter<'a, FieldNode>,
    /// Iterator over the batch's buffer descriptors (offset/length into `data`)
    buffers: VectorIter<'a, crate::Buffer>,
    /// Optional subset of column indices to decode; other columns are skipped.
    /// See [`RecordBatchDecoder::with_projection`].
    projection: Option<&'a [usize]>,
    /// Whether buffers are required to already be aligned; forwarded to
    /// [`ArrayDataBuilder::align_buffers`]. See
    /// [`RecordBatchDecoder::with_require_alignment`].
    require_alignment: bool,
    /// Whether to skip validation of the decoded arrays (defaults to false).
    /// See [`RecordBatchDecoder::with_skip_validation`].
    skip_validation: UnsafeFlag,
}
471
472impl<'a> RecordBatchDecoder<'a> {
473 fn try_new(
475 buf: &'a Buffer,
476 batch: crate::RecordBatch<'a>,
477 schema: SchemaRef,
478 dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
479 metadata: &'a MetadataVersion,
480 ) -> Result<Self, ArrowError> {
481 let buffers = batch.buffers().ok_or_else(|| {
482 ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
483 })?;
484 let field_nodes = batch.nodes().ok_or_else(|| {
485 ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
486 })?;
487
488 let batch_compression = batch.compression();
489 let compression = batch_compression
490 .map(|batch_compression| batch_compression.codec().try_into())
491 .transpose()?;
492
493 Ok(Self {
494 batch,
495 schema,
496 dictionaries_by_id,
497 compression,
498 decompression_context: DecompressionContext::new(),
499 version: *metadata,
500 data: buf,
501 nodes: field_nodes.iter(),
502 buffers: buffers.iter(),
503 projection: None,
504 require_alignment: false,
505 skip_validation: UnsafeFlag::new(),
506 })
507 }
508
509 pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
514 self.projection = projection;
515 self
516 }
517
518 pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
524 self.require_alignment = require_alignment;
525 self
526 }
527
528 pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
540 self.skip_validation = skip_validation;
541 self
542 }
543
544 fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
546 let mut variadic_counts: VecDeque<i64> = self
547 .batch
548 .variadicBufferCounts()
549 .into_iter()
550 .flatten()
551 .collect();
552
553 let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
554
555 let schema = Arc::clone(&self.schema);
556 if let Some(projection) = self.projection {
557 let mut arrays = vec![];
558 for (idx, field) in schema.fields().iter().enumerate() {
560 if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
562 let child = self.create_array(field, &mut variadic_counts)?;
563 arrays.push((proj_idx, child));
564 } else {
565 self.skip_field(field, &mut variadic_counts)?;
566 }
567 }
568
569 arrays.sort_by_key(|t| t.0);
570
571 let schema = Arc::new(schema.project(projection)?);
572 let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();
573
574 if self.skip_validation.get() {
575 unsafe {
577 Ok(RecordBatch::new_unchecked(
578 schema,
579 columns,
580 self.batch.length() as usize,
581 ))
582 }
583 } else {
584 assert!(variadic_counts.is_empty());
585 RecordBatch::try_new_with_options(schema, columns, &options)
586 }
587 } else {
588 let mut children = vec![];
589 for field in schema.fields() {
591 let child = self.create_array(field, &mut variadic_counts)?;
592 children.push(child);
593 }
594
595 if self.skip_validation.get() {
596 unsafe {
598 Ok(RecordBatch::new_unchecked(
599 schema,
600 children,
601 self.batch.length() as usize,
602 ))
603 }
604 } else {
605 assert!(variadic_counts.is_empty());
606 RecordBatch::try_new_with_options(schema, children, &options)
607 }
608 }
609 }
610
611 fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
612 let buffer = self.buffers.next().ok_or_else(|| {
613 ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
614 })?;
615 read_buffer(
616 buffer,
617 self.data,
618 self.compression,
619 &mut self.decompression_context,
620 )
621 }
622
623 fn skip_buffer(&mut self) {
624 self.buffers.next().unwrap();
625 }
626
627 fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
628 self.nodes.next().ok_or_else(|| {
629 ArrowError::SchemaError(format!(
630 "Invalid data for schema. {field} refers to node not found in schema",
631 ))
632 })
633 }
634
635 fn skip_field(
636 &mut self,
637 field: &Field,
638 variadic_count: &mut VecDeque<i64>,
639 ) -> Result<(), ArrowError> {
640 self.next_node(field)?;
641
642 match field.data_type() {
643 Utf8 | Binary | LargeBinary | LargeUtf8 => {
644 for _ in 0..3 {
645 self.skip_buffer()
646 }
647 }
648 Utf8View | BinaryView => {
649 let count = variadic_count
650 .pop_front()
651 .ok_or(ArrowError::IpcError(format!(
652 "Missing variadic count for {} column",
653 field.data_type()
654 )))?;
655 let count = count + 2; for _i in 0..count {
657 self.skip_buffer()
658 }
659 }
660 FixedSizeBinary(_) => {
661 self.skip_buffer();
662 self.skip_buffer();
663 }
664 List(list_field) | LargeList(list_field) | Map(list_field, _) => {
665 self.skip_buffer();
666 self.skip_buffer();
667 self.skip_field(list_field, variadic_count)?;
668 }
669 FixedSizeList(list_field, _) => {
670 self.skip_buffer();
671 self.skip_field(list_field, variadic_count)?;
672 }
673 Struct(struct_fields) => {
674 self.skip_buffer();
675
676 for struct_field in struct_fields {
678 self.skip_field(struct_field, variadic_count)?
679 }
680 }
681 RunEndEncoded(run_ends_field, values_field) => {
682 self.skip_field(run_ends_field, variadic_count)?;
683 self.skip_field(values_field, variadic_count)?;
684 }
685 Dictionary(_, _) => {
686 self.skip_buffer(); self.skip_buffer(); }
689 Union(fields, mode) => {
690 self.skip_buffer(); match mode {
693 UnionMode::Dense => self.skip_buffer(),
694 UnionMode::Sparse => {}
695 };
696
697 for (_, field) in fields.iter() {
698 self.skip_field(field, variadic_count)?
699 }
700 }
701 Null => {} _ => {
703 self.skip_buffer();
704 self.skip_buffer();
705 }
706 };
707 Ok(())
708 }
709}
710
711pub fn read_record_batch(
722 buf: &Buffer,
723 batch: crate::RecordBatch,
724 schema: SchemaRef,
725 dictionaries_by_id: &HashMap<i64, ArrayRef>,
726 projection: Option<&[usize]>,
727 metadata: &MetadataVersion,
728) -> Result<RecordBatch, ArrowError> {
729 RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
730 .with_projection(projection)
731 .with_require_alignment(false)
732 .read_record_batch()
733}
734
735pub fn read_dictionary(
738 buf: &Buffer,
739 batch: crate::DictionaryBatch,
740 schema: &Schema,
741 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
742 metadata: &MetadataVersion,
743) -> Result<(), ArrowError> {
744 read_dictionary_impl(
745 buf,
746 batch,
747 schema,
748 dictionaries_by_id,
749 metadata,
750 false,
751 UnsafeFlag::new(),
752 )
753}
754
755fn read_dictionary_impl(
756 buf: &Buffer,
757 batch: crate::DictionaryBatch,
758 schema: &Schema,
759 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
760 metadata: &MetadataVersion,
761 require_alignment: bool,
762 skip_validation: UnsafeFlag,
763) -> Result<(), ArrowError> {
764 let id = batch.id();
765
766 let dictionary_values = get_dictionary_values(
767 buf,
768 batch,
769 schema,
770 dictionaries_by_id,
771 metadata,
772 require_alignment,
773 skip_validation,
774 )?;
775
776 update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;
777
778 Ok(())
779}
780
781fn update_dictionaries(
790 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
791 is_delta: bool,
792 dict_id: i64,
793 dict_values: ArrayRef,
794) -> Result<(), ArrowError> {
795 if !is_delta {
796 dictionaries_by_id.insert(dict_id, dict_values.clone());
800 return Ok(());
801 }
802
803 let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
804 ArrowError::InvalidArgumentError(format!(
805 "No existing dictionary for delta dictionary with id '{dict_id}'"
806 ))
807 })?;
808
809 let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
810 ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
811 })?;
812
813 dictionaries_by_id.insert(dict_id, combined);
814
815 Ok(())
816}
817
818fn get_dictionary_values(
822 buf: &Buffer,
823 batch: crate::DictionaryBatch,
824 schema: &Schema,
825 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
826 metadata: &MetadataVersion,
827 require_alignment: bool,
828 skip_validation: UnsafeFlag,
829) -> Result<ArrayRef, ArrowError> {
830 let id = batch.id();
831 #[allow(deprecated)]
832 let fields_using_this_dictionary = schema.fields_with_dict_id(id);
833 let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
834 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
835 })?;
836
837 let dictionary_values: ArrayRef = match first_field.data_type() {
841 DataType::Dictionary(_, value_type) => {
842 let value = value_type.as_ref().clone();
844 let schema = Schema::new(vec![Field::new("", value, true)]);
845 let record_batch = RecordBatchDecoder::try_new(
847 buf,
848 batch.data().unwrap(),
849 Arc::new(schema),
850 dictionaries_by_id,
851 metadata,
852 )?
853 .with_require_alignment(require_alignment)
854 .with_skip_validation(skip_validation)
855 .read_record_batch()?;
856
857 Some(record_batch.column(0).clone())
858 }
859 _ => None,
860 }
861 .ok_or_else(|| {
862 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
863 })?;
864
865 Ok(dictionary_values)
866}
867
868fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
870 reader.seek(SeekFrom::Start(block.offset() as u64))?;
871 let body_len = block.bodyLength().to_usize().unwrap();
872 let metadata_len = block.metaDataLength().to_usize().unwrap();
873 let total_len = body_len.checked_add(metadata_len).unwrap();
874
875 let mut buf = MutableBuffer::from_len_zeroed(total_len);
876 reader.read_exact(&mut buf)?;
877 Ok(buf.into())
878}
879
880fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
884 let buf = match buf[..4] == CONTINUATION_MARKER {
885 true => &buf[8..],
886 false => &buf[4..],
887 };
888 crate::root_as_message(buf)
889 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
890}
891
892pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
896 if buf[4..] != super::ARROW_MAGIC {
897 return Err(ArrowError::ParseError(
898 "Arrow file does not contain correct footer".to_string(),
899 ));
900 }
901
902 let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
904 footer_len
905 .try_into()
906 .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
907}
908
/// A low-level, push-based decoder for the Arrow IPC file format:
/// the caller supplies the bytes for each block and this type decodes them.
#[derive(Debug)]
pub struct FileDecoder {
    /// The schema of the file being decoded
    schema: SchemaRef,
    /// Dictionaries decoded so far, keyed by dictionary id
    dictionaries: HashMap<i64, ArrayRef>,
    /// The expected IPC metadata version of the file
    version: MetadataVersion,
    /// Optional column indices to decode (others are skipped)
    projection: Option<Vec<usize>>,
    /// Whether buffers must already be aligned; forwarded to the batch decoder
    require_alignment: bool,
    /// Whether to skip validation of decoded data (defaults to false)
    skip_validation: UnsafeFlag,
}
982
983impl FileDecoder {
984 pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
986 Self {
987 schema,
988 version,
989 dictionaries: Default::default(),
990 projection: None,
991 require_alignment: false,
992 skip_validation: UnsafeFlag::new(),
993 }
994 }
995
996 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
998 self.projection = Some(projection);
999 self
1000 }
1001
1002 pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
1015 self.require_alignment = require_alignment;
1016 self
1017 }
1018
1019 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1030 unsafe { self.skip_validation.set(skip_validation) };
1031 self
1032 }
1033
1034 fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
1035 let message = parse_message(buf)?;
1036
1037 if self.version != MetadataVersion::V1 && message.version() != self.version {
1039 return Err(ArrowError::IpcError(
1040 "Could not read IPC message as metadata versions mismatch".to_string(),
1041 ));
1042 }
1043 Ok(message)
1044 }
1045
1046 pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
1048 let message = self.read_message(buf)?;
1049 match message.header_type() {
1050 crate::MessageHeader::DictionaryBatch => {
1051 let batch = message.header_as_dictionary_batch().unwrap();
1052 read_dictionary_impl(
1053 &buf.slice(block.metaDataLength() as _),
1054 batch,
1055 &self.schema,
1056 &mut self.dictionaries,
1057 &message.version(),
1058 self.require_alignment,
1059 self.skip_validation.clone(),
1060 )
1061 }
1062 t => Err(ArrowError::ParseError(format!(
1063 "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
1064 ))),
1065 }
1066 }
1067
1068 pub fn read_record_batch(
1070 &self,
1071 block: &Block,
1072 buf: &Buffer,
1073 ) -> Result<Option<RecordBatch>, ArrowError> {
1074 let message = self.read_message(buf)?;
1075 match message.header_type() {
1076 crate::MessageHeader::Schema => Err(ArrowError::IpcError(
1077 "Not expecting a schema when messages are read".to_string(),
1078 )),
1079 crate::MessageHeader::RecordBatch => {
1080 let batch = message.header_as_record_batch().ok_or_else(|| {
1081 ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1082 })?;
1083 RecordBatchDecoder::try_new(
1085 &buf.slice(block.metaDataLength() as _),
1086 batch,
1087 self.schema.clone(),
1088 &self.dictionaries,
1089 &message.version(),
1090 )?
1091 .with_projection(self.projection.as_deref())
1092 .with_require_alignment(self.require_alignment)
1093 .with_skip_validation(self.skip_validation.clone())
1094 .read_record_batch()
1095 .map(Some)
1096 }
1097 crate::MessageHeader::NONE => Ok(None),
1098 t => Err(ArrowError::InvalidArgumentError(format!(
1099 "Reading types other than record batches not yet supported, unable to read {t:?}"
1100 ))),
1101 }
1102 }
1103}
1104
/// Builder for configuring and constructing a [`FileReader`].
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional column indices to read; other columns are skipped
    projection: Option<Vec<usize>>,
    /// Max flatbuffers tables allowed when verifying the footer
    /// (passed to [`VerifierOptions::max_tables`])
    max_footer_fb_tables: usize,
    /// Max flatbuffers nesting depth allowed when verifying the footer
    /// (passed to [`VerifierOptions::max_depth`])
    max_footer_fb_depth: usize,
}
1115
1116impl Default for FileReaderBuilder {
1117 fn default() -> Self {
1118 let verifier_options = VerifierOptions::default();
1119 Self {
1120 max_footer_fb_tables: verifier_options.max_tables,
1121 max_footer_fb_depth: verifier_options.max_depth,
1122 projection: None,
1123 }
1124 }
1125}
1126
1127impl FileReaderBuilder {
1128 pub fn new() -> Self {
1132 Self::default()
1133 }
1134
1135 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1137 self.projection = Some(projection);
1138 self
1139 }
1140
1141 pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
1154 self.max_footer_fb_tables = max_footer_fb_tables;
1155 self
1156 }
1157
1158 pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
1171 self.max_footer_fb_depth = max_footer_fb_depth;
1172 self
1173 }
1174
1175 pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
1177 let mut buffer = [0; 10];
1179 reader.seek(SeekFrom::End(-10))?;
1180 reader.read_exact(&mut buffer)?;
1181
1182 let footer_len = read_footer_length(buffer)?;
1183
1184 let mut footer_data = vec![0; footer_len];
1186 reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
1187 reader.read_exact(&mut footer_data)?;
1188
1189 let verifier_options = VerifierOptions {
1190 max_tables: self.max_footer_fb_tables,
1191 max_depth: self.max_footer_fb_depth,
1192 ..Default::default()
1193 };
1194 let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
1195 |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
1196 )?;
1197
1198 let blocks = footer.recordBatches().ok_or_else(|| {
1199 ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
1200 })?;
1201
1202 let total_blocks = blocks.len();
1203
1204 let ipc_schema = footer.schema().unwrap();
1205 if !ipc_schema.endianness().equals_to_target_endianness() {
1206 return Err(ArrowError::IpcError(
1207 "the endianness of the source system does not match the endianness of the target system.".to_owned()
1208 ));
1209 }
1210
1211 let schema = crate::convert::fb_to_schema(ipc_schema);
1212
1213 let mut custom_metadata = HashMap::new();
1214 if let Some(fb_custom_metadata) = footer.custom_metadata() {
1215 for kv in fb_custom_metadata.into_iter() {
1216 custom_metadata.insert(
1217 kv.key().unwrap().to_string(),
1218 kv.value().unwrap().to_string(),
1219 );
1220 }
1221 }
1222
1223 let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
1224 if let Some(projection) = self.projection {
1225 decoder = decoder.with_projection(projection)
1226 }
1227
1228 if let Some(dictionaries) = footer.dictionaries() {
1230 for block in dictionaries {
1231 let buf = read_block(&mut reader, block)?;
1232 decoder.read_dictionary(block, &buf)?;
1233 }
1234 }
1235
1236 Ok(FileReader {
1237 reader,
1238 blocks: blocks.iter().copied().collect(),
1239 current_block: 0,
1240 total_blocks,
1241 decoder,
1242 custom_metadata,
1243 })
1244 }
1245}
1246
/// Reader for the Arrow IPC file format, providing random access to the
/// record batches listed in the file footer.
pub struct FileReader<R> {
    /// The underlying byte source
    reader: R,

    /// Decoder holding the schema, dictionaries and decode options
    decoder: FileDecoder,

    /// Record batch blocks (offset/length entries) from the footer
    blocks: Vec<Block>,

    /// Index of the next block to read
    current_block: usize,

    /// Total number of record batch blocks in the file
    total_blocks: usize,

    /// Custom metadata from the file footer
    custom_metadata: HashMap<String, String>,
}
1312
1313impl<R> fmt::Debug for FileReader<R> {
1314 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1315 f.debug_struct("FileReader<R>")
1316 .field("decoder", &self.decoder)
1317 .field("blocks", &self.blocks)
1318 .field("current_block", &self.current_block)
1319 .field("total_blocks", &self.total_blocks)
1320 .finish_non_exhaustive()
1321 }
1322}
1323
1324impl<R: Read + Seek> FileReader<BufReader<R>> {
1325 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1329 Self::try_new(BufReader::new(reader), projection)
1330 }
1331}
1332
1333impl<R: Read + Seek> FileReader<R> {
1334 pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1345 let builder = FileReaderBuilder {
1346 projection,
1347 ..Default::default()
1348 };
1349 builder.build(reader)
1350 }
1351
1352 pub fn custom_metadata(&self) -> &HashMap<String, String> {
1354 &self.custom_metadata
1355 }
1356
1357 pub fn num_batches(&self) -> usize {
1359 self.total_blocks
1360 }
1361
1362 pub fn schema(&self) -> SchemaRef {
1364 self.decoder.schema.clone()
1365 }
1366
1367 pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1371 if index >= self.total_blocks {
1372 Err(ArrowError::InvalidArgumentError(format!(
1373 "Cannot set batch to index {} from {} total batches",
1374 index, self.total_blocks
1375 )))
1376 } else {
1377 self.current_block = index;
1378 Ok(())
1379 }
1380 }
1381
1382 fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1383 let block = &self.blocks[self.current_block];
1384 self.current_block += 1;
1385
1386 let buffer = read_block(&mut self.reader, block)?;
1388 self.decoder.read_record_batch(block, &buffer)
1389 }
1390
1391 pub fn get_ref(&self) -> &R {
1395 &self.reader
1396 }
1397
1398 pub fn get_mut(&mut self) -> &mut R {
1402 &mut self.reader
1403 }
1404
1405 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1411 self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
1412 self
1413 }
1414}
1415
1416impl<R: Read + Seek> Iterator for FileReader<R> {
1417 type Item = Result<RecordBatch, ArrowError>;
1418
1419 fn next(&mut self) -> Option<Self::Item> {
1420 if self.current_block < self.total_blocks {
1422 self.maybe_next().transpose()
1423 } else {
1424 None
1425 }
1426 }
1427}
1428
impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
    fn schema(&self) -> SchemaRef {
        // Delegates to the inherent `FileReader::schema` (inherent methods
        // take precedence over the trait method here).
        self.schema()
    }
}
1434
/// Reader for the Arrow IPC streaming format: batches are read in order as
/// messages arrive (no random access).
pub struct StreamReader<R> {
    /// Message reader wrapping the underlying byte stream
    reader: MessageReader<R>,

    /// Schema decoded from the stream's first message
    schema: SchemaRef,

    /// Dictionaries decoded so far, keyed by dictionary id
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// True once the stream is finished (initialized to false here;
    /// set by the stream-consumption logic outside this view)
    finished: bool,

    /// Optional projection: the column indices plus the projected schema
    projection: Option<(Vec<usize>, Schema)>,

    /// Whether to skip validation of decoded data (defaults to false)
    skip_validation: UnsafeFlag,
}
1493
// Manual impl: `R` need not be `Debug`, so the reader is rendered as a
// placeholder string. (`skip_validation` is intentionally not shown.)
impl<R> fmt::Debug for StreamReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
        f.debug_struct("StreamReader<R>")
            .field("reader", &"R")
            .field("schema", &self.schema)
            .field("dictionaries_by_id", &self.dictionaries_by_id)
            .field("finished", &self.finished)
            .field("projection", &self.projection)
            .finish()
    }
}
1505
impl<R: Read> StreamReader<BufReader<R>> {
    /// Creates a `StreamReader` that wraps `reader` in a [`BufReader`]
    /// so reads from the underlying source are buffered.
    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        Self::try_new(BufReader::new(reader), projection)
    }
}
1514
impl<R: Read> StreamReader<R> {
    /// Creates a stream reader over `reader`, optionally projecting to
    /// the columns named by `projection` (indices into the schema).
    ///
    /// # Errors
    /// Fails if the stream is empty or does not begin with a schema
    /// message, as the IPC stream format requires.
    pub fn try_new(
        reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<StreamReader<R>, ArrowError> {
        let mut msg_reader = MessageReader::new(reader);
        let message = msg_reader.maybe_next()?;
        let Some((message, _)) = message else {
            return Err(ArrowError::IpcError(
                "Expected schema message, found empty stream.".to_string(),
            ));
        };

        // The first message of a stream must be the schema.
        if message.header_type() != Message::MessageHeader::Schema {
            return Err(ArrowError::IpcError(format!(
                "Expected a schema as the first message in the stream, got: {:?}",
                message.header_type()
            )));
        }

        let schema = message.header_as_schema().ok_or_else(|| {
            ArrowError::ParseError("Failed to parse schema from message header".to_string())
        })?;
        let schema = crate::convert::fb_to_schema(schema);

        let dictionaries_by_id = HashMap::new();

        // Compute the projected schema up front; `?` surfaces any
        // out-of-range projection index immediately.
        let projection = match projection {
            Some(projection_indices) => {
                let schema = schema.project(&projection_indices)?;
                Some((projection_indices, schema))
            }
            _ => None,
        };

        Ok(Self {
            reader: msg_reader,
            schema: Arc::new(schema),
            finished: false,
            dictionaries_by_id,
            projection,
            skip_validation: UnsafeFlag::new(),
        })
    }

    /// Deprecated alias of [`Self::try_new`]; buffering is now the
    /// caller's choice (see `try_new_buffered`).
    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
    pub fn try_new_unbuffered(
        reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<Self, ArrowError> {
        Self::try_new(reader, projection)
    }

    /// Returns the stream's schema (cheap `Arc` clone).
    ///
    /// NOTE(review): this is the full stream schema, not the projected
    /// one — confirm against callers if a projection is set.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Returns `true` once the end of the stream has been reached.
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    /// Reads messages until the next record batch, skipping dictionary
    /// batches (their side effects are applied in `next_ipc_message`).
    ///
    /// Returns `Ok(None)` at end-of-stream and latches `finished`.
    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        if self.finished {
            return Ok(None);
        }

        loop {
            let message = self.next_ipc_message()?;
            let Some(message) = message else {
                self.finished = true;
                return Ok(None);
            };

            match message {
                IpcMessage::Schema(_) => {
                    // A second schema mid-stream is invalid.
                    return Err(ArrowError::IpcError(
                        "Expected a record batch, but found a schema".to_string(),
                    ));
                }
                IpcMessage::RecordBatch(record_batch) => {
                    return Ok(Some(record_batch));
                }
                IpcMessage::DictionaryBatch { .. } => {
                    // Dictionary state was already updated; keep reading.
                    continue;
                }
            };
        }
    }

    /// Reads and decodes the next IPC message of any supported kind.
    ///
    /// Dictionary batches also update `dictionaries_by_id` as a side
    /// effect so later record batches can resolve their dictionaries.
    /// Returns `Ok(None)` at end-of-stream.
    pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
        let message = self.reader.maybe_next()?;
        let Some((message, body)) = message else {
            return Ok(None);
        };

        let ipc_message = match message.header_type() {
            Message::MessageHeader::Schema => {
                let schema = message.header_as_schema().ok_or_else(|| {
                    ArrowError::ParseError("Failed to parse schema from message header".to_string())
                })?;
                let arrow_schema = crate::convert::fb_to_schema(schema);
                IpcMessage::Schema(arrow_schema)
            }
            Message::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;

                let version = message.version();
                let schema = self.schema.clone();
                // Streams need not align buffers, hence
                // `with_require_alignment(false)`.
                let record_batch = RecordBatchDecoder::try_new(
                    &body.into(),
                    batch,
                    schema,
                    &self.dictionaries_by_id,
                    &version,
                )?
                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
                .with_require_alignment(false)
                .with_skip_validation(self.skip_validation.clone())
                .read_record_batch()?;
                IpcMessage::RecordBatch(record_batch)
            }
            Message::MessageHeader::DictionaryBatch => {
                let dict = message.header_as_dictionary_batch().ok_or_else(|| {
                    ArrowError::ParseError(
                        "Failed to parse dictionary batch from message header".to_string(),
                    )
                })?;

                let version = message.version();
                let dict_values = get_dictionary_values(
                    &body.into(),
                    dict,
                    &self.schema,
                    &mut self.dictionaries_by_id,
                    &version,
                    false,
                    self.skip_validation.clone(),
                )?;

                // Apply the (possibly delta) dictionary to our state.
                update_dictionaries(
                    &mut self.dictionaries_by_id,
                    dict.isDelta(),
                    dict.id(),
                    dict_values.clone(),
                )?;

                IpcMessage::DictionaryBatch {
                    id: dict.id(),
                    is_delta: (dict.isDelta()),
                    values: (dict_values),
                }
            }
            x => {
                return Err(ArrowError::ParseError(format!(
                    "Unsupported message header type in IPC stream: '{x:?}'"
                )));
            }
        };

        Ok(Some(ipc_message))
    }

    /// Returns a shared reference to the underlying reader.
    pub fn get_ref(&self) -> &R {
        self.reader.inner()
    }

    /// Returns a mutable reference to the underlying reader.
    ///
    /// Reading from the returned reader directly may desynchronize the
    /// stream's message framing.
    pub fn get_mut(&mut self) -> &mut R {
        self.reader.inner_mut()
    }

    /// Enables or disables validation of decoded Arrow data.
    ///
    /// # Safety
    /// The caller must guarantee the stream contains valid Arrow IPC
    /// data; with validation skipped, malformed input can produce arrays
    /// that violate Arrow invariants.
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        // SAFETY: upheld by this function's own safety contract.
        unsafe { self.skip_validation.set(skip_validation) };
        self
    }
}
1728
1729impl<R: Read> Iterator for StreamReader<R> {
1730 type Item = Result<RecordBatch, ArrowError>;
1731
1732 fn next(&mut self) -> Option<Self::Item> {
1733 self.maybe_next().transpose()
1734 }
1735}
1736
1737impl<R: Read> RecordBatchReader for StreamReader<R> {
1738 fn schema(&self) -> SchemaRef {
1739 self.schema.clone()
1740 }
1741}
1742
/// A single decoded message from an Arrow IPC stream.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum IpcMessage {
    /// A schema message, converted to an Arrow schema.
    Schema(arrow_schema::Schema),
    /// A fully decoded record batch.
    RecordBatch(RecordBatch),
    /// A dictionary batch: the dictionary values decoded from the
    /// message, with its id and whether it was a delta dictionary.
    DictionaryBatch {
        id: i64,
        is_delta: bool,
        values: ArrayRef,
    },
}
1759
/// Splits a raw byte stream into framed IPC messages (flatbuffer
/// metadata plus body bytes).
struct MessageReader<R> {
    reader: R,
    /// Scratch buffer reused across calls for metadata bytes.
    buf: Vec<u8>,
}
1766
1767impl<R: Read> MessageReader<R> {
1768 fn new(reader: R) -> Self {
1769 Self {
1770 reader,
1771 buf: Vec::new(),
1772 }
1773 }
1774
1775 fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1786 let meta_len = self.read_meta_len()?;
1787 let Some(meta_len) = meta_len else {
1788 return Ok(None);
1789 };
1790
1791 self.buf.resize(meta_len, 0);
1792 self.reader.read_exact(&mut self.buf)?;
1793
1794 let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1795 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1796 })?;
1797
1798 let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1799 self.reader.read_exact(&mut buf)?;
1800
1801 Ok(Some((message, buf)))
1802 }
1803
1804 fn inner_mut(&mut self) -> &mut R {
1806 &mut self.reader
1807 }
1808
1809 fn inner(&self) -> &R {
1811 &self.reader
1812 }
1813
1814 pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1823 let mut meta_len: [u8; 4] = [0; 4];
1824 match self.reader.read_exact(&mut meta_len) {
1825 Ok(_) => {}
1826 Err(e) => {
1827 return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1828 Ok(None)
1832 } else {
1833 Err(ArrowError::from(e))
1834 };
1835 }
1836 };
1837
1838 let meta_len = {
1839 if meta_len == CONTINUATION_MARKER {
1842 self.reader.read_exact(&mut meta_len)?;
1843 }
1844
1845 i32::from_le_bytes(meta_len)
1846 };
1847
1848 if meta_len == 0 {
1849 return Ok(None);
1850 }
1851
1852 let meta_len = usize::try_from(meta_len)
1853 .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1854
1855 Ok(Some(meta_len))
1856 }
1857}
1858
1859#[cfg(test)]
1860mod tests {
1861 use std::io::Cursor;
1862
1863 use crate::convert::fb_to_schema;
1864 use crate::writer::{
1865 DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1866 };
1867
1868 use super::*;
1869
1870 use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1871 use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1872 use arrow_array::types::*;
1873 use arrow_buffer::{NullBuffer, OffsetBuffer};
1874 use arrow_data::ArrayDataBuilder;
1875
    /// Builds a schema covering many column kinds (primitive, nested,
    /// union, run-encoded, dictionary) so projection can be exercised
    /// against each.
    fn create_test_projection_schema() -> Schema {
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));

        let fixed_size_list_data_type =
            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);

        let union_fields = UnionFields::from_fields(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Float64, false),
        ]);

        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);

        let struct_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
        ]);
        let struct_data_type = DataType::Struct(struct_fields);

        let run_encoded_data_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int16, false)),
            Arc::new(Field::new("values", DataType::Int32, true)),
        );

        Schema::new(vec![
            Field::new("f0", DataType::UInt32, false),
            Field::new("f1", DataType::Utf8, false),
            Field::new("f2", DataType::Boolean, false),
            Field::new("f3", union_data_type, true),
            Field::new("f4", DataType::Null, true),
            Field::new("f5", DataType::Float64, true),
            Field::new("f6", list_data_type, false),
            Field::new("f7", DataType::FixedSizeBinary(3), true),
            Field::new("f8", fixed_size_list_data_type, false),
            Field::new("f9", struct_data_type, false),
            Field::new("f10", run_encoded_data_type, false),
            Field::new("f11", DataType::Boolean, false),
            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
            Field::new("f13", DataType::Utf8, false),
        ])
    }
1919
    /// Builds a 3-row batch matching `create_test_projection_schema`,
    /// with distinctive values per column so projected results can be
    /// told apart.
    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
        let array0 = UInt32Array::from(vec![1, 2, 3]);
        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
        let array2 = BooleanArray::from(vec![true, false, true]);

        let mut union_builder = UnionBuilder::new_dense();
        union_builder.append::<Int32Type>("a", 1).unwrap();
        union_builder.append::<Float64Type>("b", 10.1).unwrap();
        union_builder.append_null::<Float64Type>("b").unwrap();
        let array3 = union_builder.build().unwrap();

        let array4 = NullArray::new(3);
        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
        let array6_values = vec![
            Some(vec![Some(10), Some(10), Some(10)]),
            Some(vec![Some(20), Some(20), Some(20)]),
            Some(vec![Some(30), Some(30)]),
        ];
        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();

        // FixedSizeList built from raw ArrayData: 9 child values = 3 rows x 3.
        let array8_values = ArrayData::builder(DataType::Int32)
            .len(9)
            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
            .build()
            .unwrap();
        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
            .len(3)
            .add_child_data(array8_values)
            .build()
            .unwrap();
        let array8 = FixedSizeListArray::from(array8_data);

        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
        let array9_list: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
                Some(vec![Some(-10)]),
                Some(vec![Some(-20), Some(-20), Some(-20)]),
                Some(vec![Some(-30)]),
            ]));
        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
            .add_child_data(array9_id.into_data())
            .add_child_data(array9_list.into_data())
            .len(3)
            .build()
            .unwrap();
        let array9 = StructArray::from(array9);

        let array10_input = vec![Some(1_i32), None, None];
        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        array10_builder.extend(array10_input);
        let array10 = array10_builder.finish();

        let array11 = BooleanArray::from(vec![false, false, true]);

        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));

        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);

        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array0),
                Arc::new(array1),
                Arc::new(array2),
                Arc::new(array3),
                Arc::new(array4),
                Arc::new(array5),
                Arc::new(array6),
                Arc::new(array7),
                Arc::new(array8),
                Arc::new(array9),
                Arc::new(array10),
                Arc::new(array11),
                Arc::new(array12),
                Arc::new(array13),
            ],
        )
        .unwrap()
    }
2005
    /// A negative metadata length at the very start of a stream must be
    /// rejected with a parse error (not a panic or huge allocation).
    #[test]
    fn test_negative_meta_len_start_stream() {
        let bytes = i32::to_le_bytes(-1);
        let mut buf = vec![];
        buf.extend(CONTINUATION_MARKER);
        buf.extend(bytes);

        let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
        assert!(reader_err.is_some());
        assert_eq!(
            reader_err.unwrap().to_string(),
            "Parser error: Invalid metadata length: -1"
        );
    }

    /// Same corruption as above, but appended after a valid schema and
    /// batch so the failure occurs mid-stream rather than at open.
    #[test]
    fn test_negative_meta_len_mid_stream() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let mut buf = Vec::new();
        {
            let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
            let batch =
                RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
                    .unwrap();
            writer.write(&batch).unwrap();
        }

        // Append a corrupted length prefix after the valid batch.
        let bytes = i32::to_le_bytes(-1);
        buf.extend(CONTINUATION_MARKER);
        buf.extend(bytes);

        let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
        // First read succeeds (the valid batch) ...
        assert!(reader.maybe_next().is_ok());
        // ... the second hits the corrupted prefix.
        let batch_err = reader.maybe_next().err();
        assert!(batch_err.is_some());
        assert_eq!(
            batch_err.unwrap().to_string(),
            "Parser error: Invalid metadata length: -1"
        );
    }
2048
    /// A record batch message that declares fewer buffers than the field
    /// layout needs must fail with a clear error, not panic.
    #[test]
    fn test_missing_buffer_metadata_error() {
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));

        // Hand-build flatbuffer metadata with a single buffer; a nullable
        // Int32 column requires more (validity + values).
        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
        let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 2,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        let batch_bytes = fbb.finished_data().to_vec();
        // NOTE: `RecordBatch` here is the flatbuffers-generated type from
        // the glob import above, not `arrow_array::RecordBatch`.
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        let data_buffer = Buffer::from(vec![0u8; 8]);
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder = RecordBatchDecoder::try_new(
            &data_buffer,
            batch,
            schema.clone(),
            &dictionaries,
            &metadata,
        )
        .unwrap();

        let result = decoder.read_record_batch();

        match result {
            Err(ArrowError::IpcError(msg)) => {
                assert_eq!(msg, "Buffer count mismatched with metadata");
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
2097
    /// Each single-column projection, and a reordered multi-column
    /// projection, must yield the same values as the source batch.
    #[test]
    fn test_projection_array_values() {
        let schema = create_test_projection_schema();

        let batch = create_test_projection_batch_data(&schema);

        let mut buf = Vec::new();
        {
            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        // Project each of the first 12 columns on its own.
        // NOTE(review): columns 12 (dictionary) and 13 are excluded here —
        // presumably covered elsewhere; confirm.
        for index in 0..12 {
            let projection = vec![index];
            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
            let read_batch = reader.unwrap().next().unwrap().unwrap();
            let projected_column = read_batch.column(0);
            let expected_column = batch.column(index);

            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
        }

        {
            // A multi-column projection in non-ascending order.
            let reader =
                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
            let read_batch = reader.unwrap().next().unwrap().unwrap();
            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
            assert_eq!(read_batch, expected_batch);
        }
    }
2135
    /// Round-trips a single-row batch through the stream format via a
    /// real temp file, both without and with a projection.
    #[test]
    fn test_arrow_single_float_row() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
        ]);
        let arrays = vec![
            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
        ];
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
        let mut file = tempfile::tempfile().unwrap();
        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
        stream_writer.write(&batch).unwrap();
        stream_writer.finish().unwrap();

        drop(stream_writer);

        file.rewind().unwrap();

        // Full read: both float columns must carry non-zero values.
        let reader = StreamReader::try_new(&mut file, None).unwrap();

        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert!(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
            assert!(
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
        });

        file.rewind().unwrap();

        // Projected read: only columns 0 and 3 remain.
        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();

        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().fields().len(), 2);
            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
        });
    }
2198
    /// Serializes a batch to the file (footer) IPC format.
    fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        buf
    }

    /// Reads the first batch back from file-format bytes.
    fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
        reader.next().unwrap()
    }

    /// Like `read_ipc`, but with data validation disabled (unsafe path).
    fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
        let mut reader = unsafe {
            FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
        };
        reader.next().unwrap()
    }

    /// Write-then-read through the file format; panics on any error.
    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
        let buf = write_ipc(rb);
        read_ipc(&buf).unwrap()
    }

    /// Reads file-format bytes via the low-level `FileDecoder` API.
    fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
        read_ipc_with_decoder_inner(buf, false)
    }

    /// `FileDecoder` variant with validation skipped.
    fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
        read_ipc_with_decoder_inner(buf, true)
    }

    /// Drives `FileDecoder` by hand: locates the footer, replays all
    /// dictionary blocks, then decodes the single record batch block.
    fn read_ipc_with_decoder_inner(
        buf: Vec<u8>,
        skip_validation: bool,
    ) -> Result<RecordBatch, ArrowError> {
        let buffer = Buffer::from_vec(buf);
        // Trailer = 4-byte footer length + 6-byte "ARROW1" magic.
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;

        let schema = fb_to_schema(footer.schema().unwrap());

        let mut decoder = unsafe {
            FileDecoder::new(Arc::new(schema), footer.version())
                .with_skip_validation(skip_validation)
        };
        // Dictionaries must be fed to the decoder before any batch.
        for block in footer.dictionaries().iter().flatten() {
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);
            decoder.read_dictionary(block, &data)?
        }

        let batches = footer.recordBatches().unwrap();
        // This helper only supports single-batch files.
        assert_eq!(batches.len(), 1);
        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);
        Ok(decoder.read_record_batch(block, &data)?.unwrap())
    }

    /// Serializes a batch to the streaming IPC format.
    fn write_stream(rb: &RecordBatch) -> Vec<u8> {
        let mut buf = Vec::new();
        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        buf
    }

    /// Reads the first batch back from stream-format bytes.
    fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
        reader.next().unwrap()
    }

    /// Like `read_stream`, but with data validation disabled.
    fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
        let mut reader = unsafe {
            StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
        };
        reader.next().unwrap()
    }

    /// Write-then-read through the stream format; panics on any error.
    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
        let buf = write_stream(rb);
        read_stream(&buf).unwrap()
    }
2301
    /// Custom key/value metadata written by the file writer must survive
    /// a round trip.
    #[test]
    fn test_roundtrip_with_custom_metadata() {
        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
        let mut test_metadata = HashMap::new();
        test_metadata.insert("abc".to_string(), "abc".to_string());
        test_metadata.insert("def".to_string(), "def".to_string());
        for (k, v) in &test_metadata {
            writer.write_metadata(k, v);
        }
        writer.finish().unwrap();
        drop(writer);

        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
        assert_eq!(reader.custom_metadata(), &test_metadata);
    }

    /// A dictionary nested inside a struct must round-trip through the
    /// file format.
    #[test]
    fn test_roundtrip_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        assert_eq!(batch, roundtrip_ipc(&batch));
    }

    /// Same nested-dictionary round trip, but via explicit writer
    /// options (default `IpcWriteOptions`, no preserved dict ids).
    #[test]
    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new_with_options(
            &mut buf,
            batch.schema_ref(),
            IpcWriteOptions::default(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();

        assert_eq!(batch, reader.next().unwrap().unwrap());
    }
2376
    /// Builds a 4-variant union (with a null entry) and asserts it
    /// round-trips through the file format intact.
    fn check_union_with_builder(mut builder: UnionBuilder) {
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        builder.append::<Int64Type>("d", 11).unwrap();
        let union = builder.build().unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let union_array = Arc::new(union) as ArrayRef;

        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
        let rb2 = roundtrip_ipc(&rb);
        // Compare piecewise (schema, shape, column) for clearer failures.
        assert_eq!(rb.schema(), rb2.schema());
        assert_eq!(rb.num_columns(), rb2.num_columns());
        assert_eq!(rb.num_rows(), rb2.num_rows());
        let union1 = rb.column(0);
        let union2 = rb2.column(0);

        assert_eq!(union1, union2);
    }

    /// Dense-union round trip.
    #[test]
    fn test_roundtrip_dense_union() {
        check_union_with_builder(UnionBuilder::new_dense());
    }

    /// Sparse-union round trip.
    #[test]
    fn test_roundtrip_sparse_union() {
        check_union_with_builder(UnionBuilder::new_sparse());
    }

    /// A struct with zero child fields (only a validity buffer) must
    /// round-trip.
    #[test]
    fn test_roundtrip_struct_empty_fields() {
        let nulls = NullBuffer::from(&[true, true, false]);
        let rb = RecordBatch::try_from_iter([(
            "",
            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
        )])
        .unwrap();
        let rb2 = roundtrip_ipc(&rb);
        assert_eq!(rb, rb2);
    }
2427
    /// Sliced run-end-encoded arrays must round-trip through the stream
    /// format; the writer re-bases the slice, so the read-back column is
    /// compared against the unsliced equivalent.
    #[test]
    fn test_roundtrip_stream_run_array_sliced() {
        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
            .into_iter()
            .collect();
        let run_array_1_sliced = run_array_1.slice(2, 5);

        let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2), Some(2)];
        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        run_array_2_builder.extend(run_array_2_inupt);
        let run_array_2 = run_array_2_builder.finish();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "run_array_1_sliced",
                run_array_1_sliced.data_type().clone(),
                false,
            ),
            Field::new("run_array_2", run_array_2.data_type().clone(), false),
        ]));
        let input_batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
        )
        .unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);

        // The unsliced column round-trips unchanged.
        assert_eq!(input_batch.column(1), output_batch.column(1));

        // The sliced column comes back in its unsliced form.
        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
    }

    /// A dictionary nested inside a struct must round-trip through the
    /// stream format.
    #[test]
    fn test_roundtrip_stream_nested_dict() {
        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
        let dict = Arc::new(
            xs.clone()
                .into_iter()
                .collect::<DictionaryArray<Int8Type>>(),
        );
        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
                string_array,
            ),
            (
                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
                dict.clone() as ArrayRef,
            ),
        ]);
        let schema = Arc::new(Schema::new(vec![
            Field::new("f1_string", DataType::Utf8, false),
            Field::new("f2_struct", struct_array.data_type().clone(), false),
        ]));
        let input_batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(xs.clone())),
                Arc::new(struct_array),
            ],
        )
        .unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2498
    /// Deeply nested dictionaries — a dictionary of maps whose keys and
    /// values are themselves dictionaries — must round-trip through the
    /// stream format.
    #[test]
    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
        let values = Arc::new(values) as ArrayRef;
        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());

        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, values);

        // `Field::new_dict` (with explicit dict ids) is deprecated but
        // still exercised here.
        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true, 1,
            false,
        ));
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        ));
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // Three map rows, two entries each.
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }

    /// Shared driver for "dictionary of (Large)List of dictionary"
    /// round-trip tests, generic over the list offset width.
    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
        OffsetSize: OffsetSizeTrait,
        U: ArrowNativeType,
    >(
        list_data_type: DataType,
        offsets: &[U; 5],
    ) {
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let value_offsets = Buffer::from_slice_ref(offsets);

        let list_data = ArrayData::builder(list_data_type)
            .len(4)
            .add_buffer(value_offsets)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = GenericListArray::<OffsetSize>::from(list_data);

        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2594
2595 #[test]
2596 fn test_roundtrip_stream_dict_of_list_of_dict() {
2597 #[allow(deprecated)]
2599 let list_data_type = DataType::List(Arc::new(Field::new_dict(
2600 "item",
2601 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2602 true,
2603 1,
2604 false,
2605 )));
2606 let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
2607 test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
2608
2609 #[allow(deprecated)]
2611 let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
2612 "item",
2613 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2614 true,
2615 1,
2616 false,
2617 )));
2618 let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
2619 test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
2620 }
2621
    /// Roundtrips a `Dictionary<Int8, FixedSizeList<Dictionary<Int8, Utf8>>>`
    /// through the IPC stream format.
    #[test]
    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
        // Inner dictionary-encoded string column: 9 keys into 4 values,
        // sized to fill three fixed-size lists of length 3.
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.into_data();

        #[allow(deprecated)]
        let list_data_type = DataType::FixedSizeList(
            Arc::new(Field::new_dict(
                "item",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
                1,
                false,
            )),
            3, // each list holds exactly 3 child entries (3 lists * 3 = 9 keys)
        );
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = FixedSizeListArray::from(list_data);

        // Outer dictionary whose values are the fixed-size-list array.
        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2659
    // Longer than 12 bytes, so view arrays cannot inline it in the view struct
    // and must reference it through a separate data buffer.
    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view array handles it";
2662
2663 #[test]
2664 fn test_roundtrip_view_types() {
2665 let schema = Schema::new(vec![
2666 Field::new("field_1", DataType::BinaryView, true),
2667 Field::new("field_2", DataType::Utf8, true),
2668 Field::new("field_3", DataType::Utf8View, true),
2669 ]);
2670 let bin_values: Vec<Option<&[u8]>> = vec![
2671 Some(b"foo"),
2672 None,
2673 Some(b"bar"),
2674 Some(LONG_TEST_STRING.as_bytes()),
2675 ];
2676 let utf8_values: Vec<Option<&str>> =
2677 vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2678 let bin_view_array = BinaryViewArray::from_iter(bin_values);
2679 let utf8_array = StringArray::from_iter(utf8_values.iter());
2680 let utf8_view_array = StringViewArray::from_iter(utf8_values);
2681 let record_batch = RecordBatch::try_new(
2682 Arc::new(schema.clone()),
2683 vec![
2684 Arc::new(bin_view_array),
2685 Arc::new(utf8_array),
2686 Arc::new(utf8_view_array),
2687 ],
2688 )
2689 .unwrap();
2690
2691 assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2692 assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2693
2694 let sliced_batch = record_batch.slice(1, 2);
2695 assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2696 assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2697 }
2698
    /// Roundtrips a dictionary-of-map array whose map keys and values are
    /// themselves dictionaries over `Utf8View` / `BinaryView` data, through
    /// both the file and stream IPC formats, including a sliced batch.
    #[test]
    fn test_roundtrip_view_types_nested_dict() {
        let bin_values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
            Some(b"field"),
        ];
        let utf8_values: Vec<Option<&str>> = vec![
            Some("foo"),
            None,
            Some("bar"),
            Some(LONG_TEST_STRING),
            Some("field"),
        ];
        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));

        // Map keys: Dictionary<Int8, Utf8View>, declared with dict id 1.
        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
            true,
            1,
            false,
        ));

        // Map values: Dictionary<Int8, BinaryView>, declared with dict id 2.
        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
            true,
            2,
            false,
        ));
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false, // keys are not flagged as sorted
        );
        // Three maps covering entries [0,2), [2,4), [4,7).
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        // Outer dictionary over the map array.
        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        assert_eq!(batch, roundtrip_ipc(&batch));
        assert_eq!(batch, roundtrip_ipc_stream(&batch));

        // A non-zero-offset slice must roundtrip as well.
        let sliced_batch = batch.slice(1, 2);
        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
    }
2776
2777 #[test]
2778 fn test_no_columns_batch() {
2779 let schema = Arc::new(Schema::empty());
2780 let options = RecordBatchOptions::new()
2781 .with_match_field_names(true)
2782 .with_row_count(Some(10));
2783 let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2784 let output_batch = roundtrip_ipc_stream(&input_batch);
2785 assert_eq!(input_batch, output_batch);
2786 }
2787
    /// Decoding a record batch from deliberately misaligned buffers succeeds
    /// when `require_alignment` is disabled.
    #[test]
    fn test_unaligned() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let r#gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        let (_, encoded) = r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Prepend a single byte and then slice it back off so the body data is
        // guaranteed to sit off an 8-byte boundary.
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let roundtrip = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(false)
        .read_record_batch()
        .unwrap();
        assert_eq!(batch, roundtrip);
    }
2830
    /// With `require_alignment` enabled, the same misaligned input must be
    /// rejected with a descriptive error instead of being decoded.
    #[test]
    fn test_unaligned_throws_error_with_require_alignment() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let r#gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        let (_, encoded) = r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Prepend a single byte and slice it back off to force misalignment.
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let result = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(true)
        .read_record_batch();

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
            offset from expected alignment of 4 by 1"
        );
    }
2878
    /// A schema with 600k columns requires raising `max_footer_fb_tables` on
    /// the reader for the footer flatbuffer to verify.
    #[test]
    fn test_file_with_massive_column_count() {
        let limit = 600_000;

        let fields = (0..limit)
            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
            .collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::new_empty(schema);

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReaderBuilder::new()
            .with_max_footer_fb_tables(1_500_000)
            .build(std::io::Cursor::new(buf))
            .unwrap();
        let roundtrip_batch = reader.next().unwrap().unwrap();

        assert_eq!(batch, roundtrip_batch);
    }
2904
    /// A schema nested 61 structs deep requires raising the footer verifier's
    /// depth limit via `with_max_footer_fb_depth`.
    #[test]
    fn test_file_with_deeply_nested_columns() {
        let limit = 61;

        // Wrap the boolean leaf field in `limit` levels of struct fields.
        let fields = (0..limit).fold(
            vec![Field::new("leaf", DataType::Boolean, false)],
            |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
        );
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::new_empty(schema);

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReaderBuilder::new()
            .with_max_footer_fb_depth(65)
            .build(std::io::Cursor::new(buf))
            .unwrap();
        let roundtrip_batch = reader.next().unwrap().unwrap();

        assert_eq!(batch, roundtrip_batch);
    }
2931
    /// A struct whose children have mismatched lengths (4 vs 3) must be
    /// rejected when read back through IPC with validation enabled.
    #[test]
    fn test_invalid_struct_array_ipc_read_errors() {
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Int32, false);
        let struct_fields = Fields::from(vec![a_field.clone(), b_field.clone()]);

        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        // Deliberately one element shorter than its sibling.
        let b_array_data = ArrayData::builder(b_field.data_type().clone())
            .len(3)
            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
            .build()
            .unwrap();

        // `new_unchecked` skips the child-length invariant, producing an array
        // that only IPC read validation will catch.
        let invalid_struct_arr = unsafe {
            StructArray::new_unchecked(
                struct_fields,
                vec![make_array(a_array_data), make_array(b_array_data)],
                None,
            )
        };

        expect_ipc_validation_error(
            Arc::new(invalid_struct_arr),
            "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
        );
    }
2962
    /// Invalid UTF-8 nested inside a struct child must still be caught by IPC
    /// read validation.
    #[test]
    fn test_invalid_nested_array_ipc_read_errors() {
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Utf8, false);

        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "s",
            vec![a_field.clone(), b_field.clone()],
            false,
        )]));

        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        // Build a "string" array whose last value contains invalid UTF-8 by
        // reinterpreting a binary array's buffers without validation.
        let b_array_data = {
            let valid: &[u8] = b" ";
            let mut invalid = vec![];
            invalid.extend_from_slice(b"ValidString");
            invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
            let binary_array =
                BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
            let array = unsafe {
                StringArray::new_unchecked(
                    binary_array.offsets().clone(),
                    binary_array.values().clone(),
                    binary_array.nulls().cloned(),
                )
            };
            array.into_data()
        };
        let struct_data_type = schema.field(0).data_type();

        // `build_unchecked` defers validation of the nested child to readers.
        let invalid_struct_arr = unsafe {
            make_array(
                ArrayData::builder(struct_data_type.clone())
                    .len(4)
                    .add_child_data(a_array_data)
                    .add_child_data(b_array_data)
                    .build_unchecked(),
            )
        };
        expect_ipc_validation_error(
            invalid_struct_arr,
            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
        );
    }
3013
    /// Two fields that both declare dictionary id 0 must still roundtrip
    /// through the stream writer/reader with default (non-preserving) options.
    #[test]
    fn test_same_dict_id_without_preserve() {
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(
                ["a", "b"]
                    .iter()
                    .map(|name| {
                        #[allow(deprecated)]
                        Field::new_dict(
                            name.to_string(),
                            DataType::Dictionary(
                                Box::new(DataType::Int32),
                                Box::new(DataType::Utf8),
                            ),
                            true,
                            0, // same dict id deliberately reused for both fields
                            false,
                        )
                    })
                    .collect::<Vec<Field>>(),
            )),
            vec![
                Arc::new(
                    vec![Some("c"), Some("d")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
                Arc::new(
                    vec![Some("e"), Some("f")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
            ],
        )
        .expect("Failed to create RecordBatch");

        // Write the batch with default stream options...
        let mut buf = vec![];
        {
            let mut writer = crate::writer::StreamWriter::try_new_with_options(
                &mut buf,
                batch.schema().as_ref(),
                crate::writer::IpcWriteOptions::default(),
            )
            .expect("Failed to create StreamWriter");
            writer.write(&batch).expect("Failed to write RecordBatch");
            writer.finish().expect("Failed to finish StreamWriter");
        }

        // ...and expect every decoded batch to match the original.
        StreamReader::try_new(std::io::Cursor::new(buf), None)
            .expect("Failed to create StreamReader")
            .for_each(|decoded_batch| {
                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
            });
    }
3069
3070 #[test]
3071 fn test_validation_of_invalid_list_array() {
3072 let array = unsafe {
3074 let values = Int32Array::from(vec![1, 2, 3]);
3075 let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]); let offsets = OffsetBuffer::new_unchecked(bad_offsets); let field = Field::new_list_field(DataType::Int32, true);
3078 let nulls = None;
3079 ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
3080 };
3081
3082 expect_ipc_validation_error(
3083 Arc::new(array),
3084 "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
3085 );
3086 }
3087
    /// Invalid UTF-8 smuggled into a `StringArray` via `new_unchecked` must be
    /// reported when the IPC data is read back with validation.
    #[test]
    fn test_validation_of_invalid_string_array() {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // Reinterpret the binary buffers as UTF-8 without checking.
        let array = unsafe {
            StringArray::new_unchecked(
                binary_array.offsets().clone(),
                binary_array.values().clone(),
                binary_array.nulls().cloned(),
            )
        };
        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
        );
    }
3109
    /// Invalid UTF-8 smuggled into a `StringViewArray` via `new_unchecked`
    /// must be reported when the IPC data is read back with validation.
    #[test]
    fn test_validation_of_invalid_string_view_array() {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        // Longer than 12 bytes so the value is stored out-of-line, not inlined
        // in the view struct.
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        let binary_view_array =
            BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // Reinterpret the views and data buffers as UTF-8 without checking.
        let array = unsafe {
            StringViewArray::new_unchecked(
                binary_view_array.views().clone(),
                binary_view_array.data_buffers().to_vec(),
                binary_view_array.nulls().cloned(),
            )
        };
        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
        );
    }
3132
3133 #[test]
3136 fn test_validation_of_invalid_dictionary_array() {
3137 let array = unsafe {
3138 let values = StringArray::from_iter_values(["a", "b", "c"]);
3139 let keys = Int32Array::from(vec![1, 200]); DictionaryArray::new_unchecked(keys, Arc::new(values))
3141 };
3142
3143 expect_ipc_validation_error(
3144 Arc::new(array),
3145 "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
3146 );
3147 }
3148
3149 #[test]
3150 fn test_validation_of_invalid_union_array() {
3151 let array = unsafe {
3152 let fields = UnionFields::try_new(
3153 vec![1, 3], vec![
3155 Field::new("a", DataType::Int32, false),
3156 Field::new("b", DataType::Utf8, false),
3157 ],
3158 )
3159 .unwrap();
3160 let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]); let offsets = None;
3162 let children: Vec<ArrayRef> = vec![
3163 Arc::new(Int32Array::from(vec![10, 20, 30])),
3164 Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
3165 ];
3166
3167 UnionArray::new_unchecked(fields, type_ids, offsets, children)
3168 };
3169
3170 expect_ipc_validation_error(
3171 Arc::new(array),
3172 "Invalid argument error: Type Ids values must match one of the field type ids",
3173 );
3174 }
3175
    // 0xa0 is a UTF-8 continuation byte and cannot begin a character, so any
    // string containing this sequence is invalid UTF-8.
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3179
3180 fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
3182 let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();
3183
3184 let buf = write_stream(&rb); read_stream_skip_validation(&buf).unwrap();
3187 let err = read_stream(&buf).unwrap_err();
3188 assert_eq!(err.to_string(), expected_err);
3189
3190 let buf = write_ipc(&rb); read_ipc_skip_validation(&buf).unwrap();
3193 let err = read_ipc(&buf).unwrap_err();
3194 assert_eq!(err.to_string(), expected_err);
3195
3196 read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
3198 let err = read_ipc_with_decoder(buf).unwrap_err();
3199 assert_eq!(err.to_string(), expected_err);
3200 }
3201
    /// Schema -> IPC message bytes -> schema roundtrip via `parse_message`,
    /// which must tolerate an optional continuation-marker prefix.
    #[test]
    fn test_roundtrip_schema() {
        let schema = Schema::new(vec![
            Field::new(
                "a",
                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new(
                "b",
                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
                false,
            ),
        ]);

        let options = IpcWriteOptions::default();
        let data_gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        let encoded_data =
            data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options);
        let mut schema_bytes = vec![];
        write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");

        // Skip the continuation marker, if present, for the raw flatbuffer read.
        let begin_offset: usize = if schema_bytes[0..4].eq(&CONTINUATION_MARKER) {
            4
        } else {
            0
        };

        // Treating the payload as a size-prefixed flatbuffer at this offset is
        // expected to fail.
        size_prefixed_root_as_message(&schema_bytes[begin_offset..])
            .expect_err("size_prefixed_root_as_message");

        // `parse_message` handles the framing itself and must succeed.
        let msg = parse_message(&schema_bytes).expect("parse_message");
        let ipc_schema = msg.header_as_schema().expect("header_as_schema");
        let new_schema = fb_to_schema(ipc_schema);

        assert_eq!(schema, new_schema);
    }
3240
3241 #[test]
3242 fn test_negative_meta_len() {
3243 let bytes = i32::to_le_bytes(-1);
3244 let mut buf = vec![];
3245 buf.extend(CONTINUATION_MARKER);
3246 buf.extend(bytes);
3247
3248 let reader = StreamReader::try_new(Cursor::new(buf), None);
3249 assert!(reader.is_err());
3250 }
3251}