// datafusion_remote_table/transform.rs

use crate::{DFResult, RemoteDataType, RemoteField, RemoteSchema};
use datafusion::arrow::array::{
    ArrayRef, BooleanArray, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array,
    Int64Array, Int8Array, ListArray, RecordBatch, RecordBatchOptions, UInt16Array, UInt32Array,
    UInt64Array, UInt8Array,
};
use datafusion::arrow::datatypes::{Field, Schema};
use datafusion::arrow::error::ArrowError;
use std::fmt::Debug;
use std::sync::Arc;
10
11pub trait Transform: Debug + Send + Sync {
12    fn transform_boolean(
13        &self,
14        array: &BooleanArray,
15        remote_field: &RemoteField,
16    ) -> DFResult<(ArrayRef, Field)> {
17        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
18    }
19
20    fn transform_int8(
21        &self,
22        array: &Int8Array,
23        remote_field: &RemoteField,
24    ) -> DFResult<(ArrayRef, Field)> {
25        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
26    }
27
28    fn transform_int16(
29        &self,
30        array: &Int16Array,
31        remote_field: &RemoteField,
32    ) -> DFResult<(ArrayRef, Field)> {
33        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
34    }
35
36    fn transform_int32(
37        &self,
38        array: &Int32Array,
39        remote_field: &RemoteField,
40    ) -> DFResult<(ArrayRef, Field)> {
41        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
42    }
43
44    fn transform_int64(
45        &self,
46        array: &Int64Array,
47        remote_field: &RemoteField,
48    ) -> DFResult<(ArrayRef, Field)> {
49        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
50    }
51
52    fn transform_uint8(
53        &self,
54        array: &UInt8Array,
55        remote_field: &RemoteField,
56    ) -> DFResult<(ArrayRef, Field)> {
57        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
58    }
59
60    fn transform_uint16(
61        &self,
62        array: &UInt16Array,
63        remote_field: &RemoteField,
64    ) -> DFResult<(ArrayRef, Field)> {
65        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
66    }
67
68    fn transform_uint32(
69        &self,
70        array: &UInt32Array,
71        remote_field: &RemoteField,
72    ) -> DFResult<(ArrayRef, Field)> {
73        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
74    }
75
76    fn transform_uint64(
77        &self,
78        array: &UInt64Array,
79        remote_field: &RemoteField,
80    ) -> DFResult<(ArrayRef, Field)> {
81        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
82    }
83
84    fn transform_float16(
85        &self,
86        array: &Float16Array,
87        remote_field: &RemoteField,
88    ) -> DFResult<(ArrayRef, Field)> {
89        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
90    }
91
92    fn transform_float32(
93        &self,
94        array: &Float32Array,
95        remote_field: &RemoteField,
96    ) -> DFResult<(ArrayRef, Field)> {
97        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
98    }
99
100    fn transform_float64(
101        &self,
102        array: &Float64Array,
103        remote_field: &RemoteField,
104    ) -> DFResult<(ArrayRef, Field)> {
105        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
106    }
107
108    fn transform_list(
109        &self,
110        array: &ListArray,
111        remote_field: &RemoteField,
112    ) -> DFResult<(ArrayRef, Field)> {
113        Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
114    }
115}
116
117pub(crate) fn transform_batch(
118    batch: RecordBatch,
119    transform: &dyn Transform,
120    remote_schema: &RemoteSchema,
121) -> DFResult<RecordBatch> {
122    let mut new_arrays: Vec<ArrayRef> = Vec::with_capacity(remote_schema.fields.len());
123    let mut new_fields: Vec<Field> = Vec::with_capacity(remote_schema.fields.len());
124    for (idx, remote_field) in remote_schema.fields.iter().enumerate() {
125        let (new_array, new_field) = match &remote_field.data_type {
126            // TODO use a macro to reduce boilerplate
127            RemoteDataType::Boolean => {
128                let array = batch
129                    .column(idx)
130                    .as_any()
131                    .downcast_ref::<BooleanArray>()
132                    .expect("Failed to downcast to BooleanArray");
133                transform.transform_boolean(array, &remote_field)?
134            }
135            RemoteDataType::Int8 => {
136                let array = batch
137                    .column(idx)
138                    .as_any()
139                    .downcast_ref::<Int8Array>()
140                    .expect("Failed to downcast to Int8Array");
141                transform.transform_int8(array, &remote_field)?
142            }
143            RemoteDataType::Int16 => {
144                let array = batch
145                    .column(idx)
146                    .as_any()
147                    .downcast_ref::<Int16Array>()
148                    .expect("Failed to downcast to Int16Array");
149                transform.transform_int16(array, &remote_field)?
150            }
151            RemoteDataType::Int32 => {
152                let array = batch
153                    .column(idx)
154                    .as_any()
155                    .downcast_ref::<Int32Array>()
156                    .expect("Failed to downcast to Int32Array");
157                transform.transform_int32(array, &remote_field)?
158            }
159            RemoteDataType::Int64 => {
160                let array = batch
161                    .column(idx)
162                    .as_any()
163                    .downcast_ref::<Int64Array>()
164                    .expect("Failed to downcast to Int64Array");
165                transform.transform_int64(array, &remote_field)?
166            }
167            RemoteDataType::UInt8 => {
168                let array = batch
169                    .column(idx)
170                    .as_any()
171                    .downcast_ref::<UInt8Array>()
172                    .expect("Failed to downcast to UInt8Array");
173                transform.transform_uint8(array, &remote_field)?
174            }
175            RemoteDataType::UInt16 => {
176                let array = batch
177                    .column(idx)
178                    .as_any()
179                    .downcast_ref::<UInt16Array>()
180                    .expect("Failed to downcast to UInt16Array");
181                transform.transform_uint16(array, &remote_field)?
182            }
183            RemoteDataType::UInt32 => {
184                let array = batch
185                    .column(idx)
186                    .as_any()
187                    .downcast_ref::<UInt32Array>()
188                    .expect("Failed to downcast to UInt32Array");
189                transform.transform_uint32(array, &remote_field)?
190            }
191            RemoteDataType::UInt64 => {
192                let array = batch
193                    .column(idx)
194                    .as_any()
195                    .downcast_ref::<UInt64Array>()
196                    .expect("Failed to downcast to UInt64Array");
197                transform.transform_uint64(array, &remote_field)?
198            }
199            RemoteDataType::Float16 => {
200                let array = batch
201                    .column(idx)
202                    .as_any()
203                    .downcast_ref::<Float16Array>()
204                    .expect("Failed to downcast to Float16Array");
205                transform.transform_float16(array, &remote_field)?
206            }
207            RemoteDataType::Float32 => {
208                let array = batch
209                    .column(idx)
210                    .as_any()
211                    .downcast_ref::<Float32Array>()
212                    .expect("Failed to downcast to Float32Array");
213                transform.transform_float32(array, &remote_field)?
214            }
215            RemoteDataType::Float64 => {
216                let array = batch
217                    .column(idx)
218                    .as_any()
219                    .downcast_ref::<Float64Array>()
220                    .expect("Failed to downcast to Float64Array");
221                transform.transform_float64(array, &remote_field)?
222            }
223            RemoteDataType::List(_field) => {
224                let array = batch
225                    .column(idx)
226                    .as_any()
227                    .downcast_ref::<ListArray>()
228                    .expect("Failed to downcast to ListArray");
229                transform.transform_list(array, &remote_field)?
230            }
231        };
232        new_arrays.push(new_array);
233        new_fields.push(new_field);
234    }
235    let new_schema = Arc::new(Schema::new(new_fields));
236    Ok(RecordBatch::try_new(new_schema, new_arrays)?)
237}