arrow2/compute/sort/row/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A comparable row-oriented representation of a collection of [`Array`].
19//!
20//! **This module is an arrow2 version of [arrow::row]:[https://docs.rs/arrow/latest/arrow/row/index.html]**
21//!
22//! As [`Row`] are [normalized for sorting], they can be very efficiently [compared](PartialOrd),
23//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort]. This
24//! makes the row format ideal for implementing efficient multi-column sorting,
25//! grouping, aggregation, windowing and more.
26//!
27//! _Comparing [`Rows`] generated by different [`RowConverter`] is not guaranteed to
28//! yield a meaningful ordering_
29//!
30//! [non-comparison sorts]:[https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts]
31//! [radix sort]:[https://en.wikipedia.org/wiki/Radix_sort]
32//! [normalized for sorting]:[https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.83.1080&rep=rep1&type=pdf]
33//! [`memcmp`]:[https://www.man7.org/linux/man-pages/man3/memcmp.3.html]
34use std::{
35    cmp::Ordering,
36    hash::{Hash, Hasher},
37    sync::Arc,
38};
39
40use crate::{
41    array::{Array, BinaryArray, BooleanArray, DictionaryArray, PrimitiveArray, Utf8Array},
42    datatypes::PhysicalType,
43    error::*,
44};
45use crate::{compute::sort::SortOptions, datatypes::DataType};
46
47use self::{
48    dictionary::{compute_dictionary_mapping, encode_dictionary},
49    interner::OrderPreservingInterner,
50};
51
52mod dictionary;
53mod fixed;
54mod interner;
55mod variable;
56
57/// Converts `Box<dyn Array>` columns into a row-oriented format.
58///
59/// # Format
60///
61/// The encoding of the row format should not be considered stable, but is documented here
62/// for reference.
63///
64/// ## Unsigned Integer Encoding
65///
66/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
67/// to the integer's length
68///
69/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
70/// integer
71///
72/// ## Signed Integer Encoding
73///
74/// Signed integers have their most significant sign bit flipped, and are then encoded in the
75/// same manner as an unsigned integer
76///
77/// ## Float Encoding
78///
79/// Floats are converted from IEEE 754 representation to a signed integer representation
80/// by flipping all bar the sign bit if they are negative.
81///
82/// They are then encoded in the same manner as a signed integer
83///
84/// ## Variable Length Bytes Encoding
85///
86/// A null is encoded as a `0_u8`
87///
88/// An empty byte array is encoded as `1_u8`
89///
90/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
91/// encoded using a block based scheme described below.
92///
93/// The byte array is broken up into 32-byte blocks, each block is written in turn
94/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
95/// with `0_u8` and written to the output, followed by the un-padded length in bytes
96/// of this final block as a `u8`
97///
98/// This is loosely inspired by [COBS] encoding, and chosen over more traditional
99/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
100///
101/// ## Dictionary Encoding
102///
103/// [`RowConverter`] needs to support converting dictionary encoded arrays with unsorted, and
104/// potentially distinct dictionaries. One simple mechanism to avoid this would be to reverse
105/// the dictionary encoding, and encode the array values directly, however, this would lose
106/// the benefits of dictionary encoding to reduce memory and CPU consumption.
107///
108/// As such the [`RowConverter`] maintains an order-preserving dictionary encoding for each
109/// dictionary encoded column. As this is a variable-length encoding, new dictionary values
110/// can be added whilst preserving the sort order.
111///
112/// A null dictionary value is encoded as `0_u8`.
113///
114/// A non-null dictionary value is encoded as `1_u8` followed by a null-terminated byte array
115/// key determined by the order-preserving dictionary encoding
116///
117/// # Ordering
118///
119/// ## Float Ordering
120///
121/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined
122/// in the IEEE 754 (2008 revision) floating point standard.
123///
124/// The ordering established by this does not always agree with the
125/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
126/// they consider negative and positive zero equal, while this does not
127///
128/// ## Null Ordering
129///
130/// The encoding described above will order nulls first, this can be inverted by representing
131/// nulls as `0xFF_u8` instead of `0_u8`
132///
133/// ## Reverse Column Ordering
134///
135/// The order of a given column can be reversed by negating the encoded bytes of non-null values
136///
137/// [COBS]:[https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing]
138/// [byte stuffing]:[https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing]
139#[derive(Debug)]
140pub struct RowConverter {
141    /// Sort fields
142    fields: Arc<[SortField]>,
143    /// interning state for column `i`, if column`i` is a dictionary
144    interners: Vec<Option<Box<OrderPreservingInterner>>>,
145}
146
147/// Configure the data type and sort order for a given column
148#[derive(Debug, Clone, PartialEq, Eq)]
149pub struct SortField {
150    /// Sort options
151    options: SortOptions,
152    /// Data type
153    data_type: DataType,
154}
155
156impl SortField {
157    /// Create a new column with the given data type
158    pub fn new(data_type: DataType) -> Self {
159        Self::new_with_options(data_type, SortOptions::default())
160    }
161
162    /// Create a new column with the given data type and [`SortOptions`]
163    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
164        Self { options, data_type }
165    }
166}
167
168impl RowConverter {
169    /// Create a new [`RowConverter`] with the provided schema
170    pub fn new(fields: Vec<SortField>) -> Self {
171        let interners = vec![None; fields.len()];
172        Self {
173            fields: fields.into(),
174            interners,
175        }
176    }
177
178    /// Convert a slice of [`Box<dyn Array>`] columns into [`Rows`]
179    ///
180    /// See [`Row`] for information on when [`Row`] can be compared
181    ///
182    /// # Panics
183    ///
184    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
185    pub fn convert_columns(&mut self, columns: &[Box<dyn Array>]) -> Result<Rows> {
186        if columns.len() != self.fields.len() {
187            return Err(Error::InvalidArgumentError(format!(
188                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
189                self.fields.len(),
190                columns.len()
191            )));
192        }
193
194        let dictionaries = columns
195            .iter()
196            .zip(&mut self.interners)
197            .zip(self.fields.iter())
198            .map(|((column, interner), field)| {
199                if column.data_type() != &field.data_type {
200                    return Err(Error::InvalidArgumentError(format!(
201                        "RowConverter column schema mismatch, expected {:?} got {:?}",
202                        field.data_type,
203                        column.data_type()
204                    )));
205                }
206
207                let values = match column.data_type().to_logical_type() {
208                    DataType::Dictionary(k, _, _) => match_integer_type!(k, |$T| {
209                        let column = column
210                            .as_any()
211                            .downcast_ref::<DictionaryArray<$T>>()
212                            .unwrap();
213                        column.values()
214                    }),
215                    _ => return Ok(None),
216                };
217
218                let interner = interner.get_or_insert_with(Default::default);
219
220                let mapping = compute_dictionary_mapping(interner, values)?
221                    .into_iter()
222                    .map(|maybe_interned| {
223                        maybe_interned.map(|interned| interner.normalized_key(interned))
224                    })
225                    .collect::<Vec<_>>();
226
227                Ok(Some(mapping))
228            })
229            .collect::<Result<Vec<_>>>()?;
230
231        let mut rows = new_empty_rows(columns, &dictionaries)?;
232
233        // jorgecarleitao's comments in PR#1287:
234        // This seems to be embarassibly parallel.
235        // Given that this is a transpose of O(N x C) where N is length and C number of columns, I wonder if we could split this so users can parallelize.
236        // This is almost parallelizable - it is changing rows.
237        // However, there is still an optimization since modifying rows is O(1) but encoding is O(C).
238        // Will continue to think about this.
239        for ((column, field), dictionary) in
240            columns.iter().zip(self.fields.iter()).zip(dictionaries)
241        {
242            // We encode a column at a time to minimise dispatch overheads
243            encode_column(&mut rows, column, field.options, dictionary.as_deref())
244        }
245
246        Ok(rows)
247    }
248}
249
250/// A row-oriented representation of arrow data, that is normalized for comparison
251///
252/// See [`RowConverter`]
253#[derive(Debug)]
254pub struct Rows {
255    /// Underlying row bytes
256    buffer: Box<[u8]>,
257    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
258    offsets: Box<[usize]>,
259}
260
261impl Rows {
262    /// Get a reference to a certain row.
263    pub fn row(&self, row: usize) -> Row<'_> {
264        let end = self.offsets[row + 1];
265        let start = self.offsets[row];
266        Row {
267            data: unsafe { self.buffer.get_unchecked(start..end) },
268        }
269    }
270
271    /// Get a reference to a certain row but not check the bounds.
272    pub fn row_unchecked(&self, row: usize) -> Row<'_> {
273        let data = unsafe {
274            let end = *self.offsets.get_unchecked(row + 1);
275            let start = *self.offsets.get_unchecked(row);
276            self.buffer.get_unchecked(start..end)
277        };
278        Row { data }
279    }
280
281    /// Returns the number of rows
282    #[inline]
283    pub fn len(&self) -> usize {
284        self.offsets.len() - 1
285    }
286
287    #[inline]
288    /// Returns the iterator
289    pub fn iter(&self) -> RowsIter<'_> {
290        self.into_iter()
291    }
292}
293
294impl<'a> IntoIterator for &'a Rows {
295    type Item = Row<'a>;
296    type IntoIter = RowsIter<'a>;
297
298    #[inline]
299    fn into_iter(self) -> Self::IntoIter {
300        RowsIter {
301            rows: self,
302            start: 0,
303            end: self.len(),
304        }
305    }
306}
307
308/// An iterator over [`Rows`]
309#[derive(Debug)]
310pub struct RowsIter<'a> {
311    rows: &'a Rows,
312    start: usize,
313    end: usize,
314}
315
316impl<'a> Iterator for RowsIter<'a> {
317    type Item = Row<'a>;
318
319    #[inline]
320    fn next(&mut self) -> Option<Self::Item> {
321        if self.start < self.end {
322            let row = self.rows.row_unchecked(self.start);
323            self.start += 1;
324            Some(row)
325        } else {
326            None
327        }
328    }
329
330    #[inline]
331    fn size_hint(&self) -> (usize, Option<usize>) {
332        let len = self.len();
333        (len, Some(len))
334    }
335}
336
337impl<'a> ExactSizeIterator for RowsIter<'a> {
338    #[inline]
339    fn len(&self) -> usize {
340        self.end - self.start
341    }
342}
343
344impl<'a> DoubleEndedIterator for RowsIter<'a> {
345    fn next_back(&mut self) -> Option<Self::Item> {
346        if self.end == self.start {
347            return None;
348        }
349        let row = self.rows.row(self.end);
350        self.end -= 1;
351        Some(row)
352    }
353}
354
355unsafe impl<'a> crate::trusted_len::TrustedLen for RowsIter<'a> {}
356
357/// A comparable representation of a row
358///
359/// Two [`Row`] can be compared if they both belong to [`Rows`] returned by calls to
360/// [`RowConverter::convert_columns`] on the same [`RowConverter`]
361///
362/// Otherwise any ordering established by comparing the [`Row`] is arbitrary
363#[derive(Debug, Copy, Clone)]
364pub struct Row<'a> {
365    data: &'a [u8],
366}
367
368// Manually derive these as don't wish to include `fields`
369
370impl<'a> PartialEq for Row<'a> {
371    #[inline]
372    fn eq(&self, other: &Self) -> bool {
373        self.data.eq(other.data)
374    }
375}
376
377impl<'a> Eq for Row<'a> {}
378
379impl<'a> PartialOrd for Row<'a> {
380    #[inline]
381    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
382        self.data.partial_cmp(other.data)
383    }
384}
385
386impl<'a> Ord for Row<'a> {
387    #[inline]
388    fn cmp(&self, other: &Self) -> Ordering {
389        self.data.cmp(other.data)
390    }
391}
392
393impl<'a> Hash for Row<'a> {
394    #[inline]
395    fn hash<H: Hasher>(&self, state: &mut H) {
396        self.data.hash(state)
397    }
398}
399
400impl<'a> AsRef<[u8]> for Row<'a> {
401    #[inline]
402    fn as_ref(&self) -> &[u8] {
403        self.data
404    }
405}
406
407/// Returns the null sentinel, negated if `invert` is true
408#[inline]
409fn null_sentinel(options: SortOptions) -> u8 {
410    match options.nulls_first {
411        true => 0,
412        false => 0xFF,
413    }
414}
415
416/// Match `PrimitiveType` to standard Rust types
417#[macro_export]
418macro_rules! with_match_primitive_without_interval_type {(
419    $key_type:expr, | $_:tt $T:ident | $($body:tt)*
420) => ({
421    macro_rules! __with_ty__ {( $_ $T:ident ) => ( $($body)* )}
422    use $crate::datatypes::PrimitiveType::*;
423    use $crate::types::{f16, i256};
424    match $key_type {
425        Int8 => __with_ty__! { i8 },
426        Int16 => __with_ty__! { i16 },
427        Int32 => __with_ty__! { i32 },
428        Int64 => __with_ty__! { i64 },
429        Int128 => __with_ty__! { i128 },
430        Int256 => __with_ty__! { i256 },
431        UInt8 => __with_ty__! { u8 },
432        UInt16 => __with_ty__! { u16 },
433        UInt32 => __with_ty__! { u32 },
434        UInt64 => __with_ty__! { u64 },
435        Float16 => __with_ty__! { f16 },
436        Float32 => __with_ty__! { f32 },
437        Float64 => __with_ty__! { f64 },
438        _ => unimplemented!("Unsupported type: {:?}", $key_type),
439    }
440})}
441
442/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`]
443fn new_empty_rows(
444    cols: &[Box<dyn Array>],
445    dictionaries: &[Option<Vec<Option<&[u8]>>>],
446) -> Result<Rows> {
447    use fixed::FixedLengthEncoding;
448
449    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
450    let mut lengths = vec![0; num_rows];
451
452    for (array, dict) in cols.iter().zip(dictionaries) {
453        match array.data_type().to_physical_type() {
454            PhysicalType::Primitive(primitive) => {
455                with_match_primitive_without_interval_type!(primitive, |$T| {
456                    let array = array
457                        .as_any()
458                        .downcast_ref::<PrimitiveArray<$T>>()
459                        .unwrap();
460                    lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array))
461                })
462            }
463            PhysicalType::Null => {}
464            PhysicalType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
465            PhysicalType::Binary => array
466                .as_any()
467                .downcast_ref::<BinaryArray<i32>>()
468                .unwrap()
469                .iter()
470                .zip(lengths.iter_mut())
471                .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
472            PhysicalType::LargeBinary => array
473                .as_any()
474                .downcast_ref::<BinaryArray<i64>>()
475                .unwrap()
476                .iter()
477                .zip(lengths.iter_mut())
478                .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
479            PhysicalType::Utf8 => array
480                .as_any()
481                .downcast_ref::<Utf8Array<i32>>()
482                .unwrap()
483                .iter()
484                .zip(lengths.iter_mut())
485                .for_each(|(slice, length)| {
486                    *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
487                }),
488            PhysicalType::LargeUtf8 => array
489                .as_any()
490                .downcast_ref::<Utf8Array<i64>>()
491                .unwrap()
492                .iter()
493                .zip(lengths.iter_mut())
494                .for_each(|(slice, length)| {
495                    *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
496                }),
497            PhysicalType::Dictionary(k) => match_integer_type!(k, |$T| {
498                let array = array
499                    .as_any()
500                    .downcast_ref::<DictionaryArray<$T>>()
501                    .unwrap();
502                let dict = dict.as_ref().unwrap();
503                for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
504                    match v.and_then(|v| dict[*v as usize]) {
505                        Some(k) => *length += k.len() + 1,
506                        None => *length += 1,
507                    }
508                }
509            }),
510            t => {
511                return Err(Error::NotYetImplemented(format!(
512                    "not yet implemented: {t:?}"
513                )))
514            }
515        }
516    }
517
518    let mut offsets = Vec::with_capacity(num_rows + 1);
519    offsets.push(0);
520
521    // We initialize the offsets shifted down by one row index.
522    //
523    // As the rows are appended to the offsets will be incremented to match
524    //
525    // For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
526    // The offsets would be initialized to `0, 0, 3, 7`
527    //
528    // Writing the first row entirely would yield `0, 3, 3, 7`
529    // The second, `0, 3, 7, 7`
530    // The third, `0, 3, 7, 13`
531    //
532    // This would be the final offsets for reading
533    //
534    // In this way offsets tracks the position during writing whilst eventually serving
535    // as identifying the offsets of the written rows
536    let mut cur_offset = 0_usize;
537    for l in lengths {
538        offsets.push(cur_offset);
539        cur_offset = cur_offset.checked_add(l).expect("overflow");
540    }
541
542    let buffer = vec![0_u8; cur_offset];
543
544    Ok(Rows {
545        buffer: buffer.into(),
546        offsets: offsets.into(),
547    })
548}
549
550/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
551fn encode_column(
552    out: &mut Rows,
553    column: &Box<dyn Array>,
554    opts: SortOptions,
555    dictionary: Option<&[Option<&[u8]>]>,
556) {
557    match column.data_type().to_physical_type() {
558        PhysicalType::Primitive(primitive) => {
559            with_match_primitive_without_interval_type!(primitive, |$T| {
560                let column = column
561                    .as_any()
562                    .downcast_ref::<PrimitiveArray<$T>>()
563                    .unwrap()
564                    .iter()
565                    .map(|v| v.map(|v| *v));
566                fixed::encode(out, column, opts);
567            })
568        }
569        PhysicalType::Null => {}
570        PhysicalType::Boolean => fixed::encode(
571            out,
572            column.as_any().downcast_ref::<BooleanArray>().unwrap(),
573            opts,
574        ),
575        PhysicalType::Binary => {
576            variable::encode(
577                out,
578                column
579                    .as_any()
580                    .downcast_ref::<BinaryArray<i32>>()
581                    .unwrap()
582                    .iter(),
583                opts,
584            );
585        }
586        PhysicalType::LargeBinary => {
587            variable::encode(
588                out,
589                column
590                    .as_any()
591                    .downcast_ref::<BinaryArray<i64>>()
592                    .unwrap()
593                    .iter(),
594                opts,
595            );
596        }
597        PhysicalType::Utf8 => variable::encode(
598            out,
599            column
600                .as_any()
601                .downcast_ref::<Utf8Array<i32>>()
602                .unwrap()
603                .iter()
604                .map(|x| x.map(|x| x.as_bytes())),
605            opts,
606        ),
607        PhysicalType::LargeUtf8 => variable::encode(
608            out,
609            column
610                .as_any()
611                .downcast_ref::<Utf8Array<i64>>()
612                .unwrap()
613                .iter()
614                .map(|x| x.map(|x| x.as_bytes())),
615            opts,
616        ),
617        PhysicalType::Dictionary(k) => match_integer_type!(k, |$T| {
618            let column = column
619                .as_any()
620                .downcast_ref::<DictionaryArray<$T>>()
621                .unwrap();
622            encode_dictionary(out, column, dictionary.unwrap(), opts);
623        }),
624        t => unimplemented!("not yet implemented: {:?}", t),
625    }
626}
627
628#[cfg(test)]
629mod tests {
630    use std::fmt::Debug;
631
632    use rand::{
633        distributions::{uniform::SampleUniform, Distribution, Standard},
634        thread_rng, Rng,
635    };
636
637    use super::*;
638    use crate::{
639        array::{Array, DictionaryKey, Float32Array, Int16Array, NullArray},
640        compute::sort::build_compare,
641        datatypes::DataType,
642        offset::Offset,
643        types::NativeType,
644    };
645
646    #[test]
647    fn test_fixed_width() {
648        let cols = [
649            Int16Array::from([Some(1), Some(2), None, Some(-5), Some(2), Some(2), Some(0)])
650                .to_boxed(),
651            Float32Array::from([
652                Some(1.3),
653                Some(2.5),
654                None,
655                Some(4.),
656                Some(0.1),
657                Some(-4.),
658                Some(-0.),
659            ])
660            .to_boxed(),
661        ];
662
663        let mut converter = RowConverter::new(vec![
664            SortField::new(DataType::Int16),
665            SortField::new(DataType::Float32),
666        ]);
667        let rows = converter.convert_columns(&cols).unwrap();
668
669        assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]);
670        assert_eq!(
671            rows.buffer.as_ref(),
672            &[
673                1, 128, 1, //
674                1, 191, 166, 102, 102, //
675                1, 128, 2, //
676                1, 192, 32, 0, 0, //
677                0, 0, 0, //
678                0, 0, 0, 0, 0, //
679                1, 127, 251, //
680                1, 192, 128, 0, 0, //
681                1, 128, 2, //
682                1, 189, 204, 204, 205, //
683                1, 128, 2, //
684                1, 63, 127, 255, 255, //
685                1, 128, 0, //
686                1, 127, 255, 255, 255 //
687            ]
688        );
689
690        assert!(rows.row(3) < rows.row(6));
691        assert!(rows.row(0) < rows.row(1));
692        assert!(rows.row(3) < rows.row(0));
693        assert!(rows.row(4) < rows.row(1));
694        assert!(rows.row(5) < rows.row(4));
695    }
696
697    #[test]
698    fn test_null_encoding() {
699        let col = NullArray::new(DataType::Null, 10).to_boxed();
700        let mut converter = RowConverter::new(vec![SortField::new(DataType::Null)]);
701        let rows = converter.convert_columns(&[col]).unwrap();
702        assert_eq!(rows.len(), 10);
703        assert_eq!(rows.row(1).data.len(), 0);
704    }
705
706    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
707    where
708        K: NativeType,
709        Standard: Distribution<K>,
710    {
711        let mut rng = thread_rng();
712        (0..len)
713            .map(|_| rng.gen_bool(valid_percent).then(|| rng.gen()))
714            .collect()
715    }
716
717    fn generate_strings<O: Offset>(len: usize, valid_percent: f64) -> Utf8Array<O> {
718        let mut rng = thread_rng();
719        (0..len)
720            .map(|_| {
721                rng.gen_bool(valid_percent).then(|| {
722                    let len = rng.gen_range(0..100);
723                    let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect();
724                    String::from_utf8(bytes).unwrap()
725                })
726            })
727            .collect()
728    }
729
730    fn generate_dictionary<K>(
731        values: Box<dyn Array>,
732        len: usize,
733        valid_percent: f64,
734    ) -> DictionaryArray<K>
735    where
736        K: DictionaryKey + Ord + SampleUniform,
737        <K as TryFrom<usize>>::Error: Debug,
738    {
739        let mut rng = thread_rng();
740        let min_key = 0_usize.try_into().unwrap();
741        let max_key = values.len().try_into().unwrap();
742        let keys: PrimitiveArray<K> = (0..len)
743            .map(|_| {
744                rng.gen_bool(valid_percent)
745                    .then(|| rng.gen_range(min_key..max_key))
746            })
747            .collect();
748
749        DictionaryArray::try_from_keys(keys, values).unwrap()
750    }
751
752    fn generate_column(len: usize) -> Box<dyn Array> {
753        let mut rng = thread_rng();
754        match rng.gen_range(0..9) {
755            0 => Box::new(generate_primitive_array::<i32>(len, 0.8)),
756            1 => Box::new(generate_primitive_array::<u32>(len, 0.8)),
757            2 => Box::new(generate_primitive_array::<i64>(len, 0.8)),
758            3 => Box::new(generate_primitive_array::<u64>(len, 0.8)),
759            4 => Box::new(generate_primitive_array::<f32>(len, 0.8)),
760            5 => Box::new(generate_primitive_array::<f64>(len, 0.8)),
761            6 => Box::new(generate_strings::<i32>(len, 0.8)),
762            7 => Box::new(generate_dictionary::<i64>(
763                // Cannot test dictionaries containing null values because of #2687
764                Box::new(generate_strings::<i32>(rng.gen_range(1..len), 1.0)),
765                len,
766                0.8,
767            )),
768            8 => Box::new(generate_dictionary::<i64>(
769                // Cannot test dictionaries containing null values because of #2687
770                Box::new(generate_primitive_array::<i64>(rng.gen_range(1..len), 1.0)),
771                len,
772                0.8,
773            )),
774            _ => unreachable!(),
775        }
776    }
777
778    #[test]
779    #[cfg_attr(miri, ignore)]
780    fn fuzz_test() {
781        for _ in 0..100 {
782            let mut rng = thread_rng();
783            let num_columns = rng.gen_range(1..5);
784            let len = rng.gen_range(5..100);
785            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();
786
787            let options: Vec<_> = (0..num_columns)
788                .map(|_| SortOptions {
789                    descending: rng.gen_bool(0.5),
790                    nulls_first: rng.gen_bool(0.5),
791                })
792                .collect();
793
794            let comparators = arrays
795                .iter()
796                .zip(options.iter())
797                .map(|(a, o)| build_compare(&**a, *o).unwrap())
798                .collect::<Vec<_>>();
799
800            let columns = options
801                .into_iter()
802                .zip(&arrays)
803                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
804                .collect();
805
806            let mut converter = RowConverter::new(columns);
807            let rows = converter.convert_columns(&arrays).unwrap();
808            let cmp = |i, j| {
809                for cmp in comparators.iter() {
810                    let cmp = cmp(i, j);
811                    if cmp != Ordering::Equal {
812                        return cmp;
813                    }
814                }
815                Ordering::Equal
816            };
817
818            for i in 0..len {
819                for j in 0..len {
820                    let row_i = rows.row(i);
821                    let row_j = rows.row(j);
822                    let row_cmp = row_i.cmp(&row_j);
823                    let lex_cmp = cmp(i, j);
824                    assert_eq!(row_cmp, lex_cmp);
825                }
826            }
827        }
828    }
829}