Skip to main content

fsqlite_vdbe/
lib.rs

1// bd-gird: §10.7-10.8 VDBE Instruction Format + Coroutines
2//
3// This crate provides the VDBE (Virtual Database Engine) program builder,
4// label resolution, register allocation, coroutine mechanism, and disassembly.
5// The foundational types (Opcode, VdbeOp, P4) live in fsqlite-types.
6
7use hashbrown::HashMap;
8
9use fsqlite_error::{FrankenError, Result};
10use fsqlite_types::opcode::{Opcode, P4, VdbeOp};
11use std::sync::Arc;
12
13pub mod codegen;
14pub mod engine;
15pub mod frame;
16#[cfg(test)]
17mod repro_delete_skip;
18pub mod vectorized;
19pub mod vectorized_agg;
20#[cfg(not(target_arch = "wasm32"))]
21pub mod vectorized_dispatch;
22pub mod vectorized_hash_join;
23pub mod vectorized_join;
24pub mod vectorized_ops;
25pub mod vectorized_scan;
26pub mod vectorized_sort;
27
28#[cfg(test)]
29mod vectorized_prop_tests;
30
31/// Register spans touched by an opcode.
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33pub(crate) struct OpcodeRegisterSpans {
34    pub(crate) read_start: i32,
35    pub(crate) read_len: i32,
36    pub(crate) write_start: i32,
37    pub(crate) write_len: i32,
38}
39
40impl OpcodeRegisterSpans {
41    pub(crate) const NONE: Self = Self {
42        read_start: -1,
43        read_len: 0,
44        write_start: -1,
45        write_len: 0,
46    };
47
48    pub(crate) fn max_touched_register(self) -> i32 {
49        let read_end = if self.read_start > 0 {
50            self.read_start + self.read_len - 1
51        } else {
52            0
53        };
54        let write_end = if self.write_start > 0 {
55            self.write_start + self.write_len - 1
56        } else {
57            0
58        };
59        read_end.max(write_end)
60    }
61}
62
63fn register_range(start: i32, len: i32) -> (i32, i32) {
64    if start <= 0 {
65        (-1, 0)
66    } else {
67        (start, len.max(1))
68    }
69}
70
71pub(crate) fn opcode_register_spans(op: &VdbeOp) -> OpcodeRegisterSpans {
72    let (read_start, read_len, write_start, write_len) = match op.opcode {
73        Opcode::Integer
74        | Opcode::Int64
75        | Opcode::Real
76        | Opcode::String
77        | Opcode::String8
78        | Opcode::Blob
79        | Opcode::Variable => {
80            let (write_start, write_len) = register_range(op.p2, 1);
81            (-1, 0, write_start, write_len)
82        }
83        Opcode::Null => {
84            let write_count = if op.p3 > 0 { op.p3 - op.p2 + 1 } else { 1 };
85            let (write_start, write_len) = register_range(op.p2, write_count);
86            (-1, 0, write_start, write_len)
87        }
88        Opcode::SoftNull
89        | Opcode::Cast
90        | Opcode::RealAffinity
91        | Opcode::AddImm
92        | Opcode::MustBeInt
93        | Opcode::InitCoroutine
94        | Opcode::Yield
95        | Opcode::EndCoroutine => {
96            let (start, len) = register_range(op.p1, 1);
97            (start, len, start, len)
98        }
99        Opcode::Move => {
100            let (read_start, read_len) = register_range(op.p1, op.p3);
101            let (write_start, write_len) = register_range(op.p2, op.p3);
102            (read_start, read_len, write_start, write_len)
103        }
104        Opcode::Copy | Opcode::SCopy | Opcode::IntCopy | Opcode::BitNot | Opcode::Not => {
105            let (read_start, read_len) = register_range(op.p1, 1);
106            let (write_start, write_len) = register_range(op.p2, 1);
107            (read_start, read_len, write_start, write_len)
108        }
109        Opcode::ResultRow => {
110            let (read_start, read_len) = register_range(op.p1, op.p2);
111            (read_start, read_len, -1, 0)
112        }
113        Opcode::Add
114        | Opcode::Subtract
115        | Opcode::Multiply
116        | Opcode::Divide
117        | Opcode::Remainder
118        | Opcode::Concat
119        | Opcode::BitAnd
120        | Opcode::BitOr
121        | Opcode::ShiftLeft
122        | Opcode::ShiftRight
123        | Opcode::And
124        | Opcode::Or => {
125            let (read_start, read_len) = register_range(op.p1, 2);
126            let (write_start, write_len) = register_range(op.p3, 1);
127            (read_start, read_len, write_start, write_len)
128        }
129        Opcode::Eq | Opcode::Ne | Opcode::Lt | Opcode::Le | Opcode::Gt | Opcode::Ge => {
130            let (read_start, read_len) = register_range(op.p1, 1);
131            let (rhs_start, rhs_len) = register_range(op.p3, 1);
132            let normalized_start = if read_start > 0 && rhs_start > 0 {
133                read_start.min(rhs_start)
134            } else if read_start > 0 {
135                read_start
136            } else {
137                rhs_start
138            };
139            let normalized_len = if read_start > 0 && rhs_start > 0 && read_start != rhs_start {
140                2
141            } else {
142                read_len.max(rhs_len)
143            };
144            (normalized_start, normalized_len, -1, 0)
145        }
146        Opcode::If | Opcode::IfNot | Opcode::IsNull | Opcode::NotNull | Opcode::IsTrue => {
147            let (read_start, read_len) = register_range(op.p1, 1);
148            (read_start, read_len, -1, 0)
149        }
150        Opcode::MakeRecord => {
151            let (read_start, read_len) = register_range(op.p1, op.p2);
152            let (write_start, write_len) = register_range(op.p3, 1);
153            (read_start, read_len, write_start, write_len)
154        }
155        _ => (
156            OpcodeRegisterSpans::NONE.read_start,
157            OpcodeRegisterSpans::NONE.read_len,
158            OpcodeRegisterSpans::NONE.write_start,
159            OpcodeRegisterSpans::NONE.write_len,
160        ),
161    };
162
163    OpcodeRegisterSpans {
164        read_start,
165        read_len,
166        write_start,
167        write_len,
168    }
169}
170
171// ── Label System ────────────────────────────────────────────────────────────
172
173/// An opaque handle representing a forward-reference label.
174///
175/// Labels allow codegen to emit jump instructions before the target address
176/// is known. All labels MUST be resolved before execution begins; unresolved
177/// labels are a codegen bug.
178#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
179pub struct Label(usize);
180
181/// Internal tracking for label resolution.
182#[derive(Debug)]
183enum LabelState {
184    /// Not yet resolved. Contains the indices of instructions whose `p2`
185    /// field should be patched when the label is resolved.
186    Unresolved(Vec<usize>),
187    /// Resolved to a concrete instruction address.
188    Resolved(i32),
189}
190
191// ── Sort Order ──────────────────────────────────────────────────────────────
192
193/// Sort direction for key comparison.
194#[derive(Debug, Clone, Copy, PartialEq, Eq)]
195pub enum SortOrder {
196    /// Ascending order (default).
197    Asc,
198    /// Descending order.
199    Desc,
200}
201
202// ── KeyInfo ─────────────────────────────────────────────────────────────────
203
204/// Describes the key structure for multi-column index comparisons.
205///
206/// Used by Compare, IdxInsert, IdxDelete, and seek operations. Each field
207/// has an associated collation sequence and sort order.
208#[derive(Debug, Clone, PartialEq, Eq)]
209pub struct KeyInfo {
210    /// Number of key fields.
211    pub num_fields: u16,
212    /// Collation sequence name per field (one entry per `num_fields`).
213    pub collations: Vec<String>,
214    /// Sort direction per field.
215    pub sort_orders: Vec<SortOrder>,
216}
217
218// ── Coroutine State ─────────────────────────────────────────────────────────
219
220/// Tracks the execution state of a coroutine.
221///
222/// Coroutines in VDBE are cooperative PC-swap state machines (NOT async).
223/// `InitCoroutine` initializes the state, `Yield` swaps PCs bidirectionally,
224/// and `EndCoroutine` marks exhaustion and returns to the caller.
225#[derive(Debug, Clone, PartialEq, Eq)]
226pub struct CoroutineState {
227    /// The register that stores the yield/resume PC.
228    pub yield_reg: i32,
229    /// The saved program counter (where to resume).
230    pub saved_pc: i32,
231    /// Whether the coroutine has been exhausted (EndCoroutine reached).
232    pub exhausted: bool,
233}
234
235impl CoroutineState {
236    /// Create a new coroutine state with the given yield register and
237    /// initial body address.
238    pub fn new(yield_reg: i32, body_pc: i32) -> Self {
239        Self {
240            yield_reg,
241            saved_pc: body_pc,
242            exhausted: false,
243        }
244    }
245
246    /// Perform a bidirectional PC swap (Yield semantics).
247    ///
248    /// The current PC is saved into this state, and the previously saved PC
249    /// is returned as the new PC to jump to.
250    pub fn yield_swap(&mut self, current_pc: i32) -> i32 {
251        let resume_at = self.saved_pc;
252        self.saved_pc = current_pc;
253        resume_at
254    }
255
256    /// Mark the coroutine as exhausted (EndCoroutine semantics).
257    ///
258    /// Returns the saved PC to return to the caller.
259    pub fn end(&mut self) -> i32 {
260        self.exhausted = true;
261        self.saved_pc
262    }
263}
264
265// ── Register Allocator ──────────────────────────────────────────────────────
266
267/// Sequential register allocator for the VDBE register file.
268///
269/// Registers are numbered starting at 1 (register 0 is reserved/unused,
270/// matching C SQLite convention). The allocator supports both persistent
271/// registers (held for statement lifetime) and temporary registers that
272/// can be returned to a reuse pool.
273#[derive(Debug)]
274pub struct RegisterAllocator {
275    /// The next register number to allocate (starts at 1).
276    next_reg: i32,
277    /// Pool of returned temporary registers available for reuse.
278    temp_pool: Vec<i32>,
279}
280
281impl RegisterAllocator {
282    /// Create a new allocator. First allocation returns register 1.
283    pub fn new() -> Self {
284        Self {
285            next_reg: 1,
286            temp_pool: Vec::new(),
287        }
288    }
289
290    /// Allocate a single persistent register.
291    pub fn alloc_reg(&mut self) -> i32 {
292        let reg = self.next_reg;
293        self.next_reg += 1;
294        reg
295    }
296
297    /// Allocate a contiguous block of `n` persistent registers.
298    ///
299    /// Returns the first register number. The block spans `[result, result+n)`.
300    pub fn alloc_regs(&mut self, n: i32) -> i32 {
301        let first = self.next_reg;
302        self.next_reg += n;
303        first
304    }
305
306    /// Allocate a temporary register (reuses from pool if available).
307    pub fn alloc_temp(&mut self) -> i32 {
308        self.temp_pool.pop().unwrap_or_else(|| {
309            let reg = self.next_reg;
310            self.next_reg += 1;
311            reg
312        })
313    }
314
315    /// Return a temporary register to the reuse pool.
316    pub fn free_temp(&mut self, reg: i32) {
317        self.temp_pool.push(reg);
318    }
319
320    /// The total number of registers allocated (high water mark).
321    pub fn count(&self) -> i32 {
322        self.next_reg - 1
323    }
324}
325
326impl Default for RegisterAllocator {
327    fn default() -> Self {
328        Self::new()
329    }
330}
331
332// ── VDBE Program Builder ────────────────────────────────────────────────────
333
334/// A VDBE bytecode program under construction.
335///
336/// Provides methods to emit instructions, create/resolve labels for forward
337/// jumps, and allocate registers. Once construction is complete, call
338/// [`finish`](Self::finish) to validate and extract the final instruction
339/// sequence.
340#[derive(Debug)]
341pub struct ProgramBuilder {
342    /// The instruction sequence.
343    ops: smallvec::SmallVec<[VdbeOp; 64]>,
344    /// Label states (indexed by `Label.0`).
345    labels: Vec<LabelState>,
346    /// Register allocator.
347    regs: RegisterAllocator,
348    /// Counter for anonymous placeholder numbering (1-based).
349    next_anon_placeholder: u32,
350    /// Table-to-index cursor metadata for REPLACE conflict resolution.
351    table_index_meta: HashMap<i32, Vec<fsqlite_types::opcode::IndexCursorMeta>>,
352}
353
354impl ProgramBuilder {
355    /// Create a new empty program builder.
356    pub fn new() -> Self {
357        Self {
358            ops: smallvec::SmallVec::new(),
359            labels: Vec::new(),
360            regs: RegisterAllocator::new(),
361            next_anon_placeholder: 1,
362            table_index_meta: HashMap::new(),
363        }
364    }
365
366    /// Get the next anonymous placeholder index (1-based) and increment the counter.
367    pub fn next_anon_placeholder_idx(&mut self) -> u32 {
368        let idx = self.next_anon_placeholder;
369        self.next_anon_placeholder += 1;
370        idx
371    }
372
373    /// Set the anonymous placeholder counter to a specific value.
374    /// Used when codegen emission order differs from SQL textual order.
375    pub fn set_next_anon_placeholder(&mut self, val: u32) {
376        self.next_anon_placeholder = val;
377    }
378
379    /// Get the current anonymous placeholder counter without incrementing.
380    pub fn current_anon_placeholder(&self) -> u32 {
381        self.next_anon_placeholder
382    }
383
384    // ── Instruction emission ────────────────────────────────────────────
385
386    /// Emit a single instruction and return its address (index in `ops`).
387    pub fn emit(&mut self, op: VdbeOp) -> usize {
388        let addr = self.ops.len();
389        self.ops.push(op);
390        addr
391    }
392
393    /// Emit a simple instruction from parts.
394    pub fn emit_op(&mut self, opcode: Opcode, p1: i32, p2: i32, p3: i32, p4: P4, p5: u16) -> usize {
395        self.emit(VdbeOp {
396            opcode,
397            p1,
398            p2,
399            p3,
400            p4,
401            p5,
402        })
403    }
404
405    /// The current address (index of the next instruction to be emitted).
406    pub fn current_addr(&self) -> usize {
407        self.ops.len()
408    }
409
410    /// Get a reference to the instruction at `addr`.
411    pub fn op_at(&self, addr: usize) -> Option<&VdbeOp> {
412        self.ops.get(addr)
413    }
414
415    /// Get a mutable reference to the instruction at `addr`.
416    pub fn op_at_mut(&mut self, addr: usize) -> Option<&mut VdbeOp> {
417        self.ops.get_mut(addr)
418    }
419
420    // ── Label system ────────────────────────────────────────────────────
421
422    /// Create a new label for forward-reference jumps.
423    pub fn emit_label(&mut self) -> Label {
424        let id = self.labels.len();
425        self.labels.push(LabelState::Unresolved(Vec::new()));
426        Label(id)
427    }
428
429    /// Emit a jump instruction whose p2 target is a label (forward reference).
430    ///
431    /// The label's address will be patched into p2 when `resolve_label` is called.
432    pub fn emit_jump_to_label(
433        &mut self,
434        opcode: Opcode,
435        p1: i32,
436        p3: i32,
437        label: Label,
438        p4: P4,
439        p5: u16,
440    ) -> usize {
441        let addr = self.emit(VdbeOp {
442            opcode,
443            p1,
444            p2: -1, // placeholder; will be patched
445            p3,
446            p4,
447            p5,
448        });
449
450        let idx = label.0;
451        match &mut self.labels[idx] {
452            LabelState::Unresolved(refs) => refs.push(addr),
453            LabelState::Resolved(target) => {
454                // Label already resolved; patch immediately.
455                self.ops[addr].p2 = *target;
456            }
457        }
458
459        addr
460    }
461
462    /// Resolve a label to the current instruction address.
463    ///
464    /// All instructions that reference this label have their `p2` patched.
465    pub fn resolve_label(&mut self, label: Label) {
466        let Ok(target) = i32::try_from(self.ops.len()) else {
467            // Keep label unresolved so finish() returns a deterministic internal
468            // error instead of panicking on oversized programs.
469            return;
470        };
471        let idx = label.0;
472
473        let refs = match std::mem::replace(&mut self.labels[idx], LabelState::Resolved(target)) {
474            LabelState::Unresolved(refs) => refs,
475            LabelState::Resolved(_) => {
476                // Double resolve is a codegen bug, but we tolerate it
477                // if the target is the same.
478                return;
479            }
480        };
481
482        for op_idx in refs {
483            self.ops[op_idx].p2 = target;
484        }
485    }
486
487    /// Resolve a label to a specific address (not necessarily current).
488    pub fn resolve_label_to(&mut self, label: Label, address: i32) {
489        let idx = label.0;
490
491        let refs = match std::mem::replace(&mut self.labels[idx], LabelState::Resolved(address)) {
492            LabelState::Unresolved(refs) => refs,
493            LabelState::Resolved(_) => return,
494        };
495
496        for op_idx in refs {
497            self.ops[op_idx].p2 = address;
498        }
499    }
500
501    // ── Register allocation (delegates to RegisterAllocator) ────────────
502
503    /// Allocate a single persistent register.
504    pub fn alloc_reg(&mut self) -> i32 {
505        self.regs.alloc_reg()
506    }
507
508    /// Allocate a contiguous block of `n` persistent registers.
509    pub fn alloc_regs(&mut self, n: i32) -> i32 {
510        self.regs.alloc_regs(n)
511    }
512
513    /// Allocate a temporary register (reusable).
514    pub fn alloc_temp(&mut self) -> i32 {
515        self.regs.alloc_temp()
516    }
517
518    /// Return a temporary register to the pool.
519    pub fn free_temp(&mut self, reg: i32) {
520        self.regs.free_temp(reg);
521    }
522
523    /// Total registers allocated (high water mark).
524    pub fn register_count(&self) -> i32 {
525        self.regs.count()
526    }
527
528    // ── Table-index metadata ─────────────────────────────────────────────
529
530    /// Register the index cursors associated with a table cursor.
531    ///
532    /// Used by the engine during REPLACE conflict resolution to delete
533    /// orphaned secondary index entries before replacing the table row.
534    pub fn register_table_indexes(
535        &mut self,
536        table_cursor: i32,
537        indexes: Vec<fsqlite_types::opcode::IndexCursorMeta>,
538    ) {
539        if !indexes.is_empty() {
540            self.table_index_meta
541                .entry(table_cursor)
542                .or_default()
543                .extend(indexes);
544        }
545    }
546
547    // ── Finalization ────────────────────────────────────────────────────
548
549    /// Validate all labels are resolved and return the finished program.
550    pub fn finish(self) -> Result<VdbeProgram> {
551        // Check for unresolved labels.
552        for (i, state) in self.labels.iter().enumerate() {
553            if let LabelState::Unresolved(refs) = state {
554                if !refs.is_empty() {
555                    return Err(FrankenError::Internal(format!(
556                        "unresolved label {i} referenced by {} instruction(s)",
557                        refs.len()
558                    )));
559                }
560            }
561        }
562        let bind_parameter_requirement = compute_bind_parameter_requirement(&self.ops);
563        let table_index_meta = self
564            .table_index_meta
565            .into_iter()
566            .map(|(table_cursor, indexes)| (table_cursor, indexes.into_boxed_slice()))
567            .collect();
568
569        let inferred_register_count = self.ops.iter().fold(0, |max_register, op| {
570            max_register.max(opcode_register_spans(op).max_touched_register())
571        });
572        let has_insert = self.ops.iter().any(|op| op.opcode == Opcode::Insert);
573        Ok(VdbeProgram {
574            ops: self.ops,
575            register_count: self.regs.count().max(inferred_register_count),
576            bind_parameter_requirement,
577            table_index_meta: Arc::new(table_index_meta),
578            has_insert,
579        })
580    }
581}
582
583impl Default for ProgramBuilder {
584    fn default() -> Self {
585        Self::new()
586    }
587}
588
589// ── VDBE Program ────────────────────────────────────────────────────────────
590
591pub(crate) type TableIndexMetaMap = HashMap<i32, Box<[fsqlite_types::opcode::IndexCursorMeta]>>;
592
593/// A finalized VDBE bytecode program ready for execution.
594#[derive(Debug, Clone, PartialEq)]
595pub struct VdbeProgram {
596    /// The instruction sequence.
597    ops: smallvec::SmallVec<[VdbeOp; 64]>,
598    /// Number of registers needed (high water mark from allocation).
599    register_count: i32,
600    /// Precomputed bind parameter requirement for `Opcode::Variable` opcodes.
601    ///
602    /// `Ok(max_index)` means all variable opcodes carry valid 1-based indexes.
603    /// `Err(raw_index)` stores the first invalid raw index encountered.
604    bind_parameter_requirement: std::result::Result<usize, i32>,
605    /// Table-to-index cursor metadata for REPLACE conflict resolution.
606    table_index_meta: Arc<TableIndexMetaMap>,
607    /// Precomputed flag: true when the program contains at least one Insert
608    /// opcode, meaning column defaults may be needed during execution.
609    has_insert: bool,
610}
611
612impl VdbeProgram {
613    /// The instruction sequence.
614    pub fn ops(&self) -> &[VdbeOp] {
615        &self.ops
616    }
617
618    /// Number of instructions.
619    pub fn len(&self) -> usize {
620        self.ops.len()
621    }
622
623    /// Whether the program is empty.
624    pub fn is_empty(&self) -> bool {
625        self.ops.is_empty()
626    }
627
628    /// Number of registers required.
629    pub fn register_count(&self) -> i32 {
630        self.register_count
631    }
632
633    /// Highest 1-based bind parameter index referenced by the program.
634    ///
635    /// Returns `Ok(0)` when no `Variable` opcodes are present.
636    /// Returns `Err(raw_index)` if the bytecode contains an invalid
637    /// parameter index (`<= 0` or not representable as `usize`).
638    pub fn max_bind_parameter_index(&self) -> std::result::Result<usize, i32> {
639        self.bind_parameter_requirement
640    }
641
642    /// Get the instruction at the given program counter.
643    pub fn get(&self, pc: usize) -> Option<&VdbeOp> {
644        self.ops.get(pc)
645    }
646
647    /// Table-to-index cursor metadata for REPLACE conflict resolution.
648    pub fn table_index_meta(&self) -> &TableIndexMetaMap {
649        self.table_index_meta.as_ref()
650    }
651
652    pub(crate) fn shared_table_index_meta(&self) -> &Arc<TableIndexMetaMap> {
653        &self.table_index_meta
654    }
655
656    /// Returns `true` if the program contains any `Insert` opcodes,
657    /// meaning column defaults may be needed during execution.
658    /// Precomputed at build time — O(1) at call time.
659    pub fn has_insert_ops(&self) -> bool {
660        self.has_insert
661    }
662
663    /// Disassemble the program to a human-readable string.
664    ///
665    /// Output format matches SQLite's `EXPLAIN` output:
666    /// ```text
667    /// addr  opcode         p1    p2    p3    p4             p5
668    /// ----  ----------     ----  ----  ----  -----          --
669    /// 0     Init           0     8     0                    0
670    /// ```
671    pub fn disassemble(&self) -> String {
672        use std::fmt::Write;
673
674        let mut out = std::string::String::with_capacity(self.ops.len() * 60);
675        out.push_str("addr  opcode           p1    p2    p3    p4                 p5\n");
676        out.push_str("----  ---------------  ----  ----  ----  -----------------  --\n");
677
678        for (addr, op) in self.ops.iter().enumerate() {
679            let p4_str = match &op.p4 {
680                P4::None => String::new(),
681                P4::Int(v) => format!("(int){v}"),
682                P4::Int64(v) => format!("(i64){v}"),
683                P4::Real(v) => format!("(real){v}"),
684                P4::Str(s) => format!("(str){s}"),
685                P4::Blob(b) => format!("(blob)[{}B]", b.len()),
686                P4::Collation(c) => format!("(coll){c}"),
687                P4::FuncName(f) => format!("(func){f}"),
688                P4::FuncNameCollated(f, c) => format!("(func){f} coll={c}"),
689                P4::Table(t) => format!("(tbl){t}"),
690                P4::Index(i) => format!("(idx){i}"),
691                P4::Affinity(a) => format!("(aff){a}"),
692                P4::TimeTravelCommitSeq(seq) => format!("(tt-seq){seq}"),
693                P4::TimeTravelTimestamp(ts) => format!("(tt-ts){ts}"),
694            };
695
696            let _ = writeln!(
697                &mut out,
698                "{addr:<4}  {:<15}  {:<4}  {:<4}  {:<4}  {:<17}  {:<2}",
699                op.opcode.name(),
700                op.p1,
701                op.p2,
702                op.p3,
703                p4_str,
704                op.p5,
705            );
706        }
707
708        out
709    }
710}
711
712fn compute_bind_parameter_requirement(ops: &[VdbeOp]) -> std::result::Result<usize, i32> {
713    let mut max_required = 0_usize;
714    for op in ops {
715        if op.opcode != Opcode::Variable {
716            continue;
717        }
718        let one_based = match usize::try_from(op.p1) {
719            Ok(index) if index > 0 => index,
720            _ => return Err(op.p1),
721        };
722        max_required = max_required.max(one_based);
723    }
724    Ok(max_required)
725}
726
727// ── PRAGMA Handling ──────────────────────────────────────────────────────────
728
729/// Minimal PRAGMA dispatch for early phases.
730///
731/// The full engine will execute PRAGMA statements through the SQL pipeline,
732/// but we keep these handlers in VDBE (the execution boundary) so higher layers
733/// can remain declarative.
734pub mod pragma {
735    use std::path::Path;
736
737    use fsqlite_ast::{Expr, Literal, PragmaStatement, PragmaValue, QualifiedName, UnaryOp};
738    use fsqlite_error::{FrankenError, Result};
739    use fsqlite_mvcc::TransactionManager;
740    use fsqlite_wal::{
741        DEFAULT_RAPTORQ_REPAIR_SYMBOLS, MAX_RAPTORQ_REPAIR_SYMBOLS,
742        persist_wal_fec_raptorq_repair_symbols, read_wal_fec_raptorq_repair_symbols,
743    };
744    use tracing::{debug, error, info, warn};
745
746    /// Result of applying a PRAGMA statement.
747    #[derive(Debug, Clone, PartialEq, Eq)]
748    pub enum PragmaOutput {
749        /// PRAGMA not recognized by this handler.
750        Unsupported,
751        /// PRAGMA yields a boolean value (e.g. query or echo after set).
752        Bool(bool),
753        /// PRAGMA yields an integer value.
754        Int(i64),
755        /// PRAGMA yields a text value (e.g. `journal_mode`).
756        Text(String),
757    }
758
759    /// Connection-level settings controlled by PRAGMA statements.
760    ///
761    /// These mirror the standard SQLite PRAGMAs that the E2E harness needs to
762    /// set consistently across both `sqlite3` and FrankenSQLite runs.  Values
763    /// are stored here for future backend wiring (Phase 5+) and are immediately
764    /// queryable via `PRAGMA <name>`.
765    #[derive(Debug, Clone, Copy)]
766    pub enum DifferentialViewsSetting {
767        Off,
768        On,
769    }
770
771    impl DifferentialViewsSetting {
772        #[must_use]
773        pub const fn is_enabled(&self) -> bool {
774            matches!(self, Self::On)
775        }
776
777        #[must_use]
778        pub const fn from_enabled(enabled: bool) -> Self {
779            if enabled { Self::On } else { Self::Off }
780        }
781    }
782
783    #[derive(Debug, Clone)]
784    pub struct ConnectionPragmaState {
785        /// Journal mode (`delete`, `truncate`, `persist`, `memory`, `wal`, `off`).
786        pub journal_mode: String,
787        /// Synchronous level (`OFF`, `NORMAL`, `FULL`, `EXTRA`).
788        pub synchronous: String,
789        /// Page cache size (negative = KiB, positive = pages).
790        pub cache_size: i64,
791        /// Page size in bytes (512..=65536, power of two).
792        pub page_size: u32,
793        /// Busy timeout in milliseconds for lock contention.
794        pub busy_timeout_ms: i64,
795        /// Temporary storage mode (`0` default, `1` file, `2` memory).
796        pub temp_store: i64,
797        /// Memory-map size in bytes (`PRAGMA mmap_size`).
798        pub mmap_size: i64,
799        /// Auto-vacuum mode (`0` none, `1` full, `2` incremental).
800        pub auto_vacuum: i64,
801        /// WAL auto-checkpoint threshold in pages.
802        pub wal_autocheckpoint: i64,
803        /// User schema version (`PRAGMA user_version`).
804        pub user_version: i64,
805        /// Application ID (`PRAGMA application_id`).
806        pub application_id: i64,
807        /// Foreign key enforcement toggle (`PRAGMA foreign_keys`).
808        pub foreign_keys: bool,
809        /// Recursive trigger toggle (`PRAGMA recursive_triggers`).
810        pub recursive_triggers: bool,
811        /// Connection-level SSI toggle (`PRAGMA fsqlite.serializable`).
812        pub serializable: bool,
813        /// Differential-view streaming toggle (`PRAGMA fsqlite_differential_views`).
814        pub differential_views: DifferentialViewsSetting,
815        /// WAL-FEC repair symbol budget (`PRAGMA raptorq_repair_symbols`).
816        pub raptorq_repair_symbols: u8,
817    }
818
819    impl Default for ConnectionPragmaState {
820        fn default() -> Self {
821            Self {
822                journal_mode: "wal".to_owned(),
823                synchronous: "NORMAL".to_owned(),
824                cache_size: -2000,
825                page_size: 4096,
826                busy_timeout_ms: 5000,
827                temp_store: 0,
828                mmap_size: 0,
829                auto_vacuum: 0,
830                wal_autocheckpoint: 1000,
831                user_version: 0,
832                application_id: 0,
833                foreign_keys: false,
834                recursive_triggers: false,
835                serializable: true,
836                differential_views: DifferentialViewsSetting::Off,
837                raptorq_repair_symbols: DEFAULT_RAPTORQ_REPAIR_SYMBOLS,
838            }
839        }
840    }
841
842    /// Apply a PRAGMA statement to the provided connection-scoped state.
843    ///
844    /// Currently supports:
845    /// - `PRAGMA fsqlite.serializable`
846    /// - `PRAGMA fsqlite.serializable = ON|OFF|TRUE|FALSE|1|0`
847    /// - `PRAGMA raptorq_repair_symbols`
848    /// - `PRAGMA raptorq_repair_symbols = N` (N in [0, 255])
849    ///
850    /// Unknown pragmas return [`PragmaOutput::Unsupported`].
851    pub fn apply(mgr: &mut TransactionManager, stmt: &PragmaStatement) -> Result<PragmaOutput> {
852        apply_with_sidecar(mgr, stmt, None)
853    }
854
855    /// Apply a PRAGMA statement with optional `.wal-fec` sidecar persistence.
856    pub fn apply_with_sidecar(
857        mgr: &mut TransactionManager,
858        stmt: &PragmaStatement,
859        wal_fec_sidecar_path: Option<&Path>,
860    ) -> Result<PragmaOutput> {
861        if is_fsqlite_serializable(&stmt.name) {
862            return apply_serializable(mgr, stmt);
863        }
864        if is_raptorq_repair_symbols(&stmt.name) {
865            return apply_raptorq_repair_symbols(mgr, stmt, wal_fec_sidecar_path);
866        }
867        Ok(PragmaOutput::Unsupported)
868    }
869
870    /// Apply a PRAGMA to connection-level settings.
871    ///
872    /// Handles common connection-scoped PRAGMAs used by the harness and
873    /// compatibility paths. Returns `Unsupported` for pragmas not handled at
874    /// this layer, allowing the caller to chain with [`apply`].
875    pub fn apply_connection_pragma(
876        state: &mut ConnectionPragmaState,
877        stmt: &PragmaStatement,
878    ) -> Result<PragmaOutput> {
879        let name = &stmt.name.name;
880        if is_fsqlite_serializable(&stmt.name) {
881            return apply_serializable_connection(state, stmt);
882        }
883        if is_fsqlite_differential_views(&stmt.name) {
884            return apply_differential_views_connection(state, stmt);
885        }
886        if is_raptorq_repair_symbols(&stmt.name) {
887            return apply_raptorq_repair_symbols_connection(state, stmt);
888        }
889        if name.eq_ignore_ascii_case("journal_mode") {
890            return apply_journal_mode(state, stmt);
891        }
892        if name.eq_ignore_ascii_case("synchronous") {
893            return apply_synchronous(state, stmt);
894        }
895        if name.eq_ignore_ascii_case("cache_size") {
896            return apply_cache_size(state, stmt);
897        }
898        if name.eq_ignore_ascii_case("page_size") {
899            return apply_page_size(state, stmt);
900        }
901        if name.eq_ignore_ascii_case("busy_timeout") {
902            return apply_busy_timeout(state, stmt);
903        }
904        if name.eq_ignore_ascii_case("temp_store") {
905            return apply_temp_store(state, stmt);
906        }
907        if name.eq_ignore_ascii_case("mmap_size") {
908            return apply_mmap_size(state, stmt);
909        }
910        if name.eq_ignore_ascii_case("auto_vacuum") {
911            return apply_auto_vacuum(state, stmt);
912        }
913        if name.eq_ignore_ascii_case("wal_autocheckpoint") {
914            return apply_wal_autocheckpoint(state, stmt);
915        }
916        if name.eq_ignore_ascii_case("user_version") {
917            return apply_user_version(state, stmt);
918        }
919        if name.eq_ignore_ascii_case("application_id") {
920            return apply_application_id(state, stmt);
921        }
922        if name.eq_ignore_ascii_case("foreign_keys") {
923            return apply_foreign_keys(state, stmt);
924        }
925        if name.eq_ignore_ascii_case("recursive_triggers") {
926            return apply_recursive_triggers(state, stmt);
927        }
928        Ok(PragmaOutput::Unsupported)
929    }
930
931    fn apply_serializable_connection(
932        state: &mut ConnectionPragmaState,
933        stmt: &PragmaStatement,
934    ) -> Result<PragmaOutput> {
935        match &stmt.value {
936            None => Ok(PragmaOutput::Bool(state.serializable)),
937            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
938                let enabled = parse_bool(expr)?;
939                state.serializable = enabled;
940                Ok(PragmaOutput::Bool(enabled))
941            }
942        }
943    }
944
945    fn apply_raptorq_repair_symbols_connection(
946        state: &mut ConnectionPragmaState,
947        stmt: &PragmaStatement,
948    ) -> Result<PragmaOutput> {
949        match &stmt.value {
950            None => Ok(PragmaOutput::Int(i64::from(state.raptorq_repair_symbols))),
951            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
952                let value = parse_integer_expr(expr)?;
953                if !(0..=i64::from(MAX_RAPTORQ_REPAIR_SYMBOLS)).contains(&value) {
954                    return Err(FrankenError::OutOfRange {
955                        what: "raptorq_repair_symbols".to_owned(),
956                        value: value.to_string(),
957                    });
958                }
959                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
960                {
961                    state.raptorq_repair_symbols = value as u8;
962                }
963                Ok(PragmaOutput::Int(i64::from(state.raptorq_repair_symbols)))
964            }
965        }
966    }
967
968    fn apply_differential_views_connection(
969        state: &mut ConnectionPragmaState,
970        stmt: &PragmaStatement,
971    ) -> Result<PragmaOutput> {
972        match &stmt.value {
973            None => Ok(PragmaOutput::Bool(state.differential_views.is_enabled())),
974            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
975                let enabled = parse_bool(expr)?;
976                state.differential_views = DifferentialViewsSetting::from_enabled(enabled);
977                Ok(PragmaOutput::Bool(enabled))
978            }
979        }
980    }
981
982    fn apply_journal_mode(
983        state: &mut ConnectionPragmaState,
984        stmt: &PragmaStatement,
985    ) -> Result<PragmaOutput> {
986        match &stmt.value {
987            None => Ok(PragmaOutput::Text(state.journal_mode.clone())),
988            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
989                let mode = parse_text_expr(expr)?;
990                let lower = mode.to_ascii_lowercase();
991                match lower.as_str() {
992                    "delete" | "truncate" | "persist" | "memory" | "wal" | "off" => {
993                        state.journal_mode.clone_from(&lower);
994                        Ok(PragmaOutput::Text(lower))
995                    }
996                    _ => Err(FrankenError::TypeMismatch {
997                        expected: "delete|truncate|persist|memory|wal|off".to_owned(),
998                        actual: mode,
999                    }),
1000                }
1001            }
1002        }
1003    }
1004
1005    fn apply_synchronous(
1006        state: &mut ConnectionPragmaState,
1007        stmt: &PragmaStatement,
1008    ) -> Result<PragmaOutput> {
1009        match &stmt.value {
1010            None => Ok(PragmaOutput::Text(state.synchronous.clone())),
1011            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1012                let val = parse_synchronous_value(expr)?;
1013                state.synchronous.clone_from(&val);
1014                Ok(PragmaOutput::Text(val))
1015            }
1016        }
1017    }
1018
1019    fn parse_synchronous_value(expr: &Expr) -> Result<String> {
1020        // Accept both text names and integer codes (0=OFF, 1=NORMAL, 2=FULL, 3=EXTRA).
1021        if let Expr::Literal(Literal::Integer(n), _) = expr {
1022            match n {
1023                0 => Ok("OFF".to_owned()),
1024                1 => Ok("NORMAL".to_owned()),
1025                2 => Ok("FULL".to_owned()),
1026                3 => Ok("EXTRA".to_owned()),
1027                _ => Err(FrankenError::OutOfRange {
1028                    what: "synchronous".to_owned(),
1029                    value: n.to_string(),
1030                }),
1031            }
1032        } else {
1033            let text = parse_text_expr(expr)?;
1034            let upper = text.to_ascii_uppercase();
1035            match upper.as_str() {
1036                "OFF" | "NORMAL" | "FULL" | "EXTRA" => Ok(upper),
1037                _ => Err(FrankenError::TypeMismatch {
1038                    expected: "OFF|NORMAL|FULL|EXTRA|0|1|2|3".to_owned(),
1039                    actual: text,
1040                }),
1041            }
1042        }
1043    }
1044
1045    fn apply_cache_size(
1046        state: &mut ConnectionPragmaState,
1047        stmt: &PragmaStatement,
1048    ) -> Result<PragmaOutput> {
1049        match &stmt.value {
1050            None => Ok(PragmaOutput::Int(state.cache_size)),
1051            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1052                let val = parse_integer_expr(expr)?;
1053                state.cache_size = val;
1054                Ok(PragmaOutput::Int(val))
1055            }
1056        }
1057    }
1058
1059    fn apply_page_size(
1060        state: &mut ConnectionPragmaState,
1061        stmt: &PragmaStatement,
1062    ) -> Result<PragmaOutput> {
1063        match &stmt.value {
1064            None => Ok(PragmaOutput::Int(i64::from(state.page_size))),
1065            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1066                let val = parse_integer_expr(expr)?;
1067                if !(512..=65536).contains(&val) || !is_power_of_two(val) {
1068                    return Err(FrankenError::OutOfRange {
1069                        what: "page_size".to_owned(),
1070                        value: val.to_string(),
1071                    });
1072                }
1073                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1074                {
1075                    state.page_size = val as u32;
1076                }
1077                Ok(PragmaOutput::Int(val))
1078            }
1079        }
1080    }
1081
1082    fn apply_busy_timeout(
1083        state: &mut ConnectionPragmaState,
1084        stmt: &PragmaStatement,
1085    ) -> Result<PragmaOutput> {
1086        match &stmt.value {
1087            None => Ok(PragmaOutput::Int(state.busy_timeout_ms)),
1088            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1089                let val = parse_integer_expr(expr)?;
1090                state.busy_timeout_ms = val.max(0);
1091                Ok(PragmaOutput::Int(state.busy_timeout_ms))
1092            }
1093        }
1094    }
1095
1096    fn apply_temp_store(
1097        state: &mut ConnectionPragmaState,
1098        stmt: &PragmaStatement,
1099    ) -> Result<PragmaOutput> {
1100        match &stmt.value {
1101            None => Ok(PragmaOutput::Int(state.temp_store)),
1102            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1103                let val = parse_temp_store_value(expr)?;
1104                state.temp_store = val;
1105                Ok(PragmaOutput::Int(val))
1106            }
1107        }
1108    }
1109
1110    fn apply_mmap_size(
1111        state: &mut ConnectionPragmaState,
1112        stmt: &PragmaStatement,
1113    ) -> Result<PragmaOutput> {
1114        match &stmt.value {
1115            None => Ok(PragmaOutput::Int(state.mmap_size)),
1116            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1117                let val = parse_integer_expr(expr)?;
1118                state.mmap_size = val.max(0);
1119                Ok(PragmaOutput::Int(state.mmap_size))
1120            }
1121        }
1122    }
1123
1124    fn apply_auto_vacuum(
1125        state: &mut ConnectionPragmaState,
1126        stmt: &PragmaStatement,
1127    ) -> Result<PragmaOutput> {
1128        match &stmt.value {
1129            None => Ok(PragmaOutput::Int(state.auto_vacuum)),
1130            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1131                let val = parse_auto_vacuum_value(expr)?;
1132                state.auto_vacuum = val;
1133                Ok(PragmaOutput::Int(val))
1134            }
1135        }
1136    }
1137
1138    fn apply_wal_autocheckpoint(
1139        state: &mut ConnectionPragmaState,
1140        stmt: &PragmaStatement,
1141    ) -> Result<PragmaOutput> {
1142        match &stmt.value {
1143            None => Ok(PragmaOutput::Int(state.wal_autocheckpoint)),
1144            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1145                let val = parse_integer_expr(expr)?;
1146                state.wal_autocheckpoint = val.max(0);
1147                Ok(PragmaOutput::Int(state.wal_autocheckpoint))
1148            }
1149        }
1150    }
1151
1152    fn apply_user_version(
1153        state: &mut ConnectionPragmaState,
1154        stmt: &PragmaStatement,
1155    ) -> Result<PragmaOutput> {
1156        match &stmt.value {
1157            None => Ok(PragmaOutput::Int(state.user_version)),
1158            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1159                let val = parse_integer_expr(expr)?;
1160                state.user_version = val;
1161                Ok(PragmaOutput::Int(val))
1162            }
1163        }
1164    }
1165
1166    fn apply_application_id(
1167        state: &mut ConnectionPragmaState,
1168        stmt: &PragmaStatement,
1169    ) -> Result<PragmaOutput> {
1170        match &stmt.value {
1171            None => Ok(PragmaOutput::Int(state.application_id)),
1172            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1173                let val = parse_integer_expr(expr)?;
1174                state.application_id = val;
1175                Ok(PragmaOutput::Int(val))
1176            }
1177        }
1178    }
1179
1180    fn apply_foreign_keys(
1181        state: &mut ConnectionPragmaState,
1182        stmt: &PragmaStatement,
1183    ) -> Result<PragmaOutput> {
1184        match &stmt.value {
1185            None => Ok(PragmaOutput::Int(i64::from(state.foreign_keys))),
1186            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1187                let enabled = parse_bool(expr)?;
1188                state.foreign_keys = enabled;
1189                Ok(PragmaOutput::Int(i64::from(enabled)))
1190            }
1191        }
1192    }
1193
1194    fn apply_recursive_triggers(
1195        state: &mut ConnectionPragmaState,
1196        stmt: &PragmaStatement,
1197    ) -> Result<PragmaOutput> {
1198        match &stmt.value {
1199            None => Ok(PragmaOutput::Int(i64::from(state.recursive_triggers))),
1200            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1201                let enabled = parse_bool(expr)?;
1202                state.recursive_triggers = enabled;
1203                Ok(PragmaOutput::Int(i64::from(enabled)))
1204            }
1205        }
1206    }
1207
1208    fn parse_temp_store_value(expr: &Expr) -> Result<i64> {
1209        if let Expr::Literal(Literal::Integer(n), _) = expr {
1210            return match *n {
1211                0..=2 => Ok(*n),
1212                _ => Err(FrankenError::OutOfRange {
1213                    what: "temp_store".to_owned(),
1214                    value: n.to_string(),
1215                }),
1216            };
1217        }
1218
1219        let text = parse_text_expr(expr)?;
1220        match text.to_ascii_lowercase().as_str() {
1221            "default" => Ok(0),
1222            "file" => Ok(1),
1223            "memory" => Ok(2),
1224            _ => Err(FrankenError::TypeMismatch {
1225                expected: "DEFAULT|FILE|MEMORY|0|1|2".to_owned(),
1226                actual: text,
1227            }),
1228        }
1229    }
1230
1231    fn parse_auto_vacuum_value(expr: &Expr) -> Result<i64> {
1232        if let Expr::Literal(Literal::Integer(n), _) = expr {
1233            return match *n {
1234                0..=2 => Ok(*n),
1235                _ => Err(FrankenError::OutOfRange {
1236                    what: "auto_vacuum".to_owned(),
1237                    value: n.to_string(),
1238                }),
1239            };
1240        }
1241
1242        let text = parse_text_expr(expr)?;
1243        match text.to_ascii_lowercase().as_str() {
1244            "none" => Ok(0),
1245            "full" => Ok(1),
1246            "incremental" => Ok(2),
1247            _ => Err(FrankenError::TypeMismatch {
1248                expected: "NONE|FULL|INCREMENTAL|0|1|2".to_owned(),
1249                actual: text,
1250            }),
1251        }
1252    }
1253
1254    fn is_power_of_two(n: i64) -> bool {
1255        n > 0 && (n & (n - 1)) == 0
1256    }
1257
1258    /// Extract a text value from a PRAGMA assignment expression.
1259    fn parse_text_expr(expr: &Expr) -> Result<String> {
1260        match expr {
1261            Expr::Literal(Literal::String(s), _) => Ok(s.clone()),
1262            Expr::Column(col, _) => Ok(col.column.clone()),
1263            Expr::Literal(Literal::Integer(n), _) => Ok(n.to_string()),
1264            other => Err(FrankenError::TypeMismatch {
1265                expected: "text or identifier".to_owned(),
1266                actual: format!("{other:?}"),
1267            }),
1268        }
1269    }
1270
1271    fn apply_serializable(
1272        mgr: &mut TransactionManager,
1273        stmt: &PragmaStatement,
1274    ) -> Result<PragmaOutput> {
1275        match &stmt.value {
1276            None => Ok(PragmaOutput::Bool(mgr.ssi_enabled())),
1277            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1278                let enabled = parse_bool(expr)?;
1279                mgr.set_ssi_enabled(enabled);
1280                Ok(PragmaOutput::Bool(mgr.ssi_enabled()))
1281            }
1282        }
1283    }
1284
1285    fn is_fsqlite_serializable(name: &QualifiedName) -> bool {
1286        name.schema
1287            .as_deref()
1288            .is_some_and(|s| s.eq_ignore_ascii_case("fsqlite"))
1289            && name.name.eq_ignore_ascii_case("serializable")
1290    }
1291
1292    fn is_fsqlite_differential_views(name: &QualifiedName) -> bool {
1293        match name.schema.as_deref() {
1294            Some(schema) => {
1295                schema.eq_ignore_ascii_case("fsqlite")
1296                    && name.name.eq_ignore_ascii_case("differential_views")
1297            }
1298            None => name.name.eq_ignore_ascii_case("fsqlite_differential_views"),
1299        }
1300    }
1301
1302    fn is_raptorq_repair_symbols(name: &QualifiedName) -> bool {
1303        let schema_ok = match name.schema.as_deref() {
1304            None => true,
1305            Some(schema) => schema.eq_ignore_ascii_case("fsqlite"),
1306        };
1307        schema_ok && name.name.eq_ignore_ascii_case("raptorq_repair_symbols")
1308    }
1309
1310    fn apply_raptorq_repair_symbols(
1311        mgr: &mut TransactionManager,
1312        stmt: &PragmaStatement,
1313        wal_fec_sidecar_path: Option<&Path>,
1314    ) -> Result<PragmaOutput> {
1315        match &stmt.value {
1316            None => {
1317                if let Some(sidecar) = wal_fec_sidecar_path {
1318                    let persisted = read_wal_fec_raptorq_repair_symbols(sidecar)?;
1319                    mgr.set_raptorq_repair_symbols(persisted);
1320                    debug!(
1321                        sidecar = %sidecar.display(),
1322                        raptorq_repair_symbols = persisted,
1323                        "loaded raptorq_repair_symbols from wal-fec sidecar"
1324                    );
1325                }
1326                Ok(PragmaOutput::Int(i64::from(mgr.raptorq_repair_symbols())))
1327            }
1328            Some(PragmaValue::Assign(expr) | PragmaValue::Call(expr)) => {
1329                let requested = parse_raptorq_repair_symbols(expr)?;
1330                mgr.set_raptorq_repair_symbols(requested);
1331
1332                if let Some(sidecar) = wal_fec_sidecar_path {
1333                    persist_wal_fec_raptorq_repair_symbols(sidecar, requested)?;
1334                    info!(
1335                        sidecar = %sidecar.display(),
1336                        raptorq_repair_symbols = requested,
1337                        "persisted raptorq_repair_symbols to wal-fec sidecar"
1338                    );
1339                }
1340
1341                Ok(PragmaOutput::Int(i64::from(mgr.raptorq_repair_symbols())))
1342            }
1343        }
1344    }
1345
1346    fn parse_raptorq_repair_symbols(expr: &Expr) -> Result<u8> {
1347        let raw = parse_integer_expr(expr)?;
1348        if raw < 0 {
1349            warn!(
1350                value = raw,
1351                "rejecting negative raptorq_repair_symbols value"
1352            );
1353            return Err(FrankenError::OutOfRange {
1354                what: "raptorq_repair_symbols".to_owned(),
1355                value: raw.to_string(),
1356            });
1357        }
1358
1359        let max = i64::from(MAX_RAPTORQ_REPAIR_SYMBOLS);
1360        if raw > max {
1361            warn!(
1362                value = raw,
1363                max = MAX_RAPTORQ_REPAIR_SYMBOLS,
1364                "rejecting out-of-range raptorq_repair_symbols value"
1365            );
1366            return Err(FrankenError::OutOfRange {
1367                what: "raptorq_repair_symbols".to_owned(),
1368                value: raw.to_string(),
1369            });
1370        }
1371
1372        u8::try_from(raw).map_err(|_| {
1373            error!(
1374                value = raw,
1375                "failed to convert validated raptorq_repair_symbols to u8"
1376            );
1377            FrankenError::OutOfRange {
1378                what: "raptorq_repair_symbols".to_owned(),
1379                value: raw.to_string(),
1380            }
1381        })
1382    }
1383
1384    fn parse_integer_expr(expr: &Expr) -> Result<i64> {
1385        match expr {
1386            Expr::Literal(Literal::Integer(n), _) => Ok(*n),
1387            Expr::UnaryOp {
1388                op: UnaryOp::Negate,
1389                expr,
1390                ..
1391            } => Ok(-parse_integer_expr(expr)?),
1392            Expr::UnaryOp {
1393                op: UnaryOp::Plus,
1394                expr,
1395                ..
1396            } => parse_integer_expr(expr),
1397            Expr::Column(col, _) => {
1398                col.column
1399                    .parse::<i64>()
1400                    .map_err(|_| FrankenError::TypeMismatch {
1401                        expected: "integer (0..255)".to_owned(),
1402                        actual: col.column.clone(),
1403                    })
1404            }
1405            other => Err(FrankenError::TypeMismatch {
1406                expected: "integer (0..255)".to_owned(),
1407                actual: format!("{other:?}"),
1408            }),
1409        }
1410    }
1411
1412    fn parse_bool(expr: &Expr) -> Result<bool> {
1413        let (raw, parsed) = match expr {
1414            Expr::Literal(Literal::Integer(n), _) => (format!("{n}"), parse_int_bool(*n)),
1415            Expr::Literal(Literal::String(s), _) => (s.clone(), parse_str_bool(s)),
1416            Expr::Literal(Literal::True, _) => ("TRUE".to_owned(), Some(true)),
1417            Expr::Literal(Literal::False, _) => ("FALSE".to_owned(), Some(false)),
1418            Expr::Column(col, _) => (col.column.clone(), parse_str_bool(&col.column)),
1419            other => {
1420                return Err(FrankenError::TypeMismatch {
1421                    expected: "ON|OFF|TRUE|FALSE|1|0".to_owned(),
1422                    actual: format!("{other:?}"),
1423                });
1424            }
1425        };
1426
1427        parsed.ok_or_else(|| FrankenError::TypeMismatch {
1428            expected: "ON|OFF|TRUE|FALSE|1|0".to_owned(),
1429            actual: raw,
1430        })
1431    }
1432
1433    fn parse_int_bool(n: i64) -> Option<bool> {
1434        match n {
1435            0 => Some(false),
1436            1 => Some(true),
1437            _ => None,
1438        }
1439    }
1440
1441    fn parse_str_bool(s: &str) -> Option<bool> {
1442        if s.eq_ignore_ascii_case("on") || s.eq_ignore_ascii_case("true") {
1443            Some(true)
1444        } else if s.eq_ignore_ascii_case("off") || s.eq_ignore_ascii_case("false") {
1445            Some(false)
1446        } else if s == "1" {
1447            Some(true)
1448        } else if s == "0" {
1449            Some(false)
1450        } else {
1451            None
1452        }
1453    }
1454}
1455
1456// ── Tests ───────────────────────────────────────────────────────────────────
1457
1458#[cfg(test)]
1459mod tests {
1460    use super::*;
1461
1462    // ── test_vdbe_op_struct_size ─────────────────────────────────────────
1463    #[test]
1464    fn test_vdbe_op_struct_size() {
1465        // Verify VdbeOp fields are accessible and correctly typed.
1466        let op = VdbeOp {
1467            opcode: Opcode::Integer,
1468            p1: 42,
1469            p2: 1,
1470            p3: 0,
1471            p4: P4::None,
1472            p5: 0,
1473        };
1474        assert_eq!(op.opcode, Opcode::Integer);
1475        assert_eq!(op.p1, 42_i32);
1476        assert_eq!(op.p2, 1_i32);
1477        assert_eq!(op.p3, 0_i32);
1478        assert_eq!(op.p4, P4::None);
1479        assert_eq!(op.p5, 0_u16);
1480    }
1481
1482    // ── test_p4_variant_all_types ───────────────────────────────────────
1483    #[test]
1484    fn test_p4_variant_all_types() {
1485        // Each P4 variant can be constructed and pattern-matched.
1486        let variants: Vec<P4> = vec![
1487            P4::None,
1488            P4::Int(42),
1489            P4::Int64(i64::MAX),
1490            P4::Real(1.234_567_89),
1491            P4::Str("hello".to_owned()),
1492            P4::Blob(vec![0xDE, 0xAD]),
1493            P4::Collation("BINARY".to_owned()),
1494            P4::FuncName("count".to_owned()),
1495            P4::Table("users".to_owned()),
1496            P4::Affinity("ddd".to_owned()),
1497        ];
1498        assert_eq!(variants.len(), 10);
1499
1500        // Verify each variant matches itself.
1501        assert!(matches!(variants[0], P4::None));
1502        assert!(matches!(variants[1], P4::Int(42)));
1503        assert!(matches!(variants[2], P4::Int64(i64::MAX)));
1504        assert!(matches!(variants[3], P4::Real(_)));
1505        assert!(matches!(variants[4], P4::Str(_)));
1506        assert!(matches!(variants[5], P4::Blob(_)));
1507        assert!(matches!(variants[6], P4::Collation(_)));
1508        assert!(matches!(variants[7], P4::FuncName(ref s) if s == "count"));
1509        assert!(matches!(variants[8], P4::Table(ref s) if s == "users"));
1510        assert!(matches!(variants[9], P4::Affinity(ref s) if s == "ddd"));
1511    }
1512
1513    // ── test_label_emit_and_resolve ─────────────────────────────────────
1514    #[test]
1515    fn test_label_emit_and_resolve() {
1516        let mut b = ProgramBuilder::new();
1517
1518        // Emit two distinct labels.
1519        let label_a = b.emit_label();
1520        let label_b = b.emit_label();
1521        assert_ne!(label_a, label_b);
1522
1523        // Emit a jump to label_a (forward reference).
1524        let jump_addr = b.emit_jump_to_label(Opcode::Goto, 0, 0, label_a, P4::None, 0);
1525        assert_eq!(b.op_at(jump_addr).unwrap().p2, -1); // unresolved placeholder
1526
1527        // Emit some instructions.
1528        b.emit_op(Opcode::Integer, 1, 1, 0, P4::None, 0);
1529        b.emit_op(Opcode::Integer, 2, 2, 0, P4::None, 0);
1530
1531        // Resolve label_a to the current address (2 instructions after the jump).
1532        b.resolve_label(label_a);
1533
1534        // The jump's p2 should now be patched to address 3.
1535        assert_eq!(b.op_at(jump_addr).unwrap().p2, 3);
1536
1537        // Emit another jump to label_b.
1538        let jump2 = b.emit_jump_to_label(Opcode::If, 1, 0, label_b, P4::None, 0);
1539        b.emit_op(Opcode::Halt, 0, 0, 0, P4::None, 0);
1540        b.resolve_label(label_b);
1541        assert_eq!(b.op_at(jump2).unwrap().p2, 5);
1542
1543        // Finish should succeed (all labels resolved).
1544        let prog = b.finish().unwrap();
1545        assert_eq!(prog.len(), 5);
1546    }
1547
1548    // ── test_unresolved_label_error ─────────────────────────────────────
1549    #[test]
1550    fn test_unresolved_label_error() {
1551        let mut b = ProgramBuilder::new();
1552        let label = b.emit_label();
1553        b.emit_jump_to_label(Opcode::Goto, 0, 0, label, P4::None, 0);
1554
1555        // Don't resolve the label — finish should fail.
1556        let result = b.finish();
1557        assert!(result.is_err());
1558    }
1559
1560    // ── test_register_alloc_sequential ──────────────────────────────────
1561    #[test]
1562    fn test_register_alloc_sequential() {
1563        let mut alloc = RegisterAllocator::new();
1564
1565        // Sequential single allocations start at 1.
1566        assert_eq!(alloc.alloc_reg(), 1);
1567        assert_eq!(alloc.alloc_reg(), 2);
1568        assert_eq!(alloc.alloc_reg(), 3);
1569
1570        // Block allocation returns first register of contiguous block.
1571        let block_start = alloc.alloc_regs(3);
1572        assert_eq!(block_start, 4);
1573        // Next single alloc continues after the block.
1574        assert_eq!(alloc.alloc_reg(), 7);
1575
1576        assert_eq!(alloc.count(), 7);
1577    }
1578
1579    // ── test_register_temp_pool_reuse ───────────────────────────────────
1580    #[test]
1581    fn test_register_temp_pool_reuse() {
1582        let mut alloc = RegisterAllocator::new();
1583
1584        let r1 = alloc.alloc_reg(); // 1
1585        let t1 = alloc.alloc_temp(); // 2 (new allocation)
1586        let t2 = alloc.alloc_temp(); // 3 (new allocation)
1587        assert_eq!(r1, 1);
1588        assert_eq!(t1, 2);
1589        assert_eq!(t2, 3);
1590
1591        // Return temps to pool.
1592        alloc.free_temp(t1);
1593        alloc.free_temp(t2);
1594
1595        // Next temp allocations reuse from pool (LIFO order).
1596        let t3 = alloc.alloc_temp();
1597        let t4 = alloc.alloc_temp();
1598        assert_eq!(t3, t2); // 3 (last freed)
1599        assert_eq!(t4, t1); // 2
1600
1601        // High water mark unchanged (no new registers needed).
1602        assert_eq!(alloc.count(), 3);
1603    }
1604
1605    // ── test_coroutine_init_yield_end ───────────────────────────────────
1606    #[test]
1607    fn test_coroutine_init_yield_end() {
1608        // InitCoroutine: set yield register to body PC.
1609        let yield_reg = 1;
1610        let body_pc = 10;
1611        let mut co = CoroutineState::new(yield_reg, body_pc);
1612        assert_eq!(co.yield_reg, yield_reg);
1613        assert_eq!(co.saved_pc, body_pc);
1614        assert!(!co.exhausted);
1615
1616        // Yield: bidirectional PC swap.
1617        // Caller is at PC=5, coroutine body is at PC=10.
1618        let resume = co.yield_swap(5);
1619        assert_eq!(resume, 10); // jump to body
1620        assert_eq!(co.saved_pc, 5); // caller's PC saved
1621
1622        // Body yields back: caller at 5, body at 15.
1623        let resume2 = co.yield_swap(15);
1624        assert_eq!(resume2, 5); // back to caller
1625        assert_eq!(co.saved_pc, 15);
1626
1627        // EndCoroutine: marks exhaustion, returns to caller.
1628        let final_pc = co.end();
1629        assert_eq!(final_pc, 15); // returns saved_pc
1630        assert!(co.exhausted);
1631    }
1632
1633    // ── test_coroutine_multi_row_production ─────────────────────────────
1634    #[test]
1635    fn test_coroutine_multi_row_production() {
1636        // Simulate a CTE body producing 5 rows via Yield loop.
1637        let mut co = CoroutineState::new(1, 10); // body starts at PC=10
1638        let mut rows_consumed = 0;
1639        let caller_start_pc = 5;
1640
1641        // Caller yields to body.
1642        let mut next_pc = co.yield_swap(caller_start_pc);
1643        assert_eq!(next_pc, 10); // first entry into body
1644
1645        // Body produces rows.
1646        for row in 1..=5 {
1647            // Body "produces" a row, then yields back to caller.
1648            let body_pc = 10 + row; // body advances its PC
1649            next_pc = co.yield_swap(body_pc);
1650            // Caller resumes at its saved PC.
1651            assert_eq!(next_pc, caller_start_pc);
1652            rows_consumed += 1;
1653
1654            if row < 5 {
1655                // Caller yields back to body to get next row.
1656                next_pc = co.yield_swap(caller_start_pc);
1657                assert_eq!(next_pc, body_pc); // resume body
1658            }
1659        }
1660
1661        assert_eq!(rows_consumed, 5);
1662
1663        // Body signals exhaustion.
1664        let final_pc = co.end();
1665        assert!(co.exhausted);
1666        assert!(final_pc > 0); // valid return PC
1667    }
1668
1669    #[test]
1670    fn test_program_builder_infers_register_count_from_manual_opcode_registers() {
1671        let mut builder = ProgramBuilder::new();
1672        builder.emit_op(Opcode::Integer, 11, 3, 0, P4::None, 0);
1673        builder.emit_op(Opcode::ResultRow, 3, 1, 0, P4::None, 0);
1674        builder.emit_op(Opcode::Halt, 0, 0, 0, P4::None, 0);
1675
1676        let program = builder.finish().expect("program should build");
1677        assert_eq!(
1678            program.register_count(),
1679            3,
1680            "bytecode that writes raw registers must still allocate a large enough register file",
1681        );
1682    }
1683
1684    // ── test_all_opcode_dispatch_coverage ────────────────────────────────
1685    #[test]
1686    fn test_all_opcode_dispatch_coverage() {
1687        // Every Opcode enum variant (1..=191) has a valid name and can be
1688        // constructed from its byte value. This ensures no gaps in the enum.
1689        for byte in 1..=191u8 {
1690            let opcode = Opcode::from_byte(byte);
1691            assert!(
1692                opcode.is_some(),
1693                "Opcode::from_byte({byte}) returned None — gap in opcode enum"
1694            );
1695            let opcode = opcode.unwrap();
1696            let name = opcode.name();
1697            assert!(!name.is_empty(), "opcode {byte} has empty name");
1698        }
1699        assert_eq!(Opcode::COUNT, 192);
1700    }
1701
1702    // ── test_p5_flags_u16_range ─────────────────────────────────────────
1703    #[test]
1704    fn test_p5_flags_u16_range() {
1705        // Confirm p5 is u16 and accepts values above 0xFF.
1706        let op = VdbeOp {
1707            opcode: Opcode::Eq,
1708            p1: 1,
1709            p2: 5,
1710            p3: 2,
1711            p4: P4::None,
1712            p5: 0x1FF, // 511, exceeds u8 range
1713        };
1714        assert_eq!(op.p5, 0x1FF);
1715        assert!(op.p5 > 255);
1716
1717        let op2 = VdbeOp {
1718            opcode: Opcode::Noop,
1719            p1: 0,
1720            p2: 0,
1721            p3: 0,
1722            p4: P4::None,
1723            p5: u16::MAX,
1724        };
1725        assert_eq!(op2.p5, 65535);
1726    }
1727
1728    // ── test_program_builder_basic ──────────────────────────────────────
1729    #[test]
1730    fn test_program_builder_basic() {
1731        let mut b = ProgramBuilder::new();
1732
1733        // Build: Init -> Integer 42 into r1 -> ResultRow r1,1 -> Halt
1734        let end_label = b.emit_label();
1735        b.emit_jump_to_label(Opcode::Init, 0, 0, end_label, P4::None, 0);
1736        let r1 = b.alloc_reg();
1737        assert_eq!(r1, 1);
1738        b.emit_op(Opcode::Integer, 42, r1, 0, P4::None, 0);
1739        b.emit_op(Opcode::ResultRow, r1, 1, 0, P4::None, 0);
1740        b.emit_op(Opcode::Halt, 0, 0, 0, P4::None, 0);
1741        b.resolve_label(end_label);
1742
1743        let prog = b.finish().unwrap();
1744        assert_eq!(prog.len(), 4);
1745        assert_eq!(prog.register_count(), 1);
1746        assert_eq!(prog.max_bind_parameter_index().unwrap(), 0);
1747
1748        // The Init instruction's p2 should point to address 4 (after Halt).
1749        assert_eq!(prog.get(0).unwrap().opcode, Opcode::Init);
1750        assert_eq!(prog.get(0).unwrap().p2, 4);
1751    }
1752
1753    #[test]
1754    fn test_program_precomputes_max_bind_parameter_index() {
1755        let mut b = ProgramBuilder::new();
1756        b.emit_op(Opcode::Variable, 1, 1, 0, P4::None, 0);
1757        b.emit_op(Opcode::Variable, 4, 2, 0, P4::None, 0);
1758        b.emit_op(Opcode::Variable, 2, 3, 0, P4::None, 0);
1759        let prog = b.finish().unwrap();
1760        assert_eq!(prog.max_bind_parameter_index(), Ok(4));
1761    }
1762
1763    #[test]
1764    fn test_program_tracks_invalid_bind_parameter_index() {
1765        let mut b = ProgramBuilder::new();
1766        b.emit_op(Opcode::Variable, 0, 1, 0, P4::None, 0);
1767        let prog = b.finish().unwrap();
1768        assert_eq!(prog.max_bind_parameter_index(), Err(0));
1769    }
1770
1771    #[test]
1772    fn test_program_builder_accumulates_table_index_meta_by_table_cursor() {
1773        use fsqlite_types::opcode::IndexCursorMeta;
1774
1775        let mut b = ProgramBuilder::new();
1776        b.register_table_indexes(
1777            3,
1778            vec![IndexCursorMeta {
1779                cursor_id: 4,
1780                column_indices: vec![0, 2],
1781            }],
1782        );
1783        b.register_table_indexes(
1784            3,
1785            vec![IndexCursorMeta {
1786                cursor_id: 5,
1787                column_indices: vec![1],
1788            }],
1789        );
1790
1791        let prog = b.finish().expect("program should build");
1792        let metas = prog
1793            .table_index_meta()
1794            .get(&3)
1795            .expect("table cursor metadata should be present");
1796        assert_eq!(metas.len(), 2);
1797        assert_eq!(metas[0].cursor_id, 4);
1798        assert_eq!(metas[0].column_indices, vec![0, 2]);
1799        assert_eq!(metas[1].cursor_id, 5);
1800        assert_eq!(metas[1].column_indices, vec![1]);
1801    }
1802
1803    // ── test_disassemble ────────────────────────────────────────────────
1804    #[test]
1805    fn test_disassemble() {
1806        let mut b = ProgramBuilder::new();
1807        b.emit_op(Opcode::Init, 0, 2, 0, P4::None, 0);
1808        b.emit_op(Opcode::Integer, 42, 1, 0, P4::None, 0);
1809        b.emit_op(Opcode::Halt, 0, 0, 0, P4::None, 0);
1810        let prog = b.finish().unwrap();
1811
1812        let asm = prog.disassemble();
1813        assert!(asm.contains("Init"));
1814        assert!(asm.contains("Integer"));
1815        assert!(asm.contains("Halt"));
1816        assert!(asm.contains("42")); // p1 of Integer
1817    }
1818
1819    // ── test_key_info ───────────────────────────────────────────────────
1820    #[test]
1821    fn test_key_info() {
1822        let ki = KeyInfo {
1823            num_fields: 3,
1824            collations: vec![
1825                "BINARY".to_owned(),
1826                "NOCASE".to_owned(),
1827                "BINARY".to_owned(),
1828            ],
1829            sort_orders: vec![SortOrder::Asc, SortOrder::Desc, SortOrder::Asc],
1830        };
1831        assert_eq!(ki.num_fields, 3);
1832        assert_eq!(ki.collations.len(), 3);
1833        assert_eq!(ki.sort_orders[1], SortOrder::Desc);
1834    }
1835
1836    // ── test_label_already_resolved ─────────────────────────────────────
1837    #[test]
1838    fn test_label_already_resolved() {
1839        // If a label is resolved before a jump references it, the jump
1840        // should be patched immediately.
1841        let mut b = ProgramBuilder::new();
1842        let label = b.emit_label();
1843        b.emit_op(Opcode::Noop, 0, 0, 0, P4::None, 0);
1844        b.resolve_label(label); // resolved to address 1
1845
1846        // Now emit a jump referencing the already-resolved label.
1847        let jump_addr = b.emit_jump_to_label(Opcode::Goto, 0, 0, label, P4::None, 0);
1848        // p2 should already be patched to 1.
1849        assert_eq!(b.op_at(jump_addr).unwrap().p2, 1);
1850
1851        let prog = b.finish().unwrap();
1852        assert_eq!(prog.len(), 2);
1853    }
1854
1855    // ── test_builder_register_via_builder ────────────────────────────────
1856    #[test]
1857    fn test_builder_register_via_builder() {
1858        let mut b = ProgramBuilder::new();
1859        let r1 = b.alloc_reg();
1860        let r2 = b.alloc_reg();
1861        let block = b.alloc_regs(4);
1862        assert_eq!(r1, 1);
1863        assert_eq!(r2, 2);
1864        assert_eq!(block, 3);
1865        assert_eq!(b.register_count(), 6);
1866
1867        // Temp allocation.
1868        let t1 = b.alloc_temp();
1869        assert_eq!(t1, 7);
1870        b.free_temp(t1);
1871        let t2 = b.alloc_temp();
1872        assert_eq!(t2, t1); // reused
1873    }
1874
1875    // ── test_resolve_label_to_specific_address ──────────────────────────
1876    #[test]
1877    fn test_resolve_label_to_specific_address() {
1878        let mut b = ProgramBuilder::new();
1879        let label = b.emit_label();
1880        let jump_addr = b.emit_jump_to_label(Opcode::Goto, 0, 0, label, P4::None, 0);
1881        b.emit_op(Opcode::Noop, 0, 0, 0, P4::None, 0);
1882        b.emit_op(Opcode::Noop, 0, 0, 0, P4::None, 0);
1883
1884        // Resolve to a specific address (not current).
1885        b.resolve_label_to(label, 42);
1886        assert_eq!(b.op_at(jump_addr).unwrap().p2, 42);
1887    }
1888
1889    // ── test_empty_program_finishes ─────────────────────────────────────
1890    #[test]
1891    fn test_empty_program_finishes() {
1892        let b = ProgramBuilder::new();
1893        let prog = b.finish().unwrap();
1894        assert!(prog.is_empty());
1895        assert_eq!(prog.register_count(), 0);
1896    }
1897
1898    // ── test_unreferenced_unresolved_label_ok ───────────────────────────
1899    #[test]
1900    fn test_unreferenced_unresolved_label_ok() {
1901        // A label that was created but never referenced or resolved should
1902        // not cause an error (it's unused, not a dangling reference).
1903        let mut b = ProgramBuilder::new();
1904        let _label = b.emit_label();
1905        b.emit_op(Opcode::Halt, 0, 0, 0, P4::None, 0);
1906        let prog = b.finish().unwrap();
1907        assert_eq!(prog.len(), 1);
1908    }
1909
1910    // ── PRAGMA handling (bd-iwu.5) ───────────────────────────────────────
1911
1912    #[cfg(not(target_arch = "wasm32"))]
1913    use std::fs;
1914
1915    use fsqlite_ast::Statement;
1916    use fsqlite_error::FrankenError;
1917    use fsqlite_mvcc::{BeginKind, MvccError, TransactionManager};
1918    use fsqlite_parser::Parser;
1919    use fsqlite_types::{CommitSeq, ObjectId, Oti, PageData, PageNumber, PageSize};
1920    use fsqlite_wal::{
1921        DEFAULT_RAPTORQ_REPAIR_SYMBOLS, WalFecGroupMeta, WalFecGroupMetaInit, WalFecGroupRecord,
1922        WalFecRecoveryOutcome, WalFrameCandidate, WalSalts, append_wal_fec_group,
1923        build_source_page_hashes, generate_wal_fec_repair_symbols,
1924        recover_wal_fec_group_with_decoder, scan_wal_fec,
1925    };
1926    #[cfg(not(target_arch = "wasm32"))]
1927    use tempfile::tempdir;
1928
1929    fn parse_pragma(sql: &str) -> std::result::Result<fsqlite_ast::PragmaStatement, String> {
1930        let mut p = Parser::from_sql(sql);
1931        let stmt = p.parse_statement().expect("parse statement");
1932        match stmt {
1933            Statement::Pragma(p) => Ok(p),
1934            other => Err(format!("expected PRAGMA, got: {other:?}")),
1935        }
1936    }
1937
1938    fn test_page(first_byte: u8) -> PageData {
1939        let mut page = PageData::zeroed(PageSize::DEFAULT);
1940        page.as_bytes_mut()[0] = first_byte;
1941        page
1942    }
1943
1944    fn make_source_pages(seed: u8, k_source: u32) -> Vec<Vec<u8>> {
1945        let page_len = usize::try_from(PageSize::DEFAULT.get()).expect("page size fits usize");
1946        (0..k_source)
1947            .map(|idx| {
1948                let idx_u8 = u8::try_from(idx).expect("test k_source fits u8");
1949                let mut page = vec![seed.wrapping_add(idx_u8); page_len];
1950                page[0] = idx_u8;
1951                page
1952            })
1953            .collect()
1954    }
1955
1956    fn make_wal_fec_group(
1957        start_frame_no: u32,
1958        r_repair: u8,
1959        seed: u8,
1960    ) -> (WalFecGroupRecord, Vec<Vec<u8>>) {
1961        let k_source = 5_u32;
1962        let source_pages = make_source_pages(seed, k_source);
1963        let page_size = PageSize::DEFAULT.get();
1964        let source_hashes = build_source_page_hashes(&source_pages);
1965        let page_numbers = (0..k_source).map(|i| 10 + i).collect::<Vec<_>>();
1966        let oti = Oti {
1967            f: u64::from(k_source) * u64::from(page_size),
1968            al: 1,
1969            t: page_size,
1970            z: 1,
1971            n: 1,
1972        };
1973        let meta = WalFecGroupMeta::from_init(WalFecGroupMetaInit {
1974            wal_salt1: 0xA11C_E001,
1975            wal_salt2: 0xA11C_E002,
1976            start_frame_no,
1977            end_frame_no: start_frame_no + (k_source - 1),
1978            db_size_pages: 256,
1979            page_size,
1980            k_source,
1981            r_repair: u32::from(r_repair),
1982            oti,
1983            object_id: ObjectId::from_bytes([seed; 16]),
1984            page_numbers,
1985            source_page_xxh3_128: source_hashes,
1986        })
1987        .expect("meta");
1988        let repair_symbols =
1989            generate_wal_fec_repair_symbols(&meta, &source_pages).expect("symbols");
1990        (
1991            WalFecGroupRecord::new(meta, repair_symbols).expect("group"),
1992            source_pages,
1993        )
1994    }
1995
1996    #[test]
1997    fn test_pragma_serializable_query_returns_current_setting() {
1998        let mut mgr = TransactionManager::new(PageSize::DEFAULT);
1999
2000        let stmt = parse_pragma("PRAGMA fsqlite.serializable").expect("parse pragma");
2001        let out = pragma::apply(&mut mgr, &stmt).unwrap();
2002        assert_eq!(out, pragma::PragmaOutput::Bool(true));
2003    }
2004
2005    #[test]
2006    fn test_connection_pragma_differential_views_default_query_returns_false() {
2007        let mut state = pragma::ConnectionPragmaState::default();
2008
2009        let stmt = parse_pragma("PRAGMA fsqlite_differential_views").expect("parse pragma");
2010        let out = pragma::apply_connection_pragma(&mut state, &stmt).expect("query pragma");
2011        assert_eq!(out, pragma::PragmaOutput::Bool(false));
2012    }
2013
2014    #[test]
2015    fn test_connection_pragma_differential_views_set_and_query_across_aliases() {
2016        let mut state = pragma::ConnectionPragmaState::default();
2017
2018        let set_on = parse_pragma("PRAGMA fsqlite.differential_views = ON").expect("parse pragma");
2019        assert_eq!(
2020            pragma::apply_connection_pragma(&mut state, &set_on).expect("set pragma"),
2021            pragma::PragmaOutput::Bool(true)
2022        );
2023        assert!(state.differential_views.is_enabled());
2024
2025        let query = parse_pragma("PRAGMA fsqlite_differential_views").expect("parse pragma");
2026        assert_eq!(
2027            pragma::apply_connection_pragma(&mut state, &query).expect("query pragma"),
2028            pragma::PragmaOutput::Bool(true)
2029        );
2030    }
2031
2032    #[test]
2033    fn test_connection_pragma_differential_views_rejects_non_boolean_values() {
2034        let mut state = pragma::ConnectionPragmaState::default();
2035
2036        let stmt = parse_pragma("PRAGMA fsqlite_differential_views = 2").expect("parse pragma");
2037        assert!(matches!(
2038            pragma::apply_connection_pragma(&mut state, &stmt),
2039            Err(FrankenError::TypeMismatch { .. })
2040        ));
2041    }
2042
2043    #[test]
2044    fn test_pragma_serializable_set_and_query() {
2045        let mut mgr = TransactionManager::new(PageSize::DEFAULT);
2046
2047        let set_off = parse_pragma("PRAGMA fsqlite.serializable = OFF").expect("parse pragma");
2048        assert_eq!(
2049            pragma::apply(&mut mgr, &set_off).unwrap(),
2050            pragma::PragmaOutput::Bool(false)
2051        );
2052
2053        let query = parse_pragma("PRAGMA fsqlite.serializable").expect("parse pragma");
2054        assert_eq!(
2055            pragma::apply(&mut mgr, &query).unwrap(),
2056            pragma::PragmaOutput::Bool(false)
2057        );
2058    }
2059
2060    #[test]
2061    fn test_pragma_scope_per_connection_via_handler() {
2062        let mut conn_a = TransactionManager::new(PageSize::DEFAULT);
2063        let mut conn_b = TransactionManager::new(PageSize::DEFAULT);
2064
2065        let set_off = parse_pragma("PRAGMA fsqlite.serializable = OFF").expect("parse pragma");
2066        let _ = pragma::apply(&mut conn_a, &set_off).unwrap();
2067
2068        let query = parse_pragma("PRAGMA fsqlite.serializable").expect("parse pragma");
2069        assert_eq!(
2070            pragma::apply(&mut conn_a, &query).unwrap(),
2071            pragma::PragmaOutput::Bool(false)
2072        );
2073        assert_eq!(
2074            pragma::apply(&mut conn_b, &query).unwrap(),
2075            pragma::PragmaOutput::Bool(true)
2076        );
2077    }
2078
2079    #[test]
2080    fn test_pragma_not_retroactive_to_active_txn_via_handler() {
2081        let mut mgr = TransactionManager::new(PageSize::DEFAULT);
2082
2083        let mut txn = mgr.begin(BeginKind::Concurrent).unwrap();
2084        mgr.write_page(&mut txn, PageNumber::new(1).unwrap(), test_page(0x01))
2085            .unwrap();
2086        txn.has_in_rw = true;
2087        txn.has_out_rw = true;
2088        assert!(txn.has_dangerous_structure());
2089
2090        // Flip OFF mid-txn; this must not affect the already-begun transaction.
2091        let set_off = parse_pragma("PRAGMA fsqlite.serializable = OFF").expect("parse pragma");
2092        let _ = pragma::apply(&mut mgr, &set_off).unwrap();
2093
2094        assert_eq!(
2095            mgr.commit(&mut txn).unwrap_err(),
2096            MvccError::BusySnapshot,
2097            "PRAGMA change must not be retroactive to an active txn"
2098        );
2099    }
2100
2101    #[test]
2102    fn test_e2e_serializable_pragma_switch_changes_behavior() {
2103        let mut mgr = TransactionManager::new(PageSize::DEFAULT);
2104
2105        // Run workload with serializable=ON: must abort on dangerous structure.
2106        let set_on = parse_pragma("PRAGMA fsqlite.serializable = ON").expect("parse pragma");
2107        let _ = pragma::apply(&mut mgr, &set_on).unwrap();
2108
2109        let mut txn_on = mgr.begin(BeginKind::Concurrent).unwrap();
2110        mgr.write_page(&mut txn_on, PageNumber::new(1).unwrap(), test_page(0x10))
2111            .unwrap();
2112        txn_on.has_in_rw = true;
2113        txn_on.has_out_rw = true;
2114        assert_eq!(
2115            mgr.commit(&mut txn_on).unwrap_err(),
2116            MvccError::BusySnapshot,
2117            "serializable=ON must enforce SSI (abort)"
2118        );
2119
2120        // Run the same workload with serializable=OFF: must commit (plain SI).
2121        let set_off = parse_pragma("PRAGMA fsqlite.serializable = OFF").expect("parse pragma");
2122        let _ = pragma::apply(&mut mgr, &set_off).unwrap();
2123
2124        let mut txn_off = mgr.begin(BeginKind::Concurrent).unwrap();
2125        mgr.write_page(&mut txn_off, PageNumber::new(2).unwrap(), test_page(0x20))
2126            .unwrap();
2127        txn_off.has_in_rw = true;
2128        txn_off.has_out_rw = true;
2129
2130        let seq = mgr.commit(&mut txn_off).unwrap();
2131        assert!(
2132            seq > CommitSeq::ZERO,
2133            "serializable=OFF must allow write skew"
2134        );
2135    }
2136
2137    #[test]
2138    fn test_pragma_raptorq_repair_symbols_default_query() {
2139        let mut mgr = TransactionManager::new(PageSize::DEFAULT);
2140        let query = parse_pragma("PRAGMA raptorq_repair_symbols").expect("parse query");
2141        assert_eq!(
2142            pragma::apply(&mut mgr, &query).expect("query pragma"),
2143            pragma::PragmaOutput::Int(i64::from(DEFAULT_RAPTORQ_REPAIR_SYMBOLS))
2144        );
2145    }
2146
2147    #[cfg(not(target_arch = "wasm32"))]
2148    #[test]
2149    fn test_bd_1hi_12_unit_compliance_gate() {
2150        let dir = tempdir().expect("tempdir");
2151        let sidecar = dir.path().join("unit.wal-fec");
2152        let db_path = dir.path().join("unit.db");
2153        fs::write(&db_path, vec![0_u8; 100]).expect("seed db header");
2154
2155        let mut conn_a = TransactionManager::new(PageSize::DEFAULT);
2156        let mut conn_b = TransactionManager::new(PageSize::DEFAULT);
2157
2158        let query = parse_pragma("PRAGMA raptorq_repair_symbols").expect("parse query");
2159        assert_eq!(
2160            pragma::apply_with_sidecar(&mut conn_a, &query, Some(&sidecar)).expect("query default"),
2161            pragma::PragmaOutput::Int(i64::from(DEFAULT_RAPTORQ_REPAIR_SYMBOLS))
2162        );
2163
2164        let set_max = parse_pragma("PRAGMA raptorq_repair_symbols = 255").expect("parse set max");
2165        assert_eq!(
2166            pragma::apply_with_sidecar(&mut conn_a, &set_max, Some(&sidecar)).expect("set max"),
2167            pragma::PragmaOutput::Int(255)
2168        );
2169
2170        let set_too_high =
2171            parse_pragma("PRAGMA raptorq_repair_symbols = 256").expect("parse set too high");
2172        assert!(matches!(
2173            pragma::apply_with_sidecar(&mut conn_a, &set_too_high, Some(&sidecar)),
2174            Err(FrankenError::OutOfRange { .. })
2175        ));
2176
2177        let set_negative =
2178            parse_pragma("PRAGMA raptorq_repair_symbols = -1").expect("parse set negative");
2179        assert!(matches!(
2180            pragma::apply_with_sidecar(&mut conn_a, &set_negative, Some(&sidecar)),
2181            Err(FrankenError::OutOfRange { .. })
2182        ));
2183
2184        let set_non_integer =
2185            parse_pragma("PRAGMA raptorq_repair_symbols = ON").expect("parse set non-integer");
2186        assert!(matches!(
2187            pragma::apply_with_sidecar(&mut conn_a, &set_non_integer, Some(&sidecar)),
2188            Err(FrankenError::TypeMismatch { .. })
2189        ));
2190
2191        let query_new_conn = parse_pragma("PRAGMA raptorq_repair_symbols").expect("parse query");
2192        assert_eq!(
2193            pragma::apply_with_sidecar(&mut conn_b, &query_new_conn, Some(&sidecar))
2194                .expect("query persisted value"),
2195            pragma::PragmaOutput::Int(255)
2196        );
2197
2198        let set_shared = parse_pragma("PRAGMA raptorq_repair_symbols = 7").expect("parse shared");
2199        let _ = pragma::apply_with_sidecar(&mut conn_a, &set_shared, Some(&sidecar))
2200            .expect("persist shared setting");
2201        assert_eq!(
2202            pragma::apply_with_sidecar(&mut conn_b, &query_new_conn, Some(&sidecar))
2203                .expect("cross-connection visibility"),
2204            pragma::PragmaOutput::Int(7)
2205        );
2206
2207        let db_bytes = fs::read(&db_path).expect("read db header");
2208        assert!(
2209            db_bytes[72..92].iter().all(|&byte| byte == 0),
2210            "sqlite header reserved bytes must remain untouched"
2211        );
2212    }
2213
2214    #[cfg(not(target_arch = "wasm32"))]
2215    #[test]
2216    fn prop_bd_1hi_12_structure_compliance() {
2217        let dir = tempdir().expect("tempdir");
2218        let sidecar = dir.path().join("property.wal-fec");
2219        let mut mgr = TransactionManager::new(PageSize::DEFAULT);
2220        let query = parse_pragma("PRAGMA raptorq_repair_symbols").expect("parse query");
2221
2222        for value in 0_u16..=255_u16 {
2223            let sql = format!("PRAGMA raptorq_repair_symbols = {value}");
2224            let set_stmt = parse_pragma(&sql).expect("parse set statement");
2225            assert_eq!(
2226                pragma::apply_with_sidecar(&mut mgr, &set_stmt, Some(&sidecar)).expect("set value"),
2227                pragma::PragmaOutput::Int(i64::from(value))
2228            );
2229            assert_eq!(
2230                pragma::apply_with_sidecar(&mut mgr, &query, Some(&sidecar)).expect("query value"),
2231                pragma::PragmaOutput::Int(i64::from(value))
2232            );
2233        }
2234    }
2235
2236    #[cfg(not(target_arch = "wasm32"))]
2237    #[test]
2238    #[allow(clippy::too_many_lines)]
2239    fn test_e2e_bd_1hi_12_compliance() {
2240        let dir = tempdir().expect("tempdir");
2241        let sidecar = dir.path().join("e2e.wal-fec");
2242        let mut mgr = TransactionManager::new(PageSize::DEFAULT);
2243
2244        let set_zero = parse_pragma("PRAGMA raptorq_repair_symbols = 0").expect("parse set 0");
2245        let _ = pragma::apply_with_sidecar(&mut mgr, &set_zero, Some(&sidecar)).expect("set 0");
2246        if mgr.raptorq_repair_symbols() > 0 {
2247            let (group, _) = make_wal_fec_group(1, mgr.raptorq_repair_symbols(), 0x10);
2248            append_wal_fec_group(&sidecar, &group).expect("append group");
2249        }
2250        let after_zero = scan_wal_fec(&sidecar).expect("scan after zero");
2251        assert!(
2252            after_zero.groups.is_empty(),
2253            "N=0 must produce no .wal-fec groups for new commits"
2254        );
2255
2256        let set_one = parse_pragma("PRAGMA raptorq_repair_symbols = 1").expect("parse set 1");
2257        let _ = pragma::apply_with_sidecar(&mut mgr, &set_one, Some(&sidecar)).expect("set 1");
2258        let (group_r1, _) = make_wal_fec_group(1, mgr.raptorq_repair_symbols(), 0x11);
2259        append_wal_fec_group(&sidecar, &group_r1).expect("append r=1 group");
2260
2261        let set_two = parse_pragma("PRAGMA raptorq_repair_symbols = 2").expect("parse set 2");
2262        let _ = pragma::apply_with_sidecar(&mut mgr, &set_two, Some(&sidecar)).expect("set 2");
2263        let (group_r2, _) = make_wal_fec_group(6, mgr.raptorq_repair_symbols(), 0x22);
2264        append_wal_fec_group(&sidecar, &group_r2).expect("append r=2 group");
2265
2266        let set_four = parse_pragma("PRAGMA raptorq_repair_symbols = 4").expect("parse set 4");
2267        let _ = pragma::apply_with_sidecar(&mut mgr, &set_four, Some(&sidecar)).expect("set 4");
2268        let (group_r4, source_pages_r4) =
2269            make_wal_fec_group(11, mgr.raptorq_repair_symbols(), 0x33);
2270        append_wal_fec_group(&sidecar, &group_r4).expect("append r=4 group");
2271
2272        let scan = scan_wal_fec(&sidecar).expect("scan sidecar");
2273        assert_eq!(scan.groups.len(), 3);
2274        assert_eq!(scan.groups[0].repair_symbols.len(), 1);
2275        assert_eq!(scan.groups[1].repair_symbols.len(), 2);
2276        assert_eq!(scan.groups[2].repair_symbols.len(), 4);
2277        assert_eq!(scan.groups[1].meta.r_repair, 2);
2278        assert_eq!(scan.groups[2].meta.r_repair, 4);
2279
2280        let group_id = group_r4.meta.group_id();
2281        let wal_salts = WalSalts {
2282            salt1: group_r4.meta.wal_salt1,
2283            salt2: group_r4.meta.wal_salt2,
2284        };
2285        let k_source = usize::try_from(group_r4.meta.k_source).expect("k fits usize");
2286
2287        let mut corrupt_three_frames = Vec::new();
2288        for (idx, page) in source_pages_r4.iter().enumerate() {
2289            let mut payload = page.clone();
2290            if idx < 3 {
2291                payload[0] ^= 0xFF;
2292            }
2293            corrupt_three_frames.push(WalFrameCandidate {
2294                frame_no: group_r4.meta.start_frame_no + u32::try_from(idx).expect("idx fits u32"),
2295                page_data: payload,
2296            });
2297        }
2298        let expected_pages = source_pages_r4.clone();
2299        let recovered = recover_wal_fec_group_with_decoder(
2300            &sidecar,
2301            group_id,
2302            wal_salts,
2303            group_r4.meta.start_frame_no,
2304            &corrupt_three_frames,
2305            move |meta: &WalFecGroupMeta, symbols| {
2306                if symbols.len() < usize::try_from(meta.k_source).expect("k fits usize") {
2307                    return Err(FrankenError::WalCorrupt {
2308                        detail: "insufficient symbols".to_owned(),
2309                    });
2310                }
2311                Ok(expected_pages.clone())
2312            },
2313        )
2314        .expect("recover with <=R corruption");
2315        assert!(
2316            matches!(recovered, WalFecRecoveryOutcome::Recovered(_)),
2317            "expected recovered outcome"
2318        );
2319        let WalFecRecoveryOutcome::Recovered(group) = recovered else {
2320            unreachable!("asserted recovered outcome above");
2321        };
2322        assert_eq!(group.recovered_pages.len(), k_source);
2323
2324        let mut corrupt_five_frames = Vec::new();
2325        for (idx, page) in source_pages_r4.iter().enumerate() {
2326            let mut payload = page.clone();
2327            payload[0] ^= 0x55;
2328            corrupt_five_frames.push(WalFrameCandidate {
2329                frame_no: group_r4.meta.start_frame_no + u32::try_from(idx).expect("idx fits u32"),
2330                page_data: payload,
2331            });
2332        }
2333        let truncated = recover_wal_fec_group_with_decoder(
2334            &sidecar,
2335            group_id,
2336            wal_salts,
2337            group_r4.meta.start_frame_no,
2338            &corrupt_five_frames,
2339            |_meta: &WalFecGroupMeta, _symbols| {
2340                Err(FrankenError::WalCorrupt {
2341                    detail: "decoder should not be able to recover".to_owned(),
2342                })
2343            },
2344        )
2345        .expect("recover with >R corruption");
2346        assert!(matches!(
2347            truncated,
2348            WalFecRecoveryOutcome::TruncateBeforeGroup { .. }
2349        ));
2350    }
2351}