Skip to main content

spg_embedded/
lib.rs

1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//!     println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//!   crash-consistent WAL + periodic checkpoint snapshot. The
32//!   on-disk format is byte-identical to what `spg-server`
33//!   produces, so a database can move between modes without
34//!   conversion.
35//! - **Durability**: every `execute()` that mutates calls
36//!   `fsync` before returning `Ok`. There is no group commit
37//!   in embedded mode — every commit pays one fsync. If you
38//!   need batch throughput, wrap multiple statements in
39//!   [`Database::with_transaction`] which fsyncs only at
40//!   commit.
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//!   Share across threads via `Arc<Mutex<Database>>`. The
43//!   single-writer model is intentional — see
44//!   [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//!   moves cold rows to disk-resident segments while you keep
47//!   serving requests. It runs in a dedicated thread; drop the
48//!   returned [`FreezerHandle`] (or call `stop()`) for clean
49//!   shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//!   [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//!   them with a wildcard arm so future v7.x releases can add
53//!   variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//!   Malformed SQL, type mismatches, missing tables — all
59//!   return `Err(EngineError::…)`. If you observe a panic on
60//!   a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//!   violations (e.g., catalog snapshot magic mismatch, WAL
63//!   record CRC sentinel corruption that survived the
64//!   boot-time validation). These represent silent disk corruption
65//!   and an unwind would leak inconsistent state, so the
66//!   release profile uses `panic = abort` — your host process
67//!   dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//!   `--profile release-dbg` (keeps unwind tables) and use
70//!   `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
82/// v7.16.0 — handle for a parsed-and-planned SQL statement.
83/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
84/// with a `&[Value]` slice carrying the bind parameters (PG-style
85/// `$1`, `$2`, … positional). Cheap to `Clone`; the underlying AST
86/// is shared by handle copies and cloned per bind call by the
87/// engine's executor.
88///
89/// The handle holds a snapshot of the AST at prepare time. If
90/// the engine's plan cache evicts the entry between prepare and
91/// execute (e.g. ANALYZE bumps the statistics version) the
92/// stored AST keeps working — `execute_prepared` operates on
93/// the handle's clone, not the cache entry.
94#[derive(Debug, Clone)]
95pub struct Statement {
96    /// The parsed + planned AST. `spg-engine::prepare_cached`
97    /// returns it as a clone of the cached plan, so any rewrite
98    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
99    /// already run.
100    pub(crate) stmt: ParsedStatement,
101    /// Original SQL source, kept for `Display` / debug only.
102    /// WAL persistence renders from the AST so a bind-time
103    /// rewrite of `$1..$N` survives replay.
104    pub(crate) sql: String,
105}
106
107impl Statement {
108    /// Borrow the original SQL source — useful for tracing and
109    /// debug logs. WAL replay does NOT use this; it serialises
110    /// the bind-final AST instead.
111    #[must_use]
112    pub fn sql(&self) -> &str {
113        &self.sql
114    }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127    let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::Write;
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
/// v7.11.3 — wall-clock provider injected into every embedded `Engine`.
///
/// Returns microseconds since the Unix epoch. Two edge cases:
/// - a far-future clock that overflows `i64` saturates to `i64::MAX`;
/// - a clock set before 1970 yields `0`.
///
/// SQL's `NOW()` / `CURRENT_TIMESTAMP` / `CURRENT_DATE` rewrite layer
/// reads it, so PG-idiomatic time queries work without the caller
/// wiring up their own clock.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        Err(_before_epoch) => 0,
    }
}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
// -- WAL record framing (v7.1 v3 envelope + v7.18 v4 types) -----
// These mirror `spg-server`'s constants and stay private so callers
// cannot mis-frame records. The layout is the same one the server
// uses, so a `spg-server` boot can read a database an embedded
// process wrote, and vice versa.

/// Bit 31 of the leading length word: v2+ framing. The length word is
/// followed by a 4-byte field (the CRC32 in v3/v4 records).
const WAL_V2_SENTINEL: u32 = 1 << 31;
/// Bit 30 of the length word, meaningful only alongside the sentinel:
/// a type byte follows the 4-byte field (v3 envelope).
const WAL_V3_FLAG: u32 = 1 << 30;
/// v7.1 — v3 auto-commit SQL record type.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v7.18 — durability checkpoint marker. Keeps its 0x02 value, carries
/// no SQL and is skipped on replay.
const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v7.18 PITR — auto-commit SQL record with (commit_lsn, commit_unix_us)
/// appended after the type byte, so replay can target a specific point
/// in time.
///
/// The envelope flag bits are unchanged; only this type byte
/// distinguishes the schema version, so v3 (0x01) records keep working.
const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
/// v7.18 — "no wall clock" sentinel for a v4 record's commit_unix_us
/// slot. Restore-to-timestamp skips such records (no time anchor);
/// LSN-based restore is unaffected.
const WAL_V4_NO_CLOCK: i64 = i64::MIN;
/// v7.18 — bytes between the type byte and the SQL in a v4 record:
/// commit_lsn (u64 LE) followed by commit_unix_us (i64 LE).
const WAL_V4_EXTRA_HEADER: usize = std::mem::size_of::<u64>() + std::mem::size_of::<i64>();
/// v7.18 PITR — checkpoint anchor record.
///
/// Written to the WAL *before* the snapshot file replaces the on-disk
/// catalog. It carries the (lsn, ts, snapshot_path) triple, so restore
/// tooling can find the matching base snapshot without scanning the
/// filesystem. Replay skips it, like the durability marker.
const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;
182
/// v7.1 — auto-checkpoint threshold in bytes.
///
/// Once the WAL grows past this size, the next successful `execute()`
/// ends with a `checkpoint()` so the WAL stays bounded. The value comes
/// from the `SPG_EMBEDDED_CHECKPOINT_BYTES` env var. It falls back to
/// 4 MiB when the variable is unset, not a valid `u64`, or zero.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;
    let configured = std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured {
        Some(bytes) if bytes > 0 => bytes,
        _ => FALLBACK_BYTES,
    }
}
194
195/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
196///
197/// ```text
198/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
199/// [u32 LE crc32 over (type_byte || sql_bytes)]
200/// [u8 type = 0x01]
201/// [sql bytes]
202/// ```
203fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
204    let payload = sql.as_bytes();
205    let mut crc_buf = Vec::with_capacity(1 + payload.len());
206    crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
207    crc_buf.extend_from_slice(payload);
208    let crc = spg_crypto::crc32::crc32(&crc_buf);
209    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
210    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
211    out.extend_from_slice(&header);
212    out.extend_from_slice(&crc.to_le_bytes());
213    out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
214    out.extend_from_slice(payload);
215    out
216}
217
218/// v7.19 P3 — retention sweep loop. Runs in a dedicated thread
219/// spawned by `Database::open_path` when `SPG_PITR_RETENTION_HOURS`
220/// is set to a non-zero value. Wakes every
221/// `SPG_PITR_RETENTION_CHECK_SEC` (default 60 s), enumerates chunks
222/// under `wal_dir`, archives via `SPG_PITR_ARCHIVE_CMD` if set, and
223/// deletes anything older than `retention_hours`.
224///
225/// Loud-failure posture matches PG's `archive_command`: if the
226/// archive command returns non-zero, the chunk stays on disk and
227/// a warning prints to stderr. The retention sweep doesn't delete
228/// a chunk it failed to archive.
229fn retention_sweep_loop(
230    wal_dir: PathBuf,
231    retention_hours: u64,
232    check_interval: std::time::Duration,
233    archive_cmd: Option<String>,
234    shutdown: Arc<AtomicBool>,
235) {
236    while !shutdown.load(Ordering::SeqCst) {
237        if let Err(e) = retention_sweep_once(&wal_dir, retention_hours, archive_cmd.as_deref()) {
238            eprintln!("spg-embedded: retention sweep error: {e}");
239        }
240        // Sleep in short ticks so shutdown isn't blocked on a
241        // 60 s naptime when Drop signals.
242        let mut elapsed = std::time::Duration::ZERO;
243        let tick = std::time::Duration::from_millis(250);
244        while elapsed < check_interval {
245            if shutdown.load(Ordering::SeqCst) {
246                return;
247            }
248            std::thread::sleep(tick);
249            elapsed += tick;
250        }
251    }
252}
253
254/// v7.19 P3 — one retention sweep pass over `wal_dir`. Extracted
255/// from the loop so tests can drive it directly. Public so the
256/// e2e_pitr_retention integration test (and any future operator
257/// tooling that wants synchronous retention) can call it.
258pub fn retention_sweep_once(
259    wal_dir: &Path,
260    retention_hours: u64,
261    archive_cmd: Option<&str>,
262) -> std::io::Result<()> {
263    if !wal_dir.exists() {
264        return Ok(());
265    }
266    let now_us = wall_clock_micros();
267    let cutoff_us = (now_us as i128 - (retention_hours as i128 * 3_600 * 1_000_000)) as i64;
268    let chunks = sorted_wal_chunks(wal_dir)?;
269    for chunk in chunks {
270        // Don't sweep the most-recent chunk; it's the live one
271        // execute() is appending to. Compare against the largest
272        // filename-prefix unix_us.
273        let stem = match chunk.file_stem().and_then(|s| s.to_str()) {
274            Some(s) => s,
275            None => continue,
276        };
277        let chunk_us: i64 = stem
278            .split_once('_')
279            .and_then(|(prefix, _)| i64::from_str_radix(prefix, 16).ok())
280            .unwrap_or(0);
281        if chunk_us >= cutoff_us {
282            continue;
283        }
284        // Archive first if requested.
285        if let Some(cmd) = archive_cmd {
286            if !cmd.is_empty() {
287                let output = std::process::Command::new("sh")
288                    .arg("-c")
289                    .arg(cmd)
290                    .arg("--")
291                    .arg(&chunk)
292                    .output()?;
293                if !output.status.success() {
294                    eprintln!(
295                        "spg-embedded: SPG_PITR_ARCHIVE_CMD failed for {} (exit {}); chunk stays on disk",
296                        chunk.display(),
297                        output.status.code().unwrap_or(-1)
298                    );
299                    continue;
300                }
301            }
302        }
303        // Delete the chunk + its sibling .checksum if present.
304        if let Err(e) = std::fs::remove_file(&chunk) {
305            eprintln!(
306                "spg-embedded: retention remove {} failed: {e}",
307                chunk.display()
308            );
309            continue;
310        }
311        let mut cs = chunk.clone();
312        let mut name = cs.file_name().map(|n| n.to_os_string()).unwrap_or_default();
313        name.push(".checksum");
314        cs.set_file_name(name);
315        let _ = std::fs::remove_file(&cs);
316    }
317    Ok(())
318}
319
/// v7.19 P3 — PITR retention window from `SPG_PITR_RETENTION_HOURS`.
/// Returns `0` (retention disabled) when the variable is unset or not a
/// valid `u64`.
fn pitr_retention_hours() -> u64 {
    match std::env::var("SPG_PITR_RETENTION_HOURS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(0),
        Err(_) => 0,
    }
}
326
/// v7.19 P3 — seconds between retention sweeps, from
/// `SPG_PITR_RETENTION_CHECK_SEC`. Falls back to 60 when the variable
/// is unset, not a valid `u64`, or zero.
fn pitr_retention_check_sec() -> u64 {
    const DEFAULT_CHECK_SEC: u64 = 60;
    let parsed = std::env::var("SPG_PITR_RETENTION_CHECK_SEC")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match parsed {
        Some(secs) if secs > 0 => secs,
        _ => DEFAULT_CHECK_SEC,
    }
}
334
/// v7.19 P3 — archive command from `SPG_PITR_ARCHIVE_CMD`. Returns
/// `None` when the variable is unset, not valid Unicode, or empty.
fn pitr_archive_cmd() -> Option<String> {
    let cmd = std::env::var("SPG_PITR_ARCHIVE_CMD").ok()?;
    if cmd.is_empty() {
        None
    } else {
        Some(cmd)
    }
}
340
341/// v7.19 — replay every record from `wal_bytes` whose
342/// `commit_lsn` is strictly greater than `floor_lsn`. v3 records
343/// (no LSN) and v4 records with `commit_lsn <= floor_lsn` are
344/// skipped — the snapshot loaded ahead of this call already
345/// reflects them, and re-applying would DuplicateTable /
346/// double-insert. v3 records inside the legacy migration chunk
347/// always apply because the migration sets `floor_lsn = 0` and
348/// v3 records carry no LSN to compare; the pre-migration
349/// behaviour (every record replays) is what the migration
350/// preserves.
351///
352/// Returns the count of records successfully applied. Same
353/// torn-tail semantics as `replay_wal_into_engine`.
354fn replay_wal_filtered(
355    wal_bytes: &[u8],
356    engine: &mut Engine,
357    floor_lsn: u64,
358) -> Result<usize, String> {
359    let records = parse_wal_records(wal_bytes)?;
360    let mut applied = 0usize;
361    for r in &records {
362        // Skip markers + non-SQL records.
363        if r.type_byte == WAL_V3_TYPE_DURABILITY_CHECKPOINT
364            || r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER
365        {
366            continue;
367        }
368        // v4 SQL records carry an LSN. Apply iff strictly above
369        // the snapshot floor.
370        if r.type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL {
371            if let Some(lsn) = r.commit_lsn {
372                if lsn <= floor_lsn {
373                    continue;
374                }
375            }
376        }
377        // v3 records (type 0x01, no LSN) always apply — the
378        // legacy migration path is the only place they appear,
379        // and floor_lsn=0 there.
380        let sql = match std::str::from_utf8(r.sql) {
381            Ok(s) => s,
382            Err(e) => return Err(format!("non-UTF-8 SQL at offset {}: {e}", r.offset)),
383        };
384        engine.execute(sql).map_err(|e| {
385            format!(
386                "WAL replay: apply {sql:?} at offset {} rejected: {e:?}",
387                r.offset
388            )
389        })?;
390        applied += 1;
391    }
392    Ok(applied)
393}
394
/// v7.19 — WAL chunk filename: `{unix_us:016x}_{leading_lsn:016x}.wal`.
///
/// Both fields are zero-padded 16-digit lowercase hex, so plain
/// lexicographic sorting matches numeric order. The timestamp comes
/// first, which keeps a directory listing chronological.
fn chunk_filename(unix_us: i64, leading_lsn: u64) -> String {
    // Pre-1970 timestamps shouldn't occur (we sit post-epoch). Pin them
    // to 0 so the padded representation stays sortable.
    let clamped_us: u64 = if unix_us < 0 { 0 } else { unix_us as u64 };
    format!("{:016x}_{:016x}.wal", clamped_us, leading_lsn)
}
406
407/// v7.19 — filename used for the legacy single-file WAL when
408/// `open_path` migrates a v7.18-layout database into the new
409/// chunk directory. Lexicographically smallest possible value
410/// so subsequent chunks sort after it.
411fn legacy_chunk_filename() -> String {
412    chunk_filename(0, 0)
413}
414
/// v7.19 — every `.wal` file directly inside `wal_dir`, sorted
/// lexicographically. Thanks to the zero-padded chunk filename format,
/// that order is also chunk-creation order.
///
/// A missing directory yields an empty list. Any other I/O error,
/// including one while reading an individual entry, is returned.
fn sorted_wal_chunks(wal_dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let entries = match std::fs::read_dir(wal_dir) {
        Ok(entries) => entries,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(err) => return Err(err),
    };
    let mut chunks = entries
        .map(|entry| entry.map(|e| e.path()))
        .filter(|entry| match entry {
            Ok(path) => path.extension().and_then(|ext| ext.to_str()) == Some("wal"),
            // Keep errors so `collect` surfaces the first one.
            Err(_) => true,
        })
        .collect::<std::io::Result<Vec<PathBuf>>>()?;
    chunks.sort();
    Ok(chunks)
}
435
436/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
437///
438/// ```text
439/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
440/// [u32 LE crc32 over (type_byte || payload)]
441/// [u8  type = 0x11]
442/// payload:
443///   [u64 LE checkpoint_lsn]
444///   [i64 LE checkpoint_unix_us  (WAL_V4_NO_CLOCK if no clock)]
445///   [u16 LE snapshot_path_len]
446///   [snapshot_path_bytes]
447/// ```
448///
449/// `payload_len` covers only the payload — keeping the framing
450/// uniform across v3 / v4 record types so torn-write detection in
451/// `replay_wal_into_engine` stays trivial.
452fn encode_v4_checkpoint_marker(
453    checkpoint_lsn: u64,
454    checkpoint_unix_us: i64,
455    snapshot_path: &Path,
456) -> Vec<u8> {
457    let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
458    let snap_payload = snapshot_bytes.as_bytes();
459    let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
460    let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
461    payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
462    payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
463    payload.extend_from_slice(&snap_len_u16.to_le_bytes());
464    payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
465    let mut crc_buf = Vec::with_capacity(1 + payload.len());
466    crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
467    crc_buf.extend_from_slice(&payload);
468    let crc = spg_crypto::crc32::crc32(&crc_buf);
469    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
470    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
471    out.extend_from_slice(&header);
472    out.extend_from_slice(&crc.to_le_bytes());
473    out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
474    out.extend_from_slice(&payload);
475    out
476}
477
478/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
479///
480/// ```text
481/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
482/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
483/// [u8  type = 0x10]
484/// [u64 LE commit_lsn]
485/// [i64 LE commit_unix_us  (= WAL_V4_NO_CLOCK when no ClockFn)]
486/// [sql bytes]
487/// ```
488///
489/// `sql_len` field stays the SQL byte count — same shape as v3 — so
490/// replay-buffer torn-write detection compares against
491/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
492/// readable by the same loop with their original 9-byte header
493/// arithmetic.
494fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
495    let payload = sql.as_bytes();
496    let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
497    crc_buf.push(WAL_V4_TYPE_AUTO_COMMIT_SQL);
498    crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
499    crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
500    crc_buf.extend_from_slice(payload);
501    let crc = spg_crypto::crc32::crc32(&crc_buf);
502    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
503    let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
504    out.extend_from_slice(&header);
505    out.extend_from_slice(&crc.to_le_bytes());
506    out.push(WAL_V4_TYPE_AUTO_COMMIT_SQL);
507    out.extend_from_slice(&commit_lsn.to_le_bytes());
508    out.extend_from_slice(&commit_unix_us.to_le_bytes());
509    out.extend_from_slice(payload);
510    out
511}
512
513/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
514/// Returns the count of records successfully applied. A truncated
515/// trailing record (mid-write torn) is dropped silently — the
516/// same recovery story `spg-server`'s boot path uses.
517fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
518    let mut applied = 0usize;
519    let mut cur = 0usize;
520    while cur < wal_bytes.len() {
521        if wal_bytes.len() - cur < 4 {
522            // Trailing partial header — torn write, drop and stop.
523            break;
524        }
525        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
526        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
527        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
528        let len_mask = if is_v3 {
529            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
530        } else {
531            !WAL_V2_SENTINEL
532        };
533        let rec_len = (raw_len & len_mask) as usize;
534        let header_len = if is_v3 {
535            9
536        } else if is_v2 {
537            8
538        } else {
539            4
540        };
541        if wal_bytes.len() - cur < header_len + rec_len {
542            // Torn record at the tail — drop, stop.
543            break;
544        }
545        if is_v3 {
546            let type_byte = wal_bytes[cur + 8];
547            match type_byte {
548                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
549                WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
550                    // durability_checkpoint marker — skip, no SQL.
551                    cur += header_len + rec_len;
552                    continue;
553                }
554                WAL_V4_TYPE_CHECKPOINT_MARKER => {
555                    // v7.18 PITR — checkpoint anchor, skip on replay
556                    // (engine state past this point reflects the
557                    // matching snapshot already loaded by the caller).
558                    cur += header_len + rec_len;
559                    continue;
560                }
561                WAL_V4_TYPE_AUTO_COMMIT_SQL => {
562                    // v7.18 PITR — v4 record carries 16 bytes of
563                    // (commit_lsn, commit_unix_us) between the type
564                    // byte and the SQL payload. Replay reads them but
565                    // does not enforce them — the engine doesn't
566                    // surface LSN/clock here. Restore tooling
567                    // (spgctl) parses them via parse_wal_record below.
568                    let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
569                    if wal_bytes.len() - cur < v4_total {
570                        // Torn v4 record at the tail — drop, stop.
571                        break;
572                    }
573                    let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
574                    let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
575                    let sql = std::str::from_utf8(sql_bytes)
576                        .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
577                    engine.execute(sql).map_err(|e| {
578                        format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}")
579                    })?;
580                    applied += 1;
581                    cur += v4_total;
582                    continue;
583                }
584                other => {
585                    return Err(format!(
586                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
587                    ));
588                }
589            }
590        }
591        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
592        let sql = std::str::from_utf8(sql_bytes)
593            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
594        engine
595            .execute(sql)
596            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
597        applied += 1;
598        cur += header_len + rec_len;
599    }
600    Ok(applied)
601}
602
/// v7.18 PITR — one decoded WAL record, as yielded by
/// [`parse_wal_records`].
///
/// Surfaced for restore / verify tooling (`spgctl restore --to
/// <timestamp>`, `spgctl verify`), which needs the LSN / timestamp that
/// the replay loop does not expose. Offsets are byte positions inside
/// the WAL buffer that was parsed.
#[derive(Debug, Clone)]
pub struct WalRecord<'a> {
    /// Byte offset in the WAL buffer where this record's envelope starts.
    pub offset: usize,
    /// Record type byte:
    ///
    /// - `0x01` — auto-commit SQL without LSN (v3). Legacy v1/v2
    ///   records, which have no type byte, are reported as `0x01` too.
    /// - `0x02` — durability checkpoint marker.
    /// - `0x10` — v4 auto-commit SQL with LSN and timestamp.
    /// - `0x11` — v4 checkpoint marker (base-snapshot anchor).
    pub type_byte: u8,
    /// `Some(lsn)` for v4 records (`0x10` and `0x11`); `None` for v3,
    /// legacy and durability-marker records.
    pub commit_lsn: Option<u64>,
    /// `Some(unix_us)` for v4 records whose timestamp slot holds a
    /// clock reading. `None` for v3 / legacy / durability-marker records
    /// and for v4 records written with the `WAL_V4_NO_CLOCK` sentinel
    /// (no ClockFn at commit time).
    pub commit_unix_us: Option<i64>,
    /// Payload bytes borrowed from the WAL buffer. The meaning depends
    /// on `type_byte`:
    /// - auto-commit records: the SQL text;
    /// - checkpoint markers (`0x11`): the snapshot path;
    /// - durability markers (`0x02`): empty.
    pub sql: &'a [u8],
}
623
624/// v7.18 PITR — iterate over `wal_bytes` yielding one `WalRecord`
625/// per intact record. Torn-tail records terminate iteration
626/// silently (same recovery story as `replay_wal_into_engine`).
627/// Unknown type bytes inside a v3 envelope return `Err` so the
628/// caller knows the WAL was written by a newer SPG.
629pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
630    let mut out = Vec::new();
631    let mut cur = 0usize;
632    while cur < wal_bytes.len() {
633        if wal_bytes.len() - cur < 4 {
634            break;
635        }
636        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
637        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
638        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
639        let len_mask = if is_v3 {
640            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
641        } else {
642            !WAL_V2_SENTINEL
643        };
644        let rec_len = (raw_len & len_mask) as usize;
645        let header_len = if is_v3 {
646            9
647        } else if is_v2 {
648            8
649        } else {
650            4
651        };
652        if wal_bytes.len() - cur < header_len + rec_len {
653            break;
654        }
655        if !is_v3 {
656            // v1 / v2 records carry no type byte; treat as legacy
657            // auto-commit SQL with no LSN/time.
658            let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
659            out.push(WalRecord {
660                offset: cur,
661                type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
662                commit_lsn: None,
663                commit_unix_us: None,
664                sql,
665            });
666            cur += header_len + rec_len;
667            continue;
668        }
669        let type_byte = wal_bytes[cur + 8];
670        match type_byte {
671            WAL_V3_TYPE_AUTO_COMMIT_SQL => {
672                let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
673                out.push(WalRecord {
674                    offset: cur,
675                    type_byte,
676                    commit_lsn: None,
677                    commit_unix_us: None,
678                    sql,
679                });
680                cur += header_len + rec_len;
681            }
682            WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
683                out.push(WalRecord {
684                    offset: cur,
685                    type_byte,
686                    commit_lsn: None,
687                    commit_unix_us: None,
688                    sql: &[],
689                });
690                cur += header_len + rec_len;
691            }
692            WAL_V4_TYPE_CHECKPOINT_MARKER => {
693                // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
694                // We surface lsn + ts on the WalRecord; the path lives
695                // in `sql` since the type byte already disambiguates
696                // record meaning and adding a dedicated field would
697                // bloat the iterator return type for every variant.
698                if rec_len < 18 {
699                    return Err(format!(
700                        "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
701                    ));
702                }
703                let lsn = u64::from_le_bytes(
704                    wal_bytes[cur + header_len..cur + header_len + 8]
705                        .try_into()
706                        .unwrap(),
707                );
708                let ts_raw = i64::from_le_bytes(
709                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
710                        .try_into()
711                        .unwrap(),
712                );
713                let path_len = u16::from_le_bytes(
714                    wal_bytes[cur + header_len + 16..cur + header_len + 18]
715                        .try_into()
716                        .unwrap(),
717                ) as usize;
718                if rec_len < 18 + path_len {
719                    return Err(format!(
720                        "WAL parse: checkpoint marker at offset {cur} truncated path"
721                    ));
722                }
723                let path_start = cur + header_len + 18;
724                let path_bytes = &wal_bytes[path_start..path_start + path_len];
725                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
726                    None
727                } else {
728                    Some(ts_raw)
729                };
730                out.push(WalRecord {
731                    offset: cur,
732                    type_byte,
733                    commit_lsn: Some(lsn),
734                    commit_unix_us,
735                    sql: path_bytes,
736                });
737                cur += header_len + rec_len;
738            }
739            WAL_V4_TYPE_AUTO_COMMIT_SQL => {
740                let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
741                if wal_bytes.len() - cur < v4_total {
742                    break;
743                }
744                let lsn = u64::from_le_bytes(
745                    wal_bytes[cur + header_len..cur + header_len + 8]
746                        .try_into()
747                        .unwrap(),
748                );
749                let ts_raw = i64::from_le_bytes(
750                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
751                        .try_into()
752                        .unwrap(),
753                );
754                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
755                    None
756                } else {
757                    Some(ts_raw)
758                };
759                let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
760                let sql = &wal_bytes[sql_start..sql_start + rec_len];
761                out.push(WalRecord {
762                    offset: cur,
763                    type_byte,
764                    commit_lsn: Some(lsn),
765                    commit_unix_us,
766                    sql,
767                });
768                cur += v4_total;
769            }
770            other => {
771                return Err(format!(
772                    "WAL parse: unknown type byte {other:#04x} at offset {cur}"
773                ));
774            }
775        }
776    }
777    Ok(out)
778}
779
/// v7.1 — predicate for "should the next `execute()` skip the WAL?"
///
/// Looks only at the statement's leading keyword. Matching is ASCII
/// case-insensitive, and the keyword is taken after leading whitespace,
/// ending at the first whitespace, `;` or `(`. Returns `true` for
/// SELECT / SHOW / EXPLAIN / BEGIN / COMMIT / ROLLBACK and the
/// SPG-specific verbs CHECKPOINT, COMPACT and WAIT. CHECKPOINT and
/// COMPACT don't go through the auto-commit record path on the server.
///
/// Conservative: anything not explicitly known to be read-only falls
/// through to `false` ("write a WAL record"). That includes statements
/// with a leading comment and the empty string.
///
/// `WITH` is deliberately *not* on the list. A CTE prefix can front a
/// data-modifying statement (`WITH … INSERT …`), and classifying that as
/// read-only would silently drop a committed write from the WAL. A plain
/// `WITH … SELECT` still never reaches the WAL, because `execute()` only
/// logs results reporting `modified_catalog: true`.
fn sql_is_read_only(sql: &str) -> bool {
    const READ_ONLY_VERBS: [&str; 9] = [
        "select",
        "show",
        "explain",
        "begin",
        "commit",
        "rollback",
        "checkpoint",
        "compact",
        "wait",
    ];
    let keyword = sql
        .trim_start()
        .split(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .next()
        .unwrap_or("");
    // `eq_ignore_ascii_case` avoids allocating a lowercased copy.
    READ_ONLY_VERBS
        .iter()
        .any(|verb| keyword.eq_ignore_ascii_case(verb))
}
806
/// Embedded SPG database handle. Owns an `Engine` and provides
/// ergonomic wrappers around `execute` and `query`.
///
/// There are two flavours, distinguished by the `persistence` field:
///
/// - In-memory (`open_in_memory`, `restore`): `persistence` is `None`.
///   Nothing is written to disk, and all state goes away with the handle.
/// - On-disk (`open_path`): `persistence` is `Some`. Mutating `execute()`
///   calls are appended to the WAL and fsynced, and, per the
///   `persistence` field docs, `Drop` writes a final catalog snapshot.
#[derive(Debug)]
pub struct Database {
    engine: Engine,
    /// v7.1 — persistence sidecar. When `Some(p)`, every
    /// `execute(sql)` that mutates state appends a v4
    /// `auto_commit_sql` WAL record + fsyncs before the call
    /// returns; `Drop` writes a final catalog snapshot to
    /// `<db_path>` so the next session boots from a clean
    /// snapshot + an empty WAL. `None` = in-memory only (the
    /// v6.10.3 shape).
    persistence: Option<PersistenceCtx>,
    /// v7.18 PITR — monotonic per-database commit LSN. Increments
    /// before each successful WAL append; bootstrapped at
    /// open_path from `max(parse_wal_records → commit_lsn)` so
    /// reopen never reuses an LSN. In-memory databases start at
    /// 0 and never advance (no WAL = no LSN-meaningful records).
    commit_lsn: AtomicU64,
}
829
/// On-disk state owned by a [`Database`] opened via `open_path`: the
/// snapshot path, the WAL chunk directory and active chunk handle, the
/// cold-segment bookkeeping, the cross-process lock, and the optional
/// PITR retention sweeper.
#[derive(Debug)]
#[allow(dead_code)] // `wal_dir`/`current_chunk_path` are read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Catalog snapshot file. `checkpoint()` replaces it atomically
    /// by writing `<db_path>.tmp` and renaming it over this path.
    db_path: PathBuf,
    /// v7.19 — WAL chunk directory at `<db_path>.wal/`.
    /// Replaces the v7.18 single-file `<db_path>.wal` layout.
    /// Each chunk file inside is named
    /// `<unix_us>_<leading_lsn>.wal` (zero-padded to 16 digits,
    /// so default lexicographic sort orders chunks by creation
    /// timestamp first, then leading LSN).
    /// NOTE(review): that equals LSN order only while the wall
    /// clock never steps backwards — confirm this is acceptable.
    wal_dir: PathBuf,
    /// Path of the currently-open chunk file inside `wal_dir`.
    /// Rotated at checkpoint and whenever `wal_len` crosses
    /// `checkpoint_threshold_bytes`.
    current_chunk_path: PathBuf,
    /// v7.19 P3 — retention sweeper handle. `Some` when
    /// `SPG_PITR_RETENTION_HOURS > 0` at open_path time; `None`
    /// when retention is disabled (the default; v7.18 behaviour
    /// preserved). The thread polls `wal_dir` every
    /// `SPG_PITR_RETENTION_CHECK_SEC` seconds, archives via
    /// `SPG_PITR_ARCHIVE_CMD` if set, then deletes chunks older
    /// than the retention window. Signalled to exit via
    /// `retention_shutdown` on Drop.
    retention_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-pitr-retention` sweeper thread.
    /// `open_path` sets it to `Some` exactly when it sets
    /// `retention_shutdown` to `Some`.
    retention_thread: Option<std::thread::JoinHandle<()>>,
    /// Append-only handle on `current_chunk_path`.
    wal: File,
    /// Cached length of the current chunk so `execute()` skips
    /// a `stat()` per write. Refreshed on append + reset to 0
    /// on rotation.
    wal_len: u64,
    /// Auto-checkpoint trigger. After a mutating `execute()` appends
    /// its record, a checkpoint runs once `wal_len >= this`.
    /// Initialised from `default_checkpoint_threshold_bytes()` and
    /// overridable via `set_checkpoint_threshold_bytes` (clamped to
    /// at least 1).
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — `<db_path>.spg/segments/` directory. Cold-tier
    /// segments produced by `freeze_oldest_to_cold` / compaction
    /// are persisted here as `seg_<id>.spg` files; the manifest
    /// at `<db_path>.spg/manifest.v10` records every active
    /// segment + its CRC32 so the next boot can verify + reload.
    cold_segments_dir: PathBuf,
    /// Segment id → on-disk segment file. Filled at boot from a
    /// CRC-verified manifest and by `freeze_oldest_to_cold`.
    /// `checkpoint()` writes these entries into the manifest.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
    /// via `fs::create_dir` on `<db_path>.lock` at open_path
    /// entry; released on Drop by `fs::remove_dir`. atomic on
    /// every supported platform. A second process opening the
    /// same path while the first is still alive hits the
    /// create_dir failure and returns
    /// `EngineError::Unsupported("database is locked by another
    /// process: …")`. Stale locks (process crashed mid-session)
    /// must be cleared via `Database::force_unlock(path)` —
    /// SPG can't safely fingerprint who owned a stale directory
    /// without a libc dep, which would violate spg-embedded's
    /// zero-deps charter.
    lock_path: PathBuf,
}
882
883impl Database {
884    /// Open a fresh in-memory database. No WAL, no catalog
885    /// snapshot on disk — perfect for tests + short-lived
886    /// CLI tools.
887    #[must_use]
888    pub fn open_in_memory() -> Self {
889        Self {
890            engine: Engine::new().with_clock(wall_clock_micros),
891            persistence: None,
892            commit_lsn: AtomicU64::new(0),
893        }
894    }
895
896    /// v7.1 — Open or create a persistent database backed by
897    /// the file at `db_path`. The WAL lives at `db_path` +
898    /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
899    /// path:
900    ///
901    /// 1. If `db_path` exists, restore the catalog snapshot.
902    /// 2. If the WAL exists, replay every record into the
903    ///    restored engine — the same recovery story
904    ///    `spg-server` uses.
905    /// 3. Open the WAL in append+sync mode so subsequent
906    ///    `execute()` writes durably commit (one fsync per
907    ///    mutation).
908    ///
909    /// `Drop` writes a final catalog snapshot + truncates the
910    /// WAL — operators that need a sync barrier at a specific
911    /// point use `checkpoint()` explicitly.
912    pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
913        let db_path = db_path.as_ref().to_path_buf();
914        // v7.19 — WAL is a directory of chunk files. Legacy
915        // single-file path stays variable-named `wal_path` for
916        // the backward-compat migration block below.
917        let wal_path = {
918            let mut p = db_path.clone();
919            let name = p
920                .file_name()
921                .map(|n| {
922                    let mut s = n.to_os_string();
923                    s.push(".wal");
924                    s
925                })
926                .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
927            p.set_file_name(name);
928            p
929        };
930        let wal_dir = wal_path.clone();
931        if let Some(parent) = db_path.parent()
932            && !parent.as_os_str().is_empty()
933        {
934            std::fs::create_dir_all(parent).map_err(io_err)?;
935        }
936        // v7.17.0 Phase 6.2 — acquire cross-process exclusion
937        // lock before touching any catalog / WAL bytes. atomic
938        // mkdir on every supported platform; a second process
939        // opening the same path while the first is still alive
940        // hits the create_dir failure and gets a clear error.
941        let lock_path = {
942            let mut p = db_path.clone();
943            let name = p
944                .file_name()
945                .map(|n| {
946                    let mut s = n.to_os_string();
947                    s.push(".lock");
948                    s
949                })
950                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
951            p.set_file_name(name);
952            p
953        };
954        std::fs::create_dir(&lock_path).map_err(|e| {
955            if e.kind() == std::io::ErrorKind::AlreadyExists {
956                EngineError::Unsupported(format!(
957                    "database is locked by another process (or stale lock): {}; \
958                     remove the directory manually after confirming no other \
959                     process holds it, or call Database::force_unlock()",
960                    lock_path.display()
961                ))
962            } else {
963                io_err(e)
964            }
965        })?;
966        let mut engine = if db_path.exists() {
967            let bytes = std::fs::read(&db_path).map_err(io_err)?;
968            let engine = Engine::restore_envelope(&bytes).map_err(|e| {
969                EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
970                    "restore from {}: {e}",
971                    db_path.display()
972                )))
973            })?;
974            engine.with_clock(wall_clock_micros)
975        } else {
976            Engine::new().with_clock(wall_clock_micros)
977        };
978        // v7.1.4 — manifest-driven cold-segment reload. The
979        // manifest sidecar pairs the catalog snapshot CRC with a
980        // list of `(segment_id, path, crc32)` triples; verify
981        // before loading so a torn or stale manifest doesn't
982        // surface phantom data.
983        let cold_segments_dir = {
984            let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
985            let stem = db_path
986                .file_stem()
987                .unwrap_or_else(|| std::ffi::OsStr::new("db"))
988                .to_string_lossy()
989                .into_owned();
990            parent.join(format!("{stem}.spg")).join("segments")
991        };
992        let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
993        let manifest_pth = spg_manifest_path(&db_path);
994        if manifest_pth.exists() && db_path.exists() {
995            let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
996            if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
997                let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
998                let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
999                if snap_crc == m.catalog_crc32 {
1000                    for entry in &m.cold_segments {
1001                        if let Ok(seg_bytes) = std::fs::read(&entry.path) {
1002                            let computed = spg_crypto::crc32::crc32(&seg_bytes);
1003                            if computed != entry.crc32 {
1004                                eprintln!(
1005                                    "spg-embedded: manifest skip segment {}: CRC mismatch",
1006                                    entry.segment_id
1007                                );
1008                                continue;
1009                            }
1010                            if engine.catalog().cold_segment(entry.segment_id).is_some() {
1011                                // Already loaded via Catalog::clone path (shouldn't happen
1012                                // since Engine::new + restore_envelope don't populate cold).
1013                                continue;
1014                            }
1015                            let mut new_cat = engine.catalog().clone();
1016                            if let Err(e) =
1017                                new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
1018                            {
1019                                eprintln!(
1020                                    "spg-embedded: manifest load segment {} failed: {e}",
1021                                    entry.segment_id
1022                                );
1023                                continue;
1024                            }
1025                            engine.replace_catalog(new_cat);
1026                            cold_segment_paths.insert(entry.segment_id, entry.path.clone());
1027                        } else {
1028                            eprintln!(
1029                                "spg-embedded: manifest skip segment {}: file unreadable",
1030                                entry.segment_id
1031                            );
1032                        }
1033                    }
1034                }
1035            }
1036        }
1037        // v7.19 — chunked WAL on-disk layout.
1038        //
1039        // Three cases handled here:
1040        //
1041        // 1. wal_dir exists as a DIRECTORY → scan its
1042        //    `<unix_us>_<leading_lsn>.wal` chunks (sorted
1043        //    lexicographically = chunk-creation order), replay
1044        //    them in sequence, advance the LSN watermark to the
1045        //    max commit_lsn seen.
1046        //
1047        // 2. wal_path exists as a FILE → legacy v7.18 layout.
1048        //    Migrate it: create `wal_dir/`, move the single file
1049        //    inside as `0000000000000000_0000000000000000.wal`,
1050        //    then fall through to case 1's replay loop.
1051        //
1052        // 3. Neither exists → fresh database; create wal_dir.
1053        let mut initial_lsn: u64 = 0;
1054        if wal_path.is_file() {
1055            // Case 2: legacy single-file WAL migration.
1056            let legacy_bytes = std::fs::read(&wal_path).map_err(io_err)?;
1057            std::fs::remove_file(&wal_path).map_err(io_err)?;
1058            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1059            if !legacy_bytes.is_empty() {
1060                let migrated = wal_dir.join(legacy_chunk_filename());
1061                std::fs::write(&migrated, &legacy_bytes).map_err(io_err)?;
1062            }
1063        } else if !wal_dir.exists() {
1064            // Case 3: fresh database.
1065            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1066        }
1067        // Cases 1 + 2 share replay logic now that wal_dir is
1068        // guaranteed to exist (and may be empty for case 3).
1069        //
1070        // Two-pass replay so we don't double-apply records the
1071        // snapshot already reflects:
1072        //
1073        // 1. Find the highest commit_lsn carried by a
1074        //    checkpoint_marker across all chunks. That LSN is the
1075        //    snapshot's high-water mark — anything ≤ it is
1076        //    already in `<db_path>` and replaying it would
1077        //    DuplicateTable / double-insert.
1078        // 2. Replay only records strictly above that LSN.
1079        //
1080        // Case 2 migration (legacy single-file WAL) lands here
1081        // too: the migrated chunk has no marker so the LSN floor
1082        // is 0 and every record applies — exactly the v7.18
1083        // behaviour the migration is supposed to preserve.
1084        let chunk_paths = sorted_wal_chunks(&wal_dir).map_err(io_err)?;
1085        let mut snapshot_lsn: u64 = 0;
1086        for chunk in &chunk_paths {
1087            let bytes = std::fs::read(chunk).map_err(io_err)?;
1088            if let Ok(records) = parse_wal_records(&bytes) {
1089                for r in &records {
1090                    if r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER {
1091                        if let Some(l) = r.commit_lsn {
1092                            if l > snapshot_lsn {
1093                                snapshot_lsn = l;
1094                            }
1095                        }
1096                    }
1097                }
1098            }
1099        }
1100        for chunk in &chunk_paths {
1101            let bytes = std::fs::read(chunk).map_err(io_err)?;
1102            if bytes.is_empty() {
1103                continue;
1104            }
1105            replay_wal_filtered(&bytes, &mut engine, snapshot_lsn)
1106                .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
1107            if let Ok(records) = parse_wal_records(&bytes) {
1108                if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
1109                    if max > initial_lsn {
1110                        initial_lsn = max;
1111                    }
1112                }
1113            }
1114        }
1115        // Open the "current" chunk — either the last existing
1116        // chunk file (so subsequent appends extend it until the
1117        // size threshold rotates) or a fresh first chunk.
1118        let now_us = wall_clock_micros();
1119        let current_chunk_path = if let Some(last) = chunk_paths.last() {
1120            last.clone()
1121        } else {
1122            wal_dir.join(chunk_filename(now_us, initial_lsn + 1))
1123        };
1124        let wal = OpenOptions::new()
1125            .create(true)
1126            .append(true)
1127            .read(true)
1128            .open(&current_chunk_path)
1129            .map_err(io_err)?;
1130        let wal_len = wal.metadata().map_err(io_err)?.len();
1131        // v7.19 P3 — spawn retention sweep thread when the
1132        // operator opted in via SPG_PITR_RETENTION_HOURS > 0.
1133        // Otherwise stay on the v7.18 behaviour (chunks accumulate
1134        // until something else — backup-pitr archival, manual
1135        // cleanup — moves them).
1136        let retention_hours = pitr_retention_hours();
1137        let (retention_shutdown, retention_thread) = if retention_hours > 0 {
1138            let shutdown = Arc::new(AtomicBool::new(false));
1139            let shutdown_clone = Arc::clone(&shutdown);
1140            let wal_dir_clone = wal_dir.clone();
1141            let check_interval = std::time::Duration::from_secs(pitr_retention_check_sec());
1142            let archive_cmd = pitr_archive_cmd();
1143            let handle = std::thread::Builder::new()
1144                .name("spg-pitr-retention".into())
1145                .spawn(move || {
1146                    retention_sweep_loop(
1147                        wal_dir_clone,
1148                        retention_hours,
1149                        check_interval,
1150                        archive_cmd,
1151                        shutdown_clone,
1152                    );
1153                })
1154                .map_err(io_err)?;
1155            (Some(shutdown), Some(handle))
1156        } else {
1157            (None, None)
1158        };
1159        Ok(Self {
1160            engine,
1161            commit_lsn: AtomicU64::new(initial_lsn),
1162            persistence: Some(PersistenceCtx {
1163                db_path,
1164                wal_dir,
1165                current_chunk_path,
1166                wal,
1167                wal_len,
1168                checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
1169                cold_segments_dir,
1170                cold_segment_paths,
1171                lock_path,
1172                retention_shutdown,
1173                retention_thread,
1174            }),
1175        })
1176    }
1177
1178    /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
1179    /// hot tier into a brand-new cold-tier segment + persist
1180    /// it to disk. Same semantics as `spg-server`'s freezer
1181    /// thread; embedded just runs the freeze synchronously on
1182    /// the caller's thread. Persistence + manifest update
1183    /// happen as part of the next `checkpoint()` (or on Drop).
1184    pub fn freeze_oldest_to_cold(
1185        &mut self,
1186        table_name: &str,
1187        index_name: &str,
1188        max_rows: usize,
1189    ) -> Result<spg_storage::FreezeReport, EngineError> {
1190        let report = self
1191            .engine
1192            .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
1193        if let Some(p) = &mut self.persistence {
1194            std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
1195            let final_path = p
1196                .cold_segments_dir
1197                .join(format!("seg_{}.spg", report.segment_id));
1198            let tmp_path = p
1199                .cold_segments_dir
1200                .join(format!("seg_{}.spg.tmp", report.segment_id));
1201            std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
1202            std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
1203            p.cold_segment_paths.insert(report.segment_id, final_path);
1204        }
1205        Ok(report)
1206    }
1207
1208    /// v7.1 — override the auto-checkpoint WAL-size ceiling for
1209    /// this `Database` instance. Default is
1210    /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
1211    /// setter wins. No-op when the database is in-memory.
1212    pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
1213        if let Some(p) = &mut self.persistence {
1214            p.checkpoint_threshold_bytes = bytes.max(1);
1215        }
1216    }
1217
1218    /// v7.1 — flush a fresh catalog snapshot to `db_path` and
1219    /// truncate the WAL. Idempotent; cheap when nothing has
1220    /// happened since the last checkpoint. No-op when the
1221    /// database is in-memory (no `db_path` configured).
1222    ///
1223    /// Called automatically when:
1224    /// - the WAL grows past
1225    ///   `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
1226    ///   end of an `execute()`, and
1227    /// - `Drop` runs (best-effort; checkpoint failure on drop is
1228    ///   logged to stderr).
1229    pub fn checkpoint(&mut self) -> Result<(), EngineError> {
1230        let snapshot = self.engine.snapshot();
1231        let Some(p) = &mut self.persistence else {
1232            return Ok(());
1233        };
1234        // Snapshot first (atomic via tmp+rename), then WAL
1235        // truncate. Same order as `spg-server`'s CHECKPOINT —
1236        // a crash between the two leaves the WAL holding
1237        // already-snapshotted ops, which replay cleanly on the
1238        // next boot (idempotent for SPG's standard DDL/DML
1239        // mutations).
1240        let tmp = {
1241            let mut t = p.db_path.clone();
1242            let mut name = t
1243                .file_name()
1244                .map(std::ffi::OsStr::to_os_string)
1245                .unwrap_or_default();
1246            name.push(".tmp");
1247            t.set_file_name(name);
1248            t
1249        };
1250        std::fs::write(&tmp, &snapshot).map_err(io_err)?;
1251        std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
1252        // v7.1.4 — refresh the manifest so the next boot can
1253        // reload cold segments alongside the snapshot. Bytes
1254        // come from the freshly-written snapshot file (= the
1255        // canonical CRC source).
1256        if !p.cold_segment_paths.is_empty() {
1257            let snap_crc = spg_crypto::crc32::crc32(&snapshot);
1258            let entries: Vec<ColdSegmentEntry> = p
1259                .cold_segment_paths
1260                .iter()
1261                .filter_map(|(&segment_id, path)| {
1262                    let bytes = std::fs::read(path).ok()?;
1263                    Some(ColdSegmentEntry {
1264                        segment_id,
1265                        path: path.clone(),
1266                        crc32: spg_crypto::crc32::crc32(&bytes),
1267                    })
1268                })
1269                .collect();
1270            let manifest = CatalogManifest {
1271                catalog_crc32: snap_crc,
1272                cold_segments: entries,
1273                wal_baseline_offset: 0,
1274            };
1275            let m_bytes = manifest.serialize();
1276            let m_path = spg_manifest_path(&p.db_path);
1277            if let Some(dir) = m_path.parent() {
1278                std::fs::create_dir_all(dir).map_err(io_err)?;
1279            }
1280            let m_tmp = {
1281                let mut t = m_path.clone();
1282                let mut name = t
1283                    .file_name()
1284                    .map(std::ffi::OsStr::to_os_string)
1285                    .unwrap_or_default();
1286                name.push(".tmp");
1287                t.set_file_name(name);
1288                t
1289            };
1290            std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
1291            std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
1292        }
1293        // v7.19 — append a checkpoint marker to the current chunk
1294        // (anchors restore-to-time backups), then rotate to a
1295        // fresh chunk file. Old chunks stay on disk and become
1296        // input to the retention thread (P3) + spgctl backup-pitr
1297        // (P6). The single-file `set_len(0)` truncate the v7.18
1298        // path used is gone — that path silently discarded WAL
1299        // history between checkpoint and the operator's next cron
1300        // run, which is exactly what PITR was meant to fix.
1301        let marker_lsn = self.commit_lsn.load(Ordering::SeqCst);
1302        let marker_ts = wall_clock_micros();
1303        let marker = encode_v4_checkpoint_marker(marker_lsn, marker_ts, &p.db_path);
1304        p.wal.write_all(&marker).map_err(io_err)?;
1305        p.wal.sync_data().map_err(io_err)?;
1306        // Close the active chunk by replacing the handle. The
1307        // OpenOptions append+create combo creates the new chunk
1308        // file fresh; `wal_len` resets to 0 ready for the next
1309        // execute()'s record.
1310        let new_chunk_path = p.wal_dir.join(chunk_filename(marker_ts, marker_lsn + 1));
1311        let new_handle = OpenOptions::new()
1312            .create(true)
1313            .append(true)
1314            .read(true)
1315            .open(&new_chunk_path)
1316            .map_err(io_err)?;
1317        p.current_chunk_path = new_chunk_path;
1318        p.wal = new_handle;
1319        p.wal_len = 0;
1320        Ok(())
1321    }
1322
1323    /// Restore a database from a previously-captured catalog
1324    /// snapshot. Pairs with `Database::snapshot()` for
1325    /// round-tripping in-memory state without going through
1326    /// the `spg-server` WAL.
1327    pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
1328        let engine = Engine::restore_envelope(snapshot).map_err(|e| {
1329            EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
1330        })?;
1331        Ok(Self {
1332            engine,
1333            persistence: None,
1334            commit_lsn: AtomicU64::new(0),
1335        })
1336    }
1337
1338    /// Take a catalog snapshot suitable for `Database::restore`.
1339    /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
1340    /// + version + payload); round-trips through every released
1341    /// SPG version per the STABILITY contract.
1342    #[must_use]
1343    pub fn snapshot(&self) -> Vec<u8> {
1344        self.engine.snapshot()
1345    }
1346
1347    /// Execute a SQL statement and return the engine's
1348    /// `QueryResult` verbatim. Pass-through for callers that
1349    /// want to keep PG-flavoured column/row metadata.
1350    ///
1351    /// v7.1 — when the database was opened via `open_path`,
1352    /// successful mutations are appended to the WAL + fsynced
1353    /// before the call returns. A subsequent process crash will
1354    /// recover state up to the last successful return from
1355    /// `execute()`. Read-only statements (SELECT / SHOW /
1356    /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
1357    /// etc.) skip the WAL entirely.
1358    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
1359        let result = self.engine.execute(sql)?;
1360        if self.persistence.is_some()
1361            && !sql_is_read_only(sql)
1362            && matches!(
1363                &result,
1364                QueryResult::CommandOk {
1365                    modified_catalog: true,
1366                    ..
1367                }
1368            )
1369        {
1370            // v7.18 PITR — write v4 records that carry the commit
1371            // LSN + wall-clock micros so restore tooling can
1372            // target a point in time. Replay path still accepts
1373            // v3 records emitted by older spg-embedded versions.
1374            // Crash window is bounded by one record exactly as
1375            // under v3: WAL fsync happens after the in-memory
1376            // mutation, so the WAL never describes a write that
1377            // didn't apply.
1378            let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1379            let ts = wall_clock_micros();
1380            let record = encode_v4_auto_commit(sql, lsn, ts);
1381            let p = self.persistence.as_mut().expect("checked above");
1382            p.wal.write_all(&record).map_err(io_err)?;
1383            p.wal.sync_data().map_err(io_err)?;
1384            p.wal_len = p.wal_len.saturating_add(record.len() as u64);
1385            if p.wal_len >= p.checkpoint_threshold_bytes {
1386                self.checkpoint()?;
1387            }
1388        }
1389        Ok(result)
1390    }
1391
1392    /// v7.3.0 — typed-row variant of [`Database::query`]. Each
1393    /// row decodes into a `T: FromSpgRow` so callers don't
1394    /// pattern-match on `Value` themselves. Use [`spg_row!`] to
1395    /// generate the impl, or write it by hand.
1396    pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
1397        let rows = self.query(sql)?;
1398        rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
1399    }
1400
1401    /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
1402    /// strips the column-schema metadata for read-side
1403    /// ergonomics. Errors on non-Rows results (DML / DDL
1404    /// statements should go through `execute` instead).
1405    pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
1406        match self.engine.execute(sql)? {
1407            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
1408            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1409                "query() expects a SELECT — use execute() for DML/DDL".into(),
1410            )),
1411            // v7.5.0 — QueryResult is #[non_exhaustive]; any future
1412            // variant is not a SELECT row stream, treat as Unsupported.
1413            _ => Err(EngineError::Unsupported(
1414                "query() expects a SELECT — use execute() for DML/DDL".into(),
1415            )),
1416        }
1417    }
1418
1419    /// v7.16.0 — column-aware variant of [`Self::query`].
1420    /// Returns the column schema vec alongside the rows so
1421    /// adapters (the spg-sqlx Row impl most notably) can drive
1422    /// name + type-based column lookups. Errors on non-Rows
1423    /// results identically to `query`.
1424    pub fn query_with_columns(
1425        &mut self,
1426        sql: &str,
1427    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
1428        match self.engine.execute(sql)? {
1429            QueryResult::Rows { columns, rows } => {
1430                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
1431            }
1432            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1433                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
1434            )),
1435            _ => Err(EngineError::Unsupported(
1436                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
1437            )),
1438        }
1439    }
1440
1441    /// v7.16.0 — column-aware variant of
1442    /// [`Self::query_prepared`]. Same shape as
1443    /// `query_with_columns` but driven from a prepared
1444    /// statement + bound params.
1445    pub fn query_prepared_with_columns(
1446        &mut self,
1447        stmt: &Statement,
1448        params: &[Value],
1449    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
1450        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
1451            QueryResult::Rows { columns, rows } => {
1452                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
1453            }
1454            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1455                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1456            )),
1457            _ => Err(EngineError::Unsupported(
1458                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1459            )),
1460        }
1461    }
1462
    /// Borrow the underlying engine (shared borrow). Escape hatch
    /// for callers that need access to `spg-engine` APIs not yet
    /// surfaced here (transactions, EXPLAIN ANALYZE, etc.).
    #[must_use]
    pub const fn engine(&self) -> &Engine {
        &self.engine
    }
1470
    /// Mutable borrow of the underlying engine. Same intent as
    /// `engine()` but for write-side APIs (e.g. inserting
    /// directly through `Catalog::insert` for high-throughput
    /// bulk loads that bypass SQL parsing).
    ///
    /// NOTE(review): this wrapper appends WAL records only from
    /// `execute` / `execute_prepared`. Writes made directly through
    /// the returned `&mut Engine` appear to skip that path, so on a
    /// persistent database they may not be durable until the next
    /// checkpoint. Confirm before relying on them surviving a crash.
    pub const fn engine_mut(&mut self) -> &mut Engine {
        &mut self.engine
    }
1478
1479    /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
1480    /// `execute_prepared` / `query_prepared` calls can re-bind
1481    /// parameters without re-parsing. The returned [`Statement`]
1482    /// is a thin handle around the AST + cached source SQL; it's
1483    /// `Clone` so the same plan can drive many bind calls
1484    /// concurrently (each call clones the AST and runs
1485    /// placeholder substitution on the clone — the cached
1486    /// plan stays intact).
1487    ///
1488    /// Plan caching follows the engine's existing version-aware
1489    /// rule: a prepared `Statement` whose statistics version
1490    /// has rolled (ANALYZE ran between prepare and execute)
1491    /// will silently re-prepare under the hood. Callers don't
1492    /// need to detect this.
1493    ///
1494    /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
1495    /// `bind`-time `Value`s are passed as a slice; arity
1496    /// mismatches surface as `EvalError::PlaceholderOutOfRange`
1497    /// at `execute_prepared` time, not here.
1498    ///
1499    /// # Errors
1500    /// Surfaces `EngineError` (parse error / plan rewrite
1501    /// failure) from the underlying `Engine::prepare`.
1502    pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
1503        // Use the cached path so repeated prepares of the same
1504        // SQL are O(1). The engine's plan cache stays shared
1505        // across all callers of this Database — a single
1506        // `PgPool`-shaped consumer (or, later, the spg-sqlx
1507        // adapter) prepares once and reaps the win on every bind.
1508        let stmt = self
1509            .engine
1510            .prepare_cached(sql)
1511            .map_err(EngineError::Parse)?;
1512        Ok(Statement {
1513            stmt,
1514            sql: sql.to_string(),
1515        })
1516    }
1517
1518    /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
1519    /// executing. Returns `(parameter_oid_count, output_columns)`
1520    /// where `output_columns` is empty for non-SELECT statements
1521    /// or for SELECT shapes the describe planner can't resolve
1522    /// (JOIN / subquery / unknown table). Wraps
1523    /// `Engine::describe_prepared` so the spg-sqlx bridge can
1524    /// surface PG-shape Describe replies for
1525    /// `sqlx::query!()` compile-time validation.
1526    ///
1527    /// # Errors
1528    /// Propagates parse errors from the underlying prepare path.
1529    pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
1530        let stmt = self
1531            .engine
1532            .prepare_cached(sql)
1533            .map_err(EngineError::Parse)?;
1534        Ok(self.engine.describe_prepared(&stmt))
1535    }
1536
    /// v7.16.0 — execute a prepared statement with bound
    /// parameters. Mirrors `Engine::execute_prepared`: clones
    /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
    ///
    /// Persistence (WAL fsync + auto-checkpoint) follows the
    /// same rules as `execute(sql)`. When the database is
    /// persistent and the result is `CommandOk` with
    /// `modified_catalog: true`, a v4 auto-commit WAL record is
    /// appended and `sync_data`'d AFTER the in-memory exec
    /// succeeds. A checkpoint runs once the tracked WAL length
    /// reaches `checkpoint_threshold_bytes`. The record carries the
    /// substituted, bind-final SQL, so replay reconstructs the
    /// same row state without the original prepared `Statement`.
    ///
    /// # Errors
    /// Propagates engine errors. Param arity mismatch surfaces
    /// as `EvalError::PlaceholderOutOfRange`. WAL write / fsync
    /// failures and auto-checkpoint failures are also returned.
    /// NOTE(review): those happen after the in-memory mutation has
    /// been applied, so an `Err` from them does not mean the
    /// statement had no effect.
    pub fn execute_prepared(
        &mut self,
        stmt: &Statement,
        params: &[Value],
    ) -> Result<QueryResult, EngineError> {
        let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
        // Only catalog-modifying results on a persistent database are
        // logged. Reads and non-mutating commands skip the WAL entirely.
        if self.persistence.is_some()
            && matches!(
                &result,
                QueryResult::CommandOk {
                    modified_catalog: true,
                    ..
                }
            )
        {
            // `Engine::execute_prepared` consumed its own clone of the
            // AST, so take a second clone here purely to obtain the
            // canonical SQL text for the WAL.
            let mut wal_stmt = stmt.stmt.clone();
            // `crate::wal_render_with_params` (defined elsewhere in this
            // crate) substitutes `params` into the fresh clone so its
            // Display form is the bind-final SQL.
            // NOTE(review): presumably the same substitution the engine
            // applied during execution — keep the two in sync.
            crate::wal_render_with_params(&mut wal_stmt, params);
            let canonical = format!("{wal_stmt}");
            // v7.18 PITR — prepared path also emits v4 records so
            // LSN/timestamp coverage is uniform across simple and
            // extended query.
            let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
            let ts = wall_clock_micros();
            let record = encode_v4_auto_commit(&canonical, lsn, ts);
            let p = self.persistence.as_mut().expect("checked above");
            // Write + fsync before returning Ok, so a successful return
            // means the record is on disk.
            p.wal.write_all(&record).map_err(io_err)?;
            p.wal.sync_data().map_err(io_err)?;
            p.wal_len = p.wal_len.saturating_add(record.len() as u64);
            if p.wal_len >= p.checkpoint_threshold_bytes {
                self.checkpoint()?;
            }
        }
        Ok(result)
    }
1598
1599    /// v7.16.0 — run a prepared SELECT with bound params and
1600    /// return rows as `Vec<Vec<Value>>`, matching `query()`
1601    /// shape. SELECTs are read-only so this never writes the
1602    /// WAL.
1603    ///
1604    /// # Errors
1605    /// Returns `Unsupported` if the prepared statement isn't a
1606    /// SELECT (use `execute_prepared` for DML/DDL).
1607    pub fn query_prepared(
1608        &mut self,
1609        stmt: &Statement,
1610        params: &[Value],
1611    ) -> Result<Vec<Vec<Value>>, EngineError> {
1612        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
1613            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
1614            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1615                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1616            )),
1617            _ => Err(EngineError::Unsupported(
1618                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1619            )),
1620        }
1621    }
1622
1623    /// v7.18 — parse + plan a SQL string against a
1624    /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
1625    /// readonly fan-out path: no writer lock taken, no WAL write,
1626    /// no plan-cache mutation. Static-on-`Self` so callers can
1627    /// dispatch against a snapshot without an `&mut Database`
1628    /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
1629    /// is the load-bearing consumer.
1630    ///
1631    /// # Errors
1632    /// Propagates `EngineError::Parse` from the parser.
1633    pub fn prepare_on_snapshot(
1634        snapshot: &CatalogSnapshot,
1635        sql: &str,
1636    ) -> Result<Statement, EngineError> {
1637        let stmt =
1638            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
1639        Ok(Statement {
1640            stmt,
1641            sql: sql.to_string(),
1642        })
1643    }
1644
1645    /// v7.18 — execute a prepared `Statement` against a
1646    /// `CatalogSnapshot` with bound params. Mirror of
1647    /// [`Database::execute_prepared`] on the readonly path:
1648    /// writes / DDL hit `EngineError::WriteRequired`. No WAL
1649    /// write, no writer lock, multiple snapshots can run
1650    /// concurrently — the snapshot is immutable from prepare time.
1651    ///
1652    /// # Errors
1653    /// Surfaces `EngineError::WriteRequired` for non-readonly
1654    /// statements; propagates other engine errors.
1655    pub fn execute_prepared_on_snapshot(
1656        snapshot: &CatalogSnapshot,
1657        stmt: &Statement,
1658        params: &[Value],
1659    ) -> Result<QueryResult, EngineError> {
1660        spg_engine::Engine::execute_readonly_prepared_on_snapshot(
1661            snapshot,
1662            stmt.stmt.clone(),
1663            params,
1664        )
1665    }
1666
1667    /// v7.18 — describe a SQL string against a
1668    /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
1669    /// the readonly path. Pure function on the snapshot's
1670    /// catalog; safe to call from any thread.
1671    ///
1672    /// # Errors
1673    /// Propagates `EngineError::Parse` from the parser.
1674    pub fn describe_on_snapshot(
1675        snapshot: &CatalogSnapshot,
1676        sql: &str,
1677    ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
1678        let stmt =
1679            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
1680        Ok(spg_engine::Engine::describe_prepared_on_snapshot(
1681            snapshot, &stmt,
1682        ))
1683    }
1684
1685    /// v7.2.0 — run `body` inside an implicit `BEGIN` /
1686    /// `COMMIT` pair. The body receives `&mut Database` so it
1687    /// can `execute()` / `query()` like any other code path;
1688    /// the only difference is that every write in the body
1689    /// lands inside one transaction, and a returned `Err` from
1690    /// the body triggers `ROLLBACK` before the error propagates.
1691    ///
1692    /// Nested calls are not supported — SPG's transaction
1693    /// model is single-writer with explicit `BEGIN` /
1694    /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
1695    /// would hit `EngineError::Unsupported("nested
1696    /// transaction")` at the inner `BEGIN`.
1697    pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
1698    where
1699        F: FnOnce(&mut Self) -> Result<R, EngineError>,
1700    {
1701        self.execute("BEGIN")?;
1702        match body(self) {
1703            Ok(value) => {
1704                self.execute("COMMIT")?;
1705                Ok(value)
1706            }
1707            Err(e) => {
1708                // Best-effort rollback. If ROLLBACK itself
1709                // fails (rare — the engine reports it via
1710                // `Unsupported` only when there's no active
1711                // TX, which can't happen here) we surface the
1712                // original body error, not the rollback error.
1713                let _ = self.execute("ROLLBACK");
1714                Err(e)
1715            }
1716        }
1717    }
1718}
1719
impl Default for Database {
    /// Equivalent to [`Database::open_in_memory`].
    fn default() -> Self {
        Self::open_in_memory()
    }
}
1725
/// v7.7.5 — observability snapshot returned by
/// [`Database::metrics`]. Plain `Copy` data with no heap-owned
/// fields, so it is cheap to copy, compare, and serialise.
///
/// The struct is `#[non_exhaustive]` because new counters may be
/// added. Code outside this crate therefore cannot build it with a
/// struct literal or destructure it exhaustively; read the fields
/// instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct EmbeddedMetrics {
    /// Total live row count across every user table (hot
    /// tier only — cold-tier rows live in segment files and
    /// are not counted here).
    pub hot_rows: u64,
    /// Sum of `Table::hot_bytes` across every user table.
    /// Compare against the freezer's `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Number of cold-tier segments registered in the catalog.
    /// Includes tombstoned slots (segments retired by
    /// compaction whose file may still exist on disk).
    pub cold_segments: u64,
    /// User-table count (excludes any future engine-managed
    /// internal tables).
    pub tables: u64,
    /// WAL size in bytes as tracked at the last `execute()` /
    /// `execute_prepared()` / `checkpoint()`. Zero when the
    /// database is in-memory.
    pub wal_bytes: u64,
    /// `true` when the database was opened with `open_path` —
    /// i.e. WAL + checkpoint persistence is active.
    pub persistent: bool,
}
1753
/// v7.2.1 — owner of the worker thread started by
/// `spawn_background_freezer`. Dropping the handle raises the
/// shutdown flag and then joins the thread, so a `Database` (or
/// the `Arc<Mutex<Database>>` it shares with the worker) can be
/// torn down safely once the handle is gone.
///
/// Do not drop the handle, or call `stop()`, while holding the
/// `Database` mutex guard. The join waits for the worker, and the
/// worker may itself be blocked waiting for that same lock.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    /// Shared with the worker; `true` asks it to exit.
    shutdown: Arc<AtomicBool>,
    /// `Some` until the worker has been joined.
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — ask the worker to stop, then wait for it to exit.
    /// Repeated calls are harmless: only the first one still has a
    /// thread to join. `Drop` calls this as well.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let Some(worker) = self.join.take() else {
            return;
        };
        // A worker that panicked comes back as `Err`. There is nothing
        // useful to do with it during shutdown, so it is discarded.
        let _ = worker.join();
    }
}

impl Drop for FreezerHandle {
    fn drop(&mut self) {
        // Same shutdown path as an explicit `stop()`; a no-op when
        // the worker was already stopped.
        self.stop();
    }
}
1781
/// v7.2.1 — knobs for `Database::spawn_background_freezer`.
///
/// `FreezerOptions::default()` matches the `spg-server` freezer's
/// operating point. Override individual fields with struct-update
/// syntax, e.g. `FreezerOptions { tick: Duration::from_millis(250),
/// ..FreezerOptions::default() }`.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// Tick interval. The worker wakes roughly every `tick`,
    /// checks the catalog's `hot_tier_bytes`, and freezes if over
    /// budget. Each tick takes the database mutex, so very small
    /// values make the worker contend with foreground callers
    /// more often.
    pub tick: Duration,
    /// Hot-tier byte budget. Once the catalog-wide hot tier is
    /// strictly above this value, the next tick freezes up to
    /// `batch_rows` of the oldest rows into a new cold segment.
    /// The rows come from the non-empty table with the largest hot
    /// tier among those that have an integer BTree index.
    pub hot_tier_bytes: u64,
    /// Max rows the freezer demotes per fire.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. After a successful freeze,
    /// the freezer runs one compaction pass if the catalog holds
    /// MORE than this many cold segments across all tables. The
    /// comparison is strictly greater, hence "exceed"; reaching the
    /// value exactly does not trigger it. Set to `usize::MAX` to
    /// disable auto-compact entirely. The default is `64`, matching
    /// the `spg-server` operating point for
    /// `SPG_COLD_COMPACT_SEGMENT_THRESHOLD`.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges,
    /// in bytes. Default 64 MiB, mirroring `spg-server`. Small
    /// segments below this size are merge candidates;
    /// segments at or above stay untouched.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    fn default() -> Self {
        // Match the `spg-server` freezer's default operating
        // point (SPG_HOT_TIER_BYTES = 4 GiB, batch 1000 rows,
        // tick every 1 s) so embedded behaviour is predictable
        // for operators familiar with the server.
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;
        Self {
            tick: Duration::from_secs(1),
            hot_tier_bytes: 4 * GIB,
            batch_rows: 1000,
            compact_when_segments_exceed: 64,
            compact_target_bytes: 64 * MIB,
        }
    }
}
1823
1824impl Database {
1825    /// v7.7.4 — observe the catalog's cold-segment count.
1826    /// Useful for tests + dashboards that want to verify
1827    /// auto-compaction is firing.
1828    #[must_use]
1829    pub fn cold_segment_count(&self) -> usize {
1830        self.engine.catalog().cold_segment_count()
1831    }
1832
1833    /// v7.7.5 — observability snapshot. Returns a point-in-time
1834    /// view of the engine + persistence counters. Cheap (no
1835    /// locks beyond the existing `&self` borrow), so safe to
1836    /// call from a hot metrics-scrape path.
1837    ///
1838    /// Fields mirror the operational dashboard
1839    /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
1840    /// minus the network counters that don't apply to embedded.
1841    #[must_use]
1842    pub fn metrics(&self) -> EmbeddedMetrics {
1843        let cat = self.engine.catalog();
1844        let mut hot_rows: u64 = 0;
1845        let mut hot_bytes: u64 = 0;
1846        for name in cat.table_names() {
1847            if let Some(t) = cat.get(&name) {
1848                hot_rows = hot_rows.saturating_add(t.row_count() as u64);
1849                hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
1850            }
1851        }
1852        let (wal_bytes, persistent) = match &self.persistence {
1853            Some(p) => (p.wal_len, true),
1854            None => (0, false),
1855        };
1856        EmbeddedMetrics {
1857            hot_rows,
1858            hot_bytes,
1859            cold_segments: cat.cold_segment_count() as u64,
1860            tables: cat.table_count() as u64,
1861            wal_bytes,
1862            persistent,
1863        }
1864    }
1865
1866    /// v7.2.1 — spawn a background thread that periodically
1867    /// runs `freeze_oldest_to_cold` when the catalog-wide hot
1868    /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
1869    /// pattern matches the v7.2 sharing story: callers wrap
1870    /// their `Database` in `Arc::new(Mutex::new(db))` once,
1871    /// then clone the Arc for the worker + for foreground
1872    /// access. Return value is a handle whose `Drop` joins the
1873    /// worker.
1874    ///
1875    /// Picks the freeze target the same way `spg-server`'s
1876    /// freezer does: largest-`hot_bytes` user table with at
1877    /// least one BTree integer-PK index. Tables without a
1878    /// freezable index are skipped silently.
1879    pub fn spawn_background_freezer(
1880        db: Arc<Mutex<Database>>,
1881        opts: FreezerOptions,
1882    ) -> FreezerHandle {
1883        let shutdown = Arc::new(AtomicBool::new(false));
1884        let shutdown_for_thread = Arc::clone(&shutdown);
1885        let join = thread::Builder::new()
1886            .name("spg-embedded-freezer".into())
1887            .spawn(move || {
1888                background_freezer_loop(db, opts, shutdown_for_thread);
1889            })
1890            .expect("spawn background freezer thread");
1891        FreezerHandle {
1892            shutdown,
1893            join: Some(join),
1894        }
1895    }
1896}
1897
1898/// v7.2.1 — the freezer's main loop, factored out so the
1899/// `Database::spawn_background_freezer` path stays readable.
1900fn background_freezer_loop(
1901    db: Arc<Mutex<Database>>,
1902    opts: FreezerOptions,
1903    shutdown: Arc<AtomicBool>,
1904) {
1905    // Sleep in short slices so a shutdown request resolves
1906    // quickly (vs sleeping the full tick).
1907    let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
1908    let mut last_tick = std::time::Instant::now();
1909    loop {
1910        if shutdown.load(Ordering::Acquire) {
1911            return;
1912        }
1913        thread::sleep(slice);
1914        if last_tick.elapsed() < opts.tick {
1915            continue;
1916        }
1917        last_tick = std::time::Instant::now();
1918        let Ok(mut guard) = db.lock() else {
1919            return;
1920        };
1921        if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
1922            continue;
1923        }
1924        let Some((table, index)) = pick_freeze_target(&guard) else {
1925            continue;
1926        };
1927        let row_count = guard
1928            .engine
1929            .catalog()
1930            .get(&table)
1931            .map_or(0, spg_storage::Table::row_count);
1932        let to_freeze = opts.batch_rows.min(row_count);
1933        if to_freeze == 0 {
1934            continue;
1935        }
1936        if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
1937            eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
1938            continue;
1939        }
1940        // v7.7.4 — auto-compact. If the catalog now carries
1941        // more cold segments than the configured threshold,
1942        // run a single compaction pass. Failures are reported
1943        // but don't kill the loop; the next tick will retry.
1944        let count = guard.engine.catalog().cold_segment_count();
1945        if count > opts.compact_when_segments_exceed {
1946            if let Err(e) = guard
1947                .engine
1948                .compact_cold_segments_with_target(opts.compact_target_bytes)
1949            {
1950                eprintln!(
1951                    "spg-embedded: background compact failed (segments={count}, \
1952                     threshold={}): {e:?}",
1953                    opts.compact_when_segments_exceed,
1954                );
1955            }
1956        }
1957    }
1958}
1959
1960/// v7.2.1 — pick the highest-`hot_bytes` user table with a
1961/// BTree integer-PK index. Returns `(table, index_name)` so the
1962/// caller can dispatch through `freeze_oldest_to_cold`.
1963fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
1964    let cat = db.engine.catalog();
1965    let mut best: Option<(String, String, u64)> = None;
1966    for name in cat.table_names() {
1967        let Some(t) = cat.get(&name) else { continue };
1968        if t.row_count() == 0 {
1969            continue;
1970        }
1971        let cols = &t.schema().columns;
1972        let Some(idx) = t.indices().iter().find(|i| {
1973            matches!(i.kind, spg_storage::IndexKind::BTree(_))
1974                && i.column_position < cols.len()
1975                && matches!(
1976                    cols[i.column_position].ty,
1977                    spg_storage::DataType::SmallInt
1978                        | spg_storage::DataType::Int
1979                        | spg_storage::DataType::BigInt
1980                )
1981        }) else {
1982            continue;
1983        };
1984        let hot = t.hot_bytes();
1985        match best {
1986            None => best = Some((name, idx.name.clone(), hot)),
1987            Some((_, _, best_hot)) if hot > best_hot => {
1988                best = Some((name, idx.name.clone(), hot));
1989            }
1990            _ => {}
1991        }
1992    }
1993    best.map(|(t, i, _)| (t, i))
1994}
1995
1996/// v7.7.6 — replay the first `to_seq` records of the WAL at
1997/// `wal_path` into a fresh engine and write the resulting
1998/// catalog snapshot to `out_db_path`. Same semantics as
1999/// `spg revert --wal … --to-seq N --out …` from the CLI:
2000///
2001///   - `to_seq == 0` → snapshot is the empty catalog
2002///   - WAL records beyond `to_seq` are not applied
2003///   - durability-checkpoint markers (v3 type 0x02) are
2004///     consumed without counting against the budget
2005///
2006/// Returns the number of statements actually applied
2007/// (`≤ to_seq`). The output snapshot is byte-identical to
2008/// what `Database::open_path(out_db_path)` would consume on
2009/// a subsequent open.
2010///
2011/// This is the "rewind" operator for an embedded database
2012/// that has been corrupted by a poison statement or a
2013/// half-applied migration. Pair with `cold_segment_paths`
2014/// preservation if your cold-tier files are still on disk.
2015///
2016/// # Errors
2017///
2018/// - `wal_path` unreadable or truncated mid-record
2019/// - WAL record decodes to invalid UTF-8 SQL
2020/// - WAL record's SQL is rejected by the engine
2021/// - `out_db_path` unwritable
2022pub fn revert_wal_to_seq(
2023    wal_path: impl AsRef<Path>,
2024    to_seq: u64,
2025    out_db_path: impl AsRef<Path>,
2026) -> Result<u64, EngineError> {
2027    // v7.19 — accept either a single-file legacy WAL (v7.18 and
2028    // earlier layout) or a chunked WAL directory (v7.19+). For a
2029    // directory, concatenate every `.wal` chunk in sorted order
2030    // — the same order open_path replays them in — so revert
2031    // sees the full record stream.
2032    let path = wal_path.as_ref();
2033    let wal_bytes = if path.is_dir() {
2034        let mut combined = Vec::new();
2035        let chunks = sorted_wal_chunks(path).map_err(io_err)?;
2036        for chunk in chunks {
2037            let bytes = std::fs::read(&chunk).map_err(io_err)?;
2038            combined.extend_from_slice(&bytes);
2039        }
2040        combined
2041    } else {
2042        std::fs::read(path).map_err(io_err)?
2043    };
2044    let mut engine = Engine::new();
2045    let mut applied = 0u64;
2046    let mut cur = 0usize;
2047    while cur < wal_bytes.len() && applied < to_seq {
2048        let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
2049        cur += total;
2050        if sql_bytes.is_empty() {
2051            continue;
2052        }
2053        let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
2054            EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
2055                "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
2056            )))
2057        })?;
2058        engine.execute(sql)?;
2059        applied += 1;
2060    }
2061    let snapshot = engine.snapshot();
2062    std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
2063    Ok(applied)
2064}
2065
/// v7.7.6 — decode one WAL record from the front of `tail`.
/// Returns `(sql_bytes, consumed_len)`, where `consumed_len` is how
/// many bytes the caller must advance past: the header, any v4
/// prefix, and the payload.
///
/// Framings recognised, selected by flag bits in the leading
/// little-endian `u32`:
/// - v1: 4-byte header (just the length).
/// - v2: `WAL_V2_SENTINEL` set; 8-byte header.
/// - v3: `WAL_V2_SENTINEL | WAL_V3_FLAG` set; 9-byte header with a
///   type byte at offset 8.
///   - `WAL_V3_TYPE_AUTO_COMMIT_SQL`: the payload is the SQL.
///   - `WAL_V4_TYPE_AUTO_COMMIT_SQL`: a `WAL_V4_EXTRA_HEADER`-byte
///     (lsn, ts) prefix sits between the header and the SQL. It is
///     stripped, and the encoded length counts only the SQL.
///   - any other type (e.g. durability markers): returned with an
///     empty `sql_bytes`, which callers treat as "skip".
///
/// CRCs are not re-validated; the caller's intent is "apply", not
/// "validate". Meant to match the CLI `decode_one_record` and the
/// engine's `replay_wal_bytes` — NOTE(review): keep the three in sync.
///
/// # Errors
/// `StorageError::Corrupt` when `tail` is shorter than 4 bytes, or
/// shorter than the header + payload length the record declares.
fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
    if tail.len() < 4 {
        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
            format!("WAL truncated record: {} < 4 header bytes", tail.len()),
        )));
    }
    // Safe: `tail.len() >= 4` was checked just above.
    let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
    let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
    let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
    // Strip whichever flag bits are present to recover the length.
    let len_mask = if is_v3 {
        !(WAL_V2_SENTINEL | WAL_V3_FLAG)
    } else {
        !WAL_V2_SENTINEL
    };
    let rec_len = (raw_len & len_mask) as usize;
    let header_len = if is_v3 {
        9
    } else if is_v2 {
        8
    } else {
        4
    };
    // This bound also guarantees `tail[8]` exists for v3 records.
    if tail.len() < header_len + rec_len {
        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
            format!(
                "WAL truncated record: header+payload {} > available {}",
                header_len + rec_len,
                tail.len()
            ),
        )));
    }
    if is_v3 {
        let type_byte = tail[8];
        // v3 type 0x01 = auto_commit_sql (payload = SQL).
        // v3 type 0x02 = durability marker (no SQL to apply).
        // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
        //                prefix between type and SQL — strip
        //                the prefix so the caller still sees raw
        //                SQL bytes.
        // Anything else is unknown.
        if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
            let payload = &tail[header_len..header_len + rec_len];
            return Ok((payload.to_vec(), header_len + rec_len));
        }
        if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL {
            // The v4 prefix is not included in `rec_len`, so re-check
            // the bound with it added.
            let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
            if tail.len() < v4_total {
                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
                    format!(
                        "WAL truncated v4 record: header+payload {v4_total} > available {}",
                        tail.len()
                    ),
                )));
            }
            let sql_start = header_len + WAL_V4_EXTRA_HEADER;
            let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
            return Ok((sql_bytes, v4_total));
        }
        // Caller treats empty payload as a skip-marker.
        return Ok((Vec::new(), header_len + rec_len));
    }
    // v1 / v2: the whole payload is SQL.
    let payload = &tail[header_len..header_len + rec_len];
    Ok((payload.to_vec(), header_len + rec_len))
}
2136
impl Drop for Database {
    /// Best-effort teardown for a persistent database, in order:
    /// 1. final checkpoint;
    /// 2. stop and join the retention thread;
    /// 3. remove the cross-process lock directory.
    ///
    /// Errors go to stderr because `Drop` cannot return them. An
    /// in-memory database (`persistence == None`) does nothing here.
    fn drop(&mut self) {
        // v7.1 — best-effort final checkpoint when a persistent
        // Database leaves scope. Failures here go to stderr so
        // operators see them, but Drop can't propagate errors —
        // the WAL itself is already durable, so a checkpoint
        // miss only means the next boot replays a few more
        // records than strictly necessary.
        // NOTE(review): this runs before the retention thread below
        // is signalled, so the two may overlap — confirm they can't
        // race on the WAL directory.
        if self.persistence.is_some() {
            if let Err(e) = self.checkpoint() {
                eprintln!(
                    "spg-embedded: final checkpoint on Drop failed: {e:?} \
                     (WAL is intact; next open_path will replay)"
                );
            }
        }
        // v7.19 P3 — signal the retention thread to exit, then
        // wait for it. Done BEFORE the lock release so the
        // background thread doesn't outlive the database handle.
        // The signal + join pair lives behind take() because Drop
        // takes `&mut self` and we need to move the thread handle
        // out.
        if let Some(ctx) = self.persistence.as_mut() {
            if let Some(shutdown) = ctx.retention_shutdown.take() {
                shutdown.store(true, Ordering::SeqCst);
            }
            // A panicked retention thread surfaces as `Err`; ignored
            // here since teardown must continue regardless.
            if let Some(handle) = ctx.retention_thread.take() {
                let _ = handle.join();
            }
        }
        // v7.17.0 Phase 6.2 — release the cross-process lock on
        // clean shutdown. Failure is logged but never panics;
        // the operator can clear a stale lock via
        // `Database::force_unlock` if a crash kept the
        // directory around.
        if let Some(ctx) = &self.persistence
            && ctx.lock_path.exists()
        {
            if let Err(e) = std::fs::remove_dir(&ctx.lock_path) {
                eprintln!(
                    "spg-embedded: lock release on Drop failed for {}: {e:?}",
                    ctx.lock_path.display()
                );
            }
        }
    }
}
2184
2185impl Database {
2186    /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
2187    /// Use when a previous process crashed mid-session and
2188    /// left `<db_path>.lock` behind. Operators should confirm
2189    /// no other process is currently using the database before
2190    /// calling this — SPG cannot fingerprint stale-vs-live
2191    /// without a libc dep, which would violate spg-embedded's
2192    /// zero-deps charter.
2193    pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
2194        let lock_path = {
2195            let mut p = db_path.as_ref().to_path_buf();
2196            let name = p
2197                .file_name()
2198                .map(|n| {
2199                    let mut s = n.to_os_string();
2200                    s.push(".lock");
2201                    s
2202                })
2203                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
2204            p.set_file_name(name);
2205            p
2206        };
2207        if !lock_path.exists() {
2208            return Ok(());
2209        }
2210        std::fs::remove_dir(&lock_path).map_err(io_err)
2211    }
2212}
2213
2214/// v7.1 — turn a `std::io::Error` into the workspace's
2215/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
2216/// the closest existing variant — io failures during boot or
2217/// during a WAL append surface as a storage-layer fault to
2218/// callers, which keeps the public error enum unchanged.
2219fn io_err(e: std::io::Error) -> EngineError {
2220    EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
2221}
2222
2223/// v7.2.2 — `Database` is `Send`, so the recommended sharing
2224/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
2225///
2226/// ```no_run
2227/// use std::sync::{Arc, Mutex};
2228/// use spg_embedded::Database;
2229///
2230/// let db = Database::open_in_memory();
2231/// let shared = Arc::new(Mutex::new(db));
2232/// let shared_for_worker = Arc::clone(&shared);
2233/// std::thread::spawn(move || {
2234///     let mut guard = shared_for_worker.lock().unwrap();
2235///     guard.execute("INSERT INTO t VALUES (1)").unwrap();
2236/// });
2237/// ```
2238///
2239/// Internal `RwLock`-wrapped state — letting many threads
2240/// hold concurrent `&Database` for `SELECT` without contending
2241/// — is parked as STABILITY § "Out of v7.2"; multi-reader
2242/// embedded throughput needs a planner-side change to release
2243/// the engine read lock between scans, which is the v7.x
2244/// "Choice A" line of work already documented in v6.9.1's
2245/// carve-out.
2246#[allow(dead_code)]
2247fn _database_is_send() {
2248    fn assert_send<T: Send>() {}
2249    assert_send::<Database>();
2250}
2251
2252/// v6.10.3 — trait that maps a row's columns onto a user
2253/// struct's fields. v7.3.0 ships the [`spg_row!`] declarative
2254/// macro that generates `impl FromSpgRow for YourStruct` from
2255/// a struct definition (no proc-macro, no syn/quote/
2256/// proc-macro2 deps — the workspace's "0 external deps"
2257/// policy holds).
2258///
2259/// Implementors map a row's columns onto a user struct's
2260/// fields. Errors surface as `EngineError::Unsupported` so the
2261/// caller's error type stays uniform.
2262pub trait FromSpgRow: Sized {
2263    /// Decode one query result row into `Self`. Called once per
2264    /// row by [`Database::query_typed`]. The slice length equals
2265    /// the number of columns in the SELECT projection.
2266    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
2267}
2268
/// v7.3.0 — declarative macro that generates `FromSpgRow` impl
/// for a user struct. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// Columns are matched to fields **by position**: the first
/// projected column decodes into the first field, and so on.
/// A row with fewer columns than fields is an error; extra
/// trailing columns are ignored. Missing columns and per-field
/// decode failures both surface as `EngineError::Unsupported`
/// naming the struct and the field.
///
/// The generated struct always derives `Debug` and `Clone`, so
/// do not repeat those in your own `#[derive(...)]`. A trailing
/// comma after the last field is optional.
///
/// Supported field types: any type implementing
/// [`FromSpgValue`]; the impls shipped with this crate cover
/// `i16`, `i32`, `i64`, `f32`, `f64`, `bool`, `String`,
/// `Vec<f32>` (for `VECTOR(N)` columns), and `Option<T>` of any
/// of the above.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),*
            $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // Fields consume columns left to right.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                // Fully qualified so a caller-side `Ok` (or a crate
                // without the std prelude) cannot change meaning.
                ::core::result::Result::Ok(Self { $($field,)* })
            }
        }
    };
}
2345
2346/// v7.3.0 — per-column decoder used by `spg_row!`. Surface
2347/// covers every numeric / text / bytes / bool variant in
2348/// `Value`, plus `Option<T>` for nullable columns.
2349pub trait FromSpgValue: Sized {
2350    /// Decode one cell into `Self`. The returned `&'static str`
2351    /// is a short diagnostic for type mismatches (e.g. `"expected
2352    /// integer, got TEXT"`); callers wrap it into their own
2353    /// error type.
2354    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
2355}
2356
2357macro_rules! impl_from_value_int {
2358    ($($t:ty),* $(,)?) => {
2359        $(
2360            impl FromSpgValue for $t {
2361                fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2362                    match v {
2363                        Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
2364                        Value::Int(n)      => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
2365                        Value::BigInt(n)   => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
2366                        Value::Null        => Err("NULL in non-Option int column"),
2367                        _ => Err("non-integer value in int column"),
2368                    }
2369                }
2370            }
2371        )*
2372    };
2373}
2374impl_from_value_int!(i16, i32, i64);
2375
2376impl FromSpgValue for f32 {
2377    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2378        match v {
2379            Value::Float(f) => Ok(*f as f32),
2380            Value::Null => Err("NULL in non-Option float column"),
2381            _ => Err("non-float value in float column"),
2382        }
2383    }
2384}
2385
2386impl FromSpgValue for f64 {
2387    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2388        match v {
2389            Value::Float(f) => Ok(*f),
2390            Value::Null => Err("NULL in non-Option float column"),
2391            _ => Err("non-float value in float column"),
2392        }
2393    }
2394}
2395
2396impl FromSpgValue for bool {
2397    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2398        match v {
2399            Value::Bool(b) => Ok(*b),
2400            Value::Null => Err("NULL in non-Option bool column"),
2401            _ => Err("non-bool value in bool column"),
2402        }
2403    }
2404}
2405
2406impl FromSpgValue for String {
2407    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2408        match v {
2409            Value::Text(s) => Ok(s.clone()),
2410            Value::Null => Err("NULL in non-Option text column"),
2411            _ => Err("non-text value in String column"),
2412        }
2413    }
2414}
2415
2416impl FromSpgValue for Vec<f32> {
2417    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2418        match v {
2419            Value::Vector(xs) => Ok(xs.clone()),
2420            Value::Null => Err("NULL in non-Option vector column"),
2421            _ => Err("non-vector value in Vec<f32> column"),
2422        }
2423    }
2424}
2425
2426impl<T: FromSpgValue> FromSpgValue for Option<T> {
2427    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2428        match v {
2429            Value::Null => Ok(None),
2430            other => T::from_spg_value(other).map(Some),
2431        }
2432    }
2433}
2434
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL, name TEXT)")
            .unwrap();
        db.execute("INSERT INTO t VALUES (1, 'alice')").unwrap();
        db.execute("INSERT INTO t VALUES (2, 'bob')").unwrap();
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(1) => {}
            other => panic!("expected Int(1), got {other:?}"),
        }
    }

    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        let r = db.query("INSERT INTO t VALUES (1)");
        assert!(r.is_err(), "query() on INSERT must error");
    }

    #[test]
    fn snapshot_roundtrip() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        db.execute("INSERT INTO t VALUES (42)").unwrap();
        let bytes = db.snapshot();
        let mut restored = Database::restore(&bytes).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(42) => {}
            other => panic!("expected Int(42), got {other:?}"),
        }
    }

    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                match row.first() {
                    Some(Value::Int(n)) => Ok(Self { id: *n }),
                    _ => Err(EngineError::Unsupported("bad id".into())),
                }
            }
        }
        let u = User::from_spg_row(&[Value::Int(7)]).unwrap();
        assert_eq!(u.id, 7);
        // Wrong type and empty rows must surface as errors, not panics.
        assert!(User::from_spg_row(&[Value::Null]).is_err());
        assert!(User::from_spg_row(&[]).is_err());
    }

    #[test]
    fn from_spg_value_int_range_checks() {
        assert_eq!(i64::from_spg_value(&Value::SmallInt(3)), Ok(3));
        assert_eq!(i32::from_spg_value(&Value::Int(-9)), Ok(-9));
        assert!(i16::from_spg_value(&Value::Int(40_000)).is_err());
        assert!(i32::from_spg_value(&Value::Null).is_err());
    }

    #[test]
    fn option_maps_null_to_none() {
        assert_eq!(Option::<i32>::from_spg_value(&Value::Null), Ok(None));
        assert_eq!(Option::<i32>::from_spg_value(&Value::Int(5)), Ok(Some(5)));
        assert!(Option::<bool>::from_spg_value(&Value::Int(5)).is_err());
    }

    #[test]
    fn spg_row_macro_decodes_by_position() {
        spg_row! {
            struct Person {
                id: i32,
                name: Option<String>,
            }
        }
        let p = Person::from_spg_row(&[Value::Int(1), Value::Text("alice".into())]).unwrap();
        assert_eq!(p.id, 1);
        assert_eq!(p.name.as_deref(), Some("alice"));
        let p = Person::from_spg_row(&[Value::Int(2), Value::Null]).unwrap();
        assert_eq!(p.name, None);
        assert!(
            Person::from_spg_row(&[Value::Int(1)]).is_err(),
            "a row with fewer columns than fields must error"
        );
    }
}