#![allow(
clippy::cast_lossless,
clippy::cast_possible_truncation,
clippy::doc_markdown,
clippy::manual_assert,
clippy::uninlined_format_args,
clippy::unreadable_literal,
unused_mut,
unused_variables
)]
use std::io::{Read, Write};
use std::net::TcpStream;
use std::path::PathBuf;
use std::thread;
use std::time::{Duration, Instant};
const STARTUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(8);
mod common;
fn local_spawn_with_http(
db: &std::path::Path,
wal: &std::path::Path,
env: &[(&str, &str)],
) -> (std::process::Child, common::ServerAddrs) {
let mut b = common::ServerBuilder::new()
.arg_path(db)
.arg("-")
.arg_path(wal)
.with_http();
for (k, v) in env {
b = b.env(*k, *v);
}
b.spawn()
}
fn local_spawn(
db: &std::path::Path,
wal: &std::path::Path,
env: &[(&str, &str)],
) -> (std::process::Child, common::ServerAddrs) {
let mut b = common::ServerBuilder::new()
.arg_path(db)
.arg("-")
.arg_path(wal);
for (k, v) in env {
b = b.env(*k, *v);
}
b.spawn()
}
const READ_TIMEOUT: Duration = Duration::from_secs(5);
fn workspace_root() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
.parent()
.unwrap()
.to_path_buf()
}
fn prod_ready_md() -> String {
std::fs::read_to_string(workspace_root().join("PROD_READY.md")).unwrap_or_default()
}
fn prod_doc_missing(name: &str) -> bool {
!workspace_root().join(name).exists()
}
fn machine_marked_rows() -> Vec<String> {
let md = prod_ready_md();
let mut ids = Vec::new();
for line in md.lines() {
if !line.starts_with("| ") {
continue;
}
let cells: Vec<&str> = line.split('|').collect();
if cells.len() < 4 {
continue;
}
let id = cells[1].trim();
let name = cells[2].trim();
if name.contains("[machine]")
&& !id.is_empty()
&& id.chars().next().unwrap().is_ascii_digit()
{
ids.push(id.to_string());
}
}
ids
}
fn unique_tmpdir(tag: &str) -> PathBuf {
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let p = std::env::temp_dir().join(format!("spg-prod-ready-{tag}-{nanos}"));
std::fs::create_dir_all(&p).unwrap();
p
}
fn assert_doc_has_sections(doc_name: &str, required: &[&str]) {
let path = workspace_root().join(doc_name);
let Ok(src) = std::fs::read_to_string(&path) else {
eprintln!(
"SKIP: {doc_name} not present at workspace root ({})",
path.display()
);
return;
};
for needle in required {
assert!(
src.contains(needle),
"{doc_name} missing required content: {needle:?}"
);
}
}
#[test]
fn row_3_7_gitleaks_in_ci() {
let yml =
std::fs::read_to_string(workspace_root().join(".github/workflows/ci.yml")).expect("ci.yml");
assert!(
yml.contains("gitleaks/gitleaks-action"),
"CI must run gitleaks for secret scanning"
);
}
#[test]
fn row_10_4_slo_contract_published() {
assert_doc_has_sections(
"PERFORMANCE.md",
&[
"## SLO contract",
"### Latency SLOs",
"### Throughput SLOs",
"### Replication SLOs",
],
);
}
#[test]
fn row_10_5_slo_smoke_test_present() {
let p = workspace_root().join("crates/spg-server/tests/slo_smoke.rs");
assert!(p.exists(), "slo_smoke.rs missing");
let src = std::fs::read_to_string(&p).unwrap();
assert!(src.contains("fn slo_smoke_select_and_insert_p99_under_budget"));
}
#[test]
fn row_3_9_cargo_deny_config_and_ci_present() {
assert!(
workspace_root().join("deny.toml").exists(),
"deny.toml must exist at workspace root"
);
let yml =
std::fs::read_to_string(workspace_root().join(".github/workflows/ci.yml")).expect("ci.yml");
assert!(yml.contains("cargo-deny-action"), "CI must run cargo-deny");
}
#[test]
fn row_3_11_fuzz_harnesses_present() {
let sql_fuzz = workspace_root().join("crates/spg-sql/tests/fuzz.rs");
let wire_fuzz = workspace_root().join("crates/spg-server/tests/e2e_fuzz.rs");
assert!(
sql_fuzz.exists(),
"SQL fuzz harness must exist at {}",
sql_fuzz.display()
);
assert!(
wire_fuzz.exists(),
"wire fuzz harness must exist at {}",
wire_fuzz.display()
);
let sql_src = std::fs::read_to_string(&sql_fuzz).unwrap();
let wire_src = std::fs::read_to_string(&wire_fuzz).unwrap();
assert!(sql_src.contains("fn fuzz_parse_statement_does_not_panic"));
assert!(wire_src.contains("fn fuzz_wire_frame_does_not_panic"));
}
#[test]
fn row_8_4_snapshot_backwards_compat_test_present() {
let p = workspace_root().join("crates/spg-server/tests/cross_version_compat.rs");
assert!(p.exists(), "cross_version_compat.rs missing");
let src = std::fs::read_to_string(&p).unwrap();
assert!(src.contains("fn every_fixture_restores_and_verifies"));
}
#[test]
fn row_8_5_stability_doc_present() {
assert_doc_has_sections(
"STABILITY.md",
&[
"## Frozen surfaces",
"Native wire protocol",
"Snapshot file format",
"Backup bundle format",
"Env-var contract",
],
);
}
#[test]
fn row_8_6_cross_version_fixture_present() {
let fixtures = workspace_root().join("xtests/compat-fixtures");
assert!(fixtures.is_dir(), "compat-fixtures dir missing");
let entries: Vec<_> = std::fs::read_dir(&fixtures)
.unwrap()
.filter_map(Result::ok)
.filter(|e| e.path().is_dir())
.collect();
assert!(
!entries.is_empty(),
"compat-fixtures must contain at least one version directory"
);
for e in &entries {
let p = e.path();
assert!(
p.join("expected.txt").exists(),
"{}/expected.txt missing",
p.display()
);
assert!(
p.join("full.bkp").exists(),
"{}/full.bkp missing",
p.display()
);
}
}
#[test]
fn row_2_5_restore_drill_doc_present() {
assert_doc_has_sections(
"RESTORE_DRILL.md",
&[
"Step 0 — establish what you have",
"Step 1 — assemble db + WAL from bundles",
"Step 2 — point-in-time recovery",
"Step 3 — start the recovered server",
"Step 4 — verify",
"Step 5 — re-bootstrap follower",
],
);
}
#[test]
fn row_2_6_restore_drill_e2e_test_present() {
let p = workspace_root().join("crates/spg-server/tests/e2e_restore_drill.rs");
let src = std::fs::read_to_string(&p)
.unwrap_or_else(|_| panic!("e2e_restore_drill.rs missing at {}", p.display()));
assert!(
src.contains("fn restore_drill_full_plus_incremental_recovers_row_count"),
"the named restore-drill test must exist"
);
}
#[test]
fn row_3_10_threat_model_documented() {
assert_doc_has_sections(
"SECURITY.md",
&["Threat model", "Secret handling", "What's out of scope"],
);
}
#[test]
fn row_3_12_cve_process_documented() {
assert_doc_has_sections(
"SECURITY.md",
&[
"Reporting a vulnerability",
"Acknowledge within",
"Initial triage within",
],
);
}
#[test]
fn row_7_2_deployment_doc_present() {
assert_doc_has_sections(
"DEPLOYMENT.md",
&[
"## Install",
"## File layout",
"## Environment variables",
"## Ports",
],
);
}
#[test]
fn row_7_3_runbook_present() {
assert_doc_has_sections(
"RUNBOOK.md",
&[
"Alert: high `spg_errors_total`",
"Alert: disk full",
"Alert: replication lag growing",
"Alert: backup taking longer",
"Alert: audit log verify fails",
],
);
}
#[test]
fn row_7_4_restore_drill_doc_plus_e2e() {
assert_doc_has_sections("RESTORE_DRILL.md", &["e2e_restore_drill.rs"]);
}
#[test]
fn row_7_5_security_doc_present() {
assert_doc_has_sections(
"SECURITY.md",
&["Reporting a vulnerability", "Secret handling"],
);
}
#[test]
fn row_7_6_changelog_present() {
assert_doc_has_sections(
"CHANGELOG.md",
&["[4.30.0]", "[4.29.0]", "[4.28.0]", "Format:"],
);
}
#[test]
fn row_1_9_partial_fsync_recovery_covered_by_e2e_chaos() {
let p = workspace_root().join("crates/spg-server/tests/e2e_chaos.rs");
let src = std::fs::read_to_string(&p)
.unwrap_or_else(|_| panic!("e2e_chaos.rs missing at {}", p.display()));
assert!(
src.contains("fn chaos_wal_tail_truncation_drops_partial_record_no_panic"),
"e2e_chaos.rs must contain the chaos_wal_tail_truncation_… test"
);
}
#[test]
fn row_1_8_storage_crc32_present_and_chaos_tested() {
let chaos_path = workspace_root().join("crates/spg-server/tests/e2e_chaos.rs");
let chaos_src = std::fs::read_to_string(&chaos_path)
.unwrap_or_else(|_| panic!("e2e_chaos.rs missing at {}", chaos_path.display()));
assert!(
chaos_src.contains("fn chaos_wal_bit_flip_caught_by_crc32_refuses_to_replay"),
"e2e_chaos.rs must contain the v4.37 bit-flip CRC32 test"
);
let engine_src = std::fs::read_to_string(workspace_root().join("crates/spg-engine/src/lib.rs"))
.expect("spg-engine lib.rs");
assert!(
engine_src.contains("ENVELOPE_VERSION_V2") && engine_src.contains("spg_crypto::crc32"),
"engine envelope must carry a v2 CRC32 trailer"
);
let main_src = std::fs::read_to_string(workspace_root().join("crates/spg-server/src/main.rs"))
.expect("main.rs");
assert!(
main_src.contains("WAL_V2_SENTINEL") && main_src.contains("encode_wal_record"),
"main.rs must encode v2 WAL records with CRC32"
);
let backup_src =
std::fs::read_to_string(workspace_root().join("crates/spg-server/src/backup.rs"))
.expect("backup.rs");
assert!(
backup_src.contains("MAGIC_V2") && backup_src.contains("SPGBKUP\\x02"),
"backup bundle writer must emit SPGBKUP\\x02 with CRC32"
);
}
#[test]
fn row_2_9_netsplit_chaos_covered_by_e2e() {
let p = workspace_root().join("crates/spg-server/tests/e2e_chaos_netsplit.rs");
let src = std::fs::read_to_string(&p)
.unwrap_or_else(|_| panic!("e2e_chaos_netsplit.rs missing at {}", p.display()));
assert!(
src.contains("fn netsplit_disconnect_then_heal_resyncs_without_loss_or_dup"),
"e2e_chaos_netsplit.rs must contain the netsplit chaos test"
);
}
#[test]
fn row_4_7_replication_lag_metric_covered_by_e2e() {
let p = workspace_root().join("crates/spg-server/tests/e2e_chaos_netsplit.rs");
let src = std::fs::read_to_string(&p)
.unwrap_or_else(|_| panic!("e2e_chaos_netsplit.rs missing at {}", p.display()));
assert!(
src.contains("fn follower_metrics_expose_replication_lag_after_status_frame"),
"e2e_chaos_netsplit.rs must contain the v4.36 lag-metric test"
);
let repl_src =
std::fs::read_to_string(workspace_root().join("crates/spg-server/src/replication.rs"))
.expect("replication.rs");
assert!(
repl_src.contains("SPGREPL\\x02") || repl_src.contains("MAGIC_V2"),
"replication.rs must declare the v2 magic for the status-frame protocol"
);
assert!(
repl_src.contains("FRAME_TYPE_STATUS"),
"replication.rs must define the status frame type"
);
let obs_src =
std::fs::read_to_string(workspace_root().join("crates/spg-server/src/observability.rs"))
.expect("observability.rs");
assert!(
obs_src.contains("spg_replication_lag_bytes")
&& obs_src.contains("spg_replication_lag_seconds"),
"observability.rs must emit both lag series"
);
}
#[test]
fn row_4_6_per_table_metrics_covered_by_e2e() {
let test_path = workspace_root().join("crates/spg-server/tests/e2e_table_metrics.rs");
let src = std::fs::read_to_string(&test_path)
.unwrap_or_else(|_| panic!("e2e_table_metrics.rs missing at {}", test_path.display()));
for needle in [
"fn table_metrics_default_top_n_emits_rows_and_bytes_per_table",
"fn table_metrics_allowlist_filters_and_orders",
"fn table_metrics_topn_caps_cardinality_under_load",
] {
assert!(
src.contains(needle),
"e2e_table_metrics.rs must contain the named v4.35 test: {needle}"
);
}
let obs_src =
std::fs::read_to_string(workspace_root().join("crates/spg-server/src/observability.rs"))
.expect("observability.rs");
assert!(
obs_src.contains("spg_table_rows{table=\"") && obs_src.contains("spg_table_bytes{table=\""),
"observability.rs must emit both spg_table_rows and spg_table_bytes series"
);
assert!(
obs_src.contains("SPG_METRICS_TABLE_ALLOWLIST"),
"observability.rs must parse SPG_METRICS_TABLE_ALLOWLIST"
);
assert!(
obs_src.contains("SPG_METRICS_TABLE_TOPN"),
"observability.rs must parse SPG_METRICS_TABLE_TOPN"
);
}
#[test]
fn row_1_11_in_memory_consistency_covered_by_e2e() {
let chaos_path = workspace_root().join("crates/spg-server/tests/e2e_chaos.rs");
let chaos_src = std::fs::read_to_string(&chaos_path)
.unwrap_or_else(|_| panic!("e2e_chaos.rs missing at {}", chaos_path.display()));
assert!(
chaos_src.contains(
"fn chaos_disk_full_no_preflight_rolls_back_in_memory_to_match_durable_state"
),
"e2e_chaos.rs must contain the v4.34 no-preflight rollback test"
);
let main_src = std::fs::read_to_string(workspace_root().join("crates/spg-server/src/main.rs"))
.expect("main.rs");
assert!(
main_src.contains("SPG_DISABLE_WAL_PREFLIGHT"),
"main.rs must declare the SPG_DISABLE_WAL_PREFLIGHT knob"
);
assert!(
main_src.contains("WAL_V3_TYPE_AUTO_COMMIT_SQL")
&& main_src.contains("encode_wal_v3_record"),
"main.rs must use the v4.41 single-record v3 framing for the implicit auto-commit \
(WAL_V3_TYPE_AUTO_COMMIT_SQL + encode_wal_v3_record)"
);
assert!(
main_src.contains("append_wal_v3_group"),
"main.rs must batch the v3 auto-commit fsync via append_wal_v3_group (v4.42 group commit)"
);
assert!(
main_src.contains("run_leader_commit_round"),
"main.rs must implement the v4.42 commit-barrier leader (run_leader_commit_round)"
);
assert!(
main_src.contains("needs_wrap"),
"main.rs must compute the implicit BEGIN..COMMIT wrap gate"
);
let slo_path = workspace_root().join("crates/spg-server/tests/slo_smoke.rs");
let slo_src = std::fs::read_to_string(&slo_path).expect("slo_smoke.rs");
assert!(
slo_src.contains("fn slo_wal_insert_p99_under_budget"),
"slo_smoke.rs must perf-gate the v4.34 wrap"
);
}
#[test]
fn row_2_7_graceful_shutdown_covered_by_e2e() {
let test_path = workspace_root().join("crates/spg-server/tests/e2e_graceful_shutdown.rs");
let src = std::fs::read_to_string(&test_path).unwrap_or_else(|_| {
panic!(
"e2e_graceful_shutdown.rs missing at {}",
test_path.display()
)
});
assert!(
src.contains("fn graceful_shutdown_drains_inflight_and_refuses_new_conns_and_exits_zero"),
"e2e_graceful_shutdown.rs must contain the named drain test"
);
let main_src = std::fs::read_to_string(workspace_root().join("crates/spg-server/src/main.rs"))
.expect("main.rs");
assert!(
main_src.contains("SPG_SHUTDOWN_DEADLINE_SEC"),
"main.rs must declare the SPG_SHUTDOWN_DEADLINE_SEC env var"
);
assert!(
main_src.contains("install_shutdown_handlers"),
"main.rs must install signal handlers for SIGTERM/SIGINT"
);
}
#[test]
fn row_4_5_slow_query_log_covered_by_e2e() {
let test_path = workspace_root().join("crates/spg-server/tests/e2e_slow_query_log.rs");
let src = std::fs::read_to_string(&test_path)
.unwrap_or_else(|_| panic!("e2e_slow_query_log.rs missing at {}", test_path.display()));
assert!(
src.contains("fn slow_query_log_fires_above_threshold_and_silent_below"),
"e2e_slow_query_log.rs must contain the named threshold test"
);
let main_src = std::fs::read_to_string(workspace_root().join("crates/spg-server/src/main.rs"))
.expect("main.rs");
assert!(
main_src.contains("SPG_SLOW_QUERY_LOG_MS"),
"main.rs must declare the SPG_SLOW_QUERY_LOG_MS env var"
);
assert!(
main_src.contains("\"event\":\"slow_query\""),
"main.rs must emit the slow_query JSON event"
);
}
#[test]
fn row_5_7_disk_watermark_covered_by_e2e() {
let test_path = workspace_root().join("crates/spg-server/tests/e2e_disk_watermark.rs");
let src = std::fs::read_to_string(&test_path)
.unwrap_or_else(|_| panic!("e2e_disk_watermark.rs missing at {}", test_path.display()));
assert!(
src.contains("fn disk_watermark_refuses_writes_keeps_reads_keeps_server_alive"),
"e2e_disk_watermark.rs must contain the named water-mark test"
);
let main_src = std::fs::read_to_string(workspace_root().join("crates/spg-server/src/main.rs"))
.expect("main.rs");
assert!(
main_src.contains("SPG_WAL_MIN_FREE_BYTES"),
"main.rs must declare the SPG_WAL_MIN_FREE_BYTES env var"
);
assert!(
main_src.contains("libc::statvfs"),
"main.rs must call statvfs for the WAL volume"
);
}
#[test]
fn row_1_10_disk_full_covered_by_e2e_chaos() {
let p = workspace_root().join("crates/spg-server/tests/e2e_chaos.rs");
let src = std::fs::read_to_string(&p)
.unwrap_or_else(|_| panic!("e2e_chaos.rs missing at {}", p.display()));
assert!(
src.contains("fn chaos_disk_full_returns_clean_error_and_keeps_serving"),
"e2e_chaos.rs must contain the chaos_disk_full_… test"
);
let main_src = std::fs::read_to_string(workspace_root().join("crates/spg-server/src/main.rs"))
.expect("main.rs");
assert!(
main_src.contains("SPG_FAIL_WAL_QUOTA_BYTES"),
"main.rs must declare the SPG_FAIL_WAL_QUOTA_BYTES knob"
);
}
#[test]
fn row_1_3_wal_replay_on_startup() {
let dir = unique_tmpdir("wal");
let db = dir.join("a.db");
let wal = dir.join("a.wal");
{
let (raw, addrs) = local_spawn(&db, &wal, &[]);
let mut c = common::ChildGuard(raw);
let mut s = common::connect_to(&addrs.native);
s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
exec_ok(&mut s, "CREATE TABLE t (id INT NOT NULL)");
exec_ok(&mut s, "INSERT INTO t VALUES (1)");
exec_ok(&mut s, "INSERT INTO t VALUES (2)");
}
let (raw, addrs) = local_spawn(&db, &wal, &[]);
let mut c = common::ChildGuard(raw);
let mut s = common::connect_to(&addrs.native);
s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
assert_eq!(select_int(&mut s, "SELECT count(*) FROM t"), 2);
}
#[test]
fn row_4_1_health_endpoint() {
let dir = unique_tmpdir("hz");
let db = dir.join("a.db");
let wal = dir.join("a.wal");
let (raw, addrs) = local_spawn_with_http(&db, &wal, &[]);
let mut c = common::ChildGuard(raw); let deadline = Instant::now() + STARTUP_TIMEOUT;
let mut s = loop {
match TcpStream::connect(addrs.http.as_ref().unwrap()) {
Ok(s) => break s,
Err(_) if Instant::now() < deadline => {
thread::sleep(Duration::from_millis(50));
}
Err(e) => panic!("/healthz never came up: {e}"),
}
};
s.write_all(b"GET /healthz HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
.unwrap();
let mut buf = String::new();
s.read_to_string(&mut buf).unwrap();
assert!(
buf.starts_with("HTTP/1.1 200"),
"/healthz returned: {buf:?}"
);
}
#[test]
fn row_4_2_prometheus_metrics() {
let dir = unique_tmpdir("mx");
let db = dir.join("a.db");
let wal = dir.join("a.wal");
let (raw, addrs) = local_spawn_with_http(&db, &wal, &[]);
let mut c = common::ChildGuard(raw);
let deadline = Instant::now() + STARTUP_TIMEOUT;
let mut s = loop {
match TcpStream::connect(addrs.http.as_ref().unwrap()) {
Ok(s) => break s,
Err(_) if Instant::now() < deadline => {
thread::sleep(Duration::from_millis(50));
}
Err(e) => panic!("/metrics never came up: {e}"),
}
};
s.write_all(b"GET /metrics HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
.unwrap();
let mut buf = String::new();
s.read_to_string(&mut buf).unwrap();
for required in [
"spg_server_info",
"spg_connections_active",
"spg_queries_total",
"spg_errors_total",
] {
assert!(
buf.contains(required),
"/metrics missing required series {required:?} in:\n{buf}"
);
}
}
#[test]
fn row_5_1_max_connections_enforced() {
let dir = unique_tmpdir("mc");
let db = dir.join("a.db");
let wal = dir.join("a.wal");
let (raw, addrs) = local_spawn(&db, &wal, &[("SPG_MAX_CONNECTIONS", "1")]);
let mut c = common::ChildGuard(raw);
let _hold = common::connect_to(&addrs.native);
let mut s2 = TcpStream::connect(&addrs.native).expect("second connect");
s2.set_read_timeout(Some(Duration::from_millis(500)))
.unwrap();
let mut buf = [0u8; 256];
let n = s2.read(&mut buf).unwrap_or(0);
let saw_err_frame = n >= 5 && buf[4] == 0x14;
let closed = n == 0;
assert!(
saw_err_frame || closed,
"expected ErrorResponse (op 0x14 at byte 4) or EOF on 2nd conn; got n={n} bytes={:?}",
&buf[..n.min(32)]
);
}
#[test]
fn row_8_2_native_wire_opcode_stable() {
use spg_wire::Op;
let expected: &[(u8, &str)] = &[
(0x00, "Ping"),
(0x01, "Pong"),
(0x02, "Auth"),
(0x03, "AuthUser"),
(0x10, "Query"),
(0x11, "RowDescription"),
(0x12, "DataRow"),
(0x13, "CommandComplete"),
(0x14, "ErrorResponse"),
(0x15, "Stats"),
(0x16, "StatsResponse"),
(0x17, "DataRowBatch"),
(0xFF, "Error"),
];
for (byte, name) in expected {
let op = Op::from_byte(*byte)
.unwrap_or_else(|_| panic!("opcode 0x{byte:02x} ({name}) must decode"));
let dbg = format!("{op:?}");
assert_eq!(
dbg, *name,
"opcode 0x{byte:02x} should be {name}, got {dbg}"
);
}
}
#[test]
fn row_9_8_ci_workflow_present() {
let path = workspace_root().join(".github/workflows/ci.yml");
let yml = std::fs::read_to_string(&path)
.unwrap_or_else(|_| panic!(".github/workflows/ci.yml must exist at {}", path.display()));
for job in [
"rustfmt",
"clippy",
"test",
"cargo-audit",
"prod_ready gate",
] {
assert!(
yml.contains(&format!("name: {job}")),
"CI missing required job {job:?}"
);
}
}
#[test]
fn row_9_2_perf_gates_present() {
let root = workspace_root();
for crate_name in [
"spg-wire",
"spg-sql",
"spg-storage",
"spg-crypto",
"spg-audit",
"spg-engine",
"spgctl",
] {
let p = root.join(format!("crates/{crate_name}/tests/perf_gate.rs"));
assert!(
p.exists(),
"perf_gate.rs missing for {crate_name}; expected at {}",
p.display()
);
}
}
#[test]
fn row_3_8_cargo_audit_in_ci() {
let yml = std::fs::read_to_string(workspace_root().join(".github/workflows/ci.yml"))
.expect("ci.yml must exist");
assert!(
yml.contains("rustsec/audit-check"),
"CI audit job must use rustsec/audit-check action"
);
}
#[test]
fn row_10_x_performance_doc_has_v4_27_baseline() {
let Ok(perf) = std::fs::read_to_string(workspace_root().join("PERFORMANCE.md")) else {
eprintln!("SKIP: PERFORMANCE.md not present at workspace root");
return;
};
assert!(
perf.contains("v4.27 competitor rerun"),
"PERFORMANCE.md must carry the v4.27 baseline section"
);
for table_hint in ["latency p50", "INSERT rows/s", "vector kNN"] {
assert!(
perf.contains(table_hint),
"PERFORMANCE.md missing table containing {table_hint:?}"
);
}
}
#[test]
fn row_6_3_concurrent_reads_dont_serialize() {
let dir = unique_tmpdir("cc");
let db = dir.join("a.db");
let wal = dir.join("a.wal");
let (raw, addrs) = local_spawn(&db, &wal, &[]);
let mut c = common::ChildGuard(raw);
let mut s = common::connect_to(&addrs.native);
s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
exec_ok(&mut s, "CREATE TABLE r (id INT NOT NULL, v INT NOT NULL)");
for i in 0..200 {
exec_ok(&mut s, &format!("INSERT INTO r VALUES ({i}, {})", i * 7));
}
drop(s);
let serial = run_reads(&addrs.native, 200);
let server_addr = addrs.native.clone();
let handles: Vec<_> = (0..4)
.map(|_| {
let a = server_addr.clone();
thread::spawn(move || run_reads(&a, 200))
})
.collect();
let parallel_started = Instant::now();
for h in handles {
let _ = h.join().unwrap();
}
let parallel_total = parallel_started.elapsed();
let bound = serial * 4;
assert!(
parallel_total < bound,
"parallel 4×200 reads {parallel_total:?} expected < serial×4 = {bound:?}"
);
}
#[test]
fn row_2_10_fast_restart_at_scale_covered_by_e2e() {
let manifest_path = workspace_root().join("crates/spg-manifest/src/lib.rs");
let manifest_src = std::fs::read_to_string(&manifest_path)
.unwrap_or_else(|_| panic!("manifest lib.rs missing at {}", manifest_path.display()));
assert!(
manifest_src.contains("SPGMAN01") && manifest_src.contains("MANIFEST_VERSION"),
"spg-manifest/src/lib.rs must declare the v10 magic + MANIFEST_VERSION constants"
);
let main_src = std::fs::read_to_string(workspace_root().join("crates/spg-server/src/main.rs"))
.expect("main.rs");
assert!(
main_src.contains("write_manifest_alongside"),
"main.rs must call write_manifest_alongside on snapshot writes"
);
assert!(
main_src.contains("load_manifest_and_preload_cold"),
"main.rs must call load_manifest_and_preload_cold on boot"
);
assert!(
main_src.contains("run_checkpoint_command") && main_src.contains("parse_checkpoint_intent"),
"main.rs must implement the CHECKPOINT SQL command + parser"
);
let e2e_path = workspace_root().join("crates/spg-server/tests/e2e_manifest.rs");
let e2e_src = std::fs::read_to_string(&e2e_path)
.unwrap_or_else(|_| panic!("e2e_manifest.rs missing at {}", e2e_path.display()));
for fn_name in [
"fn manifest_restores_cold_segments_across_restart",
"fn checkpoint_truncates_wal_and_persists_through_restart",
"fn checkpoint_rejects_non_admin_caller",
"fn restart_at_100m_under_60s_after_checkpoint",
] {
assert!(
e2e_src.contains(fn_name),
"e2e_manifest.rs must contain {fn_name}"
);
}
}
#[test]
#[allow(
clippy::too_many_lines,
reason = "many independent contract checks against the v5.4 surface; splitting into sub-tests would obscure the single PROD_READY row this gate corresponds to"
)]
fn row_1_12_async_commit_durability_window_covered_by_e2e() {
let main_src = std::fs::read_to_string(workspace_root().join("crates/spg-server/src/main.rs"))
.expect("main.rs");
assert!(
main_src.contains("fn synchronous_commit_disabled"),
"main.rs must define synchronous_commit_disabled() OnceLock parser"
);
assert!(
main_src.contains("WAL_V3_TYPE_DURABILITY_CHECKPOINT"),
"main.rs must register the v3 0x02 durability_checkpoint kind tag"
);
assert!(
main_src.contains("fn encode_durability_marker")
&& main_src.contains("fn append_durability_marker"),
"main.rs must expose encode_durability_marker + append_durability_marker"
);
assert!(
main_src.contains("wal_sync_clone"),
"main.rs must hold a try_clone'd WAL handle for lock-free fsync (v5.4.4)"
);
assert!(
main_src.contains("fn dispatch_v3_record"),
"main.rs replay path must dispatch v3 records via dispatch_v3_record (marker = no-op)"
);
let flusher_path = workspace_root().join("crates/spg-server/src/flusher.rs");
let flusher_src = std::fs::read_to_string(&flusher_path)
.unwrap_or_else(|_| panic!("flusher.rs missing at {}", flusher_path.display()));
assert!(
flusher_src.contains("SPG_SYNCHRONOUS_COMMIT")
&& flusher_src.contains("SPG_FLUSHER_INTERVAL_US"),
"flusher.rs must reference both env knobs"
);
assert!(
flusher_src.contains("append_durability_marker"),
"flusher.rs must call append_durability_marker on each tick"
);
let obs_src =
std::fs::read_to_string(workspace_root().join("crates/spg-server/src/observability.rs"))
.expect("observability.rs");
for atomic in [
"flusher_iterations",
"flusher_errors",
"last_durable_wal_offset",
"last_fsync_us",
] {
assert!(
obs_src.contains(atomic),
"observability.rs must declare the {atomic} atomic for /metrics"
);
}
for series in [
"spg_flusher_iterations_total",
"spg_flusher_errors_total",
"spg_durability_lag_bytes",
"spg_durability_lag_seconds",
] {
assert!(
obs_src.contains(series),
"observability.rs must render the {series} Prometheus series"
);
}
let async_e2e_path = workspace_root().join("crates/spg-server/tests/e2e_async_commit.rs");
let async_e2e_src = std::fs::read_to_string(&async_e2e_path).unwrap_or_else(|_| {
panic!(
"e2e_async_commit.rs missing at {}",
async_e2e_path.display()
)
});
for fn_name in [
"fn sync_commit_default_writes_apply_and_are_visible",
"fn async_commit_off_inserts_visible_immediately",
"fn explicit_sync_commit_on_behaves_like_default",
] {
assert!(
async_e2e_src.contains(fn_name),
"e2e_async_commit.rs must contain {fn_name}"
);
}
let flusher_e2e_path = workspace_root().join("crates/spg-server/tests/e2e_flusher.rs");
let flusher_e2e_src = std::fs::read_to_string(&flusher_e2e_path)
.unwrap_or_else(|_| panic!("e2e_flusher.rs missing at {}", flusher_e2e_path.display()));
for fn_name in [
"fn flusher_metric_zero_in_default_sync_commit_mode",
"fn flusher_metric_rises_under_async_commit_off",
"fn flusher_env_var_recognizes_off_false_zero",
"fn flusher_env_var_treats_on_as_sync",
"fn durability_lag_metrics_are_zero_in_sync_mode",
"fn durability_lag_seconds_bounded_in_async_mode",
] {
assert!(
flusher_e2e_src.contains(fn_name),
"e2e_flusher.rs must contain {fn_name}"
);
}
let chaos_path = workspace_root().join("crates/spg-server/tests/e2e_chaos_async_commit.rs");
let chaos_src = std::fs::read_to_string(&chaos_path).unwrap_or_else(|_| {
panic!(
"e2e_chaos_async_commit.rs missing at {}",
chaos_path.display()
)
});
assert!(
chaos_src.contains("fn chaos_kill_during_async_commit_window_loses_only_unflushed"),
"e2e_chaos_async_commit.rs must contain the kill-mid-window prefix-recovery chaos test"
);
let slo_path = workspace_root().join("crates/spg-server/tests/slo_smoke.rs");
let slo_src = std::fs::read_to_string(&slo_path).expect("slo_smoke.rs");
assert!(
slo_src.contains("fn slo_wal_insert_async_commit_smoke_speedup_vs_sync"),
"slo_smoke.rs must contain the CI ratio gate"
);
assert!(
slo_src.contains("fn slo_wal_insert_async_commit_above_200k"),
"slo_smoke.rs must contain the release-process 200K ship gate"
);
let stability_src =
std::fs::read_to_string(workspace_root().join("STABILITY.md")).expect("STABILITY.md");
assert!(
stability_src.contains("Async-commit mode (v5.4)"),
"STABILITY.md must carry the v5.4 async-commit subsection under Env-var contract"
);
}
#[test]
fn row_5_5_per_query_memory_cap_covered_by_e2e() {
let ab_src =
std::fs::read_to_string(workspace_root().join("crates/spg-server/src/alloc_budget.rs"))
.expect("alloc_budget.rs");
assert!(
ab_src.contains("impl GlobalAlloc for BudgetAllocator"),
"alloc_budget.rs must implement GlobalAlloc for BudgetAllocator"
);
assert!(
ab_src.contains("fn reset_query_budget") && ab_src.contains("fn clear_query_budget"),
"alloc_budget.rs must expose reset/clear_query_budget"
);
let main_src = std::fs::read_to_string(workspace_root().join("crates/spg-server/src/main.rs"))
.expect("main.rs");
assert!(
main_src.contains("#[global_allocator]") && main_src.contains("BudgetAllocator"),
"main.rs must install BudgetAllocator as #[global_allocator]"
);
assert!(
main_src.contains("SPG_MAX_QUERY_BYTES"),
"main.rs must parse the SPG_MAX_QUERY_BYTES knob"
);
let e2e = std::fs::read_to_string(
workspace_root().join("crates/spg-server/tests/e2e_query_budget.rs"),
)
.expect("e2e_query_budget.rs");
assert!(
e2e.contains("fn over_budget_select_is_cancelled")
&& e2e.contains("fn under_budget_select_succeeds"),
"e2e_query_budget.rs must pin the over/under-budget behaviours"
);
}
#[test]
fn row_5_6_memory_exhaustion_survives_covered_by_e2e() {
let e2e = std::fs::read_to_string(
workspace_root().join("crates/spg-server/tests/e2e_query_budget.rs"),
)
.expect("e2e_query_budget.rs");
assert!(
e2e.contains("fn chaos_oom_returns_cancelled_not_panic"),
"e2e_query_budget.rs must pin the OOM-pressure-survival chaos test"
);
let stab =
std::fs::read_to_string(workspace_root().join("STABILITY.md")).expect("STABILITY.md");
assert!(
stab.contains("Per-query memory budget (v5.5)") && stab.contains("panic = \"abort\""),
"STABILITY.md must document the per-query budget + panic=abort OOM semantics"
);
}
#[test]
fn row_6_11_vector_encoding_alternatives_covered_by_e2e() {
for path in [
"crates/spg-server/tests/e2e_sq8.rs",
"crates/spg-server/tests/e2e_half.rs",
] {
let body = std::fs::read_to_string(workspace_root().join(path))
.unwrap_or_else(|_| panic!("{path} missing"));
assert!(
body.contains("USING SQ8") || body.contains("USING HALF"),
"{path} must exercise the encoding it tests"
);
}
}
#[test]
fn row_6_12_vector_knn_1m_scale_perf_gate_present() {
let path = "crates/spg-server/tests/perf_gate_sq8.rs";
let body = std::fs::read_to_string(workspace_root().join(path))
.unwrap_or_else(|_| panic!("{path} missing"));
assert!(
body.contains("sq8_knn_1m_dim128_p50_under_5ms_server")
&& body.contains("sq8_rss_1m_dim128_under_800mib"),
"{path} must declare the v6.0.5 1M-scale perf gates"
);
assert!(
body.contains("#[ignore"),
"{path}'s 1M-scale gates must be #[ignore]'d (run via --ignored)"
);
}
#[test]
fn row_6_13_alter_index_rebuild_covered_by_e2e() {
let path = "crates/spg-server/tests/e2e_alter_rebuild.rs";
let body = std::fs::read_to_string(workspace_root().join(path)).expect("e2e_alter_rebuild.rs");
assert!(
body.contains("ALTER INDEX") && body.contains("REBUILD"),
"e2e_alter_rebuild.rs must exercise ALTER INDEX … REBUILD"
);
assert!(
body.contains("WITH (encoding"),
"e2e_alter_rebuild.rs must exercise the encoding-switch path"
);
}
#[test]
fn meta_every_machine_row_has_a_test() {
let ids = machine_marked_rows();
let self_path = workspace_root().join(file!());
let src = std::fs::read_to_string(&self_path)
.unwrap_or_else(|e| panic!("read self at {}: {e}", self_path.display()));
let mut missing = Vec::new();
for id in &ids {
let prefix = format!("fn row_{}_", id.replace('.', "_"));
if !src.contains(&prefix) {
missing.push(id.clone());
}
}
if !missing.is_empty() {
panic!(
"PROD_READY.md rows marked [machine] but missing tests in prod_ready.rs: {missing:?}\n\
Convention: row id 'X.Y' → fn row_X_Y_<name>"
);
}
}
fn run_reads(addr: &str, n: usize) -> Duration {
let mut s = TcpStream::connect(addr).unwrap();
s.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
let started = Instant::now();
for _ in 0..n {
let _ = select_int(&mut s, "SELECT count(*) FROM r");
}
started.elapsed()
}
fn read_frame(s: &mut TcpStream) -> spg_wire::Frame {
let mut header = [0u8; spg_wire::FRAME_HEADER_LEN];
s.read_exact(&mut header).unwrap();
let payload_len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
let op = spg_wire::Op::from_byte(header[4]).unwrap();
let mut payload = vec![0u8; payload_len];
if payload_len > 0 {
s.read_exact(&mut payload).unwrap();
}
spg_wire::Frame { op, payload }
}
fn send(s: &mut TcpStream, f: &spg_wire::Frame) {
let mut out = Vec::new();
spg_wire::encode(f, &mut out).unwrap();
s.write_all(&out).unwrap();
}
fn exec_ok(s: &mut TcpStream, sql: &str) {
send(s, &spg_wire::build_query(sql));
let f = read_frame(s);
if f.op == spg_wire::Op::ErrorResponse {
let msg = spg_wire::parse_error_response(&f).unwrap_or("<undecodable>");
panic!("server rejected SQL {sql:?}: {msg}");
}
assert_eq!(
f.op,
spg_wire::Op::CommandComplete,
"expected CC for {sql:?}"
);
}
fn select_int(s: &mut TcpStream, sql: &str) -> i64 {
send(s, &spg_wire::build_query(sql));
let rd = read_frame(s);
if rd.op == spg_wire::Op::ErrorResponse {
let msg = spg_wire::parse_error_response(&rd).unwrap_or("<undecodable>");
panic!("server rejected SQL {sql:?}: {msg}");
}
assert_eq!(rd.op, spg_wire::Op::RowDescription);
let mut count: i64 = -1;
loop {
let f = read_frame(s);
match f.op {
spg_wire::Op::DataRow => {
let row = spg_wire::parse_data_row(&f).unwrap();
count = wire_to_i64(&row[0]);
}
spg_wire::Op::DataRowBatch => {
let rows = spg_wire::parse_data_row_batch(&f).unwrap();
count = wire_to_i64(&rows[0][0]);
}
spg_wire::Op::CommandComplete => return count,
other => panic!("unexpected {other:?}"),
}
}
}
fn wire_to_i64(v: &spg_wire::WireValue) -> i64 {
match v {
spg_wire::WireValue::Int(n) => i64::from(*n),
spg_wire::WireValue::BigInt(n) => *n,
spg_wire::WireValue::Text(t) => t.parse().unwrap(),
other => panic!("expected integer, got {other:?}"),
}
}