datafusion_comet_spark_expr/kernels/
strings.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! String kernels
19
20use std::sync::Arc;
21
22use arrow::{
23    array::*,
24    buffer::MutableBuffer,
25    compute::kernels::substring::{substring as arrow_substring, substring_by_char},
26    datatypes::{DataType, Int32Type},
27};
28use datafusion::common::DataFusionError;
29
30/// Returns an ArrayRef with a string consisting of `length` spaces.
31///
32/// # Preconditions
33///
34/// - elements in `length` must not be negative
35pub fn string_space(length: &dyn Array) -> Result<ArrayRef, DataFusionError> {
36    match length.data_type() {
37        DataType::Int32 => {
38            let array = length.as_any().downcast_ref::<Int32Array>().unwrap();
39            Ok(generic_string_space::<i32>(array))
40        }
41        DataType::Dictionary(_, _) => {
42            let dict = as_dictionary_array::<Int32Type>(length);
43            let values = string_space(dict.values())?;
44            let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
45            Ok(Arc::new(result))
46        }
47        dt => panic!("Unsupported input type for function 'string_space': {dt:?}"),
48    }
49}
50
51pub fn substring(array: &dyn Array, start: i64, length: u64) -> Result<ArrayRef, DataFusionError> {
52    match array.data_type() {
53        DataType::LargeUtf8 => substring_by_char(
54            array
55                .as_any()
56                .downcast_ref::<LargeStringArray>()
57                .expect("A large string is expected"),
58            start,
59            Some(length),
60        )
61        .map_err(|e| e.into())
62        .map(|t| make_array(t.into_data())),
63        DataType::Utf8 => substring_by_char(
64            array
65                .as_any()
66                .downcast_ref::<StringArray>()
67                .expect("A string is expected"),
68            start,
69            Some(length),
70        )
71        .map_err(|e| e.into())
72        .map(|t| make_array(t.into_data())),
73        DataType::Binary | DataType::LargeBinary => {
74            arrow_substring(array, start, Some(length)).map_err(|e| e.into())
75        }
76        DataType::Dictionary(_, _) => {
77            let dict = as_dictionary_array::<Int32Type>(array);
78            let values = substring(dict.values(), start, length)?;
79            let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
80            Ok(Arc::new(result))
81        }
82        dt => panic!("Unsupported input type for function 'substring': {dt:?}"),
83    }
84}
85
86fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> ArrayRef {
87    let array_len = length.len();
88    let mut offsets = MutableBuffer::new((array_len + 1) * std::mem::size_of::<OffsetSize>());
89    let mut length_so_far = OffsetSize::zero();
90
91    // compute null bitmap (copy)
92    let null_bit_buffer = length.to_data().nulls().map(|b| b.buffer().clone());
93
94    // Gets slice of length array to access it directly for performance.
95    let length_data = length.to_data();
96    let lengths = length_data.buffers()[0].typed_data::<i32>();
97    let total = lengths.iter().map(|l| *l as usize).sum::<usize>();
98    let mut values = MutableBuffer::new(total);
99
100    offsets.push(length_so_far);
101
102    let blank = " ".as_bytes()[0];
103    values.resize(total, blank);
104
105    (0..array_len).for_each(|i| {
106        let current_len = lengths[i] as usize;
107
108        length_so_far += OffsetSize::from_usize(current_len).unwrap();
109        offsets.push(length_so_far);
110    });
111
112    let data = unsafe {
113        ArrayData::new_unchecked(
114            GenericStringArray::<OffsetSize>::DATA_TYPE,
115            array_len,
116            None,
117            null_bit_buffer,
118            0,
119            vec![offsets.into(), values.into()],
120            vec![],
121        )
122    };
123    make_array(data)
124}