spg_embedded/lib.rs
1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//! println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//! crash-consistent WAL + periodic checkpoint snapshot. The
32//! on-disk format is byte-identical to what `spg-server`
33//! produces, so a database can move between modes without
34//! conversion.
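//! - **Durability**: on a database opened with `open_path`,
//!   every `execute()` the engine reports as mutating appends a
//!   WAL record and calls `fsync` before returning `Ok`. There
//!   is no group commit in embedded mode — every logged
//!   statement pays one fsync. [`Database::with_transaction`]
//!   wraps its body in `BEGIN` / `COMMIT` (`ROLLBACK` on error)
//!   but does not batch fsyncs: `BEGIN` / `COMMIT` / `ROLLBACK`
//!   are never WAL-logged themselves, and each mutating
//!   statement in the body is logged and fsynced individually.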
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//! Share across threads via `Arc<Mutex<Database>>`. The
43//! single-writer model is intentional — see
44//! [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//! moves cold rows to disk-resident segments while you keep
47//! serving requests. It runs in a dedicated thread; drop the
48//! returned [`FreezerHandle`] (or call `stop()`) for clean
49//! shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//! [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//! them with a wildcard arm so future v7.x releases can add
53//! variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//! Malformed SQL, type mismatches, missing tables — all
59//! return `Err(EngineError::…)`. If you observe a panic on
60//! a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//! violations (e.g., catalog snapshot magic mismatch, WAL
63//! record CRC sentinel corruption that survived the boot-
64//! time validation). These represent silent disk corruption
65//! and an unwind would leak inconsistent state, so the
66//! release profile uses `panic = abort` — your host process
67//! dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//! `--profile release-dbg` (keeps unwind tables) and use
70//! `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{Engine, EngineError, QueryResult};
80pub use spg_storage::Value;
81
82use std::collections::BTreeMap;
83use std::fs::{File, OpenOptions};
84use std::io::{Seek, SeekFrom, Write};
85use std::path::{Path, PathBuf};
86use std::sync::{Arc, Mutex};
87use std::sync::atomic::{AtomicBool, Ordering};
88use std::thread::{self, JoinHandle};
89use std::time::Duration;
90
91use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
92
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Private so callers can't mis-frame records. The v3 layout is the
// one the server uses, so a `spg-server` boot can read a database an
// embedded process wrote and vice versa.

/// Header bit 31: the record uses v2+ framing (8-byte header:
/// length word + a 4-byte word that v3 uses for the CRC32).
const WAL_V2_SENTINEL: u32 = 1 << 31;
/// Header bit 30, meaningful only together with `WAL_V2_SENTINEL`:
/// the record uses v3 framing (9-byte header ending in a type byte).
const WAL_V3_FLAG: u32 = 1 << 30;
/// v3 record type: one auto-committed SQL statement.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
100
/// v7.1 — auto-checkpoint threshold in bytes. Once the WAL grows
/// past this size, the next successful mutating `execute()` ends
/// with a `checkpoint()` so the WAL stays bounded.
///
/// Read from the `SPG_EMBEDDED_CHECKPOINT_BYTES` environment
/// variable. A value that is unset, not valid Unicode, not a `u64`,
/// or zero falls back to 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;
    match std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(bytes) if bytes > 0 => bytes,
            _ => FALLBACK_BYTES,
        },
        Err(_) => FALLBACK_BYTES,
    }
}
112
113/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
114///
115/// ```text
116/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
117/// [u32 LE crc32 over (type_byte || sql_bytes)]
118/// [u8 type = 0x01]
119/// [sql bytes]
120/// ```
121fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
122 let payload = sql.as_bytes();
123 let mut crc_buf = Vec::with_capacity(1 + payload.len());
124 crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
125 crc_buf.extend_from_slice(payload);
126 let crc = spg_crypto::crc32::crc32(&crc_buf);
127 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
128 let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
129 out.extend_from_slice(&header);
130 out.extend_from_slice(&crc.to_le_bytes());
131 out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
132 out.extend_from_slice(payload);
133 out
134}
135
136/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
137/// Returns the count of records successfully applied. A truncated
138/// trailing record (mid-write torn) is dropped silently — the
139/// same recovery story `spg-server`'s boot path uses.
140fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
141 let mut applied = 0usize;
142 let mut cur = 0usize;
143 while cur < wal_bytes.len() {
144 if wal_bytes.len() - cur < 4 {
145 // Trailing partial header — torn write, drop and stop.
146 break;
147 }
148 let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
149 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
150 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
151 let len_mask = if is_v3 {
152 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
153 } else {
154 !WAL_V2_SENTINEL
155 };
156 let rec_len = (raw_len & len_mask) as usize;
157 let header_len = if is_v3 {
158 9
159 } else if is_v2 {
160 8
161 } else {
162 4
163 };
164 if wal_bytes.len() - cur < header_len + rec_len {
165 // Torn record at the tail — drop, stop.
166 break;
167 }
168 if is_v3 {
169 let type_byte = wal_bytes[cur + 8];
170 match type_byte {
171 WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
172 0x02 => {
173 // durability_checkpoint marker — skip, no SQL.
174 cur += header_len + rec_len;
175 continue;
176 }
177 other => {
178 return Err(format!(
179 "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
180 ));
181 }
182 }
183 }
184 let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
185 let sql = std::str::from_utf8(sql_bytes).map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
186 engine
187 .execute(sql)
188 .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
189 applied += 1;
190 cur += header_len + rec_len;
191 }
192 Ok(applied)
193}
194
/// v7.1 — should `execute()` skip the WAL for `sql`?
///
/// Returns `true` (read-only, never logged) when the statement's
/// first keyword is one of SELECT / SHOW / EXPLAIN / BEGIN / COMMIT
/// / ROLLBACK, or one of the SPG-specific verbs CHECKPOINT / COMPACT
/// / WAIT that don't go through the auto-commit record path on the
/// server. The keyword ends at whitespace, `;` or `(`, and is
/// compared ASCII-case-insensitively.
///
/// Conservative: anything not explicitly known to be read-only
/// returns `false` ("write a WAL record"). `WITH` is deliberately
/// NOT on the list. A CTE can prefix a data-modifying statement
/// (`WITH src AS (...) INSERT INTO ...`), and calling that read-only
/// would silently drop the mutation from the WAL. Read-only CTE
/// queries still stay out of the WAL because `execute()` only logs
/// results that report `modified_catalog: true`.
fn sql_is_read_only(sql: &str) -> bool {
    const READ_ONLY_VERBS: [&str; 9] = [
        "select",
        "show",
        "explain",
        "begin",
        "commit",
        "rollback",
        "checkpoint",
        "compact",
        "wait",
    ];
    let head = sql
        .trim_start()
        .split(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .next()
        .unwrap_or("");
    // `eq_ignore_ascii_case` avoids allocating a lowercased copy.
    READ_ONLY_VERBS
        .iter()
        .any(|verb| head.eq_ignore_ascii_case(verb))
}
221
222/// Embedded SPG database handle. Owns an `Engine` + provides
223/// ergonomic wrappers around `execute` and `query`. Drops the
224/// engine on `Drop` — no WAL flush / fsync, because v6.10.3
225/// is in-memory only.
226#[derive(Debug)]
227pub struct Database {
228 engine: Engine,
229 /// v7.1 — persistence sidecar. When `Some(p)`, every
230 /// `execute(sql)` that mutates state appends a v3
231 /// `auto_commit_sql` WAL record + fsyncs before the call
232 /// returns; `Drop` writes a final catalog snapshot to
233 /// `<db_path>` so the next session boots from a clean
234 /// snapshot + an empty WAL. `None` = in-memory only (the
235 /// v6.10.3 shape).
236 persistence: Option<PersistenceCtx>,
237}
238
/// On-disk state for a database opened with `Database::open_path`.
#[derive(Debug)]
#[allow(dead_code)] // `wal_path` is read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Catalog snapshot file; replaced atomically on checkpoint.
    db_path: PathBuf,
    /// `<db_path>` with `.wal` appended to the file name.
    wal_path: PathBuf,
    /// WAL handle, opened for append + read.
    wal: File,
    /// Cached WAL file length so each `execute()` doesn't have
    /// to stat. Initialised from file metadata at open, advanced
    /// on append, reset to 0 by `checkpoint()` (which truncates).
    wal_len: u64,
    /// WAL size that triggers an automatic checkpoint (>= 1).
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — `<parent of db_path>/<db file stem>.spg/segments/`
    /// (e.g. `data/app.db` → `data/app.spg/segments/`). Cold-tier
    /// segments produced by `freeze_oldest_to_cold` are persisted
    /// here as `seg_<id>.spg` files. The manifest (at the path
    /// `spg_manifest::manifest_path` derives from `db_path`) records
    /// every active segment + its CRC32 so the next boot can
    /// verify + reload.
    cold_segments_dir: PathBuf,
    /// Segment id → file path for every cold segment this process
    /// knows is on disk; the source list for the manifest.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
}
258
259impl Database {
260 /// Open a fresh in-memory database. No WAL, no catalog
261 /// snapshot on disk — perfect for tests + short-lived
262 /// CLI tools.
263 #[must_use]
264 pub fn open_in_memory() -> Self {
265 Self {
266 engine: Engine::new(),
267 persistence: None,
268 }
269 }
270
271 /// v7.1 — Open or create a persistent database backed by
272 /// the file at `db_path`. The WAL lives at `db_path` +
273 /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
274 /// path:
275 ///
276 /// 1. If `db_path` exists, restore the catalog snapshot.
277 /// 2. If the WAL exists, replay every record into the
278 /// restored engine — the same recovery story
279 /// `spg-server` uses.
280 /// 3. Open the WAL in append+sync mode so subsequent
281 /// `execute()` writes durably commit (one fsync per
282 /// mutation).
283 ///
284 /// `Drop` writes a final catalog snapshot + truncates the
285 /// WAL — operators that need a sync barrier at a specific
286 /// point use `checkpoint()` explicitly.
287 pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
288 let db_path = db_path.as_ref().to_path_buf();
289 let wal_path = {
290 let mut p = db_path.clone();
291 let name = p
292 .file_name()
293 .map(|n| {
294 let mut s = n.to_os_string();
295 s.push(".wal");
296 s
297 })
298 .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
299 p.set_file_name(name);
300 p
301 };
302 if let Some(parent) = db_path.parent()
303 && !parent.as_os_str().is_empty()
304 {
305 std::fs::create_dir_all(parent).map_err(io_err)?;
306 }
307 let mut engine = if db_path.exists() {
308 let bytes = std::fs::read(&db_path).map_err(io_err)?;
309 let engine = Engine::restore_envelope(&bytes).map_err(|e| {
310 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
311 "restore from {}: {e}",
312 db_path.display()
313 )))
314 })?;
315 engine
316 } else {
317 Engine::new()
318 };
319 // v7.1.4 — manifest-driven cold-segment reload. The
320 // manifest sidecar pairs the catalog snapshot CRC with a
321 // list of `(segment_id, path, crc32)` triples; verify
322 // before loading so a torn or stale manifest doesn't
323 // surface phantom data.
324 let cold_segments_dir = {
325 let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
326 let stem = db_path
327 .file_stem()
328 .unwrap_or_else(|| std::ffi::OsStr::new("db"))
329 .to_string_lossy()
330 .into_owned();
331 parent.join(format!("{stem}.spg")).join("segments")
332 };
333 let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
334 let manifest_pth = spg_manifest_path(&db_path);
335 if manifest_pth.exists() && db_path.exists() {
336 let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
337 if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
338 let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
339 let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
340 if snap_crc == m.catalog_crc32 {
341 for entry in &m.cold_segments {
342 if let Ok(seg_bytes) = std::fs::read(&entry.path) {
343 let computed = spg_crypto::crc32::crc32(&seg_bytes);
344 if computed != entry.crc32 {
345 eprintln!(
346 "spg-embedded: manifest skip segment {}: CRC mismatch",
347 entry.segment_id
348 );
349 continue;
350 }
351 if engine
352 .catalog()
353 .cold_segment(entry.segment_id)
354 .is_some()
355 {
356 // Already loaded via Catalog::clone path (shouldn't happen
357 // since Engine::new + restore_envelope don't populate cold).
358 continue;
359 }
360 let mut new_cat = engine.catalog().clone();
361 if let Err(e) = new_cat
362 .load_segment_bytes_at(entry.segment_id, seg_bytes)
363 {
364 eprintln!(
365 "spg-embedded: manifest load segment {} failed: {e}",
366 entry.segment_id
367 );
368 continue;
369 }
370 engine.replace_catalog(new_cat);
371 cold_segment_paths
372 .insert(entry.segment_id, entry.path.clone());
373 } else {
374 eprintln!(
375 "spg-embedded: manifest skip segment {}: file unreadable",
376 entry.segment_id
377 );
378 }
379 }
380 }
381 }
382 }
383 if wal_path.exists() {
384 let wal_bytes = std::fs::read(&wal_path).map_err(io_err)?;
385 if !wal_bytes.is_empty() {
386 replay_wal_into_engine(&wal_bytes, &mut engine)
387 .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
388 }
389 }
390 let wal = OpenOptions::new()
391 .create(true)
392 .append(true)
393 .read(true)
394 .open(&wal_path)
395 .map_err(io_err)?;
396 let wal_len = wal.metadata().map_err(io_err)?.len();
397 Ok(Self {
398 engine,
399 persistence: Some(PersistenceCtx {
400 db_path,
401 wal_path,
402 wal,
403 wal_len,
404 checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
405 cold_segments_dir,
406 cold_segment_paths,
407 }),
408 })
409 }
410
411 /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
412 /// hot tier into a brand-new cold-tier segment + persist
413 /// it to disk. Same semantics as `spg-server`'s freezer
414 /// thread; embedded just runs the freeze synchronously on
415 /// the caller's thread. Persistence + manifest update
416 /// happen as part of the next `checkpoint()` (or on Drop).
417 pub fn freeze_oldest_to_cold(
418 &mut self,
419 table_name: &str,
420 index_name: &str,
421 max_rows: usize,
422 ) -> Result<spg_storage::FreezeReport, EngineError> {
423 let report = self
424 .engine
425 .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
426 if let Some(p) = &mut self.persistence {
427 std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
428 let final_path = p
429 .cold_segments_dir
430 .join(format!("seg_{}.spg", report.segment_id));
431 let tmp_path = p
432 .cold_segments_dir
433 .join(format!("seg_{}.spg.tmp", report.segment_id));
434 std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
435 std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
436 p.cold_segment_paths.insert(report.segment_id, final_path);
437 }
438 Ok(report)
439 }
440
441 /// v7.1 — override the auto-checkpoint WAL-size ceiling for
442 /// this `Database` instance. Default is
443 /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
444 /// setter wins. No-op when the database is in-memory.
445 pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
446 if let Some(p) = &mut self.persistence {
447 p.checkpoint_threshold_bytes = bytes.max(1);
448 }
449 }
450
451 /// v7.1 — flush a fresh catalog snapshot to `db_path` and
452 /// truncate the WAL. Idempotent; cheap when nothing has
453 /// happened since the last checkpoint. No-op when the
454 /// database is in-memory (no `db_path` configured).
455 ///
456 /// Called automatically when:
457 /// - the WAL grows past
458 /// `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
459 /// end of an `execute()`, and
460 /// - `Drop` runs (best-effort; checkpoint failure on drop is
461 /// logged to stderr).
462 pub fn checkpoint(&mut self) -> Result<(), EngineError> {
463 let snapshot = self.engine.snapshot();
464 let Some(p) = &mut self.persistence else {
465 return Ok(());
466 };
467 // Snapshot first (atomic via tmp+rename), then WAL
468 // truncate. Same order as `spg-server`'s CHECKPOINT —
469 // a crash between the two leaves the WAL holding
470 // already-snapshotted ops, which replay cleanly on the
471 // next boot (idempotent for SPG's standard DDL/DML
472 // mutations).
473 let tmp = {
474 let mut t = p.db_path.clone();
475 let mut name = t
476 .file_name()
477 .map(std::ffi::OsStr::to_os_string)
478 .unwrap_or_default();
479 name.push(".tmp");
480 t.set_file_name(name);
481 t
482 };
483 std::fs::write(&tmp, &snapshot).map_err(io_err)?;
484 std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
485 // v7.1.4 — refresh the manifest so the next boot can
486 // reload cold segments alongside the snapshot. Bytes
487 // come from the freshly-written snapshot file (= the
488 // canonical CRC source).
489 if !p.cold_segment_paths.is_empty() {
490 let snap_crc = spg_crypto::crc32::crc32(&snapshot);
491 let entries: Vec<ColdSegmentEntry> = p
492 .cold_segment_paths
493 .iter()
494 .filter_map(|(&segment_id, path)| {
495 let bytes = std::fs::read(path).ok()?;
496 Some(ColdSegmentEntry {
497 segment_id,
498 path: path.clone(),
499 crc32: spg_crypto::crc32::crc32(&bytes),
500 })
501 })
502 .collect();
503 let manifest = CatalogManifest {
504 catalog_crc32: snap_crc,
505 cold_segments: entries,
506 wal_baseline_offset: 0,
507 };
508 let m_bytes = manifest.serialize();
509 let m_path = spg_manifest_path(&p.db_path);
510 if let Some(dir) = m_path.parent() {
511 std::fs::create_dir_all(dir).map_err(io_err)?;
512 }
513 let m_tmp = {
514 let mut t = m_path.clone();
515 let mut name = t
516 .file_name()
517 .map(std::ffi::OsStr::to_os_string)
518 .unwrap_or_default();
519 name.push(".tmp");
520 t.set_file_name(name);
521 t
522 };
523 std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
524 std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
525 }
526 p.wal.set_len(0).map_err(io_err)?;
527 p.wal.seek(SeekFrom::Start(0)).map_err(io_err)?;
528 p.wal.sync_data().map_err(io_err)?;
529 p.wal_len = 0;
530 Ok(())
531 }
532
533 /// Restore a database from a previously-captured catalog
534 /// snapshot. Pairs with `Database::snapshot()` for
535 /// round-tripping in-memory state without going through
536 /// the `spg-server` WAL.
537 pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
538 let engine = Engine::restore_envelope(snapshot)
539 .map_err(|e| EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}"))))?;
540 Ok(Self {
541 engine,
542 persistence: None,
543 })
544 }
545
546 /// Take a catalog snapshot suitable for `Database::restore`.
547 /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
548 /// + version + payload); round-trips through every released
549 /// SPG version per the STABILITY contract.
550 #[must_use]
551 pub fn snapshot(&self) -> Vec<u8> {
552 self.engine.snapshot()
553 }
554
555 /// Execute a SQL statement and return the engine's
556 /// `QueryResult` verbatim. Pass-through for callers that
557 /// want to keep PG-flavoured column/row metadata.
558 ///
559 /// v7.1 — when the database was opened via `open_path`,
560 /// successful mutations are appended to the WAL + fsynced
561 /// before the call returns. A subsequent process crash will
562 /// recover state up to the last successful return from
563 /// `execute()`. Read-only statements (SELECT / SHOW /
564 /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
565 /// etc.) skip the WAL entirely.
566 pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
567 let result = self.engine.execute(sql)?;
568 if self.persistence.is_some()
569 && !sql_is_read_only(sql)
570 && matches!(&result, QueryResult::CommandOk { modified_catalog: true, .. })
571 {
572 // Append + sync the v3 record AFTER the in-memory
573 // exec succeeds, so a WAL record never describes a
574 // mutation that didn't actually apply. The crash
575 // window between in-memory commit and WAL fsync is
576 // bounded by one record — replay re-applies the
577 // statement idempotently on next boot if we crashed
578 // between (and SPG's DDL/DML are crash-idempotent at
579 // the granularities the wire protocol exposes).
580 let record = encode_v3_auto_commit(sql);
581 let p = self.persistence.as_mut().expect("checked above");
582 p.wal.write_all(&record).map_err(io_err)?;
583 p.wal.sync_data().map_err(io_err)?;
584 p.wal_len = p.wal_len.saturating_add(record.len() as u64);
585 if p.wal_len >= p.checkpoint_threshold_bytes {
586 self.checkpoint()?;
587 }
588 }
589 Ok(result)
590 }
591
592 /// v7.3.0 — typed-row variant of [`Database::query`]. Each
593 /// row decodes into a `T: FromSpgRow` so callers don't
594 /// pattern-match on `Value` themselves. Use [`spg_row!`] to
595 /// generate the impl, or write it by hand.
596 pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
597 let rows = self.query(sql)?;
598 rows.into_iter()
599 .map(|r| T::from_spg_row(&r))
600 .collect()
601 }
602
603 /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
604 /// strips the column-schema metadata for read-side
605 /// ergonomics. Errors on non-Rows results (DML / DDL
606 /// statements should go through `execute` instead).
607 pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
608 match self.engine.execute(sql)? {
609 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
610 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
611 "query() expects a SELECT — use execute() for DML/DDL".into(),
612 )),
613 // v7.5.0 — QueryResult is #[non_exhaustive]; any future
614 // variant is not a SELECT row stream, treat as Unsupported.
615 _ => Err(EngineError::Unsupported(
616 "query() expects a SELECT — use execute() for DML/DDL".into(),
617 )),
618 }
619 }
620
621 /// Borrow the underlying engine. Escape hatch for callers
622 /// that need access to `spg-engine` APIs not yet surfaced
623 /// here (transactions, EXPLAIN ANALYZE, etc.).
624 #[must_use]
625 pub const fn engine(&self) -> &Engine {
626 &self.engine
627 }
628
629 /// Mutable borrow of the underlying engine. Same intent as
630 /// `engine()` but for write-side APIs (e.g. inserting
631 /// directly through `Catalog::insert` for high-throughput
632 /// bulk loads that bypass SQL parsing).
633 pub const fn engine_mut(&mut self) -> &mut Engine {
634 &mut self.engine
635 }
636
637 /// v7.2.0 — run `body` inside an implicit `BEGIN` /
638 /// `COMMIT` pair. The body receives `&mut Database` so it
639 /// can `execute()` / `query()` like any other code path;
640 /// the only difference is that every write in the body
641 /// lands inside one transaction, and a returned `Err` from
642 /// the body triggers `ROLLBACK` before the error propagates.
643 ///
644 /// Nested calls are not supported — SPG's transaction
645 /// model is single-writer with explicit `BEGIN` /
646 /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
647 /// would hit `EngineError::Unsupported("nested
648 /// transaction")` at the inner `BEGIN`.
649 pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
650 where
651 F: FnOnce(&mut Self) -> Result<R, EngineError>,
652 {
653 self.execute("BEGIN")?;
654 match body(self) {
655 Ok(value) => {
656 self.execute("COMMIT")?;
657 Ok(value)
658 }
659 Err(e) => {
660 // Best-effort rollback. If ROLLBACK itself
661 // fails (rare — the engine reports it via
662 // `Unsupported` only when there's no active
663 // TX, which can't happen here) we surface the
664 // original body error, not the rollback error.
665 let _ = self.execute("ROLLBACK");
666 Err(e)
667 }
668 }
669 }
670}
671
672impl Default for Database {
673 fn default() -> Self {
674 Self::open_in_memory()
675 }
676}
677
/// v7.7.5 — point-in-time observability counters returned by
/// [`Database::metrics`]. A `Copy` bundle of plain integers and a
/// flag, so it is cheap to build, copy and serialise.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct EmbeddedMetrics {
    /// Live rows summed over every user table, hot tier only
    /// (cold-tier rows live in segment files and are not counted).
    pub hot_rows: u64,
    /// Sum of `Table::hot_bytes` over every user table — the figure
    /// the freezer compares with its `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Cold-tier segments registered in the catalog. Includes
    /// tombstoned slots, i.e. segments retired by compaction whose
    /// file may still exist on disk.
    pub cold_segments: u64,
    /// Number of user tables (excludes any future engine-managed
    /// internal tables).
    pub tables: u64,
    /// Cached WAL size in bytes, as of the last append,
    /// `checkpoint()` or open. Zero for an in-memory database.
    pub wal_bytes: u64,
    /// `true` when the database came from `open_path`, i.e. WAL +
    /// checkpoint persistence is active.
    pub persistent: bool,
}
705
/// v7.2.1 — handle returned by `spawn_background_freezer`.
/// Dropping it raises the worker's shutdown flag and joins the
/// thread, so a `Database` (or its shared `Arc<Mutex<Database>>`)
/// can safely drop after the handle does.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    shutdown: Arc<AtomicBool>,
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — ask the worker to stop, then wait for it to exit.
    ///
    /// Idempotent: the join handle is taken on the first call, so
    /// later calls (including the one `Drop` makes) only re-raise
    /// the flag. A panic inside the worker is discarded, not
    /// propagated.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let worker = self.join.take();
        if let Some(handle) = worker {
            // Discard any panic payload: shutdown (which also runs
            // from Drop) must not re-raise the worker's panic.
            let _ = handle.join();
        }
    }
}

impl Drop for FreezerHandle {
    fn drop(&mut self) {
        FreezerHandle::stop(self);
    }
}
733
/// v7.2.1 — tuning knobs for `Database::spawn_background_freezer`.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// How often the worker re-checks the catalog's
    /// `hot_tier_bytes` and decides whether to freeze.
    pub tick: Duration,
    /// Hot-tier byte budget. Once exceeded, the next tick freezes
    /// the largest table's oldest `batch_rows` rows into a new
    /// cold segment.
    pub hot_tier_bytes: u64,
    /// Upper bound on rows demoted per freeze.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. When the catalog holds more
    /// than this many cold segments across all tables, the freezer
    /// runs a compaction pass after its next freeze. `usize::MAX`
    /// disables auto-compaction; the default `64` matches the
    /// `spg-server` operating point for
    /// SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size, in bytes, for compaction
    /// merges. Default 64 MiB, mirroring `spg-server`. Segments
    /// smaller than this are merge candidates; segments at or
    /// above it are left alone.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    /// Mirrors the `spg-server` freezer defaults
    /// (SPG_HOT_TIER_BYTES = 4 GiB, 1000-row batches, 1 s tick) so
    /// embedded behaviour is predictable for operators who know
    /// the server.
    fn default() -> Self {
        const KIB: u64 = 1 << 10;
        const MIB: u64 = KIB << 10;
        const GIB: u64 = MIB << 10;
        FreezerOptions {
            tick: Duration::from_secs(1),
            hot_tier_bytes: 4 * GIB,
            batch_rows: 1000,
            compact_when_segments_exceed: 64,
            compact_target_bytes: 64 * MIB,
        }
    }
}
775
776impl Database {
777 /// v7.7.4 — observe the catalog's cold-segment count.
778 /// Useful for tests + dashboards that want to verify
779 /// auto-compaction is firing.
780 #[must_use]
781 pub fn cold_segment_count(&self) -> usize {
782 self.engine.catalog().cold_segment_count()
783 }
784
785 /// v7.7.5 — observability snapshot. Returns a point-in-time
786 /// view of the engine + persistence counters. Cheap (no
787 /// locks beyond the existing `&self` borrow), so safe to
788 /// call from a hot metrics-scrape path.
789 ///
790 /// Fields mirror the operational dashboard
791 /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
792 /// minus the network counters that don't apply to embedded.
793 #[must_use]
794 pub fn metrics(&self) -> EmbeddedMetrics {
795 let cat = self.engine.catalog();
796 let mut hot_rows: u64 = 0;
797 let mut hot_bytes: u64 = 0;
798 for name in cat.table_names() {
799 if let Some(t) = cat.get(&name) {
800 hot_rows = hot_rows.saturating_add(t.row_count() as u64);
801 hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
802 }
803 }
804 let (wal_bytes, persistent) = match &self.persistence {
805 Some(p) => (p.wal_len, true),
806 None => (0, false),
807 };
808 EmbeddedMetrics {
809 hot_rows,
810 hot_bytes,
811 cold_segments: cat.cold_segment_count() as u64,
812 tables: cat.table_count() as u64,
813 wal_bytes,
814 persistent,
815 }
816 }
817
818 /// v7.2.1 — spawn a background thread that periodically
819 /// runs `freeze_oldest_to_cold` when the catalog-wide hot
820 /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
821 /// pattern matches the v7.2 sharing story: callers wrap
822 /// their `Database` in `Arc::new(Mutex::new(db))` once,
823 /// then clone the Arc for the worker + for foreground
824 /// access. Return value is a handle whose `Drop` joins the
825 /// worker.
826 ///
827 /// Picks the freeze target the same way `spg-server`'s
828 /// freezer does: largest-`hot_bytes` user table with at
829 /// least one BTree integer-PK index. Tables without a
830 /// freezable index are skipped silently.
831 pub fn spawn_background_freezer(
832 db: Arc<Mutex<Database>>,
833 opts: FreezerOptions,
834 ) -> FreezerHandle {
835 let shutdown = Arc::new(AtomicBool::new(false));
836 let shutdown_for_thread = Arc::clone(&shutdown);
837 let join = thread::Builder::new()
838 .name("spg-embedded-freezer".into())
839 .spawn(move || {
840 background_freezer_loop(db, opts, shutdown_for_thread);
841 })
842 .expect("spawn background freezer thread");
843 FreezerHandle {
844 shutdown,
845 join: Some(join),
846 }
847 }
848}
849
850/// v7.2.1 — the freezer's main loop, factored out so the
851/// `Database::spawn_background_freezer` path stays readable.
852fn background_freezer_loop(
853 db: Arc<Mutex<Database>>,
854 opts: FreezerOptions,
855 shutdown: Arc<AtomicBool>,
856) {
857 // Sleep in short slices so a shutdown request resolves
858 // quickly (vs sleeping the full tick).
859 let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
860 let mut last_tick = std::time::Instant::now();
861 loop {
862 if shutdown.load(Ordering::Acquire) {
863 return;
864 }
865 thread::sleep(slice);
866 if last_tick.elapsed() < opts.tick {
867 continue;
868 }
869 last_tick = std::time::Instant::now();
870 let Ok(mut guard) = db.lock() else {
871 return;
872 };
873 if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
874 continue;
875 }
876 let Some((table, index)) = pick_freeze_target(&guard) else {
877 continue;
878 };
879 let row_count = guard
880 .engine
881 .catalog()
882 .get(&table)
883 .map_or(0, spg_storage::Table::row_count);
884 let to_freeze = opts.batch_rows.min(row_count);
885 if to_freeze == 0 {
886 continue;
887 }
888 if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
889 eprintln!(
890 "spg-embedded: background freeze on {table}.{index} failed: {e:?}"
891 );
892 continue;
893 }
894 // v7.7.4 — auto-compact. If the catalog now carries
895 // more cold segments than the configured threshold,
896 // run a single compaction pass. Failures are reported
897 // but don't kill the loop; the next tick will retry.
898 let count = guard.engine.catalog().cold_segment_count();
899 if count > opts.compact_when_segments_exceed {
900 if let Err(e) = guard
901 .engine
902 .compact_cold_segments_with_target(opts.compact_target_bytes)
903 {
904 eprintln!(
905 "spg-embedded: background compact failed (segments={count}, \
906 threshold={}): {e:?}",
907 opts.compact_when_segments_exceed,
908 );
909 }
910 }
911 }
912}
913
914/// v7.2.1 — pick the highest-`hot_bytes` user table with a
915/// BTree integer-PK index. Returns `(table, index_name)` so the
916/// caller can dispatch through `freeze_oldest_to_cold`.
917fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
918 let cat = db.engine.catalog();
919 let mut best: Option<(String, String, u64)> = None;
920 for name in cat.table_names() {
921 let Some(t) = cat.get(&name) else { continue };
922 if t.row_count() == 0 {
923 continue;
924 }
925 let cols = &t.schema().columns;
926 let Some(idx) = t.indices().iter().find(|i| {
927 matches!(i.kind, spg_storage::IndexKind::BTree(_))
928 && i.column_position < cols.len()
929 && matches!(
930 cols[i.column_position].ty,
931 spg_storage::DataType::SmallInt
932 | spg_storage::DataType::Int
933 | spg_storage::DataType::BigInt
934 )
935 }) else {
936 continue;
937 };
938 let hot = t.hot_bytes();
939 match best {
940 None => best = Some((name, idx.name.clone(), hot)),
941 Some((_, _, best_hot)) if hot > best_hot => {
942 best = Some((name, idx.name.clone(), hot));
943 }
944 _ => {}
945 }
946 }
947 best.map(|(t, i, _)| (t, i))
948}
949
950/// v7.7.6 — replay the first `to_seq` records of the WAL at
951/// `wal_path` into a fresh engine and write the resulting
952/// catalog snapshot to `out_db_path`. Same semantics as
953/// `spg revert --wal … --to-seq N --out …` from the CLI:
954///
955/// - `to_seq == 0` → snapshot is the empty catalog
956/// - WAL records beyond `to_seq` are not applied
957/// - durability-checkpoint markers (v3 type 0x02) are
958/// consumed without counting against the budget
959///
960/// Returns the number of statements actually applied
961/// (`≤ to_seq`). The output snapshot is byte-identical to
962/// what `Database::open_path(out_db_path)` would consume on
963/// a subsequent open.
964///
965/// This is the "rewind" operator for an embedded database
966/// that has been corrupted by a poison statement or a
967/// half-applied migration. Pair with `cold_segment_paths`
968/// preservation if your cold-tier files are still on disk.
969///
970/// # Errors
971///
972/// - `wal_path` unreadable or truncated mid-record
973/// - WAL record decodes to invalid UTF-8 SQL
974/// - WAL record's SQL is rejected by the engine
975/// - `out_db_path` unwritable
976pub fn revert_wal_to_seq(
977 wal_path: impl AsRef<Path>,
978 to_seq: u64,
979 out_db_path: impl AsRef<Path>,
980) -> Result<u64, EngineError> {
981 let wal_bytes = std::fs::read(wal_path.as_ref()).map_err(io_err)?;
982 let mut engine = Engine::new();
983 let mut applied = 0u64;
984 let mut cur = 0usize;
985 while cur < wal_bytes.len() && applied < to_seq {
986 let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
987 cur += total;
988 if sql_bytes.is_empty() {
989 continue;
990 }
991 let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
992 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
993 "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
994 )))
995 })?;
996 engine.execute(sql)?;
997 applied += 1;
998 }
999 let snapshot = engine.snapshot();
1000 std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
1001 Ok(applied)
1002}
1003
1004/// v7.7.6 — decode one WAL record from a byte tail. Returns
1005/// `(sql_bytes, header_plus_payload_len)`. Handles the three
1006/// on-disk formats (v1 / v2 / v3) the same way the CLI
1007/// `decode_one_record` and the engine's `replay_wal_bytes`
1008/// do. CRCs are not re-validated; the caller's intent is
1009/// "apply", not "validate".
1010fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
1011 if tail.len() < 4 {
1012 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1013 format!("WAL truncated record: {} < 4 header bytes", tail.len()),
1014 )));
1015 }
1016 let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
1017 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
1018 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
1019 let len_mask = if is_v3 {
1020 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
1021 } else {
1022 !WAL_V2_SENTINEL
1023 };
1024 let rec_len = (raw_len & len_mask) as usize;
1025 let header_len = if is_v3 {
1026 9
1027 } else if is_v2 {
1028 8
1029 } else {
1030 4
1031 };
1032 if tail.len() < header_len + rec_len {
1033 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
1034 format!(
1035 "WAL truncated record: header+payload {} > available {}",
1036 header_len + rec_len,
1037 tail.len()
1038 ),
1039 )));
1040 }
1041 let payload = &tail[header_len..header_len + rec_len];
1042 let sql_bytes = if is_v3 {
1043 let type_byte = tail[8];
1044 // v3 type 0x01 = auto_commit_sql (payload = SQL).
1045 // v3 type 0x02 = durability marker (payload = u64
1046 // offset, no SQL to apply). Anything else is unknown.
1047 if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
1048 payload.to_vec()
1049 } else {
1050 // Caller treats empty payload as a skip-marker.
1051 Vec::new()
1052 }
1053 } else {
1054 payload.to_vec()
1055 };
1056 Ok((sql_bytes, header_len + rec_len))
1057}
1058
1059impl Drop for Database {
1060 fn drop(&mut self) {
1061 // v7.1 — best-effort final checkpoint when a persistent
1062 // Database leaves scope. Failures here go to stderr so
1063 // operators see them, but Drop can't propagate errors —
1064 // the WAL itself is already durable, so a checkpoint
1065 // miss only means the next boot replays a few more
1066 // records than strictly necessary.
1067 if self.persistence.is_some() {
1068 if let Err(e) = self.checkpoint() {
1069 eprintln!(
1070 "spg-embedded: final checkpoint on Drop failed: {e:?} \
1071 (WAL is intact; next open_path will replay)"
1072 );
1073 }
1074 }
1075 }
1076}
1077
1078/// v7.1 — turn a `std::io::Error` into the workspace's
1079/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
1080/// the closest existing variant — io failures during boot or
1081/// during a WAL append surface as a storage-layer fault to
1082/// callers, which keeps the public error enum unchanged.
1083fn io_err(e: std::io::Error) -> EngineError {
1084 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
1085}
1086
1087/// v7.2.2 — `Database` is `Send`, so the recommended sharing
1088/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
1089///
1090/// ```no_run
1091/// use std::sync::{Arc, Mutex};
1092/// use spg_embedded::Database;
1093///
1094/// let db = Database::open_in_memory();
1095/// let shared = Arc::new(Mutex::new(db));
1096/// let shared_for_worker = Arc::clone(&shared);
1097/// std::thread::spawn(move || {
1098/// let mut guard = shared_for_worker.lock().unwrap();
1099/// guard.execute("INSERT INTO t VALUES (1)").unwrap();
1100/// });
1101/// ```
1102///
1103/// Internal `RwLock`-wrapped state — letting many threads
1104/// hold concurrent `&Database` for `SELECT` without contending
1105/// — is parked as STABILITY § "Out of v7.2"; multi-reader
1106/// embedded throughput needs a planner-side change to release
1107/// the engine read lock between scans, which is the v7.x
1108/// "Choice A" line of work already documented in v6.9.1's
1109/// carve-out.
1110#[allow(dead_code)]
1111fn _database_is_send() {
1112 fn assert_send<T: Send>() {}
1113 assert_send::<Database>();
1114}
1115
/// v6.10.3 — decode one query-result row into a user type.
///
/// Implementors map a row's columns onto a user struct's fields.
/// Since v7.3.0, the [`spg_row!`] declarative macro generates
/// `impl FromSpgRow for YourStruct` from a struct definition. It uses
/// no proc-macro and no syn/quote/proc-macro2 deps, so the workspace's
/// "0 external deps" policy holds.
///
/// Errors should surface as `EngineError::Unsupported` so the
/// caller's error type stays uniform; that is what `spg_row!`
/// generates.
pub trait FromSpgRow: Sized {
    /// Decode one query result row into `Self`. Called once per
    /// row by [`Database::query_typed`]. The slice length equals
    /// the number of columns in the SELECT projection.
    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
}
1132
/// v7.3.0 — declarative macro that generates a `FromSpgRow` impl
/// for a user struct. It avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds. The trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// Fields are decoded positionally: the first column feeds the first
/// field, and so on. A missing column, or a cell the field type cannot
/// decode, yields `EngineError::Unsupported` naming the struct and the
/// field. Extra trailing columns are ignored. The comma after the last
/// field is optional. The generated struct always derives `Debug` and
/// `Clone`.
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                // Fully qualified, like every other path in this macro,
                // so a call-site `Ok` cannot shadow it.
                ::core::result::Result::Ok(Self { $($field,)* })
            }
        }
    };
}
1209
/// v7.3.0 — per-column decoder used by `spg_row!`.
///
/// Implemented in this module for:
/// - `i16` / `i32` / `i64`, from `SmallInt` / `Int` / `BigInt`, with a
///   range check into the target width;
/// - `f32` / `f64`, from `Float`;
/// - `bool`, from `Bool`;
/// - `String`, from `Text`;
/// - `Vec<f32>`, from `Vector`;
/// - `Option<T>` for nullable columns, where `NULL` maps to `None`.
pub trait FromSpgValue: Sized {
    /// Decode one cell into `Self`. The returned `&'static str`
    /// is a short diagnostic for type mismatches (e.g.
    /// `"non-integer value in int column"`); callers wrap it into
    /// their own error type.
    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
}
1220
1221macro_rules! impl_from_value_int {
1222 ($($t:ty),* $(,)?) => {
1223 $(
1224 impl FromSpgValue for $t {
1225 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1226 match v {
1227 Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
1228 Value::Int(n) => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
1229 Value::BigInt(n) => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
1230 Value::Null => Err("NULL in non-Option int column"),
1231 _ => Err("non-integer value in int column"),
1232 }
1233 }
1234 }
1235 )*
1236 };
1237}
1238impl_from_value_int!(i16, i32, i64);
1239
1240impl FromSpgValue for f32 {
1241 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1242 match v {
1243 Value::Float(f) => Ok(*f as f32),
1244 Value::Null => Err("NULL in non-Option float column"),
1245 _ => Err("non-float value in float column"),
1246 }
1247 }
1248}
1249
1250impl FromSpgValue for f64 {
1251 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1252 match v {
1253 Value::Float(f) => Ok(*f),
1254 Value::Null => Err("NULL in non-Option float column"),
1255 _ => Err("non-float value in float column"),
1256 }
1257 }
1258}
1259
1260impl FromSpgValue for bool {
1261 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1262 match v {
1263 Value::Bool(b) => Ok(*b),
1264 Value::Null => Err("NULL in non-Option bool column"),
1265 _ => Err("non-bool value in bool column"),
1266 }
1267 }
1268}
1269
1270impl FromSpgValue for String {
1271 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1272 match v {
1273 Value::Text(s) => Ok(s.clone()),
1274 Value::Null => Err("NULL in non-Option text column"),
1275 _ => Err("non-text value in String column"),
1276 }
1277 }
1278}
1279
1280impl FromSpgValue for Vec<f32> {
1281 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1282 match v {
1283 Value::Vector(xs) => Ok(xs.clone()),
1284 Value::Null => Err("NULL in non-Option vector column"),
1285 _ => Err("non-vector value in Vec<f32> column"),
1286 }
1287 }
1288}
1289
1290impl<T: FromSpgValue> FromSpgValue for Option<T> {
1291 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
1292 match v {
1293 Value::Null => Ok(None),
1294 other => T::from_spg_value(other).map(Some),
1295 }
1296 }
1297}
1298
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL, name TEXT)").unwrap();
        db.execute("INSERT INTO t VALUES (1, 'alice')").unwrap();
        db.execute("INSERT INTO t VALUES (2, 'bob')").unwrap();
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(1) => {}
            other => panic!("expected Int(1), got {other:?}"),
        }
    }

    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        let r = db.query("INSERT INTO t VALUES (1)");
        assert!(r.is_err(), "query() on INSERT must error");
    }

    #[test]
    fn snapshot_roundtrip() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        db.execute("INSERT INTO t VALUES (42)").unwrap();
        let bytes = db.snapshot();
        let mut restored = Database::restore(&bytes).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        match &rows[0][0] {
            Value::Int(42) => {}
            other => panic!("expected Int(42), got {other:?}"),
        }
    }

    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                match row.first() {
                    Some(Value::Int(n)) => Ok(Self { _id: *n }),
                    _ => Err(EngineError::Unsupported("bad id".into())),
                }
            }
        }
        let row = vec![Value::Int(7)];
        let _u = User::from_spg_row(&row).unwrap();
    }

    /// Integer decoders widen freely but range-check narrowing, and
    /// reject NULL / non-integer cells.
    #[test]
    fn int_decoding_range_checks() {
        assert_eq!(i64::from_spg_value(&Value::Int(-9)), Ok(-9));
        assert_eq!(i32::from_spg_value(&Value::Int(42)), Ok(42));
        assert!(i16::from_spg_value(&Value::Int(40_000)).is_err());
        assert!(i32::from_spg_value(&Value::Null).is_err());
        assert!(i32::from_spg_value(&Value::Text("7".into())).is_err());
        assert_eq!(f64::from_spg_value(&Value::Float(1.5)), Ok(1.5));
    }

    /// `Option<T>` maps NULL to `None` and otherwise defers to `T`.
    #[test]
    fn option_decoding_maps_null_to_none() {
        assert_eq!(Option::<i32>::from_spg_value(&Value::Null), Ok(None));
        assert_eq!(Option::<i32>::from_spg_value(&Value::Int(3)), Ok(Some(3)));
        assert!(Option::<bool>::from_spg_value(&Value::Int(3)).is_err());
    }

    /// `spg_row!` decodes columns positionally and errors on a short row.
    #[test]
    fn spg_row_macro_decodes_rows() {
        spg_row! {
            struct Person {
                id: i32,
                name: Option<String>,
            }
        }
        let row = vec![Value::Int(7), Value::Text("alice".into())];
        let p = Person::from_spg_row(&row).unwrap();
        assert_eq!(p.id, 7);
        assert_eq!(p.name.as_deref(), Some("alice"));

        let p = Person::from_spg_row(&[Value::Int(8), Value::Null]).unwrap();
        assert_eq!(p.name, None);

        assert!(
            Person::from_spg_row(&[Value::Int(9)]).is_err(),
            "missing column must error"
        );
    }

    /// v1 WAL records (no flag bits) decode to their payload. Short
    /// headers and short payloads are reported as errors.
    #[test]
    fn decode_wal_record_v1_and_truncation() {
        let mut rec = 3u32.to_le_bytes().to_vec();
        rec.extend_from_slice(b"abcXY");
        let (sql, consumed) = decode_wal_record(&rec).unwrap();
        assert_eq!(sql, b"abc");
        assert_eq!(consumed, 7);

        assert!(decode_wal_record(&[1, 0]).is_err(), "short header must error");

        let mut short = 5u32.to_le_bytes().to_vec();
        short.extend_from_slice(b"ab");
        assert!(decode_wal_record(&short).is_err(), "short payload must error");
    }
}