1use crate::PostgresType;
2use crate::{DFResult, RemoteDbType, RemoteType};
3use chrono::{TimeZone, Utc};
4use datafusion::arrow::array::timezone::Tz;
5use datafusion::arrow::array::*;
6use datafusion::arrow::datatypes::*;
7use datafusion::arrow::temporal_conversions::{
8 date32_to_datetime, time64ns_to_time, time64us_to_time, timestamp_ns_to_datetime,
9 timestamp_us_to_datetime,
10};
11use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
12use datafusion::error::DataFusionError;
13use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
14use std::any::Any;
15use std::fmt::Debug;
16
/// Renders every slot of an array-like value as a SQL fragment.
///
/// Both forms evaluate to `Result<Vec<String>, DataFusionError>` holding one
/// string per slot, in iteration order; `None` slots become the bare keyword
/// `NULL`.
///
/// * `unparse_array!(array)` formats each present value with its `Display`
///   implementation.
/// * `unparse_array!(array, convert)` calls `convert(value)` for each present
///   value. `convert` must return a `Result<String, _>`; the error is
///   propagated with `?`, so this form may only be used inside a function or
///   closure that returns a compatible `Result`.
macro_rules! unparse_array {
    ($array:ident) => {{
        let rendered: Vec<String> = $array
            .iter()
            .map(|slot| match slot {
                Some(value) => value.to_string(),
                None => String::from("NULL"),
            })
            .collect();
        Ok::<_, DataFusionError>(rendered)
    }};
    ($array:ident, $convert:expr) => {{
        let mut rendered: Vec<String> = Vec::with_capacity($array.len());
        for slot in $array.iter() {
            // `?` deliberately stays inside the loop: a conversion failure
            // returns from the enclosing function/closure immediately.
            let sql: String = match slot {
                Some(value) => $convert(value)?,
                None => String::from("NULL"),
            };
            rendered.push(sql);
        }
        Ok::<_, DataFusionError>(rendered)
    }};
}
47
48pub trait Unparse: Debug + Send + Sync {
49 fn as_any(&self) -> &dyn Any;
50
51 fn support_filter_pushdown(
52 &self,
53 filter: &Expr,
54 db_type: RemoteDbType,
55 ) -> DFResult<TableProviderFilterPushDown> {
56 let unparser = match db_type.create_unparser() {
57 Ok(unparser) => unparser,
58 Err(_) => return Ok(TableProviderFilterPushDown::Unsupported),
59 };
60 if unparser.expr_to_sql(filter).is_err() {
61 return Ok(TableProviderFilterPushDown::Unsupported);
62 }
63
64 let mut pushdown = TableProviderFilterPushDown::Exact;
65 filter
66 .apply(|e| {
67 if matches!(e, Expr::ScalarFunction(_)) {
68 pushdown = TableProviderFilterPushDown::Unsupported;
69 }
70 Ok(TreeNodeRecursion::Continue)
71 })
72 .expect("won't fail");
73
74 Ok(pushdown)
75 }
76
77 fn unparse_filter(&self, filter: &Expr, db_type: RemoteDbType) -> DFResult<String> {
78 let unparser = db_type.create_unparser()?;
79 let ast = unparser.expr_to_sql(filter)?;
80 Ok(format!("{ast}"))
81 }
82
83 fn unparse_null_array(
84 &self,
85 array: &NullArray,
86 _remote_type: RemoteType,
87 ) -> DFResult<Vec<String>> {
88 Ok(vec!["NULL".to_string(); array.len()])
89 }
90
91 fn unparse_boolean_array(
92 &self,
93 array: &BooleanArray,
94 _remote_type: RemoteType,
95 ) -> DFResult<Vec<String>> {
96 unparse_array!(array)
97 }
98
99 fn unparse_int8_array(
100 &self,
101 array: &Int8Array,
102 _remote_type: RemoteType,
103 ) -> DFResult<Vec<String>> {
104 unparse_array!(array)
105 }
106
107 fn unparse_int16_array(
108 &self,
109 array: &Int16Array,
110 _remote_type: RemoteType,
111 ) -> DFResult<Vec<String>> {
112 unparse_array!(array)
113 }
114
115 fn unparse_int32_array(
116 &self,
117 array: &Int32Array,
118 _remote_type: RemoteType,
119 ) -> DFResult<Vec<String>> {
120 unparse_array!(array)
121 }
122
123 fn unparse_int64_array(
124 &self,
125 array: &Int64Array,
126 _remote_type: RemoteType,
127 ) -> DFResult<Vec<String>> {
128 unparse_array!(array)
129 }
130
131 fn unparse_uint8_array(
132 &self,
133 array: &UInt8Array,
134 _remote_type: RemoteType,
135 ) -> DFResult<Vec<String>> {
136 unparse_array!(array)
137 }
138
139 fn unparse_uint16_array(
140 &self,
141 array: &UInt16Array,
142 _remote_type: RemoteType,
143 ) -> DFResult<Vec<String>> {
144 unparse_array!(array)
145 }
146
147 fn unparse_uint32_array(
148 &self,
149 array: &UInt32Array,
150 _remote_type: RemoteType,
151 ) -> DFResult<Vec<String>> {
152 unparse_array!(array)
153 }
154
155 fn unparse_uint64_array(
156 &self,
157 array: &UInt64Array,
158 _remote_type: RemoteType,
159 ) -> DFResult<Vec<String>> {
160 unparse_array!(array)
161 }
162
163 fn unparse_float16_array(
164 &self,
165 array: &Float16Array,
166 _remote_type: RemoteType,
167 ) -> DFResult<Vec<String>> {
168 unparse_array!(array)
169 }
170
171 fn unparse_float32_array(
172 &self,
173 array: &Float32Array,
174 _remote_type: RemoteType,
175 ) -> DFResult<Vec<String>> {
176 unparse_array!(array)
177 }
178
179 fn unparse_timestamp_microsecond_array(
180 &self,
181 array: &TimestampMicrosecondArray,
182 remote_type: RemoteType,
183 ) -> DFResult<Vec<String>> {
184 let db_type = remote_type.db_type();
185 let tz = match array.timezone() {
186 Some(tz) => Some(
187 tz.parse::<Tz>()
188 .map_err(|e| DataFusionError::Internal(e.to_string()))?,
189 ),
190 None => None,
191 };
192
193 unparse_array!(array, |v| {
194 let Some(naive) = timestamp_us_to_datetime(v) else {
195 return Err(DataFusionError::Internal(format!(
196 "invalid timestamp microsecond value: {v}"
197 )));
198 };
199 let format = match tz {
200 Some(tz) => {
201 let date = Utc.from_utc_datetime(&naive).with_timezone(&tz);
202 date.format("%Y-%m-%d %H:%M:%S.%f").to_string()
203 }
204 None => naive.format("%Y-%m-%d %H:%M:%S.%f").to_string(),
205 };
206 Ok::<_, DataFusionError>(db_type.sql_string_literal(&format))
207 })
208 }
209
210 fn unparse_timestamp_nanosecond_array(
211 &self,
212 array: &TimestampNanosecondArray,
213 remote_type: RemoteType,
214 ) -> DFResult<Vec<String>> {
215 let db_type = remote_type.db_type();
216 let tz = match array.timezone() {
217 Some(tz) => Some(
218 tz.parse::<Tz>()
219 .map_err(|e| DataFusionError::Internal(e.to_string()))?,
220 ),
221 None => None,
222 };
223
224 unparse_array!(array, |v| {
225 let Some(naive) = timestamp_ns_to_datetime(v) else {
226 return Err(DataFusionError::Internal(format!(
227 "invalid timestamp nanosecond value: {v}"
228 )));
229 };
230 let format = match tz {
231 Some(tz) => {
232 let date = Utc.from_utc_datetime(&naive).with_timezone(&tz);
233 date.format("%Y-%m-%d %H:%M:%S.%f").to_string()
234 }
235 None => naive.format("%Y-%m-%d %H:%M:%S.%f").to_string(),
236 };
237 Ok::<_, DataFusionError>(db_type.sql_string_literal(&format))
238 })
239 }
240
241 fn unparse_float64_array(
242 &self,
243 array: &Float64Array,
244 _remote_type: RemoteType,
245 ) -> DFResult<Vec<String>> {
246 unparse_array!(array)
247 }
248
249 fn unparse_date32_array(
250 &self,
251 array: &Date32Array,
252 remote_type: RemoteType,
253 ) -> DFResult<Vec<String>> {
254 let db_type = remote_type.db_type();
255 unparse_array!(array, |v| {
256 let Some(date) = date32_to_datetime(v) else {
257 return Err(DataFusionError::Internal(format!(
258 "invalid date32 value: {v}"
259 )));
260 };
261 Ok::<_, DataFusionError>(
262 db_type.sql_string_literal(&date.format("%Y-%m-%d").to_string()),
263 )
264 })
265 }
266
267 fn unparse_time64_microsecond_array(
268 &self,
269 array: &Time64MicrosecondArray,
270 remote_type: RemoteType,
271 ) -> DFResult<Vec<String>> {
272 let db_type = remote_type.db_type();
273 unparse_array!(array, |v| {
274 let Some(time) = time64us_to_time(v) else {
275 return Err(DataFusionError::Internal(format!(
276 "invalid time64 microsecond value: {v}"
277 )));
278 };
279 Ok::<_, DataFusionError>(
280 db_type.sql_string_literal(&time.format("%H:%M:%S.%f").to_string()),
281 )
282 })
283 }
284
285 fn unparse_time64_nanosecond_array(
286 &self,
287 array: &Time64NanosecondArray,
288 remote_type: RemoteType,
289 ) -> DFResult<Vec<String>> {
290 let db_type = remote_type.db_type();
291 unparse_array!(array, |v| {
292 let Some(time) = time64ns_to_time(v) else {
293 return Err(DataFusionError::Internal(format!(
294 "invalid time64 nanosecond value: {v}"
295 )));
296 };
297 Ok::<_, DataFusionError>(
298 db_type.sql_string_literal(&time.format("%H:%M:%S.%f").to_string()),
299 )
300 })
301 }
302
303 fn unparse_interval_month_day_nano_array(
304 &self,
305 array: &IntervalMonthDayNanoArray,
306 remote_type: RemoteType,
307 ) -> DFResult<Vec<String>> {
308 let db_type = remote_type.db_type();
309 unparse_array!(array, |v: IntervalMonthDayNano| {
310 let mut s = String::new();
311 let mut prefix = "";
312
313 if v.months != 0 {
314 s.push_str(&format!("{prefix}{} mons", v.months));
315 prefix = " ";
316 }
317
318 if v.days != 0 {
319 s.push_str(&format!("{prefix}{} days", v.days));
320 prefix = " ";
321 }
322
323 if v.nanoseconds != 0 {
324 let secs = v.nanoseconds / 1_000_000_000;
325 let mins = secs / 60;
326 let hours = mins / 60;
327
328 let secs = secs - (mins * 60);
329 let mins = mins - (hours * 60);
330
331 let nanoseconds = v.nanoseconds % 1_000_000_000;
332
333 if hours != 0 {
334 s.push_str(&format!("{prefix}{} hours", hours));
335 prefix = " ";
336 }
337
338 if mins != 0 {
339 s.push_str(&format!("{prefix}{} mins", mins));
340 prefix = " ";
341 }
342
343 if secs != 0 || nanoseconds != 0 {
344 let secs_sign = if secs < 0 || nanoseconds < 0 { "-" } else { "" };
345 s.push_str(&format!(
346 "{prefix}{}{}.{:09} secs",
347 secs_sign,
348 secs.abs(),
349 nanoseconds.abs()
350 ));
351 }
352 }
353
354 Ok::<_, DataFusionError>(db_type.sql_string_literal(&s))
355 })
356 }
357
358 fn unparse_string_array(
359 &self,
360 array: &StringArray,
361 remote_type: RemoteType,
362 ) -> DFResult<Vec<String>> {
363 let db_type = remote_type.db_type();
364 unparse_array!(array, |v| Ok::<_, DataFusionError>(
365 db_type.sql_string_literal(v)
366 ))
367 }
368
369 fn unparse_large_string_array(
370 &self,
371 array: &LargeStringArray,
372 remote_type: RemoteType,
373 ) -> DFResult<Vec<String>> {
374 let db_type = remote_type.db_type();
375 unparse_array!(array, |v| Ok::<_, DataFusionError>(
376 db_type.sql_string_literal(v)
377 ))
378 }
379
380 fn unparse_binary_array(
381 &self,
382 array: &BinaryArray,
383 remote_type: RemoteType,
384 ) -> DFResult<Vec<String>> {
385 let db_type = remote_type.db_type();
386 match remote_type {
387 RemoteType::Postgres(PostgresType::PostGisGeometry) => {
388 unparse_array!(array, |v| {
389 let s = db_type.sql_binary_literal(v);
390 Ok::<_, DataFusionError>(format!("ST_GeomFromWKB({s})"))
391 })
392 }
393 _ => unparse_array!(array, |v| Ok::<_, DataFusionError>(
394 db_type.sql_binary_literal(v)
395 )),
396 }
397 }
398
399 fn unparse_fixed_size_binary_array(
400 &self,
401 array: &FixedSizeBinaryArray,
402 remote_type: RemoteType,
403 ) -> DFResult<Vec<String>> {
404 let db_type = remote_type.db_type();
405 match remote_type {
406 RemoteType::Postgres(PostgresType::Uuid) => {
407 unparse_array!(array, |v| Ok::<_, DataFusionError>(format!(
408 "'{}'",
409 hex::encode(v)
410 )))
411 }
412 _ => unparse_array!(array, |v| Ok::<_, DataFusionError>(
413 db_type.sql_binary_literal(v)
414 )),
415 }
416 }
417
418 fn unparse_list_array(
419 &self,
420 array: &ListArray,
421 remote_type: RemoteType,
422 ) -> DFResult<Vec<String>> {
423 let db_type = remote_type.db_type();
424 let data_type = array.data_type();
425 let DataType::List(field) = data_type else {
426 return Err(DataFusionError::Internal(format!(
427 "expect list array, but got {data_type}"
428 )));
429 };
430
431 let inner_type = field.data_type();
432
433 match inner_type {
434 DataType::Boolean => {
435 unparse_array!(array, |v: ArrayRef| {
436 let array = v.as_boolean();
437 let sqls = unparse_array!(array)?;
438 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
439 })
440 }
441 DataType::Int16 => {
442 unparse_array!(array, |v: ArrayRef| {
443 let array = v.as_primitive::<Int16Type>();
444 let sqls = unparse_array!(array)?;
445 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
446 })
447 }
448 DataType::Int32 => {
449 unparse_array!(array, |v: ArrayRef| {
450 let array = v.as_primitive::<Int32Type>();
451 let sqls = unparse_array!(array)?;
452 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
453 })
454 }
455 DataType::Int64 => {
456 unparse_array!(array, |v: ArrayRef| {
457 let array = v.as_primitive::<Int64Type>();
458 let sqls = unparse_array!(array)?;
459 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
460 })
461 }
462 DataType::Float32 => {
463 unparse_array!(array, |v: ArrayRef| {
464 let array = v.as_primitive::<Float32Type>();
465 let sqls = unparse_array!(array)?;
466 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
467 })
468 }
469 DataType::Float64 => {
470 unparse_array!(array, |v: ArrayRef| {
471 let array = v.as_primitive::<Float64Type>();
472 let sqls = unparse_array!(array)?;
473 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
474 })
475 }
476 DataType::Utf8 => {
477 unparse_array!(array, |v: ArrayRef| {
478 let array = v.as_string::<i32>();
479 let sqls = unparse_array!(array, |v| Ok::<_, DataFusionError>(
480 db_type.sql_string_literal(v)
481 ))?;
482 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
483 })
484 }
485 DataType::LargeUtf8 => {
486 unparse_array!(array, |v: ArrayRef| {
487 let array = v.as_string::<i64>();
488 let sqls = unparse_array!(array, |v| Ok::<_, DataFusionError>(
489 db_type.sql_string_literal(v)
490 ))?;
491 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
492 })
493 }
494 _ => Err(DataFusionError::NotImplemented(format!(
495 "Not supported unparsing list array: {data_type}"
496 ))),
497 }
498 }
499
500 fn unparse_decimal128_array(
501 &self,
502 array: &Decimal128Array,
503 remote_type: RemoteType,
504 ) -> DFResult<Vec<String>> {
505 let precision = array.precision();
506 let scale = array.scale();
507 let db_type = remote_type.db_type();
508 unparse_array!(array, |v| Ok::<_, DataFusionError>(
509 db_type.sql_string_literal(&Decimal128Type::format_decimal(v, precision, scale))
510 ))
511 }
512
513 fn unparse_decimal256_array(
514 &self,
515 array: &Decimal256Array,
516 remote_type: RemoteType,
517 ) -> DFResult<Vec<String>> {
518 let precision = array.precision();
519 let scale = array.scale();
520 let db_type = remote_type.db_type();
521 unparse_array!(array, |v| Ok::<_, DataFusionError>(
522 db_type.sql_string_literal(&Decimal256Type::format_decimal(v, precision, scale))
523 ))
524 }
525}
526
527pub fn unparse_array(
528 unparser: &dyn Unparse,
529 array: &ArrayRef,
530 remote_type: RemoteType,
531) -> DFResult<Vec<String>> {
532 match array.data_type() {
533 DataType::Null => {
534 let array = array
535 .as_any()
536 .downcast_ref::<NullArray>()
537 .expect("expect null array");
538 unparser.unparse_null_array(array, remote_type)
539 }
540 DataType::Boolean => {
541 let array = array.as_boolean();
542 unparser.unparse_boolean_array(array, remote_type)
543 }
544 DataType::Int8 => {
545 let array = array.as_primitive::<Int8Type>();
546 unparser.unparse_int8_array(array, remote_type)
547 }
548 DataType::Int16 => {
549 let array = array.as_primitive::<Int16Type>();
550 unparser.unparse_int16_array(array, remote_type)
551 }
552 DataType::Int32 => {
553 let array = array.as_primitive::<Int32Type>();
554 unparser.unparse_int32_array(array, remote_type)
555 }
556 DataType::Int64 => {
557 let array = array.as_primitive::<Int64Type>();
558 unparser.unparse_int64_array(array, remote_type)
559 }
560 DataType::UInt8 => {
561 let array = array.as_primitive::<UInt8Type>();
562 unparser.unparse_uint8_array(array, remote_type)
563 }
564 DataType::UInt16 => {
565 let array = array.as_primitive::<UInt16Type>();
566 unparser.unparse_uint16_array(array, remote_type)
567 }
568 DataType::UInt32 => {
569 let array = array.as_primitive::<UInt32Type>();
570 unparser.unparse_uint32_array(array, remote_type)
571 }
572 DataType::UInt64 => {
573 let array = array.as_primitive::<UInt64Type>();
574 unparser.unparse_uint64_array(array, remote_type)
575 }
576 DataType::Float16 => {
577 let array = array.as_primitive::<Float16Type>();
578 unparser.unparse_float16_array(array, remote_type)
579 }
580 DataType::Float32 => {
581 let array = array.as_primitive::<Float32Type>();
582 unparser.unparse_float32_array(array, remote_type)
583 }
584 DataType::Float64 => {
585 let array = array.as_primitive::<Float64Type>();
586 unparser.unparse_float64_array(array, remote_type)
587 }
588 DataType::Timestamp(TimeUnit::Microsecond, _) => {
589 let array = array.as_primitive::<TimestampMicrosecondType>();
590 unparser.unparse_timestamp_microsecond_array(array, remote_type)
591 }
592 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
593 let array = array.as_primitive::<TimestampNanosecondType>();
594 unparser.unparse_timestamp_nanosecond_array(array, remote_type)
595 }
596 DataType::Date32 => {
597 let array = array.as_primitive::<Date32Type>();
598 unparser.unparse_date32_array(array, remote_type)
599 }
600 DataType::Time64(TimeUnit::Microsecond) => {
601 let array = array.as_primitive::<Time64MicrosecondType>();
602 unparser.unparse_time64_microsecond_array(array, remote_type)
603 }
604 DataType::Time64(TimeUnit::Nanosecond) => {
605 let array = array.as_primitive::<Time64NanosecondType>();
606 unparser.unparse_time64_nanosecond_array(array, remote_type)
607 }
608 DataType::Interval(IntervalUnit::MonthDayNano) => {
609 let array = array.as_primitive::<IntervalMonthDayNanoType>();
610 unparser.unparse_interval_month_day_nano_array(array, remote_type)
611 }
612 DataType::Utf8 => {
613 let array = array.as_string();
614 unparser.unparse_string_array(array, remote_type)
615 }
616 DataType::LargeUtf8 => {
617 let array = array.as_string::<i64>();
618 unparser.unparse_large_string_array(array, remote_type)
619 }
620 DataType::Binary => {
621 let array = array.as_binary::<i32>();
622 unparser.unparse_binary_array(array, remote_type)
623 }
624 DataType::FixedSizeBinary(_) => {
625 let array = array.as_fixed_size_binary();
626 unparser.unparse_fixed_size_binary_array(array, remote_type)
627 }
628 DataType::List(_) => {
629 let array = array.as_list::<i32>();
630 unparser.unparse_list_array(array, remote_type)
631 }
632 DataType::Decimal128(_, _) => {
633 let array = array.as_primitive::<Decimal128Type>();
634 unparser.unparse_decimal128_array(array, remote_type)
635 }
636 DataType::Decimal256(_, _) => {
637 let array = array.as_primitive::<Decimal256Type>();
638 unparser.unparse_decimal256_array(array, remote_type)
639 }
640 _ => Err(DataFusionError::NotImplemented(format!(
641 "Not supported unparsing array: {}",
642 array.data_type()
643 ))),
644 }
645}
646
/// A field-less unparser type.
///
/// It holds no state, so it is cheap to copy and can be created with
/// `DefaultUnparser {}` or `DefaultUnparser::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultUnparser {}
649
650impl Unparse for DefaultUnparser {
651 fn as_any(&self) -> &dyn Any {
652 self
653 }
654}