datafusion_comet_spark_expr/string_funcs/string_space.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{
19    as_dictionary_array, make_array, Array, ArrayData, ArrayRef, DictionaryArray,
20    GenericStringArray, Int32Array, OffsetSizeTrait,
21};
22use arrow::buffer::MutableBuffer;
23use arrow::datatypes::{DataType, Int32Type};
24use datafusion::common::{exec_err, internal_datafusion_err, DataFusionError, Result};
25use datafusion::logical_expr::{
26    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
27};
28use std::{any::Any, sync::Arc};
29
30#[derive(Debug)]
31pub struct SparkStringSpace {
32    signature: Signature,
33    aliases: Vec<String>,
34}
35
36impl Default for SparkStringSpace {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl SparkStringSpace {
43    pub fn new() -> Self {
44        Self {
45            signature: Signature::user_defined(Volatility::Immutable),
46            aliases: vec![],
47        }
48    }
49}
50
51impl ScalarUDFImpl for SparkStringSpace {
52    fn as_any(&self) -> &dyn Any {
53        self
54    }
55
56    fn name(&self) -> &str {
57        "string_space"
58    }
59
60    fn signature(&self) -> &Signature {
61        &self.signature
62    }
63
64    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
65        Ok(match &arg_types[0] {
66            DataType::Dictionary(key_type, _) => {
67                DataType::Dictionary(key_type.clone(), Box::new(DataType::Utf8))
68            }
69            _ => DataType::Utf8,
70        })
71    }
72
73    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
74        let args: [ColumnarValue; 1] = args
75            .args
76            .try_into()
77            .map_err(|_| internal_datafusion_err!("string_space expects exactly one argument"))?;
78        spark_string_space(&args)
79    }
80
81    fn aliases(&self) -> &[String] {
82        &self.aliases
83    }
84}
85
86pub fn spark_string_space(args: &[ColumnarValue; 1]) -> Result<ColumnarValue> {
87    match args {
88        [ColumnarValue::Array(array)] => {
89            let result = string_space(&array)?;
90
91            Ok(ColumnarValue::Array(result))
92        }
93        _ => exec_err!("StringSpace(scalar) should be fold in Spark JVM side."),
94    }
95}
96
97fn string_space(length: &dyn Array) -> std::result::Result<ArrayRef, DataFusionError> {
98    match length.data_type() {
99        DataType::Int32 => {
100            let array = length.as_any().downcast_ref::<Int32Array>().unwrap();
101            Ok(generic_string_space::<i32>(array))
102        }
103        DataType::Dictionary(_, _) => {
104            let dict = as_dictionary_array::<Int32Type>(length);
105            let values = string_space(dict.values())?;
106            let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
107            Ok(Arc::new(result))
108        }
109        other => exec_err!("Unsupported input type for function 'string_space': {other:?}"),
110    }
111}
112
113fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> ArrayRef {
114    let array_len = length.len();
115    let mut offsets = MutableBuffer::new((array_len + 1) * std::mem::size_of::<OffsetSize>());
116    let mut length_so_far = OffsetSize::zero();
117
118    // compute null bitmap (copy)
119    let null_bit_buffer = length.to_data().nulls().map(|b| b.buffer().clone());
120
121    // Gets slice of length array to access it directly for performance.
122    let length_data = length.to_data();
123    let lengths = length_data.buffers()[0].typed_data::<i32>();
124    let total = lengths.iter().map(|l| *l as usize).sum::<usize>();
125    let mut values = MutableBuffer::new(total);
126
127    offsets.push(length_so_far);
128
129    let blank = " ".as_bytes()[0];
130    values.resize(total, blank);
131
132    (0..array_len).for_each(|i| {
133        let current_len = lengths[i] as usize;
134
135        length_so_far += OffsetSize::from_usize(current_len).unwrap();
136        offsets.push(length_so_far);
137    });
138
139    let data = unsafe {
140        ArrayData::new_unchecked(
141            GenericStringArray::<OffsetSize>::DATA_TYPE,
142            array_len,
143            None,
144            null_bit_buffer,
145            0,
146            vec![offsets.into(), values.into()],
147            vec![],
148        )
149    };
150    make_array(data)
151}