1use std::borrow::Cow;
3use std::cmp::min;
4use std::fmt;
5use std::io::{self, Write, BufWriter, BufRead, Read};
6use std::net::Shutdown;
7use std::time::Duration;
8
9use httparse;
10use url::Position as UrlPosition;
11
12use crate::buffer::BufReader;
13use crate::Error;
14use crate::header::{Headers, ContentLength, TransferEncoding};
15use crate::header::Encoding::Chunked;
16use crate::method::{Method};
17use crate::net::{NetworkConnector, NetworkStream};
18use crate::status::StatusCode;
19use crate::version::HttpVersion;
20use crate::version::HttpVersion::{Http10, Http11};
21use crate::uri::RequestUri;
22
23use self::HttpReader::{SizedReader, ChunkedReader, EofReader, EmptyReader};
24use self::HttpWriter::{ChunkedWriter, SizedWriter, EmptyWriter, ThroughWriter};
25
26use crate::http::{
27 RawStatus,
28 Protocol,
29 HttpMessage,
30 RequestHead,
31 ResponseHead,
32};
33use crate::header;
34use crate::version;
35
36const MAX_INVALID_RESPONSE_BYTES: usize = 1024 * 128;
37
38#[derive(Debug)]
39struct Wrapper<T> {
40 obj: Option<T>,
41}
42
43impl<T> Wrapper<T> {
44 pub fn new(obj: T) -> Wrapper<T> {
45 Wrapper { obj: Some(obj) }
46 }
47
48 pub fn map_in_place<F>(&mut self, f: F) where F: FnOnce(T) -> T {
49 let obj = self.obj.take().unwrap();
50 let res = f(obj);
51 self.obj = Some(res);
52 }
53
54 pub fn into_inner(self) -> T { self.obj.unwrap() }
55 pub fn as_mut(&mut self) -> &mut T { self.obj.as_mut().unwrap() }
56 pub fn as_ref(&self) -> &T { self.obj.as_ref().unwrap() }
57}
58
59#[derive(Debug)]
60enum Stream {
61 Idle(Box<dyn NetworkStream + Send>),
62 Writing(HttpWriter<BufWriter<Box<dyn NetworkStream + Send>>>),
63 Reading(HttpReader<BufReader<Box<dyn NetworkStream + Send>>>),
64}
65
66impl Stream {
67 fn writer_mut(&mut self) -> Option<&mut HttpWriter<BufWriter<Box<dyn NetworkStream + Send>>>> {
68 match *self {
69 Stream::Writing(ref mut writer) => Some(writer),
70 _ => None,
71 }
72 }
73 fn reader_mut(&mut self) -> Option<&mut HttpReader<BufReader<Box<dyn NetworkStream + Send>>>> {
74 match *self {
75 Stream::Reading(ref mut reader) => Some(reader),
76 _ => None,
77 }
78 }
79 fn reader_ref(&self) -> Option<&HttpReader<BufReader<Box<dyn NetworkStream + Send>>>> {
80 match *self {
81 Stream::Reading(ref reader) => Some(reader),
82 _ => None,
83 }
84 }
85
86 fn new(stream: Box<dyn NetworkStream + Send>) -> Stream {
87 Stream::Idle(stream)
88 }
89}
90
91#[derive(Debug)]
93pub struct Http11Message {
94 is_proxied: bool,
95 method: Option<Method>,
96 stream: Wrapper<Stream>,
97}
98
99impl Write for Http11Message {
100 #[inline]
101 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
102 match self.stream.as_mut().writer_mut() {
103 None => Err(io::Error::new(io::ErrorKind::Other,
104 "Not in a writable state")),
105 Some(ref mut writer) => writer.write(buf),
106 }
107 }
108 #[inline]
109 fn flush(&mut self) -> io::Result<()> {
110 match self.stream.as_mut().writer_mut() {
111 None => Err(io::Error::new(io::ErrorKind::Other,
112 "Not in a writable state")),
113 Some(ref mut writer) => writer.flush(),
114 }
115 }
116}
117
118impl Read for Http11Message {
119 #[inline]
120 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
121 match self.stream.as_mut().reader_mut() {
122 None => Err(io::Error::new(io::ErrorKind::Other,
123 "Not in a readable state")),
124 Some(ref mut reader) => reader.read(buf),
125 }
126 }
127}
128
129impl HttpMessage for Http11Message {
130 fn set_outgoing(&mut self, mut head: RequestHead) -> crate::Result<RequestHead> {
131 let mut res = Err(Error::from(io::Error::new(
132 io::ErrorKind::Other,
133 "")));
134 let mut method = None;
135 let is_proxied = self.is_proxied;
136 self.stream.map_in_place(|stream: Stream| -> Stream {
137 let stream = match stream {
138 Stream::Idle(stream) => stream,
139 _ => {
140 res = Err(Error::from(io::Error::new(
141 io::ErrorKind::Other,
142 "Message not idle, cannot start new outgoing")));
143 return stream;
144 },
145 };
146 let mut stream = BufWriter::new(stream);
147
148 {
149 let uri = if is_proxied {
150 head.url.as_ref()
151 } else {
152 &head.url[UrlPosition::BeforePath..UrlPosition::AfterQuery]
153 };
154
155 let version = version::HttpVersion::Http11;
156 debug!("request line: {:?} {:?} {:?}", head.method, uri, version);
157 match write!(&mut stream, "{} {} {}{}",
158 head.method, uri, version, LINE_ENDING) {
159 Err(e) => {
160 res = Err(From::from(e));
161 return Stream::Idle(stream.into_inner().ok().unwrap());
164 },
165 Ok(_) => {},
166 };
167 }
168
169 let stream = {
170 let write_headers = |mut stream: BufWriter<Box<dyn NetworkStream + Send>>, head: &RequestHead| {
171 debug!("headers={:?}", head.headers);
172 match write!(&mut stream, "{}{}", head.headers, LINE_ENDING) {
173 Ok(_) => Ok(stream),
174 Err(e) => {
175 Err((e, stream.into_inner().unwrap()))
176 }
177 }
178 };
179 match head.method {
180 Method::Get | Method::Head => {
181 let writer = match write_headers(stream, &head) {
182 Ok(w) => w,
183 Err(e) => {
184 res = Err(From::from(e.0));
185 return Stream::Idle(e.1);
186 }
187 };
188 EmptyWriter(writer)
189 },
190 _ => {
191 let mut chunked = true;
192 let mut len = 0;
193
194 match head.headers.get::<header::ContentLength>() {
195 Some(cl) => {
196 chunked = false;
197 len = **cl;
198 },
199 None => ()
200 };
201
202 if chunked {
204 let encodings = match head.headers.get_mut::<header::TransferEncoding>() {
205 Some(encodings) => {
206 encodings.push(header::Encoding::Chunked);
208 false
209 },
210 None => true
211 };
212
213 if encodings {
214 head.headers.set(
215 header::TransferEncoding(vec![header::Encoding::Chunked]))
216 }
217 }
218
219 let stream = match write_headers(stream, &head) {
220 Ok(s) => s,
221 Err(e) => {
222 res = Err(From::from(e.0));
223 return Stream::Idle(e.1);
224 },
225 };
226
227 if chunked {
228 ChunkedWriter(stream)
229 } else {
230 SizedWriter(stream, len)
231 }
232 }
233 }
234 };
235
236 method = Some(head.method.clone());
237 res = Ok(head);
238 Stream::Writing(stream)
239 });
240
241 self.method = method;
242 res
243 }
244
245 fn get_incoming(&mut self) -> crate::Result<ResponseHead> {
246 self.flush_outgoing()?;
247 let method = self.method.take().unwrap_or(Method::Get);
248 let mut res = Err(From::from(
249 io::Error::new(io::ErrorKind::Other,
250 "Read already in progress")));
251 self.stream.map_in_place(|stream| {
252 let stream = match stream {
253 Stream::Idle(stream) => stream,
254 _ => {
255 res = Err(From::from(
258 io::Error::new(io::ErrorKind::Other,
259 "Read already in progress")));
260 return stream;
261 }
262 };
263
264 let expected_no_content = stream.previous_response_expected_no_content();
265 trace!("previous_response_expected_no_content = {}", expected_no_content);
266
267 let mut stream = BufReader::new(stream);
268
269 let mut invalid_bytes_read = 0;
270 let head;
271 loop {
272 head = match parse_response(&mut stream) {
273 Ok(head) => head,
274 Err(crate::Error::Version)
275 if expected_no_content && invalid_bytes_read < MAX_INVALID_RESPONSE_BYTES => {
276 trace!("expected_no_content, found content");
277 invalid_bytes_read += 1;
278 stream.consume(1);
279 continue;
280 }
281 Err(e) => {
282 res = Err(e);
283 return Stream::Idle(stream.into_inner());
284 }
285 };
286 break;
287 }
288
289 let raw_status = head.subject;
290 let headers = head.headers;
291
292 let is_empty = !should_have_response_body(&method, raw_status.0);
293 stream.get_mut().set_previous_response_expected_no_content(is_empty);
294 let reader = if is_empty {
303 EmptyReader(stream)
304 } else {
305 if let Some(&TransferEncoding(ref codings)) = headers.get() {
306 if codings.last() == Some(&Chunked) {
307 ChunkedReader(stream, None)
308 } else {
309 trace!("not chuncked. read till eof");
310 EofReader(stream)
311 }
312 } else if let Some(&ContentLength(len)) = headers.get() {
313 SizedReader(stream, len)
314 } else if headers.has::<ContentLength>() {
315 trace!("illegal Content-Length: {:?}", headers.get_raw("Content-Length"));
316 res = Err(Error::Header);
317 return Stream::Idle(stream.into_inner());
318 } else {
319 trace!("neither Transfer-Encoding nor Content-Length");
320 EofReader(stream)
321 }
322 };
323
324 trace!("Http11Message.reader = {:?}", reader);
325
326
327 res = Ok(ResponseHead {
328 headers: headers,
329 raw_status: raw_status,
330 version: head.version,
331 });
332
333 Stream::Reading(reader)
334 });
335 res
336 }
337
338 fn has_body(&self) -> bool {
339 match self.stream.as_ref().reader_ref() {
340 Some(&EmptyReader(..)) |
341 Some(&SizedReader(_, 0)) |
342 Some(&ChunkedReader(_, Some(0))) => false,
343 _ => true
345 }
346 }
347
348 #[inline]
349 fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
350 self.get_ref().set_read_timeout(dur)
351 }
352
353 #[inline]
354 fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
355 self.get_ref().set_write_timeout(dur)
356 }
357
358 #[inline]
359 fn close_connection(&mut self) -> crate::Result<()> {
360 self.get_mut().close(Shutdown::Both)?;
361 Ok(())
362 }
363
364 #[inline]
365 fn set_proxied(&mut self, val: bool) {
366 self.is_proxied = val;
367 }
368}
369
370impl Http11Message {
371 pub fn into_inner(self) -> Box<dyn NetworkStream + Send> {
373 match self.stream.into_inner() {
374 Stream::Idle(stream) => stream,
375 Stream::Writing(stream) => stream.into_inner().into_inner().unwrap(),
376 Stream::Reading(stream) => stream.into_inner().into_inner(),
377 }
378 }
379
380 pub fn get_ref(&self) -> &(dyn NetworkStream + Send) {
383 match *self.stream.as_ref() {
384 Stream::Idle(ref stream) => &**stream,
385 Stream::Writing(ref stream) => &**stream.get_ref().get_ref(),
386 Stream::Reading(ref stream) => &**stream.get_ref().get_ref()
387 }
388 }
389
390 pub fn get_mut(&mut self) -> &mut (dyn NetworkStream + Send) {
393 match *self.stream.as_mut() {
394 Stream::Idle(ref mut stream) => &mut **stream,
395 Stream::Writing(ref mut stream) => &mut **stream.get_mut().get_mut(),
396 Stream::Reading(ref mut stream) => &mut **stream.get_mut().get_mut()
397 }
398 }
399
400 pub fn with_stream(stream: Box<dyn NetworkStream + Send>) -> Http11Message {
403 Http11Message {
404 is_proxied: false,
405 method: None,
406 stream: Wrapper::new(Stream::new(stream)),
407 }
408 }
409
410 pub fn flush_outgoing(&mut self) -> crate::Result<()> {
414 let mut res = Ok(());
415 self.stream.map_in_place(|stream| {
416 let writer = match stream {
417 Stream::Writing(writer) => writer,
418 _ => {
419 res = Ok(());
420 return stream;
421 },
422 };
423 let raw = match writer.end() {
425 Ok(buf) => buf.into_inner().unwrap(),
426 Err(e) => {
427 res = Err(From::from(e.0));
428 return Stream::Writing(e.1);
429 }
430 };
431 Stream::Idle(raw)
432 });
433 res
434 }
435}
436
437pub struct Http11Protocol {
439 connector: Connector,
440}
441
442impl Protocol for Http11Protocol {
443 fn new_message(&self, host: &str, port: u16, scheme: &str) -> crate::Result<Box<dyn HttpMessage>> {
444 let stream = self.connector.connect(host, port, scheme)?.into();
445
446 Ok(Box::new(Http11Message::with_stream(stream)))
447 }
448}
449
450impl Http11Protocol {
451 pub fn with_connector<C, S>(c: C) -> Http11Protocol
454 where C: NetworkConnector<Stream=S> + Send + Sync + 'static,
455 S: NetworkStream + Send {
456 Http11Protocol {
457 connector: Connector(Box::new(ConnAdapter(c))),
458 }
459 }
460}
461
462struct ConnAdapter<C: NetworkConnector + Send + Sync>(C);
463
464impl<C: NetworkConnector<Stream=S> + Send + Sync, S: NetworkStream + Send>
465 NetworkConnector for ConnAdapter<C> {
466 type Stream = Box<dyn NetworkStream + Send>;
467 #[inline]
468 fn connect(&self, host: &str, port: u16, scheme: &str)
469 -> crate::Result<Box<dyn NetworkStream + Send>> {
470 Ok(self.0.connect(host, port, scheme)?.into())
471 }
472}
473
474struct Connector(Box<dyn NetworkConnector<Stream=Box<dyn NetworkStream + Send>> + Send + Sync>);
475
476impl NetworkConnector for Connector {
477 type Stream = Box<dyn NetworkStream + Send>;
478 #[inline]
479 fn connect(&self, host: &str, port: u16, scheme: &str)
480 -> crate::Result<Box<dyn NetworkStream + Send>> {
481 Ok(self.0.connect(host, port, scheme)?.into())
482 }
483}
484
485
486pub enum HttpReader<R> {
491 SizedReader(R, u64),
493 ChunkedReader(R, Option<u64>),
495 EofReader(R),
510 EmptyReader(R),
514}
515
516impl<R: Read> HttpReader<R> {
517
518 pub fn into_inner(self) -> R {
520 match self {
521 SizedReader(r, _) => r,
522 ChunkedReader(r, _) => r,
523 EofReader(r) => r,
524 EmptyReader(r) => r,
525 }
526 }
527
528 pub fn get_ref(&self) -> &R {
530 match *self {
531 SizedReader(ref r, _) => r,
532 ChunkedReader(ref r, _) => r,
533 EofReader(ref r) => r,
534 EmptyReader(ref r) => r,
535 }
536 }
537
538 pub fn get_mut(&mut self) -> &mut R {
540 match *self {
541 SizedReader(ref mut r, _) => r,
542 ChunkedReader(ref mut r, _) => r,
543 EofReader(ref mut r) => r,
544 EmptyReader(ref mut r) => r,
545 }
546 }
547}
548
549impl<R> fmt::Debug for HttpReader<R> {
550 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
551 match *self {
552 SizedReader(_,rem) => write!(fmt, "SizedReader(remaining={:?})", rem),
553 ChunkedReader(_, None) => write!(fmt, "ChunkedReader(chunk_remaining=unknown)"),
554 ChunkedReader(_, Some(rem)) => write!(fmt, "ChunkedReader(chunk_remaining={:?})", rem),
555 EofReader(_) => write!(fmt, "EofReader"),
556 EmptyReader(_) => write!(fmt, "EmptyReader"),
557 }
558 }
559}
560
561impl<R: Read> Read for HttpReader<R> {
562 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
563 if buf.is_empty() {
564 return Ok(0);
565 }
566 match *self {
567 SizedReader(ref mut body, ref mut remaining) => {
568 trace!("Sized read, remaining={:?}", remaining);
569 if *remaining == 0 {
570 Ok(0)
571 } else {
572 let to_read = min(*remaining as usize, buf.len());
573 let num = body.read(&mut buf[..to_read])? as u64;
574 trace!("Sized read: {}", num);
575 if num > *remaining {
576 *remaining = 0;
577 } else if num == 0 {
578 return Err(io::Error::new(io::ErrorKind::Other, "early eof"));
579 } else {
580 *remaining -= num;
581 }
582 Ok(num as usize)
583 }
584 },
585 ChunkedReader(ref mut body, ref mut opt_remaining) => {
586 let mut rem = match *opt_remaining {
587 Some(ref rem) => *rem,
588 None => read_chunk_size(body)?
590 };
591 trace!("Chunked read, remaining={:?}", rem);
592
593 if rem == 0 {
594 if opt_remaining.is_none() {
595 eat(body, LINE_ENDING.as_bytes())?;
596 }
597
598 *opt_remaining = Some(0);
599
600 trace!("end of chunked");
604
605 return Ok(0)
606 }
607
608 let to_read = min(rem as usize, buf.len());
609 let count = body.read(&mut buf[..to_read])? as u64;
610
611 if count == 0 {
612 *opt_remaining = Some(0);
613 return Err(io::Error::new(io::ErrorKind::Other, "early eof"));
614 }
615
616 rem -= count;
617 *opt_remaining = if rem > 0 {
618 Some(rem)
619 } else {
620 eat(body, LINE_ENDING.as_bytes())?;
621 None
622 };
623 Ok(count as usize)
624 },
625 EofReader(ref mut body) => {
626 let r = body.read(buf);
627 trace!("eofread: {:?}", r);
628 r
629 },
630 EmptyReader(_) => Ok(0)
631 }
632 }
633}
634
635fn eat<R: Read>(rdr: &mut R, bytes: &[u8]) -> io::Result<()> {
636 let mut buf = [0];
637 for &b in bytes.iter() {
638 match rdr.read(&mut buf)? {
639 1 if buf[0] == b => (),
640 _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
641 "Invalid characters found")),
642 }
643 }
644 Ok(())
645}
646
647fn read_chunk_size<R: Read>(rdr: &mut R) -> io::Result<u64> {
649 macro_rules! byte (
650 ($rdr:ident) => ({
651 let mut buf = [0];
652 match $rdr.read(&mut buf)? {
653 1 => buf[0],
654 _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
655 "Invalid chunk size line")),
656
657 }
658 })
659 );
660 let mut size = 0u64;
661 let radix = 16;
662 let mut in_ext = false;
663 let mut in_chunk_size = true;
664 loop {
665 match byte!(rdr) {
666 b@b'0'..=b'9' if in_chunk_size => {
667 size *= radix;
668 size += (b - b'0') as u64;
669 },
670 b@b'a'..=b'f' if in_chunk_size => {
671 size *= radix;
672 size += (b + 10 - b'a') as u64;
673 },
674 b@b'A'..=b'F' if in_chunk_size => {
675 size *= radix;
676 size += (b + 10 - b'A') as u64;
677 },
678 CR => {
679 match byte!(rdr) {
680 LF => break,
681 _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
682 "Invalid chunk size line"))
683
684 }
685 },
686 b';' if !in_ext => {
688 in_ext = true;
689 in_chunk_size = false;
690 },
691 b'\t' | b' ' if !in_ext & !in_chunk_size => {},
694 b'\t' | b' ' if in_chunk_size => in_chunk_size = false,
696 ext if in_ext => {
702 todo!("chunk extension byte={}", ext);
703 },
704 _ => {
707 return Err(io::Error::new(io::ErrorKind::InvalidInput,
708 "Invalid chunk size line"));
709 }
710 }
711 }
712 trace!("chunk size={:?}", size);
713 Ok(size)
714}
715
716fn should_have_response_body(method: &Method, status: u16) -> bool {
717 trace!("should_have_response_body({:?}, {})", method, status);
718 match (method, status) {
719 (&Method::Head, _) |
720 (_, 100..=199) |
721 (_, 204) |
722 (_, 304) |
723 (&Method::Connect, 200..=299) => false,
724 _ => true
725 }
726}
727
728pub enum HttpWriter<W: Write> {
730 ThroughWriter(W),
732 ChunkedWriter(W),
734 SizedWriter(W, u64),
738 EmptyWriter(W),
740}
741
742impl<W: Write> HttpWriter<W> {
743 #[inline]
745 pub fn into_inner(self) -> W {
746 match self {
747 ThroughWriter(w) => w,
748 ChunkedWriter(w) => w,
749 SizedWriter(w, _) => w,
750 EmptyWriter(w) => w,
751 }
752 }
753
754 #[inline]
756 pub fn get_ref(&self) -> &W {
757 match *self {
758 ThroughWriter(ref w) => w,
759 ChunkedWriter(ref w) => w,
760 SizedWriter(ref w, _) => w,
761 EmptyWriter(ref w) => w,
762 }
763 }
764
765 #[inline]
770 pub fn get_mut(&mut self) -> &mut W {
771 match *self {
772 ThroughWriter(ref mut w) => w,
773 ChunkedWriter(ref mut w) => w,
774 SizedWriter(ref mut w, _) => w,
775 EmptyWriter(ref mut w) => w,
776 }
777 }
778
779 #[inline]
784 pub fn end(mut self) -> Result<W, EndError<W>> {
785 fn inner<W: Write>(w: &mut W) -> io::Result<()> {
786 w.write(&[])?;
787 w.flush()
788 }
789
790 match inner(&mut self) {
791 Ok(..) => Ok(self.into_inner()),
792 Err(e) => Err(EndError(e, self))
793 }
794 }
795}
796
797#[derive(Debug)]
798pub struct EndError<W: Write>(io::Error, HttpWriter<W>);
799
800impl<W: Write> From<EndError<W>> for io::Error {
801 fn from(e: EndError<W>) -> io::Error {
802 e.0
803 }
804}
805
806impl<W: Write> Write for HttpWriter<W> {
807 #[inline]
808 fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
809 match *self {
810 ThroughWriter(ref mut w) => w.write(msg),
811 ChunkedWriter(ref mut w) => {
812 let chunk_size = msg.len();
813 trace!("chunked write, size = {:?}", chunk_size);
814 write!(w, "{:X}{}", chunk_size, LINE_ENDING)?;
815 w.write_all(msg)?;
816 w.write_all(LINE_ENDING.as_bytes())?;
817 Ok(msg.len())
818 },
819 SizedWriter(ref mut w, ref mut remaining) => {
820 let len = msg.len() as u64;
821 if len > *remaining {
822 let len = *remaining;
823 w.write_all(&msg[..len as usize])?;
824 *remaining = 0;
825 Ok(len as usize)
826 } else {
827 w.write_all(msg)?;
828 *remaining -= len;
829 Ok(len as usize)
830 }
831 },
832 EmptyWriter(..) => {
833 if !msg.is_empty() {
834 error!("Cannot include a body with this kind of message");
835 }
836 Ok(0)
837 }
838 }
839 }
840
841 #[inline]
842 fn flush(&mut self) -> io::Result<()> {
843 match *self {
844 ThroughWriter(ref mut w) => w.flush(),
845 ChunkedWriter(ref mut w) => w.flush(),
846 SizedWriter(ref mut w, _) => w.flush(),
847 EmptyWriter(ref mut w) => w.flush(),
848 }
849 }
850}
851
852impl<W: Write> fmt::Debug for HttpWriter<W> {
853 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
854 match *self {
855 ThroughWriter(_) => write!(fmt, "ThroughWriter"),
856 ChunkedWriter(_) => write!(fmt, "ChunkedWriter"),
857 SizedWriter(_, rem) => write!(fmt, "SizedWriter(remaining={:?})", rem),
858 EmptyWriter(_) => write!(fmt, "EmptyWriter"),
859 }
860 }
861}
862
863const MAX_HEADERS: usize = 100;
864
865#[inline]
867pub fn parse_request<R: Read>(buf: &mut BufReader<R>) -> crate::Result<Incoming<(Method, RequestUri)>> {
868 parse::<R, httparse::Request, (Method, RequestUri)>(buf)
869}
870
871#[inline]
873pub fn parse_response<R: Read>(buf: &mut BufReader<R>) -> crate::Result<Incoming<RawStatus>> {
874 parse::<R, httparse::Response, RawStatus>(buf)
875}
876
877fn parse<R: Read, T: TryParse<Subject=I>, I>(rdr: &mut BufReader<R>) -> crate::Result<Incoming<I>> {
878 loop {
879 match try_parse::<R, T, I>(rdr)? {
880 httparse::Status::Complete((inc, len)) => {
881 rdr.consume(len);
882 return Ok(inc);
883 },
884 _partial => ()
885 }
886 let n = rdr.read_into_buf()?;
887 if n == 0 {
888 let buffered = rdr.get_buf().len();
889 if buffered == crate::buffer::MAX_BUFFER_SIZE {
890 return Err(Error::TooLarge);
891 } else {
892 return Err(Error::Io(io::Error::new(
893 io::ErrorKind::UnexpectedEof,
894 "end of stream before headers finished"
895 )));
896 }
897 }
898 }
899}
900
901fn try_parse<R: Read, T: TryParse<Subject=I>, I>(rdr: &mut BufReader<R>) -> TryParseResult<I> {
902 let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
903 let buf = rdr.get_buf();
904 if buf.len() == 0 {
905 return Ok(httparse::Status::Partial);
906 }
907 trace!("try_parse({:?})", buf);
908 <T as TryParse>::try_parse(&mut headers, buf)
909}
910
911#[doc(hidden)]
912trait TryParse {
913 type Subject;
914 fn try_parse<'a>(headers: &'a mut [httparse::Header<'a>], buf: &'a [u8]) ->
915 TryParseResult<Self::Subject>;
916}
917
918type TryParseResult<T> = Result<httparse::Status<(Incoming<T>, usize)>, Error>;
919
920impl<'a> TryParse for httparse::Request<'a, 'a> {
921 type Subject = (Method, RequestUri);
922
923 fn try_parse<'b>(headers: &'b mut [httparse::Header<'b>], buf: &'b [u8]) ->
924 TryParseResult<(Method, RequestUri)> {
925 trace!("Request.try_parse([Header; {}], [u8; {}])", headers.len(), buf.len());
926 let mut req = httparse::Request::new(headers);
927 Ok(match req.parse(buf)? {
928 httparse::Status::Complete(len) => {
929 trace!("Request.try_parse Complete({})", len);
930 httparse::Status::Complete((Incoming {
931 version: if req.version.unwrap() == 1 { Http11 } else { Http10 },
932 subject: (
933 req.method.unwrap().parse()?,
934 req.path.unwrap().parse()?),
935 headers: Headers::from_raw(req.headers)?
936 }, len))
937 },
938 httparse::Status::Partial => httparse::Status::Partial
939 })
940 }
941}
942
943impl<'a> TryParse for httparse::Response<'a, 'a> {
944 type Subject = RawStatus;
945
946 fn try_parse<'b>(headers: &'b mut [httparse::Header<'b>], buf: &'b [u8]) ->
947 TryParseResult<RawStatus> {
948 trace!("Response.try_parse([Header; {}], [u8; {}])", headers.len(), buf.len());
949 let mut res = httparse::Response::new(headers);
950 Ok(match res.parse(buf)? {
951 httparse::Status::Complete(len) => {
952 trace!("Response.try_parse Complete({})", len);
953 let code = res.code.unwrap();
954 let reason = match StatusCode::from_u16(code).canonical_reason() {
955 Some(reason) if reason == res.reason.unwrap() => Cow::Borrowed(reason),
956 _ => Cow::Owned(res.reason.unwrap().to_owned())
957 };
958 httparse::Status::Complete((Incoming {
959 version: if res.version.unwrap() == 1 { Http11 } else { Http10 },
960 subject: RawStatus(code, reason),
961 headers: Headers::from_raw(res.headers)?
962 }, len))
963 },
964 httparse::Status::Partial => httparse::Status::Partial
965 })
966 }
967}
968
969#[derive(Debug)]
971pub struct Incoming<S> {
972 pub version: HttpVersion,
974 pub subject: S,
976 pub headers: Headers
978}
979
980pub const CR: u8 = b'\r';
982pub const LF: u8 = b'\n';
984pub const LINE_ENDING: &'static str = "\r\n";
986
987#[cfg(test)]
988mod tests {
989 use std::error::Error;
990 use std::io::{self, Read, Write};
991
992
993 use crate::buffer::BufReader;
994 use crate::mock::MockStream;
995 use crate::http::HttpMessage;
996
997 use super::{read_chunk_size, parse_request, parse_response, Http11Message};
998
999 #[test]
1000 fn test_write_chunked() {
1001 use std::str::from_utf8;
1002 let mut w = super::HttpWriter::ChunkedWriter(Vec::new());
1003 w.write_all(b"foo bar").unwrap();
1004 w.write_all(b"baz quux herp").unwrap();
1005 let buf = w.end().unwrap();
1006 let s = from_utf8(buf.as_ref()).unwrap();
1007 assert_eq!(s, "7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n");
1008 }
1009
1010 #[test]
1011 fn test_write_sized() {
1012 use std::str::from_utf8;
1013 let mut w = super::HttpWriter::SizedWriter(Vec::new(), 8);
1014 w.write_all(b"foo bar").unwrap();
1015 assert_eq!(w.write(b"baz").unwrap(), 1);
1016
1017 let buf = w.end().unwrap();
1018 let s = from_utf8(buf.as_ref()).unwrap();
1019 assert_eq!(s, "foo barb");
1020 }
1021
1022 #[test]
1023 fn test_read_chunk_size() {
1024 fn read(s: &str, result: u64) {
1025 assert_eq!(read_chunk_size(&mut s.as_bytes()).unwrap(), result);
1026 }
1027
1028 fn read_err(s: &str) {
1029 assert_eq!(read_chunk_size(&mut s.as_bytes()).unwrap_err().kind(),
1030 io::ErrorKind::InvalidInput);
1031 }
1032
1033 read("1\r\n", 1);
1034 read("01\r\n", 1);
1035 read("0\r\n", 0);
1036 read("00\r\n", 0);
1037 read("A\r\n", 10);
1038 read("a\r\n", 10);
1039 read("Ff\r\n", 255);
1040 read("Ff \r\n", 255);
1041 read_err("F\rF");
1043 read_err("F");
1044 read_err("X\r\n");
1046 read_err("1X\r\n");
1047 read_err("-\r\n");
1048 read_err("-1\r\n");
1049 read("1;extension\r\n", 1);
1051 read("a;ext name=value\r\n", 10);
1052 read("1;extension;extension2\r\n", 1);
1053 read("1;;; ;\r\n", 1);
1054 read("2; extension...\r\n", 2);
1055 read("3 ; extension=123\r\n", 3);
1056 read("3 ;\r\n", 3);
1057 read("3 ; \r\n", 3);
1058 read_err("1 invalid extension\r\n");
1060 read_err("1 A\r\n");
1061 read_err("1;no CRLF");
1062 }
1063
1064 #[test]
1065 fn test_read_sized_early_eof() {
1066 let mut r = super::HttpReader::SizedReader(MockStream::with_input(b"foo bar"), 10);
1067 let mut buf = [0u8; 10];
1068 assert_eq!(r.read(&mut buf).unwrap(), 7);
1069 let e = r.read(&mut buf).unwrap_err();
1070 assert_eq!(e.kind(), io::ErrorKind::Other);
1071 assert_eq!(e.description(), "early eof");
1072 }
1073
1074 #[test]
1075 fn test_read_chunked_early_eof() {
1076 let mut r = super::HttpReader::ChunkedReader(MockStream::with_input(b"\
1077 9\r\n\
1078 foo bar\
1079 "), None);
1080
1081 let mut buf = [0u8; 10];
1082 assert_eq!(r.read(&mut buf).unwrap(), 7);
1083 let e = r.read(&mut buf).unwrap_err();
1084 assert_eq!(e.kind(), io::ErrorKind::Other);
1085 assert_eq!(e.description(), "early eof");
1086 }
1087
1088 #[test]
1089 fn test_read_sized_zero_len_buf() {
1090 let mut r = super::HttpReader::SizedReader(MockStream::with_input(b"foo bar"), 7);
1091 let mut buf = [0u8; 0];
1092 assert_eq!(r.read(&mut buf).unwrap(), 0);
1093 }
1094
1095 #[test]
1096 fn test_read_chunked_zero_len_buf() {
1097 let mut r = super::HttpReader::ChunkedReader(MockStream::with_input(b"\
1098 7\r\n\
1099 foo bar\
1100 0\r\n\r\n\
1101 "), None);
1102
1103 let mut buf = [0u8; 0];
1104 assert_eq!(r.read(&mut buf).unwrap(), 0);
1105 }
1106
1107 #[test]
1108 fn test_read_chunked_fully_consumes() {
1109 let mut r = super::HttpReader::ChunkedReader(MockStream::with_input(b"0\r\n\r\n"), None);
1110 let mut buf = [0; 1];
1111 assert_eq!(r.read(&mut buf).unwrap(), 0);
1112 assert_eq!(r.read(&mut buf).unwrap(), 0);
1113
1114 match r {
1115 super::HttpReader::ChunkedReader(mut r, _) => assert_eq!(r.read(&mut buf).unwrap(), 0),
1116 _ => unreachable!(),
1117 }
1118 }
1119
1120 #[test]
1121 fn test_message_get_incoming_invalid_content_length() {
1122 let raw = MockStream::with_input(
1123 b"HTTP/1.1 200 OK\r\nContent-Length: asdf\r\n\r\n");
1124 let mut msg = Http11Message::with_stream(Box::new(raw));
1125 assert!(msg.get_incoming().is_err());
1126 assert!(msg.close_connection().is_ok());
1127 }
1128
1129 #[test]
1130 fn test_parse_incoming() {
1131 let mut raw = MockStream::with_input(b"GET /echo HTTP/1.1\r\nHost: mco_http.rs\r\n\r\n");
1132 let mut buf = BufReader::new(&mut raw);
1133 parse_request(&mut buf).unwrap();
1134 }
1135
1136 #[test]
1137 fn test_parse_raw_status() {
1138 let mut raw = MockStream::with_input(b"HTTP/1.1 200 OK\r\n\r\n");
1139 let mut buf = BufReader::new(&mut raw);
1140 let res = parse_response(&mut buf).unwrap();
1141
1142 assert_eq!(res.subject.1, "OK");
1143
1144 let mut raw = MockStream::with_input(b"HTTP/1.1 200 Howdy\r\n\r\n");
1145 let mut buf = BufReader::new(&mut raw);
1146 let res = parse_response(&mut buf).unwrap();
1147
1148 assert_eq!(res.subject.1, "Howdy");
1149 }
1150
1151
1152 #[test]
1153 fn test_parse_tcp_closed() {
1154 use std::io::ErrorKind;
1155 use crate::error::Error;
1156
1157 let mut empty = MockStream::new();
1158 let mut buf = BufReader::new(&mut empty);
1159 match parse_request(&mut buf) {
1160 Err(Error::Io(ref e)) if e.kind() == ErrorKind::UnexpectedEof => (),
1161 other => panic!("unexpected result: {:?}", other)
1162 }
1163 }
1164
1165 #[cfg(feature = "nightly")]
1166 use test::Bencher;
1167
1168 #[cfg(feature = "nightly")]
1169 #[bench]
1170 fn bench_parse_incoming(b: &mut Bencher) {
1171 let mut raw = MockStream::with_input(b"GET /echo HTTP/1.1\r\nHost: mco_http.rs\r\n\r\n");
1172 let mut buf = BufReader::new(&mut raw);
1173 b.iter(|| {
1174 parse_request(&mut buf).unwrap();
1175 buf.get_mut().read.set_position(0);
1176 });
1177 }
1178}