1#![cfg(feature = "server")]
3
4mod pool;
5mod worker;
6
7use crate::bytes::{Sink, Source};
8use crate::error::Error;
9use crate::http::{Request, Response, ResponseExt};
10use crate::server::pool::{Executable, Threadpool};
11use std::convert::Infallible;
12use std::io::{BufReader, BufWriter, Write};
13use std::net::{TcpListener, ToSocketAddrs};
14use std::sync::Arc;
15
16#[derive(Clone)]
18enum Handler {
19 SourceSink(Arc<dyn Fn(&mut Source, &mut Sink) -> bool + Send + Sync + 'static>),
21 RequestResponse(Arc<dyn Fn(Request) -> Response + Send + Sync + 'static>),
23}
24impl Handler {
25 pub fn exec(&self, source: &mut Source, sink: &mut Sink) -> bool {
27 match self {
28 Handler::SourceSink(handler) => handler(source, sink),
29 Handler::RequestResponse(handler) => Self::bridge_request_response(source, sink, handler.as_ref()),
30 }
31 }
32
33 fn bridge_request_response<F>(source: &mut Source, sink: &mut Sink, handler: &F) -> bool
35 where
36 F: Fn(Request) -> Response + ?Sized,
37 {
38 let Ok(Some(request)) = Request::from_stream(source) else {
40 return false;
41 };
42
43 let is_head = request.method.as_ref().eq_ignore_ascii_case(b"HEAD");
45 let mut response = handler(request);
46 if is_head {
47 response.make_head();
49 }
50
51 let Ok(_) = response.to_stream(sink) else {
53 return false;
54 };
55
56 !response.has_connection_close()
58 }
59}
60
61struct Connection<const STACK_SIZE: usize> {
63 pub handler: Handler,
65 pub source: Source,
67 pub sink: Sink,
69 pub threadpool: Threadpool<Self, STACK_SIZE>,
71}
72impl<const STACK_SIZE: usize> Executable for Connection<STACK_SIZE> {
73 fn exec(mut self) {
74 let reschedule = self.handler.exec(&mut self.source, &mut self.sink);
76 if self.sink.flush().is_ok() && reschedule {
77 let threadpool = self.threadpool.clone();
79 let _ = threadpool.dispatch(self);
80 }
81 }
82}
83
84pub struct Server<const STACK_SIZE: usize> {
86 threadpool: Threadpool<Connection<STACK_SIZE>, STACK_SIZE>,
88 handler: Handler,
90}
91impl<const STACK_SIZE: usize> Server<STACK_SIZE> {
92 pub fn with_source_sink<F>(workers_max: usize, source_sink_handler: F) -> Self
94 where
95 F: Fn(&mut Source, &mut Sink) -> bool + Send + Sync + 'static,
96 {
97 let threadpool: Threadpool<_, STACK_SIZE> = Threadpool::new(workers_max);
99 let handler = Handler::SourceSink(Arc::new(source_sink_handler));
100 Self { threadpool, handler }
101 }
102 pub fn with_request_response<F>(workers_max: usize, request_response_handler: F) -> Self
104 where
105 F: Fn(Request) -> Response + Send + Sync + 'static,
106 {
107 let threadpool: Threadpool<_, STACK_SIZE> = Threadpool::new(workers_max);
109 let handler = Handler::RequestResponse(Arc::new(request_response_handler));
110 Self { threadpool, handler }
111 }
112
113 pub fn dispatch(&self, source: Source, sink: Sink) -> Result<(), Error> {
115 self.threadpool.dispatch(Connection {
117 handler: self.handler.clone(),
118 source,
119 sink,
120 threadpool: self.threadpool.clone(),
121 })
122 }
123
124 pub fn accept<A>(self, address: A) -> Result<Infallible, Error>
126 where
127 A: ToSocketAddrs,
128 {
129 let socket = TcpListener::bind(address)?;
131 loop {
132 let (source, _) = socket.accept()?;
134 let sink = source.try_clone()?;
135
136 let source = BufReader::new(source);
138 let sink = BufWriter::new(sink);
139 self.dispatch(source.into(), sink.into())?;
140 }
141 }
142}