spg_embedded/lib.rs
1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//! println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//! crash-consistent WAL + periodic checkpoint snapshot. The
32//! on-disk format is byte-identical to what `spg-server`
33//! produces, so a database can move between modes without
34//! conversion.
35//! - **Durability**: every `execute()` that mutates calls
36//! `fsync` before returning `Ok`. There is no group commit
37//! in embedded mode — every commit pays one fsync. If you
38//! need batch throughput, wrap multiple statements in
39//! [`Database::with_transaction`] which fsyncs only at
40//! commit.
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//! Share across threads via `Arc<Mutex<Database>>`. The
43//! single-writer model is intentional — see
44//! [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//! moves cold rows to disk-resident segments while you keep
47//! serving requests. It runs in a dedicated thread; drop the
48//! returned [`FreezerHandle`] (or call `stop()`) for clean
49//! shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//! [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//! them with a wildcard arm so future v7.x releases can add
53//! variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//! Malformed SQL, type mismatches, missing tables — all
59//! return `Err(EngineError::…)`. If you observe a panic on
60//! a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//! violations (e.g., catalog snapshot magic mismatch, WAL
63//! record CRC sentinel corruption that survived the boot-
64//! time validation). These represent silent disk corruption
65//! and an unwind would leak inconsistent state, so the
66//! release profile uses `panic = abort` — your host process
67//! dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//! `--profile release-dbg` (keeps unwind tables) and use
70//! `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
/// v7.16.0 — handle for a parsed-and-planned SQL statement.
///
/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
/// with a `&[Value]` slice carrying the bind parameters (PG-style
/// `$1`, `$2`, … positional). Every bind call in this crate clones the
/// stored AST and passes the clone to the engine, so one handle can be
/// reused for any number of executions.
///
/// The handle holds a snapshot of the AST taken at prepare time; the
/// `*_prepared` methods operate on a clone of that snapshot rather than
/// on an engine-side cache entry, so the handle keeps working even if
/// the engine's plan cache later drops the entry.
///
/// NOTE(review): earlier docs called the handle "cheap to `Clone`" with
/// a shared AST. The derive below clones both owned fields, so the cost
/// is whatever `ParsedStatement::clone` costs — confirm before relying
/// on it in a hot path.
#[derive(Debug, Clone)]
pub struct Statement {
    /// The parsed + planned AST as returned by the engine's
    /// `prepare_cached`. Presumably all rewrite passes
    /// (`expand_group_by_all`, `reorder_joins`, …) have already run —
    /// TODO confirm against `spg-engine`.
    pub(crate) stmt: ParsedStatement,
    /// Original SQL source, kept for tracing / debug output only.
    /// WAL persistence renders from the bind-final AST instead, so the
    /// substituted `$1..$N` values are what gets replayed.
    pub(crate) sql: String,
}
106
107impl Statement {
108 /// Borrow the original SQL source — useful for tracing and
109 /// debug logs. WAL replay does NOT use this; it serialises
110 /// the bind-final AST instead.
111 #[must_use]
112 pub fn sql(&self) -> &str {
113 &self.sql
114 }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127 let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::{Seek, SeekFrom, Write};
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, Ordering};
135use std::sync::{Arc, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
/// v7.11.3 — wall-clock provider handed to the embedded `Engine` via
/// `with_clock`, backing SQL time functions such as `NOW()`.
///
/// Returns microseconds since the Unix epoch. A system clock set
/// before the epoch yields `0`; a value too large for `i64` saturates
/// at `i64::MAX`.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Kept private so callers can't mis-frame records; the v3 layout
// is the same the server uses, so a `spg-server` boot can read a
// database an embedded process wrote and vice versa.

// Top bit of a record's leading u32 length word. Replay treats a word
// with this bit set as a v2-or-later frame (8-byte header); a word
// without it is a legacy frame with a bare 4-byte length header.
const WAL_V2_SENTINEL: u32 = 0x8000_0000;
// Bit 30 of the length word. Together with `WAL_V2_SENTINEL` it marks a
// v3 frame (9-byte header ending in a type byte), which leaves the low
// 30 bits for the payload length.
const WAL_V3_FLAG: u32 = 0x4000_0000;
// v3 type byte for an `auto_commit_sql` record: the payload is SQL text
// that replay executes.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
160
/// v7.1 — auto-checkpoint threshold in bytes. Once the WAL reaches
/// this size, the next successful mutating `execute()` finishes with a
/// `checkpoint()` so the WAL stays bounded.
///
/// Read from the `SPG_EMBEDDED_CHECKPOINT_BYTES` environment variable.
/// An unset, unparsable, or zero value falls back to 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;
    match std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(bytes) if bytes > 0 => bytes,
            _ => FALLBACK_BYTES,
        },
        Err(_) => FALLBACK_BYTES,
    }
}
172
173/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
174///
175/// ```text
176/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
177/// [u32 LE crc32 over (type_byte || sql_bytes)]
178/// [u8 type = 0x01]
179/// [sql bytes]
180/// ```
181fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
182 let payload = sql.as_bytes();
183 let mut crc_buf = Vec::with_capacity(1 + payload.len());
184 crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
185 crc_buf.extend_from_slice(payload);
186 let crc = spg_crypto::crc32::crc32(&crc_buf);
187 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
188 let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
189 out.extend_from_slice(&header);
190 out.extend_from_slice(&crc.to_le_bytes());
191 out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
192 out.extend_from_slice(payload);
193 out
194}
195
196/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
197/// Returns the count of records successfully applied. A truncated
198/// trailing record (mid-write torn) is dropped silently — the
199/// same recovery story `spg-server`'s boot path uses.
200fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
201 let mut applied = 0usize;
202 let mut cur = 0usize;
203 while cur < wal_bytes.len() {
204 if wal_bytes.len() - cur < 4 {
205 // Trailing partial header — torn write, drop and stop.
206 break;
207 }
208 let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
209 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
210 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
211 let len_mask = if is_v3 {
212 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
213 } else {
214 !WAL_V2_SENTINEL
215 };
216 let rec_len = (raw_len & len_mask) as usize;
217 let header_len = if is_v3 {
218 9
219 } else if is_v2 {
220 8
221 } else {
222 4
223 };
224 if wal_bytes.len() - cur < header_len + rec_len {
225 // Torn record at the tail — drop, stop.
226 break;
227 }
228 if is_v3 {
229 let type_byte = wal_bytes[cur + 8];
230 match type_byte {
231 WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
232 0x02 => {
233 // durability_checkpoint marker — skip, no SQL.
234 cur += header_len + rec_len;
235 continue;
236 }
237 other => {
238 return Err(format!(
239 "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
240 ));
241 }
242 }
243 }
244 let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
245 let sql = std::str::from_utf8(sql_bytes)
246 .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
247 engine
248 .execute(sql)
249 .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
250 applied += 1;
251 cur += header_len + rec_len;
252 }
253 Ok(applied)
254}
255
/// v7.1 — decides from the leading keyword whether a statement should
/// skip the WAL.
///
/// The keyword is the text between any leading whitespace and the
/// first whitespace, `;` or `(`; it is compared case-insensitively
/// (ASCII) against a fixed list: SELECT, SHOW, EXPLAIN, BEGIN, COMMIT,
/// ROLLBACK, CHECKPOINT, COMPACT, WAIT and WITH. Anything else —
/// including an empty string or a statement that starts with a comment
/// or a parenthesis — returns `false`, i.e. "may need a WAL record".
fn sql_is_read_only(sql: &str) -> bool {
    const NON_LOGGED_VERBS: [&str; 10] = [
        "select",
        "show",
        "explain",
        "begin",
        "commit",
        "rollback",
        "checkpoint",
        "compact",
        "wait",
        "with",
    ];
    let body = sql.trim_start();
    let is_delimiter = |c: char| c.is_whitespace() || c == ';' || c == '(';
    let verb = match body.find(is_delimiter) {
        Some(end) => &body[..end],
        None => body,
    };
    NON_LOGGED_VERBS
        .iter()
        .any(|known| verb.eq_ignore_ascii_case(known))
}
282
/// Embedded SPG database handle. Owns an `Engine` and provides
/// ergonomic wrappers around `execute` and `query`.
///
/// A handle is either in-memory only (`persistence` is `None`; nothing
/// is ever written to disk) or persistent (`persistence` is `Some`; see
/// the field docs for the WAL + snapshot behaviour).
///
/// NOTE(review): this header used to say that dropping the handle
/// performs no WAL flush / fsync "because v6.10.3 is in-memory only".
/// That predates the v7.1 persistence sidecar and contradicts the
/// `persistence` field docs below — confirm against the `Drop` impl.
#[derive(Debug)]
pub struct Database {
    /// The SQL execution engine that holds all catalog and row state.
    engine: Engine,
    /// v7.1 — persistence sidecar. When `Some(p)`, every
    /// `execute(sql)` that mutates state appends a v3
    /// `auto_commit_sql` WAL record + fsyncs before the call
    /// returns; `Drop` writes a final catalog snapshot to
    /// `<db_path>` so the next session boots from a clean
    /// snapshot + an empty WAL. `None` = in-memory only (the
    /// v6.10.3 shape).
    persistence: Option<PersistenceCtx>,
}
299
/// On-disk state attached to a [`Database`] opened via `open_path`:
/// the snapshot / WAL locations, the open WAL handle, and bookkeeping
/// for auto-checkpointing and cold-tier segment files.
#[derive(Debug)]
#[allow(dead_code)] // `wal_path` is read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Path of the catalog snapshot file; `checkpoint()` replaces it
    /// via a `<name>.tmp` sibling + rename.
    db_path: PathBuf,
    /// Path of the WAL file (`db_path`'s file name with `.wal` appended).
    wal_path: PathBuf,
    /// Open WAL handle (opened in append mode by `open_path`).
    wal: File,
    /// Cached WAL file length so each `execute()` doesn't have
    /// to stat. Refreshed on append + on `checkpoint()` (which
    /// truncates back to 0).
    wal_len: u64,
    /// WAL size at or above which a mutating `execute()` ends with an
    /// automatic `checkpoint()`.
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — directory holding cold-tier segment files, written as
    /// `seg_<id>.spg`. `open_path` derives it as
    /// `<parent>/<file stem of db_path>.spg/segments`. The manifest
    /// written at checkpoint time records every tracked segment + its
    /// CRC32 so the next boot can verify + reload.
    cold_segments_dir: PathBuf,
    /// Segment id → segment file path for every cold segment this
    /// handle loaded at boot or froze since; drives the manifest
    /// written by `checkpoint()`.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
}
319
320impl Database {
321 /// Open a fresh in-memory database. No WAL, no catalog
322 /// snapshot on disk — perfect for tests + short-lived
323 /// CLI tools.
324 #[must_use]
325 pub fn open_in_memory() -> Self {
326 Self {
327 engine: Engine::new().with_clock(wall_clock_micros),
328 persistence: None,
329 }
330 }
331
332 /// v7.1 — Open or create a persistent database backed by
333 /// the file at `db_path`. The WAL lives at `db_path` +
334 /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
335 /// path:
336 ///
337 /// 1. If `db_path` exists, restore the catalog snapshot.
338 /// 2. If the WAL exists, replay every record into the
339 /// restored engine — the same recovery story
340 /// `spg-server` uses.
341 /// 3. Open the WAL in append+sync mode so subsequent
342 /// `execute()` writes durably commit (one fsync per
343 /// mutation).
344 ///
345 /// `Drop` writes a final catalog snapshot + truncates the
346 /// WAL — operators that need a sync barrier at a specific
347 /// point use `checkpoint()` explicitly.
348 pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
349 let db_path = db_path.as_ref().to_path_buf();
350 let wal_path = {
351 let mut p = db_path.clone();
352 let name = p
353 .file_name()
354 .map(|n| {
355 let mut s = n.to_os_string();
356 s.push(".wal");
357 s
358 })
359 .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
360 p.set_file_name(name);
361 p
362 };
363 if let Some(parent) = db_path.parent()
364 && !parent.as_os_str().is_empty()
365 {
366 std::fs::create_dir_all(parent).map_err(io_err)?;
367 }
368 let mut engine = if db_path.exists() {
369 let bytes = std::fs::read(&db_path).map_err(io_err)?;
370 let engine = Engine::restore_envelope(&bytes).map_err(|e| {
371 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
372 "restore from {}: {e}",
373 db_path.display()
374 )))
375 })?;
376 engine.with_clock(wall_clock_micros)
377 } else {
378 Engine::new().with_clock(wall_clock_micros)
379 };
380 // v7.1.4 — manifest-driven cold-segment reload. The
381 // manifest sidecar pairs the catalog snapshot CRC with a
382 // list of `(segment_id, path, crc32)` triples; verify
383 // before loading so a torn or stale manifest doesn't
384 // surface phantom data.
385 let cold_segments_dir = {
386 let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
387 let stem = db_path
388 .file_stem()
389 .unwrap_or_else(|| std::ffi::OsStr::new("db"))
390 .to_string_lossy()
391 .into_owned();
392 parent.join(format!("{stem}.spg")).join("segments")
393 };
394 let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
395 let manifest_pth = spg_manifest_path(&db_path);
396 if manifest_pth.exists() && db_path.exists() {
397 let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
398 if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
399 let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
400 let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
401 if snap_crc == m.catalog_crc32 {
402 for entry in &m.cold_segments {
403 if let Ok(seg_bytes) = std::fs::read(&entry.path) {
404 let computed = spg_crypto::crc32::crc32(&seg_bytes);
405 if computed != entry.crc32 {
406 eprintln!(
407 "spg-embedded: manifest skip segment {}: CRC mismatch",
408 entry.segment_id
409 );
410 continue;
411 }
412 if engine.catalog().cold_segment(entry.segment_id).is_some() {
413 // Already loaded via Catalog::clone path (shouldn't happen
414 // since Engine::new + restore_envelope don't populate cold).
415 continue;
416 }
417 let mut new_cat = engine.catalog().clone();
418 if let Err(e) =
419 new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
420 {
421 eprintln!(
422 "spg-embedded: manifest load segment {} failed: {e}",
423 entry.segment_id
424 );
425 continue;
426 }
427 engine.replace_catalog(new_cat);
428 cold_segment_paths.insert(entry.segment_id, entry.path.clone());
429 } else {
430 eprintln!(
431 "spg-embedded: manifest skip segment {}: file unreadable",
432 entry.segment_id
433 );
434 }
435 }
436 }
437 }
438 }
439 if wal_path.exists() {
440 let wal_bytes = std::fs::read(&wal_path).map_err(io_err)?;
441 if !wal_bytes.is_empty() {
442 replay_wal_into_engine(&wal_bytes, &mut engine)
443 .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
444 }
445 }
446 let wal = OpenOptions::new()
447 .create(true)
448 .append(true)
449 .read(true)
450 .open(&wal_path)
451 .map_err(io_err)?;
452 let wal_len = wal.metadata().map_err(io_err)?.len();
453 Ok(Self {
454 engine,
455 persistence: Some(PersistenceCtx {
456 db_path,
457 wal_path,
458 wal,
459 wal_len,
460 checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
461 cold_segments_dir,
462 cold_segment_paths,
463 }),
464 })
465 }
466
467 /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
468 /// hot tier into a brand-new cold-tier segment + persist
469 /// it to disk. Same semantics as `spg-server`'s freezer
470 /// thread; embedded just runs the freeze synchronously on
471 /// the caller's thread. Persistence + manifest update
472 /// happen as part of the next `checkpoint()` (or on Drop).
473 pub fn freeze_oldest_to_cold(
474 &mut self,
475 table_name: &str,
476 index_name: &str,
477 max_rows: usize,
478 ) -> Result<spg_storage::FreezeReport, EngineError> {
479 let report = self
480 .engine
481 .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
482 if let Some(p) = &mut self.persistence {
483 std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
484 let final_path = p
485 .cold_segments_dir
486 .join(format!("seg_{}.spg", report.segment_id));
487 let tmp_path = p
488 .cold_segments_dir
489 .join(format!("seg_{}.spg.tmp", report.segment_id));
490 std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
491 std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
492 p.cold_segment_paths.insert(report.segment_id, final_path);
493 }
494 Ok(report)
495 }
496
497 /// v7.1 — override the auto-checkpoint WAL-size ceiling for
498 /// this `Database` instance. Default is
499 /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
500 /// setter wins. No-op when the database is in-memory.
501 pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
502 if let Some(p) = &mut self.persistence {
503 p.checkpoint_threshold_bytes = bytes.max(1);
504 }
505 }
506
507 /// v7.1 — flush a fresh catalog snapshot to `db_path` and
508 /// truncate the WAL. Idempotent; cheap when nothing has
509 /// happened since the last checkpoint. No-op when the
510 /// database is in-memory (no `db_path` configured).
511 ///
512 /// Called automatically when:
513 /// - the WAL grows past
514 /// `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
515 /// end of an `execute()`, and
516 /// - `Drop` runs (best-effort; checkpoint failure on drop is
517 /// logged to stderr).
518 pub fn checkpoint(&mut self) -> Result<(), EngineError> {
519 let snapshot = self.engine.snapshot();
520 let Some(p) = &mut self.persistence else {
521 return Ok(());
522 };
523 // Snapshot first (atomic via tmp+rename), then WAL
524 // truncate. Same order as `spg-server`'s CHECKPOINT —
525 // a crash between the two leaves the WAL holding
526 // already-snapshotted ops, which replay cleanly on the
527 // next boot (idempotent for SPG's standard DDL/DML
528 // mutations).
529 let tmp = {
530 let mut t = p.db_path.clone();
531 let mut name = t
532 .file_name()
533 .map(std::ffi::OsStr::to_os_string)
534 .unwrap_or_default();
535 name.push(".tmp");
536 t.set_file_name(name);
537 t
538 };
539 std::fs::write(&tmp, &snapshot).map_err(io_err)?;
540 std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
541 // v7.1.4 — refresh the manifest so the next boot can
542 // reload cold segments alongside the snapshot. Bytes
543 // come from the freshly-written snapshot file (= the
544 // canonical CRC source).
545 if !p.cold_segment_paths.is_empty() {
546 let snap_crc = spg_crypto::crc32::crc32(&snapshot);
547 let entries: Vec<ColdSegmentEntry> = p
548 .cold_segment_paths
549 .iter()
550 .filter_map(|(&segment_id, path)| {
551 let bytes = std::fs::read(path).ok()?;
552 Some(ColdSegmentEntry {
553 segment_id,
554 path: path.clone(),
555 crc32: spg_crypto::crc32::crc32(&bytes),
556 })
557 })
558 .collect();
559 let manifest = CatalogManifest {
560 catalog_crc32: snap_crc,
561 cold_segments: entries,
562 wal_baseline_offset: 0,
563 };
564 let m_bytes = manifest.serialize();
565 let m_path = spg_manifest_path(&p.db_path);
566 if let Some(dir) = m_path.parent() {
567 std::fs::create_dir_all(dir).map_err(io_err)?;
568 }
569 let m_tmp = {
570 let mut t = m_path.clone();
571 let mut name = t
572 .file_name()
573 .map(std::ffi::OsStr::to_os_string)
574 .unwrap_or_default();
575 name.push(".tmp");
576 t.set_file_name(name);
577 t
578 };
579 std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
580 std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
581 }
582 p.wal.set_len(0).map_err(io_err)?;
583 p.wal.seek(SeekFrom::Start(0)).map_err(io_err)?;
584 p.wal.sync_data().map_err(io_err)?;
585 p.wal_len = 0;
586 Ok(())
587 }
588
589 /// Restore a database from a previously-captured catalog
590 /// snapshot. Pairs with `Database::snapshot()` for
591 /// round-tripping in-memory state without going through
592 /// the `spg-server` WAL.
593 pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
594 let engine = Engine::restore_envelope(snapshot).map_err(|e| {
595 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
596 })?;
597 Ok(Self {
598 engine,
599 persistence: None,
600 })
601 }
602
603 /// Take a catalog snapshot suitable for `Database::restore`.
604 /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
605 /// + version + payload); round-trips through every released
606 /// SPG version per the STABILITY contract.
607 #[must_use]
608 pub fn snapshot(&self) -> Vec<u8> {
609 self.engine.snapshot()
610 }
611
612 /// Execute a SQL statement and return the engine's
613 /// `QueryResult` verbatim. Pass-through for callers that
614 /// want to keep PG-flavoured column/row metadata.
615 ///
616 /// v7.1 — when the database was opened via `open_path`,
617 /// successful mutations are appended to the WAL + fsynced
618 /// before the call returns. A subsequent process crash will
619 /// recover state up to the last successful return from
620 /// `execute()`. Read-only statements (SELECT / SHOW /
621 /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
622 /// etc.) skip the WAL entirely.
623 pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
624 let result = self.engine.execute(sql)?;
625 if self.persistence.is_some()
626 && !sql_is_read_only(sql)
627 && matches!(
628 &result,
629 QueryResult::CommandOk {
630 modified_catalog: true,
631 ..
632 }
633 )
634 {
635 // Append + sync the v3 record AFTER the in-memory
636 // exec succeeds, so a WAL record never describes a
637 // mutation that didn't actually apply. The crash
638 // window between in-memory commit and WAL fsync is
639 // bounded by one record — replay re-applies the
640 // statement idempotently on next boot if we crashed
641 // between (and SPG's DDL/DML are crash-idempotent at
642 // the granularities the wire protocol exposes).
643 let record = encode_v3_auto_commit(sql);
644 let p = self.persistence.as_mut().expect("checked above");
645 p.wal.write_all(&record).map_err(io_err)?;
646 p.wal.sync_data().map_err(io_err)?;
647 p.wal_len = p.wal_len.saturating_add(record.len() as u64);
648 if p.wal_len >= p.checkpoint_threshold_bytes {
649 self.checkpoint()?;
650 }
651 }
652 Ok(result)
653 }
654
655 /// v7.3.0 — typed-row variant of [`Database::query`]. Each
656 /// row decodes into a `T: FromSpgRow` so callers don't
657 /// pattern-match on `Value` themselves. Use [`spg_row!`] to
658 /// generate the impl, or write it by hand.
659 pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
660 let rows = self.query(sql)?;
661 rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
662 }
663
664 /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
665 /// strips the column-schema metadata for read-side
666 /// ergonomics. Errors on non-Rows results (DML / DDL
667 /// statements should go through `execute` instead).
668 pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
669 match self.engine.execute(sql)? {
670 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
671 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
672 "query() expects a SELECT — use execute() for DML/DDL".into(),
673 )),
674 // v7.5.0 — QueryResult is #[non_exhaustive]; any future
675 // variant is not a SELECT row stream, treat as Unsupported.
676 _ => Err(EngineError::Unsupported(
677 "query() expects a SELECT — use execute() for DML/DDL".into(),
678 )),
679 }
680 }
681
682 /// v7.16.0 — column-aware variant of [`Self::query`].
683 /// Returns the column schema vec alongside the rows so
684 /// adapters (the spg-sqlx Row impl most notably) can drive
685 /// name + type-based column lookups. Errors on non-Rows
686 /// results identically to `query`.
687 pub fn query_with_columns(
688 &mut self,
689 sql: &str,
690 ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
691 match self.engine.execute(sql)? {
692 QueryResult::Rows { columns, rows } => {
693 Ok((columns, rows.into_iter().map(|r| r.values).collect()))
694 }
695 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
696 "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
697 )),
698 _ => Err(EngineError::Unsupported(
699 "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
700 )),
701 }
702 }
703
704 /// v7.16.0 — column-aware variant of
705 /// [`Self::query_prepared`]. Same shape as
706 /// `query_with_columns` but driven from a prepared
707 /// statement + bound params.
708 pub fn query_prepared_with_columns(
709 &mut self,
710 stmt: &Statement,
711 params: &[Value],
712 ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
713 match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
714 QueryResult::Rows { columns, rows } => {
715 Ok((columns, rows.into_iter().map(|r| r.values).collect()))
716 }
717 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
718 "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
719 )),
720 _ => Err(EngineError::Unsupported(
721 "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
722 )),
723 }
724 }
725
    /// Shared borrow of the underlying engine. Escape hatch for
    /// callers that need read-side `spg-engine` APIs this wrapper does
    /// not surface.
    #[must_use]
    pub const fn engine(&self) -> &Engine {
        &self.engine
    }
733
    /// Mutable borrow of the underlying engine. Same intent as
    /// `engine()` but for write-side APIs (e.g. inserting
    /// directly through `Catalog::insert` for high-throughput
    /// bulk loads that bypass SQL parsing).
    ///
    /// Changes made through this borrow do not pass through
    /// `execute()` / `execute_prepared()`, the only places in this type
    /// that append WAL records. On a persistent database such changes
    /// reach disk only via a later `checkpoint()`.
    pub const fn engine_mut(&mut self) -> &mut Engine {
        &mut self.engine
    }
741
742 /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
743 /// `execute_prepared` / `query_prepared` calls can re-bind
744 /// parameters without re-parsing. The returned [`Statement`]
745 /// is a thin handle around the AST + cached source SQL; it's
746 /// `Clone` so the same plan can drive many bind calls
747 /// concurrently (each call clones the AST and runs
748 /// placeholder substitution on the clone — the cached
749 /// plan stays intact).
750 ///
751 /// Plan caching follows the engine's existing version-aware
752 /// rule: a prepared `Statement` whose statistics version
753 /// has rolled (ANALYZE ran between prepare and execute)
754 /// will silently re-prepare under the hood. Callers don't
755 /// need to detect this.
756 ///
757 /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
758 /// `bind`-time `Value`s are passed as a slice; arity
759 /// mismatches surface as `EvalError::PlaceholderOutOfRange`
760 /// at `execute_prepared` time, not here.
761 ///
762 /// # Errors
763 /// Surfaces `EngineError` (parse error / plan rewrite
764 /// failure) from the underlying `Engine::prepare`.
765 pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
766 // Use the cached path so repeated prepares of the same
767 // SQL are O(1). The engine's plan cache stays shared
768 // across all callers of this Database — a single
769 // `PgPool`-shaped consumer (or, later, the spg-sqlx
770 // adapter) prepares once and reaps the win on every bind.
771 let stmt = self
772 .engine
773 .prepare_cached(sql)
774 .map_err(EngineError::Parse)?;
775 Ok(Statement {
776 stmt,
777 sql: sql.to_string(),
778 })
779 }
780
781 /// v7.16.0 — execute a prepared statement with bound
782 /// parameters. Mirrors `Engine::execute_prepared`: clones
783 /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
784 ///
785 /// Persistence (WAL fsync + auto-checkpoint) follows the
786 /// same rules as `execute(sql)`: mutating statements get a
787 /// WAL record AFTER the in-memory exec succeeds. The WAL
788 /// record carries the substituted, bind-final SQL, so
789 /// replay reconstructs the same row state without needing
790 /// the original prepared `Statement` to still be alive.
791 ///
792 /// # Errors
793 /// Propagates engine errors. Param arity mismatch surfaces
794 /// as `EvalError::PlaceholderOutOfRange`.
795 pub fn execute_prepared(
796 &mut self,
797 stmt: &Statement,
798 params: &[Value],
799 ) -> Result<QueryResult, EngineError> {
800 let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
801 // WAL persistence on the bind-final SQL. Build the
802 // canonical Display form by re-printing the
803 // placeholder-substituted statement (cheap — the AST
804 // is already in hand from execute_prepared's internal
805 // clone) so replay's path is identical to the
806 // simple-query path.
807 if self.persistence.is_some()
808 && matches!(
809 &result,
810 QueryResult::CommandOk {
811 modified_catalog: true,
812 ..
813 }
814 )
815 {
816 // Render the AST back to SQL for WAL replay. The
817 // placeholder positions are already substituted in
818 // the executed clone; we re-substitute on a fresh
819 // clone here purely to obtain the canonical text.
820 let mut wal_stmt = stmt.stmt.clone();
821 // Use the engine's substitute_placeholders entry —
822 // exposed via execute_prepared above. Here we
823 // re-run the substitution only for Display.
824 crate::wal_render_with_params(&mut wal_stmt, params);
825 let canonical = format!("{wal_stmt}");
826 let record = encode_v3_auto_commit(&canonical);
827 let p = self.persistence.as_mut().expect("checked above");
828 p.wal.write_all(&record).map_err(io_err)?;
829 p.wal.sync_data().map_err(io_err)?;
830 p.wal_len = p.wal_len.saturating_add(record.len() as u64);
831 if p.wal_len >= p.checkpoint_threshold_bytes {
832 self.checkpoint()?;
833 }
834 }
835 Ok(result)
836 }
837
838 /// v7.16.0 — run a prepared SELECT with bound params and
839 /// return rows as `Vec<Vec<Value>>`, matching `query()`
840 /// shape. SELECTs are read-only so this never writes the
841 /// WAL.
842 ///
843 /// # Errors
844 /// Returns `Unsupported` if the prepared statement isn't a
845 /// SELECT (use `execute_prepared` for DML/DDL).
846 pub fn query_prepared(
847 &mut self,
848 stmt: &Statement,
849 params: &[Value],
850 ) -> Result<Vec<Vec<Value>>, EngineError> {
851 match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
852 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
853 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
854 "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
855 )),
856 _ => Err(EngineError::Unsupported(
857 "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
858 )),
859 }
860 }
861
862 /// v7.2.0 — run `body` inside an implicit `BEGIN` /
863 /// `COMMIT` pair. The body receives `&mut Database` so it
864 /// can `execute()` / `query()` like any other code path;
865 /// the only difference is that every write in the body
866 /// lands inside one transaction, and a returned `Err` from
867 /// the body triggers `ROLLBACK` before the error propagates.
868 ///
869 /// Nested calls are not supported — SPG's transaction
870 /// model is single-writer with explicit `BEGIN` /
871 /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
872 /// would hit `EngineError::Unsupported("nested
873 /// transaction")` at the inner `BEGIN`.
874 pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
875 where
876 F: FnOnce(&mut Self) -> Result<R, EngineError>,
877 {
878 self.execute("BEGIN")?;
879 match body(self) {
880 Ok(value) => {
881 self.execute("COMMIT")?;
882 Ok(value)
883 }
884 Err(e) => {
885 // Best-effort rollback. If ROLLBACK itself
886 // fails (rare — the engine reports it via
887 // `Unsupported` only when there's no active
888 // TX, which can't happen here) we surface the
889 // original body error, not the rollback error.
890 let _ = self.execute("ROLLBACK");
891 Err(e)
892 }
893 }
894 }
895}
896
897impl Default for Database {
898 fn default() -> Self {
899 Self::open_in_memory()
900 }
901}
902
/// v7.7.5 — observability snapshot returned by
/// [`Database::metrics`]. A plain `Copy` struct of counters: cheap
/// to construct, compare and serialise. Marked `#[non_exhaustive]`
/// so new counters can be added without breaking callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct EmbeddedMetrics {
    /// Sum of `row_count()` over every table in the catalog (hot
    /// tier only — cold-tier rows live in segment files).
    pub hot_rows: u64,
    /// Sum of `Table::hot_bytes` across every table in the catalog.
    /// Tracks against the freezer's `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Number of cold-tier segments registered in the catalog.
    /// Includes tombstoned slots (segments retired by
    /// compaction whose disk file may still be on disk).
    pub cold_segments: u64,
    /// User-table count (excludes any future engine-managed
    /// internal tables).
    pub tables: u64,
    /// WAL length as tracked by the persistence layer at the last
    /// `execute()` / `checkpoint()`. Zero when the database is
    /// in-memory.
    pub wal_bytes: u64,
    /// `true` when the database was opened with `open_path` —
    /// i.e. WAL + checkpoint persistence is active.
    pub persistent: bool,
}
930
/// v7.2.1 — handle returned by `spawn_background_freezer`.
/// Dropping it asks the worker thread to wind down and then joins
/// it, so a `Database` (or its shared `Arc<Mutex<Database>>`) can
/// safely drop after the handle does.
///
/// NOTE(review): `stop` blocks until the worker exits, and the
/// worker takes the database mutex on each tick — avoid stopping or
/// dropping the handle while holding that mutex; confirm callers
/// follow this.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    // Set to `true` to request shutdown; the worker polls it.
    shutdown: Arc<AtomicBool>,
    // `Some` until the worker has been joined once.
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — raise the shutdown flag and wait for the worker to
    /// exit. Idempotent: after the first call the join handle is
    /// gone and later calls only re-set the flag. `Drop` calls this
    /// too.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let Some(worker) = self.join.take() else {
            return;
        };
        // A worker that panicked yields `Err` here; it is discarded on
        // purpose so stopping never panics.
        drop(worker.join());
    }
}

impl Drop for FreezerHandle {
    fn drop(&mut self) {
        FreezerHandle::stop(self);
    }
}
958
/// v7.2.1 — knobs for `Database::spawn_background_freezer`.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// Tick interval. The worker wakes every `tick`, checks the
    /// catalog's `hot_tier_bytes`, and freezes if over budget.
    pub tick: Duration,
    /// Hot-tier byte budget. Once the catalog-wide hot tier is
    /// above this value, the next tick freezes up to `batch_rows`
    /// of the chosen table's oldest rows into a new cold segment.
    pub hot_tier_bytes: u64,
    /// Max rows the freezer demotes per fire.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. When the catalog holds
    /// more than this many cold segments after a successful
    /// freeze, the freezer runs one compaction pass. Set to
    /// `usize::MAX` to disable auto-compact entirely; the default
    /// is `64`, matching the `spg-server` operating point for
    /// SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges, in
    /// bytes. Default 64 MiB, mirroring `spg-server`. Small
    /// segments below this size are merge candidates; segments at
    /// or above stay untouched.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    /// Mirrors the `spg-server` freezer's default operating point
    /// (SPG_HOT_TIER_BYTES = 4 GiB, batch 1000 rows, tick every 1 s)
    /// so embedded behaviour is predictable for operators familiar
    /// with the server.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;
        Self {
            tick: Duration::from_secs(1),
            hot_tier_bytes: 4 * GIB,
            batch_rows: 1000,
            compact_when_segments_exceed: 64,
            compact_target_bytes: 64 * MIB,
        }
    }
}
1000
1001impl Database {
1002 /// v7.7.4 — observe the catalog's cold-segment count.
1003 /// Useful for tests + dashboards that want to verify
1004 /// auto-compaction is firing.
1005 #[must_use]
1006 pub fn cold_segment_count(&self) -> usize {
1007 self.engine.catalog().cold_segment_count()
1008 }
1009
1010 /// v7.7.5 — observability snapshot. Returns a point-in-time
1011 /// view of the engine + persistence counters. Cheap (no
1012 /// locks beyond the existing `&self` borrow), so safe to
1013 /// call from a hot metrics-scrape path.
1014 ///
1015 /// Fields mirror the operational dashboard
1016 /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
1017 /// minus the network counters that don't apply to embedded.
1018 #[must_use]
1019 pub fn metrics(&self) -> EmbeddedMetrics {
1020 let cat = self.engine.catalog();
1021 let mut hot_rows: u64 = 0;
1022 let mut hot_bytes: u64 = 0;
1023 for name in cat.table_names() {
1024 if let Some(t) = cat.get(&name) {
1025 hot_rows = hot_rows.saturating_add(t.row_count() as u64);
1026 hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
1027 }
1028 }
1029 let (wal_bytes, persistent) = match &self.persistence {
1030 Some(p) => (p.wal_len, true),
1031 None => (0, false),
1032 };
1033 EmbeddedMetrics {
1034 hot_rows,
1035 hot_bytes,
1036 cold_segments: cat.cold_segment_count() as u64,
1037 tables: cat.table_count() as u64,
1038 wal_bytes,
1039 persistent,
1040 }
1041 }
1042
1043 /// v7.2.1 — spawn a background thread that periodically
1044 /// runs `freeze_oldest_to_cold` when the catalog-wide hot
1045 /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
1046 /// pattern matches the v7.2 sharing story: callers wrap
1047 /// their `Database` in `Arc::new(Mutex::new(db))` once,
1048 /// then clone the Arc for the worker + for foreground
1049 /// access. Return value is a handle whose `Drop` joins the
1050 /// worker.
1051 ///
1052 /// Picks the freeze target the same way `spg-server`'s
1053 /// freezer does: largest-`hot_bytes` user table with at
1054 /// least one BTree integer-PK index. Tables without a
1055 /// freezable index are skipped silently.
1056 pub fn spawn_background_freezer(
1057 db: Arc<Mutex<Database>>,
1058 opts: FreezerOptions,
1059 ) -> FreezerHandle {
1060 let shutdown = Arc::new(AtomicBool::new(false));
1061 let shutdown_for_thread = Arc::clone(&shutdown);
1062 let join = thread::Builder::new()
1063 .name("spg-embedded-freezer".into())
1064 .spawn(move || {
1065 background_freezer_loop(db, opts, shutdown_for_thread);
1066 })
1067 .expect("spawn background freezer thread");
1068 FreezerHandle {
1069 shutdown,
1070 join: Some(join),
1071 }
1072 }
1073}
1074
1075/// v7.2.1 — the freezer's main loop, factored out so the
1076/// `Database::spawn_background_freezer` path stays readable.
1077fn background_freezer_loop(
1078 db: Arc<Mutex<Database>>,
1079 opts: FreezerOptions,
1080 shutdown: Arc<AtomicBool>,
1081) {
1082 // Sleep in short slices so a shutdown request resolves
1083 // quickly (vs sleeping the full tick).
1084 let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
1085 let mut last_tick = std::time::Instant::now();
1086 loop {
1087 if shutdown.load(Ordering::Acquire) {
1088 return;
1089 }
1090 thread::sleep(slice);
1091 if last_tick.elapsed() < opts.tick {
1092 continue;
1093 }
1094 last_tick = std::time::Instant::now();
1095 let Ok(mut guard) = db.lock() else {
1096 return;
1097 };
1098 if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
1099 continue;
1100 }
1101 let Some((table, index)) = pick_freeze_target(&guard) else {
1102 continue;
1103 };
1104 let row_count = guard
1105 .engine
1106 .catalog()
1107 .get(&table)
1108 .map_or(0, spg_storage::Table::row_count);
1109 let to_freeze = opts.batch_rows.min(row_count);
1110 if to_freeze == 0 {
1111 continue;
1112 }
1113 if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
1114 eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
1115 continue;
1116 }
1117 // v7.7.4 — auto-compact. If the catalog now carries
1118 // more cold segments than the configured threshold,
1119 // run a single compaction pass. Failures are reported
1120 // but don't kill the loop; the next tick will retry.
1121 let count = guard.engine.catalog().cold_segment_count();
1122 if count > opts.compact_when_segments_exceed {
1123 if let Err(e) = guard
1124 .engine
1125 .compact_cold_segments_with_target(opts.compact_target_bytes)
1126 {
1127 eprintln!(
1128 "spg-embedded: background compact failed (segments={count}, \
1129 threshold={}): {e:?}",
1130 opts.compact_when_segments_exceed,
1131 );
1132 }
1133 }
1134 }
1135}
1136
1137/// v7.2.1 — pick the highest-`hot_bytes` user table with a
1138/// BTree integer-PK index. Returns `(table, index_name)` so the
1139/// caller can dispatch through `freeze_oldest_to_cold`.
1140fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
1141 let cat = db.engine.catalog();
1142 let mut best: Option<(String, String, u64)> = None;
1143 for name in cat.table_names() {
1144 let Some(t) = cat.get(&name) else { continue };
1145 if t.row_count() == 0 {
1146 continue;
1147 }
1148 let cols = &t.schema().columns;
1149 let Some(idx) = t.indices().iter().find(|i| {
1150 matches!(i.kind, spg_storage::IndexKind::BTree(_))
1151 && i.column_position < cols.len()
1152 && matches!(
1153 cols[i.column_position].ty,
1154 spg_storage::DataType::SmallInt
1155 | spg_storage::DataType::Int
1156 | spg_storage::DataType::BigInt
1157 )
1158 }) else {
1159 continue;
1160 };
1161 let hot = t.hot_bytes();
1162 match best {
1163 None => best = Some((name, idx.name.clone(), hot)),
1164 Some((_, _, best_hot)) if hot > best_hot => {
1165 best = Some((name, idx.name.clone(), hot));
1166 }
1167 _ => {}
1168 }
1169 }
1170 best.map(|(t, i, _)| (t, i))
1171}
1172
1173/// v7.7.6 — replay the first `to_seq` records of the WAL at
1174/// `wal_path` into a fresh engine and write the resulting
1175/// catalog snapshot to `out_db_path`. Same semantics as
1176/// `spg revert --wal … --to-seq N --out …` from the CLI:
1177///
1178/// - `to_seq == 0` → snapshot is the empty catalog
1179/// - WAL records beyond `to_seq` are not applied
1180/// - durability-checkpoint markers (v3 type 0x02) are
1181/// consumed without counting against the budget
1182///
1183/// Returns the number of statements actually applied
1184/// (`≤ to_seq`). The output snapshot is byte-identical to
1185/// what `Database::open_path(out_db_path)` would consume on
1186/// a subsequent open.
1187///
1188/// This is the "rewind" operator for an embedded database
1189/// that has been corrupted by a poison statement or a
1190/// half-applied migration. Pair with `cold_segment_paths`
1191/// preservation if your cold-tier files are still on disk.
1192///
1193/// # Errors
1194///
1195/// - `wal_path` unreadable or truncated mid-record
1196/// - WAL record decodes to invalid UTF-8 SQL
1197/// - WAL record's SQL is rejected by the engine
1198/// - `out_db_path` unwritable
1199pub fn revert_wal_to_seq(
1200 wal_path: impl AsRef<Path>,
1201 to_seq: u64,
1202 out_db_path: impl AsRef<Path>,
1203) -> Result<u64, EngineError> {
1204 let wal_bytes = std::fs::read(wal_path.as_ref()).map_err(io_err)?;
1205 let mut engine = Engine::new();
1206 let mut applied = 0u64;
1207 let mut cur = 0usize;
1208 while cur < wal_bytes.len() && applied < to_seq {
1209 let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
1210 cur += total;
1211 if sql_bytes.is_empty() {
1212 continue;
1213 }
1214 let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
1215 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1216 "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
1217 )))
1218 })?;
1219 engine.execute(sql)?;
1220 applied += 1;
1221 }
1222 let snapshot = engine.snapshot();
1223 std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
1224 Ok(applied)
1225}
1226
1227/// v7.7.6 — decode one WAL record from a byte tail. Returns
1228/// `(sql_bytes, header_plus_payload_len)`. Handles the three
1229/// on-disk formats (v1 / v2 / v3) the same way the CLI
1230/// `decode_one_record` and the engine's `replay_wal_bytes`
1231/// do. CRCs are not re-validated; the caller's intent is
1232/// "apply", not "validate".
1233fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
1234 if tail.len() < 4 {
1235 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1236 format!("WAL truncated record: {} < 4 header bytes", tail.len()),
1237 )));
1238 }
1239 let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
1240 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
1241 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
1242 let len_mask = if is_v3 {
1243 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
1244 } else {
1245 !WAL_V2_SENTINEL
1246 };
1247 let rec_len = (raw_len & len_mask) as usize;
1248 let header_len = if is_v3 {
1249 9
1250 } else if is_v2 {
1251 8
1252 } else {
1253 4
1254 };
1255 if tail.len() < header_len + rec_len {
1256 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1257 format!(
1258 "WAL truncated record: header+payload {} > available {}",
1259 header_len + rec_len,
1260 tail.len()
1261 ),
1262 )));
1263 }
1264 let payload = &tail[header_len..header_len + rec_len];
1265 let sql_bytes = if is_v3 {
1266 let type_byte = tail[8];
1267 // v3 type 0x01 = auto_commit_sql (payload = SQL).
1268 // v3 type 0x02 = durability marker (payload = u64
1269 // offset, no SQL to apply). Anything else is unknown.
1270 if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
1271 payload.to_vec()
1272 } else {
1273 // Caller treats empty payload as a skip-marker.
1274 Vec::new()
1275 }
1276 } else {
1277 payload.to_vec()
1278 };
1279 Ok((sql_bytes, header_len + rec_len))
1280}
1281
1282impl Drop for Database {
1283 fn drop(&mut self) {
1284 // v7.1 — best-effort final checkpoint when a persistent
1285 // Database leaves scope. Failures here go to stderr so
1286 // operators see them, but Drop can't propagate errors —
1287 // the WAL itself is already durable, so a checkpoint
1288 // miss only means the next boot replays a few more
1289 // records than strictly necessary.
1290 if self.persistence.is_some() {
1291 if let Err(e) = self.checkpoint() {
1292 eprintln!(
1293 "spg-embedded: final checkpoint on Drop failed: {e:?} \
1294 (WAL is intact; next open_path will replay)"
1295 );
1296 }
1297 }
1298 }
1299}
1300
1301/// v7.1 — turn a `std::io::Error` into the workspace's
1302/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
1303/// the closest existing variant — io failures during boot or
1304/// during a WAL append surface as a storage-layer fault to
1305/// callers, which keeps the public error enum unchanged.
1306fn io_err(e: std::io::Error) -> EngineError {
1307 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
1308}
1309
1310/// v7.2.2 — `Database` is `Send`, so the recommended sharing
1311/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
1312///
1313/// ```no_run
1314/// use std::sync::{Arc, Mutex};
1315/// use spg_embedded::Database;
1316///
1317/// let db = Database::open_in_memory();
1318/// let shared = Arc::new(Mutex::new(db));
1319/// let shared_for_worker = Arc::clone(&shared);
1320/// std::thread::spawn(move || {
1321/// let mut guard = shared_for_worker.lock().unwrap();
1322/// guard.execute("INSERT INTO t VALUES (1)").unwrap();
1323/// });
1324/// ```
1325///
1326/// Internal `RwLock`-wrapped state — letting many threads
1327/// hold concurrent `&Database` for `SELECT` without contending
1328/// — is parked as STABILITY § "Out of v7.2"; multi-reader
1329/// embedded throughput needs a planner-side change to release
1330/// the engine read lock between scans, which is the v7.x
1331/// "Choice A" line of work already documented in v6.9.1's
1332/// carve-out.
1333#[allow(dead_code)]
1334fn _database_is_send() {
1335 fn assert_send<T: Send>() {}
1336 assert_send::<Database>();
1337}
1338
/// v6.10.3 — trait that maps one result row onto a user struct.
/// v7.3.0 ships the [`spg_row!`] declarative macro that generates
/// `impl FromSpgRow for YourStruct` from a struct definition (no
/// proc-macro, no syn/quote/proc-macro2 deps — the workspace's
/// "0 external deps" policy holds); hand-written impls are equally
/// valid.
///
/// Errors surface as `EngineError::Unsupported` so the caller's
/// error type stays uniform.
pub trait FromSpgRow: Sized {
    /// Decode one query result row into `Self`. Called once per
    /// row by [`Database::query_typed`]. The slice length equals
    /// the number of columns in the SELECT projection.
    ///
    /// # Errors
    /// Implementations return an `EngineError` when the row cannot
    /// be mapped onto `Self` (for example a missing column or a
    /// value of the wrong type).
    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
}
1355
/// v7.3.0 — declarative macro that generates a struct together with
/// its `FromSpgRow` impl. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// Details of the expansion:
///
/// - The struct is emitted with `#[derive(Debug, Clone)]` in
///   addition to any attributes passed in, so do not derive those
///   two again.
/// - Fields are decoded positionally: the first field reads column
///   0, the second column 1, and so on. A row with fewer columns
///   than fields yields `EngineError::Unsupported`; surplus columns
///   are ignored.
/// - The comma after the last field is optional, as in an ordinary
///   struct definition.
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // One shared cursor: each field consumes the next column.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                // Fully qualified so a caller-side `Ok` cannot shadow it.
                ::core::result::Result::Ok(Self { $($field,)* })
            }
        }
    };
}
1432
/// v7.3.0 — per-column decoder used by `spg_row!`. The impls in
/// this file cover `i16` / `i32` / `i64`, `f32` / `f64`, `bool`,
/// `String`, `Vec<f32>`, plus `Option<T>` for nullable columns.
pub trait FromSpgValue: Sized {
    /// Decode one cell into `Self`. On failure the returned
    /// `&'static str` is a short diagnostic (e.g. `"NULL in
    /// non-Option int column"`); callers wrap it into their own
    /// error type.
    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
}
1443
1444macro_rules! impl_from_value_int {
1445 ($($t:ty),* $(,)?) => {
1446 $(
1447 impl FromSpgValue for $t {
1448 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1449 match v {
1450 Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
1451 Value::Int(n) => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
1452 Value::BigInt(n) => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
1453 Value::Null => Err("NULL in non-Option int column"),
1454 _ => Err("non-integer value in int column"),
1455 }
1456 }
1457 }
1458 )*
1459 };
1460}
1461impl_from_value_int!(i16, i32, i64);
1462
1463impl FromSpgValue for f32 {
1464 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1465 match v {
1466 Value::Float(f) => Ok(*f as f32),
1467 Value::Null => Err("NULL in non-Option float column"),
1468 _ => Err("non-float value in float column"),
1469 }
1470 }
1471}
1472
1473impl FromSpgValue for f64 {
1474 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1475 match v {
1476 Value::Float(f) => Ok(*f),
1477 Value::Null => Err("NULL in non-Option float column"),
1478 _ => Err("non-float value in float column"),
1479 }
1480 }
1481}
1482
1483impl FromSpgValue for bool {
1484 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1485 match v {
1486 Value::Bool(b) => Ok(*b),
1487 Value::Null => Err("NULL in non-Option bool column"),
1488 _ => Err("non-bool value in bool column"),
1489 }
1490 }
1491}
1492
1493impl FromSpgValue for String {
1494 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1495 match v {
1496 Value::Text(s) => Ok(s.clone()),
1497 Value::Null => Err("NULL in non-Option text column"),
1498 _ => Err("non-text value in String column"),
1499 }
1500 }
1501}
1502
1503impl FromSpgValue for Vec<f32> {
1504 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1505 match v {
1506 Value::Vector(xs) => Ok(xs.clone()),
1507 Value::Null => Err("NULL in non-Option vector column"),
1508 _ => Err("non-vector value in Vec<f32> column"),
1509 }
1510 }
1511}
1512
1513impl<T: FromSpgValue> FromSpgValue for Option<T> {
1514 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1515 match v {
1516 Value::Null => Ok(None),
1517 other => T::from_spg_value(other).map(Some),
1518 }
1519 }
1520}
1521
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a table in an in-memory database, insert two rows and
    /// read one back through a filtered SELECT.
    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        let setup = [
            "CREATE TABLE t (id INT NOT NULL, name TEXT)",
            "INSERT INTO t VALUES (1, 'alice')",
            "INSERT INTO t VALUES (2, 'bob')",
        ];
        for sql in setup.iter() {
            db.execute(sql).unwrap();
        }
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        let other = &rows[0][0];
        assert!(
            matches!(other, Value::Int(1)),
            "expected Int(1), got {other:?}"
        );
    }

    /// `query()` is for SELECTs only; handing it DML must fail.
    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        let outcome = db.query("INSERT INTO t VALUES (1)");
        assert!(outcome.is_err(), "query() on INSERT must error");
    }

    /// A snapshot taken from one database restores into another that
    /// answers the same query.
    #[test]
    fn snapshot_roundtrip() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        db.execute("INSERT INTO t VALUES (42)").unwrap();
        let bytes = db.snapshot();

        let mut restored = Database::restore(&bytes).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        let other = &rows[0][0];
        assert!(
            matches!(other, Value::Int(42)),
            "expected Int(42), got {other:?}"
        );
    }

    /// A hand-written `FromSpgRow` impl compiles against the trait's
    /// signature and decodes a single-column row.
    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                if let Some(Value::Int(n)) = row.first() {
                    Ok(Self { _id: *n })
                } else {
                    Err(EngineError::Unsupported("bad id".into()))
                }
            }
        }
        let row = vec![Value::Int(7)];
        let _u = User::from_spg_row(&row).unwrap();
    }
}