#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::cast_precision_loss,
clippy::too_many_lines
)]
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::service::service_fn;
use hyper::{Method, Request, Response};
use hyper_util::client::legacy::Client;
use hyper_util::rt::{TokioExecutor, TokioIo};
use osproxy_bench::LatencySummary;
use osproxy_core::{Clock, ClusterId, IndexName, SystemClock};
use osproxy_engine::{Pipeline, QueueError, QueuedWrite, WriteMode, WriteQueue};
use osproxy_server::auth::ReferenceAuthenticator;
use osproxy_server::handler::AppHandler;
use osproxy_server::tenancy::ReferenceTenancy;
use osproxy_sink::OpenSearchSink;
use osproxy_tenancy::TenancyRouter;
use tokio::net::TcpListener;
const PAYLOADS: &[(&str, usize)] = &[("256B", 256), ("4KB", 4096), ("64KB", 65536)];
const CONNS: &[usize] = &[16, 64, 256];
const REQS_PER_CONN: usize = 60;
struct MemQueue;
impl WriteQueue for MemQueue {
fn enabled(&self) -> bool {
true
}
fn enqueue<'a>(
&'a self,
_write: QueuedWrite,
) -> Pin<Box<dyn Future<Output = Result<(), QueueError>> + Send + 'a>> {
Box::pin(async { Ok(()) })
}
}
async fn start_upstream() -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(async move {
let io = TokioIo::new(stream);
let svc = service_fn(|_req: Request<Incoming>| async move {
Ok::<_, std::convert::Infallible>(
Response::builder()
.status(201)
.body(Full::new(Bytes::from(
r#"{"_id":"acme:1","result":"created"}"#,
)))
.unwrap(),
)
});
let _ = hyper::server::conn::http1::Builder::new()
.serve_connection(io, svc)
.await;
});
}
});
format!("http://{addr}")
}
async fn spawn_proxy(upstream: String, async_mode: bool) -> std::net::SocketAddr {
let tenancy = ReferenceTenancy::new(
ClusterId::from("default"),
IndexName::from("osproxy-shared"),
upstream,
);
let mut pipeline = Pipeline::new(TenancyRouter::new(tenancy), OpenSearchSink::new());
if async_mode {
pipeline = pipeline
.with_baseline_write_mode(WriteMode::Async)
.with_write_queue(Arc::new(MemQueue));
}
let handler = Arc::new(
AppHandler::new(pipeline, ReferenceAuthenticator::dev())
.with_require_tls_for_mutation(false),
);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let _ = osproxy_transport::serve(listener, handler).await;
});
addr
}
fn payload(size: usize) -> Bytes {
let fixed = r#"{"tenant_id":"acme","id":1,"data":""}"#.len();
let pad = size.saturating_sub(fixed).max(1);
Bytes::from(format!(
r#"{{"tenant_id":"acme","id":1,"data":"{}"}}"#,
"x".repeat(pad)
))
}
async fn run_cell(proxy: std::net::SocketAddr, body: Bytes, conns: usize) -> (f64, f64, f64) {
let ok = Arc::new(AtomicU64::new(0));
let lat = Arc::new(Mutex::new(Vec::<u64>::new()));
let clock = SystemClock;
let t0 = clock.now();
let mut workers = Vec::with_capacity(conns);
for _ in 0..conns {
let (ok, lat, body) = (Arc::clone(&ok), Arc::clone(&lat), body.clone());
workers.push(tokio::spawn(async move {
let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
let send = |c: &Client<_, Full<Bytes>>, b: Bytes| {
let c = c.clone();
async move {
let req = Request::builder()
.method(Method::POST)
.uri(format!("http://{proxy}/orders/_doc"))
.header("content-type", "application/json")
.body(Full::new(b))
.unwrap();
match c.request(req).await {
Ok(resp) => {
let ok = resp.status().is_success();
let _ = resp.into_body().collect().await;
ok
}
Err(_) => false,
}
}
};
let _ = send(&client, body.clone()).await; let mut local = Vec::with_capacity(REQS_PER_CONN);
for _ in 0..REQS_PER_CONN {
let r0 = SystemClock.now();
if send(&client, body.clone()).await {
ok.fetch_add(1, Ordering::Relaxed);
local.push(
u64::try_from(SystemClock.now().saturating_duration_since(r0).as_nanos())
.unwrap_or(u64::MAX),
);
}
}
lat.lock().unwrap().extend(local);
}));
}
for w in workers {
w.await.unwrap();
}
let wall = clock.now().saturating_duration_since(t0).as_secs_f64();
let done = ok.load(Ordering::Relaxed);
let s = LatencySummary::from_nanos(&lat.lock().unwrap()).expect("samples");
(
done as f64 / wall,
s.p50_ns as f64 / 1.0e6,
s.p99_ns as f64 / 1.0e6,
)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
#[ignore = "load matrix; run with --ignored --nocapture"]
async fn load_matrix_payload_x_connections_x_mode() {
let upstream = start_upstream().await;
let sync = spawn_proxy(upstream.clone(), false).await;
let r#async = spawn_proxy(upstream, true).await;
println!("load matrix: rps / p50ms / p99ms (co-located harness, host-bound):");
println!(
"{:<6} {:>6} | {:>24} | {:>24}",
"payload", "conns", "sync (forward upstream)", "async (fan-out enqueue)"
);
for &(label, size) in PAYLOADS {
let body = payload(size);
for &conns in CONNS {
let (srps, sp50, sp99) = run_cell(sync, body.clone(), conns).await;
let (arps, ap50, ap99) = run_cell(r#async, body.clone(), conns).await;
println!(
"{label:<6} {conns:>6} | {srps:>10.0} {sp50:>6.2} {sp99:>6.2} | {arps:>10.0} {ap50:>6.2} {ap99:>6.2}"
);
}
}
}