Skip to main content

spg_embedded/
lib.rs

1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//!     println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//!   crash-consistent WAL + periodic checkpoint snapshot. The
32//!   on-disk format is byte-identical to what `spg-server`
33//!   produces, so a database can move between modes without
34//!   conversion.
35//! - **Durability**: every `execute()` that mutates calls
36//!   `fsync` before returning `Ok`. There is no group commit
37//!   in embedded mode — every commit pays one fsync. If you
38//!   need batch throughput, wrap multiple statements in
39//!   [`Database::with_transaction`] which fsyncs only at
40//!   commit.
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//!   Share across threads via `Arc<Mutex<Database>>`. The
43//!   single-writer model is intentional — see
44//!   [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//!   moves cold rows to disk-resident segments while you keep
47//!   serving requests. It runs in a dedicated thread; drop the
48//!   returned [`FreezerHandle`] (or call `stop()`) for clean
49//!   shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//!   [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//!   them with a wildcard arm so future v7.x releases can add
53//!   variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//!   Malformed SQL, type mismatches, missing tables — all
59//!   return `Err(EngineError::…)`. If you observe a panic on
60//!   a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//!   violations (e.g., catalog snapshot magic mismatch, WAL
63//!   record CRC sentinel corruption that survived the boot-
64//!   time validation). These represent silent disk corruption
65//!   and an unwind would leak inconsistent state, so the
66//!   release profile uses `panic = abort` — your host process
67//!   dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//!   `--profile release-dbg` (keeps unwind tables) and use
70//!   `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
82/// v7.16.0 — handle for a parsed-and-planned SQL statement.
83/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
84/// with a `&[Value]` slice carrying the bind parameters (PG-style
85/// `$1`, `$2`, … positional). Cheap to `Clone`; the underlying AST
86/// is shared by handle copies and cloned per bind call by the
87/// engine's executor.
88///
89/// The handle holds a snapshot of the AST at prepare time. If
90/// the engine's plan cache evicts the entry between prepare and
91/// execute (e.g. ANALYZE bumps the statistics version) the
92/// stored AST keeps working — `execute_prepared` operates on
93/// the handle's clone, not the cache entry.
94#[derive(Debug, Clone)]
95pub struct Statement {
96    /// The parsed + planned AST. `spg-engine::prepare_cached`
97    /// returns it as a clone of the cached plan, so any rewrite
98    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
99    /// already run.
100    pub(crate) stmt: ParsedStatement,
101    /// Original SQL source, kept for `Display` / debug only.
102    /// WAL persistence renders from the AST so a bind-time
103    /// rewrite of `$1..$N` survives replay.
104    pub(crate) sql: String,
105}
106
107impl Statement {
108    /// Borrow the original SQL source — useful for tracing and
109    /// debug logs. WAL replay does NOT use this; it serialises
110    /// the bind-final AST instead.
111    #[must_use]
112    pub fn sql(&self) -> &str {
113        &self.sql
114    }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127    let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::{Seek, SeekFrom, Write};
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, Ordering};
135use std::sync::{Arc, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
/// v7.11.3 — wall-clock provider that `Database` attaches to its `Engine`
/// via `Engine::with_clock`.
///
/// Returns microseconds since the Unix epoch. It saturates at `i64::MAX` if
/// the system clock is far in the future, and returns `0` if the clock
/// reads earlier than the epoch. It backs SQL's `NOW()` /
/// `CURRENT_TIMESTAMP` / `CURRENT_DATE` rewrite layer, so PG-idiomatic time
/// queries work without the caller wiring their own clock.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_micros()).unwrap_or(i64::MAX),
        Err(_before_epoch) => 0,
    }
}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Private so callers can't mis-frame records. The v3 framing is the one
// the server uses, so a `spg-server` boot can read a database an
// embedded process wrote and vice versa.
//
// Record header word (u32, little-endian on disk):
//   bit 31 set            -> v2+ framing, 8-byte header
//   bits 31 and 30 set    -> v3 framing, 9-byte header (ends in a type byte)
//   remaining low bits    -> payload length in bytes

/// High bit of the header word: marks a v2 (or later) record.
const WAL_V2_SENTINEL: u32 = 1 << 31;
/// Second-highest bit of the header word: marks a v3 record. Only
/// meaningful together with [`WAL_V2_SENTINEL`].
const WAL_V3_FLAG: u32 = 1 << 30;
/// v3 record-type byte for an auto-committed SQL statement.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
160
/// v7.1 — default auto-checkpoint threshold in bytes.
///
/// Once the WAL reaches this size, the next successful mutating `execute()`
/// ends with a `checkpoint()`, which keeps the WAL bounded. The value comes
/// from the `SPG_EMBEDDED_CHECKPOINT_BYTES` environment variable when that
/// holds a positive integer. Otherwise (unset, unparsable, non-Unicode, or
/// zero) it falls back to 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;
    let Ok(raw) = std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES") else {
        return FALLBACK_BYTES;
    };
    match raw.parse::<u64>() {
        Ok(bytes) if bytes > 0 => bytes,
        _ => FALLBACK_BYTES,
    }
}
172
173/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
174///
175/// ```text
176/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
177/// [u32 LE crc32 over (type_byte || sql_bytes)]
178/// [u8 type = 0x01]
179/// [sql bytes]
180/// ```
181fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
182    let payload = sql.as_bytes();
183    let mut crc_buf = Vec::with_capacity(1 + payload.len());
184    crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
185    crc_buf.extend_from_slice(payload);
186    let crc = spg_crypto::crc32::crc32(&crc_buf);
187    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
188    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
189    out.extend_from_slice(&header);
190    out.extend_from_slice(&crc.to_le_bytes());
191    out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
192    out.extend_from_slice(payload);
193    out
194}
195
196/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
197/// Returns the count of records successfully applied. A truncated
198/// trailing record (mid-write torn) is dropped silently — the
199/// same recovery story `spg-server`'s boot path uses.
200fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
201    let mut applied = 0usize;
202    let mut cur = 0usize;
203    while cur < wal_bytes.len() {
204        if wal_bytes.len() - cur < 4 {
205            // Trailing partial header — torn write, drop and stop.
206            break;
207        }
208        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
209        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
210        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
211        let len_mask = if is_v3 {
212            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
213        } else {
214            !WAL_V2_SENTINEL
215        };
216        let rec_len = (raw_len & len_mask) as usize;
217        let header_len = if is_v3 {
218            9
219        } else if is_v2 {
220            8
221        } else {
222            4
223        };
224        if wal_bytes.len() - cur < header_len + rec_len {
225            // Torn record at the tail — drop, stop.
226            break;
227        }
228        if is_v3 {
229            let type_byte = wal_bytes[cur + 8];
230            match type_byte {
231                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
232                0x02 => {
233                    // durability_checkpoint marker — skip, no SQL.
234                    cur += header_len + rec_len;
235                    continue;
236                }
237                other => {
238                    return Err(format!(
239                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
240                    ));
241                }
242            }
243        }
244        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
245        let sql = std::str::from_utf8(sql_bytes)
246            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
247        engine
248            .execute(sql)
249            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
250        applied += 1;
251        cur += header_len + rec_len;
252    }
253    Ok(applied)
254}
255
/// v7.1 — classify `sql` by its leading keyword for the WAL decision in
/// `Database::execute`.
///
/// Returns `true` ("read-only: skip the WAL") when the first word,
/// compared ASCII-case-insensitively, is one of:
/// - SELECT / SHOW / EXPLAIN / BEGIN / COMMIT / ROLLBACK;
/// - an SPG-specific verb that does not go through the auto-commit record
///   path on the server (CHECKPOINT, COMPACT, WAIT).
///
/// Everything else returns `false`. The first word ends at whitespace,
/// `;` or `(`.
///
/// Deliberately conservative: anything not explicitly known to be
/// read-only falls through to "write a WAL record". `Database::execute`
/// additionally requires the engine to report `modified_catalog: true`
/// before logging.
///
/// `WITH` is intentionally *not* on the list. A PostgreSQL-style
/// data-modifying CTE (`WITH d AS (DELETE … RETURNING …) SELECT …`) mutates
/// state and must reach the WAL. A purely read-only `WITH … SELECT` should
/// already be kept out of the WAL by the `modified_catalog` check.
fn sql_is_read_only(sql: &str) -> bool {
    const READ_ONLY_HEADS: [&str; 9] = [
        "select",
        "show",
        "explain",
        "begin",
        "commit",
        "rollback",
        "checkpoint",
        "compact",
        "wait",
    ];
    let head = sql
        .trim_start()
        .split(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .next()
        .unwrap_or("");
    READ_ONLY_HEADS
        .iter()
        .any(|keyword| head.eq_ignore_ascii_case(keyword))
}
282
283/// Embedded SPG database handle. Owns an `Engine` + provides
284/// ergonomic wrappers around `execute` and `query`. Drops the
285/// engine on `Drop` — no WAL flush / fsync, because v6.10.3
286/// is in-memory only.
287#[derive(Debug)]
288pub struct Database {
289    engine: Engine,
290    /// v7.1 — persistence sidecar. When `Some(p)`, every
291    /// `execute(sql)` that mutates state appends a v3
292    /// `auto_commit_sql` WAL record + fsyncs before the call
293    /// returns; `Drop` writes a final catalog snapshot to
294    /// `<db_path>` so the next session boots from a clean
295    /// snapshot + an empty WAL. `None` = in-memory only (the
296    /// v6.10.3 shape).
297    persistence: Option<PersistenceCtx>,
298}
299
/// On-disk state for a [`Database`] opened via `Database::open_path`:
/// file paths, the open WAL handle, cold-segment bookkeeping and the
/// cross-process lock directory.
#[derive(Debug)]
#[allow(dead_code)] // `wal_path` is not read by the boot path (it uses its own local); kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Catalog snapshot file; rewritten (tmp + rename) by `checkpoint()`.
    db_path: PathBuf,
    /// WAL file path: `db_path`'s file name with ".wal" appended.
    wal_path: PathBuf,
    /// Handle to the WAL, opened create + append + read. `execute()`
    /// appends records to it and `checkpoint()` truncates it to zero.
    wal: File,
    /// Cached WAL file length so each `execute()` doesn't have
    /// to stat. Refreshed on append + on `checkpoint()` (which
    /// truncates back to 0).
    wal_len: u64,
    /// WAL size at or above which a mutating `execute()` finishes with
    /// an automatic `checkpoint()`.
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — cold-tier segment directory. `open_path` builds it as
    /// `<parent of db_path>/<file stem of db_path>.spg/segments/`.
    /// Segments produced by `freeze_oldest_to_cold` are persisted here as
    /// `seg_<id>.spg` files. The manifest (at
    /// `spg_manifest::manifest_path(db_path)`) records every active segment
    /// + its CRC32, so the next boot can verify + reload them.
    cold_segments_dir: PathBuf,
    /// Segment id → on-disk path for every cold segment known to this
    /// session, either reloaded from the manifest at boot or written by a
    /// freeze. `checkpoint()` rebuilds the manifest from this map.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
    /// v7.17.0 Phase 6.2 — cross-process exclusion lock: the
    /// `<db_path>.lock` directory, created with `fs::create_dir` at
    /// `open_path` entry. Directory creation is atomic, so a second process
    /// opening the same path while the first still holds the lock gets
    /// `EngineError::Unsupported("database is locked by another process
    /// …")`. NOTE(review): release on Drop (via `fs::remove_dir`) is
    /// implemented outside this section — confirm. Stale locks left by a
    /// crashed process must be cleared manually, or via the
    /// `Database::force_unlock()` the open error points to. SPG can't
    /// safely fingerprint who owned a stale directory without a libc dep,
    /// which would violate spg-embedded's zero-deps charter.
    lock_path: PathBuf,
}
332
333impl Database {
334    /// Open a fresh in-memory database. No WAL, no catalog
335    /// snapshot on disk — perfect for tests + short-lived
336    /// CLI tools.
337    #[must_use]
338    pub fn open_in_memory() -> Self {
339        Self {
340            engine: Engine::new().with_clock(wall_clock_micros),
341            persistence: None,
342        }
343    }
344
345    /// v7.1 — Open or create a persistent database backed by
346    /// the file at `db_path`. The WAL lives at `db_path` +
347    /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
348    /// path:
349    ///
350    /// 1. If `db_path` exists, restore the catalog snapshot.
351    /// 2. If the WAL exists, replay every record into the
352    ///    restored engine — the same recovery story
353    ///    `spg-server` uses.
354    /// 3. Open the WAL in append+sync mode so subsequent
355    ///    `execute()` writes durably commit (one fsync per
356    ///    mutation).
357    ///
358    /// `Drop` writes a final catalog snapshot + truncates the
359    /// WAL — operators that need a sync barrier at a specific
360    /// point use `checkpoint()` explicitly.
361    pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
362        let db_path = db_path.as_ref().to_path_buf();
363        let wal_path = {
364            let mut p = db_path.clone();
365            let name = p
366                .file_name()
367                .map(|n| {
368                    let mut s = n.to_os_string();
369                    s.push(".wal");
370                    s
371                })
372                .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
373            p.set_file_name(name);
374            p
375        };
376        if let Some(parent) = db_path.parent()
377            && !parent.as_os_str().is_empty()
378        {
379            std::fs::create_dir_all(parent).map_err(io_err)?;
380        }
381        // v7.17.0 Phase 6.2 — acquire cross-process exclusion
382        // lock before touching any catalog / WAL bytes. atomic
383        // mkdir on every supported platform; a second process
384        // opening the same path while the first is still alive
385        // hits the create_dir failure and gets a clear error.
386        let lock_path = {
387            let mut p = db_path.clone();
388            let name = p
389                .file_name()
390                .map(|n| {
391                    let mut s = n.to_os_string();
392                    s.push(".lock");
393                    s
394                })
395                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
396            p.set_file_name(name);
397            p
398        };
399        std::fs::create_dir(&lock_path).map_err(|e| {
400            if e.kind() == std::io::ErrorKind::AlreadyExists {
401                EngineError::Unsupported(format!(
402                    "database is locked by another process (or stale lock): {}; \
403                     remove the directory manually after confirming no other \
404                     process holds it, or call Database::force_unlock()",
405                    lock_path.display()
406                ))
407            } else {
408                io_err(e)
409            }
410        })?;
411        let mut engine = if db_path.exists() {
412            let bytes = std::fs::read(&db_path).map_err(io_err)?;
413            let engine = Engine::restore_envelope(&bytes).map_err(|e| {
414                EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
415                    "restore from {}: {e}",
416                    db_path.display()
417                )))
418            })?;
419            engine.with_clock(wall_clock_micros)
420        } else {
421            Engine::new().with_clock(wall_clock_micros)
422        };
423        // v7.1.4 — manifest-driven cold-segment reload. The
424        // manifest sidecar pairs the catalog snapshot CRC with a
425        // list of `(segment_id, path, crc32)` triples; verify
426        // before loading so a torn or stale manifest doesn't
427        // surface phantom data.
428        let cold_segments_dir = {
429            let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
430            let stem = db_path
431                .file_stem()
432                .unwrap_or_else(|| std::ffi::OsStr::new("db"))
433                .to_string_lossy()
434                .into_owned();
435            parent.join(format!("{stem}.spg")).join("segments")
436        };
437        let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
438        let manifest_pth = spg_manifest_path(&db_path);
439        if manifest_pth.exists() && db_path.exists() {
440            let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
441            if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
442                let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
443                let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
444                if snap_crc == m.catalog_crc32 {
445                    for entry in &m.cold_segments {
446                        if let Ok(seg_bytes) = std::fs::read(&entry.path) {
447                            let computed = spg_crypto::crc32::crc32(&seg_bytes);
448                            if computed != entry.crc32 {
449                                eprintln!(
450                                    "spg-embedded: manifest skip segment {}: CRC mismatch",
451                                    entry.segment_id
452                                );
453                                continue;
454                            }
455                            if engine.catalog().cold_segment(entry.segment_id).is_some() {
456                                // Already loaded via Catalog::clone path (shouldn't happen
457                                // since Engine::new + restore_envelope don't populate cold).
458                                continue;
459                            }
460                            let mut new_cat = engine.catalog().clone();
461                            if let Err(e) =
462                                new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
463                            {
464                                eprintln!(
465                                    "spg-embedded: manifest load segment {} failed: {e}",
466                                    entry.segment_id
467                                );
468                                continue;
469                            }
470                            engine.replace_catalog(new_cat);
471                            cold_segment_paths.insert(entry.segment_id, entry.path.clone());
472                        } else {
473                            eprintln!(
474                                "spg-embedded: manifest skip segment {}: file unreadable",
475                                entry.segment_id
476                            );
477                        }
478                    }
479                }
480            }
481        }
482        if wal_path.exists() {
483            let wal_bytes = std::fs::read(&wal_path).map_err(io_err)?;
484            if !wal_bytes.is_empty() {
485                replay_wal_into_engine(&wal_bytes, &mut engine)
486                    .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
487            }
488        }
489        let wal = OpenOptions::new()
490            .create(true)
491            .append(true)
492            .read(true)
493            .open(&wal_path)
494            .map_err(io_err)?;
495        let wal_len = wal.metadata().map_err(io_err)?.len();
496        Ok(Self {
497            engine,
498            persistence: Some(PersistenceCtx {
499                db_path,
500                wal_path,
501                wal,
502                wal_len,
503                checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
504                cold_segments_dir,
505                cold_segment_paths,
506                lock_path,
507            }),
508        })
509    }
510
511    /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
512    /// hot tier into a brand-new cold-tier segment + persist
513    /// it to disk. Same semantics as `spg-server`'s freezer
514    /// thread; embedded just runs the freeze synchronously on
515    /// the caller's thread. Persistence + manifest update
516    /// happen as part of the next `checkpoint()` (or on Drop).
517    pub fn freeze_oldest_to_cold(
518        &mut self,
519        table_name: &str,
520        index_name: &str,
521        max_rows: usize,
522    ) -> Result<spg_storage::FreezeReport, EngineError> {
523        let report = self
524            .engine
525            .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
526        if let Some(p) = &mut self.persistence {
527            std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
528            let final_path = p
529                .cold_segments_dir
530                .join(format!("seg_{}.spg", report.segment_id));
531            let tmp_path = p
532                .cold_segments_dir
533                .join(format!("seg_{}.spg.tmp", report.segment_id));
534            std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
535            std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
536            p.cold_segment_paths.insert(report.segment_id, final_path);
537        }
538        Ok(report)
539    }
540
541    /// v7.1 — override the auto-checkpoint WAL-size ceiling for
542    /// this `Database` instance. Default is
543    /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
544    /// setter wins. No-op when the database is in-memory.
545    pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
546        if let Some(p) = &mut self.persistence {
547            p.checkpoint_threshold_bytes = bytes.max(1);
548        }
549    }
550
551    /// v7.1 — flush a fresh catalog snapshot to `db_path` and
552    /// truncate the WAL. Idempotent; cheap when nothing has
553    /// happened since the last checkpoint. No-op when the
554    /// database is in-memory (no `db_path` configured).
555    ///
556    /// Called automatically when:
557    /// - the WAL grows past
558    ///   `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
559    ///   end of an `execute()`, and
560    /// - `Drop` runs (best-effort; checkpoint failure on drop is
561    ///   logged to stderr).
562    pub fn checkpoint(&mut self) -> Result<(), EngineError> {
563        let snapshot = self.engine.snapshot();
564        let Some(p) = &mut self.persistence else {
565            return Ok(());
566        };
567        // Snapshot first (atomic via tmp+rename), then WAL
568        // truncate. Same order as `spg-server`'s CHECKPOINT —
569        // a crash between the two leaves the WAL holding
570        // already-snapshotted ops, which replay cleanly on the
571        // next boot (idempotent for SPG's standard DDL/DML
572        // mutations).
573        let tmp = {
574            let mut t = p.db_path.clone();
575            let mut name = t
576                .file_name()
577                .map(std::ffi::OsStr::to_os_string)
578                .unwrap_or_default();
579            name.push(".tmp");
580            t.set_file_name(name);
581            t
582        };
583        std::fs::write(&tmp, &snapshot).map_err(io_err)?;
584        std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
585        // v7.1.4 — refresh the manifest so the next boot can
586        // reload cold segments alongside the snapshot. Bytes
587        // come from the freshly-written snapshot file (= the
588        // canonical CRC source).
589        if !p.cold_segment_paths.is_empty() {
590            let snap_crc = spg_crypto::crc32::crc32(&snapshot);
591            let entries: Vec<ColdSegmentEntry> = p
592                .cold_segment_paths
593                .iter()
594                .filter_map(|(&segment_id, path)| {
595                    let bytes = std::fs::read(path).ok()?;
596                    Some(ColdSegmentEntry {
597                        segment_id,
598                        path: path.clone(),
599                        crc32: spg_crypto::crc32::crc32(&bytes),
600                    })
601                })
602                .collect();
603            let manifest = CatalogManifest {
604                catalog_crc32: snap_crc,
605                cold_segments: entries,
606                wal_baseline_offset: 0,
607            };
608            let m_bytes = manifest.serialize();
609            let m_path = spg_manifest_path(&p.db_path);
610            if let Some(dir) = m_path.parent() {
611                std::fs::create_dir_all(dir).map_err(io_err)?;
612            }
613            let m_tmp = {
614                let mut t = m_path.clone();
615                let mut name = t
616                    .file_name()
617                    .map(std::ffi::OsStr::to_os_string)
618                    .unwrap_or_default();
619                name.push(".tmp");
620                t.set_file_name(name);
621                t
622            };
623            std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
624            std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
625        }
626        p.wal.set_len(0).map_err(io_err)?;
627        p.wal.seek(SeekFrom::Start(0)).map_err(io_err)?;
628        p.wal.sync_data().map_err(io_err)?;
629        p.wal_len = 0;
630        Ok(())
631    }
632
633    /// Restore a database from a previously-captured catalog
634    /// snapshot. Pairs with `Database::snapshot()` for
635    /// round-tripping in-memory state without going through
636    /// the `spg-server` WAL.
637    pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
638        let engine = Engine::restore_envelope(snapshot).map_err(|e| {
639            EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
640        })?;
641        Ok(Self {
642            engine,
643            persistence: None,
644        })
645    }
646
647    /// Take a catalog snapshot suitable for `Database::restore`.
648    /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
649    /// + version + payload); round-trips through every released
650    /// SPG version per the STABILITY contract.
651    #[must_use]
652    pub fn snapshot(&self) -> Vec<u8> {
653        self.engine.snapshot()
654    }
655
656    /// Execute a SQL statement and return the engine's
657    /// `QueryResult` verbatim. Pass-through for callers that
658    /// want to keep PG-flavoured column/row metadata.
659    ///
660    /// v7.1 — when the database was opened via `open_path`,
661    /// successful mutations are appended to the WAL + fsynced
662    /// before the call returns. A subsequent process crash will
663    /// recover state up to the last successful return from
664    /// `execute()`. Read-only statements (SELECT / SHOW /
665    /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
666    /// etc.) skip the WAL entirely.
667    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
668        let result = self.engine.execute(sql)?;
669        if self.persistence.is_some()
670            && !sql_is_read_only(sql)
671            && matches!(
672                &result,
673                QueryResult::CommandOk {
674                    modified_catalog: true,
675                    ..
676                }
677            )
678        {
679            // Append + sync the v3 record AFTER the in-memory
680            // exec succeeds, so a WAL record never describes a
681            // mutation that didn't actually apply. The crash
682            // window between in-memory commit and WAL fsync is
683            // bounded by one record — replay re-applies the
684            // statement idempotently on next boot if we crashed
685            // between (and SPG's DDL/DML are crash-idempotent at
686            // the granularities the wire protocol exposes).
687            let record = encode_v3_auto_commit(sql);
688            let p = self.persistence.as_mut().expect("checked above");
689            p.wal.write_all(&record).map_err(io_err)?;
690            p.wal.sync_data().map_err(io_err)?;
691            p.wal_len = p.wal_len.saturating_add(record.len() as u64);
692            if p.wal_len >= p.checkpoint_threshold_bytes {
693                self.checkpoint()?;
694            }
695        }
696        Ok(result)
697    }
698
699    /// v7.3.0 — typed-row variant of [`Database::query`]. Each
700    /// row decodes into a `T: FromSpgRow` so callers don't
701    /// pattern-match on `Value` themselves. Use [`spg_row!`] to
702    /// generate the impl, or write it by hand.
703    pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
704        let rows = self.query(sql)?;
705        rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
706    }
707
708    /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
709    /// strips the column-schema metadata for read-side
710    /// ergonomics. Errors on non-Rows results (DML / DDL
711    /// statements should go through `execute` instead).
712    pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
713        match self.engine.execute(sql)? {
714            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
715            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
716                "query() expects a SELECT — use execute() for DML/DDL".into(),
717            )),
718            // v7.5.0 — QueryResult is #[non_exhaustive]; any future
719            // variant is not a SELECT row stream, treat as Unsupported.
720            _ => Err(EngineError::Unsupported(
721                "query() expects a SELECT — use execute() for DML/DDL".into(),
722            )),
723        }
724    }
725
726    /// v7.16.0 — column-aware variant of [`Self::query`].
727    /// Returns the column schema vec alongside the rows so
728    /// adapters (the spg-sqlx Row impl most notably) can drive
729    /// name + type-based column lookups. Errors on non-Rows
730    /// results identically to `query`.
731    pub fn query_with_columns(
732        &mut self,
733        sql: &str,
734    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
735        match self.engine.execute(sql)? {
736            QueryResult::Rows { columns, rows } => {
737                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
738            }
739            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
740                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
741            )),
742            _ => Err(EngineError::Unsupported(
743                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
744            )),
745        }
746    }
747
748    /// v7.16.0 — column-aware variant of
749    /// [`Self::query_prepared`]. Same shape as
750    /// `query_with_columns` but driven from a prepared
751    /// statement + bound params.
752    pub fn query_prepared_with_columns(
753        &mut self,
754        stmt: &Statement,
755        params: &[Value],
756    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
757        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
758            QueryResult::Rows { columns, rows } => {
759                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
760            }
761            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
762                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
763            )),
764            _ => Err(EngineError::Unsupported(
765                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
766            )),
767        }
768    }
769
770    /// Borrow the underlying engine. Escape hatch for callers
771    /// that need access to `spg-engine` APIs not yet surfaced
772    /// here (transactions, EXPLAIN ANALYZE, etc.).
773    #[must_use]
774    pub const fn engine(&self) -> &Engine {
775        &self.engine
776    }
777
778    /// Mutable borrow of the underlying engine. Same intent as
779    /// `engine()` but for write-side APIs (e.g. inserting
780    /// directly through `Catalog::insert` for high-throughput
781    /// bulk loads that bypass SQL parsing).
782    pub const fn engine_mut(&mut self) -> &mut Engine {
783        &mut self.engine
784    }
785
786    /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
787    /// `execute_prepared` / `query_prepared` calls can re-bind
788    /// parameters without re-parsing. The returned [`Statement`]
789    /// is a thin handle around the AST + cached source SQL; it's
790    /// `Clone` so the same plan can drive many bind calls
791    /// concurrently (each call clones the AST and runs
792    /// placeholder substitution on the clone — the cached
793    /// plan stays intact).
794    ///
795    /// Plan caching follows the engine's existing version-aware
796    /// rule: a prepared `Statement` whose statistics version
797    /// has rolled (ANALYZE ran between prepare and execute)
798    /// will silently re-prepare under the hood. Callers don't
799    /// need to detect this.
800    ///
801    /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
802    /// `bind`-time `Value`s are passed as a slice; arity
803    /// mismatches surface as `EvalError::PlaceholderOutOfRange`
804    /// at `execute_prepared` time, not here.
805    ///
806    /// # Errors
807    /// Surfaces `EngineError` (parse error / plan rewrite
808    /// failure) from the underlying `Engine::prepare`.
809    pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
810        // Use the cached path so repeated prepares of the same
811        // SQL are O(1). The engine's plan cache stays shared
812        // across all callers of this Database — a single
813        // `PgPool`-shaped consumer (or, later, the spg-sqlx
814        // adapter) prepares once and reaps the win on every bind.
815        let stmt = self
816            .engine
817            .prepare_cached(sql)
818            .map_err(EngineError::Parse)?;
819        Ok(Statement {
820            stmt,
821            sql: sql.to_string(),
822        })
823    }
824
825    /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
826    /// executing. Returns `(parameter_oid_count, output_columns)`
827    /// where `output_columns` is empty for non-SELECT statements
828    /// or for SELECT shapes the describe planner can't resolve
829    /// (JOIN / subquery / unknown table). Wraps
830    /// `Engine::describe_prepared` so the spg-sqlx bridge can
831    /// surface PG-shape Describe replies for
832    /// `sqlx::query!()` compile-time validation.
833    ///
834    /// # Errors
835    /// Propagates parse errors from the underlying prepare path.
836    pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
837        let stmt = self
838            .engine
839            .prepare_cached(sql)
840            .map_err(EngineError::Parse)?;
841        Ok(self.engine.describe_prepared(&stmt))
842    }
843
844    /// v7.16.0 — execute a prepared statement with bound
845    /// parameters. Mirrors `Engine::execute_prepared`: clones
846    /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
847    ///
848    /// Persistence (WAL fsync + auto-checkpoint) follows the
849    /// same rules as `execute(sql)`: mutating statements get a
850    /// WAL record AFTER the in-memory exec succeeds. The WAL
851    /// record carries the substituted, bind-final SQL, so
852    /// replay reconstructs the same row state without needing
853    /// the original prepared `Statement` to still be alive.
854    ///
855    /// # Errors
856    /// Propagates engine errors. Param arity mismatch surfaces
857    /// as `EvalError::PlaceholderOutOfRange`.
858    pub fn execute_prepared(
859        &mut self,
860        stmt: &Statement,
861        params: &[Value],
862    ) -> Result<QueryResult, EngineError> {
863        let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
864        // WAL persistence on the bind-final SQL. Build the
865        // canonical Display form by re-printing the
866        // placeholder-substituted statement (cheap — the AST
867        // is already in hand from execute_prepared's internal
868        // clone) so replay's path is identical to the
869        // simple-query path.
870        if self.persistence.is_some()
871            && matches!(
872                &result,
873                QueryResult::CommandOk {
874                    modified_catalog: true,
875                    ..
876                }
877            )
878        {
879            // Render the AST back to SQL for WAL replay. The
880            // placeholder positions are already substituted in
881            // the executed clone; we re-substitute on a fresh
882            // clone here purely to obtain the canonical text.
883            let mut wal_stmt = stmt.stmt.clone();
884            // Use the engine's substitute_placeholders entry —
885            // exposed via execute_prepared above. Here we
886            // re-run the substitution only for Display.
887            crate::wal_render_with_params(&mut wal_stmt, params);
888            let canonical = format!("{wal_stmt}");
889            let record = encode_v3_auto_commit(&canonical);
890            let p = self.persistence.as_mut().expect("checked above");
891            p.wal.write_all(&record).map_err(io_err)?;
892            p.wal.sync_data().map_err(io_err)?;
893            p.wal_len = p.wal_len.saturating_add(record.len() as u64);
894            if p.wal_len >= p.checkpoint_threshold_bytes {
895                self.checkpoint()?;
896            }
897        }
898        Ok(result)
899    }
900
901    /// v7.16.0 — run a prepared SELECT with bound params and
902    /// return rows as `Vec<Vec<Value>>`, matching `query()`
903    /// shape. SELECTs are read-only so this never writes the
904    /// WAL.
905    ///
906    /// # Errors
907    /// Returns `Unsupported` if the prepared statement isn't a
908    /// SELECT (use `execute_prepared` for DML/DDL).
909    pub fn query_prepared(
910        &mut self,
911        stmt: &Statement,
912        params: &[Value],
913    ) -> Result<Vec<Vec<Value>>, EngineError> {
914        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
915            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
916            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
917                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
918            )),
919            _ => Err(EngineError::Unsupported(
920                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
921            )),
922        }
923    }
924
925    /// v7.2.0 — run `body` inside an implicit `BEGIN` /
926    /// `COMMIT` pair. The body receives `&mut Database` so it
927    /// can `execute()` / `query()` like any other code path;
928    /// the only difference is that every write in the body
929    /// lands inside one transaction, and a returned `Err` from
930    /// the body triggers `ROLLBACK` before the error propagates.
931    ///
932    /// Nested calls are not supported — SPG's transaction
933    /// model is single-writer with explicit `BEGIN` /
934    /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
935    /// would hit `EngineError::Unsupported("nested
936    /// transaction")` at the inner `BEGIN`.
937    pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
938    where
939        F: FnOnce(&mut Self) -> Result<R, EngineError>,
940    {
941        self.execute("BEGIN")?;
942        match body(self) {
943            Ok(value) => {
944                self.execute("COMMIT")?;
945                Ok(value)
946            }
947            Err(e) => {
948                // Best-effort rollback. If ROLLBACK itself
949                // fails (rare — the engine reports it via
950                // `Unsupported` only when there's no active
951                // TX, which can't happen here) we surface the
952                // original body error, not the rollback error.
953                let _ = self.execute("ROLLBACK");
954                Err(e)
955            }
956        }
957    }
958}
959
960impl Default for Database {
961    fn default() -> Self {
962        Self::open_in_memory()
963    }
964}
965
/// v7.7.5 — point-in-time counters produced by
/// [`Database::metrics`]. A plain `Copy` value made only of
/// integers and a flag, so it is cheap to build, copy, and
/// compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct EmbeddedMetrics {
    /// Live rows summed over every user table. Hot tier only;
    /// rows already moved to cold segment files are not counted.
    pub hot_rows: u64,
    /// `Table::hot_bytes` summed over every user table — the
    /// figure compared against the freezer's `hot_tier_bytes`
    /// budget.
    pub hot_bytes: u64,
    /// Cold-tier segments registered in the catalog. Tombstoned
    /// slots count too (segments retired by compaction whose
    /// file may still be present on disk).
    pub cold_segments: u64,
    /// Number of user tables (any future engine-managed internal
    /// tables are excluded).
    pub tables: u64,
    /// WAL size as of the last `execute()` / `checkpoint()`.
    /// Always zero for an in-memory database.
    pub wal_bytes: u64,
    /// Whether WAL + checkpoint persistence is active, i.e. the
    /// database came from `open_path`.
    pub persistent: bool,
}
993
/// v7.2.1 — handle returned by `spawn_background_freezer`.
/// Dropping it raises the worker's shutdown flag and joins the
/// thread. A `Database` (or the `Arc<Mutex<Database>>` sharing
/// it) can therefore be dropped safely once this handle is gone.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    shutdown: Arc<AtomicBool>,
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — signal the worker to stop, then wait for it to
    /// exit. Repeat calls are harmless: only the first one still
    /// owns a thread to join. `Drop` calls this as well.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let Some(worker) = self.join.take() else {
            return;
        };
        // The worker's panic payload, if any, is discarded.
        let _ = worker.join();
    }
}

impl Drop for FreezerHandle {
    fn drop(&mut self) {
        Self::stop(self);
    }
}
1021
/// v7.2.1 — knobs for `Database::spawn_background_freezer`.
///
/// [`FreezerOptions::default`] reproduces the `spg-server`
/// operating point. Override individual fields with
/// struct-update syntax (`FreezerOptions { tick, ..Default::default() }`).
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// Tick interval. Worker wakes every `tick`, checks the
    /// catalog's `hot_tier_bytes`, and freezes if over budget.
    pub tick: Duration,
    /// Hot-tier byte budget. Exceeded → next tick freezes the
    /// largest table's oldest `batch_rows` rows into a new
    /// cold segment.
    pub hot_tier_bytes: u64,
    /// Max rows the freezer demotes per fire.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. When the catalog holds
    /// strictly MORE than this many cold segments across all
    /// tables, the freezer runs a compaction pass right after a
    /// successful freeze. Set to `usize::MAX` to disable
    /// auto-compact entirely. The default is `64`, matching the
    /// `spg-server` operating point for
    /// SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges,
    /// in bytes. Default 64 MiB, mirroring `spg-server`. Small
    /// segments below this size are merge candidates;
    /// segments at or above stay untouched.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    fn default() -> Self {
        // Match the `spg-server` freezer's default operating
        // point (SPG_HOT_TIER_BYTES = 4 GiB, batch 1000 rows,
        // tick every 1 s) so embedded behaviour is predictable
        // for operators familiar with the server.
        Self {
            tick: Duration::from_secs(1),
            hot_tier_bytes: 4 * 1024 * 1024 * 1024,
            batch_rows: 1000,
            compact_when_segments_exceed: 64,
            compact_target_bytes: 64 * 1024 * 1024,
        }
    }
}
1063
1064impl Database {
1065    /// v7.7.4 — observe the catalog's cold-segment count.
1066    /// Useful for tests + dashboards that want to verify
1067    /// auto-compaction is firing.
1068    #[must_use]
1069    pub fn cold_segment_count(&self) -> usize {
1070        self.engine.catalog().cold_segment_count()
1071    }
1072
1073    /// v7.7.5 — observability snapshot. Returns a point-in-time
1074    /// view of the engine + persistence counters. Cheap (no
1075    /// locks beyond the existing `&self` borrow), so safe to
1076    /// call from a hot metrics-scrape path.
1077    ///
1078    /// Fields mirror the operational dashboard
1079    /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
1080    /// minus the network counters that don't apply to embedded.
1081    #[must_use]
1082    pub fn metrics(&self) -> EmbeddedMetrics {
1083        let cat = self.engine.catalog();
1084        let mut hot_rows: u64 = 0;
1085        let mut hot_bytes: u64 = 0;
1086        for name in cat.table_names() {
1087            if let Some(t) = cat.get(&name) {
1088                hot_rows = hot_rows.saturating_add(t.row_count() as u64);
1089                hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
1090            }
1091        }
1092        let (wal_bytes, persistent) = match &self.persistence {
1093            Some(p) => (p.wal_len, true),
1094            None => (0, false),
1095        };
1096        EmbeddedMetrics {
1097            hot_rows,
1098            hot_bytes,
1099            cold_segments: cat.cold_segment_count() as u64,
1100            tables: cat.table_count() as u64,
1101            wal_bytes,
1102            persistent,
1103        }
1104    }
1105
1106    /// v7.2.1 — spawn a background thread that periodically
1107    /// runs `freeze_oldest_to_cold` when the catalog-wide hot
1108    /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
1109    /// pattern matches the v7.2 sharing story: callers wrap
1110    /// their `Database` in `Arc::new(Mutex::new(db))` once,
1111    /// then clone the Arc for the worker + for foreground
1112    /// access. Return value is a handle whose `Drop` joins the
1113    /// worker.
1114    ///
1115    /// Picks the freeze target the same way `spg-server`'s
1116    /// freezer does: largest-`hot_bytes` user table with at
1117    /// least one BTree integer-PK index. Tables without a
1118    /// freezable index are skipped silently.
1119    pub fn spawn_background_freezer(
1120        db: Arc<Mutex<Database>>,
1121        opts: FreezerOptions,
1122    ) -> FreezerHandle {
1123        let shutdown = Arc::new(AtomicBool::new(false));
1124        let shutdown_for_thread = Arc::clone(&shutdown);
1125        let join = thread::Builder::new()
1126            .name("spg-embedded-freezer".into())
1127            .spawn(move || {
1128                background_freezer_loop(db, opts, shutdown_for_thread);
1129            })
1130            .expect("spawn background freezer thread");
1131        FreezerHandle {
1132            shutdown,
1133            join: Some(join),
1134        }
1135    }
1136}
1137
1138/// v7.2.1 — the freezer's main loop, factored out so the
1139/// `Database::spawn_background_freezer` path stays readable.
1140fn background_freezer_loop(
1141    db: Arc<Mutex<Database>>,
1142    opts: FreezerOptions,
1143    shutdown: Arc<AtomicBool>,
1144) {
1145    // Sleep in short slices so a shutdown request resolves
1146    // quickly (vs sleeping the full tick).
1147    let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
1148    let mut last_tick = std::time::Instant::now();
1149    loop {
1150        if shutdown.load(Ordering::Acquire) {
1151            return;
1152        }
1153        thread::sleep(slice);
1154        if last_tick.elapsed() < opts.tick {
1155            continue;
1156        }
1157        last_tick = std::time::Instant::now();
1158        let Ok(mut guard) = db.lock() else {
1159            return;
1160        };
1161        if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
1162            continue;
1163        }
1164        let Some((table, index)) = pick_freeze_target(&guard) else {
1165            continue;
1166        };
1167        let row_count = guard
1168            .engine
1169            .catalog()
1170            .get(&table)
1171            .map_or(0, spg_storage::Table::row_count);
1172        let to_freeze = opts.batch_rows.min(row_count);
1173        if to_freeze == 0 {
1174            continue;
1175        }
1176        if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
1177            eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
1178            continue;
1179        }
1180        // v7.7.4 — auto-compact. If the catalog now carries
1181        // more cold segments than the configured threshold,
1182        // run a single compaction pass. Failures are reported
1183        // but don't kill the loop; the next tick will retry.
1184        let count = guard.engine.catalog().cold_segment_count();
1185        if count > opts.compact_when_segments_exceed {
1186            if let Err(e) = guard
1187                .engine
1188                .compact_cold_segments_with_target(opts.compact_target_bytes)
1189            {
1190                eprintln!(
1191                    "spg-embedded: background compact failed (segments={count}, \
1192                     threshold={}): {e:?}",
1193                    opts.compact_when_segments_exceed,
1194                );
1195            }
1196        }
1197    }
1198}
1199
1200/// v7.2.1 — pick the highest-`hot_bytes` user table with a
1201/// BTree integer-PK index. Returns `(table, index_name)` so the
1202/// caller can dispatch through `freeze_oldest_to_cold`.
1203fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
1204    let cat = db.engine.catalog();
1205    let mut best: Option<(String, String, u64)> = None;
1206    for name in cat.table_names() {
1207        let Some(t) = cat.get(&name) else { continue };
1208        if t.row_count() == 0 {
1209            continue;
1210        }
1211        let cols = &t.schema().columns;
1212        let Some(idx) = t.indices().iter().find(|i| {
1213            matches!(i.kind, spg_storage::IndexKind::BTree(_))
1214                && i.column_position < cols.len()
1215                && matches!(
1216                    cols[i.column_position].ty,
1217                    spg_storage::DataType::SmallInt
1218                        | spg_storage::DataType::Int
1219                        | spg_storage::DataType::BigInt
1220                )
1221        }) else {
1222            continue;
1223        };
1224        let hot = t.hot_bytes();
1225        match best {
1226            None => best = Some((name, idx.name.clone(), hot)),
1227            Some((_, _, best_hot)) if hot > best_hot => {
1228                best = Some((name, idx.name.clone(), hot));
1229            }
1230            _ => {}
1231        }
1232    }
1233    best.map(|(t, i, _)| (t, i))
1234}
1235
1236/// v7.7.6 — replay the first `to_seq` records of the WAL at
1237/// `wal_path` into a fresh engine and write the resulting
1238/// catalog snapshot to `out_db_path`. Same semantics as
1239/// `spg revert --wal … --to-seq N --out …` from the CLI:
1240///
1241///   - `to_seq == 0` → snapshot is the empty catalog
1242///   - WAL records beyond `to_seq` are not applied
1243///   - durability-checkpoint markers (v3 type 0x02) are
1244///     consumed without counting against the budget
1245///
1246/// Returns the number of statements actually applied
1247/// (`≤ to_seq`). The output snapshot is byte-identical to
1248/// what `Database::open_path(out_db_path)` would consume on
1249/// a subsequent open.
1250///
1251/// This is the "rewind" operator for an embedded database
1252/// that has been corrupted by a poison statement or a
1253/// half-applied migration. Pair with `cold_segment_paths`
1254/// preservation if your cold-tier files are still on disk.
1255///
1256/// # Errors
1257///
1258/// - `wal_path` unreadable or truncated mid-record
1259/// - WAL record decodes to invalid UTF-8 SQL
1260/// - WAL record's SQL is rejected by the engine
1261/// - `out_db_path` unwritable
1262pub fn revert_wal_to_seq(
1263    wal_path: impl AsRef<Path>,
1264    to_seq: u64,
1265    out_db_path: impl AsRef<Path>,
1266) -> Result<u64, EngineError> {
1267    let wal_bytes = std::fs::read(wal_path.as_ref()).map_err(io_err)?;
1268    let mut engine = Engine::new();
1269    let mut applied = 0u64;
1270    let mut cur = 0usize;
1271    while cur < wal_bytes.len() && applied < to_seq {
1272        let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
1273        cur += total;
1274        if sql_bytes.is_empty() {
1275            continue;
1276        }
1277        let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
1278            EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1279                "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
1280            )))
1281        })?;
1282        engine.execute(sql)?;
1283        applied += 1;
1284    }
1285    let snapshot = engine.snapshot();
1286    std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
1287    Ok(applied)
1288}
1289
1290/// v7.7.6 — decode one WAL record from a byte tail. Returns
1291/// `(sql_bytes, header_plus_payload_len)`. Handles the three
1292/// on-disk formats (v1 / v2 / v3) the same way the CLI
1293/// `decode_one_record` and the engine's `replay_wal_bytes`
1294/// do. CRCs are not re-validated; the caller's intent is
1295/// "apply", not "validate".
1296fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
1297    if tail.len() < 4 {
1298        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1299            format!("WAL truncated record: {} < 4 header bytes", tail.len()),
1300        )));
1301    }
1302    let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
1303    let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
1304    let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
1305    let len_mask = if is_v3 {
1306        !(WAL_V2_SENTINEL | WAL_V3_FLAG)
1307    } else {
1308        !WAL_V2_SENTINEL
1309    };
1310    let rec_len = (raw_len & len_mask) as usize;
1311    let header_len = if is_v3 {
1312        9
1313    } else if is_v2 {
1314        8
1315    } else {
1316        4
1317    };
1318    if tail.len() < header_len + rec_len {
1319        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1320            format!(
1321                "WAL truncated record: header+payload {} > available {}",
1322                header_len + rec_len,
1323                tail.len()
1324            ),
1325        )));
1326    }
1327    let payload = &tail[header_len..header_len + rec_len];
1328    let sql_bytes = if is_v3 {
1329        let type_byte = tail[8];
1330        // v3 type 0x01 = auto_commit_sql (payload = SQL).
1331        // v3 type 0x02 = durability marker (payload = u64
1332        // offset, no SQL to apply). Anything else is unknown.
1333        if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
1334            payload.to_vec()
1335        } else {
1336            // Caller treats empty payload as a skip-marker.
1337            Vec::new()
1338        }
1339    } else {
1340        payload.to_vec()
1341    };
1342    Ok((sql_bytes, header_len + rec_len))
1343}
1344
1345impl Drop for Database {
1346    fn drop(&mut self) {
1347        // v7.1 — best-effort final checkpoint when a persistent
1348        // Database leaves scope. Failures here go to stderr so
1349        // operators see them, but Drop can't propagate errors —
1350        // the WAL itself is already durable, so a checkpoint
1351        // miss only means the next boot replays a few more
1352        // records than strictly necessary.
1353        if self.persistence.is_some() {
1354            if let Err(e) = self.checkpoint() {
1355                eprintln!(
1356                    "spg-embedded: final checkpoint on Drop failed: {e:?} \
1357                     (WAL is intact; next open_path will replay)"
1358                );
1359            }
1360        }
1361        // v7.17.0 Phase 6.2 — release the cross-process lock on
1362        // clean shutdown. Failure is logged but never panics;
1363        // the operator can clear a stale lock via
1364        // `Database::force_unlock` if a crash kept the
1365        // directory around.
1366        if let Some(ctx) = &self.persistence
1367            && ctx.lock_path.exists()
1368        {
1369            if let Err(e) = std::fs::remove_dir(&ctx.lock_path) {
1370                eprintln!(
1371                    "spg-embedded: lock release on Drop failed for {}: {e:?}",
1372                    ctx.lock_path.display()
1373                );
1374            }
1375        }
1376    }
1377}
1378
1379impl Database {
1380    /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
1381    /// Use when a previous process crashed mid-session and
1382    /// left `<db_path>.lock` behind. Operators should confirm
1383    /// no other process is currently using the database before
1384    /// calling this — SPG cannot fingerprint stale-vs-live
1385    /// without a libc dep, which would violate spg-embedded's
1386    /// zero-deps charter.
1387    pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
1388        let lock_path = {
1389            let mut p = db_path.as_ref().to_path_buf();
1390            let name = p
1391                .file_name()
1392                .map(|n| {
1393                    let mut s = n.to_os_string();
1394                    s.push(".lock");
1395                    s
1396                })
1397                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1398            p.set_file_name(name);
1399            p
1400        };
1401        if !lock_path.exists() {
1402            return Ok(());
1403        }
1404        std::fs::remove_dir(&lock_path).map_err(io_err)
1405    }
1406}
1407
1408/// v7.1 — turn a `std::io::Error` into the workspace's
1409/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
1410/// the closest existing variant — io failures during boot or
1411/// during a WAL append surface as a storage-layer fault to
1412/// callers, which keeps the public error enum unchanged.
1413fn io_err(e: std::io::Error) -> EngineError {
1414    EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
1415}
1416
1417/// v7.2.2 — `Database` is `Send`, so the recommended sharing
1418/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
1419///
1420/// ```no_run
1421/// use std::sync::{Arc, Mutex};
1422/// use spg_embedded::Database;
1423///
1424/// let db = Database::open_in_memory();
1425/// let shared = Arc::new(Mutex::new(db));
1426/// let shared_for_worker = Arc::clone(&shared);
1427/// std::thread::spawn(move || {
1428///     let mut guard = shared_for_worker.lock().unwrap();
1429///     guard.execute("INSERT INTO t VALUES (1)").unwrap();
1430/// });
1431/// ```
1432///
1433/// Internal `RwLock`-wrapped state — letting many threads
1434/// hold concurrent `&Database` for `SELECT` without contending
1435/// — is parked as STABILITY § "Out of v7.2"; multi-reader
1436/// embedded throughput needs a planner-side change to release
1437/// the engine read lock between scans, which is the v7.x
1438/// "Choice A" line of work already documented in v6.9.1's
1439/// carve-out.
1440#[allow(dead_code)]
1441fn _database_is_send() {
1442    fn assert_send<T: Send>() {}
1443    assert_send::<Database>();
1444}
1445
1446/// v6.10.3 — trait that maps a row's columns onto a user
1447/// struct's fields. v7.3.0 ships the [`spg_row!`] declarative
1448/// macro that generates `impl FromSpgRow for YourStruct` from
1449/// a struct definition (no proc-macro, no syn/quote/
1450/// proc-macro2 deps — the workspace's "0 external deps"
1451/// policy holds).
1452///
1453/// Implementors map a row's columns onto a user struct's
1454/// fields. Errors surface as `EngineError::Unsupported` so the
1455/// caller's error type stays uniform.
1456pub trait FromSpgRow: Sized {
1457    /// Decode one query result row into `Self`. Called once per
1458    /// row by [`Database::query_typed`]. The slice length equals
1459    /// the number of columns in the SELECT projection.
1460    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
1461}
1462
/// v7.3.0 — declarative macro that generates a [`FromSpgRow`] impl
/// for a user struct.
///
/// Avoids proc-macro deps (syn/quote/proc-macro2) so the workspace's
/// 0-deps policy holds. The trade-off against `#[derive(SpgRow)]` is
/// that the macro takes the entire struct definition (fields + types)
/// as input, rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns), and
/// `Option<T>` of any of the above. In general, any type implementing
/// [`FromSpgValue`] is supported.
///
/// # Mapping rules
///
/// - Columns are matched to fields **by position**, in declaration
///   order. Column names are not consulted.
/// - A row with fewer columns than fields fails with
///   `EngineError::Unsupported` naming the first missing field.
/// - Surplus trailing columns are ignored.
/// - A cell rejected by the field type's decoder fails with
///   `EngineError::Unsupported` naming the field and carrying the
///   [`FromSpgValue`] diagnostic.
///
/// The generated struct always derives `Debug` and `Clone`. Do not
/// repeat those derives in the attributes you pass in, or the impls
/// will conflict. A trailing comma after the last field is optional.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(
                row: &[$crate::Value],
            ) -> ::std::result::Result<Self, $crate::EngineError> {
                // Columns are consumed positionally, one per field in
                // declaration order. For a field-less struct the iterator
                // is never advanced, hence the `unused_mut` allowance.
                #[allow(unused_mut)]
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter.next().ok_or_else(|| {
                            $crate::EngineError::Unsupported(::std::format!(
                                "spg_row! {}: missing column for field `{}`",
                                ::std::stringify!($name),
                                ::std::stringify!($field)
                            ))
                        })?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v).map_err(|e| {
                            $crate::EngineError::Unsupported(::std::format!(
                                "spg_row! {}: column `{}`: {}",
                                ::std::stringify!($name),
                                ::std::stringify!($field),
                                e
                            ))
                        })?
                    };
                )*
                // Fully qualified so a caller-side item named `Ok` cannot
                // hijack the expansion.
                ::std::result::Result::Ok(Self { $($field,)* })
            }
        }
    };
}
1539
1540/// v7.3.0 — per-column decoder used by `spg_row!`. Surface
1541/// covers every numeric / text / bytes / bool variant in
1542/// `Value`, plus `Option<T>` for nullable columns.
1543pub trait FromSpgValue: Sized {
1544    /// Decode one cell into `Self`. The returned `&'static str`
1545    /// is a short diagnostic for type mismatches (e.g. `"expected
1546    /// integer, got TEXT"`); callers wrap it into their own
1547    /// error type.
1548    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
1549}
1550
1551macro_rules! impl_from_value_int {
1552    ($($t:ty),* $(,)?) => {
1553        $(
1554            impl FromSpgValue for $t {
1555                fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1556                    match v {
1557                        Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
1558                        Value::Int(n)      => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
1559                        Value::BigInt(n)   => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
1560                        Value::Null        => Err("NULL in non-Option int column"),
1561                        _ => Err("non-integer value in int column"),
1562                    }
1563                }
1564            }
1565        )*
1566    };
1567}
1568impl_from_value_int!(i16, i32, i64);
1569
1570impl FromSpgValue for f32 {
1571    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1572        match v {
1573            Value::Float(f) => Ok(*f as f32),
1574            Value::Null => Err("NULL in non-Option float column"),
1575            _ => Err("non-float value in float column"),
1576        }
1577    }
1578}
1579
1580impl FromSpgValue for f64 {
1581    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1582        match v {
1583            Value::Float(f) => Ok(*f),
1584            Value::Null => Err("NULL in non-Option float column"),
1585            _ => Err("non-float value in float column"),
1586        }
1587    }
1588}
1589
1590impl FromSpgValue for bool {
1591    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1592        match v {
1593            Value::Bool(b) => Ok(*b),
1594            Value::Null => Err("NULL in non-Option bool column"),
1595            _ => Err("non-bool value in bool column"),
1596        }
1597    }
1598}
1599
1600impl FromSpgValue for String {
1601    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1602        match v {
1603            Value::Text(s) => Ok(s.clone()),
1604            Value::Null => Err("NULL in non-Option text column"),
1605            _ => Err("non-text value in String column"),
1606        }
1607    }
1608}
1609
1610impl FromSpgValue for Vec<f32> {
1611    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1612        match v {
1613            Value::Vector(xs) => Ok(xs.clone()),
1614            Value::Null => Err("NULL in non-Option vector column"),
1615            _ => Err("non-vector value in Vec<f32> column"),
1616        }
1617    }
1618}
1619
1620impl<T: FromSpgValue> FromSpgValue for Option<T> {
1621    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1622        match v {
1623            Value::Null => Ok(None),
1624            other => T::from_spg_value(other).map(Some),
1625        }
1626    }
1627}
1628
#[cfg(test)]
mod tests {
    // Unit tests for the embedded entry point plus the row/cell
    // decoders (`FromSpgRow`, `FromSpgValue`, `spg_row!`).
    use super::*;

    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL, name TEXT)")
            .unwrap();
        db.execute("INSERT INTO t VALUES (1, 'alice')").unwrap();
        db.execute("INSERT INTO t VALUES (2, 'bob')").unwrap();
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(1) => {}
            other => panic!("expected Int(1), got {other:?}"),
        }
    }

    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        let r = db.query("INSERT INTO t VALUES (1)");
        assert!(r.is_err(), "query() on INSERT must error");
    }

    #[test]
    fn snapshot_roundtrip() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        db.execute("INSERT INTO t VALUES (42)").unwrap();
        let bytes = db.snapshot();
        let mut restored = Database::restore(&bytes).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(42) => {}
            other => panic!("expected Int(42), got {other:?}"),
        }
    }

    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                match row.first() {
                    Some(Value::Int(n)) => Ok(Self { _id: *n }),
                    _ => Err(EngineError::Unsupported("bad id".into())),
                }
            }
        }
        let row = vec![Value::Int(7)];
        let _u = User::from_spg_row(&row).unwrap();
    }

    /// Per-cell decoders behind `spg_row!`: range-checked integer
    /// conversion, NULL handling, and variant mismatches.
    #[test]
    fn from_spg_value_decoders() {
        assert_eq!(i16::from_spg_value(&Value::Int(7)), Ok(7));
        assert_eq!(
            i16::from_spg_value(&Value::Int(70_000)),
            Err("Int does not fit target int type")
        );
        assert_eq!(i64::from_spg_value(&Value::Int(-5)), Ok(-5));
        assert_eq!(
            i32::from_spg_value(&Value::Null),
            Err("NULL in non-Option int column")
        );
        assert_eq!(Option::<i32>::from_spg_value(&Value::Null), Ok(None));
        assert_eq!(Option::<i32>::from_spg_value(&Value::Int(1)), Ok(Some(1)));
        assert_eq!(f64::from_spg_value(&Value::Float(1.5)), Ok(1.5));
        assert_eq!(f32::from_spg_value(&Value::Float(0.5)), Ok(0.5));
        assert_eq!(bool::from_spg_value(&Value::Bool(true)), Ok(true));
        assert_eq!(
            String::from_spg_value(&Value::Text("alice".into())),
            Ok(String::from("alice"))
        );
        assert_eq!(
            String::from_spg_value(&Value::Int(1)),
            Err("non-text value in String column")
        );
    }

    /// `spg_row!` maps columns to fields by position, decodes NULL into
    /// `Option` fields, and names the offending field on failure.
    #[test]
    fn spg_row_macro_maps_columns_by_position() {
        spg_row! {
            struct Pair {
                id: i32,
                label: Option<String>,
            }
        }

        let pair = Pair::from_spg_row(&[Value::Int(3), Value::Null]).unwrap();
        assert_eq!(pair.id, 3);
        assert!(pair.label.is_none());

        match Pair::from_spg_row(&[Value::Int(3)]) {
            Err(EngineError::Unsupported(msg)) => {
                assert!(msg.contains("missing column for field `label`"), "{msg}")
            }
            Ok(_) => panic!("short row must not decode"),
            Err(_) => panic!("short row must fail with Unsupported"),
        }

        match Pair::from_spg_row(&[Value::Text("x".into()), Value::Null]) {
            Err(EngineError::Unsupported(msg)) => {
                assert!(msg.contains("column `id`"), "{msg}")
            }
            Ok(_) => panic!("TEXT must not decode into an i32 field"),
            Err(_) => panic!("type mismatch must fail with Unsupported"),
        }
    }
}