1use bigdecimal::BigDecimal;
2use chrono::{DateTime, FixedOffset};
3use datafusion::arrow::{
4 array::{
5 array, Array, ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
6 Int64Array, Int8Array, LargeStringArray, RecordBatch, StringArray, StructArray,
7 UInt16Array, UInt32Array, UInt64Array, UInt8Array,
8 },
9 datatypes::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit},
10 util::display::array_value_to_string,
11};
12use num_bigint::BigInt;
13use sea_query::{
14 Alias, ColumnDef, ColumnType, Expr, GenericBuilder, Index, InsertStatement, IntoIden,
15 IntoIndexColumn, Keyword, MysqlQueryBuilder, OnConflict, PostgresQueryBuilder, Query,
16 QueryBuilder, SimpleExpr, SqliteQueryBuilder, Table,
17};
18use snafu::Snafu;
19use std::sync::Arc;
20use time::{OffsetDateTime, PrimitiveDateTime};
21
/// Errors produced while generating SQL statements from Arrow data.
#[derive(Debug, Snafu)]
pub enum Error {
    /// Wraps any underlying failure (temporal conversion, timezone parsing,
    /// value binding, ...) hit while building an `INSERT` statement.
    #[snafu(display("Failed to build insert statement: {source}"))]
    FailedToCreateInsertStatement {
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// Raised when a column's Arrow `DataType` has no SQL mapping yet.
    #[snafu(display("Unimplemented data type in insert statement: {data_type:?}"))]
    UnimplementedDataTypeInInsertStatement { data_type: DataType },
}
32
/// Module-local result type with the error defaulted to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
34
/// Builds `CREATE TABLE` DDL statements from an Arrow schema.
pub struct CreateTableBuilder {
    // Arrow schema describing the table's columns.
    schema: SchemaRef,
    // Target SQL table name.
    table_name: String,
    // Column names forming the (possibly composite) primary key; may be empty.
    primary_keys: Vec<String>,
}
40
41impl CreateTableBuilder {
42 #[must_use]
43 pub fn new(schema: SchemaRef, table_name: &str) -> Self {
44 Self {
45 schema,
46 table_name: table_name.to_string(),
47 primary_keys: Vec::new(),
48 }
49 }
50
51 #[must_use]
52 pub fn primary_keys<T>(mut self, keys: Vec<T>) -> Self
53 where
54 T: Into<String>,
55 {
56 self.primary_keys = keys.into_iter().map(Into::into).collect();
57 self
58 }
59
60 #[must_use]
61 #[cfg(feature = "postgres")]
62 pub fn build_postgres(self) -> Vec<String> {
63 use crate::sql::arrow_sql_gen::postgres::{
64 builder::TypeBuilder, get_postgres_composite_type_name,
65 map_data_type_to_column_type_postgres,
66 };
67 let schema = Arc::clone(&self.schema);
68 let table_name = self.table_name.clone();
69 let main_table_creation =
70 self.build(PostgresQueryBuilder, &|f: &Arc<Field>| -> ColumnType {
71 map_data_type_to_column_type_postgres(f.data_type(), &table_name, f.name())
72 });
73
74 let mut creation_stmts = Vec::new();
77 for field in schema.fields() {
78 let DataType::Struct(struct_inner_fields) = field.data_type() else {
79 continue;
80 };
81 let type_builder = TypeBuilder::new(
82 get_postgres_composite_type_name(&table_name, field.name()),
83 struct_inner_fields,
84 );
85 creation_stmts.push(type_builder.build());
86 }
87
88 creation_stmts.push(main_table_creation);
89 creation_stmts
90 }
91
92 #[must_use]
93 pub fn build_sqlite(self) -> String {
94 self.build(SqliteQueryBuilder, &|f: &Arc<Field>| -> ColumnType {
95 if f.data_type().is_nested() {
98 return ColumnType::JsonBinary;
99 }
100
101 map_data_type_to_column_type(f.data_type())
102 })
103 }
104
105 #[must_use]
106 pub fn build_mysql(self) -> String {
107 self.build(MysqlQueryBuilder, &|f: &Arc<Field>| -> ColumnType {
108 if f.data_type().is_nested() {
111 return ColumnType::JsonBinary;
112 }
113 map_data_type_to_column_type(f.data_type())
114 })
115 }
116
117 #[must_use]
118 fn build<T: GenericBuilder>(
119 self,
120 query_builder: T,
121 map_data_type_to_column_type_fn: &dyn Fn(&Arc<Field>) -> ColumnType,
122 ) -> String {
123 let mut create_stmt = Table::create();
124 create_stmt
125 .table(Alias::new(self.table_name.clone()))
126 .if_not_exists();
127
128 for field in self.schema.fields() {
129 let column_type = map_data_type_to_column_type_fn(field);
130 let mut column_def = ColumnDef::new_with_type(Alias::new(field.name()), column_type);
131 if !field.is_nullable() {
132 column_def.not_null();
133 }
134
135 create_stmt.col(&mut column_def);
136 }
137
138 if !self.primary_keys.is_empty() {
139 let mut index = Index::create();
140 index.primary();
141 for key in self.primary_keys {
142 index.col(Alias::new(key).into_iden().into_index_column());
143 }
144 create_stmt.primary_key(&mut index);
145 }
146
147 create_stmt.to_string(query_builder)
148 }
149}
150
/// Downcasts `$column` to the concrete Arrow array type `$array_type` and
/// pushes the value at `$row` into `$row_values` as a `sea_query` expression.
///
/// A null slot is pushed as SQL `NULL` followed by `continue`, so this macro
/// may only be expanded inside a loop (it skips the caller's iteration).
/// A failed downcast silently pushes nothing.
macro_rules! push_value {
    ($row_values:expr, $column:expr, $row:expr, $array_type:ident) => {{
        let array = $column.as_any().downcast_ref::<array::$array_type>();
        if let Some(valid_array) = array {
            if valid_array.is_null($row) {
                $row_values.push(Keyword::Null.into());
                continue;
            }
            $row_values.push(valid_array.value($row).into());
        }
    }};
}
163
/// Collects every element of `$list_array` into a `Vec<$vec_type>` (after a
/// downcast to `$array_type`) and pushes the whole vector as one expression
/// cast to the SQL array type `$sql_type` (e.g. `int4[]`).
///
/// NOTE(review): `$data_type` is accepted but never used, and element null
/// slots are not checked — `value(i)` on a null slot yields the element
/// type's default value. Confirm lists containing nulls are not expected.
macro_rules! push_list_values {
    ($data_type:expr, $list_array:expr, $row_values:expr, $array_type:ty, $vec_type:ty, $sql_type:expr) => {{
        let mut list_values: Vec<$vec_type> = Vec::new();
        for i in 0..$list_array.len() {
            let temp_array = $list_array.as_any().downcast_ref::<$array_type>();
            if let Some(valid_array) = temp_array {
                list_values.push(valid_array.value(i));
            }
        }
        let expr: SimpleExpr = list_values.into();
        $row_values.push(expr.cast_as(Alias::new($sql_type)));
    }};
}
178
/// Builds SQL `INSERT` statements from one or more Arrow record batches.
pub struct InsertBuilder {
    // Target SQL table name.
    table_name: String,
    // Batches whose rows become the statement's VALUES groups.
    record_batches: Vec<RecordBatch>,
}
183
184pub fn use_json_insert_for_type<T: QueryBuilder + 'static>(
185 data_type: &DataType,
186 query_builder: &T,
187) -> bool {
188 #[cfg(feature = "sqlite")]
189 {
190 use std::any::Any;
191 let any_builder = query_builder as &dyn Any;
192 if any_builder.is::<SqliteQueryBuilder>() {
193 return data_type.is_nested();
194 }
195 }
196 #[cfg(feature = "mysql")]
197 {
198 use std::any::Any;
199 let any_builder = query_builder as &dyn Any;
200 if any_builder.is::<MysqlQueryBuilder>() {
201 return data_type.is_nested();
202 }
203 }
204 false
205}
206
207impl InsertBuilder {
208 #[must_use]
209 pub fn new(table_name: &str, record_batches: Vec<RecordBatch>) -> Self {
210 Self {
211 table_name: table_name.to_string(),
212 record_batches,
213 }
214 }
215
216 #[allow(clippy::too_many_lines)]
222 pub fn construct_insert_stmt<T: QueryBuilder + 'static>(
223 &self,
224 insert_stmt: &mut InsertStatement,
225 record_batch: &RecordBatch,
226 query_builder: &T,
227 ) -> Result<()> {
228 for row in 0..record_batch.num_rows() {
229 let mut row_values: Vec<SimpleExpr> = vec![];
230 for col in 0..record_batch.num_columns() {
231 let column = record_batch.column(col);
232 let column_data_type = column.data_type();
233
234 match column_data_type {
235 DataType::Int8 => push_value!(row_values, column, row, Int8Array),
236 DataType::Int16 => push_value!(row_values, column, row, Int16Array),
237 DataType::Int32 => push_value!(row_values, column, row, Int32Array),
238 DataType::Int64 => push_value!(row_values, column, row, Int64Array),
239 DataType::UInt8 => push_value!(row_values, column, row, UInt8Array),
240 DataType::UInt16 => push_value!(row_values, column, row, UInt16Array),
241 DataType::UInt32 => push_value!(row_values, column, row, UInt32Array),
242 DataType::UInt64 => push_value!(row_values, column, row, UInt64Array),
243 DataType::Float32 => push_value!(row_values, column, row, Float32Array),
244 DataType::Float64 => push_value!(row_values, column, row, Float64Array),
245 DataType::Utf8 => push_value!(row_values, column, row, StringArray),
246 DataType::LargeUtf8 => push_value!(row_values, column, row, LargeStringArray),
247 DataType::Boolean => push_value!(row_values, column, row, BooleanArray),
248 DataType::Decimal128(_, scale) => {
249 let array = column.as_any().downcast_ref::<array::Decimal128Array>();
250 if let Some(valid_array) = array {
251 if valid_array.is_null(row) {
252 row_values.push(Keyword::Null.into());
253 continue;
254 }
255 row_values.push(
256 BigDecimal::new(valid_array.value(row).into(), i64::from(*scale))
257 .into(),
258 );
259 }
260 }
261 DataType::Decimal256(_, scale) => {
262 let array = column.as_any().downcast_ref::<array::Decimal256Array>();
263 if let Some(valid_array) = array {
264 if valid_array.is_null(row) {
265 row_values.push(Keyword::Null.into());
266 continue;
267 }
268
269 let bigint =
270 BigInt::from_signed_bytes_le(&valid_array.value(row).to_le_bytes());
271
272 row_values.push(BigDecimal::new(bigint, i64::from(*scale)).into());
273 }
274 }
275 DataType::Date32 => {
276 let array = column.as_any().downcast_ref::<array::Date32Array>();
277 if let Some(valid_array) = array {
278 if valid_array.is_null(row) {
279 row_values.push(Keyword::Null.into());
280 continue;
281 }
282 row_values.push(
283 match OffsetDateTime::from_unix_timestamp(
284 i64::from(valid_array.value(row)) * 86_400,
285 ) {
286 Ok(offset_time) => offset_time.date().into(),
287 Err(e) => {
288 return Result::Err(Error::FailedToCreateInsertStatement {
289 source: Box::new(e),
290 })
291 }
292 },
293 );
294 }
295 }
296 DataType::Date64 => {
297 let array = column.as_any().downcast_ref::<array::Date64Array>();
298 if let Some(valid_array) = array {
299 if valid_array.is_null(row) {
300 row_values.push(Keyword::Null.into());
301 continue;
302 }
303 row_values.push(
304 match OffsetDateTime::from_unix_timestamp(
305 valid_array.value(row) / 1000,
306 ) {
307 Ok(offset_time) => offset_time.date().into(),
308 Err(e) => {
309 return Result::Err(Error::FailedToCreateInsertStatement {
310 source: Box::new(e),
311 })
312 }
313 },
314 );
315 }
316 }
317 DataType::Duration(time_unit) => match time_unit {
318 TimeUnit::Second => {
319 push_value!(row_values, column, row, DurationSecondArray);
320 }
321 TimeUnit::Microsecond => {
322 push_value!(row_values, column, row, DurationMicrosecondArray);
323 }
324 TimeUnit::Millisecond => {
325 push_value!(row_values, column, row, DurationMillisecondArray);
326 }
327 TimeUnit::Nanosecond => {
328 push_value!(row_values, column, row, DurationNanosecondArray);
329 }
330 },
331 DataType::Time32(time_unit) => match time_unit {
332 TimeUnit::Millisecond => {
333 let array = column
334 .as_any()
335 .downcast_ref::<array::Time32MillisecondArray>();
336 if let Some(valid_array) = array {
337 if valid_array.is_null(row) {
338 row_values.push(Keyword::Null.into());
339 continue;
340 }
341
342 let (h, m, s, micro) =
343 match OffsetDateTime::from_unix_timestamp_nanos(
344 i128::from(valid_array.value(row)) * 1_000_000,
345 ) {
346 Ok(timestamp) => timestamp.to_hms_micro(),
347 Err(e) => {
348 return Result::Err(
349 Error::FailedToCreateInsertStatement {
350 source: Box::new(e),
351 },
352 )
353 }
354 };
355
356 let time = match time::Time::from_hms_micro(h, m, s, micro) {
357 Ok(value) => value,
358 Err(e) => {
359 return Result::Err(Error::FailedToCreateInsertStatement {
360 source: Box::new(e),
361 })
362 }
363 };
364
365 row_values.push(time.into());
366 }
367 }
368 TimeUnit::Second => {
369 let array = column.as_any().downcast_ref::<array::Time32SecondArray>();
370 if let Some(valid_array) = array {
371 if valid_array.is_null(row) {
372 row_values.push(Keyword::Null.into());
373 continue;
374 }
375
376 let (h, m, s) = match OffsetDateTime::from_unix_timestamp(
377 i64::from(valid_array.value(row)),
378 ) {
379 Ok(timestamp) => timestamp.to_hms(),
380 Err(e) => {
381 return Result::Err(Error::FailedToCreateInsertStatement {
382 source: Box::new(e),
383 })
384 }
385 };
386
387 let time = match time::Time::from_hms(h, m, s) {
388 Ok(value) => value,
389 Err(e) => {
390 return Result::Err(Error::FailedToCreateInsertStatement {
391 source: Box::new(e),
392 })
393 }
394 };
395
396 row_values.push(time.into());
397 }
398 }
399 _ => unreachable!(),
400 },
401 DataType::Time64(time_unit) => match time_unit {
402 TimeUnit::Nanosecond => {
403 let array = column
404 .as_any()
405 .downcast_ref::<array::Time64NanosecondArray>();
406 if let Some(valid_array) = array {
407 if valid_array.is_null(row) {
408 row_values.push(Keyword::Null.into());
409 continue;
410 }
411 let (h, m, s, nano) =
412 match OffsetDateTime::from_unix_timestamp_nanos(i128::from(
413 valid_array.value(row),
414 )) {
415 Ok(timestamp) => timestamp.to_hms_nano(),
416 Err(e) => {
417 return Result::Err(
418 Error::FailedToCreateInsertStatement {
419 source: Box::new(e),
420 },
421 )
422 }
423 };
424
425 let time = match time::Time::from_hms_nano(h, m, s, nano) {
426 Ok(value) => value,
427 Err(e) => {
428 return Result::Err(Error::FailedToCreateInsertStatement {
429 source: Box::new(e),
430 })
431 }
432 };
433
434 row_values.push(time.into());
435 }
436 }
437 TimeUnit::Microsecond => {
438 let array = column
439 .as_any()
440 .downcast_ref::<array::Time64MicrosecondArray>();
441 if let Some(valid_array) = array {
442 if valid_array.is_null(row) {
443 row_values.push(Keyword::Null.into());
444 continue;
445 }
446
447 let (h, m, s, micro) =
448 match OffsetDateTime::from_unix_timestamp_nanos(
449 i128::from(valid_array.value(row)) * 1_000,
450 ) {
451 Ok(timestamp) => timestamp.to_hms_micro(),
452 Err(e) => {
453 return Result::Err(
454 Error::FailedToCreateInsertStatement {
455 source: Box::new(e),
456 },
457 )
458 }
459 };
460
461 let time = match time::Time::from_hms_micro(h, m, s, micro) {
462 Ok(value) => value,
463 Err(e) => {
464 return Result::Err(Error::FailedToCreateInsertStatement {
465 source: Box::new(e),
466 })
467 }
468 };
469
470 row_values.push(time.into());
471 }
472 }
473 _ => unreachable!(),
474 },
475 DataType::Timestamp(TimeUnit::Second, timezone) => {
476 let array = column
477 .as_any()
478 .downcast_ref::<array::TimestampSecondArray>();
479
480 if let Some(valid_array) = array {
481 if valid_array.is_null(row) {
482 row_values.push(Keyword::Null.into());
483 continue;
484 }
485 if let Some(timezone) = timezone {
486 let utc_time = DateTime::from_timestamp_nanos(
487 valid_array.value(row) * 1_000_000_000,
488 )
489 .to_utc();
490 let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
491 Error::FailedToCreateInsertStatement {
492 source: "Unable to parse arrow timezone information".into(),
493 },
494 )?;
495 let time_with_offset = utc_time.with_timezone(&offset);
496 row_values.push(time_with_offset.into());
497 } else {
498 insert_timestamp_into_row_values(
499 OffsetDateTime::from_unix_timestamp(valid_array.value(row)),
500 &mut row_values,
501 )?;
502 }
503 }
504 }
505 DataType::Timestamp(TimeUnit::Millisecond, timezone) => {
506 let array = column
507 .as_any()
508 .downcast_ref::<array::TimestampMillisecondArray>();
509
510 if let Some(valid_array) = array {
511 if valid_array.is_null(row) {
512 row_values.push(Keyword::Null.into());
513 continue;
514 }
515 if let Some(timezone) = timezone {
516 let utc_time = DateTime::from_timestamp_nanos(
517 valid_array.value(row) * 1_000_000,
518 )
519 .to_utc();
520 let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
521 Error::FailedToCreateInsertStatement {
522 source: "Unable to parse arrow timezone information".into(),
523 },
524 )?;
525 let time_with_offset = utc_time.with_timezone(&offset);
526 row_values.push(time_with_offset.into());
527 } else {
528 insert_timestamp_into_row_values(
529 OffsetDateTime::from_unix_timestamp_nanos(
530 i128::from(valid_array.value(row)) * 1_000_000,
531 ),
532 &mut row_values,
533 )?;
534 }
535 }
536 }
537 DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
538 let array = column
539 .as_any()
540 .downcast_ref::<array::TimestampMicrosecondArray>();
541
542 if let Some(valid_array) = array {
543 if valid_array.is_null(row) {
544 row_values.push(Keyword::Null.into());
545 continue;
546 }
547 if let Some(timezone) = timezone {
548 let utc_time =
549 DateTime::from_timestamp_nanos(valid_array.value(row) * 1_000)
550 .to_utc();
551 let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
552 Error::FailedToCreateInsertStatement {
553 source: "Unable to parse arrow timezone information".into(),
554 },
555 )?;
556 let time_with_offset = utc_time.with_timezone(&offset);
557 row_values.push(time_with_offset.into());
558 } else {
559 insert_timestamp_into_row_values(
560 OffsetDateTime::from_unix_timestamp_nanos(
561 i128::from(valid_array.value(row)) * 1_000,
562 ),
563 &mut row_values,
564 )?;
565 }
566 }
567 }
568 DataType::Timestamp(TimeUnit::Nanosecond, timezone) => {
569 let array = column
570 .as_any()
571 .downcast_ref::<array::TimestampNanosecondArray>();
572
573 if let Some(valid_array) = array {
574 if valid_array.is_null(row) {
575 row_values.push(Keyword::Null.into());
576 continue;
577 }
578 if let Some(timezone) = timezone {
579 let utc_time =
580 DateTime::from_timestamp_nanos(valid_array.value(row)).to_utc();
581 let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
582 Error::FailedToCreateInsertStatement {
583 source: "Unable to parse arrow timezone information".into(),
584 },
585 )?;
586 let time_with_offset = utc_time.with_timezone(&offset);
587 row_values.push(time_with_offset.into());
588 } else {
589 insert_timestamp_into_row_values(
590 OffsetDateTime::from_unix_timestamp_nanos(i128::from(
591 valid_array.value(row),
592 )),
593 &mut row_values,
594 )?;
595 }
596 }
597 }
598 DataType::List(list_type) => {
599 let array = column.as_any().downcast_ref::<array::ListArray>();
600 if let Some(valid_array) = array {
601 if valid_array.is_null(row) {
602 row_values.push(Keyword::Null.into());
603 continue;
604 }
605 let list_array = valid_array.value(row);
606
607 if use_json_insert_for_type(column_data_type, query_builder) {
608 insert_list_into_row_values_json(
609 list_array,
610 list_type,
611 &mut row_values,
612 )?;
613 } else {
614 insert_list_into_row_values(list_array, list_type, &mut row_values);
615 }
616 }
617 }
618 DataType::LargeList(list_type) => {
619 let array = column.as_any().downcast_ref::<array::LargeListArray>();
620 if let Some(valid_array) = array {
621 if valid_array.is_null(row) {
622 row_values.push(Keyword::Null.into());
623 continue;
624 }
625 let list_array = valid_array.value(row);
626
627 if use_json_insert_for_type(column_data_type, query_builder) {
628 insert_list_into_row_values_json(
629 list_array,
630 list_type,
631 &mut row_values,
632 )?;
633 } else {
634 insert_list_into_row_values(list_array, list_type, &mut row_values);
635 }
636 }
637 }
638 DataType::FixedSizeList(list_type, _) => {
639 let array = column.as_any().downcast_ref::<array::FixedSizeListArray>();
640 if let Some(valid_array) = array {
641 if valid_array.is_null(row) {
642 row_values.push(Keyword::Null.into());
643 continue;
644 }
645 let list_array = valid_array.value(row);
646
647 if use_json_insert_for_type(column_data_type, query_builder) {
648 insert_list_into_row_values_json(
649 list_array,
650 list_type,
651 &mut row_values,
652 )?;
653 } else {
654 insert_list_into_row_values(list_array, list_type, &mut row_values);
655 }
656 }
657 }
658 DataType::Binary => {
659 let array = column.as_any().downcast_ref::<array::BinaryArray>();
660
661 if let Some(valid_array) = array {
662 if valid_array.is_null(row) {
663 row_values.push(Keyword::Null.into());
664 continue;
665 }
666
667 row_values.push(valid_array.value(row).into());
668 }
669 }
670 DataType::LargeBinary => {
671 let array = column.as_any().downcast_ref::<array::LargeBinaryArray>();
672
673 if let Some(valid_array) = array {
674 if valid_array.is_null(row) {
675 row_values.push(Keyword::Null.into());
676 continue;
677 }
678
679 row_values.push(valid_array.value(row).into());
680 }
681 }
682 DataType::FixedSizeBinary(_) => {
683 let array = column
684 .as_any()
685 .downcast_ref::<array::FixedSizeBinaryArray>();
686
687 if let Some(valid_array) = array {
688 if valid_array.is_null(row) {
689 row_values.push(Keyword::Null.into());
690 continue;
691 }
692
693 row_values.push(valid_array.value(row).into());
694 }
695 }
696 DataType::Interval(interval_unit) => match interval_unit {
697 IntervalUnit::DayTime => {
698 let array = column
699 .as_any()
700 .downcast_ref::<array::IntervalDayTimeArray>();
701
702 if let Some(valid_array) = array {
703 if valid_array.is_null(row) {
704 row_values.push(Keyword::Null.into());
705 continue;
706 }
707
708 let interval_str =
709 if let Ok(str) = array_value_to_string(valid_array, row) {
710 str
711 } else {
712 let days = valid_array.value(row).days;
713 let milliseconds = valid_array.value(row).milliseconds;
714 format!("{days} days {milliseconds} milliseconds")
715 };
716
717 row_values.push(interval_str.into());
718 }
719 }
720 IntervalUnit::YearMonth => {
721 let array = column
722 .as_any()
723 .downcast_ref::<array::IntervalYearMonthArray>();
724
725 if let Some(valid_array) = array {
726 if valid_array.is_null(row) {
727 row_values.push(Keyword::Null.into());
728 continue;
729 }
730
731 let interval_str =
732 if let Ok(str) = array_value_to_string(valid_array, row) {
733 str
734 } else {
735 let months = valid_array.value(row);
736 format!("{months} months")
737 };
738
739 row_values.push(interval_str.into());
740 }
741 }
742 IntervalUnit::MonthDayNano => {
745 let array = column
746 .as_any()
747 .downcast_ref::<array::IntervalMonthDayNanoArray>();
748
749 if let Some(valid_array) = array {
750 if valid_array.is_null(row) {
751 row_values.push(Keyword::Null.into());
752 continue;
753 }
754
755 let interval_str =
756 if let Ok(str) = array_value_to_string(valid_array, row) {
757 str
758 } else {
759 let months = valid_array.value(row).months;
760 let days = valid_array.value(row).days;
761 let nanoseconds = valid_array.value(row).nanoseconds;
762 let micros = nanoseconds / 1_000;
763 format!("{months} months {days} days {micros} microseconds")
764 };
765
766 row_values.push(interval_str.into());
767 }
768 }
769 },
770 DataType::Struct(fields) => {
771 let array = column.as_any().downcast_ref::<array::StructArray>();
772
773 if let Some(valid_array) = array {
774 if valid_array.is_null(row) {
775 row_values.push(Keyword::Null.into());
776 continue;
777 }
778
779 if use_json_insert_for_type(column_data_type, query_builder) {
780 insert_struct_into_row_values_json(
781 fields,
782 valid_array,
783 row,
784 &mut row_values,
785 )?;
786 continue;
787 }
788
789 let mut param_values: Vec<SimpleExpr> = vec![];
790
791 for col in valid_array.columns() {
792 match col.data_type() {
793 DataType::Int8 => {
794 let int_array =
795 col.as_any().downcast_ref::<array::Int8Array>();
796
797 if let Some(valid_int_array) = int_array {
798 param_values.push(valid_int_array.value(row).into());
799 }
800 }
801 DataType::Int16 => {
802 let int_array =
803 col.as_any().downcast_ref::<array::Int16Array>();
804
805 if let Some(valid_int_array) = int_array {
806 param_values.push(valid_int_array.value(row).into());
807 }
808 }
809 DataType::Int32 => {
810 let int_array =
811 col.as_any().downcast_ref::<array::Int32Array>();
812
813 if let Some(valid_int_array) = int_array {
814 param_values.push(valid_int_array.value(row).into());
815 }
816 }
817 DataType::Int64 => {
818 let int_array =
819 col.as_any().downcast_ref::<array::Int64Array>();
820
821 if let Some(valid_int_array) = int_array {
822 param_values.push(valid_int_array.value(row).into());
823 }
824 }
825 DataType::UInt8 => {
826 let int_array =
827 col.as_any().downcast_ref::<array::UInt8Array>();
828
829 if let Some(valid_int_array) = int_array {
830 param_values.push(valid_int_array.value(row).into());
831 }
832 }
833 DataType::UInt16 => {
834 let int_array =
835 col.as_any().downcast_ref::<array::UInt16Array>();
836
837 if let Some(valid_int_array) = int_array {
838 param_values.push(valid_int_array.value(row).into());
839 }
840 }
841 DataType::UInt32 => {
842 let int_array =
843 col.as_any().downcast_ref::<array::UInt32Array>();
844
845 if let Some(valid_int_array) = int_array {
846 param_values.push(valid_int_array.value(row).into());
847 }
848 }
849 DataType::UInt64 => {
850 let int_array =
851 col.as_any().downcast_ref::<array::UInt64Array>();
852
853 if let Some(valid_int_array) = int_array {
854 param_values.push(valid_int_array.value(row).into());
855 }
856 }
857 DataType::Float32 => {
858 let float_array =
859 col.as_any().downcast_ref::<array::Float32Array>();
860
861 if let Some(valid_float_array) = float_array {
862 param_values.push(valid_float_array.value(row).into());
863 }
864 }
865 DataType::Float64 => {
866 let float_array =
867 col.as_any().downcast_ref::<array::Float64Array>();
868
869 if let Some(valid_float_array) = float_array {
870 param_values.push(valid_float_array.value(row).into());
871 }
872 }
873 DataType::Utf8 => {
874 let string_array =
875 col.as_any().downcast_ref::<array::StringArray>();
876
877 if let Some(valid_string_array) = string_array {
878 param_values.push(valid_string_array.value(row).into());
879 }
880 }
881 DataType::Null => {
882 param_values.push(Keyword::Null.into());
883 }
884 DataType::Boolean => {
885 let bool_array =
886 col.as_any().downcast_ref::<array::BooleanArray>();
887
888 if let Some(valid_bool_array) = bool_array {
889 param_values.push(valid_bool_array.value(row).into());
890 }
891 }
892 DataType::Binary => {
893 let binary_array =
894 col.as_any().downcast_ref::<array::BinaryArray>();
895
896 if let Some(valid_binary_array) = binary_array {
897 param_values.push(valid_binary_array.value(row).into());
898 }
899 }
900 DataType::FixedSizeBinary(_) => {
901 let binary_array = col
902 .as_any()
903 .downcast_ref::<array::FixedSizeBinaryArray>();
904
905 if let Some(valid_binary_array) = binary_array {
906 param_values.push(valid_binary_array.value(row).into());
907 }
908 }
909 DataType::LargeBinary => {
910 let binary_array =
911 col.as_any().downcast_ref::<array::LargeBinaryArray>();
912
913 if let Some(valid_binary_array) = binary_array {
914 param_values.push(valid_binary_array.value(row).into());
915 }
916 }
917 DataType::LargeUtf8 => {
918 let string_array =
919 col.as_any().downcast_ref::<array::LargeStringArray>();
920
921 if let Some(valid_string_array) = string_array {
922 param_values.push(valid_string_array.value(row).into());
923 }
924 }
925 DataType::Float16
926 | DataType::Timestamp(_, _)
927 | DataType::Date32
928 | DataType::Date64
929 | DataType::Time32(_)
930 | DataType::Time64(_)
931 | DataType::Duration(_)
932 | DataType::Interval(_)
933 | DataType::BinaryView
934 | DataType::Utf8View
935 | DataType::List(_)
936 | DataType::ListView(_)
937 | DataType::FixedSizeList(_, _)
938 | DataType::LargeList(_)
939 | DataType::LargeListView(_)
940 | DataType::Struct(_)
941 | DataType::Union(_, _)
942 | DataType::Dictionary(_, _)
943 | DataType::Map(_, _)
944 | DataType::RunEndEncoded(_, _)
945 | DataType::Decimal128(_, _)
946 | DataType::Decimal256(_, _) => {
947 unimplemented!(
948 "Data type mapping not implemented for Struct of {}",
949 col.data_type()
950 )
951 }
952 }
953 }
954
955 let mut params_vec = Vec::new();
956 for param_value in ¶m_values {
957 let mut params_str = String::new();
958 query_builder.prepare_simple_expr(param_value, &mut params_str);
959 params_vec.push(params_str);
960 }
961
962 let params = params_vec.join(", ");
963 row_values.push(Expr::cust(format!("ROW({params})")));
964 }
965 }
966 unimplemented_type => {
967 return Result::Err(Error::UnimplementedDataTypeInInsertStatement {
968 data_type: unimplemented_type.clone(),
969 })
970 }
971 }
972 }
973 match insert_stmt.values(row_values) {
974 Ok(_) => (),
975 Err(e) => {
976 return Result::Err(Error::FailedToCreateInsertStatement {
977 source: Box::new(e),
978 })
979 }
980 }
981 }
982 Ok(())
983 }
984
    /// Renders the accumulated record batches as a single Postgres `INSERT`
    /// statement, optionally with an `ON CONFLICT` clause.
    ///
    /// # Errors
    ///
    /// Propagates any failure from [`Self::build`].
    pub fn build_postgres(self, on_conflict: Option<OnConflict>) -> Result<String> {
        self.build(PostgresQueryBuilder, on_conflict)
    }
992
    /// Renders the accumulated record batches as a single SQLite `INSERT`
    /// statement, optionally with an `ON CONFLICT` clause.
    ///
    /// # Errors
    ///
    /// Propagates any failure from [`Self::build`].
    pub fn build_sqlite(self, on_conflict: Option<OnConflict>) -> Result<String> {
        self.build(SqliteQueryBuilder, on_conflict)
    }
1000
    /// Renders the accumulated record batches as a single MySQL `INSERT`
    /// statement, optionally with an `ON CONFLICT` clause.
    ///
    /// # Errors
    ///
    /// Propagates any failure from [`Self::build`].
    pub fn build_mysql(self, on_conflict: Option<OnConflict>) -> Result<String> {
        self.build(MysqlQueryBuilder, on_conflict)
    }
1008
1009 pub fn build<T: GenericBuilder + 'static>(
1014 &self,
1015 query_builder: T,
1016 on_conflict: Option<OnConflict>,
1017 ) -> Result<String> {
1018 let columns: Vec<Alias> = (self.record_batches[0])
1019 .schema()
1020 .fields()
1021 .iter()
1022 .map(|field| Alias::new(field.name()))
1023 .collect();
1024
1025 let mut insert_stmt = Query::insert()
1026 .into_table(Alias::new(&self.table_name))
1027 .columns(columns)
1028 .to_owned();
1029
1030 for record_batch in &self.record_batches {
1031 self.construct_insert_stmt(&mut insert_stmt, record_batch, &query_builder)?;
1032 }
1033 if let Some(on_conflict) = on_conflict {
1034 insert_stmt.on_conflict(on_conflict);
1035 }
1036 Ok(insert_stmt.to_string(query_builder))
1037 }
1038}
1039
/// Builds `CREATE INDEX IF NOT EXISTS` DDL statements.
pub struct IndexBuilder {
    // Table the index belongs to.
    table_name: String,
    // Indexed columns, in order.
    columns: Vec<String>,
    // Whether to emit a UNIQUE index.
    unique: bool,
}
1045
1046impl IndexBuilder {
1047 #[must_use]
1048 pub fn new(table_name: &str, columns: Vec<&str>) -> Self {
1049 Self {
1050 table_name: table_name.to_string(),
1051 columns: columns.into_iter().map(ToString::to_string).collect(),
1052 unique: false,
1053 }
1054 }
1055
1056 #[must_use]
1057 pub fn unique(mut self) -> Self {
1058 self.unique = true;
1059 self
1060 }
1061
1062 #[must_use]
1063 pub fn index_name(&self) -> String {
1064 format!("i_{}_{}", self.table_name, self.columns.join("_"))
1065 }
1066
1067 #[must_use]
1068 pub fn build_postgres(self) -> String {
1069 self.build(PostgresQueryBuilder)
1070 }
1071
1072 #[must_use]
1073 pub fn build_sqlite(self) -> String {
1074 self.build(SqliteQueryBuilder)
1075 }
1076
1077 #[must_use]
1078 pub fn build_mysql(self) -> String {
1079 self.build(MysqlQueryBuilder)
1080 }
1081
1082 #[must_use]
1083 pub fn build<T: GenericBuilder>(self, query_builder: T) -> String {
1084 let mut index = Index::create();
1085 index.table(Alias::new(&self.table_name));
1086 index.name(self.index_name());
1087 if self.unique {
1088 index.unique();
1089 }
1090 for column in self.columns {
1091 index.col(Alias::new(column).into_iden().into_index_column());
1092 }
1093 index.if_not_exists();
1094 index.to_string(query_builder)
1095 }
1096}
1097
1098fn insert_timestamp_into_row_values(
1099 timestamp: Result<OffsetDateTime, time::error::ComponentRange>,
1100 row_values: &mut Vec<SimpleExpr>,
1101) -> Result<()> {
1102 match timestamp {
1103 Ok(offset_time) => {
1104 row_values.push(PrimitiveDateTime::new(offset_time.date(), offset_time.time()).into());
1105 Ok(())
1106 }
1107 Err(e) => Err(Error::FailedToCreateInsertStatement {
1108 source: Box::new(e),
1109 }),
1110 }
1111}
1112
1113#[allow(clippy::cast_lossless)]
1115fn parse_fixed_offset(tz: &str) -> Option<FixedOffset> {
1116 let bytes = tz.as_bytes();
1117
1118 let mut values = match bytes.len() {
1119 6 if bytes[3] == b':' => [bytes[1], bytes[2], bytes[4], bytes[5]],
1121 5 => [bytes[1], bytes[2], bytes[3], bytes[4]],
1123 3 => [bytes[1], bytes[2], b'0', b'0'],
1125 _ => return None,
1126 };
1127 values.iter_mut().for_each(|x| *x = x.wrapping_sub(b'0'));
1128 if values.iter().any(|x| *x > 9) {
1129 return None;
1130 }
1131 let secs =
1132 (values[0] * 10 + values[1]) as i32 * 60 * 60 + (values[2] * 10 + values[3]) as i32 * 60;
1133
1134 match bytes[0] {
1135 b'+' => FixedOffset::east_opt(secs),
1136 b'-' => FixedOffset::west_opt(secs),
1137 _ => None,
1138 }
1139}
1140
/// Expands a (non-null) Arrow list value into a typed SQL array literal and
/// appends it to `row_values` — e.g. a `List<Int32>` becomes an expression
/// cast to `int4[]`.
///
/// The target type names are Postgres array types; backends that store lists
/// as JSON never reach this function (see [`use_json_insert_for_type`]).
///
/// NOTE(review): element null slots are not checked here or inside
/// `push_list_values!`; `value(i)` on a null slot yields the element type's
/// default value — confirm lists containing nulls are not expected.
///
/// # Panics
///
/// Calls `unimplemented!` for element types without a mapping.
#[allow(clippy::needless_pass_by_value)]
fn insert_list_into_row_values(
    list_array: Arc<dyn Array>,
    list_type: &Arc<Field>,
    row_values: &mut Vec<SimpleExpr>,
) {
    match list_type.data_type() {
        DataType::Int8 => push_list_values!(
            list_type.data_type(),
            list_array,
            row_values,
            array::Int8Array,
            i8,
            "int2[]"
        ),
        DataType::Int16 => push_list_values!(
            list_type.data_type(),
            list_array,
            row_values,
            array::Int16Array,
            i16,
            "int2[]"
        ),
        DataType::Int32 => push_list_values!(
            list_type.data_type(),
            list_array,
            row_values,
            array::Int32Array,
            i32,
            "int4[]"
        ),
        DataType::Int64 => push_list_values!(
            list_type.data_type(),
            list_array,
            row_values,
            array::Int64Array,
            i64,
            "int8[]"
        ),
        DataType::Float32 => push_list_values!(
            list_type.data_type(),
            list_array,
            row_values,
            array::Float32Array,
            f32,
            "float4[]"
        ),
        DataType::Float64 => push_list_values!(
            list_type.data_type(),
            list_array,
            row_values,
            array::Float64Array,
            f64,
            "float8[]"
        ),
        DataType::Utf8 => {
            // Strings are collected by hand so each element is copied into an
            // owned String. (The `int_array` name is historical; this is a
            // string array.)
            let mut list_values: Vec<String> = vec![];
            for i in 0..list_array.len() {
                let int_array = list_array.as_any().downcast_ref::<array::StringArray>();
                if let Some(valid_int_array) = int_array {
                    list_values.push(valid_int_array.value(i).to_string());
                }
            }
            let expr: SimpleExpr = list_values.into();
            row_values.push(expr.cast_as(Alias::new("text[]")));
        }
        DataType::LargeUtf8 => {
            // Same as Utf8 above, but for 64-bit-offset string arrays.
            let mut list_values: Vec<String> = vec![];
            for i in 0..list_array.len() {
                let int_array = list_array
                    .as_any()
                    .downcast_ref::<array::LargeStringArray>();
                if let Some(valid_int_array) = int_array {
                    list_values.push(valid_int_array.value(i).to_string());
                }
            }
            let expr: SimpleExpr = list_values.into();
            row_values.push(expr.cast_as(Alias::new("text[]")));
        }
        DataType::Boolean => push_list_values!(
            list_type.data_type(),
            list_array,
            row_values,
            array::BooleanArray,
            bool,
            "boolean[]"
        ),
        DataType::Binary => {
            // Each binary element is copied into an owned Vec<u8>.
            let mut list_values: Vec<Vec<u8>> = Vec::new();
            for i in 0..list_array.len() {
                let temp_array = list_array.as_any().downcast_ref::<array::BinaryArray>();
                if let Some(valid_array) = temp_array {
                    list_values.push(valid_array.value(i).to_vec());
                }
            }
            let expr: SimpleExpr = list_values.into();
            row_values.push(expr.cast_as(Alias::new("bytea[]")));
        }
        _ => unimplemented!(
            "Data type mapping not implemented for {}",
            list_type.data_type()
        ),
    }
}
1248
1249#[allow(clippy::cast_sign_loss)]
1250pub(crate) fn map_data_type_to_column_type(data_type: &DataType) -> ColumnType {
1251 match data_type {
1252 DataType::Int8 => ColumnType::TinyInteger,
1253 DataType::Int16 => ColumnType::SmallInteger,
1254 DataType::Int32 => ColumnType::Integer,
1255 DataType::Int64 | DataType::Duration(_) => ColumnType::BigInteger,
1256 DataType::UInt8 => ColumnType::TinyUnsigned,
1257 DataType::UInt16 => ColumnType::SmallUnsigned,
1258 DataType::UInt32 => ColumnType::Unsigned,
1259 DataType::UInt64 => ColumnType::BigUnsigned,
1260 DataType::Float32 => ColumnType::Float,
1261 DataType::Float64 => ColumnType::Double,
1262 DataType::Utf8 | DataType::LargeUtf8 => ColumnType::Text,
1263 DataType::Boolean => ColumnType::Boolean,
1264 #[allow(clippy::cast_sign_loss)] DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
1266 ColumnType::Decimal(Some((u32::from(*p), *s as u32)))
1267 }
1268 DataType::Timestamp(_unit, time_zone) => {
1269 if time_zone.is_some() {
1270 return ColumnType::TimestampWithTimeZone;
1271 }
1272 ColumnType::Timestamp
1273 }
1274 DataType::Date32 | DataType::Date64 => ColumnType::Date,
1275 DataType::Time64(_unit) | DataType::Time32(_unit) => ColumnType::Time,
1276 DataType::List(list_type)
1277 | DataType::LargeList(list_type)
1278 | DataType::FixedSizeList(list_type, _) => {
1279 ColumnType::Array(map_data_type_to_column_type(list_type.data_type()).into())
1280 }
1281 DataType::Binary | DataType::LargeBinary => ColumnType::Blob,
1286 DataType::FixedSizeBinary(num_bytes) => ColumnType::Binary(num_bytes.to_owned() as u32),
1287 DataType::Interval(_) => ColumnType::Interval(None, None),
1288 _ => unimplemented!("Data type mapping not implemented for {:?}", data_type),
1290 }
1291}
1292
// Downcasts `$list_array` to `$array_type`, collects its elements into a
// `Vec<$vec_type>`, and serializes that vector to a JSON string. A failed
// downcast produces `"[]"` (best-effort). Expands to an expression that uses
// `?`, so it may only be invoked inside a function returning `Result<_, Error>`.
macro_rules! serialize_list_values {
    ($data_type:expr, $list_array:expr, $array_type:ty, $vec_type:ty) => {{
        let mut list_values: Vec<$vec_type> = vec![];
        if let Some(array) = $list_array.as_any().downcast_ref::<$array_type>() {
            for i in 0..array.len() {
                list_values.push(array.value(i).into());
            }
        }

        serde_json::to_string(&list_values).map_err(|e| Error::FailedToCreateInsertStatement {
            source: Box::new(e),
        })?
    }};
}
1307
1308fn insert_list_into_row_values_json(
1309 list_array: Arc<dyn Array>,
1310 list_type: &Arc<Field>,
1311 row_values: &mut Vec<SimpleExpr>,
1312) -> Result<()> {
1313 let data_type = list_type.data_type();
1314
1315 let json_string: String = match data_type {
1316 DataType::Int8 => serialize_list_values!(data_type, list_array, Int8Array, i8),
1317 DataType::Int16 => serialize_list_values!(data_type, list_array, Int16Array, i16),
1318 DataType::Int32 => serialize_list_values!(data_type, list_array, Int32Array, i32),
1319 DataType::Int64 => serialize_list_values!(data_type, list_array, Int64Array, i64),
1320 DataType::UInt8 => serialize_list_values!(data_type, list_array, UInt8Array, u8),
1321 DataType::UInt16 => serialize_list_values!(data_type, list_array, UInt16Array, u16),
1322 DataType::UInt32 => serialize_list_values!(data_type, list_array, UInt32Array, u32),
1323 DataType::UInt64 => serialize_list_values!(data_type, list_array, UInt64Array, u64),
1324 DataType::Float32 => serialize_list_values!(data_type, list_array, Float32Array, f32),
1325 DataType::Float64 => serialize_list_values!(data_type, list_array, Float64Array, f64),
1326 DataType::Utf8 => serialize_list_values!(data_type, list_array, StringArray, String),
1327 DataType::LargeUtf8 => {
1328 serialize_list_values!(data_type, list_array, LargeStringArray, String)
1329 }
1330 DataType::Boolean => serialize_list_values!(data_type, list_array, BooleanArray, bool),
1331 _ => unimplemented!(
1332 "List to json conversion is not implemented for {}",
1333 list_type.data_type()
1334 ),
1335 };
1336
1337 let expr: SimpleExpr = Expr::value(json_string);
1338 row_values.push(expr);
1339
1340 Ok(())
1341}
1342
1343fn insert_struct_into_row_values_json(
1344 fields: &Fields,
1345 array: &StructArray,
1346 row_index: usize,
1347 row_values: &mut Vec<SimpleExpr>,
1348) -> Result<()> {
1349 let single_row_columns: Vec<ArrayRef> = (0..array.num_columns())
1353 .map(|i| array.column(i).slice(row_index, 1))
1354 .collect();
1355
1356 let batch = RecordBatch::try_new(Arc::new(Schema::new(fields.clone())), single_row_columns)
1357 .map_err(|e| Error::FailedToCreateInsertStatement {
1358 source: Box::new(e),
1359 })?;
1360
1361 let mut writer = datafusion::arrow::json::LineDelimitedWriter::new(Vec::new());
1362 writer
1363 .write(&batch)
1364 .map_err(|e| Error::FailedToCreateInsertStatement {
1365 source: Box::new(e),
1366 })?;
1367 writer
1368 .finish()
1369 .map_err(|e| Error::FailedToCreateInsertStatement {
1370 source: Box::new(e),
1371 })?;
1372 let json_bytes = writer.into_inner();
1373
1374 let json = String::from_utf8(json_bytes).map_err(|e| Error::FailedToCreateInsertStatement {
1375 source: Box::new(e),
1376 })?;
1377
1378 let expr: SimpleExpr = Expr::value(json);
1379 row_values.push(expr);
1380
1381 Ok(())
1382}
1383
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema};

    #[test]
    fn test_basic_table_creation() {
        // Nullable and non-nullable columns should produce matching NOT NULL clauses.
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int32, true),
        ];
        let sql =
            CreateTableBuilder::new(SchemaRef::new(Schema::new(fields)), "users").build_sqlite();

        assert_eq!(sql, "CREATE TABLE IF NOT EXISTS \"users\" ( \"id\" integer NOT NULL, \"name\" text NOT NULL, \"age\" integer )");
    }

    #[test]
    fn test_table_insertion() {
        let ids = array::Int32Array::from(vec![1, 2, 3]);
        let names = array::StringArray::from(vec!["a", "b", "c"]);
        let ages = array::Int32Array::from(vec![10, 20, 30]);

        let first_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int32, true),
        ]);
        let first_batch = RecordBatch::try_new(
            Arc::new(first_schema),
            vec![
                Arc::new(ids.clone()),
                Arc::new(names.clone()),
                Arc::new(ages.clone()),
            ],
        )
        .expect("Unable to build record batch");

        // Second batch's schema renames the third column; the insert statement
        // should still use the first batch's column names throughout.
        let second_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("blah", DataType::Int32, true),
        ]);
        let second_batch = RecordBatch::try_new(
            Arc::new(second_schema),
            vec![Arc::new(ids), Arc::new(names), Arc::new(ages)],
        )
        .expect("Unable to build record batch");

        let sql = InsertBuilder::new("users", vec![first_batch, second_batch])
            .build_postgres(None)
            .expect("Failed to build insert statement");
        assert_eq!(sql, "INSERT INTO \"users\" (\"id\", \"name\", \"age\") VALUES (1, 'a', 10), (2, 'b', 20), (3, 'c', 30), (1, 'a', 10), (2, 'b', 20), (3, 'c', 30)");
    }

    #[test]
    fn test_table_creation_with_primary_keys() {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("id2", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int32, true),
        ];
        let sql = CreateTableBuilder::new(SchemaRef::new(Schema::new(fields)), "users")
            .primary_keys(vec!["id", "id2"])
            .build_sqlite();

        assert_eq!(sql, "CREATE TABLE IF NOT EXISTS \"users\" ( \"id\" integer NOT NULL, \"id2\" integer NOT NULL, \"name\" text NOT NULL, \"age\" integer, PRIMARY KEY (\"id\", \"id2\") )");
    }

    #[test]
    fn test_table_insertion_with_list() {
        let schema = Schema::new(vec![Field::new(
            "list",
            DataType::List(Field::new("item", DataType::Int32, true).into()),
            true,
        )]);
        // Three rows, each a list of three int32 values.
        let values = array::ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            Some(vec![Some(4), Some(5), Some(6)]),
            Some(vec![Some(7), Some(8), Some(9)]),
        ]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(values)])
            .expect("Unable to build record batch");

        let sql = InsertBuilder::new("arrays", vec![batch])
            .build_postgres(None)
            .expect("Failed to build insert statement");
        assert_eq!(
            sql,
            "INSERT INTO \"arrays\" (\"list\") VALUES (CAST(ARRAY [1,2,3] AS int4[])), (CAST(ARRAY [4,5,6] AS int4[])), (CAST(ARRAY [7,8,9] AS int4[]))"
        );
    }

    #[test]
    fn test_create_index() {
        let sql = IndexBuilder::new("users", vec!["id", "name"]).build_postgres();
        assert_eq!(
            sql,
            r#"CREATE INDEX IF NOT EXISTS "i_users_id_name" ON "users" ("id", "name")"#
        );
    }

    #[test]
    fn test_create_unique_index() {
        let sql = IndexBuilder::new("users", vec!["id", "name"])
            .unique()
            .build_postgres();
        assert_eq!(
            sql,
            r#"CREATE UNIQUE INDEX IF NOT EXISTS "i_users_id_name" ON "users" ("id", "name")"#
        );
    }
}