Skip to main content

spg_embedded/
lib.rs

1// v7.7.2 — every public item in this crate must carry a
2// doc-comment; new code that adds a `pub` without one fails CI.
3#![deny(missing_docs)]
4
5//! # spg-embedded
6//!
7//! Ergonomic embedded-mode entry point for SPG. Wraps the
8//! `spg-engine` execution layer for in-process applications
9//! that don't want to spin up a TCP listener / fork to the
10//! `spg-server` binary.
11//!
12//! ## Quick start
13//!
14//! ```no_run
15//! use spg_embedded::Database;
16//!
17//! // On-disk, durable. WAL fsynced per commit; auto-checkpoint
18//! // at 4 MiB WAL by default.
19//! let mut db = Database::open_path("/data/app.db").unwrap();
20//! db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
21//! db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
22//! let rows = db.query("SELECT name FROM users WHERE id = 1").unwrap();
23//! for row in &rows {
24//!     println!("{:?}", row);
25//! }
26//! ```
27//!
28//! ## Production checklist (v7.5)
29//!
30//! - **Persistence**: `Database::open_path(p)` writes a
31//!   crash-consistent WAL + periodic checkpoint snapshot. The
32//!   on-disk format is byte-identical to what `spg-server`
33//!   produces, so a database can move between modes without
34//!   conversion.
35//! - **Durability**: every `execute()` that mutates calls
36//!   `fsync` before returning `Ok`. There is no group commit
37//!   in embedded mode — every commit pays one fsync. If you
38//!   need batch throughput, wrap multiple statements in
39//!   [`Database::with_transaction`] which fsyncs only at
40//!   commit.
41//! - **Concurrency**: [`Database`] is `Send` but **not** `Sync`.
42//!   Share across threads via `Arc<Mutex<Database>>`. The
43//!   single-writer model is intentional — see
44//!   [STABILITY § A1](https://github.com/lihao/spg/blob/master/STABILITY.md).
45//! - **Background work**: [`Database::spawn_background_freezer`]
46//!   moves cold rows to disk-resident segments while you keep
47//!   serving requests. It runs in a dedicated thread; drop the
48//!   returned [`FreezerHandle`] (or call `stop()`) for clean
49//!   shutdown.
50//! - **Errors**: all public enums ([`EngineError`],
51//!   [`QueryResult`], [`Value`]) are `#[non_exhaustive]`. Match
52//!   them with a wildcard arm so future v7.x releases can add
53//!   variants without breaking your code.
54//!
55//! ## Panic contract
56//!
57//! - **No `execute()` / `query()` call panics on user input.**
58//!   Malformed SQL, type mismatches, missing tables — all
59//!   return `Err(EngineError::…)`. If you observe a panic on
60//!   a user-controlled string, that is a bug; file an issue.
61//! - The library panics **only** on internal invariant
62//!   violations (e.g., catalog snapshot magic mismatch, WAL
63//!   record CRC sentinel corruption that survived the boot-
64//!   time validation). These represent silent disk corruption
65//!   and an unwind would leak inconsistent state, so the
66//!   release profile uses `panic = abort` — your host process
67//!   dies fast rather than continuing on poisoned data.
68//! - If you cannot tolerate `panic = abort`, build with
69//!   `--profile release-dbg` (keeps unwind tables) and use
70//!   `std::panic::catch_unwind` at your application boundary.
71//!
72//! ## Why a separate crate?
73//!
74//! `spg-engine` is `no_std`-compatible (vendored alloc-only).
75//! The embedded-mode entry point uses `std` (filesystem,
76//! threading), so it lives in its own crate to keep the
77//! `no_std` boundary clean.
78
79pub use spg_engine::{CatalogSnapshot, Engine, EngineError, ParsedStatement, QueryResult};
80pub use spg_storage::{ColumnSchema, DataType, Value};
81
82/// v7.16.0 — handle for a parsed-and-planned SQL statement.
83/// Hand off to [`Database::execute_prepared`] / [`Database::query_prepared`]
84/// with a `&[Value]` slice carrying the bind parameters (PG-style
85/// `$1`, `$2`, … positional). Cheap to `Clone`; the underlying AST
86/// is shared by handle copies and cloned per bind call by the
87/// engine's executor.
88///
89/// The handle holds a snapshot of the AST at prepare time. If
90/// the engine's plan cache evicts the entry between prepare and
91/// execute (e.g. ANALYZE bumps the statistics version) the
92/// stored AST keeps working — `execute_prepared` operates on
93/// the handle's clone, not the cache entry.
94#[derive(Debug, Clone)]
95pub struct Statement {
96    /// The parsed + planned AST. `spg-engine::prepare_cached`
97    /// returns it as a clone of the cached plan, so any rewrite
98    /// passes (`expand_group_by_all`, `reorder_joins`, …) have
99    /// already run.
100    pub(crate) stmt: ParsedStatement,
101    /// Original SQL source, kept for `Display` / debug only.
102    /// WAL persistence renders from the AST so a bind-time
103    /// rewrite of `$1..$N` survives replay.
104    pub(crate) sql: String,
105}
106
107impl Statement {
108    /// Borrow the original SQL source — useful for tracing and
109    /// debug logs. WAL replay does NOT use this; it serialises
110    /// the bind-final AST instead.
111    #[must_use]
112    pub fn sql(&self) -> &str {
113        &self.sql
114    }
115}
116
117/// v7.16.0 — internal WAL helper. Mirrors what
118/// `Engine::execute_prepared` does to the cloned AST so the WAL
119/// record carries the bind-final SQL text (so replay's
120/// simple-query path reconstructs the same row state without
121/// needing the original `Statement` handle to still be alive).
122/// Errors from the underlying engine helper would only fire if
123/// the bind-final stmt referenced a placeholder past the params
124/// slice — and that case has already errored in the executor
125/// above before this helper runs, so we discard the Result here.
126fn wal_render_with_params(stmt: &mut ParsedStatement, params: &[Value]) {
127    let _ = spg_engine::substitute_placeholders(stmt, params);
128}
129
130use std::collections::BTreeMap;
131use std::fs::{File, OpenOptions};
132use std::io::Write;
133use std::path::{Path, PathBuf};
134use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
135use std::sync::{Arc, Mutex};
136use std::thread::{self, JoinHandle};
137use std::time::{Duration, SystemTime, UNIX_EPOCH};
138
139/// v7.11.3 — wall-clock provider injected into every embedded
140/// `Engine`. Microseconds since the Unix epoch; clamps to
141/// `i64::MAX` if the system clock is far-future. Used by SQL's
142/// `NOW()` / `CURRENT_TIMESTAMP` / `CURRENT_DATE` rewrite layer
143/// so PG-idiomatic time queries work without the caller wiring
144/// their own clock.
145fn wall_clock_micros() -> i64 {
146    SystemTime::now()
147        .duration_since(UNIX_EPOCH)
148        .map_or(0, |d| i64::try_from(d.as_micros()).unwrap_or(i64::MAX))
149}
150
151use spg_manifest::{CatalogManifest, ColdSegmentEntry, manifest_path as spg_manifest_path};
152
153// -- v7.1 WAL format constants (mirror `spg-server`'s) ---------
154// Kept private so callers can't mis-frame records; the v3 layout
155// is the same the server uses, so a `spg-server` boot can read a
156// database an embedded process wrote and vice versa.
157const WAL_V2_SENTINEL: u32 = 0x8000_0000;
158const WAL_V3_FLAG: u32 = 0x4000_0000;
159const WAL_V3_TYPE_AUTO_COMMIT_SQL: u8 = 0x01;
160/// v7.18 — durability checkpoint marker stays at 0x02 (skipped on replay).
161const WAL_V3_TYPE_DURABILITY_CHECKPOINT: u8 = 0x02;
162/// v7.18 PITR — auto-commit-sql record with appended (commit_lsn,
163/// commit_unix_us) fields so replay can target a specific point in
164/// time. Backward-compat: v3 records (type 0x01) keep working, the
165/// envelope flag bits are unchanged. The new type byte is the
166/// schema-version discriminator.
167const WAL_V4_TYPE_AUTO_COMMIT_SQL: u8 = 0x10;
168/// v7.18 — sentinel for "no wall clock" inside a v4 record's
169/// commit_unix_us slot. Restore-to-timestamp skips records with
170/// this sentinel (no time anchor); LSN-based restore is
171/// unaffected.
172const WAL_V4_NO_CLOCK: i64 = i64::MIN;
173/// v7.18 — extra header bytes after the type byte in a v4 record:
174/// 8 bytes commit_lsn (u64 LE) + 8 bytes commit_unix_us (i64 LE).
175const WAL_V4_EXTRA_HEADER: usize = 16;
176/// v7.18 PITR — checkpoint anchor record written to the WAL *before*
177/// the snapshot file replaces the on-disk catalog. Carries the
178/// (lsn, ts, snapshot_path) triple so restore tooling can find the
179/// matching base snapshot without scanning the filesystem. Replay
180/// dispatch skips it (same as the v3 durability marker).
181const WAL_V4_TYPE_CHECKPOINT_MARKER: u8 = 0x11;
182
183/// v7.1 — auto-checkpoint threshold. Once the WAL grows past
184/// this many bytes, the next successful `execute()` call ends
185/// with a `checkpoint()` so the WAL stays bounded. Tunable via
186/// `SPG_EMBEDDED_CHECKPOINT_BYTES` env.
187fn default_checkpoint_threshold_bytes() -> u64 {
188    std::env::var("SPG_EMBEDDED_CHECKPOINT_BYTES")
189        .ok()
190        .and_then(|s| s.parse::<u64>().ok())
191        .filter(|&n| n > 0)
192        .unwrap_or(4 * 1024 * 1024)
193}
194
195/// v7.1 — encode one v3 `auto_commit_sql` record. Layout:
196///
197/// ```text
198/// [u32 LE (len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
199/// [u32 LE crc32 over (type_byte || sql_bytes)]
200/// [u8 type = 0x01]
201/// [sql bytes]
202/// ```
203fn encode_v3_auto_commit(sql: &str) -> Vec<u8> {
204    let payload = sql.as_bytes();
205    let mut crc_buf = Vec::with_capacity(1 + payload.len());
206    crc_buf.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
207    crc_buf.extend_from_slice(payload);
208    let crc = spg_crypto::crc32::crc32(&crc_buf);
209    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
210    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
211    out.extend_from_slice(&header);
212    out.extend_from_slice(&crc.to_le_bytes());
213    out.push(WAL_V3_TYPE_AUTO_COMMIT_SQL);
214    out.extend_from_slice(payload);
215    out
216}
217
218/// v7.20 P2 — WAL group-commit. N concurrent commits share one
219/// fsync (the 4.2 ms p50 that profile_breakdown measured as
220/// 99.2% of the durable write path).
221///
222/// Leader-follower protocol, same family as PG's group commit:
223///
224/// 1. `enqueue(record)` — called while the caller still holds
225///    the engine's write lock. Appends the encoded record to the
226///    shared buffer, returns a sequence ticket. O(memcpy).
227/// 2. Caller RELEASES the engine write lock (the next writer's
228///    mutation proceeds in parallel with this batch's fsync).
229/// 3. `wait_flushed(seq)` — if nobody is flushing, the caller
230///    elects itself leader: swaps the buffer out, writes +
231///    fsyncs ONCE for every record in the batch, marks the
232///    batch durable, wakes all followers. Otherwise it parks on
233///    the condvar until a leader covers its seq.
234///
235/// Durability contract is unchanged from v7.19: `execute()`
236/// does not return Ok until the record that describes its
237/// mutation is fsynced. The only change is N callers sharing
238/// one fsync instead of paying one each.
239///
240/// Lock order (deadlock-free): `state` then `file`; never the
241/// reverse. The leader holds `file` WITHOUT `state` during IO so
242/// enqueues continue while fsync runs.
243#[derive(Debug)]
244struct WalGroup {
245    state: Mutex<WalGroupState>,
246    cond: std::sync::Condvar,
247    /// Active chunk file handle. Separate lock from `state` so
248    /// the leader's write+fsync doesn't block concurrent
249    /// enqueues. Swapped by `checkpoint()` at rotation.
250    file: Mutex<File>,
251}
252
253#[derive(Debug)]
254struct WalGroupState {
255    /// Encoded records awaiting flush.
256    buf: Vec<u8>,
257    /// Monotonic enqueue counter (1-based).
258    enqueued_seq: u64,
259    /// Highest seq whose record is fsynced.
260    flushed_seq: u64,
261    /// True while some caller is inside the leader IO section.
262    leader_active: bool,
263    /// Sticky fatal error — a failed fsync poisons the WAL
264    /// (loud, never silent). All current + future waiters error.
265    failed: Option<String>,
266    /// Bytes written to the active chunk since rotation —
267    /// drives the auto-checkpoint trigger.
268    written_len: u64,
269}
270
271/// Ticket returned by the buffered write path; `wait()` blocks
272/// until the record it covers is durable (or the WAL is
273/// poisoned). Cheap to move across threads.
274#[derive(Debug)]
275pub struct WalTicket {
276    group: Arc<WalGroup>,
277    seq: u64,
278}
279
280impl WalGroup {
281    fn new(file: File, initial_len: u64) -> Self {
282        Self {
283            state: Mutex::new(WalGroupState {
284                buf: Vec::new(),
285                enqueued_seq: 0,
286                flushed_seq: 0,
287                leader_active: false,
288                failed: None,
289                written_len: initial_len,
290            }),
291            cond: std::sync::Condvar::new(),
292            file: Mutex::new(file),
293        }
294    }
295
296    /// Append `record` to the pending batch. Returns the seq the
297    /// caller must wait on. Called under the engine write lock —
298    /// keep it O(memcpy).
299    fn enqueue(&self, record: &[u8]) -> u64 {
300        let mut g = self.state.lock().expect("wal state poisoned");
301        g.buf.extend_from_slice(record);
302        g.enqueued_seq += 1;
303        g.enqueued_seq
304    }
305
306    /// Block until `seq` is durable. Leader-follower: the first
307    /// arriving waiter flushes for everyone.
308    fn wait_flushed(&self, seq: u64) -> Result<(), EngineError> {
309        let mut g = self.state.lock().expect("wal state poisoned");
310        loop {
311            if let Some(e) = &g.failed {
312                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
313                    format!("WAL poisoned by earlier flush failure: {e}"),
314                )));
315            }
316            if g.flushed_seq >= seq {
317                return Ok(());
318            }
319            if !g.leader_active {
320                // Elect self leader.
321                g.leader_active = true;
322                drop(g);
323                // v7.20 — commit_delay (PG's same-named knob):
324                // before taking the batch, give in-flight
325                // writers a short window to enqueue so the
326                // shared fsync covers more commits. 150 µs costs
327                // ~3.5% on a solo 4.2 ms fsync but multiplies
328                // batch size under load. Tunable via
329                // SPG_COMMIT_DELAY_US (0 disables).
330                let delay = commit_delay_us();
331                if delay > 0 {
332                    std::thread::sleep(std::time::Duration::from_micros(delay));
333                }
334                let (batch, flush_to) = {
335                    let mut g2 = self.state.lock().expect("wal state poisoned");
336                    (core::mem::take(&mut g2.buf), g2.enqueued_seq)
337                };
338                let io_result: std::io::Result<()> = (|| {
339                    let mut f = self.file.lock().expect("wal file poisoned");
340                    f.write_all(&batch)?;
341                    f.sync_data()
342                })();
343                g = self.state.lock().expect("wal state poisoned");
344                g.leader_active = false;
345                match io_result {
346                    Ok(()) => {
347                        g.flushed_seq = flush_to;
348                        g.written_len = g.written_len.saturating_add(batch.len() as u64);
349                    }
350                    Err(e) => {
351                        g.failed = Some(e.to_string());
352                    }
353                }
354                self.cond.notify_all();
355                //
356
357                // Loop continues: either our seq is now covered
358                // (leader path normally returns next iteration)
359                // or the error branch surfaces.
360                continue;
361            }
362            g = self.cond.wait(g).expect("wal condvar poisoned");
363        }
364    }
365
366    /// Drain the pending batch + flush synchronously. Caller must
367    /// guarantee no concurrent enqueues (checkpoint holds the
368    /// engine exclusively). Used before rotation so the marker
369    /// lands in the right chunk.
370    fn flush_now(&self) -> Result<(), EngineError> {
371        let mut g = self.state.lock().expect("wal state poisoned");
372        if let Some(e) = &g.failed {
373            return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
374                format!("WAL poisoned: {e}"),
375            )));
376        }
377        let batch = core::mem::take(&mut g.buf);
378        let flush_to = g.enqueued_seq;
379        if batch.is_empty() {
380            return Ok(());
381        }
382        drop(g);
383        let io: std::io::Result<()> = (|| {
384            let mut f = self.file.lock().expect("wal file poisoned");
385            f.write_all(&batch)?;
386            f.sync_data()
387        })();
388        let mut g = self.state.lock().expect("wal state poisoned");
389        match io {
390            Ok(()) => {
391                g.flushed_seq = flush_to;
392                g.written_len = g.written_len.saturating_add(batch.len() as u64);
393                self.cond.notify_all();
394                Ok(())
395            }
396            Err(e) => {
397                g.failed = Some(e.to_string());
398                self.cond.notify_all();
399                Err(io_err(e))
400            }
401        }
402    }
403
404    /// Swap the active chunk handle (rotation). Caller flushes
405    /// first; both locks taken in canonical order.
406    fn rotate_file(&self, new_file: File) {
407        let mut g = self.state.lock().expect("wal state poisoned");
408        let mut f = self.file.lock().expect("wal file poisoned");
409        *f = new_file;
410        g.written_len = 0;
411    }
412
413    fn written_len(&self) -> u64 {
414        let g = self.state.lock().expect("wal state poisoned");
415        g.written_len + g.buf.len() as u64
416    }
417}
418
419impl WalTicket {
420    /// Block until the record this ticket covers is durable.
421    ///
422    /// Under `SPG_SYNCHRONOUS_COMMIT=off` this returns
423    /// immediately — the background flusher (or the next
424    /// checkpoint / clean shutdown) makes the record durable
425    /// within `SPG_WAL_WRITER_DELAY_MS`. Same contract as PG's
426    /// `synchronous_commit = off`.
427    ///
428    /// # Errors
429    /// Surfaces the leader's IO error if the batch flush failed
430    /// (the WAL is then poisoned for all subsequent writes).
431    pub fn wait(&self) -> Result<(), EngineError> {
432        if !synchronous_commit_on() {
433            return Ok(());
434        }
435        self.group.wait_flushed(self.seq)
436    }
437}
438
439/// v7.19 P3 — retention sweep loop. Runs in a dedicated thread
440/// spawned by `Database::open_path` when `SPG_PITR_RETENTION_HOURS`
441/// is set to a non-zero value. Wakes every
442/// `SPG_PITR_RETENTION_CHECK_SEC` (default 60 s), enumerates chunks
443/// under `wal_dir`, archives via `SPG_PITR_ARCHIVE_CMD` if set, and
444/// deletes anything older than `retention_hours`.
445///
446/// Loud-failure posture matches PG's `archive_command`: if the
447/// archive command returns non-zero, the chunk stays on disk and
448/// a warning prints to stderr. The retention sweep doesn't delete
449/// a chunk it failed to archive.
450fn retention_sweep_loop(
451    wal_dir: PathBuf,
452    retention_hours: u64,
453    check_interval: std::time::Duration,
454    archive_cmd: Option<String>,
455    shutdown: Arc<AtomicBool>,
456) {
457    while !shutdown.load(Ordering::SeqCst) {
458        if let Err(e) = retention_sweep_once(&wal_dir, retention_hours, archive_cmd.as_deref()) {
459            eprintln!("spg-embedded: retention sweep error: {e}");
460        }
461        // Sleep in short ticks so shutdown isn't blocked on a
462        // 60 s naptime when Drop signals.
463        let mut elapsed = std::time::Duration::ZERO;
464        let tick = std::time::Duration::from_millis(250);
465        while elapsed < check_interval {
466            if shutdown.load(Ordering::SeqCst) {
467                return;
468            }
469            std::thread::sleep(tick);
470            elapsed += tick;
471        }
472    }
473}
474
475/// v7.19 P3 — one retention sweep pass over `wal_dir`. Extracted
476/// from the loop so tests can drive it directly. Public so the
477/// e2e_pitr_retention integration test (and any future operator
478/// tooling that wants synchronous retention) can call it.
479pub fn retention_sweep_once(
480    wal_dir: &Path,
481    retention_hours: u64,
482    archive_cmd: Option<&str>,
483) -> std::io::Result<()> {
484    if !wal_dir.exists() {
485        return Ok(());
486    }
487    let now_us = wall_clock_micros();
488    let cutoff_us = (now_us as i128 - (retention_hours as i128 * 3_600 * 1_000_000)) as i64;
489    let chunks = sorted_wal_chunks(wal_dir)?;
490    for chunk in chunks {
491        // Don't sweep the most-recent chunk; it's the live one
492        // execute() is appending to. Compare against the largest
493        // filename-prefix unix_us.
494        let stem = match chunk.file_stem().and_then(|s| s.to_str()) {
495            Some(s) => s,
496            None => continue,
497        };
498        let chunk_us: i64 = stem
499            .split_once('_')
500            .and_then(|(prefix, _)| i64::from_str_radix(prefix, 16).ok())
501            .unwrap_or(0);
502        if chunk_us >= cutoff_us {
503            continue;
504        }
505        // Archive first if requested.
506        if let Some(cmd) = archive_cmd {
507            if !cmd.is_empty() {
508                let output = std::process::Command::new("sh")
509                    .arg("-c")
510                    .arg(cmd)
511                    .arg("--")
512                    .arg(&chunk)
513                    .output()?;
514                if !output.status.success() {
515                    eprintln!(
516                        "spg-embedded: SPG_PITR_ARCHIVE_CMD failed for {} (exit {}); chunk stays on disk",
517                        chunk.display(),
518                        output.status.code().unwrap_or(-1)
519                    );
520                    continue;
521                }
522            }
523        }
524        // Delete the chunk + its sibling .checksum if present.
525        if let Err(e) = std::fs::remove_file(&chunk) {
526            eprintln!(
527                "spg-embedded: retention remove {} failed: {e}",
528                chunk.display()
529            );
530            continue;
531        }
532        let mut cs = chunk.clone();
533        let mut name = cs.file_name().map(|n| n.to_os_string()).unwrap_or_default();
534        name.push(".checksum");
535        cs.set_file_name(name);
536        let _ = std::fs::remove_file(&cs);
537    }
538    Ok(())
539}
540
541/// v7.20 — group-commit delay window in µs (PG `commit_delay`
542/// analogue). The flush leader sleeps this long before taking
543/// the batch so concurrent writers pile in. Default 150 µs;
544/// `SPG_COMMIT_DELAY_US=0` disables.
545fn commit_delay_us() -> u64 {
546    static CACHED: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
547    *CACHED.get_or_init(|| {
548        std::env::var("SPG_COMMIT_DELAY_US")
549            .ok()
550            .and_then(|s| s.parse::<u64>().ok())
551            .unwrap_or(150)
552    })
553}
554
555/// v7.20 — PG `synchronous_commit` analogue. `on` (default):
556/// `execute()` blocks until its WAL record is fsynced —
557/// zero-loss durability. `off`: `execute()` returns after the
558/// in-memory mutation + WAL enqueue; a background flusher
559/// thread writes + fsyncs every `SPG_WAL_WRITER_DELAY_MS`
560/// (default 200 ms — PG's `wal_writer_delay` default). Crash
561/// window = up to one flush interval of confirmed-but-unsynced
562/// commits — exactly the trade PG documents for the same
563/// setting. Clean shutdown (Drop / checkpoint) always flushes.
564fn synchronous_commit_on() -> bool {
565    static CACHED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
566    *CACHED.get_or_init(|| {
567        !std::env::var("SPG_SYNCHRONOUS_COMMIT")
568            .map(|v| v.eq_ignore_ascii_case("off") || v == "0" || v.eq_ignore_ascii_case("false"))
569            .unwrap_or(false)
570    })
571}
572
573/// v7.20 — background WAL flusher cadence for
574/// `SPG_SYNCHRONOUS_COMMIT=off` (PG `wal_writer_delay`).
575fn wal_writer_delay_ms() -> u64 {
576    static CACHED: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
577    *CACHED.get_or_init(|| {
578        std::env::var("SPG_WAL_WRITER_DELAY_MS")
579            .ok()
580            .and_then(|s| s.parse::<u64>().ok())
581            .filter(|&n| n > 0)
582            .unwrap_or(200)
583    })
584}
585
586fn pitr_retention_hours() -> u64 {
587    std::env::var("SPG_PITR_RETENTION_HOURS")
588        .ok()
589        .and_then(|s| s.parse::<u64>().ok())
590        .unwrap_or(0)
591}
592
593fn pitr_retention_check_sec() -> u64 {
594    std::env::var("SPG_PITR_RETENTION_CHECK_SEC")
595        .ok()
596        .and_then(|s| s.parse::<u64>().ok())
597        .filter(|&n| n > 0)
598        .unwrap_or(60)
599}
600
601fn pitr_archive_cmd() -> Option<String> {
602    std::env::var("SPG_PITR_ARCHIVE_CMD")
603        .ok()
604        .filter(|s| !s.is_empty())
605}
606
607/// v7.19 — replay every record from `wal_bytes` whose
608/// `commit_lsn` is strictly greater than `floor_lsn`. v3 records
609/// (no LSN) and v4 records with `commit_lsn <= floor_lsn` are
610/// skipped — the snapshot loaded ahead of this call already
611/// reflects them, and re-applying would DuplicateTable /
612/// double-insert. v3 records inside the legacy migration chunk
613/// always apply because the migration sets `floor_lsn = 0` and
614/// v3 records carry no LSN to compare; the pre-migration
615/// behaviour (every record replays) is what the migration
616/// preserves.
617///
618/// Returns the count of records successfully applied. Same
619/// torn-tail semantics as `replay_wal_into_engine`.
620fn replay_wal_filtered(
621    wal_bytes: &[u8],
622    engine: &mut Engine,
623    floor_lsn: u64,
624) -> Result<usize, String> {
625    let records = parse_wal_records(wal_bytes)?;
626    let mut applied = 0usize;
627    for r in &records {
628        // Skip markers + non-SQL records.
629        if r.type_byte == WAL_V3_TYPE_DURABILITY_CHECKPOINT
630            || r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER
631        {
632            continue;
633        }
634        // v4 SQL records carry an LSN. Apply iff strictly above
635        // the snapshot floor.
636        if r.type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL {
637            if let Some(lsn) = r.commit_lsn {
638                if lsn <= floor_lsn {
639                    continue;
640                }
641            }
642        }
643        // v3 records (type 0x01, no LSN) always apply — the
644        // legacy migration path is the only place they appear,
645        // and floor_lsn=0 there.
646        let sql = match std::str::from_utf8(r.sql) {
647            Ok(s) => s,
648            Err(e) => return Err(format!("non-UTF-8 SQL at offset {}: {e}", r.offset)),
649        };
650        engine.execute(sql).map_err(|e| {
651            format!(
652                "WAL replay: apply {sql:?} at offset {} rejected: {e:?}",
653                r.offset
654            )
655        })?;
656        applied += 1;
657    }
658    Ok(applied)
659}
660
661/// v7.19 — WAL chunk filename format. Zero-padded 16-digit
662/// hex on both parts so default lexicographic sort matches
663/// numeric order, with the unix_us prefix coming first so
664/// the on-disk listing is chronological too.
665fn chunk_filename(unix_us: i64, leading_lsn: u64) -> String {
666    // Negative timestamps shouldn't happen in practice (we sit
667    // post-1970), but clamp to 0 so the zero-padded
668    // representation stays sortable.
669    let us = unix_us.max(0) as u64;
670    format!("{us:016x}_{leading_lsn:016x}.wal")
671}
672
673/// v7.19 — filename used for the legacy single-file WAL when
674/// `open_path` migrates a v7.18-layout database into the new
675/// chunk directory. Lexicographically smallest possible value
676/// so subsequent chunks sort after it.
677fn legacy_chunk_filename() -> String {
678    chunk_filename(0, 0)
679}
680
681/// v7.19 — list every `.wal` file in `wal_dir` in
682/// lexicographic order (which doubles as chunk-creation
683/// order thanks to the zero-padded filename format).
684fn sorted_wal_chunks(wal_dir: &Path) -> std::io::Result<Vec<PathBuf>> {
685    let mut paths = Vec::new();
686    let read_dir = match std::fs::read_dir(wal_dir) {
687        Ok(rd) => rd,
688        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(paths),
689        Err(e) => return Err(e),
690    };
691    for entry in read_dir {
692        let entry = entry?;
693        let path = entry.path();
694        if path.extension().and_then(|s| s.to_str()) == Some("wal") {
695            paths.push(path);
696        }
697    }
698    paths.sort();
699    Ok(paths)
700}
701
702/// v7.18 PITR — encode one v4 `checkpoint_marker` record. Layout:
703///
704/// ```text
705/// [u32 LE (payload_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
706/// [u32 LE crc32 over (type_byte || payload)]
707/// [u8  type = 0x11]
708/// payload:
709///   [u64 LE checkpoint_lsn]
710///   [i64 LE checkpoint_unix_us  (WAL_V4_NO_CLOCK if no clock)]
711///   [u16 LE snapshot_path_len]
712///   [snapshot_path_bytes]
713/// ```
714///
715/// `payload_len` covers only the payload — keeping the framing
716/// uniform across v3 / v4 record types so torn-write detection in
717/// `replay_wal_into_engine` stays trivial.
718fn encode_v4_checkpoint_marker(
719    checkpoint_lsn: u64,
720    checkpoint_unix_us: i64,
721    snapshot_path: &Path,
722) -> Vec<u8> {
723    let snapshot_bytes = snapshot_path.to_string_lossy().into_owned();
724    let snap_payload = snapshot_bytes.as_bytes();
725    let snap_len_u16: u16 = snap_payload.len().min(u16::MAX as usize) as u16;
726    let mut payload = Vec::with_capacity(8 + 8 + 2 + snap_payload.len());
727    payload.extend_from_slice(&checkpoint_lsn.to_le_bytes());
728    payload.extend_from_slice(&checkpoint_unix_us.to_le_bytes());
729    payload.extend_from_slice(&snap_len_u16.to_le_bytes());
730    payload.extend_from_slice(&snap_payload[..snap_len_u16 as usize]);
731    let mut crc_buf = Vec::with_capacity(1 + payload.len());
732    crc_buf.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
733    crc_buf.extend_from_slice(&payload);
734    let crc = spg_crypto::crc32::crc32(&crc_buf);
735    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
736    let mut out = Vec::with_capacity(4 + 4 + 1 + payload.len());
737    out.extend_from_slice(&header);
738    out.extend_from_slice(&crc.to_le_bytes());
739    out.push(WAL_V4_TYPE_CHECKPOINT_MARKER);
740    out.extend_from_slice(&payload);
741    out
742}
743
744/// v7.18 PITR — encode one v4 `auto_commit_sql` record. Layout:
745///
746/// ```text
747/// [u32 LE (sql_len | WAL_V2_SENTINEL | WAL_V3_FLAG)]
748/// [u32 LE crc32 over (type_byte || lsn || ts || sql_bytes)]
749/// [u8  type = 0x10]
750/// [u64 LE commit_lsn]
751/// [i64 LE commit_unix_us  (= WAL_V4_NO_CLOCK when no ClockFn)]
752/// [sql bytes]
753/// ```
754///
755/// `sql_len` field stays the SQL byte count — same shape as v3 — so
756/// replay-buffer torn-write detection compares against
757/// `WAL_V4_EXTRA_HEADER + sql_len`. v3 records (type 0x01) stay
758/// readable by the same loop with their original 9-byte header
759/// arithmetic.
760fn encode_v4_auto_commit(sql: &str, commit_lsn: u64, commit_unix_us: i64) -> Vec<u8> {
761    let payload = sql.as_bytes();
762    let mut crc_buf = Vec::with_capacity(1 + WAL_V4_EXTRA_HEADER + payload.len());
763    crc_buf.push(WAL_V4_TYPE_AUTO_COMMIT_SQL);
764    crc_buf.extend_from_slice(&commit_lsn.to_le_bytes());
765    crc_buf.extend_from_slice(&commit_unix_us.to_le_bytes());
766    crc_buf.extend_from_slice(payload);
767    let crc = spg_crypto::crc32::crc32(&crc_buf);
768    let header = ((payload.len() as u32) | WAL_V2_SENTINEL | WAL_V3_FLAG).to_le_bytes();
769    let mut out = Vec::with_capacity(4 + 4 + 1 + WAL_V4_EXTRA_HEADER + payload.len());
770    out.extend_from_slice(&header);
771    out.extend_from_slice(&crc.to_le_bytes());
772    out.push(WAL_V4_TYPE_AUTO_COMMIT_SQL);
773    out.extend_from_slice(&commit_lsn.to_le_bytes());
774    out.extend_from_slice(&commit_unix_us.to_le_bytes());
775    out.extend_from_slice(payload);
776    out
777}
778
779/// v7.1 — decode + apply every record in `wal_bytes` to `engine`.
780/// Returns the count of records successfully applied. A truncated
781/// trailing record (mid-write torn) is dropped silently — the
782/// same recovery story `spg-server`'s boot path uses.
783fn replay_wal_into_engine(wal_bytes: &[u8], engine: &mut Engine) -> Result<usize, String> {
784    let mut applied = 0usize;
785    let mut cur = 0usize;
786    while cur < wal_bytes.len() {
787        if wal_bytes.len() - cur < 4 {
788            // Trailing partial header — torn write, drop and stop.
789            break;
790        }
791        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
792        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
793        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
794        let len_mask = if is_v3 {
795            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
796        } else {
797            !WAL_V2_SENTINEL
798        };
799        let rec_len = (raw_len & len_mask) as usize;
800        let header_len = if is_v3 {
801            9
802        } else if is_v2 {
803            8
804        } else {
805            4
806        };
807        if wal_bytes.len() - cur < header_len + rec_len {
808            // Torn record at the tail — drop, stop.
809            break;
810        }
811        if is_v3 {
812            let type_byte = wal_bytes[cur + 8];
813            match type_byte {
814                WAL_V3_TYPE_AUTO_COMMIT_SQL => {}
815                WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
816                    // durability_checkpoint marker — skip, no SQL.
817                    cur += header_len + rec_len;
818                    continue;
819                }
820                WAL_V4_TYPE_CHECKPOINT_MARKER => {
821                    // v7.18 PITR — checkpoint anchor, skip on replay
822                    // (engine state past this point reflects the
823                    // matching snapshot already loaded by the caller).
824                    cur += header_len + rec_len;
825                    continue;
826                }
827                WAL_V4_TYPE_AUTO_COMMIT_SQL => {
828                    // v7.18 PITR — v4 record carries 16 bytes of
829                    // (commit_lsn, commit_unix_us) between the type
830                    // byte and the SQL payload. Replay reads them but
831                    // does not enforce them — the engine doesn't
832                    // surface LSN/clock here. Restore tooling
833                    // (spgctl) parses them via parse_wal_record below.
834                    let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
835                    if wal_bytes.len() - cur < v4_total {
836                        // Torn v4 record at the tail — drop, stop.
837                        break;
838                    }
839                    let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
840                    let sql_bytes = &wal_bytes[sql_start..sql_start + rec_len];
841                    let sql = std::str::from_utf8(sql_bytes)
842                        .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
843                    engine.execute(sql).map_err(|e| {
844                        format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}")
845                    })?;
846                    applied += 1;
847                    cur += v4_total;
848                    continue;
849                }
850                other => {
851                    return Err(format!(
852                        "WAL replay: unknown v3 type byte {other:#04x} at offset {cur}"
853                    ));
854                }
855            }
856        }
857        let sql_bytes = &wal_bytes[cur + header_len..cur + header_len + rec_len];
858        let sql = std::str::from_utf8(sql_bytes)
859            .map_err(|e| format!("WAL replay: non-UTF-8 SQL at offset {cur}: {e}"))?;
860        engine
861            .execute(sql)
862            .map_err(|e| format!("WAL replay: apply {sql:?} at offset {cur} rejected: {e:?}"))?;
863        applied += 1;
864        cur += header_len + rec_len;
865    }
866    Ok(applied)
867}
868
869/// v7.18 PITR — parsed WAL record, surfaced for restore / verify
870/// tooling. The replay loop above doesn't expose LSN/timestamp;
871/// `spgctl restore --to <timestamp>` and `spgctl verify` need them.
872/// Returned offsets are byte-positions inside the WAL buffer.
873#[derive(Debug, Clone)]
874pub struct WalRecord<'a> {
875    /// Byte offset in the WAL buffer where this record starts.
876    pub offset: usize,
877    /// Type byte (0x01 = v3 auto-commit, 0x10 = v4 auto-commit,
878    /// 0x02 = durability checkpoint marker).
879    pub type_byte: u8,
880    /// `Some(lsn)` for v4 records, `None` for v3.
881    pub commit_lsn: Option<u64>,
882    /// `Some(unix_us)` for v4 records carrying a clock-set timestamp,
883    /// `None` for v3 or for v4 records explicitly written with
884    /// `WAL_V4_NO_CLOCK` (sentinel for "no ClockFn at commit time").
885    pub commit_unix_us: Option<i64>,
886    /// SQL payload as borrowed bytes. Empty for durability markers.
887    pub sql: &'a [u8],
888}
889
890/// v7.18 PITR — iterate over `wal_bytes` yielding one `WalRecord`
891/// per intact record. Torn-tail records terminate iteration
892/// silently (same recovery story as `replay_wal_into_engine`).
893/// Unknown type bytes inside a v3 envelope return `Err` so the
894/// caller knows the WAL was written by a newer SPG.
895pub fn parse_wal_records(wal_bytes: &[u8]) -> Result<Vec<WalRecord<'_>>, String> {
896    let mut out = Vec::new();
897    let mut cur = 0usize;
898    while cur < wal_bytes.len() {
899        if wal_bytes.len() - cur < 4 {
900            break;
901        }
902        let raw_len = u32::from_le_bytes(wal_bytes[cur..cur + 4].try_into().unwrap());
903        let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
904        let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
905        let len_mask = if is_v3 {
906            !(WAL_V2_SENTINEL | WAL_V3_FLAG)
907        } else {
908            !WAL_V2_SENTINEL
909        };
910        let rec_len = (raw_len & len_mask) as usize;
911        let header_len = if is_v3 {
912            9
913        } else if is_v2 {
914            8
915        } else {
916            4
917        };
918        if wal_bytes.len() - cur < header_len + rec_len {
919            break;
920        }
921        if !is_v3 {
922            // v1 / v2 records carry no type byte; treat as legacy
923            // auto-commit SQL with no LSN/time.
924            let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
925            out.push(WalRecord {
926                offset: cur,
927                type_byte: WAL_V3_TYPE_AUTO_COMMIT_SQL,
928                commit_lsn: None,
929                commit_unix_us: None,
930                sql,
931            });
932            cur += header_len + rec_len;
933            continue;
934        }
935        let type_byte = wal_bytes[cur + 8];
936        match type_byte {
937            WAL_V3_TYPE_AUTO_COMMIT_SQL => {
938                let sql = &wal_bytes[cur + header_len..cur + header_len + rec_len];
939                out.push(WalRecord {
940                    offset: cur,
941                    type_byte,
942                    commit_lsn: None,
943                    commit_unix_us: None,
944                    sql,
945                });
946                cur += header_len + rec_len;
947            }
948            WAL_V3_TYPE_DURABILITY_CHECKPOINT => {
949                out.push(WalRecord {
950                    offset: cur,
951                    type_byte,
952                    commit_lsn: None,
953                    commit_unix_us: None,
954                    sql: &[],
955                });
956                cur += header_len + rec_len;
957            }
958            WAL_V4_TYPE_CHECKPOINT_MARKER => {
959                // v7.18 PITR — payload = (lsn u64)(ts i64)(path_len u16)(path bytes).
960                // We surface lsn + ts on the WalRecord; the path lives
961                // in `sql` since the type byte already disambiguates
962                // record meaning and adding a dedicated field would
963                // bloat the iterator return type for every variant.
964                if rec_len < 18 {
965                    return Err(format!(
966                        "WAL parse: checkpoint marker at offset {cur} too short ({rec_len} bytes)"
967                    ));
968                }
969                let lsn = u64::from_le_bytes(
970                    wal_bytes[cur + header_len..cur + header_len + 8]
971                        .try_into()
972                        .unwrap(),
973                );
974                let ts_raw = i64::from_le_bytes(
975                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
976                        .try_into()
977                        .unwrap(),
978                );
979                let path_len = u16::from_le_bytes(
980                    wal_bytes[cur + header_len + 16..cur + header_len + 18]
981                        .try_into()
982                        .unwrap(),
983                ) as usize;
984                if rec_len < 18 + path_len {
985                    return Err(format!(
986                        "WAL parse: checkpoint marker at offset {cur} truncated path"
987                    ));
988                }
989                let path_start = cur + header_len + 18;
990                let path_bytes = &wal_bytes[path_start..path_start + path_len];
991                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
992                    None
993                } else {
994                    Some(ts_raw)
995                };
996                out.push(WalRecord {
997                    offset: cur,
998                    type_byte,
999                    commit_lsn: Some(lsn),
1000                    commit_unix_us,
1001                    sql: path_bytes,
1002                });
1003                cur += header_len + rec_len;
1004            }
1005            WAL_V4_TYPE_AUTO_COMMIT_SQL => {
1006                let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
1007                if wal_bytes.len() - cur < v4_total {
1008                    break;
1009                }
1010                let lsn = u64::from_le_bytes(
1011                    wal_bytes[cur + header_len..cur + header_len + 8]
1012                        .try_into()
1013                        .unwrap(),
1014                );
1015                let ts_raw = i64::from_le_bytes(
1016                    wal_bytes[cur + header_len + 8..cur + header_len + 16]
1017                        .try_into()
1018                        .unwrap(),
1019                );
1020                let commit_unix_us = if ts_raw == WAL_V4_NO_CLOCK {
1021                    None
1022                } else {
1023                    Some(ts_raw)
1024                };
1025                let sql_start = cur + header_len + WAL_V4_EXTRA_HEADER;
1026                let sql = &wal_bytes[sql_start..sql_start + rec_len];
1027                out.push(WalRecord {
1028                    offset: cur,
1029                    type_byte,
1030                    commit_lsn: Some(lsn),
1031                    commit_unix_us,
1032                    sql,
1033                });
1034                cur += v4_total;
1035            }
1036            other => {
1037                return Err(format!(
1038                    "WAL parse: unknown type byte {other:#04x} at offset {cur}"
1039                ));
1040            }
1041        }
1042    }
1043    Ok(out)
1044}
1045
1046/// v7.1 — predicate for "should the next `execute()` mutate the
1047/// WAL?" Returns `false` for SELECT / SHOW / EXPLAIN / BEGIN /
1048/// COMMIT / ROLLBACK and the SPG-specific verbs that don't go
1049/// through the auto-commit record path on the server (CHECKPOINT,
1050/// COMPACT). Conservative: anything we don't explicitly know is
1051/// read-only falls through to "write a WAL record".
1052fn sql_is_read_only(sql: &str) -> bool {
1053    let t = sql.trim_start();
1054    let head = t
1055        .split(|c: char| c.is_whitespace() || c == ';' || c == '(')
1056        .next()
1057        .unwrap_or("");
1058    matches!(
1059        head.to_ascii_lowercase().as_str(),
1060        "select"
1061            | "show"
1062            | "explain"
1063            | "begin"
1064            | "commit"
1065            | "rollback"
1066            | "checkpoint"
1067            | "compact"
1068            | "wait"
1069            | "with"
1070    )
1071}
1072
1073/// Embedded SPG database handle. Owns an `Engine` + provides
1074/// ergonomic wrappers around `execute` and `query`. Drops the
1075/// engine on `Drop` — no WAL flush / fsync, because v6.10.3
1076/// is in-memory only.
1077#[derive(Debug)]
1078pub struct Database {
1079    engine: Engine,
1080    /// v7.1 — persistence sidecar. When `Some(p)`, every
1081    /// `execute(sql)` that mutates state appends a v4
1082    /// `auto_commit_sql` WAL record + fsyncs before the call
1083    /// returns; `Drop` writes a final catalog snapshot to
1084    /// `<db_path>` so the next session boots from a clean
1085    /// snapshot + an empty WAL. `None` = in-memory only (the
1086    /// v6.10.3 shape).
1087    persistence: Option<PersistenceCtx>,
1088    /// v7.18 PITR — monotonic per-database commit LSN. Increments
1089    /// before each successful WAL append; bootstrapped at
1090    /// open_path from `max(parse_wal_records → commit_lsn)` so
1091    /// reopen never reuses an LSN. In-memory databases start at
1092    /// 0 and never advance (no WAL = no LSN-meaningful records).
1093    commit_lsn: AtomicU64,
1094}
1095
1096#[derive(Debug)]
1097#[allow(dead_code)] // `wal_dir`/`current_chunk_path` are read at boot; kept for Drop/diag introspection.
1098struct PersistenceCtx {
1099    db_path: PathBuf,
1100    /// v7.19 — WAL chunk directory at `<db_path>.wal/`.
1101    /// Replaces the v7.18 single-file `<db_path>.wal` layout.
1102    /// Each chunk file inside is named
1103    /// `<unix_us>_<leading_lsn>.wal` (zero-padded to 16 digits
1104    /// so default-lex sort = LSN order).
1105    wal_dir: PathBuf,
1106    /// Path of the currently-open chunk file inside `wal_dir`.
1107    /// Rotated at checkpoint and whenever the chunk crosses
1108    /// `checkpoint_threshold_bytes`.
1109    current_chunk_path: PathBuf,
1110    /// v7.19 P3 — retention sweeper handle. `Some` when
1111    /// `SPG_PITR_RETENTION_HOURS > 0` at open_path time; `None`
1112    /// when retention is disabled (the default; v7.18 behaviour
1113    /// preserved). The thread polls `wal_dir` every
1114    /// `SPG_PITR_RETENTION_CHECK_SEC` seconds, archives via
1115    /// `SPG_PITR_ARCHIVE_CMD` if set, then deletes chunks older
1116    /// than the retention window. Signalled to exit via
1117    /// `retention_shutdown` on Drop.
1118    retention_shutdown: Option<Arc<AtomicBool>>,
1119    retention_thread: Option<std::thread::JoinHandle<()>>,
1120    /// v7.20 — background WAL flusher for
1121    /// `SPG_SYNCHRONOUS_COMMIT=off`. `None` in the default
1122    /// synchronous mode. Flushes the pending batch every
1123    /// `SPG_WAL_WRITER_DELAY_MS`; signalled + joined on Drop
1124    /// before the final checkpoint so clean shutdown never
1125    /// loses confirmed commits.
1126    flusher_shutdown: Option<Arc<AtomicBool>>,
1127    flusher_thread: Option<std::thread::JoinHandle<()>>,
1128    /// v7.20 P2 — group-commit WAL. Shared with WalTickets
1129    /// returned by the buffered write path so `wait()` can run
1130    /// after the engine write lock is released.
1131    wal: Arc<WalGroup>,
1132    checkpoint_threshold_bytes: u64,
1133    /// v7.1.4 — `<db_path>.spg/segments/` directory. Cold-tier
1134    /// segments produced by `freeze_oldest_to_cold` / compaction
1135    /// are persisted here as `seg_<id>.spg` files; the manifest
1136    /// at `<db_path>.spg/manifest.v10` records every active
1137    /// segment + its CRC32 so the next boot can verify + reload.
1138    cold_segments_dir: PathBuf,
1139    cold_segment_paths: BTreeMap<u32, PathBuf>,
1140    /// v7.17.0 Phase 6.2 — cross-process exclusion lock. Acquired
1141    /// via `fs::create_dir` on `<db_path>.lock` at open_path
1142    /// entry; released on Drop by `fs::remove_dir`. atomic on
1143    /// every supported platform. A second process opening the
1144    /// same path while the first is still alive hits the
1145    /// create_dir failure and returns
1146    /// `EngineError::Unsupported("database is locked by another
1147    /// process: …")`. Stale locks (process crashed mid-session)
1148    /// must be cleared via `Database::force_unlock(path)` —
1149    /// SPG can't safely fingerprint who owned a stale directory
1150    /// without a libc dep, which would violate spg-embedded's
1151    /// zero-deps charter.
1152    lock_path: PathBuf,
1153}
1154
1155impl Database {
1156    /// Open a fresh in-memory database. No WAL, no catalog
1157    /// snapshot on disk — perfect for tests + short-lived
1158    /// CLI tools.
1159    #[must_use]
1160    pub fn open_in_memory() -> Self {
1161        Self {
1162            engine: Engine::new().with_clock(wall_clock_micros),
1163            persistence: None,
1164            commit_lsn: AtomicU64::new(0),
1165        }
1166    }
1167
1168    /// v7.1 — Open or create a persistent database backed by
1169    /// the file at `db_path`. The WAL lives at `db_path` +
1170    /// ".wal" (e.g. `./data/spg.db` → `./data/spg.db.wal`). Boot
1171    /// path:
1172    ///
1173    /// 1. If `db_path` exists, restore the catalog snapshot.
1174    /// 2. If the WAL exists, replay every record into the
1175    ///    restored engine — the same recovery story
1176    ///    `spg-server` uses.
1177    /// 3. Open the WAL in append+sync mode so subsequent
1178    ///    `execute()` writes durably commit (one fsync per
1179    ///    mutation).
1180    ///
1181    /// `Drop` writes a final catalog snapshot + truncates the
1182    /// WAL — operators that need a sync barrier at a specific
1183    /// point use `checkpoint()` explicitly.
1184    pub fn open_path(db_path: impl AsRef<Path>) -> Result<Self, EngineError> {
1185        let db_path = db_path.as_ref().to_path_buf();
1186        // v7.19 — WAL is a directory of chunk files. Legacy
1187        // single-file path stays variable-named `wal_path` for
1188        // the backward-compat migration block below.
1189        let wal_path = {
1190            let mut p = db_path.clone();
1191            let name = p
1192                .file_name()
1193                .map(|n| {
1194                    let mut s = n.to_os_string();
1195                    s.push(".wal");
1196                    s
1197                })
1198                .unwrap_or_else(|| std::ffi::OsString::from(".wal"));
1199            p.set_file_name(name);
1200            p
1201        };
1202        let wal_dir = wal_path.clone();
1203        if let Some(parent) = db_path.parent()
1204            && !parent.as_os_str().is_empty()
1205        {
1206            std::fs::create_dir_all(parent).map_err(io_err)?;
1207        }
1208        // v7.17.0 Phase 6.2 — acquire cross-process exclusion
1209        // lock before touching any catalog / WAL bytes. atomic
1210        // mkdir on every supported platform; a second process
1211        // opening the same path while the first is still alive
1212        // hits the create_dir failure and gets a clear error.
1213        let lock_path = {
1214            let mut p = db_path.clone();
1215            let name = p
1216                .file_name()
1217                .map(|n| {
1218                    let mut s = n.to_os_string();
1219                    s.push(".lock");
1220                    s
1221                })
1222                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
1223            p.set_file_name(name);
1224            p
1225        };
1226        std::fs::create_dir(&lock_path).map_err(|e| {
1227            if e.kind() == std::io::ErrorKind::AlreadyExists {
1228                EngineError::Unsupported(format!(
1229                    "database is locked by another process (or stale lock): {}; \
1230                     remove the directory manually after confirming no other \
1231                     process holds it, or call Database::force_unlock()",
1232                    lock_path.display()
1233                ))
1234            } else {
1235                io_err(e)
1236            }
1237        })?;
1238        let mut engine = if db_path.exists() {
1239            let bytes = std::fs::read(&db_path).map_err(io_err)?;
1240            let engine = Engine::restore_envelope(&bytes).map_err(|e| {
1241                EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
1242                    "restore from {}: {e}",
1243                    db_path.display()
1244                )))
1245            })?;
1246            engine.with_clock(wall_clock_micros)
1247        } else {
1248            Engine::new().with_clock(wall_clock_micros)
1249        };
1250        // v7.1.4 — manifest-driven cold-segment reload. The
1251        // manifest sidecar pairs the catalog snapshot CRC with a
1252        // list of `(segment_id, path, crc32)` triples; verify
1253        // before loading so a torn or stale manifest doesn't
1254        // surface phantom data.
1255        let cold_segments_dir = {
1256            let parent = db_path.parent().unwrap_or_else(|| Path::new("."));
1257            let stem = db_path
1258                .file_stem()
1259                .unwrap_or_else(|| std::ffi::OsStr::new("db"))
1260                .to_string_lossy()
1261                .into_owned();
1262            parent.join(format!("{stem}.spg")).join("segments")
1263        };
1264        let mut cold_segment_paths: BTreeMap<u32, PathBuf> = BTreeMap::new();
1265        let manifest_pth = spg_manifest_path(&db_path);
1266        if manifest_pth.exists() && db_path.exists() {
1267            let m_bytes = std::fs::read(&manifest_pth).map_err(io_err)?;
1268            if let Ok(m) = CatalogManifest::deserialize(&m_bytes) {
1269                let snap_bytes = std::fs::read(&db_path).map_err(io_err)?;
1270                let snap_crc = spg_crypto::crc32::crc32(&snap_bytes);
1271                if snap_crc == m.catalog_crc32 {
1272                    for entry in &m.cold_segments {
1273                        if let Ok(seg_bytes) = std::fs::read(&entry.path) {
1274                            let computed = spg_crypto::crc32::crc32(&seg_bytes);
1275                            if computed != entry.crc32 {
1276                                eprintln!(
1277                                    "spg-embedded: manifest skip segment {}: CRC mismatch",
1278                                    entry.segment_id
1279                                );
1280                                continue;
1281                            }
1282                            if engine.catalog().cold_segment(entry.segment_id).is_some() {
1283                                // Already loaded via Catalog::clone path (shouldn't happen
1284                                // since Engine::new + restore_envelope don't populate cold).
1285                                continue;
1286                            }
1287                            let mut new_cat = engine.catalog().clone();
1288                            if let Err(e) =
1289                                new_cat.load_segment_bytes_at(entry.segment_id, seg_bytes)
1290                            {
1291                                eprintln!(
1292                                    "spg-embedded: manifest load segment {} failed: {e}",
1293                                    entry.segment_id
1294                                );
1295                                continue;
1296                            }
1297                            engine.replace_catalog(new_cat);
1298                            cold_segment_paths.insert(entry.segment_id, entry.path.clone());
1299                        } else {
1300                            eprintln!(
1301                                "spg-embedded: manifest skip segment {}: file unreadable",
1302                                entry.segment_id
1303                            );
1304                        }
1305                    }
1306                }
1307            }
1308        }
1309        // v7.19 — chunked WAL on-disk layout.
1310        //
1311        // Three cases handled here:
1312        //
1313        // 1. wal_dir exists as a DIRECTORY → scan its
1314        //    `<unix_us>_<leading_lsn>.wal` chunks (sorted
1315        //    lexicographically = chunk-creation order), replay
1316        //    them in sequence, advance the LSN watermark to the
1317        //    max commit_lsn seen.
1318        //
1319        // 2. wal_path exists as a FILE → legacy v7.18 layout.
1320        //    Migrate it: create `wal_dir/`, move the single file
1321        //    inside as `0000000000000000_0000000000000000.wal`,
1322        //    then fall through to case 1's replay loop.
1323        //
1324        // 3. Neither exists → fresh database; create wal_dir.
1325        let mut initial_lsn: u64 = 0;
1326        if wal_path.is_file() {
1327            // Case 2: legacy single-file WAL migration.
1328            let legacy_bytes = std::fs::read(&wal_path).map_err(io_err)?;
1329            std::fs::remove_file(&wal_path).map_err(io_err)?;
1330            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1331            if !legacy_bytes.is_empty() {
1332                let migrated = wal_dir.join(legacy_chunk_filename());
1333                std::fs::write(&migrated, &legacy_bytes).map_err(io_err)?;
1334            }
1335        } else if !wal_dir.exists() {
1336            // Case 3: fresh database.
1337            std::fs::create_dir_all(&wal_dir).map_err(io_err)?;
1338        }
1339        // Cases 1 + 2 share replay logic now that wal_dir is
1340        // guaranteed to exist (and may be empty for case 3).
1341        //
1342        // Two-pass replay so we don't double-apply records the
1343        // snapshot already reflects:
1344        //
1345        // 1. Find the highest commit_lsn carried by a
1346        //    checkpoint_marker across all chunks. That LSN is the
1347        //    snapshot's high-water mark — anything ≤ it is
1348        //    already in `<db_path>` and replaying it would
1349        //    DuplicateTable / double-insert.
1350        // 2. Replay only records strictly above that LSN.
1351        //
1352        // Case 2 migration (legacy single-file WAL) lands here
1353        // too: the migrated chunk has no marker so the LSN floor
1354        // is 0 and every record applies — exactly the v7.18
1355        // behaviour the migration is supposed to preserve.
1356        let chunk_paths = sorted_wal_chunks(&wal_dir).map_err(io_err)?;
1357        let mut snapshot_lsn: u64 = 0;
1358        for chunk in &chunk_paths {
1359            let bytes = std::fs::read(chunk).map_err(io_err)?;
1360            if let Ok(records) = parse_wal_records(&bytes) {
1361                for r in &records {
1362                    if r.type_byte == WAL_V4_TYPE_CHECKPOINT_MARKER {
1363                        if let Some(l) = r.commit_lsn {
1364                            if l > snapshot_lsn {
1365                                snapshot_lsn = l;
1366                            }
1367                        }
1368                    }
1369                }
1370            }
1371        }
1372        for chunk in &chunk_paths {
1373            let bytes = std::fs::read(chunk).map_err(io_err)?;
1374            if bytes.is_empty() {
1375                continue;
1376            }
1377            replay_wal_filtered(&bytes, &mut engine, snapshot_lsn)
1378                .map_err(|m| EngineError::Storage(spg_storage::StorageError::Corrupt(m)))?;
1379            if let Ok(records) = parse_wal_records(&bytes) {
1380                if let Some(max) = records.iter().filter_map(|r| r.commit_lsn).max() {
1381                    if max > initial_lsn {
1382                        initial_lsn = max;
1383                    }
1384                }
1385            }
1386        }
1387        // Open the "current" chunk — either the last existing
1388        // chunk file (so subsequent appends extend it until the
1389        // size threshold rotates) or a fresh first chunk.
1390        let now_us = wall_clock_micros();
1391        let current_chunk_path = if let Some(last) = chunk_paths.last() {
1392            last.clone()
1393        } else {
1394            wal_dir.join(chunk_filename(now_us, initial_lsn + 1))
1395        };
1396        let wal_file = OpenOptions::new()
1397            .create(true)
1398            .append(true)
1399            .read(true)
1400            .open(&current_chunk_path)
1401            .map_err(io_err)?;
1402        let wal_len = wal_file.metadata().map_err(io_err)?.len();
1403        let wal = Arc::new(WalGroup::new(wal_file, wal_len));
1404        // v7.19 P3 — spawn retention sweep thread when the
1405        // operator opted in via SPG_PITR_RETENTION_HOURS > 0.
1406        // Otherwise stay on the v7.18 behaviour (chunks accumulate
1407        // until something else — backup-pitr archival, manual
1408        // cleanup — moves them).
1409        let retention_hours = pitr_retention_hours();
1410        let (retention_shutdown, retention_thread) = if retention_hours > 0 {
1411            let shutdown = Arc::new(AtomicBool::new(false));
1412            let shutdown_clone = Arc::clone(&shutdown);
1413            let wal_dir_clone = wal_dir.clone();
1414            let check_interval = std::time::Duration::from_secs(pitr_retention_check_sec());
1415            let archive_cmd = pitr_archive_cmd();
1416            let handle = std::thread::Builder::new()
1417                .name("spg-pitr-retention".into())
1418                .spawn(move || {
1419                    retention_sweep_loop(
1420                        wal_dir_clone,
1421                        retention_hours,
1422                        check_interval,
1423                        archive_cmd,
1424                        shutdown_clone,
1425                    );
1426                })
1427                .map_err(io_err)?;
1428            (Some(shutdown), Some(handle))
1429        } else {
1430            (None, None)
1431        };
1432        // v7.20 — background flusher for SPG_SYNCHRONOUS_COMMIT=off.
1433        let (flusher_shutdown, flusher_thread) = if synchronous_commit_on() {
1434            (None, None)
1435        } else {
1436            let shutdown = Arc::new(AtomicBool::new(false));
1437            let shutdown_clone = Arc::clone(&shutdown);
1438            let group = Arc::clone(&wal);
1439            let interval = std::time::Duration::from_millis(wal_writer_delay_ms());
1440            let handle = std::thread::Builder::new()
1441                .name("spg-wal-flusher".into())
1442                .spawn(move || {
1443                    while !shutdown_clone.load(Ordering::SeqCst) {
1444                        std::thread::sleep(interval);
1445                        if let Err(e) = group.flush_now() {
1446                            eprintln!("spg-embedded: background WAL flush failed: {e:?}");
1447                        }
1448                    }
1449                    // Final drain on shutdown signal.
1450                    let _ = group.flush_now();
1451                })
1452                .map_err(io_err)?;
1453            (Some(shutdown), Some(handle))
1454        };
1455        Ok(Self {
1456            engine,
1457            commit_lsn: AtomicU64::new(initial_lsn),
1458            persistence: Some(PersistenceCtx {
1459                db_path,
1460                wal_dir,
1461                current_chunk_path,
1462                wal,
1463                checkpoint_threshold_bytes: default_checkpoint_threshold_bytes(),
1464                cold_segments_dir,
1465                cold_segment_paths,
1466                lock_path,
1467                retention_shutdown,
1468                retention_thread,
1469                flusher_shutdown,
1470                flusher_thread,
1471            }),
1472        })
1473    }
1474
1475    /// v7.1.4 — freeze the oldest `max_rows` of `table_name`'s
1476    /// hot tier into a brand-new cold-tier segment + persist
1477    /// it to disk. Same semantics as `spg-server`'s freezer
1478    /// thread; embedded just runs the freeze synchronously on
1479    /// the caller's thread. Persistence + manifest update
1480    /// happen as part of the next `checkpoint()` (or on Drop).
1481    pub fn freeze_oldest_to_cold(
1482        &mut self,
1483        table_name: &str,
1484        index_name: &str,
1485        max_rows: usize,
1486    ) -> Result<spg_storage::FreezeReport, EngineError> {
1487        let report = self
1488            .engine
1489            .freeze_oldest_to_cold(table_name, index_name, max_rows)?;
1490        if let Some(p) = &mut self.persistence {
1491            std::fs::create_dir_all(&p.cold_segments_dir).map_err(io_err)?;
1492            let final_path = p
1493                .cold_segments_dir
1494                .join(format!("seg_{}.spg", report.segment_id));
1495            let tmp_path = p
1496                .cold_segments_dir
1497                .join(format!("seg_{}.spg.tmp", report.segment_id));
1498            std::fs::write(&tmp_path, &report.segment_bytes).map_err(io_err)?;
1499            std::fs::rename(&tmp_path, &final_path).map_err(io_err)?;
1500            p.cold_segment_paths.insert(report.segment_id, final_path);
1501        }
1502        Ok(report)
1503    }
1504
1505    /// v7.1 — override the auto-checkpoint WAL-size ceiling for
1506    /// this `Database` instance. Default is
1507    /// `SPG_EMBEDDED_CHECKPOINT_BYTES` env (4 MiB if unset); the
1508    /// setter wins. No-op when the database is in-memory.
1509    pub fn set_checkpoint_threshold_bytes(&mut self, bytes: u64) {
1510        if let Some(p) = &mut self.persistence {
1511            p.checkpoint_threshold_bytes = bytes.max(1);
1512        }
1513    }
1514
1515    /// v7.1 — flush a fresh catalog snapshot to `db_path` and
1516    /// truncate the WAL. Idempotent; cheap when nothing has
1517    /// happened since the last checkpoint. No-op when the
1518    /// database is in-memory (no `db_path` configured).
1519    ///
1520    /// Called automatically when:
1521    /// - the WAL grows past
1522    ///   `SPG_EMBEDDED_CHECKPOINT_BYTES` (default 4 MiB) at the
1523    ///   end of an `execute()`, and
1524    /// - `Drop` runs (best-effort; checkpoint failure on drop is
1525    ///   logged to stderr).
1526    pub fn checkpoint(&mut self) -> Result<(), EngineError> {
1527        let snapshot = self.engine.snapshot();
1528        let Some(p) = &mut self.persistence else {
1529            return Ok(());
1530        };
1531        // Snapshot first (atomic via tmp+rename), then WAL
1532        // truncate. Same order as `spg-server`'s CHECKPOINT —
1533        // a crash between the two leaves the WAL holding
1534        // already-snapshotted ops, which replay cleanly on the
1535        // next boot (idempotent for SPG's standard DDL/DML
1536        // mutations).
1537        let tmp = {
1538            let mut t = p.db_path.clone();
1539            let mut name = t
1540                .file_name()
1541                .map(std::ffi::OsStr::to_os_string)
1542                .unwrap_or_default();
1543            name.push(".tmp");
1544            t.set_file_name(name);
1545            t
1546        };
1547        std::fs::write(&tmp, &snapshot).map_err(io_err)?;
1548        std::fs::rename(&tmp, &p.db_path).map_err(io_err)?;
1549        // v7.1.4 — refresh the manifest so the next boot can
1550        // reload cold segments alongside the snapshot. Bytes
1551        // come from the freshly-written snapshot file (= the
1552        // canonical CRC source).
1553        if !p.cold_segment_paths.is_empty() {
1554            let snap_crc = spg_crypto::crc32::crc32(&snapshot);
1555            let entries: Vec<ColdSegmentEntry> = p
1556                .cold_segment_paths
1557                .iter()
1558                .filter_map(|(&segment_id, path)| {
1559                    let bytes = std::fs::read(path).ok()?;
1560                    Some(ColdSegmentEntry {
1561                        segment_id,
1562                        path: path.clone(),
1563                        crc32: spg_crypto::crc32::crc32(&bytes),
1564                    })
1565                })
1566                .collect();
1567            let manifest = CatalogManifest {
1568                catalog_crc32: snap_crc,
1569                cold_segments: entries,
1570                wal_baseline_offset: 0,
1571            };
1572            let m_bytes = manifest.serialize();
1573            let m_path = spg_manifest_path(&p.db_path);
1574            if let Some(dir) = m_path.parent() {
1575                std::fs::create_dir_all(dir).map_err(io_err)?;
1576            }
1577            let m_tmp = {
1578                let mut t = m_path.clone();
1579                let mut name = t
1580                    .file_name()
1581                    .map(std::ffi::OsStr::to_os_string)
1582                    .unwrap_or_default();
1583                name.push(".tmp");
1584                t.set_file_name(name);
1585                t
1586            };
1587            std::fs::write(&m_tmp, &m_bytes).map_err(io_err)?;
1588            std::fs::rename(&m_tmp, &m_path).map_err(io_err)?;
1589        }
1590        // v7.19 — append a checkpoint marker to the current chunk
1591        // (anchors restore-to-time backups), then rotate to a
1592        // fresh chunk file. Old chunks stay on disk and become
1593        // input to the retention thread (P3) + spgctl backup-pitr
1594        // (P6). The single-file `set_len(0)` truncate the v7.18
1595        // path used is gone — that path silently discarded WAL
1596        // history between checkpoint and the operator's next cron
1597        // run, which is exactly what PITR was meant to fix.
1598        let marker_lsn = self.commit_lsn.load(Ordering::SeqCst);
1599        let marker_ts = wall_clock_micros();
1600        let marker = encode_v4_checkpoint_marker(marker_lsn, marker_ts, &p.db_path);
1601        // v7.20 P2 — checkpoint holds &mut self (engine
1602        // exclusive), so there are no concurrent enqueues: drain
1603        // the pending batch, append the marker, flush, then
1604        // rotate the chunk handle inside the group.
1605        p.wal.enqueue(&marker);
1606        p.wal.flush_now()?;
1607        let new_chunk_path = p.wal_dir.join(chunk_filename(marker_ts, marker_lsn + 1));
1608        let new_handle = OpenOptions::new()
1609            .create(true)
1610            .append(true)
1611            .read(true)
1612            .open(&new_chunk_path)
1613            .map_err(io_err)?;
1614        p.current_chunk_path = new_chunk_path;
1615        p.wal.rotate_file(new_handle);
1616        Ok(())
1617    }
1618
1619    /// Restore a database from a previously-captured catalog
1620    /// snapshot. Pairs with `Database::snapshot()` for
1621    /// round-tripping in-memory state without going through
1622    /// the `spg-server` WAL.
1623    pub fn restore(snapshot: &[u8]) -> Result<Self, EngineError> {
1624        let engine = Engine::restore_envelope(snapshot).map_err(|e| {
1625            EngineError::Storage(spg_storage::StorageError::Corrupt(format!("restore: {e}")))
1626        })?;
1627        Ok(Self {
1628            engine,
1629            persistence: None,
1630            commit_lsn: AtomicU64::new(0),
1631        })
1632    }
1633
1634    /// Take a catalog snapshot suitable for `Database::restore`.
1635    /// The bytes are SPG's canonical catalog envelope (FILE_MAGIC
1636    /// + version + payload); round-trips through every released
1637    /// SPG version per the STABILITY contract.
1638    #[must_use]
1639    pub fn snapshot(&self) -> Vec<u8> {
1640        self.engine.snapshot()
1641    }
1642
1643    /// Execute a SQL statement and return the engine's
1644    /// `QueryResult` verbatim. Pass-through for callers that
1645    /// want to keep PG-flavoured column/row metadata.
1646    ///
1647    /// v7.1 — when the database was opened via `open_path`,
1648    /// successful mutations are appended to the WAL + fsynced
1649    /// before the call returns. A subsequent process crash will
1650    /// recover state up to the last successful return from
1651    /// `execute()`. Read-only statements (SELECT / SHOW /
1652    /// EXPLAIN / BEGIN-COMMIT-ROLLBACK / CHECKPOINT / COMPACT
1653    /// etc.) skip the WAL entirely.
1654    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
1655        // v7.20 P2 — single-caller convenience over the buffered
1656        // path: enqueue + immediately wait. Batch size is 1 here,
1657        // so the durability behaviour (one fsync before Ok) is
1658        // identical to v7.19. Concurrent callers go through
1659        // `execute_buffered` (AsyncDatabase does) and share the
1660        // leader's fsync.
1661        let (result, ticket) = self.execute_buffered(sql)?;
1662        if let Some(t) = ticket {
1663            t.wait()?;
1664        }
1665        Ok(result)
1666    }
1667
1668    /// v7.20 P2 — group-commit write entry. Runs the engine
1669    /// mutation + encodes/enqueues the WAL record, then RETURNS
1670    /// WITHOUT waiting for the fsync. The caller must call
1671    /// [`WalTicket::wait`] before treating the write as durable
1672    /// — crucially, the caller can (and should) drop whatever
1673    /// lock guards this `Database` first, so the next writer's
1674    /// mutation overlaps this batch's fsync.
1675    ///
1676    /// `None` ticket = nothing hit the WAL (read-only statement,
1677    /// no-op DDL, or in-memory database) — the result is final
1678    /// as returned.
1679    ///
1680    /// # Errors
1681    /// Engine errors propagate unchanged. Auto-checkpoint (when
1682    /// the active chunk crosses the threshold) runs inline and
1683    /// may surface IO errors.
1684    pub fn execute_buffered(
1685        &mut self,
1686        sql: &str,
1687    ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
1688        let result = self.engine.execute(sql)?;
1689        let mut ticket = None;
1690        if self.persistence.is_some()
1691            && !sql_is_read_only(sql)
1692            && matches!(
1693                &result,
1694                QueryResult::CommandOk {
1695                    modified_catalog: true,
1696                    ..
1697                }
1698            )
1699        {
1700            // v7.18 PITR — v4 records carry commit LSN +
1701            // wall-clock micros. The crash window remains one
1702            // BATCH now instead of one record: replay re-applies
1703            // idempotently exactly as before, and a torn batch
1704            // tail drops cleanly (same torn-write handling).
1705            let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1706            let ts = wall_clock_micros();
1707            let record = encode_v4_auto_commit(sql, lsn, ts);
1708            let p = self.persistence.as_mut().expect("checked above");
1709            let seq = p.wal.enqueue(&record);
1710            ticket = Some(WalTicket {
1711                group: Arc::clone(&p.wal),
1712                seq,
1713            });
1714            if p.wal.written_len() >= p.checkpoint_threshold_bytes {
1715                self.checkpoint()?;
1716            }
1717        }
1718        Ok((result, ticket))
1719    }
1720
1721    /// v7.3.0 — typed-row variant of [`Database::query`]. Each
1722    /// row decodes into a `T: FromSpgRow` so callers don't
1723    /// pattern-match on `Value` themselves. Use [`spg_row!`] to
1724    /// generate the impl, or write it by hand.
1725    pub fn query_typed<T: FromSpgRow>(&mut self, sql: &str) -> Result<Vec<T>, EngineError> {
1726        let rows = self.query(sql)?;
1727        rows.into_iter().map(|r| T::from_spg_row(&r)).collect()
1728    }
1729
1730    /// Run a SELECT and return rows as a `Vec<Vec<Value>>` —
1731    /// strips the column-schema metadata for read-side
1732    /// ergonomics. Errors on non-Rows results (DML / DDL
1733    /// statements should go through `execute` instead).
1734    pub fn query(&mut self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
1735        match self.engine.execute(sql)? {
1736            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
1737            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1738                "query() expects a SELECT — use execute() for DML/DDL".into(),
1739            )),
1740            // v7.5.0 — QueryResult is #[non_exhaustive]; any future
1741            // variant is not a SELECT row stream, treat as Unsupported.
1742            _ => Err(EngineError::Unsupported(
1743                "query() expects a SELECT — use execute() for DML/DDL".into(),
1744            )),
1745        }
1746    }
1747
1748    /// v7.16.0 — column-aware variant of [`Self::query`].
1749    /// Returns the column schema vec alongside the rows so
1750    /// adapters (the spg-sqlx Row impl most notably) can drive
1751    /// name + type-based column lookups. Errors on non-Rows
1752    /// results identically to `query`.
1753    pub fn query_with_columns(
1754        &mut self,
1755        sql: &str,
1756    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
1757        match self.engine.execute(sql)? {
1758            QueryResult::Rows { columns, rows } => {
1759                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
1760            }
1761            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1762                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
1763            )),
1764            _ => Err(EngineError::Unsupported(
1765                "query_with_columns() expects a SELECT — use execute() for DML/DDL".into(),
1766            )),
1767        }
1768    }
1769
1770    /// v7.16.0 — column-aware variant of
1771    /// [`Self::query_prepared`]. Same shape as
1772    /// `query_with_columns` but driven from a prepared
1773    /// statement + bound params.
1774    pub fn query_prepared_with_columns(
1775        &mut self,
1776        stmt: &Statement,
1777        params: &[Value],
1778    ) -> Result<(Vec<spg_storage::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
1779        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
1780            QueryResult::Rows { columns, rows } => {
1781                Ok((columns, rows.into_iter().map(|r| r.values).collect()))
1782            }
1783            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1784                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1785            )),
1786            _ => Err(EngineError::Unsupported(
1787                "query_prepared_with_columns() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1788            )),
1789        }
1790    }
1791
1792    /// Borrow the underlying engine. Escape hatch for callers
1793    /// that need access to `spg-engine` APIs not yet surfaced
1794    /// here (transactions, EXPLAIN ANALYZE, etc.).
1795    #[must_use]
1796    pub const fn engine(&self) -> &Engine {
1797        &self.engine
1798    }
1799
1800    /// Mutable borrow of the underlying engine. Same intent as
1801    /// `engine()` but for write-side APIs (e.g. inserting
1802    /// directly through `Catalog::insert` for high-throughput
1803    /// bulk loads that bypass SQL parsing).
1804    pub const fn engine_mut(&mut self) -> &mut Engine {
1805        &mut self.engine
1806    }
1807
1808    /// v7.16.0 — parse + plan a SQL string ONCE so subsequent
1809    /// `execute_prepared` / `query_prepared` calls can re-bind
1810    /// parameters without re-parsing. The returned [`Statement`]
1811    /// is a thin handle around the AST + cached source SQL; it's
1812    /// `Clone` so the same plan can drive many bind calls
1813    /// concurrently (each call clones the AST and runs
1814    /// placeholder substitution on the clone — the cached
1815    /// plan stays intact).
1816    ///
1817    /// Plan caching follows the engine's existing version-aware
1818    /// rule: a prepared `Statement` whose statistics version
1819    /// has rolled (ANALYZE ran between prepare and execute)
1820    /// will silently re-prepare under the hood. Callers don't
1821    /// need to detect this.
1822    ///
1823    /// Placeholders in the SQL use PG's `$1`, `$2`, … convention.
1824    /// `bind`-time `Value`s are passed as a slice; arity
1825    /// mismatches surface as `EvalError::PlaceholderOutOfRange`
1826    /// at `execute_prepared` time, not here.
1827    ///
1828    /// # Errors
1829    /// Surfaces `EngineError` (parse error / plan rewrite
1830    /// failure) from the underlying `Engine::prepare`.
1831    pub fn prepare(&mut self, sql: &str) -> Result<Statement, EngineError> {
1832        // Use the cached path so repeated prepares of the same
1833        // SQL are O(1). The engine's plan cache stays shared
1834        // across all callers of this Database — a single
1835        // `PgPool`-shaped consumer (or, later, the spg-sqlx
1836        // adapter) prepares once and reaps the win on every bind.
1837        let stmt = self
1838            .engine
1839            .prepare_cached(sql)
1840            .map_err(EngineError::Parse)?;
1841        Ok(Statement {
1842            stmt,
1843            sql: sql.to_string(),
1844        })
1845    }
1846
1847    /// v7.17.0 Phase 3.P0-66 — describe a SQL string without
1848    /// executing. Returns `(parameter_oid_count, output_columns)`
1849    /// where `output_columns` is empty for non-SELECT statements
1850    /// or for SELECT shapes the describe planner can't resolve
1851    /// (JOIN / subquery / unknown table). Wraps
1852    /// `Engine::describe_prepared` so the spg-sqlx bridge can
1853    /// surface PG-shape Describe replies for
1854    /// `sqlx::query!()` compile-time validation.
1855    ///
1856    /// # Errors
1857    /// Propagates parse errors from the underlying prepare path.
1858    pub fn describe(&mut self, sql: &str) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
1859        let stmt = self
1860            .engine
1861            .prepare_cached(sql)
1862            .map_err(EngineError::Parse)?;
1863        Ok(self.engine.describe_prepared(&stmt))
1864    }
1865
1866    /// v7.16.0 — execute a prepared statement with bound
1867    /// parameters. Mirrors `Engine::execute_prepared`: clones
1868    /// the AST, substitutes `$1..$N` → `params[0..N-1]`, runs.
1869    ///
1870    /// Persistence (WAL fsync + auto-checkpoint) follows the
1871    /// same rules as `execute(sql)`: mutating statements get a
1872    /// WAL record AFTER the in-memory exec succeeds. The WAL
1873    /// record carries the substituted, bind-final SQL, so
1874    /// replay reconstructs the same row state without needing
1875    /// the original prepared `Statement` to still be alive.
1876    ///
1877    /// # Errors
1878    /// Propagates engine errors. Param arity mismatch surfaces
1879    /// as `EvalError::PlaceholderOutOfRange`.
1880    pub fn execute_prepared(
1881        &mut self,
1882        stmt: &Statement,
1883        params: &[Value],
1884    ) -> Result<QueryResult, EngineError> {
1885        let (result, ticket) = self.execute_prepared_buffered(stmt, params)?;
1886        if let Some(t) = ticket {
1887            t.wait()?;
1888        }
1889        Ok(result)
1890    }
1891
1892    /// v7.20 P2 — group-commit variant of
1893    /// [`Database::execute_prepared`]. Same contract as
1894    /// [`Database::execute_buffered`]: mutation + enqueue happen
1895    /// here; the caller waits on the ticket AFTER releasing
1896    /// whatever lock guards this `Database`.
1897    ///
1898    /// # Errors
1899    /// Engine errors propagate unchanged; inline auto-checkpoint
1900    /// may surface IO errors.
1901    pub fn execute_prepared_buffered(
1902        &mut self,
1903        stmt: &Statement,
1904        params: &[Value],
1905    ) -> Result<(QueryResult, Option<WalTicket>), EngineError> {
1906        let result = self.engine.execute_prepared(stmt.stmt.clone(), params)?;
1907        let mut ticket = None;
1908        // WAL persistence on the bind-final SQL. Build the
1909        // canonical Display form by re-printing the
1910        // placeholder-substituted statement (cheap — the AST
1911        // is already in hand from execute_prepared's internal
1912        // clone) so replay's path is identical to the
1913        // simple-query path.
1914        if self.persistence.is_some()
1915            && matches!(
1916                &result,
1917                QueryResult::CommandOk {
1918                    modified_catalog: true,
1919                    ..
1920                }
1921            )
1922        {
1923            let mut wal_stmt = stmt.stmt.clone();
1924            crate::wal_render_with_params(&mut wal_stmt, params);
1925            let canonical = format!("{wal_stmt}");
1926            // v7.18 PITR — prepared path also emits v4 records so
1927            // LSN/timestamp coverage is uniform across simple and
1928            // extended query.
1929            let lsn = self.commit_lsn.fetch_add(1, Ordering::SeqCst) + 1;
1930            let ts = wall_clock_micros();
1931            let record = encode_v4_auto_commit(&canonical, lsn, ts);
1932            let p = self.persistence.as_mut().expect("checked above");
1933            let seq = p.wal.enqueue(&record);
1934            ticket = Some(WalTicket {
1935                group: Arc::clone(&p.wal),
1936                seq,
1937            });
1938            if p.wal.written_len() >= p.checkpoint_threshold_bytes {
1939                self.checkpoint()?;
1940            }
1941        }
1942        Ok((result, ticket))
1943    }
1944
1945    /// v7.16.0 — run a prepared SELECT with bound params and
1946    /// return rows as `Vec<Vec<Value>>`, matching `query()`
1947    /// shape. SELECTs are read-only so this never writes the
1948    /// WAL.
1949    ///
1950    /// # Errors
1951    /// Returns `Unsupported` if the prepared statement isn't a
1952    /// SELECT (use `execute_prepared` for DML/DDL).
1953    pub fn query_prepared(
1954        &mut self,
1955        stmt: &Statement,
1956        params: &[Value],
1957    ) -> Result<Vec<Vec<Value>>, EngineError> {
1958        match self.engine.execute_prepared(stmt.stmt.clone(), params)? {
1959            QueryResult::Rows { rows, .. } => Ok(rows.into_iter().map(|r| r.values).collect()),
1960            QueryResult::CommandOk { .. } => Err(EngineError::Unsupported(
1961                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1962            )),
1963            _ => Err(EngineError::Unsupported(
1964                "query_prepared() expects a SELECT — use execute_prepared() for DML/DDL".into(),
1965            )),
1966        }
1967    }
1968
1969    /// v7.18 — parse + plan a SQL string against a
1970    /// `CatalogSnapshot`. Mirror of [`Database::prepare`] for the
1971    /// readonly fan-out path: no writer lock taken, no WAL write,
1972    /// no plan-cache mutation. Static-on-`Self` so callers can
1973    /// dispatch against a snapshot without an `&mut Database`
1974    /// borrow — `AsyncReadHandle::prepare` in spg-embedded-tokio
1975    /// is the load-bearing consumer.
1976    ///
1977    /// # Errors
1978    /// Propagates `EngineError::Parse` from the parser.
1979    pub fn prepare_on_snapshot(
1980        snapshot: &CatalogSnapshot,
1981        sql: &str,
1982    ) -> Result<Statement, EngineError> {
1983        let stmt =
1984            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
1985        Ok(Statement {
1986            stmt,
1987            sql: sql.to_string(),
1988        })
1989    }
1990
1991    /// v7.18 — execute a prepared `Statement` against a
1992    /// `CatalogSnapshot` with bound params. Mirror of
1993    /// [`Database::execute_prepared`] on the readonly path:
1994    /// writes / DDL hit `EngineError::WriteRequired`. No WAL
1995    /// write, no writer lock, multiple snapshots can run
1996    /// concurrently — the snapshot is immutable from prepare time.
1997    ///
1998    /// # Errors
1999    /// Surfaces `EngineError::WriteRequired` for non-readonly
2000    /// statements; propagates other engine errors.
2001    pub fn execute_prepared_on_snapshot(
2002        snapshot: &CatalogSnapshot,
2003        stmt: &Statement,
2004        params: &[Value],
2005    ) -> Result<QueryResult, EngineError> {
2006        spg_engine::Engine::execute_readonly_prepared_on_snapshot(
2007            snapshot,
2008            stmt.stmt.clone(),
2009            params,
2010        )
2011    }
2012
2013    /// v7.18 — describe a SQL string against a
2014    /// `CatalogSnapshot`. Mirror of [`Database::describe`] on
2015    /// the readonly path. Pure function on the snapshot's
2016    /// catalog; safe to call from any thread.
2017    ///
2018    /// # Errors
2019    /// Propagates `EngineError::Parse` from the parser.
2020    pub fn describe_on_snapshot(
2021        snapshot: &CatalogSnapshot,
2022        sql: &str,
2023    ) -> Result<(Vec<u32>, Vec<ColumnSchema>), EngineError> {
2024        let stmt =
2025            spg_engine::Engine::prepare_on_snapshot(snapshot, sql).map_err(EngineError::Parse)?;
2026        Ok(spg_engine::Engine::describe_prepared_on_snapshot(
2027            snapshot, &stmt,
2028        ))
2029    }
2030
2031    /// v7.2.0 — run `body` inside an implicit `BEGIN` /
2032    /// `COMMIT` pair. The body receives `&mut Database` so it
2033    /// can `execute()` / `query()` like any other code path;
2034    /// the only difference is that every write in the body
2035    /// lands inside one transaction, and a returned `Err` from
2036    /// the body triggers `ROLLBACK` before the error propagates.
2037    ///
2038    /// Nested calls are not supported — SPG's transaction
2039    /// model is single-writer with explicit `BEGIN` /
2040    /// `COMMIT` / `ROLLBACK`, and a nested `with_transaction`
2041    /// would hit `EngineError::Unsupported("nested
2042    /// transaction")` at the inner `BEGIN`.
2043    pub fn with_transaction<R, F>(&mut self, body: F) -> Result<R, EngineError>
2044    where
2045        F: FnOnce(&mut Self) -> Result<R, EngineError>,
2046    {
2047        self.execute("BEGIN")?;
2048        match body(self) {
2049            Ok(value) => {
2050                self.execute("COMMIT")?;
2051                Ok(value)
2052            }
2053            Err(e) => {
2054                // Best-effort rollback. If ROLLBACK itself
2055                // fails (rare — the engine reports it via
2056                // `Unsupported` only when there's no active
2057                // TX, which can't happen here) we surface the
2058                // original body error, not the rollback error.
2059                let _ = self.execute("ROLLBACK");
2060                Err(e)
2061            }
2062        }
2063    }
2064}
2065
2066impl Default for Database {
2067    fn default() -> Self {
2068        Self::open_in_memory()
2069    }
2070}
2071
2072/// v7.7.5 — observability snapshot returned by
2073/// [`Database::metrics`]. Plain data, no allocations beyond
2074/// what the struct itself takes; cheap to construct and
2075/// cheap to serialise.
2076#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2077#[non_exhaustive]
2078pub struct EmbeddedMetrics {
2079    /// Total live row count across every user table (hot
2080    /// tier only — cold-tier rows live in segment files).
2081    pub hot_rows: u64,
2082    /// Sum of `Table::hot_bytes` across every user table.
2083    /// Tracks against the freezer's `hot_tier_bytes` budget.
2084    pub hot_bytes: u64,
2085    /// Number of cold-tier segments registered in the catalog.
2086    /// Includes tombstoned slots (segments retired by
2087    /// compaction whose disk file may still be on disk).
2088    pub cold_segments: u64,
2089    /// User-table count (excludes any future engine-managed
2090    /// internal tables).
2091    pub tables: u64,
2092    /// WAL size at last `execute()` / `checkpoint()`. Zero
2093    /// when the database is in-memory.
2094    pub wal_bytes: u64,
2095    /// `true` when the database was opened with `open_path` —
2096    /// i.e. WAL + checkpoint persistence is active.
2097    pub persistent: bool,
2098}
2099
2100/// v7.2.1 — handle returned by `spawn_background_freezer`.
2101/// Drop signals the worker thread to wind down + joins it,
2102/// so a `Database` (or its shared `Arc<Mutex<Database>>`)
2103/// can safely drop after the handle does.
2104#[must_use = "the background freezer keeps running until this handle is dropped"]
2105#[derive(Debug)]
2106pub struct FreezerHandle {
2107    shutdown: Arc<AtomicBool>,
2108    join: Option<JoinHandle<()>>,
2109}
2110
2111impl FreezerHandle {
2112    /// v7.2.1 — request the worker stop + join. Idempotent;
2113    /// safe to call from `Drop` (which also calls it).
2114    pub fn stop(&mut self) {
2115        self.shutdown.store(true, Ordering::Release);
2116        if let Some(h) = self.join.take() {
2117            let _ = h.join();
2118        }
2119    }
2120}
2121
2122impl Drop for FreezerHandle {
2123    fn drop(&mut self) {
2124        self.stop();
2125    }
2126}
2127
2128/// v7.2.1 — knobs for `Database::spawn_background_freezer`.
2129#[derive(Debug, Clone)]
2130pub struct FreezerOptions {
2131    /// Tick interval. Worker wakes every `tick`, checks the
2132    /// catalog's `hot_tier_bytes`, and freezes if over budget.
2133    pub tick: Duration,
2134    /// Hot-tier byte budget. Exceeded → next tick freezes the
2135    /// largest table's oldest `batch_rows` rows into a new
2136    /// cold segment.
2137    pub hot_tier_bytes: u64,
2138    /// Max rows the freezer demotes per fire.
2139    pub batch_rows: usize,
2140    /// v7.7.4 — auto-compact threshold. When the catalog has
2141    /// at least this many cold segments across all tables, the
2142    /// freezer fires a compaction pass after its next freeze.
2143    /// Set to `usize::MAX` to disable auto-compact entirely;
2144    /// the default is `64`, matching the `spg-server` operating
2145    /// point for SPG_COLD_COMPACT_SEGMENT_THRESHOLD.
2146    pub compact_when_segments_exceed: usize,
2147    /// v7.7.4 — target segment size for compaction merges,
2148    /// in bytes. Default 64 MiB, mirroring `spg-server`. Small
2149    /// segments below this size are merge candidates;
2150    /// segments at or above stay untouched.
2151    pub compact_target_bytes: u64,
2152}
2153
2154impl Default for FreezerOptions {
2155    fn default() -> Self {
2156        // Match the `spg-server` freezer's default operating
2157        // point (SPG_HOT_TIER_BYTES = 4 GiB, batch 1000 rows,
2158        // tick every 1 s) so embedded behaviour is predictable
2159        // for operators familiar with the server.
2160        Self {
2161            tick: Duration::from_secs(1),
2162            hot_tier_bytes: 4 * 1024 * 1024 * 1024,
2163            batch_rows: 1000,
2164            compact_when_segments_exceed: 64,
2165            compact_target_bytes: 64 * 1024 * 1024,
2166        }
2167    }
2168}
2169
2170impl Database {
2171    /// v7.7.4 — observe the catalog's cold-segment count.
2172    /// Useful for tests + dashboards that want to verify
2173    /// auto-compaction is firing.
2174    #[must_use]
2175    pub fn cold_segment_count(&self) -> usize {
2176        self.engine.catalog().cold_segment_count()
2177    }
2178
2179    /// v7.7.5 — observability snapshot. Returns a point-in-time
2180    /// view of the engine + persistence counters. Cheap (no
2181    /// locks beyond the existing `&self` borrow), so safe to
2182    /// call from a hot metrics-scrape path.
2183    ///
2184    /// Fields mirror the operational dashboard
2185    /// [`spg-server`](https://crates.io/crates/spg-server) exposes,
2186    /// minus the network counters that don't apply to embedded.
2187    #[must_use]
2188    pub fn metrics(&self) -> EmbeddedMetrics {
2189        let cat = self.engine.catalog();
2190        let mut hot_rows: u64 = 0;
2191        let mut hot_bytes: u64 = 0;
2192        for name in cat.table_names() {
2193            if let Some(t) = cat.get(&name) {
2194                hot_rows = hot_rows.saturating_add(t.row_count() as u64);
2195                hot_bytes = hot_bytes.saturating_add(t.hot_bytes());
2196            }
2197        }
2198        let (wal_bytes, persistent) = match &self.persistence {
2199            Some(p) => (p.wal.written_len(), true),
2200            None => (0, false),
2201        };
2202        EmbeddedMetrics {
2203            hot_rows,
2204            hot_bytes,
2205            cold_segments: cat.cold_segment_count() as u64,
2206            tables: cat.table_count() as u64,
2207            wal_bytes,
2208            persistent,
2209        }
2210    }
2211
2212    /// v7.2.1 — spawn a background thread that periodically
2213    /// runs `freeze_oldest_to_cold` when the catalog-wide hot
2214    /// tier exceeds `opts.hot_tier_bytes`. The `Arc<Mutex<_>>`
2215    /// pattern matches the v7.2 sharing story: callers wrap
2216    /// their `Database` in `Arc::new(Mutex::new(db))` once,
2217    /// then clone the Arc for the worker + for foreground
2218    /// access. Return value is a handle whose `Drop` joins the
2219    /// worker.
2220    ///
2221    /// Picks the freeze target the same way `spg-server`'s
2222    /// freezer does: largest-`hot_bytes` user table with at
2223    /// least one BTree integer-PK index. Tables without a
2224    /// freezable index are skipped silently.
2225    pub fn spawn_background_freezer(
2226        db: Arc<Mutex<Database>>,
2227        opts: FreezerOptions,
2228    ) -> FreezerHandle {
2229        let shutdown = Arc::new(AtomicBool::new(false));
2230        let shutdown_for_thread = Arc::clone(&shutdown);
2231        let join = thread::Builder::new()
2232            .name("spg-embedded-freezer".into())
2233            .spawn(move || {
2234                background_freezer_loop(db, opts, shutdown_for_thread);
2235            })
2236            .expect("spawn background freezer thread");
2237        FreezerHandle {
2238            shutdown,
2239            join: Some(join),
2240        }
2241    }
2242}
2243
2244/// v7.2.1 — the freezer's main loop, factored out so the
2245/// `Database::spawn_background_freezer` path stays readable.
2246fn background_freezer_loop(
2247    db: Arc<Mutex<Database>>,
2248    opts: FreezerOptions,
2249    shutdown: Arc<AtomicBool>,
2250) {
2251    // Sleep in short slices so a shutdown request resolves
2252    // quickly (vs sleeping the full tick).
2253    let slice = Duration::from_millis(50.min(opts.tick.as_millis() as u64));
2254    let mut last_tick = std::time::Instant::now();
2255    loop {
2256        if shutdown.load(Ordering::Acquire) {
2257            return;
2258        }
2259        thread::sleep(slice);
2260        if last_tick.elapsed() < opts.tick {
2261            continue;
2262        }
2263        last_tick = std::time::Instant::now();
2264        let Ok(mut guard) = db.lock() else {
2265            return;
2266        };
2267        if guard.engine.catalog().hot_tier_bytes() <= opts.hot_tier_bytes {
2268            continue;
2269        }
2270        let Some((table, index)) = pick_freeze_target(&guard) else {
2271            continue;
2272        };
2273        let row_count = guard
2274            .engine
2275            .catalog()
2276            .get(&table)
2277            .map_or(0, spg_storage::Table::row_count);
2278        let to_freeze = opts.batch_rows.min(row_count);
2279        if to_freeze == 0 {
2280            continue;
2281        }
2282        if let Err(e) = guard.freeze_oldest_to_cold(&table, &index, to_freeze) {
2283            eprintln!("spg-embedded: background freeze on {table}.{index} failed: {e:?}");
2284            continue;
2285        }
2286        // v7.7.4 — auto-compact. If the catalog now carries
2287        // more cold segments than the configured threshold,
2288        // run a single compaction pass. Failures are reported
2289        // but don't kill the loop; the next tick will retry.
2290        let count = guard.engine.catalog().cold_segment_count();
2291        if count > opts.compact_when_segments_exceed {
2292            if let Err(e) = guard
2293                .engine
2294                .compact_cold_segments_with_target(opts.compact_target_bytes)
2295            {
2296                eprintln!(
2297                    "spg-embedded: background compact failed (segments={count}, \
2298                     threshold={}): {e:?}",
2299                    opts.compact_when_segments_exceed,
2300                );
2301            }
2302        }
2303    }
2304}
2305
2306/// v7.2.1 — pick the highest-`hot_bytes` user table with a
2307/// BTree integer-PK index. Returns `(table, index_name)` so the
2308/// caller can dispatch through `freeze_oldest_to_cold`.
2309fn pick_freeze_target(db: &Database) -> Option<(String, String)> {
2310    let cat = db.engine.catalog();
2311    let mut best: Option<(String, String, u64)> = None;
2312    for name in cat.table_names() {
2313        let Some(t) = cat.get(&name) else { continue };
2314        if t.row_count() == 0 {
2315            continue;
2316        }
2317        let cols = &t.schema().columns;
2318        let Some(idx) = t.indices().iter().find(|i| {
2319            matches!(i.kind, spg_storage::IndexKind::BTree(_))
2320                && i.column_position < cols.len()
2321                && matches!(
2322                    cols[i.column_position].ty,
2323                    spg_storage::DataType::SmallInt
2324                        | spg_storage::DataType::Int
2325                        | spg_storage::DataType::BigInt
2326                )
2327        }) else {
2328            continue;
2329        };
2330        let hot = t.hot_bytes();
2331        match best {
2332            None => best = Some((name, idx.name.clone(), hot)),
2333            Some((_, _, best_hot)) if hot > best_hot => {
2334                best = Some((name, idx.name.clone(), hot));
2335            }
2336            _ => {}
2337        }
2338    }
2339    best.map(|(t, i, _)| (t, i))
2340}
2341
2342/// v7.7.6 — replay the first `to_seq` records of the WAL at
2343/// `wal_path` into a fresh engine and write the resulting
2344/// catalog snapshot to `out_db_path`. Same semantics as
2345/// `spg revert --wal … --to-seq N --out …` from the CLI:
2346///
2347///   - `to_seq == 0` → snapshot is the empty catalog
2348///   - WAL records beyond `to_seq` are not applied
2349///   - durability-checkpoint markers (v3 type 0x02) are
2350///     consumed without counting against the budget
2351///
2352/// Returns the number of statements actually applied
2353/// (`≤ to_seq`). The output snapshot is byte-identical to
2354/// what `Database::open_path(out_db_path)` would consume on
2355/// a subsequent open.
2356///
2357/// This is the "rewind" operator for an embedded database
2358/// that has been corrupted by a poison statement or a
2359/// half-applied migration. Pair with `cold_segment_paths`
2360/// preservation if your cold-tier files are still on disk.
2361///
2362/// # Errors
2363///
2364/// - `wal_path` unreadable or truncated mid-record
2365/// - WAL record decodes to invalid UTF-8 SQL
2366/// - WAL record's SQL is rejected by the engine
2367/// - `out_db_path` unwritable
2368pub fn revert_wal_to_seq(
2369    wal_path: impl AsRef<Path>,
2370    to_seq: u64,
2371    out_db_path: impl AsRef<Path>,
2372) -> Result<u64, EngineError> {
2373    // v7.19 — accept either a single-file legacy WAL (v7.18 and
2374    // earlier layout) or a chunked WAL directory (v7.19+). For a
2375    // directory, concatenate every `.wal` chunk in sorted order
2376    // — the same order open_path replays them in — so revert
2377    // sees the full record stream.
2378    let path = wal_path.as_ref();
2379    let wal_bytes = if path.is_dir() {
2380        let mut combined = Vec::new();
2381        let chunks = sorted_wal_chunks(path).map_err(io_err)?;
2382        for chunk in chunks {
2383            let bytes = std::fs::read(&chunk).map_err(io_err)?;
2384            combined.extend_from_slice(&bytes);
2385        }
2386        combined
2387    } else {
2388        std::fs::read(path).map_err(io_err)?
2389    };
2390    let mut engine = Engine::new();
2391    let mut applied = 0u64;
2392    let mut cur = 0usize;
2393    while cur < wal_bytes.len() && applied < to_seq {
2394        let (sql_bytes, total) = decode_wal_record(&wal_bytes[cur..])?;
2395        cur += total;
2396        if sql_bytes.is_empty() {
2397            continue;
2398        }
2399        let sql = core::str::from_utf8(&sql_bytes).map_err(|e| {
2400            EngineError::Storage(spg_storage::StorageError::Corrupt(format!(
2401                "WAL record at offset {cur}: non-UTF-8 SQL: {e}"
2402            )))
2403        })?;
2404        engine.execute(sql)?;
2405        applied += 1;
2406    }
2407    let snapshot = engine.snapshot();
2408    std::fs::write(out_db_path.as_ref(), &snapshot).map_err(io_err)?;
2409    Ok(applied)
2410}
2411
2412/// v7.7.6 — decode one WAL record from a byte tail. Returns
2413/// `(sql_bytes, header_plus_payload_len)`. Handles the three
2414/// on-disk formats (v1 / v2 / v3) the same way the CLI
2415/// `decode_one_record` and the engine's `replay_wal_bytes`
2416/// do. CRCs are not re-validated; the caller's intent is
2417/// "apply", not "validate".
2418fn decode_wal_record(tail: &[u8]) -> Result<(Vec<u8>, usize), EngineError> {
2419    if tail.len() < 4 {
2420        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
2421            format!("WAL truncated record: {} < 4 header bytes", tail.len()),
2422        )));
2423    }
2424    let raw_len = u32::from_le_bytes(tail[..4].try_into().unwrap());
2425    let is_v2 = raw_len & WAL_V2_SENTINEL != 0;
2426    let is_v3 = is_v2 && (raw_len & WAL_V3_FLAG != 0);
2427    let len_mask = if is_v3 {
2428        !(WAL_V2_SENTINEL | WAL_V3_FLAG)
2429    } else {
2430        !WAL_V2_SENTINEL
2431    };
2432    let rec_len = (raw_len & len_mask) as usize;
2433    let header_len = if is_v3 {
2434        9
2435    } else if is_v2 {
2436        8
2437    } else {
2438        4
2439    };
2440    if tail.len() < header_len + rec_len {
2441        return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
2442            format!(
2443                "WAL truncated record: header+payload {} > available {}",
2444                header_len + rec_len,
2445                tail.len()
2446            ),
2447        )));
2448    }
2449    if is_v3 {
2450        let type_byte = tail[8];
2451        // v3 type 0x01 = auto_commit_sql (payload = SQL).
2452        // v3 type 0x02 = durability marker (no SQL to apply).
2453        // v4 type 0x10 = auto_commit_sql with 16-byte (lsn, ts)
2454        //                prefix between type and SQL — strip
2455        //                the prefix so the caller still sees raw
2456        //                SQL bytes.
2457        // Anything else is unknown.
2458        if type_byte == WAL_V3_TYPE_AUTO_COMMIT_SQL {
2459            let payload = &tail[header_len..header_len + rec_len];
2460            return Ok((payload.to_vec(), header_len + rec_len));
2461        }
2462        if type_byte == WAL_V4_TYPE_AUTO_COMMIT_SQL {
2463            let v4_total = header_len + WAL_V4_EXTRA_HEADER + rec_len;
2464            if tail.len() < v4_total {
2465                return Err(EngineError::Storage(spg_storage::StorageError::Corrupt(
2466                    format!(
2467                        "WAL truncated v4 record: header+payload {v4_total} > available {}",
2468                        tail.len()
2469                    ),
2470                )));
2471            }
2472            let sql_start = header_len + WAL_V4_EXTRA_HEADER;
2473            let sql_bytes = tail[sql_start..sql_start + rec_len].to_vec();
2474            return Ok((sql_bytes, v4_total));
2475        }
2476        // Caller treats empty payload as a skip-marker.
2477        return Ok((Vec::new(), header_len + rec_len));
2478    }
2479    let payload = &tail[header_len..header_len + rec_len];
2480    Ok((payload.to_vec(), header_len + rec_len))
2481}
2482
2483impl Drop for Database {
2484    fn drop(&mut self) {
2485        // v7.1 — best-effort final checkpoint when a persistent
2486        // Database leaves scope. Failures here go to stderr so
2487        // operators see them, but Drop can't propagate errors —
2488        // the WAL itself is already durable, so a checkpoint
2489        // miss only means the next boot replays a few more
2490        // records than strictly necessary.
2491        if self.persistence.is_some() {
2492            if let Err(e) = self.checkpoint() {
2493                eprintln!(
2494                    "spg-embedded: final checkpoint on Drop failed: {e:?} \
2495                     (WAL is intact; next open_path will replay)"
2496                );
2497            }
2498        }
2499        // v7.19 P3 / v7.20 — signal the retention + flusher
2500        // threads to exit, then wait for them. Done BEFORE the
2501        // lock release so background threads don't outlive the
2502        // database handle. The flusher drains the pending batch
2503        // on its way out (final flush_now in the thread body),
2504        // so `SPG_SYNCHRONOUS_COMMIT=off` never loses confirmed
2505        // commits across a clean shutdown.
2506        if let Some(ctx) = self.persistence.as_mut() {
2507            if let Some(shutdown) = ctx.retention_shutdown.take() {
2508                shutdown.store(true, Ordering::SeqCst);
2509            }
2510            if let Some(handle) = ctx.retention_thread.take() {
2511                let _ = handle.join();
2512            }
2513            if let Some(shutdown) = ctx.flusher_shutdown.take() {
2514                shutdown.store(true, Ordering::SeqCst);
2515            }
2516            if let Some(handle) = ctx.flusher_thread.take() {
2517                let _ = handle.join();
2518            }
2519        }
2520        // v7.17.0 Phase 6.2 — release the cross-process lock on
2521        // clean shutdown. Failure is logged but never panics;
2522        // the operator can clear a stale lock via
2523        // `Database::force_unlock` if a crash kept the
2524        // directory around.
2525        if let Some(ctx) = &self.persistence
2526            && ctx.lock_path.exists()
2527        {
2528            if let Err(e) = std::fs::remove_dir(&ctx.lock_path) {
2529                eprintln!(
2530                    "spg-embedded: lock release on Drop failed for {}: {e:?}",
2531                    ctx.lock_path.display()
2532                );
2533            }
2534        }
2535    }
2536}
2537
2538impl Database {
2539    /// v7.17.0 Phase 6.2 — clear a stale cross-process lock.
2540    /// Use when a previous process crashed mid-session and
2541    /// left `<db_path>.lock` behind. Operators should confirm
2542    /// no other process is currently using the database before
2543    /// calling this — SPG cannot fingerprint stale-vs-live
2544    /// without a libc dep, which would violate spg-embedded's
2545    /// zero-deps charter.
2546    pub fn force_unlock(db_path: impl AsRef<Path>) -> Result<(), EngineError> {
2547        let lock_path = {
2548            let mut p = db_path.as_ref().to_path_buf();
2549            let name = p
2550                .file_name()
2551                .map(|n| {
2552                    let mut s = n.to_os_string();
2553                    s.push(".lock");
2554                    s
2555                })
2556                .unwrap_or_else(|| std::ffi::OsString::from(".lock"));
2557            p.set_file_name(name);
2558            p
2559        };
2560        if !lock_path.exists() {
2561            return Ok(());
2562        }
2563        std::fs::remove_dir(&lock_path).map_err(io_err)
2564    }
2565}
2566
2567/// v7.1 — turn a `std::io::Error` into the workspace's
2568/// `EngineError` shape. `EngineError::Storage(Corrupt(_))` is
2569/// the closest existing variant — io failures during boot or
2570/// during a WAL append surface as a storage-layer fault to
2571/// callers, which keeps the public error enum unchanged.
2572fn io_err(e: std::io::Error) -> EngineError {
2573    EngineError::Storage(spg_storage::StorageError::Corrupt(format!("io: {e}")))
2574}
2575
2576/// v7.2.2 — `Database` is `Send`, so the recommended sharing
2577/// pattern for multi-threaded callers is `Arc<Mutex<Database>>`:
2578///
2579/// ```no_run
2580/// use std::sync::{Arc, Mutex};
2581/// use spg_embedded::Database;
2582///
2583/// let db = Database::open_in_memory();
2584/// let shared = Arc::new(Mutex::new(db));
2585/// let shared_for_worker = Arc::clone(&shared);
2586/// std::thread::spawn(move || {
2587///     let mut guard = shared_for_worker.lock().unwrap();
2588///     guard.execute("INSERT INTO t VALUES (1)").unwrap();
2589/// });
2590/// ```
2591///
2592/// Internal `RwLock`-wrapped state — letting many threads
2593/// hold concurrent `&Database` for `SELECT` without contending
2594/// — is parked as STABILITY § "Out of v7.2"; multi-reader
2595/// embedded throughput needs a planner-side change to release
2596/// the engine read lock between scans, which is the v7.x
2597/// "Choice A" line of work already documented in v6.9.1's
2598/// carve-out.
2599#[allow(dead_code)]
2600fn _database_is_send() {
2601    fn assert_send<T: Send>() {}
2602    assert_send::<Database>();
2603}
2604
2605/// v6.10.3 — trait that maps a row's columns onto a user
2606/// struct's fields. v7.3.0 ships the [`spg_row!`] declarative
2607/// macro that generates `impl FromSpgRow for YourStruct` from
2608/// a struct definition (no proc-macro, no syn/quote/
2609/// proc-macro2 deps — the workspace's "0 external deps"
2610/// policy holds).
2611///
2612/// Implementors map a row's columns onto a user struct's
2613/// fields. Errors surface as `EngineError::Unsupported` so the
2614/// caller's error type stays uniform.
2615pub trait FromSpgRow: Sized {
2616    /// Decode one query result row into `Self`. Called once per
2617    /// row by [`Database::query_typed`]. The slice length equals
2618    /// the number of columns in the SELECT projection.
2619    fn from_spg_row(row: &[Value]) -> Result<Self, EngineError>;
2620}
2621
2622/// v7.3.0 — declarative macro that generates `FromSpgRow` impl
2623/// for a user struct. Avoids proc-macro deps
2624/// (syn/quote/proc-macro2) so the workspace's 0-deps policy
2625/// holds; the trade-off vs `#[derive(SpgRow)]` is that the
2626/// macro takes the entire struct definition (fields + types)
2627/// as input rather than annotating an existing struct.
2628///
2629/// ```no_run
2630/// use spg_embedded::{Database, spg_row, FromSpgRow};
2631///
2632/// spg_row! {
2633///     pub struct User {
2634///         pub id: i32,
2635///         pub name: String,
2636///     }
2637/// }
2638///
2639/// let mut db = Database::open_in_memory();
2640/// db.execute("CREATE TABLE users (id INT NOT NULL, name TEXT)").unwrap();
2641/// db.execute("INSERT INTO users VALUES (1, 'alice')").unwrap();
2642/// let users: Vec<User> = db.query_typed("SELECT id, name FROM users").unwrap();
2643/// ```
2644///
2645/// Supported field types: `i16`, `i32`, `i64`, `f32`, `f64`,
2646/// `bool`, `String`, `Vec<f32>` (for `VECTOR(N)` columns),
2647/// `Option<T>` of any of the above.
2648#[macro_export]
2649macro_rules! spg_row {
2650    (
2651        $(#[$meta:meta])*
2652        $vis:vis struct $name:ident {
2653            $(
2654                $(#[$fmeta:meta])*
2655                $fvis:vis $field:ident : $ty:ty,
2656            )*
2657        }
2658    ) => {
2659        $(#[$meta])*
2660        #[derive(Debug, Clone)]
2661        $vis struct $name {
2662            $(
2663                $(#[$fmeta])*
2664                $fvis $field : $ty,
2665            )*
2666        }
2667
2668        impl $crate::FromSpgRow for $name {
2669            fn from_spg_row(row: &[$crate::Value]) -> ::core::result::Result<Self, $crate::EngineError> {
2670                let mut __spg_row_iter = row.iter();
2671                $(
2672                    let $field: $ty = {
2673                        let v = __spg_row_iter
2674                            .next()
2675                            .ok_or_else(|| $crate::EngineError::Unsupported(
2676                                ::std::format!(
2677                                    "spg_row! {}: missing column for field `{}`",
2678                                    ::core::stringify!($name),
2679                                    ::core::stringify!($field)
2680                                )
2681                            ))?;
2682                        <$ty as $crate::FromSpgValue>::from_spg_value(v)
2683                            .map_err(|e| $crate::EngineError::Unsupported(
2684                                ::std::format!(
2685                                    "spg_row! {}: column `{}`: {}",
2686                                    ::core::stringify!($name),
2687                                    ::core::stringify!($field),
2688                                    e
2689                                )
2690                            ))?
2691                    };
2692                )*
2693                Ok(Self { $($field,)* })
2694            }
2695        }
2696    };
2697}
2698
2699/// v7.3.0 — per-column decoder used by `spg_row!`. Surface
2700/// covers every numeric / text / bytes / bool variant in
2701/// `Value`, plus `Option<T>` for nullable columns.
2702pub trait FromSpgValue: Sized {
2703    /// Decode one cell into `Self`. The returned `&'static str`
2704    /// is a short diagnostic for type mismatches (e.g. `"expected
2705    /// integer, got TEXT"`); callers wrap it into their own
2706    /// error type.
2707    fn from_spg_value(v: &Value) -> Result<Self, &'static str>;
2708}
2709
2710macro_rules! impl_from_value_int {
2711    ($($t:ty),* $(,)?) => {
2712        $(
2713            impl FromSpgValue for $t {
2714                fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2715                    match v {
2716                        Value::SmallInt(n) => <$t>::try_from(*n).map_err(|_| "SmallInt does not fit target int type"),
2717                        Value::Int(n)      => <$t>::try_from(*n).map_err(|_| "Int does not fit target int type"),
2718                        Value::BigInt(n)   => <$t>::try_from(*n).map_err(|_| "BigInt does not fit target int type"),
2719                        Value::Null        => Err("NULL in non-Option int column"),
2720                        _ => Err("non-integer value in int column"),
2721                    }
2722                }
2723            }
2724        )*
2725    };
2726}
2727impl_from_value_int!(i16, i32, i64);
2728
2729impl FromSpgValue for f32 {
2730    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2731        match v {
2732            Value::Float(f) => Ok(*f as f32),
2733            Value::Null => Err("NULL in non-Option float column"),
2734            _ => Err("non-float value in float column"),
2735        }
2736    }
2737}
2738
2739impl FromSpgValue for f64 {
2740    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2741        match v {
2742            Value::Float(f) => Ok(*f),
2743            Value::Null => Err("NULL in non-Option float column"),
2744            _ => Err("non-float value in float column"),
2745        }
2746    }
2747}
2748
2749impl FromSpgValue for bool {
2750    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2751        match v {
2752            Value::Bool(b) => Ok(*b),
2753            Value::Null => Err("NULL in non-Option bool column"),
2754            _ => Err("non-bool value in bool column"),
2755        }
2756    }
2757}
2758
2759impl FromSpgValue for String {
2760    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2761        match v {
2762            Value::Text(s) => Ok(s.clone()),
2763            Value::Null => Err("NULL in non-Option text column"),
2764            _ => Err("non-text value in String column"),
2765        }
2766    }
2767}
2768
2769impl FromSpgValue for Vec<f32> {
2770    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2771        match v {
2772            Value::Vector(xs) => Ok(xs.clone()),
2773            Value::Null => Err("NULL in non-Option vector column"),
2774            _ => Err("non-vector value in Vec<f32> column"),
2775        }
2776    }
2777}
2778
2779impl<T: FromSpgValue> FromSpgValue for Option<T> {
2780    fn from_spg_value(v: &Value) -> Result<Self, &'static str> {
2781        match v {
2782            Value::Null => Ok(None),
2783            other => T::from_spg_value(other).map(Some),
2784        }
2785    }
2786}
2787
2788#[cfg(test)]
2789mod tests {
2790    use super::*;
2791
2792    #[test]
2793    fn in_memory_create_insert_select() {
2794        let mut db = Database::open_in_memory();
2795        db.execute("CREATE TABLE t (id INT NOT NULL, name TEXT)")
2796            .unwrap();
2797        db.execute("INSERT INTO t VALUES (1, 'alice')").unwrap();
2798        db.execute("INSERT INTO t VALUES (2, 'bob')").unwrap();
2799        let rows = db.query("SELECT id FROM t WHERE id = 1").unwrap();
2800        assert_eq!(rows.len(), 1);
2801        match &rows[0][0] {
2802            Value::Int(1) => {}
2803            other => panic!("expected Int(1), got {other:?}"),
2804        }
2805    }
2806
2807    #[test]
2808    fn query_on_non_select_errors() {
2809        let mut db = Database::open_in_memory();
2810        db.execute("CREATE TABLE t (id INT)").unwrap();
2811        let r = db.query("INSERT INTO t VALUES (1)");
2812        assert!(r.is_err(), "query() on INSERT must error");
2813    }
2814
2815    #[test]
2816    fn snapshot_roundtrip() {
2817        let mut db = Database::open_in_memory();
2818        db.execute("CREATE TABLE t (id INT NOT NULL)").unwrap();
2819        db.execute("INSERT INTO t VALUES (42)").unwrap();
2820        let bytes = db.snapshot();
2821        let mut restored = Database::restore(&bytes).unwrap();
2822        let rows = restored.query("SELECT id FROM t WHERE id = 42").unwrap();
2823        assert_eq!(rows.len(), 1);
2824        match &rows[0][0] {
2825            Value::Int(42) => {}
2826            other => panic!("expected Int(42), got {other:?}"),
2827        }
2828    }
2829
2830    #[test]
2831    fn from_spg_row_trait_shape() {
2832        struct User {
2833            _id: i32,
2834        }
2835        impl FromSpgRow for User {
2836            fn from_spg_row(row: &[Value]) -> Result<Self, EngineError> {
2837                match row.first() {
2838                    Some(Value::Int(n)) => Ok(Self { _id: *n }),
2839                    _ => Err(EngineError::Unsupported("bad id".into())),
2840                }
2841            }
2842        }
2843        let row = vec![Value::Int(7)];
2844        let _u = User::from_spg_row(&row).unwrap();
2845    }
2846}