Skip to main content

datafusion_spark/function/string/
space.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{
19    Array, ArrayRef, DictionaryArray, Int32Array, StringArray, StringBuilder,
20    as_dictionary_array,
21};
22use arrow::datatypes::{DataType, Int32Type};
23use datafusion_common::cast::as_int32_array;
24use datafusion_common::{Result, ScalarValue, exec_err};
25use datafusion_expr::{
26    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
27};
28use std::any::Any;
29use std::sync::Arc;
30
31/// Spark-compatible `space` expression
32/// <https://spark.apache.org/docs/latest/api/sql/index.html#space>
33#[derive(Debug, PartialEq, Eq, Hash)]
34pub struct SparkSpace {
35    signature: Signature,
36}
37
38impl Default for SparkSpace {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl SparkSpace {
45    pub fn new() -> Self {
46        Self {
47            signature: Signature::uniform(
48                1,
49                vec![
50                    DataType::Int32,
51                    DataType::Dictionary(
52                        Box::new(DataType::Int32),
53                        Box::new(DataType::Int32),
54                    ),
55                ],
56                Volatility::Immutable,
57            ),
58        }
59    }
60}
61
62impl ScalarUDFImpl for SparkSpace {
63    fn as_any(&self) -> &dyn Any {
64        self
65    }
66
67    fn name(&self) -> &str {
68        "space"
69    }
70
71    fn signature(&self) -> &Signature {
72        &self.signature
73    }
74
75    fn return_type(&self, args: &[DataType]) -> Result<DataType> {
76        let return_type = match &args[0] {
77            DataType::Dictionary(key_type, _) => {
78                DataType::Dictionary(key_type.clone(), Box::new(DataType::Utf8))
79            }
80            _ => DataType::Utf8,
81        };
82        Ok(return_type)
83    }
84
85    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
86        spark_space(&args.args)
87    }
88}
89
90pub fn spark_space(args: &[ColumnarValue]) -> Result<ColumnarValue> {
91    if args.len() != 1 {
92        return exec_err!("space function takes exactly one argument");
93    }
94    match &args[0] {
95        ColumnarValue::Array(array) => {
96            let result = spark_space_array(array)?;
97            Ok(ColumnarValue::Array(result))
98        }
99        ColumnarValue::Scalar(scalar) => {
100            let result = spark_space_scalar(scalar)?;
101            Ok(ColumnarValue::Scalar(result))
102        }
103    }
104}
105
106fn spark_space_array(array: &ArrayRef) -> Result<ArrayRef> {
107    match array.data_type() {
108        DataType::Int32 => {
109            let array = as_int32_array(array)?;
110            Ok(Arc::new(spark_space_array_inner(array)))
111        }
112        DataType::Dictionary(_, _) => {
113            let dict = as_dictionary_array::<Int32Type>(array);
114            let values = spark_space_array(dict.values())?;
115            let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
116            Ok(Arc::new(result))
117        }
118        other => {
119            exec_err!("Unsupported data type {other:?} for function `space`")
120        }
121    }
122}
123
124fn spark_space_scalar(scalar: &ScalarValue) -> Result<ScalarValue> {
125    match scalar {
126        ScalarValue::Int32(value) => {
127            let result = value.map(|v| {
128                if v <= 0 {
129                    String::new()
130                } else {
131                    " ".repeat(v as usize)
132                }
133            });
134            Ok(ScalarValue::Utf8(result))
135        }
136        other => {
137            exec_err!("Unsupported data type {other:?} for function `space`")
138        }
139    }
140}
141
142fn spark_space_array_inner(array: &Int32Array) -> StringArray {
143    let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
144    let mut space_buf = String::new();
145    for value in array.iter() {
146        match value {
147            None => builder.append_null(),
148            Some(l) if l > 0 => {
149                let l = l as usize;
150                if space_buf.len() < l {
151                    space_buf = " ".repeat(l);
152                }
153                builder.append_value(&space_buf[..l]);
154            }
155            Some(_) => builder.append_value(""),
156        }
157    }
158    builder.finish()
159}
160
#[cfg(test)]
mod tests {
    use crate::function::string::space::spark_space;
    use arrow::array::{Array, Int32Array, Int32DictionaryArray, StringArray};
    use arrow::datatypes::Int32Type;
    use datafusion_common::cast::{as_dictionary_array, as_string_array};
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::ColumnarValue;
    use std::sync::Arc;

    /// Lengths shared by the array tests: positive, negative, zero, positive
    /// and null.
    fn sample_lengths() -> Int32Array {
        Int32Array::from(vec![Some(1), Some(-3), Some(0), Some(5), None])
    }

    /// Checks the strings expected for [`sample_lengths`], slot by slot.
    fn assert_sample_output(strings: &StringArray) {
        let expected = [Some(" "), Some(""), Some(""), Some("     "), None];
        for (idx, want) in expected.iter().enumerate() {
            match want {
                Some(text) => assert_eq!(strings.value(idx), *text),
                None => assert!(strings.is_null(idx)),
            }
        }
    }

    /// A plain `Int32` array is converted element by element.
    #[test]
    fn test_spark_space_int32_array() -> Result<()> {
        let input = ColumnarValue::Array(Arc::new(sample_lengths()));
        let ColumnarValue::Array(output) = spark_space(&[input])? else {
            unreachable!()
        };
        assert_sample_output(as_string_array(&output)?);
        Ok(())
    }

    /// A dictionary array has its values converted.
    #[test]
    fn test_spark_space_dictionary() -> Result<()> {
        let keys = Int32Array::from(vec![0, 1, 2, 3, 4]);
        let input = ColumnarValue::Array(Arc::new(Int32DictionaryArray::new(
            keys,
            Arc::new(sample_lengths()),
        )));
        let ColumnarValue::Array(output) = spark_space(&[input])? else {
            unreachable!()
        };
        let dictionary = as_dictionary_array::<Int32Type>(&output)?;
        assert_sample_output(as_string_array(dictionary.values())?);
        Ok(())
    }

    /// A negative scalar length produces the empty string.
    #[test]
    fn test_spark_space_scalar() -> Result<()> {
        let input = ColumnarValue::Scalar(ScalarValue::Int32(Some(-5)));
        let output = spark_space(&[input])?;
        let ColumnarValue::Scalar(ScalarValue::Utf8(Some(text))) = output else {
            unreachable!()
        };
        assert_eq!(text, "");
        Ok(())
    }
}