#![cfg(feature = "server")]
mod pool;
mod worker;
use crate::bytes::{Sink, Source};
use crate::error::Error;
use crate::http::{Request, Response};
use crate::server::pool::{Executable, Threadpool};
use std::convert::Infallible;
use std::io::{BufReader, BufWriter, Write};
use std::net::{TcpListener, ToSocketAddrs};
use std::sync::Arc;
/// The user-supplied callback that serves a connection.
///
/// Both variants hold their callback behind an `Arc`, so cloning a `Handler`
/// shares the callback rather than copying it.
#[derive(Clone)]
enum Handler {
/// A callback that works directly on the connection's `Source` and `Sink`.
/// Its `bool` result is returned unchanged by `Handler::exec`; `Connection::exec`
/// uses that result to decide whether the connection is dispatched again.
SourceSink(Arc<dyn Fn(&mut Source, &mut Sink) -> bool + Send + Sync + 'static>),
/// A callback that maps a parsed `Request` to a `Response`. Reading the request
/// and writing the response is done by `Handler::bridge_request_response`.
RequestResponse(Arc<dyn Fn(Request) -> Response + Send + Sync + 'static>),
}
impl Handler {
    /// Invokes the wrapped callback for one round on the given connection halves.
    ///
    /// A `SourceSink` callback is called directly and its result is returned as-is.
    /// A `RequestResponse` callback is driven through
    /// [`Self::bridge_request_response`].
    pub fn exec(&self, source: &mut Source, sink: &mut Sink) -> bool {
        match self {
            Self::SourceSink(callback) => callback(source, sink),
            Self::RequestResponse(callback) => {
                Self::bridge_request_response(source, sink, callback.as_ref())
            }
        }
    }

    /// Reads one request from `source`, passes it to `handler` and writes the
    /// resulting response to `sink`.
    ///
    /// Returns `false` when:
    /// - `Request::from_stream` does not yield `Ok(Some(_))`,
    /// - writing the response via `to_stream` fails, or
    /// - the request or the response reports `has_connection_close()`.
    ///
    /// Returns `true` otherwise.
    fn bridge_request_response<F>(source: &mut Source, sink: &mut Sink, handler: &F) -> bool
    where
        F: Fn(Request) -> Response + ?Sized,
    {
        let request = match Request::from_stream(source) {
            Ok(Some(request)) => request,
            _ => return false,
        };

        // Both facts must be captured up front: the request is moved into the handler.
        let head_only = request.method.as_ref().eq_ignore_ascii_case(b"HEAD");
        let client_wants_close = request.has_connection_close();

        let mut response = handler(request);
        if head_only {
            response.make_head();
        }

        if response.to_stream(sink).is_err() {
            return false;
        }

        // The response is only consulted when the request did not already ask to close.
        !(client_wants_close || response.has_connection_close())
    }
}
/// One unit of work for the threadpool: a handler plus the two halves of a connection.
///
/// `STACK_SIZE` is forwarded as the const parameter of `Threadpool`.
// NOTE(review): presumably the worker thread stack size — confirm in `pool`.
struct Connection<const STACK_SIZE: usize> {
/// The callback run by `Executable::exec` for this connection.
pub handler: Handler,
/// The readable half handed to the handler.
pub source: Source,
/// The writable half handed to the handler; flushed after every handler run.
pub sink: Sink,
/// The pool this connection dispatches itself to again when the handler asks for it.
pub threadpool: Threadpool<Self, STACK_SIZE>,
}
impl<const STACK_SIZE: usize> Executable for Connection<STACK_SIZE> {
    /// Runs the handler once, flushes the sink, and hands the connection back to
    /// its threadpool if the handler returned `true` and the flush succeeded.
    ///
    /// In every other case the connection is dropped when this function returns.
    fn exec(mut self) {
        let keep_alive = self.handler.exec(&mut self.source, &mut self.sink);

        // The flush is performed unconditionally, even if the handler is done.
        let flushed = self.sink.flush().is_ok();
        if !(flushed && keep_alive) {
            return;
        }

        // `self` is moved into the pool, so a separate handle is needed for the call.
        let pool = self.threadpool.clone();
        // A failed dispatch is ignored on purpose; the connection is simply given up.
        let _ = pool.dispatch(self);
    }
}
/// A server that runs a single handler for every dispatched connection on a threadpool.
///
/// `STACK_SIZE` is forwarded as the const parameter of `Threadpool`.
// NOTE(review): presumably the worker thread stack size — confirm in `pool`.
pub struct Server<const STACK_SIZE: usize> {
/// The pool that executes `Connection` jobs.
threadpool: Threadpool<Connection<STACK_SIZE>, STACK_SIZE>,
/// The handler cloned into each connection by `dispatch`.
handler: Handler,
}
impl<const STACK_SIZE: usize> Server<STACK_SIZE> {
    /// Creates a server whose handler works directly on a connection's source and sink.
    ///
    /// `workers_max` is passed to `Threadpool::new`. The handler's `bool` result
    /// decides whether the connection is dispatched again after the handler ran.
    pub fn with_source_sink<F>(workers_max: usize, source_sink_handler: F) -> Self
    where
        F: Fn(&mut Source, &mut Sink) -> bool + Send + Sync + 'static,
    {
        Self {
            threadpool: Threadpool::new(workers_max),
            handler: Handler::SourceSink(Arc::new(source_sink_handler)),
        }
    }

    /// Creates a server whose handler maps a parsed `Request` to a `Response`.
    ///
    /// `workers_max` is passed to `Threadpool::new`.
    pub fn with_request_response<F>(workers_max: usize, request_response_handler: F) -> Self
    where
        F: Fn(Request) -> Response + Send + Sync + 'static,
    {
        Self {
            threadpool: Threadpool::new(workers_max),
            handler: Handler::RequestResponse(Arc::new(request_response_handler)),
        }
    }

    /// Queues a connection, given as its readable and writable halves, on the threadpool.
    ///
    /// # Errors
    /// Returns whatever error `Threadpool::dispatch` reports.
    pub fn dispatch(&self, source: Source, sink: Sink) -> Result<(), Error> {
        let connection = Connection {
            handler: self.handler.clone(),
            source,
            sink,
            threadpool: self.threadpool.clone(),
        };
        self.threadpool.dispatch(connection)
    }

    /// Binds a TCP listener to `address` and dispatches every accepted connection.
    ///
    /// Each accepted stream is cloned so that it can be read through a `BufReader`
    /// and written through a `BufWriter` independently.
    ///
    /// # Errors
    /// This function never returns successfully. It returns the first error raised
    /// by binding, accepting, cloning the stream, or dispatching the connection.
    pub fn accept<A>(self, address: A) -> Result<Infallible, Error>
    where
        A: ToSocketAddrs,
    {
        let listener = TcpListener::bind(address)?;
        loop {
            let (stream, _peer) = listener.accept()?;
            let writer = BufWriter::new(stream.try_clone()?);
            let reader = BufReader::new(stream);
            self.dispatch(reader.into(), writer.into())?;
        }
    }
}