1use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46pub use crate::compression::CompressionContext;
47use crate::convert::IpcSchemaEncoder;
48
/// Options that control how IPC messages are written
/// (alignment, metadata version, legacy framing, compression, and
/// dictionary handling).
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Pad buffers so each starts on a multiple of this value.
    /// Must be 8, 16, 32, or 64 (validated in `try_new`).
    alignment: u8,
    /// If `true`, emit the pre-0.15.0 message framing (4-byte length prefix,
    /// no continuation marker). Only permitted with metadata V4 (see `try_new`).
    write_legacy_ipc_format: bool,
    /// The IPC metadata (flatbuffer) version to write.
    metadata_version: crate::MetadataVersion,
    /// Optional body-buffer compression; requires metadata V5 or above
    /// (enforced in `try_with_compression`).
    batch_compression_type: Option<crate::CompressionType>,
    /// Controls whether changed dictionaries are re-sent in full or as deltas.
    dictionary_handling: DictionaryHandling,
}
72
73impl IpcWriteOptions {
74 pub fn try_with_compression(
79 mut self,
80 batch_compression_type: Option<crate::CompressionType>,
81 ) -> Result<Self, ArrowError> {
82 self.batch_compression_type = batch_compression_type;
83
84 if self.batch_compression_type.is_some()
85 && self.metadata_version < crate::MetadataVersion::V5
86 {
87 return Err(ArrowError::InvalidArgumentError(
88 "Compression only supported in metadata v5 and above".to_string(),
89 ));
90 }
91 Ok(self)
92 }
93 pub fn try_new(
95 alignment: usize,
96 write_legacy_ipc_format: bool,
97 metadata_version: crate::MetadataVersion,
98 ) -> Result<Self, ArrowError> {
99 let is_alignment_valid =
100 alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
101 if !is_alignment_valid {
102 return Err(ArrowError::InvalidArgumentError(
103 "Alignment should be 8, 16, 32, or 64.".to_string(),
104 ));
105 }
106 let alignment: u8 = u8::try_from(alignment).expect("range already checked");
107 match metadata_version {
108 crate::MetadataVersion::V1
109 | crate::MetadataVersion::V2
110 | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
111 "Writing IPC metadata version 3 and lower not supported".to_string(),
112 )),
113 #[allow(deprecated)]
114 crate::MetadataVersion::V4 => Ok(Self {
115 alignment,
116 write_legacy_ipc_format,
117 metadata_version,
118 batch_compression_type: None,
119 dictionary_handling: DictionaryHandling::default(),
120 }),
121 crate::MetadataVersion::V5 => {
122 if write_legacy_ipc_format {
123 Err(ArrowError::InvalidArgumentError(
124 "Legacy IPC format only supported on metadata version 4".to_string(),
125 ))
126 } else {
127 Ok(Self {
128 alignment,
129 write_legacy_ipc_format,
130 metadata_version,
131 batch_compression_type: None,
132 dictionary_handling: DictionaryHandling::default(),
133 })
134 }
135 }
136 z => Err(ArrowError::InvalidArgumentError(format!(
137 "Unsupported crate::MetadataVersion {z:?}"
138 ))),
139 }
140 }
141
142 pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
144 self.dictionary_handling = dictionary_handling;
145 self
146 }
147}
148
149impl Default for IpcWriteOptions {
150 fn default() -> Self {
151 Self {
152 alignment: 64,
153 write_legacy_ipc_format: false,
154 metadata_version: crate::MetadataVersion::V5,
155 batch_compression_type: None,
156 dictionary_handling: DictionaryHandling::default(),
157 }
158 }
159}
160
/// Handles low level details of encoding schemas, record batches and
/// dictionaries into IPC [`EncodedData`]; stateless — all state lives in the
/// [`DictionaryTracker`] and [`CompressionContext`] passed to its methods.
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}
196
impl IpcDataGenerator {
    /// Encodes `schema` into an IPC Schema message.
    ///
    /// The `dictionary_tracker` is threaded through the schema encoder;
    /// presumably it assigns dictionary IDs to dictionary fields during
    /// conversion (done inside `IpcSchemaEncoder`, not visible here — the
    /// IDs are later consumed by `encode` via `dict_ids`).
    /// Schema messages carry no body, so `arrow_data` is empty and
    /// `bodyLength` is 0.
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    /// Recursion helper for `encode_dictionaries`: descends into the children
    /// of nested types (structs, lists, maps, unions, run-end-encoded) so that
    /// any dictionary columns nested inside them are encoded. Leaf types fall
    /// through to the no-op arm.
    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
        compression_context: &mut CompressionContext,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                // Visit each child column paired with its field definition.
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                        compression_context,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                // REE arrays must have exactly [run_ends, values] children.
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // Only the values child (index 1) can contain dictionaries;
                // run ends (index 0) are plain integers.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::ListView(field) => {
                let list = column.as_list_view::<i32>();
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::LargeListView(field) => {
                let list = column.as_list_view::<i64>();
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                // A map's entries field is always a two-field struct
                // (key, value); anything else is a malformed type.
                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;

                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                // Each union variant has its own child array, looked up by
                // type id rather than positional index.
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                        compression_context,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    /// Encodes any dictionaries in `column` (including nested ones) into
    /// `encoded_dictionaries`, consulting `dictionary_tracker` to decide
    /// whether each dictionary must be written at all, re-sent in full, or
    /// written as a delta. Inner dictionaries are encoded before the outer
    /// one (depth-first), so readers see them in dependency order.
    #[allow(clippy::too_many_arguments)]
    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
        compression_context: &mut CompressionContext,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                // child_data[0] holds the dictionary's values array.
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                // Recurse first: the values themselves may contain
                // (nested) dictionaries that must be written before this one.
                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                    compression_context,
                )?;

                // IDs were pre-assigned (see `encode`); running out means the
                // schema and the batch disagree about dictionary fields.
                let dict_id = dict_id_seq.next().ok_or_else(|| {
                    ArrowError::IpcError(format!(
                        "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
                        field.name(),
                        field.data_type(),
                        column.data_type()
                    ))
                })?;

                match dictionary_tracker.insert_column(
                    dict_id,
                    column,
                    write_options.dictionary_handling,
                )? {
                    // Unchanged dictionary: nothing to write.
                    DictionaryUpdate::None => {}
                    // New or fully replaced: write the whole values array.
                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            dict_values,
                            write_options,
                            false,
                            compression_context,
                        )?);
                    }
                    // Grown dictionary: write only the new suffix, flagged
                    // as a delta batch.
                    DictionaryUpdate::Delta(data) => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            &data,
                            write_options,
                            true,
                            compression_context,
                        )?);
                    }
                }
            }
            // Non-dictionary types may still contain nested dictionaries.
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
                compression_context,
            )?,
        }

        Ok(())
    }

    /// Encodes `batch` into zero or more dictionary messages plus one record
    /// batch message, in the order they must be written to the stream.
    pub fn encode(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        compression_context: &mut CompressionContext,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        // Iterate over the dictionary IDs assigned while the schema was
        // encoded; the clone leaves the tracker's own list untouched.
        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
                compression_context,
            )?;
        }

        let encoded_message =
            self.record_batch_to_bytes(batch, write_options, compression_context)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Deprecated alias for [`Self::encode`] that supplies a fresh
    /// (throwaway) compression context per call.
    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        self.encode(
            batch,
            dictionary_tracker,
            write_options,
            &mut Default::default(),
        )
    }

    /// Writes a record batch as an IPC RecordBatch message: flattens every
    /// column's buffers into `arrow_data` (optionally compressed, padded to
    /// the configured alignment) and builds the flatbuffer metadata
    /// (field nodes, buffer descriptors, variadic buffer counts).
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
        compression_context: &mut CompressionContext,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        let batch_compression_type = write_options.batch_compression_type;

        // Build the optional BodyCompression table up front; flatbuffer
        // tables must be finished before the RecordBatchBuilder starts.
        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            // `offset` advances by the bytes appended for this column so the
            // buffer descriptors index into the shared body correctly.
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                compression_context,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // Pad the body to the configured alignment.
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        // bodyLength includes the trailing padding added above.
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Writes dictionary values (`array_data`) as an IPC DictionaryBatch
    /// message with the given `dict_id`; `is_delta` marks the payload as an
    /// append-only extension of a previously written dictionary rather than
    /// a full replacement.
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
        is_delta: bool,
        compression_context: &mut CompressionContext,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        let batch_compression_type = write_options.batch_compression_type;

        // As in `record_batch_to_bytes`: finish the compression table before
        // any other builder borrows `fbb`.
        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // Pad the body to the configured alignment.
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        // The dictionary's values are wrapped in a RecordBatch table…
        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        // …which is wrapped in a DictionaryBatch carrying the id/delta flag…
        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.add_isDelta(is_delta);
            batch_builder.finish().as_union_value()
        };

        // …which becomes the header of the outer Message.
        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}
710
711fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
712 match array.data_type() {
713 DataType::BinaryView | DataType::Utf8View => {
714 counts.push(array.buffers().len() as i64 - 1);
717 }
718 DataType::Dictionary(_, _) => {
719 }
722 _ => {
723 for child in array.child_data() {
724 append_variadic_buffer_counts(counts, child)
725 }
726 }
727 }
728}
729
730pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
731 match arr.data_type() {
732 DataType::RunEndEncoded(k, _) => match k.data_type() {
733 DataType::Int16 => {
734 Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
735 }
736 DataType::Int32 => {
737 Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
738 }
739 DataType::Int64 => {
740 Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
741 }
742 d => unreachable!("Unexpected data type {d}"),
743 },
744 d => Err(ArrowError::InvalidArgumentError(format!(
745 "The given array is not a run array. Data type of given array: {d}"
746 ))),
747 }
748}
749
/// Rewrites a sliced run array so its logical window starts at offset 0:
/// keeps only the physical runs that overlap the slice, shifts every run-end
/// down by the slice offset, and clamps the final run-end to the array length.
/// Returns the input unchanged when it is already zero-offset and untrimmed.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    // Fast path: not sliced (offset 0 and the last run-end equals the
    // physical length), so no rewriting is needed.
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // First/last physical runs that overlap the logical slice.
    let start_physical_index = run_ends.get_start_physical_index();

    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // Shift each retained run-end down by the slice offset; the final
    // run-end is replaced by the logical length, so it is appended separately.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // SAFETY (assumed, per the construction above): the buffer holds
        // exactly `physical_length` monotonically increasing native values
        // of R::DATA_TYPE — TODO confirm against ArrayData invariants.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // Keep only the values for the retained physical runs.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // SAFETY (assumed): run-ends and values children were built
        // consistently above — TODO confirm RunArray invariants hold.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}
801
/// Controls how a dictionary that has changed between batches is written
/// (see `DictionaryTracker::insert_column`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Re-send the full dictionary whenever it changes (the default).
    #[default]
    Resend,
    /// When the new dictionary extends the old one, send only the appended
    /// suffix as a delta dictionary batch.
    Delta,
}
815
/// Result of registering a dictionary with `DictionaryTracker::insert_column`,
/// telling the caller what (if anything) must be written.
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// Dictionary is unchanged; nothing needs to be written.
    None,
    /// First time this dictionary id was seen; write the full dictionary.
    New,
    /// Dictionary was replaced wholesale; write the full new dictionary.
    Replaced,
    /// Dictionary grew; contains only the appended values to write as a delta.
    Delta(ArrayData),
}
829
/// Keeps track of which dictionaries have already been written so repeated
/// batches don't re-emit unchanged dictionaries, and enforces the file
/// format's single-dictionary-per-field rule when requested.
#[derive(Debug)]
pub struct DictionaryTracker {
    // Last-written dictionary data, keyed by dictionary id.
    written: HashMap<i64, ArrayData>,
    // Dictionary ids in assignment order (see `next_dict_id`).
    dict_ids: Vec<i64>,
    // If true, a changed dictionary is an error instead of a rewrite
    // (used by the file format; the stream format allows replacement).
    error_on_replacement: bool,
}
842
impl DictionaryTracker {
    /// Creates an empty tracker.
    ///
    /// `error_on_replacement` makes `insert`/`insert_column` fail when a
    /// dictionary id is re-used with different values (required by the IPC
    /// file format, which forbids dictionary replacement).
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
        }
    }

    /// Assigns and records the next sequential dictionary id
    /// (last id + 1, starting from 0).
    pub fn next_dict_id(&mut self) -> i64 {
        let next = self
            .dict_ids
            .last()
            .copied()
            .map(|i| i + 1)
            .unwrap_or_default();

        self.dict_ids.push(next);
        next
    }

    /// Returns all dictionary ids assigned so far, in assignment order.
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Records a dictionary column under `dict_id`.
    ///
    /// Returns `Ok(true)` if the dictionary must be written, `Ok(false)` if
    /// an identical dictionary was already written, and an error when the
    /// values changed while `error_on_replacement` is set.
    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        // child_data[0] holds the dictionary's values array.
        let dict_values = &dict_data.child_data()[0];

        if let Some(last) = self.written.get(&dict_id) {
            // Cheap pointer comparison first: same underlying buffers means
            // the dictionary was certainly already written.
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                return Ok(false);
            }
            if self.error_on_replacement {
                // Deep equality: different buffers but identical contents is
                // still not a replacement.
                if last.child_data()[0] == *dict_values {
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }

    /// Records a dictionary column under `dict_id` and reports what must be
    /// written: nothing, the full dictionary, or (with
    /// [`DictionaryHandling::Delta`]) only the appended values.
    ///
    /// Errors when the dictionary changed in a way that requires replacement
    /// while `error_on_replacement` is set.
    pub fn insert_column(
        &mut self,
        dict_id: i64,
        column: &ArrayRef,
        dict_handling: DictionaryHandling,
    ) -> Result<DictionaryUpdate, ArrowError> {
        let new_data = column.to_data();
        // child_data[0] holds the dictionary's values array.
        let new_values = &new_data.child_data()[0];

        // First sighting of this id: always write the full dictionary.
        let Some(old) = self.written.get(&dict_id) else {
            self.written.insert(dict_id, new_data);
            return Ok(DictionaryUpdate::New);
        };

        // Cheap pointer comparison before any deep equality check.
        let old_values = &old.child_data()[0];
        if ArrayData::ptr_eq(old_values, new_values) {
            return Ok(DictionaryUpdate::None);
        }

        let comparison = compare_dictionaries(old_values, new_values);
        if matches!(comparison, DictionaryComparison::Equal) {
            return Ok(DictionaryUpdate::None);
        }

        const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
            Arrow IPC files only support a single dictionary for a given field \
            across all batches.";

        match comparison {
            DictionaryComparison::NotEqual => {
                if self.error_on_replacement {
                    return Err(ArrowError::InvalidArgumentError(
                        REPLACEMENT_ERROR.to_string(),
                    ));
                }

                self.written.insert(dict_id, new_data);
                Ok(DictionaryUpdate::Replaced)
            }
            DictionaryComparison::Delta => match dict_handling {
                // Even though only values were appended, Resend semantics
                // (and the file format) treat this as a replacement.
                DictionaryHandling::Resend => {
                    if self.error_on_replacement {
                        return Err(ArrowError::InvalidArgumentError(
                            REPLACEMENT_ERROR.to_string(),
                        ));
                    }

                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Replaced)
                }
                DictionaryHandling::Delta => {
                    // Only the suffix beyond the old length needs writing.
                    let delta =
                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Delta(delta))
                }
            },
            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
        }
    }

    /// Forgets all recorded dictionaries and assigned ids.
    pub fn clear(&mut self) {
        self.dict_ids.clear();
        self.written.clear();
    }
}
1006
/// Outcome of comparing a newly seen dictionary against the last one written
/// for the same id (see `compare_dictionaries`).
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// Contents differ and the new one is not an extension of the old.
    NotEqual,
    /// Contents are identical.
    Equal,
    /// The new dictionary starts with the old one and appends extra values.
    Delta,
}
1018
1019fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1021 let existing_len = old.len();
1023 let new_len = new.len();
1024 if existing_len == new_len {
1025 if *old == *new {
1026 return DictionaryComparison::Equal;
1027 } else {
1028 return DictionaryComparison::NotEqual;
1029 }
1030 }
1031
1032 if new_len < existing_len {
1034 return DictionaryComparison::NotEqual;
1035 }
1036
1037 if new.slice(0, existing_len) == *old {
1039 return DictionaryComparison::Delta;
1040 }
1041
1042 DictionaryComparison::NotEqual
1043}
1044
/// Writer for the Arrow IPC *file* format: magic header, schema message,
/// dictionary/record-batch messages, then a footer indexing all blocks.
pub struct FileWriter<W> {
    /// The sink this writer writes into.
    writer: W,
    /// IPC write options (alignment, version, compression, …).
    write_options: IpcWriteOptions,
    /// The schema written in the header; all batches must match it.
    schema: SchemaRef,
    /// Byte offset in the file where the next block will start.
    block_offsets: usize,
    /// Footer index entries for every dictionary batch written.
    dictionary_blocks: Vec<crate::Block>,
    /// Footer index entries for every record batch written.
    record_blocks: Vec<crate::Block>,
    /// Set once `finish` has written the footer; further writes error.
    finished: bool,
    /// Tracks written dictionaries; created with
    /// `error_on_replacement = true` (the file format forbids replacement).
    dictionary_tracker: DictionaryTracker,
    /// User-provided key/value metadata to embed in the footer.
    custom_metadata: HashMap<String, String>,

    /// Encoder for schema / batch / dictionary messages.
    data_gen: IpcDataGenerator,

    /// Reusable state for body-buffer compression across batches.
    compression_context: CompressionContext,
}
1091
1092impl<W: Write> FileWriter<BufWriter<W>> {
1093 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1097 Self::try_new(BufWriter::new(writer), schema)
1098 }
1099}
1100
impl<W: Write> FileWriter<W> {
    /// Creates a file writer with default [`IpcWriteOptions`] and writes the
    /// file header (magic + schema) immediately.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Creates a file writer with explicit options; writes the magic bytes,
    /// alignment padding, and the encoded schema message before returning.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // Pad after the magic so the first message starts aligned.
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // The file format forbids dictionary replacement, hence `true`.
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            // Next block starts after header + schema message.
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
            compression_context: CompressionContext::default(),
        })
    }

    /// Adds a key/value pair to the custom metadata embedded in the footer
    /// when the file is finished (later values overwrite earlier ones).
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Writes a record batch (preceded by any new/changed dictionary
    /// batches) and records footer index entries for each message.
    ///
    /// Errors if the writer has already been finished.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
            &mut self.compression_context,
        )?;

        // Dictionaries must precede the batch that references them.
        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;

        let block = crate::Block::new(
            self.block_offsets as i64,
            meta as i32, data as i64,
        );
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Writes the end-of-stream marker and the file footer (block indexes,
    /// schema, custom metadata), then marks the writer finished.
    ///
    /// Errors if called more than once.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // Zero-length continuation signals end of the message stream.
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);

        // Reset the tracker so the footer's schema encoding re-assigns
        // dictionary ids from scratch.
        self.dictionary_tracker.clear();
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut self.dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        // Trailer layout: footer bytes, footer length (little-endian i32),
        // then the closing magic.
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the schema written in the file header.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Returns a shared reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Returns a mutable reference to the underlying writer; writing to it
    /// directly may corrupt the IPC file.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flushes the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Finishes the file (if not already finished) and returns the
    /// underlying writer.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            self.finish()?;
        }
        Ok(self.writer)
    }
}
1284
impl<W: Write> RecordBatchWriter for FileWriter<W> {
    // Delegates to the inherent `write`.
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    // Closing a file writer writes the footer via `finish`.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
1294
/// Writer for the Arrow IPC *streaming* format: a schema message followed by
/// dictionary/record-batch messages, terminated by an end-of-stream marker
/// (no footer, unlike [`FileWriter`]).
pub struct StreamWriter<W> {
    /// The sink this writer writes into.
    writer: W,
    /// IPC write options (alignment, version, compression, …).
    write_options: IpcWriteOptions,
    /// Set once `finish` has written the end-of-stream marker.
    finished: bool,
    /// Tracks written dictionaries; created with
    /// `error_on_replacement = false` (streams may replace dictionaries).
    dictionary_tracker: DictionaryTracker,

    /// Encoder for schema / batch / dictionary messages.
    data_gen: IpcDataGenerator,

    /// Reusable state for body-buffer compression across batches.
    compression_context: CompressionContext,
}
1382
1383impl<W: Write> StreamWriter<BufWriter<W>> {
1384 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1388 Self::try_new(BufWriter::new(writer), schema)
1389 }
1390}
1391
1392impl<W: Write> StreamWriter<W> {
1393 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1401 let write_options = IpcWriteOptions::default();
1402 Self::try_new_with_options(writer, schema, write_options)
1403 }
1404
1405 pub fn try_new_with_options(
1411 mut writer: W,
1412 schema: &Schema,
1413 write_options: IpcWriteOptions,
1414 ) -> Result<Self, ArrowError> {
1415 let data_gen = IpcDataGenerator::default();
1416 let mut dictionary_tracker = DictionaryTracker::new(false);
1417
1418 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1420 schema,
1421 &mut dictionary_tracker,
1422 &write_options,
1423 );
1424 write_message(&mut writer, encoded_message, &write_options)?;
1425 Ok(Self {
1426 writer,
1427 write_options,
1428 finished: false,
1429 dictionary_tracker,
1430 data_gen,
1431 compression_context: CompressionContext::default(),
1432 })
1433 }
1434
1435 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1437 if self.finished {
1438 return Err(ArrowError::IpcError(
1439 "Cannot write record batch to stream writer as it is closed".to_string(),
1440 ));
1441 }
1442
1443 let (encoded_dictionaries, encoded_message) = self
1444 .data_gen
1445 .encode(
1446 batch,
1447 &mut self.dictionary_tracker,
1448 &self.write_options,
1449 &mut self.compression_context,
1450 )
1451 .expect("StreamWriter is configured to not error on dictionary replacement");
1452
1453 for encoded_dictionary in encoded_dictionaries {
1454 write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
1455 }
1456
1457 write_message(&mut self.writer, encoded_message, &self.write_options)?;
1458 Ok(())
1459 }
1460
1461 pub fn finish(&mut self) -> Result<(), ArrowError> {
1463 if self.finished {
1464 return Err(ArrowError::IpcError(
1465 "Cannot write footer to stream writer as it is closed".to_string(),
1466 ));
1467 }
1468
1469 write_continuation(&mut self.writer, &self.write_options, 0)?;
1470 self.writer.flush()?;
1471
1472 self.finished = true;
1473
1474 Ok(())
1475 }
1476
1477 pub fn get_ref(&self) -> &W {
1479 &self.writer
1480 }
1481
1482 pub fn get_mut(&mut self) -> &mut W {
1486 &mut self.writer
1487 }
1488
1489 pub fn flush(&mut self) -> Result<(), ArrowError> {
1493 self.writer.flush()?;
1494 Ok(())
1495 }
1496
1497 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1535 if !self.finished {
1536 self.finish()?;
1538 }
1539 Ok(self.writer)
1540 }
1541}
1542
// Allow `StreamWriter` to be driven through the generic `RecordBatchWriter`
// trait by delegating to the inherent methods.
impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    // Closing the writer writes the end-of-stream marker via `finish`.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
1552
/// Serialized form of a single IPC message: the flatbuffer-encoded message
/// header plus the message body bytes.
pub struct EncodedData {
    /// The encoded message header (a flatbuffer), written before the body.
    pub ipc_message: Vec<u8>,
    /// The message body (Arrow buffer bytes); empty for schema messages.
    pub arrow_data: Vec<u8>,
}
1560pub fn write_message<W: Write>(
1562 mut writer: W,
1563 encoded: EncodedData,
1564 write_options: &IpcWriteOptions,
1565) -> Result<(usize, usize), ArrowError> {
1566 let arrow_data_len = encoded.arrow_data.len();
1567 if arrow_data_len % usize::from(write_options.alignment) != 0 {
1568 return Err(ArrowError::MemoryError(
1569 "Arrow data not aligned".to_string(),
1570 ));
1571 }
1572
1573 let a = usize::from(write_options.alignment - 1);
1574 let buffer = encoded.ipc_message;
1575 let flatbuf_size = buffer.len();
1576 let prefix_size = if write_options.write_legacy_ipc_format {
1577 4
1578 } else {
1579 8
1580 };
1581 let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1582 let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1583
1584 write_continuation(
1585 &mut writer,
1586 write_options,
1587 (aligned_size - prefix_size) as i32,
1588 )?;
1589
1590 if flatbuf_size > 0 {
1592 writer.write_all(&buffer)?;
1593 }
1594 writer.write_all(&PADDING[..padding_bytes])?;
1596
1597 let body_len = if arrow_data_len > 0 {
1599 write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1600 } else {
1601 0
1602 };
1603
1604 Ok((aligned_size, body_len))
1605}
1606
1607fn write_body_buffers<W: Write>(
1608 mut writer: W,
1609 data: &[u8],
1610 alignment: u8,
1611) -> Result<usize, ArrowError> {
1612 let len = data.len();
1613 let pad_len = pad_to_alignment(alignment, len);
1614 let total_len = len + pad_len;
1615
1616 writer.write_all(data)?;
1618 if pad_len > 0 {
1619 writer.write_all(&PADDING[..pad_len])?;
1620 }
1621
1622 Ok(total_len)
1623}
1624
1625fn write_continuation<W: Write>(
1628 mut writer: W,
1629 write_options: &IpcWriteOptions,
1630 total_len: i32,
1631) -> Result<usize, ArrowError> {
1632 let mut written = 8;
1633
1634 match write_options.metadata_version {
1636 crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1637 unreachable!("Options with the metadata version cannot be created")
1638 }
1639 crate::MetadataVersion::V4 => {
1640 if !write_options.write_legacy_ipc_format {
1641 writer.write_all(&CONTINUATION_MARKER)?;
1643 written = 4;
1644 }
1645 writer.write_all(&total_len.to_le_bytes()[..])?;
1646 }
1647 crate::MetadataVersion::V5 => {
1648 writer.write_all(&CONTINUATION_MARKER)?;
1650 writer.write_all(&total_len.to_le_bytes()[..])?;
1651 }
1652 z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1653 };
1654
1655 Ok(written)
1656}
1657
1658fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1662 if write_options.metadata_version < crate::MetadataVersion::V5 {
1663 !matches!(data_type, DataType::Null)
1664 } else {
1665 !matches!(
1666 data_type,
1667 DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1668 )
1669 }
1670}
1671
1672#[inline]
1674fn buffer_need_truncate(
1675 array_offset: usize,
1676 buffer: &Buffer,
1677 spec: &BufferSpec,
1678 min_length: usize,
1679) -> bool {
1680 spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1681}
1682
1683#[inline]
1685fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1686 match spec {
1687 BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1688 _ => 0,
1689 }
1690}
1691
/// Rebase the offsets of `data` so the first offset is zero.
///
/// Returns `(offsets, start, len)`: an offsets buffer holding
/// `data.len() + 1` entries beginning at zero, the original first offset in
/// element units (`start`), and the number of child elements covered
/// (`len = last - first`). When the first offset is already zero the
/// original buffer is re-sliced zero-copy instead of rewritten.
fn reencode_offsets<O: OffsetSizeTrait>(
    offsets: &Buffer,
    data: &ArrayData,
) -> (Buffer, usize, usize) {
    // Window of `data.len() + 1` offsets covering the array's logical slice.
    let offsets_slice: &[O] = offsets.typed_data::<O>();
    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];

    // The window always holds at least one entry, so first()/last() succeed.
    let start_offset = offset_slice.first().unwrap();
    let end_offset = offset_slice.last().unwrap();

    let offsets = match start_offset.as_usize() {
        0 => {
            // Already zero-based: cheap byte-level slice of the source buffer.
            let size = size_of::<O>();
            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
        }
        // Otherwise subtract the first offset from every entry.
        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
    };

    let start_offset = start_offset.as_usize();
    let end_offset = end_offset.as_usize();

    (offsets, start_offset, end_offset - start_offset)
}
1717
1718fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1724 if data.is_empty() {
1725 let mut offsets = MutableBuffer::new(size_of::<O>());
1728 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1729 return (offsets.into(), MutableBuffer::new(0).into());
1730 }
1731
1732 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1733 let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1734 (offsets, values)
1735}
1736
1737fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1740 if data.is_empty() {
1741 let mut offsets = MutableBuffer::new(size_of::<O>());
1744 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1745 return (offsets.into(), data.child_data()[0].slice(0, 0));
1746 }
1747
1748 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1749 let child_data = data.child_data()[0].slice(original_start_offset, len);
1750 (offsets, child_data)
1751}
1752
1753fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1759 data: &ArrayData,
1760) -> (Buffer, Buffer, ArrayData) {
1761 if data.is_empty() {
1762 return (
1763 MutableBuffer::new(0).into(),
1764 MutableBuffer::new(0).into(),
1765 data.child_data()[0].slice(0, 0),
1766 );
1767 }
1768
1769 let offsets = &data.buffers()[0];
1770 let sizes = &data.buffers()[1];
1771
1772 let element_size = std::mem::size_of::<O>();
1773 let offsets_slice =
1774 offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1775 let sizes_slice =
1776 sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1777
1778 let child_data = data.child_data()[0].clone();
1779
1780 (offsets_slice, sizes_slice, child_data)
1781}
1782
1783fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] {
1790 let buffer = &array_data.buffers()[0];
1791 let layout = layout(array_data.data_type());
1792 let spec = &layout.buffers[0];
1793
1794 let byte_width = get_buffer_element_width(spec);
1795 let min_length = array_data.len() * byte_width;
1796 if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1797 let byte_offset = array_data.offset() * byte_width;
1798 let buffer_length = min(min_length, buffer.len() - byte_offset);
1799 &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1800 } else {
1801 buffer.as_slice()
1802 }
1803}
1804
/// Serialize the buffers and field nodes of `array_data` (and, recursively,
/// its children) into `arrow_data`, appending buffer and node metadata to
/// `buffers` and `nodes`. Returns the byte offset following the last buffer
/// written, to be used as the next buffer's starting offset.
#[allow(clippy::too_many_arguments)]
fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    compression_context: &mut CompressionContext,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // For Null arrays every value is null, so null_count == length.
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // Write the validity bitmap; when the array has no null buffer,
        // synthesize an all-valid bitmap of the required size.
        let null_buffer = match array_data.nulls() {
            None => {
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        // Variable-length byte arrays: write rebased offsets then the
        // referenced value bytes.
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // View arrays: write the (possibly truncated) views buffer first...
        let views = get_or_truncate_buffer(array_data);
        offset = write_buffer(
            views,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;

        // ...then every variadic data buffer verbatim.
        for buffer in array_data.buffers().iter().skip(1) {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        // Same as Binary/Utf8 but with 64-bit offsets.
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Fixed-width single-buffer types (dictionary keys included): write
        // the buffer truncated to the array's logical window.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = get_or_truncate_buffer(array_data);
        offset = write_buffer(
            buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Booleans are bit-packed, so slicing must happen at bit granularity.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = write_buffer(
            &buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Rebase offsets to zero and slice the child to the referenced range,
        // then recurse into the child. Map uses 32-bit offsets like List.
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            sliced_child_data.len(),
            sliced_child_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;
        // Child already handled; skip the generic child recursion below.
        return Ok(offset);
    } else if matches!(
        data_type,
        DataType::ListView(_) | DataType::LargeListView(_)
    ) {
        assert_eq!(array_data.buffers().len(), 2); assert_eq!(array_data.child_data().len(), 1);

        // List views carry separate offsets and sizes buffers.
        let (offsets, sizes, child_data) = match data_type {
            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };

        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;

        offset = write_buffer(
            sizes.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;

        offset = write_array_data(
            &child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            child_data.len(),
            child_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;
        // Child already handled; skip the generic child recursion below.
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        // No offsets buffer: the child window is offset/length scaled by the
        // fixed list size.
        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            child_data.len(),
            child_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;
        // Child already handled; skip the generic child recursion below.
        return Ok(offset);
    } else {
        // Fallback for the remaining types: write all buffers verbatim.
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer,
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    }

    match array_data.data_type() {
        // Dictionary values are not written with this message — only the
        // keys buffer (written above) belongs to it.
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // Presumably REE arrays cannot be written sliced; materialize an
            // un-sliced copy before recursing into the children.
            let arr = unslice_run_array(array_data.clone())?;
            for data_ref in arr.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    compression_context,
                    write_options,
                )?;
            }
        }
        _ => {
            // Generic recursion into all children (e.g. struct fields).
            for data_ref in array_data.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    compression_context,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}
2103
2104fn write_buffer(
2117 buffer: &[u8], buffers: &mut Vec<crate::Buffer>, arrow_data: &mut Vec<u8>, offset: i64, compression_codec: Option<CompressionCodec>,
2122 compression_context: &mut CompressionContext,
2123 alignment: u8,
2124) -> Result<i64, ArrowError> {
2125 let len: i64 = match compression_codec {
2126 Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?,
2127 None => {
2128 arrow_data.extend_from_slice(buffer);
2129 buffer.len()
2130 }
2131 }
2132 .try_into()
2133 .map_err(|e| {
2134 ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
2135 })?;
2136
2137 buffers.push(crate::Buffer::new(offset, len));
2139 let pad_len = pad_to_alignment(alignment, len as usize);
2141 arrow_data.extend_from_slice(&PADDING[..pad_len]);
2142
2143 Ok(offset + len + (pad_len as i64))
2144}
2145
/// Zero-filled scratch slice used for alignment padding; 64 bytes covers the
/// maximum supported alignment (alignments are validated to be 8..=64).
const PADDING: [u8; 64] = [0; 64];
2147
/// Number of padding bytes required to round `len` up to the next multiple
/// of `alignment`. `alignment` must be a power of two (the writer only
/// accepts 8, 16, 32, or 64).
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    // For power-of-two alignments, (-len) mod alignment is the pad size.
    len.wrapping_neg() & usize::from(alignment - 1)
}
2154
2155#[cfg(test)]
2156mod tests {
2157 use std::hash::Hasher;
2158 use std::io::Cursor;
2159 use std::io::Seek;
2160
2161 use arrow_array::builder::FixedSizeListBuilder;
2162 use arrow_array::builder::Float32Builder;
2163 use arrow_array::builder::Int64Builder;
2164 use arrow_array::builder::MapBuilder;
2165 use arrow_array::builder::StringViewBuilder;
2166 use arrow_array::builder::UnionBuilder;
2167 use arrow_array::builder::{
2168 GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2169 };
2170 use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2171 use arrow_array::types::*;
2172 use arrow_buffer::ScalarBuffer;
2173
2174 use crate::MetadataVersion;
2175 use crate::convert::fb_to_schema;
2176 use crate::reader::*;
2177 use crate::root_as_footer;
2178
2179 use super::*;
2180
    // Round-trip helper: serialize `rb` into in-memory IPC *file* bytes.
    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        writer.into_inner().unwrap()
    }

    // Read the first batch back from in-memory IPC file bytes.
    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
        reader.next().unwrap().unwrap()
    }

    // Serialize `record` into in-memory IPC *stream* bytes (V5, 8-byte alignment).
    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
        const IPC_ALIGNMENT: usize = 8;

        let mut stream_writer = StreamWriter::try_new_with_options(
            vec![],
            record.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        stream_writer.write(record).unwrap();
        stream_writer.finish().unwrap();
        stream_writer.into_inner().unwrap()
    }

    // Read the first batch back from in-memory IPC stream bytes.
    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        stream_reader.next().unwrap().unwrap()
    }
2214
    // An empty batch should round-trip through an LZ4-compressed IPC file.
    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_empty_record_batch_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();

        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            // Compare type/length/null-count column by column after reading back.
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }
2255
    // A non-empty Int32 batch should round-trip through an LZ4-compressed file.
    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_file_with_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            // Compare type/length/null-count column by column after reading back.
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }
2295
    // A non-empty Int32 batch should round-trip through a ZSTD-compressed file.
    #[test]
    #[cfg(feature = "zstd")]
    fn test_write_file_with_zstd_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::ZSTD))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            // Compare type/length/null-count column by column after reading back.
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }
2334
    // A nullable UInt32 batch should round-trip through an uncompressed file.
    #[test]
    fn test_write_file() {
        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
        let values: Vec<Option<u32>> = vec![
            Some(999),
            None,
            Some(235),
            Some(123),
            None,
            None,
            None,
            None,
            None,
        ];
        let array1 = UInt32Array::from(values);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
                .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            // Compare type/length/null-count column by column after reading back.
            let mut reader = FileReader::try_new(file, None).unwrap();
            while let Some(Ok(read_batch)) = reader.next() {
                read_batch
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }
2377
    // Empty Utf8 arrays must still serialize a one-entry (zero) offsets buffer.
    #[test]
    fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
        let name = StringArray::from(Vec::<String>::new());
        let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());

        assert_eq!(name.len(), 0);
        assert_eq!(
            offsets.len(),
            std::mem::size_of::<i32>(),
            "offsets buffer should contain one zero i32 offset"
        );
        assert_eq!(values.len(), 0, "values buffer should remain empty");
    }

    // Same as above for LargeUtf8 (64-bit offsets).
    #[test]
    fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
        let name = LargeStringArray::from(Vec::<String>::new());
        let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());

        assert_eq!(name.len(), 0);
        assert_eq!(
            offsets.len(),
            std::mem::size_of::<i64>(),
            "offsets buffer should contain one zero i64 offset"
        );
        assert_eq!(values.len(), 0, "values buffer should remain empty");
    }

    // Empty List arrays must also serialize a single zero offset.
    #[test]
    fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
        let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
        let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());

        assert_eq!(list.len(), 0);
        assert_eq!(
            offsets.len(),
            std::mem::size_of::<i32>(),
            "offsets buffer should contain one zero i32 offset"
        );
        assert_eq!(child_data.len(), 0, "child data should remain empty");
    }

    // Same as above for LargeList (64-bit offsets).
    #[test]
    fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
        let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
        let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());

        assert_eq!(list.len(), 0);
        assert_eq!(
            offsets.len(),
            std::mem::size_of::<i64>(),
            "offsets buffer should contain one zero i64 offset"
        );
        assert_eq!(child_data.len(), 0, "child data should remain empty");
    }
2433
    // Write a batch mixing Null columns with Int32/Float64 columns under the
    // given options and verify it reads back with matching shape.
    fn write_null_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![
            Field::new("nulls", DataType::Null, true),
            Field::new("int32s", DataType::Int32, false),
            Field::new("nulls2", DataType::Null, true),
            Field::new("f64s", DataType::Float64, false),
        ]);
        let array1 = NullArray::new(32);
        let array2 = Int32Array::from(vec![1; 32]);
        let array3 = NullArray::new(32);
        let array4 = Float64Array::from(vec![f64::NAN; 32]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array1) as ArrayRef,
                Arc::new(array2) as ArrayRef,
                Arc::new(array3) as ArrayRef,
                Arc::new(array4) as ArrayRef,
            ],
        )
        .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        file.rewind().unwrap();

        {
            // Compare type/length/null-count column by column after reading back.
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }
    // V4 round-trips, legacy and modern prefix, 8- and 64-byte alignments.
    #[test]
    fn test_write_null_file_v4() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
    }

    // V5 round-trips at 8- and 64-byte alignments.
    #[test]
    fn test_write_null_file_v5() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
    }
2494
    // A dictionary nested inside a union must be registered by the tracker.
    #[test]
    fn track_union_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        #[allow(deprecated)]
        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();

        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();

        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let r#gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        r#gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        // The nested dictionary must appear in the tracker's written set.
        assert!(dict_tracker.written.contains_key(&0));
    }

    // A dictionary nested inside a struct must be registered by the tracker.
    #[test]
    fn track_struct_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        #[allow(deprecated)]
        let dctfield = Arc::new(Field::new_dict(
            "dict",
            array.data_type().clone(),
            false,
            2,
            false,
        ));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let r#gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        r#gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        // The nested dictionary must appear in the tracker's written set.
        assert!(dict_tracker.written.contains_key(&0));
    }
2587
    // Write a sparse union batch under the given options and verify it reads
    // back with matching shape.
    fn write_union_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![Field::new_union(
            "union",
            vec![0, 1],
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("c", DataType::Float64, false),
            ],
            UnionMode::Sparse,
        )]);
        let mut builder = UnionBuilder::with_capacity_sparse(5);
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append_null::<Float64Type>("c").unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        let union = builder.build().unwrap();

        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
                .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            // Compare type/length/null-count column by column after reading back.
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }

    // Unions must round-trip under both V4 and V5 metadata.
    #[test]
    fn test_write_union_file_v4_v5() {
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
    }
2641
    // BinaryView/Utf8View columns (with both inline and out-of-line values)
    // must round-trip, including when reading a projected subset of columns.
    #[test]
    fn test_write_view_types() {
        const LONG_TEST_STRING: &str =
            "This is a long string to make sure binary view array handles it";
        let schema = Schema::new(vec![
            Field::new("field1", DataType::BinaryView, true),
            Field::new("field2", DataType::Utf8View, true),
        ]);
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
        ];
        let binary_array = BinaryViewArray::from_iter(values);
        let utf8_array =
            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(binary_array), Arc::new(utf8_array)],
        )
        .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            // Full read-back: all columns must compare equal.
            let mut reader = FileReader::try_new(&file, None).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            read_batch
                .columns()
                .iter()
                .zip(record_batch.columns())
                .for_each(|(a, b)| {
                    assert_eq!(a, b);
                });
        }
        file.rewind().unwrap();
        {
            // Projected read-back: only column 0 is requested.
            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            assert_eq!(read_batch.num_columns(), 1);
            let read_array = read_batch.column(0);
            let write_array = record_batch.column(0);
            assert_eq!(read_array, write_array);
        }
    }
2692
    // Slicing a large batch down to a few rows must truncate the serialized
    // stream to the same size as a natively small batch, and round-trip.
    #[test]
    fn truncate_ipc_record_batch() {
        fn create_batch(rows: usize) -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Utf8, false),
            ]);

            let a = Int32Array::from_iter_values(0..rows as i32);
            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let big_record_batch = create_batch(65536);

        let length = 5;
        let small_record_batch = create_batch(length);

        let offset = 2;
        let record_batch_slice = big_record_batch.slice(offset, length);
        assert!(
            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
        );
        assert_eq!(
            serialize_stream(&small_record_batch).len(),
            serialize_stream(&record_batch_slice).len()
        );

        assert_eq!(
            deserialize_stream(serialize_stream(&record_batch_slice)),
            record_batch_slice
        );
    }
2727
2728 #[test]
2729 fn truncate_ipc_record_batch_with_nulls() {
2730 fn create_batch() -> RecordBatch {
2731 let schema = Schema::new(vec![
2732 Field::new("a", DataType::Int32, true),
2733 Field::new("b", DataType::Utf8, true),
2734 ]);
2735
2736 let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2737 let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2738
2739 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2740 }
2741
2742 let record_batch = create_batch();
2743 let record_batch_slice = record_batch.slice(1, 2);
2744 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2745
2746 assert!(
2747 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2748 );
2749
2750 assert!(deserialized_batch.column(0).is_null(0));
2751 assert!(deserialized_batch.column(0).is_valid(1));
2752 assert!(deserialized_batch.column(1).is_valid(0));
2753 assert!(deserialized_batch.column(1).is_valid(1));
2754
2755 assert_eq!(record_batch_slice, deserialized_batch);
2756 }
2757
2758 #[test]
2759 fn truncate_ipc_dictionary_array() {
2760 fn create_batch() -> RecordBatch {
2761 let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2762 .into_iter()
2763 .collect();
2764 let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2765
2766 let array = DictionaryArray::new(keys, Arc::new(values));
2767
2768 let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2769
2770 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2771 }
2772
2773 let record_batch = create_batch();
2774 let record_batch_slice = record_batch.slice(1, 2);
2775 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2776
2777 assert!(
2778 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2779 );
2780
2781 assert!(deserialized_batch.column(0).is_valid(0));
2782 assert!(deserialized_batch.column(0).is_null(1));
2783
2784 assert_eq!(record_batch_slice, deserialized_batch);
2785 }
2786
    /// Slicing a struct column truncates each child array independently;
    /// validity of both children must survive a stream round-trip.
    #[test]
    fn truncate_ipc_struct_array() {
        fn create_batch() -> RecordBatch {
            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("s", DataType::Utf8, true)),
                    Arc::new(strings) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(ints) as ArrayRef,
                ),
            ]);

            let schema = Schema::new(vec![Field::new(
                "struct_array",
                struct_array.data_type().clone(),
                true,
            )]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
        }

        let record_batch = create_batch();
        // Rows 1..=2: strings [None, Some("bar")], ints [Some(2), None].
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        // Truncation must shrink the serialized stream.
        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        let structs = deserialized_batch
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        assert!(structs.column(0).is_null(0));
        assert!(structs.column(0).is_valid(1));
        assert!(structs.column(1).is_valid(0));
        assert!(structs.column(1).is_null(1));
        assert_eq!(record_batch_slice, deserialized_batch);
    }
2835
2836 #[test]
2837 fn truncate_ipc_string_array_with_all_empty_string() {
2838 fn create_batch() -> RecordBatch {
2839 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
2840 let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
2841 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
2842 }
2843
2844 let record_batch = create_batch();
2845 let record_batch_slice = record_batch.slice(0, 1);
2846 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2847
2848 assert!(
2849 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2850 );
2851 assert_eq!(record_batch_slice, deserialized_batch);
2852 }
2853
2854 #[test]
2855 fn test_stream_writer_writes_array_slice() {
2856 let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
2857 assert_eq!(
2858 vec![Some(1), Some(2), Some(3)],
2859 array.iter().collect::<Vec<_>>()
2860 );
2861
2862 let sliced = array.slice(1, 2);
2863 assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
2864
2865 let batch = RecordBatch::try_new(
2866 Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
2867 vec![Arc::new(sliced)],
2868 )
2869 .expect("new batch");
2870
2871 let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
2872 writer.write(&batch).expect("write");
2873 let outbuf = writer.into_inner().expect("inner");
2874
2875 let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
2876 let read_batch = reader.next().unwrap().expect("read batch");
2877
2878 let read_array: &UInt32Array = read_batch.column(0).as_primitive();
2879 assert_eq!(
2880 vec![Some(2), Some(3)],
2881 read_array.iter().collect::<Vec<_>>()
2882 );
2883 }
2884
2885 #[test]
2886 fn test_large_slice_uint32() {
2887 ensure_roundtrip(Arc::new(UInt32Array::from_iter(
2888 (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
2889 )));
2890 }
2891
2892 #[test]
2893 fn test_large_slice_string() {
2894 let strings: Vec<_> = (0..8000)
2895 .map(|i| {
2896 if i % 2 == 0 {
2897 Some(format!("value{i}"))
2898 } else {
2899 None
2900 }
2901 })
2902 .collect();
2903
2904 ensure_roundtrip(Arc::new(StringArray::from(strings)));
2905 }
2906
2907 #[test]
2908 fn test_large_slice_string_list() {
2909 let mut ls = ListBuilder::new(StringBuilder::new());
2910
2911 let mut s = String::new();
2912 for row_number in 0..8000 {
2913 if row_number % 2 == 0 {
2914 for list_element in 0..1000 {
2915 s.clear();
2916 use std::fmt::Write;
2917 write!(&mut s, "value{row_number}-{list_element}").unwrap();
2918 ls.values().append_value(&s);
2919 }
2920 ls.append(true)
2921 } else {
2922 ls.append(false); }
2924 }
2925
2926 ensure_roundtrip(Arc::new(ls.finish()));
2927 }
2928
2929 #[test]
2930 fn test_large_slice_string_list_of_lists() {
2931 let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
2935
2936 for _ in 0..4000 {
2937 ls.values().append(true);
2938 ls.append(true)
2939 }
2940
2941 let mut s = String::new();
2942 for row_number in 0..4000 {
2943 if row_number % 2 == 0 {
2944 for list_element in 0..1000 {
2945 s.clear();
2946 use std::fmt::Write;
2947 write!(&mut s, "value{row_number}-{list_element}").unwrap();
2948 ls.values().values().append_value(&s);
2949 }
2950 ls.values().append(true);
2951 ls.append(true)
2952 } else {
2953 ls.append(false); }
2955 }
2956
2957 ensure_roundtrip(Arc::new(ls.finish()));
2958 }
2959
2960 fn ensure_roundtrip(array: ArrayRef) {
2962 let num_rows = array.len();
2963 let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
2964 let sliced_batch = orig_batch.slice(1, num_rows - 1);
2966
2967 let schema = orig_batch.schema();
2968 let stream_data = {
2969 let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
2970 writer.write(&sliced_batch).unwrap();
2971 writer.into_inner().unwrap()
2972 };
2973 let read_batch = {
2974 let projection = None;
2975 let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
2976 reader
2977 .next()
2978 .expect("expect no errors reading batch")
2979 .expect("expect batch")
2980 };
2981 assert_eq!(sliced_batch, read_batch);
2982
2983 let file_data = {
2984 let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
2985 writer.write(&sliced_batch).unwrap();
2986 writer.into_inner().unwrap().into_inner().unwrap()
2987 };
2988 let read_batch = {
2989 let projection = None;
2990 let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
2991 reader
2992 .next()
2993 .expect("expect no errors reading batch")
2994 .expect("expect batch")
2995 };
2996 assert_eq!(sliced_batch, read_batch);
2997
2998 }
3000
    /// Bit-packed boolean slices at assorted offsets/lengths must survive a
    /// stream round-trip, including offsets that are not byte-aligned.
    #[test]
    fn encode_bools_slice() {
        // Slice entirely inside the first byte.
        assert_bool_roundtrip([true, false], 1, 1);

        // Non-byte-aligned offset with a slice spanning multiple bytes.
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, false, false, false, false, true, true, true, true, true, false,
                false, false, false, false,
            ],
            13,
            17,
        );

        // Byte-aligned offset, short (sub-byte) length.
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false,
            ],
            8,
            2,
        );

        // Byte-aligned offset and exactly one byte of length.
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, true, false, false, false, false, false,
            ],
            8,
            8,
        );
    }
3036
3037 fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3038 let val_bool_field = Field::new("val", DataType::Boolean, false);
3039
3040 let schema = Arc::new(Schema::new(vec![val_bool_field]));
3041
3042 let bools = BooleanArray::from(bools.to_vec());
3043
3044 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3045 let batch = batch.slice(offset, length);
3046
3047 let data = serialize_stream(&batch);
3048 let batch2 = deserialize_stream(data);
3049 assert_eq!(batch, batch2);
3050 }
3051
    /// `into_zero_offset_run_array` must rewrite a sliced `RunArray` so its
    /// run ends start from zero while preserving the logical values, for
    /// every slice length taken from either end of the array.
    #[test]
    fn test_run_array_unslice() {
        let total_len = 80;
        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
        let repeats: Vec<usize> = vec![3, 4, 1, 2];
        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
        // 32 runs with repeat counts cycling through `repeats` (sum 10 per 4
        // runs) produce exactly `total_len` = 80 logical values.
        for ix in 0_usize..32 {
            let repeat: usize = repeats[ix % repeats.len()];
            let val: Option<i32> = vals[ix % vals.len()];
            input_array.resize(input_array.len() + repeat, val);
        }

        // Run-end encode the logical values.
        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        for slice_len in 1..=total_len {
            // Prefix slice: rows [0, slice_len).
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(0, slice_len).into_data().into();

            // Un-slice, then compare logical values against the prefix.
            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);

            // Suffix slice: rows [total_len - slice_len, total_len).
            let sliced_run_array: RunArray<Int16Type> = run_array
                .slice(total_len - slice_len, slice_len)
                .into_data()
                .into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(total_len - slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }
3105
3106 fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3107 let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3108
3109 for i in 0..100_000 {
3110 for value in [i, i, i] {
3111 ls.values().append_value(value);
3112 }
3113 ls.append(true)
3114 }
3115
3116 ls.finish()
3117 }
3118
3119 fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3120 let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3121
3122 for i in 0..100_000 {
3123 for value in [
3124 format!("value{}", i),
3125 format!("value{}", i),
3126 format!("value{}", i),
3127 ] {
3128 ls.values().append_value(&value);
3129 }
3130 ls.append(true)
3131 }
3132
3133 ls.finish()
3134 }
3135
3136 fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3137 let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3138
3139 for i in 0..100_000 {
3140 for value in [
3141 format!("value{}", i),
3142 format!("value{}", i),
3143 format!("value{}", i),
3144 ] {
3145 ls.values().append_value(&value);
3146 }
3147 ls.append(true)
3148 }
3149
3150 ls.finish()
3151 }
3152
3153 fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3154 let mut ls =
3155 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3156
3157 for _i in 0..10_000 {
3158 for j in 0..10 {
3159 for value in [j, j, j, j] {
3160 ls.values().values().append_value(value);
3161 }
3162 ls.values().append(true)
3163 }
3164 ls.append(true);
3165 }
3166
3167 ls.finish()
3168 }
3169
    /// Like `generate_nested_list_data`, but front-loads 999 rows that each
    /// hold a single empty inner list, so the first populated row's child
    /// data begins at offset zero.
    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        // 999 rows of one empty inner list each.
        for _i in 0..999 {
            ls.values().append(true);
            ls.append(true);
        }

        // First populated row: ten inner lists of four copies of 0..10.
        for j in 0..10 {
            for value in [j, j, j, j] {
                ls.values().values().append_value(value);
            }
            ls.values().append(true)
        }
        ls.append(true);

        // Remaining 9_000 populated rows.
        for i in 0..9_000 {
            for j in 0..10 {
                for value in [i + j, i + j, i + j, i + j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }
3199
3200 fn generate_map_array_data() -> MapArray {
3201 let keys_builder = UInt32Builder::new();
3202 let values_builder = UInt32Builder::new();
3203
3204 let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3205
3206 for i in 0..100_000 {
3207 for _j in 0..3 {
3208 builder.keys().append_value(i);
3209 builder.values().append_value(i * 2);
3210 }
3211 builder.append(true).unwrap();
3212 }
3213
3214 builder.finish()
3215 }
3216
    /// `reencode_offsets` on a slice whose first offset is non-zero must
    /// rebase the offsets to start at zero and report the start position and
    /// count of the child values the slice references.
    #[test]
    fn reencode_offsets_when_first_offset_is_not_zero() {
        let original_list = generate_list_data::<i32>();
        let original_data = original_list.into_data();
        // Rows 75..82; each row holds 3 child values, so the referenced child
        // range is [225, 246).
        let slice_data = original_data.slice(75, 7);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(
            vec![0, 3, 6, 9, 12, 15, 18, 21],
            new_offsets.typed_data::<i32>()
        );
        assert_eq!(225, original_start);
        assert_eq!(21, length);
    }
3231
    /// When the sliced offsets already begin at zero, `reencode_offsets`
    /// should report the full child range without rebasing.
    #[test]
    fn reencode_offsets_when_first_offset_is_zero() {
        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
        // Row 0: empty list; row 1: two values — offsets buffer is [0, 0, 2].
        ls.append(true);
        ls.values().append_value(35);
        ls.values().append_value(42);
        ls.append(true);
        let original_list = ls.finish();
        let original_data = original_list.into_data();

        // Slicing to row 1 views offsets [0, 2]: first offset is still zero.
        let slice_data = original_data.slice(1, 1);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
        assert_eq!(0, original_start);
        assert_eq!(2, length);
    }
3250
3251 fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3254 let in_sliced = in_batch.slice(999, 1);
3256
3257 let bytes_batch = serialize_file(&in_batch);
3258 let bytes_sliced = serialize_file(&in_sliced);
3259
3260 assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3262
3263 let out_batch = deserialize_file(bytes_batch);
3265 assert_eq!(in_batch, out_batch);
3266
3267 let out_sliced = deserialize_file(bytes_sliced);
3268 assert_eq!(in_sliced, out_sliced);
3269 }
3270
3271 #[test]
3272 fn encode_lists() {
3273 let val_inner = Field::new_list_field(DataType::UInt32, true);
3274 let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3275 let schema = Arc::new(Schema::new(vec![val_list_field]));
3276
3277 let values = Arc::new(generate_list_data::<i32>());
3278
3279 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3280 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3281 }
3282
3283 #[test]
3284 fn encode_empty_list() {
3285 let val_inner = Field::new_list_field(DataType::UInt32, true);
3286 let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3287 let schema = Arc::new(Schema::new(vec![val_list_field]));
3288
3289 let values = Arc::new(generate_list_data::<i32>());
3290
3291 let in_batch = RecordBatch::try_new(schema, vec![values])
3292 .unwrap()
3293 .slice(999, 0);
3294 let out_batch = deserialize_file(serialize_file(&in_batch));
3295 assert_eq!(in_batch, out_batch);
3296 }
3297
3298 #[test]
3299 fn encode_large_lists() {
3300 let val_inner = Field::new_list_field(DataType::UInt32, true);
3301 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3302 let schema = Arc::new(Schema::new(vec![val_list_field]));
3303
3304 let values = Arc::new(generate_list_data::<i64>());
3305
3306 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3309 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3310 }
3311
3312 #[test]
3313 fn encode_large_lists_non_zero_offset() {
3314 let val_inner = Field::new_list_field(DataType::UInt32, true);
3315 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3316 let schema = Arc::new(Schema::new(vec![val_list_field]));
3317
3318 let values = Arc::new(generate_list_data::<i64>());
3319
3320 check_sliced_list_array(schema, values);
3321 }
3322
3323 #[test]
3324 fn encode_large_lists_string_non_zero_offset() {
3325 let val_inner = Field::new_list_field(DataType::Utf8, true);
3326 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3327 let schema = Arc::new(Schema::new(vec![val_list_field]));
3328
3329 let values = Arc::new(generate_string_list_data::<i64>());
3330
3331 check_sliced_list_array(schema, values);
3332 }
3333
3334 #[test]
3335 fn encode_large_list_string_view_non_zero_offset() {
3336 let val_inner = Field::new_list_field(DataType::Utf8View, true);
3337 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3338 let schema = Arc::new(Schema::new(vec![val_list_field]));
3339
3340 let values = Arc::new(generate_utf8view_list_data::<i64>());
3341
3342 check_sliced_list_array(schema, values);
3343 }
3344
3345 fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3346 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3347 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3348 .unwrap()
3349 .slice(offset, len);
3350 let out_batch = deserialize_file(serialize_file(&in_batch));
3351 assert_eq!(in_batch, out_batch);
3352 }
3353 }
3354
3355 #[test]
3356 fn encode_nested_lists() {
3357 let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3358 let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3359 let list_field = Field::new("val", DataType::List(inner_list_field), true);
3360 let schema = Arc::new(Schema::new(vec![list_field]));
3361
3362 let values = Arc::new(generate_nested_list_data::<i32>());
3363
3364 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3365 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3366 }
3367
3368 #[test]
3369 fn encode_nested_lists_starting_at_zero() {
3370 let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3371 let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3372 let list_field = Field::new("val", DataType::List(inner_list_field), true);
3373 let schema = Arc::new(Schema::new(vec![list_field]));
3374
3375 let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3376
3377 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3378 roundtrip_ensure_sliced_smaller(in_batch, 1);
3379 }
3380
3381 #[test]
3382 fn encode_map_array() {
3383 let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3384 let values = Arc::new(Field::new("values", DataType::UInt32, true));
3385 let map_field = Field::new_map("map", "entries", keys, values, false, true);
3386 let schema = Arc::new(Schema::new(vec![map_field]));
3387
3388 let values = Arc::new(generate_map_array_data());
3389
3390 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3391 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3392 }
3393
3394 fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3395 let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3396
3397 for i in 0u32..100_000 {
3398 if i.is_multiple_of(10_000) {
3399 builder.append(false);
3400 continue;
3401 }
3402 for value in [i, i, i] {
3403 builder.values().append_value(value);
3404 }
3405 builder.append(true);
3406 }
3407
3408 builder.finish()
3409 }
3410
3411 #[test]
3412 fn encode_list_view_arrays() {
3413 let val_inner = Field::new_list_field(DataType::UInt32, true);
3414 let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
3415 let schema = Arc::new(Schema::new(vec![val_field]));
3416
3417 let values = Arc::new(generate_list_view_data::<i32>());
3418
3419 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3420 let out_batch = deserialize_file(serialize_file(&in_batch));
3421 assert_eq!(in_batch, out_batch);
3422 }
3423
3424 #[test]
3425 fn encode_large_list_view_arrays() {
3426 let val_inner = Field::new_list_field(DataType::UInt32, true);
3427 let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
3428 let schema = Arc::new(Schema::new(vec![val_field]));
3429
3430 let values = Arc::new(generate_list_view_data::<i64>());
3431
3432 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3433 let out_batch = deserialize_file(serialize_file(&in_batch));
3434 assert_eq!(in_batch, out_batch);
3435 }
3436
3437 #[test]
3438 fn check_sliced_list_view_array() {
3439 let inner = Field::new_list_field(DataType::UInt32, true);
3440 let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
3441 let schema = Arc::new(Schema::new(vec![field]));
3442 let values = Arc::new(generate_list_view_data::<i32>());
3443
3444 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3445 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3446 .unwrap()
3447 .slice(offset, len);
3448 let out_batch = deserialize_file(serialize_file(&in_batch));
3449 assert_eq!(in_batch, out_batch);
3450 }
3451 }
3452
3453 #[test]
3454 fn check_sliced_large_list_view_array() {
3455 let inner = Field::new_list_field(DataType::UInt32, true);
3456 let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
3457 let schema = Arc::new(Schema::new(vec![field]));
3458 let values = Arc::new(generate_list_view_data::<i64>());
3459
3460 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3461 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3462 .unwrap()
3463 .slice(offset, len);
3464 let out_batch = deserialize_file(serialize_file(&in_batch));
3465 assert_eq!(in_batch, out_batch);
3466 }
3467 }
3468
    /// Nested list-view data: 10_000 outer rows, each with three inner lists
    /// of `[i, i + 1, i + 2]`; every 1_000th outer row is null.
    fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
        let inner_builder = UInt32Builder::new();
        let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
        let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);

        for i in 0u32..10_000 {
            // Null rows leave the child builders untouched.
            if i.is_multiple_of(1_000) {
                outer_builder.append(false);
                continue;
            }

            for _ in 0..3 {
                for value in [i, i + 1, i + 2] {
                    outer_builder.values().values().append_value(value);
                }
                outer_builder.values().append(true);
            }
            outer_builder.append(true);
        }

        outer_builder.finish()
    }
3491
3492 #[test]
3493 fn encode_nested_list_views() {
3494 let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3495 let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
3496 let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
3497 let schema = Arc::new(Schema::new(vec![list_field]));
3498
3499 let values = Arc::new(generate_nested_list_view_data::<i32>());
3500
3501 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3502 let out_batch = deserialize_file(serialize_file(&in_batch));
3503 assert_eq!(in_batch, out_batch);
3504 }
3505
    /// Builds a list-view array (offsets/sizes supplied by the caller) over a
    /// dictionary-encoded child and checks it round-trips through both the
    /// IPC file and stream formats.
    fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
        list_data_type: DataType,
        offsets: &[U; 5],
        sizes: &[U; 4],
    ) {
        // Dictionary child: 7 keys into 4 values (one of which is null).
        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let value_offsets = Buffer::from_slice_ref(offsets);
        let value_sizes = Buffer::from_slice_ref(sizes);

        // List-view layout: an offsets buffer plus a separate sizes buffer.
        let list_data = ArrayData::builder(list_data_type)
            .len(4)
            .add_buffer(value_offsets)
            .add_buffer(value_sizes)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            list_view_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }
3541
    /// `ListView` (32-bit offsets) over a dictionary-encoded child.
    #[test]
    fn test_roundtrip_list_view_of_dict() {
        // `Field::new_dict` is deprecated but still used here to construct a
        // dictionary-typed list item field.
        #[allow(deprecated)]
        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        // Four views over the 7 child entries; the third list is empty.
        let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
        let sizes: &[i32; 4] = &[2, 2, 0, 3];
        test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
    }
3556
    /// `LargeListView` (64-bit offsets) over a dictionary-encoded child.
    #[test]
    fn test_roundtrip_large_list_view_of_dict() {
        // `Field::new_dict` is deprecated but still used here to construct a
        // dictionary-typed list item field.
        #[allow(deprecated)]
        let list_data_type = DataType::LargeListView(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        )));
        // Four views over the 7 child entries; the third list is empty.
        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
        let sizes: &[i64; 4] = &[2, 2, 0, 3];
        test_roundtrip_list_view_of_dict_impl::<i64, i64>(list_data_type, offsets, sizes);
    }
3571
    /// A sliced `ListView` over a dictionary-encoded child must round-trip
    /// through both the IPC file and stream formats.
    #[test]
    fn test_roundtrip_sliced_list_view_of_dict() {
        // `Field::new_dict` is deprecated but still used here to construct a
        // dictionary-typed list item field.
        #[allow(deprecated)]
        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            3,
            false,
        )));

        // Dictionary child: 12 keys into 4 values (one of which is null).
        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        // Six views over the 12 child entries; the third list is empty.
        let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
        let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];
        let value_offsets = Buffer::from_slice_ref(offsets);
        let value_sizes = Buffer::from_slice_ref(sizes);

        let list_data = ArrayData::builder(list_data_type)
            .len(6)
            .add_buffer(value_offsets)
            .add_buffer(value_sizes)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_view_array = GenericListViewArray::<i32>::from(list_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            list_view_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();

        // Slice rows 1..=4 so the writer must handle a non-zero offset.
        let sliced_batch = input_batch.slice(1, 4);

        let output_batch = deserialize_file(serialize_file(&sliced_batch));
        assert_eq!(sliced_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&sliced_batch));
        assert_eq!(sliced_batch, output_batch);
    }
3617
    /// A dense union containing a dictionary-encoded child (with a per-value
    /// offsets buffer) must round-trip through both IPC formats.
    #[test]
    fn test_roundtrip_dense_union_of_dict() {
        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));

        // `Field::new_dict` is deprecated but still used here to construct
        // the dictionary-typed union child field.
        #[allow(deprecated)]
        let dict_field = Arc::new(Field::new_dict(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        ));
        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

        // Type ids select the child per row; offsets index into that child.
        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
        let offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);

        let int_array = Int32Array::from(vec![100, 200]);

        let union = UnionArray::try_new(
            union_fields.clone(),
            types,
            Some(offsets),
            vec![Arc::new(dict_array), Arc::new(int_array)],
        )
        .unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            DataType::Union(union_fields, UnionMode::Dense),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }
3661
    /// A sparse union containing a dictionary-encoded child (no offsets
    /// buffer; children span all rows) must round-trip through both IPC
    /// formats.
    #[test]
    fn test_roundtrip_sparse_union_of_dict() {
        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));

        // `Field::new_dict` is deprecated but still used here to construct
        // the dictionary-typed union child field.
        #[allow(deprecated)]
        let dict_field = Arc::new(Field::new_dict(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        ));
        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

        // Type ids select the child per row; sparse unions have no offsets.
        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);

        // Child arrays are full length; unselected slots are padding zeros.
        let int_array = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);

        let union = UnionArray::try_new(
            union_fields.clone(),
            types,
            None,
            vec![Arc::new(dict_array), Arc::new(int_array)],
        )
        .unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            DataType::Union(union_fields, UnionMode::Sparse),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }
3704
    /// Round-trips a Map whose keys are dictionary-encoded strings through
    /// both the IPC file and stream formats.
    #[test]
    fn test_roundtrip_map_with_dict_keys() {
        // Dictionary-encoded key column: 6 entries drawn from 3 distinct strings.
        let key_values = StringArray::from(vec!["key_a", "key_b", "key_c"]);
        let keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
        let dict_keys = DictionaryArray::new(keys, Arc::new(key_values));

        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        // Field::new_dict is deprecated but used here so the explicit
        // dictionary id (1) round-trips through the IPC metadata.
        #[allow(deprecated)]
        let entries_field = Arc::new(Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new_dict(
                        "key",
                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        false,
                        1,
                        false,
                    ),
                    Field::new("value", DataType::Int32, true),
                ]
                .into(),
            ),
            false,
        ));

        let entries = StructArray::from(vec![
            (
                Arc::new(Field::new(
                    "key",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    false,
                )),
                Arc::new(dict_keys) as ArrayRef,
            ),
            (
                Arc::new(Field::new("value", DataType::Int32, true)),
                Arc::new(values) as ArrayRef,
            ),
        ]);

        // Three map rows of two entries each: [0,2), [2,4), [4,6).
        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);

        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
            .len(3)
            .add_buffer(offsets)
            .add_child_data(entries.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "map",
            map_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();

        // File format round-trip.
        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        // Stream format round-trip.
        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }
3772
    /// Round-trips a Map whose values are dictionary-encoded strings through
    /// both the IPC file and stream formats.
    #[test]
    fn test_roundtrip_map_with_dict_values() {
        // Plain string key column: 6 entries for 3 map rows.
        let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);

        // Dictionary-encoded value column: 6 entries from 3 distinct strings.
        let value_values = StringArray::from(vec!["val_x", "val_y", "val_z"]);
        let value_keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
        let dict_values = DictionaryArray::new(value_keys, Arc::new(value_values));

        // Field::new_dict is deprecated but used here so the explicit
        // dictionary id (2) round-trips through the IPC metadata.
        #[allow(deprecated)]
        let entries_field = Arc::new(Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new_dict(
                        "value",
                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        true,
                        2,
                        false,
                    ),
                ]
                .into(),
            ),
            false,
        ));

        let entries = StructArray::from(vec![
            (
                Arc::new(Field::new("key", DataType::Utf8, false)),
                Arc::new(keys) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "value",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                )),
                Arc::new(dict_values) as ArrayRef,
            ),
        ]);

        // Three map rows of two entries each: [0,2), [2,4), [4,6).
        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);

        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
            .len(3)
            .add_buffer(offsets)
            .add_child_data(entries.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "map",
            map_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();

        // File format round-trip.
        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        // Stream format round-trip.
        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }
3840
    /// Checks that writing with a 16-byte IPC alignment is sufficient for
    /// Decimal128 columns: batches written this way decode successfully even
    /// when the decoder requires properly aligned buffers.
    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        // Varying column counts shift buffer offsets onto many different
        // positions relative to the alignment boundary.
        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
            // Arbitrary row count derived from num_cols, kept below 100.
            let num_rows = (num_cols * 7 + 11) % 100; let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            let buffer = Buffer::from_vec(out);
            // The file trailer is 10 bytes: a 4-byte footer length followed
            // by the 6-byte `ARROW1` magic.
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

            // Require aligned buffers so any misalignment surfaces as an
            // error rather than being fixed up by copying.
            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            // Decode the single record batch directly from the file bytes.
            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }
3899
    /// Counterpart to `test_decimal128_alignment16_is_sufficient`: with only
    /// an 8-byte IPC alignment, Decimal128 buffers (which require 16-byte
    /// alignment) can end up misaligned, and a decoder configured to require
    /// alignment must reject the batch with a descriptive error.
    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        const IPC_ALIGNMENT: usize = 8;

        // Two columns are enough: the second column's data buffer lands 8
        // bytes off the required 16-byte alignment.
        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        let buffer = Buffer::from_vec(out);
        // The file trailer is 10 bytes: a 4-byte footer length followed by
        // the 6-byte `ARROW1` magic.
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
        let schema = fb_to_schema(footer.schema().unwrap());

        // Strict alignment mode: misaligned buffers must error, not be copied.
        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        // Pin the exact error message, including which buffer is misaligned
        // and by how much.
        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
             offset from expected alignment of 16 by 8"
        );
    }
3955
    /// Verifies that `flush()` on both `StreamWriter` and `FileWriter` pushes
    /// all bytes buffered in the inner `BufWriter` through to the underlying
    /// sink, by comparing the sink length after flush against the final
    /// serialized output length.
    #[test]
    fn test_flush() {
        // A schema big enough that writer construction buffers some bytes
        // without exceeding the BufWriter's 1024-byte capacity.
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        // Bytes visible in the inner Vec before and after an explicit flush.
        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        // NOTE(review): presumably the 8 subtracted bytes are the stream
        // end-of-stream marker written during finish (i.e. after the flush
        // being measured) — TODO confirm.
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        // NOTE(review): the file writer has 8 more bytes at flush time than
        // the stream writer — presumably the leading file magic/padding
        // written on creation; TODO confirm.
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        // Sanity: flush must actually have moved bytes, otherwise the
        // comparisons below would pass vacuously.
        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }
4002
4003 #[test]
4004 fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
4005 let l1_type =
4006 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
4007 let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
4008
4009 let l0_builder = Float32Builder::new();
4010 let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
4011 "item",
4012 DataType::Float32,
4013 false,
4014 )));
4015 let mut l2_builder =
4016 ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
4017
4018 for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
4019 l2_builder.values().values().append_value(point[0]);
4020 l2_builder.values().values().append_value(point[1]);
4021 l2_builder.values().values().append_value(point[2]);
4022
4023 l2_builder.values().append(true);
4024 }
4025 l2_builder.append(true);
4026
4027 let point = [10., 11., 12.];
4028 l2_builder.values().values().append_value(point[0]);
4029 l2_builder.values().values().append_value(point[1]);
4030 l2_builder.values().values().append_value(point[2]);
4031
4032 l2_builder.values().append(true);
4033 l2_builder.append(true);
4034
4035 let array = Arc::new(l2_builder.finish()) as ArrayRef;
4036
4037 let schema = Arc::new(Schema::new_with_metadata(
4038 vec![Field::new("points", l2_type, false)],
4039 HashMap::default(),
4040 ));
4041
4042 test_slices(&array, &schema, 0, 1)?;
4045 test_slices(&array, &schema, 0, 2)?;
4046 test_slices(&array, &schema, 1, 1)?;
4047
4048 Ok(())
4049 }
4050
4051 #[test]
4052 fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
4053 let l0_builder = Float32Builder::new();
4054 let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
4055 let mut l2_builder = ListBuilder::new(l1_builder);
4056
4057 for point in [
4058 [Some(1.0), Some(2.0), None],
4059 [Some(4.0), Some(5.0), Some(6.0)],
4060 [None, Some(8.0), Some(9.0)],
4061 ] {
4062 for p in point {
4063 match p {
4064 Some(p) => l2_builder.values().values().append_value(p),
4065 None => l2_builder.values().values().append_null(),
4066 }
4067 }
4068
4069 l2_builder.values().append(true);
4070 }
4071 l2_builder.append(true);
4072
4073 let point = [Some(10.), None, None];
4074 for p in point {
4075 match p {
4076 Some(p) => l2_builder.values().values().append_value(p),
4077 None => l2_builder.values().values().append_null(),
4078 }
4079 }
4080
4081 l2_builder.values().append(true);
4082 l2_builder.append(true);
4083
4084 let array = Arc::new(l2_builder.finish()) as ArrayRef;
4085
4086 let schema = Arc::new(Schema::new_with_metadata(
4087 vec![Field::new(
4088 "points",
4089 DataType::List(Arc::new(Field::new(
4090 "item",
4091 DataType::FixedSizeList(
4092 Arc::new(Field::new("item", DataType::Float32, true)),
4093 3,
4094 ),
4095 true,
4096 ))),
4097 true,
4098 )],
4099 HashMap::default(),
4100 ));
4101
4102 test_slices(&array, &schema, 0, 1)?;
4105 test_slices(&array, &schema, 0, 2)?;
4106 test_slices(&array, &schema, 1, 1)?;
4107
4108 Ok(())
4109 }
4110
4111 fn test_slices(
4112 parent_array: &ArrayRef,
4113 schema: &SchemaRef,
4114 offset: usize,
4115 length: usize,
4116 ) -> Result<(), ArrowError> {
4117 let subarray = parent_array.slice(offset, length);
4118 let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4119
4120 let mut bytes = Vec::new();
4121 let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4122 writer.write(&original_batch)?;
4123 writer.finish()?;
4124
4125 let mut cursor = std::io::Cursor::new(bytes);
4126 let mut reader = StreamReader::try_new(&mut cursor, None)?;
4127 let returned_batch = reader.next().unwrap()?;
4128
4129 assert_eq!(original_batch, returned_batch);
4130
4131 Ok(())
4132 }
4133
    /// Round-trips slices of a `FixedSizeList<Int64, 3>` array through the
    /// IPC stream format, covering full-length, prefix, and offset slices.
    #[test]
    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        // Explicit non-nullable item field so the finished array's type
        // matches the schema declared below.
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));

        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
            fixed_list_builder.values().append_value(point[0]);
            fixed_list_builder.values().append_value(point[1]);
            fixed_list_builder.values().append_value(point[2]);

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
                false,
            )],
            HashMap::default(),
        ));

        // Non-zero offsets exercise the writer's handling of sliced
        // fixed-size-list child data.
        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }
4168
4169 #[test]
4170 fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
4171 let int_builder = Int64Builder::new();
4172 let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
4173
4174 for point in [
4175 [Some(1), Some(2), None],
4176 [Some(4), Some(5), Some(6)],
4177 [None, Some(8), Some(9)],
4178 [Some(10), None, None],
4179 ] {
4180 for p in point {
4181 match p {
4182 Some(p) => fixed_list_builder.values().append_value(p),
4183 None => fixed_list_builder.values().append_null(),
4184 }
4185 }
4186
4187 fixed_list_builder.append(true);
4188 }
4189
4190 let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4191
4192 let schema = Arc::new(Schema::new_with_metadata(
4193 vec![Field::new(
4194 "points",
4195 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
4196 true,
4197 )],
4198 HashMap::default(),
4199 ));
4200
4201 test_slices(&array, &schema, 0, 4)?;
4204 test_slices(&array, &schema, 0, 2)?;
4205 test_slices(&array, &schema, 1, 3)?;
4206 test_slices(&array, &schema, 2, 1)?;
4207
4208 Ok(())
4209 }
4210
    /// Schema and field metadata are stored in a `HashMap`, whose iteration
    /// order is nondeterministic. This test checks that the IPC encoding of
    /// that metadata is nevertheless stable: serializing the same schema
    /// repeatedly must yield byte-identical output (compared via a hash).
    #[test]
    fn test_metadata_encoding_ordering() {
        // Builds a stream containing a schema with metadata on both the
        // schema itself and on a field, and returns a hash of the bytes.
        fn create_hash() -> u64 {
            // Several entries so an unsorted HashMap ordering would be very
            // unlikely to coincide across runs by chance.
            let metadata: HashMap<String, String> = [
                ("a", "1"), ("b", "2"), ("c", "3"), ("d", "4"), ("e", "5"), ]
            .into_iter()
            .map(|(k, v)| (k.to_owned(), v.to_owned()))
            .collect();

            // Attach the same metadata to both the schema and its one field.
            let schema = Arc::new(
                Schema::new(vec![
                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
                ])
                .with_metadata(metadata)
                .clone(),
            );
            let batch = RecordBatch::new_empty(schema.clone());

            let mut bytes = Vec::new();
            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
            w.write(&batch).unwrap();
            w.finish().unwrap();

            // Hash the serialized bytes so encodings can be compared cheaply.
            let mut h = std::hash::DefaultHasher::new();
            h.write(&bytes);
            h.finish()
        }

        let expected = create_hash();

        // Repeat enough times that an order-dependent encoding would almost
        // certainly produce at least one differing hash.
        let all_passed = (0..20).all(|_| create_hash() == expected);
        assert!(all_passed);
    }
4254
    /// `DictionaryTracker::clear()` must fully reset the tracker so it can be
    /// reused to produce a second, self-contained stream in which the
    /// dictionary is emitted again rather than treated as already sent.
    #[test]
    fn test_dictionary_tracker_reset() {
        let data_gen = IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(false);
        let writer_options = IpcWriteOptions::default();
        let mut compression_ctx = CompressionContext::default();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
            false,
        )]));

        // Hand-assembles a stream (schema message, any dictionary messages,
        // then the record batch message) via the low-level encoding API.
        let mut write_single_batch_stream =
            |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
                let mut buffer = Vec::new();

                let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
                    &schema,
                    dict_tracker,
                    &writer_options,
                );
                _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();

                // Dictionaries must precede the batch that references them.
                let (encoded_dicts, encoded_batch) = data_gen
                    .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
                    .unwrap();
                for encoded_dict in encoded_dicts {
                    _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
                }
                _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();

                buffer
            };

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values([0]),
                Arc::new(StringArray::from_iter_values(["a"])),
            ))],
        )
        .unwrap();
        let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);

        // The first stream decodes on its own.
        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        assert_eq!(read_batch, batch1);

        // Reset the tracker between streams.
        dictionary_tracker.clear();

        // A second stream written with the cleared tracker must also decode
        // standalone, i.e. its dictionary must have been re-emitted.
        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values([0]),
                Arc::new(StringArray::from_iter_values(["a"])),
            ))],
        )
        .unwrap();
        let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        assert_eq!(read_batch, batch2);
    }
4323}