datafusion_remote_table/
unparse.rs

1use crate::PostgresType;
2use crate::{DFResult, RemoteDbType, RemoteType};
3use chrono::{TimeZone, Utc};
4use datafusion::arrow::array::timezone::Tz;
5use datafusion::arrow::array::*;
6use datafusion::arrow::datatypes::*;
7use datafusion::arrow::temporal_conversions::{
8    date32_to_datetime, time64ns_to_time, time64us_to_time, timestamp_ns_to_datetime,
9    timestamp_us_to_datetime,
10};
11use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
12use datafusion::error::DataFusionError;
13use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
14use std::any::Any;
15use std::fmt::Debug;
16
/// Turns every element of an array-like value into a SQL fragment.
///
/// The argument must be an identifier whose `.iter()` yields `Option`s.
/// Missing elements always become the text `NULL`.
///
/// * `unparse_array!(array)` renders present values through `Display`.
/// * `unparse_array!(array, convert)` renders present values by calling
///   `convert(value)`; a failing conversion is propagated with `?`, i.e. it
///   returns early from the function or closure the macro is expanded in.
///
/// Both forms evaluate to `Ok::<_, DataFusionError>(Vec<String>)`.
macro_rules! unparse_array {
    ($array:ident) => {{
        let rendered = $array
            .iter()
            .map(|item| match item {
                Some(value) => format!("{value}"),
                None => "NULL".to_string(),
            })
            .collect::<Vec<String>>();
        Ok::<_, DataFusionError>(rendered)
    }};
    ($array:ident, $convert:expr) => {{
        let mut rendered: Vec<String> = Vec::with_capacity($array.len());
        for item in $array.iter() {
            // `?` is kept inside the expansion so the error conversion happens
            // in the caller's own return type.
            let sql = match item {
                Some(value) => $convert(value)?,
                None => "NULL".to_string(),
            };
            rendered.push(sql);
        }
        Ok::<_, DataFusionError>(rendered)
    }};
}
47
48pub trait Unparse: Debug + Send + Sync {
49    fn as_any(&self) -> &dyn Any;
50
51    fn support_filter_pushdown(
52        &self,
53        filter: &Expr,
54        db_type: RemoteDbType,
55    ) -> DFResult<TableProviderFilterPushDown> {
56        let unparser = match db_type.create_unparser() {
57            Ok(unparser) => unparser,
58            Err(_) => return Ok(TableProviderFilterPushDown::Unsupported),
59        };
60        if unparser.expr_to_sql(filter).is_err() {
61            return Ok(TableProviderFilterPushDown::Unsupported);
62        }
63
64        let mut pushdown = TableProviderFilterPushDown::Exact;
65        filter
66            .apply(|e| {
67                if matches!(e, Expr::ScalarFunction(_)) {
68                    pushdown = TableProviderFilterPushDown::Unsupported;
69                }
70                Ok(TreeNodeRecursion::Continue)
71            })
72            .expect("won't fail");
73
74        Ok(pushdown)
75    }
76
77    fn unparse_filter(&self, filter: &Expr, db_type: RemoteDbType) -> DFResult<String> {
78        let unparser = db_type.create_unparser()?;
79        let ast = unparser.expr_to_sql(filter)?;
80        Ok(format!("{ast}"))
81    }
82
83    fn unparse_null_array(
84        &self,
85        array: &NullArray,
86        _remote_type: RemoteType,
87    ) -> DFResult<Vec<String>> {
88        Ok(vec!["NULL".to_string(); array.len()])
89    }
90
91    fn unparse_boolean_array(
92        &self,
93        array: &BooleanArray,
94        _remote_type: RemoteType,
95    ) -> DFResult<Vec<String>> {
96        unparse_array!(array)
97    }
98
99    fn unparse_int8_array(
100        &self,
101        array: &Int8Array,
102        _remote_type: RemoteType,
103    ) -> DFResult<Vec<String>> {
104        unparse_array!(array)
105    }
106
107    fn unparse_int16_array(
108        &self,
109        array: &Int16Array,
110        _remote_type: RemoteType,
111    ) -> DFResult<Vec<String>> {
112        unparse_array!(array)
113    }
114
115    fn unparse_int32_array(
116        &self,
117        array: &Int32Array,
118        _remote_type: RemoteType,
119    ) -> DFResult<Vec<String>> {
120        unparse_array!(array)
121    }
122
123    fn unparse_int64_array(
124        &self,
125        array: &Int64Array,
126        _remote_type: RemoteType,
127    ) -> DFResult<Vec<String>> {
128        unparse_array!(array)
129    }
130
131    fn unparse_uint8_array(
132        &self,
133        array: &UInt8Array,
134        _remote_type: RemoteType,
135    ) -> DFResult<Vec<String>> {
136        unparse_array!(array)
137    }
138
139    fn unparse_uint16_array(
140        &self,
141        array: &UInt16Array,
142        _remote_type: RemoteType,
143    ) -> DFResult<Vec<String>> {
144        unparse_array!(array)
145    }
146
147    fn unparse_uint32_array(
148        &self,
149        array: &UInt32Array,
150        _remote_type: RemoteType,
151    ) -> DFResult<Vec<String>> {
152        unparse_array!(array)
153    }
154
155    fn unparse_uint64_array(
156        &self,
157        array: &UInt64Array,
158        _remote_type: RemoteType,
159    ) -> DFResult<Vec<String>> {
160        unparse_array!(array)
161    }
162
163    fn unparse_float16_array(
164        &self,
165        array: &Float16Array,
166        _remote_type: RemoteType,
167    ) -> DFResult<Vec<String>> {
168        unparse_array!(array)
169    }
170
171    fn unparse_float32_array(
172        &self,
173        array: &Float32Array,
174        _remote_type: RemoteType,
175    ) -> DFResult<Vec<String>> {
176        unparse_array!(array)
177    }
178
179    fn unparse_timestamp_microsecond_array(
180        &self,
181        array: &TimestampMicrosecondArray,
182        remote_type: RemoteType,
183    ) -> DFResult<Vec<String>> {
184        let db_type = remote_type.db_type();
185        let tz = match array.timezone() {
186            Some(tz) => Some(
187                tz.parse::<Tz>()
188                    .map_err(|e| DataFusionError::Internal(e.to_string()))?,
189            ),
190            None => None,
191        };
192
193        unparse_array!(array, |v| {
194            let Some(naive) = timestamp_us_to_datetime(v) else {
195                return Err(DataFusionError::Internal(format!(
196                    "invalid timestamp microsecond value: {v}"
197                )));
198            };
199            let format = match tz {
200                Some(tz) => {
201                    let date = Utc.from_utc_datetime(&naive).with_timezone(&tz);
202                    date.format("%Y-%m-%d %H:%M:%S.%f").to_string()
203                }
204                None => naive.format("%Y-%m-%d %H:%M:%S.%f").to_string(),
205            };
206            Ok::<_, DataFusionError>(db_type.sql_string_literal(&format))
207        })
208    }
209
210    fn unparse_timestamp_nanosecond_array(
211        &self,
212        array: &TimestampNanosecondArray,
213        remote_type: RemoteType,
214    ) -> DFResult<Vec<String>> {
215        let db_type = remote_type.db_type();
216        let tz = match array.timezone() {
217            Some(tz) => Some(
218                tz.parse::<Tz>()
219                    .map_err(|e| DataFusionError::Internal(e.to_string()))?,
220            ),
221            None => None,
222        };
223
224        unparse_array!(array, |v| {
225            let Some(naive) = timestamp_ns_to_datetime(v) else {
226                return Err(DataFusionError::Internal(format!(
227                    "invalid timestamp nanosecond value: {v}"
228                )));
229            };
230            let format = match tz {
231                Some(tz) => {
232                    let date = Utc.from_utc_datetime(&naive).with_timezone(&tz);
233                    date.format("%Y-%m-%d %H:%M:%S.%f").to_string()
234                }
235                None => naive.format("%Y-%m-%d %H:%M:%S.%f").to_string(),
236            };
237            Ok::<_, DataFusionError>(db_type.sql_string_literal(&format))
238        })
239    }
240
241    fn unparse_float64_array(
242        &self,
243        array: &Float64Array,
244        _remote_type: RemoteType,
245    ) -> DFResult<Vec<String>> {
246        unparse_array!(array)
247    }
248
249    fn unparse_date32_array(
250        &self,
251        array: &Date32Array,
252        remote_type: RemoteType,
253    ) -> DFResult<Vec<String>> {
254        let db_type = remote_type.db_type();
255        unparse_array!(array, |v| {
256            let Some(date) = date32_to_datetime(v) else {
257                return Err(DataFusionError::Internal(format!(
258                    "invalid date32 value: {v}"
259                )));
260            };
261            Ok::<_, DataFusionError>(
262                db_type.sql_string_literal(&date.format("%Y-%m-%d").to_string()),
263            )
264        })
265    }
266
267    fn unparse_time64_microsecond_array(
268        &self,
269        array: &Time64MicrosecondArray,
270        remote_type: RemoteType,
271    ) -> DFResult<Vec<String>> {
272        let db_type = remote_type.db_type();
273        unparse_array!(array, |v| {
274            let Some(time) = time64us_to_time(v) else {
275                return Err(DataFusionError::Internal(format!(
276                    "invalid time64 microsecond value: {v}"
277                )));
278            };
279            Ok::<_, DataFusionError>(
280                db_type.sql_string_literal(&time.format("%H:%M:%S.%f").to_string()),
281            )
282        })
283    }
284
285    fn unparse_time64_nanosecond_array(
286        &self,
287        array: &Time64NanosecondArray,
288        remote_type: RemoteType,
289    ) -> DFResult<Vec<String>> {
290        let db_type = remote_type.db_type();
291        unparse_array!(array, |v| {
292            let Some(time) = time64ns_to_time(v) else {
293                return Err(DataFusionError::Internal(format!(
294                    "invalid time64 nanosecond value: {v}"
295                )));
296            };
297            Ok::<_, DataFusionError>(
298                db_type.sql_string_literal(&time.format("%H:%M:%S.%f").to_string()),
299            )
300        })
301    }
302
303    fn unparse_interval_month_day_nano_array(
304        &self,
305        array: &IntervalMonthDayNanoArray,
306        remote_type: RemoteType,
307    ) -> DFResult<Vec<String>> {
308        let db_type = remote_type.db_type();
309        unparse_array!(array, |v: IntervalMonthDayNano| {
310            let mut s = String::new();
311            let mut prefix = "";
312
313            if v.months != 0 {
314                s.push_str(&format!("{prefix}{} mons", v.months));
315                prefix = " ";
316            }
317
318            if v.days != 0 {
319                s.push_str(&format!("{prefix}{} days", v.days));
320                prefix = " ";
321            }
322
323            if v.nanoseconds != 0 {
324                let secs = v.nanoseconds / 1_000_000_000;
325                let mins = secs / 60;
326                let hours = mins / 60;
327
328                let secs = secs - (mins * 60);
329                let mins = mins - (hours * 60);
330
331                let nanoseconds = v.nanoseconds % 1_000_000_000;
332
333                if hours != 0 {
334                    s.push_str(&format!("{prefix}{} hours", hours));
335                    prefix = " ";
336                }
337
338                if mins != 0 {
339                    s.push_str(&format!("{prefix}{} mins", mins));
340                    prefix = " ";
341                }
342
343                if secs != 0 || nanoseconds != 0 {
344                    let secs_sign = if secs < 0 || nanoseconds < 0 { "-" } else { "" };
345                    s.push_str(&format!(
346                        "{prefix}{}{}.{:09} secs",
347                        secs_sign,
348                        secs.abs(),
349                        nanoseconds.abs()
350                    ));
351                }
352            }
353
354            Ok::<_, DataFusionError>(db_type.sql_string_literal(&s))
355        })
356    }
357
358    fn unparse_string_array(
359        &self,
360        array: &StringArray,
361        remote_type: RemoteType,
362    ) -> DFResult<Vec<String>> {
363        let db_type = remote_type.db_type();
364        unparse_array!(array, |v| Ok::<_, DataFusionError>(
365            db_type.sql_string_literal(v)
366        ))
367    }
368
369    fn unparse_large_string_array(
370        &self,
371        array: &LargeStringArray,
372        remote_type: RemoteType,
373    ) -> DFResult<Vec<String>> {
374        let db_type = remote_type.db_type();
375        unparse_array!(array, |v| Ok::<_, DataFusionError>(
376            db_type.sql_string_literal(v)
377        ))
378    }
379
380    fn unparse_binary_array(
381        &self,
382        array: &BinaryArray,
383        remote_type: RemoteType,
384    ) -> DFResult<Vec<String>> {
385        let db_type = remote_type.db_type();
386        match remote_type {
387            RemoteType::Postgres(PostgresType::PostGisGeometry) => {
388                unparse_array!(array, |v| {
389                    let s = db_type.sql_binary_literal(v);
390                    Ok::<_, DataFusionError>(format!("ST_GeomFromWKB({s})"))
391                })
392            }
393            _ => unparse_array!(array, |v| Ok::<_, DataFusionError>(
394                db_type.sql_binary_literal(v)
395            )),
396        }
397    }
398
399    fn unparse_fixed_size_binary_array(
400        &self,
401        array: &FixedSizeBinaryArray,
402        remote_type: RemoteType,
403    ) -> DFResult<Vec<String>> {
404        let db_type = remote_type.db_type();
405        match remote_type {
406            RemoteType::Postgres(PostgresType::Uuid) => {
407                unparse_array!(array, |v| Ok::<_, DataFusionError>(format!(
408                    "'{}'",
409                    hex::encode(v)
410                )))
411            }
412            _ => unparse_array!(array, |v| Ok::<_, DataFusionError>(
413                db_type.sql_binary_literal(v)
414            )),
415        }
416    }
417
418    fn unparse_list_array(
419        &self,
420        array: &ListArray,
421        remote_type: RemoteType,
422    ) -> DFResult<Vec<String>> {
423        let db_type = remote_type.db_type();
424        let data_type = array.data_type();
425        let DataType::List(field) = data_type else {
426            return Err(DataFusionError::Internal(format!(
427                "expect list array, but got {data_type}"
428            )));
429        };
430
431        let inner_type = field.data_type();
432
433        match inner_type {
434            DataType::Boolean => {
435                unparse_array!(array, |v: ArrayRef| {
436                    let array = v.as_boolean();
437                    let sqls = unparse_array!(array)?;
438                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
439                })
440            }
441            DataType::Int16 => {
442                unparse_array!(array, |v: ArrayRef| {
443                    let array = v.as_primitive::<Int16Type>();
444                    let sqls = unparse_array!(array)?;
445                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
446                })
447            }
448            DataType::Int32 => {
449                unparse_array!(array, |v: ArrayRef| {
450                    let array = v.as_primitive::<Int32Type>();
451                    let sqls = unparse_array!(array)?;
452                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
453                })
454            }
455            DataType::Int64 => {
456                unparse_array!(array, |v: ArrayRef| {
457                    let array = v.as_primitive::<Int64Type>();
458                    let sqls = unparse_array!(array)?;
459                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
460                })
461            }
462            DataType::Float32 => {
463                unparse_array!(array, |v: ArrayRef| {
464                    let array = v.as_primitive::<Float32Type>();
465                    let sqls = unparse_array!(array)?;
466                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
467                })
468            }
469            DataType::Float64 => {
470                unparse_array!(array, |v: ArrayRef| {
471                    let array = v.as_primitive::<Float64Type>();
472                    let sqls = unparse_array!(array)?;
473                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
474                })
475            }
476            DataType::Utf8 => {
477                unparse_array!(array, |v: ArrayRef| {
478                    let array = v.as_string::<i32>();
479                    let sqls = unparse_array!(array, |v| Ok::<_, DataFusionError>(
480                        db_type.sql_string_literal(v)
481                    ))?;
482                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
483                })
484            }
485            DataType::LargeUtf8 => {
486                unparse_array!(array, |v: ArrayRef| {
487                    let array = v.as_string::<i64>();
488                    let sqls = unparse_array!(array, |v| Ok::<_, DataFusionError>(
489                        db_type.sql_string_literal(v)
490                    ))?;
491                    Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
492                })
493            }
494            _ => Err(DataFusionError::NotImplemented(format!(
495                "Not supported unparsing list array: {data_type}"
496            ))),
497        }
498    }
499
500    fn unparse_decimal128_array(
501        &self,
502        array: &Decimal128Array,
503        remote_type: RemoteType,
504    ) -> DFResult<Vec<String>> {
505        let precision = array.precision();
506        let scale = array.scale();
507        let db_type = remote_type.db_type();
508        unparse_array!(array, |v| Ok::<_, DataFusionError>(
509            db_type.sql_string_literal(&Decimal128Type::format_decimal(v, precision, scale))
510        ))
511    }
512
513    fn unparse_decimal256_array(
514        &self,
515        array: &Decimal256Array,
516        remote_type: RemoteType,
517    ) -> DFResult<Vec<String>> {
518        let precision = array.precision();
519        let scale = array.scale();
520        let db_type = remote_type.db_type();
521        unparse_array!(array, |v| Ok::<_, DataFusionError>(
522            db_type.sql_string_literal(&Decimal256Type::format_decimal(v, precision, scale))
523        ))
524    }
525}
526
527pub fn unparse_array(
528    unparser: &dyn Unparse,
529    array: &ArrayRef,
530    remote_type: RemoteType,
531) -> DFResult<Vec<String>> {
532    match array.data_type() {
533        DataType::Null => {
534            let array = array
535                .as_any()
536                .downcast_ref::<NullArray>()
537                .expect("expect null array");
538            unparser.unparse_null_array(array, remote_type)
539        }
540        DataType::Boolean => {
541            let array = array.as_boolean();
542            unparser.unparse_boolean_array(array, remote_type)
543        }
544        DataType::Int8 => {
545            let array = array.as_primitive::<Int8Type>();
546            unparser.unparse_int8_array(array, remote_type)
547        }
548        DataType::Int16 => {
549            let array = array.as_primitive::<Int16Type>();
550            unparser.unparse_int16_array(array, remote_type)
551        }
552        DataType::Int32 => {
553            let array = array.as_primitive::<Int32Type>();
554            unparser.unparse_int32_array(array, remote_type)
555        }
556        DataType::Int64 => {
557            let array = array.as_primitive::<Int64Type>();
558            unparser.unparse_int64_array(array, remote_type)
559        }
560        DataType::UInt8 => {
561            let array = array.as_primitive::<UInt8Type>();
562            unparser.unparse_uint8_array(array, remote_type)
563        }
564        DataType::UInt16 => {
565            let array = array.as_primitive::<UInt16Type>();
566            unparser.unparse_uint16_array(array, remote_type)
567        }
568        DataType::UInt32 => {
569            let array = array.as_primitive::<UInt32Type>();
570            unparser.unparse_uint32_array(array, remote_type)
571        }
572        DataType::UInt64 => {
573            let array = array.as_primitive::<UInt64Type>();
574            unparser.unparse_uint64_array(array, remote_type)
575        }
576        DataType::Float16 => {
577            let array = array.as_primitive::<Float16Type>();
578            unparser.unparse_float16_array(array, remote_type)
579        }
580        DataType::Float32 => {
581            let array = array.as_primitive::<Float32Type>();
582            unparser.unparse_float32_array(array, remote_type)
583        }
584        DataType::Float64 => {
585            let array = array.as_primitive::<Float64Type>();
586            unparser.unparse_float64_array(array, remote_type)
587        }
588        DataType::Timestamp(TimeUnit::Microsecond, _) => {
589            let array = array.as_primitive::<TimestampMicrosecondType>();
590            unparser.unparse_timestamp_microsecond_array(array, remote_type)
591        }
592        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
593            let array = array.as_primitive::<TimestampNanosecondType>();
594            unparser.unparse_timestamp_nanosecond_array(array, remote_type)
595        }
596        DataType::Date32 => {
597            let array = array.as_primitive::<Date32Type>();
598            unparser.unparse_date32_array(array, remote_type)
599        }
600        DataType::Time64(TimeUnit::Microsecond) => {
601            let array = array.as_primitive::<Time64MicrosecondType>();
602            unparser.unparse_time64_microsecond_array(array, remote_type)
603        }
604        DataType::Time64(TimeUnit::Nanosecond) => {
605            let array = array.as_primitive::<Time64NanosecondType>();
606            unparser.unparse_time64_nanosecond_array(array, remote_type)
607        }
608        DataType::Interval(IntervalUnit::MonthDayNano) => {
609            let array = array.as_primitive::<IntervalMonthDayNanoType>();
610            unparser.unparse_interval_month_day_nano_array(array, remote_type)
611        }
612        DataType::Utf8 => {
613            let array = array.as_string();
614            unparser.unparse_string_array(array, remote_type)
615        }
616        DataType::LargeUtf8 => {
617            let array = array.as_string::<i64>();
618            unparser.unparse_large_string_array(array, remote_type)
619        }
620        DataType::Binary => {
621            let array = array.as_binary::<i32>();
622            unparser.unparse_binary_array(array, remote_type)
623        }
624        DataType::FixedSizeBinary(_) => {
625            let array = array.as_fixed_size_binary();
626            unparser.unparse_fixed_size_binary_array(array, remote_type)
627        }
628        DataType::List(_) => {
629            let array = array.as_list::<i32>();
630            unparser.unparse_list_array(array, remote_type)
631        }
632        DataType::Decimal128(_, _) => {
633            let array = array.as_primitive::<Decimal128Type>();
634            unparser.unparse_decimal128_array(array, remote_type)
635        }
636        DataType::Decimal256(_, _) => {
637            let array = array.as_primitive::<Decimal256Type>();
638            unparser.unparse_decimal256_array(array, remote_type)
639        }
640        _ => Err(DataFusionError::NotImplemented(format!(
641            "Not supported unparsing array: {}",
642            array.data_type()
643        ))),
644    }
645}
646
/// Stateless unparser whose `Unparse` implementation relies on the trait's
/// default conversions.
#[derive(Debug)]
pub struct DefaultUnparser {}
649
// Only `as_any` is supplied here; every other `Unparse` method falls back to
// the default body declared on the trait.
impl Unparse for DefaultUnparser {
    /// Returns this unparser as a `dyn Any` reference.
    fn as_any(&self) -> &dyn Any {
        self
    }
}