use std::marker::PhantomData;
use std::net::SocketAddr;
use std::{io, mem};

use bytes::Bytes;
use futures::sink::Send;
use futures::{Async, Future, Poll, Sink, Stream};
use http::{header, Method, Request};
use http_codec::client::HttpCodec;
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
use serde_json;
use tokio_codec::{Decoder, Framed};
use tokio_tcp::{ConnectFuture, TcpStream};

use error::Error;

/// A TCP connection to the server, framed with `HttpCodec`.
pub struct Connection {
    stream: Framed<TcpStream, HttpCodec>,
}

impl Connection {
    /// Starts connecting to `addr`. The returned future resolves to a
    /// `Connection` once the TCP stream is established.
    pub fn connect(addr: &SocketAddr) -> ConnectionFuture {
        ConnectionFuture {
            inner: TcpStream::connect(addr),
        }
    }

    /// Serializes `query` as JSON and sends it to `path` using `method`.
    ///
    /// The connection is consumed and handed back together with the
    /// deserialized response when the returned future completes.
    pub fn run<T, U>(
        self,
        query: &T,
        auth: &'static str,
        path: &'static str,
        method: Method,
    ) -> Result<ResponseFuture<U>, Error>
    where
        T: Serialize,
        U: DeserializeOwned,
    {
        let body = serde_json::to_vec(query)?;
        let req = Request::builder()
            .uri(path)
            .method(method)
            .header(header::AUTHORIZATION, auth)
            .header(header::CONTENT_LENGTH, body.len())
            .body(Bytes::from(body))
            .unwrap();

        let fut = ResponseFuture {
            state: State::Sending(self.stream.send(req)),
            phantom: PhantomData,
        };
        Ok(fut)
    }
}

/// Future returned by `Connection::connect`.
pub struct ConnectionFuture {
    inner: ConnectFuture,
}

impl Future for ConnectionFuture {
    type Item = Connection;
    type Error = Error;

    fn poll(&mut self) -> Poll<Connection, Error> {
        let stream = try_ready!(self.inner.poll());
        Ok(Async::Ready(Connection {
            stream: HttpCodec::new().framed(stream),
        }))
    }
}

/// Progress of a request/response exchange.
enum State {
    /// The request is still being written to the socket.
    Sending(Send<Framed<TcpStream, HttpCodec>>),
    /// The request has been sent; waiting for the response.
    Receiving(Framed<TcpStream, HttpCodec>),
    /// The future has already resolved (or is mid-transition in `poll`).
    Complete,
}

/// Future returned by `Connection::run`.
pub struct ResponseFuture<T: DeserializeOwned> {
    state: State,
    phantom: PhantomData<T>,
}

impl<T: DeserializeOwned> Future for ResponseFuture<T> {
    type Item = (Connection, T);
    type Error = Error;

    fn poll(&mut self) -> Poll<(Connection, T), Error> {
        use self::State::*;

        loop {
            // Take ownership of the current state; every non-terminal arm
            // must store a state back before returning `NotReady`.
            match mem::replace(&mut self.state, Complete) {
                Sending(mut fut) => match fut.poll()? {
                    Async::Ready(stream) => self.state = Receiving(stream),
                    Async::NotReady => {
                        // Put the in-flight send back so the next poll
                        // resumes it instead of reaching `Complete`.
                        self.state = Sending(fut);
                        return Ok(Async::NotReady);
                    }
                },
                Receiving(mut stream) => match stream.poll()? {
                    Async::Ready(Some(res)) => {
                        let status = res.status();
                        if !status.is_success() {
                            return Err(Error::StatusCode(status.as_u16()));
                        }
                        let r = serde_json::from_slice(&res.into_body())?;
                        let conn = Connection { stream };
                        return Ok(Async::Ready((conn, r)));
                    }
                    Async::Ready(None) => {
                        return Err(Error::IoError(io::Error::new(
                            io::ErrorKind::Other,
                            "arangodb server closed connection before responding",
                        )));
                    }
                    Async::NotReady => {
                        self.state = Receiving(stream);
                        return Ok(Async::NotReady);
                    }
                },
                Complete => unreachable!(),
            }
        }
    }
}