Skip to main content

ferrule_sql/
copy.rs

1//! Cross-DB row copy: stream rows from one backend's table or query
2//! into another backend's table, translating types via the unified
3//! [`Value`] enum.
4//!
5//! The default conflict policy is non-destructive — a copy into a
6//! non-empty existing target table errors out before any INSERT (or
7//! source SELECT) runs. Callers opt in to `Append` or `Truncate` via
8//! [`IfExists`].
9
10use crate::backend::Backend;
11use crate::connection::{BulkInsert, Connection, ForeignKey};
12use crate::error::SqlError;
13use crate::render::render_value;
14use crate::transaction::{begin_transaction, commit_transaction, rollback_transaction};
15use crate::value::{ColumnInfo, TypeHint, Value};
16
/// Conflict policy for a copy whose target table already exists.
///
/// [`Error`](Self::Error) is the default, so existing rows are never
/// touched unless the caller explicitly picks another strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IfExists {
    /// Refuse to copy into a target that already holds rows. The
    /// target is probed before any source SELECT is issued, so a
    /// refused copy never reads from the source.
    #[default]
    Error,
    /// Add the copied rows next to the existing ones. UNIQUE/PK
    /// collisions come back as driver errors and stop the run; batches
    /// committed before the failure remain on the target.
    Append,
    /// Destructive: `DELETE FROM <tbl>`, then insert. The delete and
    /// the first batch share one backend-aware transaction, so a
    /// failed first INSERT cannot leave the target wiped and empty.
    Truncate,
    /// Insert only rows whose key is not yet on the target; rows whose
    /// key already exists are skipped silently. PG/SQLite use
    /// `ON CONFLICT (pk) DO NOTHING`, MySQL uses `INSERT IGNORE`, and
    /// MSSQL/Oracle use a `MERGE … WHEN NOT MATCHED` statement.
    ///
    /// Needs conflict columns on the destination: either its declared
    /// primary key or the `--key COL[,COL...]` override. Having
    /// neither is a hard error.
    Skip,
    /// Insert rows whose key is new and overwrite every non-key column
    /// of rows whose key already exists. PG/SQLite use
    /// `ON CONFLICT (pk) DO UPDATE SET col = EXCLUDED.col`, MySQL uses
    /// `INSERT … ON DUPLICATE KEY UPDATE col = VALUES(col)`, and
    /// MSSQL/Oracle use a full `MERGE` with both branches.
    ///
    /// The conflict columns are resolved exactly as for
    /// [`Skip`](Self::Skip): declared primary key, or the `--key`
    /// override.
    Upsert,
}
51
52impl IfExists {
53    /// Parse a strategy name (case-insensitive). Recognised: `error`,
54    /// `append`, `truncate`, `skip`, `upsert`.
55    pub fn parse(s: &str) -> Option<Self> {
56        match s.to_ascii_lowercase().as_str() {
57            "error" => Some(Self::Error),
58            "append" => Some(Self::Append),
59            "truncate" => Some(Self::Truncate),
60            "skip" => Some(Self::Skip),
61            "upsert" => Some(Self::Upsert),
62            _ => None,
63        }
64    }
65
66    /// True for the two PK-driven conflict-resolution strategies.
67    /// Used by `run_copy` to look up the destination PK once up front
68    /// and to force the dispatcher onto the generic INSERT path (the
69    /// native bulk loaders carry no conflict semantics).
70    pub fn resolves_conflicts(self) -> bool {
71        matches!(self, Self::Skip | Self::Upsert)
72    }
73}
74
/// Controls whether INSERT batches are routed through the
/// destination backend's native bulk loader.
///
/// [`Off`](Self::Off) is the default, which keeps v1 behaviour
/// identical to the Phase 1 generic-INSERT path. Switching the default
/// to [`Auto`](Self::Auto) is tracked as a separate follow-up.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BulkMode {
    /// Bulk path disabled: every batch is sent as a generic
    /// `INSERT INTO ... VALUES (..), (..)` (or the backend equivalent).
    #[default]
    Off,
    /// Attempt the bulk path. When the backend answers with
    /// [`SqlError::BulkUnavailable`], a warning goes to stderr and the
    /// current batch falls back to the generic path. Every other error
    /// is returned immediately — degrading on, e.g., a FK violation
    /// would risk double-inserts.
    Auto,
    /// Bulk path mandatory. A [`SqlError::BulkUnavailable`] from the
    /// backend fails the copy with a usage-style error rather than
    /// falling back.
    On,
}
100
101impl BulkMode {
102    /// Parse a mode name (case-insensitive). Recognised: `off`,
103    /// `auto`, `on`.
104    pub fn parse(s: &str) -> Option<Self> {
105        match s.to_ascii_lowercase().as_str() {
106            "off" => Some(Self::Off),
107            "auto" => Some(Self::Auto),
108            "on" => Some(Self::On),
109            _ => None,
110        }
111    }
112}
113
/// Wire format for the Postgres `COPY` bulk path.
///
/// [`Text`](Self::Text) is the default, for parity with PR #40 / the
/// Phase-1 dispatcher. [`Binary`](Self::Binary) is opt-in: it bypasses
/// PG's text parser, which pays off on `BIGINT` / `TIMESTAMPTZ` /
/// `UUID` / `NUMERIC`-heavy schemas, but is at best break-even on
/// `TEXT` / `JSONB` / `BYTEA`-heavy ones because typed length prefixes
/// inflate small payloads.
///
/// Postgres only. Other backends ignore the setting — their bulk
/// paths are already protocol-native (TDS, MySQL `LOAD DATA`, ODPI-C).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CopyFormat {
    /// `COPY … WITH (FORMAT TEXT)`: tab-separated, newline-terminated.
    /// This was the only path before the flag existed.
    #[default]
    Text,
    /// `COPY … WITH (FORMAT BINARY)`, streamed via
    /// `tokio_postgres::binary_copy::BinaryCopyInWriter`; each row's
    /// values are bound through their `ToSql` impls.
    Binary,
}
137
138impl CopyFormat {
139    /// Parse a format name (case-insensitive). Recognised: `text`,
140    /// `binary`.
141    pub fn parse(s: &str) -> Option<Self> {
142        match s.to_ascii_lowercase().as_str() {
143            "text" => Some(Self::Text),
144            "binary" => Some(Self::Binary),
145            _ => None,
146        }
147    }
148}
149
/// Where the copied rows come from: an entire table or the result of
/// an arbitrary SELECT.
#[derive(Debug, Clone)]
pub enum CopySource {
    /// An entire table, read with a generated `SELECT * FROM <table>`.
    /// The same name is used as the target table.
    Table(String),
    /// The rows produced by `sql`, written to the target table `into`.
    /// `sql` must be a single SELECT — paging requires it.
    Query { sql: String, into: String },
}
160
161impl CopySource {
162    /// Returns the target table name (whether sourced from `Table` or `into`).
163    pub fn target_table(&self) -> &str {
164        match self {
165            Self::Table(t) => t,
166            Self::Query { into, .. } => into,
167        }
168    }
169
170    fn source_sql(&self, src_backend: Backend) -> String {
171        match self {
172            Self::Table(t) => format!("SELECT * FROM {}", quote_identifier(t, src_backend)),
173            Self::Query { sql, .. } => sql.clone(),
174        }
175    }
176}
177
178/// Options for a copy operation.
179pub struct CopyOptions {
180    pub source: CopySource,
181    /// Translate source column metadata into destination DDL and
182    /// `CREATE TABLE` if the target does not exist.
183    pub create_table: bool,
184    /// When combined with [`create_table`](Self::create_table), look up
185    /// the source table's declared primary key and include a matching
186    /// `PRIMARY KEY (...)` clause in the emitted DDL. Default `false`
187    /// preserves the v1 column-only contract. Best-effort: source
188    /// tables with no declared PK fall through to the column-only DDL.
189    /// Ignored in `--query` mode (no canonical source table to inspect).
190    pub preserve_pk: bool,
191    /// What to do if the target table already exists with rows.
192    pub if_exists: IfExists,
193    /// User-supplied conflict-key column list, overriding the PK
194    /// auto-detection used by [`IfExists::Skip`] / [`IfExists::Upsert`].
195    /// Empty = fall back to the destination's declared primary key.
196    /// Validated against the source column shape during copy preflight.
197    /// Ignored for non-conflict strategies (`Error`/`Append`/`Truncate`).
198    pub conflict_key: Vec<String>,
199    /// Wrap the entire copy in a single target-side transaction.
200    pub atomic: bool,
201    /// How many rows per source-side page / target-side INSERT batch.
202    pub batch_size: usize,
203    /// Whether to route batches through the destination backend's
204    /// native bulk loader. Default [`BulkMode::Off`] preserves
205    /// Phase 1 behaviour.
206    pub bulk_mode: BulkMode,
207    /// Wire format for the Postgres `COPY` bulk path. Other backends
208    /// ignore this field. Default [`CopyFormat::Text`].
209    pub copy_format: CopyFormat,
210    /// Whether `copy_rows` should emit per-event diagnostics on
211    /// stderr (currently: a one-line "using native path" notice when
212    /// the bulk path is selected, plus the standard fallback warning
213    /// in [`BulkMode::Auto`]). Mirrors the CLI `--verbose` flag.
214    pub verbose: bool,
215    /// Optional progress callback invoked after each batch with the
216    /// running row count.
217    pub progress: Option<Box<dyn Fn(usize) + Send>>,
218}
219
220impl Default for CopyOptions {
221    fn default() -> Self {
222        Self {
223            source: CopySource::Table(String::new()),
224            create_table: false,
225            preserve_pk: false,
226            if_exists: IfExists::Error,
227            conflict_key: Vec::new(),
228            atomic: false,
229            batch_size: 1000,
230            bulk_mode: BulkMode::Off,
231            copy_format: CopyFormat::Text,
232            verbose: false,
233            progress: None,
234        }
235    }
236}
237
238impl std::fmt::Debug for CopyOptions {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        f.debug_struct("CopyOptions")
241            .field("source", &self.source)
242            .field("create_table", &self.create_table)
243            .field("preserve_pk", &self.preserve_pk)
244            .field("if_exists", &self.if_exists)
245            .field("conflict_key", &self.conflict_key)
246            .field("atomic", &self.atomic)
247            .field("batch_size", &self.batch_size)
248            .field("bulk_mode", &self.bulk_mode)
249            .field("verbose", &self.verbose)
250            .field("progress", &self.progress.is_some())
251            .finish()
252    }
253}
254
255/// Stream rows from `src` to `dst` per `opts`. Returns the number of
256/// rows inserted into the target.
257pub fn copy_rows(
258    src: &mut dyn Connection,
259    src_backend: Backend,
260    dst: &mut dyn Connection,
261    dst_backend: Backend,
262    opts: &CopyOptions,
263) -> Result<usize, SqlError> {
264    let target_table = opts.source.target_table().to_string();
265    if target_table.is_empty() {
266        return Err(SqlError::QueryFailed(
267            "copy: target table name is empty".into(),
268        ));
269    }
270    if opts.batch_size == 0 {
271        return Err(SqlError::QueryFailed(
272            "copy: batch_size must be greater than zero".into(),
273        ));
274    }
275
276    let target_exists = table_exists(dst, &target_table)?;
277
278    if !target_exists && !opts.create_table {
279        return Err(SqlError::QueryFailed(format!(
280            "Target table '{target_table}' does not exist on destination. \
281             Pass --create-table to create it from the source schema."
282        )));
283    }
284
285    // Pre-flight conflict check: error strategy refuses if the target
286    // already holds at least one row. Source is never touched in this
287    // case — fail fast.
288    if target_exists
289        && opts.if_exists == IfExists::Error
290        && table_has_rows(dst, &target_table, dst_backend)?
291    {
292        return Err(SqlError::QueryFailed(format!(
293            "Target table '{target_table}' already contains rows. \
294             Pass --if-exists append, --if-exists truncate, --if-exists skip, \
295             --if-exists upsert, or empty the table first."
296        )));
297    }
298
299    // `--key` is a conflict-only flag: warn once on stderr if the user
300    // set it alongside a strategy that doesn't resolve conflicts. Same
301    // shape as the `--bulk-native` + conflict warning above.
302    if !opts.conflict_key.is_empty() && !opts.if_exists.resolves_conflicts() {
303        eprintln!(
304            "[ferrule] copy: --key is only meaningful with --if-exists skip|upsert; \
305             ignoring for --if-exists {}.",
306            if_exists_name(opts.if_exists)
307        );
308    }
309
310    // Conflict-key resolution for Skip/Upsert. Centralised in
311    // `resolve_conflict_key` so the same code path serves the
312    // destination-PK auto-detection and the `--key COL[,COL...]` user
313    // override. An empty override means "fall back to PK detection."
314    let pk_columns: Vec<String> =
315        resolve_conflict_key(dst, &target_table, opts.if_exists, &opts.conflict_key)?;
316
317    // Bulk loaders carry no conflict semantics (PG `COPY` ignores
318    // conflicts, MSSQL bulk has no MERGE, MySQL `LOAD DATA` has its
319    // own IGNORE/REPLACE keywords, Oracle Batch is straight array DML).
320    // When the user opts into Skip or Upsert we force the dispatcher
321    // onto the generic INSERT path so the conflict SQL is actually
322    // emitted. Warn once at the top of the copy rather than per batch.
323    if opts.if_exists.resolves_conflicts() && opts.bulk_mode != BulkMode::Off {
324        eprintln!(
325            "[ferrule] bulk: --if-exists {} requires the generic INSERT path; \
326             ignoring --bulk-native for this copy.",
327            if_exists_name(opts.if_exists)
328        );
329    }
330
331    // First page from source — establishes the column shape.
332    let source_sql = opts.source.source_sql(src_backend);
333    let first_paged = crate::query_builder::apply_paging(
334        &source_sql,
335        Some(opts.batch_size),
336        Some(0),
337        src_backend,
338    )?;
339    let first_page = src.query(&first_paged)?;
340
341    if first_page.columns.is_empty() {
342        return Err(SqlError::QueryFailed(
343            "copy: source query returned no column metadata".into(),
344        ));
345    }
346    let columns: Vec<ColumnInfo> = first_page.columns.clone();
347
348    // Validate that every destination-side PK column is present in the
349    // source column shape. Cross-backend copies can hit case-sensitivity
350    // mismatches here (Oracle returns uppercase names; PG/MySQL return
351    // them as declared) — surface those as an actionable error rather
352    // than letting the conflict SQL fail at execute time with a less
353    // helpful driver message.
354    if !pk_columns.is_empty() {
355        let source_names: Vec<&str> = columns.iter().map(|c| c.name.as_str()).collect();
356        for pk in &pk_columns {
357            if !source_names.iter().any(|n| n == pk) {
358                return Err(SqlError::QueryFailed(format!(
359                    "Target PK column '{pk}' is not present in source columns \
360                     {source_names:?}. Cross-backend identifier case mismatches \
361                     can cause this — re-select with explicit aliases (e.g. \
362                     `SELECT id AS \"{pk}\" ...`)."
363                )));
364            }
365        }
366    }
367
368    // Translate DDL when creating the target table. With --preserve-pk
369    // set, the emitted DDL also carries a PRIMARY KEY clause derived
370    // from the source's declared PK (best-effort: source tables with
371    // no PK still get the v1 column-only DDL).
372    if !target_exists && opts.create_table {
373        let preserved_pk: Vec<String> = if opts.preserve_pk {
374            let src_table = match &opts.source {
375                CopySource::Table(t) => t.clone(),
376                // --query mode: the source isn't a single table, so
377                // there's no canonical PK to lift. Skip silently.
378                CopySource::Query { .. } => String::new(),
379            };
380            if src_table.is_empty() {
381                Vec::new()
382            } else {
383                src.primary_key(None, &src_table).unwrap_or_default()
384            }
385        } else {
386            Vec::new()
387        };
388        let ddl = translate_ddl(&target_table, &columns, dst_backend, &preserved_pk);
389        dst.execute(&ddl)?;
390    }
391
392    // --atomic wraps the entire copy in one outer transaction. The
393    // truncate strategy uses a separate, *short* inner transaction
394    // around just the DELETE + first batch (handled inside run_copy)
395    // so it does not hold locks / redo / wal for the whole copy.
396    let outer_tx_opened = if opts.atomic {
397        begin_transaction(dst, dst_backend)
398    } else {
399        false
400    };
401
402    let result = run_copy(
403        src,
404        src_backend,
405        dst,
406        dst_backend,
407        opts,
408        &source_sql,
409        &target_table,
410        &columns,
411        &pk_columns,
412        target_exists,
413        first_page.rows,
414    );
415
416    if outer_tx_opened {
417        match &result {
418            Ok(_) => {
419                // Commit; if commit fails, surface that as the error.
420                commit_transaction(dst, dst_backend)?;
421            }
422            Err(_) => {
423                // Best-effort rollback; ignore secondary errors.
424                let _ = rollback_transaction(dst, dst_backend);
425            }
426        }
427    } else if result.is_ok() && backend_needs_explicit_commit(dst_backend) {
428        // L1: Oracle has no client-side autocommit (the oracle crate
429        // requires an explicit `COMMIT`). The other backends auto-
430        // commit each `execute()` by default. Without an explicit
431        // commit here, rows inserted by run_copy() would silently
432        // roll back at session close — making the function appear
433        // successful but losing the data. Issue the commit when no
434        // outer transaction was opened (the --atomic branch above
435        // already handles its own commit).
436        commit_transaction(dst, dst_backend)?;
437    }
438
439    result
440}
441
442/// Resolve the conflict-key column list for `--if-exists skip|upsert`.
443///
444/// Returns `Vec::new()` for non-conflict strategies. Otherwise returns
445/// the caller-supplied `override_` (if non-empty), else the
446/// destination's declared primary key. A conflict strategy with no
447/// available key surfaces a hard error before the source SELECT runs.
448///
449/// Single insertion point for both PK auto-detection and the (Phase 3)
450/// `--key COL[,COL...]` user override.
451fn resolve_conflict_key(
452    dst: &mut dyn Connection,
453    target_table: &str,
454    if_exists: IfExists,
455    override_: &[String],
456) -> Result<Vec<String>, SqlError> {
457    if !if_exists.resolves_conflicts() {
458        return Ok(Vec::new());
459    }
460    if !override_.is_empty() {
461        return Ok(override_.to_vec());
462    }
463    let pk = dst.primary_key(None, target_table)?;
464    if pk.is_empty() {
465        return Err(SqlError::QueryFailed(format!(
466            "Target table '{target_table}' has no declared primary key — \
467             --if-exists {} requires one. Declare a PK on the destination \
468             table, pass --preserve-pk when creating it, or supply \
469             --key COL[,COL...] to override the conflict columns.",
470            if_exists_name(if_exists)
471        )));
472    }
473    Ok(pk)
474}
475
476/// Backends whose client driver does *not* auto-commit each
477/// `execute()` call, so `copy_rows` must issue an explicit `COMMIT`
478/// at the end of a successful copy (when no outer transaction is in
479/// play). Currently only Oracle behaves this way; every other
480/// supported backend defaults to autocommit.
481pub(crate) fn backend_needs_explicit_commit(backend: Backend) -> bool {
482    #[cfg(feature = "oracle")]
483    {
484        if matches!(backend, Backend::Oracle) {
485            return true;
486        }
487    }
488    let _ = backend;
489    false
490}
491
492#[allow(clippy::too_many_arguments)]
493fn run_copy(
494    src: &mut dyn Connection,
495    src_backend: Backend,
496    dst: &mut dyn Connection,
497    dst_backend: Backend,
498    opts: &CopyOptions,
499    source_sql: &str,
500    target_table: &str,
501    columns: &[ColumnInfo],
502    pk_columns: &[String],
503    target_exists: bool,
504    first_rows: Vec<Vec<Value>>,
505) -> Result<usize, SqlError> {
506    let quoted_table = quote_identifier(target_table, dst_backend);
507    let quoted_cols: Vec<String> = columns
508        .iter()
509        .map(|c| quote_identifier(&c.name, dst_backend))
510        .collect();
511    let cols_clause = quoted_cols.join(", ");
512
513    // Inner mini-transaction wraps DELETE + first batch when the
514    // truncate strategy is in play AND we're not already inside the
515    // outer --atomic transaction. The inner txn commits as soon as the
516    // first batch lands, so subsequent batches do not hold locks.
517    let need_inner_tx = target_exists && opts.if_exists == IfExists::Truncate && !opts.atomic;
518    let inner_tx_opened = if need_inner_tx {
519        begin_transaction(dst, dst_backend)
520    } else {
521        false
522    };
523
524    // Prologue: DELETE (if truncate) + first INSERT batch.
525    let prologue = run_truncate_and_first_batch(
526        dst,
527        dst_backend,
528        opts,
529        target_exists,
530        target_table,
531        columns,
532        pk_columns,
533        &quoted_table,
534        &cols_clause,
535        &first_rows,
536    );
537
538    let first_len = match prologue {
539        Ok(n) => {
540            if inner_tx_opened {
541                // Commit the short truncate txn before continuing.
542                commit_transaction(dst, dst_backend)?;
543            }
544            n
545        }
546        Err(e) => {
547            if inner_tx_opened {
548                let _ = rollback_transaction(dst, dst_backend);
549            }
550            return Err(e);
551        }
552    };
553
554    if first_len > 0
555        && let Some(cb) = &opts.progress
556    {
557        cb(first_len);
558    }
559
560    let mut total = first_len;
561    let mut offset = first_len;
562
563    // Continue paging only if the first page was full.
564    if first_len >= opts.batch_size {
565        loop {
566            let paged = crate::query_builder::apply_paging(
567                source_sql,
568                Some(opts.batch_size),
569                Some(offset),
570                src_backend,
571            )?;
572            let page = src.query(&paged)?;
573            if page.rows.is_empty() {
574                break;
575            }
576            let fetched = page.rows.len();
577            insert_batch(
578                dst,
579                target_table,
580                columns,
581                pk_columns,
582                &quoted_table,
583                &cols_clause,
584                &page.rows,
585                dst_backend,
586                opts.if_exists,
587                opts.bulk_mode,
588                opts.copy_format,
589                opts.verbose,
590            )?;
591            total += fetched;
592            offset += fetched;
593            if let Some(cb) = &opts.progress {
594                cb(total);
595            }
596            if fetched < opts.batch_size {
597                break;
598            }
599        }
600    }
601
602    Ok(total)
603}
604
605#[allow(clippy::too_many_arguments)]
606fn run_truncate_and_first_batch(
607    dst: &mut dyn Connection,
608    dst_backend: Backend,
609    opts: &CopyOptions,
610    target_exists: bool,
611    target_table: &str,
612    columns: &[ColumnInfo],
613    pk_columns: &[String],
614    quoted_table: &str,
615    cols_clause: &str,
616    first_rows: &[Vec<Value>],
617) -> Result<usize, SqlError> {
618    if target_exists && opts.if_exists == IfExists::Truncate {
619        let sql = format!("DELETE FROM {quoted_table}");
620        dst.execute(&sql)?;
621    }
622    if !first_rows.is_empty() {
623        insert_batch(
624            dst,
625            target_table,
626            columns,
627            pk_columns,
628            quoted_table,
629            cols_clause,
630            first_rows,
631            dst_backend,
632            opts.if_exists,
633            opts.bulk_mode,
634            opts.copy_format,
635            opts.verbose,
636        )?;
637    }
638    Ok(first_rows.len())
639}
640
641/// Insert `rows` into the destination, choosing between the
642/// backend's native bulk loader and the generic INSERT path per
643/// `bulk_mode`. The dispatcher is shared by the truncate prologue
644/// (first batch) and the streaming loop, so a single copy never
645/// mixes the two paths within one run.
646#[allow(clippy::too_many_arguments)]
647pub(crate) fn insert_batch(
648    dst: &mut dyn Connection,
649    target_table: &str,
650    columns: &[ColumnInfo],
651    pk_columns: &[String],
652    quoted_table: &str,
653    cols_clause: &str,
654    rows: &[Vec<Value>],
655    dst_backend: Backend,
656    if_exists: IfExists,
657    bulk_mode: BulkMode,
658    copy_format: CopyFormat,
659    verbose: bool,
660) -> Result<(), SqlError> {
661    if rows.is_empty() {
662        return Ok(());
663    }
664    // Conflict-resolution strategies always use the generic path —
665    // the bulk loaders carry no MERGE/ON CONFLICT semantics. The
666    // top-of-copy warning in `copy_rows` already informed the user.
667    let bulk_eligible =
668        matches!(bulk_mode, BulkMode::Auto | BulkMode::On) && !if_exists.resolves_conflicts();
669    if bulk_eligible {
670        let target = BulkInsert {
671            table: target_table,
672            columns,
673            rows,
674            copy_format,
675        };
676        match dst.bulk_insert_rows(target) {
677            Ok(_) => {
678                if verbose {
679                    eprintln!(
680                        "[ferrule] bulk: inserted {} rows via {} native path",
681                        rows.len(),
682                        dst_backend.name()
683                    );
684                }
685                return Ok(());
686            }
687            Err(SqlError::BulkUnavailable(reason)) => {
688                if bulk_mode == BulkMode::On {
689                    return Err(SqlError::QueryFailed(format!(
690                        "--bulk-native=on but {} bulk path unavailable: {reason}. \
691                         Re-run with --bulk-native=auto to fall back to generic INSERT, \
692                         or --bulk-native=off to disable bulk entirely.",
693                        dst_backend.name()
694                    )));
695                }
696                // Auto: warn once per batch, then fall through. Per-batch
697                // is intentional — multi-batch copies on the same broken
698                // path would otherwise silently lose context.
699                eprintln!(
700                    "[ferrule] bulk: {} path unavailable: {reason}; using generic INSERT",
701                    dst_backend.name()
702                );
703            }
704            Err(other) => return Err(other),
705        }
706    }
707    for sql in build_insert_sql(
708        quoted_table,
709        cols_clause,
710        rows,
711        dst_backend,
712        columns,
713        if_exists,
714        pk_columns,
715    ) {
716        dst.execute(&sql)?;
717    }
718    Ok(())
719}
720
721/// Build one or more INSERT statements for `rows`, chunking and using
722/// backend-appropriate syntax. Returns an empty vec for empty input.
723///
724/// - Oracle uses `INSERT ALL ... SELECT 1 FROM DUAL` (multi-row
725///   `VALUES (..), (..)` is not valid Oracle syntax).
726/// - MSSQL caps each statement at 1000 rows (T-SQL row-constructor
727///   limit; error 10738 above that).
728/// - Oracle caps each statement at 250 rows (defensive — practical
729///   SQL-text-size and `ORA-01795` ceilings tighten quickly past a
730///   few hundred rows of literal values).
731/// - Postgres / MySQL / SQLite emit a single statement.
732pub(crate) fn build_insert_sql(
733    quoted_table: &str,
734    cols_clause: &str,
735    rows: &[Vec<Value>],
736    dst_backend: Backend,
737    columns: &[ColumnInfo],
738    if_exists: IfExists,
739    pk_columns: &[String],
740) -> Vec<String> {
741    if rows.is_empty() {
742        return Vec::new();
743    }
744    let chunk_size = per_statement_row_cap(dst_backend)
745        .unwrap_or(rows.len())
746        .max(1);
747    rows.chunks(chunk_size)
748        .map(|chunk| {
749            build_one_insert(
750                quoted_table,
751                cols_clause,
752                chunk,
753                dst_backend,
754                columns,
755                if_exists,
756                pk_columns,
757            )
758        })
759        .collect()
760}
761
762#[allow(clippy::too_many_arguments)]
763fn build_one_insert(
764    quoted_table: &str,
765    cols_clause: &str,
766    rows: &[Vec<Value>],
767    dst_backend: Backend,
768    columns: &[ColumnInfo],
769    if_exists: IfExists,
770    pk_columns: &[String],
771) -> String {
772    // Conflict-resolution dispatches to a per-backend MERGE / ON
773    // CONFLICT / ODKU shape. Non-conflict paths fall through to the
774    // existing plain INSERT (or INSERT ALL) code below.
775    if if_exists.resolves_conflicts() && !pk_columns.is_empty() {
776        return build_conflict_insert(
777            quoted_table,
778            cols_clause,
779            rows,
780            dst_backend,
781            columns,
782            if_exists,
783            pk_columns,
784        );
785    }
786    match dst_backend {
787        #[cfg(feature = "oracle")]
788        Backend::Oracle => {
789            let mut sql = String::from("INSERT ALL");
790            for row in rows {
791                let cells: Vec<String> = row.iter().map(|v| render_value(v, dst_backend)).collect();
792                sql.push_str(&format!(
793                    " INTO {quoted_table} ({cols_clause}) VALUES ({})",
794                    cells.join(", ")
795                ));
796            }
797            sql.push_str(" SELECT 1 FROM DUAL");
798            sql
799        }
800        _ => {
801            let values: Vec<String> = rows
802                .iter()
803                .map(|row| {
804                    let cells: Vec<String> =
805                        row.iter().map(|v| render_value(v, dst_backend)).collect();
806                    format!("({})", cells.join(", "))
807                })
808                .collect();
809            format!(
810                "INSERT INTO {quoted_table} ({cols_clause}) VALUES {}",
811                values.join(", ")
812            )
813        }
814    }
815}
816
817/// Build a single-statement INSERT with backend-specific conflict
818/// resolution: PG/SQLite `ON CONFLICT`, MySQL `INSERT IGNORE` /
819/// `ON DUPLICATE KEY UPDATE`, MSSQL/Oracle `MERGE`.
820///
821/// Pre-conditions enforced by the caller: `pk_columns` is non-empty,
822/// `columns` matches each row's positional shape, and `if_exists` is
823/// one of `Skip` / `Upsert`. The per-statement row cap (MSSQL: 1000;
824/// Oracle: 250) still applies — chunking happens in `build_insert_sql`.
825#[allow(clippy::too_many_arguments)]
826fn build_conflict_insert(
827    quoted_table: &str,
828    cols_clause: &str,
829    rows: &[Vec<Value>],
830    dst_backend: Backend,
831    columns: &[ColumnInfo],
832    if_exists: IfExists,
833    pk_columns: &[String],
834) -> String {
835    // PK columns must be quoted with the destination's identifier
836    // rules — emitted in WHERE / ON / RETURNING positions.
837    let quoted_pks: Vec<String> = pk_columns
838        .iter()
839        .map(|n| quote_identifier(n, dst_backend))
840        .collect();
841    // Non-PK column names, used for the UPDATE SET assignment list.
842    let non_pk_quoted: Vec<String> = columns
843        .iter()
844        .filter(|c| !pk_columns.iter().any(|pk| pk == &c.name))
845        .map(|c| quote_identifier(&c.name, dst_backend))
846        .collect();
847    match dst_backend {
848        #[cfg(feature = "mssql")]
849        Backend::MsSql => build_mssql_merge(
850            quoted_table,
851            cols_clause,
852            rows,
853            columns,
854            if_exists,
855            &quoted_pks,
856            &non_pk_quoted,
857            dst_backend,
858        ),
859        #[cfg(feature = "oracle")]
860        Backend::Oracle => build_oracle_merge(
861            quoted_table,
862            cols_clause,
863            rows,
864            columns,
865            if_exists,
866            &quoted_pks,
867            &non_pk_quoted,
868            dst_backend,
869        ),
870        #[cfg(feature = "mysql")]
871        Backend::MySql => build_mysql_conflict(
872            quoted_table,
873            cols_clause,
874            rows,
875            if_exists,
876            &non_pk_quoted,
877            dst_backend,
878        ),
879        // Postgres + SQLite share ON CONFLICT syntax.
880        _ => build_pg_sqlite_on_conflict(
881            quoted_table,
882            cols_clause,
883            rows,
884            if_exists,
885            &quoted_pks,
886            &non_pk_quoted,
887            dst_backend,
888        ),
889    }
890}
891
892fn build_pg_sqlite_on_conflict(
893    quoted_table: &str,
894    cols_clause: &str,
895    rows: &[Vec<Value>],
896    if_exists: IfExists,
897    quoted_pks: &[String],
898    non_pk_quoted: &[String],
899    dst_backend: Backend,
900) -> String {
901    let values = render_values_vec(rows, dst_backend);
902    let pk_list = quoted_pks.join(", ");
903    let conflict_clause = if if_exists == IfExists::Skip || non_pk_quoted.is_empty() {
904        // Upsert on a PK-only table collapses to DO NOTHING — there
905        // is nothing to update.
906        format!("ON CONFLICT ({pk_list}) DO NOTHING")
907    } else {
908        let assignments: Vec<String> = non_pk_quoted
909            .iter()
910            .map(|c| format!("{c} = EXCLUDED.{c}"))
911            .collect();
912        format!(
913            "ON CONFLICT ({pk_list}) DO UPDATE SET {}",
914            assignments.join(", ")
915        )
916    };
917    format!(
918        "INSERT INTO {quoted_table} ({cols_clause}) VALUES {} {conflict_clause}",
919        values.join(", ")
920    )
921}
922
/// Render the MySQL flavour of a conflict-resolving insert.
///
/// * `Upsert` with at least one non-key column becomes
///   `INSERT … ON DUPLICATE KEY UPDATE col = VALUES(col), …`.
/// * Everything else — `Skip`, or `Upsert` on a key-only table where
///   there is nothing to update — becomes `INSERT IGNORE …`.
#[cfg(feature = "mysql")]
fn build_mysql_conflict(
    quoted_table: &str,
    cols_clause: &str,
    rows: &[Vec<Value>],
    if_exists: IfExists,
    non_pk_quoted: &[String],
    dst_backend: Backend,
) -> String {
    let tuples = render_values_vec(rows, dst_backend).join(", ");

    if if_exists == IfExists::Upsert && !non_pk_quoted.is_empty() {
        let set_list = non_pk_quoted
            .iter()
            .map(|col| format!("{col} = VALUES({col})"))
            .collect::<Vec<_>>()
            .join(", ");
        return format!(
            "INSERT INTO {quoted_table} ({cols_clause}) VALUES {tuples} \
             ON DUPLICATE KEY UPDATE {set_list}"
        );
    }

    format!("INSERT IGNORE INTO {quoted_table} ({cols_clause}) VALUES {tuples}")
}
957
/// Render a T-SQL `MERGE` that treats the batch as an inline
/// `VALUES` table aliased `src` and the target as `dst`.
///
/// Rows are joined on the conflict-key columns. `WHEN MATCHED THEN
/// UPDATE` is emitted only for `Upsert` with at least one non-key
/// column; `WHEN NOT MATCHED THEN INSERT` is always emitted. The
/// statement ends with `;`, which SQL Server requires after `MERGE`.
#[cfg(feature = "mssql")]
#[allow(clippy::too_many_arguments)]
fn build_mssql_merge(
    quoted_table: &str,
    cols_clause: &str,
    rows: &[Vec<Value>],
    columns: &[ColumnInfo],
    if_exists: IfExists,
    quoted_pks: &[String],
    non_pk_quoted: &[String],
    dst_backend: Backend,
) -> String {
    // Column names of the inline source table, in row order.
    let src_columns: Vec<String> = columns
        .iter()
        .map(|col| quote_identifier(&col.name, dst_backend))
        .collect();
    let src_column_list = src_columns.join(", ");
    let tuples = render_values_vec(rows, dst_backend).join(", ");

    let key_match = quoted_pks
        .iter()
        .map(|pk| format!("dst.{pk} = src.{pk}"))
        .collect::<Vec<_>>()
        .join(" AND ");
    let src_refs = src_columns
        .iter()
        .map(|col| format!("src.{col}"))
        .collect::<Vec<_>>()
        .join(", ");

    // Clauses are assembled in order and glued with single spaces.
    let mut clauses = vec![
        format!("MERGE INTO {quoted_table} AS dst"),
        format!("USING (VALUES {tuples}) AS src ({src_column_list})"),
        format!("ON {key_match}"),
    ];
    if if_exists == IfExists::Upsert && !non_pk_quoted.is_empty() {
        let set_list = non_pk_quoted
            .iter()
            .map(|col| format!("{col} = src.{col}"))
            .collect::<Vec<_>>()
            .join(", ");
        clauses.push(format!("WHEN MATCHED THEN UPDATE SET {set_list}"));
    }
    clauses.push(format!(
        "WHEN NOT MATCHED THEN INSERT ({cols_clause}) VALUES ({src_refs});"
    ));
    clauses.join(" ")
}
1007
/// Render an Oracle `MERGE` whose source is the batch expressed as
/// `SELECT … FROM dual UNION ALL SELECT … FROM dual …`.
///
/// Every cell is aliased with its (quoted) column name so the `src`
/// sub-query exposes the same column names as the target. Rows are
/// joined on the conflict-key columns. `WHEN MATCHED THEN UPDATE` is
/// emitted only for `Upsert` with at least one non-key column;
/// `WHEN NOT MATCHED THEN INSERT` is always emitted.
#[cfg(feature = "oracle")]
#[allow(clippy::too_many_arguments)]
fn build_oracle_merge(
    quoted_table: &str,
    cols_clause: &str,
    rows: &[Vec<Value>],
    columns: &[ColumnInfo],
    if_exists: IfExists,
    quoted_pks: &[String],
    non_pk_quoted: &[String],
    dst_backend: Backend,
) -> String {
    let src_columns: Vec<String> = columns
        .iter()
        .map(|col| quote_identifier(&col.name, dst_backend))
        .collect();

    // Oracle has no multi-row VALUES constructor for a MERGE source,
    // so each row becomes its own `SELECT … FROM dual`.
    let mut selects: Vec<String> = Vec::with_capacity(rows.len());
    for row in rows {
        let aliased = row
            .iter()
            .zip(&src_columns)
            .map(|(cell, alias)| format!("{} AS {alias}", render_value(cell, dst_backend)))
            .collect::<Vec<_>>()
            .join(", ");
        selects.push(format!("SELECT {aliased} FROM dual"));
    }
    let source_query = selects.join(" UNION ALL ");

    let key_match = quoted_pks
        .iter()
        .map(|pk| format!("dst.{pk} = src.{pk}"))
        .collect::<Vec<_>>()
        .join(" AND ");
    let src_refs = src_columns
        .iter()
        .map(|col| format!("src.{col}"))
        .collect::<Vec<_>>()
        .join(", ");

    // Clauses are assembled in order and glued with single spaces.
    let mut clauses = vec![
        format!("MERGE INTO {quoted_table} dst"),
        format!("USING ({source_query}) src"),
        format!("ON ({key_match})"),
    ];
    if if_exists == IfExists::Upsert && !non_pk_quoted.is_empty() {
        let set_list = non_pk_quoted
            .iter()
            .map(|col| format!("dst.{col} = src.{col}"))
            .collect::<Vec<_>>()
            .join(", ");
        clauses.push(format!("WHEN MATCHED THEN UPDATE SET {set_list}"));
    }
    clauses.push(format!(
        "WHEN NOT MATCHED THEN INSERT ({cols_clause}) VALUES ({src_refs})"
    ));
    clauses.join(" ")
}
1068
1069fn render_values_vec(rows: &[Vec<Value>], dst_backend: Backend) -> Vec<String> {
1070    rows.iter()
1071        .map(|row| {
1072            let cells: Vec<String> = row.iter().map(|v| render_value(v, dst_backend)).collect();
1073            format!("({})", cells.join(", "))
1074        })
1075        .collect()
1076}
1077
1078fn if_exists_name(s: IfExists) -> &'static str {
1079    match s {
1080        IfExists::Error => "error",
1081        IfExists::Append => "append",
1082        IfExists::Truncate => "truncate",
1083        IfExists::Skip => "skip",
1084        IfExists::Upsert => "upsert",
1085    }
1086}
1087
1088fn per_statement_row_cap(backend: Backend) -> Option<usize> {
1089    match backend {
1090        #[cfg(feature = "mssql")]
1091        Backend::MsSql => Some(1000),
1092        #[cfg(feature = "oracle")]
1093        Backend::Oracle => Some(250),
1094        _ => None,
1095    }
1096}
1097
1098/// Backend-aware identifier quoting:
1099/// - Postgres / SQLite / Oracle / MSSQL: `"name"` (ANSI; MSSQL also
1100///   accepts `[name]`, but ANSI quotes work with QUOTED_IDENTIFIER ON,
1101///   the default).
1102/// - MySQL: backticks. The ANSI form requires `ANSI_QUOTES` SQL_MODE
1103///   which ferrule does not assume.
1104///
1105/// Public so `dump.rs` (and future schema-aware modules) can route
1106/// identifier quoting through a single backend-aware implementation.
1107pub fn quote_identifier(id: &str, backend: Backend) -> String {
1108    match backend {
1109        #[cfg(feature = "mysql")]
1110        Backend::MySql => format!("`{}`", id.replace('`', "``")),
1111        #[cfg(feature = "postgres")]
1112        Backend::Postgres => ansi_quote(id),
1113        #[cfg(feature = "sqlite")]
1114        Backend::Sqlite => ansi_quote(id),
1115        #[cfg(feature = "mssql")]
1116        Backend::MsSql => ansi_quote(id),
1117        #[cfg(feature = "oracle")]
1118        Backend::Oracle => ansi_quote(id),
1119    }
1120}
1121
/// Wrap `id` in ANSI double quotes, doubling every embedded `"` so the
/// result is one well-formed quoted identifier.
fn ansi_quote(id: &str) -> String {
    // +2 for the surrounding quotes; embedded quotes may grow it further.
    let mut quoted = String::with_capacity(id.len() + 2);
    quoted.push('"');
    for ch in id.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
1125
1126/// Translate source column metadata into a `CREATE TABLE IF NOT EXISTS`
1127/// statement for the destination backend.
1128///
1129/// When `pk` is non-empty, a `PRIMARY KEY (...)` clause is appended
1130/// after the column list — this is what `--preserve-pk` wires up.
1131/// Pass `&[]` for the v1 column-only DDL.
1132pub fn translate_ddl(table: &str, cols: &[ColumnInfo], dst: Backend, pk: &[String]) -> String {
1133    let quoted_table = quote_identifier(table, dst);
1134    let col_defs: Vec<String> = cols
1135        .iter()
1136        .map(|c| {
1137            let name = quote_identifier(&c.name, dst);
1138            let ty = translate_type(c.type_hint, dst);
1139            let null_clause = if c.nullable { "" } else { " NOT NULL" };
1140            format!("{name} {ty}{null_clause}")
1141        })
1142        .collect();
1143    let pk_clause = if pk.is_empty() {
1144        String::new()
1145    } else {
1146        let quoted_pks: Vec<String> = pk.iter().map(|c| quote_identifier(c, dst)).collect();
1147        format!(", PRIMARY KEY ({})", quoted_pks.join(", "))
1148    };
1149    format!(
1150        "CREATE TABLE IF NOT EXISTS {quoted_table} ({}{pk_clause})",
1151        col_defs.join(", ")
1152    )
1153}
1154
1155/// Map a unified [`TypeHint`] to a SQL type for the destination
1156/// backend. The mapping favours portability over fidelity:
1157/// `Decimal` collapses to a `(38,10)` default on backends that need
1158/// precision, `Array` stores as a JSON-ish text, and `Other`/`Null`
1159/// fall back to the backend's "wide string" type.
1160pub fn translate_type(hint: TypeHint, dst: Backend) -> &'static str {
1161    match dst {
1162        #[cfg(feature = "postgres")]
1163        Backend::Postgres => match hint {
1164            TypeHint::Bool => "BOOLEAN",
1165            TypeHint::Int64 => "BIGINT",
1166            TypeHint::Float64 => "DOUBLE PRECISION",
1167            TypeHint::Decimal => "NUMERIC",
1168            TypeHint::Bytes => "BYTEA",
1169            TypeHint::Date => "DATE",
1170            TypeHint::Time => "TIME",
1171            TypeHint::DateTime => "TIMESTAMP",
1172            TypeHint::DateTimeTz => "TIMESTAMPTZ",
1173            TypeHint::Json | TypeHint::Array => "JSONB",
1174            TypeHint::Uuid => "UUID",
1175            TypeHint::String | TypeHint::Other | TypeHint::Null => "TEXT",
1176        },
1177        #[cfg(feature = "mysql")]
1178        Backend::MySql => match hint {
1179            TypeHint::Bool => "TINYINT(1)",
1180            TypeHint::Int64 => "BIGINT",
1181            TypeHint::Float64 => "DOUBLE",
1182            TypeHint::Decimal => "DECIMAL(38,10)",
1183            TypeHint::Bytes => "LONGBLOB",
1184            TypeHint::Date => "DATE",
1185            TypeHint::Time => "TIME",
1186            TypeHint::DateTime | TypeHint::DateTimeTz => "DATETIME",
1187            TypeHint::Json | TypeHint::Array => "JSON",
1188            TypeHint::Uuid => "CHAR(36)",
1189            TypeHint::String | TypeHint::Other | TypeHint::Null => "TEXT",
1190        },
1191        #[cfg(feature = "mssql")]
1192        Backend::MsSql => match hint {
1193            TypeHint::Bool => "BIT",
1194            TypeHint::Int64 => "BIGINT",
1195            TypeHint::Float64 => "FLOAT",
1196            TypeHint::Decimal => "DECIMAL(38,10)",
1197            TypeHint::Bytes => "VARBINARY(MAX)",
1198            TypeHint::Date => "DATE",
1199            TypeHint::Time => "TIME",
1200            TypeHint::DateTime => "DATETIME2",
1201            TypeHint::DateTimeTz => "DATETIMEOFFSET",
1202            TypeHint::Json
1203            | TypeHint::Array
1204            | TypeHint::String
1205            | TypeHint::Other
1206            | TypeHint::Null => "NVARCHAR(MAX)",
1207            TypeHint::Uuid => "UNIQUEIDENTIFIER",
1208        },
1209        #[cfg(feature = "sqlite")]
1210        Backend::Sqlite => match hint {
1211            TypeHint::Bool | TypeHint::Int64 => "INTEGER",
1212            TypeHint::Float64 => "REAL",
1213            TypeHint::Decimal => "NUMERIC",
1214            TypeHint::Bytes => "BLOB",
1215            // SQLite stores everything else as TEXT under dynamic typing.
1216            _ => "TEXT",
1217        },
1218        #[cfg(feature = "oracle")]
1219        Backend::Oracle => match hint {
1220            TypeHint::Bool => "NUMBER(1)",
1221            TypeHint::Int64 => "NUMBER(19)",
1222            TypeHint::Float64 => "BINARY_DOUBLE",
1223            TypeHint::Decimal => "NUMBER",
1224            TypeHint::Bytes => "BLOB",
1225            TypeHint::Date => "DATE",
1226            TypeHint::Time | TypeHint::DateTime => "TIMESTAMP",
1227            TypeHint::DateTimeTz => "TIMESTAMP WITH TIME ZONE",
1228            TypeHint::Json
1229            | TypeHint::Array
1230            | TypeHint::String
1231            | TypeHint::Other
1232            | TypeHint::Null => "CLOB",
1233            TypeHint::Uuid => "RAW(16)",
1234        },
1235    }
1236}
1237
1238fn table_exists(conn: &mut dyn Connection, table: &str) -> Result<bool, SqlError> {
1239    let tables = conn.list_tables(None)?;
1240    Ok(tables.iter().any(|t| t.eq_ignore_ascii_case(table)))
1241}
1242
1243fn table_has_rows(
1244    conn: &mut dyn Connection,
1245    table: &str,
1246    backend: Backend,
1247) -> Result<bool, SqlError> {
1248    let qident = quote_identifier(table, backend);
1249    let sql = crate::query_builder::apply_paging(
1250        &format!("SELECT 1 FROM {qident}"),
1251        Some(1),
1252        None,
1253        backend,
1254    )?;
1255    let result = conn.query(&sql)?;
1256    Ok(!result.rows.is_empty())
1257}
1258
1259// -------------------------------------------------------------------
1260// Phase 3: schema-level (--all-tables) copy
1261// -------------------------------------------------------------------
1262
1263/// Options for `copy_all_tables`.
1264///
1265/// Patterns use simple shell-style globs (`*`, `?`); matching is
1266/// case-sensitive against the identifier shape the source returns
1267/// (Oracle uppercases unquoted identifiers). Tables are included when
1268/// they match *at least one* `--include` (or the include list is
1269/// empty) and match *no* `--exclude`.
1270pub struct AllTablesOptions {
1271    /// Repeatable include patterns. Empty = include everything.
1272    pub include: Vec<String>,
1273    /// Repeatable exclude patterns. Always applied after include.
1274    pub exclude: Vec<String>,
1275    /// Carry through to the per-table `CopyOptions`. Per-table
1276    /// `source` and `create_table` are derived inside this module.
1277    pub if_exists: IfExists,
1278    pub atomic: bool,
1279    pub batch_size: usize,
1280    pub bulk_mode: BulkMode,
1281    pub copy_format: CopyFormat,
1282    pub verbose: bool,
1283    pub create_table: bool,
1284    /// Preserve the source PK in emitted DDL (per-table lookup) when
1285    /// combined with `create_table`. See
1286    /// [`CopyOptions::preserve_pk`](CopyOptions::preserve_pk).
1287    pub preserve_pk: bool,
1288    /// User-supplied conflict-key columns applied to every table.
1289    /// Use sparingly with `--all-tables` — a single column list rarely
1290    /// makes sense across many tables. Empty = per-table PK detection.
1291    pub conflict_key: Vec<String>,
1292    /// If true, ignore cycle errors from `topo_sort` and copy in a
1293    /// deterministic-but-arbitrary order. The user must understand
1294    /// FK violations may surface as driver errors on first insert.
1295    pub no_fk_check: bool,
1296}
1297
1298impl Default for AllTablesOptions {
1299    fn default() -> Self {
1300        Self {
1301            include: Vec::new(),
1302            exclude: Vec::new(),
1303            if_exists: IfExists::Error,
1304            atomic: false,
1305            batch_size: 1000,
1306            bulk_mode: BulkMode::Off,
1307            copy_format: CopyFormat::Text,
1308            verbose: false,
1309            create_table: false,
1310            preserve_pk: false,
1311            conflict_key: Vec::new(),
1312            no_fk_check: false,
1313        }
1314    }
1315}
1316
/// Match `name` against `pattern` using shell-style `*` and `?`.
///
/// `*` matches zero or more characters; `?` matches exactly one
/// character (a full Unicode scalar value, not a single byte, so
/// `t?st` matches `tést`). Matching is case-sensitive and every other
/// pattern character matches itself literally. There is no escape
/// syntax: `*` and `?` in the pattern are always wildcards.
///
/// The matcher is iterative. On a mismatch it rewinds only to the most
/// recent `*` and lets that star swallow one more character, which
/// bounds the work at O(pattern × name) even for patterns with many
/// stars.
pub(crate) fn matches_glob(pattern: &str, name: &str) -> bool {
    let pat: Vec<char> = pattern.chars().collect();
    let text: Vec<char> = name.chars().collect();

    let mut p = 0usize; // cursor into `pat`
    let mut t = 0usize; // cursor into `text`
    // (index of the latest `*` in `pat`, index in `text` where that
    // star's match currently ends).
    let mut last_star: Option<(usize, usize)> = None;

    while t < text.len() {
        match pat.get(p).copied() {
            Some('*') => {
                // Start by letting the star match nothing.
                last_star = Some((p, t));
                p += 1;
            }
            Some(c) if c == '?' || c == text[t] => {
                p += 1;
                t += 1;
            }
            _ => match last_star {
                // Mismatch (or pattern exhausted): grow the latest
                // star by one character and retry from just after it.
                Some((star_p, star_t)) => {
                    p = star_p + 1;
                    t = star_t + 1;
                    last_star = Some((star_p, star_t + 1));
                }
                None => return false,
            },
        }
    }

    // `name` is consumed; only trailing stars may remain in the pattern.
    pat[p..].iter().all(|&c| c == '*')
}
1342
1343/// Discover tables on `src` and apply include/exclude filters.
1344/// Returns the filtered list in `list_tables` order (alphabetical for
1345/// most backends). Caller is expected to feed this into [`topo_sort`]
1346/// before issuing copies.
1347pub fn discover_tables(
1348    src: &mut dyn Connection,
1349    schema: Option<&str>,
1350    include: &[String],
1351    exclude: &[String],
1352) -> Result<Vec<String>, SqlError> {
1353    let raw = src.list_tables(schema)?;
1354    Ok(raw
1355        .into_iter()
1356        .filter(|t| {
1357            if !include.is_empty() && !include.iter().any(|p| matches_glob(p, t)) {
1358                return false;
1359            }
1360            if exclude.iter().any(|p| matches_glob(p, t)) {
1361                return false;
1362            }
1363            true
1364        })
1365        .collect())
1366}
1367
/// Failure value of [`topo_sort`]: the tables that could not be placed
/// in a load order, so callers can name them in their error message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CycleError {
    /// Tables left unordered — each one is on a foreign-key cycle or
    /// depends (directly or transitively) on a table that is. Sorted
    /// alphabetically so the message is deterministic.
    pub remaining: Vec<String>,
}
1376
1377/// Topologically sort `tables` so parents are loaded before
1378/// children, using `fks` as the edge set. Tables referenced as
1379/// parents but not present in `tables` (e.g. excluded from
1380/// `--all-tables` selection) are dropped from the dependency graph.
1381///
1382/// Returns the load order on success, or [`CycleError`] when the
1383/// remaining graph after Kahn's algorithm still has nodes (i.e.
1384/// every remaining node sits on a cycle).
1385///
1386/// The output preserves the relative order of independent tables
1387/// from `tables` so successive runs against the same fixture
1388/// produce identical orderings.
1389pub fn topo_sort(tables: &[String], fks: &[ForeignKey]) -> Result<Vec<String>, CycleError> {
1390    use std::collections::{BTreeSet, HashMap, HashSet};
1391
1392    let present: HashSet<String> = tables.iter().cloned().collect();
1393    let order_index: HashMap<String, usize> = tables
1394        .iter()
1395        .enumerate()
1396        .map(|(i, t)| (t.clone(), i))
1397        .collect();
1398    // Map child -> set of parents that are in `tables`.
1399    let mut parents: HashMap<String, BTreeSet<String>> = HashMap::new();
1400    let mut children: HashMap<String, Vec<String>> = HashMap::new();
1401    for t in tables {
1402        parents.entry(t.clone()).or_default();
1403    }
1404    for fk in fks {
1405        if !present.contains(&fk.child_table) || !present.contains(&fk.parent_table) {
1406            continue;
1407        }
1408        if fk.child_table == fk.parent_table {
1409            // Self-referential FK is not a multi-table cycle.
1410            continue;
1411        }
1412        if parents
1413            .get_mut(&fk.child_table)
1414            .unwrap()
1415            .insert(fk.parent_table.clone())
1416        {
1417            children
1418                .entry(fk.parent_table.clone())
1419                .or_default()
1420                .push(fk.child_table.clone());
1421        }
1422    }
1423
1424    // Kahn's algorithm. Tie-break on the original `tables` order so
1425    // the output is deterministic across runs against the same input.
1426    let mut ready: Vec<String> = tables
1427        .iter()
1428        .filter(|t| parents.get(*t).is_some_and(|p| p.is_empty()))
1429        .cloned()
1430        .collect();
1431    let mut out: Vec<String> = Vec::with_capacity(tables.len());
1432    let mut emitted: HashSet<String> = HashSet::new();
1433    while !ready.is_empty() {
1434        let t = ready.remove(0);
1435        out.push(t.clone());
1436        emitted.insert(t.clone());
1437        if let Some(kids) = children.get(&t).cloned() {
1438            for kid in kids {
1439                if let Some(ps) = parents.get_mut(&kid) {
1440                    ps.remove(&t);
1441                    if ps.is_empty() && !emitted.contains(&kid) && !ready.contains(&kid) {
1442                        let kid_idx = *order_index.get(&kid).unwrap_or(&usize::MAX);
1443                        let insert_at = ready
1444                            .iter()
1445                            .position(|r| *order_index.get(r).unwrap_or(&usize::MAX) > kid_idx)
1446                            .unwrap_or(ready.len());
1447                        ready.insert(insert_at, kid);
1448                    }
1449                }
1450            }
1451        }
1452    }
1453
1454    if out.len() == tables.len() {
1455        Ok(out)
1456    } else {
1457        let mut remaining: Vec<String> = tables
1458            .iter()
1459            .filter(|t| !emitted.contains(*t))
1460            .cloned()
1461            .collect();
1462        remaining.sort();
1463        Err(CycleError { remaining })
1464    }
1465}
1466
1467/// Copy every table from `src` to `dst` in FK-respecting order.
1468///
1469/// Discovers tables via [`discover_tables`], orders them via
1470/// [`topo_sort`] (or falls back to discovery order under
1471/// `opts.no_fk_check`), then loops [`copy_rows`] per table. Per-table
1472/// progress is printed to stderr in `[i/N] copying <table> (<rows> rows)…`
1473/// shape.
1474///
1475/// Returns the total number of rows passed through across all tables.
1476#[allow(clippy::too_many_arguments)]
1477pub fn copy_all_tables(
1478    src: &mut dyn Connection,
1479    src_backend: Backend,
1480    dst: &mut dyn Connection,
1481    dst_backend: Backend,
1482    opts: &AllTablesOptions,
1483) -> Result<usize, SqlError> {
1484    let tables = discover_tables(src, None, &opts.include, &opts.exclude)?;
1485    if tables.is_empty() {
1486        return Err(SqlError::QueryFailed(
1487            "copy --all-tables: no tables matched the include/exclude filters.".into(),
1488        ));
1489    }
1490
1491    let fks = src.list_foreign_keys(None)?;
1492    let ordered = match topo_sort(&tables, &fks) {
1493        Ok(o) => o,
1494        Err(cycle) if opts.no_fk_check => {
1495            eprintln!(
1496                "[ferrule] copy: FK cycle detected among {:?}; \
1497                 --no-fk-check is set, copying in discovery order.",
1498                cycle.remaining
1499            );
1500            tables.clone()
1501        }
1502        Err(cycle) => {
1503            return Err(SqlError::QueryFailed(format!(
1504                "copy --all-tables: foreign-key cycle prevents a strict load order \
1505                 (tables on the cycle: {:?}). Re-run with --no-fk-check to copy in \
1506                 discovery order; FK violations may then surface as driver errors.",
1507                cycle.remaining
1508            )));
1509        }
1510    };
1511
1512    let total_tables = ordered.len();
1513    let mut total_rows = 0usize;
1514    for (idx, table) in ordered.iter().enumerate() {
1515        if opts.verbose {
1516            eprintln!("[ferrule] [{}/{total_tables}] copying {table}…", idx + 1);
1517        }
1518        let per_table = CopyOptions {
1519            source: CopySource::Table(table.clone()),
1520            create_table: opts.create_table,
1521            preserve_pk: opts.preserve_pk,
1522            if_exists: opts.if_exists,
1523            conflict_key: opts.conflict_key.clone(),
1524            atomic: opts.atomic,
1525            batch_size: opts.batch_size,
1526            bulk_mode: opts.bulk_mode,
1527            copy_format: opts.copy_format,
1528            verbose: opts.verbose,
1529            progress: None,
1530        };
1531        let n = copy_rows(src, src_backend, dst, dst_backend, &per_table)?;
1532        if opts.verbose {
1533            eprintln!("[ferrule] [{}/{total_tables}] {table}: {n} rows", idx + 1);
1534        }
1535        total_rows += n;
1536    }
1537    Ok(total_rows)
1538}
1539
1540#[cfg(test)]
1541mod tests {
1542    use super::*;
1543    use crate::value::ColumnInfo;
1544
1545    fn col(name: &str, hint: TypeHint, nullable: bool) -> ColumnInfo {
1546        ColumnInfo {
1547            name: name.to_string(),
1548            type_hint: hint,
1549            nullable,
1550        }
1551    }
1552
1553    #[test]
1554    fn if_exists_parse_recognises_strategies() {
1555        assert_eq!(IfExists::parse("error"), Some(IfExists::Error));
1556        assert_eq!(IfExists::parse("APPEND"), Some(IfExists::Append));
1557        assert_eq!(IfExists::parse("Truncate"), Some(IfExists::Truncate));
1558        assert_eq!(IfExists::parse("skip"), Some(IfExists::Skip));
1559        assert_eq!(IfExists::parse("UPSERT"), Some(IfExists::Upsert));
1560        assert_eq!(IfExists::parse("merge"), None);
1561    }
1562
1563    #[test]
1564    fn if_exists_resolves_conflicts_only_for_skip_and_upsert() {
1565        assert!(!IfExists::Error.resolves_conflicts());
1566        assert!(!IfExists::Append.resolves_conflicts());
1567        assert!(!IfExists::Truncate.resolves_conflicts());
1568        assert!(IfExists::Skip.resolves_conflicts());
1569        assert!(IfExists::Upsert.resolves_conflicts());
1570    }
1571
1572    #[test]
1573    fn if_exists_default_is_non_destructive() {
1574        assert_eq!(IfExists::default(), IfExists::Error);
1575    }
1576
1577    #[test]
1578    fn bulk_mode_parse_recognises_modes() {
1579        assert_eq!(BulkMode::parse("off"), Some(BulkMode::Off));
1580        assert_eq!(BulkMode::parse("Auto"), Some(BulkMode::Auto));
1581        assert_eq!(BulkMode::parse("ON"), Some(BulkMode::On));
1582        assert_eq!(BulkMode::parse("native"), None);
1583    }
1584
1585    #[test]
1586    fn bulk_mode_default_is_off() {
1587        assert_eq!(BulkMode::default(), BulkMode::Off);
1588    }
1589
1590    #[cfg(feature = "sqlite")]
1591    #[test]
1592    fn quote_identifier_sqlite_uses_ansi_quotes() {
1593        assert_eq!(quote_identifier("users", Backend::Sqlite), "\"users\"");
1594        assert_eq!(quote_identifier("a\"b", Backend::Sqlite), "\"a\"\"b\"");
1595    }
1596
1597    #[cfg(feature = "mysql")]
1598    #[test]
1599    fn quote_identifier_mysql_uses_backticks() {
1600        assert_eq!(quote_identifier("users", Backend::MySql), "`users`");
1601        assert_eq!(quote_identifier("a`b", Backend::MySql), "`a``b`");
1602    }
1603
1604    // The three tests below were ported verbatim from
1605    // `ferrule-core/src/dump.rs::tests` when the local
1606    // `quote_identifier(id)` helper was removed in favour of routing
1607    // every dump-side identifier through the backend-aware
1608    // [`quote_identifier`] in this module.
1609
1610    #[cfg(feature = "sqlite")]
1611    #[test]
1612    fn quote_identifier_wraps_in_double_quotes() {
1613        assert_eq!(quote_identifier("users", Backend::Sqlite), "\"users\"");
1614    }
1615
1616    #[cfg(feature = "sqlite")]
1617    #[test]
1618    fn quote_identifier_escapes_embedded_quotes() {
1619        assert_eq!(quote_identifier("a\"b", Backend::Sqlite), "\"a\"\"b\"");
1620        assert_eq!(quote_identifier("\"\"", Backend::Sqlite), "\"\"\"\"\"\"");
1621    }
1622
1623    #[cfg(feature = "sqlite")]
1624    #[test]
1625    fn quote_identifier_preserves_other_chars() {
1626        assert_eq!(
1627            quote_identifier("col with space", Backend::Sqlite),
1628            "\"col with space\""
1629        );
1630        assert_eq!(
1631            quote_identifier("snake_case_99", Backend::Sqlite),
1632            "\"snake_case_99\""
1633        );
1634    }
1635
1636    #[cfg(feature = "postgres")]
1637    #[test]
1638    fn translate_type_postgres_maps_decimal_to_numeric() {
1639        assert_eq!(
1640            translate_type(TypeHint::Decimal, Backend::Postgres),
1641            "NUMERIC"
1642        );
1643        assert_eq!(
1644            translate_type(TypeHint::DateTimeTz, Backend::Postgres),
1645            "TIMESTAMPTZ"
1646        );
1647        assert_eq!(translate_type(TypeHint::Json, Backend::Postgres), "JSONB");
1648    }
1649
1650    #[cfg(feature = "sqlite")]
1651    #[test]
1652    fn translate_type_sqlite_collapses_to_storage_classes() {
1653        assert_eq!(translate_type(TypeHint::Bool, Backend::Sqlite), "INTEGER");
1654        assert_eq!(translate_type(TypeHint::Int64, Backend::Sqlite), "INTEGER");
1655        assert_eq!(translate_type(TypeHint::Float64, Backend::Sqlite), "REAL");
1656        assert_eq!(translate_type(TypeHint::Bytes, Backend::Sqlite), "BLOB");
1657        assert_eq!(translate_type(TypeHint::DateTime, Backend::Sqlite), "TEXT");
1658        assert_eq!(translate_type(TypeHint::Json, Backend::Sqlite), "TEXT");
1659    }
1660
1661    #[cfg(feature = "mssql")]
1662    #[test]
1663    fn translate_type_mssql_maps_string_to_nvarchar_max() {
1664        assert_eq!(
1665            translate_type(TypeHint::String, Backend::MsSql),
1666            "NVARCHAR(MAX)"
1667        );
1668        assert_eq!(
1669            translate_type(TypeHint::Uuid, Backend::MsSql),
1670            "UNIQUEIDENTIFIER"
1671        );
1672        assert_eq!(translate_type(TypeHint::Bool, Backend::MsSql), "BIT");
1673    }
1674
1675    #[cfg(all(feature = "postgres", feature = "sqlite"))]
1676    #[test]
1677    fn translate_ddl_postgres_to_sqlite() {
1678        let cols = vec![
1679            col("id", TypeHint::Int64, false),
1680            col("name", TypeHint::String, true),
1681            col("score", TypeHint::Float64, true),
1682            col("active", TypeHint::Bool, true),
1683            col("meta", TypeHint::Json, true),
1684        ];
1685        let ddl = translate_ddl("test_users", &cols, Backend::Sqlite, &[]);
1686        assert_eq!(
1687            ddl,
1688            "CREATE TABLE IF NOT EXISTS \"test_users\" (\
1689             \"id\" INTEGER NOT NULL, \
1690             \"name\" TEXT, \
1691             \"score\" REAL, \
1692             \"active\" INTEGER, \
1693             \"meta\" TEXT)"
1694        );
1695    }
1696
1697    /// `translate_ddl` with a non-empty `pk` slice appends a
1698    /// `PRIMARY KEY (...)` clause, identifier-quoted per the
1699    /// destination backend's rules. This is the `--preserve-pk` path.
1700    #[cfg(feature = "sqlite")]
1701    #[test]
1702    fn translate_ddl_with_preserve_pk_emits_primary_key_clause() {
1703        let cols = vec![
1704            col("id", TypeHint::Int64, false),
1705            col("name", TypeHint::String, true),
1706        ];
1707        let ddl = translate_ddl("users", &cols, Backend::Sqlite, &["id".to_string()]);
1708        assert_eq!(
1709            ddl,
1710            "CREATE TABLE IF NOT EXISTS \"users\" (\
1711             \"id\" INTEGER NOT NULL, \
1712             \"name\" TEXT, PRIMARY KEY (\"id\"))"
1713        );
1714    }
1715
1716    /// Composite primary key: every column in the supplied `pk` slice
1717    /// is identifier-quoted and emitted in order.
1718    #[cfg(feature = "sqlite")]
1719    #[test]
1720    fn translate_ddl_with_preserve_pk_emits_composite_primary_key() {
1721        let cols = vec![
1722            col("tenant", TypeHint::Int64, false),
1723            col("id", TypeHint::Int64, false),
1724            col("name", TypeHint::String, true),
1725        ];
1726        let ddl = translate_ddl(
1727            "users",
1728            &cols,
1729            Backend::Sqlite,
1730            &["tenant".to_string(), "id".to_string()],
1731        );
1732        assert!(
1733            ddl.ends_with("PRIMARY KEY (\"tenant\", \"id\"))"),
1734            "got: {ddl}"
1735        );
1736    }
1737
1738    #[cfg(all(feature = "mysql", feature = "mssql"))]
1739    #[test]
1740    fn translate_ddl_mysql_to_mssql_uses_correct_quoting_and_types() {
1741        let cols = vec![
1742            col("id", TypeHint::Int64, false),
1743            col("uid", TypeHint::Uuid, true),
1744            col("created_at", TypeHint::DateTimeTz, true),
1745        ];
1746        let ddl = translate_ddl("orders", &cols, Backend::MsSql, &[]);
1747        assert_eq!(
1748            ddl,
1749            "CREATE TABLE IF NOT EXISTS \"orders\" (\
1750             \"id\" BIGINT NOT NULL, \
1751             \"uid\" UNIQUEIDENTIFIER, \
1752             \"created_at\" DATETIMEOFFSET)"
1753        );
1754    }
1755
1756    fn row_int(n: i64) -> Vec<Value> {
1757        vec![Value::Int64(n)]
1758    }
1759
1760    /// Single-column `id` schema used by the legacy multi-row INSERT
1761    /// tests. Conflict-SQL tests build richer fixtures inline.
1762    fn cols_id_only() -> Vec<ColumnInfo> {
1763        vec![ColumnInfo {
1764            name: "id".to_string(),
1765            type_hint: TypeHint::Int64,
1766            nullable: false,
1767        }]
1768    }
1769
1770    #[test]
1771    fn build_insert_sql_empty_rows_returns_empty() {
1772        let cols = cols_id_only();
1773        let out = build_insert_sql(
1774            "\"t\"",
1775            "\"id\"",
1776            &[],
1777            default_backend_for_test(),
1778            &cols,
1779            IfExists::Append,
1780            &[],
1781        );
1782        assert!(out.is_empty());
1783    }
1784
1785    #[cfg(feature = "sqlite")]
1786    #[test]
1787    fn build_insert_sql_sqlite_emits_single_multi_row_insert() {
1788        let cols = cols_id_only();
1789        let rows = vec![row_int(1), row_int(2), row_int(3)];
1790        let out = build_insert_sql(
1791            "\"t\"",
1792            "\"id\"",
1793            &rows,
1794            Backend::Sqlite,
1795            &cols,
1796            IfExists::Append,
1797            &[],
1798        );
1799        assert_eq!(out.len(), 1);
1800        assert_eq!(out[0], "INSERT INTO \"t\" (\"id\") VALUES (1), (2), (3)");
1801    }
1802
1803    #[cfg(feature = "oracle")]
1804    #[test]
1805    fn build_insert_sql_oracle_emits_insert_all_with_select_from_dual() {
1806        let cols = cols_id_only();
1807        let rows = vec![row_int(1), row_int(2), row_int(3)];
1808        let out = build_insert_sql(
1809            "\"t\"",
1810            "\"id\"",
1811            &rows,
1812            Backend::Oracle,
1813            &cols,
1814            IfExists::Append,
1815            &[],
1816        );
1817        assert_eq!(out.len(), 1);
1818        assert_eq!(
1819            out[0],
1820            "INSERT ALL\
1821             \u{0020}INTO \"t\" (\"id\") VALUES (1)\
1822             \u{0020}INTO \"t\" (\"id\") VALUES (2)\
1823             \u{0020}INTO \"t\" (\"id\") VALUES (3)\
1824             \u{0020}SELECT 1 FROM DUAL"
1825        );
1826    }
1827
1828    #[cfg(feature = "mssql")]
1829    #[test]
1830    fn build_insert_sql_mssql_splits_above_1000_rows() {
1831        let cols = cols_id_only();
1832        let rows: Vec<Vec<Value>> = (0..2500).map(|i| row_int(i as i64)).collect();
1833        let out = build_insert_sql(
1834            "\"t\"",
1835            "\"id\"",
1836            &rows,
1837            Backend::MsSql,
1838            &cols,
1839            IfExists::Append,
1840            &[],
1841        );
1842        // 2500 rows / 1000 cap = 3 chunks (1000 / 1000 / 500).
1843        assert_eq!(out.len(), 3);
1844        // Each chunk should be a single INSERT statement.
1845        for sql in &out {
1846            assert!(sql.starts_with("INSERT INTO \"t\" (\"id\") VALUES "));
1847        }
1848        // Sanity check the row-counts via comma counts: chunk 0 should
1849        // have 999 commas separating 1000 row tuples.
1850        assert_eq!(out[0].matches("), (").count(), 999);
1851        assert_eq!(out[1].matches("), (").count(), 999);
1852        assert_eq!(out[2].matches("), (").count(), 499);
1853    }
1854
1855    #[cfg(feature = "oracle")]
1856    #[test]
1857    fn build_insert_sql_oracle_chunks_at_250_rows() {
1858        let cols = cols_id_only();
1859        let rows: Vec<Vec<Value>> = (0..600).map(|i| row_int(i as i64)).collect();
1860        let out = build_insert_sql(
1861            "\"t\"",
1862            "\"id\"",
1863            &rows,
1864            Backend::Oracle,
1865            &cols,
1866            IfExists::Append,
1867            &[],
1868        );
1869        // 600 / 250 = 3 chunks (250 / 250 / 100).
1870        assert_eq!(out.len(), 3);
1871        for sql in &out {
1872            assert!(sql.starts_with("INSERT ALL"));
1873            assert!(sql.ends_with(" SELECT 1 FROM DUAL"));
1874        }
1875        // Each "INTO ... VALUES" occurrence is exactly one row.
1876        assert_eq!(out[0].matches(" INTO ").count(), 250);
1877        assert_eq!(out[1].matches(" INTO ").count(), 250);
1878        assert_eq!(out[2].matches(" INTO ").count(), 100);
1879    }
1880
1881    // --- Phase 2 conflict-SQL codegen ----------------------------------
1882
1883    /// Two-column (id PK, name) row shape used by the conflict tests.
1884    fn cols_id_name() -> Vec<ColumnInfo> {
1885        vec![
1886            ColumnInfo {
1887                name: "id".to_string(),
1888                type_hint: TypeHint::Int64,
1889                nullable: false,
1890            },
1891            ColumnInfo {
1892                name: "name".to_string(),
1893                type_hint: TypeHint::String,
1894                nullable: true,
1895            },
1896        ]
1897    }
1898
1899    fn row_id_name(id: i64, name: &str) -> Vec<Value> {
1900        vec![Value::Int64(id), Value::String(name.to_string())]
1901    }
1902
1903    #[cfg(feature = "sqlite")]
1904    #[test]
1905    fn build_insert_sql_pg_skip_emits_on_conflict_do_nothing() {
1906        let cols = cols_id_name();
1907        let rows = vec![row_id_name(1, "a"), row_id_name(2, "b")];
1908        let pk = vec!["id".to_string()];
1909        // SQLite shares ON CONFLICT syntax with Postgres; assert on the
1910        // common branch using SQLite (default-feature) backend.
1911        let out = build_insert_sql(
1912            "\"t\"",
1913            "\"id\", \"name\"",
1914            &rows,
1915            Backend::Sqlite,
1916            &cols,
1917            IfExists::Skip,
1918            &pk,
1919        );
1920        assert_eq!(out.len(), 1);
1921        assert_eq!(
1922            out[0],
1923            "INSERT INTO \"t\" (\"id\", \"name\") VALUES (1, 'a'), (2, 'b') \
1924             ON CONFLICT (\"id\") DO NOTHING"
1925        );
1926    }
1927
1928    #[cfg(feature = "sqlite")]
1929    #[test]
1930    fn build_insert_sql_pg_upsert_emits_excluded_assignments() {
1931        let cols = cols_id_name();
1932        let rows = vec![row_id_name(1, "a"), row_id_name(2, "b")];
1933        let pk = vec!["id".to_string()];
1934        let out = build_insert_sql(
1935            "\"t\"",
1936            "\"id\", \"name\"",
1937            &rows,
1938            Backend::Sqlite,
1939            &cols,
1940            IfExists::Upsert,
1941            &pk,
1942        );
1943        assert_eq!(out.len(), 1);
1944        assert_eq!(
1945            out[0],
1946            "INSERT INTO \"t\" (\"id\", \"name\") VALUES (1, 'a'), (2, 'b') \
1947             ON CONFLICT (\"id\") DO UPDATE SET \"name\" = EXCLUDED.\"name\""
1948        );
1949    }
1950
1951    #[cfg(feature = "sqlite")]
1952    #[test]
1953    fn build_insert_sql_pg_upsert_pk_only_table_collapses_to_do_nothing() {
1954        // A PK-only table (no non-PK columns) has nothing to update;
1955        // Upsert collapses to ON CONFLICT DO NOTHING.
1956        let cols = vec![ColumnInfo {
1957            name: "id".to_string(),
1958            type_hint: TypeHint::Int64,
1959            nullable: false,
1960        }];
1961        let rows = vec![row_int(1), row_int(2)];
1962        let pk = vec!["id".to_string()];
1963        let out = build_insert_sql(
1964            "\"t\"",
1965            "\"id\"",
1966            &rows,
1967            Backend::Sqlite,
1968            &cols,
1969            IfExists::Upsert,
1970            &pk,
1971        );
1972        assert_eq!(out.len(), 1);
1973        assert!(out[0].ends_with("ON CONFLICT (\"id\") DO NOTHING"));
1974    }
1975
1976    #[cfg(feature = "sqlite")]
1977    #[test]
1978    fn build_insert_sql_pg_upsert_composite_pk_emits_full_pk_list() {
1979        let cols = vec![
1980            ColumnInfo {
1981                name: "a".to_string(),
1982                type_hint: TypeHint::Int64,
1983                nullable: false,
1984            },
1985            ColumnInfo {
1986                name: "b".to_string(),
1987                type_hint: TypeHint::Int64,
1988                nullable: false,
1989            },
1990            ColumnInfo {
1991                name: "v".to_string(),
1992                type_hint: TypeHint::String,
1993                nullable: true,
1994            },
1995        ];
1996        let rows = vec![vec![
1997            Value::Int64(1),
1998            Value::Int64(2),
1999            Value::String("x".into()),
2000        ]];
2001        let pk = vec!["a".to_string(), "b".to_string()];
2002        let out = build_insert_sql(
2003            "\"t\"",
2004            "\"a\", \"b\", \"v\"",
2005            &rows,
2006            Backend::Sqlite,
2007            &cols,
2008            IfExists::Upsert,
2009            &pk,
2010        );
2011        assert_eq!(out.len(), 1);
2012        assert!(out[0].contains("ON CONFLICT (\"a\", \"b\") DO UPDATE SET \"v\" = EXCLUDED.\"v\""));
2013        // Crucially: PK columns ('a', 'b') must NOT appear in the SET list.
2014        assert!(!out[0].contains("\"a\" = EXCLUDED.\"a\""));
2015        assert!(!out[0].contains("\"b\" = EXCLUDED.\"b\""));
2016    }
2017
2018    #[cfg(feature = "mysql")]
2019    #[test]
2020    fn build_insert_sql_mysql_skip_emits_insert_ignore() {
2021        let cols = cols_id_name();
2022        let rows = vec![row_id_name(1, "a")];
2023        let pk = vec!["id".to_string()];
2024        let out = build_insert_sql(
2025            "`t`",
2026            "`id`, `name`",
2027            &rows,
2028            Backend::MySql,
2029            &cols,
2030            IfExists::Skip,
2031            &pk,
2032        );
2033        assert_eq!(out.len(), 1);
2034        assert!(out[0].starts_with("INSERT IGNORE INTO `t`"));
2035    }
2036
2037    #[cfg(feature = "mysql")]
2038    #[test]
2039    fn build_insert_sql_mysql_upsert_emits_on_duplicate_key_update() {
2040        let cols = cols_id_name();
2041        let rows = vec![row_id_name(1, "a")];
2042        let pk = vec!["id".to_string()];
2043        let out = build_insert_sql(
2044            "`t`",
2045            "`id`, `name`",
2046            &rows,
2047            Backend::MySql,
2048            &cols,
2049            IfExists::Upsert,
2050            &pk,
2051        );
2052        assert_eq!(out.len(), 1);
2053        assert!(out[0].contains("ON DUPLICATE KEY UPDATE `name` = VALUES(`name`)"));
2054    }
2055
2056    #[cfg(feature = "mssql")]
2057    #[test]
2058    fn build_insert_sql_mssql_skip_emits_merge_when_not_matched() {
2059        let cols = cols_id_name();
2060        let rows = vec![row_id_name(1, "a")];
2061        let pk = vec!["id".to_string()];
2062        let out = build_insert_sql(
2063            "\"t\"",
2064            "\"id\", \"name\"",
2065            &rows,
2066            Backend::MsSql,
2067            &cols,
2068            IfExists::Skip,
2069            &pk,
2070        );
2071        assert_eq!(out.len(), 1);
2072        let sql = &out[0];
2073        assert!(sql.starts_with("MERGE INTO \"t\" AS dst USING (VALUES "));
2074        assert!(sql.contains("ON dst.\"id\" = src.\"id\""));
2075        assert!(sql.contains("WHEN NOT MATCHED THEN INSERT"));
2076        // Skip means no UPDATE branch.
2077        assert!(!sql.contains("WHEN MATCHED"));
2078        assert!(sql.ends_with(';'));
2079    }
2080
2081    #[cfg(feature = "mssql")]
2082    #[test]
2083    fn build_insert_sql_mssql_upsert_emits_full_merge() {
2084        let cols = cols_id_name();
2085        let rows = vec![row_id_name(1, "a")];
2086        let pk = vec!["id".to_string()];
2087        let out = build_insert_sql(
2088            "\"t\"",
2089            "\"id\", \"name\"",
2090            &rows,
2091            Backend::MsSql,
2092            &cols,
2093            IfExists::Upsert,
2094            &pk,
2095        );
2096        assert_eq!(out.len(), 1);
2097        let sql = &out[0];
2098        assert!(sql.contains("WHEN MATCHED THEN UPDATE SET \"name\" = src.\"name\""));
2099        assert!(sql.contains("WHEN NOT MATCHED THEN INSERT"));
2100    }
2101
2102    #[cfg(feature = "oracle")]
2103    #[test]
2104    fn build_insert_sql_oracle_skip_emits_merge_with_select_dual_source() {
2105        let cols = cols_id_name();
2106        let rows = vec![row_id_name(1, "a"), row_id_name(2, "b")];
2107        let pk = vec!["id".to_string()];
2108        let out = build_insert_sql(
2109            "\"t\"",
2110            "\"id\", \"name\"",
2111            &rows,
2112            Backend::Oracle,
2113            &cols,
2114            IfExists::Skip,
2115            &pk,
2116        );
2117        assert_eq!(out.len(), 1);
2118        let sql = &out[0];
2119        assert!(sql.starts_with("MERGE INTO \"t\" dst USING ("));
2120        assert!(sql.contains("SELECT 1 AS \"id\", 'a' AS \"name\" FROM dual"));
2121        assert!(sql.contains(" UNION ALL "));
2122        assert!(sql.contains("ON (dst.\"id\" = src.\"id\")"));
2123        assert!(sql.contains("WHEN NOT MATCHED THEN INSERT"));
2124        assert!(!sql.contains("WHEN MATCHED"));
2125    }
2126
2127    #[cfg(feature = "oracle")]
2128    #[test]
2129    fn build_insert_sql_oracle_upsert_includes_update_branch() {
2130        let cols = cols_id_name();
2131        let rows = vec![row_id_name(1, "a")];
2132        let pk = vec!["id".to_string()];
2133        let out = build_insert_sql(
2134            "\"t\"",
2135            "\"id\", \"name\"",
2136            &rows,
2137            Backend::Oracle,
2138            &cols,
2139            IfExists::Upsert,
2140            &pk,
2141        );
2142        let sql = &out[0];
2143        assert!(sql.contains("WHEN MATCHED THEN UPDATE SET dst.\"name\" = src.\"name\""));
2144        assert!(sql.contains("WHEN NOT MATCHED THEN INSERT"));
2145    }
2146
2147    #[cfg(feature = "sqlite")]
2148    fn default_backend_for_test() -> Backend {
2149        Backend::Sqlite
2150    }
2151    #[cfg(all(not(feature = "sqlite"), feature = "postgres"))]
2152    fn default_backend_for_test() -> Backend {
2153        Backend::Postgres
2154    }
2155    #[cfg(all(not(feature = "sqlite"), not(feature = "postgres"), feature = "mysql"))]
2156    fn default_backend_for_test() -> Backend {
2157        Backend::MySql
2158    }
2159    #[cfg(all(
2160        not(feature = "sqlite"),
2161        not(feature = "postgres"),
2162        not(feature = "mysql"),
2163        feature = "mssql"
2164    ))]
2165    fn default_backend_for_test() -> Backend {
2166        Backend::MsSql
2167    }
2168    #[cfg(all(
2169        not(feature = "sqlite"),
2170        not(feature = "postgres"),
2171        not(feature = "mysql"),
2172        not(feature = "mssql"),
2173        feature = "oracle"
2174    ))]
2175    fn default_backend_for_test() -> Backend {
2176        Backend::Oracle
2177    }
2178
2179    #[cfg(feature = "sqlite")]
2180    #[test]
2181    fn copy_sqlite_to_sqlite_round_trip() {
2182        use crate::connection::ConnectOptions;
2183        use crate::url::DatabaseUrl;
2184        use std::sync::atomic::{AtomicU64, Ordering};
2185
2186        static N: AtomicU64 = AtomicU64::new(0);
2187        let pid = std::process::id();
2188        let n_a = N.fetch_add(1, Ordering::SeqCst);
2189        let n_b = N.fetch_add(1, Ordering::SeqCst);
2190        let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-src.db"));
2191        let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-dst.db"));
2192        let _ = std::fs::remove_file(&path_a);
2193        let _ = std::fs::remove_file(&path_b);
2194
2195        let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2196        let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2197        let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2198        let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2199
2200        src.execute(
2201            "CREATE TABLE test_users (id INTEGER, name TEXT, age INTEGER, score REAL, active INTEGER)",
2202        )
2203        .unwrap();
2204        src.execute("INSERT INTO test_users VALUES (1, 'Alice', 30, 99.5, 1)")
2205            .unwrap();
2206        src.execute("INSERT INTO test_users VALUES (2, 'Bob', 25, 88.25, 0)")
2207            .unwrap();
2208        src.execute("INSERT INTO test_users VALUES (3, 'Carol', 40, NULL, 1)")
2209            .unwrap();
2210
2211        let opts = CopyOptions {
2212            source: CopySource::Table("test_users".into()),
2213            create_table: true,
2214            preserve_pk: false,
2215            if_exists: IfExists::Error,
2216            conflict_key: Vec::new(),
2217            atomic: false,
2218            batch_size: 2,
2219            bulk_mode: BulkMode::Off,
2220            copy_format: CopyFormat::Text,
2221            verbose: false,
2222            progress: None,
2223        };
2224        let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2225            .expect("copy_rows");
2226        assert_eq!(copied, 3);
2227
2228        let out = dst
2229            .query("SELECT id, name, age, score, active FROM test_users ORDER BY id")
2230            .unwrap();
2231        assert_eq!(out.rows.len(), 3);
2232        assert!(matches!(&out.rows[0][1], Value::String(s) if s == "Alice"));
2233        assert!(matches!(&out.rows[2][3], Value::Null));
2234
2235        let _ = std::fs::remove_file(&path_a);
2236        let _ = std::fs::remove_file(&path_b);
2237    }
2238
2239    #[cfg(feature = "sqlite")]
2240    #[test]
2241    fn copy_refuses_when_target_non_empty_with_default_strategy() {
2242        use crate::connection::ConnectOptions;
2243        use crate::url::DatabaseUrl;
2244        use std::sync::atomic::{AtomicU64, Ordering};
2245
2246        static N: AtomicU64 = AtomicU64::new(0);
2247        let pid = std::process::id();
2248        let n = N.fetch_add(1, Ordering::SeqCst);
2249        let path = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n}-conflict.db"));
2250        let _ = std::fs::remove_file(&path);
2251
2252        let url = DatabaseUrl::parse(&format!("sqlite://{}", path.display())).unwrap();
2253        let mut src = crate::connect(&url, &ConnectOptions::default(), None).unwrap();
2254        // Open a second connection (sqlite — file path is what matters).
2255        let mut dst = crate::connect(&url, &ConnectOptions::default(), None).unwrap();
2256
2257        src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2258            .unwrap();
2259        src.execute("INSERT INTO t VALUES (1, 'existing')").unwrap();
2260
2261        let opts = CopyOptions {
2262            source: CopySource::Table("t".into()),
2263            ..Default::default()
2264        };
2265        // Same DB on both sides — target table 't' exists with one row.
2266        let result = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts);
2267        let err = result.expect_err("copy should refuse non-empty target by default");
2268        let msg = err.to_string();
2269        assert!(
2270            msg.contains("already contains rows") && msg.contains("--if-exists"),
2271            "unhelpful error message: {msg}"
2272        );
2273
2274        let _ = std::fs::remove_file(&path);
2275    }
2276
2277    #[cfg(feature = "sqlite")]
2278    #[test]
2279    fn copy_truncate_replaces_existing_rows() {
2280        use crate::connection::ConnectOptions;
2281        use crate::url::DatabaseUrl;
2282        use std::sync::atomic::{AtomicU64, Ordering};
2283
2284        static N: AtomicU64 = AtomicU64::new(0);
2285        let pid = std::process::id();
2286        let n_a = N.fetch_add(1, Ordering::SeqCst);
2287        let n_b = N.fetch_add(1, Ordering::SeqCst);
2288        let path_a =
2289            std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-trunc-src.db"));
2290        let path_b =
2291            std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-trunc-dst.db"));
2292        let _ = std::fs::remove_file(&path_a);
2293        let _ = std::fs::remove_file(&path_b);
2294
2295        let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2296        let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2297        let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2298        let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2299
2300        src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2301            .unwrap();
2302        dst.execute("CREATE TABLE t (id INTEGER, name TEXT)")
2303            .unwrap();
2304        dst.execute("INSERT INTO t VALUES (99, 'stale')").unwrap();
2305        src.execute("INSERT INTO t VALUES (1, 'fresh-1')").unwrap();
2306        src.execute("INSERT INTO t VALUES (2, 'fresh-2')").unwrap();
2307
2308        let opts = CopyOptions {
2309            source: CopySource::Table("t".into()),
2310            if_exists: IfExists::Truncate,
2311            ..Default::default()
2312        };
2313        let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2314            .expect("copy_rows");
2315        assert_eq!(copied, 2);
2316
2317        let out = dst.query("SELECT id, name FROM t ORDER BY id").unwrap();
2318        assert_eq!(out.rows.len(), 2);
2319        assert!(matches!(&out.rows[0][1], Value::String(s) if s == "fresh-1"));
2320        assert!(matches!(&out.rows[1][1], Value::String(s) if s == "fresh-2"));
2321
2322        let _ = std::fs::remove_file(&path_a);
2323        let _ = std::fs::remove_file(&path_b);
2324    }
2325
2326    #[cfg(feature = "sqlite")]
2327    #[test]
2328    fn copy_query_with_into_and_create_table() {
2329        use crate::connection::ConnectOptions;
2330        use crate::url::DatabaseUrl;
2331        use std::sync::atomic::{AtomicU64, Ordering};
2332
2333        static N: AtomicU64 = AtomicU64::new(0);
2334        let pid = std::process::id();
2335        let n_a = N.fetch_add(1, Ordering::SeqCst);
2336        let n_b = N.fetch_add(1, Ordering::SeqCst);
2337        let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-q-src.db"));
2338        let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-q-dst.db"));
2339        let _ = std::fs::remove_file(&path_a);
2340        let _ = std::fs::remove_file(&path_b);
2341
2342        let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
2343        let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
2344        let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
2345        let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
2346
2347        src.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER, active INTEGER)")
2348            .unwrap();
2349        src.execute("INSERT INTO users VALUES (1, 'Alice', 30, 1)")
2350            .unwrap();
2351        src.execute("INSERT INTO users VALUES (2, 'Bob', 25, 0)")
2352            .unwrap();
2353        src.execute("INSERT INTO users VALUES (3, 'Carol', 40, 1)")
2354            .unwrap();
2355
2356        let opts = CopyOptions {
2357            source: CopySource::Query {
2358                sql: "SELECT id, name FROM users WHERE active = 1".into(),
2359                into: "active_users".into(),
2360            },
2361            create_table: true,
2362            ..Default::default()
2363        };
2364        let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
2365            .expect("copy_rows");
2366        assert_eq!(copied, 2);
2367
2368        let out = dst
2369            .query("SELECT id, name FROM active_users ORDER BY id")
2370            .unwrap();
2371        assert_eq!(out.rows.len(), 2);
2372        assert!(matches!(&out.rows[0][1], Value::String(s) if s == "Alice"));
2373        assert!(matches!(&out.rows[1][1], Value::String(s) if s == "Carol"));
2374
2375        let _ = std::fs::remove_file(&path_a);
2376        let _ = std::fs::remove_file(&path_b);
2377    }
2378
2379    /// Dispatcher harness: wraps a real [`Connection`] but intercepts
2380    /// `bulk_insert_rows` so individual tests can observe how the
2381    /// `copy_rows` dispatcher routes batches per [`BulkMode`].
2382    #[cfg(feature = "sqlite")]
2383    mod dispatcher_harness {
2384        use crate::connection::{
2385            BulkInsert, Connection, ExecutionSummary, QueryResult, StatementResult,
2386        };
2387        use crate::error::SqlError;
2388        use async_trait::async_trait;
2389        use std::sync::Arc;
2390        use std::sync::atomic::{AtomicUsize, Ordering};
2391
2392        /// What `bulk_insert_rows` should do on the destination wrapper.
2393        pub enum BulkBehaviour {
2394            /// Test asserts the bulk path is never invoked.
2395            PanicIfCalled,
2396            /// Always return `BulkUnavailable`.
2397            AlwaysUnavailable,
2398        }
2399
2400        pub struct TrackingDst {
2401            pub inner: Box<dyn Connection>,
2402            pub bulk_calls: Arc<AtomicUsize>,
2403            pub behaviour: BulkBehaviour,
2404        }
2405
2406        impl Connection for TrackingDst {
2407            fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
2408                self.inner.execute(sql)
2409            }
2410            fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
2411                self.inner.query(sql)
2412            }
2413            fn query_cursor(
2414                &mut self,
2415                sql: &str,
2416            ) -> Result<crate::stream::RowCursor<'_>, SqlError> {
2417                self.inner.query_cursor(sql)
2418            }
2419            fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
2420                self.inner.execute_multi(sql)
2421            }
2422            fn ping(&mut self) -> Result<(), SqlError> {
2423                self.inner.ping()
2424            }
2425            fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError> {
2426                self.inner.list_tables(schema)
2427            }
2428            fn list_schemas(&mut self) -> Result<Vec<crate::connection::SchemaInfo>, SqlError> {
2429                self.inner.list_schemas()
2430            }
2431            fn describe_table(
2432                &mut self,
2433                schema: Option<&str>,
2434                table: &str,
2435            ) -> Result<QueryResult, SqlError> {
2436                self.inner.describe_table(schema, table)
2437            }
2438            fn primary_key(
2439                &mut self,
2440                schema: Option<&str>,
2441                table: &str,
2442            ) -> Result<Vec<String>, SqlError> {
2443                self.inner.primary_key(schema, table)
2444            }
2445            fn list_foreign_keys(
2446                &mut self,
2447                schema: Option<&str>,
2448            ) -> Result<Vec<crate::ForeignKey>, SqlError> {
2449                self.inner.list_foreign_keys(schema)
2450            }
2451            fn bulk_insert_rows(&mut self, _target: BulkInsert<'_>) -> Result<usize, SqlError> {
2452                self.bulk_calls.fetch_add(1, Ordering::SeqCst);
2453                match self.behaviour {
2454                    BulkBehaviour::PanicIfCalled => {
2455                        panic!("bulk_insert_rows was invoked under BulkMode::Off");
2456                    }
2457                    BulkBehaviour::AlwaysUnavailable => Err(SqlError::BulkUnavailable(
2458                        "test wrapper: bulk path forced unavailable".into(),
2459                    )),
2460                }
2461            }
2462        }
2463    }
2464
/// Opens a fresh pair of on-disk SQLite databases for the dispatcher
/// tests. The source is seeded with a PK-less table `t` holding three
/// rows; the destination is left empty.
///
/// `tag` is woven into both file names. Returns
/// `(source, destination, source path, destination path)`; removing the
/// two files afterwards is the caller's job.
#[cfg(feature = "sqlite")]
fn seed_pair_for_dispatcher_test(
    tag: &str,
) -> (
    Box<dyn crate::connection::Connection>,
    Box<dyn crate::connection::Connection>,
    std::path::PathBuf,
    std::path::PathBuf,
) {
    use crate::connection::ConnectOptions;
    use crate::url::DatabaseUrl;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Process id plus a per-call counter keeps file names unique across
    // tests running in parallel.
    static N: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    let scratch = std::env::temp_dir();
    let src_seq = N.fetch_add(1, Ordering::SeqCst);
    let dst_seq = N.fetch_add(1, Ordering::SeqCst);
    let path_a = scratch.join(format!("ferrule-copy-test-{pid}-{src_seq}-{tag}-src.db"));
    let path_b = scratch.join(format!("ferrule-copy-test-{pid}-{dst_seq}-{tag}-dst.db"));
    for stale in [&path_a, &path_b] {
        let _ = std::fs::remove_file(stale);
    }

    let open = |db_file: &std::path::PathBuf| {
        let url = DatabaseUrl::parse(&format!("sqlite://{}", db_file.display())).unwrap();
        crate::connect(&url, &ConnectOptions::default(), None).unwrap()
    };
    let mut src = open(&path_a);
    let dst = open(&path_b);

    for stmt in [
        "CREATE TABLE t (id INTEGER, name TEXT)",
        "INSERT INTO t VALUES (1, 'a')",
        "INSERT INTO t VALUES (2, 'b')",
        "INSERT INTO t VALUES (3, 'c')",
    ] {
        src.execute(stmt).unwrap();
    }

    (src, dst, path_a, path_b)
}
2502
/// Under `BulkMode::Off` the dispatcher must leave the destination's
/// bulk loader alone. The tracking wrapper panics on any bulk call, so
/// a regression fails loudly.
#[cfg(feature = "sqlite")]
#[test]
fn dispatcher_off_never_invokes_bulk_path() {
    use dispatcher_harness::{BulkBehaviour, TrackingDst};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    let (mut src, dst_inner, path_a, path_b) = seed_pair_for_dispatcher_test("off");
    let bulk_calls = Arc::new(AtomicUsize::new(0));
    let mut dst = TrackingDst {
        inner: dst_inner,
        bulk_calls: Arc::clone(&bulk_calls),
        behaviour: BulkBehaviour::PanicIfCalled,
    };

    let opts = CopyOptions {
        source: CopySource::Table("t".into()),
        create_table: true,
        bulk_mode: BulkMode::Off,
        ..CopyOptions::default()
    };
    let copied = copy_rows(
        src.as_mut(),
        Backend::Sqlite,
        &mut dst,
        Backend::Sqlite,
        &opts,
    )
    .expect("copy_rows");

    assert_eq!(copied, 3);
    assert_eq!(bulk_calls.load(Ordering::SeqCst), 0);

    for leftover in [&path_a, &path_b] {
        let _ = std::fs::remove_file(leftover);
    }
}
2540
/// Under `BulkMode::Auto` each batch first attempts the bulk path; when
/// the destination answers `BulkUnavailable` the batch is retried via
/// plain INSERT and the rows still arrive.
#[cfg(feature = "sqlite")]
#[test]
fn dispatcher_auto_falls_back_on_bulk_unavailable() {
    use dispatcher_harness::{BulkBehaviour, TrackingDst};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    let (mut src, dst_inner, path_a, path_b) = seed_pair_for_dispatcher_test("auto");
    let bulk_calls = Arc::new(AtomicUsize::new(0));
    let mut dst = TrackingDst {
        inner: dst_inner,
        bulk_calls: Arc::clone(&bulk_calls),
        behaviour: BulkBehaviour::AlwaysUnavailable,
    };

    // Three source rows at batch_size=2 split into a 2-row prologue and
    // a 1-row streaming batch, i.e. two trips through the dispatcher.
    let opts = CopyOptions {
        source: CopySource::Table("t".into()),
        create_table: true,
        batch_size: 2,
        bulk_mode: BulkMode::Auto,
        ..CopyOptions::default()
    };
    let copied = copy_rows(
        src.as_mut(),
        Backend::Sqlite,
        &mut dst,
        Backend::Sqlite,
        &opts,
    )
    .expect("copy_rows");
    assert_eq!(copied, 3);

    // One bulk attempt per batch before each fallback.
    assert_eq!(bulk_calls.load(Ordering::SeqCst), 2);

    // The fallback INSERT path delivered every row.
    let landed = dst
        .inner
        .query("SELECT id, name FROM t ORDER BY id")
        .unwrap();
    assert_eq!(landed.rows.len(), 3);

    for leftover in [&path_a, &path_b] {
        let _ = std::fs::remove_file(leftover);
    }
}
2588
/// Under `BulkMode::On` there is no fallback: a `BulkUnavailable` from
/// the destination surfaces as a hard error whose message mentions
/// `--bulk-native`.
#[cfg(feature = "sqlite")]
#[test]
fn dispatcher_on_errors_when_bulk_unavailable() {
    use dispatcher_harness::{BulkBehaviour, TrackingDst};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    let (mut src, dst_inner, path_a, path_b) = seed_pair_for_dispatcher_test("on");
    let bulk_calls = Arc::new(AtomicUsize::new(0));
    let mut dst = TrackingDst {
        inner: dst_inner,
        bulk_calls: Arc::clone(&bulk_calls),
        behaviour: BulkBehaviour::AlwaysUnavailable,
    };

    let opts = CopyOptions {
        source: CopySource::Table("t".into()),
        create_table: true,
        bulk_mode: BulkMode::On,
        ..CopyOptions::default()
    };
    let err = copy_rows(
        src.as_mut(),
        Backend::Sqlite,
        &mut dst,
        Backend::Sqlite,
        &opts,
    )
    .expect_err("copy should fail when bulk path unavailable in On mode");

    let msg = err.to_string();
    assert!(
        msg.contains("--bulk-native"),
        "error should mention --bulk-native: {msg}"
    );
    // The run stops at the first refused bulk attempt.
    assert_eq!(bulk_calls.load(Ordering::SeqCst), 1);

    for leftover in [&path_a, &path_b] {
        let _ = std::fs::remove_file(leftover);
    }
}
2631
/// `--if-exists skip` end to end on SQLite: a source row whose primary
/// key already exists on the destination is dropped without touching
/// the stored value, while a row with a new key is inserted.
#[cfg(feature = "sqlite")]
#[test]
fn copy_skip_preserves_existing_rows() {
    use crate::connection::ConnectOptions;
    use crate::url::DatabaseUrl;
    use std::sync::atomic::{AtomicU64, Ordering};

    static N: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    let scratch = std::env::temp_dir();
    let src_seq = N.fetch_add(1, Ordering::SeqCst);
    let dst_seq = N.fetch_add(1, Ordering::SeqCst);
    let path_a = scratch.join(format!("ferrule-copy-test-{pid}-{src_seq}-skip-src.db"));
    let path_b = scratch.join(format!("ferrule-copy-test-{pid}-{dst_seq}-skip-dst.db"));
    for stale in [&path_a, &path_b] {
        let _ = std::fs::remove_file(stale);
    }

    let open = |db_file: &std::path::PathBuf| {
        let url = DatabaseUrl::parse(&format!("sqlite://{}", db_file.display())).unwrap();
        crate::connect(&url, &ConnectOptions::default(), None).unwrap()
    };
    let mut src = open(&path_a);
    let mut dst = open(&path_b);

    // Source offers id=1 ('new-1') and id=2 ('src-only'); the
    // destination already holds id=1 with the value 'kept'.
    for stmt in [
        "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)",
        "INSERT INTO t VALUES (1, 'new-1')",
        "INSERT INTO t VALUES (2, 'src-only')",
    ] {
        src.execute(stmt).unwrap();
    }
    for stmt in [
        "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)",
        "INSERT INTO t VALUES (1, 'kept')",
    ] {
        dst.execute(stmt).unwrap();
    }

    let opts = CopyOptions {
        source: CopySource::Table("t".into()),
        if_exists: IfExists::Skip,
        ..CopyOptions::default()
    };
    let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
        .expect("copy_rows");
    // The count covers rows handed to the dispatcher, not rows that
    // landed; the destination contents below are the real check.
    assert_eq!(copied, 2);

    let out = dst.query("SELECT id, name FROM t ORDER BY id").unwrap();
    let rows = &out.rows;
    assert_eq!(rows.len(), 2);
    // id=1 still reads 'kept' (skipped); id=2 is the new arrival.
    assert!(matches!(&rows[0][1], Value::String(s) if s == "kept"));
    assert!(matches!(&rows[1][1], Value::String(s) if s == "src-only"));

    for leftover in [&path_a, &path_b] {
        let _ = std::fs::remove_file(leftover);
    }
}
2689
/// `--create-table --preserve-pk` end to end on SQLite: the created
/// destination table carries the source's declared primary key, so it
/// can be used straight away as an `--if-exists upsert` target with no
/// `--key` override.
#[cfg(feature = "sqlite")]
#[test]
fn copy_create_table_preserve_pk_emits_primary_key() {
    use crate::connection::ConnectOptions;
    use crate::url::DatabaseUrl;
    use std::sync::atomic::{AtomicU64, Ordering};

    static N: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    let scratch = std::env::temp_dir();
    let src_seq = N.fetch_add(1, Ordering::SeqCst);
    let dst_seq = N.fetch_add(1, Ordering::SeqCst);
    let path_a = scratch.join(format!("ferrule-copy-test-{pid}-{src_seq}-pp-src.db"));
    let path_b = scratch.join(format!("ferrule-copy-test-{pid}-{dst_seq}-pp-dst.db"));
    for stale in [&path_a, &path_b] {
        let _ = std::fs::remove_file(stale);
    }

    let open = |db_file: &std::path::PathBuf| {
        let url = DatabaseUrl::parse(&format!("sqlite://{}", db_file.display())).unwrap();
        crate::connect(&url, &ConnectOptions::default(), None).unwrap()
    };
    let mut src = open(&path_a);
    let mut dst = open(&path_b);

    for stmt in [
        "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)",
        "INSERT INTO t VALUES (1, 'a')",
        "INSERT INTO t VALUES (2, 'b')",
    ] {
        src.execute(stmt).unwrap();
    }

    // First pass: create the destination table and carry the PK over.
    let create_opts = CopyOptions {
        source: CopySource::Table("t".into()),
        create_table: true,
        preserve_pk: true,
        ..CopyOptions::default()
    };
    let copied = copy_rows(
        &mut src,
        Backend::Sqlite,
        &mut dst,
        Backend::Sqlite,
        &create_opts,
    )
    .expect("copy_rows");
    assert_eq!(copied, 2);

    // The destination now reports `id` as its primary key.
    let pk = dst.primary_key(None, "t").unwrap();
    assert_eq!(pk, vec!["id".to_string()]);

    // Second pass: change a source row and upsert it; the destination
    // row must take the new value.
    src.execute("UPDATE t SET name = 'a-upd' WHERE id = 1")
        .unwrap();
    let upsert_opts = CopyOptions {
        source: CopySource::Table("t".into()),
        if_exists: IfExists::Upsert,
        ..CopyOptions::default()
    };
    copy_rows(
        &mut src,
        Backend::Sqlite,
        &mut dst,
        Backend::Sqlite,
        &upsert_opts,
    )
    .expect("upsert");

    let out = dst.query("SELECT name FROM t WHERE id = 1").unwrap();
    assert!(matches!(&out.rows[0][0], Value::String(s) if s == "a-upd"));

    for leftover in [&path_a, &path_b] {
        let _ = std::fs::remove_file(leftover);
    }
}
2757
/// `--preserve-pk` is best-effort: when the source table declares no
/// primary key the destination is created from the columns alone and
/// the copy still succeeds.
#[cfg(feature = "sqlite")]
#[test]
fn copy_create_table_preserve_pk_falls_through_when_source_lacks_pk() {
    use crate::connection::ConnectOptions;
    use crate::url::DatabaseUrl;
    use std::sync::atomic::{AtomicU64, Ordering};

    static N: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    let scratch = std::env::temp_dir();
    let src_seq = N.fetch_add(1, Ordering::SeqCst);
    let dst_seq = N.fetch_add(1, Ordering::SeqCst);
    let path_a = scratch.join(format!("ferrule-copy-test-{pid}-{src_seq}-pp2-src.db"));
    let path_b = scratch.join(format!("ferrule-copy-test-{pid}-{dst_seq}-pp2-dst.db"));
    for stale in [&path_a, &path_b] {
        let _ = std::fs::remove_file(stale);
    }

    let open = |db_file: &std::path::PathBuf| {
        let url = DatabaseUrl::parse(&format!("sqlite://{}", db_file.display())).unwrap();
        crate::connect(&url, &ConnectOptions::default(), None).unwrap()
    };
    let mut src = open(&path_a);
    let mut dst = open(&path_b);

    // The source table deliberately has no PRIMARY KEY.
    for stmt in [
        "CREATE TABLE t (id INTEGER, name TEXT)",
        "INSERT INTO t VALUES (1, 'a')",
    ] {
        src.execute(stmt).unwrap();
    }

    let opts = CopyOptions {
        source: CopySource::Table("t".into()),
        create_table: true,
        preserve_pk: true,
        ..CopyOptions::default()
    };
    let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
        .expect("copy_rows");
    assert_eq!(copied, 1);

    // A PK-less destination is the expected outcome, not a failure.
    let pk = dst.primary_key(None, "t").unwrap();
    assert!(pk.is_empty(), "expected no PK; got {pk:?}");

    for leftover in [&path_a, &path_b] {
        let _ = std::fs::remove_file(leftover);
    }
}
2804
/// `--if-exists upsert` end to end on SQLite: a destination row with a
/// matching primary key takes the source's values, and a source row
/// with a new key is inserted.
#[cfg(feature = "sqlite")]
#[test]
fn copy_upsert_overwrites_existing_rows() {
    use crate::connection::ConnectOptions;
    use crate::url::DatabaseUrl;
    use std::sync::atomic::{AtomicU64, Ordering};

    static N: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    let scratch = std::env::temp_dir();
    let src_seq = N.fetch_add(1, Ordering::SeqCst);
    let dst_seq = N.fetch_add(1, Ordering::SeqCst);
    let path_a = scratch.join(format!("ferrule-copy-test-{pid}-{src_seq}-up-src.db"));
    let path_b = scratch.join(format!("ferrule-copy-test-{pid}-{dst_seq}-up-dst.db"));
    for stale in [&path_a, &path_b] {
        let _ = std::fs::remove_file(stale);
    }

    let open = |db_file: &std::path::PathBuf| {
        let url = DatabaseUrl::parse(&format!("sqlite://{}", db_file.display())).unwrap();
        crate::connect(&url, &ConnectOptions::default(), None).unwrap()
    };
    let mut src = open(&path_a);
    let mut dst = open(&path_b);

    for stmt in [
        "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)",
        "INSERT INTO t VALUES (1, 'new-1')",
        "INSERT INTO t VALUES (2, 'src-only')",
    ] {
        src.execute(stmt).unwrap();
    }
    for stmt in [
        "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)",
        "INSERT INTO t VALUES (1, 'old')",
    ] {
        dst.execute(stmt).unwrap();
    }

    let opts = CopyOptions {
        source: CopySource::Table("t".into()),
        if_exists: IfExists::Upsert,
        ..CopyOptions::default()
    };
    let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
        .expect("copy_rows");
    assert_eq!(copied, 2);

    let out = dst.query("SELECT id, name FROM t ORDER BY id").unwrap();
    let rows = &out.rows;
    assert_eq!(rows.len(), 2);
    // id=1 now reads 'new-1' (overwritten); id=2 is the new arrival.
    assert!(matches!(&rows[0][1], Value::String(s) if s == "new-1"));
    assert!(matches!(&rows[1][1], Value::String(s) if s == "src-only"));

    for leftover in [&path_a, &path_b] {
        let _ = std::fs::remove_file(leftover);
    }
}
2854
/// `--if-exists skip` / `upsert` against a destination with no primary
/// key must fail with a hard error that names the missing key and
/// points the user at both `--key` and `--preserve-pk`.
#[cfg(feature = "sqlite")]
#[test]
fn copy_skip_without_pk_hard_errors() {
    use crate::connection::ConnectOptions;
    use crate::url::DatabaseUrl;
    use std::sync::atomic::{AtomicU64, Ordering};

    static N: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    let scratch = std::env::temp_dir();
    let src_seq = N.fetch_add(1, Ordering::SeqCst);
    let dst_seq = N.fetch_add(1, Ordering::SeqCst);
    let path_a = scratch.join(format!("ferrule-copy-test-{pid}-{src_seq}-nopk-src.db"));
    let path_b = scratch.join(format!("ferrule-copy-test-{pid}-{dst_seq}-nopk-dst.db"));
    for stale in [&path_a, &path_b] {
        let _ = std::fs::remove_file(stale);
    }

    let open = |db_file: &std::path::PathBuf| {
        let url = DatabaseUrl::parse(&format!("sqlite://{}", db_file.display())).unwrap();
        crate::connect(&url, &ConnectOptions::default(), None).unwrap()
    };
    let mut src = open(&path_a);
    let mut dst = open(&path_b);

    for stmt in [
        "CREATE TABLE t (id INTEGER, name TEXT)",
        "INSERT INTO t VALUES (1, 'a')",
    ] {
        src.execute(stmt).unwrap();
    }
    // No PRIMARY KEY here, so Skip/Upsert has no conflict columns to use.
    dst.execute("CREATE TABLE t (id INTEGER, name TEXT)")
        .unwrap();

    let opts = CopyOptions {
        source: CopySource::Table("t".into()),
        if_exists: IfExists::Skip,
        ..CopyOptions::default()
    };
    let err = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
        .expect_err("expected hard error for no-PK + skip");
    let msg = err.to_string();

    // Each pair is (required substring, reason shown if it is missing).
    for (needle, why) in [
        (
            "no declared primary key",
            "error should reference missing PK",
        ),
        ("--key", "error should point at the --key override"),
        (
            "--preserve-pk",
            "error should point at --preserve-pk for create-table users",
        ),
    ] {
        assert!(msg.contains(needle), "{why}: {msg}");
    }

    for leftover in [&path_a, &path_b] {
        let _ = std::fs::remove_file(leftover);
    }
}
2912
/// `--key COL[,COL...]` supplies the conflict columns when the
/// destination declares no primary key. The outcome matches the
/// PK-driven path: matching rows are overwritten, new rows inserted.
#[cfg(feature = "sqlite")]
#[test]
fn copy_key_override_upserts_against_pk_less_table() {
    use crate::connection::ConnectOptions;
    use crate::url::DatabaseUrl;
    use std::sync::atomic::{AtomicU64, Ordering};

    static N: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    let scratch = std::env::temp_dir();
    let src_seq = N.fetch_add(1, Ordering::SeqCst);
    let dst_seq = N.fetch_add(1, Ordering::SeqCst);
    let path_a = scratch.join(format!("ferrule-copy-test-{pid}-{src_seq}-keyup-src.db"));
    let path_b = scratch.join(format!("ferrule-copy-test-{pid}-{dst_seq}-keyup-dst.db"));
    for stale in [&path_a, &path_b] {
        let _ = std::fs::remove_file(stale);
    }

    let open = |db_file: &std::path::PathBuf| {
        let url = DatabaseUrl::parse(&format!("sqlite://{}", db_file.display())).unwrap();
        crate::connect(&url, &ConnectOptions::default(), None).unwrap()
    };
    let mut src = open(&path_a);
    let mut dst = open(&path_b);

    // Neither side declares a PRIMARY KEY; the destination's UNIQUE(id)
    // is what the conflict clause resolves against.
    for stmt in [
        "CREATE TABLE t (id INTEGER, name TEXT)",
        "INSERT INTO t VALUES (1, 'new-1')",
        "INSERT INTO t VALUES (2, 'src-only')",
    ] {
        src.execute(stmt).unwrap();
    }
    for stmt in [
        "CREATE TABLE t (id INTEGER, name TEXT, UNIQUE(id))",
        "INSERT INTO t VALUES (1, 'old')",
    ] {
        dst.execute(stmt).unwrap();
    }

    let opts = CopyOptions {
        source: CopySource::Table("t".into()),
        if_exists: IfExists::Upsert,
        conflict_key: vec!["id".to_string()],
        ..CopyOptions::default()
    };
    let copied = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
        .expect("copy_rows with --key");
    assert_eq!(copied, 2);

    let out = dst.query("SELECT id, name FROM t ORDER BY id").unwrap();
    let rows = &out.rows;
    assert_eq!(rows.len(), 2);
    assert!(matches!(&rows[0][1], Value::String(s) if s == "new-1"));
    assert!(matches!(&rows[1][1], Value::String(s) if s == "src-only"));

    for leftover in [&path_a, &path_b] {
        let _ = std::fs::remove_file(leftover);
    }
}
2966
/// A `--key` column that the source SELECT does not produce must be
/// rejected with an error that names the offending column.
#[cfg(feature = "sqlite")]
#[test]
fn copy_key_override_unknown_column_fails_fast() {
    use crate::connection::ConnectOptions;
    use crate::url::DatabaseUrl;
    use std::sync::atomic::{AtomicU64, Ordering};

    static N: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    let scratch = std::env::temp_dir();
    let src_seq = N.fetch_add(1, Ordering::SeqCst);
    let dst_seq = N.fetch_add(1, Ordering::SeqCst);
    let path_a = scratch.join(format!("ferrule-copy-test-{pid}-{src_seq}-keybad-src.db"));
    let path_b = scratch.join(format!("ferrule-copy-test-{pid}-{dst_seq}-keybad-dst.db"));
    for stale in [&path_a, &path_b] {
        let _ = std::fs::remove_file(stale);
    }

    let open = |db_file: &std::path::PathBuf| {
        let url = DatabaseUrl::parse(&format!("sqlite://{}", db_file.display())).unwrap();
        crate::connect(&url, &ConnectOptions::default(), None).unwrap()
    };
    let mut src = open(&path_a);
    let mut dst = open(&path_b);

    for stmt in [
        "CREATE TABLE t (id INTEGER, name TEXT)",
        "INSERT INTO t VALUES (1, 'a')",
    ] {
        src.execute(stmt).unwrap();
    }
    dst.execute("CREATE TABLE t (id INTEGER, name TEXT)")
        .unwrap();

    let opts = CopyOptions {
        source: CopySource::Table("t".into()),
        if_exists: IfExists::Upsert,
        conflict_key: vec!["nonexistent".to_string()],
        ..CopyOptions::default()
    };
    let err = copy_rows(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
        .expect_err("expected error for unknown --key column");
    let msg = err.to_string();
    assert!(
        msg.contains("nonexistent"),
        "error should name the unknown column: {msg}"
    );

    for leftover in [&path_a, &path_b] {
        let _ = std::fs::remove_file(leftover);
    }
}
3015
/// Conflict resolution must force the dispatcher onto the generic
/// INSERT path, even under `BulkMode::On` — the bulk loaders
/// carry no MERGE / ON CONFLICT semantics.
///
/// Besides counting bulk attempts, the test reads the destination back
/// so that a run which avoided the bulk path but failed to apply the
/// upsert cannot pass.
#[cfg(feature = "sqlite")]
#[test]
fn copy_upsert_forces_generic_path_even_under_bulk_on() {
    use crate::connection::ConnectOptions;
    use crate::url::DatabaseUrl;
    use dispatcher_harness::{BulkBehaviour, TrackingDst};
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

    static N: AtomicU64 = AtomicU64::new(0);
    let pid = std::process::id();
    let n_a = N.fetch_add(1, Ordering::SeqCst);
    let n_b = N.fetch_add(1, Ordering::SeqCst);
    let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-bup-src.db"));
    let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-bup-dst.db"));
    let _ = std::fs::remove_file(&path_a);
    let _ = std::fs::remove_file(&path_b);

    let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
    let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
    let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
    let raw_dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();

    src.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
        .unwrap();
    // Seed the destination through a short-lived second connection on
    // the same on-disk file. `raw_dst` stays untouched until it is
    // wrapped, so the copy runs through a single TrackingDst handle and
    // no seed DDL has to be routed through the wrapper.
    let mut seed_dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
    seed_dst
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
        .unwrap();
    seed_dst.execute("INSERT INTO t VALUES (1, 'old')").unwrap();
    drop(seed_dst);
    src.execute("INSERT INTO t VALUES (1, 'new-1')").unwrap();
    src.execute("INSERT INTO t VALUES (2, 'src-only')").unwrap();

    let bulk_calls = std::sync::Arc::new(AtomicUsize::new(0));
    let mut tracking = TrackingDst {
        inner: Box::new(raw_dst),
        bulk_calls: bulk_calls.clone(),
        behaviour: BulkBehaviour::PanicIfCalled,
    };

    let opts = CopyOptions {
        source: CopySource::Table("t".into()),
        if_exists: IfExists::Upsert,
        bulk_mode: BulkMode::On,
        ..Default::default()
    };
    let copied = copy_rows(
        &mut src,
        Backend::Sqlite,
        &mut tracking,
        Backend::Sqlite,
        &opts,
    )
    .expect("copy_rows should succeed under forced-generic path");
    assert_eq!(copied, 2);
    // PanicIfCalled would have aborted if bulk_insert_rows had
    // been invoked; assert zero invocations for belt-and-braces.
    assert_eq!(bulk_calls.load(Ordering::SeqCst), 0);

    // The generic path must also have applied upsert semantics:
    // id=1 overwritten with the source value, id=2 newly inserted.
    let out = tracking
        .inner
        .query("SELECT id, name FROM t ORDER BY id")
        .unwrap();
    assert_eq!(out.rows.len(), 2);
    assert!(matches!(&out.rows[0][1], Value::String(s) if s == "new-1"));
    assert!(matches!(&out.rows[1][1], Value::String(s) if s == "src-only"));

    let _ = std::fs::remove_file(&path_a);
    let _ = std::fs::remove_file(&path_b);
}
3087
3088    // --- Phase 3 unit tests --------------------------------------------
3089
/// Table-driven check of `matches_glob`: literal patterns (case
/// sensitive), `*`, `?`, several `*` in one pattern, and the empty
/// candidate.
#[test]
fn matches_glob_literal_and_wildcards() {
    // (pattern, candidate, expected verdict)
    let cases: [(&str, &str, bool); 11] = [
        ("users", "users", true),
        ("users", "Users", false),
        ("*", "anything", true),
        ("test_*", "test_users", true),
        ("test_*", "test_orders", true),
        ("test_*", "users", false),
        ("?ser", "user", true),
        ("?ser", "users", false),
        ("a*b*c", "axxxbyyc", true),
        ("*", "", true),
        ("nonempty", "", false),
    ];
    for (pattern, candidate, expected) in cases {
        assert_eq!(
            matches_glob(pattern, candidate),
            expected,
            "pattern {pattern:?} against {candidate:?}"
        );
    }
}
3104
3105    fn fk(child: &str, parent: &str) -> ForeignKey {
3106        ForeignKey {
3107            child_table: child.to_string(),
3108            child_columns: vec!["fk".into()],
3109            parent_table: parent.to_string(),
3110            parent_columns: vec!["id".into()],
3111            on_delete: None,
3112        }
3113    }
3114
/// With `orders` referencing both `users` and `items`, both parents
/// must come out ahead of `orders`.
#[test]
fn topo_sort_simple_dag_orders_parents_first() {
    let tables: Vec<String> = Vec::from(["orders", "users", "items"].map(String::from));
    // Edges: orders -> users, orders -> items.
    let fks = vec![fk("orders", "users"), fk("orders", "items")];

    let out = topo_sort(&tables, &fks).expect("ordered");

    let rank = |name: &str| out.iter().position(|t| t == name).unwrap();
    let (users_pos, items_pos, orders_pos) = (rank("users"), rank("items"), rank("orders"));
    assert!(users_pos < orders_pos, "users must precede orders: {out:?}");
    assert!(items_pos < orders_pos, "items must precede orders: {out:?}");
}
3130
/// Tables with no foreign keys between them come back in exactly the
/// order they were given, which keeps repeated runs deterministic.
#[test]
fn topo_sort_preserves_input_order_for_independent_tables() {
    let tables: Vec<String> = Vec::from(["c", "a", "b"].map(String::from));
    let out = topo_sort(&tables, &[]).expect("ordered");
    assert_eq!(out, tables);
}
3139
/// A foreign key whose parent is not in the table list is ignored:
/// `orders -> users` must not stop `orders` from being emitted when
/// `users` was excluded.
#[test]
fn topo_sort_drops_edges_to_excluded_parents() {
    let tables: Vec<String> = vec![String::from("orders")];
    let fks = vec![fk("orders", "users")];

    let out = topo_sort(&tables, &fks).expect("ordered");

    let expected = vec!["orders".to_string()];
    assert_eq!(out, expected);
}
3149
3150    #[test]
3151    fn topo_sort_ignores_self_referential_fk() {
3152        // tree-shaped tables with a `parent_id REFERENCES tree(id)`
3153        // self-FK should still emit cleanly — Kahn would otherwise
3154        // see `tree` as having itself as an unsatisfied parent.
3155        let tables: Vec<String> = ["tree"].iter().map(|s| s.to_string()).collect();
3156        let fks = vec![fk("tree", "tree")];
3157        let out = topo_sort(&tables, &fks).expect("ordered");
3158        assert_eq!(out, vec!["tree".to_string()]);
3159    }
3160
3161    #[test]
3162    fn topo_sort_reports_cycle_with_remaining_nodes_sorted() {
3163        let tables: Vec<String> = ["a", "b", "c"].iter().map(|s| s.to_string()).collect();
3164        // a -> b -> c -> a, a 3-cycle.
3165        let fks = vec![fk("a", "b"), fk("b", "c"), fk("c", "a")];
3166        let err = topo_sort(&tables, &fks).expect_err("cycle expected");
3167        assert_eq!(
3168            err.remaining,
3169            vec!["a".to_string(), "b".to_string(), "c".to_string()]
3170        );
3171    }
3172
3173    #[test]
3174    fn topo_sort_cycle_does_not_block_dag_tables() {
3175        // Mix a 2-cycle (a <-> b) with an independent table c. c
3176        // should still emit; only a, b are reported as the cycle.
3177        let tables: Vec<String> = ["a", "b", "c"].iter().map(|s| s.to_string()).collect();
3178        let fks = vec![fk("a", "b"), fk("b", "a")];
3179        let err = topo_sort(&tables, &fks).expect_err("cycle expected");
3180        assert_eq!(err.remaining, vec!["a".to_string(), "b".to_string()]);
3181    }
3182
3183    /// SQLite end-to-end for `--all-tables`: parent + child tables
3184    /// copied in FK order; child table is created on the destination
3185    /// (via `--create-table`) after the parent so the FK target
3186    /// exists when the child rows land.
3187    #[cfg(feature = "sqlite")]
3188    #[test]
3189    fn copy_all_tables_orders_by_fk_and_copies_everything() {
3190        use crate::connection::ConnectOptions;
3191        use crate::url::DatabaseUrl;
3192        use std::sync::atomic::{AtomicU64, Ordering};
3193
3194        static N: AtomicU64 = AtomicU64::new(0);
3195        let pid = std::process::id();
3196        let n_a = N.fetch_add(1, Ordering::SeqCst);
3197        let n_b = N.fetch_add(1, Ordering::SeqCst);
3198        let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-all-src.db"));
3199        let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-all-dst.db"));
3200        let _ = std::fs::remove_file(&path_a);
3201        let _ = std::fs::remove_file(&path_b);
3202
3203        let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
3204        let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
3205        let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
3206        let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
3207
3208        // Enable FK enforcement on the destination so the test would
3209        // fail if the load order were wrong (child before parent).
3210        dst.execute("PRAGMA foreign_keys = ON").unwrap();
3211
3212        src.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
3213            .unwrap();
3214        src.execute(
3215            "CREATE TABLE orders (id INTEGER PRIMARY KEY, \
3216                                  user_id INTEGER REFERENCES users(id), \
3217                                  total REAL)",
3218        )
3219        .unwrap();
3220        src.execute("INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob')")
3221            .unwrap();
3222        src.execute("INSERT INTO orders VALUES (1, 1, 9.99), (2, 1, 4.50), (3, 2, 12.00)")
3223            .unwrap();
3224
3225        let opts = AllTablesOptions {
3226            create_table: true,
3227            ..Default::default()
3228        };
3229        let copied = copy_all_tables(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
3230            .expect("copy_all_tables");
3231        // 2 users + 3 orders.
3232        assert_eq!(copied, 5);
3233
3234        let u = dst.query("SELECT count(*) FROM users").unwrap();
3235        let o = dst.query("SELECT count(*) FROM orders").unwrap();
3236        assert!(matches!(&u.rows[0][0], Value::Int64(2)));
3237        assert!(matches!(&o.rows[0][0], Value::Int64(3)));
3238
3239        let _ = std::fs::remove_file(&path_a);
3240        let _ = std::fs::remove_file(&path_b);
3241    }
3242
3243    /// `--include` / `--exclude` glob filters: only matched tables
3244    /// are copied; topo_sort runs over the filtered subset.
3245    #[cfg(feature = "sqlite")]
3246    #[test]
3247    fn copy_all_tables_respects_include_and_exclude() {
3248        use crate::connection::ConnectOptions;
3249        use crate::url::DatabaseUrl;
3250        use std::sync::atomic::{AtomicU64, Ordering};
3251
3252        static N: AtomicU64 = AtomicU64::new(0);
3253        let pid = std::process::id();
3254        let n_a = N.fetch_add(1, Ordering::SeqCst);
3255        let n_b = N.fetch_add(1, Ordering::SeqCst);
3256        let path_a =
3257            std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-incl-src.db"));
3258        let path_b =
3259            std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-incl-dst.db"));
3260        let _ = std::fs::remove_file(&path_a);
3261        let _ = std::fs::remove_file(&path_b);
3262
3263        let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
3264        let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
3265        let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
3266        let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
3267
3268        // Three independent tables; include `app_*`, exclude `app_logs`.
3269        src.execute("CREATE TABLE app_users (id INTEGER, name TEXT)")
3270            .unwrap();
3271        src.execute("CREATE TABLE app_logs (id INTEGER, msg TEXT)")
3272            .unwrap();
3273        src.execute("CREATE TABLE other (id INTEGER)").unwrap();
3274        src.execute("INSERT INTO app_users VALUES (1, 'A')")
3275            .unwrap();
3276        src.execute("INSERT INTO app_logs VALUES (1, 'noise')")
3277            .unwrap();
3278        src.execute("INSERT INTO other VALUES (1)").unwrap();
3279
3280        let opts = AllTablesOptions {
3281            include: vec!["app_*".into()],
3282            exclude: vec!["app_logs".into()],
3283            create_table: true,
3284            ..Default::default()
3285        };
3286        let copied = copy_all_tables(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
3287            .expect("copy_all_tables");
3288        // Only app_users (1 row) — app_logs excluded, other not in include.
3289        assert_eq!(copied, 1);
3290        let tables = dst.list_tables(None).unwrap();
3291        assert!(tables.contains(&"app_users".to_string()));
3292        assert!(!tables.contains(&"app_logs".to_string()));
3293        assert!(!tables.contains(&"other".to_string()));
3294
3295        let _ = std::fs::remove_file(&path_a);
3296        let _ = std::fs::remove_file(&path_b);
3297    }
3298
3299    /// FK cycle without `--no-fk-check` hard-errors with the cycle
3300    /// path. With `--no-fk-check` set, the copy proceeds (and may or
3301    /// may not succeed depending on data; here we just verify the
3302    /// dispatcher doesn't gate on the cycle).
3303    #[cfg(feature = "sqlite")]
3304    #[test]
3305    fn copy_all_tables_rejects_cycle_unless_no_fk_check() {
3306        use crate::connection::ConnectOptions;
3307        use crate::url::DatabaseUrl;
3308        use std::sync::atomic::{AtomicU64, Ordering};
3309
3310        static N: AtomicU64 = AtomicU64::new(0);
3311        let pid = std::process::id();
3312        let n_a = N.fetch_add(1, Ordering::SeqCst);
3313        let n_b = N.fetch_add(1, Ordering::SeqCst);
3314        let path_a = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_a}-cyc-src.db"));
3315        let path_b = std::env::temp_dir().join(format!("ferrule-copy-test-{pid}-{n_b}-cyc-dst.db"));
3316        let _ = std::fs::remove_file(&path_a);
3317        let _ = std::fs::remove_file(&path_b);
3318
3319        let url_a = DatabaseUrl::parse(&format!("sqlite://{}", path_a.display())).unwrap();
3320        let url_b = DatabaseUrl::parse(&format!("sqlite://{}", path_b.display())).unwrap();
3321        let mut src = crate::connect(&url_a, &ConnectOptions::default(), None).unwrap();
3322        let mut dst = crate::connect(&url_b, &ConnectOptions::default(), None).unwrap();
3323        // FK enforcement OFF on destination so the cyclic test data
3324        // can land at all.
3325        dst.execute("PRAGMA foreign_keys = OFF").unwrap();
3326
3327        // a -> b -> a, a 2-cycle.
3328        src.execute("CREATE TABLE a (id INTEGER PRIMARY KEY, b_id INTEGER REFERENCES b(id))")
3329            .unwrap();
3330        src.execute("CREATE TABLE b (id INTEGER PRIMARY KEY, a_id INTEGER REFERENCES a(id))")
3331            .unwrap();
3332        src.execute("INSERT INTO a VALUES (1, NULL)").unwrap();
3333        src.execute("INSERT INTO b VALUES (1, NULL)").unwrap();
3334
3335        let opts = AllTablesOptions {
3336            create_table: true,
3337            ..Default::default()
3338        };
3339        let err = copy_all_tables(&mut src, Backend::Sqlite, &mut dst, Backend::Sqlite, &opts)
3340            .expect_err("cycle should hard-error");
3341        let msg = format!("{err}");
3342        assert!(msg.contains("foreign-key cycle"), "{msg}");
3343        assert!(msg.contains("--no-fk-check"), "{msg}");
3344
3345        // Same input with --no-fk-check: should succeed (copies in
3346        // discovery order; FK enforcement on dst is OFF).
3347        let opts_relaxed = AllTablesOptions {
3348            create_table: true,
3349            no_fk_check: true,
3350            ..Default::default()
3351        };
3352        let copied = copy_all_tables(
3353            &mut src,
3354            Backend::Sqlite,
3355            &mut dst,
3356            Backend::Sqlite,
3357            &opts_relaxed,
3358        )
3359        .expect("copy_all_tables with --no-fk-check");
3360        assert_eq!(copied, 2);
3361
3362        let _ = std::fs::remove_file(&path_a);
3363        let _ = std::fs::remove_file(&path_b);
3364    }
3365}