ehttpd/server/
mod.rs

1//! Implements a simple threadpool-based server
2#![cfg(feature = "server")]
3
4mod pool;
5mod worker;
6
7use crate::bytes::{Sink, Source};
8use crate::error::Error;
9use crate::http::{Request, Response, ResponseExt};
10use crate::server::pool::{Executable, Threadpool};
11use std::convert::Infallible;
12use std::io::{BufReader, BufWriter, Write};
13use std::net::{TcpListener, ToSocketAddrs};
14use std::sync::Arc;
15
16/// A connection handler
17#[derive(Clone)]
18enum Handler {
19    /// A `source,sink`-handler
20    SourceSink(Arc<dyn Fn(&mut Source, &mut Sink) -> bool + Send + Sync + 'static>),
21    /// A `request->response`-handler
22    RequestResponse(Arc<dyn Fn(Request) -> Response + Send + Sync + 'static>),
23}
24impl Handler {
25    /// Handles a given connection and returns whether the handler wants to be rescheduled (e.g. keep-alive)
26    pub fn exec(&self, source: &mut Source, sink: &mut Sink) -> bool {
27        match self {
28            Handler::SourceSink(handler) => handler(source, sink),
29            Handler::RequestResponse(handler) => Self::bridge_request_response(source, sink, handler.as_ref()),
30        }
31    }
32
33    /// Bridges a `request->response`-handler to a source-sink pattern
34    fn bridge_request_response<F>(source: &mut Source, sink: &mut Sink, handler: &F) -> bool
35    where
36        F: Fn(Request) -> Response + ?Sized,
37    {
38        // Read request
39        let Ok(Some(request)) = Request::from_stream(source) else {
40            return false;
41        };
42
43        // Handle the request accordingly
44        let is_head = request.method.as_ref().eq_ignore_ascii_case(b"HEAD");
45        let mut response = handler(request);
46        if is_head {
47            // Drop body for HEAD requests
48            response.make_head();
49        }
50
51        // Write response
52        let Ok(_) = response.to_stream(sink) else {
53            return false;
54        };
55
56        // Mark connection as to-be-rescheduled
57        !response.has_connection_close()
58    }
59}
60
61/// An encapsulated connection to pass to the thread pool
62struct Connection<const STACK_SIZE: usize> {
63    /// The connection handler
64    pub handler: Handler,
65    /// The receiving half of the stream
66    pub source: Source,
67    /// The writing half of the stream
68    pub sink: Sink,
69    /// The thread-pool so that keep-alive connections can requeue themselves
70    pub threadpool: Threadpool<Self, STACK_SIZE>,
71}
72impl<const STACK_SIZE: usize> Executable for Connection<STACK_SIZE> {
73    fn exec(mut self) {
74        // Call the connection handler and flush the output
75        let reschedule = self.handler.exec(&mut self.source, &mut self.sink);
76        if self.sink.flush().is_ok() && reschedule {
77            // Reschedule the connection
78            let threadpool = self.threadpool.clone();
79            let _ = threadpool.dispatch(self);
80        }
81    }
82}
83
84/// A threadpool-based HTTP server
85pub struct Server<const STACK_SIZE: usize> {
86    /// The thread pool to handle the incoming connections
87    threadpool: Threadpool<Connection<STACK_SIZE>, STACK_SIZE>,
88    /// The connection handler
89    handler: Handler,
90}
91impl<const STACK_SIZE: usize> Server<STACK_SIZE> {
92    /// Creates a new server with the given connection handler
93    pub fn with_source_sink<F>(workers_max: usize, source_sink_handler: F) -> Self
94    where
95        F: Fn(&mut Source, &mut Sink) -> bool + Send + Sync + 'static,
96    {
97        // Create threadpool and init self
98        let threadpool: Threadpool<_, STACK_SIZE> = Threadpool::new(workers_max);
99        let handler = Handler::SourceSink(Arc::new(source_sink_handler));
100        Self { threadpool, handler }
101    }
102    /// Creates a new server with the given connection handler
103    pub fn with_request_response<F>(workers_max: usize, request_response_handler: F) -> Self
104    where
105        F: Fn(Request) -> Response + Send + Sync + 'static,
106    {
107        // Create threadpool and init self
108        let threadpool: Threadpool<_, STACK_SIZE> = Threadpool::new(workers_max);
109        let handler = Handler::RequestResponse(Arc::new(request_response_handler));
110        Self { threadpool, handler }
111    }
112
113    /// Manually dispatches a connection
114    pub fn dispatch(&self, source: Source, sink: Sink) -> Result<(), Error> {
115        // Create and dispatch the job
116        self.threadpool.dispatch(Connection {
117            handler: self.handler.clone(),
118            source,
119            sink,
120            threadpool: self.threadpool.clone(),
121        })
122    }
123
124    /// Listens on the given address and accepts forever
125    pub fn accept<A>(self, address: A) -> Result<Infallible, Error>
126    where
127        A: ToSocketAddrs,
128    {
129        // Bind and listen
130        let socket = TcpListener::bind(address)?;
131        loop {
132            // Accept and prepare connection
133            let (source, _) = socket.accept()?;
134            let sink = source.try_clone()?;
135
136            // Dispatch connection
137            let source = BufReader::new(source);
138            let sink = BufWriter::new(sink);
139            self.dispatch(source.into(), sink.into())?;
140        }
141    }
142}