tokio_arangodb/
connection.rs

1use std::marker::PhantomData;
2use std::net::SocketAddr;
3use std::{io, mem};
4
5use bytes::Bytes;
6use futures::sink::Send;
7use futures::{Async, Future, Poll, Sink, Stream};
8use http::{header, Method, Request};
9use http_codec::client::HttpCodec;
10use serde::de::DeserializeOwned;
11use serde::ser::Serialize;
12use serde_json;
13use tokio_codec::{Decoder, Framed};
14use tokio_tcp::{ConnectFuture, TcpStream};
15
16use error::Error;
17
/// A TCP connection whose byte stream is framed with `HttpCodec`, so whole
/// HTTP requests can be written to it and whole HTTP responses read from it.
///
/// Obtained by driving the future returned from [`Connection::connect`].
pub struct Connection {
    /// The underlying TCP stream wrapped in the HTTP client codec.
    stream: Framed<TcpStream, HttpCodec>,
}
21
22impl Connection {
23    pub fn connect(addr: &SocketAddr) -> ConnectionFuture {
24        ConnectionFuture {
25            inner: TcpStream::connect(addr),
26        }
27    }
28
29    pub fn run<'a, T, U>(
30        self,
31        body: &T,
32        auth: &'a str,
33        path: &'a str,
34        method: Method,
35    ) -> Result<ResponseFuture<U>, Error>
36    where
37        T: Serialize,
38        U: DeserializeOwned,
39    {
40        let body = serde_json::to_vec(body)?;
41        let req = Request::builder()
42            .uri(path)
43            .method(method)
44            .header(header::AUTHORIZATION, auth)
45            .header(header::CONTENT_LENGTH, body.len())
46            .body(Bytes::from(body))
47            .unwrap();
48        let fut = ResponseFuture {
49            state: State::Sending(self.stream.send(req)),
50            phantom: PhantomData,
51        };
52        Ok(fut)
53    }
54}
55
/// Future returned by [`Connection::connect`].
///
/// Wraps the pending TCP connect and resolves to a [`Connection`] once the
/// stream is available.
pub struct ConnectionFuture {
    /// The in-progress TCP connect.
    inner: ConnectFuture,
}
59
60impl Future for ConnectionFuture {
61    type Item = Connection;
62    type Error = Error;
63
64    fn poll(&mut self) -> Poll<Connection, Error> {
65        let stream = try_ready!(self.inner.poll());
66        Ok(Async::Ready(Connection {
67            stream: HttpCodec::new().framed(stream),
68        }))
69    }
70}
71
/// Progress of one request/response exchange driven by `ResponseFuture`.
enum State {
    /// The request is still being written to the framed stream; the `Send`
    /// future yields the stream back once the write has completed.
    Sending(Send<Framed<TcpStream, HttpCodec>>),
    /// The request has been sent and a response frame is awaited on the stream.
    Receiving(Framed<TcpStream, HttpCodec>),
    /// Placeholder swapped in while `poll` holds the real state by value; it
    /// is also what remains once the future has returned a result or an error.
    Complete,
}
77
78pub struct ResponseFuture<T: DeserializeOwned> {
79    state: State,
80    phantom: PhantomData<T>,
81}
82
83impl<T: DeserializeOwned> Future for ResponseFuture<T> {
84    type Item = (Connection, T);
85    type Error = Error;
86
87    fn poll(&mut self) -> Poll<(Connection, T), Error> {
88        use self::State::*;
89
90        loop {
91            match mem::replace(&mut self.state, Complete) {
92                Sending(mut fut) => {
93                    let stream = try_ready!(fut.poll());
94                    self.state = Receiving(stream);
95                }
96                Receiving(mut stream) => match stream.poll()? {
97                    Async::Ready(Some(res)) => {
98                        let status = res.status();
99                        if !status.is_success() {
100                            return Err(Error::StatusCode(status.as_u16()));
101                        }
102                        let r = serde_json::from_slice(&res.into_body())?;
103                        let conn = Connection { stream };
104                        return Ok(Async::Ready((conn, r)));
105                    }
106                    Async::Ready(None) => {
107                        return Err(Error::IoError(io::Error::new(
108                            io::ErrorKind::Other,
109                            "arangodb server closed connection before responding",
110                        )));
111                    }
112                    Async::NotReady => {
113                        self.state = Receiving(stream);
114                        return Ok(Async::NotReady);
115                    }
116                },
117                Complete => unreachable!(),
118            }
119        }
120    }
121}