1use std::mem;
2
3use atoi::atoi;
4use bytes::{Bytes, BytesMut};
5use http::{header, response, Request, Response};
6use httparse;
7use tokio_codec::{Decoder, Encoder};
8
9use error;
10
11#[derive(Debug)]
12pub struct HttpCodec {
13 state: State,
14}
15
16impl HttpCodec {
17 pub fn new() -> HttpCodec {
18 HttpCodec {
19 state: State::ParsingResponse,
20 }
21 }
22}
23
24#[derive(Debug)]
25enum State {
26 ParsingResponse,
27 ReadingBody(response::Parts, usize),
28}
29
30impl Encoder for HttpCodec {
31 type Item = Request<Bytes>;
32 type Error = error::Error;
33
34 fn encode(&mut self, item: Request<Bytes>, dst: &mut BytesMut) -> error::Result<()> {
35 dst.extend_from_slice(item.method().as_str().as_bytes());
36 dst.extend_from_slice(b" ");
37 dst.extend_from_slice(item.uri().path().as_bytes());
38 dst.extend_from_slice(b" HTTP/1.1\r\n");
39
40 for (k, v) in item.headers() {
41 dst.extend_from_slice(k.as_str().as_bytes());
42 dst.extend_from_slice(b": ");
43 dst.extend_from_slice(v.as_bytes());
44 dst.extend_from_slice(b"\r\n");
45 }
46
47 dst.extend_from_slice(b"\r\n");
48 dst.extend_from_slice(item.body());
49
50 Ok(())
51 }
52}
53
54impl Decoder for HttpCodec {
55 type Item = Response<Bytes>;
56 type Error = error::Error;
57
58 fn decode(&mut self, src: &mut BytesMut) -> error::Result<Option<Response<Bytes>>> {
59 use self::State::*;
60
61 if src.len() == 0 {
62 return Ok(None);
63 }
64
65 loop {
66 match mem::replace(&mut self.state, ParsingResponse) {
67 ParsingResponse => {
68 let amt = {
69 let mut headers = [httparse::EMPTY_HEADER; 16];
70 let mut response = httparse::Response::new(&mut headers);
71 let amt = match response.parse(src)? {
72 httparse::Status::Complete(amt) => amt,
73 httparse::Status::Partial => return Ok(None),
74 };
75 match response.version.unwrap() {
76 1 => (),
77 version => return Err(error::Error::VersionError(version)),
78 }
79 let mut builder = Response::builder();
80 builder.status(response.code.unwrap());
81 for header in response.headers.iter() {
82 builder.header(header.name, header.value);
83 }
84 let r = builder.body(()).unwrap();
85 let cl = match r.headers().get(header::CONTENT_LENGTH) {
86 Some(cl) => match atoi(cl.as_bytes()) {
87 Some(cl) => cl,
88 None => return Err(error::Error::ContentLengthError),
89 },
90 None => return Err(error::Error::ContentLengthError),
91 };
92 let (parts, _) = r.into_parts();
93 self.state = ReadingBody(parts, cl);
94 amt
95 };
96 src.advance(amt);
97 }
98 ReadingBody(parts, cl) => {
99 if src.len() < cl {
100 self.state = ReadingBody(parts, cl);
101 return Ok(None);
102 }
103 let body = src.split_to(cl).freeze();
104 return Ok(Some(Response::from_parts(parts, body)));
105 }
106 }
107 }
108 }
109}
110
111#[cfg(test)]
112mod tests {
113 extern crate fake_stream;
114 extern crate futures;
115
116 use self::fake_stream::FakeStream;
117 use self::futures::{Async, Future, Sink, Stream};
118 use super::*;
119 use http::{header, Method};
120 use std::io::{Read, Write};
121 use std::str::from_utf8;
122
123 #[test]
124 fn test_decode() {
125 let mut fake = FakeStream::new();
126 let res = b"\
127 HTTP/1.1 200 OK\r\n\
128 Date: Mon, 27 Jul 2009 12:28:53 GMT\r\n\
129 Server: Apache/2.2.14 (Win32)\r\n\
130 Last-Modified: Wed, 22 Jul 2009 19:15:56 GMT\r\n\
131 Content-Length: 9\r\n\
132 Content-Type: text/html\r\n\r\n\
133 something";
134 let wl = fake.write(res).unwrap();
135
136 assert_eq!(res.len(), wl);
137
138 let mut framed = HttpCodec::new().framed(fake);
139
140 let response = match framed.poll().unwrap() {
141 Async::Ready(Some(response)) => response,
142 _ => panic!("no response"),
143 };
144
145 assert_eq!(response.status().as_u16(), 200);
146 assert_eq!(response.headers().len(), 5);
147 assert_eq!(response.body(), &Bytes::from_static(b"something"));
148 }
149
150 #[test]
151 fn test_encode() {
152 let fake = FakeStream::new();
153 let expected = "\
154 POST /cgi-bin/process.cgi HTTP/1.1\r\n\
155 connection: Keep-Alive\r\n\
156 content-length: 9\r\n\r\n\
157 something";
158
159 let mut buf = vec![0; expected.len()];
160
161 let req = Request::builder()
162 .method(Method::POST)
163 .uri("/cgi-bin/process.cgi")
164 .header(header::CONNECTION, "Keep-Alive")
165 .header(header::CONTENT_LENGTH, 9)
166 .body(Bytes::from_static(b"something"))
167 .unwrap();
168
169 let framed = HttpCodec::new().framed(fake);
170
171 let framed = framed.send(req).wait().unwrap();
172
173 let mut fake = framed.into_inner();
174
175 let rl = fake.read(&mut buf).unwrap();
176
177 assert_eq!(rl, expected.len());
178 assert_eq!(from_utf8(&buf).unwrap(), expected);
179 }
180}