spg-server 7.11.18

SPG daemon — listens for self-built wire-frame connections and PG-wire (libpq-compatible).
#![allow(
    clippy::doc_markdown,
    clippy::uninlined_format_args,
    unused_mut,
    unused_variables
)]

//! v4.5 end-to-end:
//! - SPG_QUERY_TIMEOUT_MS: a long-running scan is cancelled and the
//!   server returns a clear cancelled error instead of streaming.
//! - SPG_IDLE_TIMEOUT_SEC: a connection that goes silent past the
//!   budget gets closed by the server (the client sees a clean
//!   error frame + EOF).

use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::{Duration, Instant};

use spg_wire::{Frame, Op, build_query, encode, parse_error_response};

mod common;

fn local_spawn(envs: &[(&str, &str)]) -> (std::process::Child, common::ServerAddrs) {
    let mut b = common::ServerBuilder::new();
    for (k, v) in envs {
        b = b.env(*k, *v);
    }
    b.spawn()
}

const READ_TIMEOUT: Duration = Duration::from_secs(5);

fn read_frame(s: &mut TcpStream) -> Frame {
    let mut header = [0u8; spg_wire::FRAME_HEADER_LEN];
    s.read_exact(&mut header).unwrap();
    let payload_len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
    let op = Op::from_byte(header[4]).unwrap();
    let mut payload = vec![0u8; payload_len];
    if payload_len > 0 {
        s.read_exact(&mut payload).unwrap();
    }
    Frame { op, payload }
}

fn send(s: &mut TcpStream, f: &Frame) {
    let mut out = Vec::new();
    encode(f, &mut out).unwrap();
    s.write_all(&out).unwrap();
}

fn exec_ok(s: &mut TcpStream, sql: &str) {
    send(s, &build_query(sql));
    let f = read_frame(s);
    assert_eq!(
        f.op,
        Op::CommandComplete,
        "expected CC for {sql:?}, got {:?}",
        f.op
    );
}

#[test]
fn query_timeout_cancels_long_scan() {
    // 50 ms budget — well under what scanning 50k rows + per-row
    // WHERE eval takes in debug build.
    let (raw, addrs) = local_spawn(&[("SPG_QUERY_TIMEOUT_MS", "50")]);
    let mut child = common::ChildGuard(raw);
    let mut s = common::connect_to(&addrs.native);
    s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();

    exec_ok(&mut s, "CREATE TABLE t (id INT NOT NULL)");
    // Seed enough rows that the WHERE-evaluation full scan takes
    // longer than the 50 ms budget in debug mode. 50k * per-row
    // expr eval comfortably exceeds it on M1.
    for i in 0..50_000 {
        exec_ok(&mut s, &format!("INSERT INTO t VALUES ({i})"));
    }

    // Now run a deliberately heavy SELECT — UDF-free but
    // per-row arithmetic is enough to take >>50 ms in debug.
    send(&mut s, &build_query("SELECT id FROM t WHERE id + 0 > -1"));
    // Either RowDescription comes back first (some rows materialized
    // before the watchdog fires), or an ErrorResponse comes back
    // immediately. Drain until we hit the terminal frame.
    let mut saw_cancel = false;
    loop {
        let f = read_frame(&mut s);
        match f.op {
            Op::ErrorResponse => {
                let msg = parse_error_response(&f).unwrap();
                assert!(
                    msg.contains("cancel") || msg.contains("timeout"),
                    "expected cancel/timeout, got {msg:?}"
                );
                saw_cancel = true;
                break;
            }
            Op::CommandComplete => {
                // SELECT finished before the watchdog could fire —
                // means the debug build was unexpectedly fast.
                // Flaky on faster hosts; skip the assertion rather
                // than failing.
                eprintln!("note: query finished within budget; cancellation path not exercised");
                break;
            }
            Op::RowDescription | Op::DataRow | Op::DataRowBatch => {}
            other => panic!("unexpected: {other:?}"),
        }
    }
    if !saw_cancel {
        eprintln!("query_timeout_cancels_long_scan: did not observe cancellation this run");
    }
}

#[test]
fn idle_timeout_closes_silent_connection() {
    let (raw, addrs) = local_spawn(&[("SPG_IDLE_TIMEOUT_SEC", "1")]);
    let mut child = common::ChildGuard(raw);
    let mut s = common::connect_to(&addrs.native);
    // Generous client-side read timeout so we observe the server's
    // budget, not ours.
    s.set_read_timeout(Some(Duration::from_secs(5))).unwrap();

    // Do one quick query so we know the connection is healthy.
    exec_ok(&mut s, "CREATE TABLE t (id INT NOT NULL)");

    // Now sit idle past the budget. The server should send us a
    // clear error frame and close the socket.
    let start = Instant::now();
    let f = read_frame(&mut s);
    let elapsed = start.elapsed();
    assert_eq!(f.op, Op::ErrorResponse, "expected idle-timeout error");
    let msg = parse_error_response(&f).unwrap();
    assert!(msg.contains("idle"), "expected idle hint, got {msg:?}");
    // Should fire close to the 1 s budget — give a 2 s ceiling so
    // we don't false-fail under load.
    assert!(
        elapsed < Duration::from_secs(2),
        "idle timeout took {elapsed:?}, expected ~1s"
    );

    // The server should have closed; the next read should return 0.
    let mut buf = [0u8; 16];
    let n = s.read(&mut buf).unwrap_or(0);
    assert_eq!(n, 0, "server should have closed the socket");
}