#![allow(clippy::unwrap_used, clippy::expect_used, clippy::cast_precision_loss)]
use std::process::{Child, Command};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::{Method, Request};
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use osproxy_bench::{footprint_brief, judge_footprint, FootprintProfile, FootprintThresholds};
use testcontainers::core::{ContainerPort, WaitFor};
use testcontainers::runners::AsyncRunner;
use testcontainers::{GenericImage, ImageExt};
use tokio::net::TcpListener;
const INDEX: &str = "osproxy-shared";
const PAGE_BYTES: u64 = 4096;
const SOAK_REQUESTS: u64 = 50_000;
const SOAK_CONCURRENCY: u32 = 16;
type HttpClient = Client<hyper_util::client::legacy::connect::HttpConnector, Full<Bytes>>;
async fn start_opensearch() -> (testcontainers::ContainerAsync<GenericImage>, String) {
let container = GenericImage::new("opensearchproject/opensearch", "2.11.1")
.with_exposed_port(ContainerPort::Tcp(9200))
.with_wait_for(WaitFor::message_on_stdout("] started"))
.with_env_var("discovery.type", "single-node")
.with_env_var("DISABLE_SECURITY_PLUGIN", "true")
.with_env_var("DISABLE_INSTALL_DEMO_CONFIG", "true")
.with_env_var("bootstrap.memory_lock", "false")
.with_env_var("OPENSEARCH_JAVA_OPTS", "-Xms512m -Xmx512m")
.start()
.await
.unwrap();
let host = container.get_host().await.unwrap();
let port = container.get_host_port_ipv4(9200).await.unwrap();
(container, format!("http://{host}:{port}"))
}
struct ProxyChild(Child);
impl Drop for ProxyChild {
fn drop(&mut self) {
let _ = self.0.kill();
let _ = self.0.wait();
}
}
async fn spawn_proxy_process(upstream: &str) -> (ProxyChild, String, u32) {
let port = {
let l = TcpListener::bind("127.0.0.1:0").await.unwrap();
l.local_addr().unwrap().port()
};
let bind = format!("127.0.0.1:{port}");
let child = Command::new(env!("CARGO_BIN_EXE_osproxy"))
.env("OSPROXY_BIND", &bind)
.env("OSPROXY_UPSTREAM", upstream)
.env("OSPROXY_INDEX", INDEX)
.env("OSPROXY_TOKENS", "") .env("OSPROXY_ALLOW_CLEARTEXT_MUTATION", "1") .spawn()
.expect("spawn osproxy binary");
let pid = child.id();
(ProxyChild(child), format!("http://{bind}"), pid)
}
fn ingest(base: &str, i: u64) -> Request<Full<Bytes>> {
let body = format!(r#"{{"tenant_id":"soak","id":{i},"msg":"x"}}"#);
Request::builder()
.method(Method::POST)
.uri(format!("{base}/orders/_doc"))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap()
}
async fn wait_proxy_ready(client: &HttpClient, base: &str) -> bool {
for _ in 0..60 {
if client.request(ingest(base, 0)).await.is_ok() {
return true;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
false
}
fn rss_bytes(pid: u32) -> Option<u64> {
let statm = std::fs::read_to_string(format!("/proc/{pid}/statm")).ok()?;
let resident_pages: u64 = statm.split_whitespace().nth(1)?.parse().ok()?;
Some(resident_pages * PAGE_BYTES)
}
async fn soak(client: &HttpClient, base: &str) -> u64 {
let next = Arc::new(AtomicU64::new(0));
let ok = Arc::new(AtomicU64::new(0));
let mut workers = Vec::new();
for _ in 0..SOAK_CONCURRENCY {
let client = client.clone();
let base = base.to_owned();
let next = next.clone();
let ok = ok.clone();
workers.push(tokio::spawn(async move {
loop {
let i = next.fetch_add(1, Ordering::Relaxed);
if i >= SOAK_REQUESTS {
break;
}
if let Ok(resp) = client.request(ingest(&base, i)).await {
let success = resp.status().is_success();
let _ = resp.into_body().collect().await;
if success {
ok.fetch_add(1, Ordering::Relaxed);
}
}
}
}));
}
for w in workers {
w.await.unwrap();
}
ok.load(Ordering::Relaxed)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "requires Docker + Linux /proc; run with --ignored --nocapture"]
async fn nfr_p6_footprint_under_soak() {
let client: HttpClient = Client::builder(TokioExecutor::new()).build_http();
let (_container, os_base) = start_opensearch().await;
let (proxy, proxy_base, pid) = spawn_proxy_process(&os_base).await;
assert!(
wait_proxy_ready(&client, &proxy_base).await,
"proxy process did not become ready"
);
tokio::time::sleep(Duration::from_secs(2)).await;
let idle_rss_bytes = rss_bytes(pid).expect("read idle RSS");
let ok = soak(&client, &proxy_base).await;
assert_eq!(ok, SOAK_REQUESTS, "every soak ingest should succeed");
tokio::time::sleep(Duration::from_secs(2)).await;
let soak_rss_bytes = rss_bytes(pid).expect("read post-soak RSS");
let profile = FootprintProfile {
idle_rss_bytes,
soak_rss_bytes,
soak_requests: SOAK_REQUESTS,
};
let verdict = judge_footprint(&profile, &FootprintThresholds::provisional());
let dir = env!("CARGO_TARGET_TMPDIR");
std::fs::write(format!("{dir}/nfr-footprint.json"), profile.to_json()).unwrap();
std::fs::write(
format!("{dir}/nfr-footprint-verdict.json"),
verdict.to_json(),
)
.unwrap();
std::fs::write(
format!("{dir}/nfr-footprint.md"),
footprint_brief(&profile, &verdict),
)
.unwrap();
println!("NFR-P6 footprint:\n{}", profile.to_json());
println!(
"idle = {:.1} MiB, post-soak = {:.1} MiB, growth = {:.2}x over {} reqs\nverdict (provisional):\n{}",
idle_rss_bytes as f64 / 1_048_576.0,
soak_rss_bytes as f64 / 1_048_576.0,
profile.growth_ratio(),
SOAK_REQUESTS,
verdict.to_json(),
);
assert!(idle_rss_bytes > 0, "idle RSS should be measurable");
let growth = verdict
.findings
.iter()
.find(|f| f.nfr == "NFR-P6-growth")
.expect("growth finding present");
assert!(
growth.pass,
"footprint should stay bounded under soak: {}",
growth.detail
);
drop(proxy); }