Skip to main content

datafusion_spark/function/string/
space.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{
19    Array, ArrayRef, DictionaryArray, Int32Array, StringArray, StringBuilder,
20    as_dictionary_array,
21};
22use arrow::datatypes::{DataType, Int32Type};
23use datafusion_common::cast::as_int32_array;
24use datafusion_common::{Result, ScalarValue, exec_err};
25use datafusion_expr::{
26    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
27};
28use std::sync::Arc;
29
30/// Spark-compatible `space` expression
31/// <https://spark.apache.org/docs/latest/api/sql/index.html#space>
32#[derive(Debug, PartialEq, Eq, Hash)]
33pub struct SparkSpace {
34    signature: Signature,
35}
36
37impl Default for SparkSpace {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl SparkSpace {
44    pub fn new() -> Self {
45        Self {
46            signature: Signature::uniform(
47                1,
48                vec![
49                    DataType::Int32,
50                    DataType::Dictionary(
51                        Box::new(DataType::Int32),
52                        Box::new(DataType::Int32),
53                    ),
54                ],
55                Volatility::Immutable,
56            ),
57        }
58    }
59}
60
61impl ScalarUDFImpl for SparkSpace {
62    fn name(&self) -> &str {
63        "space"
64    }
65
66    fn signature(&self) -> &Signature {
67        &self.signature
68    }
69
70    fn return_type(&self, args: &[DataType]) -> Result<DataType> {
71        let return_type = match &args[0] {
72            DataType::Dictionary(key_type, _) => {
73                DataType::Dictionary(key_type.clone(), Box::new(DataType::Utf8))
74            }
75            _ => DataType::Utf8,
76        };
77        Ok(return_type)
78    }
79
80    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
81        spark_space(&args.args)
82    }
83}
84
85pub fn spark_space(args: &[ColumnarValue]) -> Result<ColumnarValue> {
86    if args.len() != 1 {
87        return exec_err!("space function takes exactly one argument");
88    }
89    match &args[0] {
90        ColumnarValue::Array(array) => {
91            let result = spark_space_array(array)?;
92            Ok(ColumnarValue::Array(result))
93        }
94        ColumnarValue::Scalar(scalar) => {
95            let result = spark_space_scalar(scalar)?;
96            Ok(ColumnarValue::Scalar(result))
97        }
98    }
99}
100
101fn spark_space_array(array: &ArrayRef) -> Result<ArrayRef> {
102    match array.data_type() {
103        DataType::Int32 => {
104            let array = as_int32_array(array)?;
105            Ok(Arc::new(spark_space_array_inner(array)))
106        }
107        DataType::Dictionary(_, _) => {
108            let dict = as_dictionary_array::<Int32Type>(array);
109            let values = spark_space_array(dict.values())?;
110            let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
111            Ok(Arc::new(result))
112        }
113        other => {
114            exec_err!("Unsupported data type {other:?} for function `space`")
115        }
116    }
117}
118
119fn spark_space_scalar(scalar: &ScalarValue) -> Result<ScalarValue> {
120    match scalar {
121        ScalarValue::Int32(value) => {
122            let result = value.map(|v| {
123                if v <= 0 {
124                    String::new()
125                } else {
126                    " ".repeat(v as usize)
127                }
128            });
129            Ok(ScalarValue::Utf8(result))
130        }
131        other => {
132            exec_err!("Unsupported data type {other:?} for function `space`")
133        }
134    }
135}
136
137fn spark_space_array_inner(array: &Int32Array) -> StringArray {
138    let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
139    let mut space_buf = String::new();
140    for value in array.iter() {
141        match value {
142            None => builder.append_null(),
143            Some(l) if l > 0 => {
144                let l = l as usize;
145                if space_buf.len() < l {
146                    space_buf = " ".repeat(l);
147                }
148                builder.append_value(&space_buf[..l]);
149            }
150            Some(_) => builder.append_value(""),
151        }
152    }
153    builder.finish()
154}
155
#[cfg(test)]
mod tests {
    use crate::function::string::space::spark_space;
    use arrow::array::{Array, Int32Array, Int32DictionaryArray, StringArray};
    use arrow::datatypes::Int32Type;
    use datafusion_common::cast::{as_dictionary_array, as_string_array};
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::ColumnarValue;
    use std::sync::Arc;

    /// Lengths shared by the array tests: positive, negative, zero and null.
    fn sample_lengths() -> Int32Array {
        Int32Array::from(vec![Some(1), Some(-3), Some(0), Some(5), None])
    }

    /// Asserts the strings expected for [`sample_lengths`].
    fn assert_sample_spaces(result: &StringArray) {
        assert_eq!(result.len(), 5);
        assert_eq!(result.value(0), " ");
        assert_eq!(result.value(1), "");
        assert_eq!(result.value(2), "");
        assert_eq!(result.value(3), "     ");
        assert!(result.is_null(4));
    }

    /// Evaluates `space` on an `Int32` scalar and returns the scalar result.
    fn eval_scalar(value: Option<i32>) -> Result<ScalarValue> {
        let input = ColumnarValue::Scalar(ScalarValue::Int32(value));
        match spark_space(&[input])? {
            ColumnarValue::Scalar(result) => Ok(result),
            ColumnarValue::Array(_) => panic!("expected a scalar result"),
        }
    }

    #[test]
    fn test_spark_space_int32_array() -> Result<()> {
        let input = ColumnarValue::Array(Arc::new(sample_lengths()));
        let ColumnarValue::Array(result) = spark_space(&[input])? else {
            unreachable!()
        };
        assert_sample_spaces(as_string_array(&result)?);
        Ok(())
    }

    #[test]
    fn test_spark_space_dictionary() -> Result<()> {
        let keys = Int32Array::from(vec![0, 1, 2, 3, 4]);
        let input = ColumnarValue::Array(Arc::new(Int32DictionaryArray::new(
            keys.clone(),
            Arc::new(sample_lengths()),
        )));
        let ColumnarValue::Array(result) = spark_space(&[input])? else {
            unreachable!()
        };
        let dict = as_dictionary_array::<Int32Type>(&result)?;
        // Only the values are rewritten; the keys must come back untouched.
        assert_eq!(dict.keys(), &keys);
        assert_sample_spaces(as_string_array(dict.values())?);
        Ok(())
    }

    #[test]
    fn test_spark_space_scalar() -> Result<()> {
        assert_eq!(
            eval_scalar(Some(-5))?,
            ScalarValue::Utf8(Some(String::new()))
        );
        assert_eq!(
            eval_scalar(Some(0))?,
            ScalarValue::Utf8(Some(String::new()))
        );
        assert_eq!(
            eval_scalar(Some(3))?,
            ScalarValue::Utf8(Some("   ".to_string()))
        );
        Ok(())
    }

    #[test]
    fn test_spark_space_null_scalar() -> Result<()> {
        assert_eq!(eval_scalar(None)?, ScalarValue::Utf8(None));
        Ok(())
    }

    #[test]
    fn test_spark_space_wrong_arg_count() {
        let one = || ColumnarValue::Scalar(ScalarValue::Int32(Some(1)));
        assert!(spark_space(&[]).is_err());
        assert!(spark_space(&[one(), one()]).is_err());
    }
}