Skip to main content

spg_embedded/
lib.rs

1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//!     println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//!   crash-consistent WAL + periodic checkpoint snapshot. The
32//!   on-disk format is byte-identical to what `spg-server`
33//!   produces, so a database can move between modes without
34//!   conversion.
35//! - **Durability**: every `execute()` that mutates calls
36//!   `fsync` before returning `Ok`. There is no group commit
37//!   in embedded mode — every commit pays one fsync. If you
38//!   need batch throughput, wrap multiple statements in
39//!   [`Database::with_transaction`] which fsyncs only at
40//!   commit.
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//!   Share across threads via `Arc<Mutex<Database>>`. The
43//!   single-writer model is intentional — see
44//!   [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//!   moves cold rows to disk-resident segments while you keep
47//!   serving requests. It runs in a dedicated thread; drop the
48//!   returned [`FreezerHandle`] (or call `stop()`) for clean
49//!   shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//!   [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//!   them with a wildcard arm so future v7.x releases can add
53//!   variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//!   Malformed SQL, type mismatches, missing tables — all
59//!   return `Err(EngineError::…)`. If you observe a panic on
60//!   a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//!   violations (e.g., catalog snapshot magic mismatch, WAL
63//!   record CRC sentinel corruption that survived the boot-
64//!   time validation). These represent silent disk corruption
65//!   and an unwind would leak inconsistent state, so the
66//!   release profile uses `panic = abort` — your host process
67//!   dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//!   `--profile release-dbg` (keeps unwind tables) and use
70//!   `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
/// v7.16.0 — handle for a parsed-and-planned SQL statement.
/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
/// with a `&[Value]` slice carrying the bind parameters (PG-style
/// `$1`, `$2`, … positional). `Clone` is derived, so copying a handle
/// clones its `ParsedStatement` and its SQL `String`; each bind call
/// additionally works on a clone of the AST made by the engine's
/// executor.
///
/// The handle holds a snapshot of the AST at prepare time. If
/// the engine's plan cache evicts the entry between prepare and
/// execute (e.g. ANALYZE bumps the statistics version) the
/// stored AST keeps working — `execute_prepared` operates on
/// the handle's clone, not the cache entry.
// NOTE(review): earlier docs called this "cheap to Clone" with the AST
// "shared by handle copies"; that only holds if `ParsedStatement` is
// internally reference-counted — confirm against spg-engine.
#[derive(Debug, Clone)]
pub struct Statement {
    /// The parsed + planned AST. `spg-engine::prepare_cached`
    /// returns it as a clone of the cached plan, so any rewrite
    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
    /// already run.
    pub(crate) stmt: ParsedStatement,
    /// Original SQL source, kept for diagnostics only (returned by
    /// `Statement::sql`). WAL persistence renders from the AST so a
    /// bind-time rewrite of `$1..$N` survives replay.
    pub(crate) sql: String,
}
106
107impl Statement {
108    /// Borrow the original SQL source — useful for tracing and
109    /// debug logs. WAL replay does NOT use this; it serialises
110    /// the bind-final AST instead.
111    #[must_use]
112    pub fn sql(&self) -> &str {
113        &self.sql
114    }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127    let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::{Seek, SeekFrom, Write};
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
/// v7.11.3 — wall-clock provider injected into every embedded `Engine`.
///
/// Returns microseconds since the Unix epoch. A clock set before the
/// epoch yields `0`. A clock so far in the future that the microsecond
/// count overflows `i64` saturates at `i64::MAX`. SQL's `NOW()` /
/// `CURRENT_TIMESTAMP` / `CURRENT_DATE` rewrite layer uses it, so
/// PG-idiomatic time queries work without the caller wiring a clock.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// These stay private so callers cannot mis-frame records. The v3
// layout matches the server's, so a `spg-server` boot can read a
// database written by an embedded process, and vice versa.

/// Bit 31 of a record's length word. When set, the record uses v2+
/// framing (8-byte header instead of v1's 4-byte header).
const WAL_V2_SENTINEL: u32 = 1 << 31;
/// Bit 30 of the length word. Set together with bit 31 for the v3
/// envelope, which appends a one-byte record type after the CRC.
const WAL_V3_FLAG: u32 = 1 << 30;
/// v3 record type: auto-commit SQL.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v7.18 — durability checkpoint marker. Keeps type 0x02 and is
/// skipped on replay.
const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v7.18 PITR — auto-commit SQL record with `(commit_lsn,
/// commit_unix_us)` fields appended after the type byte, so replay can
/// target a point in time. The envelope flag bits are unchanged and
/// v3 (type 0x01) records keep working; the type byte alone
/// distinguishes the schema version.
const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
/// v7.18 — "no wall clock" sentinel stored in a v4 record's
/// `commit_unix_us` slot. Restore-to-timestamp skips records carrying
/// it (no time anchor); LSN-based restore is unaffected.
const WAL_V4_NO_CLOCK: i64 = i64::MIN;
/// v7.18 — extra header bytes following the type byte in a v4 record:
/// `commit_lsn` (u64 LE) then `commit_unix_us` (i64 LE).
const WAL_V4_EXTRA_HEADER: usize = std::mem::size_of::<u64>() + std::mem::size_of::<i64>();
/// v7.18 PITR — checkpoint anchor record. It is written to the WAL
/// *before* the snapshot file replaces the on-disk catalog. It carries
/// `(lsn, ts, snapshot_path)` so restore tooling can locate the
/// matching base snapshot without scanning the filesystem. Replay skips
/// it, like the v3 durability marker.
const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;
182
/// v7.1 — auto-checkpoint threshold in bytes.
///
/// Once the WAL grows past this size, the next successful `execute()`
/// ends with a `checkpoint()`, which keeps the WAL bounded. Operators can
/// override it with the `SPG_EMBEDDED_CHECKPOINT_BYTES` environment
/// variable. A missing, non-numeric or zero value falls back to 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;
    match std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(bytes) if bytes > 0 => bytes,
            _ => FALLBACK_BYTES,
        },
        Err(_) => FALLBACK_BYTES,
    }
}
194
195/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
196///
197/// ```text
198/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
199/// [u32 LE crc32 over (type_byte || sql_bytes)]
200/// [u8 type = 0x01]
201/// [sql bytes]
202/// ```
203fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
204    let payload = sql.as_bytes();
205    let mut crc_buf = Vec::with_capacity(1 + payload.len());
206    crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
207    crc_buf.extend_from_slice(payload);
208    let crc = spg_crypto::crc32::crc32(&crc_buf);
209    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
210    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
211    out.extend_from_slice(&header);
212    out.extend_from_slice(&crc.to_le_bytes());
213    out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
214    out.extend_from_slice(payload);
215    out
216}
217
218/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
219///
220/// ```text
221/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
222/// [u32 LE crc32 over (type_byte || payload)]
223/// [u8  type = 0x11]
224/// payload:
225///   [u64 LE checkpoint_lsn]
226///   [i64 LE checkpoint_unix_us  (WAL_V4_NO_CLOCK if no clock)]
227///   [u16 LE snapshot_path_len]
228///   [snapshot_path_bytes]
229/// ```
230///
231/// `payload_len` covers only the payload — keeping the framing
232/// uniform across v3 / v4 record types so torn-write detection in
233/// `replay_wal_into_engine` stays trivial.
234fn encode_v4_checkpoint_marker(
235    checkpoint_lsn: u64,
236    checkpoint_unix_us: i64,
237    snapshot_path: &Path,
238) -> Vec<u8> {
239    let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
240    let snap_payload = snapshot_bytes.as_bytes();
241    let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
242    let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
243    payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
244    payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
245    payload.extend_from_slice(&snap_len_u16.to_le_bytes());
246    payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
247    let mut crc_buf = Vec::with_capacity(1 + payload.len());
248    crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
249    crc_buf.extend_from_slice(&payload);
250    let crc = spg_crypto::crc32::crc32(&crc_buf);
251    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
252    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
253    out.extend_from_slice(&header);
254    out.extend_from_slice(&crc.to_le_bytes());
255    out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
256    out.extend_from_slice(&payload);
257    out
258}
259
260/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
261///
262/// ```text
263/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
264/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
265/// [u8  type = 0x10]
266/// [u64 LE commit_lsn]
267/// [i64 LE commit_unix_us  (= WAL_V4_NO_CLOCK when no ClockFn)]
268/// [sql bytes]
269/// ```
270///
271/// `sql_len` field stays the SQL byte count — same shape as v3 — so
272/// replay-buffer torn-write detection compares against
273/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
274/// readable by the same loop with their original 9-byte header
275/// arithmetic.
276fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
277    let payload = sql.as_bytes();
278    let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
279    crc_buf.push(WAL_V4_TYPE_AUTO_COMMIT_SQL);
280    crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
281    crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
282    crc_buf.extend_from_slice(payload);
283    let crc = spg_crypto::crc32::crc32(&crc_buf);
284    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
285    let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
286    out.extend_from_slice(&header);
287    out.extend_from_slice(&crc.to_le_bytes());
288    out.push(WAL_V4_TYPE_AUTO_COMMIT_SQL);
289    out.extend_from_slice(&commit_lsn.to_le_bytes());
290    out.extend_from_slice(&commit_unix_us.to_le_bytes());
291    out.extend_from_slice(payload);
292    out
293}
294
295/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
296/// Returns the count of records successfully applied. A truncated
297/// trailing record (mid-write torn) is dropped silently — the
298/// same recovery story `spg-server`'s boot path uses.
299fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
300    let mut applied = 0usize;
301    let mut cur = 0usize;
302    while cur < wal_bytes.len() {
303        if wal_bytes.len() - cur < 4 {
304            // Trailing partial header — torn write, drop and stop.
305            break;
306        }
307        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
308        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
309        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
310        let len_mask = if is_v3 {
311            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
312        } else {
313            !WAL_V2_SENTINEL
314        };
315        let rec_len = (raw_len & len_mask) as usize;
316        let header_len = if is_v3 {
317            9
318        } else if is_v2 {
319            8
320        } else {
321            4
322        };
323        if wal_bytes.len() - cur < header_len + rec_len {
324            // Torn record at the tail — drop, stop.
325            break;
326        }
327        if is_v3 {
328            let type_byte = wal_bytes[cur + 8];
329            match type_byte {
330                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
331                WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
332                    // durability_checkpoint marker — skip, no SQL.
333                    cur += header_len + rec_len;
334                    continue;
335                }
336                WAL_V4_TYPE_CHECKPOINT_MARKER => {
337                    // v7.18 PITR — checkpoint anchor, skip on replay
338                    // (engine state past this point reflects the
339                    // matching snapshot already loaded by the caller).
340                    cur += header_len + rec_len;
341                    continue;
342                }
343                WAL_V4_TYPE_AUTO_COMMIT_SQL => {
344                    // v7.18 PITR — v4 record carries 16 bytes of
345                    // (commit_lsn, commit_unix_us) between the type
346                    // byte and the SQL payload. Replay reads them but
347                    // does not enforce them — the engine doesn't
348                    // surface LSN/clock here. Restore tooling
349                    // (spgctl) parses them via parse_wal_record below.
350                    let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
351                    if wal_bytes.len() - cur < v4_total {
352                        // Torn v4 record at the tail — drop, stop.
353                        break;
354                    }
355                    let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
356                    let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
357                    let sql = std::str::from_utf8(sql_bytes).map_err(|e| {
358                        format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}")
359                    })?;
360                    engine.execute(sql).map_err(|e| {
361                        format!(
362                            "WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"
363                        )
364                    })?;
365                    applied += 1;
366                    cur += v4_total;
367                    continue;
368                }
369                other => {
370                    return Err(format!(
371                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
372                    ));
373                }
374            }
375        }
376        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
377        let sql = std::str::from_utf8(sql_bytes)
378            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
379        engine
380            .execute(sql)
381            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
382        applied += 1;
383        cur += header_len + rec_len;
384    }
385    Ok(applied)
386}
387
/// v7.18 PITR — parsed WAL record, surfaced for restore / verify
/// tooling. `replay_wal_into_engine` doesn't expose LSN/timestamp, but
/// `spgctl restore --to <timestamp>` and `spgctl verify` need them.
/// All borrowed slices point into the WAL buffer that was handed to
/// [`parse_wal_records`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalRecord<'a> {
    /// Byte offset in the WAL buffer where this record (its length
    /// word) starts.
    pub offset: usize,
    /// Record type byte:
    /// - `0x01` — v3 auto-commit SQL. Also reported for legacy v1/v2
    ///   records, which carry no type byte of their own.
    /// - `0x02` — durability checkpoint marker.
    /// - `0x10` — v4 auto-commit SQL with LSN / timestamp.
    /// - `0x11` — v4 checkpoint marker (base-snapshot anchor).
    pub type_byte: u8,
    /// `Some(lsn)` for v4 records (`0x10`, `0x11`), `None` otherwise.
    pub commit_lsn: Option<u64>,
    /// `Some(unix_us)` for v4 records carrying a clock-set timestamp.
    /// `None` for v1–v3 records, and for v4 records written with the
    /// "no clock" sentinel (`i64::MIN`, i.e. no ClockFn at commit time).
    pub commit_unix_us: Option<i64>,
    /// Record payload, borrowed from the WAL buffer:
    /// - auto-commit records (`0x01`, `0x10`): the SQL text;
    /// - checkpoint markers (`0x11`): the snapshot path bytes. The
    ///   embedded writer stores these lossy-UTF-8 and capped at
    ///   `u16::MAX` bytes.
    /// - durability markers (`0x02`): empty.
    pub sql: &'a [u8],
}
408
409/// v7.18 PITR — iterate over `wal_bytes` yielding one `WalRecord`
410/// per intact record. Torn-tail records terminate iteration
411/// silently (same recovery story as `replay_wal_into_engine`).
412/// Unknown type bytes inside a v3 envelope return `Err` so the
413/// caller knows the WAL was written by a newer SPG.
414pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
415    let mut out = Vec::new();
416    let mut cur = 0usize;
417    while cur < wal_bytes.len() {
418        if wal_bytes.len() - cur < 4 {
419            break;
420        }
421        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
422        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
423        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
424        let len_mask = if is_v3 {
425            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
426        } else {
427            !WAL_V2_SENTINEL
428        };
429        let rec_len = (raw_len & len_mask) as usize;
430        let header_len = if is_v3 {
431            9
432        } else if is_v2 {
433            8
434        } else {
435            4
436        };
437        if wal_bytes.len() - cur < header_len + rec_len {
438            break;
439        }
440        if !is_v3 {
441            // v1 / v2 records carry no type byte; treat as legacy
442            // auto-commit SQL with no LSN/time.
443            let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
444            out.push(WalRecord {
445                offset: cur,
446                type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
447                commit_lsn: None,
448                commit_unix_us: None,
449                sql,
450            });
451            cur += header_len + rec_len;
452            continue;
453        }
454        let type_byte = wal_bytes[cur + 8];
455        match type_byte {
456            WAL_V3_TYPE_AUTO_COMMIT_SQL => {
457                let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
458                out.push(WalRecord {
459                    offset: cur,
460                    type_byte,
461                    commit_lsn: None,
462                    commit_unix_us: None,
463                    sql,
464                });
465                cur += header_len + rec_len;
466            }
467            WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
468                out.push(WalRecord {
469                    offset: cur,
470                    type_byte,
471                    commit_lsn: None,
472                    commit_unix_us: None,
473                    sql: &[],
474                });
475                cur += header_len + rec_len;
476            }
477            WAL_V4_TYPE_CHECKPOINT_MARKER => {
478                // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
479                // We surface lsn + ts on the WalRecord; the path lives
480                // in `sql` since the type byte already disambiguates
481                // record meaning and adding a dedicated field would
482                // bloat the iterator return type for every variant.
483                if rec_len < 18 {
484                    return Err(format!(
485                        "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
486                    ));
487                }
488                let lsn = u64::from_le_bytes(
489                    wal_bytes[cur + header_len..cur + header_len + 8]
490                        .try_into()
491                        .unwrap(),
492                );
493                let ts_raw = i64::from_le_bytes(
494                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
495                        .try_into()
496                        .unwrap(),
497                );
498                let path_len = u16::from_le_bytes(
499                    wal_bytes[cur + header_len + 16..cur + header_len + 18]
500                        .try_into()
501                        .unwrap(),
502                ) as usize;
503                if rec_len < 18 + path_len {
504                    return Err(format!(
505                        "WAL parse: checkpoint marker at offset {cur} truncated path"
506                    ));
507                }
508                let path_start = cur + header_len + 18;
509                let path_bytes = &wal_bytes[path_start..path_start + path_len];
510                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
511                    None
512                } else {
513                    Some(ts_raw)
514                };
515                out.push(WalRecord {
516                    offset: cur,
517                    type_byte,
518                    commit_lsn: Some(lsn),
519                    commit_unix_us,
520                    sql: path_bytes,
521                });
522                cur += header_len + rec_len;
523            }
524            WAL_V4_TYPE_AUTO_COMMIT_SQL => {
525                let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
526                if wal_bytes.len() - cur < v4_total {
527                    break;
528                }
529                let lsn = u64::from_le_bytes(
530                    wal_bytes[cur + header_len..cur + header_len + 8]
531                        .try_into()
532                        .unwrap(),
533                );
534                let ts_raw = i64::from_le_bytes(
535                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
536                        .try_into()
537                        .unwrap(),
538                );
539                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
540                    None
541                } else {
542                    Some(ts_raw)
543                };
544                let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
545                let sql = &wal_bytes[sql_start..sql_start + rec_len];
546                out.push(WalRecord {
547                    offset: cur,
548                    type_byte,
549                    commit_lsn: Some(lsn),
550                    commit_unix_us,
551                    sql,
552                });
553                cur += v4_total;
554            }
555            other => {
556                return Err(format!(
557                    "WAL parse: unknown type byte {other:#04x} at offset {cur}"
558                ));
559            }
560        }
561    }
562    Ok(out)
563}
564
/// v7.1 — predicate for "can the next `execute()` skip the WAL?"
///
/// Returns `true` only for statements whose leading keyword marks them
/// as non-mutating:
/// - SELECT / SHOW / EXPLAIN / BEGIN / COMMIT / ROLLBACK / WITH;
/// - the SPG-specific verbs that don't go through the auto-commit
///   record path on the server (CHECKPOINT, COMPACT, WAIT).
///
/// Conservative: anything not positively identified as read-only falls
/// through to "write a WAL record". That includes the empty string and
/// statements that start with a comment.
///
/// `WITH …` and `EXPLAIN …` can wrap a data-modifying statement, e.g.
/// `WITH d AS (DELETE …) SELECT …` or `EXPLAIN ANALYZE INSERT …`. For
/// those two forms the whole statement is also scanned for a mutating
/// keyword, and any hit means the statement is logged — even a hit
/// inside a string literal. Over-logging only means replay re-executes
/// a genuinely read-only statement. Under-logging a write would silently
/// lose it on restart.
fn sql_is_read_only(sql: &str) -> bool {
    const MUTATING_KEYWORDS: &[&str] = &[
        "insert", "update", "delete", "merge", "truncate", "create", "drop", "alter",
    ];
    let t = sql.trim_start();
    let head = t
        .split(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .next()
        .unwrap_or("")
        .to_ascii_lowercase();
    match head.as_str() {
        "select" | "show" | "begin" | "commit" | "rollback" | "checkpoint" | "compact"
        | "wait" => true,
        // Wrapper forms: read-only only if no mutating keyword appears
        // as a whole word anywhere in the statement.
        "with" | "explain" => !t
            .split(|c: char| !(c.is_alphanumeric() || c == '_'))
            .any(|word| MUTATING_KEYWORDS.iter().any(|kw| word.eq_ignore_ascii_case(kw))),
        _ => false,
    }
}
591
592/// Embedded SPG database handle. Owns an `Engine` + provides
593/// ergonomic wrappers around `execute` and `query`. Drops the
594/// engine on `Drop` — no WAL flush / fsync, because v6.10.3
595/// is in-memory only.
596#[derive(Debug)]
597pub struct Database {
598    engine: Engine,
599    /// v7.1 — persistence sidecar. When `Some(p)`, every
600    /// `execute(sql)` that mutates state appends a v4
601    /// `auto_commit_sql` WAL record + fsyncs before the call
602    /// returns; `Drop` writes a final catalog snapshot to
603    /// `<db_path>` so the next session boots from a clean
604    /// snapshot + an empty WAL. `None` = in-memory only (the
605    /// v6.10.3 shape).
606    persistence: Option<PersistenceCtx>,
607    /// v7.18 PITR — monotonic per-database commit LSN. Increments
608    /// before each successful WAL append; bootstrapped at
609    /// open_path from `max(parse_wal_records → commit_lsn)` so
610    /// reopen never reuses an LSN. In-memory databases start at
611    /// 0 and never advance (no WAL = no LSN-meaningful records).
612    commit_lsn: AtomicU64,
613}
614
/// v7.1 — on-disk state owned by a [`Database`] opened via
/// `Database::open_path`. In-memory handles carry `None` instead.
#[derive(Debug)]
#[allow(dead_code)] // `wal_path` is read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Catalog snapshot file path given to `Database::open_path`.
    db_path: PathBuf,
    /// WAL sidecar path: `db_path` with ".wal" appended to the file name.
    wal_path: PathBuf,
    /// Open WAL file handle; presumably the target of each mutating
    /// `execute()` append (construction not visible here — confirm).
    wal: File,
    /// Cached WAL file length so each `execute()` doesn't have
    /// to stat. Refreshed on append + on `checkpoint()` (which
    /// truncates back to 0).
    wal_len: u64,
    /// WAL size in bytes past which an auto-checkpoint is triggered.
    /// NOTE(review): likely initialised from
    /// `default_checkpoint_threshold_bytes()` — confirm in `open_path`.
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — `<db_path>.spg/segments/` directory. Cold-tier
    /// segments produced by `freeze_oldest_to_cold` / compaction
    /// are persisted here as `seg_<id>.spg` files; the manifest
    /// at `<db_path>.spg/manifest.v10` records every active
    /// segment + its CRC32 so the next boot can verify + reload.
    cold_segments_dir: PathBuf,
    /// Cold segment id → on-disk segment file path, as reloaded from
    /// the manifest at boot.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
    /// via `fs::create_dir` on `<db_path>.lock` at open_path
    /// entry; released on Drop by `fs::remove_dir`. atomic on
    /// every supported platform. A second process opening the
    /// same path while the first is still alive hits the
    /// create_dir failure and returns
    /// `EngineError::Unsupported("database is locked by another
    /// process: …")`. Stale locks (process crashed mid-session)
    /// must be cleared via `Database::force_unlock(path)` —
    /// SPG can't safely fingerprint who owned a stale directory
    /// without a libc dep, which would violate spg-embedded's
    /// zero-deps charter.
    lock_path: PathBuf,
}
647
648impl Database {
649    /// Open a fresh in-memory database. No WAL, no catalog
650    /// snapshot on disk — perfect for tests + short-lived
651    /// CLI tools.
652    #[must_use]
653    pub fn open_in_memory() -> Self {
654        Self {
655            engine: Engine::new().with_clock(wall_clock_micros),
656            persistence: None,
657            commit_lsn: AtomicU64::new(0),
658        }
659    }
660
661    /// v7.1 — Open or create a persistent database backed by
662    /// the file at `db_path`. The WAL lives at `db_path` +
663    /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
664    /// path:
665    ///
666    /// 1. If `db_path` exists, restore the catalog snapshot.
667    /// 2. If the WAL exists, replay every record into the
668    ///    restored engine — the same recovery story
669    ///    `spg-server` uses.
670    /// 3. Open the WAL in append+sync mode so subsequent
671    ///    `execute()` writes durably commit (one fsync per
672    ///    mutation).
673    ///
674    /// `Drop` writes a final catalog snapshot + truncates the
675    /// WAL — operators that need a sync barrier at a specific
676    /// point use `checkpoint()` explicitly.
677    pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
678        let db_path = db_path.as_ref().to_path_buf();
679        let wal_path = {
680            let mut p = db_path.clone();
681            let name = p
682                .file_name()
683                .map(|n| {
684                    let mut s = n.to_os_string();
685                    s.push(".wal");
686                    s
687                })
688                .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
689            p.set_file_name(name);
690            p
691        };
692        if let Some(parent) = db_path.parent()
693            && !parent.as_os_str().is_empty()
694        {
695            std::fs::create_dir_all(parent).map_err(io_err)?;
696        }
697        // v7.17.0 Phase 6.2 — acquire cross-process exclusion
698        // lock before touching any catalog / WAL bytes. atomic
699        // mkdir on every supported platform; a second process
700        // opening the same path while the first is still alive
701        // hits the create_dir failure and gets a clear error.
702        let lock_path = {
703            let mut p = db_path.clone();
704            let name = p
705                .file_name()
706                .map(|n| {
707                    let mut s = n.to_os_string();
708                    s.push(".lock");
709                    s
710                })
711                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
712            p.set_file_name(name);
713            p
714        };
715        std::fs::create_dir(&lock_path).map_err(|e| {
716            if e.kind() == std::io::ErrorKind::AlreadyExists {
717                EngineError::Unsupported(format!(
718                    "database is locked by another process (or stale lock): {}; \
719                     remove the directory manually after confirming no other \
720                     process holds it, or call Database::force_unlock()",
721                    lock_path.display()
722                ))
723            } else {
724                io_err(e)
725            }
726        })?;
727        let mut engine = if db_path.exists() {
728            let bytes = std::fs::read(&db_path).map_err(io_err)?;
729            let engine = Engine::restore_envelope(&bytes).map_err(|e| {
730                EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
731                    "restore from {}: {e}",
732                    db_path.display()
733                )))
734            })?;
735            engine.with_clock(wall_clock_micros)
736        } else {
737            Engine::new().with_clock(wall_clock_micros)
738        };
739        // v7.1.4 — manifest-driven cold-segment reload. The
740        // manifest sidecar pairs the catalog snapshot CRC with a
741        // list of `(segment_id, path, crc32)` triples; verify
742        // before loading so a torn or stale manifest doesn't
743        // surface phantom data.
744        let cold_segments_dir = {
745            let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
746            let stem = db_path
747                .file_stem()
748                .unwrap_or_else(|| std::ffi::OsStr::new("db"))
749                .to_string_lossy()
750                .into_owned();
751            parent.join(format!("{stem}.spg")).join("segments")
752        };
753        let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
754        let manifest_pth = spg_manifest_path(&db_path);
755        if manifest_pth.exists() && db_path.exists() {
756            let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
757            if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
758                let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
759                let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
760                if snap_crc == m.catalog_crc32 {
761                    for entry in &m.cold_segments {
762                        if let Ok(seg_bytes) = std::fs::read(&entry.path) {
763                            let computed = spg_crypto::crc32::crc32(&seg_bytes);
764                            if computed != entry.crc32 {
765                                eprintln!(
766                                    "spg-embedded: manifest skip segment {}: CRC mismatch",
767                                    entry.segment_id
768                                );
769                                continue;
770                            }
771                            if engine.catalog().cold_segment(entry.segment_id).is_some() {
772                                // Already loaded via Catalog::clone path (shouldn't happen
773                                // since Engine::new + restore_envelope don't populate cold).
774                                continue;
775                            }
776                            let mut new_cat = engine.catalog().clone();
777                            if let Err(e) =
778                                new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
779                            {
780                                eprintln!(
781                                    "spg-embedded: manifest load segment {} failed: {e}",
782                                    entry.segment_id
783                                );
784                                continue;
785                            }
786                            engine.replace_catalog(new_cat);
787                            cold_segment_paths.insert(entry.segment_id, entry.path.clone());
788                        } else {
789                            eprintln!(
790                                "spg-embedded: manifest skip segment {}: file unreadable",
791                                entry.segment_id
792                            );
793                        }
794                    }
795                }
796            }
797        }
798        let mut initial_lsn: u64 = 0;
799        if wal_path.exists() {
800            let wal_bytes = std::fs::read(&wal_path).map_err(io_err)?;
801            if !wal_bytes.is_empty() {
802                replay_wal_into_engine(&wal_bytes, &mut engine)
803                    .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
804                // v7.18 PITR — recover the commit-LSN watermark so
805                // the new session does not re-issue an LSN that
806                // already lives in the WAL. parse_wal_records yields
807                // None for v3 records (they predate the LSN field);
808                // an empty / v3-only WAL leaves the counter at 0.
809                if let Ok(records) = parse_wal_records(&wal_bytes) {
810                    if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
811                        initial_lsn = max;
812                    }
813                }
814            }
815        }
816        let wal = OpenOptions::new()
817            .create(true)
818            .append(true)
819            .read(true)
820            .open(&wal_path)
821            .map_err(io_err)?;
822        let wal_len = wal.metadata().map_err(io_err)?.len();
823        Ok(Self {
824            engine,
825            commit_lsn: AtomicU64::new(initial_lsn),
826            persistence: Some(PersistenceCtx {
827                db_path,
828                wal_path,
829                wal,
830                wal_len,
831                checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
832                cold_segments_dir,
833                cold_segment_paths,
834                lock_path,
835            }),
836        })
837    }
838
839    /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
840    /// hot tier into a brand-new cold-tier segment + persist
841    /// it to disk. Same semantics as `spg-server`'s freezer
842    /// thread; embedded just runs the freeze synchronously on
843    /// the caller's thread. Persistence + manifest update
844    /// happen as part of the next `checkpoint()` (or on Drop).
845    pub fn freeze_oldest_to_cold(
846        &mut self,
847        table_name: &str,
848        index_name: &str,
849        max_rows: usize,
850    ) -> Result<spg_storage::FreezeReport, EngineError> {
851        let report = self
852            .engine
853            .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
854        if let Some(p) = &mut self.persistence {
855            std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
856            let final_path = p
857                .cold_segments_dir
858                .join(format!("seg_{}.spg", report.segment_id));
859            let tmp_path = p
860                .cold_segments_dir
861                .join(format!("seg_{}.spg.tmp", report.segment_id));
862            std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
863            std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
864            p.cold_segment_paths.insert(report.segment_id, final_path);
865        }
866        Ok(report)
867    }
868
869    /// v7.1 — override the auto-checkpoint WAL-size ceiling for
870    /// this `Database` instance. Default is
871    /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
872    /// setter wins. No-op when the database is in-memory.
873    pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
874        if let Some(p) = &mut self.persistence {
875            p.checkpoint_threshold_bytes = bytes.max(1);
876        }
877    }
878
879    /// v7.1 — flush a fresh catalog snapshot to `db_path` and
880    /// truncate the WAL. Idempotent; cheap when nothing has
881    /// happened since the last checkpoint. No-op when the
882    /// database is in-memory (no `db_path` configured).
883    ///
884    /// Called automatically when:
885    /// - the WAL grows past
886    ///   `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
887    ///   end of an `execute()`, and
888    /// - `Drop` runs (best-effort; checkpoint failure on drop is
889    ///   logged to stderr).
890    pub fn checkpoint(&mut self) -> Result<(), EngineError> {
891        let snapshot = self.engine.snapshot();
892        let Some(p) = &mut self.persistence else {
893            return Ok(());
894        };
895        // Snapshot first (atomic via tmp+rename), then WAL
896        // truncate. Same order as `spg-server`'s CHECKPOINT —
897        // a crash between the two leaves the WAL holding
898        // already-snapshotted ops, which replay cleanly on the
899        // next boot (idempotent for SPG's standard DDL/DML
900        // mutations).
901        let tmp = {
902            let mut t = p.db_path.clone();
903            let mut name = t
904                .file_name()
905                .map(std::ffi::OsStr::to_os_string)
906                .unwrap_or_default();
907            name.push(".tmp");
908            t.set_file_name(name);
909            t
910        };
911        std::fs::write(&tmp, &snapshot).map_err(io_err)?;
912        std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
913        // v7.1.4 — refresh the manifest so the next boot can
914        // reload cold segments alongside the snapshot. Bytes
915        // come from the freshly-written snapshot file (= the
916        // canonical CRC source).
917        if !p.cold_segment_paths.is_empty() {
918            let snap_crc = spg_crypto::crc32::crc32(&snapshot);
919            let entries: Vec<ColdSegmentEntry> = p
920                .cold_segment_paths
921                .iter()
922                .filter_map(|(&segment_id, path)| {
923                    let bytes = std::fs::read(path).ok()?;
924                    Some(ColdSegmentEntry {
925                        segment_id,
926                        path: path.clone(),
927                        crc32: spg_crypto::crc32::crc32(&bytes),
928                    })
929                })
930                .collect();
931            let manifest = CatalogManifest {
932                catalog_crc32: snap_crc,
933                cold_segments: entries,
934                wal_baseline_offset: 0,
935            };
936            let m_bytes = manifest.serialize();
937            let m_path = spg_manifest_path(&p.db_path);
938            if let Some(dir) = m_path.parent() {
939                std::fs::create_dir_all(dir).map_err(io_err)?;
940            }
941            let m_tmp = {
942                let mut t = m_path.clone();
943                let mut name = t
944                    .file_name()
945                    .map(std::ffi::OsStr::to_os_string)
946                    .unwrap_or_default();
947                name.push(".tmp");
948                t.set_file_name(name);
949                t
950            };
951            std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
952            std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
953        }
954        // v7.18 PITR — append a checkpoint marker BEFORE truncating
955        // so backup tooling that copies the WAL between snapshot
956        // rotations sees the (lsn, ts, snapshot_path) triple that
957        // anchors restore-to-time. The marker rides the WAL's
958        // existing CRC-protected v3 envelope under the new v4 type
959        // byte 0x11; replay dispatch (and `revert_wal_to_seq`)
960        // already skip it. The truncate below then drops both the
961        // SQL records the snapshot superseded AND this marker —
962        // PITR retention work (P4/P6) intercepts the WAL before
963        // this point to archive both pieces together.
964        let marker_lsn = self.commit_lsn.load(Ordering::SeqCst);
965        let marker_ts = wall_clock_micros();
966        let marker = encode_v4_checkpoint_marker(marker_lsn, marker_ts, &p.db_path);
967        p.wal.write_all(&marker).map_err(io_err)?;
968        p.wal.sync_data().map_err(io_err)?;
969        p.wal.set_len(0).map_err(io_err)?;
970        p.wal.seek(SeekFrom::Start(0)).map_err(io_err)?;
971        p.wal.sync_data().map_err(io_err)?;
972        p.wal_len = 0;
973        Ok(())
974    }
975
976    /// Restore a database from a previously-captured catalog
977    /// snapshot. Pairs with `Database::snapshot()` for
978    /// round-tripping in-memory state without going through
979    /// the `spg-server` WAL.
980    pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
981        let engine = Engine::restore_envelope(snapshot).map_err(|e| {
982            EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
983        })?;
984        Ok(Self {
985            engine,
986            persistence: None,
987            commit_lsn: AtomicU64::new(0),
988        })
989    }
990
991    /// Take a catalog snapshot suitable for `Database::restore`.
992    /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
993    /// + version + payload); round-trips through every released
994    /// SPG version per the STABILITY contract.
995    #[must_use]
996    pub fn snapshot(&self) -> Vec<u8> {
997        self.engine.snapshot()
998    }
999
1000    /// Execute a SQL statement and return the engine's
1001    /// `QueryResult` verbatim. Pass-through for callers that
1002    /// want to keep PG-flavoured column/row metadata.
1003    ///
1004    /// v7.1 — when the database was opened via `open_path`,
1005    /// successful mutations are appended to the WAL + fsynced
1006    /// before the call returns. A subsequent process crash will
1007    /// recover state up to the last successful return from
1008    /// `execute()`. Read-only statements (SELECT / SHOW /
1009    /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
1010    /// etc.) skip the WAL entirely.
1011    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
1012        let result = self.engine.execute(sql)?;
1013        if self.persistence.is_some()
1014            && !sql_is_read_only(sql)
1015            && matches!(
1016                &result,
1017                QueryResult::CommandOk {
1018                    modified_catalog: true,
1019                    ..
1020                }
1021            )
1022        {
1023            // v7.18 PITR — write v4 records that carry the commit
1024            // LSN + wall-clock micros so restore tooling can
1025            // target a point in time. Replay path still accepts
1026            // v3 records emitted by older spg-embedded versions.
1027            // Crash window is bounded by one record exactly as
1028            // under v3: WAL fsync happens after the in-memory
1029            // mutation, so the WAL never describes a write that
1030            // didn't apply.
1031            let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1032            let ts = wall_clock_micros();
1033            let record = encode_v4_auto_commit(sql, lsn, ts);
1034            let p = self.persistence.as_mut().expect("checked above");
1035            p.wal.write_all(&record).map_err(io_err)?;
1036            p.wal.sync_data().map_err(io_err)?;
1037            p.wal_len = p.wal_len.saturating_add(record.len() as u64);
1038            if p.wal_len >= p.checkpoint_threshold_bytes {
1039                self.checkpoint()?;
1040            }
1041        }
1042        Ok(result)
1043    }
1044
1045    /// v7.3.0 — typed-row variant of [`Database::query`]. Each
1046    /// row decodes into a `T: FromSpgRow` so callers don't
1047    /// pattern-match on `Value` themselves. Use [`spg_row!`] to
1048    /// generate the impl, or write it by hand.
1049    pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
1050        let rows = self.query(sql)?;
1051        rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
1052    }
1053
1054    /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
1055    /// strips the column-schema metadata for read-side
1056    /// ergonomics. Errors on non-Rows results (DML / DDL
1057    /// statements should go through `execute` instead).
1058    pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
1059        match self.engine.execute(sql)? {
1060            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
1061            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1062                "query() expects a SELECT — use execute() for DML/DDL".into(),
1063            )),
1064            // v7.5.0 — QueryResult is #[non_exhaustive]; any future
1065            // variant is not a SELECT row stream, treat as Unsupported.
1066            _ => Err(EngineError::Unsupported(
1067                "query() expects a SELECT — use execute() for DML/DDL".into(),
1068            )),
1069        }
1070    }
1071
1072    /// v7.16.0 — column-aware variant of [`Self::query`].
1073    /// Returns the column schema vec alongside the rows so
1074    /// adapters (the spg-sqlx Row impl most notably) can drive
1075    /// name + type-based column lookups. Errors on non-Rows
1076    /// results identically to `query`.
1077    pub fn query_with_columns(
1078        &mut self,
1079        sql: &str,
1080    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
1081        match self.engine.execute(sql)? {
1082            QueryResult::Rows { columns, rows } => {
1083                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
1084            }
1085            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1086                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
1087            )),
1088            _ => Err(EngineError::Unsupported(
1089                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
1090            )),
1091        }
1092    }
1093
1094    /// v7.16.0 — column-aware variant of
1095    /// [`Self::query_prepared`]. Same shape as
1096    /// `query_with_columns` but driven from a prepared
1097    /// statement + bound params.
1098    pub fn query_prepared_with_columns(
1099        &mut self,
1100        stmt: &Statement,
1101        params: &[Value],
1102    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
1103        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
1104            QueryResult::Rows { columns, rows } => {
1105                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
1106            }
1107            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1108                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1109            )),
1110            _ => Err(EngineError::Unsupported(
1111                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1112            )),
1113        }
1114    }
1115
1116    /// Borrow the underlying engine. Escape hatch for callers
1117    /// that need access to `spg-engine` APIs not yet surfaced
1118    /// here (transactions, EXPLAIN ANALYZE, etc.).
1119    #[must_use]
1120    pub const fn engine(&self) -> &Engine {
1121        &self.engine
1122    }
1123
1124    /// Mutable borrow of the underlying engine. Same intent as
1125    /// `engine()` but for write-side APIs (e.g. inserting
1126    /// directly through `Catalog::insert` for high-throughput
1127    /// bulk loads that bypass SQL parsing).
1128    pub const fn engine_mut(&mut self) -> &mut Engine {
1129        &mut self.engine
1130    }
1131
1132    /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
1133    /// `execute_prepared` / `query_prepared` calls can re-bind
1134    /// parameters without re-parsing. The returned [`Statement`]
1135    /// is a thin handle around the AST + cached source SQL; it's
1136    /// `Clone` so the same plan can drive many bind calls
1137    /// concurrently (each call clones the AST and runs
1138    /// placeholder substitution on the clone — the cached
1139    /// plan stays intact).
1140    ///
1141    /// Plan caching follows the engine's existing version-aware
1142    /// rule: a prepared `Statement` whose statistics version
1143    /// has rolled (ANALYZE ran between prepare and execute)
1144    /// will silently re-prepare under the hood. Callers don't
1145    /// need to detect this.
1146    ///
1147    /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
1148    /// `bind`-time `Value`s are passed as a slice; arity
1149    /// mismatches surface as `EvalError::PlaceholderOutOfRange`
1150    /// at `execute_prepared` time, not here.
1151    ///
1152    /// # Errors
1153    /// Surfaces `EngineError` (parse error / plan rewrite
1154    /// failure) from the underlying `Engine::prepare`.
1155    pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
1156        // Use the cached path so repeated prepares of the same
1157        // SQL are O(1). The engine's plan cache stays shared
1158        // across all callers of this Database — a single
1159        // `PgPool`-shaped consumer (or, later, the spg-sqlx
1160        // adapter) prepares once and reaps the win on every bind.
1161        let stmt = self
1162            .engine
1163            .prepare_cached(sql)
1164            .map_err(EngineError::Parse)?;
1165        Ok(Statement {
1166            stmt,
1167            sql: sql.to_string(),
1168        })
1169    }
1170
1171    /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
1172    /// executing. Returns `(parameter_oid_count, output_columns)`
1173    /// where `output_columns` is empty for non-SELECT statements
1174    /// or for SELECT shapes the describe planner can't resolve
1175    /// (JOIN / subquery / unknown table). Wraps
1176    /// `Engine::describe_prepared` so the spg-sqlx bridge can
1177    /// surface PG-shape Describe replies for
1178    /// `sqlx::query!()` compile-time validation.
1179    ///
1180    /// # Errors
1181    /// Propagates parse errors from the underlying prepare path.
1182    pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
1183        let stmt = self
1184            .engine
1185            .prepare_cached(sql)
1186            .map_err(EngineError::Parse)?;
1187        Ok(self.engine.describe_prepared(&stmt))
1188    }
1189
1190    /// v7.16.0 — execute a prepared statement with bound
1191    /// parameters. Mirrors `Engine::execute_prepared`: clones
1192    /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
1193    ///
1194    /// Persistence (WAL fsync + auto-checkpoint) follows the
1195    /// same rules as `execute(sql)`: mutating statements get a
1196    /// WAL record AFTER the in-memory exec succeeds. The WAL
1197    /// record carries the substituted, bind-final SQL, so
1198    /// replay reconstructs the same row state without needing
1199    /// the original prepared `Statement` to still be alive.
1200    ///
1201    /// # Errors
1202    /// Propagates engine errors. Param arity mismatch surfaces
1203    /// as `EvalError::PlaceholderOutOfRange`.
1204    pub fn execute_prepared(
1205        &mut self,
1206        stmt: &Statement,
1207        params: &[Value],
1208    ) -> Result<QueryResult, EngineError> {
1209        let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
1210        // WAL persistence on the bind-final SQL. Build the
1211        // canonical Display form by re-printing the
1212        // placeholder-substituted statement (cheap — the AST
1213        // is already in hand from execute_prepared's internal
1214        // clone) so replay's path is identical to the
1215        // simple-query path.
1216        if self.persistence.is_some()
1217            && matches!(
1218                &result,
1219                QueryResult::CommandOk {
1220                    modified_catalog: true,
1221                    ..
1222                }
1223            )
1224        {
1225            // Render the AST back to SQL for WAL replay. The
1226            // placeholder positions are already substituted in
1227            // the executed clone; we re-substitute on a fresh
1228            // clone here purely to obtain the canonical text.
1229            let mut wal_stmt = stmt.stmt.clone();
1230            // Use the engine's substitute_placeholders entry —
1231            // exposed via execute_prepared above. Here we
1232            // re-run the substitution only for Display.
1233            crate::wal_render_with_params(&mut wal_stmt, params);
1234            let canonical = format!("{wal_stmt}");
1235            // v7.18 PITR — prepared path also emits v4 records so
1236            // LSN/timestamp coverage is uniform across simple and
1237            // extended query.
1238            let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1239            let ts = wall_clock_micros();
1240            let record = encode_v4_auto_commit(&canonical, lsn, ts);
1241            let p = self.persistence.as_mut().expect("checked above");
1242            p.wal.write_all(&record).map_err(io_err)?;
1243            p.wal.sync_data().map_err(io_err)?;
1244            p.wal_len = p.wal_len.saturating_add(record.len() as u64);
1245            if p.wal_len >= p.checkpoint_threshold_bytes {
1246                self.checkpoint()?;
1247            }
1248        }
1249        Ok(result)
1250    }
1251
1252    /// v7.16.0 — run a prepared SELECT with bound params and
1253    /// return rows as `Vec<Vec<Value>>`, matching `query()`
1254    /// shape. SELECTs are read-only so this never writes the
1255    /// WAL.
1256    ///
1257    /// # Errors
1258    /// Returns `Unsupported` if the prepared statement isn't a
1259    /// SELECT (use `execute_prepared` for DML/DDL).
1260    pub fn query_prepared(
1261        &mut self,
1262        stmt: &Statement,
1263        params: &[Value],
1264    ) -> Result<Vec<Vec<Value>>, EngineError> {
1265        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
1266            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
1267            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1268                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1269            )),
1270            _ => Err(EngineError::Unsupported(
1271                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1272            )),
1273        }
1274    }
1275
1276    /// v7.18 — parse + plan a SQL string against a
1277    /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
1278    /// readonly fan-out path: no writer lock taken, no WAL write,
1279    /// no plan-cache mutation. Static-on-`Self` so callers can
1280    /// dispatch against a snapshot without an `&mut Database`
1281    /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
1282    /// is the load-bearing consumer.
1283    ///
1284    /// # Errors
1285    /// Propagates `EngineError::Parse` from the parser.
1286    pub fn prepare_on_snapshot(
1287        snapshot: &CatalogSnapshot,
1288        sql: &str,
1289    ) -> Result<Statement, EngineError> {
1290        let stmt = spg_engine::Engine::prepare_on_snapshot(snapshot, sql)
1291            .map_err(EngineError::Parse)?;
1292        Ok(Statement {
1293            stmt,
1294            sql: sql.to_string(),
1295        })
1296    }
1297
1298    /// v7.18 — execute a prepared `Statement` against a
1299    /// `CatalogSnapshot` with bound params. Mirror of
1300    /// [`Database::execute_prepared`] on the readonly path:
1301    /// writes / DDL hit `EngineError::WriteRequired`. No WAL
1302    /// write, no writer lock, multiple snapshots can run
1303    /// concurrently — the snapshot is immutable from prepare time.
1304    ///
1305    /// # Errors
1306    /// Surfaces `EngineError::WriteRequired` for non-readonly
1307    /// statements; propagates other engine errors.
1308    pub fn execute_prepared_on_snapshot(
1309        snapshot: &CatalogSnapshot,
1310        stmt: &Statement,
1311        params: &[Value],
1312    ) -> Result<QueryResult, EngineError> {
1313        spg_engine::Engine::execute_readonly_prepared_on_snapshot(
1314            snapshot,
1315            stmt.stmt.clone(),
1316            params,
1317        )
1318    }
1319
1320    /// v7.18 — describe a SQL string against a
1321    /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
1322    /// the readonly path. Pure function on the snapshot's
1323    /// catalog; safe to call from any thread.
1324    ///
1325    /// # Errors
1326    /// Propagates `EngineError::Parse` from the parser.
1327    pub fn describe_on_snapshot(
1328        snapshot: &CatalogSnapshot,
1329        sql: &str,
1330    ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
1331        let stmt = spg_engine::Engine::prepare_on_snapshot(snapshot, sql)
1332            .map_err(EngineError::Parse)?;
1333        Ok(spg_engine::Engine::describe_prepared_on_snapshot(
1334            snapshot, &stmt,
1335        ))
1336    }
1337
1338    /// v7.2.0 — run `body` inside an implicit `BEGIN` /
1339    /// `COMMIT` pair. The body receives `&mut Database` so it
1340    /// can `execute()` / `query()` like any other code path;
1341    /// the only difference is that every write in the body
1342    /// lands inside one transaction, and a returned `Err` from
1343    /// the body triggers `ROLLBACK` before the error propagates.
1344    ///
1345    /// Nested calls are not supported — SPG's transaction
1346    /// model is single-writer with explicit `BEGIN` /
1347    /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
1348    /// would hit `EngineError::Unsupported("nested
1349    /// transaction")` at the inner `BEGIN`.
1350    pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
1351    where
1352        F: FnOnce(&mut Self) -> Result<R, EngineError>,
1353    {
1354        self.execute("BEGIN")?;
1355        match body(self) {
1356            Ok(value) => {
1357                self.execute("COMMIT")?;
1358                Ok(value)
1359            }
1360            Err(e) => {
1361                // Best-effort rollback. If ROLLBACK itself
1362                // fails (rare — the engine reports it via
1363                // `Unsupported` only when there's no active
1364                // TX, which can't happen here) we surface the
1365                // original body error, not the rollback error.
1366                let _ = self.execute("ROLLBACK");
1367                Err(e)
1368            }
1369        }
1370    }
1371}
1372
1373impl Default for Database {
1374    fn default() -> Self {
1375        Self::open_in_memory()
1376    }
1377}
1378
/// v7.7.5 — point-in-time observability counters produced by
/// [`Database::metrics`]. A plain `Copy` value with no heap
/// data of its own, so it is cheap to build, copy and serialise.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct EmbeddedMetrics {
    /// Live rows across all user tables, counting the hot tier
    /// only (cold-tier rows sit in segment files).
    pub hot_rows: u64,
    /// Total of `Table::hot_bytes` over all user tables; the
    /// quantity the freezer compares against its
    /// `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Cold-tier segments known to the catalog, tombstoned
    /// slots included (segments retired by compaction whose
    /// file may still exist on disk).
    pub cold_segments: u64,
    /// Number of user tables (any future engine-internal tables
    /// are not counted).
    pub tables: u64,
    /// WAL size as of the last `execute()` / `checkpoint()`;
    /// always zero for an in-memory database.
    pub wal_bytes: u64,
    /// Whether WAL + checkpoint persistence is active, i.e. the
    /// database was opened through `open_path`.
    pub persistent: bool,
}
1406
/// v7.2.1 — handle returned by `spawn_background_freezer`.
/// Dropping it (or calling [`FreezerHandle::stop`]) raises the
/// worker's shutdown flag and joins the thread, so the `Database`
/// (or the `Arc<Mutex<Database>>` shared with the worker) may be
/// dropped safely afterwards.
///
/// Do not drop or stop the handle on a thread that currently holds
/// the database's `Mutex` guard: the worker may be blocked waiting
/// for that lock, and the join would then never complete.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    // Shared with the worker; storing `true` asks it to exit.
    shutdown: Arc<AtomicBool>,
    // `Some` until the worker has been joined by `stop`.
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — ask the worker to stop and wait for it to exit.
    /// Idempotent: once the worker is joined, later calls only
    /// re-store the flag. `Drop` calls this as well.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let Some(worker) = self.join.take() else {
            return;
        };
        // A worker panic surfaces as `Err` here; there is nobody to
        // report it to during shutdown, so it is deliberately dropped.
        let _ = worker.join();
    }
}

impl Drop for FreezerHandle {
    fn drop(&mut self) {
        Self::stop(self);
    }
}
1434
/// v7.2.1 — knobs for `Database::spawn_background_freezer`.
///
/// Start from [`FreezerOptions::default`] (which mirrors the
/// `spg-server` freezer) and override individual fields with
/// struct-update syntax.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// Minimum interval between freezer passes. The worker polls
    /// its shutdown flag in short sleeps (at most 50 ms) and, once
    /// at least `tick` has elapsed since the previous pass, locks
    /// the database and compares the catalog's `hot_tier_bytes`
    /// against [`Self::hot_tier_bytes`].
    pub tick: Duration,
    /// Hot-tier byte budget. When the catalog's hot tier is
    /// strictly above this, the next pass freezes up to
    /// `batch_rows` of the oldest rows of the user table with the
    /// most hot bytes (among tables with a BTree index on an
    /// integer column) into a new cold segment.
    pub hot_tier_bytes: u64,
    /// Max rows the freezer demotes per pass. `0` effectively
    /// disables freezing.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. After a successful freeze,
    /// if the catalog holds **more than** this many cold segments
    /// across all tables, the freezer runs one compaction pass.
    /// Set to `usize::MAX` to disable auto-compact entirely;
    /// the default is `64`, matching the `spg-server` operating
    /// point for SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges,
    /// in bytes. Default 64 MiB, mirroring `spg-server`. Small
    /// segments below this size are merge candidates;
    /// segments at or above stay untouched.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    fn default() -> Self {
        // Match the `spg-server` freezer's default operating
        // point (SPG_HOT_TIER_BYTES = 4 GiB, batch 1000 rows,
        // tick every 1 s) so embedded behaviour is predictable
        // for operators familiar with the server.
        Self {
            tick: Duration::from_secs(1),
            hot_tier_bytes: 4 * 1024 * 1024 * 1024,
            batch_rows: 1000,
            compact_when_segments_exceed: 64,
            compact_target_bytes: 64 * 1024 * 1024,
        }
    }
}
1476
1477impl Database {
1478    /// v7.7.4 — observe the catalog's cold-segment count.
1479    /// Useful for tests + dashboards that want to verify
1480    /// auto-compaction is firing.
1481    #[must_use]
1482    pub fn cold_segment_count(&self) -> usize {
1483        self.engine.catalog().cold_segment_count()
1484    }
1485
1486    /// v7.7.5 — observability snapshot. Returns a point-in-time
1487    /// view of the engine + persistence counters. Cheap (no
1488    /// locks beyond the existing `&self` borrow), so safe to
1489    /// call from a hot metrics-scrape path.
1490    ///
1491    /// Fields mirror the operational dashboard
1492    /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
1493    /// minus the network counters that don't apply to embedded.
1494    #[must_use]
1495    pub fn metrics(&self) -> EmbeddedMetrics {
1496        let cat = self.engine.catalog();
1497        let mut hot_rows: u64 = 0;
1498        let mut hot_bytes: u64 = 0;
1499        for name in cat.table_names() {
1500            if let Some(t) = cat.get(&name) {
1501                hot_rows = hot_rows.saturating_add(t.row_count() as u64);
1502                hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
1503            }
1504        }
1505        let (wal_bytes, persistent) = match &self.persistence {
1506            Some(p) => (p.wal_len, true),
1507            None => (0, false),
1508        };
1509        EmbeddedMetrics {
1510            hot_rows,
1511            hot_bytes,
1512            cold_segments: cat.cold_segment_count() as u64,
1513            tables: cat.table_count() as u64,
1514            wal_bytes,
1515            persistent,
1516        }
1517    }
1518
1519    /// v7.2.1 — spawn a background thread that periodically
1520    /// runs `freeze_oldest_to_cold` when the catalog-wide hot
1521    /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
1522    /// pattern matches the v7.2 sharing story: callers wrap
1523    /// their `Database` in `Arc::new(Mutex::new(db))` once,
1524    /// then clone the Arc for the worker + for foreground
1525    /// access. Return value is a handle whose `Drop` joins the
1526    /// worker.
1527    ///
1528    /// Picks the freeze target the same way `spg-server`'s
1529    /// freezer does: largest-`hot_bytes` user table with at
1530    /// least one BTree integer-PK index. Tables without a
1531    /// freezable index are skipped silently.
1532    pub fn spawn_background_freezer(
1533        db: Arc<Mutex<Database>>,
1534        opts: FreezerOptions,
1535    ) -> FreezerHandle {
1536        let shutdown = Arc::new(AtomicBool::new(false));
1537        let shutdown_for_thread = Arc::clone(&shutdown);
1538        let join = thread::Builder::new()
1539            .name("spg-embedded-freezer".into())
1540            .spawn(move || {
1541                background_freezer_loop(db, opts, shutdown_for_thread);
1542            })
1543            .expect("spawn background freezer thread");
1544        FreezerHandle {
1545            shutdown,
1546            join: Some(join),
1547        }
1548    }
1549}
1550
1551/// v7.2.1 — the freezer's main loop, factored out so the
1552/// `Database::spawn_background_freezer` path stays readable.
1553fn background_freezer_loop(
1554    db: Arc<Mutex<Database>>,
1555    opts: FreezerOptions,
1556    shutdown: Arc<AtomicBool>,
1557) {
1558    // Sleep in short slices so a shutdown request resolves
1559    // quickly (vs sleeping the full tick).
1560    let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
1561    let mut last_tick = std::time::Instant::now();
1562    loop {
1563        if shutdown.load(Ordering::Acquire) {
1564            return;
1565        }
1566        thread::sleep(slice);
1567        if last_tick.elapsed() < opts.tick {
1568            continue;
1569        }
1570        last_tick = std::time::Instant::now();
1571        let Ok(mut guard) = db.lock() else {
1572            return;
1573        };
1574        if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
1575            continue;
1576        }
1577        let Some((table, index)) = pick_freeze_target(&guard) else {
1578            continue;
1579        };
1580        let row_count = guard
1581            .engine
1582            .catalog()
1583            .get(&table)
1584            .map_or(0, spg_storage::Table::row_count);
1585        let to_freeze = opts.batch_rows.min(row_count);
1586        if to_freeze == 0 {
1587            continue;
1588        }
1589        if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
1590            eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
1591            continue;
1592        }
1593        // v7.7.4 — auto-compact. If the catalog now carries
1594        // more cold segments than the configured threshold,
1595        // run a single compaction pass. Failures are reported
1596        // but don't kill the loop; the next tick will retry.
1597        let count = guard.engine.catalog().cold_segment_count();
1598        if count > opts.compact_when_segments_exceed {
1599            if let Err(e) = guard
1600                .engine
1601                .compact_cold_segments_with_target(opts.compact_target_bytes)
1602            {
1603                eprintln!(
1604                    "spg-embedded: background compact failed (segments={count}, \
1605                     threshold={}): {e:?}",
1606                    opts.compact_when_segments_exceed,
1607                );
1608            }
1609        }
1610    }
1611}
1612
1613/// v7.2.1 — pick the highest-`hot_bytes` user table with a
1614/// BTree integer-PK index. Returns `(table, index_name)` so the
1615/// caller can dispatch through `freeze_oldest_to_cold`.
1616fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
1617    let cat = db.engine.catalog();
1618    let mut best: Option<(String, String, u64)> = None;
1619    for name in cat.table_names() {
1620        let Some(t) = cat.get(&name) else { continue };
1621        if t.row_count() == 0 {
1622            continue;
1623        }
1624        let cols = &t.schema().columns;
1625        let Some(idx) = t.indices().iter().find(|i| {
1626            matches!(i.kind, spg_storage::IndexKind::BTree(_))
1627                && i.column_position < cols.len()
1628                && matches!(
1629                    cols[i.column_position].ty,
1630                    spg_storage::DataType::SmallInt
1631                        | spg_storage::DataType::Int
1632                        | spg_storage::DataType::BigInt
1633                )
1634        }) else {
1635            continue;
1636        };
1637        let hot = t.hot_bytes();
1638        match best {
1639            None => best = Some((name, idx.name.clone(), hot)),
1640            Some((_, _, best_hot)) if hot > best_hot => {
1641                best = Some((name, idx.name.clone(), hot));
1642            }
1643            _ => {}
1644        }
1645    }
1646    best.map(|(t, i, _)| (t, i))
1647}
1648
1649/// v7.7.6 — replay the first `to_seq` records of the WAL at
1650/// `wal_path` into a fresh engine and write the resulting
1651/// catalog snapshot to `out_db_path`. Same semantics as
1652/// `spg revert --wal … --to-seq N --out …` from the CLI:
1653///
1654///   - `to_seq == 0` → snapshot is the empty catalog
1655///   - WAL records beyond `to_seq` are not applied
1656///   - durability-checkpoint markers (v3 type 0x02) are
1657///     consumed without counting against the budget
1658///
1659/// Returns the number of statements actually applied
1660/// (`≤ to_seq`). The output snapshot is byte-identical to
1661/// what `Database::open_path(out_db_path)` would consume on
1662/// a subsequent open.
1663///
1664/// This is the "rewind" operator for an embedded database
1665/// that has been corrupted by a poison statement or a
1666/// half-applied migration. Pair with `cold_segment_paths`
1667/// preservation if your cold-tier files are still on disk.
1668///
1669/// # Errors
1670///
1671/// - `wal_path` unreadable or truncated mid-record
1672/// - WAL record decodes to invalid UTF-8 SQL
1673/// - WAL record's SQL is rejected by the engine
1674/// - `out_db_path` unwritable
1675pub fn revert_wal_to_seq(
1676    wal_path: impl AsRef<Path>,
1677    to_seq: u64,
1678    out_db_path: impl AsRef<Path>,
1679) -> Result<u64, EngineError> {
1680    let wal_bytes = std::fs::read(wal_path.as_ref()).map_err(io_err)?;
1681    let mut engine = Engine::new();
1682    let mut applied = 0u64;
1683    let mut cur = 0usize;
1684    while cur < wal_bytes.len() && applied < to_seq {
1685        let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
1686        cur += total;
1687        if sql_bytes.is_empty() {
1688            continue;
1689        }
1690        let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
1691            EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1692                "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
1693            )))
1694        })?;
1695        engine.execute(sql)?;
1696        applied += 1;
1697    }
1698    let snapshot = engine.snapshot();
1699    std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
1700    Ok(applied)
1701}
1702
1703/// v7.7.6 — decode one WAL record from a byte tail. Returns
1704/// `(sql_bytes, header_plus_payload_len)`. Handles the three
1705/// on-disk formats (v1 / v2 / v3) the same way the CLI
1706/// `decode_one_record` and the engine's `replay_wal_bytes`
1707/// do. CRCs are not re-validated; the caller's intent is
1708/// "apply", not "validate".
1709fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
1710    if tail.len() < 4 {
1711        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1712            format!("WAL truncated record: {} < 4 header bytes", tail.len()),
1713        )));
1714    }
1715    let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
1716    let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
1717    let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
1718    let len_mask = if is_v3 {
1719        !(WAL_V2_SENTINEL | WAL_V3_FLAG)
1720    } else {
1721        !WAL_V2_SENTINEL
1722    };
1723    let rec_len = (raw_len & len_mask) as usize;
1724    let header_len = if is_v3 {
1725        9
1726    } else if is_v2 {
1727        8
1728    } else {
1729        4
1730    };
1731    if tail.len() < header_len + rec_len {
1732        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1733            format!(
1734                "WAL truncated record: header+payload {} > available {}",
1735                header_len + rec_len,
1736                tail.len()
1737            ),
1738        )));
1739    }
1740    if is_v3 {
1741        let type_byte = tail[8];
1742        // v3 type 0x01 = auto_commit_sql (payload = SQL).
1743        // v3 type 0x02 = durability marker (no SQL to apply).
1744        // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
1745        //                prefix between type and SQL — strip
1746        //                the prefix so the caller still sees raw
1747        //                SQL bytes.
1748        // Anything else is unknown.
1749        if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
1750            let payload = &tail[header_len..header_len + rec_len];
1751            return Ok((payload.to_vec(), header_len + rec_len));
1752        }
1753        if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL {
1754            let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
1755            if tail.len() < v4_total {
1756                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1757                    format!(
1758                        "WAL truncated v4 record: header+payload {v4_total} > available {}",
1759                        tail.len()
1760                    ),
1761                )));
1762            }
1763            let sql_start = header_len + WAL_V4_EXTRA_HEADER;
1764            let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
1765            return Ok((sql_bytes, v4_total));
1766        }
1767        // Caller treats empty payload as a skip-marker.
1768        return Ok((Vec::new(), header_len + rec_len));
1769    }
1770    let payload = &tail[header_len..header_len + rec_len];
1771    Ok((payload.to_vec(), header_len + rec_len))
1772}
1773
1774impl Drop for Database {
1775    fn drop(&mut self) {
1776        // v7.1 — best-effort final checkpoint when a persistent
1777        // Database leaves scope. Failures here go to stderr so
1778        // operators see them, but Drop can't propagate errors —
1779        // the WAL itself is already durable, so a checkpoint
1780        // miss only means the next boot replays a few more
1781        // records than strictly necessary.
1782        if self.persistence.is_some() {
1783            if let Err(e) = self.checkpoint() {
1784                eprintln!(
1785                    "spg-embedded: final checkpoint on Drop failed: {e:?} \
1786                     (WAL is intact; next open_path will replay)"
1787                );
1788            }
1789        }
1790        // v7.17.0 Phase 6.2 — release the cross-process lock on
1791        // clean shutdown. Failure is logged but never panics;
1792        // the operator can clear a stale lock via
1793        // `Database::force_unlock` if a crash kept the
1794        // directory around.
1795        if let Some(ctx) = &self.persistence
1796            && ctx.lock_path.exists()
1797        {
1798            if let Err(e) = std::fs::remove_dir(&ctx.lock_path) {
1799                eprintln!(
1800                    "spg-embedded: lock release on Drop failed for {}: {e:?}",
1801                    ctx.lock_path.display()
1802                );
1803            }
1804        }
1805    }
1806}
1807
1808impl Database {
1809    /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
1810    /// Use when a previous process crashed mid-session and
1811    /// left `<db_path>.lock` behind. Operators should confirm
1812    /// no other process is currently using the database before
1813    /// calling this — SPG cannot fingerprint stale-vs-live
1814    /// without a libc dep, which would violate spg-embedded's
1815    /// zero-deps charter.
1816    pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
1817        let lock_path = {
1818            let mut p = db_path.as_ref().to_path_buf();
1819            let name = p
1820                .file_name()
1821                .map(|n| {
1822                    let mut s = n.to_os_string();
1823                    s.push(".lock");
1824                    s
1825                })
1826                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1827            p.set_file_name(name);
1828            p
1829        };
1830        if !lock_path.exists() {
1831            return Ok(());
1832        }
1833        std::fs::remove_dir(&lock_path).map_err(io_err)
1834    }
1835}
1836
1837/// v7.1 — turn a `std::io::Error` into the workspace's
1838/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
1839/// the closest existing variant — io failures during boot or
1840/// during a WAL append surface as a storage-layer fault to
1841/// callers, which keeps the public error enum unchanged.
1842fn io_err(e: std::io::Error) -> EngineError {
1843    EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
1844}
1845
1846/// v7.2.2 — `Database` is `Send`, so the recommended sharing
1847/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
1848///
1849/// ```no_run
1850/// use std::sync::{Arc, Mutex};
1851/// use spg_embedded::Database;
1852///
1853/// let db = Database::open_in_memory();
1854/// let shared = Arc::new(Mutex::new(db));
1855/// let shared_for_worker = Arc::clone(&shared);
1856/// std::thread::spawn(move || {
1857///     let mut guard = shared_for_worker.lock().unwrap();
1858///     guard.execute("INSERT INTO t VALUES (1)").unwrap();
1859/// });
1860/// ```
1861///
1862/// Internal `RwLock`-wrapped state — letting many threads
1863/// hold concurrent `&Database` for `SELECT` without contending
1864/// — is parked as STABILITY § "Out of v7.2"; multi-reader
1865/// embedded throughput needs a planner-side change to release
1866/// the engine read lock between scans, which is the v7.x
1867/// "Choice A" line of work already documented in v6.9.1's
1868/// carve-out.
1869#[allow(dead_code)]
1870fn _database_is_send() {
1871    fn assert_send<T: Send>() {}
1872    assert_send::<Database>();
1873}
1874
/// v6.10.3 — trait that maps a query result row's columns onto a
/// user struct's fields. v7.3.0 ships the [`spg_row!`] declarative
/// macro that generates `impl FromSpgRow for YourStruct` from a
/// struct definition (no proc-macro, no syn/quote/proc-macro2
/// deps — the workspace's "0 external deps" policy holds).
///
/// Hand-written impls are fine too. By convention, decode failures
/// are reported as `EngineError::Unsupported` so the caller's error
/// type stays uniform; that is also what [`spg_row!`] generates.
pub trait FromSpgRow: Sized {
    /// Decode one query result row into `Self`. Called once per
    /// row by [`Database::query_typed`]. The slice length equals
    /// the number of columns in the SELECT projection.
    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
}
1891
/// v7.3.0 — declarative macro that generates a `FromSpgRow` impl
/// for a user struct. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// Behaviour of the generated code:
///
/// - The struct is emitted with `#[derive(Debug, Clone)]` added
///   after any attributes you supply, so do not derive those two
///   traits yourself. Generic structs are not accepted.
/// - Columns map to fields **positionally**, in declaration order;
///   column names are not consulted. Extra trailing columns are
///   ignored.
/// - A row with too few columns, or a cell that fails to decode,
///   yields `EngineError::Unsupported` with a message naming the
///   struct and the field.
/// - A trailing comma after the last field is optional.
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // Consume cells left to right, one per declared field.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                Ok(Self { $($field,)* })
            }
        }
    };
}
1968
/// v7.3.0 — per-column decoder used by `spg_row!`. The impls that
/// follow cover `i16` / `i32` / `i64` (from any integer `Value`
/// variant, range-checked), `f32` / `f64` (from `Value::Float`),
/// `bool`, `String`, `Vec<f32>` (from `Value::Vector`), plus
/// `Option<T>` for nullable columns.
pub trait FromSpgValue: Sized {
    /// Decode one cell into `Self`. The returned `&'static str`
    /// is a short diagnostic for type mismatches (e.g. `"expected
    /// integer, got TEXT"`); callers wrap it into their own
    /// error type.
    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
}
1979
1980macro_rules! impl_from_value_int {
1981    ($($t:ty),* $(,)?) => {
1982        $(
1983            impl FromSpgValue for $t {
1984                fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1985                    match v {
1986                        Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
1987                        Value::Int(n)      => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
1988                        Value::BigInt(n)   => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
1989                        Value::Null        => Err("NULL in non-Option int column"),
1990                        _ => Err("non-integer value in int column"),
1991                    }
1992                }
1993            }
1994        )*
1995    };
1996}
1997impl_from_value_int!(i16, i32, i64);
1998
1999impl FromSpgValue for f32 {
2000    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2001        match v {
2002            Value::Float(f) => Ok(*f as f32),
2003            Value::Null => Err("NULL in non-Option float column"),
2004            _ => Err("non-float value in float column"),
2005        }
2006    }
2007}
2008
2009impl FromSpgValue for f64 {
2010    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2011        match v {
2012            Value::Float(f) => Ok(*f),
2013            Value::Null => Err("NULL in non-Option float column"),
2014            _ => Err("non-float value in float column"),
2015        }
2016    }
2017}
2018
2019impl FromSpgValue for bool {
2020    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2021        match v {
2022            Value::Bool(b) => Ok(*b),
2023            Value::Null => Err("NULL in non-Option bool column"),
2024            _ => Err("non-bool value in bool column"),
2025        }
2026    }
2027}
2028
2029impl FromSpgValue for String {
2030    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2031        match v {
2032            Value::Text(s) => Ok(s.clone()),
2033            Value::Null => Err("NULL in non-Option text column"),
2034            _ => Err("non-text value in String column"),
2035        }
2036    }
2037}
2038
2039impl FromSpgValue for Vec<f32> {
2040    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2041        match v {
2042            Value::Vector(xs) => Ok(xs.clone()),
2043            Value::Null => Err("NULL in non-Option vector column"),
2044            _ => Err("non-vector value in Vec<f32> column"),
2045        }
2046    }
2047}
2048
2049impl<T: FromSpgValue> FromSpgValue for Option<T> {
2050    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2051        match v {
2052            Value::Null => Ok(None),
2053            other => T::from_spg_value(other).map(Some),
2054        }
2055    }
2056}
2057
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL, name TEXT)")
            .unwrap();
        db.execute("INSERT INTO t VALUES (1, 'alice')").unwrap();
        db.execute("INSERT INTO t VALUES (2, 'bob')").unwrap();
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(1) => {}
            other => panic!("expected Int(1), got {other:?}"),
        }
    }

    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        let r = db.query("INSERT INTO t VALUES (1)");
        assert!(r.is_err(), "query() on INSERT must error");
    }

    #[test]
    fn snapshot_roundtrip() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        db.execute("INSERT INTO t VALUES (42)").unwrap();
        let bytes = db.snapshot();
        let mut restored = Database::restore(&bytes).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(42) => {}
            other => panic!("expected Int(42), got {other:?}"),
        }
    }

    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                match row.first() {
                    Some(Value::Int(n)) => Ok(Self { _id: *n }),
                    _ => Err(EngineError::Unsupported("bad id".into())),
                }
            }
        }
        let row = vec![Value::Int(7)];
        let _u = User::from_spg_row(&row).unwrap();
    }

    #[test]
    fn int_decoding_is_range_checked() {
        assert_eq!(i64::from_spg_value(&Value::Int(-9)), Ok(-9));
        assert_eq!(
            i16::from_spg_value(&Value::Int(70_000)),
            Err("Int does not fit target int type")
        );
        assert_eq!(
            i32::from_spg_value(&Value::Null),
            Err("NULL in non-Option int column")
        );
        assert_eq!(
            i32::from_spg_value(&Value::Text("1".into())),
            Err("non-integer value in int column")
        );
    }

    #[test]
    fn scalar_and_option_decoding() {
        assert_eq!(Option::<i32>::from_spg_value(&Value::Null), Ok(None));
        assert_eq!(Option::<i32>::from_spg_value(&Value::Int(4)), Ok(Some(4)));
        assert_eq!(
            String::from_spg_value(&Value::Text("hi".into())),
            Ok("hi".to_string())
        );
        assert_eq!(
            bool::from_spg_value(&Value::Null),
            Err("NULL in non-Option bool column")
        );
        assert_eq!(f64::from_spg_value(&Value::Float(2.5)), Ok(2.5));
    }

    spg_row! {
        struct Person {
            id: i32,
            name: Option<String>,
        }
    }

    #[test]
    fn spg_row_macro_decodes_and_reports_errors() {
        let row = vec![Value::Int(1), Value::Null];
        let p = Person::from_spg_row(&row).unwrap();
        assert_eq!(p.id, 1);
        assert_eq!(p.name, None);
        match Person::from_spg_row(&[Value::Int(1)]).unwrap_err() {
            EngineError::Unsupported(msg) => {
                assert!(msg.contains("missing column for field `name`"), "{msg}");
            }
            other => panic!("expected Unsupported, got {other:?}"),
        }
    }

    #[test]
    fn decode_wal_record_v1_and_truncation() {
        // v1 record: 4-byte LE length, then the payload; trailing bytes
        // belong to the next record and must not be consumed.
        let rec = [3u8, 0, 0, 0, b'a', b'b', b'c', 0xFF];
        let (sql, used) = decode_wal_record(&rec).unwrap();
        assert_eq!(sql, b"abc".to_vec());
        assert_eq!(used, 7);
        assert!(decode_wal_record(&[1, 2]).is_err());
        assert!(decode_wal_record(&[10, 0, 0, 0, b'a']).is_err());
    }

    #[test]
    fn metrics_track_in_memory_tables() {
        let mut db = Database::open_in_memory();
        let before = db.metrics();
        assert!(!before.persistent);
        assert_eq!(before.wal_bytes, 0);
        db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        db.execute("INSERT INTO t VALUES (1)").unwrap();
        db.execute("INSERT INTO t VALUES (2)").unwrap();
        let after = db.metrics();
        assert_eq!(after.tables, before.tables + 1);
        assert_eq!(after.hot_rows, before.hot_rows + 2);
    }

    #[test]
    fn with_transaction_rolls_back_on_error() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        let r: Result<(), EngineError> = db.with_transaction(|tx| {
            tx.execute("INSERT INTO t VALUES (1)")?;
            Err(EngineError::Unsupported("abort".into()))
        });
        assert!(r.is_err());
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 0);
        // The failed transaction must not block the next one.
        db.with_transaction(|tx| {
            tx.execute("INSERT INTO t VALUES (2)")?;
            Ok(())
        })
        .unwrap();
        let rows = db.query("SELECT id FROM t WHERE id = 2").unwrap();
        assert_eq!(rows.len(), 1);
    }
}