datafusion_table_providers/sql/arrow_sql_gen/
statement.rs

1use bigdecimal::BigDecimal;
2use chrono::{DateTime, FixedOffset};
3use datafusion::arrow::{
4    array::{
5        array, Array, ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
6        Int64Array, Int8Array, LargeStringArray, RecordBatch, StringArray, StructArray,
7        UInt16Array, UInt32Array, UInt64Array, UInt8Array,
8    },
9    datatypes::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit},
10    util::display::array_value_to_string,
11};
12use num_bigint::BigInt;
13use sea_query::{
14    Alias, ColumnDef, ColumnType, Expr, GenericBuilder, Index, InsertStatement, IntoIden,
15    IntoIndexColumn, Keyword, MysqlQueryBuilder, OnConflict, PostgresQueryBuilder, Query,
16    QueryBuilder, SimpleExpr, SqliteQueryBuilder, Table,
17};
18use snafu::Snafu;
19use std::sync::Arc;
20use time::{OffsetDateTime, PrimitiveDateTime};
21
22#[derive(Debug, Snafu)]
23pub enum Error {
24    #[snafu(display("Failed to build insert statement: {source}"))]
25    FailedToCreateInsertStatement {
26        source: Box<dyn std::error::Error + Send + Sync>,
27    },
28
29    #[snafu(display("Unimplemented data type in insert statement: {data_type:?}"))]
30    UnimplementedDataTypeInInsertStatement { data_type: DataType },
31}
32
33pub type Result<T, E = Error> = std::result::Result<T, E>;
34
35pub struct CreateTableBuilder {
36    schema: SchemaRef,
37    table_name: String,
38    primary_keys: Vec<String>,
39}
40
41impl CreateTableBuilder {
42    #[must_use]
43    pub fn new(schema: SchemaRef, table_name: &str) -> Self {
44        Self {
45            schema,
46            table_name: table_name.to_string(),
47            primary_keys: Vec::new(),
48        }
49    }
50
51    #[must_use]
52    pub fn primary_keys<T>(mut self, keys: Vec<T>) -> Self
53    where
54        T: Into<String>,
55    {
56        self.primary_keys = keys.into_iter().map(Into::into).collect();
57        self
58    }
59
60    #[must_use]
61    #[cfg(feature = "postgres")]
62    pub fn build_postgres(self) -> Vec<String> {
63        use crate::sql::arrow_sql_gen::postgres::{
64            builder::TypeBuilder, get_postgres_composite_type_name,
65            map_data_type_to_column_type_postgres,
66        };
67        let schema = Arc::clone(&self.schema);
68        let table_name = self.table_name.clone();
69        let main_table_creation =
70            self.build(PostgresQueryBuilder, &|f: &Arc<Field>| -> ColumnType {
71                map_data_type_to_column_type_postgres(f.data_type(), &table_name, f.name())
72            });
73
74        // Postgres supports composite types (i.e. Structs) but needs to have the type defined first
75        // https://www.postgresql.org/docs/current/rowtypes.html
76        let mut creation_stmts = Vec::new();
77        for field in schema.fields() {
78            let DataType::Struct(struct_inner_fields) = field.data_type() else {
79                continue;
80            };
81            let type_builder = TypeBuilder::new(
82                get_postgres_composite_type_name(&table_name, field.name()),
83                struct_inner_fields,
84            );
85            creation_stmts.push(type_builder.build());
86        }
87
88        creation_stmts.push(main_table_creation);
89        creation_stmts
90    }
91
92    #[must_use]
93    pub fn build_sqlite(self) -> String {
94        self.build(SqliteQueryBuilder, &|f: &Arc<Field>| -> ColumnType {
95            // Sqlite does not natively support Arrays, Structs, etc
96            // so we use JSON column type for List, FixedSizeList, LargeList, Struct, etc
97            if f.data_type().is_nested() {
98                return ColumnType::JsonBinary;
99            }
100
101            map_data_type_to_column_type(f.data_type())
102        })
103    }
104
105    #[must_use]
106    pub fn build_mysql(self) -> String {
107        self.build(MysqlQueryBuilder, &|f: &Arc<Field>| -> ColumnType {
108            // MySQL does not natively support Arrays, Structs, etc
109            // so we use JSON column type for List, FixedSizeList, LargeList, Struct, etc
110            if f.data_type().is_nested() {
111                return ColumnType::JsonBinary;
112            }
113            map_data_type_to_column_type(f.data_type())
114        })
115    }
116
117    #[must_use]
118    fn build<T: GenericBuilder>(
119        self,
120        query_builder: T,
121        map_data_type_to_column_type_fn: &dyn Fn(&Arc<Field>) -> ColumnType,
122    ) -> String {
123        let mut create_stmt = Table::create();
124        create_stmt
125            .table(Alias::new(self.table_name.clone()))
126            .if_not_exists();
127
128        for field in self.schema.fields() {
129            let column_type = map_data_type_to_column_type_fn(field);
130            let mut column_def = ColumnDef::new_with_type(Alias::new(field.name()), column_type);
131            if !field.is_nullable() {
132                column_def.not_null();
133            }
134
135            create_stmt.col(&mut column_def);
136        }
137
138        if !self.primary_keys.is_empty() {
139            let mut index = Index::create();
140            index.primary();
141            for key in self.primary_keys {
142                index.col(Alias::new(key).into_iden().into_index_column());
143            }
144            create_stmt.primary_key(&mut index);
145        }
146
147        create_stmt.to_string(query_builder)
148    }
149}
150
/// Appends the value at `$row` of `$column` (downcast to `array::$array_type`)
/// to `$row_values`.
///
/// Must be expanded inside a loop: for a null slot it pushes SQL `NULL` and
/// then `continue`s the enclosing loop. If the downcast fails, nothing is
/// pushed.
macro_rules! push_value {
    ($row_values:expr, $column:expr, $row:expr, $array_type:ident) => {{
        match $column.as_any().downcast_ref::<array::$array_type>() {
            Some(valid_array) if valid_array.is_null($row) => {
                $row_values.push(Keyword::Null.into());
                continue;
            }
            Some(valid_array) => $row_values.push(valid_array.value($row).into()),
            None => {}
        }
    }};
}
163
/// Collects every element of `$list_array` (downcast to `$array_type`) into a
/// `Vec<$vec_type>` and pushes it onto `$row_values` as a single expression
/// cast to `$sql_type`.
///
/// `$data_type` is accepted for call-site compatibility but is not used. If
/// the downcast fails, an empty list is pushed.
macro_rules! push_list_values {
    ($data_type:expr, $list_array:expr, $row_values:expr, $array_type:ty, $vec_type:ty, $sql_type:expr) => {{
        // The concrete array type does not change per element, so resolve it once
        // instead of downcasting on every iteration.
        let list_values: Vec<$vec_type> =
            match $list_array.as_any().downcast_ref::<$array_type>() {
                Some(valid_array) => (0..valid_array.len())
                    .map(|i| valid_array.value(i))
                    .collect(),
                None => Vec::new(),
            };
        let expr: SimpleExpr = list_values.into();
        // We must cast here in case the array is empty which SeaQuery does not handle.
        $row_values.push(expr.cast_as(Alias::new($sql_type)));
    }};
}
178
179pub struct InsertBuilder {
180    table_name: String,
181    record_batches: Vec<RecordBatch>,
182}
183
184pub fn use_json_insert_for_type<T: QueryBuilder + 'static>(
185    data_type: &DataType,
186    query_builder: &T,
187) -> bool {
188    #[cfg(feature = "sqlite")]
189    {
190        use std::any::Any;
191        let any_builder = query_builder as &dyn Any;
192        if any_builder.is::<SqliteQueryBuilder>() {
193            return data_type.is_nested();
194        }
195    }
196    #[cfg(feature = "mysql")]
197    {
198        use std::any::Any;
199        let any_builder = query_builder as &dyn Any;
200        if any_builder.is::<MysqlQueryBuilder>() {
201            return data_type.is_nested();
202        }
203    }
204    false
205}
206
207impl InsertBuilder {
208    #[must_use]
209    pub fn new(table_name: &str, record_batches: Vec<RecordBatch>) -> Self {
210        Self {
211            table_name: table_name.to_string(),
212            record_batches,
213        }
214    }
215
216    /// Create an Insert statement from a `RecordBatch`.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if a column's data type is not supported, or its conversion failed.
221    #[allow(clippy::too_many_lines)]
222    pub fn construct_insert_stmt<T: QueryBuilder + 'static>(
223        &self,
224        insert_stmt: &mut InsertStatement,
225        record_batch: &RecordBatch,
226        query_builder: &T,
227    ) -> Result<()> {
228        for row in 0..record_batch.num_rows() {
229            let mut row_values: Vec<SimpleExpr> = vec![];
230            for col in 0..record_batch.num_columns() {
231                let column = record_batch.column(col);
232                let column_data_type = column.data_type();
233
234                match column_data_type {
235                    DataType::Int8 => push_value!(row_values, column, row, Int8Array),
236                    DataType::Int16 => push_value!(row_values, column, row, Int16Array),
237                    DataType::Int32 => push_value!(row_values, column, row, Int32Array),
238                    DataType::Int64 => push_value!(row_values, column, row, Int64Array),
239                    DataType::UInt8 => push_value!(row_values, column, row, UInt8Array),
240                    DataType::UInt16 => push_value!(row_values, column, row, UInt16Array),
241                    DataType::UInt32 => push_value!(row_values, column, row, UInt32Array),
242                    DataType::UInt64 => push_value!(row_values, column, row, UInt64Array),
243                    DataType::Float32 => push_value!(row_values, column, row, Float32Array),
244                    DataType::Float64 => push_value!(row_values, column, row, Float64Array),
245                    DataType::Utf8 => push_value!(row_values, column, row, StringArray),
246                    DataType::LargeUtf8 => push_value!(row_values, column, row, LargeStringArray),
247                    DataType::Boolean => push_value!(row_values, column, row, BooleanArray),
248                    DataType::Decimal128(_, scale) => {
249                        let array = column.as_any().downcast_ref::<array::Decimal128Array>();
250                        if let Some(valid_array) = array {
251                            if valid_array.is_null(row) {
252                                row_values.push(Keyword::Null.into());
253                                continue;
254                            }
255                            row_values.push(
256                                BigDecimal::new(valid_array.value(row).into(), i64::from(*scale))
257                                    .into(),
258                            );
259                        }
260                    }
261                    DataType::Decimal256(_, scale) => {
262                        let array = column.as_any().downcast_ref::<array::Decimal256Array>();
263                        if let Some(valid_array) = array {
264                            if valid_array.is_null(row) {
265                                row_values.push(Keyword::Null.into());
266                                continue;
267                            }
268
269                            let bigint =
270                                BigInt::from_signed_bytes_le(&valid_array.value(row).to_le_bytes());
271
272                            row_values.push(BigDecimal::new(bigint, i64::from(*scale)).into());
273                        }
274                    }
275                    DataType::Date32 => {
276                        let array = column.as_any().downcast_ref::<array::Date32Array>();
277                        if let Some(valid_array) = array {
278                            if valid_array.is_null(row) {
279                                row_values.push(Keyword::Null.into());
280                                continue;
281                            }
282                            row_values.push(
283                                match OffsetDateTime::from_unix_timestamp(
284                                    i64::from(valid_array.value(row)) * 86_400,
285                                ) {
286                                    Ok(offset_time) => offset_time.date().into(),
287                                    Err(e) => {
288                                        return Result::Err(Error::FailedToCreateInsertStatement {
289                                            source: Box::new(e),
290                                        })
291                                    }
292                                },
293                            );
294                        }
295                    }
296                    DataType::Date64 => {
297                        let array = column.as_any().downcast_ref::<array::Date64Array>();
298                        if let Some(valid_array) = array {
299                            if valid_array.is_null(row) {
300                                row_values.push(Keyword::Null.into());
301                                continue;
302                            }
303                            row_values.push(
304                                match OffsetDateTime::from_unix_timestamp(
305                                    valid_array.value(row) / 1000,
306                                ) {
307                                    Ok(offset_time) => offset_time.date().into(),
308                                    Err(e) => {
309                                        return Result::Err(Error::FailedToCreateInsertStatement {
310                                            source: Box::new(e),
311                                        })
312                                    }
313                                },
314                            );
315                        }
316                    }
317                    DataType::Duration(time_unit) => match time_unit {
318                        TimeUnit::Second => {
319                            push_value!(row_values, column, row, DurationSecondArray);
320                        }
321                        TimeUnit::Microsecond => {
322                            push_value!(row_values, column, row, DurationMicrosecondArray);
323                        }
324                        TimeUnit::Millisecond => {
325                            push_value!(row_values, column, row, DurationMillisecondArray);
326                        }
327                        TimeUnit::Nanosecond => {
328                            push_value!(row_values, column, row, DurationNanosecondArray);
329                        }
330                    },
331                    DataType::Time32(time_unit) => match time_unit {
332                        TimeUnit::Millisecond => {
333                            let array = column
334                                .as_any()
335                                .downcast_ref::<array::Time32MillisecondArray>();
336                            if let Some(valid_array) = array {
337                                if valid_array.is_null(row) {
338                                    row_values.push(Keyword::Null.into());
339                                    continue;
340                                }
341
342                                let (h, m, s, micro) =
343                                    match OffsetDateTime::from_unix_timestamp_nanos(
344                                        i128::from(valid_array.value(row)) * 1_000_000,
345                                    ) {
346                                        Ok(timestamp) => timestamp.to_hms_micro(),
347                                        Err(e) => {
348                                            return Result::Err(
349                                                Error::FailedToCreateInsertStatement {
350                                                    source: Box::new(e),
351                                                },
352                                            )
353                                        }
354                                    };
355
356                                let time = match time::Time::from_hms_micro(h, m, s, micro) {
357                                    Ok(value) => value,
358                                    Err(e) => {
359                                        return Result::Err(Error::FailedToCreateInsertStatement {
360                                            source: Box::new(e),
361                                        })
362                                    }
363                                };
364
365                                row_values.push(time.into());
366                            }
367                        }
368                        TimeUnit::Second => {
369                            let array = column.as_any().downcast_ref::<array::Time32SecondArray>();
370                            if let Some(valid_array) = array {
371                                if valid_array.is_null(row) {
372                                    row_values.push(Keyword::Null.into());
373                                    continue;
374                                }
375
376                                let (h, m, s) = match OffsetDateTime::from_unix_timestamp(
377                                    i64::from(valid_array.value(row)),
378                                ) {
379                                    Ok(timestamp) => timestamp.to_hms(),
380                                    Err(e) => {
381                                        return Result::Err(Error::FailedToCreateInsertStatement {
382                                            source: Box::new(e),
383                                        })
384                                    }
385                                };
386
387                                let time = match time::Time::from_hms(h, m, s) {
388                                    Ok(value) => value,
389                                    Err(e) => {
390                                        return Result::Err(Error::FailedToCreateInsertStatement {
391                                            source: Box::new(e),
392                                        })
393                                    }
394                                };
395
396                                row_values.push(time.into());
397                            }
398                        }
399                        _ => unreachable!(),
400                    },
401                    DataType::Time64(time_unit) => match time_unit {
402                        TimeUnit::Nanosecond => {
403                            let array = column
404                                .as_any()
405                                .downcast_ref::<array::Time64NanosecondArray>();
406                            if let Some(valid_array) = array {
407                                if valid_array.is_null(row) {
408                                    row_values.push(Keyword::Null.into());
409                                    continue;
410                                }
411                                let (h, m, s, nano) =
412                                    match OffsetDateTime::from_unix_timestamp_nanos(i128::from(
413                                        valid_array.value(row),
414                                    )) {
415                                        Ok(timestamp) => timestamp.to_hms_nano(),
416                                        Err(e) => {
417                                            return Result::Err(
418                                                Error::FailedToCreateInsertStatement {
419                                                    source: Box::new(e),
420                                                },
421                                            )
422                                        }
423                                    };
424
425                                let time = match time::Time::from_hms_nano(h, m, s, nano) {
426                                    Ok(value) => value,
427                                    Err(e) => {
428                                        return Result::Err(Error::FailedToCreateInsertStatement {
429                                            source: Box::new(e),
430                                        })
431                                    }
432                                };
433
434                                row_values.push(time.into());
435                            }
436                        }
437                        TimeUnit::Microsecond => {
438                            let array = column
439                                .as_any()
440                                .downcast_ref::<array::Time64MicrosecondArray>();
441                            if let Some(valid_array) = array {
442                                if valid_array.is_null(row) {
443                                    row_values.push(Keyword::Null.into());
444                                    continue;
445                                }
446
447                                let (h, m, s, micro) =
448                                    match OffsetDateTime::from_unix_timestamp_nanos(
449                                        i128::from(valid_array.value(row)) * 1_000,
450                                    ) {
451                                        Ok(timestamp) => timestamp.to_hms_micro(),
452                                        Err(e) => {
453                                            return Result::Err(
454                                                Error::FailedToCreateInsertStatement {
455                                                    source: Box::new(e),
456                                                },
457                                            )
458                                        }
459                                    };
460
461                                let time = match time::Time::from_hms_micro(h, m, s, micro) {
462                                    Ok(value) => value,
463                                    Err(e) => {
464                                        return Result::Err(Error::FailedToCreateInsertStatement {
465                                            source: Box::new(e),
466                                        })
467                                    }
468                                };
469
470                                row_values.push(time.into());
471                            }
472                        }
473                        _ => unreachable!(),
474                    },
475                    DataType::Timestamp(TimeUnit::Second, timezone) => {
476                        let array = column
477                            .as_any()
478                            .downcast_ref::<array::TimestampSecondArray>();
479
480                        if let Some(valid_array) = array {
481                            if valid_array.is_null(row) {
482                                row_values.push(Keyword::Null.into());
483                                continue;
484                            }
485                            if let Some(timezone) = timezone {
486                                let utc_time = DateTime::from_timestamp_nanos(
487                                    valid_array.value(row) * 1_000_000_000,
488                                )
489                                .to_utc();
490                                let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
491                                    Error::FailedToCreateInsertStatement {
492                                        source: "Unable to parse arrow timezone information".into(),
493                                    },
494                                )?;
495                                let time_with_offset = utc_time.with_timezone(&offset);
496                                row_values.push(time_with_offset.into());
497                            } else {
498                                insert_timestamp_into_row_values(
499                                    OffsetDateTime::from_unix_timestamp(valid_array.value(row)),
500                                    &mut row_values,
501                                )?;
502                            }
503                        }
504                    }
505                    DataType::Timestamp(TimeUnit::Millisecond, timezone) => {
506                        let array = column
507                            .as_any()
508                            .downcast_ref::<array::TimestampMillisecondArray>();
509
510                        if let Some(valid_array) = array {
511                            if valid_array.is_null(row) {
512                                row_values.push(Keyword::Null.into());
513                                continue;
514                            }
515                            if let Some(timezone) = timezone {
516                                let utc_time = DateTime::from_timestamp_nanos(
517                                    valid_array.value(row) * 1_000_000,
518                                )
519                                .to_utc();
520                                let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
521                                    Error::FailedToCreateInsertStatement {
522                                        source: "Unable to parse arrow timezone information".into(),
523                                    },
524                                )?;
525                                let time_with_offset = utc_time.with_timezone(&offset);
526                                row_values.push(time_with_offset.into());
527                            } else {
528                                insert_timestamp_into_row_values(
529                                    OffsetDateTime::from_unix_timestamp_nanos(
530                                        i128::from(valid_array.value(row)) * 1_000_000,
531                                    ),
532                                    &mut row_values,
533                                )?;
534                            }
535                        }
536                    }
537                    DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
538                        let array = column
539                            .as_any()
540                            .downcast_ref::<array::TimestampMicrosecondArray>();
541
542                        if let Some(valid_array) = array {
543                            if valid_array.is_null(row) {
544                                row_values.push(Keyword::Null.into());
545                                continue;
546                            }
547                            if let Some(timezone) = timezone {
548                                let utc_time =
549                                    DateTime::from_timestamp_nanos(valid_array.value(row) * 1_000)
550                                        .to_utc();
551                                let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
552                                    Error::FailedToCreateInsertStatement {
553                                        source: "Unable to parse arrow timezone information".into(),
554                                    },
555                                )?;
556                                let time_with_offset = utc_time.with_timezone(&offset);
557                                row_values.push(time_with_offset.into());
558                            } else {
559                                insert_timestamp_into_row_values(
560                                    OffsetDateTime::from_unix_timestamp_nanos(
561                                        i128::from(valid_array.value(row)) * 1_000,
562                                    ),
563                                    &mut row_values,
564                                )?;
565                            }
566                        }
567                    }
568                    DataType::Timestamp(TimeUnit::Nanosecond, timezone) => {
569                        let array = column
570                            .as_any()
571                            .downcast_ref::<array::TimestampNanosecondArray>();
572
573                        if let Some(valid_array) = array {
574                            if valid_array.is_null(row) {
575                                row_values.push(Keyword::Null.into());
576                                continue;
577                            }
578                            if let Some(timezone) = timezone {
579                                let utc_time =
580                                    DateTime::from_timestamp_nanos(valid_array.value(row)).to_utc();
581                                let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
582                                    Error::FailedToCreateInsertStatement {
583                                        source: "Unable to parse arrow timezone information".into(),
584                                    },
585                                )?;
586                                let time_with_offset = utc_time.with_timezone(&offset);
587                                row_values.push(time_with_offset.into());
588                            } else {
589                                insert_timestamp_into_row_values(
590                                    OffsetDateTime::from_unix_timestamp_nanos(i128::from(
591                                        valid_array.value(row),
592                                    )),
593                                    &mut row_values,
594                                )?;
595                            }
596                        }
597                    }
598                    DataType::List(list_type) => {
599                        let array = column.as_any().downcast_ref::<array::ListArray>();
600                        if let Some(valid_array) = array {
601                            if valid_array.is_null(row) {
602                                row_values.push(Keyword::Null.into());
603                                continue;
604                            }
605                            let list_array = valid_array.value(row);
606
607                            if use_json_insert_for_type(column_data_type, query_builder) {
608                                insert_list_into_row_values_json(
609                                    list_array,
610                                    list_type,
611                                    &mut row_values,
612                                )?;
613                            } else {
614                                insert_list_into_row_values(list_array, list_type, &mut row_values);
615                            }
616                        }
617                    }
618                    DataType::LargeList(list_type) => {
619                        let array = column.as_any().downcast_ref::<array::LargeListArray>();
620                        if let Some(valid_array) = array {
621                            if valid_array.is_null(row) {
622                                row_values.push(Keyword::Null.into());
623                                continue;
624                            }
625                            let list_array = valid_array.value(row);
626
627                            if use_json_insert_for_type(column_data_type, query_builder) {
628                                insert_list_into_row_values_json(
629                                    list_array,
630                                    list_type,
631                                    &mut row_values,
632                                )?;
633                            } else {
634                                insert_list_into_row_values(list_array, list_type, &mut row_values);
635                            }
636                        }
637                    }
638                    DataType::FixedSizeList(list_type, _) => {
639                        let array = column.as_any().downcast_ref::<array::FixedSizeListArray>();
640                        if let Some(valid_array) = array {
641                            if valid_array.is_null(row) {
642                                row_values.push(Keyword::Null.into());
643                                continue;
644                            }
645                            let list_array = valid_array.value(row);
646
647                            if use_json_insert_for_type(column_data_type, query_builder) {
648                                insert_list_into_row_values_json(
649                                    list_array,
650                                    list_type,
651                                    &mut row_values,
652                                )?;
653                            } else {
654                                insert_list_into_row_values(list_array, list_type, &mut row_values);
655                            }
656                        }
657                    }
658                    DataType::Binary => {
659                        let array = column.as_any().downcast_ref::<array::BinaryArray>();
660
661                        if let Some(valid_array) = array {
662                            if valid_array.is_null(row) {
663                                row_values.push(Keyword::Null.into());
664                                continue;
665                            }
666
667                            row_values.push(valid_array.value(row).into());
668                        }
669                    }
670                    DataType::LargeBinary => {
671                        let array = column.as_any().downcast_ref::<array::LargeBinaryArray>();
672
673                        if let Some(valid_array) = array {
674                            if valid_array.is_null(row) {
675                                row_values.push(Keyword::Null.into());
676                                continue;
677                            }
678
679                            row_values.push(valid_array.value(row).into());
680                        }
681                    }
682                    DataType::FixedSizeBinary(_) => {
683                        let array = column
684                            .as_any()
685                            .downcast_ref::<array::FixedSizeBinaryArray>();
686
687                        if let Some(valid_array) = array {
688                            if valid_array.is_null(row) {
689                                row_values.push(Keyword::Null.into());
690                                continue;
691                            }
692
693                            row_values.push(valid_array.value(row).into());
694                        }
695                    }
696                    DataType::Interval(interval_unit) => match interval_unit {
697                        IntervalUnit::DayTime => {
698                            let array = column
699                                .as_any()
700                                .downcast_ref::<array::IntervalDayTimeArray>();
701
702                            if let Some(valid_array) = array {
703                                if valid_array.is_null(row) {
704                                    row_values.push(Keyword::Null.into());
705                                    continue;
706                                }
707
708                                let interval_str =
709                                    if let Ok(str) = array_value_to_string(valid_array, row) {
710                                        str
711                                    } else {
712                                        let days = valid_array.value(row).days;
713                                        let milliseconds = valid_array.value(row).milliseconds;
714                                        format!("{days} days {milliseconds} milliseconds")
715                                    };
716
717                                row_values.push(interval_str.into());
718                            }
719                        }
720                        IntervalUnit::YearMonth => {
721                            let array = column
722                                .as_any()
723                                .downcast_ref::<array::IntervalYearMonthArray>();
724
725                            if let Some(valid_array) = array {
726                                if valid_array.is_null(row) {
727                                    row_values.push(Keyword::Null.into());
728                                    continue;
729                                }
730
731                                let interval_str =
732                                    if let Ok(str) = array_value_to_string(valid_array, row) {
733                                        str
734                                    } else {
735                                        let months = valid_array.value(row);
736                                        format!("{months} months")
737                                    };
738
739                                row_values.push(interval_str.into());
740                            }
741                        }
742                        // The smallest unit in Postgres for interval is microsecond
743                        // Cast with loss of precision in nano second
744                        IntervalUnit::MonthDayNano => {
745                            let array = column
746                                .as_any()
747                                .downcast_ref::<array::IntervalMonthDayNanoArray>();
748
749                            if let Some(valid_array) = array {
750                                if valid_array.is_null(row) {
751                                    row_values.push(Keyword::Null.into());
752                                    continue;
753                                }
754
755                                let interval_str =
756                                    if let Ok(str) = array_value_to_string(valid_array, row) {
757                                        str
758                                    } else {
759                                        let months = valid_array.value(row).months;
760                                        let days = valid_array.value(row).days;
761                                        let nanoseconds = valid_array.value(row).nanoseconds;
762                                        let micros = nanoseconds / 1_000;
763                                        format!("{months} months {days} days {micros} microseconds")
764                                    };
765
766                                row_values.push(interval_str.into());
767                            }
768                        }
769                    },
770                    DataType::Struct(fields) => {
771                        let array = column.as_any().downcast_ref::<array::StructArray>();
772
773                        if let Some(valid_array) = array {
774                            if valid_array.is_null(row) {
775                                row_values.push(Keyword::Null.into());
776                                continue;
777                            }
778
779                            if use_json_insert_for_type(column_data_type, query_builder) {
780                                insert_struct_into_row_values_json(
781                                    fields,
782                                    valid_array,
783                                    row,
784                                    &mut row_values,
785                                )?;
786                                continue;
787                            }
788
789                            let mut param_values: Vec<SimpleExpr> = vec![];
790
791                            for col in valid_array.columns() {
792                                match col.data_type() {
793                                    DataType::Int8 => {
794                                        let int_array =
795                                            col.as_any().downcast_ref::<array::Int8Array>();
796
797                                        if let Some(valid_int_array) = int_array {
798                                            param_values.push(valid_int_array.value(row).into());
799                                        }
800                                    }
801                                    DataType::Int16 => {
802                                        let int_array =
803                                            col.as_any().downcast_ref::<array::Int16Array>();
804
805                                        if let Some(valid_int_array) = int_array {
806                                            param_values.push(valid_int_array.value(row).into());
807                                        }
808                                    }
809                                    DataType::Int32 => {
810                                        let int_array =
811                                            col.as_any().downcast_ref::<array::Int32Array>();
812
813                                        if let Some(valid_int_array) = int_array {
814                                            param_values.push(valid_int_array.value(row).into());
815                                        }
816                                    }
817                                    DataType::Int64 => {
818                                        let int_array =
819                                            col.as_any().downcast_ref::<array::Int64Array>();
820
821                                        if let Some(valid_int_array) = int_array {
822                                            param_values.push(valid_int_array.value(row).into());
823                                        }
824                                    }
825                                    DataType::UInt8 => {
826                                        let int_array =
827                                            col.as_any().downcast_ref::<array::UInt8Array>();
828
829                                        if let Some(valid_int_array) = int_array {
830                                            param_values.push(valid_int_array.value(row).into());
831                                        }
832                                    }
833                                    DataType::UInt16 => {
834                                        let int_array =
835                                            col.as_any().downcast_ref::<array::UInt16Array>();
836
837                                        if let Some(valid_int_array) = int_array {
838                                            param_values.push(valid_int_array.value(row).into());
839                                        }
840                                    }
841                                    DataType::UInt32 => {
842                                        let int_array =
843                                            col.as_any().downcast_ref::<array::UInt32Array>();
844
845                                        if let Some(valid_int_array) = int_array {
846                                            param_values.push(valid_int_array.value(row).into());
847                                        }
848                                    }
849                                    DataType::UInt64 => {
850                                        let int_array =
851                                            col.as_any().downcast_ref::<array::UInt64Array>();
852
853                                        if let Some(valid_int_array) = int_array {
854                                            param_values.push(valid_int_array.value(row).into());
855                                        }
856                                    }
857                                    DataType::Float32 => {
858                                        let float_array =
859                                            col.as_any().downcast_ref::<array::Float32Array>();
860
861                                        if let Some(valid_float_array) = float_array {
862                                            param_values.push(valid_float_array.value(row).into());
863                                        }
864                                    }
865                                    DataType::Float64 => {
866                                        let float_array =
867                                            col.as_any().downcast_ref::<array::Float64Array>();
868
869                                        if let Some(valid_float_array) = float_array {
870                                            param_values.push(valid_float_array.value(row).into());
871                                        }
872                                    }
873                                    DataType::Utf8 => {
874                                        let string_array =
875                                            col.as_any().downcast_ref::<array::StringArray>();
876
877                                        if let Some(valid_string_array) = string_array {
878                                            param_values.push(valid_string_array.value(row).into());
879                                        }
880                                    }
881                                    DataType::Null => {
882                                        param_values.push(Keyword::Null.into());
883                                    }
884                                    DataType::Boolean => {
885                                        let bool_array =
886                                            col.as_any().downcast_ref::<array::BooleanArray>();
887
888                                        if let Some(valid_bool_array) = bool_array {
889                                            param_values.push(valid_bool_array.value(row).into());
890                                        }
891                                    }
892                                    DataType::Binary => {
893                                        let binary_array =
894                                            col.as_any().downcast_ref::<array::BinaryArray>();
895
896                                        if let Some(valid_binary_array) = binary_array {
897                                            param_values.push(valid_binary_array.value(row).into());
898                                        }
899                                    }
900                                    DataType::FixedSizeBinary(_) => {
901                                        let binary_array = col
902                                            .as_any()
903                                            .downcast_ref::<array::FixedSizeBinaryArray>();
904
905                                        if let Some(valid_binary_array) = binary_array {
906                                            param_values.push(valid_binary_array.value(row).into());
907                                        }
908                                    }
909                                    DataType::LargeBinary => {
910                                        let binary_array =
911                                            col.as_any().downcast_ref::<array::LargeBinaryArray>();
912
913                                        if let Some(valid_binary_array) = binary_array {
914                                            param_values.push(valid_binary_array.value(row).into());
915                                        }
916                                    }
917                                    DataType::LargeUtf8 => {
918                                        let string_array =
919                                            col.as_any().downcast_ref::<array::LargeStringArray>();
920
921                                        if let Some(valid_string_array) = string_array {
922                                            param_values.push(valid_string_array.value(row).into());
923                                        }
924                                    }
925                                    DataType::Float16
926                                    | DataType::Timestamp(_, _)
927                                    | DataType::Date32
928                                    | DataType::Date64
929                                    | DataType::Time32(_)
930                                    | DataType::Time64(_)
931                                    | DataType::Duration(_)
932                                    | DataType::Interval(_)
933                                    | DataType::BinaryView
934                                    | DataType::Utf8View
935                                    | DataType::List(_)
936                                    | DataType::ListView(_)
937                                    | DataType::FixedSizeList(_, _)
938                                    | DataType::LargeList(_)
939                                    | DataType::LargeListView(_)
940                                    | DataType::Struct(_)
941                                    | DataType::Union(_, _)
942                                    | DataType::Dictionary(_, _)
943                                    | DataType::Map(_, _)
944                                    | DataType::RunEndEncoded(_, _)
945                                    | DataType::Decimal128(_, _)
946                                    | DataType::Decimal256(_, _) => {
947                                        unimplemented!(
948                                            "Data type mapping not implemented for Struct of {}",
949                                            col.data_type()
950                                        )
951                                    }
952                                }
953                            }
954
955                            let mut params_vec = Vec::new();
956                            for param_value in &param_values {
957                                let mut params_str = String::new();
958                                query_builder.prepare_simple_expr(param_value, &mut params_str);
959                                params_vec.push(params_str);
960                            }
961
962                            let params = params_vec.join(", ");
963                            row_values.push(Expr::cust(format!("ROW({params})")));
964                        }
965                    }
966                    unimplemented_type => {
967                        return Result::Err(Error::UnimplementedDataTypeInInsertStatement {
968                            data_type: unimplemented_type.clone(),
969                        })
970                    }
971                }
972            }
973            match insert_stmt.values(row_values) {
974                Ok(_) => (),
975                Err(e) => {
976                    return Result::Err(Error::FailedToCreateInsertStatement {
977                        source: Box::new(e),
978                    })
979                }
980            }
981        }
982        Ok(())
983    }
984
985    ///
986    /// # Errors
987    ///
988    /// Returns an error if any `RecordBatch` fails to convert into a valid postgres insert statement.
989    pub fn build_postgres(self, on_conflict: Option<OnConflict>) -> Result<String> {
990        self.build(PostgresQueryBuilder, on_conflict)
991    }
992
993    ///
994    /// # Errors
995    ///
996    /// Returns an error if any `RecordBatch` fails to convert into a valid sqlite insert statement.
997    pub fn build_sqlite(self, on_conflict: Option<OnConflict>) -> Result<String> {
998        self.build(SqliteQueryBuilder, on_conflict)
999    }
1000
1001    ///
1002    /// # Errors
1003    ///
1004    /// Returns an error if any `RecordBatch` fails to convert into a valid `MySQL` insert statement.
1005    pub fn build_mysql(self, on_conflict: Option<OnConflict>) -> Result<String> {
1006        self.build(MysqlQueryBuilder, on_conflict)
1007    }
1008
1009    /// # Errors
1010    ///
1011    /// Returns an error if any `RecordBatch` fails to convert into a valid insert statement. Upon
1012    /// error, no further `RecordBatch` is processed.
1013    pub fn build<T: GenericBuilder + 'static>(
1014        &self,
1015        query_builder: T,
1016        on_conflict: Option<OnConflict>,
1017    ) -> Result<String> {
1018        let columns: Vec<Alias> = (self.record_batches[0])
1019            .schema()
1020            .fields()
1021            .iter()
1022            .map(|field| Alias::new(field.name()))
1023            .collect();
1024
1025        let mut insert_stmt = Query::insert()
1026            .into_table(Alias::new(&self.table_name))
1027            .columns(columns)
1028            .to_owned();
1029
1030        for record_batch in &self.record_batches {
1031            self.construct_insert_stmt(&mut insert_stmt, record_batch, &query_builder)?;
1032        }
1033        if let Some(on_conflict) = on_conflict {
1034            insert_stmt.on_conflict(on_conflict);
1035        }
1036        Ok(insert_stmt.to_string(query_builder))
1037    }
1038}
1039
1040pub struct IndexBuilder {
1041    table_name: String,
1042    columns: Vec<String>,
1043    unique: bool,
1044}
1045
1046impl IndexBuilder {
1047    #[must_use]
1048    pub fn new(table_name: &str, columns: Vec<&str>) -> Self {
1049        Self {
1050            table_name: table_name.to_string(),
1051            columns: columns.into_iter().map(ToString::to_string).collect(),
1052            unique: false,
1053        }
1054    }
1055
1056    #[must_use]
1057    pub fn unique(mut self) -> Self {
1058        self.unique = true;
1059        self
1060    }
1061
1062    #[must_use]
1063    pub fn index_name(&self) -> String {
1064        format!("i_{}_{}", self.table_name, self.columns.join("_"))
1065    }
1066
1067    #[must_use]
1068    pub fn build_postgres(self) -> String {
1069        self.build(PostgresQueryBuilder)
1070    }
1071
1072    #[must_use]
1073    pub fn build_sqlite(self) -> String {
1074        self.build(SqliteQueryBuilder)
1075    }
1076
1077    #[must_use]
1078    pub fn build_mysql(self) -> String {
1079        self.build(MysqlQueryBuilder)
1080    }
1081
1082    #[must_use]
1083    pub fn build<T: GenericBuilder>(self, query_builder: T) -> String {
1084        let mut index = Index::create();
1085        index.table(Alias::new(&self.table_name));
1086        index.name(self.index_name());
1087        if self.unique {
1088            index.unique();
1089        }
1090        for column in self.columns {
1091            index.col(Alias::new(column).into_iden().into_index_column());
1092        }
1093        index.if_not_exists();
1094        index.to_string(query_builder)
1095    }
1096}
1097
1098fn insert_timestamp_into_row_values(
1099    timestamp: Result<OffsetDateTime, time::error::ComponentRange>,
1100    row_values: &mut Vec<SimpleExpr>,
1101) -> Result<()> {
1102    match timestamp {
1103        Ok(offset_time) => {
1104            row_values.push(PrimitiveDateTime::new(offset_time.date(), offset_time.time()).into());
1105            Ok(())
1106        }
1107        Err(e) => Err(Error::FailedToCreateInsertStatement {
1108            source: Box::new(e),
1109        }),
1110    }
1111}
1112
1113// Reference: https://github.com/apache/arrow-rs/blob/6c59b7637592e4b67b18762b8313f91086c0d5d8/arrow-array/src/timezone.rs#L25
1114#[allow(clippy::cast_lossless)]
1115fn parse_fixed_offset(tz: &str) -> Option<FixedOffset> {
1116    let bytes = tz.as_bytes();
1117
1118    let mut values = match bytes.len() {
1119        // [+-]XX:XX
1120        6 if bytes[3] == b':' => [bytes[1], bytes[2], bytes[4], bytes[5]],
1121        // [+-]XXXX
1122        5 => [bytes[1], bytes[2], bytes[3], bytes[4]],
1123        // [+-]XX
1124        3 => [bytes[1], bytes[2], b'0', b'0'],
1125        _ => return None,
1126    };
1127    values.iter_mut().for_each(|x| *x = x.wrapping_sub(b'0'));
1128    if values.iter().any(|x| *x > 9) {
1129        return None;
1130    }
1131    let secs =
1132        (values[0] * 10 + values[1]) as i32 * 60 * 60 + (values[2] * 10 + values[3]) as i32 * 60;
1133
1134    match bytes[0] {
1135        b'+' => FixedOffset::east_opt(secs),
1136        b'-' => FixedOffset::west_opt(secs),
1137        _ => None,
1138    }
1139}
1140
1141#[allow(clippy::needless_pass_by_value)]
1142fn insert_list_into_row_values(
1143    list_array: Arc<dyn Array>,
1144    list_type: &Arc<Field>,
1145    row_values: &mut Vec<SimpleExpr>,
1146) {
1147    match list_type.data_type() {
1148        DataType::Int8 => push_list_values!(
1149            list_type.data_type(),
1150            list_array,
1151            row_values,
1152            array::Int8Array,
1153            i8,
1154            "int2[]"
1155        ),
1156        DataType::Int16 => push_list_values!(
1157            list_type.data_type(),
1158            list_array,
1159            row_values,
1160            array::Int16Array,
1161            i16,
1162            "int2[]"
1163        ),
1164        DataType::Int32 => push_list_values!(
1165            list_type.data_type(),
1166            list_array,
1167            row_values,
1168            array::Int32Array,
1169            i32,
1170            "int4[]"
1171        ),
1172        DataType::Int64 => push_list_values!(
1173            list_type.data_type(),
1174            list_array,
1175            row_values,
1176            array::Int64Array,
1177            i64,
1178            "int8[]"
1179        ),
1180        DataType::Float32 => push_list_values!(
1181            list_type.data_type(),
1182            list_array,
1183            row_values,
1184            array::Float32Array,
1185            f32,
1186            "float4[]"
1187        ),
1188        DataType::Float64 => push_list_values!(
1189            list_type.data_type(),
1190            list_array,
1191            row_values,
1192            array::Float64Array,
1193            f64,
1194            "float8[]"
1195        ),
1196        DataType::Utf8 => {
1197            let mut list_values: Vec<String> = vec![];
1198            for i in 0..list_array.len() {
1199                let int_array = list_array.as_any().downcast_ref::<array::StringArray>();
1200                if let Some(valid_int_array) = int_array {
1201                    list_values.push(valid_int_array.value(i).to_string());
1202                }
1203            }
1204            let expr: SimpleExpr = list_values.into();
1205            // We must cast here in case the array is empty which SeaQuery does not handle.
1206            row_values.push(expr.cast_as(Alias::new("text[]")));
1207        }
1208        DataType::LargeUtf8 => {
1209            let mut list_values: Vec<String> = vec![];
1210            for i in 0..list_array.len() {
1211                let int_array = list_array
1212                    .as_any()
1213                    .downcast_ref::<array::LargeStringArray>();
1214                if let Some(valid_int_array) = int_array {
1215                    list_values.push(valid_int_array.value(i).to_string());
1216                }
1217            }
1218            let expr: SimpleExpr = list_values.into();
1219            // We must cast here in case the array is empty which SeaQuery does not handle.
1220            row_values.push(expr.cast_as(Alias::new("text[]")));
1221        }
1222        DataType::Boolean => push_list_values!(
1223            list_type.data_type(),
1224            list_array,
1225            row_values,
1226            array::BooleanArray,
1227            bool,
1228            "boolean[]"
1229        ),
1230        DataType::Binary => {
1231            let mut list_values: Vec<Vec<u8>> = Vec::new();
1232            for i in 0..list_array.len() {
1233                let temp_array = list_array.as_any().downcast_ref::<array::BinaryArray>();
1234                if let Some(valid_array) = temp_array {
1235                    list_values.push(valid_array.value(i).to_vec());
1236                }
1237            }
1238            let expr: SimpleExpr = list_values.into();
1239            // We must cast here in case the array is empty which SeaQuery does not handle.
1240            row_values.push(expr.cast_as(Alias::new("bytea[]")));
1241        }
1242        _ => unimplemented!(
1243            "Data type mapping not implemented for {}",
1244            list_type.data_type()
1245        ),
1246    }
1247}
1248
1249#[allow(clippy::cast_sign_loss)]
1250pub(crate) fn map_data_type_to_column_type(data_type: &DataType) -> ColumnType {
1251    match data_type {
1252        DataType::Int8 => ColumnType::TinyInteger,
1253        DataType::Int16 => ColumnType::SmallInteger,
1254        DataType::Int32 => ColumnType::Integer,
1255        DataType::Int64 | DataType::Duration(_) => ColumnType::BigInteger,
1256        DataType::UInt8 => ColumnType::TinyUnsigned,
1257        DataType::UInt16 => ColumnType::SmallUnsigned,
1258        DataType::UInt32 => ColumnType::Unsigned,
1259        DataType::UInt64 => ColumnType::BigUnsigned,
1260        DataType::Float32 => ColumnType::Float,
1261        DataType::Float64 => ColumnType::Double,
1262        DataType::Utf8 | DataType::LargeUtf8 => ColumnType::Text,
1263        DataType::Boolean => ColumnType::Boolean,
1264        #[allow(clippy::cast_sign_loss)] // This is safe because scale will never be negative
1265        DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
1266            ColumnType::Decimal(Some((u32::from(*p), *s as u32)))
1267        }
1268        DataType::Timestamp(_unit, time_zone) => {
1269            if time_zone.is_some() {
1270                return ColumnType::TimestampWithTimeZone;
1271            }
1272            ColumnType::Timestamp
1273        }
1274        DataType::Date32 | DataType::Date64 => ColumnType::Date,
1275        DataType::Time64(_unit) | DataType::Time32(_unit) => ColumnType::Time,
1276        DataType::List(list_type)
1277        | DataType::LargeList(list_type)
1278        | DataType::FixedSizeList(list_type, _) => {
1279            ColumnType::Array(map_data_type_to_column_type(list_type.data_type()).into())
1280        }
1281        // Originally mapped to VarBinary type, corresponding to MySQL's varbinary, which has a maximum length of 65535.
1282        // This caused the error: "Row size too large. The maximum row size for the used table type, not counting BLOBs, is 65535.
1283        // This includes storage overhead, check the manual. You have to change some columns to TEXT or BLOBs."
1284        // Changing to Blob fixes this issue. This change does not affect Postgres, and for Sqlite, the mapping type changes from varbinary_blob to blob.
1285        DataType::Binary | DataType::LargeBinary => ColumnType::Blob,
1286        DataType::FixedSizeBinary(num_bytes) => ColumnType::Binary(num_bytes.to_owned() as u32),
1287        DataType::Interval(_) => ColumnType::Interval(None, None),
1288        // Add more mappings here as needed
1289        _ => unimplemented!("Data type mapping not implemented for {:?}", data_type),
1290    }
1291}
1292
/// Serializes the elements of `$list_array` into a JSON array string.
///
/// `$list_array` is downcast to `$array_type`; each slot is read with `value(i)` and converted
/// into `$vec_type`. When the downcast fails, an empty list is serialized. `$data_type` is
/// accepted but not used by the expansion.
///
/// The expansion contains a `?` that yields `Error::FailedToCreateInsertStatement` when
/// `serde_json` fails, so it can only be used inside a function returning a compatible `Result`.
macro_rules! serialize_list_values {
    ($data_type:expr, $list_array:expr, $array_type:ty, $vec_type:ty) => {{
        let list_values: Vec<$vec_type> =
            match $list_array.as_any().downcast_ref::<$array_type>() {
                Some(typed) => (0..typed.len())
                    .map(|idx| -> $vec_type { typed.value(idx).into() })
                    .collect(),
                None => Vec::new(),
            };

        serde_json::to_string(&list_values).map_err(|e| Error::FailedToCreateInsertStatement {
            source: Box::new(e),
        })?
    }};
}
1307
1308fn insert_list_into_row_values_json(
1309    list_array: Arc<dyn Array>,
1310    list_type: &Arc<Field>,
1311    row_values: &mut Vec<SimpleExpr>,
1312) -> Result<()> {
1313    let data_type = list_type.data_type();
1314
1315    let json_string: String = match data_type {
1316        DataType::Int8 => serialize_list_values!(data_type, list_array, Int8Array, i8),
1317        DataType::Int16 => serialize_list_values!(data_type, list_array, Int16Array, i16),
1318        DataType::Int32 => serialize_list_values!(data_type, list_array, Int32Array, i32),
1319        DataType::Int64 => serialize_list_values!(data_type, list_array, Int64Array, i64),
1320        DataType::UInt8 => serialize_list_values!(data_type, list_array, UInt8Array, u8),
1321        DataType::UInt16 => serialize_list_values!(data_type, list_array, UInt16Array, u16),
1322        DataType::UInt32 => serialize_list_values!(data_type, list_array, UInt32Array, u32),
1323        DataType::UInt64 => serialize_list_values!(data_type, list_array, UInt64Array, u64),
1324        DataType::Float32 => serialize_list_values!(data_type, list_array, Float32Array, f32),
1325        DataType::Float64 => serialize_list_values!(data_type, list_array, Float64Array, f64),
1326        DataType::Utf8 => serialize_list_values!(data_type, list_array, StringArray, String),
1327        DataType::LargeUtf8 => {
1328            serialize_list_values!(data_type, list_array, LargeStringArray, String)
1329        }
1330        DataType::Boolean => serialize_list_values!(data_type, list_array, BooleanArray, bool),
1331        _ => unimplemented!(
1332            "List to json conversion is not implemented for {}",
1333            list_type.data_type()
1334        ),
1335    };
1336
1337    let expr: SimpleExpr = Expr::value(json_string);
1338    row_values.push(expr);
1339
1340    Ok(())
1341}
1342
1343fn insert_struct_into_row_values_json(
1344    fields: &Fields,
1345    array: &StructArray,
1346    row_index: usize,
1347    row_values: &mut Vec<SimpleExpr>,
1348) -> Result<()> {
1349    // The length of each column in a StructArray is the same as the length of the StructArray itself.
1350    // The presence of null values does not change the length of the columns (affects the validity bitmap only).
1351    // Similar to Recordbatch slice: https://github.com/apache/arrow-rs/blob/855666d9e9283c1ef11648762fe92c7c188b68f1/arrow-array/src/record_batch.rs#L375
1352    let single_row_columns: Vec<ArrayRef> = (0..array.num_columns())
1353        .map(|i| array.column(i).slice(row_index, 1))
1354        .collect();
1355
1356    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields.clone())), single_row_columns)
1357        .map_err(|e| Error::FailedToCreateInsertStatement {
1358            source: Box::new(e),
1359        })?;
1360
1361    let mut writer = datafusion::arrow::json::LineDelimitedWriter::new(Vec::new());
1362    writer
1363        .write(&batch)
1364        .map_err(|e| Error::FailedToCreateInsertStatement {
1365            source: Box::new(e),
1366        })?;
1367    writer
1368        .finish()
1369        .map_err(|e| Error::FailedToCreateInsertStatement {
1370            source: Box::new(e),
1371        })?;
1372    let json_bytes = writer.into_inner();
1373
1374    let json = String::from_utf8(json_bytes).map_err(|e| Error::FailedToCreateInsertStatement {
1375        source: Box::new(e),
1376    })?;
1377
1378    let expr: SimpleExpr = Expr::value(json);
1379    row_values.push(expr);
1380
1381    Ok(())
1382}
1383
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema};

    /// A schema without primary keys produces a plain SQLite `CREATE TABLE` statement in
    /// which non-nullable fields carry `NOT NULL`.
    #[test]
    fn test_basic_table_creation() {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int32, true),
        ];
        let schema = SchemaRef::new(Schema::new(fields));

        let sql = CreateTableBuilder::new(schema, "users").build_sqlite();

        let expected = "CREATE TABLE IF NOT EXISTS \"users\" ( \"id\" integer NOT NULL, \"name\" text NOT NULL, \"age\" integer )";
        assert_eq!(sql, expected);
    }

    /// Two batches holding the same data are emitted as one multi-row Postgres `INSERT`.
    /// The second batch names its third column `blah`, while the expected statement lists
    /// the column names only once, as `id`, `name`, `age`.
    #[test]
    fn test_table_insertion() {
        // Both schemas share the first two fields and differ only in the third field's name.
        let users_schema = |third_column: &str| {
            Schema::new(vec![
                Field::new("id", DataType::Int32, false),
                Field::new("name", DataType::Utf8, false),
                Field::new(third_column, DataType::Int32, true),
            ])
        };

        let ids: ArrayRef = Arc::new(array::Int32Array::from(vec![1, 2, 3]));
        let names: ArrayRef = Arc::new(array::StringArray::from(vec!["a", "b", "c"]));
        let ages: ArrayRef = Arc::new(array::Int32Array::from(vec![10, 20, 30]));
        let columns: Vec<ArrayRef> = vec![ids, names, ages];

        let first_batch = RecordBatch::try_new(Arc::new(users_schema("age")), columns.clone())
            .expect("Unable to build record batch");
        let second_batch = RecordBatch::try_new(Arc::new(users_schema("blah")), columns)
            .expect("Unable to build record batch");

        let sql = InsertBuilder::new("users", vec![first_batch, second_batch])
            .build_postgres(None)
            .expect("Failed to build insert statement");

        let expected = "INSERT INTO \"users\" (\"id\", \"name\", \"age\") VALUES (1, 'a', 10), (2, 'b', 20), (3, 'c', 30), (1, 'a', 10), (2, 'b', 20), (3, 'c', 30)";
        assert_eq!(sql, expected);
    }

    /// Declared primary keys are rendered as a trailing composite `PRIMARY KEY` clause.
    #[test]
    fn test_table_creation_with_primary_keys() {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("id2", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int32, true),
        ];
        let schema = SchemaRef::new(Schema::new(fields));

        let sql = CreateTableBuilder::new(schema, "users")
            .primary_keys(vec!["id", "id2"])
            .build_sqlite();

        let expected = "CREATE TABLE IF NOT EXISTS \"users\" ( \"id\" integer NOT NULL, \"id2\" integer NOT NULL, \"name\" text NOT NULL, \"age\" integer, PRIMARY KEY (\"id\", \"id2\") )";
        assert_eq!(sql, expected);
    }

    /// An `Int32` list column is rendered for Postgres as `ARRAY [...]` literals cast to
    /// `int4[]`.
    #[test]
    fn test_table_insertion_with_list() {
        let item_field = Arc::new(Field::new("item", DataType::Int32, true));
        let schema = Schema::new(vec![Field::new("list", DataType::List(item_field), true)]);

        // Three rows of three consecutive integers: [1,2,3], [4,5,6], [7,8,9].
        let rows: Vec<Option<Vec<Option<i32>>>> = [1, 4, 7]
            .iter()
            .map(|&first| Some((first..first + 3).map(Some).collect()))
            .collect();
        let lists = array::ListArray::from_iter_primitive::<Int32Type, _, _>(rows);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(lists)])
            .expect("Unable to build record batch");

        let sql = InsertBuilder::new("arrays", vec![batch])
            .build_postgres(None)
            .expect("Failed to build insert statement");

        let expected = "INSERT INTO \"arrays\" (\"list\") VALUES (CAST(ARRAY [1,2,3] AS int4[])), (CAST(ARRAY [4,5,6] AS int4[])), (CAST(ARRAY [7,8,9] AS int4[]))";
        assert_eq!(sql, expected);
    }

    /// The index name is derived from the table name and the indexed columns.
    #[test]
    fn test_create_index() {
        let indexed_columns = vec!["id", "name"];

        let sql = IndexBuilder::new("users", indexed_columns).build_postgres();

        let expected = r#"CREATE INDEX IF NOT EXISTS "i_users_id_name" ON "users" ("id", "name")"#;
        assert_eq!(sql, expected);
    }

    /// `unique()` turns the statement into `CREATE UNIQUE INDEX` and keeps the same name.
    #[test]
    fn test_create_unique_index() {
        let indexed_columns = vec!["id", "name"];

        let sql = IndexBuilder::new("users", indexed_columns)
            .unique()
            .build_postgres();

        let expected =
            r#"CREATE UNIQUE INDEX IF NOT EXISTS "i_users_id_name" ON "users" ("id", "name")"#;
        assert_eq!(sql, expected);
    }
}