1use std::fmt;
111use std::io::{self, ErrorKind, BufWriter, Write};
112use std::net::{SocketAddr, ToSocketAddrs, Shutdown};
113use std::sync::Arc;
114use std::time::Duration;
115use cogo::coroutine::yield_now;
116pub use self::request::Request;
117pub use self::response::Response;
118pub use crate::net::{Fresh, Streaming};
119use crate::{Error, runtime};
120use crate::buffer::BufReader;
121use crate::header::{Headers, Expect, Connection};
122use crate::http;
123use crate::method::Method;
124use crate::net::{NetworkListener, NetworkStream, HttpListener, HttpsListener, SslServer};
125use crate::status::StatusCode;
126use crate::uri::RequestUri;
127use crate::version::HttpVersion::Http11;
128
129use self::listener::ListenerPool;
130
131pub mod request;
132pub mod response;
133
134mod listener;
135
136#[derive(Debug)]
141pub struct Server<L = HttpListener> {
142 pub listener: L,
143 pub timeouts: Timeouts,
144}
145
146#[derive(Clone, Copy, Debug)]
147pub struct Timeouts {
148 read: Option<Duration>,
149 keep_alive: Option<Duration>,
150 keep_alive_type: KeepAliveType,
151}
152
/// Policy used by the per-connection coroutine to decide when to stop
/// re-polling a connection whose last pass did not ask for keep-alive.
#[derive(Clone, Copy, Debug)]
pub enum KeepAliveType {
    /// Stop once this much time has passed since the connection was last
    /// seen active.
    WaitTime(Duration),
    /// Stop after this many passes that did not end in keep-alive.
    WaitError(i32),
}
159
160impl Default for Timeouts {
161 fn default() -> Timeouts {
162 Timeouts {
163 read: None,
164 keep_alive: Some(Duration::from_secs(5)),
165 keep_alive_type: KeepAliveType::WaitTime(Duration::from_secs(5)),
166 }
167 }
168}
169
170impl<L: NetworkListener> Server<L> {
171 #[inline]
173 pub fn new(listener: L) -> Server<L> {
174 Server {
175 listener: listener,
176 timeouts: Timeouts::default(),
177 }
178 }
179
180 #[inline]
189 pub fn keep_alive(mut self, timeout: Option<Duration>) -> Self {
190 self.timeouts.keep_alive = timeout;
191 self.timeouts.keep_alive_type = KeepAliveType::WaitTime(timeout.unwrap_or(Duration::from_secs(5)));
192 self
193 }
194
195 pub fn set_read_timeout(mut self, dur: Option<Duration>) -> Self {
197 self.listener.set_read_timeout(dur);
198 self.timeouts.read = dur;
199 self
200 }
201
202 pub fn set_write_timeout(mut self, dur: Option<Duration>) -> Self {
204 self.listener.set_write_timeout(dur);
205 self
206 }
207
208 pub fn set_keep_alive_type(mut self, t: KeepAliveType) -> Self {
210 self.timeouts.keep_alive_type = t;
211 self
212 }
213
214 pub fn local_addr(&mut self) -> io::Result<SocketAddr> {
216 self.listener.local_addr()
217 }
218}
219
220impl Server<HttpListener> {
221 pub fn http<To: ToSocketAddrs>(addr: To) -> crate::Result<Server<HttpListener>> {
223 HttpListener::new(addr).map(Server::new)
224 }
225}
226
227impl<S: SslServer + Clone + Send> Server<HttpsListener<S>> {
228 pub fn https<A: ToSocketAddrs>(addr: A, ssl: S) -> crate::Result<Server<HttpsListener<S>>> {
232 HttpsListener::new(addr, ssl).map(Server::new)
233 }
234}
235
/// "Try or continue": evaluates a `Result` expression inside a loop.
///
/// Yields the `Ok` value; on `Err` it logs the expression text together
/// with the error and `continue`s the enclosing loop.
macro_rules! t_c {
    ($call:expr) => {
        match $call {
            Ok(value) => value,
            Err(cause) => {
                error!("call = {:?}\nerr = {:?}", stringify!($call), cause);
                continue;
            }
        }
    };
}
247
248impl<L: NetworkListener + Send + 'static> Server<L> {
249 pub fn handle<H: Handler + 'static>(self, handler: H) -> crate::Result<Listening> {
251 Self::handle_stack(self, handler, 0x2000)
252 }
253
254 pub fn handle_stack<H: Handler + 'static>(self, handler: H, stack_size: usize) -> crate::Result<Listening> {
256 let worker = Arc::new(Worker::new(handler, self.timeouts));
257 let mut listener = self.listener.clone();
258 let h = runtime::spawn_stack_size(move || {
259 for stream in listener.incoming() {
260 let mut stream = t_c!(stream);
261 let w = worker.clone();
262 runtime::spawn_stack_size(move || {
263 {
264 #[cfg(unix)]
265 stream.set_nonblocking(true);
266 {
267 match w.timeouts.keep_alive_type {
268 KeepAliveType::WaitTime(timeout) => {
269 let mut now = std::time::Instant::now();
270 loop {
271 stream.reset_io();
272 let keep_alive = w.handle_connection(&mut stream);
273 stream.wait_io();
274 if keep_alive == false {
275 if now.elapsed() >= timeout {
276 return;
277 } else {
278 yield_now();
279 continue;
280 }
281 } else {
282 if now.elapsed() <= timeout {
283 now = std::time::Instant::now();
284 }
285 }
286 }
287 }
288 KeepAliveType::WaitError(total) => {
289 let mut count = 0;
290 loop {
291 stream.reset_io();
292 let keep_alive = w.handle_connection(&mut stream);
293 stream.wait_io();
294 if keep_alive == false {
295 count += 1;
296 if count >= total {
297 return;
298 }
299 yield_now();
300 }
301 }
302 }
303 }
304 }
305 }
306 }, stack_size);
307 }
308 }, stack_size);
309 let socket = r#try!(self.listener.clone().local_addr());
310 return Ok(Listening {
311 _guard: Some(h),
312 socket: socket,
313 });
314 }
315
316 pub fn handle_tasks<H: Handler + 'static>(self, handler: H, tasks: usize) -> crate::Result<Listening> {
319 handle_task(self, handler, tasks)
320 }
321}
322
323fn handle_task<H, L>(mut server: Server<L>, handler: H, tasks: usize) -> crate::Result<Listening>
324 where H: Handler + 'static, L: NetworkListener + Send + 'static {
325 let socket = r#try!(server.listener.local_addr());
326
327 debug!("tasks = {:?}", tasks);
328 let pool = ListenerPool::new(server.listener);
329 let worker = Worker::new(handler, server.timeouts);
330 let work = move |mut stream| {
331 worker.handle_connection(&mut stream);
332 };
333
334 let guard = runtime::spawn(move || {
335 pool.accept(work, tasks);
336 });
337
338 Ok(Listening {
339 _guard: Some(guard),
340 socket: socket,
341 })
342}
343
344pub struct Worker<H: Handler + 'static> {
345 handler: H,
346 timeouts: Timeouts,
347}
348
349impl<H: Handler + 'static> Worker<H> {
350 pub fn new(handler: H, timeouts: Timeouts) -> Worker<H> {
351 Worker {
352 handler: handler,
353 timeouts: timeouts,
354 }
355 }
356
357 pub fn handle_connection<S>(&self, stream: &mut S) -> bool where S: NetworkStream {
358 debug!("Incoming stream");
359 self.handler.on_connection_start();
360
361 let addr = match stream.peer_addr() {
362 Ok(addr) => addr,
363 Err(e) => {
364 info!("Peer Name error: {:?}", e);
365 return false;
366 }
367 };
368 let mut s: S = unsafe { std::mem::transmute_copy(stream) };
370 let stream2: &mut dyn NetworkStream = &mut s;
371 let mut rdr = BufReader::new(stream2);
372 let mut wrt = BufWriter::new(stream);
373
374 let mut keep_alive = false;
375 while self.keep_alive_loop(&mut rdr, &mut wrt, addr) {
376 if let Err(e) = self.set_read_timeout(*rdr.get_ref(), self.timeouts.keep_alive) {
377 info!("set_read_timeout keep_alive {:?}", e);
378 break;
379 }
380 keep_alive = true;
381 }
382 self.handler.on_connection_end();
383 debug!("keep_alive loop ending for {}", addr);
384
385 std::mem::forget(s);
386 keep_alive
387 }
388
389 fn set_read_timeout(&self, s: &dyn NetworkStream, timeout: Option<Duration>) -> io::Result<()> {
390 s.set_read_timeout(timeout)
391 }
392
393 fn keep_alive_loop<W: Write>(&self, rdr: &mut BufReader<&mut dyn NetworkStream>,
394 wrt: &mut W, addr: SocketAddr) -> bool {
395 let req = match Request::new(rdr, addr) {
396 Ok(req) => req,
397 Err(Error::Io(ref e)) if e.kind() == ErrorKind::ConnectionAborted => {
398 trace!("tcp closed, cancelling keep-alive loop");
399 return false;
400 }
401 Err(Error::Io(e)) => {
402 debug!("ioerror in keepalive loop = {:?}", e);
403 return false;
404 }
405 Err(e) => {
406 info!("request error = {:?}", e);
408 return false;
409 }
410 };
411
412 if !self.handle_expect(&req, wrt) {
413 return false;
414 }
415
416 if let Err(e) = req.set_read_timeout(self.timeouts.read) {
417 info!("set_read_timeout {:?}", e);
418 return false;
419 }
420
421 let mut keep_alive = self.timeouts.keep_alive.is_some() &&
422 http::should_keep_alive(req.version, &req.headers);
423 let version = req.version;
424 let mut res_headers = Headers::with_capacity(1);
425 if !keep_alive {
426 res_headers.set(Connection::close());
427 }
428 {
429 let mut res = Response::new(wrt, &mut res_headers);
430 res.version = version;
431 self.handler.handle(req, res);
432 }
433
434 if keep_alive {
437 keep_alive = http::should_keep_alive(version, &res_headers);
438 }
439
440 debug!("keep_alive = {:?} for {}", keep_alive, addr);
441 keep_alive
442 }
443
444 fn handle_expect<W: Write>(&self, req: &Request, wrt: &mut W) -> bool {
445 if req.version == Http11 && req.headers.get() == Some(&Expect::Continue) {
446 let status = self.handler.check_continue((&req.method, &req.uri, &req.headers));
447 match write!(wrt, "{} {}\r\n\r\n", Http11, status).and_then(|_| wrt.flush()) {
448 Ok(..) => (),
449 Err(e) => {
450 info!("error writing 100-continue: {:?}", e);
451 return false;
452 }
453 }
454
455 if status != StatusCode::Continue {
456 debug!("non-100 status ({}) for Expect 100 request", status);
457 return false;
458 }
459 }
460
461 true
462 }
463}
464
465pub struct Listening {
467 _guard: Option<runtime::JoinHandle<()>>,
468 pub socket: SocketAddr,
470}
471
472impl fmt::Debug for Listening {
473 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
474 write!(f, "Listening {{ socket: {:?} }}", self.socket)
475 }
476}
477
478impl Drop for Listening {
479 fn drop(&mut self) {
480 let _ = self._guard.take().map(|g| g.join());
481 }
482}
483
484impl Listening {
485 pub fn close(&mut self) -> crate::Result<()> {
490 let _ = self._guard.take();
491 debug!("closing server");
492 Ok(())
493 }
494}
495
496pub trait Handler: Sync + Send {
498 fn handle(&self, req: Request, resp: Response<'_, Fresh>);
502
503 fn check_continue(&self, _: (&Method, &RequestUri, &Headers)) -> StatusCode {
508 StatusCode::Continue
509 }
510
511 fn on_connection_start(&self) {}
515
516 fn on_connection_end(&self) {}
520}
521
522impl<F> Handler for F where F: Fn(Request, Response<Fresh>), F: Sync + Send {
523 fn handle(&self, req: Request, res: Response) {
524 self(req, res)
525 }
526}
527
#[cfg(test)]
mod tests {
    use crate::header::Headers;
    use crate::method::Method;
    use crate::mock::MockStream;
    use crate::status::StatusCode;
    use crate::uri::RequestUri;

    use super::{Request, Response, Fresh, Handler, Worker};

    /// Mock connection carrying a single upload that asks for `100-continue`.
    fn expect_continue_upload() -> MockStream {
        MockStream::with_input(b"\
            POST /upload HTTP/1.1\r\n\
            Host: example.domain\r\n\
            Expect: 100-continue\r\n\
            Content-Length: 10\r\n\
            \r\n\
            1234567890\
        ")
    }

    /// The default `check_continue` answers `100 Continue`, after which the
    /// handler's own response follows.
    #[test]
    fn test_check_continue_default() {
        fn handle(_: Request, res: Response<Fresh>) {
            res.start().unwrap().end().unwrap();
        }

        let mut mock = expect_continue_upload();
        Worker::new(handle, Default::default()).handle_connection(&mut mock);

        let interim = b"HTTP/1.1 100 Continue\r\n\r\n";
        let final_status = b"HTTP/1.1 200 OK\r\n";
        let (head, rest) = mock.write.split_at(interim.len());
        assert_eq!(head, interim);
        assert_eq!(&rest[..final_status.len()], final_status);
    }

    /// A handler that rejects the expectation gets only its status line
    /// written; the request itself is never handled.
    #[test]
    fn test_check_continue_reject() {
        struct Reject;

        impl Handler for Reject {
            fn handle(&self, _: Request, res: Response<'_, Fresh>) {
                res.start().unwrap().end().unwrap();
            }

            fn check_continue(&self, _: (&Method, &RequestUri, &Headers)) -> StatusCode {
                StatusCode::ExpectationFailed
            }
        }

        let mut mock = expect_continue_upload();
        Worker::new(Reject, Default::default()).handle_connection(&mut mock);

        assert_eq!(mock.write, &b"HTTP/1.1 417 Expectation Failed\r\n\r\n"[..]);
    }
}