spg_embedded/lib.rs
1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//! println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//! crash-consistent WAL + periodic checkpoint snapshot. The
32//! on-disk format is byte-identical to what `spg-server`
33//! produces, so a database can move between modes without
34//! conversion.
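//! - **Durability**: by default every `execute()` that mutates
//!   waits until its WAL record is fsynced before returning `Ok`.
//!   A lone commit pays one fsync; since v7.20 concurrent commits
//!   may share a single fsync through WAL group commit, and
//!   `SPG_SYNCHRONOUS_COMMIT=off` trades the wait for a bounded
//!   loss window (one `SPG_WAL_WRITER_DELAY_MS` flush interval).
//!   If you need batch throughput, wrap multiple statements in
//!   [`Database::with_transaction`] which fsyncs only at
//!   commit.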
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//! Share across threads via `Arc<Mutex<Database>>`. The
43//! single-writer model is intentional — see
44//! [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//! moves cold rows to disk-resident segments while you keep
47//! serving requests. It runs in a dedicated thread; drop the
48//! returned [`FreezerHandle`] (or call `stop()`) for clean
49//! shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//! [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//! them with a wildcard arm so future v7.x releases can add
53//! variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//! Malformed SQL, type mismatches, missing tables — all
59//! return `Err(EngineError::…)`. If you observe a panic on
60//! a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//! violations (e.g., catalog snapshot magic mismatch, WAL
63//! record CRC sentinel corruption that survived the boot-
64//! time validation). These represent silent disk corruption
65//! and an unwind would leak inconsistent state, so the
66//! release profile uses `panic = abort` — your host process
67//! dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//! `--profile release-dbg` (keeps unwind tables) and use
70//! `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
82/// v7.16.0 — handle for a parsed-and-planned SQL statement.
83/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
84/// with a `&[Value]` slice carrying the bind parameters (PG-style
85/// `$1`, `$2`, … positional). Cheap to `Clone`; the underlying AST
86/// is shared by handle copies and cloned per bind call by the
87/// engine's executor.
88///
89/// The handle holds a snapshot of the AST at prepare time. If
90/// the engine's plan cache evicts the entry between prepare and
91/// execute (e.g. ANALYZE bumps the statistics version) the
92/// stored AST keeps working — `execute_prepared` operates on
93/// the handle's clone, not the cache entry.
94#[derive(Debug, Clone)]
95pub struct Statement {
96 /// The parsed + planned AST. `spg-engine::prepare_cached`
97 /// returns it as a clone of the cached plan, so any rewrite
98 /// passes (`expand_group_by_all`, `reorder_joins`, …) have
99 /// already run.
100 pub(crate) stmt: ParsedStatement,
101 /// Original SQL source, kept for `Display` / debug only.
102 /// WAL persistence renders from the AST so a bind-time
103 /// rewrite of `$1..$N` survives replay.
104 pub(crate) sql: String,
105}
106
107impl Statement {
108 /// Borrow the original SQL source — useful for tracing and
109 /// debug logs. WAL replay does NOT use this; it serialises
110 /// the bind-final AST instead.
111 #[must_use]
112 pub fn sql(&self) -> &str {
113 &self.sql
114 }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127 let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::Write;
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
/// v7.11.3 — wall-clock source injected into every embedded `Engine`.
///
/// Returns microseconds since the Unix epoch. A clock so far in the
/// future that the value no longer fits is clamped to `i64::MAX`; a
/// clock set before the epoch yields `0`. SQL's `NOW()` /
/// `CURRENT_TIMESTAMP` / `CURRENT_DATE` rewrite layer uses it so
/// PG-idiomatic time queries work without the caller wiring a clock.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
// -- v7.1 WAL format constants (kept identical to `spg-server`'s) --
// Private on purpose, so callers cannot mis-frame records. Because
// the v3 layout matches the server's, a `spg-server` boot can read a
// database written by an embedded process and vice versa.

/// Bit 31 of a record's leading `u32` length word; replay treats the
/// record as v2-framed when it is set.
const WAL_V2_SENTINEL: u32 = 1 << 31;
/// Bit 30 of the length word; set together with the v2 sentinel it
/// marks v3 framing.
const WAL_V3_FLAG: u32 = 1 << 30;
/// Type byte of a v3 auto-commit SQL record.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v7.18 — durability checkpoint marker stays at 0x02 (skipped on replay).
const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v7.18 PITR — auto-commit SQL record extended with
/// (commit_lsn, commit_unix_us) so replay can stop at a chosen point
/// in time. Backward compatible: v3 records (type 0x01) still work
/// and the envelope flag bits are untouched; the type byte alone
/// discriminates the schema version.
const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
/// v7.18 — value stored in a v4 record's commit_unix_us slot when no
/// wall clock was available. Restore-to-timestamp skips such records
/// (they have no time anchor); LSN-based restore is unaffected.
const WAL_V4_NO_CLOCK: i64 = i64::MIN;
/// v7.18 — bytes that follow the type byte in a v4 record before the
/// SQL: commit_lsn (u64 LE) then commit_unix_us (i64 LE), 16 in total.
const WAL_V4_EXTRA_HEADER: usize = std::mem::size_of::<u64>() + std::mem::size_of::<i64>();
/// v7.18 PITR — checkpoint anchor written to the WAL *before* the
/// snapshot file replaces the on-disk catalog. It carries the
/// (lsn, ts, snapshot_path) triple so restore tooling can locate the
/// matching base snapshot without scanning the filesystem. Replay
/// dispatch skips it, like the v3 durability marker.
const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;

/// v7.21 (mailrs embed round-12 polish) — one COMMITted explicit
/// transaction, flushed atomically at COMMIT time.
///
/// The payload is the transaction's bind-final mutation statements
/// joined with `";\n"`; replay re-splits it via [`split_statements`]
/// and applies the pieces in order. The record uses the same 16-byte
/// (commit_lsn, commit_unix_us) prefix as the v4 auto-commit record
/// and is CRC-framed like every other record, so replay applies the
/// whole transaction or — on a torn tail — none of it. A transaction
/// can never half-resurrect.
///
/// Why it exists: in-transaction mutations only touch the engine's
/// shadow catalog (`modified_catalog: false`), so the per-statement
/// auto-commit append never fired, and a COMMIT followed by a crash
/// (no graceful Drop checkpoint) lost the transaction.
const WAL_V4_TYPE_TX_COMMIT_SQL: u8 = 0x12;
197
/// v7.1 — WAL size that triggers an automatic checkpoint.
///
/// Once the WAL has grown past this many bytes, the next successful
/// `execute()` finishes with a `checkpoint()` so the WAL stays
/// bounded. Override with the `SPG_EMBEDDED_CHECKPOINT_BYTES`
/// environment variable; an unset, unparsable or zero value falls
/// back to 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 4 * 1024 * 1024;
    match std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(bytes) if bytes > 0 => bytes,
            _ => FALLBACK_BYTES,
        },
        Err(_) => FALLBACK_BYTES,
    }
}
209
210/// v7.30.3 (mailrs round-26) — per-query byte budget on join/filter
211/// materialisation, default ON at 256 MiB for embed parity with the
212/// server's allocator-level `SPG_MAX_QUERY_BYTES` default. A fat
213/// backfill batch (1000 × full mail bodies) then errors with
214/// `QueryBytesExceeded` instead of walking the host into reclaim
215/// livelock. `SPG_MAX_QUERY_BYTES=0` disables; any other value
216/// overrides. NOT applied to the WAL-replay engine — replay must
217/// never fail on a tuning knob.
218fn engine_with_query_byte_budget(engine: Engine) -> Engine {
219 const DEFAULT_MAX_QUERY_BYTES: usize = 256 * 1024 * 1024;
220 match std::env::var("SPG_MAX_QUERY_BYTES")
221 .ok()
222 .and_then(|s| s.trim().parse::<usize>().ok())
223 {
224 Some(0) => engine,
225 Some(n) => engine.with_max_query_bytes(n),
226 None => engine.with_max_query_bytes(DEFAULT_MAX_QUERY_BYTES),
227 }
228}
229
230/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
231///
232/// ```text
233/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
234/// [u32 LE crc32 over (type_byte || sql_bytes)]
235/// [u8 type = 0x01]
236/// [sql bytes]
237/// ```
238fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
239 let payload = sql.as_bytes();
240 let mut crc_buf = Vec::with_capacity(1 + payload.len());
241 crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
242 crc_buf.extend_from_slice(payload);
243 let crc = spg_crypto::crc32::crc32(&crc_buf);
244 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
245 let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
246 out.extend_from_slice(&header);
247 out.extend_from_slice(&crc.to_le_bytes());
248 out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
249 out.extend_from_slice(payload);
250 out
251}
252
253/// v7.20 P2 — WAL group-commit. N concurrent commits share one
254/// fsync (the 4.2 ms p50 that profile_breakdown measured as
255/// 99.2% of the durable write path).
256///
257/// Leader-follower protocol, same family as PG's group commit:
258///
259/// 1. `enqueue(record)` — called while the caller still holds
260/// the engine's write lock. Appends the encoded record to the
261/// shared buffer, returns a sequence ticket. O(memcpy).
262/// 2. Caller RELEASES the engine write lock (the next writer's
263/// mutation proceeds in parallel with this batch's fsync).
264/// 3. `wait_flushed(seq)` — if nobody is flushing, the caller
265/// elects itself leader: swaps the buffer out, writes +
266/// fsyncs ONCE for every record in the batch, marks the
267/// batch durable, wakes all followers. Otherwise it parks on
268/// the condvar until a leader covers its seq.
269///
270/// Durability contract is unchanged from v7.19: `execute()`
271/// does not return Ok until the record that describes its
272/// mutation is fsynced. The only change is N callers sharing
273/// one fsync instead of paying one each.
274///
275/// Lock order (deadlock-free): `state` then `file`; never the
276/// reverse. The leader holds `file` WITHOUT `state` during IO so
277/// enqueues continue while fsync runs.
278#[derive(Debug)]
279struct WalGroup {
280 state: Mutex<WalGroupState>,
281 cond: std::sync::Condvar,
282 /// Active chunk file handle. Separate lock from `state` so
283 /// the leader's write+fsync doesn't block concurrent
284 /// enqueues. Swapped by `checkpoint()` at rotation.
285 file: Mutex<File>,
286}
287
/// Mutable bookkeeping of a [`WalGroup`], guarded by its `state` lock.
#[derive(Debug)]
struct WalGroupState {
    /// Encoded records that have been enqueued but not yet flushed.
    buf: Vec<u8>,
    /// Number of records enqueued so far; tickets are 1-based and
    /// strictly increasing.
    enqueued_seq: u64,
    /// Highest ticket whose record has been fsynced.
    flushed_seq: u64,
    /// Set while some caller is inside the leader IO section.
    leader_active: bool,
    /// Sticky fatal error: a failed fsync poisons the WAL (loud,
    /// never silent), so every current and future waiter errors.
    failed: Option<String>,
    /// Bytes written to the active chunk since the last rotation;
    /// feeds the auto-checkpoint trigger.
    written_len: u64,
}
305
306/// Ticket returned by the buffered write path; `wait()` blocks
307/// until the record it covers is durable (or the WAL is
308/// poisoned). Cheap to move across threads.
309#[derive(Debug)]
310pub struct WalTicket {
311 group: Arc<WalGroup>,
312 seq: u64,
313}
314
315impl WalGroup {
316 fn new(file: File, initial_len: u64) -> Self {
317 Self {
318 state: Mutex::new(WalGroupState {
319 buf: Vec::new(),
320 enqueued_seq: 0,
321 flushed_seq: 0,
322 leader_active: false,
323 failed: None,
324 written_len: initial_len,
325 }),
326 cond: std::sync::Condvar::new(),
327 file: Mutex::new(file),
328 }
329 }
330
331 /// Append `record` to the pending batch. Returns the seq the
332 /// caller must wait on. Called under the engine write lock —
333 /// keep it O(memcpy).
334 fn enqueue(&self, record: &[u8]) -> u64 {
335 let mut g = self.state.lock().expect("wal state poisoned");
336 g.buf.extend_from_slice(record);
337 g.enqueued_seq += 1;
338 g.enqueued_seq
339 }
340
341 /// Block until `seq` is durable. Leader-follower: the first
342 /// arriving waiter flushes for everyone.
343 fn wait_flushed(&self, seq: u64) -> Result<(), EngineError> {
344 let mut g = self.state.lock().expect("wal state poisoned");
345 loop {
346 if let Some(e) = &g.failed {
347 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
348 format!("WAL poisoned by earlier flush failure: {e}"),
349 )));
350 }
351 if g.flushed_seq >= seq {
352 return Ok(());
353 }
354 if !g.leader_active {
355 // Elect self leader.
356 g.leader_active = true;
357 drop(g);
358 // v7.20 — commit_delay (PG's same-named knob):
359 // before taking the batch, give in-flight
360 // writers a short window to enqueue so the
361 // shared fsync covers more commits. 150 µs costs
362 // ~3.5% on a solo 4.2 ms fsync but multiplies
363 // batch size under load. Tunable via
364 // SPG_COMMIT_DELAY_US (0 disables).
365 let delay = commit_delay_us();
366 if delay > 0 {
367 std::thread::sleep(std::time::Duration::from_micros(delay));
368 }
369 let (batch, flush_to) = {
370 let mut g2 = self.state.lock().expect("wal state poisoned");
371 (core::mem::take(&mut g2.buf), g2.enqueued_seq)
372 };
373 let io_result: std::io::Result<()> = (|| {
374 let mut f = self.file.lock().expect("wal file poisoned");
375 f.write_all(&batch)?;
376 f.sync_data()
377 })();
378 g = self.state.lock().expect("wal state poisoned");
379 g.leader_active = false;
380 match io_result {
381 Ok(()) => {
382 g.flushed_seq = flush_to;
383 g.written_len = g.written_len.saturating_add(batch.len() as u64);
384 }
385 Err(e) => {
386 g.failed = Some(e.to_string());
387 }
388 }
389 self.cond.notify_all();
390 //
391
392 // Loop continues: either our seq is now covered
393 // (leader path normally returns next iteration)
394 // or the error branch surfaces.
395 continue;
396 }
397 g = self.cond.wait(g).expect("wal condvar poisoned");
398 }
399 }
400
401 /// Drain the pending batch + flush synchronously. Caller must
402 /// guarantee no concurrent enqueues (checkpoint holds the
403 /// engine exclusively). Used before rotation so the marker
404 /// lands in the right chunk.
405 fn flush_now(&self) -> Result<(), EngineError> {
406 let mut g = self.state.lock().expect("wal state poisoned");
407 if let Some(e) = &g.failed {
408 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
409 format!("WAL poisoned: {e}"),
410 )));
411 }
412 let batch = core::mem::take(&mut g.buf);
413 let flush_to = g.enqueued_seq;
414 if batch.is_empty() {
415 return Ok(());
416 }
417 drop(g);
418 let io: std::io::Result<()> = (|| {
419 let mut f = self.file.lock().expect("wal file poisoned");
420 f.write_all(&batch)?;
421 f.sync_data()
422 })();
423 let mut g = self.state.lock().expect("wal state poisoned");
424 match io {
425 Ok(()) => {
426 g.flushed_seq = flush_to;
427 g.written_len = g.written_len.saturating_add(batch.len() as u64);
428 self.cond.notify_all();
429 Ok(())
430 }
431 Err(e) => {
432 g.failed = Some(e.to_string());
433 self.cond.notify_all();
434 Err(io_err(e))
435 }
436 }
437 }
438
439 /// Swap the active chunk handle (rotation). Caller flushes
440 /// first; both locks taken in canonical order.
441 fn rotate_file(&self, new_file: File) {
442 let mut g = self.state.lock().expect("wal state poisoned");
443 let mut f = self.file.lock().expect("wal file poisoned");
444 *f = new_file;
445 g.written_len = 0;
446 }
447
448 fn written_len(&self) -> u64 {
449 let g = self.state.lock().expect("wal state poisoned");
450 g.written_len + g.buf.len() as u64
451 }
452}
453
454impl WalTicket {
455 /// Block until the record this ticket covers is durable.
456 ///
457 /// Under `SPG_SYNCHRONOUS_COMMIT=off` this returns
458 /// immediately — the background flusher (or the next
459 /// checkpoint / clean shutdown) makes the record durable
460 /// within `SPG_WAL_WRITER_DELAY_MS`. Same contract as PG's
461 /// `synchronous_commit = off`.
462 ///
463 /// # Errors
464 /// Surfaces the leader's IO error if the batch flush failed
465 /// (the WAL is then poisoned for all subsequent writes).
466 pub fn wait(&self) -> Result<(), EngineError> {
467 if !synchronous_commit_on() {
468 return Ok(());
469 }
470 self.group.wait_flushed(self.seq)
471 }
472}
473
474/// v7.19 P3 — retention sweep loop. Runs in a dedicated thread
475/// spawned by `Database::open_path` when `SPG_PITR_RETENTION_HOURS`
476/// is set to a non-zero value. Wakes every
477/// `SPG_PITR_RETENTION_CHECK_SEC` (default 60 s), enumerates chunks
478/// under `wal_dir`, archives via `SPG_PITR_ARCHIVE_CMD` if set, and
479/// deletes anything older than `retention_hours`.
480///
481/// Loud-failure posture matches PG's `archive_command`: if the
482/// archive command returns non-zero, the chunk stays on disk and
483/// a warning prints to stderr. The retention sweep doesn't delete
484/// a chunk it failed to archive.
485fn retention_sweep_loop(
486 wal_dir: PathBuf,
487 retention_hours: u64,
488 check_interval: std::time::Duration,
489 archive_cmd: Option<String>,
490 shutdown: Arc<AtomicBool>,
491) {
492 while !shutdown.load(Ordering::SeqCst) {
493 if let Err(e) = retention_sweep_once(&wal_dir, retention_hours, archive_cmd.as_deref()) {
494 eprintln!("spg-embedded: retention sweep error: {e}");
495 }
496 // Sleep in short ticks so shutdown isn't blocked on a
497 // 60 s naptime when Drop signals.
498 let mut elapsed = std::time::Duration::ZERO;
499 let tick = std::time::Duration::from_millis(250);
500 while elapsed < check_interval {
501 if shutdown.load(Ordering::SeqCst) {
502 return;
503 }
504 std::thread::sleep(tick);
505 elapsed += tick;
506 }
507 }
508}
509
510/// v7.19 P3 — one retention sweep pass over `wal_dir`. Extracted
511/// from the loop so tests can drive it directly. Public so the
512/// e2e_pitr_retention integration test (and any future operator
513/// tooling that wants synchronous retention) can call it.
514pub fn retention_sweep_once(
515 wal_dir: &Path,
516 retention_hours: u64,
517 archive_cmd: Option<&str>,
518) -> std::io::Result<()> {
519 if !wal_dir.exists() {
520 return Ok(());
521 }
522 let now_us = wall_clock_micros();
523 let cutoff_us = (now_us as i128 - (retention_hours as i128 * 3_600 * 1_000_000)) as i64;
524 let chunks = sorted_wal_chunks(wal_dir)?;
525 for chunk in chunks {
526 // Don't sweep the most-recent chunk; it's the live one
527 // execute() is appending to. Compare against the largest
528 // filename-prefix unix_us.
529 let stem = match chunk.file_stem().and_then(|s| s.to_str()) {
530 Some(s) => s,
531 None => continue,
532 };
533 let chunk_us: i64 = stem
534 .split_once('_')
535 .and_then(|(prefix, _)| i64::from_str_radix(prefix, 16).ok())
536 .unwrap_or(0);
537 if chunk_us >= cutoff_us {
538 continue;
539 }
540 // Archive first if requested.
541 if let Some(cmd) = archive_cmd {
542 if !cmd.is_empty() {
543 let output = std::process::Command::new("sh")
544 .arg("-c")
545 .arg(cmd)
546 .arg("--")
547 .arg(&chunk)
548 .output()?;
549 if !output.status.success() {
550 eprintln!(
551 "spg-embedded: SPG_PITR_ARCHIVE_CMD failed for {} (exit {}); chunk stays on disk",
552 chunk.display(),
553 output.status.code().unwrap_or(-1)
554 );
555 continue;
556 }
557 }
558 }
559 // Delete the chunk + its sibling .checksum if present.
560 if let Err(e) = std::fs::remove_file(&chunk) {
561 eprintln!(
562 "spg-embedded: retention remove {} failed: {e}",
563 chunk.display()
564 );
565 continue;
566 }
567 let mut cs = chunk.clone();
568 let mut name = cs.file_name().map(|n| n.to_os_string()).unwrap_or_default();
569 name.push(".checksum");
570 cs.set_file_name(name);
571 let _ = std::fs::remove_file(&cs);
572 }
573 Ok(())
574}
575
/// v7.20 — group-commit delay window in µs (analogue of PG's
/// `commit_delay`).
///
/// The flush leader sleeps this long before taking the batch so
/// concurrent writers can pile in. Read once from
/// `SPG_COMMIT_DELAY_US` and cached for the life of the process;
/// defaults to 150 µs when unset or unparsable, and `0` disables the
/// delay.
fn commit_delay_us() -> u64 {
    const DEFAULT_DELAY_US: u64 = 150;
    static CACHED: std::sync::OnceLock<u64> = std::sync::OnceLock::new();

    let read_env = || match std::env::var("SPG_COMMIT_DELAY_US") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_DELAY_US),
        Err(_) => DEFAULT_DELAY_US,
    };
    *CACHED.get_or_init(read_env)
}
589
/// v7.20 — analogue of PG's `synchronous_commit`.
///
/// * `on` (default): `execute()` blocks until its WAL record is
///   fsynced — zero-loss durability.
/// * `off`: `execute()` returns after the in-memory mutation and the
///   WAL enqueue; a background flusher thread writes + fsyncs every
///   `SPG_WAL_WRITER_DELAY_MS` (default 200 ms, PG's
///   `wal_writer_delay` default). The crash window is up to one
///   flush interval of confirmed-but-unsynced commits — exactly the
///   trade PG documents for the same setting. Clean shutdown
///   (Drop / checkpoint) always flushes.
///
/// `SPG_SYNCHRONOUS_COMMIT` is read once and cached. Only `off`,
/// `false` (both case-insensitive) and `0` switch it off; anything
/// else, including an unset variable, leaves it on.
fn synchronous_commit_on() -> bool {
    static CACHED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();

    let opts_out = |value: &str| {
        value.eq_ignore_ascii_case("off") || value == "0" || value.eq_ignore_ascii_case("false")
    };
    *CACHED.get_or_init(|| match std::env::var("SPG_SYNCHRONOUS_COMMIT") {
        Ok(value) => !opts_out(&value),
        Err(_) => true,
    })
}
607
/// v7.20 — cadence in milliseconds of the background WAL flusher
/// used under `SPG_SYNCHRONOUS_COMMIT=off` (PG `wal_writer_delay`).
///
/// Read once from `SPG_WAL_WRITER_DELAY_MS` and cached; an unset,
/// unparsable or zero value falls back to 200 ms.
fn wal_writer_delay_ms() -> u64 {
    const DEFAULT_DELAY_MS: u64 = 200;
    static CACHED: std::sync::OnceLock<u64> = std::sync::OnceLock::new();

    *CACHED.get_or_init(|| match std::env::var("SPG_WAL_WRITER_DELAY_MS") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(ms) if ms > 0 => ms,
            _ => DEFAULT_DELAY_MS,
        },
        Err(_) => DEFAULT_DELAY_MS,
    })
}
620
/// PITR retention window in hours, read from
/// `SPG_PITR_RETENTION_HOURS`. Returns `0` when the variable is
/// unset or not a valid `u64`.
fn pitr_retention_hours() -> u64 {
    match std::env::var("SPG_PITR_RETENTION_HOURS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(0),
        Err(_) => 0,
    }
}
627
/// Seconds between retention sweeps, read from
/// `SPG_PITR_RETENTION_CHECK_SEC`. An unset, unparsable or zero
/// value falls back to 60.
fn pitr_retention_check_sec() -> u64 {
    const DEFAULT_CHECK_SEC: u64 = 60;
    match std::env::var("SPG_PITR_RETENTION_CHECK_SEC") {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(secs) if secs > 0 => secs,
            _ => DEFAULT_CHECK_SEC,
        },
        Err(_) => DEFAULT_CHECK_SEC,
    }
}
635
/// Archive command for WAL chunks, read from `SPG_PITR_ARCHIVE_CMD`.
/// Returns `None` when the variable is unset or empty.
fn pitr_archive_cmd() -> Option<String> {
    match std::env::var("SPG_PITR_ARCHIVE_CMD") {
        Ok(cmd) if !cmd.is_empty() => Some(cmd),
        _ => None,
    }
}
641
642/// v7.19 — replay every record from `wal_bytes` whose
643/// `commit_lsn` is strictly greater than `floor_lsn`. v3 records
644/// (no LSN) and v4 records with `commit_lsn <= floor_lsn` are
645/// skipped — the snapshot loaded ahead of this call already
646/// reflects them, and re-applying would DuplicateTable /
647/// double-insert. v3 records inside the legacy migration chunk
648/// always apply because the migration sets `floor_lsn = 0` and
649/// v3 records carry no LSN to compare; the pre-migration
650/// behaviour (every record replays) is what the migration
651/// preserves.
652///
653/// Returns the count of records successfully applied. Same
654/// torn-tail semantics as `replay_wal_into_engine`.
655fn replay_wal_filtered(
656 wal_bytes: &[u8],
657 engine: &mut Engine,
658 floor_lsn: u64,
659 quarantine: &mut Vec<QuarantinedStmt>,
660) -> Result<usize, String> {
661 let records = parse_wal_records(wal_bytes)?;
662 let mut applied = 0usize;
663 for r in &records {
664 // Skip markers + non-SQL records.
665 if r.type_byte == WAL_V3_TYPE_DURABILITY_CHECKPOINT
666 || r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER
667 {
668 continue;
669 }
670 // v4 SQL records carry an LSN. Apply iff strictly above
671 // the snapshot floor.
672 if r.type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL || r.type_byte == WAL_V4_TYPE_TX_COMMIT_SQL {
673 if let Some(lsn) = r.commit_lsn {
674 if lsn <= floor_lsn {
675 continue;
676 }
677 }
678 }
679 // v3 records (type 0x01, no LSN) always apply — the
680 // legacy migration path is the only place they appear,
681 // and floor_lsn=0 there.
682 let sql = match std::str::from_utf8(r.sql) {
683 Ok(s) => s,
684 Err(e) => return Err(format!("non-UTF-8 SQL at offset {}: {e}", r.offset)),
685 };
686 // v7.21 — a tx-commit record carries the whole transaction
687 // as a `";\n"`-joined script; auto-commit records are a
688 // single statement, for which split_statements is a no-op.
689 //
690 // v7.30.1 (mailrs round-24 ask 2) — a statement the engine
691 // REJECTS is quarantined, not fatal: "one statement failed
692 // to replay" ≠ "the catalog is corrupt". Framing damage
693 // (parse_wal_records / non-UTF-8 above) still errors — that
694 // IS corruption. Subsequent statements of a tx script keep
695 // applying: the bricking class is a no-op-at-runtime
696 // statement that re-applies non-idempotently, and skipping
697 // just it reconstructs the runtime state.
698 for stmt in split_statements(sql) {
699 if let Err(e) = engine.execute(stmt) {
700 quarantine.push(QuarantinedStmt {
701 offset: r.offset,
702 sql: stmt.to_string(),
703 error: format!("{e:?}"),
704 });
705 }
706 }
707 applied += 1;
708 }
709 Ok(applied)
710}
711
/// v7.30.1 (mailrs round-24 ask 2) — a single statement the engine
/// refused to re-apply during boot replay.
///
/// Entries are kept for forensics in a `quarantine-*.log` next to
/// the WAL chunks; the boot itself carries on.
struct QuarantinedStmt {
    /// Offset of the WAL record the statement came from.
    offset: usize,
    /// Text of the rejected statement.
    sql: String,
    /// `Debug` rendering of the engine's error.
    error: String,
}
720
721fn format_quarantine_line(q: &QuarantinedStmt) -> String {
722 format!("offset {}: {}\n rejected: {}\n", q.offset, q.sql, q.error)
723}
724
/// v7.19 — builds a WAL chunk filename: `<unix_us>_<leading_lsn>.wal`.
///
/// Both parts are zero-padded 16-digit hex, so plain lexicographic
/// order equals numeric order, and because the timestamp comes
/// first a directory listing is chronological as well.
fn chunk_filename(unix_us: i64, leading_lsn: u64) -> String {
    // Pre-1970 timestamps should not occur in practice; they are
    // clamped to 0 so the zero-padded form stays sortable.
    let stamp = unix_us.max(0).unsigned_abs();
    format!("{stamp:016x}_{leading_lsn:016x}.wal")
}
736
737/// v7.19 — filename used for the legacy single-file WAL when
738/// `open_path` migrates a v7.18-layout database into the new
739/// chunk directory. Lexicographically smallest possible value
740/// so subsequent chunks sort after it.
741fn legacy_chunk_filename() -> String {
742 chunk_filename(0, 0)
743}
744
/// v7.19 — returns every `.wal` file directly inside `wal_dir`,
/// sorted lexicographically. Thanks to the zero-padded filename
/// format that is also chunk-creation order.
///
/// A missing directory yields an empty list; any other IO error,
/// including one hit while iterating entries, is returned.
fn sorted_wal_chunks(wal_dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let entries = match std::fs::read_dir(wal_dir) {
        Ok(entries) => entries,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e),
    };

    let mut chunks = entries
        .filter_map(|entry| match entry {
            Ok(entry) => {
                let candidate = entry.path();
                let is_chunk = candidate.extension().and_then(|ext| ext.to_str()) == Some("wal");
                is_chunk.then_some(Ok(candidate))
            }
            // Keep the error so `collect` stops at it.
            Err(e) => Some(Err(e)),
        })
        .collect::<std::io::Result<Vec<PathBuf>>>()?;
    chunks.sort();
    Ok(chunks)
}
765
766/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
767///
768/// ```text
769/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
770/// [u32 LE crc32 over (type_byte || payload)]
771/// [u8 type = 0x11]
772/// payload:
773/// [u64 LE checkpoint_lsn]
774/// [i64 LE checkpoint_unix_us (WAL_V4_NO_CLOCK if no clock)]
775/// [u16 LE snapshot_path_len]
776/// [snapshot_path_bytes]
777/// ```
778///
779/// `payload_len` covers only the payload — keeping the framing
780/// uniform across v3 / v4 record types so torn-write detection in
781/// `replay_wal_into_engine` stays trivial.
782fn encode_v4_checkpoint_marker(
783 checkpoint_lsn: u64,
784 checkpoint_unix_us: i64,
785 snapshot_path: &Path,
786) -> Vec<u8> {
787 let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
788 let snap_payload = snapshot_bytes.as_bytes();
789 let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
790 let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
791 payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
792 payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
793 payload.extend_from_slice(&snap_len_u16.to_le_bytes());
794 payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
795 let mut crc_buf = Vec::with_capacity(1 + payload.len());
796 crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
797 crc_buf.extend_from_slice(&payload);
798 let crc = spg_crypto::crc32::crc32(&crc_buf);
799 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
800 let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
801 out.extend_from_slice(&header);
802 out.extend_from_slice(&crc.to_le_bytes());
803 out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
804 out.extend_from_slice(&payload);
805 out
806}
807
/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
///
/// ```text
/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
/// [u8 type = 0x10]
/// [u64 LE commit_lsn]
/// [i64 LE commit_unix_us (= WAL_V4_NO_CLOCK when no ClockFn)]
/// [sql bytes]
/// ```
///
/// `sql_len` field stays the SQL byte count — same shape as v3 — so
/// replay-buffer torn-write detection compares against
/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
/// readable by the same loop with their original 9-byte header
/// arithmetic.
///
/// Thin wrapper: forwards `sql`, `commit_lsn` and `commit_unix_us`
/// unchanged to `encode_v4_sql_record` with the
/// `WAL_V4_TYPE_AUTO_COMMIT_SQL` type byte and returns the encoded
/// record bytes.
fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
    encode_v4_sql_record(WAL_V4_TYPE_AUTO_COMMIT_SQL, sql, commit_lsn, commit_unix_us)
}
827
/// v7.21 — same envelope, `WAL_V4_TYPE_TX_COMMIT_SQL` type byte.
/// `script` = the transaction's statements joined with `";\n"`.
///
/// Thin wrapper: forwards `script`, `commit_lsn` and
/// `commit_unix_us` unchanged to `encode_v4_sql_record` and returns
/// the encoded record bytes. The length word in the header therefore
/// counts the bytes of the whole script.
fn encode_v4_tx_commit(script: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
    encode_v4_sql_record(
        WAL_V4_TYPE_TX_COMMIT_SQL,
        script,
        commit_lsn,
        commit_unix_us,
    )
}
838
839fn encode_v4_sql_record(type_byte: u8, sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
840 let payload = sql.as_bytes();
841 let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
842 crc_buf.push(type_byte);
843 crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
844 crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
845 crc_buf.extend_from_slice(payload);
846 let crc = spg_crypto::crc32::crc32(&crc_buf);
847 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
848 let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
849 out.extend_from_slice(&header);
850 out.extend_from_slice(&crc.to_le_bytes());
851 out.push(type_byte);
852 out.extend_from_slice(&commit_lsn.to_le_bytes());
853 out.extend_from_slice(&commit_unix_us.to_le_bytes());
854 out.extend_from_slice(payload);
855 out
856}
857
858/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
859/// Returns the count of records successfully applied. A truncated
860/// trailing record (mid-write torn) is dropped silently — the
861/// same recovery story `spg-server`'s boot path uses.
862fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
863 let mut applied = 0usize;
864 let mut cur = 0usize;
865 while cur < wal_bytes.len() {
866 if wal_bytes.len() - cur < 4 {
867 // Trailing partial header — torn write, drop and stop.
868 break;
869 }
870 let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
871 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
872 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
873 let len_mask = if is_v3 {
874 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
875 } else {
876 !WAL_V2_SENTINEL
877 };
878 let rec_len = (raw_len & len_mask) as usize;
879 let header_len = if is_v3 {
880 9
881 } else if is_v2 {
882 8
883 } else {
884 4
885 };
886 if wal_bytes.len() - cur < header_len + rec_len {
887 // Torn record at the tail — drop, stop.
888 break;
889 }
890 if is_v3 {
891 let type_byte = wal_bytes[cur + 8];
892 match type_byte {
893 WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
894 WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
895 // durability_checkpoint marker — skip, no SQL.
896 cur += header_len + rec_len;
897 continue;
898 }
899 WAL_V4_TYPE_CHECKPOINT_MARKER => {
900 // v7.18 PITR — checkpoint anchor, skip on replay
901 // (engine state past this point reflects the
902 // matching snapshot already loaded by the caller).
903 cur += header_len + rec_len;
904 continue;
905 }
906 WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL => {
907 // v7.18 PITR — v4 record carries 16 bytes of
908 // (commit_lsn, commit_unix_us) between the type
909 // byte and the SQL payload. Replay reads them but
910 // does not enforce them — the engine doesn't
911 // surface LSN/clock here. Restore tooling
912 // (spgctl) parses them via parse_wal_record below.
913 //
914 // v7.21 — tx-commit records (0x12) carry a whole
915 // transaction as a `";\n"`-joined script;
916 // split_statements is a no-op on the single-
917 // statement auto-commit form.
918 let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
919 if wal_bytes.len() - cur < v4_total {
920 // Torn v4 record at the tail — drop, stop.
921 break;
922 }
923 let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
924 let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
925 let sql = std::str::from_utf8(sql_bytes)
926 .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
927 for stmt in split_statements(sql) {
928 engine.execute(stmt).map_err(|e| {
929 format!("WAL replay: apply {stmt:?} at offset {cur} rejected: {e:?}")
930 })?;
931 }
932 applied += 1;
933 cur += v4_total;
934 continue;
935 }
936 other => {
937 return Err(format!(
938 "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
939 ));
940 }
941 }
942 }
943 let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
944 let sql = std::str::from_utf8(sql_bytes)
945 .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
946 engine
947 .execute(sql)
948 .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
949 applied += 1;
950 cur += header_len + rec_len;
951 }
952 Ok(applied)
953}
954
/// v7.18 PITR — parsed WAL record, surfaced for restore / verify
/// tooling. The replay loop above doesn't expose LSN/timestamp;
/// `spgctl restore --to <timestamp>` and `spgctl verify` need them.
/// Returned offsets are byte-positions inside the WAL buffer.
///
/// The `'a` lifetime ties `sql` to the buffer handed to
/// `parse_wal_records`; nothing is copied.
#[derive(Debug, Clone)]
pub struct WalRecord<'a> {
    /// Byte offset in the WAL buffer where this record starts.
    pub offset: usize,
    /// Type byte (0x01 = v3 auto-commit, 0x10 = v4 auto-commit,
    /// 0x02 = durability checkpoint marker). The parser also yields
    /// v4 tx-commit records (0x12, `WAL_V4_TYPE_TX_COMMIT_SQL`) and
    /// v4 checkpoint markers (`WAL_V4_TYPE_CHECKPOINT_MARKER`).
    /// Legacy v1 / v2 records have no type byte on disk and are
    /// reported as v3 auto-commit.
    pub type_byte: u8,
    /// `Some(lsn)` for v4 records (SQL records and checkpoint
    /// markers alike), `None` for v3 and older.
    pub commit_lsn: Option<u64>,
    /// `Some(unix_us)` for v4 records carrying a clock-set timestamp,
    /// `None` for v3 or for v4 records explicitly written with
    /// `WAL_V4_NO_CLOCK` (sentinel for "no ClockFn at commit time").
    pub commit_unix_us: Option<i64>,
    /// SQL payload as borrowed bytes. Empty for durability markers.
    /// For v4 checkpoint markers this field holds the snapshot path
    /// bytes instead of SQL; for tx-commit records it is the whole
    /// `";\n"`-joined transaction script.
    pub sql: &'a [u8],
}
975
976/// v7.18 PITR — iterate over `wal_bytes` yielding one `WalRecord`
977/// per intact record. Torn-tail records terminate iteration
978/// silently (same recovery story as `replay_wal_into_engine`).
979/// Unknown type bytes inside a v3 envelope return `Err` so the
980/// caller knows the WAL was written by a newer SPG.
981pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
982 let mut out = Vec::new();
983 let mut cur = 0usize;
984 while cur < wal_bytes.len() {
985 if wal_bytes.len() - cur < 4 {
986 break;
987 }
988 let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
989 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
990 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
991 let len_mask = if is_v3 {
992 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
993 } else {
994 !WAL_V2_SENTINEL
995 };
996 let rec_len = (raw_len & len_mask) as usize;
997 let header_len = if is_v3 {
998 9
999 } else if is_v2 {
1000 8
1001 } else {
1002 4
1003 };
1004 if wal_bytes.len() - cur < header_len + rec_len {
1005 break;
1006 }
1007 if !is_v3 {
1008 // v1 / v2 records carry no type byte; treat as legacy
1009 // auto-commit SQL with no LSN/time.
1010 let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1011 out.push(WalRecord {
1012 offset: cur,
1013 type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
1014 commit_lsn: None,
1015 commit_unix_us: None,
1016 sql,
1017 });
1018 cur += header_len + rec_len;
1019 continue;
1020 }
1021 let type_byte = wal_bytes[cur + 8];
1022 match type_byte {
1023 WAL_V3_TYPE_AUTO_COMMIT_SQL => {
1024 let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1025 out.push(WalRecord {
1026 offset: cur,
1027 type_byte,
1028 commit_lsn: None,
1029 commit_unix_us: None,
1030 sql,
1031 });
1032 cur += header_len + rec_len;
1033 }
1034 WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
1035 out.push(WalRecord {
1036 offset: cur,
1037 type_byte,
1038 commit_lsn: None,
1039 commit_unix_us: None,
1040 sql: &[],
1041 });
1042 cur += header_len + rec_len;
1043 }
1044 WAL_V4_TYPE_CHECKPOINT_MARKER => {
1045 // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
1046 // We surface lsn + ts on the WalRecord; the path lives
1047 // in `sql` since the type byte already disambiguates
1048 // record meaning and adding a dedicated field would
1049 // bloat the iterator return type for every variant.
1050 if rec_len < 18 {
1051 return Err(format!(
1052 "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
1053 ));
1054 }
1055 let lsn = u64::from_le_bytes(
1056 wal_bytes[cur + header_len..cur + header_len + 8]
1057 .try_into()
1058 .unwrap(),
1059 );
1060 let ts_raw = i64::from_le_bytes(
1061 wal_bytes[cur + header_len + 8..cur + header_len + 16]
1062 .try_into()
1063 .unwrap(),
1064 );
1065 let path_len = u16::from_le_bytes(
1066 wal_bytes[cur + header_len + 16..cur + header_len + 18]
1067 .try_into()
1068 .unwrap(),
1069 ) as usize;
1070 if rec_len < 18 + path_len {
1071 return Err(format!(
1072 "WAL parse: checkpoint marker at offset {cur} truncated path"
1073 ));
1074 }
1075 let path_start = cur + header_len + 18;
1076 let path_bytes = &wal_bytes[path_start..path_start + path_len];
1077 let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1078 None
1079 } else {
1080 Some(ts_raw)
1081 };
1082 out.push(WalRecord {
1083 offset: cur,
1084 type_byte,
1085 commit_lsn: Some(lsn),
1086 commit_unix_us,
1087 sql: path_bytes,
1088 });
1089 cur += header_len + rec_len;
1090 }
1091 WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL => {
1092 let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
1093 if wal_bytes.len() - cur < v4_total {
1094 break;
1095 }
1096 let lsn = u64::from_le_bytes(
1097 wal_bytes[cur + header_len..cur + header_len + 8]
1098 .try_into()
1099 .unwrap(),
1100 );
1101 let ts_raw = i64::from_le_bytes(
1102 wal_bytes[cur + header_len + 8..cur + header_len + 16]
1103 .try_into()
1104 .unwrap(),
1105 );
1106 let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1107 None
1108 } else {
1109 Some(ts_raw)
1110 };
1111 let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
1112 let sql = &wal_bytes[sql_start..sql_start + rec_len];
1113 out.push(WalRecord {
1114 offset: cur,
1115 type_byte,
1116 commit_lsn: Some(lsn),
1117 commit_unix_us,
1118 sql,
1119 });
1120 cur += v4_total;
1121 }
1122 other => {
1123 return Err(format!(
1124 "WAL parse: unknown type byte {other:#04x} at offset {cur}"
1125 ));
1126 }
1127 }
1128 }
1129 Ok(out)
1130}
1131
/// v7.1 — predicate for "should the next `execute()` mutate the
/// WAL?" Returns `true` (= read-only, no WAL record) for SELECT /
/// SHOW / EXPLAIN / BEGIN / COMMIT / ROLLBACK / WAIT and the
/// SPG-specific verbs that don't go through the auto-commit record
/// path on the server (CHECKPOINT, COMPACT).
///
/// `WITH …` is read-only only when the statement text contains none
/// of the words INSERT / UPDATE / DELETE / MERGE: a CTE can front a
/// data-modifying statement, and misclassifying one as read-only
/// would leave its effects out of the WAL. The word scan is
/// deliberately crude — a false "write" merely logs a statement
/// that replays harmlessly.
///
/// Conservative: anything we don't explicitly know is read-only
/// falls through to "write a WAL record". Classification looks only
/// at the first word (case-insensitively), where a word ends at
/// whitespace, `;` or `(`.
fn sql_is_read_only(sql: &str) -> bool {
    let trimmed = sql.trim_start();
    let head = trimmed
        .split(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .next()
        .unwrap_or("")
        .to_ascii_lowercase();
    match head.as_str() {
        "select" | "show" | "explain" | "begin" | "commit" | "rollback" | "checkpoint"
        | "compact" | "wait" => true,
        "with" => !with_statement_may_write(trimmed),
        _ => false,
    }
}

/// True when `sql` contains a data-modifying keyword as a standalone
/// word (case-insensitive). Used to keep `WITH … INSERT/UPDATE/
/// DELETE/MERGE` on the WAL-writing side of `sql_is_read_only`.
fn with_statement_may_write(sql: &str) -> bool {
    const WRITE_KEYWORDS: [&str; 4] = ["insert", "update", "delete", "merge"];
    sql.split(|c: char| !(c.is_alphanumeric() || c == '_'))
        .any(|word| WRITE_KEYWORDS.iter().any(|kw| word.eq_ignore_ascii_case(kw)))
}
1158
/// Embedded SPG database handle. Owns an `Engine` + provides
/// ergonomic wrappers around `execute` and `query`.
///
/// One type covers two shapes, told apart by `persistence`:
/// in-memory (`None`, the v6.10.3 shape — nothing touches disk) and
/// persistent (`Some`, WAL + snapshot lifecycle as described on the
/// field below).
///
/// NOTE(review): an earlier revision of this comment said `Drop`
/// performs no WAL flush / fsync because the type is in-memory
/// only; that describes only the in-memory shape and contradicts
/// the `persistence` field documentation.
#[derive(Debug)]
pub struct Database {
    /// The execution engine every statement is run against.
    engine: Engine,
    /// v7.1 — persistence sidecar. When `Some(p)`, every
    /// `execute(sql)` that mutates state appends a v4
    /// `auto_commit_sql` WAL record + fsyncs before the call
    /// returns; `Drop` writes a final catalog snapshot to
    /// `<db_path>` so the next session boots from a clean
    /// snapshot + an empty WAL. `None` = in-memory only (the
    /// v6.10.3 shape).
    persistence: Option<PersistenceCtx>,
    /// v7.18 PITR — monotonic per-database commit LSN. Increments
    /// before each successful WAL append; bootstrapped at
    /// open_path from `max(parse_wal_records → commit_lsn)` so
    /// reopen never reuses an LSN. In-memory databases start at
    /// 0 and never advance (no WAL = no LSN-meaningful records).
    commit_lsn: AtomicU64,
    /// v7.21 (round-12 polish) — explicit-transaction WAL buffer.
    /// `Some` between an engine-accepted BEGIN and its
    /// COMMIT / ROLLBACK on a persistent database. In-transaction
    /// mutations only touch the engine's shadow catalog and report
    /// `modified_catalog: false`, so the per-statement auto-commit
    /// append never fires for them; their bind-final SQL collects
    /// here instead and COMMIT flushes the lot as ONE atomic
    /// `WAL_V4_TYPE_TX_COMMIT_SQL` record (ROLLBACK just drops it).
    /// Always `None` for in-memory databases.
    tx_wal: Option<TxWalBuffer>,
}
1191
/// Staging area for the SQL of one open explicit transaction on a
/// persistent database. See [`Database::tx_wal`] for when the
/// buffer exists and how COMMIT / ROLLBACK consume it.
#[derive(Debug, Default)]
struct TxWalBuffer {
    /// Bind-final SQL of every non-read-only statement the engine
    /// accepted inside the open transaction, in execution order.
    statements: Vec<String>,
    /// `(savepoint_name, statements.len() at SAVEPOINT time)` —
    /// `ROLLBACK TO SAVEPOINT` truncates `statements` back to the
    /// recorded mark so the WAL record matches what the engine
    /// keeps. PG name-reuse semantics (latest wins).
    savepoints: Vec<(String, usize)>,
}
1204
/// Statement-level transaction-control classification for the WAL
/// buffer. Runs AFTER the engine accepted the statement, so the
/// engine stays the single validator — this only mirrors state.
enum TxControl {
    /// `BEGIN` / `START …` — a transaction opened.
    Begin,
    /// `COMMIT` / `END` — the transaction committed.
    Commit,
    /// `ROLLBACK` without a `TO` clause — the whole transaction is
    /// abandoned.
    Rollback,
    /// `ROLLBACK [WORK | TRANSACTION] TO [SAVEPOINT] <name>`; carries
    /// the lower-cased savepoint name.
    RollbackToSavepoint(String),
    /// `SAVEPOINT <name>`; carries the lower-cased savepoint name.
    Savepoint(String),
    /// `RELEASE …` — the released name is not captured.
    ReleaseSavepoint,
}

/// Classify `sql` as a transaction-control statement, or `None` when
/// it is anything else.
///
/// Words are split on whitespace and `;` and compared
/// case-insensitively; savepoint names are returned lower-cased, so
/// `SAVEPOINT` and `ROLLBACK TO` agree on spelling. `SAVEPOINT` or
/// `ROLLBACK TO` without a name yields `None`.
///
/// The optional `WORK` / `TRANSACTION` noise word after `ROLLBACK`
/// is skipped before looking for `TO`, so
/// `ROLLBACK TRANSACTION TO SAVEPOINT x` is a savepoint rollback and
/// not mistaken for a full rollback — a full rollback would discard
/// the buffered statements of a transaction that is still open.
fn tx_control_kind(sql: &str) -> Option<TxControl> {
    let mut words = sql
        .split(|c: char| c.is_whitespace() || c == ';')
        .filter(|w| !w.is_empty())
        .map(str::to_ascii_lowercase);
    let head = words.next()?;
    match head.as_str() {
        "begin" | "start" => Some(TxControl::Begin),
        "commit" | "end" => Some(TxControl::Commit),
        "savepoint" => words.next().map(TxControl::Savepoint),
        "release" => Some(TxControl::ReleaseSavepoint),
        "rollback" => {
            let mut next = words.next();
            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] <name>]
            if matches!(next.as_deref(), Some("work" | "transaction")) {
                next = words.next();
            }
            match next.as_deref() {
                Some("to") => {
                    let target = words.next()?;
                    let name = if target == "savepoint" {
                        words.next()?
                    } else {
                        target
                    };
                    Some(TxControl::RollbackToSavepoint(name))
                }
                _ => Some(TxControl::Rollback),
            }
        }
        _ => None,
    }
}
1244
/// Everything a persistent [`Database`] keeps about its on-disk
/// footprint: snapshot path, WAL chunk directory, background-thread
/// handles, cold-segment bookkeeping and the cross-process lock.
/// Built once by `Database::open_path`.
#[derive(Debug)]
#[allow(dead_code)] // `wal_dir`/`current_chunk_path` are read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Path of the catalog snapshot file: read at open to restore
    /// the engine, rewritten by `checkpoint()`.
    db_path: PathBuf,
    /// v7.19 — WAL chunk directory at `<db_path>.wal/`.
    /// Replaces the v7.18 single-file `<db_path>.wal` layout.
    /// Each chunk file inside is named
    /// `<unix_us>_<leading_lsn>.wal` (zero-padded to 16 digits
    /// so default-lex sort = LSN order).
    wal_dir: PathBuf,
    /// Path of the currently-open chunk file inside `wal_dir`.
    /// Rotated at checkpoint and whenever the chunk crosses
    /// `checkpoint_threshold_bytes`.
    current_chunk_path: PathBuf,
    /// v7.19 P3 — retention sweeper handle. `Some` when
    /// `SPG_PITR_RETENTION_HOURS > 0` at open_path time; `None`
    /// when retention is disabled (the default; v7.18 behaviour
    /// preserved). The thread polls `wal_dir` every
    /// `SPG_PITR_RETENTION_CHECK_SEC` seconds, archives via
    /// `SPG_PITR_ARCHIVE_CMD` if set, then deletes chunks older
    /// than the retention window. Signalled to exit via
    /// `retention_shutdown` on Drop.
    retention_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-pitr-retention` thread; `Some`
    /// exactly when `retention_shutdown` is.
    retention_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 — background WAL flusher for
    /// `SPG_SYNCHRONOUS_COMMIT=off`. `None` in the default
    /// synchronous mode. Flushes the pending batch every
    /// `SPG_WAL_WRITER_DELAY_MS`; signalled + joined on Drop
    /// before the final checkpoint so clean shutdown never
    /// loses confirmed commits.
    flusher_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-wal-flusher` thread; `Some`
    /// exactly when `flusher_shutdown` is.
    flusher_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 P2 — group-commit WAL. Shared with WalTickets
    /// returned by the buffered write path so `wait()` can run
    /// after the engine write lock is released.
    wal: Arc<WalGroup>,
    /// WAL size, in bytes, past which an auto-checkpoint is due.
    /// Seeded from `default_checkpoint_threshold_bytes()`; the
    /// `set_checkpoint_threshold_bytes` setter keeps it at 1 or
    /// more.
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — `<db_path>.spg/segments/` directory. Cold-tier
    /// segments produced by `freeze_oldest_to_cold` / compaction
    /// are persisted here as `seg_<id>.spg` files; the manifest
    /// at `<db_path>.spg/manifest.v10` records every active
    /// segment + its CRC32 so the next boot can verify + reload.
    cold_segments_dir: PathBuf,
    /// Segment id → file path for every cold segment this handle
    /// knows about: filled from the manifest at open and by
    /// `freeze_oldest_to_cold`, and read by `checkpoint()` when it
    /// rewrites the manifest.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
    /// via `fs::create_dir` on `<db_path>.lock` at open_path
    /// entry; released on Drop by `fs::remove_dir`. The mkdir is
    /// atomic on every supported platform. A second process opening
    /// the same path while the first is still alive hits the
    /// create_dir failure and returns
    /// `EngineError::Unsupported("database is locked by another
    /// process: …")`. Stale locks (process crashed mid-session)
    /// must be cleared via `Database::force_unlock(path)` —
    /// SPG can't safely fingerprint who owned a stale directory
    /// without a libc dep, which would violate spg-embedded's
    /// zero-deps charter.
    lock_path: PathBuf,
}
1303
1304impl Database {
/// Open a fresh in-memory database. No WAL, no catalog
/// snapshot on disk — perfect for tests + short-lived
/// CLI tools.
///
/// The engine is created empty, given the wall clock
/// (`wall_clock_micros`) and passed through
/// `engine_with_query_byte_budget`. `persistence` and `tx_wal` are
/// `None` and the commit LSN starts at 0.
#[must_use]
pub fn open_in_memory() -> Self {
    Self {
        engine: engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros)),
        persistence: None,
        commit_lsn: AtomicU64::new(0),
        tx_wal: None,
    }
}
1317
1318 /// v7.1 — Open or create a persistent database backed by
1319 /// the file at `db_path`. The WAL lives at `db_path` +
1320 /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
1321 /// path:
1322 ///
1323 /// 1. If `db_path` exists, restore the catalog snapshot.
1324 /// 2. If the WAL exists, replay every record into the
1325 /// restored engine — the same recovery story
1326 /// `spg-server` uses.
1327 /// 3. Open the WAL in append+sync mode so subsequent
1328 /// `execute()` writes durably commit (one fsync per
1329 /// mutation).
1330 ///
1331 /// `Drop` writes a final catalog snapshot + truncates the
1332 /// WAL — operators that need a sync barrier at a specific
1333 /// point use `checkpoint()` explicitly.
1334 pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
1335 let db_path = db_path.as_ref().to_path_buf();
1336 // v7.19 — WAL is a directory of chunk files. Legacy
1337 // single-file path stays variable-named `wal_path` for
1338 // the backward-compat migration block below.
1339 let wal_path = {
1340 let mut p = db_path.clone();
1341 let name = p
1342 .file_name()
1343 .map(|n| {
1344 let mut s = n.to_os_string();
1345 s.push(".wal");
1346 s
1347 })
1348 .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
1349 p.set_file_name(name);
1350 p
1351 };
1352 let wal_dir = wal_path.clone();
1353 if let Some(parent) = db_path.parent()
1354 && !parent.as_os_str().is_empty()
1355 {
1356 std::fs::create_dir_all(parent).map_err(io_err)?;
1357 }
1358 // v7.17.0 Phase 6.2 — acquire cross-process exclusion
1359 // lock before touching any catalog / WAL bytes. atomic
1360 // mkdir on every supported platform; a second process
1361 // opening the same path while the first is still alive
1362 // hits the create_dir failure and gets a clear error.
1363 let lock_path = {
1364 let mut p = db_path.clone();
1365 let name = p
1366 .file_name()
1367 .map(|n| {
1368 let mut s = n.to_os_string();
1369 s.push(".lock");
1370 s
1371 })
1372 .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1373 p.set_file_name(name);
1374 p
1375 };
1376 acquire_path_lock(&lock_path)?;
1377 let mut engine = if db_path.exists() {
1378 let bytes = std::fs::read(&db_path).map_err(io_err)?;
1379 let engine = Engine::restore_envelope(&bytes).map_err(|e| {
1380 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1381 "restore from {}: {e}",
1382 db_path.display()
1383 )))
1384 })?;
1385 engine_with_query_byte_budget(engine.with_clock(wall_clock_micros))
1386 } else {
1387 engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros))
1388 };
1389 // v7.1.4 — manifest-driven cold-segment reload. The
1390 // manifest sidecar pairs the catalog snapshot CRC with a
1391 // list of `(segment_id, path, crc32)` triples; verify
1392 // before loading so a torn or stale manifest doesn't
1393 // surface phantom data.
1394 let cold_segments_dir = {
1395 let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
1396 let stem = db_path
1397 .file_stem()
1398 .unwrap_or_else(|| std::ffi::OsStr::new("db"))
1399 .to_string_lossy()
1400 .into_owned();
1401 parent.join(format!("{stem}.spg")).join("segments")
1402 };
1403 let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
1404 let manifest_pth = spg_manifest_path(&db_path);
1405 if manifest_pth.exists() && db_path.exists() {
1406 let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
1407 if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
1408 let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
1409 let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
1410 if snap_crc == m.catalog_crc32 {
1411 for entry in &m.cold_segments {
1412 if let Ok(seg_bytes) = std::fs::read(&entry.path) {
1413 let computed = spg_crypto::crc32::crc32(&seg_bytes);
1414 if computed != entry.crc32 {
1415 eprintln!(
1416 "spg-embedded: manifest skip segment {}: CRC mismatch",
1417 entry.segment_id
1418 );
1419 continue;
1420 }
1421 if engine.catalog().cold_segment(entry.segment_id).is_some() {
1422 // Already loaded via Catalog::clone path (shouldn't happen
1423 // since Engine::new + restore_envelope don't populate cold).
1424 continue;
1425 }
1426 let mut new_cat = engine.catalog().clone();
1427 if let Err(e) =
1428 new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
1429 {
1430 eprintln!(
1431 "spg-embedded: manifest load segment {} failed: {e}",
1432 entry.segment_id
1433 );
1434 continue;
1435 }
1436 engine.replace_catalog(new_cat);
1437 cold_segment_paths.insert(entry.segment_id, entry.path.clone());
1438 } else {
1439 eprintln!(
1440 "spg-embedded: manifest skip segment {}: file unreadable",
1441 entry.segment_id
1442 );
1443 }
1444 }
1445 }
1446 }
1447 }
1448 // v7.19 — chunked WAL on-disk layout.
1449 //
1450 // Three cases handled here:
1451 //
1452 // 1. wal_dir exists as a DIRECTORY → scan its
1453 // `<unix_us>_<leading_lsn>.wal` chunks (sorted
1454 // lexicographically = chunk-creation order), replay
1455 // them in sequence, advance the LSN watermark to the
1456 // max commit_lsn seen.
1457 //
1458 // 2. wal_path exists as a FILE → legacy v7.18 layout.
1459 // Migrate it: create `wal_dir/`, move the single file
1460 // inside as `0000000000000000_0000000000000000.wal`,
1461 // then fall through to case 1's replay loop.
1462 //
1463 // 3. Neither exists → fresh database; create wal_dir.
1464 let mut initial_lsn: u64 = 0;
1465 if wal_path.is_file() {
1466 // Case 2: legacy single-file WAL migration.
1467 let legacy_bytes = std::fs::read(&wal_path).map_err(io_err)?;
1468 std::fs::remove_file(&wal_path).map_err(io_err)?;
1469 std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1470 if !legacy_bytes.is_empty() {
1471 let migrated = wal_dir.join(legacy_chunk_filename());
1472 std::fs::write(&migrated, &legacy_bytes).map_err(io_err)?;
1473 }
1474 } else if !wal_dir.exists() {
1475 // Case 3: fresh database.
1476 std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1477 }
1478 // Cases 1 + 2 share replay logic now that wal_dir is
1479 // guaranteed to exist (and may be empty for case 3).
1480 //
1481 // Two-pass replay so we don't double-apply records the
1482 // snapshot already reflects:
1483 //
1484 // 1. Find the highest commit_lsn carried by a
1485 // checkpoint_marker across all chunks. That LSN is the
1486 // snapshot's high-water mark — anything ≤ it is
1487 // already in `<db_path>` and replaying it would
1488 // DuplicateTable / double-insert.
1489 // 2. Replay only records strictly above that LSN.
1490 //
1491 // Case 2 migration (legacy single-file WAL) lands here
1492 // too: the migrated chunk has no marker so the LSN floor
1493 // is 0 and every record applies — exactly the v7.18
1494 // behaviour the migration is supposed to preserve.
1495 let chunk_paths = sorted_wal_chunks(&wal_dir).map_err(io_err)?;
1496 let mut snapshot_lsn: u64 = 0;
1497 for chunk in &chunk_paths {
1498 let bytes = std::fs::read(chunk).map_err(io_err)?;
1499 if let Ok(records) = parse_wal_records(&bytes) {
1500 for r in &records {
1501 if r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER {
1502 if let Some(l) = r.commit_lsn {
1503 if l > snapshot_lsn {
1504 snapshot_lsn = l;
1505 }
1506 }
1507 }
1508 }
1509 }
1510 }
1511 let mut quarantined: Vec<QuarantinedStmt> = Vec::new();
1512 for chunk in &chunk_paths {
1513 let bytes = std::fs::read(chunk).map_err(io_err)?;
1514 if bytes.is_empty() {
1515 continue;
1516 }
1517 replay_wal_filtered(&bytes, &mut engine, snapshot_lsn, &mut quarantined)
1518 .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
1519 if let Ok(records) = parse_wal_records(&bytes) {
1520 if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
1521 if max > initial_lsn {
1522 initial_lsn = max;
1523 }
1524 }
1525 }
1526 }
1527 // v7.30.1 (mailrs round-24 ask 2) — replay rejects no longer
1528 // brick the open. Persist the rejected statements beside the
1529 // WAL chunks for forensics and say so loudly; the boot
1530 // continues with every other record applied.
1531 if !quarantined.is_empty() {
1532 let mut body = String::new();
1533 for q in &quarantined {
1534 body.push_str(&format_quarantine_line(q));
1535 }
1536 let qpath = wal_dir.join(format!(
1537 "quarantine-{:016x}.log",
1538 wall_clock_micros().max(0) as u64
1539 ));
1540 match std::fs::write(&qpath, &body) {
1541 Ok(()) => eprintln!(
1542 "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
1543 forensics at {}",
1544 quarantined.len(),
1545 qpath.display()
1546 ),
1547 Err(e) => eprintln!(
1548 "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
1549 quarantine file write FAILED ({e}), entries follow:\n{body}",
1550 quarantined.len()
1551 ),
1552 }
1553 }
1554 // Open the "current" chunk — either the last existing
1555 // chunk file (so subsequent appends extend it until the
1556 // size threshold rotates) or a fresh first chunk.
1557 let now_us = wall_clock_micros();
1558 let current_chunk_path = if let Some(last) = chunk_paths.last() {
1559 last.clone()
1560 } else {
1561 wal_dir.join(chunk_filename(now_us, initial_lsn + 1))
1562 };
1563 let wal_file = OpenOptions::new()
1564 .create(true)
1565 .append(true)
1566 .read(true)
1567 .open(¤t_chunk_path)
1568 .map_err(io_err)?;
1569 let wal_len = wal_file.metadata().map_err(io_err)?.len();
1570 let wal = Arc::new(WalGroup::new(wal_file, wal_len));
1571 // v7.19 P3 — spawn retention sweep thread when the
1572 // operator opted in via SPG_PITR_RETENTION_HOURS > 0.
1573 // Otherwise stay on the v7.18 behaviour (chunks accumulate
1574 // until something else — backup-pitr archival, manual
1575 // cleanup — moves them).
1576 let retention_hours = pitr_retention_hours();
1577 let (retention_shutdown, retention_thread) = if retention_hours > 0 {
1578 let shutdown = Arc::new(AtomicBool::new(false));
1579 let shutdown_clone = Arc::clone(&shutdown);
1580 let wal_dir_clone = wal_dir.clone();
1581 let check_interval = std::time::Duration::from_secs(pitr_retention_check_sec());
1582 let archive_cmd = pitr_archive_cmd();
1583 let handle = std::thread::Builder::new()
1584 .name("spg-pitr-retention".into())
1585 .spawn(move || {
1586 retention_sweep_loop(
1587 wal_dir_clone,
1588 retention_hours,
1589 check_interval,
1590 archive_cmd,
1591 shutdown_clone,
1592 );
1593 })
1594 .map_err(io_err)?;
1595 (Some(shutdown), Some(handle))
1596 } else {
1597 (None, None)
1598 };
1599 // v7.20 — background flusher for SPG_SYNCHRONOUS_COMMIT=off.
1600 let (flusher_shutdown, flusher_thread) = if synchronous_commit_on() {
1601 (None, None)
1602 } else {
1603 let shutdown = Arc::new(AtomicBool::new(false));
1604 let shutdown_clone = Arc::clone(&shutdown);
1605 let group = Arc::clone(&wal);
1606 let interval = std::time::Duration::from_millis(wal_writer_delay_ms());
1607 let handle = std::thread::Builder::new()
1608 .name("spg-wal-flusher".into())
1609 .spawn(move || {
1610 while !shutdown_clone.load(Ordering::SeqCst) {
1611 std::thread::sleep(interval);
1612 if let Err(e) = group.flush_now() {
1613 eprintln!("spg-embedded: background WAL flush failed: {e:?}");
1614 }
1615 }
1616 // Final drain on shutdown signal.
1617 let _ = group.flush_now();
1618 })
1619 .map_err(io_err)?;
1620 (Some(shutdown), Some(handle))
1621 };
1622 Ok(Self {
1623 engine,
1624 commit_lsn: AtomicU64::new(initial_lsn),
1625 tx_wal: None,
1626 persistence: Some(PersistenceCtx {
1627 db_path,
1628 wal_dir,
1629 current_chunk_path,
1630 wal,
1631 checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
1632 cold_segments_dir,
1633 cold_segment_paths,
1634 lock_path,
1635 retention_shutdown,
1636 retention_thread,
1637 flusher_shutdown,
1638 flusher_thread,
1639 }),
1640 })
1641 }
1642
1643 /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
1644 /// hot tier into a brand-new cold-tier segment + persist
1645 /// it to disk. Same semantics as `spg-server`'s freezer
1646 /// thread; embedded just runs the freeze synchronously on
1647 /// the caller's thread. Persistence + manifest update
1648 /// happen as part of the next `checkpoint()` (or on Drop).
1649 pub fn freeze_oldest_to_cold(
1650 &mut self,
1651 table_name: &str,
1652 index_name: &str,
1653 max_rows: usize,
1654 ) -> Result<spg_storage::FreezeReport, EngineError> {
1655 let report = self
1656 .engine
1657 .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
1658 if let Some(p) = &mut self.persistence {
1659 std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
1660 let final_path = p
1661 .cold_segments_dir
1662 .join(format!("seg_{}.spg", report.segment_id));
1663 let tmp_path = p
1664 .cold_segments_dir
1665 .join(format!("seg_{}.spg.tmp", report.segment_id));
1666 std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
1667 std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
1668 p.cold_segment_paths.insert(report.segment_id, final_path);
1669 }
1670 Ok(report)
1671 }
1672
1673 /// v7.1 — override the auto-checkpoint WAL-size ceiling for
1674 /// this `Database` instance. Default is
1675 /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
1676 /// setter wins. No-op when the database is in-memory.
1677 pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
1678 if let Some(p) = &mut self.persistence {
1679 p.checkpoint_threshold_bytes = bytes.max(1);
1680 }
1681 }
1682
1683 /// v7.1 — flush a fresh catalog snapshot to `db_path` and
1684 /// truncate the WAL. Idempotent; cheap when nothing has
1685 /// happened since the last checkpoint. No-op when the
1686 /// database is in-memory (no `db_path` configured).
1687 ///
1688 /// Called automatically when:
1689 /// - the WAL grows past
1690 /// `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
1691 /// end of an `execute()`, and
1692 /// - `Drop` runs (best-effort; checkpoint failure on drop is
1693 /// logged to stderr).
1694 pub fn checkpoint(&mut self) -> Result<(), EngineError> {
1695 let snapshot = self.engine.snapshot();
1696 let Some(p) = &mut self.persistence else {
1697 return Ok(());
1698 };
1699 // Snapshot first (atomic via tmp+rename), then WAL
1700 // truncate. Same order as `spg-server`'s CHECKPOINT —
1701 // a crash between the two leaves the WAL holding
1702 // already-snapshotted ops, which replay cleanly on the
1703 // next boot (idempotent for SPG's standard DDL/DML
1704 // mutations).
1705 let tmp = {
1706 let mut t = p.db_path.clone();
1707 let mut name = t
1708 .file_name()
1709 .map(std::ffi::OsStr::to_os_string)
1710 .unwrap_or_default();
1711 name.push(".tmp");
1712 t.set_file_name(name);
1713 t
1714 };
1715 std::fs::write(&tmp, &snapshot).map_err(io_err)?;
1716 std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
1717 // v7.1.4 — refresh the manifest so the next boot can
1718 // reload cold segments alongside the snapshot. Bytes
1719 // come from the freshly-written snapshot file (= the
1720 // canonical CRC source).
1721 if !p.cold_segment_paths.is_empty() {
1722 let snap_crc = spg_crypto::crc32::crc32(&snapshot);
1723 let entries: Vec<ColdSegmentEntry> = p
1724 .cold_segment_paths
1725 .iter()
1726 .filter_map(|(&segment_id, path)| {
1727 let bytes = std::fs::read(path).ok()?;
1728 Some(ColdSegmentEntry {
1729 segment_id,
1730 path: path.clone(),
1731 crc32: spg_crypto::crc32::crc32(&bytes),
1732 })
1733 })
1734 .collect();
1735 let manifest = CatalogManifest {
1736 catalog_crc32: snap_crc,
1737 cold_segments: entries,
1738 wal_baseline_offset: 0,
1739 };
1740 let m_bytes = manifest.serialize();
1741 let m_path = spg_manifest_path(&p.db_path);
1742 if let Some(dir) = m_path.parent() {
1743 std::fs::create_dir_all(dir).map_err(io_err)?;
1744 }
1745 let m_tmp = {
1746 let mut t = m_path.clone();
1747 let mut name = t
1748 .file_name()
1749 .map(std::ffi::OsStr::to_os_string)
1750 .unwrap_or_default();
1751 name.push(".tmp");
1752 t.set_file_name(name);
1753 t
1754 };
1755 std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
1756 std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
1757 }
1758 // v7.19 — append a checkpoint marker to the current chunk
1759 // (anchors restore-to-time backups), then rotate to a
1760 // fresh chunk file. Old chunks stay on disk and become
1761 // input to the retention thread (P3) + spgctl backup-pitr
1762 // (P6). The single-file `set_len(0)` truncate the v7.18
1763 // path used is gone — that path silently discarded WAL
1764 // history between checkpoint and the operator's next cron
1765 // run, which is exactly what PITR was meant to fix.
1766 let marker_lsn = self.commit_lsn.load(Ordering::SeqCst);
1767 let marker_ts = wall_clock_micros();
1768 let marker = encode_v4_checkpoint_marker(marker_lsn, marker_ts, &p.db_path);
1769 // v7.20 P2 — checkpoint holds &mut self (engine
1770 // exclusive), so there are no concurrent enqueues: drain
1771 // the pending batch, append the marker, flush, then
1772 // rotate the chunk handle inside the group.
1773 p.wal.enqueue(&marker);
1774 p.wal.flush_now()?;
1775 let new_chunk_path = p.wal_dir.join(chunk_filename(marker_ts, marker_lsn + 1));
1776 let new_handle = OpenOptions::new()
1777 .create(true)
1778 .append(true)
1779 .read(true)
1780 .open(&new_chunk_path)
1781 .map_err(io_err)?;
1782 p.current_chunk_path = new_chunk_path;
1783 p.wal.rotate_file(new_handle);
1784 Ok(())
1785 }
1786
1787 /// Restore a database from a previously-captured catalog
1788 /// snapshot. Pairs with `Database::snapshot()` for
1789 /// round-tripping in-memory state without going through
1790 /// the `spg-server` WAL.
1791 pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
1792 let engine = Engine::restore_envelope(snapshot).map_err(|e| {
1793 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
1794 })?;
1795 Ok(Self {
1796 engine,
1797 persistence: None,
1798 commit_lsn: AtomicU64::new(0),
1799 tx_wal: None,
1800 })
1801 }
1802
1803 /// Take a catalog snapshot suitable for `Database::restore`.
1804 /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
1805 /// + version + payload); round-trips through every released
1806 /// SPG version per the STABILITY contract.
1807 #[must_use]
1808 pub fn snapshot(&self) -> Vec<u8> {
1809 self.engine.snapshot()
1810 }
1811
1812 /// Execute a SQL statement and return the engine's
1813 /// `QueryResult` verbatim. Pass-through for callers that
1814 /// want to keep PG-flavoured column/row metadata.
1815 ///
1816 /// v7.1 — when the database was opened via `open_path`,
1817 /// successful mutations are appended to the WAL + fsynced
1818 /// before the call returns. A subsequent process crash will
1819 /// recover state up to the last successful return from
1820 /// `execute()`. Read-only statements (SELECT / SHOW /
1821 /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
1822 /// etc.) skip the WAL entirely.
1823 pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
1824 // v7.20 P2 — single-caller convenience over the buffered
1825 // path: enqueue + immediately wait. Batch size is 1 here,
1826 // so the durability behaviour (one fsync before Ok) is
1827 // identical to v7.19. Concurrent callers go through
1828 // `execute_buffered` (AsyncDatabase does) and share the
1829 // leader's fsync.
1830 let (result, ticket) = self.execute_buffered(sql)?;
1831 if let Some(t) = ticket {
1832 t.wait()?;
1833 }
1834 Ok(result)
1835 }
1836
1837 /// v7.20 P2 — group-commit write entry. Runs the engine
1838 /// mutation + encodes/enqueues the WAL record, then RETURNS
1839 /// WITHOUT waiting for the fsync. The caller must call
1840 /// [`WalTicket::wait`] before treating the write as durable
1841 /// — crucially, the caller can (and should) drop whatever
1842 /// lock guards this `Database` first, so the next writer's
1843 /// mutation overlaps this batch's fsync.
1844 ///
1845 /// `None` ticket = nothing hit the WAL (read-only statement,
1846 /// no-op DDL, or in-memory database) — the result is final
1847 /// as returned.
1848 ///
1849 /// # Errors
1850 /// Engine errors propagate unchanged. Auto-checkpoint (when
1851 /// the active chunk crosses the threshold) runs inline and
1852 /// may surface IO errors.
1853 pub fn execute_buffered(
1854 &mut self,
1855 sql: &str,
1856 ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
1857 let result = self.engine.execute(sql)?;
1858 let modified = matches!(
1859 &result,
1860 QueryResult::CommandOk {
1861 modified_catalog: true,
1862 ..
1863 }
1864 );
1865 let ticket = self.wal_after_ok(sql, modified)?;
1866 Ok((result, ticket))
1867 }
1868
    /// v7.21 (round-12 polish) — post-engine WAL bookkeeping shared
    /// by the simple ([`Self::execute_buffered`]) and prepared
    /// ([`Self::execute_prepared_buffered`]) write paths. `canonical`
    /// is the replay text (bind-final for prepared statements);
    /// `modified_catalog` comes from the engine result. Three routes:
    ///
    /// - transaction control → maintain [`Self::tx_wal`]: BEGIN opens
    ///   the buffer, COMMIT flushes it as ONE atomic
    ///   `WAL_V4_TYPE_TX_COMMIT_SQL` record, ROLLBACK drops it,
    ///   SAVEPOINT / ROLLBACK TO mark / truncate it. The engine has
    ///   already accepted the statement, so this only mirrors state.
    /// - inside an open transaction → buffer the statement (shadow-
    ///   catalog mutations report `modified_catalog: false`, so the
    ///   auto-commit arm below can't see them).
    /// - auto-commit mutation → classic per-statement v4 record.
    ///
    /// Returns `Ok(None)` when nothing was enqueued (in-memory
    /// database, or a statement that produced no WAL record), and
    /// `Ok(Some(ticket))` when a record was enqueued; the ticket
    /// pairs the shared WAL group with the record's sequence number.
    ///
    /// v7.18 PITR — v4 records carry commit LSN + wall-clock micros.
    /// The crash window remains one BATCH: replay re-applies
    /// idempotently exactly as before, and a torn batch tail drops
    /// cleanly (same torn-write handling).
    ///
    /// # Errors
    /// Only the inline auto-checkpoint can fail here; its error is
    /// returned and the ticket for the already-enqueued record is
    /// not handed out.
    fn wal_after_ok(
        &mut self,
        canonical: &str,
        modified_catalog: bool,
    ) -> Result<Option<WalTicket>, EngineError> {
        // In-memory database: no WAL, and no transaction mirroring
        // either (the tx buffer only matters for WAL output).
        if self.persistence.is_none() {
            return Ok(None);
        }
        // At most one record is produced per call; it is enqueued
        // after the match below.
        let mut record = None;
        match tx_control_kind(canonical) {
            Some(TxControl::Begin) => {
                // Start (or restart) buffering with an empty buffer.
                self.tx_wal = Some(TxWalBuffer::default());
            }
            Some(TxControl::Commit) => {
                // `take()` closes the buffer whether or not it holds
                // anything; an empty transaction writes no record.
                if let Some(buf) = self.tx_wal.take()
                    && !buf.statements.is_empty()
                {
                    let script = buf.statements.join(";\n");
                    // LSNs are 1-based: the stored counter is the
                    // last LSN handed out.
                    let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
                    record = Some(encode_v4_tx_commit(&script, lsn, wall_clock_micros()));
                }
            }
            Some(TxControl::Rollback) => {
                // Discard every buffered statement; nothing is logged.
                self.tx_wal = None;
            }
            Some(TxControl::Savepoint(name)) => {
                if let Some(buf) = &mut self.tx_wal {
                    // PG name-reuse semantics: latest mark wins.
                    buf.savepoints.retain(|(n, _)| n != &name);
                    // The mark is the number of statements buffered
                    // so far, i.e. the length to truncate back to.
                    let mark = buf.statements.len();
                    buf.savepoints.push((name, mark));
                }
            }
            Some(TxControl::RollbackToSavepoint(name)) => {
                if let Some(buf) = &mut self.tx_wal
                    && let Some(pos) = buf.savepoints.iter().position(|(n, _)| n == &name)
                {
                    let mark = buf.savepoints[pos].1;
                    buf.statements.truncate(mark);
                    // Later savepoints die with the rollback; the
                    // target itself survives (PG keeps it
                    // re-rollbackable).
                    buf.savepoints.truncate(pos + 1);
                }
            }
            Some(TxControl::ReleaseSavepoint) => {
                // RELEASE folds the savepoint into the enclosing tx —
                // buffered statements stay. The mark also stays:
                // marks are only consulted by ROLLBACK TO, which the
                // engine validates first, so a dangling mark is
                // unreachable.
            }
            None => {
                if let Some(buf) = &mut self.tx_wal {
                    // Inside a transaction: buffer every statement
                    // that is not read-only, regardless of
                    // `modified_catalog`.
                    if !sql_is_read_only(canonical) {
                        buf.statements.push(canonical.to_string());
                    }
                } else if modified_catalog && !sql_is_read_only(canonical) {
                    // Auto-commit mutation: one record per statement.
                    let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
                    record = Some(encode_v4_auto_commit(canonical, lsn, wall_clock_micros()));
                }
            }
        }
        let mut ticket = None;
        if let Some(record) = record {
            let p = self.persistence.as_mut().expect("checked above");
            let seq = p.wal.enqueue(&record);
            ticket = Some(WalTicket {
                group: Arc::clone(&p.wal),
                seq,
            });
            // Auto-checkpoint once the WAL's written length reaches
            // the configured threshold.
            if p.wal.written_len() >= p.checkpoint_threshold_bytes {
                self.checkpoint()?;
            }
        }
        Ok(ticket)
    }
1966
1967 /// v7.3.0 — typed-row variant of [`Database::query`]. Each
1968 /// row decodes into a `T: FromSpgRow` so callers don't
1969 /// pattern-match on `Value` themselves. Use [`spg_row!`] to
1970 /// generate the impl, or write it by hand.
1971 pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
1972 let rows = self.query(sql)?;
1973 rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
1974 }
1975
1976 /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
1977 /// strips the column-schema metadata for read-side
1978 /// ergonomics. Errors on non-Rows results (DML / DDL
1979 /// statements should go through `execute` instead).
1980 pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
1981 match self.engine.execute(sql)? {
1982 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
1983 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1984 "query() expects a SELECT — use execute() for DML/DDL".into(),
1985 )),
1986 // v7.5.0 — QueryResult is #[non_exhaustive]; any future
1987 // variant is not a SELECT row stream, treat as Unsupported.
1988 _ => Err(EngineError::Unsupported(
1989 "query() expects a SELECT — use execute() for DML/DDL".into(),
1990 )),
1991 }
1992 }
1993
1994 /// v7.16.0 — column-aware variant of [`Self::query`].
1995 /// Returns the column schema vec alongside the rows so
1996 /// adapters (the spg-sqlx Row impl most notably) can drive
1997 /// name + type-based column lookups. Errors on non-Rows
1998 /// results identically to `query`.
1999 pub fn query_with_columns(
2000 &mut self,
2001 sql: &str,
2002 ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2003 match self.engine.execute(sql)? {
2004 QueryResult::Rows { columns, rows } => {
2005 Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2006 }
2007 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2008 "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2009 )),
2010 _ => Err(EngineError::Unsupported(
2011 "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2012 )),
2013 }
2014 }
2015
2016 /// v7.16.0 — column-aware variant of
2017 /// [`Self::query_prepared`]. Same shape as
2018 /// `query_with_columns` but driven from a prepared
2019 /// statement + bound params.
2020 pub fn query_prepared_with_columns(
2021 &mut self,
2022 stmt: &Statement,
2023 params: &[Value],
2024 ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2025 match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2026 QueryResult::Rows { columns, rows } => {
2027 Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2028 }
2029 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2030 "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2031 )),
2032 _ => Err(EngineError::Unsupported(
2033 "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2034 )),
2035 }
2036 }
2037
2038 /// Borrow the underlying engine. Escape hatch for callers
2039 /// that need access to `spg-engine` APIs not yet surfaced
2040 /// here (transactions, EXPLAIN ANALYZE, etc.).
2041 #[must_use]
2042 pub const fn engine(&self) -> &Engine {
2043 &self.engine
2044 }
2045
2046 /// Mutable borrow of the underlying engine. Same intent as
2047 /// `engine()` but for write-side APIs (e.g. inserting
2048 /// directly through `Catalog::insert` for high-throughput
2049 /// bulk loads that bypass SQL parsing).
2050 pub const fn engine_mut(&mut self) -> &mut Engine {
2051 &mut self.engine
2052 }
2053
2054 /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
2055 /// `execute_prepared` / `query_prepared` calls can re-bind
2056 /// parameters without re-parsing. The returned [`Statement`]
2057 /// is a thin handle around the AST + cached source SQL; it's
2058 /// `Clone` so the same plan can drive many bind calls
2059 /// concurrently (each call clones the AST and runs
2060 /// placeholder substitution on the clone — the cached
2061 /// plan stays intact).
2062 ///
2063 /// Plan caching follows the engine's existing version-aware
2064 /// rule: a prepared `Statement` whose statistics version
2065 /// has rolled (ANALYZE ran between prepare and execute)
2066 /// will silently re-prepare under the hood. Callers don't
2067 /// need to detect this.
2068 ///
2069 /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
2070 /// `bind`-time `Value`s are passed as a slice; arity
2071 /// mismatches surface as `EvalError::PlaceholderOutOfRange`
2072 /// at `execute_prepared` time, not here.
2073 ///
2074 /// # Errors
2075 /// Surfaces `EngineError` (parse error / plan rewrite
2076 /// failure) from the underlying `Engine::prepare`.
2077 pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
2078 // Use the cached path so repeated prepares of the same
2079 // SQL are O(1). The engine's plan cache stays shared
2080 // across all callers of this Database — a single
2081 // `PgPool`-shaped consumer (or, later, the spg-sqlx
2082 // adapter) prepares once and reaps the win on every bind.
2083 let stmt = self
2084 .engine
2085 .prepare_cached(sql)
2086 .map_err(EngineError::Parse)?;
2087 Ok(Statement {
2088 stmt,
2089 sql: sql.to_string(),
2090 })
2091 }
2092
2093 /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
2094 /// executing. Returns `(parameter_oid_count, output_columns)`
2095 /// where `output_columns` is empty for non-SELECT statements
2096 /// or for SELECT shapes the describe planner can't resolve
2097 /// (JOIN / subquery / unknown table). Wraps
2098 /// `Engine::describe_prepared` so the spg-sqlx bridge can
2099 /// surface PG-shape Describe replies for
2100 /// `sqlx::query!()` compile-time validation.
2101 ///
2102 /// # Errors
2103 /// Propagates parse errors from the underlying prepare path.
2104 pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2105 let stmt = self
2106 .engine
2107 .prepare_cached(sql)
2108 .map_err(EngineError::Parse)?;
2109 Ok(self.engine.describe_prepared(&stmt))
2110 }
2111
2112 /// v7.16.0 — execute a prepared statement with bound
2113 /// parameters. Mirrors `Engine::execute_prepared`: clones
2114 /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
2115 ///
2116 /// Persistence (WAL fsync + auto-checkpoint) follows the
2117 /// same rules as `execute(sql)`: mutating statements get a
2118 /// WAL record AFTER the in-memory exec succeeds. The WAL
2119 /// record carries the substituted, bind-final SQL, so
2120 /// replay reconstructs the same row state without needing
2121 /// the original prepared `Statement` to still be alive.
2122 ///
2123 /// # Errors
2124 /// Propagates engine errors. Param arity mismatch surfaces
2125 /// as `EvalError::PlaceholderOutOfRange`.
2126 pub fn execute_prepared(
2127 &mut self,
2128 stmt: &Statement,
2129 params: &[Value],
2130 ) -> Result<QueryResult, EngineError> {
2131 let (result, ticket) = self.execute_prepared_buffered(stmt, params)?;
2132 if let Some(t) = ticket {
2133 t.wait()?;
2134 }
2135 Ok(result)
2136 }
2137
2138 /// v7.20 P2 — group-commit variant of
2139 /// [`Database::execute_prepared`]. Same contract as
2140 /// [`Database::execute_buffered`]: mutation + enqueue happen
2141 /// here; the caller waits on the ticket AFTER releasing
2142 /// whatever lock guards this `Database`.
2143 ///
2144 /// # Errors
2145 /// Engine errors propagate unchanged; inline auto-checkpoint
2146 /// may surface IO errors.
2147 pub fn execute_prepared_buffered(
2148 &mut self,
2149 stmt: &Statement,
2150 params: &[Value],
2151 ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2152 let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
2153 let modified = matches!(
2154 &result,
2155 QueryResult::CommandOk {
2156 modified_catalog: true,
2157 ..
2158 }
2159 );
2160 // WAL persistence on the bind-final SQL. Build the
2161 // canonical Display form by re-printing the
2162 // placeholder-substituted statement (cheap — the AST
2163 // is already in hand from execute_prepared's internal
2164 // clone) so replay's path is identical to the
2165 // simple-query path. v7.21: also when a transaction is
2166 // open — in-tx mutations report `modified_catalog: false`
2167 // but must reach the tx WAL buffer (see `wal_after_ok`).
2168 let mut ticket = None;
2169 if self.persistence.is_some()
2170 && (modified
2171 || (self.tx_wal.is_some() && !sql_is_read_only(&stmt.sql))
2172 || tx_control_kind(&stmt.sql).is_some())
2173 {
2174 let mut wal_stmt = stmt.stmt.clone();
2175 crate::wal_render_with_params(&mut wal_stmt, params);
2176 let canonical = format!("{wal_stmt}");
2177 ticket = self.wal_after_ok(&canonical, modified)?;
2178 }
2179 Ok((result, ticket))
2180 }
2181
2182 /// v7.16.0 — run a prepared SELECT with bound params and
2183 /// return rows as `Vec<Vec<Value>>`, matching `query()`
2184 /// shape. SELECTs are read-only so this never writes the
2185 /// WAL.
2186 ///
2187 /// # Errors
2188 /// Returns `Unsupported` if the prepared statement isn't a
2189 /// SELECT (use `execute_prepared` for DML/DDL).
2190 pub fn query_prepared(
2191 &mut self,
2192 stmt: &Statement,
2193 params: &[Value],
2194 ) -> Result<Vec<Vec<Value>>, EngineError> {
2195 match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2196 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2197 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2198 "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2199 )),
2200 _ => Err(EngineError::Unsupported(
2201 "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2202 )),
2203 }
2204 }
2205
2206 /// v7.18 — parse + plan a SQL string against a
2207 /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
2208 /// readonly fan-out path: no writer lock taken, no WAL write,
2209 /// no plan-cache mutation. Static-on-`Self` so callers can
2210 /// dispatch against a snapshot without an `&mut Database`
2211 /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
2212 /// is the load-bearing consumer.
2213 ///
2214 /// # Errors
2215 /// Propagates `EngineError::Parse` from the parser.
2216 pub fn prepare_on_snapshot(
2217 snapshot: &CatalogSnapshot,
2218 sql: &str,
2219 ) -> Result<Statement, EngineError> {
2220 let stmt =
2221 spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2222 Ok(Statement {
2223 stmt,
2224 sql: sql.to_string(),
2225 })
2226 }
2227
2228 /// v7.18 — execute a prepared `Statement` against a
2229 /// `CatalogSnapshot` with bound params. Mirror of
2230 /// [`Database::execute_prepared`] on the readonly path:
2231 /// writes / DDL hit `EngineError::WriteRequired`. No WAL
2232 /// write, no writer lock, multiple snapshots can run
2233 /// concurrently — the snapshot is immutable from prepare time.
2234 ///
2235 /// # Errors
2236 /// Surfaces `EngineError::WriteRequired` for non-readonly
2237 /// statements; propagates other engine errors.
2238 pub fn execute_prepared_on_snapshot(
2239 snapshot: &CatalogSnapshot,
2240 stmt: &Statement,
2241 params: &[Value],
2242 ) -> Result<QueryResult, EngineError> {
2243 spg_engine::Engine::execute_readonly_prepared_on_snapshot(
2244 snapshot,
2245 stmt.stmt.clone(),
2246 params,
2247 )
2248 }
2249
2250 /// v7.28 (round-22) — deadline-bounded variant of
2251 /// [`Database::execute_prepared_on_snapshot`]. Returns
2252 /// `EngineError::Cancelled` once the budget elapses; the
2253 /// sqlx driver uses this to keep readonly-INLINE execution
2254 /// from monopolising the caller's async runtime (four slow
2255 /// inbox queries saturated mailrs's whole tokio pool) and
2256 /// re-runs over the blocking pool on timeout.
2257 ///
2258 /// # Errors
2259 /// `EngineError::Cancelled` on budget expiry; engine errors
2260 /// otherwise.
2261 pub fn execute_prepared_on_snapshot_with_budget(
2262 snapshot: &CatalogSnapshot,
2263 stmt: &Statement,
2264 params: &[Value],
2265 budget_us: u64,
2266 ) -> Result<QueryResult, EngineError> {
2267 fn mono_now_us() -> u64 {
2268 use std::time::{SystemTime, UNIX_EPOCH};
2269 // Monotonic enough for a per-call relative budget: the
2270 // engine only compares (now - start) against the budget
2271 // within one call.
2272 SystemTime::now()
2273 .duration_since(UNIX_EPOCH)
2274 .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
2275 .unwrap_or(0)
2276 }
2277 let deadline = mono_now_us().saturating_add(budget_us);
2278 let token = spg_engine::CancelToken::none().with_deadline(mono_now_us, deadline);
2279 spg_engine::Engine::execute_readonly_prepared_on_snapshot_with_cancel(
2280 snapshot,
2281 stmt.stmt.clone(),
2282 params,
2283 token,
2284 )
2285 }
2286
2287 /// v7.18 — describe a SQL string against a
2288 /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
2289 /// the readonly path. Pure function on the snapshot's
2290 /// catalog; safe to call from any thread.
2291 ///
2292 /// # Errors
2293 /// Propagates `EngineError::Parse` from the parser.
2294 pub fn describe_on_snapshot(
2295 snapshot: &CatalogSnapshot,
2296 sql: &str,
2297 ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2298 let stmt =
2299 spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2300 Ok(spg_engine::Engine::describe_prepared_on_snapshot(
2301 snapshot, &stmt,
2302 ))
2303 }
2304
2305 /// v7.21 (round-12 polish) — run a multi-statement SQL script
2306 /// with PG simple-query semantics: the statements execute in
2307 /// order inside ONE implicit transaction, so a mid-script error
2308 /// rolls back the whole script (PG wraps every simple-query
2309 /// message in an implicit transaction). Three exceptions, all
2310 /// PG-faithful:
2311 ///
2312 /// - a script that carries its OWN transaction control
2313 /// (BEGIN / COMMIT / …) runs statement-by-statement — the
2314 /// script owns its boundaries;
2315 /// - a script run while the caller already has a transaction
2316 /// open joins that transaction (no nested BEGIN), and the
2317 /// caller's COMMIT / ROLLBACK decides its fate;
2318 /// - a single-statement script is plain auto-commit.
2319 ///
2320 /// Returns one `QueryResult` per executed statement. This is the
2321 /// engine behind `sqlx::raw_sql` (mailrs feeds whole
2322 /// `init-schema.sql` files through it) and `spgctl import`.
2323 ///
2324 /// # Errors
2325 /// The first failing statement's error propagates after the
2326 /// implicit ROLLBACK; nothing from the script remains applied.
2327 pub fn execute_script(&mut self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
2328 let stmts = split_statements(sql);
2329 let script_owns_tx = stmts.iter().any(|s| tx_control_kind(s).is_some());
2330 let wrap = stmts.len() > 1 && !script_owns_tx && !self.engine.in_transaction();
2331 if !wrap {
2332 let mut out = Vec::with_capacity(stmts.len());
2333 for stmt in &stmts {
2334 out.push(self.execute_dump_statement(stmt)?);
2335 }
2336 return Ok(out);
2337 }
2338 self.execute("BEGIN")?;
2339 let mut out = Vec::with_capacity(stmts.len());
2340 for stmt in &stmts {
2341 match self.execute_dump_statement(stmt) {
2342 Ok(r) => out.push(r),
2343 Err(e) => {
2344 // Best-effort rollback; surface the script error.
2345 let _ = self.execute("ROLLBACK");
2346 return Err(e);
2347 }
2348 }
2349 }
2350 self.execute("COMMIT")?;
2351 Ok(out)
2352 }
2353
2354 /// v7.22 (round-13 T2) — execute one `split_statements` chunk,
2355 /// lowering a `COPY … FROM stdin;` block (statement + its data
2356 /// lines, as one chunk) to per-row INSERTs through the shared
2357 /// `spg_engine::copy` helpers. Default-format pg_dump emits
2358 /// COPY blocks, so the zero-change import promise needs this on
2359 /// the embed path; non-COPY statements pass straight through to
2360 /// [`Self::execute`]. Public so `spgctl import` can keep its
2361 /// per-statement error indexing while sharing the lowering.
2362 ///
2363 /// # Errors
2364 /// Engine errors propagate; for COPY the failing row's INSERT
2365 /// error carries the synthesized statement context.
2366 pub fn execute_dump_statement(&mut self, stmt: &str) -> Result<QueryResult, EngineError> {
2367 // Strip pg_dump's `-- Data for Name: …;` banner (it carries
2368 // semicolons of its own) before splitting head from data.
2369 let stmt_clean = strip_leading_sql_noise(stmt);
2370 let head_is_copy = stmt_clean
2371 .get(..4)
2372 .is_some_and(|p| p.eq_ignore_ascii_case("copy"));
2373 if head_is_copy
2374 && let Some((head, data)) = stmt_clean.split_once(';')
2375 && let Some(spec) = spg_engine::copy::parse_copy_from_stdin_head(head)
2376 {
2377 let mut affected: usize = 0;
2378 for line in data.lines() {
2379 // Empty fragments only occur at the chunk boundary
2380 // (the remainder of the COPY line right after `;`);
2381 // data rows are whole non-empty lines.
2382 let line = line.strip_suffix('\r').unwrap_or(line);
2383 if line.is_empty() {
2384 continue;
2385 }
2386 let values = spg_engine::copy::decode_copy_text_row(line);
2387 let insert = spg_engine::copy::build_copy_insert(
2388 &spec.table,
2389 spec.columns.as_deref(),
2390 &values,
2391 );
2392 match self.execute(&insert)? {
2393 QueryResult::CommandOk { affected: n, .. } => affected += n,
2394 _ => affected += 1,
2395 }
2396 }
2397 return Ok(QueryResult::CommandOk {
2398 affected,
2399 modified_catalog: false,
2400 });
2401 }
2402 self.execute(stmt)
2403 }
2404
2405 /// v7.2.0 — run `body` inside an implicit `BEGIN` /
2406 /// `COMMIT` pair. The body receives `&mut Database` so it
2407 /// can `execute()` / `query()` like any other code path;
2408 /// the only difference is that every write in the body
2409 /// lands inside one transaction, and a returned `Err` from
2410 /// the body triggers `ROLLBACK` before the error propagates.
2411 ///
2412 /// Nested calls are not supported — SPG's transaction
2413 /// model is single-writer with explicit `BEGIN` /
2414 /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
2415 /// would hit `EngineError::Unsupported("nested
2416 /// transaction")` at the inner `BEGIN`.
2417 pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
2418 where
2419 F: FnOnce(&mut Self) -> Result<R, EngineError>,
2420 {
2421 self.execute("BEGIN")?;
2422 match body(self) {
2423 Ok(value) => {
2424 self.execute("COMMIT")?;
2425 Ok(value)
2426 }
2427 Err(e) => {
2428 // Best-effort rollback. If ROLLBACK itself
2429 // fails (rare — the engine reports it via
2430 // `Unsupported` only when there's no active
2431 // TX, which can't happen here) we surface the
2432 // original body error, not the rollback error.
2433 let _ = self.execute("ROLLBACK");
2434 Err(e)
2435 }
2436 }
2437 }
2438}
2439
2440impl Default for Database {
2441 fn default() -> Self {
2442 Self::open_in_memory()
2443 }
2444}
2445
/// v7.7.5 — observability snapshot returned by
/// [`Database::metrics`]. Plain `Copy` data made of integer
/// counters and one flag, so it is cheap to construct, pass
/// around and serialise.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct EmbeddedMetrics {
    /// Total live row count across every user table (hot
    /// tier only — cold-tier rows live in segment files).
    pub hot_rows: u64,
    /// Sum of `Table::hot_bytes` across every user table.
    /// This is the figure to compare against the freezer's
    /// `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Number of cold-tier segments registered in the catalog.
    /// Includes tombstoned slots (segments retired by
    /// compaction whose file may still be on disk).
    pub cold_segments: u64,
    /// User-table count (excludes any future engine-managed
    /// internal tables).
    pub tables: u64,
    /// WAL size in bytes as of the last `execute()` /
    /// `checkpoint()`. Zero when the database is in-memory.
    pub wal_bytes: u64,
    /// `true` when the database was opened with `open_path` —
    /// i.e. WAL + checkpoint persistence is active.
    pub persistent: bool,
}
2473
/// v7.2.1 — handle returned by `spawn_background_freezer`.
/// Dropping it signals the worker thread to wind down and joins
/// it, so a `Database` (or its shared `Arc<Mutex<Database>>`)
/// can safely be dropped after the handle is.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    // Stop flag: `stop()` sets it to `true`. Presumably shared with
    // the worker thread, which is spawned outside this view.
    shutdown: Arc<AtomicBool>,
    // Worker thread handle. `stop()` takes it out, leaving `None`,
    // which is what makes a second `stop()` (or `Drop`) a no-op join.
    join: Option<JoinHandle<()>>,
}
2484
2485impl FreezerHandle {
2486 /// v7.2.1 — request the worker stop + join. Idempotent;
2487 /// safe to call from `Drop` (which also calls it).
2488 pub fn stop(&mut self) {
2489 self.shutdown.store(true, Ordering::Release);
2490 if let Some(h) = self.join.take() {
2491 let _ = h.join();
2492 }
2493 }
2494}
2495
2496impl Drop for FreezerHandle {
2497 fn drop(&mut self) {
2498 self.stop();
2499 }
2500}
2501
/// v7.2.1 — knobs for `Database::spawn_background_freezer`.
/// The `Default` impl supplies the `spg-server` operating point.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// Tick interval. The worker wakes every `tick`, checks the
    /// catalog's hot-tier size against `hot_tier_bytes`, and
    /// freezes if it is over budget.
    pub tick: Duration,
    /// Hot-tier byte budget. Exceeded → next tick freezes the
    /// largest table's oldest `batch_rows` rows into a new
    /// cold segment.
    pub hot_tier_bytes: u64,
    /// Maximum number of rows the freezer demotes per fire.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. When the catalog has
    /// at least this many cold segments across all tables, the
    /// freezer fires a compaction pass after its next freeze.
    /// Set to `usize::MAX` to disable auto-compact entirely;
    /// the default is `64`, matching the `spg-server` operating
    /// point for SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges,
    /// in bytes. Default 64 MiB, mirroring `spg-server`.
    /// Segments below this size are merge candidates;
    /// segments at or above it stay untouched.
    pub compact_target_bytes: u64,
}
2527
2528impl Default for FreezerOptions {
2529 fn default() -> Self {
2530 // Match the `spg-server` freezer's default operating
2531 // point (SPG_HOT_TIER_BYTES = 4 GiB, batch 1000 rows,
2532 // tick every 1 s) so embedded behaviour is predictable
2533 // for operators familiar with the server.
2534 Self {
2535 tick: Duration::from_secs(1),
2536 hot_tier_bytes: 4 * 1024 * 1024 * 1024,
2537 batch_rows: 1000,
2538 compact_when_segments_exceed: 64,
2539 compact_target_bytes: 64 * 1024 * 1024,
2540 }
2541 }
2542}
2543
2544impl Database {
2545 /// v7.7.4 — observe the catalog's cold-segment count.
2546 /// Useful for tests + dashboards that want to verify
2547 /// auto-compaction is firing.
2548 #[must_use]
2549 pub fn cold_segment_count(&self) -> usize {
2550 self.engine.catalog().cold_segment_count()
2551 }
2552
2553 /// v7.7.5 — observability snapshot. Returns a point-in-time
2554 /// view of the engine + persistence counters. Cheap (no
2555 /// locks beyond the existing `&self` borrow), so safe to
2556 /// call from a hot metrics-scrape path.
2557 ///
2558 /// Fields mirror the operational dashboard
2559 /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
2560 /// minus the network counters that don't apply to embedded.
2561 #[must_use]
2562 pub fn metrics(&self) -> EmbeddedMetrics {
2563 let cat = self.engine.catalog();
2564 let mut hot_rows: u64 = 0;
2565 let mut hot_bytes: u64 = 0;
2566 for name in cat.table_names() {
2567 if let Some(t) = cat.get(&name) {
2568 hot_rows = hot_rows.saturating_add(t.row_count() as u64);
2569 hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
2570 }
2571 }
2572 let (wal_bytes, persistent) = match &self.persistence {
2573 Some(p) => (p.wal.written_len(), true),
2574 None => (0, false),
2575 };
2576 EmbeddedMetrics {
2577 hot_rows,
2578 hot_bytes,
2579 cold_segments: cat.cold_segment_count() as u64,
2580 tables: cat.table_count() as u64,
2581 wal_bytes,
2582 persistent,
2583 }
2584 }
2585
2586 /// v7.2.1 — spawn a background thread that periodically
2587 /// runs `freeze_oldest_to_cold` when the catalog-wide hot
2588 /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
2589 /// pattern matches the v7.2 sharing story: callers wrap
2590 /// their `Database` in `Arc::new(Mutex::new(db))` once,
2591 /// then clone the Arc for the worker + for foreground
2592 /// access. Return value is a handle whose `Drop` joins the
2593 /// worker.
2594 ///
2595 /// Picks the freeze target the same way `spg-server`'s
2596 /// freezer does: largest-`hot_bytes` user table with at
2597 /// least one BTree integer-PK index. Tables without a
2598 /// freezable index are skipped silently.
2599 pub fn spawn_background_freezer(
2600 db: Arc<Mutex<Database>>,
2601 opts: FreezerOptions,
2602 ) -> FreezerHandle {
2603 let shutdown = Arc::new(AtomicBool::new(false));
2604 let shutdown_for_thread = Arc::clone(&shutdown);
2605 let join = thread::Builder::new()
2606 .name("spg-embedded-freezer".into())
2607 .spawn(move || {
2608 background_freezer_loop(db, opts, shutdown_for_thread);
2609 })
2610 .expect("spawn background freezer thread");
2611 FreezerHandle {
2612 shutdown,
2613 join: Some(join),
2614 }
2615 }
2616}
2617
2618/// v7.2.1 — the freezer's main loop, factored out so the
2619/// `Database::spawn_background_freezer` path stays readable.
2620fn background_freezer_loop(
2621 db: Arc<Mutex<Database>>,
2622 opts: FreezerOptions,
2623 shutdown: Arc<AtomicBool>,
2624) {
2625 // Sleep in short slices so a shutdown request resolves
2626 // quickly (vs sleeping the full tick).
2627 let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
2628 let mut last_tick = std::time::Instant::now();
2629 loop {
2630 if shutdown.load(Ordering::Acquire) {
2631 return;
2632 }
2633 thread::sleep(slice);
2634 if last_tick.elapsed() < opts.tick {
2635 continue;
2636 }
2637 last_tick = std::time::Instant::now();
2638 let Ok(mut guard) = db.lock() else {
2639 return;
2640 };
2641 if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
2642 continue;
2643 }
2644 let Some((table, index)) = pick_freeze_target(&guard) else {
2645 continue;
2646 };
2647 let row_count = guard
2648 .engine
2649 .catalog()
2650 .get(&table)
2651 .map_or(0, spg_storage::Table::row_count);
2652 let to_freeze = opts.batch_rows.min(row_count);
2653 if to_freeze == 0 {
2654 continue;
2655 }
2656 if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
2657 eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
2658 continue;
2659 }
2660 // v7.7.4 — auto-compact. If the catalog now carries
2661 // more cold segments than the configured threshold,
2662 // run a single compaction pass. Failures are reported
2663 // but don't kill the loop; the next tick will retry.
2664 let count = guard.engine.catalog().cold_segment_count();
2665 if count > opts.compact_when_segments_exceed {
2666 if let Err(e) = guard
2667 .engine
2668 .compact_cold_segments_with_target(opts.compact_target_bytes)
2669 {
2670 eprintln!(
2671 "spg-embedded: background compact failed (segments={count}, \
2672 threshold={}): {e:?}",
2673 opts.compact_when_segments_exceed,
2674 );
2675 }
2676 }
2677 }
2678}
2679
2680/// v7.2.1 — pick the highest-`hot_bytes` user table with a
2681/// BTree integer-PK index. Returns `(table, index_name)` so the
2682/// caller can dispatch through `freeze_oldest_to_cold`.
2683fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
2684 let cat = db.engine.catalog();
2685 let mut best: Option<(String, String, u64)> = None;
2686 for name in cat.table_names() {
2687 let Some(t) = cat.get(&name) else { continue };
2688 if t.row_count() == 0 {
2689 continue;
2690 }
2691 let cols = &t.schema().columns;
2692 let Some(idx) = t.indices().iter().find(|i| {
2693 matches!(i.kind, spg_storage::IndexKind::BTree(_))
2694 && i.column_position < cols.len()
2695 && matches!(
2696 cols[i.column_position].ty,
2697 spg_storage::DataType::SmallInt
2698 | spg_storage::DataType::Int
2699 | spg_storage::DataType::BigInt
2700 )
2701 }) else {
2702 continue;
2703 };
2704 let hot = t.hot_bytes();
2705 match best {
2706 None => best = Some((name, idx.name.clone(), hot)),
2707 Some((_, _, best_hot)) if hot > best_hot => {
2708 best = Some((name, idx.name.clone(), hot));
2709 }
2710 _ => {}
2711 }
2712 }
2713 best.map(|(t, i, _)| (t, i))
2714}
2715
2716/// v7.7.6 — replay the first `to_seq` records of the WAL at
2717/// `wal_path` into a fresh engine and write the resulting
2718/// catalog snapshot to `out_db_path`. Same semantics as
2719/// `spg revert --wal … --to-seq N --out …` from the CLI:
2720///
2721/// - `to_seq == 0` → snapshot is the empty catalog
2722/// - WAL records beyond `to_seq` are not applied
2723/// - durability-checkpoint markers (v3 type 0x02) are
2724/// consumed without counting against the budget
2725///
2726/// Returns the number of statements actually applied
2727/// (`≤ to_seq`). The output snapshot is byte-identical to
2728/// what `Database::open_path(out_db_path)` would consume on
2729/// a subsequent open.
2730///
2731/// This is the "rewind" operator for an embedded database
2732/// that has been corrupted by a poison statement or a
2733/// half-applied migration. Pair with `cold_segment_paths`
2734/// preservation if your cold-tier files are still on disk.
2735///
2736/// # Errors
2737///
2738/// - `wal_path` unreadable or truncated mid-record
2739/// - WAL record decodes to invalid UTF-8 SQL
2740/// - WAL record's SQL is rejected by the engine
2741/// - `out_db_path` unwritable
2742pub fn revert_wal_to_seq(
2743 wal_path: impl AsRef<Path>,
2744 to_seq: u64,
2745 out_db_path: impl AsRef<Path>,
2746) -> Result<u64, EngineError> {
2747 // v7.19 — accept either a single-file legacy WAL (v7.18 and
2748 // earlier layout) or a chunked WAL directory (v7.19+). For a
2749 // directory, concatenate every `.wal` chunk in sorted order
2750 // — the same order open_path replays them in — so revert
2751 // sees the full record stream.
2752 let path = wal_path.as_ref();
2753 let wal_bytes = if path.is_dir() {
2754 let mut combined = Vec::new();
2755 let chunks = sorted_wal_chunks(path).map_err(io_err)?;
2756 for chunk in chunks {
2757 let bytes = std::fs::read(&chunk).map_err(io_err)?;
2758 combined.extend_from_slice(&bytes);
2759 }
2760 combined
2761 } else {
2762 std::fs::read(path).map_err(io_err)?
2763 };
2764 let mut engine = Engine::new();
2765 let mut applied = 0u64;
2766 let mut cur = 0usize;
2767 while cur < wal_bytes.len() && applied < to_seq {
2768 let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
2769 cur += total;
2770 if sql_bytes.is_empty() {
2771 continue;
2772 }
2773 let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
2774 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
2775 "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
2776 )))
2777 })?;
2778 // v7.21 — tx-commit records carry a multi-statement script;
2779 // split_statements is a no-op for single-statement records.
2780 for stmt in split_statements(sql) {
2781 engine.execute(stmt)?;
2782 }
2783 applied += 1;
2784 }
2785 let snapshot = engine.snapshot();
2786 std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
2787 Ok(applied)
2788}
2789
2790/// v7.7.6 — decode one WAL record from a byte tail. Returns
2791/// `(sql_bytes, header_plus_payload_len)`. Handles the three
2792/// on-disk formats (v1 / v2 / v3) the same way the CLI
2793/// `decode_one_record` and the engine's `replay_wal_bytes`
2794/// do. CRCs are not re-validated; the caller's intent is
2795/// "apply", not "validate".
2796fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
2797 if tail.len() < 4 {
2798 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
2799 format!("WAL truncated record: {} < 4 header bytes", tail.len()),
2800 )));
2801 }
2802 let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
2803 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
2804 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
2805 let len_mask = if is_v3 {
2806 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
2807 } else {
2808 !WAL_V2_SENTINEL
2809 };
2810 let rec_len = (raw_len & len_mask) as usize;
2811 let header_len = if is_v3 {
2812 9
2813 } else if is_v2 {
2814 8
2815 } else {
2816 4
2817 };
2818 if tail.len() < header_len + rec_len {
2819 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
2820 format!(
2821 "WAL truncated record: header+payload {} > available {}",
2822 header_len + rec_len,
2823 tail.len()
2824 ),
2825 )));
2826 }
2827 if is_v3 {
2828 let type_byte = tail[8];
2829 // v3 type 0x01 = auto_commit_sql (payload = SQL).
2830 // v3 type 0x02 = durability marker (no SQL to apply).
2831 // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
2832 // prefix between type and SQL — strip
2833 // the prefix so the caller still sees raw
2834 // SQL bytes.
2835 // Anything else is unknown.
2836 if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
2837 let payload = &tail[header_len..header_len + rec_len];
2838 return Ok((payload.to_vec(), header_len + rec_len));
2839 }
2840 if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL || type_byte == WAL_V4_TYPE_TX_COMMIT_SQL {
2841 let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
2842 if tail.len() < v4_total {
2843 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
2844 format!(
2845 "WAL truncated v4 record: header+payload {v4_total} > available {}",
2846 tail.len()
2847 ),
2848 )));
2849 }
2850 let sql_start = header_len + WAL_V4_EXTRA_HEADER;
2851 let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
2852 return Ok((sql_bytes, v4_total));
2853 }
2854 // Caller treats empty payload as a skip-marker.
2855 return Ok((Vec::new(), header_len + rec_len));
2856 }
2857 let payload = &tail[header_len..header_len + rec_len];
2858 Ok((payload.to_vec(), header_len + rec_len))
2859}
2860
2861impl Drop for Database {
2862 fn drop(&mut self) {
2863 // v7.1 — best-effort final checkpoint when a persistent
2864 // Database leaves scope. Failures here go to stderr so
2865 // operators see them, but Drop can't propagate errors —
2866 // the WAL itself is already durable, so a checkpoint
2867 // miss only means the next boot replays a few more
2868 // records than strictly necessary.
2869 if self.persistence.is_some() {
2870 if let Err(e) = self.checkpoint() {
2871 eprintln!(
2872 "spg-embedded: final checkpoint on Drop failed: {e:?} \
2873 (WAL is intact; next open_path will replay)"
2874 );
2875 }
2876 }
2877 // v7.19 P3 / v7.20 — signal the retention + flusher
2878 // threads to exit, then wait for them. Done BEFORE the
2879 // lock release so background threads don't outlive the
2880 // database handle. The flusher drains the pending batch
2881 // on its way out (final flush_now in the thread body),
2882 // so `SPG_SYNCHRONOUS_COMMIT=off` never loses confirmed
2883 // commits across a clean shutdown.
2884 if let Some(ctx) = self.persistence.as_mut() {
2885 if let Some(shutdown) = ctx.retention_shutdown.take() {
2886 shutdown.store(true, Ordering::SeqCst);
2887 }
2888 if let Some(handle) = ctx.retention_thread.take() {
2889 let _ = handle.join();
2890 }
2891 if let Some(shutdown) = ctx.flusher_shutdown.take() {
2892 shutdown.store(true, Ordering::SeqCst);
2893 }
2894 if let Some(handle) = ctx.flusher_thread.take() {
2895 let _ = handle.join();
2896 }
2897 }
2898 // v7.17.0 Phase 6.2 — release the cross-process lock on
2899 // clean shutdown. Failure is logged but never panics;
2900 // the operator can clear a stale lock via
2901 // `Database::force_unlock` if a crash kept the
2902 // directory around.
2903 if let Some(ctx) = &self.persistence
2904 && ctx.lock_path.exists()
2905 {
2906 // remove_dir_all: the lock dir carries the owner-pid
2907 // record since round-12.
2908 if let Err(e) = std::fs::remove_dir_all(&ctx.lock_path) {
2909 eprintln!(
2910 "spg-embedded: lock release on Drop failed for {}: {e:?}",
2911 ctx.lock_path.display()
2912 );
2913 }
2914 }
2915 }
2916}
2917
2918impl Database {
2919 /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
2920 /// Use when a previous process crashed mid-session and
2921 /// left `<db_path>.lock` behind. Operators should confirm
2922 /// no other process is currently using the database before
2923 /// calling this — SPG cannot fingerprint stale-vs-live
2924 /// without a libc dep, which would violate spg-embedded's
2925 /// zero-deps charter.
2926 pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
2927 let lock_path = {
2928 let mut p = db_path.as_ref().to_path_buf();
2929 let name = p
2930 .file_name()
2931 .map(|n| {
2932 let mut s = n.to_os_string();
2933 s.push(".lock");
2934 s
2935 })
2936 .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
2937 p.set_file_name(name);
2938 p
2939 };
2940 if !lock_path.exists() {
2941 return Ok(());
2942 }
2943 std::fs::remove_dir_all(&lock_path).map_err(io_err)
2944 }
2945}
2946
2947/// v7.1 — turn a `std::io::Error` into the workspace's
2948/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
2949/// the closest existing variant — io failures during boot or
2950/// during a WAL append surface as a storage-layer fault to
2951/// callers, which keeps the public error enum unchanged.
2952fn io_err(e: std::io::Error) -> EngineError {
2953 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
2954}
2955
2956/// v7.2.2 — `Database` is `Send`, so the recommended sharing
2957/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
2958///
2959/// ```no_run
2960/// use std::sync::{Arc, Mutex};
2961/// use spg_embedded::Database;
2962///
2963/// let db = Database::open_in_memory();
2964/// let shared = Arc::new(Mutex::new(db));
2965/// let shared_for_worker = Arc::clone(&shared);
2966/// std::thread::spawn(move || {
2967/// let mut guard = shared_for_worker.lock().unwrap();
2968/// guard.execute("INSERT INTO t VALUES (1)").unwrap();
2969/// });
2970/// ```
2971///
2972/// Internal `RwLock`-wrapped state — letting many threads
2973/// hold concurrent `&Database` for `SELECT` without contending
2974/// — is parked as STABILITY § "Out of v7.2"; multi-reader
2975/// embedded throughput needs a planner-side change to release
2976/// the engine read lock between scans, which is the v7.x
2977/// "Choice A" line of work already documented in v6.9.1's
2978/// carve-out.
2979#[allow(dead_code)]
2980fn _database_is_send() {
2981 fn assert_send<T: Send>() {}
2982 assert_send::<Database>();
2983}
2984
2985/// v6.10.3 — trait that maps a row's columns onto a user
2986/// struct's fields. v7.3.0 ships the [`spg_row!`] declarative
2987/// macro that generates `impl FromSpgRow for YourStruct` from
2988/// a struct definition (no proc-macro, no syn/quote/
2989/// proc-macro2 deps — the workspace's "0 external deps"
2990/// policy holds).
2991///
2992/// Implementors map a row's columns onto a user struct's
2993/// fields. Errors surface as `EngineError::Unsupported` so the
2994/// caller's error type stays uniform.
2995pub trait FromSpgRow: Sized {
2996 /// Decode one query result row into `Self`. Called once per
2997 /// row by [`Database::query_typed`]. The slice length equals
2998 /// the number of columns in the SELECT projection.
2999 fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
3000}
3001
/// v7.3.0 — declarative macro that generates `FromSpgRow` impl
/// for a user struct. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
///
/// Notes on the expansion:
///
/// - The generated struct always carries
///   `#[derive(Debug, Clone)]`; do not repeat those derives in
///   your own attributes.
/// - Columns are matched to fields by position, in declaration
///   order. A row with fewer columns than fields is an error;
///   columns beyond the last field are ignored.
/// - The comma after the last field is optional.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // Fields consume columns left to right.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                // Fully qualified so a caller-side `Ok` cannot
                // shadow the variant.
                ::core::result::Result::Ok(Self { $($field,)* })
            }
        }
    };
}
3078
3079/// v7.3.0 — per-column decoder used by `spg_row!`. Surface
3080/// covers every numeric / text / bytes / bool variant in
3081/// `Value`, plus `Option<T>` for nullable columns.
3082pub trait FromSpgValue: Sized {
3083 /// Decode one cell into `Self`. The returned `&'static str`
3084 /// is a short diagnostic for type mismatches (e.g. `"expected
3085 /// integer, got TEXT"`); callers wrap it into their own
3086 /// error type.
3087 fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
3088}
3089
3090macro_rules! impl_from_value_int {
3091 ($($t:ty),* $(,)?) => {
3092 $(
3093 impl FromSpgValue for $t {
3094 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3095 match v {
3096 Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
3097 Value::Int(n) => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
3098 Value::BigInt(n) => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
3099 Value::Null => Err("NULL in non-Option int column"),
3100 _ => Err("non-integer value in int column"),
3101 }
3102 }
3103 }
3104 )*
3105 };
3106}
3107impl_from_value_int!(i16, i32, i64);
3108
3109impl FromSpgValue for f32 {
3110 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3111 match v {
3112 Value::Float(f) => Ok(*f as f32),
3113 Value::Null => Err("NULL in non-Option float column"),
3114 _ => Err("non-float value in float column"),
3115 }
3116 }
3117}
3118
3119impl FromSpgValue for f64 {
3120 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3121 match v {
3122 Value::Float(f) => Ok(*f),
3123 Value::Null => Err("NULL in non-Option float column"),
3124 _ => Err("non-float value in float column"),
3125 }
3126 }
3127}
3128
3129impl FromSpgValue for bool {
3130 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3131 match v {
3132 Value::Bool(b) => Ok(*b),
3133 Value::Null => Err("NULL in non-Option bool column"),
3134 _ => Err("non-bool value in bool column"),
3135 }
3136 }
3137}
3138
3139impl FromSpgValue for String {
3140 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3141 match v {
3142 Value::Text(s) => Ok(s.clone()),
3143 Value::Null => Err("NULL in non-Option text column"),
3144 _ => Err("non-text value in String column"),
3145 }
3146 }
3147}
3148
3149impl FromSpgValue for Vec<f32> {
3150 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3151 match v {
3152 Value::Vector(xs) => Ok(xs.clone()),
3153 Value::Null => Err("NULL in non-Option vector column"),
3154 _ => Err("non-vector value in Vec<f32> column"),
3155 }
3156 }
3157}
3158
3159impl<T: FromSpgValue> FromSpgValue for Option<T> {
3160 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3161 match v {
3162 Value::Null => Ok(None),
3163 other => T::from_spg_value(other).map(Some),
3164 }
3165 }
3166}
3167
3168/// Acquire the cross-process exclusion lock at `lock_path` (atomic
3169/// `mkdir`), recording the owner pid inside. If the lock already
3170/// exists, read the recorded pid and probe liveness — a lock left
3171/// behind by a killed process (docker SIGKILL, crash) is reclaimed
3172/// automatically instead of forcing the operator to delete it by
3173/// hand (mailrs embed round-12: a restarted server came up in
3174/// degraded mode because the previous instance's lock survived).
3175/// v7.27 (mailrs round-21 B) — the prober's environment identity:
3176/// `(hostname, boot-or-container id)`. A pid is only meaningful
3177/// inside the PID namespace that recorded it; mailrs's recovery
3178/// window saw "locked by pid 1" from a STOPPED container because
3179/// the prober's pid 1 (its own init) was alive. When the lock's
3180/// identity differs from ours, liveness is UNDECIDABLE and we
3181/// refuse honestly instead of guessing in either direction.
3182fn host_identity() -> (String, String) {
3183 let hostname = std::process::Command::new("hostname")
3184 .output()
3185 .ok()
3186 .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
3187 .unwrap_or_default();
3188 // Linux boot id; containers share the host kernel's boot id, so
3189 // hostname (= container id by default) is the namespace
3190 // discriminator and boot id catches host reboots / pid reuse.
3191 let boot_id = std::fs::read_to_string("/proc/sys/kernel/random/boot_id")
3192 .map(|s| s.trim().to_string())
3193 .or_else(|_| {
3194 std::process::Command::new("sysctl")
3195 .args(["-n", "kern.bootsessionuuid"])
3196 .output()
3197 .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
3198 })
3199 .unwrap_or_default();
3200 (hostname, boot_id)
3201}
3202
3203fn acquire_path_lock(lock_path: &Path) -> Result<(), EngineError> {
3204 for attempt in 0..2 {
3205 match std::fs::create_dir(lock_path) {
3206 Ok(()) => {
3207 // Best-effort owner record; liveness probing treats a
3208 // missing pid file as stale (crash between mkdir and
3209 // write is indistinguishable from an ancient lock).
3210 // v7.27 — lines 2+3 record the owner's environment
3211 // identity (hostname, boot id) so a prober in a
3212 // different namespace refuses instead of misreading
3213 // the pid.
3214 let (host, boot) = host_identity();
3215 let _ = std::fs::write(
3216 lock_path.join("pid"),
3217 format!("{}\n{host}\n{boot}\n", std::process::id()),
3218 );
3219 return Ok(());
3220 }
3221 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists && attempt == 0 => {
3222 let record = std::fs::read_to_string(lock_path.join("pid")).unwrap_or_default();
3223 let mut lines = record.lines();
3224 let owner = lines.next().and_then(|s| s.trim().parse::<u32>().ok());
3225 let lock_host = lines.next().unwrap_or("").trim().to_string();
3226 let lock_boot = lines.next().unwrap_or("").trim().to_string();
3227 // v7.27 — identity check BEFORE the pid probe. A pid
3228 // recorded in another namespace is undecidable both
3229 // ways (a stale lock can look held, a held lock can
3230 // look stale — the unsafe direction). Old-format
3231 // locks (pid only) keep the legacy same-host
3232 // assumption.
3233 if !lock_host.is_empty() {
3234 let (my_host, my_boot) = host_identity();
3235 let same_env = lock_host == my_host
3236 && (lock_boot.is_empty() || my_boot.is_empty() || lock_boot == my_boot);
3237 if !same_env {
3238 return Err(EngineError::Unsupported(format!(
3239 "database lock {} was taken in a different host/container \
3240 (owner: pid {} on {:?}; we are {:?}) — liveness is \
3241 undecidable from here. If you are sure the owner is gone, \
3242 call Database::force_unlock() or `spg import --force-unlock`.",
3243 lock_path.display(),
3244 owner.unwrap_or(0),
3245 lock_host,
3246 my_host
3247 )));
3248 }
3249 }
3250 let owner_alive = owner.is_some_and(pid_alive);
3251 if owner_alive {
3252 return Err(EngineError::Unsupported(format!(
3253 "database is locked by another process (pid {}): {}; \
3254 stop that process first, or call Database::force_unlock()",
3255 owner.unwrap_or(0),
3256 lock_path.display()
3257 )));
3258 }
3259 // Stale — owner pid dead or unrecorded. Reclaim.
3260 eprintln!(
3261 "spg-embedded: reclaiming stale lock {} (owner pid {:?} not alive)",
3262 lock_path.display(),
3263 owner
3264 );
3265 std::fs::remove_dir_all(lock_path).map_err(io_err)?;
3266 // Loop retries the create_dir; a concurrent reclaimer
3267 // winning the race surfaces as AlreadyExists on
3268 // attempt 1 below.
3269 }
3270 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
3271 return Err(EngineError::Unsupported(format!(
3272 "database is locked by another process: {}; \
3273 stop that process first, or call Database::force_unlock()",
3274 lock_path.display()
3275 )));
3276 }
3277 Err(e) => return Err(io_err(e)),
3278 }
3279 }
3280 unreachable!("acquire_path_lock loop covers both attempts")
3281}
3282
/// Probe whether `pid` is a live process. Unix: `ps -p` via the
/// system binary (std-only — no libc dependency). `ps -p` exits 0
/// for ANY live pid regardless of owner; `kill -0` was rejected
/// here because it fails with EPERM on another user's live process,
/// which would read as "dead" and reclaim a held lock. Probe
/// failure (no `ps` binary, exec error) conservatively reports
/// alive so locks are never auto-reclaimed on doubt; non-unix
/// targets do the same.
#[cfg(unix)]
fn pid_alive(pid: u32) -> bool {
    let probe = std::process::Command::new("ps")
        .arg("-p")
        .arg(pid.to_string())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
    if let Ok(exit) = probe {
        exit.success()
    } else {
        // Could not run the probe at all: assume alive.
        true
    }
}

/// Non-unix targets have no probe, so every pid is reported
/// alive and locks are never reclaimed automatically.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    true
}
3309
/// v7.22 — update the stream's string-dialect flag from one
/// statement chunk; see `split_statements`' `mysql_escapes`
/// tracking. A chunk mentioning `sql_mode` switches the flag on;
/// a chunk mentioning `standard_conforming_strings` sets it to
/// whether the chunk also contains `off`. Matching is ASCII
/// case-insensitive. Only short chunks (at most 4096 bytes) are
/// inspected: the signal statements are one-liners, and COPY
/// data blocks are skipped by the length guard.
fn note_dialect_signals(chunk: &str, mysql_escapes: &mut bool) {
    if chunk.len() > 4096 {
        return;
    }
    let lower = chunk.to_ascii_lowercase();
    if lower.contains("sql_mode") {
        *mysql_escapes = true;
    } else if lower.contains("standard_conforming_strings") {
        *mysql_escapes = lower.contains("off");
    }
}

/// Strip leading whitespace, `--` line comments and NON-conditional
/// block comments from a chunk so statement-head checks (COPY
/// detection most notably) see the first real token. pg_dump
/// prefixes every data block with a `-- Data for Name: …;` banner —
/// which itself contains semicolons, so head checks must run on the
/// stripped text. MySQL executable conditional comments (`/*!`) are
/// content and stay.
///
/// Returns the remaining tail of `s`, or `""` when the chunk is
/// nothing but noise or ends inside an unterminated block comment.
fn strip_leading_sql_noise(mut s: &str) -> &str {
    loop {
        let t = s.trim_start();
        if let Some(rest) = t.strip_prefix("--") {
            // Drop through the end of the line; a comment with no
            // newline swallows the rest of the chunk.
            s = rest.split_once('\n').map_or("", |(_, r)| r);
            continue;
        }
        if let Some(body) = t.strip_prefix("/*") {
            if !body.starts_with('!') {
                // Look for the terminator AFTER the opener so the
                // `*` of `/*` is never reused as the start of `*/`
                // (`/*/` opens a comment, it does not close one).
                match body.find("*/") {
                    Some(end) => {
                        s = &body[end + 2..];
                        continue;
                    }
                    None => return "",
                }
            }
        }
        return t;
    }
}
3351
/// Split a multi-statement SQL script into individual statements on
/// top-level `;`, honouring single-quoted strings (with `''`
/// escapes), double-quoted identifiers, dollar-quoted bodies
/// (`$tag$ … $tag$`), line comments (`--`) and MySQL executable
/// conditional comments (`/*!… */` stay statement content; plain
/// nested block comments don't). Chunks that contain no statement
/// content (whitespace / comments only) are dropped. PG's
/// simple-query protocol does this server-side; the embed path owns
/// it here.
///
/// Every returned slice borrows from `sql`. A slice excludes its
/// terminating `;` but is otherwise untrimmed: whitespace and
/// comments that precede the statement inside the same chunk are
/// kept. A final chunk with no trailing `;` is returned as long as
/// it has content. Unterminated strings, dollar-quotes and block
/// comments simply run to the end of the input; nothing here
/// reports an error.
///
/// v7.22 (mailrs round-13 gap 1) — psql meta-command lines are
/// dropped for client parity: a line whose first non-whitespace
/// byte is `\` BETWEEN statements (PG 18's pg_dump wraps scripts in
/// `\restrict` / `\unrestrict`) never reaches the parser, the same
/// way psql consumes `\`-lines client-side and never sends them. A
/// mid-statement backslash stays an ordinary byte — pg_dump only
/// emits meta-commands between statements.
///
/// v7.22 (round-13 T2) — a statement recognised as a
/// `COPY … FROM stdin` head is returned as ONE chunk together with
/// its data block (everything up to, but not including, the `\.`
/// terminator line, or up to end of input when the terminator is
/// missing).
pub fn split_statements(sql: &str) -> Vec<&str> {
    let bytes = sql.as_bytes();
    let mut stmts = Vec::new();
    // Byte offset where the chunk currently being scanned begins.
    let mut start = 0usize;
    // True once the current chunk holds something other than
    // whitespace and (non-executable) comments.
    let mut has_content = false;
    // v7.22 (round-13 T3) — stream-tracked string dialect, mirroring
    // the engine's session flag: a statement mentioning `sql_mode`
    // (mysqldump preamble, often inside `/*!…*/`) switches plain
    // strings to backslash-escape scanning;
    // `standard_conforming_strings` (pg_dump preamble) switches
    // back. Without this the scanner ends a MySQL `'…\'…'` literal
    // early and splits inside data.
    let mut mysql_escapes = false;
    // Scan cursor. Arms that leave `i` ON the last byte they consumed
    // fall through to the shared `i += 1` at the bottom of the loop;
    // arms that already moved `i` PAST what they consumed `continue`
    // to skip that increment.
    let mut i = 0usize;
    while i < bytes.len() {
        match bytes[i] {
            b'\\' if !has_content => {
                // Start-of-statement `\` = psql meta-command line.
                // Consume through end-of-line; restart the chunk
                // after it so the line never lands in the output.
                // (When `has_content` is already true the guard fails
                // and the backslash is handled by the catch-all arm.)
                while i < bytes.len() && bytes[i] != b'\n' {
                    i += 1;
                }
                // `i` sits on the `\n` (or at end of input): the next
                // chunk begins right after it.
                start = if i < bytes.len() { i + 1 } else { i };
            }
            b'\'' => {
                has_content = true;
                // PG escape-string form `E'...'` honours backslash
                // escapes (`E'a\';b'` is ONE literal) — detect via
                // the immediately-preceding standalone E/e. MySQL
                // dialect sessions treat EVERY plain string that way.
                // "Standalone" means the byte before the E/e is not
                // an identifier byte, so `LIKE'…'` is not mistaken
                // for an escape string.
                let escape_string = mysql_escapes
                    || (i >= 1
                        && matches!(bytes[i - 1], b'e' | b'E')
                        && !(i >= 2
                            && (bytes[i - 2].is_ascii_alphanumeric() || bytes[i - 2] == b'_')));
                i += 1;
                // Leaves `i` on the closing quote, or at/after end of
                // input when the literal is unterminated.
                while i < bytes.len() {
                    if escape_string && bytes[i] == b'\\' {
                        // Skip the escaped byte (covers \' and \\).
                        i += 2;
                        continue;
                    }
                    if bytes[i] == b'\'' {
                        // `''` is an escaped quote inside the literal.
                        if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
                            i += 2;
                            continue;
                        }
                        break;
                    }
                    i += 1;
                }
            }
            b'"' => {
                has_content = true;
                i += 1;
                // Stop on the next `"`. A doubled `""` needs no special
                // case: the scan stops on the first quote, the shared
                // `i += 1` lands on the second, and that re-enters this
                // arm as a fresh opener.
                while i < bytes.len() && bytes[i] != b'"' {
                    i += 1;
                }
            }
            b'$' => {
                // Possible dollar-quote opener `$tag$` (tag may be
                // empty). If the shape doesn't match, it's a plain
                // `$` (positional param) — fall through.
                // `tag_end` is the index of the first byte after `$`
                // that is not ASCII alphanumeric / `_`; `None` when
                // the rest of the input is all tag-like bytes.
                let tag_end = bytes[i + 1..]
                    .iter()
                    .position(|&b| !(b.is_ascii_alphanumeric() || b == b'_'))
                    .map(|off| i + 1 + off);
                if let Some(te) = tag_end
                    && te < bytes.len()
                    && bytes[te] == b'$'
                {
                    has_content = true;
                    // The full delimiter, both `$` included.
                    let tag = &sql[i..=te];
                    // Find the closing `$tag$`.
                    if let Some(close) = sql[te + 1..].find(tag) {
                        // Resume on the byte right after the closing
                        // delimiter; `continue` skips the shared
                        // increment.
                        i = te + 1 + close + tag.len();
                        continue;
                    }
                    // Unterminated — consume the rest; the parser
                    // will report it.
                    i = bytes.len();
                    continue;
                }
                has_content = true;
            }
            b'-' if i + 1 < bytes.len() && bytes[i + 1] == b'-' => {
                // Line comment: skip to the `\n` (or end of input).
                // `has_content` is deliberately left untouched, so a
                // comment-only chunk is still dropped.
                while i < bytes.len() && bytes[i] != b'\n' {
                    i += 1;
                }
            }
            b'/' if i + 1 < bytes.len() && bytes[i + 1] == b'*' => {
                // v7.22 (round-13 T3) — MySQL conditional comments
                // `/*!40101 … */` are EXECUTABLE (mysqldump wraps
                // its whole preamble + DISABLE KEYS hints in them);
                // they must stay statement content for the engine,
                // not be skipped as commentary.
                if i + 2 < bytes.len() && bytes[i + 2] == b'!' {
                    has_content = true;
                }
                // Track nesting depth so an inner `*/` does not end
                // the outer comment. An unterminated comment runs to
                // the end of the input.
                let mut depth = 1usize;
                i += 2;
                while i < bytes.len() && depth > 0 {
                    if bytes[i] == b'/' && i + 1 < bytes.len() && bytes[i + 1] == b'*' {
                        depth += 1;
                        i += 2;
                    } else if bytes[i] == b'*' && i + 1 < bytes.len() && bytes[i + 1] == b'/' {
                        depth -= 1;
                        i += 2;
                    } else {
                        i += 1;
                    }
                }
                // `i` is already past the comment.
                continue;
            }
            b';' => {
                if has_content {
                    // The statement text, terminator excluded.
                    let head = &sql[start..i];
                    // v7.22 (round-13 T2) — a `COPY … FROM stdin;`
                    // statement owns its following data block
                    // through the `\.` terminator line (data lines
                    // may contain `;`, so generic splitting would
                    // shred them). Swallow head + data into ONE
                    // chunk; `execute_script` lowers it to INSERTs.
                    // pg_dump prefixes the COPY with a comment
                    // banner — strip it before the head check.
                    let head_clean = strip_leading_sql_noise(head);
                    // Cheap case-insensitive `copy` prefix test first;
                    // the engine's head parser only runs when it passes.
                    let is_copy_head = head_clean
                        .get(..4)
                        .is_some_and(|p| p.eq_ignore_ascii_case("copy"))
                        && spg_engine::copy::parse_copy_from_stdin_head(head_clean).is_some();
                    if is_copy_head {
                        // Scan whole lines after the ';' until the
                        // `\.` terminator (or EOF — torn dumps lose
                        // their tail, same as psql would error).
                        // `j` is always the first byte of a line.
                        let mut j = i + 1;
                        let data_end;
                        loop {
                            if j >= bytes.len() {
                                data_end = bytes.len();
                                break;
                            }
                            let line_end = sql[j..].find('\n').map_or(bytes.len(), |off| j + off);
                            if sql[j..line_end].trim_end_matches('\r').trim() == "\\." {
                                // The chunk stops where the terminator
                                // line begins, so `\.` itself is not
                                // part of the output.
                                data_end = j;
                                // Park on the terminator's `\n`; the
                                // `i += 1` before `continue` below
                                // steps past it.
                                i = line_end;
                                break;
                            }
                            j = line_end + 1;
                        }
                        stmts.push(&sql[start..data_end]);
                        if data_end == bytes.len() {
                            // No terminator found: the whole tail was
                            // swallowed, so finish the scan.
                            i = bytes.len();
                        }
                        start = i + 1;
                        has_content = false;
                        i += 1;
                        continue;
                    }
                    // Let the statement just completed update the
                    // string-dialect flag before later chunks are
                    // scanned.
                    note_dialect_signals(head, &mut mysql_escapes);
                    stmts.push(head);
                }
                // Content or not, the next chunk begins after this `;`.
                start = i + 1;
                has_content = false;
            }
            b => {
                // Any other byte: everything except ASCII whitespace
                // counts as statement content.
                if !b.is_ascii_whitespace() {
                    has_content = true;
                }
            }
        }
        i += 1;
    }
    // Trailing statement with no terminating `;`.
    if has_content {
        stmts.push(&sql[start..]);
    }
    stmts
}
3548
#[cfg(test)]
mod tests {
    use super::*;

    /// Two statements separated by `;` come back as two chunks; chunks
    /// holding only whitespace or comments are not returned at all.
    #[test]
    fn split_statements_basic_and_trailing() {
        let pieces = split_statements("CREATE TABLE a (x INT); INSERT INTO a VALUES (1)");
        assert_eq!(
            pieces,
            vec!["CREATE TABLE a (x INT)", " INSERT INTO a VALUES (1)"]
        );
        // Nothing but separators, blanks and a line comment.
        let nothing = split_statements(" ;; -- nothing\n;");
        assert!(nothing.is_empty());
    }

    /// A `;` hidden inside any quoting form must not act as a
    /// statement separator.
    #[test]
    fn split_statements_quoting_forms() {
        // Plain literal, doubled quote, E-string backslash escape,
        // quoted identifier, tagged and untagged dollar-quoted bodies.
        let cases = [
            "INSERT INTO t VALUES ('a;b')",
            "INSERT INTO t VALUES ('it''s; fine')",
            r"INSERT INTO t VALUES (E'it\'s; fine')",
            "CREATE TABLE \"odd;name\" (x INT)",
            "DO $body$ BEGIN PERFORM 1; END $body$",
            "DO $$ SELECT 1; $$",
        ];
        // On its own, each case is exactly one statement.
        for sql in &cases {
            let got = split_statements(sql);
            assert_eq!(got, vec![*sql], "must stay whole: {sql}");
        }
        // Followed by a second statement, the split lands on the real
        // `;` and nowhere else.
        for sql in &cases {
            let script = format!("{sql};\nSELECT 2");
            let got = split_statements(&script);
            assert_eq!(got, vec![*sql, "\nSELECT 2"], "must split after: {sql}");
        }
    }

    /// v7.22 round-13 gap 1 — PG 18 pg_dump wraps scripts in
    /// `\restrict` / `\unrestrict`; psql parity means those lines
    /// never reach the parser.
    #[test]
    fn split_statements_drops_psql_meta_lines() {
        let dump = "\\restrict TOKEN123\nSELECT 1;\n\\unrestrict TOKEN123\nSELECT 2;\n\\.\n";
        let expected: Vec<&str> = vec!["SELECT 1", "SELECT 2"];
        assert_eq!(split_statements(dump), expected);
        // A backslash in the middle of a statement is ordinary data.
        let escaped = r"SELECT E'a\\b'";
        assert_eq!(split_statements(escaped), vec![escaped]);
    }

    /// Semicolons inside line comments and nested block comments are
    /// not separators.
    #[test]
    fn split_statements_comments_hide_semicolons() {
        let parts = split_statements(
            "-- c1 ; still comment\nSELECT 1; /* a ; b /* nested ; */ */ SELECT 2",
        );
        assert_eq!(parts.len(), 2);
        for (part, needle) in parts.iter().zip(["SELECT 1", "SELECT 2"]) {
            assert!(part.contains(needle));
        }
    }

    /// Create, insert and select round-trip on an in-memory database.
    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        for stmt in [
            "CREATE TABLE t (id INT NOT NULL, name TEXT)",
            "INSERT INTO t VALUES (1, 'alice')",
            "INSERT INTO t VALUES (2, 'bob')",
        ] {
            db.execute(stmt).unwrap();
        }
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        let cell = &rows[0][0];
        if !matches!(cell, Value::Int(1)) {
            panic!("expected Int(1), got {cell:?}");
        }
    }

    /// `query()` is for row-returning statements; an INSERT is rejected.
    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        let outcome = db.query("INSERT INTO t VALUES (1)");
        assert!(outcome.is_err(), "query() on INSERT must error");
    }

    /// A snapshot taken from one database restores into another with
    /// the same rows.
    #[test]
    fn snapshot_roundtrip() {
        let mut source = Database::open_in_memory();
        source.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        source.execute("INSERT INTO t VALUES (42)").unwrap();
        let image = source.snapshot();
        let mut restored = Database::restore(&image).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        let cell = &rows[0][0];
        if !matches!(cell, Value::Int(42)) {
            panic!("expected Int(42), got {cell:?}");
        }
    }

    /// `FromSpgRow` can be implemented for a plain user struct and
    /// called on a borrowed row.
    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                if let Some(Value::Int(n)) = row.first() {
                    Ok(Self { _id: *n })
                } else {
                    Err(EngineError::Unsupported("bad id".into()))
                }
            }
        }
        let cells = [Value::Int(7)];
        let _u = User::from_spg_row(&cells).unwrap();
    }
}