#![doc = include_str!("../README.md")]
pub mod error;
pub mod http;
pub mod log;
pub mod threadpool;
pub mod utils;
use crate::{
error::Error,
http::body::Body,
http::{Request, Response},
threadpool::{Pool, StaticFn},
};
use std::{
convert::Infallible,
io::{BufReader, Read},
net::{Shutdown, TcpListener, TcpStream},
sync::{
mpsc::{self, Receiver, SyncSender},
Arc,
},
thread::Builder,
};
/// Per-connection state that is moved into a threadpool job by `Server::exec`.
struct ConnectionContext<T> {
/// Shared, reference-counted request handler; called with the parsed request to produce the response.
#[allow(clippy::type_complexity)]
pub handler: Arc<Box<dyn Fn(&mut Request) -> Response<T> + Send + Sync + 'static>>,
/// Buffered client connection the request is read from.
pub stream: BufReader<TcpStream>,
/// Sender half of the server's connection queue; used to hand the connection back for further processing
/// when the response does not ask for the connection to be closed.
pub connection_queue: SyncSender<BufReader<TcpStream>>,
}
/// A server that accepts TCP connections and dispatches them to a threadpool.
///
/// `T` is the response body type (defaults to `Body`). `STACK_SIZE` is used as the stack size (in bytes) of the
/// acceptor thread and is also forwarded to the threadpool type as its const parameter.
pub struct Server<T = Body, const STACK_SIZE: usize = 65_536> {
/// The listening socket; moved into the acceptor thread when the server is executed.
socket: TcpListener,
/// The pool that runs one job per scheduled connection.
threadpool: Pool<ConnectionContext<T>, STACK_SIZE>,
/// Sender half of the connection queue; cloned for the acceptor thread and for every scheduled job.
connection_queue_seed: SyncSender<BufReader<TcpStream>>,
/// Receiver half of the connection queue; drained by the server loop.
connection_queue: Receiver<BufReader<TcpStream>>,
}
impl<T, const STACK_SIZE: usize> Server<T, STACK_SIZE> {
    /// Creates a new server that listens on `address`.
    ///
    /// `soft_limit` and `hard_limit` are forwarded to the threadpool; `hard_limit` is additionally used as the
    /// capacity of the bounded connection queue.
    ///
    /// # Errors
    /// Returns an error if the listening socket cannot be bound to `address`.
    pub fn new(address: &str, soft_limit: usize, hard_limit: usize) -> Result<Self, Error> {
        let socket = TcpListener::bind(address)?;
        let threadpool: Pool<_, STACK_SIZE> = Pool::new(soft_limit, hard_limit);
        let (connection_queue_seed, connection_queue) = mpsc::sync_channel(hard_limit);
        Ok(Self { socket, threadpool, connection_queue_seed, connection_queue })
    }

    /// Runs the server with `callback` as the request handler.
    ///
    /// Starts the acceptor thread and then loops forever: every connection received from the connection queue is
    /// wrapped into a job and scheduled on the threadpool. This function never returns `Ok`.
    ///
    /// # Errors
    /// Returns an error if the acceptor thread cannot be spawned or if a job cannot be scheduled.
    ///
    /// # Panics
    /// Panics if receiving from the connection queue fails.
    pub fn exec<F>(self, callback: F) -> Result<Infallible, Error>
    where
        F: Fn(&mut Request) -> Response<T> + Send + Sync + 'static,
        T: Read + 'static,
    {
        // Type-erase the handler once so that every job can share it via a cheap reference-count bump
        #[allow(clippy::type_complexity)]
        let callback = {
            let boxed: Box<dyn Fn(&mut Request) -> Response<T> + Send + Sync + 'static> = Box::new(callback);
            Arc::new(boxed)
        };

        // Take the server apart; the socket is handed over to the acceptor thread
        let Self { socket, threadpool, connection_queue_seed, connection_queue } = self;
        Self::accept_async(socket, &connection_queue_seed)?;

        loop {
            // We keep `connection_queue_seed` alive ourselves, so the queue always has at least one sender
            let connection = connection_queue.recv().expect("connection queue is broken");
            let context = ConnectionContext {
                handler: Arc::clone(&callback),
                stream: connection,
                connection_queue: connection_queue_seed.clone(),
            };
            let job = StaticFn { fn_: Self::callback_executor, context };
            threadpool.schedule(job)?;
        }
    }

    /// Spawns the acceptor thread, which accepts connections on `socket` forever and pushes each accepted
    /// connection (wrapped in a `BufReader`) into `connection_queue`.
    ///
    /// # Errors
    /// Returns an error if the thread cannot be spawned.
    ///
    /// # Panics
    /// The spawned thread (not the caller) panics if an accepted connection cannot be sent to the queue.
    fn accept_async(socket: TcpListener, connection_queue: &SyncSender<BufReader<TcpStream>>) -> Result<(), Error> {
        let connection_queue = connection_queue.clone();
        let builder = Builder::new().name("acceptor thread".to_string()).stack_size(STACK_SIZE);
        builder.spawn(move || loop {
            // NOTE(review): failed `accept` calls are skipped silently and retried immediately; this looks like a
            // deliberate best-effort, but a persistent failure would make this loop spin - confirm.
            if let Ok((connection, _)) = socket.accept() {
                let connection = BufReader::new(connection);
                connection_queue.send(connection).expect("cannot schedule connection for processing")
            }
        })?;
        Ok(())
    }

    /// Threadpool job entry point: handles a single request on the connection in `context` and logs any error.
    ///
    /// # Panics
    /// Panics if a connection that should be kept open cannot be sent back to the connection queue.
    fn callback_executor(context: ConnectionContext<T>)
    where
        T: Read,
    {
        /// Reads one request, invokes the handler, writes the response, and then either shuts the connection down
        /// or hands it back to the connection queue.
        fn try_<T>(context: ConnectionContext<T>) -> Result<(), Error>
        where
            T: Read,
        {
            let ConnectionContext { handler, stream, connection_queue } = context;

            // `None` means there is no request to handle (presumably the peer is gone); the stream has been moved
            // into `from_stream`, so the connection is simply dropped here
            let Some(mut request) = Request::from_stream(stream)? else {
                return Ok(());
            };

            let mut response: Response<T> = handler(&mut request);
            let mut stream = request.stream;
            response.to_stream(stream.get_mut())?;

            if response.has_connection_close() {
                stream.get_ref().shutdown(Shutdown::Both)?;
            } else {
                // Keep-alive: requeue the connection so that its next request gets scheduled like a new connection
                connection_queue.send(stream).expect("cannot reschedule connection for processing");
            }
            Ok(())
        }

        if let Err(e) = try_(context) {
            let mut error_string = e.to_string();
            if e.has_backtrace() {
                // Append without `writeln!`: that needs `std::fmt::Write` in scope and adds a panic path to the
                // error reporting. The newline keeps the message and the backtrace from running together.
                error_string.push('\n');
                error_string.push_str(&e.backtrace.to_string());
            }
            log_info!("Failed to handle connection: {error_string}");
        }
    }
}