use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::{Duration, Instant};
use spg_wire::{Op, build_query, encode, parse_data_row_batch, parse_error_response};
mod common;
use common::{ChildGuard, ServerBuilder};
const READ_TIMEOUT: Duration = Duration::from_secs(15);
const WORKLOAD_BUDGET: Duration = Duration::from_secs(60);
fn send_query(s: &mut TcpStream, sql: &str) {
let q = build_query(sql);
let mut out = Vec::new();
encode(&q, &mut out).unwrap();
s.write_all(&out).unwrap();
}
fn drain_until_cc(s: &mut TcpStream, sql: &str) {
loop {
let mut header = [0u8; spg_wire::FRAME_HEADER_LEN];
s.read_exact(&mut header).unwrap();
let len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
let op = Op::from_byte(header[4]).unwrap();
let mut body = vec![0u8; len];
if len > 0 {
s.read_exact(&mut body).unwrap();
}
match op {
Op::CommandComplete => return,
Op::ErrorResponse | Op::Error => {
let f = spg_wire::Frame { op, payload: body };
panic!(
"SQL failed: {sql:?} → {}",
parse_error_response(&f).unwrap_or("<undecodable>")
);
}
_ => continue,
}
}
}
fn exec_native(s: &mut TcpStream, sql: &str) {
send_query(s, sql);
drain_until_cc(s, sql);
}
fn count_rows(s: &mut TcpStream, sql: &str) -> usize {
send_query(s, sql);
let mut total = 0usize;
loop {
let mut header = [0u8; spg_wire::FRAME_HEADER_LEN];
s.read_exact(&mut header).unwrap();
let len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
let op = Op::from_byte(header[4]).unwrap();
let mut body = vec![0u8; len];
if len > 0 {
s.read_exact(&mut body).unwrap();
}
match op {
Op::DataRow => total += 1,
Op::DataRowBatch => {
let f = spg_wire::Frame { op, payload: body };
if let Ok(rows) = parse_data_row_batch(&f) {
total += rows.len();
}
}
Op::CommandComplete => return total,
Op::ErrorResponse | Op::Error => {
let f = spg_wire::Frame { op, payload: body };
panic!(
"select failed: {sql} → {}",
parse_error_response(&f).unwrap_or("<undecodable>")
);
}
_ => continue,
}
}
}
#[test]
fn workload_completes_under_sustained_writes() {
let (raw, addrs) = ServerBuilder::new()
.env("SPG_HOT_TIER_BYTES", "256")
.env("SPG_FREEZER_TICK_MS", "50")
.env("SPG_FREEZER_BATCH_ROWS", "16")
.env("SPG_FREEZER_WORKERS", "4")
.spawn();
let _guard = ChildGuard(raw);
let mut s = TcpStream::connect(&addrs.native).unwrap();
s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
exec_native(&mut s, "CREATE TABLE t (id INT NOT NULL, payload TEXT)");
exec_native(&mut s, "CREATE INDEX by_id ON t (id)");
let n = 200i64;
let start = Instant::now();
for i in 0..n {
exec_native(
&mut s,
&format!("INSERT INTO t VALUES ({i}, 'aaaaaaaaaaaaaa-row-{i}')"),
);
}
let workload_wall = start.elapsed();
assert!(
workload_wall < WORKLOAD_BUDGET,
"200-row workload took {workload_wall:?} (budget {WORKLOAD_BUDGET:?})"
);
std::thread::sleep(Duration::from_millis(500));
let cold_segs = count_rows(&mut s, "SELECT * FROM spg_stat_segment");
assert!(
cold_segs >= 2,
"expected ≥ 2 cold segments after sustained writes, got {cold_segs}"
);
let mut missing = Vec::new();
for id in 0..n {
let cnt = count_rows(&mut s, &format!("SELECT id FROM t WHERE id = {id}"));
if cnt != 1 {
missing.push(id);
}
}
assert!(
missing.is_empty(),
"{} PKs disappeared after parallel-freezer workload: {:?}",
missing.len(),
&missing[..missing.len().min(10)]
);
}