datafusion_comet_spark_expr/string_funcs/
string_space.rs1use arrow::array::{
19 as_dictionary_array, make_array, Array, ArrayData, ArrayRef, DictionaryArray,
20 GenericStringArray, Int32Array, OffsetSizeTrait,
21};
22use arrow::buffer::MutableBuffer;
23use arrow::datatypes::{DataType, Int32Type};
24use datafusion::common::{exec_err, internal_datafusion_err, DataFusionError, Result};
25use datafusion::logical_expr::{
26 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
27};
28use std::{any::Any, sync::Arc};
29
30#[derive(Debug)]
31pub struct SparkStringSpace {
32 signature: Signature,
33 aliases: Vec<String>,
34}
35
36impl Default for SparkStringSpace {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl SparkStringSpace {
43 pub fn new() -> Self {
44 Self {
45 signature: Signature::user_defined(Volatility::Immutable),
46 aliases: vec![],
47 }
48 }
49}
50
51impl ScalarUDFImpl for SparkStringSpace {
52 fn as_any(&self) -> &dyn Any {
53 self
54 }
55
56 fn name(&self) -> &str {
57 "string_space"
58 }
59
60 fn signature(&self) -> &Signature {
61 &self.signature
62 }
63
64 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
65 Ok(match &arg_types[0] {
66 DataType::Dictionary(key_type, _) => {
67 DataType::Dictionary(key_type.clone(), Box::new(DataType::Utf8))
68 }
69 _ => DataType::Utf8,
70 })
71 }
72
73 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
74 let args: [ColumnarValue; 1] = args
75 .args
76 .try_into()
77 .map_err(|_| internal_datafusion_err!("string_space expects exactly one argument"))?;
78 spark_string_space(&args)
79 }
80
81 fn aliases(&self) -> &[String] {
82 &self.aliases
83 }
84}
85
86pub fn spark_string_space(args: &[ColumnarValue; 1]) -> Result<ColumnarValue> {
87 match args {
88 [ColumnarValue::Array(array)] => {
89 let result = string_space(&array)?;
90
91 Ok(ColumnarValue::Array(result))
92 }
93 _ => exec_err!("StringSpace(scalar) should be fold in Spark JVM side."),
94 }
95}
96
97fn string_space(length: &dyn Array) -> std::result::Result<ArrayRef, DataFusionError> {
98 match length.data_type() {
99 DataType::Int32 => {
100 let array = length.as_any().downcast_ref::<Int32Array>().unwrap();
101 Ok(generic_string_space::<i32>(array))
102 }
103 DataType::Dictionary(_, _) => {
104 let dict = as_dictionary_array::<Int32Type>(length);
105 let values = string_space(dict.values())?;
106 let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
107 Ok(Arc::new(result))
108 }
109 other => exec_err!("Unsupported input type for function 'string_space': {other:?}"),
110 }
111}
112
113fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> ArrayRef {
114 let array_len = length.len();
115 let mut offsets = MutableBuffer::new((array_len + 1) * std::mem::size_of::<OffsetSize>());
116 let mut length_so_far = OffsetSize::zero();
117
118 let null_bit_buffer = length.to_data().nulls().map(|b| b.buffer().clone());
120
121 let length_data = length.to_data();
123 let lengths = length_data.buffers()[0].typed_data::<i32>();
124 let total = lengths.iter().map(|l| *l as usize).sum::<usize>();
125 let mut values = MutableBuffer::new(total);
126
127 offsets.push(length_so_far);
128
129 let blank = " ".as_bytes()[0];
130 values.resize(total, blank);
131
132 (0..array_len).for_each(|i| {
133 let current_len = lengths[i] as usize;
134
135 length_so_far += OffsetSize::from_usize(current_len).unwrap();
136 offsets.push(length_so_far);
137 });
138
139 let data = unsafe {
140 ArrayData::new_unchecked(
141 GenericStringArray::<OffsetSize>::DATA_TYPE,
142 array_len,
143 None,
144 null_bit_buffer,
145 0,
146 vec![offsets.into(), values.into()],
147 vec![],
148 )
149 };
150 make_array(data)
151}