http_codec/
client.rs

1use std::mem;
2
3use atoi::atoi;
4use bytes::{Bytes, BytesMut};
5use http::{header, response, Request, Response};
6use httparse;
7use tokio_codec::{Decoder, Encoder};
8
9use error;
10
11#[derive(Debug)]
12pub struct HttpCodec {
13    state: State,
14}
15
16impl HttpCodec {
17    pub fn new() -> HttpCodec {
18        HttpCodec {
19            state: State::ParsingResponse,
20        }
21    }
22}
23
/// Decoding progress for the response currently being read.
#[derive(Debug)]
enum State {
    /// Waiting for a complete status line and header block.
    ParsingResponse,
    /// The head has been parsed. Holds the response parts and the body length
    /// in bytes (taken from `Content-Length`) that must be buffered before the
    /// response can be emitted.
    ReadingBody(response::Parts, usize),
}
29
30impl Encoder for HttpCodec {
31    type Item = Request<Bytes>;
32    type Error = error::Error;
33
34    fn encode(&mut self, item: Request<Bytes>, dst: &mut BytesMut) -> error::Result<()> {
35        dst.extend_from_slice(item.method().as_str().as_bytes());
36        dst.extend_from_slice(b" ");
37        dst.extend_from_slice(item.uri().path().as_bytes());
38        dst.extend_from_slice(b" HTTP/1.1\r\n");
39
40        for (k, v) in item.headers() {
41            dst.extend_from_slice(k.as_str().as_bytes());
42            dst.extend_from_slice(b": ");
43            dst.extend_from_slice(v.as_bytes());
44            dst.extend_from_slice(b"\r\n");
45        }
46
47        dst.extend_from_slice(b"\r\n");
48        dst.extend_from_slice(item.body());
49
50        Ok(())
51    }
52}
53
54impl Decoder for HttpCodec {
55    type Item = Response<Bytes>;
56    type Error = error::Error;
57
58    fn decode(&mut self, src: &mut BytesMut) -> error::Result<Option<Response<Bytes>>> {
59        use self::State::*;
60
61        if src.len() == 0 {
62            return Ok(None);
63        }
64
65        loop {
66            match mem::replace(&mut self.state, ParsingResponse) {
67                ParsingResponse => {
68                    let amt = {
69                        let mut headers = [httparse::EMPTY_HEADER; 16];
70                        let mut response = httparse::Response::new(&mut headers);
71                        let amt = match response.parse(src)? {
72                            httparse::Status::Complete(amt) => amt,
73                            httparse::Status::Partial => return Ok(None),
74                        };
75                        match response.version.unwrap() {
76                            1 => (),
77                            version => return Err(error::Error::VersionError(version)),
78                        }
79                        let mut builder = Response::builder();
80                        builder.status(response.code.unwrap());
81                        for header in response.headers.iter() {
82                            builder.header(header.name, header.value);
83                        }
84                        let r = builder.body(()).unwrap();
85                        let cl = match r.headers().get(header::CONTENT_LENGTH) {
86                            Some(cl) => match atoi(cl.as_bytes()) {
87                                Some(cl) => cl,
88                                None => return Err(error::Error::ContentLengthError),
89                            },
90                            None => return Err(error::Error::ContentLengthError),
91                        };
92                        let (parts, _) = r.into_parts();
93                        self.state = ReadingBody(parts, cl);
94                        amt
95                    };
96                    src.advance(amt);
97                }
98                ReadingBody(parts, cl) => {
99                    if src.len() < cl {
100                        self.state = ReadingBody(parts, cl);
101                        return Ok(None);
102                    }
103                    let body = src.split_to(cl).freeze();
104                    return Ok(Some(Response::from_parts(parts, body)));
105                }
106            }
107        }
108    }
109}
110
#[cfg(test)]
mod tests {
    extern crate fake_stream;
    extern crate futures;

    use self::fake_stream::FakeStream;
    use self::futures::{Async, Future, Sink, Stream};
    use super::*;
    use http::{header, Method};
    use std::io::{Read, Write};
    use std::str::from_utf8;

    /// A complete response written to the stream is decoded in one poll.
    #[test]
    fn test_decode() {
        let raw = b"\
            HTTP/1.1 200 OK\r\n\
            Date: Mon, 27 Jul 2009 12:28:53 GMT\r\n\
            Server: Apache/2.2.14 (Win32)\r\n\
            Last-Modified: Wed, 22 Jul 2009 19:15:56 GMT\r\n\
            Content-Length: 9\r\n\
            Content-Type: text/html\r\n\r\n\
            something";

        // Preload the fake stream with the whole response.
        let mut stream = FakeStream::new();
        let written = stream.write(raw).unwrap();
        assert_eq!(raw.len(), written);

        let mut framed = HttpCodec::new().framed(stream);
        let response = match framed.poll().unwrap() {
            Async::Ready(Some(response)) => response,
            _ => panic!("no response"),
        };

        assert_eq!(response.status().as_u16(), 200);
        assert_eq!(response.headers().len(), 5);
        assert_eq!(response.body(), &Bytes::from_static(b"something"));
    }

    /// A request sent through the codec appears on the stream in wire format.
    #[test]
    fn test_encode() {
        let expected = "\
            POST /cgi-bin/process.cgi HTTP/1.1\r\n\
            connection: Keep-Alive\r\n\
            content-length: 9\r\n\r\n\
            something";

        let request = Request::builder()
            .method(Method::POST)
            .uri("/cgi-bin/process.cgi")
            .header(header::CONNECTION, "Keep-Alive")
            .header(header::CONTENT_LENGTH, 9)
            .body(Bytes::from_static(b"something"))
            .unwrap();

        // Send the request, then take the underlying stream back to inspect it.
        let sink = HttpCodec::new().framed(FakeStream::new());
        let sink = sink.send(request).wait().unwrap();
        let mut stream = sink.into_inner();

        let mut wire = vec![0; expected.len()];
        let read = stream.read(&mut wire).unwrap();

        assert_eq!(read, expected.len());
        assert_eq!(from_utf8(&wire).unwrap(), expected);
    }
}