datafusion_spark/function/string/
space.rs1use arrow::array::{
19 Array, ArrayRef, DictionaryArray, Int32Array, StringArray, StringBuilder,
20 as_dictionary_array,
21};
22use arrow::datatypes::{DataType, Int32Type};
23use datafusion_common::cast::as_int32_array;
24use datafusion_common::{Result, ScalarValue, exec_err};
25use datafusion_expr::{
26 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
27};
28use std::any::Any;
29use std::sync::Arc;
30
31#[derive(Debug, PartialEq, Eq, Hash)]
34pub struct SparkSpace {
35 signature: Signature,
36}
37
38impl Default for SparkSpace {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl SparkSpace {
45 pub fn new() -> Self {
46 Self {
47 signature: Signature::uniform(
48 1,
49 vec![
50 DataType::Int32,
51 DataType::Dictionary(
52 Box::new(DataType::Int32),
53 Box::new(DataType::Int32),
54 ),
55 ],
56 Volatility::Immutable,
57 ),
58 }
59 }
60}
61
62impl ScalarUDFImpl for SparkSpace {
63 fn as_any(&self) -> &dyn Any {
64 self
65 }
66
67 fn name(&self) -> &str {
68 "space"
69 }
70
71 fn signature(&self) -> &Signature {
72 &self.signature
73 }
74
75 fn return_type(&self, args: &[DataType]) -> Result<DataType> {
76 let return_type = match &args[0] {
77 DataType::Dictionary(key_type, _) => {
78 DataType::Dictionary(key_type.clone(), Box::new(DataType::Utf8))
79 }
80 _ => DataType::Utf8,
81 };
82 Ok(return_type)
83 }
84
85 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
86 spark_space(&args.args)
87 }
88}
89
90pub fn spark_space(args: &[ColumnarValue]) -> Result<ColumnarValue> {
91 if args.len() != 1 {
92 return exec_err!("space function takes exactly one argument");
93 }
94 match &args[0] {
95 ColumnarValue::Array(array) => {
96 let result = spark_space_array(array)?;
97 Ok(ColumnarValue::Array(result))
98 }
99 ColumnarValue::Scalar(scalar) => {
100 let result = spark_space_scalar(scalar)?;
101 Ok(ColumnarValue::Scalar(result))
102 }
103 }
104}
105
106fn spark_space_array(array: &ArrayRef) -> Result<ArrayRef> {
107 match array.data_type() {
108 DataType::Int32 => {
109 let array = as_int32_array(array)?;
110 Ok(Arc::new(spark_space_array_inner(array)))
111 }
112 DataType::Dictionary(_, _) => {
113 let dict = as_dictionary_array::<Int32Type>(array);
114 let values = spark_space_array(dict.values())?;
115 let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
116 Ok(Arc::new(result))
117 }
118 other => {
119 exec_err!("Unsupported data type {other:?} for function `space`")
120 }
121 }
122}
123
124fn spark_space_scalar(scalar: &ScalarValue) -> Result<ScalarValue> {
125 match scalar {
126 ScalarValue::Int32(value) => {
127 let result = value.map(|v| {
128 if v <= 0 {
129 String::new()
130 } else {
131 " ".repeat(v as usize)
132 }
133 });
134 Ok(ScalarValue::Utf8(result))
135 }
136 other => {
137 exec_err!("Unsupported data type {other:?} for function `space`")
138 }
139 }
140}
141
142fn spark_space_array_inner(array: &Int32Array) -> StringArray {
143 let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
144 let mut space_buf = String::new();
145 for value in array.iter() {
146 match value {
147 None => builder.append_null(),
148 Some(l) if l > 0 => {
149 let l = l as usize;
150 if space_buf.len() < l {
151 space_buf = " ".repeat(l);
152 }
153 builder.append_value(&space_buf[..l]);
154 }
155 Some(_) => builder.append_value(""),
156 }
157 }
158 builder.finish()
159}
160
#[cfg(test)]
mod tests {
    use crate::function::string::space::spark_space;
    use arrow::array::{Array, Int32Array, Int32DictionaryArray};
    use arrow::datatypes::{DataType, Int32Type};
    use datafusion_common::cast::{as_dictionary_array, as_string_array};
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::ColumnarValue;
    use std::sync::Arc;

    #[test]
    fn test_spark_space_int32_array() -> Result<()> {
        let int32_array = ColumnarValue::Array(Arc::new(Int32Array::from(vec![
            Some(1),
            Some(-3),
            Some(0),
            Some(5),
            None,
        ])));
        let ColumnarValue::Array(result) = spark_space(&[int32_array])? else {
            unreachable!()
        };
        let result = as_string_array(&result)?;

        assert_eq!(result.value(0), " ");
        assert_eq!(result.value(1), "");
        assert_eq!(result.value(2), "");
        // Built with `repeat` so the expected width is explicit.
        assert_eq!(result.value(3), " ".repeat(5));
        assert!(result.is_null(4));
        Ok(())
    }

    #[test]
    fn test_spark_space_dictionary() -> Result<()> {
        let dictionary = ColumnarValue::Array(Arc::new(Int32DictionaryArray::new(
            Int32Array::from(vec![0, 1, 2, 3, 4]),
            Arc::new(Int32Array::from(vec![
                Some(1),
                Some(-3),
                Some(0),
                Some(5),
                None,
            ])),
        )));
        let ColumnarValue::Array(result) = spark_space(&[dictionary])? else {
            unreachable!()
        };
        // Keys stay Int32 while the values become strings.
        assert_eq!(
            result.data_type(),
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
        );
        let result =
            as_string_array(as_dictionary_array::<Int32Type>(&result)?.values())?;
        assert_eq!(result.value(0), " ");
        assert_eq!(result.value(1), "");
        assert_eq!(result.value(2), "");
        assert_eq!(result.value(3), " ".repeat(5));
        assert!(result.is_null(4));
        Ok(())
    }

    #[test]
    fn test_spark_space_scalar() -> Result<()> {
        let scalar = ColumnarValue::Scalar(ScalarValue::Int32(Some(-5)));
        let ColumnarValue::Scalar(result) = spark_space(&[scalar])? else {
            unreachable!()
        };
        match result {
            ScalarValue::Utf8(Some(result)) => {
                assert_eq!(result, "");
            }
            _ => unreachable!(),
        }
        Ok(())
    }

    #[test]
    fn test_spark_space_scalar_positive_and_null() -> Result<()> {
        let positive = ColumnarValue::Scalar(ScalarValue::Int32(Some(3)));
        let ColumnarValue::Scalar(result) = spark_space(&[positive])? else {
            unreachable!()
        };
        assert_eq!(result, ScalarValue::Utf8(Some(" ".repeat(3))));

        let null = ColumnarValue::Scalar(ScalarValue::Int32(None));
        let ColumnarValue::Scalar(result) = spark_space(&[null])? else {
            unreachable!()
        };
        assert_eq!(result, ScalarValue::Utf8(None));
        Ok(())
    }

    #[test]
    fn test_spark_space_wrong_arg_count() {
        assert!(spark_space(&[]).is_err());
    }
}