// arrow_row/lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A comparable row-oriented representation of a collection of [`Array`].
19//!
20//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
21//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
22//! This makes the row format ideal for implementing efficient multi-column sorting,
23//! grouping, aggregation, windowing and more, as described in more detail
24//! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
25//!
26//! For example, given three input [`Array`], [`RowConverter`] creates byte
27//! sequences that [compare] the same as when using [`lexsort`].
28//!
29//! ```text
30//!    ┌─────┐   ┌─────┐   ┌─────┐
31//!    │     │   │     │   │     │
32//!    ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
33//!    │     │   │     │   │     │  ─────────────▶┃             ┃
34//!    ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
35//!    │     │   │     │   │     │
36//!    └─────┘   └─────┘   └─────┘
37//!                ...
38//!    ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
39//!    │     │   │     │   │     │  ─────────────▶┃        ┃
40//!    └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
41//!     UInt64      Utf8     F64
42//!
43//!           Input Arrays                          Row Format
44//!     (Columns)
45//! ```
46//!
47//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
48//! to be meaningful._
49//!
50//! # Basic Example
51//! ```
52//! # use std::sync::Arc;
53//! # use arrow_row::{RowConverter, SortField};
54//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
55//! # use arrow_array::cast::{AsArray, as_string_array};
56//! # use arrow_array::types::Int32Type;
57//! # use arrow_schema::DataType;
58//!
59//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
60//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
61//! let arrays = vec![a1, a2];
62//!
63//! // Convert arrays to rows
64//! let converter = RowConverter::new(vec![
65//!     SortField::new(DataType::Int32),
66//!     SortField::new(DataType::Utf8),
67//! ]).unwrap();
68//! let rows = converter.convert_columns(&arrays).unwrap();
69//!
70//! // Compare rows
71//! for i in 0..4 {
72//!     assert!(rows.row(i) <= rows.row(i + 1));
73//! }
74//! assert_eq!(rows.row(3), rows.row(4));
75//!
76//! // Convert rows back to arrays
77//! let converted = converter.convert_rows(&rows).unwrap();
78//! assert_eq!(arrays, converted);
79//!
80//! // Compare rows from different arrays
81//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
82//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
83//! let arrays = vec![a1, a2];
84//! let rows2 = converter.convert_columns(&arrays).unwrap();
85//!
86//! assert!(rows.row(4) < rows2.row(0));
87//! assert!(rows.row(4) < rows2.row(1));
88//!
89//! // Convert selection of rows back to arrays
90//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
91//! let converted = converter.convert_rows(selection).unwrap();
92//! let c1 = converted[0].as_primitive::<Int32Type>();
93//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
94//!
95//! let c2 = converted[1].as_string::<i32>();
96//! let c2_values: Vec<_> = c2.iter().flatten().collect();
97//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
98//! ```
99//!
100//! # Lexsort
101//!
102//! The row format can also be used to implement a fast multi-column / lexicographic sort
103//!
104//! ```
105//! # use arrow_row::{RowConverter, SortField};
106//! # use arrow_array::{ArrayRef, UInt32Array};
107//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
108//!     let fields = arrays
109//!         .iter()
110//!         .map(|a| SortField::new(a.data_type().clone()))
111//!         .collect();
112//!     let converter = RowConverter::new(fields).unwrap();
113//!     let rows = converter.convert_columns(arrays).unwrap();
114//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
115//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
116//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
117//! }
118//! ```
119//!
120//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
121//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
122//! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf
123//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
124//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
125//! [compared]: PartialOrd
126//! [compare]: PartialOrd
127
128#![doc(
129    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
130    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
131)]
132#![cfg_attr(docsrs, feature(doc_auto_cfg))]
133#![warn(missing_docs)]
134use std::cmp::Ordering;
135use std::hash::{Hash, Hasher};
136use std::sync::Arc;
137
138use arrow_array::cast::*;
139use arrow_array::types::ArrowDictionaryKeyType;
140use arrow_array::*;
141use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
142use arrow_data::ArrayDataBuilder;
143use arrow_schema::*;
144use variable::{decode_binary_view, decode_string_view};
145
146use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
147use crate::variable::{decode_binary, decode_string};
148use arrow_array::types::{Int16Type, Int32Type, Int64Type};
149
150mod fixed;
151mod list;
152mod run;
153mod variable;
154
155/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
156///
157/// *Note: The encoding of the row format may change from release to release.*
158///
159/// ## Overview
160///
161/// The row format is a variable length byte sequence created by
162/// concatenating the encoded form of each column. The encoding for
163/// each column depends on its datatype (and sort options).
164///
165/// The encoding is carefully designed in such a way that escaping is
166/// unnecessary: it is never ambiguous as to whether a byte is part of
167/// a sentinel (e.g. null) or a value.
168///
169/// ## Unsigned Integer Encoding
170///
171/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
172/// to the integer's length.
173///
174/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
175/// integer.
176///
177/// ```text
178///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
179///    3          │03│00│00│00│      │01│00│00│00│03│
180///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
181///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
182///   258         │02│01│00│00│      │01│00│00│01│02│
183///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
184///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
185///  23423        │7F│5B│00│00│      │01│00│00│5B│7F│
186///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
187///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
188///  NULL         │??│??│??│??│      │00│00│00│00│00│
189///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
190///
191///              32-bit (4 bytes)        Row Format
192///  Value        Little Endian
193/// ```
194///
195/// ## Signed Integer Encoding
196///
197/// Signed integers have their most significant sign bit flipped, and are then encoded in the
198/// same manner as an unsigned integer.
199///
200/// ```text
201///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
202///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
203///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
204///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
205///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
206///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
207///
208///  Value  32-bit (4 bytes)    High bit flipped      Row Format
209///          Little Endian
210/// ```
211///
212/// ## Float Encoding
213///
214/// Floats are converted from IEEE 754 representation to a signed integer representation
215/// by flipping all bar the sign bit if they are negative.
216///
217/// They are then encoded in the same manner as a signed integer.
218///
219/// ## Fixed Length Bytes Encoding
220///
221/// Fixed length bytes are encoded in the same fashion as primitive types above.
222///
223/// For a fixed length array of length `n`:
224///
225/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
226///
227/// A valid value is encoded as `1_u8` followed by the value bytes
228///
229/// ## Variable Length Bytes (including Strings) Encoding
230///
231/// A null is encoded as a `0_u8`.
232///
233/// An empty byte array is encoded as `1_u8`.
234///
235/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
236/// encoded using a block based scheme described below.
237///
238/// The byte array is broken up into fixed-width blocks, each block is written in turn
239/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
240/// with `0_u8` and written to the output, followed by the un-padded length in bytes
241/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
242/// blocks using a length of 32, this is to reduce space amplification for small strings.
243///
244/// Note the following example encodings use a block size of 4 bytes for brevity:
245///
246/// ```text
247///                       ┌───┬───┬───┬───┬───┬───┐
248///  "MEEP"               │02 │'M'│'E'│'E'│'P'│04 │
249///                       └───┴───┴───┴───┴───┴───┘
250///
251///                       ┌───┐
252///  ""                   │01 |
253///                       └───┘
254///
255///  NULL                 ┌───┐
256///                       │00 │
257///                       └───┘
258///
259/// "Defenestration"      ┌───┬───┬───┬───┬───┬───┐
260///                       │02 │'D'│'e'│'f'│'e'│FF │
261///                       └───┼───┼───┼───┼───┼───┤
262///                           │'n'│'e'│'s'│'t'│FF │
263///                           ├───┼───┼───┼───┼───┤
264///                           │'r'│'a'│'t'│'r'│FF │
265///                           ├───┼───┼───┼───┼───┤
266///                           │'a'│'t'│'i'│'o'│FF │
267///                           ├───┼───┼───┼───┼───┤
268///                           │'n'│00 │00 │00 │01 │
269///                           └───┴───┴───┴───┴───┘
270/// ```
271///
272/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
273/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
274///
275/// ## Dictionary Encoding
276///
277/// Dictionary encoded arrays are hydrated to their underlying values
278///
279/// ## REE Encoding
280///
281/// REE (Run End Encoding) arrays, A form of Run Length Encoding, are hydrated to their underlying values.
282///
283/// ## Struct Encoding
284///
285/// A null is encoded as a `0_u8`.
286///
287/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
288///
289/// This encoding effectively flattens the schema in a depth-first fashion.
290///
291/// For example
292///
293/// ```text
294/// ┌───────┬────────────────────────┬───────┐
295/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
296/// └───────┴────────────────────────┴───────┘
297/// ```
298///
299/// Is encoded as
300///
301/// ```text
302/// ┌───────┬───────────────┬───────┬─────────┬───────┐
303/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
304/// └───────┴───────────────┴───────┴─────────┴───────┘
305/// ```
306///
307/// ## List Encoding
308///
309/// Lists are encoded by first encoding all child elements to the row format.
310///
311/// A list value is then encoded as the concatenation of each of the child elements,
312/// separately encoded using the variable length encoding described above, followed
313/// by the variable length encoding of an empty byte array.
314///
315/// For example given:
316///
317/// ```text
318/// [1_u8, 2_u8, 3_u8]
319/// [1_u8, null]
320/// []
321/// null
322/// ```
323///
324/// The elements would be converted to:
325///
326/// ```text
327///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
328///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
329///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
330///```
331///
332/// Which would be encoded as
333///
334/// ```text
335///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
336///  [1_u8, 2_u8, 3_u8]     │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
337///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
338///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘  └──── 3_u8 ────┘
339///
340///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
341///  [1_u8, null]           │02│01│01│00│00│02│02│00│00│00│00│02│01│
342///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
343///                          └──── 1_u8 ────┘   └──── null ────┘
344///
345///```
346///
347/// With `[]` represented by an empty byte array, and `null` a null byte array.
348///
349/// # Ordering
350///
351/// ## Float Ordering
352///
353/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined
354/// in the IEEE 754 (2008 revision) floating point standard.
355///
356/// The ordering established by this does not always agree with the
357/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
358/// they consider negative and positive zero equal, while this does not
359///
360/// ## Null Ordering
361///
362/// The encoding described above will order nulls first, this can be inverted by representing
363/// nulls as `0xFF_u8` instead of `0_u8`
364///
365/// ## Reverse Column Ordering
366///
367/// The order of a given column can be reversed by negating the encoded bytes of non-null values
368///
369/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
370/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
#[derive(Debug)]
pub struct RowConverter {
    /// The sort configuration (data type and options) for each column.
    ///
    /// Shared via `Arc` with every [`Rows`] this converter produces, so
    /// provenance can be checked cheaply with `Arc::ptr_eq`.
    fields: Arc<[SortField]>,
    /// State for codecs, one entry per field (e.g. child converters and
    /// pre-computed null encodings for nested or dictionary types)
    codecs: Vec<Codec>,
}
377
/// Per-column codec state, created once per [`SortField`] when constructing
/// a [`RowConverter`]
#[derive(Debug)]
enum Codec {
    /// No additional codec state is necessary
    Stateless,
    /// A row converter for the dictionary values
    /// and the encoding of a row containing only nulls
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields
    /// and the encoding of a row containing only nulls
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child field
    List(RowConverter),
    /// A row converter for the values array of a run-end encoded array
    RunEndEncoded(RowConverter),
}
393
impl Codec {
    /// Build the codec state for `sort_field`, recursively constructing
    /// child [`RowConverter`]s for dictionary and nested types
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionaries are hydrated to their values, so build a
                // converter for the value type (with the same options) and
                // pre-compute the encoding of an all-null value row
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // Similar to List implementation
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            // All remaining non-nested types need no extra codec state.
            // NOTE: this guard arm must stay after the Dictionary and
            // RunEndEncoded arms and before the nested-type arms below
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // The encoded contents will be inverted if descending is set to true
                // As such we set `descending` to false and negate nulls first if it
                // is set to true
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                // Child fields inherit the parent's sort options
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                // Pre-compute the encoding of a row where every child is null,
                // used when the parent struct value itself is null
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Create the per-array [`Encoder`] for `array`, pre-encoding any child
    /// values (dictionary values, struct children, list elements, REE values)
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => as_list_array(array).values(),
                    DataType::LargeList(_) => as_large_list_array(array).values(),
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values.clone()])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values(),
                        DataType::Int32 => array.as_run::<Int32Type>().values(),
                        DataType::Int64 => array.as_run::<Int64Type>().values(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values.clone()])?;
                Ok(Encoder::RunEndEncoded(rows))
            }
        }
    }

    /// Return the heap memory retained by this codec, in bytes
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
        }
    }
}
508
/// Per-array encoder state, created by [`Codec::encoder`] for a specific
/// input array prior to encoding it into the row format
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional encoder state is necessary
    Stateless,
    /// The encoding of the child array and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays and the encoding of a null row
    ///
    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
    /// values that are masked by a null in the parent StructArray, otherwise
    /// this would establish an ordering between semantically null values
    Struct(Rows, Row<'a>),
    /// The row encoding of the child array
    List(Rows),
    /// The row encoding of the values array
    RunEndEncoded(Rows),
}
526
/// Configure the data type and sort order for a given column
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options (direction and null ordering)
    options: SortOptions,
    /// Data type of the column to be encoded
    data_type: DataType,
}
535
536impl SortField {
537    /// Create a new column with the given data type
538    pub fn new(data_type: DataType) -> Self {
539        Self::new_with_options(data_type, Default::default())
540    }
541
542    /// Create a new column with the given data type and [`SortOptions`]
543    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
544        Self { options, data_type }
545    }
546
547    /// Return size of this instance in bytes.
548    ///
549    /// Includes the size of `Self`.
550    pub fn size(&self) -> usize {
551        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
552    }
553}
554
555impl RowConverter {
556    /// Create a new [`RowConverter`] with the provided schema
557    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
558        if !Self::supports_fields(&fields) {
559            return Err(ArrowError::NotYetImplemented(format!(
560                "Row format support not yet implemented for: {fields:?}"
561            )));
562        }
563
564        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
565        Ok(Self {
566            fields: fields.into(),
567            codecs,
568        })
569    }
570
571    /// Check if the given fields are supported by the row format.
572    pub fn supports_fields(fields: &[SortField]) -> bool {
573        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
574    }
575
576    fn supports_datatype(d: &DataType) -> bool {
577        match d {
578            _ if !d.is_nested() => true,
579            DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
580                Self::supports_datatype(f.data_type())
581            }
582            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
583            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
584            _ => false,
585        }
586    }
587
588    /// Convert [`ArrayRef`] columns into [`Rows`]
589    ///
590    /// See [`Row`] for information on when [`Row`] can be compared
591    ///
592    /// # Panics
593    ///
594    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
595    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
596        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
597        let mut rows = self.empty_rows(num_rows, 0);
598        self.append(&mut rows, columns)?;
599        Ok(rows)
600    }
601
    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
    ///
    /// See [`Row`] for information on when [`Row`] can be compared
    ///
    /// # Panics
    ///
    /// Panics if
    /// * The schema of `columns` does not match that provided to [`RowConverter::new`]
    /// * The provided [`Rows`] were not created by this [`RowConverter`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{Row, RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let a1 = StringArray::from(vec!["hello", "world"]);
    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
    ///
    /// let mut rows = converter.empty_rows(5, 128);
    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
    ///
    /// let back = converter.convert_rows(&rows).unwrap();
    /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
    /// ```
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        // Provenance check: `rows` must share our field Arc
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }

        // Validate each column's type and build its per-array encoder state
        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Index of the first new row; existing rows are left untouched
        let write_offset = rows.num_rows();
        // Pre-compute per-row byte lengths so offsets can be extended and
        // the buffer sized exactly once before encoding
        let lengths = row_lengths(columns, &encoders);
        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
        rows.buffer.resize(total, 0);

        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            // We encode a column at a time to minimise dispatch overheads
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        // Sanity-check the offset bookkeeping in debug builds only
        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }
687
688    /// Convert [`Rows`] columns into [`ArrayRef`]
689    ///
690    /// # Panics
691    ///
692    /// Panics if the rows were not produced by this [`RowConverter`]
693    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
694    where
695        I: IntoIterator<Item = Row<'a>>,
696    {
697        let mut validate_utf8 = false;
698        let mut rows: Vec<_> = rows
699            .into_iter()
700            .map(|row| {
701                assert!(
702                    Arc::ptr_eq(&row.config.fields, &self.fields),
703                    "rows were not produced by this RowConverter"
704                );
705                validate_utf8 |= row.config.validate_utf8;
706                row.data
707            })
708            .collect();
709
710        // SAFETY
711        // We have validated that the rows came from this [`RowConverter`]
712        // and therefore must be valid
713        unsafe { self.convert_raw(&mut rows, validate_utf8) }
714    }
715
716    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
717    /// a total length of `data_capacity`
718    ///
719    /// This can be used to buffer a selection of [`Row`]
720    ///
721    /// ```
722    /// # use std::sync::Arc;
723    /// # use std::collections::HashSet;
724    /// # use arrow_array::cast::AsArray;
725    /// # use arrow_array::StringArray;
726    /// # use arrow_row::{Row, RowConverter, SortField};
727    /// # use arrow_schema::DataType;
728    /// #
729    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
730    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
731    ///
732    /// // Convert to row format and deduplicate
733    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
734    /// let mut distinct_rows = converter.empty_rows(3, 100);
735    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
736    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
737    ///
738    /// // Note: we could skip buffering and feed the filtered iterator directly
739    /// // into convert_rows, this is done for demonstration purposes only
740    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
741    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
742    /// assert_eq!(&values, &["hello", "world", "a"]);
743    /// ```
744    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
745        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
746        offsets.push(0);
747
748        Rows {
749            offsets,
750            buffer: Vec::with_capacity(data_capacity),
751            config: RowConfig {
752                fields: self.fields.clone(),
753                validate_utf8: false,
754            },
755        }
756    }
757
758    /// Create a new [Rows] instance from the given binary data.
759    ///
760    /// ```
761    /// # use std::sync::Arc;
762    /// # use std::collections::HashSet;
763    /// # use arrow_array::cast::AsArray;
764    /// # use arrow_array::StringArray;
765    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
766    /// # use arrow_schema::DataType;
767    /// #
768    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
769    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
770    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
771    ///
772    /// // We can convert rows into binary format and back in batch.
773    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
774    /// let binary = rows.try_into_binary().expect("known-small array");
775    /// let converted = converter.from_binary(binary.clone());
776    /// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
777    /// ```
778    ///
779    /// # Panics
780    ///
781    /// This function expects the passed [BinaryArray] to contain valid row data as produced by this
782    /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
783    /// panic if the data is malformed.
784    pub fn from_binary(&self, array: BinaryArray) -> Rows {
785        assert_eq!(
786            array.null_count(),
787            0,
788            "can't construct Rows instance from array with nulls"
789        );
790        Rows {
791            buffer: array.values().to_vec(),
792            offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
793            config: RowConfig {
794                fields: Arc::clone(&self.fields),
795                validate_utf8: true,
796            },
797        }
798    }
799
800    /// Convert raw bytes into [`ArrayRef`]
801    ///
802    /// # Safety
803    ///
804    /// `rows` must contain valid data for this [`RowConverter`]
805    unsafe fn convert_raw(
806        &self,
807        rows: &mut [&[u8]],
808        validate_utf8: bool,
809    ) -> Result<Vec<ArrayRef>, ArrowError> {
810        self.fields
811            .iter()
812            .zip(&self.codecs)
813            .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
814            .collect()
815    }
816
817    /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
818    pub fn parser(&self) -> RowParser {
819        RowParser::new(Arc::clone(&self.fields))
820    }
821
822    /// Returns the size of this instance in bytes
823    ///
824    /// Includes the size of `Self`.
825    pub fn size(&self) -> usize {
826        std::mem::size_of::<Self>()
827            + self.fields.iter().map(|x| x.size()).sum::<usize>()
828            + self.codecs.capacity() * std::mem::size_of::<Codec>()
829            + self.codecs.iter().map(Codec::size).sum::<usize>()
830    }
831}
832
833/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
834#[derive(Debug)]
835pub struct RowParser {
836    config: RowConfig,
837}
838
839impl RowParser {
840    fn new(fields: Arc<[SortField]>) -> Self {
841        Self {
842            config: RowConfig {
843                fields,
844                validate_utf8: true,
845            },
846        }
847    }
848
849    /// Creates a [`Row`] from the provided `bytes`.
850    ///
851    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
852    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
853    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
854        Row {
855            data: bytes,
856            config: &self.config,
857        }
858    }
859}
860
/// The config of a given set of [`Row`]
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting to arrow arrays
    // Set to true when the row bytes originate outside this crate
    // (e.g. `RowConverter::from_binary` or `RowParser::parse`)
    validate_utf8: bool,
}
869
/// A row-oriented representation of arrow data, that is normalized for comparison.
///
/// See the [module level documentation](self) and [`RowConverter`] for more details.
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    // Invariant: always non-empty (starts with 0) and monotonically increasing,
    // so `num_rows() == offsets.len() - 1` never underflows
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}
882
impl Rows {
    /// Append a [`Row`] to this [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if `row` was not produced by a converter sharing this
    /// instance's fields (checked by `Arc` pointer identity)
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // If any appended row requires UTF-8 validation, so does the collection
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Returns the row at index `row`
    ///
    /// # Panics
    ///
    /// Panics if `row` is not less than [`Self::num_rows`]
    pub fn row(&self, row: usize) -> Row<'_> {
        // `offsets.len() == num_rows + 1`, so this bounds both offset reads below
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    /// Caller must ensure that `index` is less than the number of offsets (#rows + 1)
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        // SAFETY (all three): caller guarantees `index + 1 < offsets.len()`,
        // and offsets are monotonically increasing indexes into `buffer`
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Sets the length of this [`Rows`] to 0
    pub fn clear(&mut self) {
        // Keep the initial 0 offset so the offsets invariant is preserved
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of [`Row`] in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns an iterator over the [`Row`] in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        // Size of fields is accounted for as part of RowConverter
        std::mem::size_of::<Self>()
            + self.buffer.len()
            + self.offsets.len() * std::mem::size_of::<usize>()
    }

    /// Create a [BinaryArray] from the [Rows] data without reallocating the
    /// underlying bytes.
    ///
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
    ///
    /// // We can convert rows into binary format and back.
    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
    /// let binary = rows.try_into_binary().expect("known-small array");
    /// let parser = converter.parser();
    /// let parsed: Vec<OwnedRow> =
    ///   binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
    /// assert_eq!(values, parsed);
    /// ```
    ///
    /// # Errors
    ///
    /// This function will return an error if there is more data than can be stored in
    /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
990
991impl<'a> IntoIterator for &'a Rows {
992    type Item = Row<'a>;
993    type IntoIter = RowsIter<'a>;
994
995    fn into_iter(self) -> Self::IntoIter {
996        RowsIter {
997            rows: self,
998            start: 0,
999            end: self.num_rows(),
1000        }
1001    }
1002}
1003
/// An iterator over [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The rows being iterated over
    rows: &'a Rows,
    /// Index of the next row to yield from the front
    start: usize,
    /// One past the index of the next row to yield from the back;
    /// iteration is exhausted when `start == end`
    end: usize,
}
1011
1012impl<'a> Iterator for RowsIter<'a> {
1013    type Item = Row<'a>;
1014
1015    fn next(&mut self) -> Option<Self::Item> {
1016        if self.end == self.start {
1017            return None;
1018        }
1019
1020        // SAFETY: We have checked that `start` is less than `end`
1021        let row = unsafe { self.rows.row_unchecked(self.start) };
1022        self.start += 1;
1023        Some(row)
1024    }
1025
1026    fn size_hint(&self) -> (usize, Option<usize>) {
1027        let len = self.len();
1028        (len, Some(len))
1029    }
1030}
1031
impl ExactSizeIterator for RowsIter<'_> {
    // Number of rows not yet yielded from either end
    fn len(&self) -> usize {
        self.end - self.start
    }
}
1037
1038impl DoubleEndedIterator for RowsIter<'_> {
1039    fn next_back(&mut self) -> Option<Self::Item> {
1040        if self.end == self.start {
1041            return None;
1042        }
1043        // Safety: We have checked that `start` is less than `end`
1044        let row = unsafe { self.rows.row_unchecked(self.end) };
1045        self.end -= 1;
1046        Some(row)
1047    }
1048}
1049
/// A comparable representation of a row.
///
/// See the [module level documentation](self) for more details.
///
/// Two [`Row`] can only be compared if they both belong to [`Rows`]
/// returned by calls to [`RowConverter::convert_columns`] on the same
/// [`RowConverter`]. If different [`RowConverter`]s are used, any
/// ordering established by comparing the [`Row`] is arbitrary.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row; comparison is plain byte comparison
    data: &'a [u8],
    /// The config of the [`Rows`] this row belongs to
    config: &'a RowConfig,
}
1063
1064impl<'a> Row<'a> {
1065    /// Create owned version of the row to detach it from the shared [`Rows`].
1066    pub fn owned(&self) -> OwnedRow {
1067        OwnedRow {
1068            data: self.data.into(),
1069            config: self.config.clone(),
1070        }
1071    }
1072
1073    /// The row's bytes, with the lifetime of the underlying data.
1074    pub fn data(&self) -> &'a [u8] {
1075        self.data
1076    }
1077}
1078
1079// Manually derive these as don't wish to include `fields`
1080
1081impl PartialEq for Row<'_> {
1082    #[inline]
1083    fn eq(&self, other: &Self) -> bool {
1084        self.data.eq(other.data)
1085    }
1086}
1087
1088impl Eq for Row<'_> {}
1089
1090impl PartialOrd for Row<'_> {
1091    #[inline]
1092    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1093        Some(self.cmp(other))
1094    }
1095}
1096
1097impl Ord for Row<'_> {
1098    #[inline]
1099    fn cmp(&self, other: &Self) -> Ordering {
1100        self.data.cmp(other.data)
1101    }
1102}
1103
1104impl Hash for Row<'_> {
1105    #[inline]
1106    fn hash<H: Hasher>(&self, state: &mut H) {
1107        self.data.hash(state)
1108    }
1109}
1110
1111impl AsRef<[u8]> for Row<'_> {
1112    #[inline]
1113    fn as_ref(&self) -> &[u8] {
1114        self.data
1115    }
1116}
1117
/// Owned version of a [`Row`] that can be moved/cloned freely.
///
/// This contains the data for the one specific row (not the entire buffer of all rows).
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this single row
    data: Box<[u8]>,
    /// Owned copy of the originating [`Rows`] config
    config: RowConfig,
}
1126
1127impl OwnedRow {
1128    /// Get borrowed [`Row`] from owned version.
1129    ///
1130    /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`].
1131    pub fn row(&self) -> Row<'_> {
1132        Row {
1133            data: &self.data,
1134            config: &self.config,
1135        }
1136    }
1137}
1138
1139// Manually derive these as don't wish to include `fields`. Also we just want to use the same `Row` implementations here.
1140
1141impl PartialEq for OwnedRow {
1142    #[inline]
1143    fn eq(&self, other: &Self) -> bool {
1144        self.row().eq(&other.row())
1145    }
1146}
1147
1148impl Eq for OwnedRow {}
1149
1150impl PartialOrd for OwnedRow {
1151    #[inline]
1152    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1153        Some(self.cmp(other))
1154    }
1155}
1156
1157impl Ord for OwnedRow {
1158    #[inline]
1159    fn cmp(&self, other: &Self) -> Ordering {
1160        self.row().cmp(&other.row())
1161    }
1162}
1163
1164impl Hash for OwnedRow {
1165    #[inline]
1166    fn hash<H: Hasher>(&self, state: &mut H) {
1167        self.row().hash(state)
1168    }
1169}
1170
1171impl AsRef<[u8]> for OwnedRow {
1172    #[inline]
1173    fn as_ref(&self) -> &[u8] {
1174        &self.data
1175    }
1176}
1177
1178/// Returns the null sentinel, negated if `invert` is true
1179#[inline]
1180fn null_sentinel(options: SortOptions) -> u8 {
1181    match options.nulls_first {
1182        true => 0,
1183        false => 0xFF,
1184    }
1185}
1186
/// Stores the lengths of the rows. Lazily materializes lengths for columns with fixed-size types.
enum LengthTracker {
    /// Fixed state: All rows have length `length`
    // Stays in this cheap state until a variable-length column is encountered
    Fixed { length: usize, num_rows: usize },
    /// Variable state: The length of row `i` is `lengths[i] + fixed_length`
    Variable {
        /// Accumulated length contribution of all fixed-size columns
        fixed_length: usize,
        /// Per-row length contribution of all variable-size columns
        lengths: Vec<usize>,
    },
}
1197
impl LengthTracker {
    /// Creates a tracker for `num_rows` rows, all initially of length 0
    fn new(num_rows: usize) -> Self {
        Self::Fixed {
            length: 0,
            num_rows,
        }
    }

    /// Adds a column of fixed-length elements, each of size `new_length` to the LengthTracker
    fn push_fixed(&mut self, new_length: usize) {
        match self {
            LengthTracker::Fixed { length, .. } => *length += new_length,
            LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
        }
    }

    /// Adds a column of possibly variable-length elements, element `i` has length `new_lengths.nth(i)`
    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
        match self {
            // First variable-length column: transition out of the Fixed state
            LengthTracker::Fixed { length, .. } => {
                *self = LengthTracker::Variable {
                    fixed_length: *length,
                    lengths: new_lengths.collect(),
                }
            }
            // Subsequent columns: accumulate element-wise
            LengthTracker::Variable { lengths, .. } => {
                assert_eq!(lengths.len(), new_lengths.len());
                lengths
                    .iter_mut()
                    .zip(new_lengths)
                    .for_each(|(length, new_length)| *length += new_length);
            }
        }
    }

    /// Returns the tracked per-row variable lengths as a mutable slice,
    /// forcing a transition to the `Variable` state if necessary
    fn materialized(&mut self) -> &mut [usize] {
        if let LengthTracker::Fixed { length, num_rows } = *self {
            *self = LengthTracker::Variable {
                fixed_length: length,
                lengths: vec![0; num_rows],
            };
        }

        match self {
            LengthTracker::Variable { lengths, .. } => lengths,
            // Unreachable: the branch above just converted any Fixed state
            LengthTracker::Fixed { .. } => unreachable!(),
        }
    }

    /// Initializes the offsets using the tracked lengths. Returns the sum of the
    /// lengths of the rows added.
    ///
    /// We initialize the offsets shifted down by one row index.
    ///
    /// As the rows are appended to the offsets will be incremented to match
    ///
    /// For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
    /// The offsets would be initialized to `0, 0, 3, 7`
    ///
    /// Writing the first row entirely would yield `0, 3, 3, 7`
    /// The second, `0, 3, 7, 7`
    /// The third, `0, 3, 7, 13`
    ///
    /// This would be the final offsets for reading
    ///
    /// In this way offsets tracks the position during writing whilst eventually serving
    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
        match self {
            LengthTracker::Fixed { length, num_rows } => {
                offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));

                initial_offset + num_rows * length
            }
            LengthTracker::Variable {
                fixed_length,
                lengths,
            } => {
                // Running prefix sum: each row's start offset, shifted by one
                let mut acc = initial_offset;

                offsets.extend(lengths.iter().map(|length| {
                    let current = acc;
                    acc += length + fixed_length;
                    current
                }));

                acc
            }
        }
    }
}
1289
/// Computes the encoded length of each row, returning a [`LengthTracker`]
/// that accumulates the contribution of every column in `cols`
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    // All columns share the same row count; an empty input has 0 rows
    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => tracker.push_variable(
                        as_generic_binary_array::<i32>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::LargeBinary => tracker.push_variable(
                        as_generic_binary_array::<i64>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::BinaryView => tracker.push_variable(
                        array.as_binary_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::Utf8 => tracker.push_variable(
                        array.as_string::<i32>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::LargeUtf8 => tracker.push_variable(
                        array.as_string::<i64>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::Utf8View => tracker.push_variable(
                        array.as_string_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        // One sentinel byte for validity plus the fixed payload
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                // Each key contributes the length of its (pre-encoded) value,
                // or the encoded null row for null keys
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                let array = as_struct_array(array);
                // 1 byte validity sentinel precedes each encoded struct row
                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                    true => 1 + rows.row(idx).as_ref().len(),
                    false => 1 + null.data.len(),
                }));
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
        }
    }

    tracker
}
1396
/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
///
/// `offsets[i + 1]` holds the current write position for row `i` within `data`,
/// and is advanced past the bytes written for this column
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // Skip the null-aware path when there are no nulls
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                }
                DataType::Utf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i64>()
                        .iter()
                        .map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            // Each struct row is a 1-byte validity sentinel followed by the
            // pre-encoded child row (or the encoded null row)
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            _ => unreachable!(),
        },
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
    }
}
1504
1505/// Encode dictionary values not preserving the dictionary encoding
1506pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1507    data: &mut [u8],
1508    offsets: &mut [usize],
1509    column: &DictionaryArray<K>,
1510    values: &Rows,
1511    null: &Row<'_>,
1512) {
1513    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1514        let row = match k {
1515            Some(k) => values.row(k.as_usize()).data,
1516            None => null.data,
1517        };
1518        let end_offset = *offset + row.len();
1519        data[*offset..end_offset].copy_from_slice(row);
1520        *offset = end_offset;
1521    }
1522}
1523
/// Helper invoked by the `downcast_primitive!` dispatch in [`decode_column`]
/// to decode a primitive column of concrete type `$t` from `$rows`
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1529
/// Decodes the provided `field` from `rows`, consuming the corresponding
/// prefix of each row slice
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
                DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
                DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {}", data_type)))
            }
        }
        Codec::Dictionary(converter, _) => {
            // Dictionary values were encoded flattened; decode as a single column
            let cols = converter.convert_raw(rows, validate_utf8)?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // Leading byte of each row is the struct validity sentinel
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = converter.convert_raw(rows, validate_utf8)?;

            let child_data = children.iter().map(|c| c.to_data()).collect();
            let builder = ArrayDataBuilder::new(field.data_type.clone())
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            Arc::new(StructArray::from(builder.build_unchecked()))
        }
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
            }
            DataType::LargeList(_) => {
                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
            }
            _ => unreachable!(),
        },
        Codec::RunEndEncoded(converter) => match &field.data_type {
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(run::decode::<Int16Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                DataType::Int32 => Arc::new(run::decode::<Int32Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                DataType::Int64 => Arc::new(run::decode::<Int64Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
    };
    Ok(array)
}
1614
1615#[cfg(test)]
1616mod tests {
1617    use rand::distr::uniform::SampleUniform;
1618    use rand::distr::{Distribution, StandardUniform};
1619    use rand::{rng, Rng};
1620
1621    use arrow_array::builder::*;
1622    use arrow_array::types::*;
1623    use arrow_array::*;
1624    use arrow_buffer::{i256, NullBuffer};
1625    use arrow_buffer::{Buffer, OffsetBuffer};
1626    use arrow_cast::display::{ArrayFormatter, FormatOptions};
1627    use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1628
1629    use super::*;
1630
    /// End-to-end test of fixed-width encoding: checks the exact encoded
    /// bytes, the resulting sort order, and a lossless round-trip.
    #[test]
    fn test_fixed_width() {
        // Two parallel columns; rows compare column-by-column, so the Int16
        // column dominates and Float32 breaks ties.
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Each row is 8 bytes: (1 validity byte + 2 value bytes) for Int16
        // plus (1 validity byte + 4 value bytes) for Float32
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        // Per value: a leading 1 marks non-null, 0 marks null; the payload is
        // an order-preserving big-endian encoding (e.g. Int16 1 -> 0x80,0x01,
        // -5 -> 0x7F,0xFB — the sign bit is flipped so negatives sort first)
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, //
                1, 191, 166, 102, 102, //
                1, 128, 2, //
                1, 192, 32, 0, 0, //
                0, 0, 0, //
                0, 0, 0, 0, 0, //
                1, 127, 251, //
                1, 192, 128, 0, 0, //
                1, 128, 2, //
                1, 189, 204, 204, 205, //
                1, 128, 2, //
                1, 63, 127, 255, 255, //
                1, 128, 0, //
                1, 127, 255, 255, 255 //
            ]
        );

        assert!(rows.row(3) < rows.row(6)); // -5 < 0
        assert!(rows.row(0) < rows.row(1)); // 1 < 2
        assert!(rows.row(3) < rows.row(0)); // -5 < 1
        assert!(rows.row(4) < rows.row(1)); // (2, 0.1) < (2, 2.5)
        assert!(rows.row(5) < rows.row(4)); // (2, -4.) < (2, 0.1)

        // Decoding the rows must reproduce the input columns exactly
        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
1693
1694    #[test]
1695    fn test_decimal128() {
1696        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
1697            DECIMAL128_MAX_PRECISION,
1698            7,
1699        ))])
1700        .unwrap();
1701        let col = Arc::new(
1702            Decimal128Array::from_iter([
1703                None,
1704                Some(i128::MIN),
1705                Some(-13),
1706                Some(46_i128),
1707                Some(5456_i128),
1708                Some(i128::MAX),
1709            ])
1710            .with_precision_and_scale(38, 7)
1711            .unwrap(),
1712        ) as ArrayRef;
1713
1714        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1715        for i in 0..rows.num_rows() - 1 {
1716            assert!(rows.row(i) < rows.row(i + 1));
1717        }
1718
1719        let back = converter.convert_rows(&rows).unwrap();
1720        assert_eq!(back.len(), 1);
1721        assert_eq!(col.as_ref(), back[0].as_ref())
1722    }
1723
1724    #[test]
1725    fn test_decimal256() {
1726        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
1727            DECIMAL256_MAX_PRECISION,
1728            7,
1729        ))])
1730        .unwrap();
1731        let col = Arc::new(
1732            Decimal256Array::from_iter([
1733                None,
1734                Some(i256::MIN),
1735                Some(i256::from_parts(0, -1)),
1736                Some(i256::from_parts(u128::MAX, -1)),
1737                Some(i256::from_parts(u128::MAX, 0)),
1738                Some(i256::from_parts(0, 46_i128)),
1739                Some(i256::from_parts(5, 46_i128)),
1740                Some(i256::MAX),
1741            ])
1742            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
1743            .unwrap(),
1744        ) as ArrayRef;
1745
1746        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1747        for i in 0..rows.num_rows() - 1 {
1748            assert!(rows.row(i) < rows.row(i + 1));
1749        }
1750
1751        let back = converter.convert_rows(&rows).unwrap();
1752        assert_eq!(back.len(), 1);
1753        assert_eq!(col.as_ref(), back[0].as_ref())
1754    }
1755
1756    #[test]
1757    fn test_bool() {
1758        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
1759
1760        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
1761
1762        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1763        assert!(rows.row(2) > rows.row(1));
1764        assert!(rows.row(2) > rows.row(0));
1765        assert!(rows.row(1) > rows.row(0));
1766
1767        let cols = converter.convert_rows(&rows).unwrap();
1768        assert_eq!(&cols[0], &col);
1769
1770        let converter = RowConverter::new(vec![SortField::new_with_options(
1771            DataType::Boolean,
1772            SortOptions::default().desc().with_nulls_first(false),
1773        )])
1774        .unwrap();
1775
1776        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1777        assert!(rows.row(2) < rows.row(1));
1778        assert!(rows.row(2) < rows.row(0));
1779        assert!(rows.row(1) < rows.row(0));
1780        let cols = converter.convert_rows(&rows).unwrap();
1781        assert_eq!(&cols[0], &col);
1782    }
1783
1784    #[test]
1785    fn test_timezone() {
1786        let a =
1787            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
1788        let d = a.data_type().clone();
1789
1790        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
1791        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
1792        let back = converter.convert_rows(&rows).unwrap();
1793        assert_eq!(back.len(), 1);
1794        assert_eq!(back[0].data_type(), &d);
1795
1796        // Test dictionary
1797        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
1798        a.append(34).unwrap();
1799        a.append_null();
1800        a.append(345).unwrap();
1801
1802        // Construct dictionary with a timezone
1803        let dict = a.finish();
1804        let values = TimestampNanosecondArray::from(dict.values().to_data());
1805        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
1806        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
1807        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
1808
1809        assert_eq!(dict_with_tz.data_type(), &d);
1810        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
1811        let rows = converter
1812            .convert_columns(&[Arc::new(dict_with_tz) as _])
1813            .unwrap();
1814        let back = converter.convert_rows(&rows).unwrap();
1815        assert_eq!(back.len(), 1);
1816        assert_eq!(back[0].data_type(), &v);
1817    }
1818
1819    #[test]
1820    fn test_null_encoding() {
1821        let col = Arc::new(NullArray::new(10));
1822        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
1823        let rows = converter.convert_columns(&[col]).unwrap();
1824        assert_eq!(rows.num_rows(), 10);
1825        assert_eq!(rows.row(1).data.len(), 0);
1826    }
1827
1828    #[test]
1829    fn test_variable_width() {
1830        let col = Arc::new(StringArray::from_iter([
1831            Some("hello"),
1832            Some("he"),
1833            None,
1834            Some("foo"),
1835            Some(""),
1836        ])) as ArrayRef;
1837
1838        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1839        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1840
1841        assert!(rows.row(1) < rows.row(0));
1842        assert!(rows.row(2) < rows.row(4));
1843        assert!(rows.row(3) < rows.row(0));
1844        assert!(rows.row(3) < rows.row(1));
1845
1846        let cols = converter.convert_rows(&rows).unwrap();
1847        assert_eq!(&cols[0], &col);
1848
1849        let col = Arc::new(BinaryArray::from_iter([
1850            None,
1851            Some(vec![0_u8; 0]),
1852            Some(vec![0_u8; 6]),
1853            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
1854            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
1855            Some(vec![0_u8; variable::BLOCK_SIZE]),
1856            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
1857            Some(vec![1_u8; 6]),
1858            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
1859            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
1860            Some(vec![1_u8; variable::BLOCK_SIZE]),
1861            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
1862            Some(vec![0xFF_u8; 6]),
1863            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
1864            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
1865            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
1866            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
1867        ])) as ArrayRef;
1868
1869        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1870        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1871
1872        for i in 0..rows.num_rows() {
1873            for j in i + 1..rows.num_rows() {
1874                assert!(
1875                    rows.row(i) < rows.row(j),
1876                    "{} < {} - {:?} < {:?}",
1877                    i,
1878                    j,
1879                    rows.row(i),
1880                    rows.row(j)
1881                );
1882            }
1883        }
1884
1885        let cols = converter.convert_rows(&rows).unwrap();
1886        assert_eq!(&cols[0], &col);
1887
1888        let converter = RowConverter::new(vec![SortField::new_with_options(
1889            DataType::Binary,
1890            SortOptions::default().desc().with_nulls_first(false),
1891        )])
1892        .unwrap();
1893        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1894
1895        for i in 0..rows.num_rows() {
1896            for j in i + 1..rows.num_rows() {
1897                assert!(
1898                    rows.row(i) > rows.row(j),
1899                    "{} > {} - {:?} > {:?}",
1900                    i,
1901                    j,
1902                    rows.row(i),
1903                    rows.row(j)
1904                );
1905            }
1906        }
1907
1908        let cols = converter.convert_rows(&rows).unwrap();
1909        assert_eq!(&cols[0], &col);
1910    }
1911
    /// Asserts that `a` is logically equal to `b`, where `b` may be
    /// dictionary-encoded.
    ///
    /// If `b` is a dictionary, it is first cast to its value type so the two
    /// arrays are compared by logical value rather than by encoding.
    /// (NOTE(review): the previous doc mentioned an `exact` parameter that
    /// does not exist.)
    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
        match b.data_type() {
            DataType::Dictionary(_, v) => {
                assert_eq!(a.data_type(), v.as_ref());
                let b = arrow_cast::cast(b, v).unwrap();
                assert_eq!(a, b.as_ref())
            }
            _ => assert_eq!(a, b),
        }
    }
1923
    /// Dictionary-encoded strings compare by logical value (not key), encode
    /// identically across batches, and respect sort direction / null
    /// placement options.
    #[test]
    fn test_string_dictionary() {
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        assert!(rows_a.row(3) < rows_a.row(5)); // null < ""
        assert!(rows_a.row(2) < rows_a.row(1)); // "he" < "hello"
        assert!(rows_a.row(0) < rows_a.row(1)); // "foo" < "hello"
        assert!(rows_a.row(3) < rows_a.row(0)); // null < "foo"

        // Every occurrence of "hello" encodes to an identical row
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // Rows from a second batch (different keys) must be comparable with
        // rows from the first batch
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0)); // "hello" == "hello"
        assert_eq!(rows_a.row(3), rows_b.row(1)); // null == null
        assert!(rows_b.row(2) < rows_a.row(0)); // "cupcakes" < "foo"

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        // Descending with nulls last inverts all of the above comparisons
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5)); // null > ""
        assert!(rows_c.row(2) > rows_c.row(1)); // "he" > "hello"
        assert!(rows_c.row(0) > rows_c.row(1)); // "foo" > "hello"
        assert!(rows_c.row(3) > rows_c.row(0)); // null > "foo"

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        // Descending with nulls first: values invert but nulls stay smallest
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5)); // null < ""
        assert!(rows_c.row(2) > rows_c.row(1)); // "he" > "hello"
        assert!(rows_c.row(0) > rows_c.row(1)); // "foo" > "hello"
        assert!(rows_c.row(3) < rows_c.row(0)); // null < "foo"

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }
1997
    /// Structs must compare field-by-field and preserve struct-level nulls
    /// through a round trip.
    #[test]
    fn test_struct() {
        // Test basic: rows {(1,"a"), (1,"b"), (2,"c"), (2,"d")}
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        // Input rows are already in ascending order, so encoded rows must be
        // strictly increasing
        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Test struct nullability: validity 0b00001010 keeps rows 1 and 3,
        // nulling rows 0 and 2 at the struct level
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
        assert!(r2.row(0) < r2.row(1)); // Nulls first
        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // Values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        // The decoded array must also pass full Arrow validation
        back[0].to_data().validate_full().unwrap();
    }
2041
2042    #[test]
2043    fn test_primitive_dictionary() {
2044        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2045        builder.append(2).unwrap();
2046        builder.append(3).unwrap();
2047        builder.append(0).unwrap();
2048        builder.append_null();
2049        builder.append(5).unwrap();
2050        builder.append(3).unwrap();
2051        builder.append(-1).unwrap();
2052
2053        let a = builder.finish();
2054        let data_type = a.data_type().clone();
2055        let columns = [Arc::new(a) as ArrayRef];
2056
2057        let field = SortField::new(data_type.clone());
2058        let converter = RowConverter::new(vec![field]).unwrap();
2059        let rows = converter.convert_columns(&columns).unwrap();
2060        assert!(rows.row(0) < rows.row(1));
2061        assert!(rows.row(2) < rows.row(0));
2062        assert!(rows.row(3) < rows.row(2));
2063        assert!(rows.row(6) < rows.row(2));
2064        assert!(rows.row(3) < rows.row(6));
2065    }
2066
2067    #[test]
2068    fn test_dictionary_nulls() {
2069        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
2070        let keys =
2071            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
2072
2073        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
2074        let data = keys
2075            .into_builder()
2076            .data_type(data_type.clone())
2077            .child_data(vec![values])
2078            .build()
2079            .unwrap();
2080
2081        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
2082        let field = SortField::new(data_type.clone());
2083        let converter = RowConverter::new(vec![field]).unwrap();
2084        let rows = converter.convert_columns(&columns).unwrap();
2085
2086        assert_eq!(rows.row(0), rows.row(1));
2087        assert_eq!(rows.row(3), rows.row(4));
2088        assert_eq!(rows.row(4), rows.row(5));
2089        assert!(rows.row(3) < rows.row(0));
2090    }
2091
2092    #[test]
2093    #[should_panic(expected = "Encountered non UTF-8 data")]
2094    fn test_invalid_utf8() {
2095        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2096        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2097        let rows = converter.convert_columns(&[array]).unwrap();
2098        let binary_row = rows.row(0);
2099
2100        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2101        let parser = converter.parser();
2102        let utf8_row = parser.parse(binary_row.as_ref());
2103
2104        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2105    }
2106
2107    #[test]
2108    #[should_panic(expected = "Encountered non UTF-8 data")]
2109    fn test_invalid_utf8_array() {
2110        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2111        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2112        let rows = converter.convert_columns(&[array]).unwrap();
2113        let binary_rows = rows.try_into_binary().expect("known-small rows");
2114
2115        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2116        let parsed = converter.from_binary(binary_rows);
2117
2118        converter.convert_rows(parsed.iter()).unwrap();
2119    }
2120
2121    #[test]
2122    #[should_panic(expected = "index out of bounds")]
2123    fn test_invalid_empty() {
2124        let binary_row: &[u8] = &[];
2125
2126        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2127        let parser = converter.parser();
2128        let utf8_row = parser.parse(binary_row.as_ref());
2129
2130        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2131    }
2132
2133    #[test]
2134    #[should_panic(expected = "index out of bounds")]
2135    fn test_invalid_empty_array() {
2136        let row: &[u8] = &[];
2137        let binary_rows = BinaryArray::from(vec![row]);
2138
2139        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2140        let parsed = converter.from_binary(binary_rows);
2141
2142        converter.convert_rows(parsed.iter()).unwrap();
2143    }
2144
2145    #[test]
2146    #[should_panic(expected = "index out of bounds")]
2147    fn test_invalid_truncated() {
2148        let binary_row: &[u8] = &[0x02];
2149
2150        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2151        let parser = converter.parser();
2152        let utf8_row = parser.parse(binary_row.as_ref());
2153
2154        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2155    }
2156
2157    #[test]
2158    #[should_panic(expected = "index out of bounds")]
2159    fn test_invalid_truncated_array() {
2160        let row: &[u8] = &[0x02];
2161        let binary_rows = BinaryArray::from(vec![row]);
2162
2163        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2164        let parsed = converter.from_binary(binary_rows);
2165
2166        converter.convert_rows(parsed.iter()).unwrap();
2167    }
2168
2169    #[test]
2170    #[should_panic(expected = "rows were not produced by this RowConverter")]
2171    fn test_different_converter() {
2172        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2173        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2174        let rows = converter.convert_columns(&[values]).unwrap();
2175
2176        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2177        let _ = converter.convert_rows(&rows);
2178    }
2179
    /// Verifies list ordering under all four combinations of sort direction
    /// and null placement. Logical rows built below:
    /// `[32, 52, 32]`, `[32, 52, 12]`, `[32, 52]`, `null`, `[32, null]`, `[]`
    /// (NOTE(review): earlier comments said `[32, 42]` for row 2, but the
    /// builder appends 32 then 52.)
    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Ascending, nulls first (defaults)
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }
2270
    /// Verifies ordering and round-tripping of lists-of-lists, where nulls
    /// can occur at three levels: outer list, inner list, and leaf value.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        // Row 0: [[1, 2], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 1: [[1, null], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 2: [[1, null], null]; Row 3: null
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        // Row 4: [[1, 2]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null]
        //   null
        //   [[1, 2]]
        // ]
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [1, 2] > [1, null] in the first inner list
        assert!(rows.row(1) > rows.row(2)); // inner [1, null] > inner null
        assert!(rows.row(2) > rows.row(3)); // any value > outer null
        assert!(rows.row(4) < rows.row(0)); // shorter [[1, 2]] < [[1, 2], [1, null]]
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }
2361
2362    #[test]
2363    fn test_list() {
2364        test_single_list::<i32>();
2365        test_nested_list::<i32>();
2366    }
2367
    #[test]
    fn test_large_list() {
        // Same coverage as `test_list` but with i64 (LargeList) offsets.
        test_single_list::<i64>();
        test_nested_list::<i64>();
    }
2373
2374    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
2375    where
2376        K: ArrowPrimitiveType,
2377        StandardUniform: Distribution<K::Native>,
2378    {
2379        let mut rng = rng();
2380        (0..len)
2381            .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
2382            .collect()
2383    }
2384
2385    fn generate_strings<O: OffsetSizeTrait>(
2386        len: usize,
2387        valid_percent: f64,
2388    ) -> GenericStringArray<O> {
2389        let mut rng = rng();
2390        (0..len)
2391            .map(|_| {
2392                rng.random_bool(valid_percent).then(|| {
2393                    let len = rng.random_range(0..100);
2394                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2395                    String::from_utf8(bytes).unwrap()
2396                })
2397            })
2398            .collect()
2399    }
2400
2401    fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
2402        let mut rng = rng();
2403        (0..len)
2404            .map(|_| {
2405                rng.random_bool(valid_percent).then(|| {
2406                    let len = rng.random_range(0..100);
2407                    let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2408                    String::from_utf8(bytes).unwrap()
2409                })
2410            })
2411            .collect()
2412    }
2413
2414    fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
2415        let mut rng = rng();
2416        (0..len)
2417            .map(|_| {
2418                rng.random_bool(valid_percent).then(|| {
2419                    let len = rng.random_range(0..100);
2420                    let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
2421                    bytes
2422                })
2423            })
2424            .collect()
2425    }
2426
2427    fn generate_dictionary<K>(
2428        values: ArrayRef,
2429        len: usize,
2430        valid_percent: f64,
2431    ) -> DictionaryArray<K>
2432    where
2433        K: ArrowDictionaryKeyType,
2434        K::Native: SampleUniform,
2435    {
2436        let mut rng = rng();
2437        let min_key = K::Native::from_usize(0).unwrap();
2438        let max_key = K::Native::from_usize(values.len()).unwrap();
2439        let keys: PrimitiveArray<K> = (0..len)
2440            .map(|_| {
2441                rng.random_bool(valid_percent)
2442                    .then(|| rng.random_range(min_key..max_key))
2443            })
2444            .collect();
2445
2446        let data_type =
2447            DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
2448
2449        let data = keys
2450            .into_data()
2451            .into_builder()
2452            .data_type(data_type)
2453            .add_child_data(values.to_data())
2454            .build()
2455            .unwrap();
2456
2457        DictionaryArray::from(data)
2458    }
2459
2460    fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
2461        let mut rng = rng();
2462        let width = rng.random_range(0..20);
2463        let mut builder = FixedSizeBinaryBuilder::new(width);
2464
2465        let mut b = vec![0; width as usize];
2466        for _ in 0..len {
2467            match rng.random_bool(valid_percent) {
2468                true => {
2469                    b.iter_mut().for_each(|x| *x = rng.random());
2470                    builder.append_value(&b).unwrap();
2471                }
2472                false => builder.append_null(),
2473            }
2474        }
2475
2476        builder.finish()
2477    }
2478
2479    fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
2480        let mut rng = rng();
2481        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2482        let a = generate_primitive_array::<Int32Type>(len, valid_percent);
2483        let b = generate_strings::<i32>(len, valid_percent);
2484        let fields = Fields::from(vec![
2485            Field::new("a", DataType::Int32, true),
2486            Field::new("b", DataType::Utf8, true),
2487        ]);
2488        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
2489        StructArray::new(fields, values, Some(nulls))
2490    }
2491
2492    fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
2493    where
2494        F: FnOnce(usize) -> ArrayRef,
2495    {
2496        let mut rng = rng();
2497        let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
2498        let values_len = offsets.last().unwrap().to_usize().unwrap();
2499        let values = values(values_len);
2500        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2501        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2502        ListArray::new(field, offsets, values, Some(nulls))
2503    }
2504
    /// Generate a random column of length `len` with one of 16 randomly
    /// chosen types, used by `fuzz_test` to cover the row-format codecs.
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = rng();
        match rng.random_range(0..16) {
            // Primitive integers and floats
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            // Variable-length strings
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            // Dictionaries — NOTE(review): `random_range(1..len)` requires
            // len > 1; callers (fuzz_test) draw len from 5..100.
            7 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Arc::new(generate_dictionary::<Int64Type>(
                // Cannot test dictionaries containing null values because of #2687
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.random_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
            // Nested types: struct, and lists of primitives/strings/structs
            10 => Arc::new(generate_struct(len, 0.8)),
            11 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
            })),
            12 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_strings::<i32>(values_len, 0.8))
            })),
            13 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_struct(values_len, 0.8))
            })),
            // View types
            14 => Arc::new(generate_string_view(len, 0.8)),
            15 => Arc::new(generate_byte_view(len, 0.8)),
            // random_range(0..16) cannot produce anything else
            _ => unreachable!(),
        }
    }
2546
2547    fn print_row(cols: &[SortColumn], row: usize) -> String {
2548        let t: Vec<_> = cols
2549            .iter()
2550            .map(|x| match x.values.is_valid(row) {
2551                true => {
2552                    let opts = FormatOptions::default().with_null("NULL");
2553                    let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
2554                    formatter.value(row).to_string()
2555                }
2556                false => "NULL".to_string(),
2557            })
2558            .collect();
2559        t.join(",")
2560    }
2561
2562    fn print_col_types(cols: &[SortColumn]) -> String {
2563        let t: Vec<_> = cols
2564            .iter()
2565            .map(|x| x.values.data_type().to_string())
2566            .collect();
2567        t.join(",")
2568    }
2569
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        // Randomized end-to-end check: for random columns and sort options,
        // comparing encoded rows byte-wise must agree with
        // `LexicographicalComparator`, and converting rows back must
        // reproduce the original arrays.
        for _ in 0..100 {
            let mut rng = rng();
            let num_columns = rng.random_range(1..5);
            let len = rng.random_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // Random sort direction / null ordering per column.
            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            // Every pair of rows must compare the same as the
            // lexicographical comparator on the source arrays.
            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            // Round-trip: rows -> arrays must reproduce the inputs
            // (dictionary_eq compares dictionaries by value).
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Check that we can convert
            // rows -> BinaryArray -> parsed rows -> arrays
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // And BinaryArray -> Rows -> arrays via from_binary
            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }
2650
2651    #[test]
2652    fn test_clear() {
2653        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2654        let mut rows = converter.empty_rows(3, 128);
2655
2656        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
2657        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
2658        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
2659
2660        for array in arrays.iter() {
2661            rows.clear();
2662            converter.append(&mut rows, &[array.clone()]).unwrap();
2663            let back = converter.convert_rows(&rows).unwrap();
2664            assert_eq!(&back[0], array);
2665        }
2666
2667        let mut rows_expected = converter.empty_rows(3, 128);
2668        converter.append(&mut rows_expected, &arrays[1..]).unwrap();
2669
2670        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
2671            assert_eq!(
2672                actual, expected,
2673                "For row {}: expected {:?}, actual: {:?}",
2674                i, expected, actual
2675            );
2676        }
2677    }
2678
2679    #[test]
2680    fn test_append_codec_dictionary_binary() {
2681        use DataType::*;
2682        // Dictionary RowConverter
2683        let converter = RowConverter::new(vec![SortField::new(Dictionary(
2684            Box::new(Int32),
2685            Box::new(Binary),
2686        ))])
2687        .unwrap();
2688        let mut rows = converter.empty_rows(4, 128);
2689
2690        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
2691        let values = BinaryArray::from(vec![
2692            Some("a".as_bytes()),
2693            Some(b"b"),
2694            Some(b"c"),
2695            Some(b"d"),
2696        ]);
2697        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2698
2699        rows.clear();
2700        let array = Arc::new(dict_array) as ArrayRef;
2701        converter.append(&mut rows, &[array.clone()]).unwrap();
2702        let back = converter.convert_rows(&rows).unwrap();
2703
2704        dictionary_eq(&back[0], &array);
2705    }
2706
2707    #[test]
2708    fn test_list_prefix() {
2709        let mut a = ListBuilder::new(Int8Builder::new());
2710        a.append_value([None]);
2711        a.append_value([None, None]);
2712        let a = a.finish();
2713
2714        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2715        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2716        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
2717    }
2718}