Skip to main content

spg_embedded/
lib.rs

1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//!     println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//!   crash-consistent WAL + periodic checkpoint snapshot. The
32//!   on-disk format is byte-identical to what `spg-server`
33//!   produces, so a database can move between modes without
34//!   conversion.
35//! - **Durability**: every `execute()` that mutates calls
36//!   `fsync` before returning `Ok`. There is no group commit
37//!   in embedded mode — every commit pays one fsync. If you
38//!   need batch throughput, wrap multiple statements in
39//!   [`Database::with_transaction`] which fsyncs only at
40//!   commit.
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//!   Share across threads via `Arc<Mutex<Database>>`. The
43//!   single-writer model is intentional — see
44//!   [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//!   moves cold rows to disk-resident segments while you keep
47//!   serving requests. It runs in a dedicated thread; drop the
48//!   returned [`FreezerHandle`] (or call `stop()`) for clean
49//!   shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//!   [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//!   them with a wildcard arm so future v7.x releases can add
53//!   variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//!   Malformed SQL, type mismatches, missing tables — all
59//!   return `Err(EngineError::…)`. If you observe a panic on
60//!   a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//!   violations (e.g., catalog snapshot magic mismatch, WAL
63//!   record CRC sentinel corruption that survived the boot-
64//!   time validation). These represent silent disk corruption
65//!   and an unwind would leak inconsistent state, so the
66//!   release profile uses `panic = abort` — your host process
67//!   dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//!   `--profile release-dbg` (keeps unwind tables) and use
70//!   `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
82/// v7.16.0 — handle for a parsed-and-planned SQL statement.
83/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
84/// with a `&[Value]` slice carrying the bind parameters (PG-style
85/// `$1`, `$2`, … positional). Cheap to `Clone`; the underlying AST
86/// is shared by handle copies and cloned per bind call by the
87/// engine's executor.
88///
89/// The handle holds a snapshot of the AST at prepare time. If
90/// the engine's plan cache evicts the entry between prepare and
91/// execute (e.g. ANALYZE bumps the statistics version) the
92/// stored AST keeps working — `execute_prepared` operates on
93/// the handle's clone, not the cache entry.
94#[derive(Debug, Clone)]
95pub struct Statement {
96    /// The parsed + planned AST. `spg-engine::prepare_cached`
97    /// returns it as a clone of the cached plan, so any rewrite
98    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
99    /// already run.
100    pub(crate) stmt: ParsedStatement,
101    /// Original SQL source, kept for `Display` / debug only.
102    /// WAL persistence renders from the AST so a bind-time
103    /// rewrite of `$1..$N` survives replay.
104    pub(crate) sql: String,
105}
106
107impl Statement {
108    /// Borrow the original SQL source — useful for tracing and
109    /// debug logs. WAL replay does NOT use this; it serialises
110    /// the bind-final AST instead.
111    #[must_use]
112    pub fn sql(&self) -> &str {
113        &self.sql
114    }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127    let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::Write;
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Condvar, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
/// v7.11.3 — wall-clock provider injected into every embedded
/// `Engine`.
///
/// Returns microseconds since the Unix epoch. A clock so far in the
/// future that the value overflows `i64` is clamped to `i64::MAX`; a
/// clock set before the epoch yields `0`. SQL's `NOW()` /
/// `CURRENT_TIMESTAMP` / `CURRENT_DATE` rewrite layer uses this so
/// PG-idiomatic time queries work without callers wiring their own
/// clock.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        // System clock is before 1970-01-01: report the epoch itself.
        Err(_) => 0,
    }
}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Private on purpose, so callers cannot mis-frame records. The v3
// layout matches the server's, which lets a `spg-server` boot read a
// database written by an embedded process and vice versa.

/// Top bit of the length word: marks a v2+ framed record.
const WAL_V2_SENTINEL: u32 = 1 << 31;
/// Second-highest bit of the length word: marks the v3 envelope.
const WAL_V3_FLAG: u32 = 1 << 30;
/// v3 record type: one auto-committed SQL statement.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v7.18 — the durability checkpoint marker keeps type 0x02 (skipped
/// on replay).
const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v7.18 PITR — auto-commit SQL record extended with
/// (commit_lsn, commit_unix_us) so replay can stop at a chosen point
/// in time. v3 records (type 0x01) remain readable and the envelope
/// flag bits are untouched; the type byte alone discriminates the
/// schema version.
const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
/// v7.18 — "no wall clock" sentinel for the commit_unix_us slot of a
/// v4 record. Restore-to-timestamp skips records carrying it (they
/// have no time anchor); LSN-based restore is unaffected.
const WAL_V4_NO_CLOCK: i64 = i64::MIN;
/// v7.18 — bytes that follow the type byte in a v4 record:
/// commit_lsn (u64 LE, 8 bytes) + commit_unix_us (i64 LE, 8 bytes).
const WAL_V4_EXTRA_HEADER: usize = 8 + 8;
/// v7.18 PITR — checkpoint anchor record, written to the WAL *before*
/// the snapshot file replaces the on-disk catalog. It holds the
/// (lsn, ts, snapshot_path) triple so restore tooling can locate the
/// matching base snapshot without scanning the filesystem. Replay
/// dispatch skips it, just like the v3 durability marker.
const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;

/// v7.21 (mailrs embed round-12 polish) — a single COMMITted explicit
/// transaction, flushed atomically when COMMIT runs. The payload is
/// the transaction's bind-final mutation statements joined with
/// `";\n"`; replay splits them again via [`split_statements`] and
/// applies them in order. It carries the same 16-byte
/// (commit_lsn, commit_unix_us) prefix as the v4 auto-commit record
/// and is CRC-framed like every other record, so replay applies the
/// entire transaction or — on a torn tail — nothing; a transaction
/// can never half-resurrect.
///
/// Rationale: mutations inside a transaction only touch the engine's
/// shadow catalog (`modified_catalog: false`), so the per-statement
/// auto-commit append never fired, and a COMMIT followed by a crash
/// (no graceful Drop checkpoint) lost the transaction.
const WAL_V4_TYPE_TX_COMMIT_SQL: u8 = 0x12;

/// v7.34 (crash-recovery P0 #2) — row-level physical redo record. It
/// uses the v4 envelope (lsn + ts + payload + CRC), but the payload is
/// `encode_redo_log` bytes rather than SQL. Replay applies the
/// physical [`RowChange`]s through `Engine::apply_redo` instead of
/// re-executing — O(changed rows), not the
/// O(records × catalog_rows) statement replay that hung the mailrs P0.
const WAL_V5_TYPE_ROW_REDO: u8 = 0x13;
204
/// v7.34 (crash-recovery P0 #2) — opt-in row-level redo WAL records.
///
/// Default OFF during bringup; `SPG_WAL_ROW_REDO=1` makes mutating
/// statements log physical changes (0x13) instead of SQL, so crash
/// recovery applies them in O(changed rows) rather than re-executing in
/// O(records × catalog_rows) (the superlinear replay hang root-caused on
/// the mailrs P0). DDL still logs as SQL (hybrid log). When this returns
/// true, `open_path` arms the engine's redo capture.
///
/// Returns `true` only when the variable is exactly `1` or equals
/// `true` ignoring ASCII case; unset, non-Unicode, or any other value
/// yields `false`.
fn row_redo_enabled() -> bool {
    match std::env::var("SPG_WAL_ROW_REDO") {
        Ok(flag) => flag == "1" || flag.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}

/// v7.1 — auto-checkpoint threshold. Once the WAL grows past
/// this many bytes, the next successful `execute()` call ends
/// with a `checkpoint()` so the WAL stays bounded. Tunable via
/// `SPG_EMBEDDED_CHECKPOINT_BYTES` env.
///
/// Returns the override when the variable parses as a non-zero `u64`;
/// an unset, unparsable, or zero value falls back to 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;
    std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok())
        // Zero would checkpoint after every statement; treat as unset.
        .filter(|&bytes| bytes > 0)
        .unwrap_or(FALLBACK_BYTES)
}
229
230/// v7.30.3 (mailrs round-26) — per-query byte budget on join/filter
231/// materialisation, default ON at 256 MiB for embed parity with the
232/// server's allocator-level `SPG_MAX_QUERY_BYTES` default. A fat
233/// backfill batch (1000 × full mail bodies) then errors with
234/// `QueryBytesExceeded` instead of walking the host into reclaim
235/// livelock. `SPG_MAX_QUERY_BYTES=0` disables; any other value
236/// overrides. NOT applied to the WAL-replay engine — replay must
237/// never fail on a tuning knob.
238fn engine_with_query_byte_budget(engine: Engine) -> Engine {
239    const DEFAULT_MAX_QUERY_BYTES: usize = 256 * 1024 * 1024;
240    match std::env::var("SPG_MAX_QUERY_BYTES")
241        .ok()
242        .and_then(|s| s.trim().parse::<usize>().ok())
243    {
244        Some(0) => engine,
245        Some(n) => engine.with_max_query_bytes(n),
246        None => engine.with_max_query_bytes(DEFAULT_MAX_QUERY_BYTES),
247    }
248}
249
250/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
251///
252/// ```text
253/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
254/// [u32 LE crc32 over (type_byte || sql_bytes)]
255/// [u8 type = 0x01]
256/// [sql bytes]
257/// ```
258fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
259    let payload = sql.as_bytes();
260    let mut crc_buf = Vec::with_capacity(1 + payload.len());
261    crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
262    crc_buf.extend_from_slice(payload);
263    let crc = spg_crypto::crc32::crc32(&crc_buf);
264    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
265    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
266    out.extend_from_slice(&header);
267    out.extend_from_slice(&crc.to_le_bytes());
268    out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
269    out.extend_from_slice(payload);
270    out
271}
272
273/// v7.20 P2 — WAL group-commit. N concurrent commits share one
274/// fsync (the 4.2 ms p50 that profile_breakdown measured as
275/// 99.2% of the durable write path).
276///
277/// Leader-follower protocol, same family as PG's group commit:
278///
279/// 1. `enqueue(record)` — called while the caller still holds
280///    the engine's write lock. Appends the encoded record to the
281///    shared buffer, returns a sequence ticket. O(memcpy).
282/// 2. Caller RELEASES the engine write lock (the next writer's
283///    mutation proceeds in parallel with this batch's fsync).
284/// 3. `wait_flushed(seq)` — if nobody is flushing, the caller
285///    elects itself leader: swaps the buffer out, writes +
286///    fsyncs ONCE for every record in the batch, marks the
287///    batch durable, wakes all followers. Otherwise it parks on
288///    the condvar until a leader covers its seq.
289///
290/// Durability contract is unchanged from v7.19: `execute()`
291/// does not return Ok until the record that describes its
292/// mutation is fsynced. The only change is N callers sharing
293/// one fsync instead of paying one each.
294///
295/// Lock order (deadlock-free): `state` then `file`; never the
296/// reverse. The leader holds `file` WITHOUT `state` during IO so
297/// enqueues continue while fsync runs.
298#[derive(Debug)]
299struct WalGroup {
300    state: Mutex<WalGroupState>,
301    cond: std::sync::Condvar,
302    /// Active chunk file handle. Separate lock from `state` so
303    /// the leader's write+fsync doesn't block concurrent
304    /// enqueues. Swapped by `checkpoint()` at rotation.
305    file: Mutex<File>,
306}
307
/// Mutable bookkeeping of a [`WalGroup`], guarded by its `state`
/// mutex. Every field is read or written only while that lock is held.
#[derive(Debug)]
struct WalGroupState {
    /// Encoded records awaiting flush. The flushing caller swaps this
    /// out wholesale, so it only ever holds not-yet-taken records.
    buf: Vec<u8>,
    /// Monotonic enqueue counter (1-based): the ticket handed to the
    /// most recent `enqueue` caller. Starts at 0 (nothing enqueued).
    enqueued_seq: u64,
    /// Highest seq whose record is fsynced. A waiter is released once
    /// this reaches its ticket.
    flushed_seq: u64,
    /// True while some caller is inside the leader IO section.
    leader_active: bool,
    /// Sticky fatal error — a failed fsync poisons the WAL
    /// (loud, never silent). All current + future waiters error.
    /// Holds the stringified IO error of the failed flush.
    failed: Option<String>,
    /// Bytes written to the active chunk since rotation —
    /// drives the auto-checkpoint trigger. Reset to 0 on rotation.
    written_len: u64,
}
325
/// Ticket returned by the buffered write path; `wait()` blocks
/// until the record it covers is durable (or the WAL is
/// poisoned). Cheap to move across threads.
///
/// The ticket pairs the shared group with the sequence number it has
/// to see flushed.
#[derive(Debug)]
pub struct WalTicket {
    /// Group-commit state the record was enqueued on.
    group: Arc<WalGroup>,
    /// Sequence number passed to `WalGroup::wait_flushed` by `wait()`.
    seq: u64,
}
334
335/// v7.34 (crash-recovery P0 #2) — RAII reset for the WalGroup leader
336/// flag. Electing a leader sets `leader_active = true` and releases the
337/// state lock for the sleep+IO window; if a panic unwinds through that
338/// window the flag would stay true and every follower would park forever
339/// on the condvar — no one left to flush or wake them, the same
340/// total-write hang an unclean stop causes, but self-inflicted. This
341/// guard clears the flag and wakes the followers (so one re-elects) on
342/// ANY drop, including a panic unwind; the normal path disarms it after
343/// resetting the flag itself.
344struct LeaderGuard<'a> {
345    group: &'a WalGroup,
346    armed: bool,
347}
348
349impl Drop for LeaderGuard<'_> {
350    fn drop(&mut self) {
351        if self.armed {
352            let mut g = self.group.state.lock().unwrap_or_else(|e| e.into_inner());
353            g.leader_active = false;
354            drop(g);
355            self.group.cond.notify_all();
356        }
357    }
358}
359
360impl WalGroup {
361    fn new(file: File, initial_len: u64) -> Self {
362        Self {
363            state: Mutex::new(WalGroupState {
364                buf: Vec::new(),
365                enqueued_seq: 0,
366                flushed_seq: 0,
367                leader_active: false,
368                failed: None,
369                written_len: initial_len,
370            }),
371            cond: std::sync::Condvar::new(),
372            file: Mutex::new(file),
373        }
374    }
375
376    /// Append `record` to the pending batch. Returns the seq the
377    /// caller must wait on. Called under the engine write lock —
378    /// keep it O(memcpy).
379    fn enqueue(&self, record: &[u8]) -> u64 {
380        let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
381        g.buf.extend_from_slice(record);
382        g.enqueued_seq += 1;
383        g.enqueued_seq
384    }
385
386    /// Block until `seq` is durable. Leader-follower: the first
387    /// arriving waiter flushes for everyone.
388    fn wait_flushed(&self, seq: u64) -> Result<(), EngineError> {
389        let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
390        loop {
391            if let Some(e) = &g.failed {
392                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
393                    format!("WAL poisoned by earlier flush failure: {e}"),
394                )));
395            }
396            if g.flushed_seq >= seq {
397                return Ok(());
398            }
399            if !g.leader_active {
400                // Elect self leader.
401                g.leader_active = true;
402                drop(g);
403                // v7.34 — panic-safety: if anything below unwinds before
404                // `leader_active` is reset, this guard releases it +
405                // wakes a follower to re-elect (else all writers park
406                // forever). Disarmed on the normal path after the reset.
407                let mut leader_guard = LeaderGuard {
408                    group: self,
409                    armed: true,
410                };
411                // v7.20 — commit_delay (PG's same-named knob):
412                // before taking the batch, give in-flight
413                // writers a short window to enqueue so the
414                // shared fsync covers more commits. 150 µs costs
415                // ~3.5% on a solo 4.2 ms fsync but multiplies
416                // batch size under load. Tunable via
417                // SPG_COMMIT_DELAY_US (0 disables).
418                let delay = commit_delay_us();
419                if delay > 0 {
420                    std::thread::sleep(std::time::Duration::from_micros(delay));
421                }
422                let (batch, flush_to) = {
423                    let mut g2 = self.state.lock().unwrap_or_else(|e| e.into_inner());
424                    (core::mem::take(&mut g2.buf), g2.enqueued_seq)
425                };
426                let io_result: std::io::Result<()> = (|| {
427                    let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
428                    f.write_all(&batch)?;
429                    f.sync_data()
430                })();
431                g = self.state.lock().unwrap_or_else(|e| e.into_inner());
432                g.leader_active = false;
433                leader_guard.armed = false; // normal completion — disarm
434                match io_result {
435                    Ok(()) => {
436                        g.flushed_seq = flush_to;
437                        g.written_len = g.written_len.saturating_add(batch.len() as u64);
438                    }
439                    Err(e) => {
440                        g.failed = Some(e.to_string());
441                    }
442                }
443                self.cond.notify_all();
444                //
445
446                // Loop continues: either our seq is now covered
447                // (leader path normally returns next iteration)
448                // or the error branch surfaces.
449                continue;
450            }
451            g = self.cond.wait(g).unwrap_or_else(|e| e.into_inner());
452        }
453    }
454
455    /// Drain the pending batch + flush synchronously. Caller must
456    /// guarantee no concurrent enqueues (checkpoint holds the
457    /// engine exclusively). Used before rotation so the marker
458    /// lands in the right chunk.
459    fn flush_now(&self) -> Result<(), EngineError> {
460        let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
461        if let Some(e) = &g.failed {
462            return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
463                format!("WAL poisoned: {e}"),
464            )));
465        }
466        let batch = core::mem::take(&mut g.buf);
467        let flush_to = g.enqueued_seq;
468        if batch.is_empty() {
469            return Ok(());
470        }
471        drop(g);
472        let io: std::io::Result<()> = (|| {
473            let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
474            f.write_all(&batch)?;
475            f.sync_data()
476        })();
477        let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
478        match io {
479            Ok(()) => {
480                g.flushed_seq = flush_to;
481                g.written_len = g.written_len.saturating_add(batch.len() as u64);
482                self.cond.notify_all();
483                Ok(())
484            }
485            Err(e) => {
486                g.failed = Some(e.to_string());
487                self.cond.notify_all();
488                Err(io_err(e))
489            }
490        }
491    }
492
493    /// Swap the active chunk handle (rotation). Caller flushes
494    /// first; both locks taken in canonical order.
495    fn rotate_file(&self, new_file: File) {
496        let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
497        let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
498        *f = new_file;
499        g.written_len = 0;
500    }
501
502    fn written_len(&self) -> u64 {
503        let g = self.state.lock().unwrap_or_else(|e| e.into_inner());
504        g.written_len + g.buf.len() as u64
505    }
506}
507
508// ─────────────────────────────────────────────────────────────────────────────
509// CoW-2 (v7.34) — background-checkpoint worker.
510//
511// Splits checkpoint into two halves so the front-end pays only the cheap one:
512//   • Capture (`Database::snapshot_checkpoint_job`) — under &mut self,
513//     Arc-bump the catalog + cheap trailer/cold-segment clones + atomic
514//     commit_lsn load. Front returns to caller in microseconds.
515//   • Execute (`execute_checkpoint_job`, on the worker thread) — serialize
516//     the snapshot, tmp+rename the db / manifest files (each fsynced via
517//     the rename + dir-fsync), enqueue the v4 marker through the WalGroup
518//     (which is already thread-safe so live commits interleave fine),
519//     then rotate the chunk file.
520//
521// Replay floor is the marker LSN captured at front-end time. A crash any
522// time during the worker's sequence is safe: nothing past the previous
523// checkpoint's marker can have been forgotten until the new marker hits
524// the WAL, and live writes between the two go into the same chunk under
525// the old marker — replay re-applies them after restoring the (older)
526// snapshot. snapshot+manifest atomicity (D10) is unchanged from the sync
527// path — CoW-4 tightens it later.
528//
529// Single-instance: a state machine of {pending, inflight} so a new
530// trigger fires only when the worker is fully idle. Any sticky error
531// surfaces on the next `wait()`.
532
/// Everything the checkpoint worker needs to run one checkpoint off
/// the front-end thread. Consumed by `execute_checkpoint_job`.
#[derive(Debug)]
struct CheckpointJob {
    /// Captured engine state; the worker serializes it into the
    /// snapshot file.
    snapshot: spg_engine::EngineSnapshot,
    /// LSN written into the checkpoint marker; `marker_lsn + 1` names
    /// the chunk opened at rotation.
    marker_lsn: u64,
    /// Destination of the snapshot file (staged as `<name>.tmp`, then
    /// renamed over it). Also encoded into the marker and used to
    /// derive the manifest path.
    db_path: PathBuf,
    /// Directory the fresh WAL chunk is created in.
    wal_dir: PathBuf,
    /// Group-commit handle the marker is enqueued on and whose active
    /// file is swapped at rotation.
    wal: Arc<WalGroup>,
    /// Snapshot-time view of the cold-tier segment set. Carried into the
    /// worker so any concurrent `freeze_oldest_to_cold` after the trigger
    /// rides the *next* checkpoint's manifest — same staleness window
    /// the sync path already had.
    cold_segments: Vec<(u32, PathBuf)>,
    /// Shared with `PersistenceCtx` so the worker's chunk rotation is
    /// visible to subsequent diag / Drop introspection.
    current_chunk_path: Arc<Mutex<PathBuf>>,
}
549
/// State shared between the `CheckpointWorker` handle and its worker
/// thread, guarded by the mutex it is paired with. `Default` is the
/// idle state: nothing pending, not in flight, no error, no shutdown.
#[derive(Debug, Default)]
struct CheckpointState {
    /// Set by the front when it has a job ready; cleared when the worker
    /// picks it up.
    pending: Option<CheckpointJob>,
    /// True while the worker is mid-execute. `pending.is_some() || inflight`
    /// defines "busy" for the trigger / wait predicate.
    inflight: bool,
    /// Sticky error from the worker's last failure. Cleared when surfaced
    /// to a `wait()` caller (`try_enqueue` also takes and returns it).
    last_error: Option<EngineError>,
    /// Drop signal — worker exits after the current job (or immediately if
    /// idle and no pending). A job still pending at shutdown is run first.
    shutdown: bool,
}
565
566#[derive(Debug)]
567struct CheckpointWorker {
568    state: Arc<(Mutex<CheckpointState>, Condvar)>,
569    handle: Option<JoinHandle<()>>,
570}
571
572impl CheckpointWorker {
573    fn spawn() -> Self {
574        let state: Arc<(Mutex<CheckpointState>, Condvar)> =
575            Arc::new((Mutex::new(CheckpointState::default()), Condvar::new()));
576        let state_for_thread = Arc::clone(&state);
577        let handle = thread::Builder::new()
578            .name("spg-checkpoint".into())
579            .spawn(move || checkpoint_worker_loop(&state_for_thread))
580            .expect("spawn checkpoint worker");
581        Self {
582            state,
583            handle: Some(handle),
584        }
585    }
586
587    /// Try to enqueue a job. Returns `Ok(true)` if the worker accepted it,
588    /// `Ok(false)` if a job was already pending or in flight (skip — the
589    /// next trigger will pick up newer state). Surfaces any sticky error
590    /// from a previous run before considering the new job, so async paths
591    /// can't lose a failure indefinitely.
592    fn try_enqueue(&self, job: CheckpointJob) -> Result<bool, EngineError> {
593        let (lock, cond) = &*self.state;
594        let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
595        if let Some(e) = g.last_error.take() {
596            return Err(e);
597        }
598        if g.pending.is_some() || g.inflight {
599            return Ok(false);
600        }
601        g.pending = Some(job);
602        cond.notify_one();
603        Ok(true)
604    }
605
606    /// Block until the worker is idle (no pending, not in flight). Returns
607    /// any sticky error from the last run; clears it on the way out.
608    fn wait(&self) -> Result<(), EngineError> {
609        let (lock, cond) = &*self.state;
610        let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
611        while g.pending.is_some() || g.inflight {
612            g = cond.wait(g).unwrap_or_else(|e| e.into_inner());
613        }
614        match g.last_error.take() {
615            Some(e) => Err(e),
616            None => Ok(()),
617        }
618    }
619}
620
621impl Drop for CheckpointWorker {
622    fn drop(&mut self) {
623        {
624            let (lock, cond) = &*self.state;
625            let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
626            g.shutdown = true;
627            cond.notify_one();
628        }
629        if let Some(h) = self.handle.take() {
630            let _ = h.join();
631        }
632    }
633}
634
635fn checkpoint_worker_loop(state: &Arc<(Mutex<CheckpointState>, Condvar)>) {
636    let (lock, cond) = &**state;
637    loop {
638        let job = {
639            let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
640            while g.pending.is_none() && !g.shutdown {
641                g = cond.wait(g).unwrap_or_else(|e| e.into_inner());
642            }
643            if g.pending.is_none() {
644                // shutdown with no pending → exit cleanly.
645                return;
646            }
647            // Even on shutdown, drain the pending job first so the Drop-time
648            // final checkpoint is durable before exit.
649            let job = g.pending.take().expect("loop invariant");
650            g.inflight = true;
651            job
652        };
653        let result = execute_checkpoint_job(job);
654        {
655            let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
656            g.inflight = false;
657            if let Err(e) = result {
658                g.last_error = Some(e);
659            }
660            cond.notify_all();
661        }
662    }
663}
664
665fn execute_checkpoint_job(job: CheckpointJob) -> Result<(), EngineError> {
666    // 1. Serialize the captured snapshot. Heavy; this is the whole point
667    //    of CoW — it runs off the engine borrow.
668    let snapshot = job.snapshot.serialize();
669    // 2. Snapshot tmp+rename. Atomic on POSIX; rename implicitly fsyncs
670    //    the data the next directory walk sees.
671    let tmp = {
672        let mut t = job.db_path.clone();
673        let mut name = t
674            .file_name()
675            .map(std::ffi::OsStr::to_os_string)
676            .unwrap_or_default();
677        name.push(".tmp");
678        t.set_file_name(name);
679        t
680    };
681    std::fs::write(&tmp, &snapshot).map_err(io_err)?;
682    std::fs::rename(&tmp, &job.db_path).map_err(io_err)?;
683    // 3. Manifest tmp+rename (cold tier present).
684    if !job.cold_segments.is_empty() {
685        let snap_crc = spg_crypto::crc32::crc32(&snapshot);
686        let entries: Vec<ColdSegmentEntry> = job
687            .cold_segments
688            .iter()
689            .filter_map(|(segment_id, path)| {
690                let bytes = std::fs::read(path).ok()?;
691                Some(ColdSegmentEntry {
692                    segment_id: *segment_id,
693                    path: path.clone(),
694                    crc32: spg_crypto::crc32::crc32(&bytes),
695                })
696            })
697            .collect();
698        let manifest = CatalogManifest {
699            catalog_crc32: snap_crc,
700            cold_segments: entries,
701            wal_baseline_offset: 0,
702        };
703        let m_bytes = manifest.serialize();
704        let m_path = spg_manifest_path(&job.db_path);
705        if let Some(dir) = m_path.parent() {
706            std::fs::create_dir_all(dir).map_err(io_err)?;
707        }
708        let m_tmp = {
709            let mut t = m_path.clone();
710            let mut name = t
711                .file_name()
712                .map(std::ffi::OsStr::to_os_string)
713                .unwrap_or_default();
714            name.push(".tmp");
715            t.set_file_name(name);
716            t
717        };
718        std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
719        std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
720    }
721    // 4. Enqueue the v4 checkpoint marker carrying the captured LSN. The
722    //    WalGroup is thread-safe so a live commit can interleave — the
723    //    marker's LSN, not its position in the chunk, anchors replay.
724    let marker_ts = wall_clock_micros();
725    let marker = encode_v4_checkpoint_marker(job.marker_lsn, marker_ts, &job.db_path);
726    job.wal.enqueue(&marker);
727    job.wal.flush_now()?;
728    // 5. Rotate the active chunk. New commits land in the fresh chunk;
729    //    pre-marker history stays addressable in the old chunk for PITR /
730    //    retention. The shared `current_chunk_path` is updated under its
731    //    own lock before the WalGroup swap so diag readers never see a
732    //    handle that no longer matches the recorded path.
733    let new_chunk_path = job
734        .wal_dir
735        .join(chunk_filename(marker_ts, job.marker_lsn + 1));
736    let new_handle = OpenOptions::new()
737        .create(true)
738        .append(true)
739        .read(true)
740        .open(&new_chunk_path)
741        .map_err(io_err)?;
742    fsync_dir(&job.wal_dir);
743    {
744        let mut p = job
745            .current_chunk_path
746            .lock()
747            .unwrap_or_else(|e| e.into_inner());
748        *p = new_chunk_path;
749    }
750    job.wal.rotate_file(new_handle);
751    Ok(())
752}
753
754impl WalTicket {
755    /// Block until the record this ticket covers is durable.
756    ///
757    /// Under `SPG_SYNCHRONOUS_COMMIT=off` this returns
758    /// immediately — the background flusher (or the next
759    /// checkpoint / clean shutdown) makes the record durable
760    /// within `SPG_WAL_WRITER_DELAY_MS`. Same contract as PG's
761    /// `synchronous_commit = off`.
762    ///
763    /// # Errors
764    /// Surfaces the leader's IO error if the batch flush failed
765    /// (the WAL is then poisoned for all subsequent writes).
766    pub fn wait(&self) -> Result<(), EngineError> {
767        if !synchronous_commit_on() {
768            return Ok(());
769        }
770        self.group.wait_flushed(self.seq)
771    }
772}
773
774/// v7.19 P3 — retention sweep loop. Runs in a dedicated thread
775/// spawned by `Database::open_path` when `SPG_PITR_RETENTION_HOURS`
776/// is set to a non-zero value. Wakes every
777/// `SPG_PITR_RETENTION_CHECK_SEC` (default 60 s), enumerates chunks
778/// under `wal_dir`, archives via `SPG_PITR_ARCHIVE_CMD` if set, and
779/// deletes anything older than `retention_hours`.
780///
781/// Loud-failure posture matches PG's `archive_command`: if the
782/// archive command returns non-zero, the chunk stays on disk and
783/// a warning prints to stderr. The retention sweep doesn't delete
784/// a chunk it failed to archive.
785fn retention_sweep_loop(
786    wal_dir: PathBuf,
787    retention_hours: u64,
788    check_interval: std::time::Duration,
789    archive_cmd: Option<String>,
790    shutdown: Arc<AtomicBool>,
791) {
792    while !shutdown.load(Ordering::SeqCst) {
793        if let Err(e) = retention_sweep_once(&wal_dir, retention_hours, archive_cmd.as_deref()) {
794            eprintln!("spg-embedded: retention sweep error: {e}");
795        }
796        // Sleep in short ticks so shutdown isn't blocked on a
797        // 60 s naptime when Drop signals.
798        let mut elapsed = std::time::Duration::ZERO;
799        let tick = std::time::Duration::from_millis(250);
800        while elapsed < check_interval {
801            if shutdown.load(Ordering::SeqCst) {
802                return;
803            }
804            std::thread::sleep(tick);
805            elapsed += tick;
806        }
807    }
808}
809
810/// v7.19 P3 — one retention sweep pass over `wal_dir`. Extracted
811/// from the loop so tests can drive it directly. Public so the
812/// e2e_pitr_retention integration test (and any future operator
813/// tooling that wants synchronous retention) can call it.
814pub fn retention_sweep_once(
815    wal_dir: &Path,
816    retention_hours: u64,
817    archive_cmd: Option<&str>,
818) -> std::io::Result<()> {
819    if !wal_dir.exists() {
820        return Ok(());
821    }
822    let now_us = wall_clock_micros();
823    let cutoff_us = (now_us as i128 - (retention_hours as i128 * 3_600 * 1_000_000)) as i64;
824    let chunks = sorted_wal_chunks(wal_dir)?;
825    for chunk in chunks {
826        // Don't sweep the most-recent chunk; it's the live one
827        // execute() is appending to. Compare against the largest
828        // filename-prefix unix_us.
829        let stem = match chunk.file_stem().and_then(|s| s.to_str()) {
830            Some(s) => s,
831            None => continue,
832        };
833        let chunk_us: i64 = stem
834            .split_once('_')
835            .and_then(|(prefix, _)| i64::from_str_radix(prefix, 16).ok())
836            .unwrap_or(0);
837        if chunk_us >= cutoff_us {
838            continue;
839        }
840        // Archive first if requested.
841        if let Some(cmd) = archive_cmd {
842            if !cmd.is_empty() {
843                let output = std::process::Command::new("sh")
844                    .arg("-c")
845                    .arg(cmd)
846                    .arg("--")
847                    .arg(&chunk)
848                    .output()?;
849                if !output.status.success() {
850                    eprintln!(
851                        "spg-embedded: SPG_PITR_ARCHIVE_CMD failed for {} (exit {}); chunk stays on disk",
852                        chunk.display(),
853                        output.status.code().unwrap_or(-1)
854                    );
855                    continue;
856                }
857            }
858        }
859        // Delete the chunk + its sibling .checksum if present.
860        if let Err(e) = std::fs::remove_file(&chunk) {
861            eprintln!(
862                "spg-embedded: retention remove {} failed: {e}",
863                chunk.display()
864            );
865            continue;
866        }
867        let mut cs = chunk.clone();
868        let mut name = cs.file_name().map(|n| n.to_os_string()).unwrap_or_default();
869        name.push(".checksum");
870        cs.set_file_name(name);
871        let _ = std::fs::remove_file(&cs);
872    }
873    Ok(())
874}
875
/// v7.20 — group-commit delay window in microseconds (PG
/// `commit_delay` analogue): how long the flush leader sleeps
/// before taking the batch so concurrent writers can pile in.
///
/// Read once from `SPG_COMMIT_DELAY_US` and cached for the life of
/// the process. Unset or unparseable values fall back to 150;
/// `SPG_COMMIT_DELAY_US=0` disables the delay.
fn commit_delay_us() -> u64 {
    static DELAY_US: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *DELAY_US.get_or_init(|| match std::env::var("SPG_COMMIT_DELAY_US") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(150),
        Err(_) => 150,
    })
}
889
/// v7.20 — PG `synchronous_commit` analogue, read once from
/// `SPG_SYNCHRONOUS_COMMIT` and cached for the process lifetime.
///
/// * `on` (the default, also used for any unrecognised value or an
///   unset / unreadable variable): `execute()` blocks until its WAL
///   record is fsynced — zero-loss durability.
/// * `off`, `0` or `false` (`off` / `false` are case-insensitive):
///   `execute()` returns after the in-memory mutation + WAL enqueue,
///   and a background flusher writes + fsyncs every
///   `SPG_WAL_WRITER_DELAY_MS` (default 200 ms — PG's
///   `wal_writer_delay` default). The crash window is up to one
///   flush interval of confirmed-but-unsynced commits, the same
///   trade PG documents. Clean shutdown (Drop / checkpoint) always
///   flushes.
fn synchronous_commit_on() -> bool {
    static SYNC_ON: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *SYNC_ON.get_or_init(|| match std::env::var("SPG_SYNCHRONOUS_COMMIT") {
        Ok(value) => {
            let disabled = value.eq_ignore_ascii_case("off")
                || value == "0"
                || value.eq_ignore_ascii_case("false");
            !disabled
        }
        // Unset (or not valid Unicode): keep the safe default.
        Err(_) => true,
    })
}
907
/// v7.20 — cadence, in milliseconds, of the background WAL flusher
/// used under `SPG_SYNCHRONOUS_COMMIT=off` (PG `wal_writer_delay`).
///
/// Read once from `SPG_WAL_WRITER_DELAY_MS` and cached. Unset,
/// unparseable or zero values fall back to 200.
fn wal_writer_delay_ms() -> u64 {
    static DELAY_MS: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *DELAY_MS.get_or_init(|| {
        let configured = std::env::var("SPG_WAL_WRITER_DELAY_MS")
            .ok()
            .and_then(|raw| raw.parse::<u64>().ok());
        match configured {
            Some(ms) if ms > 0 => ms,
            _ => 200,
        }
    })
}
920
/// PITR retention window in hours, read from
/// `SPG_PITR_RETENTION_HOURS` on every call (not cached). Unset or
/// unparseable values yield 0.
fn pitr_retention_hours() -> u64 {
    match std::env::var("SPG_PITR_RETENTION_HOURS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(0),
        Err(_) => 0,
    }
}
927
/// Interval between retention sweeps in seconds, read from
/// `SPG_PITR_RETENTION_CHECK_SEC` on every call (not cached).
/// Unset, unparseable or zero values fall back to 60.
fn pitr_retention_check_sec() -> u64 {
    let configured = std::env::var("SPG_PITR_RETENTION_CHECK_SEC")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured {
        Some(secs) if secs > 0 => secs,
        _ => 60,
    }
}
935
/// Archive command for swept WAL chunks, read from
/// `SPG_PITR_ARCHIVE_CMD` on every call. An unset or empty variable
/// yields `None`.
fn pitr_archive_cmd() -> Option<String> {
    match std::env::var("SPG_PITR_ARCHIVE_CMD") {
        Ok(cmd) if !cmd.is_empty() => Some(cmd),
        _ => None,
    }
}
941
942/// v7.19 — replay every record from `wal_bytes` whose
943/// `commit_lsn` is strictly greater than `floor_lsn`. v3 records
944/// (no LSN) and v4 records with `commit_lsn <= floor_lsn` are
945/// skipped — the snapshot loaded ahead of this call already
946/// reflects them, and re-applying would DuplicateTable /
947/// double-insert. v3 records inside the legacy migration chunk
948/// always apply because the migration sets `floor_lsn = 0` and
949/// v3 records carry no LSN to compare; the pre-migration
950/// behaviour (every record replays) is what the migration
951/// preserves.
952///
953/// Returns the count of records successfully applied. Same
954/// torn-tail semantics as `replay_wal_into_engine`.
955fn replay_wal_filtered(
956    wal_bytes: &[u8],
957    engine: &mut Engine,
958    floor_lsn: u64,
959    quarantine: &mut Vec<QuarantinedStmt>,
960) -> Result<usize, String> {
961    let records = parse_wal_records(wal_bytes)?;
962    let mut applied = 0usize;
963    for r in &records {
964        // Skip markers + non-SQL records.
965        if r.type_byte == WAL_V3_TYPE_DURABILITY_CHECKPOINT
966            || r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER
967        {
968            continue;
969        }
970        // v4 SQL records carry an LSN. Apply iff strictly above
971        // the snapshot floor.
972        if r.type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL
973            || r.type_byte == WAL_V4_TYPE_TX_COMMIT_SQL
974            || r.type_byte == WAL_V5_TYPE_ROW_REDO
975        {
976            if let Some(lsn) = r.commit_lsn {
977                if lsn <= floor_lsn {
978                    continue;
979                }
980            }
981        }
982        // v7.34 (crash-recovery P0 #2) — row-level redo record: apply the
983        // physical changes directly (O(changed rows)) instead of
984        // re-executing SQL (the O(records × rows) statement-replay that
985        // hung the mailrs P0). The payload is `encode_redo_log` bytes, not
986        // SQL, so it never enters the from_utf8 / split_statements path.
987        if r.type_byte == WAL_V5_TYPE_ROW_REDO {
988            let changes = spg_storage::decode_redo_log(r.sql)
989                .map_err(|e| format!("redo decode at offset {}: {e:?}", r.offset))?;
990            engine
991                .apply_redo(&changes)
992                .map_err(|e| format!("redo apply at offset {}: {e:?}", r.offset))?;
993            applied += 1;
994            continue;
995        }
996        // v3 records (type 0x01, no LSN) always apply — the
997        // legacy migration path is the only place they appear,
998        // and floor_lsn=0 there.
999        let sql = match std::str::from_utf8(r.sql) {
1000            Ok(s) => s,
1001            Err(e) => return Err(format!("non-UTF-8 SQL at offset {}: {e}", r.offset)),
1002        };
1003        // v7.21 — a tx-commit record carries the whole transaction
1004        // as a `";\n"`-joined script; auto-commit records are a
1005        // single statement, for which split_statements is a no-op.
1006        //
1007        // v7.30.1 (mailrs round-24 ask 2) — a statement the engine
1008        // REJECTS is quarantined, not fatal: "one statement failed
1009        // to replay" ≠ "the catalog is corrupt". Framing damage
1010        // (parse_wal_records / non-UTF-8 above) still errors — that
1011        // IS corruption. Subsequent statements of a tx script keep
1012        // applying: the bricking class is a no-op-at-runtime
1013        // statement that re-applies non-idempotently, and skipping
1014        // just it reconstructs the runtime state.
1015        for stmt in split_statements(sql) {
1016            if let Err(e) = engine.execute(stmt) {
1017                quarantine.push(QuarantinedStmt {
1018                    offset: r.offset,
1019                    sql: stmt.to_string(),
1020                    error: format!("{e:?}"),
1021                });
1022            }
1023        }
1024        applied += 1;
1025    }
1026    Ok(applied)
1027}
1028
/// v7.30.1 (mailrs round-24 ask 2) — one statement that failed to
/// re-apply during boot replay. Kept for forensics in a
/// `quarantine-*.log` beside the WAL chunks; the boot continues.
struct QuarantinedStmt {
    // Byte offset of the WAL record the statement came from (every
    // statement of a tx script shares its record's offset).
    offset: usize,
    // Text of the single statement the engine rejected.
    sql: String,
    // `{:?}` rendering of the engine error returned for `sql`.
    error: String,
}
1037
1038fn format_quarantine_line(q: &QuarantinedStmt) -> String {
1039    format!("offset {}: {}\n  rejected: {}\n", q.offset, q.sql, q.error)
1040}
1041
/// v7.34 (crash-recovery P0 #2) — fsync a directory so a newly created
/// file's entry is durable. `sync_data` on a chunk file persists its
/// bytes but NOT the parent directory entry that names it; a power loss
/// after creating a fresh WAL chunk could lose that entry and make the
/// chunk (and the committed records in it) unreachable on restart.
/// Best-effort — a platform that rejects directory fsync is no worse off.
fn fsync_dir(dir: &Path) {
    if let Ok(f) = File::open(dir) {
        // Errors are deliberately ignored: this is a durability
        // improvement, not a correctness gate.
        let _ = f.sync_all();
    }
}

/// v7.19 — WAL chunk filename format. Zero-padded 16-digit
/// hex on both parts so default lexicographic sort matches
/// numeric order, with the unix_us prefix coming first so
/// the on-disk listing is chronological too.
///
/// The result has the shape `<unix_us:016x>_<leading_lsn:016x>.wal`.
fn chunk_filename(unix_us: i64, leading_lsn: u64) -> String {
    // Negative timestamps shouldn't happen in practice (we sit
    // post-1970), but clamp to 0 so the zero-padded
    // representation stays sortable.
    let us = unix_us.max(0) as u64;
    format!("{us:016x}_{leading_lsn:016x}.wal")
}
1065
1066/// v7.19 — filename used for the legacy single-file WAL when
1067/// `open_path` migrates a v7.18-layout database into the new
1068/// chunk directory. Lexicographically smallest possible value
1069/// so subsequent chunks sort after it.
1070fn legacy_chunk_filename() -> String {
1071    chunk_filename(0, 0)
1072}
1073
1074/// CoW-4 (v7.34) — D10 fallback: read one cold-segment file and
1075/// hand its bytes to the catalog. The segment binary is self-validating
1076/// (magic + internal CRC32 via `OwnedSegment::from_bytes`), so we don't
1077/// need the manifest's `segment_crc32` to trust it. Returns `true` on a
1078/// successful attach (caller bumps `cold_segment_paths`), `false` on a
1079/// per-segment failure that is logged but doesn't abort boot.
1080fn attach_segment_from_disk(engine: &mut Engine, segment_id: u32, path: &Path) -> bool {
1081    if engine.catalog().cold_segment(segment_id).is_some() {
1082        return true;
1083    }
1084    let bytes = match std::fs::read(path) {
1085        Ok(b) => b,
1086        Err(e) => {
1087            eprintln!(
1088                "spg-embedded: cold-segment scan skip {}: read failed: {e}",
1089                path.display()
1090            );
1091            return false;
1092        }
1093    };
1094    let mut new_cat = engine.catalog().clone();
1095    if let Err(e) = new_cat.load_segment_bytes_at(segment_id, bytes) {
1096        eprintln!(
1097            "spg-embedded: cold-segment scan skip {}: parse/load failed: {e}",
1098            path.display()
1099        );
1100        return false;
1101    }
1102    engine.replace_catalog(new_cat);
1103    true
1104}
1105
1106/// CoW-4 (v7.34) — D10 + missing-manifest fallback: scan
1107/// `<db>.spg/segments/` for `seg_<id>.spg` files and attach any that
1108/// aren't already in `cold_segment_paths`. Closes the window where a
1109/// crash between snapshot rename and manifest rename leaves
1110/// post-checkpoint cold segments orphaned on disk (the snapshot's CRC
1111/// no longer matches the stale manifest, so the manifest path
1112/// silently dropped them). The segment parser self-verifies, so a
1113/// torn write surfaces as a per-segment skip, never silent corruption.
1114fn scan_cold_segments_dir(
1115    segments_dir: &Path,
1116    engine: &mut Engine,
1117    cold_segment_paths: &mut BTreeMap<u32, PathBuf>,
1118) {
1119    // v7.34.1 (mailrs prod report bug A): single-file catalogs (e.g.
1120    // `/data/spg/mailrs.spg` is a regular file, not the `<db>/<db>.spg`
1121    // layout this scan assumes) make the computed `<db>.spg/segments`
1122    // path traverse a file inode, which surfaces as ENOTDIR (`Not a
1123    // directory`, errno 20). Treat any non-directory state — absent,
1124    // file-in-the-way, stat-blocked — as "no segments to scan" and
1125    // silently return. The eprintln below only fires for the genuine
1126    // mid-walk read errors (permission flip, IO failure) that operators
1127    // need to see.
1128    if !segments_dir.is_dir() {
1129        return;
1130    }
1131    let read_dir = match std::fs::read_dir(segments_dir) {
1132        Ok(rd) => rd,
1133        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return,
1134        Err(e) => {
1135            eprintln!(
1136                "spg-embedded: cold-segment scan: cannot read {}: {e}",
1137                segments_dir.display()
1138            );
1139            return;
1140        }
1141    };
1142    for entry in read_dir.flatten() {
1143        let path = entry.path();
1144        // Only the canonical `seg_<id>.spg` form. `.tmp` half-renames
1145        // and unknown extensions are skipped — the segment writer's
1146        // tmp+rename pattern guarantees `.spg` files are either fully
1147        // written or absent.
1148        if path.extension().and_then(|s| s.to_str()) != Some("spg") {
1149            continue;
1150        }
1151        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1152            continue;
1153        };
1154        let Some(id_str) = stem.strip_prefix("seg_") else {
1155            continue;
1156        };
1157        let Ok(segment_id) = id_str.parse::<u32>() else {
1158            continue;
1159        };
1160        if cold_segment_paths.contains_key(&segment_id) {
1161            continue;
1162        }
1163        if attach_segment_from_disk(engine, segment_id, &path) {
1164            cold_segment_paths.insert(segment_id, path);
1165        }
1166    }
1167}
1168
/// v7.19 — collect every entry of `wal_dir` whose extension is
/// exactly `wal`, sorted lexicographically (which doubles as
/// chunk-creation order thanks to the zero-padded filename format).
///
/// A missing directory yields an empty list.
///
/// # Errors
/// Any other `read_dir` failure, or the first per-entry read error.
fn sorted_wal_chunks(wal_dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let listing = match std::fs::read_dir(wal_dir) {
        Ok(listing) => listing,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e),
    };
    let mut chunks = listing
        .filter_map(|entry| match entry {
            Ok(entry) => {
                let path = entry.path();
                let is_wal = path.extension().and_then(|ext| ext.to_str()) == Some("wal");
                if is_wal {
                    Some(Ok(path))
                } else {
                    None
                }
            }
            // Keep the error so `collect` stops on it.
            Err(e) => Some(Err(e)),
        })
        .collect::<std::io::Result<Vec<PathBuf>>>()?;
    chunks.sort();
    Ok(chunks)
}
1189
1190/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
1191///
1192/// ```text
1193/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
1194/// [u32 LE crc32 over (type_byte || payload)]
1195/// [u8  type = 0x11]
1196/// payload:
1197///   [u64 LE checkpoint_lsn]
1198///   [i64 LE checkpoint_unix_us  (WAL_V4_NO_CLOCK if no clock)]
1199///   [u16 LE snapshot_path_len]
1200///   [snapshot_path_bytes]
1201/// ```
1202///
1203/// `payload_len` covers only the payload — keeping the framing
1204/// uniform across v3 / v4 record types so torn-write detection in
1205/// `replay_wal_into_engine` stays trivial.
1206fn encode_v4_checkpoint_marker(
1207    checkpoint_lsn: u64,
1208    checkpoint_unix_us: i64,
1209    snapshot_path: &Path,
1210) -> Vec<u8> {
1211    let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
1212    let snap_payload = snapshot_bytes.as_bytes();
1213    let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
1214    let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
1215    payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
1216    payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
1217    payload.extend_from_slice(&snap_len_u16.to_le_bytes());
1218    payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
1219    let mut crc_buf = Vec::with_capacity(1 + payload.len());
1220    crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
1221    crc_buf.extend_from_slice(&payload);
1222    let crc = spg_crypto::crc32::crc32(&crc_buf);
1223    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
1224    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
1225    out.extend_from_slice(&header);
1226    out.extend_from_slice(&crc.to_le_bytes());
1227    out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
1228    out.extend_from_slice(&payload);
1229    out
1230}
1231
1232/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
1233///
1234/// ```text
1235/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
1236/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
1237/// [u8  type = 0x10]
1238/// [u64 LE commit_lsn]
1239/// [i64 LE commit_unix_us  (= WAL_V4_NO_CLOCK when no ClockFn)]
1240/// [sql bytes]
1241/// ```
1242///
1243/// `sql_len` field stays the SQL byte count — same shape as v3 — so
1244/// replay-buffer torn-write detection compares against
1245/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
1246/// readable by the same loop with their original 9-byte header
1247/// arithmetic.
1248fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1249    encode_v4_framed(
1250        WAL_V4_TYPE_AUTO_COMMIT_SQL,
1251        sql.as_bytes(),
1252        commit_lsn,
1253        commit_unix_us,
1254    )
1255}
1256
1257/// v7.21 — same envelope, `WAL_V4_TYPE_TX_COMMIT_SQL` type byte.
1258/// `script` = the transaction's statements joined with `";\n"`.
1259fn encode_v4_tx_commit(script: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1260    encode_v4_framed(
1261        WAL_V4_TYPE_TX_COMMIT_SQL,
1262        script.as_bytes(),
1263        commit_lsn,
1264        commit_unix_us,
1265    )
1266}
1267
1268/// v7.34 (crash-recovery P0 #2) — encode one row-level redo record. Same
1269/// v4 envelope + CRC, type byte 0x13; the payload is the
1270/// `encode_redo_log` bytes (physical changes) instead of SQL text, so
1271/// replay applies them in place of re-executing the statement.
1272fn encode_v5_row_redo(redo_bytes: &[u8], commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1273    encode_v4_framed(WAL_V5_TYPE_ROW_REDO, redo_bytes, commit_lsn, commit_unix_us)
1274}
1275
1276fn encode_v4_framed(
1277    type_byte: u8,
1278    payload: &[u8],
1279    commit_lsn: u64,
1280    commit_unix_us: i64,
1281) -> Vec<u8> {
1282    let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
1283    crc_buf.push(type_byte);
1284    crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
1285    crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
1286    crc_buf.extend_from_slice(payload);
1287    let crc = spg_crypto::crc32::crc32(&crc_buf);
1288    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
1289    let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
1290    out.extend_from_slice(&header);
1291    out.extend_from_slice(&crc.to_le_bytes());
1292    out.push(type_byte);
1293    out.extend_from_slice(&commit_lsn.to_le_bytes());
1294    out.extend_from_slice(&commit_unix_us.to_le_bytes());
1295    out.extend_from_slice(payload);
1296    out
1297}
1298
/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
/// Returns the count of records successfully applied. A truncated
/// trailing record (mid-write torn) is dropped silently — the
/// same recovery story `spg-server`'s boot path uses.
///
/// Framing is detected per record from the flag bits of the leading
/// length word: no flags = v1 (4-byte header), `WAL_V2_SENTINEL` = v2
/// (8-byte header), sentinel + `WAL_V3_FLAG` = v3/v4 (9-byte header
/// whose last byte is the record type).
///
/// Unlike `replay_wal_filtered`, this function does no LSN filtering,
/// and the first rejected statement aborts the replay.
///
/// NOTE(review): there is no arm for `WAL_V5_TYPE_ROW_REDO`, so a
/// row-redo record would hit the unknown-type error — confirm this
/// function is never fed chunks containing v5 records.
///
/// # Errors
/// Returns `Err` for a non-UTF-8 SQL payload, a statement the engine
/// rejects, or an unrecognised type byte inside a v3 envelope.
fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
    let mut applied = 0usize;
    // Byte offset of the record currently being decoded.
    let mut cur = 0usize;
    while cur < wal_bytes.len() {
        if wal_bytes.len() - cur < 4 {
            // Trailing partial header — torn write, drop and stop.
            break;
        }
        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
        // Strip whichever flag bits this framing version uses to
        // recover the payload length.
        let len_mask = if is_v3 {
            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
        } else {
            !WAL_V2_SENTINEL
        };
        let rec_len = (raw_len & len_mask) as usize;
        // Header size by framing version: v3 = 9, v2 = 8, v1 = 4.
        let header_len = if is_v3 {
            9
        } else if is_v2 {
            8
        } else {
            4
        };
        if wal_bytes.len() - cur < header_len + rec_len {
            // Torn record at the tail — drop, stop.
            break;
        }
        if is_v3 {
            // The type byte is the last byte of the 9-byte header; the
            // length check above guarantees it is in bounds.
            let type_byte = wal_bytes[cur + 8];
            match type_byte {
                // v3 auto-commit SQL: fall through to the shared
                // single-statement path below the match.
                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
                WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
                    // durability_checkpoint marker — skip, no SQL.
                    cur += header_len + rec_len;
                    continue;
                }
                WAL_V4_TYPE_CHECKPOINT_MARKER => {
                    // v7.18 PITR — checkpoint anchor, skip on replay
                    // (engine state past this point reflects the
                    // matching snapshot already loaded by the caller).
                    cur += header_len + rec_len;
                    continue;
                }
                WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL => {
                    // v7.18 PITR — v4 record carries 16 bytes of
                    // (commit_lsn, commit_unix_us) between the type
                    // byte and the SQL payload. Replay reads them but
                    // does not enforce them — the engine doesn't
                    // surface LSN/clock here. Restore tooling
                    // (spgctl) parses them via parse_wal_record below.
                    //
                    // v7.21 — tx-commit records (0x12) carry a whole
                    // transaction as a `";\n"`-joined script;
                    // split_statements is a no-op on the single-
                    // statement auto-commit form.
                    let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
                    if wal_bytes.len() - cur < v4_total {
                        // Torn v4 record at the tail — drop, stop.
                        break;
                    }
                    let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
                    let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
                    let sql = std::str::from_utf8(sql_bytes)
                        .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
                    for stmt in split_statements(sql) {
                        engine.execute(stmt).map_err(|e| {
                            format!("WAL replay: apply {stmt:?} at offset {cur} rejected: {e:?}")
                        })?;
                    }
                    // Counted once per record, not once per statement.
                    applied += 1;
                    cur += v4_total;
                    continue;
                }
                other => {
                    return Err(format!(
                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
                    ));
                }
            }
        }
        // v1 / v2 / v3 auto-commit: the payload is one SQL statement
        // that starts right after the header.
        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
        let sql = std::str::from_utf8(sql_bytes)
            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
        engine
            .execute(sql)
            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
        applied += 1;
        cur += header_len + rec_len;
    }
    Ok(applied)
}
1395
/// v7.18 PITR — parsed WAL record, surfaced for restore / verify
/// tooling. The replay loop above doesn't expose LSN/timestamp;
/// `spgctl restore --to <timestamp>` and `spgctl verify` need them.
/// Returned offsets are byte-positions inside the WAL buffer.
#[derive(Debug, Clone)]
pub struct WalRecord<'a> {
    /// Byte offset in the WAL buffer where this record starts.
    pub offset: usize,
    /// Type byte: 0x01 = v3 auto-commit, 0x02 = durability
    /// checkpoint marker, 0x10 = v4 auto-commit, 0x11 = v4
    /// checkpoint marker, 0x12 = v4 tx-commit script, 0x13 = v5
    /// row-level redo. Legacy v1 / v2 records carry no type byte and
    /// are reported as the v3 auto-commit type.
    pub type_byte: u8,
    /// `Some(lsn)` for v4 records, `None` for v1 / v2 / v3 records.
    pub commit_lsn: Option<u64>,
    /// `Some(unix_us)` for v4 records carrying a clock-set timestamp,
    /// `None` for v3 or for v4 records explicitly written with
    /// `WAL_V4_NO_CLOCK` (sentinel for "no ClockFn at commit time").
    pub commit_unix_us: Option<i64>,
    /// Record payload as borrowed bytes: SQL text for SQL records,
    /// empty for durability markers. Despite the name it is not
    /// always SQL — row-redo records carry `encode_redo_log` bytes
    /// here, and per the parser's notes a v4 checkpoint marker
    /// carries its snapshot path here.
    pub sql: &'a [u8],
}
1416
1417/// v7.18 PITR — iterate over `wal_bytes` yielding one `WalRecord`
1418/// per intact record. Torn-tail records terminate iteration
1419/// silently (same recovery story as `replay_wal_into_engine`).
1420/// Unknown type bytes inside a v3 envelope return `Err` so the
1421/// caller knows the WAL was written by a newer SPG.
1422pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
1423    let mut out = Vec::new();
1424    let mut cur = 0usize;
1425    while cur < wal_bytes.len() {
1426        if wal_bytes.len() - cur < 4 {
1427            break;
1428        }
1429        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
1430        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
1431        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
1432        let len_mask = if is_v3 {
1433            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
1434        } else {
1435            !WAL_V2_SENTINEL
1436        };
1437        let rec_len = (raw_len & len_mask) as usize;
1438        let header_len = if is_v3 {
1439            9
1440        } else if is_v2 {
1441            8
1442        } else {
1443            4
1444        };
1445        if wal_bytes.len() - cur < header_len + rec_len {
1446            break;
1447        }
1448        if !is_v3 {
1449            // v1 / v2 records carry no type byte; treat as legacy
1450            // auto-commit SQL with no LSN/time.
1451            let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1452            out.push(WalRecord {
1453                offset: cur,
1454                type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
1455                commit_lsn: None,
1456                commit_unix_us: None,
1457                sql,
1458            });
1459            cur += header_len + rec_len;
1460            continue;
1461        }
1462        let type_byte = wal_bytes[cur + 8];
1463        match type_byte {
1464            WAL_V3_TYPE_AUTO_COMMIT_SQL => {
1465                let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1466                out.push(WalRecord {
1467                    offset: cur,
1468                    type_byte,
1469                    commit_lsn: None,
1470                    commit_unix_us: None,
1471                    sql,
1472                });
1473                cur += header_len + rec_len;
1474            }
1475            WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
1476                out.push(WalRecord {
1477                    offset: cur,
1478                    type_byte,
1479                    commit_lsn: None,
1480                    commit_unix_us: None,
1481                    sql: &[],
1482                });
1483                cur += header_len + rec_len;
1484            }
1485            WAL_V4_TYPE_CHECKPOINT_MARKER => {
1486                // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
1487                // We surface lsn + ts on the WalRecord; the path lives
1488                // in `sql` since the type byte already disambiguates
1489                // record meaning and adding a dedicated field would
1490                // bloat the iterator return type for every variant.
1491                if rec_len < 18 {
1492                    return Err(format!(
1493                        "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
1494                    ));
1495                }
1496                let lsn = u64::from_le_bytes(
1497                    wal_bytes[cur + header_len..cur + header_len + 8]
1498                        .try_into()
1499                        .unwrap(),
1500                );
1501                let ts_raw = i64::from_le_bytes(
1502                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
1503                        .try_into()
1504                        .unwrap(),
1505                );
1506                let path_len = u16::from_le_bytes(
1507                    wal_bytes[cur + header_len + 16..cur + header_len + 18]
1508                        .try_into()
1509                        .unwrap(),
1510                ) as usize;
1511                if rec_len < 18 + path_len {
1512                    return Err(format!(
1513                        "WAL parse: checkpoint marker at offset {cur} truncated path"
1514                    ));
1515                }
1516                let path_start = cur + header_len + 18;
1517                let path_bytes = &wal_bytes[path_start..path_start + path_len];
1518                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1519                    None
1520                } else {
1521                    Some(ts_raw)
1522                };
1523                out.push(WalRecord {
1524                    offset: cur,
1525                    type_byte,
1526                    commit_lsn: Some(lsn),
1527                    commit_unix_us,
1528                    sql: path_bytes,
1529                });
1530                cur += header_len + rec_len;
1531            }
1532            WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL | WAL_V5_TYPE_ROW_REDO => {
1533                let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
1534                if wal_bytes.len() - cur < v4_total {
1535                    break;
1536                }
1537                let lsn = u64::from_le_bytes(
1538                    wal_bytes[cur + header_len..cur + header_len + 8]
1539                        .try_into()
1540                        .unwrap(),
1541                );
1542                let ts_raw = i64::from_le_bytes(
1543                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
1544                        .try_into()
1545                        .unwrap(),
1546                );
1547                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1548                    None
1549                } else {
1550                    Some(ts_raw)
1551                };
1552                let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
1553                let sql = &wal_bytes[sql_start..sql_start + rec_len];
1554                out.push(WalRecord {
1555                    offset: cur,
1556                    type_byte,
1557                    commit_lsn: Some(lsn),
1558                    commit_unix_us,
1559                    sql,
1560                });
1561                cur += v4_total;
1562            }
1563            other => {
1564                return Err(format!(
1565                    "WAL parse: unknown type byte {other:#04x} at offset {cur}"
1566                ));
1567            }
1568        }
1569    }
1570    Ok(out)
1571}
1572
/// v7.1 — decides whether a statement can skip the WAL append.
///
/// Only the leading verb is inspected: the text up to the first
/// whitespace, `;` or `(`, compared ASCII-case-insensitively.
/// Returns `true` for SELECT / SHOW / EXPLAIN / BEGIN / COMMIT /
/// ROLLBACK / CHECKPOINT / COMPACT / WAIT / WITH. Conservative by
/// construction: any verb not on that list — including an empty
/// one — yields `false`, i.e. "write a WAL record".
fn sql_is_read_only(sql: &str) -> bool {
    const READ_ONLY_VERBS: [&str; 10] = [
        "select",
        "show",
        "explain",
        "begin",
        "commit",
        "rollback",
        "checkpoint",
        "compact",
        "wait",
        "with",
    ];
    let body = sql.trim_start();
    let verb_end = body
        .find(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .unwrap_or(body.len());
    let verb = &body[..verb_end];
    READ_ONLY_VERBS
        .iter()
        .any(|candidate| verb.eq_ignore_ascii_case(candidate))
}
1599
/// Embedded SPG database handle. Owns an `Engine` and provides
/// ergonomic wrappers around `execute` and `query`.
///
/// One type covers two shapes, distinguished by `persistence`:
///
/// - **In-memory** (`persistence == None`, the v6.10.3 shape):
///   there is no WAL and nothing on disk, so dropping the handle
///   simply drops the engine — no flush / fsync is involved.
/// - **Persistent** (`persistence == Some(_)`, v7.1+): mutating
///   statements are WAL-logged and `Drop` performs shutdown work
///   as described on the `persistence` field.
#[derive(Debug)]
pub struct Database {
    /// The in-process execution engine every statement runs against.
    engine: Engine,
    /// v7.1 — persistence sidecar. When `Some(p)`, every
    /// `execute(sql)` that mutates state appends a v4
    /// `auto_commit_sql` WAL record + fsyncs before the call
    /// returns; `Drop` writes a final catalog snapshot to
    /// `<db_path>` so the next session boots from a clean
    /// snapshot + an empty WAL. `None` = in-memory only (the
    /// v6.10.3 shape).
    persistence: Option<PersistenceCtx>,
    /// v7.18 PITR — monotonic per-database commit LSN. Increments
    /// before each successful WAL append; bootstrapped at
    /// open_path from `max(parse_wal_records → commit_lsn)` so
    /// reopen never reuses an LSN. In-memory databases start at
    /// 0 and never advance (no WAL = no LSN-meaningful records).
    commit_lsn: AtomicU64,
    /// v7.21 (round-12 polish) — explicit-transaction WAL buffer.
    /// `Some` between an engine-accepted BEGIN and its
    /// COMMIT / ROLLBACK on a persistent database. In-transaction
    /// mutations only touch the engine's shadow catalog and report
    /// `modified_catalog: false`, so the per-statement auto-commit
    /// append never fires for them; their bind-final SQL collects
    /// here instead and COMMIT flushes the lot as ONE atomic
    /// `WAL_V4_TYPE_TX_COMMIT_SQL` record (ROLLBACK just drops it).
    /// Always `None` for in-memory databases.
    tx_wal: Option<TxWalBuffer>,
}
1632
/// Explicit-transaction WAL buffer stored in `Database::tx_wal`
/// while a transaction is open on a persistent database (see that
/// field for the lifecycle). It accumulates the statements that
/// will be written as one record at COMMIT and remembers savepoint
/// marks so a partial rollback can trim the accumulated list.
#[derive(Debug, Default)]
struct TxWalBuffer {
    /// Bind-final SQL of every non-read-only statement the engine
    /// accepted inside the open transaction, in execution order.
    statements: Vec<String>,
    /// `(savepoint_name, statements.len() at SAVEPOINT time)` —
    /// `ROLLBACK TO SAVEPOINT` truncates `statements` back to the
    /// recorded mark so the WAL record matches what the engine
    /// keeps. PG name-reuse semantics (latest wins).
    savepoints: Vec<(String, usize)>,
}
1645
/// Statement-level transaction-control classification for the WAL
/// buffer. Runs AFTER the engine accepted the statement, so the
/// engine stays the single validator — this only mirrors state.
///
/// Produced by `tx_control_kind`, which looks at the leading
/// keyword(s) only; savepoint names arrive ASCII-lowercased.
enum TxControl {
    /// `BEGIN …` or `START …`.
    Begin,
    /// `COMMIT …` or `END …`.
    Commit,
    /// `ROLLBACK` not followed by `TO` (whole-transaction rollback).
    Rollback,
    /// `ROLLBACK TO [SAVEPOINT] <name>`; carries `<name>`.
    RollbackToSavepoint(String),
    /// `SAVEPOINT <name>`; carries `<name>`.
    Savepoint(String),
    /// `RELEASE …`; the savepoint name is not captured.
    ReleaseSavepoint,
}
1657
1658fn tx_control_kind(sql: &str) -> Option<TxControl> {
1659    let mut words = sql
1660        .split(|c: char| c.is_whitespace() || c == ';')
1661        .filter(|w| !w.is_empty())
1662        .map(str::to_ascii_lowercase);
1663    let head = words.next()?;
1664    match head.as_str() {
1665        "begin" | "start" => Some(TxControl::Begin),
1666        "commit" | "end" => Some(TxControl::Commit),
1667        "savepoint" => words.next().map(TxControl::Savepoint),
1668        "release" => Some(TxControl::ReleaseSavepoint),
1669        "rollback" => match words.next().as_deref() {
1670            // ROLLBACK TO [SAVEPOINT] <name>
1671            Some("to") => {
1672                let next = words.next()?;
1673                let name = if next == "savepoint" {
1674                    words.next()?
1675                } else {
1676                    next
1677                };
1678                Some(TxControl::RollbackToSavepoint(name))
1679            }
1680            _ => Some(TxControl::Rollback),
1681        },
1682        _ => None,
1683    }
1684}
1685
/// Everything a persistent `Database` needs beyond the engine:
/// on-disk paths, the shared WAL handle, and the handles used to
/// stop background threads at shutdown. Built once by
/// `Database::open_path`.
#[derive(Debug)]
#[allow(dead_code)] // `wal_dir`/`current_chunk_path` are read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Catalog snapshot file; the path handed to `open_path`.
    db_path: PathBuf,
    /// v7.19 — WAL chunk directory at `<db_path>.wal/`.
    /// Replaces the v7.18 single-file `<db_path>.wal` layout.
    /// Each chunk file inside is named
    /// `<unix_us>_<leading_lsn>.wal` (zero-padded to 16 digits
    /// so default-lex sort = LSN order).
    wal_dir: PathBuf,
    /// Path of the currently-open chunk file inside `wal_dir`.
    /// Rotated at checkpoint and whenever the chunk crosses
    /// `checkpoint_threshold_bytes`. CoW-2 (v7.34) wraps it in
    /// `Arc<Mutex<…>>` because the background-checkpoint worker
    /// performs the rotation; this struct keeps a clone so Drop /
    /// diag introspection still see the live path.
    current_chunk_path: Arc<Mutex<PathBuf>>,
    /// v7.19 P3 — retention sweeper handle. `Some` when
    /// `SPG_PITR_RETENTION_HOURS > 0` at open_path time; `None`
    /// when retention is disabled (the default; v7.18 behaviour
    /// preserved). The thread polls `wal_dir` every
    /// `SPG_PITR_RETENTION_CHECK_SEC` seconds, archives via
    /// `SPG_PITR_ARCHIVE_CMD` if set, then deletes chunks older
    /// than the retention window. Signalled to exit via
    /// `retention_shutdown` on Drop.
    retention_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-pitr-retention` thread; `Some`
    /// exactly when `retention_shutdown` is.
    retention_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 — background WAL flusher for
    /// `SPG_SYNCHRONOUS_COMMIT=off`. `None` in the default
    /// synchronous mode. Flushes the pending batch every
    /// `SPG_WAL_WRITER_DELAY_MS`; signalled + joined on Drop
    /// before the final checkpoint so clean shutdown never
    /// loses confirmed commits.
    flusher_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-wal-flusher` thread; `Some`
    /// exactly when `flusher_shutdown` is.
    flusher_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 P2 — group-commit WAL. Shared with WalTickets
    /// returned by the buffered write path so `wait()` can run
    /// after the engine write lock is released.
    wal: Arc<WalGroup>,
    /// Auto-checkpoint WAL-size ceiling in bytes. Seeded from
    /// `default_checkpoint_threshold_bytes()` at open time;
    /// `Database::set_checkpoint_threshold_bytes` overrides it
    /// (clamped to at least 1).
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — cold-tier segment directory, computed by
    /// `open_path` as `<parent>/<file_stem>.spg/segments/` (note:
    /// the file *stem* of `db_path`, not its full file name).
    /// Segments produced by `freeze_oldest_to_cold` / compaction
    /// are persisted here as `seg_<id>.spg` files; the manifest
    /// at `<db_path>.spg/manifest.v10` records every active
    /// segment + its CRC32 so the next boot can verify + reload.
    cold_segments_dir: PathBuf,
    /// Segment id → on-disk path for every cold segment currently
    /// attached. Populated at open (manifest entries, then the
    /// directory scan) and extended by `freeze_oldest_to_cold`.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
    /// via `fs::create_dir` on `<db_path>.lock` at open_path
    /// entry; released on Drop by `fs::remove_dir`. Atomic on
    /// every supported platform. A second process opening the
    /// same path while the first is still alive hits the
    /// create_dir failure and returns
    /// `EngineError::Unsupported("database is locked by another
    /// process: …")`. Stale locks (process crashed mid-session)
    /// must be cleared via `Database::force_unlock(path)` —
    /// SPG can't safely fingerprint who owned a stale directory
    /// without a libc dep, which would violate spg-embedded's
    /// zero-deps charter.
    lock_path: PathBuf,
    /// CoW-2 (v7.34) — background-checkpoint worker. `None` only
    /// transiently inside `Drop` after the worker has been signalled
    /// and joined. The worker carries Arc clones of `wal` and
    /// `current_chunk_path`, so it can rotate the active chunk and
    /// reflect the new path back here even after the front-end has
    /// returned to the caller.
    checkpoint_worker: Option<CheckpointWorker>,
}
1754
1755impl Database {
1756    /// Open a fresh in-memory database. No WAL, no catalog
1757    /// snapshot on disk — perfect for tests + short-lived
1758    /// CLI tools.
1759    #[must_use]
1760    pub fn open_in_memory() -> Self {
1761        Self {
1762            engine: engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros)),
1763            persistence: None,
1764            commit_lsn: AtomicU64::new(0),
1765            tx_wal: None,
1766        }
1767    }
1768
1769    /// v7.1 — Open or create a persistent database backed by
1770    /// the file at `db_path`. The WAL lives at `db_path` +
1771    /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
1772    /// path:
1773    ///
1774    /// 1. If `db_path` exists, restore the catalog snapshot.
1775    /// 2. If the WAL exists, replay every record into the
1776    ///    restored engine — the same recovery story
1777    ///    `spg-server` uses.
1778    /// 3. Open the WAL in append+sync mode so subsequent
1779    ///    `execute()` writes durably commit (one fsync per
1780    ///    mutation).
1781    ///
1782    /// `Drop` writes a final catalog snapshot + truncates the
1783    /// WAL — operators that need a sync barrier at a specific
1784    /// point use `checkpoint()` explicitly.
1785    pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
1786        let db_path = db_path.as_ref().to_path_buf();
1787        // v7.19 — WAL is a directory of chunk files. Legacy
1788        // single-file path stays variable-named `wal_path` for
1789        // the backward-compat migration block below.
1790        let wal_path = {
1791            let mut p = db_path.clone();
1792            let name = p
1793                .file_name()
1794                .map(|n| {
1795                    let mut s = n.to_os_string();
1796                    s.push(".wal");
1797                    s
1798                })
1799                .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
1800            p.set_file_name(name);
1801            p
1802        };
1803        let wal_dir = wal_path.clone();
1804        if let Some(parent) = db_path.parent()
1805            && !parent.as_os_str().is_empty()
1806        {
1807            std::fs::create_dir_all(parent).map_err(io_err)?;
1808        }
1809        // v7.17.0 Phase 6.2 — acquire cross-process exclusion
1810        // lock before touching any catalog / WAL bytes. atomic
1811        // mkdir on every supported platform; a second process
1812        // opening the same path while the first is still alive
1813        // hits the create_dir failure and gets a clear error.
1814        let lock_path = {
1815            let mut p = db_path.clone();
1816            let name = p
1817                .file_name()
1818                .map(|n| {
1819                    let mut s = n.to_os_string();
1820                    s.push(".lock");
1821                    s
1822                })
1823                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1824            p.set_file_name(name);
1825            p
1826        };
1827        acquire_path_lock(&lock_path)?;
1828        let mut engine = if db_path.exists() {
1829            let bytes = std::fs::read(&db_path).map_err(io_err)?;
1830            let engine = Engine::restore_envelope(&bytes).map_err(|e| {
1831                EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1832                    "restore from {}: {e}",
1833                    db_path.display()
1834                )))
1835            })?;
1836            engine_with_query_byte_budget(engine.with_clock(wall_clock_micros))
1837        } else {
1838            engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros))
1839        };
1840        // v7.1.4 — manifest-driven cold-segment reload. The
1841        // manifest sidecar pairs the catalog snapshot CRC with a
1842        // list of `(segment_id, path, crc32)` triples; verify
1843        // before loading so a torn or stale manifest doesn't
1844        // surface phantom data.
1845        let cold_segments_dir = {
1846            let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
1847            let stem = db_path
1848                .file_stem()
1849                .unwrap_or_else(|| std::ffi::OsStr::new("db"))
1850                .to_string_lossy()
1851                .into_owned();
1852            parent.join(format!("{stem}.spg")).join("segments")
1853        };
1854        let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
1855        let manifest_pth = spg_manifest_path(&db_path);
1856        if manifest_pth.exists() && db_path.exists() {
1857            let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
1858            if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
1859                let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
1860                let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
1861                if snap_crc == m.catalog_crc32 {
1862                    for entry in &m.cold_segments {
1863                        if let Ok(seg_bytes) = std::fs::read(&entry.path) {
1864                            let computed = spg_crypto::crc32::crc32(&seg_bytes);
1865                            if computed != entry.crc32 {
1866                                eprintln!(
1867                                    "spg-embedded: manifest skip segment {}: CRC mismatch",
1868                                    entry.segment_id
1869                                );
1870                                continue;
1871                            }
1872                            if engine.catalog().cold_segment(entry.segment_id).is_some() {
1873                                // Already loaded via Catalog::clone path (shouldn't happen
1874                                // since Engine::new + restore_envelope don't populate cold).
1875                                continue;
1876                            }
1877                            let mut new_cat = engine.catalog().clone();
1878                            if let Err(e) =
1879                                new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
1880                            {
1881                                eprintln!(
1882                                    "spg-embedded: manifest load segment {} failed: {e}",
1883                                    entry.segment_id
1884                                );
1885                                continue;
1886                            }
1887                            engine.replace_catalog(new_cat);
1888                            cold_segment_paths.insert(entry.segment_id, entry.path.clone());
1889                        } else {
1890                            eprintln!(
1891                                "spg-embedded: manifest skip segment {}: file unreadable",
1892                                entry.segment_id
1893                            );
1894                        }
1895                    }
1896                }
1897            }
1898        }
1899        // CoW-4 (v7.34) — D10 + missing-manifest fallback. Walk
1900        // `<db>.spg/segments/` and attach any `seg_<id>.spg` file that
1901        // the manifest didn't already cover (manifest absent / CRC
1902        // mismatched / a fresher freeze landed after the last
1903        // checkpoint wrote its manifest). The segment binary's own
1904        // magic + CRC32 guards integrity — no need to trust a stale
1905        // manifest entry to trust the file.
1906        scan_cold_segments_dir(&cold_segments_dir, &mut engine, &mut cold_segment_paths);
1907        // v7.19 — chunked WAL on-disk layout.
1908        //
1909        // Three cases handled here:
1910        //
1911        // 1. wal_dir exists as a DIRECTORY → scan its
1912        //    `<unix_us>_<leading_lsn>.wal` chunks (sorted
1913        //    lexicographically = chunk-creation order), replay
1914        //    them in sequence, advance the LSN watermark to the
1915        //    max commit_lsn seen.
1916        //
1917        // 2. wal_path exists as a FILE → legacy v7.18 layout.
1918        //    Migrate it: create `wal_dir/`, move the single file
1919        //    inside as `0000000000000000_0000000000000000.wal`,
1920        //    then fall through to case 1's replay loop.
1921        //
1922        // 3. Neither exists → fresh database; create wal_dir.
1923        let mut initial_lsn: u64 = 0;
1924        if wal_path.is_file() {
1925            // Case 2: legacy single-file WAL migration.
1926            let legacy_bytes = std::fs::read(&wal_path).map_err(io_err)?;
1927            std::fs::remove_file(&wal_path).map_err(io_err)?;
1928            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1929            if !legacy_bytes.is_empty() {
1930                let migrated = wal_dir.join(legacy_chunk_filename());
1931                std::fs::write(&migrated, &legacy_bytes).map_err(io_err)?;
1932            }
1933        } else if !wal_dir.exists() {
1934            // Case 3: fresh database.
1935            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1936        }
1937        // Cases 1 + 2 share replay logic now that wal_dir is
1938        // guaranteed to exist (and may be empty for case 3).
1939        //
1940        // Two-pass replay so we don't double-apply records the
1941        // snapshot already reflects:
1942        //
1943        // 1. Find the highest commit_lsn carried by a
1944        //    checkpoint_marker across all chunks. That LSN is the
1945        //    snapshot's high-water mark — anything ≤ it is
1946        //    already in `<db_path>` and replaying it would
1947        //    DuplicateTable / double-insert.
1948        // 2. Replay only records strictly above that LSN.
1949        //
1950        // Case 2 migration (legacy single-file WAL) lands here
1951        // too: the migrated chunk has no marker so the LSN floor
1952        // is 0 and every record applies — exactly the v7.18
1953        // behaviour the migration is supposed to preserve.
1954        let chunk_paths = sorted_wal_chunks(&wal_dir).map_err(io_err)?;
1955        let mut snapshot_lsn: u64 = 0;
1956        for chunk in &chunk_paths {
1957            let bytes = std::fs::read(chunk).map_err(io_err)?;
1958            if let Ok(records) = parse_wal_records(&bytes) {
1959                for r in &records {
1960                    if r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER {
1961                        if let Some(l) = r.commit_lsn {
1962                            if l > snapshot_lsn {
1963                                snapshot_lsn = l;
1964                            }
1965                        }
1966                    }
1967                }
1968            }
1969        }
1970        let mut quarantined: Vec<QuarantinedStmt> = Vec::new();
1971        for chunk in &chunk_paths {
1972            let bytes = std::fs::read(chunk).map_err(io_err)?;
1973            if bytes.is_empty() {
1974                continue;
1975            }
1976            replay_wal_filtered(&bytes, &mut engine, snapshot_lsn, &mut quarantined)
1977                .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
1978            if let Ok(records) = parse_wal_records(&bytes) {
1979                if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
1980                    if max > initial_lsn {
1981                        initial_lsn = max;
1982                    }
1983                }
1984            }
1985        }
1986        // v7.30.1 (mailrs round-24 ask 2) — replay rejects no longer
1987        // brick the open. Persist the rejected statements beside the
1988        // WAL chunks for forensics and say so loudly; the boot
1989        // continues with every other record applied.
1990        if !quarantined.is_empty() {
1991            let mut body = String::new();
1992            for q in &quarantined {
1993                body.push_str(&format_quarantine_line(q));
1994            }
1995            let qpath = wal_dir.join(format!(
1996                "quarantine-{:016x}.log",
1997                wall_clock_micros().max(0) as u64
1998            ));
1999            match std::fs::write(&qpath, &body) {
2000                Ok(()) => eprintln!(
2001                    "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
2002                     forensics at {}",
2003                    quarantined.len(),
2004                    qpath.display()
2005                ),
2006                Err(e) => eprintln!(
2007                    "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
2008                     quarantine file write FAILED ({e}), entries follow:\n{body}",
2009                    quarantined.len()
2010                ),
2011            }
2012        }
2013        // Open the "current" chunk — either the last existing
2014        // chunk file (so subsequent appends extend it until the
2015        // size threshold rotates) or a fresh first chunk.
2016        let now_us = wall_clock_micros();
2017        let current_chunk_path = if let Some(last) = chunk_paths.last() {
2018            last.clone()
2019        } else {
2020            wal_dir.join(chunk_filename(now_us, initial_lsn + 1))
2021        };
2022        let wal_file = OpenOptions::new()
2023            .create(true)
2024            .append(true)
2025            .read(true)
2026            .open(&current_chunk_path)
2027            .map_err(io_err)?;
2028        // Persist the (possibly freshly created) chunk's directory entry.
2029        fsync_dir(&wal_dir);
2030        let wal_len = wal_file.metadata().map_err(io_err)?.len();
2031        let wal = Arc::new(WalGroup::new(wal_file, wal_len));
2032        // v7.19 P3 — spawn retention sweep thread when the
2033        // operator opted in via SPG_PITR_RETENTION_HOURS > 0.
2034        // Otherwise stay on the v7.18 behaviour (chunks accumulate
2035        // until something else — backup-pitr archival, manual
2036        // cleanup — moves them).
2037        let retention_hours = pitr_retention_hours();
2038        let (retention_shutdown, retention_thread) = if retention_hours > 0 {
2039            let shutdown = Arc::new(AtomicBool::new(false));
2040            let shutdown_clone = Arc::clone(&shutdown);
2041            let wal_dir_clone = wal_dir.clone();
2042            let check_interval = std::time::Duration::from_secs(pitr_retention_check_sec());
2043            let archive_cmd = pitr_archive_cmd();
2044            let handle = std::thread::Builder::new()
2045                .name("spg-pitr-retention".into())
2046                .spawn(move || {
2047                    retention_sweep_loop(
2048                        wal_dir_clone,
2049                        retention_hours,
2050                        check_interval,
2051                        archive_cmd,
2052                        shutdown_clone,
2053                    );
2054                })
2055                .map_err(io_err)?;
2056            (Some(shutdown), Some(handle))
2057        } else {
2058            (None, None)
2059        };
2060        // v7.20 — background flusher for SPG_SYNCHRONOUS_COMMIT=off.
2061        let (flusher_shutdown, flusher_thread) = if synchronous_commit_on() {
2062            (None, None)
2063        } else {
2064            let shutdown = Arc::new(AtomicBool::new(false));
2065            let shutdown_clone = Arc::clone(&shutdown);
2066            let group = Arc::clone(&wal);
2067            let interval = std::time::Duration::from_millis(wal_writer_delay_ms());
2068            let handle = std::thread::Builder::new()
2069                .name("spg-wal-flusher".into())
2070                .spawn(move || {
2071                    while !shutdown_clone.load(Ordering::SeqCst) {
2072                        std::thread::sleep(interval);
2073                        if let Err(e) = group.flush_now() {
2074                            eprintln!("spg-embedded: background WAL flush failed: {e:?}");
2075                        }
2076                    }
2077                    // Final drain on shutdown signal.
2078                    let _ = group.flush_now();
2079                })
2080                .map_err(io_err)?;
2081            (Some(shutdown), Some(handle))
2082        };
2083        // v7.34 (crash-recovery P0 #2) — arm row-level redo capture for
2084        // subsequent writes (AFTER replay, so re-executed SQL records
2085        // don't capture; 0x13 records replay via apply_redo and never do).
2086        if row_redo_enabled() {
2087            engine.set_redo_capture(true);
2088        }
2089        Ok(Self {
2090            engine,
2091            commit_lsn: AtomicU64::new(initial_lsn),
2092            tx_wal: None,
2093            persistence: Some(PersistenceCtx {
2094                db_path,
2095                wal_dir,
2096                current_chunk_path: Arc::new(Mutex::new(current_chunk_path)),
2097                wal,
2098                checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
2099                cold_segments_dir,
2100                cold_segment_paths,
2101                lock_path,
2102                retention_shutdown,
2103                retention_thread,
2104                flusher_shutdown,
2105                flusher_thread,
2106                checkpoint_worker: Some(CheckpointWorker::spawn()),
2107            }),
2108        })
2109    }
2110
2111    /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
2112    /// hot tier into a brand-new cold-tier segment + persist
2113    /// it to disk. Same semantics as `spg-server`'s freezer
2114    /// thread; embedded just runs the freeze synchronously on
2115    /// the caller's thread. Persistence + manifest update
2116    /// happen as part of the next `checkpoint()` (or on Drop).
2117    pub fn freeze_oldest_to_cold(
2118        &mut self,
2119        table_name: &str,
2120        index_name: &str,
2121        max_rows: usize,
2122    ) -> Result<spg_storage::FreezeReport, EngineError> {
2123        let report = self
2124            .engine
2125            .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
2126        if let Some(p) = &mut self.persistence {
2127            std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
2128            let final_path = p
2129                .cold_segments_dir
2130                .join(format!("seg_{}.spg", report.segment_id));
2131            let tmp_path = p
2132                .cold_segments_dir
2133                .join(format!("seg_{}.spg.tmp", report.segment_id));
2134            std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
2135            std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
2136            p.cold_segment_paths.insert(report.segment_id, final_path);
2137        }
2138        Ok(report)
2139    }
2140
2141    /// v7.1 — override the auto-checkpoint WAL-size ceiling for
2142    /// this `Database` instance. Default is
2143    /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
2144    /// setter wins. No-op when the database is in-memory.
2145    pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
2146        if let Some(p) = &mut self.persistence {
2147            p.checkpoint_threshold_bytes = bytes.max(1);
2148        }
2149    }
2150
2151    /// v7.31 (memory campaign, round-26 ask 1/ask 4) — per-bucket
2152    /// memory snapshot for the embedding host. Poll it from prod to
2153    /// see where resident bytes live (rows / representation /
2154    /// indexes per table) and to drive host-side shedding before
2155    /// the kernel does it. Same numbers as the server path's
2156    /// `SELECT * FROM spg_memory_stats`.
2157    #[must_use]
2158    pub fn memory_stats(&self) -> spg_engine::MemoryStats {
2159        let mut stats = self.engine.memory_stats();
2160        // v7.31 C2 — fill in bucket D: the engine leaves `wal_bytes`
2161        // None (it has no WAL); we report the live (uncheckpointed)
2162        // WAL footprint via the same `written_len()` meter `metrics()`
2163        // reads. In-memory databases have no persistence → stays None.
2164        if let Some(p) = &self.persistence {
2165            stats.wal_bytes = Some(p.wal.written_len());
2166        }
2167        stats
2168    }
2169
2170    /// v7.1 — flush a fresh catalog snapshot to `db_path` and
2171    /// rotate the WAL. Idempotent; cheap when nothing has happened
2172    /// since the last checkpoint. No-op when the database is in-memory.
2173    ///
2174    /// CoW-2 (v7.34): the heavy half (serialize + tmp+rename + fsync +
2175    /// marker enqueue + chunk rotation) runs on a dedicated worker thread
2176    /// so the caller's engine borrow is released after the cheap capture
2177    /// step. This entry point keeps the **synchronous** contract — it
2178    /// waits for the worker to finish before returning — so existing
2179    /// callers, tests, and operator scripts see no behaviour change;
2180    /// they just pay one extra hop. The non-blocking variant lives at
2181    /// `trigger_checkpoint`, used by the auto-checkpoint hot path so
2182    /// the write that crossed `SPG_EMBEDDED_CHECKPOINT_BYTES` doesn't
2183    /// stall on disk IO.
2184    ///
2185    /// Called automatically when:
2186    /// - the WAL grows past `SPG_EMBEDDED_CHECKPOINT_BYTES` (default
2187    ///   4 MiB) at the end of an `execute()` (via `trigger_checkpoint`,
2188    ///   non-blocking), and
2189    /// - `Drop` runs (synchronous; best-effort, failures logged).
2190    pub fn checkpoint(&mut self) -> Result<(), EngineError> {
2191        if self.persistence.is_none() {
2192            return Ok(());
2193        }
2194        // Drain any prior async checkpoint first so our snapshot reflects
2195        // post-it state (and so a sticky error from it surfaces here, not
2196        // smeared across the next two `wait`s).
2197        self.wait_checkpoint()?;
2198        let Some(job) = self.snapshot_checkpoint_job() else {
2199            return Ok(());
2200        };
2201        let Some(worker) = self
2202            .persistence
2203            .as_ref()
2204            .and_then(|p| p.checkpoint_worker.as_ref())
2205        else {
2206            return Ok(());
2207        };
2208        // `wait_checkpoint` above guaranteed idle; `try_enqueue` only
2209        // returns Ok(false) when busy, so we expect Ok(true) here. The
2210        // bool is dropped — we wait unconditionally to honour the sync
2211        // contract.
2212        let _ = worker.try_enqueue(job)?;
2213        self.wait_checkpoint()
2214    }
2215
2216    /// CoW-2 (v7.34) — non-blocking checkpoint trigger used by the
2217    /// auto-checkpoint hot path (`wal_after_ok` over the threshold).
2218    /// Captures the engine state under `&mut self` then signals the
2219    /// background worker and returns; the serialize / fsync / rotate
2220    /// sequence runs on the worker thread. If a checkpoint is already
2221    /// pending or in flight, the new trigger is silently dropped —
2222    /// the next threshold crossing picks up the newer state.
2223    ///
2224    /// Sticky errors from a prior async run surface here (via
2225    /// `try_enqueue`), so a failed background checkpoint still reaches
2226    /// the caller eventually rather than vanishing.
2227    fn trigger_checkpoint(&mut self) -> Result<(), EngineError> {
2228        if self.persistence.is_none() {
2229            return Ok(());
2230        }
2231        let Some(job) = self.snapshot_checkpoint_job() else {
2232            return Ok(());
2233        };
2234        let Some(worker) = self
2235            .persistence
2236            .as_ref()
2237            .and_then(|p| p.checkpoint_worker.as_ref())
2238        else {
2239            return Ok(());
2240        };
2241        let _accepted = worker.try_enqueue(job)?;
2242        Ok(())
2243    }
2244
2245    /// CoW-2 (v7.34) — block until the background checkpoint worker is
2246    /// idle. Used by sync `checkpoint()` and by Drop to ensure the final
2247    /// snapshot is durable before the process exits.
2248    fn wait_checkpoint(&self) -> Result<(), EngineError> {
2249        match self
2250            .persistence
2251            .as_ref()
2252            .and_then(|p| p.checkpoint_worker.as_ref())
2253        {
2254            Some(w) => w.wait(),
2255            None => Ok(()),
2256        }
2257    }
2258
2259    /// CoW-2 (v7.34) — capture a checkpoint job under `&mut self` (or
2260    /// `&self`, since reading from atomics + cheap clones don't mutate).
2261    /// Returns `None` if the database is in-memory.
2262    fn snapshot_checkpoint_job(&self) -> Option<CheckpointJob> {
2263        let p = self.persistence.as_ref()?;
2264        Some(CheckpointJob {
2265            snapshot: self.engine.snapshot_data(),
2266            marker_lsn: self.commit_lsn.load(Ordering::SeqCst),
2267            db_path: p.db_path.clone(),
2268            wal_dir: p.wal_dir.clone(),
2269            wal: Arc::clone(&p.wal),
2270            cold_segments: p
2271                .cold_segment_paths
2272                .iter()
2273                .map(|(&id, path)| (id, path.clone()))
2274                .collect(),
2275            current_chunk_path: Arc::clone(&p.current_chunk_path),
2276        })
2277    }
2278
2279    /// Restore a database from a previously-captured catalog
2280    /// snapshot. Pairs with `Database::snapshot()` for
2281    /// round-tripping in-memory state without going through
2282    /// the `spg-server` WAL.
2283    pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
2284        let engine = Engine::restore_envelope(snapshot).map_err(|e| {
2285            EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
2286        })?;
2287        Ok(Self {
2288            engine,
2289            persistence: None,
2290            commit_lsn: AtomicU64::new(0),
2291            tx_wal: None,
2292        })
2293    }
2294
2295    /// Take a catalog snapshot suitable for `Database::restore`.
2296    /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
2297    /// + version + payload); round-trips through every released
2298    /// SPG version per the STABILITY contract.
2299    #[must_use]
2300    pub fn snapshot(&self) -> Vec<u8> {
2301        self.engine.snapshot()
2302    }
2303
2304    /// Execute a SQL statement and return the engine's
2305    /// `QueryResult` verbatim. Pass-through for callers that
2306    /// want to keep PG-flavoured column/row metadata.
2307    ///
2308    /// v7.1 — when the database was opened via `open_path`,
2309    /// successful mutations are appended to the WAL + fsynced
2310    /// before the call returns. A subsequent process crash will
2311    /// recover state up to the last successful return from
2312    /// `execute()`. Read-only statements (SELECT / SHOW /
2313    /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
2314    /// etc.) skip the WAL entirely.
2315    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
2316        // v7.20 P2 — single-caller convenience over the buffered
2317        // path: enqueue + immediately wait. Batch size is 1 here,
2318        // so the durability behaviour (one fsync before Ok) is
2319        // identical to v7.19. Concurrent callers go through
2320        // `execute_buffered` (AsyncDatabase does) and share the
2321        // leader's fsync.
2322        let (result, ticket) = self.execute_buffered(sql)?;
2323        if let Some(t) = ticket {
2324            t.wait()?;
2325        }
2326        Ok(result)
2327    }
2328
2329    /// v7.20 P2 — group-commit write entry. Runs the engine
2330    /// mutation + encodes/enqueues the WAL record, then RETURNS
2331    /// WITHOUT waiting for the fsync. The caller must call
2332    /// [`WalTicket::wait`] before treating the write as durable
2333    /// — crucially, the caller can (and should) drop whatever
2334    /// lock guards this `Database` first, so the next writer's
2335    /// mutation overlaps this batch's fsync.
2336    ///
2337    /// `None` ticket = nothing hit the WAL (read-only statement,
2338    /// no-op DDL, or in-memory database) — the result is final
2339    /// as returned.
2340    ///
2341    /// # Errors
2342    /// Engine errors propagate unchanged. Auto-checkpoint (when
2343    /// the active chunk crosses the threshold) runs inline and
2344    /// may surface IO errors.
2345    pub fn execute_buffered(
2346        &mut self,
2347        sql: &str,
2348    ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2349        let result = self.engine.execute(sql)?;
2350        let modified = matches!(
2351            &result,
2352            QueryResult::CommandOk {
2353                modified_catalog: true,
2354                ..
2355            }
2356        );
2357        let ticket = self.wal_after_ok(sql, modified)?;
2358        Ok((result, ticket))
2359    }
2360
2361    /// v7.21 (round-12 polish) — post-engine WAL bookkeeping shared
2362    /// by the simple ([`Self::execute_buffered`]) and prepared
2363    /// ([`Self::execute_prepared_buffered`]) write paths. `canonical`
2364    /// is the replay text (bind-final for prepared statements);
2365    /// `modified_catalog` comes from the engine result. Three routes:
2366    ///
2367    /// - transaction control → maintain [`Self::tx_wal`]: BEGIN opens
2368    ///   the buffer, COMMIT flushes it as ONE atomic
2369    ///   `WAL_V4_TYPE_TX_COMMIT_SQL` record, ROLLBACK drops it,
2370    ///   SAVEPOINT / ROLLBACK TO mark / truncate it. The engine has
2371    ///   already accepted the statement, so this only mirrors state.
2372    /// - inside an open transaction → buffer the statement (shadow-
2373    ///   catalog mutations report `modified_catalog: false`, so the
2374    ///   auto-commit arm below can't see them).
2375    /// - auto-commit mutation → classic per-statement v4 record.
2376    ///
2377    /// v7.18 PITR — v4 records carry commit LSN + wall-clock micros.
2378    /// The crash window remains one BATCH: replay re-applies
2379    /// idempotently exactly as before, and a torn batch tail drops
2380    /// cleanly (same torn-write handling).
2381    fn wal_after_ok(
2382        &mut self,
2383        canonical: &str,
2384        modified_catalog: bool,
2385    ) -> Result<Option<WalTicket>, EngineError> {
2386        if self.persistence.is_none() {
2387            return Ok(None);
2388        }
2389        let mut record = None;
2390        match tx_control_kind(canonical) {
2391            Some(TxControl::Begin) => {
2392                self.tx_wal = Some(TxWalBuffer::default());
2393            }
2394            Some(TxControl::Commit) => {
2395                if let Some(buf) = self.tx_wal.take()
2396                    && !buf.statements.is_empty()
2397                {
2398                    let script = buf.statements.join(";\n");
2399                    let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
2400                    record = Some(encode_v4_tx_commit(&script, lsn, wall_clock_micros()));
2401                }
2402            }
2403            Some(TxControl::Rollback) => {
2404                self.tx_wal = None;
2405            }
2406            Some(TxControl::Savepoint(name)) => {
2407                if let Some(buf) = &mut self.tx_wal {
2408                    // PG name-reuse semantics: latest mark wins.
2409                    buf.savepoints.retain(|(n, _)| n != &name);
2410                    let mark = buf.statements.len();
2411                    buf.savepoints.push((name, mark));
2412                }
2413            }
2414            Some(TxControl::RollbackToSavepoint(name)) => {
2415                if let Some(buf) = &mut self.tx_wal
2416                    && let Some(pos) = buf.savepoints.iter().position(|(n, _)| n == &name)
2417                {
2418                    let mark = buf.savepoints[pos].1;
2419                    buf.statements.truncate(mark);
2420                    // Later savepoints die with the rollback; the
2421                    // target itself survives (PG keeps it
2422                    // re-rollbackable).
2423                    buf.savepoints.truncate(pos + 1);
2424                }
2425            }
2426            Some(TxControl::ReleaseSavepoint) => {
2427                // RELEASE folds the savepoint into the enclosing tx —
2428                // buffered statements stay. The mark also stays:
2429                // marks are only consulted by ROLLBACK TO, which the
2430                // engine validates first, so a dangling mark is
2431                // unreachable.
2432            }
2433            None => {
2434                if let Some(buf) = &mut self.tx_wal {
2435                    if !sql_is_read_only(canonical) {
2436                        buf.statements.push(canonical.to_string());
2437                    }
2438                } else if modified_catalog && !sql_is_read_only(canonical) {
2439                    let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
2440                    // v7.34 (crash-recovery P0 #2) — hybrid log: when
2441                    // row-level redo is on and this statement produced row
2442                    // changes (DML), write a physical 0x13 redo record so
2443                    // replay applies it directly. A statement with no row
2444                    // changes (DDL: CREATE/ALTER, never goes through
2445                    // Table::insert/update/delete) drains an empty redo and
2446                    // keeps the SQL record so the schema still replays.
2447                    let redo = if row_redo_enabled() {
2448                        self.engine.take_redo()
2449                    } else {
2450                        Vec::new()
2451                    };
2452                    record = Some(if redo.is_empty() {
2453                        encode_v4_auto_commit(canonical, lsn, wall_clock_micros())
2454                    } else {
2455                        encode_v5_row_redo(
2456                            &spg_storage::encode_redo_log(&redo),
2457                            lsn,
2458                            wall_clock_micros(),
2459                        )
2460                    });
2461                }
2462            }
2463        }
2464        let mut ticket = None;
2465        if let Some(record) = record {
2466            let p = self.persistence.as_mut().expect("checked above");
2467            let seq = p.wal.enqueue(&record);
2468            ticket = Some(WalTicket {
2469                group: Arc::clone(&p.wal),
2470                seq,
2471            });
2472            if p.wal.written_len() >= p.checkpoint_threshold_bytes {
2473                // CoW-2 (v7.34): hot path — fire-and-forget. The worker
2474                // serializes off this thread so the commit that just
2475                // crossed the threshold doesn't stall on a multi-hundred-ms
2476                // snapshot write. Any sticky error from a prior async
2477                // checkpoint surfaces here.
2478                self.trigger_checkpoint()?;
2479            }
2480        }
2481        Ok(ticket)
2482    }
2483
2484    /// v7.3.0 — typed-row variant of [`Database::query`]. Each
2485    /// row decodes into a `T: FromSpgRow` so callers don't
2486    /// pattern-match on `Value` themselves. Use [`spg_row!`] to
2487    /// generate the impl, or write it by hand.
2488    pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
2489        let rows = self.query(sql)?;
2490        rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
2491    }
2492
2493    /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
2494    /// strips the column-schema metadata for read-side
2495    /// ergonomics. Errors on non-Rows results (DML / DDL
2496    /// statements should go through `execute` instead).
2497    pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
2498        match self.engine.execute(sql)? {
2499            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2500            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2501                "query() expects a SELECT — use execute() for DML/DDL".into(),
2502            )),
2503            // v7.5.0 — QueryResult is #[non_exhaustive]; any future
2504            // variant is not a SELECT row stream, treat as Unsupported.
2505            _ => Err(EngineError::Unsupported(
2506                "query() expects a SELECT — use execute() for DML/DDL".into(),
2507            )),
2508        }
2509    }
2510
2511    /// v7.16.0 — column-aware variant of [`Self::query`].
2512    /// Returns the column schema vec alongside the rows so
2513    /// adapters (the spg-sqlx Row impl most notably) can drive
2514    /// name + type-based column lookups. Errors on non-Rows
2515    /// results identically to `query`.
2516    pub fn query_with_columns(
2517        &mut self,
2518        sql: &str,
2519    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2520        match self.engine.execute(sql)? {
2521            QueryResult::Rows { columns, rows } => {
2522                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2523            }
2524            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2525                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2526            )),
2527            _ => Err(EngineError::Unsupported(
2528                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2529            )),
2530        }
2531    }
2532
2533    /// v7.16.0 — column-aware variant of
2534    /// [`Self::query_prepared`]. Same shape as
2535    /// `query_with_columns` but driven from a prepared
2536    /// statement + bound params.
2537    pub fn query_prepared_with_columns(
2538        &mut self,
2539        stmt: &Statement,
2540        params: &[Value],
2541    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2542        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2543            QueryResult::Rows { columns, rows } => {
2544                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2545            }
2546            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2547                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2548            )),
2549            _ => Err(EngineError::Unsupported(
2550                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2551            )),
2552        }
2553    }
2554
2555    /// Borrow the underlying engine. Escape hatch for callers
2556    /// that need access to `spg-engine` APIs not yet surfaced
2557    /// here (transactions, EXPLAIN ANALYZE, etc.).
2558    #[must_use]
2559    pub const fn engine(&self) -> &Engine {
2560        &self.engine
2561    }
2562
2563    /// Mutable borrow of the underlying engine. Same intent as
2564    /// `engine()` but for write-side APIs (e.g. inserting
2565    /// directly through `Catalog::insert` for high-throughput
2566    /// bulk loads that bypass SQL parsing).
2567    pub const fn engine_mut(&mut self) -> &mut Engine {
2568        &mut self.engine
2569    }
2570
2571    /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
2572    /// `execute_prepared` / `query_prepared` calls can re-bind
2573    /// parameters without re-parsing. The returned [`Statement`]
2574    /// is a thin handle around the AST + cached source SQL; it's
2575    /// `Clone` so the same plan can drive many bind calls
2576    /// concurrently (each call clones the AST and runs
2577    /// placeholder substitution on the clone — the cached
2578    /// plan stays intact).
2579    ///
2580    /// Plan caching follows the engine's existing version-aware
2581    /// rule: a prepared `Statement` whose statistics version
2582    /// has rolled (ANALYZE ran between prepare and execute)
2583    /// will silently re-prepare under the hood. Callers don't
2584    /// need to detect this.
2585    ///
2586    /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
2587    /// `bind`-time `Value`s are passed as a slice; arity
2588    /// mismatches surface as `EvalError::PlaceholderOutOfRange`
2589    /// at `execute_prepared` time, not here.
2590    ///
2591    /// # Errors
2592    /// Surfaces `EngineError` (parse error / plan rewrite
2593    /// failure) from the underlying `Engine::prepare`.
2594    pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
2595        // Use the cached path so repeated prepares of the same
2596        // SQL are O(1). The engine's plan cache stays shared
2597        // across all callers of this Database — a single
2598        // `PgPool`-shaped consumer (or, later, the spg-sqlx
2599        // adapter) prepares once and reaps the win on every bind.
2600        let stmt = self
2601            .engine
2602            .prepare_cached(sql)
2603            .map_err(EngineError::Parse)?;
2604        Ok(Statement {
2605            stmt,
2606            sql: sql.to_string(),
2607        })
2608    }
2609
2610    /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
2611    /// executing. Returns `(parameter_oid_count, output_columns)`
2612    /// where `output_columns` is empty for non-SELECT statements
2613    /// or for SELECT shapes the describe planner can't resolve
2614    /// (JOIN / subquery / unknown table). Wraps
2615    /// `Engine::describe_prepared` so the spg-sqlx bridge can
2616    /// surface PG-shape Describe replies for
2617    /// `sqlx::query!()` compile-time validation.
2618    ///
2619    /// # Errors
2620    /// Propagates parse errors from the underlying prepare path.
2621    pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2622        let stmt = self
2623            .engine
2624            .prepare_cached(sql)
2625            .map_err(EngineError::Parse)?;
2626        Ok(self.engine.describe_prepared(&stmt))
2627    }
2628
2629    /// v7.16.0 — execute a prepared statement with bound
2630    /// parameters. Mirrors `Engine::execute_prepared`: clones
2631    /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
2632    ///
2633    /// Persistence (WAL fsync + auto-checkpoint) follows the
2634    /// same rules as `execute(sql)`: mutating statements get a
2635    /// WAL record AFTER the in-memory exec succeeds. The WAL
2636    /// record carries the substituted, bind-final SQL, so
2637    /// replay reconstructs the same row state without needing
2638    /// the original prepared `Statement` to still be alive.
2639    ///
2640    /// # Errors
2641    /// Propagates engine errors. Param arity mismatch surfaces
2642    /// as `EvalError::PlaceholderOutOfRange`.
2643    pub fn execute_prepared(
2644        &mut self,
2645        stmt: &Statement,
2646        params: &[Value],
2647    ) -> Result<QueryResult, EngineError> {
2648        let (result, ticket) = self.execute_prepared_buffered(stmt, params)?;
2649        if let Some(t) = ticket {
2650            t.wait()?;
2651        }
2652        Ok(result)
2653    }
2654
2655    /// v7.20 P2 — group-commit variant of
2656    /// [`Database::execute_prepared`]. Same contract as
2657    /// [`Database::execute_buffered`]: mutation + enqueue happen
2658    /// here; the caller waits on the ticket AFTER releasing
2659    /// whatever lock guards this `Database`.
2660    ///
2661    /// # Errors
2662    /// Engine errors propagate unchanged; inline auto-checkpoint
2663    /// may surface IO errors.
2664    pub fn execute_prepared_buffered(
2665        &mut self,
2666        stmt: &Statement,
2667        params: &[Value],
2668    ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2669        let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
2670        let modified = matches!(
2671            &result,
2672            QueryResult::CommandOk {
2673                modified_catalog: true,
2674                ..
2675            }
2676        );
2677        // WAL persistence on the bind-final SQL. Build the
2678        // canonical Display form by re-printing the
2679        // placeholder-substituted statement (cheap — the AST
2680        // is already in hand from execute_prepared's internal
2681        // clone) so replay's path is identical to the
2682        // simple-query path. v7.21: also when a transaction is
2683        // open — in-tx mutations report `modified_catalog: false`
2684        // but must reach the tx WAL buffer (see `wal_after_ok`).
2685        let mut ticket = None;
2686        if self.persistence.is_some()
2687            && (modified
2688                || (self.tx_wal.is_some() && !sql_is_read_only(&stmt.sql))
2689                || tx_control_kind(&stmt.sql).is_some())
2690        {
2691            let mut wal_stmt = stmt.stmt.clone();
2692            crate::wal_render_with_params(&mut wal_stmt, params);
2693            let canonical = format!("{wal_stmt}");
2694            ticket = self.wal_after_ok(&canonical, modified)?;
2695        }
2696        Ok((result, ticket))
2697    }
2698
2699    /// v7.16.0 — run a prepared SELECT with bound params and
2700    /// return rows as `Vec<Vec<Value>>`, matching `query()`
2701    /// shape. SELECTs are read-only so this never writes the
2702    /// WAL.
2703    ///
2704    /// # Errors
2705    /// Returns `Unsupported` if the prepared statement isn't a
2706    /// SELECT (use `execute_prepared` for DML/DDL).
2707    pub fn query_prepared(
2708        &mut self,
2709        stmt: &Statement,
2710        params: &[Value],
2711    ) -> Result<Vec<Vec<Value>>, EngineError> {
2712        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2713            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2714            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2715                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2716            )),
2717            _ => Err(EngineError::Unsupported(
2718                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2719            )),
2720        }
2721    }
2722
2723    /// v7.18 — parse + plan a SQL string against a
2724    /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
2725    /// readonly fan-out path: no writer lock taken, no WAL write,
2726    /// no plan-cache mutation. Static-on-`Self` so callers can
2727    /// dispatch against a snapshot without an `&mut Database`
2728    /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
2729    /// is the load-bearing consumer.
2730    ///
2731    /// # Errors
2732    /// Propagates `EngineError::Parse` from the parser.
2733    pub fn prepare_on_snapshot(
2734        snapshot: &CatalogSnapshot,
2735        sql: &str,
2736    ) -> Result<Statement, EngineError> {
2737        let stmt =
2738            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2739        Ok(Statement {
2740            stmt,
2741            sql: sql.to_string(),
2742        })
2743    }
2744
2745    /// v7.18 — execute a prepared `Statement` against a
2746    /// `CatalogSnapshot` with bound params. Mirror of
2747    /// [`Database::execute_prepared`] on the readonly path:
2748    /// writes / DDL hit `EngineError::WriteRequired`. No WAL
2749    /// write, no writer lock, multiple snapshots can run
2750    /// concurrently — the snapshot is immutable from prepare time.
2751    ///
2752    /// # Errors
2753    /// Surfaces `EngineError::WriteRequired` for non-readonly
2754    /// statements; propagates other engine errors.
2755    pub fn execute_prepared_on_snapshot(
2756        snapshot: &CatalogSnapshot,
2757        stmt: &Statement,
2758        params: &[Value],
2759    ) -> Result<QueryResult, EngineError> {
2760        spg_engine::Engine::execute_readonly_prepared_on_snapshot(
2761            snapshot,
2762            stmt.stmt.clone(),
2763            params,
2764        )
2765    }
2766
2767    /// v7.28 (round-22) — deadline-bounded variant of
2768    /// [`Database::execute_prepared_on_snapshot`]. Returns
2769    /// `EngineError::Cancelled` once the budget elapses; the
2770    /// sqlx driver uses this to keep readonly-INLINE execution
2771    /// from monopolising the caller's async runtime (four slow
2772    /// inbox queries saturated mailrs's whole tokio pool) and
2773    /// re-runs over the blocking pool on timeout.
2774    ///
2775    /// # Errors
2776    /// `EngineError::Cancelled` on budget expiry; engine errors
2777    /// otherwise.
2778    pub fn execute_prepared_on_snapshot_with_budget(
2779        snapshot: &CatalogSnapshot,
2780        stmt: &Statement,
2781        params: &[Value],
2782        budget_us: u64,
2783    ) -> Result<QueryResult, EngineError> {
2784        fn mono_now_us() -> u64 {
2785            use std::time::{SystemTime, UNIX_EPOCH};
2786            // Monotonic enough for a per-call relative budget: the
2787            // engine only compares (now - start) against the budget
2788            // within one call.
2789            SystemTime::now()
2790                .duration_since(UNIX_EPOCH)
2791                .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
2792                .unwrap_or(0)
2793        }
2794        let deadline = mono_now_us().saturating_add(budget_us);
2795        let token = spg_engine::CancelToken::none().with_deadline(mono_now_us, deadline);
2796        spg_engine::Engine::execute_readonly_prepared_on_snapshot_with_cancel(
2797            snapshot,
2798            stmt.stmt.clone(),
2799            params,
2800            token,
2801        )
2802    }
2803
2804    /// v7.18 — describe a SQL string against a
2805    /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
2806    /// the readonly path. Pure function on the snapshot's
2807    /// catalog; safe to call from any thread.
2808    ///
2809    /// # Errors
2810    /// Propagates `EngineError::Parse` from the parser.
2811    pub fn describe_on_snapshot(
2812        snapshot: &CatalogSnapshot,
2813        sql: &str,
2814    ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2815        let stmt =
2816            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2817        Ok(spg_engine::Engine::describe_prepared_on_snapshot(
2818            snapshot, &stmt,
2819        ))
2820    }
2821
2822    /// v7.21 (round-12 polish) — run a multi-statement SQL script
2823    /// with PG simple-query semantics: the statements execute in
2824    /// order inside ONE implicit transaction, so a mid-script error
2825    /// rolls back the whole script (PG wraps every simple-query
2826    /// message in an implicit transaction). Three exceptions, all
2827    /// PG-faithful:
2828    ///
2829    /// - a script that carries its OWN transaction control
2830    ///   (BEGIN / COMMIT / …) runs statement-by-statement — the
2831    ///   script owns its boundaries;
2832    /// - a script run while the caller already has a transaction
2833    ///   open joins that transaction (no nested BEGIN), and the
2834    ///   caller's COMMIT / ROLLBACK decides its fate;
2835    /// - a single-statement script is plain auto-commit.
2836    ///
2837    /// Returns one `QueryResult` per executed statement. This is the
2838    /// engine behind `sqlx::raw_sql` (mailrs feeds whole
2839    /// `init-schema.sql` files through it) and `spgctl import`.
2840    ///
2841    /// # Errors
2842    /// The first failing statement's error propagates after the
2843    /// implicit ROLLBACK; nothing from the script remains applied.
2844    pub fn execute_script(&mut self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
2845        let stmts = split_statements(sql);
2846        let script_owns_tx = stmts.iter().any(|s| tx_control_kind(s).is_some());
2847        let wrap = stmts.len() > 1 && !script_owns_tx && !self.engine.in_transaction();
2848        if !wrap {
2849            let mut out = Vec::with_capacity(stmts.len());
2850            for stmt in &stmts {
2851                out.push(self.execute_dump_statement(stmt)?);
2852            }
2853            return Ok(out);
2854        }
2855        self.execute("BEGIN")?;
2856        let mut out = Vec::with_capacity(stmts.len());
2857        for stmt in &stmts {
2858            match self.execute_dump_statement(stmt) {
2859                Ok(r) => out.push(r),
2860                Err(e) => {
2861                    // Best-effort rollback; surface the script error.
2862                    let _ = self.execute("ROLLBACK");
2863                    return Err(e);
2864                }
2865            }
2866        }
2867        self.execute("COMMIT")?;
2868        Ok(out)
2869    }
2870
2871    /// v7.22 (round-13 T2) — execute one `split_statements` chunk,
2872    /// lowering a `COPY … FROM stdin;` block (statement + its data
2873    /// lines, as one chunk) to per-row INSERTs through the shared
2874    /// `spg_engine::copy` helpers. Default-format pg_dump emits
2875    /// COPY blocks, so the zero-change import promise needs this on
2876    /// the embed path; non-COPY statements pass straight through to
2877    /// [`Self::execute`]. Public so `spgctl import` can keep its
2878    /// per-statement error indexing while sharing the lowering.
2879    ///
2880    /// # Errors
2881    /// Engine errors propagate; for COPY the failing row's INSERT
2882    /// error carries the synthesized statement context.
2883    pub fn execute_dump_statement(&mut self, stmt: &str) -> Result<QueryResult, EngineError> {
2884        // Strip pg_dump's `-- Data for Name: …;` banner (it carries
2885        // semicolons of its own) before splitting head from data.
2886        let stmt_clean = strip_leading_sql_noise(stmt);
2887        let head_is_copy = stmt_clean
2888            .get(..4)
2889            .is_some_and(|p| p.eq_ignore_ascii_case("copy"));
2890        if head_is_copy
2891            && let Some((head, data)) = stmt_clean.split_once(';')
2892            && let Some(spec) = spg_engine::copy::parse_copy_from_stdin_head(head)
2893        {
2894            let mut affected: usize = 0;
2895            for line in data.lines() {
2896                // Empty fragments only occur at the chunk boundary
2897                // (the remainder of the COPY line right after `;`);
2898                // data rows are whole non-empty lines.
2899                let line = line.strip_suffix('\r').unwrap_or(line);
2900                if line.is_empty() {
2901                    continue;
2902                }
2903                let values = spg_engine::copy::decode_copy_text_row(line);
2904                let insert = spg_engine::copy::build_copy_insert(
2905                    &spec.table,
2906                    spec.columns.as_deref(),
2907                    &values,
2908                );
2909                match self.execute(&insert)? {
2910                    QueryResult::CommandOk { affected: n, .. } => affected += n,
2911                    _ => affected += 1,
2912                }
2913            }
2914            return Ok(QueryResult::CommandOk {
2915                affected,
2916                modified_catalog: false,
2917            });
2918        }
2919        self.execute(stmt)
2920    }
2921
2922    /// v7.2.0 — run `body` inside an implicit `BEGIN` /
2923    /// `COMMIT` pair. The body receives `&mut Database` so it
2924    /// can `execute()` / `query()` like any other code path;
2925    /// the only difference is that every write in the body
2926    /// lands inside one transaction, and a returned `Err` from
2927    /// the body triggers `ROLLBACK` before the error propagates.
2928    ///
2929    /// Nested calls are not supported — SPG's transaction
2930    /// model is single-writer with explicit `BEGIN` /
2931    /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
2932    /// would hit `EngineError::Unsupported("nested
2933    /// transaction")` at the inner `BEGIN`.
2934    pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
2935    where
2936        F: FnOnce(&mut Self) -> Result<R, EngineError>,
2937    {
2938        self.execute("BEGIN")?;
2939        match body(self) {
2940            Ok(value) => {
2941                self.execute("COMMIT")?;
2942                Ok(value)
2943            }
2944            Err(e) => {
2945                // Best-effort rollback. If ROLLBACK itself
2946                // fails (rare — the engine reports it via
2947                // `Unsupported` only when there's no active
2948                // TX, which can't happen here) we surface the
2949                // original body error, not the rollback error.
2950                let _ = self.execute("ROLLBACK");
2951                Err(e)
2952            }
2953        }
2954    }
2955}
2956
2957impl Default for Database {
2958    fn default() -> Self {
2959        Self::open_in_memory()
2960    }
2961}
2962
/// v7.7.5 — point-in-time counters handed back by
/// [`Database::metrics`]. A small `Copy` value made of integers
/// and one flag, so it is trivial to pass around, compare, and
/// serialise.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct EmbeddedMetrics {
    /// Live rows summed over every user table. Hot tier only;
    /// rows already moved to the cold tier live in segment files
    /// and are not counted here.
    pub hot_rows: u64,
    /// `Table::hot_bytes` summed over every user table. This is
    /// the figure to compare with the freezer's `hot_tier_bytes`
    /// budget.
    pub hot_bytes: u64,
    /// How many cold-tier segments the catalog has registered.
    /// Tombstoned slots count too (segments retired by
    /// compaction whose file may not have been removed yet).
    pub cold_segments: u64,
    /// Number of user tables (any future engine-managed internal
    /// tables are excluded).
    pub tables: u64,
    /// Size of the WAL as of the last `execute()` /
    /// `checkpoint()`. Always zero for an in-memory database.
    pub wal_bytes: u64,
    /// Whether WAL + checkpoint persistence is active, i.e. the
    /// database was opened with `open_path`.
    pub persistent: bool,
}
2990
/// v7.2.1 — owner handle for the worker started by
/// `spawn_background_freezer`. Dropping it raises the shutdown
/// flag and joins the worker thread, so drop the handle first
/// and the `Database` (or its shared `Arc<Mutex<Database>>`)
/// afterwards.
#[derive(Debug)]
#[must_use = "the background freezer keeps running until this handle is dropped"]
pub struct FreezerHandle {
    // Flag the worker polls; `true` means "wind down".
    shutdown: Arc<AtomicBool>,
    // `Some` until the worker has been joined once.
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — ask the worker to stop and wait for it to exit.
    /// Calling it again is a no-op beyond re-setting the flag,
    /// which is what lets `Drop` call it unconditionally.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let Some(worker) = self.join.take() else {
            // Already joined by an earlier `stop()`.
            return;
        };
        // A panic inside the worker is deliberately not re-raised.
        let _ = worker.join();
    }
}

impl Drop for FreezerHandle {
    fn drop(&mut self) {
        FreezerHandle::stop(self);
    }
}
3018
/// v7.2.1 — tuning knobs accepted by
/// `Database::spawn_background_freezer`.
#[derive(Clone, Debug)]
pub struct FreezerOptions {
    /// How often the worker evaluates the catalog. On each
    /// `tick` it reads the catalog's `hot_tier_bytes` and
    /// freezes when that is over budget.
    pub tick: Duration,
    /// Budget for the hot tier, in bytes. Once exceeded, the
    /// next tick freezes the oldest `batch_rows` rows of the
    /// largest table into a new cold segment.
    pub hot_tier_bytes: u64,
    /// Upper bound on rows demoted in a single freeze.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. After a freeze, the
    /// worker runs one compaction pass if the catalog holds
    /// more than this many cold segments across all tables.
    /// `usize::MAX` turns auto-compact off; the default `64`
    /// matches the `spg-server` operating point for
    /// SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size, in bytes, for compaction
    /// merges. Default 64 MiB, mirroring `spg-server`. Segments
    /// smaller than this are merge candidates; segments at or
    /// above it are left alone.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    fn default() -> Self {
        const MIB: u64 = 1 << 20;
        const GIB: u64 = 1 << 30;
        // Same operating point as the `spg-server` freezer
        // (SPG_HOT_TIER_BYTES = 4 GiB, 1000-row batches, one
        // tick per second), so operators who know the server
        // get the behaviour they expect from embedded mode.
        Self {
            tick: Duration::from_millis(1_000),
            hot_tier_bytes: 4 * GIB,
            batch_rows: 1_000,
            compact_when_segments_exceed: 64,
            compact_target_bytes: 64 * MIB,
        }
    }
}
3060
3061impl Database {
3062    /// v7.7.4 — observe the catalog's cold-segment count.
3063    /// Useful for tests + dashboards that want to verify
3064    /// auto-compaction is firing.
3065    #[must_use]
3066    pub fn cold_segment_count(&self) -> usize {
3067        self.engine.catalog().cold_segment_count()
3068    }
3069
3070    /// v7.7.5 — observability snapshot. Returns a point-in-time
3071    /// view of the engine + persistence counters. Cheap (no
3072    /// locks beyond the existing `&self` borrow), so safe to
3073    /// call from a hot metrics-scrape path.
3074    ///
3075    /// Fields mirror the operational dashboard
3076    /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
3077    /// minus the network counters that don't apply to embedded.
3078    #[must_use]
3079    pub fn metrics(&self) -> EmbeddedMetrics {
3080        let cat = self.engine.catalog();
3081        let mut hot_rows: u64 = 0;
3082        let mut hot_bytes: u64 = 0;
3083        for name in cat.table_names() {
3084            if let Some(t) = cat.get(&name) {
3085                hot_rows = hot_rows.saturating_add(t.row_count() as u64);
3086                hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
3087            }
3088        }
3089        let (wal_bytes, persistent) = match &self.persistence {
3090            Some(p) => (p.wal.written_len(), true),
3091            None => (0, false),
3092        };
3093        EmbeddedMetrics {
3094            hot_rows,
3095            hot_bytes,
3096            cold_segments: cat.cold_segment_count() as u64,
3097            tables: cat.table_count() as u64,
3098            wal_bytes,
3099            persistent,
3100        }
3101    }
3102
3103    /// v7.2.1 — spawn a background thread that periodically
3104    /// runs `freeze_oldest_to_cold` when the catalog-wide hot
3105    /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
3106    /// pattern matches the v7.2 sharing story: callers wrap
3107    /// their `Database` in `Arc::new(Mutex::new(db))` once,
3108    /// then clone the Arc for the worker + for foreground
3109    /// access. Return value is a handle whose `Drop` joins the
3110    /// worker.
3111    ///
3112    /// Picks the freeze target the same way `spg-server`'s
3113    /// freezer does: largest-`hot_bytes` user table with at
3114    /// least one BTree integer-PK index. Tables without a
3115    /// freezable index are skipped silently.
3116    pub fn spawn_background_freezer(
3117        db: Arc<Mutex<Database>>,
3118        opts: FreezerOptions,
3119    ) -> FreezerHandle {
3120        let shutdown = Arc::new(AtomicBool::new(false));
3121        let shutdown_for_thread = Arc::clone(&shutdown);
3122        let join = thread::Builder::new()
3123            .name("spg-embedded-freezer".into())
3124            .spawn(move || {
3125                background_freezer_loop(db, opts, shutdown_for_thread);
3126            })
3127            .expect("spawn background freezer thread");
3128        FreezerHandle {
3129            shutdown,
3130            join: Some(join),
3131        }
3132    }
3133}
3134
3135/// v7.2.1 — the freezer's main loop, factored out so the
3136/// `Database::spawn_background_freezer` path stays readable.
3137fn background_freezer_loop(
3138    db: Arc<Mutex<Database>>,
3139    opts: FreezerOptions,
3140    shutdown: Arc<AtomicBool>,
3141) {
3142    // Sleep in short slices so a shutdown request resolves
3143    // quickly (vs sleeping the full tick).
3144    let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
3145    let mut last_tick = std::time::Instant::now();
3146    loop {
3147        if shutdown.load(Ordering::Acquire) {
3148            return;
3149        }
3150        thread::sleep(slice);
3151        if last_tick.elapsed() < opts.tick {
3152            continue;
3153        }
3154        last_tick = std::time::Instant::now();
3155        let Ok(mut guard) = db.lock() else {
3156            return;
3157        };
3158        if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
3159            continue;
3160        }
3161        let Some((table, index)) = pick_freeze_target(&guard) else {
3162            continue;
3163        };
3164        let row_count = guard
3165            .engine
3166            .catalog()
3167            .get(&table)
3168            .map_or(0, spg_storage::Table::row_count);
3169        let to_freeze = opts.batch_rows.min(row_count);
3170        if to_freeze == 0 {
3171            continue;
3172        }
3173        if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
3174            eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
3175            continue;
3176        }
3177        // v7.7.4 — auto-compact. If the catalog now carries
3178        // more cold segments than the configured threshold,
3179        // run a single compaction pass. Failures are reported
3180        // but don't kill the loop; the next tick will retry.
3181        let count = guard.engine.catalog().cold_segment_count();
3182        if count > opts.compact_when_segments_exceed {
3183            if let Err(e) = guard
3184                .engine
3185                .compact_cold_segments_with_target(opts.compact_target_bytes)
3186            {
3187                eprintln!(
3188                    "spg-embedded: background compact failed (segments={count}, \
3189                     threshold={}): {e:?}",
3190                    opts.compact_when_segments_exceed,
3191                );
3192            }
3193        }
3194    }
3195}
3196
3197/// v7.2.1 — pick the highest-`hot_bytes` user table with a
3198/// BTree integer-PK index. Returns `(table, index_name)` so the
3199/// caller can dispatch through `freeze_oldest_to_cold`.
3200fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
3201    let cat = db.engine.catalog();
3202    let mut best: Option<(String, String, u64)> = None;
3203    for name in cat.table_names() {
3204        let Some(t) = cat.get(&name) else { continue };
3205        if t.row_count() == 0 {
3206            continue;
3207        }
3208        let cols = &t.schema().columns;
3209        let Some(idx) = t.indices().iter().find(|i| {
3210            matches!(i.kind, spg_storage::IndexKind::BTree(_))
3211                && i.column_position < cols.len()
3212                && matches!(
3213                    cols[i.column_position].ty,
3214                    spg_storage::DataType::SmallInt
3215                        | spg_storage::DataType::Int
3216                        | spg_storage::DataType::BigInt
3217                )
3218        }) else {
3219            continue;
3220        };
3221        let hot = t.hot_bytes();
3222        match best {
3223            None => best = Some((name, idx.name.clone(), hot)),
3224            Some((_, _, best_hot)) if hot > best_hot => {
3225                best = Some((name, idx.name.clone(), hot));
3226            }
3227            _ => {}
3228        }
3229    }
3230    best.map(|(t, i, _)| (t, i))
3231}
3232
3233/// v7.7.6 — replay the first `to_seq` records of the WAL at
3234/// `wal_path` into a fresh engine and write the resulting
3235/// catalog snapshot to `out_db_path`. Same semantics as
3236/// `spg revert --wal … --to-seq N --out …` from the CLI:
3237///
3238///   - `to_seq == 0` → snapshot is the empty catalog
3239///   - WAL records beyond `to_seq` are not applied
3240///   - durability-checkpoint markers (v3 type 0x02) are
3241///     consumed without counting against the budget
3242///
3243/// Returns the number of statements actually applied
3244/// (`≤ to_seq`). The output snapshot is byte-identical to
3245/// what `Database::open_path(out_db_path)` would consume on
3246/// a subsequent open.
3247///
3248/// This is the "rewind" operator for an embedded database
3249/// that has been corrupted by a poison statement or a
3250/// half-applied migration. Pair with `cold_segment_paths`
3251/// preservation if your cold-tier files are still on disk.
3252///
3253/// # Errors
3254///
3255/// - `wal_path` unreadable or truncated mid-record
3256/// - WAL record decodes to invalid UTF-8 SQL
3257/// - WAL record's SQL is rejected by the engine
3258/// - `out_db_path` unwritable
3259pub fn revert_wal_to_seq(
3260    wal_path: impl AsRef<Path>,
3261    to_seq: u64,
3262    out_db_path: impl AsRef<Path>,
3263) -> Result<u64, EngineError> {
3264    // v7.19 — accept either a single-file legacy WAL (v7.18 and
3265    // earlier layout) or a chunked WAL directory (v7.19+). For a
3266    // directory, concatenate every `.wal` chunk in sorted order
3267    // — the same order open_path replays them in — so revert
3268    // sees the full record stream.
3269    let path = wal_path.as_ref();
3270    let wal_bytes = if path.is_dir() {
3271        let mut combined = Vec::new();
3272        let chunks = sorted_wal_chunks(path).map_err(io_err)?;
3273        for chunk in chunks {
3274            let bytes = std::fs::read(&chunk).map_err(io_err)?;
3275            combined.extend_from_slice(&bytes);
3276        }
3277        combined
3278    } else {
3279        std::fs::read(path).map_err(io_err)?
3280    };
3281    let mut engine = Engine::new();
3282    let mut applied = 0u64;
3283    let mut cur = 0usize;
3284    while cur < wal_bytes.len() && applied < to_seq {
3285        let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
3286        cur += total;
3287        if sql_bytes.is_empty() {
3288            continue;
3289        }
3290        let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
3291            EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
3292                "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
3293            )))
3294        })?;
3295        // v7.21 — tx-commit records carry a multi-statement script;
3296        // split_statements is a no-op for single-statement records.
3297        for stmt in split_statements(sql) {
3298            engine.execute(stmt)?;
3299        }
3300        applied += 1;
3301    }
3302    let snapshot = engine.snapshot();
3303    std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
3304    Ok(applied)
3305}
3306
3307/// v7.7.6 — decode one WAL record from a byte tail. Returns
3308/// `(sql_bytes, header_plus_payload_len)`. Handles the three
3309/// on-disk formats (v1 / v2 / v3) the same way the CLI
3310/// `decode_one_record` and the engine's `replay_wal_bytes`
3311/// do. CRCs are not re-validated; the caller's intent is
3312/// "apply", not "validate".
3313fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
3314    if tail.len() < 4 {
3315        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3316            format!("WAL truncated record: {} < 4 header bytes", tail.len()),
3317        )));
3318    }
3319    let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
3320    let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
3321    let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
3322    let len_mask = if is_v3 {
3323        !(WAL_V2_SENTINEL | WAL_V3_FLAG)
3324    } else {
3325        !WAL_V2_SENTINEL
3326    };
3327    let rec_len = (raw_len & len_mask) as usize;
3328    let header_len = if is_v3 {
3329        9
3330    } else if is_v2 {
3331        8
3332    } else {
3333        4
3334    };
3335    if tail.len() < header_len + rec_len {
3336        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3337            format!(
3338                "WAL truncated record: header+payload {} > available {}",
3339                header_len + rec_len,
3340                tail.len()
3341            ),
3342        )));
3343    }
3344    if is_v3 {
3345        let type_byte = tail[8];
3346        // v3 type 0x01 = auto_commit_sql (payload = SQL).
3347        // v3 type 0x02 = durability marker (no SQL to apply).
3348        // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
3349        //                prefix between type and SQL — strip
3350        //                the prefix so the caller still sees raw
3351        //                SQL bytes.
3352        // Anything else is unknown.
3353        if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
3354            let payload = &tail[header_len..header_len + rec_len];
3355            return Ok((payload.to_vec(), header_len + rec_len));
3356        }
3357        if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL || type_byte == WAL_V4_TYPE_TX_COMMIT_SQL {
3358            let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
3359            if tail.len() < v4_total {
3360                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3361                    format!(
3362                        "WAL truncated v4 record: header+payload {v4_total} > available {}",
3363                        tail.len()
3364                    ),
3365                )));
3366            }
3367            let sql_start = header_len + WAL_V4_EXTRA_HEADER;
3368            let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
3369            return Ok((sql_bytes, v4_total));
3370        }
3371        // Caller treats empty payload as a skip-marker.
3372        return Ok((Vec::new(), header_len + rec_len));
3373    }
3374    let payload = &tail[header_len..header_len + rec_len];
3375    Ok((payload.to_vec(), header_len + rec_len))
3376}
3377
3378impl Drop for Database {
3379    fn drop(&mut self) {
3380        // v7.1 — best-effort final checkpoint when a persistent
3381        // Database leaves scope. Failures here go to stderr so
3382        // operators see them, but Drop can't propagate errors —
3383        // the WAL itself is already durable, so a checkpoint
3384        // miss only means the next boot replays a few more
3385        // records than strictly necessary.
3386        if self.persistence.is_some() {
3387            if let Err(e) = self.checkpoint() {
3388                eprintln!(
3389                    "spg-embedded: final checkpoint on Drop failed: {e:?} \
3390                     (WAL is intact; next open_path will replay)"
3391                );
3392            }
3393        }
3394        // v7.19 P3 / v7.20 — signal the retention + flusher
3395        // threads to exit, then wait for them. Done BEFORE the
3396        // lock release so background threads don't outlive the
3397        // database handle. The flusher drains the pending batch
3398        // on its way out (final flush_now in the thread body),
3399        // so `SPG_SYNCHRONOUS_COMMIT=off` never loses confirmed
3400        // commits across a clean shutdown.
3401        if let Some(ctx) = self.persistence.as_mut() {
3402            if let Some(shutdown) = ctx.retention_shutdown.take() {
3403                shutdown.store(true, Ordering::SeqCst);
3404            }
3405            if let Some(handle) = ctx.retention_thread.take() {
3406                let _ = handle.join();
3407            }
3408            if let Some(shutdown) = ctx.flusher_shutdown.take() {
3409                shutdown.store(true, Ordering::SeqCst);
3410            }
3411            if let Some(handle) = ctx.flusher_thread.take() {
3412                let _ = handle.join();
3413            }
3414            // CoW-2 (v7.34) — final checkpoint above left the worker
3415            // idle; explicitly drop it here so its shutdown signal +
3416            // thread join happens with a deterministic ordering (before
3417            // the lock release / persistence drop), not whenever Rust
3418            // happens to drop the PersistenceCtx fields.
3419            ctx.checkpoint_worker = None;
3420        }
3421        // v7.17.0 Phase 6.2 — release the cross-process lock on
3422        // clean shutdown. Failure is logged but never panics;
3423        // the operator can clear a stale lock via
3424        // `Database::force_unlock` if a crash kept the
3425        // directory around.
3426        if let Some(ctx) = &self.persistence
3427            && ctx.lock_path.exists()
3428        {
3429            // remove_dir_all: the lock dir carries the owner-pid
3430            // record since round-12.
3431            if let Err(e) = std::fs::remove_dir_all(&ctx.lock_path) {
3432                eprintln!(
3433                    "spg-embedded: lock release on Drop failed for {}: {e:?}",
3434                    ctx.lock_path.display()
3435                );
3436            }
3437        }
3438    }
3439}
3440
3441impl Database {
3442    /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
3443    /// Use when a previous process crashed mid-session and
3444    /// left `<db_path>.lock` behind. Operators should confirm
3445    /// no other process is currently using the database before
3446    /// calling this — SPG cannot fingerprint stale-vs-live
3447    /// without a libc dep, which would violate spg-embedded's
3448    /// zero-deps charter.
3449    pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
3450        let lock_path = {
3451            let mut p = db_path.as_ref().to_path_buf();
3452            let name = p
3453                .file_name()
3454                .map(|n| {
3455                    let mut s = n.to_os_string();
3456                    s.push(".lock");
3457                    s
3458                })
3459                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
3460            p.set_file_name(name);
3461            p
3462        };
3463        if !lock_path.exists() {
3464            return Ok(());
3465        }
3466        std::fs::remove_dir_all(&lock_path).map_err(io_err)
3467    }
3468}
3469
3470/// v7.1 — turn a `std::io::Error` into the workspace's
3471/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
3472/// the closest existing variant — io failures during boot or
3473/// during a WAL append surface as a storage-layer fault to
3474/// callers, which keeps the public error enum unchanged.
3475fn io_err(e: std::io::Error) -> EngineError {
3476    EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
3477}
3478
3479/// v7.2.2 — `Database` is `Send`, so the recommended sharing
3480/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
3481///
3482/// ```no_run
3483/// use std::sync::{Arc, Mutex};
3484/// use spg_embedded::Database;
3485///
3486/// let db = Database::open_in_memory();
3487/// let shared = Arc::new(Mutex::new(db));
3488/// let shared_for_worker = Arc::clone(&shared);
3489/// std::thread::spawn(move || {
3490///     let mut guard = shared_for_worker.lock().unwrap();
3491///     guard.execute("INSERT INTO t VALUES (1)").unwrap();
3492/// });
3493/// ```
3494///
3495/// Internal `RwLock`-wrapped state — letting many threads
3496/// hold concurrent `&Database` for `SELECT` without contending
3497/// — is parked as STABILITY § "Out of v7.2"; multi-reader
3498/// embedded throughput needs a planner-side change to release
3499/// the engine read lock between scans, which is the v7.x
3500/// "Choice A" line of work already documented in v6.9.1's
3501/// carve-out.
3502#[allow(dead_code)]
3503fn _database_is_send() {
3504    fn assert_send<T: Send>() {}
3505    assert_send::<Database>();
3506}
3507
/// v6.10.3 — conversion from one result row into a user
/// struct: implementors map the row's columns onto the struct's
/// fields. Since v7.3.0 the [`spg_row!`] declarative macro can
/// generate `impl FromSpgRow for YourStruct` from a struct
/// definition (no proc-macro, no syn/quote/proc-macro2 deps —
/// the workspace's "0 external deps" policy holds).
///
/// Failures are reported as `EngineError` so the caller's error
/// type stays uniform; the impls generated by [`spg_row!`] use
/// `EngineError::Unsupported` for both a missing column and a
/// value that cannot be converted.
pub trait FromSpgRow: Sized {
    /// Decode one query result row into `Self`. Called once per
    /// row by [`Database::query_typed`]. The slice length equals
    /// the number of columns in the SELECT projection.
    ///
    /// # Errors
    /// Returns an `EngineError` when the row cannot be mapped
    /// onto `Self`.
    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
}
3524
/// v7.3.0 — declarative macro that generates `FromSpgRow` impl
/// for a user struct. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
///
/// Notes on the generated code:
///
/// - The struct is re-emitted with `#[derive(Debug, Clone)]` added
///   to whatever attributes were written on it.
/// - Columns are decoded positionally, in field declaration order;
///   column names are not consulted. A row with fewer columns than
///   fields is an `EngineError::Unsupported` ("missing column");
///   columns beyond the last field are ignored.
/// - The comma after the last field is optional, as in an ordinary
///   struct definition.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // One shared cursor over the row: each field below takes
                // the next column, so declaration order == column order.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        // Per-cell decode; the short `&'static str`
                        // diagnostic is wrapped with struct + field name.
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                Ok(Self { $($field,)* })
            }
        }
    };
}
3601
/// v7.3.0 — per-column decoder used by `spg_row!`. The
/// implementations that follow cover the integer types (`i16`,
/// `i32`, `i64`), `f32`, `f64`, `bool`, `String` and `Vec<f32>`,
/// plus `Option<T>` for nullable columns (a `Value::Null` cell
/// decodes to `None`; every non-`Option` impl rejects NULL).
///
/// NOTE(review): earlier wording also listed "bytes" among the
/// covered `Value` variants; no bytes decoder is visible next to
/// the impls below — confirm whether one exists elsewhere.
pub trait FromSpgValue: Sized {
    /// Decode one cell into `Self`. The returned `&'static str`
    /// is a short diagnostic for type mismatches (e.g. `"expected
    /// integer, got TEXT"`); callers wrap it into their own
    /// error type.
    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
}
3612
3613macro_rules! impl_from_value_int {
3614    ($($t:ty),* $(,)?) => {
3615        $(
3616            impl FromSpgValue for $t {
3617                fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3618                    match v {
3619                        Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
3620                        Value::Int(n)      => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
3621                        Value::BigInt(n)   => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
3622                        Value::Null        => Err("NULL in non-Option int column"),
3623                        _ => Err("non-integer value in int column"),
3624                    }
3625                }
3626            }
3627        )*
3628    };
3629}
3630impl_from_value_int!(i16, i32, i64);
3631
3632impl FromSpgValue for f32 {
3633    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3634        match v {
3635            Value::Float(f) => Ok(*f as f32),
3636            Value::Null => Err("NULL in non-Option float column"),
3637            _ => Err("non-float value in float column"),
3638        }
3639    }
3640}
3641
3642impl FromSpgValue for f64 {
3643    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3644        match v {
3645            Value::Float(f) => Ok(*f),
3646            Value::Null => Err("NULL in non-Option float column"),
3647            _ => Err("non-float value in float column"),
3648        }
3649    }
3650}
3651
3652impl FromSpgValue for bool {
3653    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3654        match v {
3655            Value::Bool(b) => Ok(*b),
3656            Value::Null => Err("NULL in non-Option bool column"),
3657            _ => Err("non-bool value in bool column"),
3658        }
3659    }
3660}
3661
3662impl FromSpgValue for String {
3663    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3664        match v {
3665            Value::Text(s) => Ok(s.clone()),
3666            Value::Null => Err("NULL in non-Option text column"),
3667            _ => Err("non-text value in String column"),
3668        }
3669    }
3670}
3671
3672impl FromSpgValue for Vec<f32> {
3673    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3674        match v {
3675            Value::Vector(xs) => Ok(xs.clone()),
3676            Value::Null => Err("NULL in non-Option vector column"),
3677            _ => Err("non-vector value in Vec<f32> column"),
3678        }
3679    }
3680}
3681
3682impl<T: FromSpgValue> FromSpgValue for Option<T> {
3683    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3684        match v {
3685            Value::Null => Ok(None),
3686            other => T::from_spg_value(other).map(Some),
3687        }
3688    }
3689}
3690
// Context (describes `acquire_path_lock`, the consumer of this
// helper): the cross-process exclusion lock at `lock_path` is taken
// with an atomic `mkdir`, recording the owner pid inside. If the lock
// already exists, the recorded pid is read and probed for liveness — a
// lock left behind by a killed process (docker SIGKILL, crash) is
// reclaimed automatically instead of forcing the operator to delete it
// by hand (mailrs embed round-12: a restarted server came up in
// degraded mode because the previous instance's lock survived).

/// v7.27 (mailrs round-21 B) — the prober's environment identity:
/// `(hostname, boot-or-container id)`. A pid is only meaningful
/// inside the PID namespace that recorded it; mailrs's recovery
/// window saw "locked by pid 1" from a STOPPED container because
/// the prober's pid 1 (its own init) was alive. When the lock's
/// identity differs from ours, liveness is UNDECIDABLE and we
/// refuse honestly instead of guessing in either direction.
///
/// Returns `(hostname, boot_id)`. Either component is the empty
/// string when it cannot be determined (the `hostname` command cannot
/// be spawned; neither the Linux boot-id file nor the `sysctl`
/// fallback is available).
fn host_identity() -> (String, String) {
    // Trimmed, lossily-decoded stdout of a finished child process.
    let trimmed_stdout =
        |out: std::process::Output| String::from_utf8_lossy(&out.stdout).trim().to_string();

    let hostname = match std::process::Command::new("hostname").output() {
        Ok(out) => trimmed_stdout(out),
        Err(_) => String::new(),
    };

    // Linux boot id; containers share the host kernel's boot id, so
    // hostname (= container id by default) is the namespace
    // discriminator and boot id catches host reboots / pid reuse.
    // When the /proc file is unreadable, ask `sysctl` for
    // `kern.bootsessionuuid` instead.
    let boot_id = match std::fs::read_to_string("/proc/sys/kernel/random/boot_id") {
        Ok(raw) => raw.trim().to_string(),
        Err(_) => std::process::Command::new("sysctl")
            .args(["-n", "kern.bootsessionuuid"])
            .output()
            .map(trimmed_stdout)
            .unwrap_or_default(),
    };

    (hostname, boot_id)
}
3725
/// v7.34 (crash-recovery P0 #2) — start-time of process `pid`, used to
/// tell a reused pid apart from a genuinely-held lock.
///
/// Why it exists: in a container the lock holder is always pid 1, and
/// `docker start` reuses the container, so the NEW process is pid 1 too
/// on the same host + boot id. A bare `pid_alive(1)` probe (`ps -p 1`
/// always succeeds) would read a dead owner's lock as live and the
/// engine would self-deadlock on its own catalog. The
/// `(pid, start-time)` pair is unique per live process within a boot: a
/// reused pid carries a LATER start-time, so a mismatch means the
/// recorded owner is gone.
///
/// Linux: returns field 22 of `/proc/<pid>/stat` (clock ticks since
/// boot) as the raw text token, or `None` when the file cannot be read
/// or parsed. `comm` (field 2) is parenthesised and may contain spaces,
/// so fields are counted after the LAST `)`.
///
/// Other platforms: always `None`; the liveness check then falls back
/// to pid-alive + the self-pid reclaim. Pure std — no libc.
#[cfg(target_os = "linux")]
fn process_start_time(pid: u32) -> Option<String> {
    let stat_line = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
    let comm_close = stat_line.rfind(')')?;
    // Tokens after comm start at `state` (field 3), so field 22
    // (starttime) is the 20th token, i.e. zero-based index 19.
    let mut tail_fields = stat_line[comm_close + 1..].split_whitespace();
    tail_fields.nth(19).map(String::from)
}

/// Non-Linux fallback: no `/proc`, so no start-time is available.
#[cfg(not(target_os = "linux"))]
fn process_start_time(_pid: u32) -> Option<String> {
    None
}
3751
3752fn acquire_path_lock(lock_path: &Path) -> Result<(), EngineError> {
3753    for attempt in 0..2 {
3754        match std::fs::create_dir(lock_path) {
3755            Ok(()) => {
3756                // Best-effort owner record; liveness probing treats a
3757                // missing pid file as stale (crash between mkdir and
3758                // write is indistinguishable from an ancient lock).
3759                // v7.27 — lines 2+3 record the owner's environment
3760                // identity (hostname, boot id) so a prober in a
3761                // different namespace refuses instead of misreading
3762                // the pid. v7.34 — line 4 records the owner's process
3763                // start-time so a reused pid (container pid-1 restart)
3764                // is distinguishable from a live holder.
3765                let (host, boot) = host_identity();
3766                let start = process_start_time(std::process::id()).unwrap_or_default();
3767                let _ = std::fs::write(
3768                    lock_path.join("pid"),
3769                    format!("{}\n{host}\n{boot}\n{start}\n", std::process::id()),
3770                );
3771                return Ok(());
3772            }
3773            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists && attempt == 0 => {
3774                let record = std::fs::read_to_string(lock_path.join("pid")).unwrap_or_default();
3775                let mut lines = record.lines();
3776                let owner = lines.next().and_then(|s| s.trim().parse::<u32>().ok());
3777                let lock_host = lines.next().unwrap_or("").trim().to_string();
3778                let lock_boot = lines.next().unwrap_or("").trim().to_string();
3779                let lock_start = lines.next().unwrap_or("").trim().to_string();
3780                // v7.27 — identity check BEFORE the pid probe. A pid
3781                // recorded in another namespace is undecidable both
3782                // ways (a stale lock can look held, a held lock can
3783                // look stale — the unsafe direction). Old-format
3784                // locks (pid only) keep the legacy same-host
3785                // assumption.
3786                if !lock_host.is_empty() {
3787                    let (my_host, my_boot) = host_identity();
3788                    let same_env = lock_host == my_host
3789                        && (lock_boot.is_empty() || my_boot.is_empty() || lock_boot == my_boot);
3790                    if !same_env {
3791                        return Err(EngineError::Unsupported(format!(
3792                            "database lock {} was taken in a different host/container \
3793                             (owner: pid {} on {:?}; we are {:?}) — liveness is \
3794                             undecidable from here. If you are sure the owner is gone, \
3795                             call Database::force_unlock() or `spg import --force-unlock`.",
3796                            lock_path.display(),
3797                            owner.unwrap_or(0),
3798                            lock_host,
3799                            my_host
3800                        )));
3801                    }
3802                }
3803                // v7.34 (crash-recovery P0 #2) — pid-reuse-safe liveness.
3804                // A bare `pid_alive` self-deadlocks in a container: the
3805                // dead owner was pid 1, `docker start` reuses the container
3806                // so the prober is pid 1 too, and `ps -p 1` always succeeds.
3807                // The recorded (pid, start-time) pair settles it — the
3808                // owner is alive ONLY if its pid is alive AND its CURRENT
3809                // start-time still matches the recorded one:
3810                //  - container restart: pid 1 alive, but the new pid-1's
3811                //    start-time differs from the dead owner's → stale.
3812                //  - genuine double-open (same live process): start-time
3813                //    matches (it wrote it) → held — correctly refused, so a
3814                //    second writer can't steal a live lock.
3815                // An empty/uncomparable start-time (old-format lock or a
3816                // non-Linux owner with no /proc) falls back to the
3817                // pid-alive answer (the pre-v7.34 behaviour).
3818                let owner_alive = owner.is_some_and(|p| {
3819                    pid_alive(p)
3820                        && match process_start_time(p) {
3821                            Some(now) if !lock_start.is_empty() => now == lock_start,
3822                            _ => true,
3823                        }
3824                });
3825                if owner_alive {
3826                    return Err(EngineError::Unsupported(format!(
3827                        "database is locked by another process (pid {}): {}; \
3828                         stop that process first, or call Database::force_unlock()",
3829                        owner.unwrap_or(0),
3830                        lock_path.display()
3831                    )));
3832                }
3833                // Stale — owner pid dead, reused, or unrecorded. Reclaim.
3834                eprintln!(
3835                    "spg-embedded: reclaiming stale lock {} (owner pid {:?} not a live holder)",
3836                    lock_path.display(),
3837                    owner
3838                );
3839                std::fs::remove_dir_all(lock_path).map_err(io_err)?;
3840                // Loop retries the create_dir; a concurrent reclaimer
3841                // winning the race surfaces as AlreadyExists on
3842                // attempt 1 below.
3843            }
3844            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
3845                return Err(EngineError::Unsupported(format!(
3846                    "database is locked by another process: {}; \
3847                     stop that process first, or call Database::force_unlock()",
3848                    lock_path.display()
3849                )));
3850            }
3851            Err(e) => return Err(io_err(e)),
3852        }
3853    }
3854    unreachable!("acquire_path_lock loop covers both attempts")
3855}
3856
/// Probe whether `pid` is a live process.
///
/// Unix: runs the system `ps -p <pid>` binary with its output discarded
/// and reports its exit status (std-only — no libc dependency). `ps -p`
/// exits 0 for ANY live pid regardless of owner; `kill -0` was rejected
/// here because it fails with EPERM on another user's live process,
/// which would read as "dead" and reclaim a held lock.
///
/// Probe failure (no `ps` binary, exec error) conservatively reports
/// alive so locks are never auto-reclaimed on doubt; non-unix targets
/// do the same.
#[cfg(unix)]
fn pid_alive(pid: u32) -> bool {
    let mut probe = std::process::Command::new("ps");
    probe
        .arg("-p")
        .arg(pid.to_string())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());
    // Could not run the probe at all → assume alive.
    probe.status().map_or(true, |exit| exit.success())
}

/// Non-unix fallback: no probe available, so always report alive.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    true
}
3883
// Context (describes the sibling helper `strip_leading_sql_noise`):
// it strips leading whitespace, `--` line comments and NON-conditional
// block comments from a chunk so statement-head checks (COPY detection
// most notably) see the first real token. pg_dump prefixes every data
// block with a `-- Data for Name: …;` banner — which itself contains
// semicolons, so head checks must run on the stripped text. MySQL
// executable conditional comments (`/*!`) are content and stay.

/// v7.22 — see `split_statements`' `mysql_escapes` tracking. Updates
/// the stream-level string-dialect flag from one statement chunk:
///
/// - a chunk mentioning `sql_mode` (case-insensitive) turns
///   `*mysql_escapes` on;
/// - otherwise a chunk mentioning `standard_conforming_strings` sets
///   it to whether the chunk also contains `off`;
/// - any other chunk leaves the flag untouched.
///
/// Only short chunks are inspected (the signal statements are
/// one-liners; COPY data blocks are skipped by the length guard):
/// anything longer than 4096 bytes is ignored.
fn note_dialect_signals(chunk: &str, mysql_escapes: &mut bool) {
    const MAX_SIGNAL_LEN: usize = 4096;
    if chunk.len() <= MAX_SIGNAL_LEN {
        let folded = chunk.to_ascii_lowercase();
        let mentions = |needle: &str| folded.contains(needle);
        if mentions("sql_mode") {
            *mysql_escapes = true;
        } else if mentions("standard_conforming_strings") {
            *mysql_escapes = mentions("off");
        }
    }
}
3905
/// Strip leading whitespace, `--` line comments and NON-conditional
/// block comments from `s`, returning the suffix that starts at the
/// first real token (or `""` when nothing but noise remains).
///
/// Used for statement-head checks (COPY detection most notably):
/// pg_dump prefixes data blocks with a `-- Data for Name: …;` comment
/// banner, so the head must be inspected after stripping.
///
/// - MySQL executable conditional comments (`/*!…*/`) are content and
///   are returned untouched.
/// - Block comments nest, matching `split_statements`; the closer is
///   searched only AFTER the opening `/*`, so `/*/ … */` is one
///   comment rather than an empty one.
/// - An unterminated block comment, or a `--` comment with no newline
///   after it, consumes the rest of the input.
fn strip_leading_sql_noise(mut s: &str) -> &str {
    loop {
        let t = s.trim_start();
        if let Some(rest) = t.strip_prefix("--") {
            // Drop through end of line; no newline means the comment
            // runs to the end of the chunk.
            s = rest.split_once('\n').map_or("", |(_, r)| r);
            continue;
        }
        if t.starts_with("/*") && !t.starts_with("/*!") {
            let bytes = t.as_bytes();
            let mut depth = 1usize;
            // Start past the opener so its `*` cannot pair with a
            // following `/` as a bogus closer.
            let mut i = 2usize;
            while i < bytes.len() && depth > 0 {
                match (bytes[i], bytes.get(i + 1).copied()) {
                    (b'/', Some(b'*')) => {
                        depth += 1;
                        i += 2;
                    }
                    (b'*', Some(b'/')) => {
                        depth -= 1;
                        i += 2;
                    }
                    _ => i += 1,
                }
            }
            if depth > 0 {
                return "";
            }
            // `i` sits just past an ASCII `*/`, so it is a char boundary.
            s = &t[i..];
            continue;
        }
        return t;
    }
}
3925
/// Split a multi-statement SQL script into individual statements on
/// top-level `;`, honouring single-quoted strings (with `''`
/// escapes), double-quoted identifiers, dollar-quoted bodies
/// (`$tag$ … $tag$`), line comments (`--`) and MySQL executable
/// conditional comments (`/*!… */` stay statement content; plain
/// nested block comments don't). Chunks that contain no statement
/// content (whitespace / comments only) are dropped. PG's
/// simple-query protocol does this server-side; the embed path owns
/// it here.
///
/// v7.22 (mailrs round-13 gap 1) — psql meta-command lines are
/// dropped for client parity: a line whose first non-whitespace
/// byte is `\` BETWEEN statements (PG 18's pg_dump wraps scripts in
/// `\restrict` / `\unrestrict`) never reaches the parser, the same
/// way psql consumes `\`-lines client-side and never sends them. A
/// mid-statement backslash stays an ordinary byte — pg_dump only
/// emits meta-commands between statements.
///
/// # Returns
///
/// Borrowed sub-slices of `sql`, in script order. Each chunk excludes
/// its terminating `;` and is NOT trimmed: leading whitespace and
/// comments that precede the statement stay in the slice. A
/// `COPY … FROM stdin;` statement is returned as ONE chunk that
/// includes its `;` and the following data lines, up to (not
/// including) the `\.` terminator line or end of input. A trailing
/// statement without `;` is returned as well.
pub fn split_statements(sql: &str) -> Vec<&str> {
    let bytes = sql.as_bytes();
    let mut stmts = Vec::new();
    // `start` = byte offset where the current chunk begins;
    // `has_content` = the chunk has seen something other than
    // whitespace / plain comments / meta-command lines.
    let mut start = 0usize;
    let mut has_content = false;
    // v7.22 (round-13 T3) — stream-tracked string dialect, mirroring
    // the engine's session flag: a statement mentioning `sql_mode`
    // (mysqldump preamble, often inside `/*!…*/`) switches plain
    // strings to backslash-escape scanning;
    // `standard_conforming_strings` (pg_dump preamble) switches
    // back. Without this the scanner ends a MySQL `'…\'…'` literal
    // early and splits inside data.
    let mut mysql_escapes = false;
    let mut i = 0usize;
    // Every arm leaves `i` on the LAST byte it consumed and relies on
    // the shared `i += 1` at the bottom — except arms that `continue`,
    // which have already positioned `i` on the next byte to examine.
    while i < bytes.len() {
        match bytes[i] {
            b'\\' if !has_content => {
                // Start-of-statement `\` = psql meta-command line.
                // Consume through end-of-line; restart the chunk
                // after it so the line never lands in the output.
                while i < bytes.len() && bytes[i] != b'\n' {
                    i += 1;
                }
                start = if i < bytes.len() { i + 1 } else { i };
            }
            b'\'' => {
                has_content = true;
                // PG escape-string form `E'...'` honours backslash
                // escapes (`E'a\';b'` is ONE literal) — detect via
                // the immediately-preceding standalone E/e. MySQL
                // dialect sessions treat EVERY plain string that way.
                let escape_string = mysql_escapes
                    || (i >= 1
                        && matches!(bytes[i - 1], b'e' | b'E')
                        && !(i >= 2
                            && (bytes[i - 2].is_ascii_alphanumeric() || bytes[i - 2] == b'_')));
                i += 1;
                // Scan to the closing quote; an unterminated literal
                // simply runs to end of input.
                while i < bytes.len() {
                    if escape_string && bytes[i] == b'\\' {
                        // Skip the escaped byte (covers \' and \\).
                        i += 2;
                        continue;
                    }
                    if bytes[i] == b'\'' {
                        // `''` is an escaped quote inside the literal.
                        if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
                            i += 2;
                            continue;
                        }
                        break;
                    }
                    i += 1;
                }
            }
            b'"' => {
                has_content = true;
                i += 1;
                // A doubled `""` inside an identifier ends this scan
                // and immediately re-enters this arm on the second
                // quote, so it needs no special case.
                while i < bytes.len() && bytes[i] != b'"' {
                    i += 1;
                }
            }
            b'$' => {
                // Possible dollar-quote opener `$tag$` (tag may be
                // empty). If the shape doesn't match, it's a plain
                // `$` (positional param) — fall through.
                let tag_end = bytes[i + 1..]
                    .iter()
                    .position(|&b| !(b.is_ascii_alphanumeric() || b == b'_'))
                    .map(|off| i + 1 + off);
                if let Some(te) = tag_end
                    && te < bytes.len()
                    && bytes[te] == b'$'
                {
                    has_content = true;
                    // `tag` includes both `$` delimiters.
                    let tag = &sql[i..=te];
                    // Find the closing `$tag$`.
                    if let Some(close) = sql[te + 1..].find(tag) {
                        i = te + 1 + close + tag.len();
                        continue;
                    }
                    // Unterminated — consume the rest; the parser
                    // will report it.
                    i = bytes.len();
                    continue;
                }
                has_content = true;
            }
            b'-' if i + 1 < bytes.len() && bytes[i + 1] == b'-' => {
                // Line comment: skip to the newline without marking
                // the chunk as having content.
                while i < bytes.len() && bytes[i] != b'\n' {
                    i += 1;
                }
            }
            b'/' if i + 1 < bytes.len() && bytes[i + 1] == b'*' => {
                // v7.22 (round-13 T3) — MySQL conditional comments
                // `/*!40101 … */` are EXECUTABLE (mysqldump wraps
                // its whole preamble + DISABLE KEYS hints in them);
                // they must stay statement content for the engine,
                // not be skipped as commentary.
                if i + 2 < bytes.len() && bytes[i + 2] == b'!' {
                    has_content = true;
                }
                // Skip to the matching `*/`, counting nested openers.
                let mut depth = 1usize;
                i += 2;
                while i < bytes.len() && depth > 0 {
                    if bytes[i] == b'/' && i + 1 < bytes.len() && bytes[i + 1] == b'*' {
                        depth += 1;
                        i += 2;
                    } else if bytes[i] == b'*' && i + 1 < bytes.len() && bytes[i + 1] == b'/' {
                        depth -= 1;
                        i += 2;
                    } else {
                        i += 1;
                    }
                }
                continue;
            }
            b';' => {
                if has_content {
                    let head = &sql[start..i];
                    // v7.22 (round-13 T2) — a `COPY … FROM stdin;`
                    // statement owns its following data block
                    // through the `\.` terminator line (data lines
                    // may contain `;`, so generic splitting would
                    // shred them). Swallow head + data into ONE
                    // chunk; `execute_script` lowers it to INSERTs.
                    // pg_dump prefixes the COPY with a comment
                    // banner — strip it before the head check.
                    let head_clean = strip_leading_sql_noise(head);
                    // Cheap 4-byte prefix test first; the engine's
                    // head parser is only consulted for COPY-looking
                    // statements.
                    let is_copy_head = head_clean
                        .get(..4)
                        .is_some_and(|p| p.eq_ignore_ascii_case("copy"))
                        && spg_engine::copy::parse_copy_from_stdin_head(head_clean).is_some();
                    if is_copy_head {
                        // Scan whole lines after the ';' until the
                        // `\.` terminator (or EOF — torn dumps lose
                        // their tail, same as psql would error).
                        let mut j = i + 1;
                        let data_end;
                        loop {
                            if j >= bytes.len() {
                                data_end = bytes.len();
                                break;
                            }
                            let line_end = sql[j..].find('\n').map_or(bytes.len(), |off| j + off);
                            if sql[j..line_end].trim_end_matches('\r').trim() == "\\." {
                                data_end = j;
                                i = line_end; // bottom i += 1 skips \n
                                break;
                            }
                            j = line_end + 1;
                        }
                        stmts.push(&sql[start..data_end]);
                        if data_end == bytes.len() {
                            i = bytes.len();
                        }
                        // `start` may end up past the end of input
                        // here; it is only sliced again once
                        // `has_content` turns true, which cannot
                        // happen after the loop has run out of bytes.
                        start = i + 1;
                        has_content = false;
                        i += 1;
                        continue;
                    }
                    note_dialect_signals(head, &mut mysql_escapes);
                    stmts.push(head);
                }
                start = i + 1;
                has_content = false;
            }
            b => {
                if !b.is_ascii_whitespace() {
                    has_content = true;
                }
            }
        }
        i += 1;
    }
    // Trailing statement with no terminating `;`.
    if has_content {
        stmts.push(&sql[start..]);
    }
    stmts
}
4122
// Unit tests: the statement splitter's lexical rules, plus smoke tests
// for the in-memory `Database` API and the `FromSpgRow` trait shape.
#[cfg(test)]
mod tests {
    use super::*;

    // Chunks keep their leading whitespace and lose the `;`; chunks
    // with no statement content are not emitted.
    #[test]
    fn split_statements_basic_and_trailing() {
        assert_eq!(
            split_statements("CREATE TABLE a (x INT); INSERT INTO a VALUES (1)"),
            vec!["CREATE TABLE a (x INT)", " INSERT INTO a VALUES (1)"]
        );
        // whitespace/comment-only chunks drop
        assert!(split_statements("  ;; -- nothing\n;").is_empty());
    }

    // Every quoting form must hide an embedded `;` from the splitter.
    #[test]
    fn split_statements_quoting_forms() {
        // ';' inside a plain literal, a doubled quote, an E-string
        // backslash escape, a quoted identifier, and a dollar-quoted
        // body must not split.
        let cases = [
            "INSERT INTO t VALUES ('a;b')",
            "INSERT INTO t VALUES ('it''s; fine')",
            r"INSERT INTO t VALUES (E'it\'s; fine')",
            "CREATE TABLE \"odd;name\" (x INT)",
            "DO $body$ BEGIN PERFORM 1; END $body$",
            "DO $$ SELECT 1; $$",
        ];
        for sql in cases {
            assert_eq!(split_statements(sql), vec![sql], "must stay whole: {sql}");
        }
        // ...and each still splits cleanly from a neighbour.
        for sql in cases {
            let script = format!("{sql};\nSELECT 2");
            assert_eq!(
                split_statements(&script),
                vec![sql, "\nSELECT 2"],
                "must split after: {sql}"
            );
        }
    }

    // `\`-lines between statements are dropped (including a stray
    // `\.`), and the chunk restarts after them.
    #[test]
    fn split_statements_drops_psql_meta_lines() {
        // v7.22 round-13 gap 1 — PG 18 pg_dump wraps scripts in
        // `\restrict` / `\unrestrict`; psql parity = the lines never
        // reach the parser.
        let script = "\\restrict TOKEN123\nSELECT 1;\n\\unrestrict TOKEN123\nSELECT 2;\n\\.\n";
        assert_eq!(split_statements(script), vec!["SELECT 1", "SELECT 2"]);
        // Mid-statement backslash is NOT a meta-command.
        let s2 = r"SELECT E'a\\b'";
        assert_eq!(split_statements(s2), vec![s2]);
    }

    // `;` inside a line comment or a (nested) block comment never
    // splits.
    #[test]
    fn split_statements_comments_hide_semicolons() {
        let script = "-- c1 ; still comment\nSELECT 1; /* a ; b /* nested ; */ */ SELECT 2";
        let got = split_statements(script);
        assert_eq!(got.len(), 2);
        assert!(got[0].contains("SELECT 1"));
        assert!(got[1].contains("SELECT 2"));
    }

    // End-to-end smoke test: DDL, two inserts, filtered select.
    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL, name TEXT)")
            .unwrap();
        db.execute("INSERT INTO t VALUES (1, 'alice')").unwrap();
        db.execute("INSERT INTO t VALUES (2, 'bob')").unwrap();
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(1) => {}
            other => panic!("expected Int(1), got {other:?}"),
        }
    }

    // `query()` is for row-returning statements only.
    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        let r = db.query("INSERT INTO t VALUES (1)");
        assert!(r.is_err(), "query() on INSERT must error");
    }

    // A snapshot restored into a fresh database exposes the same row.
    #[test]
    fn snapshot_roundtrip() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        db.execute("INSERT INTO t VALUES (42)").unwrap();
        let bytes = db.snapshot();
        let mut restored = Database::restore(&bytes).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(42) => {}
            other => panic!("expected Int(42), got {other:?}"),
        }
    }

    // Compile-level check that `FromSpgRow` can be implemented by hand
    // and called on a plain `&[Value]` slice.
    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                match row.first() {
                    Some(Value::Int(n)) => Ok(Self { _id: *n }),
                    _ => Err(EngineError::Unsupported("bad id".into())),
                }
            }
        }
        let row = vec![Value::Int(7)];
        let _u = User::from_spg_row(&row).unwrap();
    }
}