#![allow(unused_mut, unused_variables, clippy::uninlined_format_args)]
use std::io::{Read, Write};
use std::net::TcpStream;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use spg_wire::{Frame, Op, build_query, encode, parse_command_complete};
mod common;
const READ_TIMEOUT: Duration = Duration::from_secs(5);
static TMPDIR_COUNTER: AtomicU64 = AtomicU64::new(0);
fn unique_tmpdir(tag: &str) -> PathBuf {
let pid = std::process::id();
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |d| d.as_nanos());
let serial = TMPDIR_COUNTER.fetch_add(1, Ordering::SeqCst);
let dir = std::env::temp_dir().join(format!("spg-wait-e2e-{tag}-{pid}-{nanos}-{serial}"));
std::fs::create_dir_all(&dir).expect("create tmpdir");
dir
}
fn spawn_with_repl(
db: &std::path::Path,
wal: &std::path::Path,
) -> (std::process::Child, common::ServerAddrs) {
common::ServerBuilder::new()
.arg_path(db)
.arg("-")
.arg_path(wal)
.with_repl()
.spawn()
}
fn spawn_follower(
db: &std::path::Path,
wal: &std::path::Path,
follow_of: &str,
) -> (std::process::Child, common::ServerAddrs) {
common::ServerBuilder::new()
.arg_path(db)
.arg("-")
.arg_path(wal)
.env("SPG_FOLLOW_OF", follow_of)
.spawn()
}
fn wait_for_addr(addr: &str) {
let deadline = Instant::now() + Duration::from_secs(3);
while TcpStream::connect(addr).is_err() {
assert!(Instant::now() < deadline);
std::thread::sleep(Duration::from_millis(50));
}
}
fn read_frame(s: &mut TcpStream) -> Frame {
let mut header = [0u8; spg_wire::FRAME_HEADER_LEN];
s.read_exact(&mut header).expect("read header");
let payload_len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
let op = Op::from_byte(header[4]).expect("known op");
let mut payload = vec![0u8; payload_len];
if payload_len > 0 {
s.read_exact(&mut payload).expect("read payload");
}
Frame { op, payload }
}
fn send(s: &mut TcpStream, f: &Frame) {
let mut out = Vec::new();
encode(f, &mut out).unwrap();
s.write_all(&out).unwrap();
}
fn exec_ok_count(s: &mut TcpStream, sql: &str) -> u64 {
send(s, &build_query(sql));
loop {
let f = read_frame(s);
match f.op {
Op::CommandComplete => return parse_command_complete(&f).unwrap(),
Op::ErrorResponse | Op::Error => {
let msg = spg_wire::parse_error_response(&f).unwrap_or("<undecodable>");
panic!("server rejected SQL {sql:?}: {msg}");
}
_ => {}
}
}
}
fn exec_ok(s: &mut TcpStream, sql: &str) {
exec_ok_count(s, sql);
}
#[test]
fn wait_for_position_zero_returns_immediately() {
let dir = unique_tmpdir("zero");
let (raw, addrs) = spawn_with_repl(&dir.join("s.db"), &dir.join("s.wal"));
let _guard = common::ChildGuard(raw);
let mut client = common::connect_to(&addrs.native);
client.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
let t0 = Instant::now();
let reached = exec_ok_count(&mut client, "WAIT FOR WAL POSITION 0");
let elapsed = t0.elapsed();
assert_eq!(reached, 1, "WAIT FOR WAL POSITION 0 must always reach");
assert!(
elapsed < Duration::from_millis(50),
"expected immediate return, took {:?}",
elapsed
);
}
#[test]
fn wait_for_position_timeout_returns_zero() {
let dir = unique_tmpdir("tmo");
let (raw, addrs) = spawn_with_repl(&dir.join("s.db"), &dir.join("s.wal"));
let _guard = common::ChildGuard(raw);
let mut client = common::connect_to(&addrs.native);
client.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
let t0 = Instant::now();
let reached = exec_ok_count(
&mut client,
"WAIT FOR WAL POSITION 999999999 WITH TIMEOUT 300",
);
let elapsed = t0.elapsed();
assert_eq!(reached, 0, "timeout case must report reached=0");
assert!(
elapsed >= Duration::from_millis(280),
"expected ≥ 280ms wait, took {:?}",
elapsed
);
assert!(
elapsed < Duration::from_millis(1000),
"expected ≤ 1s wait, took {:?}",
elapsed
);
}
#[test]
fn wait_for_position_resolves_when_follower_catches_up() {
let dir_m = unique_tmpdir("master");
let dir_f = unique_tmpdir("follower");
let (m_raw, m_addrs) = spawn_with_repl(&dir_m.join("m.db"), &dir_m.join("m.wal"));
let _m_guard = common::ChildGuard(m_raw);
let mut m_client = common::connect_to(&m_addrs.native);
m_client.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
wait_for_addr(m_addrs.repl.as_ref().unwrap());
let (f_raw, f_addrs) = spawn_follower(
&dir_f.join("f.db"),
&dir_f.join("f.wal"),
m_addrs.repl.as_ref().unwrap(),
);
let _f_guard = common::ChildGuard(f_raw);
let mut f_client = common::connect_to(&f_addrs.native);
f_client.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
exec_ok(&mut m_client, "CREATE TABLE t (id INT NOT NULL)");
for i in 0..10 {
exec_ok(&mut m_client, &format!("INSERT INTO t VALUES ({i})"));
}
std::thread::sleep(Duration::from_millis(400));
let target: u64 = 50; let t0 = Instant::now();
let reached = exec_ok_count(
&mut f_client,
&format!("WAIT FOR WAL POSITION {target} WITH TIMEOUT 5000"),
);
let elapsed = t0.elapsed();
assert_eq!(reached, 1, "follower must have crossed position {target}");
assert!(
elapsed < Duration::from_millis(200),
"expected near-instant return (already crossed), took {:?}",
elapsed
);
}
#[test]
fn wait_for_resolves_after_target_is_reached() {
let dir_m = unique_tmpdir("master2");
let dir_f = unique_tmpdir("follower2");
let (m_raw, m_addrs) = spawn_with_repl(&dir_m.join("m.db"), &dir_m.join("m.wal"));
let _m_guard = common::ChildGuard(m_raw);
let mut m_client = common::connect_to(&m_addrs.native);
m_client.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
wait_for_addr(m_addrs.repl.as_ref().unwrap());
let (f_raw, f_addrs) = spawn_follower(
&dir_f.join("f.db"),
&dir_f.join("f.wal"),
m_addrs.repl.as_ref().unwrap(),
);
let _f_guard = common::ChildGuard(f_raw);
let mut f_client = common::connect_to(&f_addrs.native);
f_client.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
exec_ok(&mut m_client, "CREATE TABLE t (id INT NOT NULL)");
std::thread::sleep(Duration::from_millis(300));
let target: u64 = 500;
let m_native = m_addrs.native.clone();
let writer = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(200));
let mut s = common::connect_to(&m_native);
s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
for i in 0..50 {
exec_ok(&mut s, &format!("INSERT INTO t VALUES ({i})"));
}
});
let t0 = Instant::now();
let reached = exec_ok_count(
&mut f_client,
&format!("WAIT FOR WAL POSITION {target} WITH TIMEOUT 5000"),
);
let elapsed = t0.elapsed();
writer.join().unwrap();
assert_eq!(
reached, 1,
"follower must catch up to position {target} within 5s"
);
assert!(
elapsed < Duration::from_millis(5000),
"expected resolve under 5s, took {:?}",
elapsed
);
}
#[test]
fn wait_for_no_timeout_with_zero_target_does_not_block() {
let dir = unique_tmpdir("notmo");
let (raw, addrs) = spawn_with_repl(&dir.join("s.db"), &dir.join("s.wal"));
let _guard = common::ChildGuard(raw);
let mut client = common::connect_to(&addrs.native);
client.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
let t0 = Instant::now();
let reached = exec_ok_count(&mut client, "WAIT FOR WAL POSITION 0");
let elapsed = t0.elapsed();
assert_eq!(reached, 1);
assert!(elapsed < Duration::from_millis(50));
}