1use std::borrow::Cow;
3use std::cmp::min;
4use std::fmt;
5use std::io::{self, Write, BufWriter, BufRead, Read};
6use std::net::Shutdown;
7use std::time::Duration;
8
9use httparse;
10use url::Position as UrlPosition;
11
12use buffer::BufReader;
13use Error;
14use header::{Headers, ContentLength, TransferEncoding};
15use header::Encoding::Chunked;
16use method::{Method};
17use net::{NetworkConnector, NetworkStream};
18use status::StatusCode;
19use version::HttpVersion;
20use version::HttpVersion::{Http10, Http11};
21use uri::RequestUri;
22
23use self::HttpReader::{SizedReader, ChunkedReader, EofReader, EmptyReader};
24use self::HttpWriter::{ChunkedWriter, SizedWriter, EmptyWriter, ThroughWriter};
25
26use http::{
27 RawStatus,
28 Protocol,
29 HttpMessage,
30 RequestHead,
31 ResponseHead,
32};
33use header;
34use version;
35
/// Upper bound on how many bytes `get_incoming` will discard (one byte per
/// retry) when response parsing fails with a `Version` error on a connection
/// whose previous response was expected to carry no body.
const MAX_INVALID_RESPONSE_BYTES: usize = 1024 * 128;
37
/// Holds a value that can be temporarily moved out and replaced in place.
///
/// The slot is only `None` while `map_in_place` is running its closure; every
/// accessor panics if it observes the slot empty.
#[derive(Debug)]
struct Wrapper<T> {
    obj: Option<T>,
}

impl<T> Wrapper<T> {
    /// Creates a wrapper owning `obj`.
    pub fn new(obj: T) -> Wrapper<T> {
        Wrapper { obj: Some(obj) }
    }

    /// Replaces the held value with `f(value)`, handing `f` ownership of the
    /// current value.
    ///
    /// Panics if the slot is empty.
    pub fn map_in_place<F>(&mut self, f: F)
        where F: FnOnce(T) -> T
    {
        let current = self.obj.take().unwrap();
        self.obj = Some(f(current));
    }

    /// Consumes the wrapper and returns the held value.
    pub fn into_inner(self) -> T {
        let Wrapper { obj } = self;
        obj.unwrap()
    }

    /// Mutably borrows the held value.
    pub fn as_mut(&mut self) -> &mut T {
        let slot = self.obj.as_mut();
        slot.unwrap()
    }

    /// Immutably borrows the held value.
    pub fn as_ref(&self) -> &T {
        let slot = self.obj.as_ref();
        slot.unwrap()
    }
}
58
59#[derive(Debug)]
60enum Stream {
61 Idle(Box<NetworkStream + Send>),
62 Writing(HttpWriter<BufWriter<Box<NetworkStream + Send>>>),
63 Reading(HttpReader<BufReader<Box<NetworkStream + Send>>>),
64}
65
66impl Stream {
67 fn writer_mut(&mut self) -> Option<&mut HttpWriter<BufWriter<Box<NetworkStream + Send>>>> {
68 match *self {
69 Stream::Writing(ref mut writer) => Some(writer),
70 _ => None,
71 }
72 }
73 fn reader_mut(&mut self) -> Option<&mut HttpReader<BufReader<Box<NetworkStream + Send>>>> {
74 match *self {
75 Stream::Reading(ref mut reader) => Some(reader),
76 _ => None,
77 }
78 }
79 fn reader_ref(&self) -> Option<&HttpReader<BufReader<Box<NetworkStream + Send>>>> {
80 match *self {
81 Stream::Reading(ref reader) => Some(reader),
82 _ => None,
83 }
84 }
85
86 fn new(stream: Box<NetworkStream + Send>) -> Stream {
87 Stream::Idle(stream)
88 }
89}
90
/// An HTTP/1.1 message bound to a single network stream; implements
/// `HttpMessage`, `Read` and `Write`.
#[derive(Debug)]
pub struct Http11Message {
    // When true, `set_outgoing` writes the whole URL in the request line
    // instead of only the path and query.
    is_proxied: bool,
    // Method of the request most recently started by `set_outgoing`; taken by
    // `get_incoming` to decide whether the response can have a body.
    method: Option<Method>,
    // Current connection state (idle / writing / reading).
    stream: Wrapper<Stream>,
}
98
99impl Write for Http11Message {
100 #[inline]
101 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
102 match self.stream.as_mut().writer_mut() {
103 None => Err(io::Error::new(io::ErrorKind::Other,
104 "Not in a writable state")),
105 Some(ref mut writer) => writer.write(buf),
106 }
107 }
108 #[inline]
109 fn flush(&mut self) -> io::Result<()> {
110 match self.stream.as_mut().writer_mut() {
111 None => Err(io::Error::new(io::ErrorKind::Other,
112 "Not in a writable state")),
113 Some(ref mut writer) => writer.flush(),
114 }
115 }
116}
117
118impl Read for Http11Message {
119 #[inline]
120 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
121 match self.stream.as_mut().reader_mut() {
122 None => Err(io::Error::new(io::ErrorKind::Other,
123 "Not in a readable state")),
124 Some(ref mut reader) => reader.read(buf),
125 }
126 }
127}
128
129impl HttpMessage for Http11Message {
130 fn set_outgoing(&mut self, mut head: RequestHead) -> ::Result<RequestHead> {
131 let mut res = Err(Error::from(io::Error::new(
132 io::ErrorKind::Other,
133 "")));
134 let mut method = None;
135 let is_proxied = self.is_proxied;
136 self.stream.map_in_place(|stream: Stream| -> Stream {
137 let stream = match stream {
138 Stream::Idle(stream) => stream,
139 _ => {
140 res = Err(Error::from(io::Error::new(
141 io::ErrorKind::Other,
142 "Message not idle, cannot start new outgoing")));
143 return stream;
144 },
145 };
146 let mut stream = BufWriter::new(stream);
147
148 {
149 let uri = if is_proxied {
150 head.url.as_ref()
151 } else {
152 &head.url[UrlPosition::BeforePath..UrlPosition::AfterQuery]
153 };
154
155 let version = version::HttpVersion::Http11;
156 debug!("request line: {:?} {:?} {:?}", head.method, uri, version);
157 match write!(&mut stream, "{} {} {}{}",
158 head.method, uri, version, LINE_ENDING) {
159 Err(e) => {
160 res = Err(From::from(e));
161 return Stream::Idle(stream.into_inner().ok().unwrap());
164 },
165 Ok(_) => {},
166 };
167 }
168
169 let stream = {
170 let write_headers = |mut stream: BufWriter<Box<NetworkStream + Send>>, head: &RequestHead| {
171 debug!("headers={:?}", head.headers);
172 match write!(&mut stream, "{}{}", head.headers, LINE_ENDING) {
173 Ok(_) => Ok(stream),
174 Err(e) => {
175 Err((e, stream.into_inner().unwrap()))
176 }
177 }
178 };
179 match head.method {
180 Method::Get | Method::Head => {
181 let writer = match write_headers(stream, &head) {
182 Ok(w) => w,
183 Err(e) => {
184 res = Err(From::from(e.0));
185 return Stream::Idle(e.1);
186 }
187 };
188 EmptyWriter(writer)
189 },
190 _ => {
191 let mut chunked = true;
192 let mut len = 0;
193
194 match head.headers.get::<header::ContentLength>() {
195 Some(cl) => {
196 chunked = false;
197 len = **cl;
198 },
199 None => ()
200 };
201
202 if chunked {
204 let encodings = match head.headers.get_mut::<header::TransferEncoding>() {
205 Some(encodings) => {
206 encodings.push(header::Encoding::Chunked);
208 false
209 },
210 None => true
211 };
212
213 if encodings {
214 head.headers.set(
215 header::TransferEncoding(vec![header::Encoding::Chunked]))
216 }
217 }
218
219 let stream = match write_headers(stream, &head) {
220 Ok(s) => s,
221 Err(e) => {
222 res = Err(From::from(e.0));
223 return Stream::Idle(e.1);
224 },
225 };
226
227 if chunked {
228 ChunkedWriter(stream)
229 } else {
230 SizedWriter(stream, len)
231 }
232 }
233 }
234 };
235
236 method = Some(head.method.clone());
237 res = Ok(head);
238 Stream::Writing(stream)
239 });
240
241 self.method = method;
242 res
243 }
244
245 fn get_incoming(&mut self) -> ::Result<ResponseHead> {
246 try!(self.flush_outgoing());
247 let method = self.method.take().unwrap_or(Method::Get);
248 let mut res = Err(From::from(
249 io::Error::new(io::ErrorKind::Other,
250 "Read already in progress")));
251 self.stream.map_in_place(|stream| {
252 let stream = match stream {
253 Stream::Idle(stream) => stream,
254 _ => {
255 res = Err(From::from(
258 io::Error::new(io::ErrorKind::Other,
259 "Read already in progress")));
260 return stream;
261 }
262 };
263
264 let expected_no_content = stream.previous_response_expected_no_content();
265 trace!("previous_response_expected_no_content = {}", expected_no_content);
266
267 let mut stream = BufReader::new(stream);
268
269 let mut invalid_bytes_read = 0;
270 let head;
271 loop {
272 head = match parse_response(&mut stream) {
273 Ok(head) => head,
274 Err(::Error::Version)
275 if expected_no_content && invalid_bytes_read < MAX_INVALID_RESPONSE_BYTES => {
276 trace!("expected_no_content, found content");
277 invalid_bytes_read += 1;
278 stream.consume(1);
279 continue;
280 }
281 Err(e) => {
282 res = Err(e);
283 return Stream::Idle(stream.into_inner());
284 }
285 };
286 break;
287 }
288
289 let raw_status = head.subject;
290 let headers = head.headers;
291
292 let is_empty = !should_have_response_body(&method, raw_status.0);
293 stream.get_mut().set_previous_response_expected_no_content(is_empty);
294 let reader = if is_empty {
303 EmptyReader(stream)
304 } else {
305 if let Some(&TransferEncoding(ref codings)) = headers.get() {
306 if codings.last() == Some(&Chunked) {
307 ChunkedReader(stream, None)
308 } else {
309 trace!("not chuncked. read till eof");
310 EofReader(stream)
311 }
312 } else if let Some(&ContentLength(len)) = headers.get() {
313 SizedReader(stream, len)
314 } else if headers.has::<ContentLength>() {
315 trace!("illegal Content-Length: {:?}", headers.get_raw("Content-Length"));
316 res = Err(Error::Header);
317 return Stream::Idle(stream.into_inner());
318 } else {
319 trace!("neither Transfer-Encoding nor Content-Length");
320 EofReader(stream)
321 }
322 };
323
324 trace!("Http11Message.reader = {:?}", reader);
325
326
327 res = Ok(ResponseHead {
328 headers: headers,
329 raw_status: raw_status,
330 version: head.version,
331 });
332
333 Stream::Reading(reader)
334 });
335 res
336 }
337
338 fn has_body(&self) -> bool {
339 match self.stream.as_ref().reader_ref() {
340 Some(&EmptyReader(..)) |
341 Some(&SizedReader(_, 0)) |
342 Some(&ChunkedReader(_, Some(0))) => false,
343 _ => true
345 }
346 }
347
348 #[inline]
349 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
350 self.get_ref().set_read_timeout(dur)
351 }
352
353 #[inline]
354 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
355 self.get_ref().set_write_timeout(dur)
356 }
357
358 #[inline]
359 fn close_connection(&mut self) -> ::Result<()> {
360 try!(self.get_mut().close(Shutdown::Both));
361 Ok(())
362 }
363
364 #[inline]
365 fn set_proxied(&mut self, val: bool) {
366 self.is_proxied = val;
367 }
368}
369
370impl Http11Message {
371 pub fn into_inner(self) -> Box<NetworkStream + Send> {
373 match self.stream.into_inner() {
374 Stream::Idle(stream) => stream,
375 Stream::Writing(stream) => stream.into_inner().into_inner().unwrap(),
376 Stream::Reading(stream) => stream.into_inner().into_inner(),
377 }
378 }
379
380 pub fn get_ref(&self) -> &(NetworkStream + Send) {
383 match *self.stream.as_ref() {
384 Stream::Idle(ref stream) => &**stream,
385 Stream::Writing(ref stream) => &**stream.get_ref().get_ref(),
386 Stream::Reading(ref stream) => &**stream.get_ref().get_ref()
387 }
388 }
389
390 pub fn get_mut(&mut self) -> &mut (NetworkStream + Send) {
393 match *self.stream.as_mut() {
394 Stream::Idle(ref mut stream) => &mut **stream,
395 Stream::Writing(ref mut stream) => &mut **stream.get_mut().get_mut(),
396 Stream::Reading(ref mut stream) => &mut **stream.get_mut().get_mut()
397 }
398 }
399
400 pub fn with_stream(stream: Box<NetworkStream + Send>) -> Http11Message {
403 Http11Message {
404 is_proxied: false,
405 method: None,
406 stream: Wrapper::new(Stream::new(stream)),
407 }
408 }
409
410 pub fn flush_outgoing(&mut self) -> ::Result<()> {
414 let mut res = Ok(());
415 self.stream.map_in_place(|stream| {
416 let writer = match stream {
417 Stream::Writing(writer) => writer,
418 _ => {
419 res = Ok(());
420 return stream;
421 },
422 };
423 let raw = match writer.end() {
425 Ok(buf) => buf.into_inner().unwrap(),
426 Err(e) => {
427 res = Err(From::from(e.0));
428 return Stream::Writing(e.1);
429 }
430 };
431 Stream::Idle(raw)
432 });
433 res
434 }
435}
436
/// A `Protocol` implementation that creates `Http11Message`s over streams
/// obtained from its connector.
pub struct Http11Protocol {
    // Type-erased connector used by `new_message` to open streams.
    connector: Connector,
}
441
442impl Protocol for Http11Protocol {
443 fn new_message(&self, host: &str, port: u16, scheme: &str) -> ::Result<Box<HttpMessage>> {
444 let stream = try!(self.connector.connect(host, port, scheme)).into();
445
446 Ok(Box::new(Http11Message::with_stream(stream)))
447 }
448}
449
450impl Http11Protocol {
451 pub fn with_connector<C, S>(c: C) -> Http11Protocol
454 where C: NetworkConnector<Stream=S> + Send + Sync + 'static,
455 S: NetworkStream + Send {
456 Http11Protocol {
457 connector: Connector(Box::new(ConnAdapter(c))),
458 }
459 }
460}
461
462struct ConnAdapter<C: NetworkConnector + Send + Sync>(C);
463
464impl<C: NetworkConnector<Stream=S> + Send + Sync, S: NetworkStream + Send>
465 NetworkConnector for ConnAdapter<C> {
466 type Stream = Box<NetworkStream + Send>;
467 #[inline]
468 fn connect(&self, host: &str, port: u16, scheme: &str)
469 -> ::Result<Box<NetworkStream + Send>> {
470 Ok(try!(self.0.connect(host, port, scheme)).into())
471 }
472}
473
474struct Connector(Box<NetworkConnector<Stream=Box<NetworkStream + Send>> + Send + Sync>);
475
476impl NetworkConnector for Connector {
477 type Stream = Box<NetworkStream + Send>;
478 #[inline]
479 fn connect(&self, host: &str, port: u16, scheme: &str)
480 -> ::Result<Box<NetworkStream + Send>> {
481 Ok(try!(self.0.connect(host, port, scheme)).into())
482 }
483}
484
485
/// Readers that decode an HTTP message body according to its framing.
pub enum HttpReader<R> {
    /// A body of known length; the `u64` is the number of bytes still to read.
    SizedReader(R, u64),
    /// A chunked body. `Some(n)` is the number of bytes left in the current
    /// chunk (`Some(0)` once the body has ended); `None` means the next chunk
    /// size line has not been read yet.
    ChunkedReader(R, Option<u64>),
    /// A body that runs until the underlying reader reports end of stream.
    EofReader(R),
    /// No body at all: reads always yield zero bytes.
    EmptyReader(R),
}
515
516impl<R: Read> HttpReader<R> {
517
518 pub fn into_inner(self) -> R {
520 match self {
521 SizedReader(r, _) => r,
522 ChunkedReader(r, _) => r,
523 EofReader(r) => r,
524 EmptyReader(r) => r,
525 }
526 }
527
528 pub fn get_ref(&self) -> &R {
530 match *self {
531 SizedReader(ref r, _) => r,
532 ChunkedReader(ref r, _) => r,
533 EofReader(ref r) => r,
534 EmptyReader(ref r) => r,
535 }
536 }
537
538 pub fn get_mut(&mut self) -> &mut R {
540 match *self {
541 SizedReader(ref mut r, _) => r,
542 ChunkedReader(ref mut r, _) => r,
543 EofReader(ref mut r) => r,
544 EmptyReader(ref mut r) => r,
545 }
546 }
547}
548
549impl<R> fmt::Debug for HttpReader<R> {
550 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
551 match *self {
552 SizedReader(_,rem) => write!(fmt, "SizedReader(remaining={:?})", rem),
553 ChunkedReader(_, None) => write!(fmt, "ChunkedReader(chunk_remaining=unknown)"),
554 ChunkedReader(_, Some(rem)) => write!(fmt, "ChunkedReader(chunk_remaining={:?})", rem),
555 EofReader(_) => write!(fmt, "EofReader"),
556 EmptyReader(_) => write!(fmt, "EmptyReader"),
557 }
558 }
559}
560
561impl<R: Read> Read for HttpReader<R> {
562 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
563 if buf.is_empty() {
564 return Ok(0);
565 }
566 match *self {
567 SizedReader(ref mut body, ref mut remaining) => {
568 trace!("Sized read, remaining={:?}", remaining);
569 if *remaining == 0 {
570 Ok(0)
571 } else {
572 let to_read = min(*remaining as usize, buf.len());
573 let num = try!(body.read(&mut buf[..to_read])) as u64;
574 trace!("Sized read: {}", num);
575 if num > *remaining {
576 *remaining = 0;
577 } else if num == 0 {
578 return Err(io::Error::new(io::ErrorKind::Other, "early eof"));
579 } else {
580 *remaining -= num;
581 }
582 Ok(num as usize)
583 }
584 },
585 ChunkedReader(ref mut body, ref mut opt_remaining) => {
586 let mut rem = match *opt_remaining {
587 Some(ref rem) => *rem,
588 None => try!(read_chunk_size(body))
590 };
591 trace!("Chunked read, remaining={:?}", rem);
592
593 if rem == 0 {
594 if opt_remaining.is_none() {
595 try!(eat(body, LINE_ENDING.as_bytes()));
596 }
597
598 *opt_remaining = Some(0);
599
600 trace!("end of chunked");
604
605 return Ok(0)
606 }
607
608 let to_read = min(rem as usize, buf.len());
609 let count = try!(body.read(&mut buf[..to_read])) as u64;
610
611 if count == 0 {
612 *opt_remaining = Some(0);
613 return Err(io::Error::new(io::ErrorKind::Other, "early eof"));
614 }
615
616 rem -= count;
617 *opt_remaining = if rem > 0 {
618 Some(rem)
619 } else {
620 try!(eat(body, LINE_ENDING.as_bytes()));
621 None
622 };
623 Ok(count as usize)
624 },
625 EofReader(ref mut body) => {
626 let r = body.read(buf);
627 trace!("eofread: {:?}", r);
628 r
629 },
630 EmptyReader(_) => Ok(0)
631 }
632 }
633}
634
635fn eat<R: Read>(rdr: &mut R, bytes: &[u8]) -> io::Result<()> {
636 let mut buf = [0];
637 for &b in bytes.iter() {
638 match try!(rdr.read(&mut buf)) {
639 1 if buf[0] == b => (),
640 _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
641 "Invalid characters found")),
642 }
643 }
644 Ok(())
645}
646
647fn read_chunk_size<R: Read>(rdr: &mut R) -> io::Result<u64> {
649 macro_rules! byte (
650 ($rdr:ident) => ({
651 let mut buf = [0];
652 match try!($rdr.read(&mut buf)) {
653 1 => buf[0],
654 _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
655 "Invalid chunk size line")),
656
657 }
658 })
659 );
660 let mut size = 0u64;
661 let radix = 16;
662 let mut in_ext = false;
663 let mut in_chunk_size = true;
664 loop {
665 match byte!(rdr) {
666 b@b'0'...b'9' if in_chunk_size => {
667 size *= radix;
668 size += (b - b'0') as u64;
669 },
670 b@b'a'...b'f' if in_chunk_size => {
671 size *= radix;
672 size += (b + 10 - b'a') as u64;
673 },
674 b@b'A'...b'F' if in_chunk_size => {
675 size *= radix;
676 size += (b + 10 - b'A') as u64;
677 },
678 CR => {
679 match byte!(rdr) {
680 LF => break,
681 _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
682 "Invalid chunk size line"))
683
684 }
685 },
686 b';' if !in_ext => {
688 in_ext = true;
689 in_chunk_size = false;
690 },
691 b'\t' | b' ' if !in_ext & !in_chunk_size => {},
694 b'\t' | b' ' if in_chunk_size => in_chunk_size = false,
696 ext if in_ext => {
702 todo!("chunk extension byte={}", ext);
703 },
704 _ => {
707 return Err(io::Error::new(io::ErrorKind::InvalidInput,
708 "Invalid chunk size line"));
709 }
710 }
711 }
712 trace!("chunk size={:?}", size);
713 Ok(size)
714}
715
716fn should_have_response_body(method: &Method, status: u16) -> bool {
717 trace!("should_have_response_body({:?}, {})", method, status);
718 match (method, status) {
719 (&Method::Head, _) |
720 (_, 100...199) |
721 (_, 204) |
722 (_, 304) |
723 (&Method::Connect, 200...299) => false,
724 _ => true
725 }
726}
727
/// Writers that encode an HTTP message body according to its framing.
pub enum HttpWriter<W: Write> {
    /// Passes bytes straight through to the underlying writer.
    ThroughWriter(W),
    /// Wraps every write in a chunk (hex size line, data, line ending).
    ChunkedWriter(W),
    /// Writes at most the given number of bytes; the `u64` is the number of
    /// bytes that may still be written, and excess input is cut off.
    SizedWriter(W, u64),
    /// Accepts no body: writes report zero bytes written.
    EmptyWriter(W),
}
741
742impl<W: Write> HttpWriter<W> {
743 #[inline]
745 pub fn into_inner(self) -> W {
746 match self {
747 ThroughWriter(w) => w,
748 ChunkedWriter(w) => w,
749 SizedWriter(w, _) => w,
750 EmptyWriter(w) => w,
751 }
752 }
753
754 #[inline]
756 pub fn get_ref(&self) -> &W {
757 match *self {
758 ThroughWriter(ref w) => w,
759 ChunkedWriter(ref w) => w,
760 SizedWriter(ref w, _) => w,
761 EmptyWriter(ref w) => w,
762 }
763 }
764
765 #[inline]
770 pub fn get_mut(&mut self) -> &mut W {
771 match *self {
772 ThroughWriter(ref mut w) => w,
773 ChunkedWriter(ref mut w) => w,
774 SizedWriter(ref mut w, _) => w,
775 EmptyWriter(ref mut w) => w,
776 }
777 }
778
779 #[inline]
784 pub fn end(mut self) -> Result<W, EndError<W>> {
785 fn inner<W: Write>(w: &mut W) -> io::Result<()> {
786 try!(w.write(&[]));
787 w.flush()
788 }
789
790 match inner(&mut self) {
791 Ok(..) => Ok(self.into_inner()),
792 Err(e) => Err(EndError(e, self))
793 }
794 }
795}
796
797#[derive(Debug)]
798pub struct EndError<W: Write>(io::Error, HttpWriter<W>);
799
800impl<W: Write> From<EndError<W>> for io::Error {
801 fn from(e: EndError<W>) -> io::Error {
802 e.0
803 }
804}
805
806impl<W: Write> Write for HttpWriter<W> {
807 #[inline]
808 fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
809 match *self {
810 ThroughWriter(ref mut w) => w.write(msg),
811 ChunkedWriter(ref mut w) => {
812 let chunk_size = msg.len();
813 trace!("chunked write, size = {:?}", chunk_size);
814 try!(write!(w, "{:X}{}", chunk_size, LINE_ENDING));
815 try!(w.write_all(msg));
816 try!(w.write_all(LINE_ENDING.as_bytes()));
817 Ok(msg.len())
818 },
819 SizedWriter(ref mut w, ref mut remaining) => {
820 let len = msg.len() as u64;
821 if len > *remaining {
822 let len = *remaining;
823 *remaining = 0;
824 try!(w.write_all(&msg[..len as usize]));
825 Ok(len as usize)
826 } else {
827 *remaining -= len;
828 try!(w.write_all(msg));
829 Ok(len as usize)
830 }
831 },
832 EmptyWriter(..) => {
833 if !msg.is_empty() {
834 error!("Cannot include a body with this kind of message");
835 }
836 Ok(0)
837 }
838 }
839 }
840
841 #[inline]
842 fn flush(&mut self) -> io::Result<()> {
843 match *self {
844 ThroughWriter(ref mut w) => w.flush(),
845 ChunkedWriter(ref mut w) => w.flush(),
846 SizedWriter(ref mut w, _) => w.flush(),
847 EmptyWriter(ref mut w) => w.flush(),
848 }
849 }
850}
851
852impl<W: Write> fmt::Debug for HttpWriter<W> {
853 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
854 match *self {
855 ThroughWriter(_) => write!(fmt, "ThroughWriter"),
856 ChunkedWriter(_) => write!(fmt, "ChunkedWriter"),
857 SizedWriter(_, rem) => write!(fmt, "SizedWriter(remaining={:?})", rem),
858 EmptyWriter(_) => write!(fmt, "EmptyWriter"),
859 }
860 }
861}
862
/// Number of header slots handed to `httparse` for each parse attempt.
const MAX_HEADERS: usize = 100;
864
/// Parses a request head from `buf`, yielding the method and request URI as
/// the subject. Delegates to the generic `parse` with `httparse::Request`.
#[inline]
pub fn parse_request<R: Read>(buf: &mut BufReader<R>) -> ::Result<Incoming<(Method, RequestUri)>> {
    parse::<R, httparse::Request, (Method, RequestUri)>(buf)
}
870
/// Parses a response head from `buf`, yielding the raw status as the
/// subject. Delegates to the generic `parse` with `httparse::Response`.
#[inline]
pub fn parse_response<R: Read>(buf: &mut BufReader<R>) -> ::Result<Incoming<RawStatus>> {
    parse::<R, httparse::Response, RawStatus>(buf)
}
876
877fn parse<R: Read, T: TryParse<Subject=I>, I>(rdr: &mut BufReader<R>) -> ::Result<Incoming<I>> {
878 loop {
879 match try!(try_parse::<R, T, I>(rdr)) {
880 httparse::Status::Complete((inc, len)) => {
881 rdr.consume(len);
882 return Ok(inc);
883 },
884 _partial => ()
885 }
886 let n = try!(rdr.read_into_buf());
887 if n == 0 {
888 let buffered = rdr.get_buf().len();
889 if buffered == ::buffer::MAX_BUFFER_SIZE {
890 return Err(Error::TooLarge);
891 } else {
892 return Err(Error::Io(io::Error::new(
893 io::ErrorKind::UnexpectedEof,
894 "end of stream before headers finished"
895 )));
896 }
897 }
898 }
899}
900
901fn try_parse<R: Read, T: TryParse<Subject=I>, I>(rdr: &mut BufReader<R>) -> TryParseResult<I> {
902 let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
903 let buf = rdr.get_buf();
904 if buf.len() == 0 {
905 return Ok(httparse::Status::Partial);
906 }
907 trace!("try_parse({:?})", buf);
908 <T as TryParse>::try_parse(&mut headers, buf)
909}
910
/// A message head type (request or response) that can be parsed from a byte
/// buffer using caller-provided header storage.
#[doc(hidden)]
trait TryParse {
    /// The message-specific part of the head (the `subject` of `Incoming`).
    type Subject;
    /// Attempts to parse a complete head from `buf`, using `headers` as
    /// scratch space for the parsed header fields.
    fn try_parse<'a>(headers: &'a mut [httparse::Header<'a>], buf: &'a [u8]) ->
        TryParseResult<Self::Subject>;
}
917
/// Result of one parse attempt: `Partial`, or `Complete` with the parsed
/// head and the number of bytes it occupied in the buffer.
type TryParseResult<T> = Result<httparse::Status<(Incoming<T>, usize)>, Error>;
919
920impl<'a> TryParse for httparse::Request<'a, 'a> {
921 type Subject = (Method, RequestUri);
922
923 fn try_parse<'b>(headers: &'b mut [httparse::Header<'b>], buf: &'b [u8]) ->
924 TryParseResult<(Method, RequestUri)> {
925 trace!("Request.try_parse([Header; {}], [u8; {}])", headers.len(), buf.len());
926 let mut req = httparse::Request::new(headers);
927 Ok(match try!(req.parse(buf)) {
928 httparse::Status::Complete(len) => {
929 trace!("Request.try_parse Complete({})", len);
930 httparse::Status::Complete((Incoming {
931 version: if req.version.unwrap() == 1 { Http11 } else { Http10 },
932 subject: (
933 try!(req.method.unwrap().parse()),
934 try!(req.path.unwrap().parse())
935 ),
936 headers: try!(Headers::from_raw(req.headers))
937 }, len))
938 },
939 httparse::Status::Partial => httparse::Status::Partial
940 })
941 }
942}
943
944impl<'a> TryParse for httparse::Response<'a, 'a> {
945 type Subject = RawStatus;
946
947 fn try_parse<'b>(headers: &'b mut [httparse::Header<'b>], buf: &'b [u8]) ->
948 TryParseResult<RawStatus> {
949 trace!("Response.try_parse([Header; {}], [u8; {}])", headers.len(), buf.len());
950 let mut res = httparse::Response::new(headers);
951 Ok(match try!(res.parse(buf)) {
952 httparse::Status::Complete(len) => {
953 trace!("Response.try_parse Complete({})", len);
954 let code = res.code.unwrap();
955 let reason = match StatusCode::from_u16(code).canonical_reason() {
956 Some(reason) if reason == res.reason.unwrap() => Cow::Borrowed(reason),
957 _ => Cow::Owned(res.reason.unwrap().to_owned())
958 };
959 httparse::Status::Complete((Incoming {
960 version: if res.version.unwrap() == 1 { Http11 } else { Http10 },
961 subject: RawStatus(code, reason),
962 headers: try!(Headers::from_raw(res.headers))
963 }, len))
964 },
965 httparse::Status::Partial => httparse::Status::Partial
966 })
967 }
968}
969
/// The parsed head of an incoming message.
#[derive(Debug)]
pub struct Incoming<S> {
    /// HTTP version of the message.
    pub version: HttpVersion,
    /// Message-specific part of the head: method and URI for a request, raw
    /// status for a response.
    pub subject: S,
    /// The parsed headers.
    pub headers: Headers
}
980
/// The carriage return byte (`\r`).
pub const CR: u8 = b'\r';
/// The line feed byte (`\n`).
pub const LF: u8 = b'\n';
/// The line ending written after the request line, the header block and
/// chunk framing: carriage return followed by line feed.
pub const LINE_ENDING: &'static str = "\r\n";
987
#[cfg(test)]
mod tests {
    use std::error::Error;
    use std::io::{self, Read, Write};


    use buffer::BufReader;
    use mock::MockStream;
    use http::HttpMessage;

    use super::{read_chunk_size, parse_request, parse_response, Http11Message};

    // Each write becomes its own chunk, and `end()` appends the zero-sized
    // terminating chunk.
    #[test]
    fn test_write_chunked() {
        use std::str::from_utf8;
        let mut w = super::HttpWriter::ChunkedWriter(Vec::new());
        w.write_all(b"foo bar").unwrap();
        w.write_all(b"baz quux herp").unwrap();
        let buf = w.end().unwrap();
        let s = from_utf8(buf.as_ref()).unwrap();
        assert_eq!(s, "7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n");
    }

    // A sized writer cuts off input beyond the announced length.
    #[test]
    fn test_write_sized() {
        use std::str::from_utf8;
        let mut w = super::HttpWriter::SizedWriter(Vec::new(), 8);
        w.write_all(b"foo bar").unwrap();
        assert_eq!(w.write(b"baz").unwrap(), 1);

        let buf = w.end().unwrap();
        let s = from_utf8(buf.as_ref()).unwrap();
        assert_eq!(s, "foo barb");
    }

    // Accepted and rejected chunk size lines.
    #[test]
    fn test_read_chunk_size() {
        fn read(s: &str, result: u64) {
            assert_eq!(read_chunk_size(&mut s.as_bytes()).unwrap(), result);
        }

        fn read_err(s: &str) {
            assert_eq!(read_chunk_size(&mut s.as_bytes()).unwrap_err().kind(),
                       io::ErrorKind::InvalidInput);
        }

        // Plain sizes, with leading zeros, mixed-case hex and trailing space.
        read("1\r\n", 1);
        read("01\r\n", 1);
        read("0\r\n", 0);
        read("00\r\n", 0);
        read("A\r\n", 10);
        read("a\r\n", 10);
        read("Ff\r\n", 255);
        read("Ff   \r\n", 255);
        // Missing LF after CR
        read_err("F\rF");
        // Missing line ending altogether
        read_err("F");
        // Invalid hex digits
        read_err("X\r\n");
        read_err("1X\r\n");
        read_err("-\r\n");
        read_err("-1\r\n");
        // Chunk extensions are accepted and ignored
        read("1;extension\r\n", 1);
        read("a;ext name=value\r\n", 10);
        read("1;extension;extension2\r\n", 1);
        read("1;;;  ;\r\n", 1);
        read("2; extension...\r\n", 2);
        read("3   ; extension=123\r\n", 3);
        read("3   ;\r\n", 3);
        read("3   ;   \r\n", 3);
        // Extension without the ";" separator
        read_err("1 invalid extension\r\n");
        // Digits after whitespace
        read_err("1 A\r\n");
        // Extension without a terminating CR LF
        read_err("1;no CRLF");
    }

    // A sized body shorter than announced ends in an "early eof" error.
    #[test]
    fn test_read_sized_early_eof() {
        let mut r = super::HttpReader::SizedReader(MockStream::with_input(b"foo bar"), 10);
        let mut buf = [0u8; 10];
        assert_eq!(r.read(&mut buf).unwrap(), 7);
        let e = r.read(&mut buf).unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::Other);
        assert_eq!(e.description(), "early eof");
    }

    // A chunk shorter than its size line ends in an "early eof" error.
    #[test]
    fn test_read_chunked_early_eof() {
        let mut r = super::HttpReader::ChunkedReader(MockStream::with_input(b"\
            9\r\n\
            foo bar\
        "), None);

        let mut buf = [0u8; 10];
        assert_eq!(r.read(&mut buf).unwrap(), 7);
        let e = r.read(&mut buf).unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::Other);
        assert_eq!(e.description(), "early eof");
    }

    // Reading into an empty buffer yields 0 instead of an error.
    #[test]
    fn test_read_sized_zero_len_buf() {
        let mut r = super::HttpReader::SizedReader(MockStream::with_input(b"foo bar"), 7);
        let mut buf = [0u8; 0];
        assert_eq!(r.read(&mut buf).unwrap(), 0);
    }

    // Same as above, for the chunked reader.
    #[test]
    fn test_read_chunked_zero_len_buf() {
        let mut r = super::HttpReader::ChunkedReader(MockStream::with_input(b"\
            7\r\n\
            foo bar\
            0\r\n\r\n\
        "), None);

        let mut buf = [0u8; 0];
        assert_eq!(r.read(&mut buf).unwrap(), 0);
    }

    // After the terminating chunk nothing is left in the underlying stream,
    // and repeated reads keep returning 0.
    #[test]
    fn test_read_chunked_fully_consumes() {
        let mut r = super::HttpReader::ChunkedReader(MockStream::with_input(b"0\r\n\r\n"), None);
        let mut buf = [0; 1];
        assert_eq!(r.read(&mut buf).unwrap(), 0);
        assert_eq!(r.read(&mut buf).unwrap(), 0);

        match r {
            super::HttpReader::ChunkedReader(mut r, _) => assert_eq!(r.read(&mut buf).unwrap(), 0),
            _ => unreachable!(),
        }
    }

    // An unparsable Content-Length is an error, and the message must still
    // be usable enough afterwards to close the connection.
    #[test]
    fn test_message_get_incoming_invalid_content_length() {
        let raw = MockStream::with_input(
            b"HTTP/1.1 200 OK\r\nContent-Length: asdf\r\n\r\n");
        let mut msg = Http11Message::with_stream(Box::new(raw));
        assert!(msg.get_incoming().is_err());
        assert!(msg.close_connection().is_ok());
    }

    // A minimal, well-formed request head parses.
    #[test]
    fn test_parse_incoming() {
        let mut raw = MockStream::with_input(b"GET /echo HTTP/1.1\r\nHost: hyper.rs\r\n\r\n");
        let mut buf = BufReader::new(&mut raw);
        parse_request(&mut buf).unwrap();
    }

    // Both canonical and custom reason phrases are preserved.
    #[test]
    fn test_parse_raw_status() {
        let mut raw = MockStream::with_input(b"HTTP/1.1 200 OK\r\n\r\n");
        let mut buf = BufReader::new(&mut raw);
        let res = parse_response(&mut buf).unwrap();

        assert_eq!(res.subject.1, "OK");

        let mut raw = MockStream::with_input(b"HTTP/1.1 200 Howdy\r\n\r\n");
        let mut buf = BufReader::new(&mut raw);
        let res = parse_response(&mut buf).unwrap();

        assert_eq!(res.subject.1, "Howdy");
    }


    // A stream that ends before any head arrives reports UnexpectedEof.
    #[test]
    fn test_parse_tcp_closed() {
        use std::io::ErrorKind;
        use error::Error;

        let mut empty = MockStream::new();
        let mut buf = BufReader::new(&mut empty);
        match parse_request(&mut buf) {
            Err(Error::Io(ref e)) if e.kind() == ErrorKind::UnexpectedEof => (),
            other => panic!("unexpected result: {:?}", other)
        }
    }

    #[cfg(feature = "nightly")]
    use test::Bencher;

    // Benchmarks request parsing; the mock stream is rewound after every
    // iteration so the same bytes are parsed again.
    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_parse_incoming(b: &mut Bencher) {
        let mut raw = MockStream::with_input(b"GET /echo HTTP/1.1\r\nHost: hyper.rs\r\n\r\n");
        let mut buf = BufReader::new(&mut raw);
        b.iter(|| {
            parse_request(&mut buf).unwrap();
            buf.get_mut().read.set_position(0);
        });
    }
}