async_http_client/
lib.rs

1//! Asynchronous HTTP client.
2//!
3//! ## Installation
4//!
5//! Add this to your `Cargo.toml`:
6//!
7//! ```toml
8//! [dependencies]
9//! async-http-client = "0.2"
10//! ```
11//! ## Example
12//!
13//! ```no-run
14//! extern crate async_http_client;
15//!
16//! use async_http_client::prelude::*;
17//! use async_http_client::{HttpRequest, HttpCodec};
18//!
19//! let req = HttpRequest::get("http://www.google.com").unwrap();
20//! let mut core = Core::new().unwrap();
21//! let addr = req.addr().unwrap();
22//! let handle = core.handle();
23//! let (res, framed) = core.run(TcpStream::connect(&addr, &handle).and_then(|connection| {
24//!     let framed = connection.framed(HttpCodec::new());
25//!     req.send(framed)
26//! })).unwrap();
27//! println!("got response {}", res.unwrap());
28//! ```
29
30pub extern crate futures;
31pub extern crate tokio_core;
32
33pub extern crate url;
34
35#[macro_use]
36extern crate nom;
37
38use std::borrow::Cow;
39use std::fmt;
40use std::io::{self, Error, ErrorKind, Write};
41use std::net::{SocketAddr, ToSocketAddrs};
42
43use futures::{Future, Sink, Stream};
44
45use tokio_core::io::{EasyBuf, Codec, Framed, Io, IoFuture};
46
47/// Commonly needed reexports from futures and tokio-core.
48pub mod prelude {
49    pub use tokio_core::io::Io;
50    pub use tokio_core::net::TcpStream;
51    pub use tokio_core::reactor::Core;
52
53    pub use futures::{Future, Sink, Stream, IntoFuture};
54    pub use futures::future::{BoxFuture, empty, err, lazy, ok, result};
55}
56
57use url::{Url, ParseError};
58
59use nom::IResult;
60
61mod parser;
62mod response;
63
64pub use response::{HttpResponse, Header};
65
66/// Representation of an HTTP request.
67pub struct HttpRequest {
68    url: Url,
69    method: Method,
70    headers: Vec<(Cow<'static, str>, Cow<'static, str>)>,
71    body: Vec<u8>
72}
73
74/// Representation of an HTTP method.
75pub enum Method {
76    Get,
77    Head,
78    Post,
79    Put,
80    Delete,
81    Connect,
82    Options,
83    Trace,
84    Other(String)
85}
86
87impl fmt::Display for Method {
88    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
89        use Method::*;
90        match *self {
91            Get => write!(f, "GET"),
92            Head => write!(f, "HEAD"),
93            Post => write!(f, "POST"),
94            Put => write!(f, "PUT"),
95            Delete => write!(f, "DELETE"),
96            Connect => write!(f, "CONNECT"),
97            Options => write!(f, "OPTIONS"),
98            Trace => write!(f, "TRACE"),
99            Other(ref other) => write!(f, "{}", other)
100        }
101    }
102}
103
104impl HttpRequest {
105    /// Creates a new HTTP request.
106    pub fn new<U: AsRef<str>>(method: Method, url: U) -> Result<HttpRequest, ParseError> {
107        url.as_ref().parse().map(|url: Url| {
108            use std::fmt::Write;
109
110            let mut host = url.host_str().unwrap_or("").to_string();
111            if let Some(port) = url.port() {
112                write!(host, ":{}", port).unwrap();
113            }
114
115            HttpRequest {
116                url: url,
117                method: method,
118                headers: vec![],
119                body: vec![]
120            }.header("Host", host)
121        })
122    }
123
124    pub fn header<K: Into<Cow<'static, str>>, V: Into<Cow<'static, str>>>(mut self, name: K, value: V) -> HttpRequest {
125        self.headers.push((name.into(), value.into()));
126        self
127    }
128
129    pub fn get<U: AsRef<str>>(url: U) -> Result<HttpRequest, ParseError> {
130        Self::new(Method::Get, url)
131    }
132
133    pub fn post<U: AsRef<str>, I: Into<Vec<u8>>>(url: U, body: I) -> Result<HttpRequest, ParseError> {
134        let bytes = body.into();
135        let mut req = Self::new(Method::Post, url)?.header("Content-Length", bytes.len().to_string());
136        req.body = bytes;
137        Ok(req)
138    }
139
140    pub fn addr(&self) -> Result<SocketAddr, Error> {
141        let mut addrs = self.url.to_socket_addrs()?;
142        addrs.next().ok_or(Error::new(ErrorKind::UnexpectedEof, "no address"))
143    }
144
145    /// Returns a future that, given a framed, will resolve to a tuple (response?, framed).
146    pub fn send<T: 'static + Io + Send>(self, framed: Framed<T, HttpCodec>) -> IoFuture<(Option<HttpResponse>, Framed<T, HttpCodec>)> {
147        framed.send(self).and_then(|framed| {
148            framed.into_future().and_then(|(res, stream)| {
149                Ok((res, stream))
150            }).map_err(|(err, _stream)| err)
151        }).boxed()
152    }
153}
154
155impl fmt::Display for HttpRequest {
156    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157        // request line
158        write!(f, "{} {}", self.method, self.url.path())?;
159        if let Some(query) = self.url.query() {
160            write!(f, "?{}", query)?;
161        }
162        if let Some(fragment) = self.url.fragment() {
163            write!(f, "#{}", fragment)?;
164        }
165        write!(f, " HTTP/1.1\r\n")?;
166
167        // headers
168        for &(ref name, ref value) in &self.headers {
169            write!(f, "{}: {}\r\n", name, value)?;
170        }
171        write!(f, "\r\n")
172    }
173}
174
175/// Codec that parses HTTP responses.
176#[derive(Debug)]
177pub struct HttpCodec {
178    response: Option<HttpResponse>,
179    bytes_left: usize
180}
181
182impl HttpCodec {
183    /// Creates a new HTTP codec.
184    pub fn new() -> HttpCodec {
185        HttpCodec {
186            response: None,
187            bytes_left: 0
188        }
189    }
190
191    fn decode_header(&mut self, buf: &mut EasyBuf) -> Result<Option<HttpResponse>, Error> {
192        let (bytes_left, response) = match parser::response(buf.as_ref()) {
193            IResult::Incomplete(_) => return Ok(None), // not enough data
194            IResult::Error(e) => return Err(Error::new(ErrorKind::InvalidData, e)),
195            IResult::Done(rest, response) => (rest.len(), response)
196        };
197
198        // eat parsed bytes
199        let after_header = buf.len() - bytes_left;
200        buf.drain_to(after_header);
201
202        // no content
203        if response.is_informational() || response.status() == 204 ||
204            response.status() == 304 {
205            assert!(bytes_left == 0);
206            return Ok(Some(response));
207        }
208
209        // chunked
210        if response.has("Transfer-Encoding", "chunked") {
211            unimplemented!()
212        }
213
214        let length =
215            if let Some(ref length) = response["Content-Length"] {
216                Some(length.parse::<usize>().map_err(|e| Error::new(ErrorKind::InvalidData, e))?)
217            } else {
218                None
219            };
220
221        if let Some(length) = length {
222            self.response = Some(response);
223            self.bytes_left = length;
224            return self.decode(buf);
225        } else {
226            // legacy HTTP/1.0 mode (close connection)
227            unimplemented!()
228        }
229    }
230}
231
232impl Codec for HttpCodec {
233    type In = HttpResponse;
234    type Out = HttpRequest;
235
236    fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<HttpResponse>, Error> {
237        if self.response.is_none() {
238            self.decode_header(buf)
239        } else {
240            let buf_len = buf.len();
241            println!("{} bytes left to read, got {} bytes", self.bytes_left, buf_len);
242            if buf_len > self.bytes_left {
243                Err(Error::new(ErrorKind::InvalidData, "extraneous data"))
244            } else {
245                self.response.as_mut().map(|res| response::append(res, buf.drain_to(buf_len).as_slice()));
246                if buf_len == self.bytes_left {
247                    Ok(self.response.take())
248                } else {
249                    self.bytes_left -= buf_len;
250                    Ok(None) // not enough data
251                }
252            }
253        }
254    }
255
256    fn encode(&mut self, msg: HttpRequest, buf: &mut Vec<u8>) -> io::Result<()> {
257        write!(buf, "{}", msg)?;
258        buf.extend_from_slice(&msg.body);
259        Ok(())
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    extern crate env_logger;
266
267    //use std::env;
268    use std::io::{Error, ErrorKind};
269    use std::thread;
270    use std::time::Duration;
271
272    use super::prelude::*;
273    use super::futures::sync::mpsc;
274    use {HttpRequest, HttpCodec};
275
276    #[test]
277    fn channel() {
278        // Create the event loop that will drive this server
279        let string = "http://localhost:3000/post-test".to_string();
280        let req = HttpRequest::post(&string, vec![1, 2, 3, 4]).unwrap()
281            .header("Content-Type", "text/plain");
282
283        let mut core = Core::new().unwrap();
284        let addr = req.addr().unwrap();
285        let handle = core.handle();
286
287        let (mut sender, receiver) = mpsc::channel(1);
288
289        thread::spawn(|| {
290            for i in 0 .. 4 {
291                let url = "http://localhost:3000/post-test";
292                let elements = (0 .. (i + 1)).collect::<Vec<_>>();
293                let req = HttpRequest::post(url, elements).unwrap()
294                    .header("Content-Type", "text/plain");
295                sender = sender.send(req).wait().unwrap();
296                thread::sleep(Duration::from_millis(100));
297            }
298        });
299
300
301        let _framed = core.run(TcpStream::connect(&addr, &handle).and_then(|connection| {
302            let framed = connection.framed(HttpCodec::new());
303            receiver.fold(framed, |framed, req| {
304                req.send(framed).and_then(|(res, framed)| {
305                    println!("channel got response {}", res.unwrap());
306                    Ok(framed)
307                }).map_err(|_| ())
308            }).map_err(|()| Error::new(ErrorKind::Other, "oops"))
309        })).unwrap();
310    }
311
312    #[test]
313    fn two_frames() {
314        // Create the event loop that will drive this server
315        let string = "http://localhost:3000/post-test".to_string();
316        let req = HttpRequest::post(&string, vec![1, 2, 3, 4, 5, 6]).unwrap()
317            .header("Content-Type", "text/plain");
318
319        let mut core = Core::new().unwrap();
320        let addr = req.addr().unwrap();
321        let handle = core.handle();
322        let (res, framed) = core.run(TcpStream::connect(&addr, &handle).and_then(|connection| {
323            let framed = connection.framed(HttpCodec::new());
324            req.send(framed)
325        })).unwrap();
326        println!("hello 1 {}", res.unwrap());
327
328        thread::sleep(Duration::from_secs(1));
329
330        // should receive a response and then close the connection
331        let req = HttpRequest::get("http://localhost:3000/").unwrap();
332        let (res, _framed) = core.run(req.send(framed)).unwrap();
333        if let Some(res) = res {
334            println!("hello 2 {}", res);
335            assert!(res.is("Connection", "close"));
336        } else {
337            assert!(false);
338        }
339    }
340}