use crate::{DFResult, RemoteDataType, RemoteField, RemoteSchema};
use datafusion::arrow::array::{
    ArrayRef, BooleanArray, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array,
    Int64Array, Int8Array, ListArray, RecordBatch, UInt16Array, UInt32Array, UInt64Array,
    UInt8Array,
};
use datafusion::arrow::datatypes::{Field, Schema};
use datafusion::common::DataFusionError;
use std::fmt::Debug;
use std::sync::Arc;
10
11pub trait Transform: Debug + Send + Sync {
12 fn transform_boolean(
13 &self,
14 array: &BooleanArray,
15 remote_field: &RemoteField,
16 ) -> DFResult<(ArrayRef, Field)> {
17 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
18 }
19
20 fn transform_int8(
21 &self,
22 array: &Int8Array,
23 remote_field: &RemoteField,
24 ) -> DFResult<(ArrayRef, Field)> {
25 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
26 }
27
28 fn transform_int16(
29 &self,
30 array: &Int16Array,
31 remote_field: &RemoteField,
32 ) -> DFResult<(ArrayRef, Field)> {
33 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
34 }
35
36 fn transform_int32(
37 &self,
38 array: &Int32Array,
39 remote_field: &RemoteField,
40 ) -> DFResult<(ArrayRef, Field)> {
41 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
42 }
43
44 fn transform_int64(
45 &self,
46 array: &Int64Array,
47 remote_field: &RemoteField,
48 ) -> DFResult<(ArrayRef, Field)> {
49 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
50 }
51
52 fn transform_uint8(
53 &self,
54 array: &UInt8Array,
55 remote_field: &RemoteField,
56 ) -> DFResult<(ArrayRef, Field)> {
57 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
58 }
59
60 fn transform_uint16(
61 &self,
62 array: &UInt16Array,
63 remote_field: &RemoteField,
64 ) -> DFResult<(ArrayRef, Field)> {
65 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
66 }
67
68 fn transform_uint32(
69 &self,
70 array: &UInt32Array,
71 remote_field: &RemoteField,
72 ) -> DFResult<(ArrayRef, Field)> {
73 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
74 }
75
76 fn transform_uint64(
77 &self,
78 array: &UInt64Array,
79 remote_field: &RemoteField,
80 ) -> DFResult<(ArrayRef, Field)> {
81 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
82 }
83
84 fn transform_float16(
85 &self,
86 array: &Float16Array,
87 remote_field: &RemoteField,
88 ) -> DFResult<(ArrayRef, Field)> {
89 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
90 }
91
92 fn transform_float32(
93 &self,
94 array: &Float32Array,
95 remote_field: &RemoteField,
96 ) -> DFResult<(ArrayRef, Field)> {
97 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
98 }
99
100 fn transform_float64(
101 &self,
102 array: &Float64Array,
103 remote_field: &RemoteField,
104 ) -> DFResult<(ArrayRef, Field)> {
105 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
106 }
107
108 fn transform_list(
109 &self,
110 array: &ListArray,
111 remote_field: &RemoteField,
112 ) -> DFResult<(ArrayRef, Field)> {
113 Ok((Arc::new(array.clone()), remote_field.to_arrow_field()))
114 }
115}
116
117pub(crate) fn transform_batch(
118 batch: RecordBatch,
119 transform: &dyn Transform,
120 remote_schema: &RemoteSchema,
121) -> DFResult<RecordBatch> {
122 let mut new_arrays: Vec<ArrayRef> = Vec::with_capacity(remote_schema.fields.len());
123 let mut new_fields: Vec<Field> = Vec::with_capacity(remote_schema.fields.len());
124 for (idx, remote_field) in remote_schema.fields.iter().enumerate() {
125 let (new_array, new_field) = match &remote_field.data_type {
126 RemoteDataType::Boolean => {
128 let array = batch
129 .column(idx)
130 .as_any()
131 .downcast_ref::<BooleanArray>()
132 .expect("Failed to downcast to BooleanArray");
133 transform.transform_boolean(array, &remote_field)?
134 }
135 RemoteDataType::Int8 => {
136 let array = batch
137 .column(idx)
138 .as_any()
139 .downcast_ref::<Int8Array>()
140 .expect("Failed to downcast to Int8Array");
141 transform.transform_int8(array, &remote_field)?
142 }
143 RemoteDataType::Int16 => {
144 let array = batch
145 .column(idx)
146 .as_any()
147 .downcast_ref::<Int16Array>()
148 .expect("Failed to downcast to Int16Array");
149 transform.transform_int16(array, &remote_field)?
150 }
151 RemoteDataType::Int32 => {
152 let array = batch
153 .column(idx)
154 .as_any()
155 .downcast_ref::<Int32Array>()
156 .expect("Failed to downcast to Int32Array");
157 transform.transform_int32(array, &remote_field)?
158 }
159 RemoteDataType::Int64 => {
160 let array = batch
161 .column(idx)
162 .as_any()
163 .downcast_ref::<Int64Array>()
164 .expect("Failed to downcast to Int64Array");
165 transform.transform_int64(array, &remote_field)?
166 }
167 RemoteDataType::UInt8 => {
168 let array = batch
169 .column(idx)
170 .as_any()
171 .downcast_ref::<UInt8Array>()
172 .expect("Failed to downcast to UInt8Array");
173 transform.transform_uint8(array, &remote_field)?
174 }
175 RemoteDataType::UInt16 => {
176 let array = batch
177 .column(idx)
178 .as_any()
179 .downcast_ref::<UInt16Array>()
180 .expect("Failed to downcast to UInt16Array");
181 transform.transform_uint16(array, &remote_field)?
182 }
183 RemoteDataType::UInt32 => {
184 let array = batch
185 .column(idx)
186 .as_any()
187 .downcast_ref::<UInt32Array>()
188 .expect("Failed to downcast to UInt32Array");
189 transform.transform_uint32(array, &remote_field)?
190 }
191 RemoteDataType::UInt64 => {
192 let array = batch
193 .column(idx)
194 .as_any()
195 .downcast_ref::<UInt64Array>()
196 .expect("Failed to downcast to UInt64Array");
197 transform.transform_uint64(array, &remote_field)?
198 }
199 RemoteDataType::Float16 => {
200 let array = batch
201 .column(idx)
202 .as_any()
203 .downcast_ref::<Float16Array>()
204 .expect("Failed to downcast to Float16Array");
205 transform.transform_float16(array, &remote_field)?
206 }
207 RemoteDataType::Float32 => {
208 let array = batch
209 .column(idx)
210 .as_any()
211 .downcast_ref::<Float32Array>()
212 .expect("Failed to downcast to Float32Array");
213 transform.transform_float32(array, &remote_field)?
214 }
215 RemoteDataType::Float64 => {
216 let array = batch
217 .column(idx)
218 .as_any()
219 .downcast_ref::<Float64Array>()
220 .expect("Failed to downcast to Float64Array");
221 transform.transform_float64(array, &remote_field)?
222 }
223 RemoteDataType::List(_field) => {
224 let array = batch
225 .column(idx)
226 .as_any()
227 .downcast_ref::<ListArray>()
228 .expect("Failed to downcast to ListArray");
229 transform.transform_list(array, &remote_field)?
230 }
231 };
232 new_arrays.push(new_array);
233 new_fields.push(new_field);
234 }
235 let new_schema = Arc::new(Schema::new(new_fields));
236 Ok(RecordBatch::try_new(new_schema, new_arrays)?)
237}