datafusion_common/utils/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Common utility functions used across DataFusion, including the bisect function, which implements binary search over sorted columns.
19
20pub mod expr;
21pub mod memory;
22pub mod proxy;
23pub mod string_utils;
24
25use crate::assert_or_internal_err;
26use crate::error::{_exec_datafusion_err, _internal_datafusion_err};
27use crate::{Result, ScalarValue};
28use arrow::array::{
29    Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray, OffsetSizeTrait,
30    cast::AsArray,
31};
32use arrow::buffer::OffsetBuffer;
33use arrow::compute::{SortColumn, SortOptions, partition};
34use arrow::datatypes::{DataType, Field, SchemaRef};
35#[cfg(feature = "sql")]
36use sqlparser::{ast::Ident, dialect::GenericDialect, parser::Parser};
37use std::borrow::{Borrow, Cow};
38use std::cmp::{Ordering, min};
39use std::collections::HashSet;
40use std::num::NonZero;
41use std::ops::Range;
42use std::sync::Arc;
43use std::thread::available_parallelism;
44
45/// Applies an optional projection to a [`SchemaRef`], returning the
46/// projected schema
47///
48/// Example:
49/// ```
50/// use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
51/// use datafusion_common::project_schema;
52///
53/// // Schema with columns 'a', 'b', and 'c'
54/// let schema = SchemaRef::new(Schema::new(vec![
55///     Field::new("a", DataType::Int32, true),
56///     Field::new("b", DataType::Int64, true),
57///     Field::new("c", DataType::Utf8, true),
58/// ]));
59///
60/// // Pick columns 'c' and 'b'
61/// let projection = Some(vec![2, 1]);
62/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap();
63///
64/// let expected_schema = SchemaRef::new(Schema::new(vec![
65///     Field::new("c", DataType::Utf8, true),
66///     Field::new("b", DataType::Int64, true),
67/// ]));
68///
69/// assert_eq!(projected_schema, expected_schema);
70/// ```
71pub fn project_schema(
72    schema: &SchemaRef,
73    projection: Option<&Vec<usize>>,
74) -> Result<SchemaRef> {
75    let schema = match projection {
76        Some(columns) => Arc::new(schema.project(columns)?),
77        None => Arc::clone(schema),
78    };
79    Ok(schema)
80}
81
82/// Extracts a row at the specified index from a set of columns and stores it in the provided buffer.
83pub fn extract_row_at_idx_to_buf(
84    columns: &[ArrayRef],
85    idx: usize,
86    buf: &mut Vec<ScalarValue>,
87) -> Result<()> {
88    buf.clear();
89
90    let iter = columns
91        .iter()
92        .map(|arr| ScalarValue::try_from_array(arr, idx));
93    for v in iter {
94        buf.push(v?);
95    }
96
97    Ok(())
98}
99/// Given column vectors, returns row at `idx`.
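///
/// # Example
///
/// A minimal illustrative sketch of extracting one row as `ScalarValue`s:
/// ```
/// use std::sync::Arc;
/// use arrow::array::{ArrayRef, Int32Array};
/// use datafusion_common::ScalarValue;
/// use datafusion_common::utils::get_row_at_idx;
///
/// // Two columns with three rows each
/// let columns: Vec<ArrayRef> = vec![
///     Arc::new(Int32Array::from(vec![1, 2, 3])),
///     Arc::new(Int32Array::from(vec![4, 5, 6])),
/// ];
/// // Row 1 is (2, 5)
/// let row = get_row_at_idx(&columns, 1).unwrap();
/// assert_eq!(row, vec![ScalarValue::Int32(Some(2)), ScalarValue::Int32(Some(5))]);
/// ```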
100pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
101    columns
102        .iter()
103        .map(|arr| ScalarValue::try_from_array(arr, idx))
104        .collect()
105}
106
107/// This function compares two tuples depending on the given sort options.
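///
/// # Example
///
/// A minimal illustrative sketch comparing two rows lexicographically with
/// default (ascending, nulls-first) sort options:
/// ```
/// use std::cmp::Ordering;
/// use arrow::compute::SortOptions;
/// use datafusion_common::ScalarValue;
/// use datafusion_common::utils::compare_rows;
///
/// let x = vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(5))];
/// let y = vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(3))];
/// let options = vec![SortOptions::default(); 2];
/// // The first entries tie, so the second entries decide: 5 > 3
/// assert_eq!(compare_rows(&x, &y, &options).unwrap(), Ordering::Greater);
/// ```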
108pub fn compare_rows(
109    x: &[ScalarValue],
110    y: &[ScalarValue],
111    sort_options: &[SortOptions],
112) -> Result<Ordering> {
113    let zip_it = x.iter().zip(y.iter()).zip(sort_options.iter());
114    // Preserving lexical ordering.
115    for ((lhs, rhs), sort_options) in zip_it {
116        // Consider all combinations of NULLS FIRST/LAST and ASC/DESC configurations.
117        let result = match (lhs.is_null(), rhs.is_null(), sort_options.nulls_first) {
118            (true, false, false) | (false, true, true) => Ordering::Greater,
119            (true, false, true) | (false, true, false) => Ordering::Less,
120            (false, false, _) => {
121                if sort_options.descending {
122                    rhs.try_cmp(lhs)?
123                } else {
124                    lhs.try_cmp(rhs)?
125                }
126            }
127            (true, true, _) => continue,
128        };
129        if result != Ordering::Equal {
130            return Ok(result);
131        }
132    }
133    Ok(Ordering::Equal)
134}
135
136/// This function searches for a tuple of given values (`target`) among the given
137/// rows (`item_columns`) using the bisection algorithm. It assumes that `item_columns`
138/// is sorted according to `sort_options` and returns the insertion index of `target`.
139/// The const generic argument `SIDE` being `true`/`false` means left/right insertion.
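///
/// # Example
///
/// A minimal illustrative sketch on a single ascending column (the values
/// mirror the unit tests at the bottom of this module):
/// ```
/// use std::sync::Arc;
/// use arrow::array::{ArrayRef, Float64Array};
/// use arrow::compute::SortOptions;
/// use datafusion_common::ScalarValue;
/// use datafusion_common::utils::bisect;
///
/// let columns: Vec<ArrayRef> =
///     vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9.0, 10.0]))];
/// let target = vec![ScalarValue::Float64(Some(7.0))];
/// let options = vec![SortOptions {
///     descending: false,
///     nulls_first: true,
/// }];
/// // Left insertion point of 7.0 is index 1, right insertion point is index 2
/// assert_eq!(bisect::<true>(&columns, &target, &options).unwrap(), 1);
/// assert_eq!(bisect::<false>(&columns, &target, &options).unwrap(), 2);
/// ```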
140pub fn bisect<const SIDE: bool>(
141    item_columns: &[ArrayRef],
142    target: &[ScalarValue],
143    sort_options: &[SortOptions],
144) -> Result<usize> {
145    let low: usize = 0;
146    let high: usize = item_columns
147        .first()
148        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
149        .len();
150    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
151        let cmp = compare_rows(current, target, sort_options)?;
152        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
153    };
154    find_bisect_point(item_columns, target, compare_fn, low, high)
155}
156
157/// This function searches for a tuple of given values (`target`) among a slice of
158/// the given rows (`item_columns`) using the bisection algorithm. The slice starts
159/// at the index `low` and ends at the index `high`. The boolean-valued function
160/// `compare_fn` specifies whether we bisect on the left (by returning `false`),
161/// or on the right (by returning `true`) when we compare the target value with
162/// the current value as we iteratively bisect the input.
163pub fn find_bisect_point<F>(
164    item_columns: &[ArrayRef],
165    target: &[ScalarValue],
166    compare_fn: F,
167    mut low: usize,
168    mut high: usize,
169) -> Result<usize>
170where
171    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
172{
173    while low < high {
174        let mid = ((high - low) / 2) + low;
175        let val = get_row_at_idx(item_columns, mid)?;
176        if compare_fn(&val, target)? {
177            low = mid + 1;
178        } else {
179            high = mid;
180        }
181    }
182    Ok(low)
183}
184
185/// This function searches for a tuple of given values (`target`) among the given
186/// rows (`item_columns`) via a linear scan. It assumes that `item_columns` is sorted
187/// according to `sort_options` and returns the insertion index of `target`.
188/// The const generic argument `SIDE` being `true`/`false` means left/right insertion.
189pub fn linear_search<const SIDE: bool>(
190    item_columns: &[ArrayRef],
191    target: &[ScalarValue],
192    sort_options: &[SortOptions],
193) -> Result<usize> {
194    let low: usize = 0;
195    let high: usize = item_columns
196        .first()
197        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
198        .len();
199    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
200        let cmp = compare_rows(current, target, sort_options)?;
201        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
202    };
203    search_in_slice(item_columns, target, compare_fn, low, high)
204}
205
206/// This function searches for a tuple of given values (`target`) among a slice of
207/// the given rows (`item_columns`) via a linear scan. The slice starts at the index
208/// `low` and ends at the index `high`. The boolean-valued function `compare_fn`
209/// specifies the stopping criterion.
210pub fn search_in_slice<F>(
211    item_columns: &[ArrayRef],
212    target: &[ScalarValue],
213    compare_fn: F,
214    mut low: usize,
215    high: usize,
216) -> Result<usize>
217where
218    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
219{
220    while low < high {
221        let val = get_row_at_idx(item_columns, low)?;
222        if !compare_fn(&val, target)? {
223            break;
224        }
225        low += 1;
226    }
227    Ok(low)
228}
229
230/// Given a list of 0 or more already sorted columns, finds the ranges of
231/// consecutive rows whose values are equal across all partition columns.
232///
233/// See [`partition`] for more details.
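///
/// # Example
///
/// A minimal illustrative sketch over a single pre-sorted column:
/// ```
/// use std::sync::Arc;
/// use arrow::array::Int32Array;
/// use arrow::compute::{SortColumn, SortOptions};
/// use datafusion_common::utils::evaluate_partition_ranges;
///
/// // Column already sorted ascending: [1, 1, 2, 2, 2]
/// let column = SortColumn {
///     values: Arc::new(Int32Array::from(vec![1, 1, 2, 2, 2])),
///     options: Some(SortOptions {
///         descending: false,
///         nulls_first: true,
///     }),
/// };
/// let ranges = evaluate_partition_ranges(5, &[column]).unwrap();
/// assert_eq!(ranges.len(), 2);
/// assert_eq!(ranges[0], (0..2));
/// assert_eq!(ranges[1], (2..5));
/// ```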
234pub fn evaluate_partition_ranges(
235    num_rows: usize,
236    partition_columns: &[SortColumn],
237) -> Result<Vec<Range<usize>>> {
238    Ok(if partition_columns.is_empty() {
239        vec![Range {
240            start: 0,
241            end: num_rows,
242        }]
243    } else {
244        let cols: Vec<_> = partition_columns
245            .iter()
246            .map(|x| Arc::clone(&x.values))
247            .collect();
248        partition(&cols)?.ranges()
249    })
250}
251
252/// Wraps identifier string in double quotes, escaping any double quotes in
253/// the identifier by replacing it with two double quotes
254///
255/// e.g. identifier `tab.le"name` becomes `"tab.le""name"`
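///
/// # Example
///
/// A few illustrative cases (see also the unit tests below):
/// ```
/// use datafusion_common::utils::quote_identifier;
///
/// assert_eq!(quote_identifier("foo"), "foo");
/// assert_eq!(quote_identifier("foo.bar"), r#""foo.bar""#);
/// assert_eq!(quote_identifier(r#"tab.le"name"#), r#""tab.le""name""#);
/// ```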
256pub fn quote_identifier(s: &str) -> Cow<'_, str> {
257    if needs_quotes(s) {
258        Cow::Owned(format!("\"{}\"", s.replace('"', "\"\"")))
259    } else {
260        Cow::Borrowed(s)
261    }
262}
263
264/// returns true if this identifier needs quotes
265fn needs_quotes(s: &str) -> bool {
266    let mut chars = s.chars();
267
268    // quoting is needed unless the first character is a lowercase ASCII letter or underscore
269    if let Some(first_char) = chars.next()
270        && !(first_char.is_ascii_lowercase() || first_char == '_')
271    {
272        return true;
273    }
274
275    !chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
276}
277
278#[cfg(feature = "sql")]
279pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
280    let dialect = GenericDialect;
281    let mut parser = Parser::new(&dialect).try_with_sql(s)?;
282    let idents = parser.parse_multipart_identifier()?;
283    Ok(idents)
284}
285
286/// Parse a string into a vector of identifiers.
287///
288/// Note: If ignore_case is false, unquoted identifiers are normalized to lowercase; quoted identifiers are kept as-is.
289#[cfg(feature = "sql")]
290pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
291    parse_identifiers(s)
292        .unwrap_or_default()
293        .into_iter()
294        .map(|id| match id.quote_style {
295            Some(_) => id.value,
296            None if ignore_case => id.value,
297            _ => id.value.to_ascii_lowercase(),
298        })
299        .collect::<Vec<_>>()
300}
301
302#[cfg(not(feature = "sql"))]
303pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<String>> {
304    let mut result = Vec::new();
305    let mut current = String::new();
306    let mut in_quotes = false;
307
308    for ch in s.chars() {
309        match ch {
310            '"' => {
311                in_quotes = !in_quotes;
312                current.push(ch);
313            }
314            '.' if !in_quotes => {
315                result.push(current.clone());
316                current.clear();
317            }
318            _ => {
319                current.push(ch);
320            }
321        }
322    }
323
324    // Push the last part if it's not empty
325    if !current.is_empty() {
326        result.push(current);
327    }
328
329    Ok(result)
330}
331
332#[cfg(not(feature = "sql"))]
333pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
334    parse_identifiers(s)
335        .unwrap_or_default()
336        .into_iter()
337        .map(|id| {
338            let is_double_quoted = if id.len() > 2 {
339                let mut chars = id.chars();
340                chars.next() == Some('"') && chars.last() == Some('"')
341            } else {
342                false
343            };
344            if is_double_quoted {
345                id[1..id.len() - 1].to_string().replace("\"\"", "\"")
346            } else if ignore_case {
347                id
348            } else {
349                id.to_ascii_lowercase()
350            }
351        })
352        .collect::<Vec<_>>()
353}
354
355/// This function "takes" the elements at `indices` from the slice `items`.
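///
/// # Example
///
/// A minimal illustrative sketch:
/// ```
/// use datafusion_common::utils::get_at_indices;
///
/// let items = vec![10, 20, 30, 40];
/// assert_eq!(get_at_indices(&items, [2, 0]).unwrap(), vec![30, 10]);
/// // Out-of-range indices produce an error
/// assert!(get_at_indices(&items, [4]).is_err());
/// ```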
356pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
357    items: &[T],
358    indices: impl IntoIterator<Item = I>,
359) -> Result<Vec<T>> {
360    indices
361        .into_iter()
362        .map(|idx| items.get(*idx.borrow()).cloned())
363        .collect::<Option<Vec<T>>>()
364        .ok_or_else(|| {
365            _exec_datafusion_err!("Expects indices to be in the range of searched vector")
366        })
367}
368
369/// This function finds the longest prefix of the form 0, 1, 2, ... within the
370/// collection `sequence`. Examples:
371/// - For 0, 1, 2, 4, 5; we would produce 3, meaning 0, 1, 2 is the longest satisfying
372///   prefix.
373/// - For 1, 2, 3, 4; we would produce 0, meaning there is no such prefix.
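///
/// # Example
///
/// A minimal illustrative sketch of the two cases above:
/// ```
/// use datafusion_common::utils::longest_consecutive_prefix;
///
/// assert_eq!(longest_consecutive_prefix([0, 1, 2, 4, 5]), 3);
/// assert_eq!(longest_consecutive_prefix([1, 2, 3, 4]), 0);
/// ```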
374pub fn longest_consecutive_prefix<T: Borrow<usize>>(
375    sequence: impl IntoIterator<Item = T>,
376) -> usize {
377    let mut count = 0;
378    for item in sequence {
379        if !count.eq(item.borrow()) {
380            break;
381        }
382        count += 1;
383    }
384    count
385}
386
387/// Creates a single-element [`ListArray`], [`LargeListArray`] or
388/// [`FixedSizeListArray`] from another array
389///
390/// For example this builder can convert `[1, 2, 3]` into `[[1, 2, 3]]`
391///
392/// # Example
393/// ```
394/// # use std::sync::Arc;
395/// # use arrow::array::{Array, ListArray};
396/// # use arrow::array::types::Int64Type;
397/// # use datafusion_common::utils::SingleRowListArrayBuilder;
398/// // The input is a single-row ListArray: [[1, 2, 3]]
399/// let arr = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![Some(vec![
400///     Some(1),
401///     Some(2),
402///     Some(3),
403/// ])]);
404/// // Wrapping it as a single row yields: [[[1, 2, 3]]]
405/// let list_arr = SingleRowListArrayBuilder::new(Arc::new(arr)).build_list_array();
406/// assert_eq!(list_arr.len(), 1);
407/// ```
408#[derive(Debug, Clone)]
409pub struct SingleRowListArrayBuilder {
410    /// array to be wrapped
411    arr: ArrayRef,
412    /// Should the resulting array be nullable? Defaults to `true`.
413    nullable: bool,
414    /// Specify the field name for the resulting array. Defaults to value used in
415    /// [`Field::new_list_field`]
416    field_name: Option<String>,
417}
418
419impl SingleRowListArrayBuilder {
420    /// Create a new instance of [`SingleRowListArrayBuilder`]
421    pub fn new(arr: ArrayRef) -> Self {
422        Self {
423            arr,
424            nullable: true,
425            field_name: None,
426        }
427    }
428
429    /// Set the nullable flag
430    pub fn with_nullable(mut self, nullable: bool) -> Self {
431        self.nullable = nullable;
432        self
433    }
434
435    /// Sets the field name for the resulting array
436    pub fn with_field_name(mut self, field_name: Option<String>) -> Self {
437        self.field_name = field_name;
438        self
439    }
440
441    /// Copies field name and nullable from the specified field
442    pub fn with_field(self, field: &Field) -> Self {
443        self.with_field_name(Some(field.name().to_owned()))
444            .with_nullable(field.is_nullable())
445    }
446
447    /// Build a single element [`ListArray`]
448    pub fn build_list_array(self) -> ListArray {
449        let (field, arr) = self.into_field_and_arr();
450        let offsets = OffsetBuffer::from_lengths([arr.len()]);
451        ListArray::new(field, offsets, arr, None)
452    }
453
454    /// Build a single element [`ListArray`] and wrap as [`ScalarValue::List`]
455    pub fn build_list_scalar(self) -> ScalarValue {
456        ScalarValue::List(Arc::new(self.build_list_array()))
457    }
458
459    /// Build a single element [`LargeListArray`]
460    pub fn build_large_list_array(self) -> LargeListArray {
461        let (field, arr) = self.into_field_and_arr();
462        let offsets = OffsetBuffer::from_lengths([arr.len()]);
463        LargeListArray::new(field, offsets, arr, None)
464    }
465
466    /// Build a single element [`LargeListArray`] and wrap as [`ScalarValue::LargeList`]
467    pub fn build_large_list_scalar(self) -> ScalarValue {
468        ScalarValue::LargeList(Arc::new(self.build_large_list_array()))
469    }
470
471    /// Build a single element [`FixedSizeListArray`]
472    pub fn build_fixed_size_list_array(self, list_size: usize) -> FixedSizeListArray {
473        let (field, arr) = self.into_field_and_arr();
474        FixedSizeListArray::new(field, list_size as i32, arr, None)
475    }
476
477    /// Build a single element [`FixedSizeListArray`] and wrap as [`ScalarValue::FixedSizeList`]
478    pub fn build_fixed_size_list_scalar(self, list_size: usize) -> ScalarValue {
479        ScalarValue::FixedSizeList(Arc::new(self.build_fixed_size_list_array(list_size)))
480    }
481
482    /// Helper function: convert this builder into a tuple of field and array
483    fn into_field_and_arr(self) -> (Arc<Field>, ArrayRef) {
484        let Self {
485            arr,
486            nullable,
487            field_name,
488        } = self;
489        let data_type = arr.data_type().to_owned();
490        let field = match field_name {
491            Some(name) => Field::new(name, data_type, nullable),
492            None => Field::new_list_field(data_type, nullable),
493        };
494        (Arc::new(field), arr)
495    }
496}
497
498/// Wrap arrays into a single element `ListArray`.
499///
500/// Example:
501/// ```
502/// use arrow::array::{Int32Array, ListArray, ArrayRef};
503/// use arrow::datatypes::{Int32Type, Field};
504/// use std::sync::Arc;
505///
506/// let arr1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
507/// let arr2 = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef;
508///
509/// let list_arr = datafusion_common::utils::arrays_into_list_array([arr1, arr2]).unwrap();
510///
511/// let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(
512///    vec![
513///     Some(vec![Some(1), Some(2), Some(3)]),
514///     Some(vec![Some(4), Some(5), Some(6)]),
515///    ]
516/// );
517///
518/// assert_eq!(list_arr, expected);
/// ```
519pub fn arrays_into_list_array(
520    arr: impl IntoIterator<Item = ArrayRef>,
521) -> Result<ListArray> {
522    let arr = arr.into_iter().collect::<Vec<_>>();
523    assert_or_internal_err!(!arr.is_empty(), "Cannot wrap empty array into list array");
524
525    let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
526    // Assume all input arrays share the same data type
527    let data_type = arr[0].data_type().to_owned();
528    let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
529    Ok(ListArray::new(
530        Arc::new(Field::new_list_field(data_type, true)),
531        OffsetBuffer::from_lengths(lens),
532        arrow::compute::concat(values.as_slice())?,
533        None,
534    ))
535}
536
537/// Helper function to convert a ListArray into a vector of ArrayRefs.
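///
/// # Example
///
/// A minimal illustrative sketch that splits `[[1, 2], [3]]` into its two rows:
/// ```
/// use std::sync::Arc;
/// use arrow::array::types::Int32Type;
/// use arrow::array::{Array, ArrayRef, ListArray};
/// use datafusion_common::utils::list_to_arrays;
///
/// let list: ArrayRef = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
///     Some(vec![Some(1), Some(2)]),
///     Some(vec![Some(3)]),
/// ]));
/// let rows = list_to_arrays::<i32>(&list);
/// assert_eq!(rows.len(), 2);
/// assert_eq!(rows[0].len(), 2);
/// assert_eq!(rows[1].len(), 1);
/// ```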
538pub fn list_to_arrays<O: OffsetSizeTrait>(a: &ArrayRef) -> Vec<ArrayRef> {
539    a.as_list::<O>().iter().flatten().collect::<Vec<_>>()
540}
541
542/// Helper function to convert a FixedSizeListArray into a vector of ArrayRefs.
543pub fn fixed_size_list_to_arrays(a: &ArrayRef) -> Vec<ArrayRef> {
544    a.as_fixed_size_list().iter().flatten().collect::<Vec<_>>()
545}
546
547/// Get the base type of a data type.
548///
549/// Example
550/// ```
551/// use arrow::datatypes::{DataType, Field};
552/// use datafusion_common::utils::base_type;
553/// use std::sync::Arc;
554///
555/// let data_type =
556///     DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
557/// assert_eq!(base_type(&data_type), DataType::Int32);
558///
559/// let data_type = DataType::Int32;
560/// assert_eq!(base_type(&data_type), DataType::Int32);
561/// ```
562pub fn base_type(data_type: &DataType) -> DataType {
563    match data_type {
564        DataType::List(field)
565        | DataType::LargeList(field)
566        | DataType::FixedSizeList(field, _) => base_type(field.data_type()),
567        _ => data_type.to_owned(),
568    }
569}
570
571/// Information about how to coerce lists.
572#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
573pub enum ListCoercion {
574    /// [`DataType::FixedSizeList`] should be coerced to [`DataType::List`].
575    FixedSizedListToList,
576}
577
578/// A helper function that coerces the innermost (base) type of a list to the given base type.
579///
580/// Example
581/// ```
582/// use arrow::datatypes::{DataType, Field};
583/// use datafusion_common::utils::coerced_type_with_base_type_only;
584/// use std::sync::Arc;
585///
586/// let data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
587/// let base_type = DataType::Float64;
588/// let coerced_type = coerced_type_with_base_type_only(&data_type, &base_type, None);
589/// assert_eq!(coerced_type, DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))));
/// ```
590pub fn coerced_type_with_base_type_only(
591    data_type: &DataType,
592    base_type: &DataType,
593    array_coercion: Option<&ListCoercion>,
594) -> DataType {
595    match (data_type, array_coercion) {
596        (DataType::List(field), _)
597        | (DataType::FixedSizeList(field, _), Some(ListCoercion::FixedSizedListToList)) =>
598        {
599            let field_type = coerced_type_with_base_type_only(
600                field.data_type(),
601                base_type,
602                array_coercion,
603            );
604
605            DataType::List(Arc::new(Field::new(
606                field.name(),
607                field_type,
608                field.is_nullable(),
609            )))
610        }
611        (DataType::FixedSizeList(field, len), _) => {
612            let field_type = coerced_type_with_base_type_only(
613                field.data_type(),
614                base_type,
615                array_coercion,
616            );
617
618            DataType::FixedSizeList(
619                Arc::new(Field::new(field.name(), field_type, field.is_nullable())),
620                *len,
621            )
622        }
623        (DataType::LargeList(field), _) => {
624            let field_type = coerced_type_with_base_type_only(
625                field.data_type(),
626                base_type,
627                array_coercion,
628            );
629
630            DataType::LargeList(Arc::new(Field::new(
631                field.name(),
632                field_type,
633                field.is_nullable(),
634            )))
635        }
636
637        _ => base_type.clone(),
638    }
639}
640
641/// Recursively coerces any `FixedSizeList` elements to `List`
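///
/// # Example
///
/// A minimal illustrative sketch:
/// ```
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field};
/// use datafusion_common::utils::coerced_fixed_size_list_to_list;
///
/// let fixed = DataType::FixedSizeList(
///     Arc::new(Field::new_list_field(DataType::Int32, true)),
///     3,
/// );
/// let expected = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
/// assert_eq!(coerced_fixed_size_list_to_list(&fixed), expected);
/// ```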
642pub fn coerced_fixed_size_list_to_list(data_type: &DataType) -> DataType {
643    match data_type {
644        DataType::List(field) | DataType::FixedSizeList(field, _) => {
645            let field_type = coerced_fixed_size_list_to_list(field.data_type());
646
647            DataType::List(Arc::new(Field::new(
648                field.name(),
649                field_type,
650                field.is_nullable(),
651            )))
652        }
653        DataType::LargeList(field) => {
654            let field_type = coerced_fixed_size_list_to_list(field.data_type());
655
656            DataType::LargeList(Arc::new(Field::new(
657                field.name(),
658                field_type,
659                field.is_nullable(),
660            )))
661        }
662
663        _ => data_type.clone(),
664    }
665}
666
667/// Compute the number of dimensions in a list data type.
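///
/// # Example
///
/// A minimal illustrative sketch:
/// ```
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field};
/// use datafusion_common::utils::list_ndims;
///
/// // List<List<Int32>> has two list dimensions
/// let nested = DataType::List(Arc::new(Field::new_list_field(
///     DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
///     true,
/// )));
/// assert_eq!(list_ndims(&nested), 2);
/// assert_eq!(list_ndims(&DataType::Int32), 0);
/// ```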
668pub fn list_ndims(data_type: &DataType) -> u64 {
669    match data_type {
670        DataType::List(field)
671        | DataType::LargeList(field)
672        | DataType::FixedSizeList(field, _) => 1 + list_ndims(field.data_type()),
673        _ => 0,
674    }
675}
676
677/// Adapted from the strsim-rs crate for string similarity metrics
678pub mod datafusion_strsim {
679    // Source: https://github.com/dguo/strsim-rs/blob/master/src/lib.rs
680    // License: https://github.com/dguo/strsim-rs/blob/master/LICENSE
681    use std::cmp::min;
682    use std::str::Chars;
683
684    struct StringWrapper<'a>(&'a str);
685
686    impl<'b> IntoIterator for &StringWrapper<'b> {
687        type Item = char;
688        type IntoIter = Chars<'b>;
689
690        fn into_iter(self) -> Self::IntoIter {
691            self.0.chars()
692        }
693    }
694
695    /// Calculates the minimum number of insertions, deletions, and substitutions
696    /// required to change one sequence into the other, using a reusable cache buffer.
697    ///
698    /// This is the generic implementation that works with any iterator types.
699    /// The `cache` buffer will be resized as needed and reused across calls.
700    fn generic_levenshtein_with_buffer<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
701        a: &'a Iter1,
702        b: &'b Iter2,
703        cache: &mut Vec<usize>,
704    ) -> usize
705    where
706        &'a Iter1: IntoIterator<Item = Elem1>,
707        &'b Iter2: IntoIterator<Item = Elem2>,
708        Elem1: PartialEq<Elem2>,
709    {
710        let b_len = b.into_iter().count();
711
712        if a.into_iter().next().is_none() {
713            return b_len;
714        }
715
716        // Reset the cache and seed it with the first row of distances (1..=b_len)
717        cache.clear();
718        cache.extend(1..=b_len);
719
720        let mut result = 0;
721
722        for (i, a_elem) in a.into_iter().enumerate() {
723            result = i + 1;
724            let mut distance_b = i;
725
726            for (j, b_elem) in b.into_iter().enumerate() {
727                let cost = if a_elem == b_elem { 0usize } else { 1usize };
728                let distance_a = distance_b + cost;
729                distance_b = cache[j];
730                result = min(result + 1, min(distance_a, distance_b + 1));
731                cache[j] = result;
732            }
733        }
734
735        result
736    }
737
738    /// Calculates the minimum number of insertions, deletions, and substitutions
739    /// required to change one sequence into the other.
740    fn generic_levenshtein<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
741        a: &'a Iter1,
742        b: &'b Iter2,
743    ) -> usize
744    where
745        &'a Iter1: IntoIterator<Item = Elem1>,
746        &'b Iter2: IntoIterator<Item = Elem2>,
747        Elem1: PartialEq<Elem2>,
748    {
749        let mut cache = Vec::new();
750        generic_levenshtein_with_buffer(a, b, &mut cache)
751    }
752
753    /// Calculates the minimum number of insertions, deletions, and substitutions
754    /// required to change one string into the other.
755    ///
756    /// ```
757    /// use datafusion_common::utils::datafusion_strsim::levenshtein;
758    ///
759    /// assert_eq!(3, levenshtein("kitten", "sitting"));
760    /// ```
761    pub fn levenshtein(a: &str, b: &str) -> usize {
762        generic_levenshtein(&StringWrapper(a), &StringWrapper(b))
763    }
764
765    /// Calculates the Levenshtein distance using a reusable cache buffer.
766    /// This avoids allocating a new Vec for each call, improving performance
767    /// when computing many distances.
768    ///
769    /// The `cache` buffer will be resized as needed and reused across calls.
770    pub fn levenshtein_with_buffer(a: &str, b: &str, cache: &mut Vec<usize>) -> usize {
771        generic_levenshtein_with_buffer(&StringWrapper(a), &StringWrapper(b), cache)
772    }
773
774    /// Calculates the normalized Levenshtein similarity between two strings, i.e.
775    /// 1.0 minus the normalized distance: a value between 0.0 and 1.0, where 1.0
776    /// indicates that the strings are identical and 0.0 indicates no similarity.
777    ///
778    /// ```
779    /// use datafusion_common::utils::datafusion_strsim::normalized_levenshtein;
780    ///
781    /// assert!((normalized_levenshtein("kitten", "sitting") - 0.57142).abs() < 0.00001);
782    ///
783    /// assert!(normalized_levenshtein("", "second").abs() < 0.00001);
784    ///
785    /// assert!((normalized_levenshtein("kitten", "sitten") - 0.833).abs() < 0.001);
786    /// ```
787    pub fn normalized_levenshtein(a: &str, b: &str) -> f64 {
788        if a.is_empty() && b.is_empty() {
789            return 1.0;
790        }
791        1.0 - (levenshtein(a, b) as f64)
792            / (a.chars().count().max(b.chars().count()) as f64)
793    }
794}
795
796/// Merges collections `first` and `second`, removes duplicates and sorts the
797/// result, returning it as a [`Vec`].
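///
/// # Example
///
/// A minimal illustrative sketch (mirroring the unit tests below):
/// ```
/// use datafusion_common::utils::merge_and_order_indices;
///
/// assert_eq!(merge_and_order_indices([3, 0, 4], [5, 1, 3]), vec![0, 1, 3, 4, 5]);
/// ```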
798pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
799    first: impl IntoIterator<Item = T>,
800    second: impl IntoIterator<Item = S>,
801) -> Vec<usize> {
802    let mut result: Vec<_> = first
803        .into_iter()
804        .map(|e| *e.borrow())
805        .chain(second.into_iter().map(|e| *e.borrow()))
806        .collect::<HashSet<_>>()
807        .into_iter()
808        .collect();
809    result.sort();
810    result
811}
812
813/// Calculates the set difference between sequences `first` and `second`,
814/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
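///
/// # Example
///
/// A minimal illustrative sketch (mirroring the unit tests below):
/// ```
/// use datafusion_common::utils::set_difference;
///
/// assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
/// ```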
815pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
816    first: impl IntoIterator<Item = T>,
817    second: impl IntoIterator<Item = S>,
818) -> Vec<usize> {
819    let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
820    first
821        .into_iter()
822        .map(|e| *e.borrow())
823        .filter(|e| !set.contains(e))
824        .collect()
825}
826
827/// Find indices of each element in `targets` inside `items`. If one of the
828/// elements is absent in `items`, returns an error.
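///
/// # Example
///
/// A minimal illustrative sketch (mirroring the unit tests below):
/// ```
/// use datafusion_common::utils::find_indices;
///
/// assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3]).unwrap(), vec![0, 2, 1]);
/// assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
/// ```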
829pub fn find_indices<T: PartialEq, S: Borrow<T>>(
830    items: &[T],
831    targets: impl IntoIterator<Item = S>,
832) -> Result<Vec<usize>> {
833    targets
834        .into_iter()
835        .map(|target| items.iter().position(|e| target.borrow().eq(e)))
836        .collect::<Option<_>>()
837        .ok_or_else(|| _exec_datafusion_err!("Target not found"))
838}
839
840/// Transposes the given vector of vectors.
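///
/// # Example
///
/// A minimal illustrative sketch (mirroring the unit tests below):
/// ```
/// use datafusion_common::utils::transpose;
///
/// let original = vec![vec![1, 2, 3], vec![4, 5, 6]];
/// assert_eq!(transpose(original), vec![vec![1, 4], vec![2, 5], vec![3, 6]]);
/// ```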
841pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
842    match original.as_slice() {
843        [] => vec![],
844        [first, ..] => {
845            let mut result = (0..first.len()).map(|_| vec![]).collect::<Vec<_>>();
846            for row in original {
847                for (item, transposed_row) in row.into_iter().zip(&mut result) {
848                    transposed_row.push(item);
849                }
850            }
851            result
852        }
853    }
854}
855
856/// Computes the `skip` and `fetch` parameters of a single limit that would be
857/// equivalent to two consecutive limits with the given `skip`/`fetch` parameters.
858///
859/// There are multiple cases to consider:
860///
861/// # Case 0: Parent and child are disjoint (`child_fetch <= skip`).
862///
863///   Before merging:
864/// ```text
865///                     |........skip........|---fetch-->|     Parent limit
866///    |...child_skip...|---child_fetch-->|                    Child limit
867/// ```
868///
869///   After merging:
870/// ```text
871///    |.........(child_skip + skip).........|
872/// ```
873///
874/// # Case 1: Parent is beyond child's range (`skip < child_fetch <= skip + fetch`).
875///
876///   Before merging:
877/// ```text
878///                     |...skip...|------------fetch------------>|   Parent limit
879///    |...child_skip...|-------------child_fetch------------>|       Child limit
880/// ```
881///
882///   After merging:
883/// ```text
884///    |....(child_skip + skip)....|---(child_fetch - skip)-->|
885/// ```
886///
887///  # Case 2: Parent is within child's range (`skip + fetch < child_fetch`).
888///
889///   Before merging:
890/// ```text
891///                     |...skip...|---fetch-->|                   Parent limit
892///    |...child_skip...|-------------child_fetch------------>|    Child limit
893/// ```
894///
895///   After merging:
896/// ```text
897///    |....(child_skip + skip)....|---fetch-->|
898/// ```
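///
/// # Example
///
/// A minimal illustrative sketch of Case 1 above:
/// ```
/// use datafusion_common::utils::combine_limit;
///
/// // Child limit: skip 5, fetch 10; parent limit: skip 3, fetch 200.
/// // The merged limit skips 5 + 3 = 8 rows and fetches min(200, 10 - 3) = 7 rows.
/// assert_eq!(combine_limit(3, Some(200), 5, Some(10)), (8, Some(7)));
/// ```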
899pub fn combine_limit(
900    parent_skip: usize,
901    parent_fetch: Option<usize>,
902    child_skip: usize,
903    child_fetch: Option<usize>,
904) -> (usize, Option<usize>) {
905    let combined_skip = child_skip.saturating_add(parent_skip);
906
907    let combined_fetch = match (parent_fetch, child_fetch) {
908        (Some(parent_fetch), Some(child_fetch)) => {
909            Some(min(parent_fetch, child_fetch.saturating_sub(parent_skip)))
910        }
911        (Some(parent_fetch), None) => Some(parent_fetch),
912        (None, Some(child_fetch)) => Some(child_fetch.saturating_sub(parent_skip)),
913        (None, None) => None,
914    };
915
916    (combined_skip, combined_fetch)
917}
918
919/// Returns the estimated number of threads available for parallel execution.
920///
921/// This is a wrapper around `std::thread::available_parallelism`, providing a default value
922/// of `1` if the system's parallelism cannot be determined.
923pub fn get_available_parallelism() -> usize {
924    available_parallelism()
925        .unwrap_or(NonZero::new(1).expect("literal value `1` shouldn't be zero"))
926        .get()
927}
928
929/// Converts a collection of function arguments into a fixed-size array of length N
930/// producing a reasonable error message if an unexpected number of arguments is provided.
931///
932/// # Example
933/// ```
934/// # use datafusion_common::Result;
935/// # use datafusion_common::utils::take_function_args;
936/// # use datafusion_common::ScalarValue;
937/// fn my_function(args: &[ScalarValue]) -> Result<()> {
938///     // function expects 2 args, so create a 2-element array
939///     let [arg1, arg2] = take_function_args("my_function", args)?;
940///     // ... do stuff..
941///     Ok(())
942/// }
943///
944/// // Calling the function with 1 argument produces an error:
945/// let args = vec![ScalarValue::Int32(Some(10))];
946/// let err = my_function(&args).unwrap_err();
947/// assert_eq!(
948///     err.to_string(),
949///     "Execution error: my_function function requires 2 arguments, got 1"
950/// );
951/// // Calling the function with 2 arguments works great
952/// let args = vec![ScalarValue::Int32(Some(10)), ScalarValue::Int32(Some(20))];
953/// my_function(&args).unwrap();
954/// ```
955pub fn take_function_args<const N: usize, T>(
956    function_name: &str,
957    args: impl IntoIterator<Item = T>,
958) -> Result<[T; N]> {
959    let args = args.into_iter().collect::<Vec<_>>();
960    args.try_into().map_err(|v: Vec<T>| {
961        _exec_datafusion_err!(
962            "{} function requires {} {}, got {}",
963            function_name,
964            N,
965            if N == 1 { "argument" } else { "arguments" },
966            v.len()
967        )
968    })
969}
970
971#[cfg(test)]
972mod tests {
973    use super::*;
974    use crate::ScalarValue::Null;
975    use arrow::array::Float64Array;
976
977    #[test]
978    fn test_bisect_linear_left_and_right() -> Result<()> {
979        let arrays: Vec<ArrayRef> = vec![
980            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.])),
981            Arc::new(Float64Array::from(vec![2.0, 3.0, 3.0, 4.0, 5.0])),
982            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 10., 11.0])),
983            Arc::new(Float64Array::from(vec![15.0, 13.0, 8.0, 5., 0.0])),
984        ];
985        let search_tuple: Vec<ScalarValue> = vec![
986            ScalarValue::Float64(Some(8.0)),
987            ScalarValue::Float64(Some(3.0)),
988            ScalarValue::Float64(Some(8.0)),
989            ScalarValue::Float64(Some(8.0)),
990        ];
991        let ords = [
992            SortOptions {
993                descending: false,
994                nulls_first: true,
995            },
996            SortOptions {
997                descending: false,
998                nulls_first: true,
999            },
1000            SortOptions {
1001                descending: false,
1002                nulls_first: true,
1003            },
1004            SortOptions {
1005                descending: true,
1006                nulls_first: true,
1007            },
1008        ];
1009        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1010        assert_eq!(res, 2);
1011        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1012        assert_eq!(res, 3);
1013        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1014        assert_eq!(res, 2);
1015        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1016        assert_eq!(res, 3);
1017        Ok(())
1018    }
1019
1020    #[test]
1021    fn vector_ord() {
1022        assert!(vec![1, 0, 0, 0, 0, 0, 0, 1] < vec![1, 0, 0, 0, 0, 0, 0, 2]);
1023        assert!(vec![1, 0, 0, 0, 0, 0, 1, 1] > vec![1, 0, 0, 0, 0, 0, 0, 2]);
1024        assert!(
1025            vec![
1026                ScalarValue::Int32(Some(2)),
1027                Null,
1028                ScalarValue::Int32(Some(0)),
1029            ] < vec![
1030                ScalarValue::Int32(Some(2)),
1031                Null,
1032                ScalarValue::Int32(Some(1)),
1033            ]
1034        );
1035        assert!(
1036            vec![
1037                ScalarValue::Int32(Some(2)),
1038                ScalarValue::Int32(None),
1039                ScalarValue::Int32(Some(0)),
1040            ] < vec![
1041                ScalarValue::Int32(Some(2)),
1042                ScalarValue::Int32(None),
1043                ScalarValue::Int32(Some(1)),
1044            ]
1045        );
1046    }
1047
1048    #[test]
1049    fn ord_same_type() {
1050        assert!((ScalarValue::Int32(Some(2)) < ScalarValue::Int32(Some(3))));
1051    }
1052
1053    #[test]
1054    fn test_bisect_linear_left_and_right_diff_sort() -> Result<()> {
1055        // Descending, left
1056        let arrays: Vec<ArrayRef> =
1057            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
1058        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
1059        let ords = [SortOptions {
1060            descending: true,
1061            nulls_first: true,
1062        }];
1063        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1064        assert_eq!(res, 0);
1065        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1066        assert_eq!(res, 0);
1067
1068        // Descending, right
1069        let arrays: Vec<ArrayRef> =
1070            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
1071        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
1072        let ords = [SortOptions {
1073            descending: true,
1074            nulls_first: true,
1075        }];
1076        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1077        assert_eq!(res, 1);
1078        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1079        assert_eq!(res, 1);
1080
1081        // Ascending, left
1082        let arrays: Vec<ArrayRef> =
1083            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
1084        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
1085        let ords = [SortOptions {
1086            descending: false,
1087            nulls_first: true,
1088        }];
1089        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1090        assert_eq!(res, 1);
1091        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1092        assert_eq!(res, 1);
1093
1094        // Ascending, right
1095        let arrays: Vec<ArrayRef> =
1096            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
1097        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
1098        let ords = [SortOptions {
1099            descending: false,
1100            nulls_first: true,
1101        }];
1102        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1103        assert_eq!(res, 2);
1104        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1105        assert_eq!(res, 2);
1106
1107        let arrays: Vec<ArrayRef> = vec![
1108            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 8.0, 9., 10.])),
1109            Arc::new(Float64Array::from(vec![10.0, 9.0, 8.0, 7.5, 7., 6.])),
1110        ];
1111        let search_tuple: Vec<ScalarValue> = vec![
1112            ScalarValue::Float64(Some(8.0)),
1113            ScalarValue::Float64(Some(8.0)),
1114        ];
1115        let ords = [
1116            SortOptions {
1117                descending: false,
1118                nulls_first: true,
1119            },
1120            SortOptions {
1121                descending: true,
1122                nulls_first: true,
1123            },
1124        ];
1125        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1126        assert_eq!(res, 3);
1127        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1128        assert_eq!(res, 3);
1129
1130        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1131        assert_eq!(res, 2);
1132        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1133        assert_eq!(res, 2);
1134        Ok(())
1135    }
1136
1137    #[test]
1138    fn test_evaluate_partition_ranges() -> Result<()> {
1139        let arrays: Vec<ArrayRef> = vec![
1140            Arc::new(Float64Array::from(vec![1.0, 1.0, 1.0, 2.0, 2.0, 2.0])),
1141            Arc::new(Float64Array::from(vec![4.0, 4.0, 3.0, 2.0, 1.0, 1.0])),
1142        ];
1143        let n_row = arrays[0].len();
1144        let options: Vec<SortOptions> = vec![
1145            SortOptions {
1146                descending: false,
1147                nulls_first: false,
1148            },
1149            SortOptions {
1150                descending: true,
1151                nulls_first: false,
1152            },
1153        ];
1154        let sort_columns = arrays
1155            .into_iter()
1156            .zip(options)
1157            .map(|(values, options)| SortColumn {
1158                values,
1159                options: Some(options),
1160            })
1161            .collect::<Vec<_>>();
1162        let ranges = evaluate_partition_ranges(n_row, &sort_columns)?;
1163        assert_eq!(ranges.len(), 4);
1164        assert_eq!(ranges[0], Range { start: 0, end: 2 });
1165        assert_eq!(ranges[1], Range { start: 2, end: 3 });
1166        assert_eq!(ranges[2], Range { start: 3, end: 4 });
1167        assert_eq!(ranges[3], Range { start: 4, end: 6 });
1168        Ok(())
1169    }
1170
1171    #[cfg(feature = "sql")]
1172    #[test]
1173    fn test_quote_identifier() -> Result<()> {
1174        let cases = vec![
1175            ("foo", r#"foo"#),
1176            ("_foo", r#"_foo"#),
1177            ("foo_bar", r#"foo_bar"#),
1178            ("foo-bar", r#""foo-bar""#),
1179            // name itself has a period, needs to be quoted
1180            ("foo.bar", r#""foo.bar""#),
1181            ("Foo", r#""Foo""#),
1182            ("Foo.Bar", r#""Foo.Bar""#),
1183            // name starting with a number needs to be quoted
1184            ("test1", r#"test1"#),
1185            ("1test", r#""1test""#),
1186        ];
1187
1188        for (identifier, quoted_identifier) in cases {
1189            println!("input: \n{identifier}\nquoted_identifier:\n{quoted_identifier}");
1190
1191            assert_eq!(quote_identifier(identifier), quoted_identifier);
1192
1193            // When parsing the quoted identifier, it should be a single
1194            // identifier without normalization, and not in multiple parts
1195            let quote_style = if quoted_identifier.starts_with('"') {
1196                Some('"')
1197            } else {
1198                None
1199            };
1200
1201            let expected_parsed = vec![Ident {
1202                value: identifier.to_string(),
1203                quote_style,
1204                span: sqlparser::tokenizer::Span::empty(),
1205            }];
1206
1207            assert_eq!(
1208                parse_identifiers(quoted_identifier).unwrap(),
1209                expected_parsed
1210            );
1211        }
1212
1213        Ok(())
1214    }
1215
1216    #[test]
1217    fn test_get_at_indices() -> Result<()> {
1218        let in_vec = vec![1, 2, 3, 4, 5, 6, 7];
1219        assert_eq!(get_at_indices(&in_vec, [0, 2])?, vec![1, 3]);
1220        assert_eq!(get_at_indices(&in_vec, [4, 2])?, vec![5, 3]);
1221        // 7 is outside the range
1222        assert!(get_at_indices(&in_vec, [7]).is_err());
1223        Ok(())
1224    }
1225
1226    #[test]
1227    fn test_longest_consecutive_prefix() {
1228        assert_eq!(longest_consecutive_prefix([0, 3, 4]), 1);
1229        assert_eq!(longest_consecutive_prefix([0, 1, 3, 4]), 2);
1230        assert_eq!(longest_consecutive_prefix([0, 1, 2, 3, 4]), 5);
1231        assert_eq!(longest_consecutive_prefix([1, 2, 3, 4]), 0);
1232    }
1233
1234    #[test]
1235    fn test_merge_and_order_indices() {
1236        assert_eq!(
1237            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
1238            vec![0, 1, 3, 4, 5]
1239        );
1240        // Result should be ordered, even if inputs are not
1241        assert_eq!(
1242            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
1243            vec![0, 1, 3, 4, 5]
1244        );
1245    }
1246
1247    #[test]
1248    fn test_set_difference() {
1249        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
1250        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
1251        // The return value should preserve the ordering of the first argument
1252        assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
1253        assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
1254        assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
1255    }
1256
1257    #[test]
1258    fn test_find_indices() -> Result<()> {
1259        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
1260        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
1261        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
1262        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
1263        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
1264        Ok(())
1265    }
1266
1267    #[test]
1268    fn test_transpose() -> Result<()> {
1269        let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]];
1270        let transposed = transpose(in_data);
1271        let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]];
1272        assert_eq!(expected, transposed);
1273        Ok(())
1274    }
1275}