tokio_arangodb/
connection.rs1use std::marker::PhantomData;
2use std::net::SocketAddr;
3use std::{io, mem};
4
5use bytes::Bytes;
6use futures::sink::Send;
7use futures::{Async, Future, Poll, Sink, Stream};
8use http::{header, Method, Request};
9use http_codec::client::HttpCodec;
10use serde::de::DeserializeOwned;
11use serde::ser::Serialize;
12use serde_json;
13use tokio_codec::{Decoder, Framed};
14use tokio_tcp::{ConnectFuture, TcpStream};
15
16use error::Error;
17
18pub struct Connection {
19 stream: Framed<TcpStream, HttpCodec>,
20}
21
22impl Connection {
23 pub fn connect(addr: &SocketAddr) -> ConnectionFuture {
24 ConnectionFuture {
25 inner: TcpStream::connect(addr),
26 }
27 }
28
29 pub fn run<'a, T, U>(
30 self,
31 body: &T,
32 auth: &'a str,
33 path: &'a str,
34 method: Method,
35 ) -> Result<ResponseFuture<U>, Error>
36 where
37 T: Serialize,
38 U: DeserializeOwned,
39 {
40 let body = serde_json::to_vec(body)?;
41 let req = Request::builder()
42 .uri(path)
43 .method(method)
44 .header(header::AUTHORIZATION, auth)
45 .header(header::CONTENT_LENGTH, body.len())
46 .body(Bytes::from(body))
47 .unwrap();
48 let fut = ResponseFuture {
49 state: State::Sending(self.stream.send(req)),
50 phantom: PhantomData,
51 };
52 Ok(fut)
53 }
54}
55
56pub struct ConnectionFuture {
57 inner: ConnectFuture,
58}
59
60impl Future for ConnectionFuture {
61 type Item = Connection;
62 type Error = Error;
63
64 fn poll(&mut self) -> Poll<Connection, Error> {
65 let stream = try_ready!(self.inner.poll());
66 Ok(Async::Ready(Connection {
67 stream: HttpCodec::new().framed(stream),
68 }))
69 }
70}
71
72enum State {
73 Sending(Send<Framed<TcpStream, HttpCodec>>),
74 Receiving(Framed<TcpStream, HttpCodec>),
75 Complete,
76}
77
78pub struct ResponseFuture<T: DeserializeOwned> {
79 state: State,
80 phantom: PhantomData<T>,
81}
82
83impl<T: DeserializeOwned> Future for ResponseFuture<T> {
84 type Item = (Connection, T);
85 type Error = Error;
86
87 fn poll(&mut self) -> Poll<(Connection, T), Error> {
88 use self::State::*;
89
90 loop {
91 match mem::replace(&mut self.state, Complete) {
92 Sending(mut fut) => {
93 let stream = try_ready!(fut.poll());
94 self.state = Receiving(stream);
95 }
96 Receiving(mut stream) => match stream.poll()? {
97 Async::Ready(Some(res)) => {
98 let status = res.status();
99 if !status.is_success() {
100 return Err(Error::StatusCode(status.as_u16()));
101 }
102 let r = serde_json::from_slice(&res.into_body())?;
103 let conn = Connection { stream };
104 return Ok(Async::Ready((conn, r)));
105 }
106 Async::Ready(None) => {
107 return Err(Error::IoError(io::Error::new(
108 io::ErrorKind::Other,
109 "arangodb server closed connection before responding",
110 )));
111 }
112 Async::NotReady => {
113 self.state = Receiving(stream);
114 return Ok(Async::NotReady);
115 }
116 },
117 Complete => unreachable!(),
118 }
119 }
120 }
121}