#![allow(
clippy::cast_possible_truncation,
clippy::doc_markdown,
clippy::manual_assert,
clippy::uninlined_format_args,
unused_mut,
unused_variables
)]
use std::io::{Read, Write};
use std::net::TcpStream;
use std::path::{Path, PathBuf};
use std::time::Duration;
use spg_wire::{Frame, Op, WireValue, build_query, encode, parse_data_row, parse_data_row_batch};
mod common;
fn local_spawn(
db: &std::path::Path,
wal: &std::path::Path,
env: &[(&str, String)],
) -> (std::process::Child, common::ServerAddrs) {
let mut b = common::ServerBuilder::new()
.arg_path(db)
.arg("-")
.arg_path(wal);
for (k, v) in env {
b = b.env(*k, v);
}
b.spawn()
}
const READ_TIMEOUT: Duration = Duration::from_secs(5);
fn unique_tmpdir() -> PathBuf {
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let p = std::env::temp_dir().join(format!("spg-e2e-bkup-{nanos}"));
std::fs::create_dir_all(&p).unwrap();
p
}
fn read_frame(s: &mut TcpStream) -> Frame {
let mut header = [0u8; spg_wire::FRAME_HEADER_LEN];
s.read_exact(&mut header).unwrap();
let payload_len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
let op = Op::from_byte(header[4]).unwrap();
let mut payload = vec![0u8; payload_len];
if payload_len > 0 {
s.read_exact(&mut payload).unwrap();
}
Frame { op, payload }
}
fn send(s: &mut TcpStream, f: &Frame) {
let mut out = Vec::new();
encode(f, &mut out).unwrap();
s.write_all(&out).unwrap();
}
fn exec_ok(s: &mut TcpStream, sql: &str) {
send(s, &build_query(sql));
let f = read_frame(s);
if f.op == Op::ErrorResponse {
let msg = spg_wire::parse_error_response(&f).unwrap_or("<undecodable>");
panic!("server rejected SQL {sql:?}: {msg}");
}
assert_eq!(f.op, Op::CommandComplete, "expected CC for {sql:?}");
}
fn exec_with_count(s: &mut TcpStream, sql: &str) -> u64 {
send(s, &build_query(sql));
let f = read_frame(s);
if f.op == Op::ErrorResponse {
let msg = spg_wire::parse_error_response(&f).unwrap_or("<undecodable>");
panic!("server rejected SQL {sql:?}: {msg}");
}
assert_eq!(f.op, Op::CommandComplete, "expected CC for {sql:?}");
spg_wire::parse_command_complete(&f).unwrap()
}
fn select_count(s: &mut TcpStream, sql: &str) -> i64 {
send(s, &build_query(sql));
let rd = read_frame(s);
if rd.op == Op::ErrorResponse {
let msg = spg_wire::parse_error_response(&rd).unwrap_or("<undecodable>");
panic!("server rejected SQL {sql:?}: {msg}");
}
assert_eq!(rd.op, Op::RowDescription);
let mut count: i64 = -1;
loop {
let f = read_frame(s);
match f.op {
Op::DataRow => {
let row = parse_data_row(&f).unwrap();
count = wire_to_i64(&row[0]);
}
Op::DataRowBatch => {
let rows = parse_data_row_batch(&f).unwrap();
count = wire_to_i64(&rows[0][0]);
}
Op::CommandComplete => return count,
other => panic!("unexpected {other:?}"),
}
}
}
fn wire_to_i64(v: &WireValue) -> i64 {
match v {
WireValue::Int(n) => i64::from(*n),
WireValue::BigInt(n) => *n,
WireValue::Text(t) => t.parse().unwrap(),
other => panic!("expected integer, got {other:?}"),
}
}
fn apply_bundle_to(dest_db: &Path, dest_wal: &Path, bundle: &Path) -> (u8, u64, u64) {
let bytes = std::fs::read(bundle).unwrap();
assert!(
&bytes[..8] == b"SPGBKUP\x01" || &bytes[..8] == b"SPGBKUP\x02",
"unknown bundle magic {:?}",
&bytes[..8]
);
let kind = bytes[8];
let since = u64::from_le_bytes(bytes[9..17].try_into().unwrap());
let snap_len = u64::from_le_bytes(bytes[25..33].try_into().unwrap()) as usize;
let snap_end = 33 + snap_len;
let wal_pos = u64::from_le_bytes(bytes[snap_end..snap_end + 8].try_into().unwrap());
let wal_len =
u64::from_le_bytes(bytes[snap_end + 8..snap_end + 16].try_into().unwrap()) as usize;
let wal_start = snap_end + 16;
let wal_slice = &bytes[wal_start..wal_start + wal_len];
if snap_len > 0 {
std::fs::write(dest_db, &bytes[33..33 + snap_len]).unwrap();
std::fs::write(dest_wal, wal_slice).unwrap();
} else {
let mut f = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(dest_wal)
.unwrap();
f.write_all(wal_slice).unwrap();
f.sync_data().unwrap();
}
(kind, since, wal_pos)
}
#[test]
fn full_plus_incremental_round_trip_restores_state() {
let dir = unique_tmpdir();
let db = dir.join("a.db");
let wal = dir.join("a.wal");
let (raw, addrs) = local_spawn(&db, &wal, &[]);
let child = common::ChildGuard(raw);
let mut s = common::connect_to(&addrs.native);
s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
exec_ok(&mut s, "CREATE TABLE k (id INT NOT NULL, v INT NOT NULL)");
for i in 1..=3 {
exec_ok(&mut s, &format!("INSERT INTO k VALUES ({i}, {})", i * 10));
}
let full_bundle = dir.join("full.bkp");
let pos_after_full = exec_with_count(&mut s, &format!("BACKUP TO '{}'", full_bundle.display()));
for i in 4..=5 {
exec_ok(&mut s, &format!("INSERT INTO k VALUES ({i}, {})", i * 10));
}
let incr_bundle = dir.join("incr.bkp");
let pos_after_incr = exec_with_count(
&mut s,
&format!(
"BACKUP TO '{}' INCREMENTAL SINCE {}",
incr_bundle.display(),
pos_after_full
),
);
assert!(pos_after_incr > pos_after_full);
drop(s);
drop(child);
let rec_dir = unique_tmpdir();
let rec_db = rec_dir.join("rec.db");
let rec_wal = rec_dir.join("rec.wal");
let (kind_full, _, _) = apply_bundle_to(&rec_db, &rec_wal, &full_bundle);
assert_eq!(kind_full, 0, "expected full bundle marker");
let (kind_incr, since_incr, _) = apply_bundle_to(&rec_db, &rec_wal, &incr_bundle);
assert_eq!(kind_incr, 1, "expected incremental marker");
assert_eq!(since_incr, pos_after_full);
let (raw, rec_addrs) = local_spawn(&rec_db, &rec_wal, &[]);
let _rec_child = common::ChildGuard(raw);
let mut rs = common::connect_to(&rec_addrs.native);
rs.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
assert_eq!(select_count(&mut rs, "SELECT count(*) FROM k"), 5);
}
#[test]
fn pitr_via_replay_upto_truncates_history() {
let dir = unique_tmpdir();
let db = dir.join("p.db");
let wal = dir.join("p.wal");
let (raw, addrs) = local_spawn(&db, &wal, &[]);
let child = common::ChildGuard(raw);
let mut s = common::connect_to(&addrs.native);
s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
exec_ok(&mut s, "CREATE TABLE p (id INT NOT NULL)");
exec_ok(&mut s, "INSERT INTO p VALUES (1)");
exec_ok(&mut s, "INSERT INTO p VALUES (2)");
let bundle = dir.join("snap.bkp");
let pivot_pos = exec_with_count(&mut s, &format!("BACKUP TO '{}'", bundle.display()));
exec_ok(&mut s, "INSERT INTO p VALUES (3)");
exec_ok(&mut s, "INSERT INTO p VALUES (4)");
drop(s);
drop(child);
let (raw, rec_addrs) = local_spawn(&db, &wal, &[("SPG_REPLAY_UPTO", pivot_pos.to_string())]);
let _rec_child = common::ChildGuard(raw);
let mut rs = common::connect_to(&rec_addrs.native);
rs.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
assert_eq!(select_count(&mut rs, "SELECT count(*) FROM p"), 2);
}