spg_embedded/lib.rs
1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//! println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//! crash-consistent WAL + periodic checkpoint snapshot. The
32//! on-disk format is byte-identical to what `spg-server`
33//! produces, so a database can move between modes without
34//! conversion.
35//! - **Durability**: every `execute()` that mutates calls
36//! `fsync` before returning `Ok`. There is no group commit
37//! in embedded mode — every commit pays one fsync. If you
38//! need batch throughput, wrap multiple statements in
39//! [`Database::with_transaction`] which fsyncs only at
40//! commit.
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//! Share across threads via `Arc<Mutex<Database>>`. The
43//! single-writer model is intentional — see
44//! [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//! moves cold rows to disk-resident segments while you keep
47//! serving requests. It runs in a dedicated thread; drop the
48//! returned [`FreezerHandle`] (or call `stop()`) for clean
49//! shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//! [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//! them with a wildcard arm so future v7.x releases can add
53//! variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//! Malformed SQL, type mismatches, missing tables — all
59//! return `Err(EngineError::…)`. If you observe a panic on
60//! a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//! violations (e.g., catalog snapshot magic mismatch, WAL
63//! record CRC sentinel corruption that survived the boot-
64//! time validation). These represent silent disk corruption
65//! and an unwind would leak inconsistent state, so the
66//! release profile uses `panic = abort` — your host process
67//! dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//! `--profile release-dbg` (keeps unwind tables) and use
70//! `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
/// v7.16.0 — handle for a parsed-and-planned SQL statement.
/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
/// with a `&[Value]` slice carrying the bind parameters (PG-style
/// `$1`, `$2`, … positional). Cheap to `Clone`; the underlying AST
/// is shared by handle copies and cloned per bind call by the
/// engine's executor.
///
/// NOTE(review): `Clone` is derived on this struct, so copying a
/// handle clones both `stmt` and `sql`. Whether the AST is really
/// shared (and the copy really cheap) depends on how
/// `ParsedStatement` implements `Clone` — confirm before relying
/// on it in a hot path.
///
/// The handle holds a snapshot of the AST at prepare time. If
/// the engine's plan cache evicts the entry between prepare and
/// execute (e.g. ANALYZE bumps the statistics version) the
/// stored AST keeps working — `execute_prepared` operates on
/// the handle's clone, not the cache entry.
#[derive(Debug, Clone)]
pub struct Statement {
    /// The parsed + planned AST. `spg-engine::prepare_cached`
    /// returns it as a clone of the cached plan, so any rewrite
    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
    /// already run.
    pub(crate) stmt: ParsedStatement,
    /// Original SQL source, exposed read-only through
    /// [`Statement::sql`] and kept for `Display` / debug only.
    /// WAL persistence renders from the AST so a bind-time
    /// rewrite of `$1..$N` survives replay.
    pub(crate) sql: String,
}
106
107impl Statement {
108 /// Borrow the original SQL source — useful for tracing and
109 /// debug logs. WAL replay does NOT use this; it serialises
110 /// the bind-final AST instead.
111 #[must_use]
112 pub fn sql(&self) -> &str {
113 &self.sql
114 }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127 let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::{Seek, SeekFrom, Write};
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
/// v7.11.3 — wall-clock provider handed to every embedded `Engine`
/// via `with_clock`.
///
/// Returns microseconds since the Unix epoch. Two edge cases are
/// folded into ordinary values instead of errors:
///
/// - a system clock that reads earlier than the epoch yields `0`;
/// - a microsecond count that does not fit in `i64` yields
///   `i64::MAX`.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        // Clock is set before 1970-01-01: report the epoch itself.
        Err(_) => 0,
    }
}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Kept private so callers can't mis-frame records; the v3 layout
// is the same the server uses, so a `spg-server` boot can read a
// database an embedded process wrote and vice versa.
//
// Every record starts with a little-endian u32 "length word". Its
// top two bits select the framing and the remaining bits hold the
// payload length; the decoders in this file mask the flag bits off
// before using the length.

/// Bit 31 of the length word. When set, the decoders treat the
/// record as v2-or-newer and expect an 8-byte header (9 bytes when
/// [`WAL_V3_FLAG`] is set as well) instead of a bare 4-byte length.
const WAL_V2_SENTINEL: u32 = 0x8000_0000;
/// Bit 30 of the length word. Only honoured together with
/// [`WAL_V2_SENTINEL`]; marks the typed envelope written by the
/// encoders in this file: length word, CRC32, then one type byte.
const WAL_V3_FLAG: u32 = 0x4000_0000;
/// Type byte of a v3 record whose payload is the SQL text of one
/// auto-committed statement. Replay executes the payload.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v7.18 — durability checkpoint marker stays at 0x02 (skipped on replay).
const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v7.18 PITR — auto-commit-sql record with appended (commit_lsn,
/// commit_unix_us) fields so replay can target a specific point in
/// time. Backward-compat: v3 records (type 0x01) keep working, the
/// envelope flag bits are unchanged. The new type byte is the
/// schema-version discriminator.
const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
/// v7.18 — sentinel for "no wall clock" inside a v4 record's
/// commit_unix_us slot. Restore-to-timestamp skips records with
/// this sentinel (no time anchor); LSN-based restore is
/// unaffected.
const WAL_V4_NO_CLOCK: i64 = i64::MIN;
/// v7.18 — extra header bytes after the type byte in a v4 record:
/// 8 bytes commit_lsn (u64 LE) + 8 bytes commit_unix_us (i64 LE).
/// These bytes are NOT counted in the length word of a v4
/// auto-commit record, so decoders add them on top of it.
const WAL_V4_EXTRA_HEADER: usize = 16;
/// v7.18 PITR — checkpoint anchor record written to the WAL *before*
/// the snapshot file replaces the on-disk catalog. Carries the
/// (lsn, ts, snapshot_path) triple so restore tooling can find the
/// matching base snapshot without scanning the filesystem. Replay
/// dispatch skips it (same as the v3 durability marker).
const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;
182
/// v7.1 — default auto-checkpoint threshold, in bytes of WAL.
///
/// Reads the `SPG_EMBEDDED_CHECKPOINT_BYTES` environment variable
/// and uses it when it parses as a `u64` greater than zero. An
/// unset variable, a non-numeric value, or `0` all fall back to
/// 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;

    let Ok(raw) = std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES") else {
        return FALLBACK_BYTES;
    };
    match raw.parse::<u64>() {
        Ok(bytes) if bytes > 0 => bytes,
        _ => FALLBACK_BYTES,
    }
}
194
195/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
196///
197/// ```text
198/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
199/// [u32 LE crc32 over (type_byte || sql_bytes)]
200/// [u8 type = 0x01]
201/// [sql bytes]
202/// ```
203fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
204 let payload = sql.as_bytes();
205 let mut crc_buf = Vec::with_capacity(1 + payload.len());
206 crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
207 crc_buf.extend_from_slice(payload);
208 let crc = spg_crypto::crc32::crc32(&crc_buf);
209 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
210 let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
211 out.extend_from_slice(&header);
212 out.extend_from_slice(&crc.to_le_bytes());
213 out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
214 out.extend_from_slice(payload);
215 out
216}
217
218/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
219///
220/// ```text
221/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
222/// [u32 LE crc32 over (type_byte || payload)]
223/// [u8 type = 0x11]
224/// payload:
225/// [u64 LE checkpoint_lsn]
226/// [i64 LE checkpoint_unix_us (WAL_V4_NO_CLOCK if no clock)]
227/// [u16 LE snapshot_path_len]
228/// [snapshot_path_bytes]
229/// ```
230///
231/// `payload_len` covers only the payload — keeping the framing
232/// uniform across v3 / v4 record types so torn-write detection in
233/// `replay_wal_into_engine` stays trivial.
234fn encode_v4_checkpoint_marker(
235 checkpoint_lsn: u64,
236 checkpoint_unix_us: i64,
237 snapshot_path: &Path,
238) -> Vec<u8> {
239 let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
240 let snap_payload = snapshot_bytes.as_bytes();
241 let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
242 let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
243 payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
244 payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
245 payload.extend_from_slice(&snap_len_u16.to_le_bytes());
246 payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
247 let mut crc_buf = Vec::with_capacity(1 + payload.len());
248 crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
249 crc_buf.extend_from_slice(&payload);
250 let crc = spg_crypto::crc32::crc32(&crc_buf);
251 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
252 let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
253 out.extend_from_slice(&header);
254 out.extend_from_slice(&crc.to_le_bytes());
255 out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
256 out.extend_from_slice(&payload);
257 out
258}
259
260/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
261///
262/// ```text
263/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
264/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
265/// [u8 type = 0x10]
266/// [u64 LE commit_lsn]
267/// [i64 LE commit_unix_us (= WAL_V4_NO_CLOCK when no ClockFn)]
268/// [sql bytes]
269/// ```
270///
271/// `sql_len` field stays the SQL byte count — same shape as v3 — so
272/// replay-buffer torn-write detection compares against
273/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
274/// readable by the same loop with their original 9-byte header
275/// arithmetic.
276fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
277 let payload = sql.as_bytes();
278 let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
279 crc_buf.push(WAL_V4_TYPE_AUTO_COMMIT_SQL);
280 crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
281 crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
282 crc_buf.extend_from_slice(payload);
283 let crc = spg_crypto::crc32::crc32(&crc_buf);
284 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
285 let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
286 out.extend_from_slice(&header);
287 out.extend_from_slice(&crc.to_le_bytes());
288 out.push(WAL_V4_TYPE_AUTO_COMMIT_SQL);
289 out.extend_from_slice(&commit_lsn.to_le_bytes());
290 out.extend_from_slice(&commit_unix_us.to_le_bytes());
291 out.extend_from_slice(payload);
292 out
293}
294
295/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
296/// Returns the count of records successfully applied. A truncated
297/// trailing record (mid-write torn) is dropped silently — the
298/// same recovery story `spg-server`'s boot path uses.
299fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
300 let mut applied = 0usize;
301 let mut cur = 0usize;
302 while cur < wal_bytes.len() {
303 if wal_bytes.len() - cur < 4 {
304 // Trailing partial header — torn write, drop and stop.
305 break;
306 }
307 let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
308 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
309 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
310 let len_mask = if is_v3 {
311 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
312 } else {
313 !WAL_V2_SENTINEL
314 };
315 let rec_len = (raw_len & len_mask) as usize;
316 let header_len = if is_v3 {
317 9
318 } else if is_v2 {
319 8
320 } else {
321 4
322 };
323 if wal_bytes.len() - cur < header_len + rec_len {
324 // Torn record at the tail — drop, stop.
325 break;
326 }
327 if is_v3 {
328 let type_byte = wal_bytes[cur + 8];
329 match type_byte {
330 WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
331 WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
332 // durability_checkpoint marker — skip, no SQL.
333 cur += header_len + rec_len;
334 continue;
335 }
336 WAL_V4_TYPE_CHECKPOINT_MARKER => {
337 // v7.18 PITR — checkpoint anchor, skip on replay
338 // (engine state past this point reflects the
339 // matching snapshot already loaded by the caller).
340 cur += header_len + rec_len;
341 continue;
342 }
343 WAL_V4_TYPE_AUTO_COMMIT_SQL => {
344 // v7.18 PITR — v4 record carries 16 bytes of
345 // (commit_lsn, commit_unix_us) between the type
346 // byte and the SQL payload. Replay reads them but
347 // does not enforce them — the engine doesn't
348 // surface LSN/clock here. Restore tooling
349 // (spgctl) parses them via parse_wal_record below.
350 let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
351 if wal_bytes.len() - cur < v4_total {
352 // Torn v4 record at the tail — drop, stop.
353 break;
354 }
355 let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
356 let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
357 let sql = std::str::from_utf8(sql_bytes).map_err(|e| {
358 format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}")
359 })?;
360 engine.execute(sql).map_err(|e| {
361 format!(
362 "WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"
363 )
364 })?;
365 applied += 1;
366 cur += v4_total;
367 continue;
368 }
369 other => {
370 return Err(format!(
371 "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
372 ));
373 }
374 }
375 }
376 let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
377 let sql = std::str::from_utf8(sql_bytes)
378 .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
379 engine
380 .execute(sql)
381 .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
382 applied += 1;
383 cur += header_len + rec_len;
384 }
385 Ok(applied)
386}
387
/// v7.18 PITR — parsed WAL record, surfaced for restore / verify
/// tooling. The replay loop above doesn't expose LSN/timestamp;
/// `spgctl restore --to <timestamp>` and `spgctl verify` need them.
/// Returned offsets are byte-positions inside the WAL buffer.
///
/// Produced by [`parse_wal_records`]; `sql` borrows from the
/// buffer passed to that function.
#[derive(Debug, Clone)]
pub struct WalRecord<'a> {
    /// Byte offset in the WAL buffer where this record starts.
    pub offset: usize,
    /// Type byte: 0x01 = v3 auto-commit, 0x02 = durability
    /// checkpoint marker, 0x10 = v4 auto-commit, 0x11 = v4 PITR
    /// checkpoint anchor. Legacy v1 / v2 records have no type byte
    /// on disk and are reported as 0x01.
    pub type_byte: u8,
    /// `Some(lsn)` for v4 auto-commit records and v4 checkpoint
    /// anchors, `None` for everything else.
    pub commit_lsn: Option<u64>,
    /// `Some(unix_us)` for v4 records carrying a clock-set timestamp,
    /// `None` for v3 or for v4 records explicitly written with
    /// `WAL_V4_NO_CLOCK` (sentinel for "no ClockFn at commit time").
    pub commit_unix_us: Option<i64>,
    /// SQL payload as borrowed bytes. Empty for durability markers.
    /// For a v4 checkpoint anchor (0x11) this holds the snapshot
    /// path bytes instead of SQL.
    pub sql: &'a [u8],
}
408
409/// v7.18 PITR — iterate over `wal_bytes` yielding one `WalRecord`
410/// per intact record. Torn-tail records terminate iteration
411/// silently (same recovery story as `replay_wal_into_engine`).
412/// Unknown type bytes inside a v3 envelope return `Err` so the
413/// caller knows the WAL was written by a newer SPG.
414pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
415 let mut out = Vec::new();
416 let mut cur = 0usize;
417 while cur < wal_bytes.len() {
418 if wal_bytes.len() - cur < 4 {
419 break;
420 }
421 let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
422 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
423 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
424 let len_mask = if is_v3 {
425 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
426 } else {
427 !WAL_V2_SENTINEL
428 };
429 let rec_len = (raw_len & len_mask) as usize;
430 let header_len = if is_v3 {
431 9
432 } else if is_v2 {
433 8
434 } else {
435 4
436 };
437 if wal_bytes.len() - cur < header_len + rec_len {
438 break;
439 }
440 if !is_v3 {
441 // v1 / v2 records carry no type byte; treat as legacy
442 // auto-commit SQL with no LSN/time.
443 let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
444 out.push(WalRecord {
445 offset: cur,
446 type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
447 commit_lsn: None,
448 commit_unix_us: None,
449 sql,
450 });
451 cur += header_len + rec_len;
452 continue;
453 }
454 let type_byte = wal_bytes[cur + 8];
455 match type_byte {
456 WAL_V3_TYPE_AUTO_COMMIT_SQL => {
457 let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
458 out.push(WalRecord {
459 offset: cur,
460 type_byte,
461 commit_lsn: None,
462 commit_unix_us: None,
463 sql,
464 });
465 cur += header_len + rec_len;
466 }
467 WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
468 out.push(WalRecord {
469 offset: cur,
470 type_byte,
471 commit_lsn: None,
472 commit_unix_us: None,
473 sql: &[],
474 });
475 cur += header_len + rec_len;
476 }
477 WAL_V4_TYPE_CHECKPOINT_MARKER => {
478 // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
479 // We surface lsn + ts on the WalRecord; the path lives
480 // in `sql` since the type byte already disambiguates
481 // record meaning and adding a dedicated field would
482 // bloat the iterator return type for every variant.
483 if rec_len < 18 {
484 return Err(format!(
485 "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
486 ));
487 }
488 let lsn = u64::from_le_bytes(
489 wal_bytes[cur + header_len..cur + header_len + 8]
490 .try_into()
491 .unwrap(),
492 );
493 let ts_raw = i64::from_le_bytes(
494 wal_bytes[cur + header_len + 8..cur + header_len + 16]
495 .try_into()
496 .unwrap(),
497 );
498 let path_len = u16::from_le_bytes(
499 wal_bytes[cur + header_len + 16..cur + header_len + 18]
500 .try_into()
501 .unwrap(),
502 ) as usize;
503 if rec_len < 18 + path_len {
504 return Err(format!(
505 "WAL parse: checkpoint marker at offset {cur} truncated path"
506 ));
507 }
508 let path_start = cur + header_len + 18;
509 let path_bytes = &wal_bytes[path_start..path_start + path_len];
510 let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
511 None
512 } else {
513 Some(ts_raw)
514 };
515 out.push(WalRecord {
516 offset: cur,
517 type_byte,
518 commit_lsn: Some(lsn),
519 commit_unix_us,
520 sql: path_bytes,
521 });
522 cur += header_len + rec_len;
523 }
524 WAL_V4_TYPE_AUTO_COMMIT_SQL => {
525 let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
526 if wal_bytes.len() - cur < v4_total {
527 break;
528 }
529 let lsn = u64::from_le_bytes(
530 wal_bytes[cur + header_len..cur + header_len + 8]
531 .try_into()
532 .unwrap(),
533 );
534 let ts_raw = i64::from_le_bytes(
535 wal_bytes[cur + header_len + 8..cur + header_len + 16]
536 .try_into()
537 .unwrap(),
538 );
539 let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
540 None
541 } else {
542 Some(ts_raw)
543 };
544 let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
545 let sql = &wal_bytes[sql_start..sql_start + rec_len];
546 out.push(WalRecord {
547 offset: cur,
548 type_byte,
549 commit_lsn: Some(lsn),
550 commit_unix_us,
551 sql,
552 });
553 cur += v4_total;
554 }
555 other => {
556 return Err(format!(
557 "WAL parse: unknown type byte {other:#04x} at offset {cur}"
558 ));
559 }
560 }
561 }
562 Ok(out)
563}
564
/// v7.1 — decides, from the leading keyword alone, whether a
/// statement can skip the WAL.
///
/// The keyword is whatever precedes the first whitespace, `;` or
/// `(` after leading whitespace is trimmed, compared without
/// regard to ASCII case. The statement counts as read-only when
/// that keyword is one of SELECT, SHOW, EXPLAIN, BEGIN, COMMIT,
/// ROLLBACK, CHECKPOINT, COMPACT, WAIT or WITH.
///
/// The check is deliberately conservative in the other direction:
/// an empty string, a statement that starts with a comment or a
/// parenthesis, or any keyword not in the list returns `false`,
/// i.e. "write a WAL record".
///
/// NOTE(review): `WITH` is classified by its first keyword only.
/// If the engine accepts data-modifying CTEs, such a statement
/// would be treated as read-only here — confirm against the
/// parser.
fn sql_is_read_only(sql: &str) -> bool {
    const READ_ONLY_VERBS: [&str; 10] = [
        "select",
        "show",
        "explain",
        "begin",
        "commit",
        "rollback",
        "checkpoint",
        "compact",
        "wait",
        "with",
    ];

    let trimmed = sql.trim_start();
    let verb_end = trimmed
        .find(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .unwrap_or(trimmed.len());
    let verb = &trimmed[..verb_end];
    READ_ONLY_VERBS
        .iter()
        .any(|known| verb.eq_ignore_ascii_case(known))
}
591
/// Embedded SPG database handle. Owns an `Engine` + provides
/// ergonomic wrappers around `execute` and `query`.
///
/// A handle is either in-memory ([`Database::open_in_memory`],
/// `persistence` is `None`, nothing touches the disk — the
/// original v6.10.3 shape) or file-backed
/// ([`Database::open_path`], `persistence` is `Some`). What a
/// file-backed handle does on commit and on `Drop` is described
/// on the `persistence` field.
#[derive(Debug)]
pub struct Database {
    /// The execution engine every statement is run against.
    engine: Engine,
    /// v7.1 — persistence sidecar. When `Some(p)`, every
    /// `execute(sql)` that mutates state appends a v4
    /// `auto_commit_sql` WAL record + fsyncs before the call
    /// returns; `Drop` writes a final catalog snapshot to
    /// `<db_path>` so the next session boots from a clean
    /// snapshot + an empty WAL. `None` = in-memory only (the
    /// v6.10.3 shape).
    persistence: Option<PersistenceCtx>,
    /// v7.18 PITR — monotonic per-database commit LSN. Increments
    /// before each successful WAL append; bootstrapped at
    /// open_path from `max(parse_wal_records → commit_lsn)` so
    /// reopen never reuses an LSN. In-memory databases start at
    /// 0 and never advance (no WAL = no LSN-meaningful records).
    commit_lsn: AtomicU64,
}
614
/// On-disk state that accompanies a file-backed [`Database`]:
/// the snapshot and WAL locations, the open WAL handle, the
/// auto-checkpoint threshold, the cold-segment bookkeeping and the
/// cross-process lock directory. Built once by
/// `Database::open_path`.
#[derive(Debug)]
#[allow(dead_code)] // `wal_path` is read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Path of the catalog snapshot file, as passed to `open_path`.
    db_path: PathBuf,
    /// Path of the WAL: the snapshot's file name with `.wal`
    /// appended, in the same directory.
    wal_path: PathBuf,
    /// The WAL file, opened by `open_path` with create + append +
    /// read.
    wal: File,
    /// Cached WAL file length so each `execute()` doesn't have
    /// to stat. Refreshed on append + on `checkpoint()` (which
    /// truncates back to 0).
    wal_len: u64,
    /// WAL size, in bytes, past which an auto-checkpoint is due.
    /// Initialised from `default_checkpoint_threshold_bytes()`;
    /// overridable per handle.
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — cold-tier segment directory. `open_path` derives
    /// it as `<parent>/<stem>.spg/segments`, where `<stem>` is the
    /// snapshot's file stem (so `data/app.db` maps to
    /// `data/app.spg/segments`). Segments produced by
    /// `freeze_oldest_to_cold` / compaction are persisted here as
    /// `seg_<id>.spg` files. The manifest sidecar (its path comes
    /// from `spg_manifest::manifest_path`) records every active
    /// segment + its CRC32 so the next boot can verify + reload.
    cold_segments_dir: PathBuf,
    /// Segment id → on-disk path for every cold segment this
    /// handle has loaded at boot or written since.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
    /// via `fs::create_dir` on `<db_path>.lock` at open_path
    /// entry; released on Drop by `fs::remove_dir`. Directory
    /// creation is relied on to be atomic on every supported
    /// platform. A second process opening the
    /// same path while the first is still alive hits the
    /// create_dir failure and returns
    /// `EngineError::Unsupported("database is locked by another
    /// process: …")`. Stale locks (process crashed mid-session)
    /// must be cleared via `Database::force_unlock(path)` —
    /// SPG can't safely fingerprint who owned a stale directory
    /// without a libc dep, which would violate spg-embedded's
    /// zero-deps charter.
    lock_path: PathBuf,
}
647
648impl Database {
649 /// Open a fresh in-memory database. No WAL, no catalog
650 /// snapshot on disk — perfect for tests + short-lived
651 /// CLI tools.
652 #[must_use]
653 pub fn open_in_memory() -> Self {
654 Self {
655 engine: Engine::new().with_clock(wall_clock_micros),
656 persistence: None,
657 commit_lsn: AtomicU64::new(0),
658 }
659 }
660
661 /// v7.1 — Open or create a persistent database backed by
662 /// the file at `db_path`. The WAL lives at `db_path` +
663 /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
664 /// path:
665 ///
666 /// 1. If `db_path` exists, restore the catalog snapshot.
667 /// 2. If the WAL exists, replay every record into the
668 /// restored engine — the same recovery story
669 /// `spg-server` uses.
670 /// 3. Open the WAL in append+sync mode so subsequent
671 /// `execute()` writes durably commit (one fsync per
672 /// mutation).
673 ///
674 /// `Drop` writes a final catalog snapshot + truncates the
675 /// WAL — operators that need a sync barrier at a specific
676 /// point use `checkpoint()` explicitly.
677 pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
678 let db_path = db_path.as_ref().to_path_buf();
679 let wal_path = {
680 let mut p = db_path.clone();
681 let name = p
682 .file_name()
683 .map(|n| {
684 let mut s = n.to_os_string();
685 s.push(".wal");
686 s
687 })
688 .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
689 p.set_file_name(name);
690 p
691 };
692 if let Some(parent) = db_path.parent()
693 && !parent.as_os_str().is_empty()
694 {
695 std::fs::create_dir_all(parent).map_err(io_err)?;
696 }
697 // v7.17.0 Phase 6.2 — acquire cross-process exclusion
698 // lock before touching any catalog / WAL bytes. atomic
699 // mkdir on every supported platform; a second process
700 // opening the same path while the first is still alive
701 // hits the create_dir failure and gets a clear error.
702 let lock_path = {
703 let mut p = db_path.clone();
704 let name = p
705 .file_name()
706 .map(|n| {
707 let mut s = n.to_os_string();
708 s.push(".lock");
709 s
710 })
711 .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
712 p.set_file_name(name);
713 p
714 };
715 std::fs::create_dir(&lock_path).map_err(|e| {
716 if e.kind() == std::io::ErrorKind::AlreadyExists {
717 EngineError::Unsupported(format!(
718 "database is locked by another process (or stale lock): {}; \
719 remove the directory manually after confirming no other \
720 process holds it, or call Database::force_unlock()",
721 lock_path.display()
722 ))
723 } else {
724 io_err(e)
725 }
726 })?;
727 let mut engine = if db_path.exists() {
728 let bytes = std::fs::read(&db_path).map_err(io_err)?;
729 let engine = Engine::restore_envelope(&bytes).map_err(|e| {
730 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
731 "restore from {}: {e}",
732 db_path.display()
733 )))
734 })?;
735 engine.with_clock(wall_clock_micros)
736 } else {
737 Engine::new().with_clock(wall_clock_micros)
738 };
739 // v7.1.4 — manifest-driven cold-segment reload. The
740 // manifest sidecar pairs the catalog snapshot CRC with a
741 // list of `(segment_id, path, crc32)` triples; verify
742 // before loading so a torn or stale manifest doesn't
743 // surface phantom data.
744 let cold_segments_dir = {
745 let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
746 let stem = db_path
747 .file_stem()
748 .unwrap_or_else(|| std::ffi::OsStr::new("db"))
749 .to_string_lossy()
750 .into_owned();
751 parent.join(format!("{stem}.spg")).join("segments")
752 };
753 let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
754 let manifest_pth = spg_manifest_path(&db_path);
755 if manifest_pth.exists() && db_path.exists() {
756 let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
757 if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
758 let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
759 let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
760 if snap_crc == m.catalog_crc32 {
761 for entry in &m.cold_segments {
762 if let Ok(seg_bytes) = std::fs::read(&entry.path) {
763 let computed = spg_crypto::crc32::crc32(&seg_bytes);
764 if computed != entry.crc32 {
765 eprintln!(
766 "spg-embedded: manifest skip segment {}: CRC mismatch",
767 entry.segment_id
768 );
769 continue;
770 }
771 if engine.catalog().cold_segment(entry.segment_id).is_some() {
772 // Already loaded via Catalog::clone path (shouldn't happen
773 // since Engine::new + restore_envelope don't populate cold).
774 continue;
775 }
776 let mut new_cat = engine.catalog().clone();
777 if let Err(e) =
778 new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
779 {
780 eprintln!(
781 "spg-embedded: manifest load segment {} failed: {e}",
782 entry.segment_id
783 );
784 continue;
785 }
786 engine.replace_catalog(new_cat);
787 cold_segment_paths.insert(entry.segment_id, entry.path.clone());
788 } else {
789 eprintln!(
790 "spg-embedded: manifest skip segment {}: file unreadable",
791 entry.segment_id
792 );
793 }
794 }
795 }
796 }
797 }
798 let mut initial_lsn: u64 = 0;
799 if wal_path.exists() {
800 let wal_bytes = std::fs::read(&wal_path).map_err(io_err)?;
801 if !wal_bytes.is_empty() {
802 replay_wal_into_engine(&wal_bytes, &mut engine)
803 .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
804 // v7.18 PITR — recover the commit-LSN watermark so
805 // the new session does not re-issue an LSN that
806 // already lives in the WAL. parse_wal_records yields
807 // None for v3 records (they predate the LSN field);
808 // an empty / v3-only WAL leaves the counter at 0.
809 if let Ok(records) = parse_wal_records(&wal_bytes) {
810 if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
811 initial_lsn = max;
812 }
813 }
814 }
815 }
816 let wal = OpenOptions::new()
817 .create(true)
818 .append(true)
819 .read(true)
820 .open(&wal_path)
821 .map_err(io_err)?;
822 let wal_len = wal.metadata().map_err(io_err)?.len();
823 Ok(Self {
824 engine,
825 commit_lsn: AtomicU64::new(initial_lsn),
826 persistence: Some(PersistenceCtx {
827 db_path,
828 wal_path,
829 wal,
830 wal_len,
831 checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
832 cold_segments_dir,
833 cold_segment_paths,
834 lock_path,
835 }),
836 })
837 }
838
839 /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
840 /// hot tier into a brand-new cold-tier segment + persist
841 /// it to disk. Same semantics as `spg-server`'s freezer
842 /// thread; embedded just runs the freeze synchronously on
843 /// the caller's thread. Persistence + manifest update
844 /// happen as part of the next `checkpoint()` (or on Drop).
845 pub fn freeze_oldest_to_cold(
846 &mut self,
847 table_name: &str,
848 index_name: &str,
849 max_rows: usize,
850 ) -> Result<spg_storage::FreezeReport, EngineError> {
851 let report = self
852 .engine
853 .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
854 if let Some(p) = &mut self.persistence {
855 std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
856 let final_path = p
857 .cold_segments_dir
858 .join(format!("seg_{}.spg", report.segment_id));
859 let tmp_path = p
860 .cold_segments_dir
861 .join(format!("seg_{}.spg.tmp", report.segment_id));
862 std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
863 std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
864 p.cold_segment_paths.insert(report.segment_id, final_path);
865 }
866 Ok(report)
867 }
868
869 /// v7.1 — override the auto-checkpoint WAL-size ceiling for
870 /// this `Database` instance. Default is
871 /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
872 /// setter wins. No-op when the database is in-memory.
873 pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
874 if let Some(p) = &mut self.persistence {
875 p.checkpoint_threshold_bytes = bytes.max(1);
876 }
877 }
878
879 /// v7.1 — flush a fresh catalog snapshot to `db_path` and
880 /// truncate the WAL. Idempotent; cheap when nothing has
881 /// happened since the last checkpoint. No-op when the
882 /// database is in-memory (no `db_path` configured).
883 ///
884 /// Called automatically when:
885 /// - the WAL grows past
886 /// `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
887 /// end of an `execute()`, and
888 /// - `Drop` runs (best-effort; checkpoint failure on drop is
889 /// logged to stderr).
890 pub fn checkpoint(&mut self) -> Result<(), EngineError> {
891 let snapshot = self.engine.snapshot();
892 let Some(p) = &mut self.persistence else {
893 return Ok(());
894 };
895 // Snapshot first (atomic via tmp+rename), then WAL
896 // truncate. Same order as `spg-server`'s CHECKPOINT —
897 // a crash between the two leaves the WAL holding
898 // already-snapshotted ops, which replay cleanly on the
899 // next boot (idempotent for SPG's standard DDL/DML
900 // mutations).
901 let tmp = {
902 let mut t = p.db_path.clone();
903 let mut name = t
904 .file_name()
905 .map(std::ffi::OsStr::to_os_string)
906 .unwrap_or_default();
907 name.push(".tmp");
908 t.set_file_name(name);
909 t
910 };
911 std::fs::write(&tmp, &snapshot).map_err(io_err)?;
912 std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
913 // v7.1.4 — refresh the manifest so the next boot can
914 // reload cold segments alongside the snapshot. Bytes
915 // come from the freshly-written snapshot file (= the
916 // canonical CRC source).
917 if !p.cold_segment_paths.is_empty() {
918 let snap_crc = spg_crypto::crc32::crc32(&snapshot);
919 let entries: Vec<ColdSegmentEntry> = p
920 .cold_segment_paths
921 .iter()
922 .filter_map(|(&segment_id, path)| {
923 let bytes = std::fs::read(path).ok()?;
924 Some(ColdSegmentEntry {
925 segment_id,
926 path: path.clone(),
927 crc32: spg_crypto::crc32::crc32(&bytes),
928 })
929 })
930 .collect();
931 let manifest = CatalogManifest {
932 catalog_crc32: snap_crc,
933 cold_segments: entries,
934 wal_baseline_offset: 0,
935 };
936 let m_bytes = manifest.serialize();
937 let m_path = spg_manifest_path(&p.db_path);
938 if let Some(dir) = m_path.parent() {
939 std::fs::create_dir_all(dir).map_err(io_err)?;
940 }
941 let m_tmp = {
942 let mut t = m_path.clone();
943 let mut name = t
944 .file_name()
945 .map(std::ffi::OsStr::to_os_string)
946 .unwrap_or_default();
947 name.push(".tmp");
948 t.set_file_name(name);
949 t
950 };
951 std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
952 std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
953 }
954 // v7.18 PITR — append a checkpoint marker BEFORE truncating
955 // so backup tooling that copies the WAL between snapshot
956 // rotations sees the (lsn, ts, snapshot_path) triple that
957 // anchors restore-to-time. The marker rides the WAL's
958 // existing CRC-protected v3 envelope under the new v4 type
959 // byte 0x11; replay dispatch (and `revert_wal_to_seq`)
960 // already skip it. The truncate below then drops both the
961 // SQL records the snapshot superseded AND this marker —
962 // PITR retention work (P4/P6) intercepts the WAL before
963 // this point to archive both pieces together.
964 let marker_lsn = self.commit_lsn.load(Ordering::SeqCst);
965 let marker_ts = wall_clock_micros();
966 let marker = encode_v4_checkpoint_marker(marker_lsn, marker_ts, &p.db_path);
967 p.wal.write_all(&marker).map_err(io_err)?;
968 p.wal.sync_data().map_err(io_err)?;
969 p.wal.set_len(0).map_err(io_err)?;
970 p.wal.seek(SeekFrom::Start(0)).map_err(io_err)?;
971 p.wal.sync_data().map_err(io_err)?;
972 p.wal_len = 0;
973 Ok(())
974 }
975
976 /// Restore a database from a previously-captured catalog
977 /// snapshot. Pairs with `Database::snapshot()` for
978 /// round-tripping in-memory state without going through
979 /// the `spg-server` WAL.
980 pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
981 let engine = Engine::restore_envelope(snapshot).map_err(|e| {
982 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
983 })?;
984 Ok(Self {
985 engine,
986 persistence: None,
987 commit_lsn: AtomicU64::new(0),
988 })
989 }
990
991 /// Take a catalog snapshot suitable for `Database::restore`.
992 /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
993 /// + version + payload); round-trips through every released
994 /// SPG version per the STABILITY contract.
995 #[must_use]
996 pub fn snapshot(&self) -> Vec<u8> {
997 self.engine.snapshot()
998 }
999
1000 /// Execute a SQL statement and return the engine's
1001 /// `QueryResult` verbatim. Pass-through for callers that
1002 /// want to keep PG-flavoured column/row metadata.
1003 ///
1004 /// v7.1 — when the database was opened via `open_path`,
1005 /// successful mutations are appended to the WAL + fsynced
1006 /// before the call returns. A subsequent process crash will
1007 /// recover state up to the last successful return from
1008 /// `execute()`. Read-only statements (SELECT / SHOW /
1009 /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
1010 /// etc.) skip the WAL entirely.
1011 pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
1012 let result = self.engine.execute(sql)?;
1013 if self.persistence.is_some()
1014 && !sql_is_read_only(sql)
1015 && matches!(
1016 &result,
1017 QueryResult::CommandOk {
1018 modified_catalog: true,
1019 ..
1020 }
1021 )
1022 {
1023 // v7.18 PITR — write v4 records that carry the commit
1024 // LSN + wall-clock micros so restore tooling can
1025 // target a point in time. Replay path still accepts
1026 // v3 records emitted by older spg-embedded versions.
1027 // Crash window is bounded by one record exactly as
1028 // under v3: WAL fsync happens after the in-memory
1029 // mutation, so the WAL never describes a write that
1030 // didn't apply.
1031 let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1032 let ts = wall_clock_micros();
1033 let record = encode_v4_auto_commit(sql, lsn, ts);
1034 let p = self.persistence.as_mut().expect("checked above");
1035 p.wal.write_all(&record).map_err(io_err)?;
1036 p.wal.sync_data().map_err(io_err)?;
1037 p.wal_len = p.wal_len.saturating_add(record.len() as u64);
1038 if p.wal_len >= p.checkpoint_threshold_bytes {
1039 self.checkpoint()?;
1040 }
1041 }
1042 Ok(result)
1043 }
1044
1045 /// v7.3.0 — typed-row variant of [`Database::query`]. Each
1046 /// row decodes into a `T: FromSpgRow` so callers don't
1047 /// pattern-match on `Value` themselves. Use [`spg_row!`] to
1048 /// generate the impl, or write it by hand.
1049 pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
1050 let rows = self.query(sql)?;
1051 rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
1052 }
1053
1054 /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
1055 /// strips the column-schema metadata for read-side
1056 /// ergonomics. Errors on non-Rows results (DML / DDL
1057 /// statements should go through `execute` instead).
1058 pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
1059 match self.engine.execute(sql)? {
1060 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
1061 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1062 "query() expects a SELECT — use execute() for DML/DDL".into(),
1063 )),
1064 // v7.5.0 — QueryResult is #[non_exhaustive]; any future
1065 // variant is not a SELECT row stream, treat as Unsupported.
1066 _ => Err(EngineError::Unsupported(
1067 "query() expects a SELECT — use execute() for DML/DDL".into(),
1068 )),
1069 }
1070 }
1071
1072 /// v7.16.0 — column-aware variant of [`Self::query`].
1073 /// Returns the column schema vec alongside the rows so
1074 /// adapters (the spg-sqlx Row impl most notably) can drive
1075 /// name + type-based column lookups. Errors on non-Rows
1076 /// results identically to `query`.
1077 pub fn query_with_columns(
1078 &mut self,
1079 sql: &str,
1080 ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
1081 match self.engine.execute(sql)? {
1082 QueryResult::Rows { columns, rows } => {
1083 Ok((columns, rows.into_iter().map(|r| r.values).collect()))
1084 }
1085 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1086 "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
1087 )),
1088 _ => Err(EngineError::Unsupported(
1089 "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
1090 )),
1091 }
1092 }
1093
1094 /// v7.16.0 — column-aware variant of
1095 /// [`Self::query_prepared`]. Same shape as
1096 /// `query_with_columns` but driven from a prepared
1097 /// statement + bound params.
1098 pub fn query_prepared_with_columns(
1099 &mut self,
1100 stmt: &Statement,
1101 params: &[Value],
1102 ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
1103 match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
1104 QueryResult::Rows { columns, rows } => {
1105 Ok((columns, rows.into_iter().map(|r| r.values).collect()))
1106 }
1107 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1108 "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1109 )),
1110 _ => Err(EngineError::Unsupported(
1111 "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1112 )),
1113 }
1114 }
1115
1116 /// Borrow the underlying engine. Escape hatch for callers
1117 /// that need access to `spg-engine` APIs not yet surfaced
1118 /// here (transactions, EXPLAIN ANALYZE, etc.).
1119 #[must_use]
1120 pub const fn engine(&self) -> &Engine {
1121 &self.engine
1122 }
1123
1124 /// Mutable borrow of the underlying engine. Same intent as
1125 /// `engine()` but for write-side APIs (e.g. inserting
1126 /// directly through `Catalog::insert` for high-throughput
1127 /// bulk loads that bypass SQL parsing).
1128 pub const fn engine_mut(&mut self) -> &mut Engine {
1129 &mut self.engine
1130 }
1131
1132 /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
1133 /// `execute_prepared` / `query_prepared` calls can re-bind
1134 /// parameters without re-parsing. The returned [`Statement`]
1135 /// is a thin handle around the AST + cached source SQL; it's
1136 /// `Clone` so the same plan can drive many bind calls
1137 /// concurrently (each call clones the AST and runs
1138 /// placeholder substitution on the clone — the cached
1139 /// plan stays intact).
1140 ///
1141 /// Plan caching follows the engine's existing version-aware
1142 /// rule: a prepared `Statement` whose statistics version
1143 /// has rolled (ANALYZE ran between prepare and execute)
1144 /// will silently re-prepare under the hood. Callers don't
1145 /// need to detect this.
1146 ///
1147 /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
1148 /// `bind`-time `Value`s are passed as a slice; arity
1149 /// mismatches surface as `EvalError::PlaceholderOutOfRange`
1150 /// at `execute_prepared` time, not here.
1151 ///
1152 /// # Errors
1153 /// Surfaces `EngineError` (parse error / plan rewrite
1154 /// failure) from the underlying `Engine::prepare`.
1155 pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
1156 // Use the cached path so repeated prepares of the same
1157 // SQL are O(1). The engine's plan cache stays shared
1158 // across all callers of this Database — a single
1159 // `PgPool`-shaped consumer (or, later, the spg-sqlx
1160 // adapter) prepares once and reaps the win on every bind.
1161 let stmt = self
1162 .engine
1163 .prepare_cached(sql)
1164 .map_err(EngineError::Parse)?;
1165 Ok(Statement {
1166 stmt,
1167 sql: sql.to_string(),
1168 })
1169 }
1170
1171 /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
1172 /// executing. Returns `(parameter_oid_count, output_columns)`
1173 /// where `output_columns` is empty for non-SELECT statements
1174 /// or for SELECT shapes the describe planner can't resolve
1175 /// (JOIN / subquery / unknown table). Wraps
1176 /// `Engine::describe_prepared` so the spg-sqlx bridge can
1177 /// surface PG-shape Describe replies for
1178 /// `sqlx::query!()` compile-time validation.
1179 ///
1180 /// # Errors
1181 /// Propagates parse errors from the underlying prepare path.
1182 pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
1183 let stmt = self
1184 .engine
1185 .prepare_cached(sql)
1186 .map_err(EngineError::Parse)?;
1187 Ok(self.engine.describe_prepared(&stmt))
1188 }
1189
1190 /// v7.16.0 — execute a prepared statement with bound
1191 /// parameters. Mirrors `Engine::execute_prepared`: clones
1192 /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
1193 ///
1194 /// Persistence (WAL fsync + auto-checkpoint) follows the
1195 /// same rules as `execute(sql)`: mutating statements get a
1196 /// WAL record AFTER the in-memory exec succeeds. The WAL
1197 /// record carries the substituted, bind-final SQL, so
1198 /// replay reconstructs the same row state without needing
1199 /// the original prepared `Statement` to still be alive.
1200 ///
1201 /// # Errors
1202 /// Propagates engine errors. Param arity mismatch surfaces
1203 /// as `EvalError::PlaceholderOutOfRange`.
1204 pub fn execute_prepared(
1205 &mut self,
1206 stmt: &Statement,
1207 params: &[Value],
1208 ) -> Result<QueryResult, EngineError> {
1209 let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
1210 // WAL persistence on the bind-final SQL. Build the
1211 // canonical Display form by re-printing the
1212 // placeholder-substituted statement (cheap — the AST
1213 // is already in hand from execute_prepared's internal
1214 // clone) so replay's path is identical to the
1215 // simple-query path.
1216 if self.persistence.is_some()
1217 && matches!(
1218 &result,
1219 QueryResult::CommandOk {
1220 modified_catalog: true,
1221 ..
1222 }
1223 )
1224 {
1225 // Render the AST back to SQL for WAL replay. The
1226 // placeholder positions are already substituted in
1227 // the executed clone; we re-substitute on a fresh
1228 // clone here purely to obtain the canonical text.
1229 let mut wal_stmt = stmt.stmt.clone();
1230 // Use the engine's substitute_placeholders entry —
1231 // exposed via execute_prepared above. Here we
1232 // re-run the substitution only for Display.
1233 crate::wal_render_with_params(&mut wal_stmt, params);
1234 let canonical = format!("{wal_stmt}");
1235 // v7.18 PITR — prepared path also emits v4 records so
1236 // LSN/timestamp coverage is uniform across simple and
1237 // extended query.
1238 let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1239 let ts = wall_clock_micros();
1240 let record = encode_v4_auto_commit(&canonical, lsn, ts);
1241 let p = self.persistence.as_mut().expect("checked above");
1242 p.wal.write_all(&record).map_err(io_err)?;
1243 p.wal.sync_data().map_err(io_err)?;
1244 p.wal_len = p.wal_len.saturating_add(record.len() as u64);
1245 if p.wal_len >= p.checkpoint_threshold_bytes {
1246 self.checkpoint()?;
1247 }
1248 }
1249 Ok(result)
1250 }
1251
1252 /// v7.16.0 — run a prepared SELECT with bound params and
1253 /// return rows as `Vec<Vec<Value>>`, matching `query()`
1254 /// shape. SELECTs are read-only so this never writes the
1255 /// WAL.
1256 ///
1257 /// # Errors
1258 /// Returns `Unsupported` if the prepared statement isn't a
1259 /// SELECT (use `execute_prepared` for DML/DDL).
1260 pub fn query_prepared(
1261 &mut self,
1262 stmt: &Statement,
1263 params: &[Value],
1264 ) -> Result<Vec<Vec<Value>>, EngineError> {
1265 match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
1266 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
1267 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1268 "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1269 )),
1270 _ => Err(EngineError::Unsupported(
1271 "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1272 )),
1273 }
1274 }
1275
1276 /// v7.18 — parse + plan a SQL string against a
1277 /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
1278 /// readonly fan-out path: no writer lock taken, no WAL write,
1279 /// no plan-cache mutation. Static-on-`Self` so callers can
1280 /// dispatch against a snapshot without an `&mut Database`
1281 /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
1282 /// is the load-bearing consumer.
1283 ///
1284 /// # Errors
1285 /// Propagates `EngineError::Parse` from the parser.
1286 pub fn prepare_on_snapshot(
1287 snapshot: &CatalogSnapshot,
1288 sql: &str,
1289 ) -> Result<Statement, EngineError> {
1290 let stmt = spg_engine::Engine::prepare_on_snapshot(snapshot, sql)
1291 .map_err(EngineError::Parse)?;
1292 Ok(Statement {
1293 stmt,
1294 sql: sql.to_string(),
1295 })
1296 }
1297
1298 /// v7.18 — execute a prepared `Statement` against a
1299 /// `CatalogSnapshot` with bound params. Mirror of
1300 /// [`Database::execute_prepared`] on the readonly path:
1301 /// writes / DDL hit `EngineError::WriteRequired`. No WAL
1302 /// write, no writer lock, multiple snapshots can run
1303 /// concurrently — the snapshot is immutable from prepare time.
1304 ///
1305 /// # Errors
1306 /// Surfaces `EngineError::WriteRequired` for non-readonly
1307 /// statements; propagates other engine errors.
1308 pub fn execute_prepared_on_snapshot(
1309 snapshot: &CatalogSnapshot,
1310 stmt: &Statement,
1311 params: &[Value],
1312 ) -> Result<QueryResult, EngineError> {
1313 spg_engine::Engine::execute_readonly_prepared_on_snapshot(
1314 snapshot,
1315 stmt.stmt.clone(),
1316 params,
1317 )
1318 }
1319
1320 /// v7.18 — describe a SQL string against a
1321 /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
1322 /// the readonly path. Pure function on the snapshot's
1323 /// catalog; safe to call from any thread.
1324 ///
1325 /// # Errors
1326 /// Propagates `EngineError::Parse` from the parser.
1327 pub fn describe_on_snapshot(
1328 snapshot: &CatalogSnapshot,
1329 sql: &str,
1330 ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
1331 let stmt = spg_engine::Engine::prepare_on_snapshot(snapshot, sql)
1332 .map_err(EngineError::Parse)?;
1333 Ok(spg_engine::Engine::describe_prepared_on_snapshot(
1334 snapshot, &stmt,
1335 ))
1336 }
1337
1338 /// v7.2.0 — run `body` inside an implicit `BEGIN` /
1339 /// `COMMIT` pair. The body receives `&mut Database` so it
1340 /// can `execute()` / `query()` like any other code path;
1341 /// the only difference is that every write in the body
1342 /// lands inside one transaction, and a returned `Err` from
1343 /// the body triggers `ROLLBACK` before the error propagates.
1344 ///
1345 /// Nested calls are not supported — SPG's transaction
1346 /// model is single-writer with explicit `BEGIN` /
1347 /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
1348 /// would hit `EngineError::Unsupported("nested
1349 /// transaction")` at the inner `BEGIN`.
1350 pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
1351 where
1352 F: FnOnce(&mut Self) -> Result<R, EngineError>,
1353 {
1354 self.execute("BEGIN")?;
1355 match body(self) {
1356 Ok(value) => {
1357 self.execute("COMMIT")?;
1358 Ok(value)
1359 }
1360 Err(e) => {
1361 // Best-effort rollback. If ROLLBACK itself
1362 // fails (rare — the engine reports it via
1363 // `Unsupported` only when there's no active
1364 // TX, which can't happen here) we surface the
1365 // original body error, not the rollback error.
1366 let _ = self.execute("ROLLBACK");
1367 Err(e)
1368 }
1369 }
1370 }
1371}
1372
1373impl Default for Database {
1374 fn default() -> Self {
1375 Self::open_in_memory()
1376 }
1377}
1378
/// v7.7.5 — point-in-time counters returned by
/// [`Database::metrics`]. A plain `Copy` value made only of
/// integers and a flag, so it is cheap to build, pass around
/// and serialise.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct EmbeddedMetrics {
    /// Live rows summed over every user table. Hot tier only —
    /// cold-tier rows live in segment files.
    pub hot_rows: u64,
    /// `Table::hot_bytes` summed over every user table. This is
    /// the figure to compare against the freezer's
    /// `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Cold-tier segments registered in the catalog. Tombstoned
    /// slots (segments retired by compaction whose file may
    /// still be on disk) are counted too.
    pub cold_segments: u64,
    /// Number of user tables (excludes any future
    /// engine-managed internal tables).
    pub tables: u64,
    /// WAL size as tracked at the last `execute()` /
    /// `checkpoint()`. Zero for an in-memory database.
    pub wal_bytes: u64,
    /// `true` when the database was opened with `open_path`,
    /// i.e. WAL + checkpoint persistence is active.
    pub persistent: bool,
}
1406
/// v7.2.1 — handle returned by `spawn_background_freezer`.
/// Dropping it raises the shutdown flag and joins the worker
/// thread, so a `Database` (or its shared
/// `Arc<Mutex<Database>>`) can safely be dropped once the
/// handle is gone.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    // Flag the worker polls; set by `stop()`.
    shutdown: Arc<AtomicBool>,
    // Worker thread; `None` once it has been joined.
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — ask the worker to stop, then wait for it to
    /// finish. Calling it again is harmless: the flag is simply
    /// set once more and there is no thread left to join. `Drop`
    /// calls this too. A panic inside the worker is discarded.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let Some(worker) = self.join.take() else {
            return;
        };
        let _ = worker.join();
    }
}

impl Drop for FreezerHandle {
    /// Stops and joins the worker when the handle goes away.
    fn drop(&mut self) {
        self.stop();
    }
}
1434
/// v7.2.1 — tuning knobs for
/// `Database::spawn_background_freezer`.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// How often the worker evaluates the catalog. On each tick
    /// it compares the catalog's hot-tier size with
    /// `hot_tier_bytes` and freezes if the budget is exceeded.
    pub tick: Duration,
    /// Hot-tier byte budget. Once exceeded, the next tick
    /// freezes the oldest `batch_rows` rows of the largest
    /// eligible table into a new cold segment.
    pub hot_tier_bytes: u64,
    /// Upper bound on rows demoted per freeze.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. When, after a freeze,
    /// the catalog holds more than this many cold segments
    /// across all tables, the freezer runs one compaction pass.
    /// Use `usize::MAX` to disable auto-compact entirely; the
    /// default is `64`, matching the `spg-server` operating
    /// point for `SPG_COLD_COMPACT_SEGMENT_THRESHOLD`.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges, in
    /// bytes. Default 64 MiB, mirroring `spg-server`. Segments
    /// smaller than this are merge candidates; segments at or
    /// above it stay untouched.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    /// Same operating point as the `spg-server` freezer
    /// (SPG_HOT_TIER_BYTES = 4 GiB, 1000-row batches, 1 s tick),
    /// so embedded behaviour is predictable for operators who
    /// know the server.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;
        Self {
            tick: Duration::from_secs(1),
            hot_tier_bytes: 4 * GIB,
            batch_rows: 1000,
            compact_when_segments_exceed: 64,
            compact_target_bytes: 64 * MIB,
        }
    }
}
1476
1477impl Database {
1478 /// v7.7.4 — observe the catalog's cold-segment count.
1479 /// Useful for tests + dashboards that want to verify
1480 /// auto-compaction is firing.
1481 #[must_use]
1482 pub fn cold_segment_count(&self) -> usize {
1483 self.engine.catalog().cold_segment_count()
1484 }
1485
1486 /// v7.7.5 — observability snapshot. Returns a point-in-time
1487 /// view of the engine + persistence counters. Cheap (no
1488 /// locks beyond the existing `&self` borrow), so safe to
1489 /// call from a hot metrics-scrape path.
1490 ///
1491 /// Fields mirror the operational dashboard
1492 /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
1493 /// minus the network counters that don't apply to embedded.
1494 #[must_use]
1495 pub fn metrics(&self) -> EmbeddedMetrics {
1496 let cat = self.engine.catalog();
1497 let mut hot_rows: u64 = 0;
1498 let mut hot_bytes: u64 = 0;
1499 for name in cat.table_names() {
1500 if let Some(t) = cat.get(&name) {
1501 hot_rows = hot_rows.saturating_add(t.row_count() as u64);
1502 hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
1503 }
1504 }
1505 let (wal_bytes, persistent) = match &self.persistence {
1506 Some(p) => (p.wal_len, true),
1507 None => (0, false),
1508 };
1509 EmbeddedMetrics {
1510 hot_rows,
1511 hot_bytes,
1512 cold_segments: cat.cold_segment_count() as u64,
1513 tables: cat.table_count() as u64,
1514 wal_bytes,
1515 persistent,
1516 }
1517 }
1518
1519 /// v7.2.1 — spawn a background thread that periodically
1520 /// runs `freeze_oldest_to_cold` when the catalog-wide hot
1521 /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
1522 /// pattern matches the v7.2 sharing story: callers wrap
1523 /// their `Database` in `Arc::new(Mutex::new(db))` once,
1524 /// then clone the Arc for the worker + for foreground
1525 /// access. Return value is a handle whose `Drop` joins the
1526 /// worker.
1527 ///
1528 /// Picks the freeze target the same way `spg-server`'s
1529 /// freezer does: largest-`hot_bytes` user table with at
1530 /// least one BTree integer-PK index. Tables without a
1531 /// freezable index are skipped silently.
1532 pub fn spawn_background_freezer(
1533 db: Arc<Mutex<Database>>,
1534 opts: FreezerOptions,
1535 ) -> FreezerHandle {
1536 let shutdown = Arc::new(AtomicBool::new(false));
1537 let shutdown_for_thread = Arc::clone(&shutdown);
1538 let join = thread::Builder::new()
1539 .name("spg-embedded-freezer".into())
1540 .spawn(move || {
1541 background_freezer_loop(db, opts, shutdown_for_thread);
1542 })
1543 .expect("spawn background freezer thread");
1544 FreezerHandle {
1545 shutdown,
1546 join: Some(join),
1547 }
1548 }
1549}
1550
1551/// v7.2.1 — the freezer's main loop, factored out so the
1552/// `Database::spawn_background_freezer` path stays readable.
1553fn background_freezer_loop(
1554 db: Arc<Mutex<Database>>,
1555 opts: FreezerOptions,
1556 shutdown: Arc<AtomicBool>,
1557) {
1558 // Sleep in short slices so a shutdown request resolves
1559 // quickly (vs sleeping the full tick).
1560 let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
1561 let mut last_tick = std::time::Instant::now();
1562 loop {
1563 if shutdown.load(Ordering::Acquire) {
1564 return;
1565 }
1566 thread::sleep(slice);
1567 if last_tick.elapsed() < opts.tick {
1568 continue;
1569 }
1570 last_tick = std::time::Instant::now();
1571 let Ok(mut guard) = db.lock() else {
1572 return;
1573 };
1574 if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
1575 continue;
1576 }
1577 let Some((table, index)) = pick_freeze_target(&guard) else {
1578 continue;
1579 };
1580 let row_count = guard
1581 .engine
1582 .catalog()
1583 .get(&table)
1584 .map_or(0, spg_storage::Table::row_count);
1585 let to_freeze = opts.batch_rows.min(row_count);
1586 if to_freeze == 0 {
1587 continue;
1588 }
1589 if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
1590 eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
1591 continue;
1592 }
1593 // v7.7.4 — auto-compact. If the catalog now carries
1594 // more cold segments than the configured threshold,
1595 // run a single compaction pass. Failures are reported
1596 // but don't kill the loop; the next tick will retry.
1597 let count = guard.engine.catalog().cold_segment_count();
1598 if count > opts.compact_when_segments_exceed {
1599 if let Err(e) = guard
1600 .engine
1601 .compact_cold_segments_with_target(opts.compact_target_bytes)
1602 {
1603 eprintln!(
1604 "spg-embedded: background compact failed (segments={count}, \
1605 threshold={}): {e:?}",
1606 opts.compact_when_segments_exceed,
1607 );
1608 }
1609 }
1610 }
1611}
1612
1613/// v7.2.1 — pick the highest-`hot_bytes` user table with a
1614/// BTree integer-PK index. Returns `(table, index_name)` so the
1615/// caller can dispatch through `freeze_oldest_to_cold`.
1616fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
1617 let cat = db.engine.catalog();
1618 let mut best: Option<(String, String, u64)> = None;
1619 for name in cat.table_names() {
1620 let Some(t) = cat.get(&name) else { continue };
1621 if t.row_count() == 0 {
1622 continue;
1623 }
1624 let cols = &t.schema().columns;
1625 let Some(idx) = t.indices().iter().find(|i| {
1626 matches!(i.kind, spg_storage::IndexKind::BTree(_))
1627 && i.column_position < cols.len()
1628 && matches!(
1629 cols[i.column_position].ty,
1630 spg_storage::DataType::SmallInt
1631 | spg_storage::DataType::Int
1632 | spg_storage::DataType::BigInt
1633 )
1634 }) else {
1635 continue;
1636 };
1637 let hot = t.hot_bytes();
1638 match best {
1639 None => best = Some((name, idx.name.clone(), hot)),
1640 Some((_, _, best_hot)) if hot > best_hot => {
1641 best = Some((name, idx.name.clone(), hot));
1642 }
1643 _ => {}
1644 }
1645 }
1646 best.map(|(t, i, _)| (t, i))
1647}
1648
1649/// v7.7.6 — replay the first `to_seq` records of the WAL at
1650/// `wal_path` into a fresh engine and write the resulting
1651/// catalog snapshot to `out_db_path`. Same semantics as
1652/// `spg revert --wal … --to-seq N --out …` from the CLI:
1653///
1654/// - `to_seq == 0` → snapshot is the empty catalog
1655/// - WAL records beyond `to_seq` are not applied
1656/// - durability-checkpoint markers (v3 type 0x02) are
1657/// consumed without counting against the budget
1658///
1659/// Returns the number of statements actually applied
1660/// (`≤ to_seq`). The output snapshot is byte-identical to
1661/// what `Database::open_path(out_db_path)` would consume on
1662/// a subsequent open.
1663///
1664/// This is the "rewind" operator for an embedded database
1665/// that has been corrupted by a poison statement or a
1666/// half-applied migration. Pair with `cold_segment_paths`
1667/// preservation if your cold-tier files are still on disk.
1668///
1669/// # Errors
1670///
1671/// - `wal_path` unreadable or truncated mid-record
1672/// - WAL record decodes to invalid UTF-8 SQL
1673/// - WAL record's SQL is rejected by the engine
1674/// - `out_db_path` unwritable
1675pub fn revert_wal_to_seq(
1676 wal_path: impl AsRef<Path>,
1677 to_seq: u64,
1678 out_db_path: impl AsRef<Path>,
1679) -> Result<u64, EngineError> {
1680 let wal_bytes = std::fs::read(wal_path.as_ref()).map_err(io_err)?;
1681 let mut engine = Engine::new();
1682 let mut applied = 0u64;
1683 let mut cur = 0usize;
1684 while cur < wal_bytes.len() && applied < to_seq {
1685 let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
1686 cur += total;
1687 if sql_bytes.is_empty() {
1688 continue;
1689 }
1690 let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
1691 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1692 "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
1693 )))
1694 })?;
1695 engine.execute(sql)?;
1696 applied += 1;
1697 }
1698 let snapshot = engine.snapshot();
1699 std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
1700 Ok(applied)
1701}
1702
1703/// v7.7.6 — decode one WAL record from a byte tail. Returns
1704/// `(sql_bytes, header_plus_payload_len)`. Handles the three
1705/// on-disk formats (v1 / v2 / v3) the same way the CLI
1706/// `decode_one_record` and the engine's `replay_wal_bytes`
1707/// do. CRCs are not re-validated; the caller's intent is
1708/// "apply", not "validate".
1709fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
1710 if tail.len() < 4 {
1711 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1712 format!("WAL truncated record: {} < 4 header bytes", tail.len()),
1713 )));
1714 }
1715 let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
1716 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
1717 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
1718 let len_mask = if is_v3 {
1719 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
1720 } else {
1721 !WAL_V2_SENTINEL
1722 };
1723 let rec_len = (raw_len & len_mask) as usize;
1724 let header_len = if is_v3 {
1725 9
1726 } else if is_v2 {
1727 8
1728 } else {
1729 4
1730 };
1731 if tail.len() < header_len + rec_len {
1732 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1733 format!(
1734 "WAL truncated record: header+payload {} > available {}",
1735 header_len + rec_len,
1736 tail.len()
1737 ),
1738 )));
1739 }
1740 if is_v3 {
1741 let type_byte = tail[8];
1742 // v3 type 0x01 = auto_commit_sql (payload = SQL).
1743 // v3 type 0x02 = durability marker (no SQL to apply).
1744 // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
1745 // prefix between type and SQL — strip
1746 // the prefix so the caller still sees raw
1747 // SQL bytes.
1748 // Anything else is unknown.
1749 if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
1750 let payload = &tail[header_len..header_len + rec_len];
1751 return Ok((payload.to_vec(), header_len + rec_len));
1752 }
1753 if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL {
1754 let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
1755 if tail.len() < v4_total {
1756 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1757 format!(
1758 "WAL truncated v4 record: header+payload {v4_total} > available {}",
1759 tail.len()
1760 ),
1761 )));
1762 }
1763 let sql_start = header_len + WAL_V4_EXTRA_HEADER;
1764 let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
1765 return Ok((sql_bytes, v4_total));
1766 }
1767 // Caller treats empty payload as a skip-marker.
1768 return Ok((Vec::new(), header_len + rec_len));
1769 }
1770 let payload = &tail[header_len..header_len + rec_len];
1771 Ok((payload.to_vec(), header_len + rec_len))
1772}
1773
1774impl Drop for Database {
1775 fn drop(&mut self) {
1776 // v7.1 — best-effort final checkpoint when a persistent
1777 // Database leaves scope. Failures here go to stderr so
1778 // operators see them, but Drop can't propagate errors —
1779 // the WAL itself is already durable, so a checkpoint
1780 // miss only means the next boot replays a few more
1781 // records than strictly necessary.
1782 if self.persistence.is_some() {
1783 if let Err(e) = self.checkpoint() {
1784 eprintln!(
1785 "spg-embedded: final checkpoint on Drop failed: {e:?} \
1786 (WAL is intact; next open_path will replay)"
1787 );
1788 }
1789 }
1790 // v7.17.0 Phase 6.2 — release the cross-process lock on
1791 // clean shutdown. Failure is logged but never panics;
1792 // the operator can clear a stale lock via
1793 // `Database::force_unlock` if a crash kept the
1794 // directory around.
1795 if let Some(ctx) = &self.persistence
1796 && ctx.lock_path.exists()
1797 {
1798 if let Err(e) = std::fs::remove_dir(&ctx.lock_path) {
1799 eprintln!(
1800 "spg-embedded: lock release on Drop failed for {}: {e:?}",
1801 ctx.lock_path.display()
1802 );
1803 }
1804 }
1805 }
1806}
1807
1808impl Database {
1809 /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
1810 /// Use when a previous process crashed mid-session and
1811 /// left `<db_path>.lock` behind. Operators should confirm
1812 /// no other process is currently using the database before
1813 /// calling this — SPG cannot fingerprint stale-vs-live
1814 /// without a libc dep, which would violate spg-embedded's
1815 /// zero-deps charter.
1816 pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
1817 let lock_path = {
1818 let mut p = db_path.as_ref().to_path_buf();
1819 let name = p
1820 .file_name()
1821 .map(|n| {
1822 let mut s = n.to_os_string();
1823 s.push(".lock");
1824 s
1825 })
1826 .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1827 p.set_file_name(name);
1828 p
1829 };
1830 if !lock_path.exists() {
1831 return Ok(());
1832 }
1833 std::fs::remove_dir(&lock_path).map_err(io_err)
1834 }
1835}
1836
1837/// v7.1 — turn a `std::io::Error` into the workspace's
1838/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
1839/// the closest existing variant — io failures during boot or
1840/// during a WAL append surface as a storage-layer fault to
1841/// callers, which keeps the public error enum unchanged.
1842fn io_err(e: std::io::Error) -> EngineError {
1843 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
1844}
1845
1846/// v7.2.2 — `Database` is `Send`, so the recommended sharing
1847/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
1848///
1849/// ```no_run
1850/// use std::sync::{Arc, Mutex};
1851/// use spg_embedded::Database;
1852///
1853/// let db = Database::open_in_memory();
1854/// let shared = Arc::new(Mutex::new(db));
1855/// let shared_for_worker = Arc::clone(&shared);
1856/// std::thread::spawn(move || {
1857/// let mut guard = shared_for_worker.lock().unwrap();
1858/// guard.execute("INSERT INTO t VALUES (1)").unwrap();
1859/// });
1860/// ```
1861///
1862/// Internal `RwLock`-wrapped state — letting many threads
1863/// hold concurrent `&Database` for `SELECT` without contending
1864/// — is parked as STABILITY § "Out of v7.2"; multi-reader
1865/// embedded throughput needs a planner-side change to release
1866/// the engine read lock between scans, which is the v7.x
1867/// "Choice A" line of work already documented in v6.9.1's
1868/// carve-out.
1869#[allow(dead_code)]
1870fn _database_is_send() {
1871 fn assert_send<T: Send>() {}
1872 assert_send::<Database>();
1873}
1874
1875/// v6.10.3 — trait that maps a row's columns onto a user
1876/// struct's fields. v7.3.0 ships the [`spg_row!`] declarative
1877/// macro that generates `impl FromSpgRow for YourStruct` from
1878/// a struct definition (no proc-macro, no syn/quote/
1879/// proc-macro2 deps — the workspace's "0 external deps"
1880/// policy holds).
1881///
1882/// Implementors map a row's columns onto a user struct's
1883/// fields. Errors surface as `EngineError::Unsupported` so the
1884/// caller's error type stays uniform.
1885pub trait FromSpgRow: Sized {
1886 /// Decode one query result row into `Self`. Called once per
1887 /// row by [`Database::query_typed`]. The slice length equals
1888 /// the number of columns in the SELECT projection.
1889 fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
1890}
1891
/// v7.3.0 — declarative macro that generates `FromSpgRow` impl
/// for a user struct. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// What the expansion contains:
///
/// - the struct itself, with the caller's attributes and visibility
///   plus an added `#[derive(Debug, Clone)]` (so do not derive those
///   two again in the input);
/// - `impl FromSpgRow` that reads the row **positionally**: the first
///   column feeds the first field, and so on, each cell decoded via
///   `FromSpgValue` for the field's type.
///
/// A row with fewer columns than fields, or a cell that fails to
/// decode, yields `EngineError::Unsupported` naming the struct and
/// the field. Columns beyond the last field are ignored.
///
/// The comma after the last field is optional.
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // One shared cursor: each field consumes the next column.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                Ok(Self { $($field,)* })
            }
        }
    };
}
1968
1969/// v7.3.0 — per-column decoder used by `spg_row!`. Surface
1970/// covers every numeric / text / bytes / bool variant in
1971/// `Value`, plus `Option<T>` for nullable columns.
1972pub trait FromSpgValue: Sized {
1973 /// Decode one cell into `Self`. The returned `&'static str`
1974 /// is a short diagnostic for type mismatches (e.g. `"expected
1975 /// integer, got TEXT"`); callers wrap it into their own
1976 /// error type.
1977 fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
1978}
1979
1980macro_rules! impl_from_value_int {
1981 ($($t:ty),* $(,)?) => {
1982 $(
1983 impl FromSpgValue for $t {
1984 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1985 match v {
1986 Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
1987 Value::Int(n) => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
1988 Value::BigInt(n) => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
1989 Value::Null => Err("NULL in non-Option int column"),
1990 _ => Err("non-integer value in int column"),
1991 }
1992 }
1993 }
1994 )*
1995 };
1996}
1997impl_from_value_int!(i16, i32, i64);
1998
1999impl FromSpgValue for f32 {
2000 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2001 match v {
2002 Value::Float(f) => Ok(*f as f32),
2003 Value::Null => Err("NULL in non-Option float column"),
2004 _ => Err("non-float value in float column"),
2005 }
2006 }
2007}
2008
2009impl FromSpgValue for f64 {
2010 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2011 match v {
2012 Value::Float(f) => Ok(*f),
2013 Value::Null => Err("NULL in non-Option float column"),
2014 _ => Err("non-float value in float column"),
2015 }
2016 }
2017}
2018
2019impl FromSpgValue for bool {
2020 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2021 match v {
2022 Value::Bool(b) => Ok(*b),
2023 Value::Null => Err("NULL in non-Option bool column"),
2024 _ => Err("non-bool value in bool column"),
2025 }
2026 }
2027}
2028
2029impl FromSpgValue for String {
2030 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2031 match v {
2032 Value::Text(s) => Ok(s.clone()),
2033 Value::Null => Err("NULL in non-Option text column"),
2034 _ => Err("non-text value in String column"),
2035 }
2036 }
2037}
2038
2039impl FromSpgValue for Vec<f32> {
2040 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2041 match v {
2042 Value::Vector(xs) => Ok(xs.clone()),
2043 Value::Null => Err("NULL in non-Option vector column"),
2044 _ => Err("non-vector value in Vec<f32> column"),
2045 }
2046 }
2047}
2048
2049impl<T: FromSpgValue> FromSpgValue for Option<T> {
2050 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2051 match v {
2052 Value::Null => Ok(None),
2053 other => T::from_spg_value(other).map(Some),
2054 }
2055 }
2056}
2057
#[cfg(test)]
mod tests {
    use super::*;

    /// DDL → DML → filtered SELECT round trip on an in-memory database.
    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        for stmt in [
            "CREATE TABLE t (id INT NOT NULL, name TEXT)",
            "INSERT INTO t VALUES (1, 'alice')",
            "INSERT INTO t VALUES (2, 'bob')",
        ] {
            db.execute(stmt).unwrap();
        }

        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        let other = &rows[0][0];
        if !matches!(other, Value::Int(1)) {
            panic!("expected Int(1), got {other:?}");
        }
    }

    /// `query()` is for SELECT only; feeding it DML must be an error.
    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();

        let outcome = db.query("INSERT INTO t VALUES (1)");
        assert!(outcome.is_err(), "query() on INSERT must error");
    }

    /// A snapshot restored into a new database still serves the data.
    #[test]
    fn snapshot_roundtrip() {
        let bytes = {
            let mut db = Database::open_in_memory();
            db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
            db.execute("INSERT INTO t VALUES (42)").unwrap();
            db.snapshot()
        };

        let mut restored = Database::restore(&bytes).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        let other = &rows[0][0];
        if !matches!(other, Value::Int(42)) {
            panic!("expected Int(42), got {other:?}");
        }
    }

    /// `FromSpgRow` can be implemented by hand, without `spg_row!`.
    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                if let Some(Value::Int(n)) = row.first() {
                    Ok(Self { _id: *n })
                } else {
                    Err(EngineError::Unsupported("bad id".into()))
                }
            }
        }

        let row = vec![Value::Int(7)];
        let _u = User::from_spg_row(&row).unwrap();
    }
}