Skip to main content

ferrule_sql/
write.rs

1//! Embeddable batched write path with structured partial-failure
2//! reporting.
3//!
4//! This is the write counterpart to the streaming read cursor: a host
5//! embedding `ferrule-sql` as an *output sink* pushes rows in and gets
6//! back a structured [`WriteReport`] describing exactly which batches /
7//! rows landed and which were rejected, so it can route the rejects.
8//!
9//! **Reuse, not reinvention.** Every byte of SQL this module emits comes
10//! from the existing copy/load machinery — [`build_insert_sql`] and the
11//! per-dialect conflict (ON CONFLICT / MERGE / ODKU) builders, the
12//! [`insert_batch`] bulk-vs-generic dispatcher, [`quote_identifier`],
13//! [`render_value`](crate::render::render_value), and the
14//! [`transaction`](crate::transaction) helpers. The write path only adds
15//! batching, back-pressure, the atomic-boundary policy, and the
16//! structured result.
17//!
18//! **Bounded, back-pressured batches.** Rows are consumed from any
19//! iterator and flushed in fixed-size batches
20//! ([`WriteOptions::batch_size`]); only one batch is buffered at a time,
21//! so peak memory is `O(batch_size)` regardless of how many rows the
22//! source iterator yields. Pair this with the streaming
23//! [`RowCursor`](crate::RowCursor) on the read side for an end-to-end
24//! bounded-memory pipe.
25//!
26//! [`build_insert_sql`]: crate::copy
27//! [`insert_batch`]: crate::copy
28//! [`quote_identifier`]: crate::copy::quote_identifier
29
30use crate::backend::Backend;
31use crate::connection::Connection;
32use crate::copy::{
33    BulkMode, CopyFormat, IfExists, backend_needs_explicit_commit, insert_batch, quote_identifier,
34};
35use crate::error::SqlError;
36use crate::transaction::{begin_transaction, commit_transaction, rollback_transaction};
37use crate::value::{ColumnInfo, Row};
38
/// Default rows per write batch. Caps the in-flight buffer so a write of
/// an unbounded row stream stays `O(batch_size)` in memory.
///
/// Used as the `batch_size` of [`WriteOptions::default`], and substituted
/// by [`write_rows`] whenever the caller passes a `batch_size` of `0`.
pub const DEFAULT_WRITE_BATCH: usize = 1000;
42
/// Host-facing conflict semantics for a batched write.
///
/// Maps onto the same per-dialect SQL the cross-DB copy path uses
/// ([`IfExists`]); exposed as its own enum so the write API names the
/// *write* intent (insert / skip / upsert) rather than the copy-time
/// "what if the target exists" framing.
///
/// The derived `Default` is [`WriteMode::Insert`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WriteMode {
    /// Plain `INSERT`. A primary-key / unique conflict surfaces as a
    /// driver error and rejects the batch (or, under
    /// [`WriteOptions::isolate_failures`], the offending rows).
    #[default]
    Insert,
    /// Insert rows whose key is new; silently skip rows whose key
    /// already exists (`ON CONFLICT DO NOTHING` / `INSERT IGNORE` /
    /// `MERGE … WHEN NOT MATCHED`). Requires conflict columns:
    /// [`write_rows`] fails fast when [`WriteOptions::key_columns`] is
    /// empty.
    Skip,
    /// Insert new rows; overwrite the non-key columns of rows whose key
    /// already exists (`ON CONFLICT DO UPDATE` / `ON DUPLICATE KEY
    /// UPDATE` / full `MERGE`). Requires conflict columns:
    /// [`write_rows`] fails fast when [`WriteOptions::key_columns`] is
    /// empty.
    Upsert,
}
65
66impl WriteMode {
67    /// The copy-layer [`IfExists`] strategy this write mode reuses.
68    fn if_exists(self) -> IfExists {
69        match self {
70            WriteMode::Insert => IfExists::Append,
71            WriteMode::Skip => IfExists::Skip,
72            WriteMode::Upsert => IfExists::Upsert,
73        }
74    }
75
76    /// True for the key-driven modes that need conflict columns.
77    fn needs_key(self) -> bool {
78        matches!(self, WriteMode::Skip | WriteMode::Upsert)
79    }
80}
81
/// Configuration for a batched write.
///
/// Construct with struct-update syntax over [`Default`], e.g.
/// `WriteOptions { batch_size: 100, ..Default::default() }`.
pub struct WriteOptions {
    /// Conflict semantics. Default [`WriteMode::Insert`].
    pub mode: WriteMode,
    /// Rows per batch. `0` is treated as [`DEFAULT_WRITE_BATCH`].
    pub batch_size: usize,
    /// Conflict-key columns for [`WriteMode::Skip`] / [`WriteMode::Upsert`].
    /// Must be a subset of `columns`. Empty is rejected for those modes.
    /// The subset check runs for every mode, so a stray unknown key is
    /// rejected even under [`WriteMode::Insert`].
    pub key_columns: Vec<String>,
    /// Route batches through the destination's native bulk loader
    /// ([`BulkMode::Auto`] / [`BulkMode::On`]). Ignored for the
    /// conflict modes, whose bulk loaders carry no MERGE semantics.
    /// NOTE(review): that exclusion is not enforced in this module — the
    /// value is forwarded to `insert_batch` as-is; confirm it is applied
    /// there. Row-by-row probes always use [`BulkMode::Off`].
    pub bulk_mode: BulkMode,
    /// Postgres `COPY` wire format for the bulk path. Other backends
    /// ignore it.
    pub copy_format: CopyFormat,
    /// Wrap the whole write in one outer transaction: all batches commit
    /// together or roll back together. Uses the crate's
    /// [`transaction`](crate::transaction) begin/commit/rollback helpers;
    /// when built with the `mssql` feature, MSSQL additionally gets a
    /// best-effort `SET XACT_ABORT ON` first. A batch failure under
    /// `atomic` stops the write, rolls the transaction back, and makes
    /// [`write_rows`] return an error naming the failing batch — no
    /// [`WriteReport`] is returned in that case.
    pub atomic: bool,
    /// On a batch failure, retry that batch **one row at a time** to
    /// pinpoint the rejected rows (recorded as [`RejectedRow`]s) and let
    /// the good rows through. Off by default — the cheaper per-batch
    /// granularity rejects the whole failing batch. Has no effect once
    /// the `atomic` outer transaction is open: there the first failing
    /// batch aborts the write before any row probe runs.
    pub isolate_failures: bool,
    /// Emit per-batch diagnostics on stderr (bulk path selection /
    /// fallback). Mirrors the CLI `--verbose` flag.
    pub verbose: bool,
}
117
118impl Default for WriteOptions {
119    fn default() -> Self {
120        Self {
121            mode: WriteMode::default(),
122            batch_size: DEFAULT_WRITE_BATCH,
123            key_columns: Vec::new(),
124            bulk_mode: BulkMode::Off,
125            copy_format: CopyFormat::Text,
126            atomic: false,
127            isolate_failures: false,
128            verbose: false,
129        }
130    }
131}
132
133impl std::fmt::Debug for WriteOptions {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        f.debug_struct("WriteOptions")
136            .field("mode", &self.mode)
137            .field("batch_size", &self.batch_size)
138            .field("key_columns", &self.key_columns)
139            .field("bulk_mode", &self.bulk_mode)
140            .field("copy_format", &self.copy_format)
141            .field("atomic", &self.atomic)
142            .field("isolate_failures", &self.isolate_failures)
143            .field("verbose", &self.verbose)
144            .finish()
145    }
146}
147
/// What happened to one batch.
///
/// NOTE(review): nothing visible in this module constructs or returns
/// this type — [`write_rows`] reports through [`WriteReport`] only.
/// Confirm whether it is consumed elsewhere before relying on it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchOutcome {
    /// Every row in the batch was written.
    Written,
    /// The batch (or specific rows within it) was rejected; details are
    /// in the [`WriteReport`]'s `rejected_batches` / `rejected_rows`.
    Rejected,
}
157
/// A batch that failed to write as a unit.
///
/// `start_row` is the 0-based index (within the whole write) of the
/// first row in the batch; `row_count` its size. `error` is the driver
/// error message. When [`WriteOptions::isolate_failures`] is set, the
/// per-row breakdown is in [`WriteReport::rejected_rows`] instead.
#[derive(Debug, Clone)]
pub struct RejectedBatch {
    /// 0-based position of the batch within the write.
    pub batch_index: usize,
    /// 0-based index, within the whole write, of the batch's first row.
    pub start_row: u64,
    /// Number of rows the batch contained.
    pub row_count: usize,
    /// The batch's error rendered with `to_string()`.
    pub error: String,
}
171
/// A single row rejected during a [`WriteOptions::isolate_failures`]
/// probe. `row_index` is the 0-based index within the whole write.
#[derive(Debug, Clone)]
pub struct RejectedRow {
    /// 0-based index of the row within the whole write (the batch's
    /// `start_row` plus the row's offset inside the batch).
    pub row_index: u64,
    /// The single-row insert's error rendered with `to_string()`.
    pub error: String,
}
179
/// Structured outcome of a batched write.
///
/// `rows_written` + the rejected counts let a host reconcile exactly
/// what landed. `rejected_batches` / `rejected_rows` carry the routable
/// rejects (whole batches by default; individual rows under
/// [`WriteOptions::isolate_failures`]).
#[derive(Debug, Clone, Default)]
pub struct WriteReport {
    /// Total rows the caller handed in (counted as each batch is pulled
    /// from the source iterator).
    pub rows_attempted: u64,
    /// Rows the database accepted: the full size of every batch that
    /// succeeded as a unit, plus rows that succeeded individually during
    /// an `isolate_failures` probe.
    pub rows_written: u64,
    /// Batches that committed cleanly. A batch that failed as a unit and
    /// was then probed row-by-row is not counted here, even if every row
    /// subsequently succeeded on its own.
    pub batches_committed: usize,
    /// Batches rejected as a unit (default granularity).
    pub rejected_batches: Vec<RejectedBatch>,
    /// Rows rejected individually (only populated under
    /// [`WriteOptions::isolate_failures`]).
    pub rejected_rows: Vec<RejectedRow>,
}
200
201impl WriteReport {
202    /// True when every attempted row was written.
203    #[must_use]
204    pub fn is_complete(&self) -> bool {
205        self.rejected_batches.is_empty() && self.rejected_rows.is_empty()
206    }
207}
208
209/// Write `rows` into `table` on `dst` in bounded back-pressured batches,
210/// returning a structured [`WriteReport`].
211///
212/// `columns` is the destination column order; every row must match it
213/// positionally. SQL generation, bulk dispatch, and transaction control
214/// are all delegated to the existing copy/transaction machinery (see the
215/// [module docs](self)). Rows are pulled from `rows` one batch at a time,
216/// so an unbounded iterator is written at `O(batch_size)` memory.
217///
218/// **Blocking:** issues synchronous statements through `dst`; blocks
219/// until the write completes (or the outer transaction rolls back under
220/// `atomic`).
221///
222/// **Atomicity:** with [`WriteOptions::atomic`], all batches run inside
223/// one BEGIN/COMMIT (MSSQL `SET XACT_ABORT ON`, Oracle explicit COMMIT);
224/// the first failing batch rolls the whole write back and is recorded.
225/// Without it, each batch is independent — earlier committed batches
226/// stay, later ones still run, and failures are collected.
227pub fn write_rows<I>(
228    dst: &mut dyn Connection,
229    backend: Backend,
230    table: &str,
231    columns: &[ColumnInfo],
232    rows: I,
233    opts: &WriteOptions,
234) -> Result<WriteReport, SqlError>
235where
236    I: IntoIterator<Item = Row>,
237{
238    if opts.mode.needs_key() && opts.key_columns.is_empty() {
239        return Err(SqlError::QueryFailed(format!(
240            "{:?} write mode requires key_columns (conflict key); none supplied",
241            opts.mode
242        )));
243    }
244    // Validate the key columns are part of the destination shape before
245    // any row is sent — fail fast, like the copy path's preflight.
246    for key in &opts.key_columns {
247        if !columns.iter().any(|c| &c.name == key) {
248            return Err(SqlError::QueryFailed(format!(
249                "key column {key:?} is not among the destination columns"
250            )));
251        }
252    }
253
254    let batch_size = if opts.batch_size == 0 {
255        DEFAULT_WRITE_BATCH
256    } else {
257        opts.batch_size
258    };
259    let if_exists = opts.mode.if_exists();
260    let quoted_table = quote_identifier(table, backend);
261    let cols_clause = columns
262        .iter()
263        .map(|c| quote_identifier(&c.name, backend))
264        .collect::<Vec<_>>()
265        .join(", ");
266
267    let mut report = WriteReport::default();
268
269    let atomic_opened = if opts.atomic {
270        // MSSQL: XACT_ABORT ON makes any statement error abort the whole
271        // transaction, matching migrate's atomic-apply contract.
272        #[cfg(feature = "mssql")]
273        if matches!(backend, Backend::MsSql) {
274            let _ = dst.execute("SET XACT_ABORT ON");
275        }
276        begin_transaction(dst, backend)
277    } else {
278        false
279    };
280
281    let mut iter = rows.into_iter();
282    let mut batch: Vec<Row> = Vec::with_capacity(batch_size);
283    let mut batch_index = 0usize;
284    let mut next_row: u64 = 0;
285    let mut atomic_failure: Option<SqlError> = None;
286
287    loop {
288        batch.clear();
289        for _ in 0..batch_size {
290            match iter.next() {
291                Some(row) => batch.push(row),
292                None => break,
293            }
294        }
295        if batch.is_empty() {
296            break;
297        }
298        let start_row = next_row;
299        let n = batch.len();
300        report.rows_attempted += n as u64;
301        next_row += n as u64;
302
303        match insert_batch(
304            dst,
305            table,
306            columns,
307            &opts.key_columns,
308            &quoted_table,
309            &cols_clause,
310            &batch,
311            backend,
312            if_exists,
313            opts.bulk_mode,
314            opts.copy_format,
315            opts.verbose,
316        ) {
317            Ok(()) => {
318                report.rows_written += n as u64;
319                report.batches_committed += 1;
320            }
321            Err(err) => {
322                if atomic_opened {
323                    // Under one outer transaction a failed batch dooms
324                    // the whole write; stop and roll back below.
325                    record_batch_rejection(&mut report, batch_index, start_row, n, &err);
326                    atomic_failure = Some(err);
327                    break;
328                }
329                if opts.isolate_failures {
330                    // Probe the batch row-by-row to attribute the
331                    // failure and let the good rows through.
332                    let written = probe_rows(
333                        dst,
334                        table,
335                        columns,
336                        &opts.key_columns,
337                        &quoted_table,
338                        &cols_clause,
339                        &batch,
340                        backend,
341                        if_exists,
342                        opts.copy_format,
343                        opts.verbose,
344                        start_row,
345                        &mut report,
346                    );
347                    report.rows_written += written;
348                } else {
349                    record_batch_rejection(&mut report, batch_index, start_row, n, &err);
350                }
351            }
352        }
353        batch_index += 1;
354    }
355
356    if atomic_opened {
357        if let Some(err) = atomic_failure {
358            let _ = rollback_transaction(dst, backend);
359            // The rollback undoes every prior batch, so nothing landed.
360            report.rows_written = 0;
361            report.batches_committed = 0;
362            return Err(SqlError::QueryFailed(format!(
363                "atomic write rolled back after batch {} failed: {err}",
364                report.rejected_batches.last().map_or(0, |b| b.batch_index)
365            )));
366        }
367        commit_transaction(dst, backend)?;
368        // Oracle has no DML autocommit; the explicit COMMIT above already
369        // terminated the transaction, so the extra-commit guard is a
370        // no-op here but kept for parity with the copy/migrate paths.
371        let _ = backend_needs_explicit_commit(backend);
372    }
373
374    Ok(report)
375}
376
377/// Record a whole-batch rejection in the report.
378fn record_batch_rejection(
379    report: &mut WriteReport,
380    batch_index: usize,
381    start_row: u64,
382    row_count: usize,
383    err: &SqlError,
384) {
385    report.rejected_batches.push(RejectedBatch {
386        batch_index,
387        start_row,
388        row_count,
389        error: err.to_string(),
390    });
391}
392
393/// Retry a failed batch one row at a time (non-atomic path only),
394/// recording each rejected row and returning how many rows were written.
395#[allow(clippy::too_many_arguments)]
396fn probe_rows(
397    dst: &mut dyn Connection,
398    table: &str,
399    columns: &[ColumnInfo],
400    key_columns: &[String],
401    quoted_table: &str,
402    cols_clause: &str,
403    batch: &[Row],
404    backend: Backend,
405    if_exists: IfExists,
406    copy_format: CopyFormat,
407    verbose: bool,
408    start_row: u64,
409    report: &mut WriteReport,
410) -> u64 {
411    let mut written = 0u64;
412    for (offset, row) in batch.iter().enumerate() {
413        let single = std::slice::from_ref(row);
414        // Force the generic path for the per-row probe: bulk loaders
415        // carry no per-row error attribution.
416        match insert_batch(
417            dst,
418            table,
419            columns,
420            key_columns,
421            quoted_table,
422            cols_clause,
423            single,
424            backend,
425            if_exists,
426            BulkMode::Off,
427            copy_format,
428            verbose,
429        ) {
430            Ok(()) => written += 1,
431            Err(err) => report.rejected_rows.push(RejectedRow {
432                row_index: start_row + offset as u64,
433                error: err.to_string(),
434            }),
435        }
436    }
437    written
438}
439
// The write-path tests exercise the round trip against an embedded
// SQLite database (always available, no container), so they are gated on
// the `sqlite` feature.
#[cfg(all(test, feature = "sqlite"))]
mod tests {
    use super::*;
    use crate::connection::ConnectOptions;
    use crate::url::DatabaseUrl;
    use crate::value::{TypeHint, Value};
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Process-wide counter giving every test its own database file name.
    static CTR: AtomicU64 = AtomicU64::new(0);

    /// Open a connection to a fresh SQLite file under the system temp
    /// directory and return it together with the file's path.
    ///
    /// The name combines the process id and [`CTR`], and any stale file
    /// at that path is removed first. Each test removes the file itself
    /// as its last statement, so a failed assertion leaves it behind.
    fn fresh_sqlite() -> (Box<dyn Connection>, std::path::PathBuf) {
        let pid = std::process::id();
        let n = CTR.fetch_add(1, Ordering::SeqCst);
        let path = std::env::temp_dir().join(format!("ferrule-write-test-{pid}-{n}.db"));
        let _ = std::fs::remove_file(&path);
        let url = DatabaseUrl::parse(&format!("sqlite://{}", path.display())).unwrap();
        let conn = crate::connect(&url, &ConnectOptions::default(), None).unwrap();
        (conn, path)
    }

    /// Build a nullable [`ColumnInfo`] named `name` with
    /// [`TypeHint::Other`].
    fn col(name: &str) -> ColumnInfo {
        ColumnInfo {
            name: name.to_string(),
            type_hint: TypeHint::Other,
            nullable: true,
        }
    }

    /// Round trip: write N rows in bounded batches and read them back.
    /// `batch_size` smaller than N forces multiple batches, exercising
    /// the bounded back-pressured loop (only one batch buffered).
    #[test]
    fn write_rows_round_trip_in_bounded_batches() {
        let (mut conn, path) = fresh_sqlite();
        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
            .unwrap();
        let columns = vec![col("id"), col("name")];
        let rows: Vec<Row> = (1..=2500)
            .map(|i| vec![Value::Int64(i), Value::String(format!("n{i}"))])
            .collect();
        let opts = WriteOptions {
            batch_size: 100,
            ..Default::default()
        };
        let report = write_rows(&mut *conn, Backend::Sqlite, "t", &columns, rows, &opts).unwrap();
        assert_eq!(report.rows_attempted, 2500);
        assert_eq!(report.rows_written, 2500);
        // ceil(2500 / 100) batches, all committed.
        assert_eq!(report.batches_committed, 25);
        assert!(report.is_complete());

        let back = conn.query("SELECT COUNT(*) FROM t").unwrap();
        assert!(matches!(back.rows[0][0], Value::Int64(2500)));
        let _ = std::fs::remove_file(&path);
    }

    /// Per-batch partial-failure routing: a batch containing a duplicate
    /// PK is rejected as a unit (default granularity), surfaced
    /// structurally, while clean batches still land.
    #[test]
    fn write_rows_rejects_failing_batch_structurally() {
        let (mut conn, path) = fresh_sqlite();
        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
            .unwrap();
        // Seed id=5 so the batch containing it collides.
        conn.execute("INSERT INTO t VALUES (5)").unwrap();
        let columns = vec![col("id")];
        // Two batches of size 4: [1,2,3,4] ok, [5,6,7,8] collides on 5.
        let rows: Vec<Row> = (1..=8).map(|i| vec![Value::Int64(i)]).collect();
        let opts = WriteOptions {
            batch_size: 4,
            ..Default::default()
        };
        let report = write_rows(&mut *conn, Backend::Sqlite, "t", &columns, rows, &opts).unwrap();
        assert_eq!(report.rows_attempted, 8);
        assert_eq!(report.rows_written, 4, "only the clean batch landed");
        assert_eq!(report.batches_committed, 1);
        assert_eq!(report.rejected_batches.len(), 1);
        let rej = &report.rejected_batches[0];
        assert_eq!(rej.batch_index, 1);
        assert_eq!(rej.start_row, 4);
        assert_eq!(rej.row_count, 4);
        assert!(!report.is_complete());
        let _ = std::fs::remove_file(&path);
    }

    /// `isolate_failures` probes the failing batch row-by-row, so the
    /// good rows in a partially-bad batch land and only the true
    /// offender is recorded as a rejected row.
    #[test]
    fn write_rows_isolates_offending_row() {
        let (mut conn, path) = fresh_sqlite();
        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
            .unwrap();
        conn.execute("INSERT INTO t VALUES (3)").unwrap();
        let columns = vec![col("id")];
        // One batch [1,2,3,4]; only id=3 collides.
        let rows: Vec<Row> = (1..=4).map(|i| vec![Value::Int64(i)]).collect();
        let opts = WriteOptions {
            batch_size: 10,
            isolate_failures: true,
            ..Default::default()
        };
        let report = write_rows(&mut *conn, Backend::Sqlite, "t", &columns, rows, &opts).unwrap();
        assert_eq!(report.rows_written, 3, "1,2,4 landed; 3 rejected");
        assert_eq!(report.rejected_batches.len(), 0);
        assert_eq!(report.rejected_rows.len(), 1);
        assert_eq!(
            report.rejected_rows[0].row_index, 2,
            "0-based index of id=3"
        );
        // Three newly written rows plus the pre-seeded id=3.
        let back = conn.query("SELECT COUNT(*) FROM t").unwrap();
        assert!(matches!(back.rows[0][0], Value::Int64(4)));
        let _ = std::fs::remove_file(&path);
    }

    /// `atomic` rolls the entire write back when any batch fails: nothing
    /// from the write survives, even batches that ran before the failure.
    #[test]
    fn write_rows_atomic_rolls_back_on_failure() {
        let (mut conn, path) = fresh_sqlite();
        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
            .unwrap();
        conn.execute("INSERT INTO t VALUES (7)").unwrap();
        let columns = vec![col("id")];
        // Batch 0 [1,2] clean, batch 1 [7,8] collides on the seeded 7.
        let rows: Vec<Row> = vec![1, 2, 7, 8]
            .into_iter()
            .map(|i| vec![Value::Int64(i)])
            .collect();
        let opts = WriteOptions {
            batch_size: 2,
            atomic: true,
            ..Default::default()
        };
        let err = write_rows(&mut *conn, Backend::Sqlite, "t", &columns, rows, &opts)
            .expect_err("atomic write must surface the failure");
        assert!(matches!(err, SqlError::QueryFailed(_)));
        // Only the pre-seeded row remains; the write's batch 0 rolled back.
        let back = conn.query("SELECT COUNT(*) FROM t").unwrap();
        assert!(
            matches!(back.rows[0][0], Value::Int64(1)),
            "atomic rollback left only the pre-existing row"
        );
        let _ = std::fs::remove_file(&path);
    }

    /// Upsert mode overwrites existing rows by key; reuses the copy
    /// path's ON CONFLICT DO UPDATE builder.
    #[test]
    fn write_rows_upsert_overwrites_by_key() {
        let (mut conn, path) = fresh_sqlite();
        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)")
            .unwrap();
        conn.execute("INSERT INTO t VALUES (1, 'old')").unwrap();
        let columns = vec![col("id"), col("v")];
        let rows: Vec<Row> = vec![
            vec![Value::Int64(1), Value::String("new".into())],
            vec![Value::Int64(2), Value::String("two".into())],
        ];
        let opts = WriteOptions {
            mode: WriteMode::Upsert,
            key_columns: vec!["id".into()],
            ..Default::default()
        };
        let report = write_rows(&mut *conn, Backend::Sqlite, "t", &columns, rows, &opts).unwrap();
        assert!(report.is_complete());
        let v1 = conn.query("SELECT v FROM t WHERE id = 1").unwrap();
        assert!(matches!(&v1.rows[0][0], Value::String(s) if s == "new"));
        let _ = std::fs::remove_file(&path);
    }

    /// Skip/Upsert without a key is rejected before any row is sent.
    #[test]
    fn write_rows_conflict_mode_requires_key() {
        let (mut conn, path) = fresh_sqlite();
        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
            .unwrap();
        let columns = vec![col("id")];
        let opts = WriteOptions {
            mode: WriteMode::Skip,
            ..Default::default()
        };
        let err = write_rows(
            &mut *conn,
            Backend::Sqlite,
            "t",
            &columns,
            vec![vec![Value::Int64(1)]],
            &opts,
        )
        .expect_err("skip without key must fail fast");
        assert!(matches!(err, SqlError::QueryFailed(_)));
        let _ = std::fs::remove_file(&path);
    }

    /// A key column not present in the destination shape fails fast.
    #[test]
    fn write_rows_unknown_key_column_fails_fast() {
        let (mut conn, path) = fresh_sqlite();
        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
            .unwrap();
        let columns = vec![col("id")];
        let opts = WriteOptions {
            mode: WriteMode::Upsert,
            key_columns: vec!["nonexistent".into()],
            ..Default::default()
        };
        let err = write_rows(
            &mut *conn,
            Backend::Sqlite,
            "t",
            &columns,
            vec![vec![Value::Int64(1)]],
            &opts,
        )
        .expect_err("unknown key column must fail fast");
        assert!(matches!(err, SqlError::QueryFailed(_)));
        let _ = std::fs::remove_file(&path);
    }
}