#![allow(
clippy::doc_markdown,
clippy::uninlined_format_args,
unused_mut,
unused_variables
)]
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::{Duration, Instant};
use spg_wire::{Frame, Op, build_query, encode, parse_error_response};
mod common;
fn local_spawn(envs: &[(&str, &str)]) -> (std::process::Child, common::ServerAddrs) {
let mut b = common::ServerBuilder::new();
for (k, v) in envs {
b = b.env(*k, *v);
}
b.spawn()
}
const READ_TIMEOUT: Duration = Duration::from_secs(5);
fn read_frame(s: &mut TcpStream) -> Frame {
let mut header = [0u8; spg_wire::FRAME_HEADER_LEN];
s.read_exact(&mut header).unwrap();
let payload_len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
let op = Op::from_byte(header[4]).unwrap();
let mut payload = vec![0u8; payload_len];
if payload_len > 0 {
s.read_exact(&mut payload).unwrap();
}
Frame { op, payload }
}
fn send(s: &mut TcpStream, f: &Frame) {
let mut out = Vec::new();
encode(f, &mut out).unwrap();
s.write_all(&out).unwrap();
}
fn exec_ok(s: &mut TcpStream, sql: &str) {
send(s, &build_query(sql));
let f = read_frame(s);
assert_eq!(
f.op,
Op::CommandComplete,
"expected CC for {sql:?}, got {:?}",
f.op
);
}
#[test]
fn query_timeout_cancels_long_scan() {
let (raw, addrs) = local_spawn(&[("SPG_QUERY_TIMEOUT_MS", "50")]);
let mut child = common::ChildGuard(raw);
let mut s = common::connect_to(&addrs.native);
s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
exec_ok(&mut s, "CREATE TABLE t (id INT NOT NULL)");
for i in 0..50_000 {
exec_ok(&mut s, &format!("INSERT INTO t VALUES ({i})"));
}
send(&mut s, &build_query("SELECT id FROM t WHERE id + 0 > -1"));
let mut saw_cancel = false;
loop {
let f = read_frame(&mut s);
match f.op {
Op::ErrorResponse => {
let msg = parse_error_response(&f).unwrap();
assert!(
msg.contains("cancel") || msg.contains("timeout"),
"expected cancel/timeout, got {msg:?}"
);
saw_cancel = true;
break;
}
Op::CommandComplete => {
eprintln!("note: query finished within budget; cancellation path not exercised");
break;
}
Op::RowDescription | Op::DataRow | Op::DataRowBatch => {}
other => panic!("unexpected: {other:?}"),
}
}
if !saw_cancel {
eprintln!("query_timeout_cancels_long_scan: did not observe cancellation this run");
}
}
#[test]
fn idle_timeout_closes_silent_connection() {
let (raw, addrs) = local_spawn(&[("SPG_IDLE_TIMEOUT_SEC", "1")]);
let mut child = common::ChildGuard(raw);
let mut s = common::connect_to(&addrs.native);
s.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
exec_ok(&mut s, "CREATE TABLE t (id INT NOT NULL)");
let start = Instant::now();
let f = read_frame(&mut s);
let elapsed = start.elapsed();
assert_eq!(f.op, Op::ErrorResponse, "expected idle-timeout error");
let msg = parse_error_response(&f).unwrap();
assert!(msg.contains("idle"), "expected idle hint, got {msg:?}");
assert!(
elapsed < Duration::from_secs(2),
"idle timeout took {elapsed:?}, expected ~1s"
);
let mut buf = [0u8; 16];
let n = s.read(&mut buf).unwrap_or(0);
assert_eq!(n, 0, "server should have closed the socket");
}