spg_embedded/lib.rs
1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//! println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//! crash-consistent WAL + periodic checkpoint snapshot. The
32//! on-disk format is byte-identical to what `spg-server`
33//! produces, so a database can move between modes without
34//! conversion.
//! - **Durability**: every `execute()` that mutates does not
//!   return `Ok` until the WAL record describing its mutation
//!   has been fsynced. Since v7.20, commits that arrive
//!   concurrently may share a single fsync (WAL group commit);
//!   a lone writer still pays one fsync per commit. If you
//!   need batch throughput, wrap multiple statements in
//!   [`Database::with_transaction`] which fsyncs only at
//!   commit.
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//! Share across threads via `Arc<Mutex<Database>>`. The
43//! single-writer model is intentional — see
44//! [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//! moves cold rows to disk-resident segments while you keep
47//! serving requests. It runs in a dedicated thread; drop the
48//! returned [`FreezerHandle`] (or call `stop()`) for clean
49//! shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//! [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//! them with a wildcard arm so future v7.x releases can add
53//! variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//! Malformed SQL, type mismatches, missing tables — all
59//! return `Err(EngineError::…)`. If you observe a panic on
60//! a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//! violations (e.g., catalog snapshot magic mismatch, WAL
63//! record CRC sentinel corruption that survived the boot-
64//! time validation). These represent silent disk corruption
65//! and an unwind would leak inconsistent state, so the
66//! release profile uses `panic = abort` — your host process
67//! dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//! `--profile release-dbg` (keeps unwind tables) and use
70//! `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
/// v7.16.0 — handle for a parsed-and-planned SQL statement.
/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
/// with a `&[Value]` slice carrying the bind parameters (PG-style
/// `$1`, `$2`, … positional).
///
/// `Clone` is derived, so cloning a handle clones both the stored
/// AST and the SQL text.
/// NOTE(review): the historical claim that clones are cheap because
/// the AST is shared depends on how `ParsedStatement` implements
/// `Clone` — confirm against `spg-engine`.
///
/// The handle holds a snapshot of the AST at prepare time. If
/// the engine's plan cache evicts the entry between prepare and
/// execute (e.g. ANALYZE bumps the statistics version) the
/// stored AST keeps working — `execute_prepared` operates on
/// the handle's clone, not the cache entry.
#[derive(Debug, Clone)]
pub struct Statement {
    /// The parsed + planned AST. `spg-engine::prepare_cached`
    /// returns it as a clone of the cached plan, so any rewrite
    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
    /// already run.
    pub(crate) stmt: ParsedStatement,
    /// Original SQL source, kept for `Display` / debug only.
    /// WAL persistence renders from the AST so a bind-time
    /// rewrite of `$1..$N` survives replay.
    pub(crate) sql: String,
}
106
107impl Statement {
108 /// Borrow the original SQL source — useful for tracing and
109 /// debug logs. WAL replay does NOT use this; it serialises
110 /// the bind-final AST instead.
111 #[must_use]
112 pub fn sql(&self) -> &str {
113 &self.sql
114 }
115}
116
/// v7.16.0 — internal WAL helper. Mirrors what
/// `Engine::execute_prepared` does to the cloned AST so the WAL
/// record carries the bind-final SQL text (so replay's
/// simple-query path reconstructs the same row state without
/// needing the original `Statement` handle to still be alive).
///
/// `stmt` is rewritten in place by `spg_engine::substitute_placeholders`
/// using `params`; nothing is returned.
///
/// Errors from the underlying engine helper would only fire if
/// the bind-final stmt referenced a placeholder past the params
/// slice — and that case has already errored in the executor
/// above before this helper runs, so we discard the Result here.
fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
    // Deliberate discard: see the function docs for why an error here
    // cannot be the first report of a bad placeholder.
    let _ = spg_engine::substitute_placeholders(stmt, params);
}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::Write;
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Condvar, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
139/// v7.11.3 — wall-clock provider injected into every embedded
140/// `Engine`. Microseconds since the Unix epoch; clamps to
141/// `i64::MAX` if the system clock is far-future. Used by SQL's
142/// `NOW()` / `CURRENT_TIMESTAMP` / `CURRENT_DATE` rewrite layer
143/// so PG-idiomatic time queries work without the caller wiring
144/// their own clock.
145/// v7.36 (mailrs ask #4) — flatten an `EXPLAIN` QueryResult into
146/// the QUERY PLAN string lines. `EXPLAIN` always returns a single-
147/// column TEXT table; anything else is treated as no plan output.
148fn extract_query_plan_lines(result: QueryResult) -> Vec<String> {
149 match result {
150 QueryResult::Rows { rows, .. } => rows
151 .into_iter()
152 .filter_map(|r| {
153 r.values.into_iter().next().and_then(|v| match v {
154 Value::Text(s) => Some(s),
155 _ => None,
156 })
157 })
158 .collect(),
159 _ => Vec::new(),
160 }
161}
162
163fn wall_clock_micros() -> i64 {
164 SystemTime::now()
165 .duration_since(UNIX_EPOCH)
166 .map_or(0, |d| i64::try_from(d.as_micros()).unwrap_or(i64::MAX))
167}
168
169use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
170
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Kept private so callers can't mis-frame records; the v3 layout
// is the same the server uses, so a `spg-server` boot can read a
// database an embedded process wrote and vice versa.

/// High bit OR-ed into a record's leading `u32` length word (see
/// `encode_v3_auto_commit`, which sets it together with
/// [`WAL_V3_FLAG`]).
const WAL_V2_SENTINEL: u32 = 0x8000_0000;
/// Second-highest bit of the leading `u32` length word; set by
/// `encode_v3_auto_commit` alongside [`WAL_V2_SENTINEL`]. With both
/// flag bits in use the payload length occupies the low 30 bits.
const WAL_V3_FLAG: u32 = 0x4000_0000;
/// Type byte of a v3 auto-commit SQL record; it is covered by the
/// record CRC together with the SQL payload.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v7.18 — durability checkpoint marker stays at 0x02 (skipped on replay).
const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v7.18 PITR — auto-commit-sql record with appended (commit_lsn,
/// commit_unix_us) fields so replay can target a specific point in
/// time. Backward-compat: v3 records (type 0x01) keep working, the
/// envelope flag bits are unchanged. The new type byte is the
/// schema-version discriminator.
const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
/// v7.18 — sentinel for "no wall clock" inside a v4 record's
/// commit_unix_us slot. Restore-to-timestamp skips records with
/// this sentinel (no time anchor); LSN-based restore is
/// unaffected.
const WAL_V4_NO_CLOCK: i64 = i64::MIN;
/// v7.18 — extra header bytes after the type byte in a v4 record:
/// 8 bytes commit_lsn (u64 LE) + 8 bytes commit_unix_us (i64 LE).
const WAL_V4_EXTRA_HEADER: usize = 16;
/// v7.18 PITR — checkpoint anchor record written to the WAL *before*
/// the snapshot file replaces the on-disk catalog. Carries the
/// (lsn, ts, snapshot_path) triple so restore tooling can find the
/// matching base snapshot without scanning the filesystem. Replay
/// dispatch skips it (same as the v3 durability marker).
const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;

/// v7.21 (mailrs embed round-12 polish) — one COMMITted explicit
/// transaction, flushed atomically at COMMIT time. Payload = the
/// transaction's bind-final mutation statements joined with `";\n"`;
/// replay re-splits via [`split_statements`] and applies in order.
/// Same 16-byte (commit_lsn, commit_unix_us) prefix as the v4
/// auto-commit record. The record is CRC-framed like every other
/// record, so replay applies the whole transaction or — torn tail —
/// none of it; a transaction can never half-resurrect.
///
/// Why it exists: in-transaction mutations only touch the engine's
/// shadow catalog (`modified_catalog: false`), so the per-statement
/// auto-commit append never fired and a COMMIT followed by a crash
/// (no graceful Drop checkpoint) lost the transaction.
const WAL_V4_TYPE_TX_COMMIT_SQL: u8 = 0x12;

/// v7.34 (crash-recovery P0 #2) — row-level physical redo record. Same v4
/// envelope (lsn + ts + payload + CRC) but the payload is `encode_redo_log`
/// bytes, not SQL. Replay applies the physical [`RowChange`]s via
/// `Engine::apply_redo` instead of re-executing — O(changed rows), not the
/// O(records × catalog_rows) statement-replay that hung the mailrs P0.
const WAL_V5_TYPE_ROW_REDO: u8 = 0x13;
222
/// v7.34 (crash-recovery P0 #2) — opt-in row-level redo WAL records.
/// Default OFF during bringup; `SPG_WAL_ROW_REDO=1` makes mutating
/// statements log physical changes (0x13) instead of SQL, so crash
/// recovery applies them in O(changed rows) rather than re-executing in
/// O(records × catalog_rows) (the superlinear replay hang root-caused on
/// the mailrs P0). DDL still logs as SQL (hybrid log). When this returns
/// true, `open_path` arms the engine's redo capture.
///
/// Accepted "on" values are `1` and `true` (ASCII case-insensitive);
/// surrounding whitespace is ignored, matching how
/// `SPG_MAX_QUERY_BYTES` is parsed. Anything else — including an unset
/// or non-UTF-8 variable — reads as off.
fn row_redo_enabled() -> bool {
    std::env::var("SPG_WAL_ROW_REDO")
        .map(|raw| {
            let v = raw.trim();
            v == "1" || v.eq_ignore_ascii_case("true")
        })
        .unwrap_or(false)
}

/// v7.1 — auto-checkpoint threshold. Once the WAL grows past
/// this many bytes, the next successful `execute()` call ends
/// with a `checkpoint()` so the WAL stays bounded. Tunable via
/// `SPG_EMBEDDED_CHECKPOINT_BYTES` env.
///
/// Returns the env value when it parses as a non-zero `u64`
/// (surrounding whitespace ignored); otherwise the 4 MiB default.
fn default_checkpoint_threshold_bytes() -> u64 {
    std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES")
        .ok()
        .and_then(|s| s.trim().parse::<u64>().ok())
        // Zero would mean "checkpoint on every write"; treat it as unset.
        .filter(|&n| n > 0)
        .unwrap_or(4 * 1024 * 1024)
}
247
248/// v7.30.3 (mailrs round-26) — per-query byte budget on join/filter
249/// materialisation, default ON at 256 MiB for embed parity with the
250/// server's allocator-level `SPG_MAX_QUERY_BYTES` default. A fat
251/// backfill batch (1000 × full mail bodies) then errors with
252/// `QueryBytesExceeded` instead of walking the host into reclaim
253/// livelock. `SPG_MAX_QUERY_BYTES=0` disables; any other value
254/// overrides. NOT applied to the WAL-replay engine — replay must
255/// never fail on a tuning knob.
256fn engine_with_query_byte_budget(engine: Engine) -> Engine {
257 const DEFAULT_MAX_QUERY_BYTES: usize = 256 * 1024 * 1024;
258 match std::env::var("SPG_MAX_QUERY_BYTES")
259 .ok()
260 .and_then(|s| s.trim().parse::<usize>().ok())
261 {
262 Some(0) => engine,
263 Some(n) => engine.with_max_query_bytes(n),
264 None => engine.with_max_query_bytes(DEFAULT_MAX_QUERY_BYTES),
265 }
266}
267
268/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
269///
270/// ```text
271/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
272/// [u32 LE crc32 over (type_byte || sql_bytes)]
273/// [u8 type = 0x01]
274/// [sql bytes]
275/// ```
276fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
277 let payload = sql.as_bytes();
278 let mut crc_buf = Vec::with_capacity(1 + payload.len());
279 crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
280 crc_buf.extend_from_slice(payload);
281 let crc = spg_crypto::crc32::crc32(&crc_buf);
282 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
283 let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
284 out.extend_from_slice(&header);
285 out.extend_from_slice(&crc.to_le_bytes());
286 out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
287 out.extend_from_slice(payload);
288 out
289}
290
/// v7.20 P2 — WAL group-commit. N concurrent commits share one
/// fsync (the 4.2 ms p50 that profile_breakdown measured as
/// 99.2% of the durable write path).
///
/// Leader-follower protocol, same family as PG's group commit:
///
/// 1. `enqueue(record)` — called while the caller still holds
///    the engine's write lock. Appends the encoded record to the
///    shared buffer, returns a sequence ticket. O(memcpy).
/// 2. Caller RELEASES the engine write lock (the next writer's
///    mutation proceeds in parallel with this batch's fsync).
/// 3. `wait_flushed(seq)` — if nobody is flushing, the caller
///    elects itself leader: swaps the buffer out, writes +
///    fsyncs ONCE for every record in the batch, marks the
///    batch durable, wakes all followers. Otherwise it parks on
///    the condvar until a leader covers its seq.
///
/// Durability contract is unchanged from v7.19: `execute()`
/// does not return Ok until the record that describes its
/// mutation is fsynced. The only change is N callers sharing
/// one fsync instead of paying one each.
///
/// Lock order (deadlock-free): `state` then `file`; never the
/// reverse. The leader holds `file` WITHOUT `state` during IO so
/// enqueues continue while fsync runs.
#[derive(Debug)]
struct WalGroup {
    /// Pending batch plus the sequence / leader / failure bookkeeping;
    /// every field of [`WalGroupState`] is read and written under this
    /// lock.
    state: Mutex<WalGroupState>,
    /// Paired with `state`: followers park here until a flush
    /// completes, fails, or the leader flag is released.
    cond: std::sync::Condvar,
    /// Active chunk file handle. Separate lock from `state` so
    /// the leader's write+fsync doesn't block concurrent
    /// enqueues. Swapped by `checkpoint()` at rotation.
    file: Mutex<File>,
}
325
/// Mutable bookkeeping for a [`WalGroup`], guarded by its `state`
/// mutex. Tracks which enqueued records are already durable and
/// whether a leader is currently flushing.
#[derive(Debug)]
struct WalGroupState {
    /// Encoded records awaiting flush.
    buf: Vec<u8>,
    /// Monotonic enqueue counter (1-based).
    enqueued_seq: u64,
    /// Highest seq whose record is fsynced.
    flushed_seq: u64,
    /// True while some caller is inside the leader IO section.
    leader_active: bool,
    /// Sticky fatal error — a failed fsync poisons the WAL
    /// (loud, never silent). All current + future waiters error.
    failed: Option<String>,
    /// Bytes written to the active chunk since rotation —
    /// drives the auto-checkpoint trigger. Seeded from the
    /// `initial_len` passed to `WalGroup::new` and reset to zero by
    /// `rotate_file`.
    written_len: u64,
}
343
/// Ticket returned by the buffered write path; `wait()` blocks
/// until the record it covers is durable (or the WAL is
/// poisoned). Cheap to move across threads.
#[derive(Debug)]
pub struct WalTicket {
    /// The group-commit queue the record was enqueued on.
    group: Arc<WalGroup>,
    /// Sequence number to wait for; `wait()` passes it to
    /// `WalGroup::wait_flushed`.
    seq: u64,
}
352
/// v7.34 (crash-recovery P0 #2) — RAII reset for the WalGroup leader
/// flag. Electing a leader sets `leader_active = true` and releases the
/// state lock for the sleep+IO window; if a panic unwinds through that
/// window the flag would stay true and every follower would park forever
/// on the condvar — no one left to flush or wake them, the same
/// total-write hang an unclean stop causes, but self-inflicted. This
/// guard clears the flag and wakes the followers (so one re-elects) on
/// ANY drop, including a panic unwind; the normal path disarms it after
/// resetting the flag itself.
struct LeaderGuard<'a> {
    /// Group whose `leader_active` flag this guard is responsible for.
    group: &'a WalGroup,
    /// While true, dropping the guard resets the flag and notifies
    /// waiters; set to false once the leader has reset the flag itself.
    armed: bool,
}
366
367impl Drop for LeaderGuard<'_> {
368 fn drop(&mut self) {
369 if self.armed {
370 let mut g = self.group.state.lock().unwrap_or_else(|e| e.into_inner());
371 g.leader_active = false;
372 drop(g);
373 self.group.cond.notify_all();
374 }
375 }
376}
377
378impl WalGroup {
379 fn new(file: File, initial_len: u64) -> Self {
380 Self {
381 state: Mutex::new(WalGroupState {
382 buf: Vec::new(),
383 enqueued_seq: 0,
384 flushed_seq: 0,
385 leader_active: false,
386 failed: None,
387 written_len: initial_len,
388 }),
389 cond: std::sync::Condvar::new(),
390 file: Mutex::new(file),
391 }
392 }
393
394 /// Append `record` to the pending batch. Returns the seq the
395 /// caller must wait on. Called under the engine write lock —
396 /// keep it O(memcpy).
397 fn enqueue(&self, record: &[u8]) -> u64 {
398 let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
399 g.buf.extend_from_slice(record);
400 g.enqueued_seq += 1;
401 g.enqueued_seq
402 }
403
404 /// Block until `seq` is durable. Leader-follower: the first
405 /// arriving waiter flushes for everyone.
406 fn wait_flushed(&self, seq: u64) -> Result<(), EngineError> {
407 let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
408 loop {
409 if let Some(e) = &g.failed {
410 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
411 format!("WAL poisoned by earlier flush failure: {e}"),
412 )));
413 }
414 if g.flushed_seq >= seq {
415 return Ok(());
416 }
417 if !g.leader_active {
418 // Elect self leader.
419 g.leader_active = true;
420 drop(g);
421 // v7.34 — panic-safety: if anything below unwinds before
422 // `leader_active` is reset, this guard releases it +
423 // wakes a follower to re-elect (else all writers park
424 // forever). Disarmed on the normal path after the reset.
425 let mut leader_guard = LeaderGuard {
426 group: self,
427 armed: true,
428 };
429 // v7.20 — commit_delay (PG's same-named knob):
430 // before taking the batch, give in-flight
431 // writers a short window to enqueue so the
432 // shared fsync covers more commits. 150 µs costs
433 // ~3.5% on a solo 4.2 ms fsync but multiplies
434 // batch size under load. Tunable via
435 // SPG_COMMIT_DELAY_US (0 disables).
436 let delay = commit_delay_us();
437 if delay > 0 {
438 std::thread::sleep(std::time::Duration::from_micros(delay));
439 }
440 let (batch, flush_to) = {
441 let mut g2 = self.state.lock().unwrap_or_else(|e| e.into_inner());
442 (core::mem::take(&mut g2.buf), g2.enqueued_seq)
443 };
444 let io_result: std::io::Result<()> = (|| {
445 let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
446 f.write_all(&batch)?;
447 f.sync_data()
448 })();
449 g = self.state.lock().unwrap_or_else(|e| e.into_inner());
450 g.leader_active = false;
451 leader_guard.armed = false; // normal completion — disarm
452 match io_result {
453 Ok(()) => {
454 g.flushed_seq = flush_to;
455 g.written_len = g.written_len.saturating_add(batch.len() as u64);
456 }
457 Err(e) => {
458 g.failed = Some(e.to_string());
459 }
460 }
461 self.cond.notify_all();
462 //
463
464 // Loop continues: either our seq is now covered
465 // (leader path normally returns next iteration)
466 // or the error branch surfaces.
467 continue;
468 }
469 g = self.cond.wait(g).unwrap_or_else(|e| e.into_inner());
470 }
471 }
472
473 /// Drain the pending batch + flush synchronously. Caller must
474 /// guarantee no concurrent enqueues (checkpoint holds the
475 /// engine exclusively). Used before rotation so the marker
476 /// lands in the right chunk.
477 fn flush_now(&self) -> Result<(), EngineError> {
478 let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
479 if let Some(e) = &g.failed {
480 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
481 format!("WAL poisoned: {e}"),
482 )));
483 }
484 let batch = core::mem::take(&mut g.buf);
485 let flush_to = g.enqueued_seq;
486 if batch.is_empty() {
487 return Ok(());
488 }
489 drop(g);
490 let io: std::io::Result<()> = (|| {
491 let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
492 f.write_all(&batch)?;
493 f.sync_data()
494 })();
495 let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
496 match io {
497 Ok(()) => {
498 g.flushed_seq = flush_to;
499 g.written_len = g.written_len.saturating_add(batch.len() as u64);
500 self.cond.notify_all();
501 Ok(())
502 }
503 Err(e) => {
504 g.failed = Some(e.to_string());
505 self.cond.notify_all();
506 Err(io_err(e))
507 }
508 }
509 }
510
511 /// Swap the active chunk handle (rotation). Caller flushes
512 /// first; both locks taken in canonical order.
513 fn rotate_file(&self, new_file: File) {
514 let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
515 let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
516 *f = new_file;
517 g.written_len = 0;
518 }
519
520 fn written_len(&self) -> u64 {
521 let g = self.state.lock().unwrap_or_else(|e| e.into_inner());
522 g.written_len + g.buf.len() as u64
523 }
524}
525
526// ─────────────────────────────────────────────────────────────────────────────
527// CoW-2 (v7.34) — background-checkpoint worker.
528//
529// Splits checkpoint into two halves so the front-end pays only the cheap one:
530// • Capture (`Database::snapshot_checkpoint_job`) — under &mut self,
531// Arc-bump the catalog + cheap trailer/cold-segment clones + atomic
532// commit_lsn load. Front returns to caller in microseconds.
533// • Execute (`execute_checkpoint_job`, on the worker thread) — serialize
534// the snapshot, tmp+rename the db / manifest files (each fsynced via
535// the rename + dir-fsync), enqueue the v4 marker through the WalGroup
536// (which is already thread-safe so live commits interleave fine),
537// then rotate the chunk file.
538//
539// Replay floor is the marker LSN captured at front-end time. A crash any
540// time during the worker's sequence is safe: nothing past the previous
541// checkpoint's marker can have been forgotten until the new marker hits
542// the WAL, and live writes between the two go into the same chunk under
543// the old marker — replay re-applies them after restoring the (older)
544// snapshot. snapshot+manifest atomicity (D10) is unchanged from the sync
545// path — CoW-4 tightens it later.
546//
547// Single-instance: a state machine of {pending, inflight} so a new
548// trigger fires only when the worker is fully idle. Any sticky error
549// surfaces on the next `wait()`.
550
/// Everything the checkpoint worker needs to run one checkpoint
/// without touching the engine: captured by the front end, consumed
/// by `execute_checkpoint_job`.
#[derive(Debug)]
struct CheckpointJob {
    /// Captured engine state; the worker serializes it into the
    /// snapshot file.
    snapshot: spg_engine::EngineSnapshot,
    /// LSN recorded in the checkpoint marker; `marker_lsn + 1` is also
    /// used when naming the next WAL chunk.
    marker_lsn: u64,
    /// Destination of the snapshot file (written via a `.tmp` sibling
    /// and renamed into place); also passed into the marker record.
    db_path: PathBuf,
    /// Directory in which the next WAL chunk is created.
    wal_dir: PathBuf,
    /// WAL group the marker is enqueued on and whose file is rotated
    /// once the marker is flushed.
    wal: Arc<WalGroup>,
    /// Snapshot-time view of the cold-tier segment set. Carried into the
    /// worker so any concurrent `freeze_oldest_to_cold` after the trigger
    /// rides the *next* checkpoint's manifest — same staleness window
    /// the sync path already had.
    cold_segments: Vec<(u32, PathBuf)>,
    /// Shared with `PersistenceCtx` so the worker's chunk rotation is
    /// visible to subsequent diag / Drop introspection.
    current_chunk_path: Arc<Mutex<PathBuf>>,
}
567
/// Shared state between the front end and the checkpoint worker
/// thread, guarded by the mutex half of the `(Mutex, Condvar)` pair.
/// `Default` yields the idle state: nothing pending, nothing in
/// flight, no error, not shutting down.
#[derive(Debug, Default)]
struct CheckpointState {
    /// Set by the front when it has a job ready; cleared when the worker
    /// picks it up.
    pending: Option<CheckpointJob>,
    /// True while the worker is mid-execute. `pending.is_some() || inflight`
    /// defines "busy" for the trigger / wait predicate.
    inflight: bool,
    /// Sticky error from the worker's last failure. Cleared when surfaced
    /// to a `wait()` caller (or to the next `try_enqueue()`).
    last_error: Option<EngineError>,
    /// Drop signal — worker exits after the current job (or immediately if
    /// idle and no pending).
    shutdown: bool,
}
583
/// Owner of the background checkpoint thread. Dropping it signals
/// shutdown and joins the thread.
#[derive(Debug)]
struct CheckpointWorker {
    /// State + condvar shared with the worker thread.
    state: Arc<(Mutex<CheckpointState>, Condvar)>,
    /// Join handle of the worker thread; taken (left `None`) when the
    /// thread is joined on drop.
    handle: Option<JoinHandle<()>>,
}
589
590impl CheckpointWorker {
591 fn spawn() -> Self {
592 let state: Arc<(Mutex<CheckpointState>, Condvar)> =
593 Arc::new((Mutex::new(CheckpointState::default()), Condvar::new()));
594 let state_for_thread = Arc::clone(&state);
595 let handle = thread::Builder::new()
596 .name("spg-checkpoint".into())
597 .spawn(move || checkpoint_worker_loop(&state_for_thread))
598 .expect("spawn checkpoint worker");
599 Self {
600 state,
601 handle: Some(handle),
602 }
603 }
604
605 /// Try to enqueue a job. Returns `Ok(true)` if the worker accepted it,
606 /// `Ok(false)` if a job was already pending or in flight (skip — the
607 /// next trigger will pick up newer state). Surfaces any sticky error
608 /// from a previous run before considering the new job, so async paths
609 /// can't lose a failure indefinitely.
610 fn try_enqueue(&self, job: CheckpointJob) -> Result<bool, EngineError> {
611 let (lock, cond) = &*self.state;
612 let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
613 if let Some(e) = g.last_error.take() {
614 return Err(e);
615 }
616 if g.pending.is_some() || g.inflight {
617 return Ok(false);
618 }
619 g.pending = Some(job);
620 cond.notify_one();
621 Ok(true)
622 }
623
624 /// Block until the worker is idle (no pending, not in flight). Returns
625 /// any sticky error from the last run; clears it on the way out.
626 fn wait(&self) -> Result<(), EngineError> {
627 let (lock, cond) = &*self.state;
628 let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
629 while g.pending.is_some() || g.inflight {
630 g = cond.wait(g).unwrap_or_else(|e| e.into_inner());
631 }
632 match g.last_error.take() {
633 Some(e) => Err(e),
634 None => Ok(()),
635 }
636 }
637}
638
639impl Drop for CheckpointWorker {
640 fn drop(&mut self) {
641 {
642 let (lock, cond) = &*self.state;
643 let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
644 g.shutdown = true;
645 cond.notify_one();
646 }
647 if let Some(h) = self.handle.take() {
648 let _ = h.join();
649 }
650 }
651}
652
653fn checkpoint_worker_loop(state: &Arc<(Mutex<CheckpointState>, Condvar)>) {
654 let (lock, cond) = &**state;
655 loop {
656 let job = {
657 let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
658 while g.pending.is_none() && !g.shutdown {
659 g = cond.wait(g).unwrap_or_else(|e| e.into_inner());
660 }
661 if g.pending.is_none() {
662 // shutdown with no pending → exit cleanly.
663 return;
664 }
665 // Even on shutdown, drain the pending job first so the Drop-time
666 // final checkpoint is durable before exit.
667 let job = g.pending.take().expect("loop invariant");
668 g.inflight = true;
669 job
670 };
671 let result = execute_checkpoint_job(job);
672 {
673 let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
674 g.inflight = false;
675 if let Err(e) = result {
676 g.last_error = Some(e);
677 }
678 cond.notify_all();
679 }
680 }
681}
682
683fn execute_checkpoint_job(job: CheckpointJob) -> Result<(), EngineError> {
684 // 1. Serialize the captured snapshot. Heavy; this is the whole point
685 // of CoW — it runs off the engine borrow.
686 let snapshot = job.snapshot.serialize();
687 // 2. Snapshot tmp+rename. Atomic on POSIX; rename implicitly fsyncs
688 // the data the next directory walk sees.
689 let tmp = {
690 let mut t = job.db_path.clone();
691 let mut name = t
692 .file_name()
693 .map(std::ffi::OsStr::to_os_string)
694 .unwrap_or_default();
695 name.push(".tmp");
696 t.set_file_name(name);
697 t
698 };
699 std::fs::write(&tmp, &snapshot).map_err(io_err)?;
700 std::fs::rename(&tmp, &job.db_path).map_err(io_err)?;
701 // 3. Manifest tmp+rename (cold tier present).
702 if !job.cold_segments.is_empty() {
703 let snap_crc = spg_crypto::crc32::crc32(&snapshot);
704 let entries: Vec<ColdSegmentEntry> = job
705 .cold_segments
706 .iter()
707 .filter_map(|(segment_id, path)| {
708 let bytes = std::fs::read(path).ok()?;
709 Some(ColdSegmentEntry {
710 segment_id: *segment_id,
711 path: path.clone(),
712 crc32: spg_crypto::crc32::crc32(&bytes),
713 })
714 })
715 .collect();
716 let manifest = CatalogManifest {
717 catalog_crc32: snap_crc,
718 cold_segments: entries,
719 wal_baseline_offset: 0,
720 };
721 let m_bytes = manifest.serialize();
722 let m_path = spg_manifest_path(&job.db_path);
723 if let Some(dir) = m_path.parent() {
724 std::fs::create_dir_all(dir).map_err(io_err)?;
725 }
726 let m_tmp = {
727 let mut t = m_path.clone();
728 let mut name = t
729 .file_name()
730 .map(std::ffi::OsStr::to_os_string)
731 .unwrap_or_default();
732 name.push(".tmp");
733 t.set_file_name(name);
734 t
735 };
736 std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
737 std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
738 }
739 // 4. Enqueue the v4 checkpoint marker carrying the captured LSN. The
740 // WalGroup is thread-safe so a live commit can interleave — the
741 // marker's LSN, not its position in the chunk, anchors replay.
742 let marker_ts = wall_clock_micros();
743 let marker = encode_v4_checkpoint_marker(job.marker_lsn, marker_ts, &job.db_path);
744 job.wal.enqueue(&marker);
745 job.wal.flush_now()?;
746 // 5. Rotate the active chunk. New commits land in the fresh chunk;
747 // pre-marker history stays addressable in the old chunk for PITR /
748 // retention. The shared `current_chunk_path` is updated under its
749 // own lock before the WalGroup swap so diag readers never see a
750 // handle that no longer matches the recorded path.
751 let new_chunk_path = job
752 .wal_dir
753 .join(chunk_filename(marker_ts, job.marker_lsn + 1));
754 let new_handle = OpenOptions::new()
755 .create(true)
756 .append(true)
757 .read(true)
758 .open(&new_chunk_path)
759 .map_err(io_err)?;
760 fsync_dir(&job.wal_dir);
761 {
762 let mut p = job
763 .current_chunk_path
764 .lock()
765 .unwrap_or_else(|e| e.into_inner());
766 *p = new_chunk_path;
767 }
768 job.wal.rotate_file(new_handle);
769 Ok(())
770}
771
772impl WalTicket {
773 /// Block until the record this ticket covers is durable.
774 ///
775 /// Under `SPG_SYNCHRONOUS_COMMIT=off` this returns
776 /// immediately — the background flusher (or the next
777 /// checkpoint / clean shutdown) makes the record durable
778 /// within `SPG_WAL_WRITER_DELAY_MS`. Same contract as PG's
779 /// `synchronous_commit = off`.
780 ///
781 /// # Errors
782 /// Surfaces the leader's IO error if the batch flush failed
783 /// (the WAL is then poisoned for all subsequent writes).
784 pub fn wait(&self) -> Result<(), EngineError> {
785 if !synchronous_commit_on() {
786 return Ok(());
787 }
788 self.group.wait_flushed(self.seq)
789 }
790}
791
792/// v7.19 P3 — retention sweep loop. Runs in a dedicated thread
793/// spawned by `Database::open_path` when `SPG_PITR_RETENTION_HOURS`
794/// is set to a non-zero value. Wakes every
795/// `SPG_PITR_RETENTION_CHECK_SEC` (default 60 s), enumerates chunks
796/// under `wal_dir`, archives via `SPG_PITR_ARCHIVE_CMD` if set, and
797/// deletes anything older than `retention_hours`.
798///
799/// Loud-failure posture matches PG's `archive_command`: if the
800/// archive command returns non-zero, the chunk stays on disk and
801/// a warning prints to stderr. The retention sweep doesn't delete
802/// a chunk it failed to archive.
803fn retention_sweep_loop(
804 wal_dir: PathBuf,
805 retention_hours: u64,
806 check_interval: std::time::Duration,
807 archive_cmd: Option<String>,
808 shutdown: Arc<AtomicBool>,
809) {
810 while !shutdown.load(Ordering::SeqCst) {
811 if let Err(e) = retention_sweep_once(&wal_dir, retention_hours, archive_cmd.as_deref()) {
812 eprintln!("spg-embedded: retention sweep error: {e}");
813 }
814 // Sleep in short ticks so shutdown isn't blocked on a
815 // 60 s naptime when Drop signals.
816 let mut elapsed = std::time::Duration::ZERO;
817 let tick = std::time::Duration::from_millis(250);
818 while elapsed < check_interval {
819 if shutdown.load(Ordering::SeqCst) {
820 return;
821 }
822 std::thread::sleep(tick);
823 elapsed += tick;
824 }
825 }
826}
827
828/// v7.19 P3 — one retention sweep pass over `wal_dir`. Extracted
829/// from the loop so tests can drive it directly. Public so the
830/// e2e_pitr_retention integration test (and any future operator
831/// tooling that wants synchronous retention) can call it.
832pub fn retention_sweep_once(
833 wal_dir: &Path,
834 retention_hours: u64,
835 archive_cmd: Option<&str>,
836) -> std::io::Result<()> {
837 if !wal_dir.exists() {
838 return Ok(());
839 }
840 let now_us = wall_clock_micros();
841 let cutoff_us = (now_us as i128 - (retention_hours as i128 * 3_600 * 1_000_000)) as i64;
842 let chunks = sorted_wal_chunks(wal_dir)?;
843 for chunk in chunks {
844 // Don't sweep the most-recent chunk; it's the live one
845 // execute() is appending to. Compare against the largest
846 // filename-prefix unix_us.
847 let stem = match chunk.file_stem().and_then(|s| s.to_str()) {
848 Some(s) => s,
849 None => continue,
850 };
851 let chunk_us: i64 = stem
852 .split_once('_')
853 .and_then(|(prefix, _)| i64::from_str_radix(prefix, 16).ok())
854 .unwrap_or(0);
855 if chunk_us >= cutoff_us {
856 continue;
857 }
858 // Archive first if requested.
859 if let Some(cmd) = archive_cmd {
860 if !cmd.is_empty() {
861 let output = std::process::Command::new("sh")
862 .arg("-c")
863 .arg(cmd)
864 .arg("--")
865 .arg(&chunk)
866 .output()?;
867 if !output.status.success() {
868 eprintln!(
869 "spg-embedded: SPG_PITR_ARCHIVE_CMD failed for {} (exit {}); chunk stays on disk",
870 chunk.display(),
871 output.status.code().unwrap_or(-1)
872 );
873 continue;
874 }
875 }
876 }
877 // Delete the chunk + its sibling .checksum if present.
878 if let Err(e) = std::fs::remove_file(&chunk) {
879 eprintln!(
880 "spg-embedded: retention remove {} failed: {e}",
881 chunk.display()
882 );
883 continue;
884 }
885 let mut cs = chunk.clone();
886 let mut name = cs.file_name().map(|n| n.to_os_string()).unwrap_or_default();
887 name.push(".checksum");
888 cs.set_file_name(name);
889 let _ = std::fs::remove_file(&cs);
890 }
891 Ok(())
892}
893
/// v7.20 — width of the group-commit delay window in µs (the PG
/// `commit_delay` analogue). The flush leader sleeps for this long
/// before grabbing the batch, giving concurrent writers time to
/// join it. Read once from `SPG_COMMIT_DELAY_US` and cached for the
/// life of the process; unset or unparseable values fall back to
/// 150 µs, and `0` switches the delay off.
fn commit_delay_us() -> u64 {
    static CACHED: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *CACHED.get_or_init(|| match std::env::var("SPG_COMMIT_DELAY_US") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(150),
        Err(_) => 150,
    })
}
907
/// v7.20 — PG `synchronous_commit` analogue, read once from
/// `SPG_SYNCHRONOUS_COMMIT` and cached.
///
/// * `on` (the default, and the result for any value other than
///   the ones listed below): `execute()` blocks until its WAL
///   record is fsynced — zero-loss durability.
/// * `off` / `false` (case-insensitive) or `0`: `execute()` returns
///   after the in-memory mutation + WAL enqueue, and a background
///   flusher writes + fsyncs every `SPG_WAL_WRITER_DELAY_MS`
///   (default 200 ms — PG's `wal_writer_delay` default). The crash
///   window is up to one flush interval of confirmed-but-unsynced
///   commits, the same trade PG documents for this setting. Clean
///   shutdown (Drop / checkpoint) always flushes.
fn synchronous_commit_on() -> bool {
    static CACHED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *CACHED.get_or_init(|| match std::env::var("SPG_SYNCHRONOUS_COMMIT") {
        Ok(v) => {
            let disabled =
                v == "0" || v.eq_ignore_ascii_case("off") || v.eq_ignore_ascii_case("false");
            !disabled
        }
        // Unset (or not valid Unicode) keeps the safe default.
        Err(_) => true,
    })
}
925
/// v7.20 — cadence in milliseconds of the background WAL flusher
/// used under `SPG_SYNCHRONOUS_COMMIT=off` (PG `wal_writer_delay`).
/// Read once from `SPG_WAL_WRITER_DELAY_MS` and cached; unset,
/// unparseable, or zero values fall back to 200 ms.
fn wal_writer_delay_ms() -> u64 {
    static CACHED: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *CACHED.get_or_init(|| {
        let configured = std::env::var("SPG_WAL_WRITER_DELAY_MS")
            .ok()
            .and_then(|raw| raw.parse::<u64>().ok());
        match configured {
            Some(ms) if ms > 0 => ms,
            _ => 200,
        }
    })
}
938
/// Retention window in hours, read from `SPG_PITR_RETENTION_HOURS`
/// on every call (not cached). Unset or unparseable values yield
/// `0`.
fn pitr_retention_hours() -> u64 {
    std::env::var("SPG_PITR_RETENTION_HOURS")
        .map_or(0, |raw| raw.parse::<u64>().unwrap_or(0))
}
945
/// Retention sweep period in seconds, read from
/// `SPG_PITR_RETENTION_CHECK_SEC` on every call (not cached).
/// Unset, unparseable, or zero values fall back to 60 s.
fn pitr_retention_check_sec() -> u64 {
    let configured = std::env::var("SPG_PITR_RETENTION_CHECK_SEC")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured {
        Some(secs) if secs > 0 => secs,
        _ => 60,
    }
}
953
/// Archive command from `SPG_PITR_ARCHIVE_CMD`, read on every call.
/// Returns `None` when the variable is unset or empty, so callers
/// never see an empty command string.
fn pitr_archive_cmd() -> Option<String> {
    match std::env::var("SPG_PITR_ARCHIVE_CMD") {
        Ok(cmd) if !cmd.is_empty() => Some(cmd),
        _ => None,
    }
}
959
960/// v7.19 — replay every record from `wal_bytes` whose
961/// `commit_lsn` is strictly greater than `floor_lsn`. v3 records
962/// (no LSN) and v4 records with `commit_lsn <= floor_lsn` are
963/// skipped — the snapshot loaded ahead of this call already
964/// reflects them, and re-applying would DuplicateTable /
965/// double-insert. v3 records inside the legacy migration chunk
966/// always apply because the migration sets `floor_lsn = 0` and
967/// v3 records carry no LSN to compare; the pre-migration
968/// behaviour (every record replays) is what the migration
969/// preserves.
970///
971/// Returns the count of records successfully applied. Same
972/// torn-tail semantics as `replay_wal_into_engine`.
973fn replay_wal_filtered(
974 wal_bytes: &[u8],
975 engine: &mut Engine,
976 floor_lsn: u64,
977 quarantine: &mut Vec<QuarantinedStmt>,
978) -> Result<usize, String> {
979 let records = parse_wal_records(wal_bytes)?;
980 let mut applied = 0usize;
981 for r in &records {
982 // Skip markers + non-SQL records.
983 if r.type_byte == WAL_V3_TYPE_DURABILITY_CHECKPOINT
984 || r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER
985 {
986 continue;
987 }
988 // v4 SQL records carry an LSN. Apply iff strictly above
989 // the snapshot floor.
990 if r.type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL
991 || r.type_byte == WAL_V4_TYPE_TX_COMMIT_SQL
992 || r.type_byte == WAL_V5_TYPE_ROW_REDO
993 {
994 if let Some(lsn) = r.commit_lsn {
995 if lsn <= floor_lsn {
996 continue;
997 }
998 }
999 }
1000 // v7.34 (crash-recovery P0 #2) — row-level redo record: apply the
1001 // physical changes directly (O(changed rows)) instead of
1002 // re-executing SQL (the O(records × rows) statement-replay that
1003 // hung the mailrs P0). The payload is `encode_redo_log` bytes, not
1004 // SQL, so it never enters the from_utf8 / split_statements path.
1005 if r.type_byte == WAL_V5_TYPE_ROW_REDO {
1006 let changes = spg_storage::decode_redo_log(r.sql)
1007 .map_err(|e| format!("redo decode at offset {}: {e:?}", r.offset))?;
1008 engine
1009 .apply_redo(&changes)
1010 .map_err(|e| format!("redo apply at offset {}: {e:?}", r.offset))?;
1011 applied += 1;
1012 continue;
1013 }
1014 // v3 records (type 0x01, no LSN) always apply — the
1015 // legacy migration path is the only place they appear,
1016 // and floor_lsn=0 there.
1017 let sql = match std::str::from_utf8(r.sql) {
1018 Ok(s) => s,
1019 Err(e) => return Err(format!("non-UTF-8 SQL at offset {}: {e}", r.offset)),
1020 };
1021 // v7.21 — a tx-commit record carries the whole transaction
1022 // as a `";\n"`-joined script; auto-commit records are a
1023 // single statement, for which split_statements is a no-op.
1024 //
1025 // v7.30.1 (mailrs round-24 ask 2) — a statement the engine
1026 // REJECTS is quarantined, not fatal: "one statement failed
1027 // to replay" ≠ "the catalog is corrupt". Framing damage
1028 // (parse_wal_records / non-UTF-8 above) still errors — that
1029 // IS corruption. Subsequent statements of a tx script keep
1030 // applying: the bricking class is a no-op-at-runtime
1031 // statement that re-applies non-idempotently, and skipping
1032 // just it reconstructs the runtime state.
1033 for stmt in split_statements(sql) {
1034 if let Err(e) = engine.execute(stmt) {
1035 quarantine.push(QuarantinedStmt {
1036 offset: r.offset,
1037 sql: stmt.to_string(),
1038 error: format!("{e:?}"),
1039 });
1040 }
1041 }
1042 applied += 1;
1043 }
1044 Ok(applied)
1045}
1046
/// v7.30.1 (mailrs round-24 ask 2) — one statement that failed to
/// re-apply during boot replay. Kept for forensics in a
/// `quarantine-*.log` beside the WAL chunks; the boot continues.
struct QuarantinedStmt {
    /// Byte offset of the WAL record the statement came from. Every
    /// statement of a multi-statement record shares this offset.
    offset: usize,
    /// Text of the rejected statement.
    sql: String,
    /// `Debug` rendering of the error the engine returned.
    error: String,
}
1055
1056fn format_quarantine_line(q: &QuarantinedStmt) -> String {
1057 format!("offset {}: {}\n rejected: {}\n", q.offset, q.sql, q.error)
1058}
1059
/// v7.34 (crash-recovery P0 #2) — fsync a directory so the entry of
/// a newly created file is durable. `sync_data` on a chunk file
/// persists its bytes but NOT the parent directory entry naming it;
/// a power loss right after creating a fresh WAL chunk could lose
/// that entry and leave the chunk (and its committed records)
/// unreachable on restart.
///
/// Best-effort: open and sync failures are both ignored, so a
/// platform that rejects directory fsync is no worse off.
fn fsync_dir(dir: &Path) {
    let _ = File::open(dir).and_then(|handle| handle.sync_all());
}

/// v7.19 — WAL chunk filename format: `<unix_us>_<leading_lsn>.wal`.
/// Both parts are zero-padded 16-digit hex so the default
/// lexicographic sort matches numeric order, and the unix_us prefix
/// comes first so the on-disk listing is chronological too.
fn chunk_filename(unix_us: i64, leading_lsn: u64) -> String {
    // Pre-1970 timestamps shouldn't occur in practice; clamp them
    // to 0 so the zero-padded form stays sortable.
    let us = if unix_us < 0 { 0u64 } else { unix_us as u64 };
    format!("{us:016x}_{leading_lsn:016x}.wal")
}
1083
1084/// v7.19 — filename used for the legacy single-file WAL when
1085/// `open_path` migrates a v7.18-layout database into the new
1086/// chunk directory. Lexicographically smallest possible value
1087/// so subsequent chunks sort after it.
1088fn legacy_chunk_filename() -> String {
1089 chunk_filename(0, 0)
1090}
1091
1092/// CoW-4 (v7.34) — D10 fallback: read one cold-segment file and
1093/// hand its bytes to the catalog. The segment binary is self-validating
1094/// (magic + internal CRC32 via `OwnedSegment::from_bytes`), so we don't
1095/// need the manifest's `segment_crc32` to trust it. Returns `true` on a
1096/// successful attach (caller bumps `cold_segment_paths`), `false` on a
1097/// per-segment failure that is logged but doesn't abort boot.
1098fn attach_segment_from_disk(engine: &mut Engine, segment_id: u32, path: &Path) -> bool {
1099 if engine.catalog().cold_segment(segment_id).is_some() {
1100 return true;
1101 }
1102 let bytes = match std::fs::read(path) {
1103 Ok(b) => b,
1104 Err(e) => {
1105 eprintln!(
1106 "spg-embedded: cold-segment scan skip {}: read failed: {e}",
1107 path.display()
1108 );
1109 return false;
1110 }
1111 };
1112 let mut new_cat = engine.catalog().clone();
1113 if let Err(e) = new_cat.load_segment_bytes_at(segment_id, bytes) {
1114 eprintln!(
1115 "spg-embedded: cold-segment scan skip {}: parse/load failed: {e}",
1116 path.display()
1117 );
1118 return false;
1119 }
1120 engine.replace_catalog(new_cat);
1121 true
1122}
1123
1124/// CoW-4 (v7.34) — D10 + missing-manifest fallback: scan
1125/// `<db>.spg/segments/` for `seg_<id>.spg` files and attach any that
1126/// aren't already in `cold_segment_paths`. Closes the window where a
1127/// crash between snapshot rename and manifest rename leaves
1128/// post-checkpoint cold segments orphaned on disk (the snapshot's CRC
1129/// no longer matches the stale manifest, so the manifest path
1130/// silently dropped them). The segment parser self-verifies, so a
1131/// torn write surfaces as a per-segment skip, never silent corruption.
1132fn scan_cold_segments_dir(
1133 segments_dir: &Path,
1134 engine: &mut Engine,
1135 cold_segment_paths: &mut BTreeMap<u32, PathBuf>,
1136) {
1137 // v7.34.1 (mailrs prod report bug A): single-file catalogs (e.g.
1138 // `/data/spg/mailrs.spg` is a regular file, not the `<db>/<db>.spg`
1139 // layout this scan assumes) make the computed `<db>.spg/segments`
1140 // path traverse a file inode, which surfaces as ENOTDIR (`Not a
1141 // directory`, errno 20). Treat any non-directory state — absent,
1142 // file-in-the-way, stat-blocked — as "no segments to scan" and
1143 // silently return. The eprintln below only fires for the genuine
1144 // mid-walk read errors (permission flip, IO failure) that operators
1145 // need to see.
1146 if !segments_dir.is_dir() {
1147 return;
1148 }
1149 let read_dir = match std::fs::read_dir(segments_dir) {
1150 Ok(rd) => rd,
1151 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return,
1152 Err(e) => {
1153 eprintln!(
1154 "spg-embedded: cold-segment scan: cannot read {}: {e}",
1155 segments_dir.display()
1156 );
1157 return;
1158 }
1159 };
1160 for entry in read_dir.flatten() {
1161 let path = entry.path();
1162 // Only the canonical `seg_<id>.spg` form. `.tmp` half-renames
1163 // and unknown extensions are skipped — the segment writer's
1164 // tmp+rename pattern guarantees `.spg` files are either fully
1165 // written or absent.
1166 if path.extension().and_then(|s| s.to_str()) != Some("spg") {
1167 continue;
1168 }
1169 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1170 continue;
1171 };
1172 let Some(id_str) = stem.strip_prefix("seg_") else {
1173 continue;
1174 };
1175 let Ok(segment_id) = id_str.parse::<u32>() else {
1176 continue;
1177 };
1178 if cold_segment_paths.contains_key(&segment_id) {
1179 continue;
1180 }
1181 if attach_segment_from_disk(engine, segment_id, &path) {
1182 cold_segment_paths.insert(segment_id, path);
1183 }
1184 }
1185}
1186
/// v7.19 — collect every entry of `wal_dir` whose extension is
/// exactly `wal`, sorted by path. Thanks to the zero-padded chunk
/// filename format, that order is also chunk-creation order.
///
/// A missing directory yields an empty list; any other listing
/// error, including a failure on an individual entry, is returned.
fn sorted_wal_chunks(wal_dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let entries = match std::fs::read_dir(wal_dir) {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        listing => listing?,
    };
    let mut chunks = Vec::new();
    for entry in entries {
        let candidate = entry?.path();
        let is_wal = matches!(
            candidate.extension().and_then(|ext| ext.to_str()),
            Some("wal")
        );
        if is_wal {
            chunks.push(candidate);
        }
    }
    chunks.sort();
    Ok(chunks)
}
1207
1208/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
1209///
1210/// ```text
1211/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
1212/// [u32 LE crc32 over (type_byte || payload)]
1213/// [u8 type = 0x11]
1214/// payload:
1215/// [u64 LE checkpoint_lsn]
1216/// [i64 LE checkpoint_unix_us (WAL_V4_NO_CLOCK if no clock)]
1217/// [u16 LE snapshot_path_len]
1218/// [snapshot_path_bytes]
1219/// ```
1220///
1221/// `payload_len` covers only the payload — keeping the framing
1222/// uniform across v3 / v4 record types so torn-write detection in
1223/// `replay_wal_into_engine` stays trivial.
1224fn encode_v4_checkpoint_marker(
1225 checkpoint_lsn: u64,
1226 checkpoint_unix_us: i64,
1227 snapshot_path: &Path,
1228) -> Vec<u8> {
1229 let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
1230 let snap_payload = snapshot_bytes.as_bytes();
1231 let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
1232 let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
1233 payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
1234 payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
1235 payload.extend_from_slice(&snap_len_u16.to_le_bytes());
1236 payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
1237 let mut crc_buf = Vec::with_capacity(1 + payload.len());
1238 crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
1239 crc_buf.extend_from_slice(&payload);
1240 let crc = spg_crypto::crc32::crc32(&crc_buf);
1241 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
1242 let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
1243 out.extend_from_slice(&header);
1244 out.extend_from_slice(&crc.to_le_bytes());
1245 out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
1246 out.extend_from_slice(&payload);
1247 out
1248}
1249
1250/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
1251///
1252/// ```text
1253/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
1254/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
1255/// [u8 type = 0x10]
1256/// [u64 LE commit_lsn]
1257/// [i64 LE commit_unix_us (= WAL_V4_NO_CLOCK when no ClockFn)]
1258/// [sql bytes]
1259/// ```
1260///
1261/// `sql_len` field stays the SQL byte count — same shape as v3 — so
1262/// replay-buffer torn-write detection compares against
1263/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
1264/// readable by the same loop with their original 9-byte header
1265/// arithmetic.
1266fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1267 encode_v4_framed(
1268 WAL_V4_TYPE_AUTO_COMMIT_SQL,
1269 sql.as_bytes(),
1270 commit_lsn,
1271 commit_unix_us,
1272 )
1273}
1274
1275/// v7.21 — same envelope, `WAL_V4_TYPE_TX_COMMIT_SQL` type byte.
1276/// `script` = the transaction's statements joined with `";\n"`.
1277fn encode_v4_tx_commit(script: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1278 encode_v4_framed(
1279 WAL_V4_TYPE_TX_COMMIT_SQL,
1280 script.as_bytes(),
1281 commit_lsn,
1282 commit_unix_us,
1283 )
1284}
1285
1286/// v7.34 (crash-recovery P0 #2) — encode one row-level redo record. Same
1287/// v4 envelope + CRC, type byte 0x13; the payload is the
1288/// `encode_redo_log` bytes (physical changes) instead of SQL text, so
1289/// replay applies them in place of re-executing the statement.
1290fn encode_v5_row_redo(redo_bytes: &[u8], commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1291 encode_v4_framed(WAL_V5_TYPE_ROW_REDO, redo_bytes, commit_lsn, commit_unix_us)
1292}
1293
1294fn encode_v4_framed(
1295 type_byte: u8,
1296 payload: &[u8],
1297 commit_lsn: u64,
1298 commit_unix_us: i64,
1299) -> Vec<u8> {
1300 let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
1301 crc_buf.push(type_byte);
1302 crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
1303 crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
1304 crc_buf.extend_from_slice(payload);
1305 let crc = spg_crypto::crc32::crc32(&crc_buf);
1306 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
1307 let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
1308 out.extend_from_slice(&header);
1309 out.extend_from_slice(&crc.to_le_bytes());
1310 out.push(type_byte);
1311 out.extend_from_slice(&commit_lsn.to_le_bytes());
1312 out.extend_from_slice(&commit_unix_us.to_le_bytes());
1313 out.extend_from_slice(payload);
1314 out
1315}
1316
/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
/// Returns the count of records successfully applied. A truncated
/// trailing record (mid-write torn) is dropped silently — the
/// same recovery story `spg-server`'s boot path uses.
///
/// Unlike `replay_wal_filtered`, this loop applies every SQL record
/// regardless of LSN and treats an engine rejection as fatal.
///
/// # Errors
/// Returns a message (with the record's byte offset) when a payload
/// is not valid UTF-8, when the engine rejects a statement, or when
/// a v3-framed record carries a type byte this loop does not match.
///
/// NOTE(review): the CRC field of each record is skipped, not
/// verified, in this function, and `WAL_V5_TYPE_ROW_REDO` has no
/// arm here so such a record ends in the unknown-type error —
/// confirm callers only feed pre-v5 WAL bytes.
fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
    let mut applied = 0usize;
    // Byte offset of the record currently being decoded.
    let mut cur = 0usize;
    while cur < wal_bytes.len() {
        if wal_bytes.len() - cur < 4 {
            // Trailing partial header — torn write, drop and stop.
            break;
        }
        // The first u32 packs the payload length together with the
        // format flag bits.
        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
        // Strip whichever flag bits this framing version uses to
        // recover the plain payload length.
        let len_mask = if is_v3 {
            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
        } else {
            !WAL_V2_SENTINEL
        };
        let rec_len = (raw_len & len_mask) as usize;
        // Header size by framing version: 4 = length only; 8 adds
        // four more bytes (presumably the crc32); 9 adds the type
        // byte, read at `cur + 8` below.
        let header_len = if is_v3 {
            9
        } else if is_v2 {
            8
        } else {
            4
        };
        if wal_bytes.len() - cur < header_len + rec_len {
            // Torn record at the tail — drop, stop.
            break;
        }
        if is_v3 {
            let type_byte = wal_bytes[cur + 8];
            match type_byte {
                // Plain v3 SQL record: fall through to the shared
                // apply path after this `if`.
                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
                WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
                    // durability_checkpoint marker — skip, no SQL.
                    cur += header_len + rec_len;
                    continue;
                }
                WAL_V4_TYPE_CHECKPOINT_MARKER => {
                    // v7.18 PITR — checkpoint anchor, skip on replay
                    // (engine state past this point reflects the
                    // matching snapshot already loaded by the caller).
                    cur += header_len + rec_len;
                    continue;
                }
                WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL => {
                    // v7.18 PITR — a v4 record carries
                    // WAL_V4_EXTRA_HEADER bytes of
                    // (commit_lsn, commit_unix_us) between the type
                    // byte and the SQL payload. This loop only skips
                    // over them — the engine doesn't surface
                    // LSN/clock here. Restore tooling (spgctl) reads
                    // them via parse_wal_records instead.
                    //
                    // v7.21 — tx-commit records (0x12) carry a whole
                    // transaction as a `";\n"`-joined script;
                    // split_statements is a no-op on the single-
                    // statement auto-commit form.
                    let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
                    if wal_bytes.len() - cur < v4_total {
                        // Torn v4 record at the tail — drop, stop.
                        break;
                    }
                    let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
                    let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
                    let sql = std::str::from_utf8(sql_bytes)
                        .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
                    // The first rejected statement aborts the whole
                    // replay via `?`.
                    for stmt in split_statements(sql) {
                        engine.execute(stmt).map_err(|e| {
                            format!("WAL replay: apply {stmt:?} at offset {cur} rejected: {e:?}")
                        })?;
                    }
                    applied += 1;
                    cur += v4_total;
                    continue;
                }
                other => {
                    return Err(format!(
                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
                    ));
                }
            }
        }
        // Shared path for records without a type byte and for v3
        // auto-commit records: the payload is one SQL statement,
        // executed as-is (no split_statements).
        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
        let sql = std::str::from_utf8(sql_bytes)
            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
        engine
            .execute(sql)
            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
        applied += 1;
        cur += header_len + rec_len;
    }
    Ok(applied)
}
1413
/// v7.18 PITR — parsed WAL record, surfaced for restore / verify
/// tooling. The replay loop above doesn't expose LSN/timestamp;
/// `spgctl restore --to <timestamp>` and `spgctl verify` need them.
/// Returned offsets are byte-positions inside the WAL buffer.
#[derive(Debug, Clone)]
pub struct WalRecord<'a> {
    /// Byte offset in the WAL buffer where this record starts.
    pub offset: usize,
    /// Type byte: 0x01 = v3 auto-commit SQL, 0x02 = durability
    /// checkpoint marker, 0x10 = v4 auto-commit SQL, 0x11 = v4
    /// checkpoint marker, 0x12 = v4 tx-commit SQL script, 0x13 =
    /// v5 row-level redo. Legacy records that have no type byte on
    /// disk are reported as v3 auto-commit SQL.
    pub type_byte: u8,
    /// `Some(lsn)` for v4 / v5 records (for a v4 checkpoint marker
    /// this is the checkpoint LSN), `None` for v3 and older.
    pub commit_lsn: Option<u64>,
    /// `Some(unix_us)` for v4 / v5 records carrying a clock-set
    /// timestamp, `None` for v3 and older or for records explicitly
    /// written with `WAL_V4_NO_CLOCK` (sentinel for "no ClockFn at
    /// commit time").
    pub commit_unix_us: Option<i64>,
    /// Record payload as borrowed bytes. SQL text for the SQL record
    /// types; the `encode_redo_log` bytes for row-redo records; the
    /// snapshot path bytes for v4 checkpoint markers. Empty for
    /// durability markers.
    pub sql: &'a [u8],
}
1434
1435/// v7.18 PITR — iterate over `wal_bytes` yielding one `WalRecord`
1436/// per intact record. Torn-tail records terminate iteration
1437/// silently (same recovery story as `replay_wal_into_engine`).
1438/// Unknown type bytes inside a v3 envelope return `Err` so the
1439/// caller knows the WAL was written by a newer SPG.
1440pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
1441 let mut out = Vec::new();
1442 let mut cur = 0usize;
1443 while cur < wal_bytes.len() {
1444 if wal_bytes.len() - cur < 4 {
1445 break;
1446 }
1447 let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
1448 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
1449 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
1450 let len_mask = if is_v3 {
1451 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
1452 } else {
1453 !WAL_V2_SENTINEL
1454 };
1455 let rec_len = (raw_len & len_mask) as usize;
1456 let header_len = if is_v3 {
1457 9
1458 } else if is_v2 {
1459 8
1460 } else {
1461 4
1462 };
1463 if wal_bytes.len() - cur < header_len + rec_len {
1464 break;
1465 }
1466 if !is_v3 {
1467 // v1 / v2 records carry no type byte; treat as legacy
1468 // auto-commit SQL with no LSN/time.
1469 let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1470 out.push(WalRecord {
1471 offset: cur,
1472 type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
1473 commit_lsn: None,
1474 commit_unix_us: None,
1475 sql,
1476 });
1477 cur += header_len + rec_len;
1478 continue;
1479 }
1480 let type_byte = wal_bytes[cur + 8];
1481 match type_byte {
1482 WAL_V3_TYPE_AUTO_COMMIT_SQL => {
1483 let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1484 out.push(WalRecord {
1485 offset: cur,
1486 type_byte,
1487 commit_lsn: None,
1488 commit_unix_us: None,
1489 sql,
1490 });
1491 cur += header_len + rec_len;
1492 }
1493 WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
1494 out.push(WalRecord {
1495 offset: cur,
1496 type_byte,
1497 commit_lsn: None,
1498 commit_unix_us: None,
1499 sql: &[],
1500 });
1501 cur += header_len + rec_len;
1502 }
1503 WAL_V4_TYPE_CHECKPOINT_MARKER => {
1504 // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
1505 // We surface lsn + ts on the WalRecord; the path lives
1506 // in `sql` since the type byte already disambiguates
1507 // record meaning and adding a dedicated field would
1508 // bloat the iterator return type for every variant.
1509 if rec_len < 18 {
1510 return Err(format!(
1511 "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
1512 ));
1513 }
1514 let lsn = u64::from_le_bytes(
1515 wal_bytes[cur + header_len..cur + header_len + 8]
1516 .try_into()
1517 .unwrap(),
1518 );
1519 let ts_raw = i64::from_le_bytes(
1520 wal_bytes[cur + header_len + 8..cur + header_len + 16]
1521 .try_into()
1522 .unwrap(),
1523 );
1524 let path_len = u16::from_le_bytes(
1525 wal_bytes[cur + header_len + 16..cur + header_len + 18]
1526 .try_into()
1527 .unwrap(),
1528 ) as usize;
1529 if rec_len < 18 + path_len {
1530 return Err(format!(
1531 "WAL parse: checkpoint marker at offset {cur} truncated path"
1532 ));
1533 }
1534 let path_start = cur + header_len + 18;
1535 let path_bytes = &wal_bytes[path_start..path_start + path_len];
1536 let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1537 None
1538 } else {
1539 Some(ts_raw)
1540 };
1541 out.push(WalRecord {
1542 offset: cur,
1543 type_byte,
1544 commit_lsn: Some(lsn),
1545 commit_unix_us,
1546 sql: path_bytes,
1547 });
1548 cur += header_len + rec_len;
1549 }
1550 WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL | WAL_V5_TYPE_ROW_REDO => {
1551 let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
1552 if wal_bytes.len() - cur < v4_total {
1553 break;
1554 }
1555 let lsn = u64::from_le_bytes(
1556 wal_bytes[cur + header_len..cur + header_len + 8]
1557 .try_into()
1558 .unwrap(),
1559 );
1560 let ts_raw = i64::from_le_bytes(
1561 wal_bytes[cur + header_len + 8..cur + header_len + 16]
1562 .try_into()
1563 .unwrap(),
1564 );
1565 let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1566 None
1567 } else {
1568 Some(ts_raw)
1569 };
1570 let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
1571 let sql = &wal_bytes[sql_start..sql_start + rec_len];
1572 out.push(WalRecord {
1573 offset: cur,
1574 type_byte,
1575 commit_lsn: Some(lsn),
1576 commit_unix_us,
1577 sql,
1578 });
1579 cur += v4_total;
1580 }
1581 other => {
1582 return Err(format!(
1583 "WAL parse: unknown type byte {other:#04x} at offset {cur}"
1584 ));
1585 }
1586 }
1587 }
1588 Ok(out)
1589}
1590
/// v7.1 — answers "can the next `execute()` skip the WAL?" by
/// looking only at the statement's leading keyword (the text up to
/// the first whitespace, `;` or `(`, compared ASCII
/// case-insensitively).
///
/// Returns `true` for SELECT / SHOW / EXPLAIN / BEGIN / COMMIT /
/// ROLLBACK / WAIT / WITH and for the SPG-specific verbs that don't
/// go through the auto-commit record path on the server
/// (CHECKPOINT, COMPACT). Conservative otherwise: any keyword not
/// on that list — including an empty one — yields `false`, i.e.
/// "write a WAL record".
fn sql_is_read_only(sql: &str) -> bool {
    const READ_ONLY_VERBS: [&str; 10] = [
        "select",
        "show",
        "explain",
        "begin",
        "commit",
        "rollback",
        "checkpoint",
        "compact",
        "wait",
        "with",
    ];
    let ends_verb = |c: char| c.is_whitespace() || c == ';' || c == '(';
    let trimmed = sql.trim_start();
    let verb_len = trimmed.find(ends_verb).unwrap_or(trimmed.len());
    let verb = &trimmed[..verb_len];
    READ_ONLY_VERBS
        .iter()
        .any(|known| verb.eq_ignore_ascii_case(known))
}
1617
/// Embedded SPG database handle. Owns an `Engine` + provides
/// ergonomic wrappers around `execute` and `query`.
///
/// A handle is either in-memory only (`persistence` is `None`, the
/// v6.10.3 shape — nothing is written or fsynced) or backed by the
/// WAL / snapshot sidecar described on the `persistence` field.
#[derive(Debug)]
pub struct Database {
    /// The execution engine every statement is run against.
    engine: Engine,
    /// v7.1 — persistence sidecar. When `Some(p)`, every
    /// `execute(sql)` that mutates state appends a v4
    /// `auto_commit_sql` WAL record + fsyncs before the call
    /// returns; `Drop` writes a final catalog snapshot to
    /// `<db_path>` so the next session boots from a clean
    /// snapshot + an empty WAL. `None` = in-memory only (the
    /// v6.10.3 shape).
    persistence: Option<PersistenceCtx>,
    /// v7.18 PITR — monotonic per-database commit LSN. Increments
    /// before each successful WAL append; bootstrapped at
    /// open_path from `max(parse_wal_records → commit_lsn)` so
    /// reopen never reuses an LSN. In-memory databases start at
    /// 0 and never advance (no WAL = no LSN-meaningful records).
    commit_lsn: AtomicU64,
    /// v7.21 (round-12 polish) — explicit-transaction WAL buffer.
    /// `Some` between an engine-accepted BEGIN and its
    /// COMMIT / ROLLBACK on a persistent database. In-transaction
    /// mutations only touch the engine's shadow catalog and report
    /// `modified_catalog: false`, so the per-statement auto-commit
    /// append never fires for them; their bind-final SQL collects
    /// here instead and COMMIT flushes the lot as ONE atomic
    /// `WAL_V4_TYPE_TX_COMMIT_SQL` record (ROLLBACK just drops it).
    /// Always `None` for in-memory databases.
    tx_wal: Option<TxWalBuffer>,
}
1650
/// Explicit-transaction WAL buffer: the statements of the open
/// transaction plus its savepoint marks. See [`Database::tx_wal`]
/// for when it exists and how COMMIT / ROLLBACK consume it.
#[derive(Debug, Default)]
struct TxWalBuffer {
    /// Bind-final SQL of every non-read-only statement the engine
    /// accepted inside the open transaction, in execution order.
    statements: Vec<String>,
    /// `(savepoint_name, statements.len() at SAVEPOINT time)` —
    /// `ROLLBACK TO SAVEPOINT` truncates `statements` back to the
    /// recorded mark so the WAL record matches what the engine
    /// keeps. PG name-reuse semantics (latest wins).
    savepoints: Vec<(String, usize)>,
}
1663
/// Statement-level transaction-control classification for the WAL
/// buffer. Runs AFTER the engine accepted the statement, so the
/// engine stays the single validator — this only mirrors state.
/// Produced by `tx_control_kind` from the statement's leading words.
enum TxControl {
    /// `BEGIN` / `START ...` — a transaction opens.
    Begin,
    /// `COMMIT` / `END` — the transaction commits.
    Commit,
    /// `ROLLBACK` not followed by `TO` — the whole transaction aborts.
    Rollback,
    /// `ROLLBACK TO [SAVEPOINT] <name>`; carries the lowercased
    /// savepoint name.
    RollbackToSavepoint(String),
    /// `SAVEPOINT <name>`; carries the lowercased savepoint name.
    Savepoint(String),
    /// `RELEASE ...` — the savepoint name is not captured.
    ReleaseSavepoint,
}
1675
1676fn tx_control_kind(sql: &str) -> Option<TxControl> {
1677 let mut words = sql
1678 .split(|c: char| c.is_whitespace() || c == ';')
1679 .filter(|w| !w.is_empty())
1680 .map(str::to_ascii_lowercase);
1681 let head = words.next()?;
1682 match head.as_str() {
1683 "begin" | "start" => Some(TxControl::Begin),
1684 "commit" | "end" => Some(TxControl::Commit),
1685 "savepoint" => words.next().map(TxControl::Savepoint),
1686 "release" => Some(TxControl::ReleaseSavepoint),
1687 "rollback" => match words.next().as_deref() {
1688 // ROLLBACK TO [SAVEPOINT] <name>
1689 Some("to") => {
1690 let next = words.next()?;
1691 let name = if next == "savepoint" {
1692 words.next()?
1693 } else {
1694 next
1695 };
1696 Some(TxControl::RollbackToSavepoint(name))
1697 }
1698 _ => Some(TxControl::Rollback),
1699 },
1700 _ => None,
1701 }
1702}
1703
/// On-disk state owned by a persistent [`Database`]: file-system paths,
/// the shared group-commit WAL handle, and the handles of the background
/// threads spawned by `Database::open_path`. In-memory databases carry no
/// `PersistenceCtx` (`Database::persistence` is `None`).
#[derive(Debug)]
#[allow(dead_code)] // `wal_dir`/`current_chunk_path` are read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Path of the catalog snapshot file. `open_path` restores the engine
    /// from it when it exists; checkpoint jobs receive a clone of it.
    db_path: PathBuf,
    /// v7.19 — WAL chunk directory at `<db_path>.wal/`.
    /// Replaces the v7.18 single-file `<db_path>.wal` layout.
    /// Each chunk file inside is named
    /// `<unix_us>_<leading_lsn>.wal` (zero-padded to 16 digits
    /// so default-lex sort = LSN order).
    wal_dir: PathBuf,
    /// Path of the currently-open chunk file inside `wal_dir`.
    /// Rotated at checkpoint and whenever the chunk crosses
    /// `checkpoint_threshold_bytes`. CoW-2 (v7.34) wraps it in
    /// `Arc<Mutex<…>>` because the background-checkpoint worker
    /// performs the rotation; this struct keeps a clone so Drop /
    /// diag introspection still see the live path.
    current_chunk_path: Arc<Mutex<PathBuf>>,
    /// v7.19 P3 — retention sweeper shutdown flag. `Some` when
    /// `SPG_PITR_RETENTION_HOURS > 0` at open_path time; `None`
    /// when retention is disabled (the default; v7.18 behaviour
    /// preserved). The thread polls `wal_dir` every
    /// `SPG_PITR_RETENTION_CHECK_SEC` seconds, archives via
    /// `SPG_PITR_ARCHIVE_CMD` if set, then deletes chunks older
    /// than the retention window. Signalled to exit via
    /// `retention_shutdown` on Drop.
    retention_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-pitr-retention` thread; `Some` exactly when
    /// `retention_shutdown` is `Some`.
    retention_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 — shutdown flag of the background WAL flusher used for
    /// `SPG_SYNCHRONOUS_COMMIT=off`. `None` in the default
    /// synchronous mode. The flusher flushes the pending batch every
    /// `SPG_WAL_WRITER_DELAY_MS`; signalled + joined on Drop
    /// before the final checkpoint so clean shutdown never
    /// loses confirmed commits.
    flusher_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-wal-flusher` thread; `Some` exactly when
    /// `flusher_shutdown` is `Some`.
    flusher_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 P2 — group-commit WAL. Shared with WalTickets
    /// returned by the buffered write path so `wait()` can run
    /// after the engine write lock is released.
    wal: Arc<WalGroup>,
    /// WAL size (as reported by `wal.written_len()`) at or above which a
    /// write triggers a non-blocking checkpoint. Initialised from
    /// `default_checkpoint_threshold_bytes()`; overridable through
    /// `Database::set_checkpoint_threshold_bytes` (clamped to >= 1).
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — `<db_path>.spg/segments/` directory. Cold-tier
    /// segments produced by `freeze_oldest_to_cold` / compaction
    /// are persisted here as `seg_<id>.spg` files; the manifest
    /// at `<db_path>.spg/manifest.v10` records every active
    /// segment + its CRC32 so the next boot can verify + reload.
    cold_segments_dir: PathBuf,
    /// Segment id → on-disk path of every cold segment attached to the
    /// engine. Filled at boot (manifest entries + directory scan) and by
    /// `freeze_oldest_to_cold`; copied into each checkpoint job.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
    /// via `fs::create_dir` on `<db_path>.lock` at open_path
    /// entry; released on Drop by `fs::remove_dir`. Directory creation
    /// is atomic on every supported platform. A second process opening
    /// the same path while the first is still alive hits the
    /// create_dir failure and returns
    /// `EngineError::Unsupported("database is locked by another
    /// process: …")`. Stale locks (process crashed mid-session)
    /// must be cleared via `Database::force_unlock(path)` —
    /// SPG can't safely fingerprint who owned a stale directory
    /// without a libc dep, which would violate spg-embedded's
    /// zero-deps charter.
    lock_path: PathBuf,
    /// CoW-2 (v7.34) — background-checkpoint worker. `None` only
    /// transiently inside `Drop` after the worker has been signalled
    /// and joined. The worker carries Arc clones of `wal` and
    /// `current_chunk_path`, so it can rotate the active chunk and
    /// reflect the new path back here even after the front-end has
    /// returned to the caller.
    checkpoint_worker: Option<CheckpointWorker>,
}
1772
1773impl Database {
1774 /// Open a fresh in-memory database. No WAL, no catalog
1775 /// snapshot on disk — perfect for tests + short-lived
1776 /// CLI tools.
1777 #[must_use]
1778 pub fn open_in_memory() -> Self {
1779 Self {
1780 engine: engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros)),
1781 persistence: None,
1782 commit_lsn: AtomicU64::new(0),
1783 tx_wal: None,
1784 }
1785 }
1786
1787 /// v7.1 — Open or create a persistent database backed by
1788 /// the file at `db_path`. The WAL lives at `db_path` +
1789 /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
1790 /// path:
1791 ///
1792 /// 1. If `db_path` exists, restore the catalog snapshot.
1793 /// 2. If the WAL exists, replay every record into the
1794 /// restored engine — the same recovery story
1795 /// `spg-server` uses.
1796 /// 3. Open the WAL in append+sync mode so subsequent
1797 /// `execute()` writes durably commit (one fsync per
1798 /// mutation).
1799 ///
1800 /// `Drop` writes a final catalog snapshot + truncates the
1801 /// WAL — operators that need a sync barrier at a specific
1802 /// point use `checkpoint()` explicitly.
1803 pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
1804 let db_path = db_path.as_ref().to_path_buf();
1805 // v7.19 — WAL is a directory of chunk files. Legacy
1806 // single-file path stays variable-named `wal_path` for
1807 // the backward-compat migration block below.
1808 let wal_path = {
1809 let mut p = db_path.clone();
1810 let name = p
1811 .file_name()
1812 .map(|n| {
1813 let mut s = n.to_os_string();
1814 s.push(".wal");
1815 s
1816 })
1817 .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
1818 p.set_file_name(name);
1819 p
1820 };
1821 let wal_dir = wal_path.clone();
1822 if let Some(parent) = db_path.parent()
1823 && !parent.as_os_str().is_empty()
1824 {
1825 std::fs::create_dir_all(parent).map_err(io_err)?;
1826 }
1827 // v7.17.0 Phase 6.2 — acquire cross-process exclusion
1828 // lock before touching any catalog / WAL bytes. atomic
1829 // mkdir on every supported platform; a second process
1830 // opening the same path while the first is still alive
1831 // hits the create_dir failure and gets a clear error.
1832 let lock_path = {
1833 let mut p = db_path.clone();
1834 let name = p
1835 .file_name()
1836 .map(|n| {
1837 let mut s = n.to_os_string();
1838 s.push(".lock");
1839 s
1840 })
1841 .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1842 p.set_file_name(name);
1843 p
1844 };
1845 acquire_path_lock(&lock_path)?;
1846 let mut engine = if db_path.exists() {
1847 let bytes = std::fs::read(&db_path).map_err(io_err)?;
1848 let engine = Engine::restore_envelope(&bytes).map_err(|e| {
1849 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1850 "restore from {}: {e}",
1851 db_path.display()
1852 )))
1853 })?;
1854 engine_with_query_byte_budget(engine.with_clock(wall_clock_micros))
1855 } else {
1856 engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros))
1857 };
1858 // v7.1.4 — manifest-driven cold-segment reload. The
1859 // manifest sidecar pairs the catalog snapshot CRC with a
1860 // list of `(segment_id, path, crc32)` triples; verify
1861 // before loading so a torn or stale manifest doesn't
1862 // surface phantom data.
1863 let cold_segments_dir = {
1864 let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
1865 let stem = db_path
1866 .file_stem()
1867 .unwrap_or_else(|| std::ffi::OsStr::new("db"))
1868 .to_string_lossy()
1869 .into_owned();
1870 parent.join(format!("{stem}.spg")).join("segments")
1871 };
1872 let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
1873 let manifest_pth = spg_manifest_path(&db_path);
1874 if manifest_pth.exists() && db_path.exists() {
1875 let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
1876 if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
1877 let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
1878 let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
1879 if snap_crc == m.catalog_crc32 {
1880 for entry in &m.cold_segments {
1881 if let Ok(seg_bytes) = std::fs::read(&entry.path) {
1882 let computed = spg_crypto::crc32::crc32(&seg_bytes);
1883 if computed != entry.crc32 {
1884 eprintln!(
1885 "spg-embedded: manifest skip segment {}: CRC mismatch",
1886 entry.segment_id
1887 );
1888 continue;
1889 }
1890 if engine.catalog().cold_segment(entry.segment_id).is_some() {
1891 // Already loaded via Catalog::clone path (shouldn't happen
1892 // since Engine::new + restore_envelope don't populate cold).
1893 continue;
1894 }
1895 let mut new_cat = engine.catalog().clone();
1896 if let Err(e) =
1897 new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
1898 {
1899 eprintln!(
1900 "spg-embedded: manifest load segment {} failed: {e}",
1901 entry.segment_id
1902 );
1903 continue;
1904 }
1905 engine.replace_catalog(new_cat);
1906 cold_segment_paths.insert(entry.segment_id, entry.path.clone());
1907 } else {
1908 eprintln!(
1909 "spg-embedded: manifest skip segment {}: file unreadable",
1910 entry.segment_id
1911 );
1912 }
1913 }
1914 }
1915 }
1916 }
1917 // CoW-4 (v7.34) — D10 + missing-manifest fallback. Walk
1918 // `<db>.spg/segments/` and attach any `seg_<id>.spg` file that
1919 // the manifest didn't already cover (manifest absent / CRC
1920 // mismatched / a fresher freeze landed after the last
1921 // checkpoint wrote its manifest). The segment binary's own
1922 // magic + CRC32 guards integrity — no need to trust a stale
1923 // manifest entry to trust the file.
1924 scan_cold_segments_dir(&cold_segments_dir, &mut engine, &mut cold_segment_paths);
1925 // v7.19 — chunked WAL on-disk layout.
1926 //
1927 // Three cases handled here:
1928 //
1929 // 1. wal_dir exists as a DIRECTORY → scan its
1930 // `<unix_us>_<leading_lsn>.wal` chunks (sorted
1931 // lexicographically = chunk-creation order), replay
1932 // them in sequence, advance the LSN watermark to the
1933 // max commit_lsn seen.
1934 //
1935 // 2. wal_path exists as a FILE → legacy v7.18 layout.
1936 // Migrate it: create `wal_dir/`, move the single file
1937 // inside as `0000000000000000_0000000000000000.wal`,
1938 // then fall through to case 1's replay loop.
1939 //
1940 // 3. Neither exists → fresh database; create wal_dir.
1941 let mut initial_lsn: u64 = 0;
1942 if wal_path.is_file() {
1943 // Case 2: legacy single-file WAL migration.
1944 let legacy_bytes = std::fs::read(&wal_path).map_err(io_err)?;
1945 std::fs::remove_file(&wal_path).map_err(io_err)?;
1946 std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1947 if !legacy_bytes.is_empty() {
1948 let migrated = wal_dir.join(legacy_chunk_filename());
1949 std::fs::write(&migrated, &legacy_bytes).map_err(io_err)?;
1950 }
1951 } else if !wal_dir.exists() {
1952 // Case 3: fresh database.
1953 std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1954 }
1955 // Cases 1 + 2 share replay logic now that wal_dir is
1956 // guaranteed to exist (and may be empty for case 3).
1957 //
1958 // Two-pass replay so we don't double-apply records the
1959 // snapshot already reflects:
1960 //
1961 // 1. Find the highest commit_lsn carried by a
1962 // checkpoint_marker across all chunks. That LSN is the
1963 // snapshot's high-water mark — anything ≤ it is
1964 // already in `<db_path>` and replaying it would
1965 // DuplicateTable / double-insert.
1966 // 2. Replay only records strictly above that LSN.
1967 //
1968 // Case 2 migration (legacy single-file WAL) lands here
1969 // too: the migrated chunk has no marker so the LSN floor
1970 // is 0 and every record applies — exactly the v7.18
1971 // behaviour the migration is supposed to preserve.
1972 let chunk_paths = sorted_wal_chunks(&wal_dir).map_err(io_err)?;
1973 let mut snapshot_lsn: u64 = 0;
1974 for chunk in &chunk_paths {
1975 let bytes = std::fs::read(chunk).map_err(io_err)?;
1976 if let Ok(records) = parse_wal_records(&bytes) {
1977 for r in &records {
1978 if r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER {
1979 if let Some(l) = r.commit_lsn {
1980 if l > snapshot_lsn {
1981 snapshot_lsn = l;
1982 }
1983 }
1984 }
1985 }
1986 }
1987 }
1988 let mut quarantined: Vec<QuarantinedStmt> = Vec::new();
1989 for chunk in &chunk_paths {
1990 let bytes = std::fs::read(chunk).map_err(io_err)?;
1991 if bytes.is_empty() {
1992 continue;
1993 }
1994 replay_wal_filtered(&bytes, &mut engine, snapshot_lsn, &mut quarantined)
1995 .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
1996 if let Ok(records) = parse_wal_records(&bytes) {
1997 if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
1998 if max > initial_lsn {
1999 initial_lsn = max;
2000 }
2001 }
2002 }
2003 }
2004 // v7.30.1 (mailrs round-24 ask 2) — replay rejects no longer
2005 // brick the open. Persist the rejected statements beside the
2006 // WAL chunks for forensics and say so loudly; the boot
2007 // continues with every other record applied.
2008 if !quarantined.is_empty() {
2009 let mut body = String::new();
2010 for q in &quarantined {
2011 body.push_str(&format_quarantine_line(q));
2012 }
2013 let qpath = wal_dir.join(format!(
2014 "quarantine-{:016x}.log",
2015 wall_clock_micros().max(0) as u64
2016 ));
2017 match std::fs::write(&qpath, &body) {
2018 Ok(()) => eprintln!(
2019 "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
2020 forensics at {}",
2021 quarantined.len(),
2022 qpath.display()
2023 ),
2024 Err(e) => eprintln!(
2025 "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
2026 quarantine file write FAILED ({e}), entries follow:\n{body}",
2027 quarantined.len()
2028 ),
2029 }
2030 }
2031 // Open the "current" chunk — either the last existing
2032 // chunk file (so subsequent appends extend it until the
2033 // size threshold rotates) or a fresh first chunk.
2034 let now_us = wall_clock_micros();
2035 let current_chunk_path = if let Some(last) = chunk_paths.last() {
2036 last.clone()
2037 } else {
2038 wal_dir.join(chunk_filename(now_us, initial_lsn + 1))
2039 };
2040 let wal_file = OpenOptions::new()
2041 .create(true)
2042 .append(true)
2043 .read(true)
2044 .open(¤t_chunk_path)
2045 .map_err(io_err)?;
2046 // Persist the (possibly freshly created) chunk's directory entry.
2047 fsync_dir(&wal_dir);
2048 let wal_len = wal_file.metadata().map_err(io_err)?.len();
2049 let wal = Arc::new(WalGroup::new(wal_file, wal_len));
2050 // v7.19 P3 — spawn retention sweep thread when the
2051 // operator opted in via SPG_PITR_RETENTION_HOURS > 0.
2052 // Otherwise stay on the v7.18 behaviour (chunks accumulate
2053 // until something else — backup-pitr archival, manual
2054 // cleanup — moves them).
2055 let retention_hours = pitr_retention_hours();
2056 let (retention_shutdown, retention_thread) = if retention_hours > 0 {
2057 let shutdown = Arc::new(AtomicBool::new(false));
2058 let shutdown_clone = Arc::clone(&shutdown);
2059 let wal_dir_clone = wal_dir.clone();
2060 let check_interval = std::time::Duration::from_secs(pitr_retention_check_sec());
2061 let archive_cmd = pitr_archive_cmd();
2062 let handle = std::thread::Builder::new()
2063 .name("spg-pitr-retention".into())
2064 .spawn(move || {
2065 retention_sweep_loop(
2066 wal_dir_clone,
2067 retention_hours,
2068 check_interval,
2069 archive_cmd,
2070 shutdown_clone,
2071 );
2072 })
2073 .map_err(io_err)?;
2074 (Some(shutdown), Some(handle))
2075 } else {
2076 (None, None)
2077 };
2078 // v7.20 — background flusher for SPG_SYNCHRONOUS_COMMIT=off.
2079 let (flusher_shutdown, flusher_thread) = if synchronous_commit_on() {
2080 (None, None)
2081 } else {
2082 let shutdown = Arc::new(AtomicBool::new(false));
2083 let shutdown_clone = Arc::clone(&shutdown);
2084 let group = Arc::clone(&wal);
2085 let interval = std::time::Duration::from_millis(wal_writer_delay_ms());
2086 let handle = std::thread::Builder::new()
2087 .name("spg-wal-flusher".into())
2088 .spawn(move || {
2089 while !shutdown_clone.load(Ordering::SeqCst) {
2090 std::thread::sleep(interval);
2091 if let Err(e) = group.flush_now() {
2092 eprintln!("spg-embedded: background WAL flush failed: {e:?}");
2093 }
2094 }
2095 // Final drain on shutdown signal.
2096 let _ = group.flush_now();
2097 })
2098 .map_err(io_err)?;
2099 (Some(shutdown), Some(handle))
2100 };
2101 // v7.34 (crash-recovery P0 #2) — arm row-level redo capture for
2102 // subsequent writes (AFTER replay, so re-executed SQL records
2103 // don't capture; 0x13 records replay via apply_redo and never do).
2104 if row_redo_enabled() {
2105 engine.set_redo_capture(true);
2106 }
2107 Ok(Self {
2108 engine,
2109 commit_lsn: AtomicU64::new(initial_lsn),
2110 tx_wal: None,
2111 persistence: Some(PersistenceCtx {
2112 db_path,
2113 wal_dir,
2114 current_chunk_path: Arc::new(Mutex::new(current_chunk_path)),
2115 wal,
2116 checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
2117 cold_segments_dir,
2118 cold_segment_paths,
2119 lock_path,
2120 retention_shutdown,
2121 retention_thread,
2122 flusher_shutdown,
2123 flusher_thread,
2124 checkpoint_worker: Some(CheckpointWorker::spawn()),
2125 }),
2126 })
2127 }
2128
2129 /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
2130 /// hot tier into a brand-new cold-tier segment + persist
2131 /// it to disk. Same semantics as `spg-server`'s freezer
2132 /// thread; embedded just runs the freeze synchronously on
2133 /// the caller's thread. Persistence + manifest update
2134 /// happen as part of the next `checkpoint()` (or on Drop).
2135 pub fn freeze_oldest_to_cold(
2136 &mut self,
2137 table_name: &str,
2138 index_name: &str,
2139 max_rows: usize,
2140 ) -> Result<spg_storage::FreezeReport, EngineError> {
2141 let report = self
2142 .engine
2143 .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
2144 if let Some(p) = &mut self.persistence {
2145 std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
2146 let final_path = p
2147 .cold_segments_dir
2148 .join(format!("seg_{}.spg", report.segment_id));
2149 let tmp_path = p
2150 .cold_segments_dir
2151 .join(format!("seg_{}.spg.tmp", report.segment_id));
2152 std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
2153 std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
2154 p.cold_segment_paths.insert(report.segment_id, final_path);
2155 }
2156 Ok(report)
2157 }
2158
2159 /// v7.1 — override the auto-checkpoint WAL-size ceiling for
2160 /// this `Database` instance. Default is
2161 /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
2162 /// setter wins. No-op when the database is in-memory.
2163 pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
2164 if let Some(p) = &mut self.persistence {
2165 p.checkpoint_threshold_bytes = bytes.max(1);
2166 }
2167 }
2168
2169 /// v7.31 (memory campaign, round-26 ask 1/ask 4) — per-bucket
2170 /// memory snapshot for the embedding host. Poll it from prod to
2171 /// see where resident bytes live (rows / representation /
2172 /// indexes per table) and to drive host-side shedding before
2173 /// the kernel does it. Same numbers as the server path's
2174 /// `SELECT * FROM spg_memory_stats`.
2175 #[must_use]
2176 pub fn memory_stats(&self) -> spg_engine::MemoryStats {
2177 let mut stats = self.engine.memory_stats();
2178 // v7.31 C2 — fill in bucket D: the engine leaves `wal_bytes`
2179 // None (it has no WAL); we report the live (uncheckpointed)
2180 // WAL footprint via the same `written_len()` meter `metrics()`
2181 // reads. In-memory databases have no persistence → stays None.
2182 if let Some(p) = &self.persistence {
2183 stats.wal_bytes = Some(p.wal.written_len());
2184 }
2185 stats
2186 }
2187
2188 /// v7.1 — flush a fresh catalog snapshot to `db_path` and
2189 /// rotate the WAL. Idempotent; cheap when nothing has happened
2190 /// since the last checkpoint. No-op when the database is in-memory.
2191 ///
2192 /// CoW-2 (v7.34): the heavy half (serialize + tmp+rename + fsync +
2193 /// marker enqueue + chunk rotation) runs on a dedicated worker thread
2194 /// so the caller's engine borrow is released after the cheap capture
2195 /// step. This entry point keeps the **synchronous** contract — it
2196 /// waits for the worker to finish before returning — so existing
2197 /// callers, tests, and operator scripts see no behaviour change;
2198 /// they just pay one extra hop. The non-blocking variant lives at
2199 /// `trigger_checkpoint`, used by the auto-checkpoint hot path so
2200 /// the write that crossed `SPG_EMBEDDED_CHECKPOINT_BYTES` doesn't
2201 /// stall on disk IO.
2202 ///
2203 /// Called automatically when:
2204 /// - the WAL grows past `SPG_EMBEDDED_CHECKPOINT_BYTES` (default
2205 /// 4 MiB) at the end of an `execute()` (via `trigger_checkpoint`,
2206 /// non-blocking), and
2207 /// - `Drop` runs (synchronous; best-effort, failures logged).
2208 pub fn checkpoint(&mut self) -> Result<(), EngineError> {
2209 if self.persistence.is_none() {
2210 return Ok(());
2211 }
2212 // Drain any prior async checkpoint first so our snapshot reflects
2213 // post-it state (and so a sticky error from it surfaces here, not
2214 // smeared across the next two `wait`s).
2215 self.wait_checkpoint()?;
2216 let Some(job) = self.snapshot_checkpoint_job() else {
2217 return Ok(());
2218 };
2219 let Some(worker) = self
2220 .persistence
2221 .as_ref()
2222 .and_then(|p| p.checkpoint_worker.as_ref())
2223 else {
2224 return Ok(());
2225 };
2226 // `wait_checkpoint` above guaranteed idle; `try_enqueue` only
2227 // returns Ok(false) when busy, so we expect Ok(true) here. The
2228 // bool is dropped — we wait unconditionally to honour the sync
2229 // contract.
2230 let _ = worker.try_enqueue(job)?;
2231 self.wait_checkpoint()
2232 }
2233
2234 /// CoW-2 (v7.34) — non-blocking checkpoint trigger used by the
2235 /// auto-checkpoint hot path (`wal_after_ok` over the threshold).
2236 /// Captures the engine state under `&mut self` then signals the
2237 /// background worker and returns; the serialize / fsync / rotate
2238 /// sequence runs on the worker thread. If a checkpoint is already
2239 /// pending or in flight, the new trigger is silently dropped —
2240 /// the next threshold crossing picks up the newer state.
2241 ///
2242 /// Sticky errors from a prior async run surface here (via
2243 /// `try_enqueue`), so a failed background checkpoint still reaches
2244 /// the caller eventually rather than vanishing.
2245 fn trigger_checkpoint(&mut self) -> Result<(), EngineError> {
2246 if self.persistence.is_none() {
2247 return Ok(());
2248 }
2249 let Some(job) = self.snapshot_checkpoint_job() else {
2250 return Ok(());
2251 };
2252 let Some(worker) = self
2253 .persistence
2254 .as_ref()
2255 .and_then(|p| p.checkpoint_worker.as_ref())
2256 else {
2257 return Ok(());
2258 };
2259 let _accepted = worker.try_enqueue(job)?;
2260 Ok(())
2261 }
2262
2263 /// CoW-2 (v7.34) — block until the background checkpoint worker is
2264 /// idle. Used by sync `checkpoint()` and by Drop to ensure the final
2265 /// snapshot is durable before the process exits.
2266 fn wait_checkpoint(&self) -> Result<(), EngineError> {
2267 match self
2268 .persistence
2269 .as_ref()
2270 .and_then(|p| p.checkpoint_worker.as_ref())
2271 {
2272 Some(w) => w.wait(),
2273 None => Ok(()),
2274 }
2275 }
2276
2277 /// CoW-2 (v7.34) — capture a checkpoint job under `&mut self` (or
2278 /// `&self`, since reading from atomics + cheap clones don't mutate).
2279 /// Returns `None` if the database is in-memory.
2280 fn snapshot_checkpoint_job(&self) -> Option<CheckpointJob> {
2281 let p = self.persistence.as_ref()?;
2282 Some(CheckpointJob {
2283 snapshot: self.engine.snapshot_data(),
2284 marker_lsn: self.commit_lsn.load(Ordering::SeqCst),
2285 db_path: p.db_path.clone(),
2286 wal_dir: p.wal_dir.clone(),
2287 wal: Arc::clone(&p.wal),
2288 cold_segments: p
2289 .cold_segment_paths
2290 .iter()
2291 .map(|(&id, path)| (id, path.clone()))
2292 .collect(),
2293 current_chunk_path: Arc::clone(&p.current_chunk_path),
2294 })
2295 }
2296
2297 /// Restore a database from a previously-captured catalog
2298 /// snapshot. Pairs with `Database::snapshot()` for
2299 /// round-tripping in-memory state without going through
2300 /// the `spg-server` WAL.
2301 pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
2302 let engine = Engine::restore_envelope(snapshot).map_err(|e| {
2303 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
2304 })?;
2305 Ok(Self {
2306 engine,
2307 persistence: None,
2308 commit_lsn: AtomicU64::new(0),
2309 tx_wal: None,
2310 })
2311 }
2312
/// Take a catalog snapshot suitable for `Database::restore`.
/// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
/// + version + payload); round-trips through every released
/// SPG version per the STABILITY contract.
///
/// Delegates directly to the engine; no WAL or on-disk state is
/// touched by this call.
#[must_use]
pub fn snapshot(&self) -> Vec<u8> {
    self.engine.snapshot()
}
2321
2322 /// v7.36 (mailrs ask #4) — programmatic `EXPLAIN` over `sql`,
2323 /// returning each line of the QUERY PLAN as an owned `String`.
2324 /// Skips the WAL (`EXPLAIN` is read-only) and runs against the
2325 /// engine's live catalog. Dogfood callers can attach the plan
2326 /// to a report or assert on its shape from a test without
2327 /// having to parse a tabular result themselves.
2328 ///
2329 /// `sql` is the inner SELECT (no `EXPLAIN` prefix); the helper
2330 /// adds it. For SQL with `$N` placeholders, substitute them
2331 /// into the SQL string before calling — programmatic
2332 /// placeholder-aware EXPLAIN is on the v7.37 plan.
2333 ///
2334 /// # Errors
2335 /// Propagates parse errors on `sql`, plus any engine error the
2336 /// `EXPLAIN` itself raises (table not found, column not found).
2337 pub fn explain(&self, sql: &str) -> Result<Vec<String>, EngineError> {
2338 let full = format!("EXPLAIN {sql}");
2339 let result = self.engine.execute_readonly(&full)?;
2340 Ok(extract_query_plan_lines(result))
2341 }
2342
2343 /// Write-side single-statement execute. Runs the SQL through
2344 /// the buffered group-commit pipeline and blocks until the
2345 /// resulting batch's WAL fsync returns. Read-only statements
2346 /// (SELECT / SHOW / EXPLAIN / BEGIN-COMMIT-ROLLBACK /
2347 /// CHECKPOINT / COMPACT etc.) skip the WAL entirely.
2348 pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
2349 // v7.20 P2 — single-caller convenience over the buffered
2350 // path: enqueue + immediately wait. Batch size is 1 here,
2351 // so the durability behaviour (one fsync before Ok) is
2352 // identical to v7.19. Concurrent callers go through
2353 // `execute_buffered` (AsyncDatabase does) and share the
2354 // leader's fsync.
2355 let (result, ticket) = self.execute_buffered(sql)?;
2356 if let Some(t) = ticket {
2357 t.wait()?;
2358 }
2359 Ok(result)
2360 }
2361
2362 /// v7.20 P2 — group-commit write entry. Runs the engine
2363 /// mutation + encodes/enqueues the WAL record, then RETURNS
2364 /// WITHOUT waiting for the fsync. The caller must call
2365 /// [`WalTicket::wait`] before treating the write as durable
2366 /// — crucially, the caller can (and should) drop whatever
2367 /// lock guards this `Database` first, so the next writer's
2368 /// mutation overlaps this batch's fsync.
2369 ///
2370 /// `None` ticket = nothing hit the WAL (read-only statement,
2371 /// no-op DDL, or in-memory database) — the result is final
2372 /// as returned.
2373 ///
2374 /// # Errors
2375 /// Engine errors propagate unchanged. Auto-checkpoint (when
2376 /// the active chunk crosses the threshold) runs inline and
2377 /// may surface IO errors.
2378 pub fn execute_buffered(
2379 &mut self,
2380 sql: &str,
2381 ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2382 let result = self.engine.execute(sql)?;
2383 let modified = matches!(
2384 &result,
2385 QueryResult::CommandOk {
2386 modified_catalog: true,
2387 ..
2388 }
2389 );
2390 let ticket = self.wal_after_ok(sql, modified)?;
2391 Ok((result, ticket))
2392 }
2393
/// v7.21 (round-12 polish) — post-engine WAL bookkeeping shared
/// by the simple ([`Self::execute_buffered`]) and prepared
/// ([`Self::execute_prepared_buffered`]) write paths. `canonical`
/// is the replay text (bind-final for prepared statements);
/// `modified_catalog` comes from the engine result. Three routes:
///
/// - transaction control → maintain [`Self::tx_wal`]: BEGIN opens
///   the buffer, COMMIT flushes it as ONE atomic
///   `WAL_V4_TYPE_TX_COMMIT_SQL` record, ROLLBACK drops it,
///   SAVEPOINT / ROLLBACK TO mark / truncate it. The engine has
///   already accepted the statement, so this only mirrors state.
/// - inside an open transaction → buffer the statement (shadow-
///   catalog mutations report `modified_catalog: false`, so the
///   auto-commit arm below can't see them).
/// - auto-commit mutation → classic per-statement v4 record.
///
/// v7.18 PITR — v4 records carry commit LSN + wall-clock micros.
/// The crash window remains one BATCH: replay re-applies
/// idempotently exactly as before, and a torn batch tail drops
/// cleanly (same torn-write handling).
///
/// Returns `Ok(None)` when nothing was enqueued (in-memory database,
/// read-only statement, statement buffered in a transaction, empty
/// COMMIT). Otherwise returns the ticket for the enqueued record.
///
/// # Errors
/// Only the checkpoint trigger at the end can fail; note that in that
/// case the record has already been enqueued and the ticket is dropped.
fn wal_after_ok(
    &mut self,
    canonical: &str,
    modified_catalog: bool,
) -> Result<Option<WalTicket>, EngineError> {
    // In-memory databases never log and never track `tx_wal`.
    if self.persistence.is_none() {
        return Ok(None);
    }
    // At most one WAL record results from a single statement.
    let mut record = None;
    match tx_control_kind(canonical) {
        Some(TxControl::Begin) => {
            // Start mirroring the transaction with an empty buffer.
            self.tx_wal = Some(TxWalBuffer::default());
        }
        Some(TxControl::Commit) => {
            // `take()` closes the mirrored transaction either way; a
            // record is produced only if something was buffered.
            if let Some(buf) = self.tx_wal.take()
                && !buf.statements.is_empty()
            {
                let script = buf.statements.join(";\n");
                // The whole transaction consumes a single LSN.
                let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
                record = Some(encode_v4_tx_commit(&script, lsn, wall_clock_micros()));
            }
        }
        Some(TxControl::Rollback) => {
            // Nothing buffered ever reaches the WAL.
            self.tx_wal = None;
        }
        Some(TxControl::Savepoint(name)) => {
            if let Some(buf) = &mut self.tx_wal {
                // PG name-reuse semantics: latest mark wins.
                buf.savepoints.retain(|(n, _)| n != &name);
                let mark = buf.statements.len();
                buf.savepoints.push((name, mark));
            }
        }
        Some(TxControl::RollbackToSavepoint(name)) => {
            if let Some(buf) = &mut self.tx_wal
                && let Some(pos) = buf.savepoints.iter().position(|(n, _)| n == &name)
            {
                let mark = buf.savepoints[pos].1;
                buf.statements.truncate(mark);
                // Later savepoints die with the rollback; the
                // target itself survives (PG keeps it
                // re-rollbackable).
                buf.savepoints.truncate(pos + 1);
            }
        }
        Some(TxControl::ReleaseSavepoint) => {
            // RELEASE folds the savepoint into the enclosing tx —
            // buffered statements stay. The mark also stays:
            // marks are only consulted by ROLLBACK TO, which the
            // engine validates first, so a dangling mark is
            // unreachable.
        }
        None => {
            if let Some(buf) = &mut self.tx_wal {
                // Inside an open transaction: buffer instead of logging.
                // NOTE(review): `take_redo()` is not called on this
                // route — confirm the engine does not accumulate row
                // redo for in-transaction statements that a later
                // auto-commit record would then pick up.
                if !sql_is_read_only(canonical) {
                    buf.statements.push(canonical.to_string());
                }
            } else if modified_catalog && !sql_is_read_only(canonical) {
                let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
                // v7.34 (crash-recovery P0 #2) — hybrid log: when
                // row-level redo is on and this statement produced row
                // changes (DML), write a physical 0x13 redo record so
                // replay applies it directly. A statement with no row
                // changes (DDL: CREATE/ALTER, never goes through
                // Table::insert/update/delete) drains an empty redo and
                // keeps the SQL record so the schema still replays.
                let redo = if row_redo_enabled() {
                    self.engine.take_redo()
                } else {
                    Vec::new()
                };
                record = Some(if redo.is_empty() {
                    encode_v4_auto_commit(canonical, lsn, wall_clock_micros())
                } else {
                    encode_v5_row_redo(
                        &spg_storage::encode_redo_log(&redo),
                        lsn,
                        wall_clock_micros(),
                    )
                });
            }
        }
    }
    let mut ticket = None;
    if let Some(record) = record {
        let p = self.persistence.as_mut().expect("checked above");
        // Enqueue only; the caller decides when to wait on the ticket.
        let seq = p.wal.enqueue(&record);
        ticket = Some(WalTicket {
            group: Arc::clone(&p.wal),
            seq,
        });
        if p.wal.written_len() >= p.checkpoint_threshold_bytes {
            // CoW-2 (v7.34): hot path — fire-and-forget. The worker
            // serializes off this thread so the commit that just
            // crossed the threshold doesn't stall on a multi-hundred-ms
            // snapshot write. Any sticky error from a prior async
            // checkpoint surfaces here.
            self.trigger_checkpoint()?;
        }
    }
    Ok(ticket)
}
2516
2517 /// v7.3.0 — typed-row variant of [`Database::query`]. Each
2518 /// row decodes into a `T: FromSpgRow` so callers don't
2519 /// pattern-match on `Value` themselves. Use [`spg_row!`] to
2520 /// generate the impl, or write it by hand.
2521 pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
2522 let rows = self.query(sql)?;
2523 rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
2524 }
2525
2526 /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
2527 /// strips the column-schema metadata for read-side
2528 /// ergonomics. Errors on non-Rows results (DML / DDL
2529 /// statements should go through `execute` instead).
2530 pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
2531 match self.engine.execute(sql)? {
2532 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2533 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2534 "query() expects a SELECT — use execute() for DML/DDL".into(),
2535 )),
2536 // v7.5.0 — QueryResult is #[non_exhaustive]; any future
2537 // variant is not a SELECT row stream, treat as Unsupported.
2538 _ => Err(EngineError::Unsupported(
2539 "query() expects a SELECT — use execute() for DML/DDL".into(),
2540 )),
2541 }
2542 }
2543
2544 /// v7.16.0 — column-aware variant of [`Self::query`].
2545 /// Returns the column schema vec alongside the rows so
2546 /// adapters (the spg-sqlx Row impl most notably) can drive
2547 /// name + type-based column lookups. Errors on non-Rows
2548 /// results identically to `query`.
2549 pub fn query_with_columns(
2550 &mut self,
2551 sql: &str,
2552 ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2553 match self.engine.execute(sql)? {
2554 QueryResult::Rows { columns, rows } => {
2555 Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2556 }
2557 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2558 "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2559 )),
2560 _ => Err(EngineError::Unsupported(
2561 "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2562 )),
2563 }
2564 }
2565
2566 /// v7.16.0 — column-aware variant of
2567 /// [`Self::query_prepared`]. Same shape as
2568 /// `query_with_columns` but driven from a prepared
2569 /// statement + bound params.
2570 pub fn query_prepared_with_columns(
2571 &mut self,
2572 stmt: &Statement,
2573 params: &[Value],
2574 ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2575 match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2576 QueryResult::Rows { columns, rows } => {
2577 Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2578 }
2579 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2580 "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2581 )),
2582 _ => Err(EngineError::Unsupported(
2583 "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2584 )),
2585 }
2586 }
2587
/// Borrow the underlying engine. Escape hatch for callers
/// that need access to `spg-engine` APIs not yet surfaced
/// here (transactions, EXPLAIN ANALYZE, etc.).
///
/// Returns a shared reference to the `Engine` owned by this
/// `Database`; nothing is copied.
#[must_use]
pub const fn engine(&self) -> &Engine {
    &self.engine
}
2595
/// Mutable borrow of the underlying engine. Same intent as
/// `engine()` but for write-side APIs (e.g. inserting
/// directly through `Catalog::insert` for high-throughput
/// bulk loads that bypass SQL parsing).
///
/// Returns an exclusive reference to the `Engine` owned by this
/// `Database`. This accessor itself performs no WAL write.
pub const fn engine_mut(&mut self) -> &mut Engine {
    &mut self.engine
}
2603
/// v7.38 (mailrs prod 7.35 pool-exhaustion incident) — boot-time
/// plan-IR cache warm-up. Pre-prepares the listed SQL shapes so
/// the first user-facing request doesn't pay the 2-3 s
/// first-fire parse + JOIN-reorder cost on the readonly-blocking
/// pool. Recommended call site: `Database::new` immediately after
/// catalog restore, before serving any traffic. Returns the
/// number of statements successfully cached.
///
/// Thin forwarder: the work and the returned count both come from
/// the engine's `warm_up_plan_cache`.
pub fn warm_up_plan_cache(&mut self, sqls: &[&str]) -> usize {
    self.engine.warm_up_plan_cache(sqls)
}
2614
/// v7.38 (mailrs prod 7.35 pool-exhaustion incident) — boot-time
/// cold-tier OS page-cache warm-up. Touches every cold segment
/// file in the active catalog so the kernel page cache loads
/// them before user traffic arrives. On a hot-only catalog the
/// call is a near-no-op. Returns the total cold rows touched.
///
/// Thin forwarder: the work and the returned count both come from
/// the engine's `warm_up_cold_tier`. Takes `&self` only.
pub fn warm_up_cold_tier(&self) -> usize {
    self.engine.warm_up_cold_tier()
}
2623
2624 /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
2625 /// `execute_prepared` / `query_prepared` calls can re-bind
2626 /// parameters without re-parsing. The returned [`Statement`]
2627 /// is a thin handle around the AST + cached source SQL; it's
2628 /// `Clone` so the same plan can drive many bind calls
2629 /// concurrently (each call clones the AST and runs
2630 /// placeholder substitution on the clone — the cached
2631 /// plan stays intact).
2632 ///
2633 /// Plan caching follows the engine's existing version-aware
2634 /// rule: a prepared `Statement` whose statistics version
2635 /// has rolled (ANALYZE ran between prepare and execute)
2636 /// will silently re-prepare under the hood. Callers don't
2637 /// need to detect this.
2638 ///
2639 /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
2640 /// `bind`-time `Value`s are passed as a slice; arity
2641 /// mismatches surface as `EvalError::PlaceholderOutOfRange`
2642 /// at `execute_prepared` time, not here.
2643 ///
2644 /// # Errors
2645 /// Surfaces `EngineError` (parse error / plan rewrite
2646 /// failure) from the underlying `Engine::prepare`.
2647 pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
2648 // Use the cached path so repeated prepares of the same
2649 // SQL are O(1). The engine's plan cache stays shared
2650 // across all callers of this Database — a single
2651 // `PgPool`-shaped consumer (or, later, the spg-sqlx
2652 // adapter) prepares once and reaps the win on every bind.
2653 let stmt = self
2654 .engine
2655 .prepare_cached(sql)
2656 .map_err(EngineError::Parse)?;
2657 Ok(Statement {
2658 stmt,
2659 sql: sql.to_string(),
2660 })
2661 }
2662
2663 /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
2664 /// executing. Returns `(parameter_oid_count, output_columns)`
2665 /// where `output_columns` is empty for non-SELECT statements
2666 /// or for SELECT shapes the describe planner can't resolve
2667 /// (JOIN / subquery / unknown table). Wraps
2668 /// `Engine::describe_prepared` so the spg-sqlx bridge can
2669 /// surface PG-shape Describe replies for
2670 /// `sqlx::query!()` compile-time validation.
2671 ///
2672 /// # Errors
2673 /// Propagates parse errors from the underlying prepare path.
2674 pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2675 let stmt = self
2676 .engine
2677 .prepare_cached(sql)
2678 .map_err(EngineError::Parse)?;
2679 Ok(self.engine.describe_prepared(&stmt))
2680 }
2681
2682 /// v7.16.0 — execute a prepared statement with bound
2683 /// parameters. Mirrors `Engine::execute_prepared`: clones
2684 /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
2685 ///
2686 /// Persistence (WAL fsync + auto-checkpoint) follows the
2687 /// same rules as `execute(sql)`: mutating statements get a
2688 /// WAL record AFTER the in-memory exec succeeds. The WAL
2689 /// record carries the substituted, bind-final SQL, so
2690 /// replay reconstructs the same row state without needing
2691 /// the original prepared `Statement` to still be alive.
2692 ///
2693 /// # Errors
2694 /// Propagates engine errors. Param arity mismatch surfaces
2695 /// as `EvalError::PlaceholderOutOfRange`.
2696 pub fn execute_prepared(
2697 &mut self,
2698 stmt: &Statement,
2699 params: &[Value],
2700 ) -> Result<QueryResult, EngineError> {
2701 let (result, ticket) = self.execute_prepared_buffered(stmt, params)?;
2702 if let Some(t) = ticket {
2703 t.wait()?;
2704 }
2705 Ok(result)
2706 }
2707
2708 /// v7.20 P2 — group-commit variant of
2709 /// [`Database::execute_prepared`]. Same contract as
2710 /// [`Database::execute_buffered`]: mutation + enqueue happen
2711 /// here; the caller waits on the ticket AFTER releasing
2712 /// whatever lock guards this `Database`.
2713 ///
2714 /// # Errors
2715 /// Engine errors propagate unchanged; inline auto-checkpoint
2716 /// may surface IO errors.
2717 pub fn execute_prepared_buffered(
2718 &mut self,
2719 stmt: &Statement,
2720 params: &[Value],
2721 ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2722 let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
2723 let modified = matches!(
2724 &result,
2725 QueryResult::CommandOk {
2726 modified_catalog: true,
2727 ..
2728 }
2729 );
2730 // WAL persistence on the bind-final SQL. Build the
2731 // canonical Display form by re-printing the
2732 // placeholder-substituted statement (cheap — the AST
2733 // is already in hand from execute_prepared's internal
2734 // clone) so replay's path is identical to the
2735 // simple-query path. v7.21: also when a transaction is
2736 // open — in-tx mutations report `modified_catalog: false`
2737 // but must reach the tx WAL buffer (see `wal_after_ok`).
2738 let mut ticket = None;
2739 if self.persistence.is_some()
2740 && (modified
2741 || (self.tx_wal.is_some() && !sql_is_read_only(&stmt.sql))
2742 || tx_control_kind(&stmt.sql).is_some())
2743 {
2744 let mut wal_stmt = stmt.stmt.clone();
2745 crate::wal_render_with_params(&mut wal_stmt, params);
2746 let canonical = format!("{wal_stmt}");
2747 ticket = self.wal_after_ok(&canonical, modified)?;
2748 }
2749 Ok((result, ticket))
2750 }
2751
2752 /// v7.16.0 — run a prepared SELECT with bound params and
2753 /// return rows as `Vec<Vec<Value>>`, matching `query()`
2754 /// shape. SELECTs are read-only so this never writes the
2755 /// WAL.
2756 ///
2757 /// # Errors
2758 /// Returns `Unsupported` if the prepared statement isn't a
2759 /// SELECT (use `execute_prepared` for DML/DDL).
2760 pub fn query_prepared(
2761 &mut self,
2762 stmt: &Statement,
2763 params: &[Value],
2764 ) -> Result<Vec<Vec<Value>>, EngineError> {
2765 match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2766 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2767 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2768 "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2769 )),
2770 _ => Err(EngineError::Unsupported(
2771 "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2772 )),
2773 }
2774 }
2775
2776 /// v7.18 — parse + plan a SQL string against a
2777 /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
2778 /// readonly fan-out path: no writer lock taken, no WAL write,
2779 /// no plan-cache mutation. Static-on-`Self` so callers can
2780 /// dispatch against a snapshot without an `&mut Database`
2781 /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
2782 /// is the load-bearing consumer.
2783 ///
2784 /// # Errors
2785 /// Propagates `EngineError::Parse` from the parser.
2786 pub fn prepare_on_snapshot(
2787 snapshot: &CatalogSnapshot,
2788 sql: &str,
2789 ) -> Result<Statement, EngineError> {
2790 let stmt =
2791 spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2792 Ok(Statement {
2793 stmt,
2794 sql: sql.to_string(),
2795 })
2796 }
2797
/// v7.18 — execute a prepared `Statement` against a
/// `CatalogSnapshot` with bound params. Mirror of
/// [`Database::execute_prepared`] on the readonly path:
/// writes / DDL hit `EngineError::WriteRequired`. No WAL
/// write, no writer lock, multiple snapshots can run
/// concurrently — the snapshot is immutable from prepare time.
///
/// Associated function (no `self`): forwards to the engine's
/// `execute_readonly_prepared_on_snapshot` with a clone of the
/// prepared AST, so the caller's `Statement` stays reusable.
///
/// # Errors
/// Surfaces `EngineError::WriteRequired` for non-readonly
/// statements; propagates other engine errors.
pub fn execute_prepared_on_snapshot(
    snapshot: &CatalogSnapshot,
    stmt: &Statement,
    params: &[Value],
) -> Result<QueryResult, EngineError> {
    spg_engine::Engine::execute_readonly_prepared_on_snapshot(
        snapshot,
        stmt.stmt.clone(),
        params,
    )
}
2819
2820 /// v7.28 (round-22) — deadline-bounded variant of
2821 /// [`Database::execute_prepared_on_snapshot`]. Returns
2822 /// `EngineError::Cancelled` once the budget elapses; the
2823 /// sqlx driver uses this to keep readonly-INLINE execution
2824 /// from monopolising the caller's async runtime (four slow
2825 /// inbox queries saturated mailrs's whole tokio pool) and
2826 /// re-runs over the blocking pool on timeout.
2827 ///
2828 /// # Errors
2829 /// `EngineError::Cancelled` on budget expiry; engine errors
2830 /// otherwise.
2831 pub fn execute_prepared_on_snapshot_with_budget(
2832 snapshot: &CatalogSnapshot,
2833 stmt: &Statement,
2834 params: &[Value],
2835 budget_us: u64,
2836 ) -> Result<QueryResult, EngineError> {
2837 fn mono_now_us() -> u64 {
2838 use std::time::{SystemTime, UNIX_EPOCH};
2839 // Monotonic enough for a per-call relative budget: the
2840 // engine only compares (now - start) against the budget
2841 // within one call.
2842 SystemTime::now()
2843 .duration_since(UNIX_EPOCH)
2844 .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
2845 .unwrap_or(0)
2846 }
2847 let deadline = mono_now_us().saturating_add(budget_us);
2848 let token = spg_engine::CancelToken::none().with_deadline(mono_now_us, deadline);
2849 spg_engine::Engine::execute_readonly_prepared_on_snapshot_with_cancel(
2850 snapshot,
2851 stmt.stmt.clone(),
2852 params,
2853 token,
2854 )
2855 }
2856
2857 /// v7.18 — describe a SQL string against a
2858 /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
2859 /// the readonly path. Pure function on the snapshot's
2860 /// catalog; safe to call from any thread.
2861 ///
2862 /// # Errors
2863 /// Propagates `EngineError::Parse` from the parser.
2864 pub fn describe_on_snapshot(
2865 snapshot: &CatalogSnapshot,
2866 sql: &str,
2867 ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2868 let stmt =
2869 spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2870 Ok(spg_engine::Engine::describe_prepared_on_snapshot(
2871 snapshot, &stmt,
2872 ))
2873 }
2874
2875 /// v7.21 (round-12 polish) — run a multi-statement SQL script
2876 /// with PG simple-query semantics: the statements execute in
2877 /// order inside ONE implicit transaction, so a mid-script error
2878 /// rolls back the whole script (PG wraps every simple-query
2879 /// message in an implicit transaction). Three exceptions, all
2880 /// PG-faithful:
2881 ///
2882 /// - a script that carries its OWN transaction control
2883 /// (BEGIN / COMMIT / …) runs statement-by-statement — the
2884 /// script owns its boundaries;
2885 /// - a script run while the caller already has a transaction
2886 /// open joins that transaction (no nested BEGIN), and the
2887 /// caller's COMMIT / ROLLBACK decides its fate;
2888 /// - a single-statement script is plain auto-commit.
2889 ///
2890 /// Returns one `QueryResult` per executed statement. This is the
2891 /// engine behind `sqlx::raw_sql` (mailrs feeds whole
2892 /// `init-schema.sql` files through it) and `spgctl import`.
2893 ///
2894 /// # Errors
2895 /// The first failing statement's error propagates after the
2896 /// implicit ROLLBACK; nothing from the script remains applied.
2897 pub fn execute_script(&mut self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
2898 let stmts = split_statements(sql);
2899 let script_owns_tx = stmts.iter().any(|s| tx_control_kind(s).is_some());
2900 let wrap = stmts.len() > 1 && !script_owns_tx && !self.engine.in_transaction();
2901 if !wrap {
2902 let mut out = Vec::with_capacity(stmts.len());
2903 for stmt in &stmts {
2904 out.push(self.execute_dump_statement(stmt)?);
2905 }
2906 return Ok(out);
2907 }
2908 self.execute("BEGIN")?;
2909 let mut out = Vec::with_capacity(stmts.len());
2910 for stmt in &stmts {
2911 match self.execute_dump_statement(stmt) {
2912 Ok(r) => out.push(r),
2913 Err(e) => {
2914 // Best-effort rollback; surface the script error.
2915 let _ = self.execute("ROLLBACK");
2916 return Err(e);
2917 }
2918 }
2919 }
2920 self.execute("COMMIT")?;
2921 Ok(out)
2922 }
2923
2924 /// v7.22 (round-13 T2) — execute one `split_statements` chunk,
2925 /// lowering a `COPY … FROM stdin;` block (statement + its data
2926 /// lines, as one chunk) to per-row INSERTs through the shared
2927 /// `spg_engine::copy` helpers. Default-format pg_dump emits
2928 /// COPY blocks, so the zero-change import promise needs this on
2929 /// the embed path; non-COPY statements pass straight through to
2930 /// [`Self::execute`]. Public so `spgctl import` can keep its
2931 /// per-statement error indexing while sharing the lowering.
2932 ///
2933 /// # Errors
2934 /// Engine errors propagate; for COPY the failing row's INSERT
2935 /// error carries the synthesized statement context.
2936 pub fn execute_dump_statement(&mut self, stmt: &str) -> Result<QueryResult, EngineError> {
2937 // Strip pg_dump's `-- Data for Name: …;` banner (it carries
2938 // semicolons of its own) before splitting head from data.
2939 let stmt_clean = strip_leading_sql_noise(stmt);
2940 let head_is_copy = stmt_clean
2941 .get(..4)
2942 .is_some_and(|p| p.eq_ignore_ascii_case("copy"));
2943 if head_is_copy
2944 && let Some((head, data)) = stmt_clean.split_once(';')
2945 && let Some(spec) = spg_engine::copy::parse_copy_from_stdin_head(head)
2946 {
2947 let mut affected: usize = 0;
2948 for line in data.lines() {
2949 // Empty fragments only occur at the chunk boundary
2950 // (the remainder of the COPY line right after `;`);
2951 // data rows are whole non-empty lines.
2952 let line = line.strip_suffix('\r').unwrap_or(line);
2953 if line.is_empty() {
2954 continue;
2955 }
2956 let values = spg_engine::copy::decode_copy_text_row(line);
2957 let insert = spg_engine::copy::build_copy_insert(
2958 &spec.table,
2959 spec.columns.as_deref(),
2960 &values,
2961 );
2962 match self.execute(&insert)? {
2963 QueryResult::CommandOk { affected: n, .. } => affected += n,
2964 _ => affected += 1,
2965 }
2966 }
2967 return Ok(QueryResult::CommandOk {
2968 affected,
2969 modified_catalog: false,
2970 });
2971 }
2972 self.execute(stmt)
2973 }
2974
2975 /// v7.2.0 — run `body` inside an implicit `BEGIN` /
2976 /// `COMMIT` pair. The body receives `&mut Database` so it
2977 /// can `execute()` / `query()` like any other code path;
2978 /// the only difference is that every write in the body
2979 /// lands inside one transaction, and a returned `Err` from
2980 /// the body triggers `ROLLBACK` before the error propagates.
2981 ///
2982 /// Nested calls are not supported — SPG's transaction
2983 /// model is single-writer with explicit `BEGIN` /
2984 /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
2985 /// would hit `EngineError::Unsupported("nested
2986 /// transaction")` at the inner `BEGIN`.
2987 pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
2988 where
2989 F: FnOnce(&mut Self) -> Result<R, EngineError>,
2990 {
2991 self.execute("BEGIN")?;
2992 match body(self) {
2993 Ok(value) => {
2994 self.execute("COMMIT")?;
2995 Ok(value)
2996 }
2997 Err(e) => {
2998 // Best-effort rollback. If ROLLBACK itself
2999 // fails (rare — the engine reports it via
3000 // `Unsupported` only when there's no active
3001 // TX, which can't happen here) we surface the
3002 // original body error, not the rollback error.
3003 let _ = self.execute("ROLLBACK");
3004 Err(e)
3005 }
3006 }
3007 }
3008}
3009
/// The default `Database` is whatever `Database::open_in_memory`
/// returns.
impl Default for Database {
    fn default() -> Self {
        Self::open_in_memory()
    }
}
3015
/// v7.7.5 — observability snapshot returned by
/// [`Database::metrics`]. Plain data, no allocations beyond
/// what the struct itself takes; cheap to construct and
/// cheap to serialise.
///
/// The struct is `Copy` and `#[non_exhaustive]`: code outside this
/// crate can read the fields but cannot build the struct with a
/// literal, which leaves room for new counters later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct EmbeddedMetrics {
    /// Total live row count across every user table (hot
    /// tier only — cold-tier rows live in segment files).
    /// Computed as the saturating sum of each table's `row_count()`.
    pub hot_rows: u64,
    /// Sum of `Table::hot_bytes` across every user table.
    /// Tracks against the freezer's `hot_tier_bytes` budget.
    pub hot_bytes: u64,
    /// Number of cold-tier segments registered in the catalog.
    /// Includes tombstoned slots (segments retired by
    /// compaction whose disk file may still be on disk).
    pub cold_segments: u64,
    /// User-table count (excludes any future engine-managed
    /// internal tables).
    pub tables: u64,
    /// WAL size at last `execute()` / `checkpoint()`. Zero
    /// when the database is in-memory. Read from the WAL's
    /// `written_len()` at the moment `metrics()` is called.
    pub wal_bytes: u64,
    /// `true` when the database was opened with `open_path` —
    /// i.e. WAL + checkpoint persistence is active.
    pub persistent: bool,
}
3043
/// v7.2.1 — handle returned by `spawn_background_freezer`.
/// Drop signals the worker thread to wind down + joins it,
/// so a `Database` (or its shared `Arc<Mutex<Database>>`)
/// can safely drop after the handle does.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    // Shutdown flag shared with the worker thread; `stop()` sets it
    // and the worker loop polls it.
    shutdown: Arc<AtomicBool>,
    // Join handle of the worker. `stop()` takes it out on the first
    // call, so later calls find `None` and have nothing left to join.
    join: Option<JoinHandle<()>>,
}
3054
3055impl FreezerHandle {
3056 /// v7.2.1 — request the worker stop + join. Idempotent;
3057 /// safe to call from `Drop` (which also calls it).
3058 pub fn stop(&mut self) {
3059 self.shutdown.store(true, Ordering::Release);
3060 if let Some(h) = self.join.take() {
3061 let _ = h.join();
3062 }
3063 }
3064}
3065
/// Dropping the handle stops the freezer: it delegates to
/// [`FreezerHandle::stop`], which signals shutdown and joins the
/// worker thread if that has not already happened.
impl Drop for FreezerHandle {
    fn drop(&mut self) {
        self.stop();
    }
}
3071
/// v7.2.1 — knobs for `Database::spawn_background_freezer`.
///
/// `FreezerOptions::default()` supplies the values documented on
/// each field below.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// Tick interval. Worker wakes every `tick`, checks the
    /// catalog's `hot_tier_bytes`, and freezes if over budget.
    /// Default: 1 second.
    pub tick: Duration,
    /// Hot-tier byte budget. Exceeded → next tick freezes the
    /// largest table's oldest `batch_rows` rows into a new
    /// cold segment. Default: 4 GiB.
    pub hot_tier_bytes: u64,
    /// Max rows the freezer demotes per fire. Default: 1000.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. When the catalog has
    /// more than this many cold segments across all tables, the
    /// freezer fires a compaction pass after its next freeze.
    /// Set to `usize::MAX` to disable auto-compact entirely;
    /// the default is `64`, matching the `spg-server` operating
    /// point for SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges,
    /// in bytes. Default 64 MiB, mirroring `spg-server`. Small
    /// segments below this size are merge candidates;
    /// segments at or above stay untouched.
    pub compact_target_bytes: u64,
}
3097
3098impl Default for FreezerOptions {
3099 fn default() -> Self {
3100 // Match the `spg-server` freezer's default operating
3101 // point (SPG_HOT_TIER_BYTES = 4 GiB, batch 1000 rows,
3102 // tick every 1 s) so embedded behaviour is predictable
3103 // for operators familiar with the server.
3104 Self {
3105 tick: Duration::from_secs(1),
3106 hot_tier_bytes: 4 * 1024 * 1024 * 1024,
3107 batch_rows: 1000,
3108 compact_when_segments_exceed: 64,
3109 compact_target_bytes: 64 * 1024 * 1024,
3110 }
3111 }
3112}
3113
3114impl Database {
3115 /// v7.7.4 — observe the catalog's cold-segment count.
3116 /// Useful for tests + dashboards that want to verify
3117 /// auto-compaction is firing.
3118 #[must_use]
3119 pub fn cold_segment_count(&self) -> usize {
3120 self.engine.catalog().cold_segment_count()
3121 }
3122
3123 /// v7.7.5 — observability snapshot. Returns a point-in-time
3124 /// view of the engine + persistence counters. Cheap (no
3125 /// locks beyond the existing `&self` borrow), so safe to
3126 /// call from a hot metrics-scrape path.
3127 ///
3128 /// Fields mirror the operational dashboard
3129 /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
3130 /// minus the network counters that don't apply to embedded.
3131 #[must_use]
3132 pub fn metrics(&self) -> EmbeddedMetrics {
3133 let cat = self.engine.catalog();
3134 let mut hot_rows: u64 = 0;
3135 let mut hot_bytes: u64 = 0;
3136 for name in cat.table_names() {
3137 if let Some(t) = cat.get(&name) {
3138 hot_rows = hot_rows.saturating_add(t.row_count() as u64);
3139 hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
3140 }
3141 }
3142 let (wal_bytes, persistent) = match &self.persistence {
3143 Some(p) => (p.wal.written_len(), true),
3144 None => (0, false),
3145 };
3146 EmbeddedMetrics {
3147 hot_rows,
3148 hot_bytes,
3149 cold_segments: cat.cold_segment_count() as u64,
3150 tables: cat.table_count() as u64,
3151 wal_bytes,
3152 persistent,
3153 }
3154 }
3155
3156 /// v7.2.1 — spawn a background thread that periodically
3157 /// runs `freeze_oldest_to_cold` when the catalog-wide hot
3158 /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
3159 /// pattern matches the v7.2 sharing story: callers wrap
3160 /// their `Database` in `Arc::new(Mutex::new(db))` once,
3161 /// then clone the Arc for the worker + for foreground
3162 /// access. Return value is a handle whose `Drop` joins the
3163 /// worker.
3164 ///
3165 /// Picks the freeze target the same way `spg-server`'s
3166 /// freezer does: largest-`hot_bytes` user table with at
3167 /// least one BTree integer-PK index. Tables without a
3168 /// freezable index are skipped silently.
3169 pub fn spawn_background_freezer(
3170 db: Arc<Mutex<Database>>,
3171 opts: FreezerOptions,
3172 ) -> FreezerHandle {
3173 let shutdown = Arc::new(AtomicBool::new(false));
3174 let shutdown_for_thread = Arc::clone(&shutdown);
3175 let join = thread::Builder::new()
3176 .name("spg-embedded-freezer".into())
3177 .spawn(move || {
3178 background_freezer_loop(db, opts, shutdown_for_thread);
3179 })
3180 .expect("spawn background freezer thread");
3181 FreezerHandle {
3182 shutdown,
3183 join: Some(join),
3184 }
3185 }
3186}
3187
3188/// v7.2.1 — the freezer's main loop, factored out so the
3189/// `Database::spawn_background_freezer` path stays readable.
3190fn background_freezer_loop(
3191 db: Arc<Mutex<Database>>,
3192 opts: FreezerOptions,
3193 shutdown: Arc<AtomicBool>,
3194) {
3195 // Sleep in short slices so a shutdown request resolves
3196 // quickly (vs sleeping the full tick).
3197 let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
3198 let mut last_tick = std::time::Instant::now();
3199 loop {
3200 if shutdown.load(Ordering::Acquire) {
3201 return;
3202 }
3203 thread::sleep(slice);
3204 if last_tick.elapsed() < opts.tick {
3205 continue;
3206 }
3207 last_tick = std::time::Instant::now();
3208 let Ok(mut guard) = db.lock() else {
3209 return;
3210 };
3211 if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
3212 continue;
3213 }
3214 let Some((table, index)) = pick_freeze_target(&guard) else {
3215 continue;
3216 };
3217 let row_count = guard
3218 .engine
3219 .catalog()
3220 .get(&table)
3221 .map_or(0, spg_storage::Table::row_count);
3222 let to_freeze = opts.batch_rows.min(row_count);
3223 if to_freeze == 0 {
3224 continue;
3225 }
3226 if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
3227 eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
3228 continue;
3229 }
3230 // v7.7.4 — auto-compact. If the catalog now carries
3231 // more cold segments than the configured threshold,
3232 // run a single compaction pass. Failures are reported
3233 // but don't kill the loop; the next tick will retry.
3234 let count = guard.engine.catalog().cold_segment_count();
3235 if count > opts.compact_when_segments_exceed {
3236 if let Err(e) = guard
3237 .engine
3238 .compact_cold_segments_with_target(opts.compact_target_bytes)
3239 {
3240 eprintln!(
3241 "spg-embedded: background compact failed (segments={count}, \
3242 threshold={}): {e:?}",
3243 opts.compact_when_segments_exceed,
3244 );
3245 }
3246 }
3247 }
3248}
3249
3250/// v7.2.1 — pick the highest-`hot_bytes` user table with a
3251/// BTree integer-PK index. Returns `(table, index_name)` so the
3252/// caller can dispatch through `freeze_oldest_to_cold`.
3253fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
3254 let cat = db.engine.catalog();
3255 let mut best: Option<(String, String, u64)> = None;
3256 for name in cat.table_names() {
3257 let Some(t) = cat.get(&name) else { continue };
3258 if t.row_count() == 0 {
3259 continue;
3260 }
3261 let cols = &t.schema().columns;
3262 let Some(idx) = t.indices().iter().find(|i| {
3263 matches!(i.kind, spg_storage::IndexKind::BTree(_))
3264 && i.column_position < cols.len()
3265 && matches!(
3266 cols[i.column_position].ty,
3267 spg_storage::DataType::SmallInt
3268 | spg_storage::DataType::Int
3269 | spg_storage::DataType::BigInt
3270 )
3271 }) else {
3272 continue;
3273 };
3274 let hot = t.hot_bytes();
3275 match best {
3276 None => best = Some((name, idx.name.clone(), hot)),
3277 Some((_, _, best_hot)) if hot > best_hot => {
3278 best = Some((name, idx.name.clone(), hot));
3279 }
3280 _ => {}
3281 }
3282 }
3283 best.map(|(t, i, _)| (t, i))
3284}
3285
3286/// v7.7.6 — replay the first `to_seq` records of the WAL at
3287/// `wal_path` into a fresh engine and write the resulting
3288/// catalog snapshot to `out_db_path`. Same semantics as
3289/// `spg revert --wal … --to-seq N --out …` from the CLI:
3290///
3291/// - `to_seq == 0` → snapshot is the empty catalog
3292/// - WAL records beyond `to_seq` are not applied
3293/// - durability-checkpoint markers (v3 type 0x02) are
3294/// consumed without counting against the budget
3295///
3296/// Returns the number of statements actually applied
3297/// (`≤ to_seq`). The output snapshot is byte-identical to
3298/// what `Database::open_path(out_db_path)` would consume on
3299/// a subsequent open.
3300///
3301/// This is the "rewind" operator for an embedded database
3302/// that has been corrupted by a poison statement or a
3303/// half-applied migration. Pair with `cold_segment_paths`
3304/// preservation if your cold-tier files are still on disk.
3305///
3306/// # Errors
3307///
3308/// - `wal_path` unreadable or truncated mid-record
3309/// - WAL record decodes to invalid UTF-8 SQL
3310/// - WAL record's SQL is rejected by the engine
3311/// - `out_db_path` unwritable
3312pub fn revert_wal_to_seq(
3313 wal_path: impl AsRef<Path>,
3314 to_seq: u64,
3315 out_db_path: impl AsRef<Path>,
3316) -> Result<u64, EngineError> {
3317 // v7.19 — accept either a single-file legacy WAL (v7.18 and
3318 // earlier layout) or a chunked WAL directory (v7.19+). For a
3319 // directory, concatenate every `.wal` chunk in sorted order
3320 // — the same order open_path replays them in — so revert
3321 // sees the full record stream.
3322 let path = wal_path.as_ref();
3323 let wal_bytes = if path.is_dir() {
3324 let mut combined = Vec::new();
3325 let chunks = sorted_wal_chunks(path).map_err(io_err)?;
3326 for chunk in chunks {
3327 let bytes = std::fs::read(&chunk).map_err(io_err)?;
3328 combined.extend_from_slice(&bytes);
3329 }
3330 combined
3331 } else {
3332 std::fs::read(path).map_err(io_err)?
3333 };
3334 let mut engine = Engine::new();
3335 let mut applied = 0u64;
3336 let mut cur = 0usize;
3337 while cur < wal_bytes.len() && applied < to_seq {
3338 let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
3339 cur += total;
3340 if sql_bytes.is_empty() {
3341 continue;
3342 }
3343 let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
3344 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
3345 "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
3346 )))
3347 })?;
3348 // v7.21 — tx-commit records carry a multi-statement script;
3349 // split_statements is a no-op for single-statement records.
3350 for stmt in split_statements(sql) {
3351 engine.execute(stmt)?;
3352 }
3353 applied += 1;
3354 }
3355 let snapshot = engine.snapshot();
3356 std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
3357 Ok(applied)
3358}
3359
3360/// v7.7.6 — decode one WAL record from a byte tail. Returns
3361/// `(sql_bytes, header_plus_payload_len)`. Handles the three
3362/// on-disk formats (v1 / v2 / v3) the same way the CLI
3363/// `decode_one_record` and the engine's `replay_wal_bytes`
3364/// do. CRCs are not re-validated; the caller's intent is
3365/// "apply", not "validate".
3366fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
3367 if tail.len() < 4 {
3368 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3369 format!("WAL truncated record: {} < 4 header bytes", tail.len()),
3370 )));
3371 }
3372 let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
3373 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
3374 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
3375 let len_mask = if is_v3 {
3376 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
3377 } else {
3378 !WAL_V2_SENTINEL
3379 };
3380 let rec_len = (raw_len & len_mask) as usize;
3381 let header_len = if is_v3 {
3382 9
3383 } else if is_v2 {
3384 8
3385 } else {
3386 4
3387 };
3388 if tail.len() < header_len + rec_len {
3389 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3390 format!(
3391 "WAL truncated record: header+payload {} > available {}",
3392 header_len + rec_len,
3393 tail.len()
3394 ),
3395 )));
3396 }
3397 if is_v3 {
3398 let type_byte = tail[8];
3399 // v3 type 0x01 = auto_commit_sql (payload = SQL).
3400 // v3 type 0x02 = durability marker (no SQL to apply).
3401 // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
3402 // prefix between type and SQL — strip
3403 // the prefix so the caller still sees raw
3404 // SQL bytes.
3405 // Anything else is unknown.
3406 if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
3407 let payload = &tail[header_len..header_len + rec_len];
3408 return Ok((payload.to_vec(), header_len + rec_len));
3409 }
3410 if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL || type_byte == WAL_V4_TYPE_TX_COMMIT_SQL {
3411 let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
3412 if tail.len() < v4_total {
3413 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3414 format!(
3415 "WAL truncated v4 record: header+payload {v4_total} > available {}",
3416 tail.len()
3417 ),
3418 )));
3419 }
3420 let sql_start = header_len + WAL_V4_EXTRA_HEADER;
3421 let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
3422 return Ok((sql_bytes, v4_total));
3423 }
3424 // Caller treats empty payload as a skip-marker.
3425 return Ok((Vec::new(), header_len + rec_len));
3426 }
3427 let payload = &tail[header_len..header_len + rec_len];
3428 Ok((payload.to_vec(), header_len + rec_len))
3429}
3430
impl Drop for Database {
    /// Best-effort clean shutdown for a persistent database, in a
    /// fixed order: (1) final checkpoint, (2) stop and join the
    /// retention and flusher threads and drop the checkpoint
    /// worker, (3) remove the cross-process lock directory.
    /// An in-memory database (`persistence` is `None`) skips all
    /// three steps. Failures are written to stderr or ignored —
    /// nothing is propagated, since `Drop` cannot return an error.
    fn drop(&mut self) {
        // v7.1 — best-effort final checkpoint when a persistent
        // Database leaves scope. Failures here go to stderr so
        // operators see them, but Drop can't propagate errors —
        // the WAL itself is already durable, so a checkpoint
        // miss only means the next boot replays a few more
        // records than strictly necessary.
        if self.persistence.is_some() {
            if let Err(e) = self.checkpoint() {
                eprintln!(
                    "spg-embedded: final checkpoint on Drop failed: {e:?} \
                     (WAL is intact; next open_path will replay)"
                );
            }
        }
        // v7.19 P3 / v7.20 — signal the retention + flusher
        // threads to exit, then wait for them. Done BEFORE the
        // lock release so background threads don't outlive the
        // database handle. The flusher drains the pending batch
        // on its way out (final flush_now in the thread body),
        // so `SPG_SYNCHRONOUS_COMMIT=off` never loses confirmed
        // commits across a clean shutdown.
        if let Some(ctx) = self.persistence.as_mut() {
            // Each thread is signalled and then joined in turn:
            // retention first, flusher second. `take()` leaves
            // `None` behind so the handles are consumed exactly once.
            if let Some(shutdown) = ctx.retention_shutdown.take() {
                shutdown.store(true, Ordering::SeqCst);
            }
            if let Some(handle) = ctx.retention_thread.take() {
                // A panicked worker surfaces as Err here; ignored on
                // purpose — shutdown must continue regardless.
                let _ = handle.join();
            }
            if let Some(shutdown) = ctx.flusher_shutdown.take() {
                shutdown.store(true, Ordering::SeqCst);
            }
            if let Some(handle) = ctx.flusher_thread.take() {
                let _ = handle.join();
            }
            // CoW-2 (v7.34) — final checkpoint above left the worker
            // idle; explicitly drop it here so its shutdown signal +
            // thread join happens with a deterministic ordering (before
            // the lock release / persistence drop), not whenever Rust
            // happens to drop the PersistenceCtx fields.
            ctx.checkpoint_worker = None;
        }
        // v7.17.0 Phase 6.2 — release the cross-process lock on
        // clean shutdown. Failure is logged but never panics;
        // the operator can clear a stale lock via
        // `Database::force_unlock` if a crash kept the
        // directory around.
        if let Some(ctx) = &self.persistence
            && ctx.lock_path.exists()
        {
            // remove_dir_all: the lock dir carries the owner-pid
            // record since round-12.
            if let Err(e) = std::fs::remove_dir_all(&ctx.lock_path) {
                eprintln!(
                    "spg-embedded: lock release on Drop failed for {}: {e:?}",
                    ctx.lock_path.display()
                );
            }
        }
    }
}
3493
3494impl Database {
3495 /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
3496 /// Use when a previous process crashed mid-session and
3497 /// left `<db_path>.lock` behind. Operators should confirm
3498 /// no other process is currently using the database before
3499 /// calling this — SPG cannot fingerprint stale-vs-live
3500 /// without a libc dep, which would violate spg-embedded's
3501 /// zero-deps charter.
3502 pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
3503 let lock_path = {
3504 let mut p = db_path.as_ref().to_path_buf();
3505 let name = p
3506 .file_name()
3507 .map(|n| {
3508 let mut s = n.to_os_string();
3509 s.push(".lock");
3510 s
3511 })
3512 .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
3513 p.set_file_name(name);
3514 p
3515 };
3516 if !lock_path.exists() {
3517 return Ok(());
3518 }
3519 std::fs::remove_dir_all(&lock_path).map_err(io_err)
3520 }
3521}
3522
3523/// v7.1 — turn a `std::io::Error` into the workspace's
3524/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
3525/// the closest existing variant — io failures during boot or
3526/// during a WAL append surface as a storage-layer fault to
3527/// callers, which keeps the public error enum unchanged.
3528fn io_err(e: std::io::Error) -> EngineError {
3529 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
3530}
3531
3532/// v7.2.2 — `Database` is `Send`, so the recommended sharing
3533/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
3534///
3535/// ```no_run
3536/// use std::sync::{Arc, Mutex};
3537/// use spg_embedded::Database;
3538///
3539/// let db = Database::open_in_memory();
3540/// let shared = Arc::new(Mutex::new(db));
3541/// let shared_for_worker = Arc::clone(&shared);
3542/// std::thread::spawn(move || {
3543/// let mut guard = shared_for_worker.lock().unwrap();
3544/// guard.execute("INSERT INTO t VALUES (1)").unwrap();
3545/// });
3546/// ```
3547///
3548/// Internal `RwLock`-wrapped state — letting many threads
3549/// hold concurrent `&Database` for `SELECT` without contending
3550/// — is parked as STABILITY § "Out of v7.2"; multi-reader
3551/// embedded throughput needs a planner-side change to release
3552/// the engine read lock between scans, which is the v7.x
3553/// "Choice A" line of work already documented in v6.9.1's
3554/// carve-out.
3555#[allow(dead_code)]
3556fn _database_is_send() {
3557 fn assert_send<T: Send>() {}
3558 assert_send::<Database>();
3559}
3560
/// v6.10.3 — maps the columns of one result row onto the fields
/// of a user struct.
///
/// v7.3.0 ships the [`spg_row!`] declarative macro, which
/// generates `impl FromSpgRow for YourStruct` from a struct
/// definition (no proc-macro, no syn/quote/proc-macro2 deps — the
/// workspace's "0 external deps" policy holds). Hand-written
/// implementations are equally valid.
///
/// Decoding failures are reported as `EngineError` (the
/// macro-generated impls use `EngineError::Unsupported`) so the
/// caller's error type stays uniform.
pub trait FromSpgRow: Sized {
    /// Decode one query result row into `Self`. Called once per
    /// row by [`Database::query_typed`]. The slice length equals
    /// the number of columns in the SELECT projection, and cells
    /// appear in projection order.
    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
}
3577
/// v7.3.0 — declarative macro that generates `FromSpgRow` impl
/// for a user struct. Avoids proc-macro deps
/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
/// macro takes the entire struct definition (fields + types)
/// as input rather than annotating an existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// The macro re-emits the struct with `#[derive(Debug, Clone)]`
/// added (so do not derive those yourself), keeping every struct-
/// and field-level attribute. The comma after the last field is
/// optional, as in an ordinary struct definition.
///
/// The generated `from_spg_row` consumes row cells positionally,
/// one per field in declaration order, decoding each through
/// `FromSpgValue`. A row with fewer cells than fields, or a cell
/// that fails to decode, yields `EngineError::Unsupported` naming
/// the struct and the field. Cells beyond the last field are
/// ignored.
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // One cursor shared by all fields: field N reads cell N.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                // Fully qualified so a caller-side `Ok` shadow cannot
                // change what the expansion means.
                ::core::result::Result::Ok(Self { $($field,)* })
            }
        }
    };
}
3654
/// v7.3.0 — per-column decoder used by `spg_row!`: turns a single
/// result cell into a Rust value. Surface covers every numeric /
/// text / bytes / bool variant in `Value`, plus `Option<T>` for
/// nullable columns.
///
/// Non-`Option` targets reject `Value::Null` with an error;
/// `Option<T>` maps it to `None`.
// NOTE(review): the impls next to this trait cover i16/i32/i64,
// f32/f64, bool, String, Vec<f32> and Option<T>; no bytes impl is
// visible in this part of the file — confirm the "bytes" claim.
pub trait FromSpgValue: Sized {
    /// Decode one cell into `Self`. The returned `&'static str`
    /// is a short diagnostic for type mismatches (e.g. `"expected
    /// integer, got TEXT"`); callers wrap it into their own
    /// error type.
    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
}
3665
3666macro_rules! impl_from_value_int {
3667 ($($t:ty),* $(,)?) => {
3668 $(
3669 impl FromSpgValue for $t {
3670 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3671 match v {
3672 Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
3673 Value::Int(n) => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
3674 Value::BigInt(n) => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
3675 Value::Null => Err("NULL in non-Option int column"),
3676 _ => Err("non-integer value in int column"),
3677 }
3678 }
3679 }
3680 )*
3681 };
3682}
3683impl_from_value_int!(i16, i32, i64);
3684
3685impl FromSpgValue for f32 {
3686 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3687 match v {
3688 Value::Float(f) => Ok(*f as f32),
3689 Value::Null => Err("NULL in non-Option float column"),
3690 _ => Err("non-float value in float column"),
3691 }
3692 }
3693}
3694
3695impl FromSpgValue for f64 {
3696 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3697 match v {
3698 Value::Float(f) => Ok(*f),
3699 Value::Null => Err("NULL in non-Option float column"),
3700 _ => Err("non-float value in float column"),
3701 }
3702 }
3703}
3704
3705impl FromSpgValue for bool {
3706 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3707 match v {
3708 Value::Bool(b) => Ok(*b),
3709 Value::Null => Err("NULL in non-Option bool column"),
3710 _ => Err("non-bool value in bool column"),
3711 }
3712 }
3713}
3714
3715impl FromSpgValue for String {
3716 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3717 match v {
3718 Value::Text(s) => Ok(s.clone()),
3719 Value::Null => Err("NULL in non-Option text column"),
3720 _ => Err("non-text value in String column"),
3721 }
3722 }
3723}
3724
3725impl FromSpgValue for Vec<f32> {
3726 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3727 match v {
3728 Value::Vector(xs) => Ok(xs.clone()),
3729 Value::Null => Err("NULL in non-Option vector column"),
3730 _ => Err("non-vector value in Vec<f32> column"),
3731 }
3732 }
3733}
3734
3735impl<T: FromSpgValue> FromSpgValue for Option<T> {
3736 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3737 match v {
3738 Value::Null => Ok(None),
3739 other => T::from_spg_value(other).map(Some),
3740 }
3741 }
3742}
3743
/// v7.27 (mailrs round-21 B) — the prober's environment identity:
/// `(hostname, boot-or-container id)`.
///
/// Context: the cross-process exclusion lock is taken by
/// `acquire_path_lock` (atomic `mkdir`, owner pid recorded inside),
/// and a lock left behind by a killed process (docker SIGKILL,
/// crash) is reclaimed automatically after probing the recorded
/// pid, instead of forcing the operator to delete it by hand
/// (mailrs embed round-12: a restarted server came up in degraded
/// mode because the previous instance's lock survived). That probe
/// is only sound when prober and owner share an environment: a pid
/// is only meaningful inside the PID namespace that recorded it;
/// mailrs's recovery window saw "locked by pid 1" from a STOPPED
/// container because the prober's pid 1 (its own init) was alive.
/// When the lock's identity differs from ours, liveness is
/// UNDECIDABLE and we refuse honestly instead of guessing in
/// either direction.
///
/// Both components are best-effort: anything that cannot be
/// determined comes back as an empty string.
fn host_identity() -> (String, String) {
    // Captured stdout, lossily decoded and stripped of surrounding
    // whitespace (including the trailing newline).
    fn trimmed_stdout(out: std::process::Output) -> String {
        String::from_utf8_lossy(&out.stdout).trim().to_string()
    }

    let hostname = match std::process::Command::new("hostname").output() {
        Ok(out) => trimmed_stdout(out),
        Err(_) => String::new(),
    };

    // Linux boot id; containers share the host kernel's boot id, so
    // hostname (= container id by default) is the namespace
    // discriminator and boot id catches host reboots / pid reuse.
    // Where the /proc file is unreadable, `sysctl -n
    // kern.bootsessionuuid` is tried instead.
    let boot_id = match std::fs::read_to_string("/proc/sys/kernel/random/boot_id") {
        Ok(text) => text.trim().to_string(),
        Err(_) => std::process::Command::new("sysctl")
            .args(["-n", "kern.bootsessionuuid"])
            .output()
            .map(trimmed_stdout)
            .unwrap_or_default(),
    };

    (hostname, boot_id)
}
3778
/// v7.34 (crash-recovery P0 #2) — start-time of process `pid`, used
/// to tell a reused pid apart from a genuinely-held lock. In a
/// container the holder is always pid 1; `docker start` reuses the
/// container so the NEW process is pid 1 too, on the same host+boot
/// id — a bare `pid_alive(1)` probe (`ps -p 1` always succeeds)
/// reads a dead owner's lock as live and the engine self-deadlocks
/// on its own catalog. The `(pid, start-time)` pair is unique per
/// live process within a boot: a reused pid carries a LATER
/// start-time, so a mismatch means the recorded owner is gone.
///
/// Linux reads `/proc/<pid>/stat` field 22 (clock ticks since boot)
/// and returns it verbatim as text. `comm` (field 2) is
/// parenthesised and may contain spaces, so fields are taken after
/// the LAST ')'. Returns `None` when the stat file cannot be read
/// or has fewer fields than expected. Other platforms always return
/// `None` and the liveness check falls back to pid-alive + the
/// self-pid reclaim. Pure std — no libc.
#[cfg(target_os = "linux")]
fn process_start_time(pid: u32) -> Option<String> {
    let stat_path = format!("/proc/{pid}/stat");
    let contents = std::fs::read_to_string(stat_path).ok()?;
    let (_, tail) = contents.rsplit_once(')')?;
    // `tail` begins at field 3 (state), so field 22 (starttime) is
    // the 20th whitespace-separated token — zero-based index 19.
    let mut fields = tail.split_whitespace();
    fields.nth(19).map(String::from)
}

/// Non-Linux fallback: no start-time source is available here, so
/// callers always get `None`.
#[cfg(not(target_os = "linux"))]
fn process_start_time(_pid: u32) -> Option<String> {
    None
}
3804
/// Acquire the cross-process exclusion lock at `lock_path` (atomic
/// `mkdir`), recording the owner inside it in a `pid` file with
/// four lines: pid, hostname, boot id, process start-time.
///
/// If the lock directory already exists, the owner record is read
/// and the owner's liveness probed; a lock whose owner is not a
/// live holder (dead pid, reused pid, or no readable record) is
/// removed and the `mkdir` retried once, so a lock left behind by
/// a killed process is reclaimed automatically instead of forcing
/// the operator to delete it by hand.
///
/// # Errors
///
/// - `EngineError::Unsupported` when the lock was recorded under a
///   different hostname / boot id (liveness undecidable), when the
///   recorded owner is judged alive, or when the lock exists again
///   on the second attempt (a concurrent reclaimer won the race).
/// - Any other I/O failure from `create_dir` / `remove_dir_all`,
///   mapped through `io_err`.
fn acquire_path_lock(lock_path: &Path) -> Result<(), EngineError> {
    // Attempt 0 may reclaim a stale lock; attempt 1 is the single
    // retry after a reclaim and never reclaims again.
    for attempt in 0..2 {
        match std::fs::create_dir(lock_path) {
            Ok(()) => {
                // Best-effort owner record; liveness probing treats a
                // missing pid file as stale (crash between mkdir and
                // write is indistinguishable from an ancient lock).
                // v7.27 — lines 2+3 record the owner's environment
                // identity (hostname, boot id) so a prober in a
                // different namespace refuses instead of misreading
                // the pid. v7.34 — line 4 records the owner's process
                // start-time so a reused pid (container pid-1 restart)
                // is distinguishable from a live holder.
                let (host, boot) = host_identity();
                let start = process_start_time(std::process::id()).unwrap_or_default();
                let _ = std::fs::write(
                    lock_path.join("pid"),
                    format!("{}\n{host}\n{boot}\n{start}\n", std::process::id()),
                );
                return Ok(());
            }
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists && attempt == 0 => {
                // Parse the owner record line by line; every missing
                // or unparsable piece degrades to None / "".
                let record = std::fs::read_to_string(lock_path.join("pid")).unwrap_or_default();
                let mut lines = record.lines();
                let owner = lines.next().and_then(|s| s.trim().parse::<u32>().ok());
                let lock_host = lines.next().unwrap_or("").trim().to_string();
                let lock_boot = lines.next().unwrap_or("").trim().to_string();
                let lock_start = lines.next().unwrap_or("").trim().to_string();
                // v7.27 — identity check BEFORE the pid probe. A pid
                // recorded in another namespace is undecidable both
                // ways (a stale lock can look held, a held lock can
                // look stale — the unsafe direction). Old-format
                // locks (pid only) keep the legacy same-host
                // assumption.
                if !lock_host.is_empty() {
                    let (my_host, my_boot) = host_identity();
                    // Hostnames must match; boot ids are compared
                    // only when both sides actually have one.
                    let same_env = lock_host == my_host
                        && (lock_boot.is_empty() || my_boot.is_empty() || lock_boot == my_boot);
                    if !same_env {
                        return Err(EngineError::Unsupported(format!(
                            "database lock {} was taken in a different host/container \
                             (owner: pid {} on {:?}; we are {:?}) — liveness is \
                             undecidable from here. If you are sure the owner is gone, \
                             call Database::force_unlock() or `spg import --force-unlock`.",
                            lock_path.display(),
                            owner.unwrap_or(0),
                            lock_host,
                            my_host
                        )));
                    }
                }
                // v7.34 (crash-recovery P0 #2) — pid-reuse-safe liveness.
                // A bare `pid_alive` self-deadlocks in a container: the
                // dead owner was pid 1, `docker start` reuses the container
                // so the prober is pid 1 too, and `ps -p 1` always succeeds.
                // The recorded (pid, start-time) pair settles it — the
                // owner is alive ONLY if its pid is alive AND its CURRENT
                // start-time still matches the recorded one:
                //   - container restart: pid 1 alive, but the new pid-1's
                //     start-time differs from the dead owner's → stale.
                //   - genuine double-open (same live process): start-time
                //     matches (it wrote it) → held — correctly refused, so a
                //     second writer can't steal a live lock.
                // An empty/uncomparable start-time (old-format lock or a
                // non-Linux owner with no /proc) falls back to the
                // pid-alive answer (the pre-v7.34 behaviour).
                let owner_alive = owner.is_some_and(|p| {
                    pid_alive(p)
                        && match process_start_time(p) {
                            Some(now) if !lock_start.is_empty() => now == lock_start,
                            _ => true,
                        }
                });
                if owner_alive {
                    return Err(EngineError::Unsupported(format!(
                        "database is locked by another process (pid {}): {}; \
                         stop that process first, or call Database::force_unlock()",
                        owner.unwrap_or(0),
                        lock_path.display()
                    )));
                }
                // Stale — owner pid dead, reused, or unrecorded. Reclaim.
                eprintln!(
                    "spg-embedded: reclaiming stale lock {} (owner pid {:?} not a live holder)",
                    lock_path.display(),
                    owner
                );
                std::fs::remove_dir_all(lock_path).map_err(io_err)?;
                // Loop retries the create_dir; a concurrent reclaimer
                // winning the race surfaces as AlreadyExists on
                // attempt 1 below.
            }
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
                return Err(EngineError::Unsupported(format!(
                    "database is locked by another process: {}; \
                     stop that process first, or call Database::force_unlock()",
                    lock_path.display()
                )));
            }
            Err(e) => return Err(io_err(e)),
        }
    }
    // Attempt 1 returns from every arm, so the loop cannot fall
    // through.
    unreachable!("acquire_path_lock loop covers both attempts")
}
3909
/// Probe whether `pid` is a live process. Std-only — no libc
/// dependency.
///
/// Linux with a mounted procfs: the answer is whether
/// `/proc/<pid>` exists. This needs no helper binary, which
/// matters because a `ps` that does not understand `-p` (BusyBox,
/// common in minimal container images) exits non-zero for EVERY
/// pid — that would read a live owner as dead and reclaim a held
/// lock. An inconclusive check (any error other than "not found")
/// reports alive.
///
/// Other Unix targets, or Linux without `/proc`: `ps -p` via the
/// system binary. `ps -p` exits 0 for ANY live pid regardless of
/// owner; `kill -0` was rejected here because it fails with EPERM
/// on another user's live process, which would read as "dead" and
/// reclaim a held lock. Probe failure (no `ps` binary, exec error)
/// conservatively reports alive so locks are never auto-reclaimed
/// on doubt; non-unix targets do the same.
#[cfg(unix)]
fn pid_alive(pid: u32) -> bool {
    // `/proc/self` existing proves procfs is mounted, so a missing
    // `/proc/<pid>` really means "no such process" rather than
    // "no procfs".
    if cfg!(target_os = "linux") && std::path::Path::new("/proc/self").exists() {
        return std::path::Path::new(&format!("/proc/{pid}"))
            .try_exists()
            .unwrap_or(true);
    }
    match std::process::Command::new("ps")
        .arg("-p")
        .arg(pid.to_string())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status()
    {
        Ok(status) => status.success(),
        Err(_) => true,
    }
}

/// Non-unix fallback: there is no probe, so every pid is reported
/// alive and locks are never auto-reclaimed.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    true
}
3936
/// v7.22 — update the stream-tracked string dialect from one
/// statement chunk; see `split_statements`' `mysql_escapes`
/// tracking. Matching is case-insensitive (ASCII) substring search:
///
/// - a chunk mentioning `sql_mode` switches `*mysql_escapes` on;
/// - otherwise a chunk mentioning `standard_conforming_strings`
///   sets it to whether the chunk also contains `off`;
/// - any other chunk leaves the flag untouched.
///
/// Only short chunks are inspected (the signal statements are
/// one-liners; COPY data blocks are skipped by the length guard):
/// anything longer than 4096 bytes is ignored.
///
/// The chunk is inspected as-is. Head checks that need leading
/// whitespace, `--` line comments and non-conditional block
/// comments removed first use the companion
/// `strip_leading_sql_noise`; MySQL conditional comments (`/*!`)
/// are content there and stay.
fn note_dialect_signals(chunk: &str, mysql_escapes: &mut bool) {
    const MAX_SIGNAL_LEN: usize = 4096;
    if chunk.len() <= MAX_SIGNAL_LEN {
        let folded = chunk.to_ascii_lowercase();
        let mentions = |needle: &str| folded.contains(needle);
        if mentions("sql_mode") {
            *mysql_escapes = true;
        } else if mentions("standard_conforming_strings") {
            *mysql_escapes = mentions("off");
        }
    }
}
3958
/// Strip leading whitespace, `--` line comments and NON-conditional
/// block comments from a chunk so statement-head checks (COPY
/// detection most notably) see the first real token. pg_dump
/// prefixes every data block with a `-- Data for Name: …;` banner —
/// which itself contains semicolons, so head checks must run on the
/// stripped text. MySQL executable conditional comments (`/*!`) are
/// content and stay.
///
/// Block comments are delimited the same way `split_statements`
/// scans them: nesting is honoured and the terminator is searched
/// AFTER the two-byte opener, so `/*/ … */` is one comment rather
/// than a comment that closes on its own opening slash.
///
/// Returns the remaining tail of `s` (a sub-slice, nothing is
/// copied), or `""` when the chunk is only noise or ends inside an
/// unterminated block comment.
fn strip_leading_sql_noise(mut s: &str) -> &str {
    loop {
        let t = s.trim_start();
        if let Some(rest) = t.strip_prefix("--") {
            // Line comment: resume after the newline; a comment that
            // runs to the end of the chunk leaves nothing behind.
            s = rest.split_once('\n').map_or("", |(_, r)| r);
            continue;
        }
        if t.starts_with("/*") && !t.starts_with("/*!") {
            match leading_block_comment_len(t) {
                Some(len) => {
                    s = &t[len..];
                    continue;
                }
                None => return "",
            }
        }
        return t;
    }
}

/// Byte length of the (possibly nested) block comment that opens at
/// the very start of `t` — which must begin with `/*` — including
/// its final `*/`. `None` when the comment is never closed.
fn leading_block_comment_len(t: &str) -> Option<usize> {
    let bytes = t.as_bytes();
    let mut depth = 1usize;
    // Start past the opener so its own '*' cannot pair with a
    // following '/' to form a bogus terminator.
    let mut i = 2usize;
    while i + 1 < bytes.len() {
        match (bytes[i], bytes[i + 1]) {
            (b'/', b'*') => {
                depth += 1;
                i += 2;
            }
            (b'*', b'/') => {
                depth -= 1;
                i += 2;
                if depth == 0 {
                    return Some(i);
                }
            }
            _ => i += 1,
        }
    }
    None
}
3978
/// Split a multi-statement SQL script into individual statements on
/// top-level `;`, honouring single-quoted strings (with `''`
/// escapes), double-quoted identifiers, dollar-quoted bodies
/// (`$tag$ … $tag$`), line comments (`--`) and MySQL executable
/// conditional comments (`/*!… */` stay statement content; plain
/// nested block comments don't). Chunks that contain no statement
/// content (whitespace / comments only) are dropped. PG's
/// simple-query protocol does this server-side; the embed path owns
/// it here.
///
/// Each returned slice borrows from `sql`, excludes the terminating
/// `;`, and is otherwise untrimmed: whitespace and comments between
/// the previous `;` and the statement stay attached to it. A final
/// statement without a trailing `;` is returned as well.
///
/// v7.22 (mailrs round-13 gap 1) — psql meta-command lines are
/// dropped for client parity: a line whose first non-whitespace
/// byte is `\` BETWEEN statements (PG 18's pg_dump wraps scripts in
/// `\restrict` / `\unrestrict`) never reaches the parser, the same
/// way psql consumes `\`-lines client-side and never sends them. A
/// mid-statement backslash stays an ordinary byte — pg_dump only
/// emits meta-commands between statements.
pub fn split_statements(sql: &str) -> Vec<&str> {
    let bytes = sql.as_bytes();
    let mut stmts = Vec::new();
    // Byte offset where the chunk currently being scanned begins.
    let mut start = 0usize;
    // True once the current chunk has seen anything other than
    // whitespace or a plain comment.
    let mut has_content = false;
    // v7.22 (round-13 T3) — stream-tracked string dialect, mirroring
    // the engine's session flag: a statement mentioning `sql_mode`
    // (mysqldump preamble, often inside `/*!…*/`) switches plain
    // strings to backslash-escape scanning;
    // `standard_conforming_strings` (pg_dump preamble) switches
    // back. Without this the scanner ends a MySQL `'…\'…'` literal
    // early and splits inside data.
    let mut mysql_escapes = false;
    let mut i = 0usize;
    while i < bytes.len() {
        // Arms either leave `i` on the last byte they consumed (the
        // shared `i += 1` at the bottom steps past it) or position
        // `i` themselves and `continue`.
        match bytes[i] {
            b'\\' if !has_content => {
                // Start-of-statement `\` = psql meta-command line.
                // Consume through end-of-line; restart the chunk
                // after it so the line never lands in the output.
                while i < bytes.len() && bytes[i] != b'\n' {
                    i += 1;
                }
                start = if i < bytes.len() { i + 1 } else { i };
            }
            b'\'' => {
                has_content = true;
                // PG escape-string form `E'...'` honours backslash
                // escapes (`E'a\';b'` is ONE literal) — detect via
                // the immediately-preceding standalone E/e. MySQL
                // dialect sessions treat EVERY plain string that way.
                let escape_string = mysql_escapes
                    || (i >= 1
                        && matches!(bytes[i - 1], b'e' | b'E')
                        && !(i >= 2
                            && (bytes[i - 2].is_ascii_alphanumeric() || bytes[i - 2] == b'_')));
                i += 1;
                while i < bytes.len() {
                    if escape_string && bytes[i] == b'\\' {
                        // Skip the escaped byte (covers \' and \\).
                        i += 2;
                        continue;
                    }
                    if bytes[i] == b'\'' {
                        // `''` is an escaped quote inside the literal.
                        if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
                            i += 2;
                            continue;
                        }
                        break;
                    }
                    i += 1;
                }
            }
            b'"' => {
                // Quoted identifier: skip to the next `"`. A doubled
                // `""` simply closes and immediately reopens, which
                // scans the same span.
                has_content = true;
                i += 1;
                while i < bytes.len() && bytes[i] != b'"' {
                    i += 1;
                }
            }
            b'$' => {
                // Possible dollar-quote opener `$tag$` (tag may be
                // empty). If the shape doesn't match, it's a plain
                // `$` (positional param) — fall through.
                let tag_end = bytes[i + 1..]
                    .iter()
                    .position(|&b| !(b.is_ascii_alphanumeric() || b == b'_'))
                    .map(|off| i + 1 + off);
                if let Some(te) = tag_end
                    && te < bytes.len()
                    && bytes[te] == b'$'
                {
                    has_content = true;
                    // `tag` spans both `$` delimiters.
                    let tag = &sql[i..=te];
                    // Find the closing `$tag$`.
                    if let Some(close) = sql[te + 1..].find(tag) {
                        i = te + 1 + close + tag.len();
                        continue;
                    }
                    // Unterminated — consume the rest; the parser
                    // will report it.
                    i = bytes.len();
                    continue;
                }
                has_content = true;
            }
            b'-' if i + 1 < bytes.len() && bytes[i + 1] == b'-' => {
                // Line comment: skip to end-of-line without marking
                // the chunk as having content.
                while i < bytes.len() && bytes[i] != b'\n' {
                    i += 1;
                }
            }
            b'/' if i + 1 < bytes.len() && bytes[i + 1] == b'*' => {
                // v7.22 (round-13 T3) — MySQL conditional comments
                // `/*!40101 … */` are EXECUTABLE (mysqldump wraps
                // its whole preamble + DISABLE KEYS hints in them);
                // they must stay statement content for the engine,
                // not be skipped as commentary.
                if i + 2 < bytes.len() && bytes[i + 2] == b'!' {
                    has_content = true;
                }
                // Skip the whole comment, tracking nesting depth; an
                // unterminated comment runs to the end of input.
                let mut depth = 1usize;
                i += 2;
                while i < bytes.len() && depth > 0 {
                    if bytes[i] == b'/' && i + 1 < bytes.len() && bytes[i + 1] == b'*' {
                        depth += 1;
                        i += 2;
                    } else if bytes[i] == b'*' && i + 1 < bytes.len() && bytes[i + 1] == b'/' {
                        depth -= 1;
                        i += 2;
                    } else {
                        i += 1;
                    }
                }
                continue;
            }
            b';' => {
                if has_content {
                    let head = &sql[start..i];
                    // v7.22 (round-13 T2) — a `COPY … FROM stdin;`
                    // statement owns its following data block
                    // through the `\.` terminator line (data lines
                    // may contain `;`, so generic splitting would
                    // shred them). Swallow head + data into ONE
                    // chunk; `execute_script` lowers it to INSERTs.
                    // pg_dump prefixes the COPY with a comment
                    // banner — strip it before the head check.
                    let head_clean = strip_leading_sql_noise(head);
                    // Cheap 4-byte prefix test first; the full head
                    // parse only runs for chunks starting with "copy".
                    let is_copy_head = head_clean
                        .get(..4)
                        .is_some_and(|p| p.eq_ignore_ascii_case("copy"))
                        && spg_engine::copy::parse_copy_from_stdin_head(head_clean).is_some();
                    if is_copy_head {
                        // Scan whole lines after the ';' until the
                        // `\.` terminator (or EOF — torn dumps lose
                        // their tail, same as psql would error).
                        let mut j = i + 1;
                        let data_end;
                        loop {
                            if j >= bytes.len() {
                                data_end = bytes.len();
                                break;
                            }
                            let line_end = sql[j..].find('\n').map_or(bytes.len(), |off| j + off);
                            if sql[j..line_end].trim_end_matches('\r').trim() == "\\." {
                                data_end = j;
                                i = line_end; // bottom i += 1 skips \n
                                break;
                            }
                            j = line_end + 1;
                        }
                        // The chunk holds head, `;` and data lines;
                        // the `\.` terminator line itself is left out.
                        stmts.push(&sql[start..data_end]);
                        if data_end == bytes.len() {
                            i = bytes.len();
                        }
                        start = i + 1;
                        has_content = false;
                        i += 1;
                        continue;
                    }
                    note_dialect_signals(head, &mut mysql_escapes);
                    stmts.push(head);
                }
                // Begin the next chunk right after the `;`.
                start = i + 1;
                has_content = false;
            }
            b => {
                if !b.is_ascii_whitespace() {
                    has_content = true;
                }
            }
        }
        i += 1;
    }
    // Trailing statement with no terminating `;`.
    if has_content {
        stmts.push(&sql[start..]);
    }
    stmts
}
4175
#[cfg(test)]
mod tests {
    use super::*;

    /// One `;` separates two statements (the second chunk keeps its
    /// leading space), and chunks holding only whitespace or a comment
    /// yield no statement at all.
    #[test]
    fn split_statements_basic_and_trailing() {
        let pair = split_statements("CREATE TABLE a (x INT); INSERT INTO a VALUES (1)");
        assert_eq!(
            pair,
            vec!["CREATE TABLE a (x INT)", " INSERT INTO a VALUES (1)"]
        );
        // Only separators, blanks and a line comment: nothing survives.
        let nothing = split_statements(" ;; -- nothing\n;");
        assert!(nothing.is_empty());
    }

    /// A `;` that sits inside any quoting form is data, not a separator.
    #[test]
    fn split_statements_quoting_forms() {
        // In order: plain literal, doubled quote, E-string backslash
        // escape, quoted identifier, tagged dollar-quote, bare
        // dollar-quote.
        let quoted = [
            "INSERT INTO t VALUES ('a;b')",
            "INSERT INTO t VALUES ('it''s; fine')",
            r"INSERT INTO t VALUES (E'it\'s; fine')",
            "CREATE TABLE \"odd;name\" (x INT)",
            "DO $body$ BEGIN PERFORM 1; END $body$",
            "DO $$ SELECT 1; $$",
        ];
        // On its own, every input must come back as a single statement.
        for &sql in &quoted {
            let whole = split_statements(sql);
            assert_eq!(whole, vec![sql], "must stay whole: {sql}");
        }
        // Followed by a real separator, the split lands right after it
        // and the neighbour keeps its leading newline.
        for &sql in &quoted {
            let joined = format!("{sql};\nSELECT 2");
            let parts = split_statements(&joined);
            assert_eq!(parts, vec![sql, "\nSELECT 2"], "must split after: {sql}");
        }
    }

    /// v7.22 round-13 gap 1 — PG 18 pg_dump brackets its scripts with
    /// `\restrict` / `\unrestrict`. For psql parity those lines must
    /// never be handed to the parser; the expected output here also
    /// omits the trailing `\.` line.
    #[test]
    fn split_statements_drops_psql_meta_lines() {
        let dump = "\\restrict TOKEN123\nSELECT 1;\n\\unrestrict TOKEN123\nSELECT 2;\n\\.\n";
        let kept = split_statements(dump);
        assert_eq!(kept, vec!["SELECT 1", "SELECT 2"]);

        // A backslash in the middle of a statement is NOT a
        // meta-command: the statement passes through unchanged.
        let escaped = r"SELECT E'a\\b'";
        let passthrough = split_statements(escaped);
        assert_eq!(passthrough, vec![escaped]);
    }

    /// Semicolons inside `--` line comments and (nested) `/* */` block
    /// comments must not terminate a statement.
    #[test]
    fn split_statements_comments_hide_semicolons() {
        let commented = "-- c1 ; still comment\nSELECT 1; /* a ; b /* nested ; */ */ SELECT 2";
        let chunks = split_statements(commented);
        assert_eq!(chunks.len(), 2);
        let (first, second) = (&chunks[0], &chunks[1]);
        assert!(first.contains("SELECT 1"));
        assert!(second.contains("SELECT 2"));
    }

    /// End-to-end smoke test on an in-memory database: create a table,
    /// insert two rows, read one of them back by id.
    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        let setup = [
            "CREATE TABLE t (id INT NOT NULL, name TEXT)",
            "INSERT INTO t VALUES (1, 'alice')",
            "INSERT INTO t VALUES (2, 'bob')",
        ];
        for statement in setup {
            db.execute(statement).unwrap();
        }

        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        // The single returned cell has to be the integer 1.
        let cell = &rows[0][0];
        match cell {
            Value::Int(1) => {}
            other => panic!("expected Int(1), got {other:?}"),
        }
    }

    /// `query()` must reject a statement that is not a SELECT.
    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();

        let outcome = db.query("INSERT INTO t VALUES (1)");
        assert!(outcome.is_err(), "query() on INSERT must error");
    }

    /// A snapshot taken from one database restores into a second one
    /// that still contains the inserted row.
    #[test]
    fn snapshot_roundtrip() {
        let mut source = Database::open_in_memory();
        for statement in ["CREATE TABLE t (id INT NOT NULL)", "INSERT INTO t VALUES (42)"] {
            source.execute(statement).unwrap();
        }

        let image = source.snapshot();
        let mut restored = Database::restore(&image).unwrap();

        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        let cell = &rows[0][0];
        match cell {
            Value::Int(42) => {}
            other => panic!("expected Int(42), got {other:?}"),
        }
    }

    /// Compile-and-run check that `FromSpgRow` can be implemented for a
    /// local type and invoked on a one-column row.
    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }

        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                // Anything but a leading integer column is rejected.
                let Some(Value::Int(n)) = row.first() else {
                    return Err(EngineError::Unsupported("bad id".into()));
                };
                Ok(Self { _id: *n })
            }
        }

        let row = vec![Value::Int(7)];
        let _u = User::from_spg_row(&row).unwrap();
    }
}