1use crate::PostgresType;
2use crate::{DFResult, RemoteType};
3use chrono::{TimeZone, Utc};
4use datafusion::arrow::array::timezone::Tz;
5use datafusion::arrow::array::*;
6use datafusion::arrow::datatypes::*;
7use datafusion::arrow::temporal_conversions::{
8 date32_to_datetime, time64ns_to_time, time64us_to_time, timestamp_ns_to_datetime,
9 timestamp_us_to_datetime,
10};
11use datafusion::error::DataFusionError;
12use std::any::Any;
13use std::fmt::Debug;
14
/// Renders every slot of an array-like value as a SQL literal string.
///
/// `$array` must offer `len()` and an `iter()` that yields `Option`-like slots.
/// Null slots become the bare keyword `NULL`.
///
/// * `literalize_array!(array)` formats present values through `Display`.
/// * `literalize_array!(array, convert)` passes each present value to `convert`,
///   which returns a `Result<String, _>`; a failure is propagated with `?`, so
///   this form may only be used where `?` is allowed.
///
/// Both forms evaluate to `Ok::<Vec<String>, DataFusionError>(..)`.
macro_rules! literalize_array {
    ($array:ident) => {{
        let literals: Vec<String> = $array
            .iter()
            .map(|slot| match slot {
                None => String::from("NULL"),
                Some(value) => value.to_string(),
            })
            .collect();
        Ok::<_, DataFusionError>(literals)
    }};
    ($array:ident, $convert:expr) => {{
        let mut literals = Vec::<String>::with_capacity($array.len());
        for slot in $array.iter() {
            let literal = match slot {
                None => String::from("NULL"),
                // A conversion failure aborts the enclosing function or closure.
                Some(value) => $convert(value)?,
            };
            literals.push(literal);
        }
        Ok::<_, DataFusionError>(literals)
    }};
}
45
46pub trait Literalize: Debug + Send + Sync {
47 fn as_any(&self) -> &dyn Any;
48
49 fn literalize_null_array(
50 &self,
51 array: &NullArray,
52 _remote_type: RemoteType,
53 ) -> DFResult<Vec<String>> {
54 Ok(vec!["NULL".to_string(); array.len()])
55 }
56
57 fn literalize_boolean_array(
58 &self,
59 array: &BooleanArray,
60 _remote_type: RemoteType,
61 ) -> DFResult<Vec<String>> {
62 literalize_array!(array)
63 }
64
65 fn literalize_int8_array(
66 &self,
67 array: &Int8Array,
68 _remote_type: RemoteType,
69 ) -> DFResult<Vec<String>> {
70 literalize_array!(array)
71 }
72
73 fn literalize_int16_array(
74 &self,
75 array: &Int16Array,
76 _remote_type: RemoteType,
77 ) -> DFResult<Vec<String>> {
78 literalize_array!(array)
79 }
80
81 fn literalize_int32_array(
82 &self,
83 array: &Int32Array,
84 _remote_type: RemoteType,
85 ) -> DFResult<Vec<String>> {
86 literalize_array!(array)
87 }
88
89 fn literalize_int64_array(
90 &self,
91 array: &Int64Array,
92 _remote_type: RemoteType,
93 ) -> DFResult<Vec<String>> {
94 literalize_array!(array)
95 }
96
97 fn literalize_uint8_array(
98 &self,
99 array: &UInt8Array,
100 _remote_type: RemoteType,
101 ) -> DFResult<Vec<String>> {
102 literalize_array!(array)
103 }
104
105 fn literalize_uint16_array(
106 &self,
107 array: &UInt16Array,
108 _remote_type: RemoteType,
109 ) -> DFResult<Vec<String>> {
110 literalize_array!(array)
111 }
112
113 fn literalize_uint32_array(
114 &self,
115 array: &UInt32Array,
116 _remote_type: RemoteType,
117 ) -> DFResult<Vec<String>> {
118 literalize_array!(array)
119 }
120
121 fn literalize_uint64_array(
122 &self,
123 array: &UInt64Array,
124 _remote_type: RemoteType,
125 ) -> DFResult<Vec<String>> {
126 literalize_array!(array)
127 }
128
129 fn literalize_float16_array(
130 &self,
131 array: &Float16Array,
132 _remote_type: RemoteType,
133 ) -> DFResult<Vec<String>> {
134 literalize_array!(array)
135 }
136
137 fn literalize_float32_array(
138 &self,
139 array: &Float32Array,
140 _remote_type: RemoteType,
141 ) -> DFResult<Vec<String>> {
142 literalize_array!(array)
143 }
144
145 fn literalize_timestamp_microsecond_array(
146 &self,
147 array: &TimestampMicrosecondArray,
148 remote_type: RemoteType,
149 ) -> DFResult<Vec<String>> {
150 let db_type = remote_type.db_type();
151 let tz = match array.timezone() {
152 Some(tz) => Some(
153 tz.parse::<Tz>()
154 .map_err(|e| DataFusionError::Internal(e.to_string()))?,
155 ),
156 None => None,
157 };
158
159 literalize_array!(array, |v| {
160 let Some(naive) = timestamp_us_to_datetime(v) else {
161 return Err(DataFusionError::Internal(format!(
162 "invalid timestamp microsecond value: {v}"
163 )));
164 };
165 let format = match tz {
166 Some(tz) => {
167 let date = Utc.from_utc_datetime(&naive).with_timezone(&tz);
168 date.format("%Y-%m-%d %H:%M:%S.%f").to_string()
169 }
170 None => naive.format("%Y-%m-%d %H:%M:%S.%f").to_string(),
171 };
172 Ok::<_, DataFusionError>(db_type.sql_string_literal(&format))
173 })
174 }
175
176 fn literalize_timestamp_nanosecond_array(
177 &self,
178 array: &TimestampNanosecondArray,
179 remote_type: RemoteType,
180 ) -> DFResult<Vec<String>> {
181 let db_type = remote_type.db_type();
182 let tz = match array.timezone() {
183 Some(tz) => Some(
184 tz.parse::<Tz>()
185 .map_err(|e| DataFusionError::Internal(e.to_string()))?,
186 ),
187 None => None,
188 };
189
190 literalize_array!(array, |v| {
191 let Some(naive) = timestamp_ns_to_datetime(v) else {
192 return Err(DataFusionError::Internal(format!(
193 "invalid timestamp nanosecond value: {v}"
194 )));
195 };
196 let format = match tz {
197 Some(tz) => {
198 let date = Utc.from_utc_datetime(&naive).with_timezone(&tz);
199 date.format("%Y-%m-%d %H:%M:%S.%f").to_string()
200 }
201 None => naive.format("%Y-%m-%d %H:%M:%S.%f").to_string(),
202 };
203 Ok::<_, DataFusionError>(db_type.sql_string_literal(&format))
204 })
205 }
206
207 fn literalize_float64_array(
208 &self,
209 array: &Float64Array,
210 _remote_type: RemoteType,
211 ) -> DFResult<Vec<String>> {
212 literalize_array!(array)
213 }
214
215 fn literalize_date32_array(
216 &self,
217 array: &Date32Array,
218 remote_type: RemoteType,
219 ) -> DFResult<Vec<String>> {
220 let db_type = remote_type.db_type();
221 literalize_array!(array, |v| {
222 let Some(date) = date32_to_datetime(v) else {
223 return Err(DataFusionError::Internal(format!(
224 "invalid date32 value: {v}"
225 )));
226 };
227 Ok::<_, DataFusionError>(
228 db_type.sql_string_literal(&date.format("%Y-%m-%d").to_string()),
229 )
230 })
231 }
232
233 fn literalize_time64_microsecond_array(
234 &self,
235 array: &Time64MicrosecondArray,
236 remote_type: RemoteType,
237 ) -> DFResult<Vec<String>> {
238 let db_type = remote_type.db_type();
239 literalize_array!(array, |v| {
240 let Some(time) = time64us_to_time(v) else {
241 return Err(DataFusionError::Internal(format!(
242 "invalid time64 microsecond value: {v}"
243 )));
244 };
245 Ok::<_, DataFusionError>(
246 db_type.sql_string_literal(&time.format("%H:%M:%S.%f").to_string()),
247 )
248 })
249 }
250
251 fn literalize_time64_nanosecond_array(
252 &self,
253 array: &Time64NanosecondArray,
254 remote_type: RemoteType,
255 ) -> DFResult<Vec<String>> {
256 let db_type = remote_type.db_type();
257 literalize_array!(array, |v| {
258 let Some(time) = time64ns_to_time(v) else {
259 return Err(DataFusionError::Internal(format!(
260 "invalid time64 nanosecond value: {v}"
261 )));
262 };
263 Ok::<_, DataFusionError>(
264 db_type.sql_string_literal(&time.format("%H:%M:%S.%f").to_string()),
265 )
266 })
267 }
268
269 fn literalize_interval_month_day_nano_array(
270 &self,
271 array: &IntervalMonthDayNanoArray,
272 remote_type: RemoteType,
273 ) -> DFResult<Vec<String>> {
274 let db_type = remote_type.db_type();
275 literalize_array!(array, |v: IntervalMonthDayNano| {
276 let mut s = String::new();
277 let mut prefix = "";
278
279 if v.months != 0 {
280 s.push_str(&format!("{prefix}{} mons", v.months));
281 prefix = " ";
282 }
283
284 if v.days != 0 {
285 s.push_str(&format!("{prefix}{} days", v.days));
286 prefix = " ";
287 }
288
289 if v.nanoseconds != 0 {
290 let secs = v.nanoseconds / 1_000_000_000;
291 let mins = secs / 60;
292 let hours = mins / 60;
293
294 let secs = secs - (mins * 60);
295 let mins = mins - (hours * 60);
296
297 let nanoseconds = v.nanoseconds % 1_000_000_000;
298
299 if hours != 0 {
300 s.push_str(&format!("{prefix}{} hours", hours));
301 prefix = " ";
302 }
303
304 if mins != 0 {
305 s.push_str(&format!("{prefix}{} mins", mins));
306 prefix = " ";
307 }
308
309 if secs != 0 || nanoseconds != 0 {
310 let secs_sign = if secs < 0 || nanoseconds < 0 { "-" } else { "" };
311 s.push_str(&format!(
312 "{prefix}{}{}.{:09} secs",
313 secs_sign,
314 secs.abs(),
315 nanoseconds.abs()
316 ));
317 }
318 }
319
320 Ok::<_, DataFusionError>(db_type.sql_string_literal(&s))
321 })
322 }
323
324 fn literalize_string_array(
325 &self,
326 array: &StringArray,
327 remote_type: RemoteType,
328 ) -> DFResult<Vec<String>> {
329 let db_type = remote_type.db_type();
330 literalize_array!(array, |v| Ok::<_, DataFusionError>(
331 db_type.sql_string_literal(v)
332 ))
333 }
334
335 fn literalize_large_string_array(
336 &self,
337 array: &LargeStringArray,
338 remote_type: RemoteType,
339 ) -> DFResult<Vec<String>> {
340 let db_type = remote_type.db_type();
341 literalize_array!(array, |v| Ok::<_, DataFusionError>(
342 db_type.sql_string_literal(v)
343 ))
344 }
345
346 fn literalize_binary_array(
347 &self,
348 array: &BinaryArray,
349 remote_type: RemoteType,
350 ) -> DFResult<Vec<String>> {
351 let db_type = remote_type.db_type();
352 match remote_type {
353 RemoteType::Postgres(PostgresType::PostGisGeometry) => {
354 literalize_array!(array, |v| {
355 let s = db_type.sql_binary_literal(v);
356 Ok::<_, DataFusionError>(format!("ST_GeomFromWKB({s})"))
357 })
358 }
359 _ => literalize_array!(array, |v| Ok::<_, DataFusionError>(
360 db_type.sql_binary_literal(v)
361 )),
362 }
363 }
364
365 fn literalize_fixed_size_binary_array(
366 &self,
367 array: &FixedSizeBinaryArray,
368 remote_type: RemoteType,
369 ) -> DFResult<Vec<String>> {
370 let db_type = remote_type.db_type();
371 match remote_type {
372 RemoteType::Postgres(PostgresType::Uuid) => {
373 literalize_array!(array, |v| Ok::<_, DataFusionError>(format!(
374 "'{}'",
375 hex::encode(v)
376 )))
377 }
378 _ => literalize_array!(array, |v| Ok::<_, DataFusionError>(
379 db_type.sql_binary_literal(v)
380 )),
381 }
382 }
383
384 fn literalize_list_array(
385 &self,
386 array: &ListArray,
387 remote_type: RemoteType,
388 ) -> DFResult<Vec<String>> {
389 let db_type = remote_type.db_type();
390 let data_type = array.data_type();
391 let DataType::List(field) = data_type else {
392 return Err(DataFusionError::Internal(format!(
393 "expect list array, but got {data_type}"
394 )));
395 };
396
397 let inner_type = field.data_type();
398
399 match inner_type {
400 DataType::Boolean => {
401 literalize_array!(array, |v: ArrayRef| {
402 let array = v.as_boolean();
403 let sqls = literalize_array!(array)?;
404 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
405 })
406 }
407 DataType::Int16 => {
408 literalize_array!(array, |v: ArrayRef| {
409 let array = v.as_primitive::<Int16Type>();
410 let sqls = literalize_array!(array)?;
411 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
412 })
413 }
414 DataType::Int32 => {
415 literalize_array!(array, |v: ArrayRef| {
416 let array = v.as_primitive::<Int32Type>();
417 let sqls = literalize_array!(array)?;
418 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
419 })
420 }
421 DataType::Int64 => {
422 literalize_array!(array, |v: ArrayRef| {
423 let array = v.as_primitive::<Int64Type>();
424 let sqls = literalize_array!(array)?;
425 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
426 })
427 }
428 DataType::Float32 => {
429 literalize_array!(array, |v: ArrayRef| {
430 let array = v.as_primitive::<Float32Type>();
431 let sqls = literalize_array!(array)?;
432 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
433 })
434 }
435 DataType::Float64 => {
436 literalize_array!(array, |v: ArrayRef| {
437 let array = v.as_primitive::<Float64Type>();
438 let sqls = literalize_array!(array)?;
439 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
440 })
441 }
442 DataType::Utf8 => {
443 literalize_array!(array, |v: ArrayRef| {
444 let array = v.as_string::<i32>();
445 let sqls = literalize_array!(array, |v| Ok::<_, DataFusionError>(
446 db_type.sql_string_literal(v)
447 ))?;
448 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
449 })
450 }
451 DataType::LargeUtf8 => {
452 literalize_array!(array, |v: ArrayRef| {
453 let array = v.as_string::<i64>();
454 let sqls = literalize_array!(array, |v| Ok::<_, DataFusionError>(
455 db_type.sql_string_literal(v)
456 ))?;
457 Ok::<_, DataFusionError>(format!("ARRAY[{}]", sqls.join(",")))
458 })
459 }
460 _ => Err(DataFusionError::NotImplemented(format!(
461 "Not supported literalizing list array: {data_type}"
462 ))),
463 }
464 }
465
466 fn literalize_decimal128_array(
467 &self,
468 array: &Decimal128Array,
469 remote_type: RemoteType,
470 ) -> DFResult<Vec<String>> {
471 let precision = array.precision();
472 let scale = array.scale();
473 let db_type = remote_type.db_type();
474 literalize_array!(array, |v| Ok::<_, DataFusionError>(
475 db_type.sql_string_literal(&Decimal128Type::format_decimal(v, precision, scale))
476 ))
477 }
478
479 fn literalize_decimal256_array(
480 &self,
481 array: &Decimal256Array,
482 remote_type: RemoteType,
483 ) -> DFResult<Vec<String>> {
484 let precision = array.precision();
485 let scale = array.scale();
486 let db_type = remote_type.db_type();
487 literalize_array!(array, |v| Ok::<_, DataFusionError>(
488 db_type.sql_string_literal(&Decimal256Type::format_decimal(v, precision, scale))
489 ))
490 }
491}
492
493pub fn literalize_array(
494 literalizer: &dyn Literalize,
495 array: &ArrayRef,
496 remote_type: RemoteType,
497) -> DFResult<Vec<String>> {
498 match array.data_type() {
499 DataType::Null => {
500 let array = array
501 .as_any()
502 .downcast_ref::<NullArray>()
503 .expect("expect null array");
504 literalizer.literalize_null_array(array, remote_type)
505 }
506 DataType::Boolean => {
507 let array = array.as_boolean();
508 literalizer.literalize_boolean_array(array, remote_type)
509 }
510 DataType::Int8 => {
511 let array = array.as_primitive::<Int8Type>();
512 literalizer.literalize_int8_array(array, remote_type)
513 }
514 DataType::Int16 => {
515 let array = array.as_primitive::<Int16Type>();
516 literalizer.literalize_int16_array(array, remote_type)
517 }
518 DataType::Int32 => {
519 let array = array.as_primitive::<Int32Type>();
520 literalizer.literalize_int32_array(array, remote_type)
521 }
522 DataType::Int64 => {
523 let array = array.as_primitive::<Int64Type>();
524 literalizer.literalize_int64_array(array, remote_type)
525 }
526 DataType::UInt8 => {
527 let array = array.as_primitive::<UInt8Type>();
528 literalizer.literalize_uint8_array(array, remote_type)
529 }
530 DataType::UInt16 => {
531 let array = array.as_primitive::<UInt16Type>();
532 literalizer.literalize_uint16_array(array, remote_type)
533 }
534 DataType::UInt32 => {
535 let array = array.as_primitive::<UInt32Type>();
536 literalizer.literalize_uint32_array(array, remote_type)
537 }
538 DataType::UInt64 => {
539 let array = array.as_primitive::<UInt64Type>();
540 literalizer.literalize_uint64_array(array, remote_type)
541 }
542 DataType::Float16 => {
543 let array = array.as_primitive::<Float16Type>();
544 literalizer.literalize_float16_array(array, remote_type)
545 }
546 DataType::Float32 => {
547 let array = array.as_primitive::<Float32Type>();
548 literalizer.literalize_float32_array(array, remote_type)
549 }
550 DataType::Float64 => {
551 let array = array.as_primitive::<Float64Type>();
552 literalizer.literalize_float64_array(array, remote_type)
553 }
554 DataType::Timestamp(TimeUnit::Microsecond, _) => {
555 let array = array.as_primitive::<TimestampMicrosecondType>();
556 literalizer.literalize_timestamp_microsecond_array(array, remote_type)
557 }
558 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
559 let array = array.as_primitive::<TimestampNanosecondType>();
560 literalizer.literalize_timestamp_nanosecond_array(array, remote_type)
561 }
562 DataType::Date32 => {
563 let array = array.as_primitive::<Date32Type>();
564 literalizer.literalize_date32_array(array, remote_type)
565 }
566 DataType::Time64(TimeUnit::Microsecond) => {
567 let array = array.as_primitive::<Time64MicrosecondType>();
568 literalizer.literalize_time64_microsecond_array(array, remote_type)
569 }
570 DataType::Time64(TimeUnit::Nanosecond) => {
571 let array = array.as_primitive::<Time64NanosecondType>();
572 literalizer.literalize_time64_nanosecond_array(array, remote_type)
573 }
574 DataType::Interval(IntervalUnit::MonthDayNano) => {
575 let array = array.as_primitive::<IntervalMonthDayNanoType>();
576 literalizer.literalize_interval_month_day_nano_array(array, remote_type)
577 }
578 DataType::Utf8 => {
579 let array = array.as_string();
580 literalizer.literalize_string_array(array, remote_type)
581 }
582 DataType::LargeUtf8 => {
583 let array = array.as_string::<i64>();
584 literalizer.literalize_large_string_array(array, remote_type)
585 }
586 DataType::Binary => {
587 let array = array.as_binary::<i32>();
588 literalizer.literalize_binary_array(array, remote_type)
589 }
590 DataType::FixedSizeBinary(_) => {
591 let array = array.as_fixed_size_binary();
592 literalizer.literalize_fixed_size_binary_array(array, remote_type)
593 }
594 DataType::List(_) => {
595 let array = array.as_list::<i32>();
596 literalizer.literalize_list_array(array, remote_type)
597 }
598 DataType::Decimal128(_, _) => {
599 let array = array.as_primitive::<Decimal128Type>();
600 literalizer.literalize_decimal128_array(array, remote_type)
601 }
602 DataType::Decimal256(_, _) => {
603 let array = array.as_primitive::<Decimal256Type>();
604 literalizer.literalize_decimal256_array(array, remote_type)
605 }
606 _ => Err(DataFusionError::NotImplemented(format!(
607 "Not supported literalizing array: {}",
608 array.data_type()
609 ))),
610 }
611}
612
/// Literalizer that overrides nothing and therefore uses every default method
/// of [`Literalize`].
#[derive(Debug)]
pub struct DefaultLiteralizer {}

impl Literalize for DefaultLiteralizer {
    /// Returns this literalizer as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}