Skip to main content

spg_embedded/
lib.rs

1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//!     println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//!   crash-consistent WAL + periodic checkpoint snapshot. The
32//!   on-disk format is byte-identical to what `spg-server`
33//!   produces, so a database can move between modes without
34//!   conversion.
35//! - **Durability**: every `execute()` that mutates calls
36//!   `fsync` before returning `Ok`. There is no group commit
37//!   in embedded mode — every commit pays one fsync. If you
38//!   need batch throughput, wrap multiple statements in
39//!   [`Database::with_transaction`] which fsyncs only at
40//!   commit.
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//!   Share across threads via `Arc<Mutex<Database>>`. The
43//!   single-writer model is intentional — see
44//!   [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//!   moves cold rows to disk-resident segments while you keep
47//!   serving requests. It runs in a dedicated thread; drop the
48//!   returned [`FreezerHandle`] (or call `stop()`) for clean
49//!   shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//!   [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//!   them with a wildcard arm so future v7.x releases can add
53//!   variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//!   Malformed SQL, type mismatches, missing tables — all
59//!   return `Err(EngineError::…)`. If you observe a panic on
60//!   a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//!   violations (e.g., catalog snapshot magic mismatch, WAL
63//!   record CRC sentinel corruption that survived the boot-
64//!   time validation). These represent silent disk corruption
65//!   and an unwind would leak inconsistent state, so the
66//!   release profile uses `panic = abort` — your host process
67//!   dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//!   `--profile release-dbg` (keeps unwind tables) and use
70//!   `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{Engine, EngineError, QueryResult};
80pub use spg_storage::Value;
81
82use std::collections::BTreeMap;
83use std::fs::{File, OpenOptions};
84use std::io::{Seek, SeekFrom, Write};
85use std::path::{Path, PathBuf};
86use std::sync::atomic::{AtomicBool, Ordering};
87use std::sync::{Arc, Mutex};
88use std::thread::{self, JoinHandle};
89use std::time::{Duration, SystemTime, UNIX_EPOCH};
90
/// v7.11.3 — wall-clock source injected into the embedded
/// `Engine` via `with_clock`. Reports microseconds elapsed since
/// the Unix epoch so SQL's `NOW()` / `CURRENT_TIMESTAMP` /
/// `CURRENT_DATE` rewrite layer works without the caller wiring
/// up a clock of their own.
///
/// Edge cases:
/// - a system clock set before the epoch yields `0`;
/// - a microsecond count too large for `i64` saturates to
///   `i64::MAX`.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
102
103use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
104
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Kept private so callers can't mis-frame records; the v3 layout
// is the same the server uses, so a `spg-server` boot can read a
// database an embedded process wrote and vice versa.
//
// Every record starts with a little-endian u32 "length word".
// Its two top bits are flags; the remaining bits hold the SQL
// payload length.

// Bit 31 of the length word. When set, the record has an 8-byte
// header (length word + 4 more bytes) instead of the bare 4-byte
// length prefix.
const WAL_V2_SENTINEL: u32 = 0x8000_0000;
// Bit 30 of the length word. Only meaningful together with
// `WAL_V2_SENTINEL`; marks a v3 record whose header is 9 bytes
// (length word, CRC32, one type byte).
const WAL_V3_FLAG: u32 = 0x4000_0000;
// v3 type byte for a record whose payload is one auto-committed
// SQL statement.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
112
/// v7.1 — auto-checkpoint threshold in bytes. Once the WAL has
/// grown to at least this size, the `execute()` call that pushed
/// it there finishes with a `checkpoint()` so the WAL stays
/// bounded.
///
/// Read from the `SPG_EMBEDDED_CHECKPOINT_BYTES` environment
/// variable. An unset variable, a value that does not parse as a
/// `u64`, or a value of `0` all fall back to 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;
    match std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(parsed) if parsed > 0 => parsed,
            _ => FALLBACK_BYTES,
        },
        Err(_) => FALLBACK_BYTES,
    }
}
124
125/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
126///
127/// ```text
128/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
129/// [u32 LE crc32 over (type_byte || sql_bytes)]
130/// [u8 type = 0x01]
131/// [sql bytes]
132/// ```
133fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
134    let payload = sql.as_bytes();
135    let mut crc_buf = Vec::with_capacity(1 + payload.len());
136    crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
137    crc_buf.extend_from_slice(payload);
138    let crc = spg_crypto::crc32::crc32(&crc_buf);
139    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
140    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
141    out.extend_from_slice(&header);
142    out.extend_from_slice(&crc.to_le_bytes());
143    out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
144    out.extend_from_slice(payload);
145    out
146}
147
148/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
149/// Returns the count of records successfully applied. A truncated
150/// trailing record (mid-write torn) is dropped silently — the
151/// same recovery story `spg-server`'s boot path uses.
152fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
153    let mut applied = 0usize;
154    let mut cur = 0usize;
155    while cur < wal_bytes.len() {
156        if wal_bytes.len() - cur < 4 {
157            // Trailing partial header — torn write, drop and stop.
158            break;
159        }
160        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
161        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
162        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
163        let len_mask = if is_v3 {
164            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
165        } else {
166            !WAL_V2_SENTINEL
167        };
168        let rec_len = (raw_len & len_mask) as usize;
169        let header_len = if is_v3 {
170            9
171        } else if is_v2 {
172            8
173        } else {
174            4
175        };
176        if wal_bytes.len() - cur < header_len + rec_len {
177            // Torn record at the tail — drop, stop.
178            break;
179        }
180        if is_v3 {
181            let type_byte = wal_bytes[cur + 8];
182            match type_byte {
183                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
184                0x02 => {
185                    // durability_checkpoint marker — skip, no SQL.
186                    cur += header_len + rec_len;
187                    continue;
188                }
189                other => {
190                    return Err(format!(
191                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
192                    ));
193                }
194            }
195        }
196        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
197        let sql = std::str::from_utf8(sql_bytes)
198            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
199        engine
200            .execute(sql)
201            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
202        applied += 1;
203        cur += header_len + rec_len;
204    }
205    Ok(applied)
206}
207
/// v7.1 — predicate for "can `execute()` skip the WAL for this
/// statement?". Looks only at the leading keyword (text up to
/// the first whitespace, `;` or `(`), compared
/// case-insensitively.
///
/// Returns `true` for SELECT / SHOW / EXPLAIN / BEGIN / COMMIT /
/// ROLLBACK / WAIT and the SPG-specific verbs that don't go
/// through the auto-commit record path on the server
/// (CHECKPOINT, COMPACT).
///
/// `WITH` is read-only only when the statement contains no
/// INSERT / UPDATE / DELETE / MERGE keyword anywhere: a CTE can
/// prefix a data-modifying statement, and that must not bypass
/// the WAL. The keyword scan is purely lexical, so a `WITH`
/// query that merely mentions one of those words (say, inside a
/// string literal) is reported as *not* read-only.
///
/// Conservative: anything we don't explicitly know is read-only
/// — including an empty string or a statement starting with a
/// comment — yields `false`, i.e. "write a WAL record".
fn sql_is_read_only(sql: &str) -> bool {
    const READ_ONLY_VERBS: [&str; 9] = [
        "select",
        "show",
        "explain",
        "begin",
        "commit",
        "rollback",
        "checkpoint",
        "compact",
        "wait",
    ];
    const WRITE_VERBS: [&str; 4] = ["insert", "update", "delete", "merge"];

    let head = sql
        .trim_start()
        .split(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .next()
        .unwrap_or("");

    if READ_ONLY_VERBS
        .iter()
        .any(|verb| head.eq_ignore_ascii_case(verb))
    {
        return true;
    }
    if !head.eq_ignore_ascii_case("with") {
        return false;
    }

    // `WITH …` — split into identifier-like words (underscores are
    // kept so `updated_at` is not mistaken for UPDATE) and look for
    // any data-modifying verb.
    !sql.split(|c: char| !(c.is_alphanumeric() || c == '_'))
        .any(|word| WRITE_VERBS.iter().any(|verb| word.eq_ignore_ascii_case(verb)))
}
234
/// Embedded SPG database handle. Owns an `Engine` + provides
/// ergonomic wrappers around `execute` and `query`.
///
/// The handle works in one of two modes, selected by the
/// `persistence` field: purely in-memory (`None`, nothing is
/// ever written to disk) or WAL-backed (`Some`, see the field
/// documentation below).
#[derive(Debug)]
pub struct Database {
    /// The wrapped execution engine; every SQL statement ends
    /// up in `Engine::execute`.
    engine: Engine,
    /// v7.1 — persistence sidecar. When `Some(p)`, every
    /// `execute(sql)` that mutates state appends a v3
    /// `auto_commit_sql` WAL record + fsyncs before the call
    /// returns; `Drop` writes a final catalog snapshot to
    /// `<db_path>` so the next session boots from a clean
    /// snapshot + an empty WAL. `None` = in-memory only (the
    /// v6.10.3 shape).
    persistence: Option<PersistenceCtx>,
}
251
/// On-disk state attached to a `Database` opened with
/// `open_path`: file locations, the open WAL handle, and the
/// bookkeeping needed for auto-checkpointing and cold-segment
/// manifests.
#[derive(Debug)]
#[allow(dead_code)] // `wal_path` is read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Path of the catalog snapshot file; `checkpoint()`
    /// replaces it via a `.tmp` sibling + rename.
    db_path: PathBuf,
    /// Path of the WAL file: `db_path` with ".wal" appended to
    /// its file name.
    wal_path: PathBuf,
    /// WAL handle, opened in append mode (create + read) by
    /// `open_path`.
    wal: File,
    /// Cached WAL file length so each `execute()` doesn't have
    /// to stat. Refreshed on append + on `checkpoint()` (which
    /// truncates back to 0).
    wal_len: u64,
    /// WAL size (bytes) at or above which `execute()` triggers a
    /// `checkpoint()`. Always at least 1 when set through
    /// `set_checkpoint_threshold_bytes`.
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — cold-segment directory, built by `open_path` as
    /// `<parent of db_path>/<file stem of db_path>.spg/segments`
    /// (e.g. `/data/app.db` → `/data/app.spg/segments`).
    /// Cold-tier segments produced by `freeze_oldest_to_cold`
    /// are persisted here as `seg_<id>.spg` files; the manifest
    /// (located via `spg_manifest::manifest_path(db_path)`)
    /// records every active segment + its CRC32 so the next boot
    /// can verify + reload.
    cold_segments_dir: PathBuf,
    /// Segment id → segment file path for every cold segment
    /// this handle knows about: entries reloaded from the
    /// manifest at boot plus segments frozen since. This is the
    /// list `checkpoint()` writes into the manifest.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
}
271
272impl Database {
273    /// Open a fresh in-memory database. No WAL, no catalog
274    /// snapshot on disk — perfect for tests + short-lived
275    /// CLI tools.
276    #[must_use]
277    pub fn open_in_memory() -> Self {
278        Self {
279            engine: Engine::new().with_clock(wall_clock_micros),
280            persistence: None,
281        }
282    }
283
284    /// v7.1 — Open or create a persistent database backed by
285    /// the file at `db_path`. The WAL lives at `db_path` +
286    /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
287    /// path:
288    ///
289    /// 1. If `db_path` exists, restore the catalog snapshot.
290    /// 2. If the WAL exists, replay every record into the
291    ///    restored engine — the same recovery story
292    ///    `spg-server` uses.
293    /// 3. Open the WAL in append+sync mode so subsequent
294    ///    `execute()` writes durably commit (one fsync per
295    ///    mutation).
296    ///
297    /// `Drop` writes a final catalog snapshot + truncates the
298    /// WAL — operators that need a sync barrier at a specific
299    /// point use `checkpoint()` explicitly.
300    pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
301        let db_path = db_path.as_ref().to_path_buf();
302        let wal_path = {
303            let mut p = db_path.clone();
304            let name = p
305                .file_name()
306                .map(|n| {
307                    let mut s = n.to_os_string();
308                    s.push(".wal");
309                    s
310                })
311                .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
312            p.set_file_name(name);
313            p
314        };
315        if let Some(parent) = db_path.parent()
316            && !parent.as_os_str().is_empty()
317        {
318            std::fs::create_dir_all(parent).map_err(io_err)?;
319        }
320        let mut engine = if db_path.exists() {
321            let bytes = std::fs::read(&db_path).map_err(io_err)?;
322            let engine = Engine::restore_envelope(&bytes).map_err(|e| {
323                EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
324                    "restore from {}: {e}",
325                    db_path.display()
326                )))
327            })?;
328            engine.with_clock(wall_clock_micros)
329        } else {
330            Engine::new().with_clock(wall_clock_micros)
331        };
332        // v7.1.4 — manifest-driven cold-segment reload. The
333        // manifest sidecar pairs the catalog snapshot CRC with a
334        // list of `(segment_id, path, crc32)` triples; verify
335        // before loading so a torn or stale manifest doesn't
336        // surface phantom data.
337        let cold_segments_dir = {
338            let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
339            let stem = db_path
340                .file_stem()
341                .unwrap_or_else(|| std::ffi::OsStr::new("db"))
342                .to_string_lossy()
343                .into_owned();
344            parent.join(format!("{stem}.spg")).join("segments")
345        };
346        let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
347        let manifest_pth = spg_manifest_path(&db_path);
348        if manifest_pth.exists() && db_path.exists() {
349            let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
350            if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
351                let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
352                let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
353                if snap_crc == m.catalog_crc32 {
354                    for entry in &m.cold_segments {
355                        if let Ok(seg_bytes) = std::fs::read(&entry.path) {
356                            let computed = spg_crypto::crc32::crc32(&seg_bytes);
357                            if computed != entry.crc32 {
358                                eprintln!(
359                                    "spg-embedded: manifest skip segment {}: CRC mismatch",
360                                    entry.segment_id
361                                );
362                                continue;
363                            }
364                            if engine.catalog().cold_segment(entry.segment_id).is_some() {
365                                // Already loaded via Catalog::clone path (shouldn't happen
366                                // since Engine::new + restore_envelope don't populate cold).
367                                continue;
368                            }
369                            let mut new_cat = engine.catalog().clone();
370                            if let Err(e) =
371                                new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
372                            {
373                                eprintln!(
374                                    "spg-embedded: manifest load segment {} failed: {e}",
375                                    entry.segment_id
376                                );
377                                continue;
378                            }
379                            engine.replace_catalog(new_cat);
380                            cold_segment_paths.insert(entry.segment_id, entry.path.clone());
381                        } else {
382                            eprintln!(
383                                "spg-embedded: manifest skip segment {}: file unreadable",
384                                entry.segment_id
385                            );
386                        }
387                    }
388                }
389            }
390        }
391        if wal_path.exists() {
392            let wal_bytes = std::fs::read(&wal_path).map_err(io_err)?;
393            if !wal_bytes.is_empty() {
394                replay_wal_into_engine(&wal_bytes, &mut engine)
395                    .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
396            }
397        }
398        let wal = OpenOptions::new()
399            .create(true)
400            .append(true)
401            .read(true)
402            .open(&wal_path)
403            .map_err(io_err)?;
404        let wal_len = wal.metadata().map_err(io_err)?.len();
405        Ok(Self {
406            engine,
407            persistence: Some(PersistenceCtx {
408                db_path,
409                wal_path,
410                wal,
411                wal_len,
412                checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
413                cold_segments_dir,
414                cold_segment_paths,
415            }),
416        })
417    }
418
419    /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
420    /// hot tier into a brand-new cold-tier segment + persist
421    /// it to disk. Same semantics as `spg-server`'s freezer
422    /// thread; embedded just runs the freeze synchronously on
423    /// the caller's thread. Persistence + manifest update
424    /// happen as part of the next `checkpoint()` (or on Drop).
425    pub fn freeze_oldest_to_cold(
426        &mut self,
427        table_name: &str,
428        index_name: &str,
429        max_rows: usize,
430    ) -> Result<spg_storage::FreezeReport, EngineError> {
431        let report = self
432            .engine
433            .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
434        if let Some(p) = &mut self.persistence {
435            std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
436            let final_path = p
437                .cold_segments_dir
438                .join(format!("seg_{}.spg", report.segment_id));
439            let tmp_path = p
440                .cold_segments_dir
441                .join(format!("seg_{}.spg.tmp", report.segment_id));
442            std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
443            std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
444            p.cold_segment_paths.insert(report.segment_id, final_path);
445        }
446        Ok(report)
447    }
448
449    /// v7.1 — override the auto-checkpoint WAL-size ceiling for
450    /// this `Database` instance. Default is
451    /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
452    /// setter wins. No-op when the database is in-memory.
453    pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
454        if let Some(p) = &mut self.persistence {
455            p.checkpoint_threshold_bytes = bytes.max(1);
456        }
457    }
458
459    /// v7.1 — flush a fresh catalog snapshot to `db_path` and
460    /// truncate the WAL. Idempotent; cheap when nothing has
461    /// happened since the last checkpoint. No-op when the
462    /// database is in-memory (no `db_path` configured).
463    ///
464    /// Called automatically when:
465    /// - the WAL grows past
466    ///   `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
467    ///   end of an `execute()`, and
468    /// - `Drop` runs (best-effort; checkpoint failure on drop is
469    ///   logged to stderr).
470    pub fn checkpoint(&mut self) -> Result<(), EngineError> {
471        let snapshot = self.engine.snapshot();
472        let Some(p) = &mut self.persistence else {
473            return Ok(());
474        };
475        // Snapshot first (atomic via tmp+rename), then WAL
476        // truncate. Same order as `spg-server`'s CHECKPOINT —
477        // a crash between the two leaves the WAL holding
478        // already-snapshotted ops, which replay cleanly on the
479        // next boot (idempotent for SPG's standard DDL/DML
480        // mutations).
481        let tmp = {
482            let mut t = p.db_path.clone();
483            let mut name = t
484                .file_name()
485                .map(std::ffi::OsStr::to_os_string)
486                .unwrap_or_default();
487            name.push(".tmp");
488            t.set_file_name(name);
489            t
490        };
491        std::fs::write(&tmp, &snapshot).map_err(io_err)?;
492        std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
493        // v7.1.4 — refresh the manifest so the next boot can
494        // reload cold segments alongside the snapshot. Bytes
495        // come from the freshly-written snapshot file (= the
496        // canonical CRC source).
497        if !p.cold_segment_paths.is_empty() {
498            let snap_crc = spg_crypto::crc32::crc32(&snapshot);
499            let entries: Vec<ColdSegmentEntry> = p
500                .cold_segment_paths
501                .iter()
502                .filter_map(|(&segment_id, path)| {
503                    let bytes = std::fs::read(path).ok()?;
504                    Some(ColdSegmentEntry {
505                        segment_id,
506                        path: path.clone(),
507                        crc32: spg_crypto::crc32::crc32(&bytes),
508                    })
509                })
510                .collect();
511            let manifest = CatalogManifest {
512                catalog_crc32: snap_crc,
513                cold_segments: entries,
514                wal_baseline_offset: 0,
515            };
516            let m_bytes = manifest.serialize();
517            let m_path = spg_manifest_path(&p.db_path);
518            if let Some(dir) = m_path.parent() {
519                std::fs::create_dir_all(dir).map_err(io_err)?;
520            }
521            let m_tmp = {
522                let mut t = m_path.clone();
523                let mut name = t
524                    .file_name()
525                    .map(std::ffi::OsStr::to_os_string)
526                    .unwrap_or_default();
527                name.push(".tmp");
528                t.set_file_name(name);
529                t
530            };
531            std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
532            std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
533        }
534        p.wal.set_len(0).map_err(io_err)?;
535        p.wal.seek(SeekFrom::Start(0)).map_err(io_err)?;
536        p.wal.sync_data().map_err(io_err)?;
537        p.wal_len = 0;
538        Ok(())
539    }
540
541    /// Restore a database from a previously-captured catalog
542    /// snapshot. Pairs with `Database::snapshot()` for
543    /// round-tripping in-memory state without going through
544    /// the `spg-server` WAL.
545    pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
546        let engine = Engine::restore_envelope(snapshot).map_err(|e| {
547            EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
548        })?;
549        Ok(Self {
550            engine,
551            persistence: None,
552        })
553    }
554
    /// Take a catalog snapshot suitable for `Database::restore`.
    /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
    /// + version + payload); round-trips through every released
    /// SPG version per the STABILITY contract.
    ///
    /// This only serialises the engine state and hands the bytes
    /// back; it does not touch the snapshot file or the WAL. Use
    /// `checkpoint()` to persist.
    #[must_use]
    pub fn snapshot(&self) -> Vec<u8> {
        self.engine.snapshot()
    }
563
564    /// Execute a SQL statement and return the engine's
565    /// `QueryResult` verbatim. Pass-through for callers that
566    /// want to keep PG-flavoured column/row metadata.
567    ///
568    /// v7.1 — when the database was opened via `open_path`,
569    /// successful mutations are appended to the WAL + fsynced
570    /// before the call returns. A subsequent process crash will
571    /// recover state up to the last successful return from
572    /// `execute()`. Read-only statements (SELECT / SHOW /
573    /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
574    /// etc.) skip the WAL entirely.
575    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
576        let result = self.engine.execute(sql)?;
577        if self.persistence.is_some()
578            && !sql_is_read_only(sql)
579            && matches!(
580                &result,
581                QueryResult::CommandOk {
582                    modified_catalog: true,
583                    ..
584                }
585            )
586        {
587            // Append + sync the v3 record AFTER the in-memory
588            // exec succeeds, so a WAL record never describes a
589            // mutation that didn't actually apply. The crash
590            // window between in-memory commit and WAL fsync is
591            // bounded by one record — replay re-applies the
592            // statement idempotently on next boot if we crashed
593            // between (and SPG's DDL/DML are crash-idempotent at
594            // the granularities the wire protocol exposes).
595            let record = encode_v3_auto_commit(sql);
596            let p = self.persistence.as_mut().expect("checked above");
597            p.wal.write_all(&record).map_err(io_err)?;
598            p.wal.sync_data().map_err(io_err)?;
599            p.wal_len = p.wal_len.saturating_add(record.len() as u64);
600            if p.wal_len >= p.checkpoint_threshold_bytes {
601                self.checkpoint()?;
602            }
603        }
604        Ok(result)
605    }
606
607    /// v7.3.0 — typed-row variant of [`Database::query`]. Each
608    /// row decodes into a `T: FromSpgRow` so callers don't
609    /// pattern-match on `Value` themselves. Use [`spg_row!`] to
610    /// generate the impl, or write it by hand.
611    pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
612        let rows = self.query(sql)?;
613        rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
614    }
615
616    /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
617    /// strips the column-schema metadata for read-side
618    /// ergonomics. Errors on non-Rows results (DML / DDL
619    /// statements should go through `execute` instead).
620    pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
621        match self.engine.execute(sql)? {
622            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
623            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
624                "query() expects a SELECT — use execute() for DML/DDL".into(),
625            )),
626            // v7.5.0 — QueryResult is #[non_exhaustive]; any future
627            // variant is not a SELECT row stream, treat as Unsupported.
628            _ => Err(EngineError::Unsupported(
629                "query() expects a SELECT — use execute() for DML/DDL".into(),
630            )),
631        }
632    }
633
    /// Borrow the underlying engine. Escape hatch for callers
    /// that need access to `spg-engine` APIs not yet surfaced
    /// here (transactions, EXPLAIN ANALYZE, etc.).
    ///
    /// Shared (read-only) borrow; see `engine_mut` for the
    /// mutable counterpart.
    #[must_use]
    pub const fn engine(&self) -> &Engine {
        &self.engine
    }
641
    /// Mutable borrow of the underlying engine. Same intent as
    /// `engine()` but for write-side APIs (e.g. inserting
    /// directly through `Catalog::insert` for high-throughput
    /// bulk loads that bypass SQL parsing).
    ///
    /// Changes made through this borrow do not pass through
    /// `Database::execute`, so no WAL record is appended for
    /// them; on a persistent database they reach disk only via a
    /// later `checkpoint()`.
    pub const fn engine_mut(&mut self) -> &mut Engine {
        &mut self.engine
    }
649
650    /// v7.2.0 — run `body` inside an implicit `BEGIN` /
651    /// `COMMIT` pair. The body receives `&mut Database` so it
652    /// can `execute()` / `query()` like any other code path;
653    /// the only difference is that every write in the body
654    /// lands inside one transaction, and a returned `Err` from
655    /// the body triggers `ROLLBACK` before the error propagates.
656    ///
657    /// Nested calls are not supported — SPG's transaction
658    /// model is single-writer with explicit `BEGIN` /
659    /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
660    /// would hit `EngineError::Unsupported("nested
661    /// transaction")` at the inner `BEGIN`.
662    pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
663    where
664        F: FnOnce(&mut Self) -> Result<R, EngineError>,
665    {
666        self.execute("BEGIN")?;
667        match body(self) {
668            Ok(value) => {
669                self.execute("COMMIT")?;
670                Ok(value)
671            }
672            Err(e) => {
673                // Best-effort rollback. If ROLLBACK itself
674                // fails (rare — the engine reports it via
675                // `Unsupported` only when there's no active
676                // TX, which can't happen here) we surface the
677                // original body error, not the rollback error.
678                let _ = self.execute("ROLLBACK");
679                Err(e)
680            }
681        }
682    }
683}
684
// `Database::default()` is the in-memory database: no files are
// touched, which keeps `Default` infallible.
impl Default for Database {
    fn default() -> Self {
        Self::open_in_memory()
    }
}
690
/// v7.7.5 — observability snapshot returned by
/// [`Database::metrics`]. Plain data, no allocations beyond
/// what the struct itself takes; cheap to construct and
/// cheap to serialise.
///
/// The struct is `#[non_exhaustive]`: code outside this crate
/// cannot build it with a struct literal and must use `..` when
/// destructuring, which lets later releases add fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct EmbeddedMetrics {
    /// Total live row count across every user table (hot
    /// tier only — cold-tier rows live in segment files).
    pub hot_rows: u64,
    /// Sum of `Table::hot_bytes` across every user table.
    /// Tracks against the freezer's `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Number of cold-tier segments registered in the catalog.
    /// Includes tombstoned slots (segments retired by
    /// compaction whose disk file may still be on disk).
    pub cold_segments: u64,
    /// User-table count (excludes any future engine-managed
    /// internal tables).
    pub tables: u64,
    /// WAL size in bytes at last `execute()` / `checkpoint()`.
    /// Zero when the database is in-memory.
    pub wal_bytes: u64,
    /// `true` when the database was opened with `open_path` —
    /// i.e. WAL + checkpoint persistence is active.
    pub persistent: bool,
}
718
/// v7.2.1 — owner handle for the worker thread started by
/// `spawn_background_freezer`. Dropping it raises the shutdown
/// flag and joins the worker, so a `Database` (or its shared
/// `Arc<Mutex<Database>>`) can safely be dropped after the
/// handle is.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    /// Flag the worker is expected to poll; `true` means "wind
    /// down".
    shutdown: Arc<AtomicBool>,
    /// Join handle of the worker; `None` once it has been
    /// joined.
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — ask the worker to stop, then block until it has
    /// exited. Idempotent: later calls only re-set the flag,
    /// because the join handle is taken on the first call.
    /// `Drop` calls this too. A panic inside the worker is
    /// swallowed rather than propagated.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let Some(worker) = self.join.take() else {
            return;
        };
        let _ = worker.join();
    }
}

impl Drop for FreezerHandle {
    /// Stops and joins the worker if `stop()` has not done so
    /// already.
    fn drop(&mut self) {
        self.stop();
    }
}
746
/// v7.2.1 — tuning knobs consumed by
/// `Database::spawn_background_freezer`. `Default` yields the
/// `spg-server` operating point.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// Tick interval. The worker wakes every `tick`, reads the
    /// catalog's `hot_tier_bytes`, and freezes if over budget.
    pub tick: Duration,
    /// Hot-tier byte budget. While the catalog is at or below
    /// this value a tick does nothing; once it is exceeded the
    /// next tick freezes the largest table's oldest
    /// `batch_rows` rows into a new cold segment.
    pub hot_tier_bytes: u64,
    /// Upper bound on rows demoted per fire (clamped to the
    /// target table's current row count).
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. When, after a freeze,
    /// the catalog holds *more than* this many cold segments
    /// across all tables, the freezer runs one compaction pass.
    /// Set to `usize::MAX` to disable auto-compact entirely;
    /// the default is `64`, matching the `spg-server` operating
    /// point for SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges,
    /// in bytes. Default 64 MiB, mirroring `spg-server`. Small
    /// segments below this size are merge candidates;
    /// segments at or above stay untouched.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    /// Mirrors the `spg-server` freezer defaults
    /// (SPG_HOT_TIER_BYTES = 4 GiB, 1000-row batches, one tick
    /// per second, compaction above 64 segments with a 64 MiB
    /// target) so embedded behaviour is predictable for
    /// operators familiar with the server.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;
        Self {
            tick: Duration::from_secs(1),
            batch_rows: 1000,
            hot_tier_bytes: 4 * GIB,
            compact_target_bytes: 64 * MIB,
            compact_when_segments_exceed: 64,
        }
    }
}
788
789impl Database {
790    /// v7.7.4 — observe the catalog's cold-segment count.
791    /// Useful for tests + dashboards that want to verify
792    /// auto-compaction is firing.
793    #[must_use]
794    pub fn cold_segment_count(&self) -> usize {
795        self.engine.catalog().cold_segment_count()
796    }
797
798    /// v7.7.5 — observability snapshot. Returns a point-in-time
799    /// view of the engine + persistence counters. Cheap (no
800    /// locks beyond the existing `&self` borrow), so safe to
801    /// call from a hot metrics-scrape path.
802    ///
803    /// Fields mirror the operational dashboard
804    /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
805    /// minus the network counters that don't apply to embedded.
806    #[must_use]
807    pub fn metrics(&self) -> EmbeddedMetrics {
808        let cat = self.engine.catalog();
809        let mut hot_rows: u64 = 0;
810        let mut hot_bytes: u64 = 0;
811        for name in cat.table_names() {
812            if let Some(t) = cat.get(&name) {
813                hot_rows = hot_rows.saturating_add(t.row_count() as u64);
814                hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
815            }
816        }
817        let (wal_bytes, persistent) = match &self.persistence {
818            Some(p) => (p.wal_len, true),
819            None => (0, false),
820        };
821        EmbeddedMetrics {
822            hot_rows,
823            hot_bytes,
824            cold_segments: cat.cold_segment_count() as u64,
825            tables: cat.table_count() as u64,
826            wal_bytes,
827            persistent,
828        }
829    }
830
831    /// v7.2.1 — spawn a background thread that periodically
832    /// runs `freeze_oldest_to_cold` when the catalog-wide hot
833    /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
834    /// pattern matches the v7.2 sharing story: callers wrap
835    /// their `Database` in `Arc::new(Mutex::new(db))` once,
836    /// then clone the Arc for the worker + for foreground
837    /// access. Return value is a handle whose `Drop` joins the
838    /// worker.
839    ///
840    /// Picks the freeze target the same way `spg-server`'s
841    /// freezer does: largest-`hot_bytes` user table with at
842    /// least one BTree integer-PK index. Tables without a
843    /// freezable index are skipped silently.
844    pub fn spawn_background_freezer(
845        db: Arc<Mutex<Database>>,
846        opts: FreezerOptions,
847    ) -> FreezerHandle {
848        let shutdown = Arc::new(AtomicBool::new(false));
849        let shutdown_for_thread = Arc::clone(&shutdown);
850        let join = thread::Builder::new()
851            .name("spg-embedded-freezer".into())
852            .spawn(move || {
853                background_freezer_loop(db, opts, shutdown_for_thread);
854            })
855            .expect("spawn background freezer thread");
856        FreezerHandle {
857            shutdown,
858            join: Some(join),
859        }
860    }
861}
862
863/// v7.2.1 — the freezer's main loop, factored out so the
864/// `Database::spawn_background_freezer` path stays readable.
865fn background_freezer_loop(
866    db: Arc<Mutex<Database>>,
867    opts: FreezerOptions,
868    shutdown: Arc<AtomicBool>,
869) {
870    // Sleep in short slices so a shutdown request resolves
871    // quickly (vs sleeping the full tick).
872    let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
873    let mut last_tick = std::time::Instant::now();
874    loop {
875        if shutdown.load(Ordering::Acquire) {
876            return;
877        }
878        thread::sleep(slice);
879        if last_tick.elapsed() < opts.tick {
880            continue;
881        }
882        last_tick = std::time::Instant::now();
883        let Ok(mut guard) = db.lock() else {
884            return;
885        };
886        if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
887            continue;
888        }
889        let Some((table, index)) = pick_freeze_target(&guard) else {
890            continue;
891        };
892        let row_count = guard
893            .engine
894            .catalog()
895            .get(&table)
896            .map_or(0, spg_storage::Table::row_count);
897        let to_freeze = opts.batch_rows.min(row_count);
898        if to_freeze == 0 {
899            continue;
900        }
901        if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
902            eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
903            continue;
904        }
905        // v7.7.4 — auto-compact. If the catalog now carries
906        // more cold segments than the configured threshold,
907        // run a single compaction pass. Failures are reported
908        // but don't kill the loop; the next tick will retry.
909        let count = guard.engine.catalog().cold_segment_count();
910        if count > opts.compact_when_segments_exceed {
911            if let Err(e) = guard
912                .engine
913                .compact_cold_segments_with_target(opts.compact_target_bytes)
914            {
915                eprintln!(
916                    "spg-embedded: background compact failed (segments={count}, \
917                     threshold={}): {e:?}",
918                    opts.compact_when_segments_exceed,
919                );
920            }
921        }
922    }
923}
924
925/// v7.2.1 — pick the highest-`hot_bytes` user table with a
926/// BTree integer-PK index. Returns `(table, index_name)` so the
927/// caller can dispatch through `freeze_oldest_to_cold`.
928fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
929    let cat = db.engine.catalog();
930    let mut best: Option<(String, String, u64)> = None;
931    for name in cat.table_names() {
932        let Some(t) = cat.get(&name) else { continue };
933        if t.row_count() == 0 {
934            continue;
935        }
936        let cols = &t.schema().columns;
937        let Some(idx) = t.indices().iter().find(|i| {
938            matches!(i.kind, spg_storage::IndexKind::BTree(_))
939                && i.column_position < cols.len()
940                && matches!(
941                    cols[i.column_position].ty,
942                    spg_storage::DataType::SmallInt
943                        | spg_storage::DataType::Int
944                        | spg_storage::DataType::BigInt
945                )
946        }) else {
947            continue;
948        };
949        let hot = t.hot_bytes();
950        match best {
951            None => best = Some((name, idx.name.clone(), hot)),
952            Some((_, _, best_hot)) if hot > best_hot => {
953                best = Some((name, idx.name.clone(), hot));
954            }
955            _ => {}
956        }
957    }
958    best.map(|(t, i, _)| (t, i))
959}
960
961/// v7.7.6 — replay the first `to_seq` records of the WAL at
962/// `wal_path` into a fresh engine and write the resulting
963/// catalog snapshot to `out_db_path`. Same semantics as
964/// `spg revert --wal … --to-seq N --out …` from the CLI:
965///
966///   - `to_seq == 0` → snapshot is the empty catalog
967///   - WAL records beyond `to_seq` are not applied
968///   - durability-checkpoint markers (v3 type 0x02) are
969///     consumed without counting against the budget
970///
971/// Returns the number of statements actually applied
972/// (`≤ to_seq`). The output snapshot is byte-identical to
973/// what `Database::open_path(out_db_path)` would consume on
974/// a subsequent open.
975///
976/// This is the "rewind" operator for an embedded database
977/// that has been corrupted by a poison statement or a
978/// half-applied migration. Pair with `cold_segment_paths`
979/// preservation if your cold-tier files are still on disk.
980///
981/// # Errors
982///
983/// - `wal_path` unreadable or truncated mid-record
984/// - WAL record decodes to invalid UTF-8 SQL
985/// - WAL record's SQL is rejected by the engine
986/// - `out_db_path` unwritable
987pub fn revert_wal_to_seq(
988    wal_path: impl AsRef<Path>,
989    to_seq: u64,
990    out_db_path: impl AsRef<Path>,
991) -> Result<u64, EngineError> {
992    let wal_bytes = std::fs::read(wal_path.as_ref()).map_err(io_err)?;
993    let mut engine = Engine::new();
994    let mut applied = 0u64;
995    let mut cur = 0usize;
996    while cur < wal_bytes.len() && applied < to_seq {
997        let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
998        cur += total;
999        if sql_bytes.is_empty() {
1000            continue;
1001        }
1002        let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
1003            EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1004                "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
1005            )))
1006        })?;
1007        engine.execute(sql)?;
1008        applied += 1;
1009    }
1010    let snapshot = engine.snapshot();
1011    std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
1012    Ok(applied)
1013}
1014
1015/// v7.7.6 — decode one WAL record from a byte tail. Returns
1016/// `(sql_bytes, header_plus_payload_len)`. Handles the three
1017/// on-disk formats (v1 / v2 / v3) the same way the CLI
1018/// `decode_one_record` and the engine's `replay_wal_bytes`
1019/// do. CRCs are not re-validated; the caller's intent is
1020/// "apply", not "validate".
1021fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
1022    if tail.len() < 4 {
1023        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1024            format!("WAL truncated record: {} < 4 header bytes", tail.len()),
1025        )));
1026    }
1027    let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
1028    let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
1029    let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
1030    let len_mask = if is_v3 {
1031        !(WAL_V2_SENTINEL | WAL_V3_FLAG)
1032    } else {
1033        !WAL_V2_SENTINEL
1034    };
1035    let rec_len = (raw_len & len_mask) as usize;
1036    let header_len = if is_v3 {
1037        9
1038    } else if is_v2 {
1039        8
1040    } else {
1041        4
1042    };
1043    if tail.len() < header_len + rec_len {
1044        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1045            format!(
1046                "WAL truncated record: header+payload {} > available {}",
1047                header_len + rec_len,
1048                tail.len()
1049            ),
1050        )));
1051    }
1052    let payload = &tail[header_len..header_len + rec_len];
1053    let sql_bytes = if is_v3 {
1054        let type_byte = tail[8];
1055        // v3 type 0x01 = auto_commit_sql (payload = SQL).
1056        // v3 type 0x02 = durability marker (payload = u64
1057        // offset, no SQL to apply). Anything else is unknown.
1058        if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
1059            payload.to_vec()
1060        } else {
1061            // Caller treats empty payload as a skip-marker.
1062            Vec::new()
1063        }
1064    } else {
1065        payload.to_vec()
1066    };
1067    Ok((sql_bytes, header_len + rec_len))
1068}
1069
1070impl Drop for Database {
1071    fn drop(&mut self) {
1072        // v7.1 — best-effort final checkpoint when a persistent
1073        // Database leaves scope. Failures here go to stderr so
1074        // operators see them, but Drop can't propagate errors —
1075        // the WAL itself is already durable, so a checkpoint
1076        // miss only means the next boot replays a few more
1077        // records than strictly necessary.
1078        if self.persistence.is_some() {
1079            if let Err(e) = self.checkpoint() {
1080                eprintln!(
1081                    "spg-embedded: final checkpoint on Drop failed: {e:?} \
1082                     (WAL is intact; next open_path will replay)"
1083                );
1084            }
1085        }
1086    }
1087}
1088
1089/// v7.1 — turn a `std::io::Error` into the workspace's
1090/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
1091/// the closest existing variant — io failures during boot or
1092/// during a WAL append surface as a storage-layer fault to
1093/// callers, which keeps the public error enum unchanged.
1094fn io_err(e: std::io::Error) -> EngineError {
1095    EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
1096}
1097
1098/// v7.2.2 — `Database` is `Send`, so the recommended sharing
1099/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
1100///
1101/// ```no_run
1102/// use std::sync::{Arc, Mutex};
1103/// use spg_embedded::Database;
1104///
1105/// let db = Database::open_in_memory();
1106/// let shared = Arc::new(Mutex::new(db));
1107/// let shared_for_worker = Arc::clone(&shared);
1108/// std::thread::spawn(move || {
1109///     let mut guard = shared_for_worker.lock().unwrap();
1110///     guard.execute("INSERT INTO t VALUES (1)").unwrap();
1111/// });
1112/// ```
1113///
1114/// Internal `RwLock`-wrapped state — letting many threads
1115/// hold concurrent `&Database` for `SELECT` without contending
1116/// — is parked as STABILITY § "Out of v7.2"; multi-reader
1117/// embedded throughput needs a planner-side change to release
1118/// the engine read lock between scans, which is the v7.x
1119/// "Choice A" line of work already documented in v6.9.1's
1120/// carve-out.
1121#[allow(dead_code)]
1122fn _database_is_send() {
1123    fn assert_send<T: Send>() {}
1124    assert_send::<Database>();
1125}
1126
/// v6.10.3 — trait that maps a row's columns onto a user
/// struct's fields. v7.3.0 ships the [`spg_row!`] declarative
/// macro that generates `impl FromSpgRow for YourStruct` from
/// a struct definition (no proc-macro, no syn/quote/
/// proc-macro2 deps — the workspace's "0 external deps"
/// policy holds).
///
/// Hand-written implementations are equally valid. Errors
/// surface as `EngineError::Unsupported` so the caller's error
/// type stays uniform.
pub trait FromSpgRow: Sized {
    /// Decode one query result row into `Self`. Called once per
    /// row by [`Database::query_typed`]. The slice length equals
    /// the number of columns in the SELECT projection.
    ///
    /// # Errors
    ///
    /// Implementations return an `EngineError` when the row
    /// cannot be decoded (missing column, unexpected type, …).
    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
}
1143
/// v7.3.0 — declarative macro that generates `FromSpgRow` impl
/// for a user struct. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
///
/// Notes:
///
/// - Fields are separated by commas; the comma after the last
///   field is optional, exactly as in an ordinary struct.
/// - The generated struct always carries
///   `#[derive(Debug, Clone)]`, so do not derive those two
///   again in the attributes you pass in.
/// - Decoding is positional: the first field reads column 0,
///   the second column 1, and so on. A row with fewer columns
///   than fields yields `EngineError::Unsupported`; columns
///   beyond the last field are ignored.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // One shared cursor: each field consumes the
                // next column in declaration order.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                Ok(Self { $($field,)* })
            }
        }
    };
}
1220
/// v7.3.0 — per-column decoder used by `spg_row!`: turns one
/// `Value` cell into a Rust value.
///
/// Implementations provided alongside this trait cover the
/// integer types `i16` / `i32` / `i64`, the float types `f32` /
/// `f64`, `bool`, `String`, `Vec<f32>`, plus `Option<T>` for
/// nullable columns. The non-`Option` impls reject
/// `Value::Null` with an error.
pub trait FromSpgValue: Sized {
    /// Decode one cell into `Self`. The returned `&'static str`
    /// is a short diagnostic for type mismatches (e.g. `"expected
    /// integer, got TEXT"`); callers wrap it into their own
    /// error type.
    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
}
1231
1232macro_rules! impl_from_value_int {
1233    ($($t:ty),* $(,)?) => {
1234        $(
1235            impl FromSpgValue for $t {
1236                fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1237                    match v {
1238                        Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
1239                        Value::Int(n)      => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
1240                        Value::BigInt(n)   => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
1241                        Value::Null        => Err("NULL in non-Option int column"),
1242                        _ => Err("non-integer value in int column"),
1243                    }
1244                }
1245            }
1246        )*
1247    };
1248}
1249impl_from_value_int!(i16, i32, i64);
1250
1251impl FromSpgValue for f32 {
1252    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1253        match v {
1254            Value::Float(f) => Ok(*f as f32),
1255            Value::Null => Err("NULL in non-Option float column"),
1256            _ => Err("non-float value in float column"),
1257        }
1258    }
1259}
1260
1261impl FromSpgValue for f64 {
1262    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1263        match v {
1264            Value::Float(f) => Ok(*f),
1265            Value::Null => Err("NULL in non-Option float column"),
1266            _ => Err("non-float value in float column"),
1267        }
1268    }
1269}
1270
1271impl FromSpgValue for bool {
1272    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1273        match v {
1274            Value::Bool(b) => Ok(*b),
1275            Value::Null => Err("NULL in non-Option bool column"),
1276            _ => Err("non-bool value in bool column"),
1277        }
1278    }
1279}
1280
1281impl FromSpgValue for String {
1282    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1283        match v {
1284            Value::Text(s) => Ok(s.clone()),
1285            Value::Null => Err("NULL in non-Option text column"),
1286            _ => Err("non-text value in String column"),
1287        }
1288    }
1289}
1290
1291impl FromSpgValue for Vec<f32> {
1292    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1293        match v {
1294            Value::Vector(xs) => Ok(xs.clone()),
1295            Value::Null => Err("NULL in non-Option vector column"),
1296            _ => Err("non-vector value in Vec<f32> column"),
1297        }
1298    }
1299}
1300
1301impl<T: FromSpgValue> FromSpgValue for Option<T> {
1302    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1303        match v {
1304            Value::Null => Ok(None),
1305            other => T::from_spg_value(other).map(Some),
1306        }
1307    }
1308}
1309
// Smoke tests for the embedded entry point. All of them run
// against in-memory databases, so they touch no files.
#[cfg(test)]
mod tests {
    use super::*;

    // Round trip through the basic API: DDL, two inserts, then a
    // filtered SELECT that must return exactly the matching row.
    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL, name TEXT)")
            .unwrap();
        db.execute("INSERT INTO t VALUES (1, 'alice')").unwrap();
        db.execute("INSERT INTO t VALUES (2, 'bob')").unwrap();
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(1) => {}
            other => panic!("expected Int(1), got {other:?}"),
        }
    }

    // `query()` is for statements that produce rows; handing it
    // an INSERT must come back as an error.
    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        let r = db.query("INSERT INTO t VALUES (1)");
        assert!(r.is_err(), "query() on INSERT must error");
    }

    // A snapshot taken from one database and restored into
    // another must preserve previously inserted rows.
    #[test]
    fn snapshot_roundtrip() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        db.execute("INSERT INTO t VALUES (42)").unwrap();
        let bytes = db.snapshot();
        let mut restored = Database::restore(&bytes).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(42) => {}
            other => panic!("expected Int(42), got {other:?}"),
        }
    }

    // Compile-level check that `FromSpgRow` can be implemented by
    // hand (without `spg_row!`) and called on a plain `Value` slice.
    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                match row.first() {
                    Some(Value::Int(n)) => Ok(Self { _id: *n }),
                    _ => Err(EngineError::Unsupported("bad id".into())),
                }
            }
        }
        let row = vec![Value::Int(7)];
        let _u = User::from_spg_row(&row).unwrap();
    }
}