spg_embedded/lib.rs
1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//! println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//! crash-consistent WAL + periodic checkpoint snapshot. The
32//! on-disk format is byte-identical to what `spg-server`
33//! produces, so a database can move between modes without
34//! conversion.
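//! - **Durability**: every `execute()` that mutates waits for its
//!   WAL record to be `fsync`ed before returning `Ok` (unless
//!   `SPG_SYNCHRONOUS_COMMIT=off` relaxes this — see
//!   [`WalTicket::wait`]). A lone writer pays one fsync per
//!   commit; since v7.20 the WAL group-commit path lets
//!   concurrent commits share a single fsync. If you need batch
//!   throughput, wrap multiple statements in
//!   [`Database::with_transaction`], which fsyncs only at
//!   commit.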
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//! Share across threads via `Arc<Mutex<Database>>`. The
43//! single-writer model is intentional — see
44//! [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//! moves cold rows to disk-resident segments while you keep
47//! serving requests. It runs in a dedicated thread; drop the
48//! returned [`FreezerHandle`] (or call `stop()`) for clean
49//! shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//! [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//! them with a wildcard arm so future v7.x releases can add
53//! variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//! Malformed SQL, type mismatches, missing tables — all
59//! return `Err(EngineError::…)`. If you observe a panic on
60//! a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//! violations (e.g., catalog snapshot magic mismatch, WAL
63//! record CRC sentinel corruption that survived the boot-
64//! time validation). These represent silent disk corruption
65//! and an unwind would leak inconsistent state, so the
66//! release profile uses `panic = abort` — your host process
67//! dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//! `--profile release-dbg` (keeps unwind tables) and use
70//! `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
/// v7.16.0 — handle for a parsed-and-planned SQL statement.
///
/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
/// with a `&[Value]` slice carrying the bind parameters (PG-style
/// `$1`, `$2`, … positional). Cheap to `Clone`; the underlying AST
/// is shared by handle copies and cloned per bind call by the
/// engine's executor.
///
/// NOTE(review): `Clone` is derived, so copying a handle clones both
/// `stmt` and `sql`. Whether that is actually cheap / shared depends
/// on how `ParsedStatement` implements `Clone` — confirm against
/// `spg-engine`.
///
/// The handle holds a snapshot of the AST at prepare time. If
/// the engine's plan cache evicts the entry between prepare and
/// execute (e.g. ANALYZE bumps the statistics version) the
/// stored AST keeps working — `execute_prepared` operates on
/// the handle's clone, not the cache entry.
#[derive(Debug, Clone)]
pub struct Statement {
    /// The parsed + planned AST. `spg_engine`'s `prepare_cached`
    /// returns it as a clone of the cached plan, so any rewrite
    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
    /// already run.
    pub(crate) stmt: ParsedStatement,
    /// Original SQL source, kept for `Display` / debug only.
    /// WAL persistence renders from the AST so a bind-time
    /// rewrite of `$1..$N` survives replay.
    pub(crate) sql: String,
}
106
107impl Statement {
108 /// Borrow the original SQL source — useful for tracing and
109 /// debug logs. WAL replay does NOT use this; it serialises
110 /// the bind-final AST instead.
111 #[must_use]
112 pub fn sql(&self) -> &str {
113 &self.sql
114 }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127 let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::Write;
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Condvar, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
/// v7.11.3 — wall-clock provider injected into every embedded
/// `Engine`.
///
/// Returns microseconds since the Unix epoch. A clock far enough in
/// the future to overflow `i64` clamps to `i64::MAX`; a clock set
/// before the epoch yields `0`. Used by SQL's `NOW()` /
/// `CURRENT_TIMESTAMP` / `CURRENT_DATE` rewrite layer so PG-idiomatic
/// time queries work without the caller wiring their own clock.
fn wall_clock_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        // System clock is set before 1970-01-01.
        Err(_) => 0,
    }
}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
// Kept private so callers can't mis-frame records; the v3 layout
// is the same the server uses, so a `spg-server` boot can read a
// database an embedded process wrote and vice versa.

/// Flag bit OR-ed into the leading u32 length word of a record
/// (see `encode_v3_auto_commit`, which sets it together with
/// [`WAL_V3_FLAG`]).
const WAL_V2_SENTINEL: u32 = 0x8000_0000;
/// Second flag bit OR-ed into the leading u32 length word; set
/// alongside [`WAL_V2_SENTINEL`] by `encode_v3_auto_commit`.
const WAL_V3_FLAG: u32 = 0x4000_0000;
/// Type byte of a v3 auto-commit SQL record; the payload that
/// follows it is the SQL text.
const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
/// v7.18 — durability checkpoint marker stays at 0x02 (skipped on replay).
const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
/// v7.18 PITR — auto-commit-sql record with appended (commit_lsn,
/// commit_unix_us) fields so replay can target a specific point in
/// time. Backward-compat: v3 records (type 0x01) keep working, the
/// envelope flag bits are unchanged. The new type byte is the
/// schema-version discriminator.
const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
/// v7.18 — sentinel for "no wall clock" inside a v4 record's
/// commit_unix_us slot. Restore-to-timestamp skips records with
/// this sentinel (no time anchor); LSN-based restore is
/// unaffected.
const WAL_V4_NO_CLOCK: i64 = i64::MIN;
/// v7.18 — extra header bytes after the type byte in a v4 record:
/// 8 bytes commit_lsn (u64 LE) + 8 bytes commit_unix_us (i64 LE).
const WAL_V4_EXTRA_HEADER: usize = 16;
/// v7.18 PITR — checkpoint anchor record written to the WAL *before*
/// the snapshot file replaces the on-disk catalog. Carries the
/// (lsn, ts, snapshot_path) triple so restore tooling can find the
/// matching base snapshot without scanning the filesystem. Replay
/// dispatch skips it (same as the v3 durability marker).
///
/// NOTE(review): the background path in `execute_checkpoint_job`
/// enqueues this marker *after* the snapshot rename, not before —
/// confirm which ordering this sentence is meant to describe.
const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;

/// v7.21 (mailrs embed round-12 polish) — one COMMITted explicit
/// transaction, flushed atomically at COMMIT time. Payload = the
/// transaction's bind-final mutation statements joined with `";\n"`;
/// replay re-splits via [`split_statements`] and applies in order.
/// Same 16-byte (commit_lsn, commit_unix_us) prefix as the v4
/// auto-commit record. The record is CRC-framed like every other
/// record, so replay applies the whole transaction or — torn tail —
/// none of it; a transaction can never half-resurrect.
///
/// Why it exists: in-transaction mutations only touch the engine's
/// shadow catalog (`modified_catalog: false`), so the per-statement
/// auto-commit append never fired and a COMMIT followed by a crash
/// (no graceful Drop checkpoint) lost the transaction.
const WAL_V4_TYPE_TX_COMMIT_SQL: u8 = 0x12;

/// v7.34 (crash-recovery P0 #2) — row-level physical redo record. Same v4
/// envelope (lsn + ts + payload + CRC) but the payload is `encode_redo_log`
/// bytes, not SQL. Replay applies the physical [`RowChange`]s via
/// `Engine::apply_redo` instead of re-executing — O(changed rows), not the
/// O(records × catalog_rows) statement-replay that hung the mailrs P0.
const WAL_V5_TYPE_ROW_REDO: u8 = 0x13;
204
/// v7.34 (crash-recovery P0 #2) — opt-in row-level redo WAL records.
///
/// Default OFF during bringup; `SPG_WAL_ROW_REDO=1` (or `true`, any
/// case) makes mutating statements log physical changes (0x13)
/// instead of SQL, so crash recovery applies them in O(changed rows)
/// rather than re-executing in O(records × catalog_rows) (the
/// superlinear replay hang root-caused on the mailrs P0). DDL still
/// logs as SQL (hybrid log). When this returns true, `open_path` arms
/// the engine's redo capture.
///
/// Surrounding whitespace in the env value is ignored, matching how
/// `SPG_MAX_QUERY_BYTES` is parsed. An unset or non-UTF-8 variable
/// counts as disabled.
fn row_redo_enabled() -> bool {
    std::env::var("SPG_WAL_ROW_REDO")
        .map(|raw| {
            let flag = raw.trim();
            flag == "1" || flag.eq_ignore_ascii_case("true")
        })
        .unwrap_or(false)
}

/// v7.1 — auto-checkpoint threshold. Once the WAL grows past
/// this many bytes, the next successful `execute()` call ends
/// with a `checkpoint()` so the WAL stays bounded. Tunable via
/// `SPG_EMBEDDED_CHECKPOINT_BYTES` env.
///
/// Returns the env value when it parses as a positive `u64`
/// (surrounding whitespace ignored); otherwise — unset, unparsable,
/// or `0` — falls back to 4 MiB.
fn default_checkpoint_threshold_bytes() -> u64 {
    const DEFAULT_CHECKPOINT_BYTES: u64 = 4 * 1024 * 1024;
    std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .filter(|&bytes| bytes > 0)
        .unwrap_or(DEFAULT_CHECKPOINT_BYTES)
}
229
230/// v7.30.3 (mailrs round-26) — per-query byte budget on join/filter
231/// materialisation, default ON at 256 MiB for embed parity with the
232/// server's allocator-level `SPG_MAX_QUERY_BYTES` default. A fat
233/// backfill batch (1000 × full mail bodies) then errors with
234/// `QueryBytesExceeded` instead of walking the host into reclaim
235/// livelock. `SPG_MAX_QUERY_BYTES=0` disables; any other value
236/// overrides. NOT applied to the WAL-replay engine — replay must
237/// never fail on a tuning knob.
238fn engine_with_query_byte_budget(engine: Engine) -> Engine {
239 const DEFAULT_MAX_QUERY_BYTES: usize = 256 * 1024 * 1024;
240 match std::env::var("SPG_MAX_QUERY_BYTES")
241 .ok()
242 .and_then(|s| s.trim().parse::<usize>().ok())
243 {
244 Some(0) => engine,
245 Some(n) => engine.with_max_query_bytes(n),
246 None => engine.with_max_query_bytes(DEFAULT_MAX_QUERY_BYTES),
247 }
248}
249
250/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
251///
252/// ```text
253/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
254/// [u32 LE crc32 over (type_byte || sql_bytes)]
255/// [u8 type = 0x01]
256/// [sql bytes]
257/// ```
258fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
259 let payload = sql.as_bytes();
260 let mut crc_buf = Vec::with_capacity(1 + payload.len());
261 crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
262 crc_buf.extend_from_slice(payload);
263 let crc = spg_crypto::crc32::crc32(&crc_buf);
264 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
265 let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
266 out.extend_from_slice(&header);
267 out.extend_from_slice(&crc.to_le_bytes());
268 out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
269 out.extend_from_slice(payload);
270 out
271}
272
/// v7.20 P2 — WAL group-commit. N concurrent commits share one
/// fsync (the 4.2 ms p50 that profile_breakdown measured as
/// 99.2% of the durable write path).
///
/// Leader-follower protocol, same family as PG's group commit:
///
/// 1. `enqueue(record)` — called while the caller still holds
///    the engine's write lock. Appends the encoded record to the
///    shared buffer, returns a sequence ticket. O(memcpy).
/// 2. Caller RELEASES the engine write lock (the next writer's
///    mutation proceeds in parallel with this batch's fsync).
/// 3. `wait_flushed(seq)` — if nobody is flushing, the caller
///    elects itself leader: swaps the buffer out, writes +
///    fsyncs ONCE for every record in the batch, marks the
///    batch durable, wakes all followers. Otherwise it parks on
///    the condvar until a leader covers its seq.
///
/// Durability contract is unchanged from v7.19: `execute()`
/// does not return Ok until the record that describes its
/// mutation is fsynced. The only change is N callers sharing
/// one fsync instead of paying one each.
///
/// Lock order (deadlock-free): `state` then `file`; never the
/// reverse. The leader holds `file` WITHOUT `state` during IO so
/// enqueues continue while fsync runs.
#[derive(Debug)]
struct WalGroup {
    /// Pending-batch buffer plus sequence / leader / failure
    /// bookkeeping; every field of [`WalGroupState`] is read and
    /// written under this lock.
    state: Mutex<WalGroupState>,
    /// Paired with `state`: followers park here until a leader
    /// publishes a new `flushed_seq`, a failure, or gives up
    /// leadership.
    cond: std::sync::Condvar,
    /// Active chunk file handle. Separate lock from `state` so
    /// the leader's write+fsync doesn't block concurrent
    /// enqueues. Swapped by `checkpoint()` at rotation.
    file: Mutex<File>,
}
307
/// Mutable bookkeeping of a [`WalGroup`], guarded by
/// `WalGroup::state`.
#[derive(Debug)]
struct WalGroupState {
    /// Encoded records awaiting flush.
    buf: Vec<u8>,
    /// Monotonic enqueue counter (1-based): the value handed back
    /// by the most recent `enqueue`, `0` before the first one.
    enqueued_seq: u64,
    /// Highest seq whose record is fsynced.
    flushed_seq: u64,
    /// True while some caller is inside the leader IO section.
    leader_active: bool,
    /// Sticky fatal error — a failed fsync poisons the WAL
    /// (loud, never silent). All current + future waiters error.
    /// Holds the rendered IO error message.
    failed: Option<String>,
    /// Bytes written to the active chunk since rotation —
    /// drives the auto-checkpoint trigger. Seeded with the
    /// chunk's initial length by `WalGroup::new` and reset to 0
    /// by `rotate_file`.
    written_len: u64,
}
325
/// Ticket returned by the buffered write path; `wait()` blocks
/// until the record it covers is durable (or the WAL is
/// poisoned). Cheap to move across threads.
#[derive(Debug)]
pub struct WalTicket {
    /// The group-commit queue the record was enqueued on.
    group: Arc<WalGroup>,
    /// Sequence number to wait for; `wait()` passes it to
    /// `WalGroup::wait_flushed`.
    seq: u64,
}
334
/// v7.34 (crash-recovery P0 #2) — RAII reset for the WalGroup leader
/// flag. Electing a leader sets `leader_active = true` and releases the
/// state lock for the sleep+IO window; if a panic unwinds through that
/// window the flag would stay true and every follower would park forever
/// on the condvar — no one left to flush or wake them, the same
/// total-write hang an unclean stop causes, but self-inflicted. This
/// guard clears the flag and wakes the followers (so one re-elects) on
/// ANY drop, including a panic unwind; the normal path disarms it after
/// resetting the flag itself.
struct LeaderGuard<'a> {
    /// Group whose `leader_active` flag this guard is responsible for.
    group: &'a WalGroup,
    /// While true, dropping the guard clears the flag and notifies the
    /// condvar; the normal completion path sets this to false first.
    armed: bool,
}
348
349impl Drop for LeaderGuard<'_> {
350 fn drop(&mut self) {
351 if self.armed {
352 let mut g = self.group.state.lock().unwrap_or_else(|e| e.into_inner());
353 g.leader_active = false;
354 drop(g);
355 self.group.cond.notify_all();
356 }
357 }
358}
359
360impl WalGroup {
361 fn new(file: File, initial_len: u64) -> Self {
362 Self {
363 state: Mutex::new(WalGroupState {
364 buf: Vec::new(),
365 enqueued_seq: 0,
366 flushed_seq: 0,
367 leader_active: false,
368 failed: None,
369 written_len: initial_len,
370 }),
371 cond: std::sync::Condvar::new(),
372 file: Mutex::new(file),
373 }
374 }
375
376 /// Append `record` to the pending batch. Returns the seq the
377 /// caller must wait on. Called under the engine write lock —
378 /// keep it O(memcpy).
379 fn enqueue(&self, record: &[u8]) -> u64 {
380 let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
381 g.buf.extend_from_slice(record);
382 g.enqueued_seq += 1;
383 g.enqueued_seq
384 }
385
386 /// Block until `seq` is durable. Leader-follower: the first
387 /// arriving waiter flushes for everyone.
388 fn wait_flushed(&self, seq: u64) -> Result<(), EngineError> {
389 let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
390 loop {
391 if let Some(e) = &g.failed {
392 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
393 format!("WAL poisoned by earlier flush failure: {e}"),
394 )));
395 }
396 if g.flushed_seq >= seq {
397 return Ok(());
398 }
399 if !g.leader_active {
400 // Elect self leader.
401 g.leader_active = true;
402 drop(g);
403 // v7.34 — panic-safety: if anything below unwinds before
404 // `leader_active` is reset, this guard releases it +
405 // wakes a follower to re-elect (else all writers park
406 // forever). Disarmed on the normal path after the reset.
407 let mut leader_guard = LeaderGuard {
408 group: self,
409 armed: true,
410 };
411 // v7.20 — commit_delay (PG's same-named knob):
412 // before taking the batch, give in-flight
413 // writers a short window to enqueue so the
414 // shared fsync covers more commits. 150 µs costs
415 // ~3.5% on a solo 4.2 ms fsync but multiplies
416 // batch size under load. Tunable via
417 // SPG_COMMIT_DELAY_US (0 disables).
418 let delay = commit_delay_us();
419 if delay > 0 {
420 std::thread::sleep(std::time::Duration::from_micros(delay));
421 }
422 let (batch, flush_to) = {
423 let mut g2 = self.state.lock().unwrap_or_else(|e| e.into_inner());
424 (core::mem::take(&mut g2.buf), g2.enqueued_seq)
425 };
426 let io_result: std::io::Result<()> = (|| {
427 let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
428 f.write_all(&batch)?;
429 f.sync_data()
430 })();
431 g = self.state.lock().unwrap_or_else(|e| e.into_inner());
432 g.leader_active = false;
433 leader_guard.armed = false; // normal completion — disarm
434 match io_result {
435 Ok(()) => {
436 g.flushed_seq = flush_to;
437 g.written_len = g.written_len.saturating_add(batch.len() as u64);
438 }
439 Err(e) => {
440 g.failed = Some(e.to_string());
441 }
442 }
443 self.cond.notify_all();
444 //
445
446 // Loop continues: either our seq is now covered
447 // (leader path normally returns next iteration)
448 // or the error branch surfaces.
449 continue;
450 }
451 g = self.cond.wait(g).unwrap_or_else(|e| e.into_inner());
452 }
453 }
454
455 /// Drain the pending batch + flush synchronously. Caller must
456 /// guarantee no concurrent enqueues (checkpoint holds the
457 /// engine exclusively). Used before rotation so the marker
458 /// lands in the right chunk.
459 fn flush_now(&self) -> Result<(), EngineError> {
460 let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
461 if let Some(e) = &g.failed {
462 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
463 format!("WAL poisoned: {e}"),
464 )));
465 }
466 let batch = core::mem::take(&mut g.buf);
467 let flush_to = g.enqueued_seq;
468 if batch.is_empty() {
469 return Ok(());
470 }
471 drop(g);
472 let io: std::io::Result<()> = (|| {
473 let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
474 f.write_all(&batch)?;
475 f.sync_data()
476 })();
477 let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
478 match io {
479 Ok(()) => {
480 g.flushed_seq = flush_to;
481 g.written_len = g.written_len.saturating_add(batch.len() as u64);
482 self.cond.notify_all();
483 Ok(())
484 }
485 Err(e) => {
486 g.failed = Some(e.to_string());
487 self.cond.notify_all();
488 Err(io_err(e))
489 }
490 }
491 }
492
493 /// Swap the active chunk handle (rotation). Caller flushes
494 /// first; both locks taken in canonical order.
495 fn rotate_file(&self, new_file: File) {
496 let mut g = self.state.lock().unwrap_or_else(|e| e.into_inner());
497 let mut f = self.file.lock().unwrap_or_else(|e| e.into_inner());
498 *f = new_file;
499 g.written_len = 0;
500 }
501
502 fn written_len(&self) -> u64 {
503 let g = self.state.lock().unwrap_or_else(|e| e.into_inner());
504 g.written_len + g.buf.len() as u64
505 }
506}
507
508// ─────────────────────────────────────────────────────────────────────────────
509// CoW-2 (v7.34) — background-checkpoint worker.
510//
511// Splits checkpoint into two halves so the front-end pays only the cheap one:
512// • Capture (`Database::snapshot_checkpoint_job`) — under &mut self,
513// Arc-bump the catalog + cheap trailer/cold-segment clones + atomic
514// commit_lsn load. Front returns to caller in microseconds.
515// • Execute (`execute_checkpoint_job`, on the worker thread) — serialize
516// the snapshot, tmp+rename the db / manifest files (each fsynced via
517// the rename + dir-fsync), enqueue the v4 marker through the WalGroup
518// (which is already thread-safe so live commits interleave fine),
519// then rotate the chunk file.
520//
521// Replay floor is the marker LSN captured at front-end time. A crash any
522// time during the worker's sequence is safe: nothing past the previous
523// checkpoint's marker can have been forgotten until the new marker hits
524// the WAL, and live writes between the two go into the same chunk under
525// the old marker — replay re-applies them after restoring the (older)
526// snapshot. snapshot+manifest atomicity (D10) is unchanged from the sync
527// path — CoW-4 tightens it later.
528//
529// Single-instance: a state machine of {pending, inflight} so a new
530// trigger fires only when the worker is fully idle. Any sticky error
531// surfaces on the next `wait()`.
532
/// Everything `execute_checkpoint_job` consumes to run one
/// checkpoint on the worker thread.
#[derive(Debug)]
struct CheckpointJob {
    /// Captured engine snapshot; the worker serializes it into the
    /// bytes written to `db_path`.
    snapshot: spg_engine::EngineSnapshot,
    /// LSN carried by the checkpoint marker record; the rotated
    /// chunk's filename uses `marker_lsn + 1`.
    marker_lsn: u64,
    /// Destination of the serialized snapshot; also the path the
    /// manifest location and the marker record are derived from.
    db_path: PathBuf,
    /// Directory in which the fresh WAL chunk is created at rotation.
    wal_dir: PathBuf,
    /// Group-commit queue that receives the marker record and whose
    /// active file handle is swapped at rotation.
    wal: Arc<WalGroup>,
    /// Snapshot-time view of the cold-tier segment set. Carried into the
    /// worker so any concurrent `freeze_oldest_to_cold` after the trigger
    /// rides the *next* checkpoint's manifest — same staleness window
    /// the sync path already had.
    cold_segments: Vec<(u32, PathBuf)>,
    /// Shared with `PersistenceCtx` so the worker's chunk rotation is
    /// visible to subsequent diag / Drop introspection.
    current_chunk_path: Arc<Mutex<PathBuf>>,
}
549
/// State shared between the front end and the checkpoint worker
/// thread, guarded by the mutex half of `CheckpointWorker::state`.
#[derive(Debug, Default)]
struct CheckpointState {
    /// Set by the front when it has a job ready; cleared when the worker
    /// picks it up.
    pending: Option<CheckpointJob>,
    /// True while the worker is mid-execute. `pending.is_some() || inflight`
    /// defines "busy" for the trigger / wait predicate.
    inflight: bool,
    /// Sticky error from the worker's last failure. Cleared when surfaced
    /// to a `wait()` or `try_enqueue()` caller.
    last_error: Option<EngineError>,
    /// Drop signal — worker exits after the current job (or immediately if
    /// idle and no pending).
    shutdown: bool,
}
565
/// Owner of the background checkpoint thread. Dropping it signals
/// shutdown and joins the thread.
#[derive(Debug)]
struct CheckpointWorker {
    /// Job slot + status flags and the condvar both sides wait on;
    /// the worker thread holds the other `Arc` clone.
    state: Arc<(Mutex<CheckpointState>, Condvar)>,
    /// Join handle of the worker thread; taken (left `None`) when
    /// `Drop` joins it.
    handle: Option<JoinHandle<()>>,
}
571
572impl CheckpointWorker {
573 fn spawn() -> Self {
574 let state: Arc<(Mutex<CheckpointState>, Condvar)> =
575 Arc::new((Mutex::new(CheckpointState::default()), Condvar::new()));
576 let state_for_thread = Arc::clone(&state);
577 let handle = thread::Builder::new()
578 .name("spg-checkpoint".into())
579 .spawn(move || checkpoint_worker_loop(&state_for_thread))
580 .expect("spawn checkpoint worker");
581 Self {
582 state,
583 handle: Some(handle),
584 }
585 }
586
587 /// Try to enqueue a job. Returns `Ok(true)` if the worker accepted it,
588 /// `Ok(false)` if a job was already pending or in flight (skip — the
589 /// next trigger will pick up newer state). Surfaces any sticky error
590 /// from a previous run before considering the new job, so async paths
591 /// can't lose a failure indefinitely.
592 fn try_enqueue(&self, job: CheckpointJob) -> Result<bool, EngineError> {
593 let (lock, cond) = &*self.state;
594 let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
595 if let Some(e) = g.last_error.take() {
596 return Err(e);
597 }
598 if g.pending.is_some() || g.inflight {
599 return Ok(false);
600 }
601 g.pending = Some(job);
602 cond.notify_one();
603 Ok(true)
604 }
605
606 /// Block until the worker is idle (no pending, not in flight). Returns
607 /// any sticky error from the last run; clears it on the way out.
608 fn wait(&self) -> Result<(), EngineError> {
609 let (lock, cond) = &*self.state;
610 let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
611 while g.pending.is_some() || g.inflight {
612 g = cond.wait(g).unwrap_or_else(|e| e.into_inner());
613 }
614 match g.last_error.take() {
615 Some(e) => Err(e),
616 None => Ok(()),
617 }
618 }
619}
620
621impl Drop for CheckpointWorker {
622 fn drop(&mut self) {
623 {
624 let (lock, cond) = &*self.state;
625 let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
626 g.shutdown = true;
627 cond.notify_one();
628 }
629 if let Some(h) = self.handle.take() {
630 let _ = h.join();
631 }
632 }
633}
634
635fn checkpoint_worker_loop(state: &Arc<(Mutex<CheckpointState>, Condvar)>) {
636 let (lock, cond) = &**state;
637 loop {
638 let job = {
639 let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
640 while g.pending.is_none() && !g.shutdown {
641 g = cond.wait(g).unwrap_or_else(|e| e.into_inner());
642 }
643 if g.pending.is_none() {
644 // shutdown with no pending → exit cleanly.
645 return;
646 }
647 // Even on shutdown, drain the pending job first so the Drop-time
648 // final checkpoint is durable before exit.
649 let job = g.pending.take().expect("loop invariant");
650 g.inflight = true;
651 job
652 };
653 let result = execute_checkpoint_job(job);
654 {
655 let mut g = lock.lock().unwrap_or_else(|e| e.into_inner());
656 g.inflight = false;
657 if let Err(e) = result {
658 g.last_error = Some(e);
659 }
660 cond.notify_all();
661 }
662 }
663}
664
665fn execute_checkpoint_job(job: CheckpointJob) -> Result<(), EngineError> {
666 // 1. Serialize the captured snapshot. Heavy; this is the whole point
667 // of CoW — it runs off the engine borrow.
668 let snapshot = job.snapshot.serialize();
669 // 2. Snapshot tmp+rename. Atomic on POSIX; rename implicitly fsyncs
670 // the data the next directory walk sees.
671 let tmp = {
672 let mut t = job.db_path.clone();
673 let mut name = t
674 .file_name()
675 .map(std::ffi::OsStr::to_os_string)
676 .unwrap_or_default();
677 name.push(".tmp");
678 t.set_file_name(name);
679 t
680 };
681 std::fs::write(&tmp, &snapshot).map_err(io_err)?;
682 std::fs::rename(&tmp, &job.db_path).map_err(io_err)?;
683 // 3. Manifest tmp+rename (cold tier present).
684 if !job.cold_segments.is_empty() {
685 let snap_crc = spg_crypto::crc32::crc32(&snapshot);
686 let entries: Vec<ColdSegmentEntry> = job
687 .cold_segments
688 .iter()
689 .filter_map(|(segment_id, path)| {
690 let bytes = std::fs::read(path).ok()?;
691 Some(ColdSegmentEntry {
692 segment_id: *segment_id,
693 path: path.clone(),
694 crc32: spg_crypto::crc32::crc32(&bytes),
695 })
696 })
697 .collect();
698 let manifest = CatalogManifest {
699 catalog_crc32: snap_crc,
700 cold_segments: entries,
701 wal_baseline_offset: 0,
702 };
703 let m_bytes = manifest.serialize();
704 let m_path = spg_manifest_path(&job.db_path);
705 if let Some(dir) = m_path.parent() {
706 std::fs::create_dir_all(dir).map_err(io_err)?;
707 }
708 let m_tmp = {
709 let mut t = m_path.clone();
710 let mut name = t
711 .file_name()
712 .map(std::ffi::OsStr::to_os_string)
713 .unwrap_or_default();
714 name.push(".tmp");
715 t.set_file_name(name);
716 t
717 };
718 std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
719 std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
720 }
721 // 4. Enqueue the v4 checkpoint marker carrying the captured LSN. The
722 // WalGroup is thread-safe so a live commit can interleave — the
723 // marker's LSN, not its position in the chunk, anchors replay.
724 let marker_ts = wall_clock_micros();
725 let marker = encode_v4_checkpoint_marker(job.marker_lsn, marker_ts, &job.db_path);
726 job.wal.enqueue(&marker);
727 job.wal.flush_now()?;
728 // 5. Rotate the active chunk. New commits land in the fresh chunk;
729 // pre-marker history stays addressable in the old chunk for PITR /
730 // retention. The shared `current_chunk_path` is updated under its
731 // own lock before the WalGroup swap so diag readers never see a
732 // handle that no longer matches the recorded path.
733 let new_chunk_path = job
734 .wal_dir
735 .join(chunk_filename(marker_ts, job.marker_lsn + 1));
736 let new_handle = OpenOptions::new()
737 .create(true)
738 .append(true)
739 .read(true)
740 .open(&new_chunk_path)
741 .map_err(io_err)?;
742 fsync_dir(&job.wal_dir);
743 {
744 let mut p = job
745 .current_chunk_path
746 .lock()
747 .unwrap_or_else(|e| e.into_inner());
748 *p = new_chunk_path;
749 }
750 job.wal.rotate_file(new_handle);
751 Ok(())
752}
753
754impl WalTicket {
755 /// Block until the record this ticket covers is durable.
756 ///
757 /// Under `SPG_SYNCHRONOUS_COMMIT=off` this returns
758 /// immediately — the background flusher (or the next
759 /// checkpoint / clean shutdown) makes the record durable
760 /// within `SPG_WAL_WRITER_DELAY_MS`. Same contract as PG's
761 /// `synchronous_commit = off`.
762 ///
763 /// # Errors
764 /// Surfaces the leader's IO error if the batch flush failed
765 /// (the WAL is then poisoned for all subsequent writes).
766 pub fn wait(&self) -> Result<(), EngineError> {
767 if !synchronous_commit_on() {
768 return Ok(());
769 }
770 self.group.wait_flushed(self.seq)
771 }
772}
773
774/// v7.19 P3 — retention sweep loop. Runs in a dedicated thread
775/// spawned by `Database::open_path` when `SPG_PITR_RETENTION_HOURS`
776/// is set to a non-zero value. Wakes every
777/// `SPG_PITR_RETENTION_CHECK_SEC` (default 60 s), enumerates chunks
778/// under `wal_dir`, archives via `SPG_PITR_ARCHIVE_CMD` if set, and
779/// deletes anything older than `retention_hours`.
780///
781/// Loud-failure posture matches PG's `archive_command`: if the
782/// archive command returns non-zero, the chunk stays on disk and
783/// a warning prints to stderr. The retention sweep doesn't delete
784/// a chunk it failed to archive.
785fn retention_sweep_loop(
786 wal_dir: PathBuf,
787 retention_hours: u64,
788 check_interval: std::time::Duration,
789 archive_cmd: Option<String>,
790 shutdown: Arc<AtomicBool>,
791) {
792 while !shutdown.load(Ordering::SeqCst) {
793 if let Err(e) = retention_sweep_once(&wal_dir, retention_hours, archive_cmd.as_deref()) {
794 eprintln!("spg-embedded: retention sweep error: {e}");
795 }
796 // Sleep in short ticks so shutdown isn't blocked on a
797 // 60 s naptime when Drop signals.
798 let mut elapsed = std::time::Duration::ZERO;
799 let tick = std::time::Duration::from_millis(250);
800 while elapsed < check_interval {
801 if shutdown.load(Ordering::SeqCst) {
802 return;
803 }
804 std::thread::sleep(tick);
805 elapsed += tick;
806 }
807 }
808}
809
810/// v7.19 P3 — one retention sweep pass over `wal_dir`. Extracted
811/// from the loop so tests can drive it directly. Public so the
812/// e2e_pitr_retention integration test (and any future operator
813/// tooling that wants synchronous retention) can call it.
814pub fn retention_sweep_once(
815 wal_dir: &Path,
816 retention_hours: u64,
817 archive_cmd: Option<&str>,
818) -> std::io::Result<()> {
819 if !wal_dir.exists() {
820 return Ok(());
821 }
822 let now_us = wall_clock_micros();
823 let cutoff_us = (now_us as i128 - (retention_hours as i128 * 3_600 * 1_000_000)) as i64;
824 let chunks = sorted_wal_chunks(wal_dir)?;
825 for chunk in chunks {
826 // Don't sweep the most-recent chunk; it's the live one
827 // execute() is appending to. Compare against the largest
828 // filename-prefix unix_us.
829 let stem = match chunk.file_stem().and_then(|s| s.to_str()) {
830 Some(s) => s,
831 None => continue,
832 };
833 let chunk_us: i64 = stem
834 .split_once('_')
835 .and_then(|(prefix, _)| i64::from_str_radix(prefix, 16).ok())
836 .unwrap_or(0);
837 if chunk_us >= cutoff_us {
838 continue;
839 }
840 // Archive first if requested.
841 if let Some(cmd) = archive_cmd {
842 if !cmd.is_empty() {
843 let output = std::process::Command::new("sh")
844 .arg("-c")
845 .arg(cmd)
846 .arg("--")
847 .arg(&chunk)
848 .output()?;
849 if !output.status.success() {
850 eprintln!(
851 "spg-embedded: SPG_PITR_ARCHIVE_CMD failed for {} (exit {}); chunk stays on disk",
852 chunk.display(),
853 output.status.code().unwrap_or(-1)
854 );
855 continue;
856 }
857 }
858 }
859 // Delete the chunk + its sibling .checksum if present.
860 if let Err(e) = std::fs::remove_file(&chunk) {
861 eprintln!(
862 "spg-embedded: retention remove {} failed: {e}",
863 chunk.display()
864 );
865 continue;
866 }
867 let mut cs = chunk.clone();
868 let mut name = cs.file_name().map(|n| n.to_os_string()).unwrap_or_default();
869 name.push(".checksum");
870 cs.set_file_name(name);
871 let _ = std::fs::remove_file(&cs);
872 }
873 Ok(())
874}
875
/// v7.20 — group-commit delay window in microseconds (the PG
/// `commit_delay` analogue): how long the flush leader sleeps before
/// taking the batch so concurrent writers can pile in.
///
/// Read once from `SPG_COMMIT_DELAY_US` and cached for the life of
/// the process. Unset or unparsable values fall back to 150 µs;
/// `SPG_COMMIT_DELAY_US=0` disables the delay.
fn commit_delay_us() -> u64 {
    const DEFAULT_DELAY_US: u64 = 150;
    static DELAY_US: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *DELAY_US.get_or_init(|| match std::env::var("SPG_COMMIT_DELAY_US") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_DELAY_US),
        Err(_) => DEFAULT_DELAY_US,
    })
}
889
/// v7.20 — PG `synchronous_commit` analogue, read once from
/// `SPG_SYNCHRONOUS_COMMIT` and cached for the life of the process.
///
/// * `on` (the default; also any unset or unrecognised value):
///   `execute()` blocks until its WAL record is fsynced — zero-loss
///   durability.
/// * `off` / `false` (case-insensitive) or `0`: `execute()` returns
///   after the in-memory mutation + WAL enqueue; a background flusher
///   thread writes + fsyncs every `SPG_WAL_WRITER_DELAY_MS` (default
///   200 ms — PG's `wal_writer_delay` default). The crash window is
///   up to one flush interval of confirmed-but-unsynced commits —
///   the trade PG documents for the same setting. Clean shutdown
///   (Drop / checkpoint) always flushes.
fn synchronous_commit_on() -> bool {
    static SYNC_ON: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *SYNC_ON.get_or_init(|| {
        let Ok(raw) = std::env::var("SPG_SYNCHRONOUS_COMMIT") else {
            return true;
        };
        let switched_off = raw == "0"
            || ["off", "false"]
                .iter()
                .any(|keyword| raw.eq_ignore_ascii_case(keyword));
        !switched_off
    })
}
907
/// v7.20 — cadence, in milliseconds, of the background WAL flusher
/// used when `SPG_SYNCHRONOUS_COMMIT=off` (PG `wal_writer_delay`).
///
/// Read once from `SPG_WAL_WRITER_DELAY_MS` and cached. Unset,
/// unparsable, or zero values fall back to 200 ms.
fn wal_writer_delay_ms() -> u64 {
    const DEFAULT_DELAY_MS: u64 = 200;
    static DELAY_MS: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *DELAY_MS.get_or_init(|| {
        let configured = std::env::var("SPG_WAL_WRITER_DELAY_MS")
            .ok()
            .and_then(|raw| raw.parse::<u64>().ok());
        match configured {
            Some(ms) if ms > 0 => ms,
            _ => DEFAULT_DELAY_MS,
        }
    })
}
920
/// PITR retention window in hours, from `SPG_PITR_RETENTION_HOURS`.
/// Unset or unparsable values yield 0. Not cached: the variable is
/// re-read on every call.
fn pitr_retention_hours() -> u64 {
    match std::env::var("SPG_PITR_RETENTION_HOURS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(0),
        Err(_) => 0,
    }
}
927
/// Seconds between retention sweeps, from
/// `SPG_PITR_RETENTION_CHECK_SEC`. Unset, unparsable, or zero values
/// fall back to 60. Not cached: the variable is re-read on every call.
fn pitr_retention_check_sec() -> u64 {
    const DEFAULT_CHECK_SEC: u64 = 60;
    let configured = std::env::var("SPG_PITR_RETENTION_CHECK_SEC")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured {
        Some(secs) if secs > 0 => secs,
        _ => DEFAULT_CHECK_SEC,
    }
}
935
/// Archive command from `SPG_PITR_ARCHIVE_CMD`. `None` when the
/// variable is unset, not valid Unicode, or the empty string.
fn pitr_archive_cmd() -> Option<String> {
    match std::env::var("SPG_PITR_ARCHIVE_CMD") {
        Ok(cmd) if !cmd.is_empty() => Some(cmd),
        _ => None,
    }
}
941
942/// v7.19 — replay every record from `wal_bytes` whose
943/// `commit_lsn` is strictly greater than `floor_lsn`. v3 records
944/// (no LSN) and v4 records with `commit_lsn <= floor_lsn` are
945/// skipped — the snapshot loaded ahead of this call already
946/// reflects them, and re-applying would DuplicateTable /
947/// double-insert. v3 records inside the legacy migration chunk
948/// always apply because the migration sets `floor_lsn = 0` and
949/// v3 records carry no LSN to compare; the pre-migration
950/// behaviour (every record replays) is what the migration
951/// preserves.
952///
953/// Returns the count of records successfully applied. Same
954/// torn-tail semantics as `replay_wal_into_engine`.
955fn replay_wal_filtered(
956 wal_bytes: &[u8],
957 engine: &mut Engine,
958 floor_lsn: u64,
959 quarantine: &mut Vec<QuarantinedStmt>,
960) -> Result<usize, String> {
961 let records = parse_wal_records(wal_bytes)?;
962 let mut applied = 0usize;
963 for r in &records {
964 // Skip markers + non-SQL records.
965 if r.type_byte == WAL_V3_TYPE_DURABILITY_CHECKPOINT
966 || r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER
967 {
968 continue;
969 }
970 // v4 SQL records carry an LSN. Apply iff strictly above
971 // the snapshot floor.
972 if r.type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL
973 || r.type_byte == WAL_V4_TYPE_TX_COMMIT_SQL
974 || r.type_byte == WAL_V5_TYPE_ROW_REDO
975 {
976 if let Some(lsn) = r.commit_lsn {
977 if lsn <= floor_lsn {
978 continue;
979 }
980 }
981 }
982 // v7.34 (crash-recovery P0 #2) — row-level redo record: apply the
983 // physical changes directly (O(changed rows)) instead of
984 // re-executing SQL (the O(records × rows) statement-replay that
985 // hung the mailrs P0). The payload is `encode_redo_log` bytes, not
986 // SQL, so it never enters the from_utf8 / split_statements path.
987 if r.type_byte == WAL_V5_TYPE_ROW_REDO {
988 let changes = spg_storage::decode_redo_log(r.sql)
989 .map_err(|e| format!("redo decode at offset {}: {e:?}", r.offset))?;
990 engine
991 .apply_redo(&changes)
992 .map_err(|e| format!("redo apply at offset {}: {e:?}", r.offset))?;
993 applied += 1;
994 continue;
995 }
996 // v3 records (type 0x01, no LSN) always apply — the
997 // legacy migration path is the only place they appear,
998 // and floor_lsn=0 there.
999 let sql = match std::str::from_utf8(r.sql) {
1000 Ok(s) => s,
1001 Err(e) => return Err(format!("non-UTF-8 SQL at offset {}: {e}", r.offset)),
1002 };
1003 // v7.21 — a tx-commit record carries the whole transaction
1004 // as a `";\n"`-joined script; auto-commit records are a
1005 // single statement, for which split_statements is a no-op.
1006 //
1007 // v7.30.1 (mailrs round-24 ask 2) — a statement the engine
1008 // REJECTS is quarantined, not fatal: "one statement failed
1009 // to replay" ≠ "the catalog is corrupt". Framing damage
1010 // (parse_wal_records / non-UTF-8 above) still errors — that
1011 // IS corruption. Subsequent statements of a tx script keep
1012 // applying: the bricking class is a no-op-at-runtime
1013 // statement that re-applies non-idempotently, and skipping
1014 // just it reconstructs the runtime state.
1015 for stmt in split_statements(sql) {
1016 if let Err(e) = engine.execute(stmt) {
1017 quarantine.push(QuarantinedStmt {
1018 offset: r.offset,
1019 sql: stmt.to_string(),
1020 error: format!("{e:?}"),
1021 });
1022 }
1023 }
1024 applied += 1;
1025 }
1026 Ok(applied)
1027}
1028
/// v7.30.1 (mailrs round-24 ask 2) — one statement that failed to
/// re-apply during boot replay. Kept for forensics in a
/// `quarantine-*.log` beside the WAL chunks; the boot continues.
struct QuarantinedStmt {
    /// Byte offset of the WAL record the statement came from. For a
    /// multi-statement tx script every quarantined statement shares
    /// the record's offset.
    offset: usize,
    /// Text of the statement the engine rejected.
    sql: String,
    /// `Debug` rendering of the engine's rejection error.
    error: String,
}
1037
1038fn format_quarantine_line(q: &QuarantinedStmt) -> String {
1039 format!("offset {}: {}\n rejected: {}\n", q.offset, q.sql, q.error)
1040}
1041
/// v7.34 (crash-recovery P0 #2) — fsync a directory so a newly created
/// file's entry is durable. `sync_data` on a chunk file persists its
/// bytes but NOT the parent directory entry that names it; a power loss
/// after creating a fresh WAL chunk could lose that entry and make the
/// chunk (and the committed records in it) unreachable on restart.
///
/// Best-effort: failures to open or sync the directory are ignored —
/// a platform that rejects directory fsync is no worse off.
fn fsync_dir(dir: &Path) {
    let _ = File::open(dir).and_then(|handle| handle.sync_all());
}

/// v7.19 — WAL chunk filename format:
/// `<unix_us>_<leading_lsn>.wal`, both parts zero-padded 16-digit
/// lowercase hex so default lexicographic sort matches numeric order,
/// with the unix_us prefix coming first so the on-disk listing is
/// chronological too.
///
/// Negative timestamps shouldn't happen in practice (we sit
/// post-1970); they are clamped to 0 so the zero-padded
/// representation stays sortable.
fn chunk_filename(unix_us: i64, leading_lsn: u64) -> String {
    let micros = if unix_us < 0 { 0u64 } else { unix_us as u64 };
    format!("{:016x}_{:016x}.wal", micros, leading_lsn)
}
1065
1066/// v7.19 — filename used for the legacy single-file WAL when
1067/// `open_path` migrates a v7.18-layout database into the new
1068/// chunk directory. Lexicographically smallest possible value
1069/// so subsequent chunks sort after it.
1070fn legacy_chunk_filename() -> String {
1071 chunk_filename(0, 0)
1072}
1073
1074/// CoW-4 (v7.34) — D10 fallback: read one cold-segment file and
1075/// hand its bytes to the catalog. The segment binary is self-validating
1076/// (magic + internal CRC32 via `OwnedSegment::from_bytes`), so we don't
1077/// need the manifest's `segment_crc32` to trust it. Returns `true` on a
1078/// successful attach (caller bumps `cold_segment_paths`), `false` on a
1079/// per-segment failure that is logged but doesn't abort boot.
1080fn attach_segment_from_disk(engine: &mut Engine, segment_id: u32, path: &Path) -> bool {
1081 if engine.catalog().cold_segment(segment_id).is_some() {
1082 return true;
1083 }
1084 let bytes = match std::fs::read(path) {
1085 Ok(b) => b,
1086 Err(e) => {
1087 eprintln!(
1088 "spg-embedded: cold-segment scan skip {}: read failed: {e}",
1089 path.display()
1090 );
1091 return false;
1092 }
1093 };
1094 let mut new_cat = engine.catalog().clone();
1095 if let Err(e) = new_cat.load_segment_bytes_at(segment_id, bytes) {
1096 eprintln!(
1097 "spg-embedded: cold-segment scan skip {}: parse/load failed: {e}",
1098 path.display()
1099 );
1100 return false;
1101 }
1102 engine.replace_catalog(new_cat);
1103 true
1104}
1105
1106/// CoW-4 (v7.34) — D10 + missing-manifest fallback: scan
1107/// `<db>.spg/segments/` for `seg_<id>.spg` files and attach any that
1108/// aren't already in `cold_segment_paths`. Closes the window where a
1109/// crash between snapshot rename and manifest rename leaves
1110/// post-checkpoint cold segments orphaned on disk (the snapshot's CRC
1111/// no longer matches the stale manifest, so the manifest path
1112/// silently dropped them). The segment parser self-verifies, so a
1113/// torn write surfaces as a per-segment skip, never silent corruption.
1114fn scan_cold_segments_dir(
1115 segments_dir: &Path,
1116 engine: &mut Engine,
1117 cold_segment_paths: &mut BTreeMap<u32, PathBuf>,
1118) {
1119 let read_dir = match std::fs::read_dir(segments_dir) {
1120 Ok(rd) => rd,
1121 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return,
1122 Err(e) => {
1123 eprintln!(
1124 "spg-embedded: cold-segment scan: cannot read {}: {e}",
1125 segments_dir.display()
1126 );
1127 return;
1128 }
1129 };
1130 for entry in read_dir.flatten() {
1131 let path = entry.path();
1132 // Only the canonical `seg_<id>.spg` form. `.tmp` half-renames
1133 // and unknown extensions are skipped — the segment writer's
1134 // tmp+rename pattern guarantees `.spg` files are either fully
1135 // written or absent.
1136 if path.extension().and_then(|s| s.to_str()) != Some("spg") {
1137 continue;
1138 }
1139 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1140 continue;
1141 };
1142 let Some(id_str) = stem.strip_prefix("seg_") else {
1143 continue;
1144 };
1145 let Ok(segment_id) = id_str.parse::<u32>() else {
1146 continue;
1147 };
1148 if cold_segment_paths.contains_key(&segment_id) {
1149 continue;
1150 }
1151 if attach_segment_from_disk(engine, segment_id, &path) {
1152 cold_segment_paths.insert(segment_id, path);
1153 }
1154 }
1155}
1156
/// v7.19 — collect every entry in `wal_dir` whose extension is
/// exactly `wal`, sorted by path. With the zero-padded chunk filename
/// format, path order doubles as chunk-creation order.
///
/// A missing directory yields an empty list rather than an error.
///
/// # Errors
/// Any other failure to open the directory, or to read one of its
/// entries, is returned as-is.
fn sorted_wal_chunks(wal_dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let entries = match std::fs::read_dir(wal_dir) {
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(err) => return Err(err),
        Ok(entries) => entries,
    };

    let mut chunks = Vec::new();
    for entry in entries {
        let candidate = entry?.path();
        let is_wal = candidate.extension().and_then(|ext| ext.to_str()) == Some("wal");
        if is_wal {
            chunks.push(candidate);
        }
    }
    chunks.sort();
    Ok(chunks)
}
1177
1178/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
1179///
1180/// ```text
1181/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
1182/// [u32 LE crc32 over (type_byte || payload)]
1183/// [u8 type = 0x11]
1184/// payload:
1185/// [u64 LE checkpoint_lsn]
1186/// [i64 LE checkpoint_unix_us (WAL_V4_NO_CLOCK if no clock)]
1187/// [u16 LE snapshot_path_len]
1188/// [snapshot_path_bytes]
1189/// ```
1190///
1191/// `payload_len` covers only the payload — keeping the framing
1192/// uniform across v3 / v4 record types so torn-write detection in
1193/// `replay_wal_into_engine` stays trivial.
1194fn encode_v4_checkpoint_marker(
1195 checkpoint_lsn: u64,
1196 checkpoint_unix_us: i64,
1197 snapshot_path: &Path,
1198) -> Vec<u8> {
1199 let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
1200 let snap_payload = snapshot_bytes.as_bytes();
1201 let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
1202 let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
1203 payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
1204 payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
1205 payload.extend_from_slice(&snap_len_u16.to_le_bytes());
1206 payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
1207 let mut crc_buf = Vec::with_capacity(1 + payload.len());
1208 crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
1209 crc_buf.extend_from_slice(&payload);
1210 let crc = spg_crypto::crc32::crc32(&crc_buf);
1211 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
1212 let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
1213 out.extend_from_slice(&header);
1214 out.extend_from_slice(&crc.to_le_bytes());
1215 out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
1216 out.extend_from_slice(&payload);
1217 out
1218}
1219
1220/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
1221///
1222/// ```text
1223/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
1224/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
1225/// [u8 type = 0x10]
1226/// [u64 LE commit_lsn]
1227/// [i64 LE commit_unix_us (= WAL_V4_NO_CLOCK when no ClockFn)]
1228/// [sql bytes]
1229/// ```
1230///
1231/// `sql_len` field stays the SQL byte count — same shape as v3 — so
1232/// replay-buffer torn-write detection compares against
1233/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
1234/// readable by the same loop with their original 9-byte header
1235/// arithmetic.
1236fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1237 encode_v4_framed(
1238 WAL_V4_TYPE_AUTO_COMMIT_SQL,
1239 sql.as_bytes(),
1240 commit_lsn,
1241 commit_unix_us,
1242 )
1243}
1244
1245/// v7.21 — same envelope, `WAL_V4_TYPE_TX_COMMIT_SQL` type byte.
1246/// `script` = the transaction's statements joined with `";\n"`.
1247fn encode_v4_tx_commit(script: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1248 encode_v4_framed(
1249 WAL_V4_TYPE_TX_COMMIT_SQL,
1250 script.as_bytes(),
1251 commit_lsn,
1252 commit_unix_us,
1253 )
1254}
1255
1256/// v7.34 (crash-recovery P0 #2) — encode one row-level redo record. Same
1257/// v4 envelope + CRC, type byte 0x13; the payload is the
1258/// `encode_redo_log` bytes (physical changes) instead of SQL text, so
1259/// replay applies them in place of re-executing the statement.
1260fn encode_v5_row_redo(redo_bytes: &[u8], commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
1261 encode_v4_framed(WAL_V5_TYPE_ROW_REDO, redo_bytes, commit_lsn, commit_unix_us)
1262}
1263
1264fn encode_v4_framed(
1265 type_byte: u8,
1266 payload: &[u8],
1267 commit_lsn: u64,
1268 commit_unix_us: i64,
1269) -> Vec<u8> {
1270 let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
1271 crc_buf.push(type_byte);
1272 crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
1273 crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
1274 crc_buf.extend_from_slice(payload);
1275 let crc = spg_crypto::crc32::crc32(&crc_buf);
1276 let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
1277 let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
1278 out.extend_from_slice(&header);
1279 out.extend_from_slice(&crc.to_le_bytes());
1280 out.push(type_byte);
1281 out.extend_from_slice(&commit_lsn.to_le_bytes());
1282 out.extend_from_slice(&commit_unix_us.to_le_bytes());
1283 out.extend_from_slice(payload);
1284 out
1285}
1286
/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
/// Returns the count of records successfully applied. A truncated
/// trailing record (mid-write torn) is dropped silently — the
/// same recovery story `spg-server`'s boot path uses.
///
/// Unlike `replay_wal_filtered`, this path applies no LSN floor and
/// does not quarantine: the first statement the engine rejects aborts
/// the replay.
///
/// # Errors
/// Returns a descriptive `String` when a payload is not valid UTF-8,
/// when the engine rejects a statement, or when a v3-framed record
/// carries a type byte this loop does not handle.
///
/// NOTE(review): `WAL_V5_TYPE_ROW_REDO` is not matched here, so a
/// row-redo record lands in the "unknown v3 type byte" error arm —
/// confirm this function is only ever fed pre-v5 WALs.
fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
    let mut applied = 0usize;
    let mut cur = 0usize;
    while cur < wal_bytes.len() {
        if wal_bytes.len() - cur < 4 {
            // Trailing partial header — torn write, drop and stop.
            break;
        }
        // First word: payload length plus format flag bits.
        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
        // Strip whichever flag bits this format sets to recover the
        // payload length.
        let len_mask = if is_v3 {
            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
        } else {
            !WAL_V2_SENTINEL
        };
        let rec_len = (raw_len & len_mask) as usize;
        // Header size per format: 4 (bare length), 8 for v2, 9 for v3
        // (the ninth byte is the type byte read at `cur + 8` below).
        let header_len = if is_v3 {
            9
        } else if is_v2 {
            8
        } else {
            4
        };
        if wal_bytes.len() - cur < header_len + rec_len {
            // Torn record at the tail — drop, stop.
            break;
        }
        if is_v3 {
            let type_byte = wal_bytes[cur + 8];
            match type_byte {
                // v3 auto-commit SQL: nothing to do here; fall through
                // to the shared single-statement path after the match.
                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
                WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
                    // durability_checkpoint marker — skip, no SQL.
                    cur += header_len + rec_len;
                    continue;
                }
                WAL_V4_TYPE_CHECKPOINT_MARKER => {
                    // v7.18 PITR — checkpoint anchor, skip on replay
                    // (engine state past this point reflects the
                    // matching snapshot already loaded by the caller).
                    cur += header_len + rec_len;
                    continue;
                }
                WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL => {
                    // v7.18 PITR — v4 record carries 16 bytes of
                    // (commit_lsn, commit_unix_us) between the type
                    // byte and the SQL payload. Replay reads them but
                    // does not enforce them — the engine doesn't
                    // surface LSN/clock here. Restore tooling
                    // (spgctl) parses them via parse_wal_record below.
                    //
                    // v7.21 — tx-commit records (0x12) carry a whole
                    // transaction as a `";\n"`-joined script;
                    // split_statements is a no-op on the single-
                    // statement auto-commit form.
                    let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
                    if wal_bytes.len() - cur < v4_total {
                        // Torn v4 record at the tail — drop, stop.
                        break;
                    }
                    let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
                    let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
                    let sql = std::str::from_utf8(sql_bytes)
                        .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
                    for stmt in split_statements(sql) {
                        engine.execute(stmt).map_err(|e| {
                            format!("WAL replay: apply {stmt:?} at offset {cur} rejected: {e:?}")
                        })?;
                    }
                    applied += 1;
                    cur += v4_total;
                    continue;
                }
                other => {
                    return Err(format!(
                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
                    ));
                }
            }
        }
        // Shared tail for v1 / v2 records and v3 auto-commit records:
        // the payload is one SQL statement, executed as a whole (no
        // split_statements on this path).
        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
        let sql = std::str::from_utf8(sql_bytes)
            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
        engine
            .execute(sql)
            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
        applied += 1;
        cur += header_len + rec_len;
    }
    Ok(applied)
}
1383
/// v7.18 PITR — parsed WAL record, surfaced for restore / verify
/// tooling. The replay loop above doesn't expose LSN/timestamp;
/// `spgctl restore --to <timestamp>` and `spgctl verify` need them.
/// Returned offsets are byte-positions inside the WAL buffer.
#[derive(Debug, Clone)]
pub struct WalRecord<'a> {
    /// Byte offset in the WAL buffer where this record starts.
    pub offset: usize,
    /// Type byte: 0x01 = v3 auto-commit SQL (also reported for
    /// v1 / v2 records, which carry no type byte of their own),
    /// 0x02 = durability checkpoint marker, 0x10 = v4 auto-commit
    /// SQL, 0x11 = v4 checkpoint marker, 0x12 = v4 tx-commit script,
    /// 0x13 = v5 row-level redo.
    pub type_byte: u8,
    /// `Some(lsn)` for v4 / v5 records (for a checkpoint marker this
    /// is the checkpoint LSN), `None` for v3 and older.
    pub commit_lsn: Option<u64>,
    /// `Some(unix_us)` for v4 records carrying a clock-set timestamp,
    /// `None` for v3 or for v4 records explicitly written with
    /// `WAL_V4_NO_CLOCK` (sentinel for "no ClockFn at commit time").
    pub commit_unix_us: Option<i64>,
    /// Record payload as borrowed bytes. SQL text for the SQL record
    /// types, the snapshot path for a v4 checkpoint marker, the
    /// encoded redo log for a row-redo record, and empty for
    /// durability markers.
    pub sql: &'a [u8],
}
1404
1405/// v7.18 PITR — iterate over `wal_bytes` yielding one `WalRecord`
1406/// per intact record. Torn-tail records terminate iteration
1407/// silently (same recovery story as `replay_wal_into_engine`).
1408/// Unknown type bytes inside a v3 envelope return `Err` so the
1409/// caller knows the WAL was written by a newer SPG.
1410pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
1411 let mut out = Vec::new();
1412 let mut cur = 0usize;
1413 while cur < wal_bytes.len() {
1414 if wal_bytes.len() - cur < 4 {
1415 break;
1416 }
1417 let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
1418 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
1419 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
1420 let len_mask = if is_v3 {
1421 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
1422 } else {
1423 !WAL_V2_SENTINEL
1424 };
1425 let rec_len = (raw_len & len_mask) as usize;
1426 let header_len = if is_v3 {
1427 9
1428 } else if is_v2 {
1429 8
1430 } else {
1431 4
1432 };
1433 if wal_bytes.len() - cur < header_len + rec_len {
1434 break;
1435 }
1436 if !is_v3 {
1437 // v1 / v2 records carry no type byte; treat as legacy
1438 // auto-commit SQL with no LSN/time.
1439 let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1440 out.push(WalRecord {
1441 offset: cur,
1442 type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
1443 commit_lsn: None,
1444 commit_unix_us: None,
1445 sql,
1446 });
1447 cur += header_len + rec_len;
1448 continue;
1449 }
1450 let type_byte = wal_bytes[cur + 8];
1451 match type_byte {
1452 WAL_V3_TYPE_AUTO_COMMIT_SQL => {
1453 let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
1454 out.push(WalRecord {
1455 offset: cur,
1456 type_byte,
1457 commit_lsn: None,
1458 commit_unix_us: None,
1459 sql,
1460 });
1461 cur += header_len + rec_len;
1462 }
1463 WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
1464 out.push(WalRecord {
1465 offset: cur,
1466 type_byte,
1467 commit_lsn: None,
1468 commit_unix_us: None,
1469 sql: &[],
1470 });
1471 cur += header_len + rec_len;
1472 }
1473 WAL_V4_TYPE_CHECKPOINT_MARKER => {
1474 // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
1475 // We surface lsn + ts on the WalRecord; the path lives
1476 // in `sql` since the type byte already disambiguates
1477 // record meaning and adding a dedicated field would
1478 // bloat the iterator return type for every variant.
1479 if rec_len < 18 {
1480 return Err(format!(
1481 "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
1482 ));
1483 }
1484 let lsn = u64::from_le_bytes(
1485 wal_bytes[cur + header_len..cur + header_len + 8]
1486 .try_into()
1487 .unwrap(),
1488 );
1489 let ts_raw = i64::from_le_bytes(
1490 wal_bytes[cur + header_len + 8..cur + header_len + 16]
1491 .try_into()
1492 .unwrap(),
1493 );
1494 let path_len = u16::from_le_bytes(
1495 wal_bytes[cur + header_len + 16..cur + header_len + 18]
1496 .try_into()
1497 .unwrap(),
1498 ) as usize;
1499 if rec_len < 18 + path_len {
1500 return Err(format!(
1501 "WAL parse: checkpoint marker at offset {cur} truncated path"
1502 ));
1503 }
1504 let path_start = cur + header_len + 18;
1505 let path_bytes = &wal_bytes[path_start..path_start + path_len];
1506 let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1507 None
1508 } else {
1509 Some(ts_raw)
1510 };
1511 out.push(WalRecord {
1512 offset: cur,
1513 type_byte,
1514 commit_lsn: Some(lsn),
1515 commit_unix_us,
1516 sql: path_bytes,
1517 });
1518 cur += header_len + rec_len;
1519 }
1520 WAL_V4_TYPE_AUTO_COMMIT_SQL | WAL_V4_TYPE_TX_COMMIT_SQL | WAL_V5_TYPE_ROW_REDO => {
1521 let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
1522 if wal_bytes.len() - cur < v4_total {
1523 break;
1524 }
1525 let lsn = u64::from_le_bytes(
1526 wal_bytes[cur + header_len..cur + header_len + 8]
1527 .try_into()
1528 .unwrap(),
1529 );
1530 let ts_raw = i64::from_le_bytes(
1531 wal_bytes[cur + header_len + 8..cur + header_len + 16]
1532 .try_into()
1533 .unwrap(),
1534 );
1535 let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1536 None
1537 } else {
1538 Some(ts_raw)
1539 };
1540 let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
1541 let sql = &wal_bytes[sql_start..sql_start + rec_len];
1542 out.push(WalRecord {
1543 offset: cur,
1544 type_byte,
1545 commit_lsn: Some(lsn),
1546 commit_unix_us,
1547 sql,
1548 });
1549 cur += v4_total;
1550 }
1551 other => {
1552 return Err(format!(
1553 "WAL parse: unknown type byte {other:#04x} at offset {cur}"
1554 ));
1555 }
1556 }
1557 }
1558 Ok(out)
1559}
1560
/// v7.1 — predicate for "can the next `execute()` skip the WAL?"
///
/// Returns `true` when the statement's leading keyword
/// (case-insensitive, delimited by whitespace, `;` or `(`) is one of
/// SELECT / SHOW / EXPLAIN / BEGIN / COMMIT / ROLLBACK / WAIT, or one
/// of the SPG-specific verbs that don't go through the auto-commit
/// record path on the server (CHECKPOINT, COMPACT).
///
/// `WITH …` is read-only only when the statement contains no
/// INSERT / UPDATE / DELETE / MERGE keyword anywhere: a
/// data-modifying CTE (`WITH x AS (DELETE … RETURNING …) …`, or
/// `WITH … INSERT …`) mutates state and must be logged. The check is
/// a whole-word scan, so it errs on the side of logging (for example
/// when a string literal contains the word `update`).
///
/// Conservative: anything we don't explicitly know is read-only —
/// including the empty string and statements that start with `(` or
/// a comment — falls through to "write a WAL record".
fn sql_is_read_only(sql: &str) -> bool {
    const DML_KEYWORDS: [&str; 4] = ["insert", "update", "delete", "merge"];

    let head = sql
        .trim_start()
        .split(|c: char| c.is_whitespace() || c == ';' || c == '(')
        .next()
        .unwrap_or("")
        .to_ascii_lowercase();
    match head.as_str() {
        "select" | "show" | "explain" | "begin" | "commit" | "rollback" | "checkpoint"
        | "compact" | "wait" => true,
        // A CTE is only read-only if nothing inside or after it writes.
        "with" => !sql
            .split(|c: char| !(c.is_alphanumeric() || c == '_'))
            .any(|word| DML_KEYWORDS.iter().any(|kw| word.eq_ignore_ascii_case(kw))),
        _ => false,
    }
}
1587
/// Embedded SPG database handle. Owns an `Engine` + provides
/// ergonomic wrappers around `execute` and `query`.
///
/// What reaches disk depends on `persistence`: an in-memory handle
/// (`None`, the v6.10.3 shape) has no WAL to flush or fsync, while a
/// persistent handle logs every mutating `execute` and snapshots the
/// catalog on `Drop`, as described on the `persistence` field.
#[derive(Debug)]
pub struct Database {
    /// The wrapped execution engine.
    engine: Engine,
    /// v7.1 — persistence sidecar. When `Some(p)`, every
    /// `execute(sql)` that mutates state appends a v4
    /// `auto_commit_sql` WAL record + fsyncs before the call
    /// returns; `Drop` writes a final catalog snapshot to
    /// `<db_path>` so the next session boots from a clean
    /// snapshot + an empty WAL. `None` = in-memory only (the
    /// v6.10.3 shape).
    persistence: Option<PersistenceCtx>,
    /// v7.18 PITR — monotonic per-database commit LSN. Increments
    /// before each successful WAL append; bootstrapped at
    /// open_path from `max(parse_wal_records → commit_lsn)` so
    /// reopen never reuses an LSN. In-memory databases start at
    /// 0 and never advance (no WAL = no LSN-meaningful records).
    commit_lsn: AtomicU64,
    /// v7.21 (round-12 polish) — explicit-transaction WAL buffer.
    /// `Some` between an engine-accepted BEGIN and its
    /// COMMIT / ROLLBACK on a persistent database. In-transaction
    /// mutations only touch the engine's shadow catalog and report
    /// `modified_catalog: false`, so the per-statement auto-commit
    /// append never fires for them; their bind-final SQL collects
    /// here instead and COMMIT flushes the lot as ONE atomic
    /// `WAL_V4_TYPE_TX_COMMIT_SQL` record (ROLLBACK just drops it).
    /// Always `None` for in-memory databases.
    tx_wal: Option<TxWalBuffer>,
}
1620
/// Statements collected inside an open explicit transaction, waiting
/// to be written as one WAL record at COMMIT. See
/// [`Database::tx_wal`] for the lifecycle.
#[derive(Debug, Default)]
struct TxWalBuffer {
    /// Bind-final SQL of every non-read-only statement the engine
    /// accepted inside the open transaction, in execution order.
    statements: Vec<String>,
    /// `(savepoint_name, statements.len() at SAVEPOINT time)` —
    /// `ROLLBACK TO SAVEPOINT` truncates `statements` back to the
    /// recorded mark so the WAL record matches what the engine
    /// keeps. PG name-reuse semantics (latest wins).
    savepoints: Vec<(String, usize)>,
}
1633
/// Statement-level transaction-control classification for the WAL
/// buffer. Runs AFTER the engine accepted the statement, so the
/// engine stays the single validator — this only mirrors state.
/// Produced by `tx_control_kind` from the statement's leading words.
enum TxControl {
    /// Statement starting with `BEGIN` or `START`.
    Begin,
    /// Statement starting with `COMMIT` or `END`.
    Commit,
    /// `ROLLBACK` that is not followed by `TO`.
    Rollback,
    /// `ROLLBACK TO [SAVEPOINT] <name>`; carries the lower-cased
    /// savepoint name.
    RollbackToSavepoint(String),
    /// `SAVEPOINT <name>`; carries the lower-cased savepoint name.
    Savepoint(String),
    /// Statement starting with `RELEASE`; the savepoint name is not
    /// captured.
    ReleaseSavepoint,
}
1645
1646fn tx_control_kind(sql: &str) -> Option<TxControl> {
1647 let mut words = sql
1648 .split(|c: char| c.is_whitespace() || c == ';')
1649 .filter(|w| !w.is_empty())
1650 .map(str::to_ascii_lowercase);
1651 let head = words.next()?;
1652 match head.as_str() {
1653 "begin" | "start" => Some(TxControl::Begin),
1654 "commit" | "end" => Some(TxControl::Commit),
1655 "savepoint" => words.next().map(TxControl::Savepoint),
1656 "release" => Some(TxControl::ReleaseSavepoint),
1657 "rollback" => match words.next().as_deref() {
1658 // ROLLBACK TO [SAVEPOINT] <name>
1659 Some("to") => {
1660 let next = words.next()?;
1661 let name = if next == "savepoint" {
1662 words.next()?
1663 } else {
1664 next
1665 };
1666 Some(TxControl::RollbackToSavepoint(name))
1667 }
1668 _ => Some(TxControl::Rollback),
1669 },
1670 _ => None,
1671 }
1672}
1673
/// On-disk state owned by a `Database` that was opened with
/// `Database::open_path`. In-memory databases carry no
/// `PersistenceCtx` (their `persistence` field is `None`).
#[derive(Debug)]
#[allow(dead_code)] // `wal_dir`/`current_chunk_path` are read at boot; kept for Drop/diag introspection.
struct PersistenceCtx {
    /// Path of the catalog snapshot file. `open_path` restores the
    /// engine from it when it exists.
    db_path: PathBuf,
    /// v7.19 — WAL chunk directory at `<db_path>.wal/`.
    /// Replaces the v7.18 single-file `<db_path>.wal` layout.
    /// Each chunk file inside is named
    /// `<unix_us>_<leading_lsn>.wal` (zero-padded to 16 digits
    /// so default-lex sort = LSN order).
    wal_dir: PathBuf,
    /// Path of the currently-open chunk file inside `wal_dir`.
    /// Rotated at checkpoint and whenever the chunk crosses
    /// `checkpoint_threshold_bytes`. CoW-2 (v7.34) wraps it in
    /// `Arc<Mutex<…>>` because the background-checkpoint worker
    /// performs the rotation; this struct keeps a clone so Drop /
    /// diag introspection still see the live path.
    current_chunk_path: Arc<Mutex<PathBuf>>,
    /// v7.19 P3 — retention sweeper handle. `Some` when
    /// `SPG_PITR_RETENTION_HOURS > 0` at open_path time; `None`
    /// when retention is disabled (the default; v7.18 behaviour
    /// preserved). The thread polls `wal_dir` every
    /// `SPG_PITR_RETENTION_CHECK_SEC` seconds, archives via
    /// `SPG_PITR_ARCHIVE_CMD` if set, then deletes chunks older
    /// than the retention window. Signalled to exit via
    /// `retention_shutdown` on Drop.
    retention_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-pitr-retention` thread; `Some`
    /// exactly when `retention_shutdown` is `Some`.
    retention_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 — background WAL flusher for
    /// `SPG_SYNCHRONOUS_COMMIT=off`. `None` in the default
    /// synchronous mode. Flushes the pending batch every
    /// `SPG_WAL_WRITER_DELAY_MS`; signalled + joined on Drop
    /// before the final checkpoint so clean shutdown never
    /// loses confirmed commits.
    flusher_shutdown: Option<Arc<AtomicBool>>,
    /// Join handle of the `spg-wal-flusher` thread; `Some`
    /// exactly when `flusher_shutdown` is `Some`.
    flusher_thread: Option<std::thread::JoinHandle<()>>,
    /// v7.20 P2 — group-commit WAL. Shared with WalTickets
    /// returned by the buffered write path so `wait()` can run
    /// after the engine write lock is released.
    wal: Arc<WalGroup>,
    /// WAL-size ceiling in bytes. After a record is enqueued,
    /// `wal.written_len()` is compared against this value and a
    /// background checkpoint is triggered once it is reached.
    /// Initialised from `default_checkpoint_threshold_bytes()`;
    /// `Database::set_checkpoint_threshold_bytes` overrides it
    /// (clamped to at least 1).
    checkpoint_threshold_bytes: u64,
    /// v7.1.4 — `<db_path>.spg/segments/` directory. Cold-tier
    /// segments produced by `freeze_oldest_to_cold` / compaction
    /// are persisted here as `seg_<id>.spg` files; the manifest
    /// at `<db_path>.spg/manifest.v10` records every active
    /// segment + its CRC32 so the next boot can verify + reload.
    cold_segments_dir: PathBuf,
    /// Segment id → on-disk path for every cold segment attached
    /// to this database. Filled at boot (manifest entries, then
    /// the directory scan) and extended by
    /// `freeze_oldest_to_cold`; copied into each checkpoint job.
    cold_segment_paths: BTreeMap<u32, PathBuf>,
    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
    /// via `fs::create_dir` on `<db_path>.lock` at open_path
    /// entry; released on Drop by `fs::remove_dir`. Atomic on
    /// every supported platform. A second process opening the
    /// same path while the first is still alive hits the
    /// create_dir failure and returns
    /// `EngineError::Unsupported("database is locked by another
    /// process: …")`. Stale locks (process crashed mid-session)
    /// must be cleared via `Database::force_unlock(path)` —
    /// SPG can't safely fingerprint who owned a stale directory
    /// without a libc dep, which would violate spg-embedded's
    /// zero-deps charter.
    lock_path: PathBuf,
    /// CoW-2 (v7.34) — background-checkpoint worker. `None` only
    /// transiently inside `Drop` after the worker has been signalled
    /// and joined. The worker carries Arc clones of `wal` and
    /// `current_chunk_path`, so it can rotate the active chunk and
    /// reflect the new path back here even after the front-end has
    /// returned to the caller.
    checkpoint_worker: Option<CheckpointWorker>,
}
1742
1743impl Database {
1744 /// Open a fresh in-memory database. No WAL, no catalog
1745 /// snapshot on disk — perfect for tests + short-lived
1746 /// CLI tools.
1747 #[must_use]
1748 pub fn open_in_memory() -> Self {
1749 Self {
1750 engine: engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros)),
1751 persistence: None,
1752 commit_lsn: AtomicU64::new(0),
1753 tx_wal: None,
1754 }
1755 }
1756
1757 /// v7.1 — Open or create a persistent database backed by
1758 /// the file at `db_path`. The WAL lives at `db_path` +
1759 /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
1760 /// path:
1761 ///
1762 /// 1. If `db_path` exists, restore the catalog snapshot.
1763 /// 2. If the WAL exists, replay every record into the
1764 /// restored engine — the same recovery story
1765 /// `spg-server` uses.
1766 /// 3. Open the WAL in append+sync mode so subsequent
1767 /// `execute()` writes durably commit (one fsync per
1768 /// mutation).
1769 ///
1770 /// `Drop` writes a final catalog snapshot + truncates the
1771 /// WAL — operators that need a sync barrier at a specific
1772 /// point use `checkpoint()` explicitly.
1773 pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
1774 let db_path = db_path.as_ref().to_path_buf();
1775 // v7.19 — WAL is a directory of chunk files. Legacy
1776 // single-file path stays variable-named `wal_path` for
1777 // the backward-compat migration block below.
1778 let wal_path = {
1779 let mut p = db_path.clone();
1780 let name = p
1781 .file_name()
1782 .map(|n| {
1783 let mut s = n.to_os_string();
1784 s.push(".wal");
1785 s
1786 })
1787 .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
1788 p.set_file_name(name);
1789 p
1790 };
1791 let wal_dir = wal_path.clone();
1792 if let Some(parent) = db_path.parent()
1793 && !parent.as_os_str().is_empty()
1794 {
1795 std::fs::create_dir_all(parent).map_err(io_err)?;
1796 }
1797 // v7.17.0 Phase 6.2 — acquire cross-process exclusion
1798 // lock before touching any catalog / WAL bytes. atomic
1799 // mkdir on every supported platform; a second process
1800 // opening the same path while the first is still alive
1801 // hits the create_dir failure and gets a clear error.
1802 let lock_path = {
1803 let mut p = db_path.clone();
1804 let name = p
1805 .file_name()
1806 .map(|n| {
1807 let mut s = n.to_os_string();
1808 s.push(".lock");
1809 s
1810 })
1811 .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1812 p.set_file_name(name);
1813 p
1814 };
1815 acquire_path_lock(&lock_path)?;
1816 let mut engine = if db_path.exists() {
1817 let bytes = std::fs::read(&db_path).map_err(io_err)?;
1818 let engine = Engine::restore_envelope(&bytes).map_err(|e| {
1819 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1820 "restore from {}: {e}",
1821 db_path.display()
1822 )))
1823 })?;
1824 engine_with_query_byte_budget(engine.with_clock(wall_clock_micros))
1825 } else {
1826 engine_with_query_byte_budget(Engine::new().with_clock(wall_clock_micros))
1827 };
1828 // v7.1.4 — manifest-driven cold-segment reload. The
1829 // manifest sidecar pairs the catalog snapshot CRC with a
1830 // list of `(segment_id, path, crc32)` triples; verify
1831 // before loading so a torn or stale manifest doesn't
1832 // surface phantom data.
1833 let cold_segments_dir = {
1834 let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
1835 let stem = db_path
1836 .file_stem()
1837 .unwrap_or_else(|| std::ffi::OsStr::new("db"))
1838 .to_string_lossy()
1839 .into_owned();
1840 parent.join(format!("{stem}.spg")).join("segments")
1841 };
1842 let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
1843 let manifest_pth = spg_manifest_path(&db_path);
1844 if manifest_pth.exists() && db_path.exists() {
1845 let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
1846 if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
1847 let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
1848 let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
1849 if snap_crc == m.catalog_crc32 {
1850 for entry in &m.cold_segments {
1851 if let Ok(seg_bytes) = std::fs::read(&entry.path) {
1852 let computed = spg_crypto::crc32::crc32(&seg_bytes);
1853 if computed != entry.crc32 {
1854 eprintln!(
1855 "spg-embedded: manifest skip segment {}: CRC mismatch",
1856 entry.segment_id
1857 );
1858 continue;
1859 }
1860 if engine.catalog().cold_segment(entry.segment_id).is_some() {
1861 // Already loaded via Catalog::clone path (shouldn't happen
1862 // since Engine::new + restore_envelope don't populate cold).
1863 continue;
1864 }
1865 let mut new_cat = engine.catalog().clone();
1866 if let Err(e) =
1867 new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
1868 {
1869 eprintln!(
1870 "spg-embedded: manifest load segment {} failed: {e}",
1871 entry.segment_id
1872 );
1873 continue;
1874 }
1875 engine.replace_catalog(new_cat);
1876 cold_segment_paths.insert(entry.segment_id, entry.path.clone());
1877 } else {
1878 eprintln!(
1879 "spg-embedded: manifest skip segment {}: file unreadable",
1880 entry.segment_id
1881 );
1882 }
1883 }
1884 }
1885 }
1886 }
1887 // CoW-4 (v7.34) — D10 + missing-manifest fallback. Walk
1888 // `<db>.spg/segments/` and attach any `seg_<id>.spg` file that
1889 // the manifest didn't already cover (manifest absent / CRC
1890 // mismatched / a fresher freeze landed after the last
1891 // checkpoint wrote its manifest). The segment binary's own
1892 // magic + CRC32 guards integrity — no need to trust a stale
1893 // manifest entry to trust the file.
1894 scan_cold_segments_dir(&cold_segments_dir, &mut engine, &mut cold_segment_paths);
1895 // v7.19 — chunked WAL on-disk layout.
1896 //
1897 // Three cases handled here:
1898 //
1899 // 1. wal_dir exists as a DIRECTORY → scan its
1900 // `<unix_us>_<leading_lsn>.wal` chunks (sorted
1901 // lexicographically = chunk-creation order), replay
1902 // them in sequence, advance the LSN watermark to the
1903 // max commit_lsn seen.
1904 //
1905 // 2. wal_path exists as a FILE → legacy v7.18 layout.
1906 // Migrate it: create `wal_dir/`, move the single file
1907 // inside as `0000000000000000_0000000000000000.wal`,
1908 // then fall through to case 1's replay loop.
1909 //
1910 // 3. Neither exists → fresh database; create wal_dir.
1911 let mut initial_lsn: u64 = 0;
1912 if wal_path.is_file() {
1913 // Case 2: legacy single-file WAL migration.
1914 let legacy_bytes = std::fs::read(&wal_path).map_err(io_err)?;
1915 std::fs::remove_file(&wal_path).map_err(io_err)?;
1916 std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1917 if !legacy_bytes.is_empty() {
1918 let migrated = wal_dir.join(legacy_chunk_filename());
1919 std::fs::write(&migrated, &legacy_bytes).map_err(io_err)?;
1920 }
1921 } else if !wal_dir.exists() {
1922 // Case 3: fresh database.
1923 std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1924 }
1925 // Cases 1 + 2 share replay logic now that wal_dir is
1926 // guaranteed to exist (and may be empty for case 3).
1927 //
1928 // Two-pass replay so we don't double-apply records the
1929 // snapshot already reflects:
1930 //
1931 // 1. Find the highest commit_lsn carried by a
1932 // checkpoint_marker across all chunks. That LSN is the
1933 // snapshot's high-water mark — anything ≤ it is
1934 // already in `<db_path>` and replaying it would
1935 // DuplicateTable / double-insert.
1936 // 2. Replay only records strictly above that LSN.
1937 //
1938 // Case 2 migration (legacy single-file WAL) lands here
1939 // too: the migrated chunk has no marker so the LSN floor
1940 // is 0 and every record applies — exactly the v7.18
1941 // behaviour the migration is supposed to preserve.
1942 let chunk_paths = sorted_wal_chunks(&wal_dir).map_err(io_err)?;
1943 let mut snapshot_lsn: u64 = 0;
1944 for chunk in &chunk_paths {
1945 let bytes = std::fs::read(chunk).map_err(io_err)?;
1946 if let Ok(records) = parse_wal_records(&bytes) {
1947 for r in &records {
1948 if r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER {
1949 if let Some(l) = r.commit_lsn {
1950 if l > snapshot_lsn {
1951 snapshot_lsn = l;
1952 }
1953 }
1954 }
1955 }
1956 }
1957 }
1958 let mut quarantined: Vec<QuarantinedStmt> = Vec::new();
1959 for chunk in &chunk_paths {
1960 let bytes = std::fs::read(chunk).map_err(io_err)?;
1961 if bytes.is_empty() {
1962 continue;
1963 }
1964 replay_wal_filtered(&bytes, &mut engine, snapshot_lsn, &mut quarantined)
1965 .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
1966 if let Ok(records) = parse_wal_records(&bytes) {
1967 if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
1968 if max > initial_lsn {
1969 initial_lsn = max;
1970 }
1971 }
1972 }
1973 }
1974 // v7.30.1 (mailrs round-24 ask 2) — replay rejects no longer
1975 // brick the open. Persist the rejected statements beside the
1976 // WAL chunks for forensics and say so loudly; the boot
1977 // continues with every other record applied.
1978 if !quarantined.is_empty() {
1979 let mut body = String::new();
1980 for q in &quarantined {
1981 body.push_str(&format_quarantine_line(q));
1982 }
1983 let qpath = wal_dir.join(format!(
1984 "quarantine-{:016x}.log",
1985 wall_clock_micros().max(0) as u64
1986 ));
1987 match std::fs::write(&qpath, &body) {
1988 Ok(()) => eprintln!(
1989 "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
1990 forensics at {}",
1991 quarantined.len(),
1992 qpath.display()
1993 ),
1994 Err(e) => eprintln!(
1995 "spg-embedded: WAL replay quarantined {} statement(s) — boot continues; \
1996 quarantine file write FAILED ({e}), entries follow:\n{body}",
1997 quarantined.len()
1998 ),
1999 }
2000 }
2001 // Open the "current" chunk — either the last existing
2002 // chunk file (so subsequent appends extend it until the
2003 // size threshold rotates) or a fresh first chunk.
2004 let now_us = wall_clock_micros();
2005 let current_chunk_path = if let Some(last) = chunk_paths.last() {
2006 last.clone()
2007 } else {
2008 wal_dir.join(chunk_filename(now_us, initial_lsn + 1))
2009 };
2010 let wal_file = OpenOptions::new()
2011 .create(true)
2012 .append(true)
2013 .read(true)
2014 .open(¤t_chunk_path)
2015 .map_err(io_err)?;
2016 // Persist the (possibly freshly created) chunk's directory entry.
2017 fsync_dir(&wal_dir);
2018 let wal_len = wal_file.metadata().map_err(io_err)?.len();
2019 let wal = Arc::new(WalGroup::new(wal_file, wal_len));
2020 // v7.19 P3 — spawn retention sweep thread when the
2021 // operator opted in via SPG_PITR_RETENTION_HOURS > 0.
2022 // Otherwise stay on the v7.18 behaviour (chunks accumulate
2023 // until something else — backup-pitr archival, manual
2024 // cleanup — moves them).
2025 let retention_hours = pitr_retention_hours();
2026 let (retention_shutdown, retention_thread) = if retention_hours > 0 {
2027 let shutdown = Arc::new(AtomicBool::new(false));
2028 let shutdown_clone = Arc::clone(&shutdown);
2029 let wal_dir_clone = wal_dir.clone();
2030 let check_interval = std::time::Duration::from_secs(pitr_retention_check_sec());
2031 let archive_cmd = pitr_archive_cmd();
2032 let handle = std::thread::Builder::new()
2033 .name("spg-pitr-retention".into())
2034 .spawn(move || {
2035 retention_sweep_loop(
2036 wal_dir_clone,
2037 retention_hours,
2038 check_interval,
2039 archive_cmd,
2040 shutdown_clone,
2041 );
2042 })
2043 .map_err(io_err)?;
2044 (Some(shutdown), Some(handle))
2045 } else {
2046 (None, None)
2047 };
2048 // v7.20 — background flusher for SPG_SYNCHRONOUS_COMMIT=off.
2049 let (flusher_shutdown, flusher_thread) = if synchronous_commit_on() {
2050 (None, None)
2051 } else {
2052 let shutdown = Arc::new(AtomicBool::new(false));
2053 let shutdown_clone = Arc::clone(&shutdown);
2054 let group = Arc::clone(&wal);
2055 let interval = std::time::Duration::from_millis(wal_writer_delay_ms());
2056 let handle = std::thread::Builder::new()
2057 .name("spg-wal-flusher".into())
2058 .spawn(move || {
2059 while !shutdown_clone.load(Ordering::SeqCst) {
2060 std::thread::sleep(interval);
2061 if let Err(e) = group.flush_now() {
2062 eprintln!("spg-embedded: background WAL flush failed: {e:?}");
2063 }
2064 }
2065 // Final drain on shutdown signal.
2066 let _ = group.flush_now();
2067 })
2068 .map_err(io_err)?;
2069 (Some(shutdown), Some(handle))
2070 };
2071 // v7.34 (crash-recovery P0 #2) — arm row-level redo capture for
2072 // subsequent writes (AFTER replay, so re-executed SQL records
2073 // don't capture; 0x13 records replay via apply_redo and never do).
2074 if row_redo_enabled() {
2075 engine.set_redo_capture(true);
2076 }
2077 Ok(Self {
2078 engine,
2079 commit_lsn: AtomicU64::new(initial_lsn),
2080 tx_wal: None,
2081 persistence: Some(PersistenceCtx {
2082 db_path,
2083 wal_dir,
2084 current_chunk_path: Arc::new(Mutex::new(current_chunk_path)),
2085 wal,
2086 checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
2087 cold_segments_dir,
2088 cold_segment_paths,
2089 lock_path,
2090 retention_shutdown,
2091 retention_thread,
2092 flusher_shutdown,
2093 flusher_thread,
2094 checkpoint_worker: Some(CheckpointWorker::spawn()),
2095 }),
2096 })
2097 }
2098
2099 /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
2100 /// hot tier into a brand-new cold-tier segment + persist
2101 /// it to disk. Same semantics as `spg-server`'s freezer
2102 /// thread; embedded just runs the freeze synchronously on
2103 /// the caller's thread. Persistence + manifest update
2104 /// happen as part of the next `checkpoint()` (or on Drop).
2105 pub fn freeze_oldest_to_cold(
2106 &mut self,
2107 table_name: &str,
2108 index_name: &str,
2109 max_rows: usize,
2110 ) -> Result<spg_storage::FreezeReport, EngineError> {
2111 let report = self
2112 .engine
2113 .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
2114 if let Some(p) = &mut self.persistence {
2115 std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
2116 let final_path = p
2117 .cold_segments_dir
2118 .join(format!("seg_{}.spg", report.segment_id));
2119 let tmp_path = p
2120 .cold_segments_dir
2121 .join(format!("seg_{}.spg.tmp", report.segment_id));
2122 std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
2123 std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
2124 p.cold_segment_paths.insert(report.segment_id, final_path);
2125 }
2126 Ok(report)
2127 }
2128
2129 /// v7.1 — override the auto-checkpoint WAL-size ceiling for
2130 /// this `Database` instance. Default is
2131 /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
2132 /// setter wins. No-op when the database is in-memory.
2133 pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
2134 if let Some(p) = &mut self.persistence {
2135 p.checkpoint_threshold_bytes = bytes.max(1);
2136 }
2137 }
2138
2139 /// v7.31 (memory campaign, round-26 ask 1/ask 4) — per-bucket
2140 /// memory snapshot for the embedding host. Poll it from prod to
2141 /// see where resident bytes live (rows / representation /
2142 /// indexes per table) and to drive host-side shedding before
2143 /// the kernel does it. Same numbers as the server path's
2144 /// `SELECT * FROM spg_memory_stats`.
2145 #[must_use]
2146 pub fn memory_stats(&self) -> spg_engine::MemoryStats {
2147 let mut stats = self.engine.memory_stats();
2148 // v7.31 C2 — fill in bucket D: the engine leaves `wal_bytes`
2149 // None (it has no WAL); we report the live (uncheckpointed)
2150 // WAL footprint via the same `written_len()` meter `metrics()`
2151 // reads. In-memory databases have no persistence → stays None.
2152 if let Some(p) = &self.persistence {
2153 stats.wal_bytes = Some(p.wal.written_len());
2154 }
2155 stats
2156 }
2157
2158 /// v7.1 — flush a fresh catalog snapshot to `db_path` and
2159 /// rotate the WAL. Idempotent; cheap when nothing has happened
2160 /// since the last checkpoint. No-op when the database is in-memory.
2161 ///
2162 /// CoW-2 (v7.34): the heavy half (serialize + tmp+rename + fsync +
2163 /// marker enqueue + chunk rotation) runs on a dedicated worker thread
2164 /// so the caller's engine borrow is released after the cheap capture
2165 /// step. This entry point keeps the **synchronous** contract — it
2166 /// waits for the worker to finish before returning — so existing
2167 /// callers, tests, and operator scripts see no behaviour change;
2168 /// they just pay one extra hop. The non-blocking variant lives at
2169 /// `trigger_checkpoint`, used by the auto-checkpoint hot path so
2170 /// the write that crossed `SPG_EMBEDDED_CHECKPOINT_BYTES` doesn't
2171 /// stall on disk IO.
2172 ///
2173 /// Called automatically when:
2174 /// - the WAL grows past `SPG_EMBEDDED_CHECKPOINT_BYTES` (default
2175 /// 4 MiB) at the end of an `execute()` (via `trigger_checkpoint`,
2176 /// non-blocking), and
2177 /// - `Drop` runs (synchronous; best-effort, failures logged).
2178 pub fn checkpoint(&mut self) -> Result<(), EngineError> {
2179 if self.persistence.is_none() {
2180 return Ok(());
2181 }
2182 // Drain any prior async checkpoint first so our snapshot reflects
2183 // post-it state (and so a sticky error from it surfaces here, not
2184 // smeared across the next two `wait`s).
2185 self.wait_checkpoint()?;
2186 let Some(job) = self.snapshot_checkpoint_job() else {
2187 return Ok(());
2188 };
2189 let Some(worker) = self
2190 .persistence
2191 .as_ref()
2192 .and_then(|p| p.checkpoint_worker.as_ref())
2193 else {
2194 return Ok(());
2195 };
2196 // `wait_checkpoint` above guaranteed idle; `try_enqueue` only
2197 // returns Ok(false) when busy, so we expect Ok(true) here. The
2198 // bool is dropped — we wait unconditionally to honour the sync
2199 // contract.
2200 let _ = worker.try_enqueue(job)?;
2201 self.wait_checkpoint()
2202 }
2203
2204 /// CoW-2 (v7.34) — non-blocking checkpoint trigger used by the
2205 /// auto-checkpoint hot path (`wal_after_ok` over the threshold).
2206 /// Captures the engine state under `&mut self` then signals the
2207 /// background worker and returns; the serialize / fsync / rotate
2208 /// sequence runs on the worker thread. If a checkpoint is already
2209 /// pending or in flight, the new trigger is silently dropped —
2210 /// the next threshold crossing picks up the newer state.
2211 ///
2212 /// Sticky errors from a prior async run surface here (via
2213 /// `try_enqueue`), so a failed background checkpoint still reaches
2214 /// the caller eventually rather than vanishing.
2215 fn trigger_checkpoint(&mut self) -> Result<(), EngineError> {
2216 if self.persistence.is_none() {
2217 return Ok(());
2218 }
2219 let Some(job) = self.snapshot_checkpoint_job() else {
2220 return Ok(());
2221 };
2222 let Some(worker) = self
2223 .persistence
2224 .as_ref()
2225 .and_then(|p| p.checkpoint_worker.as_ref())
2226 else {
2227 return Ok(());
2228 };
2229 let _accepted = worker.try_enqueue(job)?;
2230 Ok(())
2231 }
2232
2233 /// CoW-2 (v7.34) — block until the background checkpoint worker is
2234 /// idle. Used by sync `checkpoint()` and by Drop to ensure the final
2235 /// snapshot is durable before the process exits.
2236 fn wait_checkpoint(&self) -> Result<(), EngineError> {
2237 match self
2238 .persistence
2239 .as_ref()
2240 .and_then(|p| p.checkpoint_worker.as_ref())
2241 {
2242 Some(w) => w.wait(),
2243 None => Ok(()),
2244 }
2245 }
2246
2247 /// CoW-2 (v7.34) — capture a checkpoint job under `&mut self` (or
2248 /// `&self`, since reading from atomics + cheap clones don't mutate).
2249 /// Returns `None` if the database is in-memory.
2250 fn snapshot_checkpoint_job(&self) -> Option<CheckpointJob> {
2251 let p = self.persistence.as_ref()?;
2252 Some(CheckpointJob {
2253 snapshot: self.engine.snapshot_data(),
2254 marker_lsn: self.commit_lsn.load(Ordering::SeqCst),
2255 db_path: p.db_path.clone(),
2256 wal_dir: p.wal_dir.clone(),
2257 wal: Arc::clone(&p.wal),
2258 cold_segments: p
2259 .cold_segment_paths
2260 .iter()
2261 .map(|(&id, path)| (id, path.clone()))
2262 .collect(),
2263 current_chunk_path: Arc::clone(&p.current_chunk_path),
2264 })
2265 }
2266
2267 /// Restore a database from a previously-captured catalog
2268 /// snapshot. Pairs with `Database::snapshot()` for
2269 /// round-tripping in-memory state without going through
2270 /// the `spg-server` WAL.
2271 pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
2272 let engine = Engine::restore_envelope(snapshot).map_err(|e| {
2273 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
2274 })?;
2275 Ok(Self {
2276 engine,
2277 persistence: None,
2278 commit_lsn: AtomicU64::new(0),
2279 tx_wal: None,
2280 })
2281 }
2282
2283 /// Take a catalog snapshot suitable for `Database::restore`.
2284 /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
2285 /// + version + payload); round-trips through every released
2286 /// SPG version per the STABILITY contract.
2287 #[must_use]
2288 pub fn snapshot(&self) -> Vec<u8> {
2289 self.engine.snapshot()
2290 }
2291
2292 /// Execute a SQL statement and return the engine's
2293 /// `QueryResult` verbatim. Pass-through for callers that
2294 /// want to keep PG-flavoured column/row metadata.
2295 ///
2296 /// v7.1 — when the database was opened via `open_path`,
2297 /// successful mutations are appended to the WAL + fsynced
2298 /// before the call returns. A subsequent process crash will
2299 /// recover state up to the last successful return from
2300 /// `execute()`. Read-only statements (SELECT / SHOW /
2301 /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
2302 /// etc.) skip the WAL entirely.
2303 pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
2304 // v7.20 P2 — single-caller convenience over the buffered
2305 // path: enqueue + immediately wait. Batch size is 1 here,
2306 // so the durability behaviour (one fsync before Ok) is
2307 // identical to v7.19. Concurrent callers go through
2308 // `execute_buffered` (AsyncDatabase does) and share the
2309 // leader's fsync.
2310 let (result, ticket) = self.execute_buffered(sql)?;
2311 if let Some(t) = ticket {
2312 t.wait()?;
2313 }
2314 Ok(result)
2315 }
2316
2317 /// v7.20 P2 — group-commit write entry. Runs the engine
2318 /// mutation + encodes/enqueues the WAL record, then RETURNS
2319 /// WITHOUT waiting for the fsync. The caller must call
2320 /// [`WalTicket::wait`] before treating the write as durable
2321 /// — crucially, the caller can (and should) drop whatever
2322 /// lock guards this `Database` first, so the next writer's
2323 /// mutation overlaps this batch's fsync.
2324 ///
2325 /// `None` ticket = nothing hit the WAL (read-only statement,
2326 /// no-op DDL, or in-memory database) — the result is final
2327 /// as returned.
2328 ///
2329 /// # Errors
2330 /// Engine errors propagate unchanged. Auto-checkpoint (when
2331 /// the active chunk crosses the threshold) runs inline and
2332 /// may surface IO errors.
2333 pub fn execute_buffered(
2334 &mut self,
2335 sql: &str,
2336 ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2337 let result = self.engine.execute(sql)?;
2338 let modified = matches!(
2339 &result,
2340 QueryResult::CommandOk {
2341 modified_catalog: true,
2342 ..
2343 }
2344 );
2345 let ticket = self.wal_after_ok(sql, modified)?;
2346 Ok((result, ticket))
2347 }
2348
2349 /// v7.21 (round-12 polish) — post-engine WAL bookkeeping shared
2350 /// by the simple ([`Self::execute_buffered`]) and prepared
2351 /// ([`Self::execute_prepared_buffered`]) write paths. `canonical`
2352 /// is the replay text (bind-final for prepared statements);
2353 /// `modified_catalog` comes from the engine result. Three routes:
2354 ///
2355 /// - transaction control → maintain [`Self::tx_wal`]: BEGIN opens
2356 /// the buffer, COMMIT flushes it as ONE atomic
2357 /// `WAL_V4_TYPE_TX_COMMIT_SQL` record, ROLLBACK drops it,
2358 /// SAVEPOINT / ROLLBACK TO mark / truncate it. The engine has
2359 /// already accepted the statement, so this only mirrors state.
2360 /// - inside an open transaction → buffer the statement (shadow-
2361 /// catalog mutations report `modified_catalog: false`, so the
2362 /// auto-commit arm below can't see them).
2363 /// - auto-commit mutation → classic per-statement v4 record.
2364 ///
2365 /// v7.18 PITR — v4 records carry commit LSN + wall-clock micros.
2366 /// The crash window remains one BATCH: replay re-applies
2367 /// idempotently exactly as before, and a torn batch tail drops
2368 /// cleanly (same torn-write handling).
2369 fn wal_after_ok(
2370 &mut self,
2371 canonical: &str,
2372 modified_catalog: bool,
2373 ) -> Result<Option<WalTicket>, EngineError> {
2374 if self.persistence.is_none() {
2375 return Ok(None);
2376 }
2377 let mut record = None;
2378 match tx_control_kind(canonical) {
2379 Some(TxControl::Begin) => {
2380 self.tx_wal = Some(TxWalBuffer::default());
2381 }
2382 Some(TxControl::Commit) => {
2383 if let Some(buf) = self.tx_wal.take()
2384 && !buf.statements.is_empty()
2385 {
2386 let script = buf.statements.join(";\n");
2387 let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
2388 record = Some(encode_v4_tx_commit(&script, lsn, wall_clock_micros()));
2389 }
2390 }
2391 Some(TxControl::Rollback) => {
2392 self.tx_wal = None;
2393 }
2394 Some(TxControl::Savepoint(name)) => {
2395 if let Some(buf) = &mut self.tx_wal {
2396 // PG name-reuse semantics: latest mark wins.
2397 buf.savepoints.retain(|(n, _)| n != &name);
2398 let mark = buf.statements.len();
2399 buf.savepoints.push((name, mark));
2400 }
2401 }
2402 Some(TxControl::RollbackToSavepoint(name)) => {
2403 if let Some(buf) = &mut self.tx_wal
2404 && let Some(pos) = buf.savepoints.iter().position(|(n, _)| n == &name)
2405 {
2406 let mark = buf.savepoints[pos].1;
2407 buf.statements.truncate(mark);
2408 // Later savepoints die with the rollback; the
2409 // target itself survives (PG keeps it
2410 // re-rollbackable).
2411 buf.savepoints.truncate(pos + 1);
2412 }
2413 }
2414 Some(TxControl::ReleaseSavepoint) => {
2415 // RELEASE folds the savepoint into the enclosing tx —
2416 // buffered statements stay. The mark also stays:
2417 // marks are only consulted by ROLLBACK TO, which the
2418 // engine validates first, so a dangling mark is
2419 // unreachable.
2420 }
2421 None => {
2422 if let Some(buf) = &mut self.tx_wal {
2423 if !sql_is_read_only(canonical) {
2424 buf.statements.push(canonical.to_string());
2425 }
2426 } else if modified_catalog && !sql_is_read_only(canonical) {
2427 let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
2428 // v7.34 (crash-recovery P0 #2) — hybrid log: when
2429 // row-level redo is on and this statement produced row
2430 // changes (DML), write a physical 0x13 redo record so
2431 // replay applies it directly. A statement with no row
2432 // changes (DDL: CREATE/ALTER, never goes through
2433 // Table::insert/update/delete) drains an empty redo and
2434 // keeps the SQL record so the schema still replays.
2435 let redo = if row_redo_enabled() {
2436 self.engine.take_redo()
2437 } else {
2438 Vec::new()
2439 };
2440 record = Some(if redo.is_empty() {
2441 encode_v4_auto_commit(canonical, lsn, wall_clock_micros())
2442 } else {
2443 encode_v5_row_redo(
2444 &spg_storage::encode_redo_log(&redo),
2445 lsn,
2446 wall_clock_micros(),
2447 )
2448 });
2449 }
2450 }
2451 }
2452 let mut ticket = None;
2453 if let Some(record) = record {
2454 let p = self.persistence.as_mut().expect("checked above");
2455 let seq = p.wal.enqueue(&record);
2456 ticket = Some(WalTicket {
2457 group: Arc::clone(&p.wal),
2458 seq,
2459 });
2460 if p.wal.written_len() >= p.checkpoint_threshold_bytes {
2461 // CoW-2 (v7.34): hot path — fire-and-forget. The worker
2462 // serializes off this thread so the commit that just
2463 // crossed the threshold doesn't stall on a multi-hundred-ms
2464 // snapshot write. Any sticky error from a prior async
2465 // checkpoint surfaces here.
2466 self.trigger_checkpoint()?;
2467 }
2468 }
2469 Ok(ticket)
2470 }
2471
2472 /// v7.3.0 — typed-row variant of [`Database::query`]. Each
2473 /// row decodes into a `T: FromSpgRow` so callers don't
2474 /// pattern-match on `Value` themselves. Use [`spg_row!`] to
2475 /// generate the impl, or write it by hand.
2476 pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
2477 let rows = self.query(sql)?;
2478 rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
2479 }
2480
2481 /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
2482 /// strips the column-schema metadata for read-side
2483 /// ergonomics. Errors on non-Rows results (DML / DDL
2484 /// statements should go through `execute` instead).
2485 pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
2486 match self.engine.execute(sql)? {
2487 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2488 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2489 "query() expects a SELECT — use execute() for DML/DDL".into(),
2490 )),
2491 // v7.5.0 — QueryResult is #[non_exhaustive]; any future
2492 // variant is not a SELECT row stream, treat as Unsupported.
2493 _ => Err(EngineError::Unsupported(
2494 "query() expects a SELECT — use execute() for DML/DDL".into(),
2495 )),
2496 }
2497 }
2498
2499 /// v7.16.0 — column-aware variant of [`Self::query`].
2500 /// Returns the column schema vec alongside the rows so
2501 /// adapters (the spg-sqlx Row impl most notably) can drive
2502 /// name + type-based column lookups. Errors on non-Rows
2503 /// results identically to `query`.
2504 pub fn query_with_columns(
2505 &mut self,
2506 sql: &str,
2507 ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2508 match self.engine.execute(sql)? {
2509 QueryResult::Rows { columns, rows } => {
2510 Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2511 }
2512 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2513 "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2514 )),
2515 _ => Err(EngineError::Unsupported(
2516 "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
2517 )),
2518 }
2519 }
2520
2521 /// v7.16.0 — column-aware variant of
2522 /// [`Self::query_prepared`]. Same shape as
2523 /// `query_with_columns` but driven from a prepared
2524 /// statement + bound params.
2525 pub fn query_prepared_with_columns(
2526 &mut self,
2527 stmt: &Statement,
2528 params: &[Value],
2529 ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
2530 match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2531 QueryResult::Rows { columns, rows } => {
2532 Ok((columns, rows.into_iter().map(|r| r.values).collect()))
2533 }
2534 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2535 "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2536 )),
2537 _ => Err(EngineError::Unsupported(
2538 "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2539 )),
2540 }
2541 }
2542
/// Borrow the underlying engine. Escape hatch for callers
/// that need access to `spg-engine` APIs not yet surfaced
/// here (transactions, EXPLAIN ANALYZE, etc.).
///
/// Returns a shared reference to the `engine` field; nothing is
/// cloned.
#[must_use]
pub const fn engine(&self) -> &Engine {
    &self.engine
}
2550
/// Mutable borrow of the underlying engine. Same intent as
/// `engine()` but for write-side APIs (e.g. inserting
/// directly through `Catalog::insert` for high-throughput
/// bulk loads that bypass SQL parsing).
///
/// NOTE(review): changes made through this handle do not pass
/// through this type's `execute*` methods, which is where the
/// visible WAL hand-off (`wal_after_ok`) happens — presumably such
/// writes are not logged; confirm before relying on them in
/// persistent mode.
pub const fn engine_mut(&mut self) -> &mut Engine {
    &mut self.engine
}
2558
2559 /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
2560 /// `execute_prepared` / `query_prepared` calls can re-bind
2561 /// parameters without re-parsing. The returned [`Statement`]
2562 /// is a thin handle around the AST + cached source SQL; it's
2563 /// `Clone` so the same plan can drive many bind calls
2564 /// concurrently (each call clones the AST and runs
2565 /// placeholder substitution on the clone — the cached
2566 /// plan stays intact).
2567 ///
2568 /// Plan caching follows the engine's existing version-aware
2569 /// rule: a prepared `Statement` whose statistics version
2570 /// has rolled (ANALYZE ran between prepare and execute)
2571 /// will silently re-prepare under the hood. Callers don't
2572 /// need to detect this.
2573 ///
2574 /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
2575 /// `bind`-time `Value`s are passed as a slice; arity
2576 /// mismatches surface as `EvalError::PlaceholderOutOfRange`
2577 /// at `execute_prepared` time, not here.
2578 ///
2579 /// # Errors
2580 /// Surfaces `EngineError` (parse error / plan rewrite
2581 /// failure) from the underlying `Engine::prepare`.
2582 pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
2583 // Use the cached path so repeated prepares of the same
2584 // SQL are O(1). The engine's plan cache stays shared
2585 // across all callers of this Database — a single
2586 // `PgPool`-shaped consumer (or, later, the spg-sqlx
2587 // adapter) prepares once and reaps the win on every bind.
2588 let stmt = self
2589 .engine
2590 .prepare_cached(sql)
2591 .map_err(EngineError::Parse)?;
2592 Ok(Statement {
2593 stmt,
2594 sql: sql.to_string(),
2595 })
2596 }
2597
2598 /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
2599 /// executing. Returns `(parameter_oid_count, output_columns)`
2600 /// where `output_columns` is empty for non-SELECT statements
2601 /// or for SELECT shapes the describe planner can't resolve
2602 /// (JOIN / subquery / unknown table). Wraps
2603 /// `Engine::describe_prepared` so the spg-sqlx bridge can
2604 /// surface PG-shape Describe replies for
2605 /// `sqlx::query!()` compile-time validation.
2606 ///
2607 /// # Errors
2608 /// Propagates parse errors from the underlying prepare path.
2609 pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2610 let stmt = self
2611 .engine
2612 .prepare_cached(sql)
2613 .map_err(EngineError::Parse)?;
2614 Ok(self.engine.describe_prepared(&stmt))
2615 }
2616
2617 /// v7.16.0 — execute a prepared statement with bound
2618 /// parameters. Mirrors `Engine::execute_prepared`: clones
2619 /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
2620 ///
2621 /// Persistence (WAL fsync + auto-checkpoint) follows the
2622 /// same rules as `execute(sql)`: mutating statements get a
2623 /// WAL record AFTER the in-memory exec succeeds. The WAL
2624 /// record carries the substituted, bind-final SQL, so
2625 /// replay reconstructs the same row state without needing
2626 /// the original prepared `Statement` to still be alive.
2627 ///
2628 /// # Errors
2629 /// Propagates engine errors. Param arity mismatch surfaces
2630 /// as `EvalError::PlaceholderOutOfRange`.
2631 pub fn execute_prepared(
2632 &mut self,
2633 stmt: &Statement,
2634 params: &[Value],
2635 ) -> Result<QueryResult, EngineError> {
2636 let (result, ticket) = self.execute_prepared_buffered(stmt, params)?;
2637 if let Some(t) = ticket {
2638 t.wait()?;
2639 }
2640 Ok(result)
2641 }
2642
2643 /// v7.20 P2 — group-commit variant of
2644 /// [`Database::execute_prepared`]. Same contract as
2645 /// [`Database::execute_buffered`]: mutation + enqueue happen
2646 /// here; the caller waits on the ticket AFTER releasing
2647 /// whatever lock guards this `Database`.
2648 ///
2649 /// # Errors
2650 /// Engine errors propagate unchanged; inline auto-checkpoint
2651 /// may surface IO errors.
2652 pub fn execute_prepared_buffered(
2653 &mut self,
2654 stmt: &Statement,
2655 params: &[Value],
2656 ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
2657 let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
2658 let modified = matches!(
2659 &result,
2660 QueryResult::CommandOk {
2661 modified_catalog: true,
2662 ..
2663 }
2664 );
2665 // WAL persistence on the bind-final SQL. Build the
2666 // canonical Display form by re-printing the
2667 // placeholder-substituted statement (cheap — the AST
2668 // is already in hand from execute_prepared's internal
2669 // clone) so replay's path is identical to the
2670 // simple-query path. v7.21: also when a transaction is
2671 // open — in-tx mutations report `modified_catalog: false`
2672 // but must reach the tx WAL buffer (see `wal_after_ok`).
2673 let mut ticket = None;
2674 if self.persistence.is_some()
2675 && (modified
2676 || (self.tx_wal.is_some() && !sql_is_read_only(&stmt.sql))
2677 || tx_control_kind(&stmt.sql).is_some())
2678 {
2679 let mut wal_stmt = stmt.stmt.clone();
2680 crate::wal_render_with_params(&mut wal_stmt, params);
2681 let canonical = format!("{wal_stmt}");
2682 ticket = self.wal_after_ok(&canonical, modified)?;
2683 }
2684 Ok((result, ticket))
2685 }
2686
2687 /// v7.16.0 — run a prepared SELECT with bound params and
2688 /// return rows as `Vec<Vec<Value>>`, matching `query()`
2689 /// shape. SELECTs are read-only so this never writes the
2690 /// WAL.
2691 ///
2692 /// # Errors
2693 /// Returns `Unsupported` if the prepared statement isn't a
2694 /// SELECT (use `execute_prepared` for DML/DDL).
2695 pub fn query_prepared(
2696 &mut self,
2697 stmt: &Statement,
2698 params: &[Value],
2699 ) -> Result<Vec<Vec<Value>>, EngineError> {
2700 match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
2701 QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
2702 QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
2703 "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2704 )),
2705 _ => Err(EngineError::Unsupported(
2706 "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
2707 )),
2708 }
2709 }
2710
2711 /// v7.18 — parse + plan a SQL string against a
2712 /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
2713 /// readonly fan-out path: no writer lock taken, no WAL write,
2714 /// no plan-cache mutation. Static-on-`Self` so callers can
2715 /// dispatch against a snapshot without an `&mut Database`
2716 /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
2717 /// is the load-bearing consumer.
2718 ///
2719 /// # Errors
2720 /// Propagates `EngineError::Parse` from the parser.
2721 pub fn prepare_on_snapshot(
2722 snapshot: &CatalogSnapshot,
2723 sql: &str,
2724 ) -> Result<Statement, EngineError> {
2725 let stmt =
2726 spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2727 Ok(Statement {
2728 stmt,
2729 sql: sql.to_string(),
2730 })
2731 }
2732
/// v7.18 — execute a prepared `Statement` against a
/// `CatalogSnapshot` with bound params. Mirror of
/// [`Database::execute_prepared`] on the readonly path:
/// writes / DDL hit `EngineError::WriteRequired`. No WAL
/// write, no writer lock, multiple snapshots can run
/// concurrently — the snapshot is immutable from prepare time.
///
/// Thin wrapper: delegates to
/// `Engine::execute_readonly_prepared_on_snapshot`, passing a clone
/// of the statement's AST.
///
/// # Errors
/// Surfaces `EngineError::WriteRequired` for non-readonly
/// statements; propagates other engine errors.
pub fn execute_prepared_on_snapshot(
    snapshot: &CatalogSnapshot,
    stmt: &Statement,
    params: &[Value],
) -> Result<QueryResult, EngineError> {
    spg_engine::Engine::execute_readonly_prepared_on_snapshot(
        snapshot,
        stmt.stmt.clone(),
        params,
    )
}
2754
2755 /// v7.28 (round-22) — deadline-bounded variant of
2756 /// [`Database::execute_prepared_on_snapshot`]. Returns
2757 /// `EngineError::Cancelled` once the budget elapses; the
2758 /// sqlx driver uses this to keep readonly-INLINE execution
2759 /// from monopolising the caller's async runtime (four slow
2760 /// inbox queries saturated mailrs's whole tokio pool) and
2761 /// re-runs over the blocking pool on timeout.
2762 ///
2763 /// # Errors
2764 /// `EngineError::Cancelled` on budget expiry; engine errors
2765 /// otherwise.
2766 pub fn execute_prepared_on_snapshot_with_budget(
2767 snapshot: &CatalogSnapshot,
2768 stmt: &Statement,
2769 params: &[Value],
2770 budget_us: u64,
2771 ) -> Result<QueryResult, EngineError> {
2772 fn mono_now_us() -> u64 {
2773 use std::time::{SystemTime, UNIX_EPOCH};
2774 // Monotonic enough for a per-call relative budget: the
2775 // engine only compares (now - start) against the budget
2776 // within one call.
2777 SystemTime::now()
2778 .duration_since(UNIX_EPOCH)
2779 .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
2780 .unwrap_or(0)
2781 }
2782 let deadline = mono_now_us().saturating_add(budget_us);
2783 let token = spg_engine::CancelToken::none().with_deadline(mono_now_us, deadline);
2784 spg_engine::Engine::execute_readonly_prepared_on_snapshot_with_cancel(
2785 snapshot,
2786 stmt.stmt.clone(),
2787 params,
2788 token,
2789 )
2790 }
2791
2792 /// v7.18 — describe a SQL string against a
2793 /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
2794 /// the readonly path. Pure function on the snapshot's
2795 /// catalog; safe to call from any thread.
2796 ///
2797 /// # Errors
2798 /// Propagates `EngineError::Parse` from the parser.
2799 pub fn describe_on_snapshot(
2800 snapshot: &CatalogSnapshot,
2801 sql: &str,
2802 ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2803 let stmt =
2804 spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2805 Ok(spg_engine::Engine::describe_prepared_on_snapshot(
2806 snapshot, &stmt,
2807 ))
2808 }
2809
2810 /// v7.21 (round-12 polish) — run a multi-statement SQL script
2811 /// with PG simple-query semantics: the statements execute in
2812 /// order inside ONE implicit transaction, so a mid-script error
2813 /// rolls back the whole script (PG wraps every simple-query
2814 /// message in an implicit transaction). Three exceptions, all
2815 /// PG-faithful:
2816 ///
2817 /// - a script that carries its OWN transaction control
2818 /// (BEGIN / COMMIT / …) runs statement-by-statement — the
2819 /// script owns its boundaries;
2820 /// - a script run while the caller already has a transaction
2821 /// open joins that transaction (no nested BEGIN), and the
2822 /// caller's COMMIT / ROLLBACK decides its fate;
2823 /// - a single-statement script is plain auto-commit.
2824 ///
2825 /// Returns one `QueryResult` per executed statement. This is the
2826 /// engine behind `sqlx::raw_sql` (mailrs feeds whole
2827 /// `init-schema.sql` files through it) and `spgctl import`.
2828 ///
2829 /// # Errors
2830 /// The first failing statement's error propagates after the
2831 /// implicit ROLLBACK; nothing from the script remains applied.
2832 pub fn execute_script(&mut self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
2833 let stmts = split_statements(sql);
2834 let script_owns_tx = stmts.iter().any(|s| tx_control_kind(s).is_some());
2835 let wrap = stmts.len() > 1 && !script_owns_tx && !self.engine.in_transaction();
2836 if !wrap {
2837 let mut out = Vec::with_capacity(stmts.len());
2838 for stmt in &stmts {
2839 out.push(self.execute_dump_statement(stmt)?);
2840 }
2841 return Ok(out);
2842 }
2843 self.execute("BEGIN")?;
2844 let mut out = Vec::with_capacity(stmts.len());
2845 for stmt in &stmts {
2846 match self.execute_dump_statement(stmt) {
2847 Ok(r) => out.push(r),
2848 Err(e) => {
2849 // Best-effort rollback; surface the script error.
2850 let _ = self.execute("ROLLBACK");
2851 return Err(e);
2852 }
2853 }
2854 }
2855 self.execute("COMMIT")?;
2856 Ok(out)
2857 }
2858
2859 /// v7.22 (round-13 T2) — execute one `split_statements` chunk,
2860 /// lowering a `COPY … FROM stdin;` block (statement + its data
2861 /// lines, as one chunk) to per-row INSERTs through the shared
2862 /// `spg_engine::copy` helpers. Default-format pg_dump emits
2863 /// COPY blocks, so the zero-change import promise needs this on
2864 /// the embed path; non-COPY statements pass straight through to
2865 /// [`Self::execute`]. Public so `spgctl import` can keep its
2866 /// per-statement error indexing while sharing the lowering.
2867 ///
2868 /// # Errors
2869 /// Engine errors propagate; for COPY the failing row's INSERT
2870 /// error carries the synthesized statement context.
2871 pub fn execute_dump_statement(&mut self, stmt: &str) -> Result<QueryResult, EngineError> {
2872 // Strip pg_dump's `-- Data for Name: …;` banner (it carries
2873 // semicolons of its own) before splitting head from data.
2874 let stmt_clean = strip_leading_sql_noise(stmt);
2875 let head_is_copy = stmt_clean
2876 .get(..4)
2877 .is_some_and(|p| p.eq_ignore_ascii_case("copy"));
2878 if head_is_copy
2879 && let Some((head, data)) = stmt_clean.split_once(';')
2880 && let Some(spec) = spg_engine::copy::parse_copy_from_stdin_head(head)
2881 {
2882 let mut affected: usize = 0;
2883 for line in data.lines() {
2884 // Empty fragments only occur at the chunk boundary
2885 // (the remainder of the COPY line right after `;`);
2886 // data rows are whole non-empty lines.
2887 let line = line.strip_suffix('\r').unwrap_or(line);
2888 if line.is_empty() {
2889 continue;
2890 }
2891 let values = spg_engine::copy::decode_copy_text_row(line);
2892 let insert = spg_engine::copy::build_copy_insert(
2893 &spec.table,
2894 spec.columns.as_deref(),
2895 &values,
2896 );
2897 match self.execute(&insert)? {
2898 QueryResult::CommandOk { affected: n, .. } => affected += n,
2899 _ => affected += 1,
2900 }
2901 }
2902 return Ok(QueryResult::CommandOk {
2903 affected,
2904 modified_catalog: false,
2905 });
2906 }
2907 self.execute(stmt)
2908 }
2909
2910 /// v7.2.0 — run `body` inside an implicit `BEGIN` /
2911 /// `COMMIT` pair. The body receives `&mut Database` so it
2912 /// can `execute()` / `query()` like any other code path;
2913 /// the only difference is that every write in the body
2914 /// lands inside one transaction, and a returned `Err` from
2915 /// the body triggers `ROLLBACK` before the error propagates.
2916 ///
2917 /// Nested calls are not supported — SPG's transaction
2918 /// model is single-writer with explicit `BEGIN` /
2919 /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
2920 /// would hit `EngineError::Unsupported("nested
2921 /// transaction")` at the inner `BEGIN`.
2922 pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
2923 where
2924 F: FnOnce(&mut Self) -> Result<R, EngineError>,
2925 {
2926 self.execute("BEGIN")?;
2927 match body(self) {
2928 Ok(value) => {
2929 self.execute("COMMIT")?;
2930 Ok(value)
2931 }
2932 Err(e) => {
2933 // Best-effort rollback. If ROLLBACK itself
2934 // fails (rare — the engine reports it via
2935 // `Unsupported` only when there's no active
2936 // TX, which can't happen here) we surface the
2937 // original body error, not the rollback error.
2938 let _ = self.execute("ROLLBACK");
2939 Err(e)
2940 }
2941 }
2942 }
2943}
2944
/// The default `Database` is whatever [`Database::open_in_memory`]
/// returns.
impl Default for Database {
    fn default() -> Self {
        Self::open_in_memory()
    }
}
2950
/// v7.7.5 — observability snapshot returned by
/// [`Database::metrics`]. Plain data, no allocations beyond
/// what the struct itself takes; cheap to construct and
/// cheap to serialise. All fields are `Copy`, and the struct is
/// `#[non_exhaustive]` so new counters can be added later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct EmbeddedMetrics {
    /// Total live row count across every user table (hot
    /// tier only — cold-tier rows live in segment files).
    /// Summed with saturating arithmetic.
    pub hot_rows: u64,
    /// Sum of `Table::hot_bytes` across every user table.
    /// Tracks against the freezer's `hot_tier_bytes` budget.
    /// Summed with saturating arithmetic.
    pub hot_bytes: u64,
    /// Number of cold-tier segments registered in the catalog.
    /// Includes tombstoned slots (segments retired by
    /// compaction whose disk file may still be on disk).
    pub cold_segments: u64,
    /// User-table count (excludes any future engine-managed
    /// internal tables).
    pub tables: u64,
    /// WAL size at last `execute()` / `checkpoint()`, read from the
    /// WAL's written length when the snapshot is taken. Zero
    /// when the database is in-memory.
    pub wal_bytes: u64,
    /// `true` when the database was opened with `open_path` —
    /// i.e. WAL + checkpoint persistence is active.
    pub persistent: bool,
}
2978
/// v7.2.1 — handle returned by `spawn_background_freezer`.
/// Dropping it signals the worker thread to wind down and joins it,
/// so a `Database` (or its shared `Arc<Mutex<Database>>`) can safely
/// drop after the handle does.
///
/// NOTE(review): `stop` (and therefore `Drop`) blocks until the
/// worker thread exits, and the worker locks the shared
/// `Mutex<Database>` on each tick — stopping the handle while
/// holding that lock looks like it could deadlock; confirm.
#[must_use = "the background freezer keeps running until this handle is dropped"]
#[derive(Debug)]
pub struct FreezerHandle {
    /// Set to `true` to ask the worker loop to exit.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the worker; `None` once it has been joined.
    join: Option<JoinHandle<()>>,
}

impl FreezerHandle {
    /// v7.2.1 — request the worker stop, then join it. Idempotent:
    /// the join handle is taken on the first call, so later calls
    /// only re-set the flag. Safe to call from `Drop` (which also
    /// calls it). A panic inside the worker is swallowed here.
    pub fn stop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        let Some(worker) = self.join.take() else {
            return;
        };
        let _ = worker.join();
    }
}

impl Drop for FreezerHandle {
    /// Stops and joins the worker so it never outlives the handle.
    fn drop(&mut self) {
        self.stop();
    }
}
3006
/// v7.2.1 — knobs for `Database::spawn_background_freezer`.
#[derive(Debug, Clone)]
pub struct FreezerOptions {
    /// Tick interval. Once per `tick` the worker checks the
    /// catalog's `hot_tier_bytes` and freezes if it is over budget.
    pub tick: Duration,
    /// Hot-tier byte budget. When the catalog-wide hot tier is
    /// larger than this, the next tick freezes the largest table's
    /// oldest `batch_rows` rows into a new cold segment.
    pub hot_tier_bytes: u64,
    /// Max rows the freezer demotes per fire.
    pub batch_rows: usize,
    /// v7.7.4 — auto-compact threshold. When the catalog holds more
    /// than this many cold segments across all tables right after a
    /// freeze, the freezer runs one compaction pass. Set to
    /// `usize::MAX` to disable auto-compact entirely; the default is
    /// `64`, matching the `spg-server` operating point for
    /// SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
    pub compact_when_segments_exceed: usize,
    /// v7.7.4 — target segment size for compaction merges, in bytes.
    /// Default 64 MiB, mirroring `spg-server`. Small segments below
    /// this size are merge candidates; segments at or above stay
    /// untouched.
    pub compact_target_bytes: u64,
}

impl Default for FreezerOptions {
    /// Matches the `spg-server` freezer's default operating point
    /// (SPG_HOT_TIER_BYTES = 4 GiB, batch 1000 rows, tick every 1 s)
    /// so embedded behaviour is predictable for operators familiar
    /// with the server.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;
        Self {
            tick: Duration::from_secs(1),
            hot_tier_bytes: 4 * GIB,
            batch_rows: 1000,
            compact_when_segments_exceed: 64,
            compact_target_bytes: 64 * MIB,
        }
    }
}
3048
3049impl Database {
3050 /// v7.7.4 — observe the catalog's cold-segment count.
3051 /// Useful for tests + dashboards that want to verify
3052 /// auto-compaction is firing.
3053 #[must_use]
3054 pub fn cold_segment_count(&self) -> usize {
3055 self.engine.catalog().cold_segment_count()
3056 }
3057
3058 /// v7.7.5 — observability snapshot. Returns a point-in-time
3059 /// view of the engine + persistence counters. Cheap (no
3060 /// locks beyond the existing `&self` borrow), so safe to
3061 /// call from a hot metrics-scrape path.
3062 ///
3063 /// Fields mirror the operational dashboard
3064 /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
3065 /// minus the network counters that don't apply to embedded.
3066 #[must_use]
3067 pub fn metrics(&self) -> EmbeddedMetrics {
3068 let cat = self.engine.catalog();
3069 let mut hot_rows: u64 = 0;
3070 let mut hot_bytes: u64 = 0;
3071 for name in cat.table_names() {
3072 if let Some(t) = cat.get(&name) {
3073 hot_rows = hot_rows.saturating_add(t.row_count() as u64);
3074 hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
3075 }
3076 }
3077 let (wal_bytes, persistent) = match &self.persistence {
3078 Some(p) => (p.wal.written_len(), true),
3079 None => (0, false),
3080 };
3081 EmbeddedMetrics {
3082 hot_rows,
3083 hot_bytes,
3084 cold_segments: cat.cold_segment_count() as u64,
3085 tables: cat.table_count() as u64,
3086 wal_bytes,
3087 persistent,
3088 }
3089 }
3090
3091 /// v7.2.1 — spawn a background thread that periodically
3092 /// runs `freeze_oldest_to_cold` when the catalog-wide hot
3093 /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
3094 /// pattern matches the v7.2 sharing story: callers wrap
3095 /// their `Database` in `Arc::new(Mutex::new(db))` once,
3096 /// then clone the Arc for the worker + for foreground
3097 /// access. Return value is a handle whose `Drop` joins the
3098 /// worker.
3099 ///
3100 /// Picks the freeze target the same way `spg-server`'s
3101 /// freezer does: largest-`hot_bytes` user table with at
3102 /// least one BTree integer-PK index. Tables without a
3103 /// freezable index are skipped silently.
3104 pub fn spawn_background_freezer(
3105 db: Arc<Mutex<Database>>,
3106 opts: FreezerOptions,
3107 ) -> FreezerHandle {
3108 let shutdown = Arc::new(AtomicBool::new(false));
3109 let shutdown_for_thread = Arc::clone(&shutdown);
3110 let join = thread::Builder::new()
3111 .name("spg-embedded-freezer".into())
3112 .spawn(move || {
3113 background_freezer_loop(db, opts, shutdown_for_thread);
3114 })
3115 .expect("spawn background freezer thread");
3116 FreezerHandle {
3117 shutdown,
3118 join: Some(join),
3119 }
3120 }
3121}
3122
3123/// v7.2.1 — the freezer's main loop, factored out so the
3124/// `Database::spawn_background_freezer` path stays readable.
3125fn background_freezer_loop(
3126 db: Arc<Mutex<Database>>,
3127 opts: FreezerOptions,
3128 shutdown: Arc<AtomicBool>,
3129) {
3130 // Sleep in short slices so a shutdown request resolves
3131 // quickly (vs sleeping the full tick).
3132 let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
3133 let mut last_tick = std::time::Instant::now();
3134 loop {
3135 if shutdown.load(Ordering::Acquire) {
3136 return;
3137 }
3138 thread::sleep(slice);
3139 if last_tick.elapsed() < opts.tick {
3140 continue;
3141 }
3142 last_tick = std::time::Instant::now();
3143 let Ok(mut guard) = db.lock() else {
3144 return;
3145 };
3146 if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
3147 continue;
3148 }
3149 let Some((table, index)) = pick_freeze_target(&guard) else {
3150 continue;
3151 };
3152 let row_count = guard
3153 .engine
3154 .catalog()
3155 .get(&table)
3156 .map_or(0, spg_storage::Table::row_count);
3157 let to_freeze = opts.batch_rows.min(row_count);
3158 if to_freeze == 0 {
3159 continue;
3160 }
3161 if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
3162 eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
3163 continue;
3164 }
3165 // v7.7.4 — auto-compact. If the catalog now carries
3166 // more cold segments than the configured threshold,
3167 // run a single compaction pass. Failures are reported
3168 // but don't kill the loop; the next tick will retry.
3169 let count = guard.engine.catalog().cold_segment_count();
3170 if count > opts.compact_when_segments_exceed {
3171 if let Err(e) = guard
3172 .engine
3173 .compact_cold_segments_with_target(opts.compact_target_bytes)
3174 {
3175 eprintln!(
3176 "spg-embedded: background compact failed (segments={count}, \
3177 threshold={}): {e:?}",
3178 opts.compact_when_segments_exceed,
3179 );
3180 }
3181 }
3182 }
3183}
3184
3185/// v7.2.1 — pick the highest-`hot_bytes` user table with a
3186/// BTree integer-PK index. Returns `(table, index_name)` so the
3187/// caller can dispatch through `freeze_oldest_to_cold`.
3188fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
3189 let cat = db.engine.catalog();
3190 let mut best: Option<(String, String, u64)> = None;
3191 for name in cat.table_names() {
3192 let Some(t) = cat.get(&name) else { continue };
3193 if t.row_count() == 0 {
3194 continue;
3195 }
3196 let cols = &t.schema().columns;
3197 let Some(idx) = t.indices().iter().find(|i| {
3198 matches!(i.kind, spg_storage::IndexKind::BTree(_))
3199 && i.column_position < cols.len()
3200 && matches!(
3201 cols[i.column_position].ty,
3202 spg_storage::DataType::SmallInt
3203 | spg_storage::DataType::Int
3204 | spg_storage::DataType::BigInt
3205 )
3206 }) else {
3207 continue;
3208 };
3209 let hot = t.hot_bytes();
3210 match best {
3211 None => best = Some((name, idx.name.clone(), hot)),
3212 Some((_, _, best_hot)) if hot > best_hot => {
3213 best = Some((name, idx.name.clone(), hot));
3214 }
3215 _ => {}
3216 }
3217 }
3218 best.map(|(t, i, _)| (t, i))
3219}
3220
3221/// v7.7.6 — replay the first `to_seq` records of the WAL at
3222/// `wal_path` into a fresh engine and write the resulting
3223/// catalog snapshot to `out_db_path`. Same semantics as
3224/// `spg revert --wal … --to-seq N --out …` from the CLI:
3225///
3226/// - `to_seq == 0` → snapshot is the empty catalog
3227/// - WAL records beyond `to_seq` are not applied
3228/// - durability-checkpoint markers (v3 type 0x02) are
3229/// consumed without counting against the budget
3230///
3231/// Returns the number of statements actually applied
3232/// (`≤ to_seq`). The output snapshot is byte-identical to
3233/// what `Database::open_path(out_db_path)` would consume on
3234/// a subsequent open.
3235///
3236/// This is the "rewind" operator for an embedded database
3237/// that has been corrupted by a poison statement or a
3238/// half-applied migration. Pair with `cold_segment_paths`
3239/// preservation if your cold-tier files are still on disk.
3240///
3241/// # Errors
3242///
3243/// - `wal_path` unreadable or truncated mid-record
3244/// - WAL record decodes to invalid UTF-8 SQL
3245/// - WAL record's SQL is rejected by the engine
3246/// - `out_db_path` unwritable
3247pub fn revert_wal_to_seq(
3248 wal_path: impl AsRef<Path>,
3249 to_seq: u64,
3250 out_db_path: impl AsRef<Path>,
3251) -> Result<u64, EngineError> {
3252 // v7.19 — accept either a single-file legacy WAL (v7.18 and
3253 // earlier layout) or a chunked WAL directory (v7.19+). For a
3254 // directory, concatenate every `.wal` chunk in sorted order
3255 // — the same order open_path replays them in — so revert
3256 // sees the full record stream.
3257 let path = wal_path.as_ref();
3258 let wal_bytes = if path.is_dir() {
3259 let mut combined = Vec::new();
3260 let chunks = sorted_wal_chunks(path).map_err(io_err)?;
3261 for chunk in chunks {
3262 let bytes = std::fs::read(&chunk).map_err(io_err)?;
3263 combined.extend_from_slice(&bytes);
3264 }
3265 combined
3266 } else {
3267 std::fs::read(path).map_err(io_err)?
3268 };
3269 let mut engine = Engine::new();
3270 let mut applied = 0u64;
3271 let mut cur = 0usize;
3272 while cur < wal_bytes.len() && applied < to_seq {
3273 let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
3274 cur += total;
3275 if sql_bytes.is_empty() {
3276 continue;
3277 }
3278 let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
3279 EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
3280 "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
3281 )))
3282 })?;
3283 // v7.21 — tx-commit records carry a multi-statement script;
3284 // split_statements is a no-op for single-statement records.
3285 for stmt in split_statements(sql) {
3286 engine.execute(stmt)?;
3287 }
3288 applied += 1;
3289 }
3290 let snapshot = engine.snapshot();
3291 std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
3292 Ok(applied)
3293}
3294
3295/// v7.7.6 — decode one WAL record from a byte tail. Returns
3296/// `(sql_bytes, header_plus_payload_len)`. Handles the three
3297/// on-disk formats (v1 / v2 / v3) the same way the CLI
3298/// `decode_one_record` and the engine's `replay_wal_bytes`
3299/// do. CRCs are not re-validated; the caller's intent is
3300/// "apply", not "validate".
3301fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
3302 if tail.len() < 4 {
3303 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3304 format!("WAL truncated record: {} < 4 header bytes", tail.len()),
3305 )));
3306 }
3307 let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
3308 let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
3309 let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
3310 let len_mask = if is_v3 {
3311 !(WAL_V2_SENTINEL | WAL_V3_FLAG)
3312 } else {
3313 !WAL_V2_SENTINEL
3314 };
3315 let rec_len = (raw_len & len_mask) as usize;
3316 let header_len = if is_v3 {
3317 9
3318 } else if is_v2 {
3319 8
3320 } else {
3321 4
3322 };
3323 if tail.len() < header_len + rec_len {
3324 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3325 format!(
3326 "WAL truncated record: header+payload {} > available {}",
3327 header_len + rec_len,
3328 tail.len()
3329 ),
3330 )));
3331 }
3332 if is_v3 {
3333 let type_byte = tail[8];
3334 // v3 type 0x01 = auto_commit_sql (payload = SQL).
3335 // v3 type 0x02 = durability marker (no SQL to apply).
3336 // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
3337 // prefix between type and SQL — strip
3338 // the prefix so the caller still sees raw
3339 // SQL bytes.
3340 // Anything else is unknown.
3341 if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
3342 let payload = &tail[header_len..header_len + rec_len];
3343 return Ok((payload.to_vec(), header_len + rec_len));
3344 }
3345 if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL || type_byte == WAL_V4_TYPE_TX_COMMIT_SQL {
3346 let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
3347 if tail.len() < v4_total {
3348 return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
3349 format!(
3350 "WAL truncated v4 record: header+payload {v4_total} > available {}",
3351 tail.len()
3352 ),
3353 )));
3354 }
3355 let sql_start = header_len + WAL_V4_EXTRA_HEADER;
3356 let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
3357 return Ok((sql_bytes, v4_total));
3358 }
3359 // Caller treats empty payload as a skip-marker.
3360 return Ok((Vec::new(), header_len + rec_len));
3361 }
3362 let payload = &tail[header_len..header_len + rec_len];
3363 Ok((payload.to_vec(), header_len + rec_len))
3364}
3365
3366impl Drop for Database {
3367 fn drop(&mut self) {
3368 // v7.1 — best-effort final checkpoint when a persistent
3369 // Database leaves scope. Failures here go to stderr so
3370 // operators see them, but Drop can't propagate errors —
3371 // the WAL itself is already durable, so a checkpoint
3372 // miss only means the next boot replays a few more
3373 // records than strictly necessary.
3374 if self.persistence.is_some() {
3375 if let Err(e) = self.checkpoint() {
3376 eprintln!(
3377 "spg-embedded: final checkpoint on Drop failed: {e:?} \
3378 (WAL is intact; next open_path will replay)"
3379 );
3380 }
3381 }
3382 // v7.19 P3 / v7.20 — signal the retention + flusher
3383 // threads to exit, then wait for them. Done BEFORE the
3384 // lock release so background threads don't outlive the
3385 // database handle. The flusher drains the pending batch
3386 // on its way out (final flush_now in the thread body),
3387 // so `SPG_SYNCHRONOUS_COMMIT=off` never loses confirmed
3388 // commits across a clean shutdown.
3389 if let Some(ctx) = self.persistence.as_mut() {
3390 if let Some(shutdown) = ctx.retention_shutdown.take() {
3391 shutdown.store(true, Ordering::SeqCst);
3392 }
3393 if let Some(handle) = ctx.retention_thread.take() {
3394 let _ = handle.join();
3395 }
3396 if let Some(shutdown) = ctx.flusher_shutdown.take() {
3397 shutdown.store(true, Ordering::SeqCst);
3398 }
3399 if let Some(handle) = ctx.flusher_thread.take() {
3400 let _ = handle.join();
3401 }
3402 // CoW-2 (v7.34) — final checkpoint above left the worker
3403 // idle; explicitly drop it here so its shutdown signal +
3404 // thread join happens with a deterministic ordering (before
3405 // the lock release / persistence drop), not whenever Rust
3406 // happens to drop the PersistenceCtx fields.
3407 ctx.checkpoint_worker = None;
3408 }
3409 // v7.17.0 Phase 6.2 — release the cross-process lock on
3410 // clean shutdown. Failure is logged but never panics;
3411 // the operator can clear a stale lock via
3412 // `Database::force_unlock` if a crash kept the
3413 // directory around.
3414 if let Some(ctx) = &self.persistence
3415 && ctx.lock_path.exists()
3416 {
3417 // remove_dir_all: the lock dir carries the owner-pid
3418 // record since round-12.
3419 if let Err(e) = std::fs::remove_dir_all(&ctx.lock_path) {
3420 eprintln!(
3421 "spg-embedded: lock release on Drop failed for {}: {e:?}",
3422 ctx.lock_path.display()
3423 );
3424 }
3425 }
3426 }
3427}
3428
3429impl Database {
3430 /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
3431 /// Use when a previous process crashed mid-session and
3432 /// left `<db_path>.lock` behind. Operators should confirm
3433 /// no other process is currently using the database before
3434 /// calling this — SPG cannot fingerprint stale-vs-live
3435 /// without a libc dep, which would violate spg-embedded's
3436 /// zero-deps charter.
3437 pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
3438 let lock_path = {
3439 let mut p = db_path.as_ref().to_path_buf();
3440 let name = p
3441 .file_name()
3442 .map(|n| {
3443 let mut s = n.to_os_string();
3444 s.push(".lock");
3445 s
3446 })
3447 .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
3448 p.set_file_name(name);
3449 p
3450 };
3451 if !lock_path.exists() {
3452 return Ok(());
3453 }
3454 std::fs::remove_dir_all(&lock_path).map_err(io_err)
3455 }
3456}
3457
3458/// v7.1 — turn a `std::io::Error` into the workspace's
3459/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
3460/// the closest existing variant — io failures during boot or
3461/// during a WAL append surface as a storage-layer fault to
3462/// callers, which keeps the public error enum unchanged.
3463fn io_err(e: std::io::Error) -> EngineError {
3464 EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
3465}
3466
3467/// v7.2.2 — `Database` is `Send`, so the recommended sharing
3468/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
3469///
3470/// ```no_run
3471/// use std::sync::{Arc, Mutex};
3472/// use spg_embedded::Database;
3473///
3474/// let db = Database::open_in_memory();
3475/// let shared = Arc::new(Mutex::new(db));
3476/// let shared_for_worker = Arc::clone(&shared);
3477/// std::thread::spawn(move || {
3478/// let mut guard = shared_for_worker.lock().unwrap();
3479/// guard.execute("INSERT INTO t VALUES (1)").unwrap();
3480/// });
3481/// ```
3482///
3483/// Internal `RwLock`-wrapped state — letting many threads
3484/// hold concurrent `&Database` for `SELECT` without contending
3485/// — is parked as STABILITY § "Out of v7.2"; multi-reader
3486/// embedded throughput needs a planner-side change to release
3487/// the engine read lock between scans, which is the v7.x
3488/// "Choice A" line of work already documented in v6.9.1's
3489/// carve-out.
3490#[allow(dead_code)]
3491fn _database_is_send() {
3492 fn assert_send<T: Send>() {}
3493 assert_send::<Database>();
3494}
3495
3496/// v6.10.3 — trait that maps a row's columns onto a user
3497/// struct's fields. v7.3.0 ships the [`spg_row!`] declarative
3498/// macro that generates `impl FromSpgRow for YourStruct` from
3499/// a struct definition (no proc-macro, no syn/quote/
3500/// proc-macro2 deps — the workspace's "0 external deps"
3501/// policy holds).
3502///
3503/// Implementors map a row's columns onto a user struct's
3504/// fields. Errors surface as `EngineError::Unsupported` so the
3505/// caller's error type stays uniform.
3506pub trait FromSpgRow: Sized {
3507 /// Decode one query result row into `Self`. Called once per
3508 /// row by [`Database::query_typed`]. The slice length equals
3509 /// the number of columns in the SELECT projection.
3510 fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
3511}
3512
/// v7.3.0 — declarative macro that defines a struct and generates its
/// `FromSpgRow` impl. Avoids proc-macro deps (syn/quote/proc-macro2)
/// so the workspace's 0-deps policy holds; the trade-off vs
/// `#[derive(SpgRow)]` is that the macro takes the entire struct
/// definition (fields + types) as input rather than annotating an
/// existing struct.
///
/// ```no_run
/// use spg_embedded::{Database, spg_row, FromSpgRow};
///
/// spg_row! {
///     pub struct User {
///         pub id: i32,
///         pub name: String,
///     }
/// }
///
/// let mut db = Database::open_in_memory();
/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
/// ```
///
/// The generated struct also derives `Debug` and `Clone`. Fields are
/// filled positionally: the first field takes the first column, and so
/// on. A row with fewer columns than fields, or a column whose value
/// does not decode into the field type, yields
/// `EngineError::Unsupported` naming the struct and the field; columns
/// beyond the last field are ignored.
///
/// The comma after the last field is optional, as in an ordinary
/// struct definition.
///
/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
/// `Option<T>` of any of the above.
#[macro_export]
macro_rules! spg_row {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            $(
                $(#[$fmeta:meta])*
                $fvis:vis $field:ident : $ty:ty
            ),* $(,)?
        }
    ) => {
        $(#[$meta])*
        #[derive(Debug, Clone)]
        $vis struct $name {
            $(
                $(#[$fmeta])*
                $fvis $field : $ty,
            )*
        }

        impl $crate::FromSpgRow for $name {
            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
                // Columns are consumed in field-declaration order.
                let mut __spg_row_iter = row.iter();
                $(
                    let $field: $ty = {
                        let v = __spg_row_iter
                            .next()
                            .ok_or_else(|| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: missing column for field `{}`",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field)
                                )
                            ))?;
                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
                            .map_err(|e| $crate::EngineError::Unsupported(
                                ::std::format!(
                                    "spg_row! {}: column `{}`: {}",
                                    ::core::stringify!($name),
                                    ::core::stringify!($field),
                                    e
                                )
                            ))?
                    };
                )*
                // Fully qualified so a caller-side item named `Ok`
                // cannot change what the expansion means.
                ::core::result::Result::Ok(Self { $($field,)* })
            }
        }
    };
}
3589
3590/// v7.3.0 — per-column decoder used by `spg_row!`. Surface
3591/// covers every numeric / text / bytes / bool variant in
3592/// `Value`, plus `Option<T>` for nullable columns.
3593pub trait FromSpgValue: Sized {
3594 /// Decode one cell into `Self`. The returned `&'static str`
3595 /// is a short diagnostic for type mismatches (e.g. `"expected
3596 /// integer, got TEXT"`); callers wrap it into their own
3597 /// error type.
3598 fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
3599}
3600
3601macro_rules! impl_from_value_int {
3602 ($($t:ty),* $(,)?) => {
3603 $(
3604 impl FromSpgValue for $t {
3605 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3606 match v {
3607 Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
3608 Value::Int(n) => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
3609 Value::BigInt(n) => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
3610 Value::Null => Err("NULL in non-Option int column"),
3611 _ => Err("non-integer value in int column"),
3612 }
3613 }
3614 }
3615 )*
3616 };
3617}
3618impl_from_value_int!(i16, i32, i64);
3619
3620impl FromSpgValue for f32 {
3621 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3622 match v {
3623 Value::Float(f) => Ok(*f as f32),
3624 Value::Null => Err("NULL in non-Option float column"),
3625 _ => Err("non-float value in float column"),
3626 }
3627 }
3628}
3629
3630impl FromSpgValue for f64 {
3631 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3632 match v {
3633 Value::Float(f) => Ok(*f),
3634 Value::Null => Err("NULL in non-Option float column"),
3635 _ => Err("non-float value in float column"),
3636 }
3637 }
3638}
3639
3640impl FromSpgValue for bool {
3641 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3642 match v {
3643 Value::Bool(b) => Ok(*b),
3644 Value::Null => Err("NULL in non-Option bool column"),
3645 _ => Err("non-bool value in bool column"),
3646 }
3647 }
3648}
3649
3650impl FromSpgValue for String {
3651 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3652 match v {
3653 Value::Text(s) => Ok(s.clone()),
3654 Value::Null => Err("NULL in non-Option text column"),
3655 _ => Err("non-text value in String column"),
3656 }
3657 }
3658}
3659
3660impl FromSpgValue for Vec<f32> {
3661 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3662 match v {
3663 Value::Vector(xs) => Ok(xs.clone()),
3664 Value::Null => Err("NULL in non-Option vector column"),
3665 _ => Err("non-vector value in Vec<f32> column"),
3666 }
3667 }
3668}
3669
3670impl<T: FromSpgValue> FromSpgValue for Option<T> {
3671 fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
3672 match v {
3673 Value::Null => Ok(None),
3674 other => T::from_spg_value(other).map(Some),
3675 }
3676 }
3677}
3678
/// v7.27 (mailrs round-21 B) — the calling process's environment
/// identity: `(hostname, boot-or-container id)`.
///
/// Background: `acquire_path_lock` takes the cross-process lock with
/// an atomic `mkdir` and records the owner pid inside it, so that a
/// lock left behind by a killed process (docker SIGKILL, crash) can be
/// reclaimed automatically instead of being deleted by hand (mailrs
/// embed round-12: a restarted server came up in degraded mode because
/// the previous instance's lock survived). A pid is only meaningful
/// inside the PID namespace that recorded it, though; mailrs's recovery
/// window saw "locked by pid 1" from a STOPPED container because the
/// prober's pid 1 (its own init) was alive. The identity returned here
/// is stored next to the pid: when the lock's identity differs from
/// ours, liveness is UNDECIDABLE and the lock is refused honestly
/// instead of guessing in either direction.
///
/// Both components are trimmed and are empty strings when they cannot
/// be determined:
/// - hostname: output of the `hostname` command;
/// - boot id: `/proc/sys/kernel/random/boot_id`, falling back to
///   `sysctl -n kern.bootsessionuuid` when that file cannot be read.
fn host_identity() -> (String, String) {
    // Shared by both command probes: stdout, lossily decoded, trimmed.
    fn trimmed_stdout(out: std::process::Output) -> String {
        String::from_utf8_lossy(&out.stdout).trim().to_string()
    }

    let hostname = match std::process::Command::new("hostname").output() {
        Ok(out) => trimmed_stdout(out),
        Err(_) => String::new(),
    };

    // Linux boot id; containers share the host kernel's boot id, so
    // hostname (= container id by default) is the namespace
    // discriminator and boot id catches host reboots / pid reuse.
    let boot_id = match std::fs::read_to_string("/proc/sys/kernel/random/boot_id") {
        Ok(raw) => raw.trim().to_string(),
        Err(_) => std::process::Command::new("sysctl")
            .args(["-n", "kern.bootsessionuuid"])
            .output()
            .map(trimmed_stdout)
            .unwrap_or_default(),
    };

    (hostname, boot_id)
}
3713
/// v7.34 (crash-recovery P0 #2) — start-time of process `pid`, used to
/// tell a reused pid apart from a genuinely-held lock.
///
/// In a container the holder is always pid 1; `docker start` reuses the
/// container so the NEW process is pid 1 too, on the same host+boot id
/// — a bare `pid_alive(1)` probe (`ps -p 1` always succeeds) reads a
/// dead owner's lock as live and the engine self-deadlocks on its own
/// catalog. The `(pid, start-time)` pair is unique per live process
/// within a boot: a reused pid carries a LATER start-time, so a
/// mismatch means the recorded owner is gone.
///
/// Linux: field 22 of `/proc/<pid>/stat` (clock ticks since boot),
/// returned as the raw text token. `comm` (field 2) is parenthesised
/// and may contain spaces, so fields are counted after the LAST `)`.
/// Returns `None` when the file cannot be read or is too short.
/// Pure std — no libc.
#[cfg(target_os = "linux")]
fn process_start_time(pid: u32) -> Option<String> {
    let stat_path = format!("/proc/{pid}/stat");
    let contents = std::fs::read_to_string(stat_path).ok()?;
    let (_, after_comm) = contents.rsplit_once(')')?;
    // After comm the tokens are state (field 3), ppid (field 4), …;
    // starttime (field 22) is therefore the 20th token, index 19.
    let start_ticks = after_comm.split_whitespace().nth(19)?;
    Some(start_ticks.to_owned())
}

/// Non-Linux targets have no start-time source here: always `None`, so
/// the liveness check falls back to the pid-alive answer.
#[cfg(not(target_os = "linux"))]
fn process_start_time(_pid: u32) -> Option<String> {
    None
}
3739
3740fn acquire_path_lock(lock_path: &Path) -> Result<(), EngineError> {
3741 for attempt in 0..2 {
3742 match std::fs::create_dir(lock_path) {
3743 Ok(()) => {
3744 // Best-effort owner record; liveness probing treats a
3745 // missing pid file as stale (crash between mkdir and
3746 // write is indistinguishable from an ancient lock).
3747 // v7.27 — lines 2+3 record the owner's environment
3748 // identity (hostname, boot id) so a prober in a
3749 // different namespace refuses instead of misreading
3750 // the pid. v7.34 — line 4 records the owner's process
3751 // start-time so a reused pid (container pid-1 restart)
3752 // is distinguishable from a live holder.
3753 let (host, boot) = host_identity();
3754 let start = process_start_time(std::process::id()).unwrap_or_default();
3755 let _ = std::fs::write(
3756 lock_path.join("pid"),
3757 format!("{}\n{host}\n{boot}\n{start}\n", std::process::id()),
3758 );
3759 return Ok(());
3760 }
3761 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists && attempt == 0 => {
3762 let record = std::fs::read_to_string(lock_path.join("pid")).unwrap_or_default();
3763 let mut lines = record.lines();
3764 let owner = lines.next().and_then(|s| s.trim().parse::<u32>().ok());
3765 let lock_host = lines.next().unwrap_or("").trim().to_string();
3766 let lock_boot = lines.next().unwrap_or("").trim().to_string();
3767 let lock_start = lines.next().unwrap_or("").trim().to_string();
3768 // v7.27 — identity check BEFORE the pid probe. A pid
3769 // recorded in another namespace is undecidable both
3770 // ways (a stale lock can look held, a held lock can
3771 // look stale — the unsafe direction). Old-format
3772 // locks (pid only) keep the legacy same-host
3773 // assumption.
3774 if !lock_host.is_empty() {
3775 let (my_host, my_boot) = host_identity();
3776 let same_env = lock_host == my_host
3777 && (lock_boot.is_empty() || my_boot.is_empty() || lock_boot == my_boot);
3778 if !same_env {
3779 return Err(EngineError::Unsupported(format!(
3780 "database lock {} was taken in a different host/container \
3781 (owner: pid {} on {:?}; we are {:?}) — liveness is \
3782 undecidable from here. If you are sure the owner is gone, \
3783 call Database::force_unlock() or `spg import --force-unlock`.",
3784 lock_path.display(),
3785 owner.unwrap_or(0),
3786 lock_host,
3787 my_host
3788 )));
3789 }
3790 }
3791 // v7.34 (crash-recovery P0 #2) — pid-reuse-safe liveness.
3792 // A bare `pid_alive` self-deadlocks in a container: the
3793 // dead owner was pid 1, `docker start` reuses the container
3794 // so the prober is pid 1 too, and `ps -p 1` always succeeds.
3795 // The recorded (pid, start-time) pair settles it — the
3796 // owner is alive ONLY if its pid is alive AND its CURRENT
3797 // start-time still matches the recorded one:
3798 // - container restart: pid 1 alive, but the new pid-1's
3799 // start-time differs from the dead owner's → stale.
3800 // - genuine double-open (same live process): start-time
3801 // matches (it wrote it) → held — correctly refused, so a
3802 // second writer can't steal a live lock.
3803 // An empty/uncomparable start-time (old-format lock or a
3804 // non-Linux owner with no /proc) falls back to the
3805 // pid-alive answer (the pre-v7.34 behaviour).
3806 let owner_alive = owner.is_some_and(|p| {
3807 pid_alive(p)
3808 && match process_start_time(p) {
3809 Some(now) if !lock_start.is_empty() => now == lock_start,
3810 _ => true,
3811 }
3812 });
3813 if owner_alive {
3814 return Err(EngineError::Unsupported(format!(
3815 "database is locked by another process (pid {}): {}; \
3816 stop that process first, or call Database::force_unlock()",
3817 owner.unwrap_or(0),
3818 lock_path.display()
3819 )));
3820 }
3821 // Stale — owner pid dead, reused, or unrecorded. Reclaim.
3822 eprintln!(
3823 "spg-embedded: reclaiming stale lock {} (owner pid {:?} not a live holder)",
3824 lock_path.display(),
3825 owner
3826 );
3827 std::fs::remove_dir_all(lock_path).map_err(io_err)?;
3828 // Loop retries the create_dir; a concurrent reclaimer
3829 // winning the race surfaces as AlreadyExists on
3830 // attempt 1 below.
3831 }
3832 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
3833 return Err(EngineError::Unsupported(format!(
3834 "database is locked by another process: {}; \
3835 stop that process first, or call Database::force_unlock()",
3836 lock_path.display()
3837 )));
3838 }
3839 Err(e) => return Err(io_err(e)),
3840 }
3841 }
3842 unreachable!("acquire_path_lock loop covers both attempts")
3843}
3844
/// Probe whether `pid` is a live process.
///
/// Unix: runs the system `ps -p <pid>` with its output discarded and
/// reports its exit status (std-only — no libc dependency). `ps -p`
/// exits 0 for ANY live pid regardless of owner; `kill -0` was
/// rejected here because it fails with EPERM on another user's live
/// process, which would read as "dead" and reclaim a held lock.
///
/// When the probe itself cannot run (no `ps` binary, exec error) the
/// answer is conservatively "alive", so locks are never auto-reclaimed
/// on doubt.
#[cfg(unix)]
fn pid_alive(pid: u32) -> bool {
    use std::process::{Command, Stdio};

    Command::new("ps")
        .arg("-p")
        .arg(pid.to_string())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .map_or(true, |status| status.success())
}

/// Non-unix targets have no probe: always report "alive", the same
/// conservative answer the unix probe gives on failure.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    true
}
3871
/// v7.22 — update the stream-tracked string dialect from one statement
/// chunk; see `split_statements`' `mysql_escapes` tracking.
///
/// A chunk mentioning `sql_mode` (case-insensitive) switches
/// `mysql_escapes` on. Otherwise a chunk mentioning
/// `standard_conforming_strings` sets it to whether the chunk also
/// contains `off`. Any other chunk leaves the flag untouched.
///
/// Only short chunks are inspected (the signal statements are
/// one-liners; COPY data blocks are skipped by the length guard).
fn note_dialect_signals(chunk: &str, mysql_escapes: &mut bool) {
    if chunk.len() > 4096 {
        return;
    }
    let lower = chunk.to_ascii_lowercase();
    if lower.contains("sql_mode") {
        *mysql_escapes = true;
    } else if lower.contains("standard_conforming_strings") {
        *mysql_escapes = lower.contains("off");
    }
}

/// Strip leading whitespace, `--` line comments and NON-conditional
/// block comments from a chunk so statement-head checks (COPY
/// detection most notably) see the first real token. pg_dump
/// prefixes every data block with a `-- Data for Name: …;` banner —
/// which itself contains semicolons, so head checks must run on the
/// stripped text. MySQL executable conditional comments (`/*!`) are
/// content and stay.
///
/// Block comments are balanced the same way `split_statements` scans
/// them: nested `/* … */` pairs are skipped as a whole, and the search
/// for the terminator starts after the two-byte opener so `/*/` is not
/// mistaken for a complete comment. An unterminated block comment
/// swallows the rest of the chunk and yields `""`.
fn strip_leading_sql_noise(mut s: &str) -> &str {
    /// Byte length of the block comment at the start of `text` (which
    /// must begin with `/*`), or `None` when it is never closed.
    fn block_comment_len(text: &str) -> Option<usize> {
        let bytes = text.as_bytes();
        let mut depth = 1usize;
        let mut i = 2; // first byte after the opening `/*`
        while i + 1 < bytes.len() {
            match (bytes[i], bytes[i + 1]) {
                (b'/', b'*') => {
                    depth += 1;
                    i += 2;
                }
                (b'*', b'/') => {
                    depth -= 1;
                    i += 2;
                    if depth == 0 {
                        // `i` sits just past an ASCII `/`, so it is a
                        // valid char boundary for slicing.
                        return Some(i);
                    }
                }
                _ => i += 1,
            }
        }
        None
    }

    loop {
        let t = s.trim_start();
        if let Some(rest) = t.strip_prefix("--") {
            // Drop through end-of-line; a comment on the last line
            // leaves nothing.
            s = rest.split_once('\n').map_or("", |(_, r)| r);
            continue;
        }
        if t.starts_with("/*") && !t.starts_with("/*!") {
            match block_comment_len(t) {
                Some(end) => {
                    s = &t[end..];
                    continue;
                }
                None => return "",
            }
        }
        return t;
    }
}
3913
/// Split a multi-statement SQL script into individual statements on
/// top-level `;`, honouring single-quoted strings (with `''`
/// escapes), double-quoted identifiers, dollar-quoted bodies
/// (`$tag$ … $tag$`), line comments (`--`) and MySQL executable
/// conditional comments (`/*!… */` stay statement content; plain
/// nested block comments don't). Chunks that contain no statement
/// content (whitespace / comments only) are dropped. PG's
/// simple-query protocol does this server-side; the embed path owns
/// it here.
///
/// Each returned slice borrows from `sql`, excludes the terminating
/// `;`, and is not trimmed (whitespace and comments that precede the
/// statement inside its chunk are kept). A trailing statement without
/// a `;` is returned as well.
///
/// v7.22 (mailrs round-13 gap 1) — psql meta-command lines are
/// dropped for client parity: a line whose first non-whitespace
/// byte is `\` BETWEEN statements (PG 18's pg_dump wraps scripts in
/// `\restrict` / `\unrestrict`) never reaches the parser, the same
/// way psql consumes `\`-lines client-side and never sends them. A
/// mid-statement backslash stays an ordinary byte — pg_dump only
/// emits meta-commands between statements.
pub fn split_statements(sql: &str) -> Vec<&str> {
    let bytes = sql.as_bytes();
    let mut stmts = Vec::new();
    // Byte offset where the current chunk begins.
    let mut start = 0usize;
    // Whether the current chunk holds anything besides whitespace and
    // plain comments.
    let mut has_content = false;
    // v7.22 (round-13 T3) — stream-tracked string dialect, mirroring
    // the engine's session flag: a statement mentioning `sql_mode`
    // (mysqldump preamble, often inside `/*!…*/`) switches plain
    // strings to backslash-escape scanning;
    // `standard_conforming_strings` (pg_dump preamble) switches
    // back. Without this the scanner ends a MySQL `'…\'…'` literal
    // early and splits inside data.
    let mut mysql_escapes = false;
    let mut i = 0usize;
    while i < bytes.len() {
        match bytes[i] {
            b'\\' if !has_content => {
                // Start-of-statement `\` = psql meta-command line.
                // Consume through end-of-line; restart the chunk
                // after it so the line never lands in the output.
                while i < bytes.len() && bytes[i] != b'\n' {
                    i += 1;
                }
                start = if i < bytes.len() { i + 1 } else { i };
            }
            b'\'' => {
                has_content = true;
                // PG escape-string form `E'...'` honours backslash
                // escapes (`E'a\';b'` is ONE literal) — detect via
                // the immediately-preceding standalone E/e. MySQL
                // dialect sessions treat EVERY plain string that way.
                let escape_string = mysql_escapes
                    || (i >= 1
                        && matches!(bytes[i - 1], b'e' | b'E')
                        && !(i >= 2
                            && (bytes[i - 2].is_ascii_alphanumeric() || bytes[i - 2] == b'_')));
                i += 1;
                // Scan to the closing quote; `i` is left ON it (or at /
                // past the end when the literal is unterminated).
                while i < bytes.len() {
                    if escape_string && bytes[i] == b'\\' {
                        // Skip the escaped byte (covers \' and \\).
                        i += 2;
                        continue;
                    }
                    if bytes[i] == b'\'' {
                        // `''` is an escaped quote inside the literal.
                        if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
                            i += 2;
                            continue;
                        }
                        break;
                    }
                    i += 1;
                }
            }
            b'"' => {
                has_content = true;
                i += 1;
                // No escape handling: stop at the next `"`. A doubled
                // `""` is therefore seen as two adjacent quoted runs.
                while i < bytes.len() && bytes[i] != b'"' {
                    i += 1;
                }
            }
            b'$' => {
                // Possible dollar-quote opener `$tag$` (tag may be
                // empty). If the shape doesn't match, it's a plain
                // `$` (positional param) — fall through.
                let tag_end = bytes[i + 1..]
                    .iter()
                    .position(|&b| !(b.is_ascii_alphanumeric() || b == b'_'))
                    .map(|off| i + 1 + off);
                if let Some(te) = tag_end
                    && te < bytes.len()
                    && bytes[te] == b'$'
                {
                    has_content = true;
                    // `tag` includes both `$` delimiters.
                    let tag = &sql[i..=te];
                    // Find the closing `$tag$`.
                    if let Some(close) = sql[te + 1..].find(tag) {
                        // Resume right after the closing tag; `continue`
                        // skips the shared `i += 1` at the bottom.
                        i = te + 1 + close + tag.len();
                        continue;
                    }
                    // Unterminated — consume the rest; the parser
                    // will report it.
                    i = bytes.len();
                    continue;
                }
                has_content = true;
            }
            b'-' if i + 1 < bytes.len() && bytes[i + 1] == b'-' => {
                // Line comment: skip to end-of-line. `has_content` is
                // deliberately left untouched.
                while i < bytes.len() && bytes[i] != b'\n' {
                    i += 1;
                }
            }
            b'/' if i + 1 < bytes.len() && bytes[i + 1] == b'*' => {
                // v7.22 (round-13 T3) — MySQL conditional comments
                // `/*!40101 … */` are EXECUTABLE (mysqldump wraps
                // its whole preamble + DISABLE KEYS hints in them);
                // they must stay statement content for the engine,
                // not be skipped as commentary.
                if i + 2 < bytes.len() && bytes[i + 2] == b'!' {
                    has_content = true;
                }
                // Skip the comment, balancing nested `/* … */` pairs.
                let mut depth = 1usize;
                i += 2;
                while i < bytes.len() && depth > 0 {
                    if bytes[i] == b'/' && i + 1 < bytes.len() && bytes[i + 1] == b'*' {
                        depth += 1;
                        i += 2;
                    } else if bytes[i] == b'*' && i + 1 < bytes.len() && bytes[i + 1] == b'/' {
                        depth -= 1;
                        i += 2;
                    } else {
                        i += 1;
                    }
                }
                // `i` already points past the comment.
                continue;
            }
            b';' => {
                if has_content {
                    let head = &sql[start..i];
                    // v7.22 (round-13 T2) — a `COPY … FROM stdin;`
                    // statement owns its following data block
                    // through the `\.` terminator line (data lines
                    // may contain `;`, so generic splitting would
                    // shred them). Swallow head + data into ONE
                    // chunk; `execute_script` lowers it to INSERTs.
                    // pg_dump prefixes the COPY with a comment
                    // banner — strip it before the head check.
                    let head_clean = strip_leading_sql_noise(head);
                    let is_copy_head = head_clean
                        .get(..4)
                        .is_some_and(|p| p.eq_ignore_ascii_case("copy"))
                        && spg_engine::copy::parse_copy_from_stdin_head(head_clean).is_some();
                    if is_copy_head {
                        // Scan whole lines after the ';' until the
                        // `\.` terminator (or EOF — torn dumps lose
                        // their tail, same as psql would error).
                        let mut j = i + 1;
                        let data_end;
                        loop {
                            if j >= bytes.len() {
                                data_end = bytes.len();
                                break;
                            }
                            let line_end = sql[j..].find('\n').map_or(bytes.len(), |off| j + off);
                            if sql[j..line_end].trim_end_matches('\r').trim() == "\\." {
                                data_end = j;
                                i = line_end; // bottom i += 1 skips \n
                                break;
                            }
                            j = line_end + 1;
                        }
                        // The chunk runs from the COPY head up to (not
                        // including) the terminator line.
                        stmts.push(&sql[start..data_end]);
                        if data_end == bytes.len() {
                            i = bytes.len();
                        }
                        start = i + 1;
                        has_content = false;
                        i += 1;
                        continue;
                    }
                    note_dialect_signals(head, &mut mysql_escapes);
                    stmts.push(head);
                }
                // Begin the next chunk right after the `;`.
                start = i + 1;
                has_content = false;
            }
            b => {
                if !b.is_ascii_whitespace() {
                    has_content = true;
                }
            }
        }
        // Step past the byte the arm stopped on (closing quote,
        // newline, `;`, or an ordinary byte).
        i += 1;
    }
    // Trailing statement without a terminating `;`.
    if has_content {
        stmts.push(&sql[start..]);
    }
    stmts
}
4110
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain two-statement script; empty chunks vanish.
    #[test]
    fn split_statements_basic_and_trailing() {
        let got = split_statements("CREATE TABLE a (x INT); INSERT INTO a VALUES (1)");
        assert_eq!(got, ["CREATE TABLE a (x INT)", " INSERT INTO a VALUES (1)"]);
        // whitespace/comment-only chunks drop
        assert_eq!(split_statements(" ;; -- nothing\n;").len(), 0);
    }

    /// `;` inside a plain literal, a doubled quote, an E-string
    /// backslash escape, a quoted identifier, and a dollar-quoted body
    /// must not split.
    #[test]
    fn split_statements_quoting_forms() {
        let cases = [
            "INSERT INTO t VALUES ('a;b')",
            "INSERT INTO t VALUES ('it''s; fine')",
            r"INSERT INTO t VALUES (E'it\'s; fine')",
            "CREATE TABLE \"odd;name\" (x INT)",
            "DO $body$ BEGIN PERFORM 1; END $body$",
            "DO $$ SELECT 1; $$",
        ];
        for sql in cases.iter().copied() {
            // On its own the statement stays whole...
            assert_eq!(split_statements(sql), vec![sql], "must stay whole: {sql}");
            // ...and it still splits cleanly from a neighbour.
            let script = format!("{sql};\nSELECT 2");
            assert_eq!(
                split_statements(&script),
                vec![sql, "\nSELECT 2"],
                "must split after: {sql}"
            );
        }
    }

    /// v7.22 round-13 gap 1 — PG 18 pg_dump wraps scripts in
    /// `\restrict` / `\unrestrict`; psql parity = the lines never
    /// reach the parser.
    #[test]
    fn split_statements_drops_psql_meta_lines() {
        let script = "\\restrict TOKEN123\nSELECT 1;\n\\unrestrict TOKEN123\nSELECT 2;\n\\.\n";
        let got = split_statements(script);
        assert_eq!(got, ["SELECT 1", "SELECT 2"]);
        // Mid-statement backslash is NOT a meta-command.
        let escaped = r"SELECT E'a\\b'";
        assert_eq!(split_statements(escaped), [escaped]);
    }

    /// Semicolons inside line comments and nested block comments do
    /// not end a statement.
    #[test]
    fn split_statements_comments_hide_semicolons() {
        let script = "-- c1 ; still comment\nSELECT 1; /* a ; b /* nested ; */ */ SELECT 2";
        let got = split_statements(script);
        assert_eq!(got.len(), 2);
        let (first, second) = (got[0], got[1]);
        assert!(first.contains("SELECT 1"));
        assert!(second.contains("SELECT 2"));
    }

    #[test]
    fn in_memory_create_insert_select() {
        let mut db = Database::open_in_memory();
        for stmt in [
            "CREATE TABLE t (id INT NOT NULL, name TEXT)",
            "INSERT INTO t VALUES (1, 'alice')",
            "INSERT INTO t VALUES (2, 'bob')",
        ] {
            db.execute(stmt).unwrap();
        }
        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
        assert_eq!(rows.len(), 1);
        let cell = &rows[0][0];
        assert!(matches!(cell, Value::Int(1)), "expected Int(1), got {cell:?}");
    }

    #[test]
    fn query_on_non_select_errors() {
        let mut db = Database::open_in_memory();
        db.execute("CREATE TABLE t (id INT)").unwrap();
        let outcome = db.query("INSERT INTO t VALUES (1)");
        assert!(outcome.is_err(), "query() on INSERT must error");
    }

    #[test]
    fn snapshot_roundtrip() {
        let mut source = Database::open_in_memory();
        source.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
        source.execute("INSERT INTO t VALUES (42)").unwrap();
        let bytes = source.snapshot();

        let mut restored = Database::restore(&bytes).unwrap();
        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
        assert_eq!(rows.len(), 1);
        let cell = &rows[0][0];
        assert!(matches!(cell, Value::Int(42)), "expected Int(42), got {cell:?}");
    }

    /// A hand-written `FromSpgRow` impl type-checks and decodes a row.
    #[test]
    fn from_spg_row_trait_shape() {
        struct User {
            _id: i32,
        }
        impl FromSpgRow for User {
            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
                if let Some(Value::Int(n)) = row.first() {
                    Ok(Self { _id: *n })
                } else {
                    Err(EngineError::Unsupported("bad id".into()))
                }
            }
        }
        let row = vec![Value::Int(7)];
        let _user = User::from_spg_row(&row).unwrap();
    }
}