Skip to main content

datafusion_common/utils/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides the bisect function, which implements binary search.
19
20pub(crate) mod aggregate;
21pub mod expr;
22pub mod memory;
23pub mod proxy;
24pub mod string_utils;
25
26use crate::assert_or_internal_err;
27use crate::error::{_exec_datafusion_err, _exec_err, _internal_datafusion_err};
28use crate::{Result, ScalarValue};
29use arrow::array::{
30    Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray, OffsetSizeTrait,
31    cast::AsArray,
32};
33use arrow::array::{
34    ArrowPrimitiveType, BooleanArray, Datum, GenericListArray, Int32Array, Int64Array,
35    MutableArrayData, PrimitiveArray, make_array,
36};
37use arrow::array::{LargeListViewArray, ListViewArray};
38use arrow::buffer::{OffsetBuffer, ScalarBuffer};
39use arrow::compute::kernels::cmp::eq;
40use arrow::compute::kernels::length::length;
41use arrow::compute::{SortColumn, SortOptions, partition};
42use arrow::datatypes::{
43    ArrowNativeType, DataType, Field, Int32Type, Int64Type, SchemaRef,
44};
45#[cfg(feature = "sql")]
46use sqlparser::{ast::Ident, dialect::GenericDialect, parser::Parser};
47use std::borrow::{Borrow, Cow};
48use std::cmp::{Ordering, min};
49use std::collections::HashSet;
50use std::iter::repeat_n;
51use std::num::NonZero;
52use std::ops::Range;
53use std::sync::{Arc, LazyLock};
54use std::thread::available_parallelism;
55
56/// Applies an optional projection to a [`SchemaRef`], returning the
57/// projected schema
58///
59/// Example:
60/// ```
61/// use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
62/// use datafusion_common::project_schema;
63///
64/// // Schema with columns 'a', 'b', and 'c'
65/// let schema = SchemaRef::new(Schema::new(vec![
66///     Field::new("a", DataType::Int32, true),
67///     Field::new("b", DataType::Int64, true),
68///     Field::new("c", DataType::Utf8, true),
69/// ]));
70///
71/// // Pick columns 'c' and 'b'
72/// let projection = Some(vec![2, 1]);
73/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap();
74///
75/// let expected_schema = SchemaRef::new(Schema::new(vec![
76///     Field::new("c", DataType::Utf8, true),
77///     Field::new("b", DataType::Int64, true),
78/// ]));
79///
80/// assert_eq!(projected_schema, expected_schema);
81/// ```
82pub fn project_schema(
83    schema: &SchemaRef,
84    projection: Option<&impl AsRef<[usize]>>,
85) -> Result<SchemaRef> {
86    let schema = match projection {
87        Some(columns) => Arc::new(schema.project(columns.as_ref())?),
88        None => Arc::clone(schema),
89    };
90    Ok(schema)
91}
92
93/// Extracts a row at the specified index from a set of columns and stores it in the provided buffer.
94pub fn extract_row_at_idx_to_buf(
95    columns: &[ArrayRef],
96    idx: usize,
97    buf: &mut Vec<ScalarValue>,
98) -> Result<()> {
99    buf.clear();
100
101    let iter = columns
102        .iter()
103        .map(|arr| ScalarValue::try_from_array(arr, idx));
104    for v in iter.into_iter() {
105        buf.push(v?);
106    }
107
108    Ok(())
109}
110/// Given column vectors, returns row at `idx`.
111pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
112    columns
113        .iter()
114        .map(|arr| ScalarValue::try_from_array(arr, idx))
115        .collect()
116}
117
118/// This function compares two tuples depending on the given sort options.
119pub fn compare_rows(
120    x: &[ScalarValue],
121    y: &[ScalarValue],
122    sort_options: &[SortOptions],
123) -> Result<Ordering> {
124    let zip_it = x.iter().zip(y.iter()).zip(sort_options.iter());
125    // Preserving lexical ordering.
126    for ((lhs, rhs), sort_options) in zip_it {
127        // Consider all combinations of NULLS FIRST/LAST and ASC/DESC configurations.
128        let result = match (lhs.is_null(), rhs.is_null(), sort_options.nulls_first) {
129            (true, false, false) | (false, true, true) => Ordering::Greater,
130            (true, false, true) | (false, true, false) => Ordering::Less,
131            (false, false, _) => {
132                if sort_options.descending {
133                    rhs.try_cmp(lhs)?
134                } else {
135                    lhs.try_cmp(rhs)?
136                }
137            }
138            (true, true, _) => continue,
139        };
140        if result != Ordering::Equal {
141            return Ok(result);
142        }
143    }
144    Ok(Ordering::Equal)
145}
146
147/// This function searches for a tuple of given values (`target`) among the given
148/// rows (`item_columns`) using the bisection algorithm. It assumes that `item_columns`
149/// is sorted according to `sort_options` and returns the insertion index of `target`.
150/// Template argument `SIDE` being `true`/`false` means left/right insertion.
151pub fn bisect<const SIDE: bool>(
152    item_columns: &[ArrayRef],
153    target: &[ScalarValue],
154    sort_options: &[SortOptions],
155) -> Result<usize> {
156    let low: usize = 0;
157    let high: usize = item_columns
158        .first()
159        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
160        .len();
161    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
162        let cmp = compare_rows(current, target, sort_options)?;
163        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
164    };
165    find_bisect_point(item_columns, target, compare_fn, low, high)
166}
167
168/// This function searches for a tuple of given values (`target`) among a slice of
169/// the given rows (`item_columns`) using the bisection algorithm. The slice starts
170/// at the index `low` and ends at the index `high`. The boolean-valued function
171/// `compare_fn` specifies whether we bisect on the left (by returning `false`),
172/// or on the right (by returning `true`) when we compare the target value with
173/// the current value as we iteratively bisect the input.
174pub fn find_bisect_point<F>(
175    item_columns: &[ArrayRef],
176    target: &[ScalarValue],
177    compare_fn: F,
178    mut low: usize,
179    mut high: usize,
180) -> Result<usize>
181where
182    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
183{
184    while low < high {
185        let mid = ((high - low) / 2) + low;
186        let val = get_row_at_idx(item_columns, mid)?;
187        if compare_fn(&val, target)? {
188            low = mid + 1;
189        } else {
190            high = mid;
191        }
192    }
193    Ok(low)
194}
195
196/// This function searches for a tuple of given values (`target`) among the given
197/// rows (`item_columns`) via a linear scan. It assumes that `item_columns` is sorted
198/// according to `sort_options` and returns the insertion index of `target`.
199/// Template argument `SIDE` being `true`/`false` means left/right insertion.
200pub fn linear_search<const SIDE: bool>(
201    item_columns: &[ArrayRef],
202    target: &[ScalarValue],
203    sort_options: &[SortOptions],
204) -> Result<usize> {
205    let low: usize = 0;
206    let high: usize = item_columns
207        .first()
208        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
209        .len();
210    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
211        let cmp = compare_rows(current, target, sort_options)?;
212        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
213    };
214    search_in_slice(item_columns, target, compare_fn, low, high)
215}
216
217/// This function searches for a tuple of given values (`target`) among a slice of
218/// the given rows (`item_columns`) via a linear scan. The slice starts at the index
219/// `low` and ends at the index `high`. The boolean-valued function `compare_fn`
220/// specifies the stopping criterion.
221pub fn search_in_slice<F>(
222    item_columns: &[ArrayRef],
223    target: &[ScalarValue],
224    compare_fn: F,
225    mut low: usize,
226    high: usize,
227) -> Result<usize>
228where
229    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
230{
231    while low < high {
232        let val = get_row_at_idx(item_columns, low)?;
233        if !compare_fn(&val, target)? {
234            break;
235        }
236        low += 1;
237    }
238    Ok(low)
239}
240
241/// Given a list of 0 or more already sorted columns, finds the
242/// partition ranges that would partition equally across columns.
243///
244/// See [`partition`] for more details.
245pub fn evaluate_partition_ranges(
246    num_rows: usize,
247    partition_columns: &[SortColumn],
248) -> Result<Vec<Range<usize>>> {
249    Ok(if partition_columns.is_empty() {
250        vec![Range {
251            start: 0,
252            end: num_rows,
253        }]
254    } else {
255        let cols: Vec<_> = partition_columns
256            .iter()
257            .map(|x| Arc::clone(&x.values))
258            .collect();
259        partition(&cols)?.ranges()
260    })
261}
262
263/// Wraps identifier string in double quotes, escaping any double quotes in
264/// the identifier by replacing it with two double quotes
265///
266/// e.g. identifier `tab.le"name` becomes `"tab.le""name"`
267pub fn quote_identifier(s: &str) -> Cow<'_, str> {
268    if needs_quotes(s) {
269        Cow::Owned(format!("\"{}\"", s.replace('"', "\"\"")))
270    } else {
271        Cow::Borrowed(s)
272    }
273}
274
275/// returns true if this identifier needs quotes
276fn needs_quotes(s: &str) -> bool {
277    let mut chars = s.chars();
278
279    // first char can not be a number unless escaped
280    if let Some(first_char) = chars.next()
281        && !(first_char.is_ascii_lowercase() || first_char == '_')
282    {
283        return true;
284    }
285
286    !chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
287}
288
289#[cfg(feature = "sql")]
290pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
291    let dialect = GenericDialect;
292    let mut parser = Parser::new(&dialect).try_with_sql(s)?;
293    let idents = parser.parse_multipart_identifier()?;
294    Ok(idents)
295}
296
297/// Parse a string into a vector of identifiers.
298///
299/// Note: If ignore_case is false, the string will be normalized to lowercase.
300#[cfg(feature = "sql")]
301pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
302    parse_identifiers(s)
303        .unwrap_or_default()
304        .into_iter()
305        .map(|id| match id.quote_style {
306            Some(_) => id.value,
307            None if ignore_case => id.value,
308            _ => id.value.to_ascii_lowercase(),
309        })
310        .collect::<Vec<_>>()
311}
312
313#[cfg(not(feature = "sql"))]
314pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<String>> {
315    let mut result = Vec::new();
316    let mut current = String::new();
317    let mut in_quotes = false;
318
319    for ch in s.chars() {
320        match ch {
321            '"' => {
322                in_quotes = !in_quotes;
323                current.push(ch);
324            }
325            '.' if !in_quotes => {
326                result.push(current.clone());
327                current.clear();
328            }
329            _ => {
330                current.push(ch);
331            }
332        }
333    }
334
335    // Push the last part if it's not empty
336    if !current.is_empty() {
337        result.push(current);
338    }
339
340    Ok(result)
341}
342
343#[cfg(not(feature = "sql"))]
344pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
345    parse_identifiers(s)
346        .unwrap_or_default()
347        .into_iter()
348        .map(|id| {
349            let is_double_quoted = if id.len() > 2 {
350                let mut chars = id.chars();
351                chars.next() == Some('"') && chars.last() == Some('"')
352            } else {
353                false
354            };
355            if is_double_quoted {
356                id[1..id.len() - 1].to_string().replace("\"\"", "\"")
357            } else if ignore_case {
358                id
359            } else {
360                id.to_ascii_lowercase()
361            }
362        })
363        .collect::<Vec<_>>()
364}
365
366/// This function "takes" the elements at `indices` from the slice `items`.
367pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
368    items: &[T],
369    indices: impl IntoIterator<Item = I>,
370) -> Result<Vec<T>> {
371    indices
372        .into_iter()
373        .map(|idx| items.get(*idx.borrow()).cloned())
374        .collect::<Option<Vec<T>>>()
375        .ok_or_else(|| {
376            _exec_datafusion_err!("Expects indices to be in the range of searched vector")
377        })
378}
379
380/// This function finds the longest prefix of the form 0, 1, 2, ... within the
381/// collection `sequence`. Examples:
382/// - For 0, 1, 2, 4, 5; we would produce 3, meaning 0, 1, 2 is the longest satisfying
383///   prefix.
384/// - For 1, 2, 3, 4; we would produce 0, meaning there is no such prefix.
385pub fn longest_consecutive_prefix<T: Borrow<usize>>(
386    sequence: impl IntoIterator<Item = T>,
387) -> usize {
388    let mut count = 0;
389    for item in sequence {
390        if !count.eq(item.borrow()) {
391            break;
392        }
393        count += 1;
394    }
395    count
396}
397
398/// Creates single element [`ListArray`], [`LargeListArray`] and
399/// [`FixedSizeListArray`] from other arrays
400///
401/// For example this builder can convert `[1, 2, 3]` into `[[1, 2, 3]]`
402///
403/// # Example
404/// ```
405/// # use std::sync::Arc;
406/// # use arrow::array::{Array, ListArray};
407/// # use arrow::array::types::Int64Type;
408/// # use datafusion_common::utils::SingleRowListArrayBuilder;
409/// // Array is [1, 2, 3]
410/// let arr = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![Some(vec![
411///     Some(1),
412///     Some(2),
413///     Some(3),
414/// ])]);
415/// // Wrap as a list array: [[1, 2, 3]]
416/// let list_arr = SingleRowListArrayBuilder::new(Arc::new(arr)).build_list_array();
417/// assert_eq!(list_arr.len(), 1);
418/// ```
419#[derive(Debug, Clone)]
420pub struct SingleRowListArrayBuilder {
421    /// array to be wrapped
422    arr: ArrayRef,
423    /// Should the resulting array be nullable? Defaults to `true`.
424    nullable: bool,
425    /// Specify the field name for the resulting array. Defaults to value used in
426    /// [`Field::new_list_field`]
427    field_name: Option<String>,
428}
429
430impl SingleRowListArrayBuilder {
431    /// Create a new instance of [`SingleRowListArrayBuilder`]
432    pub fn new(arr: ArrayRef) -> Self {
433        Self {
434            arr,
435            nullable: true,
436            field_name: None,
437        }
438    }
439
440    /// Set the nullable flag
441    pub fn with_nullable(mut self, nullable: bool) -> Self {
442        self.nullable = nullable;
443        self
444    }
445
446    /// sets the field name for the resulting array
447    pub fn with_field_name(mut self, field_name: Option<String>) -> Self {
448        self.field_name = field_name;
449        self
450    }
451
452    /// Copies field name and nullable from the specified field
453    pub fn with_field(self, field: &Field) -> Self {
454        self.with_field_name(Some(field.name().to_owned()))
455            .with_nullable(field.is_nullable())
456    }
457
458    /// Build a single element [`ListArray`]
459    pub fn build_list_array(self) -> ListArray {
460        let (field, arr) = self.into_field_and_arr();
461        let offsets = OffsetBuffer::from_lengths([arr.len()]);
462        ListArray::new(field, offsets, arr, None)
463    }
464
465    /// Build a single element [`ListArray`] and wrap as [`ScalarValue::List`]
466    pub fn build_list_scalar(self) -> ScalarValue {
467        ScalarValue::List(Arc::new(self.build_list_array()))
468    }
469
470    /// Build a single element [`LargeListArray`]
471    pub fn build_large_list_array(self) -> LargeListArray {
472        let (field, arr) = self.into_field_and_arr();
473        let offsets = OffsetBuffer::from_lengths([arr.len()]);
474        LargeListArray::new(field, offsets, arr, None)
475    }
476
477    /// Build a single element [`LargeListArray`] and wrap as [`ScalarValue::LargeList`]
478    pub fn build_large_list_scalar(self) -> ScalarValue {
479        ScalarValue::LargeList(Arc::new(self.build_large_list_array()))
480    }
481
482    /// Build a single element [`FixedSizeListArray`]
483    pub fn build_fixed_size_list_array(self, list_size: usize) -> FixedSizeListArray {
484        let (field, arr) = self.into_field_and_arr();
485        FixedSizeListArray::new(field, list_size as i32, arr, None)
486    }
487
488    /// Build a single element [`FixedSizeListArray`] and wrap as [`ScalarValue::FixedSizeList`]
489    pub fn build_fixed_size_list_scalar(self, list_size: usize) -> ScalarValue {
490        ScalarValue::FixedSizeList(Arc::new(self.build_fixed_size_list_array(list_size)))
491    }
492
493    /// Build a single element [`ListViewArray`]
494    pub fn build_list_view_array(self) -> ListViewArray {
495        let (field, arr) = self.into_field_and_arr();
496        let offsets = ScalarBuffer::from(vec![0]);
497        let sizes = ScalarBuffer::from(vec![i32::try_from(arr.len()).expect(
498            "Trying to construct a ListView where element length exceeds i32::MAX",
499        )]);
500        ListViewArray::new(field, offsets, sizes, arr, None)
501    }
502
503    /// Build a single element [`ListViewArray`] and wrap as [`ScalarValue::ListView`]
504    pub fn build_list_view_scalar(self) -> ScalarValue {
505        ScalarValue::ListView(Arc::new(self.build_list_view_array()))
506    }
507
508    /// Build a single element [`LargeListViewArray`]
509    pub fn build_large_list_view_array(self) -> LargeListViewArray {
510        let (field, arr) = self.into_field_and_arr();
511        let offsets = ScalarBuffer::from(vec![0]);
512        let sizes = ScalarBuffer::from(vec![arr.len() as i64]);
513        LargeListViewArray::new(field, offsets, sizes, arr, None)
514    }
515
516    /// Build a single element [`LargeListViewArray`] and wrap as [`ScalarValue::LargeListView`]
517    pub fn build_large_list_view_scalar(self) -> ScalarValue {
518        ScalarValue::LargeListView(Arc::new(self.build_large_list_view_array()))
519    }
520
521    /// Helper function: convert this builder into a tuple of field and array
522    fn into_field_and_arr(self) -> (Arc<Field>, ArrayRef) {
523        let Self {
524            arr,
525            nullable,
526            field_name,
527        } = self;
528        let data_type = arr.data_type().to_owned();
529        let field = match field_name {
530            Some(name) => Field::new(name, data_type, nullable),
531            None => Field::new_list_field(data_type, nullable),
532        };
533        (Arc::new(field), arr)
534    }
535}
536
537/// Wrap arrays into a single element `ListArray`.
538///
539/// Example:
540/// ```
541/// use arrow::array::{Int32Array, ListArray, ArrayRef};
542/// use arrow::datatypes::{Int32Type, Field};
543/// use std::sync::Arc;
544///
545/// let arr1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
546/// let arr2 = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef;
547///
548/// let list_arr = datafusion_common::utils::arrays_into_list_array([arr1, arr2]).unwrap();
549///
550/// let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(
551///    vec![
552///     Some(vec![Some(1), Some(2), Some(3)]),
553///     Some(vec![Some(4), Some(5), Some(6)]),
554///    ]
555/// );
556///
557/// assert_eq!(list_arr, expected);
558/// ```
559pub fn arrays_into_list_array(
560    arr: impl IntoIterator<Item = ArrayRef>,
561) -> Result<ListArray> {
562    let arr = arr.into_iter().collect::<Vec<_>>();
563    assert_or_internal_err!(!arr.is_empty(), "Cannot wrap empty array into list array");
564
565    let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
566    // Assume data type is consistent
567    let data_type = arr[0].data_type().to_owned();
568    let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
569    Ok(ListArray::new(
570        Arc::new(Field::new_list_field(data_type, true)),
571        OffsetBuffer::from_lengths(lens),
572        arrow::compute::concat(values.as_slice())?,
573        None,
574    ))
575}
576
577/// Helper function to convert a ListArray into a vector of ArrayRefs.
578pub fn list_to_arrays<O: OffsetSizeTrait>(a: &ArrayRef) -> Vec<ArrayRef> {
579    a.as_list::<O>().iter().flatten().collect::<Vec<_>>()
580}
581
582/// Helper function to convert a FixedSizeListArray into a vector of ArrayRefs.
583pub fn fixed_size_list_to_arrays(a: &ArrayRef) -> Vec<ArrayRef> {
584    a.as_fixed_size_list().iter().flatten().collect::<Vec<_>>()
585}
586
587/// Get the base type of a data type.
588///
589/// Example
590/// ```
591/// use arrow::datatypes::{DataType, Field};
592/// use datafusion_common::utils::base_type;
593/// use std::sync::Arc;
594///
595/// let data_type =
596///     DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
597/// assert_eq!(base_type(&data_type), DataType::Int32);
598///
599/// let data_type = DataType::Int32;
600/// assert_eq!(base_type(&data_type), DataType::Int32);
601/// ```
602pub fn base_type(data_type: &DataType) -> DataType {
603    match data_type {
604        DataType::List(field)
605        | DataType::LargeList(field)
606        | DataType::ListView(field)
607        | DataType::LargeListView(field)
608        | DataType::FixedSizeList(field, _) => base_type(field.data_type()),
609        _ => data_type.to_owned(),
610    }
611}
612
613// TODO: Modify this to also allow specifying how listviews should be treated.
614//       For example if cast to List (default) or maintain as ListView (requires
615//       function to implement support for ListViews)
616//       https://github.com/apache/datafusion/issues/21777
617/// Information about how to coerce lists.
618#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
619pub enum ListCoercion {
620    /// [`DataType::FixedSizeList`] should be coerced to [`DataType::List`].
621    FixedSizedListToList,
622}
623
624/// A helper function to coerce base type in List.
625///
626/// Example
627/// ```
628/// use arrow::datatypes::{DataType, Field};
629/// use datafusion_common::utils::coerced_type_with_base_type_only;
630/// use std::sync::Arc;
631///
632/// let data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
633/// let base_type = DataType::Float64;
634/// let coerced_type = coerced_type_with_base_type_only(&data_type, &base_type, None);
635/// assert_eq!(coerced_type, DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))));
636/// ```
637pub fn coerced_type_with_base_type_only(
638    data_type: &DataType,
639    base_type: &DataType,
640    array_coercion: Option<&ListCoercion>,
641) -> DataType {
642    match (data_type, array_coercion) {
643        (DataType::List(field), _)
644        | (DataType::FixedSizeList(field, _), Some(ListCoercion::FixedSizedListToList)) =>
645        {
646            let field_type = coerced_type_with_base_type_only(
647                field.data_type(),
648                base_type,
649                array_coercion,
650            );
651
652            DataType::List(Arc::new(Field::new(
653                field.name(),
654                field_type,
655                field.is_nullable(),
656            )))
657        }
658        (DataType::FixedSizeList(field, len), _) => {
659            let field_type = coerced_type_with_base_type_only(
660                field.data_type(),
661                base_type,
662                array_coercion,
663            );
664
665            DataType::FixedSizeList(
666                Arc::new(Field::new(field.name(), field_type, field.is_nullable())),
667                *len,
668            )
669        }
670        (DataType::ListView(field), _) => {
671            let field_type = coerced_type_with_base_type_only(
672                field.data_type(),
673                base_type,
674                array_coercion,
675            );
676
677            DataType::ListView(Arc::new(Field::new(
678                field.name(),
679                field_type,
680                field.is_nullable(),
681            )))
682        }
683        (DataType::LargeList(field), _) => {
684            let field_type = coerced_type_with_base_type_only(
685                field.data_type(),
686                base_type,
687                array_coercion,
688            );
689
690            DataType::LargeList(Arc::new(Field::new(
691                field.name(),
692                field_type,
693                field.is_nullable(),
694            )))
695        }
696        (DataType::LargeListView(field), _) => {
697            let field_type = coerced_type_with_base_type_only(
698                field.data_type(),
699                base_type,
700                array_coercion,
701            );
702
703            DataType::LargeListView(Arc::new(Field::new(
704                field.name(),
705                field_type,
706                field.is_nullable(),
707            )))
708        }
709
710        _ => base_type.clone(),
711    }
712}
713
714/// Recursively coerce and `FixedSizeList` elements to `List`
715pub fn coerced_fixed_size_list_to_list(data_type: &DataType) -> DataType {
716    match data_type {
717        DataType::List(field) | DataType::FixedSizeList(field, _) => {
718            let field_type = coerced_fixed_size_list_to_list(field.data_type());
719
720            DataType::List(Arc::new(Field::new(
721                field.name(),
722                field_type,
723                field.is_nullable(),
724            )))
725        }
726        DataType::ListView(field) => {
727            let field_type = coerced_fixed_size_list_to_list(field.data_type());
728
729            DataType::ListView(Arc::new(Field::new(
730                field.name(),
731                field_type,
732                field.is_nullable(),
733            )))
734        }
735        DataType::LargeList(field) => {
736            let field_type = coerced_fixed_size_list_to_list(field.data_type());
737
738            DataType::LargeList(Arc::new(Field::new(
739                field.name(),
740                field_type,
741                field.is_nullable(),
742            )))
743        }
744        DataType::LargeListView(field) => {
745            let field_type = coerced_fixed_size_list_to_list(field.data_type());
746
747            DataType::LargeListView(Arc::new(Field::new(
748                field.name(),
749                field_type,
750                field.is_nullable(),
751            )))
752        }
753
754        _ => data_type.clone(),
755    }
756}
757
758/// Compute the number of dimensions in a list data type.
759pub fn list_ndims(data_type: &DataType) -> u64 {
760    match data_type {
761        DataType::List(field)
762        | DataType::LargeList(field)
763        | DataType::ListView(field)
764        | DataType::LargeListView(field)
765        | DataType::FixedSizeList(field, _) => 1 + list_ndims(field.data_type()),
766        _ => 0,
767    }
768}
769
770/// Adopted from strsim-rs for string similarity metrics
771pub mod datafusion_strsim {
772    // Source: https://github.com/dguo/strsim-rs/blob/master/src/lib.rs
773    // License: https://github.com/dguo/strsim-rs/blob/master/LICENSE
774    use std::cmp::min;
775    use std::str::Chars;
776
777    struct StringWrapper<'a>(&'a str);
778
779    impl<'b> IntoIterator for &StringWrapper<'b> {
780        type Item = char;
781        type IntoIter = Chars<'b>;
782
783        fn into_iter(self) -> Self::IntoIter {
784            self.0.chars()
785        }
786    }
787
788    /// Calculates the minimum number of insertions, deletions, and substitutions
789    /// required to change one sequence into the other, using a reusable cache buffer.
790    ///
791    /// This is the generic implementation that works with any iterator types.
792    /// The `cache` buffer will be resized as needed and reused across calls.
793    fn generic_levenshtein_with_buffer<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
794        a: &'a Iter1,
795        b: &'b Iter2,
796        cache: &mut Vec<usize>,
797    ) -> usize
798    where
799        &'a Iter1: IntoIterator<Item = Elem1>,
800        &'b Iter2: IntoIterator<Item = Elem2>,
801        Elem1: PartialEq<Elem2>,
802    {
803        let b_len = b.into_iter().count();
804
805        if a.into_iter().next().is_none() {
806            return b_len;
807        }
808
809        // Resize cache to fit b_len elements
810        cache.clear();
811        cache.extend(1..=b_len);
812
813        let mut result = 0;
814
815        for (i, a_elem) in a.into_iter().enumerate() {
816            result = i + 1;
817            let mut distance_b = i;
818
819            for (j, b_elem) in b.into_iter().enumerate() {
820                let cost = if a_elem == b_elem { 0usize } else { 1usize };
821                let distance_a = distance_b + cost;
822                distance_b = cache[j];
823                result = min(result + 1, min(distance_a, distance_b + 1));
824                cache[j] = result;
825            }
826        }
827
828        result
829    }
830
831    /// Calculates the minimum number of insertions, deletions, and substitutions
832    /// required to change one sequence into the other.
833    fn generic_levenshtein<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
834        a: &'a Iter1,
835        b: &'b Iter2,
836    ) -> usize
837    where
838        &'a Iter1: IntoIterator<Item = Elem1>,
839        &'b Iter2: IntoIterator<Item = Elem2>,
840        Elem1: PartialEq<Elem2>,
841    {
842        let mut cache = Vec::new();
843        generic_levenshtein_with_buffer(a, b, &mut cache)
844    }
845
846    /// Calculates the minimum number of insertions, deletions, and substitutions
847    /// required to change one string into the other.
848    ///
849    /// ```
850    /// use datafusion_common::utils::datafusion_strsim::levenshtein;
851    ///
852    /// assert_eq!(3, levenshtein("kitten", "sitting"));
853    /// ```
854    pub fn levenshtein(a: &str, b: &str) -> usize {
855        generic_levenshtein(&StringWrapper(a), &StringWrapper(b))
856    }
857
858    /// Calculates the Levenshtein distance using a reusable cache buffer.
859    /// This avoids allocating a new Vec for each call, improving performance
860    /// when computing many distances.
861    ///
862    /// The `cache` buffer will be resized as needed and reused across calls.
863    pub fn levenshtein_with_buffer(a: &str, b: &str, cache: &mut Vec<usize>) -> usize {
864        generic_levenshtein_with_buffer(&StringWrapper(a), &StringWrapper(b), cache)
865    }
866
867    /// Calculates the normalized Levenshtein distance between two strings.
868    /// The normalized distance is a value between 0.0 and 1.0, where 1.0 indicates
869    /// that the strings are identical and 0.0 indicates no similarity.
870    ///
871    /// ```
872    /// use datafusion_common::utils::datafusion_strsim::normalized_levenshtein;
873    ///
874    /// assert!((normalized_levenshtein("kitten", "sitting") - 0.57142).abs() < 0.00001);
875    ///
876    /// assert!(normalized_levenshtein("", "second").abs() < 0.00001);
877    ///
878    /// assert!((normalized_levenshtein("kitten", "sitten") - 0.833).abs() < 0.001);
879    /// ```
880    pub fn normalized_levenshtein(a: &str, b: &str) -> f64 {
881        if a.is_empty() && b.is_empty() {
882            return 1.0;
883        }
884        1.0 - (levenshtein(a, b) as f64)
885            / (a.chars().count().max(b.chars().count()) as f64)
886    }
887}
888
889/// Merges collections `first` and `second`, removes duplicates and sorts the
890/// result, returning it as a [`Vec`].
891pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
892    first: impl IntoIterator<Item = T>,
893    second: impl IntoIterator<Item = S>,
894) -> Vec<usize> {
895    let mut result: Vec<_> = first
896        .into_iter()
897        .map(|e| *e.borrow())
898        .chain(second.into_iter().map(|e| *e.borrow()))
899        .collect::<HashSet<_>>()
900        .into_iter()
901        .collect();
902    result.sort();
903    result
904}
905
906/// Calculates the set difference between sequences `first` and `second`,
907/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
908pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
909    first: impl IntoIterator<Item = T>,
910    second: impl IntoIterator<Item = S>,
911) -> Vec<usize> {
912    let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
913    first
914        .into_iter()
915        .map(|e| *e.borrow())
916        .filter(|e| !set.contains(e))
917        .collect()
918}
919
920/// Find indices of each element in `targets` inside `items`. If one of the
921/// elements is absent in `items`, returns an error.
922pub fn find_indices<T: PartialEq, S: Borrow<T>>(
923    items: &[T],
924    targets: impl IntoIterator<Item = S>,
925) -> Result<Vec<usize>> {
926    targets
927        .into_iter()
928        .map(|target| items.iter().position(|e| target.borrow().eq(e)))
929        .collect::<Option<_>>()
930        .ok_or_else(|| _exec_datafusion_err!("Target not found"))
931}
932
933/// Transposes the given vector of vectors.
934pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
935    match original.as_slice() {
936        [] => vec![],
937        [first, ..] => {
938            let mut result = (0..first.len()).map(|_| vec![]).collect::<Vec<_>>();
939            for row in original {
940                for (item, transposed_row) in row.into_iter().zip(&mut result) {
941                    transposed_row.push(item);
942                }
943            }
944            result
945        }
946    }
947}
948
949/// Computes the `skip` and `fetch` parameters of a single limit that would be
950/// equivalent to two consecutive limits with the given `skip`/`fetch` parameters.
951///
952/// There are multiple cases to consider:
953///
954/// # Case 0: Parent and child are disjoint (`child_fetch <= skip`).
955///
956/// ```text
957///   Before merging:
958///                     |........skip........|---fetch-->|     Parent limit
959///    |...child_skip...|---child_fetch-->|                    Child limit
960/// ```
961///
962///   After merging:
963/// ```text
964///    |.........(child_skip + skip).........|
965/// ```
966///
967/// # Case 1: Parent is beyond child's range (`skip < child_fetch <= skip + fetch`).
968///
969///   Before merging:
970/// ```text
971///                     |...skip...|------------fetch------------>|   Parent limit
972///    |...child_skip...|-------------child_fetch------------>|       Child limit
973/// ```
974///
975///   After merging:
976/// ```text
977///    |....(child_skip + skip)....|---(child_fetch - skip)-->|
978/// ```
979///
980///  # Case 2: Parent is within child's range (`skip + fetch < child_fetch`).
981///
982///   Before merging:
983/// ```text
984///                     |...skip...|---fetch-->|                   Parent limit
985///    |...child_skip...|-------------child_fetch------------>|    Child limit
986/// ```
987///
988///   After merging:
989/// ```text
990///    |....(child_skip + skip)....|---fetch-->|
991/// ```
992pub fn combine_limit(
993    parent_skip: usize,
994    parent_fetch: Option<usize>,
995    child_skip: usize,
996    child_fetch: Option<usize>,
997) -> (usize, Option<usize>) {
998    let combined_skip = child_skip.saturating_add(parent_skip);
999
1000    let combined_fetch = match (parent_fetch, child_fetch) {
1001        (Some(parent_fetch), Some(child_fetch)) => {
1002            Some(min(parent_fetch, child_fetch.saturating_sub(parent_skip)))
1003        }
1004        (Some(parent_fetch), None) => Some(parent_fetch),
1005        (None, Some(child_fetch)) => Some(child_fetch.saturating_sub(parent_skip)),
1006        (None, None) => None,
1007    };
1008
1009    (combined_skip, combined_fetch)
1010}
1011
1012/// Returns the estimated number of threads available for parallel execution.
1013///
1014/// This is a wrapper around `std::thread::available_parallelism`, providing a default value
1015/// of `1` if the system's parallelism cannot be determined.
1016///
1017/// The result is cached after the first call.
1018pub fn get_available_parallelism() -> usize {
1019    static PARALLELISM: LazyLock<usize> = LazyLock::new(|| {
1020        available_parallelism()
1021            .unwrap_or(NonZero::new(1).expect("literal value `1` shouldn't be zero"))
1022            .get()
1023    });
1024    *PARALLELISM
1025}
1026
1027/// Converts a collection of function arguments into a fixed-size array of length N
1028/// producing a reasonable error message in case of unexpected number of arguments.
1029///
1030/// # Example
1031/// ```
1032/// # use datafusion_common::Result;
1033/// # use datafusion_common::utils::take_function_args;
1034/// # use datafusion_common::ScalarValue;
1035/// fn my_function(args: &[ScalarValue]) -> Result<()> {
1036///     // function expects 2 args, so create a 2-element array
1037///     let [arg1, arg2] = take_function_args("my_function", args)?;
1038///     // ... do stuff..
1039///     Ok(())
1040/// }
1041///
1042/// // Calling the function with 1 argument produces an error:
1043/// let args = vec![ScalarValue::Int32(Some(10))];
1044/// let err = my_function(&args).unwrap_err();
1045/// assert_eq!(
1046///     err.to_string(),
1047///     "Execution error: my_function function requires 2 arguments, got 1"
1048/// );
1049/// // Calling the function with 2 arguments works great
1050/// let args = vec![ScalarValue::Int32(Some(10)), ScalarValue::Int32(Some(20))];
1051/// my_function(&args).unwrap();
1052/// ```
1053pub fn take_function_args<const N: usize, T>(
1054    function_name: &str,
1055    args: impl IntoIterator<Item = T>,
1056) -> Result<[T; N]> {
1057    let args = args.into_iter().collect::<Vec<_>>();
1058    args.try_into().map_err(|v: Vec<T>| {
1059        _exec_datafusion_err!(
1060            "{} function requires {} {}, got {}",
1061            function_name,
1062            N,
1063            if N == 1 { "argument" } else { "arguments" },
1064            v.len()
1065        )
1066    })
1067}
1068
1069/// Returns the inner values of a list, or an error otherwise
1070/// For [`ListArray`] and [`LargeListArray`], if it's sliced, it returns a
1071/// sliced array too. Therefore, too reconstruct a list using it,
1072/// you must adjust the offsets using [`adjust_offsets_for_slice`]
1073pub fn list_values(array: &dyn Array) -> Result<ArrayRef> {
1074    match array.data_type() {
1075        DataType::List(_) => Ok(sliced_list_values(array.as_list::<i32>())),
1076        DataType::LargeList(_) => Ok(sliced_list_values(array.as_list::<i64>())),
1077        DataType::FixedSizeList(_, _) => {
1078            Ok(Arc::clone(array.as_fixed_size_list().values()))
1079        }
1080        other => _exec_err!("expected list, got {other}"),
1081    }
1082}
1083
1084fn sliced_list_values<O: OffsetSizeTrait>(list: &GenericListArray<O>) -> ArrayRef {
1085    let values = list.values();
1086    let offsets = list.offsets();
1087
1088    if let (Some(first), Some(last)) = (offsets.first(), offsets.last()) {
1089        let first = first.as_usize();
1090        let last = last.as_usize();
1091
1092        if first != 0 || last != values.len() {
1093            return values.slice(first, last - first);
1094        }
1095    }
1096
1097    Arc::clone(values)
1098}
1099
1100/// If `list` is sliced, returns an adjusted offset buffer so that
1101/// it points to the sliced portion of the list values, and not the whole list values
1102pub fn adjust_offsets_for_slice<O: OffsetSizeTrait>(
1103    list: &GenericListArray<O>,
1104) -> OffsetBuffer<O> {
1105    let offsets = list.offsets();
1106
1107    if let (Some(first), Some(last)) = (offsets.first(), offsets.last())
1108        && (!first.is_zero() || last.as_usize() != list.values().len())
1109    {
1110        let offsets = offsets.iter().map(|offset| *offset - *first).collect();
1111
1112        //todo: use unsafe Offset::new_unchecked?
1113        return OffsetBuffer::new(offsets);
1114    }
1115
1116    offsets.clone()
1117}
1118
1119/// For lists and large lists, truncates the sublist of null values
1120/// Otherwise returns an error
1121pub fn remove_list_null_values(array: &ArrayRef) -> Result<ArrayRef> {
1122    // todo: handle list view and map
1123    match array.data_type() {
1124        DataType::List(_) => Ok(Arc::new(truncate_list_nulls(array.as_list::<i32>())?)),
1125        DataType::LargeList(_) => {
1126            Ok(Arc::new(truncate_list_nulls(array.as_list::<i64>())?))
1127        }
1128        dt => _exec_err!("expected List or LargeList, got {dt}"),
1129    }
1130}
1131
1132/// Create a new list array where all the nulls point to empty lists
1133fn truncate_list_nulls<O: OffsetSizeTrait>(
1134    list: &GenericListArray<O>,
1135) -> Result<GenericListArray<O>> {
1136    if let Some(nulls) = list.nulls()
1137        && nulls.null_count() > 0
1138    {
1139        let lengths = length(list)?;
1140        let zero: &dyn Datum = if lengths.data_type() == &DataType::Int32 {
1141            &Int32Array::new_scalar(0)
1142        } else {
1143            &Int64Array::new_scalar(0)
1144        };
1145
1146        let (mut valid_or_empty, _nulls) = eq(&lengths, zero)?.into_parts();
1147        valid_or_empty |= nulls.inner();
1148        let valid_or_empty = BooleanArray::from(valid_or_empty);
1149
1150        if valid_or_empty.has_false() {
1151            let array_data = list.values().to_data();
1152            let offsets = list.offsets();
1153            let capacity = offsets[offsets.len() - 1] - offsets[0];
1154            let mut mutable_array_data =
1155                MutableArrayData::new(vec![&array_data], false, capacity.as_usize());
1156
1157            let (valid_or_empty, _nulls) = valid_or_empty.into_parts();
1158
1159            for (start, end) in valid_or_empty.set_slices() {
1160                mutable_array_data.extend(
1161                    0,
1162                    offsets[start].as_usize(),
1163                    offsets[end].as_usize(),
1164                );
1165            }
1166
1167            let lengths = std::iter::zip(offsets.lengths(), nulls)
1168                .map(|(length, is_valid)| if is_valid { length } else { 0 });
1169
1170            let offsets = OffsetBuffer::from_lengths(lengths);
1171            let values = make_array(mutable_array_data.freeze());
1172
1173            let field = match list.data_type() {
1174                DataType::List(field) => field,
1175                DataType::LargeList(field) => field,
1176                _ => unreachable!(),
1177            };
1178
1179            return Ok(GenericListArray::try_new(
1180                Arc::clone(field),
1181                offsets,
1182                values,
1183                list.nulls().cloned(),
1184            )?);
1185        }
1186    }
1187    Ok(list.clone())
1188}
1189
1190/// If `array` is a list or a map, returns a new array of the same length as it's inner values
1191/// where each value is the 1-based index of the sublist it's contained. Example:
1192///
1193/// `[[1], [2, 3], [4, 5, 6]] =>  [1, 2, 2, 3, 3, 3]`
1194///
1195/// Otherwise returns an error
1196pub fn list_values_row_number(array: &dyn Array) -> Result<ArrayRef> {
1197    match array.data_type() {
1198        DataType::List(_) => Ok(Arc::new(variable_size_list_values_row_number::<
1199            Int32Type,
1200        >(array.as_list().offsets()))),
1201        DataType::LargeList(_) => Ok(Arc::new(variable_size_list_values_row_number::<
1202            Int64Type,
1203        >(array.as_list().offsets()))),
1204        DataType::ListView(_) => Ok(Arc::new(variable_size_list_values_row_number::<
1205            Int32Type,
1206        >(array.as_list_view().offsets()))),
1207        DataType::LargeListView(_) => {
1208            Ok(Arc::new(variable_size_list_values_row_number::<Int64Type>(
1209                array.as_list_view().offsets(),
1210            )))
1211        }
1212        DataType::FixedSizeList(_, _) => {
1213            let fixed_size_list = array.as_fixed_size_list();
1214
1215            Ok(Arc::new(fsl_values_row_number(
1216                fixed_size_list.value_length(),
1217                fixed_size_list.len(),
1218            )?))
1219        }
1220        DataType::Map(_, _) => Ok(Arc::new(variable_size_list_values_row_number::<
1221            Int32Type,
1222        >(array.as_map().offsets()))),
1223        other => _exec_err!("expected list, got {other}"),
1224    }
1225}
1226
1227/// [0, 2, 2, 5, 6] -> [0, 0, 2, 2, 2, 3]
1228fn variable_size_list_values_row_number<T: ArrowPrimitiveType>(
1229    offsets: &[T::Native],
1230) -> PrimitiveArray<T> {
1231    let mut rows_number = Vec::with_capacity(
1232        offsets[offsets.len() - 1].to_usize().unwrap() - offsets[0].to_usize().unwrap(),
1233    );
1234
1235    for (i, w) in offsets.windows(2).enumerate() {
1236        let len = w[1].as_usize() - w[0].as_usize();
1237        rows_number.extend(repeat_n(T::Native::usize_as(i), len));
1238    }
1239
1240    PrimitiveArray::new(rows_number.into(), None)
1241}
1242
1243/// (2, 3) -> [0, 0, 1, 1, 2, 2]
1244fn fsl_values_row_number(list_size: i32, array_len: usize) -> Result<Int32Array> {
1245    let list_size = list_size.to_usize().ok_or_else(|| {
1246        _exec_datafusion_err!("fsl_values_index: invalid list_size {list_size}")
1247    })?;
1248
1249    let mut rows_number = Vec::with_capacity(list_size * array_len);
1250
1251    for i in 0..array_len {
1252        rows_number.extend(repeat_n(i as i32, list_size));
1253    }
1254
1255    Ok(PrimitiveArray::new(rows_number.into(), None))
1256}
1257
1258#[cfg(test)]
1259mod tests {
1260    use std::sync::Arc;
1261
1262    use super::*;
1263    use crate::ScalarValue::Null;
1264    use arrow::{
1265        array::{Float64Array, Int32Array},
1266        buffer::NullBuffer,
1267        datatypes::Int32Type,
1268    };
1269    use sqlparser::ast::Ident;
1270
1271    #[test]
1272    fn test_bisect_linear_left_and_right() -> Result<()> {
1273        let arrays: Vec<ArrayRef> = vec![
1274            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.])),
1275            Arc::new(Float64Array::from(vec![2.0, 3.0, 3.0, 4.0, 5.0])),
1276            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 10., 11.0])),
1277            Arc::new(Float64Array::from(vec![15.0, 13.0, 8.0, 5., 0.0])),
1278        ];
1279        let search_tuple: Vec<ScalarValue> = vec![
1280            ScalarValue::Float64(Some(8.0)),
1281            ScalarValue::Float64(Some(3.0)),
1282            ScalarValue::Float64(Some(8.0)),
1283            ScalarValue::Float64(Some(8.0)),
1284        ];
1285        let ords = [
1286            SortOptions {
1287                descending: false,
1288                nulls_first: true,
1289            },
1290            SortOptions {
1291                descending: false,
1292                nulls_first: true,
1293            },
1294            SortOptions {
1295                descending: false,
1296                nulls_first: true,
1297            },
1298            SortOptions {
1299                descending: true,
1300                nulls_first: true,
1301            },
1302        ];
1303        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1304        assert_eq!(res, 2);
1305        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1306        assert_eq!(res, 3);
1307        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1308        assert_eq!(res, 2);
1309        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1310        assert_eq!(res, 3);
1311        Ok(())
1312    }
1313
1314    #[test]
1315    fn vector_ord() {
1316        assert!(vec![1, 0, 0, 0, 0, 0, 0, 1] < vec![1, 0, 0, 0, 0, 0, 0, 2]);
1317        assert!(vec![1, 0, 0, 0, 0, 0, 1, 1] > vec![1, 0, 0, 0, 0, 0, 0, 2]);
1318        assert!(
1319            vec![
1320                ScalarValue::Int32(Some(2)),
1321                Null,
1322                ScalarValue::Int32(Some(0)),
1323            ] < vec![
1324                ScalarValue::Int32(Some(2)),
1325                Null,
1326                ScalarValue::Int32(Some(1)),
1327            ]
1328        );
1329        assert!(
1330            vec![
1331                ScalarValue::Int32(Some(2)),
1332                ScalarValue::Int32(None),
1333                ScalarValue::Int32(Some(0)),
1334            ] < vec![
1335                ScalarValue::Int32(Some(2)),
1336                ScalarValue::Int32(None),
1337                ScalarValue::Int32(Some(1)),
1338            ]
1339        );
1340    }
1341
1342    #[test]
1343    fn ord_same_type() {
1344        assert!((ScalarValue::Int32(Some(2)) < ScalarValue::Int32(Some(3))));
1345    }
1346
1347    #[test]
1348    fn test_bisect_linear_left_and_right_diff_sort() -> Result<()> {
1349        // Descending, left
1350        let arrays: Vec<ArrayRef> =
1351            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
1352        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
1353        let ords = [SortOptions {
1354            descending: true,
1355            nulls_first: true,
1356        }];
1357        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1358        assert_eq!(res, 0);
1359        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1360        assert_eq!(res, 0);
1361
1362        // Descending, right
1363        let arrays: Vec<ArrayRef> =
1364            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
1365        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
1366        let ords = [SortOptions {
1367            descending: true,
1368            nulls_first: true,
1369        }];
1370        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1371        assert_eq!(res, 1);
1372        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1373        assert_eq!(res, 1);
1374
1375        // Ascending, left
1376        let arrays: Vec<ArrayRef> =
1377            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
1378        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
1379        let ords = [SortOptions {
1380            descending: false,
1381            nulls_first: true,
1382        }];
1383        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1384        assert_eq!(res, 1);
1385        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1386        assert_eq!(res, 1);
1387
1388        // Ascending, right
1389        let arrays: Vec<ArrayRef> =
1390            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
1391        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
1392        let ords = [SortOptions {
1393            descending: false,
1394            nulls_first: true,
1395        }];
1396        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1397        assert_eq!(res, 2);
1398        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1399        assert_eq!(res, 2);
1400
1401        let arrays: Vec<ArrayRef> = vec![
1402            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 8.0, 9., 10.])),
1403            Arc::new(Float64Array::from(vec![10.0, 9.0, 8.0, 7.5, 7., 6.])),
1404        ];
1405        let search_tuple: Vec<ScalarValue> = vec![
1406            ScalarValue::Float64(Some(8.0)),
1407            ScalarValue::Float64(Some(8.0)),
1408        ];
1409        let ords = [
1410            SortOptions {
1411                descending: false,
1412                nulls_first: true,
1413            },
1414            SortOptions {
1415                descending: true,
1416                nulls_first: true,
1417            },
1418        ];
1419        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1420        assert_eq!(res, 3);
1421        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1422        assert_eq!(res, 3);
1423
1424        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1425        assert_eq!(res, 2);
1426        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1427        assert_eq!(res, 2);
1428        Ok(())
1429    }
1430
1431    #[test]
1432    fn test_evaluate_partition_ranges() -> Result<()> {
1433        let arrays: Vec<ArrayRef> = vec![
1434            Arc::new(Float64Array::from(vec![1.0, 1.0, 1.0, 2.0, 2.0, 2.0])),
1435            Arc::new(Float64Array::from(vec![4.0, 4.0, 3.0, 2.0, 1.0, 1.0])),
1436        ];
1437        let n_row = arrays[0].len();
1438        let options: Vec<SortOptions> = vec![
1439            SortOptions {
1440                descending: false,
1441                nulls_first: false,
1442            },
1443            SortOptions {
1444                descending: true,
1445                nulls_first: false,
1446            },
1447        ];
1448        let sort_columns = arrays
1449            .into_iter()
1450            .zip(options)
1451            .map(|(values, options)| SortColumn {
1452                values,
1453                options: Some(options),
1454            })
1455            .collect::<Vec<_>>();
1456        let ranges = evaluate_partition_ranges(n_row, &sort_columns)?;
1457        assert_eq!(ranges.len(), 4);
1458        assert_eq!(ranges[0], Range { start: 0, end: 2 });
1459        assert_eq!(ranges[1], Range { start: 2, end: 3 });
1460        assert_eq!(ranges[2], Range { start: 3, end: 4 });
1461        assert_eq!(ranges[3], Range { start: 4, end: 6 });
1462        Ok(())
1463    }
1464
1465    #[cfg(feature = "sql")]
1466    #[test]
1467    fn test_quote_identifier() -> Result<()> {
1468        let cases = vec![
1469            ("foo", r#"foo"#),
1470            ("_foo", r#"_foo"#),
1471            ("foo_bar", r#"foo_bar"#),
1472            ("foo-bar", r#""foo-bar""#),
1473            // name itself has a period, needs to be quoted
1474            ("foo.bar", r#""foo.bar""#),
1475            ("Foo", r#""Foo""#),
1476            ("Foo.Bar", r#""Foo.Bar""#),
1477            // name starting with a number needs to be quoted
1478            ("test1", r#"test1"#),
1479            ("1test", r#""1test""#),
1480        ];
1481
1482        for (identifier, quoted_identifier) in cases {
1483            println!("input: \n{identifier}\nquoted_identifier:\n{quoted_identifier}");
1484
1485            assert_eq!(quote_identifier(identifier), quoted_identifier);
1486
1487            // When parsing the quoted identifier, it should be a
1488            // a single identifier without normalization, and not in multiple parts
1489            let quote_style = if quoted_identifier.starts_with('"') {
1490                Some('"')
1491            } else {
1492                None
1493            };
1494
1495            let expected_parsed = vec![Ident {
1496                value: identifier.to_string(),
1497                quote_style,
1498                span: sqlparser::tokenizer::Span::empty(),
1499            }];
1500
1501            assert_eq!(
1502                parse_identifiers(quoted_identifier).unwrap(),
1503                expected_parsed
1504            );
1505        }
1506
1507        Ok(())
1508    }
1509
1510    #[test]
1511    fn test_get_at_indices() -> Result<()> {
1512        let in_vec = vec![1, 2, 3, 4, 5, 6, 7];
1513        assert_eq!(get_at_indices(&in_vec, [0, 2])?, vec![1, 3]);
1514        assert_eq!(get_at_indices(&in_vec, [4, 2])?, vec![5, 3]);
1515        // 7 is outside the range
1516        assert!(get_at_indices(&in_vec, [7]).is_err());
1517        Ok(())
1518    }
1519
1520    #[test]
1521    fn test_longest_consecutive_prefix() {
1522        assert_eq!(longest_consecutive_prefix([0, 3, 4]), 1);
1523        assert_eq!(longest_consecutive_prefix([0, 1, 3, 4]), 2);
1524        assert_eq!(longest_consecutive_prefix([0, 1, 2, 3, 4]), 5);
1525        assert_eq!(longest_consecutive_prefix([1, 2, 3, 4]), 0);
1526    }
1527
1528    #[test]
1529    fn test_merge_and_order_indices() {
1530        assert_eq!(
1531            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
1532            vec![0, 1, 3, 4, 5]
1533        );
1534        // Result should be ordered, even if inputs are not
1535        assert_eq!(
1536            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
1537            vec![0, 1, 3, 4, 5]
1538        );
1539    }
1540
1541    #[test]
1542    fn test_set_difference() {
1543        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
1544        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
1545        // return value should have same ordering with the in1
1546        assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
1547        assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
1548        assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
1549    }
1550
1551    #[test]
1552    fn test_find_indices() -> Result<()> {
1553        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
1554        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
1555        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
1556        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
1557        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
1558        Ok(())
1559    }
1560
1561    #[test]
1562    fn test_transpose() -> Result<()> {
1563        let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]];
1564        let transposed = transpose(in_data);
1565        let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]];
1566        assert_eq!(expected, transposed);
1567        Ok(())
1568    }
1569
1570    #[test]
1571    fn test_sliced_list_values() {
1572        let data = vec![
1573            Some(vec![Some(0), Some(1), Some(2)]),
1574            None,
1575            Some(vec![Some(3), None, Some(5)]),
1576            Some(vec![Some(6), Some(7)]),
1577        ];
1578
1579        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
1580
1581        assert_eq!(
1582            sliced_list_values(&list).as_primitive(),
1583            &Int32Array::from(vec![
1584                Some(0),
1585                Some(1),
1586                Some(2),
1587                Some(3),
1588                None,
1589                Some(5),
1590                Some(6),
1591                Some(7)
1592            ])
1593        );
1594
1595        assert_eq!(
1596            sliced_list_values(&list.slice(0, 1)).as_primitive(),
1597            &Int32Array::from(vec![Some(0), Some(1), Some(2)])
1598        );
1599
1600        assert_eq!(
1601            sliced_list_values(&list.slice(2, 1)).as_primitive(),
1602            &Int32Array::from(vec![Some(3), None, Some(5)])
1603        );
1604
1605        assert_eq!(
1606            sliced_list_values(&list.slice(3, 1)).as_primitive(),
1607            &Int32Array::from(vec![Some(6), Some(7)])
1608        );
1609
1610        assert!(sliced_list_values(&list.slice(0, 0)).is_empty());
1611        assert!(sliced_list_values(&list.slice(1, 0)).is_empty());
1612        assert!(sliced_list_values(&list.slice(3, 0)).is_empty());
1613    }
1614
1615    #[test]
1616    fn test_adjust_offsets() {
1617        let data = vec![
1618            Some(vec![Some(0), Some(1), Some(2)]),
1619            None,
1620            Some(vec![Some(3), None, Some(5)]),
1621            Some(vec![Some(6), Some(7)]),
1622        ];
1623        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
1624
1625        assert_eq!(
1626            adjust_offsets_for_slice(&list),
1627            OffsetBuffer::from_lengths([3, 0, 3, 2])
1628        );
1629
1630        assert_eq!(
1631            adjust_offsets_for_slice(&list.slice(0, 1)),
1632            OffsetBuffer::from_lengths([3])
1633        );
1634
1635        assert_eq!(
1636            adjust_offsets_for_slice(&list.slice(1, 2)),
1637            OffsetBuffer::from_lengths([0, 3])
1638        );
1639
1640        assert_eq!(
1641            adjust_offsets_for_slice(&list.slice(1, 3)),
1642            OffsetBuffer::from_lengths([0, 3, 2])
1643        );
1644
1645        assert_eq!(
1646            adjust_offsets_for_slice(&list.slice(0, 0)),
1647            OffsetBuffer::from_lengths([])
1648        );
1649
1650        assert_eq!(
1651            adjust_offsets_for_slice(&list.slice(1, 0)),
1652            OffsetBuffer::from_lengths([])
1653        );
1654
1655        assert_eq!(
1656            adjust_offsets_for_slice(&list.slice(3, 0)),
1657            OffsetBuffer::from_lengths([])
1658        );
1659    }
1660
1661    fn create_i32_list(
1662        values: impl Into<Int32Array>,
1663        offsets: OffsetBuffer<i32>,
1664        nulls: Option<NullBuffer>,
1665    ) -> ListArray {
1666        let list_field = Arc::new(Field::new_list_field(DataType::Int32, true));
1667
1668        ListArray::new(list_field, offsets, Arc::new(values.into()), nulls)
1669    }
1670
1671    #[test]
1672    fn test_remove_list_null_values_list() {
1673        let list = Arc::new(create_i32_list(
1674            vec![100, 20, 10, 0, 0, 0, 0, 1, 50],
1675            OffsetBuffer::<i32>::from_lengths(vec![3, 4, 0, 2, 0]),
1676            Some(NullBuffer::from(vec![true, false, false, true, false])),
1677        )) as ArrayRef;
1678
1679        let res = remove_list_null_values(&list).unwrap();
1680        let res = res.as_list::<i32>();
1681
1682        let expected = Arc::new(create_i32_list(
1683            vec![100, 20, 10, 1, 50],
1684            OffsetBuffer::<i32>::from_lengths(vec![3, 0, 0, 2, 0]),
1685            Some(NullBuffer::from(vec![true, false, false, true, false])),
1686        )) as ArrayRef;
1687        let expected = expected.as_list::<i32>();
1688
1689        assert_eq!(res, expected);
1690        // check above skips inner value of nulls
1691        assert_eq!(res.values(), expected.values());
1692        assert_eq!(res.offsets(), expected.offsets());
1693    }
1694
1695    #[test]
1696    fn test_list_array_values_row_number() {
1697        assert_eq!(
1698            variable_size_list_values_row_number::<Int32Type>(
1699                &OffsetBuffer::from_lengths([1, 3, 0, 2,])
1700            ),
1701            Int32Array::from(vec![0, 1, 1, 1, 3, 3])
1702        );
1703
1704        assert_eq!(
1705            variable_size_list_values_row_number::<Int32Type>(
1706                &OffsetBuffer::from_lengths([])
1707            ),
1708            Int32Array::new_null(0)
1709        );
1710
1711        assert_eq!(
1712            variable_size_list_values_row_number::<Int32Type>(
1713                &OffsetBuffer::from_lengths([0])
1714            ),
1715            Int32Array::new_null(0)
1716        );
1717
1718        assert_eq!(
1719            variable_size_list_values_row_number::<Int32Type>(
1720                &OffsetBuffer::from_lengths([0, 0])
1721            ),
1722            Int32Array::new_null(0)
1723        );
1724
1725        assert_eq!(
1726            variable_size_list_values_row_number::<Int32Type>(
1727                &OffsetBuffer::from_lengths([1])
1728            ),
1729            Int32Array::from(vec![0])
1730        );
1731
1732        assert_eq!(
1733            variable_size_list_values_row_number::<Int32Type>(
1734                &OffsetBuffer::from_lengths([2])
1735            ),
1736            Int32Array::from(vec![0, 0])
1737        );
1738    }
1739
1740    #[test]
1741    fn test_fsl_values_row_number() {
1742        assert_eq!(
1743            fsl_values_row_number(2, 3).unwrap(),
1744            Int32Array::from(vec![0, 0, 1, 1, 2, 2])
1745        );
1746
1747        assert_eq!(
1748            fsl_values_row_number(1, 3).unwrap(),
1749            Int32Array::from(vec![0, 1, 2])
1750        );
1751
1752        assert_eq!(
1753            fsl_values_row_number(2, 1).unwrap(),
1754            Int32Array::from(vec![0, 0])
1755        );
1756
1757        assert_eq!(
1758            fsl_values_row_number(2, 0).unwrap(),
1759            Int32Array::new_null(0),
1760        );
1761
1762        assert_eq!(
1763            fsl_values_row_number(0, 2).unwrap(),
1764            Int32Array::new_null(0),
1765        );
1766
1767        assert_eq!(
1768            fsl_values_row_number(0, 0).unwrap(),
1769            Int32Array::new_null(0),
1770        );
1771
1772        fsl_values_row_number(-1, 2).unwrap_err();
1773        fsl_values_row_number(-1, 0).unwrap_err();
1774    }
1775}