mco_http/server/
mod.rs

1//! HTTP Server
2//!
3//! # Server
4//!
5//! A `Server` is created to listen on port, parse HTTP requests, and hand
6//! them off to a `Handler`. By default, the Server will listen across multiple
7//! threads, but that can be configured to a single thread if preferred.
8//!
9//! # Handling requests
10//!
11//! You must pass a `Handler` to the Server that will handle requests. There is
12//! a default implementation for `fn`s and closures, allowing you pass one of
13//! those easily.
14//!
15//!
16//! ```no_run
17//! use mco_http::server::{Server, Request, Response};
18//!
19//! fn hello(req: Request, res: Response) {
20//!     // handle things here
21//! }
22//!
23//! Server::http("0.0.0.0:0").unwrap().handle(hello).unwrap();
24//! ```
25//!
26//! As with any trait, you can also define a struct and implement `Handler`
27//! directly on your own type, and pass that to the `Server` instead.
28//!
29//! ```no_run
30//! use std::sync::Mutex;
31//! use std::sync::mpsc::{channel, Sender};
32//! use mco_http::server::{Handler, Server, Request, Response};
33//!
34//! struct SenderHandler {
35//!     sender: Mutex<Sender<&'static str>>
36//! }
37//!
38//! impl Handler for SenderHandler {
39//!     fn handle(&self, req: Request, res: Response) {
40//!         self.sender.lock().unwrap().send("start").unwrap();
41//!     }
42//! }
43//!
44//!
45//! let (tx, rx) = channel();
46//! Server::http("0.0.0.0:0").unwrap().handle(SenderHandler {
47//!     sender: Mutex::new(tx)
48//! }).unwrap();
49//! ```
50//!
51//! Since the `Server` will be listening on multiple threads, the `Handler`
52//! must implement `Sync`: any mutable state must be synchronized.
53//!
54//! ```no_run
55//! use std::sync::atomic::{AtomicUsize, Ordering};
56//! use mco_http::server::{Server, Request, Response};
57//!
58//! let counter = AtomicUsize::new(0);
59//! Server::http("0.0.0.0:0").unwrap().handle(move |req: Request, res: Response| {
60//!     counter.fetch_add(1, Ordering::Relaxed);
61//! }).unwrap();
62//! ```
63//!
64//! # The `Request` and `Response` pair
65//!
66//! A `Handler` receives a pair of arguments, a `Request` and a `Response`. The
67//! `Request` includes access to the `method`, `uri`, and `headers` of the
68//! incoming HTTP request. It also implements `std::io::Read`, in order to
69//! read any body, such as with `POST` or `PUT` messages.
70//!
71//! Likewise, the `Response` includes ways to set the `status` and `headers`,
72//! and implements `std::io::Write` to allow writing the response body.
73//!
74//! ```no_run
75//! use std::io;
76//! use mco_http::server::{Server, Request, Response};
77//! use mco_http::status::StatusCode;
78//!
79//! Server::http("0.0.0.0:0").unwrap().handle(|mut req: Request, mut res: Response| {
80//!     match req.method {
81//!         mco_http::Post => {
82//!             io::copy(&mut req, &mut res.start().unwrap()).unwrap();
83//!         },
84//!         _ => *res.status_mut() = StatusCode::MethodNotAllowed
85//!     }
86//! }).unwrap();
87//! ```
88//!
89//! ## An aside: Write Status
90//!
91//! The `Response` uses a phantom type parameter to determine its write status.
92//! What does that mean? In short, it ensures you never write a body before
93//! adding all headers, and never add a header after writing some of the body.
94//!
95//! This is often done in most implementations by include a boolean property
96//! on the response, such as `headers_written`, checking that each time the
97//! body has something to write, so as to make sure the headers are sent once,
98//! and only once. But this has 2 downsides:
99//!
100//! 1. You are typically never notified that your late header is doing nothing.
101//! 2. There's a runtime cost to checking on every write.
102//!
103//! Instead, mco_http handles this statically, or at compile-time. A
104//! `Response<Fresh>` includes a `headers_mut()` method, allowing you add more
105//! headers. It also does not implement `Write`, so you can't accidentally
106//! write early. Once the "head" of the response is correct, you can "send" it
107//! out by calling `start` on the `Response<Fresh>`. This will return a new
108//! `Response<Streaming>` object, that no longer has `headers_mut()`, but does
109//! implement `Write`.
110use std::fmt;
111use std::io::{self, ErrorKind, BufWriter, Write};
112use std::net::{SocketAddr, ToSocketAddrs, Shutdown};
113use std::time::Duration;
114
115use num_cpus;
116
117pub use self::request::Request;
118pub use self::response::Response;
119
120pub use crate::net::{Fresh, Streaming};
121
122use crate::{Error, runtime};
123use crate::buffer::BufReader;
124use crate::header::{Headers, Expect, Connection};
125use crate::http;
126use crate::method::Method;
127use crate::net::{NetworkListener, NetworkStream, HttpListener, HttpsListener, SslServer};
128use crate::status::StatusCode;
129use crate::uri::RequestUri;
130use crate::version::HttpVersion::Http11;
131
132use self::listener::ListenerPool;
133
134pub mod request;
135pub mod response;
136
137pub mod extensions;
138
139pub use extensions::*;
140
141mod listener;
142
143/// A server can listen on a TCP socket.
144///
145/// Once listening, it will create a `Request`/`Response` pair for each
146/// incoming connection, and hand them to the provided handler.
147#[derive(Debug)]
148pub struct Server<L = HttpListener> {
149    listener: L,
150    timeouts: Timeouts,
151}
152
153#[derive(Clone, Copy, Debug)]
154pub struct Timeouts {
155    pub read: Option<Duration>,
156    pub keep_alive: Option<Duration>,
157}
158
159impl Default for Timeouts {
160    fn default() -> Timeouts {
161        Timeouts {
162            read: None,
163            keep_alive: Some(Duration::from_secs(5))
164        }
165    }
166}
167
168impl<L: NetworkListener> Server<L> {
169    /// Creates a new server with the provided handler.
170    #[inline]
171    pub fn new(listener: L) -> Server<L> {
172        Server {
173            listener: listener,
174            timeouts: Timeouts::default()
175        }
176    }
177
178    /// Controls keep-alive for this server.
179    ///
180    /// The timeout duration passed will be used to determine how long
181    /// to keep the connection alive before dropping it.
182    ///
183    /// Passing `None` will disable keep-alive.
184    ///
185    /// Default is enabled with a 5 second timeout.
186    #[inline]
187    pub fn keep_alive(&mut self, timeout: Option<Duration>) {
188        self.timeouts.keep_alive = timeout;
189    }
190
191    /// Sets the read timeout for all Request reads.
192    pub fn set_read_timeout(&mut self, dur: Option<Duration>) {
193        self.listener.set_read_timeout(dur);
194        self.timeouts.read = dur;
195    }
196
197    /// Sets the write timeout for all Response writes.
198    pub fn set_write_timeout(&mut self, dur: Option<Duration>) {
199        self.listener.set_write_timeout(dur);
200    }
201
202    /// Get the address that the server is listening on.
203    pub fn local_addr(&mut self) -> io::Result<SocketAddr> {
204        self.listener.local_addr()
205    }
206}
207
208impl Server<HttpListener> {
209    /// Creates a new server that will handle `HttpStream`s.
210    pub fn http<To: ToSocketAddrs>(addr: To) -> crate::Result<Server<HttpListener>> {
211        HttpListener::new(addr).map(Server::new)
212    }
213}
214
215impl<S: SslServer + Clone + Send> Server<HttpsListener<S>> {
216    /// Creates a new server that will handle `HttpStream`s over SSL.
217    ///
218    /// You can use any SSL implementation, as long as implements `mco_http::net::Ssl`.
219    pub fn https<A: ToSocketAddrs>(addr: A, ssl: S) -> crate::Result<Server<HttpsListener<S>>> {
220        HttpsListener::new(addr, ssl).map(Server::new)
221    }
222}
223
224impl<L: NetworkListener + Send + 'static> Server<L> {
225    /// Binds to a socket and starts handling connections.
226    pub fn handle<H: Handler + 'static>(self, handler: H) -> crate::Result<Listening> {
227        self.handle_threads(handler, num_cpus::get() * 2)
228    }
229
230    /// Binds to a socket and starts handling connections with the provided
231    /// number of threads.
232    pub fn handle_threads<H: Handler + 'static>(self, handler: H,
233            threads: usize) -> crate::Result<Listening> {
234        handle(self, handler, threads)
235    }
236
237    /// Binds to a socket and starts handling connections.
238    pub fn handle_accept(self, handler: fn(L::Stream)) -> crate::Result<Listening> {
239        handle_accept::<L>(self,handler, num_cpus::get() * 2)
240    }
241}
242
243fn handle<H, L>(mut server: Server<L>, handler: H, threads: usize) -> crate::Result<Listening>
244where H: Handler + 'static, L: NetworkListener + Send + 'static {
245    let socket = server.listener.local_addr()?;
246
247    debug!("threads = {:?}", threads);
248    let pool = ListenerPool::new(server.listener);
249    let worker = Worker::new(handler, server.timeouts);
250    let work = move |mut stream| {
251        worker.handle_connection(&mut stream);
252    };
253
254    let guard = runtime::spawn(move || pool.accept(work, threads));
255
256    Ok(Listening {
257        _guard: Some(guard),
258        socket: socket,
259    })
260}
261
262fn handle_accept<L>(mut server: Server<L>, handler: fn(L::Stream), threads: usize) -> crate::Result<Listening>
263    where L: NetworkListener + Send + 'static {
264    let socket = server.listener.local_addr()?;
265
266    debug!("threads = {:?}", threads);
267    let pool = ListenerPool::new(server.listener);
268    let work = move |stream| {
269        handler(stream)
270    };
271
272    let guard = runtime::spawn(move || pool.accept(work, threads));
273
274    Ok(Listening {
275        _guard: Some(guard),
276        socket: socket,
277    })
278}
279
280pub struct Worker<H: Handler + 'static> {
281    handler: H,
282    timeouts: Timeouts,
283}
284
285impl<H: Handler + 'static> Worker<H> {
286    pub fn new(handler: H, timeouts: Timeouts) -> Worker<H> {
287        Worker {
288            handler: handler,
289            timeouts: timeouts,
290        }
291    }
292
293    pub fn handle_connection<S>(&self, stream: &mut S) where S: NetworkStream + Clone {
294        debug!("Incoming stream");
295
296        self.handler.on_connection_start();
297
298        let addr = match stream.peer_addr() {
299            Ok(addr) => addr,
300            Err(e) => {
301                info!("Peer Name error: {:?}", e);
302                return;
303            }
304        };
305
306        let stream2: &mut dyn NetworkStream = &mut stream.clone();
307        let mut rdr = BufReader::new(stream2);
308        let mut wrt = BufWriter::new(stream);
309
310        while self.keep_alive_loop(&mut rdr, &mut wrt, addr) {
311            if let Err(e) = self.set_read_timeout(*rdr.get_ref(), self.timeouts.keep_alive) {
312                info!("set_read_timeout keep_alive {:?}", e);
313                break;
314            }
315        }
316
317        self.handler.on_connection_end();
318
319        debug!("keep_alive loop ending for {}", addr);
320
321        if let Err(e) = rdr.get_mut().close(Shutdown::Both) {
322            info!("failed to close stream: {}", e);
323        }
324    }
325
326    fn set_read_timeout(&self, s: &dyn NetworkStream, timeout: Option<Duration>) -> io::Result<()> {
327        s.set_read_timeout(timeout)
328    }
329
330    fn keep_alive_loop<W: Write>(&self, rdr: &mut BufReader<&mut dyn NetworkStream>,
331            wrt: &mut W, addr: SocketAddr) -> bool {
332        let req = match Request::new(rdr, addr) {
333            Ok(req) => req,
334            Err(Error::Io(ref e)) if e.kind() == ErrorKind::ConnectionAborted => {
335                trace!("tcp closed, cancelling keep-alive loop");
336                return false;
337            }
338            Err(Error::Io(e)) => {
339                debug!("ioerror in keepalive loop = {:?}", e);
340                return false;
341            }
342            Err(e) => {
343                //TODO: send a 400 response
344                info!("request error = {:?}", e);
345                return false;
346            }
347        };
348
349        if !self.handle_expect(&req, wrt) {
350            return false;
351        }
352
353        if let Err(e) = req.set_read_timeout(self.timeouts.read) {
354            info!("set_read_timeout {:?}", e);
355            return false;
356        }
357
358        let mut keep_alive = self.timeouts.keep_alive.is_some() &&
359            http::should_keep_alive(req.version, &req.headers);
360        let version = req.version;
361        let mut res_headers = Headers::new();
362        if !keep_alive {
363            res_headers.set(Connection::close());
364        }
365        {
366            let mut res = Response::new(wrt, &mut res_headers);
367            res.version = version;
368            self.handler.handle(req, res);
369        }
370
371        // if the request was keep-alive, we need to check that the server agrees
372        // if it wasn't, then the server cannot force it to be true anyways
373        if keep_alive {
374            keep_alive = http::should_keep_alive(version, &res_headers);
375        }
376
377        debug!("keep_alive = {:?} for {}", keep_alive, addr);
378        keep_alive
379    }
380
381    fn handle_expect<W: Write>(&self, req: &Request, wrt: &mut W) -> bool {
382         if req.version == Http11 && req.headers.get() == Some(&Expect::Continue) {
383            let status = self.handler.check_continue((&req.method, &req.uri, &req.headers));
384            match write!(wrt, "{} {}\r\n\r\n", Http11, status).and_then(|_| wrt.flush()) {
385                Ok(..) => (),
386                Err(e) => {
387                    info!("error writing 100-continue: {:?}", e);
388                    return false;
389                }
390            }
391
392            if status != StatusCode::Continue {
393                debug!("non-100 status ({}) for Expect 100 request", status);
394                return false;
395            }
396        }
397
398        true
399    }
400}
401
402/// A listening server, which can later be closed.
403pub struct Listening {
404    _guard: Option<runtime::JoinHandle<()>>,
405    /// The socket addresses that the server is bound to.
406    pub socket: SocketAddr,
407}
408
409impl fmt::Debug for Listening {
410    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
411        write!(f, "Listening {{ socket: {:?} }}", self.socket)
412    }
413}
414
415impl Drop for Listening {
416    fn drop(&mut self) {
417        let _ = self._guard.take().map(|g| g.join());
418    }
419}
420
421impl Listening {
422    /// Warning: This function doesn't work. The server remains listening after you called
423    /// it. See https://github.com/mco_httpium/mco_http/issues/338 for more details.
424    ///
425    /// Stop the server from listening to its socket address.
426    pub fn close(&mut self) -> crate::Result<()> {
427        let _ = self._guard.take();
428        debug!("closing server");
429        Ok(())
430    }
431}
432
433/// A handler that can handle incoming requests for a server.
434pub trait Handler: Sync + Send {
435    /// Receives a `Request`/`Response` pair, and should perform some action on them.
436    ///
437    /// This could reading from the request, and writing to the response.
438    fn handle<'a, 'k>(&'a self, _: Request<'a, 'k>, _: Response<'a, Fresh>);
439
440    /// Called when a Request includes a `Expect: 100-continue` header.
441    ///
442    /// By default, this will always immediately response with a `StatusCode::Continue`,
443    /// but can be overridden with custom behavior.
444    fn check_continue(&self, _: (&Method, &RequestUri, &Headers)) -> StatusCode {
445        StatusCode::Continue
446    }
447
448    /// This is run after a connection is received, on a per-connection basis (not a
449    /// per-request basis, as a connection with keep-alive may handle multiple
450    /// requests)
451    fn on_connection_start(&self) { }
452
453    /// This is run before a connection is closed, on a per-connection basis (not a
454    /// per-request basis, as a connection with keep-alive may handle multiple
455    /// requests)
456    fn on_connection_end(&self) { }
457}
458
459impl<F> Handler for F where F: Fn(Request, Response<Fresh>), F: Sync + Send {
460    fn handle<'a, 'k>(&'a self, req: Request<'a, 'k>, res: Response<'a, Fresh>) {
461        self(req, res)
462    }
463}
464
465#[cfg(test)]
466mod tests {
467    use crate::header::Headers;
468    use crate::method::Method;
469    use crate::mock::MockStream;
470    use crate::status::StatusCode;
471    use crate::uri::RequestUri;
472
473    use super::{Request, Response, Fresh, Handler, Worker};
474
475    #[test]
476    fn test_check_continue_default() {
477        let mut mock = MockStream::with_input(b"\
478            POST /upload HTTP/1.1\r\n\
479            Host: example.domain\r\n\
480            Expect: 100-continue\r\n\
481            Content-Length: 10\r\n\
482            \r\n\
483            1234567890\
484        ");
485
486        fn handle(_: Request, res: Response<Fresh>) {
487            res.start().unwrap().end().unwrap();
488        }
489
490        Worker::new(handle, Default::default()).handle_connection(&mut mock);
491        let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
492        assert_eq!(&mock.write[..cont.len()], cont);
493        let res = b"HTTP/1.1 200 OK\r\n";
494        assert_eq!(&mock.write[cont.len()..cont.len() + res.len()], res);
495    }
496
497    #[test]
498    fn test_check_continue_reject() {
499        struct Reject;
500        impl Handler for Reject {
501            fn handle<'a, 'k>(&'a self, _: Request<'a, 'k>, res: Response<'a, Fresh>) {
502                res.start().unwrap().end().unwrap();
503            }
504
505            fn check_continue(&self, _: (&Method, &RequestUri, &Headers)) -> StatusCode {
506                StatusCode::ExpectationFailed
507            }
508        }
509
510        let mut mock = MockStream::with_input(b"\
511            POST /upload HTTP/1.1\r\n\
512            Host: example.domain\r\n\
513            Expect: 100-continue\r\n\
514            Content-Length: 10\r\n\
515            \r\n\
516            1234567890\
517        ");
518
519        Worker::new(Reject, Default::default()).handle_connection(&mut mock);
520        assert_eq!(mock.write, &b"HTTP/1.1 417 Expectation Failed\r\n\r\n"[..]);
521    }
522}