#![allow(
clippy::doc_markdown,
clippy::cast_possible_truncation,
clippy::uninlined_format_args,
unused_mut,
unused_variables
)]
use std::io::{Read, Write};
use std::net::TcpStream;
use std::thread;
use std::time::{Duration, Instant};
mod common;
/// Builds a `common::ServerBuilder` via `new().with_http()`, applies every
/// `(name, value)` pair from `envs` through `env` in slice order, and spawns it.
///
/// Returns exactly what `ServerBuilder::spawn` returns: the child process
/// handle together with the server's `common::ServerAddrs`.
fn local_spawn(envs: &[(&str, &str)]) -> (std::process::Child, common::ServerAddrs) {
    envs.iter()
        .fold(
            common::ServerBuilder::new().with_http(),
            |builder, &(key, value)| builder.env(key, value),
        )
        .spawn()
}
/// Upper bound on how long `http_get_body` keeps retrying its initial TCP
/// connect before giving up with a panic.
const STARTUP_TIMEOUT: Duration = Duration::from_secs(5);
/// Socket read timeout that `http_get_body` installs before reading the
/// response.
const READ_TIMEOUT: Duration = Duration::from_secs(3);
/// Issues `GET /metrics` against the HTTP listener at `addr` and returns the
/// response body as a `String`.
///
/// The TCP connect is retried every 20 ms until it succeeds or
/// `STARTUP_TIMEOUT` has elapsed. The request carries `Connection: close`, and
/// the response is read until end-of-stream with `READ_TIMEOUT` installed as
/// the socket read timeout. The status line and headers are discarded without
/// being inspected; bytes that are not valid UTF-8 are replaced lossily.
///
/// Returns an empty string when the response contains no `\r\n\r\n` separator
/// between headers and body.
///
/// # Panics
///
/// Panics if no connection could be established before the deadline, or if
/// configuring the socket, sending the request, or reading the response fails.
/// Every panic message names `addr` and the underlying I/O error.
fn http_get_body(addr: &str) -> String {
    const REQUEST: &str = "GET /metrics HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n";

    let deadline = Instant::now() + STARTUP_TIMEOUT;
    let mut stream = loop {
        match TcpStream::connect(addr) {
            Ok(stream) => break stream,
            // Keep the last connect error so a timeout explains *why* the
            // listener was unreachable.
            Err(err) => assert!(
                Instant::now() <= deadline,
                "http listener at {addr} never came up: {err}"
            ),
        }
        thread::sleep(Duration::from_millis(20));
    };

    stream
        .set_read_timeout(Some(READ_TIMEOUT))
        .unwrap_or_else(|err| panic!("failed to set read timeout on {addr}: {err}"));
    stream
        .write_all(REQUEST.as_bytes())
        .unwrap_or_else(|err| panic!("failed to send /metrics request to {addr}: {err}"));

    let mut raw = Vec::new();
    stream
        .read_to_end(&mut raw)
        .unwrap_or_else(|err| panic!("failed to read /metrics response from {addr}: {err}"));

    // Borrow the lossy view and allocate only once, for the body itself.
    let response = String::from_utf8_lossy(&raw);
    response
        .split_once("\r\n\r\n")
        .map_or_else(String::new, |(_, body)| body.to_owned())
}
/// Scrapes the metrics endpoint at `http` and returns the current value of
/// the `spg_flusher_iterations_total` sample, as reported by `metric_u64`.
fn flusher_iterations(http: &str) -> u64 {
    const COUNTER: &str = "spg_flusher_iterations_total";
    metric_u64(http, COUNTER)
}
/// Scrapes `/metrics` at `http` and returns the sample called `name` as a
/// `u64`.
///
/// The first line of the form `<name> <value>` wins. Lines where `name` is
/// followed by anything other than a space (a longer metric name, a `{...}`
/// label set) are skipped. Only the first whitespace-delimited token after
/// the name is parsed, so the optional trailing timestamp permitted by the
/// Prometheus text exposition format does not turn a valid sample into 0.
///
/// Returns 0 when no matching line exists or the value is not a valid `u64`;
/// callers therefore cannot distinguish an absent metric from one that is
/// genuinely zero.
fn metric_u64(http: &str, name: &str) -> u64 {
    let body = http_get_body(http);
    body.lines()
        .find_map(|line| line.strip_prefix(name).and_then(|tail| tail.strip_prefix(' ')))
        .and_then(|rest| rest.split_whitespace().next())
        .and_then(|value| value.parse::<u64>().ok())
        .unwrap_or(0)
}
/// Scrapes `/metrics` at `http` and returns the sample called `name` as an
/// `f64`.
///
/// The first line of the form `<name> <value>` wins. Lines where `name` is
/// followed by anything other than a space (a longer metric name, a `{...}`
/// label set) are skipped. Only the first whitespace-delimited token after
/// the name is parsed, so the optional trailing timestamp permitted by the
/// Prometheus text exposition format does not turn a valid sample into 0.0.
///
/// Returns 0.0 when no matching line exists or the value is not a valid
/// `f64`; callers therefore cannot distinguish an absent metric from one that
/// is genuinely zero.
fn metric_f64(http: &str, name: &str) -> f64 {
    let body = http_get_body(http);
    body.lines()
        .find_map(|line| line.strip_prefix(name).and_then(|tail| tail.strip_prefix(' ')))
        .and_then(|rest| rest.split_whitespace().next())
        .and_then(|value| value.parse::<f64>().ok())
        .unwrap_or(0.0)
}
/// Spawns a server with no environment overrides, waits 150 ms, and checks
/// that the flusher iteration counter still reads zero.
#[test]
fn flusher_metric_zero_in_default_sync_commit_mode() {
    let (process, endpoints) = local_spawn(&[]);
    // Kept bound until the end of the test; presumably `ChildGuard` cleans up
    // the child process when dropped (see `common`).
    let _guard = common::ChildGuard(process);

    let settle = Duration::from_millis(150);
    thread::sleep(settle);

    let http = endpoints.http.as_ref().unwrap();
    let v = flusher_iterations(http);
    assert_eq!(
        v, 0,
        "sync-commit (the default) must not spawn the flusher; got iterations={v}"
    );
}
/// Spawns a server with `SPG_SYNCHRONOUS_COMMIT=off` and
/// `SPG_FLUSHER_INTERVAL_US=1000`, waits 200 ms, and requires the flusher
/// iteration counter to have reached at least 10.
#[test]
fn flusher_metric_rises_under_async_commit_off() {
    let env = [
        ("SPG_SYNCHRONOUS_COMMIT", "off"),
        ("SPG_FLUSHER_INTERVAL_US", "1000"),
    ];
    let (process, endpoints) = local_spawn(&env);
    // Kept bound until the end of the test; presumably `ChildGuard` cleans up
    // the child process when dropped (see `common`).
    let _guard = common::ChildGuard(process);

    let settle = Duration::from_millis(200);
    thread::sleep(settle);

    let http = endpoints.http.as_ref().unwrap();
    let v = flusher_iterations(http);
    assert!(
        v >= 10,
        "expected flusher_iterations_total >= 10 after 200ms at 1ms cadence, got {v}"
    );
}
/// For each of the spellings `"off"`, `"false"` and `"0"` of
/// `SPG_SYNCHRONOUS_COMMIT`, spawns a fresh server with
/// `SPG_FLUSHER_INTERVAL_US=500`, waits 100 ms, and requires at least 5
/// flusher iterations.
#[test]
fn flusher_env_var_recognizes_off_false_zero() {
    let async_spellings = ["off", "false", "0"];
    let settle = Duration::from_millis(100);

    for val in async_spellings {
        let env = [
            ("SPG_SYNCHRONOUS_COMMIT", val),
            ("SPG_FLUSHER_INTERVAL_US", "500"),
        ];
        let (process, endpoints) = local_spawn(&env);
        // Dropped at the end of each iteration, before the next server is
        // spawned.
        let _guard = common::ChildGuard(process);

        thread::sleep(settle);

        let http = endpoints.http.as_ref().unwrap();
        let v = flusher_iterations(http);
        assert!(
            v >= 5,
            "SPG_SYNCHRONOUS_COMMIT={val:?} must enable the flusher; got iterations={v}"
        );
    }
}
/// Spawns a server with no environment overrides, waits 100 ms, and checks
/// that both `spg_durability_lag_bytes` and `spg_durability_lag_seconds`
/// read zero.
#[test]
fn durability_lag_metrics_are_zero_in_sync_mode() {
    let (process, endpoints) = local_spawn(&[]);
    // Kept bound until the end of the test; presumably `ChildGuard` cleans up
    // the child process when dropped (see `common`).
    let _guard = common::ChildGuard(process);

    let settle = Duration::from_millis(100);
    thread::sleep(settle);

    // Each metric is fetched with its own scrape, bytes first.
    let http = endpoints.http.as_ref().unwrap();
    let lag_bytes = metric_u64(http, "spg_durability_lag_bytes");
    let lag_seconds = metric_f64(http, "spg_durability_lag_seconds");

    assert_eq!(lag_bytes, 0, "sync mode must report 0 lag bytes");
    assert!(
        lag_seconds == 0.0,
        "sync mode must report 0 lag seconds, got {lag_seconds}"
    );
}
/// Spawns a server with `SPG_SYNCHRONOUS_COMMIT=off` and
/// `SPG_FLUSHER_INTERVAL_US=1000`, waits 50 ms, and checks that
/// `spg_durability_lag_seconds` is below 1.0 and that the flusher iteration
/// counter is at least 1 on a subsequent scrape.
#[test]
fn durability_lag_seconds_bounded_in_async_mode() {
    let env = [
        ("SPG_SYNCHRONOUS_COMMIT", "off"),
        ("SPG_FLUSHER_INTERVAL_US", "1000"),
    ];
    let (process, endpoints) = local_spawn(&env);
    // Kept bound until the end of the test; presumably `ChildGuard` cleans up
    // the child process when dropped (see `common`).
    let _guard = common::ChildGuard(process);

    let settle = Duration::from_millis(50);
    thread::sleep(settle);

    let http = endpoints.http.as_ref().unwrap();

    let lag_seconds = metric_f64(http, "spg_durability_lag_seconds");
    assert!(
        lag_seconds < 1.0,
        "async-commit lag_seconds should be < 1 s under 1 ms cadence, got {lag_seconds}"
    );

    // The counter is scraped only after the lag assertion has passed.
    let iters = metric_u64(http, "spg_flusher_iterations_total");
    assert!(
        iters >= 1,
        "expected flusher to have ticked at least once before scrape, got {iters}"
    );
}
/// For each of the values `"on"`, `"true"`, `"1"`, `"yes"` and the empty
/// string of `SPG_SYNCHRONOUS_COMMIT`, spawns a fresh server, waits 100 ms,
/// and checks that the flusher iteration counter still reads zero.
#[test]
fn flusher_env_var_treats_on_as_sync() {
    let sync_spellings = ["on", "true", "1", "yes", ""];
    let settle = Duration::from_millis(100);

    for val in sync_spellings {
        let env = [("SPG_SYNCHRONOUS_COMMIT", val)];
        let (process, endpoints) = local_spawn(&env);
        // Dropped at the end of each iteration, before the next server is
        // spawned.
        let _guard = common::ChildGuard(process);

        thread::sleep(settle);

        let http = endpoints.http.as_ref().unwrap();
        let v = flusher_iterations(http);
        assert_eq!(
            v, 0,
            "SPG_SYNCHRONOUS_COMMIT={val:?} must keep sync semantics; got iterations={v}"
        );
    }
}