1use crate::backend::Backend;
11use crate::connection::{BulkInsert, Connection, ForeignKey};
12use crate::error::SqlError;
13use crate::render::render_value;
14use crate::transaction::{begin_transaction, commit_transaction, rollback_transaction};
15use crate::value::{ColumnInfo, TypeHint, Value};
16
/// Strategy applied when the destination table of a copy already exists.
///
/// The default is [`IfExists::Error`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IfExists {
    /// Refuse to copy when the existing target table already contains rows.
    #[default]
    Error,
    /// Insert the source rows into the existing table without any conflict
    /// handling.
    Append,
    /// Empty the existing target (`DELETE FROM`) before inserting the first
    /// batch.
    Truncate,
    /// Insert with conflict handling: rows whose conflict key already exists
    /// in the target are left untouched.
    Skip,
    /// Insert with conflict handling: rows whose conflict key already exists
    /// have their non-key columns overwritten with the source values.
    Upsert,
}
51
52impl IfExists {
53 pub fn parse(s: &str) -> Option<Self> {
56 match s.to_ascii_lowercase().as_str() {
57 "error" => Some(Self::Error),
58 "append" => Some(Self::Append),
59 "truncate" => Some(Self::Truncate),
60 "skip" => Some(Self::Skip),
61 "upsert" => Some(Self::Upsert),
62 _ => None,
63 }
64 }
65
66 pub fn resolves_conflicts(self) -> bool {
71 matches!(self, Self::Skip | Self::Upsert)
72 }
73}
74
/// Controls whether batches are written through the destination's native
/// bulk-insert path.
///
/// The default is [`BulkMode::Off`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BulkMode {
    /// Never attempt the bulk path; always emit generic `INSERT` statements.
    #[default]
    Off,
    /// Attempt the bulk path and fall back to generic `INSERT` when the
    /// connection reports it as unavailable.
    Auto,
    /// Attempt the bulk path and fail the copy when the connection reports it
    /// as unavailable.
    On,
}
100
101impl BulkMode {
102 pub fn parse(s: &str) -> Option<Self> {
105 match s.to_ascii_lowercase().as_str() {
106 "off" => Some(Self::Off),
107 "auto" => Some(Self::Auto),
108 "on" => Some(Self::On),
109 _ => None,
110 }
111 }
112}
113
/// Wire format requested from the destination's native bulk path.
///
/// The value is forwarded unchanged in the `BulkInsert` request; how each
/// backend interprets it is decided by the connection implementation.
/// The default is [`CopyFormat::Text`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CopyFormat {
    /// Text format (the default).
    #[default]
    Text,
    /// Binary format.
    Binary,
}
137
138impl CopyFormat {
139 pub fn parse(s: &str) -> Option<Self> {
142 match s.to_ascii_lowercase().as_str() {
143 "text" => Some(Self::Text),
144 "binary" => Some(Self::Binary),
145 _ => None,
146 }
147 }
148}
149
/// What to read from the source connection and where to write it.
#[derive(Debug, Clone)]
pub enum CopySource {
    /// Copy a whole table; the same name is used for the destination table.
    Table(String),
    /// Copy the result of an arbitrary query `sql` into the destination
    /// table `into`.
    Query { sql: String, into: String },
}
160
161impl CopySource {
162 pub fn target_table(&self) -> &str {
164 match self {
165 Self::Table(t) => t,
166 Self::Query { into, .. } => into,
167 }
168 }
169
170 fn source_sql(&self, src_backend: Backend) -> String {
171 match self {
172 Self::Table(t) => format!("SELECT * FROM {}", quote_identifier(t, src_backend)),
173 Self::Query { sql, .. } => sql.clone(),
174 }
175 }
176}
177
/// Options controlling a single-table copy performed by `copy_rows`.
pub struct CopyOptions {
    /// What to read and which destination table to write.
    pub source: CopySource,
    /// Create the destination table from the source column metadata when it
    /// does not exist; without this a missing target is an error.
    pub create_table: bool,
    /// When creating the table from a `CopySource::Table` source, carry the
    /// source table's primary key over into the generated DDL.
    pub preserve_pk: bool,
    /// Strategy used when the destination table already exists.
    pub if_exists: IfExists,
    /// Conflict columns for `Skip`/`Upsert`; when empty the destination's
    /// declared primary key is used. Ignored (with a warning) for the other
    /// strategies.
    pub conflict_key: Vec<String>,
    /// Run the copy inside one destination transaction that is committed on
    /// success and rolled back on failure.
    pub atomic: bool,
    /// Number of rows fetched from the source per page; must be non-zero.
    pub batch_size: usize,
    /// Whether to use the destination's native bulk-insert path.
    pub bulk_mode: BulkMode,
    /// Format forwarded to the native bulk-insert path.
    pub copy_format: CopyFormat,
    /// Emit a diagnostic line on stderr for each batch written via the bulk
    /// path.
    pub verbose: bool,
    /// Optional callback invoked after each written batch with the cumulative
    /// number of rows copied so far.
    pub progress: Option<Box<dyn Fn(usize) + Send>>,
}
219
220impl Default for CopyOptions {
221 fn default() -> Self {
222 Self {
223 source: CopySource::Table(String::new()),
224 create_table: false,
225 preserve_pk: false,
226 if_exists: IfExists::Error,
227 conflict_key: Vec::new(),
228 atomic: false,
229 batch_size: 1000,
230 bulk_mode: BulkMode::Off,
231 copy_format: CopyFormat::Text,
232 verbose: false,
233 progress: None,
234 }
235 }
236}
237
238impl std::fmt::Debug for CopyOptions {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 f.debug_struct("CopyOptions")
241 .field("source", &self.source)
242 .field("create_table", &self.create_table)
243 .field("preserve_pk", &self.preserve_pk)
244 .field("if_exists", &self.if_exists)
245 .field("conflict_key", &self.conflict_key)
246 .field("atomic", &self.atomic)
247 .field("batch_size", &self.batch_size)
248 .field("bulk_mode", &self.bulk_mode)
249 .field("verbose", &self.verbose)
250 .field("progress", &self.progress.is_some())
251 .finish()
252 }
253}
254
/// Copies the rows described by `opts.source` from `src` into the target
/// table on `dst`, page by page.
///
/// Returns the number of rows read from the source and handed to the
/// destination.
///
/// # Errors
///
/// Returns `SqlError::QueryFailed` when the target table name is empty,
/// `batch_size` is zero, the target is missing and `create_table` is off,
/// the target already has rows under `IfExists::Error`, the source query
/// yields no column metadata, or a conflict-key column is absent from the
/// source columns. Errors from the connections, paging helper and
/// transaction helpers are propagated.
pub fn copy_rows(
    src: &mut dyn Connection,
    src_backend: Backend,
    dst: &mut dyn Connection,
    dst_backend: Backend,
    opts: &CopyOptions,
) -> Result<usize, SqlError> {
    // Validate the cheap, purely local preconditions first.
    let target_table = opts.source.target_table().to_string();
    if target_table.is_empty() {
        return Err(SqlError::QueryFailed(
            "copy: target table name is empty".into(),
        ));
    }
    if opts.batch_size == 0 {
        return Err(SqlError::QueryFailed(
            "copy: batch_size must be greater than zero".into(),
        ));
    }

    let target_exists = table_exists(dst, &target_table)?;

    if !target_exists && !opts.create_table {
        return Err(SqlError::QueryFailed(format!(
            "Target table '{target_table}' does not exist on destination. \
             Pass --create-table to create it from the source schema."
        )));
    }

    // The default strategy refuses to touch a populated table.
    if target_exists
        && opts.if_exists == IfExists::Error
        && table_has_rows(dst, &target_table, dst_backend)?
    {
        return Err(SqlError::QueryFailed(format!(
            "Target table '{target_table}' already contains rows. \
             Pass --if-exists append, --if-exists truncate, --if-exists skip, \
             --if-exists upsert, or empty the table first."
        )));
    }

    // A conflict key only makes sense for skip/upsert; warn but continue.
    if !opts.conflict_key.is_empty() && !opts.if_exists.resolves_conflicts() {
        eprintln!(
            "[ferrule] copy: --key is only meaningful with --if-exists skip|upsert; \
             ignoring for --if-exists {}.",
            if_exists_name(opts.if_exists)
        );
    }

    // Empty unless the strategy is skip/upsert.
    // NOTE(review): this runs before the table is created below, so for
    // skip/upsert without --key on a not-yet-existing target the PK lookup
    // hits a missing table — confirm this is intended.
    let pk_columns: Vec<String> =
        resolve_conflict_key(dst, &target_table, opts.if_exists, &opts.conflict_key)?;

    // Only a warning: `insert_batch` itself skips the bulk path for
    // skip/upsert.
    if opts.if_exists.resolves_conflicts() && opts.bulk_mode != BulkMode::Off {
        eprintln!(
            "[ferrule] bulk: --if-exists {} requires the generic INSERT path; \
             ignoring --bulk-native for this copy.",
            if_exists_name(opts.if_exists)
        );
    }

    // Fetch the first page up front: it supplies the column metadata needed
    // for validation and DDL, and its rows are reused by `run_copy`.
    let source_sql = opts.source.source_sql(src_backend);
    let first_paged = crate::query_builder::apply_paging(
        &source_sql,
        Some(opts.batch_size),
        Some(0),
        src_backend,
    )?;
    let first_page = src.query(&first_paged)?;

    if first_page.columns.is_empty() {
        return Err(SqlError::QueryFailed(
            "copy: source query returned no column metadata".into(),
        ));
    }
    let columns: Vec<ColumnInfo> = first_page.columns.clone();

    // Every conflict-key column must appear (exact, case-sensitive match) in
    // the source result set.
    if !pk_columns.is_empty() {
        let source_names: Vec<&str> = columns.iter().map(|c| c.name.as_str()).collect();
        for pk in &pk_columns {
            if !source_names.iter().any(|n| n == pk) {
                return Err(SqlError::QueryFailed(format!(
                    "Target PK column '{pk}' is not present in source columns \
                     {source_names:?}. Cross-backend identifier case mismatches \
                     can cause this — re-select with explicit aliases (e.g. \
                     `SELECT id AS \"{pk}\" ...`)."
                )));
            }
        }
    }

    // Create the target from the source columns. This happens before any
    // transaction is opened below.
    if !target_exists && opts.create_table {
        let preserved_pk: Vec<String> = if opts.preserve_pk {
            // Only a table source has a primary key to look up.
            let src_table = match &opts.source {
                CopySource::Table(t) => t.clone(),
                CopySource::Query { .. } => String::new(),
            };
            if src_table.is_empty() {
                Vec::new()
            } else {
                // A failed lookup is treated as "no primary key".
                src.primary_key(None, &src_table).unwrap_or_default()
            }
        } else {
            Vec::new()
        };
        let ddl = translate_ddl(&target_table, &columns, dst_backend, &preserved_pk);
        dst.execute(&ddl)?;
    }

    // `begin_transaction` returns a flag; presumably `true` means a
    // transaction was actually opened — confirm against its definition.
    let outer_tx_opened = if opts.atomic {
        begin_transaction(dst, dst_backend)
    } else {
        false
    };

    let result = run_copy(
        src,
        src_backend,
        dst,
        dst_backend,
        opts,
        &source_sql,
        &target_table,
        &columns,
        &pk_columns,
        target_exists,
        first_page.rows,
    );

    if outer_tx_opened {
        match &result {
            Ok(_) => {
                commit_transaction(dst, dst_backend)?;
            }
            Err(_) => {
                // Best effort: the copy error is what the caller needs to see.
                let _ = rollback_transaction(dst, dst_backend);
            }
        }
    } else if result.is_ok() && backend_needs_explicit_commit(dst_backend) {
        commit_transaction(dst, dst_backend)?;
    }

    result
}
441
442fn resolve_conflict_key(
452 dst: &mut dyn Connection,
453 target_table: &str,
454 if_exists: IfExists,
455 override_: &[String],
456) -> Result<Vec<String>, SqlError> {
457 if !if_exists.resolves_conflicts() {
458 return Ok(Vec::new());
459 }
460 if !override_.is_empty() {
461 return Ok(override_.to_vec());
462 }
463 let pk = dst.primary_key(None, target_table)?;
464 if pk.is_empty() {
465 return Err(SqlError::QueryFailed(format!(
466 "Target table '{target_table}' has no declared primary key — \
467 --if-exists {} requires one. Declare a PK on the destination \
468 table, pass --preserve-pk when creating it, or supply \
469 --key COL[,COL...] to override the conflict columns.",
470 if_exists_name(if_exists)
471 )));
472 }
473 Ok(pk)
474}
475
476pub(crate) fn backend_needs_explicit_commit(backend: Backend) -> bool {
482 #[cfg(feature = "oracle")]
483 {
484 if matches!(backend, Backend::Oracle) {
485 return true;
486 }
487 }
488 let _ = backend;
489 false
490}
491
492#[allow(clippy::too_many_arguments)]
493fn run_copy(
494 src: &mut dyn Connection,
495 src_backend: Backend,
496 dst: &mut dyn Connection,
497 dst_backend: Backend,
498 opts: &CopyOptions,
499 source_sql: &str,
500 target_table: &str,
501 columns: &[ColumnInfo],
502 pk_columns: &[String],
503 target_exists: bool,
504 first_rows: Vec<Vec<Value>>,
505) -> Result<usize, SqlError> {
506 let quoted_table = quote_identifier(target_table, dst_backend);
507 let quoted_cols: Vec<String> = columns
508 .iter()
509 .map(|c| quote_identifier(&c.name, dst_backend))
510 .collect();
511 let cols_clause = quoted_cols.join(", ");
512
513 let need_inner_tx = target_exists && opts.if_exists == IfExists::Truncate && !opts.atomic;
518 let inner_tx_opened = if need_inner_tx {
519 begin_transaction(dst, dst_backend)
520 } else {
521 false
522 };
523
524 let prologue = run_truncate_and_first_batch(
526 dst,
527 dst_backend,
528 opts,
529 target_exists,
530 target_table,
531 columns,
532 pk_columns,
533 "ed_table,
534 &cols_clause,
535 &first_rows,
536 );
537
538 let first_len = match prologue {
539 Ok(n) => {
540 if inner_tx_opened {
541 commit_transaction(dst, dst_backend)?;
543 }
544 n
545 }
546 Err(e) => {
547 if inner_tx_opened {
548 let _ = rollback_transaction(dst, dst_backend);
549 }
550 return Err(e);
551 }
552 };
553
554 if first_len > 0
555 && let Some(cb) = &opts.progress
556 {
557 cb(first_len);
558 }
559
560 let mut total = first_len;
561 let mut offset = first_len;
562
563 if first_len >= opts.batch_size {
565 loop {
566 let paged = crate::query_builder::apply_paging(
567 source_sql,
568 Some(opts.batch_size),
569 Some(offset),
570 src_backend,
571 )?;
572 let page = src.query(&paged)?;
573 if page.rows.is_empty() {
574 break;
575 }
576 let fetched = page.rows.len();
577 insert_batch(
578 dst,
579 target_table,
580 columns,
581 pk_columns,
582 "ed_table,
583 &cols_clause,
584 &page.rows,
585 dst_backend,
586 opts.if_exists,
587 opts.bulk_mode,
588 opts.copy_format,
589 opts.verbose,
590 )?;
591 total += fetched;
592 offset += fetched;
593 if let Some(cb) = &opts.progress {
594 cb(total);
595 }
596 if fetched < opts.batch_size {
597 break;
598 }
599 }
600 }
601
602 Ok(total)
603}
604
605#[allow(clippy::too_many_arguments)]
606fn run_truncate_and_first_batch(
607 dst: &mut dyn Connection,
608 dst_backend: Backend,
609 opts: &CopyOptions,
610 target_exists: bool,
611 target_table: &str,
612 columns: &[ColumnInfo],
613 pk_columns: &[String],
614 quoted_table: &str,
615 cols_clause: &str,
616 first_rows: &[Vec<Value>],
617) -> Result<usize, SqlError> {
618 if target_exists && opts.if_exists == IfExists::Truncate {
619 let sql = format!("DELETE FROM {quoted_table}");
620 dst.execute(&sql)?;
621 }
622 if !first_rows.is_empty() {
623 insert_batch(
624 dst,
625 target_table,
626 columns,
627 pk_columns,
628 quoted_table,
629 cols_clause,
630 first_rows,
631 dst_backend,
632 opts.if_exists,
633 opts.bulk_mode,
634 opts.copy_format,
635 opts.verbose,
636 )?;
637 }
638 Ok(first_rows.len())
639}
640
641#[allow(clippy::too_many_arguments)]
647pub(crate) fn insert_batch(
648 dst: &mut dyn Connection,
649 target_table: &str,
650 columns: &[ColumnInfo],
651 pk_columns: &[String],
652 quoted_table: &str,
653 cols_clause: &str,
654 rows: &[Vec<Value>],
655 dst_backend: Backend,
656 if_exists: IfExists,
657 bulk_mode: BulkMode,
658 copy_format: CopyFormat,
659 verbose: bool,
660) -> Result<(), SqlError> {
661 if rows.is_empty() {
662 return Ok(());
663 }
664 let bulk_eligible =
668 matches!(bulk_mode, BulkMode::Auto | BulkMode::On) && !if_exists.resolves_conflicts();
669 if bulk_eligible {
670 let target = BulkInsert {
671 table: target_table,
672 columns,
673 rows,
674 copy_format,
675 };
676 match dst.bulk_insert_rows(target) {
677 Ok(_) => {
678 if verbose {
679 eprintln!(
680 "[ferrule] bulk: inserted {} rows via {} native path",
681 rows.len(),
682 dst_backend.name()
683 );
684 }
685 return Ok(());
686 }
687 Err(SqlError::BulkUnavailable(reason)) => {
688 if bulk_mode == BulkMode::On {
689 return Err(SqlError::QueryFailed(format!(
690 "--bulk-native=on but {} bulk path unavailable: {reason}. \
691 Re-run with --bulk-native=auto to fall back to generic INSERT, \
692 or --bulk-native=off to disable bulk entirely.",
693 dst_backend.name()
694 )));
695 }
696 eprintln!(
700 "[ferrule] bulk: {} path unavailable: {reason}; using generic INSERT",
701 dst_backend.name()
702 );
703 }
704 Err(other) => return Err(other),
705 }
706 }
707 for sql in build_insert_sql(
708 quoted_table,
709 cols_clause,
710 rows,
711 dst_backend,
712 columns,
713 if_exists,
714 pk_columns,
715 ) {
716 dst.execute(&sql)?;
717 }
718 Ok(())
719}
720
721pub(crate) fn build_insert_sql(
733 quoted_table: &str,
734 cols_clause: &str,
735 rows: &[Vec<Value>],
736 dst_backend: Backend,
737 columns: &[ColumnInfo],
738 if_exists: IfExists,
739 pk_columns: &[String],
740) -> Vec<String> {
741 if rows.is_empty() {
742 return Vec::new();
743 }
744 let chunk_size = per_statement_row_cap(dst_backend)
745 .unwrap_or(rows.len())
746 .max(1);
747 rows.chunks(chunk_size)
748 .map(|chunk| {
749 build_one_insert(
750 quoted_table,
751 cols_clause,
752 chunk,
753 dst_backend,
754 columns,
755 if_exists,
756 pk_columns,
757 )
758 })
759 .collect()
760}
761
762#[allow(clippy::too_many_arguments)]
763fn build_one_insert(
764 quoted_table: &str,
765 cols_clause: &str,
766 rows: &[Vec<Value>],
767 dst_backend: Backend,
768 columns: &[ColumnInfo],
769 if_exists: IfExists,
770 pk_columns: &[String],
771) -> String {
772 if if_exists.resolves_conflicts() && !pk_columns.is_empty() {
776 return build_conflict_insert(
777 quoted_table,
778 cols_clause,
779 rows,
780 dst_backend,
781 columns,
782 if_exists,
783 pk_columns,
784 );
785 }
786 match dst_backend {
787 #[cfg(feature = "oracle")]
788 Backend::Oracle => {
789 let mut sql = String::from("INSERT ALL");
790 for row in rows {
791 let cells: Vec<String> = row.iter().map(|v| render_value(v, dst_backend)).collect();
792 sql.push_str(&format!(
793 " INTO {quoted_table} ({cols_clause}) VALUES ({})",
794 cells.join(", ")
795 ));
796 }
797 sql.push_str(" SELECT 1 FROM DUAL");
798 sql
799 }
800 _ => {
801 let values: Vec<String> = rows
802 .iter()
803 .map(|row| {
804 let cells: Vec<String> =
805 row.iter().map(|v| render_value(v, dst_backend)).collect();
806 format!("({})", cells.join(", "))
807 })
808 .collect();
809 format!(
810 "INSERT INTO {quoted_table} ({cols_clause}) VALUES {}",
811 values.join(", ")
812 )
813 }
814 }
815}
816
817#[allow(clippy::too_many_arguments)]
826fn build_conflict_insert(
827 quoted_table: &str,
828 cols_clause: &str,
829 rows: &[Vec<Value>],
830 dst_backend: Backend,
831 columns: &[ColumnInfo],
832 if_exists: IfExists,
833 pk_columns: &[String],
834) -> String {
835 let quoted_pks: Vec<String> = pk_columns
838 .iter()
839 .map(|n| quote_identifier(n, dst_backend))
840 .collect();
841 let non_pk_quoted: Vec<String> = columns
843 .iter()
844 .filter(|c| !pk_columns.iter().any(|pk| pk == &c.name))
845 .map(|c| quote_identifier(&c.name, dst_backend))
846 .collect();
847 match dst_backend {
848 #[cfg(feature = "mssql")]
849 Backend::MsSql => build_mssql_merge(
850 quoted_table,
851 cols_clause,
852 rows,
853 columns,
854 if_exists,
855 "ed_pks,
856 &non_pk_quoted,
857 dst_backend,
858 ),
859 #[cfg(feature = "oracle")]
860 Backend::Oracle => build_oracle_merge(
861 quoted_table,
862 cols_clause,
863 rows,
864 columns,
865 if_exists,
866 "ed_pks,
867 &non_pk_quoted,
868 dst_backend,
869 ),
870 #[cfg(feature = "mysql")]
871 Backend::MySql => build_mysql_conflict(
872 quoted_table,
873 cols_clause,
874 rows,
875 if_exists,
876 &non_pk_quoted,
877 dst_backend,
878 ),
879 _ => build_pg_sqlite_on_conflict(
881 quoted_table,
882 cols_clause,
883 rows,
884 if_exists,
885 "ed_pks,
886 &non_pk_quoted,
887 dst_backend,
888 ),
889 }
890}
891
892fn build_pg_sqlite_on_conflict(
893 quoted_table: &str,
894 cols_clause: &str,
895 rows: &[Vec<Value>],
896 if_exists: IfExists,
897 quoted_pks: &[String],
898 non_pk_quoted: &[String],
899 dst_backend: Backend,
900) -> String {
901 let values = render_values_vec(rows, dst_backend);
902 let pk_list = quoted_pks.join(", ");
903 let conflict_clause = if if_exists == IfExists::Skip || non_pk_quoted.is_empty() {
904 format!("ON CONFLICT ({pk_list}) DO NOTHING")
907 } else {
908 let assignments: Vec<String> = non_pk_quoted
909 .iter()
910 .map(|c| format!("{c} = EXCLUDED.{c}"))
911 .collect();
912 format!(
913 "ON CONFLICT ({pk_list}) DO UPDATE SET {}",
914 assignments.join(", ")
915 )
916 };
917 format!(
918 "INSERT INTO {quoted_table} ({cols_clause}) VALUES {} {conflict_clause}",
919 values.join(", ")
920 )
921}
922
/// Renders the MySQL form of a conflict-aware insert.
///
/// An upsert with at least one non-key column becomes
/// `INSERT … ON DUPLICATE KEY UPDATE`; every other case (skip, or an upsert
/// with nothing to update) becomes `INSERT IGNORE`.
#[cfg(feature = "mysql")]
fn build_mysql_conflict(
    quoted_table: &str,
    cols_clause: &str,
    rows: &[Vec<Value>],
    if_exists: IfExists,
    non_pk_quoted: &[String],
    dst_backend: Backend,
) -> String {
    let tuples = render_values_vec(rows, dst_backend).join(", ");

    let updates_columns = if_exists == IfExists::Upsert && !non_pk_quoted.is_empty();
    if !updates_columns {
        return format!("INSERT IGNORE INTO {quoted_table} ({cols_clause}) VALUES {tuples}");
    }

    let mut assignments: Vec<String> = Vec::with_capacity(non_pk_quoted.len());
    for c in non_pk_quoted {
        assignments.push(format!("{c} = VALUES({c})"));
    }
    format!(
        "INSERT INTO {quoted_table} ({cols_clause}) VALUES {tuples} \
         ON DUPLICATE KEY UPDATE {}",
        assignments.join(", ")
    )
}
957
/// Renders a SQL Server `MERGE` statement for a skip or upsert batch.
///
/// The rows are supplied as a `VALUES` table aliased `src` whose column names
/// mirror the source columns. The `WHEN MATCHED` update is emitted only for
/// an upsert with at least one non-key column.
#[cfg(feature = "mssql")]
#[allow(clippy::too_many_arguments)]
fn build_mssql_merge(
    quoted_table: &str,
    cols_clause: &str,
    rows: &[Vec<Value>],
    columns: &[ColumnInfo],
    if_exists: IfExists,
    quoted_pks: &[String],
    non_pk_quoted: &[String],
    dst_backend: Backend,
) -> String {
    let alias_cols: Vec<String> = columns
        .iter()
        .map(|c| quote_identifier(&c.name, dst_backend))
        .collect();
    let alias_list = alias_cols.join(", ");
    let tuples = render_values_vec(rows, dst_backend).join(", ");

    let join_condition = quoted_pks
        .iter()
        .map(|pk| format!("dst.{pk} = src.{pk}"))
        .collect::<Vec<String>>()
        .join(" AND ");
    let src_refs = alias_cols
        .iter()
        .map(|c| format!("src.{c}"))
        .collect::<Vec<String>>()
        .join(", ");

    // Empty for skip, or when there is nothing besides the key to update.
    let when_matched = if if_exists == IfExists::Upsert && !non_pk_quoted.is_empty() {
        let assignments = non_pk_quoted
            .iter()
            .map(|c| format!("{c} = src.{c}"))
            .collect::<Vec<String>>()
            .join(", ");
        format!("WHEN MATCHED THEN UPDATE SET {assignments} ")
    } else {
        String::new()
    };

    format!(
        "MERGE INTO {quoted_table} AS dst \
         USING (VALUES {tuples}) AS src ({alias_list}) \
         ON {join_condition} \
         {when_matched}\
         WHEN NOT MATCHED THEN INSERT ({cols_clause}) VALUES ({src_refs});"
    )
}
1007
/// Renders an Oracle `MERGE` statement for a skip or upsert batch.
///
/// Each row becomes a `SELECT … FROM dual` with aliased columns, joined by
/// `UNION ALL` as the `src` sub-query. The `WHEN MATCHED` update is emitted
/// only for an upsert with at least one non-key column.
#[cfg(feature = "oracle")]
#[allow(clippy::too_many_arguments)]
fn build_oracle_merge(
    quoted_table: &str,
    cols_clause: &str,
    rows: &[Vec<Value>],
    columns: &[ColumnInfo],
    if_exists: IfExists,
    quoted_pks: &[String],
    non_pk_quoted: &[String],
    dst_backend: Backend,
) -> String {
    let alias_cols: Vec<String> = columns
        .iter()
        .map(|c| quote_identifier(&c.name, dst_backend))
        .collect();

    let mut row_selects: Vec<String> = Vec::with_capacity(rows.len());
    for row in rows {
        // `zip` pairs each value with its column alias, stopping at the
        // shorter of the two.
        let cells: Vec<String> = row
            .iter()
            .zip(alias_cols.iter())
            .map(|(v, alias)| format!("{} AS {alias}", render_value(v, dst_backend)))
            .collect();
        row_selects.push(format!("SELECT {} FROM dual", cells.join(", ")));
    }
    let src_query = row_selects.join(" UNION ALL ");

    let join_condition = quoted_pks
        .iter()
        .map(|pk| format!("dst.{pk} = src.{pk}"))
        .collect::<Vec<String>>()
        .join(" AND ");
    let src_refs = alias_cols
        .iter()
        .map(|c| format!("src.{c}"))
        .collect::<Vec<String>>()
        .join(", ");

    // Empty for skip, or when there is nothing besides the key to update.
    let when_matched = if if_exists == IfExists::Upsert && !non_pk_quoted.is_empty() {
        let assignments = non_pk_quoted
            .iter()
            .map(|c| format!("dst.{c} = src.{c}"))
            .collect::<Vec<String>>()
            .join(", ");
        format!("WHEN MATCHED THEN UPDATE SET {assignments} ")
    } else {
        String::new()
    };

    format!(
        "MERGE INTO {quoted_table} dst \
         USING ({src_query}) src \
         ON ({join_condition}) \
         {when_matched}\
         WHEN NOT MATCHED THEN INSERT ({cols_clause}) VALUES ({src_refs})"
    )
}
1068
1069fn render_values_vec(rows: &[Vec<Value>], dst_backend: Backend) -> Vec<String> {
1070 rows.iter()
1071 .map(|row| {
1072 let cells: Vec<String> = row.iter().map(|v| render_value(v, dst_backend)).collect();
1073 format!("({})", cells.join(", "))
1074 })
1075 .collect()
1076}
1077
1078fn if_exists_name(s: IfExists) -> &'static str {
1079 match s {
1080 IfExists::Error => "error",
1081 IfExists::Append => "append",
1082 IfExists::Truncate => "truncate",
1083 IfExists::Skip => "skip",
1084 IfExists::Upsert => "upsert",
1085 }
1086}
1087
1088fn per_statement_row_cap(backend: Backend) -> Option<usize> {
1089 match backend {
1090 #[cfg(feature = "mssql")]
1091 Backend::MsSql => Some(1000),
1092 #[cfg(feature = "oracle")]
1093 Backend::Oracle => Some(250),
1094 _ => None,
1095 }
1096}
1097
/// Quotes `id` as an identifier for `backend`.
///
/// MySQL uses backticks, doubling any embedded backtick. Every other backend
/// uses ANSI double quotes via `ansi_quote`. The match has one arm per
/// compiled-in backend and no catch-all.
pub fn quote_identifier(id: &str, backend: Backend) -> String {
    match backend {
        #[cfg(feature = "mysql")]
        Backend::MySql => format!("`{}`", id.replace('`', "``")),
        #[cfg(feature = "postgres")]
        Backend::Postgres => ansi_quote(id),
        #[cfg(feature = "sqlite")]
        Backend::Sqlite => ansi_quote(id),
        #[cfg(feature = "mssql")]
        Backend::MsSql => ansi_quote(id),
        #[cfg(feature = "oracle")]
        Backend::Oracle => ansi_quote(id),
    }
}
1121
/// Wraps `id` in double quotes, doubling every embedded double quote.
fn ansi_quote(id: &str) -> String {
    let mut quoted = String::with_capacity(id.len() + 2);
    quoted.push('"');
    for ch in id.chars() {
        if ch == '"' {
            // Escape by doubling.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
1125
1126pub fn translate_ddl(table: &str, cols: &[ColumnInfo], dst: Backend, pk: &[String]) -> String {
1133 let quoted_table = quote_identifier(table, dst);
1134 let col_defs: Vec<String> = cols
1135 .iter()
1136 .map(|c| {
1137 let name = quote_identifier(&c.name, dst);
1138 let ty = translate_type(c.type_hint, dst);
1139 let null_clause = if c.nullable { "" } else { " NOT NULL" };
1140 format!("{name} {ty}{null_clause}")
1141 })
1142 .collect();
1143 let pk_clause = if pk.is_empty() {
1144 String::new()
1145 } else {
1146 let quoted_pks: Vec<String> = pk.iter().map(|c| quote_identifier(c, dst)).collect();
1147 format!(", PRIMARY KEY ({})", quoted_pks.join(", "))
1148 };
1149 format!(
1150 "CREATE TABLE IF NOT EXISTS {quoted_table} ({}{pk_clause})",
1151 col_defs.join(", ")
1152 )
1153}
1154
/// Maps a backend-neutral `TypeHint` to the column type name used in DDL for
/// the `dst` backend.
///
/// Hints without a dedicated type on a backend fall back to that backend's
/// general text type (`TEXT`, `NVARCHAR(MAX)` or `CLOB`).
pub fn translate_type(hint: TypeHint, dst: Backend) -> &'static str {
    match dst {
        #[cfg(feature = "postgres")]
        Backend::Postgres => match hint {
            TypeHint::Bool => "BOOLEAN",
            TypeHint::Int64 => "BIGINT",
            TypeHint::Float64 => "DOUBLE PRECISION",
            TypeHint::Decimal => "NUMERIC",
            TypeHint::Bytes => "BYTEA",
            TypeHint::Date => "DATE",
            TypeHint::Time => "TIME",
            TypeHint::DateTime => "TIMESTAMP",
            TypeHint::DateTimeTz => "TIMESTAMPTZ",
            TypeHint::Json | TypeHint::Array => "JSONB",
            TypeHint::Uuid => "UUID",
            TypeHint::String | TypeHint::Other | TypeHint::Null => "TEXT",
        },
        #[cfg(feature = "mysql")]
        Backend::MySql => match hint {
            TypeHint::Bool => "TINYINT(1)",
            TypeHint::Int64 => "BIGINT",
            TypeHint::Float64 => "DOUBLE",
            TypeHint::Decimal => "DECIMAL(38,10)",
            TypeHint::Bytes => "LONGBLOB",
            TypeHint::Date => "DATE",
            TypeHint::Time => "TIME",
            // Both timestamp hints map to the same type here.
            TypeHint::DateTime | TypeHint::DateTimeTz => "DATETIME",
            TypeHint::Json | TypeHint::Array => "JSON",
            TypeHint::Uuid => "CHAR(36)",
            TypeHint::String | TypeHint::Other | TypeHint::Null => "TEXT",
        },
        #[cfg(feature = "mssql")]
        Backend::MsSql => match hint {
            TypeHint::Bool => "BIT",
            TypeHint::Int64 => "BIGINT",
            TypeHint::Float64 => "FLOAT",
            TypeHint::Decimal => "DECIMAL(38,10)",
            TypeHint::Bytes => "VARBINARY(MAX)",
            TypeHint::Date => "DATE",
            TypeHint::Time => "TIME",
            TypeHint::DateTime => "DATETIME2",
            TypeHint::DateTimeTz => "DATETIMEOFFSET",
            TypeHint::Json
            | TypeHint::Array
            | TypeHint::String
            | TypeHint::Other
            | TypeHint::Null => "NVARCHAR(MAX)",
            TypeHint::Uuid => "UNIQUEIDENTIFIER",
        },
        #[cfg(feature = "sqlite")]
        Backend::Sqlite => match hint {
            TypeHint::Bool | TypeHint::Int64 => "INTEGER",
            TypeHint::Float64 => "REAL",
            TypeHint::Decimal => "NUMERIC",
            TypeHint::Bytes => "BLOB",
            // Every remaining hint (dates, JSON, UUID, strings, …) is TEXT.
            _ => "TEXT",
        },
        #[cfg(feature = "oracle")]
        Backend::Oracle => match hint {
            TypeHint::Bool => "NUMBER(1)",
            TypeHint::Int64 => "NUMBER(19)",
            TypeHint::Float64 => "BINARY_DOUBLE",
            TypeHint::Decimal => "NUMBER",
            TypeHint::Bytes => "BLOB",
            TypeHint::Date => "DATE",
            TypeHint::Time | TypeHint::DateTime => "TIMESTAMP",
            TypeHint::DateTimeTz => "TIMESTAMP WITH TIME ZONE",
            TypeHint::Json
            | TypeHint::Array
            | TypeHint::String
            | TypeHint::Other
            | TypeHint::Null => "CLOB",
            TypeHint::Uuid => "RAW(16)",
        },
    }
}
1237
1238fn table_exists(conn: &mut dyn Connection, table: &str) -> Result<bool, SqlError> {
1239 let tables = conn.list_tables(None)?;
1240 Ok(tables.iter().any(|t| t.eq_ignore_ascii_case(table)))
1241}
1242
1243fn table_has_rows(
1244 conn: &mut dyn Connection,
1245 table: &str,
1246 backend: Backend,
1247) -> Result<bool, SqlError> {
1248 let qident = quote_identifier(table, backend);
1249 let sql = crate::query_builder::apply_paging(
1250 &format!("SELECT 1 FROM {qident}"),
1251 Some(1),
1252 None,
1253 backend,
1254 )?;
1255 let result = conn.query(&sql)?;
1256 Ok(!result.rows.is_empty())
1257}
1258
/// Options for copying every matching table with `copy_all_tables`.
///
/// Apart from the table filters and `no_fk_check`, each field is forwarded
/// unchanged into the per-table `CopyOptions`.
pub struct AllTablesOptions {
    /// Glob patterns (`*`, `?`) a table must match to be copied; an empty
    /// list admits every table.
    pub include: Vec<String>,
    /// Glob patterns that remove a table from the selection.
    pub exclude: Vec<String>,
    /// Strategy used when a destination table already exists.
    pub if_exists: IfExists,
    /// Per-table atomic flag.
    pub atomic: bool,
    /// Rows fetched from the source per page.
    pub batch_size: usize,
    /// Whether to use the destination's native bulk-insert path.
    pub bulk_mode: BulkMode,
    /// Format forwarded to the native bulk-insert path.
    pub copy_format: CopyFormat,
    /// Emit per-table progress lines on stderr.
    pub verbose: bool,
    /// Create missing destination tables from the source schema.
    pub create_table: bool,
    /// Carry source primary keys over when creating tables.
    pub preserve_pk: bool,
    /// Conflict columns applied to every table for skip/upsert.
    pub conflict_key: Vec<String>,
    /// On a foreign-key cycle, copy in discovery order instead of failing.
    pub no_fk_check: bool,
}
1297
1298impl Default for AllTablesOptions {
1299 fn default() -> Self {
1300 Self {
1301 include: Vec::new(),
1302 exclude: Vec::new(),
1303 if_exists: IfExists::Error,
1304 atomic: false,
1305 batch_size: 1000,
1306 bulk_mode: BulkMode::Off,
1307 copy_format: CopyFormat::Text,
1308 verbose: false,
1309 create_table: false,
1310 preserve_pk: false,
1311 conflict_key: Vec::new(),
1312 no_fk_check: false,
1313 }
1314 }
1315}
1316
/// Matches `name` against a shell-style glob `pattern`.
///
/// `*` matches any run of characters (including none), `?` matches exactly
/// one character, and every other character matches itself. Matching is
/// case-sensitive and must cover the whole name. There is no escape syntax.
///
/// Matching works on characters rather than bytes, so `?` consumes one full
/// (possibly multi-byte) character. The algorithm is iterative with a single
/// backtrack point, so the worst case is O(pattern × name) instead of the
/// exponential blow-up of naive recursion on patterns with many `*`.
pub(crate) fn matches_glob(pattern: &str, name: &str) -> bool {
    let pat: Vec<char> = pattern.chars().collect();
    let text: Vec<char> = name.chars().collect();

    let mut pi = 0usize;
    let mut ti = 0usize;
    // Most recent `*`: (pattern index just after it, text index where the
    // star's match currently ends). Only the latest star ever needs
    // revisiting.
    let mut last_star: Option<(usize, usize)> = None;

    while ti < text.len() {
        match pat.get(pi).copied() {
            Some('*') => {
                // Start by letting the star match nothing.
                last_star = Some((pi + 1, ti));
                pi += 1;
            }
            Some(pc) if pc == '?' || pc == text[ti] => {
                pi += 1;
                ti += 1;
            }
            _ => match last_star {
                // Mismatch: let the latest star swallow one more character.
                Some((after_star, star_end)) => {
                    pi = after_star;
                    ti = star_end + 1;
                    last_star = Some((after_star, star_end + 1));
                }
                None => return false,
            },
        }
    }

    // The name is consumed; only trailing stars may remain in the pattern.
    pat[pi..].iter().all(|&c| c == '*')
}
1342
1343pub fn discover_tables(
1348 src: &mut dyn Connection,
1349 schema: Option<&str>,
1350 include: &[String],
1351 exclude: &[String],
1352) -> Result<Vec<String>, SqlError> {
1353 let raw = src.list_tables(schema)?;
1354 Ok(raw
1355 .into_iter()
1356 .filter(|t| {
1357 if !include.is_empty() && !include.iter().any(|p| matches_glob(p, t)) {
1358 return false;
1359 }
1360 if exclude.iter().any(|p| matches_glob(p, t)) {
1361 return false;
1362 }
1363 true
1364 })
1365 .collect())
1366}
1367
/// Error returned by `topo_sort` when foreign keys form a cycle and no
/// complete load order exists.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CycleError {
    /// Tables that could not be ordered, sorted by name. This includes the
    /// tables on the cycle and any tables that depend on them.
    pub remaining: Vec<String>,
}
1376
1377pub fn topo_sort(tables: &[String], fks: &[ForeignKey]) -> Result<Vec<String>, CycleError> {
1390 use std::collections::{BTreeSet, HashMap, HashSet};
1391
1392 let present: HashSet<String> = tables.iter().cloned().collect();
1393 let order_index: HashMap<String, usize> = tables
1394 .iter()
1395 .enumerate()
1396 .map(|(i, t)| (t.clone(), i))
1397 .collect();
1398 let mut parents: HashMap<String, BTreeSet<String>> = HashMap::new();
1400 let mut children: HashMap<String, Vec<String>> = HashMap::new();
1401 for t in tables {
1402 parents.entry(t.clone()).or_default();
1403 }
1404 for fk in fks {
1405 if !present.contains(&fk.child_table) || !present.contains(&fk.parent_table) {
1406 continue;
1407 }
1408 if fk.child_table == fk.parent_table {
1409 continue;
1411 }
1412 if parents
1413 .get_mut(&fk.child_table)
1414 .unwrap()
1415 .insert(fk.parent_table.clone())
1416 {
1417 children
1418 .entry(fk.parent_table.clone())
1419 .or_default()
1420 .push(fk.child_table.clone());
1421 }
1422 }
1423
1424 let mut ready: Vec<String> = tables
1427 .iter()
1428 .filter(|t| parents.get(*t).is_some_and(|p| p.is_empty()))
1429 .cloned()
1430 .collect();
1431 let mut out: Vec<String> = Vec::with_capacity(tables.len());
1432 let mut emitted: HashSet<String> = HashSet::new();
1433 while !ready.is_empty() {
1434 let t = ready.remove(0);
1435 out.push(t.clone());
1436 emitted.insert(t.clone());
1437 if let Some(kids) = children.get(&t).cloned() {
1438 for kid in kids {
1439 if let Some(ps) = parents.get_mut(&kid) {
1440 ps.remove(&t);
1441 if ps.is_empty() && !emitted.contains(&kid) && !ready.contains(&kid) {
1442 let kid_idx = *order_index.get(&kid).unwrap_or(&usize::MAX);
1443 let insert_at = ready
1444 .iter()
1445 .position(|r| *order_index.get(r).unwrap_or(&usize::MAX) > kid_idx)
1446 .unwrap_or(ready.len());
1447 ready.insert(insert_at, kid);
1448 }
1449 }
1450 }
1451 }
1452 }
1453
1454 if out.len() == tables.len() {
1455 Ok(out)
1456 } else {
1457 let mut remaining: Vec<String> = tables
1458 .iter()
1459 .filter(|t| !emitted.contains(*t))
1460 .cloned()
1461 .collect();
1462 remaining.sort();
1463 Err(CycleError { remaining })
1464 }
1465}
1466
1467#[allow(clippy::too_many_arguments)]
1477pub fn copy_all_tables(
1478 src: &mut dyn Connection,
1479 src_backend: Backend,
1480 dst: &mut dyn Connection,
1481 dst_backend: Backend,
1482 opts: &AllTablesOptions,
1483) -> Result<usize, SqlError> {
1484 let tables = discover_tables(src, None, &opts.include, &opts.exclude)?;
1485 if tables.is_empty() {
1486 return Err(SqlError::QueryFailed(
1487 "copy --all-tables: no tables matched the include/exclude filters.".into(),
1488 ));
1489 }
1490
1491 let fks = src.list_foreign_keys(None)?;
1492 let ordered = match topo_sort(&tables, &fks) {
1493 Ok(o) => o,
1494 Err(cycle) if opts.no_fk_check => {
1495 eprintln!(
1496 "[ferrule] copy: FK cycle detected among {:?}; \
1497 --no-fk-check is set, copying in discovery order.",
1498 cycle.remaining
1499 );
1500 tables.clone()
1501 }
1502 Err(cycle) => {
1503 return Err(SqlError::QueryFailed(format!(
1504 "copy --all-tables: foreign-key cycle prevents a strict load order \
1505 (tables on the cycle: {:?}). Re-run with --no-fk-check to copy in \
1506 discovery order; FK violations may then surface as driver errors.",
1507 cycle.remaining
1508 )));
1509 }
1510 };
1511
1512 let total_tables = ordered.len();
1513 let mut total_rows = 0usize;
1514 for (idx, table) in ordered.iter().enumerate() {
1515 if opts.verbose {
1516 eprintln!("[ferrule] [{}/{total_tables}] copying {table}…", idx + 1);
1517 }
1518 let per_table = CopyOptions {
1519 source: CopySource::Table(table.clone()),
1520 create_table: opts.create_table,
1521 preserve_pk: opts.preserve_pk,
1522 if_exists: opts.if_exists,
1523 conflict_key: opts.conflict_key.clone(),
1524 atomic: opts.atomic,
1525 batch_size: opts.batch_size,
1526 bulk_mode: opts.bulk_mode,
1527 copy_format: opts.copy_format,
1528 verbose: opts.verbose,
1529 progress: None,
1530 };
1531 let n = copy_rows(src, src_backend, dst, dst_backend, &per_table)?;
1532 if opts.verbose {
1533 eprintln!("[ferrule] [{}/{total_tables}] {table}: {n} rows", idx + 1);
1534 }
1535 total_rows += n;
1536 }
1537 Ok(total_rows)
1538}
1539
1540#[cfg(test)]
1541mod tests {
1542 use super::*;
1543 use crate::value::ColumnInfo;
1544
1545 fn col(name: &str, hint: TypeHint, nullable: bool) -> ColumnInfo {
1546 ColumnInfo {
1547 name: name.to_string(),
1548 type_hint: hint,
1549 nullable,
1550 }
1551 }
1552
1553 #[test]
1554 fn if_exists_parse_recognises_strategies() {
1555 assert_eq!(IfExists::parse("error"), Some(IfExists::Error));
1556 assert_eq!(IfExists::parse("APPEND"), Some(IfExists::Append));
1557 assert_eq!(IfExists::parse("Truncate"), Some(IfExists::Truncate));
1558 assert_eq!(IfExists::parse("skip"), Some(IfExists::Skip));
1559 assert_eq!(IfExists::parse("UPSERT"), Some(IfExists::Upsert));
1560 assert_eq!(IfExists::parse("merge"), None);
1561 }
1562
1563 #[test]
1564 fn if_exists_resolves_conflicts_only_for_skip_and_upsert() {
1565 assert!(!IfExists::Error.resolves_conflicts());
1566 assert!(!IfExists::Append.resolves_conflicts());
1567 assert!(!IfExists::Truncate.resolves_conflicts());
1568 assert!(IfExists::Skip.resolves_conflicts());
1569 assert!(IfExists::Upsert.resolves_conflicts());
1570 }
1571
1572 #[test]
1573 fn if_exists_default_is_non_destructive() {
1574 assert_eq!(IfExists::default(), IfExists::Error);
1575 }
1576
1577 #[test]
1578 fn bulk_mode_parse_recognises_modes() {
1579 assert_eq!(BulkMode::parse("off"), Some(BulkMode::Off));
1580 assert_eq!(BulkMode::parse("Auto"), Some(BulkMode::Auto));
1581 assert_eq!(BulkMode::parse("ON"), Some(BulkMode::On));
1582 assert_eq!(BulkMode::parse("native"), None);
1583 }
1584
1585 #[test]
1586 fn bulk_mode_default_is_off() {
1587 assert_eq!(BulkMode::default(), BulkMode::Off);
1588 }
1589
1590 #[cfg(feature = "sqlite")]
1591 #[test]
1592 fn quote_identifier_sqlite_uses_ansi_quotes() {
1593 assert_eq!(quote_identifier("users", Backend::Sqlite), "\"users\"");
1594 assert_eq!(quote_identifier("a\"b", Backend::Sqlite), "\"a\"\"b\"");
1595 }
1596
1597 #[cfg(feature = "mysql")]
1598 #[test]
1599 fn quote_identifier_mysql_uses_backticks() {
1600 assert_eq!(quote_identifier("users", Backend::MySql), "`users`");
1601 assert_eq!(quote_identifier("a`b", Backend::MySql), "`a``b`");
1602 }
1603
1604 #[cfg(feature = "sqlite")]
1611 #[test]
1612 fn quote_identifier_wraps_in_double_quotes() {
1613 assert_eq!(quote_identifier("users", Backend::Sqlite), "\"users\"");
1614 }
1615
1616 #[cfg(feature = "sqlite")]
1617 #[test]
1618 fn quote_identifier_escapes_embedded_quotes() {
1619 assert_eq!(quote_identifier("a\"b", Backend::Sqlite), "\"a\"\"b\"");
1620 assert_eq!(quote_identifier("\"\"", Backend::Sqlite), "\"\"\"\"\"\"");
1621 }
1622
1623 #[cfg(feature = "sqlite")]
1624 #[test]
1625 fn quote_identifier_preserves_other_chars() {
1626 assert_eq!(
1627 quote_identifier("col with space", Backend::Sqlite),
1628 "\"col with space\""
1629 );
1630 assert_eq!(
1631 quote_identifier("snake_case_99", Backend::Sqlite),
1632 "\"snake_case_99\""
1633 );
1634 }
1635
1636 #[cfg(feature = "postgres")]
1637 #[test]
1638 fn translate_type_postgres_maps_decimal_to_numeric() {
1639 assert_eq!(
1640 translate_type(TypeHint::Decimal, Backend::Postgres),
1641 "NUMERIC"
1642 );
1643 assert_eq!(
1644 translate_type(TypeHint::DateTimeTz, Backend::Postgres),
1645 "TIMESTAMPTZ"
1646 );
1647 assert_eq!(translate_type(TypeHint::Json, Backend::Postgres), "JSONB");
1648 }
1649
1650 #[cfg(feature = "sqlite")]
1651 #[test]
1652 fn translate_type_sqlite_collapses_to_storage_classes() {
1653 assert_eq!(translate_type(TypeHint::Bool, Backend::Sqlite), "INTEGER");
1654 assert_eq!(translate_type(TypeHint::Int64, Backend::Sqlite), "INTEGER");
1655 assert_eq!(translate_type(TypeHint::Float64, Backend::Sqlite), "REAL");
1656 assert_eq!(translate_type(TypeHint::Bytes, Backend::Sqlite), "BLOB");
1657 assert_eq!(translate_type(TypeHint::DateTime, Backend::Sqlite), "TEXT");
1658 assert_eq!(translate_type(TypeHint::Json, Backend::Sqlite), "TEXT");
1659 }
1660
1661 #[cfg(feature = "mssql")]
1662 #[test]
1663 fn translate_type_mssql_maps_string_to_nvarchar_max() {
1664 assert_eq!(
1665 translate_type(TypeHint::String, Backend::MsSql),
1666 "NVARCHAR(MAX)"
1667 );
1668 assert_eq!(
1669 translate_type(TypeHint::Uuid, Backend::MsSql),
1670 "UNIQUEIDENTIFIER"
1671 );
1672 assert_eq!(translate_type(TypeHint::Bool, Backend::MsSql), "BIT");
1673 }
1674
1675 #[cfg(all(feature = "postgres", feature = "sqlite"))]
1676 #[test]
1677 fn translate_ddl_postgres_to_sqlite() {
1678 let cols = vec![
1679 col("id", TypeHint::Int64, false),
1680 col("name", TypeHint::String, true),
1681 col("score", TypeHint::Float64, true),
1682 col("active", TypeHint::Bool, true),
1683 col("meta", TypeHint::Json, true),
1684 ];
1685 let ddl = translate_ddl("test_users", &cols, Backend::Sqlite, &[]);
1686 assert_eq!(
1687 ddl,
1688 "CREATE TABLE IF NOT EXISTS \"test_users\" (\
1689 \"id\" INTEGER NOT NULL, \
1690 \"name\" TEXT, \
1691 \"score\" REAL, \
1692 \"active\" INTEGER, \
1693 \"meta\" TEXT)"
1694 );
1695 }
1696
1697 #[cfg(feature = "sqlite")]
1701 #[test]
1702 fn translate_ddl_with_preserve_pk_emits_primary_key_clause() {
1703 let cols = vec![
1704 col("id", TypeHint::Int64, false),
1705 col("name", TypeHint::String, true),
1706 ];
1707 let ddl = translate_ddl("users", &cols, Backend::Sqlite, &["id".to_string()]);
1708 assert_eq!(
1709 ddl,
1710 "CREATE TABLE IF NOT EXISTS \"users\" (\
1711 \"id\" INTEGER NOT NULL, \
1712 \"name\" TEXT, PRIMARY KEY (\"id\"))"
1713 );
1714 }
1715
1716 #[cfg(feature = "sqlite")]
1719 #[test]
1720 fn translate_ddl_with_preserve_pk_emits_composite_primary_key() {
1721 let cols = vec![
1722 col("tenant", TypeHint::Int64, false),
1723 col("id", TypeHint::Int64, false),
1724 col("name", TypeHint::String, true),
1725 ];
1726 let ddl = translate_ddl(
1727 "users",
1728 &cols,
1729 Backend::Sqlite,
1730 &["tenant".to_string(), "id".to_string()],
1731 );
1732 assert!(
1733 ddl.ends_with("PRIMARY KEY (\"tenant\", \"id\"))"),
1734 "got: {ddl}"
1735 );
1736 }
1737
1738 #[cfg(all(feature = "mysql", feature = "mssql"))]
1739 #[test]
1740 fn translate_ddl_mysql_to_mssql_uses_correct_quoting_and_types() {
1741 let cols = vec![
1742 col("id", TypeHint::Int64, false),
1743 col("uid", TypeHint::Uuid, true),
1744 col("created_at", TypeHint::DateTimeTz, true),
1745 ];
1746 let ddl = translate_ddl("orders", &cols, Backend::MsSql, &[]);
1747 assert_eq!(
1748 ddl,
1749 "CREATE TABLE IF NOT EXISTS \"orders\" (\
1750 \"id\" BIGINT NOT NULL, \
1751 \"uid\" UNIQUEIDENTIFIER, \
1752 \"created_at\" DATETIMEOFFSET)"
1753 );
1754 }
1755
1756 fn row_int(n: i64) -> Vec<Value> {
1757 vec![Value::Int64(n)]
1758 }
1759
1760 fn cols_id_only() -> Vec<ColumnInfo> {
1763 vec![ColumnInfo {
1764 name: "id".to_string(),
1765 type_hint: TypeHint::Int64,
1766 nullable: false,
1767 }]
1768 }
1769
1770 #[test]
1771 fn build_insert_sql_empty_rows_returns_empty() {
1772 let cols = cols_id_only();
1773 let out = build_insert_sql(
1774 "\"t\"",
1775 "\"id\"",
1776 &[],
1777 default_backend_for_test(),
1778 &cols,
1779 IfExists::Append,
1780 &[],
1781 );
1782 assert!(out.is_empty());
1783 }
1784
1785 #[cfg(feature = "sqlite")]
1786 #[test]
1787 fn build_insert_sql_sqlite_emits_single_multi_row_insert() {
1788 let cols = cols_id_only();
1789 let rows = vec![row_int(1), row_int(2), row_int(3)];
1790 let out = build_insert_sql(
1791 "\"t\"",
1792 "\"id\"",
1793 &rows,
1794 Backend::Sqlite,
1795 &cols,
1796 IfExists::Append,
1797 &[],
1798 );
1799 assert_eq!(out.len(), 1);
1800 assert_eq!(out[0], "INSERT INTO \"t\" (\"id\") VALUES (1), (2), (3)");
1801 }
1802
1803 #[cfg(feature = "oracle")]
1804 #[test]
1805 fn build_insert_sql_oracle_emits_insert_all_with_select_from_dual() {
1806 let cols = cols_id_only();
1807 let rows = vec![row_int(1), row_int(2), row_int(3)];
1808 let out = build_insert_sql(
1809 "\"t\"",
1810 "\"id\"",
1811 &rows,
1812 Backend::Oracle,
1813 &cols,
1814 IfExists::Append,
1815 &[],
1816 );
1817 assert_eq!(out.len(), 1);
1818 assert_eq!(
1819 out[0],
1820 "INSERT ALL\
1821 \u{0020}INTO \"t\" (\"id\") VALUES (1)\
1822 \u{0020}INTO \"t\" (\"id\") VALUES (2)\
1823 \u{0020}INTO \"t\" (\"id\") VALUES (3)\
1824 \u{0020}SELECT 1 FROM DUAL"
1825 );
1826 }
1827
1828 #[cfg(feature = "mssql")]
1829 #[test]
1830 fn build_insert_sql_mssql_splits_above_1000_rows() {
1831 let cols = cols_id_only();
1832 let rows: Vec<Vec<Value>> = (0..2500).map(|i| row_int(i as i64)).collect();
1833 let out = build_insert_sql(
1834 "\"t\"",
1835 "\"id\"",
1836 &rows,
1837 Backend::MsSql,
1838 &cols,
1839 IfExists::Append,
1840 &[],
1841 );
1842 assert_eq!(out.len(), 3);
1844 for sql in &out {
1846 assert!(sql.starts_with("INSERT INTO \"t\" (\"id\") VALUES "));
1847 }
1848 assert_eq!(out[0].matches("), (").count(), 999);
1851 assert_eq!(out[1].matches("), (").count(), 999);
1852 assert_eq!(out[2].matches("), (").count(), 499);
1853 }
1854
1855 #[cfg(feature = "oracle")]
1856 #[test]
1857 fn build_insert_sql_oracle_chunks_at_250_rows() {
1858 let cols = cols_id_only();
1859 let rows: Vec<Vec<Value>> = (0..600).map(|i| row_int(i as i64)).collect();
1860 let out = build_insert_sql(
1861 "\"t\"",
1862 "\"id\"",
1863 &rows,
1864 Backend::Oracle,
1865 &cols,
1866 IfExists::Append,
1867 &[],
1868 );
1869 assert_eq!(out.len(), 3);
1871 for sql in &out {
1872 assert!(sql.starts_with("INSERT ALL"));
1873 assert!(sql.ends_with(" SELECT 1 FROM DUAL"));
1874 }
1875 assert_eq!(out[0].matches(" INTO ").count(), 250);
1877 assert_eq!(out[1].matches(" INTO ").count(), 250);
1878 assert_eq!(out[2].matches(" INTO ").count(), 100);
1879 }
1880
1881 fn cols_id_name() -> Vec<ColumnInfo> {
1885 vec![
1886 ColumnInfo {
1887 name: "id".to_string(),
1888 type_hint: TypeHint::Int64,
1889 nullable: false,
1890 },
1891 ColumnInfo {
1892 name: "name".to_string(),
1893 type_hint: TypeHint::String,
1894 nullable: true,
1895 },
1896 ]
1897 }
1898
1899 fn row_id_name(id: i64, name: &str) -> Vec<Value> {
1900 vec![Value::Int64(id), Value::String(name.to_string())]
1901 }
1902
1903 #[cfg(feature = "sqlite")]
1904 #[test]
1905 fn build_insert_sql_pg_skip_emits_on_conflict_do_nothing() {
1906 let cols = cols_id_name();
1907 let rows = vec![row_id_name(1, "a"), row_id_name(2, "b")];
1908 let pk = vec!["id".to_string()];
1909 let out = build_insert_sql(
1912 "\"t\"",
1913 "\"id\", \"name\"",
1914 &rows,
1915 Backend::Sqlite,
1916 &cols,
1917 IfExists::Skip,
1918 &pk,
1919 );
1920 assert_eq!(out.len(), 1);
1921 assert_eq!(
1922 out[0],
1923 "INSERT INTO \"t\" (\"id\", \"name\") VALUES (1, 'a'), (2, 'b') \
1924 ON CONFLICT (\"id\") DO NOTHING"
1925 );
1926 }
1927
1928 #[cfg(feature = "sqlite")]
1929 #[test]
1930 fn build_insert_sql_pg_upsert_emits_excluded_assignments() {
1931 let cols = cols_id_name();
1932 let rows = vec![row_id_name(1, "a"), row_id_name(2, "b")];
1933 let pk = vec!["id".to_string()];
1934 let out = build_insert_sql(
1935 "\"t\"",
1936 "\"id\", \"name\"",
1937 &rows,
1938 Backend::Sqlite,
1939 &cols,
1940 IfExists::Upsert,
1941 &pk,
1942 );
1943 assert_eq!(out.len(), 1);
1944 assert_eq!(
1945 out[0],
1946 "INSERT INTO \"t\" (\"id\", \"name\") VALUES (1, 'a'), (2, 'b') \
1947 ON CONFLICT (\"id\") DO UPDATE SET \"name\" = EXCLUDED.\"name\""
1948 );
1949 }
1950
1951 #[cfg(feature = "sqlite")]
1952 #[test]
1953 fn build_insert_sql_pg_upsert_pk_only_table_collapses_to_do_nothing() {
1954 let cols = vec![ColumnInfo {
1957 name: "id".to_string(),
1958 type_hint: TypeHint::Int64,
1959 nullable: false,
1960 }];
1961 let rows = vec![row_int(1), row_int(2)];
1962 let pk = vec!["id".to_string()];
1963 let out = build_insert_sql(
1964 "\"t\"",
1965 "\"id\"",
1966 &rows,
1967 Backend::Sqlite,
1968 &cols,
1969 IfExists::Upsert,
1970 &pk,
1971 );
1972 assert_eq!(out.len(), 1);
1973 assert!(out[0].ends_with("ON CONFLICT (\"id\") DO NOTHING"));
1974 }
1975
1976 #[cfg(feature = "sqlite")]
1977 #[test]
1978 fn build_insert_sql_pg_upsert_composite_pk_emits_full_pk_list() {
1979 let cols = vec![
1980 ColumnInfo {
1981 name: "a".to_string(),
1982 type_hint: TypeHint::Int64,
1983 nullable: false,
1984 },
1985 ColumnInfo {
1986 name: "b".to_string(),
1987 type_hint: TypeHint::Int64,
1988 nullable: false,
1989 },
1990 ColumnInfo {
1991 name: "v".to_string(),
1992 type_hint: TypeHint::String,
1993 nullable: true,
1994 },
1995 ];
1996 let rows = vec![vec![
1997 Value::Int64(1),
1998 Value::Int64(2),
1999 Value::String("x".into()),
2000 ]];
2001 let pk = vec!["a".to_string(), "b".to_string()];
2002 let out = build_insert_sql(
2003 "\"t\"",
2004 "\"a\", \"b\", \"v\"",
2005 &rows,
2006 Backend::Sqlite,
2007 &cols,
2008 IfExists::Upsert,
2009 &pk,
2010 );
2011 assert_eq!(out.len(), 1);
2012 assert!(out[0].contains("ON CONFLICT (\"a\", \"b\") DO UPDATE SET \"v\" = EXCLUDED.\"v\""));
2013 assert!(!out[0].contains("\"a\" = EXCLUDED.\"a\""));
2015 assert!(!out[0].contains("\"b\" = EXCLUDED.\"b\""));
2016 }
2017
2018 #[cfg(feature = "mysql")]
2019 #[test]
2020 fn build_insert_sql_mysql_skip_emits_insert_ignore() {
2021 let cols = cols_id_name();
2022 let rows = vec![row_id_name(1, "a")];
2023 let pk = vec!["id".to_string()];
2024 let out = build_insert_sql(
2025 "`t`",
2026 "`id`, `name`",
2027 &rows,
2028 Backend::MySql,
2029 &cols,
2030 IfExists::Skip,
2031 &pk,
2032 );
2033 assert_eq!(out.len(), 1);
2034 assert!(out[0].starts_with("INSERT IGNORE INTO `t`"));
2035 }
2036
2037 #[cfg(feature = "mysql")]
2038 #[test]
2039 fn build_insert_sql_mysql_upsert_emits_on_duplicate_key_update() {
2040 let cols = cols_id_name();
2041 let rows = vec![row_id_name(1, "a")];
2042 let pk = vec!["id".to_string()];
2043 let out = build_insert_sql(
2044 "`t`",
2045 "`id`, `name`",
2046 &rows,
2047 Backend::MySql,
2048 &cols,
2049 IfExists::Upsert,
2050 &pk,
2051 );
2052 assert_eq!(out.len(), 1);
2053 assert!(out[0].contains("ON DUPLICATE KEY UPDATE `name` = VALUES(`name`)"));
2054 }
2055
2056 #[cfg(feature = "mssql")]
2057 #[test]
2058 fn build_insert_sql_mssql_skip_emits_merge_when_not_matched() {
2059 let cols = cols_id_name();
2060 let rows = vec![row_id_name(1, "a")];
2061 let pk = vec!["id".to_string()];
2062 let out = build_insert_sql(
2063 "\"t\"",
2064 "\"id\", \"name\"",
2065 &rows,
2066 Backend::MsSql,
2067 &cols,
2068 IfExists::Skip,
2069 &pk,
2070 );
2071 assert_eq!(out.len(), 1);
2072 let sql = &out[0];
2073 assert!(sql.starts_with("MERGE INTO \"t\" AS dst USING (VALUES "));
2074 assert!(sql.contains("ON dst.\"id\" = src.\"id\""));
2075 assert!(sql.contains("WHEN NOT MATCHED THEN INSERT"));
2076 assert!(!sql.contains("WHEN MATCHED"));
2078 assert!(sql.ends_with(';'));
2079 }
2080
2081 #[cfg(feature = "mssql")]
2082 #[test]
2083 fn build_insert_sql_mssql_upsert_emits_full_merge() {
2084 let cols = cols_id_name();
2085 let rows = vec![row_id_name(1, "a")];
2086 let pk = vec!["id".to_string()];
2087 let out = build_insert_sql(
2088 "\"t\"",
2089 "\"id\", \"name\"",
2090 &rows,
2091 Backend::MsSql,
2092 &cols,
2093 IfExists::Upsert,
2094 &pk,
2095 );
2096 assert_eq!(out.len(), 1);
2097 let sql = &out[0];
2098 assert!(sql.contains("WHEN MATCHED THEN UPDATE SET \"name\" = src.\"name\""));
2099 assert!(sql.contains("WHEN NOT MATCHED THEN INSERT"));
2100 }
2101
2102 #[cfg(feature = "oracle")]
2103 #[test]
2104 fn build_insert_sql_oracle_skip_emits_merge_with_select_dual_source() {
2105 let cols = cols_id_name();
2106 let rows = vec![row_id_name(1, "a"), row_id_name(2, "b")];
2107 let pk = vec!["id".to_string()];
2108 let out = build_insert_sql(
2109 "\"t\"",
2110 "\"id\", \"name\"",
2111 &rows,
2112 Backend::Oracle,
2113 &cols,
2114 IfExists::Skip,
2115 &pk,
2116 );
2117 assert_eq!(out.len(), 1);
2118 let sql = &out[0];
2119 assert!(sql.starts_with("MERGE INTO \"t\" dst USING ("));
2120 assert!(sql.contains("SELECT 1 AS \"id\", 'a' AS \"name\" FROM dual"));
2121 assert!(sql.contains(" UNION ALL "));
2122 assert!(sql.contains("ON (dst.\"id\" = src.\"id\")"));
2123 assert!(sql.contains("WHEN NOT MATCHED THEN INSERT"));
2124 assert!(!sql.contains("WHEN MATCHED"));
2125 }
2126
2127 #[cfg(feature = "oracle")]
2128 #[test]
2129 fn build_insert_sql_oracle_upsert_includes_update_branch() {
2130 let cols = cols_id_name();
2131 let rows = vec![row_id_name(1, "a")];
2132 let pk = vec!["id".to_string()];
2133 let out = build_insert_sql(
2134 "\"t\"",
2135 "\"id\", \"name\"",
2136 &rows,
2137 Backend::Oracle,
2138 &cols,
2139 IfExists::Upsert,
2140 &pk,
2141 );
2142 let sql = &out[0];
2143 assert!(sql.contains("WHEN MATCHED THEN UPDATE SET dst.\"name\" = src.\"name\""));
2144 assert!(sql.contains("WHEN NOT MATCHED THEN INSERT"));
2145 }
2146
2147 #[cfg(feature = "sqlite")]
2148 fn default_backend_for_test() -> Backend {
2149 Backend::Sqlite
2150 }
2151 #[cfg(all(not(feature = "sqlite"), feature = "postgres"))]
2152 fn default_backend_for_test() -> Backend {
2153 Backend::Postgres
2154 }
2155 #[cfg(all(not(feature = "sqlite"), not(feature = "postgres"), feature = "mysql"))]
2156 fn default_backend_for_test() -> Backend {
2157 Backend::MySql
2158 }
2159 #[cfg(all(
2160 not(feature = "sqlite"),
2161 not(feature = "postgres"),
2162 not(feature = "mysql"),
2163 feature = "mssql"
2164 ))]
2165 fn default_backend_for_test() -> Backend {
2166 Backend::MsSql
2167 }
2168 #[cfg(all(
2169 not(feature = "sqlite"),
2170 not(feature = "postgres"),
2171 not(feature = "mysql"),
2172 not(feature = "mssql"),
2173 feature = "oracle"
2174 ))]
2175 fn default_backend_for_test() -> Backend {
2176 Backend::Oracle
2177 }
2178
2179 #[cfg(feature = "sqlite")]
2180 #[test]
2181 fn copy_sqlite_to_sqlite_round_trip() {
2182 use crate::connection::ConnectOptions;
2183 use crate::url::DatabaseUrl;
2184 use std::sync::atomic::{AtomicU64, Ordering};
2185
2186 static N: AtomicU64 = AtomicU64::new(0);
2187 let pid = std::process::id();
2188 let n_a = N.fetch_add(1, Ordering::SeqCst);
2189 let n_b = N.fetch_add(1, Ordering::SeqCst);
2190 let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-src.db"));
2191 let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-dst.db"));
2192 let _ = std::fs::remove_file(&path_a);
2193 let _ = std::fs::remove_file(&path_b);
2194
2195 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2196 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2197 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2198 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2199
2200 src.execute(
2201 "CREATE TABLE test_users (id INTEGER, name TEXT, age INTEGER, score REAL, active INTEGER)",
2202 )
2203 .unwrap();
2204 src.execute("INSERT INTO test_users VALUES (1, 'Alice', 30, 99.5, 1)")
2205 .unwrap();
2206 src.execute("INSERT INTO test_users VALUES (2, 'Bob', 25, 88.25, 0)")
2207 .unwrap();
2208 src.execute("INSERT INTO test_users VALUES (3, 'Carol', 40, NULL, 1)")
2209 .unwrap();
2210
2211 let opts = CopyOptions {
2212 source: CopySource::Table("test_users".into()),
2213 create_table: true,
2214 preserve_pk: false,
2215 if_exists: IfExists::Error,
2216 conflict_key: Vec::new(),
2217 atomic: false,
2218 batch_size: 2,
2219 bulk_mode: BulkMode::Off,
2220 copy_format: CopyFormat::Text,
2221 verbose: false,
2222 progress: None,
2223 };
2224 let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2225 .expect("copy_rows");
2226 assert_eq!(copied, 3);
2227
2228 let out = dst
2229 .query("SELECT id, name, age, score, active FROM test_users ORDER BY id")
2230 .unwrap();
2231 assert_eq!(out.rows.len(), 3);
2232 assert!(matches!(&out.rows[0][1], Value::String(s) if s == "Alice"));
2233 assert!(matches!(&out.rows[2][3], Value::Null));
2234
2235 let _ = std::fs::remove_file(&path_a);
2236 let _ = std::fs::remove_file(&path_b);
2237 }
2238
2239 #[cfg(feature = "sqlite")]
2240 #[test]
2241 fn copy_refuses_when_target_non_empty_with_default_strategy() {
2242 use crate::connection::ConnectOptions;
2243 use crate::url::DatabaseUrl;
2244 use std::sync::atomic::{AtomicU64, Ordering};
2245
2246 static N: AtomicU64 = AtomicU64::new(0);
2247 let pid = std::process::id();
2248 let n = N.fetch_add(1, Ordering::SeqCst);
2249 let path = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n}-conflict.db"));
2250 let _ = std::fs::remove_file(&path);
2251
2252 let url = DatabaseUrl::parse(&format!("sqlite://{}", path.display())).unwrap();
2253 let mut src = crate::connect(&url, &ConnectOptions::default(), None).unwrap();
2254 let mut dst = crate::connect(&url, &ConnectOptions::default(), None).unwrap();
2256
2257 src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2258 .unwrap();
2259 src.execute("INSERT INTO t VALUES (1, 'existing')").unwrap();
2260
2261 let opts = CopyOptions {
2262 source: CopySource::Table("t".into()),
2263 ..Default::default()
2264 };
2265 let result = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts);
2267 let err = result.expect_err("copy should refuse non-empty target by default");
2268 let msg = err.to_string();
2269 assert!(
2270 msg.contains("already contains rows") && msg.contains("--if-exists"),
2271 "unhelpful error message: {msg}"
2272 );
2273
2274 let _ = std::fs::remove_file(&path);
2275 }
2276
2277 #[cfg(feature = "sqlite")]
2278 #[test]
2279 fn copy_truncate_replaces_existing_rows() {
2280 use crate::connection::ConnectOptions;
2281 use crate::url::DatabaseUrl;
2282 use std::sync::atomic::{AtomicU64, Ordering};
2283
2284 static N: AtomicU64 = AtomicU64::new(0);
2285 let pid = std::process::id();
2286 let n_a = N.fetch_add(1, Ordering::SeqCst);
2287 let n_b = N.fetch_add(1, Ordering::SeqCst);
2288 let path_a =
2289 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-trunc-src.db"));
2290 let path_b =
2291 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-trunc-dst.db"));
2292 let _ = std::fs::remove_file(&path_a);
2293 let _ = std::fs::remove_file(&path_b);
2294
2295 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2296 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2297 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2298 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2299
2300 src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2301 .unwrap();
2302 dst.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2303 .unwrap();
2304 dst.execute("INSERT INTO t VALUES (99, 'stale')").unwrap();
2305 src.execute("INSERT INTO t VALUES (1, 'fresh-1')").unwrap();
2306 src.execute("INSERT INTO t VALUES (2, 'fresh-2')").unwrap();
2307
2308 let opts = CopyOptions {
2309 source: CopySource::Table("t".into()),
2310 if_exists: IfExists::Truncate,
2311 ..Default::default()
2312 };
2313 let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2314 .expect("copy_rows");
2315 assert_eq!(copied, 2);
2316
2317 let out = dst.query("SELECT id, name FROM t ORDER BY id").unwrap();
2318 assert_eq!(out.rows.len(), 2);
2319 assert!(matches!(&out.rows[0][1], Value::String(s) if s == "fresh-1"));
2320 assert!(matches!(&out.rows[1][1], Value::String(s) if s == "fresh-2"));
2321
2322 let _ = std::fs::remove_file(&path_a);
2323 let _ = std::fs::remove_file(&path_b);
2324 }
2325
2326 #[cfg(feature = "sqlite")]
2327 #[test]
2328 fn copy_query_with_into_and_create_table() {
2329 use crate::connection::ConnectOptions;
2330 use crate::url::DatabaseUrl;
2331 use std::sync::atomic::{AtomicU64, Ordering};
2332
2333 static N: AtomicU64 = AtomicU64::new(0);
2334 let pid = std::process::id();
2335 let n_a = N.fetch_add(1, Ordering::SeqCst);
2336 let n_b = N.fetch_add(1, Ordering::SeqCst);
2337 let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-q-src.db"));
2338 let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-q-dst.db"));
2339 let _ = std::fs::remove_file(&path_a);
2340 let _ = std::fs::remove_file(&path_b);
2341
2342 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2343 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2344 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2345 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2346
2347 src.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER, active INTEGER)")
2348 .unwrap();
2349 src.execute("INSERT INTO users VALUES (1, 'Alice', 30, 1)")
2350 .unwrap();
2351 src.execute("INSERT INTO users VALUES (2, 'Bob', 25, 0)")
2352 .unwrap();
2353 src.execute("INSERT INTO users VALUES (3, 'Carol', 40, 1)")
2354 .unwrap();
2355
2356 let opts = CopyOptions {
2357 source: CopySource::Query {
2358 sql: "SELECT id, name FROM users WHERE active = 1".into(),
2359 into: "active_users".into(),
2360 },
2361 create_table: true,
2362 ..Default::default()
2363 };
2364 let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2365 .expect("copy_rows");
2366 assert_eq!(copied, 2);
2367
2368 let out = dst
2369 .query("SELECT id, name FROM active_users ORDER BY id")
2370 .unwrap();
2371 assert_eq!(out.rows.len(), 2);
2372 assert!(matches!(&out.rows[0][1], Value::String(s) if s == "Alice"));
2373 assert!(matches!(&out.rows[1][1], Value::String(s) if s == "Carol"));
2374
2375 let _ = std::fs::remove_file(&path_a);
2376 let _ = std::fs::remove_file(&path_b);
2377 }
2378
2379 #[cfg(feature = "sqlite")]
2383 mod dispatcher_harness {
2384 use crate::connection::{
2385 BulkInsert, Connection, ExecutionSummary, QueryResult, StatementResult,
2386 };
2387 use crate::error::SqlError;
2388 use async_trait::async_trait;
2389 use std::sync::Arc;
2390 use std::sync::atomic::{AtomicUsize, Ordering};
2391
2392 pub enum BulkBehaviour {
2394 PanicIfCalled,
2396 AlwaysUnavailable,
2398 }
2399
2400 pub struct TrackingDst {
2401 pub inner: Box<dyn Connection>,
2402 pub bulk_calls: Arc<AtomicUsize>,
2403 pub behaviour: BulkBehaviour,
2404 }
2405
2406 impl Connection for TrackingDst {
2407 fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
2408 self.inner.execute(sql)
2409 }
2410 fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
2411 self.inner.query(sql)
2412 }
2413 fn query_cursor(
2414 &mut self,
2415 sql: &str,
2416 ) -> Result<crate::stream::RowCursor<'_>, SqlError> {
2417 self.inner.query_cursor(sql)
2418 }
2419 fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
2420 self.inner.execute_multi(sql)
2421 }
2422 fn ping(&mut self) -> Result<(), SqlError> {
2423 self.inner.ping()
2424 }
2425 fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError> {
2426 self.inner.list_tables(schema)
2427 }
2428 fn list_schemas(&mut self) -> Result<Vec<crate::connection::SchemaInfo>, SqlError> {
2429 self.inner.list_schemas()
2430 }
2431 fn describe_table(
2432 &mut self,
2433 schema: Option<&str>,
2434 table: &str,
2435 ) -> Result<QueryResult, SqlError> {
2436 self.inner.describe_table(schema, table)
2437 }
2438 fn primary_key(
2439 &mut self,
2440 schema: Option<&str>,
2441 table: &str,
2442 ) -> Result<Vec<String>, SqlError> {
2443 self.inner.primary_key(schema, table)
2444 }
2445 fn list_foreign_keys(
2446 &mut self,
2447 schema: Option<&str>,
2448 ) -> Result<Vec<crate::ForeignKey>, SqlError> {
2449 self.inner.list_foreign_keys(schema)
2450 }
2451 fn bulk_insert_rows(&mut self, _target: BulkInsert<'_>) -> Result<usize, SqlError> {
2452 self.bulk_calls.fetch_add(1, Ordering::SeqCst);
2453 match self.behaviour {
2454 BulkBehaviour::PanicIfCalled => {
2455 panic!("bulk_insert_rows was invoked under BulkMode::Off");
2456 }
2457 BulkBehaviour::AlwaysUnavailable => Err(SqlError::BulkUnavailable(
2458 "test wrapper: bulk path forced unavailable".into(),
2459 )),
2460 }
2461 }
2462 }
2463 }
2464
2465 #[cfg(feature = "sqlite")]
2466 fn seed_pair_for_dispatcher_test(
2467 tag: &str,
2468 ) -> (
2469 Box<dyn crate::connection::Connection>,
2470 Box<dyn crate::connection::Connection>,
2471 std::path::PathBuf,
2472 std::path::PathBuf,
2473 ) {
2474 use crate::connection::ConnectOptions;
2475 use crate::url::DatabaseUrl;
2476 use std::sync::atomic::{AtomicU64, Ordering};
2477
2478 static N: AtomicU64 = AtomicU64::new(0);
2479 let pid = std::process::id();
2480 let n_a = N.fetch_add(1, Ordering::SeqCst);
2481 let n_b = N.fetch_add(1, Ordering::SeqCst);
2482 let path_a =
2483 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-{tag}-src.db"));
2484 let path_b =
2485 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-{tag}-dst.db"));
2486 let _ = std::fs::remove_file(&path_a);
2487 let _ = std::fs::remove_file(&path_b);
2488
2489 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2490 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2491 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2492 let dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2493
2494 src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2495 .unwrap();
2496 src.execute("INSERT INTO t VALUES (1, 'a')").unwrap();
2497 src.execute("INSERT INTO t VALUES (2, 'b')").unwrap();
2498 src.execute("INSERT INTO t VALUES (3, 'c')").unwrap();
2499
2500 (src, dst, path_a, path_b)
2501 }
2502
2503 #[cfg(feature = "sqlite")]
2506 #[test]
2507 fn dispatcher_off_never_invokes_bulk_path() {
2508 use dispatcher_harness::{BulkBehaviour, TrackingDst};
2509 use std::sync::atomic::{AtomicUsize, Ordering};
2510
2511 let (src, dst_inner, path_a, path_b) = seed_pair_for_dispatcher_test("off");
2512 let bulk_calls = std::sync::Arc::new(AtomicUsize::new(0));
2513 let mut src = src;
2514 let mut dst = TrackingDst {
2515 inner: dst_inner,
2516 bulk_calls: bulk_calls.clone(),
2517 behaviour: BulkBehaviour::PanicIfCalled,
2518 };
2519
2520 let opts = CopyOptions {
2521 source: CopySource::Table("t".into()),
2522 create_table: true,
2523 bulk_mode: BulkMode::Off,
2524 ..Default::default()
2525 };
2526 let copied = copy_rows(
2527 src.as_mut(),
2528 Backend::Sqlite,
2529 &mut dst,
2530 Backend::Sqlite,
2531 &opts,
2532 )
2533 .expect("copy_rows");
2534 assert_eq!(copied, 3);
2535 assert_eq!(bulk_calls.load(Ordering::SeqCst), 0);
2536
2537 let _ = std::fs::remove_file(&path_a);
2538 let _ = std::fs::remove_file(&path_b);
2539 }
2540
2541 #[cfg(feature = "sqlite")]
2544 #[test]
2545 fn dispatcher_auto_falls_back_on_bulk_unavailable() {
2546 use dispatcher_harness::{BulkBehaviour, TrackingDst};
2547 use std::sync::atomic::{AtomicUsize, Ordering};
2548
2549 let (src, dst_inner, path_a, path_b) = seed_pair_for_dispatcher_test("auto");
2550 let bulk_calls = std::sync::Arc::new(AtomicUsize::new(0));
2551 let mut src = src;
2552 let mut dst = TrackingDst {
2553 inner: dst_inner,
2554 bulk_calls: bulk_calls.clone(),
2555 behaviour: BulkBehaviour::AlwaysUnavailable,
2556 };
2557
2558 let opts = CopyOptions {
2561 source: CopySource::Table("t".into()),
2562 create_table: true,
2563 batch_size: 2,
2564 bulk_mode: BulkMode::Auto,
2565 ..Default::default()
2566 };
2567 let copied = copy_rows(
2568 src.as_mut(),
2569 Backend::Sqlite,
2570 &mut dst,
2571 Backend::Sqlite,
2572 &opts,
2573 )
2574 .expect("copy_rows");
2575 assert_eq!(copied, 3);
2576 assert_eq!(bulk_calls.load(Ordering::SeqCst), 2);
2578 let out = dst
2580 .inner
2581 .query("SELECT id, name FROM t ORDER BY id")
2582 .unwrap();
2583 assert_eq!(out.rows.len(), 3);
2584
2585 let _ = std::fs::remove_file(&path_a);
2586 let _ = std::fs::remove_file(&path_b);
2587 }
2588
2589 #[cfg(feature = "sqlite")]
2592 #[test]
2593 fn dispatcher_on_errors_when_bulk_unavailable() {
2594 use dispatcher_harness::{BulkBehaviour, TrackingDst};
2595 use std::sync::atomic::{AtomicUsize, Ordering};
2596
2597 let (src, dst_inner, path_a, path_b) = seed_pair_for_dispatcher_test("on");
2598 let bulk_calls = std::sync::Arc::new(AtomicUsize::new(0));
2599 let mut src = src;
2600 let mut dst = TrackingDst {
2601 inner: dst_inner,
2602 bulk_calls: bulk_calls.clone(),
2603 behaviour: BulkBehaviour::AlwaysUnavailable,
2604 };
2605
2606 let opts = CopyOptions {
2607 source: CopySource::Table("t".into()),
2608 create_table: true,
2609 bulk_mode: BulkMode::On,
2610 ..Default::default()
2611 };
2612 let result = copy_rows(
2613 src.as_mut(),
2614 Backend::Sqlite,
2615 &mut dst,
2616 Backend::Sqlite,
2617 &opts,
2618 );
2619 let err = result.expect_err("copy should fail when bulk path unavailable in On mode");
2620 let msg = err.to_string();
2621 assert!(
2622 msg.contains("--bulk-native"),
2623 "error should mention --bulk-native: {msg}"
2624 );
2625 assert_eq!(bulk_calls.load(Ordering::SeqCst), 1);
2627
2628 let _ = std::fs::remove_file(&path_a);
2629 let _ = std::fs::remove_file(&path_b);
2630 }
2631
2632 #[cfg(feature = "sqlite")]
2636 #[test]
2637 fn copy_skip_preserves_existing_rows() {
2638 use crate::connection::ConnectOptions;
2639 use crate::url::DatabaseUrl;
2640 use std::sync::atomic::{AtomicU64, Ordering};
2641
2642 static N: AtomicU64 = AtomicU64::new(0);
2643 let pid = std::process::id();
2644 let n_a = N.fetch_add(1, Ordering::SeqCst);
2645 let n_b = N.fetch_add(1, Ordering::SeqCst);
2646 let path_a =
2647 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-skip-src.db"));
2648 let path_b =
2649 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-skip-dst.db"));
2650 let _ = std::fs::remove_file(&path_a);
2651 let _ = std::fs::remove_file(&path_b);
2652
2653 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2654 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2655 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2656 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2657
2658 src.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
2659 .unwrap();
2660 dst.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
2661 .unwrap();
2662 dst.execute("INSERT INTO t VALUES (1, 'kept')").unwrap();
2665 src.execute("INSERT INTO t VALUES (1, 'new-1')").unwrap();
2666 src.execute("INSERT INTO t VALUES (2, 'src-only')").unwrap();
2667
2668 let opts = CopyOptions {
2669 source: CopySource::Table("t".into()),
2670 if_exists: IfExists::Skip,
2671 ..Default::default()
2672 };
2673 let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2674 .expect("copy_rows");
2675 assert_eq!(copied, 2);
2679
2680 let out = dst.query("SELECT id, name FROM t ORDER BY id").unwrap();
2681 assert_eq!(out.rows.len(), 2);
2682 assert!(matches!(&out.rows[0][1], Value::String(s) if s == "kept"));
2684 assert!(matches!(&out.rows[1][1], Value::String(s) if s == "src-only"));
2685
2686 let _ = std::fs::remove_file(&path_a);
2687 let _ = std::fs::remove_file(&path_b);
2688 }
2689
2690 #[cfg(feature = "sqlite")]
2696 #[test]
2697 fn copy_create_table_preserve_pk_emits_primary_key() {
2698 use crate::connection::ConnectOptions;
2699 use crate::url::DatabaseUrl;
2700 use std::sync::atomic::{AtomicU64, Ordering};
2701
2702 static N: AtomicU64 = AtomicU64::new(0);
2703 let pid = std::process::id();
2704 let n_a = N.fetch_add(1, Ordering::SeqCst);
2705 let n_b = N.fetch_add(1, Ordering::SeqCst);
2706 let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-pp-src.db"));
2707 let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-pp-dst.db"));
2708 let _ = std::fs::remove_file(&path_a);
2709 let _ = std::fs::remove_file(&path_b);
2710
2711 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2712 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2713 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2714 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2715
2716 src.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
2717 .unwrap();
2718 src.execute("INSERT INTO t VALUES (1, 'a')").unwrap();
2719 src.execute("INSERT INTO t VALUES (2, 'b')").unwrap();
2720
2721 let opts = CopyOptions {
2722 source: CopySource::Table("t".into()),
2723 create_table: true,
2724 preserve_pk: true,
2725 ..Default::default()
2726 };
2727 let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2728 .expect("copy_rows");
2729 assert_eq!(copied, 2);
2730
2731 let pk = dst.primary_key(None, "t").unwrap();
2733 assert_eq!(pk, vec!["id".to_string()]);
2734
2735 src.execute("UPDATE t SET name = 'a-upd' WHERE id = 1")
2737 .unwrap();
2738 let upsert_opts = CopyOptions {
2739 source: CopySource::Table("t".into()),
2740 if_exists: IfExists::Upsert,
2741 ..Default::default()
2742 };
2743 copy_rows(
2744 &mut src,
2745 Backend::Sqlite,
2746 &mut dst,
2747 Backend::Sqlite,
2748 &upsert_opts,
2749 )
2750 .expect("upsert");
2751 let out = dst.query("SELECT name FROM t WHERE id = 1").unwrap();
2752 assert!(matches!(&out.rows[0][0], Value::String(s) if s == "a-upd"));
2753
2754 let _ = std::fs::remove_file(&path_a);
2755 let _ = std::fs::remove_file(&path_b);
2756 }
2757
2758 #[cfg(feature = "sqlite")]
2762 #[test]
2763 fn copy_create_table_preserve_pk_falls_through_when_source_lacks_pk() {
2764 use crate::connection::ConnectOptions;
2765 use crate::url::DatabaseUrl;
2766 use std::sync::atomic::{AtomicU64, Ordering};
2767
2768 static N: AtomicU64 = AtomicU64::new(0);
2769 let pid = std::process::id();
2770 let n_a = N.fetch_add(1, Ordering::SeqCst);
2771 let n_b = N.fetch_add(1, Ordering::SeqCst);
2772 let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-pp2-src.db"));
2773 let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-pp2-dst.db"));
2774 let _ = std::fs::remove_file(&path_a);
2775 let _ = std::fs::remove_file(&path_b);
2776
2777 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2778 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2779 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2780 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2781
2782 src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2784 .unwrap();
2785 src.execute("INSERT INTO t VALUES (1, 'a')").unwrap();
2786
2787 let opts = CopyOptions {
2788 source: CopySource::Table("t".into()),
2789 create_table: true,
2790 preserve_pk: true,
2791 ..Default::default()
2792 };
2793 let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2794 .expect("copy_rows");
2795 assert_eq!(copied, 1);
2796
2797 let pk = dst.primary_key(None, "t").unwrap();
2799 assert!(pk.is_empty(), "expected no PK; got {pk:?}");
2800
2801 let _ = std::fs::remove_file(&path_a);
2802 let _ = std::fs::remove_file(&path_b);
2803 }
2804
2805 #[cfg(feature = "sqlite")]
2808 #[test]
2809 fn copy_upsert_overwrites_existing_rows() {
2810 use crate::connection::ConnectOptions;
2811 use crate::url::DatabaseUrl;
2812 use std::sync::atomic::{AtomicU64, Ordering};
2813
2814 static N: AtomicU64 = AtomicU64::new(0);
2815 let pid = std::process::id();
2816 let n_a = N.fetch_add(1, Ordering::SeqCst);
2817 let n_b = N.fetch_add(1, Ordering::SeqCst);
2818 let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-up-src.db"));
2819 let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-up-dst.db"));
2820 let _ = std::fs::remove_file(&path_a);
2821 let _ = std::fs::remove_file(&path_b);
2822
2823 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2824 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2825 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2826 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2827
2828 src.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
2829 .unwrap();
2830 dst.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
2831 .unwrap();
2832 dst.execute("INSERT INTO t VALUES (1, 'old')").unwrap();
2833 src.execute("INSERT INTO t VALUES (1, 'new-1')").unwrap();
2834 src.execute("INSERT INTO t VALUES (2, 'src-only')").unwrap();
2835
2836 let opts = CopyOptions {
2837 source: CopySource::Table("t".into()),
2838 if_exists: IfExists::Upsert,
2839 ..Default::default()
2840 };
2841 let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2842 .expect("copy_rows");
2843 assert_eq!(copied, 2);
2844
2845 let out = dst.query("SELECT id, name FROM t ORDER BY id").unwrap();
2846 assert_eq!(out.rows.len(), 2);
2847 assert!(matches!(&out.rows[0][1], Value::String(s) if s == "new-1"));
2849 assert!(matches!(&out.rows[1][1], Value::String(s) if s == "src-only"));
2850
2851 let _ = std::fs::remove_file(&path_a);
2852 let _ = std::fs::remove_file(&path_b);
2853 }
2854
2855 #[cfg(feature = "sqlite")]
2859 #[test]
2860 fn copy_skip_without_pk_hard_errors() {
2861 use crate::connection::ConnectOptions;
2862 use crate::url::DatabaseUrl;
2863 use std::sync::atomic::{AtomicU64, Ordering};
2864
2865 static N: AtomicU64 = AtomicU64::new(0);
2866 let pid = std::process::id();
2867 let n_a = N.fetch_add(1, Ordering::SeqCst);
2868 let n_b = N.fetch_add(1, Ordering::SeqCst);
2869 let path_a =
2870 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-nopk-src.db"));
2871 let path_b =
2872 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-nopk-dst.db"));
2873 let _ = std::fs::remove_file(&path_a);
2874 let _ = std::fs::remove_file(&path_b);
2875
2876 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2877 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2878 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2879 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2880
2881 src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2882 .unwrap();
2883 dst.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2885 .unwrap();
2886 src.execute("INSERT INTO t VALUES (1, 'a')").unwrap();
2887
2888 let opts = CopyOptions {
2889 source: CopySource::Table("t".into()),
2890 if_exists: IfExists::Skip,
2891 ..Default::default()
2892 };
2893 let err = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2894 .expect_err("expected hard error for no-PK + skip");
2895 let msg = format!("{err}");
2896 assert!(
2897 msg.contains("no declared primary key"),
2898 "error should reference missing PK: {msg}"
2899 );
2900 assert!(
2901 msg.contains("--key"),
2902 "error should point at the --key override: {msg}"
2903 );
2904 assert!(
2905 msg.contains("--preserve-pk"),
2906 "error should point at --preserve-pk for create-table users: {msg}"
2907 );
2908
2909 let _ = std::fs::remove_file(&path_a);
2910 let _ = std::fs::remove_file(&path_b);
2911 }
2912
2913 #[cfg(feature = "sqlite")]
2917 #[test]
2918 fn copy_key_override_upserts_against_pk_less_table() {
2919 use crate::connection::ConnectOptions;
2920 use crate::url::DatabaseUrl;
2921 use std::sync::atomic::{AtomicU64, Ordering};
2922
2923 static N: AtomicU64 = AtomicU64::new(0);
2924 let pid = std::process::id();
2925 let n_a = N.fetch_add(1, Ordering::SeqCst);
2926 let n_b = N.fetch_add(1, Ordering::SeqCst);
2927 let path_a =
2928 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-keyup-src.db"));
2929 let path_b =
2930 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-keyup-dst.db"));
2931 let _ = std::fs::remove_file(&path_a);
2932 let _ = std::fs::remove_file(&path_b);
2933
2934 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2935 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2936 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2937 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2938
2939 src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2941 .unwrap();
2942 dst.execute("CREATE TABLE t (id INTEGER, name TEXT, UNIQUE(id))")
2943 .unwrap();
2944 src.execute("INSERT INTO t VALUES (1, 'new-1')").unwrap();
2945 src.execute("INSERT INTO t VALUES (2, 'src-only')").unwrap();
2946 dst.execute("INSERT INTO t VALUES (1, 'old')").unwrap();
2947
2948 let opts = CopyOptions {
2949 source: CopySource::Table("t".into()),
2950 if_exists: IfExists::Upsert,
2951 conflict_key: vec!["id".to_string()],
2952 ..Default::default()
2953 };
2954 let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2955 .expect("copy_rows with --key");
2956 assert_eq!(copied, 2);
2957
2958 let out = dst.query("SELECT id, name FROM t ORDER BY id").unwrap();
2959 assert_eq!(out.rows.len(), 2);
2960 assert!(matches!(&out.rows[0][1], Value::String(s) if s == "new-1"));
2961 assert!(matches!(&out.rows[1][1], Value::String(s) if s == "src-only"));
2962
2963 let _ = std::fs::remove_file(&path_a);
2964 let _ = std::fs::remove_file(&path_b);
2965 }
2966
2967 #[cfg(feature = "sqlite")]
2970 #[test]
2971 fn copy_key_override_unknown_column_fails_fast() {
2972 use crate::connection::ConnectOptions;
2973 use crate::url::DatabaseUrl;
2974 use std::sync::atomic::{AtomicU64, Ordering};
2975
2976 static N: AtomicU64 = AtomicU64::new(0);
2977 let pid = std::process::id();
2978 let n_a = N.fetch_add(1, Ordering::SeqCst);
2979 let n_b = N.fetch_add(1, Ordering::SeqCst);
2980 let path_a =
2981 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-keybad-src.db"));
2982 let path_b =
2983 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-keybad-dst.db"));
2984 let _ = std::fs::remove_file(&path_a);
2985 let _ = std::fs::remove_file(&path_b);
2986
2987 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2988 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2989 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2990 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2991
2992 src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2993 .unwrap();
2994 dst.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2995 .unwrap();
2996 src.execute("INSERT INTO t VALUES (1, 'a')").unwrap();
2997
2998 let opts = CopyOptions {
2999 source: CopySource::Table("t".into()),
3000 if_exists: IfExists::Upsert,
3001 conflict_key: vec!["nonexistent".to_string()],
3002 ..Default::default()
3003 };
3004 let err = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
3005 .expect_err("expected error for unknown --key column");
3006 let msg = format!("{err}");
3007 assert!(
3008 msg.contains("nonexistent"),
3009 "error should name the unknown column: {msg}"
3010 );
3011
3012 let _ = std::fs::remove_file(&path_a);
3013 let _ = std::fs::remove_file(&path_b);
3014 }
3015
    /// `IfExists::Upsert` combined with `BulkMode::On` must still succeed
    /// without touching the destination's bulk entry point.
    ///
    /// The destination is wrapped in a `TrackingDst` configured with
    /// `BulkBehaviour::PanicIfCalled`; the test asserts the copy reports 2 rows
    /// and that the bulk-call counter is still zero afterwards.
    #[cfg(feature = "sqlite")]
    #[test]
    fn copy_upsert_forces_generic_path_even_under_bulk_on() {
        use crate::connection::ConnectOptions;
        use crate::url::DatabaseUrl;
        use dispatcher_harness::{BulkBehaviour, TrackingDst};
        use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

        // The process id and a counter value are baked into the temp file names.
        static N: AtomicU64 = AtomicU64::new(0);
        let pid = std::process::id();
        let n_a = N.fetch_add(1, Ordering::SeqCst);
        let n_b = N.fetch_add(1, Ordering::SeqCst);
        let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-bup-src.db"));
        let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-bup-dst.db"));
        // Clear leftovers from an earlier run; a missing file is not an error here.
        let _ = std::fs::remove_file(&path_a);
        let _ = std::fs::remove_file(&path_b);

        let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
        let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
        let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
        // This handle is moved into the tracking wrapper below and is not used
        // for any setup statements.
        let raw_dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();

        src.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
            .unwrap();
        // A second, short-lived connection to the same destination file creates
        // and seeds the table, and is dropped before the copy starts.
        // NOTE(review): presumably this keeps `raw_dst` immutable so it can be
        // moved into `TrackingDst` untouched — confirm.
        let mut seed_dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
        seed_dst
            .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
            .unwrap();
        seed_dst.execute("INSERT INTO t VALUES (1, 'old')").unwrap();
        drop(seed_dst);
        // Source rows: one colliding with the seeded id 1, one new.
        src.execute("INSERT INTO t VALUES (1, 'new-1')").unwrap();
        src.execute("INSERT INTO t VALUES (2, 'src-only')").unwrap();

        // The counter is shared between the test and the tracking wrapper.
        let bulk_calls = std::sync::Arc::new(AtomicUsize::new(0));
        let mut tracking = TrackingDst {
            inner: Box::new(raw_dst),
            bulk_calls: bulk_calls.clone(),
            behaviour: BulkBehaviour::PanicIfCalled,
        };

        let opts = CopyOptions {
            source: CopySource::Table("t".into()),
            if_exists: IfExists::Upsert,
            bulk_mode: BulkMode::On,
            ..Default::default()
        };
        let copied = copy_rows(
            &mut src,
            Backend::Sqlite,
            &mut tracking,
            Backend::Sqlite,
            &opts,
        )
        .expect("copy_rows should succeed under forced-generic path");
        assert_eq!(copied, 2);
        // Zero bulk calls despite `BulkMode::On`.
        assert_eq!(bulk_calls.load(Ordering::SeqCst), 0);

        // Best-effort removal of the temp databases.
        let _ = std::fs::remove_file(&path_a);
        let _ = std::fs::remove_file(&path_b);
    }
3087
3088 #[test]
3091 fn matches_glob_literal_and_wildcards() {
3092 assert!(matches_glob("users", "users"));
3093 assert!(!matches_glob("users", "Users"));
3094 assert!(matches_glob("*", "anything"));
3095 assert!(matches_glob("test_*", "test_users"));
3096 assert!(matches_glob("test_*", "test_orders"));
3097 assert!(!matches_glob("test_*", "users"));
3098 assert!(matches_glob("?ser", "user"));
3099 assert!(!matches_glob("?ser", "users"));
3100 assert!(matches_glob("a*b*c", "axxxbyyc"));
3101 assert!(matches_glob("*", ""));
3102 assert!(!matches_glob("nonempty", ""));
3103 }
3104
3105 fn fk(child: &str, parent: &str) -> ForeignKey {
3106 ForeignKey {
3107 child_table: child.to_string(),
3108 child_columns: vec!["fk".into()],
3109 parent_table: parent.to_string(),
3110 parent_columns: vec!["id".into()],
3111 on_delete: None,
3112 }
3113 }
3114
3115 #[test]
3116 fn topo_sort_simple_dag_orders_parents_first() {
3117 let tables: Vec<String> = ["orders", "users", "items"]
3118 .iter()
3119 .map(|s| s.to_string())
3120 .collect();
3121 let fks = vec![fk("orders", "users"), fk("orders", "items")];
3123 let out = topo_sort(&tables, &fks).expect("ordered");
3124 let users_pos = out.iter().position(|t| t == "users").unwrap();
3125 let items_pos = out.iter().position(|t| t == "items").unwrap();
3126 let orders_pos = out.iter().position(|t| t == "orders").unwrap();
3127 assert!(users_pos < orders_pos, "users must precede orders: {out:?}");
3128 assert!(items_pos < orders_pos, "items must precede orders: {out:?}");
3129 }
3130
3131 #[test]
3132 fn topo_sort_preserves_input_order_for_independent_tables() {
3133 let tables: Vec<String> = ["c", "a", "b"].iter().map(|s| s.to_string()).collect();
3136 let out = topo_sort(&tables, &[]).expect("ordered");
3137 assert_eq!(out, tables);
3138 }
3139
3140 #[test]
3141 fn topo_sort_drops_edges_to_excluded_parents() {
3142 let tables: Vec<String> = ["orders"].iter().map(|s| s.to_string()).collect();
3145 let fks = vec![fk("orders", "users")];
3146 let out = topo_sort(&tables, &fks).expect("ordered");
3147 assert_eq!(out, vec!["orders".to_string()]);
3148 }
3149
3150 #[test]
3151 fn topo_sort_ignores_self_referential_fk() {
3152 let tables: Vec<String> = ["tree"].iter().map(|s| s.to_string()).collect();
3156 let fks = vec![fk("tree", "tree")];
3157 let out = topo_sort(&tables, &fks).expect("ordered");
3158 assert_eq!(out, vec!["tree".to_string()]);
3159 }
3160
3161 #[test]
3162 fn topo_sort_reports_cycle_with_remaining_nodes_sorted() {
3163 let tables: Vec<String> = ["a", "b", "c"].iter().map(|s| s.to_string()).collect();
3164 let fks = vec![fk("a", "b"), fk("b", "c"), fk("c", "a")];
3166 let err = topo_sort(&tables, &fks).expect_err("cycle expected");
3167 assert_eq!(
3168 err.remaining,
3169 vec!["a".to_string(), "b".to_string(), "c".to_string()]
3170 );
3171 }
3172
3173 #[test]
3174 fn topo_sort_cycle_does_not_block_dag_tables() {
3175 let tables: Vec<String> = ["a", "b", "c"].iter().map(|s| s.to_string()).collect();
3178 let fks = vec![fk("a", "b"), fk("b", "a")];
3179 let err = topo_sort(&tables, &fks).expect_err("cycle expected");
3180 assert_eq!(err.remaining, vec!["a".to_string(), "b".to_string()]);
3181 }
3182
3183 #[cfg(feature = "sqlite")]
3188 #[test]
3189 fn copy_all_tables_orders_by_fk_and_copies_everything() {
3190 use crate::connection::ConnectOptions;
3191 use crate::url::DatabaseUrl;
3192 use std::sync::atomic::{AtomicU64, Ordering};
3193
3194 static N: AtomicU64 = AtomicU64::new(0);
3195 let pid = std::process::id();
3196 let n_a = N.fetch_add(1, Ordering::SeqCst);
3197 let n_b = N.fetch_add(1, Ordering::SeqCst);
3198 let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-all-src.db"));
3199 let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-all-dst.db"));
3200 let _ = std::fs::remove_file(&path_a);
3201 let _ = std::fs::remove_file(&path_b);
3202
3203 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
3204 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
3205 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
3206 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
3207
3208 dst.execute("PRAGMA foreign_keys = ON").unwrap();
3211
3212 src.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
3213 .unwrap();
3214 src.execute(
3215 "CREATE TABLE orders (id INTEGER PRIMARY KEY, \
3216 user_id INTEGER REFERENCES users(id), \
3217 total REAL)",
3218 )
3219 .unwrap();
3220 src.execute("INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob')")
3221 .unwrap();
3222 src.execute("INSERT INTO orders VALUES (1, 1, 9.99), (2, 1, 4.50), (3, 2, 12.00)")
3223 .unwrap();
3224
3225 let opts = AllTablesOptions {
3226 create_table: true,
3227 ..Default::default()
3228 };
3229 let copied = copy_all_tables(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
3230 .expect("copy_all_tables");
3231 assert_eq!(copied, 5);
3233
3234 let u = dst.query("SELECT count(*) FROM users").unwrap();
3235 let o = dst.query("SELECT count(*) FROM orders").unwrap();
3236 assert!(matches!(&u.rows[0][0], Value::Int64(2)));
3237 assert!(matches!(&o.rows[0][0], Value::Int64(3)));
3238
3239 let _ = std::fs::remove_file(&path_a);
3240 let _ = std::fs::remove_file(&path_b);
3241 }
3242
3243 #[cfg(feature = "sqlite")]
3246 #[test]
3247 fn copy_all_tables_respects_include_and_exclude() {
3248 use crate::connection::ConnectOptions;
3249 use crate::url::DatabaseUrl;
3250 use std::sync::atomic::{AtomicU64, Ordering};
3251
3252 static N: AtomicU64 = AtomicU64::new(0);
3253 let pid = std::process::id();
3254 let n_a = N.fetch_add(1, Ordering::SeqCst);
3255 let n_b = N.fetch_add(1, Ordering::SeqCst);
3256 let path_a =
3257 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-incl-src.db"));
3258 let path_b =
3259 std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-incl-dst.db"));
3260 let _ = std::fs::remove_file(&path_a);
3261 let _ = std::fs::remove_file(&path_b);
3262
3263 let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
3264 let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
3265 let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
3266 let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
3267
3268 src.execute("CREATE TABLE app_users (id INTEGER, name TEXT)")
3270 .unwrap();
3271 src.execute("CREATE TABLE app_logs (id INTEGER, msg TEXT)")
3272 .unwrap();
3273 src.execute("CREATE TABLE other (id INTEGER)").unwrap();
3274 src.execute("INSERT INTO app_users VALUES (1, 'A')")
3275 .unwrap();
3276 src.execute("INSERT INTO app_logs VALUES (1, 'noise')")
3277 .unwrap();
3278 src.execute("INSERT INTO other VALUES (1)").unwrap();
3279
3280 let opts = AllTablesOptions {
3281 include: vec!["app_*".into()],
3282 exclude: vec!["app_logs".into()],
3283 create_table: true,
3284 ..Default::default()
3285 };
3286 let copied = copy_all_tables(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
3287 .expect("copy_all_tables");
3288 assert_eq!(copied, 1);
3290 let tables = dst.list_tables(None).unwrap();
3291 assert!(tables.contains(&"app_users".to_string()));
3292 assert!(!tables.contains(&"app_logs".to_string()));
3293 assert!(!tables.contains(&"other".to_string()));
3294
3295 let _ = std::fs::remove_file(&path_a);
3296 let _ = std::fs::remove_file(&path_b);
3297 }
3298
    /// A foreign-key cycle between source tables must make `copy_all_tables`
    /// fail by default and succeed once `no_fk_check` is set.
    ///
    /// Tables `a` and `b` reference each other. The first call (default
    /// options plus `create_table`) must return an error mentioning
    /// `foreign-key cycle` and `--no-fk-check`. The second call, with
    /// `no_fk_check: true`, must report 2 copied rows.
    #[cfg(feature = "sqlite")]
    #[test]
    fn copy_all_tables_rejects_cycle_unless_no_fk_check() {
        use crate::connection::ConnectOptions;
        use crate::url::DatabaseUrl;
        use std::sync::atomic::{AtomicU64, Ordering};

        // The process id and a counter value are baked into the temp file names.
        static N: AtomicU64 = AtomicU64::new(0);
        let pid = std::process::id();
        let n_a = N.fetch_add(1, Ordering::SeqCst);
        let n_b = N.fetch_add(1, Ordering::SeqCst);
        let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-cyc-src.db"));
        let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-cyc-dst.db"));
        // Clear leftovers from an earlier run; a missing file is not an error here.
        let _ = std::fs::remove_file(&path_a);
        let _ = std::fs::remove_file(&path_b);

        let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
        let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
        let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
        let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
        // Foreign-key enforcement is explicitly switched off at the destination.
        // NOTE(review): presumably so the relaxed copy below cannot trip over
        // insert ordering — confirm.
        dst.execute("PRAGMA foreign_keys = OFF").unwrap();

        // `a` references `b` and `b` references `a`: a two-table cycle.
        src.execute("CREATE TABLE a (id INTEGER PRIMARY KEY, b_id INTEGER REFERENCES b(id))")
            .unwrap();
        src.execute("CREATE TABLE b (id INTEGER PRIMARY KEY, a_id INTEGER REFERENCES a(id))")
            .unwrap();
        src.execute("INSERT INTO a VALUES (1, NULL)").unwrap();
        src.execute("INSERT INTO b VALUES (1, NULL)").unwrap();

        // Phase one: default options must reject the cycle.
        let opts = AllTablesOptions {
            create_table: true,
            ..Default::default()
        };
        let err = copy_all_tables(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
            .expect_err("cycle should hard-error");
        let msg = format!("{err}");
        assert!(msg.contains("foreign-key cycle"), "{msg}");
        assert!(msg.contains("--no-fk-check"), "{msg}");

        // Phase two: same connections, but with the cycle check disabled.
        let opts_relaxed = AllTablesOptions {
            create_table: true,
            no_fk_check: true,
            ..Default::default()
        };
        let copied = copy_all_tables(
            &mut src,
            Backend::Sqlite,
            &mut dst,
            Backend::Sqlite,
            &opts_relaxed,
        )
        .expect("copy_all_tables with --no-fk-check");
        // One row from each of the two tables.
        assert_eq!(copied, 2);

        // Best-effort removal of the temp databases.
        let _ = std::fs::remove_file(&path_a);
        let _ = std::fs::remove_file(&path_b);
    }
3365}