#![doc = include_str!("../README.md")]
pub mod bytes;
pub mod error;
pub mod http;
pub mod threadpool;
use crate::{
bytes::{Sink, Source},
error::Error,
http::{Request, Response},
threadpool::{Executable, Threadpool},
};
use std::{
convert::Infallible,
io::BufReader,
net::{TcpListener, ToSocketAddrs},
sync::Arc,
};
/// A job for the threadpool: a handler bundled with the byte source and sink
/// it is invoked on, plus a handle to the pool so the job can re-dispatch
/// itself.
///
/// `STACK_SIZE` is forwarded unchanged as the const parameter of
/// [`Threadpool`].
struct Connection<T, const STACK_SIZE: usize> {
/// Callback invoked with `rx` and `tx`; when it returns `true` the
/// connection is dispatched to the threadpool again.
pub handler: T,
/// Input side handed to the handler.
pub rx: Source,
/// Output side handed to the handler.
pub tx: Sink,
/// Shared pool on which this connection is re-dispatched.
pub threadpool: Arc<Threadpool<Self, STACK_SIZE>>,
}
impl<T, const STACK_SIZE: usize> Connection<T, STACK_SIZE>
where
T: Fn(&mut Source, &mut Sink) -> bool + Send + Sync + 'static,
{
fn handle(mut self) -> Result<(), Error> {
if (self.handler)(&mut self.rx, &mut self.tx) {
let threadpool = self.threadpool.clone();
threadpool.dispatch(self)?;
}
Ok(())
}
}
impl<T, const STACK_SIZE: usize> Executable for Connection<T, STACK_SIZE>
where
    T: Fn(&mut Source, &mut Sink) -> bool + Send + Sync + 'static,
{
    /// Runs the connection as a threadpool job.
    ///
    /// `exec` has no return value, so a failure reported by
    /// [`Connection::handle`] is intentionally discarded here.
    fn exec(self) {
        drop(self.handle());
    }
}
/// A server that runs a handler on connections via a shared threadpool.
///
/// `T` is the connection handler. `STACK_SIZE` (default `65_536`) is forwarded
/// as the const parameter of [`Threadpool`].
// NOTE(review): `STACK_SIZE` is presumably the per-worker stack size in bytes;
// confirm against the `threadpool` module.
pub struct Server<T, const STACK_SIZE: usize = 65_536> {
/// Pool that executes the connection jobs; shared with every dispatched
/// connection.
threadpool: Arc<Threadpool<Connection<T, STACK_SIZE>, STACK_SIZE>>,
/// Handler that is cloned into each dispatched connection.
handler: T,
}
impl<T, const STACK_SIZE: usize> Server<T, STACK_SIZE>
where
    T: Fn(&mut Source, &mut Sink) -> bool + Clone + Send + Sync + 'static,
{
    /// Creates a server whose threadpool is constructed with `worker_max` and
    /// which runs `handler` on every dispatched connection.
    pub fn new(worker_max: usize, handler: T) -> Self {
        let threadpool = Arc::new(Threadpool::<_, STACK_SIZE>::new(worker_max));
        Self { threadpool, handler }
    }

    /// Wraps `rx` and `tx` together with a clone of the handler into a
    /// connection job and submits it to the threadpool.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the threadpool's `dispatch`.
    pub fn dispatch(&self, rx: Source, tx: Sink) -> Result<(), Error> {
        let handler = self.handler.clone();
        let threadpool = Arc::clone(&self.threadpool);
        self.threadpool.dispatch(Connection { handler, rx, tx, threadpool })
    }

    /// Binds a TCP listener to `address` and dispatches every accepted
    /// connection, looping forever.
    ///
    /// For each connection the stream is cloned: the clone becomes the sink,
    /// and the original stream, wrapped in a [`BufReader`], becomes the
    /// source.
    ///
    /// # Errors
    ///
    /// This function only returns on failure: when binding, accepting,
    /// cloning the stream or dispatching the connection fails. Any such error
    /// ends the accept loop.
    pub fn accept<A>(self, address: A) -> Result<Infallible, Error>
    where
        A: ToSocketAddrs,
    {
        let listener = TcpListener::bind(address)?;
        loop {
            let (stream, _peer) = listener.accept()?;

            // Clone before `stream` is moved into the buffered reader.
            let tx: Sink = stream.try_clone()?.into();
            let rx = Source::from_other(BufReader::new(stream));

            self.dispatch(rx, tx)?;
        }
    }
}
/// Performs a single request/response exchange.
///
/// Reads one [`Request`] from `source`, passes it to `handler` and writes the
/// resulting [`Response`] to `sink`.
///
/// Returns `true` only if the request was read, the response was written, and
/// the response does not report `has_connection_close()`. Returns `false` when
/// reading yields an error or no request, or when writing the response fails.
#[must_use]
pub fn reqresp<F>(source: &mut Source, sink: &mut Sink, handler: F) -> bool
where
    F: Fn(Request) -> Response + Send + Sync + 'static,
{
    match Request::from_stream(source) {
        Ok(Some(request)) => {
            let mut response = handler(request);
            // The close check only runs after a successful write.
            response.to_stream(sink).is_ok() && !response.has_connection_close()
        }
        // Read error or no request available: nothing to answer.
        _ => false,
    }
}