1pub extern crate futures;
31pub extern crate tokio_core;
32
33pub extern crate url;
34
35#[macro_use]
36extern crate nom;
37
38use std::borrow::Cow;
39use std::fmt;
40use std::io::{self, Error, ErrorKind, Write};
41use std::net::{SocketAddr, ToSocketAddrs};
42
43use futures::{Future, Sink, Stream};
44
45use tokio_core::io::{EasyBuf, Codec, Framed, Io, IoFuture};
46
47pub mod prelude {
49 pub use tokio_core::io::Io;
50 pub use tokio_core::net::TcpStream;
51 pub use tokio_core::reactor::Core;
52
53 pub use futures::{Future, Sink, Stream, IntoFuture};
54 pub use futures::future::{BoxFuture, empty, err, lazy, ok, result};
55}
56
57use url::{Url, ParseError};
58
59use nom::IResult;
60
61mod parser;
62mod response;
63
64pub use response::{HttpResponse, Header};
65
/// An HTTP request: target URL, method, header list and raw body bytes.
pub struct HttpRequest {
    /// Parsed URL the request is addressed to.
    url: Url,
    /// Method written at the start of the request line.
    method: Method,
    /// Header name/value pairs, stored in the order they were added.
    headers: Vec<(Cow<'static, str>, Cow<'static, str>)>,
    /// Raw body bytes.
    body: Vec<u8>
}
73
/// HTTP request method.
///
/// The common methods have dedicated variants; any other method token can be
/// carried verbatim in `Other`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Method {
    Get,
    Head,
    Post,
    Put,
    Delete,
    Connect,
    Options,
    Trace,
    /// A method not covered by the variants above, stored as written.
    Other(String)
}
86
87impl fmt::Display for Method {
88 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
89 use Method::*;
90 match *self {
91 Get => write!(f, "GET"),
92 Head => write!(f, "HEAD"),
93 Post => write!(f, "POST"),
94 Put => write!(f, "PUT"),
95 Delete => write!(f, "DELETE"),
96 Connect => write!(f, "CONNECT"),
97 Options => write!(f, "OPTIONS"),
98 Trace => write!(f, "TRACE"),
99 Other(ref other) => write!(f, "{}", other)
100 }
101 }
102}
103
104impl HttpRequest {
105 pub fn new<U: AsRef<str>>(method: Method, url: U) -> Result<HttpRequest, ParseError> {
107 url.as_ref().parse().map(|url: Url| {
108 use std::fmt::Write;
109
110 let mut host = url.host_str().unwrap_or("").to_string();
111 if let Some(port) = url.port() {
112 write!(host, ":{}", port).unwrap();
113 }
114
115 HttpRequest {
116 url: url,
117 method: method,
118 headers: vec![],
119 body: vec![]
120 }.header("Host", host)
121 })
122 }
123
124 pub fn header<K: Into<Cow<'static, str>>, V: Into<Cow<'static, str>>>(mut self, name: K, value: V) -> HttpRequest {
125 self.headers.push((name.into(), value.into()));
126 self
127 }
128
129 pub fn get<U: AsRef<str>>(url: U) -> Result<HttpRequest, ParseError> {
130 Self::new(Method::Get, url)
131 }
132
133 pub fn post<U: AsRef<str>, I: Into<Vec<u8>>>(url: U, body: I) -> Result<HttpRequest, ParseError> {
134 let bytes = body.into();
135 let mut req = Self::new(Method::Post, url)?.header("Content-Length", bytes.len().to_string());
136 req.body = bytes;
137 Ok(req)
138 }
139
140 pub fn addr(&self) -> Result<SocketAddr, Error> {
141 let mut addrs = self.url.to_socket_addrs()?;
142 addrs.next().ok_or(Error::new(ErrorKind::UnexpectedEof, "no address"))
143 }
144
145 pub fn send<T: 'static + Io + Send>(self, framed: Framed<T, HttpCodec>) -> IoFuture<(Option<HttpResponse>, Framed<T, HttpCodec>)> {
147 framed.send(self).and_then(|framed| {
148 framed.into_future().and_then(|(res, stream)| {
149 Ok((res, stream))
150 }).map_err(|(err, _stream)| err)
151 }).boxed()
152 }
153}
154
155impl fmt::Display for HttpRequest {
156 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157 write!(f, "{} {}", self.method, self.url.path())?;
159 if let Some(query) = self.url.query() {
160 write!(f, "?{}", query)?;
161 }
162 if let Some(fragment) = self.url.fragment() {
163 write!(f, "#{}", fragment)?;
164 }
165 write!(f, " HTTP/1.1\r\n")?;
166
167 for &(ref name, ref value) in &self.headers {
169 write!(f, "{}: {}\r\n", name, value)?;
170 }
171 write!(f, "\r\n")
172 }
173}
174
175#[derive(Debug)]
177pub struct HttpCodec {
178 response: Option<HttpResponse>,
179 bytes_left: usize
180}
181
182impl HttpCodec {
183 pub fn new() -> HttpCodec {
185 HttpCodec {
186 response: None,
187 bytes_left: 0
188 }
189 }
190
191 fn decode_header(&mut self, buf: &mut EasyBuf) -> Result<Option<HttpResponse>, Error> {
192 let (bytes_left, response) = match parser::response(buf.as_ref()) {
193 IResult::Incomplete(_) => return Ok(None), IResult::Error(e) => return Err(Error::new(ErrorKind::InvalidData, e)),
195 IResult::Done(rest, response) => (rest.len(), response)
196 };
197
198 let after_header = buf.len() - bytes_left;
200 buf.drain_to(after_header);
201
202 if response.is_informational() || response.status() == 204 ||
204 response.status() == 304 {
205 assert!(bytes_left == 0);
206 return Ok(Some(response));
207 }
208
209 if response.has("Transfer-Encoding", "chunked") {
211 unimplemented!()
212 }
213
214 let length =
215 if let Some(ref length) = response["Content-Length"] {
216 Some(length.parse::<usize>().map_err(|e| Error::new(ErrorKind::InvalidData, e))?)
217 } else {
218 None
219 };
220
221 if let Some(length) = length {
222 self.response = Some(response);
223 self.bytes_left = length;
224 return self.decode(buf);
225 } else {
226 unimplemented!()
228 }
229 }
230}
231
232impl Codec for HttpCodec {
233 type In = HttpResponse;
234 type Out = HttpRequest;
235
236 fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<HttpResponse>, Error> {
237 if self.response.is_none() {
238 self.decode_header(buf)
239 } else {
240 let buf_len = buf.len();
241 println!("{} bytes left to read, got {} bytes", self.bytes_left, buf_len);
242 if buf_len > self.bytes_left {
243 Err(Error::new(ErrorKind::InvalidData, "extraneous data"))
244 } else {
245 self.response.as_mut().map(|res| response::append(res, buf.drain_to(buf_len).as_slice()));
246 if buf_len == self.bytes_left {
247 Ok(self.response.take())
248 } else {
249 self.bytes_left -= buf_len;
250 Ok(None) }
252 }
253 }
254 }
255
256 fn encode(&mut self, msg: HttpRequest, buf: &mut Vec<u8>) -> io::Result<()> {
257 write!(buf, "{}", msg)?;
258 buf.extend_from_slice(&msg.body);
259 Ok(())
260 }
261}
262
#[cfg(test)]
mod tests {
    extern crate env_logger;

    use std::io::{Error, ErrorKind};
    use std::thread;
    use std::time::Duration;

    use super::prelude::*;
    use super::futures::sync::mpsc;
    use {HttpRequest, HttpCodec};

    /// Feeds four POST requests through an mpsc channel and sends each one
    /// over a single connection, printing every response.
    ///
    /// Connects to `localhost:3000`; the test fails unless something accepts
    /// the connection there.
    /// NOTE(review): presumably expects a test HTTP server that answers
    /// `/post-test` — confirm how it is meant to be started.
    #[test]
    fn channel() {
        let string = "http://localhost:3000/post-test".to_string();
        // This request is only used to resolve the socket address.
        let req = HttpRequest::post(&string, vec![1, 2, 3, 4]).unwrap()
            .header("Content-Type", "text/plain");

        let mut core = Core::new().unwrap();
        let addr = req.addr().unwrap();
        let handle = core.handle();

        let (mut sender, receiver) = mpsc::channel(1);

        // Producer: one request every 100 ms with bodies [0], [0, 1], ...
        thread::spawn(|| {
            for i in 0 .. 4 {
                let url = "http://localhost:3000/post-test";
                let elements = (0 .. (i + 1)).collect::<Vec<_>>();
                let req = HttpRequest::post(url, elements).unwrap()
                    .header("Content-Type", "text/plain");
                sender = sender.send(req).wait().unwrap();
                thread::sleep(Duration::from_millis(100));
            }
        });


        // Consumer: thread the framed connection through every request.
        let _framed = core.run(TcpStream::connect(&addr, &handle).and_then(|connection| {
            let framed = connection.framed(HttpCodec::new());
            receiver.fold(framed, |framed, req| {
                req.send(framed).and_then(|(res, framed)| {
                    println!("channel got response {}", res.unwrap());
                    Ok(framed)
                }).map_err(|_| ())
            }).map_err(|()| Error::new(ErrorKind::Other, "oops"))
        })).unwrap();
    }

    /// Sends a POST and, one second later, a GET over the same connection,
    /// then checks that the second response carries `Connection: close`.
    ///
    /// Connects to `localhost:3000`; the test fails unless something accepts
    /// the connection there.
    #[test]
    fn two_frames() {
        let string = "http://localhost:3000/post-test".to_string();
        let req = HttpRequest::post(&string, vec![1, 2, 3, 4, 5, 6]).unwrap()
            .header("Content-Type", "text/plain");

        let mut core = Core::new().unwrap();
        let addr = req.addr().unwrap();
        let handle = core.handle();
        let (res, framed) = core.run(TcpStream::connect(&addr, &handle).and_then(|connection| {
            let framed = connection.framed(HttpCodec::new());
            req.send(framed)
        })).unwrap();
        println!("hello 1 {}", res.unwrap());

        thread::sleep(Duration::from_secs(1));

        let req = HttpRequest::get("http://localhost:3000/").unwrap();
        let (res, _framed) = core.run(req.send(framed)).unwrap();
        // Fail with a message instead of a bare `assert!(false)` when the
        // stream ended without yielding a second response.
        let res = res.expect("stream ended before the second response arrived");
        println!("hello 2 {}", res);
        assert!(res.is("Connection", "close"));
    }
}