Skip to main content

spg_embedded/
lib.rs

1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//!     println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//!   crash-consistent WAL + periodic checkpoint snapshot. The
32//!   on-disk format is byte-identical to what `spg-server`
33//!   produces, so a database can move between modes without
34//!   conversion.
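//! - **Durability**: by default every `execute()` that mutates
//!   waits for its WAL record to be `fsync`ed before returning
//!   `Ok`. A lone writer therefore pays one fsync per commit;
//!   since v7.20, commits that arrive concurrently share a single
//!   fsync (WAL group commit), and `SPG_SYNCHRONOUS_COMMIT=off`
//!   trades a bounded crash window for lower commit latency. If
//!   you need batch throughput, wrap multiple statements in
//!   [`Database::with_transaction`] which fsyncs only at
//!   commit.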
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//!   Share across threads via `Arc<Mutex<Database>>`. The
43//!   single-writer model is intentional — see
44//!   [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//!   moves cold rows to disk-resident segments while you keep
47//!   serving requests. It runs in a dedicated thread; drop the
48//!   returned [`FreezerHandle`] (or call `stop()`) for clean
49//!   shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//!   [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//!   them with a wildcard arm so future v7.x releases can add
53//!   variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//!   Malformed SQL, type mismatches, missing tables — all
59//!   return `Err(EngineError::…)`. If you observe a panic on
60//!   a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//!   violations (e.g., catalog snapshot magic mismatch, WAL
63//!   record CRC sentinel corruption that survived the boot-
64//!   time validation). These represent silent disk corruption
65//!   and an unwind would leak inconsistent state, so the
66//!   release profile uses `panic = abort` — your host process
67//!   dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//!   `--profile release-dbg` (keeps unwind tables) and use
70//!   `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
/// v7.16.0 — handle for a parsed-and-planned SQL statement.
///
/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
/// with a `&[Value]` slice carrying the bind parameters (PG-style
/// `$1`, `$2`, … positional). Cheap to `Clone`; the underlying AST
/// is shared by handle copies and cloned per bind call by the
/// engine's executor.
///
/// The handle holds a snapshot of the AST at prepare time. If
/// the engine's plan cache evicts the entry between prepare and
/// execute (e.g. ANALYZE bumps the statistics version) the
/// stored AST keeps working — `execute_prepared` operates on
/// the handle's clone, not the cache entry.
#[derive(Debug, Clone)]
pub struct Statement {
    /// The parsed + planned AST. `spg-engine::prepare_cached`
    /// returns it as a clone of the cached plan, so any rewrite
    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
    /// already run.
    pub(crate) stmt: ParsedStatement,
    /// Original SQL source, kept for `Display` / debug only
    /// (exposed read-only through [`Statement::sql`]).
    /// WAL persistence renders from the AST so a bind-time
    /// rewrite of `$1..$N` survives replay.
    pub(crate) sql: String,
}
106
107impl Statement {
108    /// Borrow the original SQL source — useful for tracing and
109    /// debug logs. WAL replay does NOT use this; it serialises
110    /// the bind-final AST instead.
111    #[must_use]
112    pub fn sql(&self) -> &str {
113        &self.sql
114    }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127    let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::Write;
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
/// v7.11.3 — wall-clock provider injected into every embedded
/// `Engine`, used by SQL's `NOW()` / `CURRENT_TIMESTAMP` /
/// `CURRENT_DATE` rewrite layer so PG-idiomatic time queries work
/// without the caller wiring their own clock.
///
/// Returns microseconds since the Unix epoch. Two edge cases:
/// a system clock set before 1970 yields `0`, and a far-future
/// clock whose microsecond count does not fit is clamped to
/// `i64::MAX`.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        // Clock is before the epoch.
        Err(_) => 0,
    }
}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Kept private so callers can't mis-frame records; the v3 layout
// is the same the server uses, so a `spg-server` boot can read a
// database an embedded process wrote and vice versa.
/// High bit OR-ed into a record's `u32` length header by the
/// record encoders in this file.
const WAL_V2_SENTINEL: u32 = 0x8000_0000;
/// Second-highest bit OR-ed into the same length header; set
/// together with [`WAL_V2_SENTINEL`] by the v3 / v4 encoders.
const WAL_V3_FLAG: u32 = 0x4000_0000;
/// Type byte of a v3 auto-commit SQL record (payload = SQL text,
/// no LSN / timestamp prefix).
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v7.18 — durability checkpoint marker stays at 0x02 (skipped on replay).
const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v7.18 PITR — auto-commit-sql record with appended (commit_lsn,
/// commit_unix_us) fields so replay can target a specific point in
/// time. Backward-compat: v3 records (type 0x01) keep working, the
/// envelope flag bits are unchanged. The new type byte is the
/// schema-version discriminator.
const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
/// v7.18 — sentinel for "no wall clock" inside a v4 record's
/// commit_unix_us slot. Restore-to-timestamp skips records with
/// this sentinel (no time anchor); LSN-based restore is
/// unaffected.
const WAL_V4_NO_CLOCK: i64 = i64::MIN;
/// v7.18 — extra header bytes after the type byte in a v4 record:
/// 8 bytes commit_lsn (u64 LE) + 8 bytes commit_unix_us (i64 LE).
const WAL_V4_EXTRA_HEADER: usize = 16;
/// v7.18 PITR — checkpoint anchor record written to the WAL *before*
/// the snapshot file replaces the on-disk catalog. Carries the
/// (lsn, ts, snapshot_path) triple so restore tooling can find the
/// matching base snapshot without scanning the filesystem. Replay
/// dispatch skips it (same as the v3 durability marker).
const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;

/// v7.21 (mailrs embed round-12 polish) — one COMMITted explicit
/// transaction, flushed atomically at COMMIT time. Payload = the
/// transaction's bind-final mutation statements joined with `";\n"`;
/// replay re-splits via [`split_statements`] and applies in order.
/// Same 16-byte (commit_lsn, commit_unix_us) prefix as the v4
/// auto-commit record. The record is CRC-framed like every other
/// record, so replay applies the whole transaction or — torn tail —
/// none of it; a transaction can never half-resurrect.
///
/// Why it exists: in-transaction mutations only touch the engine's
/// shadow catalog (`modified_catalog: false`), so the per-statement
/// auto-commit append never fired and a COMMIT followed by a crash
/// (no graceful Drop checkpoint) lost the transaction.
const WAL_V4_TYPE_TX_COMMIT_SQL: u8 = 0x12;
197
/// v7.1 — auto-checkpoint threshold in bytes. Once the WAL grows
/// past this size, the next successful `execute()` call ends with
/// a `checkpoint()` so the WAL stays bounded.
///
/// Read from the `SPG_EMBEDDED_CHECKPOINT_BYTES` environment
/// variable on every call. An unset, unparsable, or zero value
/// falls back to 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;
    match std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(bytes) if bytes > 0 => bytes,
            _ => FALLBACK_BYTES,
        },
        Err(_) => FALLBACK_BYTES,
    }
}
209
210/// v7.30.3 (mailrs round-26) — per-query byte budget on join/filter
211/// materialisation, default ON at 256 MiB for embed parity with the
212/// server's allocator-level `SPG_MAX_QUERY_BYTES` default. A fat
213/// backfill batch (1000 × full mail bodies) then errors with
214/// `QueryBytesExceeded` instead of walking the host into reclaim
215/// livelock. `SPG_MAX_QUERY_BYTES=0` disables; any other value
216/// overrides. NOT applied to the WAL-replay engine — replay must
217/// never fail on a tuning knob.
218fn engine_with_query_byte_budget(engine: Engine) -> Engine {
219    const DEFAULT_MAX_QUERY_BYTES: usize = 256 * 1024 * 1024;
220    match std::env::var("SPG_MAX_QUERY_BYTES")
221        .ok()
222        .and_then(|s| s.trim().parse::<usize>().ok())
223    {
224        Some(0) => engine,
225        Some(n) => engine.with_max_query_bytes(n),
226        None => engine.with_max_query_bytes(DEFAULT_MAX_QUERY_BYTES),
227    }
228}
229
230/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
231///
232/// ```text
233/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
234/// [u32 LE crc32 over (type_byte || sql_bytes)]
235/// [u8 type = 0x01]
236/// [sql bytes]
237/// ```
238fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
239    let payload = sql.as_bytes();
240    let mut crc_buf = Vec::with_capacity(1 + payload.len());
241    crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
242    crc_buf.extend_from_slice(payload);
243    let crc = spg_crypto::crc32::crc32(&crc_buf);
244    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
245    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
246    out.extend_from_slice(&header);
247    out.extend_from_slice(&crc.to_le_bytes());
248    out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
249    out.extend_from_slice(payload);
250    out
251}
252
/// v7.20 P2 — WAL group-commit. N concurrent commits share one
/// fsync (the 4.2 ms p50 that profile_breakdown measured as
/// 99.2% of the durable write path).
///
/// Leader-follower protocol, same family as PG's group commit:
///
/// 1. `enqueue(record)` — called while the caller still holds
///    the engine's write lock. Appends the encoded record to the
///    shared buffer, returns a sequence ticket. O(memcpy).
/// 2. Caller RELEASES the engine write lock (the next writer's
///    mutation proceeds in parallel with this batch's fsync).
/// 3. `wait_flushed(seq)` — if nobody is flushing, the caller
///    elects itself leader: swaps the buffer out, writes +
///    fsyncs ONCE for every record in the batch, marks the
///    batch durable, wakes all followers. Otherwise it parks on
///    the condvar until a leader covers its seq.
///
/// Durability contract is unchanged from v7.19: `execute()`
/// does not return Ok until the record that describes its
/// mutation is fsynced. The only change is N callers sharing
/// one fsync instead of paying one each.
///
/// Lock order (deadlock-free): `state` then `file`; never the
/// reverse. The leader holds `file` WITHOUT `state` during IO so
/// enqueues continue while fsync runs.
#[derive(Debug)]
struct WalGroup {
    /// Pending batch plus sequence / failure bookkeeping; see
    /// [`WalGroupState`].
    state: Mutex<WalGroupState>,
    /// Paired with `state`: followers park here until a leader
    /// publishes a new `flushed_seq` or a sticky failure.
    cond: std::sync::Condvar,
    /// Active chunk file handle. Separate lock from `state` so
    /// the leader's write+fsync doesn't block concurrent
    /// enqueues. Swapped by `checkpoint()` at rotation.
    file: Mutex<File>,
}
287
/// Mutable bookkeeping for [`WalGroup`], always accessed under
/// `WalGroup::state`.
#[derive(Debug)]
struct WalGroupState {
    /// Encoded records awaiting flush.
    buf: Vec<u8>,
    /// Monotonic enqueue counter (1-based).
    enqueued_seq: u64,
    /// Highest seq whose record is fsynced.
    flushed_seq: u64,
    /// True while some caller is inside the leader IO section.
    leader_active: bool,
    /// Sticky fatal error — a failed fsync poisons the WAL
    /// (loud, never silent). All current + future waiters error.
    /// Holds the IO error rendered as text.
    failed: Option<String>,
    /// Bytes written to the active chunk since rotation —
    /// drives the auto-checkpoint trigger. Seeded with the
    /// chunk's initial length and reset to 0 on rotation.
    written_len: u64,
}
305
/// Ticket returned by the buffered write path; `wait()` blocks
/// until the record it covers is durable (or the WAL is
/// poisoned). Cheap to move across threads.
#[derive(Debug)]
pub struct WalTicket {
    /// The group-commit queue the record was enqueued on.
    group: Arc<WalGroup>,
    /// Enqueue sequence number of the covered record (the value
    /// `wait()` hands to `WalGroup::wait_flushed`).
    seq: u64,
}
314
impl WalGroup {
    /// Build a group around an already-open chunk `file`.
    /// `initial_len` seeds `written_len` (the byte count used by
    /// the auto-checkpoint trigger); all sequence counters start
    /// at 0 and nothing is pending.
    fn new(file: File, initial_len: u64) -> Self {
        Self {
            state: Mutex::new(WalGroupState {
                buf: Vec::new(),
                enqueued_seq: 0,
                flushed_seq: 0,
                leader_active: false,
                failed: None,
                written_len: initial_len,
            }),
            cond: std::sync::Condvar::new(),
            file: Mutex::new(file),
        }
    }

    /// Append `record` to the pending batch. Returns the seq the
    /// caller must wait on (the first record gets seq 1). Called
    /// under the engine write lock — keep it O(memcpy).
    ///
    /// # Panics
    /// If the `state` mutex is poisoned.
    fn enqueue(&self, record: &[u8]) -> u64 {
        let mut g = self.state.lock().expect("wal state poisoned");
        g.buf.extend_from_slice(record);
        g.enqueued_seq += 1;
        g.enqueued_seq
    }

    /// Block until `seq` is durable. Leader-follower: the first
    /// arriving waiter flushes for everyone.
    ///
    /// # Errors
    /// Returns `EngineError::Storage(Corrupt(..))` once any flush
    /// has failed — the failure is sticky, so this also fires for
    /// callers arriving after the failing batch.
    ///
    /// # Panics
    /// If the `state` / `file` mutex or the condvar is poisoned.
    fn wait_flushed(&self, seq: u64) -> Result<(), EngineError> {
        let mut g = self.state.lock().expect("wal state poisoned");
        loop {
            // Poison check comes first so a failed WAL never
            // reports success, even for an already-covered seq.
            if let Some(e) = &g.failed {
                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
                    format!("WAL poisoned by earlier flush failure: {e}"),
                )));
            }
            if g.flushed_seq >= seq {
                return Ok(());
            }
            if !g.leader_active {
                // Elect self leader. The flag is set before the
                // state lock is released, so only one leader runs.
                g.leader_active = true;
                drop(g);
                // v7.20 — commit_delay (PG's same-named knob):
                // before taking the batch, give in-flight
                // writers a short window to enqueue so the
                // shared fsync covers more commits. 150 µs costs
                // ~3.5% on a solo 4.2 ms fsync but multiplies
                // batch size under load. Tunable via
                // SPG_COMMIT_DELAY_US (0 disables).
                let delay = commit_delay_us();
                if delay > 0 {
                    std::thread::sleep(std::time::Duration::from_micros(delay));
                }
                // Take the whole pending buffer and remember the
                // highest seq it contains, atomically under `state`.
                let (batch, flush_to) = {
                    let mut g2 = self.state.lock().expect("wal state poisoned");
                    (core::mem::take(&mut g2.buf), g2.enqueued_seq)
                };
                // IO runs holding only `file`, never `state`, so
                // enqueues proceed while the fsync is in flight.
                let io_result: std::io::Result<()> = (|| {
                    let mut f = self.file.lock().expect("wal file poisoned");
                    f.write_all(&batch)?;
                    f.sync_data()
                })();
                g = self.state.lock().expect("wal state poisoned");
                g.leader_active = false;
                match io_result {
                    Ok(()) => {
                        g.flushed_seq = flush_to;
                        g.written_len = g.written_len.saturating_add(batch.len() as u64);
                    }
                    Err(e) => {
                        // Sticky: every current and future waiter
                        // sees this on its next loop iteration.
                        g.failed = Some(e.to_string());
                    }
                }
                self.cond.notify_all();

                // Loop continues: either our seq is now covered
                // (leader path normally returns next iteration)
                // or the error branch surfaces.
                continue;
            }
            // Someone else is leading; park until it publishes.
            g = self.cond.wait(g).expect("wal condvar poisoned");
        }
    }

    /// Drain the pending batch + flush synchronously. Caller must
    /// guarantee no concurrent enqueues (checkpoint holds the
    /// engine exclusively). Used before rotation so the marker
    /// lands in the right chunk.
    ///
    /// NOTE(review): this path does not consult or set
    /// `leader_active`, so it relies entirely on the caller's
    /// exclusivity guarantee above.
    ///
    /// # Errors
    /// `Corrupt` if the WAL is already poisoned; otherwise the
    /// write / fsync error (via `io_err`), which also poisons the
    /// WAL for later callers.
    fn flush_now(&self) -> Result<(), EngineError> {
        let mut g = self.state.lock().expect("wal state poisoned");
        if let Some(e) = &g.failed {
            return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
                format!("WAL poisoned: {e}"),
            )));
        }
        let batch = core::mem::take(&mut g.buf);
        let flush_to = g.enqueued_seq;
        // Nothing pending: no write, no fsync.
        if batch.is_empty() {
            return Ok(());
        }
        // Release `state` before touching `file` (lock order).
        drop(g);
        let io: std::io::Result<()> = (|| {
            let mut f = self.file.lock().expect("wal file poisoned");
            f.write_all(&batch)?;
            f.sync_data()
        })();
        let mut g = self.state.lock().expect("wal state poisoned");
        match io {
            Ok(()) => {
                g.flushed_seq = flush_to;
                g.written_len = g.written_len.saturating_add(batch.len() as u64);
                self.cond.notify_all();
                Ok(())
            }
            Err(e) => {
                g.failed = Some(e.to_string());
                self.cond.notify_all();
                Err(io_err(e))
            }
        }
    }

    /// Swap the active chunk handle (rotation). Caller flushes
    /// first; both locks taken in canonical order. Resets the
    /// written-byte counter to 0 for the new chunk.
    fn rotate_file(&self, new_file: File) {
        let mut g = self.state.lock().expect("wal state poisoned");
        let mut f = self.file.lock().expect("wal file poisoned");
        *f = new_file;
        g.written_len = 0;
    }

    /// Bytes attributed to the active chunk: those already
    /// flushed since rotation plus those still sitting in the
    /// pending buffer.
    fn written_len(&self) -> u64 {
        let g = self.state.lock().expect("wal state poisoned");
        g.written_len + g.buf.len() as u64
    }
}
453
454impl WalTicket {
455    /// Block until the record this ticket covers is durable.
456    ///
457    /// Under `SPG_SYNCHRONOUS_COMMIT=off` this returns
458    /// immediately — the background flusher (or the next
459    /// checkpoint / clean shutdown) makes the record durable
460    /// within `SPG_WAL_WRITER_DELAY_MS`. Same contract as PG's
461    /// `synchronous_commit = off`.
462    ///
463    /// # Errors
464    /// Surfaces the leader's IO error if the batch flush failed
465    /// (the WAL is then poisoned for all subsequent writes).
466    pub fn wait(&self) -> Result<(), EngineError> {
467        if !synchronous_commit_on() {
468            return Ok(());
469        }
470        self.group.wait_flushed(self.seq)
471    }
472}
473
474/// v7.19 P3 — retention sweep loop. Runs in a dedicated thread
475/// spawned by `Database::open_path` when `SPG_PITR_RETENTION_HOURS`
476/// is set to a non-zero value. Wakes every
477/// `SPG_PITR_RETENTION_CHECK_SEC` (default 60 s), enumerates chunks
478/// under `wal_dir`, archives via `SPG_PITR_ARCHIVE_CMD` if set, and
479/// deletes anything older than `retention_hours`.
480///
481/// Loud-failure posture matches PG's `archive_command`: if the
482/// archive command returns non-zero, the chunk stays on disk and
483/// a warning prints to stderr. The retention sweep doesn't delete
484/// a chunk it failed to archive.
485fn retention_sweep_loop(
486    wal_dir: PathBuf,
487    retention_hours: u64,
488    check_interval: std::time::Duration,
489    archive_cmd: Option<String>,
490    shutdown: Arc<AtomicBool>,
491) {
492    while !shutdown.load(Ordering::SeqCst) {
493        if let Err(e) = retention_sweep_once(&wal_dir, retention_hours, archive_cmd.as_deref()) {
494            eprintln!("spg-embedded: retention sweep error: {e}");
495        }
496        // Sleep in short ticks so shutdown isn't blocked on a
497        // 60 s naptime when Drop signals.
498        let mut elapsed = std::time::Duration::ZERO;
499        let tick = std::time::Duration::from_millis(250);
500        while elapsed < check_interval {
501            if shutdown.load(Ordering::SeqCst) {
502                return;
503            }
504            std::thread::sleep(tick);
505            elapsed += tick;
506        }
507    }
508}
509
510/// v7.19 P3 — one retention sweep pass over `wal_dir`. Extracted
511/// from the loop so tests can drive it directly. Public so the
512/// e2e_pitr_retention integration test (and any future operator
513/// tooling that wants synchronous retention) can call it.
514pub fn retention_sweep_once(
515    wal_dir: &Path,
516    retention_hours: u64,
517    archive_cmd: Option<&str>,
518) -> std::io::Result<()> {
519    if !wal_dir.exists() {
520        return Ok(());
521    }
522    let now_us = wall_clock_micros();
523    let cutoff_us = (now_us as i128 - (retention_hours as i128 * 3_600 * 1_000_000)) as i64;
524    let chunks = sorted_wal_chunks(wal_dir)?;
525    for chunk in chunks {
526        // Don't sweep the most-recent chunk; it's the live one
527        // execute() is appending to. Compare against the largest
528        // filename-prefix unix_us.
529        let stem = match chunk.file_stem().and_then(|s| s.to_str()) {
530            Some(s) => s,
531            None => continue,
532        };
533        let chunk_us: i64 = stem
534            .split_once('_')
535            .and_then(|(prefix, _)| i64::from_str_radix(prefix, 16).ok())
536            .unwrap_or(0);
537        if chunk_us >= cutoff_us {
538            continue;
539        }
540        // Archive first if requested.
541        if let Some(cmd) = archive_cmd {
542            if !cmd.is_empty() {
543                let output = std::process::Command::new("sh")
544                    .arg("-c")
545                    .arg(cmd)
546                    .arg("--")
547                    .arg(&chunk)
548                    .output()?;
549                if !output.status.success() {
550                    eprintln!(
551                        "spg-embedded: SPG_PITR_ARCHIVE_CMD failed for {} (exit {}); chunk stays on disk",
552                        chunk.display(),
553                        output.status.code().unwrap_or(-1)
554                    );
555                    continue;
556                }
557            }
558        }
559        // Delete the chunk + its sibling .checksum if present.
560        if let Err(e) = std::fs::remove_file(&chunk) {
561            eprintln!(
562                "spg-embedded: retention remove {} failed: {e}",
563                chunk.display()
564            );
565            continue;
566        }
567        let mut cs = chunk.clone();
568        let mut name = cs.file_name().map(|n| n.to_os_string()).unwrap_or_default();
569        name.push(".checksum");
570        cs.set_file_name(name);
571        let _ = std::fs::remove_file(&cs);
572    }
573    Ok(())
574}
575
/// v7.20 — group-commit delay window in µs (PG `commit_delay`
/// analogue). The flush leader sleeps this long before taking
/// the batch so concurrent writers pile in.
///
/// Resolved once per process from `SPG_COMMIT_DELAY_US` and then
/// cached: `0` disables the delay; an unset or unparsable value
/// yields the 150 µs default.
fn commit_delay_us() -> u64 {
    const DEFAULT_DELAY_US: u64 = 150;
    static DELAY_US: std::sync::OnceLock<u64> = std::sync::OnceLock::new();

    let resolve = || match std::env::var("SPG_COMMIT_DELAY_US") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_DELAY_US),
        Err(_) => DEFAULT_DELAY_US,
    };
    *DELAY_US.get_or_init(resolve)
}
589
/// v7.20 — PG `synchronous_commit` analogue; `true` means commits
/// wait for their fsync.
///
/// * `on` (default): `execute()` blocks until its WAL record is
///   fsynced — zero-loss durability.
/// * `off`: `execute()` returns after the in-memory mutation +
///   WAL enqueue; a background flusher thread writes + fsyncs
///   every `SPG_WAL_WRITER_DELAY_MS` (default 200 ms — PG's
///   `wal_writer_delay` default). Crash window = up to one flush
///   interval of confirmed-but-unsynced commits — exactly the
///   trade PG documents for the same setting. Clean shutdown
///   (Drop / checkpoint) always flushes.
///
/// Resolved once per process from `SPG_SYNCHRONOUS_COMMIT` and
/// cached. Only `off` / `false` (case-insensitive) or `0` turn it
/// off; anything else, including an unset variable, keeps it on.
fn synchronous_commit_on() -> bool {
    static SYNC_ON: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *SYNC_ON.get_or_init(|| match std::env::var("SPG_SYNCHRONOUS_COMMIT") {
        Ok(raw) => {
            let disabled = raw == "0"
                || raw.eq_ignore_ascii_case("off")
                || raw.eq_ignore_ascii_case("false");
            !disabled
        }
        Err(_) => true,
    })
}
607
/// v7.20 — background WAL flusher cadence in milliseconds for
/// `SPG_SYNCHRONOUS_COMMIT=off` (PG `wal_writer_delay`).
///
/// Resolved once per process from `SPG_WAL_WRITER_DELAY_MS` and
/// cached. An unset, unparsable, or zero value yields the 200 ms
/// default.
fn wal_writer_delay_ms() -> u64 {
    const DEFAULT_DELAY_MS: u64 = 200;
    static DELAY_MS: std::sync::OnceLock<u64> = std::sync::OnceLock::new();

    *DELAY_MS.get_or_init(|| {
        let configured = std::env::var("SPG_WAL_WRITER_DELAY_MS")
            .ok()
            .and_then(|raw| raw.parse::<u64>().ok());
        match configured {
            Some(ms) if ms > 0 => ms,
            _ => DEFAULT_DELAY_MS,
        }
    })
}
620
/// PITR retention window in hours, read from
/// `SPG_PITR_RETENTION_HOURS` on every call. An unset or
/// unparsable value yields `0`.
fn pitr_retention_hours() -> u64 {
    match std::env::var("SPG_PITR_RETENTION_HOURS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(0),
        Err(_) => 0,
    }
}
627
/// Interval in seconds between retention sweeps, read from
/// `SPG_PITR_RETENTION_CHECK_SEC` on every call. An unset,
/// unparsable, or zero value yields the 60 s default.
fn pitr_retention_check_sec() -> u64 {
    const DEFAULT_CHECK_SEC: u64 = 60;
    let configured = std::env::var("SPG_PITR_RETENTION_CHECK_SEC")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured {
        Some(secs) if secs > 0 => secs,
        _ => DEFAULT_CHECK_SEC,
    }
}
635
/// Archive command for swept WAL chunks, read from
/// `SPG_PITR_ARCHIVE_CMD` on every call. Returns `None` when the
/// variable is unset, unreadable, or the empty string.
fn pitr_archive_cmd() -> Option<String> {
    match std::env::var("SPG_PITR_ARCHIVE_CMD") {
        Ok(cmd) if !cmd.is_empty() => Some(cmd),
        _ => None,
    }
}
641
642/// v7.19 — replay every record from `wal_bytes` whose
643/// `commit_lsn` is strictly greater than `floor_lsn`. v3 records
644/// (no LSN) and v4 records with `commit_lsn <= floor_lsn` are
645/// skipped — the snapshot loaded ahead of this call already
646/// reflects them, and re-applying would DuplicateTable /
647/// double-insert. v3 records inside the legacy migration chunk
648/// always apply because the migration sets `floor_lsn = 0` and
649/// v3 records carry no LSN to compare; the pre-migration
650/// behaviour (every record replays) is what the migration
651/// preserves.
652///
653/// Returns the count of records successfully applied. Same
654/// torn-tail semantics as `replay_wal_into_engine`.
655fn replay_wal_filtered(
656    wal_bytes: &[u8],
657    engine: &mut Engine,
658    floor_lsn: u64,
659    quarantine: &mut Vec<QuarantinedStmt>,
660) -> Result<usize, String> {
661    let records = parse_wal_records(wal_bytes)?;
662    let mut applied = 0usize;
663    for r in &records {
664        // Skip markers + non-SQL records.
665        if r.type_byte == WAL_V3_TYPE_DURABILITY_CHECKPOINT
666            || r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER
667        {
668            continue;
669        }
670        // v4 SQL records carry an LSN. Apply iff strictly above
671        // the snapshot floor.
672        if r.type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL || r.type_byte == WAL_V4_TYPE_TX_COMMIT_SQL {
673            if let Some(lsn) = r.commit_lsn {
674                if lsn <= floor_lsn {
675                    continue;
676                }
677            }
678        }
679        // v3 records (type 0x01, no LSN) always apply — the
680        // legacy migration path is the only place they appear,
681        // and floor_lsn=0 there.
682        let sql = match std::str::from_utf8(r.sql) {
683            Ok(s) => s,
684            Err(e) => return Err(format!("non-UTF-8 SQL at offset {}: {e}", r.offset)),
685        };
686        // v7.21 — a tx-commit record carries the whole transaction
687        // as a `";\n"`-joined script; auto-commit records are a
688        // single statement, for which split_statements is a no-op.
689        //
690        // v7.30.1 (mailrs round-24 ask 2) — a statement the engine
691        // REJECTS is quarantined, not fatal: "one statement failed
692        // to replay" ≠ "the catalog is corrupt". Framing damage
693        // (parse_wal_records / non-UTF-8 above) still errors — that
694        // IS corruption. Subsequent statements of a tx script keep
695        // applying: the bricking class is a no-op-at-runtime
696        // statement that re-applies non-idempotently, and skipping
697        // just it reconstructs the runtime state.
698        for stmt in split_statements(sql) {
699            if let Err(e) = engine.execute(stmt) {
700                quarantine.push(QuarantinedStmt {
701                    offset: r.offset,
702                    sql: stmt.to_string(),
703                    error: format!("{e:?}"),
704                });
705            }
706        }
707        applied += 1;
708    }
709    Ok(applied)
710}
711
/// v7.30.1 (mailrs round-24 ask 2) — one statement that failed to
/// re-apply during boot replay. Kept for forensics in a
/// `quarantine-*.log` beside the WAL chunks; the boot continues.
struct QuarantinedStmt {
    /// Offset of the WAL record the statement came from (shared
    /// by every statement of a multi-statement tx record).
    offset: usize,
    /// Text of the rejected statement.
    sql: String,
    /// `Debug` rendering of the engine's rejection error.
    error: String,
}
720
721fn format_quarantine_line(q: &QuarantinedStmt) -> String {
722    format!("offset {}: {}\n  rejected: {}\n", q.offset, q.sql, q.error)
723}
724
/// v7.19 — builds a WAL chunk filename of the form
/// `<unix_us>_<leading_lsn>.wal`.
///
/// Both parts are zero-padded 16-digit lowercase hex so default
/// lexicographic sort matches numeric order, and the unix_us
/// prefix comes first so the on-disk listing is chronological
/// too.
fn chunk_filename(unix_us: i64, leading_lsn: u64) -> String {
    // A negative timestamp shouldn't happen in practice (we sit
    // post-1970); it is mapped to 0 so the zero-padded
    // representation stays sortable.
    let stamp = u64::try_from(unix_us).unwrap_or(0);
    format!("{stamp:016x}_{leading_lsn:016x}.wal")
}
736
737/// v7.19 — filename used for the legacy single-file WAL when
738/// `open_path` migrates a v7.18-layout database into the new
739/// chunk directory. Lexicographically smallest possible value
740/// so subsequent chunks sort after it.
741fn legacy_chunk_filename() -> String {
742    chunk_filename(0, 0)
743}
744
/// v7.19 — collect every entry of `wal_dir` whose extension is
/// `wal`, sorted lexicographically (which is also chunk-creation
/// order thanks to the zero-padded filename format).
///
/// A missing `wal_dir` yields an empty list. Any other I/O error,
/// from opening the directory or from reading an entry, is
/// returned to the caller.
fn sorted_wal_chunks(wal_dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let entries = match std::fs::read_dir(wal_dir) {
        Ok(entries) => entries,
        Err(err) => {
            return if err.kind() == std::io::ErrorKind::NotFound {
                Ok(Vec::new())
            } else {
                Err(err)
            };
        }
    };
    // Entry errors are let through the filter so that `collect`
    // stops at the first one and reports it.
    let mut chunks = entries
        .map(|entry| entry.map(|e| e.path()))
        .filter(|found| match found {
            Ok(path) => path.extension().and_then(|ext| ext.to_str()) == Some("wal"),
            Err(_) => true,
        })
        .collect::<std::io::Result<Vec<PathBuf>>>()?;
    chunks.sort();
    Ok(chunks)
}
765
766/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
767///
768/// ```text
769/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
770/// [u32 LE crc32 over (type_byte || payload)]
771/// [u8  type = 0x11]
772/// payload:
773///   [u64 LE checkpoint_lsn]
774///   [i64 LE checkpoint_unix_us  (WAL_V4_NO_CLOCK if no clock)]
775///   [u16 LE snapshot_path_len]
776///   [snapshot_path_bytes]
777/// ```
778///
779/// `payload_len` covers only the payload — keeping the framing
780/// uniform across v3 / v4 record types so torn-write detection in
781/// `replay_wal_into_engine` stays trivial.
782fn encode_v4_checkpoint_marker(
783    checkpoint_lsn: u64,
784    checkpoint_unix_us: i64,
785    snapshot_path: &Path,
786) -> Vec<u8> {
787    let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
788    let snap_payload = snapshot_bytes.as_bytes();
789    let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
790    let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
791    payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
792    payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
793    payload.extend_from_slice(&snap_len_u16.to_le_bytes());
794    payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
795    let mut crc_buf = Vec::with_capacity(1 + payload.len());
796    crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
797    crc_buf.extend_from_slice(&payload);
798    let crc = spg_crypto::crc32::crc32(&crc_buf);
799    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
800    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
801    out.extend_from_slice(&header);
802    out.extend_from_slice(&crc.to_le_bytes());
803    out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
804    out.extend_from_slice(&payload);
805    out
806}
807
808/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
809///
810/// ```text
811/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
812/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
813/// [u8  type = 0x10]
814/// [u64 LE commit_lsn]
815/// [i64 LE commit_unix_us  (= WAL_V4_NO_CLOCK when no ClockFn)]
816/// [sql bytes]
817/// ```
818///
819/// `sql_len` field stays the SQL byte count — same shape as v3 — so
820/// replay-buffer torn-write detection compares against
821/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
822/// readable by the same loop with their original 9-byte header
823/// arithmetic.
824fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
825    encode_v4_sql_record(WAL_V4_TYPE_AUTO_COMMIT_SQL, sql, commit_lsn, commit_unix_us)
826}
827
828/// v7.21 — same envelope, `WAL_V4_TYPE_TX_COMMIT_SQL` type byte.
829/// `script` = the transaction's statements joined with `";\n"`.
830fn encode_v4_tx_commit(script: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
831    encode_v4_sql_record(
832        WAL_V4_TYPE_TX_COMMIT_SQL,
833        script,
834        commit_lsn,
835        commit_unix_us,
836    )
837}
838
839fn encode_v4_sql_record(type_byte: u8, sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
840    let payload = sql.as_bytes();
841    let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
842    crc_buf.push(type_byte);
843    crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
844    crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
845    crc_buf.extend_from_slice(payload);
846    let crc = spg_crypto::crc32::crc32(&crc_buf);
847    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
848    let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
849    out.extend_from_slice(&header);
850    out.extend_from_slice(&crc.to_le_bytes());
851    out.push(type_byte);
852    out.extend_from_slice(&commit_lsn.to_le_bytes());
853    out.extend_from_slice(&commit_unix_us.to_le_bytes());
854    out.extend_from_slice(payload);
855    out
856}
857
858/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
859/// Returns the count of records successfully applied. A truncated
860/// trailing record (mid-write torn) is dropped silently — the
861/// same recovery story `spg-server`'s boot path uses.
862fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
863    let mut applied = 0usize;
864    let mut cur = 0usize;
865    while cur < wal_bytes.len() {
866        if wal_bytes.len() - cur < 4 {
867            // Trailing partial header — torn write, drop and stop.
868            break;
869        }
870        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
871        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
872        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
873        let len_mask = if is_v3 {
874            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
875        } else {
876            !WAL_V2_SENTINEL
877        };
878        let rec_len = (raw_len & len_mask) as usize;
879        let header_len = if is_v3 {
880            9
881        } else if is_v2 {
882            8
883        } else {
884            4
885        };
886        if wal_bytes.len() - cur < header_len + rec_len {
887            // Torn record at the tail — drop, stop.
888            break;
889        }
890        if is_v3 {
891            let type_byte = wal_bytes[cur + 8];
892            match type_byte {
893                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
894                WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
895                    // durability_checkpoint marker — skip, no SQL.
896                    cur += header_len + rec_len;
897                    continue;
898                }
899                WAL_V4_TYPE_CHECKPOINT_MARKER => {
900                    // v7.18 PITR — checkpoint anchor, skip on replay
901                    // (engine state past this point reflects the
902                    // matching snapshot already loaded by the caller).
903                    cur += header_len + rec_len;
904                    continue;
905                }
906                WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL => {
907                    // v7.18 PITR — v4 record carries 16 bytes of
908                    // (commit_lsn, commit_unix_us) between the type
909                    // byte and the SQL payload. Replay reads them but
910                    // does not enforce them — the engine doesn't
911                    // surface LSN/clock here. Restore tooling
912                    // (spgctl) parses them via parse_wal_record below.
913                    //
914                    // v7.21 — tx-commit records (0x12) carry a whole
915                    // transaction as a `";\n"`-joined script;
916                    // split_statements is a no-op on the single-
917                    // statement auto-commit form.
918                    let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
919                    if wal_bytes.len() - cur < v4_total {
920                        // Torn v4 record at the tail — drop, stop.
921                        break;
922                    }
923                    let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
924                    let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
925                    let sql = std::str::from_utf8(sql_bytes)
926                        .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
927                    for stmt in split_statements(sql) {
928                        engine.execute(stmt).map_err(|e| {
929                            format!("WAL replay: apply {stmt:?} at offset {cur} rejected: {e:?}")
930                        })?;
931                    }
932                    applied += 1;
933                    cur += v4_total;
934                    continue;
935                }
936                other => {
937                    return Err(format!(
938                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
939                    ));
940                }
941            }
942        }
943        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
944        let sql = std::str::from_utf8(sql_bytes)
945            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
946        engine
947            .execute(sql)
948            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
949        applied += 1;
950        cur += header_len + rec_len;
951    }
952    Ok(applied)
953}
954
/// v7.18 PITR — one decoded WAL record, exposed for restore and
/// verify tooling. `replay_wal_into_engine` does not surface the
/// LSN or timestamp, but `spgctl restore --to <timestamp>` and
/// `spgctl verify` need both. Offsets are byte positions inside
/// the WAL buffer that was parsed.
#[derive(Debug, Clone)]
pub struct WalRecord<'a> {
    /// Byte offset in the WAL buffer at which the record begins.
    pub offset: usize,
    /// Record type: 0x01 = v3 auto-commit SQL, 0x02 = durability
    /// checkpoint marker, 0x10 = v4 auto-commit SQL, 0x11 = v4
    /// checkpoint marker, 0x12 = v4 transaction-commit script.
    /// Legacy v1 / v2 records have no type byte on disk and are
    /// reported as v3 auto-commit.
    pub type_byte: u8,
    /// `Some(lsn)` for v4 records, `None` for everything older.
    pub commit_lsn: Option<u64>,
    /// `Some(unix_us)` for v4 records that carry a clock-set
    /// timestamp. `None` for pre-v4 records and for v4 records
    /// written with `WAL_V4_NO_CLOCK` (the sentinel for "no ClockFn
    /// at commit time").
    pub commit_unix_us: Option<i64>,
    /// Borrowed payload bytes: the SQL text for SQL-carrying
    /// records, the snapshot path for v4 checkpoint markers, and
    /// empty for durability markers.
    pub sql: &'a [u8],
}
975
976/// v7.18 PITR — iterate over `wal_bytes` yielding one `WalRecord`
977/// per intact record. Torn-tail records terminate iteration
978/// silently (same recovery story as `replay_wal_into_engine`).
979/// Unknown type bytes inside a v3 envelope return `Err` so the
980/// caller knows the WAL was written by a newer SPG.
981pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
982    let mut out = Vec::new();
983    let mut cur = 0usize;
984    while cur < wal_bytes.len() {
985        if wal_bytes.len() - cur < 4 {
986            break;
987        }
988        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
989        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
990        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
991        let len_mask = if is_v3 {
992            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
993        } else {
994            !WAL_V2_SENTINEL
995        };
996        let rec_len = (raw_len & len_mask) as usize;
997        let header_len = if is_v3 {
998            9
999        } else if is_v2 {
1000            8
1001        } else {
1002            4
1003        };
1004        if wal_bytes.len() - cur < header_len + rec_len {
1005            break;
1006        }
1007        if !is_v3 {
1008            // v1 / v2 records carry no type byte; treat as legacy
1009            // auto-commit SQL with no LSN/time.
1010            let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1011            out.push(WalRecord {
1012                offset: cur,
1013                type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
1014                commit_lsn: None,
1015                commit_unix_us: None,
1016                sql,
1017            });
1018            cur += header_len + rec_len;
1019            continue;
1020        }
1021        let type_byte = wal_bytes[cur + 8];
1022        match type_byte {
1023            WAL_V3_TYPE_AUTO_COMMIT_SQL => {
1024                let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1025                out.push(WalRecord {
1026                    offset: cur,
1027                    type_byte,
1028                    commit_lsn: None,
1029                    commit_unix_us: None,
1030                    sql,
1031                });
1032                cur += header_len + rec_len;
1033            }
1034            WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
1035                out.push(WalRecord {
1036                    offset: cur,
1037                    type_byte,
1038                    commit_lsn: None,
1039                    commit_unix_us: None,
1040                    sql: &[],
1041                });
1042                cur += header_len + rec_len;
1043            }
1044            WAL_V4_TYPE_CHECKPOINT_MARKER => {
1045                // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
1046                // We surface lsn + ts on the WalRecord; the path lives
1047                // in `sql` since the type byte already disambiguates
1048                // record meaning and adding a dedicated field would
1049                // bloat the iterator return type for every variant.
1050                if rec_len < 18 {
1051                    return Err(format!(
1052                        "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
1053                    ));
1054                }
1055                let lsn = u64::from_le_bytes(
1056                    wal_bytes[cur + header_len..cur + header_len + 8]
1057                        .try_into()
1058                        .unwrap(),
1059                );
1060                let ts_raw = i64::from_le_bytes(
1061                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
1062                        .try_into()
1063                        .unwrap(),
1064                );
1065                let path_len = u16::from_le_bytes(
1066                    wal_bytes[cur + header_len + 16..cur + header_len + 18]
1067                        .try_into()
1068                        .unwrap(),
1069                ) as usize;
1070                if rec_len < 18 + path_len {
1071                    return Err(format!(
1072                        "WAL parse: checkpoint marker at offset {cur} truncated path"
1073                    ));
1074                }
1075                let path_start = cur + header_len + 18;
1076                let path_bytes = &wal_bytes[path_start..path_start + path_len];
1077                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1078                    None
1079                } else {
1080                    Some(ts_raw)
1081                };
1082                out.push(WalRecord {
1083                    offset: cur,
1084                    type_byte,
1085                    commit_lsn: Some(lsn),
1086                    commit_unix_us,
1087                    sql: path_bytes,
1088                });
1089                cur += header_len + rec_len;
1090            }
1091            WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL => {
1092                let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
1093                if wal_bytes.len() - cur < v4_total {
1094                    break;
1095                }
1096                let lsn = u64::from_le_bytes(
1097                    wal_bytes[cur + header_len..cur + header_len + 8]
1098                        .try_into()
1099                        .unwrap(),
1100                );
1101                let ts_raw = i64::from_le_bytes(
1102                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
1103                        .try_into()
1104                        .unwrap(),
1105                );
1106                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1107                    None
1108                } else {
1109                    Some(ts_raw)
1110                };
1111                let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
1112                let sql = &wal_bytes[sql_start..sql_start + rec_len];
1113                out.push(WalRecord {
1114                    offset: cur,
1115                    type_byte,
1116                    commit_lsn: Some(lsn),
1117                    commit_unix_us,
1118                    sql,
1119                });
1120                cur += v4_total;
1121            }
1122            other => {
1123                return Err(format!(
1124                    "WAL parse: unknown type byte {other:#04x} at offset {cur}"
1125                ));
1126            }
1127        }
1128    }
1129    Ok(out)
1130}
1131
/// v7.1 — classify a statement by its leading keyword: does it
/// leave the WAL untouched?
///
/// Returns `true` for SELECT / SHOW / EXPLAIN / BEGIN / COMMIT /
/// ROLLBACK / WAIT and for the SPG-specific verbs that don't go
/// through the auto-commit record path on the server (CHECKPOINT,
/// COMPACT). Matching is ASCII case-insensitive; the keyword ends
/// at the first whitespace, `;` or `(`.
///
/// A statement starting with `WITH` is read-only only when none of
/// its tokens is `INSERT`, `UPDATE`, `DELETE` or `MERGE`. A CTE
/// prefix can front a data-modifying statement, and reporting
/// that as read-only would keep it out of the WAL. The scan is
/// purely lexical, so a query that merely mentions one of those
/// words as a standalone token (e.g. `FOR UPDATE`) is reported as
/// a write.
///
/// Conservative: anything not explicitly known to be read-only,
/// including empty input and statements that begin with `(`, `;`
/// or a comment, returns `false`, i.e. "write a WAL record".
fn sql_is_read_only(sql: &str) -> bool {
    const READ_ONLY_HEADS: [&str; 9] = [
        "select",
        "show",
        "explain",
        "begin",
        "commit",
        "rollback",
        "checkpoint",
        "compact",
        "wait",
    ];
    const DML_KEYWORDS: [&str; 4] = ["insert", "update", "delete", "merge"];

    let body = sql.trim_start();
    let head = body
        .split(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .next()
        .unwrap_or("");

    if head.eq_ignore_ascii_case("with") {
        // Look past the CTE prefix: any DML keyword as a standalone
        // token makes the whole statement a write.
        let mentions_dml = body
            .split(|c: char| c.is_whitespace() || matches!(c, ';' | '(' | ')' | ','))
            .any(|token| DML_KEYWORDS.iter().any(|kw| token.eq_ignore_ascii_case(kw)));
        return !mentions_dml;
    }

    READ_ONLY_HEADS
        .iter()
        .any(|kw| head.eq_ignore_ascii_case(kw))
}
1158
1159/// Embedded SPG database handle. Owns an `Engine` + provides
1160/// ergonomic wrappers around `execute` and `query`. Drops the
1161/// engine on `Drop` — no WAL flush / fsync, because v6.10.3
1162/// is in-memory only.
1163#[derive(Debug)]
1164pub struct Database {
1165    engine: Engine,
1166    /// v7.1 — persistence sidecar. When `Some(p)`, every
1167    /// `execute(sql)` that mutates state appends a v4
1168    /// `auto_commit_sql` WAL record + fsyncs before the call
1169    /// returns; `Drop` writes a final catalog snapshot to
1170    /// `<db_path>` so the next session boots from a clean
1171    /// snapshot + an empty WAL. `None` = in-memory only (the
1172    /// v6.10.3 shape).
1173    persistence: Option<PersistenceCtx>,
1174    /// v7.18 PITR — monotonic per-database commit LSN. Increments
1175    /// before each successful WAL append; bootstrapped at
1176    /// open_path from `max(parse_wal_records → commit_lsn)` so
1177    /// reopen never reuses an LSN. In-memory databases start at
1178    /// 0 and never advance (no WAL = no LSN-meaningful records).
1179    commit_lsn: AtomicU64,
1180    /// v7.21 (round-12 polish) — explicit-transaction WAL buffer.
1181    /// `Some` between an engine-accepted BEGIN and its
1182    /// COMMIT / ROLLBACK on a persistent database. In-transaction
1183    /// mutations only touch the engine's shadow catalog and report
1184    /// `modified_catalog: false`, so the per-statement auto-commit
1185    /// append never fires for them; their bind-final SQL collects
1186    /// here instead and COMMIT flushes the lot as ONE atomic
1187    /// `WAL_V4_TYPE_TX_COMMIT_SQL` record (ROLLBACK just drops it).
1188    /// Always `None` for in-memory databases.
1189    tx_wal: Option<TxWalBuffer>,
1190}
1191
/// Buffer behind [`Database::tx_wal`]: the statements of the open
/// explicit transaction, waiting to be written as one WAL record.
#[derive(Debug, Default)]
struct TxWalBuffer {
    /// Bind-final SQL of each non-read-only statement the engine
    /// accepted inside the open transaction, in the order it ran.
    statements: Vec<String>,
    /// One `(savepoint_name, statements.len() at SAVEPOINT time)`
    /// pair per savepoint. `ROLLBACK TO SAVEPOINT` truncates
    /// `statements` back to the recorded length so the WAL record
    /// matches what the engine kept. Name reuse follows PG
    /// semantics (the latest entry wins).
    savepoints: Vec<(String, usize)>,
}
1204
/// Transaction-control classification of one statement, used to
/// keep the WAL buffer in step with the engine. It is applied only
/// AFTER the engine has accepted the statement, so the engine
/// remains the single validator and this merely mirrors state.
enum TxControl {
    /// `BEGIN` / `START …` — a transaction opens.
    Begin,
    /// `COMMIT` / `END` — the transaction commits.
    Commit,
    /// `ROLLBACK` without a `TO` target — the transaction aborts.
    Rollback,
    /// `ROLLBACK TO [SAVEPOINT] <name>`; carries the lowercased name.
    RollbackToSavepoint(String),
    /// `SAVEPOINT <name>`; carries the lowercased name.
    Savepoint(String),
    /// `RELEASE …`.
    ReleaseSavepoint,
}
1216
1217fn tx_control_kind(sql: &str) -> Option<TxControl> {
1218    let mut words = sql
1219        .split(|c: char| c.is_whitespace() || c == ';')
1220        .filter(|w| !w.is_empty())
1221        .map(str::to_ascii_lowercase);
1222    let head = words.next()?;
1223    match head.as_str() {
1224        "begin" | "start" => Some(TxControl::Begin),
1225        "commit" | "end" => Some(TxControl::Commit),
1226        "savepoint" => words.next().map(TxControl::Savepoint),
1227        "release" => Some(TxControl::ReleaseSavepoint),
1228        "rollback" => match words.next().as_deref() {
1229            // ROLLBACK TO [SAVEPOINT] <name>
1230            Some("to") => {
1231                let next = words.next()?;
1232                let name = if next == "savepoint" {
1233                    words.next()?
1234                } else {
1235                    next
1236                };
1237                Some(TxControl::RollbackToSavepoint(name))
1238            }
1239            _ => Some(TxControl::Rollback),
1240        },
1241        _ => None,
1242    }
1243}
1244
1245#[derive(Debug)]
1246#[allow(dead_code)] // `wal_dir`/`current_chunk_path` are read at boot; kept for Drop/diag introspection.
1247struct PersistenceCtx {
1248    db_path: PathBuf,
1249    /// v7.19 — WAL chunk directory at `<db_path>.wal/`.
1250    /// Replaces the v7.18 single-file `<db_path>.wal` layout.
1251    /// Each chunk file inside is named
1252    /// `<unix_us>_<leading_lsn>.wal` (zero-padded to 16 digits
1253    /// so default-lex sort = LSN order).
1254    wal_dir: PathBuf,
1255    /// Path of the currently-open chunk file inside `wal_dir`.
1256    /// Rotated at checkpoint and whenever the chunk crosses
1257    /// `checkpoint_threshold_bytes`.
1258    current_chunk_path: PathBuf,
1259    /// v7.19 P3 — retention sweeper handle. `Some` when
1260    /// `SPG_PITR_RETENTION_HOURS > 0` at open_path time; `None`
1261    /// when retention is disabled (the default; v7.18 behaviour
1262    /// preserved). The thread polls `wal_dir` every
1263    /// `SPG_PITR_RETENTION_CHECK_SEC` seconds, archives via
1264    /// `SPG_PITR_ARCHIVE_CMD` if set, then deletes chunks older
1265    /// than the retention window. Signalled to exit via
1266    /// `retention_shutdown` on Drop.
1267    retention_shutdown: Option<Arc<AtomicBool>>,
1268    retention_thread: Option<std::thread::JoinHandle<()>>,
1269    /// v7.20 — background WAL flusher for
1270    /// `SPG_SYNCHRONOUS_COMMIT=off`. `None` in the default
1271    /// synchronous mode. Flushes the pending batch every
1272    /// `SPG_WAL_WRITER_DELAY_MS`; signalled + joined on Drop
1273    /// before the final checkpoint so clean shutdown never
1274    /// loses confirmed commits.
1275    flusher_shutdown: Option<Arc<AtomicBool>>,
1276    flusher_thread: Option<std::thread::JoinHandle<()>>,
1277    /// v7.20 P2 — group-commit WAL. Shared with WalTickets
1278    /// returned by the buffered write path so `wait()` can run
1279    /// after the engine write lock is released.
1280    wal: Arc<WalGroup>,
1281    checkpoint_threshold_bytes: u64,
1282    /// v7.1.4 — `<db_path>.spg/segments/` directory. Cold-tier
1283    /// segments produced by `freeze_oldest_to_cold` / compaction
1284    /// are persisted here as `seg_<id>.spg` files; the manifest
1285    /// at `<db_path>.spg/manifest.v10` records every active
1286    /// segment + its CRC32 so the next boot can verify + reload.
1287    cold_segments_dir: PathBuf,
1288    cold_segment_paths: BTreeMap<u32, PathBuf>,
1289    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
1290    /// via `fs::create_dir` on `<db_path>.lock` at open_path
1291    /// entry; released on Drop by `fs::remove_dir`. atomic on
1292    /// every supported platform. A second process opening the
1293    /// same path while the first is still alive hits the
1294    /// create_dir failure and returns
1295    /// `EngineError::Unsupported("database is locked by another
1296    /// process: …")`. Stale locks (process crashed mid-session)
1297    /// must be cleared via `Database::force_unlock(path)` —
1298    /// SPG can't safely fingerprint who owned a stale directory
1299    /// without a libc dep, which would violate spg-embedded's
1300    /// zero-deps charter.
1301    lock_path: PathBuf,
1302}
1303
1304impl Database {
1305    /// Open a fresh in-memory database. No WAL, no catalog
1306    /// snapshot on disk — perfect for tests + short-lived
1307    /// CLI tools.
1308    #[must_use]
1309    pub fn open_in_memory() -> Self {
1310        Self {
1311            engine: engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros)),
1312            persistence: None,
1313            commit_lsn: AtomicU64::new(0),
1314            tx_wal: None,
1315        }
1316    }
1317
1318    /// v7.1 — Open or create a persistent database backed by
1319    /// the file at `db_path`. The WAL lives at `db_path` +
1320    /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
1321    /// path:
1322    ///
1323    /// 1. If `db_path` exists, restore the catalog snapshot.
1324    /// 2. If the WAL exists, replay every record into the
1325    ///    restored engine — the same recovery story
1326    ///    `spg-server` uses.
1327    /// 3. Open the WAL in append+sync mode so subsequent
1328    ///    `execute()` writes durably commit (one fsync per
1329    ///    mutation).
1330    ///
1331    /// `Drop` writes a final catalog snapshot + truncates the
1332    /// WAL — operators that need a sync barrier at a specific
1333    /// point use `checkpoint()` explicitly.
1334    pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
1335        let db_path = db_path.as_ref().to_path_buf();
1336        // v7.19 — WAL is a directory of chunk files. Legacy
1337        // single-file path stays variable-named `wal_path` for
1338        // the backward-compat migration block below.
1339        let wal_path = {
1340            let mut p = db_path.clone();
1341            let name = p
1342                .file_name()
1343                .map(|n| {
1344                    let mut s = n.to_os_string();
1345                    s.push(".wal");
1346                    s
1347                })
1348                .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
1349            p.set_file_name(name);
1350            p
1351        };
1352        let wal_dir = wal_path.clone();
1353        if let Some(parent) = db_path.parent()
1354            && !parent.as_os_str().is_empty()
1355        {
1356            std::fs::create_dir_all(parent).map_err(io_err)?;
1357        }
1358        // v7.17.0 Phase 6.2 — acquire cross-process exclusion
1359        // lock before touching any catalog / WAL bytes. atomic
1360        // mkdir on every supported platform; a second process
1361        // opening the same path while the first is still alive
1362        // hits the create_dir failure and gets a clear error.
1363        let lock_path = {
1364            let mut p = db_path.clone();
1365            let name = p
1366                .file_name()
1367                .map(|n| {
1368                    let mut s = n.to_os_string();
1369                    s.push(".lock");
1370                    s
1371                })
1372                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1373            p.set_file_name(name);
1374            p
1375        };
1376        acquire_path_lock(&lock_path)?;
1377        let mut engine = if db_path.exists() {
1378            let bytes = std::fs::read(&db_path).map_err(io_err)?;
1379            let engine = Engine::restore_envelope(&bytes).map_err(|e| {
1380                EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1381                    "restore from {}: {e}",
1382                    db_path.display()
1383                )))
1384            })?;
1385            engine_with_query_byte_budget(engine.with_clock(wall_clock_micros))
1386        } else {
1387            engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros))
1388        };
1389        // v7.1.4 — manifest-driven cold-segment reload. The
1390        // manifest sidecar pairs the catalog snapshot CRC with a
1391        // list of `(segment_id, path, crc32)` triples; verify
1392        // before loading so a torn or stale manifest doesn't
1393        // surface phantom data.
1394        let cold_segments_dir = {
1395            let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
1396            let stem = db_path
1397                .file_stem()
1398                .unwrap_or_else(|| std::ffi::OsStr::new("db"))
1399                .to_string_lossy()
1400                .into_owned();
1401            parent.join(format!("{stem}.spg")).join("segments")
1402        };
1403        let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
1404        let manifest_pth = spg_manifest_path(&db_path);
1405        if manifest_pth.exists() && db_path.exists() {
1406            let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
1407            if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
1408                let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
1409                let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
1410                if snap_crc == m.catalog_crc32 {
1411                    for entry in &m.cold_segments {
1412                        if let Ok(seg_bytes) = std::fs::read(&entry.path) {
1413                            let computed = spg_crypto::crc32::crc32(&seg_bytes);
1414                            if computed != entry.crc32 {
1415                                eprintln!(
1416                                    "spg-embedded: manifest skip segment {}: CRC mismatch",
1417                                    entry.segment_id
1418                                );
1419                                continue;
1420                            }
1421                            if engine.catalog().cold_segment(entry.segment_id).is_some() {
1422                                // Already loaded via Catalog::clone path (shouldn't happen
1423                                // since Engine::new + restore_envelope don't populate cold).
1424                                continue;
1425                            }
1426                            let mut new_cat = engine.catalog().clone();
1427                            if let Err(e) =
1428                                new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
1429                            {
1430                                eprintln!(
1431                                    "spg-embedded: manifest load segment {} failed: {e}",
1432                                    entry.segment_id
1433                                );
1434                                continue;
1435                            }
1436                            engine.replace_catalog(new_cat);
1437                            cold_segment_paths.insert(entry.segment_id, entry.path.clone());
1438                        } else {
1439                            eprintln!(
1440                                "spg-embedded: manifest skip segment {}: file unreadable",
1441                                entry.segment_id
1442                            );
1443                        }
1444                    }
1445                }
1446            }
1447        }
1448        // v7.19 — chunked WAL on-disk layout.
1449        //
1450        // Three cases handled here:
1451        //
1452        // 1. wal_dir exists as a DIRECTORY → scan its
1453        //    `<unix_us>_<leading_lsn>.wal` chunks (sorted
1454        //    lexicographically = chunk-creation order), replay
1455        //    them in sequence, advance the LSN watermark to the
1456        //    max commit_lsn seen.
1457        //
1458        // 2. wal_path exists as a FILE → legacy v7.18 layout.
1459        //    Migrate it: create `wal_dir/`, move the single file
1460        //    inside as `0000000000000000_0000000000000000.wal`,
1461        //    then fall through to case 1's replay loop.
1462        //
1463        // 3. Neither exists → fresh database; create wal_dir.
1464        let mut initial_lsn: u64 = 0;
1465        if wal_path.is_file() {
1466            // Case 2: legacy single-file WAL migration.
1467            let legacy_bytes = std::fs::read(&wal_path).map_err(io_err)?;
1468            std::fs::remove_file(&wal_path).map_err(io_err)?;
1469            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1470            if !legacy_bytes.is_empty() {
1471                let migrated = wal_dir.join(legacy_chunk_filename());
1472                std::fs::write(&migrated, &legacy_bytes).map_err(io_err)?;
1473            }
1474        } else if !wal_dir.exists() {
1475            // Case 3: fresh database.
1476            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1477        }
1478        // Cases 1 + 2 share replay logic now that wal_dir is
1479        // guaranteed to exist (and may be empty for case 3).
1480        //
1481        // Two-pass replay so we don't double-apply records the
1482        // snapshot already reflects:
1483        //
1484        // 1. Find the highest commit_lsn carried by a
1485        //    checkpoint_marker across all chunks. That LSN is the
1486        //    snapshot's high-water mark — anything ≤ it is
1487        //    already in `<db_path>` and replaying it would
1488        //    DuplicateTable / double-insert.
1489        // 2. Replay only records strictly above that LSN.
1490        //
1491        // Case 2 migration (legacy single-file WAL) lands here
1492        // too: the migrated chunk has no marker so the LSN floor
1493        // is 0 and every record applies — exactly the v7.18
1494        // behaviour the migration is supposed to preserve.
1495        let chunk_paths = sorted_wal_chunks(&wal_dir).map_err(io_err)?;
1496        let mut snapshot_lsn: u64 = 0;
1497        for chunk in &chunk_paths {
1498            let bytes = std::fs::read(chunk).map_err(io_err)?;
1499            if let Ok(records) = parse_wal_records(&bytes) {
1500                for r in &records {
1501                    if r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER {
1502                        if let Some(l) = r.commit_lsn {
1503                            if l > snapshot_lsn {
1504                                snapshot_lsn = l;
1505                            }
1506                        }
1507                    }
1508                }
1509            }
1510        }
1511        let mut quarantined: Vec<QuarantinedStmt> = Vec::new();
1512        for chunk in &chunk_paths {
1513            let bytes = std::fs::read(chunk).map_err(io_err)?;
1514            if bytes.is_empty() {
1515                continue;
1516            }
1517            replay_wal_filtered(&bytes, &mut engine, snapshot_lsn, &mut quarantined)
1518                .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
1519            if let Ok(records) = parse_wal_records(&bytes) {
1520                if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
1521                    if max > initial_lsn {
1522                        initial_lsn = max;
1523                    }
1524                }
1525            }
1526        }
1527        // v7.30.1 (mailrs round-24 ask 2) — replay rejects no longer
1528        // brick the open. Persist the rejected statements beside the
1529        // WAL chunks for forensics and say so loudly; the boot
1530        // continues with every other record applied.
1531        if !quarantined.is_empty() {
1532            let mut body = String::new();
1533            for q in &quarantined {
1534                body.push_str(&format_quarantine_line(q));
1535            }
1536            let qpath = wal_dir.join(format!(
1537                "quarantine-{:016x}.log",
1538                wall_clock_micros().max(0) as u64
1539            ));
1540            match std::fs::write(&qpath, &body) {
1541                Ok(()) => eprintln!(
1542                    "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
1543                     forensics at {}",
1544                    quarantined.len(),
1545                    qpath.display()
1546                ),
1547                Err(e) => eprintln!(
1548                    "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
1549                     quarantine file write FAILED ({e}), entries follow:\n{body}",
1550                    quarantined.len()
1551                ),
1552            }
1553        }
1554        // Open the "current" chunk — either the last existing
1555        // chunk file (so subsequent appends extend it until the
1556        // size threshold rotates) or a fresh first chunk.
1557        let now_us = wall_clock_micros();
1558        let current_chunk_path = if let Some(last) = chunk_paths.last() {
1559            last.clone()
1560        } else {
1561            wal_dir.join(chunk_filename(now_us, initial_lsn + 1))
1562        };
1563        let wal_file = OpenOptions::new()
1564            .create(true)
1565            .append(true)
1566            .read(true)
1567            .open(&current_chunk_path)
1568            .map_err(io_err)?;
1569        let wal_len = wal_file.metadata().map_err(io_err)?.len();
1570        let wal = Arc::new(WalGroup::new(wal_file, wal_len));
1571        // v7.19 P3 — spawn retention sweep thread when the
1572        // operator opted in via SPG_PITR_RETENTION_HOURS > 0.
1573        // Otherwise stay on the v7.18 behaviour (chunks accumulate
1574        // until something else — backup-pitr archival, manual
1575        // cleanup — moves them).
1576        let retention_hours = pitr_retention_hours();
1577        let (retention_shutdown, retention_thread) = if retention_hours > 0 {
1578            let shutdown = Arc::new(AtomicBool::new(false));
1579            let shutdown_clone = Arc::clone(&shutdown);
1580            let wal_dir_clone = wal_dir.clone();
1581            let check_interval = std::time::Duration::from_secs(pitr_retention_check_sec());
1582            let archive_cmd = pitr_archive_cmd();
1583            let handle = std::thread::Builder::new()
1584                .name("spg-pitr-retention".into())
1585                .spawn(move || {
1586                    retention_sweep_loop(
1587                        wal_dir_clone,
1588                        retention_hours,
1589                        check_interval,
1590                        archive_cmd,
1591                        shutdown_clone,
1592                    );
1593                })
1594                .map_err(io_err)?;
1595            (Some(shutdown), Some(handle))
1596        } else {
1597            (None, None)
1598        };
1599        // v7.20 — background flusher for SPG_SYNCHRONOUS_COMMIT=off.
1600        let (flusher_shutdown, flusher_thread) = if synchronous_commit_on() {
1601            (None, None)
1602        } else {
1603            let shutdown = Arc::new(AtomicBool::new(false));
1604            let shutdown_clone = Arc::clone(&shutdown);
1605            let group = Arc::clone(&wal);
1606            let interval = std::time::Duration::from_millis(wal_writer_delay_ms());
1607            let handle = std::thread::Builder::new()
1608                .name("spg-wal-flusher".into())
1609                .spawn(move || {
1610                    while !shutdown_clone.load(Ordering::SeqCst) {
1611                        std::thread::sleep(interval);
1612                        if let Err(e) = group.flush_now() {
1613                            eprintln!("spg-embedded: background WAL flush failed: {e:?}");
1614                        }
1615                    }
1616                    // Final drain on shutdown signal.
1617                    let _ = group.flush_now();
1618                })
1619                .map_err(io_err)?;
1620            (Some(shutdown), Some(handle))
1621        };
1622        Ok(Self {
1623            engine,
1624            commit_lsn: AtomicU64::new(initial_lsn),
1625            tx_wal: None,
1626            persistence: Some(PersistenceCtx {
1627                db_path,
1628                wal_dir,
1629                current_chunk_path,
1630                wal,
1631                checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
1632                cold_segments_dir,
1633                cold_segment_paths,
1634                lock_path,
1635                retention_shutdown,
1636                retention_thread,
1637                flusher_shutdown,
1638                flusher_thread,
1639            }),
1640        })
1641    }
1642
1643    /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
1644    /// hot tier into a brand-new cold-tier segment + persist
1645    /// it to disk. Same semantics as `spg-server`'s freezer
1646    /// thread; embedded just runs the freeze synchronously on
1647    /// the caller's thread. Persistence + manifest update
1648    /// happen as part of the next `checkpoint()` (or on Drop).
1649    pub fn freeze_oldest_to_cold(
1650        &mut self,
1651        table_name: &str,
1652        index_name: &str,
1653        max_rows: usize,
1654    ) -> Result<spg_storage::FreezeReport, EngineError> {
1655        let report = self
1656            .engine
1657            .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
1658        if let Some(p) = &mut self.persistence {
1659            std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
1660            let final_path = p
1661                .cold_segments_dir
1662                .join(format!("seg_{}.spg", report.segment_id));
1663            let tmp_path = p
1664                .cold_segments_dir
1665                .join(format!("seg_{}.spg.tmp", report.segment_id));
1666            std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
1667            std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
1668            p.cold_segment_paths.insert(report.segment_id, final_path);
1669        }
1670        Ok(report)
1671    }
1672
1673    /// v7.1 — override the auto-checkpoint WAL-size ceiling for
1674    /// this `Database` instance. Default is
1675    /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
1676    /// setter wins. No-op when the database is in-memory.
1677    pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
1678        if let Some(p) = &mut self.persistence {
1679            p.checkpoint_threshold_bytes = bytes.max(1);
1680        }
1681    }
1682
1683    /// v7.31 (memory campaign, round-26 ask 1/ask 4) — per-bucket
1684    /// memory snapshot for the embedding host. Poll it from prod to
1685    /// see where resident bytes live (rows / representation /
1686    /// indexes per table) and to drive host-side shedding before
1687    /// the kernel does it. Same numbers as the server path's
1688    /// `SELECT * FROM spg_memory_stats`.
1689    #[must_use]
1690    pub fn memory_stats(&self) -> spg_engine::MemoryStats {
1691        let mut stats = self.engine.memory_stats();
1692        // v7.31 C2 — fill in bucket D: the engine leaves `wal_bytes`
1693        // None (it has no WAL); we report the live (uncheckpointed)
1694        // WAL footprint via the same `written_len()` meter `metrics()`
1695        // reads. In-memory databases have no persistence → stays None.
1696        if let Some(p) = &self.persistence {
1697            stats.wal_bytes = Some(p.wal.written_len());
1698        }
1699        stats
1700    }
1701
1702    /// v7.1 — flush a fresh catalog snapshot to `db_path` and
1703    /// truncate the WAL. Idempotent; cheap when nothing has
1704    /// happened since the last checkpoint. No-op when the
1705    /// database is in-memory (no `db_path` configured).
1706    ///
1707    /// Called automatically when:
1708    /// - the WAL grows past
1709    ///   `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
1710    ///   end of an `execute()`, and
1711    /// - `Drop` runs (best-effort; checkpoint failure on drop is
1712    ///   logged to stderr).
1713    pub fn checkpoint(&mut self) -> Result<(), EngineError> {
1714        let snapshot = self.engine.snapshot();
1715        let Some(p) = &mut self.persistence else {
1716            return Ok(());
1717        };
1718        // Snapshot first (atomic via tmp+rename), then WAL
1719        // truncate. Same order as `spg-server`'s CHECKPOINT —
1720        // a crash between the two leaves the WAL holding
1721        // already-snapshotted ops, which replay cleanly on the
1722        // next boot (idempotent for SPG's standard DDL/DML
1723        // mutations).
1724        let tmp = {
1725            let mut t = p.db_path.clone();
1726            let mut name = t
1727                .file_name()
1728                .map(std::ffi::OsStr::to_os_string)
1729                .unwrap_or_default();
1730            name.push(".tmp");
1731            t.set_file_name(name);
1732            t
1733        };
1734        std::fs::write(&tmp, &snapshot).map_err(io_err)?;
1735        std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
1736        // v7.1.4 — refresh the manifest so the next boot can
1737        // reload cold segments alongside the snapshot. Bytes
1738        // come from the freshly-written snapshot file (= the
1739        // canonical CRC source).
1740        if !p.cold_segment_paths.is_empty() {
1741            let snap_crc = spg_crypto::crc32::crc32(&snapshot);
1742            let entries: Vec<ColdSegmentEntry> = p
1743                .cold_segment_paths
1744                .iter()
1745                .filter_map(|(&segment_id, path)| {
1746                    let bytes = std::fs::read(path).ok()?;
1747                    Some(ColdSegmentEntry {
1748                        segment_id,
1749                        path: path.clone(),
1750                        crc32: spg_crypto::crc32::crc32(&bytes),
1751                    })
1752                })
1753                .collect();
1754            let manifest = CatalogManifest {
1755                catalog_crc32: snap_crc,
1756                cold_segments: entries,
1757                wal_baseline_offset: 0,
1758            };
1759            let m_bytes = manifest.serialize();
1760            let m_path = spg_manifest_path(&p.db_path);
1761            if let Some(dir) = m_path.parent() {
1762                std::fs::create_dir_all(dir).map_err(io_err)?;
1763            }
1764            let m_tmp = {
1765                let mut t = m_path.clone();
1766                let mut name = t
1767                    .file_name()
1768                    .map(std::ffi::OsStr::to_os_string)
1769                    .unwrap_or_default();
1770                name.push(".tmp");
1771                t.set_file_name(name);
1772                t
1773            };
1774            std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
1775            std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
1776        }
1777        // v7.19 — append a checkpoint marker to the current chunk
1778        // (anchors restore-to-time backups), then rotate to a
1779        // fresh chunk file. Old chunks stay on disk and become
1780        // input to the retention thread (P3) + spgctl backup-pitr
1781        // (P6). The single-file `set_len(0)` truncate the v7.18
1782        // path used is gone — that path silently discarded WAL
1783        // history between checkpoint and the operator's next cron
1784        // run, which is exactly what PITR was meant to fix.
1785        let marker_lsn = self.commit_lsn.load(Ordering::SeqCst);
1786        let marker_ts = wall_clock_micros();
1787        let marker = encode_v4_checkpoint_marker(marker_lsn, marker_ts, &p.db_path);
1788        // v7.20 P2 — checkpoint holds &mut self (engine
1789        // exclusive), so there are no concurrent enqueues: drain
1790        // the pending batch, append the marker, flush, then
1791        // rotate the chunk handle inside the group.
1792        p.wal.enqueue(&marker);
1793        p.wal.flush_now()?;
1794        let new_chunk_path = p.wal_dir.join(chunk_filename(marker_ts, marker_lsn + 1));
1795        let new_handle = OpenOptions::new()
1796            .create(true)
1797            .append(true)
1798            .read(true)
1799            .open(&new_chunk_path)
1800            .map_err(io_err)?;
1801        p.current_chunk_path = new_chunk_path;
1802        p.wal.rotate_file(new_handle);
1803        Ok(())
1804    }
1805
1806    /// Restore a database from a previously-captured catalog
1807    /// snapshot. Pairs with `Database::snapshot()` for
1808    /// round-tripping in-memory state without going through
1809    /// the `spg-server` WAL.
1810    pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
1811        let engine = Engine::restore_envelope(snapshot).map_err(|e| {
1812            EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
1813        })?;
1814        Ok(Self {
1815            engine,
1816            persistence: None,
1817            commit_lsn: AtomicU64::new(0),
1818            tx_wal: None,
1819        })
1820    }
1821
1822    /// Take a catalog snapshot suitable for `Database::restore`.
1823    /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
1824    /// + version + payload); round-trips through every released
1825    /// SPG version per the STABILITY contract.
1826    #[must_use]
1827    pub fn snapshot(&self) -> Vec<u8> {
1828        self.engine.snapshot()
1829    }
1830
1831    /// Execute a SQL statement and return the engine's
1832    /// `QueryResult` verbatim. Pass-through for callers that
1833    /// want to keep PG-flavoured column/row metadata.
1834    ///
1835    /// v7.1 — when the database was opened via `open_path`,
1836    /// successful mutations are appended to the WAL + fsynced
1837    /// before the call returns. A subsequent process crash will
1838    /// recover state up to the last successful return from
1839    /// `execute()`. Read-only statements (SELECT / SHOW /
1840    /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
1841    /// etc.) skip the WAL entirely.
1842    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
1843        // v7.20 P2 — single-caller convenience over the buffered
1844        // path: enqueue + immediately wait. Batch size is 1 here,
1845        // so the durability behaviour (one fsync before Ok) is
1846        // identical to v7.19. Concurrent callers go through
1847        // `execute_buffered` (AsyncDatabase does) and share the
1848        // leader's fsync.
1849        let (result, ticket) = self.execute_buffered(sql)?;
1850        if let Some(t) = ticket {
1851            t.wait()?;
1852        }
1853        Ok(result)
1854    }
1855
1856    /// v7.20 P2 — group-commit write entry. Runs the engine
1857    /// mutation + encodes/enqueues the WAL record, then RETURNS
1858    /// WITHOUT waiting for the fsync. The caller must call
1859    /// [`WalTicket::wait`] before treating the write as durable
1860    /// — crucially, the caller can (and should) drop whatever
1861    /// lock guards this `Database` first, so the next writer's
1862    /// mutation overlaps this batch's fsync.
1863    ///
1864    /// `None` ticket = nothing hit the WAL (read-only statement,
1865    /// no-op DDL, or in-memory database) — the result is final
1866    /// as returned.
1867    ///
1868    /// # Errors
1869    /// Engine errors propagate unchanged. Auto-checkpoint (when
1870    /// the active chunk crosses the threshold) runs inline and
1871    /// may surface IO errors.
1872    pub fn execute_buffered(
1873        &mut self,
1874        sql: &str,
1875    ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
1876        let result = self.engine.execute(sql)?;
1877        let modified = matches!(
1878            &result,
1879            QueryResult::CommandOk {
1880                modified_catalog: true,
1881                ..
1882            }
1883        );
1884        let ticket = self.wal_after_ok(sql, modified)?;
1885        Ok((result, ticket))
1886    }
1887
1888    /// v7.21 (round-12 polish) — post-engine WAL bookkeeping shared
1889    /// by the simple ([`Self::execute_buffered`]) and prepared
1890    /// ([`Self::execute_prepared_buffered`]) write paths. `canonical`
1891    /// is the replay text (bind-final for prepared statements);
1892    /// `modified_catalog` comes from the engine result. Three routes:
1893    ///
1894    /// - transaction control → maintain [`Self::tx_wal`]: BEGIN opens
1895    ///   the buffer, COMMIT flushes it as ONE atomic
1896    ///   `WAL_V4_TYPE_TX_COMMIT_SQL` record, ROLLBACK drops it,
1897    ///   SAVEPOINT / ROLLBACK TO mark / truncate it. The engine has
1898    ///   already accepted the statement, so this only mirrors state.
1899    /// - inside an open transaction → buffer the statement (shadow-
1900    ///   catalog mutations report `modified_catalog: false`, so the
1901    ///   auto-commit arm below can't see them).
1902    /// - auto-commit mutation → classic per-statement v4 record.
1903    ///
1904    /// v7.18 PITR — v4 records carry commit LSN + wall-clock micros.
1905    /// The crash window remains one BATCH: replay re-applies
1906    /// idempotently exactly as before, and a torn batch tail drops
1907    /// cleanly (same torn-write handling).
1908    fn wal_after_ok(
1909        &mut self,
1910        canonical: &str,
1911        modified_catalog: bool,
1912    ) -> Result<Option<WalTicket>, EngineError> {
1913        if self.persistence.is_none() {
1914            return Ok(None);
1915        }
1916        let mut record = None;
1917        match tx_control_kind(canonical) {
1918            Some(TxControl::Begin) => {
1919                self.tx_wal = Some(TxWalBuffer::default());
1920            }
1921            Some(TxControl::Commit) => {
1922                if let Some(buf) = self.tx_wal.take()
1923                    && !buf.statements.is_empty()
1924                {
1925                    let script = buf.statements.join(";\n");
1926                    let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1927                    record = Some(encode_v4_tx_commit(&script, lsn, wall_clock_micros()));
1928                }
1929            }
1930            Some(TxControl::Rollback) => {
1931                self.tx_wal = None;
1932            }
1933            Some(TxControl::Savepoint(name)) => {
1934                if let Some(buf) = &mut self.tx_wal {
1935                    // PG name-reuse semantics: latest mark wins.
1936                    buf.savepoints.retain(|(n, _)| n != &name);
1937                    let mark = buf.statements.len();
1938                    buf.savepoints.push((name, mark));
1939                }
1940            }
1941            Some(TxControl::RollbackToSavepoint(name)) => {
1942                if let Some(buf) = &mut self.tx_wal
1943                    && let Some(pos) = buf.savepoints.iter().position(|(n, _)| n == &name)
1944                {
1945                    let mark = buf.savepoints[pos].1;
1946                    buf.statements.truncate(mark);
1947                    // Later savepoints die with the rollback; the
1948                    // target itself survives (PG keeps it
1949                    // re-rollbackable).
1950                    buf.savepoints.truncate(pos + 1);
1951                }
1952            }
1953            Some(TxControl::ReleaseSavepoint) => {
1954                // RELEASE folds the savepoint into the enclosing tx —
1955                // buffered statements stay. The mark also stays:
1956                // marks are only consulted by ROLLBACK TO, which the
1957                // engine validates first, so a dangling mark is
1958                // unreachable.
1959            }
1960            None => {
1961                if let Some(buf) = &mut self.tx_wal {
1962                    if !sql_is_read_only(canonical) {
1963                        buf.statements.push(canonical.to_string());
1964                    }
1965                } else if modified_catalog && !sql_is_read_only(canonical) {
1966                    let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1967                    record = Some(encode_v4_auto_commit(canonical, lsn, wall_clock_micros()));
1968                }
1969            }
1970        }
1971        let mut ticket = None;
1972        if let Some(record) = record {
1973            let p = self.persistence.as_mut().expect("checked above");
1974            let seq = p.wal.enqueue(&record);
1975            ticket = Some(WalTicket {
1976                group: Arc::clone(&p.wal),
1977                seq,
1978            });
1979            if p.wal.written_len() >= p.checkpoint_threshold_bytes {
1980                self.checkpoint()?;
1981            }
1982        }
1983        Ok(ticket)
1984    }
1985
1986    /// v7.3.0 — typed-row variant of [`Database::query`]. Each
1987    /// row decodes into a `T: FromSpgRow` so callers don't
1988    /// pattern-match on `Value` themselves. Use [`spg_row!`] to
1989    /// generate the impl, or write it by hand.
1990    pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
1991        let rows = self.query(sql)?;
1992        rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
1993    }
1994
1995    /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
1996    /// strips the column-schema metadata for read-side
1997    /// ergonomics. Errors on non-Rows results (DML / DDL
1998    /// statements should go through `execute` instead).
1999    pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
2000        match self.engine.execute(sql)? {
2001            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2002            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2003                "query() expects a SELECT — use execute() for DML/DDL".into(),
2004            )),
2005            // v7.5.0 — QueryResult is #[non_exhaustive]; any future
2006            // variant is not a SELECT row stream, treat as Unsupported.
2007            _ => Err(EngineError::Unsupported(
2008                "query() expects a SELECT — use execute() for DML/DDL".into(),
2009            )),
2010        }
2011    }
2012
2013    /// v7.16.0 — column-aware variant of [`Self::query`].
2014    /// Returns the column schema vec alongside the rows so
2015    /// adapters (the spg-sqlx Row impl most notably) can drive
2016    /// name + type-based column lookups. Errors on non-Rows
2017    /// results identically to `query`.
2018    pub fn query_with_columns(
2019        &mut self,
2020        sql: &str,
2021    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2022        match self.engine.execute(sql)? {
2023            QueryResult::Rows { columns, rows } => {
2024                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2025            }
2026            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2027                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2028            )),
2029            _ => Err(EngineError::Unsupported(
2030                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2031            )),
2032        }
2033    }
2034
2035    /// v7.16.0 — column-aware variant of
2036    /// [`Self::query_prepared`]. Same shape as
2037    /// `query_with_columns` but driven from a prepared
2038    /// statement + bound params.
2039    pub fn query_prepared_with_columns(
2040        &mut self,
2041        stmt: &Statement,
2042        params: &[Value],
2043    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2044        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2045            QueryResult::Rows { columns, rows } => {
2046                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2047            }
2048            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2049                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2050            )),
2051            _ => Err(EngineError::Unsupported(
2052                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2053            )),
2054        }
2055    }
2056
2057    /// Borrow the underlying engine. Escape hatch for callers
2058    /// that need access to `spg-engine` APIs not yet surfaced
2059    /// here (transactions, EXPLAIN ANALYZE, etc.).
2060    #[must_use]
2061    pub const fn engine(&self) -> &Engine {
2062        &self.engine
2063    }
2064
2065    /// Mutable borrow of the underlying engine. Same intent as
2066    /// `engine()` but for write-side APIs (e.g. inserting
2067    /// directly through `Catalog::insert` for high-throughput
2068    /// bulk loads that bypass SQL parsing).
2069    pub const fn engine_mut(&mut self) -> &mut Engine {
2070        &mut self.engine
2071    }
2072
2073    /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
2074    /// `execute_prepared` / `query_prepared` calls can re-bind
2075    /// parameters without re-parsing. The returned [`Statement`]
2076    /// is a thin handle around the AST + cached source SQL; it's
2077    /// `Clone` so the same plan can drive many bind calls
2078    /// concurrently (each call clones the AST and runs
2079    /// placeholder substitution on the clone — the cached
2080    /// plan stays intact).
2081    ///
2082    /// Plan caching follows the engine's existing version-aware
2083    /// rule: a prepared `Statement` whose statistics version
2084    /// has rolled (ANALYZE ran between prepare and execute)
2085    /// will silently re-prepare under the hood. Callers don't
2086    /// need to detect this.
2087    ///
2088    /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
2089    /// `bind`-time `Value`s are passed as a slice; arity
2090    /// mismatches surface as `EvalError::PlaceholderOutOfRange`
2091    /// at `execute_prepared` time, not here.
2092    ///
2093    /// # Errors
2094    /// Surfaces `EngineError` (parse error / plan rewrite
2095    /// failure) from the underlying `Engine::prepare`.
2096    pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
2097        // Use the cached path so repeated prepares of the same
2098        // SQL are O(1). The engine's plan cache stays shared
2099        // across all callers of this Database — a single
2100        // `PgPool`-shaped consumer (or, later, the spg-sqlx
2101        // adapter) prepares once and reaps the win on every bind.
2102        let stmt = self
2103            .engine
2104            .prepare_cached(sql)
2105            .map_err(EngineError::Parse)?;
2106        Ok(Statement {
2107            stmt,
2108            sql: sql.to_string(),
2109        })
2110    }
2111
2112    /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
2113    /// executing. Returns `(parameter_oid_count, output_columns)`
2114    /// where `output_columns` is empty for non-SELECT statements
2115    /// or for SELECT shapes the describe planner can't resolve
2116    /// (JOIN / subquery / unknown table). Wraps
2117    /// `Engine::describe_prepared` so the spg-sqlx bridge can
2118    /// surface PG-shape Describe replies for
2119    /// `sqlx::query!()` compile-time validation.
2120    ///
2121    /// # Errors
2122    /// Propagates parse errors from the underlying prepare path.
2123    pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2124        let stmt = self
2125            .engine
2126            .prepare_cached(sql)
2127            .map_err(EngineError::Parse)?;
2128        Ok(self.engine.describe_prepared(&stmt))
2129    }
2130
2131    /// v7.16.0 — execute a prepared statement with bound
2132    /// parameters. Mirrors `Engine::execute_prepared`: clones
2133    /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
2134    ///
2135    /// Persistence (WAL fsync + auto-checkpoint) follows the
2136    /// same rules as `execute(sql)`: mutating statements get a
2137    /// WAL record AFTER the in-memory exec succeeds. The WAL
2138    /// record carries the substituted, bind-final SQL, so
2139    /// replay reconstructs the same row state without needing
2140    /// the original prepared `Statement` to still be alive.
2141    ///
2142    /// # Errors
2143    /// Propagates engine errors. Param arity mismatch surfaces
2144    /// as `EvalError::PlaceholderOutOfRange`.
2145    pub fn execute_prepared(
2146        &mut self,
2147        stmt: &Statement,
2148        params: &[Value],
2149    ) -> Result<QueryResult, EngineError> {
2150        let (result, ticket) = self.execute_prepared_buffered(stmt, params)?;
2151        if let Some(t) = ticket {
2152            t.wait()?;
2153        }
2154        Ok(result)
2155    }
2156
2157    /// v7.20 P2 — group-commit variant of
2158    /// [`Database::execute_prepared`]. Same contract as
2159    /// [`Database::execute_buffered`]: mutation + enqueue happen
2160    /// here; the caller waits on the ticket AFTER releasing
2161    /// whatever lock guards this `Database`.
2162    ///
2163    /// # Errors
2164    /// Engine errors propagate unchanged; inline auto-checkpoint
2165    /// may surface IO errors.
2166    pub fn execute_prepared_buffered(
2167        &mut self,
2168        stmt: &Statement,
2169        params: &[Value],
2170    ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2171        let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
2172        let modified = matches!(
2173            &result,
2174            QueryResult::CommandOk {
2175                modified_catalog: true,
2176                ..
2177            }
2178        );
2179        // WAL persistence on the bind-final SQL. Build the
2180        // canonical Display form by re-printing the
2181        // placeholder-substituted statement (cheap — the AST
2182        // is already in hand from execute_prepared's internal
2183        // clone) so replay's path is identical to the
2184        // simple-query path. v7.21: also when a transaction is
2185        // open — in-tx mutations report `modified_catalog: false`
2186        // but must reach the tx WAL buffer (see `wal_after_ok`).
2187        let mut ticket = None;
2188        if self.persistence.is_some()
2189            && (modified
2190                || (self.tx_wal.is_some() && !sql_is_read_only(&stmt.sql))
2191                || tx_control_kind(&stmt.sql).is_some())
2192        {
2193            let mut wal_stmt = stmt.stmt.clone();
2194            crate::wal_render_with_params(&mut wal_stmt, params);
2195            let canonical = format!("{wal_stmt}");
2196            ticket = self.wal_after_ok(&canonical, modified)?;
2197        }
2198        Ok((result, ticket))
2199    }
2200
2201    /// v7.16.0 — run a prepared SELECT with bound params and
2202    /// return rows as `Vec<Vec<Value>>`, matching `query()`
2203    /// shape. SELECTs are read-only so this never writes the
2204    /// WAL.
2205    ///
2206    /// # Errors
2207    /// Returns `Unsupported` if the prepared statement isn't a
2208    /// SELECT (use `execute_prepared` for DML/DDL).
2209    pub fn query_prepared(
2210        &mut self,
2211        stmt: &Statement,
2212        params: &[Value],
2213    ) -> Result<Vec<Vec<Value>>, EngineError> {
2214        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2215            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2216            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2217                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2218            )),
2219            _ => Err(EngineError::Unsupported(
2220                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2221            )),
2222        }
2223    }
2224
2225    /// v7.18 — parse + plan a SQL string against a
2226    /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
2227    /// readonly fan-out path: no writer lock taken, no WAL write,
2228    /// no plan-cache mutation. Static-on-`Self` so callers can
2229    /// dispatch against a snapshot without an `&mut Database`
2230    /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
2231    /// is the load-bearing consumer.
2232    ///
2233    /// # Errors
2234    /// Propagates `EngineError::Parse` from the parser.
2235    pub fn prepare_on_snapshot(
2236        snapshot: &CatalogSnapshot,
2237        sql: &str,
2238    ) -> Result<Statement, EngineError> {
2239        let stmt =
2240            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2241        Ok(Statement {
2242            stmt,
2243            sql: sql.to_string(),
2244        })
2245    }
2246
2247    /// v7.18 — execute a prepared `Statement` against a
2248    /// `CatalogSnapshot` with bound params. Mirror of
2249    /// [`Database::execute_prepared`] on the readonly path:
2250    /// writes / DDL hit `EngineError::WriteRequired`. No WAL
2251    /// write, no writer lock, multiple snapshots can run
2252    /// concurrently — the snapshot is immutable from prepare time.
2253    ///
2254    /// # Errors
2255    /// Surfaces `EngineError::WriteRequired` for non-readonly
2256    /// statements; propagates other engine errors.
2257    pub fn execute_prepared_on_snapshot(
2258        snapshot: &CatalogSnapshot,
2259        stmt: &Statement,
2260        params: &[Value],
2261    ) -> Result<QueryResult, EngineError> {
2262        spg_engine::Engine::execute_readonly_prepared_on_snapshot(
2263            snapshot,
2264            stmt.stmt.clone(),
2265            params,
2266        )
2267    }
2268
2269    /// v7.28 (round-22) — deadline-bounded variant of
2270    /// [`Database::execute_prepared_on_snapshot`]. Returns
2271    /// `EngineError::Cancelled` once the budget elapses; the
2272    /// sqlx driver uses this to keep readonly-INLINE execution
2273    /// from monopolising the caller's async runtime (four slow
2274    /// inbox queries saturated mailrs's whole tokio pool) and
2275    /// re-runs over the blocking pool on timeout.
2276    ///
2277    /// # Errors
2278    /// `EngineError::Cancelled` on budget expiry; engine errors
2279    /// otherwise.
2280    pub fn execute_prepared_on_snapshot_with_budget(
2281        snapshot: &CatalogSnapshot,
2282        stmt: &Statement,
2283        params: &[Value],
2284        budget_us: u64,
2285    ) -> Result<QueryResult, EngineError> {
2286        fn mono_now_us() -> u64 {
2287            use std::time::{SystemTime, UNIX_EPOCH};
2288            // Monotonic enough for a per-call relative budget: the
2289            // engine only compares (now - start) against the budget
2290            // within one call.
2291            SystemTime::now()
2292                .duration_since(UNIX_EPOCH)
2293                .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
2294                .unwrap_or(0)
2295        }
2296        let deadline = mono_now_us().saturating_add(budget_us);
2297        let token = spg_engine::CancelToken::none().with_deadline(mono_now_us, deadline);
2298        spg_engine::Engine::execute_readonly_prepared_on_snapshot_with_cancel(
2299            snapshot,
2300            stmt.stmt.clone(),
2301            params,
2302            token,
2303        )
2304    }
2305
2306    /// v7.18 — describe a SQL string against a
2307    /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
2308    /// the readonly path. Pure function on the snapshot's
2309    /// catalog; safe to call from any thread.
2310    ///
2311    /// # Errors
2312    /// Propagates `EngineError::Parse` from the parser.
2313    pub fn describe_on_snapshot(
2314        snapshot: &CatalogSnapshot,
2315        sql: &str,
2316    ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2317        let stmt =
2318            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2319        Ok(spg_engine::Engine::describe_prepared_on_snapshot(
2320            snapshot, &stmt,
2321        ))
2322    }
2323
2324    /// v7.21 (round-12 polish) — run a multi-statement SQL script
2325    /// with PG simple-query semantics: the statements execute in
2326    /// order inside ONE implicit transaction, so a mid-script error
2327    /// rolls back the whole script (PG wraps every simple-query
2328    /// message in an implicit transaction). Three exceptions, all
2329    /// PG-faithful:
2330    ///
2331    /// - a script that carries its OWN transaction control
2332    ///   (BEGIN / COMMIT / …) runs statement-by-statement — the
2333    ///   script owns its boundaries;
2334    /// - a script run while the caller already has a transaction
2335    ///   open joins that transaction (no nested BEGIN), and the
2336    ///   caller's COMMIT / ROLLBACK decides its fate;
2337    /// - a single-statement script is plain auto-commit.
2338    ///
2339    /// Returns one `QueryResult` per executed statement. This is the
2340    /// engine behind `sqlx::raw_sql` (mailrs feeds whole
2341    /// `init-schema.sql` files through it) and `spgctl import`.
2342    ///
2343    /// # Errors
2344    /// The first failing statement's error propagates after the
2345    /// implicit ROLLBACK; nothing from the script remains applied.
2346    pub fn execute_script(&mut self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
2347        let stmts = split_statements(sql);
2348        let script_owns_tx = stmts.iter().any(|s| tx_control_kind(s).is_some());
2349        let wrap = stmts.len() > 1 && !script_owns_tx && !self.engine.in_transaction();
2350        if !wrap {
2351            let mut out = Vec::with_capacity(stmts.len());
2352            for stmt in &stmts {
2353                out.push(self.execute_dump_statement(stmt)?);
2354            }
2355            return Ok(out);
2356        }
2357        self.execute("BEGIN")?;
2358        let mut out = Vec::with_capacity(stmts.len());
2359        for stmt in &stmts {
2360            match self.execute_dump_statement(stmt) {
2361                Ok(r) => out.push(r),
2362                Err(e) => {
2363                    // Best-effort rollback; surface the script error.
2364                    let _ = self.execute("ROLLBACK");
2365                    return Err(e);
2366                }
2367            }
2368        }
2369        self.execute("COMMIT")?;
2370        Ok(out)
2371    }
2372
2373    /// v7.22 (round-13 T2) — execute one `split_statements` chunk,
2374    /// lowering a `COPY … FROM stdin;` block (statement + its data
2375    /// lines, as one chunk) to per-row INSERTs through the shared
2376    /// `spg_engine::copy` helpers. Default-format pg_dump emits
2377    /// COPY blocks, so the zero-change import promise needs this on
2378    /// the embed path; non-COPY statements pass straight through to
2379    /// [`Self::execute`]. Public so `spgctl import` can keep its
2380    /// per-statement error indexing while sharing the lowering.
2381    ///
2382    /// # Errors
2383    /// Engine errors propagate; for COPY the failing row's INSERT
2384    /// error carries the synthesized statement context.
2385    pub fn execute_dump_statement(&mut self, stmt: &str) -> Result<QueryResult, EngineError> {
2386        // Strip pg_dump's `-- Data for Name: …;` banner (it carries
2387        // semicolons of its own) before splitting head from data.
2388        let stmt_clean = strip_leading_sql_noise(stmt);
2389        let head_is_copy = stmt_clean
2390            .get(..4)
2391            .is_some_and(|p| p.eq_ignore_ascii_case("copy"));
2392        if head_is_copy
2393            && let Some((head, data)) = stmt_clean.split_once(';')
2394            && let Some(spec) = spg_engine::copy::parse_copy_from_stdin_head(head)
2395        {
2396            let mut affected: usize = 0;
2397            for line in data.lines() {
2398                // Empty fragments only occur at the chunk boundary
2399                // (the remainder of the COPY line right after `;`);
2400                // data rows are whole non-empty lines.
2401                let line = line.strip_suffix('\r').unwrap_or(line);
2402                if line.is_empty() {
2403                    continue;
2404                }
2405                let values = spg_engine::copy::decode_copy_text_row(line);
2406                let insert = spg_engine::copy::build_copy_insert(
2407                    &spec.table,
2408                    spec.columns.as_deref(),
2409                    &values,
2410                );
2411                match self.execute(&insert)? {
2412                    QueryResult::CommandOk { affected: n, .. } => affected += n,
2413                    _ => affected += 1,
2414                }
2415            }
2416            return Ok(QueryResult::CommandOk {
2417                affected,
2418                modified_catalog: false,
2419            });
2420        }
2421        self.execute(stmt)
2422    }
2423
2424    /// v7.2.0 — run `body` inside an implicit `BEGIN` /
2425    /// `COMMIT` pair. The body receives `&mut Database` so it
2426    /// can `execute()` / `query()` like any other code path;
2427    /// the only difference is that every write in the body
2428    /// lands inside one transaction, and a returned `Err` from
2429    /// the body triggers `ROLLBACK` before the error propagates.
2430    ///
2431    /// Nested calls are not supported — SPG's transaction
2432    /// model is single-writer with explicit `BEGIN` /
2433    /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
2434    /// would hit `EngineError::Unsupported("nested
2435    /// transaction")` at the inner `BEGIN`.
2436    pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
2437    where
2438        F: FnOnce(&mut Self) -> Result<R, EngineError>,
2439    {
2440        self.execute("BEGIN")?;
2441        match body(self) {
2442            Ok(value) => {
2443                self.execute("COMMIT")?;
2444                Ok(value)
2445            }
2446            Err(e) => {
2447                // Best-effort rollback. If ROLLBACK itself
2448                // fails (rare — the engine reports it via
2449                // `Unsupported` only when there's no active
2450                // TX, which can't happen here) we surface the
2451                // original body error, not the rollback error.
2452                let _ = self.execute("ROLLBACK");
2453                Err(e)
2454            }
2455        }
2456    }
2457}
2458
2459impl Default for Database {
2460    fn default() -> Self {
2461        Self::open_in_memory()
2462    }
2463}
2464
/// v7.7.5 — point-in-time observability counters returned by
/// [`Database::metrics`]. A plain `Copy` value made of
/// integers and one flag, so it is cheap to build, pass
/// around and serialise.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct EmbeddedMetrics {
    /// Live rows summed over every user table, hot tier only
    /// (cold-tier rows live in segment files).
    pub hot_rows: u64,
    /// `Table::hot_bytes` summed over every user table; the
    /// figure to compare against the freezer's
    /// `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Cold-tier segments registered in the catalog,
    /// tombstoned slots included (segments retired by
    /// compaction whose file may still be on disk).
    pub cold_segments: u64,
    /// Number of user tables (excludes any future
    /// engine-managed internal tables).
    pub tables: u64,
    /// WAL size as of the last `execute()` / `checkpoint()`;
    /// always zero for an in-memory database.
    pub wal_bytes: u64,
    /// `true` when WAL + checkpoint persistence is active,
    /// i.e. the database was opened with `open_path`.
    pub persistent: bool,
}
2492
/// v7.2.1 — handle returned by `spawn_background_freezer`.
/// Dropping it raises the shutdown flag and joins the worker
/// thread, so a `Database` (or its shared
/// `Arc<Mutex<Database>>`) can safely be dropped once the
/// handle is gone.
#[derive(Debug)]
#[must_use = "the background freezer keeps running until this handle is dropped"]
pub struct FreezerHandle {
    // Flag the worker polls; set to `true` to ask it to exit.
    shutdown: Arc<AtomicBool>,
    // Worker join handle; `None` once `stop` has joined it.
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — ask the worker to stop and wait for it to
    /// finish. Idempotent: a second call finds no join handle
    /// left and returns right after re-setting the flag. Also
    /// invoked from `Drop`.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let Some(worker) = self.join.take() else {
            return;
        };
        // A worker that panicked yields `Err` here; there is
        // nothing useful to do with it during shutdown.
        let _ = worker.join();
    }
}

impl Drop for FreezerHandle {
    fn drop(&mut self) {
        FreezerHandle::stop(self);
    }
}
2520
/// v7.2.1 — tuning knobs for `Database::spawn_background_freezer`.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// Tick interval. The worker wakes every `tick`, reads
    /// the catalog's `hot_tier_bytes`, and freezes if the
    /// budget is exceeded.
    pub tick: Duration,
    /// Hot-tier byte budget. Once exceeded, the next tick
    /// freezes the largest table's oldest `batch_rows` rows
    /// into a new cold segment.
    pub hot_tier_bytes: u64,
    /// Upper bound on rows demoted per freeze.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. After a successful
    /// freeze, a compaction pass runs when the catalog holds
    /// MORE than this many cold segments across all tables
    /// (strictly greater-than). Set to `usize::MAX` to disable
    /// auto-compact entirely; the default is `64`, matching
    /// the `spg-server` operating point for
    /// SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges,
    /// in bytes. Default 64 MiB, mirroring `spg-server`.
    /// Segments smaller than this are merge candidates;
    /// segments at or above it are left alone.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    /// Mirrors the `spg-server` freezer's default operating
    /// point (SPG_HOT_TIER_BYTES = 4 GiB, 1000-row batches, a
    /// tick every second) so embedded behaviour is predictable
    /// for operators who know the server.
    fn default() -> Self {
        const MIB: u64 = 1 << 20;
        const GIB: u64 = 1 << 30;
        Self {
            tick: Duration::from_secs(1),
            hot_tier_bytes: 4 * GIB,
            batch_rows: 1_000,
            compact_when_segments_exceed: 64,
            compact_target_bytes: 64 * MIB,
        }
    }
}
2562
2563impl Database {
2564    /// v7.7.4 — observe the catalog's cold-segment count.
2565    /// Useful for tests + dashboards that want to verify
2566    /// auto-compaction is firing.
2567    #[must_use]
2568    pub fn cold_segment_count(&self) -> usize {
2569        self.engine.catalog().cold_segment_count()
2570    }
2571
2572    /// v7.7.5 — observability snapshot. Returns a point-in-time
2573    /// view of the engine + persistence counters. Cheap (no
2574    /// locks beyond the existing `&self` borrow), so safe to
2575    /// call from a hot metrics-scrape path.
2576    ///
2577    /// Fields mirror the operational dashboard
2578    /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
2579    /// minus the network counters that don't apply to embedded.
2580    #[must_use]
2581    pub fn metrics(&self) -> EmbeddedMetrics {
2582        let cat = self.engine.catalog();
2583        let mut hot_rows: u64 = 0;
2584        let mut hot_bytes: u64 = 0;
2585        for name in cat.table_names() {
2586            if let Some(t) = cat.get(&name) {
2587                hot_rows = hot_rows.saturating_add(t.row_count() as u64);
2588                hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
2589            }
2590        }
2591        let (wal_bytes, persistent) = match &self.persistence {
2592            Some(p) => (p.wal.written_len(), true),
2593            None => (0, false),
2594        };
2595        EmbeddedMetrics {
2596            hot_rows,
2597            hot_bytes,
2598            cold_segments: cat.cold_segment_count() as u64,
2599            tables: cat.table_count() as u64,
2600            wal_bytes,
2601            persistent,
2602        }
2603    }
2604
2605    /// v7.2.1 — spawn a background thread that periodically
2606    /// runs `freeze_oldest_to_cold` when the catalog-wide hot
2607    /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
2608    /// pattern matches the v7.2 sharing story: callers wrap
2609    /// their `Database` in `Arc::new(Mutex::new(db))` once,
2610    /// then clone the Arc for the worker + for foreground
2611    /// access. Return value is a handle whose `Drop` joins the
2612    /// worker.
2613    ///
2614    /// Picks the freeze target the same way `spg-server`'s
2615    /// freezer does: largest-`hot_bytes` user table with at
2616    /// least one BTree integer-PK index. Tables without a
2617    /// freezable index are skipped silently.
2618    pub fn spawn_background_freezer(
2619        db: Arc<Mutex<Database>>,
2620        opts: FreezerOptions,
2621    ) -> FreezerHandle {
2622        let shutdown = Arc::new(AtomicBool::new(false));
2623        let shutdown_for_thread = Arc::clone(&shutdown);
2624        let join = thread::Builder::new()
2625            .name("spg-embedded-freezer".into())
2626            .spawn(move || {
2627                background_freezer_loop(db, opts, shutdown_for_thread);
2628            })
2629            .expect("spawn background freezer thread");
2630        FreezerHandle {
2631            shutdown,
2632            join: Some(join),
2633        }
2634    }
2635}
2636
2637/// v7.2.1 — the freezer's main loop, factored out so the
2638/// `Database::spawn_background_freezer` path stays readable.
2639fn background_freezer_loop(
2640    db: Arc<Mutex<Database>>,
2641    opts: FreezerOptions,
2642    shutdown: Arc<AtomicBool>,
2643) {
2644    // Sleep in short slices so a shutdown request resolves
2645    // quickly (vs sleeping the full tick).
2646    let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
2647    let mut last_tick = std::time::Instant::now();
2648    loop {
2649        if shutdown.load(Ordering::Acquire) {
2650            return;
2651        }
2652        thread::sleep(slice);
2653        if last_tick.elapsed() < opts.tick {
2654            continue;
2655        }
2656        last_tick = std::time::Instant::now();
2657        let Ok(mut guard) = db.lock() else {
2658            return;
2659        };
2660        if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
2661            continue;
2662        }
2663        let Some((table, index)) = pick_freeze_target(&guard) else {
2664            continue;
2665        };
2666        let row_count = guard
2667            .engine
2668            .catalog()
2669            .get(&table)
2670            .map_or(0, spg_storage::Table::row_count);
2671        let to_freeze = opts.batch_rows.min(row_count);
2672        if to_freeze == 0 {
2673            continue;
2674        }
2675        if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
2676            eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
2677            continue;
2678        }
2679        // v7.7.4 — auto-compact. If the catalog now carries
2680        // more cold segments than the configured threshold,
2681        // run a single compaction pass. Failures are reported
2682        // but don't kill the loop; the next tick will retry.
2683        let count = guard.engine.catalog().cold_segment_count();
2684        if count > opts.compact_when_segments_exceed {
2685            if let Err(e) = guard
2686                .engine
2687                .compact_cold_segments_with_target(opts.compact_target_bytes)
2688            {
2689                eprintln!(
2690                    "spg-embedded: background compact failed (segments={count}, \
2691                     threshold={}): {e:?}",
2692                    opts.compact_when_segments_exceed,
2693                );
2694            }
2695        }
2696    }
2697}
2698
2699/// v7.2.1 — pick the highest-`hot_bytes` user table with a
2700/// BTree integer-PK index. Returns `(table, index_name)` so the
2701/// caller can dispatch through `freeze_oldest_to_cold`.
2702fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
2703    let cat = db.engine.catalog();
2704    let mut best: Option<(String, String, u64)> = None;
2705    for name in cat.table_names() {
2706        let Some(t) = cat.get(&name) else { continue };
2707        if t.row_count() == 0 {
2708            continue;
2709        }
2710        let cols = &t.schema().columns;
2711        let Some(idx) = t.indices().iter().find(|i| {
2712            matches!(i.kind, spg_storage::IndexKind::BTree(_))
2713                && i.column_position < cols.len()
2714                && matches!(
2715                    cols[i.column_position].ty,
2716                    spg_storage::DataType::SmallInt
2717                        | spg_storage::DataType::Int
2718                        | spg_storage::DataType::BigInt
2719                )
2720        }) else {
2721            continue;
2722        };
2723        let hot = t.hot_bytes();
2724        match best {
2725            None => best = Some((name, idx.name.clone(), hot)),
2726            Some((_, _, best_hot)) if hot > best_hot => {
2727                best = Some((name, idx.name.clone(), hot));
2728            }
2729            _ => {}
2730        }
2731    }
2732    best.map(|(t, i, _)| (t, i))
2733}
2734
2735/// v7.7.6 — replay the first `to_seq` records of the WAL at
2736/// `wal_path` into a fresh engine and write the resulting
2737/// catalog snapshot to `out_db_path`. Same semantics as
2738/// `spg revert --wal … --to-seq N --out …` from the CLI:
2739///
2740///   - `to_seq == 0` → snapshot is the empty catalog
2741///   - WAL records beyond `to_seq` are not applied
2742///   - durability-checkpoint markers (v3 type 0x02) are
2743///     consumed without counting against the budget
2744///
2745/// Returns the number of statements actually applied
2746/// (`≤ to_seq`). The output snapshot is byte-identical to
2747/// what `Database::open_path(out_db_path)` would consume on
2748/// a subsequent open.
2749///
2750/// This is the "rewind" operator for an embedded database
2751/// that has been corrupted by a poison statement or a
2752/// half-applied migration. Pair with `cold_segment_paths`
2753/// preservation if your cold-tier files are still on disk.
2754///
2755/// # Errors
2756///
2757/// - `wal_path` unreadable or truncated mid-record
2758/// - WAL record decodes to invalid UTF-8 SQL
2759/// - WAL record's SQL is rejected by the engine
2760/// - `out_db_path` unwritable
2761pub fn revert_wal_to_seq(
2762    wal_path: impl AsRef<Path>,
2763    to_seq: u64,
2764    out_db_path: impl AsRef<Path>,
2765) -> Result<u64, EngineError> {
2766    // v7.19 — accept either a single-file legacy WAL (v7.18 and
2767    // earlier layout) or a chunked WAL directory (v7.19+). For a
2768    // directory, concatenate every `.wal` chunk in sorted order
2769    // — the same order open_path replays them in — so revert
2770    // sees the full record stream.
2771    let path = wal_path.as_ref();
2772    let wal_bytes = if path.is_dir() {
2773        let mut combined = Vec::new();
2774        let chunks = sorted_wal_chunks(path).map_err(io_err)?;
2775        for chunk in chunks {
2776            let bytes = std::fs::read(&chunk).map_err(io_err)?;
2777            combined.extend_from_slice(&bytes);
2778        }
2779        combined
2780    } else {
2781        std::fs::read(path).map_err(io_err)?
2782    };
2783    let mut engine = Engine::new();
2784    let mut applied = 0u64;
2785    let mut cur = 0usize;
2786    while cur < wal_bytes.len() && applied < to_seq {
2787        let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
2788        cur += total;
2789        if sql_bytes.is_empty() {
2790            continue;
2791        }
2792        let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
2793            EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
2794                "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
2795            )))
2796        })?;
2797        // v7.21 — tx-commit records carry a multi-statement script;
2798        // split_statements is a no-op for single-statement records.
2799        for stmt in split_statements(sql) {
2800            engine.execute(stmt)?;
2801        }
2802        applied += 1;
2803    }
2804    let snapshot = engine.snapshot();
2805    std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
2806    Ok(applied)
2807}
2808
2809/// v7.7.6 — decode one WAL record from a byte tail. Returns
2810/// `(sql_bytes, header_plus_payload_len)`. Handles the three
2811/// on-disk formats (v1 / v2 / v3) the same way the CLI
2812/// `decode_one_record` and the engine's `replay_wal_bytes`
2813/// do. CRCs are not re-validated; the caller's intent is
2814/// "apply", not "validate".
2815fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
2816    if tail.len() < 4 {
2817        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
2818            format!("WAL truncated record: {} < 4 header bytes", tail.len()),
2819        )));
2820    }
2821    let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
2822    let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
2823    let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
2824    let len_mask = if is_v3 {
2825        !(WAL_V2_SENTINEL | WAL_V3_FLAG)
2826    } else {
2827        !WAL_V2_SENTINEL
2828    };
2829    let rec_len = (raw_len & len_mask) as usize;
2830    let header_len = if is_v3 {
2831        9
2832    } else if is_v2 {
2833        8
2834    } else {
2835        4
2836    };
2837    if tail.len() < header_len + rec_len {
2838        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
2839            format!(
2840                "WAL truncated record: header+payload {} > available {}",
2841                header_len + rec_len,
2842                tail.len()
2843            ),
2844        )));
2845    }
2846    if is_v3 {
2847        let type_byte = tail[8];
2848        // v3 type 0x01 = auto_commit_sql (payload = SQL).
2849        // v3 type 0x02 = durability marker (no SQL to apply).
2850        // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
2851        //                prefix between type and SQL — strip
2852        //                the prefix so the caller still sees raw
2853        //                SQL bytes.
2854        // Anything else is unknown.
2855        if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
2856            let payload = &tail[header_len..header_len + rec_len];
2857            return Ok((payload.to_vec(), header_len + rec_len));
2858        }
2859        if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL || type_byte == WAL_V4_TYPE_TX_COMMIT_SQL {
2860            let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
2861            if tail.len() < v4_total {
2862                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
2863                    format!(
2864                        "WAL truncated v4 record: header+payload {v4_total} > available {}",
2865                        tail.len()
2866                    ),
2867                )));
2868            }
2869            let sql_start = header_len + WAL_V4_EXTRA_HEADER;
2870            let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
2871            return Ok((sql_bytes, v4_total));
2872        }
2873        // Caller treats empty payload as a skip-marker.
2874        return Ok((Vec::new(), header_len + rec_len));
2875    }
2876    let payload = &tail[header_len..header_len + rec_len];
2877    Ok((payload.to_vec(), header_len + rec_len))
2878}
2879
2880impl Drop for Database {
2881    fn drop(&mut self) {
2882        // v7.1 — best-effort final checkpoint when a persistent
2883        // Database leaves scope. Failures here go to stderr so
2884        // operators see them, but Drop can't propagate errors —
2885        // the WAL itself is already durable, so a checkpoint
2886        // miss only means the next boot replays a few more
2887        // records than strictly necessary.
2888        if self.persistence.is_some() {
2889            if let Err(e) = self.checkpoint() {
2890                eprintln!(
2891                    "spg-embedded: final checkpoint on Drop failed: {e:?} \
2892                     (WAL is intact; next open_path will replay)"
2893                );
2894            }
2895        }
2896        // v7.19 P3 / v7.20 — signal the retention + flusher
2897        // threads to exit, then wait for them. Done BEFORE the
2898        // lock release so background threads don't outlive the
2899        // database handle. The flusher drains the pending batch
2900        // on its way out (final flush_now in the thread body),
2901        // so `SPG_SYNCHRONOUS_COMMIT=off` never loses confirmed
2902        // commits across a clean shutdown.
2903        if let Some(ctx) = self.persistence.as_mut() {
2904            if let Some(shutdown) = ctx.retention_shutdown.take() {
2905                shutdown.store(true, Ordering::SeqCst);
2906            }
2907            if let Some(handle) = ctx.retention_thread.take() {
2908                let _ = handle.join();
2909            }
2910            if let Some(shutdown) = ctx.flusher_shutdown.take() {
2911                shutdown.store(true, Ordering::SeqCst);
2912            }
2913            if let Some(handle) = ctx.flusher_thread.take() {
2914                let _ = handle.join();
2915            }
2916        }
2917        // v7.17.0 Phase 6.2 — release the cross-process lock on
2918        // clean shutdown. Failure is logged but never panics;
2919        // the operator can clear a stale lock via
2920        // `Database::force_unlock` if a crash kept the
2921        // directory around.
2922        if let Some(ctx) = &self.persistence
2923            && ctx.lock_path.exists()
2924        {
2925            // remove_dir_all: the lock dir carries the owner-pid
2926            // record since round-12.
2927            if let Err(e) = std::fs::remove_dir_all(&ctx.lock_path) {
2928                eprintln!(
2929                    "spg-embedded: lock release on Drop failed for {}: {e:?}",
2930                    ctx.lock_path.display()
2931                );
2932            }
2933        }
2934    }
2935}
2936
2937impl Database {
2938    /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
2939    /// Use when a previous process crashed mid-session and
2940    /// left `<db_path>.lock` behind. Operators should confirm
2941    /// no other process is currently using the database before
2942    /// calling this — SPG cannot fingerprint stale-vs-live
2943    /// without a libc dep, which would violate spg-embedded's
2944    /// zero-deps charter.
2945    pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
2946        let lock_path = {
2947            let mut p = db_path.as_ref().to_path_buf();
2948            let name = p
2949                .file_name()
2950                .map(|n| {
2951                    let mut s = n.to_os_string();
2952                    s.push(".lock");
2953                    s
2954                })
2955                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
2956            p.set_file_name(name);
2957            p
2958        };
2959        if !lock_path.exists() {
2960            return Ok(());
2961        }
2962        std::fs::remove_dir_all(&lock_path).map_err(io_err)
2963    }
2964}
2965
2966/// v7.1 — turn a `std::io::Error` into the workspace's
2967/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
2968/// the closest existing variant — io failures during boot or
2969/// during a WAL append surface as a storage-layer fault to
2970/// callers, which keeps the public error enum unchanged.
2971fn io_err(e: std::io::Error) -> EngineError {
2972    EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
2973}
2974
2975/// v7.2.2 — `Database` is `Send`, so the recommended sharing
2976/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
2977///
2978/// ```no_run
2979/// use std::sync::{Arc, Mutex};
2980/// use spg_embedded::Database;
2981///
2982/// let db = Database::open_in_memory();
2983/// let shared = Arc::new(Mutex::new(db));
2984/// let shared_for_worker = Arc::clone(&shared);
2985/// std::thread::spawn(move || {
2986///     let mut guard = shared_for_worker.lock().unwrap();
2987///     guard.execute("INSERT INTO t VALUES (1)").unwrap();
2988/// });
2989/// ```
2990///
2991/// Internal `RwLock`-wrapped state — letting many threads
2992/// hold concurrent `&Database` for `SELECT` without contending
2993/// — is parked as STABILITY § "Out of v7.2"; multi-reader
2994/// embedded throughput needs a planner-side change to release
2995/// the engine read lock between scans, which is the v7.x
2996/// "Choice A" line of work already documented in v6.9.1's
2997/// carve-out.
2998#[allow(dead_code)]
2999fn _database_is_send() {
3000    fn assert_send<T: Send>() {}
3001    assert_send::<Database>();
3002}
3003
/// v6.10.3 — trait that maps a row's columns onto a user
/// struct's fields. v7.3.0 ships the [`spg_row!`] declarative
/// macro that generates `impl FromSpgRow for YourStruct` from
/// a struct definition (no proc-macro, no syn/quote/
/// proc-macro2 deps — the workspace's "0 external deps"
/// policy holds).
///
/// Errors surface as `EngineError::Unsupported` so the
/// caller's error type stays uniform.
pub trait FromSpgRow: Sized {
    /// Decode one query result row into `Self`. Called once per
    /// row by [`Database::query_typed`]. The slice length equals
    /// the number of columns in the SELECT projection.
    ///
    /// # Errors
    ///
    /// Returns an `EngineError` when the row cannot be mapped onto
    /// `Self`; the `spg_row!`-generated impls do so for a missing
    /// column or a cell of the wrong type.
    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
}
3020
/// v7.3.0 — declarative macro that generates `FromSpgRow` impl
/// for a user struct. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
///
/// The emitted struct always carries `#[derive(Debug, Clone)]` in
/// addition to any attributes written on the definition. Fields
/// are filled from the row positionally, in declaration order; a
/// row with fewer cells than fields, or a cell that does not
/// decode into the field's type, yields
/// `EngineError::Unsupported` naming the struct and field. The
/// comma after the last field is optional, as in an ordinary
/// struct definition.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // One cell is consumed per field, in declaration order.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                // Fully qualified so a caller-side `Ok` cannot shadow it.
                ::core::result::Result::Ok(Self { $($field,)* })
            }
        }
    };
}
3097
/// v7.3.0 — per-column decoder used by `spg_row!`. Surface
/// covers every numeric / text / bytes / bool variant in
/// `Value`, plus `Option<T>` for nullable columns.
///
/// `spg_row!` calls this once per struct field, passing the cell
/// at the field's position in the row.
pub trait FromSpgValue: Sized {
    /// Decode one cell into `Self`. The returned `&'static str`
    /// is a short diagnostic for type mismatches (e.g. `"expected
    /// integer, got TEXT"`); callers wrap it into their own
    /// error type.
    ///
    /// # Errors
    ///
    /// Returns the diagnostic when `v` cannot be represented as
    /// `Self`.
    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
}
3108
3109macro_rules! impl_from_value_int {
3110    ($($t:ty),* $(,)?) => {
3111        $(
3112            impl FromSpgValue for $t {
3113                fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3114                    match v {
3115                        Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
3116                        Value::Int(n)      => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
3117                        Value::BigInt(n)   => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
3118                        Value::Null        => Err("NULL in non-Option int column"),
3119                        _ => Err("non-integer value in int column"),
3120                    }
3121                }
3122            }
3123        )*
3124    };
3125}
3126impl_from_value_int!(i16, i32, i64);
3127
3128impl FromSpgValue for f32 {
3129    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3130        match v {
3131            Value::Float(f) => Ok(*f as f32),
3132            Value::Null => Err("NULL in non-Option float column"),
3133            _ => Err("non-float value in float column"),
3134        }
3135    }
3136}
3137
3138impl FromSpgValue for f64 {
3139    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3140        match v {
3141            Value::Float(f) => Ok(*f),
3142            Value::Null => Err("NULL in non-Option float column"),
3143            _ => Err("non-float value in float column"),
3144        }
3145    }
3146}
3147
3148impl FromSpgValue for bool {
3149    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3150        match v {
3151            Value::Bool(b) => Ok(*b),
3152            Value::Null => Err("NULL in non-Option bool column"),
3153            _ => Err("non-bool value in bool column"),
3154        }
3155    }
3156}
3157
3158impl FromSpgValue for String {
3159    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3160        match v {
3161            Value::Text(s) => Ok(s.clone()),
3162            Value::Null => Err("NULL in non-Option text column"),
3163            _ => Err("non-text value in String column"),
3164        }
3165    }
3166}
3167
3168impl FromSpgValue for Vec<f32> {
3169    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3170        match v {
3171            Value::Vector(xs) => Ok(xs.clone()),
3172            Value::Null => Err("NULL in non-Option vector column"),
3173            _ => Err("non-vector value in Vec<f32> column"),
3174        }
3175    }
3176}
3177
3178impl<T: FromSpgValue> FromSpgValue for Option<T> {
3179    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3180        match v {
3181            Value::Null => Ok(None),
3182            other => T::from_spg_value(other).map(Some),
3183        }
3184    }
3185}
3186
/// v7.27 (mailrs round-21 B) — the prober's environment identity:
/// `(hostname, boot-or-container id)`. `acquire_path_lock` records
/// it next to the owner pid and compares it before probing pid
/// liveness.
///
/// A pid is only meaningful inside the PID namespace that recorded
/// it; mailrs's recovery window saw "locked by pid 1" from a
/// STOPPED container because the prober's pid 1 (its own init) was
/// alive. When the lock's identity differs from ours, liveness is
/// UNDECIDABLE and we refuse honestly instead of guessing in either
/// direction.
///
/// Background (mailrs embed round-12): the cross-process lock is a
/// directory created with atomic `mkdir`, with the owner pid
/// recorded inside. A lock left behind by a killed process (docker
/// SIGKILL, crash) is reclaimed automatically instead of forcing
/// the operator to delete it by hand — a restarted server came up
/// in degraded mode because the previous instance's lock survived.
///
/// Either component is the empty string when it cannot be
/// determined.
fn host_identity() -> (String, String) {
    use std::process::{Command, Output};

    // Lossily decoded, whitespace-trimmed stdout of a finished command.
    fn trimmed_stdout(out: Output) -> String {
        String::from_utf8_lossy(&out.stdout).trim().to_string()
    }

    let hostname = match Command::new("hostname").output() {
        Ok(out) => trimmed_stdout(out),
        Err(_) => String::new(),
    };

    // Linux boot id; containers share the host kernel's boot id, so
    // hostname (= container id by default) is the namespace
    // discriminator and boot id catches host reboots / pid reuse.
    // Where the /proc file is unreadable, fall back to the
    // `kern.bootsessionuuid` sysctl.
    let boot_id = match std::fs::read_to_string("/proc/sys/kernel/random/boot_id") {
        Ok(text) => text.trim().to_string(),
        Err(_) => Command::new("sysctl")
            .args(["-n", "kern.bootsessionuuid"])
            .output()
            .map(trimmed_stdout)
            .unwrap_or_default(),
    };

    (hostname, boot_id)
}
3221
3222fn acquire_path_lock(lock_path: &Path) -> Result<(), EngineError> {
3223    for attempt in 0..2 {
3224        match std::fs::create_dir(lock_path) {
3225            Ok(()) => {
3226                // Best-effort owner record; liveness probing treats a
3227                // missing pid file as stale (crash between mkdir and
3228                // write is indistinguishable from an ancient lock).
3229                // v7.27 — lines 2+3 record the owner's environment
3230                // identity (hostname, boot id) so a prober in a
3231                // different namespace refuses instead of misreading
3232                // the pid.
3233                let (host, boot) = host_identity();
3234                let _ = std::fs::write(
3235                    lock_path.join("pid"),
3236                    format!("{}\n{host}\n{boot}\n", std::process::id()),
3237                );
3238                return Ok(());
3239            }
3240            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists && attempt == 0 => {
3241                let record = std::fs::read_to_string(lock_path.join("pid")).unwrap_or_default();
3242                let mut lines = record.lines();
3243                let owner = lines.next().and_then(|s| s.trim().parse::<u32>().ok());
3244                let lock_host = lines.next().unwrap_or("").trim().to_string();
3245                let lock_boot = lines.next().unwrap_or("").trim().to_string();
3246                // v7.27 — identity check BEFORE the pid probe. A pid
3247                // recorded in another namespace is undecidable both
3248                // ways (a stale lock can look held, a held lock can
3249                // look stale — the unsafe direction). Old-format
3250                // locks (pid only) keep the legacy same-host
3251                // assumption.
3252                if !lock_host.is_empty() {
3253                    let (my_host, my_boot) = host_identity();
3254                    let same_env = lock_host == my_host
3255                        && (lock_boot.is_empty() || my_boot.is_empty() || lock_boot == my_boot);
3256                    if !same_env {
3257                        return Err(EngineError::Unsupported(format!(
3258                            "database lock {} was taken in a different host/container \
3259                             (owner: pid {} on {:?}; we are {:?}) — liveness is \
3260                             undecidable from here. If you are sure the owner is gone, \
3261                             call Database::force_unlock() or `spg import --force-unlock`.",
3262                            lock_path.display(),
3263                            owner.unwrap_or(0),
3264                            lock_host,
3265                            my_host
3266                        )));
3267                    }
3268                }
3269                let owner_alive = owner.is_some_and(pid_alive);
3270                if owner_alive {
3271                    return Err(EngineError::Unsupported(format!(
3272                        "database is locked by another process (pid {}): {}; \
3273                         stop that process first, or call Database::force_unlock()",
3274                        owner.unwrap_or(0),
3275                        lock_path.display()
3276                    )));
3277                }
3278                // Stale — owner pid dead or unrecorded. Reclaim.
3279                eprintln!(
3280                    "spg-embedded: reclaiming stale lock {} (owner pid {:?} not alive)",
3281                    lock_path.display(),
3282                    owner
3283                );
3284                std::fs::remove_dir_all(lock_path).map_err(io_err)?;
3285                // Loop retries the create_dir; a concurrent reclaimer
3286                // winning the race surfaces as AlreadyExists on
3287                // attempt 1 below.
3288            }
3289            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
3290                return Err(EngineError::Unsupported(format!(
3291                    "database is locked by another process: {}; \
3292                     stop that process first, or call Database::force_unlock()",
3293                    lock_path.display()
3294                )));
3295            }
3296            Err(e) => return Err(io_err(e)),
3297        }
3298    }
3299    unreachable!("acquire_path_lock loop covers both attempts")
3300}
3301
/// Probe whether `pid` is a live process. Unix: `ps -p` via the
/// system binary (std-only — no libc dependency). `ps -p` exits 0
/// for ANY live pid regardless of owner; `kill -0` was rejected
/// here because it fails with EPERM on another user's live process,
/// which would read as "dead" and reclaim a held lock. Probe
/// failure (no `ps` binary, exec error) conservatively reports
/// alive so locks are never auto-reclaimed on doubt; non-unix
/// targets do the same.
#[cfg(unix)]
fn pid_alive(pid: u32) -> bool {
    use std::process::{Command, Stdio};

    let probe = Command::new("ps")
        .arg("-p")
        .arg(pid.to_string())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    // Could not run the probe at all → assume alive.
    probe.map(|exit| exit.success()).unwrap_or(true)
}

/// Non-unix fallback: no probe is available, so every pid is
/// reported alive and locks are never auto-reclaimed.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    true
}
3328
/// v7.22 — update the stream-tracked string dialect from one
/// completed statement; see `split_statements`' `mysql_escapes`
/// tracking. A chunk mentioning `sql_mode` switches backslash
/// escapes on; otherwise a chunk mentioning
/// `standard_conforming_strings` switches them on only if it also
/// contains `off`. Matching is ASCII case-insensitive. Only short
/// chunks are inspected (the signal statements are one-liners;
/// COPY data blocks are skipped by the length guard).
fn note_dialect_signals(chunk: &str, mysql_escapes: &mut bool) {
    if chunk.len() > 4096 {
        return;
    }
    let lower = chunk.to_ascii_lowercase();
    if lower.contains("sql_mode") {
        *mysql_escapes = true;
    } else if lower.contains("standard_conforming_strings") {
        *mysql_escapes = lower.contains("off");
    }
}

/// Strip leading whitespace, `--` line comments and NON-conditional
/// block comments from a chunk so statement-head checks (COPY
/// detection most notably) see the first real token. pg_dump
/// prefixes every data block with a `-- Data for Name: …;` banner —
/// which itself contains semicolons, so head checks must run on the
/// stripped text. MySQL executable conditional comments (`/*!`) are
/// content and stay.
///
/// Block comments are matched the way `split_statements` scans
/// them: the closing `*/` is searched for after the opening `/*`
/// (so `/*/` does not close itself) and nested `/* … */` pairs are
/// balanced. An unterminated comment, or a line comment with no
/// following newline, leaves nothing: the result is `""`.
fn strip_leading_sql_noise(mut s: &str) -> &str {
    // Byte length of the block comment opening at the start of `t`
    // (which must begin with `/*`), or `None` when it never closes.
    fn block_comment_len(t: &str) -> Option<usize> {
        let bytes = t.as_bytes();
        let mut depth = 1usize;
        let mut at = 2usize;
        while at < bytes.len() {
            match (bytes[at], bytes.get(at + 1)) {
                (b'/', Some(b'*')) => {
                    depth += 1;
                    at += 2;
                }
                (b'*', Some(b'/')) => {
                    depth -= 1;
                    at += 2;
                    if depth == 0 {
                        return Some(at);
                    }
                }
                _ => at += 1,
            }
        }
        None
    }

    loop {
        let t = s.trim_start();
        if let Some(rest) = t.strip_prefix("--") {
            // Drop through the end of the line.
            s = rest.split_once('\n').map_or("", |(_, after)| after);
        } else if t.starts_with("/*") && !t.starts_with("/*!") {
            match block_comment_len(t) {
                Some(len) => s = &t[len..],
                None => return "",
            }
        } else {
            return t;
        }
    }
}
3370
3371/// Split a multi-statement SQL script into individual statements on
3372/// top-level `;`, honouring single-quoted strings (with `''`
3373/// escapes), double-quoted identifiers, dollar-quoted bodies
3374/// (`$tag$ … $tag$`), line comments (`--`) and MySQL executable
3375/// conditional comments (`/*!… */` stay statement content; plain
3376/// nested block comments don't). Chunks that contain no statement
3377/// content (whitespace / comments only) are dropped. PG's
3378/// simple-query protocol does this server-side; the embed path owns
3379/// it here.
3380///
3381/// v7.22 (mailrs round-13 gap 1) — psql meta-command lines are
3382/// dropped for client parity: a line whose first non-whitespace
3383/// byte is `\` BETWEEN statements (PG 18's pg_dump wraps scripts in
3384/// `\restrict` / `\unrestrict`) never reaches the parser, the same
3385/// way psql consumes `\`-lines client-side and never sends them. A
3386/// mid-statement backslash stays an ordinary byte — pg_dump only
3387/// emits meta-commands between statements.
3388pub fn split_statements(sql: &str) -> Vec<&str> {
3389    let bytes = sql.as_bytes();
3390    let mut stmts = Vec::new();
3391    let mut start = 0usize;
3392    let mut has_content = false;
3393    // v7.22 (round-13 T3) — stream-tracked string dialect, mirroring
3394    // the engine's session flag: a statement mentioning `sql_mode`
3395    // (mysqldump preamble, often inside `/*!…*/`) switches plain
3396    // strings to backslash-escape scanning;
3397    // `standard_conforming_strings` (pg_dump preamble) switches
3398    // back. Without this the scanner ends a MySQL `'…\'…'` literal
3399    // early and splits inside data.
3400    let mut mysql_escapes = false;
3401    let mut i = 0usize;
3402    while i < bytes.len() {
3403        match bytes[i] {
3404            b'\\' if !has_content => {
3405                // Start-of-statement `\` = psql meta-command line.
3406                // Consume through end-of-line; restart the chunk
3407                // after it so the line never lands in the output.
3408                while i < bytes.len() && bytes[i] != b'\n' {
3409                    i += 1;
3410                }
3411                start = if i < bytes.len() { i + 1 } else { i };
3412            }
3413            b'\'' => {
3414                has_content = true;
3415                // PG escape-string form `E'...'` honours backslash
3416                // escapes (`E'a\';b'` is ONE literal) — detect via
3417                // the immediately-preceding standalone E/e. MySQL
3418                // dialect sessions treat EVERY plain string that way.
3419                let escape_string = mysql_escapes
3420                    || (i >= 1
3421                        && matches!(bytes[i - 1], b'e' | b'E')
3422                        && !(i >= 2
3423                            && (bytes[i - 2].is_ascii_alphanumeric() || bytes[i - 2] == b'_')));
3424                i += 1;
3425                while i < bytes.len() {
3426                    if escape_string && bytes[i] == b'\\' {
3427                        // Skip the escaped byte (covers \' and \\).
3428                        i += 2;
3429                        continue;
3430                    }
3431                    if bytes[i] == b'\'' {
3432                        // `''` is an escaped quote inside the literal.
3433                        if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
3434                            i += 2;
3435                            continue;
3436                        }
3437                        break;
3438                    }
3439                    i += 1;
3440                }
3441            }
3442            b'"' => {
3443                has_content = true;
3444                i += 1;
3445                while i < bytes.len() && bytes[i] != b'"' {
3446                    i += 1;
3447                }
3448            }
3449            b'$' => {
3450                // Possible dollar-quote opener `$tag$` (tag may be
3451                // empty). If the shape doesn't match, it's a plain
3452                // `$` (positional param) — fall through.
3453                let tag_end = bytes[i + 1..]
3454                    .iter()
3455                    .position(|&b| !(b.is_ascii_alphanumeric() || b == b'_'))
3456                    .map(|off| i + 1 + off);
3457                if let Some(te) = tag_end
3458                    && te < bytes.len()
3459                    && bytes[te] == b'$'
3460                {
3461                    has_content = true;
3462                    let tag = &sql[i..=te];
3463                    // Find the closing `$tag$`.
3464                    if let Some(close) = sql[te + 1..].find(tag) {
3465                        i = te + 1 + close + tag.len();
3466                        continue;
3467                    }
3468                    // Unterminated — consume the rest; the parser
3469                    // will report it.
3470                    i = bytes.len();
3471                    continue;
3472                }
3473                has_content = true;
3474            }
3475            b'-' if i + 1 < bytes.len() && bytes[i + 1] == b'-' => {
3476                while i < bytes.len() && bytes[i] != b'\n' {
3477                    i += 1;
3478                }
3479            }
3480            b'/' if i + 1 < bytes.len() && bytes[i + 1] == b'*' => {
3481                // v7.22 (round-13 T3) — MySQL conditional comments
3482                // `/*!40101 … */` are EXECUTABLE (mysqldump wraps
3483                // its whole preamble + DISABLE KEYS hints in them);
3484                // they must stay statement content for the engine,
3485                // not be skipped as commentary.
3486                if i + 2 < bytes.len() && bytes[i + 2] == b'!' {
3487                    has_content = true;
3488                }
3489                let mut depth = 1usize;
3490                i += 2;
3491                while i < bytes.len() && depth > 0 {
3492                    if bytes[i] == b'/' && i + 1 < bytes.len() && bytes[i + 1] == b'*' {
3493                        depth += 1;
3494                        i += 2;
3495                    } else if bytes[i] == b'*' && i + 1 < bytes.len() && bytes[i + 1] == b'/' {
3496                        depth -= 1;
3497                        i += 2;
3498                    } else {
3499                        i += 1;
3500                    }
3501                }
3502                continue;
3503            }
3504            b';' => {
3505                if has_content {
3506                    let head = &sql[start..i];
3507                    // v7.22 (round-13 T2) — a `COPY … FROM stdin;`
3508                    // statement owns its following data block
3509                    // through the `\.` terminator line (data lines
3510                    // may contain `;`, so generic splitting would
3511                    // shred them). Swallow head + data into ONE
3512                    // chunk; `execute_script` lowers it to INSERTs.
3513                    // pg_dump prefixes the COPY with a comment
3514                    // banner — strip it before the head check.
3515                    let head_clean = strip_leading_sql_noise(head);
3516                    let is_copy_head = head_clean
3517                        .get(..4)
3518                        .is_some_and(|p| p.eq_ignore_ascii_case("copy"))
3519                        && spg_engine::copy::parse_copy_from_stdin_head(head_clean).is_some();
3520                    if is_copy_head {
3521                        // Scan whole lines after the ';' until the
3522                        // `\.` terminator (or EOF — torn dumps lose
3523                        // their tail, same as psql would error).
3524                        let mut j = i + 1;
3525                        let data_end;
3526                        loop {
3527                            if j >= bytes.len() {
3528                                data_end = bytes.len();
3529                                break;
3530                            }
3531                            let line_end = sql[j..].find('\n').map_or(bytes.len(), |off| j + off);
3532                            if sql[j..line_end].trim_end_matches('\r').trim() == "\\." {
3533                                data_end = j;
3534                                i = line_end; // bottom i += 1 skips \n
3535                                break;
3536                            }
3537                            j = line_end + 1;
3538                        }
3539                        stmts.push(&sql[start..data_end]);
3540                        if data_end == bytes.len() {
3541                            i = bytes.len();
3542                        }
3543                        start = i + 1;
3544                        has_content = false;
3545                        i += 1;
3546                        continue;
3547                    }
3548                    note_dialect_signals(head, &mut mysql_escapes);
3549                    stmts.push(head);
3550                }
3551                start = i + 1;
3552                has_content = false;
3553            }
3554            b => {
3555                if !b.is_ascii_whitespace() {
3556                    has_content = true;
3557                }
3558            }
3559        }
3560        i += 1;
3561    }
3562    if has_content {
3563        stmts.push(&sql[start..]);
3564    }
3565    stmts
3566}
3567
#[cfg(test)]
mod tests {
    use super::*;

    /// A `;` separates statements, the last statement needs no terminator,
    /// and chunks that hold only whitespace or comments are discarded.
    #[test]
    fn split_statements_basic_and_trailing() {
        let pieces = split_statements("CREATE TABLE a (x INT); INSERT INTO a VALUES (1)");
        let expected = vec!["CREATE TABLE a (x INT)", " INSERT INTO a VALUES (1)"];
        assert_eq!(pieces, expected);

        // Only separators, blanks and a line comment: nothing survives.
        assert!(split_statements("  ;; -- nothing\n;").is_empty());
    }

    /// A `;` that sits inside any quoting form is data, not a separator.
    #[test]
    fn split_statements_quoting_forms() {
        // In order: plain literal, doubled quote, E-string backslash
        // escape, quoted identifier, tagged dollar-quote, bare
        // dollar-quote.
        let cases = [
            "INSERT INTO t VALUES ('a;b')",
            "INSERT INTO t VALUES ('it''s; fine')",
            r"INSERT INTO t VALUES (E'it\'s; fine')",
            "CREATE TABLE \"odd;name\" (x INT)",
            "DO $body$ BEGIN PERFORM 1; END $body$",
            "DO $$ SELECT 1; $$",
        ];

        // Pass 1: each case alone comes back as a single untouched chunk.
        for &sql in &cases {
            let whole = split_statements(sql);
            assert_eq!(whole, vec![sql], "must stay whole: {sql}");
        }

        // Pass 2: a real terminator after the case still splits it from
        // the statement that follows.
        for &sql in &cases {
            let script = format!("{sql};\nSELECT 2");
            let parts = split_statements(&script);
            assert_eq!(parts, vec![sql, "\nSELECT 2"], "must split after: {sql}");
        }
    }

    /// v7.22 round-13 gap 1 — PG 18 pg_dump brackets its output with
    /// `\restrict` / `\unrestrict`. Like psql, the splitter must keep
    /// those meta lines away from the parser.
    #[test]
    fn split_statements_drops_psql_meta_lines() {
        let dump = "\\restrict TOKEN123\nSELECT 1;\n\\unrestrict TOKEN123\nSELECT 2;\n\\.\n";
        let kept = split_statements(dump);
        assert_eq!(kept, vec!["SELECT 1", "SELECT 2"]);

        // A backslash in the middle of a statement is ordinary text,
        // not the start of a meta-command.
        let escaped = r"SELECT E'a\\b'";
        assert_eq!(split_statements(escaped), vec![escaped]);
    }

    /// Semicolons inside `--` line comments and (nested) `/* */` block
    /// comments never terminate a statement.
    #[test]
    fn split_statements_comments_hide_semicolons() {
        let got = split_statements(
            "-- c1 ; still comment\nSELECT 1; /* a ; b /* nested ; */ */ SELECT 2",
        );
        assert_eq!(got.len(), 2);
        assert!(got[0].contains("SELECT 1"));
        assert!(got[1].contains("SELECT 2"));
    }

    /// End-to-end smoke test on an in-memory database: DDL, two inserts,
    /// then a filtered SELECT that must return exactly the matching row.
    #[test]
    fn in_memory_create_insert_select() {
        let mut database = Database::open_in_memory();
        let setup = [
            "CREATE TABLE t (id INT NOT NULL, name TEXT)",
            "INSERT INTO t VALUES (1, 'alice')",
            "INSERT INTO t VALUES (2, 'bob')",
        ];
        for statement in setup {
            database.execute(statement).unwrap();
        }

        let rows = database.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);

        let other = &rows[0][0];
        if !matches!(other, Value::Int(1)) {
            panic!("expected Int(1), got {other:?}");
        }
    }

    /// `query()` must reject a statement that is not a SELECT.
    #[test]
    fn query_on_non_select_errors() {
        let mut database = Database::open_in_memory();
        database.execute("CREATE TABLE t (id INT)").unwrap();

        let outcome = database.query("INSERT INTO t VALUES (1)");
        assert!(outcome.is_err(), "query() on INSERT must error");
    }

    /// A snapshot taken from one database can be restored into another
    /// that still answers queries over the original data.
    #[test]
    fn snapshot_roundtrip() {
        let mut source = Database::open_in_memory();
        for statement in [
            "CREATE TABLE t (id INT NOT NULL)",
            "INSERT INTO t VALUES (42)",
        ] {
            source.execute(statement).unwrap();
        }

        let image = source.snapshot();
        let mut restored = Database::restore(&image).unwrap();

        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);

        let other = &rows[0][0];
        if !matches!(other, Value::Int(42)) {
            panic!("expected Int(42), got {other:?}");
        }
    }

    /// Shape check for `FromSpgRow`: a caller-defined struct can implement
    /// the trait and be built from a borrowed row of `Value`s.
    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }

        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                // Only a leading integer cell is accepted as the id.
                if let Some(Value::Int(n)) = row.first() {
                    Ok(Self { _id: *n })
                } else {
                    Err(EngineError::Unsupported("bad id".into()))
                }
            }
        }

        let cells = vec![Value::Int(7)];
        let _u = User::from_spg_row(&cells).unwrap();
    }
}