Skip to main content

ferrule_core/
dump.rs

1use ferrule_sql::render_value;
2use ferrule_sql::value::{ColumnInfo, Value};
3use ferrule_sql::{Backend, Connection, SqlError};
4use std::fmt::Write as _;
5
6/// Supported dump formats.
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum DumpFormat {
9    Csv,
10    Json,
11    Sql,
12}
13
14impl DumpFormat {
15    pub fn parse(s: &str) -> Option<Self> {
16        match s.to_ascii_lowercase().as_str() {
17            "csv" => Some(Self::Csv),
18            "json" => Some(Self::Json),
19            "sql" => Some(Self::Sql),
20            _ => None,
21        }
22    }
23}
24
25/// Options for a dump operation.
26#[derive(Debug, Clone)]
27pub struct DumpOptions {
28    pub format: DumpFormat,
29    pub batch_size: usize,
30    pub schema: Option<String>,
31    /// When true and `format == DumpFormat::Sql`, produce a byte-stable
32    /// stream of one `INSERT INTO ... VALUES (...);` per row, with rows
33    /// ordered server-side by primary key (or by every column,
34    /// lexicographically, when the table has no PK — a warning is
35    /// emitted on stderr in that case). JSON cell values are
36    /// re-serialised with sorted object keys.
37    ///
38    /// Has no effect on `DumpFormat::Csv` / `DumpFormat::Json`.
39    pub deterministic: bool,
40}
41
42impl Default for DumpOptions {
43    fn default() -> Self {
44        Self {
45            format: DumpFormat::Csv,
46            batch_size: 1000,
47            schema: None,
48            deterministic: false,
49        }
50    }
51}
52
53/// Dump an entire table by streaming a single native cursor.
54pub fn dump_table(
55    conn: &mut dyn Connection,
56    table: &str,
57    backend: Backend,
58    opts: &DumpOptions,
59) -> Result<String, SqlError> {
60    let quoted_table = ferrule_sql::copy::quote_identifier(table, backend);
61
62    // Determinism (Scope A): append ORDER BY to the SELECT so the
63    // single streaming cursor yields rows in a stable, server-side
64    // order. (No LIMIT/OFFSET windowing is involved any more — one
65    // ordered SELECT streamed through one cursor is strictly more
66    // stable than the old paged path.)
67    let sql = if opts.deterministic && opts.format == DumpFormat::Sql {
68        let pks = conn.primary_key(opts.schema.as_deref(), table)?;
69        let order_cols: Vec<String> = if pks.is_empty() {
70            eprintln!(
71                "[ferrule] note: table '{table}' has no PRIMARY KEY; \
72                 sorting by all columns (slower)."
73            );
74            let described = conn.describe_table(opts.schema.as_deref(), table)?;
75            let mut names: Vec<String> = described.columns.iter().map(|c| c.name.clone()).collect();
76            names.sort();
77            names
78        } else {
79            pks
80        };
81        let order_by = build_order_by(&order_cols, backend);
82        format!("SELECT * FROM {quoted_table}{order_by}")
83    } else {
84        format!("SELECT * FROM {quoted_table}")
85    };
86
87    dump_query(conn, &sql, backend, opts, Some(table))
88}
89
90/// Dump the results of an arbitrary SELECT query.
91///
92/// Rows are streamed from a single native database cursor in bounded
93/// `batch_size` chunks, so fetch memory stays `O(batch)` rather than
94/// re-issuing an `O(offset)` LIMIT/OFFSET query per page. The assembled
95/// output is still buffered into the returned `String` (true
96/// row-to-writer streaming is a separate, deferred signature change).
97pub fn dump_query(
98    conn: &mut dyn Connection,
99    sql: &str,
100    backend: Backend,
101    opts: &DumpOptions,
102    table_name: Option<&str>,
103) -> Result<String, SqlError> {
104    // Determinism precondition: refuse `--deterministic` against a
105    // raw query that lacks an ORDER BY. The substring match is
106    // intentionally pragmatic — a query that contains "order by"
107    // inside a string literal or comment will pass this check. We
108    // accept those false positives in exchange for not building a
109    // SQL parser.
110    if opts.deterministic
111        && opts.format == DumpFormat::Sql
112        && !sql.to_lowercase().contains("order by")
113    {
114        return Err(SqlError::QueryFailed(
115            "dump_query --deterministic requires an ORDER BY clause in the source SQL \
116             (substring match is intentionally pragmatic — a query that contains \
117             'order by' only inside a comment or string literal will pass this check)."
118                .into(),
119        ));
120    }
121
122    // Stream rows from a single native cursor (#55-adjacent debt: was an
123    // O(n^2) LIMIT/OFFSET re-query loop). The cursor borrows `conn` for
124    // its whole lifetime, so any introspection call (`primary_key`,
125    // `describe_table`) must already have run — `dump_table` does that
126    // before it reaches here. Fetch memory is bounded to one
127    // `batch_size` chunk; the assembled output string is still buffered
128    // by value (true row-to-writer streaming is a separate signature
129    // change, deferred).
130    let mut cursor = conn.query_cursor(sql)?;
131    let columns: Vec<ColumnInfo> = cursor.columns().to_vec();
132
133    match opts.format {
134        DumpFormat::Csv => {
135            let mut buf = Vec::new();
136            {
137                let mut wtr = csv::Writer::from_writer(&mut buf);
138                if !columns.is_empty() {
139                    let headers: Vec<&str> = columns.iter().map(|c| c.name.as_str()).collect();
140                    wtr.write_record(&headers)
141                        .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
142                }
143                loop {
144                    let batch = cursor.next_batch(opts.batch_size)?;
145                    if batch.is_empty() {
146                        break;
147                    }
148                    for row in &batch {
149                        let cells: Vec<String> = row.iter().map(value_to_csv_cell).collect();
150                        wtr.write_record(&cells)
151                            .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
152                    }
153                }
154                wtr.flush()
155                    .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
156            }
157            String::from_utf8(buf).map_err(|e| SqlError::QueryFailed(e.to_string()))
158        }
159
160        DumpFormat::Json => {
161            let mut buf = Vec::new();
162            buf.push(b'[');
163            let mut first_row = true;
164
165            loop {
166                let batch = cursor.next_batch(opts.batch_size)?;
167                if batch.is_empty() {
168                    break;
169                }
170                for row in &batch {
171                    if !first_row {
172                        buf.push(b',');
173                    }
174                    first_row = false;
175
176                    let mut obj = serde_json::Map::new();
177                    for (col, val) in columns.iter().zip(row.iter()) {
178                        obj.insert(col.name.clone(), json_value(val));
179                    }
180                    let json_str = serde_json::to_string(&serde_json::Value::Object(obj))
181                        .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
182                    buf.extend_from_slice(json_str.as_bytes());
183                }
184            }
185
186            buf.push(b']');
187            String::from_utf8(buf).map_err(|e| SqlError::QueryFailed(e.to_string()))
188        }
189
190        DumpFormat::Sql => {
191            let table = table_name.unwrap_or("dumped_table");
192            let quoted_table = ferrule_sql::copy::quote_identifier(table, backend);
193            let col_names: Vec<String> = columns
194                .iter()
195                .map(|c| ferrule_sql::copy::quote_identifier(&c.name, backend))
196                .collect();
197            let cols = col_names.join(", ");
198            let mut out = String::new();
199
200            loop {
201                let batch = cursor.next_batch(opts.batch_size)?;
202                if batch.is_empty() {
203                    break;
204                }
205
206                if opts.deterministic {
207                    // One INSERT statement per row — eliminates batch-
208                    // boundary noise in diffs and keeps each row
209                    // independently re-orderable / re-loadable. Stream
210                    // straight into `out` to avoid a per-row Vec<String>
211                    // and an intermediate format!() temp.
212                    for row in &batch {
213                        let _ = write!(&mut out, "INSERT INTO {quoted_table} ({cols}) VALUES (");
214                        for (i, v) in row.iter().enumerate() {
215                            if i > 0 {
216                                out.push_str(", ");
217                            }
218                            out.push_str(&render_value_deterministic(v, backend));
219                        }
220                        out.push_str(");\n");
221                    }
222                } else {
223                    // One batched VALUES list per cursor chunk — keeps the
224                    // non-deterministic dump compact. (A chunk maps to one
225                    // `INSERT` statement, as the batched path always has.)
226                    let values: Vec<String> = batch
227                        .iter()
228                        .map(|row| {
229                            let cells: Vec<String> =
230                                row.iter().map(|v| render_value(v, backend)).collect();
231                            format!("({})", cells.join(", "))
232                        })
233                        .collect();
234
235                    out.push_str(&format!(
236                        "INSERT INTO {quoted_table} ({cols}) VALUES {};\n",
237                        values.join(", ")
238                    ));
239                }
240            }
241
242            Ok(out)
243        }
244    }
245}
246
247fn value_to_csv_cell(v: &Value) -> String {
248    match v {
249        Value::Null => String::new(),
250        Value::String(s) => s.clone(),
251        other => other.to_string(),
252    }
253}
254
255fn json_value(v: &Value) -> serde_json::Value {
256    match v {
257        Value::Null => serde_json::Value::Null,
258        Value::Bool(b) => serde_json::Value::Bool(*b),
259        Value::Int64(i) => serde_json::Value::Number((*i).into()),
260        Value::Float64(f) => serde_json::Value::Number(
261            serde_json::Number::from_f64(*f).unwrap_or_else(|| serde_json::Number::from(0)),
262        ),
263        Value::Decimal(d) => serde_json::Value::String(d.clone()),
264        Value::String(s) => serde_json::Value::String(s.clone()),
265        Value::Bytes(_b) => serde_json::Value::String(format!("<{} bytes>", _b.len())),
266        Value::Date(d) => serde_json::Value::String(d.to_string()),
267        Value::Time(t) => serde_json::Value::String(t.to_string()),
268        Value::DateTime(dt) => serde_json::Value::String(dt.to_string()),
269        Value::DateTimeTz(dt) => serde_json::Value::String(dt.to_rfc3339()),
270        Value::Json(j) => j.clone(),
271        Value::Uuid(u) => serde_json::Value::String(u.clone()),
272        Value::Array(a) => serde_json::Value::Array(a.iter().map(json_value).collect()),
273    }
274}
275
276/// Recursively re-serialise a JSON value with object keys in
277/// lexicographic order. Arrays preserve element order (positional);
278/// only object keys are reordered. Used by `--deterministic` to make
279/// dump output byte-stable across Postgres `JSONB` (hash-ordered) and
280/// MySQL `JSON` (insertion-ordered).
281fn canonicalize_json_value(v: serde_json::Value) -> serde_json::Value {
282    use serde_json::Value as J;
283    use std::collections::BTreeMap;
284    match v {
285        J::Object(map) => {
286            let sorted: BTreeMap<String, J> = map
287                .into_iter()
288                .map(|(k, v)| (k, canonicalize_json_value(v)))
289                .collect();
290            let mut out = serde_json::Map::with_capacity(sorted.len());
291            for (k, v) in sorted {
292                out.insert(k, v);
293            }
294            J::Object(out)
295        }
296        J::Array(arr) => J::Array(arr.into_iter().map(canonicalize_json_value).collect()),
297        other => other,
298    }
299}
300
301/// Render a [`Value`] for inclusion in a deterministic SQL dump.
302///
303/// Identical to [`ferrule_sql::render_value`] for every variant
304/// except `Value::Json`, which is re-serialised through
305/// [`canonicalize_json_value`] so object keys are sorted.
306fn render_value_deterministic(v: &Value, backend: Backend) -> String {
307    match v {
308        Value::Json(j) => {
309            let canon = canonicalize_json_value(j.clone());
310            ferrule_sql::quote_string(&canon.to_string())
311        }
312        _ => render_value(v, backend),
313    }
314}
315
316/// Build a backend-quoted `" ORDER BY a, b, c"` clause from a list of
317/// column names. Returns an empty string when `cols` is empty (no
318/// columns to sort by — caller is expected to handle that path).
319fn build_order_by(cols: &[String], backend: Backend) -> String {
320    if cols.is_empty() {
321        return String::new();
322    }
323    let quoted: Vec<String> = cols
324        .iter()
325        .map(|c| ferrule_sql::copy::quote_identifier(c, backend))
326        .collect();
327    format!(" ORDER BY {}", quoted.join(", "))
328}
329
330#[cfg(test)]
331mod tests {
332    use super::*;
333
334    #[test]
335    fn json_value_keys_sorted_in_deterministic() {
336        let v = serde_json::json!({"z":1, "a":2, "nested":{"y":1,"b":2}});
337        let c = canonicalize_json_value(v);
338        assert_eq!(c.to_string(), r#"{"a":2,"nested":{"b":2,"y":1},"z":1}"#);
339    }
340
341    #[cfg(feature = "sqlite")]
342    mod sqlite_dump_tests {
343        use super::*;
344        use ferrule_sql::ConnectOptions;
345        use ferrule_sql::DatabaseUrl;
346        use std::sync::atomic::{AtomicU64, Ordering};
347
348        static N: AtomicU64 = AtomicU64::new(0);
349
350        fn tmp_path(suffix: &str) -> std::path::PathBuf {
351            let pid = std::process::id();
352            let n = N.fetch_add(1, Ordering::SeqCst);
353            std::env::temp_dir().join(format!("ferrule-dump-test-{pid}-{n}-{suffix}.db"))
354        }
355
356        fn open_sqlite(path: &std::path::Path) -> Box<dyn ferrule_sql::Connection> {
357            let _ = std::fs::remove_file(path);
358            let url = DatabaseUrl::parse(&format!("sqlite://{}", path.display())).unwrap();
359            ferrule_sql::connect(&url, &ConnectOptions::default(), None).unwrap()
360        }
361
362        #[test]
363        fn dump_twice_byte_equal() {
364            let path = tmp_path("twice");
365            let mut conn = open_sqlite(&path);
366            conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
367                .unwrap();
368            // Insert out of PK order.
369            conn.execute("INSERT INTO users VALUES (2, 'Bob')").unwrap();
370            conn.execute("INSERT INTO users VALUES (1, 'Alice')")
371                .unwrap();
372            conn.execute("INSERT INTO users VALUES (3, 'Carol')")
373                .unwrap();
374
375            let opts = DumpOptions {
376                format: DumpFormat::Sql,
377                deterministic: true,
378                ..Default::default()
379            };
380            let out1 = dump_table(&mut conn, "users", Backend::Sqlite, &opts).unwrap();
381            let out2 = dump_table(&mut conn, "users", Backend::Sqlite, &opts).unwrap();
382            assert_eq!(out1, out2, "deterministic dump not byte-equal");
383            assert_eq!(
384                out1.matches("INSERT INTO").count(),
385                3,
386                "expected 3 INSERT lines, got:\n{out1}"
387            );
388            // Confirm the row order is sorted by PK (1, 2, 3).
389            let pos_alice = out1.find("Alice").unwrap();
390            let pos_bob = out1.find("Bob").unwrap();
391            let pos_carol = out1.find("Carol").unwrap();
392            assert!(pos_alice < pos_bob && pos_bob < pos_carol);
393
394            let _ = std::fs::remove_file(&path);
395        }
396
397        #[test]
398        fn dump_stable_across_insertion_order() {
399            let path_a = tmp_path("stable-a");
400            let path_b = tmp_path("stable-b");
401            let mut a = open_sqlite(&path_a);
402            let mut b = open_sqlite(&path_b);
403
404            a.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
405                .unwrap();
406            b.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
407                .unwrap();
408            // a: 1, 2, 3
409            a.execute("INSERT INTO users VALUES (1, 'Alice')").unwrap();
410            a.execute("INSERT INTO users VALUES (2, 'Bob')").unwrap();
411            a.execute("INSERT INTO users VALUES (3, 'Carol')").unwrap();
412            // b: 3, 1, 2
413            b.execute("INSERT INTO users VALUES (3, 'Carol')").unwrap();
414            b.execute("INSERT INTO users VALUES (1, 'Alice')").unwrap();
415            b.execute("INSERT INTO users VALUES (2, 'Bob')").unwrap();
416
417            let opts = DumpOptions {
418                format: DumpFormat::Sql,
419                deterministic: true,
420                ..Default::default()
421            };
422            let out_a = dump_table(&mut a, "users", Backend::Sqlite, &opts).unwrap();
423            let out_b = dump_table(&mut b, "users", Backend::Sqlite, &opts).unwrap();
424            assert_eq!(out_a, out_b);
425
426            let _ = std::fs::remove_file(&path_a);
427            let _ = std::fs::remove_file(&path_b);
428        }
429
430        #[test]
431        fn dump_no_pk_warns_and_sorts() {
432            let path = tmp_path("nopk");
433            let mut conn = open_sqlite(&path);
434            // SQLite "heap-ish" table — no INTEGER PRIMARY KEY. Note
435            // that a `WITHOUT ROWID` table would require an explicit
436            // PK, so we use a plain heap and confirm primary_key()
437            // returns empty.
438            conn.execute("CREATE TABLE heap (a INTEGER, b TEXT)")
439                .unwrap();
440            let pks = conn.primary_key(None, "heap").unwrap();
441            assert!(pks.is_empty(), "expected no PK for heap, got {pks:?}");
442
443            conn.execute("INSERT INTO heap VALUES (2, 'beta')").unwrap();
444            conn.execute("INSERT INTO heap VALUES (1, 'alpha')")
445                .unwrap();
446            conn.execute("INSERT INTO heap VALUES (3, 'gamma')")
447                .unwrap();
448
449            let opts = DumpOptions {
450                format: DumpFormat::Sql,
451                deterministic: true,
452                ..Default::default()
453            };
454            // Stderr capture is painful in cargo test; the docs/test
455            // contract is that the dumps are byte-equal even without
456            // a PK.
457            let out1 = dump_table(&mut conn, "heap", Backend::Sqlite, &opts).unwrap();
458            let out2 = dump_table(&mut conn, "heap", Backend::Sqlite, &opts).unwrap();
459            assert_eq!(out1, out2);
460            assert_eq!(out1.matches("INSERT INTO").count(), 3);
461
462            let _ = std::fs::remove_file(&path);
463        }
464
465        #[test]
466        fn dump_uses_backend_quoting() {
467            let path = tmp_path("quote");
468            let mut conn = open_sqlite(&path);
469            conn.execute(
470                "CREATE TABLE \"weird name\" (\"id\" INTEGER PRIMARY KEY, \"first name\" TEXT)",
471            )
472            .unwrap();
473            conn.execute("INSERT INTO \"weird name\" VALUES (1, 'Alice')")
474                .unwrap();
475
476            let opts = DumpOptions {
477                format: DumpFormat::Sql,
478                deterministic: true,
479                ..Default::default()
480            };
481            let out = dump_table(&mut conn, "weird name", Backend::Sqlite, &opts).unwrap();
482            // SQLite uses ANSI quotes — table and column names must
483            // each appear inside double quotes.
484            assert!(
485                out.contains("INSERT INTO \"weird name\""),
486                "expected ANSI-quoted table name, got:\n{out}"
487            );
488            assert!(
489                out.contains("\"first name\""),
490                "expected ANSI-quoted column name, got:\n{out}"
491            );
492
493            let _ = std::fs::remove_file(&path);
494        }
495
496        // #55-adjacent: the cursor rewrite must span multiple batches.
497        // 2500 rows > the 1000-row batch_size AND >
498        // DEFAULT_CURSOR_CAPACITY (1024), so every format walks the
499        // `next_batch` loop several times.
500        #[test]
501        fn dump_streams_across_multiple_batches() {
502            let path = tmp_path("multibatch");
503            let mut conn = open_sqlite(&path);
504            conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)")
505                .unwrap();
506            const N: usize = 2500;
507            // Bulk insert in one statement to keep the fixture fast.
508            let mut sql = String::from("INSERT INTO t (id, v) VALUES ");
509            for i in 0..N {
510                if i > 0 {
511                    sql.push_str(", ");
512                }
513                sql.push_str(&format!("({}, 'row-{}')", i + 1, i + 1));
514            }
515            conn.execute(&sql).unwrap();
516
517            // CSV: N data rows + 1 header line.
518            let csv_opts = DumpOptions {
519                format: DumpFormat::Csv,
520                ..Default::default()
521            };
522            let csv = dump_table(&mut conn, "t", Backend::Sqlite, &csv_opts).unwrap();
523            let csv_lines = csv.lines().count();
524            assert_eq!(csv_lines, N + 1, "CSV should have N data + 1 header line");
525
526            // JSON: parses to an array of exactly N objects.
527            let json_opts = DumpOptions {
528                format: DumpFormat::Json,
529                ..Default::default()
530            };
531            let json = dump_table(&mut conn, "t", Backend::Sqlite, &json_opts).unwrap();
532            let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
533            assert_eq!(
534                parsed.as_array().map(|a| a.len()),
535                Some(N),
536                "JSON should hold N objects"
537            );
538
539            // SQL (deterministic = one INSERT per row): N INSERT lines.
540            let sql_opts = DumpOptions {
541                format: DumpFormat::Sql,
542                deterministic: true,
543                ..Default::default()
544            };
545            let dump = dump_table(&mut conn, "t", Backend::Sqlite, &sql_opts).unwrap();
546            assert_eq!(
547                dump.matches("INSERT INTO").count(),
548                N,
549                "deterministic SQL should have N INSERT lines"
550            );
551            // First and last rows are present and ordered by PK.
552            let first = dump.find("row-1\'").unwrap();
553            let last = dump.find(&format!("row-{N}\'")).unwrap();
554            assert!(first < last, "rows should be PK-ordered across batches");
555
556            let _ = std::fs::remove_file(&path);
557        }
558
559        #[test]
560        fn dump_deterministic_query_requires_order_by() {
561            let path = tmp_path("query-orderby");
562            let mut conn = open_sqlite(&path);
563            conn.execute("CREATE TABLE t (x INTEGER)").unwrap();
564
565            let opts = DumpOptions {
566                format: DumpFormat::Sql,
567                deterministic: true,
568                ..Default::default()
569            };
570
571            // Missing ORDER BY → error referencing the clause.
572            let err = dump_query(
573                &mut conn,
574                "SELECT 1 AS x",
575                Backend::Sqlite,
576                &opts,
577                Some("dummy"),
578            )
579            .unwrap_err();
580            assert!(
581                err.to_string().to_lowercase().contains("order by"),
582                "error should mention ORDER BY, got: {err}"
583            );
584
585            // Happy path — ORDER BY present.
586            dump_query(
587                &mut conn,
588                "SELECT 1 AS x ORDER BY 1",
589                Backend::Sqlite,
590                &opts,
591                Some("dummy"),
592            )
593            .expect("dump_query with ORDER BY should succeed");
594
595            let _ = std::fs::remove_file(&path);
596        }
597    }
598}