use crate::execute_delay;
use crate::options::Options;
use crate::pretty::PrettyPrint;
use crate::response::build_response;
use crate::PORT_COUNTERS;
use crate::REQUESTS;
use crate::REQUEST_BYTES;
use crate::RESPONSES;
use crate::RESPONSE_BYTES;
use anyhow::Result;
use futures::stream::{select_all, unfold, StreamExt};
use hyper::Response;
use owo_colors::OwoColorize;
use monoio::io::AsyncReadRent;
use monoio::io::AsyncWriteRentExt;
use monoio::time::TimeDriver;
use monoio::IoUringDriver;
use monoio::RuntimeBuilder;
use std::mem::MaybeUninit;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tracing::error;
use crate::create_listener;
use crate::ServerConfig;
pub fn run_thread(
id: usize,
addrs: Vec<SocketAddr>,
config: Arc<ServerConfig>,
opts: &Options,
) -> Result<()> {
use tracing::info;
let delay = opts.delay;
let meter = opts.meter;
let verbose = opts.verbose;
let num_entries = opts.uring_entries.next_power_of_two();
let cqsize = num_entries * 2;
let mut uring = io_uring::IoUring::builder();
uring.setup_single_issuer().setup_cqsize(cqsize);
if let Some(idle) = opts.uring_sqpoll {
uring.setup_sqpoll(idle);
} else {
uring.setup_coop_taskrun().setup_taskrun_flag();
}
let builder: RuntimeBuilder<TimeDriver<IoUringDriver>> = monoio::RuntimeBuilder::new()
.enable_all()
.uring_builder(uring);
let mut rt = builder.build().unwrap();
rt.block_on(async move {
let mut listeners = Vec::new();
for addr in &addrs {
let std_listener = match create_listener(*addr, opts) {
Ok(l) => l,
Err(e) => {
error!("Failed to create listener for {}: {}", addr, e);
return;
}
};
let Ok(listener) = monoio::net::TcpListener::from_std(std_listener) else {
panic!("Failed to create monoio listener");
};
listeners.push(listener);
}
info!("Thread {} listening on {:?} (monoio)", id, addrs);
let streams: Vec<_> = listeners
.into_iter()
.map(|listener| {
let port = listener.local_addr().unwrap().port();
Box::pin(unfold(listener, move |l| async move {
match l.accept().await {
Ok((stream, _addr)) => Some((Ok((stream, port)), l)),
Err(e) => Some((Err(e), l)),
}
}))
})
.collect();
let mut all_listeners = select_all(streams);
loop {
let (stream, port) = match all_listeners.next().await {
Some(Ok(s)) => s,
Some(Err(e)) => {
error!("Thread {} accept error: {}", id, e);
continue;
}
None => {
error!("Thread {} all listeners closed", id);
break;
}
};
let config = config.clone();
monoio::spawn(async move {
if let Err(e) =
handle_connection_monoio(stream, port, config, false, meter, delay, verbose)
.await
{
error!("Error handling monoio connection: {}", e);
}
});
}
});
Ok(())
}
async fn handle_connection_monoio(
mut stream: monoio::net::TcpStream,
port: u16,
config: Arc<ServerConfig>,
http2: bool,
meter: bool,
delay: Option<Duration>,
verbose: u8,
) -> Result<usize> {
use http_wire::WireDecode;
if http2 {
return Err(anyhow::anyhow!("HTTP/2 not supported with monoio"));
}
let mut response_buf = build_response(&config)?;
let mut buf: Vec<u8> = Vec::with_capacity(8192);
let mut parsed = 0; let mut requests_served = 0;
loop {
if parsed == buf.len() && parsed > 0 {
unsafe {
buf.set_len(0);
}
parsed = 0;
}
else if parsed > 4096 {
buf.copy_within(parsed.., 0);
unsafe {
buf.set_len(buf.len() - parsed);
}
parsed = 0;
}
let current_len = buf.len();
if buf.capacity() - current_len < 4096 {
buf.reserve(4096);
}
let spare_cap = buf.capacity() - current_len;
let temp_buf =
unsafe { Vec::from_raw_parts(buf.as_mut_ptr().add(current_len), 0, spare_cap) };
let (result, temp_buf) = stream.read(temp_buf).await;
std::mem::forget(temp_buf);
let n = match result {
Ok(0) => {
break; }
Ok(n) => n,
Err(e) => {
return Err(anyhow::Error::new(e));
}
};
unsafe {
buf.set_len(current_len + n);
}
let mut headers = [const { MaybeUninit::uninit() }; 128];
loop {
match http_wire::request::FullRequest::decode_uninit(&buf[parsed..], &mut headers) {
Ok((req, req_len)) => {
requests_served += 1;
parsed += req_len;
if verbose > 0 {
println!("↩ {}:\n{}", "request".bold(), req.pretty(verbose));
}
if meter {
REQUESTS.add(1);
REQUEST_BYTES.add(req_len);
let entry = PORT_COUNTERS.entry(port).or_default();
entry.requests.add(1);
entry.request_bytes.add(req_len);
}
if let Some(d) = delay {
execute_delay(d).await;
}
if verbose > 0 {
let mut print_builder = Response::builder().status(config.status);
for (k, v) in &config.headers {
print_builder = print_builder.header(k, v);
}
let print_resp = print_builder.body(config.body.clone()).unwrap();
println!("↪ {}:\n{}", "response".bold(), print_resp.pretty(verbose));
}
let (res, buf_back) = stream.write_all(response_buf).await;
response_buf = buf_back;
res?;
if meter {
RESPONSES.add(1);
RESPONSE_BYTES.add(response_buf.len());
let entry = PORT_COUNTERS.entry(port).or_default();
entry.responses.add(1);
entry.response_bytes.add(response_buf.len());
}
}
Err(_) => break, }
}
}
Ok(requests_served)
}