hyper_sync/server/
mod.rs

1//! HTTP Server
2//!
3//! # Server
4//!
//! A `Server` is created to listen on a port, parse HTTP requests, and hand
//! them off to a `Handler`. By default, the Server will listen across multiple
//! threads, but that can be configured to a single thread if preferred.
8//!
9//! # Handling requests
10//!
//! You must pass a `Handler` to the Server that will handle requests. There is
//! a default implementation for `fn`s and closures, allowing you to pass one of
//! those easily.
14//!
15//!
16//! ```no_run
17//! use hyper_sync::server::{Server, Request, Response};
18//!
19//! fn hello(req: Request, res: Response) {
20//!     // handle things here
21//! }
22//!
23//! Server::http("0.0.0.0:0").unwrap().handle(hello).unwrap();
24//! ```
25//!
26//! As with any trait, you can also define a struct and implement `Handler`
27//! directly on your own type, and pass that to the `Server` instead.
28//!
29//! ```no_run
30//! use std::sync::Mutex;
31//! use std::sync::mpsc::{channel, Sender};
32//! use hyper_sync::server::{Handler, Server, Request, Response};
33//!
34//! struct SenderHandler {
35//!     sender: Mutex<Sender<&'static str>>
36//! }
37//!
38//! impl Handler for SenderHandler {
39//!     fn handle(&self, req: Request, res: Response) {
40//!         self.sender.lock().unwrap().send("start").unwrap();
41//!     }
42//! }
43//!
44//!
45//! let (tx, rx) = channel();
46//! Server::http("0.0.0.0:0").unwrap().handle(SenderHandler {
47//!     sender: Mutex::new(tx)
48//! }).unwrap();
49//! ```
50//!
51//! Since the `Server` will be listening on multiple threads, the `Handler`
52//! must implement `Sync`: any mutable state must be synchronized.
53//!
54//! ```no_run
55//! use std::sync::atomic::{AtomicUsize, Ordering};
56//! use hyper_sync::server::{Server, Request, Response};
57//!
58//! let counter = AtomicUsize::new(0);
59//! Server::http("0.0.0.0:0").unwrap().handle(move |req: Request, res: Response| {
60//!     counter.fetch_add(1, Ordering::Relaxed);
61//! }).unwrap();
62//! ```
63//!
64//! # The `Request` and `Response` pair
65//!
66//! A `Handler` receives a pair of arguments, a `Request` and a `Response`. The
67//! `Request` includes access to the `method`, `uri`, and `headers` of the
68//! incoming HTTP request. It also implements `std::io::Read`, in order to
69//! read any body, such as with `POST` or `PUT` messages.
70//!
71//! Likewise, the `Response` includes ways to set the `status` and `headers`,
72//! and implements `std::io::Write` to allow writing the response body.
73//!
74//! ```no_run
75//! use std::io;
76//! use hyper_sync::server::{Server, Request, Response};
77//! use hyper_sync::status::StatusCode;
78//!
79//! Server::http("0.0.0.0:0").unwrap().handle(|mut req: Request, mut res: Response| {
80//!     match req.method {
81//!         hyper_sync::Post => {
82//!             io::copy(&mut req, &mut res.start().unwrap()).unwrap();
83//!         },
84//!         _ => *res.status_mut() = StatusCode::MethodNotAllowed
85//!     }
86//! }).unwrap();
87//! ```
88//!
89//! ## An aside: Write Status
90//!
91//! The `Response` uses a phantom type parameter to determine its write status.
92//! What does that mean? In short, it ensures you never write a body before
93//! adding all headers, and never add a header after writing some of the body.
94//!
//! In most implementations, this is done by including a boolean property
//! on the response, such as `headers_written`, checking that each time the
//! body has something to write, so as to make sure the headers are sent once,
//! and only once. But this has 2 downsides:
99//!
100//! 1. You are typically never notified that your late header is doing nothing.
101//! 2. There's a runtime cost to checking on every write.
102//!
103//! Instead, hyper_sync handles this statically, or at compile-time. A
//! `Response<Fresh>` includes a `headers_mut()` method, allowing you to add more
//! headers. It also does not implement `Write`, so you can't accidentally
106//! write early. Once the "head" of the response is correct, you can "send" it
107//! out by calling `start` on the `Response<Fresh>`. This will return a new
108//! `Response<Streaming>` object, that no longer has `headers_mut()`, but does
109//! implement `Write`.
110use std::fmt;
111use std::io::{self, ErrorKind, BufWriter, Write};
112use std::net::{SocketAddr, ToSocketAddrs};
113use std::thread::{self, JoinHandle};
114use std::time::Duration;
115
116use num_cpus;
117
118pub use self::request::Request;
119pub use self::response::Response;
120
121pub use net::{Fresh, Streaming};
122
123use Error;
124use buffer::BufReader;
125use header::{Headers, Expect, Connection};
126use http;
127use method::Method;
128use net::{NetworkListener, NetworkStream, HttpListener, HttpsListener, SslServer};
129use status::StatusCode;
130use uri::RequestUri;
131use version::HttpVersion::Http11;
132
133use self::listener::ListenerPool;
134
135pub mod request;
136pub mod response;
137
138mod listener;
139
140/// A server can listen on a TCP socket.
141///
142/// Once listening, it will create a `Request`/`Response` pair for each
143/// incoming connection, and hand them to the provided handler.
144#[derive(Debug)]
145pub struct Server<L = HttpListener> {
146    listener: L,
147    timeouts: Timeouts,
148}
149
/// Timeouts the server applies to the connections it serves.
#[derive(Clone, Copy, Debug)]
struct Timeouts {
    // Read timeout applied to a request once its head has been parsed.
    read: Option<Duration>,
    // Read timeout applied between requests on a kept-alive connection;
    // `None` disables keep-alive.
    keep_alive: Option<Duration>,
}

impl Default for Timeouts {
    /// No read timeout, and keep-alive enabled with a 5 second timeout.
    fn default() -> Self {
        let five_seconds = Duration::from_secs(5);
        Timeouts {
            read: None,
            keep_alive: Some(five_seconds),
        }
    }
}
164
165impl<L: NetworkListener> Server<L> {
166    /// Creates a new server with the provided handler.
167    #[inline]
168    pub fn new(listener: L) -> Server<L> {
169        Server {
170            listener: listener,
171            timeouts: Timeouts::default()
172        }
173    }
174
175    /// Controls keep-alive for this server.
176    ///
177    /// The timeout duration passed will be used to determine how long
178    /// to keep the connection alive before dropping it.
179    ///
180    /// Passing `None` will disable keep-alive.
181    ///
182    /// Default is enabled with a 5 second timeout.
183    #[inline]
184    pub fn keep_alive(&mut self, timeout: Option<Duration>) {
185        self.timeouts.keep_alive = timeout;
186    }
187
188    /// Sets the read timeout for all Request reads.
189    pub fn set_read_timeout(&mut self, dur: Option<Duration>) {
190        self.listener.set_read_timeout(dur);
191        self.timeouts.read = dur;
192    }
193
194    /// Sets the write timeout for all Response writes.
195    pub fn set_write_timeout(&mut self, dur: Option<Duration>) {
196        self.listener.set_write_timeout(dur);
197    }
198
199    /// Get the address that the server is listening on.
200    pub fn local_addr(&mut self) -> io::Result<SocketAddr> {
201        self.listener.local_addr()
202    }
203}
204
205impl Server<HttpListener> {
206    /// Creates a new server that will handle `HttpStream`s.
207    pub fn http<To: ToSocketAddrs>(addr: To) -> ::Result<Server<HttpListener>> {
208        HttpListener::new(addr).map(Server::new)
209    }
210}
211
212impl<S: SslServer + Clone + Send> Server<HttpsListener<S>> {
213    /// Creates a new server that will handle `HttpStream`s over SSL.
214    ///
215    /// You can use any SSL implementation, as long as implements `hyper_sync::net::Ssl`.
216    pub fn https<A: ToSocketAddrs>(addr: A, ssl: S) -> ::Result<Server<HttpsListener<S>>> {
217        HttpsListener::new(addr, ssl).map(Server::new)
218    }
219}
220
221impl<L: NetworkListener + Send + 'static> Server<L> {
222    /// Binds to a socket and starts handling connections.
223    pub fn handle<H: Handler + 'static>(self, handler: H) -> ::Result<Listening> {
224        self.handle_threads(handler, num_cpus::get() * 5 / 4)
225    }
226
227    /// Binds to a socket and starts handling connections with the provided
228    /// number of threads.
229    pub fn handle_threads<H: Handler + 'static>(self, handler: H,
230            threads: usize) -> ::Result<Listening> {
231        handle(self, handler, threads)
232    }
233}
234
235fn handle<H, L>(mut server: Server<L>, handler: H, threads: usize) -> ::Result<Listening>
236where H: Handler + 'static, L: NetworkListener + Send + 'static {
237    let socket = try!(server.listener.local_addr());
238
239    debug!("threads = {:?}", threads);
240    let pool = ListenerPool::new(server.listener);
241    let worker = Worker::new(handler, server.timeouts);
242    let work = move |mut stream| worker.handle_connection(&mut stream);
243
244    let guard = thread::spawn(move || pool.accept(work, threads));
245
246    Ok(Listening {
247        _guard: Some(guard),
248        socket: socket,
249    })
250}
251
252struct Worker<H: Handler + 'static> {
253    handler: H,
254    timeouts: Timeouts,
255}
256
257impl<H: Handler + 'static> Worker<H> {
258    fn new(handler: H, timeouts: Timeouts) -> Worker<H> {
259        Worker {
260            handler: handler,
261            timeouts: timeouts,
262        }
263    }
264
265    fn handle_connection<S>(&self, stream: &mut S) where S: NetworkStream + Clone {
266        debug!("Incoming stream");
267
268        self.handler.on_connection_start();
269
270        let addr = match stream.peer_addr() {
271            Ok(addr) => addr,
272            Err(e) => {
273                info!("Peer Name error: {:?}", e);
274                return;
275            }
276        };
277
278        // FIXME: Use Type ascription
279        let stream_clone: &mut NetworkStream = &mut stream.clone();
280        let mut rdr = BufReader::new(stream_clone);
281        let mut wrt = BufWriter::new(stream);
282
283        while self.keep_alive_loop(&mut rdr, &mut wrt, addr) {
284            if let Err(e) = self.set_read_timeout(*rdr.get_ref(), self.timeouts.keep_alive) {
285                info!("set_read_timeout keep_alive {:?}", e);
286                break;
287            }
288        }
289
290        self.handler.on_connection_end();
291
292        debug!("keep_alive loop ending for {}", addr);
293    }
294
295    fn set_read_timeout(&self, s: &NetworkStream, timeout: Option<Duration>) -> io::Result<()> {
296        s.set_read_timeout(timeout)
297    }
298
299    fn keep_alive_loop<W: Write>(&self, rdr: &mut BufReader<&mut NetworkStream>,
300            wrt: &mut W, addr: SocketAddr) -> bool {
301        let req = match Request::new(rdr, addr) {
302            Ok(req) => req,
303            Err(Error::Io(ref e)) if e.kind() == ErrorKind::ConnectionAborted => {
304                trace!("tcp closed, cancelling keep-alive loop");
305                return false;
306            }
307            Err(Error::Io(e)) => {
308                debug!("ioerror in keepalive loop = {:?}", e);
309                return false;
310            }
311            Err(e) => {
312                //TODO: send a 400 response
313                info!("request error = {:?}", e);
314                return false;
315            }
316        };
317
318        if !self.handle_expect(&req, wrt) {
319            return false;
320        }
321
322        if let Err(e) = req.set_read_timeout(self.timeouts.read) {
323            info!("set_read_timeout {:?}", e);
324            return false;
325        }
326
327        let mut keep_alive = self.timeouts.keep_alive.is_some() &&
328            http::should_keep_alive(req.version, &req.headers);
329        let version = req.version;
330        let mut res_headers = Headers::new();
331        if !keep_alive {
332            res_headers.set(Connection::close());
333        }
334        {
335            let mut res = Response::new(wrt, &mut res_headers);
336            res.version = version;
337            self.handler.handle(req, res);
338        }
339
340        // if the request was keep-alive, we need to check that the server agrees
341        // if it wasn't, then the server cannot force it to be true anyways
342        if keep_alive {
343            keep_alive = http::should_keep_alive(version, &res_headers);
344        }
345
346        debug!("keep_alive = {:?} for {}", keep_alive, addr);
347        keep_alive
348    }
349
350    fn handle_expect<W: Write>(&self, req: &Request, wrt: &mut W) -> bool {
351         if req.version == Http11 && req.headers.get() == Some(&Expect::Continue) {
352            let status = self.handler.check_continue((&req.method, &req.uri, &req.headers));
353            match write!(wrt, "{} {}\r\n\r\n", Http11, status).and_then(|_| wrt.flush()) {
354                Ok(..) => (),
355                Err(e) => {
356                    info!("error writing 100-continue: {:?}", e);
357                    return false;
358                }
359            }
360
361            if status != StatusCode::Continue {
362                debug!("non-100 status ({}) for Expect 100 request", status);
363                return false;
364            }
365        }
366
367        true
368    }
369}
370
/// Handle to a running server, which can later be closed.
///
/// Dropping it joins the server thread unless `close` was called first.
pub struct Listening {
    // Join handle of the server thread; `None` once it has been taken.
    _guard: Option<JoinHandle<()>>,
    /// The socket address that the server is bound to.
    pub socket: SocketAddr,
}

impl fmt::Debug for Listening {
    /// Formats only the socket address; the join handle is left out.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("Listening {{ socket: {:?} }}", self.socket))
    }
}

impl Drop for Listening {
    /// Waits for the server thread to finish, ignoring the join result.
    fn drop(&mut self) {
        if let Some(guard) = self._guard.take() {
            let _ = guard.join();
        }
    }
}
389
390impl Listening {
391    /// Warning: This function doesn't work. The server remains listening after you called
392    /// it. See https://github.com/hyperium/hyper/issues/338 for more details.
393    ///
394    /// Stop the server from listening to its socket address.
395    pub fn close(&mut self) -> ::Result<()> {
396        let _ = self._guard.take();
397        debug!("closing server");
398        Ok(())
399    }
400}
401
402/// A handler that can handle incoming requests for a server.
403pub trait Handler: Sync + Send {
404    /// Receives a `Request`/`Response` pair, and should perform some action on them.
405    ///
406    /// This could reading from the request, and writing to the response.
407    fn handle<'a, 'k>(&'a self, Request<'a, 'k>, Response<'a, Fresh>);
408
409    /// Called when a Request includes a `Expect: 100-continue` header.
410    ///
411    /// By default, this will always immediately response with a `StatusCode::Continue`,
412    /// but can be overridden with custom behavior.
413    fn check_continue(&self, _: (&Method, &RequestUri, &Headers)) -> StatusCode {
414        StatusCode::Continue
415    }
416
417    /// This is run after a connection is received, on a per-connection basis (not a
418    /// per-request basis, as a connection with keep-alive may handle multiple
419    /// requests)
420    fn on_connection_start(&self) { }
421
422    /// This is run before a connection is closed, on a per-connection basis (not a
423    /// per-request basis, as a connection with keep-alive may handle multiple
424    /// requests)
425    fn on_connection_end(&self) { }
426}
427
428impl<F> Handler for F where F: Fn(Request, Response<Fresh>), F: Sync + Send {
429    fn handle<'a, 'k>(&'a self, req: Request<'a, 'k>, res: Response<'a, Fresh>) {
430        self(req, res)
431    }
432}
433
#[cfg(test)]
mod tests {
    use header::Headers;
    use method::Method;
    use mock::MockStream;
    use status::StatusCode;
    use uri::RequestUri;

    use super::{Request, Response, Fresh, Handler, Worker};

    // HTTP/1.1 upload carrying `Expect: 100-continue`, used by both tests.
    const UPLOAD_REQUEST: &'static [u8] = b"\
        POST /upload HTTP/1.1\r\n\
        Host: example.domain\r\n\
        Expect: 100-continue\r\n\
        Content-Length: 10\r\n\
        \r\n\
        1234567890\
    ";

    // With the default `check_continue`, the output starts with the interim
    // `100 Continue` line, followed by the handler's `200 OK` response.
    #[test]
    fn test_check_continue_default() {
        fn handle(_: Request, res: Response<Fresh>) {
            res.start().unwrap().end().unwrap();
        }

        let mut mock = MockStream::with_input(UPLOAD_REQUEST);
        let worker = Worker::new(handle, Default::default());
        worker.handle_connection(&mut mock);

        let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
        let res = b"HTTP/1.1 200 OK\r\n";
        let (interim, rest) = mock.write.split_at(cont.len());
        assert_eq!(interim, cont);
        assert_eq!(&rest[..res.len()], res);
    }

    // When `check_continue` rejects the expectation, only the rejection
    // status line is written.
    #[test]
    fn test_check_continue_reject() {
        struct Reject;
        impl Handler for Reject {
            fn handle<'a, 'k>(&'a self, _: Request<'a, 'k>, res: Response<'a, Fresh>) {
                res.start().unwrap().end().unwrap();
            }

            fn check_continue(&self, _: (&Method, &RequestUri, &Headers)) -> StatusCode {
                StatusCode::ExpectationFailed
            }
        }

        let mut mock = MockStream::with_input(UPLOAD_REQUEST);
        let worker = Worker::new(Reject, Default::default());
        worker.handle_connection(&mut mock);

        assert_eq!(mock.write, &b"HTTP/1.1 417 Expectation Failed\r\n\r\n"[..]);
    }
}
491}