mod buffer;
mod cancel;
use harrow_codec_h1 as codec;
mod connection;
mod h1;
mod h2;
pub mod kernel_check;
mod o11y;
mod protocol;
use std::cell::Cell;
use std::error::Error;
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, mpsc};
use std::thread;
use std::time::Duration;
use monoio::net::{ListenerOpts, TcpListener};
use harrow_core::dispatch::SharedState;
use harrow_core::route::App;
use connection::ProtocolVersion;
/// Boxed error that is `Send + Sync`, so it can be returned from worker
/// threads and sent over the startup channel.
type BoxError = Box<dyn Error + Send + Sync>;
/// Tunables for the monoio-backed server entry points in this module.
///
/// The struct is `Copy`, so it is passed by value to every worker.
#[derive(Debug, Clone, Copy)]
pub struct ServerConfig {
/// Upper bound on simultaneously active connections. The accept loop drops
/// a freshly accepted stream once the active count has reached this value.
/// For the multi-worker entry points the budget is divided across workers
/// (rounded up, minimum 1 per worker).
pub max_connections: usize,
/// Forwarded to every connection as `ConnConfig::max_h2_streams`.
/// NOTE(review): presumably the per-connection HTTP/2 stream cap; it is
/// enforced outside this file, confirm in the `connection`/`h2` modules.
pub max_h2_streams: u32,
/// Number of worker threads for `run`/`start`. `None` uses
/// `std::thread::available_parallelism()` (falling back to 1); `Some(0)`
/// is rejected. The async `serve*` functions reject values greater than 1.
pub workers: Option<usize>,
/// Forwarded unchanged to the connection handler via `ConnConfig`; the
/// exact timeout semantics live in the `connection` module.
pub header_read_timeout: Option<Duration>,
/// Forwarded unchanged to the connection handler via `ConnConfig`; the
/// exact timeout semantics live in the `connection` module.
pub body_read_timeout: Option<Duration>,
/// Forwarded unchanged to the connection handler via `ConnConfig`; the
/// exact timeout semantics live in the `connection` module.
pub connection_timeout: Option<Duration>,
/// Maximum time the accept loop waits, after shutdown has been signalled,
/// for the active connection count to reach zero.
pub drain_timeout: Duration,
/// When `true`, connections are handled as
/// `ProtocolVersion::Http2PriorKnowledge`; otherwise as
/// `ProtocolVersion::Http11`.
pub enable_http2: bool,
}
impl Default for ServerConfig {
    /// Builds the stock configuration: 8192 connections, 256 HTTP/2 streams,
    /// automatic worker count, 5 s / 30 s / 300 s header / body / connection
    /// timeouts, a 30 s drain window and HTTP/2 disabled.
    fn default() -> Self {
        const HEADER_READ: Duration = Duration::from_secs(5);
        const BODY_READ: Duration = Duration::from_secs(30);
        const CONNECTION: Duration = Duration::from_secs(300);
        const DRAIN: Duration = Duration::from_secs(30);

        Self {
            enable_http2: false,
            workers: None,
            max_connections: 8192,
            max_h2_streams: 256,
            header_read_timeout: Some(HEADER_READ),
            body_read_timeout: Some(BODY_READ),
            connection_timeout: Some(CONNECTION),
            drain_timeout: DRAIN,
        }
    }
}
/// Owner of a running multi-worker server.
///
/// Dropping the handle sets the shutdown flag and joins every worker thread.
pub struct ServerHandle {
// Address reported by the first worker after binding (or the requested
// address on the startup-failure paths).
addr: SocketAddr,
// Flag shared with every worker; workers poll it to stop accepting.
shutdown: Arc<AtomicBool>,
// Receives one message per worker when that worker's runtime returns.
completion: mpsc::Receiver<Result<(), String>>,
// Join handles of the worker threads; drained when the workers are joined.
workers: Vec<thread::JoinHandle<Result<(), BoxError>>>,
}
impl ServerHandle {
pub fn local_addr(&self) -> SocketAddr {
self.addr
}
pub fn shutdown(mut self) -> Result<(), Box<dyn Error>> {
self.shutdown.store(true, Ordering::Release);
self.join_workers().map_err(into_public_error)
}
pub fn wait(mut self) -> Result<(), Box<dyn Error>> {
let _ = self.completion.recv();
self.shutdown.store(true, Ordering::Release);
self.join_workers().map_err(into_public_error)
}
fn join_workers(&mut self) -> Result<(), BoxError> {
let mut first_error: Option<BoxError> = None;
for worker in self.workers.drain(..) {
match worker.join() {
Ok(Ok(())) => {}
Ok(Err(err)) => {
if first_error.is_none() {
self.shutdown.store(true, Ordering::Release);
first_error = Some(err);
}
}
Err(panic) => {
if first_error.is_none() {
self.shutdown.store(true, Ordering::Release);
first_error = Some(join_panic_error(panic));
}
}
}
}
if let Some(err) = first_error {
Err(err)
} else {
Ok(())
}
}
}
impl Drop for ServerHandle {
    /// Raises the shutdown flag and joins any worker threads that are still
    /// owned by the handle, discarding their results.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        self.workers.drain(..).for_each(|worker| {
            let _ = worker.join();
        });
    }
}
/// Starts the server on `addr` with [`ServerConfig::default`] and blocks
/// until it stops.
///
/// `make_app` is invoked once on every worker thread.
///
/// # Errors
///
/// Propagates any error from [`run_with_config`].
pub fn run<F>(make_app: F, addr: SocketAddr) -> Result<(), Box<dyn Error>>
where
    F: Fn() -> App + Send + Clone + 'static,
{
    let config = ServerConfig::default();
    run_with_config(make_app, addr, config)
}
/// Starts the server on `addr` with `config` and blocks until it stops.
///
/// # Errors
///
/// Returns startup errors from [`start_with_config`] and worker errors from
/// [`ServerHandle::wait`].
pub fn run_with_config<F>(
    make_app: F,
    addr: SocketAddr,
    config: ServerConfig,
) -> Result<(), Box<dyn Error>>
where
    F: Fn() -> App + Send + Clone + 'static,
{
    let handle = start_with_config(make_app, addr, config)?;
    handle.wait()
}
/// Starts the server on `addr` with [`ServerConfig::default`] and returns a
/// handle to it without blocking on the workers.
///
/// # Errors
///
/// Propagates any error from [`start_with_config`].
pub fn start<F>(make_app: F, addr: SocketAddr) -> Result<ServerHandle, Box<dyn Error>>
where
    F: Fn() -> App + Send + Clone + 'static,
{
    let config = ServerConfig::default();
    start_with_config(make_app, addr, config)
}
pub fn start_with_config<F>(
make_app: F,
addr: SocketAddr,
config: ServerConfig,
) -> Result<ServerHandle, Box<dyn Error>>
where
F: Fn() -> App + Send + Clone + 'static,
{
if let Err(err) = kernel_check::check_kernel_version() {
return Err(Box::new(err));
}
let worker_count = resolved_worker_count(config.workers)?;
let worker_config = per_worker_config(config, worker_count);
let shutdown = Arc::new(AtomicBool::new(false));
let mut workers = Vec::with_capacity(worker_count);
let (completion_tx, completion_rx) = mpsc::channel();
let first_worker = spawn_worker(
make_app.clone(),
addr,
worker_config,
Arc::clone(&shutdown),
completion_tx.clone(),
true,
);
let bound_addr = match first_worker.startup.recv_timeout(Duration::from_secs(5)) {
Ok(Ok(bound_addr)) => bound_addr,
Ok(Err(err)) => {
shutdown.store(true, Ordering::Release);
let mut handle = ServerHandle {
addr,
shutdown,
completion: completion_rx,
workers: vec![first_worker.handle],
};
let _ = handle.join_workers();
return Err(into_public_error(err));
}
Err(err) => {
shutdown.store(true, Ordering::Release);
let mut handle = ServerHandle {
addr,
shutdown,
completion: completion_rx,
workers: vec![first_worker.handle],
};
let _ = handle.join_workers();
return Err(Box::new(io::Error::new(
io::ErrorKind::TimedOut,
format!("worker startup failed before reporting a bound address: {err}"),
)));
}
};
workers.push(first_worker.handle);
for _ in 1..worker_count {
let worker = spawn_worker(
make_app.clone(),
bound_addr,
worker_config,
Arc::clone(&shutdown),
completion_tx.clone(),
false,
);
match worker.startup.recv_timeout(Duration::from_secs(5)) {
Ok(Ok(_)) => workers.push(worker.handle),
Ok(Err(err)) => {
shutdown.store(true, Ordering::Release);
workers.push(worker.handle);
let mut handle = ServerHandle {
addr: bound_addr,
shutdown,
completion: completion_rx,
workers,
};
let _ = handle.join_workers();
return Err(into_public_error(err));
}
Err(err) => {
shutdown.store(true, Ordering::Release);
workers.push(worker.handle);
let mut handle = ServerHandle {
addr: bound_addr,
shutdown,
completion: completion_rx,
workers,
};
let _ = handle.join_workers();
return Err(Box::new(io::Error::new(
io::ErrorKind::TimedOut,
format!("worker startup timed out: {err}"),
)));
}
}
}
o11y::record_server_start(bound_addr, &config);
Ok(ServerHandle {
addr: bound_addr,
shutdown,
completion: completion_rx,
workers,
})
}
/// Serves `app` on `addr` on the current monoio runtime with the default
/// configuration and a shutdown future that never resolves.
///
/// # Errors
///
/// Propagates any error from [`serve_with_config`].
pub async fn serve(app: App, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
    let never = futures_util::future::pending::<()>();
    let config = ServerConfig::default();
    serve_with_config(app, addr, never, config).await
}
/// Serves `app` on `addr` with the default configuration until `shutdown`
/// resolves, then drains active connections.
///
/// # Errors
///
/// Propagates any error from [`serve_with_config`].
pub async fn serve_with_shutdown(
    app: App,
    addr: SocketAddr,
    shutdown: impl Future<Output = ()>,
) -> Result<(), Box<dyn std::error::Error>> {
    let config = ServerConfig::default();
    serve_with_config(app, addr, shutdown, config).await
}
/// Serves `app` on `addr` on the current monoio runtime until `shutdown`
/// resolves, then drains active connections.
///
/// This entry point is single-threaded: it runs on whatever runtime polls it.
///
/// # Errors
///
/// * `io::ErrorKind::InvalidInput` when `config.workers` is greater than 1;
/// * the kernel version check fails;
/// * the listener cannot be bound;
/// * any error returned by the accept loop.
pub async fn serve_with_config(
    app: App,
    addr: SocketAddr,
    shutdown: impl Future<Output = ()>,
    config: ServerConfig,
) -> Result<(), Box<dyn std::error::Error>> {
    if config.workers.is_some_and(|workers| workers > 1) {
        return Err(Box::new(io::Error::new(
            io::ErrorKind::InvalidInput,
            "ServerConfig::workers > 1 requires harrow_server_monoio::run/start; async serve_with_config runs on a single monoio runtime",
        )));
    }
    if let Err(e) = kernel_check::check_kernel_version() {
        return Err(Box::new(e));
    }
    let shared = app.into_shared_state();
    shared.route_table.print_routes();
    let listener = TcpListener::bind_with_config(addr, &listener_options())?;
    // Record the address the socket is actually bound to (it differs from
    // `addr` when port 0 was requested), matching what the multi-worker
    // start path records. Fall back to the requested address if the lookup
    // fails, since this value is only used for observability.
    let bound_addr = listener.local_addr().unwrap_or(addr);
    o11y::record_server_start(bound_addr, &config);
    serve_listener(shared, listener, shutdown, config)
        .await
        .map_err(into_public_error)
}
/// Accept loop for one listener on the current runtime.
///
/// Accepts connections until `shutdown` resolves, spawning one task per
/// connection, then waits up to `config.drain_timeout` for the shared active
/// connection counter to reach zero.
///
/// Streams accepted while the counter is at or above `config.max_connections`
/// are dropped immediately. Accept errors are recorded and never abort the
/// loop; errors that are not specific to a single connection are followed by
/// a short pause so that a persistent failure (for example running out of
/// file descriptors) cannot turn the loop into a busy spin.
async fn serve_listener(
    shared: Arc<SharedState>,
    listener: TcpListener,
    shutdown: impl Future<Output = ()>,
    config: ServerConfig,
) -> Result<(), BoxError> {
    // Pause after an accept error that is likely to repeat immediately.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(50);
    // Interval at which the drain phase re-checks the active counter.
    const DRAIN_POLL_INTERVAL: Duration = Duration::from_millis(10);

    let active_count: Rc<Cell<usize>> = Rc::new(Cell::new(0));
    let protocol = if config.enable_http2 {
        ProtocolVersion::Http2PriorKnowledge
    } else {
        ProtocolVersion::Http11
    };
    let mut shutdown = std::pin::pin!(shutdown);
    loop {
        monoio::select! {
            result = listener.accept() => {
                let (stream, remote) = match result {
                    Ok(conn) => conn,
                    Err(e) => {
                        // These kinds describe the one connection that was
                        // being accepted; the next accept can be retried at
                        // once. Anything else may fail again immediately.
                        let per_connection = matches!(
                            e.kind(),
                            io::ErrorKind::ConnectionRefused
                                | io::ErrorKind::ConnectionAborted
                                | io::ErrorKind::ConnectionReset
                        );
                        o11y::record_accept_error(e);
                        if !per_connection {
                            monoio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                        }
                        continue;
                    }
                };
                if let Err(e) = stream.set_nodelay(true) {
                    o11y::record_tcp_nodelay_error(e);
                }
                if active_count.get() >= config.max_connections {
                    drop(stream);
                    o11y::record_connection_limit_rejected(config.max_connections);
                    continue;
                }
                let shared = Arc::clone(&shared);
                let header_read_timeout = config.header_read_timeout;
                let body_read_timeout = config.body_read_timeout;
                let connection_timeout = config.connection_timeout;
                let max_h2_streams = config.max_h2_streams;
                let counter = Rc::clone(&active_count);
                monoio::spawn(connection::handle_connection(
                    stream,
                    connection::ConnConfig {
                        shared,
                        remote_addr: Some(remote),
                        header_read_timeout,
                        body_read_timeout,
                        connection_timeout,
                        max_h2_streams,
                        active_count: counter,
                        protocol,
                    },
                ));
            }
            () = &mut shutdown => {
                o11y::record_server_shutdown();
                break;
            }
        }
    }

    // Drain phase: poll the counter until it reaches zero or the drain
    // window expires.
    let drain_start = std::time::Instant::now();
    while active_count.get() > 0 {
        if drain_start.elapsed() >= config.drain_timeout {
            o11y::record_drain_timeout(config.drain_timeout.as_secs(), active_count.get());
            break;
        }
        monoio::time::sleep(DRAIN_POLL_INTERVAL).await;
    }
    o11y::record_drain_complete(active_count.get());
    Ok(())
}
/// Resolves once `shutdown` is observed as `true`, re-checking the flag
/// every 50 ms.
async fn wait_for_shutdown(shutdown: Arc<AtomicBool>) {
    let poll_interval = Duration::from_millis(50);
    loop {
        if shutdown.load(Ordering::Acquire) {
            return;
        }
        monoio::time::sleep(poll_interval).await;
    }
}
/// Listener options shared by every bind in this module: port reuse and
/// address reuse are both switched on.
fn listener_options() -> ListenerOpts {
    let opts = ListenerOpts::new();
    let opts = opts.reuse_port(true);
    opts.reuse_addr(true)
}
/// Resolves the configured worker count to a concrete, non-zero number.
///
/// `None` maps to `std::thread::available_parallelism()`, falling back to 1
/// when that cannot be determined.
///
/// # Errors
///
/// Returns `io::ErrorKind::InvalidInput` for `Some(0)`.
fn resolved_worker_count(workers: Option<usize>) -> Result<usize, Box<dyn Error>> {
    let Some(requested) = workers else {
        let detected = thread::available_parallelism().map_or(1, |count| count.get());
        return Ok(detected);
    };
    if requested == 0 {
        return Err(Box::new(io::Error::new(
            io::ErrorKind::InvalidInput,
            "ServerConfig::workers must be greater than 0",
        )));
    }
    Ok(requested)
}
fn per_worker_config(config: ServerConfig, workers: usize) -> ServerConfig {
let per_worker_max = config.max_connections.div_ceil(workers.max(1));
ServerConfig {
max_connections: per_worker_max.max(1),
workers: Some(1),
..config
}
}
/// Widens the internal `Send + Sync` boxed error to the `Box<dyn Error>`
/// exposed by the public API. The underlying error value is untouched.
fn into_public_error(err: BoxError) -> Box<dyn Error> {
    let public: Box<dyn Error> = err;
    public
}
/// Converts the payload of a panicked worker thread into an error.
///
/// `&str` and `String` payloads are included in the message; any other
/// payload type yields a generic message.
fn join_panic_error(panic: Box<dyn std::any::Any + Send + 'static>) -> BoxError {
    let detail: Option<&str> = panic
        .downcast_ref::<&str>()
        .copied()
        .or_else(|| panic.downcast_ref::<String>().map(String::as_str));
    let message = match detail {
        Some(detail) => format!("worker thread panicked: {detail}"),
        None => "worker thread panicked".to_string(),
    };
    Box::new(io::Error::other(message))
}
/// A freshly spawned worker: its join handle plus the channel on which it
/// reports the outcome of its startup.
struct WorkerThread {
// Joins to the worker's final result once its runtime has returned.
handle: thread::JoinHandle<Result<(), BoxError>>,
// Yields `Ok(bound address)` or `Err(startup error)` once; the sender is
// dropped without a message if the thread panics before reporting.
startup: mpsc::Receiver<Result<SocketAddr, BoxError>>,
}
fn spawn_worker<F>(
make_app: F,
addr: SocketAddr,
config: ServerConfig,
shutdown: Arc<AtomicBool>,
completion: mpsc::Sender<Result<(), String>>,
print_routes: bool,
) -> WorkerThread
where
F: Fn() -> App + Send + 'static,
{
let (startup_tx, startup_rx) = mpsc::channel::<Result<SocketAddr, BoxError>>();
let handle = thread::spawn(move || {
let app = make_app();
let shared = app.into_shared_state();
if print_routes {
shared.route_table.print_routes();
}
let mut runtime = match monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
.enable_timer()
.build()
{
Ok(runtime) => runtime,
Err(err) => {
let err: BoxError = Box::new(err);
let _ = startup_tx.send(Err(Box::new(io::Error::other(err.to_string()))));
return Err(err);
}
};
let result = runtime.block_on(async move {
let listener = match TcpListener::bind_with_config(addr, &listener_options()) {
Ok(listener) => listener,
Err(err) => {
let err: BoxError = Box::new(err);
let _ = startup_tx.send(Err(Box::new(io::Error::other(err.to_string()))));
return Err(err);
}
};
let local_addr = match listener.local_addr() {
Ok(local_addr) => local_addr,
Err(err) => {
let err: BoxError = Box::new(err);
let _ = startup_tx.send(Err(Box::new(io::Error::other(err.to_string()))));
return Err(err);
}
};
let _ = startup_tx.send(Ok(local_addr));
serve_listener(shared, listener, wait_for_shutdown(shutdown), config).await
});
let _ = completion.send(result.as_ref().map(|_| ()).map_err(|err| err.to_string()));
result
});
WorkerThread {
handle,
startup: startup_rx,
}
}