#![feature(test)]
extern crate test;
use std::io::{Error, ErrorKind, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::cell::RefCell;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use futures::{
future::{FutureExt, LocalBoxFuture},
};
use https::HeaderMap;
use pi_atom::Atom;
use pi_async_rt::rt::{serial::AsyncRuntimeBuilder, startup_global_time_loop};
use pi_gray::GrayVersion;
use pi_handler::{Args, Handler, SGenType};
use pi_hash::XHashMap;
use tcp::{
connect::TcpSocket,
server::{PortsAdapterFactory, SocketListener},
SocketConfig,
};
use test::{black_box, Bencher};
use pi_http::{
gateway::GatewayContext,
middleware::MiddlewareChain,
port::HttpPort,
response::ResponseHandler,
route::HttpRoute,
server::HttpListenerFactory,
sse::{
write_sse_accept_headers, SseAcceptDecision, SseConfig, SseEvent, SseHub,
SseMiddleware, SSE_PARAM_CONNECTION_ID, SSE_PARAM_NONCE,
},
virtual_host::{VirtualHost, VirtualHostPool, VirtualHostTab},
};
/// Number of events pushed to a connection in the strict-order benchmarks;
/// also the count the client asserts it received.
const STRICT_ORDER_EVENTS: usize = 64;
/// Number of client threads spawned per iteration in the concurrent-throughput benchmarks.
const THROUGHPUT_CLIENTS: usize = 16;
/// Number of events pushed to each connection in the concurrent-throughput benchmarks.
const THROUGHPUT_EVENTS_PER_CLIENT: usize = 32;
/// Selects how many events `start_bench_server` pushes to each newly opened
/// SSE connection before closing it.
#[derive(Clone, Copy)]
enum BenchScenario {
/// Push `events` sequential events to the connection.
StrictOrder { events: usize },
/// Push `events_per_client` sequential events to every connection.
ConcurrentThroughput { events_per_client: usize },
/// Push exactly one event to the connection.
FirstEventLatency,
}
/// Port handler used by the port-handshake benchmarks.
///
/// It is registered behind the SSE middleware in `start_port_bench_server`
/// and completes the handshake by echoing the connection id and nonce it
/// finds in the request parameters back through `write_sse_accept_headers`.
struct SsePortBenchHandler;

impl Handler for SsePortBenchHandler {
    type A = SocketAddr;
    type B = String;
    type C = Arc<HeaderMap>;
    type D = Arc<RefCell<XHashMap<String, SGenType>>>;
    type E = ResponseHandler;
    type F = ();
    type G = ();
    type H = ();
    type HandleResult = ();

    /// Handles one request delivered as `Args::FiveArgs`; any other argument
    /// shape is ignored.
    ///
    /// # Panics
    ///
    /// Panics when the connection id or nonce parameter is missing or is not
    /// a `SGenType::Str`, when the accept headers cannot be written, or when
    /// finishing the response fails.
    fn handle(
        &self,
        _env: Arc<dyn GrayVersion>,
        _topic: Atom,
        args: Args<Self::A, Self::B, Self::C, Self::D, Self::E, Self::F, Self::G, Self::H>,
    ) -> LocalBoxFuture<'static, Self::HandleResult> {
        async move {
            let Args::FiveArgs(_addr, _method, _headers, params, response) = args else {
                return;
            };

            // The `RefCell` borrow is confined to this block so it is released
            // before the `.await` below.
            let (connection_id, nonce) = {
                let table = params.borrow();
                let Some(SGenType::Str(connection_id)) = table.get(SSE_PARAM_CONNECTION_ID) else {
                    panic!("port benchmark handler must receive SSE connection id");
                };
                let Some(SGenType::Str(nonce)) = table.get(SSE_PARAM_NONCE) else {
                    panic!("port benchmark handler must receive SSE nonce");
                };
                (connection_id.clone(), nonce.clone())
            };

            write_sse_accept_headers(
                &response,
                connection_id.as_str(),
                &nonce,
                "bench-port-client",
            )
            .expect("port benchmark accept handshake headers must be valid");

            response
                .finish()
                .await
                .expect("port benchmark response must finish");
        }
        .boxed_local()
    }
}
/// A running benchmark server; dropping it closes the socket listener.
struct BenchServer {
    /// Value returned by `startup_global_time_loop`; never read.
    /// NOTE(review): presumably held only to keep the timer loop alive — confirm.
    _timer: Option<pi_async_rt::rt::GlobalTimeLoopHandle>,
    /// Bound listener; wrapped in `Option` so `Drop` can take ownership of it.
    listener: Option<SocketListener<TcpSocket, PortsAdapterFactory<TcpSocket>>>,
    /// Loopback address clients connect to.
    addr: SocketAddr,
}

impl Drop for BenchServer {
    /// Closes the listener (if still present) with an `Interrupted` error as the reason.
    fn drop(&mut self) {
        let Some(listener) = self.listener.take() else {
            return;
        };
        let reason = Error::new(ErrorKind::Interrupted, "close SSE benchmark listener");
        listener.close(Err(reason));
    }
}
/// Asks the OS for a free loopback TCP port by binding to port 0 and returns
/// the port number that was assigned.
///
/// The probing listener is dropped when this function returns, so the port is
/// only *likely* to still be free when the caller binds it: another process
/// may grab it in between.
///
/// # Panics
///
/// Panics if the loopback bind fails or the bound address cannot be read.
fn reserve_local_port() -> u16 {
    let probe = TcpListener::bind("127.0.0.1:0")
        .expect("benchmark must reserve a local TCP port");
    let bound = probe
        .local_addr()
        .expect("benchmark listener must expose local addr");
    bound.port()
}
fn start_bench_server(scenario: BenchScenario) -> BenchServer {
let timer = startup_global_time_loop(10);
let port = reserve_local_port();
let addr: SocketAddr = format!("127.0.0.1:{}", port)
.parse()
.expect("benchmark addr must parse");
let hub = SseHub::<String, TcpSocket>::builder().build();
let hub_for_open = hub.clone();
let middleware = SseMiddleware::with_acceptor(hub.clone(), |_accept| {
Ok(SseAcceptDecision::accept("bench-client".to_string()))
})
.config(
SseConfig::builder()
.channel_size(1024)
.heartbeat_interval_ms(0)
.send_initial_comment(false)
.build()
.expect("benchmark SSE config must be valid"),
)
.on_open(move |open| {
let hub_for_thread = hub_for_open.clone();
let id = open.id;
thread::spawn(move || {
match scenario {
BenchScenario::StrictOrder { events } => {
send_sequence(&hub_for_thread, id, events);
}
BenchScenario::ConcurrentThroughput { events_per_client } => {
send_sequence(&hub_for_thread, id, events_per_client);
}
BenchScenario::FirstEventLatency => {
send_sequence(&hub_for_thread, id, 1);
}
}
hub_for_thread
.try_close(id)
.expect("benchmark SSE connection must close");
});
Ok(())
})
.build()
.expect("benchmark SSE middleware must build");
let mut route = HttpRoute::<TcpSocket, GatewayContext, SseMiddleware<String, TcpSocket>>::new();
route.at("/sse").get(middleware);
let host = VirtualHost::with(route);
let mut hosts = VirtualHostTab::<TcpSocket, SseMiddleware<String, TcpSocket>>::new();
hosts
.add_default(host)
.expect("benchmark virtual host must register");
let mut factory = PortsAdapterFactory::<TcpSocket>::new();
factory.bind(
port,
HttpListenerFactory::<TcpSocket, _>::with_hosts(hosts, 5000).new_service(),
);
let rt = AsyncRuntimeBuilder::default_local_thread(None, None);
let mut config = SocketConfig::new("127.0.0.1", &[port]);
config.set_option(16 * 1024, 16 * 1024, 16 * 1024, 64);
let listener = SocketListener::try_bind(
vec![rt],
factory,
config,
128,
1024 * 1024,
128,
8,
16 * 1024,
16 * 1024,
Some(10),
)
.expect("benchmark SSE server must bind");
thread::sleep(Duration::from_millis(100));
BenchServer {
_timer: timer,
listener: Some(listener),
addr,
}
}
fn start_port_bench_server(events_per_connection: usize) -> BenchServer {
let timer = startup_global_time_loop(10);
let port = reserve_local_port();
let addr: SocketAddr = format!("127.0.0.1:{}", port)
.parse()
.expect("benchmark addr must parse");
let hub = SseHub::<String, TcpSocket>::builder().build();
let hub_for_open = hub.clone();
let middleware = SseMiddleware::with_acceptor(hub.clone(), |_accept| {
unreachable!("port/params benchmark must not call direct acceptor")
})
.config(
SseConfig::builder()
.channel_size(1024)
.heartbeat_interval_ms(0)
.send_initial_comment(false)
.build()
.expect("benchmark SSE config must be valid"),
)
.port_handshake_string_key()
.on_open(move |open| {
let hub_for_thread = hub_for_open.clone();
let id = open.id;
thread::spawn(move || {
send_sequence(&hub_for_thread, id, events_per_connection);
hub_for_thread
.try_close(id)
.expect("benchmark SSE port connection must close");
});
Ok(())
})
.build()
.expect("benchmark SSE port middleware must build");
let port_handler = HttpPort::with_handler(None, Arc::new(SsePortBenchHandler));
let mut chain = MiddlewareChain::<TcpSocket, GatewayContext>::new();
chain.push_back(Arc::new(middleware));
chain.push_back(Arc::new(port_handler));
chain.finish();
let chain = Arc::new(chain);
type BenchChain = Arc<MiddlewareChain<TcpSocket, GatewayContext>>;
let mut route = HttpRoute::<TcpSocket, GatewayContext, BenchChain>::new();
route.at("/sse").get(chain);
let host = VirtualHost::with(route);
let mut hosts = VirtualHostTab::<TcpSocket, BenchChain>::new();
hosts
.add_default(host)
.expect("benchmark virtual host must register");
let mut factory = PortsAdapterFactory::<TcpSocket>::new();
factory.bind(
port,
HttpListenerFactory::<TcpSocket, _>::with_hosts(hosts, 5000).new_service(),
);
let rt = AsyncRuntimeBuilder::default_local_thread(None, None);
let mut config = SocketConfig::new("127.0.0.1", &[port]);
config.set_option(16 * 1024, 16 * 1024, 16 * 1024, 64);
let listener = SocketListener::try_bind(
vec![rt],
factory,
config,
128,
1024 * 1024,
128,
8,
16 * 1024,
16 * 1024,
Some(10),
)
.expect("benchmark SSE port server must bind");
thread::sleep(Duration::from_millis(100));
BenchServer {
_timer: timer,
listener: Some(listener),
addr,
}
}
/// Pushes `events` numbered SSE events to connection `id` through `hub`.
///
/// Event `n` carries id `evt-NNNN`, event name `bench` and data `seq-NNNN`
/// (zero-padded to four digits), which is the payload format
/// `assert_strict_order` checks on the client side.
///
/// # Panics
///
/// Panics if an event fails to build, or if a send report does not show
/// `total == 1` and `sent == 1`.
fn send_sequence(
    hub: &SseHub<String, TcpSocket>,
    id: pi_http::sse::SseConnectionId,
    events: usize,
) {
    (0..events).for_each(|seq| {
        let event_id = format!("evt-{:04}", seq);
        let payload = format!("seq-{:04}", seq);
        let event = SseEvent::builder()
            .id(event_id)
            .event("bench")
            .data(payload)
            .build()
            .expect("benchmark SSE event must be valid");

        let report = hub.try_send_to_id(id, event);
        assert_eq!(report.total, 1);
        assert_eq!(report.sent, 1);
    });
}
/// Connects to `addr`, sends `GET /sse` and reads the response until the peer
/// closes the connection or the chunked-encoding terminator `0\r\n\r\n` is seen.
///
/// Returns the payload of every line starting with `data: ` (in arrival order)
/// together with the time elapsed between `first_event_started` and the read
/// in which the first such line showed up.
///
/// The response is decoded only until the first `data: ` line appears, the
/// terminator is searched only in windows that touch newly received bytes,
/// and payloads are extracted once after the stream ends. This keeps the
/// client-side cost linear in the response size instead of re-parsing the
/// whole buffer after every `read`.
///
/// NOTE(review): the terminator check is a plain byte search, so an event
/// payload containing `0\r\n\r\n` would end the read early — confirm the
/// server's framing never produces that sequence inside the body.
///
/// # Panics
///
/// Panics if connecting, configuring timeouts, writing the request or reading
/// fails, if a read times out (2 s), or if the response contains no
/// `data: ` line at all.
fn read_sse_events(addr: SocketAddr, first_event_started: Instant) -> (Vec<String>, Duration) {
    const CHUNKED_TERMINATOR: &[u8] = b"0\r\n\r\n";
    const DATA_PREFIX: &str = "data: ";

    let mut stream = TcpStream::connect(addr).expect("benchmark client must connect");
    stream
        .set_read_timeout(Some(Duration::from_secs(2)))
        .expect("benchmark client must set read timeout");
    stream
        .set_write_timeout(Some(Duration::from_secs(2)))
        .expect("benchmark client must set write timeout");

    let req = format!(
        "GET /sse HTTP/1.1\r\nHost: {}\r\nAccept: text/event-stream\r\nConnection: close\r\n\r\n",
        addr
    );
    stream
        .write_all(req.as_bytes())
        .expect("benchmark client must write HTTP request");

    let mut response = Vec::new();
    let mut buf = [0u8; 4096];
    let mut first_latency = None;

    loop {
        match stream.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => {
                let previous_len = response.len();
                response.extend_from_slice(&buf[..n]);

                // Only pay for decoding until the first data line has been observed.
                if first_latency.is_none() {
                    let has_data_line = String::from_utf8_lossy(&response)
                        .lines()
                        .any(|line| line.starts_with(DATA_PREFIX));
                    if has_data_line {
                        first_latency = Some(first_event_started.elapsed());
                    }
                }

                // Earlier bytes were already checked; a new match must overlap
                // the bytes just appended, so start a few bytes before them.
                let scan_from = previous_len.saturating_sub(CHUNKED_TERMINATOR.len() - 1);
                let finished = response[scan_from..]
                    .windows(CHUNKED_TERMINATOR.len())
                    .any(|window| window == CHUNKED_TERMINATOR);
                if finished {
                    break;
                }
            }
            Err(e) if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut => {
                // Show the partial response as text rather than a list of byte values.
                panic!(
                    "benchmark client timed out, partial response: {:?}",
                    String::from_utf8_lossy(&response)
                );
            }
            Err(e) => panic!("benchmark client read failed: {:?}", e),
        }
    }

    let data: Vec<String> = String::from_utf8_lossy(&response)
        .lines()
        .filter_map(|line| line.strip_prefix(DATA_PREFIX))
        .map(str::to_owned)
        .collect();

    (
        data,
        first_latency.expect("benchmark response must contain at least one SSE data line"),
    )
}
/// Asserts that `data` holds exactly `events` payloads and that the payload at
/// position `n` is `seq-NNNN` (zero-padded to four digits).
///
/// # Panics
///
/// Panics on a length mismatch or on the first payload that is out of order.
fn assert_strict_order(data: &[String], events: usize) {
    assert_eq!(data.len(), events);
    let expected_payloads = (0..events).map(|seq| format!("seq-{:04}", seq));
    data.iter()
        .zip(expected_payloads)
        .for_each(|(actual, expected)| assert_eq!(actual, &expected));
}
/// Measures one full SSE session over loopback TCP per iteration: connect,
/// receive `STRICT_ORDER_EVENTS` events, and verify they arrived in order.
///
/// `b.bytes` is set to the event count, not a byte count.
/// NOTE(review): the harness throughput column is therefore presumably
/// event-based rather than real MB/s — confirm.
#[bench]
fn bench_sse_real_network_strict_order_64_events(b: &mut Bencher) {
    let scenario = BenchScenario::StrictOrder {
        events: STRICT_ORDER_EVENTS,
    };
    let server = start_bench_server(scenario);
    b.bytes = STRICT_ORDER_EVENTS as u64;
    b.iter(|| {
        let (payloads, latency) = read_sse_events(server.addr, Instant::now());
        black_box(latency);
        assert_strict_order(&payloads, STRICT_ORDER_EVENTS);
    });
}
/// Measures `THROUGHPUT_CLIENTS` concurrent SSE sessions per iteration, each
/// receiving `THROUGHPUT_EVENTS_PER_CLIENT` events over loopback TCP.
///
/// All client threads are spawned before any is joined. `b.bytes` is set to
/// the total event count per iteration, not a byte count.
#[bench]
fn bench_sse_real_network_concurrent_throughput_16x32(b: &mut Bencher) {
    let server = start_bench_server(BenchScenario::ConcurrentThroughput {
        events_per_client: THROUGHPUT_EVENTS_PER_CLIENT,
    });
    b.bytes = (THROUGHPUT_CLIENTS * THROUGHPUT_EVENTS_PER_CLIENT) as u64;
    b.iter(|| {
        // `collect` forces every spawn to happen before the first join.
        let workers: Vec<_> = (0..THROUGHPUT_CLIENTS)
            .map(|_| {
                let addr = server.addr;
                thread::spawn(move || {
                    let (payloads, latency) = read_sse_events(addr, Instant::now());
                    black_box(latency);
                    assert_eq!(payloads.len(), THROUGHPUT_EVENTS_PER_CLIENT);
                })
            })
            .collect();
        for worker in workers {
            worker
                .join()
                .expect("benchmark concurrent client must finish");
        }
    });
}
/// Measures a single-event SSE session per iteration: connect, receive exactly
/// one event over loopback TCP, and finish.
///
/// `b.bytes` is set to 1 (one event per iteration), not a byte count.
#[bench]
fn bench_sse_real_network_first_event_latency(b: &mut Bencher) {
    let server = start_bench_server(BenchScenario::FirstEventLatency);
    b.bytes = 1;
    b.iter(|| {
        let (payloads, latency) = read_sse_events(server.addr, Instant::now());
        assert_eq!(payloads.len(), 1);
        black_box(latency);
    });
}
/// Port-handshake variant of the strict-order benchmark: one SSE session per
/// iteration against `start_port_bench_server`, verifying that
/// `STRICT_ORDER_EVENTS` events arrive in order.
///
/// `b.bytes` is set to the event count, not a byte count.
#[bench]
fn bench_sse_real_network_port_handshake_strict_order_64_events(b: &mut Bencher) {
    let server = start_port_bench_server(STRICT_ORDER_EVENTS);
    b.bytes = STRICT_ORDER_EVENTS as u64;
    b.iter(|| {
        let (payloads, latency) = read_sse_events(server.addr, Instant::now());
        black_box(latency);
        assert_strict_order(&payloads, STRICT_ORDER_EVENTS);
    });
}
/// Port-handshake variant of the concurrent-throughput benchmark:
/// `THROUGHPUT_CLIENTS` concurrent SSE sessions per iteration, each receiving
/// `THROUGHPUT_EVENTS_PER_CLIENT` events from `start_port_bench_server`.
///
/// All client threads are spawned before any is joined. `b.bytes` is set to
/// the total event count per iteration, not a byte count.
#[bench]
fn bench_sse_real_network_port_handshake_concurrent_throughput_16x32(b: &mut Bencher) {
    let server = start_port_bench_server(THROUGHPUT_EVENTS_PER_CLIENT);
    b.bytes = (THROUGHPUT_CLIENTS * THROUGHPUT_EVENTS_PER_CLIENT) as u64;
    b.iter(|| {
        // `collect` forces every spawn to happen before the first join.
        let workers: Vec<_> = (0..THROUGHPUT_CLIENTS)
            .map(|_| {
                let addr = server.addr;
                thread::spawn(move || {
                    let (payloads, latency) = read_sse_events(addr, Instant::now());
                    black_box(latency);
                    assert_eq!(payloads.len(), THROUGHPUT_EVENTS_PER_CLIENT);
                })
            })
            .collect();
        for worker in workers {
            worker
                .join()
                .expect("benchmark port concurrent client must finish");
        }
    });
}
/// Port-handshake variant of the first-event benchmark: one single-event SSE
/// session per iteration against `start_port_bench_server`.
///
/// `b.bytes` is set to 1 (one event per iteration), not a byte count.
#[bench]
fn bench_sse_real_network_port_handshake_first_event_latency(b: &mut Bencher) {
    let server = start_port_bench_server(1);
    b.bytes = 1;
    b.iter(|| {
        let (payloads, latency) = read_sse_events(server.addr, Instant::now());
        assert_eq!(payloads.len(), 1);
        black_box(latency);
    });
}