datafusion_spark/function/string/
space.rs1use arrow::array::{
19 Array, ArrayRef, DictionaryArray, Int32Array, StringArray, StringBuilder,
20 as_dictionary_array,
21};
22use arrow::datatypes::{DataType, Int32Type};
23use datafusion_common::cast::as_int32_array;
24use datafusion_common::{Result, ScalarValue, exec_err};
25use datafusion_expr::{
26 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
27};
28use std::sync::Arc;
29
30#[derive(Debug, PartialEq, Eq, Hash)]
33pub struct SparkSpace {
34 signature: Signature,
35}
36
37impl Default for SparkSpace {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl SparkSpace {
44 pub fn new() -> Self {
45 Self {
46 signature: Signature::uniform(
47 1,
48 vec![
49 DataType::Int32,
50 DataType::Dictionary(
51 Box::new(DataType::Int32),
52 Box::new(DataType::Int32),
53 ),
54 ],
55 Volatility::Immutable,
56 ),
57 }
58 }
59}
60
61impl ScalarUDFImpl for SparkSpace {
62 fn name(&self) -> &str {
63 "space"
64 }
65
66 fn signature(&self) -> &Signature {
67 &self.signature
68 }
69
70 fn return_type(&self, args: &[DataType]) -> Result<DataType> {
71 let return_type = match &args[0] {
72 DataType::Dictionary(key_type, _) => {
73 DataType::Dictionary(key_type.clone(), Box::new(DataType::Utf8))
74 }
75 _ => DataType::Utf8,
76 };
77 Ok(return_type)
78 }
79
80 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
81 spark_space(&args.args)
82 }
83}
84
85pub fn spark_space(args: &[ColumnarValue]) -> Result<ColumnarValue> {
86 if args.len() != 1 {
87 return exec_err!("space function takes exactly one argument");
88 }
89 match &args[0] {
90 ColumnarValue::Array(array) => {
91 let result = spark_space_array(array)?;
92 Ok(ColumnarValue::Array(result))
93 }
94 ColumnarValue::Scalar(scalar) => {
95 let result = spark_space_scalar(scalar)?;
96 Ok(ColumnarValue::Scalar(result))
97 }
98 }
99}
100
101fn spark_space_array(array: &ArrayRef) -> Result<ArrayRef> {
102 match array.data_type() {
103 DataType::Int32 => {
104 let array = as_int32_array(array)?;
105 Ok(Arc::new(spark_space_array_inner(array)))
106 }
107 DataType::Dictionary(_, _) => {
108 let dict = as_dictionary_array::<Int32Type>(array);
109 let values = spark_space_array(dict.values())?;
110 let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
111 Ok(Arc::new(result))
112 }
113 other => {
114 exec_err!("Unsupported data type {other:?} for function `space`")
115 }
116 }
117}
118
119fn spark_space_scalar(scalar: &ScalarValue) -> Result<ScalarValue> {
120 match scalar {
121 ScalarValue::Int32(value) => {
122 let result = value.map(|v| {
123 if v <= 0 {
124 String::new()
125 } else {
126 " ".repeat(v as usize)
127 }
128 });
129 Ok(ScalarValue::Utf8(result))
130 }
131 other => {
132 exec_err!("Unsupported data type {other:?} for function `space`")
133 }
134 }
135}
136
137fn spark_space_array_inner(array: &Int32Array) -> StringArray {
138 let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
139 let mut space_buf = String::new();
140 for value in array.iter() {
141 match value {
142 None => builder.append_null(),
143 Some(l) if l > 0 => {
144 let l = l as usize;
145 if space_buf.len() < l {
146 space_buf = " ".repeat(l);
147 }
148 builder.append_value(&space_buf[..l]);
149 }
150 Some(_) => builder.append_value(""),
151 }
152 }
153 builder.finish()
154}
155
// Unit tests for `spark_space` covering array, dictionary and scalar inputs.
#[cfg(test)]
mod tests {
    use crate::function::string::space::spark_space;
    use arrow::array::{Array, Int32Array, Int32DictionaryArray};
    use arrow::datatypes::Int32Type;
    use datafusion_common::cast::{as_dictionary_array, as_string_array};
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::ColumnarValue;
    use std::sync::Arc;

    #[test]
    fn test_spark_space_int32_array() -> Result<()> {
        let int32_array = ColumnarValue::Array(Arc::new(Int32Array::from(vec![
            Some(1),
            Some(-3),
            Some(0),
            Some(5),
            None,
        ])));
        let ColumnarValue::Array(result) = spark_space(&[int32_array])? else {
            unreachable!()
        };
        let result = as_string_array(&result)?;

        assert_eq!(result.value(0), " ");
        assert_eq!(result.value(1), "");
        assert_eq!(result.value(2), "");
        // An input of 5 must produce five spaces. It is spelled with `repeat`
        // so the expected width is explicit and cannot silently shrink.
        assert_eq!(result.value(3), " ".repeat(5));
        assert!(result.is_null(4));
        Ok(())
    }

    #[test]
    fn test_spark_space_dictionary() -> Result<()> {
        let dictionary = ColumnarValue::Array(Arc::new(Int32DictionaryArray::new(
            Int32Array::from(vec![0, 1, 2, 3, 4]),
            Arc::new(Int32Array::from(vec![
                Some(1),
                Some(-3),
                Some(0),
                Some(5),
                None,
            ])),
        )));
        let ColumnarValue::Array(result) = spark_space(&[dictionary])? else {
            unreachable!()
        };
        let result =
            as_string_array(as_dictionary_array::<Int32Type>(&result)?.values())?;
        assert_eq!(result.value(0), " ");
        assert_eq!(result.value(1), "");
        assert_eq!(result.value(2), "");
        assert_eq!(result.value(3), " ".repeat(5));
        assert!(result.is_null(4));
        Ok(())
    }

    #[test]
    fn test_spark_space_scalar() -> Result<()> {
        let scalar = ColumnarValue::Scalar(ScalarValue::Int32(Some(-5)));
        let ColumnarValue::Scalar(result) = spark_space(&[scalar])? else {
            unreachable!()
        };
        match result {
            ScalarValue::Utf8(Some(result)) => {
                assert_eq!(result, "");
            }
            _ => unreachable!(),
        }
        Ok(())
    }

    #[test]
    fn test_spark_space_scalar_positive_and_null() -> Result<()> {
        let positive = ColumnarValue::Scalar(ScalarValue::Int32(Some(3)));
        let ColumnarValue::Scalar(result) = spark_space(&[positive])? else {
            unreachable!()
        };
        assert_eq!(result, ScalarValue::Utf8(Some(" ".repeat(3))));

        let null = ColumnarValue::Scalar(ScalarValue::Int32(None));
        let ColumnarValue::Scalar(result) = spark_space(&[null])? else {
            unreachable!()
        };
        assert_eq!(result, ScalarValue::Utf8(None));
        Ok(())
    }
}