1use std::sync::Arc;
4
5use arrow::array::{
6 Array, ArrowPrimitiveType, FixedSizeListArray, ListArray, PrimitiveArray, StringArray,
7};
8use arrow::datatypes::Field;
9
10use crate::{Error, Transform};
11
12#[derive(Clone)]
17pub struct MapList<T> {
18 transform: T,
19}
20
21impl<T> MapList<T> {
22 pub fn new(transform: T) -> Self {
24 Self { transform }
25 }
26}
27
28impl<T, S, U> Transform for MapList<T>
29where
30 T: Transform<Source = S, Target = U>,
31 S: Array + 'static,
32 U: Array + 'static,
33{
34 type Source = ListArray;
35 type Target = ListArray;
36
37 fn transform(&self, source: &ListArray) -> Result<ListArray, Error> {
38 let (field, offsets, values, nulls) = source.clone().into_parts();
39 let downcast =
40 values
41 .as_any()
42 .downcast_ref::<S>()
43 .ok_or_else(|| Error::UnexpectedListValueType {
44 expected: std::any::type_name::<S>().to_owned(),
45 actual: values.data_type().clone(),
46 })?;
47
48 let transformed = self.transform.transform(downcast)?;
49
50 let new_field = field
51 .as_ref()
52 .clone()
53 .with_data_type(transformed.data_type().clone());
54 Ok(ListArray::new(
55 new_field.into(),
56 offsets,
57 Arc::new(transformed),
58 nulls,
59 ))
60 }
61}
62
63#[derive(Clone)]
68pub struct MapFixedSizeList<T> {
69 transform: T,
70}
71
72impl<T> MapFixedSizeList<T> {
73 pub fn new(transform: T) -> Self {
75 Self { transform }
76 }
77}
78
79impl<T, S, U> Transform for MapFixedSizeList<T>
80where
81 T: Transform<Source = S, Target = U>,
82 S: Array + 'static,
83 U: Array + 'static,
84{
85 type Source = FixedSizeListArray;
86 type Target = FixedSizeListArray;
87
88 fn transform(&self, source: &FixedSizeListArray) -> Result<FixedSizeListArray, Error> {
89 let values = source.values();
90 let downcast = values.as_any().downcast_ref::<S>().ok_or_else(|| {
91 Error::UnexpectedFixedSizeListValueType {
92 expected: std::any::type_name::<S>().to_owned(),
93 actual: values.data_type().clone(),
94 }
95 })?;
96
97 let transformed = self.transform.transform(downcast)?;
98 let field = Arc::new(Field::new_list_field(
99 transformed.data_type().clone(),
100 transformed.is_nullable(),
101 ));
102 let size = source.value_length();
103 let nulls = source.nulls().cloned();
104
105 Ok(FixedSizeListArray::new(
106 field,
107 size,
108 Arc::new(transformed),
109 nulls,
110 ))
111 }
112}
113
114#[derive(Clone)]
119pub struct MapPrimitive<S, F, T = S>
120where
121 S: ArrowPrimitiveType,
122 T: ArrowPrimitiveType,
123 F: Fn(S::Native) -> T::Native,
124{
125 f: F,
126 _phantom_source: std::marker::PhantomData<S>,
127 _phantom_target: std::marker::PhantomData<T>,
128}
129
130impl<S, F, T> MapPrimitive<S, F, T>
131where
132 S: ArrowPrimitiveType,
133 T: ArrowPrimitiveType,
134 F: Fn(S::Native) -> T::Native,
135{
136 pub fn new(f: F) -> Self {
138 Self {
139 f,
140 _phantom_source: std::marker::PhantomData,
141 _phantom_target: std::marker::PhantomData,
142 }
143 }
144}
145
146impl<S, F, T> Transform for MapPrimitive<S, F, T>
147where
148 S: ArrowPrimitiveType,
149 T: ArrowPrimitiveType,
150 F: Fn(S::Native) -> T::Native,
151{
152 type Source = PrimitiveArray<S>;
153 type Target = PrimitiveArray<T>;
154
155 fn transform(&self, source: &PrimitiveArray<S>) -> Result<PrimitiveArray<T>, Error> {
156 let result: PrimitiveArray<T> = source.iter().map(|opt| opt.map(|v| (self.f)(v))).collect();
157 Ok(result)
158 }
159}
160
161#[derive(Clone)]
166pub struct ReplaceNull<T>
167where
168 T: ArrowPrimitiveType,
169{
170 default_value: T::Native,
171 _phantom: std::marker::PhantomData<T>,
172}
173
174impl<T> ReplaceNull<T>
175where
176 T: ArrowPrimitiveType,
177{
178 pub fn new(default_value: T::Native) -> Self {
180 Self {
181 default_value,
182 _phantom: std::marker::PhantomData,
183 }
184 }
185}
186
187impl<T> Transform for ReplaceNull<T>
188where
189 T: ArrowPrimitiveType,
190{
191 type Source = PrimitiveArray<T>;
192 type Target = PrimitiveArray<T>;
193
194 fn transform(&self, source: &PrimitiveArray<T>) -> Result<PrimitiveArray<T>, Error> {
195 let result: PrimitiveArray<T> = source
196 .iter()
197 .map(|opt| Some(opt.unwrap_or(self.default_value)))
198 .collect();
199 Ok(result)
200 }
201}
202
203#[derive(Clone)]
207pub struct StringPrefix {
208 prefix: String,
209 prefix_empty_string: bool,
210}
211
212impl StringPrefix {
213 pub fn new(prefix: impl Into<String>) -> Self {
215 Self {
216 prefix: prefix.into(),
217 prefix_empty_string: true,
218 }
219 }
220
221 pub fn with_prefix_empty_string(mut self, prefix_empty_string: bool) -> Self {
226 self.prefix_empty_string = prefix_empty_string;
227 self
228 }
229}
230
231impl Transform for StringPrefix {
232 type Source = StringArray;
233 type Target = StringArray;
234
235 fn transform(&self, source: &StringArray) -> Result<StringArray, Error> {
236 let result: StringArray = source
237 .iter()
238 .map(|opt| {
239 opt.map(|s| {
240 if s.is_empty() && !self.prefix_empty_string {
241 s.to_owned()
243 } else {
244 format!("{}{}", self.prefix, s)
245 }
246 })
247 })
248 .collect();
249 Ok(result)
250 }
251}
252
253#[derive(Clone)]
257pub struct StringSuffix {
258 suffix: String,
259 suffix_empty_string: bool,
260}
261
262impl StringSuffix {
263 pub fn new(suffix: impl Into<String>) -> Self {
265 Self {
266 suffix: suffix.into(),
267 suffix_empty_string: true,
268 }
269 }
270
271 pub fn with_suffix_empty_string(mut self, suffix_empty_string: bool) -> Self {
276 self.suffix_empty_string = suffix_empty_string;
277 self
278 }
279}
280
281impl Transform for StringSuffix {
282 type Source = StringArray;
283 type Target = StringArray;
284
285 fn transform(&self, source: &StringArray) -> Result<StringArray, Error> {
286 let result: StringArray = source
287 .iter()
288 .map(|opt| {
289 opt.map(|s| {
290 if s.is_empty() && !self.suffix_empty_string {
291 s.to_owned()
293 } else {
294 format!("{}{}", s, self.suffix)
295 }
296 })
297 })
298 .collect();
299 Ok(result)
300 }
301}