1use std::fmt;
111use std::io::{self, ErrorKind, BufWriter, Write};
112use std::net::{SocketAddr, ToSocketAddrs, Shutdown};
113use std::time::Duration;
114
115use num_cpus;
116
117pub use self::request::Request;
118pub use self::response::Response;
119
120pub use crate::net::{Fresh, Streaming};
121
122use crate::{Error, runtime};
123use crate::buffer::BufReader;
124use crate::header::{Headers, Expect, Connection};
125use crate::http;
126use crate::method::Method;
127use crate::net::{NetworkListener, NetworkStream, HttpListener, HttpsListener, SslServer};
128use crate::status::StatusCode;
129use crate::uri::RequestUri;
130use crate::version::HttpVersion::Http11;
131
132use self::listener::ListenerPool;
133
134pub mod request;
135pub mod response;
136
137pub mod extensions;
138
139pub use extensions::*;
140
141mod listener;
142
143#[derive(Debug)]
148pub struct Server<L = HttpListener> {
149 listener: L,
150 timeouts: Timeouts,
151}
152
/// Timeout settings applied to the connections a server handles.
#[derive(Clone, Copy, Debug)]
pub struct Timeouts {
    /// Read timeout set on each request before it is passed to the handler.
    pub read: Option<Duration>,
    /// Read timeout set on the stream between requests of a kept-alive
    /// connection. When this is `None`, connections are never kept alive.
    pub keep_alive: Option<Duration>,
}

impl Default for Timeouts {
    /// Returns the defaults: no read timeout and a five second keep-alive.
    fn default() -> Self {
        let keep_alive = Some(Duration::from_secs(5));
        Self {
            read: None,
            keep_alive,
        }
    }
}
167
168impl<L: NetworkListener> Server<L> {
169 #[inline]
171 pub fn new(listener: L) -> Server<L> {
172 Server {
173 listener: listener,
174 timeouts: Timeouts::default()
175 }
176 }
177
178 #[inline]
187 pub fn keep_alive(&mut self, timeout: Option<Duration>) {
188 self.timeouts.keep_alive = timeout;
189 }
190
191 pub fn set_read_timeout(&mut self, dur: Option<Duration>) {
193 self.listener.set_read_timeout(dur);
194 self.timeouts.read = dur;
195 }
196
197 pub fn set_write_timeout(&mut self, dur: Option<Duration>) {
199 self.listener.set_write_timeout(dur);
200 }
201
202 pub fn local_addr(&mut self) -> io::Result<SocketAddr> {
204 self.listener.local_addr()
205 }
206}
207
208impl Server<HttpListener> {
209 pub fn http<To: ToSocketAddrs>(addr: To) -> crate::Result<Server<HttpListener>> {
211 HttpListener::new(addr).map(Server::new)
212 }
213}
214
215impl<S: SslServer + Clone + Send> Server<HttpsListener<S>> {
216 pub fn https<A: ToSocketAddrs>(addr: A, ssl: S) -> crate::Result<Server<HttpsListener<S>>> {
220 HttpsListener::new(addr, ssl).map(Server::new)
221 }
222}
223
224impl<L: NetworkListener + Send + 'static> Server<L> {
225 pub fn handle<H: Handler + 'static>(self, handler: H) -> crate::Result<Listening> {
227 self.handle_threads(handler, num_cpus::get() * 2)
228 }
229
230 pub fn handle_threads<H: Handler + 'static>(self, handler: H,
233 threads: usize) -> crate::Result<Listening> {
234 handle(self, handler, threads)
235 }
236
237 pub fn handle_accept(self, handler: fn(L::Stream)) -> crate::Result<Listening> {
239 handle_accept::<L>(self,handler, num_cpus::get() * 2)
240 }
241}
242
243fn handle<H, L>(mut server: Server<L>, handler: H, threads: usize) -> crate::Result<Listening>
244where H: Handler + 'static, L: NetworkListener + Send + 'static {
245 let socket = server.listener.local_addr()?;
246
247 debug!("threads = {:?}", threads);
248 let pool = ListenerPool::new(server.listener);
249 let worker = Worker::new(handler, server.timeouts);
250 let work = move |mut stream| {
251 worker.handle_connection(&mut stream);
252 };
253
254 let guard = runtime::spawn(move || pool.accept(work, threads));
255
256 Ok(Listening {
257 _guard: Some(guard),
258 socket: socket,
259 })
260}
261
262fn handle_accept<L>(mut server: Server<L>, handler: fn(L::Stream), threads: usize) -> crate::Result<Listening>
263 where L: NetworkListener + Send + 'static {
264 let socket = server.listener.local_addr()?;
265
266 debug!("threads = {:?}", threads);
267 let pool = ListenerPool::new(server.listener);
268 let work = move |stream| {
269 handler(stream)
270 };
271
272 let guard = runtime::spawn(move || pool.accept(work, threads));
273
274 Ok(Listening {
275 _guard: Some(guard),
276 socket: socket,
277 })
278}
279
280pub struct Worker<H: Handler + 'static> {
281 handler: H,
282 timeouts: Timeouts,
283}
284
285impl<H: Handler + 'static> Worker<H> {
286 pub fn new(handler: H, timeouts: Timeouts) -> Worker<H> {
287 Worker {
288 handler: handler,
289 timeouts: timeouts,
290 }
291 }
292
293 pub fn handle_connection<S>(&self, stream: &mut S) where S: NetworkStream + Clone {
294 debug!("Incoming stream");
295
296 self.handler.on_connection_start();
297
298 let addr = match stream.peer_addr() {
299 Ok(addr) => addr,
300 Err(e) => {
301 info!("Peer Name error: {:?}", e);
302 return;
303 }
304 };
305
306 let stream2: &mut dyn NetworkStream = &mut stream.clone();
307 let mut rdr = BufReader::new(stream2);
308 let mut wrt = BufWriter::new(stream);
309
310 while self.keep_alive_loop(&mut rdr, &mut wrt, addr) {
311 if let Err(e) = self.set_read_timeout(*rdr.get_ref(), self.timeouts.keep_alive) {
312 info!("set_read_timeout keep_alive {:?}", e);
313 break;
314 }
315 }
316
317 self.handler.on_connection_end();
318
319 debug!("keep_alive loop ending for {}", addr);
320
321 if let Err(e) = rdr.get_mut().close(Shutdown::Both) {
322 info!("failed to close stream: {}", e);
323 }
324 }
325
326 fn set_read_timeout(&self, s: &dyn NetworkStream, timeout: Option<Duration>) -> io::Result<()> {
327 s.set_read_timeout(timeout)
328 }
329
330 fn keep_alive_loop<W: Write>(&self, rdr: &mut BufReader<&mut dyn NetworkStream>,
331 wrt: &mut W, addr: SocketAddr) -> bool {
332 let req = match Request::new(rdr, addr) {
333 Ok(req) => req,
334 Err(Error::Io(ref e)) if e.kind() == ErrorKind::ConnectionAborted => {
335 trace!("tcp closed, cancelling keep-alive loop");
336 return false;
337 }
338 Err(Error::Io(e)) => {
339 debug!("ioerror in keepalive loop = {:?}", e);
340 return false;
341 }
342 Err(e) => {
343 info!("request error = {:?}", e);
345 return false;
346 }
347 };
348
349 if !self.handle_expect(&req, wrt) {
350 return false;
351 }
352
353 if let Err(e) = req.set_read_timeout(self.timeouts.read) {
354 info!("set_read_timeout {:?}", e);
355 return false;
356 }
357
358 let mut keep_alive = self.timeouts.keep_alive.is_some() &&
359 http::should_keep_alive(req.version, &req.headers);
360 let version = req.version;
361 let mut res_headers = Headers::new();
362 if !keep_alive {
363 res_headers.set(Connection::close());
364 }
365 {
366 let mut res = Response::new(wrt, &mut res_headers);
367 res.version = version;
368 self.handler.handle(req, res);
369 }
370
371 if keep_alive {
374 keep_alive = http::should_keep_alive(version, &res_headers);
375 }
376
377 debug!("keep_alive = {:?} for {}", keep_alive, addr);
378 keep_alive
379 }
380
381 fn handle_expect<W: Write>(&self, req: &Request, wrt: &mut W) -> bool {
382 if req.version == Http11 && req.headers.get() == Some(&Expect::Continue) {
383 let status = self.handler.check_continue((&req.method, &req.uri, &req.headers));
384 match write!(wrt, "{} {}\r\n\r\n", Http11, status).and_then(|_| wrt.flush()) {
385 Ok(..) => (),
386 Err(e) => {
387 info!("error writing 100-continue: {:?}", e);
388 return false;
389 }
390 }
391
392 if status != StatusCode::Continue {
393 debug!("non-100 status ({}) for Expect 100 request", status);
394 return false;
395 }
396 }
397
398 true
399 }
400}
401
402pub struct Listening {
404 _guard: Option<runtime::JoinHandle<()>>,
405 pub socket: SocketAddr,
407}
408
409impl fmt::Debug for Listening {
410 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
411 write!(f, "Listening {{ socket: {:?} }}", self.socket)
412 }
413}
414
415impl Drop for Listening {
416 fn drop(&mut self) {
417 let _ = self._guard.take().map(|g| g.join());
418 }
419}
420
421impl Listening {
422 pub fn close(&mut self) -> crate::Result<()> {
427 let _ = self._guard.take();
428 debug!("closing server");
429 Ok(())
430 }
431}
432
433pub trait Handler: Sync + Send {
435 fn handle<'a, 'k>(&'a self, _: Request<'a, 'k>, _: Response<'a, Fresh>);
439
440 fn check_continue(&self, _: (&Method, &RequestUri, &Headers)) -> StatusCode {
445 StatusCode::Continue
446 }
447
448 fn on_connection_start(&self) { }
452
453 fn on_connection_end(&self) { }
457}
458
459impl<F> Handler for F where F: Fn(Request, Response<Fresh>), F: Sync + Send {
460 fn handle<'a, 'k>(&'a self, req: Request<'a, 'k>, res: Response<'a, Fresh>) {
461 self(req, res)
462 }
463}
464
#[cfg(test)]
mod tests {
    use crate::header::Headers;
    use crate::method::Method;
    use crate::mock::MockStream;
    use crate::status::StatusCode;
    use crate::uri::RequestUri;

    use super::{Request, Response, Fresh, Handler, Worker};

    /// With the default `check_continue`, the worker writes a 100 Continue
    /// status line first and the handler's 200 response right after it.
    #[test]
    fn test_check_continue_default() {
        let mut stream = MockStream::with_input(b"\
            POST /upload HTTP/1.1\r\n\
            Host: example.domain\r\n\
            Expect: 100-continue\r\n\
            Content-Length: 10\r\n\
            \r\n\
            1234567890\
        ");

        fn respond_ok(_: Request, res: Response<Fresh>) {
            res.start().unwrap().end().unwrap();
        }

        let worker = Worker::new(respond_ok, Default::default());
        worker.handle_connection(&mut stream);

        let interim = b"HTTP/1.1 100 Continue\r\n\r\n";
        assert_eq!(&stream.write[..interim.len()], interim);

        let status_line = b"HTTP/1.1 200 OK\r\n";
        let end = interim.len() + status_line.len();
        assert_eq!(&stream.write[interim.len()..end], status_line);
    }

    /// When `check_continue` rejects the expectation, only the rejection
    /// status line is written and the handler's response never appears.
    #[test]
    fn test_check_continue_reject() {
        struct Reject;
        impl Handler for Reject {
            fn handle<'a, 'k>(&'a self, _: Request<'a, 'k>, res: Response<'a, Fresh>) {
                res.start().unwrap().end().unwrap();
            }

            fn check_continue(&self, _: (&Method, &RequestUri, &Headers)) -> StatusCode {
                StatusCode::ExpectationFailed
            }
        }

        let mut stream = MockStream::with_input(b"\
            POST /upload HTTP/1.1\r\n\
            Host: example.domain\r\n\
            Expect: 100-continue\r\n\
            Content-Length: 10\r\n\
            \r\n\
            1234567890\
        ");

        let worker = Worker::new(Reject, Default::default());
        worker.handle_connection(&mut stream);

        assert_eq!(stream.write, &b"HTTP/1.1 417 Expectation Failed\r\n\r\n"[..]);
    }
}