mco_http/http/
h1.rs

1//! Adapts the HTTP/1.1 implementation into the `HttpMessage` API.
2use std::borrow::Cow;
3use std::cmp::min;
4use std::fmt;
5use std::io::{self, Write, BufWriter, BufRead, Read};
6use std::net::Shutdown;
7use std::time::Duration;
8
9use httparse;
10use url::Position as UrlPosition;
11
12use crate::buffer::BufReader;
13use crate::Error;
14use crate::header::{Headers, ContentLength, TransferEncoding};
15use crate::header::Encoding::Chunked;
16use crate::method::{Method};
17use crate::net::{NetworkConnector, NetworkStream};
18use crate::status::StatusCode;
19use crate::version::HttpVersion;
20use crate::version::HttpVersion::{Http10, Http11};
21use crate::uri::RequestUri;
22
23use self::HttpReader::{SizedReader, ChunkedReader, EofReader, EmptyReader};
24use self::HttpWriter::{ChunkedWriter, SizedWriter, EmptyWriter, ThroughWriter};
25
26use crate::http::{
27    RawStatus,
28    Protocol,
29    HttpMessage,
30    RequestHead,
31    ResponseHead,
32};
33use crate::header;
34use crate::version;
35
36const MAX_INVALID_RESPONSE_BYTES: usize = 1024 * 128;
37
38#[derive(Debug)]
39struct Wrapper<T> {
40    obj: Option<T>,
41}
42
43impl<T> Wrapper<T> {
44    pub fn new(obj: T) -> Wrapper<T> {
45        Wrapper { obj: Some(obj) }
46    }
47
48    pub fn map_in_place<F>(&mut self, f: F) where F: FnOnce(T) -> T {
49        let obj = self.obj.take().unwrap();
50        let res = f(obj);
51        self.obj = Some(res);
52    }
53
54    pub fn into_inner(self) -> T { self.obj.unwrap() }
55    pub fn as_mut(&mut self) -> &mut T { self.obj.as_mut().unwrap() }
56    pub fn as_ref(&self) -> &T { self.obj.as_ref().unwrap() }
57}
58
59#[derive(Debug)]
60enum Stream {
61    Idle(Box<dyn NetworkStream + Send>),
62    Writing(HttpWriter<BufWriter<Box<dyn NetworkStream + Send>>>),
63    Reading(HttpReader<BufReader<Box<dyn NetworkStream + Send>>>),
64}
65
66impl Stream {
67    fn writer_mut(&mut self) -> Option<&mut HttpWriter<BufWriter<Box<dyn NetworkStream + Send>>>> {
68        match *self {
69            Stream::Writing(ref mut writer) => Some(writer),
70            _ => None,
71        }
72    }
73    fn reader_mut(&mut self) -> Option<&mut HttpReader<BufReader<Box<dyn NetworkStream + Send>>>> {
74        match *self {
75            Stream::Reading(ref mut reader) => Some(reader),
76            _ => None,
77        }
78    }
79    fn reader_ref(&self) -> Option<&HttpReader<BufReader<Box<dyn NetworkStream + Send>>>> {
80        match *self {
81            Stream::Reading(ref reader) => Some(reader),
82            _ => None,
83        }
84    }
85
86    fn new(stream: Box<dyn NetworkStream + Send>) -> Stream {
87        Stream::Idle(stream)
88    }
89}
90
91/// An implementation of the `HttpMessage` trait for HTTP/1.1.
92#[derive(Debug)]
93pub struct Http11Message {
94    is_proxied: bool,
95    method: Option<Method>,
96    stream: Wrapper<Stream>,
97}
98
99impl Write for Http11Message {
100    #[inline]
101    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
102        match self.stream.as_mut().writer_mut() {
103            None => Err(io::Error::new(io::ErrorKind::Other,
104                                          "Not in a writable state")),
105            Some(ref mut writer) => writer.write(buf),
106        }
107    }
108    #[inline]
109    fn flush(&mut self) -> io::Result<()> {
110        match self.stream.as_mut().writer_mut() {
111            None => Err(io::Error::new(io::ErrorKind::Other,
112                                          "Not in a writable state")),
113            Some(ref mut writer) => writer.flush(),
114        }
115    }
116}
117
118impl Read for Http11Message {
119    #[inline]
120    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
121        match self.stream.as_mut().reader_mut() {
122            None => Err(io::Error::new(io::ErrorKind::Other,
123                                          "Not in a readable state")),
124            Some(ref mut reader) => reader.read(buf),
125        }
126    }
127}
128
129impl HttpMessage for Http11Message {
130    fn set_outgoing(&mut self, mut head: RequestHead) -> crate::Result<RequestHead> {
131        let mut res = Err(Error::from(io::Error::new(
132                            io::ErrorKind::Other,
133                            "")));
134        let mut method = None;
135        let is_proxied = self.is_proxied;
136        self.stream.map_in_place(|stream: Stream| -> Stream {
137            let stream = match stream {
138                Stream::Idle(stream) => stream,
139                _ => {
140                    res = Err(Error::from(io::Error::new(
141                                io::ErrorKind::Other,
142                                "Message not idle, cannot start new outgoing")));
143                    return stream;
144                },
145            };
146            let mut stream = BufWriter::new(stream);
147
148            {
149                let uri = if is_proxied {
150                    head.url.as_ref()
151                } else {
152                    &head.url[UrlPosition::BeforePath..UrlPosition::AfterQuery]
153                };
154
155                let version = version::HttpVersion::Http11;
156                debug!("request line: {:?} {:?} {:?}", head.method, uri, version);
157                match write!(&mut stream, "{} {} {}{}",
158                             head.method, uri, version, LINE_ENDING) {
159                                 Err(e) => {
160                                     res = Err(From::from(e));
161                                     // TODO What should we do if the BufWriter doesn't wanna
162                                     // relinquish the stream?
163                                     return Stream::Idle(stream.into_inner().ok().unwrap());
164                                 },
165                                 Ok(_) => {},
166                             };
167            }
168
169            let stream = {
170                let write_headers = |mut stream: BufWriter<Box<dyn NetworkStream + Send>>, head: &RequestHead| {
171                    debug!("headers={:?}", head.headers);
172                    match write!(&mut stream, "{}{}", head.headers, LINE_ENDING) {
173                        Ok(_) => Ok(stream),
174                        Err(e) => {
175                            Err((e, stream.into_inner().unwrap()))
176                        }
177                    }
178                };
179                match head.method {
180                    Method::Get | Method::Head => {
181                        let writer = match write_headers(stream, &head) {
182                            Ok(w) => w,
183                            Err(e) => {
184                                res = Err(From::from(e.0));
185                                return Stream::Idle(e.1);
186                            }
187                        };
188                        EmptyWriter(writer)
189                    },
190                    _ => {
191                        let mut chunked = true;
192                        let mut len = 0;
193
194                        match head.headers.get::<header::ContentLength>() {
195                            Some(cl) => {
196                                chunked = false;
197                                len = **cl;
198                            },
199                            None => ()
200                        };
201
202                        // can't do in match above, thanks borrowck
203                        if chunked {
204                            let encodings = match head.headers.get_mut::<header::TransferEncoding>() {
205                                Some(encodings) => {
206                                    //TODO: check if chunked is already in encodings. use HashSet?
207                                    encodings.push(header::Encoding::Chunked);
208                                    false
209                                },
210                                None => true
211                            };
212
213                            if encodings {
214                                head.headers.set(
215                                    header::TransferEncoding(vec![header::Encoding::Chunked]))
216                            }
217                        }
218
219                        let stream = match write_headers(stream, &head) {
220                            Ok(s) => s,
221                            Err(e) => {
222                                res = Err(From::from(e.0));
223                                return Stream::Idle(e.1);
224                            },
225                        };
226
227                        if chunked {
228                            ChunkedWriter(stream)
229                        } else {
230                            SizedWriter(stream, len)
231                        }
232                    }
233                }
234            };
235
236            method = Some(head.method.clone());
237            res = Ok(head);
238            Stream::Writing(stream)
239        });
240
241        self.method = method;
242        res
243    }
244
245    fn get_incoming(&mut self) -> crate::Result<ResponseHead> {
246        self.flush_outgoing()?;
247        let method = self.method.take().unwrap_or(Method::Get);
248        let mut res = Err(From::from(
249                        io::Error::new(io::ErrorKind::Other,
250                        "Read already in progress")));
251        self.stream.map_in_place(|stream| {
252            let stream = match stream {
253                Stream::Idle(stream) => stream,
254                _ => {
255                    // The message was already in the reading state...
256                    // TODO Decide what happens in case we try to get a new incoming at that point
257                    res = Err(From::from(
258                            io::Error::new(io::ErrorKind::Other,
259                                           "Read already in progress")));
260                    return stream;
261                }
262            };
263
264            let expected_no_content = stream.previous_response_expected_no_content();
265            trace!("previous_response_expected_no_content = {}", expected_no_content);
266
267            let mut stream = BufReader::new(stream);
268
269            let mut invalid_bytes_read = 0;
270            let head;
271            loop {
272                head = match parse_response(&mut stream) {
273                    Ok(head) => head,
274                    Err(crate::Error::Version)
275                        if expected_no_content && invalid_bytes_read < MAX_INVALID_RESPONSE_BYTES => {
276                            trace!("expected_no_content, found content");
277                            invalid_bytes_read += 1;
278                            stream.consume(1);
279                            continue;
280                        }
281                    Err(e) => {
282                        res = Err(e);
283                        return Stream::Idle(stream.into_inner());
284                    }
285                };
286                break;
287            }
288
289            let raw_status = head.subject;
290            let headers = head.headers;
291
292            let is_empty = !should_have_response_body(&method, raw_status.0);
293            stream.get_mut().set_previous_response_expected_no_content(is_empty);
294            // According to https://tools.ietf.org/html/rfc7230#section-3.3.3
295            // 1. HEAD reponses, and Status 1xx, 204, and 304 cannot have a body.
296            // 2. Status 2xx to a CONNECT cannot have a body.
297            // 3. Transfer-Encoding: chunked has a chunked body.
298            // 4. If multiple differing Content-Length headers or invalid, close connection.
299            // 5. Content-Length header has a sized body.
300            // 6. Not Client.
301            // 7. Read till EOF.
302            let reader = if is_empty {
303                EmptyReader(stream)
304            } else {
305                if let Some(&TransferEncoding(ref codings)) = headers.get() {
306                    if codings.last() == Some(&Chunked) {
307                        ChunkedReader(stream, None)
308                    } else {
309                        trace!("not chuncked. read till eof");
310                        EofReader(stream)
311                    }
312                } else if let Some(&ContentLength(len)) =  headers.get() {
313                    SizedReader(stream, len)
314                } else if headers.has::<ContentLength>() {
315                    trace!("illegal Content-Length: {:?}", headers.get_raw("Content-Length"));
316                    res = Err(Error::Header);
317                    return Stream::Idle(stream.into_inner());
318                } else {
319                    trace!("neither Transfer-Encoding nor Content-Length");
320                    EofReader(stream)
321                }
322            };
323
324            trace!("Http11Message.reader = {:?}", reader);
325
326
327            res = Ok(ResponseHead {
328                headers: headers,
329                raw_status: raw_status,
330                version: head.version,
331            });
332
333            Stream::Reading(reader)
334        });
335        res
336    }
337
338    fn has_body(&self) -> bool {
339        match self.stream.as_ref().reader_ref() {
340            Some(&EmptyReader(..)) |
341            Some(&SizedReader(_, 0)) |
342            Some(&ChunkedReader(_, Some(0))) => false,
343            // specifically EofReader is always true
344            _ => true
345        }
346    }
347
348    #[inline]
349    fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
350        self.get_ref().set_read_timeout(dur)
351    }
352
353    #[inline]
354    fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
355        self.get_ref().set_write_timeout(dur)
356    }
357
358    #[inline]
359    fn close_connection(&mut self) -> crate::Result<()> {
360        self.get_mut().close(Shutdown::Both)?;
361        Ok(())
362    }
363
364    #[inline]
365    fn set_proxied(&mut self, val: bool) {
366        self.is_proxied = val;
367    }
368}
369
370impl Http11Message {
371    /// Consumes the `Http11Message` and returns the underlying `NetworkStream`.
372    pub fn into_inner(self) -> Box<dyn NetworkStream + Send> {
373        match self.stream.into_inner() {
374            Stream::Idle(stream) => stream,
375            Stream::Writing(stream) => stream.into_inner().into_inner().unwrap(),
376            Stream::Reading(stream) => stream.into_inner().into_inner(),
377        }
378    }
379
380    /// Gets a borrowed reference to the underlying `NetworkStream`, regardless of the state of the
381    /// `Http11Message`.
382    pub fn get_ref(&self) -> &(dyn NetworkStream + Send) {
383        match *self.stream.as_ref() {
384            Stream::Idle(ref stream) => &**stream,
385            Stream::Writing(ref stream) => &**stream.get_ref().get_ref(),
386            Stream::Reading(ref stream) => &**stream.get_ref().get_ref()
387        }
388    }
389
390    /// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the
391    /// `Http11Message`.
392    pub fn get_mut(&mut self) -> &mut (dyn NetworkStream + Send) {
393        match *self.stream.as_mut() {
394            Stream::Idle(ref mut stream) => &mut **stream,
395            Stream::Writing(ref mut stream) => &mut **stream.get_mut().get_mut(),
396            Stream::Reading(ref mut stream) => &mut **stream.get_mut().get_mut()
397        }
398    }
399
400    /// Creates a new `Http11Message` that will use the given `NetworkStream` for communicating to
401    /// the peer.
402    pub fn with_stream(stream: Box<dyn NetworkStream + Send>) -> Http11Message {
403        Http11Message {
404            is_proxied: false,
405            method: None,
406            stream: Wrapper::new(Stream::new(stream)),
407        }
408    }
409
410    /// Flushes the current outgoing content and moves the stream into the `stream` property.
411    ///
412    /// TODO It might be sensible to lift this up to the `HttpMessage` trait itself...
413    pub fn flush_outgoing(&mut self) -> crate::Result<()> {
414        let mut res = Ok(());
415        self.stream.map_in_place(|stream| {
416            let writer = match stream {
417                Stream::Writing(writer) => writer,
418                _ => {
419                    res = Ok(());
420                    return stream;
421                },
422            };
423            // end() already flushes
424            let raw = match writer.end() {
425                Ok(buf) => buf.into_inner().unwrap(),
426                Err(e) => {
427                    res = Err(From::from(e.0));
428                    return Stream::Writing(e.1);
429                }
430            };
431            Stream::Idle(raw)
432        });
433        res
434    }
435}
436
437/// The `Protocol` implementation provides HTTP/1.1 messages.
438pub struct Http11Protocol {
439    connector: Connector,
440}
441
442impl Protocol for Http11Protocol {
443    fn new_message(&self, host: &str, port: u16, scheme: &str) -> crate::Result<Box<dyn HttpMessage>> {
444        let stream = self.connector.connect(host, port, scheme)?.into();
445
446        Ok(Box::new(Http11Message::with_stream(stream)))
447    }
448}
449
450impl Http11Protocol {
451    /// Creates a new `Http11Protocol` instance that will use the given `NetworkConnector` for
452    /// establishing HTTP connections.
453    pub fn with_connector<C, S>(c: C) -> Http11Protocol
454            where C: NetworkConnector<Stream=S> + Send + Sync + 'static,
455                  S: NetworkStream + Send {
456        Http11Protocol {
457            connector: Connector(Box::new(ConnAdapter(c))),
458        }
459    }
460}
461
462struct ConnAdapter<C: NetworkConnector + Send + Sync>(C);
463
464impl<C: NetworkConnector<Stream=S> + Send + Sync, S: NetworkStream + Send>
465        NetworkConnector for ConnAdapter<C> {
466    type Stream = Box<dyn NetworkStream + Send>;
467    #[inline]
468    fn connect(&self, host: &str, port: u16, scheme: &str)
469        -> crate::Result<Box<dyn NetworkStream + Send>> {
470        Ok(self.0.connect(host, port, scheme)?.into())
471    }
472}
473
474struct Connector(Box<dyn NetworkConnector<Stream=Box<dyn NetworkStream + Send>> + Send + Sync>);
475
476impl NetworkConnector for Connector {
477    type Stream = Box<dyn NetworkStream + Send>;
478    #[inline]
479    fn connect(&self, host: &str, port: u16, scheme: &str)
480        -> crate::Result<Box<dyn NetworkStream + Send>> {
481        Ok(self.0.connect(host, port, scheme)?.into())
482    }
483}
484
485
486/// Readers to handle different Transfer-Encodings.
487///
488/// If a message body does not include a Transfer-Encoding, it *should*
489/// include a Content-Length header.
490pub enum HttpReader<R> {
491    /// A Reader used when a Content-Length header is passed with a positive integer.
492    SizedReader(R, u64),
493    /// A Reader used when Transfer-Encoding is `chunked`.
494    ChunkedReader(R, Option<u64>),
495    /// A Reader used for responses that don't indicate a length or chunked.
496    ///
497    /// Note: This should only used for `Response`s. It is illegal for a
498    /// `Request` to be made with both `Content-Length` and
499    /// `Transfer-Encoding: chunked` missing, as explained from the spec:
500    ///
501    /// > If a Transfer-Encoding header field is present in a response and
502    /// > the chunked transfer coding is not the final encoding, the
503    /// > message body length is determined by reading the connection until
504    /// > it is closed by the server.  If a Transfer-Encoding header field
505    /// > is present in a request and the chunked transfer coding is not
506    /// > the final encoding, the message body length cannot be determined
507    /// > reliably; the server MUST respond with the 400 (Bad Request)
508    /// > status code and then close the connection.
509    EofReader(R),
510    /// A Reader used for messages that should never have a body.
511    ///
512    /// See https://tools.ietf.org/html/rfc7230#section-3.3.3
513    EmptyReader(R),
514}
515
516impl<R: Read> HttpReader<R> {
517
518    /// Unwraps this HttpReader and returns the underlying Reader.
519    pub fn into_inner(self) -> R {
520        match self {
521            SizedReader(r, _) => r,
522            ChunkedReader(r, _) => r,
523            EofReader(r) => r,
524            EmptyReader(r) => r,
525        }
526    }
527
528    /// Gets a borrowed reference to the underlying Reader.
529    pub fn get_ref(&self) -> &R {
530        match *self {
531            SizedReader(ref r, _) => r,
532            ChunkedReader(ref r, _) => r,
533            EofReader(ref r) => r,
534            EmptyReader(ref r) => r,
535        }
536    }
537
538    /// Gets a mutable reference to the underlying Reader.
539    pub fn get_mut(&mut self) -> &mut R {
540        match *self {
541            SizedReader(ref mut r, _) => r,
542            ChunkedReader(ref mut r, _) => r,
543            EofReader(ref mut r) => r,
544            EmptyReader(ref mut r) => r,
545        }
546    }
547}
548
549impl<R> fmt::Debug for HttpReader<R> {
550    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
551        match *self {
552            SizedReader(_,rem) => write!(fmt, "SizedReader(remaining={:?})", rem),
553            ChunkedReader(_, None) => write!(fmt, "ChunkedReader(chunk_remaining=unknown)"),
554            ChunkedReader(_, Some(rem)) => write!(fmt, "ChunkedReader(chunk_remaining={:?})", rem),
555            EofReader(_) => write!(fmt, "EofReader"),
556            EmptyReader(_) => write!(fmt, "EmptyReader"),
557        }
558    }
559}
560
561impl<R: Read> Read for HttpReader<R> {
562    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
563        if buf.is_empty() {
564            return Ok(0);
565        }
566        match *self {
567            SizedReader(ref mut body, ref mut remaining) => {
568                trace!("Sized read, remaining={:?}", remaining);
569                if *remaining == 0 {
570                    Ok(0)
571                } else {
572                    let to_read = min(*remaining as usize, buf.len());
573                    let num = body.read(&mut buf[..to_read])? as u64;
574                    trace!("Sized read: {}", num);
575                    if num > *remaining {
576                        *remaining = 0;
577                    } else if num == 0 {
578                        return Err(io::Error::new(io::ErrorKind::Other, "early eof"));
579                    } else {
580                        *remaining -= num;
581                    }
582                    Ok(num as usize)
583                }
584            },
585            ChunkedReader(ref mut body, ref mut opt_remaining) => {
586                let mut rem = match *opt_remaining {
587                    Some(ref rem) => *rem,
588                    // None means we don't know the size of the next chunk
589                    None => read_chunk_size(body)?
590                };
591                trace!("Chunked read, remaining={:?}", rem);
592
593                if rem == 0 {
594                    if opt_remaining.is_none() {
595                        eat(body, LINE_ENDING.as_bytes())?;
596                    }
597
598                    *opt_remaining = Some(0);
599
600                    // chunk of size 0 signals the end of the chunked stream
601                    // if the 0 digit was missing from the stream, it would
602                    // be an InvalidInput error instead.
603                    trace!("end of chunked");
604
605                    return Ok(0)
606                }
607
608                let to_read = min(rem as usize, buf.len());
609                let count = body.read(&mut buf[..to_read])? as u64;
610
611                if count == 0 {
612                    *opt_remaining = Some(0);
613                    return Err(io::Error::new(io::ErrorKind::Other, "early eof"));
614                }
615
616                rem -= count;
617                *opt_remaining = if rem > 0 {
618                    Some(rem)
619                } else {
620                    eat(body, LINE_ENDING.as_bytes())?;
621                    None
622                };
623                Ok(count as usize)
624            },
625            EofReader(ref mut body) => {
626                let r = body.read(buf);
627                trace!("eofread: {:?}", r);
628                r
629            },
630            EmptyReader(_) => Ok(0)
631        }
632    }
633}
634
635fn eat<R: Read>(rdr: &mut R, bytes: &[u8]) -> io::Result<()> {
636    let mut buf = [0];
637    for &b in bytes.iter() {
638        match rdr.read(&mut buf)? {
639            1 if buf[0] == b => (),
640            _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
641                                          "Invalid characters found")),
642        }
643    }
644    Ok(())
645}
646
647/// Chunked chunks start with 1*HEXDIGIT, indicating the size of the chunk.
648fn read_chunk_size<R: Read>(rdr: &mut R) -> io::Result<u64> {
649    macro_rules! byte (
650        ($rdr:ident) => ({
651            let mut buf = [0];
652            match $rdr.read(&mut buf)? {
653                1 => buf[0],
654                _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
655                                                  "Invalid chunk size line")),
656
657            }
658        })
659    );
660    let mut size = 0u64;
661    let radix = 16;
662    let mut in_ext = false;
663    let mut in_chunk_size = true;
664    loop {
665        match byte!(rdr) {
666            b@b'0'..=b'9' if in_chunk_size => {
667                size *= radix;
668                size += (b - b'0') as u64;
669            },
670            b@b'a'..=b'f' if in_chunk_size => {
671                size *= radix;
672                size += (b + 10 - b'a') as u64;
673            },
674            b@b'A'..=b'F' if in_chunk_size => {
675                size *= radix;
676                size += (b + 10 - b'A') as u64;
677            },
678            CR => {
679                match byte!(rdr) {
680                    LF => break,
681                    _ => return Err(io::Error::new(io::ErrorKind::InvalidInput,
682                                                  "Invalid chunk size line"))
683
684                }
685            },
686            // If we weren't in the extension yet, the ";" signals its start
687            b';' if !in_ext => {
688                in_ext = true;
689                in_chunk_size = false;
690            },
691            // "Linear white space" is ignored between the chunk size and the
692            // extension separator token (";") due to the "implied *LWS rule".
693            b'\t' | b' ' if !in_ext & !in_chunk_size => {},
694            // LWS can follow the chunk size, but no more digits can come
695            b'\t' | b' ' if in_chunk_size => in_chunk_size = false,
696            // We allow any arbitrary octet once we are in the extension, since
697            // they all get ignored anyway. According to the HTTP spec, valid
698            // extensions would have a more strict syntax:
699            //     (token ["=" (token | quoted-string)])
700            // but we gain nothing by rejecting an otherwise valid chunk size.
701            ext if in_ext => {
702                todo!("chunk extension byte={}", ext);
703            },
704            // Finally, if we aren't in the extension and we're reading any
705            // other octet, the chunk size line is invalid!
706            _ => {
707                return Err(io::Error::new(io::ErrorKind::InvalidInput,
708                                         "Invalid chunk size line"));
709            }
710        }
711    }
712    trace!("chunk size={:?}", size);
713    Ok(size)
714}
715
716fn should_have_response_body(method: &Method, status: u16) -> bool {
717    trace!("should_have_response_body({:?}, {})", method, status);
718    match (method, status) {
719        (&Method::Head, _) |
720        (_, 100..=199) |
721        (_, 204) |
722        (_, 304) |
723        (&Method::Connect, 200..=299) => false,
724        _ => true
725    }
726}
727
728/// Writers to handle different Transfer-Encodings.
729pub enum HttpWriter<W: Write> {
730    /// A no-op Writer, used initially before Transfer-Encoding is determined.
731    ThroughWriter(W),
732    /// A Writer for when Transfer-Encoding includes `chunked`.
733    ChunkedWriter(W),
734    /// A Writer for when Content-Length is set.
735    ///
736    /// Enforces that the body is not longer than the Content-Length header.
737    SizedWriter(W, u64),
738    /// A writer that should not write any body.
739    EmptyWriter(W),
740}
741
742impl<W: Write> HttpWriter<W> {
743    /// Unwraps the HttpWriter and returns the underlying Writer.
744    #[inline]
745    pub fn into_inner(self) -> W {
746        match self {
747            ThroughWriter(w) => w,
748            ChunkedWriter(w) => w,
749            SizedWriter(w, _) => w,
750            EmptyWriter(w) => w,
751        }
752    }
753
754    /// Access the inner Writer.
755    #[inline]
756    pub fn get_ref(&self) -> &W {
757        match *self {
758            ThroughWriter(ref w) => w,
759            ChunkedWriter(ref w) => w,
760            SizedWriter(ref w, _) => w,
761            EmptyWriter(ref w) => w,
762        }
763    }
764
765    /// Access the inner Writer mutably.
766    ///
767    /// Warning: You should not write to this directly, as you can corrupt
768    /// the state.
769    #[inline]
770    pub fn get_mut(&mut self) -> &mut W {
771        match *self {
772            ThroughWriter(ref mut w) => w,
773            ChunkedWriter(ref mut w) => w,
774            SizedWriter(ref mut w, _) => w,
775            EmptyWriter(ref mut w) => w,
776        }
777    }
778
779    /// Ends the HttpWriter, and returns the underlying Writer.
780    ///
781    /// A final `write_all()` is called with an empty message, and then flushed.
782    /// The ChunkedWriter variant will use this to write the 0-sized last-chunk.
783    #[inline]
784    pub fn end(mut self) -> Result<W, EndError<W>> {
785        fn inner<W: Write>(w: &mut W) -> io::Result<()> {
786            w.write(&[])?;
787            w.flush()
788        }
789
790        match inner(&mut self) {
791            Ok(..) => Ok(self.into_inner()),
792            Err(e) => Err(EndError(e, self))
793        }
794    }
795}
796
797#[derive(Debug)]
798pub struct EndError<W: Write>(io::Error, HttpWriter<W>);
799
800impl<W: Write> From<EndError<W>> for io::Error {
801    fn from(e: EndError<W>) -> io::Error {
802        e.0
803    }
804}
805
806impl<W: Write> Write for HttpWriter<W> {
807    #[inline]
808    fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
809        match *self {
810            ThroughWriter(ref mut w) => w.write(msg),
811            ChunkedWriter(ref mut w) => {
812                let chunk_size = msg.len();
813                trace!("chunked write, size = {:?}", chunk_size);
814                write!(w, "{:X}{}", chunk_size, LINE_ENDING)?;
815                w.write_all(msg)?;
816                w.write_all(LINE_ENDING.as_bytes())?;
817                Ok(msg.len())
818            },
819            SizedWriter(ref mut w, ref mut remaining) => {
820                let len = msg.len() as u64;
821                if len > *remaining {
822                    let len = *remaining;
823                    w.write_all(&msg[..len as usize])?;
824                    *remaining = 0;
825                    Ok(len as usize)
826                } else {
827                    w.write_all(msg)?;
828                    *remaining -= len;
829                    Ok(len as usize)
830                }
831            },
832            EmptyWriter(..) => {
833                if !msg.is_empty() {
834                    error!("Cannot include a body with this kind of message");
835                }
836                Ok(0)
837            }
838        }
839    }
840
841    #[inline]
842    fn flush(&mut self) -> io::Result<()> {
843        match *self {
844            ThroughWriter(ref mut w) => w.flush(),
845            ChunkedWriter(ref mut w) => w.flush(),
846            SizedWriter(ref mut w, _) => w.flush(),
847            EmptyWriter(ref mut w) => w.flush(),
848        }
849    }
850}
851
852impl<W: Write> fmt::Debug for HttpWriter<W> {
853    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
854        match *self {
855            ThroughWriter(_) => write!(fmt, "ThroughWriter"),
856            ChunkedWriter(_) => write!(fmt, "ChunkedWriter"),
857            SizedWriter(_, rem) => write!(fmt, "SizedWriter(remaining={:?})", rem),
858            EmptyWriter(_) => write!(fmt, "EmptyWriter"),
859        }
860    }
861}
862
863const MAX_HEADERS: usize = 100;
864
865/// Parses a request into an Incoming message head.
866#[inline]
867pub fn parse_request<R: Read>(buf: &mut BufReader<R>) -> crate::Result<Incoming<(Method, RequestUri)>> {
868    parse::<R, httparse::Request, (Method, RequestUri)>(buf)
869}
870
871/// Parses a response into an Incoming message head.
872#[inline]
873pub fn parse_response<R: Read>(buf: &mut BufReader<R>) -> crate::Result<Incoming<RawStatus>> {
874    parse::<R, httparse::Response, RawStatus>(buf)
875}
876
877fn parse<R: Read, T: TryParse<Subject=I>, I>(rdr: &mut BufReader<R>) -> crate::Result<Incoming<I>> {
878    loop {
879        match try_parse::<R, T, I>(rdr)? {
880            httparse::Status::Complete((inc, len)) => {
881                rdr.consume(len);
882                return Ok(inc);
883            },
884            _partial => ()
885        }
886        let n = rdr.read_into_buf()?;
887        if n == 0 {
888            let buffered = rdr.get_buf().len();
889            if buffered == crate::buffer::MAX_BUFFER_SIZE {
890                return Err(Error::TooLarge);
891            } else {
892                return Err(Error::Io(io::Error::new(
893                    io::ErrorKind::UnexpectedEof,
894                    "end of stream before headers finished"
895                )));
896            }
897        }
898    }
899}
900
901fn try_parse<R: Read, T: TryParse<Subject=I>, I>(rdr: &mut BufReader<R>) -> TryParseResult<I> {
902    let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
903    let buf = rdr.get_buf();
904    if buf.len() == 0 {
905        return Ok(httparse::Status::Partial);
906    }
907    trace!("try_parse({:?})", buf);
908    <T as TryParse>::try_parse(&mut headers, buf)
909}
910
911#[doc(hidden)]
912trait TryParse {
913    type Subject;
914    fn try_parse<'a>(headers: &'a mut [httparse::Header<'a>], buf: &'a [u8]) ->
915        TryParseResult<Self::Subject>;
916}
917
918type TryParseResult<T> = Result<httparse::Status<(Incoming<T>, usize)>, Error>;
919
920impl<'a> TryParse for httparse::Request<'a, 'a> {
921    type Subject = (Method, RequestUri);
922
923    fn try_parse<'b>(headers: &'b mut [httparse::Header<'b>], buf: &'b [u8]) ->
924            TryParseResult<(Method, RequestUri)> {
925        trace!("Request.try_parse([Header; {}], [u8; {}])", headers.len(), buf.len());
926        let mut req = httparse::Request::new(headers);
927        Ok(match req.parse(buf)? {
928            httparse::Status::Complete(len) => {
929                trace!("Request.try_parse Complete({})", len);
930                httparse::Status::Complete((Incoming {
931                    version: if req.version.unwrap() == 1 { Http11 } else { Http10 },
932                    subject: (
933                        req.method.unwrap().parse()?,
934                        req.path.unwrap().parse()?),
935                    headers: Headers::from_raw(req.headers)?
936                }, len))
937            },
938            httparse::Status::Partial => httparse::Status::Partial
939        })
940    }
941}
942
943impl<'a> TryParse for httparse::Response<'a, 'a> {
944    type Subject = RawStatus;
945
946    fn try_parse<'b>(headers: &'b mut [httparse::Header<'b>], buf: &'b [u8]) ->
947            TryParseResult<RawStatus> {
948        trace!("Response.try_parse([Header; {}], [u8; {}])", headers.len(), buf.len());
949        let mut res = httparse::Response::new(headers);
950        Ok(match res.parse(buf)? {
951            httparse::Status::Complete(len) => {
952                trace!("Response.try_parse Complete({})", len);
953                let code = res.code.unwrap();
954                let reason = match StatusCode::from_u16(code).canonical_reason() {
955                    Some(reason) if reason == res.reason.unwrap() => Cow::Borrowed(reason),
956                    _ => Cow::Owned(res.reason.unwrap().to_owned())
957                };
958                httparse::Status::Complete((Incoming {
959                    version: if res.version.unwrap() == 1 { Http11 } else { Http10 },
960                    subject: RawStatus(code, reason),
961                    headers: Headers::from_raw(res.headers)?
962                }, len))
963            },
964            httparse::Status::Partial => httparse::Status::Partial
965        })
966    }
967}
968
969/// An Incoming Message head. Includes request/status line, and headers.
970#[derive(Debug)]
971pub struct Incoming<S> {
972    /// HTTP version of the message.
973    pub version: HttpVersion,
974    /// Subject (request line or status line) of Incoming message.
975    pub subject: S,
976    /// Headers of the Incoming message.
977    pub headers: Headers
978}
979
980/// The `\r` byte.
981pub const CR: u8 = b'\r';
982/// The `\n` byte.
983pub const LF: u8 = b'\n';
984/// The bytes `\r\n`.
985pub const LINE_ENDING: &'static str = "\r\n";
986
987#[cfg(test)]
988mod tests {
989    use std::error::Error;
990    use std::io::{self, Read, Write};
991
992
993    use crate::buffer::BufReader;
994    use crate::mock::MockStream;
995    use crate::http::HttpMessage;
996
997    use super::{read_chunk_size, parse_request, parse_response, Http11Message};
998
999    #[test]
1000    fn test_write_chunked() {
1001        use std::str::from_utf8;
1002        let mut w = super::HttpWriter::ChunkedWriter(Vec::new());
1003        w.write_all(b"foo bar").unwrap();
1004        w.write_all(b"baz quux herp").unwrap();
1005        let buf = w.end().unwrap();
1006        let s = from_utf8(buf.as_ref()).unwrap();
1007        assert_eq!(s, "7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n");
1008    }
1009
1010    #[test]
1011    fn test_write_sized() {
1012        use std::str::from_utf8;
1013        let mut w = super::HttpWriter::SizedWriter(Vec::new(), 8);
1014        w.write_all(b"foo bar").unwrap();
1015        assert_eq!(w.write(b"baz").unwrap(), 1);
1016
1017        let buf = w.end().unwrap();
1018        let s = from_utf8(buf.as_ref()).unwrap();
1019        assert_eq!(s, "foo barb");
1020    }
1021
1022    #[test]
1023    fn test_read_chunk_size() {
1024        fn read(s: &str, result: u64) {
1025            assert_eq!(read_chunk_size(&mut s.as_bytes()).unwrap(), result);
1026        }
1027
1028        fn read_err(s: &str) {
1029            assert_eq!(read_chunk_size(&mut s.as_bytes()).unwrap_err().kind(),
1030                io::ErrorKind::InvalidInput);
1031        }
1032
1033        read("1\r\n", 1);
1034        read("01\r\n", 1);
1035        read("0\r\n", 0);
1036        read("00\r\n", 0);
1037        read("A\r\n", 10);
1038        read("a\r\n", 10);
1039        read("Ff\r\n", 255);
1040        read("Ff   \r\n", 255);
1041        // Missing LF or CRLF
1042        read_err("F\rF");
1043        read_err("F");
1044        // Invalid hex digit
1045        read_err("X\r\n");
1046        read_err("1X\r\n");
1047        read_err("-\r\n");
1048        read_err("-1\r\n");
1049        // Acceptable (if not fully valid) extensions do not influence the size
1050        read("1;extension\r\n", 1);
1051        read("a;ext name=value\r\n", 10);
1052        read("1;extension;extension2\r\n", 1);
1053        read("1;;;  ;\r\n", 1);
1054        read("2; extension...\r\n", 2);
1055        read("3   ; extension=123\r\n", 3);
1056        read("3   ;\r\n", 3);
1057        read("3   ;   \r\n", 3);
1058        // Invalid extensions cause an error
1059        read_err("1 invalid extension\r\n");
1060        read_err("1 A\r\n");
1061        read_err("1;no CRLF");
1062    }
1063
1064    #[test]
1065    fn test_read_sized_early_eof() {
1066        let mut r = super::HttpReader::SizedReader(MockStream::with_input(b"foo bar"), 10);
1067        let mut buf = [0u8; 10];
1068        assert_eq!(r.read(&mut buf).unwrap(), 7);
1069        let e = r.read(&mut buf).unwrap_err();
1070        assert_eq!(e.kind(), io::ErrorKind::Other);
1071        assert_eq!(e.description(), "early eof");
1072    }
1073
1074    #[test]
1075    fn test_read_chunked_early_eof() {
1076        let mut r = super::HttpReader::ChunkedReader(MockStream::with_input(b"\
1077            9\r\n\
1078            foo bar\
1079        "), None);
1080
1081        let mut buf = [0u8; 10];
1082        assert_eq!(r.read(&mut buf).unwrap(), 7);
1083        let e = r.read(&mut buf).unwrap_err();
1084        assert_eq!(e.kind(), io::ErrorKind::Other);
1085        assert_eq!(e.description(), "early eof");
1086    }
1087
1088    #[test]
1089    fn test_read_sized_zero_len_buf() {
1090        let mut r = super::HttpReader::SizedReader(MockStream::with_input(b"foo bar"), 7);
1091        let mut buf = [0u8; 0];
1092        assert_eq!(r.read(&mut buf).unwrap(), 0);
1093    }
1094
1095    #[test]
1096    fn test_read_chunked_zero_len_buf() {
1097        let mut r = super::HttpReader::ChunkedReader(MockStream::with_input(b"\
1098            7\r\n\
1099            foo bar\
1100            0\r\n\r\n\
1101        "), None);
1102
1103        let mut buf = [0u8; 0];
1104        assert_eq!(r.read(&mut buf).unwrap(), 0);
1105    }
1106
1107    #[test]
1108    fn test_read_chunked_fully_consumes() {
1109        let mut r = super::HttpReader::ChunkedReader(MockStream::with_input(b"0\r\n\r\n"), None);
1110        let mut buf = [0; 1];
1111        assert_eq!(r.read(&mut buf).unwrap(), 0);
1112        assert_eq!(r.read(&mut buf).unwrap(), 0);
1113
1114        match r {
1115            super::HttpReader::ChunkedReader(mut r, _) => assert_eq!(r.read(&mut buf).unwrap(), 0),
1116            _ => unreachable!(),
1117        }
1118    }
1119
1120    #[test]
1121    fn test_message_get_incoming_invalid_content_length() {
1122        let raw = MockStream::with_input(
1123            b"HTTP/1.1 200 OK\r\nContent-Length: asdf\r\n\r\n");
1124        let mut msg = Http11Message::with_stream(Box::new(raw));
1125        assert!(msg.get_incoming().is_err());
1126        assert!(msg.close_connection().is_ok());
1127    }
1128
1129    #[test]
1130    fn test_parse_incoming() {
1131        let mut raw = MockStream::with_input(b"GET /echo HTTP/1.1\r\nHost: mco_http.rs\r\n\r\n");
1132        let mut buf = BufReader::new(&mut raw);
1133        parse_request(&mut buf).unwrap();
1134    }
1135
1136    #[test]
1137    fn test_parse_raw_status() {
1138        let mut raw = MockStream::with_input(b"HTTP/1.1 200 OK\r\n\r\n");
1139        let mut buf = BufReader::new(&mut raw);
1140        let res = parse_response(&mut buf).unwrap();
1141
1142        assert_eq!(res.subject.1, "OK");
1143
1144        let mut raw = MockStream::with_input(b"HTTP/1.1 200 Howdy\r\n\r\n");
1145        let mut buf = BufReader::new(&mut raw);
1146        let res = parse_response(&mut buf).unwrap();
1147
1148        assert_eq!(res.subject.1, "Howdy");
1149    }
1150
1151
1152    #[test]
1153    fn test_parse_tcp_closed() {
1154        use std::io::ErrorKind;
1155        use crate::error::Error;
1156
1157        let mut empty = MockStream::new();
1158        let mut buf = BufReader::new(&mut empty);
1159        match parse_request(&mut buf) {
1160            Err(Error::Io(ref e)) if e.kind() == ErrorKind::UnexpectedEof => (),
1161            other => panic!("unexpected result: {:?}", other)
1162        }
1163    }
1164
1165    #[cfg(feature = "nightly")]
1166    use test::Bencher;
1167
1168    #[cfg(feature = "nightly")]
1169    #[bench]
1170    fn bench_parse_incoming(b: &mut Bencher) {
1171        let mut raw = MockStream::with_input(b"GET /echo HTTP/1.1\r\nHost: mco_http.rs\r\n\r\n");
1172        let mut buf = BufReader::new(&mut raw);
1173        b.iter(|| {
1174            parse_request(&mut buf).unwrap();
1175            buf.get_mut().read.set_position(0);
1176        });
1177    }
1178}