1use std::fmt;
111use std::io::{self, ErrorKind, BufWriter, Write};
112use std::net::{SocketAddr, ToSocketAddrs};
113use std::thread::{self, JoinHandle};
114use std::time::Duration;
115
116use num_cpus;
117
118pub use self::request::Request;
119pub use self::response::Response;
120
121pub use net::{Fresh, Streaming};
122
123use Error;
124use buffer::BufReader;
125use header::{Headers, Expect, Connection};
126use http;
127use method::Method;
128use net::{NetworkListener, NetworkStream, HttpListener, HttpsListener, SslServer};
129use status::StatusCode;
130use uri::RequestUri;
131use version::HttpVersion::Http11;
132
133use self::listener::ListenerPool;
134
135pub mod request;
136pub mod response;
137
138mod listener;
139
/// An HTTP server: a network listener paired with the timeouts that will be
/// applied to the connections it accepts.
///
/// `L` is the listener type and defaults to `HttpListener`.
#[derive(Debug)]
pub struct Server<L = HttpListener> {
    // Source of incoming connections; moved into a `ListenerPool` once
    // serving starts.
    listener: L,
    // Read / keep-alive timeouts, copied into the `Worker` once serving starts.
    timeouts: Timeouts,
}
149
/// Timeout settings a `Server` hands to the `Worker` serving its connections.
#[derive(Debug, Clone, Copy)]
struct Timeouts {
    /// Read timeout installed on a request once its head has been parsed.
    read: Option<Duration>,
    /// Read timeout installed on the stream between two requests of a
    /// kept-alive connection; `None` turns keep-alive off altogether.
    keep_alive: Option<Duration>,
}

impl Default for Timeouts {
    /// No read timeout, and a five second keep-alive window.
    fn default() -> Timeouts {
        let keep_alive_window = Duration::from_secs(5);
        Timeouts {
            keep_alive: Some(keep_alive_window),
            read: None,
        }
    }
}
164
165impl<L: NetworkListener> Server<L> {
166 #[inline]
168 pub fn new(listener: L) -> Server<L> {
169 Server {
170 listener: listener,
171 timeouts: Timeouts::default()
172 }
173 }
174
175 #[inline]
184 pub fn keep_alive(&mut self, timeout: Option<Duration>) {
185 self.timeouts.keep_alive = timeout;
186 }
187
188 pub fn set_read_timeout(&mut self, dur: Option<Duration>) {
190 self.listener.set_read_timeout(dur);
191 self.timeouts.read = dur;
192 }
193
194 pub fn set_write_timeout(&mut self, dur: Option<Duration>) {
196 self.listener.set_write_timeout(dur);
197 }
198
199 pub fn local_addr(&mut self) -> io::Result<SocketAddr> {
201 self.listener.local_addr()
202 }
203}
204
205impl Server<HttpListener> {
206 pub fn http<To: ToSocketAddrs>(addr: To) -> ::Result<Server<HttpListener>> {
208 HttpListener::new(addr).map(Server::new)
209 }
210}
211
212impl<S: SslServer + Clone + Send> Server<HttpsListener<S>> {
213 pub fn https<A: ToSocketAddrs>(addr: A, ssl: S) -> ::Result<Server<HttpsListener<S>>> {
217 HttpsListener::new(addr, ssl).map(Server::new)
218 }
219}
220
221impl<L: NetworkListener + Send + 'static> Server<L> {
222 pub fn handle<H: Handler + 'static>(self, handler: H) -> ::Result<Listening> {
224 self.handle_threads(handler, num_cpus::get() * 5 / 4)
225 }
226
227 pub fn handle_threads<H: Handler + 'static>(self, handler: H,
230 threads: usize) -> ::Result<Listening> {
231 handle(self, handler, threads)
232 }
233}
234
235fn handle<H, L>(mut server: Server<L>, handler: H, threads: usize) -> ::Result<Listening>
236where H: Handler + 'static, L: NetworkListener + Send + 'static {
237 let socket = try!(server.listener.local_addr());
238
239 debug!("threads = {:?}", threads);
240 let pool = ListenerPool::new(server.listener);
241 let worker = Worker::new(handler, server.timeouts);
242 let work = move |mut stream| worker.handle_connection(&mut stream);
243
244 let guard = thread::spawn(move || pool.accept(work, threads));
245
246 Ok(Listening {
247 _guard: Some(guard),
248 socket: socket,
249 })
250}
251
252struct Worker<H: Handler + 'static> {
253 handler: H,
254 timeouts: Timeouts,
255}
256
257impl<H: Handler + 'static> Worker<H> {
258 fn new(handler: H, timeouts: Timeouts) -> Worker<H> {
259 Worker {
260 handler: handler,
261 timeouts: timeouts,
262 }
263 }
264
265 fn handle_connection<S>(&self, stream: &mut S) where S: NetworkStream + Clone {
266 debug!("Incoming stream");
267
268 self.handler.on_connection_start();
269
270 let addr = match stream.peer_addr() {
271 Ok(addr) => addr,
272 Err(e) => {
273 info!("Peer Name error: {:?}", e);
274 return;
275 }
276 };
277
278 let stream_clone: &mut NetworkStream = &mut stream.clone();
280 let mut rdr = BufReader::new(stream_clone);
281 let mut wrt = BufWriter::new(stream);
282
283 while self.keep_alive_loop(&mut rdr, &mut wrt, addr) {
284 if let Err(e) = self.set_read_timeout(*rdr.get_ref(), self.timeouts.keep_alive) {
285 info!("set_read_timeout keep_alive {:?}", e);
286 break;
287 }
288 }
289
290 self.handler.on_connection_end();
291
292 debug!("keep_alive loop ending for {}", addr);
293 }
294
295 fn set_read_timeout(&self, s: &NetworkStream, timeout: Option<Duration>) -> io::Result<()> {
296 s.set_read_timeout(timeout)
297 }
298
299 fn keep_alive_loop<W: Write>(&self, rdr: &mut BufReader<&mut NetworkStream>,
300 wrt: &mut W, addr: SocketAddr) -> bool {
301 let req = match Request::new(rdr, addr) {
302 Ok(req) => req,
303 Err(Error::Io(ref e)) if e.kind() == ErrorKind::ConnectionAborted => {
304 trace!("tcp closed, cancelling keep-alive loop");
305 return false;
306 }
307 Err(Error::Io(e)) => {
308 debug!("ioerror in keepalive loop = {:?}", e);
309 return false;
310 }
311 Err(e) => {
312 info!("request error = {:?}", e);
314 return false;
315 }
316 };
317
318 if !self.handle_expect(&req, wrt) {
319 return false;
320 }
321
322 if let Err(e) = req.set_read_timeout(self.timeouts.read) {
323 info!("set_read_timeout {:?}", e);
324 return false;
325 }
326
327 let mut keep_alive = self.timeouts.keep_alive.is_some() &&
328 http::should_keep_alive(req.version, &req.headers);
329 let version = req.version;
330 let mut res_headers = Headers::new();
331 if !keep_alive {
332 res_headers.set(Connection::close());
333 }
334 {
335 let mut res = Response::new(wrt, &mut res_headers);
336 res.version = version;
337 self.handler.handle(req, res);
338 }
339
340 if keep_alive {
343 keep_alive = http::should_keep_alive(version, &res_headers);
344 }
345
346 debug!("keep_alive = {:?} for {}", keep_alive, addr);
347 keep_alive
348 }
349
350 fn handle_expect<W: Write>(&self, req: &Request, wrt: &mut W) -> bool {
351 if req.version == Http11 && req.headers.get() == Some(&Expect::Continue) {
352 let status = self.handler.check_continue((&req.method, &req.uri, &req.headers));
353 match write!(wrt, "{} {}\r\n\r\n", Http11, status).and_then(|_| wrt.flush()) {
354 Ok(..) => (),
355 Err(e) => {
356 info!("error writing 100-continue: {:?}", e);
357 return false;
358 }
359 }
360
361 if status != StatusCode::Continue {
362 debug!("non-100 status ({}) for Expect 100 request", status);
363 return false;
364 }
365 }
366
367 true
368 }
369}
370
/// Handle to a server whose accept loop is running on a background thread.
pub struct Listening {
    /// Join handle of the accept-loop thread; `None` once it has been taken.
    _guard: Option<JoinHandle<()>>,
    /// The local address the server's listener reported when serving began.
    pub socket: SocketAddr,
}

impl fmt::Debug for Listening {
    /// Prints only the socket address; the thread handle is omitted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("Listening {{ socket: {:?} }}", self.socket))
    }
}

impl Drop for Listening {
    /// Blocks until the accept-loop thread finishes, unless its handle has
    /// already been taken. The join result is deliberately ignored.
    fn drop(&mut self) {
        if let Some(acceptor) = self._guard.take() {
            let _ = acceptor.join();
        }
    }
}
389
390impl Listening {
391 pub fn close(&mut self) -> ::Result<()> {
396 let _ = self._guard.take();
397 debug!("closing server");
398 Ok(())
399 }
400}
401
402pub trait Handler: Sync + Send {
404 fn handle<'a, 'k>(&'a self, Request<'a, 'k>, Response<'a, Fresh>);
408
409 fn check_continue(&self, _: (&Method, &RequestUri, &Headers)) -> StatusCode {
414 StatusCode::Continue
415 }
416
417 fn on_connection_start(&self) { }
421
422 fn on_connection_end(&self) { }
426}
427
428impl<F> Handler for F where F: Fn(Request, Response<Fresh>), F: Sync + Send {
429 fn handle<'a, 'k>(&'a self, req: Request<'a, 'k>, res: Response<'a, Fresh>) {
430 self(req, res)
431 }
432}
433
#[cfg(test)]
mod tests {
    use header::Headers;
    use method::Method;
    use mock::MockStream;
    use status::StatusCode;
    use uri::RequestUri;

    use super::{Request, Response, Fresh, Handler, Worker};

    /// A POST carrying `Expect: 100-continue` and a ten byte body, shared by
    /// both tests below.
    const UPLOAD_REQUEST: &'static [u8] = b"\
        POST /upload HTTP/1.1\r\n\
        Host: example.domain\r\n\
        Expect: 100-continue\r\n\
        Content-Length: 10\r\n\
        \r\n\
        1234567890\
    ";

    /// With the default `check_continue`, the interim `100 Continue` is sent
    /// first and the handler's real response follows.
    #[test]
    fn test_check_continue_default() {
        fn handle(_: Request, res: Response<Fresh>) {
            res.start().unwrap().end().unwrap();
        }

        let mut mock = MockStream::with_input(UPLOAD_REQUEST);
        let worker = Worker::new(handle, Default::default());
        worker.handle_connection(&mut mock);

        let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
        let res = b"HTTP/1.1 200 OK\r\n";
        let written = &mock.write[..];
        assert_eq!(&written[..cont.len()], cont);
        assert_eq!(&written[cont.len()..cont.len() + res.len()], res);
    }

    /// When `check_continue` rejects the expectation, only the rejection
    /// status line is written and the handler never responds.
    #[test]
    fn test_check_continue_reject() {
        struct Reject;
        impl Handler for Reject {
            fn handle<'a, 'k>(&'a self, _: Request<'a, 'k>, res: Response<'a, Fresh>) {
                res.start().unwrap().end().unwrap();
            }

            fn check_continue(&self, _: (&Method, &RequestUri, &Headers)) -> StatusCode {
                StatusCode::ExpectationFailed
            }
        }

        let mut mock = MockStream::with_input(UPLOAD_REQUEST);
        let worker = Worker::new(Reject, Default::default());
        worker.handle_connection(&mut mock);

        let expected = &b"HTTP/1.1 417 Expectation Failed\r\n\r\n"[..];
        assert_eq!(mock.write, expected);
    }
}