datafusion_common/utils/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
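//! This module provides general-purpose utilities: schema projection, row
//! comparison and search (bisection and linear scan), identifier quoting and
//! parsing, list array helpers, string similarity metrics and more.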
19
20pub mod expr;
21pub mod memory;
22pub mod proxy;
23pub mod string_utils;
24
25use crate::error::{_exec_datafusion_err, _internal_datafusion_err, _internal_err};
26use crate::{Result, ScalarValue};
27use arrow::array::{
28    cast::AsArray, Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray,
29    OffsetSizeTrait,
30};
31use arrow::buffer::OffsetBuffer;
32use arrow::compute::{partition, SortColumn, SortOptions};
33use arrow::datatypes::{DataType, Field, SchemaRef};
34#[cfg(feature = "sql")]
35use sqlparser::{ast::Ident, dialect::GenericDialect, parser::Parser};
36use std::borrow::{Borrow, Cow};
37use std::cmp::{min, Ordering};
38use std::collections::HashSet;
39use std::num::NonZero;
40use std::ops::Range;
41use std::sync::Arc;
42use std::thread::available_parallelism;
43
44/// Applies an optional projection to a [`SchemaRef`], returning the
45/// projected schema
46///
47/// Example:
48/// ```
49/// use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
50/// use datafusion_common::project_schema;
51///
52/// // Schema with columns 'a', 'b', and 'c'
53/// let schema = SchemaRef::new(Schema::new(vec![
54///     Field::new("a", DataType::Int32, true),
55///     Field::new("b", DataType::Int64, true),
56///     Field::new("c", DataType::Utf8, true),
57/// ]));
58///
59/// // Pick columns 'c' and 'b'
60/// let projection = Some(vec![2, 1]);
61/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap();
62///
63/// let expected_schema = SchemaRef::new(Schema::new(vec![
64///     Field::new("c", DataType::Utf8, true),
65///     Field::new("b", DataType::Int64, true),
66/// ]));
67///
68/// assert_eq!(projected_schema, expected_schema);
69/// ```
70pub fn project_schema(
71    schema: &SchemaRef,
72    projection: Option<&Vec<usize>>,
73) -> Result<SchemaRef> {
74    let schema = match projection {
75        Some(columns) => Arc::new(schema.project(columns)?),
76        None => Arc::clone(schema),
77    };
78    Ok(schema)
79}
80
81/// Extracts a row at the specified index from a set of columns and stores it in the provided buffer.
82pub fn extract_row_at_idx_to_buf(
83    columns: &[ArrayRef],
84    idx: usize,
85    buf: &mut Vec<ScalarValue>,
86) -> Result<()> {
87    buf.clear();
88
89    let iter = columns
90        .iter()
91        .map(|arr| ScalarValue::try_from_array(arr, idx));
92    for v in iter.into_iter() {
93        buf.push(v?);
94    }
95
96    Ok(())
97}
98/// Given column vectors, returns row at `idx`.
99pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
100    columns
101        .iter()
102        .map(|arr| ScalarValue::try_from_array(arr, idx))
103        .collect()
104}
105
106/// This function compares two tuples depending on the given sort options.
107pub fn compare_rows(
108    x: &[ScalarValue],
109    y: &[ScalarValue],
110    sort_options: &[SortOptions],
111) -> Result<Ordering> {
112    let zip_it = x.iter().zip(y.iter()).zip(sort_options.iter());
113    // Preserving lexical ordering.
114    for ((lhs, rhs), sort_options) in zip_it {
115        // Consider all combinations of NULLS FIRST/LAST and ASC/DESC configurations.
116        let result = match (lhs.is_null(), rhs.is_null(), sort_options.nulls_first) {
117            (true, false, false) | (false, true, true) => Ordering::Greater,
118            (true, false, true) | (false, true, false) => Ordering::Less,
119            (false, false, _) => {
120                if sort_options.descending {
121                    rhs.try_cmp(lhs)?
122                } else {
123                    lhs.try_cmp(rhs)?
124                }
125            }
126            (true, true, _) => continue,
127        };
128        if result != Ordering::Equal {
129            return Ok(result);
130        }
131    }
132    Ok(Ordering::Equal)
133}
134
135/// This function searches for a tuple of given values (`target`) among the given
136/// rows (`item_columns`) using the bisection algorithm. It assumes that `item_columns`
137/// is sorted according to `sort_options` and returns the insertion index of `target`.
138/// Template argument `SIDE` being `true`/`false` means left/right insertion.
139pub fn bisect<const SIDE: bool>(
140    item_columns: &[ArrayRef],
141    target: &[ScalarValue],
142    sort_options: &[SortOptions],
143) -> Result<usize> {
144    let low: usize = 0;
145    let high: usize = item_columns
146        .first()
147        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
148        .len();
149    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
150        let cmp = compare_rows(current, target, sort_options)?;
151        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
152    };
153    find_bisect_point(item_columns, target, compare_fn, low, high)
154}
155
156/// This function searches for a tuple of given values (`target`) among a slice of
157/// the given rows (`item_columns`) using the bisection algorithm. The slice starts
158/// at the index `low` and ends at the index `high`. The boolean-valued function
159/// `compare_fn` specifies whether we bisect on the left (by returning `false`),
160/// or on the right (by returning `true`) when we compare the target value with
161/// the current value as we iteratively bisect the input.
162pub fn find_bisect_point<F>(
163    item_columns: &[ArrayRef],
164    target: &[ScalarValue],
165    compare_fn: F,
166    mut low: usize,
167    mut high: usize,
168) -> Result<usize>
169where
170    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
171{
172    while low < high {
173        let mid = ((high - low) / 2) + low;
174        let val = get_row_at_idx(item_columns, mid)?;
175        if compare_fn(&val, target)? {
176            low = mid + 1;
177        } else {
178            high = mid;
179        }
180    }
181    Ok(low)
182}
183
184/// This function searches for a tuple of given values (`target`) among the given
185/// rows (`item_columns`) via a linear scan. It assumes that `item_columns` is sorted
186/// according to `sort_options` and returns the insertion index of `target`.
187/// Template argument `SIDE` being `true`/`false` means left/right insertion.
188pub fn linear_search<const SIDE: bool>(
189    item_columns: &[ArrayRef],
190    target: &[ScalarValue],
191    sort_options: &[SortOptions],
192) -> Result<usize> {
193    let low: usize = 0;
194    let high: usize = item_columns
195        .first()
196        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
197        .len();
198    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
199        let cmp = compare_rows(current, target, sort_options)?;
200        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
201    };
202    search_in_slice(item_columns, target, compare_fn, low, high)
203}
204
205/// This function searches for a tuple of given values (`target`) among a slice of
206/// the given rows (`item_columns`) via a linear scan. The slice starts at the index
207/// `low` and ends at the index `high`. The boolean-valued function `compare_fn`
208/// specifies the stopping criterion.
209pub fn search_in_slice<F>(
210    item_columns: &[ArrayRef],
211    target: &[ScalarValue],
212    compare_fn: F,
213    mut low: usize,
214    high: usize,
215) -> Result<usize>
216where
217    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
218{
219    while low < high {
220        let val = get_row_at_idx(item_columns, low)?;
221        if !compare_fn(&val, target)? {
222            break;
223        }
224        low += 1;
225    }
226    Ok(low)
227}
228
229/// Given a list of 0 or more already sorted columns, finds the
230/// partition ranges that would partition equally across columns.
231///
232/// See [`partition`] for more details.
233pub fn evaluate_partition_ranges(
234    num_rows: usize,
235    partition_columns: &[SortColumn],
236) -> Result<Vec<Range<usize>>> {
237    Ok(if partition_columns.is_empty() {
238        vec![Range {
239            start: 0,
240            end: num_rows,
241        }]
242    } else {
243        let cols: Vec<_> = partition_columns
244            .iter()
245            .map(|x| Arc::clone(&x.values))
246            .collect();
247        partition(&cols)?.ranges()
248    })
249}
250
251/// Wraps identifier string in double quotes, escaping any double quotes in
252/// the identifier by replacing it with two double quotes
253///
254/// e.g. identifier `tab.le"name` becomes `"tab.le""name"`
255pub fn quote_identifier(s: &str) -> Cow<'_, str> {
256    if needs_quotes(s) {
257        Cow::Owned(format!("\"{}\"", s.replace('"', "\"\"")))
258    } else {
259        Cow::Borrowed(s)
260    }
261}
262
/// Returns true if this identifier needs quotes.
///
/// An identifier can stay unquoted when its first character is an ASCII
/// lowercase letter or `_` and every following character is an ASCII
/// lowercase letter, an ASCII digit or `_`. The empty string is reported as
/// not needing quotes.
fn needs_quotes(s: &str) -> bool {
    let is_letter_or_underscore = |c: char| c.is_ascii_lowercase() || c == '_';
    let mut chars = s.chars();

    match chars.next() {
        // first char can not be a number unless escaped
        Some(first) if !is_letter_or_underscore(first) => true,
        _ => chars.any(|c| !(is_letter_or_underscore(c) || c.is_ascii_digit())),
    }
}
276
/// Parses `s` as a multipart identifier using the SQL parser with the
/// generic dialect.
///
/// # Errors
///
/// Returns an error when the SQL parser cannot tokenize `s` or cannot parse
/// it as a multipart identifier.
#[cfg(feature = "sql")]
pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
    let dialect = GenericDialect;
    let identifiers = Parser::new(&dialect)
        .try_with_sql(s)?
        .parse_multipart_identifier()?;
    Ok(identifiers)
}
284
/// Parse a string into a vector of identifiers.
///
/// Quoted identifier parts always keep their original case. Unquoted parts
/// are kept as written when `ignore_case` is `true` and are converted to
/// ASCII lowercase when it is `false`.
///
/// If `s` cannot be parsed, an empty vector is returned.
#[cfg(feature = "sql")]
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
    let identifiers = parse_identifiers(s).unwrap_or_default();
    let mut normalized = Vec::with_capacity(identifiers.len());
    for ident in identifiers {
        let keep_case = ignore_case || ident.quote_style.is_some();
        normalized.push(if keep_case {
            ident.value
        } else {
            ident.value.to_ascii_lowercase()
        });
    }
    normalized
}
300
301#[cfg(not(feature = "sql"))]
302pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<String>> {
303    let mut result = Vec::new();
304    let mut current = String::new();
305    let mut in_quotes = false;
306
307    for ch in s.chars() {
308        match ch {
309            '"' => {
310                in_quotes = !in_quotes;
311                current.push(ch);
312            }
313            '.' if !in_quotes => {
314                result.push(current.clone());
315                current.clear();
316            }
317            _ => {
318                current.push(ch);
319            }
320        }
321    }
322
323    // Push the last part if it's not empty
324    if !current.is_empty() {
325        result.push(current);
326    }
327
328    Ok(result)
329}
330
331#[cfg(not(feature = "sql"))]
332pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
333    parse_identifiers(s)
334        .unwrap_or_default()
335        .into_iter()
336        .map(|id| {
337            let is_double_quoted = if id.len() > 2 {
338                let mut chars = id.chars();
339                chars.next() == Some('"') && chars.last() == Some('"')
340            } else {
341                false
342            };
343            if is_double_quoted {
344                id[1..id.len() - 1].to_string().replace("\"\"", "\"")
345            } else if ignore_case {
346                id
347            } else {
348                id.to_ascii_lowercase()
349            }
350        })
351        .collect::<Vec<_>>()
352}
353
354/// This function "takes" the elements at `indices` from the slice `items`.
355pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
356    items: &[T],
357    indices: impl IntoIterator<Item = I>,
358) -> Result<Vec<T>> {
359    indices
360        .into_iter()
361        .map(|idx| items.get(*idx.borrow()).cloned())
362        .collect::<Option<Vec<T>>>()
363        .ok_or_else(|| {
364            _exec_datafusion_err!("Expects indices to be in the range of searched vector")
365        })
366}
367
/// Finds the length of the longest prefix of the form 0, 1, 2, ... within
/// the collection `sequence`. Examples:
/// - For 0, 1, 2, 4, 5; the result is 3, meaning 0, 1, 2 is the longest
///   satisfying prefix.
/// - For 1, 2, 3, 4; the result is 0, meaning there is no such prefix.
pub fn longest_consecutive_prefix<T: Borrow<usize>>(
    sequence: impl IntoIterator<Item = T>,
) -> usize {
    sequence
        .into_iter()
        .enumerate()
        // An item belongs to the prefix while it equals its own position.
        .take_while(|(position, item)| {
            let value: &usize = item.borrow();
            value == position
        })
        .count()
}
385
386/// Creates single element [`ListArray`], [`LargeListArray`] and
387/// [`FixedSizeListArray`] from other arrays
388///
389/// For example this builder can convert `[1, 2, 3]` into `[[1, 2, 3]]`
390///
391/// # Example
392/// ```
393/// # use std::sync::Arc;
394/// # use arrow::array::{Array, ListArray};
395/// # use arrow::array::types::Int64Type;
396/// # use datafusion_common::utils::SingleRowListArrayBuilder;
397/// // Array is [1, 2, 3]
398/// let arr = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![Some(vec![
399///     Some(1),
400///     Some(2),
401///     Some(3),
402/// ])]);
403/// // Wrap as a list array: [[1, 2, 3]]
404/// let list_arr = SingleRowListArrayBuilder::new(Arc::new(arr)).build_list_array();
405/// assert_eq!(list_arr.len(), 1);
406/// ```
407#[derive(Debug, Clone)]
408pub struct SingleRowListArrayBuilder {
409    /// array to be wrapped
410    arr: ArrayRef,
411    /// Should the resulting array be nullable? Defaults to `true`.
412    nullable: bool,
413    /// Specify the field name for the resulting array. Defaults to value used in
414    /// [`Field::new_list_field`]
415    field_name: Option<String>,
416}
417
418impl SingleRowListArrayBuilder {
419    /// Create a new instance of [`SingleRowListArrayBuilder`]
420    pub fn new(arr: ArrayRef) -> Self {
421        Self {
422            arr,
423            nullable: true,
424            field_name: None,
425        }
426    }
427
428    /// Set the nullable flag
429    pub fn with_nullable(mut self, nullable: bool) -> Self {
430        self.nullable = nullable;
431        self
432    }
433
434    /// sets the field name for the resulting array
435    pub fn with_field_name(mut self, field_name: Option<String>) -> Self {
436        self.field_name = field_name;
437        self
438    }
439
440    /// Copies field name and nullable from the specified field
441    pub fn with_field(self, field: &Field) -> Self {
442        self.with_field_name(Some(field.name().to_owned()))
443            .with_nullable(field.is_nullable())
444    }
445
446    /// Build a single element [`ListArray`]
447    pub fn build_list_array(self) -> ListArray {
448        let (field, arr) = self.into_field_and_arr();
449        let offsets = OffsetBuffer::from_lengths([arr.len()]);
450        ListArray::new(field, offsets, arr, None)
451    }
452
453    /// Build a single element [`ListArray`] and wrap as [`ScalarValue::List`]
454    pub fn build_list_scalar(self) -> ScalarValue {
455        ScalarValue::List(Arc::new(self.build_list_array()))
456    }
457
458    /// Build a single element [`LargeListArray`]
459    pub fn build_large_list_array(self) -> LargeListArray {
460        let (field, arr) = self.into_field_and_arr();
461        let offsets = OffsetBuffer::from_lengths([arr.len()]);
462        LargeListArray::new(field, offsets, arr, None)
463    }
464
465    /// Build a single element [`LargeListArray`] and wrap as [`ScalarValue::LargeList`]
466    pub fn build_large_list_scalar(self) -> ScalarValue {
467        ScalarValue::LargeList(Arc::new(self.build_large_list_array()))
468    }
469
470    /// Build a single element [`FixedSizeListArray`]
471    pub fn build_fixed_size_list_array(self, list_size: usize) -> FixedSizeListArray {
472        let (field, arr) = self.into_field_and_arr();
473        FixedSizeListArray::new(field, list_size as i32, arr, None)
474    }
475
476    /// Build a single element [`FixedSizeListArray`] and wrap as [`ScalarValue::FixedSizeList`]
477    pub fn build_fixed_size_list_scalar(self, list_size: usize) -> ScalarValue {
478        ScalarValue::FixedSizeList(Arc::new(self.build_fixed_size_list_array(list_size)))
479    }
480
481    /// Helper function: convert this builder into a tuple of field and array
482    fn into_field_and_arr(self) -> (Arc<Field>, ArrayRef) {
483        let Self {
484            arr,
485            nullable,
486            field_name,
487        } = self;
488        let data_type = arr.data_type().to_owned();
489        let field = match field_name {
490            Some(name) => Field::new(name, data_type, nullable),
491            None => Field::new_list_field(data_type, nullable),
492        };
493        (Arc::new(field), arr)
494    }
495}
496
497/// Wrap arrays into a single element `ListArray`.
498///
499/// Example:
500/// ```
501/// use arrow::array::{Int32Array, ListArray, ArrayRef};
502/// use arrow::datatypes::{Int32Type, Field};
503/// use std::sync::Arc;
504///
505/// let arr1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
506/// let arr2 = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef;
507///
508/// let list_arr = datafusion_common::utils::arrays_into_list_array([arr1, arr2]).unwrap();
509///
510/// let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(
511///    vec![
512///     Some(vec![Some(1), Some(2), Some(3)]),
513///     Some(vec![Some(4), Some(5), Some(6)]),
514///    ]
515/// );
516///
517/// assert_eq!(list_arr, expected);
518pub fn arrays_into_list_array(
519    arr: impl IntoIterator<Item = ArrayRef>,
520) -> Result<ListArray> {
521    let arr = arr.into_iter().collect::<Vec<_>>();
522    if arr.is_empty() {
523        return _internal_err!("Cannot wrap empty array into list array");
524    }
525
526    let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
527    // Assume data type is consistent
528    let data_type = arr[0].data_type().to_owned();
529    let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
530    Ok(ListArray::new(
531        Arc::new(Field::new_list_field(data_type, true)),
532        OffsetBuffer::from_lengths(lens),
533        arrow::compute::concat(values.as_slice())?,
534        None,
535    ))
536}
537
538/// Helper function to convert a ListArray into a vector of ArrayRefs.
539pub fn list_to_arrays<O: OffsetSizeTrait>(a: &ArrayRef) -> Vec<ArrayRef> {
540    a.as_list::<O>().iter().flatten().collect::<Vec<_>>()
541}
542
543/// Helper function to convert a FixedSizeListArray into a vector of ArrayRefs.
544pub fn fixed_size_list_to_arrays(a: &ArrayRef) -> Vec<ArrayRef> {
545    a.as_fixed_size_list().iter().flatten().collect::<Vec<_>>()
546}
547
548/// Get the base type of a data type.
549///
550/// Example
551/// ```
552/// use arrow::datatypes::{DataType, Field};
553/// use datafusion_common::utils::base_type;
554/// use std::sync::Arc;
555///
556/// let data_type =
557///     DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
558/// assert_eq!(base_type(&data_type), DataType::Int32);
559///
560/// let data_type = DataType::Int32;
561/// assert_eq!(base_type(&data_type), DataType::Int32);
562/// ```
563pub fn base_type(data_type: &DataType) -> DataType {
564    match data_type {
565        DataType::List(field)
566        | DataType::LargeList(field)
567        | DataType::FixedSizeList(field, _) => base_type(field.data_type()),
568        _ => data_type.to_owned(),
569    }
570}
571
/// Information about how to coerce lists.
///
/// Passed (optionally) to [`coerced_type_with_base_type_only`] to select
/// which list coercion is applied while the base type is replaced.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum ListCoercion {
    /// [`DataType::FixedSizeList`] should be coerced to [`DataType::List`].
    FixedSizedListToList,
}
578
579/// A helper function to coerce base type in List.
580///
581/// Example
582/// ```
583/// use arrow::datatypes::{DataType, Field};
584/// use datafusion_common::utils::coerced_type_with_base_type_only;
585/// use std::sync::Arc;
586///
587/// let data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
588/// let base_type = DataType::Float64;
589/// let coerced_type = coerced_type_with_base_type_only(&data_type, &base_type, None);
590/// assert_eq!(coerced_type, DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))));
591pub fn coerced_type_with_base_type_only(
592    data_type: &DataType,
593    base_type: &DataType,
594    array_coercion: Option<&ListCoercion>,
595) -> DataType {
596    match (data_type, array_coercion) {
597        (DataType::List(field), _)
598        | (DataType::FixedSizeList(field, _), Some(ListCoercion::FixedSizedListToList)) =>
599        {
600            let field_type = coerced_type_with_base_type_only(
601                field.data_type(),
602                base_type,
603                array_coercion,
604            );
605
606            DataType::List(Arc::new(Field::new(
607                field.name(),
608                field_type,
609                field.is_nullable(),
610            )))
611        }
612        (DataType::FixedSizeList(field, len), _) => {
613            let field_type = coerced_type_with_base_type_only(
614                field.data_type(),
615                base_type,
616                array_coercion,
617            );
618
619            DataType::FixedSizeList(
620                Arc::new(Field::new(field.name(), field_type, field.is_nullable())),
621                *len,
622            )
623        }
624        (DataType::LargeList(field), _) => {
625            let field_type = coerced_type_with_base_type_only(
626                field.data_type(),
627                base_type,
628                array_coercion,
629            );
630
631            DataType::LargeList(Arc::new(Field::new(
632                field.name(),
633                field_type,
634                field.is_nullable(),
635            )))
636        }
637
638        _ => base_type.clone(),
639    }
640}
641
642/// Recursively coerce and `FixedSizeList` elements to `List`
643pub fn coerced_fixed_size_list_to_list(data_type: &DataType) -> DataType {
644    match data_type {
645        DataType::List(field) | DataType::FixedSizeList(field, _) => {
646            let field_type = coerced_fixed_size_list_to_list(field.data_type());
647
648            DataType::List(Arc::new(Field::new(
649                field.name(),
650                field_type,
651                field.is_nullable(),
652            )))
653        }
654        DataType::LargeList(field) => {
655            let field_type = coerced_fixed_size_list_to_list(field.data_type());
656
657            DataType::LargeList(Arc::new(Field::new(
658                field.name(),
659                field_type,
660                field.is_nullable(),
661            )))
662        }
663
664        _ => data_type.clone(),
665    }
666}
667
668/// Compute the number of dimensions in a list data type.
669pub fn list_ndims(data_type: &DataType) -> u64 {
670    match data_type {
671        DataType::List(field)
672        | DataType::LargeList(field)
673        | DataType::FixedSizeList(field, _) => 1 + list_ndims(field.data_type()),
674        _ => 0,
675    }
676}
677
/// Adopted from strsim-rs for string similarity metrics
pub mod datafusion_strsim {
    // Source: https://github.com/dguo/strsim-rs/blob/master/src/lib.rs
    // License: https://github.com/dguo/strsim-rs/blob/master/LICENSE
    use std::cmp::min;
    use std::str::Chars;

    /// Lets a `&str` be iterated repeatedly, character by character, through
    /// `IntoIterator` on a reference.
    struct StringWrapper<'a>(&'a str);

    impl<'b> IntoIterator for &StringWrapper<'b> {
        type Item = char;
        type IntoIter = Chars<'b>;

        fn into_iter(self) -> Self::IntoIter {
            self.0.chars()
        }
    }

    /// Calculates the minimum number of insertions, deletions, and substitutions
    /// required to change one sequence into the other.
    ///
    /// Only a single row of the edit-distance table is kept in memory and it
    /// is updated in place for every element of `a`.
    fn generic_levenshtein<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
        a: &'a Iter1,
        b: &'b Iter2,
    ) -> usize
    where
        &'a Iter1: IntoIterator<Item = Elem1>,
        &'b Iter2: IntoIterator<Item = Elem2>,
        Elem1: PartialEq<Elem2>,
    {
        let b_len = b.into_iter().count();

        // Turning an empty `a` into `b` takes one insertion per element of `b`.
        if a.into_iter().next().is_none() {
            return b_len;
        }

        // row[j] holds the distance between the processed prefix of `a` and
        // the first `j + 1` elements of `b`.
        let mut row: Vec<usize> = (1..=b_len).collect();
        let mut result = 0;

        for (i, a_elem) in a.into_iter().enumerate() {
            result = i + 1;
            // Value of the table cell diagonally up-left of the current one.
            let mut diagonal = i;

            for (b_elem, cell) in b.into_iter().zip(row.iter_mut()) {
                let substitution = diagonal + usize::from(a_elem != b_elem);
                let above = *cell;
                result = min(result + 1, min(substitution, above + 1));
                diagonal = above;
                *cell = result;
            }
        }

        result
    }

    /// Calculates the minimum number of insertions, deletions, and substitutions
    /// required to change one string into the other.
    ///
    /// ```
    /// use datafusion_common::utils::datafusion_strsim::levenshtein;
    ///
    /// assert_eq!(3, levenshtein("kitten", "sitting"));
    /// ```
    pub fn levenshtein(a: &str, b: &str) -> usize {
        let (left, right) = (StringWrapper(a), StringWrapper(b));
        generic_levenshtein(&left, &right)
    }

    /// Calculates the normalized Levenshtein similarity between two strings.
    /// The result is a value between 0.0 and 1.0, where 1.0 indicates
    /// that the strings are identical and 0.0 indicates no similarity.
    /// Two empty strings are considered identical.
    ///
    /// ```
    /// use datafusion_common::utils::datafusion_strsim::normalized_levenshtein;
    ///
    /// assert!((normalized_levenshtein("kitten", "sitting") - 0.57142).abs() < 0.00001);
    ///
    /// assert!(normalized_levenshtein("", "second").abs() < 0.00001);
    ///
    /// assert!((normalized_levenshtein("kitten", "sitten") - 0.833).abs() < 0.001);
    /// ```
    pub fn normalized_levenshtein(a: &str, b: &str) -> f64 {
        if a.is_empty() && b.is_empty() {
            return 1.0;
        }
        // Normalize by the character count of the longer string.
        let longest = a.chars().count().max(b.chars().count());
        let distance = levenshtein(a, b);
        1.0 - (distance as f64) / (longest as f64)
    }
}
766
/// Merges collections `first` and `second`, removes duplicates and sorts the
/// result in ascending order, returning it as a [`Vec`].
pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
    first: impl IntoIterator<Item = T>,
    second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
    let mut merged: Vec<usize> = first
        .into_iter()
        .map(|e| *e.borrow())
        .chain(second.into_iter().map(|e| *e.borrow()))
        .collect();
    // After sorting, equal indices are adjacent and `dedup` removes them.
    merged.sort_unstable();
    merged.dedup();
    merged
}
783
/// Calculates the set difference between sequences `first` and `second`,
/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
///
/// Every element of `first` that does not occur in `second` is kept,
/// including repeated occurrences.
pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
    first: impl IntoIterator<Item = T>,
    second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
    let excluded: HashSet<usize> = second.into_iter().map(|e| *e.borrow()).collect();
    let mut remaining = Vec::new();
    for item in first {
        let value = *item.borrow();
        if !excluded.contains(&value) {
            remaining.push(value);
        }
    }
    remaining
}
797
798/// Find indices of each element in `targets` inside `items`. If one of the
799/// elements is absent in `items`, returns an error.
800pub fn find_indices<T: PartialEq, S: Borrow<T>>(
801    items: &[T],
802    targets: impl IntoIterator<Item = S>,
803) -> Result<Vec<usize>> {
804    targets
805        .into_iter()
806        .map(|target| items.iter().position(|e| target.borrow().eq(e)))
807        .collect::<Option<_>>()
808        .ok_or_else(|| _exec_datafusion_err!("Target not found"))
809}
810
/// Transposes the given vector of vectors.
///
/// The number of rows of the result equals the length of the first input
/// row. Items of longer input rows beyond that length are dropped, and
/// shorter input rows simply contribute fewer items. An empty input yields
/// an empty result.
pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
    let Some(first_row) = original.first() else {
        return Vec::new();
    };
    let width = first_row.len();

    let mut columns: Vec<Vec<T>> = Vec::with_capacity(width);
    columns.resize_with(width, Vec::new);

    for row in original {
        for (item, column) in row.into_iter().zip(columns.iter_mut()) {
            column.push(item);
        }
    }
    columns
}
826
/// Computes the `skip` and `fetch` parameters of a single limit that would be
/// equivalent to two consecutive limits with the given `skip`/`fetch` parameters.
///
/// The combined skip is always `child_skip + parent_skip` (saturating). The
/// combined fetch is the smaller of the parent's fetch and what is left of
/// the child's fetch after the parent's skip; a missing fetch places no
/// bound.
///
/// There are multiple cases to consider:
///
/// # Case 0: Parent and child are disjoint (`child_fetch <= skip`).
///
///   Before merging:
/// ```text
///                     |........skip........|---fetch-->|     Parent limit
///    |...child_skip...|---child_fetch-->|                    Child limit
/// ```
///
///   After merging:
/// ```text
///    |.........(child_skip + skip).........|
/// ```
///
/// # Case 1: Parent is beyond child's range (`skip < child_fetch <= skip + fetch`).
///
///   Before merging:
/// ```text
///                     |...skip...|------------fetch------------>|   Parent limit
///    |...child_skip...|-------------child_fetch------------>|       Child limit
/// ```
///
///   After merging:
/// ```text
///    |....(child_skip + skip)....|---(child_fetch - skip)-->|
/// ```
///
/// # Case 2: Parent is within child's range (`skip + fetch < child_fetch`).
///
///   Before merging:
/// ```text
///                     |...skip...|---fetch-->|                   Parent limit
///    |...child_skip...|-------------child_fetch------------>|    Child limit
/// ```
///
///   After merging:
/// ```text
///    |....(child_skip + skip)....|---fetch-->|
/// ```
pub fn combine_limit(
    parent_skip: usize,
    parent_fetch: Option<usize>,
    child_skip: usize,
    child_fetch: Option<usize>,
) -> (usize, Option<usize>) {
    // Rows the child still provides once the parent has skipped its share.
    let child_remaining = child_fetch.map(|fetch| fetch.saturating_sub(parent_skip));

    let combined_fetch = match (parent_fetch, child_remaining) {
        (Some(parent), Some(child)) => Some(min(parent, child)),
        // At most one side sets a bound: use it, or no bound at all.
        (fetch, None) | (None, fetch) => fetch,
    };

    (child_skip.saturating_add(parent_skip), combined_fetch)
}
889
/// Returns the estimated number of threads available for parallel execution.
///
/// This is a wrapper around `std::thread::available_parallelism`, providing a default value
/// of `1` if the system's parallelism cannot be determined.
pub fn get_available_parallelism() -> usize {
    match available_parallelism() {
        Ok(threads) => threads.get(),
        Err(_) => 1,
    }
}
899
900/// Converts a collection of function arguments into a fixed-size array of length N
901/// producing a reasonable error message in case of unexpected number of arguments.
902///
903/// # Example
904/// ```
905/// # use datafusion_common::Result;
906/// # use datafusion_common::utils::take_function_args;
907/// # use datafusion_common::ScalarValue;
908/// fn my_function(args: &[ScalarValue]) -> Result<()> {
909///     // function expects 2 args, so create a 2-element array
910///     let [arg1, arg2] = take_function_args("my_function", args)?;
911///     // ... do stuff..
912///     Ok(())
913/// }
914///
915/// // Calling the function with 1 argument produces an error:
916/// let args = vec![ScalarValue::Int32(Some(10))];
917/// let err = my_function(&args).unwrap_err();
918/// assert_eq!(
919///     err.to_string(),
920///     "Execution error: my_function function requires 2 arguments, got 1"
921/// );
922/// // Calling the function with 2 arguments works great
923/// let args = vec![ScalarValue::Int32(Some(10)), ScalarValue::Int32(Some(20))];
924/// my_function(&args).unwrap();
925/// ```
926pub fn take_function_args<const N: usize, T>(
927    function_name: &str,
928    args: impl IntoIterator<Item = T>,
929) -> Result<[T; N]> {
930    let args = args.into_iter().collect::<Vec<_>>();
931    args.try_into().map_err(|v: Vec<T>| {
932        _exec_datafusion_err!(
933            "{} function requires {} {}, got {}",
934            function_name,
935            N,
936            if N == 1 { "argument" } else { "arguments" },
937            v.len()
938        )
939    })
940}
941
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ScalarValue::Null;
    use arrow::array::Float64Array;
    // Only `test_quote_identifier` needs these, and that test is compiled only
    // with the `sql` feature, so the imports carry the same gate. Without it,
    // building the tests with the feature disabled leaves the imports unused
    // (or unresolved when `sqlparser` is not available).
    #[cfg(feature = "sql")]
    use sqlparser::ast::Ident;
    #[cfg(feature = "sql")]
    use sqlparser::tokenizer::Span;

    /// `bisect` and `linear_search` must agree on the left (`true`) and right
    /// (`false`) insertion points of a four-column search tuple.
    #[test]
    fn test_bisect_linear_left_and_right() -> Result<()> {
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.])),
            Arc::new(Float64Array::from(vec![2.0, 3.0, 3.0, 4.0, 5.0])),
            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 10., 11.0])),
            Arc::new(Float64Array::from(vec![15.0, 13.0, 8.0, 5., 0.0])),
        ];
        let search_tuple: Vec<ScalarValue> = vec![
            ScalarValue::Float64(Some(8.0)),
            ScalarValue::Float64(Some(3.0)),
            ScalarValue::Float64(Some(8.0)),
            ScalarValue::Float64(Some(8.0)),
        ];
        // The first three columns are ascending, the last one is descending.
        let ords = [
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: true,
                nulls_first: true,
            },
        ];
        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);
        Ok(())
    }

    /// Vectors compare element by element, both for plain integers and for
    /// `ScalarValue`s that contain nulls in matching positions.
    #[test]
    fn vector_ord() {
        assert!(vec![1, 0, 0, 0, 0, 0, 0, 1] < vec![1, 0, 0, 0, 0, 0, 0, 2]);
        assert!(vec![1, 0, 0, 0, 0, 0, 1, 1] > vec![1, 0, 0, 0, 0, 0, 0, 2]);
        assert!(
            vec![
                ScalarValue::Int32(Some(2)),
                Null,
                ScalarValue::Int32(Some(0)),
            ] < vec![
                ScalarValue::Int32(Some(2)),
                Null,
                ScalarValue::Int32(Some(1)),
            ]
        );
        assert!(
            vec![
                ScalarValue::Int32(Some(2)),
                ScalarValue::Int32(None),
                ScalarValue::Int32(Some(0)),
            ] < vec![
                ScalarValue::Int32(Some(2)),
                ScalarValue::Int32(None),
                ScalarValue::Int32(Some(1)),
            ]
        );
    }

    /// Two `ScalarValue`s of the same variant are ordered by their payload.
    #[test]
    fn ord_same_type() {
        assert!((ScalarValue::Int32(Some(2)) < ScalarValue::Int32(Some(3))));
    }

    /// Checks left/right insertion points for ascending and descending
    /// single-column inputs, then for a two-column input with mixed ordering.
    #[test]
    fn test_bisect_linear_left_and_right_diff_sort() -> Result<()> {
        // Descending, left
        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
        let ords = [SortOptions {
            descending: true,
            nulls_first: true,
        }];
        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 0);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 0);

        // Descending, right
        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
        let ords = [SortOptions {
            descending: true,
            nulls_first: true,
        }];
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);

        // Ascending, left
        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
        let ords = [SortOptions {
            descending: false,
            nulls_first: true,
        }];
        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);

        // Ascending, right
        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
        let ords = [SortOptions {
            descending: false,
            nulls_first: true,
        }];
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);

        // Two columns: first ascending, second descending
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 8.0, 9., 10.])),
            Arc::new(Float64Array::from(vec![10.0, 9.0, 8.0, 7.5, 7., 6.])),
        ];
        let search_tuple: Vec<ScalarValue> = vec![
            ScalarValue::Float64(Some(8.0)),
            ScalarValue::Float64(Some(8.0)),
        ];
        let ords = [
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: true,
                nulls_first: true,
            },
        ];
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);

        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        Ok(())
    }

    /// Consecutive rows that share the same values in every sort column must
    /// end up in the same range.
    #[test]
    fn test_evaluate_partition_ranges() -> Result<()> {
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Float64Array::from(vec![1.0, 1.0, 1.0, 2.0, 2.0, 2.0])),
            Arc::new(Float64Array::from(vec![4.0, 4.0, 3.0, 2.0, 1.0, 1.0])),
        ];
        let n_row = arrays[0].len();
        let options: Vec<SortOptions> = vec![
            SortOptions {
                descending: false,
                nulls_first: false,
            },
            SortOptions {
                descending: true,
                nulls_first: false,
            },
        ];
        let sort_columns = arrays
            .into_iter()
            .zip(options)
            .map(|(values, options)| SortColumn {
                values,
                options: Some(options),
            })
            .collect::<Vec<_>>();
        let ranges = evaluate_partition_ranges(n_row, &sort_columns)?;
        assert_eq!(ranges.len(), 4);
        assert_eq!(ranges[0], Range { start: 0, end: 2 });
        assert_eq!(ranges[1], Range { start: 2, end: 3 });
        assert_eq!(ranges[2], Range { start: 3, end: 4 });
        assert_eq!(ranges[3], Range { start: 4, end: 6 });
        Ok(())
    }

    /// Each input must be quoted as expected, and the quoted form must parse
    /// back into a single identifier carrying the original text.
    #[cfg(feature = "sql")]
    #[test]
    fn test_quote_identifier() -> Result<()> {
        let cases = vec![
            ("foo", r#"foo"#),
            ("_foo", r#"_foo"#),
            ("foo_bar", r#"foo_bar"#),
            ("foo-bar", r#""foo-bar""#),
            // name itself has a period, needs to be quoted
            ("foo.bar", r#""foo.bar""#),
            ("Foo", r#""Foo""#),
            ("Foo.Bar", r#""Foo.Bar""#),
            // name starting with a number needs to be quoted
            ("test1", r#"test1"#),
            ("1test", r#""1test""#),
        ];

        for (identifier, quoted_identifier) in cases {
            println!("input: \n{identifier}\nquoted_identifier:\n{quoted_identifier}");

            assert_eq!(quote_identifier(identifier), quoted_identifier);

            // When parsing the quoted identifier, it should be a
            // single identifier without normalization, and not in multiple parts
            let quote_style = if quoted_identifier.starts_with('"') {
                Some('"')
            } else {
                None
            };

            let expected_parsed = vec![Ident {
                value: identifier.to_string(),
                quote_style,
                span: Span::empty(),
            }];

            assert_eq!(
                parse_identifiers(quoted_identifier).unwrap(),
                expected_parsed
            );
        }

        Ok(())
    }

    /// Elements are returned in the order of the requested indices; an
    /// out-of-range index is an error.
    #[test]
    fn test_get_at_indices() -> Result<()> {
        let in_vec = vec![1, 2, 3, 4, 5, 6, 7];
        assert_eq!(get_at_indices(&in_vec, [0, 2])?, vec![1, 3]);
        assert_eq!(get_at_indices(&in_vec, [4, 2])?, vec![5, 3]);
        // 7 is outside the range
        assert!(get_at_indices(&in_vec, [7]).is_err());
        Ok(())
    }

    /// The result counts the leading entries that match `0, 1, 2, ...`.
    #[test]
    fn test_longest_consecutive_prefix() {
        assert_eq!(longest_consecutive_prefix([0, 3, 4]), 1);
        assert_eq!(longest_consecutive_prefix([0, 1, 3, 4]), 2);
        assert_eq!(longest_consecutive_prefix([0, 1, 2, 3, 4]), 5);
        assert_eq!(longest_consecutive_prefix([1, 2, 3, 4]), 0);
    }

    /// The merged result is deduplicated and sorted.
    #[test]
    fn test_merge_and_order_indices() {
        assert_eq!(
            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
            vec![0, 1, 3, 4, 5]
        );
        // Result should be ordered, even if inputs are not
        assert_eq!(
            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
            vec![0, 1, 3, 4, 5]
        );
    }

    /// Entries of the second input are removed from the first, which keeps
    /// its original order.
    #[test]
    fn test_set_difference() {
        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
        // return value should have same ordering with the in1
        assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
        assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
        assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
    }

    /// Each target is mapped to its position in the searched slice; a target
    /// that is missing from the slice is an error.
    #[test]
    fn test_find_indices() -> Result<()> {
        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
        Ok(())
    }

    /// A 2x3 input becomes a 3x2 output.
    #[test]
    fn test_transpose() -> Result<()> {
        let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]];
        let transposed = transpose(in_data);
        let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]];
        assert_eq!(expected, transposed);
        Ok(())
    }
}