Skip to main content

spg_embedded/
lib.rs

1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//!     println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//!   crash-consistent WAL + periodic checkpoint snapshot. The
32//!   on-disk format is byte-identical to what `spg-server`
33//!   produces, so a database can move between modes without
34//!   conversion.
35//! - **Durability**: every `execute()` that mutates calls
36//!   `fsync` before returning `Ok`. There is no group commit
37//!   in embedded mode — every commit pays one fsync. If you
38//!   need batch throughput, wrap multiple statements in
39//!   [`Database::with_transaction`] which fsyncs only at
40//!   commit.
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//!   Share across threads via `Arc<Mutex<Database>>`. The
43//!   single-writer model is intentional — see
44//!   [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//!   moves cold rows to disk-resident segments while you keep
47//!   serving requests. It runs in a dedicated thread; drop the
48//!   returned [`FreezerHandle`] (or call `stop()`) for clean
49//!   shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//!   [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//!   them with a wildcard arm so future v7.x releases can add
53//!   variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//!   Malformed SQL, type mismatches, missing tables — all
59//!   return `Err(EngineError::…)`. If you observe a panic on
60//!   a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//!   violations (e.g., catalog snapshot magic mismatch, WAL
63//!   record CRC sentinel corruption that survived the boot-
64//!   time validation). These represent silent disk corruption
65//!   and an unwind would leak inconsistent state, so the
66//!   release profile uses `panic = abort` — your host process
67//!   dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//!   `--profile release-dbg` (keeps unwind tables) and use
70//!   `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
82/// v7.16.0 — handle for a parsed-and-planned SQL statement.
83/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
84/// with a `&[Value]` slice carrying the bind parameters (PG-style
85/// `$1`, `$2`, … positional). Cheap to `Clone`; the underlying AST
86/// is shared by handle copies and cloned per bind call by the
87/// engine's executor.
88///
89/// The handle holds a snapshot of the AST at prepare time. If
90/// the engine's plan cache evicts the entry between prepare and
91/// execute (e.g. ANALYZE bumps the statistics version) the
92/// stored AST keeps working — `execute_prepared` operates on
93/// the handle's clone, not the cache entry.
94#[derive(Debug, Clone)]
95pub struct Statement {
96    /// The parsed + planned AST. `spg-engine::prepare_cached`
97    /// returns it as a clone of the cached plan, so any rewrite
98    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
99    /// already run.
100    pub(crate) stmt: ParsedStatement,
101    /// Original SQL source, kept for `Display` / debug only.
102    /// WAL persistence renders from the AST so a bind-time
103    /// rewrite of `$1..$N` survives replay.
104    pub(crate) sql: String,
105}
106
107impl Statement {
108    /// Borrow the original SQL source — useful for tracing and
109    /// debug logs. WAL replay does NOT use this; it serialises
110    /// the bind-final AST instead.
111    #[must_use]
112    pub fn sql(&self) -> &str {
113        &self.sql
114    }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127    let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::Write;
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Condvar, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
139/// v7.11.3 — wall-clock provider injected into every embedded
140/// `Engine`. Microseconds since the Unix epoch; clamps to
141/// `i64::MAX` if the system clock is far-future. Used by SQL's
142/// `NOW()` / `CURRENT_TIMESTAMP` / `CURRENT_DATE` rewrite layer
143/// so PG-idiomatic time queries work without the caller wiring
144/// their own clock.
145/// v7.36 (mailrs ask #4) — flatten an `EXPLAIN` QueryResult into
146/// the QUERY PLAN string lines. `EXPLAIN` always returns a single-
147/// column TEXT table; anything else is treated as no plan output.
148fn extract_query_plan_lines(result: QueryResult) -> Vec<String> {
149    match result {
150        QueryResult::Rows { rows, .. } => rows
151            .into_iter()
152            .filter_map(|r| {
153                r.values.into_iter().next().and_then(|v| match v {
154                    Value::Text(s) => Some(s),
155                    _ => None,
156                })
157            })
158            .collect(),
159        _ => Vec::new(),
160    }
161}
162
163/// v7.37.2 — auto-warm the OS page cache for cold-tier segments at
164/// `open_path` / `restore` time. Per the zero-customer-change rule
165/// the client never calls `warm_up_cold_tier()` from app code; the
166/// catalog is server-ready when its constructor returns.
167///
168/// Budget controls:
169/// * `SPG_WARM_UP_COLD_BUDGET_MS=N` — stop warming after N ms
170///   wall-clock (best-effort; granularity is per-table). Unset =
171///   no cap.
172/// * `SPG_WARM_UP_COLD_BUDGET_MS=0` — skip warm-up entirely (escape
173///   hatch for ops that need fast restart even at the cost of the
174///   first-query cold spike).
175fn autowarm_cold_tier_on_open(db: &Database) {
176    let budget_ms = std::env::var("SPG_WARM_UP_COLD_BUDGET_MS")
177        .ok()
178        .and_then(|s| s.parse::<u64>().ok());
179    if let Some(0) = budget_ms {
180        return;
181    }
182    let start = std::time::Instant::now();
183    let touched = db.warm_up_cold_tier();
184    let elapsed_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
185    let over_budget = budget_ms.is_some_and(|b| elapsed_ms > b);
186    let _ = (touched, over_budget); // future: tracing::info
187}
188
/// v7.11.3 — wall-clock provider injected into every embedded
/// `Engine`, used by SQL's `NOW()` / `CURRENT_TIMESTAMP` /
/// `CURRENT_DATE` rewrite layer so PG-idiomatic time queries work
/// without the caller wiring their own clock.
///
/// Returns microseconds since the Unix epoch. The value clamps to
/// `i64::MAX` when the microsecond count does not fit in an `i64`
/// (far-future clock) and is `0` when the system clock reads earlier
/// than the epoch.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
194
195use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
196
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Private on purpose, so callers cannot mis-frame records. The v3
// layout matches the server's, which means a `spg-server` boot can
// read a database written by an embedded process and vice versa.

/// Top bit of the record header word.
const WAL_V2_SENTINEL: u32 = 1 << 31;
/// Second-highest bit of the record header word; OR-ed into the
/// length word together with [`WAL_V2_SENTINEL`] for v3 records.
const WAL_V3_FLAG: u32 = 1 << 30;
/// Type byte of a v3 auto-commit SQL record.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v7.18 — durability checkpoint marker stays at 0x02 (skipped on replay).
const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v7.18 PITR — auto-commit SQL record with appended (commit_lsn,
/// commit_unix_us) fields so that replay can target a specific point
/// in time. Backward compatible: v3 records (type 0x01) keep working
/// and the envelope flag bits are unchanged; the new type byte is the
/// schema-version discriminator.
const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
/// v7.18 — "no wall clock" sentinel for the commit_unix_us slot of a
/// v4 record. Restore-to-timestamp skips records carrying it (they
/// have no time anchor); LSN-based restore is unaffected.
const WAL_V4_NO_CLOCK: i64 = i64::MIN;
/// v7.18 — extra header bytes that follow the type byte in a v4
/// record: commit_lsn (u64 LE) then commit_unix_us (i64 LE), 16 bytes
/// in total.
const WAL_V4_EXTRA_HEADER: usize = core::mem::size_of::<u64>() + core::mem::size_of::<i64>();
/// v7.18 PITR — checkpoint anchor record, written to the WAL *before*
/// the snapshot file replaces the on-disk catalog. It carries the
/// (lsn, ts, snapshot_path) triple so restore tooling can locate the
/// matching base snapshot without scanning the filesystem. Replay
/// dispatch skips it, just like the v3 durability marker.
const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;

/// v7.21 (mailrs embed round-12 polish) — one COMMITted explicit
/// transaction, flushed atomically at COMMIT time. The payload is the
/// transaction's bind-final mutation statements joined with `";\n"`;
/// replay re-splits it via [`split_statements`] and applies the
/// statements in order. It uses the same 16-byte (commit_lsn,
/// commit_unix_us) prefix as the v4 auto-commit record and is
/// CRC-framed like every other record, so replay applies either the
/// whole transaction or — on a torn tail — none of it; a transaction
/// can never half-resurrect.
///
/// Why it exists: in-transaction mutations only touch the engine's
/// shadow catalog (`modified_catalog: false`), so the per-statement
/// auto-commit append never fired, and a COMMIT followed by a crash
/// (no graceful Drop checkpoint) lost the transaction.
const WAL_V4_TYPE_TX_COMMIT_SQL: u8 = 0x12;

/// v7.34 (crash-recovery P0 #2) — row-level physical redo record. It
/// shares the v4 envelope (lsn + ts + payload + CRC), but the payload
/// is `encode_redo_log` bytes rather than SQL. Replay applies the
/// physical [`RowChange`]s through `Engine::apply_redo` instead of
/// re-executing statements — O(changed rows) rather than the
/// O(records × catalog_rows) statement replay that hung the mailrs P0.
const WAL_V5_TYPE_ROW_REDO: u8 = 0x13;
248
/// v7.34 (crash-recovery P0 #2) — opt-in switch for row-level redo WAL
/// records.
///
/// Default OFF during bringup. Setting `SPG_WAL_ROW_REDO` to `1` or to
/// `true` (any ASCII case) makes mutating statements log physical
/// changes (0x13) instead of SQL, so crash recovery applies them in
/// O(changed rows) rather than re-executing in
/// O(records × catalog_rows) (the superlinear replay hang root-caused
/// on the mailrs P0). DDL still logs as SQL (hybrid log). When this
/// returns `true`, `open_path` arms the engine's redo capture.
///
/// Any other value, an unset variable, or a non-Unicode value yields
/// `false`.
fn row_redo_enabled() -> bool {
    match std::env::var("SPG_WAL_ROW_REDO") {
        Ok(flag) => flag == "1" || flag.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
265
/// v7.1 — auto-checkpoint threshold in bytes. Once the WAL grows past
/// this size, the next successful `execute()` call ends with a
/// `checkpoint()` so the WAL stays bounded.
///
/// Reads `SPG_EMBEDDED_CHECKPOINT_BYTES`; the value must parse as a
/// `u64` greater than zero (no whitespace trimming is applied).
/// Otherwise — unset, unparsable, or `0` — the default of 4 MiB is
/// returned.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;
    match std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(bytes) if bytes > 0 => bytes,
            _ => FALLBACK_BYTES,
        },
        Err(_) => FALLBACK_BYTES,
    }
}
273
274/// v7.30.3 (mailrs round-26) — per-query byte budget on join/filter
275/// materialisation, default ON at 256 MiB for embed parity with the
276/// server's allocator-level `SPG_MAX_QUERY_BYTES` default. A fat
277/// backfill batch (1000 × full mail bodies) then errors with
278/// `QueryBytesExceeded` instead of walking the host into reclaim
279/// livelock. `SPG_MAX_QUERY_BYTES=0` disables; any other value
280/// overrides. NOT applied to the WAL-replay engine — replay must
281/// never fail on a tuning knob.
282fn engine_with_query_byte_budget(engine: Engine) -> Engine {
283    const DEFAULT_MAX_QUERY_BYTES: usize = 256 * 1024 * 1024;
284    match std::env::var("SPG_MAX_QUERY_BYTES")
285        .ok()
286        .and_then(|s| s.trim().parse::<usize>().ok())
287    {
288        Some(0) => engine,
289        Some(n) => engine.with_max_query_bytes(n),
290        None => engine.with_max_query_bytes(DEFAULT_MAX_QUERY_BYTES),
291    }
292}
293
294/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
295///
296/// ```text
297/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
298/// [u32 LE crc32 over (type_byte || sql_bytes)]
299/// [u8 type = 0x01]
300/// [sql bytes]
301/// ```
302fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
303    let payload = sql.as_bytes();
304    let mut crc_buf = Vec::with_capacity(1 + payload.len());
305    crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
306    crc_buf.extend_from_slice(payload);
307    let crc = spg_crypto::crc32::crc32(&crc_buf);
308    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
309    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
310    out.extend_from_slice(&header);
311    out.extend_from_slice(&crc.to_le_bytes());
312    out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
313    out.extend_from_slice(payload);
314    out
315}
316
317/// v7.20 P2 — WAL group-commit. N concurrent commits share one
318/// fsync (the 4.2 ms p50 that profile_breakdown measured as
319/// 99.2% of the durable write path).
320///
321/// Leader-follower protocol, same family as PG's group commit:
322///
323/// 1. `enqueue(record)` — called while the caller still holds
324///    the engine's write lock. Appends the encoded record to the
325///    shared buffer, returns a sequence ticket. O(memcpy).
326/// 2. Caller RELEASES the engine write lock (the next writer's
327///    mutation proceeds in parallel with this batch's fsync).
328/// 3. `wait_flushed(seq)` — if nobody is flushing, the caller
329///    elects itself leader: swaps the buffer out, writes +
330///    fsyncs ONCE for every record in the batch, marks the
331///    batch durable, wakes all followers. Otherwise it parks on
332///    the condvar until a leader covers its seq.
333///
334/// Durability contract is unchanged from v7.19: `execute()`
335/// does not return Ok until the record that describes its
336/// mutation is fsynced. The only change is N callers sharing
337/// one fsync instead of paying one each.
338///
339/// Lock order (deadlock-free): `state` then `file`; never the
340/// reverse. The leader holds `file` WITHOUT `state` during IO so
341/// enqueues continue while fsync runs.
342#[derive(Debug)]
343struct WalGroup {
344    state: Mutex<WalGroupState>,
345    cond: std::sync::Condvar,
346    /// Active chunk file handle. Separate lock from `state` so
347    /// the leader's write+fsync doesn't block concurrent
348    /// enqueues. Swapped by `checkpoint()` at rotation.
349    file: Mutex<File>,
350}
351
/// Mutable bookkeeping of a [`WalGroup`], guarded by its `state`
/// mutex.
#[derive(Debug)]
struct WalGroupState {
    /// Encoded records that have been enqueued but not yet handed to a
    /// flush.
    buf: Vec<u8>,
    /// Monotonic enqueue counter; the first record gets seq 1.
    enqueued_seq: u64,
    /// Highest seq whose record has been fsynced.
    flushed_seq: u64,
    /// Set while some caller is inside the leader IO section.
    leader_active: bool,
    /// Sticky fatal error: a failed fsync poisons the WAL (loud, never
    /// silent). Every current and future waiter gets an error.
    failed: Option<String>,
    /// Bytes written to the active chunk since the last rotation —
    /// this is what drives the auto-checkpoint trigger.
    written_len: u64,
}
369
370/// Ticket returned by the buffered write path; `wait()` blocks
371/// until the record it covers is durable (or the WAL is
372/// poisoned). Cheap to move across threads.
373#[derive(Debug)]
374pub struct WalTicket {
375    group: Arc<WalGroup>,
376    seq: u64,
377}
378
379/// v7.34 (crash-recovery P0 #2) — RAII reset for the WalGroup leader
380/// flag. Electing a leader sets `leader_active = true` and releases the
381/// state lock for the sleep+IO window; if a panic unwinds through that
382/// window the flag would stay true and every follower would park forever
383/// on the condvar — no one left to flush or wake them, the same
384/// total-write hang an unclean stop causes, but self-inflicted. This
385/// guard clears the flag and wakes the followers (so one re-elects) on
386/// ANY drop, including a panic unwind; the normal path disarms it after
387/// resetting the flag itself.
388struct LeaderGuard<'a> {
389    group: &'a WalGroup,
390    armed: bool,
391}
392
393impl Drop for LeaderGuard<'_> {
394    fn drop(&mut self) {
395        if self.armed {
396            let mut g = self.group.state.lock().unwrap_or_else(|e| e.into_inner());
397            g.leader_active = false;
398            drop(g);
399            self.group.cond.notify_all();
400        }
401    }
402}
403
404impl WalGroup {
405    fn new(file: File, initial_len: u64) -> Self {
406        Self {
407            state: Mutex::new(WalGroupState {
408                buf: Vec::new(),
409                enqueued_seq: 0,
410                flushed_seq: 0,
411                leader_active: false,
412                failed: None,
413                written_len: initial_len,
414            }),
415            cond: std::sync::Condvar::new(),
416            file: Mutex::new(file),
417        }
418    }
419
420    /// Append `record` to the pending batch. Returns the seq the
421    /// caller must wait on. Called under the engine write lock —
422    /// keep it O(memcpy).
423    fn enqueue(&self, record: &[u8]) -> u64 {
424        let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
425        g.buf.extend_from_slice(record);
426        g.enqueued_seq += 1;
427        g.enqueued_seq
428    }
429
430    /// Block until `seq` is durable. Leader-follower: the first
431    /// arriving waiter flushes for everyone.
432    fn wait_flushed(&self, seq: u64) -> Result<(), EngineError> {
433        let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
434        loop {
435            if let Some(e) = &g.failed {
436                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
437                    format!("WAL poisoned by earlier flush failure: {e}"),
438                )));
439            }
440            if g.flushed_seq >= seq {
441                return Ok(());
442            }
443            if !g.leader_active {
444                // Elect self leader.
445                g.leader_active = true;
446                drop(g);
447                // v7.34 — panic-safety: if anything below unwinds before
448                // `leader_active` is reset, this guard releases it +
449                // wakes a follower to re-elect (else all writers park
450                // forever). Disarmed on the normal path after the reset.
451                let mut leader_guard = LeaderGuard {
452                    group: self,
453                    armed: true,
454                };
455                // v7.20 — commit_delay (PG's same-named knob):
456                // before taking the batch, give in-flight
457                // writers a short window to enqueue so the
458                // shared fsync covers more commits. 150 µs costs
459                // ~3.5% on a solo 4.2 ms fsync but multiplies
460                // batch size under load. Tunable via
461                // SPG_COMMIT_DELAY_US (0 disables).
462                let delay = commit_delay_us();
463                if delay > 0 {
464                    std::thread::sleep(std::time::Duration::from_micros(delay));
465                }
466                let (batch, flush_to) = {
467                    let mut g2 = self.state.lock().unwrap_or_else(|e| e.into_inner());
468                    (core::mem::take(&mut g2.buf), g2.enqueued_seq)
469                };
470                let io_result: std::io::Result<()> = (|| {
471                    let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
472                    f.write_all(&batch)?;
473                    f.sync_data()
474                })();
475                g = self.state.lock().unwrap_or_else(|e| e.into_inner());
476                g.leader_active = false;
477                leader_guard.armed = false; // normal completion — disarm
478                match io_result {
479                    Ok(()) => {
480                        g.flushed_seq = flush_to;
481                        g.written_len = g.written_len.saturating_add(batch.len() as u64);
482                    }
483                    Err(e) => {
484                        g.failed = Some(e.to_string());
485                    }
486                }
487                self.cond.notify_all();
488                //
489
490                // Loop continues: either our seq is now covered
491                // (leader path normally returns next iteration)
492                // or the error branch surfaces.
493                continue;
494            }
495            g = self.cond.wait(g).unwrap_or_else(|e| e.into_inner());
496        }
497    }
498
499    /// Drain the pending batch + flush synchronously. Caller must
500    /// guarantee no concurrent enqueues (checkpoint holds the
501    /// engine exclusively). Used before rotation so the marker
502    /// lands in the right chunk.
503    fn flush_now(&self) -> Result<(), EngineError> {
504        let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
505        if let Some(e) = &g.failed {
506            return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
507                format!("WAL poisoned: {e}"),
508            )));
509        }
510        let batch = core::mem::take(&mut g.buf);
511        let flush_to = g.enqueued_seq;
512        if batch.is_empty() {
513            return Ok(());
514        }
515        drop(g);
516        let io: std::io::Result<()> = (|| {
517            let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
518            f.write_all(&batch)?;
519            f.sync_data()
520        })();
521        let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
522        match io {
523            Ok(()) => {
524                g.flushed_seq = flush_to;
525                g.written_len = g.written_len.saturating_add(batch.len() as u64);
526                self.cond.notify_all();
527                Ok(())
528            }
529            Err(e) => {
530                g.failed = Some(e.to_string());
531                self.cond.notify_all();
532                Err(io_err(e))
533            }
534        }
535    }
536
537    /// Swap the active chunk handle (rotation). Caller flushes
538    /// first; both locks taken in canonical order.
539    fn rotate_file(&self, new_file: File) {
540        let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
541        let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
542        *f = new_file;
543        g.written_len = 0;
544    }
545
546    fn written_len(&self) -> u64 {
547        let g = self.state.lock().unwrap_or_else(|e| e.into_inner());
548        g.written_len + g.buf.len() as u64
549    }
550}
551
552// ─────────────────────────────────────────────────────────────────────────────
553// CoW-2 (v7.34) — background-checkpoint worker.
554//
555// Splits checkpoint into two halves so the front-end pays only the cheap one:
556//   • Capture (`Database::snapshot_checkpoint_job`) — under &mut self,
557//     Arc-bump the catalog + cheap trailer/cold-segment clones + atomic
558//     commit_lsn load. Front returns to caller in microseconds.
559//   • Execute (`execute_checkpoint_job`, on the worker thread) — serialize
560//     the snapshot, tmp+rename the db / manifest files (each fsynced via
561//     the rename + dir-fsync), enqueue the v4 marker through the WalGroup
562//     (which is already thread-safe so live commits interleave fine),
563//     then rotate the chunk file.
564//
565// Replay floor is the marker LSN captured at front-end time. A crash any
566// time during the worker's sequence is safe: nothing past the previous
567// checkpoint's marker can have been forgotten until the new marker hits
568// the WAL, and live writes between the two go into the same chunk under
569// the old marker — replay re-applies them after restoring the (older)
570// snapshot. snapshot+manifest atomicity (D10) is unchanged from the sync
571// path — CoW-4 tightens it later.
572//
573// Single-instance: a state machine of {pending, inflight} so a new
574// trigger fires only when the worker is fully idle. Any sticky error
575// surfaces on the next `wait()`.
576
577#[derive(Debug)]
578struct CheckpointJob {
579    snapshot: spg_engine::EngineSnapshot,
580    marker_lsn: u64,
581    db_path: PathBuf,
582    wal_dir: PathBuf,
583    wal: Arc<WalGroup>,
584    /// Snapshot-time view of the cold-tier segment set. Carried into the
585    /// worker so any concurrent `freeze_oldest_to_cold` after the trigger
586    /// rides the *next* checkpoint's manifest — same staleness window
587    /// the sync path already had.
588    cold_segments: Vec<(u32, PathBuf)>,
589    /// Shared with `PersistenceCtx` so the worker's chunk rotation is
590    /// visible to subsequent diag / Drop introspection.
591    current_chunk_path: Arc<Mutex<PathBuf>>,
592}
593
594#[derive(Debug, Default)]
595struct CheckpointState {
596    /// Set by the front when it has a job ready; cleared when the worker
597    /// picks it up.
598    pending: Option<CheckpointJob>,
599    /// True while the worker is mid-execute. `pending.is_some() || inflight`
600    /// defines "busy" for the trigger / wait predicate.
601    inflight: bool,
602    /// Sticky error from the worker's last failure. Cleared when surfaced
603    /// to a `wait()` caller.
604    last_error: Option<EngineError>,
605    /// Drop signal — worker exits after the current job (or immediately if
606    /// idle and no pending).
607    shutdown: bool,
608}
609
610#[derive(Debug)]
611struct CheckpointWorker {
612    state: Arc<(Mutex<CheckpointState>, Condvar)>,
613    handle: Option<JoinHandle<()>>,
614}
615
616impl CheckpointWorker {
617    fn spawn() -> Self {
618        let state: Arc<(Mutex<CheckpointState>, Condvar)> =
619            Arc::new((Mutex::new(CheckpointState::default()), Condvar::new()));
620        let state_for_thread = Arc::clone(&state);
621        let handle = thread::Builder::new()
622            .name("spg-checkpoint".into())
623            .spawn(move || checkpoint_worker_loop(&state_for_thread))
624            .expect("spawn checkpoint worker");
625        Self {
626            state,
627            handle: Some(handle),
628        }
629    }
630
631    /// Try to enqueue a job. Returns `Ok(true)` if the worker accepted it,
632    /// `Ok(false)` if a job was already pending or in flight (skip — the
633    /// next trigger will pick up newer state). Surfaces any sticky error
634    /// from a previous run before considering the new job, so async paths
635    /// can't lose a failure indefinitely.
636    fn try_enqueue(&self, job: CheckpointJob) -> Result<bool, EngineError> {
637        let (lock, cond) = &*self.state;
638        let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
639        if let Some(e) = g.last_error.take() {
640            return Err(e);
641        }
642        if g.pending.is_some() || g.inflight {
643            return Ok(false);
644        }
645        g.pending = Some(job);
646        cond.notify_one();
647        Ok(true)
648    }
649
650    /// Block until the worker is idle (no pending, not in flight). Returns
651    /// any sticky error from the last run; clears it on the way out.
652    fn wait(&self) -> Result<(), EngineError> {
653        let (lock, cond) = &*self.state;
654        let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
655        while g.pending.is_some() || g.inflight {
656            g = cond.wait(g).unwrap_or_else(|e| e.into_inner());
657        }
658        match g.last_error.take() {
659            Some(e) => Err(e),
660            None => Ok(()),
661        }
662    }
663}
664
665impl Drop for CheckpointWorker {
666    fn drop(&mut self) {
667        {
668            let (lock, cond) = &*self.state;
669            let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
670            g.shutdown = true;
671            cond.notify_one();
672        }
673        if let Some(h) = self.handle.take() {
674            let _ = h.join();
675        }
676    }
677}
678
679fn checkpoint_worker_loop(state: &Arc<(Mutex<CheckpointState>, Condvar)>) {
680    let (lock, cond) = &**state;
681    loop {
682        let job = {
683            let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
684            while g.pending.is_none() && !g.shutdown {
685                g = cond.wait(g).unwrap_or_else(|e| e.into_inner());
686            }
687            if g.pending.is_none() {
688                // shutdown with no pending → exit cleanly.
689                return;
690            }
691            // Even on shutdown, drain the pending job first so the Drop-time
692            // final checkpoint is durable before exit.
693            let job = g.pending.take().expect("loop invariant");
694            g.inflight = true;
695            job
696        };
697        let result = execute_checkpoint_job(job);
698        {
699            let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
700            g.inflight = false;
701            if let Err(e) = result {
702                g.last_error = Some(e);
703            }
704            cond.notify_all();
705        }
706    }
707}
708
709fn execute_checkpoint_job(job: CheckpointJob) -> Result<(), EngineError> {
710    // 1. Serialize the captured snapshot. Heavy; this is the whole point
711    //    of CoW — it runs off the engine borrow.
712    let snapshot = job.snapshot.serialize();
713    // 2. Snapshot tmp+rename. Atomic on POSIX; rename implicitly fsyncs
714    //    the data the next directory walk sees.
715    let tmp = {
716        let mut t = job.db_path.clone();
717        let mut name = t
718            .file_name()
719            .map(std::ffi::OsStr::to_os_string)
720            .unwrap_or_default();
721        name.push(".tmp");
722        t.set_file_name(name);
723        t
724    };
725    std::fs::write(&tmp, &snapshot).map_err(io_err)?;
726    std::fs::rename(&tmp, &job.db_path).map_err(io_err)?;
727    // 3. Manifest tmp+rename (cold tier present).
728    if !job.cold_segments.is_empty() {
729        let snap_crc = spg_crypto::crc32::crc32(&snapshot);
730        let entries: Vec<ColdSegmentEntry> = job
731            .cold_segments
732            .iter()
733            .filter_map(|(segment_id, path)| {
734                let bytes = std::fs::read(path).ok()?;
735                Some(ColdSegmentEntry {
736                    segment_id: *segment_id,
737                    path: path.clone(),
738                    crc32: spg_crypto::crc32::crc32(&bytes),
739                })
740            })
741            .collect();
742        let manifest = CatalogManifest {
743            catalog_crc32: snap_crc,
744            cold_segments: entries,
745            wal_baseline_offset: 0,
746        };
747        let m_bytes = manifest.serialize();
748        let m_path = spg_manifest_path(&job.db_path);
749        if let Some(dir) = m_path.parent() {
750            std::fs::create_dir_all(dir).map_err(io_err)?;
751        }
752        let m_tmp = {
753            let mut t = m_path.clone();
754            let mut name = t
755                .file_name()
756                .map(std::ffi::OsStr::to_os_string)
757                .unwrap_or_default();
758            name.push(".tmp");
759            t.set_file_name(name);
760            t
761        };
762        std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
763        std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
764    }
765    // 4. Enqueue the v4 checkpoint marker carrying the captured LSN. The
766    //    WalGroup is thread-safe so a live commit can interleave — the
767    //    marker's LSN, not its position in the chunk, anchors replay.
768    let marker_ts = wall_clock_micros();
769    let marker = encode_v4_checkpoint_marker(job.marker_lsn, marker_ts, &job.db_path);
770    job.wal.enqueue(&marker);
771    job.wal.flush_now()?;
772    // 5. Rotate the active chunk. New commits land in the fresh chunk;
773    //    pre-marker history stays addressable in the old chunk for PITR /
774    //    retention. The shared `current_chunk_path` is updated under its
775    //    own lock before the WalGroup swap so diag readers never see a
776    //    handle that no longer matches the recorded path.
777    let new_chunk_path = job
778        .wal_dir
779        .join(chunk_filename(marker_ts, job.marker_lsn + 1));
780    let new_handle = OpenOptions::new()
781        .create(true)
782        .append(true)
783        .read(true)
784        .open(&new_chunk_path)
785        .map_err(io_err)?;
786    fsync_dir(&job.wal_dir);
787    {
788        let mut p = job
789            .current_chunk_path
790            .lock()
791            .unwrap_or_else(|e| e.into_inner());
792        *p = new_chunk_path;
793    }
794    job.wal.rotate_file(new_handle);
795    Ok(())
796}
797
798impl WalTicket {
799    /// Block until the record this ticket covers is durable.
800    ///
801    /// Under `SPG_SYNCHRONOUS_COMMIT=off` this returns
802    /// immediately — the background flusher (or the next
803    /// checkpoint / clean shutdown) makes the record durable
804    /// within `SPG_WAL_WRITER_DELAY_MS`. Same contract as PG's
805    /// `synchronous_commit = off`.
806    ///
807    /// # Errors
808    /// Surfaces the leader's IO error if the batch flush failed
809    /// (the WAL is then poisoned for all subsequent writes).
810    pub fn wait(&self) -> Result<(), EngineError> {
811        if !synchronous_commit_on() {
812            return Ok(());
813        }
814        self.group.wait_flushed(self.seq)
815    }
816}
817
818/// v7.19 P3 — retention sweep loop. Runs in a dedicated thread
819/// spawned by `Database::open_path` when `SPG_PITR_RETENTION_HOURS`
820/// is set to a non-zero value. Wakes every
821/// `SPG_PITR_RETENTION_CHECK_SEC` (default 60 s), enumerates chunks
822/// under `wal_dir`, archives via `SPG_PITR_ARCHIVE_CMD` if set, and
823/// deletes anything older than `retention_hours`.
824///
825/// Loud-failure posture matches PG's `archive_command`: if the
826/// archive command returns non-zero, the chunk stays on disk and
827/// a warning prints to stderr. The retention sweep doesn't delete
828/// a chunk it failed to archive.
829fn retention_sweep_loop(
830    wal_dir: PathBuf,
831    retention_hours: u64,
832    check_interval: std::time::Duration,
833    archive_cmd: Option<String>,
834    shutdown: Arc<AtomicBool>,
835) {
836    while !shutdown.load(Ordering::SeqCst) {
837        if let Err(e) = retention_sweep_once(&wal_dir, retention_hours, archive_cmd.as_deref()) {
838            eprintln!("spg-embedded: retention sweep error: {e}");
839        }
840        // Sleep in short ticks so shutdown isn't blocked on a
841        // 60 s naptime when Drop signals.
842        let mut elapsed = std::time::Duration::ZERO;
843        let tick = std::time::Duration::from_millis(250);
844        while elapsed < check_interval {
845            if shutdown.load(Ordering::SeqCst) {
846                return;
847            }
848            std::thread::sleep(tick);
849            elapsed += tick;
850        }
851    }
852}
853
854/// v7.19 P3 — one retention sweep pass over `wal_dir`. Extracted
855/// from the loop so tests can drive it directly. Public so the
856/// e2e_pitr_retention integration test (and any future operator
857/// tooling that wants synchronous retention) can call it.
858pub fn retention_sweep_once(
859    wal_dir: &Path,
860    retention_hours: u64,
861    archive_cmd: Option<&str>,
862) -> std::io::Result<()> {
863    if !wal_dir.exists() {
864        return Ok(());
865    }
866    let now_us = wall_clock_micros();
867    let cutoff_us = (now_us as i128 - (retention_hours as i128 * 3_600 * 1_000_000)) as i64;
868    let chunks = sorted_wal_chunks(wal_dir)?;
869    for chunk in chunks {
870        // Don't sweep the most-recent chunk; it's the live one
871        // execute() is appending to. Compare against the largest
872        // filename-prefix unix_us.
873        let stem = match chunk.file_stem().and_then(|s| s.to_str()) {
874            Some(s) => s,
875            None => continue,
876        };
877        let chunk_us: i64 = stem
878            .split_once('_')
879            .and_then(|(prefix, _)| i64::from_str_radix(prefix, 16).ok())
880            .unwrap_or(0);
881        if chunk_us >= cutoff_us {
882            continue;
883        }
884        // Archive first if requested.
885        if let Some(cmd) = archive_cmd {
886            if !cmd.is_empty() {
887                let output = std::process::Command::new("sh")
888                    .arg("-c")
889                    .arg(cmd)
890                    .arg("--")
891                    .arg(&chunk)
892                    .output()?;
893                if !output.status.success() {
894                    eprintln!(
895                        "spg-embedded: SPG_PITR_ARCHIVE_CMD failed for {} (exit {}); chunk stays on disk",
896                        chunk.display(),
897                        output.status.code().unwrap_or(-1)
898                    );
899                    continue;
900                }
901            }
902        }
903        // Delete the chunk + its sibling .checksum if present.
904        if let Err(e) = std::fs::remove_file(&chunk) {
905            eprintln!(
906                "spg-embedded: retention remove {} failed: {e}",
907                chunk.display()
908            );
909            continue;
910        }
911        let mut cs = chunk.clone();
912        let mut name = cs.file_name().map(|n| n.to_os_string()).unwrap_or_default();
913        name.push(".checksum");
914        cs.set_file_name(name);
915        let _ = std::fs::remove_file(&cs);
916    }
917    Ok(())
918}
919
/// v7.20 — group-commit delay window in µs (PG `commit_delay`
/// analogue): how long the flush leader sleeps before taking the
/// batch so concurrent writers can pile in.
///
/// Read once from `SPG_COMMIT_DELAY_US` and cached for the life of
/// the process. Unset or unparsable → 150 µs; `0` disables the
/// delay.
fn commit_delay_us() -> u64 {
    const DEFAULT_US: u64 = 150;
    static CACHED: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *CACHED.get_or_init(|| match std::env::var("SPG_COMMIT_DELAY_US") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_US),
        Err(_) => DEFAULT_US,
    })
}
933
/// v7.20 — PG `synchronous_commit` analogue.
///
/// * `on` (default): `execute()` blocks until its WAL record is
///   fsynced — zero-loss durability.
/// * `off`: `execute()` returns after the in-memory mutation + WAL
///   enqueue; a background flusher thread writes + fsyncs every
///   `SPG_WAL_WRITER_DELAY_MS` (default 200 ms — PG's
///   `wal_writer_delay` default). The crash window is up to one
///   flush interval of confirmed-but-unsynced commits — the trade
///   PG documents for the same setting. Clean shutdown (Drop /
///   checkpoint) always flushes.
///
/// `SPG_SYNCHRONOUS_COMMIT` is read once and cached. Only `off`,
/// `false` (both case-insensitive) or `0` switch it off; any other
/// value, or an unset/unreadable variable, leaves it on.
fn synchronous_commit_on() -> bool {
    static CACHED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *CACHED.get_or_init(|| match std::env::var("SPG_SYNCHRONOUS_COMMIT") {
        Ok(v) => {
            let disabled =
                v.eq_ignore_ascii_case("off") || v == "0" || v.eq_ignore_ascii_case("false");
            !disabled
        }
        Err(_) => true,
    })
}
951
/// v7.20 — cadence, in milliseconds, of the background WAL flusher
/// used under `SPG_SYNCHRONOUS_COMMIT=off` (PG `wal_writer_delay`).
///
/// Read once from `SPG_WAL_WRITER_DELAY_MS` and cached. Unset,
/// unparsable, or `0` → 200 ms.
fn wal_writer_delay_ms() -> u64 {
    const DEFAULT_MS: u64 = 200;
    static CACHED: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *CACHED.get_or_init(|| {
        let parsed = std::env::var("SPG_WAL_WRITER_DELAY_MS")
            .ok()
            .and_then(|raw| raw.parse::<u64>().ok());
        match parsed {
            Some(0) | None => DEFAULT_MS,
            Some(ms) => ms,
        }
    })
}
964
/// PITR retention window in hours, from `SPG_PITR_RETENTION_HOURS`.
/// Unset or unparsable → `0`. Re-read from the environment on every
/// call (not cached).
fn pitr_retention_hours() -> u64 {
    let Ok(raw) = std::env::var("SPG_PITR_RETENTION_HOURS") else {
        return 0;
    };
    raw.parse::<u64>().unwrap_or(0)
}
971
/// Seconds between retention sweeps, from
/// `SPG_PITR_RETENTION_CHECK_SEC`. Unset, unparsable, or `0` → 60.
/// Re-read from the environment on every call (not cached).
fn pitr_retention_check_sec() -> u64 {
    const DEFAULT_SECS: u64 = 60;
    let parsed = std::env::var("SPG_PITR_RETENTION_CHECK_SEC")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match parsed {
        Some(0) | None => DEFAULT_SECS,
        Some(secs) => secs,
    }
}
979
/// Archive command from `SPG_PITR_ARCHIVE_CMD`. `None` when the
/// variable is unset, unreadable, or empty. Re-read on every call.
fn pitr_archive_cmd() -> Option<String> {
    match std::env::var("SPG_PITR_ARCHIVE_CMD") {
        Ok(cmd) if !cmd.is_empty() => Some(cmd),
        _ => None,
    }
}
985
986/// v7.19 — replay every record from `wal_bytes` whose
987/// `commit_lsn` is strictly greater than `floor_lsn`. v3 records
988/// (no LSN) and v4 records with `commit_lsn <= floor_lsn` are
989/// skipped — the snapshot loaded ahead of this call already
990/// reflects them, and re-applying would DuplicateTable /
991/// double-insert. v3 records inside the legacy migration chunk
992/// always apply because the migration sets `floor_lsn = 0` and
993/// v3 records carry no LSN to compare; the pre-migration
994/// behaviour (every record replays) is what the migration
995/// preserves.
996///
997/// Returns the count of records successfully applied. Same
998/// torn-tail semantics as `replay_wal_into_engine`.
999fn replay_wal_filtered(
1000    wal_bytes: &[u8],
1001    engine: &mut Engine,
1002    floor_lsn: u64,
1003    quarantine: &mut Vec<QuarantinedStmt>,
1004) -> Result<usize, String> {
1005    let records = parse_wal_records(wal_bytes)?;
1006    let mut applied = 0usize;
1007    for r in &records {
1008        // Skip markers + non-SQL records.
1009        if r.type_byte == WAL_V3_TYPE_DURABILITY_CHECKPOINT
1010            || r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER
1011        {
1012            continue;
1013        }
1014        // v4 SQL records carry an LSN. Apply iff strictly above
1015        // the snapshot floor.
1016        if r.type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL
1017            || r.type_byte == WAL_V4_TYPE_TX_COMMIT_SQL
1018            || r.type_byte == WAL_V5_TYPE_ROW_REDO
1019        {
1020            if let Some(lsn) = r.commit_lsn {
1021                if lsn <= floor_lsn {
1022                    continue;
1023                }
1024            }
1025        }
1026        // v7.34 (crash-recovery P0 #2) — row-level redo record: apply the
1027        // physical changes directly (O(changed rows)) instead of
1028        // re-executing SQL (the O(records × rows) statement-replay that
1029        // hung the mailrs P0). The payload is `encode_redo_log` bytes, not
1030        // SQL, so it never enters the from_utf8 / split_statements path.
1031        if r.type_byte == WAL_V5_TYPE_ROW_REDO {
1032            let changes = spg_storage::decode_redo_log(r.sql)
1033                .map_err(|e| format!("redo decode at offset {}: {e:?}", r.offset))?;
1034            engine
1035                .apply_redo(&changes)
1036                .map_err(|e| format!("redo apply at offset {}: {e:?}", r.offset))?;
1037            applied += 1;
1038            continue;
1039        }
1040        // v3 records (type 0x01, no LSN) always apply — the
1041        // legacy migration path is the only place they appear,
1042        // and floor_lsn=0 there.
1043        let sql = match std::str::from_utf8(r.sql) {
1044            Ok(s) => s,
1045            Err(e) => return Err(format!("non-UTF-8 SQL at offset {}: {e}", r.offset)),
1046        };
1047        // v7.21 — a tx-commit record carries the whole transaction
1048        // as a `";\n"`-joined script; auto-commit records are a
1049        // single statement, for which split_statements is a no-op.
1050        //
1051        // v7.30.1 (mailrs round-24 ask 2) — a statement the engine
1052        // REJECTS is quarantined, not fatal: "one statement failed
1053        // to replay" ≠ "the catalog is corrupt". Framing damage
1054        // (parse_wal_records / non-UTF-8 above) still errors — that
1055        // IS corruption. Subsequent statements of a tx script keep
1056        // applying: the bricking class is a no-op-at-runtime
1057        // statement that re-applies non-idempotently, and skipping
1058        // just it reconstructs the runtime state.
1059        for stmt in split_statements(sql) {
1060            if let Err(e) = engine.execute(stmt) {
1061                quarantine.push(QuarantinedStmt {
1062                    offset: r.offset,
1063                    sql: stmt.to_string(),
1064                    error: format!("{e:?}"),
1065                });
1066            }
1067        }
1068        applied += 1;
1069    }
1070    Ok(applied)
1071}
1072
/// v7.30.1 (mailrs round-24 ask 2) — one statement that failed to
/// re-apply during boot replay. Kept for forensics in a
/// `quarantine-*.log` beside the WAL chunks; the boot continues.
struct QuarantinedStmt {
    /// Byte offset, within the replayed WAL buffer, of the record the
    /// statement came from.
    offset: usize,
    /// Text of the rejected statement (one statement, not the whole
    /// transaction script).
    sql: String,
    /// `Debug` rendering of the engine error that rejected it.
    error: String,
}
1081
1082fn format_quarantine_line(q: &QuarantinedStmt) -> String {
1083    format!("offset {}: {}\n  rejected: {}\n", q.offset, q.sql, q.error)
1084}
1085
/// v7.34 (crash-recovery P0 #2) — fsync a directory so a newly created
/// file's entry is durable. `sync_data` on a chunk file persists its
/// bytes but NOT the parent directory entry that names it; a power loss
/// after creating a fresh WAL chunk could lose that entry and make the
/// chunk (and the committed records in it) unreachable on restart.
/// Best-effort — a platform that rejects directory fsync is no worse off:
/// both the open and the sync error are deliberately ignored.
fn fsync_dir(dir: &Path) {
    if let Ok(f) = File::open(dir) {
        let _ = f.sync_all();
    }
}

/// v7.19 — WAL chunk filename format: `<unix_us>_<leading_lsn>.wal`.
/// Zero-padded 16-digit hex on both parts so default lexicographic
/// sort matches numeric order, with the unix_us prefix coming first
/// so the on-disk listing is chronological too.
fn chunk_filename(unix_us: i64, leading_lsn: u64) -> String {
    // Negative timestamps shouldn't happen in practice (we sit
    // post-1970), but clamp to 0 so the zero-padded
    // representation stays sortable.
    let us = u64::try_from(unix_us).unwrap_or(0);
    format!("{us:016x}_{leading_lsn:016x}.wal")
}
1109
1110/// v7.19 — filename used for the legacy single-file WAL when
1111/// `open_path` migrates a v7.18-layout database into the new
1112/// chunk directory. Lexicographically smallest possible value
1113/// so subsequent chunks sort after it.
1114fn legacy_chunk_filename() -> String {
1115    chunk_filename(0, 0)
1116}
1117
1118/// CoW-4 (v7.34) — D10 fallback: read one cold-segment file and
1119/// hand its bytes to the catalog. The segment binary is self-validating
1120/// (magic + internal CRC32 via `OwnedSegment::from_bytes`), so we don't
1121/// need the manifest's `segment_crc32` to trust it. Returns `true` on a
1122/// successful attach (caller bumps `cold_segment_paths`), `false` on a
1123/// per-segment failure that is logged but doesn't abort boot.
1124fn attach_segment_from_disk(engine: &mut Engine, segment_id: u32, path: &Path) -> bool {
1125    if engine.catalog().cold_segment(segment_id).is_some() {
1126        return true;
1127    }
1128    let bytes = match std::fs::read(path) {
1129        Ok(b) => b,
1130        Err(e) => {
1131            eprintln!(
1132                "spg-embedded: cold-segment scan skip {}: read failed: {e}",
1133                path.display()
1134            );
1135            return false;
1136        }
1137    };
1138    let mut new_cat = engine.catalog().clone();
1139    if let Err(e) = new_cat.load_segment_bytes_at(segment_id, bytes) {
1140        eprintln!(
1141            "spg-embedded: cold-segment scan skip {}: parse/load failed: {e}",
1142            path.display()
1143        );
1144        return false;
1145    }
1146    engine.replace_catalog(new_cat);
1147    true
1148}
1149
1150/// CoW-4 (v7.34) — D10 + missing-manifest fallback: scan
1151/// `<db>.spg/segments/` for `seg_<id>.spg` files and attach any that
1152/// aren't already in `cold_segment_paths`. Closes the window where a
1153/// crash between snapshot rename and manifest rename leaves
1154/// post-checkpoint cold segments orphaned on disk (the snapshot's CRC
1155/// no longer matches the stale manifest, so the manifest path
1156/// silently dropped them). The segment parser self-verifies, so a
1157/// torn write surfaces as a per-segment skip, never silent corruption.
1158fn scan_cold_segments_dir(
1159    segments_dir: &Path,
1160    engine: &mut Engine,
1161    cold_segment_paths: &mut BTreeMap<u32, PathBuf>,
1162) {
1163    // v7.34.1 (mailrs prod report bug A): single-file catalogs (e.g.
1164    // `/data/spg/mailrs.spg` is a regular file, not the `<db>/<db>.spg`
1165    // layout this scan assumes) make the computed `<db>.spg/segments`
1166    // path traverse a file inode, which surfaces as ENOTDIR (`Not a
1167    // directory`, errno 20). Treat any non-directory state — absent,
1168    // file-in-the-way, stat-blocked — as "no segments to scan" and
1169    // silently return. The eprintln below only fires for the genuine
1170    // mid-walk read errors (permission flip, IO failure) that operators
1171    // need to see.
1172    if !segments_dir.is_dir() {
1173        return;
1174    }
1175    let read_dir = match std::fs::read_dir(segments_dir) {
1176        Ok(rd) => rd,
1177        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return,
1178        Err(e) => {
1179            eprintln!(
1180                "spg-embedded: cold-segment scan: cannot read {}: {e}",
1181                segments_dir.display()
1182            );
1183            return;
1184        }
1185    };
1186    for entry in read_dir.flatten() {
1187        let path = entry.path();
1188        // Only the canonical `seg_<id>.spg` form. `.tmp` half-renames
1189        // and unknown extensions are skipped — the segment writer's
1190        // tmp+rename pattern guarantees `.spg` files are either fully
1191        // written or absent.
1192        if path.extension().and_then(|s| s.to_str()) != Some("spg") {
1193            continue;
1194        }
1195        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1196            continue;
1197        };
1198        let Some(id_str) = stem.strip_prefix("seg_") else {
1199            continue;
1200        };
1201        let Ok(segment_id) = id_str.parse::<u32>() else {
1202            continue;
1203        };
1204        if cold_segment_paths.contains_key(&segment_id) {
1205            continue;
1206        }
1207        if attach_segment_from_disk(engine, segment_id, &path) {
1208            cold_segment_paths.insert(segment_id, path);
1209        }
1210    }
1211}
1212
/// v7.19 — collect every path in `wal_dir` whose extension is exactly
/// `wal`, sorted lexicographically (which doubles as chunk-creation
/// order thanks to the zero-padded filename format).
///
/// A missing directory yields an empty list.
///
/// # Errors
/// Any other failure to open the directory, or to read one of its
/// entries, is returned unchanged.
fn sorted_wal_chunks(wal_dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let entries = match std::fs::read_dir(wal_dir) {
        Ok(entries) => entries,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e),
    };
    let mut chunks = entries
        .map(|entry| entry.map(|e| e.path()))
        // Keep errors in the stream so `collect` stops at the first one.
        .filter(|candidate| match candidate {
            Ok(path) => path.extension().and_then(|ext| ext.to_str()) == Some("wal"),
            Err(_) => true,
        })
        .collect::<std::io::Result<Vec<PathBuf>>>()?;
    chunks.sort();
    Ok(chunks)
}
1233
1234/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
1235///
1236/// ```text
1237/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
1238/// [u32 LE crc32 over (type_byte || payload)]
1239/// [u8  type = 0x11]
1240/// payload:
1241///   [u64 LE checkpoint_lsn]
1242///   [i64 LE checkpoint_unix_us  (WAL_V4_NO_CLOCK if no clock)]
1243///   [u16 LE snapshot_path_len]
1244///   [snapshot_path_bytes]
1245/// ```
1246///
1247/// `payload_len` covers only the payload — keeping the framing
1248/// uniform across v3 / v4 record types so torn-write detection in
1249/// `replay_wal_into_engine` stays trivial.
1250fn encode_v4_checkpoint_marker(
1251    checkpoint_lsn: u64,
1252    checkpoint_unix_us: i64,
1253    snapshot_path: &Path,
1254) -> Vec<u8> {
1255    let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
1256    let snap_payload = snapshot_bytes.as_bytes();
1257    let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
1258    let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
1259    payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
1260    payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
1261    payload.extend_from_slice(&snap_len_u16.to_le_bytes());
1262    payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
1263    let mut crc_buf = Vec::with_capacity(1 + payload.len());
1264    crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
1265    crc_buf.extend_from_slice(&payload);
1266    let crc = spg_crypto::crc32::crc32(&crc_buf);
1267    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
1268    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
1269    out.extend_from_slice(&header);
1270    out.extend_from_slice(&crc.to_le_bytes());
1271    out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
1272    out.extend_from_slice(&payload);
1273    out
1274}
1275
1276/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
1277///
1278/// ```text
1279/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
1280/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
1281/// [u8  type = 0x10]
1282/// [u64 LE commit_lsn]
1283/// [i64 LE commit_unix_us  (= WAL_V4_NO_CLOCK when no ClockFn)]
1284/// [sql bytes]
1285/// ```
1286///
1287/// `sql_len` field stays the SQL byte count — same shape as v3 — so
1288/// replay-buffer torn-write detection compares against
1289/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
1290/// readable by the same loop with their original 9-byte header
1291/// arithmetic.
1292fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1293    encode_v4_framed(
1294        WAL_V4_TYPE_AUTO_COMMIT_SQL,
1295        sql.as_bytes(),
1296        commit_lsn,
1297        commit_unix_us,
1298    )
1299}
1300
1301/// v7.21 — same envelope, `WAL_V4_TYPE_TX_COMMIT_SQL` type byte.
1302/// `script` = the transaction's statements joined with `";\n"`.
1303fn encode_v4_tx_commit(script: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1304    encode_v4_framed(
1305        WAL_V4_TYPE_TX_COMMIT_SQL,
1306        script.as_bytes(),
1307        commit_lsn,
1308        commit_unix_us,
1309    )
1310}
1311
1312/// v7.34 (crash-recovery P0 #2) — encode one row-level redo record. Same
1313/// v4 envelope + CRC, type byte 0x13; the payload is the
1314/// `encode_redo_log` bytes (physical changes) instead of SQL text, so
1315/// replay applies them in place of re-executing the statement.
1316fn encode_v5_row_redo(redo_bytes: &[u8], commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1317    encode_v4_framed(WAL_V5_TYPE_ROW_REDO, redo_bytes, commit_lsn, commit_unix_us)
1318}
1319
1320fn encode_v4_framed(
1321    type_byte: u8,
1322    payload: &[u8],
1323    commit_lsn: u64,
1324    commit_unix_us: i64,
1325) -> Vec<u8> {
1326    let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
1327    crc_buf.push(type_byte);
1328    crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
1329    crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
1330    crc_buf.extend_from_slice(payload);
1331    let crc = spg_crypto::crc32::crc32(&crc_buf);
1332    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
1333    let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
1334    out.extend_from_slice(&header);
1335    out.extend_from_slice(&crc.to_le_bytes());
1336    out.push(type_byte);
1337    out.extend_from_slice(&commit_lsn.to_le_bytes());
1338    out.extend_from_slice(&commit_unix_us.to_le_bytes());
1339    out.extend_from_slice(payload);
1340    out
1341}
1342
/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
/// Returns the count of records successfully applied. A truncated
/// trailing record (mid-write torn) is dropped silently — the
/// same recovery story `spg-server`'s boot path uses.
///
/// Unlike `replay_wal_filtered`, this loop applies every SQL record
/// regardless of LSN and treats an engine rejection as fatal.
///
/// # Errors
/// Returns a message naming the record offset when a payload is not
/// valid UTF-8, when the engine rejects a statement, or when a
/// v3-framed record carries a type byte this loop does not list.
///
/// NOTE(review): `WAL_V5_TYPE_ROW_REDO` is not among the listed type
/// bytes, so a row-redo record reaching this function ends in the
/// "unknown v3 type byte" error — confirm no caller feeds it chunks
/// that contain redo records.
///
/// NOTE(review): the CRC word in the header is stepped over here, never
/// verified — confirm integrity checking happens elsewhere.
fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
    let mut applied = 0usize;
    // Byte offset of the record currently being decoded.
    let mut cur = 0usize;
    while cur < wal_bytes.len() {
        if wal_bytes.len() - cur < 4 {
            // Trailing partial header — torn write, drop and stop.
            break;
        }
        // First word: payload length with the format flags in its high bits.
        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
        // Strip whichever flag bits this framing uses to recover the length.
        let len_mask = if is_v3 {
            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
        } else {
            !WAL_V2_SENTINEL
        };
        let rec_len = (raw_len & len_mask) as usize;
        // v3/v4 framing: length + CRC + type byte (9). v2: 8 bytes —
        // presumably length + CRC. Unflagged records: length word only (4).
        let header_len = if is_v3 {
            9
        } else if is_v2 {
            8
        } else {
            4
        };
        if wal_bytes.len() - cur < header_len + rec_len {
            // Torn record at the tail — drop, stop.
            break;
        }
        if is_v3 {
            // The type byte is the last byte of the 9-byte header.
            let type_byte = wal_bytes[cur + 8];
            match type_byte {
                // v3 auto-commit SQL: handled by the shared tail below
                // the match.
                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
                WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
                    // durability_checkpoint marker — skip, no SQL.
                    cur += header_len + rec_len;
                    continue;
                }
                WAL_V4_TYPE_CHECKPOINT_MARKER => {
                    // v7.18 PITR — checkpoint anchor, skip on replay
                    // (engine state past this point reflects the
                    // matching snapshot already loaded by the caller).
                    cur += header_len + rec_len;
                    continue;
                }
                WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL => {
                    // v7.18 PITR — v4 record carries 16 bytes of
                    // (commit_lsn, commit_unix_us) between the type
                    // byte and the SQL payload. Replay reads them but
                    // does not enforce them — the engine doesn't
                    // surface LSN/clock here. Restore tooling
                    // (spgctl) parses them via parse_wal_record below.
                    //
                    // v7.21 — tx-commit records (0x12) carry a whole
                    // transaction as a `";\n"`-joined script;
                    // split_statements is a no-op on the single-
                    // statement auto-commit form.
                    //
                    // `rec_len` counts only the SQL bytes, so the full
                    // record is longer than the generic check above
                    // assumed; re-check against the true size.
                    let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
                    if wal_bytes.len() - cur < v4_total {
                        // Torn v4 record at the tail — drop, stop.
                        break;
                    }
                    let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
                    let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
                    let sql = std::str::from_utf8(sql_bytes)
                        .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
                    for stmt in split_statements(sql) {
                        engine.execute(stmt).map_err(|e| {
                            format!("WAL replay: apply {stmt:?} at offset {cur} rejected: {e:?}")
                        })?;
                    }
                    applied += 1;
                    cur += v4_total;
                    continue;
                }
                other => {
                    return Err(format!(
                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
                    ));
                }
            }
        }
        // Shared tail: unflagged, v2, and v3 auto-commit records — the
        // payload directly follows the header and is executed as a single
        // statement (no split_statements here).
        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
        let sql = std::str::from_utf8(sql_bytes)
            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
        engine
            .execute(sql)
            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
        applied += 1;
        cur += header_len + rec_len;
    }
    Ok(applied)
}
1439
/// v7.18 PITR — parsed WAL record, surfaced for restore / verify
/// tooling. The replay loop above doesn't expose LSN/timestamp;
/// `spgctl restore --to <timestamp>` and `spgctl verify` need them.
/// Returned offsets are byte-positions inside the WAL buffer.
#[derive(Debug, Clone)]
pub struct WalRecord<'a> {
    /// Byte offset in the WAL buffer where this record starts.
    pub offset: usize,
    /// Type byte: 0x01 = v3 auto-commit SQL, 0x02 = durability
    /// checkpoint marker, 0x10 = v4 auto-commit SQL, 0x11 = v4
    /// checkpoint marker, 0x12 = v4 tx-commit script, 0x13 = row-level
    /// redo. Pre-v3 records, which carry no type byte on disk, are
    /// reported as 0x01.
    pub type_byte: u8,
    /// `Some(lsn)` for v4 records, `None` for v3.
    pub commit_lsn: Option<u64>,
    /// `Some(unix_us)` for v4 records carrying a clock-set timestamp,
    /// `None` for v3 or for v4 records explicitly written with
    /// `WAL_V4_NO_CLOCK` (sentinel for "no ClockFn at commit time").
    pub commit_unix_us: Option<i64>,
    /// Record payload as borrowed bytes: SQL text for the SQL record
    /// types, `encode_redo_log` bytes for row-redo records. Empty for
    /// durability markers.
    pub sql: &'a [u8],
}
1460
1461/// v7.18 PITR — iterate over `wal_bytes` yielding one `WalRecord`
1462/// per intact record. Torn-tail records terminate iteration
1463/// silently (same recovery story as `replay_wal_into_engine`).
1464/// Unknown type bytes inside a v3 envelope return `Err` so the
1465/// caller knows the WAL was written by a newer SPG.
1466pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
1467    let mut out = Vec::new();
1468    let mut cur = 0usize;
1469    while cur < wal_bytes.len() {
1470        if wal_bytes.len() - cur < 4 {
1471            break;
1472        }
1473        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
1474        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
1475        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
1476        let len_mask = if is_v3 {
1477            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
1478        } else {
1479            !WAL_V2_SENTINEL
1480        };
1481        let rec_len = (raw_len & len_mask) as usize;
1482        let header_len = if is_v3 {
1483            9
1484        } else if is_v2 {
1485            8
1486        } else {
1487            4
1488        };
1489        if wal_bytes.len() - cur < header_len + rec_len {
1490            break;
1491        }
1492        if !is_v3 {
1493            // v1 / v2 records carry no type byte; treat as legacy
1494            // auto-commit SQL with no LSN/time.
1495            let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1496            out.push(WalRecord {
1497                offset: cur,
1498                type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
1499                commit_lsn: None,
1500                commit_unix_us: None,
1501                sql,
1502            });
1503            cur += header_len + rec_len;
1504            continue;
1505        }
1506        let type_byte = wal_bytes[cur + 8];
1507        match type_byte {
1508            WAL_V3_TYPE_AUTO_COMMIT_SQL => {
1509                let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1510                out.push(WalRecord {
1511                    offset: cur,
1512                    type_byte,
1513                    commit_lsn: None,
1514                    commit_unix_us: None,
1515                    sql,
1516                });
1517                cur += header_len + rec_len;
1518            }
1519            WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
1520                out.push(WalRecord {
1521                    offset: cur,
1522                    type_byte,
1523                    commit_lsn: None,
1524                    commit_unix_us: None,
1525                    sql: &[],
1526                });
1527                cur += header_len + rec_len;
1528            }
1529            WAL_V4_TYPE_CHECKPOINT_MARKER => {
1530                // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
1531                // We surface lsn + ts on the WalRecord; the path lives
1532                // in `sql` since the type byte already disambiguates
1533                // record meaning and adding a dedicated field would
1534                // bloat the iterator return type for every variant.
1535                if rec_len < 18 {
1536                    return Err(format!(
1537                        "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
1538                    ));
1539                }
1540                let lsn = u64::from_le_bytes(
1541                    wal_bytes[cur + header_len..cur + header_len + 8]
1542                        .try_into()
1543                        .unwrap(),
1544                );
1545                let ts_raw = i64::from_le_bytes(
1546                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
1547                        .try_into()
1548                        .unwrap(),
1549                );
1550                let path_len = u16::from_le_bytes(
1551                    wal_bytes[cur + header_len + 16..cur + header_len + 18]
1552                        .try_into()
1553                        .unwrap(),
1554                ) as usize;
1555                if rec_len < 18 + path_len {
1556                    return Err(format!(
1557                        "WAL parse: checkpoint marker at offset {cur} truncated path"
1558                    ));
1559                }
1560                let path_start = cur + header_len + 18;
1561                let path_bytes = &wal_bytes[path_start..path_start + path_len];
1562                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1563                    None
1564                } else {
1565                    Some(ts_raw)
1566                };
1567                out.push(WalRecord {
1568                    offset: cur,
1569                    type_byte,
1570                    commit_lsn: Some(lsn),
1571                    commit_unix_us,
1572                    sql: path_bytes,
1573                });
1574                cur += header_len + rec_len;
1575            }
1576            WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL | WAL_V5_TYPE_ROW_REDO => {
1577                let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
1578                if wal_bytes.len() - cur < v4_total {
1579                    break;
1580                }
1581                let lsn = u64::from_le_bytes(
1582                    wal_bytes[cur + header_len..cur + header_len + 8]
1583                        .try_into()
1584                        .unwrap(),
1585                );
1586                let ts_raw = i64::from_le_bytes(
1587                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
1588                        .try_into()
1589                        .unwrap(),
1590                );
1591                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1592                    None
1593                } else {
1594                    Some(ts_raw)
1595                };
1596                let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
1597                let sql = &wal_bytes[sql_start..sql_start + rec_len];
1598                out.push(WalRecord {
1599                    offset: cur,
1600                    type_byte,
1601                    commit_lsn: Some(lsn),
1602                    commit_unix_us,
1603                    sql,
1604                });
1605                cur += v4_total;
1606            }
1607            other => {
1608                return Err(format!(
1609                    "WAL parse: unknown type byte {other:#04x} at offset {cur}"
1610                ));
1611            }
1612        }
1613    }
1614    Ok(out)
1615}
1616
/// v7.1 — keyword-level predicate: can this statement skip the WAL?
///
/// Looks only at the first token of `sql` (leading whitespace
/// skipped; the token ends at whitespace, `;` or `(`) and compares
/// it ASCII-case-insensitively against a fixed list. Returns `true`
/// for SELECT / SHOW / EXPLAIN / BEGIN / COMMIT / ROLLBACK /
/// CHECKPOINT / COMPACT / WAIT / WITH, and `false` for everything
/// else — including empty input, a leading comment or a leading
/// parenthesis. Conservative by construction: anything not
/// explicitly listed is treated as "write a WAL record".
///
/// NOTE(review): `with` classifies every CTE statement as read-only.
/// If the engine ever accepts data-modifying CTEs
/// (`WITH … INSERT/UPDATE/DELETE`) this entry stops being
/// conservative — confirm against the engine's grammar.
fn sql_is_read_only(sql: &str) -> bool {
    const READ_ONLY_HEADS: [&str; 10] = [
        "select",
        "show",
        "explain",
        "begin",
        "commit",
        "rollback",
        "checkpoint",
        "compact",
        "wait",
        "with",
    ];
    let stmt = sql.trim_start();
    let ends_head = |c: char| c.is_whitespace() || c == ';' || c == '(';
    let head_end = stmt.find(ends_head).unwrap_or(stmt.len());
    let head = &stmt[..head_end];
    READ_ONLY_HEADS
        .iter()
        .any(|keyword| head.eq_ignore_ascii_case(keyword))
}
1643
/// Embedded SPG database handle. Owns an `Engine` + provides
/// ergonomic wrappers around `execute` and `query`.
///
/// Two shapes share this type. In-memory (`persistence` is `None`,
/// the v6.10.3 shape): there is no WAL, so nothing is flushed or
/// fsynced when the handle goes away. Persistent (`persistence` is
/// `Some`, set up by `open_path`): see the `persistence` field for
/// what is written on `Drop`.
#[derive(Debug)]
pub struct Database {
    /// The SQL execution engine every statement is run against.
    engine: Engine,
    /// v7.1 — persistence sidecar. When `Some(p)`, every
    /// `execute(sql)` that mutates state appends a v4
    /// `auto_commit_sql` WAL record + fsyncs before the call
    /// returns; `Drop` writes a final catalog snapshot to
    /// `<db_path>` so the next session boots from a clean
    /// snapshot + an empty WAL. `None` = in-memory only (the
    /// v6.10.3 shape).
    persistence: Option<PersistenceCtx>,
    /// v7.18 PITR — monotonic per-database commit LSN. Increments
    /// before each successful WAL append; bootstrapped at
    /// open_path from `max(parse_wal_records → commit_lsn)` so
    /// reopen never reuses an LSN. In-memory databases start at
    /// 0 and never advance (no WAL = no LSN-meaningful records).
    commit_lsn: AtomicU64,
    /// v7.21 (round-12 polish) — explicit-transaction WAL buffer.
    /// `Some` between an engine-accepted BEGIN and its
    /// COMMIT / ROLLBACK on a persistent database. In-transaction
    /// mutations only touch the engine's shadow catalog and report
    /// `modified_catalog: false`, so the per-statement auto-commit
    /// append never fires for them; their bind-final SQL collects
    /// here instead and COMMIT flushes the lot as ONE atomic
    /// `WAL_V4_TYPE_TX_COMMIT_SQL` record (ROLLBACK just drops it).
    /// Always `None` for in-memory databases.
    tx_wal: Option<TxWalBuffer>,
}
1676
/// Per-transaction staging area for WAL content: the statements of
/// one open explicit transaction plus its savepoint marks. Lifecycle
/// and flush rules are described on [`Database::tx_wal`].
#[derive(Debug, Default)]
struct TxWalBuffer {
    /// Bind-final SQL of every non-read-only statement the engine
    /// accepted inside the open transaction, in execution order.
    statements: Vec<String>,
    /// `(savepoint_name, statements.len() at SAVEPOINT time)` —
    /// `ROLLBACK TO SAVEPOINT` truncates `statements` back to the
    /// recorded mark so the WAL record matches what the engine
    /// keeps. PG name-reuse semantics (latest wins).
    savepoints: Vec<(String, usize)>,
}
1689
/// Statement-level transaction-control classification for the WAL
/// buffer. Runs AFTER the engine accepted the statement, so the
/// engine stays the single validator — this only mirrors state.
enum TxControl {
    /// `BEGIN …` / `START …` — a transaction was opened.
    Begin,
    /// `COMMIT …` / `END …` — the open transaction was committed.
    Commit,
    /// `ROLLBACK` without a `TO` target — the whole transaction was
    /// abandoned.
    Rollback,
    /// `ROLLBACK [WORK | TRANSACTION] TO [SAVEPOINT] <name>`; carries
    /// the ASCII-lowercased savepoint name.
    RollbackToSavepoint(String),
    /// `SAVEPOINT <name>`; carries the ASCII-lowercased name.
    Savepoint(String),
    /// `RELEASE …` — a savepoint was released (name not tracked).
    ReleaseSavepoint,
}

/// Classify `sql` as a transaction-control statement by its leading
/// keyword(s).
///
/// Tokens are split on whitespace and `;` and ASCII-lowercased, so
/// savepoint names come back lowercased. Returns `None` for anything
/// that is not transaction control, and also for a `SAVEPOINT` or
/// `ROLLBACK TO` that is missing its name.
///
/// The optional `WORK` / `TRANSACTION` noise word that PostgreSQL
/// allows after `ROLLBACK` is skipped before looking for `TO`, so
/// `ROLLBACK WORK TO SAVEPOINT a` is reported as a savepoint rollback
/// rather than a full rollback (which would make the caller discard
/// the entire transaction buffer).
fn tx_control_kind(sql: &str) -> Option<TxControl> {
    let mut words = sql
        .split(|c: char| c.is_whitespace() || c == ';')
        .filter(|w| !w.is_empty())
        .map(str::to_ascii_lowercase);
    let head = words.next()?;
    match head.as_str() {
        "begin" | "start" => Some(TxControl::Begin),
        "commit" | "end" => Some(TxControl::Commit),
        "savepoint" => words.next().map(TxControl::Savepoint),
        "release" => Some(TxControl::ReleaseSavepoint),
        "rollback" => {
            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] <name>]
            let mut next = words.next();
            if matches!(next.as_deref(), Some("work" | "transaction")) {
                next = words.next();
            }
            if next.as_deref() != Some("to") {
                return Some(TxControl::Rollback);
            }
            let target = words.next()?;
            let name = if target == "savepoint" {
                words.next()?
            } else {
                target
            };
            Some(TxControl::RollbackToSavepoint(name))
        }
        _ => None,
    }
}
1729
/// On-disk state owned by a persistent `Database`: the paths it
/// works with, the shared WAL handle and the background threads
/// started by `open_path`.
#[derive(Debug)]
#[allow(dead_code)] // `wal_dir`/`current_chunk_path` are read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Catalog snapshot file, exactly as passed to `open_path`.
    /// Every sibling path below is derived from it.
    db_path: PathBuf,
    /// v7.19 — WAL chunk directory at `<db_path>.wal/`.
    /// Replaces the v7.18 single-file `<db_path>.wal` layout.
    /// Each chunk file inside is named
    /// `<unix_us>_<leading_lsn>.wal` (zero-padded to 16 digits
    /// so default-lex sort = LSN order).
    wal_dir: PathBuf,
    /// Path of the currently-open chunk file inside `wal_dir`.
    /// Rotated at checkpoint and whenever the chunk crosses
    /// `checkpoint_threshold_bytes`. CoW-2 (v7.34) wraps it in
    /// `Arc<Mutex<…>>` because the background-checkpoint worker
    /// performs the rotation; this struct keeps a clone so Drop /
    /// diag introspection still see the live path.
    current_chunk_path: Arc<Mutex<PathBuf>>,
    /// v7.19 P3 — retention sweeper handle. `Some` when
    /// `SPG_PITR_RETENTION_HOURS > 0` at open_path time; `None`
    /// when retention is disabled (the default; v7.18 behaviour
    /// preserved). The thread polls `wal_dir` every
    /// `SPG_PITR_RETENTION_CHECK_SEC` seconds, archives via
    /// `SPG_PITR_ARCHIVE_CMD` if set, then deletes chunks older
    /// than the retention window. Signalled to exit via
    /// `retention_shutdown` on Drop.
    retention_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-pitr-retention` thread. `open_path`
    /// sets it together with `retention_shutdown`: both `Some` or
    /// both `None`.
    retention_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 — background WAL flusher for
    /// `SPG_SYNCHRONOUS_COMMIT=off`. `None` in the default
    /// synchronous mode. Flushes the pending batch every
    /// `SPG_WAL_WRITER_DELAY_MS`; signalled + joined on Drop
    /// before the final checkpoint so clean shutdown never
    /// loses confirmed commits.
    flusher_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-wal-flusher` thread. `open_path`
    /// sets it together with `flusher_shutdown`: both `Some` or
    /// both `None`.
    flusher_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 P2 — group-commit WAL. Shared with WalTickets
    /// returned by the buffered write path so `wait()` can run
    /// after the engine write lock is released.
    wal: Arc<WalGroup>,
    /// Chunk size, in bytes, at which the current chunk is rotated
    /// (see `current_chunk_path`). `open_path` initialises it from
    /// `default_checkpoint_threshold_bytes()`.
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — `<db_path>.spg/segments/` directory. Cold-tier
    /// segments produced by `freeze_oldest_to_cold` / compaction
    /// are persisted here as `seg_<id>.spg` files; the manifest
    /// at `<db_path>.spg/manifest.v10` records every active
    /// segment + its CRC32 so the next boot can verify + reload.
    cold_segments_dir: PathBuf,
    /// Segment id → file path for every cold segment attached to the
    /// engine. `open_path` seeds it from the manifest entries that
    /// verified and loaded, then from the segments-directory scan.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
    /// via `fs::create_dir` on `<db_path>.lock` at open_path
    /// entry; released on Drop by `fs::remove_dir`. Atomic on
    /// every supported platform. A second process opening the
    /// same path while the first is still alive hits the
    /// create_dir failure and returns
    /// `EngineError::Unsupported("database is locked by another
    /// process: …")`. Stale locks (process crashed mid-session)
    /// must be cleared via `Database::force_unlock(path)` —
    /// SPG can't safely fingerprint who owned a stale directory
    /// without a libc dep, which would violate spg-embedded's
    /// zero-deps charter.
    lock_path: PathBuf,
    /// CoW-2 (v7.34) — background-checkpoint worker. `None` only
    /// transiently inside `Drop` after the worker has been signalled
    /// and joined. The worker carries Arc clones of `wal` and
    /// `current_chunk_path`, so it can rotate the active chunk and
    /// reflect the new path back here even after the front-end has
    /// returned to the caller.
    checkpoint_worker: Option<CheckpointWorker>,
}
1798
1799impl Database {
1800    /// Open a fresh in-memory database. No WAL, no catalog
1801    /// snapshot on disk — perfect for tests + short-lived
1802    /// CLI tools.
1803    #[must_use]
1804    pub fn open_in_memory() -> Self {
1805        Self {
1806            engine: engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros)),
1807            persistence: None,
1808            commit_lsn: AtomicU64::new(0),
1809            tx_wal: None,
1810        }
1811    }
1812
1813    /// v7.1 — Open or create a persistent database backed by
1814    /// the file at `db_path`. The WAL lives at `db_path` +
1815    /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
1816    /// path:
1817    ///
1818    /// 1. If `db_path` exists, restore the catalog snapshot.
1819    /// 2. If the WAL exists, replay every record into the
1820    ///    restored engine — the same recovery story
1821    ///    `spg-server` uses.
1822    /// 3. Open the WAL in append+sync mode so subsequent
1823    ///    `execute()` writes durably commit (one fsync per
1824    ///    mutation).
1825    ///
1826    /// `Drop` writes a final catalog snapshot + truncates the
1827    /// WAL — operators that need a sync barrier at a specific
1828    /// point use `checkpoint()` explicitly.
1829    pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
1830        let db_path = db_path.as_ref().to_path_buf();
1831        // v7.19 — WAL is a directory of chunk files. Legacy
1832        // single-file path stays variable-named `wal_path` for
1833        // the backward-compat migration block below.
1834        let wal_path = {
1835            let mut p = db_path.clone();
1836            let name = p
1837                .file_name()
1838                .map(|n| {
1839                    let mut s = n.to_os_string();
1840                    s.push(".wal");
1841                    s
1842                })
1843                .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
1844            p.set_file_name(name);
1845            p
1846        };
1847        let wal_dir = wal_path.clone();
1848        if let Some(parent) = db_path.parent()
1849            && !parent.as_os_str().is_empty()
1850        {
1851            std::fs::create_dir_all(parent).map_err(io_err)?;
1852        }
1853        // v7.17.0 Phase 6.2 — acquire cross-process exclusion
1854        // lock before touching any catalog / WAL bytes. atomic
1855        // mkdir on every supported platform; a second process
1856        // opening the same path while the first is still alive
1857        // hits the create_dir failure and gets a clear error.
1858        let lock_path = {
1859            let mut p = db_path.clone();
1860            let name = p
1861                .file_name()
1862                .map(|n| {
1863                    let mut s = n.to_os_string();
1864                    s.push(".lock");
1865                    s
1866                })
1867                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1868            p.set_file_name(name);
1869            p
1870        };
1871        acquire_path_lock(&lock_path)?;
1872        let mut engine = if db_path.exists() {
1873            let bytes = std::fs::read(&db_path).map_err(io_err)?;
1874            let engine = Engine::restore_envelope(&bytes).map_err(|e| {
1875                EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1876                    "restore from {}: {e}",
1877                    db_path.display()
1878                )))
1879            })?;
1880            engine_with_query_byte_budget(engine.with_clock(wall_clock_micros))
1881        } else {
1882            engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros))
1883        };
1884        // v7.1.4 — manifest-driven cold-segment reload. The
1885        // manifest sidecar pairs the catalog snapshot CRC with a
1886        // list of `(segment_id, path, crc32)` triples; verify
1887        // before loading so a torn or stale manifest doesn't
1888        // surface phantom data.
1889        let cold_segments_dir = {
1890            let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
1891            let stem = db_path
1892                .file_stem()
1893                .unwrap_or_else(|| std::ffi::OsStr::new("db"))
1894                .to_string_lossy()
1895                .into_owned();
1896            parent.join(format!("{stem}.spg")).join("segments")
1897        };
1898        let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
1899        let manifest_pth = spg_manifest_path(&db_path);
1900        if manifest_pth.exists() && db_path.exists() {
1901            let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
1902            if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
1903                let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
1904                let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
1905                if snap_crc == m.catalog_crc32 {
1906                    for entry in &m.cold_segments {
1907                        if let Ok(seg_bytes) = std::fs::read(&entry.path) {
1908                            let computed = spg_crypto::crc32::crc32(&seg_bytes);
1909                            if computed != entry.crc32 {
1910                                eprintln!(
1911                                    "spg-embedded: manifest skip segment {}: CRC mismatch",
1912                                    entry.segment_id
1913                                );
1914                                continue;
1915                            }
1916                            if engine.catalog().cold_segment(entry.segment_id).is_some() {
1917                                // Already loaded via Catalog::clone path (shouldn't happen
1918                                // since Engine::new + restore_envelope don't populate cold).
1919                                continue;
1920                            }
1921                            let mut new_cat = engine.catalog().clone();
1922                            if let Err(e) =
1923                                new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
1924                            {
1925                                eprintln!(
1926                                    "spg-embedded: manifest load segment {} failed: {e}",
1927                                    entry.segment_id
1928                                );
1929                                continue;
1930                            }
1931                            engine.replace_catalog(new_cat);
1932                            cold_segment_paths.insert(entry.segment_id, entry.path.clone());
1933                        } else {
1934                            eprintln!(
1935                                "spg-embedded: manifest skip segment {}: file unreadable",
1936                                entry.segment_id
1937                            );
1938                        }
1939                    }
1940                }
1941            }
1942        }
1943        // CoW-4 (v7.34) — D10 + missing-manifest fallback. Walk
1944        // `<db>.spg/segments/` and attach any `seg_<id>.spg` file that
1945        // the manifest didn't already cover (manifest absent / CRC
1946        // mismatched / a fresher freeze landed after the last
1947        // checkpoint wrote its manifest). The segment binary's own
1948        // magic + CRC32 guards integrity — no need to trust a stale
1949        // manifest entry to trust the file.
1950        scan_cold_segments_dir(&cold_segments_dir, &mut engine, &mut cold_segment_paths);
1951        // v7.19 — chunked WAL on-disk layout.
1952        //
1953        // Three cases handled here:
1954        //
1955        // 1. wal_dir exists as a DIRECTORY → scan its
1956        //    `<unix_us>_<leading_lsn>.wal` chunks (sorted
1957        //    lexicographically = chunk-creation order), replay
1958        //    them in sequence, advance the LSN watermark to the
1959        //    max commit_lsn seen.
1960        //
1961        // 2. wal_path exists as a FILE → legacy v7.18 layout.
1962        //    Migrate it: create `wal_dir/`, move the single file
1963        //    inside as `0000000000000000_0000000000000000.wal`,
1964        //    then fall through to case 1's replay loop.
1965        //
1966        // 3. Neither exists → fresh database; create wal_dir.
1967        let mut initial_lsn: u64 = 0;
1968        if wal_path.is_file() {
1969            // Case 2: legacy single-file WAL migration.
1970            let legacy_bytes = std::fs::read(&wal_path).map_err(io_err)?;
1971            std::fs::remove_file(&wal_path).map_err(io_err)?;
1972            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1973            if !legacy_bytes.is_empty() {
1974                let migrated = wal_dir.join(legacy_chunk_filename());
1975                std::fs::write(&migrated, &legacy_bytes).map_err(io_err)?;
1976            }
1977        } else if !wal_dir.exists() {
1978            // Case 3: fresh database.
1979            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1980        }
1981        // Cases 1 + 2 share replay logic now that wal_dir is
1982        // guaranteed to exist (and may be empty for case 3).
1983        //
1984        // Two-pass replay so we don't double-apply records the
1985        // snapshot already reflects:
1986        //
1987        // 1. Find the highest commit_lsn carried by a
1988        //    checkpoint_marker across all chunks. That LSN is the
1989        //    snapshot's high-water mark — anything ≤ it is
1990        //    already in `<db_path>` and replaying it would
1991        //    DuplicateTable / double-insert.
1992        // 2. Replay only records strictly above that LSN.
1993        //
1994        // Case 2 migration (legacy single-file WAL) lands here
1995        // too: the migrated chunk has no marker so the LSN floor
1996        // is 0 and every record applies — exactly the v7.18
1997        // behaviour the migration is supposed to preserve.
1998        let chunk_paths = sorted_wal_chunks(&wal_dir).map_err(io_err)?;
1999        let mut snapshot_lsn: u64 = 0;
2000        for chunk in &chunk_paths {
2001            let bytes = std::fs::read(chunk).map_err(io_err)?;
2002            if let Ok(records) = parse_wal_records(&bytes) {
2003                for r in &records {
2004                    if r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER {
2005                        if let Some(l) = r.commit_lsn {
2006                            if l > snapshot_lsn {
2007                                snapshot_lsn = l;
2008                            }
2009                        }
2010                    }
2011                }
2012            }
2013        }
2014        let mut quarantined: Vec<QuarantinedStmt> = Vec::new();
2015        for chunk in &chunk_paths {
2016            let bytes = std::fs::read(chunk).map_err(io_err)?;
2017            if bytes.is_empty() {
2018                continue;
2019            }
2020            replay_wal_filtered(&bytes, &mut engine, snapshot_lsn, &mut quarantined)
2021                .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
2022            if let Ok(records) = parse_wal_records(&bytes) {
2023                if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
2024                    if max > initial_lsn {
2025                        initial_lsn = max;
2026                    }
2027                }
2028            }
2029        }
2030        // v7.30.1 (mailrs round-24 ask 2) — replay rejects no longer
2031        // brick the open. Persist the rejected statements beside the
2032        // WAL chunks for forensics and say so loudly; the boot
2033        // continues with every other record applied.
2034        if !quarantined.is_empty() {
2035            let mut body = String::new();
2036            for q in &quarantined {
2037                body.push_str(&format_quarantine_line(q));
2038            }
2039            let qpath = wal_dir.join(format!(
2040                "quarantine-{:016x}.log",
2041                wall_clock_micros().max(0) as u64
2042            ));
2043            match std::fs::write(&qpath, &body) {
2044                Ok(()) => eprintln!(
2045                    "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
2046                     forensics at {}",
2047                    quarantined.len(),
2048                    qpath.display()
2049                ),
2050                Err(e) => eprintln!(
2051                    "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
2052                     quarantine file write FAILED ({e}), entries follow:\n{body}",
2053                    quarantined.len()
2054                ),
2055            }
2056        }
2057        // Open the "current" chunk — either the last existing
2058        // chunk file (so subsequent appends extend it until the
2059        // size threshold rotates) or a fresh first chunk.
2060        let now_us = wall_clock_micros();
2061        let current_chunk_path = if let Some(last) = chunk_paths.last() {
2062            last.clone()
2063        } else {
2064            wal_dir.join(chunk_filename(now_us, initial_lsn + 1))
2065        };
2066        let wal_file = OpenOptions::new()
2067            .create(true)
2068            .append(true)
2069            .read(true)
2070            .open(&current_chunk_path)
2071            .map_err(io_err)?;
2072        // Persist the (possibly freshly created) chunk's directory entry.
2073        fsync_dir(&wal_dir);
2074        let wal_len = wal_file.metadata().map_err(io_err)?.len();
2075        let wal = Arc::new(WalGroup::new(wal_file, wal_len));
2076        // v7.19 P3 — spawn retention sweep thread when the
2077        // operator opted in via SPG_PITR_RETENTION_HOURS > 0.
2078        // Otherwise stay on the v7.18 behaviour (chunks accumulate
2079        // until something else — backup-pitr archival, manual
2080        // cleanup — moves them).
2081        let retention_hours = pitr_retention_hours();
2082        let (retention_shutdown, retention_thread) = if retention_hours > 0 {
2083            let shutdown = Arc::new(AtomicBool::new(false));
2084            let shutdown_clone = Arc::clone(&shutdown);
2085            let wal_dir_clone = wal_dir.clone();
2086            let check_interval = std::time::Duration::from_secs(pitr_retention_check_sec());
2087            let archive_cmd = pitr_archive_cmd();
2088            let handle = std::thread::Builder::new()
2089                .name("spg-pitr-retention".into())
2090                .spawn(move || {
2091                    retention_sweep_loop(
2092                        wal_dir_clone,
2093                        retention_hours,
2094                        check_interval,
2095                        archive_cmd,
2096                        shutdown_clone,
2097                    );
2098                })
2099                .map_err(io_err)?;
2100            (Some(shutdown), Some(handle))
2101        } else {
2102            (None, None)
2103        };
2104        // v7.20 — background flusher for SPG_SYNCHRONOUS_COMMIT=off.
2105        let (flusher_shutdown, flusher_thread) = if synchronous_commit_on() {
2106            (None, None)
2107        } else {
2108            let shutdown = Arc::new(AtomicBool::new(false));
2109            let shutdown_clone = Arc::clone(&shutdown);
2110            let group = Arc::clone(&wal);
2111            let interval = std::time::Duration::from_millis(wal_writer_delay_ms());
2112            let handle = std::thread::Builder::new()
2113                .name("spg-wal-flusher".into())
2114                .spawn(move || {
2115                    while !shutdown_clone.load(Ordering::SeqCst) {
2116                        std::thread::sleep(interval);
2117                        if let Err(e) = group.flush_now() {
2118                            eprintln!("spg-embedded: background WAL flush failed: {e:?}");
2119                        }
2120                    }
2121                    // Final drain on shutdown signal.
2122                    let _ = group.flush_now();
2123                })
2124                .map_err(io_err)?;
2125            (Some(shutdown), Some(handle))
2126        };
2127        // v7.34 (crash-recovery P0 #2) — arm row-level redo capture for
2128        // subsequent writes (AFTER replay, so re-executed SQL records
2129        // don't capture; 0x13 records replay via apply_redo and never do).
2130        if row_redo_enabled() {
2131            engine.set_redo_capture(true);
2132        }
2133        let db = Self {
2134            engine,
2135            commit_lsn: AtomicU64::new(initial_lsn),
2136            tx_wal: None,
2137            persistence: Some(PersistenceCtx {
2138                db_path,
2139                wal_dir,
2140                current_chunk_path: Arc::new(Mutex::new(current_chunk_path)),
2141                wal,
2142                checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
2143                cold_segments_dir,
2144                cold_segment_paths,
2145                lock_path,
2146                retention_shutdown,
2147                retention_thread,
2148                flusher_shutdown,
2149                flusher_thread,
2150                checkpoint_worker: Some(CheckpointWorker::spawn()),
2151            }),
2152        };
2153        // v7.37.2 (mailrs prod 7.35 pool-exhaustion incident — surface
2154        // fix per `feedback-zero-customer-change-warmup-incident`) —
2155        // automatic cold-tier OS page-cache warm-up so the catalog is
2156        // fully server-ready on return. The client never sees a SPG-
2157        // specific call site; `open_path` behaves like PG's "ready to
2158        // accept queries" semantics. Bounded by
2159        // `SPG_WARM_UP_COLD_BUDGET_MS` (default unset = no cap;
2160        // env-only spec channel, never a client-visible API). `0` =
2161        // skip warm-up entirely (escape hatch for fast restart).
2162        autowarm_cold_tier_on_open(&db);
2163        Ok(db)
2164    }
2165
2166    /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
2167    /// hot tier into a brand-new cold-tier segment + persist
2168    /// it to disk. Same semantics as `spg-server`'s freezer
2169    /// thread; embedded just runs the freeze synchronously on
2170    /// the caller's thread. Persistence + manifest update
2171    /// happen as part of the next `checkpoint()` (or on Drop).
2172    pub fn freeze_oldest_to_cold(
2173        &mut self,
2174        table_name: &str,
2175        index_name: &str,
2176        max_rows: usize,
2177    ) -> Result<spg_storage::FreezeReport, EngineError> {
2178        let report = self
2179            .engine
2180            .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
2181        if let Some(p) = &mut self.persistence {
2182            std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
2183            let final_path = p
2184                .cold_segments_dir
2185                .join(format!("seg_{}.spg", report.segment_id));
2186            let tmp_path = p
2187                .cold_segments_dir
2188                .join(format!("seg_{}.spg.tmp", report.segment_id));
2189            std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
2190            std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
2191            p.cold_segment_paths.insert(report.segment_id, final_path);
2192        }
2193        Ok(report)
2194    }
2195
2196    /// v7.1 — override the auto-checkpoint WAL-size ceiling for
2197    /// this `Database` instance. Default is
2198    /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
2199    /// setter wins. No-op when the database is in-memory.
2200    pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
2201        if let Some(p) = &mut self.persistence {
2202            p.checkpoint_threshold_bytes = bytes.max(1);
2203        }
2204    }
2205
2206    /// v7.31 (memory campaign, round-26 ask 1/ask 4) — per-bucket
2207    /// memory snapshot for the embedding host. Poll it from prod to
2208    /// see where resident bytes live (rows / representation /
2209    /// indexes per table) and to drive host-side shedding before
2210    /// the kernel does it. Same numbers as the server path's
2211    /// `SELECT * FROM spg_memory_stats`.
2212    #[must_use]
2213    pub fn memory_stats(&self) -> spg_engine::MemoryStats {
2214        let mut stats = self.engine.memory_stats();
2215        // v7.31 C2 — fill in bucket D: the engine leaves `wal_bytes`
2216        // None (it has no WAL); we report the live (uncheckpointed)
2217        // WAL footprint via the same `written_len()` meter `metrics()`
2218        // reads. In-memory databases have no persistence → stays None.
2219        if let Some(p) = &self.persistence {
2220            stats.wal_bytes = Some(p.wal.written_len());
2221        }
2222        stats
2223    }
2224
2225    /// v7.1 — flush a fresh catalog snapshot to `db_path` and
2226    /// rotate the WAL. Idempotent; cheap when nothing has happened
2227    /// since the last checkpoint. No-op when the database is in-memory.
2228    ///
2229    /// CoW-2 (v7.34): the heavy half (serialize + tmp+rename + fsync +
2230    /// marker enqueue + chunk rotation) runs on a dedicated worker thread
2231    /// so the caller's engine borrow is released after the cheap capture
2232    /// step. This entry point keeps the **synchronous** contract — it
2233    /// waits for the worker to finish before returning — so existing
2234    /// callers, tests, and operator scripts see no behaviour change;
2235    /// they just pay one extra hop. The non-blocking variant lives at
2236    /// `trigger_checkpoint`, used by the auto-checkpoint hot path so
2237    /// the write that crossed `SPG_EMBEDDED_CHECKPOINT_BYTES` doesn't
2238    /// stall on disk IO.
2239    ///
2240    /// Called automatically when:
2241    /// - the WAL grows past `SPG_EMBEDDED_CHECKPOINT_BYTES` (default
2242    ///   4 MiB) at the end of an `execute()` (via `trigger_checkpoint`,
2243    ///   non-blocking), and
2244    /// - `Drop` runs (synchronous; best-effort, failures logged).
2245    pub fn checkpoint(&mut self) -> Result<(), EngineError> {
2246        if self.persistence.is_none() {
2247            return Ok(());
2248        }
2249        // Drain any prior async checkpoint first so our snapshot reflects
2250        // post-it state (and so a sticky error from it surfaces here, not
2251        // smeared across the next two `wait`s).
2252        self.wait_checkpoint()?;
2253        let Some(job) = self.snapshot_checkpoint_job() else {
2254            return Ok(());
2255        };
2256        let Some(worker) = self
2257            .persistence
2258            .as_ref()
2259            .and_then(|p| p.checkpoint_worker.as_ref())
2260        else {
2261            return Ok(());
2262        };
2263        // `wait_checkpoint` above guaranteed idle; `try_enqueue` only
2264        // returns Ok(false) when busy, so we expect Ok(true) here. The
2265        // bool is dropped — we wait unconditionally to honour the sync
2266        // contract.
2267        let _ = worker.try_enqueue(job)?;
2268        self.wait_checkpoint()
2269    }
2270
2271    /// CoW-2 (v7.34) — non-blocking checkpoint trigger used by the
2272    /// auto-checkpoint hot path (`wal_after_ok` over the threshold).
2273    /// Captures the engine state under `&mut self` then signals the
2274    /// background worker and returns; the serialize / fsync / rotate
2275    /// sequence runs on the worker thread. If a checkpoint is already
2276    /// pending or in flight, the new trigger is silently dropped —
2277    /// the next threshold crossing picks up the newer state.
2278    ///
2279    /// Sticky errors from a prior async run surface here (via
2280    /// `try_enqueue`), so a failed background checkpoint still reaches
2281    /// the caller eventually rather than vanishing.
2282    fn trigger_checkpoint(&mut self) -> Result<(), EngineError> {
2283        if self.persistence.is_none() {
2284            return Ok(());
2285        }
2286        let Some(job) = self.snapshot_checkpoint_job() else {
2287            return Ok(());
2288        };
2289        let Some(worker) = self
2290            .persistence
2291            .as_ref()
2292            .and_then(|p| p.checkpoint_worker.as_ref())
2293        else {
2294            return Ok(());
2295        };
2296        let _accepted = worker.try_enqueue(job)?;
2297        Ok(())
2298    }
2299
2300    /// CoW-2 (v7.34) — block until the background checkpoint worker is
2301    /// idle. Used by sync `checkpoint()` and by Drop to ensure the final
2302    /// snapshot is durable before the process exits.
2303    fn wait_checkpoint(&self) -> Result<(), EngineError> {
2304        match self
2305            .persistence
2306            .as_ref()
2307            .and_then(|p| p.checkpoint_worker.as_ref())
2308        {
2309            Some(w) => w.wait(),
2310            None => Ok(()),
2311        }
2312    }
2313
2314    /// CoW-2 (v7.34) — capture a checkpoint job under `&mut self` (or
2315    /// `&self`, since reading from atomics + cheap clones don't mutate).
2316    /// Returns `None` if the database is in-memory.
2317    fn snapshot_checkpoint_job(&self) -> Option<CheckpointJob> {
2318        let p = self.persistence.as_ref()?;
2319        Some(CheckpointJob {
2320            snapshot: self.engine.snapshot_data(),
2321            marker_lsn: self.commit_lsn.load(Ordering::SeqCst),
2322            db_path: p.db_path.clone(),
2323            wal_dir: p.wal_dir.clone(),
2324            wal: Arc::clone(&p.wal),
2325            cold_segments: p
2326                .cold_segment_paths
2327                .iter()
2328                .map(|(&id, path)| (id, path.clone()))
2329                .collect(),
2330            current_chunk_path: Arc::clone(&p.current_chunk_path),
2331        })
2332    }
2333
2334    /// Restore a database from a previously-captured catalog
2335    /// snapshot. Pairs with `Database::snapshot()` for
2336    /// round-tripping in-memory state without going through
2337    /// the `spg-server` WAL.
2338    pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
2339        let engine = Engine::restore_envelope(snapshot).map_err(|e| {
2340            EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
2341        })?;
2342        let db = Self {
2343            engine,
2344            persistence: None,
2345            commit_lsn: AtomicU64::new(0),
2346            tx_wal: None,
2347        };
2348        // v7.37.2 — auto-warm on snapshot restore for the same reason
2349        // `open_path` does (catalog is server-ready when constructor
2350        // returns; client never sees a SPG-specific warmup call).
2351        autowarm_cold_tier_on_open(&db);
2352        Ok(db)
2353    }
2354
2355    /// Take a catalog snapshot suitable for `Database::restore`.
2356    /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
2357    /// + version + payload); round-trips through every released
2358    /// SPG version per the STABILITY contract.
2359    #[must_use]
2360    pub fn snapshot(&self) -> Vec<u8> {
2361        self.engine.snapshot()
2362    }
2363
2364    /// v7.36 (mailrs ask #4) — programmatic `EXPLAIN` over `sql`,
2365    /// returning each line of the QUERY PLAN as an owned `String`.
2366    /// Skips the WAL (`EXPLAIN` is read-only) and runs against the
2367    /// engine's live catalog. Dogfood callers can attach the plan
2368    /// to a report or assert on its shape from a test without
2369    /// having to parse a tabular result themselves.
2370    ///
2371    /// `sql` is the inner SELECT (no `EXPLAIN` prefix); the helper
2372    /// adds it. For SQL with `$N` placeholders, substitute them
2373    /// into the SQL string before calling — programmatic
2374    /// placeholder-aware EXPLAIN is on the v7.37 plan.
2375    ///
2376    /// # Errors
2377    /// Propagates parse errors on `sql`, plus any engine error the
2378    /// `EXPLAIN` itself raises (table not found, column not found).
2379    pub fn explain(&self, sql: &str) -> Result<Vec<String>, EngineError> {
2380        let full = format!("EXPLAIN {sql}");
2381        let result = self.engine.execute_readonly(&full)?;
2382        Ok(extract_query_plan_lines(result))
2383    }
2384
2385    /// Write-side single-statement execute. Runs the SQL through
2386    /// the buffered group-commit pipeline and blocks until the
2387    /// resulting batch's WAL fsync returns. Read-only statements
2388    /// (SELECT / SHOW / EXPLAIN / BEGIN-COMMIT-ROLLBACK /
2389    /// CHECKPOINT / COMPACT etc.) skip the WAL entirely.
2390    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
2391        // v7.20 P2 — single-caller convenience over the buffered
2392        // path: enqueue + immediately wait. Batch size is 1 here,
2393        // so the durability behaviour (one fsync before Ok) is
2394        // identical to v7.19. Concurrent callers go through
2395        // `execute_buffered` (AsyncDatabase does) and share the
2396        // leader's fsync.
2397        let (result, ticket) = self.execute_buffered(sql)?;
2398        if let Some(t) = ticket {
2399            t.wait()?;
2400        }
2401        Ok(result)
2402    }
2403
2404    /// v7.20 P2 — group-commit write entry. Runs the engine
2405    /// mutation + encodes/enqueues the WAL record, then RETURNS
2406    /// WITHOUT waiting for the fsync. The caller must call
2407    /// [`WalTicket::wait`] before treating the write as durable
2408    /// — crucially, the caller can (and should) drop whatever
2409    /// lock guards this `Database` first, so the next writer's
2410    /// mutation overlaps this batch's fsync.
2411    ///
2412    /// `None` ticket = nothing hit the WAL (read-only statement,
2413    /// no-op DDL, or in-memory database) — the result is final
2414    /// as returned.
2415    ///
2416    /// # Errors
2417    /// Engine errors propagate unchanged. Auto-checkpoint (when
2418    /// the active chunk crosses the threshold) runs inline and
2419    /// may surface IO errors.
2420    pub fn execute_buffered(
2421        &mut self,
2422        sql: &str,
2423    ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2424        let result = self.engine.execute(sql)?;
2425        let modified = matches!(
2426            &result,
2427            QueryResult::CommandOk {
2428                modified_catalog: true,
2429                ..
2430            }
2431        );
2432        let ticket = self.wal_after_ok(sql, modified)?;
2433        Ok((result, ticket))
2434    }
2435
2436    /// v7.21 (round-12 polish) — post-engine WAL bookkeeping shared
2437    /// by the simple ([`Self::execute_buffered`]) and prepared
2438    /// ([`Self::execute_prepared_buffered`]) write paths. `canonical`
2439    /// is the replay text (bind-final for prepared statements);
2440    /// `modified_catalog` comes from the engine result. Three routes:
2441    ///
2442    /// - transaction control → maintain [`Self::tx_wal`]: BEGIN opens
2443    ///   the buffer, COMMIT flushes it as ONE atomic
2444    ///   `WAL_V4_TYPE_TX_COMMIT_SQL` record, ROLLBACK drops it,
2445    ///   SAVEPOINT / ROLLBACK TO mark / truncate it. The engine has
2446    ///   already accepted the statement, so this only mirrors state.
2447    /// - inside an open transaction → buffer the statement (shadow-
2448    ///   catalog mutations report `modified_catalog: false`, so the
2449    ///   auto-commit arm below can't see them).
2450    /// - auto-commit mutation → classic per-statement v4 record.
2451    ///
2452    /// v7.18 PITR — v4 records carry commit LSN + wall-clock micros.
2453    /// The crash window remains one BATCH: replay re-applies
2454    /// idempotently exactly as before, and a torn batch tail drops
2455    /// cleanly (same torn-write handling).
2456    fn wal_after_ok(
2457        &mut self,
2458        canonical: &str,
2459        modified_catalog: bool,
2460    ) -> Result<Option<WalTicket>, EngineError> {
2461        if self.persistence.is_none() {
2462            return Ok(None);
2463        }
2464        let mut record = None;
2465        match tx_control_kind(canonical) {
2466            Some(TxControl::Begin) => {
2467                self.tx_wal = Some(TxWalBuffer::default());
2468            }
2469            Some(TxControl::Commit) => {
2470                if let Some(buf) = self.tx_wal.take()
2471                    && !buf.statements.is_empty()
2472                {
2473                    let script = buf.statements.join(";\n");
2474                    let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
2475                    record = Some(encode_v4_tx_commit(&script, lsn, wall_clock_micros()));
2476                }
2477            }
2478            Some(TxControl::Rollback) => {
2479                self.tx_wal = None;
2480            }
2481            Some(TxControl::Savepoint(name)) => {
2482                if let Some(buf) = &mut self.tx_wal {
2483                    // PG name-reuse semantics: latest mark wins.
2484                    buf.savepoints.retain(|(n, _)| n != &name);
2485                    let mark = buf.statements.len();
2486                    buf.savepoints.push((name, mark));
2487                }
2488            }
2489            Some(TxControl::RollbackToSavepoint(name)) => {
2490                if let Some(buf) = &mut self.tx_wal
2491                    && let Some(pos) = buf.savepoints.iter().position(|(n, _)| n == &name)
2492                {
2493                    let mark = buf.savepoints[pos].1;
2494                    buf.statements.truncate(mark);
2495                    // Later savepoints die with the rollback; the
2496                    // target itself survives (PG keeps it
2497                    // re-rollbackable).
2498                    buf.savepoints.truncate(pos + 1);
2499                }
2500            }
2501            Some(TxControl::ReleaseSavepoint) => {
2502                // RELEASE folds the savepoint into the enclosing tx —
2503                // buffered statements stay. The mark also stays:
2504                // marks are only consulted by ROLLBACK TO, which the
2505                // engine validates first, so a dangling mark is
2506                // unreachable.
2507            }
2508            None => {
2509                if let Some(buf) = &mut self.tx_wal {
2510                    if !sql_is_read_only(canonical) {
2511                        buf.statements.push(canonical.to_string());
2512                    }
2513                } else if modified_catalog && !sql_is_read_only(canonical) {
2514                    let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
2515                    // v7.34 (crash-recovery P0 #2) — hybrid log: when
2516                    // row-level redo is on and this statement produced row
2517                    // changes (DML), write a physical 0x13 redo record so
2518                    // replay applies it directly. A statement with no row
2519                    // changes (DDL: CREATE/ALTER, never goes through
2520                    // Table::insert/update/delete) drains an empty redo and
2521                    // keeps the SQL record so the schema still replays.
2522                    let redo = if row_redo_enabled() {
2523                        self.engine.take_redo()
2524                    } else {
2525                        Vec::new()
2526                    };
2527                    record = Some(if redo.is_empty() {
2528                        encode_v4_auto_commit(canonical, lsn, wall_clock_micros())
2529                    } else {
2530                        encode_v5_row_redo(
2531                            &spg_storage::encode_redo_log(&redo),
2532                            lsn,
2533                            wall_clock_micros(),
2534                        )
2535                    });
2536                }
2537            }
2538        }
2539        let mut ticket = None;
2540        if let Some(record) = record {
2541            let p = self.persistence.as_mut().expect("checked above");
2542            let seq = p.wal.enqueue(&record);
2543            ticket = Some(WalTicket {
2544                group: Arc::clone(&p.wal),
2545                seq,
2546            });
2547            if p.wal.written_len() >= p.checkpoint_threshold_bytes {
2548                // CoW-2 (v7.34): hot path — fire-and-forget. The worker
2549                // serializes off this thread so the commit that just
2550                // crossed the threshold doesn't stall on a multi-hundred-ms
2551                // snapshot write. Any sticky error from a prior async
2552                // checkpoint surfaces here.
2553                self.trigger_checkpoint()?;
2554            }
2555        }
2556        Ok(ticket)
2557    }
2558
2559    /// v7.3.0 — typed-row variant of [`Database::query`]. Each
2560    /// row decodes into a `T: FromSpgRow` so callers don't
2561    /// pattern-match on `Value` themselves. Use [`spg_row!`] to
2562    /// generate the impl, or write it by hand.
2563    pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
2564        let rows = self.query(sql)?;
2565        rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
2566    }
2567
2568    /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
2569    /// strips the column-schema metadata for read-side
2570    /// ergonomics. Errors on non-Rows results (DML / DDL
2571    /// statements should go through `execute` instead).
2572    pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
2573        match self.engine.execute(sql)? {
2574            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2575            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2576                "query() expects a SELECT — use execute() for DML/DDL".into(),
2577            )),
2578            // v7.5.0 — QueryResult is #[non_exhaustive]; any future
2579            // variant is not a SELECT row stream, treat as Unsupported.
2580            _ => Err(EngineError::Unsupported(
2581                "query() expects a SELECT — use execute() for DML/DDL".into(),
2582            )),
2583        }
2584    }
2585
2586    /// v7.16.0 — column-aware variant of [`Self::query`].
2587    /// Returns the column schema vec alongside the rows so
2588    /// adapters (the spg-sqlx Row impl most notably) can drive
2589    /// name + type-based column lookups. Errors on non-Rows
2590    /// results identically to `query`.
2591    pub fn query_with_columns(
2592        &mut self,
2593        sql: &str,
2594    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2595        match self.engine.execute(sql)? {
2596            QueryResult::Rows { columns, rows } => {
2597                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2598            }
2599            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2600                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2601            )),
2602            _ => Err(EngineError::Unsupported(
2603                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2604            )),
2605        }
2606    }
2607
2608    /// v7.16.0 — column-aware variant of
2609    /// [`Self::query_prepared`]. Same shape as
2610    /// `query_with_columns` but driven from a prepared
2611    /// statement + bound params.
2612    pub fn query_prepared_with_columns(
2613        &mut self,
2614        stmt: &Statement,
2615        params: &[Value],
2616    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2617        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2618            QueryResult::Rows { columns, rows } => {
2619                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2620            }
2621            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2622                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2623            )),
2624            _ => Err(EngineError::Unsupported(
2625                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2626            )),
2627        }
2628    }
2629
2630    /// Borrow the underlying engine. Escape hatch for callers
2631    /// that need access to `spg-engine` APIs not yet surfaced
2632    /// here (transactions, EXPLAIN ANALYZE, etc.).
2633    #[must_use]
2634    pub const fn engine(&self) -> &Engine {
2635        &self.engine
2636    }
2637
2638    /// Mutable borrow of the underlying engine. Same intent as
2639    /// `engine()` but for write-side APIs (e.g. inserting
2640    /// directly through `Catalog::insert` for high-throughput
2641    /// bulk loads that bypass SQL parsing).
2642    pub const fn engine_mut(&mut self) -> &mut Engine {
2643        &mut self.engine
2644    }
2645
2646    /// v7.38 (mailrs prod 7.35 pool-exhaustion incident) — boot-time
2647    /// plan-IR cache warm-up. Pre-prepares the listed SQL shapes so
2648    /// the first user-facing request doesn't pay the 2-3 s
2649    /// first-fire parse + JOIN-reorder cost on the readonly-blocking
2650    /// pool. Recommended call site: `Database::new` immediately after
2651    /// catalog restore, before serving any traffic. Returns the
2652    /// number of statements successfully cached.
2653    pub fn warm_up_plan_cache(&mut self, sqls: &[&str]) -> usize {
2654        self.engine.warm_up_plan_cache(sqls)
2655    }
2656
2657    /// v7.38 (mailrs prod 7.35 pool-exhaustion incident) — boot-time
2658    /// cold-tier OS page-cache warm-up. Touches every cold segment
2659    /// file in the active catalog so the kernel page cache loads
2660    /// them before user traffic arrives. On a hot-only catalog the
2661    /// call is a near-no-op. Returns the total cold rows touched.
2662    pub fn warm_up_cold_tier(&self) -> usize {
2663        self.engine.warm_up_cold_tier()
2664    }
2665
2666    /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
2667    /// `execute_prepared` / `query_prepared` calls can re-bind
2668    /// parameters without re-parsing. The returned [`Statement`]
2669    /// is a thin handle around the AST + cached source SQL; it's
2670    /// `Clone` so the same plan can drive many bind calls
2671    /// concurrently (each call clones the AST and runs
2672    /// placeholder substitution on the clone — the cached
2673    /// plan stays intact).
2674    ///
2675    /// Plan caching follows the engine's existing version-aware
2676    /// rule: a prepared `Statement` whose statistics version
2677    /// has rolled (ANALYZE ran between prepare and execute)
2678    /// will silently re-prepare under the hood. Callers don't
2679    /// need to detect this.
2680    ///
2681    /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
2682    /// `bind`-time `Value`s are passed as a slice; arity
2683    /// mismatches surface as `EvalError::PlaceholderOutOfRange`
2684    /// at `execute_prepared` time, not here.
2685    ///
2686    /// # Errors
2687    /// Surfaces `EngineError` (parse error / plan rewrite
2688    /// failure) from the underlying `Engine::prepare`.
2689    pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
2690        // Use the cached path so repeated prepares of the same
2691        // SQL are O(1). The engine's plan cache stays shared
2692        // across all callers of this Database — a single
2693        // `PgPool`-shaped consumer (or, later, the spg-sqlx
2694        // adapter) prepares once and reaps the win on every bind.
2695        let stmt = self
2696            .engine
2697            .prepare_cached(sql)
2698            .map_err(EngineError::Parse)?;
2699        Ok(Statement {
2700            stmt,
2701            sql: sql.to_string(),
2702        })
2703    }
2704
2705    /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
2706    /// executing. Returns `(parameter_oid_count, output_columns)`
2707    /// where `output_columns` is empty for non-SELECT statements
2708    /// or for SELECT shapes the describe planner can't resolve
2709    /// (JOIN / subquery / unknown table). Wraps
2710    /// `Engine::describe_prepared` so the spg-sqlx bridge can
2711    /// surface PG-shape Describe replies for
2712    /// `sqlx::query!()` compile-time validation.
2713    ///
2714    /// # Errors
2715    /// Propagates parse errors from the underlying prepare path.
2716    pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2717        let stmt = self
2718            .engine
2719            .prepare_cached(sql)
2720            .map_err(EngineError::Parse)?;
2721        Ok(self.engine.describe_prepared(&stmt))
2722    }
2723
2724    /// v7.16.0 — execute a prepared statement with bound
2725    /// parameters. Mirrors `Engine::execute_prepared`: clones
2726    /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
2727    ///
2728    /// Persistence (WAL fsync + auto-checkpoint) follows the
2729    /// same rules as `execute(sql)`: mutating statements get a
2730    /// WAL record AFTER the in-memory exec succeeds. The WAL
2731    /// record carries the substituted, bind-final SQL, so
2732    /// replay reconstructs the same row state without needing
2733    /// the original prepared `Statement` to still be alive.
2734    ///
2735    /// # Errors
2736    /// Propagates engine errors. Param arity mismatch surfaces
2737    /// as `EvalError::PlaceholderOutOfRange`.
2738    pub fn execute_prepared(
2739        &mut self,
2740        stmt: &Statement,
2741        params: &[Value],
2742    ) -> Result<QueryResult, EngineError> {
2743        let (result, ticket) = self.execute_prepared_buffered(stmt, params)?;
2744        if let Some(t) = ticket {
2745            t.wait()?;
2746        }
2747        Ok(result)
2748    }
2749
2750    /// v7.20 P2 — group-commit variant of
2751    /// [`Database::execute_prepared`]. Same contract as
2752    /// [`Database::execute_buffered`]: mutation + enqueue happen
2753    /// here; the caller waits on the ticket AFTER releasing
2754    /// whatever lock guards this `Database`.
2755    ///
2756    /// # Errors
2757    /// Engine errors propagate unchanged; inline auto-checkpoint
2758    /// may surface IO errors.
2759    pub fn execute_prepared_buffered(
2760        &mut self,
2761        stmt: &Statement,
2762        params: &[Value],
2763    ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2764        let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
2765        let modified = matches!(
2766            &result,
2767            QueryResult::CommandOk {
2768                modified_catalog: true,
2769                ..
2770            }
2771        );
2772        // WAL persistence on the bind-final SQL. Build the
2773        // canonical Display form by re-printing the
2774        // placeholder-substituted statement (cheap — the AST
2775        // is already in hand from execute_prepared's internal
2776        // clone) so replay's path is identical to the
2777        // simple-query path. v7.21: also when a transaction is
2778        // open — in-tx mutations report `modified_catalog: false`
2779        // but must reach the tx WAL buffer (see `wal_after_ok`).
2780        let mut ticket = None;
2781        if self.persistence.is_some()
2782            && (modified
2783                || (self.tx_wal.is_some() && !sql_is_read_only(&stmt.sql))
2784                || tx_control_kind(&stmt.sql).is_some())
2785        {
2786            let mut wal_stmt = stmt.stmt.clone();
2787            crate::wal_render_with_params(&mut wal_stmt, params);
2788            let canonical = format!("{wal_stmt}");
2789            ticket = self.wal_after_ok(&canonical, modified)?;
2790        }
2791        Ok((result, ticket))
2792    }
2793
2794    /// v7.16.0 — run a prepared SELECT with bound params and
2795    /// return rows as `Vec<Vec<Value>>`, matching `query()`
2796    /// shape. SELECTs are read-only so this never writes the
2797    /// WAL.
2798    ///
2799    /// # Errors
2800    /// Returns `Unsupported` if the prepared statement isn't a
2801    /// SELECT (use `execute_prepared` for DML/DDL).
2802    pub fn query_prepared(
2803        &mut self,
2804        stmt: &Statement,
2805        params: &[Value],
2806    ) -> Result<Vec<Vec<Value>>, EngineError> {
2807        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2808            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2809            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2810                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2811            )),
2812            _ => Err(EngineError::Unsupported(
2813                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2814            )),
2815        }
2816    }
2817
2818    /// v7.18 — parse + plan a SQL string against a
2819    /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
2820    /// readonly fan-out path: no writer lock taken, no WAL write,
2821    /// no plan-cache mutation. Static-on-`Self` so callers can
2822    /// dispatch against a snapshot without an `&mut Database`
2823    /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
2824    /// is the load-bearing consumer.
2825    ///
2826    /// # Errors
2827    /// Propagates `EngineError::Parse` from the parser.
2828    pub fn prepare_on_snapshot(
2829        snapshot: &CatalogSnapshot,
2830        sql: &str,
2831    ) -> Result<Statement, EngineError> {
2832        let stmt =
2833            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2834        Ok(Statement {
2835            stmt,
2836            sql: sql.to_string(),
2837        })
2838    }
2839
2840    /// v7.18 — execute a prepared `Statement` against a
2841    /// `CatalogSnapshot` with bound params. Mirror of
2842    /// [`Database::execute_prepared`] on the readonly path:
2843    /// writes / DDL hit `EngineError::WriteRequired`. No WAL
2844    /// write, no writer lock, multiple snapshots can run
2845    /// concurrently — the snapshot is immutable from prepare time.
2846    ///
2847    /// # Errors
2848    /// Surfaces `EngineError::WriteRequired` for non-readonly
2849    /// statements; propagates other engine errors.
2850    pub fn execute_prepared_on_snapshot(
2851        snapshot: &CatalogSnapshot,
2852        stmt: &Statement,
2853        params: &[Value],
2854    ) -> Result<QueryResult, EngineError> {
2855        spg_engine::Engine::execute_readonly_prepared_on_snapshot(
2856            snapshot,
2857            stmt.stmt.clone(),
2858            params,
2859        )
2860    }
2861
2862    /// v7.28 (round-22) — deadline-bounded variant of
2863    /// [`Database::execute_prepared_on_snapshot`]. Returns
2864    /// `EngineError::Cancelled` once the budget elapses; the
2865    /// sqlx driver uses this to keep readonly-INLINE execution
2866    /// from monopolising the caller's async runtime (four slow
2867    /// inbox queries saturated mailrs's whole tokio pool) and
2868    /// re-runs over the blocking pool on timeout.
2869    ///
2870    /// # Errors
2871    /// `EngineError::Cancelled` on budget expiry; engine errors
2872    /// otherwise.
2873    pub fn execute_prepared_on_snapshot_with_budget(
2874        snapshot: &CatalogSnapshot,
2875        stmt: &Statement,
2876        params: &[Value],
2877        budget_us: u64,
2878    ) -> Result<QueryResult, EngineError> {
2879        fn mono_now_us() -> u64 {
2880            use std::time::{SystemTime, UNIX_EPOCH};
2881            // Monotonic enough for a per-call relative budget: the
2882            // engine only compares (now - start) against the budget
2883            // within one call.
2884            SystemTime::now()
2885                .duration_since(UNIX_EPOCH)
2886                .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
2887                .unwrap_or(0)
2888        }
2889        let deadline = mono_now_us().saturating_add(budget_us);
2890        let token = spg_engine::CancelToken::none().with_deadline(mono_now_us, deadline);
2891        spg_engine::Engine::execute_readonly_prepared_on_snapshot_with_cancel(
2892            snapshot,
2893            stmt.stmt.clone(),
2894            params,
2895            token,
2896        )
2897    }
2898
2899    /// v7.18 — describe a SQL string against a
2900    /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
2901    /// the readonly path. Pure function on the snapshot's
2902    /// catalog; safe to call from any thread.
2903    ///
2904    /// # Errors
2905    /// Propagates `EngineError::Parse` from the parser.
2906    pub fn describe_on_snapshot(
2907        snapshot: &CatalogSnapshot,
2908        sql: &str,
2909    ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2910        let stmt =
2911            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2912        Ok(spg_engine::Engine::describe_prepared_on_snapshot(
2913            snapshot, &stmt,
2914        ))
2915    }
2916
2917    /// v7.21 (round-12 polish) — run a multi-statement SQL script
2918    /// with PG simple-query semantics: the statements execute in
2919    /// order inside ONE implicit transaction, so a mid-script error
2920    /// rolls back the whole script (PG wraps every simple-query
2921    /// message in an implicit transaction). Three exceptions, all
2922    /// PG-faithful:
2923    ///
2924    /// - a script that carries its OWN transaction control
2925    ///   (BEGIN / COMMIT / …) runs statement-by-statement — the
2926    ///   script owns its boundaries;
2927    /// - a script run while the caller already has a transaction
2928    ///   open joins that transaction (no nested BEGIN), and the
2929    ///   caller's COMMIT / ROLLBACK decides its fate;
2930    /// - a single-statement script is plain auto-commit.
2931    ///
2932    /// Returns one `QueryResult` per executed statement. This is the
2933    /// engine behind `sqlx::raw_sql` (mailrs feeds whole
2934    /// `init-schema.sql` files through it) and `spgctl import`.
2935    ///
2936    /// # Errors
2937    /// The first failing statement's error propagates after the
2938    /// implicit ROLLBACK; nothing from the script remains applied.
2939    pub fn execute_script(&mut self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
2940        let stmts = split_statements(sql);
2941        let script_owns_tx = stmts.iter().any(|s| tx_control_kind(s).is_some());
2942        let wrap = stmts.len() > 1 && !script_owns_tx && !self.engine.in_transaction();
2943        if !wrap {
2944            let mut out = Vec::with_capacity(stmts.len());
2945            for stmt in &stmts {
2946                out.push(self.execute_dump_statement(stmt)?);
2947            }
2948            return Ok(out);
2949        }
2950        self.execute("BEGIN")?;
2951        let mut out = Vec::with_capacity(stmts.len());
2952        for stmt in &stmts {
2953            match self.execute_dump_statement(stmt) {
2954                Ok(r) => out.push(r),
2955                Err(e) => {
2956                    // Best-effort rollback; surface the script error.
2957                    let _ = self.execute("ROLLBACK");
2958                    return Err(e);
2959                }
2960            }
2961        }
2962        self.execute("COMMIT")?;
2963        Ok(out)
2964    }
2965
2966    /// v7.22 (round-13 T2) — execute one `split_statements` chunk,
2967    /// lowering a `COPY … FROM stdin;` block (statement + its data
2968    /// lines, as one chunk) to per-row INSERTs through the shared
2969    /// `spg_engine::copy` helpers. Default-format pg_dump emits
2970    /// COPY blocks, so the zero-change import promise needs this on
2971    /// the embed path; non-COPY statements pass straight through to
2972    /// [`Self::execute`]. Public so `spgctl import` can keep its
2973    /// per-statement error indexing while sharing the lowering.
2974    ///
2975    /// # Errors
2976    /// Engine errors propagate; for COPY the failing row's INSERT
2977    /// error carries the synthesized statement context.
2978    pub fn execute_dump_statement(&mut self, stmt: &str) -> Result<QueryResult, EngineError> {
2979        // Strip pg_dump's `-- Data for Name: …;` banner (it carries
2980        // semicolons of its own) before splitting head from data.
2981        let stmt_clean = strip_leading_sql_noise(stmt);
2982        let head_is_copy = stmt_clean
2983            .get(..4)
2984            .is_some_and(|p| p.eq_ignore_ascii_case("copy"));
2985        if head_is_copy
2986            && let Some((head, data)) = stmt_clean.split_once(';')
2987            && let Some(spec) = spg_engine::copy::parse_copy_from_stdin_head(head)
2988        {
2989            let mut affected: usize = 0;
2990            for line in data.lines() {
2991                // Empty fragments only occur at the chunk boundary
2992                // (the remainder of the COPY line right after `;`);
2993                // data rows are whole non-empty lines.
2994                let line = line.strip_suffix('\r').unwrap_or(line);
2995                if line.is_empty() {
2996                    continue;
2997                }
2998                let values = spg_engine::copy::decode_copy_text_row(line);
2999                let insert = spg_engine::copy::build_copy_insert(
3000                    &spec.table,
3001                    spec.columns.as_deref(),
3002                    &values,
3003                );
3004                match self.execute(&insert)? {
3005                    QueryResult::CommandOk { affected: n, .. } => affected += n,
3006                    _ => affected += 1,
3007                }
3008            }
3009            return Ok(QueryResult::CommandOk {
3010                affected,
3011                modified_catalog: false,
3012            });
3013        }
3014        self.execute(stmt)
3015    }
3016
3017    /// v7.2.0 — run `body` inside an implicit `BEGIN` /
3018    /// `COMMIT` pair. The body receives `&mut Database` so it
3019    /// can `execute()` / `query()` like any other code path;
3020    /// the only difference is that every write in the body
3021    /// lands inside one transaction, and a returned `Err` from
3022    /// the body triggers `ROLLBACK` before the error propagates.
3023    ///
3024    /// Nested calls are not supported — SPG's transaction
3025    /// model is single-writer with explicit `BEGIN` /
3026    /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
3027    /// would hit `EngineError::Unsupported("nested
3028    /// transaction")` at the inner `BEGIN`.
3029    pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
3030    where
3031        F: FnOnce(&mut Self) -> Result<R, EngineError>,
3032    {
3033        self.execute("BEGIN")?;
3034        match body(self) {
3035            Ok(value) => {
3036                self.execute("COMMIT")?;
3037                Ok(value)
3038            }
3039            Err(e) => {
3040                // Best-effort rollback. If ROLLBACK itself
3041                // fails (rare — the engine reports it via
3042                // `Unsupported` only when there's no active
3043                // TX, which can't happen here) we surface the
3044                // original body error, not the rollback error.
3045                let _ = self.execute("ROLLBACK");
3046                Err(e)
3047            }
3048        }
3049    }
3050}
3051
3052impl Default for Database {
3053    fn default() -> Self {
3054        Self::open_in_memory()
3055    }
3056}
3057
/// v7.7.5 — observability snapshot returned by
/// [`Database::metrics`]. Plain `Copy` data made of integer
/// counters and one flag; cheap to construct and cheap to
/// serialise. Marked `#[non_exhaustive]`, so code outside this
/// crate cannot build it with a struct literal or match it
/// exhaustively.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct EmbeddedMetrics {
    /// Total live row count across every user table (hot
    /// tier only — cold-tier rows live in segment files).
    /// Computed as a saturating sum of each table's row count.
    pub hot_rows: u64,
    /// Saturating sum of `Table::hot_bytes` across every user
    /// table. Tracks against the freezer's `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Number of cold-tier segments registered in the catalog.
    /// Includes tombstoned slots (segments retired by
    /// compaction whose disk file may still be on disk).
    pub cold_segments: u64,
    /// User-table count (excludes any future engine-managed
    /// internal tables).
    pub tables: u64,
    /// WAL size at last `execute()` / `checkpoint()`. Zero
    /// when the database is in-memory.
    pub wal_bytes: u64,
    /// `true` when the database was opened with `open_path` —
    /// i.e. WAL + checkpoint persistence is active.
    pub persistent: bool,
}
3085
/// v7.2.1 — handle returned by `spawn_background_freezer`.
/// Drop signals the worker thread to wind down + joins it,
/// so a `Database` (or its shared `Arc<Mutex<Database>>`)
/// can safely drop after the handle does. Call
/// [`FreezerHandle::stop`] to do the same thing explicitly
/// before the handle goes out of scope.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    /// Stop flag shared with the worker thread; `stop()` sets it
    /// to `true`.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the worker thread. `stop()` takes it out,
    /// leaving `None`, so the thread is joined at most once.
    join: Option<JoinHandle<()>>,
}
3096
3097impl FreezerHandle {
3098    /// v7.2.1 — request the worker stop + join. Idempotent;
3099    /// safe to call from `Drop` (which also calls it).
3100    pub fn stop(&mut self) {
3101        self.shutdown.store(true, Ordering::Release);
3102        if let Some(h) = self.join.take() {
3103            let _ = h.join();
3104        }
3105    }
3106}
3107
3108impl Drop for FreezerHandle {
3109    fn drop(&mut self) {
3110        self.stop();
3111    }
3112}
3113
/// v7.2.1 — knobs for `Database::spawn_background_freezer`.
/// `FreezerOptions::default()` gives the `spg-server`-matching
/// operating point; override individual fields as needed.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// Tick interval. Worker wakes every `tick`, checks the
    /// catalog's `hot_tier_bytes`, and freezes if over budget.
    pub tick: Duration,
    /// Hot-tier byte budget. Exceeded → next tick freezes the
    /// largest table's oldest `batch_rows` rows into a new
    /// cold segment.
    pub hot_tier_bytes: u64,
    /// Max rows the freezer demotes per fire. Capped at the
    /// target table's current row count.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. When the catalog holds
    /// more than this many cold segments across all tables
    /// (strictly greater), the freezer runs one compaction pass
    /// right after a successful freeze.
    /// Set to `usize::MAX` to disable auto-compact entirely;
    /// the default is `64`, matching the `spg-server` operating
    /// point for SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges,
    /// in bytes. Default 64 MiB, mirroring `spg-server`. Small
    /// segments below this size are merge candidates;
    /// segments at or above stay untouched.
    pub compact_target_bytes: u64,
}
3139
3140impl Default for FreezerOptions {
3141    fn default() -> Self {
3142        // Match the `spg-server` freezer's default operating
3143        // point (SPG_HOT_TIER_BYTES = 4 GiB, batch 1000 rows,
3144        // tick every 1 s) so embedded behaviour is predictable
3145        // for operators familiar with the server.
3146        Self {
3147            tick: Duration::from_secs(1),
3148            hot_tier_bytes: 4 * 1024 * 1024 * 1024,
3149            batch_rows: 1000,
3150            compact_when_segments_exceed: 64,
3151            compact_target_bytes: 64 * 1024 * 1024,
3152        }
3153    }
3154}
3155
3156impl Database {
3157    /// v7.7.4 — observe the catalog's cold-segment count.
3158    /// Useful for tests + dashboards that want to verify
3159    /// auto-compaction is firing.
3160    #[must_use]
3161    pub fn cold_segment_count(&self) -> usize {
3162        self.engine.catalog().cold_segment_count()
3163    }
3164
3165    /// v7.7.5 — observability snapshot. Returns a point-in-time
3166    /// view of the engine + persistence counters. Cheap (no
3167    /// locks beyond the existing `&self` borrow), so safe to
3168    /// call from a hot metrics-scrape path.
3169    ///
3170    /// Fields mirror the operational dashboard
3171    /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
3172    /// minus the network counters that don't apply to embedded.
3173    #[must_use]
3174    pub fn metrics(&self) -> EmbeddedMetrics {
3175        let cat = self.engine.catalog();
3176        let mut hot_rows: u64 = 0;
3177        let mut hot_bytes: u64 = 0;
3178        for name in cat.table_names() {
3179            if let Some(t) = cat.get(&name) {
3180                hot_rows = hot_rows.saturating_add(t.row_count() as u64);
3181                hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
3182            }
3183        }
3184        let (wal_bytes, persistent) = match &self.persistence {
3185            Some(p) => (p.wal.written_len(), true),
3186            None => (0, false),
3187        };
3188        EmbeddedMetrics {
3189            hot_rows,
3190            hot_bytes,
3191            cold_segments: cat.cold_segment_count() as u64,
3192            tables: cat.table_count() as u64,
3193            wal_bytes,
3194            persistent,
3195        }
3196    }
3197
3198    /// v7.2.1 — spawn a background thread that periodically
3199    /// runs `freeze_oldest_to_cold` when the catalog-wide hot
3200    /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
3201    /// pattern matches the v7.2 sharing story: callers wrap
3202    /// their `Database` in `Arc::new(Mutex::new(db))` once,
3203    /// then clone the Arc for the worker + for foreground
3204    /// access. Return value is a handle whose `Drop` joins the
3205    /// worker.
3206    ///
3207    /// Picks the freeze target the same way `spg-server`'s
3208    /// freezer does: largest-`hot_bytes` user table with at
3209    /// least one BTree integer-PK index. Tables without a
3210    /// freezable index are skipped silently.
3211    pub fn spawn_background_freezer(
3212        db: Arc<Mutex<Database>>,
3213        opts: FreezerOptions,
3214    ) -> FreezerHandle {
3215        let shutdown = Arc::new(AtomicBool::new(false));
3216        let shutdown_for_thread = Arc::clone(&shutdown);
3217        let join = thread::Builder::new()
3218            .name("spg-embedded-freezer".into())
3219            .spawn(move || {
3220                background_freezer_loop(db, opts, shutdown_for_thread);
3221            })
3222            .expect("spawn background freezer thread");
3223        FreezerHandle {
3224            shutdown,
3225            join: Some(join),
3226        }
3227    }
3228}
3229
3230/// v7.2.1 — the freezer's main loop, factored out so the
3231/// `Database::spawn_background_freezer` path stays readable.
3232fn background_freezer_loop(
3233    db: Arc<Mutex<Database>>,
3234    opts: FreezerOptions,
3235    shutdown: Arc<AtomicBool>,
3236) {
3237    // Sleep in short slices so a shutdown request resolves
3238    // quickly (vs sleeping the full tick).
3239    let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
3240    let mut last_tick = std::time::Instant::now();
3241    loop {
3242        if shutdown.load(Ordering::Acquire) {
3243            return;
3244        }
3245        thread::sleep(slice);
3246        if last_tick.elapsed() < opts.tick {
3247            continue;
3248        }
3249        last_tick = std::time::Instant::now();
3250        let Ok(mut guard) = db.lock() else {
3251            return;
3252        };
3253        if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
3254            continue;
3255        }
3256        let Some((table, index)) = pick_freeze_target(&guard) else {
3257            continue;
3258        };
3259        let row_count = guard
3260            .engine
3261            .catalog()
3262            .get(&table)
3263            .map_or(0, spg_storage::Table::row_count);
3264        let to_freeze = opts.batch_rows.min(row_count);
3265        if to_freeze == 0 {
3266            continue;
3267        }
3268        if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
3269            eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
3270            continue;
3271        }
3272        // v7.7.4 — auto-compact. If the catalog now carries
3273        // more cold segments than the configured threshold,
3274        // run a single compaction pass. Failures are reported
3275        // but don't kill the loop; the next tick will retry.
3276        let count = guard.engine.catalog().cold_segment_count();
3277        if count > opts.compact_when_segments_exceed {
3278            if let Err(e) = guard
3279                .engine
3280                .compact_cold_segments_with_target(opts.compact_target_bytes)
3281            {
3282                eprintln!(
3283                    "spg-embedded: background compact failed (segments={count}, \
3284                     threshold={}): {e:?}",
3285                    opts.compact_when_segments_exceed,
3286                );
3287            }
3288        }
3289    }
3290}
3291
3292/// v7.2.1 — pick the highest-`hot_bytes` user table with a
3293/// BTree integer-PK index. Returns `(table, index_name)` so the
3294/// caller can dispatch through `freeze_oldest_to_cold`.
3295fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
3296    let cat = db.engine.catalog();
3297    let mut best: Option<(String, String, u64)> = None;
3298    for name in cat.table_names() {
3299        let Some(t) = cat.get(&name) else { continue };
3300        if t.row_count() == 0 {
3301            continue;
3302        }
3303        let cols = &t.schema().columns;
3304        let Some(idx) = t.indices().iter().find(|i| {
3305            matches!(i.kind, spg_storage::IndexKind::BTree(_))
3306                && i.column_position < cols.len()
3307                && matches!(
3308                    cols[i.column_position].ty,
3309                    spg_storage::DataType::SmallInt
3310                        | spg_storage::DataType::Int
3311                        | spg_storage::DataType::BigInt
3312                )
3313        }) else {
3314            continue;
3315        };
3316        let hot = t.hot_bytes();
3317        match best {
3318            None => best = Some((name, idx.name.clone(), hot)),
3319            Some((_, _, best_hot)) if hot > best_hot => {
3320                best = Some((name, idx.name.clone(), hot));
3321            }
3322            _ => {}
3323        }
3324    }
3325    best.map(|(t, i, _)| (t, i))
3326}
3327
3328/// v7.7.6 — replay the first `to_seq` records of the WAL at
3329/// `wal_path` into a fresh engine and write the resulting
3330/// catalog snapshot to `out_db_path`. Same semantics as
3331/// `spg revert --wal … --to-seq N --out …` from the CLI:
3332///
3333///   - `to_seq == 0` → snapshot is the empty catalog
3334///   - WAL records beyond `to_seq` are not applied
3335///   - durability-checkpoint markers (v3 type 0x02) are
3336///     consumed without counting against the budget
3337///
3338/// Returns the number of statements actually applied
3339/// (`≤ to_seq`). The output snapshot is byte-identical to
3340/// what `Database::open_path(out_db_path)` would consume on
3341/// a subsequent open.
3342///
3343/// This is the "rewind" operator for an embedded database
3344/// that has been corrupted by a poison statement or a
3345/// half-applied migration. Pair with `cold_segment_paths`
3346/// preservation if your cold-tier files are still on disk.
3347///
3348/// # Errors
3349///
3350/// - `wal_path` unreadable or truncated mid-record
3351/// - WAL record decodes to invalid UTF-8 SQL
3352/// - WAL record's SQL is rejected by the engine
3353/// - `out_db_path` unwritable
3354pub fn revert_wal_to_seq(
3355    wal_path: impl AsRef<Path>,
3356    to_seq: u64,
3357    out_db_path: impl AsRef<Path>,
3358) -> Result<u64, EngineError> {
3359    // v7.19 — accept either a single-file legacy WAL (v7.18 and
3360    // earlier layout) or a chunked WAL directory (v7.19+). For a
3361    // directory, concatenate every `.wal` chunk in sorted order
3362    // — the same order open_path replays them in — so revert
3363    // sees the full record stream.
3364    let path = wal_path.as_ref();
3365    let wal_bytes = if path.is_dir() {
3366        let mut combined = Vec::new();
3367        let chunks = sorted_wal_chunks(path).map_err(io_err)?;
3368        for chunk in chunks {
3369            let bytes = std::fs::read(&chunk).map_err(io_err)?;
3370            combined.extend_from_slice(&bytes);
3371        }
3372        combined
3373    } else {
3374        std::fs::read(path).map_err(io_err)?
3375    };
3376    let mut engine = Engine::new();
3377    let mut applied = 0u64;
3378    let mut cur = 0usize;
3379    while cur < wal_bytes.len() && applied < to_seq {
3380        let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
3381        cur += total;
3382        if sql_bytes.is_empty() {
3383            continue;
3384        }
3385        let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
3386            EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
3387                "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
3388            )))
3389        })?;
3390        // v7.21 — tx-commit records carry a multi-statement script;
3391        // split_statements is a no-op for single-statement records.
3392        for stmt in split_statements(sql) {
3393            engine.execute(stmt)?;
3394        }
3395        applied += 1;
3396    }
3397    let snapshot = engine.snapshot();
3398    std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
3399    Ok(applied)
3400}
3401
3402/// v7.7.6 — decode one WAL record from a byte tail. Returns
3403/// `(sql_bytes, header_plus_payload_len)`. Handles the three
3404/// on-disk formats (v1 / v2 / v3) the same way the CLI
3405/// `decode_one_record` and the engine's `replay_wal_bytes`
3406/// do. CRCs are not re-validated; the caller's intent is
3407/// "apply", not "validate".
3408fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
3409    if tail.len() < 4 {
3410        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3411            format!("WAL truncated record: {} < 4 header bytes", tail.len()),
3412        )));
3413    }
3414    let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
3415    let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
3416    let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
3417    let len_mask = if is_v3 {
3418        !(WAL_V2_SENTINEL | WAL_V3_FLAG)
3419    } else {
3420        !WAL_V2_SENTINEL
3421    };
3422    let rec_len = (raw_len & len_mask) as usize;
3423    let header_len = if is_v3 {
3424        9
3425    } else if is_v2 {
3426        8
3427    } else {
3428        4
3429    };
3430    if tail.len() < header_len + rec_len {
3431        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3432            format!(
3433                "WAL truncated record: header+payload {} > available {}",
3434                header_len + rec_len,
3435                tail.len()
3436            ),
3437        )));
3438    }
3439    if is_v3 {
3440        let type_byte = tail[8];
3441        // v3 type 0x01 = auto_commit_sql (payload = SQL).
3442        // v3 type 0x02 = durability marker (no SQL to apply).
3443        // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
3444        //                prefix between type and SQL — strip
3445        //                the prefix so the caller still sees raw
3446        //                SQL bytes.
3447        // Anything else is unknown.
3448        if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
3449            let payload = &tail[header_len..header_len + rec_len];
3450            return Ok((payload.to_vec(), header_len + rec_len));
3451        }
3452        if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL || type_byte == WAL_V4_TYPE_TX_COMMIT_SQL {
3453            let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
3454            if tail.len() < v4_total {
3455                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3456                    format!(
3457                        "WAL truncated v4 record: header+payload {v4_total} > available {}",
3458                        tail.len()
3459                    ),
3460                )));
3461            }
3462            let sql_start = header_len + WAL_V4_EXTRA_HEADER;
3463            let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
3464            return Ok((sql_bytes, v4_total));
3465        }
3466        // Caller treats empty payload as a skip-marker.
3467        return Ok((Vec::new(), header_len + rec_len));
3468    }
3469    let payload = &tail[header_len..header_len + rec_len];
3470    Ok((payload.to_vec(), header_len + rec_len))
3471}
3472
impl Drop for Database {
    /// Shutdown sequence for a persistent database; an in-memory
    /// database (no persistence context) skips every step. The
    /// order is deliberate:
    ///
    /// 1. best-effort final checkpoint;
    /// 2. signal and join the retention thread, then the flusher
    ///    thread, then drop the checkpoint worker;
    /// 3. remove the cross-process lock directory.
    ///
    /// Nothing here panics or propagates: failures are written to
    /// stderr (checkpoint, lock removal) or discarded (thread
    /// join results).
    fn drop(&mut self) {
        // v7.1 — best-effort final checkpoint when a persistent
        // Database leaves scope. Failures here go to stderr so
        // operators see them, but Drop can't propagate errors —
        // the WAL itself is already durable, so a checkpoint
        // miss only means the next boot replays a few more
        // records than strictly necessary.
        if self.persistence.is_some() {
            if let Err(e) = self.checkpoint() {
                eprintln!(
                    "spg-embedded: final checkpoint on Drop failed: {e:?} \
                     (WAL is intact; next open_path will replay)"
                );
            }
        }
        // v7.19 P3 / v7.20 — signal the retention + flusher
        // threads to exit, then wait for them. Done BEFORE the
        // lock release so background threads don't outlive the
        // database handle. The flusher drains the pending batch
        // on its way out (final flush_now in the thread body),
        // so `SPG_SYNCHRONOUS_COMMIT=off` never loses confirmed
        // commits across a clean shutdown.
        if let Some(ctx) = self.persistence.as_mut() {
            // Each flag / handle is `take()`n, so it is consumed
            // here and cannot be signalled or joined twice.
            if let Some(shutdown) = ctx.retention_shutdown.take() {
                shutdown.store(true, Ordering::SeqCst);
            }
            if let Some(handle) = ctx.retention_thread.take() {
                let _ = handle.join();
            }
            if let Some(shutdown) = ctx.flusher_shutdown.take() {
                shutdown.store(true, Ordering::SeqCst);
            }
            if let Some(handle) = ctx.flusher_thread.take() {
                let _ = handle.join();
            }
            // CoW-2 (v7.34) — final checkpoint above left the worker
            // idle; explicitly drop it here so its shutdown signal +
            // thread join happens with a deterministic ordering (before
            // the lock release / persistence drop), not whenever Rust
            // happens to drop the PersistenceCtx fields.
            ctx.checkpoint_worker = None;
        }
        // v7.17.0 Phase 6.2 — release the cross-process lock on
        // clean shutdown. Failure is logged but never panics;
        // the operator can clear a stale lock via
        // `Database::force_unlock` if a crash kept the
        // directory around.
        if let Some(ctx) = &self.persistence
            && ctx.lock_path.exists()
        {
            // remove_dir_all: the lock dir carries the owner-pid
            // record since round-12.
            if let Err(e) = std::fs::remove_dir_all(&ctx.lock_path) {
                eprintln!(
                    "spg-embedded: lock release on Drop failed for {}: {e:?}",
                    ctx.lock_path.display()
                );
            }
        }
    }
}
3535
3536impl Database {
3537    /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
3538    /// Use when a previous process crashed mid-session and
3539    /// left `<db_path>.lock` behind. Operators should confirm
3540    /// no other process is currently using the database before
3541    /// calling this — SPG cannot fingerprint stale-vs-live
3542    /// without a libc dep, which would violate spg-embedded's
3543    /// zero-deps charter.
3544    pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
3545        let lock_path = {
3546            let mut p = db_path.as_ref().to_path_buf();
3547            let name = p
3548                .file_name()
3549                .map(|n| {
3550                    let mut s = n.to_os_string();
3551                    s.push(".lock");
3552                    s
3553                })
3554                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
3555            p.set_file_name(name);
3556            p
3557        };
3558        if !lock_path.exists() {
3559            return Ok(());
3560        }
3561        std::fs::remove_dir_all(&lock_path).map_err(io_err)
3562    }
3563}
3564
3565/// v7.1 — turn a `std::io::Error` into the workspace's
3566/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
3567/// the closest existing variant — io failures during boot or
3568/// during a WAL append surface as a storage-layer fault to
3569/// callers, which keeps the public error enum unchanged.
3570fn io_err(e: std::io::Error) -> EngineError {
3571    EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
3572}
3573
3574/// v7.2.2 — `Database` is `Send`, so the recommended sharing
3575/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
3576///
3577/// ```no_run
3578/// use std::sync::{Arc, Mutex};
3579/// use spg_embedded::Database;
3580///
3581/// let db = Database::open_in_memory();
3582/// let shared = Arc::new(Mutex::new(db));
3583/// let shared_for_worker = Arc::clone(&shared);
3584/// std::thread::spawn(move || {
3585///     let mut guard = shared_for_worker.lock().unwrap();
3586///     guard.execute("INSERT INTO t VALUES (1)").unwrap();
3587/// });
3588/// ```
3589///
3590/// Internal `RwLock`-wrapped state — letting many threads
3591/// hold concurrent `&Database` for `SELECT` without contending
3592/// — is parked as STABILITY § "Out of v7.2"; multi-reader
3593/// embedded throughput needs a planner-side change to release
3594/// the engine read lock between scans, which is the v7.x
3595/// "Choice A" line of work already documented in v6.9.1's
3596/// carve-out.
3597#[allow(dead_code)]
3598fn _database_is_send() {
3599    fn assert_send<T: Send>() {}
3600    assert_send::<Database>();
3601}
3602
3603/// v6.10.3 — trait that maps a row's columns onto a user
3604/// struct's fields. v7.3.0 ships the [`spg_row!`] declarative
3605/// macro that generates `impl FromSpgRow for YourStruct` from
3606/// a struct definition (no proc-macro, no syn/quote/
3607/// proc-macro2 deps — the workspace's "0 external deps"
3608/// policy holds).
3609///
3610/// Implementors map a row's columns onto a user struct's
3611/// fields. Errors surface as `EngineError::Unsupported` so the
3612/// caller's error type stays uniform.
3613pub trait FromSpgRow: Sized {
3614    /// Decode one query result row into `Self`. Called once per
3615    /// row by [`Database::query_typed`]. The slice length equals
3616    /// the number of columns in the SELECT projection.
3617    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
3618}
3619
/// v7.3.0 — declarative macro that defines a struct and generates
/// its `FromSpgRow` impl. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy holds;
/// the trade-off vs `#[derive(SpgRow)]` is that the macro takes the
/// entire struct definition (fields + types) as input rather than
/// annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// Fields are comma-separated; the comma after the last field is
/// optional, exactly as in an ordinary struct definition. The
/// generated struct always derives `Debug` and `Clone`, and outer /
/// field attributes are forwarded unchanged.
///
/// Columns are decoded positionally: the n-th field reads the n-th
/// cell of the row through [`FromSpgValue`]. A row with fewer cells
/// than fields, or a cell that fails to decode, yields
/// `EngineError::Unsupported` naming the struct and the field. Cells
/// beyond the last field are ignored.
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // One shared cursor: each field consumes the next cell.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                Ok(Self { $($field,)* })
            }
        }
    };
}
3696
3697/// v7.3.0 — per-column decoder used by `spg_row!`. Surface
3698/// covers every numeric / text / bytes / bool variant in
3699/// `Value`, plus `Option<T>` for nullable columns.
3700pub trait FromSpgValue: Sized {
3701    /// Decode one cell into `Self`. The returned `&'static str`
3702    /// is a short diagnostic for type mismatches (e.g. `"expected
3703    /// integer, got TEXT"`); callers wrap it into their own
3704    /// error type.
3705    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
3706}
3707
3708macro_rules! impl_from_value_int {
3709    ($($t:ty),* $(,)?) => {
3710        $(
3711            impl FromSpgValue for $t {
3712                fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3713                    match v {
3714                        Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
3715                        Value::Int(n)      => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
3716                        Value::BigInt(n)   => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
3717                        Value::Null        => Err("NULL in non-Option int column"),
3718                        _ => Err("non-integer value in int column"),
3719                    }
3720                }
3721            }
3722        )*
3723    };
3724}
3725impl_from_value_int!(i16, i32, i64);
3726
3727impl FromSpgValue for f32 {
3728    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3729        match v {
3730            Value::Float(f) => Ok(*f as f32),
3731            Value::Null => Err("NULL in non-Option float column"),
3732            _ => Err("non-float value in float column"),
3733        }
3734    }
3735}
3736
3737impl FromSpgValue for f64 {
3738    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3739        match v {
3740            Value::Float(f) => Ok(*f),
3741            Value::Null => Err("NULL in non-Option float column"),
3742            _ => Err("non-float value in float column"),
3743        }
3744    }
3745}
3746
3747impl FromSpgValue for bool {
3748    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3749        match v {
3750            Value::Bool(b) => Ok(*b),
3751            Value::Null => Err("NULL in non-Option bool column"),
3752            _ => Err("non-bool value in bool column"),
3753        }
3754    }
3755}
3756
3757impl FromSpgValue for String {
3758    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3759        match v {
3760            Value::Text(s) => Ok(s.clone()),
3761            Value::Null => Err("NULL in non-Option text column"),
3762            _ => Err("non-text value in String column"),
3763        }
3764    }
3765}
3766
3767impl FromSpgValue for Vec<f32> {
3768    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3769        match v {
3770            Value::Vector(xs) => Ok(xs.clone()),
3771            Value::Null => Err("NULL in non-Option vector column"),
3772            _ => Err("non-vector value in Vec<f32> column"),
3773        }
3774    }
3775}
3776
3777impl<T: FromSpgValue> FromSpgValue for Option<T> {
3778    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3779        match v {
3780            Value::Null => Ok(None),
3781            other => T::from_spg_value(other).map(Some),
3782        }
3783    }
3784}
3785
/// v7.27 (mailrs round-21 B) — the prober's environment identity:
/// `(hostname, boot-or-container id)`.
///
/// A pid is only meaningful inside the PID namespace that recorded
/// it; mailrs's recovery window saw "locked by pid 1" from a STOPPED
/// container because the prober's pid 1 (its own init) was alive.
/// When a lock's recorded identity differs from ours, liveness is
/// UNDECIDABLE and the caller refuses honestly instead of guessing in
/// either direction.
///
/// Context: `acquire_path_lock` takes the cross-process exclusion
/// lock with an atomic `mkdir` and records the owner pid plus this
/// identity inside it. When the lock already exists it reads that
/// record and probes liveness, so a lock left behind by a killed
/// process (docker SIGKILL, crash) is reclaimed automatically instead
/// of forcing the operator to delete it by hand (mailrs embed
/// round-12: a restarted server came up in degraded mode because the
/// previous instance's lock survived).
///
/// Both components are best-effort and come back as an empty string
/// when they cannot be determined:
/// - hostname: trimmed stdout of the `hostname` binary;
/// - boot id: `/proc/sys/kernel/random/boot_id`, falling back to
///   `sysctl -n kern.bootsessionuuid` when that file is unreadable.
fn host_identity() -> (String, String) {
    // Trimmed, lossily-decoded stdout of a finished child process.
    fn stdout_text(out: std::process::Output) -> String {
        String::from_utf8_lossy(&out.stdout).trim().to_string()
    }

    let hostname = match std::process::Command::new("hostname").output() {
        Ok(out) => stdout_text(out),
        Err(_) => String::new(),
    };
    // Linux boot id; containers share the host kernel's boot id, so
    // hostname (= container id by default) is the namespace
    // discriminator and boot id catches host reboots / pid reuse.
    let boot_id = match std::fs::read_to_string("/proc/sys/kernel/random/boot_id") {
        Ok(raw) => raw.trim().to_string(),
        Err(_) => std::process::Command::new("sysctl")
            .args(["-n", "kern.bootsessionuuid"])
            .output()
            .map(stdout_text)
            .unwrap_or_default(),
    };
    (hostname, boot_id)
}
3820
/// v7.34 (crash-recovery P0 #2) — process start-time, used to tell a
/// reused pid apart from a genuinely-held lock.
///
/// In a container the holder is always pid 1; `docker start` reuses
/// the container so the NEW process is pid 1 too, on the same
/// host+boot id — a bare `pid_alive(1)` probe (`ps -p 1` always
/// succeeds) reads a dead owner's lock as live and the engine
/// self-deadlocks on its own catalog. The `(pid, start-time)` pair is
/// unique per live process within a boot: a reused pid carries a
/// LATER start-time, so a mismatch means the recorded owner is gone.
///
/// Linux reads field 22 of `/proc/<pid>/stat` (clock ticks since
/// boot) and returns it as text. `comm` (field 2) is parenthesised
/// and may contain spaces, so fields are counted after the LAST `)`.
/// Returns `None` when the file cannot be read or is too short.
/// Pure std — no libc.
#[cfg(target_os = "linux")]
fn process_start_time(pid: u32) -> Option<String> {
    // Field 3 (state) is the first token after comm, so field 22
    // (starttime) sits at zero-based index 19.
    const STARTTIME_AFTER_COMM: usize = 19;

    let stat = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
    let comm_close = stat.rfind(')')?;
    stat[comm_close + 1..]
        .split_whitespace()
        .nth(STARTTIME_AFTER_COMM)
        .map(String::from)
}

/// Non-Linux targets have no start-time source here and always report
/// `None`; `acquire_path_lock` then relies on the pid-alive answer
/// alone.
#[cfg(not(target_os = "linux"))]
fn process_start_time(_pid: u32) -> Option<String> {
    None
}
3846
3847fn acquire_path_lock(lock_path: &Path) -> Result<(), EngineError> {
3848    for attempt in 0..2 {
3849        match std::fs::create_dir(lock_path) {
3850            Ok(()) => {
3851                // Best-effort owner record; liveness probing treats a
3852                // missing pid file as stale (crash between mkdir and
3853                // write is indistinguishable from an ancient lock).
3854                // v7.27 — lines 2+3 record the owner's environment
3855                // identity (hostname, boot id) so a prober in a
3856                // different namespace refuses instead of misreading
3857                // the pid. v7.34 — line 4 records the owner's process
3858                // start-time so a reused pid (container pid-1 restart)
3859                // is distinguishable from a live holder.
3860                let (host, boot) = host_identity();
3861                let start = process_start_time(std::process::id()).unwrap_or_default();
3862                let _ = std::fs::write(
3863                    lock_path.join("pid"),
3864                    format!("{}\n{host}\n{boot}\n{start}\n", std::process::id()),
3865                );
3866                return Ok(());
3867            }
3868            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists && attempt == 0 => {
3869                let record = std::fs::read_to_string(lock_path.join("pid")).unwrap_or_default();
3870                let mut lines = record.lines();
3871                let owner = lines.next().and_then(|s| s.trim().parse::<u32>().ok());
3872                let lock_host = lines.next().unwrap_or("").trim().to_string();
3873                let lock_boot = lines.next().unwrap_or("").trim().to_string();
3874                let lock_start = lines.next().unwrap_or("").trim().to_string();
3875                // v7.27 — identity check BEFORE the pid probe. A pid
3876                // recorded in another namespace is undecidable both
3877                // ways (a stale lock can look held, a held lock can
3878                // look stale — the unsafe direction). Old-format
3879                // locks (pid only) keep the legacy same-host
3880                // assumption.
3881                if !lock_host.is_empty() {
3882                    let (my_host, my_boot) = host_identity();
3883                    let same_env = lock_host == my_host
3884                        && (lock_boot.is_empty() || my_boot.is_empty() || lock_boot == my_boot);
3885                    if !same_env {
3886                        return Err(EngineError::Unsupported(format!(
3887                            "database lock {} was taken in a different host/container \
3888                             (owner: pid {} on {:?}; we are {:?}) — liveness is \
3889                             undecidable from here. If you are sure the owner is gone, \
3890                             call Database::force_unlock() or `spg import --force-unlock`.",
3891                            lock_path.display(),
3892                            owner.unwrap_or(0),
3893                            lock_host,
3894                            my_host
3895                        )));
3896                    }
3897                }
3898                // v7.34 (crash-recovery P0 #2) — pid-reuse-safe liveness.
3899                // A bare `pid_alive` self-deadlocks in a container: the
3900                // dead owner was pid 1, `docker start` reuses the container
3901                // so the prober is pid 1 too, and `ps -p 1` always succeeds.
3902                // The recorded (pid, start-time) pair settles it — the
3903                // owner is alive ONLY if its pid is alive AND its CURRENT
3904                // start-time still matches the recorded one:
3905                //  - container restart: pid 1 alive, but the new pid-1's
3906                //    start-time differs from the dead owner's → stale.
3907                //  - genuine double-open (same live process): start-time
3908                //    matches (it wrote it) → held — correctly refused, so a
3909                //    second writer can't steal a live lock.
3910                // An empty/uncomparable start-time (old-format lock or a
3911                // non-Linux owner with no /proc) falls back to the
3912                // pid-alive answer (the pre-v7.34 behaviour).
3913                let owner_alive = owner.is_some_and(|p| {
3914                    pid_alive(p)
3915                        && match process_start_time(p) {
3916                            Some(now) if !lock_start.is_empty() => now == lock_start,
3917                            _ => true,
3918                        }
3919                });
3920                if owner_alive {
3921                    return Err(EngineError::Unsupported(format!(
3922                        "database is locked by another process (pid {}): {}; \
3923                         stop that process first, or call Database::force_unlock()",
3924                        owner.unwrap_or(0),
3925                        lock_path.display()
3926                    )));
3927                }
3928                // Stale — owner pid dead, reused, or unrecorded. Reclaim.
3929                eprintln!(
3930                    "spg-embedded: reclaiming stale lock {} (owner pid {:?} not a live holder)",
3931                    lock_path.display(),
3932                    owner
3933                );
3934                std::fs::remove_dir_all(lock_path).map_err(io_err)?;
3935                // Loop retries the create_dir; a concurrent reclaimer
3936                // winning the race surfaces as AlreadyExists on
3937                // attempt 1 below.
3938            }
3939            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
3940                return Err(EngineError::Unsupported(format!(
3941                    "database is locked by another process: {}; \
3942                     stop that process first, or call Database::force_unlock()",
3943                    lock_path.display()
3944                )));
3945            }
3946            Err(e) => return Err(io_err(e)),
3947        }
3948    }
3949    unreachable!("acquire_path_lock loop covers both attempts")
3950}
3951
/// Report whether `pid` names a live process.
///
/// Unix shells out to `ps -p <pid>` (std-only — no libc dependency)
/// with stdout and stderr discarded, and returns whether it exited
/// successfully. `ps -p` exits 0 for ANY live pid regardless of
/// owner; `kill -0` was rejected here because it fails with EPERM on
/// another user's live process, which would read as "dead" and
/// reclaim a held lock. If the probe itself cannot be run (no `ps`
/// binary, exec error) the answer is conservatively "alive", so locks
/// are never auto-reclaimed on doubt.
#[cfg(unix)]
fn pid_alive(pid: u32) -> bool {
    use std::process::{Command, Stdio};

    let pid_arg = pid.to_string();
    Command::new("ps")
        .args(["-p", pid_arg.as_str()])
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .map_or(true, |status| status.success())
}

/// Non-unix targets cannot probe and conservatively report "alive".
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    true
}
3978
/// v7.22 — update the splitter's string-dialect flag from one finished
/// statement (see `split_statements`' `mysql_escapes` tracking).
///
/// Matching is case-insensitive substring search:
/// - a chunk mentioning `sql_mode` switches backslash-escape scanning
///   on;
/// - otherwise a chunk mentioning `standard_conforming_strings` sets
///   the flag to whether the chunk also contains `off`.
///
/// Only short chunks are inspected (the signal statements are
/// one-liners; COPY data blocks are skipped by the length guard), and
/// chunks with neither keyword leave the flag untouched.
fn note_dialect_signals(chunk: &str, mysql_escapes: &mut bool) {
    const MAX_SIGNAL_LEN: usize = 4096;
    if chunk.len() > MAX_SIGNAL_LEN {
        return;
    }
    let folded = chunk.to_ascii_lowercase();
    if folded.contains("sql_mode") {
        *mysql_escapes = true;
        return;
    }
    if folded.contains("standard_conforming_strings") {
        *mysql_escapes = folded.contains("off");
    }
}

/// Strip leading whitespace, `--` line comments and NON-conditional
/// block comments from a chunk so statement-head checks (COPY
/// detection most notably) see the first real token. pg_dump
/// prefixes every data block with a `-- Data for Name: …;` banner —
/// which itself contains semicolons, so head checks must run on the
/// stripped text. MySQL executable conditional comments (`/*!`) are
/// content and stay.
///
/// Block comments are matched the way `split_statements` scans them:
/// the search for the terminator starts AFTER the opening `/*` (so
/// `/*/` is an opener, not a complete comment) and nested `/* … */`
/// pairs are depth-tracked. An unterminated comment swallows the rest
/// of the chunk and yields `""`.
fn strip_leading_sql_noise(mut s: &str) -> &str {
    // Byte length of the block comment that opens `text` (which must
    // start with `/*`), or `None` when it never closes.
    fn block_comment_len(text: &str) -> Option<usize> {
        let bytes = text.as_bytes();
        let mut depth = 1usize;
        let mut i = 2; // just past the opening `/*`
        while i + 1 < bytes.len() {
            match (bytes[i], bytes[i + 1]) {
                (b'/', b'*') => {
                    depth += 1;
                    i += 2;
                }
                (b'*', b'/') => {
                    depth -= 1;
                    i += 2;
                    if depth == 0 {
                        // `i` follows an ASCII `/`, so it is a char boundary.
                        return Some(i);
                    }
                }
                _ => i += 1,
            }
        }
        None
    }

    loop {
        let t = s.trim_start();
        if let Some(rest) = t.strip_prefix("--") {
            // Drop through end-of-line; a comment on the last line
            // leaves nothing.
            s = rest.split_once('\n').map_or("", |(_, after)| after);
            continue;
        }
        if t.starts_with("/*") && !t.starts_with("/*!") {
            match block_comment_len(t) {
                Some(len) => {
                    s = &t[len..];
                    continue;
                }
                None => return "",
            }
        }
        return t;
    }
}
4020
4021/// Split a multi-statement SQL script into individual statements on
4022/// top-level `;`, honouring single-quoted strings (with `''`
4023/// escapes), double-quoted identifiers, dollar-quoted bodies
4024/// (`$tag$ … $tag$`), line comments (`--`) and MySQL executable
4025/// conditional comments (`/*!… */` stay statement content; plain
4026/// nested block comments don't). Chunks that contain no statement
4027/// content (whitespace / comments only) are dropped. PG's
4028/// simple-query protocol does this server-side; the embed path owns
4029/// it here.
4030///
4031/// v7.22 (mailrs round-13 gap 1) — psql meta-command lines are
4032/// dropped for client parity: a line whose first non-whitespace
4033/// byte is `\` BETWEEN statements (PG 18's pg_dump wraps scripts in
4034/// `\restrict` / `\unrestrict`) never reaches the parser, the same
4035/// way psql consumes `\`-lines client-side and never sends them. A
4036/// mid-statement backslash stays an ordinary byte — pg_dump only
4037/// emits meta-commands between statements.
4038pub fn split_statements(sql: &str) -> Vec<&str> {
4039    let bytes = sql.as_bytes();
4040    let mut stmts = Vec::new();
4041    let mut start = 0usize;
4042    let mut has_content = false;
4043    // v7.22 (round-13 T3) — stream-tracked string dialect, mirroring
4044    // the engine's session flag: a statement mentioning `sql_mode`
4045    // (mysqldump preamble, often inside `/*!…*/`) switches plain
4046    // strings to backslash-escape scanning;
4047    // `standard_conforming_strings` (pg_dump preamble) switches
4048    // back. Without this the scanner ends a MySQL `'…\'…'` literal
4049    // early and splits inside data.
4050    let mut mysql_escapes = false;
4051    let mut i = 0usize;
4052    while i < bytes.len() {
4053        match bytes[i] {
4054            b'\\' if !has_content => {
4055                // Start-of-statement `\` = psql meta-command line.
4056                // Consume through end-of-line; restart the chunk
4057                // after it so the line never lands in the output.
4058                while i < bytes.len() && bytes[i] != b'\n' {
4059                    i += 1;
4060                }
4061                start = if i < bytes.len() { i + 1 } else { i };
4062            }
4063            b'\'' => {
4064                has_content = true;
4065                // PG escape-string form `E'...'` honours backslash
4066                // escapes (`E'a\';b'` is ONE literal) — detect via
4067                // the immediately-preceding standalone E/e. MySQL
4068                // dialect sessions treat EVERY plain string that way.
4069                let escape_string = mysql_escapes
4070                    || (i >= 1
4071                        && matches!(bytes[i - 1], b'e' | b'E')
4072                        && !(i >= 2
4073                            && (bytes[i - 2].is_ascii_alphanumeric() || bytes[i - 2] == b'_')));
4074                i += 1;
4075                while i < bytes.len() {
4076                    if escape_string && bytes[i] == b'\\' {
4077                        // Skip the escaped byte (covers \' and \\).
4078                        i += 2;
4079                        continue;
4080                    }
4081                    if bytes[i] == b'\'' {
4082                        // `''` is an escaped quote inside the literal.
4083                        if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
4084                            i += 2;
4085                            continue;
4086                        }
4087                        break;
4088                    }
4089                    i += 1;
4090                }
4091            }
4092            b'"' => {
4093                has_content = true;
4094                i += 1;
4095                while i < bytes.len() && bytes[i] != b'"' {
4096                    i += 1;
4097                }
4098            }
4099            b'$' => {
4100                // Possible dollar-quote opener `$tag$` (tag may be
4101                // empty). If the shape doesn't match, it's a plain
4102                // `$` (positional param) — fall through.
4103                let tag_end = bytes[i + 1..]
4104                    .iter()
4105                    .position(|&b| !(b.is_ascii_alphanumeric() || b == b'_'))
4106                    .map(|off| i + 1 + off);
4107                if let Some(te) = tag_end
4108                    && te < bytes.len()
4109                    && bytes[te] == b'$'
4110                {
4111                    has_content = true;
4112                    let tag = &sql[i..=te];
4113                    // Find the closing `$tag$`.
4114                    if let Some(close) = sql[te + 1..].find(tag) {
4115                        i = te + 1 + close + tag.len();
4116                        continue;
4117                    }
4118                    // Unterminated — consume the rest; the parser
4119                    // will report it.
4120                    i = bytes.len();
4121                    continue;
4122                }
4123                has_content = true;
4124            }
4125            b'-' if i + 1 < bytes.len() && bytes[i + 1] == b'-' => {
4126                while i < bytes.len() && bytes[i] != b'\n' {
4127                    i += 1;
4128                }
4129            }
4130            b'/' if i + 1 < bytes.len() && bytes[i + 1] == b'*' => {
4131                // v7.22 (round-13 T3) — MySQL conditional comments
4132                // `/*!40101 … */` are EXECUTABLE (mysqldump wraps
4133                // its whole preamble + DISABLE KEYS hints in them);
4134                // they must stay statement content for the engine,
4135                // not be skipped as commentary.
4136                if i + 2 < bytes.len() && bytes[i + 2] == b'!' {
4137                    has_content = true;
4138                }
4139                let mut depth = 1usize;
4140                i += 2;
4141                while i < bytes.len() && depth > 0 {
4142                    if bytes[i] == b'/' && i + 1 < bytes.len() && bytes[i + 1] == b'*' {
4143                        depth += 1;
4144                        i += 2;
4145                    } else if bytes[i] == b'*' && i + 1 < bytes.len() && bytes[i + 1] == b'/' {
4146                        depth -= 1;
4147                        i += 2;
4148                    } else {
4149                        i += 1;
4150                    }
4151                }
4152                continue;
4153            }
4154            b';' => {
4155                if has_content {
4156                    let head = &sql[start..i];
4157                    // v7.22 (round-13 T2) — a `COPY … FROM stdin;`
4158                    // statement owns its following data block
4159                    // through the `\.` terminator line (data lines
4160                    // may contain `;`, so generic splitting would
4161                    // shred them). Swallow head + data into ONE
4162                    // chunk; `execute_script` lowers it to INSERTs.
4163                    // pg_dump prefixes the COPY with a comment
4164                    // banner — strip it before the head check.
4165                    let head_clean = strip_leading_sql_noise(head);
4166                    let is_copy_head = head_clean
4167                        .get(..4)
4168                        .is_some_and(|p| p.eq_ignore_ascii_case("copy"))
4169                        && spg_engine::copy::parse_copy_from_stdin_head(head_clean).is_some();
4170                    if is_copy_head {
4171                        // Scan whole lines after the ';' until the
4172                        // `\.` terminator (or EOF — torn dumps lose
4173                        // their tail, same as psql would error).
4174                        let mut j = i + 1;
4175                        let data_end;
4176                        loop {
4177                            if j >= bytes.len() {
4178                                data_end = bytes.len();
4179                                break;
4180                            }
4181                            let line_end = sql[j..].find('\n').map_or(bytes.len(), |off| j + off);
4182                            if sql[j..line_end].trim_end_matches('\r').trim() == "\\." {
4183                                data_end = j;
4184                                i = line_end; // bottom i += 1 skips \n
4185                                break;
4186                            }
4187                            j = line_end + 1;
4188                        }
4189                        stmts.push(&sql[start..data_end]);
4190                        if data_end == bytes.len() {
4191                            i = bytes.len();
4192                        }
4193                        start = i + 1;
4194                        has_content = false;
4195                        i += 1;
4196                        continue;
4197                    }
4198                    note_dialect_signals(head, &mut mysql_escapes);
4199                    stmts.push(head);
4200                }
4201                start = i + 1;
4202                has_content = false;
4203            }
4204            b => {
4205                if !b.is_ascii_whitespace() {
4206                    has_content = true;
4207                }
4208            }
4209        }
4210        i += 1;
4211    }
4212    if has_content {
4213        stmts.push(&sql[start..]);
4214    }
4215    stmts
4216}
4217
#[cfg(test)]
mod tests {
    use super::*;

    /// A two-statement script is cut at the `;`, while chunks that hold
    /// only whitespace or comments yield no statement.
    #[test]
    fn split_statements_basic_and_trailing() {
        let pieces = split_statements("CREATE TABLE a (x INT); INSERT INTO a VALUES (1)");
        assert_eq!(
            pieces,
            vec!["CREATE TABLE a (x INT)", " INSERT INTO a VALUES (1)"]
        );
        // whitespace/comment-only chunks drop
        let leftovers = split_statements("  ;; -- nothing\n;");
        assert!(leftovers.is_empty());
    }

    /// A `;` that sits inside any quoting form must not act as a
    /// statement separator.
    #[test]
    fn split_statements_quoting_forms() {
        // Covered forms: plain literal, doubled quote, E-string backslash
        // escape, quoted identifier, tagged and untagged dollar-quoting.
        let cases = [
            "INSERT INTO t VALUES ('a;b')",
            "INSERT INTO t VALUES ('it''s; fine')",
            r"INSERT INTO t VALUES (E'it\'s; fine')",
            "CREATE TABLE \"odd;name\" (x INT)",
            "DO $body$ BEGIN PERFORM 1; END $body$",
            "DO $$ SELECT 1; $$",
        ];
        // First pass: each case on its own comes back as one statement.
        for &sql in cases.iter() {
            let got = split_statements(sql);
            assert_eq!(got, vec![sql], "must stay whole: {sql}");
        }
        // Second pass: a real `;` after the case still separates it from
        // the statement that follows.
        for &sql in cases.iter() {
            let script = format!("{sql};\nSELECT 2");
            let expected = vec![sql, "\nSELECT 2"];
            assert_eq!(
                split_statements(&script),
                expected,
                "must split after: {sql}"
            );
        }
    }

    /// Lines that begin with a backslash meta-command are removed before
    /// statements are returned.
    #[test]
    fn split_statements_drops_psql_meta_lines() {
        // v7.22 round-13 gap 1 — PG 18 pg_dump brackets its scripts with
        // `\restrict` / `\unrestrict`; for psql parity those lines must
        // never reach the parser.
        let script = "\\restrict TOKEN123\nSELECT 1;\n\\unrestrict TOKEN123\nSELECT 2;\n\\.\n";
        let kept = split_statements(script);
        assert_eq!(kept, vec!["SELECT 1", "SELECT 2"]);
        // A backslash in the middle of a statement is NOT a meta-command.
        let s2 = r"SELECT E'a\\b'";
        let untouched = split_statements(s2);
        assert_eq!(untouched, vec![s2]);
    }

    /// Semicolons inside `--` line comments and nested `/* */` block
    /// comments do not split the script.
    #[test]
    fn split_statements_comments_hide_semicolons() {
        let got = split_statements(
            "-- c1 ; still comment\nSELECT 1; /* a ; b /* nested ; */ */ SELECT 2",
        );
        assert_eq!(got.len(), 2);
        let (first, second) = (got[0], got[1]);
        assert!(first.contains("SELECT 1"));
        assert!(second.contains("SELECT 2"));
    }

    /// End-to-end smoke test on an in-memory database: create a table,
    /// insert two rows, read one of them back.
    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        let setup = [
            "CREATE TABLE t (id INT NOT NULL, name TEXT)",
            "INSERT INTO t VALUES (1, 'alice')",
            "INSERT INTO t VALUES (2, 'bob')",
        ];
        for stmt in setup {
            db.execute(stmt).unwrap();
        }
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        let other = &rows[0][0];
        assert!(
            matches!(other, Value::Int(1)),
            "expected Int(1), got {other:?}"
        );
    }

    /// `query()` must reject a statement that is not a SELECT.
    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        let outcome = db.query("INSERT INTO t VALUES (1)");
        assert!(outcome.is_err(), "query() on INSERT must error");
    }

    /// A snapshot taken from one database restores into another that
    /// returns the same row.
    #[test]
    fn snapshot_roundtrip() {
        let mut source = Database::open_in_memory();
        for stmt in [
            "CREATE TABLE t (id INT NOT NULL)",
            "INSERT INTO t VALUES (42)",
        ] {
            source.execute(stmt).unwrap();
        }
        let bytes = source.snapshot();
        let mut restored = Database::restore(&bytes).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        let other = &rows[0][0];
        assert!(
            matches!(other, Value::Int(42)),
            "expected Int(42), got {other:?}"
        );
    }

    /// Shape check: a caller-defined type can implement `FromSpgRow`
    /// and be built from a slice of values.
    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                // Only a leading integer column is accepted as the id.
                if let Some(Value::Int(n)) = row.first() {
                    Ok(Self { _id: *n })
                } else {
                    Err(EngineError::Unsupported("bad id".into()))
                }
            }
        }
        let row = [Value::Int(7)];
        let _u = User::from_spg_row(&row).unwrap();
    }
}