// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Arrow IPC File and Stream Writers
//!
//! # Notes
//!
//! [`FileWriter`] and [`StreamWriter`] have similar interfaces,
//! however the [`FileWriter`] writes a footer whose offsets allow a reader
//! that supports [`Seek`]ing to randomly access individual record batches
//!
//! [`Seek`]: std::io::Seek

use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
use arrow_schema::*;

use crate::CONTINUATION_MARKER;
use crate::compression::CompressionCodec;
pub use crate::compression::CompressionContext;
use crate::convert::IpcSchemaEncoder;

/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4+
    ///
    /// *Default versions per crate*
    ///
    /// When creating the default IpcWriteOptions, the following metadata versions are used:
    ///
    /// * version 2.0.0: V4, with legacy format enabled
    /// * version 4.0.0: V5
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Will result in a runtime error
    /// if the corresponding feature is not enabled
    batch_compression_type: Option<crate::CompressionType>,
    /// How to handle updating dictionaries in IPC messages
    dictionary_handling: DictionaryHandling,
}

impl IpcWriteOptions {
    /// Configures compression when writing IPC files.
    ///
    /// Will result in a runtime error if the corresponding feature
    /// is not enabled
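    ///
    /// A minimal sketch; setting the option succeeds here, but actually
    /// compressing data later requires the matching crate feature (e.g. `lz4`):
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::CompressionType;
    /// let options = IpcWriteOptions::default()
    ///     .try_with_compression(Some(CompressionType::LZ4_FRAME))
    ///     .unwrap();
    /// ```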
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }
    /// Try to create IpcWriteOptions, checking for incompatible settings
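    ///
    /// A minimal sketch of the validation: the alignment must be 8, 16, 32, or
    /// 64, and the legacy format is only accepted with metadata V4:
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::MetadataVersion;
    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
    /// assert!(IpcWriteOptions::try_new(9, false, MetadataVersion::V5).is_err());
    /// ```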
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                dictionary_handling: DictionaryHandling::default(),
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        dictionary_handling: DictionaryHandling::default(),
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Configure how dictionaries are handled in IPC messages
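    ///
    /// A minimal sketch: opt into delta dictionaries
    /// ([`DictionaryHandling::Resend`] is the default):
    /// ```
    /// # use arrow_ipc::writer::{DictionaryHandling, IpcWriteOptions};
    /// let options =
    ///     IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
    /// ```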
    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
        self.dictionary_handling = dictionary_handling;
        self
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            dictionary_handling: DictionaryHandling::default(),
        }
    }
}

/// Handles low level details of encoding [`Array`] and [`Schema`] into the
/// [Arrow IPC Format].
///
/// # Example
/// ```
/// # fn run() {
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// // Create a record batch
/// let batch = RecordBatch::try_from_iter(vec![
///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// ]).unwrap();
///
/// // Error if dictionary ids are replaced.
/// let error_on_replacement = true;
/// let options = IpcWriteOptions::default();
/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
///
/// let mut compression_context = CompressionContext::default();
///
/// // encode the batch into zero or more encoded dictionaries
/// // and the data for the actual array.
/// let data_gen = IpcDataGenerator::default();
/// let (encoded_dictionaries, encoded_message) = data_gen
///   .encode(&batch, &mut dictionary_tracker, &options, &mut compression_context)
///   .unwrap();
/// # }
/// ```
///
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Converts a schema to an IPC message, updating the provided `dictionary_tracker`,
    /// and returns it encoded inside [EncodedData] as a flatbuffer.
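    ///
    /// A minimal sketch; a schema message has no body, so `arrow_data` is empty:
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let mut tracker = DictionaryTracker::new(false);
    /// let encoded = IpcDataGenerator::default().schema_to_bytes_with_dictionary_tracker(
    ///     &schema,
    ///     &mut tracker,
    ///     &IpcWriteOptions::default(),
    /// );
    /// assert!(!encoded.ipc_message.is_empty());
    /// assert!(encoded.arrow_data.is_empty());
    /// ```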
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        // TODO: custom metadata
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
        compression_context: &mut CompressionContext,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                        compression_context,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // The run_ends array is not expected to be dictionary encoded. Hence encode
                // dictionaries only for the values array.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::ListView(field) => {
                let list = column.as_list_view::<i32>();
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::LargeListView(field) => {
                let list = column.as_list_view::<i64>();
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // keys
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;

                // values
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                        compression_context,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
        compression_context: &mut CompressionContext,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                    compression_context,
                )?;

                // It's important to only take the dict_id at this point, because the dict ID
                // sequence is assigned depth-first, so we need to first encode children and have
                // them take their assigned dict IDs before we take the dict ID for this field.
                let dict_id = dict_id_seq.next().ok_or_else(|| {
                    ArrowError::IpcError(format!(
                        "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
                        field.name(),
                        field.data_type(),
                        column.data_type()
                    ))
                })?;

                match dictionary_tracker.insert_column(
                    dict_id,
                    column,
                    write_options.dictionary_handling,
                )? {
                    DictionaryUpdate::None => {}
                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            dict_values,
                            write_options,
                            false,
                            compression_context,
                        )?);
                    }
                    DictionaryUpdate::Delta(data) => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            &data,
                            write_options,
                            true,
                            compression_context,
                        )?);
                    }
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
                compression_context,
            )?,
        }

        Ok(())
    }

    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once).
    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
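    ///
    /// A minimal sketch of driving this by hand (the [`StreamWriter`] does the
    /// equivalent internally); the dictionaries must be written before the batch:
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_ipc::writer::{
    /// #     write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
    /// # };
    /// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// # let mut stream = vec![]; // mimic a stream for the example
    /// let options = IpcWriteOptions::default();
    /// let mut tracker = DictionaryTracker::new(false);
    /// let (dictionaries, message) = IpcDataGenerator::default()
    ///     .encode(&batch, &mut tracker, &options, &mut Default::default())
    ///     .unwrap();
    /// for dictionary in dictionaries {
    ///     write_message(&mut stream, dictionary, &options).unwrap();
    /// }
    /// write_message(&mut stream, message, &options).unwrap();
    /// ```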
    pub fn encode(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        compression_context: &mut CompressionContext,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
                compression_context,
            )?;
        }

        let encoded_message =
            self.record_batch_to_bytes(batch, write_options, compression_context)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once).
    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        self.encode(
            batch,
            dictionary_tracker,
            write_options,
            &mut Default::default(),
        )
    }

    /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
    /// other for the batch's data
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
        compression_context: &mut CompressionContext,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                compression_context,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        // create a crate::Message
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
    /// other for the data
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
        is_delta: bool,
        compression_context: &mut CompressionContext,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.add_isDelta(is_delta);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

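// Illustrative example (not from the spec text itself): a `Utf8View` array whose
// `buffers()` are [views, data0, data1] contributes a variadic buffer count of 2,
// since the views buffer is excluded and validity is stored separately.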
fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // The spec documents that the counts only include the variadic buffers, not the view/null buffers.
            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Do nothing
            // Dictionary types are handled in `encode_dictionaries`.
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

// Returns a `RunArray` with zero offset and length matching the last value
// in the run_ends array.
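//
// Sketch of the normalization: a run array with run_ends [2, 4, 6] sliced to
// offset=2, len=3 covers only the last two runs, so it is rebuilt with
// run_ends [2, 3] (the offset is subtracted and the final run end is clamped
// to the array length) over the correspondingly re-sliced values.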
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of original run_ends array from which the `ArrayData` is sliced.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of original run_ends array until which the `ArrayData` is sliced.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // build new run_ends array by subtracting offset from run ends.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // Safety:
        // The function builds a valid run_ends array and hence need not be validated.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // build new values by slicing physical indices.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // Safety:
        //  This function builds a valid run array and hence can skip validation.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

/// Controls how dictionaries are handled in Arrow IPC messages
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Send the entire dictionary every time it is encountered (default)
    #[default]
    Resend,
    /// Send only new dictionary values since the last batch (delta encoding)
    ///
    /// When a dictionary is first encountered, the entire dictionary is sent.
    /// For subsequent batches, only values that are new (not previously sent)
    /// are transmitted with the `isDelta` flag set to true.
    Delta,
}

/// Describes what kind of update took place after a call to
/// [`DictionaryTracker::insert_column`].
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// No dictionary was written, the dictionary was identical to what was already
    /// in the tracker.
    None,
    /// No dictionary was present in the tracker
    New,
    /// Dictionary was replaced with the new data
    Replaced,
    /// Dictionary was updated, ArrayData is the delta between old and new
    Delta(ArrayData),
}

/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
/// multiple times.
///
/// Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
#[derive(Debug)]
pub struct DictionaryTracker {
    // NOTE: When adding fields, update the clear() method accordingly.
    written: HashMap<i64, ArrayData>,
    dict_ids: Vec<i64>,
    error_on_replacement: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement` is true, an error will be generated if an
    /// update to an existing dictionary is attempted.
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
        }
    }

    /// Record and return the next dictionary ID.
    pub fn next_dict_id(&mut self) -> i64 {
        let next = self
            .dict_ids
            .last()
            .copied()
            .map(|i| i + 1)
            .unwrap_or_default();

        self.dict_ids.push(next);
        next
    }

    /// Return the sequence of dictionary IDs in the order they should be observed while
    /// traversing the schema
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Keep track of the dictionary with the given ID and values. Behavior:
    ///
    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
    ///   that the dictionary was not actually inserted (because it's already been seen).
    /// * If this ID has been written already but with different data, and this tracker is
    ///   configured to return an error, return an error.
    /// * If the tracker has not been configured to error on replacement or this dictionary
    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
    ///   inserted.
    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already emitted, check if it was the same.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same dictionary values => no need to emit it again
                return Ok(false);
            }
            if self.error_on_replacement {
                // If error on replacement perform a logical comparison
                if last.child_data()[0] == *dict_values {
                    // Same dictionary values => no need to emit it again
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }

    /// Keep track of the dictionary with the given ID and values. The return
    /// value indicates what, if any, update to the internal map took place
    /// and how it should be interpreted based on the `dict_handling` parameter.
    ///
    /// # Returns
    ///
    /// * `Ok(DictionaryUpdate::New)` - If the dictionary was not previously written
    /// * `Ok(DictionaryUpdate::Replaced)` - If the dictionary was previously written
    ///   with completely different data, or if the data is a delta of the existing,
    ///   but with `dict_handling` set to `DictionaryHandling::Resend`
    /// * `Ok(DictionaryUpdate::Delta)` - If the dictionary was previously written, but
    ///   the new data is a delta of the old and the `dict_handling` is set to
    ///   `DictionaryHandling::Delta`
    /// * `Err(e)` - If the dictionary was previously written with different data,
    ///   and `error_on_replacement` is set to `true`.
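    ///
    /// A minimal sketch (the dictionary ID and handling mode are illustrative):
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, DictionaryArray, types::Int32Type};
    /// # use arrow_ipc::writer::{DictionaryHandling, DictionaryTracker, DictionaryUpdate};
    /// let mut tracker = DictionaryTracker::new(false);
    /// let array: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    /// let column: ArrayRef = Arc::new(array);
    /// // The first insert for an ID always records the dictionary as new
    /// let update = tracker.insert_column(0, &column, DictionaryHandling::Resend).unwrap();
    /// assert!(matches!(update, DictionaryUpdate::New));
    /// ```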
    pub fn insert_column(
        &mut self,
        dict_id: i64,
        column: &ArrayRef,
        dict_handling: DictionaryHandling,
    ) -> Result<DictionaryUpdate, ArrowError> {
        let new_data = column.to_data();
        let new_values = &new_data.child_data()[0];

        // If there is no existing dictionary with this ID, we always insert
        let Some(old) = self.written.get(&dict_id) else {
            self.written.insert(dict_id, new_data);
            return Ok(DictionaryUpdate::New);
        };

        // Fast path - If the array data points to the same buffer as the
        // existing then they're the same.
        let old_values = &old.child_data()[0];
        if ArrayData::ptr_eq(old_values, new_values) {
            return Ok(DictionaryUpdate::None);
        }

        // Slow path - Compare the dictionaries value by value
        let comparison = compare_dictionaries(old_values, new_values);
        if matches!(comparison, DictionaryComparison::Equal) {
            return Ok(DictionaryUpdate::None);
        }

        const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
                 Arrow IPC files only support a single dictionary for a given field \
                 across all batches.";

        match comparison {
            DictionaryComparison::NotEqual => {
                if self.error_on_replacement {
                    return Err(ArrowError::InvalidArgumentError(
                        REPLACEMENT_ERROR.to_string(),
                    ));
                }

                self.written.insert(dict_id, new_data);
                Ok(DictionaryUpdate::Replaced)
            }
            DictionaryComparison::Delta => match dict_handling {
                DictionaryHandling::Resend => {
                    if self.error_on_replacement {
                        return Err(ArrowError::InvalidArgumentError(
                            REPLACEMENT_ERROR.to_string(),
                        ));
                    }

                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Replaced)
                }
                DictionaryHandling::Delta => {
                    let delta =
                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Delta(delta))
                }
            },
            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
        }
    }

    /// Clears the state of the dictionary tracker.
    ///
    /// This allows the dictionary tracker to be reused for a new IPC stream while avoiding the
    /// allocation cost of creating a new instance. This method should not be called if
    /// the dictionary tracker will be used to continue writing to an existing IPC stream.
    pub fn clear(&mut self) {
        self.dict_ids.clear();
        self.written.clear();
    }
}

/// Describes how two dictionary arrays compare to each other.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// Neither a delta, nor an exact match
    NotEqual,
    /// Exact element-wise match
    Equal,
    /// The two arrays are dictionary deltas of each other, meaning the first
    /// is a prefix of the second.
    Delta,
}
// Compares two dictionaries and returns a [`DictionaryComparison`].
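//
// For example (values shown logically):
//   old = ["a", "b"], new = ["a", "b"]      => Equal
//   old = ["a", "b"], new = ["a", "b", "c"] => Delta (old is a prefix of new)
//   old = ["a", "b"], new = ["x", "b"]      => NotEqual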
fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
    // Check for exact match
    let existing_len = old.len();
    let new_len = new.len();
    if existing_len == new_len {
        if *old == *new {
            return DictionaryComparison::Equal;
        } else {
            return DictionaryComparison::NotEqual;
        }
    }

    // Can't be a delta if the new is shorter than the existing
    if new_len < existing_len {
        return DictionaryComparison::NotEqual;
    }

    // Check for delta
    if new.slice(0, existing_len) == *old {
        return DictionaryComparison::Delta;
    }

    DictionaryComparison::NotEqual
}

/// Arrow File Writer
///
/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
///
/// # See Also
///
/// * [`StreamWriter`] for writing IPC Streams
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::FileWriter;
/// # let mut file = vec![]; // mimic a file for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// // write each batch to the underlying writer
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The current byte offset into the file, used to record the location
    /// of each block for random access
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,

    compression_context: CompressionContext,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a BufWriter.
    ///
    /// See [`FileWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with IpcWriteOptions
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write magic to header aligned on alignment boundary
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, set the written bytes to the schema + header
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
            compression_context: CompressionContext::default(),
        })
    }

    /// Adds a key-value pair to the [FileWriter]'s custom metadata
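    ///
    /// A minimal sketch; the metadata is stored in the file footer when
    /// [`FileWriter::finish`] is called:
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_ipc::writer::FileWriter;
    /// # let mut file = vec![]; // mimic a file for the example
    /// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
    /// writer.write_metadata("created_by", "example");
    /// writer.write(&batch).unwrap();
    /// writer.finish().unwrap();
    /// ```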
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
            &mut self.compression_context,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;

        // add a record block for the footer
        let block = crate::Block::new(
            self.block_offsets as i64,
            meta as i32, // TODO: is this still applicable?
            data as i64,
        );
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write footer and closing tag, then mark the writer as done
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);

        // dictionaries are already written, so we can reset dictionary tracker to reuse for schema
        self.dictionary_tracker.clear();
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut self.dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    ///
    /// Both the BufWriter and the underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is flushed and the FileWriter is finished before returning.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the FileWriter
    /// or while flushing the writer.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Arrow Stream Writer
///
/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
///
/// # See Also
///
/// * [`FileWriter`] for writing IPC Files
///
/// # Example - Basic usage
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::StreamWriter;
/// # let mut stream = vec![]; // mimic a stream for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
/// // write each batch to the underlying stream
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
/// # Example - Efficient delta dictionaries
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
/// # use arrow_ipc::writer::DictionaryHandling;
/// # use arrow_schema::{DataType, Field, Schema, SchemaRef};
/// # use arrow_array::{
/// #    builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray,
/// #    RecordBatch, StringArray,
/// # };
/// # use std::sync::Arc;
///
/// let schema = Arc::new(Schema::new(vec![Field::new(
///    "col1",
///    DataType::Dictionary(Box::from(DataType::Int32), Box::from(DataType::Utf8)),
///    true,
/// )]));
///
/// let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new();
///
/// // `finish_preserve_values` will keep the dictionary values along with their
/// // key assignments so that they can be re-used in the next batch.
/// builder.append("a").unwrap();
/// builder.append("b").unwrap();
/// let array1 = builder.finish_preserve_values();
/// let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array1) as ArrayRef]).unwrap();
///
/// // In this batch, 'a' will have the same dictionary key as 'a' in the previous batch,
/// // and 'd' will take the next available key.
/// builder.append("a").unwrap();
/// builder.append("d").unwrap();
/// let array2 = builder.finish_preserve_values();
/// let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array2) as ArrayRef]).unwrap();
///
/// let mut stream = vec![];
/// // You must set `.with_dictionary_handling(DictionaryHandling::Delta)` to
/// // enable delta dictionaries in the writer
/// let options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
/// let mut writer = StreamWriter::try_new_with_options(&mut stream, &schema, options).unwrap();
///
/// // When writing the first batch, a dictionary message with 'a' and 'b' will be written
/// // prior to the record batch.
/// writer.write(&batch1).unwrap();
/// // With the second batch only a delta dictionary with 'd' will be written
/// // prior to the record batch. This is only possible with `finish_preserve_values`.
/// // Without it, 'a' and 'd' in this batch would have different keys than the
/// // first batch and so we'd have to send a replacement dictionary with new keys
/// // for both.
/// writer.write(&batch2).unwrap();
/// writer.finish().unwrap();
/// ```
/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,

    compression_context: CompressionContext,
}
1382
1383impl<W: Write> StreamWriter<BufWriter<W>> {
1384    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
1385    ///
1386    /// See [`StreamWriter::try_new`] for an unbuffered version.
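    ///
    /// # Example
    ///
    /// A minimal sketch, writing to an in-memory `Vec<u8>` sink:
    ///
    /// ```
    /// # use arrow_ipc::writer::StreamWriter;
    /// # use arrow_schema::Schema;
    /// let schema = Schema::empty();
    /// let writer = StreamWriter::try_new_buffered(Vec::<u8>::new(), &schema).unwrap();
    /// ```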
1387    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1388        Self::try_new(BufWriter::new(writer), schema)
1389    }
1390}
1391
1392impl<W: Write> StreamWriter<W> {
1393    /// Try to create a new writer, with the schema written as part of the header.
1394    ///
1395    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
1396    ///
1397    /// # Errors
1398    ///
1399    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
1400    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1401        let write_options = IpcWriteOptions::default();
1402        Self::try_new_with_options(writer, schema, write_options)
1403    }
1404
1405    /// Try to create a new writer with [`IpcWriteOptions`].
1406    ///
1407    /// # Errors
1408    ///
1409    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
1410    pub fn try_new_with_options(
1411        mut writer: W,
1412        schema: &Schema,
1413        write_options: IpcWriteOptions,
1414    ) -> Result<Self, ArrowError> {
1415        let data_gen = IpcDataGenerator::default();
1416        let mut dictionary_tracker = DictionaryTracker::new(false);
1417
1418        // write the schema message as the stream header
1419        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1420            schema,
1421            &mut dictionary_tracker,
1422            &write_options,
1423        );
1424        write_message(&mut writer, encoded_message, &write_options)?;
1425        Ok(Self {
1426            writer,
1427            write_options,
1428            finished: false,
1429            dictionary_tracker,
1430            data_gen,
1431            compression_context: CompressionContext::default(),
1432        })
1433    }
1434
1435    /// Write a record batch to the stream
1436    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1437        if self.finished {
1438            return Err(ArrowError::IpcError(
1439                "Cannot write record batch to stream writer as it is closed".to_string(),
1440            ));
1441        }
1442
1443        let (encoded_dictionaries, encoded_message) = self
1444            .data_gen
1445            .encode(
1446                batch,
1447                &mut self.dictionary_tracker,
1448                &self.write_options,
1449                &mut self.compression_context,
1450            )
1451            .expect("StreamWriter is configured to not error on dictionary replacement");
1452
1453        for encoded_dictionary in encoded_dictionaries {
1454            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
1455        }
1456
1457        write_message(&mut self.writer, encoded_message, &self.write_options)?;
1458        Ok(())
1459    }
1460
1461    /// Write the end-of-stream marker (a zero-length continuation), and mark the stream as finished
1462    pub fn finish(&mut self) -> Result<(), ArrowError> {
1463        if self.finished {
1464            return Err(ArrowError::IpcError(
1465                "Cannot write footer to stream writer as it is closed".to_string(),
1466            ));
1467        }
1468
1469        write_continuation(&mut self.writer, &self.write_options, 0)?;
1470        self.writer.flush()?;
1471
1472        self.finished = true;
1473
1474        Ok(())
1475    }
1476
1477    /// Gets a reference to the underlying writer.
1478    pub fn get_ref(&self) -> &W {
1479        &self.writer
1480    }
1481
1482    /// Gets a mutable reference to the underlying writer.
1483    ///
1484    /// It is inadvisable to directly write to the underlying writer.
1485    pub fn get_mut(&mut self) -> &mut W {
1486        &mut self.writer
1487    }
1488
1489    /// Flush the underlying writer.
1490    ///
1491    /// If the writer is a [`BufWriter`], both its buffer and the underlying writer are flushed.
1492    pub fn flush(&mut self) -> Result<(), ArrowError> {
1493        self.writer.flush()?;
1494        Ok(())
1495    }
1496
1497    /// Unwraps the underlying writer.
1498    ///
1499    /// The writer is flushed and the StreamWriter is finished before returning.
1500    ///
1501    /// # Errors
1502    ///
1503    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1504    /// or while flushing the writer.
1505    ///
1506    /// # Example
1507    ///
1508    /// ```
1509    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1510    /// # use arrow_ipc::MetadataVersion;
1511    /// # use arrow_schema::{ArrowError, Schema};
1512    /// # fn main() -> Result<(), ArrowError> {
1513    /// // The result we expect from an empty schema
1514    /// let expected = vec![
1515    ///     255, 255, 255, 255,  48,   0,   0,   0,
1516    ///      16,   0,   0,   0,   0,   0,  10,   0,
1517    ///      12,   0,  10,   0,   9,   0,   4,   0,
1518    ///      10,   0,   0,   0,  16,   0,   0,   0,
1519    ///       0,   1,   4,   0,   8,   0,   8,   0,
1520    ///       0,   0,   4,   0,   8,   0,   0,   0,
1521    ///       4,   0,   0,   0,   0,   0,   0,   0,
1522    ///     255, 255, 255, 255,   0,   0,   0,   0
1523    /// ];
1524    ///
1525    /// let schema = Schema::empty();
1526    /// let buffer: Vec<u8> = Vec::new();
1527    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
1528    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
1529    ///
1530    /// assert_eq!(stream_writer.into_inner()?, expected);
1531    /// # Ok(())
1532    /// # }
1533    /// ```
1534    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1535        if !self.finished {
1536            // `finish` flushes.
1537            self.finish()?;
1538        }
1539        Ok(self.writer)
1540    }
1541}
1542
1543impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1544    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1545        self.write(batch)
1546    }
1547
1548    fn close(mut self) -> Result<(), ArrowError> {
1549        self.finish()
1550    }
1551}
1552
1553/// Stores the encoded data, which is a `crate::Message`, and optional Arrow data
1554pub struct EncodedData {
1555    /// An encoded `crate::Message`
1556    pub ipc_message: Vec<u8>,
1557    /// Arrow buffers to be written, should be an empty vec for schema messages
1558    pub arrow_data: Vec<u8>,
1559}
1560/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
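///
/// # Example
///
/// A minimal sketch that frames a hypothetical empty message; the bytes written
/// are just the length prefix and padding, not a decodable IPC message. Real
/// messages come from the [`IpcDataGenerator`].
///
/// ```
/// # use arrow_ipc::writer::{write_message, EncodedData, IpcWriteOptions};
/// let encoded = EncodedData {
///     ipc_message: vec![],
///     arrow_data: vec![],
/// };
/// let mut sink = vec![];
/// let (meta_len, body_len) =
///     write_message(&mut sink, encoded, &IpcWriteOptions::default()).unwrap();
/// // the metadata length is padded to the default 64-byte alignment
/// assert_eq!(meta_len % 64, 0);
/// assert_eq!(body_len, 0);
/// ```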
1561pub fn write_message<W: Write>(
1562    mut writer: W,
1563    encoded: EncodedData,
1564    write_options: &IpcWriteOptions,
1565) -> Result<(usize, usize), ArrowError> {
1566    let arrow_data_len = encoded.arrow_data.len();
1567    if arrow_data_len % usize::from(write_options.alignment) != 0 {
1568        return Err(ArrowError::MemoryError(
1569            "Arrow data not aligned".to_string(),
1570        ));
1571    }
1572
1573    let a = usize::from(write_options.alignment - 1);
1574    let buffer = encoded.ipc_message;
1575    let flatbuf_size = buffer.len();
1576    let prefix_size = if write_options.write_legacy_ipc_format {
1577        4
1578    } else {
1579        8
1580    };
1581    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1582    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1583
1584    write_continuation(
1585        &mut writer,
1586        write_options,
1587        (aligned_size - prefix_size) as i32,
1588    )?;
1589
1590    // write the flatbuf
1591    if flatbuf_size > 0 {
1592        writer.write_all(&buffer)?;
1593    }
1594    // write padding
1595    writer.write_all(&PADDING[..padding_bytes])?;
1596
1597    // write arrow data
1598    let body_len = if arrow_data_len > 0 {
1599        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1600    } else {
1601        0
1602    };
1603
1604    Ok((aligned_size, body_len))
1605}
1606
1607fn write_body_buffers<W: Write>(
1608    mut writer: W,
1609    data: &[u8],
1610    alignment: u8,
1611) -> Result<usize, ArrowError> {
1612    let len = data.len();
1613    let pad_len = pad_to_alignment(alignment, len);
1614    let total_len = len + pad_len;
1615
1616    // write body buffer
1617    writer.write_all(data)?;
1618    if pad_len > 0 {
1619        writer.write_all(&PADDING[..pad_len])?;
1620    }
1621
1622    Ok(total_len)
1623}
1624
1625/// Write the continuation marker (when the format requires one) and the message
1626/// length prefix, returning the number of bytes written
1627fn write_continuation<W: Write>(
1628    mut writer: W,
1629    write_options: &IpcWriteOptions,
1630    total_len: i32,
1631) -> Result<usize, ArrowError> {
1632    let mut written = 8;
1633
1634    // the version of the writer determines whether continuation markers should be added
1635    match write_options.metadata_version {
1636        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1637            unreachable!("Options with these metadata versions cannot be created")
1638        }
1639        crate::MetadataVersion::V4 => {
1640            if !write_options.write_legacy_ipc_format {
1641                // v0.15.0 format: continuation marker followed by the length
1642                writer.write_all(&CONTINUATION_MARKER)?;
            } else {
                // the legacy format writes only the 4-byte length prefix
1643                written = 4;
1644            }
1645            writer.write_all(&total_len.to_le_bytes()[..])?;
1646        }
1647        crate::MetadataVersion::V5 => {
1648            // write continuation marker and message length
1649            writer.write_all(&CONTINUATION_MARKER)?;
1650            writer.write_all(&total_len.to_le_bytes()[..])?;
1651        }
1652        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1653    };
1654
1655    Ok(written)
1656}
1657
1658/// In V4, null types have no validity bitmap.
1659/// In V5 and later, null, union, and run-end encoded types have no
1660/// validity bitmap.
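///
/// For example, a `UnionArray` is written with a validity bitmap under V4
/// metadata, but not under V5 and later.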
1661fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1662    if write_options.metadata_version < crate::MetadataVersion::V5 {
1663        !matches!(data_type, DataType::Null)
1664    } else {
1665        !matches!(
1666            data_type,
1667            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1668        )
1669    }
1670}
1671
1672/// Returns true if the buffer should be truncated before writing: the array is
/// offset into the buffer, or needs less than the buffer's full length
1673#[inline]
1674fn buffer_need_truncate(
1675    array_offset: usize,
1676    buffer: &Buffer,
1677    spec: &BufferSpec,
1678    min_length: usize,
1679) -> bool {
1680    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1681}
1682
1683/// Returns the byte width for a `BufferSpec::FixedWidth` spec, and 0 for all other specs.
1684#[inline]
1685fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1686    match spec {
1687        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1688        _ => 0,
1689    }
1690}
1691
1692/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1693/// original start offset and length for use in slicing child data.
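///
/// For example, for an array with `offset == 1` and `len == 2` over offsets
/// `[0, 3, 5, 9]`, the window `[3, 5, 9]` does not start at zero, so it is
/// re-encoded as `[0, 2, 6]`, and `(3, 6)` is returned as the start offset and
/// length for slicing the child or values data.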
1694fn reencode_offsets<O: OffsetSizeTrait>(
1695    offsets: &Buffer,
1696    data: &ArrayData,
1697) -> (Buffer, usize, usize) {
1698    let offsets_slice: &[O] = offsets.typed_data::<O>();
1699    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1700
1701    let start_offset = offset_slice.first().unwrap();
1702    let end_offset = offset_slice.last().unwrap();
1703
1704    let offsets = match start_offset.as_usize() {
1705        0 => {
1706            let size = size_of::<O>();
1707            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1708        }
1709        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1710    };
1711
1712    let start_offset = start_offset.as_usize();
1713    let end_offset = end_offset.as_usize();
1714
1715    (offsets, start_offset, end_offset - start_offset)
1716}
1717
1718/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1719///
1720/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1721/// slicing the values buffer as appropriate. This helps reduce the encoded
1722/// size of sliced arrays, as values that have been sliced away are not encoded
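///
/// For example, a `StringArray` holding `"a"`, `"bb"`, `"ccc"` and sliced to its
/// last two elements has offsets `[1, 3, 6]` into the values `"abbccc"`; the
/// offsets are re-encoded as `[0, 2, 5]` and the values buffer is sliced down to
/// `"bbccc"`.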
1723fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1724    if data.is_empty() {
1725        // As per specification, offsets buffer has N+1 elements.
1726        // So an empty array should still be encoded with a single 0 offset.
1727        let mut offsets = MutableBuffer::new(size_of::<O>());
1728        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1729        return (offsets.into(), MutableBuffer::new(0).into());
1730    }
1731
1732    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1733    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1734    (offsets, values)
1735}
1736
1737/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
1738/// of a values buffer.
1739fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1740    if data.is_empty() {
1741        // As per specification, offsets buffer has N+1 elements.
1742        // So an empty array should still be encoded with a single 0 offset.
1743        let mut offsets = MutableBuffer::new(size_of::<O>());
1744        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1745        return (offsets.into(), data.child_data()[0].slice(0, 0));
1746    }
1747
1748    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1749    let child_data = data.child_data()[0].slice(original_start_offset, len);
1750    (offsets, child_data)
1751}
1752
1753/// Returns the offsets, sizes, and child data buffers for a ListView array.
1754///
1755/// Unlike List arrays, ListView arrays store both offsets and sizes explicitly,
1756/// and offsets can be non-monotonic. When slicing, we simply pass through the
1757/// offsets and sizes without re-encoding, and do not slice the child data.
1758fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1759    data: &ArrayData,
1760) -> (Buffer, Buffer, ArrayData) {
1761    if data.is_empty() {
1762        return (
1763            MutableBuffer::new(0).into(),
1764            MutableBuffer::new(0).into(),
1765            data.child_data()[0].slice(0, 0),
1766        );
1767    }
1768
1769    let offsets = &data.buffers()[0];
1770    let sizes = &data.buffers()[1];
1771
1772    let element_size = std::mem::size_of::<O>();
1773    let offsets_slice =
1774        offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1775    let sizes_slice =
1776        sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1777
1778    let child_data = data.child_data()[0].clone();
1779
1780    (offsets_slice, sizes_slice, child_data)
1781}
1782
1783/// Returns the first buffer of an array, truncated to the array's valid range
1784/// when possible.
1785///
1786/// For fixed-width buffers, such as the views buffer of a BinaryView/Utf8View
1787/// array, only the range covered by the array's offset and length is kept. This
1788/// helps reduce the encoded size of sliced arrays.
1789fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] {
1790    let buffer = &array_data.buffers()[0];
1791    let layout = layout(array_data.data_type());
1792    let spec = &layout.buffers[0];
1793
1794    let byte_width = get_buffer_element_width(spec);
1795    let min_length = array_data.len() * byte_width;
1796    if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1797        let byte_offset = array_data.offset() * byte_width;
1798        let buffer_length = min(min_length, buffer.len() - byte_offset);
1799        &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1800    } else {
1801        buffer.as_slice()
1802    }
1803}
1804
1805/// Write array data to a vector of bytes, returning the updated buffer offset
1806#[allow(clippy::too_many_arguments)]
1807fn write_array_data(
1808    array_data: &ArrayData,
1809    buffers: &mut Vec<crate::Buffer>,
1810    arrow_data: &mut Vec<u8>,
1811    nodes: &mut Vec<crate::FieldNode>,
1812    offset: i64,
1813    num_rows: usize,
1814    null_count: usize,
1815    compression_codec: Option<CompressionCodec>,
1816    compression_context: &mut CompressionContext,
1817    write_options: &IpcWriteOptions,
1818) -> Result<i64, ArrowError> {
1819    let mut offset = offset;
1820    if !matches!(array_data.data_type(), DataType::Null) {
1821        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1822    } else {
1823        // A NullArray's null_count equals its len, but the `null_count` passed in comes
1824        // from ArrayData, where null_count is always 0.
1825        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1826    }
1827    if has_validity_bitmap(array_data.data_type(), write_options) {
1828        // write null buffer if exists
1829        let null_buffer = match array_data.nulls() {
1830            None => {
1831                // create a buffer and fill it with valid bits
1832                let num_bytes = bit_util::ceil(num_rows, 8);
1833                let buffer = MutableBuffer::new(num_bytes);
1834                let buffer = buffer.with_bitset(num_bytes, true);
1835                buffer.into()
1836            }
1837            Some(buffer) => buffer.inner().sliced(),
1838        };
1839
1840        offset = write_buffer(
1841            null_buffer.as_slice(),
1842            buffers,
1843            arrow_data,
1844            offset,
1845            compression_codec,
1846            compression_context,
1847            write_options.alignment,
1848        )?;
1849    }
1850
1851    let data_type = array_data.data_type();
1852    if matches!(data_type, DataType::Binary | DataType::Utf8) {
1853        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1854        for buffer in [offsets, values] {
1855            offset = write_buffer(
1856                buffer.as_slice(),
1857                buffers,
1858                arrow_data,
1859                offset,
1860                compression_codec,
1861                compression_context,
1862                write_options.alignment,
1863            )?;
1864        }
1865    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
1866        // Slicing the views buffer is safe and easy, but pruning unneeded data buffers
1867        // is much more nuanced since it's complicated to prove that no views reference the pruned buffers.
1868        //
1869        // The current implementation serializes the raw arrays as given and does not try to optimize anything.
1870        // If users want to "compact" the arrays prior to sending them over IPC,
1871        // they should consider the gc API suggested in #5513.
1872        let views = get_or_truncate_buffer(array_data);
1873        offset = write_buffer(
1874            views,
1875            buffers,
1876            arrow_data,
1877            offset,
1878            compression_codec,
1879            compression_context,
1880            write_options.alignment,
1881        )?;
1882
1883        for buffer in array_data.buffers().iter().skip(1) {
1884            offset = write_buffer(
1885                buffer.as_slice(),
1886                buffers,
1887                arrow_data,
1888                offset,
1889                compression_codec,
1890                compression_context,
1891                write_options.alignment,
1892            )?;
1893        }
1894    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
1895        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
1896        for buffer in [offsets, values] {
1897            offset = write_buffer(
1898                buffer.as_slice(),
1899                buffers,
1900                arrow_data,
1901                offset,
1902                compression_codec,
1903                compression_context,
1904                write_options.alignment,
1905            )?;
1906        }
1907    } else if DataType::is_numeric(data_type)
1908        || DataType::is_temporal(data_type)
1909        || matches!(
1910            array_data.data_type(),
1911            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
1912        )
1913    {
1914        // Truncate values
1915        assert_eq!(array_data.buffers().len(), 1);
1916
1917        let buffer = get_or_truncate_buffer(array_data);
1918        offset = write_buffer(
1919            buffer,
1920            buffers,
1921            arrow_data,
1922            offset,
1923            compression_codec,
1924            compression_context,
1925            write_options.alignment,
1926        )?;
1927    } else if matches!(data_type, DataType::Boolean) {
1928        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
1929        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
1930        assert_eq!(array_data.buffers().len(), 1);
1931
1932        let buffer = &array_data.buffers()[0];
1933        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
1934        offset = write_buffer(
1935            &buffer,
1936            buffers,
1937            arrow_data,
1938            offset,
1939            compression_codec,
1940            compression_context,
1941            write_options.alignment,
1942        )?;
1943    } else if matches!(
1944        data_type,
1945        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
1946    ) {
1947        assert_eq!(array_data.buffers().len(), 1);
1948        assert_eq!(array_data.child_data().len(), 1);
1949
1950        // Truncate offsets and the child data to avoid writing unnecessary data
1951        let (offsets, sliced_child_data) = match data_type {
1952            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
1953            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
1954            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
1955            _ => unreachable!(),
1956        };
1957        offset = write_buffer(
1958            offsets.as_slice(),
1959            buffers,
1960            arrow_data,
1961            offset,
1962            compression_codec,
1963            compression_context,
1964            write_options.alignment,
1965        )?;
1966        offset = write_array_data(
1967            &sliced_child_data,
1968            buffers,
1969            arrow_data,
1970            nodes,
1971            offset,
1972            sliced_child_data.len(),
1973            sliced_child_data.null_count(),
1974            compression_codec,
1975            compression_context,
1976            write_options,
1977        )?;
1978        return Ok(offset);
1979    } else if matches!(
1980        data_type,
1981        DataType::ListView(_) | DataType::LargeListView(_)
1982    ) {
1983        assert_eq!(array_data.buffers().len(), 2); // offsets + sizes
1984        assert_eq!(array_data.child_data().len(), 1);
1985
1986        let (offsets, sizes, child_data) = match data_type {
1987            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
1988            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
1989            _ => unreachable!(),
1990        };
1991
1992        offset = write_buffer(
1993            offsets.as_slice(),
1994            buffers,
1995            arrow_data,
1996            offset,
1997            compression_codec,
1998            compression_context,
1999            write_options.alignment,
2000        )?;
2001
2002        offset = write_buffer(
2003            sizes.as_slice(),
2004            buffers,
2005            arrow_data,
2006            offset,
2007            compression_codec,
2008            compression_context,
2009            write_options.alignment,
2010        )?;
2011
2012        offset = write_array_data(
2013            &child_data,
2014            buffers,
2015            arrow_data,
2016            nodes,
2017            offset,
2018            child_data.len(),
2019            child_data.null_count(),
2020            compression_codec,
2021            compression_context,
2022            write_options,
2023        )?;
2024        return Ok(offset);
2025    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
2026        assert_eq!(array_data.child_data().len(), 1);
2027        let fixed_size = *fixed_size as usize;
2028
2029        let child_offset = array_data.offset() * fixed_size;
2030        let child_length = array_data.len() * fixed_size;
2031        let child_data = array_data.child_data()[0].slice(child_offset, child_length);
2032
2033        offset = write_array_data(
2034            &child_data,
2035            buffers,
2036            arrow_data,
2037            nodes,
2038            offset,
2039            child_data.len(),
2040            child_data.null_count(),
2041            compression_codec,
2042            compression_context,
2043            write_options,
2044        )?;
2045        return Ok(offset);
2046    } else {
2047        for buffer in array_data.buffers() {
2048            offset = write_buffer(
2049                buffer,
2050                buffers,
2051                arrow_data,
2052                offset,
2053                compression_codec,
2054                compression_context,
2055                write_options.alignment,
2056            )?;
2057        }
2058    }
2059
2060    match array_data.data_type() {
2061        DataType::Dictionary(_, _) => {}
2062        DataType::RunEndEncoded(_, _) => {
2063            // unslice the run encoded array.
2064            let arr = unslice_run_array(array_data.clone())?;
2065            // recursively write out nested structures
2066            for data_ref in arr.child_data() {
2067                // write the nested data (e.g list data)
2068                offset = write_array_data(
2069                    data_ref,
2070                    buffers,
2071                    arrow_data,
2072                    nodes,
2073                    offset,
2074                    data_ref.len(),
2075                    data_ref.null_count(),
2076                    compression_codec,
2077                    compression_context,
2078                    write_options,
2079                )?;
2080            }
2081        }
2082        _ => {
2083            // recursively write out nested structures
2084            for data_ref in array_data.child_data() {
2085                // write the nested data (e.g list data)
2086                offset = write_array_data(
2087                    data_ref,
2088                    buffers,
2089                    arrow_data,
2090                    nodes,
2091                    offset,
2092                    data_ref.len(),
2093                    data_ref.null_count(),
2094                    compression_codec,
2095                    compression_context,
2096                    write_options,
2097                )?;
2098            }
2099        }
2100    }
2101    Ok(offset)
2102}
2103
2104/// Writes a buffer into `arrow_data`, a vector of bytes, and adds its
2105/// [`crate::Buffer`] descriptor to `buffers`. Returns the new offset in `arrow_data`.
2106///
2108/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
2109/// Each constituent buffer is first compressed with the indicated
2110/// compressor, and then written with the uncompressed length in the first 8
2111/// bytes as a 64-bit little-endian signed integer followed by the compressed
2112/// buffer bytes (and then padding as required by the protocol). The
2113/// uncompressed length may be set to -1 to indicate that the data that
2114/// follows is not compressed, which can be useful for cases where
2115/// compression does not yield appreciable savings.
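///
/// For example, writing an uncompressed 10-byte buffer at `offset == 0` with
/// `alignment == 8` pushes a `crate::Buffer` entry of `(0, 10)`, appends 6 zero
/// bytes of padding after the data, and returns the new offset `16`.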
2116fn write_buffer(
2117    buffer: &[u8],                    // input
2118    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
2119    arrow_data: &mut Vec<u8>,         // output stream
2120    offset: i64,                      // current output stream offset
2121    compression_codec: Option<CompressionCodec>,
2122    compression_context: &mut CompressionContext,
2123    alignment: u8,
2124) -> Result<i64, ArrowError> {
2125    let len: i64 = match compression_codec {
2126        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?,
2127        None => {
2128            arrow_data.extend_from_slice(buffer);
2129            buffer.len()
2130        }
2131    }
2132    .try_into()
2133    .map_err(|e| {
2134        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
2135    })?;
2136
2137    // make new index entry
2138    buffers.push(crate::Buffer::new(offset, len));
2139    // padding and make offset aligned
2140    let pad_len = pad_to_alignment(alignment, len as usize);
2141    arrow_data.extend_from_slice(&PADDING[..pad_len]);
2142
2143    Ok(offset + len + (pad_len as i64))
2144}
2145
2146const PADDING: [u8; 64] = [0; 64];
2147
2148/// Returns the number of padding bytes needed to round `len` up to the next multiple of `alignment`
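/// For example, `pad_to_alignment(8, 5)` returns `3`, and
/// `pad_to_alignment(8, 16)` returns `0`.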
2149#[inline]
2150fn pad_to_alignment(alignment: u8, len: usize) -> usize {
2151    let a = usize::from(alignment - 1);
2152    ((len + a) & !a) - len
2153}
2154
2155#[cfg(test)]
2156mod tests {
2157    use std::hash::Hasher;
2158    use std::io::Cursor;
2159    use std::io::Seek;
2160
2161    use arrow_array::builder::FixedSizeListBuilder;
2162    use arrow_array::builder::Float32Builder;
2163    use arrow_array::builder::Int64Builder;
2164    use arrow_array::builder::MapBuilder;
2165    use arrow_array::builder::StringViewBuilder;
2166    use arrow_array::builder::UnionBuilder;
2167    use arrow_array::builder::{
2168        GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2169    };
2170    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2171    use arrow_array::types::*;
2172    use arrow_buffer::ScalarBuffer;
2173
2174    use crate::MetadataVersion;
2175    use crate::convert::fb_to_schema;
2176    use crate::reader::*;
2177    use crate::root_as_footer;
2178
2179    use super::*;
2180
2181    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2182        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2183        writer.write(rb).unwrap();
2184        writer.finish().unwrap();
2185        writer.into_inner().unwrap()
2186    }
2187
2188    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2189        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2190        reader.next().unwrap().unwrap()
2191    }
2192
2193    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2194        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
2195        // without needing to construct a giant array to spill over the 64-byte default alignment
2196        // boundary.
2197        const IPC_ALIGNMENT: usize = 8;
2198
2199        let mut stream_writer = StreamWriter::try_new_with_options(
2200            vec![],
2201            record.schema_ref(),
2202            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2203        )
2204        .unwrap();
2205        stream_writer.write(record).unwrap();
2206        stream_writer.finish().unwrap();
2207        stream_writer.into_inner().unwrap()
2208    }
2209
2210    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2211        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2212        stream_reader.next().unwrap().unwrap()
2213    }
2214
2215    #[test]
2216    #[cfg(feature = "lz4")]
2217    fn test_write_empty_record_batch_lz4_compression() {
2218        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2219        let values: Vec<Option<i32>> = vec![];
2220        let array = Int32Array::from(values);
2221        let record_batch =
2222            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2223
2224        let mut file = tempfile::tempfile().unwrap();
2225
2226        {
2227            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2228                .unwrap()
2229                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2230                .unwrap();
2231
2232            let mut writer =
2233                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2234            writer.write(&record_batch).unwrap();
2235            writer.finish().unwrap();
2236        }
2237        file.rewind().unwrap();
2238        {
2239            // read file
2240            let reader = FileReader::try_new(file, None).unwrap();
2241            for read_batch in reader {
2242                read_batch
2243                    .unwrap()
2244                    .columns()
2245                    .iter()
2246                    .zip(record_batch.columns())
2247                    .for_each(|(a, b)| {
2248                        assert_eq!(a.data_type(), b.data_type());
2249                        assert_eq!(a.len(), b.len());
2250                        assert_eq!(a.null_count(), b.null_count());
2251                    });
2252            }
2253        }
2254    }
2255
2256    #[test]
2257    #[cfg(feature = "lz4")]
2258    fn test_write_file_with_lz4_compression() {
2259        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2260        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2261        let array = Int32Array::from(values);
2262        let record_batch =
2263            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2264
2265        let mut file = tempfile::tempfile().unwrap();
2266        {
2267            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2268                .unwrap()
2269                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2270                .unwrap();
2271
2272            let mut writer =
2273                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2274            writer.write(&record_batch).unwrap();
2275            writer.finish().unwrap();
2276        }
2277        file.rewind().unwrap();
2278        {
2279            // read file
2280            let reader = FileReader::try_new(file, None).unwrap();
2281            for read_batch in reader {
2282                read_batch
2283                    .unwrap()
2284                    .columns()
2285                    .iter()
2286                    .zip(record_batch.columns())
2287                    .for_each(|(a, b)| {
2288                        assert_eq!(a.data_type(), b.data_type());
2289                        assert_eq!(a.len(), b.len());
2290                        assert_eq!(a.null_count(), b.null_count());
2291                    });
2292            }
2293        }
2294    }
2295
2296    #[test]
2297    #[cfg(feature = "zstd")]
2298    fn test_write_file_with_zstd_compression() {
2299        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2300        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2301        let array = Int32Array::from(values);
2302        let record_batch =
2303            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2304        let mut file = tempfile::tempfile().unwrap();
2305        {
2306            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2307                .unwrap()
2308                .try_with_compression(Some(crate::CompressionType::ZSTD))
2309                .unwrap();
2310
2311            let mut writer =
2312                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2313            writer.write(&record_batch).unwrap();
2314            writer.finish().unwrap();
2315        }
2316        file.rewind().unwrap();
2317        {
2318            // read file
2319            let reader = FileReader::try_new(file, None).unwrap();
2320            for read_batch in reader {
2321                read_batch
2322                    .unwrap()
2323                    .columns()
2324                    .iter()
2325                    .zip(record_batch.columns())
2326                    .for_each(|(a, b)| {
2327                        assert_eq!(a.data_type(), b.data_type());
2328                        assert_eq!(a.len(), b.len());
2329                        assert_eq!(a.null_count(), b.null_count());
2330                    });
2331            }
2332        }
2333    }
2334
2335    #[test]
2336    fn test_write_file() {
2337        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2338        let values: Vec<Option<u32>> = vec![
2339            Some(999),
2340            None,
2341            Some(235),
2342            Some(123),
2343            None,
2344            None,
2345            None,
2346            None,
2347            None,
2348        ];
2349        let array1 = UInt32Array::from(values);
2350        let batch =
2351            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2352                .unwrap();
2353        let mut file = tempfile::tempfile().unwrap();
2354        {
2355            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2356
2357            writer.write(&batch).unwrap();
2358            writer.finish().unwrap();
2359        }
2360        file.rewind().unwrap();
2361
2362        {
2363            let mut reader = FileReader::try_new(file, None).unwrap();
2364            while let Some(Ok(read_batch)) = reader.next() {
2365                read_batch
2366                    .columns()
2367                    .iter()
2368                    .zip(batch.columns())
2369                    .for_each(|(a, b)| {
2370                        assert_eq!(a.data_type(), b.data_type());
2371                        assert_eq!(a.len(), b.len());
2372                        assert_eq!(a.null_count(), b.null_count());
2373                    });
2374            }
2375        }
2376    }
2377
2378    #[test]
2379    fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
2380        let name = StringArray::from(Vec::<String>::new());
2381        let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());
2382
2383        assert_eq!(name.len(), 0);
2384        assert_eq!(
2385            offsets.len(),
2386            std::mem::size_of::<i32>(),
2387            "offsets buffer should contain one zero i32 offset"
2388        );
2389        assert_eq!(values.len(), 0, "values buffer should remain empty");
2390    }
2391
2392    #[test]
2393    fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
2394        let name = LargeStringArray::from(Vec::<String>::new());
2395        let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());
2396
2397        assert_eq!(name.len(), 0);
2398        assert_eq!(
2399            offsets.len(),
2400            std::mem::size_of::<i64>(),
2401            "offsets buffer should contain one zero i64 offset"
2402        );
2403        assert_eq!(values.len(), 0, "values buffer should remain empty");
2404    }
2405
2406    #[test]
2407    fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
2408        let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
2409        let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());
2410
2411        assert_eq!(list.len(), 0);
2412        assert_eq!(
2413            offsets.len(),
2414            std::mem::size_of::<i32>(),
2415            "offsets buffer should contain one zero i32 offset"
2416        );
2417        assert_eq!(child_data.len(), 0, "child data should remain empty");
2418    }
2419
2420    #[test]
2421    fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
2422        let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
2423        let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());
2424
2425        assert_eq!(list.len(), 0);
2426        assert_eq!(
2427            offsets.len(),
2428            std::mem::size_of::<i64>(),
2429            "offsets buffer should contain one zero i64 offset"
2430        );
2431        assert_eq!(child_data.len(), 0, "child data should remain empty");
2432    }
2433
2434    fn write_null_file(options: IpcWriteOptions) {
2435        let schema = Schema::new(vec![
2436            Field::new("nulls", DataType::Null, true),
2437            Field::new("int32s", DataType::Int32, false),
2438            Field::new("nulls2", DataType::Null, true),
2439            Field::new("f64s", DataType::Float64, false),
2440        ]);
2441        let array1 = NullArray::new(32);
2442        let array2 = Int32Array::from(vec![1; 32]);
2443        let array3 = NullArray::new(32);
2444        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2445        let batch = RecordBatch::try_new(
2446            Arc::new(schema.clone()),
2447            vec![
2448                Arc::new(array1) as ArrayRef,
2449                Arc::new(array2) as ArrayRef,
2450                Arc::new(array3) as ArrayRef,
2451                Arc::new(array4) as ArrayRef,
2452            ],
2453        )
2454        .unwrap();
2455        let mut file = tempfile::tempfile().unwrap();
2456        {
2457            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2458
2459            writer.write(&batch).unwrap();
2460            writer.finish().unwrap();
2461        }
2462
2463        file.rewind().unwrap();
2464
2465        {
2466            let reader = FileReader::try_new(file, None).unwrap();
2467            reader.for_each(|maybe_batch| {
2468                maybe_batch
2469                    .unwrap()
2470                    .columns()
2471                    .iter()
2472                    .zip(batch.columns())
2473                    .for_each(|(a, b)| {
2474                        assert_eq!(a.data_type(), b.data_type());
2475                        assert_eq!(a.len(), b.len());
2476                        assert_eq!(a.null_count(), b.null_count());
2477                    });
2478            });
2479        }
2480    }
2481    #[test]
2482    fn test_write_null_file_v4() {
2483        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2484        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2485        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2486        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2487    }
2488
2489    #[test]
2490    fn test_write_null_file_v5() {
2491        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2492        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2493    }
2494
2495    #[test]
2496    fn track_union_nested_dict() {
2497        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2498
2499        let array = Arc::new(inner) as ArrayRef;
2500
2501        // Dict field with id 0
2502        #[allow(deprecated)]
2503        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2504        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2505
2506        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2507        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2508
2509        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2510
2511        let schema = Arc::new(Schema::new(vec![Field::new(
2512            "union",
2513            union.data_type().clone(),
2514            false,
2515        )]));
2516
2517        let r#gen = IpcDataGenerator::default();
2518        let mut dict_tracker = DictionaryTracker::new(false);
2519        r#gen.schema_to_bytes_with_dictionary_tracker(
2520            &schema,
2521            &mut dict_tracker,
2522            &IpcWriteOptions::default(),
2523        );
2524
2525        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2526
2527        r#gen
2528            .encode(
2529                &batch,
2530                &mut dict_tracker,
2531                &Default::default(),
2532                &mut Default::default(),
2533            )
2534            .unwrap();
2535
2536        // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
2537        // so we expect the dict will be keyed to 0
2538        assert!(dict_tracker.written.contains_key(&0));
2539    }
2540
2541    #[test]
2542    fn track_struct_nested_dict() {
2543        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2544
2545        let array = Arc::new(inner) as ArrayRef;
2546
2547        // Dict field with id 2
2548        #[allow(deprecated)]
2549        let dctfield = Arc::new(Field::new_dict(
2550            "dict",
2551            array.data_type().clone(),
2552            false,
2553            2,
2554            false,
2555        ));
2556
2557        let s = StructArray::from(vec![(dctfield, array)]);
2558        let struct_array = Arc::new(s) as ArrayRef;
2559
2560        let schema = Arc::new(Schema::new(vec![Field::new(
2561            "struct",
2562            struct_array.data_type().clone(),
2563            false,
2564        )]));
2565
2566        let r#gen = IpcDataGenerator::default();
2567        let mut dict_tracker = DictionaryTracker::new(false);
2568        r#gen.schema_to_bytes_with_dictionary_tracker(
2569            &schema,
2570            &mut dict_tracker,
2571            &IpcWriteOptions::default(),
2572        );
2573
2574        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2575
2576        r#gen
2577            .encode(
2578                &batch,
2579                &mut dict_tracker,
2580                &Default::default(),
2581                &mut Default::default(),
2582            )
2583            .unwrap();
2584
2585        assert!(dict_tracker.written.contains_key(&0));
2586    }
2587
2588    fn write_union_file(options: IpcWriteOptions) {
2589        let schema = Schema::new(vec![Field::new_union(
2590            "union",
2591            vec![0, 1],
2592            vec![
2593                Field::new("a", DataType::Int32, false),
2594                Field::new("c", DataType::Float64, false),
2595            ],
2596            UnionMode::Sparse,
2597        )]);
2598        let mut builder = UnionBuilder::with_capacity_sparse(5);
2599        builder.append::<Int32Type>("a", 1).unwrap();
2600        builder.append_null::<Int32Type>("a").unwrap();
2601        builder.append::<Float64Type>("c", 3.0).unwrap();
2602        builder.append_null::<Float64Type>("c").unwrap();
2603        builder.append::<Int32Type>("a", 4).unwrap();
2604        let union = builder.build().unwrap();
2605
2606        let batch =
2607            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2608                .unwrap();
2609
2610        let mut file = tempfile::tempfile().unwrap();
2611        {
2612            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2613
2614            writer.write(&batch).unwrap();
2615            writer.finish().unwrap();
2616        }
2617        file.rewind().unwrap();
2618
2619        {
2620            let reader = FileReader::try_new(file, None).unwrap();
2621            reader.for_each(|maybe_batch| {
2622                maybe_batch
2623                    .unwrap()
2624                    .columns()
2625                    .iter()
2626                    .zip(batch.columns())
2627                    .for_each(|(a, b)| {
2628                        assert_eq!(a.data_type(), b.data_type());
2629                        assert_eq!(a.len(), b.len());
2630                        assert_eq!(a.null_count(), b.null_count());
2631                    });
2632            });
2633        }
2634    }
2635
2636    #[test]
2637    fn test_write_union_file_v4_v5() {
2638        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2639        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2640    }
2641
2642    #[test]
2643    fn test_write_view_types() {
2644        const LONG_TEST_STRING: &str =
2645            "This is a long string to make sure binary view array handles it";
2646        let schema = Schema::new(vec![
2647            Field::new("field1", DataType::BinaryView, true),
2648            Field::new("field2", DataType::Utf8View, true),
2649        ]);
2650        let values: Vec<Option<&[u8]>> = vec![
2651            Some(b"foo"),
2652            Some(b"bar"),
2653            Some(LONG_TEST_STRING.as_bytes()),
2654        ];
2655        let binary_array = BinaryViewArray::from_iter(values);
2656        let utf8_array =
2657            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2658        let record_batch = RecordBatch::try_new(
2659            Arc::new(schema.clone()),
2660            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2661        )
2662        .unwrap();
2663
2664        let mut file = tempfile::tempfile().unwrap();
2665        {
2666            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2667            writer.write(&record_batch).unwrap();
2668            writer.finish().unwrap();
2669        }
2670        file.rewind().unwrap();
2671        {
2672            let mut reader = FileReader::try_new(&file, None).unwrap();
2673            let read_batch = reader.next().unwrap().unwrap();
2674            read_batch
2675                .columns()
2676                .iter()
2677                .zip(record_batch.columns())
2678                .for_each(|(a, b)| {
2679                    assert_eq!(a, b);
2680                });
2681        }
2682        file.rewind().unwrap();
2683        {
2684            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2685            let read_batch = reader.next().unwrap().unwrap();
2686            assert_eq!(read_batch.num_columns(), 1);
2687            let read_array = read_batch.column(0);
2688            let write_array = record_batch.column(0);
2689            assert_eq!(read_array, write_array);
2690        }
2691    }
2692
2693    #[test]
2694    fn truncate_ipc_record_batch() {
2695        fn create_batch(rows: usize) -> RecordBatch {
2696            let schema = Schema::new(vec![
2697                Field::new("a", DataType::Int32, false),
2698                Field::new("b", DataType::Utf8, false),
2699            ]);
2700
2701            let a = Int32Array::from_iter_values(0..rows as i32);
2702            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2703
2704            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2705        }
2706
2707        let big_record_batch = create_batch(65536);
2708
2709        let length = 5;
2710        let small_record_batch = create_batch(length);
2711
2712        let offset = 2;
2713        let record_batch_slice = big_record_batch.slice(offset, length);
2714        assert!(
2715            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2716        );
2717        assert_eq!(
2718            serialize_stream(&small_record_batch).len(),
2719            serialize_stream(&record_batch_slice).len()
2720        );
2721
2722        assert_eq!(
2723            deserialize_stream(serialize_stream(&record_batch_slice)),
2724            record_batch_slice
2725        );
2726    }
2727
2728    #[test]
2729    fn truncate_ipc_record_batch_with_nulls() {
2730        fn create_batch() -> RecordBatch {
2731            let schema = Schema::new(vec![
2732                Field::new("a", DataType::Int32, true),
2733                Field::new("b", DataType::Utf8, true),
2734            ]);
2735
2736            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2737            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2738
2739            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2740        }
2741
2742        let record_batch = create_batch();
2743        let record_batch_slice = record_batch.slice(1, 2);
2744        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2745
2746        assert!(
2747            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2748        );
2749
2750        assert!(deserialized_batch.column(0).is_null(0));
2751        assert!(deserialized_batch.column(0).is_valid(1));
2752        assert!(deserialized_batch.column(1).is_valid(0));
2753        assert!(deserialized_batch.column(1).is_valid(1));
2754
2755        assert_eq!(record_batch_slice, deserialized_batch);
2756    }
2757
2758    #[test]
2759    fn truncate_ipc_dictionary_array() {
2760        fn create_batch() -> RecordBatch {
2761            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2762                .into_iter()
2763                .collect();
2764            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2765
2766            let array = DictionaryArray::new(keys, Arc::new(values));
2767
2768            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2769
2770            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2771        }
2772
2773        let record_batch = create_batch();
2774        let record_batch_slice = record_batch.slice(1, 2);
2775        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2776
2777        assert!(
2778            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2779        );
2780
2781        assert!(deserialized_batch.column(0).is_valid(0));
2782        assert!(deserialized_batch.column(0).is_null(1));
2783
2784        assert_eq!(record_batch_slice, deserialized_batch);
2785    }

    #[test]
    fn truncate_ipc_struct_array() {
        fn create_batch() -> RecordBatch {
            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("s", DataType::Utf8, true)),
                    Arc::new(strings) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(ints) as ArrayRef,
                ),
            ]);

            let schema = Schema::new(vec![Field::new(
                "struct_array",
                struct_array.data_type().clone(),
                true,
            )]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        let structs = deserialized_batch
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        assert!(structs.column(0).is_null(0));
        assert!(structs.column(0).is_valid(1));
        assert!(structs.column(1).is_valid(0));
        assert!(structs.column(1).is_null(1));
        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_string_array_with_all_empty_string() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(0, 1);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );
        assert_eq!(record_batch_slice, deserialized_batch);
    }
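
    // A hedged variant of the test above (an addition, not from the original
    // suite): an all-null Utf8 column has no value bytes at all, so a sliced
    // roundtrip should still reproduce the slice exactly. Reuses the stream
    // helpers defined in this module.
    #[test]
    fn truncate_ipc_string_array_with_all_nulls() {
        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
        let a = StringArray::from(vec![None::<&str>; 5]);
        let record_batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
        assert!(deserialized_batch.column(0).is_null(0));
        assert_eq!(record_batch_slice, deserialized_batch);
    }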

    #[test]
    fn test_stream_writer_writes_array_slice() {
        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
        assert_eq!(
            vec![Some(1), Some(2), Some(3)],
            array.iter().collect::<Vec<_>>()
        );

        let sliced = array.slice(1, 2);
        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
            vec![Arc::new(sliced)],
        )
        .expect("new batch");

        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
        writer.write(&batch).expect("write");
        let outbuf = writer.into_inner().expect("inner");

        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
        let read_batch = reader.next().unwrap().expect("read batch");

        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
        assert_eq!(
            vec![Some(2), Some(3)],
            read_array.iter().collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_large_slice_uint32() {
        ensure_roundtrip(Arc::new(UInt32Array::from_iter(
            (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
        )));
    }

    #[test]
    fn test_large_slice_string() {
        let strings: Vec<_> = (0..8000)
            .map(|i| {
                if i % 2 == 0 {
                    Some(format!("value{i}"))
                } else {
                    None
                }
            })
            .collect();

        ensure_roundtrip(Arc::new(StringArray::from(strings)));
    }

    #[test]
    fn test_large_slice_string_list() {
        let mut ls = ListBuilder::new(StringBuilder::new());

        let mut s = String::new();
        for row_number in 0..8000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().append_value(&s);
                }
                ls.append(true)
            } else {
                ls.append(false); // null
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

    #[test]
    fn test_large_slice_string_list_of_lists() {
        // This test specifically exercises `reencode_offsets`, which inspects both
        // the starting offset and the data offset, so it needs a dataset where the
        // starting offset is zero but the data offset is not.
        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

        for _ in 0..4000 {
            ls.values().append(true);
            ls.append(true)
        }

        let mut s = String::new();
        for row_number in 0..4000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().values().append_value(&s);
                }
                ls.values().append(true);
                ls.append(true)
            } else {
                ls.append(false); // null
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

    /// Read/write a record batch to both a File and a Stream and ensure the
    /// output matches the input
    fn ensure_roundtrip(array: ArrayRef) {
        let num_rows = array.len();
        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
        // take off the first element
        let sliced_batch = orig_batch.slice(1, num_rows - 1);

        let schema = orig_batch.schema();
        let stream_data = {
            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);

        let file_data = {
            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap().into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);
    }
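
    // Hedged extra coverage (an addition): the same roundtrip helper applied to
    // a Float64 column, exercising the non-zero-offset slice path for another
    // fixed-width type. Assumes `ensure_roundtrip` above.
    #[test]
    fn test_large_slice_float64() {
        ensure_roundtrip(Arc::new(Float64Array::from_iter((0..8000).map(|i| {
            if i % 2 == 0 { Some(i as f64) } else { None }
        }))));
    }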

    #[test]
    fn encode_bools_slice() {
        // Test case for https://github.com/apache/arrow-rs/issues/3496
        assert_bool_roundtrip([true, false], 1, 1);

        // slice somewhere in the middle
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, false, false, false, false, true, true, true, true, true, false,
                false, false, false, false,
            ],
            13,
            17,
        );

        // start at byte boundary, end in the middle
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false,
            ],
            8,
            2,
        );

        // start and stop at byte boundaries
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, true, false, false, false, false, false,
            ],
            8,
            8,
        );
    }

    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
        let val_bool_field = Field::new("val", DataType::Boolean, false);

        let schema = Arc::new(Schema::new(vec![val_bool_field]));

        let bools = BooleanArray::from(bools.to_vec());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
        let batch = batch.slice(offset, length);

        let data = serialize_stream(&batch);
        let batch2 = deserialize_stream(data);
        assert_eq!(batch, batch2);
    }
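
    // Hedged exhaustive companion (an addition): sweep every non-empty
    // (offset, length) pair of a small boolean array through the helper above,
    // covering all bit-level slice positions rather than hand-picked ones.
    #[test]
    fn encode_bools_slice_exhaustive() {
        let bools = [
            true, false, true, true, false, false, true, true, true, false,
        ];
        for offset in 0..bools.len() {
            for length in 1..=(bools.len() - offset) {
                assert_bool_roundtrip(bools, offset, length);
            }
        }
    }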

    #[test]
    fn test_run_array_unslice() {
        let total_len = 80;
        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
        let repeats: Vec<usize> = vec![3, 4, 1, 2];
        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
        for ix in 0_usize..32 {
            let repeat: usize = repeats[ix % repeats.len()];
            let val: Option<i32> = vals[ix % vals.len()];
            input_array.resize(input_array.len() + repeat, val);
        }

        // Encode the input_array to run array
        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        // test for all slice lengths.
        for slice_len in 1..=total_len {
            // test for offset = 0, slice length = slice_len
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(0, slice_len).into_data().into();

            // Create unsliced run array.
            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);

            // test for offset = total_len - slice_len, length = slice_len
            let sliced_run_array: RunArray<Int16Type> = run_array
                .slice(total_len - slice_len, slice_len)
                .into_data()
                .into();

            // Create unsliced run array.
            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(total_len - slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }
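
    // A minimal hedged illustration (an addition): applying
    // `into_zero_offset_run_array` to a run array that was never sliced should
    // preserve its logical contents unchanged, since there is nothing to
    // re-anchor. Assumes the same builder types as the test above.
    #[test]
    fn test_run_array_unslice_noop() {
        let input: Vec<Option<i32>> = vec![Some(1), Some(1), None, Some(2), Some(2), Some(2)];
        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input.len());
        builder.extend(input.iter().copied());
        let run_array = builder.finish();

        let unsliced = into_zero_offset_run_array(run_array).unwrap();
        let typed = unsliced.downcast::<PrimitiveArray<Int32Type>>().unwrap();
        let actual: Vec<Option<i32>> = typed.into_iter().collect();
        assert_eq!(input, actual);
    }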

    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());

        for i in 0..100_000 {
            for value in [i, i, i] {
                ls.values().append_value(value);
            }
            ls.append(true)
        }

        ls.finish()
    }

    fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());

        for i in 0..100_000 {
            for value in [
                format!("value{}", i),
                format!("value{}", i),
                format!("value{}", i),
            ] {
                ls.values().append_value(&value);
            }
            ls.append(true)
        }

        ls.finish()
    }

    fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());

        for i in 0..100_000 {
            for value in [
                format!("value{}", i),
                format!("value{}", i),
                format!("value{}", i),
            ] {
                ls.values().append_value(&value);
            }
            ls.append(true)
        }

        ls.finish()
    }

    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..10_000 {
            for j in 0..10 {
                for value in [j, j, j, j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..999 {
            ls.values().append(true);
            ls.append(true);
        }

        for j in 0..10 {
            for value in [j, j, j, j] {
                ls.values().values().append_value(value);
            }
            ls.values().append(true)
        }
        ls.append(true);

        for i in 0..9_000 {
            for j in 0..10 {
                for value in [i + j, i + j, i + j, i + j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_map_array_data() -> MapArray {
        let keys_builder = UInt32Builder::new();
        let values_builder = UInt32Builder::new();

        let mut builder = MapBuilder::new(None, keys_builder, values_builder);

        for i in 0..100_000 {
            for _j in 0..3 {
                builder.keys().append_value(i);
                builder.values().append_value(i * 2);
            }
            builder.append(true).unwrap();
        }

        builder.finish()
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_not_zero() {
        let original_list = generate_list_data::<i32>();
        let original_data = original_list.into_data();
        let slice_data = original_data.slice(75, 7);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(
            vec![0, 3, 6, 9, 12, 15, 18, 21],
            new_offsets.typed_data::<i32>()
        );
        assert_eq!(225, original_start);
        assert_eq!(21, length);
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_zero() {
        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
        // ls = [[], [35, 42]]
        ls.append(true);
        ls.values().append_value(35);
        ls.values().append_value(42);
        ls.append(true);
        let original_list = ls.finish();
        let original_data = original_list.into_data();

        let slice_data = original_data.slice(1, 1);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
        assert_eq!(0, original_start);
        assert_eq!(2, length);
    }
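
    // A hedged third case (an addition): when the slice starts at the very
    // beginning of the list, the offsets are already zero-anchored, so
    // `reencode_offsets` should report a start of 0 and expose the existing
    // offsets unchanged. The expected values follow from `generate_list_data`,
    // where every list holds exactly three elements.
    #[test]
    fn reencode_offsets_when_slice_starts_at_zero() {
        let original_list = generate_list_data::<i32>();
        let original_data = original_list.into_data();
        let slice_data = original_data.slice(0, 2);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(vec![0, 3, 6], new_offsets.typed_data::<i32>());
        assert_eq!(0, original_start);
        assert_eq!(6, length);
    }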

    /// Roundtrip both the full and a sliced version of `in_batch` and ensure each
    /// deserializes back equal to its input.
    /// Also ensure the serialized sliced version is significantly smaller than the
    /// serialized full batch.
    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
        // test both full and sliced versions
        let in_sliced = in_batch.slice(999, 1);

        let bytes_batch = serialize_file(&in_batch);
        let bytes_sliced = serialize_file(&in_sliced);

        // serializing 1 row should be significantly smaller than serializing 100,000
        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));

        // ensure both are still valid and equal to originals
        let out_batch = deserialize_file(bytes_batch);
        assert_eq!(in_batch, out_batch);

        let out_sliced = deserialize_file(bytes_sliced);
        assert_eq!(in_sliced, out_sliced);
    }
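
    // Hedged extra usage of the helper above (an addition): a plain Utf8 column
    // should also shrink dramatically when a single row is sliced out of
    // 100,000. The size factor of 100 is a conservative assumption chosen to
    // leave room for per-message metadata overhead.
    #[test]
    fn encode_strings_sliced_smaller() {
        let strings: Vec<String> = (0..100_000).map(|i| format!("value{i}")).collect();
        let values = Arc::new(StringArray::from_iter_values(strings)) as ArrayRef;

        let in_batch = RecordBatch::try_from_iter(vec![("val", values)]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 100);
    }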

    #[test]
    fn encode_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_empty_list() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values])
            .unwrap()
            .slice(999, 0);
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn encode_large_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i64>());

        // ensure both the full and sliced versions roundtrip equal to the original input,
        // and that the serialized sliced version is significantly smaller than the full one
        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_large_lists_non_zero_offset() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i64>());

        check_sliced_list_array(schema, values);
    }

    #[test]
    fn encode_large_lists_string_non_zero_offset() {
        let val_inner = Field::new_list_field(DataType::Utf8, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_string_list_data::<i64>());

        check_sliced_list_array(schema, values);
    }

    #[test]
    fn encode_large_list_string_view_non_zero_offset() {
        let val_inner = Field::new_list_field(DataType::Utf8View, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_utf8view_list_data::<i64>());

        check_sliced_list_array(schema, values);
    }

    fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
                .unwrap()
                .slice(offset, len);
            let out_batch = deserialize_file(serialize_file(&in_batch));
            assert_eq!(in_batch, out_batch);
        }
    }

    #[test]
    fn encode_nested_lists() {
        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists_starting_at_zero() {
        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1);
    }

    #[test]
    fn encode_map_array() {
        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
        let values = Arc::new(Field::new("values", DataType::UInt32, true));
        let map_field = Field::new_map("map", "entries", keys, values, false, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let values = Arc::new(generate_map_array_data());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
        let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());

        for i in 0u32..100_000 {
            if i.is_multiple_of(10_000) {
                builder.append(false);
                continue;
            }
            for value in [i, i, i] {
                builder.values().append_value(value);
            }
            builder.append(true);
        }

        builder.finish()
    }

    #[test]
    fn encode_list_view_arrays() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
        let schema = Arc::new(Schema::new(vec![val_field]));

        let values = Arc::new(generate_list_view_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn encode_large_list_view_arrays() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
        let schema = Arc::new(Schema::new(vec![val_field]));

        let values = Arc::new(generate_list_view_data::<i64>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn check_sliced_list_view_array() {
        let inner = Field::new_list_field(DataType::UInt32, true);
        let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
        let schema = Arc::new(Schema::new(vec![field]));
        let values = Arc::new(generate_list_view_data::<i32>());

        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
                .unwrap()
                .slice(offset, len);
            let out_batch = deserialize_file(serialize_file(&in_batch));
            assert_eq!(in_batch, out_batch);
        }
    }

    #[test]
    fn check_sliced_large_list_view_array() {
        let inner = Field::new_list_field(DataType::UInt32, true);
        let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
        let schema = Arc::new(Schema::new(vec![field]));
        let values = Arc::new(generate_list_view_data::<i64>());

        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
                .unwrap()
                .slice(offset, len);
            let out_batch = deserialize_file(serialize_file(&in_batch));
            assert_eq!(in_batch, out_batch);
        }
    }

    fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
        let inner_builder = UInt32Builder::new();
        let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
        let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);

        for i in 0u32..10_000 {
            if i.is_multiple_of(1_000) {
                outer_builder.append(false);
                continue;
            }

            for _ in 0..3 {
                for value in [i, i + 1, i + 2] {
                    outer_builder.values().values().append_value(value);
                }
                outer_builder.values().append(true);
            }
            outer_builder.append(true);
        }

        outer_builder.finish()
    }

    #[test]
    fn encode_nested_list_views() {
        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
        let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_view_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
        list_data_type: DataType,
        offsets: &[U; 5],
        sizes: &[U; 4],
    ) {
        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let value_offsets = Buffer::from_slice_ref(offsets);
        let value_sizes = Buffer::from_slice_ref(sizes);

        let list_data = ArrayData::builder(list_data_type)
            .len(4)
            .add_buffer(value_offsets)
            .add_buffer(value_sizes)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            list_view_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_list_view_of_dict() {
        #[allow(deprecated)]
        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
        let sizes: &[i32; 4] = &[2, 2, 0, 3];
        test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
    }

    #[test]
    fn test_roundtrip_large_list_view_of_dict() {
        #[allow(deprecated)]
        let list_data_type = DataType::LargeListView(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        )));
        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
        let sizes: &[i64; 4] = &[2, 2, 0, 3];
        test_roundtrip_list_view_of_dict_impl::<i64, i64>(list_data_type, offsets, sizes);
    }

    #[test]
    fn test_roundtrip_sliced_list_view_of_dict() {
        #[allow(deprecated)]
        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            3,
            false,
        )));

        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
        let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];
        let value_offsets = Buffer::from_slice_ref(offsets);
        let value_sizes = Buffer::from_slice_ref(sizes);

        let list_data = ArrayData::builder(list_data_type)
            .len(6)
            .add_buffer(value_offsets)
            .add_buffer(value_sizes)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_view_array = GenericListViewArray::<i32>::from(list_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            list_view_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();

        let sliced_batch = input_batch.slice(1, 4);

        let output_batch = deserialize_file(serialize_file(&sliced_batch));
        assert_eq!(sliced_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&sliced_batch));
        assert_eq!(sliced_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_dense_union_of_dict() {
        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));

        #[allow(deprecated)]
        let dict_field = Arc::new(Field::new_dict(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        ));
        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
        let offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);

        let int_array = Int32Array::from(vec![100, 200]);

        let union = UnionArray::try_new(
            union_fields.clone(),
            types,
            Some(offsets),
            vec![Arc::new(dict_array), Arc::new(int_array)],
        )
        .unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            DataType::Union(union_fields, UnionMode::Dense),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_sparse_union_of_dict() {
        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));

        #[allow(deprecated)]
        let dict_field = Arc::new(Field::new_dict(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        ));
        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);

        let int_array = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);

        let union = UnionArray::try_new(
            union_fields.clone(),
            types,
            None,
            vec![Arc::new(dict_array), Arc::new(int_array)],
        )
        .unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            DataType::Union(union_fields, UnionMode::Sparse),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_map_with_dict_keys() {
        // Building a map array is a bit involved. We first build a struct array that has a key and
        // value field and then use that to build the actual map array.
        let key_values = StringArray::from(vec!["key_a", "key_b", "key_c"]);
        let keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
        let dict_keys = DictionaryArray::new(keys, Arc::new(key_values));

        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        #[allow(deprecated)]
        let entries_field = Arc::new(Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new_dict(
                        "key",
                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        false,
                        1,
                        false,
                    ),
                    Field::new("value", DataType::Int32, true),
                ]
                .into(),
            ),
            false,
        ));

        let entries = StructArray::from(vec![
            (
                Arc::new(Field::new(
                    "key",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    false,
                )),
                Arc::new(dict_keys) as ArrayRef,
            ),
            (
                Arc::new(Field::new("value", DataType::Int32, true)),
                Arc::new(values) as ArrayRef,
            ),
        ]);

        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);

        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
            .len(3)
            .add_buffer(offsets)
            .add_child_data(entries.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "map",
            map_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_map_with_dict_values() {
        // Building a map array is a bit involved. We first build a struct array that has a key and
        // value field and then use that to build the actual map array.
        let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);

        let value_values = StringArray::from(vec!["val_x", "val_y", "val_z"]);
        let value_keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
        let dict_values = DictionaryArray::new(value_keys, Arc::new(value_values));

        #[allow(deprecated)]
        let entries_field = Arc::new(Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new_dict(
                        "value",
                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        true,
                        2,
                        false,
                    ),
                ]
                .into(),
            ),
            false,
        ));

        let entries = StructArray::from(vec![
            (
                Arc::new(Field::new("key", DataType::Utf8, false)),
                Arc::new(keys) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "value",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                )),
                Arc::new(dict_values) as ArrayRef,
            ),
        ]);

        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);

        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
            .len(3)
            .add_buffer(offsets)
            .add_child_data(entries.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "map",
            map_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        // Test a bunch of different dimensions to ensure alignment is never an issue.
        // For example, if we only test `num_cols = 1` then even with alignment 8 this
        // test would _happen_ to pass, even though for different dimensions like
        // `num_cols = 2` it would fail.
        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle

            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            let buffer = Buffer::from_vec(out);
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
            // for `read_record_batch` later on to read the data in a zero-copy manner.
            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }

    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        const IPC_ALIGNMENT: usize = 8;

        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
        let schema = fb_to_schema(footer.schema().unwrap());

        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
             offset from expected alignment of 16 by 8"
        );
    }

    #[test]
    fn test_flush() {
        // We write a schema which is small enough to fit into a buffer and not get flushed,
        // and then force the write with .flush().
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
        // and then a length of 0 (4 bytes) for a total of 8 bytes.
        // Everything before that should have been flushed in the .flush() call.
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        // A file write is the same as the stream write except for the leading magic string
        // ARROW1 plus padding, which is 8 bytes.
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }
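
    // A hedged sanity check of the 8-byte end-of-stream marker described in the
    // comments above (an addition, not from the original suite): finishing a
    // default (V5, non-legacy) stream should end with the continuation bytes
    // 0xFFFFFFFF followed by a zero length.
    #[test]
    fn test_stream_end_of_stream_marker() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let mut writer = StreamWriter::try_new(Vec::new(), &schema).unwrap();
        writer.finish().unwrap();
        let out = writer.into_inner().unwrap();

        // Continuation marker (4 bytes) then a length of 0 (4 bytes).
        let eos = &out[out.len() - 8..];
        assert_eq!(eos, &[0xFFu8, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0][..]);
    }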

    #[test]
    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
        let l1_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));

        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
            "item",
            DataType::Float32,
            false,
        )));
        let mut l2_builder =
            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));

        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
            l2_builder.values().values().append_value(point[0]);
            l2_builder.values().values().append_value(point[1]);
            l2_builder.values().values().append_value(point[2]);

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [10., 11., 12.];
        l2_builder.values().values().append_value(point[0]);
        l2_builder.values().values().append_value(point[1]);
        l2_builder.values().values().append_value(point[2]);

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new("points", l2_type, false)],
            HashMap::default(),
        ));

        // Test a variety of combinations that include zero and non-zero offsets,
        // and cover both partial slices and the remainder of the array
        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
        let mut l2_builder = ListBuilder::new(l1_builder);

        for point in [
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ] {
            for p in point {
                match p {
                    Some(p) => l2_builder.values().values().append_value(p),
                    None => l2_builder.values().values().append_null(),
                }
            }

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [Some(10.), None, None];
        for p in point {
            match p {
                Some(p) => l2_builder.values().values().append_value(p),
                None => l2_builder.values().values().append_null(),
            }
        }

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        3,
                    ),
                    true,
                ))),
                true,
            )],
            HashMap::default(),
        ));

        // Test a variety of combinations that include zero and non-zero offsets,
        // and cover both partial slices and the remainder of the array
        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

    fn test_slices(
        parent_array: &ArrayRef,
        schema: &SchemaRef,
        offset: usize,
        length: usize,
    ) -> Result<(), ArrowError> {
        let subarray = parent_array.slice(offset, length);
        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;

        let mut bytes = Vec::new();
        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
        writer.write(&original_batch)?;
        writer.finish()?;

        let mut cursor = std::io::Cursor::new(bytes);
        let mut reader = StreamReader::try_new(&mut cursor, None)?;
        let returned_batch = reader.next().unwrap()?;

        assert_eq!(original_batch, returned_batch);

        Ok(())
    }
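
    // Hedged extra usage of `test_slices` (an addition): a plain primitive
    // column exercised with both zero and non-zero offsets, as the simplest
    // possible baseline for the nested cases around it.
    #[test]
    fn test_roundtrip_primitive_slices() -> Result<(), ArrowError> {
        let array = Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef;
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));

        test_slices(&array, &schema, 0, 5)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 2, 3)?;
        test_slices(&array, &schema, 4, 1)?;

        Ok(())
    }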

    #[test]
    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));

        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
            fixed_list_builder.values().append_value(point[0]);
            fixed_list_builder.values().append_value(point[1]);
            fixed_list_builder.values().append_value(point[2]);

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
                false,
            )],
            HashMap::default(),
        ));

        // Test a variety of combinations that include zero and non-zero offsets,
        // and cover both partial slices and the remainder of the array
        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);

        for point in [
            [Some(1), Some(2), None],
            [Some(4), Some(5), Some(6)],
            [None, Some(8), Some(9)],
            [Some(10), None, None],
        ] {
            for p in point {
                match p {
                    Some(p) => fixed_list_builder.values().append_value(p),
                    None => fixed_list_builder.values().append_null(),
                }
            }

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
                true,
            )],
            HashMap::default(),
        ));

        // Test a variety of combinations that include zero and non-zero offsets,
        // and cover both partial slices and the remainder of the array
        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }
4210
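    // `HashMap` iteration order varies between instances, so the IPC encoder
    // must write schema and field metadata in a deterministic order for
    // identical schemas to serialize to identical bytes.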
    #[test]
    fn test_metadata_encoding_ordering() {
        fn create_hash() -> u64 {
            let metadata: HashMap<String, String> = [
                ("a", "1"), //
                ("b", "2"), //
                ("c", "3"), //
                ("d", "4"), //
                ("e", "5"), //
            ]
            .into_iter()
            .map(|(k, v)| (k.to_owned(), v.to_owned()))
            .collect();

            // Set metadata on both the schema and a field within it.
            let schema = Arc::new(
                Schema::new(vec![
                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
                ])
                .with_metadata(metadata)
                .clone(),
            );
            let batch = RecordBatch::new_empty(schema.clone());

            let mut bytes = Vec::new();
            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
            w.write(&batch).unwrap();
            w.finish().unwrap();

            let mut h = std::hash::DefaultHasher::new();
            h.write(&bytes);
            h.finish()
        }

        let expected = create_hash();

        // Since `HashMap` iteration order is randomized and we cannot supply
        // our own hasher for the map used for metadata, run the code above 20
        // times and verify the output never changes. This is not a proof, but
        // it should be good enough.
        let all_passed = (0..20).all(|_| create_hash() == expected);
        assert!(all_passed);
    }

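    // Exercises reusing one `DictionaryTracker` across two independent
    // streams: `clear()` forgets the dictionaries sent in the first stream so
    // they are re-emitted, rather than assumed known, in the second.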
    #[test]
    fn test_dictionary_tracker_reset() {
        let data_gen = IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(false);
        let writer_options = IpcWriteOptions::default();
        let mut compression_ctx = CompressionContext::default();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
            false,
        )]));

        let mut write_single_batch_stream =
            |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
                let mut buffer = Vec::new();

                // create a new IPC stream: the schema message always comes first
                let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
                    &schema,
                    dict_tracker,
                    &writer_options,
                );
                _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();

                let (encoded_dicts, encoded_batch) = data_gen
                    .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
                    .unwrap();
                for encoded_dict in encoded_dicts {
                    _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
                }
                _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();

                buffer
            };

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values([0]),
                Arc::new(StringArray::from_iter_values(["a"])),
            ))],
        )
        .unwrap();
        let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);

        // ensure we can read the stream back
        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        assert_eq!(read_batch, batch1);

        // reset the dictionary tracker so it can be reused for the next stream
        dictionary_tracker.clear();

        // now write a 2nd stream and ensure we can also read it:
        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values([0]),
                Arc::new(StringArray::from_iter_values(["a"])),
            ))],
        )
        .unwrap();
        let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        assert_eq!(read_batch, batch2);
    }
}
4323}