Skip to main content

fsqlite_core/
compat_persist.rs

1//! Compat persistence: read/write real SQLite-format database files.
2//!
3//! Bridges the in-memory `MemDatabase` to on-disk SQLite files via the
4//! pager + B-tree stack. The VDBE continues to execute against `MemDatabase`;
5//! this module serializes/deserializes that state to proper binary format.
6//!
7//! On **persist**, all tables and their rows are written to a real SQLite
8//! database file (with a valid header, sqlite_master, and B-tree pages).
9//!
10//! On **load**, a real `.db` file is read via B-tree cursors and its
11//! contents are replayed into a fresh `MemDatabase` + schema vector.
12
13#[cfg(not(target_arch = "wasm32"))]
14use std::path::Path;
15
16use fsqlite_ast::{
17    ColumnConstraintKind, CreateTableBody, DefaultValue, Expr, GeneratedStorage, IndexedColumn,
18    SortDirection, Statement, TableConstraintKind,
19};
20#[cfg(not(target_arch = "wasm32"))]
21use fsqlite_btree::BtreeCursorOps;
22#[cfg(not(target_arch = "wasm32"))]
23use fsqlite_btree::cursor::TransactionPageIo;
24use fsqlite_error::{FrankenError, Result};
25#[cfg(not(target_arch = "wasm32"))]
26use fsqlite_pager::{MvccPager, SimplePager, TransactionHandle, TransactionMode};
27use fsqlite_parser::Parser;
28use fsqlite_types::StrictColumnType;
29#[cfg(not(target_arch = "wasm32"))]
30use fsqlite_types::cx::Cx;
31#[cfg(not(target_arch = "wasm32"))]
32use fsqlite_types::record::{
33    RecordProfileScope, enter_record_profile_scope, parse_record, serialize_record,
34};
35use fsqlite_types::value::SqliteValue;
36#[cfg(not(target_arch = "wasm32"))]
37use fsqlite_types::{PageNumber, PageSize};
38use fsqlite_vdbe::codegen::{ColumnInfo, FkActionType, FkDef, IndexSchema, TableSchema};
39use fsqlite_vdbe::engine::MemDatabase;
40#[cfg(all(not(target_arch = "wasm32"), unix))]
41use fsqlite_vfs::UnixVfs as PlatformVfs;
42#[cfg(all(not(target_arch = "wasm32"), target_os = "windows"))]
43use fsqlite_vfs::WindowsVfs as PlatformVfs;
44#[cfg(not(target_arch = "wasm32"))]
45use fsqlite_vfs::host_fs;
46
47/// SQLite file header magic bytes (first 16 bytes).
48#[cfg(not(target_arch = "wasm32"))]
49const SQLITE_MAGIC: &[u8; 16] = b"SQLite format 3\0";
50
51/// Default page size used for newly-created databases.
52#[cfg(not(target_arch = "wasm32"))]
53const DEFAULT_PAGE_SIZE: PageSize = PageSize::DEFAULT;
54
55// ── Public API ──────────────────────────────────────────────────────────
56
57/// State loaded from a real SQLite file.
58#[derive(Debug)]
59pub struct LoadedState {
60    /// Reconstructed table schemas.
61    pub schema: Vec<TableSchema>,
62    /// In-memory database populated with all rows.
63    pub db: MemDatabase,
64    /// Number of sqlite_master entries loaded (the next available rowid
65    /// for sqlite_master is `master_row_count + 1`).
66    pub master_row_count: i64,
67    /// Schema cookie read from the database header (offset 40).
68    pub schema_cookie: u32,
69    /// File change counter read from the database header (offset 24).
70    pub change_counter: u32,
71}
72
73/// Detect whether a file starts with the SQLite magic header.
74///
75/// Returns `false` for non-existent, empty, or non-SQLite files.
76#[cfg(not(target_arch = "wasm32"))]
77pub fn is_sqlite_format(path: &Path) -> bool {
78    let Ok(data) = host_fs::read(path) else {
79        return false;
80    };
81    data.len() >= SQLITE_MAGIC.len() && data[..SQLITE_MAGIC.len()] == *SQLITE_MAGIC
82}
83
84/// Persist `schema` + `db` to a real SQLite-format database file at `path`.
85///
86/// Overwrites any existing file. The resulting file is readable by `sqlite3`.
87/// The caller supplies the capability context so pager and B-tree work stay
88/// attached to the active runtime lineage.
89///
90/// # Errors
91///
92/// Returns an error on I/O failure or if the B-tree layer rejects an
93/// insertion (e.g. duplicate rowid in sqlite_master).
94#[allow(clippy::too_many_lines)]
95#[cfg(not(target_arch = "wasm32"))]
96pub fn persist_to_sqlite(
97    cx: &Cx,
98    path: &Path,
99    schema: &[TableSchema],
100    db: &MemDatabase,
101    schema_cookie: u32,
102    change_counter: u32,
103) -> Result<()> {
104    // Remove existing file so the pager creates a fresh one.
105    if path.exists() {
106        // Truncate to empty so the pager treats it as a fresh DB, without
107        // requiring delete permissions on the parent directory.
108        host_fs::create_empty_file(path)?;
109    }
110
111    let vfs = PlatformVfs::new();
112    let pager = SimplePager::open_with_cx(cx, vfs, path, DEFAULT_PAGE_SIZE)?;
113    let mut txn = pager.begin(cx, TransactionMode::Immediate)?;
114
115    let ps = DEFAULT_PAGE_SIZE.as_usize();
116    let usable_size =
117        u32::try_from(ps).map_err(|_| FrankenError::internal("page size exceeds u32"))?;
118
119    // Track (name, root_page, create_sql) for sqlite_master entries.
120    let mut master_entries: Vec<(String, u32, String)> = Vec::new();
121
122    // Write each table's data into its own B-tree.
123    for table in schema {
124        let Some(mem_table) = db.get_table(table.root_page) else {
125            continue;
126        };
127
128        // Allocate a fresh root page for this table in the on-disk file.
129        let root_page = txn.allocate_page(cx)?;
130
131        // Initialize the root page as an empty leaf table B-tree.
132        init_leaf_table_page(cx, &mut txn, root_page, ps)?;
133
134        // Insert all rows.
135        {
136            let mut cursor = fsqlite_btree::BtCursor::new(
137                TransactionPageIo::new(&mut txn),
138                root_page,
139                usable_size,
140                true,
141            );
142            for (rowid, values) in mem_table.iter_rows() {
143                let payload = serialize_record(values);
144                cursor.table_insert(cx, rowid, &payload)?;
145            }
146        }
147
148        // Build CREATE TABLE SQL for sqlite_master.
149        let create_sql = build_create_table_sql(table);
150        master_entries.push((table.name.clone(), root_page.get(), create_sql));
151    }
152
153    // Write sqlite_master entries into page 1's B-tree.
154    // sqlite_master columns: type TEXT, name TEXT, tbl_name TEXT, rootpage INTEGER, sql TEXT
155    {
156        let master_root = PageNumber::ONE;
157        let mut cursor = fsqlite_btree::BtCursor::new(
158            TransactionPageIo::new(&mut txn),
159            master_root,
160            usable_size,
161            true,
162        );
163
164        for (rowid, (name, root_page_num, create_sql)) in master_entries.iter().enumerate() {
165            let record = serialize_record(&[
166                SqliteValue::Text("table".into()),
167                SqliteValue::Text(name.clone().into()),
168                SqliteValue::Text(name.clone().into()),
169                SqliteValue::Integer(i64::from(*root_page_num)),
170                SqliteValue::Text(create_sql.clone().into()),
171            ]);
172            #[allow(clippy::cast_possible_wrap)]
173            let rid = (rowid as i64) + 1;
174            cursor.table_insert(cx, rid, &record)?;
175        }
176    }
177
178    // Fix up the database header on page 1: update page_count,
179    // change_counter, and schema_cookie so sqlite3 validates the file.
180    {
181        let mut hdr_page = txn.get_page(cx, PageNumber::ONE)?.into_vec();
182
183        // Compute actual page count by finding the highest page number that was allocated.
184        // In a fresh database with no freelist, allocating one more page gives us (max_page + 1).
185        let next_page = txn.allocate_page(cx)?.get();
186        let max_page = next_page.saturating_sub(1).max(1);
187
188        // page_count at offset 28 (4 bytes, big-endian)
189        hdr_page[28..32].copy_from_slice(&max_page.to_be_bytes());
190
191        // change_counter at offset 24 — tracked by Connection, must be
192        // non-zero for sqlite3 to trust the header.  Use at least 1.
193        let effective_counter = change_counter.max(1);
194        hdr_page[24..28].copy_from_slice(&effective_counter.to_be_bytes());
195
196        // schema_cookie at offset 40 — tracked by Connection, incremented
197        // on every DDL operation.  Non-zero so sqlite3 re-reads schema.
198        let effective_cookie = schema_cookie.max(1);
199        hdr_page[40..44].copy_from_slice(&effective_cookie.to_be_bytes());
200
201        // version-valid-for at offset 92 (must match change_counter)
202        hdr_page[92..96].copy_from_slice(&effective_counter.to_be_bytes());
203
204        txn.write_page(cx, PageNumber::ONE, &hdr_page)?;
205    }
206
207    txn.commit(cx)?;
208    Ok(())
209}
210
211/// Load a real SQLite-format database file into `MemDatabase` + schema.
212///
213/// Reads sqlite_master from page 1, then reads each table's B-tree to
214/// populate the in-memory store.
215/// The caller supplies the capability context so pager reads inherit the
216/// active trace and budget lineage.
217///
218/// # Errors
219///
220/// Returns an error if the file is not a valid SQLite database, or on
221/// I/O / B-tree navigation failures.
222#[allow(clippy::too_many_lines, clippy::similar_names)]
223#[cfg(not(target_arch = "wasm32"))]
224pub fn load_from_sqlite(cx: &Cx, path: &Path) -> Result<LoadedState> {
225    let _record_profile_scope = enter_record_profile_scope(RecordProfileScope::CoreCompatPersist);
226    let vfs = PlatformVfs::new();
227    let pager = SimplePager::open_with_cx(cx, vfs, path, DEFAULT_PAGE_SIZE)?;
228    let mut txn = pager.begin(cx, TransactionMode::ReadOnly)?;
229
230    let ps = pager.page_size().as_usize();
231    let usable_size =
232        u32::try_from(ps).map_err(|_| FrankenError::internal("page size exceeds u32"))?;
233
234    // Read sqlite_master entries from page 1.
235    let master_entries = {
236        let mut entries = Vec::new();
237        let master_root = PageNumber::ONE;
238        let mut cursor = fsqlite_btree::BtCursor::new(
239            TransactionPageIo::new(&mut txn),
240            master_root,
241            usable_size,
242            true,
243        );
244
245        if cursor.first(cx)? {
246            loop {
247                let rowid = cursor.rowid(cx)?;
248                let payload = cursor.payload(cx)?;
249                let values =
250                    parse_record(&payload).ok_or_else(|| FrankenError::DatabaseCorrupt {
251                        detail: format!(
252                            "sqlite_master row {rowid} payload is not a valid SQLite record"
253                        ),
254                    })?;
255                entries.push(values);
256                if !cursor.next(cx)? {
257                    break;
258                }
259            }
260        }
261        entries
262    };
263
264    // Parse each sqlite_master row.
265    // Columns: type(0), name(1), tbl_name(2), rootpage(3), sql(4)
266    let mut schema = Vec::new();
267    let mut db = MemDatabase::new();
268
269    for entry in &master_entries {
270        if entry.len() < 5 {
271            continue;
272        }
273        let entry_type = match &entry[0] {
274            SqliteValue::Text(s) => s,
275            _ => continue,
276        };
277        if &**entry_type != "table" {
278            continue; // Skip indexes, views, triggers for now.
279        }
280
281        let name = match &entry[1] {
282            SqliteValue::Text(s) => s.clone(),
283            _ => continue,
284        };
285        let root_page_num = match &entry[3] {
286            SqliteValue::Integer(n) => *n,
287            _ => continue,
288        };
289        let create_sql = match &entry[4] {
290            SqliteValue::Text(s) => s.clone(),
291            _ => continue,
292        };
293
294        // Detect virtual tables by either of two patterns:
295        //   1. rootpage=0 — the stock SQLite convention (FTS5, rtree, etc.)
296        //   2. CREATE SQL starts with CREATE VIRTUAL TABLE — FrankenSQLite-native
297        //      virtual tables that are assigned a positive rootpage number.
298        // Using AND here was the bug (PR #33): a positive-rootpage virtual table
299        // would fail condition 1, fall through, and be reloaded as an ordinary
300        // B-tree table on reopen, silently breaking FTS5 and other vtab modules.
301        if root_page_num == 0 || is_virtual_table_sql(&create_sql) {
302            continue;
303        }
304        let root_page_u32 = validate_sqlite_master_root_page(&name, root_page_num)?;
305
306        // Parse the CREATE TABLE to extract column info and schema decorations.
307        let columns = parse_columns_from_sqlite_master_sql(&create_sql);
308        let indexes = extract_unique_constraint_indexes_from_sql(&create_sql, &name);
309        let primary_key_constraints = extract_primary_key_constraints_from_sql(&create_sql);
310        let foreign_keys = extract_foreign_keys_from_sql(&create_sql, &columns);
311        let check_constraints = extract_check_constraints_from_sql(&create_sql);
312        let num_columns = columns.len();
313        let without_rowid = is_without_rowid_table_sql(&create_sql);
314        let ipk_col_idx = columns.iter().position(|c| c.is_ipk);
315
316        // Use the REAL root page from sqlite_master (5A.4: bd-1soh).
317        let real_root_page =
318            i32::try_from(root_page_u32).expect("validated root page must fit MemDatabase");
319        db.create_table_at(real_root_page, num_columns);
320
321        let table_name_for_err = name.to_string();
322        schema.push(TableSchema {
323            name: name.to_string(),
324            root_page: real_root_page,
325            columns,
326            indexes: indexes.clone(),
327            strict: is_strict_table_sql(&create_sql),
328            without_rowid,
329            primary_key_constraints,
330            foreign_keys,
331            check_constraints,
332        });
333
334        // Read all rows from this table's B-tree.
335        let file_root =
336            PageNumber::new(root_page_u32).expect("validated sqlite_master root page is positive");
337
338        let mut cursor = fsqlite_btree::BtCursor::new(
339            TransactionPageIo::new(&mut txn),
340            file_root,
341            usable_size,
342            true,
343        );
344
345        if let Some(mem_table) = db.tables.get_mut(&real_root_page) {
346            let mut unique_groups = Vec::<Vec<usize>>::new();
347            for (column_index, column) in schema
348                .last()
349                .expect("current table schema must exist")
350                .columns
351                .iter()
352                .enumerate()
353            {
354                if column.unique && !column.is_ipk {
355                    unique_groups.push(vec![column_index]);
356                }
357            }
358            for index in &indexes {
359                if !index.is_unique || index.columns.is_empty() {
360                    continue;
361                }
362                let group = index
363                    .columns
364                    .iter()
365                    .filter_map(|column_name| {
366                        schema
367                            .last()
368                            .expect("current table schema must exist")
369                            .columns
370                            .iter()
371                            .position(|column| column.name.eq_ignore_ascii_case(column_name))
372                    })
373                    .collect::<Vec<_>>();
374                if group.is_empty()
375                    || group.iter().all(|&column_index| {
376                        schema
377                            .last()
378                            .expect("current table schema must exist")
379                            .columns[column_index]
380                            .is_ipk
381                    })
382                    || unique_groups.iter().any(|existing| existing == &group)
383                {
384                    continue;
385                }
386                unique_groups.push(group);
387            }
388            for group in unique_groups {
389                mem_table.add_unique_column_group(group);
390            }
391            if cursor.first(cx)? {
392                if without_rowid {
393                    return Err(FrankenError::NotImplemented(format!(
394                        "loading populated WITHOUT ROWID table `{table_name_for_err}` is not yet supported"
395                    )));
396                }
397                loop {
398                    let rowid = cursor.rowid(cx)?;
399                    let payload = cursor.payload(cx)?;
400                    let mut values = parse_record(&payload).ok_or_else(|| {
401                        FrankenError::DatabaseCorrupt {
402                            detail: format!(
403                                "table `{table_name_for_err}` rowid {rowid} payload is not a valid SQLite record"
404                            ),
405                        }
406                    })?;
407                    if !without_rowid && let Some(ipk_idx) = ipk_col_idx {
408                        hydrate_rowid_alias_value(
409                            &mut values,
410                            ipk_idx,
411                            rowid,
412                            num_columns,
413                            &table_name_for_err,
414                        )?;
415                    }
416                    mem_table.insert_row(rowid, values);
417                    if !cursor.next(cx)? {
418                        break;
419                    }
420                }
421            }
422        }
423    }
424
425    // Read schema_cookie and change_counter from the database header (page 1).
426    let (schema_cookie, change_counter) = {
427        let header_buf = txn.get_page(cx, PageNumber::ONE)?;
428        let hdr = header_buf.as_ref();
429        let cookie = if hdr.len() >= 44 {
430            u32::from_be_bytes([hdr[40], hdr[41], hdr[42], hdr[43]])
431        } else {
432            0
433        };
434        let counter = if hdr.len() >= 28 {
435            u32::from_be_bytes([hdr[24], hdr[25], hdr[26], hdr[27]])
436        } else {
437            0
438        };
439        (cookie, counter)
440    };
441
442    #[allow(clippy::cast_possible_wrap)]
443    let master_row_count = master_entries.len() as i64;
444    Ok(LoadedState {
445        schema,
446        db,
447        master_row_count,
448        schema_cookie,
449        change_counter,
450    })
451}
452
453// ── Helpers ─────────────────────────────────────────────────────────────
454
455/// Initialize a page as an empty leaf table B-tree page (type 0x0D).
456#[cfg(not(target_arch = "wasm32"))]
457fn init_leaf_table_page(
458    cx: &Cx,
459    txn: &mut impl TransactionHandle,
460    page_no: PageNumber,
461    page_size: usize,
462) -> Result<()> {
463    let mut page = vec![0u8; page_size];
464    page[0] = 0x0D; // Leaf table
465    // cell_count = 0 (bytes 3..5)
466    page[3..5].copy_from_slice(&0u16.to_be_bytes());
467    // cell content area starts at end of page
468    #[allow(clippy::cast_possible_truncation)]
469    let content_start = page_size as u16;
470    page[5..7].copy_from_slice(&content_start.to_be_bytes());
471    txn.write_page(cx, page_no, &page)
472}
473
474fn quote_identifier(identifier: &str) -> String {
475    let escaped = identifier.replace('"', "\"\"");
476    format!("\"{escaped}\"")
477}
478
479/// Reconstruct a `CREATE TABLE` statement from a `TableSchema`.
480pub(crate) fn build_create_table_sql(table: &TableSchema) -> String {
481    use std::fmt::Write as _;
482    let mut sql = format!("CREATE TABLE {} (", quote_identifier(&table.name));
483    let is_single_column_primary_key = |column_name: &str| {
484        table
485            .primary_key_constraints
486            .iter()
487            .any(|pk| pk.len() == 1 && pk[0].eq_ignore_ascii_case(column_name))
488    };
489    let primary_key_matches_index = |index: &fsqlite_vdbe::codegen::IndexSchema| {
490        table.primary_key_constraints.iter().any(|pk| {
491            pk.len() == index.columns.len()
492                && pk
493                    .iter()
494                    .zip(index.columns.iter())
495                    .all(|(lhs, rhs): (&String, &String)| lhs.eq_ignore_ascii_case(rhs))
496        })
497    };
498    for (i, col) in table.columns.iter().enumerate() {
499        if i > 0 {
500            sql.push_str(", ");
501        }
502        sql.push_str(&quote_identifier(&col.name));
503        if let Some(type_kw) = col.type_name.as_deref() {
504            let _ = write!(sql, " {type_kw}");
505        }
506        if col.is_ipk {
507            sql.push_str(" PRIMARY KEY");
508        }
509        if col.notnull && !col.is_ipk {
510            sql.push_str(" NOT NULL");
511        }
512        if col.unique && !col.is_ipk && !is_single_column_primary_key(&col.name) {
513            sql.push_str(" UNIQUE");
514        }
515        if let Some(ref default) = col.default_value {
516            sql.push_str(" DEFAULT ");
517            sql.push_str(default);
518        }
519        if let Some(ref collation) = col.collation {
520            sql.push_str(" COLLATE ");
521            sql.push_str(&quote_identifier(collation));
522        }
523        if let Some(ref gen_expr) = col.generated_expr {
524            sql.push_str(" GENERATED ALWAYS AS (");
525            sql.push_str(gen_expr);
526            sql.push(')');
527            if col.generated_stored == Some(true) {
528                sql.push_str(" STORED");
529            } else {
530                sql.push_str(" VIRTUAL");
531            }
532        }
533    }
534    for index in &table.indexes {
535        if !index.is_unique || index.columns.is_empty() || primary_key_matches_index(index) {
536            continue;
537        }
538        if index.columns.len() == 1
539            && table.columns.iter().any(|column| {
540                column.unique
541                    && !column.is_ipk
542                    && column.name.eq_ignore_ascii_case(&index.columns[0])
543            })
544        {
545            continue;
546        }
547        let cols = index
548            .columns
549            .iter()
550            .map(|name| quote_identifier(name))
551            .collect::<Vec<_>>()
552            .join(", ");
553        let _ = write!(sql, ", UNIQUE ({cols})");
554    }
555    for pk in &table.primary_key_constraints {
556        if pk.len() == 1
557            && table
558                .columns
559                .iter()
560                .any(|column| column.is_ipk && column.name.eq_ignore_ascii_case(&pk[0]))
561        {
562            continue;
563        }
564        let cols = pk
565            .iter()
566            .map(|name| quote_identifier(name))
567            .collect::<Vec<_>>()
568            .join(", ");
569        let _ = write!(sql, ", PRIMARY KEY ({cols})");
570    }
571    for fk in &table.foreign_keys {
572        let child_columns = fk
573            .child_columns
574            .iter()
575            .filter_map(|&column_index| table.columns.get(column_index))
576            .map(|column| quote_identifier(&column.name))
577            .collect::<Vec<_>>();
578        if child_columns.is_empty() {
579            continue;
580        }
581        let _ = write!(
582            sql,
583            ", FOREIGN KEY({}) REFERENCES {}",
584            child_columns.join(", "),
585            quote_identifier(&fk.parent_table)
586        );
587        if !fk.parent_columns.is_empty() {
588            let parent_columns = fk
589                .parent_columns
590                .iter()
591                .map(|column_name| quote_identifier(column_name))
592                .collect::<Vec<_>>()
593                .join(", ");
594            let _ = write!(sql, "({parent_columns})");
595        }
596        if fk.on_delete != FkActionType::NoAction {
597            let _ = write!(sql, " ON DELETE {}", fk_action_sql(fk.on_delete));
598        }
599        if fk.on_update != FkActionType::NoAction {
600            let _ = write!(sql, " ON UPDATE {}", fk_action_sql(fk.on_update));
601        }
602    }
603    for check_expr in &table.check_constraints {
604        let _ = write!(sql, ", CHECK({check_expr})");
605    }
606    sql.push(')');
607    let mut table_options = Vec::new();
608    if table.without_rowid {
609        table_options.push("WITHOUT ROWID");
610    }
611    if table.strict {
612        table_options.push("STRICT");
613    }
614    if !table_options.is_empty() {
615        sql.push(' ');
616        sql.push_str(&table_options.join(", "));
617    }
618    sql
619}
620
621const fn fk_action_sql(action: FkActionType) -> &'static str {
622    match action {
623        FkActionType::NoAction => "NO ACTION",
624        FkActionType::Restrict => "RESTRICT",
625        FkActionType::SetNull => "SET NULL",
626        FkActionType::SetDefault => "SET DEFAULT",
627        FkActionType::Cascade => "CASCADE",
628    }
629}
630
631pub(crate) fn extract_primary_key_constraints_from_sql(sql: &str) -> Vec<Vec<String>> {
632    let Some(Statement::CreateTable(create)) = parse_single_statement(sql) else {
633        return Vec::new();
634    };
635    let CreateTableBody::Columns {
636        columns,
637        constraints,
638    } = &create.body
639    else {
640        return Vec::new();
641    };
642
643    let mut primary_keys = columns
644        .iter()
645        .filter(|column| {
646            column.constraints.iter().any(|constraint| {
647                matches!(constraint.kind, ColumnConstraintKind::PrimaryKey { .. })
648            })
649        })
650        .map(|column| vec![column.name.clone()])
651        .collect::<Vec<_>>();
652
653    primary_keys.extend(constraints.iter().filter_map(|constraint| {
654        let TableConstraintKind::PrimaryKey {
655            columns: indexed_columns,
656            ..
657        } = &constraint.kind
658        else {
659            return None;
660        };
661        let columns = indexed_columns
662            .iter()
663            .filter_map(indexed_column_name)
664            .map(str::to_owned)
665            .collect::<Vec<_>>();
666        (!columns.is_empty()).then_some(columns)
667    }));
668
669    primary_keys
670}
671
672fn extract_unique_constraint_indexes_from_sql(sql: &str, table_name: &str) -> Vec<IndexSchema> {
673    let Some(Statement::CreateTable(create)) = parse_single_statement(sql) else {
674        return Vec::new();
675    };
676    let CreateTableBody::Columns {
677        columns,
678        constraints,
679    } = &create.body
680    else {
681        return Vec::new();
682    };
683
684    let mut indexes = Vec::new();
685    let mut autoindex_ordinal = 1_usize;
686
687    for column in columns {
688        let has_unique_constraint = column.constraints.iter().any(|constraint| {
689            matches!(
690                constraint.kind,
691                ColumnConstraintKind::Unique { .. } | ColumnConstraintKind::PrimaryKey { .. }
692            )
693        });
694        let is_ipk = column.type_name.as_ref().is_some_and(|type_name| {
695            type_name.name.eq_ignore_ascii_case("INTEGER")
696                && column.constraints.iter().any(|constraint| {
697                    matches!(
698                        constraint.kind,
699                        ColumnConstraintKind::PrimaryKey {
700                            direction: None | Some(SortDirection::Asc),
701                            ..
702                        }
703                    )
704                })
705        });
706        if has_unique_constraint && !is_ipk {
707            indexes.push(IndexSchema {
708                name: format!("sqlite_autoindex_{table_name}_{autoindex_ordinal}"),
709                root_page: 0,
710                columns: vec![column.name.clone()],
711                key_expressions: Vec::new(),
712                key_sort_directions: vec![SortDirection::Asc],
713                where_clause: None,
714                is_unique: true,
715            });
716            autoindex_ordinal += 1;
717        }
718    }
719
720    for constraint in constraints {
721        let (indexed_columns, is_primary_key) = match &constraint.kind {
722            TableConstraintKind::Unique {
723                columns: indexed_columns,
724                ..
725            } => (indexed_columns, false),
726            TableConstraintKind::PrimaryKey {
727                columns: indexed_columns,
728                ..
729            } => (indexed_columns, true),
730            _ => continue,
731        };
732        if is_primary_key
733            && table_primary_key_is_rowid_alias(columns, indexed_columns, create.without_rowid)
734        {
735            continue;
736        }
737        let columns = indexed_columns
738            .iter()
739            .filter_map(indexed_column_name)
740            .map(str::to_owned)
741            .collect::<Vec<_>>();
742        if columns.is_empty() {
743            continue;
744        }
745        indexes.push(IndexSchema {
746            name: format!("sqlite_autoindex_{table_name}_{autoindex_ordinal}"),
747            root_page: 0,
748            columns,
749            key_expressions: Vec::new(),
750            key_sort_directions: indexed_columns
751                .iter()
752                .map(|indexed| indexed.direction.unwrap_or(SortDirection::Asc))
753                .collect(),
754            where_clause: None,
755            is_unique: true,
756        });
757        autoindex_ordinal += 1;
758    }
759
760    indexes
761}
762
763fn extract_foreign_keys_from_sql(sql: &str, columns: &[ColumnInfo]) -> Vec<FkDef> {
764    let Some(Statement::CreateTable(create)) = parse_single_statement(sql) else {
765        return Vec::new();
766    };
767    let CreateTableBody::Columns {
768        columns: column_defs,
769        constraints,
770    } = &create.body
771    else {
772        return Vec::new();
773    };
774
775    let mut foreign_keys = Vec::new();
776    for (column_index, column) in column_defs.iter().enumerate() {
777        for constraint in &column.constraints {
778            if let ColumnConstraintKind::ForeignKey(clause) = &constraint.kind {
779                foreign_keys.push(fk_clause_to_def(&[column_index], clause));
780            }
781        }
782    }
783    for constraint in constraints {
784        if let TableConstraintKind::ForeignKey {
785            columns: child_columns,
786            clause,
787        } = &constraint.kind
788        {
789            let child_indices = child_columns
790                .iter()
791                .filter_map(|column_name| {
792                    columns
793                        .iter()
794                        .position(|column| column.name.eq_ignore_ascii_case(column_name))
795                })
796                .collect::<Vec<_>>();
797            if !child_indices.is_empty() {
798                foreign_keys.push(fk_clause_to_def(&child_indices, clause));
799            }
800        }
801    }
802
803    foreign_keys
804}
805
806fn fk_clause_to_def(child_indices: &[usize], clause: &fsqlite_ast::ForeignKeyClause) -> FkDef {
807    let mut on_delete = FkActionType::NoAction;
808    let mut on_update = FkActionType::NoAction;
809    for action in &clause.actions {
810        let action_type = match action.action {
811            fsqlite_ast::ForeignKeyActionType::SetNull => FkActionType::SetNull,
812            fsqlite_ast::ForeignKeyActionType::SetDefault => FkActionType::SetDefault,
813            fsqlite_ast::ForeignKeyActionType::Cascade => FkActionType::Cascade,
814            fsqlite_ast::ForeignKeyActionType::Restrict => FkActionType::Restrict,
815            fsqlite_ast::ForeignKeyActionType::NoAction => FkActionType::NoAction,
816        };
817        match action.trigger {
818            fsqlite_ast::ForeignKeyTrigger::OnDelete => on_delete = action_type,
819            fsqlite_ast::ForeignKeyTrigger::OnUpdate => on_update = action_type,
820        }
821    }
822    FkDef {
823        child_columns: child_indices.to_vec(),
824        parent_table: clause.table.clone(),
825        parent_columns: clause.columns.clone(),
826        on_delete,
827        on_update,
828    }
829}
830
831/// Indexed term metadata used to reconstruct `CREATE INDEX` SQL.
832#[derive(Debug, Clone, Copy)]
833#[allow(dead_code)]
834pub(crate) struct CreateIndexSqlTerm<'a> {
835    pub(crate) column_name: &'a str,
836    pub(crate) collation: Option<&'a str>,
837    pub(crate) direction: Option<SortDirection>,
838}
839
840/// Reconstruct a `CREATE INDEX` statement from index metadata.
841#[allow(dead_code)]
842pub(crate) fn build_create_index_sql(
843    index_name: &str,
844    table_name: &str,
845    unique: bool,
846    terms: &[CreateIndexSqlTerm<'_>],
847    where_clause: Option<&fsqlite_ast::Expr>,
848) -> String {
849    use std::fmt::Write as _;
850    let mut sql = if unique {
851        format!(
852            "CREATE UNIQUE INDEX {} ON {} (",
853            quote_identifier(index_name),
854            quote_identifier(table_name)
855        )
856    } else {
857        format!(
858            "CREATE INDEX {} ON {} (",
859            quote_identifier(index_name),
860            quote_identifier(table_name)
861        )
862    };
863    for (i, term) in terms.iter().enumerate() {
864        if i > 0 {
865            sql.push_str(", ");
866        }
867        sql.push_str(&quote_identifier(term.column_name));
868        if let Some(collation) = term.collation {
869            let _ = write!(sql, " COLLATE {}", quote_identifier(collation));
870        }
871        match term.direction {
872            Some(SortDirection::Asc) => sql.push_str(" ASC"),
873            Some(SortDirection::Desc) => sql.push_str(" DESC"),
874            None => {}
875        }
876    }
877    sql.push(')');
878    if let Some(expr) = where_clause {
879        let _ = write!(sql, " WHERE {expr}");
880    }
881    sql
882}
883
884/// Parse column info from a CREATE TABLE SQL string.
885///
886/// This is a best-effort parser that handles the common case of
887/// `CREATE TABLE "name" ("col1" TYPE, "col2" TYPE, ...)`.
888/// Extracts column names and affinities from the column definitions.
889/// Used by `load_from_sqlite` and `reload_memdb_from_pager` (bd-1ene).
890pub fn parse_columns_from_create_sql(sql: &str) -> Vec<ColumnInfo> {
891    if let Some(columns) = try_parse_columns_from_create_sql_ast(sql) {
892        return columns;
893    }
894
895    let is_strict = is_strict_table_sql(sql);
896    let is_without_rowid = is_without_rowid_table_sql(sql);
897    // Find the parenthesized column list.
898    let Some(open) = sql.find('(') else {
899        return Vec::new();
900    };
901    let Some(close) = sql.rfind(')') else {
902        return Vec::new();
903    };
904    if open >= close {
905        return Vec::new();
906    }
907
908    let body = &sql[open + 1..close];
909    split_top_level_csv_items(body)
910        .into_iter()
911        .filter_map(|col_def| {
912            if starts_with_unquoted_table_constraint(&col_def) {
913                return None;
914            }
915
916            let (name, remainder) = parse_column_name_and_remainder(&col_def)?;
917            let tokens: Vec<&str> = remainder.split_whitespace().collect();
918            let type_decl = extract_type_declaration(&tokens);
919            let affinity = type_to_affinity(&type_decl);
920            let upper = col_def.to_ascii_uppercase();
921            let is_ipk = !is_without_rowid
922                && upper.contains("PRIMARY KEY")
923                && !upper.contains("PRIMARY KEY DESC")
924                && type_decl.eq_ignore_ascii_case("INTEGER");
925            let type_name = if type_decl.is_empty() {
926                None
927            } else {
928                Some(type_decl)
929            };
930            let strict_type = if is_strict {
931                type_name
932                    .as_deref()
933                    .and_then(StrictColumnType::from_type_name)
934            } else {
935                None
936            };
937
938            let default_value = extract_default_value(remainder);
939
940            // Extract COLLATE name from column definition.
941            let collation = upper
942                .find("COLLATE ")
943                .map(|pos| {
944                    // Read the collation name from the original (non-uppercased) text.
945                    let after = &col_def[pos + 8..];
946                    after
947                        .split_whitespace()
948                        .next()
949                        .unwrap_or("")
950                        .trim_end_matches(',')
951                        .to_owned()
952                })
953                .filter(|s| !s.is_empty());
954
955            Some(ColumnInfo {
956                name,
957                affinity,
958                is_ipk,
959                type_name,
960                notnull: upper.contains("NOT NULL"),
961                unique: upper.contains("UNIQUE") || upper.contains("PRIMARY KEY"),
962                default_value,
963                strict_type,
964                generated_expr: None,
965                generated_stored: None,
966                collation,
967            })
968        })
969        .collect()
970}
971
972/// Extract column metadata from sqlite_master SQL for both ordinary and
973/// materialized virtual tables.
974#[must_use]
975pub fn parse_columns_from_sqlite_master_sql(sql: &str) -> Vec<ColumnInfo> {
976    if is_virtual_table_sql(sql) {
977        return parse_virtual_table_columns_from_sql(sql)
978            .unwrap_or_else(|| parse_columns_from_create_sql(sql));
979    }
980    parse_columns_from_create_sql(sql)
981}
982
983pub(crate) fn validate_sqlite_master_root_page(name: &str, root_page_num: i64) -> Result<u32> {
984    if root_page_num <= 0 {
985        return Err(FrankenError::DatabaseCorrupt {
986            detail: format!("table `{name}` has invalid rootpage {root_page_num} in sqlite_master"),
987        });
988    }
989
990    let root_page_u32 =
991        u32::try_from(root_page_num).map_err(|_| FrankenError::DatabaseCorrupt {
992            detail: format!(
993                "table `{name}` has out-of-range rootpage {root_page_num} in sqlite_master"
994            ),
995        })?;
996    i32::try_from(root_page_u32).map_err(|_| FrankenError::DatabaseCorrupt {
997        detail: format!("table `{name}` has rootpage {root_page_num} that exceeds supported range"),
998    })?;
999    Ok(root_page_u32)
1000}
1001
1002fn is_virtual_table_sql(sql: &str) -> bool {
1003    sql.trim_start()
1004        .to_ascii_uppercase()
1005        .starts_with("CREATE VIRTUAL TABLE")
1006}
1007
1008#[must_use]
1009pub fn is_without_rowid_table_sql(sql: &str) -> bool {
1010    if let Some(Statement::CreateTable(create)) = parse_single_statement(sql) {
1011        return create.without_rowid;
1012    }
1013
1014    let Some(close_paren) = sql.rfind(')') else {
1015        return false;
1016    };
1017    let tail = &sql[close_paren + 1..];
1018    let mut tokens = Vec::new();
1019    let mut token = String::new();
1020    for ch in tail.chars() {
1021        if ch.is_ascii_alphanumeric() || ch == '_' {
1022            token.push(ch.to_ascii_uppercase());
1023        } else if !token.is_empty() {
1024            tokens.push(std::mem::take(&mut token));
1025        }
1026    }
1027    if !token.is_empty() {
1028        tokens.push(token);
1029    }
1030    tokens
1031        .windows(2)
1032        .any(|window| window[0] == "WITHOUT" && window[1] == "ROWID")
1033}
1034
1035fn parse_virtual_table_columns_from_sql(sql: &str) -> Option<Vec<ColumnInfo>> {
1036    let mut parser = Parser::from_sql(sql);
1037    let (statements, errors) = parser.parse_all();
1038    if !errors.is_empty() || statements.len() != 1 {
1039        return None;
1040    }
1041    match statements.into_iter().next()? {
1042        Statement::CreateVirtualTable(create) => {
1043            Some(parse_virtual_table_column_infos(&create.args))
1044        }
1045        _ => None,
1046    }
1047}
1048
1049fn parse_virtual_table_column_infos(args: &[String]) -> Vec<ColumnInfo> {
1050    let mut columns = Vec::new();
1051    let mut seen = std::collections::HashSet::<String>::new();
1052
1053    for arg in args {
1054        let trimmed = arg.trim();
1055        if trimmed.is_empty() || trimmed.contains('=') {
1056            continue;
1057        }
1058        let raw_name = trimmed
1059            .split_whitespace()
1060            .next()
1061            .unwrap_or_default()
1062            .trim_matches(|ch| matches!(ch, '"' | '\'' | '`' | '[' | ']'));
1063        if raw_name.is_empty() {
1064            continue;
1065        }
1066        let key = raw_name.to_ascii_lowercase();
1067        if !seen.insert(key) {
1068            continue;
1069        }
1070        columns.push(ColumnInfo {
1071            name: raw_name.to_owned(),
1072            affinity: 'C',
1073            is_ipk: false,
1074            type_name: None,
1075            notnull: false,
1076            unique: false,
1077            default_value: None,
1078            strict_type: None,
1079            generated_expr: None,
1080            generated_stored: None,
1081            collation: None,
1082        });
1083    }
1084
1085    if columns.is_empty() {
1086        columns.push(ColumnInfo {
1087            name: "content".to_owned(),
1088            affinity: 'C',
1089            is_ipk: false,
1090            type_name: None,
1091            notnull: false,
1092            unique: false,
1093            default_value: None,
1094            strict_type: None,
1095            generated_expr: None,
1096            generated_stored: None,
1097            collation: None,
1098        });
1099    }
1100
1101    columns
1102}
1103
1104/// Return true when CREATE TABLE SQL declares the table as STRICT.
1105#[must_use]
1106pub fn is_strict_table_sql(sql: &str) -> bool {
1107    if let Some(Statement::CreateTable(create)) = parse_single_statement(sql) {
1108        return create.strict;
1109    }
1110
1111    let Some(close_paren) = sql.rfind(')') else {
1112        return false;
1113    };
1114    let tail = &sql[close_paren + 1..];
1115    let mut token = String::new();
1116    for ch in tail.chars() {
1117        if ch.is_ascii_alphanumeric() || ch == '_' {
1118            token.push(ch.to_ascii_uppercase());
1119        } else if !token.is_empty() {
1120            if token == "STRICT" {
1121                return true;
1122            }
1123            token.clear();
1124        }
1125    }
1126    token == "STRICT"
1127}
1128
1129/// Return true when CREATE TABLE SQL declares AUTOINCREMENT.
1130#[must_use]
1131pub fn is_autoincrement_table_sql(sql: &str) -> bool {
1132    if let Some(Statement::CreateTable(create)) = parse_single_statement(sql)
1133        && let CreateTableBody::Columns { columns, .. } = &create.body
1134    {
1135        return columns.iter().any(|col| {
1136            let is_integer = col
1137                .type_name
1138                .as_ref()
1139                .is_some_and(|tn| tn.name.eq_ignore_ascii_case("INTEGER"));
1140            is_integer
1141                && col.constraints.iter().any(|constraint| {
1142                    matches!(
1143                        &constraint.kind,
1144                        ColumnConstraintKind::PrimaryKey {
1145                            autoincrement: true,
1146                            direction,
1147                            ..
1148                        } if *direction != Some(SortDirection::Desc)
1149                    )
1150                })
1151        });
1152    }
1153
1154    let mut token = String::new();
1155    for ch in sql.chars() {
1156        if ch.is_ascii_alphanumeric() || ch == '_' {
1157            token.push(ch.to_ascii_uppercase());
1158        } else if !token.is_empty() {
1159            if token == "AUTOINCREMENT" {
1160                return true;
1161            }
1162            token.clear();
1163        }
1164    }
1165    token == "AUTOINCREMENT"
1166}
1167
1168/// Extract CHECK constraint expressions from a CREATE TABLE SQL string.
1169///
1170/// Finds `CHECK(...)` clauses in the column-def body and returns the
1171/// expression text (inside the parentheses) for each one.
1172#[must_use]
1173pub fn extract_check_constraints_from_sql(sql: &str) -> Vec<String> {
1174    if let Some(Statement::CreateTable(create)) = parse_single_statement(sql)
1175        && let CreateTableBody::Columns {
1176            columns,
1177            constraints,
1178        } = &create.body
1179    {
1180        let mut checks = Vec::new();
1181        for column in columns {
1182            for constraint in &column.constraints {
1183                if let ColumnConstraintKind::Check(expr) = &constraint.kind {
1184                    checks.push(expr.to_string());
1185                }
1186            }
1187        }
1188        for constraint in constraints {
1189            if let TableConstraintKind::Check(expr) = &constraint.kind {
1190                checks.push(expr.to_string());
1191            }
1192        }
1193        return checks;
1194    }
1195
1196    let Some(open) = sql.find('(') else {
1197        return Vec::new();
1198    };
1199    let Some(close) = sql.rfind(')') else {
1200        return Vec::new();
1201    };
1202    if open >= close {
1203        return Vec::new();
1204    }
1205    let body = &sql[open + 1..close];
1206    let upper = body.to_ascii_uppercase();
1207    let mut checks = Vec::new();
1208    let mut search_from = 0;
1209    while let Some(pos) = upper[search_from..].find("CHECK") {
1210        let abs_pos = search_from + pos;
1211        let after = &body[abs_pos + 5..].trim_start();
1212        if after.starts_with('(') {
1213            // Find matching closing paren.
1214            let mut depth = 0_i32;
1215            let mut end = None;
1216            for (i, ch) in after.char_indices() {
1217                match ch {
1218                    '(' => depth += 1,
1219                    ')' => {
1220                        depth -= 1;
1221                        if depth == 0 {
1222                            end = Some(i);
1223                            break;
1224                        }
1225                    }
1226                    _ => {}
1227                }
1228            }
1229            if let Some(end_idx) = end {
1230                let expr = &after[1..end_idx];
1231                checks.push(expr.trim().to_owned());
1232                search_from = abs_pos + 5 + end_idx + 1;
1233            } else {
1234                search_from = abs_pos + 5;
1235            }
1236        } else {
1237            search_from = abs_pos + 5;
1238        }
1239    }
1240    checks
1241}
1242
1243fn parse_column_name_and_remainder(def: &str) -> Option<(String, &str)> {
1244    let trimmed = def.trim_start();
1245    if trimmed.is_empty() {
1246        return None;
1247    }
1248    let bytes = trimmed.as_bytes();
1249    let (name_raw, remainder) = match bytes[0] {
1250        b'"' => parse_quoted_identifier(trimmed, b'"', b'"')?,
1251        b'`' => parse_quoted_identifier(trimmed, b'`', b'`')?,
1252        b'[' => parse_bracket_identifier(trimmed)?,
1253        _ => {
1254            let end = trimmed.find(char::is_whitespace).unwrap_or(trimmed.len());
1255            (&trimmed[..end], &trimmed[end..])
1256        }
1257    };
1258    Some((strip_identifier_quotes(name_raw), remainder.trim_start()))
1259}
1260
1261fn parse_single_statement(sql: &str) -> Option<Statement> {
1262    let mut parser = Parser::from_sql(sql);
1263    let (statements, errors) = parser.parse_all();
1264    if !errors.is_empty() || statements.len() != 1 {
1265        return None;
1266    }
1267    statements.into_iter().next()
1268}
1269
1270fn format_default_value(dv: &DefaultValue) -> String {
1271    match dv {
1272        DefaultValue::Expr(expr) => expr.to_string(),
1273        DefaultValue::ParenExpr(expr) => format!("({expr})"),
1274    }
1275}
1276
1277fn indexed_column_name(indexed_column: &IndexedColumn) -> Option<&str> {
1278    fn extract(expr: &Expr) -> Option<&str> {
1279        match expr {
1280            Expr::Column(col_ref, _) if col_ref.table.is_none() => Some(&col_ref.column),
1281            Expr::Collate { expr, .. } => extract(expr),
1282            _ => None,
1283        }
1284    }
1285
1286    extract(&indexed_column.expr)
1287}
1288
1289fn hydrate_rowid_alias_value(
1290    values: &mut Vec<SqliteValue>,
1291    ipk_idx: usize,
1292    rowid: i64,
1293    num_columns: usize,
1294    table_name: &str,
1295) -> Result<()> {
1296    match values.len() {
1297        len if len + 1 == num_columns => {
1298            values.insert(ipk_idx, SqliteValue::Integer(rowid));
1299        }
1300        len if len == num_columns => match values.get_mut(ipk_idx) {
1301            Some(slot @ SqliteValue::Null) => {
1302                *slot = SqliteValue::Integer(rowid);
1303            }
1304            Some(SqliteValue::Integer(encoded_rowid)) if *encoded_rowid == rowid => {}
1305            Some(SqliteValue::Integer(encoded_rowid)) => {
1306                return Err(FrankenError::DatabaseCorrupt {
1307                    detail: format!(
1308                        "table `{table_name}` rowid {rowid} stores inconsistent INTEGER PRIMARY KEY alias value {encoded_rowid}"
1309                    ),
1310                });
1311            }
1312            Some(other) => {
1313                return Err(FrankenError::DatabaseCorrupt {
1314                    detail: format!(
1315                        "table `{table_name}` rowid {rowid} stores non-integer INTEGER PRIMARY KEY alias value {other:?}"
1316                    ),
1317                });
1318            }
1319            None => {
1320                return Err(FrankenError::DatabaseCorrupt {
1321                    detail: format!(
1322                        "table `{table_name}` rowid {rowid} payload is missing INTEGER PRIMARY KEY alias column"
1323                    ),
1324                });
1325            }
1326        },
1327        len => {
1328            return Err(FrankenError::DatabaseCorrupt {
1329                detail: format!(
1330                    "table `{table_name}` rowid {rowid} payload has {len} columns; expected {} or {}",
1331                    num_columns.saturating_sub(1),
1332                    num_columns
1333                ),
1334            });
1335        }
1336    }
1337
1338    Ok(())
1339}
1340
1341fn table_primary_key_is_rowid_alias(
1342    columns: &[fsqlite_ast::ColumnDef],
1343    indexed_columns: &[IndexedColumn],
1344    without_rowid: bool,
1345) -> bool {
1346    if without_rowid || indexed_columns.len() != 1 {
1347        return false;
1348    }
1349    let Some(column_name) = indexed_column_name(&indexed_columns[0]) else {
1350        return false;
1351    };
1352    columns
1353        .iter()
1354        .find(|column| column.name.eq_ignore_ascii_case(column_name))
1355        .and_then(|column| column.type_name.as_ref())
1356        .is_some_and(|type_name| type_name.name.eq_ignore_ascii_case("INTEGER"))
1357}
1358
1359fn try_parse_columns_from_create_sql_ast(sql: &str) -> Option<Vec<ColumnInfo>> {
1360    let Statement::CreateTable(create) = parse_single_statement(sql)? else {
1361        return None;
1362    };
1363    let CreateTableBody::Columns { columns, .. } = &create.body else {
1364        return None;
1365    };
1366
1367    let mut table_pk_cols = vec![false; columns.len()];
1368    let mut table_unique_cols = vec![false; columns.len()];
1369    let mut table_pk_rowid_col_idx = None;
1370
1371    if let CreateTableBody::Columns { constraints, .. } = &create.body {
1372        for constraint in constraints {
1373            match &constraint.kind {
1374                TableConstraintKind::PrimaryKey {
1375                    columns: pk_columns,
1376                    ..
1377                } if pk_columns.len() == 1 => {
1378                    let Some(column_name) = indexed_column_name(&pk_columns[0]) else {
1379                        continue;
1380                    };
1381                    let Some(index) = columns
1382                        .iter()
1383                        .position(|col| col.name.eq_ignore_ascii_case(column_name))
1384                    else {
1385                        continue;
1386                    };
1387
1388                    table_pk_cols[index] = true;
1389                    table_unique_cols[index] = true;
1390
1391                    let is_integer = columns[index]
1392                        .type_name
1393                        .as_ref()
1394                        .is_some_and(|tn| tn.name.eq_ignore_ascii_case("INTEGER"));
1395                    if is_integer && !create.without_rowid {
1396                        table_pk_rowid_col_idx = Some(index);
1397                    }
1398                }
1399                TableConstraintKind::Unique {
1400                    columns: unique_columns,
1401                    ..
1402                } if unique_columns.len() == 1 => {
1403                    let Some(column_name) = indexed_column_name(&unique_columns[0]) else {
1404                        continue;
1405                    };
1406                    let Some(index) = columns
1407                        .iter()
1408                        .position(|col| col.name.eq_ignore_ascii_case(column_name))
1409                    else {
1410                        continue;
1411                    };
1412                    table_unique_cols[index] = true;
1413                }
1414                _ => {}
1415            }
1416        }
1417    }
1418
1419    let rowid_col_idx = columns
1420        .iter()
1421        .enumerate()
1422        .find_map(|(index, col)| {
1423            let is_integer = col
1424                .type_name
1425                .as_ref()
1426                .is_some_and(|tn| tn.name.eq_ignore_ascii_case("INTEGER"));
1427            let pk = col.constraints.iter().find_map(|constraint| {
1428                if let ColumnConstraintKind::PrimaryKey { direction, .. } = &constraint.kind {
1429                    if *direction != Some(SortDirection::Desc) {
1430                        Some(())
1431                    } else {
1432                        None
1433                    }
1434                } else {
1435                    None
1436                }
1437            });
1438            if is_integer && pk.is_some() && !create.without_rowid {
1439                Some(index)
1440            } else {
1441                None
1442            }
1443        })
1444        .or(table_pk_rowid_col_idx);
1445
1446    Some(
1447        columns
1448            .iter()
1449            .enumerate()
1450            .map(|(index, col)| {
1451                let affinity = col
1452                    .type_name
1453                    .as_ref()
1454                    .map_or('A', |type_name| type_to_affinity(&type_name.name));
1455                let type_name = col.type_name.as_ref().map(std::string::ToString::to_string);
1456                let is_ipk = rowid_col_idx.is_some_and(|rowid_index| rowid_index == index);
1457                let notnull = col.constraints.iter().any(|constraint| {
1458                    matches!(&constraint.kind, ColumnConstraintKind::NotNull { .. })
1459                });
1460                let has_primary_key = col.constraints.iter().any(|constraint| {
1461                    matches!(&constraint.kind, ColumnConstraintKind::PrimaryKey { .. })
1462                });
1463                let unique = (!is_ipk && has_primary_key)
1464                    || table_pk_cols[index]
1465                    || table_unique_cols[index]
1466                    || col.constraints.iter().any(|constraint| {
1467                        matches!(&constraint.kind, ColumnConstraintKind::Unique { .. })
1468                    });
1469                let default_value = col
1470                    .constraints
1471                    .iter()
1472                    .find_map(|constraint| match &constraint.kind {
1473                        ColumnConstraintKind::Default(default_value) => {
1474                            Some(format_default_value(default_value))
1475                        }
1476                        _ => None,
1477                    });
1478                let strict_type = if create.strict {
1479                    type_name
1480                        .as_deref()
1481                        .and_then(StrictColumnType::from_type_name)
1482                } else {
1483                    None
1484                };
1485                let (generated_expr, generated_stored) = col
1486                    .constraints
1487                    .iter()
1488                    .find_map(|constraint| match &constraint.kind {
1489                        ColumnConstraintKind::Generated { expr, storage } => {
1490                            let stored = storage
1491                                .as_ref()
1492                                .is_some_and(|storage| *storage == GeneratedStorage::Stored);
1493                            Some((Some(expr.to_string()), Some(stored)))
1494                        }
1495                        _ => None,
1496                    })
1497                    .unwrap_or((None, None));
1498                let collation = col.constraints.iter().find_map(|constraint| {
1499                    if let ColumnConstraintKind::Collate(name) = &constraint.kind {
1500                        Some(name.clone())
1501                    } else {
1502                        None
1503                    }
1504                });
1505
1506                ColumnInfo {
1507                    name: col.name.clone(),
1508                    affinity,
1509                    is_ipk,
1510                    type_name,
1511                    notnull,
1512                    unique,
1513                    default_value,
1514                    strict_type,
1515                    generated_expr,
1516                    generated_stored,
1517                    collation,
1518                }
1519            })
1520            .collect(),
1521    )
1522}
1523
1524fn parse_quoted_identifier(input: &str, quote: u8, escape: u8) -> Option<(&str, &str)> {
1525    let bytes = input.as_bytes();
1526    let mut i = 1usize;
1527    while i < bytes.len() {
1528        if bytes[i] == quote {
1529            if i + 1 < bytes.len() && bytes[i + 1] == escape {
1530                i += 2;
1531                continue;
1532            }
1533            return Some((&input[..=i], &input[i + 1..]));
1534        }
1535        i += 1;
1536    }
1537    None
1538}
1539
1540fn parse_bracket_identifier(input: &str) -> Option<(&str, &str)> {
1541    let bytes = input.as_bytes();
1542    let mut i = 1usize;
1543    while i < bytes.len() {
1544        if bytes[i] == b']' {
1545            return Some((&input[..=i], &input[i + 1..]));
1546        }
1547        i += 1;
1548    }
1549    None
1550}
1551
1552const COLUMN_CONSTRAINT_KEYWORDS: &[&str] = &[
1553    "CONSTRAINT",
1554    "PRIMARY",
1555    "NOT",
1556    "NULL",
1557    "UNIQUE",
1558    "CHECK",
1559    "DEFAULT",
1560    "COLLATE",
1561    "REFERENCES",
1562    "GENERATED",
1563    "AS",
1564];
1565
1566/// Split a comma-separated SQL list while respecting parentheses, quotes,
1567/// and top-level `-- ...` line comments.
1568fn split_top_level_csv_items(input: &str) -> Vec<String> {
1569    let mut chars = input.char_indices().peekable();
1570    let mut out = Vec::new();
1571    let mut current = String::new();
1572    let mut paren_depth = 0usize;
1573    let mut quote: Option<char> = None;
1574    let mut in_brackets = false;
1575
1576    while let Some((_, ch)) = chars.next() {
1577        if let Some(q) = quote {
1578            current.push(ch);
1579            if ch == q {
1580                if let Some(&(_, next_ch)) = chars.peek() {
1581                    if next_ch == q {
1582                        current.push(next_ch);
1583                        chars.next();
1584                    } else {
1585                        quote = None;
1586                    }
1587                } else {
1588                    quote = None;
1589                }
1590            }
1591            continue;
1592        }
1593
1594        if in_brackets {
1595            current.push(ch);
1596            if ch == ']' {
1597                in_brackets = false;
1598            }
1599            continue;
1600        }
1601
1602        match ch {
1603            '\'' | '"' | '`' => {
1604                quote = Some(ch);
1605                current.push(ch);
1606            }
1607            '[' => {
1608                in_brackets = true;
1609                current.push(ch);
1610            }
1611            '-' if chars.peek().is_some_and(|(_, next_ch)| *next_ch == '-') => {
1612                chars.next();
1613                let ends_with_whitespace = current.chars().last().is_some_and(char::is_whitespace);
1614                if !current.trim_end().is_empty() && !ends_with_whitespace {
1615                    current.push(' ');
1616                }
1617
1618                while let Some((_, next_ch)) = chars.next() {
1619                    if next_ch == '\n' {
1620                        break;
1621                    }
1622                    if next_ch == '\r' {
1623                        if chars.peek().is_some_and(|(_, next_ch)| *next_ch == '\n') {
1624                            chars.next();
1625                        }
1626                        break;
1627                    }
1628                }
1629            }
1630            '(' => {
1631                paren_depth = paren_depth.saturating_add(1);
1632                current.push(ch);
1633            }
1634            ')' => {
1635                paren_depth = paren_depth.saturating_sub(1);
1636                current.push(ch);
1637            }
1638            ',' if paren_depth == 0 => {
1639                let part = current.trim();
1640                if !part.is_empty() {
1641                    out.push(part.to_owned());
1642                }
1643                current.clear();
1644            }
1645            _ => current.push(ch),
1646        }
1647    }
1648
1649    let tail = current.trim();
1650    if !tail.is_empty() {
1651        out.push(tail.to_owned());
1652    }
1653
1654    out
1655}
1656
1657fn starts_with_unquoted_table_constraint(def: &str) -> bool {
1658    let trimmed = def.trim_start();
1659    if trimmed.is_empty() {
1660        return false;
1661    }
1662    match trimmed.as_bytes()[0] {
1663        b'"' | b'`' | b'[' => return false,
1664        _ => {}
1665    }
1666    let upper = trimmed.to_ascii_uppercase();
1667    upper.starts_with("CONSTRAINT ")
1668        || upper.starts_with("PRIMARY KEY")
1669        || upper == "PRIMARY"
1670        || upper.starts_with("UNIQUE ")
1671        || upper.starts_with("UNIQUE(")
1672        || upper == "UNIQUE"
1673        || upper.starts_with("CHECK ")
1674        || upper.starts_with("CHECK(")
1675        || upper == "CHECK"
1676        || upper.starts_with("FOREIGN KEY")
1677        || upper.starts_with("FOREIGN(")
1678        || upper == "FOREIGN"
1679}
1680
1681fn strip_identifier_quotes(token: &str) -> String {
1682    let trimmed = token.trim();
1683    if trimmed.len() >= 2 {
1684        if trimmed.starts_with('"') && trimmed.ends_with('"') {
1685            return trimmed[1..trimmed.len() - 1].replace("\"\"", "\"");
1686        }
1687        if trimmed.starts_with('`') && trimmed.ends_with('`') {
1688            return trimmed[1..trimmed.len() - 1].replace("``", "`");
1689        }
1690        if trimmed.starts_with('[') && trimmed.ends_with(']') {
1691            return trimmed[1..trimmed.len() - 1].to_owned();
1692        }
1693    }
1694    trimmed.to_owned()
1695}
1696
1697fn extract_type_declaration(tokens: &[&str]) -> String {
1698    let mut parts = Vec::new();
1699    let mut paren_depth = 0isize;
1700    for token in tokens {
1701        let token_upper = token
1702            .trim_matches(|c: char| c == ',' || c == ';')
1703            .to_ascii_uppercase();
1704        if paren_depth == 0 && COLUMN_CONSTRAINT_KEYWORDS.contains(&token_upper.as_str()) {
1705            break;
1706        }
1707        parts.push(*token);
1708        for ch in token.chars() {
1709            if ch == '(' {
1710                paren_depth += 1;
1711            } else if ch == ')' && paren_depth > 0 {
1712                paren_depth -= 1;
1713            }
1714        }
1715    }
1716    parts.join(" ")
1717}
1718
1719/// Extract a DEFAULT value from a column definition remainder (the part after
1720/// the column name).  Handles `DEFAULT literal`, `DEFAULT -number`,
1721/// `DEFAULT 'string'`, and `DEFAULT (expr)`.
1722fn extract_default_value(remainder: &str) -> Option<String> {
1723    let upper = remainder.to_ascii_uppercase();
1724    let pos = upper.find("DEFAULT")?;
1725    let after = remainder[pos + 7..].trim_start();
1726    if after.is_empty() {
1727        return None;
1728    }
1729    // Parenthesized expression: DEFAULT (...)
1730    if after.starts_with('(') {
1731        let mut depth = 0i32;
1732        for (i, ch) in after.char_indices() {
1733            if ch == '(' {
1734                depth += 1;
1735            } else if ch == ')' {
1736                depth -= 1;
1737                if depth == 0 {
1738                    return Some(after[..=i].to_owned());
1739                }
1740            }
1741        }
1742        return None;
1743    }
1744    // Quoted string: DEFAULT '...'
1745    if let Some(rest) = after.strip_prefix('\'') {
1746        let mut i = 0;
1747        let bytes = rest.as_bytes();
1748        while i < bytes.len() {
1749            if bytes[i] == b'\'' {
1750                if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
1751                    i += 2;
1752                    continue;
1753                }
1754                return Some(after[..i + 2].to_owned());
1755            }
1756            i += 1;
1757        }
1758        return None;
1759    }
1760    // Unquoted token: DEFAULT NULL, DEFAULT 0, DEFAULT -1, DEFAULT CURRENT_TIMESTAMP
1761    let end = after
1762        .find(|c: char| c.is_ascii_whitespace() || c == ',')
1763        .unwrap_or(after.len());
1764    let token = &after[..end];
1765    if token.is_empty() {
1766        None
1767    } else {
1768        Some(token.to_owned())
1769    }
1770}
1771
1772/// Map a SQL type keyword to an affinity character.
1773fn type_to_affinity(type_str: &str) -> char {
1774    // SQLite affinity rules (section 3.1 of datatype3.html):
1775    // Priority: INT > TEXT/CHAR/CLOB > BLOB/empty > REAL/FLOA/DOUB > NUMERIC
1776    let upper = type_str.to_uppercase();
1777    if upper.contains("INT") {
1778        'D' // INTEGER affinity
1779    } else if upper.contains("TEXT") || upper.contains("CHAR") || upper.contains("CLOB") {
1780        'B' // TEXT affinity
1781    } else if upper.contains("BLOB") || upper.is_empty() {
1782        'A' // BLOB (none) affinity
1783    } else if upper.contains("REAL") || upper.contains("FLOA") || upper.contains("DOUB") {
1784        'E' // REAL affinity
1785    } else {
1786        'C' // NUMERIC affinity
1787    }
1788}
1789
1790// ── Tests ───────────────────────────────────────────────────────────────
1791
1792#[cfg(test)]
1793mod tests {
1794    use super::*;
1795
1796    fn persist_test_db(
1797        path: &Path,
1798        schema: &[TableSchema],
1799        db: &MemDatabase,
1800        schema_cookie: u32,
1801        change_counter: u32,
1802    ) -> Result<()> {
1803        let cx = Cx::new();
1804        persist_to_sqlite(&cx, path, schema, db, schema_cookie, change_counter)
1805    }
1806
1807    fn load_test_db(path: &Path) -> Result<LoadedState> {
1808        let cx = Cx::new();
1809        load_from_sqlite(&cx, path)
1810    }
1811
1812    fn make_test_schema_and_db() -> (Vec<TableSchema>, MemDatabase) {
1813        let mut db = MemDatabase::new();
1814        let root = db.create_table(2);
1815        let table = db.tables.get_mut(&root).unwrap();
1816        table.insert_row(
1817            1,
1818            vec![SqliteValue::Integer(42), SqliteValue::Text("hello".into())],
1819        );
1820        table.insert_row(
1821            2,
1822            vec![SqliteValue::Integer(99), SqliteValue::Text("world".into())],
1823        );
1824
1825        let schema = vec![TableSchema {
1826            name: "test_table".to_owned(),
1827            root_page: root,
1828            columns: vec![
1829                ColumnInfo {
1830                    name: "id".to_owned(),
1831                    affinity: 'd',
1832                    is_ipk: false,
1833                    type_name: None,
1834                    notnull: false,
1835                    unique: false,
1836                    default_value: None,
1837                    strict_type: None,
1838                    generated_expr: None,
1839                    generated_stored: None,
1840                    collation: None,
1841                },
1842                ColumnInfo {
1843                    name: "name".to_owned(),
1844                    affinity: 'C',
1845                    is_ipk: false,
1846                    type_name: None,
1847                    notnull: false,
1848                    unique: false,
1849                    default_value: None,
1850                    strict_type: None,
1851                    generated_expr: None,
1852                    generated_stored: None,
1853                    collation: None,
1854                },
1855            ],
1856            indexes: Vec::new(),
1857            strict: false,
1858            without_rowid: false,
1859            primary_key_constraints: Vec::new(),
1860            foreign_keys: Vec::new(),
1861            check_constraints: Vec::new(),
1862        }];
1863
1864        (schema, db)
1865    }
1866
1867    #[test]
1868    fn test_roundtrip_persist_and_load() {
1869        let dir = tempfile::tempdir().unwrap();
1870        let db_path = dir.path().join("test.db");
1871
1872        let (schema, db) = make_test_schema_and_db();
1873        persist_test_db(&db_path, &schema, &db, 0, 0).unwrap();
1874
1875        assert!(db_path.exists(), "db file should exist");
1876        assert!(is_sqlite_format(&db_path), "should have SQLite magic");
1877
1878        let loaded = load_test_db(&db_path).unwrap();
1879        assert_eq!(loaded.schema.len(), 1);
1880        assert_eq!(loaded.schema[0].name, "test_table");
1881        assert_eq!(loaded.schema[0].columns.len(), 2);
1882
1883        let table = loaded.db.get_table(loaded.schema[0].root_page).unwrap();
1884        let rows: Vec<_> = table.iter_rows().collect();
1885        assert_eq!(rows.len(), 2);
1886        assert_eq!(rows[0].0, 1); // rowid
1887        assert_eq!(rows[0].1[0], SqliteValue::Integer(42));
1888        assert_eq!(rows[0].1[1], SqliteValue::Text("hello".into()));
1889        assert_eq!(rows[1].0, 2);
1890        assert_eq!(rows[1].1[0], SqliteValue::Integer(99));
1891        assert_eq!(rows[1].1[1], SqliteValue::Text("world".into()));
1892    }
1893
1894    #[test]
1895    fn test_empty_database_roundtrip() {
1896        let dir = tempfile::tempdir().unwrap();
1897        let db_path = dir.path().join("empty.db");
1898
1899        let schema: Vec<TableSchema> = Vec::new();
1900        let db = MemDatabase::new();
1901        persist_test_db(&db_path, &schema, &db, 0, 0).unwrap();
1902
1903        assert!(is_sqlite_format(&db_path));
1904
1905        let loaded = load_test_db(&db_path).unwrap();
1906        assert!(loaded.schema.is_empty());
1907    }
1908
1909    #[test]
1910    fn test_persist_creates_sqlite3_readable_file() {
1911        let dir = tempfile::tempdir().unwrap();
1912        let db_path = dir.path().join("readable.db");
1913
1914        let (schema, db) = make_test_schema_and_db();
1915        persist_test_db(&db_path, &schema, &db, 0, 0).unwrap();
1916
1917        // Verify with rusqlite (C SQLite) that the file is valid.
1918        let conn = rusqlite::Connection::open(&db_path).unwrap();
1919        let mut stmt = conn
1920            .prepare("SELECT id, name FROM test_table ORDER BY id")
1921            .unwrap();
1922        let rows: Vec<(i64, String)> = stmt
1923            .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
1924            .unwrap()
1925            .collect::<std::result::Result<Vec<_>, _>>()
1926            .unwrap();
1927
1928        assert_eq!(rows.len(), 2);
1929        assert_eq!(rows[0], (42, "hello".to_owned()));
1930        assert_eq!(rows[1], (99, "world".to_owned()));
1931    }
1932
1933    #[test]
1934    fn test_parse_virtual_table_columns_from_sql_rejects_trailing_junk() {
1935        assert!(
1936            parse_virtual_table_columns_from_sql("CREATE VIRTUAL TABLE docs USING fts5(a) garbage")
1937                .is_none(),
1938            "trailing tokens must invalidate virtual-table SQL during compat import"
1939        );
1940    }
1941
1942    #[test]
1943    fn test_load_sqlite3_created_file() {
1944        let dir = tempfile::tempdir().unwrap();
1945        let db_path = dir.path().join("from_c.db");
1946
1947        // Create with C SQLite via rusqlite.
1948        {
1949            let conn = rusqlite::Connection::open(&db_path).unwrap();
1950            conn.execute_batch(
1951                "CREATE TABLE items (val INTEGER, label TEXT);
1952                 INSERT INTO items VALUES (10, 'alpha');
1953                 INSERT INTO items VALUES (20, 'beta');",
1954            )
1955            .unwrap();
1956        }
1957
1958        // Load with our compat loader.
1959        let loaded = load_test_db(&db_path).unwrap();
1960        assert_eq!(loaded.schema.len(), 1);
1961        assert_eq!(loaded.schema[0].name, "items");
1962
1963        let table = loaded.db.get_table(loaded.schema[0].root_page).unwrap();
1964        let rows: Vec<_> = table.iter_rows().collect();
1965        assert_eq!(rows.len(), 2);
1966        assert_eq!(rows[0].1[0], SqliteValue::Integer(10));
1967        assert_eq!(rows[0].1[1], SqliteValue::Text("alpha".into()));
1968        assert_eq!(rows[1].1[0], SqliteValue::Integer(20));
1969        assert_eq!(rows[1].1[1], SqliteValue::Text("beta".into()));
1970    }
1971
1972    #[test]
1973    fn test_load_sqlite3_created_file_restores_integer_primary_key_alias_values() {
1974        let dir = tempfile::tempdir().unwrap();
1975        let db_path = dir.path().join("from_c_ipk.db");
1976
1977        {
1978            let conn = rusqlite::Connection::open(&db_path).unwrap();
1979            conn.execute_batch(
1980                "CREATE TABLE items (id INTEGER PRIMARY KEY, label TEXT);
1981                 INSERT INTO items (id, label) VALUES (10, 'alpha');
1982                 INSERT INTO items (id, label) VALUES (20, 'beta');",
1983            )
1984            .unwrap();
1985        }
1986
1987        let loaded = load_test_db(&db_path).unwrap();
1988        assert_eq!(loaded.schema.len(), 1);
1989        assert_eq!(loaded.schema[0].name, "items");
1990        assert!(loaded.schema[0].columns[0].is_ipk);
1991        assert!(
1992            loaded.schema[0].indexes.is_empty(),
1993            "table-level INTEGER PRIMARY KEY rowid aliases must not synthesize autoindexes"
1994        );
1995
1996        let table = loaded.db.get_table(loaded.schema[0].root_page).unwrap();
1997        let rows: Vec<_> = table.iter_rows().collect();
1998        assert_eq!(rows.len(), 2);
1999        assert_eq!(rows[0].0, 10);
2000        assert_eq!(rows[0].1[0], SqliteValue::Integer(10));
2001        assert_eq!(rows[0].1[1], SqliteValue::Text("alpha".into()));
2002        assert_eq!(rows[1].0, 20);
2003        assert_eq!(rows[1].1[0], SqliteValue::Integer(20));
2004        assert_eq!(rows[1].1[1], SqliteValue::Text("beta".into()));
2005    }
2006
2007    #[test]
2008    fn test_load_sqlite3_created_file_restores_table_level_integer_primary_key_alias_values() {
2009        let dir = tempfile::tempdir().unwrap();
2010        let db_path = dir.path().join("from_c_table_pk.db");
2011
2012        {
2013            let conn = rusqlite::Connection::open(&db_path).unwrap();
2014            conn.execute_batch(
2015                "CREATE TABLE items (id INTEGER, label TEXT, PRIMARY KEY(id));
2016                 INSERT INTO items (id, label) VALUES (10, 'alpha');
2017                 INSERT INTO items (id, label) VALUES (20, 'beta');",
2018            )
2019            .unwrap();
2020        }
2021
2022        let loaded = load_test_db(&db_path).unwrap();
2023        assert_eq!(loaded.schema.len(), 1);
2024        assert_eq!(loaded.schema[0].name, "items");
2025        assert!(loaded.schema[0].columns[0].is_ipk);
2026
2027        let table = loaded.db.get_table(loaded.schema[0].root_page).unwrap();
2028        let rows: Vec<_> = table.iter_rows().collect();
2029        assert_eq!(rows.len(), 2);
2030        assert_eq!(rows[0].0, 10);
2031        assert_eq!(rows[0].1[0], SqliteValue::Integer(10));
2032        assert_eq!(rows[0].1[1], SqliteValue::Text("alpha".into()));
2033        assert_eq!(rows[1].0, 20);
2034        assert_eq!(rows[1].1[0], SqliteValue::Integer(20));
2035        assert_eq!(rows[1].1[1], SqliteValue::Text("beta".into()));
2036    }
2037
2038    #[test]
2039    fn test_is_sqlite_format_text_file() {
2040        let dir = tempfile::tempdir().unwrap();
2041        let path = dir.path().join("text.db");
2042        host_fs::write(&path, b"CREATE TABLE t (x);").unwrap();
2043        assert!(!is_sqlite_format(&path));
2044    }
2045
2046    #[test]
2047    fn test_is_sqlite_format_nonexistent() {
2048        assert!(!is_sqlite_format(Path::new(
2049            "/tmp/nonexistent_compat_test.db"
2050        )));
2051    }
2052
2053    #[test]
2054    fn test_multiple_tables_roundtrip() {
2055        let dir = tempfile::tempdir().unwrap();
2056        let db_path = dir.path().join("multi.db");
2057
2058        let mut db = MemDatabase::new();
2059        let root_a = db.create_table(1);
2060        db.tables
2061            .get_mut(&root_a)
2062            .unwrap()
2063            .insert_row(1, vec![SqliteValue::Text("row_a".into())]);
2064
2065        let root_b = db.create_table(1);
2066        db.tables
2067            .get_mut(&root_b)
2068            .unwrap()
2069            .insert_row(1, vec![SqliteValue::Integer(777)]);
2070
2071        let schema = vec![
2072            TableSchema {
2073                name: "alpha".to_owned(),
2074                root_page: root_a,
2075                columns: vec![ColumnInfo {
2076                    name: "val".to_owned(),
2077                    affinity: 'C',
2078                    is_ipk: false,
2079                    type_name: None,
2080                    notnull: false,
2081                    unique: false,
2082                    default_value: None,
2083                    strict_type: None,
2084                    generated_expr: None,
2085                    generated_stored: None,
2086                    collation: None,
2087                }],
2088                indexes: Vec::new(),
2089                strict: false,
2090                without_rowid: false,
2091                primary_key_constraints: Vec::new(),
2092                foreign_keys: Vec::new(),
2093                check_constraints: Vec::new(),
2094            },
2095            TableSchema {
2096                name: "beta".to_owned(),
2097                root_page: root_b,
2098                columns: vec![ColumnInfo {
2099                    name: "num".to_owned(),
2100                    affinity: 'd',
2101                    is_ipk: false,
2102                    type_name: None,
2103                    notnull: false,
2104                    unique: false,
2105                    default_value: None,
2106                    strict_type: None,
2107                    generated_expr: None,
2108                    generated_stored: None,
2109                    collation: None,
2110                }],
2111                indexes: Vec::new(),
2112                strict: false,
2113                without_rowid: false,
2114                primary_key_constraints: Vec::new(),
2115                foreign_keys: Vec::new(),
2116                check_constraints: Vec::new(),
2117            },
2118        ];
2119
2120        persist_test_db(&db_path, &schema, &db, 0, 0).unwrap();
2121        let loaded = load_test_db(&db_path).unwrap();
2122
2123        assert_eq!(loaded.schema.len(), 2);
2124        assert_eq!(loaded.schema[0].name, "alpha");
2125        assert_eq!(loaded.schema[1].name, "beta");
2126
2127        let tbl_a = loaded.db.get_table(loaded.schema[0].root_page).unwrap();
2128        let rows_a: Vec<_> = tbl_a.iter_rows().collect();
2129        assert_eq!(rows_a[0].1[0], SqliteValue::Text("row_a".into()));
2130
2131        let tbl_b = loaded.db.get_table(loaded.schema[1].root_page).unwrap();
2132        let rows_b: Vec<_> = tbl_b.iter_rows().collect();
2133        assert_eq!(rows_b[0].1[0], SqliteValue::Integer(777));
2134    }
2135
2136    #[test]
2137    fn test_parse_columns_from_create_sql() {
2138        let sql = r#"CREATE TABLE "foo" ("id" INTEGER, "name" TEXT, "data" BLOB)"#;
2139        let cols = parse_columns_from_create_sql(sql);
2140        assert_eq!(cols.len(), 3);
2141        assert_eq!(cols[0].name, "id");
2142        assert_eq!(cols[0].affinity, 'D');
2143        assert_eq!(cols[1].name, "name");
2144        assert_eq!(cols[1].affinity, 'B');
2145        assert_eq!(cols[2].name, "data");
2146        assert_eq!(cols[2].affinity, 'A');
2147    }
2148
2149    #[test]
2150    fn test_parse_columns_from_create_sql_handles_nested_commas_and_constraints() {
2151        let sql = r"CREATE TABLE metrics (
2152            id INTEGER PRIMARY KEY,
2153            amount DECIMAL(10,2) NOT NULL,
2154            status TEXT CHECK (status IN ('a,b', 'c')),
2155            CONSTRAINT metrics_pk PRIMARY KEY (id)
2156        )";
2157        let cols = parse_columns_from_create_sql(sql);
2158        assert_eq!(cols.len(), 3);
2159        assert_eq!(cols[0].name, "id");
2160        assert_eq!(cols[0].affinity, 'D');
2161        assert!(cols[0].is_ipk);
2162        assert_eq!(cols[1].name, "amount");
2163        assert_eq!(cols[1].affinity, 'C');
2164        assert_eq!(cols[2].name, "status");
2165        assert_eq!(cols[2].affinity, 'B');
2166    }
2167
2168    #[test]
2169    fn test_parse_columns_from_create_sql_table_level_integer_primary_key_is_ipk() {
2170        let sql = "CREATE TABLE metrics (id INTEGER, body TEXT, PRIMARY KEY(id))";
2171        let cols = parse_columns_from_create_sql(sql);
2172        assert_eq!(cols.len(), 2);
2173        assert_eq!(cols[0].name, "id");
2174        assert!(cols[0].is_ipk);
2175        assert_eq!(cols[1].name, "body");
2176    }
2177
2178    #[test]
2179    fn test_parse_columns_from_create_sql_table_level_integer_primary_key_desc_is_ipk() {
2180        let sql = "CREATE TABLE metrics (id INTEGER, body TEXT, PRIMARY KEY(id DESC))";
2181        let cols = parse_columns_from_create_sql(sql);
2182        assert_eq!(cols.len(), 2);
2183        assert_eq!(cols[0].name, "id");
2184        assert!(cols[0].is_ipk);
2185        assert_eq!(cols[1].name, "body");
2186    }
2187
2188    #[test]
2189    fn test_parse_columns_from_create_sql_table_level_integer_primary_key_collate_desc_is_ipk() {
2190        let sql =
2191            "CREATE TABLE metrics (id INTEGER, body TEXT, PRIMARY KEY(id COLLATE NOCASE DESC))";
2192        let cols = parse_columns_from_create_sql(sql);
2193        assert_eq!(cols.len(), 2);
2194        assert_eq!(cols[0].name, "id");
2195        assert!(cols[0].is_ipk);
2196        assert_eq!(cols[1].name, "body");
2197    }
2198
2199    #[test]
2200    fn test_parse_columns_from_create_sql_without_rowid_integer_pk_is_not_ipk() {
2201        let sql = "CREATE TABLE wr (id INTEGER PRIMARY KEY, body TEXT) WITHOUT ROWID";
2202        let cols = parse_columns_from_create_sql(sql);
2203        assert_eq!(cols.len(), 2);
2204        assert_eq!(cols[0].name, "id");
2205        assert!(!cols[0].is_ipk);
2206        assert!(cols[0].unique);
2207        assert_eq!(cols[1].name, "body");
2208    }
2209
2210    #[test]
2211    fn test_parse_columns_from_create_sql_keeps_quoted_keyword_column_name() {
2212        let sql = r#"CREATE TABLE t ("primary" TEXT, value INTEGER)"#;
2213        let cols = parse_columns_from_create_sql(sql);
2214        assert_eq!(cols.len(), 2);
2215        assert_eq!(cols[0].name, "primary");
2216        assert_eq!(cols[0].affinity, 'B');
2217        assert_eq!(cols[1].name, "value");
2218        assert_eq!(cols[1].affinity, 'D');
2219    }
2220
2221    #[test]
2222    fn test_parse_columns_from_create_sql_handles_quoted_names_with_spaces() {
2223        let sql = r#"CREATE TABLE t ("first name" TEXT, [last name] INTEGER, `role name` NUMERIC)"#;
2224        let cols = parse_columns_from_create_sql(sql);
2225        assert_eq!(cols.len(), 3);
2226        assert_eq!(cols[0].name, "first name");
2227        assert_eq!(cols[0].affinity, 'B');
2228        assert_eq!(cols[1].name, "last name");
2229        assert_eq!(cols[1].affinity, 'D');
2230        assert_eq!(cols[2].name, "role name");
2231        assert_eq!(cols[2].affinity, 'C');
2232    }
2233
2234    #[test]
2235    fn test_parse_columns_from_create_sql_ignores_constraint_keywords_inside_default_literals() {
2236        let sql = "CREATE TABLE t (note TEXT DEFAULT 'NOT NULL UNIQUE PRIMARY KEY')";
2237        let cols = parse_columns_from_create_sql(sql);
2238        assert_eq!(cols.len(), 1);
2239        assert!(!cols[0].notnull);
2240        assert!(!cols[0].unique);
2241        assert!(!cols[0].is_ipk);
2242        assert_eq!(
2243            cols[0].default_value.as_deref(),
2244            Some("'NOT NULL UNIQUE PRIMARY KEY'")
2245        );
2246    }
2247
2248    #[test]
2249    fn test_parse_columns_from_create_sql_preserves_type_arguments() {
2250        let sql = "CREATE TABLE metrics (amount DECIMAL(10, 2), name VARCHAR(255))";
2251        let cols = parse_columns_from_create_sql(sql);
2252        assert_eq!(cols[0].type_name.as_deref(), Some("DECIMAL(10, 2)"));
2253        assert_eq!(cols[1].type_name.as_deref(), Some("VARCHAR(255)"));
2254    }
2255
2256    #[test]
2257    fn test_parse_columns_from_beads_style_multiline_create_table_sql() {
2258        let cases = [
2259            (
2260                "labels",
2261                r"CREATE TABLE labels (
2262                    issue_id TEXT NOT NULL,
2263                    label TEXT NOT NULL,
2264                    PRIMARY KEY (issue_id, label),
2265                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
2266                )",
2267                &["issue_id", "label"][..],
2268            ),
2269            (
2270                "comments",
2271                r"CREATE TABLE comments (
2272                    id INTEGER PRIMARY KEY AUTOINCREMENT,
2273                    issue_id TEXT NOT NULL,
2274                    author TEXT NOT NULL,
2275                    text TEXT NOT NULL,
2276                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
2277                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
2278                )",
2279                &["id", "issue_id", "author", "text", "created_at"][..],
2280            ),
2281            (
2282                "events",
2283                r"CREATE TABLE events (
2284                    id INTEGER PRIMARY KEY AUTOINCREMENT,
2285                    issue_id TEXT NOT NULL,
2286                    event_type TEXT NOT NULL,
2287                    actor TEXT NOT NULL DEFAULT '',
2288                    old_value TEXT,
2289                    new_value TEXT,
2290                    comment TEXT,
2291                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
2292                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
2293                )",
2294                &[
2295                    "id",
2296                    "issue_id",
2297                    "event_type",
2298                    "actor",
2299                    "old_value",
2300                    "new_value",
2301                    "comment",
2302                    "created_at",
2303                ][..],
2304            ),
2305            (
2306                "config",
2307                r"CREATE TABLE config (
2308                    key TEXT PRIMARY KEY,
2309                    value TEXT NOT NULL
2310                )",
2311                &["key", "value"][..],
2312            ),
2313            (
2314                "blocked_issues_cache",
2315                r"CREATE TABLE blocked_issues_cache (
2316                    issue_id TEXT PRIMARY KEY,
2317                    blocked_by TEXT NOT NULL,  -- JSON array of blocking issue IDs
2318                    blocked_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
2319                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
2320                )",
2321                &["issue_id", "blocked_by", "blocked_at"][..],
2322            ),
2323            (
2324                "issues",
2325                r"CREATE TABLE issues (
2326                    id TEXT PRIMARY KEY,
2327                    content_hash TEXT,
2328                    title TEXT NOT NULL,
2329                    description TEXT NOT NULL DEFAULT '',
2330                    design TEXT NOT NULL DEFAULT '',
2331                    acceptance_criteria TEXT NOT NULL DEFAULT '',
2332                    notes TEXT NOT NULL DEFAULT '',
2333                    status TEXT NOT NULL DEFAULT 'open',
2334                    priority INTEGER NOT NULL DEFAULT 2,
2335                    issue_type TEXT NOT NULL DEFAULT 'task',
2336                    assignee TEXT,
2337                    owner TEXT DEFAULT '',
2338                    estimated_minutes INTEGER,
2339                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
2340                    created_by TEXT DEFAULT '',
2341                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
2342                    closed_at DATETIME,
2343                    close_reason TEXT DEFAULT '',
2344                    closed_by_session TEXT DEFAULT '',
2345                    due_at DATETIME,
2346                    defer_until DATETIME,
2347                    external_ref TEXT,
2348                    source_system TEXT DEFAULT '',
2349                    source_repo TEXT NOT NULL DEFAULT '.',
2350                    deleted_at DATETIME,
2351                    deleted_by TEXT DEFAULT '',
2352                    delete_reason TEXT DEFAULT '',
2353                    original_type TEXT DEFAULT '',
2354                    compaction_level INTEGER DEFAULT 0,
2355                    compacted_at DATETIME,
2356                    compacted_at_commit TEXT,
2357                    original_size INTEGER,
2358                    sender TEXT DEFAULT '',
2359                    ephemeral INTEGER DEFAULT 0,
2360                    pinned INTEGER DEFAULT 0,
2361                    is_template INTEGER DEFAULT 0,
2362                    CHECK(length(title) <= 500),
2363                    CHECK(priority >= 0 AND priority <= 4),
2364                    CHECK((status = 'closed' AND closed_at IS NOT NULL) OR (status != 'closed'))
2365                )",
2366                &[
2367                    "id",
2368                    "content_hash",
2369                    "title",
2370                    "description",
2371                    "design",
2372                    "acceptance_criteria",
2373                    "notes",
2374                    "status",
2375                    "priority",
2376                    "issue_type",
2377                    "assignee",
2378                    "owner",
2379                    "estimated_minutes",
2380                    "created_at",
2381                    "created_by",
2382                    "updated_at",
2383                    "closed_at",
2384                    "close_reason",
2385                    "closed_by_session",
2386                    "due_at",
2387                    "defer_until",
2388                    "external_ref",
2389                    "source_system",
2390                    "source_repo",
2391                    "deleted_at",
2392                    "deleted_by",
2393                    "delete_reason",
2394                    "original_type",
2395                    "compaction_level",
2396                    "compacted_at",
2397                    "compacted_at_commit",
2398                    "original_size",
2399                    "sender",
2400                    "ephemeral",
2401                    "pinned",
2402                    "is_template",
2403                ][..],
2404            ),
2405        ];
2406
2407        for (table_name, sql, expected_columns) in cases {
2408            let cols = parse_columns_from_create_sql(sql);
2409            let actual_names: Vec<&str> = cols.iter().map(|col| col.name.as_str()).collect();
2410            assert_eq!(
2411                actual_names, expected_columns,
2412                "failed to parse Beads-style column list for table {table_name}"
2413            );
2414        }
2415    }
2416
2417    #[test]
2418    fn test_build_create_table_sql_appends_strict_keyword() {
2419        let table = TableSchema {
2420            name: "strict_table".to_owned(),
2421            root_page: 2,
2422            columns: vec![ColumnInfo {
2423                name: "id".to_owned(),
2424                affinity: 'D',
2425                is_ipk: false,
2426                type_name: Some("INTEGER".to_owned()),
2427                notnull: false,
2428                unique: false,
2429                default_value: None,
2430                strict_type: Some(StrictColumnType::Integer),
2431                generated_expr: None,
2432                generated_stored: None,
2433                collation: None,
2434            }],
2435            indexes: Vec::new(),
2436            strict: true,
2437            without_rowid: false,
2438            primary_key_constraints: Vec::new(),
2439            foreign_keys: Vec::new(),
2440            check_constraints: Vec::new(),
2441        };
2442
2443        let sql = build_create_table_sql(&table);
2444        assert!(
2445            sql.ends_with(" STRICT"),
2446            "STRICT tables must round-trip with STRICT suffix: {sql}"
2447        );
2448    }
2449
2450    #[test]
2451    fn test_build_create_table_sql_preserves_declared_type_text() {
2452        let table = TableSchema {
2453            name: "typed_table".to_owned(),
2454            root_page: 2,
2455            columns: vec![
2456                ColumnInfo {
2457                    name: "amount".to_owned(),
2458                    affinity: 'C',
2459                    is_ipk: false,
2460                    type_name: Some("DECIMAL(10, 2)".to_owned()),
2461                    notnull: false,
2462                    unique: false,
2463                    default_value: None,
2464                    strict_type: None,
2465                    generated_expr: None,
2466                    generated_stored: None,
2467                    collation: None,
2468                },
2469                ColumnInfo {
2470                    name: "name".to_owned(),
2471                    affinity: 'B',
2472                    is_ipk: false,
2473                    type_name: Some("VARCHAR(255)".to_owned()),
2474                    notnull: false,
2475                    unique: false,
2476                    default_value: None,
2477                    strict_type: None,
2478                    generated_expr: None,
2479                    generated_stored: None,
2480                    collation: None,
2481                },
2482            ],
2483            indexes: Vec::new(),
2484            strict: false,
2485            without_rowid: false,
2486            primary_key_constraints: Vec::new(),
2487            foreign_keys: Vec::new(),
2488            check_constraints: Vec::new(),
2489        };
2490
2491        let sql = build_create_table_sql(&table);
2492        assert!(sql.contains("\"amount\" DECIMAL(10, 2)"), "{sql}");
2493        assert!(sql.contains("\"name\" VARCHAR(255)"), "{sql}");
2494    }
2495
2496    #[test]
2497    fn test_build_create_table_sql_preserves_typeless_columns() {
2498        let table = TableSchema {
2499            name: "typeless_table".to_owned(),
2500            root_page: 2,
2501            columns: vec![ColumnInfo {
2502                name: "payload".to_owned(),
2503                affinity: 'A',
2504                is_ipk: false,
2505                type_name: None,
2506                notnull: false,
2507                unique: false,
2508                default_value: None,
2509                strict_type: None,
2510                generated_expr: None,
2511                generated_stored: None,
2512                collation: None,
2513            }],
2514            indexes: Vec::new(),
2515            strict: false,
2516            without_rowid: false,
2517            primary_key_constraints: Vec::new(),
2518            foreign_keys: Vec::new(),
2519            check_constraints: Vec::new(),
2520        };
2521
2522        let sql = build_create_table_sql(&table);
2523        assert_eq!(sql, "CREATE TABLE \"typeless_table\" (\"payload\")");
2524    }
2525
2526    #[test]
2527    fn test_build_create_table_sql_escapes_embedded_quotes_in_identifiers() {
2528        let table = TableSchema {
2529            name: "ty\"ped_table".to_owned(),
2530            root_page: 2,
2531            columns: vec![
2532                ColumnInfo {
2533                    name: "pay\"load".to_owned(),
2534                    affinity: 'A',
2535                    is_ipk: false,
2536                    type_name: None,
2537                    notnull: false,
2538                    unique: false,
2539                    default_value: None,
2540                    strict_type: None,
2541                    generated_expr: None,
2542                    generated_stored: None,
2543                    collation: Some("noca\"se".to_owned()),
2544                },
2545                ColumnInfo {
2546                    name: "parent\"id".to_owned(),
2547                    affinity: 'D',
2548                    is_ipk: false,
2549                    type_name: Some("INTEGER".to_owned()),
2550                    notnull: false,
2551                    unique: false,
2552                    default_value: None,
2553                    strict_type: None,
2554                    generated_expr: None,
2555                    generated_stored: None,
2556                    collation: None,
2557                },
2558            ],
2559            indexes: Vec::new(),
2560            strict: false,
2561            without_rowid: false,
2562            primary_key_constraints: Vec::new(),
2563            foreign_keys: vec![FkDef {
2564                child_columns: vec![1],
2565                parent_table: "pa\"rent".to_owned(),
2566                parent_columns: vec!["id\"x".to_owned()],
2567                on_delete: FkActionType::Cascade,
2568                on_update: FkActionType::NoAction,
2569            }],
2570            check_constraints: Vec::new(),
2571        };
2572
2573        let sql = build_create_table_sql(&table);
2574        assert!(sql.contains("\"ty\"\"ped_table\""), "{sql}");
2575        assert!(
2576            sql.contains("\"pay\"\"load\" COLLATE \"noca\"\"se\""),
2577            "{sql}"
2578        );
2579        assert!(
2580            sql.contains("FOREIGN KEY(\"parent\"\"id\") REFERENCES \"pa\"\"rent\"(\"id\"\"x\")"),
2581            "{sql}"
2582        );
2583    }
2584
2585    #[test]
2586    fn test_build_create_table_sql_preserves_primary_key_constraints() {
2587        let table = TableSchema {
2588            name: "pk_table".to_owned(),
2589            root_page: 2,
2590            columns: vec![
2591                ColumnInfo {
2592                    name: "id".to_owned(),
2593                    affinity: 'B',
2594                    is_ipk: false,
2595                    type_name: Some("TEXT".to_owned()),
2596                    notnull: false,
2597                    unique: true,
2598                    default_value: None,
2599                    strict_type: None,
2600                    generated_expr: None,
2601                    generated_stored: None,
2602                    collation: None,
2603                },
2604                ColumnInfo {
2605                    name: "body".to_owned(),
2606                    affinity: 'A',
2607                    is_ipk: false,
2608                    type_name: None,
2609                    notnull: false,
2610                    unique: false,
2611                    default_value: None,
2612                    strict_type: None,
2613                    generated_expr: None,
2614                    generated_stored: None,
2615                    collation: None,
2616                },
2617            ],
2618            indexes: Vec::new(),
2619            strict: false,
2620            without_rowid: false,
2621            primary_key_constraints: vec![vec!["id".to_owned()]],
2622            foreign_keys: Vec::new(),
2623            check_constraints: Vec::new(),
2624        };
2625
2626        let sql = build_create_table_sql(&table);
2627        assert!(sql.contains("PRIMARY KEY"), "{sql}");
2628        assert!(!sql.contains("UNIQUE"), "{sql}");
2629        assert_eq!(
2630            sql,
2631            "CREATE TABLE \"pk_table\" (\"id\" TEXT, \"body\", PRIMARY KEY (\"id\"))"
2632        );
2633    }
2634
2635    #[test]
2636    fn test_build_create_table_sql_appends_without_rowid_and_strict_options() {
2637        let table = TableSchema {
2638            name: "wr_strict".to_owned(),
2639            root_page: 2,
2640            columns: vec![ColumnInfo {
2641                name: "id".to_owned(),
2642                affinity: 'D',
2643                is_ipk: false,
2644                type_name: Some("INTEGER".to_owned()),
2645                notnull: false,
2646                unique: true,
2647                default_value: None,
2648                strict_type: Some(StrictColumnType::Integer),
2649                generated_expr: None,
2650                generated_stored: None,
2651                collation: None,
2652            }],
2653            indexes: Vec::new(),
2654            strict: true,
2655            without_rowid: true,
2656            primary_key_constraints: Vec::new(),
2657            foreign_keys: Vec::new(),
2658            check_constraints: Vec::new(),
2659        };
2660
2661        let sql = build_create_table_sql(&table);
2662        assert!(sql.ends_with(" WITHOUT ROWID, STRICT"), "{sql}");
2663    }
2664
2665    #[test]
2666    fn test_build_create_table_sql_preserves_unique_foreign_key_and_check_constraints() {
2667        let table = TableSchema {
2668            name: "child".to_owned(),
2669            root_page: 2,
2670            columns: vec![
2671                ColumnInfo {
2672                    name: "parent_id".to_owned(),
2673                    affinity: 'D',
2674                    is_ipk: false,
2675                    type_name: Some("INTEGER".to_owned()),
2676                    notnull: true,
2677                    unique: false,
2678                    default_value: None,
2679                    strict_type: None,
2680                    generated_expr: None,
2681                    generated_stored: None,
2682                    collation: None,
2683                },
2684                ColumnInfo {
2685                    name: "slug".to_owned(),
2686                    affinity: 'B',
2687                    is_ipk: false,
2688                    type_name: Some("TEXT".to_owned()),
2689                    notnull: false,
2690                    unique: false,
2691                    default_value: None,
2692                    strict_type: None,
2693                    generated_expr: None,
2694                    generated_stored: None,
2695                    collation: None,
2696                },
2697            ],
2698            indexes: vec![IndexSchema {
2699                name: "sqlite_autoindex_child_1".to_owned(),
2700                root_page: 0,
2701                columns: vec!["parent_id".to_owned(), "slug".to_owned()],
2702                key_expressions: Vec::new(),
2703                key_sort_directions: vec![SortDirection::Asc, SortDirection::Asc],
2704                where_clause: None,
2705                is_unique: true,
2706            }],
2707            strict: false,
2708            without_rowid: false,
2709            primary_key_constraints: Vec::new(),
2710            foreign_keys: vec![FkDef {
2711                child_columns: vec![0],
2712                parent_table: "parent".to_owned(),
2713                parent_columns: vec!["id".to_owned()],
2714                on_delete: FkActionType::Cascade,
2715                on_update: FkActionType::Restrict,
2716            }],
2717            check_constraints: vec!["length(slug) > 0".to_owned()],
2718        };
2719
2720        let sql = build_create_table_sql(&table);
2721        assert!(sql.contains("UNIQUE (\"parent_id\", \"slug\")"), "{sql}");
2722        assert!(
2723            sql.contains(
2724                "FOREIGN KEY(\"parent_id\") REFERENCES \"parent\"(\"id\") ON DELETE CASCADE ON UPDATE RESTRICT"
2725            ),
2726            "{sql}"
2727        );
2728        assert!(sql.contains("CHECK(length(slug) > 0)"), "{sql}");
2729    }
2730
2731    #[test]
2732    fn test_extract_unique_constraint_indexes_from_sql_preserves_table_level_unique_constraints() {
2733        let indexes = extract_unique_constraint_indexes_from_sql(
2734            "CREATE TABLE child (tenant TEXT, slug TEXT, UNIQUE(tenant, slug))",
2735            "child",
2736        );
2737        assert_eq!(indexes.len(), 1);
2738        assert_eq!(indexes[0].columns, vec!["tenant", "slug"]);
2739        assert!(indexes[0].is_unique);
2740    }
2741
2742    #[test]
2743    fn test_extract_unique_constraint_indexes_skips_table_level_integer_primary_key_alias() {
2744        let indexes = extract_unique_constraint_indexes_from_sql(
2745            "CREATE TABLE metrics (id INTEGER, body TEXT, PRIMARY KEY(id COLLATE NOCASE DESC))",
2746            "metrics",
2747        );
2748        assert!(indexes.is_empty(), "{indexes:?}");
2749    }
2750
2751    #[test]
2752    fn test_is_strict_table_sql_detects_strict_options() {
2753        assert!(is_strict_table_sql(
2754            "CREATE TABLE s (id INTEGER, body TEXT) STRICT"
2755        ));
2756        assert!(is_strict_table_sql(
2757            "CREATE TABLE s (id INTEGER) WITHOUT ROWID, STRICT;"
2758        ));
2759        assert!(!is_strict_table_sql(
2760            "CREATE TABLE s (id INTEGER, body TEXT) WITHOUT ROWID"
2761        ));
2762    }
2763
2764    #[test]
2765    fn test_is_without_rowid_table_sql_detects_option() {
2766        assert!(is_without_rowid_table_sql(
2767            "CREATE TABLE s (id INTEGER PRIMARY KEY, body TEXT) WITHOUT ROWID"
2768        ));
2769        assert!(is_without_rowid_table_sql(
2770            "CREATE TABLE s (id INTEGER PRIMARY KEY, body TEXT) WITHOUT ROWID, STRICT;"
2771        ));
2772        assert!(!is_without_rowid_table_sql(
2773            "CREATE TABLE s (id INTEGER PRIMARY KEY, body TEXT) STRICT"
2774        ));
2775    }
2776
2777    #[test]
2778    fn test_is_autoincrement_table_sql_detects_keyword() {
2779        assert!(is_autoincrement_table_sql(
2780            "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT, v TEXT)"
2781        ));
2782        assert!(!is_autoincrement_table_sql(
2783            "CREATE TABLE t(id INTEGER PRIMARY KEY, v TEXT)"
2784        ));
2785    }
2786
2787    #[test]
2788    fn test_is_autoincrement_table_sql_ignores_default_literal_keyword() {
2789        assert!(!is_autoincrement_table_sql(
2790            "CREATE TABLE t(id INTEGER PRIMARY KEY, note TEXT DEFAULT 'AUTOINCREMENT')"
2791        ));
2792    }
2793
2794    #[test]
2795    fn test_parse_columns_from_create_sql_populates_strict_types() {
2796        let sql = "CREATE TABLE strict_cols (id INTEGER, score REAL, body TEXT, payload BLOB, any_col ANY) STRICT";
2797        let cols = parse_columns_from_create_sql(sql);
2798        assert_eq!(cols.len(), 5);
2799        assert_eq!(cols[0].strict_type, Some(StrictColumnType::Integer));
2800        assert_eq!(cols[1].strict_type, Some(StrictColumnType::Real));
2801        assert_eq!(cols[2].strict_type, Some(StrictColumnType::Text));
2802        assert_eq!(cols[3].strict_type, Some(StrictColumnType::Blob));
2803        assert_eq!(cols[4].strict_type, Some(StrictColumnType::Any));
2804    }
2805
2806    #[test]
2807    fn test_parse_columns_from_sqlite_master_sql_ignores_virtual_table_options() {
2808        let sql =
2809            "CREATE VIRTUAL TABLE docs USING fts5(subject, body, tokenize='porter', prefix='2 3')";
2810        let cols = parse_columns_from_sqlite_master_sql(sql);
2811        let names: Vec<&str> = cols.iter().map(|column| column.name.as_str()).collect();
2812        assert_eq!(names, vec!["subject", "body"]);
2813    }
2814
2815    #[test]
2816    fn test_extract_check_constraints_from_sql_ignores_literal_check_text() {
2817        let sql = "CREATE TABLE t (note TEXT DEFAULT 'CHECK(fake)', CHECK(length(note) > 0))";
2818        let checks = extract_check_constraints_from_sql(sql);
2819        assert_eq!(checks, vec!["length(note) > 0".to_owned()]);
2820    }
2821
2822    #[test]
2823    fn test_type_to_affinity_mapping() {
2824        assert_eq!(type_to_affinity("INTEGER"), 'D');
2825        assert_eq!(type_to_affinity("INT"), 'D');
2826        assert_eq!(type_to_affinity("REAL"), 'E');
2827        assert_eq!(type_to_affinity("FLOAT"), 'E');
2828        assert_eq!(type_to_affinity("TEXT"), 'B');
2829        assert_eq!(type_to_affinity("VARCHAR"), 'B');
2830        assert_eq!(type_to_affinity("BLOB"), 'A');
2831        assert_eq!(type_to_affinity("NUMERIC"), 'C');
2832    }
2833
2834    #[test]
2835    fn test_build_create_index_sql_preserves_unique_collation_and_direction() {
2836        let terms = [
2837            CreateIndexSqlTerm {
2838                column_name: "project_id",
2839                collation: None,
2840                direction: Some(SortDirection::Asc),
2841            },
2842            CreateIndexSqlTerm {
2843                column_name: "name",
2844                collation: Some("NOCASE"),
2845                direction: Some(SortDirection::Desc),
2846            },
2847        ];
2848
2849        let sql = build_create_index_sql(
2850            "idx_agents_project_name_nocase",
2851            "agents",
2852            true,
2853            &terms,
2854            None,
2855        );
2856
2857        assert_eq!(
2858            sql,
2859            "CREATE UNIQUE INDEX \"idx_agents_project_name_nocase\" ON \"agents\" (\"project_id\" ASC, \"name\" COLLATE \"NOCASE\" DESC)"
2860        );
2861    }
2862
2863    #[test]
2864    fn test_build_create_index_sql_escapes_embedded_quotes_in_identifiers() {
2865        let terms = [CreateIndexSqlTerm {
2866            column_name: "na\"me",
2867            collation: Some("NO\"CASE"),
2868            direction: Some(SortDirection::Desc),
2869        }];
2870
2871        let sql = build_create_index_sql("idx\"q", "ta\"ble", true, &terms, None);
2872
2873        assert_eq!(
2874            sql,
2875            "CREATE UNIQUE INDEX \"idx\"\"q\" ON \"ta\"\"ble\" (\"na\"\"me\" COLLATE \"NO\"\"CASE\" DESC)"
2876        );
2877    }
2878
2879    #[test]
2880    fn test_overwrite_existing_file() {
2881        let dir = tempfile::tempdir().unwrap();
2882        let db_path = dir.path().join("overwrite.db");
2883
2884        // Write once.
2885        let (schema, db) = make_test_schema_and_db();
2886        persist_test_db(&db_path, &schema, &db, 0, 0).unwrap();
2887
2888        // Overwrite with empty.
2889        persist_test_db(&db_path, &[], &MemDatabase::new(), 0, 0).unwrap();
2890
2891        let loaded = load_test_db(&db_path).unwrap();
2892        assert!(loaded.schema.is_empty());
2893    }
2894
2895    #[test]
2896    fn test_load_from_sqlite_keeps_materialized_virtual_tables_with_real_root_page() {
2897        let dir = tempfile::tempdir().unwrap();
2898        let db_path = dir.path().join("materialized_vtab_load.db");
2899        let db_str = db_path.to_string_lossy().to_string();
2900
2901        {
2902            let conn = crate::connection::Connection::open(&db_str).unwrap();
2903            conn.execute("CREATE VIRTUAL TABLE docs USING fts5(subject, body, tokenize='porter')")
2904                .unwrap();
2905            conn.execute(
2906                "INSERT INTO docs(rowid, subject, body) VALUES (1, 'Hello', 'Rust world')",
2907            )
2908            .unwrap();
2909            conn.execute("INSERT INTO docs(rowid, subject, body) VALUES (2, 'Other', 'Nothing')")
2910                .unwrap();
2911            conn.close().unwrap();
2912        }
2913
2914        let loaded = load_test_db(&db_path).unwrap();
2915        let table = loaded
2916            .schema
2917            .iter()
2918            .find(|table| table.name.eq_ignore_ascii_case("docs"))
2919            .expect("materialized virtual table should survive direct load");
2920        let column_names: Vec<&str> = table
2921            .columns
2922            .iter()
2923            .map(|column| column.name.as_str())
2924            .collect();
2925        assert_eq!(column_names, vec!["subject", "body"]);
2926        let mem_table = loaded
2927            .db
2928            .get_table(table.root_page)
2929            .expect("loaded table should exist in MemDatabase");
2930        let rows: Vec<_> = mem_table.iter_rows().collect();
2931        assert_eq!(rows.len(), 2);
2932        assert_eq!(rows[0].0, 1);
2933        assert_eq!(rows[0].1[0], SqliteValue::Text("Hello".into()));
2934        assert_eq!(rows[0].1[1], SqliteValue::Text("Rust world".into()));
2935        assert_eq!(rows[1].0, 2);
2936        assert_eq!(rows[1].1[0], SqliteValue::Text("Other".into()));
2937        assert_eq!(rows[1].1[1], SqliteValue::Text("Nothing".into()));
2938    }
2939
2940    #[test]
2941    fn test_load_from_sqlite_rejects_non_virtual_table_with_rootpage_zero() {
2942        let dir = tempfile::tempdir().unwrap();
2943        let db_path = dir.path().join("compat_corrupt_rootpage_zero.db");
2944
2945        {
2946            let conn = rusqlite::Connection::open(&db_path).unwrap();
2947            conn.execute_batch(
2948                r"
2949                CREATE TABLE docs (id INTEGER PRIMARY KEY, title TEXT);
2950                INSERT INTO docs VALUES (1, 'hello');
2951                PRAGMA writable_schema = ON;
2952                UPDATE sqlite_master SET rootpage = 0 WHERE name = 'docs';
2953                PRAGMA writable_schema = OFF;
2954                ",
2955            )
2956            .unwrap();
2957        }
2958
2959        let err = match load_test_db(&db_path) {
2960            Ok(_) => panic!("corrupt rootpage should fail load"),
2961            Err(err) => err,
2962        };
2963        let message = err.to_string();
2964        assert!(
2965            message.contains("rootpage 0") || message.contains("root page"),
2966            "unexpected load error: {message}"
2967        );
2968    }
2969
2970    #[test]
2971    fn test_load_from_sqlite_rejects_negative_rootpage() {
2972        let dir = tempfile::tempdir().unwrap();
2973        let db_path = dir.path().join("compat_corrupt_rootpage_negative.db");
2974
2975        {
2976            let conn = rusqlite::Connection::open(&db_path).unwrap();
2977            conn.execute_batch(
2978                r"
2979                CREATE TABLE docs (id INTEGER PRIMARY KEY, title TEXT);
2980                INSERT INTO docs VALUES (1, 'hello');
2981                PRAGMA writable_schema = ON;
2982                UPDATE sqlite_master SET rootpage = -7 WHERE name = 'docs';
2983                PRAGMA writable_schema = OFF;
2984                ",
2985            )
2986            .unwrap();
2987        }
2988
2989        let err = match load_test_db(&db_path) {
2990            Ok(_) => panic!("negative rootpage should fail load"),
2991            Err(err) => err,
2992        };
2993        let message = err.to_string();
2994        assert!(
2995            message.contains("rootpage -7") || message.contains("invalid rootpage"),
2996            "unexpected load error: {message}"
2997        );
2998    }
2999
3000    #[test]
3001    fn test_load_from_sqlite_rejects_rootpage_above_supported_range() {
3002        let dir = tempfile::tempdir().unwrap();
3003        let db_path = dir.path().join("compat_corrupt_rootpage_large.db");
3004
3005        {
3006            let conn = rusqlite::Connection::open(&db_path).unwrap();
3007            conn.execute_batch(
3008                r"
3009                CREATE TABLE docs (id INTEGER PRIMARY KEY, title TEXT);
3010                INSERT INTO docs VALUES (1, 'hello');
3011                PRAGMA writable_schema = ON;
3012                UPDATE sqlite_master SET rootpage = 2147483648 WHERE name = 'docs';
3013                PRAGMA writable_schema = OFF;
3014                ",
3015            )
3016            .unwrap();
3017        }
3018
3019        let err = match load_test_db(&db_path) {
3020            Ok(_) => panic!("oversized rootpage should fail load"),
3021            Err(err) => err,
3022        };
3023        let message = err.to_string();
3024        assert!(
3025            message.contains("supported range")
3026                || message.contains("out-of-range")
3027                || message.contains("2147483648"),
3028            "unexpected load error: {message}"
3029        );
3030    }
3031
3032    #[test]
3033    fn test_load_from_sqlite_rejects_invalid_utf8_in_sqlite_master_record() {
3034        let dir = tempfile::tempdir().unwrap();
3035        let db_path = dir.path().join("compat_corrupt_master_utf8.db");
3036
3037        {
3038            let conn = rusqlite::Connection::open(&db_path).unwrap();
3039            conn.execute_batch(
3040                r"
3041                CREATE TABLE docs (id INTEGER PRIMARY KEY, title TEXT);
3042                INSERT INTO docs VALUES (1, 'hello');
3043                PRAGMA writable_schema = ON;
3044                UPDATE sqlite_master
3045                SET sql = CAST(x'FF' AS TEXT)
3046                WHERE name = 'docs';
3047                PRAGMA writable_schema = OFF;
3048                ",
3049            )
3050            .unwrap();
3051        }
3052
3053        let err = load_test_db(&db_path).expect_err("invalid sqlite_master text should fail");
3054        let message = err.to_string();
3055        assert!(
3056            message.contains("sqlite_master row")
3057                || message.contains("valid SQLite record")
3058                || message.contains("payload"),
3059            "unexpected load error: {message}"
3060        );
3061    }
3062
3063    #[test]
3064    fn test_load_from_sqlite_rejects_invalid_utf8_in_table_record() {
3065        let dir = tempfile::tempdir().unwrap();
3066        let db_path = dir.path().join("compat_corrupt_table_utf8.db");
3067
3068        {
3069            let conn = rusqlite::Connection::open(&db_path).unwrap();
3070            conn.execute_batch(
3071                r"
3072                CREATE TABLE docs (title TEXT);
3073                INSERT INTO docs VALUES (CAST(x'FF' AS TEXT));
3074                ",
3075            )
3076            .unwrap();
3077        }
3078
3079        let err = load_test_db(&db_path).expect_err("invalid table text should fail");
3080        let message = err.to_string();
3081        assert!(
3082            message.contains("table `docs`")
3083                || message.contains("valid SQLite record")
3084                || message.contains("payload"),
3085            "unexpected load error: {message}"
3086        );
3087    }
3088}