datafusion_remote_table/
literalize.rs

1use crate::PostgresType;
2use crate::{DFResult, RemoteType};
3use chrono::{TimeZone, Utc};
4use datafusion::arrow::array::timezone::Tz;
5use datafusion::arrow::array::*;
6use datafusion::arrow::datatypes::*;
7use datafusion::arrow::temporal_conversions::{
8    date32_to_datetime, time64ns_to_time, time64us_to_time, timestamp_ns_to_datetime,
9    timestamp_us_to_datetime,
10};
11use datafusion::error::DataFusionError;
12use std::any::Any;
13use std::fmt::Debug;
14
/// Builds one SQL literal string per slot of `$array`; null slots become the bare `NULL`
/// keyword.
///
/// * `literalize_array!(arr)` renders every present value with its `Display` impl.
/// * `literalize_array!(arr, convert)` renders every present value with `convert`, a
///   callable returning `Result<String, _>`. The first error is propagated with `?` out of
///   the *enclosing* function or closure (not merely out of the macro expression).
///
/// Both forms evaluate to `Ok::<Vec<String>, DataFusionError>(..)` when no error occurs.
macro_rules! literalize_array {
    ($array:ident) => {{
        let mut rendered: Vec<String> = Vec::with_capacity($array.len());
        for item in $array.iter() {
            rendered.push(match item {
                None => "NULL".to_string(),
                Some(value) => format!("{value}"),
            });
        }
        Ok::<_, DataFusionError>(rendered)
    }};
    ($array:ident, $convert:expr) => {{
        let mut rendered: Vec<String> = Vec::with_capacity($array.len());
        for item in $array.iter() {
            rendered.push(match item {
                None => "NULL".to_string(),
                // `?` deliberately returns from the caller's fn/closure on conversion failure.
                Some(value) => $convert(value)?,
            });
        }
        Ok::<_, DataFusionError>(rendered)
    }};
}
45
46pub trait Literalize: Debug + Send + Sync {
47    fn as_any(&self) -> &dyn Any;
48
49    fn literalize_null_array(
50        &self,
51        array: &NullArray,
52        _remote_type: RemoteType,
53    ) -> DFResult<Vec<String>> {
54        Ok(vec!["NULL".to_string(); array.len()])
55    }
56
57    fn literalize_boolean_array(
58        &self,
59        array: &BooleanArray,
60        _remote_type: RemoteType,
61    ) -> DFResult<Vec<String>> {
62        literalize_array!(array)
63    }
64
65    fn literalize_int8_array(
66        &self,
67        array: &Int8Array,
68        _remote_type: RemoteType,
69    ) -> DFResult<Vec<String>> {
70        literalize_array!(array)
71    }
72
73    fn literalize_int16_array(
74        &self,
75        array: &Int16Array,
76        _remote_type: RemoteType,
77    ) -> DFResult<Vec<String>> {
78        literalize_array!(array)
79    }
80
81    fn literalize_int32_array(
82        &self,
83        array: &Int32Array,
84        _remote_type: RemoteType,
85    ) -> DFResult<Vec<String>> {
86        literalize_array!(array)
87    }
88
89    fn literalize_int64_array(
90        &self,
91        array: &Int64Array,
92        _remote_type: RemoteType,
93    ) -> DFResult<Vec<String>> {
94        literalize_array!(array)
95    }
96
97    fn literalize_uint8_array(
98        &self,
99        array: &UInt8Array,
100        _remote_type: RemoteType,
101    ) -> DFResult<Vec<String>> {
102        literalize_array!(array)
103    }
104
105    fn literalize_uint16_array(
106        &self,
107        array: &UInt16Array,
108        _remote_type: RemoteType,
109    ) -> DFResult<Vec<String>> {
110        literalize_array!(array)
111    }
112
113    fn literalize_uint32_array(
114        &self,
115        array: &UInt32Array,
116        _remote_type: RemoteType,
117    ) -> DFResult<Vec<String>> {
118        literalize_array!(array)
119    }
120
121    fn literalize_uint64_array(
122        &self,
123        array: &UInt64Array,
124        _remote_type: RemoteType,
125    ) -> DFResult<Vec<String>> {
126        literalize_array!(array)
127    }
128
129    fn literalize_float16_array(
130        &self,
131        array: &Float16Array,
132        _remote_type: RemoteType,
133    ) -> DFResult<Vec<String>> {
134        literalize_array!(array)
135    }
136
137    fn literalize_float32_array(
138        &self,
139        array: &Float32Array,
140        _remote_type: RemoteType,
141    ) -> DFResult<Vec<String>> {
142        literalize_array!(array)
143    }
144
145    fn literalize_timestamp_microsecond_array(
146        &self,
147        array: &TimestampMicrosecondArray,
148        remote_type: RemoteType,
149    ) -> DFResult<Vec<String>> {
150        let db_type = remote_type.db_type();
151        let tz = match array.timezone() {
152            Some(tz) => Some(
153                tz.parse::<Tz>()
154                    .map_err(|e| DataFusionError::Internal(e.to_string()))?,
155            ),
156            None => None,
157        };
158
159        literalize_array!(array, |v| {
160            let Some(naive) = timestamp_us_to_datetime(v) else {
161                return Err(DataFusionError::Internal(format!(
162                    "invalid timestamp microsecond value: {v}"
163                )));
164            };
165            let format = match tz {
166                Some(tz) => {
167                    let date = Utc.from_utc_datetime(&naive).with_timezone(&tz);
168                    date.format("%Y-%m-%d %H:%M:%S.%f").to_string()
169                }
170                None => naive.format("%Y-%m-%d %H:%M:%S.%f").to_string(),
171            };
172            Ok::<_, DataFusionError>(db_type.sql_string_literal(&format))
173        })
174    }
175
176    fn literalize_timestamp_nanosecond_array(
177        &self,
178        array: &TimestampNanosecondArray,
179        remote_type: RemoteType,
180    ) -> DFResult<Vec<String>> {
181        let db_type = remote_type.db_type();
182        let tz = match array.timezone() {
183            Some(tz) => Some(
184                tz.parse::<Tz>()
185                    .map_err(|e| DataFusionError::Internal(e.to_string()))?,
186            ),
187            None => None,
188        };
189
190        literalize_array!(array, |v| {
191            let Some(naive) = timestamp_ns_to_datetime(v) else {
192                return Err(DataFusionError::Internal(format!(
193                    "invalid timestamp nanosecond value: {v}"
194                )));
195            };
196            let format = match tz {
197                Some(tz) => {
198                    let date = Utc.from_utc_datetime(&naive).with_timezone(&tz);
199                    date.format("%Y-%m-%d %H:%M:%S.%f").to_string()
200                }
201                None => naive.format("%Y-%m-%d %H:%M:%S.%f").to_string(),
202            };
203            Ok::<_, DataFusionError>(db_type.sql_string_literal(&format))
204        })
205    }
206
207    fn literalize_float64_array(
208        &self,
209        array: &Float64Array,
210        _remote_type: RemoteType,
211    ) -> DFResult<Vec<String>> {
212        literalize_array!(array)
213    }
214
215    fn literalize_date32_array(
216        &self,
217        array: &Date32Array,
218        remote_type: RemoteType,
219    ) -> DFResult<Vec<String>> {
220        let db_type = remote_type.db_type();
221        literalize_array!(array, |v| {
222            let Some(date) = date32_to_datetime(v) else {
223                return Err(DataFusionError::Internal(format!(
224                    "invalid date32 value: {v}"
225                )));
226            };
227            Ok::<_, DataFusionError>(
228                db_type.sql_string_literal(&date.format("%Y-%m-%d").to_string()),
229            )
230        })
231    }
232
233    fn literalize_time64_microsecond_array(
234        &self,
235        array: &Time64MicrosecondArray,
236        remote_type: RemoteType,
237    ) -> DFResult<Vec<String>> {
238        let db_type = remote_type.db_type();
239        literalize_array!(array, |v| {
240            let Some(time) = time64us_to_time(v) else {
241                return Err(DataFusionError::Internal(format!(
242                    "invalid time64 microsecond value: {v}"
243                )));
244            };
245            Ok::<_, DataFusionError>(
246                db_type.sql_string_literal(&time.format("%H:%M:%S.%f").to_string()),
247            )
248        })
249    }
250
251    fn literalize_time64_nanosecond_array(
252        &self,
253        array: &Time64NanosecondArray,
254        remote_type: RemoteType,
255    ) -> DFResult<Vec<String>> {
256        let db_type = remote_type.db_type();
257        literalize_array!(array, |v| {
258            let Some(time) = time64ns_to_time(v) else {
259                return Err(DataFusionError::Internal(format!(
260                    "invalid time64 nanosecond value: {v}"
261                )));
262            };
263            Ok::<_, DataFusionError>(
264                db_type.sql_string_literal(&time.format("%H:%M:%S.%f").to_string()),
265            )
266        })
267    }
268
269    fn literalize_interval_month_day_nano_array(
270        &self,
271        array: &IntervalMonthDayNanoArray,
272        remote_type: RemoteType,
273    ) -> DFResult<Vec<String>> {
274        let db_type = remote_type.db_type();
275        literalize_array!(array, |v: IntervalMonthDayNano| {
276            let mut s = String::new();
277            let mut prefix = "";
278
279            if v.months != 0 {
280                s.push_str(&format!("{prefix}{} mons", v.months));
281                prefix = " ";
282            }
283
284            if v.days != 0 {
285                s.push_str(&format!("{prefix}{} days", v.days));
286                prefix = " ";
287            }
288
289            if v.nanoseconds != 0 {
290                let secs = v.nanoseconds / 1_000_000_000;
291                let mins = secs / 60;
292                let hours = mins / 60;
293
294                let secs = secs - (mins * 60);
295                let mins = mins - (hours * 60);
296
297                let nanoseconds = v.nanoseconds % 1_000_000_000;
298
299                if hours != 0 {
300                    s.push_str(&format!("{prefix}{} hours", hours));
301                    prefix = " ";
302                }
303
304                if mins != 0 {
305                    s.push_str(&format!("{prefix}{} mins", mins));
306                    prefix = " ";
307                }
308
309                if secs != 0 || nanoseconds != 0 {
310                    let secs_sign = if secs < 0 || nanoseconds < 0 { "-" } else { "" };
311                    s.push_str(&format!(
312                        "{prefix}{}{}.{:09} secs",
313                        secs_sign,
314                        secs.abs(),
315                        nanoseconds.abs()
316                    ));
317                }
318            }
319
320            Ok::<_, DataFusionError>(db_type.sql_string_literal(&s))
321        })
322    }
323
324    fn literalize_string_array(
325        &self,
326        array: &StringArray,
327        remote_type: RemoteType,
328    ) -> DFResult<Vec<String>> {
329        let db_type = remote_type.db_type();
330        literalize_array!(array, |v| Ok::<_, DataFusionError>(
331            db_type.sql_string_literal(v)
332        ))
333    }
334
335    fn literalize_large_string_array(
336        &self,
337        array: &LargeStringArray,
338        remote_type: RemoteType,
339    ) -> DFResult<Vec<String>> {
340        let db_type = remote_type.db_type();
341        literalize_array!(array, |v| Ok::<_, DataFusionError>(
342            db_type.sql_string_literal(v)
343        ))
344    }
345
346    fn literalize_binary_array(
347        &self,
348        array: &BinaryArray,
349        remote_type: RemoteType,
350    ) -> DFResult<Vec<String>> {
351        let db_type = remote_type.db_type();
352        match remote_type {
353            RemoteType::Postgres(PostgresType::PostGisGeometry) => {
354                literalize_array!(array, |v| {
355                    let s = db_type.sql_binary_literal(v);
356                    Ok::<_, DataFusionError>(format!("ST_GeomFromWKB({s})"))
357                })
358            }
359            _ => literalize_array!(array, |v| Ok::<_, DataFusionError>(
360                db_type.sql_binary_literal(v)
361            )),
362        }
363    }
364
365    fn literalize_fixed_size_binary_array(
366        &self,
367        array: &FixedSizeBinaryArray,
368        remote_type: RemoteType,
369    ) -> DFResult<Vec<String>> {
370        let db_type = remote_type.db_type();
371        match remote_type {
372            RemoteType::Postgres(PostgresType::Uuid) => {
373                literalize_array!(array, |v| Ok::<_, DataFusionError>(format!(
374                    "'{}'",
375                    hex::encode(v)
376                )))
377            }
378            _ => literalize_array!(array, |v| Ok::<_, DataFusionError>(
379                db_type.sql_binary_literal(v)
380            )),
381        }
382    }
383
384    fn literalize_list_array(
385        &self,
386        array: &ListArray,
387        remote_type: RemoteType,
388    ) -> DFResult<Vec<String>> {
389        let db_type = remote_type.db_type();
390        let data_type = array.data_type();
391        let DataType::List(field) = data_type else {
392            return Err(DataFusionError::Internal(format!(
393                "expect list array, but got {data_type}"
394            )));
395        };
396
397        let inner_type = field.data_type();
398
399        match inner_type {
400            DataType::Boolean => {
401                literalize_array!(array, |v: ArrayRef| {
402                    let array = v.as_boolean();
403                    let sqls = literalize_array!(array)?;
404                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
405                })
406            }
407            DataType::Int16 => {
408                literalize_array!(array, |v: ArrayRef| {
409                    let array = v.as_primitive::<Int16Type>();
410                    let sqls = literalize_array!(array)?;
411                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
412                })
413            }
414            DataType::Int32 => {
415                literalize_array!(array, |v: ArrayRef| {
416                    let array = v.as_primitive::<Int32Type>();
417                    let sqls = literalize_array!(array)?;
418                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
419                })
420            }
421            DataType::Int64 => {
422                literalize_array!(array, |v: ArrayRef| {
423                    let array = v.as_primitive::<Int64Type>();
424                    let sqls = literalize_array!(array)?;
425                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
426                })
427            }
428            DataType::Float32 => {
429                literalize_array!(array, |v: ArrayRef| {
430                    let array = v.as_primitive::<Float32Type>();
431                    let sqls = literalize_array!(array)?;
432                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
433                })
434            }
435            DataType::Float64 => {
436                literalize_array!(array, |v: ArrayRef| {
437                    let array = v.as_primitive::<Float64Type>();
438                    let sqls = literalize_array!(array)?;
439                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
440                })
441            }
442            DataType::Utf8 => {
443                literalize_array!(array, |v: ArrayRef| {
444                    let array = v.as_string::<i32>();
445                    let sqls = literalize_array!(array, |v| Ok::<_, DataFusionError>(
446                        db_type.sql_string_literal(v)
447                    ))?;
448                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
449                })
450            }
451            DataType::LargeUtf8 => {
452                literalize_array!(array, |v: ArrayRef| {
453                    let array = v.as_string::<i64>();
454                    let sqls = literalize_array!(array, |v| Ok::<_, DataFusionError>(
455                        db_type.sql_string_literal(v)
456                    ))?;
457                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
458                })
459            }
460            _ => Err(DataFusionError::NotImplemented(format!(
461                "Not supported literalizing list array: {data_type}"
462            ))),
463        }
464    }
465
466    fn literalize_decimal128_array(
467        &self,
468        array: &Decimal128Array,
469        remote_type: RemoteType,
470    ) -> DFResult<Vec<String>> {
471        let precision = array.precision();
472        let scale = array.scale();
473        let db_type = remote_type.db_type();
474        literalize_array!(array, |v| Ok::<_, DataFusionError>(
475            db_type.sql_string_literal(&Decimal128Type::format_decimal(v, precision, scale))
476        ))
477    }
478
479    fn literalize_decimal256_array(
480        &self,
481        array: &Decimal256Array,
482        remote_type: RemoteType,
483    ) -> DFResult<Vec<String>> {
484        let precision = array.precision();
485        let scale = array.scale();
486        let db_type = remote_type.db_type();
487        literalize_array!(array, |v| Ok::<_, DataFusionError>(
488            db_type.sql_string_literal(&Decimal256Type::format_decimal(v, precision, scale))
489        ))
490    }
491}
492
493pub fn literalize_array(
494    literalizer: &dyn Literalize,
495    array: &ArrayRef,
496    remote_type: RemoteType,
497) -> DFResult<Vec<String>> {
498    match array.data_type() {
499        DataType::Null => {
500            let array = array
501                .as_any()
502                .downcast_ref::<NullArray>()
503                .expect("expect null array");
504            literalizer.literalize_null_array(array, remote_type)
505        }
506        DataType::Boolean => {
507            let array = array.as_boolean();
508            literalizer.literalize_boolean_array(array, remote_type)
509        }
510        DataType::Int8 => {
511            let array = array.as_primitive::<Int8Type>();
512            literalizer.literalize_int8_array(array, remote_type)
513        }
514        DataType::Int16 => {
515            let array = array.as_primitive::<Int16Type>();
516            literalizer.literalize_int16_array(array, remote_type)
517        }
518        DataType::Int32 => {
519            let array = array.as_primitive::<Int32Type>();
520            literalizer.literalize_int32_array(array, remote_type)
521        }
522        DataType::Int64 => {
523            let array = array.as_primitive::<Int64Type>();
524            literalizer.literalize_int64_array(array, remote_type)
525        }
526        DataType::UInt8 => {
527            let array = array.as_primitive::<UInt8Type>();
528            literalizer.literalize_uint8_array(array, remote_type)
529        }
530        DataType::UInt16 => {
531            let array = array.as_primitive::<UInt16Type>();
532            literalizer.literalize_uint16_array(array, remote_type)
533        }
534        DataType::UInt32 => {
535            let array = array.as_primitive::<UInt32Type>();
536            literalizer.literalize_uint32_array(array, remote_type)
537        }
538        DataType::UInt64 => {
539            let array = array.as_primitive::<UInt64Type>();
540            literalizer.literalize_uint64_array(array, remote_type)
541        }
542        DataType::Float16 => {
543            let array = array.as_primitive::<Float16Type>();
544            literalizer.literalize_float16_array(array, remote_type)
545        }
546        DataType::Float32 => {
547            let array = array.as_primitive::<Float32Type>();
548            literalizer.literalize_float32_array(array, remote_type)
549        }
550        DataType::Float64 => {
551            let array = array.as_primitive::<Float64Type>();
552            literalizer.literalize_float64_array(array, remote_type)
553        }
554        DataType::Timestamp(TimeUnit::Microsecond, _) => {
555            let array = array.as_primitive::<TimestampMicrosecondType>();
556            literalizer.literalize_timestamp_microsecond_array(array, remote_type)
557        }
558        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
559            let array = array.as_primitive::<TimestampNanosecondType>();
560            literalizer.literalize_timestamp_nanosecond_array(array, remote_type)
561        }
562        DataType::Date32 => {
563            let array = array.as_primitive::<Date32Type>();
564            literalizer.literalize_date32_array(array, remote_type)
565        }
566        DataType::Time64(TimeUnit::Microsecond) => {
567            let array = array.as_primitive::<Time64MicrosecondType>();
568            literalizer.literalize_time64_microsecond_array(array, remote_type)
569        }
570        DataType::Time64(TimeUnit::Nanosecond) => {
571            let array = array.as_primitive::<Time64NanosecondType>();
572            literalizer.literalize_time64_nanosecond_array(array, remote_type)
573        }
574        DataType::Interval(IntervalUnit::MonthDayNano) => {
575            let array = array.as_primitive::<IntervalMonthDayNanoType>();
576            literalizer.literalize_interval_month_day_nano_array(array, remote_type)
577        }
578        DataType::Utf8 => {
579            let array = array.as_string();
580            literalizer.literalize_string_array(array, remote_type)
581        }
582        DataType::LargeUtf8 => {
583            let array = array.as_string::<i64>();
584            literalizer.literalize_large_string_array(array, remote_type)
585        }
586        DataType::Binary => {
587            let array = array.as_binary::<i32>();
588            literalizer.literalize_binary_array(array, remote_type)
589        }
590        DataType::FixedSizeBinary(_) => {
591            let array = array.as_fixed_size_binary();
592            literalizer.literalize_fixed_size_binary_array(array, remote_type)
593        }
594        DataType::List(_) => {
595            let array = array.as_list::<i32>();
596            literalizer.literalize_list_array(array, remote_type)
597        }
598        DataType::Decimal128(_, _) => {
599            let array = array.as_primitive::<Decimal128Type>();
600            literalizer.literalize_decimal128_array(array, remote_type)
601        }
602        DataType::Decimal256(_, _) => {
603            let array = array.as_primitive::<Decimal256Type>();
604            literalizer.literalize_decimal256_array(array, remote_type)
605        }
606        _ => Err(DataFusionError::NotImplemented(format!(
607            "Not supported literalizing array: {}",
608            array.data_type()
609        ))),
610    }
611}
612
613#[derive(Debug)]
614pub struct DefaultLiteralizer {}
615
616impl Literalize for DefaultLiteralizer {
617    fn as_any(&self) -> &dyn Any {
618        self
619    }
620}