Skip to main content

spg_embedded/
lib.rs

1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//!     println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//!   crash-consistent WAL + periodic checkpoint snapshot. The
32//!   on-disk format is byte-identical to what `spg-server`
33//!   produces, so a database can move between modes without
34//!   conversion.
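//! - **Durability**: by default every `execute()` that mutates
//!   returns `Ok` only after its WAL record is fsynced. Concurrent
//!   commits share one fsync through group commit (the flush leader
//!   first waits `SPG_COMMIT_DELAY_US`, default 150 µs, so more
//!   records can join the batch). `SPG_SYNCHRONOUS_COMMIT=off`
//!   returns before the fsync and lets a background flush make the
//!   record durable within `SPG_WAL_WRITER_DELAY_MS` (default
//!   200 ms), trading that crash window for lower latency. If you
//!   need batch throughput, wrap multiple statements in
//!   [`Database::with_transaction`] which fsyncs only at
//!   commit.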
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//!   Share across threads via `Arc<Mutex<Database>>`. The
43//!   single-writer model is intentional — see
44//!   [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//!   moves cold rows to disk-resident segments while you keep
47//!   serving requests. It runs in a dedicated thread; drop the
48//!   returned [`FreezerHandle`] (or call `stop()`) for clean
49//!   shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//!   [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//!   them with a wildcard arm so future v7.x releases can add
53//!   variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//!   Malformed SQL, type mismatches, missing tables — all
59//!   return `Err(EngineError::…)`. If you observe a panic on
60//!   a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//!   violations (e.g., catalog snapshot magic mismatch, WAL
63//!   record CRC sentinel corruption that survived the boot-
64//!   time validation). These represent silent disk corruption
65//!   and an unwind would leak inconsistent state, so the
66//!   release profile uses `panic = abort` — your host process
67//!   dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//!   `--profile release-dbg` (keeps unwind tables) and use
70//!   `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
82/// v7.16.0 — handle for a parsed-and-planned SQL statement.
83/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
84/// with a `&[Value]` slice carrying the bind parameters (PG-style
85/// `$1`, `$2`, … positional). Cheap to `Clone`; the underlying AST
86/// is shared by handle copies and cloned per bind call by the
87/// engine's executor.
88///
89/// The handle holds a snapshot of the AST at prepare time. If
90/// the engine's plan cache evicts the entry between prepare and
91/// execute (e.g. ANALYZE bumps the statistics version) the
92/// stored AST keeps working — `execute_prepared` operates on
93/// the handle's clone, not the cache entry.
94#[derive(Debug, Clone)]
95pub struct Statement {
96    /// The parsed + planned AST. `spg-engine::prepare_cached`
97    /// returns it as a clone of the cached plan, so any rewrite
98    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
99    /// already run.
100    pub(crate) stmt: ParsedStatement,
101    /// Original SQL source, kept for `Display` / debug only.
102    /// WAL persistence renders from the AST so a bind-time
103    /// rewrite of `$1..$N` survives replay.
104    pub(crate) sql: String,
105}
106
107impl Statement {
108    /// Borrow the original SQL source — useful for tracing and
109    /// debug logs. WAL replay does NOT use this; it serialises
110    /// the bind-final AST instead.
111    #[must_use]
112    pub fn sql(&self) -> &str {
113        &self.sql
114    }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127    let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::Write;
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
/// v7.11.3 — wall-clock source injected into every embedded
/// `Engine`, backing the SQL `NOW()` / `CURRENT_TIMESTAMP` /
/// `CURRENT_DATE` rewrite layer so PG-style time queries work
/// without the caller supplying a clock.
///
/// Returns microseconds since the Unix epoch. A clock set before
/// 1970 yields `0`. A far-future clock that overflows `i64`
/// saturates at `i64::MAX`.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Kept private so callers can't mis-frame records; the v3 layout
// is the same the server uses, so a `spg-server` boot can read a
// database an embedded process wrote and vice versa.

/// Top bit of every record's little-endian `u32` length word.
/// Together with [`WAL_V3_FLAG`] it leaves the low 30 bits for the
/// payload length.
const WAL_V2_SENTINEL: u32 = 1 << 31;
/// Second-highest bit of the length word, set on every record this
/// crate writes alongside [`WAL_V2_SENTINEL`].
const WAL_V3_FLAG: u32 = 1 << 30;
/// v7.1 — type byte of a v3 auto-commit SQL record (no LSN, no
/// timestamp).
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v7.18 — durability checkpoint marker stays at 0x02 (skipped on replay).
const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v7.18 PITR — auto-commit-sql record with appended (commit_lsn,
/// commit_unix_us) fields so replay can target a specific point in
/// time. Backward-compat: v3 records (type 0x01) keep working, the
/// envelope flag bits are unchanged. The new type byte is the
/// schema-version discriminator.
const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
/// v7.18 — sentinel for "no wall clock" inside a v4 record's
/// commit_unix_us slot. Restore-to-timestamp skips records with
/// this sentinel (no time anchor); LSN-based restore is
/// unaffected.
const WAL_V4_NO_CLOCK: i64 = i64::MIN;
/// v7.18 — extra header bytes after the type byte in a v4 record:
/// commit_lsn (u64 LE) followed by commit_unix_us (i64 LE).
const WAL_V4_EXTRA_HEADER: usize = std::mem::size_of::<u64>() + std::mem::size_of::<i64>();
/// v7.18 PITR — checkpoint anchor record written to the WAL *before*
/// the snapshot file replaces the on-disk catalog. Carries the
/// (lsn, ts, snapshot_path) triple so restore tooling can find the
/// matching base snapshot without scanning the filesystem. Replay
/// dispatch skips it (same as the v3 durability marker).
const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;

/// v7.21 (mailrs embed round-12 polish) — one COMMITted explicit
/// transaction, flushed atomically at COMMIT time. Payload = the
/// transaction's bind-final mutation statements joined with `";\n"`;
/// replay re-splits via [`split_statements`] and applies in order.
/// Same 16-byte (commit_lsn, commit_unix_us) prefix as the v4
/// auto-commit record. The record is CRC-framed like every other
/// record, so replay applies the whole transaction or — torn tail —
/// none of it; a transaction can never half-resurrect.
///
/// Why it exists: in-transaction mutations only touch the engine's
/// shadow catalog (`modified_catalog: false`), so the per-statement
/// auto-commit append never fired and a COMMIT followed by a crash
/// (no graceful Drop checkpoint) lost the transaction.
const WAL_V4_TYPE_TX_COMMIT_SQL: u8 = 0x12;
197
/// v7.1 — WAL size (bytes) past which the next successful
/// `execute()` ends with a `checkpoint()`, keeping the WAL bounded.
///
/// Read from `SPG_EMBEDDED_CHECKPOINT_BYTES` on every call. An
/// unset, non-numeric or zero value falls back to 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FOUR_MIB: u64 = 4 * 1024 * 1024;
    match std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(bytes) if bytes > 0 => bytes,
            _ => FOUR_MIB,
        },
        Err(_) => FOUR_MIB,
    }
}
209
210/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
211///
212/// ```text
213/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
214/// [u32 LE crc32 over (type_byte || sql_bytes)]
215/// [u8 type = 0x01]
216/// [sql bytes]
217/// ```
218fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
219    let payload = sql.as_bytes();
220    let mut crc_buf = Vec::with_capacity(1 + payload.len());
221    crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
222    crc_buf.extend_from_slice(payload);
223    let crc = spg_crypto::crc32::crc32(&crc_buf);
224    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
225    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
226    out.extend_from_slice(&header);
227    out.extend_from_slice(&crc.to_le_bytes());
228    out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
229    out.extend_from_slice(payload);
230    out
231}
232
/// v7.20 P2 — WAL group-commit. N concurrent commits share one
/// fsync (the 4.2 ms p50 that profile_breakdown measured as
/// 99.2% of the durable write path).
///
/// Leader-follower protocol, same family as PG's group commit:
///
/// 1. `enqueue(record)` — called while the caller still holds
///    the engine's write lock. Appends the encoded record to the
///    shared buffer, returns a sequence ticket. O(memcpy).
/// 2. Caller RELEASES the engine write lock (the next writer's
///    mutation proceeds in parallel with this batch's fsync).
/// 3. `wait_flushed(seq)` — if nobody is flushing, the caller
///    elects itself leader: swaps the buffer out, writes +
///    fsyncs ONCE for every record in the batch, marks the
///    batch durable, wakes all followers. Otherwise it parks on
///    the condvar until a leader covers its seq.
///
/// Durability contract is unchanged from v7.19: `execute()`
/// does not return Ok until the record that describes its
/// mutation is fsynced. The only change is N callers sharing
/// one fsync instead of paying one each. (Exception: with
/// `SPG_SYNCHRONOUS_COMMIT=off`, `WalTicket::wait` returns without
/// waiting for the flush.)
///
/// A failed write/fsync is recorded in `WalGroupState::failed` and
/// is sticky: every current and later waiter gets an error.
///
/// Lock order (deadlock-free): `state` then `file`; never the
/// reverse. The leader holds `file` WITHOUT `state` during IO so
/// enqueues continue while fsync runs.
#[derive(Debug)]
struct WalGroup {
    /// Pending buffer, sequence counters and leader/poison flags.
    state: Mutex<WalGroupState>,
    /// Paired with `state`. A flusher calls `notify_all` after every
    /// flush attempt (success or failure) so parked followers
    /// re-check `flushed_seq` / `failed`.
    cond: std::sync::Condvar,
    /// Active chunk file handle. Separate lock from `state` so
    /// the leader's write+fsync doesn't block concurrent
    /// enqueues. Swapped by `checkpoint()` at rotation.
    file: Mutex<File>,
}
267
/// Mutable group-commit bookkeeping guarded by `WalGroup::state`.
/// Every read or write of these fields happens with that mutex held.
#[derive(Debug)]
struct WalGroupState {
    /// Encoded records awaiting flush.
    buf: Vec<u8>,
    /// Monotonic enqueue counter (1-based).
    enqueued_seq: u64,
    /// Highest seq whose record is fsynced.
    flushed_seq: u64,
    /// True while some caller is inside the leader IO section.
    leader_active: bool,
    /// Sticky fatal error — a failed fsync poisons the WAL
    /// (loud, never silent). All current + future waiters error.
    failed: Option<String>,
    /// Bytes written to the active chunk since rotation —
    /// drives the auto-checkpoint trigger.
    written_len: u64,
}
285
/// Ticket returned by the buffered write path; `wait()` blocks
/// until the record it covers is durable (or the WAL is
/// poisoned). Cheap to move across threads.
#[derive(Debug)]
pub struct WalTicket {
    /// Group-commit state whose flush makes this ticket's record
    /// durable.
    group: Arc<WalGroup>,
    /// Sequence number of the covered record, passed to
    /// `WalGroup::wait_flushed`. Presumably the value returned by
    /// `WalGroup::enqueue`; construction is outside this view.
    seq: u64,
}
294
295impl WalGroup {
296    fn new(file: File, initial_len: u64) -> Self {
297        Self {
298            state: Mutex::new(WalGroupState {
299                buf: Vec::new(),
300                enqueued_seq: 0,
301                flushed_seq: 0,
302                leader_active: false,
303                failed: None,
304                written_len: initial_len,
305            }),
306            cond: std::sync::Condvar::new(),
307            file: Mutex::new(file),
308        }
309    }
310
311    /// Append `record` to the pending batch. Returns the seq the
312    /// caller must wait on. Called under the engine write lock —
313    /// keep it O(memcpy).
314    fn enqueue(&self, record: &[u8]) -> u64 {
315        let mut g = self.state.lock().expect("wal state poisoned");
316        g.buf.extend_from_slice(record);
317        g.enqueued_seq += 1;
318        g.enqueued_seq
319    }
320
321    /// Block until `seq` is durable. Leader-follower: the first
322    /// arriving waiter flushes for everyone.
323    fn wait_flushed(&self, seq: u64) -> Result<(), EngineError> {
324        let mut g = self.state.lock().expect("wal state poisoned");
325        loop {
326            if let Some(e) = &g.failed {
327                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
328                    format!("WAL poisoned by earlier flush failure: {e}"),
329                )));
330            }
331            if g.flushed_seq >= seq {
332                return Ok(());
333            }
334            if !g.leader_active {
335                // Elect self leader.
336                g.leader_active = true;
337                drop(g);
338                // v7.20 — commit_delay (PG's same-named knob):
339                // before taking the batch, give in-flight
340                // writers a short window to enqueue so the
341                // shared fsync covers more commits. 150 µs costs
342                // ~3.5% on a solo 4.2 ms fsync but multiplies
343                // batch size under load. Tunable via
344                // SPG_COMMIT_DELAY_US (0 disables).
345                let delay = commit_delay_us();
346                if delay > 0 {
347                    std::thread::sleep(std::time::Duration::from_micros(delay));
348                }
349                let (batch, flush_to) = {
350                    let mut g2 = self.state.lock().expect("wal state poisoned");
351                    (core::mem::take(&mut g2.buf), g2.enqueued_seq)
352                };
353                let io_result: std::io::Result<()> = (|| {
354                    let mut f = self.file.lock().expect("wal file poisoned");
355                    f.write_all(&batch)?;
356                    f.sync_data()
357                })();
358                g = self.state.lock().expect("wal state poisoned");
359                g.leader_active = false;
360                match io_result {
361                    Ok(()) => {
362                        g.flushed_seq = flush_to;
363                        g.written_len = g.written_len.saturating_add(batch.len() as u64);
364                    }
365                    Err(e) => {
366                        g.failed = Some(e.to_string());
367                    }
368                }
369                self.cond.notify_all();
370                //
371
372                // Loop continues: either our seq is now covered
373                // (leader path normally returns next iteration)
374                // or the error branch surfaces.
375                continue;
376            }
377            g = self.cond.wait(g).expect("wal condvar poisoned");
378        }
379    }
380
381    /// Drain the pending batch + flush synchronously. Caller must
382    /// guarantee no concurrent enqueues (checkpoint holds the
383    /// engine exclusively). Used before rotation so the marker
384    /// lands in the right chunk.
385    fn flush_now(&self) -> Result<(), EngineError> {
386        let mut g = self.state.lock().expect("wal state poisoned");
387        if let Some(e) = &g.failed {
388            return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
389                format!("WAL poisoned: {e}"),
390            )));
391        }
392        let batch = core::mem::take(&mut g.buf);
393        let flush_to = g.enqueued_seq;
394        if batch.is_empty() {
395            return Ok(());
396        }
397        drop(g);
398        let io: std::io::Result<()> = (|| {
399            let mut f = self.file.lock().expect("wal file poisoned");
400            f.write_all(&batch)?;
401            f.sync_data()
402        })();
403        let mut g = self.state.lock().expect("wal state poisoned");
404        match io {
405            Ok(()) => {
406                g.flushed_seq = flush_to;
407                g.written_len = g.written_len.saturating_add(batch.len() as u64);
408                self.cond.notify_all();
409                Ok(())
410            }
411            Err(e) => {
412                g.failed = Some(e.to_string());
413                self.cond.notify_all();
414                Err(io_err(e))
415            }
416        }
417    }
418
419    /// Swap the active chunk handle (rotation). Caller flushes
420    /// first; both locks taken in canonical order.
421    fn rotate_file(&self, new_file: File) {
422        let mut g = self.state.lock().expect("wal state poisoned");
423        let mut f = self.file.lock().expect("wal file poisoned");
424        *f = new_file;
425        g.written_len = 0;
426    }
427
428    fn written_len(&self) -> u64 {
429        let g = self.state.lock().expect("wal state poisoned");
430        g.written_len + g.buf.len() as u64
431    }
432}
433
434impl WalTicket {
435    /// Block until the record this ticket covers is durable.
436    ///
437    /// Under `SPG_SYNCHRONOUS_COMMIT=off` this returns
438    /// immediately — the background flusher (or the next
439    /// checkpoint / clean shutdown) makes the record durable
440    /// within `SPG_WAL_WRITER_DELAY_MS`. Same contract as PG's
441    /// `synchronous_commit = off`.
442    ///
443    /// # Errors
444    /// Surfaces the leader's IO error if the batch flush failed
445    /// (the WAL is then poisoned for all subsequent writes).
446    pub fn wait(&self) -> Result<(), EngineError> {
447        if !synchronous_commit_on() {
448            return Ok(());
449        }
450        self.group.wait_flushed(self.seq)
451    }
452}
453
454/// v7.19 P3 — retention sweep loop. Runs in a dedicated thread
455/// spawned by `Database::open_path` when `SPG_PITR_RETENTION_HOURS`
456/// is set to a non-zero value. Wakes every
457/// `SPG_PITR_RETENTION_CHECK_SEC` (default 60 s), enumerates chunks
458/// under `wal_dir`, archives via `SPG_PITR_ARCHIVE_CMD` if set, and
459/// deletes anything older than `retention_hours`.
460///
461/// Loud-failure posture matches PG's `archive_command`: if the
462/// archive command returns non-zero, the chunk stays on disk and
463/// a warning prints to stderr. The retention sweep doesn't delete
464/// a chunk it failed to archive.
465fn retention_sweep_loop(
466    wal_dir: PathBuf,
467    retention_hours: u64,
468    check_interval: std::time::Duration,
469    archive_cmd: Option<String>,
470    shutdown: Arc<AtomicBool>,
471) {
472    while !shutdown.load(Ordering::SeqCst) {
473        if let Err(e) = retention_sweep_once(&wal_dir, retention_hours, archive_cmd.as_deref()) {
474            eprintln!("spg-embedded: retention sweep error: {e}");
475        }
476        // Sleep in short ticks so shutdown isn't blocked on a
477        // 60 s naptime when Drop signals.
478        let mut elapsed = std::time::Duration::ZERO;
479        let tick = std::time::Duration::from_millis(250);
480        while elapsed < check_interval {
481            if shutdown.load(Ordering::SeqCst) {
482                return;
483            }
484            std::thread::sleep(tick);
485            elapsed += tick;
486        }
487    }
488}
489
490/// v7.19 P3 — one retention sweep pass over `wal_dir`. Extracted
491/// from the loop so tests can drive it directly. Public so the
492/// e2e_pitr_retention integration test (and any future operator
493/// tooling that wants synchronous retention) can call it.
494pub fn retention_sweep_once(
495    wal_dir: &Path,
496    retention_hours: u64,
497    archive_cmd: Option<&str>,
498) -> std::io::Result<()> {
499    if !wal_dir.exists() {
500        return Ok(());
501    }
502    let now_us = wall_clock_micros();
503    let cutoff_us = (now_us as i128 - (retention_hours as i128 * 3_600 * 1_000_000)) as i64;
504    let chunks = sorted_wal_chunks(wal_dir)?;
505    for chunk in chunks {
506        // Don't sweep the most-recent chunk; it's the live one
507        // execute() is appending to. Compare against the largest
508        // filename-prefix unix_us.
509        let stem = match chunk.file_stem().and_then(|s| s.to_str()) {
510            Some(s) => s,
511            None => continue,
512        };
513        let chunk_us: i64 = stem
514            .split_once('_')
515            .and_then(|(prefix, _)| i64::from_str_radix(prefix, 16).ok())
516            .unwrap_or(0);
517        if chunk_us >= cutoff_us {
518            continue;
519        }
520        // Archive first if requested.
521        if let Some(cmd) = archive_cmd {
522            if !cmd.is_empty() {
523                let output = std::process::Command::new("sh")
524                    .arg("-c")
525                    .arg(cmd)
526                    .arg("--")
527                    .arg(&chunk)
528                    .output()?;
529                if !output.status.success() {
530                    eprintln!(
531                        "spg-embedded: SPG_PITR_ARCHIVE_CMD failed for {} (exit {}); chunk stays on disk",
532                        chunk.display(),
533                        output.status.code().unwrap_or(-1)
534                    );
535                    continue;
536                }
537            }
538        }
539        // Delete the chunk + its sibling .checksum if present.
540        if let Err(e) = std::fs::remove_file(&chunk) {
541            eprintln!(
542                "spg-embedded: retention remove {} failed: {e}",
543                chunk.display()
544            );
545            continue;
546        }
547        let mut cs = chunk.clone();
548        let mut name = cs.file_name().map(|n| n.to_os_string()).unwrap_or_default();
549        name.push(".checksum");
550        cs.set_file_name(name);
551        let _ = std::fs::remove_file(&cs);
552    }
553    Ok(())
554}
555
/// v7.20 — group-commit delay window in µs (analogue of PG's
/// `commit_delay`). The flush leader sleeps this long before taking
/// the batch so concurrent writers can pile in.
///
/// Read once from `SPG_COMMIT_DELAY_US` and cached for the life of
/// the process. Unset or unparsable means 150; `0` disables the
/// delay.
fn commit_delay_us() -> u64 {
    static DELAY_US: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *DELAY_US.get_or_init(|| match std::env::var("SPG_COMMIT_DELAY_US") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(150),
        Err(_) => 150,
    })
}
569
/// v7.20 — analogue of PG's `synchronous_commit`.
///
/// `true` (the default): `execute()` blocks until its WAL record is
/// fsynced, giving zero-loss durability. `false`: `execute()`
/// returns after the in-memory mutation and WAL enqueue. A
/// background flusher then writes and fsyncs every
/// `SPG_WAL_WRITER_DELAY_MS` (default 200 ms, PG's
/// `wal_writer_delay`). A crash can lose up to one flush interval
/// of confirmed commits — the same trade PG documents for this
/// setting. Clean shutdown (Drop / checkpoint) always flushes.
///
/// Read once from `SPG_SYNCHRONOUS_COMMIT` and cached. Only `off`,
/// `false` (case-insensitive) or `0` turn it off.
fn synchronous_commit_on() -> bool {
    static SYNC_ON: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *SYNC_ON.get_or_init(|| {
        let disabled = match std::env::var("SPG_SYNCHRONOUS_COMMIT") {
            Ok(v) => {
                v.eq_ignore_ascii_case("off") || v == "0" || v.eq_ignore_ascii_case("false")
            }
            Err(_) => false,
        };
        !disabled
    })
}
587
/// v7.20 — cadence, in milliseconds, of the background WAL flusher
/// used under `SPG_SYNCHRONOUS_COMMIT=off` (analogue of PG's
/// `wal_writer_delay`).
///
/// Read once from `SPG_WAL_WRITER_DELAY_MS` and cached. Unset,
/// unparsable or zero means 200.
fn wal_writer_delay_ms() -> u64 {
    static DELAY_MS: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *DELAY_MS.get_or_init(|| {
        let parsed = std::env::var("SPG_WAL_WRITER_DELAY_MS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok());
        match parsed {
            Some(ms) if ms > 0 => ms,
            _ => 200,
        }
    })
}
600
/// v7.19 P3 — PITR WAL retention window, in hours, from
/// `SPG_PITR_RETENTION_HOURS`. Read fresh on every call (not
/// cached). Unset or unparsable values yield `0`. Per
/// `retention_sweep_loop`'s docs, the sweep thread only runs for a
/// non-zero value.
fn pitr_retention_hours() -> u64 {
    let Ok(raw) = std::env::var("SPG_PITR_RETENTION_HOURS") else {
        return 0;
    };
    raw.parse::<u64>().unwrap_or(0)
}
607
/// v7.19 P3 — seconds between retention sweep passes, from
/// `SPG_PITR_RETENTION_CHECK_SEC`. Read fresh on every call.
/// Unset, unparsable or zero means 60.
fn pitr_retention_check_sec() -> u64 {
    let parsed = std::env::var("SPG_PITR_RETENTION_CHECK_SEC")
        .ok()
        .and_then(|s| s.parse::<u64>().ok());
    match parsed {
        Some(secs) if secs > 0 => secs,
        _ => 60,
    }
}
615
/// v7.19 P3 — shell command the retention sweep runs to archive a
/// chunk before deleting it, from `SPG_PITR_ARCHIVE_CMD`. Read fresh
/// on every call. Unset or empty means `None` (no archiving).
fn pitr_archive_cmd() -> Option<String> {
    match std::env::var("SPG_PITR_ARCHIVE_CMD") {
        Ok(cmd) if !cmd.is_empty() => Some(cmd),
        _ => None,
    }
}
621
622/// v7.19 — replay every record from `wal_bytes` whose
623/// `commit_lsn` is strictly greater than `floor_lsn`. v3 records
624/// (no LSN) and v4 records with `commit_lsn <= floor_lsn` are
625/// skipped — the snapshot loaded ahead of this call already
626/// reflects them, and re-applying would DuplicateTable /
627/// double-insert. v3 records inside the legacy migration chunk
628/// always apply because the migration sets `floor_lsn = 0` and
629/// v3 records carry no LSN to compare; the pre-migration
630/// behaviour (every record replays) is what the migration
631/// preserves.
632///
633/// Returns the count of records successfully applied. Same
634/// torn-tail semantics as `replay_wal_into_engine`.
635fn replay_wal_filtered(
636    wal_bytes: &[u8],
637    engine: &mut Engine,
638    floor_lsn: u64,
639) -> Result<usize, String> {
640    let records = parse_wal_records(wal_bytes)?;
641    let mut applied = 0usize;
642    for r in &records {
643        // Skip markers + non-SQL records.
644        if r.type_byte == WAL_V3_TYPE_DURABILITY_CHECKPOINT
645            || r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER
646        {
647            continue;
648        }
649        // v4 SQL records carry an LSN. Apply iff strictly above
650        // the snapshot floor.
651        if r.type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL || r.type_byte == WAL_V4_TYPE_TX_COMMIT_SQL {
652            if let Some(lsn) = r.commit_lsn {
653                if lsn <= floor_lsn {
654                    continue;
655                }
656            }
657        }
658        // v3 records (type 0x01, no LSN) always apply — the
659        // legacy migration path is the only place they appear,
660        // and floor_lsn=0 there.
661        let sql = match std::str::from_utf8(r.sql) {
662            Ok(s) => s,
663            Err(e) => return Err(format!("non-UTF-8 SQL at offset {}: {e}", r.offset)),
664        };
665        // v7.21 — a tx-commit record carries the whole transaction
666        // as a `";\n"`-joined script; auto-commit records are a
667        // single statement, for which split_statements is a no-op.
668        for stmt in split_statements(sql) {
669            engine.execute(stmt).map_err(|e| {
670                format!(
671                    "WAL replay: apply {stmt:?} at offset {} rejected: {e:?}",
672                    r.offset
673                )
674            })?;
675        }
676        applied += 1;
677    }
678    Ok(applied)
679}
680
/// v7.19 — WAL chunk filename: `<unix_us>_<leading_lsn>.wal`, both
/// parts as zero-padded 16-digit lowercase hex. A plain
/// lexicographic sort therefore matches numeric order, and putting
/// the timestamp first keeps directory listings chronological.
///
/// A negative `unix_us` (pre-1970 clock) is clamped to 0 so the
/// fixed-width prefix still sorts correctly.
fn chunk_filename(unix_us: i64, leading_lsn: u64) -> String {
    let prefix_us = u64::try_from(unix_us).unwrap_or(0);
    format!("{:016x}_{:016x}.wal", prefix_us, leading_lsn)
}
692
693/// v7.19 — filename used for the legacy single-file WAL when
694/// `open_path` migrates a v7.18-layout database into the new
695/// chunk directory. Lexicographically smallest possible value
696/// so subsequent chunks sort after it.
697fn legacy_chunk_filename() -> String {
698    chunk_filename(0, 0)
699}
700
/// v7.19 — every `*.wal` file directly inside `wal_dir`, sorted by
/// path. Given the zero-padded chunk naming this is also creation
/// order.
///
/// A missing `wal_dir` yields an empty list rather than an error.
///
/// # Errors
/// Any other failure to read the directory or one of its entries.
fn sorted_wal_chunks(wal_dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let entries = match std::fs::read_dir(wal_dir) {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        other => other?,
    };
    let mut chunks = entries
        .map(|entry| entry.map(|e| e.path()))
        .collect::<std::io::Result<Vec<PathBuf>>>()?;
    chunks.retain(|p| p.extension().and_then(|ext| ext.to_str()) == Some("wal"));
    chunks.sort();
    Ok(chunks)
}
721
722/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
723///
724/// ```text
725/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
726/// [u32 LE crc32 over (type_byte || payload)]
727/// [u8  type = 0x11]
728/// payload:
729///   [u64 LE checkpoint_lsn]
730///   [i64 LE checkpoint_unix_us  (WAL_V4_NO_CLOCK if no clock)]
731///   [u16 LE snapshot_path_len]
732///   [snapshot_path_bytes]
733/// ```
734///
735/// `payload_len` covers only the payload — keeping the framing
736/// uniform across v3 / v4 record types so torn-write detection in
737/// `replay_wal_into_engine` stays trivial.
738fn encode_v4_checkpoint_marker(
739    checkpoint_lsn: u64,
740    checkpoint_unix_us: i64,
741    snapshot_path: &Path,
742) -> Vec<u8> {
743    let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
744    let snap_payload = snapshot_bytes.as_bytes();
745    let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
746    let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
747    payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
748    payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
749    payload.extend_from_slice(&snap_len_u16.to_le_bytes());
750    payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
751    let mut crc_buf = Vec::with_capacity(1 + payload.len());
752    crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
753    crc_buf.extend_from_slice(&payload);
754    let crc = spg_crypto::crc32::crc32(&crc_buf);
755    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
756    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
757    out.extend_from_slice(&header);
758    out.extend_from_slice(&crc.to_le_bytes());
759    out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
760    out.extend_from_slice(&payload);
761    out
762}
763
764/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
765///
766/// ```text
767/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
768/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
769/// [u8  type = 0x10]
770/// [u64 LE commit_lsn]
771/// [i64 LE commit_unix_us  (= WAL_V4_NO_CLOCK when no ClockFn)]
772/// [sql bytes]
773/// ```
774///
775/// `sql_len` field stays the SQL byte count — same shape as v3 — so
776/// replay-buffer torn-write detection compares against
777/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
778/// readable by the same loop with their original 9-byte header
779/// arithmetic.
780fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
781    encode_v4_sql_record(WAL_V4_TYPE_AUTO_COMMIT_SQL, sql, commit_lsn, commit_unix_us)
782}
783
784/// v7.21 — same envelope, `WAL_V4_TYPE_TX_COMMIT_SQL` type byte.
785/// `script` = the transaction's statements joined with `";\n"`.
786fn encode_v4_tx_commit(script: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
787    encode_v4_sql_record(
788        WAL_V4_TYPE_TX_COMMIT_SQL,
789        script,
790        commit_lsn,
791        commit_unix_us,
792    )
793}
794
795fn encode_v4_sql_record(type_byte: u8, sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
796    let payload = sql.as_bytes();
797    let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
798    crc_buf.push(type_byte);
799    crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
800    crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
801    crc_buf.extend_from_slice(payload);
802    let crc = spg_crypto::crc32::crc32(&crc_buf);
803    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
804    let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
805    out.extend_from_slice(&header);
806    out.extend_from_slice(&crc.to_le_bytes());
807    out.push(type_byte);
808    out.extend_from_slice(&commit_lsn.to_le_bytes());
809    out.extend_from_slice(&commit_unix_us.to_le_bytes());
810    out.extend_from_slice(payload);
811    out
812}
813
/// v7.1 — decode every record in `wal_bytes` and apply its SQL to
/// `engine`, in buffer order.
///
/// Each record starts with a little-endian `u32` holding the payload
/// length OR-ed with framing flag bits:
///
/// - no `WAL_V2_SENTINEL` → v1, 4-byte header, payload = SQL;
/// - `WAL_V2_SENTINEL` only → v2, 8-byte header, payload = SQL;
/// - `WAL_V2_SENTINEL | WAL_V3_FLAG` → v3/v4, 9-byte header whose last
///   byte (offset 8) is the record type.
///
/// v3/v4 type handling:
///
/// - `WAL_V3_TYPE_AUTO_COMMIT_SQL` applies the payload as one statement.
/// - `WAL_V3_TYPE_DURABILITY_CHECKPOINT` and `WAL_V4_TYPE_CHECKPOINT_MARKER`
///   are skipped.
/// - `WAL_V4_TYPE_AUTO_COMMIT_SQL` / `WAL_V4_TYPE_TX_COMMIT_SQL` skip
///   `WAL_V4_EXTRA_HEADER` bytes (commit LSN + timestamp) and execute
///   each statement yielded by `split_statements`.
///
/// Returns the number of records applied. Skipped markers are not
/// counted, and a multi-statement tx-commit record counts once. A
/// record whose declared length runs past the end of the buffer (a
/// torn mid-write tail) ends the scan silently, the same recovery
/// story `spg-server`'s boot path uses.
///
/// # Errors
///
/// Returns `Err` (the message includes the record's byte offset) when:
///
/// - a payload is not valid UTF-8;
/// - `engine.execute` rejects a statement (this function does not undo
///   statements it already applied);
/// - a v3/v4 envelope carries an unknown type byte.
///
/// NOTE(review): record CRCs are not verified here. Only the length
/// prefix is used to detect a torn tail.
fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
    let mut applied = 0usize;
    let mut cur = 0usize;
    while cur < wal_bytes.len() {
        if wal_bytes.len() - cur < 4 {
            // Trailing partial header — torn write, drop and stop.
            break;
        }
        // Length word = payload length OR-ed with framing flags. The
        // `unwrap` cannot fail: the slice is exactly 4 bytes (checked above).
        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
        let len_mask = if is_v3 {
            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
        } else {
            !WAL_V2_SENTINEL
        };
        let rec_len = (raw_len & len_mask) as usize;
        // v3/v4: 4 len + 4 crc + 1 type; v2: 8 bytes; v1: length only.
        let header_len = if is_v3 {
            9
        } else if is_v2 {
            8
        } else {
            4
        };
        if wal_bytes.len() - cur < header_len + rec_len {
            // Torn record at the tail — drop, stop.
            break;
        }
        if is_v3 {
            let type_byte = wal_bytes[cur + 8];
            match type_byte {
                // Same shape as v1/v2 — handled by the shared SQL path below.
                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
                WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
                    // durability_checkpoint marker — skip, no SQL.
                    cur += header_len + rec_len;
                    continue;
                }
                WAL_V4_TYPE_CHECKPOINT_MARKER => {
                    // v7.18 PITR — checkpoint anchor, skip on replay
                    // (engine state past this point reflects the
                    // matching snapshot already loaded by the caller).
                    cur += header_len + rec_len;
                    continue;
                }
                WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL => {
                    // v7.18 PITR — v4 record carries 16 bytes of
                    // (commit_lsn, commit_unix_us) between the type
                    // byte and the SQL payload. Replay reads them but
                    // does not enforce them — the engine doesn't
                    // surface LSN/clock here. Restore tooling
                    // (spgctl) parses them via parse_wal_record below.
                    //
                    // v7.21 — tx-commit records (0x12) carry a whole
                    // transaction as a `";\n"`-joined script;
                    // split_statements is a no-op on the single-
                    // statement auto-commit form.
                    //
                    // `rec_len` counts only the SQL, so the full record
                    // size adds the extra LSN/timestamp header.
                    let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
                    if wal_bytes.len() - cur < v4_total {
                        // Torn v4 record at the tail — drop, stop.
                        break;
                    }
                    let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
                    let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
                    let sql = std::str::from_utf8(sql_bytes)
                        .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
                    for stmt in split_statements(sql) {
                        engine.execute(stmt).map_err(|e| {
                            format!("WAL replay: apply {stmt:?} at offset {cur} rejected: {e:?}")
                        })?;
                    }
                    applied += 1;
                    cur += v4_total;
                    continue;
                }
                other => {
                    return Err(format!(
                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
                    ));
                }
            }
        }
        // v1 / v2 / v3 auto-commit: payload directly after the header
        // is a single SQL statement.
        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
        let sql = std::str::from_utf8(sql_bytes)
            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
        engine
            .execute(sql)
            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
        applied += 1;
        cur += header_len + rec_len;
    }
    Ok(applied)
}
910
/// v7.18 PITR — one parsed WAL record, surfaced for restore / verify
/// tooling. The replay loop doesn't expose LSN/timestamp, but
/// `spgctl restore --to <timestamp>` and `spgctl verify` need them.
#[derive(Debug, Clone)]
pub struct WalRecord<'a> {
    /// Byte offset in the WAL buffer where this record's header starts.
    pub offset: usize,
    /// Record type byte:
    ///
    /// - `0x01` — v3 auto-commit SQL. Also reported for legacy v1 / v2
    ///   records, which carry no type byte on disk.
    /// - `0x02` — durability checkpoint marker.
    /// - `0x10` — v4 auto-commit SQL.
    /// - `0x11` — v4 checkpoint marker (PITR anchor).
    /// - `0x12` — v4 transaction commit: the transaction's statements
    ///   joined with `";\n"`.
    pub type_byte: u8,
    /// `Some(lsn)` for v4 records: the commit LSN for `0x10` / `0x12`,
    /// the checkpoint LSN for `0x11`. `None` for v1 / v2 / v3 records.
    pub commit_lsn: Option<u64>,
    /// `Some(unix_us)` for v4 records carrying a clock-set timestamp.
    /// `None` for v1 / v2 / v3 records, or for v4 records written with
    /// `WAL_V4_NO_CLOCK` (the sentinel for "no ClockFn at commit time").
    pub commit_unix_us: Option<i64>,
    /// Record payload as borrowed bytes:
    ///
    /// - the SQL text for SQL records;
    /// - the snapshot path for `0x11` checkpoint markers;
    /// - empty for `0x02` durability markers.
    pub sql: &'a [u8],
}
931
/// v7.18 PITR — walk `wal_bytes` and return one [`WalRecord`] per
/// intact record, in buffer order.
///
/// Framing detection matches `replay_wal_into_engine`: v1 / v2 / v3 / v4
/// are selected by flag bits in the leading `u32`. Legacy v1 / v2
/// records are reported with `type_byte = WAL_V3_TYPE_AUTO_COMMIT_SQL`
/// and no LSN/timestamp. For checkpoint markers, the snapshot path is
/// returned in `sql`.
///
/// A record whose declared length runs past the end of the buffer (a
/// torn tail) ends iteration silently, and the records parsed so far
/// are returned. This is the same recovery story as
/// `replay_wal_into_engine`.
///
/// # Errors
///
/// - An unknown type byte inside a v3 envelope, so the caller knows the
///   WAL was likely written by a newer SPG.
/// - A `WAL_V4_TYPE_CHECKPOINT_MARKER` whose payload is shorter than its
///   fixed 18-byte prefix (lsn + ts + path_len), or shorter than the
///   path length it declares.
pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
    let mut out = Vec::new();
    let mut cur = 0usize;
    while cur < wal_bytes.len() {
        // Fewer than 4 bytes left: partial length word, torn tail.
        if wal_bytes.len() - cur < 4 {
            break;
        }
        // Length word = payload length OR-ed with framing flags. The
        // `unwrap` cannot fail: the slice is exactly 4 bytes.
        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
        let len_mask = if is_v3 {
            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
        } else {
            !WAL_V2_SENTINEL
        };
        let rec_len = (raw_len & len_mask) as usize;
        // v3/v4: 4 len + 4 crc + 1 type; v2: 8 bytes; v1: length only.
        let header_len = if is_v3 {
            9
        } else if is_v2 {
            8
        } else {
            4
        };
        // Declared record runs past the buffer end: torn tail, stop.
        if wal_bytes.len() - cur < header_len + rec_len {
            break;
        }
        if !is_v3 {
            // v1 / v2 records carry no type byte; treat as legacy
            // auto-commit SQL with no LSN/time.
            let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
            out.push(WalRecord {
                offset: cur,
                type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
                commit_lsn: None,
                commit_unix_us: None,
                sql,
            });
            cur += header_len + rec_len;
            continue;
        }
        let type_byte = wal_bytes[cur + 8];
        match type_byte {
            WAL_V3_TYPE_AUTO_COMMIT_SQL => {
                let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
                out.push(WalRecord {
                    offset: cur,
                    type_byte,
                    commit_lsn: None,
                    commit_unix_us: None,
                    sql,
                });
                cur += header_len + rec_len;
            }
            WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
                out.push(WalRecord {
                    offset: cur,
                    type_byte,
                    commit_lsn: None,
                    commit_unix_us: None,
                    sql: &[],
                });
                cur += header_len + rec_len;
            }
            WAL_V4_TYPE_CHECKPOINT_MARKER => {
                // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
                // We surface lsn + ts on the WalRecord; the path lives
                // in `sql` since the type byte already disambiguates
                // record meaning and adding a dedicated field would
                // bloat the iterator return type for every variant.
                if rec_len < 18 {
                    return Err(format!(
                        "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
                    ));
                }
                let lsn = u64::from_le_bytes(
                    wal_bytes[cur + header_len..cur + header_len + 8]
                        .try_into()
                        .unwrap(),
                );
                let ts_raw = i64::from_le_bytes(
                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
                        .try_into()
                        .unwrap(),
                );
                let path_len = u16::from_le_bytes(
                    wal_bytes[cur + header_len + 16..cur + header_len + 18]
                        .try_into()
                        .unwrap(),
                ) as usize;
                if rec_len < 18 + path_len {
                    return Err(format!(
                        "WAL parse: checkpoint marker at offset {cur} truncated path"
                    ));
                }
                let path_start = cur + header_len + 18;
                let path_bytes = &wal_bytes[path_start..path_start + path_len];
                // WAL_V4_NO_CLOCK marks "no clock at write time" → None.
                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
                    None
                } else {
                    Some(ts_raw)
                };
                out.push(WalRecord {
                    offset: cur,
                    type_byte,
                    commit_lsn: Some(lsn),
                    commit_unix_us,
                    sql: path_bytes,
                });
                cur += header_len + rec_len;
            }
            WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL => {
                // `rec_len` counts only the SQL; the LSN + timestamp
                // (WAL_V4_EXTRA_HEADER bytes) sit between the type byte
                // and the SQL, so the full record is longer.
                let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
                if wal_bytes.len() - cur < v4_total {
                    break;
                }
                let lsn = u64::from_le_bytes(
                    wal_bytes[cur + header_len..cur + header_len + 8]
                        .try_into()
                        .unwrap(),
                );
                let ts_raw = i64::from_le_bytes(
                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
                        .try_into()
                        .unwrap(),
                );
                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
                    None
                } else {
                    Some(ts_raw)
                };
                let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
                let sql = &wal_bytes[sql_start..sql_start + rec_len];
                out.push(WalRecord {
                    offset: cur,
                    type_byte,
                    commit_lsn: Some(lsn),
                    commit_unix_us,
                    sql,
                });
                cur += v4_total;
            }
            other => {
                return Err(format!(
                    "WAL parse: unknown type byte {other:#04x} at offset {cur}"
                ));
            }
        }
    }
    Ok(out)
}
1087
/// v7.1 — predicate for "should the next `execute()` mutate the
/// WAL?" Returns `true` (no WAL record needed) in these cases:
///
/// - the leading keyword is SELECT / SHOW / BEGIN / COMMIT / ROLLBACK;
/// - the leading keyword is one of the SPG-specific verbs that don't go
///   through the auto-commit record path on the server (CHECKPOINT,
///   COMPACT, WAIT);
/// - the statement is a WITH / EXPLAIN that contains no DML verb.
///
/// Conservative: anything not positively known to be read-only falls
/// through to `false` ("write a WAL record"). Both a data-modifying CTE
/// (`WITH … INSERT/UPDATE/DELETE/MERGE …`) and `EXPLAIN ANALYZE <dml>`
/// mutate state and must be logged. WITH / EXPLAIN therefore count as
/// read-only only when no INSERT / UPDATE / DELETE / MERGE word appears
/// anywhere in the statement. A false positive (e.g. the word `delete`
/// inside a string literal) only costs an extra WAL record for a
/// read-only statement.
fn sql_is_read_only(sql: &str) -> bool {
    /// True when any identifier-like word in `sql` is a DML verb.
    fn mentions_dml_verb(sql: &str) -> bool {
        sql.split(|c: char| !(c.is_ascii_alphanumeric() || c == '_'))
            .any(|word| {
                ["insert", "update", "delete", "merge"]
                    .iter()
                    .any(|verb| word.eq_ignore_ascii_case(verb))
            })
    }

    let t = sql.trim_start();
    let head = t
        .split(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .next()
        .unwrap_or("")
        .to_ascii_lowercase();
    match head.as_str() {
        "select" | "show" | "begin" | "commit" | "rollback" | "checkpoint" | "compact" | "wait" => {
            true
        }
        "with" | "explain" => !mentions_dml_verb(t),
        _ => false,
    }
}
1114
1115/// Embedded SPG database handle. Owns an `Engine` + provides
1116/// ergonomic wrappers around `execute` and `query`. Drops the
1117/// engine on `Drop` — no WAL flush / fsync, because v6.10.3
1118/// is in-memory only.
1119#[derive(Debug)]
1120pub struct Database {
1121    engine: Engine,
1122    /// v7.1 — persistence sidecar. When `Some(p)`, every
1123    /// `execute(sql)` that mutates state appends a v4
1124    /// `auto_commit_sql` WAL record + fsyncs before the call
1125    /// returns; `Drop` writes a final catalog snapshot to
1126    /// `<db_path>` so the next session boots from a clean
1127    /// snapshot + an empty WAL. `None` = in-memory only (the
1128    /// v6.10.3 shape).
1129    persistence: Option<PersistenceCtx>,
1130    /// v7.18 PITR — monotonic per-database commit LSN. Increments
1131    /// before each successful WAL append; bootstrapped at
1132    /// open_path from `max(parse_wal_records → commit_lsn)` so
1133    /// reopen never reuses an LSN. In-memory databases start at
1134    /// 0 and never advance (no WAL = no LSN-meaningful records).
1135    commit_lsn: AtomicU64,
1136    /// v7.21 (round-12 polish) — explicit-transaction WAL buffer.
1137    /// `Some` between an engine-accepted BEGIN and its
1138    /// COMMIT / ROLLBACK on a persistent database. In-transaction
1139    /// mutations only touch the engine's shadow catalog and report
1140    /// `modified_catalog: false`, so the per-statement auto-commit
1141    /// append never fires for them; their bind-final SQL collects
1142    /// here instead and COMMIT flushes the lot as ONE atomic
1143    /// `WAL_V4_TYPE_TX_COMMIT_SQL` record (ROLLBACK just drops it).
1144    /// Always `None` for in-memory databases.
1145    tx_wal: Option<TxWalBuffer>,
1146}
1147
/// v7.21 — WAL buffer for one open explicit transaction on a
/// persistent database. It is stored in [`Database::tx_wal`], whose
/// docs describe the lifecycle: filled between BEGIN and COMMIT,
/// flushed as one tx-commit record, dropped on ROLLBACK.
#[derive(Debug, Default)]
struct TxWalBuffer {
    /// Bind-final SQL of every non-read-only statement the engine
    /// accepted inside the open transaction, in execution order.
    statements: Vec<String>,
    /// `(savepoint_name, statements.len() at SAVEPOINT time)` —
    /// `ROLLBACK TO SAVEPOINT` truncates `statements` back to the
    /// recorded mark so the WAL record matches what the engine
    /// keeps. PG name-reuse semantics (latest wins).
    savepoints: Vec<(String, usize)>,
}
1160
/// Statement-level transaction-control classification for the WAL
/// buffer. Runs AFTER the engine accepted the statement, so the
/// engine stays the single validator — this only mirrors state.
enum TxControl {
    /// `BEGIN` / `START TRANSACTION`.
    Begin,
    /// `COMMIT` / `END`.
    Commit,
    /// `ROLLBACK` / `ABORT` — abandons the whole transaction.
    Rollback,
    /// `ROLLBACK [WORK | TRANSACTION] TO [SAVEPOINT] <name>`; the name
    /// is lowercased.
    RollbackToSavepoint(String),
    /// `SAVEPOINT <name>`; the name is lowercased.
    Savepoint(String),
    /// `RELEASE [SAVEPOINT] <name>`; the name is not captured.
    ReleaseSavepoint,
}

/// Classify `sql` as a transaction-control statement, or return `None`
/// when it is not one. Words are split on whitespace and `;`, and are
/// compared (and savepoint names returned) lowercased.
///
/// Two PostgreSQL spellings are recognized:
///
/// - The optional noise word in `ROLLBACK [WORK | TRANSACTION] TO …`.
///   Without it, a savepoint rollback would be mistaken for a full
///   rollback, dropping the whole WAL buffer while the engine keeps
///   the pre-savepoint work.
/// - `ABORT`, PG's synonym for ROLLBACK.
///
/// This only runs after the engine accepted the statement, so
/// recognizing extra spellings cannot misclassify a statement the
/// engine rejected.
fn tx_control_kind(sql: &str) -> Option<TxControl> {
    let mut words = sql
        .split(|c: char| c.is_whitespace() || c == ';')
        .filter(|w| !w.is_empty())
        .map(str::to_ascii_lowercase)
        .peekable();
    let head = words.next()?;
    match head.as_str() {
        "begin" | "start" => Some(TxControl::Begin),
        "commit" | "end" => Some(TxControl::Commit),
        "abort" => Some(TxControl::Rollback),
        "savepoint" => words.next().map(TxControl::Savepoint),
        "release" => Some(TxControl::ReleaseSavepoint),
        "rollback" => {
            // Optional noise word: ROLLBACK WORK / ROLLBACK TRANSACTION.
            let _ = words.next_if(|w| w == "work" || w == "transaction");
            match words.next().as_deref() {
                // ROLLBACK [WORK | TRANSACTION] TO [SAVEPOINT] <name>
                Some("to") => {
                    let next = words.next()?;
                    let name = if next == "savepoint" {
                        words.next()?
                    } else {
                        next
                    };
                    Some(TxControl::RollbackToSavepoint(name))
                }
                _ => Some(TxControl::Rollback),
            }
        }
        _ => None,
    }
}
1200
/// On-disk state owned by a persistent [`Database`]; the
/// `persistence` field is `None` for in-memory databases. It holds:
///
/// - the derived file paths;
/// - the shared WAL writer;
/// - background-thread handles;
/// - the cross-process lock directory.
#[derive(Debug)]
#[allow(dead_code)] // `wal_dir`/`current_chunk_path` are read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Catalog snapshot file passed to `Database::open_path`. It is read
    /// at boot to restore the engine. The sibling `.wal` directory, the
    /// `.lock` directory and the `<stem>.spg/` cold-segment tree are all
    /// derived from it.
    db_path: PathBuf,
    /// v7.19 — WAL chunk directory at `<db_path>.wal/`.
    /// Replaces the v7.18 single-file `<db_path>.wal` layout.
    /// Each chunk file inside is named
    /// `<unix_us>_<leading_lsn>.wal` (zero-padded to 16 digits
    /// so default-lex sort = LSN order).
    wal_dir: PathBuf,
    /// Path of the currently-open chunk file inside `wal_dir`.
    /// Rotated at checkpoint and whenever the chunk crosses
    /// `checkpoint_threshold_bytes`.
    current_chunk_path: PathBuf,
    /// v7.19 P3 — retention sweeper handle. `Some` when
    /// `SPG_PITR_RETENTION_HOURS > 0` at open_path time; `None`
    /// when retention is disabled (the default; v7.18 behaviour
    /// preserved). The thread polls `wal_dir` every
    /// `SPG_PITR_RETENTION_CHECK_SEC` seconds, archives via
    /// `SPG_PITR_ARCHIVE_CMD` if set, then deletes chunks older
    /// than the retention window. Signalled to exit via
    /// `retention_shutdown` on Drop.
    retention_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the retention sweeper thread described above,
    /// which is signalled through `retention_shutdown`.
    retention_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 — background WAL flusher for
    /// `SPG_SYNCHRONOUS_COMMIT=off`. `None` in the default
    /// synchronous mode. Flushes the pending batch every
    /// `SPG_WAL_WRITER_DELAY_MS`; signalled + joined on Drop
    /// before the final checkpoint so clean shutdown never
    /// loses confirmed commits.
    flusher_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the background WAL flusher described above,
    /// which is signalled through `flusher_shutdown`.
    flusher_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 P2 — group-commit WAL. Shared with WalTickets
    /// returned by the buffered write path so `wait()` can run
    /// after the engine write lock is released.
    wal: Arc<WalGroup>,
    /// WAL size, in bytes, at which the current chunk is rotated (see
    /// `current_chunk_path`). The crate-level docs describe a 4 MiB
    /// default auto-checkpoint threshold.
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — `<db_path>.spg/segments/` directory. Cold-tier
    /// segments produced by `freeze_oldest_to_cold` / compaction
    /// are persisted here as `seg_<id>.spg` files; the manifest
    /// at `<db_path>.spg/manifest.v10` records every active
    /// segment + its CRC32 so the next boot can verify + reload.
    cold_segments_dir: PathBuf,
    /// Cold segments loaded from the manifest at boot, keyed by
    /// segment id. Each maps to the file path recorded in its manifest
    /// entry.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
    /// via `fs::create_dir` on `<db_path>.lock` at open_path
    /// entry; released on Drop by `fs::remove_dir`. atomic on
    /// every supported platform. A second process opening the
    /// same path while the first is still alive hits the
    /// create_dir failure and returns
    /// `EngineError::Unsupported("database is locked by another
    /// process: …")`. Stale locks (process crashed mid-session)
    /// must be cleared via `Database::force_unlock(path)` —
    /// SPG can't safely fingerprint who owned a stale directory
    /// without a libc dep, which would violate spg-embedded's
    /// zero-deps charter.
    lock_path: PathBuf,
}
1259
1260impl Database {
1261    /// Open a fresh in-memory database. No WAL, no catalog
1262    /// snapshot on disk — perfect for tests + short-lived
1263    /// CLI tools.
1264    #[must_use]
1265    pub fn open_in_memory() -> Self {
1266        Self {
1267            engine: Engine::new().with_clock(wall_clock_micros),
1268            persistence: None,
1269            commit_lsn: AtomicU64::new(0),
1270            tx_wal: None,
1271        }
1272    }
1273
1274    /// v7.1 — Open or create a persistent database backed by
1275    /// the file at `db_path`. The WAL lives at `db_path` +
1276    /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
1277    /// path:
1278    ///
1279    /// 1. If `db_path` exists, restore the catalog snapshot.
1280    /// 2. If the WAL exists, replay every record into the
1281    ///    restored engine — the same recovery story
1282    ///    `spg-server` uses.
1283    /// 3. Open the WAL in append+sync mode so subsequent
1284    ///    `execute()` writes durably commit (one fsync per
1285    ///    mutation).
1286    ///
1287    /// `Drop` writes a final catalog snapshot + truncates the
1288    /// WAL — operators that need a sync barrier at a specific
1289    /// point use `checkpoint()` explicitly.
1290    pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
1291        let db_path = db_path.as_ref().to_path_buf();
1292        // v7.19 — WAL is a directory of chunk files. Legacy
1293        // single-file path stays variable-named `wal_path` for
1294        // the backward-compat migration block below.
1295        let wal_path = {
1296            let mut p = db_path.clone();
1297            let name = p
1298                .file_name()
1299                .map(|n| {
1300                    let mut s = n.to_os_string();
1301                    s.push(".wal");
1302                    s
1303                })
1304                .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
1305            p.set_file_name(name);
1306            p
1307        };
1308        let wal_dir = wal_path.clone();
1309        if let Some(parent) = db_path.parent()
1310            && !parent.as_os_str().is_empty()
1311        {
1312            std::fs::create_dir_all(parent).map_err(io_err)?;
1313        }
1314        // v7.17.0 Phase 6.2 — acquire cross-process exclusion
1315        // lock before touching any catalog / WAL bytes. atomic
1316        // mkdir on every supported platform; a second process
1317        // opening the same path while the first is still alive
1318        // hits the create_dir failure and gets a clear error.
1319        let lock_path = {
1320            let mut p = db_path.clone();
1321            let name = p
1322                .file_name()
1323                .map(|n| {
1324                    let mut s = n.to_os_string();
1325                    s.push(".lock");
1326                    s
1327                })
1328                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1329            p.set_file_name(name);
1330            p
1331        };
1332        acquire_path_lock(&lock_path)?;
1333        let mut engine = if db_path.exists() {
1334            let bytes = std::fs::read(&db_path).map_err(io_err)?;
1335            let engine = Engine::restore_envelope(&bytes).map_err(|e| {
1336                EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1337                    "restore from {}: {e}",
1338                    db_path.display()
1339                )))
1340            })?;
1341            engine.with_clock(wall_clock_micros)
1342        } else {
1343            Engine::new().with_clock(wall_clock_micros)
1344        };
1345        // v7.1.4 — manifest-driven cold-segment reload. The
1346        // manifest sidecar pairs the catalog snapshot CRC with a
1347        // list of `(segment_id, path, crc32)` triples; verify
1348        // before loading so a torn or stale manifest doesn't
1349        // surface phantom data.
1350        let cold_segments_dir = {
1351            let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
1352            let stem = db_path
1353                .file_stem()
1354                .unwrap_or_else(|| std::ffi::OsStr::new("db"))
1355                .to_string_lossy()
1356                .into_owned();
1357            parent.join(format!("{stem}.spg")).join("segments")
1358        };
1359        let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
1360        let manifest_pth = spg_manifest_path(&db_path);
1361        if manifest_pth.exists() && db_path.exists() {
1362            let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
1363            if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
1364                let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
1365                let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
1366                if snap_crc == m.catalog_crc32 {
1367                    for entry in &m.cold_segments {
1368                        if let Ok(seg_bytes) = std::fs::read(&entry.path) {
1369                            let computed = spg_crypto::crc32::crc32(&seg_bytes);
1370                            if computed != entry.crc32 {
1371                                eprintln!(
1372                                    "spg-embedded: manifest skip segment {}: CRC mismatch",
1373                                    entry.segment_id
1374                                );
1375                                continue;
1376                            }
1377                            if engine.catalog().cold_segment(entry.segment_id).is_some() {
1378                                // Already loaded via Catalog::clone path (shouldn't happen
1379                                // since Engine::new + restore_envelope don't populate cold).
1380                                continue;
1381                            }
1382                            let mut new_cat = engine.catalog().clone();
1383                            if let Err(e) =
1384                                new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
1385                            {
1386                                eprintln!(
1387                                    "spg-embedded: manifest load segment {} failed: {e}",
1388                                    entry.segment_id
1389                                );
1390                                continue;
1391                            }
1392                            engine.replace_catalog(new_cat);
1393                            cold_segment_paths.insert(entry.segment_id, entry.path.clone());
1394                        } else {
1395                            eprintln!(
1396                                "spg-embedded: manifest skip segment {}: file unreadable",
1397                                entry.segment_id
1398                            );
1399                        }
1400                    }
1401                }
1402            }
1403        }
1404        // v7.19 — chunked WAL on-disk layout.
1405        //
1406        // Three cases handled here:
1407        //
1408        // 1. wal_dir exists as a DIRECTORY → scan its
1409        //    `<unix_us>_<leading_lsn>.wal` chunks (sorted
1410        //    lexicographically = chunk-creation order), replay
1411        //    them in sequence, advance the LSN watermark to the
1412        //    max commit_lsn seen.
1413        //
1414        // 2. wal_path exists as a FILE → legacy v7.18 layout.
1415        //    Migrate it: create `wal_dir/`, move the single file
1416        //    inside as `0000000000000000_0000000000000000.wal`,
1417        //    then fall through to case 1's replay loop.
1418        //
1419        // 3. Neither exists → fresh database; create wal_dir.
1420        let mut initial_lsn: u64 = 0;
1421        if wal_path.is_file() {
1422            // Case 2: legacy single-file WAL migration.
1423            let legacy_bytes = std::fs::read(&wal_path).map_err(io_err)?;
1424            std::fs::remove_file(&wal_path).map_err(io_err)?;
1425            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1426            if !legacy_bytes.is_empty() {
1427                let migrated = wal_dir.join(legacy_chunk_filename());
1428                std::fs::write(&migrated, &legacy_bytes).map_err(io_err)?;
1429            }
1430        } else if !wal_dir.exists() {
1431            // Case 3: fresh database.
1432            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1433        }
1434        // Cases 1 + 2 share replay logic now that wal_dir is
1435        // guaranteed to exist (and may be empty for case 3).
1436        //
1437        // Two-pass replay so we don't double-apply records the
1438        // snapshot already reflects:
1439        //
1440        // 1. Find the highest commit_lsn carried by a
1441        //    checkpoint_marker across all chunks. That LSN is the
1442        //    snapshot's high-water mark — anything ≤ it is
1443        //    already in `<db_path>` and replaying it would
1444        //    DuplicateTable / double-insert.
1445        // 2. Replay only records strictly above that LSN.
1446        //
1447        // Case 2 migration (legacy single-file WAL) lands here
1448        // too: the migrated chunk has no marker so the LSN floor
1449        // is 0 and every record applies — exactly the v7.18
1450        // behaviour the migration is supposed to preserve.
1451        let chunk_paths = sorted_wal_chunks(&wal_dir).map_err(io_err)?;
1452        let mut snapshot_lsn: u64 = 0;
1453        for chunk in &chunk_paths {
1454            let bytes = std::fs::read(chunk).map_err(io_err)?;
1455            if let Ok(records) = parse_wal_records(&bytes) {
1456                for r in &records {
1457                    if r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER {
1458                        if let Some(l) = r.commit_lsn {
1459                            if l > snapshot_lsn {
1460                                snapshot_lsn = l;
1461                            }
1462                        }
1463                    }
1464                }
1465            }
1466        }
1467        for chunk in &chunk_paths {
1468            let bytes = std::fs::read(chunk).map_err(io_err)?;
1469            if bytes.is_empty() {
1470                continue;
1471            }
1472            replay_wal_filtered(&bytes, &mut engine, snapshot_lsn)
1473                .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
1474            if let Ok(records) = parse_wal_records(&bytes) {
1475                if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
1476                    if max > initial_lsn {
1477                        initial_lsn = max;
1478                    }
1479                }
1480            }
1481        }
1482        // Open the "current" chunk — either the last existing
1483        // chunk file (so subsequent appends extend it until the
1484        // size threshold rotates) or a fresh first chunk.
1485        let now_us = wall_clock_micros();
1486        let current_chunk_path = if let Some(last) = chunk_paths.last() {
1487            last.clone()
1488        } else {
1489            wal_dir.join(chunk_filename(now_us, initial_lsn + 1))
1490        };
1491        let wal_file = OpenOptions::new()
1492            .create(true)
1493            .append(true)
1494            .read(true)
1495            .open(&current_chunk_path)
1496            .map_err(io_err)?;
1497        let wal_len = wal_file.metadata().map_err(io_err)?.len();
1498        let wal = Arc::new(WalGroup::new(wal_file, wal_len));
1499        // v7.19 P3 — spawn retention sweep thread when the
1500        // operator opted in via SPG_PITR_RETENTION_HOURS > 0.
1501        // Otherwise stay on the v7.18 behaviour (chunks accumulate
1502        // until something else — backup-pitr archival, manual
1503        // cleanup — moves them).
1504        let retention_hours = pitr_retention_hours();
1505        let (retention_shutdown, retention_thread) = if retention_hours > 0 {
1506            let shutdown = Arc::new(AtomicBool::new(false));
1507            let shutdown_clone = Arc::clone(&shutdown);
1508            let wal_dir_clone = wal_dir.clone();
1509            let check_interval = std::time::Duration::from_secs(pitr_retention_check_sec());
1510            let archive_cmd = pitr_archive_cmd();
1511            let handle = std::thread::Builder::new()
1512                .name("spg-pitr-retention".into())
1513                .spawn(move || {
1514                    retention_sweep_loop(
1515                        wal_dir_clone,
1516                        retention_hours,
1517                        check_interval,
1518                        archive_cmd,
1519                        shutdown_clone,
1520                    );
1521                })
1522                .map_err(io_err)?;
1523            (Some(shutdown), Some(handle))
1524        } else {
1525            (None, None)
1526        };
1527        // v7.20 — background flusher for SPG_SYNCHRONOUS_COMMIT=off.
1528        let (flusher_shutdown, flusher_thread) = if synchronous_commit_on() {
1529            (None, None)
1530        } else {
1531            let shutdown = Arc::new(AtomicBool::new(false));
1532            let shutdown_clone = Arc::clone(&shutdown);
1533            let group = Arc::clone(&wal);
1534            let interval = std::time::Duration::from_millis(wal_writer_delay_ms());
1535            let handle = std::thread::Builder::new()
1536                .name("spg-wal-flusher".into())
1537                .spawn(move || {
1538                    while !shutdown_clone.load(Ordering::SeqCst) {
1539                        std::thread::sleep(interval);
1540                        if let Err(e) = group.flush_now() {
1541                            eprintln!("spg-embedded: background WAL flush failed: {e:?}");
1542                        }
1543                    }
1544                    // Final drain on shutdown signal.
1545                    let _ = group.flush_now();
1546                })
1547                .map_err(io_err)?;
1548            (Some(shutdown), Some(handle))
1549        };
1550        Ok(Self {
1551            engine,
1552            commit_lsn: AtomicU64::new(initial_lsn),
1553            tx_wal: None,
1554            persistence: Some(PersistenceCtx {
1555                db_path,
1556                wal_dir,
1557                current_chunk_path,
1558                wal,
1559                checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
1560                cold_segments_dir,
1561                cold_segment_paths,
1562                lock_path,
1563                retention_shutdown,
1564                retention_thread,
1565                flusher_shutdown,
1566                flusher_thread,
1567            }),
1568        })
1569    }
1570
1571    /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
1572    /// hot tier into a brand-new cold-tier segment + persist
1573    /// it to disk. Same semantics as `spg-server`'s freezer
1574    /// thread; embedded just runs the freeze synchronously on
1575    /// the caller's thread. Persistence + manifest update
1576    /// happen as part of the next `checkpoint()` (or on Drop).
1577    pub fn freeze_oldest_to_cold(
1578        &mut self,
1579        table_name: &str,
1580        index_name: &str,
1581        max_rows: usize,
1582    ) -> Result<spg_storage::FreezeReport, EngineError> {
1583        let report = self
1584            .engine
1585            .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
1586        if let Some(p) = &mut self.persistence {
1587            std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
1588            let final_path = p
1589                .cold_segments_dir
1590                .join(format!("seg_{}.spg", report.segment_id));
1591            let tmp_path = p
1592                .cold_segments_dir
1593                .join(format!("seg_{}.spg.tmp", report.segment_id));
1594            std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
1595            std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
1596            p.cold_segment_paths.insert(report.segment_id, final_path);
1597        }
1598        Ok(report)
1599    }
1600
1601    /// v7.1 — override the auto-checkpoint WAL-size ceiling for
1602    /// this `Database` instance. Default is
1603    /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
1604    /// setter wins. No-op when the database is in-memory.
1605    pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
1606        if let Some(p) = &mut self.persistence {
1607            p.checkpoint_threshold_bytes = bytes.max(1);
1608        }
1609    }
1610
1611    /// v7.1 — flush a fresh catalog snapshot to `db_path` and
1612    /// truncate the WAL. Idempotent; cheap when nothing has
1613    /// happened since the last checkpoint. No-op when the
1614    /// database is in-memory (no `db_path` configured).
1615    ///
1616    /// Called automatically when:
1617    /// - the WAL grows past
1618    ///   `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
1619    ///   end of an `execute()`, and
1620    /// - `Drop` runs (best-effort; checkpoint failure on drop is
1621    ///   logged to stderr).
1622    pub fn checkpoint(&mut self) -> Result<(), EngineError> {
1623        let snapshot = self.engine.snapshot();
1624        let Some(p) = &mut self.persistence else {
1625            return Ok(());
1626        };
1627        // Snapshot first (atomic via tmp+rename), then WAL
1628        // truncate. Same order as `spg-server`'s CHECKPOINT —
1629        // a crash between the two leaves the WAL holding
1630        // already-snapshotted ops, which replay cleanly on the
1631        // next boot (idempotent for SPG's standard DDL/DML
1632        // mutations).
1633        let tmp = {
1634            let mut t = p.db_path.clone();
1635            let mut name = t
1636                .file_name()
1637                .map(std::ffi::OsStr::to_os_string)
1638                .unwrap_or_default();
1639            name.push(".tmp");
1640            t.set_file_name(name);
1641            t
1642        };
1643        std::fs::write(&tmp, &snapshot).map_err(io_err)?;
1644        std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
1645        // v7.1.4 — refresh the manifest so the next boot can
1646        // reload cold segments alongside the snapshot. Bytes
1647        // come from the freshly-written snapshot file (= the
1648        // canonical CRC source).
1649        if !p.cold_segment_paths.is_empty() {
1650            let snap_crc = spg_crypto::crc32::crc32(&snapshot);
1651            let entries: Vec<ColdSegmentEntry> = p
1652                .cold_segment_paths
1653                .iter()
1654                .filter_map(|(&segment_id, path)| {
1655                    let bytes = std::fs::read(path).ok()?;
1656                    Some(ColdSegmentEntry {
1657                        segment_id,
1658                        path: path.clone(),
1659                        crc32: spg_crypto::crc32::crc32(&bytes),
1660                    })
1661                })
1662                .collect();
1663            let manifest = CatalogManifest {
1664                catalog_crc32: snap_crc,
1665                cold_segments: entries,
1666                wal_baseline_offset: 0,
1667            };
1668            let m_bytes = manifest.serialize();
1669            let m_path = spg_manifest_path(&p.db_path);
1670            if let Some(dir) = m_path.parent() {
1671                std::fs::create_dir_all(dir).map_err(io_err)?;
1672            }
1673            let m_tmp = {
1674                let mut t = m_path.clone();
1675                let mut name = t
1676                    .file_name()
1677                    .map(std::ffi::OsStr::to_os_string)
1678                    .unwrap_or_default();
1679                name.push(".tmp");
1680                t.set_file_name(name);
1681                t
1682            };
1683            std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
1684            std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
1685        }
1686        // v7.19 — append a checkpoint marker to the current chunk
1687        // (anchors restore-to-time backups), then rotate to a
1688        // fresh chunk file. Old chunks stay on disk and become
1689        // input to the retention thread (P3) + spgctl backup-pitr
1690        // (P6). The single-file `set_len(0)` truncate the v7.18
1691        // path used is gone — that path silently discarded WAL
1692        // history between checkpoint and the operator's next cron
1693        // run, which is exactly what PITR was meant to fix.
1694        let marker_lsn = self.commit_lsn.load(Ordering::SeqCst);
1695        let marker_ts = wall_clock_micros();
1696        let marker = encode_v4_checkpoint_marker(marker_lsn, marker_ts, &p.db_path);
1697        // v7.20 P2 — checkpoint holds &mut self (engine
1698        // exclusive), so there are no concurrent enqueues: drain
1699        // the pending batch, append the marker, flush, then
1700        // rotate the chunk handle inside the group.
1701        p.wal.enqueue(&marker);
1702        p.wal.flush_now()?;
1703        let new_chunk_path = p.wal_dir.join(chunk_filename(marker_ts, marker_lsn + 1));
1704        let new_handle = OpenOptions::new()
1705            .create(true)
1706            .append(true)
1707            .read(true)
1708            .open(&new_chunk_path)
1709            .map_err(io_err)?;
1710        p.current_chunk_path = new_chunk_path;
1711        p.wal.rotate_file(new_handle);
1712        Ok(())
1713    }
1714
1715    /// Restore a database from a previously-captured catalog
1716    /// snapshot. Pairs with `Database::snapshot()` for
1717    /// round-tripping in-memory state without going through
1718    /// the `spg-server` WAL.
1719    pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
1720        let engine = Engine::restore_envelope(snapshot).map_err(|e| {
1721            EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
1722        })?;
1723        Ok(Self {
1724            engine,
1725            persistence: None,
1726            commit_lsn: AtomicU64::new(0),
1727            tx_wal: None,
1728        })
1729    }
1730
1731    /// Take a catalog snapshot suitable for `Database::restore`.
1732    /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
1733    /// + version + payload); round-trips through every released
1734    /// SPG version per the STABILITY contract.
1735    #[must_use]
1736    pub fn snapshot(&self) -> Vec<u8> {
1737        self.engine.snapshot()
1738    }
1739
1740    /// Execute a SQL statement and return the engine's
1741    /// `QueryResult` verbatim. Pass-through for callers that
1742    /// want to keep PG-flavoured column/row metadata.
1743    ///
1744    /// v7.1 — when the database was opened via `open_path`,
1745    /// successful mutations are appended to the WAL + fsynced
1746    /// before the call returns. A subsequent process crash will
1747    /// recover state up to the last successful return from
1748    /// `execute()`. Read-only statements (SELECT / SHOW /
1749    /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
1750    /// etc.) skip the WAL entirely.
1751    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
1752        // v7.20 P2 — single-caller convenience over the buffered
1753        // path: enqueue + immediately wait. Batch size is 1 here,
1754        // so the durability behaviour (one fsync before Ok) is
1755        // identical to v7.19. Concurrent callers go through
1756        // `execute_buffered` (AsyncDatabase does) and share the
1757        // leader's fsync.
1758        let (result, ticket) = self.execute_buffered(sql)?;
1759        if let Some(t) = ticket {
1760            t.wait()?;
1761        }
1762        Ok(result)
1763    }
1764
1765    /// v7.20 P2 — group-commit write entry. Runs the engine
1766    /// mutation + encodes/enqueues the WAL record, then RETURNS
1767    /// WITHOUT waiting for the fsync. The caller must call
1768    /// [`WalTicket::wait`] before treating the write as durable
1769    /// — crucially, the caller can (and should) drop whatever
1770    /// lock guards this `Database` first, so the next writer's
1771    /// mutation overlaps this batch's fsync.
1772    ///
1773    /// `None` ticket = nothing hit the WAL (read-only statement,
1774    /// no-op DDL, or in-memory database) — the result is final
1775    /// as returned.
1776    ///
1777    /// # Errors
1778    /// Engine errors propagate unchanged. Auto-checkpoint (when
1779    /// the active chunk crosses the threshold) runs inline and
1780    /// may surface IO errors.
1781    pub fn execute_buffered(
1782        &mut self,
1783        sql: &str,
1784    ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
1785        let result = self.engine.execute(sql)?;
1786        let modified = matches!(
1787            &result,
1788            QueryResult::CommandOk {
1789                modified_catalog: true,
1790                ..
1791            }
1792        );
1793        let ticket = self.wal_after_ok(sql, modified)?;
1794        Ok((result, ticket))
1795    }
1796
1797    /// v7.21 (round-12 polish) — post-engine WAL bookkeeping shared
1798    /// by the simple ([`Self::execute_buffered`]) and prepared
1799    /// ([`Self::execute_prepared_buffered`]) write paths. `canonical`
1800    /// is the replay text (bind-final for prepared statements);
1801    /// `modified_catalog` comes from the engine result. Three routes:
1802    ///
1803    /// - transaction control → maintain [`Self::tx_wal`]: BEGIN opens
1804    ///   the buffer, COMMIT flushes it as ONE atomic
1805    ///   `WAL_V4_TYPE_TX_COMMIT_SQL` record, ROLLBACK drops it,
1806    ///   SAVEPOINT / ROLLBACK TO mark / truncate it. The engine has
1807    ///   already accepted the statement, so this only mirrors state.
1808    /// - inside an open transaction → buffer the statement (shadow-
1809    ///   catalog mutations report `modified_catalog: false`, so the
1810    ///   auto-commit arm below can't see them).
1811    /// - auto-commit mutation → classic per-statement v4 record.
1812    ///
1813    /// v7.18 PITR — v4 records carry commit LSN + wall-clock micros.
1814    /// The crash window remains one BATCH: replay re-applies
1815    /// idempotently exactly as before, and a torn batch tail drops
1816    /// cleanly (same torn-write handling).
1817    fn wal_after_ok(
1818        &mut self,
1819        canonical: &str,
1820        modified_catalog: bool,
1821    ) -> Result<Option<WalTicket>, EngineError> {
1822        if self.persistence.is_none() {
1823            return Ok(None);
1824        }
1825        let mut record = None;
1826        match tx_control_kind(canonical) {
1827            Some(TxControl::Begin) => {
1828                self.tx_wal = Some(TxWalBuffer::default());
1829            }
1830            Some(TxControl::Commit) => {
1831                if let Some(buf) = self.tx_wal.take()
1832                    && !buf.statements.is_empty()
1833                {
1834                    let script = buf.statements.join(";\n");
1835                    let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1836                    record = Some(encode_v4_tx_commit(&script, lsn, wall_clock_micros()));
1837                }
1838            }
1839            Some(TxControl::Rollback) => {
1840                self.tx_wal = None;
1841            }
1842            Some(TxControl::Savepoint(name)) => {
1843                if let Some(buf) = &mut self.tx_wal {
1844                    // PG name-reuse semantics: latest mark wins.
1845                    buf.savepoints.retain(|(n, _)| n != &name);
1846                    let mark = buf.statements.len();
1847                    buf.savepoints.push((name, mark));
1848                }
1849            }
1850            Some(TxControl::RollbackToSavepoint(name)) => {
1851                if let Some(buf) = &mut self.tx_wal
1852                    && let Some(pos) = buf.savepoints.iter().position(|(n, _)| n == &name)
1853                {
1854                    let mark = buf.savepoints[pos].1;
1855                    buf.statements.truncate(mark);
1856                    // Later savepoints die with the rollback; the
1857                    // target itself survives (PG keeps it
1858                    // re-rollbackable).
1859                    buf.savepoints.truncate(pos + 1);
1860                }
1861            }
1862            Some(TxControl::ReleaseSavepoint) => {
1863                // RELEASE folds the savepoint into the enclosing tx —
1864                // buffered statements stay. The mark also stays:
1865                // marks are only consulted by ROLLBACK TO, which the
1866                // engine validates first, so a dangling mark is
1867                // unreachable.
1868            }
1869            None => {
1870                if let Some(buf) = &mut self.tx_wal {
1871                    if !sql_is_read_only(canonical) {
1872                        buf.statements.push(canonical.to_string());
1873                    }
1874                } else if modified_catalog && !sql_is_read_only(canonical) {
1875                    let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1876                    record = Some(encode_v4_auto_commit(canonical, lsn, wall_clock_micros()));
1877                }
1878            }
1879        }
1880        let mut ticket = None;
1881        if let Some(record) = record {
1882            let p = self.persistence.as_mut().expect("checked above");
1883            let seq = p.wal.enqueue(&record);
1884            ticket = Some(WalTicket {
1885                group: Arc::clone(&p.wal),
1886                seq,
1887            });
1888            if p.wal.written_len() >= p.checkpoint_threshold_bytes {
1889                self.checkpoint()?;
1890            }
1891        }
1892        Ok(ticket)
1893    }
1894
1895    /// v7.3.0 — typed-row variant of [`Database::query`]. Each
1896    /// row decodes into a `T: FromSpgRow` so callers don't
1897    /// pattern-match on `Value` themselves. Use [`spg_row!`] to
1898    /// generate the impl, or write it by hand.
1899    pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
1900        let rows = self.query(sql)?;
1901        rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
1902    }
1903
1904    /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
1905    /// strips the column-schema metadata for read-side
1906    /// ergonomics. Errors on non-Rows results (DML / DDL
1907    /// statements should go through `execute` instead).
1908    pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
1909        match self.engine.execute(sql)? {
1910            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
1911            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1912                "query() expects a SELECT — use execute() for DML/DDL".into(),
1913            )),
1914            // v7.5.0 — QueryResult is #[non_exhaustive]; any future
1915            // variant is not a SELECT row stream, treat as Unsupported.
1916            _ => Err(EngineError::Unsupported(
1917                "query() expects a SELECT — use execute() for DML/DDL".into(),
1918            )),
1919        }
1920    }
1921
1922    /// v7.16.0 — column-aware variant of [`Self::query`].
1923    /// Returns the column schema vec alongside the rows so
1924    /// adapters (the spg-sqlx Row impl most notably) can drive
1925    /// name + type-based column lookups. Errors on non-Rows
1926    /// results identically to `query`.
1927    pub fn query_with_columns(
1928        &mut self,
1929        sql: &str,
1930    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
1931        match self.engine.execute(sql)? {
1932            QueryResult::Rows { columns, rows } => {
1933                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
1934            }
1935            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1936                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
1937            )),
1938            _ => Err(EngineError::Unsupported(
1939                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
1940            )),
1941        }
1942    }
1943
1944    /// v7.16.0 — column-aware variant of
1945    /// [`Self::query_prepared`]. Same shape as
1946    /// `query_with_columns` but driven from a prepared
1947    /// statement + bound params.
1948    pub fn query_prepared_with_columns(
1949        &mut self,
1950        stmt: &Statement,
1951        params: &[Value],
1952    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
1953        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
1954            QueryResult::Rows { columns, rows } => {
1955                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
1956            }
1957            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1958                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1959            )),
1960            _ => Err(EngineError::Unsupported(
1961                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1962            )),
1963        }
1964    }
1965
1966    /// Borrow the underlying engine. Escape hatch for callers
1967    /// that need access to `spg-engine` APIs not yet surfaced
1968    /// here (transactions, EXPLAIN ANALYZE, etc.).
1969    #[must_use]
1970    pub const fn engine(&self) -> &Engine {
1971        &self.engine
1972    }
1973
1974    /// Mutable borrow of the underlying engine. Same intent as
1975    /// `engine()` but for write-side APIs (e.g. inserting
1976    /// directly through `Catalog::insert` for high-throughput
1977    /// bulk loads that bypass SQL parsing).
1978    pub const fn engine_mut(&mut self) -> &mut Engine {
1979        &mut self.engine
1980    }
1981
1982    /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
1983    /// `execute_prepared` / `query_prepared` calls can re-bind
1984    /// parameters without re-parsing. The returned [`Statement`]
1985    /// is a thin handle around the AST + cached source SQL; it's
1986    /// `Clone` so the same plan can drive many bind calls
1987    /// concurrently (each call clones the AST and runs
1988    /// placeholder substitution on the clone — the cached
1989    /// plan stays intact).
1990    ///
1991    /// Plan caching follows the engine's existing version-aware
1992    /// rule: a prepared `Statement` whose statistics version
1993    /// has rolled (ANALYZE ran between prepare and execute)
1994    /// will silently re-prepare under the hood. Callers don't
1995    /// need to detect this.
1996    ///
1997    /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
1998    /// `bind`-time `Value`s are passed as a slice; arity
1999    /// mismatches surface as `EvalError::PlaceholderOutOfRange`
2000    /// at `execute_prepared` time, not here.
2001    ///
2002    /// # Errors
2003    /// Surfaces `EngineError` (parse error / plan rewrite
2004    /// failure) from the underlying `Engine::prepare`.
2005    pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
2006        // Use the cached path so repeated prepares of the same
2007        // SQL are O(1). The engine's plan cache stays shared
2008        // across all callers of this Database — a single
2009        // `PgPool`-shaped consumer (or, later, the spg-sqlx
2010        // adapter) prepares once and reaps the win on every bind.
2011        let stmt = self
2012            .engine
2013            .prepare_cached(sql)
2014            .map_err(EngineError::Parse)?;
2015        Ok(Statement {
2016            stmt,
2017            sql: sql.to_string(),
2018        })
2019    }
2020
2021    /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
2022    /// executing. Returns `(parameter_oid_count, output_columns)`
2023    /// where `output_columns` is empty for non-SELECT statements
2024    /// or for SELECT shapes the describe planner can't resolve
2025    /// (JOIN / subquery / unknown table). Wraps
2026    /// `Engine::describe_prepared` so the spg-sqlx bridge can
2027    /// surface PG-shape Describe replies for
2028    /// `sqlx::query!()` compile-time validation.
2029    ///
2030    /// # Errors
2031    /// Propagates parse errors from the underlying prepare path.
2032    pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2033        let stmt = self
2034            .engine
2035            .prepare_cached(sql)
2036            .map_err(EngineError::Parse)?;
2037        Ok(self.engine.describe_prepared(&stmt))
2038    }
2039
2040    /// v7.16.0 — execute a prepared statement with bound
2041    /// parameters. Mirrors `Engine::execute_prepared`: clones
2042    /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
2043    ///
2044    /// Persistence (WAL fsync + auto-checkpoint) follows the
2045    /// same rules as `execute(sql)`: mutating statements get a
2046    /// WAL record AFTER the in-memory exec succeeds. The WAL
2047    /// record carries the substituted, bind-final SQL, so
2048    /// replay reconstructs the same row state without needing
2049    /// the original prepared `Statement` to still be alive.
2050    ///
2051    /// # Errors
2052    /// Propagates engine errors. Param arity mismatch surfaces
2053    /// as `EvalError::PlaceholderOutOfRange`.
2054    pub fn execute_prepared(
2055        &mut self,
2056        stmt: &Statement,
2057        params: &[Value],
2058    ) -> Result<QueryResult, EngineError> {
2059        let (result, ticket) = self.execute_prepared_buffered(stmt, params)?;
2060        if let Some(t) = ticket {
2061            t.wait()?;
2062        }
2063        Ok(result)
2064    }
2065
2066    /// v7.20 P2 — group-commit variant of
2067    /// [`Database::execute_prepared`]. Same contract as
2068    /// [`Database::execute_buffered`]: mutation + enqueue happen
2069    /// here; the caller waits on the ticket AFTER releasing
2070    /// whatever lock guards this `Database`.
2071    ///
2072    /// # Errors
2073    /// Engine errors propagate unchanged; inline auto-checkpoint
2074    /// may surface IO errors.
2075    pub fn execute_prepared_buffered(
2076        &mut self,
2077        stmt: &Statement,
2078        params: &[Value],
2079    ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2080        let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
2081        let modified = matches!(
2082            &result,
2083            QueryResult::CommandOk {
2084                modified_catalog: true,
2085                ..
2086            }
2087        );
2088        // WAL persistence on the bind-final SQL. Build the
2089        // canonical Display form by re-printing the
2090        // placeholder-substituted statement (cheap — the AST
2091        // is already in hand from execute_prepared's internal
2092        // clone) so replay's path is identical to the
2093        // simple-query path. v7.21: also when a transaction is
2094        // open — in-tx mutations report `modified_catalog: false`
2095        // but must reach the tx WAL buffer (see `wal_after_ok`).
2096        let mut ticket = None;
2097        if self.persistence.is_some()
2098            && (modified
2099                || (self.tx_wal.is_some() && !sql_is_read_only(&stmt.sql))
2100                || tx_control_kind(&stmt.sql).is_some())
2101        {
2102            let mut wal_stmt = stmt.stmt.clone();
2103            crate::wal_render_with_params(&mut wal_stmt, params);
2104            let canonical = format!("{wal_stmt}");
2105            ticket = self.wal_after_ok(&canonical, modified)?;
2106        }
2107        Ok((result, ticket))
2108    }
2109
2110    /// v7.16.0 — run a prepared SELECT with bound params and
2111    /// return rows as `Vec<Vec<Value>>`, matching `query()`
2112    /// shape. SELECTs are read-only so this never writes the
2113    /// WAL.
2114    ///
2115    /// # Errors
2116    /// Returns `Unsupported` if the prepared statement isn't a
2117    /// SELECT (use `execute_prepared` for DML/DDL).
2118    pub fn query_prepared(
2119        &mut self,
2120        stmt: &Statement,
2121        params: &[Value],
2122    ) -> Result<Vec<Vec<Value>>, EngineError> {
2123        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2124            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2125            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2126                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2127            )),
2128            _ => Err(EngineError::Unsupported(
2129                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2130            )),
2131        }
2132    }
2133
2134    /// v7.18 — parse + plan a SQL string against a
2135    /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
2136    /// readonly fan-out path: no writer lock taken, no WAL write,
2137    /// no plan-cache mutation. Static-on-`Self` so callers can
2138    /// dispatch against a snapshot without an `&mut Database`
2139    /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
2140    /// is the load-bearing consumer.
2141    ///
2142    /// # Errors
2143    /// Propagates `EngineError::Parse` from the parser.
2144    pub fn prepare_on_snapshot(
2145        snapshot: &CatalogSnapshot,
2146        sql: &str,
2147    ) -> Result<Statement, EngineError> {
2148        let stmt =
2149            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2150        Ok(Statement {
2151            stmt,
2152            sql: sql.to_string(),
2153        })
2154    }
2155
2156    /// v7.18 — execute a prepared `Statement` against a
2157    /// `CatalogSnapshot` with bound params. Mirror of
2158    /// [`Database::execute_prepared`] on the readonly path:
2159    /// writes / DDL hit `EngineError::WriteRequired`. No WAL
2160    /// write, no writer lock, multiple snapshots can run
2161    /// concurrently — the snapshot is immutable from prepare time.
2162    ///
2163    /// # Errors
2164    /// Surfaces `EngineError::WriteRequired` for non-readonly
2165    /// statements; propagates other engine errors.
2166    pub fn execute_prepared_on_snapshot(
2167        snapshot: &CatalogSnapshot,
2168        stmt: &Statement,
2169        params: &[Value],
2170    ) -> Result<QueryResult, EngineError> {
2171        spg_engine::Engine::execute_readonly_prepared_on_snapshot(
2172            snapshot,
2173            stmt.stmt.clone(),
2174            params,
2175        )
2176    }
2177
2178    /// v7.28 (round-22) — deadline-bounded variant of
2179    /// [`Database::execute_prepared_on_snapshot`]. Returns
2180    /// `EngineError::Cancelled` once the budget elapses; the
2181    /// sqlx driver uses this to keep readonly-INLINE execution
2182    /// from monopolising the caller's async runtime (four slow
2183    /// inbox queries saturated mailrs's whole tokio pool) and
2184    /// re-runs over the blocking pool on timeout.
2185    ///
2186    /// # Errors
2187    /// `EngineError::Cancelled` on budget expiry; engine errors
2188    /// otherwise.
2189    pub fn execute_prepared_on_snapshot_with_budget(
2190        snapshot: &CatalogSnapshot,
2191        stmt: &Statement,
2192        params: &[Value],
2193        budget_us: u64,
2194    ) -> Result<QueryResult, EngineError> {
2195        fn mono_now_us() -> u64 {
2196            use std::time::{SystemTime, UNIX_EPOCH};
2197            // Monotonic enough for a per-call relative budget: the
2198            // engine only compares (now - start) against the budget
2199            // within one call.
2200            SystemTime::now()
2201                .duration_since(UNIX_EPOCH)
2202                .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
2203                .unwrap_or(0)
2204        }
2205        let deadline = mono_now_us().saturating_add(budget_us);
2206        let token = spg_engine::CancelToken::none().with_deadline(mono_now_us, deadline);
2207        spg_engine::Engine::execute_readonly_prepared_on_snapshot_with_cancel(
2208            snapshot,
2209            stmt.stmt.clone(),
2210            params,
2211            token,
2212        )
2213    }
2214
2215    /// v7.18 — describe a SQL string against a
2216    /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
2217    /// the readonly path. Pure function on the snapshot's
2218    /// catalog; safe to call from any thread.
2219    ///
2220    /// # Errors
2221    /// Propagates `EngineError::Parse` from the parser.
2222    pub fn describe_on_snapshot(
2223        snapshot: &CatalogSnapshot,
2224        sql: &str,
2225    ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2226        let stmt =
2227            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2228        Ok(spg_engine::Engine::describe_prepared_on_snapshot(
2229            snapshot, &stmt,
2230        ))
2231    }
2232
2233    /// v7.21 (round-12 polish) — run a multi-statement SQL script
2234    /// with PG simple-query semantics: the statements execute in
2235    /// order inside ONE implicit transaction, so a mid-script error
2236    /// rolls back the whole script (PG wraps every simple-query
2237    /// message in an implicit transaction). Three exceptions, all
2238    /// PG-faithful:
2239    ///
2240    /// - a script that carries its OWN transaction control
2241    ///   (BEGIN / COMMIT / …) runs statement-by-statement — the
2242    ///   script owns its boundaries;
2243    /// - a script run while the caller already has a transaction
2244    ///   open joins that transaction (no nested BEGIN), and the
2245    ///   caller's COMMIT / ROLLBACK decides its fate;
2246    /// - a single-statement script is plain auto-commit.
2247    ///
2248    /// Returns one `QueryResult` per executed statement. This is the
2249    /// engine behind `sqlx::raw_sql` (mailrs feeds whole
2250    /// `init-schema.sql` files through it) and `spgctl import`.
2251    ///
2252    /// # Errors
2253    /// The first failing statement's error propagates after the
2254    /// implicit ROLLBACK; nothing from the script remains applied.
2255    pub fn execute_script(&mut self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
2256        let stmts = split_statements(sql);
2257        let script_owns_tx = stmts.iter().any(|s| tx_control_kind(s).is_some());
2258        let wrap = stmts.len() > 1 && !script_owns_tx && !self.engine.in_transaction();
2259        if !wrap {
2260            let mut out = Vec::with_capacity(stmts.len());
2261            for stmt in &stmts {
2262                out.push(self.execute_dump_statement(stmt)?);
2263            }
2264            return Ok(out);
2265        }
2266        self.execute("BEGIN")?;
2267        let mut out = Vec::with_capacity(stmts.len());
2268        for stmt in &stmts {
2269            match self.execute_dump_statement(stmt) {
2270                Ok(r) => out.push(r),
2271                Err(e) => {
2272                    // Best-effort rollback; surface the script error.
2273                    let _ = self.execute("ROLLBACK");
2274                    return Err(e);
2275                }
2276            }
2277        }
2278        self.execute("COMMIT")?;
2279        Ok(out)
2280    }
2281
2282    /// v7.22 (round-13 T2) — execute one `split_statements` chunk,
2283    /// lowering a `COPY … FROM stdin;` block (statement + its data
2284    /// lines, as one chunk) to per-row INSERTs through the shared
2285    /// `spg_engine::copy` helpers. Default-format pg_dump emits
2286    /// COPY blocks, so the zero-change import promise needs this on
2287    /// the embed path; non-COPY statements pass straight through to
2288    /// [`Self::execute`]. Public so `spgctl import` can keep its
2289    /// per-statement error indexing while sharing the lowering.
2290    ///
2291    /// # Errors
2292    /// Engine errors propagate; for COPY the failing row's INSERT
2293    /// error carries the synthesized statement context.
2294    pub fn execute_dump_statement(&mut self, stmt: &str) -> Result<QueryResult, EngineError> {
2295        // Strip pg_dump's `-- Data for Name: …;` banner (it carries
2296        // semicolons of its own) before splitting head from data.
2297        let stmt_clean = strip_leading_sql_noise(stmt);
2298        let head_is_copy = stmt_clean
2299            .get(..4)
2300            .is_some_and(|p| p.eq_ignore_ascii_case("copy"));
2301        if head_is_copy
2302            && let Some((head, data)) = stmt_clean.split_once(';')
2303            && let Some(spec) = spg_engine::copy::parse_copy_from_stdin_head(head)
2304        {
2305            let mut affected: usize = 0;
2306            for line in data.lines() {
2307                // Empty fragments only occur at the chunk boundary
2308                // (the remainder of the COPY line right after `;`);
2309                // data rows are whole non-empty lines.
2310                let line = line.strip_suffix('\r').unwrap_or(line);
2311                if line.is_empty() {
2312                    continue;
2313                }
2314                let values = spg_engine::copy::decode_copy_text_row(line);
2315                let insert = spg_engine::copy::build_copy_insert(
2316                    &spec.table,
2317                    spec.columns.as_deref(),
2318                    &values,
2319                );
2320                match self.execute(&insert)? {
2321                    QueryResult::CommandOk { affected: n, .. } => affected += n,
2322                    _ => affected += 1,
2323                }
2324            }
2325            return Ok(QueryResult::CommandOk {
2326                affected,
2327                modified_catalog: false,
2328            });
2329        }
2330        self.execute(stmt)
2331    }
2332
2333    /// v7.2.0 — run `body` inside an implicit `BEGIN` /
2334    /// `COMMIT` pair. The body receives `&mut Database` so it
2335    /// can `execute()` / `query()` like any other code path;
2336    /// the only difference is that every write in the body
2337    /// lands inside one transaction, and a returned `Err` from
2338    /// the body triggers `ROLLBACK` before the error propagates.
2339    ///
2340    /// Nested calls are not supported — SPG's transaction
2341    /// model is single-writer with explicit `BEGIN` /
2342    /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
2343    /// would hit `EngineError::Unsupported("nested
2344    /// transaction")` at the inner `BEGIN`.
2345    pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
2346    where
2347        F: FnOnce(&mut Self) -> Result<R, EngineError>,
2348    {
2349        self.execute("BEGIN")?;
2350        match body(self) {
2351            Ok(value) => {
2352                self.execute("COMMIT")?;
2353                Ok(value)
2354            }
2355            Err(e) => {
2356                // Best-effort rollback. If ROLLBACK itself
2357                // fails (rare — the engine reports it via
2358                // `Unsupported` only when there's no active
2359                // TX, which can't happen here) we surface the
2360                // original body error, not the rollback error.
2361                let _ = self.execute("ROLLBACK");
2362                Err(e)
2363            }
2364        }
2365    }
2366}
2367
2368impl Default for Database {
2369    fn default() -> Self {
2370        Self::open_in_memory()
2371    }
2372}
2373
/// v7.7.5 — point-in-time observability counters produced by
/// [`Database::metrics`]. A plain `Copy` value made only of integers
/// and a flag, so building, copying and serialising it is cheap.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedMetrics {
    /// Live rows summed over every user table. Hot tier only — rows
    /// already moved into cold segment files are not counted.
    pub hot_rows: u64,
    /// `Table::hot_bytes` summed over every user table; useful for
    /// tracking against the freezer's `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Cold-tier segments registered in the catalog. Tombstoned
    /// slots are included: segments that compaction retired but
    /// whose files may still exist on disk.
    pub cold_segments: u64,
    /// Number of user tables (any future engine-managed internal
    /// tables are excluded).
    pub tables: u64,
    /// WAL length in bytes as reported by the WAL writer; always
    /// zero for an in-memory database.
    pub wal_bytes: u64,
    /// `true` when the database came from `open_path`, meaning WAL +
    /// checkpoint persistence is active.
    pub persistent: bool,
}
2401
/// v7.2.1 — owner of the worker thread started by
/// `spawn_background_freezer`. Dropping it raises the shutdown flag
/// and joins the worker, so the `Database` (or the
/// `Arc<Mutex<Database>>` sharing it) can safely be dropped after
/// this handle is.
///
/// Avoid dropping the handle (or calling [`FreezerHandle::stop`])
/// while the current thread holds the database mutex. The worker
/// blocks on that mutex each tick, so the join could wait forever.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    shutdown: Arc<AtomicBool>,
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — ask the worker to exit and wait for it to finish.
    /// Idempotent: only the first call joins, and later calls (such
    /// as the one made by `Drop`) just re-raise the flag.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let Some(worker) = self.join.take() else {
            return;
        };
        // A panicked worker is deliberately ignored; there is nothing
        // useful to do with its payload during shutdown.
        let _ = worker.join();
    }
}

impl Drop for FreezerHandle {
    fn drop(&mut self) {
        FreezerHandle::stop(self);
    }
}
2429
/// v7.2.1 — tuning knobs for `Database::spawn_background_freezer`.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// How often the worker wakes. On each wake it compares the
    /// catalog's `hot_tier_bytes` with the budget below and freezes
    /// when the budget is exceeded.
    pub tick: Duration,
    /// Hot-tier byte budget. Once exceeded, the next tick freezes up
    /// to `batch_rows` of the largest eligible table's oldest rows
    /// into a new cold segment.
    pub hot_tier_bytes: u64,
    /// Upper bound on rows demoted per freeze.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. After a successful freeze,
    /// the worker runs one compaction pass if the catalog holds
    /// *more than* this many cold segments across all tables.
    /// `usize::MAX` disables auto-compaction. Defaults to `64`, the
    /// `spg-server` operating point for
    /// SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size in bytes for compaction merges
    /// (default 64 MiB, as in `spg-server`). Segments smaller than
    /// this are merge candidates; segments at or above it are left
    /// alone.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    fn default() -> Self {
        // Same operating point as the `spg-server` freezer
        // (SPG_HOT_TIER_BYTES = 4 GiB, 1000-row batches, 1 s tick), so
        // operators who know the server get the same behaviour here.
        const MIB: u64 = 1 << 20;
        const GIB: u64 = 1 << 30;
        Self {
            tick: Duration::from_secs(1),
            hot_tier_bytes: 4 * GIB,
            batch_rows: 1_000,
            compact_when_segments_exceed: 64,
            compact_target_bytes: 64 * MIB,
        }
    }
}
2471
2472impl Database {
2473    /// v7.7.4 — observe the catalog's cold-segment count.
2474    /// Useful for tests + dashboards that want to verify
2475    /// auto-compaction is firing.
2476    #[must_use]
2477    pub fn cold_segment_count(&self) -> usize {
2478        self.engine.catalog().cold_segment_count()
2479    }
2480
2481    /// v7.7.5 — observability snapshot. Returns a point-in-time
2482    /// view of the engine + persistence counters. Cheap (no
2483    /// locks beyond the existing `&self` borrow), so safe to
2484    /// call from a hot metrics-scrape path.
2485    ///
2486    /// Fields mirror the operational dashboard
2487    /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
2488    /// minus the network counters that don't apply to embedded.
2489    #[must_use]
2490    pub fn metrics(&self) -> EmbeddedMetrics {
2491        let cat = self.engine.catalog();
2492        let mut hot_rows: u64 = 0;
2493        let mut hot_bytes: u64 = 0;
2494        for name in cat.table_names() {
2495            if let Some(t) = cat.get(&name) {
2496                hot_rows = hot_rows.saturating_add(t.row_count() as u64);
2497                hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
2498            }
2499        }
2500        let (wal_bytes, persistent) = match &self.persistence {
2501            Some(p) => (p.wal.written_len(), true),
2502            None => (0, false),
2503        };
2504        EmbeddedMetrics {
2505            hot_rows,
2506            hot_bytes,
2507            cold_segments: cat.cold_segment_count() as u64,
2508            tables: cat.table_count() as u64,
2509            wal_bytes,
2510            persistent,
2511        }
2512    }
2513
2514    /// v7.2.1 — spawn a background thread that periodically
2515    /// runs `freeze_oldest_to_cold` when the catalog-wide hot
2516    /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
2517    /// pattern matches the v7.2 sharing story: callers wrap
2518    /// their `Database` in `Arc::new(Mutex::new(db))` once,
2519    /// then clone the Arc for the worker + for foreground
2520    /// access. Return value is a handle whose `Drop` joins the
2521    /// worker.
2522    ///
2523    /// Picks the freeze target the same way `spg-server`'s
2524    /// freezer does: largest-`hot_bytes` user table with at
2525    /// least one BTree integer-PK index. Tables without a
2526    /// freezable index are skipped silently.
2527    pub fn spawn_background_freezer(
2528        db: Arc<Mutex<Database>>,
2529        opts: FreezerOptions,
2530    ) -> FreezerHandle {
2531        let shutdown = Arc::new(AtomicBool::new(false));
2532        let shutdown_for_thread = Arc::clone(&shutdown);
2533        let join = thread::Builder::new()
2534            .name("spg-embedded-freezer".into())
2535            .spawn(move || {
2536                background_freezer_loop(db, opts, shutdown_for_thread);
2537            })
2538            .expect("spawn background freezer thread");
2539        FreezerHandle {
2540            shutdown,
2541            join: Some(join),
2542        }
2543    }
2544}
2545
2546/// v7.2.1 — the freezer's main loop, factored out so the
2547/// `Database::spawn_background_freezer` path stays readable.
2548fn background_freezer_loop(
2549    db: Arc<Mutex<Database>>,
2550    opts: FreezerOptions,
2551    shutdown: Arc<AtomicBool>,
2552) {
2553    // Sleep in short slices so a shutdown request resolves
2554    // quickly (vs sleeping the full tick).
2555    let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
2556    let mut last_tick = std::time::Instant::now();
2557    loop {
2558        if shutdown.load(Ordering::Acquire) {
2559            return;
2560        }
2561        thread::sleep(slice);
2562        if last_tick.elapsed() < opts.tick {
2563            continue;
2564        }
2565        last_tick = std::time::Instant::now();
2566        let Ok(mut guard) = db.lock() else {
2567            return;
2568        };
2569        if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
2570            continue;
2571        }
2572        let Some((table, index)) = pick_freeze_target(&guard) else {
2573            continue;
2574        };
2575        let row_count = guard
2576            .engine
2577            .catalog()
2578            .get(&table)
2579            .map_or(0, spg_storage::Table::row_count);
2580        let to_freeze = opts.batch_rows.min(row_count);
2581        if to_freeze == 0 {
2582            continue;
2583        }
2584        if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
2585            eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
2586            continue;
2587        }
2588        // v7.7.4 — auto-compact. If the catalog now carries
2589        // more cold segments than the configured threshold,
2590        // run a single compaction pass. Failures are reported
2591        // but don't kill the loop; the next tick will retry.
2592        let count = guard.engine.catalog().cold_segment_count();
2593        if count > opts.compact_when_segments_exceed {
2594            if let Err(e) = guard
2595                .engine
2596                .compact_cold_segments_with_target(opts.compact_target_bytes)
2597            {
2598                eprintln!(
2599                    "spg-embedded: background compact failed (segments={count}, \
2600                     threshold={}): {e:?}",
2601                    opts.compact_when_segments_exceed,
2602                );
2603            }
2604        }
2605    }
2606}
2607
2608/// v7.2.1 — pick the highest-`hot_bytes` user table with a
2609/// BTree integer-PK index. Returns `(table, index_name)` so the
2610/// caller can dispatch through `freeze_oldest_to_cold`.
2611fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
2612    let cat = db.engine.catalog();
2613    let mut best: Option<(String, String, u64)> = None;
2614    for name in cat.table_names() {
2615        let Some(t) = cat.get(&name) else { continue };
2616        if t.row_count() == 0 {
2617            continue;
2618        }
2619        let cols = &t.schema().columns;
2620        let Some(idx) = t.indices().iter().find(|i| {
2621            matches!(i.kind, spg_storage::IndexKind::BTree(_))
2622                && i.column_position < cols.len()
2623                && matches!(
2624                    cols[i.column_position].ty,
2625                    spg_storage::DataType::SmallInt
2626                        | spg_storage::DataType::Int
2627                        | spg_storage::DataType::BigInt
2628                )
2629        }) else {
2630            continue;
2631        };
2632        let hot = t.hot_bytes();
2633        match best {
2634            None => best = Some((name, idx.name.clone(), hot)),
2635            Some((_, _, best_hot)) if hot > best_hot => {
2636                best = Some((name, idx.name.clone(), hot));
2637            }
2638            _ => {}
2639        }
2640    }
2641    best.map(|(t, i, _)| (t, i))
2642}
2643
2644/// v7.7.6 — replay the first `to_seq` records of the WAL at
2645/// `wal_path` into a fresh engine and write the resulting
2646/// catalog snapshot to `out_db_path`. Same semantics as
2647/// `spg revert --wal … --to-seq N --out …` from the CLI:
2648///
2649///   - `to_seq == 0` → snapshot is the empty catalog
2650///   - WAL records beyond `to_seq` are not applied
2651///   - durability-checkpoint markers (v3 type 0x02) are
2652///     consumed without counting against the budget
2653///
2654/// Returns the number of statements actually applied
2655/// (`≤ to_seq`). The output snapshot is byte-identical to
2656/// what `Database::open_path(out_db_path)` would consume on
2657/// a subsequent open.
2658///
2659/// This is the "rewind" operator for an embedded database
2660/// that has been corrupted by a poison statement or a
2661/// half-applied migration. Pair with `cold_segment_paths`
2662/// preservation if your cold-tier files are still on disk.
2663///
2664/// # Errors
2665///
2666/// - `wal_path` unreadable or truncated mid-record
2667/// - WAL record decodes to invalid UTF-8 SQL
2668/// - WAL record's SQL is rejected by the engine
2669/// - `out_db_path` unwritable
2670pub fn revert_wal_to_seq(
2671    wal_path: impl AsRef<Path>,
2672    to_seq: u64,
2673    out_db_path: impl AsRef<Path>,
2674) -> Result<u64, EngineError> {
2675    // v7.19 — accept either a single-file legacy WAL (v7.18 and
2676    // earlier layout) or a chunked WAL directory (v7.19+). For a
2677    // directory, concatenate every `.wal` chunk in sorted order
2678    // — the same order open_path replays them in — so revert
2679    // sees the full record stream.
2680    let path = wal_path.as_ref();
2681    let wal_bytes = if path.is_dir() {
2682        let mut combined = Vec::new();
2683        let chunks = sorted_wal_chunks(path).map_err(io_err)?;
2684        for chunk in chunks {
2685            let bytes = std::fs::read(&chunk).map_err(io_err)?;
2686            combined.extend_from_slice(&bytes);
2687        }
2688        combined
2689    } else {
2690        std::fs::read(path).map_err(io_err)?
2691    };
2692    let mut engine = Engine::new();
2693    let mut applied = 0u64;
2694    let mut cur = 0usize;
2695    while cur < wal_bytes.len() && applied < to_seq {
2696        let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
2697        cur += total;
2698        if sql_bytes.is_empty() {
2699            continue;
2700        }
2701        let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
2702            EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
2703                "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
2704            )))
2705        })?;
2706        // v7.21 — tx-commit records carry a multi-statement script;
2707        // split_statements is a no-op for single-statement records.
2708        for stmt in split_statements(sql) {
2709            engine.execute(stmt)?;
2710        }
2711        applied += 1;
2712    }
2713    let snapshot = engine.snapshot();
2714    std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
2715    Ok(applied)
2716}
2717
/// v7.7.6 — decode one WAL record from the front of `tail`.
///
/// Returns `(sql_bytes, consumed)`, where `consumed` is the total
/// number of bytes this record occupies: the header, plus the 16-byte
/// (lsn, ts) prefix for v4 record types, plus the payload. The caller
/// advances its cursor by `consumed`. `sql_bytes` is empty for records
/// that carry nothing to apply: v3 durability markers, any v3-framed
/// type this function does not recognise, and zero-length payloads.
/// Callers treat empty as "skip".
///
/// Framing, as decoded below:
/// - v1: 4-byte little-endian length, then the SQL payload.
/// - v2: `WAL_V2_SENTINEL` set in the length word; 8-byte header
///   (presumably length + CRC — the CRC is not checked here).
/// - v3: v2 plus `WAL_V3_FLAG`; 9-byte header whose byte 8 is the
///   record type. v4 record types reuse v3 framing, and their `rec_len`
///   counts only the SQL, not the extra prefix.
///
/// Intended to match the CLI's `decode_one_record` and the engine's
/// `replay_wal_bytes`. CRCs are not re-validated; the caller's intent
/// is "apply", not "validate".
///
/// # Errors
/// `StorageError::Corrupt` when `tail` is shorter than the 4-byte
/// length word, or shorter than the header + payload (+ v4 prefix)
/// that the length word announces.
fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
    if tail.len() < 4 {
        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
            format!("WAL truncated record: {} < 4 header bytes", tail.len()),
        )));
    }
    // Cannot panic: the slice is exactly 4 bytes after the check above.
    let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
    // Rust's `&` binds tighter than `!=`, so these test the flag bits.
    let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
    let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
    // Strip the version flag bits to recover the payload length.
    let len_mask = if is_v3 {
        !(WAL_V2_SENTINEL | WAL_V3_FLAG)
    } else {
        !WAL_V2_SENTINEL
    };
    let rec_len = (raw_len & len_mask) as usize;
    let header_len = if is_v3 {
        9
    } else if is_v2 {
        8
    } else {
        4
    };
    if tail.len() < header_len + rec_len {
        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
            format!(
                "WAL truncated record: header+payload {} > available {}",
                header_len + rec_len,
                tail.len()
            ),
        )));
    }
    if is_v3 {
        let type_byte = tail[8];
        // v3 type 0x01 = auto_commit_sql (payload = SQL).
        // v3 type 0x02 = durability marker (no SQL to apply).
        // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
        //                prefix between type and SQL — strip
        //                the prefix so the caller still sees raw
        //                SQL bytes.
        // v4 tx-commit (`WAL_V4_TYPE_TX_COMMIT_SQL`) = same prefix;
        //                its SQL is a multi-statement script that
        //                the caller splits before applying.
        // Anything else is unknown and reported as a skip-marker.
        if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
            let payload = &tail[header_len..header_len + rec_len];
            return Ok((payload.to_vec(), header_len + rec_len));
        }
        if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL || type_byte == WAL_V4_TYPE_TX_COMMIT_SQL {
            // `rec_len` excludes the v4 prefix, so re-check the bound
            // with the prefix included.
            let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
            if tail.len() < v4_total {
                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
                    format!(
                        "WAL truncated v4 record: header+payload {v4_total} > available {}",
                        tail.len()
                    ),
                )));
            }
            let sql_start = header_len + WAL_V4_EXTRA_HEADER;
            let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
            return Ok((sql_bytes, v4_total));
        }
        // Caller treats empty payload as a skip-marker.
        return Ok((Vec::new(), header_len + rec_len));
    }
    // v1 / v2: the payload directly follows the header.
    let payload = &tail[header_len..header_len + rec_len];
    Ok((payload.to_vec(), header_len + rec_len))
}
2788
impl Drop for Database {
    /// Shuts a `Database` down in three ordered steps:
    /// (1) for a persistent database, attempt a final checkpoint;
    /// (2) signal and join the retention and flusher threads, if any
    /// were started; (3) remove the cross-process lock directory if it
    /// still exists. `Drop` cannot return errors: checkpoint and
    /// lock-release failures go to stderr, and thread-join results are
    /// ignored.
    fn drop(&mut self) {
        // v7.1 — best-effort final checkpoint when a persistent
        // Database leaves scope. Failures here go to stderr so
        // operators see them, but Drop can't propagate errors —
        // the WAL itself is already durable, so a checkpoint
        // miss only means the next boot replays a few more
        // records than strictly necessary.
        if self.persistence.is_some() {
            if let Err(e) = self.checkpoint() {
                eprintln!(
                    "spg-embedded: final checkpoint on Drop failed: {e:?} \
                     (WAL is intact; next open_path will replay)"
                );
            }
        }
        // v7.19 P3 / v7.20 — signal the retention + flusher
        // threads to exit, then wait for them. Done BEFORE the
        // lock release so background threads don't outlive the
        // database handle. The flusher drains the pending batch
        // on its way out (final flush_now in the thread body),
        // so `SPG_SYNCHRONOUS_COMMIT=off` never loses confirmed
        // commits across a clean shutdown.
        // NOTE(review): the final checkpoint above runs while these
        // threads may still be live (they are only stopped here).
        // Presumably checkpoint() is safe to interleave with them —
        // confirm, or consider stopping them before checkpointing.
        if let Some(ctx) = self.persistence.as_mut() {
            // `take()` each flag/handle so it is signalled/joined once.
            if let Some(shutdown) = ctx.retention_shutdown.take() {
                shutdown.store(true, Ordering::SeqCst);
            }
            if let Some(handle) = ctx.retention_thread.take() {
                let _ = handle.join();
            }
            if let Some(shutdown) = ctx.flusher_shutdown.take() {
                shutdown.store(true, Ordering::SeqCst);
            }
            if let Some(handle) = ctx.flusher_thread.take() {
                let _ = handle.join();
            }
        }
        // v7.17.0 Phase 6.2 — release the cross-process lock on
        // clean shutdown. Failure is logged but never panics;
        // the operator can clear a stale lock via
        // `Database::force_unlock` if a crash kept the
        // directory around.
        if let Some(ctx) = &self.persistence
            && ctx.lock_path.exists()
        {
            // remove_dir_all: the lock dir carries the owner-pid
            // record since round-12.
            if let Err(e) = std::fs::remove_dir_all(&ctx.lock_path) {
                eprintln!(
                    "spg-embedded: lock release on Drop failed for {}: {e:?}",
                    ctx.lock_path.display()
                );
            }
        }
    }
}
2845
2846impl Database {
2847    /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
2848    /// Use when a previous process crashed mid-session and
2849    /// left `<db_path>.lock` behind. Operators should confirm
2850    /// no other process is currently using the database before
2851    /// calling this — SPG cannot fingerprint stale-vs-live
2852    /// without a libc dep, which would violate spg-embedded's
2853    /// zero-deps charter.
2854    pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
2855        let lock_path = {
2856            let mut p = db_path.as_ref().to_path_buf();
2857            let name = p
2858                .file_name()
2859                .map(|n| {
2860                    let mut s = n.to_os_string();
2861                    s.push(".lock");
2862                    s
2863                })
2864                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
2865            p.set_file_name(name);
2866            p
2867        };
2868        if !lock_path.exists() {
2869            return Ok(());
2870        }
2871        std::fs::remove_dir_all(&lock_path).map_err(io_err)
2872    }
2873}
2874
2875/// v7.1 — turn a `std::io::Error` into the workspace's
2876/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
2877/// the closest existing variant — io failures during boot or
2878/// during a WAL append surface as a storage-layer fault to
2879/// callers, which keeps the public error enum unchanged.
2880fn io_err(e: std::io::Error) -> EngineError {
2881    EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
2882}
2883
2884/// v7.2.2 — `Database` is `Send`, so the recommended sharing
2885/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
2886///
2887/// ```no_run
2888/// use std::sync::{Arc, Mutex};
2889/// use spg_embedded::Database;
2890///
2891/// let db = Database::open_in_memory();
2892/// let shared = Arc::new(Mutex::new(db));
2893/// let shared_for_worker = Arc::clone(&shared);
2894/// std::thread::spawn(move || {
2895///     let mut guard = shared_for_worker.lock().unwrap();
2896///     guard.execute("INSERT INTO t VALUES (1)").unwrap();
2897/// });
2898/// ```
2899///
2900/// Internal `RwLock`-wrapped state — letting many threads
2901/// hold concurrent `&Database` for `SELECT` without contending
2902/// — is parked as STABILITY § "Out of v7.2"; multi-reader
2903/// embedded throughput needs a planner-side change to release
2904/// the engine read lock between scans, which is the v7.x
2905/// "Choice A" line of work already documented in v6.9.1's
2906/// carve-out.
2907#[allow(dead_code)]
2908fn _database_is_send() {
2909    fn assert_send<T: Send>() {}
2910    assert_send::<Database>();
2911}
2912
/// v6.10.3 — decode one query-result row into a user type.
///
/// v7.3.0 ships the [`spg_row!`] declarative macro, which generates
/// `impl FromSpgRow for YourStruct` from a struct definition (no
/// proc-macro, no syn/quote/proc-macro2 deps — the workspace's
/// "0 external deps" policy holds). Hand-written impls work too.
///
/// Implementations should report decode failures as
/// `EngineError::Unsupported` so the caller's error type stays
/// uniform; the `spg_row!`-generated impls do exactly that.
pub trait FromSpgRow: Sized {
    /// Decode one query result row into `Self`. Called once per
    /// row by [`Database::query_typed`]. The slice length equals
    /// the number of columns in the SELECT projection.
    ///
    /// # Errors
    ///
    /// Returns an error when the row does not fit `Self` (for the
    /// macro-generated impls: a missing column, or a cell that the
    /// field's `FromSpgValue` decoder rejects).
    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
}
2929
/// v7.3.0 — declarative macro that generates `FromSpgRow` impl
/// for a user struct. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// The generated struct always derives `Debug` and `Clone`, so do
/// not derive those again in the attributes you pass in. Columns
/// bind to fields by POSITION (column names are not consulted);
/// columns beyond the last field are ignored, while a missing
/// column or an undecodable cell yields `EngineError::Unsupported`
/// naming the struct and the field. The trailing comma after the
/// last field is optional.
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(
                row: &[$crate::Value],
            ) -> ::core::result::Result<Self, $crate::EngineError> {
                // Fields consume columns in declaration order; any
                // surplus trailing columns are left unread.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                // Fully qualified: an exported macro must not depend on
                // whatever `Ok` means at the call site.
                ::core::result::Result::Ok(Self { $($field,)* })
            }
        }
    };
}
3006
/// v7.3.0 — per-column decoder used by `spg_row!`. Implemented for
/// `i16`, `i32`, `i64` (from any integer variant, range-checked into
/// the target width), `f32`, `f64`, `bool`, `String`, `Vec<f32>`
/// (vector columns), and `Option<T>` of any of those for nullable
/// columns (SQL `NULL` decodes to `None`).
pub trait FromSpgValue: Sized {
    /// Decode one cell into `Self`. On failure the `Err` carries a
    /// short static diagnostic (e.g. `"non-integer value in int
    /// column"` or `"NULL in non-Option text column"`); `spg_row!`
    /// wraps it into an `EngineError::Unsupported` that also names
    /// the struct and field.
    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
}
3017
3018macro_rules! impl_from_value_int {
3019    ($($t:ty),* $(,)?) => {
3020        $(
3021            impl FromSpgValue for $t {
3022                fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3023                    match v {
3024                        Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
3025                        Value::Int(n)      => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
3026                        Value::BigInt(n)   => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
3027                        Value::Null        => Err("NULL in non-Option int column"),
3028                        _ => Err("non-integer value in int column"),
3029                    }
3030                }
3031            }
3032        )*
3033    };
3034}
3035impl_from_value_int!(i16, i32, i64);
3036
3037impl FromSpgValue for f32 {
3038    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3039        match v {
3040            Value::Float(f) => Ok(*f as f32),
3041            Value::Null => Err("NULL in non-Option float column"),
3042            _ => Err("non-float value in float column"),
3043        }
3044    }
3045}
3046
3047impl FromSpgValue for f64 {
3048    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3049        match v {
3050            Value::Float(f) => Ok(*f),
3051            Value::Null => Err("NULL in non-Option float column"),
3052            _ => Err("non-float value in float column"),
3053        }
3054    }
3055}
3056
3057impl FromSpgValue for bool {
3058    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3059        match v {
3060            Value::Bool(b) => Ok(*b),
3061            Value::Null => Err("NULL in non-Option bool column"),
3062            _ => Err("non-bool value in bool column"),
3063        }
3064    }
3065}
3066
3067impl FromSpgValue for String {
3068    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3069        match v {
3070            Value::Text(s) => Ok(s.clone()),
3071            Value::Null => Err("NULL in non-Option text column"),
3072            _ => Err("non-text value in String column"),
3073        }
3074    }
3075}
3076
3077impl FromSpgValue for Vec<f32> {
3078    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3079        match v {
3080            Value::Vector(xs) => Ok(xs.clone()),
3081            Value::Null => Err("NULL in non-Option vector column"),
3082            _ => Err("non-vector value in Vec<f32> column"),
3083        }
3084    }
3085}
3086
3087impl<T: FromSpgValue> FromSpgValue for Option<T> {
3088    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3089        match v {
3090            Value::Null => Ok(None),
3091            other => T::from_spg_value(other).map(Some),
3092        }
3093    }
3094}
3095
3096/// Acquire the cross-process exclusion lock at `lock_path` (atomic
3097/// `mkdir`), recording the owner pid inside. If the lock already
3098/// exists, read the recorded pid and probe liveness — a lock left
3099/// behind by a killed process (docker SIGKILL, crash) is reclaimed
3100/// automatically instead of forcing the operator to delete it by
3101/// hand (mailrs embed round-12: a restarted server came up in
3102/// degraded mode because the previous instance's lock survived).
3103/// v7.27 (mailrs round-21 B) — the prober's environment identity:
3104/// `(hostname, boot-or-container id)`. A pid is only meaningful
3105/// inside the PID namespace that recorded it; mailrs's recovery
3106/// window saw "locked by pid 1" from a STOPPED container because
3107/// the prober's pid 1 (its own init) was alive. When the lock's
3108/// identity differs from ours, liveness is UNDECIDABLE and we
3109/// refuse honestly instead of guessing in either direction.
3110fn host_identity() -> (String, String) {
3111    let hostname = std::process::Command::new("hostname")
3112        .output()
3113        .ok()
3114        .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
3115        .unwrap_or_default();
3116    // Linux boot id; containers share the host kernel's boot id, so
3117    // hostname (= container id by default) is the namespace
3118    // discriminator and boot id catches host reboots / pid reuse.
3119    let boot_id = std::fs::read_to_string("/proc/sys/kernel/random/boot_id")
3120        .map(|s| s.trim().to_string())
3121        .or_else(|_| {
3122            std::process::Command::new("sysctl")
3123                .args(["-n", "kern.bootsessionuuid"])
3124                .output()
3125                .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
3126        })
3127        .unwrap_or_default();
3128    (hostname, boot_id)
3129}
3130
3131fn acquire_path_lock(lock_path: &Path) -> Result<(), EngineError> {
3132    for attempt in 0..2 {
3133        match std::fs::create_dir(lock_path) {
3134            Ok(()) => {
3135                // Best-effort owner record; liveness probing treats a
3136                // missing pid file as stale (crash between mkdir and
3137                // write is indistinguishable from an ancient lock).
3138                // v7.27 — lines 2+3 record the owner's environment
3139                // identity (hostname, boot id) so a prober in a
3140                // different namespace refuses instead of misreading
3141                // the pid.
3142                let (host, boot) = host_identity();
3143                let _ = std::fs::write(
3144                    lock_path.join("pid"),
3145                    format!("{}\n{host}\n{boot}\n", std::process::id()),
3146                );
3147                return Ok(());
3148            }
3149            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists && attempt == 0 => {
3150                let record = std::fs::read_to_string(lock_path.join("pid")).unwrap_or_default();
3151                let mut lines = record.lines();
3152                let owner = lines.next().and_then(|s| s.trim().parse::<u32>().ok());
3153                let lock_host = lines.next().unwrap_or("").trim().to_string();
3154                let lock_boot = lines.next().unwrap_or("").trim().to_string();
3155                // v7.27 — identity check BEFORE the pid probe. A pid
3156                // recorded in another namespace is undecidable both
3157                // ways (a stale lock can look held, a held lock can
3158                // look stale — the unsafe direction). Old-format
3159                // locks (pid only) keep the legacy same-host
3160                // assumption.
3161                if !lock_host.is_empty() {
3162                    let (my_host, my_boot) = host_identity();
3163                    let same_env = lock_host == my_host
3164                        && (lock_boot.is_empty() || my_boot.is_empty() || lock_boot == my_boot);
3165                    if !same_env {
3166                        return Err(EngineError::Unsupported(format!(
3167                            "database lock {} was taken in a different host/container \
3168                             (owner: pid {} on {:?}; we are {:?}) — liveness is \
3169                             undecidable from here. If you are sure the owner is gone, \
3170                             call Database::force_unlock() or `spg import --force-unlock`.",
3171                            lock_path.display(),
3172                            owner.unwrap_or(0),
3173                            lock_host,
3174                            my_host
3175                        )));
3176                    }
3177                }
3178                let owner_alive = owner.is_some_and(pid_alive);
3179                if owner_alive {
3180                    return Err(EngineError::Unsupported(format!(
3181                        "database is locked by another process (pid {}): {}; \
3182                         stop that process first, or call Database::force_unlock()",
3183                        owner.unwrap_or(0),
3184                        lock_path.display()
3185                    )));
3186                }
3187                // Stale — owner pid dead or unrecorded. Reclaim.
3188                eprintln!(
3189                    "spg-embedded: reclaiming stale lock {} (owner pid {:?} not alive)",
3190                    lock_path.display(),
3191                    owner
3192                );
3193                std::fs::remove_dir_all(lock_path).map_err(io_err)?;
3194                // Loop retries the create_dir; a concurrent reclaimer
3195                // winning the race surfaces as AlreadyExists on
3196                // attempt 1 below.
3197            }
3198            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
3199                return Err(EngineError::Unsupported(format!(
3200                    "database is locked by another process: {}; \
3201                     stop that process first, or call Database::force_unlock()",
3202                    lock_path.display()
3203                )));
3204            }
3205            Err(e) => return Err(io_err(e)),
3206        }
3207    }
3208    unreachable!("acquire_path_lock loop covers both attempts")
3209}
3210
3211/// Probe whether `pid` is a live process. Unix: `ps -p` via the
3212/// system binary (std-only — no libc dependency). `ps -p` exits 0
3213/// for ANY live pid regardless of owner; `kill -0` was rejected
3214/// here because it fails with EPERM on another user's live process,
3215/// which would read as "dead" and reclaim a held lock. Probe
3216/// failure (no `ps` binary, exec error) conservatively reports
3217/// alive so locks are never auto-reclaimed on doubt; non-unix
3218/// targets do the same.
3219#[cfg(unix)]
3220fn pid_alive(pid: u32) -> bool {
3221    match std::process::Command::new("ps")
3222        .arg("-p")
3223        .arg(pid.to_string())
3224        .stdout(std::process::Stdio::null())
3225        .stderr(std::process::Stdio::null())
3226        .status()
3227    {
3228        Ok(status) => status.success(),
3229        Err(_) => true,
3230    }
3231}
3232
3233#[cfg(not(unix))]
3234fn pid_alive(_pid: u32) -> bool {
3235    true
3236}
3237
/// v7.22 — inspect a just-split statement for the session settings
/// that change how `split_statements` scans string literals, and
/// update its `mysql_escapes` flag accordingly:
///
/// - any mention of `sql_mode` (mysqldump preamble, often inside a
///   `/*!…*/` conditional comment) turns backslash escapes ON;
/// - `standard_conforming_strings` turns them ON when the text that
///   FOLLOWS the setting name contains `off`, and OFF otherwise
///   (pg_dump emits `SET standard_conforming_strings = on`).
///
/// Chunks longer than 4096 bytes are ignored: the signal statements
/// are short one-liners, so large statements are not worth
/// lowercasing and scanning.
fn note_dialect_signals(chunk: &str, mysql_escapes: &mut bool) {
    if chunk.len() > 4096 {
        return;
    }
    let lower = chunk.to_ascii_lowercase();
    if lower.contains("sql_mode") {
        *mysql_escapes = true;
    } else if let Some((_, setting)) = lower.split_once("standard_conforming_strings") {
        // Only the text after the setting name decides the value, so
        // an unrelated "off" earlier in the chunk (e.g. in a leading
        // comment banner) cannot flip the dialect.
        *mysql_escapes = setting.contains("off");
    }
}

/// Strip leading whitespace, `--` line comments and NON-conditional
/// block comments from a chunk so statement-head checks (COPY
/// detection most notably) see the first real token. pg_dump
/// prefixes every data block with a `-- Data for Name: …;` banner —
/// which itself contains semicolons, so head checks must run on the
/// stripped text. MySQL executable conditional comments (`/*!`) are
/// content and stay.
///
/// Block comments nest, as they do in `split_statements`' scanner
/// (and in PostgreSQL): `/* a /* b */ c */` is skipped as a single
/// comment. An unterminated comment or a trailing line comment
/// leaves nothing, so the result is `""`.
fn strip_leading_sql_noise(mut s: &str) -> &str {
    // Byte length of the (possibly nested) block comment that opens
    // `t`, which must start with `/*`; `None` if it never closes.
    fn block_comment_len(t: &str) -> Option<usize> {
        let b = t.as_bytes();
        let mut depth = 0usize;
        let mut i = 0usize;
        while i + 1 < b.len() {
            match (b[i], b[i + 1]) {
                (b'/', b'*') => {
                    depth += 1;
                    i += 2;
                }
                (b'*', b'/') => {
                    // `t` opens with `/*`, so depth >= 1 here.
                    depth -= 1;
                    i += 2;
                    if depth == 0 {
                        return Some(i);
                    }
                }
                _ => i += 1,
            }
        }
        None
    }

    loop {
        let t = s.trim_start();
        if let Some(rest) = t.strip_prefix("--") {
            s = rest.split_once('\n').map_or("", |(_, r)| r);
            continue;
        }
        if t.starts_with("/*") && !t.starts_with("/*!") {
            match block_comment_len(t) {
                Some(end) => {
                    s = &t[end..];
                    continue;
                }
                None => return "",
            }
        }
        return t;
    }
}
3279
3280/// Split a multi-statement SQL script into individual statements on
3281/// top-level `;`, honouring single-quoted strings (with `''`
3282/// escapes), double-quoted identifiers, dollar-quoted bodies
3283/// (`$tag$ … $tag$`), line comments (`--`) and MySQL executable
3284/// conditional comments (`/*!… */` stay statement content; plain
3285/// nested block comments don't). Chunks that contain no statement
3286/// content (whitespace / comments only) are dropped. PG's
3287/// simple-query protocol does this server-side; the embed path owns
3288/// it here.
3289///
3290/// v7.22 (mailrs round-13 gap 1) — psql meta-command lines are
3291/// dropped for client parity: a line whose first non-whitespace
3292/// byte is `\` BETWEEN statements (PG 18's pg_dump wraps scripts in
3293/// `\restrict` / `\unrestrict`) never reaches the parser, the same
3294/// way psql consumes `\`-lines client-side and never sends them. A
3295/// mid-statement backslash stays an ordinary byte — pg_dump only
3296/// emits meta-commands between statements.
3297pub fn split_statements(sql: &str) -> Vec<&str> {
3298    let bytes = sql.as_bytes();
3299    let mut stmts = Vec::new();
3300    let mut start = 0usize;
3301    let mut has_content = false;
3302    // v7.22 (round-13 T3) — stream-tracked string dialect, mirroring
3303    // the engine's session flag: a statement mentioning `sql_mode`
3304    // (mysqldump preamble, often inside `/*!…*/`) switches plain
3305    // strings to backslash-escape scanning;
3306    // `standard_conforming_strings` (pg_dump preamble) switches
3307    // back. Without this the scanner ends a MySQL `'…\'…'` literal
3308    // early and splits inside data.
3309    let mut mysql_escapes = false;
3310    let mut i = 0usize;
3311    while i < bytes.len() {
3312        match bytes[i] {
3313            b'\\' if !has_content => {
3314                // Start-of-statement `\` = psql meta-command line.
3315                // Consume through end-of-line; restart the chunk
3316                // after it so the line never lands in the output.
3317                while i < bytes.len() && bytes[i] != b'\n' {
3318                    i += 1;
3319                }
3320                start = if i < bytes.len() { i + 1 } else { i };
3321            }
3322            b'\'' => {
3323                has_content = true;
3324                // PG escape-string form `E'...'` honours backslash
3325                // escapes (`E'a\';b'` is ONE literal) — detect via
3326                // the immediately-preceding standalone E/e. MySQL
3327                // dialect sessions treat EVERY plain string that way.
3328                let escape_string = mysql_escapes
3329                    || (i >= 1
3330                        && matches!(bytes[i - 1], b'e' | b'E')
3331                        && !(i >= 2
3332                            && (bytes[i - 2].is_ascii_alphanumeric() || bytes[i - 2] == b'_')));
3333                i += 1;
3334                while i < bytes.len() {
3335                    if escape_string && bytes[i] == b'\\' {
3336                        // Skip the escaped byte (covers \' and \\).
3337                        i += 2;
3338                        continue;
3339                    }
3340                    if bytes[i] == b'\'' {
3341                        // `''` is an escaped quote inside the literal.
3342                        if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
3343                            i += 2;
3344                            continue;
3345                        }
3346                        break;
3347                    }
3348                    i += 1;
3349                }
3350            }
3351            b'"' => {
3352                has_content = true;
3353                i += 1;
3354                while i < bytes.len() && bytes[i] != b'"' {
3355                    i += 1;
3356                }
3357            }
3358            b'$' => {
3359                // Possible dollar-quote opener `$tag$` (tag may be
3360                // empty). If the shape doesn't match, it's a plain
3361                // `$` (positional param) — fall through.
3362                let tag_end = bytes[i + 1..]
3363                    .iter()
3364                    .position(|&b| !(b.is_ascii_alphanumeric() || b == b'_'))
3365                    .map(|off| i + 1 + off);
3366                if let Some(te) = tag_end
3367                    && te < bytes.len()
3368                    && bytes[te] == b'$'
3369                {
3370                    has_content = true;
3371                    let tag = &sql[i..=te];
3372                    // Find the closing `$tag$`.
3373                    if let Some(close) = sql[te + 1..].find(tag) {
3374                        i = te + 1 + close + tag.len();
3375                        continue;
3376                    }
3377                    // Unterminated — consume the rest; the parser
3378                    // will report it.
3379                    i = bytes.len();
3380                    continue;
3381                }
3382                has_content = true;
3383            }
3384            b'-' if i + 1 < bytes.len() && bytes[i + 1] == b'-' => {
3385                while i < bytes.len() && bytes[i] != b'\n' {
3386                    i += 1;
3387                }
3388            }
3389            b'/' if i + 1 < bytes.len() && bytes[i + 1] == b'*' => {
3390                // v7.22 (round-13 T3) — MySQL conditional comments
3391                // `/*!40101 … */` are EXECUTABLE (mysqldump wraps
3392                // its whole preamble + DISABLE KEYS hints in them);
3393                // they must stay statement content for the engine,
3394                // not be skipped as commentary.
3395                if i + 2 < bytes.len() && bytes[i + 2] == b'!' {
3396                    has_content = true;
3397                }
3398                let mut depth = 1usize;
3399                i += 2;
3400                while i < bytes.len() && depth > 0 {
3401                    if bytes[i] == b'/' && i + 1 < bytes.len() && bytes[i + 1] == b'*' {
3402                        depth += 1;
3403                        i += 2;
3404                    } else if bytes[i] == b'*' && i + 1 < bytes.len() && bytes[i + 1] == b'/' {
3405                        depth -= 1;
3406                        i += 2;
3407                    } else {
3408                        i += 1;
3409                    }
3410                }
3411                continue;
3412            }
3413            b';' => {
3414                if has_content {
3415                    let head = &sql[start..i];
3416                    // v7.22 (round-13 T2) — a `COPY … FROM stdin;`
3417                    // statement owns its following data block
3418                    // through the `\.` terminator line (data lines
3419                    // may contain `;`, so generic splitting would
3420                    // shred them). Swallow head + data into ONE
3421                    // chunk; `execute_script` lowers it to INSERTs.
3422                    // pg_dump prefixes the COPY with a comment
3423                    // banner — strip it before the head check.
3424                    let head_clean = strip_leading_sql_noise(head);
3425                    let is_copy_head = head_clean
3426                        .get(..4)
3427                        .is_some_and(|p| p.eq_ignore_ascii_case("copy"))
3428                        && spg_engine::copy::parse_copy_from_stdin_head(head_clean).is_some();
3429                    if is_copy_head {
3430                        // Scan whole lines after the ';' until the
3431                        // `\.` terminator (or EOF — torn dumps lose
3432                        // their tail, same as psql would error).
3433                        let mut j = i + 1;
3434                        let data_end;
3435                        loop {
3436                            if j >= bytes.len() {
3437                                data_end = bytes.len();
3438                                break;
3439                            }
3440                            let line_end = sql[j..].find('\n').map_or(bytes.len(), |off| j + off);
3441                            if sql[j..line_end].trim_end_matches('\r').trim() == "\\." {
3442                                data_end = j;
3443                                i = line_end; // bottom i += 1 skips \n
3444                                break;
3445                            }
3446                            j = line_end + 1;
3447                        }
3448                        stmts.push(&sql[start..data_end]);
3449                        if data_end == bytes.len() {
3450                            i = bytes.len();
3451                        }
3452                        start = i + 1;
3453                        has_content = false;
3454                        i += 1;
3455                        continue;
3456                    }
3457                    note_dialect_signals(head, &mut mysql_escapes);
3458                    stmts.push(head);
3459                }
3460                start = i + 1;
3461                has_content = false;
3462            }
3463            b => {
3464                if !b.is_ascii_whitespace() {
3465                    has_content = true;
3466                }
3467            }
3468        }
3469        i += 1;
3470    }
3471    if has_content {
3472        stmts.push(&sql[start..]);
3473    }
3474    stmts
3475}
3476
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn split_statements_basic_and_trailing() {
        let pieces = split_statements("CREATE TABLE a (x INT); INSERT INTO a VALUES (1)");
        assert_eq!(
            pieces,
            vec!["CREATE TABLE a (x INT)", " INSERT INTO a VALUES (1)"]
        );
        // Chunks holding nothing but whitespace or comments vanish.
        assert_eq!(split_statements("  ;; -- nothing\n;"), Vec::<&str>::new());
    }

    #[test]
    fn split_statements_quoting_forms() {
        // Each of these hides a ';' that must NOT split: a plain
        // literal, a doubled quote, an E-string backslash escape, a
        // quoted identifier, and two dollar-quoted bodies.
        let whole = [
            "INSERT INTO t VALUES ('a;b')",
            "INSERT INTO t VALUES ('it''s; fine')",
            r"INSERT INTO t VALUES (E'it\'s; fine')",
            "CREATE TABLE \"odd;name\" (x INT)",
            "DO $body$ BEGIN PERFORM 1; END $body$",
            "DO $$ SELECT 1; $$",
        ];
        for sql in whole {
            assert_eq!(split_statements(sql), vec![sql], "must stay whole: {sql}");
            // A real terminator after it still splits cleanly.
            let script = format!("{sql};\nSELECT 2");
            assert_eq!(
                split_statements(&script),
                vec![sql, "\nSELECT 2"],
                "must split after: {sql}"
            );
        }
    }

    #[test]
    fn split_statements_drops_psql_meta_lines() {
        // v7.22 round-13 gap 1 — PG 18 pg_dump brackets its scripts
        // with `\restrict` / `\unrestrict`; as with psql, those lines
        // never reach the parser.
        let dump = concat!(
            "\\restrict TOKEN123\n",
            "SELECT 1;\n",
            "\\unrestrict TOKEN123\n",
            "SELECT 2;\n",
            "\\.\n",
        );
        assert_eq!(split_statements(dump), ["SELECT 1", "SELECT 2"]);
        // A backslash in the middle of a statement is not a meta-command.
        let escaped = r"SELECT E'a\\b'";
        assert_eq!(split_statements(escaped), [escaped]);
    }

    #[test]
    fn split_statements_comments_hide_semicolons() {
        let got = split_statements(
            "-- c1 ; still comment\nSELECT 1; /* a ; b /* nested ; */ */ SELECT 2",
        );
        let [first, second] = got.as_slice() else {
            panic!("expected exactly two statements, got {got:?}");
        };
        assert!(first.contains("SELECT 1"));
        assert!(second.contains("SELECT 2"));
    }

    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        for stmt in [
            "CREATE TABLE t (id INT NOT NULL, name TEXT)",
            "INSERT INTO t VALUES (1, 'alice')",
            "INSERT INTO t VALUES (2, 'bob')",
        ] {
            db.execute(stmt).unwrap();
        }
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        let cell = &rows[0][0];
        assert!(matches!(cell, Value::Int(1)), "expected Int(1), got {cell:?}");
    }

    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        assert!(
            db.query("INSERT INTO t VALUES (1)").is_err(),
            "query() on INSERT must error"
        );
    }

    #[test]
    fn snapshot_roundtrip() {
        let mut original = Database::open_in_memory();
        original.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        original.execute("INSERT INTO t VALUES (42)").unwrap();
        let image = original.snapshot();
        let mut restored = Database::restore(&image).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        let cell = &rows[0][0];
        assert!(matches!(cell, Value::Int(42)), "expected Int(42), got {cell:?}");
    }

    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                let Some(Value::Int(id)) = row.first() else {
                    return Err(EngineError::Unsupported("bad id".into()));
                };
                Ok(Self { _id: *id })
            }
        }
        let _user = User::from_spg_row(&[Value::Int(7)]).unwrap();
    }
}