#![allow(
clippy::cast_lossless,
clippy::cast_possible_truncation,
clippy::doc_markdown,
clippy::doc_overindented_list_items,
clippy::manual_assert,
clippy::uninlined_format_args,
clippy::unnecessary_debug_formatting,
clippy::unreadable_literal,
unused_mut,
unused_variables
)]
use std::io::{Read, Write};
use std::net::TcpStream;
use std::path::{Path, PathBuf};
use std::time::Duration;
use spg_wire::{Frame, Op, WireValue, build_query, encode, parse_data_row, parse_data_row_batch};
mod common;
/// Spawns a server process through `common::ServerBuilder`.
///
/// The builder receives three arguments in this order: the database path
/// `db`, a literal `-`, and the WAL path `wal`.
/// NOTE(review): the meaning of the `-` argument is defined by the server's
/// command line, which is not visible here — presumably a placeholder for an
/// unused positional argument; confirm against `common::ServerBuilder`.
///
/// Returns the spawned child process together with the `common::ServerAddrs`
/// reported by the builder. The caller is responsible for the child's
/// lifetime (callers in this file wrap it in `common::ChildGuard`).
fn local_spawn(
db: &std::path::Path,
wal: &std::path::Path,
) -> (std::process::Child, common::ServerAddrs) {
common::ServerBuilder::new()
.arg_path(db)
.arg("-")
.arg_path(wal)
.spawn()
}
/// Read timeout (5 seconds) installed on the client sockets opened by the
/// tests in this file, so a server that never answers makes `read_exact`
/// fail instead of blocking the test forever.
const READ_TIMEOUT: Duration = Duration::from_secs(5);
fn workspace_root() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
.parent()
.unwrap()
.to_path_buf()
}
/// Creates (if necessary) and returns a scratch directory under the system
/// temp dir named `spg-compat-<tag>-<nanoseconds since the Unix epoch>`.
///
/// The nanosecond timestamp is what keeps successive calls with the same
/// `tag` from landing in the same directory.
///
/// # Panics
///
/// Panics if the system clock is before the Unix epoch or the directory
/// cannot be created.
fn unique_tmpdir(tag: &str) -> PathBuf {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap();
    let nanos = since_epoch.as_nanos();

    let mut dir = std::env::temp_dir();
    dir.push(format!("spg-compat-{tag}-{nanos}"));
    std::fs::create_dir_all(&dir).unwrap();
    dir
}
/// Reads exactly one wire frame from `s`, blocking until it is complete.
///
/// The fixed-size header (`spg_wire::FRAME_HEADER_LEN` bytes) is read first:
/// its first four bytes are the little-endian `u32` payload length and its
/// fifth byte is the opcode. The payload is then read in full.
///
/// # Panics
///
/// Panics on any socket read error (including a read timeout) and when the
/// opcode byte is rejected by `Op::from_byte`.
fn read_frame(s: &mut TcpStream) -> Frame {
    let mut head = [0u8; spg_wire::FRAME_HEADER_LEN];
    s.read_exact(&mut head).unwrap();

    let len_le = [head[0], head[1], head[2], head[3]];
    let body_len = u32::from_le_bytes(len_le) as usize;
    // The opcode is validated before the payload is pulled off the socket.
    let op = Op::from_byte(head[4]).unwrap();

    let mut payload = vec![0u8; body_len];
    if !payload.is_empty() {
        s.read_exact(&mut payload).unwrap();
    }
    Frame { op, payload }
}
/// Serializes `f` with `spg_wire::encode` and writes the resulting bytes to
/// `s` in a single `write_all`.
///
/// # Panics
///
/// Panics if encoding fails or the socket write fails.
fn send(s: &mut TcpStream, f: &Frame) {
    let mut wire_bytes: Vec<u8> = Vec::new();
    encode(f, &mut wire_bytes).unwrap();
    s.write_all(&wire_bytes).unwrap();
}
/// Runs `sql` over `s` and returns the first column of the result.
///
/// The reply must start with a `RowDescription` frame and end with
/// `CommandComplete`. Every `DataRow` frame replaces the remembered value
/// with that row's first column; every `DataRowBatch` frame replaces it with
/// the first column of the batch's first row. The value remembered when
/// `CommandComplete` arrives is returned.
///
/// # Panics
///
/// Panics if the server answers with `ErrorResponse`, if the first frame is
/// not `RowDescription`, if any other frame type shows up, if a data frame
/// cannot be parsed, or if no value was remembered ("no row").
fn select_one(s: &mut TcpStream, sql: &str) -> WireValue {
    send(s, &build_query(sql));

    let head = read_frame(s);
    if head.op == Op::ErrorResponse {
        let msg = spg_wire::parse_error_response(&head).unwrap_or("<undecodable>");
        panic!("server rejected SQL {sql:?}: {msg}");
    }
    assert_eq!(head.op, Op::RowDescription);

    let mut first_cell: Option<WireValue> = None;
    loop {
        let frame = read_frame(s);
        first_cell = match frame.op {
            Op::DataRow => parse_data_row(&frame).unwrap().into_iter().next(),
            Op::DataRowBatch => {
                let first_row = parse_data_row_batch(&frame).unwrap().into_iter().next();
                first_row.and_then(|row| row.into_iter().next())
            }
            Op::CommandComplete => break,
            other => panic!("unexpected {other:?}"),
        };
    }
    first_cell.expect("no row")
}
/// Coerces a wire value to `i64`, accepting several representations.
///
/// * `BigInt` is returned as is and `Int` is widened losslessly.
/// * `Float` is converted with an `as` cast, i.e. the fractional part is
///   discarded.
/// * `Text` is parsed as a decimal integer.
///
/// # Panics
///
/// Panics if a `Text` value does not parse as `i64`, or if `v` is any other
/// variant.
fn wire_to_i64(v: &WireValue) -> i64 {
    match v {
        WireValue::BigInt(wide) => *wide,
        WireValue::Int(narrow) => i64::from(*narrow),
        WireValue::Float(real) => *real as i64,
        WireValue::Text(digits) => digits.parse::<i64>().unwrap(),
        other => panic!("expected integer, got {other:?}"),
    }
}
/// Returns an owned copy of the string held by a `WireValue::Text`.
///
/// # Panics
///
/// Panics if `v` is any variant other than `Text`.
fn wire_to_string(v: &WireValue) -> String {
    let WireValue::Text(text) = v else {
        let other = v;
        panic!("expected text, got {other:?}");
    };
    text.clone()
}
/// Returns `bytes[start..start + len]`, panicking with a message that names
/// the bundle field `what` when the file is too short to contain it.
fn bundle_field<'a>(bytes: &'a [u8], start: usize, len: usize, what: &str) -> &'a [u8] {
    start
        .checked_add(len)
        .and_then(|end| bytes.get(start..end))
        .unwrap_or_else(|| {
            panic!(
                "truncated SPG bundle: {} needs {} bytes at offset {}, but the file has only {}",
                what,
                len,
                start,
                bytes.len()
            )
        })
}

/// Reads the little-endian `u64` length field stored at `start`.
fn bundle_len(bytes: &[u8], start: usize, what: &str) -> usize {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(bundle_field(bytes, start, 8, what));
    u64::from_le_bytes(raw) as usize
}

/// Unpacks the backup bundle at `bundle` onto `dest_db` / `dest_wal`.
///
/// Layout as read by this function (all lengths little-endian `u64`):
///
/// | offset             | size       | content                                   |
/// |--------------------|------------|-------------------------------------------|
/// | 0                  | 8          | magic, `SPGBKUP\x01` or `SPGBKUP\x02`     |
/// | 8                  | 17         | not interpreted here                      |
/// | 25                 | 8          | snapshot length                           |
/// | 33                 | `snap_len` | snapshot bytes                            |
/// | 33 + `snap_len`    | 8          | not interpreted here                      |
/// | 41 + `snap_len`    | 8          | WAL length                                |
/// | 49 + `snap_len`    | `wal_len`  | WAL bytes                                 |
///
/// When the snapshot is non-empty, `dest_db` and `dest_wal` are both
/// overwritten. When the snapshot is empty, `dest_db` is left untouched and
/// the WAL bytes are appended to `dest_wal` (created if missing) and synced.
///
/// Every field is located and bounds-checked before anything is written, so
/// a truncated bundle never leaves a half-restored target behind.
///
/// # Panics
///
/// Panics if the bundle cannot be read, is shorter than its own length
/// fields claim ("truncated SPG bundle: ..."), does not start with a known
/// magic ("not an SPG bundle ..."), or if writing a destination file fails.
fn apply_bundle(dest_db: &Path, dest_wal: &Path, bundle: &Path) {
    const MAGIC_LEN: usize = 8;
    const SNAP_LEN_AT: usize = 25;
    const SNAP_AT: usize = SNAP_LEN_AT + 8;
    // Bytes between the end of the snapshot and the WAL length field.
    const WAL_LEN_SKIP: usize = 8;

    let bytes = std::fs::read(bundle).unwrap();

    let magic = bundle_field(&bytes, 0, MAGIC_LEN, "magic");
    assert!(
        magic == b"SPGBKUP\x01" || magic == b"SPGBKUP\x02",
        "not an SPG bundle (magic = {:?})",
        magic
    );

    let snap_len = bundle_len(&bytes, SNAP_LEN_AT, "snapshot length");
    let snapshot = bundle_field(&bytes, SNAP_AT, snap_len, "snapshot");
    // No overflow: the snapshot slice above proved SNAP_AT + snap_len fits
    // inside the file.
    let wal_len_at = SNAP_AT + snap_len + WAL_LEN_SKIP;
    let wal_len = bundle_len(&bytes, wal_len_at, "WAL length");
    let wal = bundle_field(&bytes, wal_len_at + 8, wal_len, "WAL");

    if snap_len > 0 {
        std::fs::write(dest_db, snapshot).unwrap();
        std::fs::write(dest_wal, wal).unwrap();
    } else {
        // No snapshot in the bundle: extend the existing WAL rather than
        // replacing it.
        let mut wal_file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(dest_wal)
            .unwrap();
        wal_file.write_all(wal).unwrap();
        wal_file.sync_data().unwrap();
    }
}
/// Parses a fixture's `expected.txt` into a key/value map.
///
/// Each non-empty line must have the form `key=value`; the split happens at
/// the first `=`, so values may themselves contain `=`. When a key appears
/// more than once, the last occurrence wins.
///
/// # Panics
///
/// Panics if the file cannot be read or a non-empty line contains no `=`.
fn parse_expected(path: &Path) -> std::collections::BTreeMap<String, String> {
    let text = std::fs::read_to_string(path).unwrap();
    text.lines()
        .filter(|line| !line.is_empty())
        .map(|line| match line.split_once('=') {
            Some((key, value)) => (key.to_owned(), value.to_owned()),
            None => panic!("bad line in expected.txt: {line:?}"),
        })
        .collect()
}
/// Lists the fixture directories under `<workspace>/xtests/compat-fixtures`,
/// sorted by path.
///
/// Directory entries that fail to read are skipped, as are entries that are
/// not directories.
///
/// # Panics
///
/// Panics if the fixtures root cannot be opened or contains no directories.
fn list_fixture_dirs() -> Vec<PathBuf> {
    let root = workspace_root().join("xtests/compat-fixtures");
    let entries =
        std::fs::read_dir(&root).unwrap_or_else(|_| panic!("missing {}", root.display()));

    let mut dirs: Vec<PathBuf> = entries
        .filter_map(Result::ok)
        .map(|entry| entry.path())
        .filter(|candidate| candidate.is_dir())
        .collect();
    dirs.sort();

    assert!(
        !dirs.is_empty(),
        "no fixtures under {} — add at least the current-version capture",
        root.display()
    );
    dirs
}
/// Restores every checked-in fixture bundle into a fresh scratch directory,
/// boots a server on the restored files, and checks the data against the
/// fixture's `expected.txt`.
///
/// `expected.txt` must name the table via `table=`. The keys `rows`,
/// `sum_score`, `max_score` and `first_name` are each verified only when
/// present.
#[test]
fn every_fixture_restores_and_verifies() {
    for fixture in list_fixture_dirs() {
        let label = fixture.file_name().unwrap().to_string_lossy().to_string();
        eprintln!("-- compat fixture: {label}");

        let expected = parse_expected(&fixture.join("expected.txt"));
        let bundle = fixture.join("full.bkp");
        assert!(bundle.exists(), "fixture {label} missing full.bkp");

        // Unpack the bundle into its own scratch directory.
        let scratch = unique_tmpdir(&label);
        let (rec_db, rec_wal) = (scratch.join("rec.db"), scratch.join("rec.wal"));
        apply_bundle(&rec_db, &rec_wal, &bundle);

        // Boot a server on the restored files; the guard owns the child for
        // the rest of this iteration.
        let (server, addrs) = local_spawn(&rec_db, &rec_wal);
        let _guard = common::ChildGuard(server);
        let mut conn = common::connect_to(&addrs.native);
        conn.set_read_timeout(Some(READ_TIMEOUT)).unwrap();

        let Some(table) = expected.get("table") else {
            panic!("fixture {label} expected.txt missing `table=`")
        };

        // (expected.txt key, query to run, assertion context), checked in
        // this order.
        let numeric_checks = [
            (
                "rows",
                format!("SELECT count(*) FROM {table}"),
                format!("{label}: SELECT count(*) FROM {table}"),
            ),
            (
                "sum_score",
                format!("SELECT sum(score) FROM {table}"),
                format!("{label}: SELECT sum(score)"),
            ),
            (
                "max_score",
                format!("SELECT max(score) FROM {table}"),
                format!("{label}: SELECT max(score)"),
            ),
        ];
        for (key, sql, context) in numeric_checks {
            let Some(want) = expected.get(key) else {
                continue;
            };
            let got = wire_to_i64(&select_one(&mut conn, &sql));
            assert_eq!(got.to_string(), *want, "{context}");
        }

        if let Some(want) = expected.get("first_name") {
            let sql = format!("SELECT name FROM {table} ORDER BY id LIMIT 1");
            let got = wire_to_string(&select_one(&mut conn, &sql));
            assert_eq!(got, *want, "{label}: first name by id");
        }
    }
}
/// Release-process entry point: regenerates the `v4.41` compatibility
/// fixture by delegating to `capture_fixture`.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested
/// explicitly.
/// NOTE(review): the fixture is produced by whatever server build
/// `local_spawn` launches, so this presumably has to be run from a v4.41
/// checkout — confirm against the release process.
#[test]
#[ignore = "release-process capture: regenerates xtests/compat-fixtures/v4.41/"]
fn capture_v4_41_fixture() {
capture_fixture("v4.41");
}
/// Release-process entry point: regenerates the `v5.2` compatibility
/// fixture by delegating to `capture_fixture`.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested
/// explicitly.
/// NOTE(review): the fixture is produced by whatever server build
/// `local_spawn` launches, so this presumably has to be run from a v5.2
/// checkout — confirm against the release process.
#[test]
#[ignore = "release-process capture: regenerates xtests/compat-fixtures/v5.2/"]
fn capture_v5_2_fixture() {
capture_fixture("v5.2");
}
/// Generates the fixture directory `xtests/compat-fixtures/<label>`.
///
/// Steps:
/// 1. start a server on an empty scratch database,
/// 2. create and populate the `compat` table,
/// 3. issue `BACKUP TO` a bundle file in the scratch directory,
/// 4. copy the bundle (`full.bkp`) and the raw WAL (`a.wal`) into the
///    fixture directory and write the matching `expected.txt`.
///
/// # Panics
///
/// Panics if the server answers any statement with `ErrorResponse` or if any
/// filesystem operation fails.
fn capture_fixture(label: &str) {
    // Seed data; the `expected.txt` written at the end describes exactly
    // these rows.
    const SEED_SQL: [&str; 4] = [
        "CREATE TABLE compat (id INT, name TEXT NOT NULL, score INT)",
        "INSERT INTO compat VALUES (1, 'alice', 100)",
        "INSERT INTO compat VALUES (2, 'bob', 90)",
        "INSERT INTO compat VALUES (3, 'carol', 87)",
    ];

    let dest = workspace_root().join("xtests/compat-fixtures").join(label);
    std::fs::create_dir_all(&dest).expect("mkdir fixture dir");

    let work = unique_tmpdir(&format!("capture-{label}"));
    let [db, wal, bkp] = ["a.db", "a.wal", "full.bkp"].map(|leaf| work.join(leaf));

    let (server, addrs) = local_spawn(&db, &wal);
    let _guard = common::ChildGuard(server);
    let mut conn = common::connect_to(&addrs.native);
    conn.set_read_timeout(Some(READ_TIMEOUT)).unwrap();

    // One reply frame is read per statement; only an error reply is fatal.
    for sql in SEED_SQL {
        send(&mut conn, &build_query(sql));
        let reply = read_frame(&mut conn);
        if reply.op == Op::ErrorResponse {
            let msg = spg_wire::parse_error_response(&reply).unwrap_or("<undecodable>");
            panic!("capture: server rejected {sql:?}: {msg}");
        }
    }

    let backup_sql = format!("BACKUP TO '{}'", bkp.display());
    send(&mut conn, &build_query(&backup_sql));
    let reply = read_frame(&mut conn);
    if reply.op == Op::ErrorResponse {
        let msg = spg_wire::parse_error_response(&reply).unwrap_or("<undecodable>");
        panic!("capture: BACKUP TO {bkp:?} failed: {msg}");
    }

    for (src, leaf, what) in [
        (&bkp, "full.bkp", "copy bundle"),
        (&wal, "a.wal", "copy raw WAL"),
    ] {
        std::fs::copy(src, dest.join(leaf)).expect(what);
    }
    std::fs::write(
        dest.join("expected.txt"),
        "table=compat\nrows=3\nsum_score=277\nmax_score=100\nfirst_name=alice\n",
    )
    .expect("write expected.txt");

    eprintln!("captured fixture: {}", dest.display());
}