datafusion_comet_spark_expr/kernels/
strings.rs1use std::sync::Arc;
21
22use arrow::{
23 array::*,
24 buffer::MutableBuffer,
25 compute::kernels::substring::{substring as arrow_substring, substring_by_char},
26 datatypes::{DataType, Int32Type},
27};
28use datafusion::common::DataFusionError;
29
30pub fn string_space(length: &dyn Array) -> Result<ArrayRef, DataFusionError> {
36 match length.data_type() {
37 DataType::Int32 => {
38 let array = length.as_any().downcast_ref::<Int32Array>().unwrap();
39 Ok(generic_string_space::<i32>(array))
40 }
41 DataType::Dictionary(_, _) => {
42 let dict = as_dictionary_array::<Int32Type>(length);
43 let values = string_space(dict.values())?;
44 let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
45 Ok(Arc::new(result))
46 }
47 dt => panic!("Unsupported input type for function 'string_space': {dt:?}"),
48 }
49}
50
51pub fn substring(array: &dyn Array, start: i64, length: u64) -> Result<ArrayRef, DataFusionError> {
52 match array.data_type() {
53 DataType::LargeUtf8 => substring_by_char(
54 array
55 .as_any()
56 .downcast_ref::<LargeStringArray>()
57 .expect("A large string is expected"),
58 start,
59 Some(length),
60 )
61 .map_err(|e| e.into())
62 .map(|t| make_array(t.into_data())),
63 DataType::Utf8 => substring_by_char(
64 array
65 .as_any()
66 .downcast_ref::<StringArray>()
67 .expect("A string is expected"),
68 start,
69 Some(length),
70 )
71 .map_err(|e| e.into())
72 .map(|t| make_array(t.into_data())),
73 DataType::Binary | DataType::LargeBinary => {
74 arrow_substring(array, start, Some(length)).map_err(|e| e.into())
75 }
76 DataType::Dictionary(_, _) => {
77 let dict = as_dictionary_array::<Int32Type>(array);
78 let values = substring(dict.values(), start, length)?;
79 let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
80 Ok(Arc::new(result))
81 }
82 dt => panic!("Unsupported input type for function 'substring': {dt:?}"),
83 }
84}
85
86fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> ArrayRef {
87 let array_len = length.len();
88 let mut offsets = MutableBuffer::new((array_len + 1) * std::mem::size_of::<OffsetSize>());
89 let mut length_so_far = OffsetSize::zero();
90
91 let null_bit_buffer = length.to_data().nulls().map(|b| b.buffer().clone());
93
94 let length_data = length.to_data();
96 let lengths = length_data.buffers()[0].typed_data::<i32>();
97 let total = lengths.iter().map(|l| *l as usize).sum::<usize>();
98 let mut values = MutableBuffer::new(total);
99
100 offsets.push(length_so_far);
101
102 let blank = " ".as_bytes()[0];
103 values.resize(total, blank);
104
105 (0..array_len).for_each(|i| {
106 let current_len = lengths[i] as usize;
107
108 length_so_far += OffsetSize::from_usize(current_len).unwrap();
109 offsets.push(length_so_far);
110 });
111
112 let data = unsafe {
113 ArrayData::new_unchecked(
114 GenericStringArray::<OffsetSize>::DATA_TYPE,
115 array_len,
116 None,
117 null_bit_buffer,
118 0,
119 vec![offsets.into(), values.into()],
120 vec![],
121 )
122 };
123 make_array(data)
124}