Skip to main content

write_rows

Function write_rows 

Source
pub fn write_rows<I>(
    dst: &mut dyn Connection,
    backend: Backend,
    table: &str,
    columns: &[ColumnInfo],
    rows: I,
    opts: &WriteOptions,
) -> Result<WriteReport, SqlError>
where I: IntoIterator<Item = Row>,
Expand description

Write rows into table on dst in bounded back-pressured batches, returning a structured WriteReport.

columns is the destination column order; every row must match it positionally. SQL generation, bulk dispatch, and transaction control are all delegated to the existing copy/transaction machinery (see the module docs). Rows are pulled from rows one batch at a time, so an unbounded iterator is written at O(batch_size) memory.

Blocking: issues synchronous statements through dst; blocks until the write completes (or the outer transaction rolls back under atomic).

Atomicity: with WriteOptions::atomic, all batches run inside one BEGIN/COMMIT (MSSQL SET XACT_ABORT ON, Oracle explicit COMMIT); the first failing batch rolls the whole write back and is recorded. Without it, each batch is independent — earlier committed batches stay, later ones still run, and failures are collected.

Examples found in repository?
examples/embed.rs (lines 114-121)
32fn main() -> Result<(), Box<dyn Error>> {
33    // A throwaway on-disk fixture in the temp dir. Using a file (rather
34    // than `:memory:`) keeps a single shared database across the two
35    // statements we run; the example removes it on the way out.
36    let path =
37        std::env::temp_dir().join(format!("ferrule-embed-example-{}.db", std::process::id()));
38    let _ = std::fs::remove_file(&path);
39    let url = DatabaseUrl::parse(&format!("sqlite://{}", path.display()))?;
40
41    // Step 1 — connect with a caller-resolved credential.
42    //
43    // `ConnectOptions::password` carries a `SecretString` the *host*
44    // resolved (env var, OS keyring, interactive prompt, …); `ferrule-sql`
45    // does no credential resolution of its own. The secret is redacted in
46    // `Debug` and zeroized on drop. SQLite ignores it (no auth), but a
47    // networked backend would consume it here.
48    let opts = ConnectOptions {
49        insecure: false,
50        password: Some(SecretString::from("unused-for-sqlite")),
51    };
52    let mut conn = ferrule_sql::connect(&url, &opts, None)?;
53
54    // Seed a fixture table to read back. `execute` blocks until the
55    // statement completes.
56    conn.execute("CREATE TABLE widget (id INTEGER PRIMARY KEY, name TEXT)")?;
57    conn.execute(
58        "INSERT INTO widget (id, name) VALUES \
59         (1, 'alpha'), (2, 'beta'), (3, 'gamma'), (4, 'delta'), (5, 'epsilon')",
60    )?;
61
62    // Step 2 — streaming read at bounded memory.
63    //
64    // `query_cursor` opens a native cursor; `next_batch(n)` pulls at most
65    // `n` rows at a time, so peak memory is `O(batch)` no matter how large
66    // the table is. The cursor borrows the connection for its lifetime, so
67    // it is scoped here and dropped before we write.
68    println!("streaming read (batched, bounded memory):");
69    let mut streamed = 0u64;
70    {
71        let mut cursor = conn.query_cursor("SELECT id, name FROM widget ORDER BY id")?;
72        loop {
73            let batch = cursor.next_batch(2)?; // 2 rows per pull
74            if batch.is_empty() {
75                break; // end of stream
76            }
77            for row in &batch {
78                println!("  row: {} = {}", render(&row[0]), render(&row[1]));
79                streamed += 1;
80            }
81        }
82    }
83    println!("  streamed {streamed} rows total\n");
84
85    // Step 3 — batched write with a structured report.
86    //
87    // `write_rows` consumes any `IntoIterator<Item = Row>` and flushes it
88    // in fixed-size batches (`WriteOptions::batch_size`), buffering only
89    // one batch at a time. The iterator below is materialized for brevity,
90    // but it could be a lazy generator over millions of rows — memory
91    // stays `O(batch_size)`. Pair it with the `query_cursor` above for an
92    // end-to-end bounded-memory pipe.
93    conn.execute("CREATE TABLE sink (id INTEGER PRIMARY KEY, label TEXT)")?;
94    let columns = [
95        ColumnInfo {
96            name: "id".into(),
97            type_hint: TypeHint::Int64,
98            nullable: false,
99        },
100        ColumnInfo {
101            name: "label".into(),
102            type_hint: TypeHint::String,
103            nullable: true,
104        },
105    ];
106    let rows: Vec<Row> = (1..=2500)
107        .map(|i| vec![Value::Int64(i), Value::String(format!("item-{i}"))])
108        .collect();
109    let write_opts = WriteOptions {
110        mode: WriteMode::Insert,
111        batch_size: 500, // 5 batches of 500; only one buffered at a time
112        ..Default::default()
113    };
114    let report = write_rows(
115        &mut *conn,
116        ferrule_sql::Backend::Sqlite,
117        "sink",
118        &columns,
119        rows,
120        &write_opts,
121    )?;
122    println!("batched write report:");
123    println!("  rows attempted : {}", report.rows_attempted);
124    println!("  rows written   : {}", report.rows_written);
125    println!("  batches        : {}", report.batches_committed);
126    println!("  complete       : {}", report.is_complete());
127
128    // Confirm the write landed.
129    let count = conn.query("SELECT COUNT(*) FROM sink")?;
130    if let Some(Value::Int64(n)) = count.rows.first().and_then(|r| r.first()) {
131        println!("  verified count : {n}");
132    }
133
134    let _ = std::fs::remove_file(&path);
135    Ok(())
136}