#![deny(missing_docs)]
#[macro_use]
pub extern crate error_chain;
extern crate futures;
extern crate hyper;
extern crate jsonrpc_client_core;
#[macro_use]
extern crate log;
extern crate tokio_core;
#[cfg(feature = "tls")]
extern crate hyper_tls;
#[cfg(feature = "tls")]
extern crate native_tls;
use futures::future::{self, Either, Select2};
use futures::sync::{mpsc, oneshot};
use futures::{Async, Future, Poll, Stream};
pub use hyper::header;
use hyper::{Client, Request, StatusCode, Uri};
use jsonrpc_client_core::Transport;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
pub use tokio_core::reactor::Handle;
use tokio_core::reactor::{Core, Timeout};
mod client_creator;
pub use client_creator::*;
error_chain! {
    errors {
        /// The supplied client creator failed to produce a Hyper `Client`.
        ClientCreatorError {
            description("Failed to create the Hyper Client")
        }
        /// The server answered with an HTTP status code other than 200 OK.
        /// The offending status code is carried in the variant.
        HttpError(http_code: StatusCode) {
            description("Http error. Server did not return 200 OK")
            display("Http error. Status code {}", http_code)
        }
        /// The configured time limit elapsed before the request produced a response,
        /// or the timer guarding the request reported an error.
        RequestTimeout {
            description("Timeout while waiting for a request")
        }
        /// Something went wrong around the Tokio `Core`: it could not be created,
        /// it is no longer accepting requests, or it went away before delivering
        /// a response. The message describes which of these happened.
        TokioCoreError(msg: &'static str) {
            description("Error with the Tokio Core")
            display("Error with the Tokio Core: {}", msg)
        }
    }
    foreign_links {
        Hyper(hyper::Error) #[doc = "An error occurred in Hyper."];
        Uri(hyper::error::UriError) #[doc = "The string given was not a valid URI."];
    }
}
/// One unit of work for the request processing future: the HTTP request to
/// perform, paired with the one-shot channel on which its outcome is returned.
type CoreMessage = (Request, oneshot::Sender<Result<Vec<u8>>>);

/// Sending half of the unbounded queue feeding the request processing future.
type CoreSender = mpsc::UnboundedSender<CoreMessage>;

/// Receiving half of the unbounded queue, drained by the request processing future.
type CoreReceiver = mpsc::UnboundedReceiver<CoreMessage>;
/// Entry point of the HTTP transport.
///
/// It does not send requests itself; instead [`HttpTransport::handle`] hands out
/// `HttpHandle`s bound to a URI, each of which shares this transport's request
/// queue and id counter.
#[derive(Debug, Clone)]
pub struct HttpTransport {
// Sending half of the queue consumed by the request processing future.
request_tx: CoreSender,
// Counter shared (via `Arc`) with every handle created from this transport.
id: Arc<AtomicUsize>,
}
impl HttpTransport {
pub fn new() -> HttpTransportBuilder<DefaultClient> {
HttpTransportBuilder::with_client(DefaultClient)
}
#[cfg(feature = "tls")]
pub fn with_tls() -> HttpTransportBuilder<DefaultTlsClient> {
HttpTransportBuilder::with_client(DefaultTlsClient)
}
pub fn handle(&self, uri: &str) -> Result<HttpHandle> {
let uri = Uri::from_str(uri)?;
Ok(HttpHandle {
request_tx: self.request_tx.clone(),
uri,
id: self.id.clone(),
headers: header::Headers::new(),
})
}
}
/// Builder collecting the settings needed to create an `HttpTransport`.
///
/// Finish the builder with `standalone` or `shared`.
pub struct HttpTransportBuilder<C: ClientCreator> {
// Produces the Hyper `Client` once an event loop handle is available.
client_creator: C,
// Optional per-request time limit; `None` means requests are not time limited.
timeout: Option<Duration>,
}
impl<C: ClientCreator> HttpTransportBuilder<C> {
    /// Creates a builder that obtains its Hyper `Client` from `client_creator`.
    ///
    /// No request timeout is configured initially.
    pub fn with_client(client_creator: C) -> HttpTransportBuilder<C> {
        HttpTransportBuilder {
            client_creator,
            timeout: None,
        }
    }

    /// Sets the time limit applied to every request sent through the transport.
    ///
    /// A request that does not complete within `duration` fails with
    /// `ErrorKind::RequestTimeout`.
    pub fn timeout(mut self, duration: Duration) -> Self {
        self.timeout = Some(duration);
        self
    }

    /// Creates a transport that runs on its own, dedicated thread.
    ///
    /// A new thread is spawned which creates a Tokio `Core` and the Hyper
    /// client, then runs the request processing future on that core until the
    /// future completes. This call blocks until the thread has reported whether
    /// its setup succeeded.
    ///
    /// # Errors
    ///
    /// Returns the setup error reported by the thread (core or client creation
    /// failure), or `ErrorKind::TokioCoreError` if the thread terminated without
    /// reporting anything (for example because it panicked during setup).
    pub fn standalone(self) -> Result<HttpTransport> {
        let (tx, rx) = ::std::sync::mpsc::channel();
        thread::spawn(
            move || match create_standalone_core(self.client_creator, self.timeout) {
                Err(e) => {
                    // The receiver is alive for as long as `standalone` is blocked in
                    // `recv` below, so a failed send would be a broken invariant.
                    tx.send(Err(e)).unwrap();
                }
                Ok((mut core, request_tx, future)) => {
                    // Report success first so the caller is unblocked, then drive the
                    // event loop on this thread.
                    tx.send(Ok(Self::build(request_tx))).unwrap();
                    if core.run(future).is_err() {
                        error!("JSON-RPC processing thread had an error");
                    }
                    debug!("Standalone HttpTransport thread exiting");
                }
            },
        );
        // `recv` only fails when the thread dropped `tx` without sending, i.e. it died
        // during setup. Surface that as an error instead of panicking in the caller.
        rx.recv()
            .chain_err(|| {
                ErrorKind::TokioCoreError("Standalone thread died before reporting its status")
            })
            .and_then(|startup_result| startup_result)
    }

    /// Creates a transport that runs on an existing event loop.
    ///
    /// The Hyper client is created against `handle`, and the request processing
    /// future is spawned onto the same handle.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::ClientCreatorError` if the client creator fails.
    pub fn shared(self, handle: &Handle) -> Result<HttpTransport> {
        let client = self
            .client_creator
            .create(handle)
            .chain_err(|| ErrorKind::ClientCreatorError)?;
        let (request_tx, request_rx) = mpsc::unbounded();
        handle.spawn(create_request_processing_future(
            request_rx,
            client,
            self.timeout,
            handle.clone(),
        ));
        Ok(Self::build(request_tx))
    }

    /// Wraps the request queue sender in an `HttpTransport` whose id counter
    /// starts at 1.
    fn build(request_tx: CoreSender) -> HttpTransport {
        HttpTransport {
            request_tx,
            id: Arc::new(AtomicUsize::new(1)),
        }
    }
}
/// A future that optionally has a time limit attached to it.
#[derive(Debug)]
enum TimeLimited<F: Future> {
/// The wrapped future raced (via `select2`) against a reactor `Timeout`.
Limited(Select2<F, Timeout>),
/// The wrapped future on its own, with no time limit.
Unlimited(F),
}
impl<F: Future> TimeLimited<F> {
    /// Wraps `future`, attaching a time limit only when one is given.
    ///
    /// `handle` is used to register the timeout and is ignored when
    /// `optional_time_limit` is `None`.
    pub fn new(future: F, optional_time_limit: Option<Duration>, handle: &Handle) -> Self {
        if let Some(time_limit) = optional_time_limit {
            Self::limited(future, time_limit, handle)
        } else {
            TimeLimited::Unlimited(future)
        }
    }

    /// Wraps `future` so that it is raced against a timeout of `time_limit`.
    ///
    /// # Panics
    ///
    /// Panics if the `Timeout` cannot be created on `handle`.
    pub fn limited(future: F, time_limit: Duration, handle: &Handle) -> Self {
        let deadline = Timeout::new(time_limit, handle)
            .expect("failure to create Timeout for TimeLimited");
        let raced = future.select2(deadline);
        TimeLimited::Limited(raced)
    }
}
impl<F: Future<Error = Error>> Future for TimeLimited<F> {
    type Item = F::Item;
    type Error = Error;

    /// Polls the wrapped future.
    ///
    /// Without a time limit the inner future's result is passed through
    /// unchanged. With a time limit, the timeout completing first yields
    /// `ErrorKind::RequestTimeout`, and an error from the timer itself is
    /// chained under `ErrorKind::RequestTimeout`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // The unlimited case is a plain pass-through; handle it up front.
        let raced = match *self {
            TimeLimited::Unlimited(ref mut inner) => return inner.poll(),
            TimeLimited::Limited(ref mut raced) => raced,
        };
        match raced.poll() {
            // The wrapped future won the race.
            Ok(Async::Ready(Either::A((item, _)))) => Ok(Async::Ready(item)),
            Err(Either::A((error, _))) => Err(error),
            // The timeout won the race, either by firing or by failing.
            Ok(Async::Ready(Either::B(((), _)))) => Err(ErrorKind::RequestTimeout.into()),
            Err(Either::B((error, _))) => Err(error).chain_err(|| ErrorKind::RequestTimeout),
            // Neither side has finished yet.
            Ok(Async::NotReady) => Ok(Async::NotReady),
        }
    }
}
/// Sets up everything a standalone transport needs.
///
/// Creates a fresh Tokio `Core`, builds the Hyper client against its handle and
/// prepares the request processing future. The future is only created here,
/// not run.
///
/// Returns the core, the sender used to enqueue requests, and the future to run
/// on the core.
///
/// # Errors
///
/// * `ErrorKind::TokioCoreError` if the `Core` cannot be created.
/// * `ErrorKind::ClientCreatorError` if the client creator fails.
fn create_standalone_core<C: ClientCreator>(
    client_creator: C,
    timeout: Option<Duration>,
) -> Result<(Core, CoreSender, Box<Future<Item = (), Error = ()>>)> {
    let event_loop = Core::new().chain_err(|| ErrorKind::TokioCoreError("Unable to create"))?;
    let loop_handle = event_loop.handle();
    let http_client = client_creator
        .create(&loop_handle)
        .chain_err(|| ErrorKind::ClientCreatorError)?;
    let (request_tx, request_rx) = mpsc::unbounded();
    let processing =
        create_request_processing_future(request_rx, http_client, timeout, loop_handle);
    Ok((event_loop, request_tx, processing))
}
/// Builds the future that serves every request arriving on `request_rx`.
///
/// Each queued request is sent with `client`, optionally limited by `timeout`
/// (registered on `handle`). A response whose status is not 200 OK becomes
/// `ErrorKind::HttpError`; otherwise the whole body is collected into a
/// `Vec<u8>`. The outcome, success or error, is sent back on the one-shot
/// channel that accompanied the request.
///
/// A failure of an individual request never fails the returned future: the
/// per-request future always resolves to `Ok(())`.
fn create_request_processing_future<CC: hyper::client::Connect>(
    request_rx: CoreReceiver,
    client: Client<CC, hyper::Body>,
    timeout: Option<Duration>,
    handle: Handle,
) -> Box<Future<Item = (), Error = ()>> {
    let processing = request_rx.for_each(move |(request, response_tx)| {
        trace!("Sending request to {}", request.uri());
        let pending_response =
            TimeLimited::new(client.request(request).from_err(), timeout, &handle);
        pending_response
            .and_then(|response: hyper::Response| {
                let status = response.status();
                if status == hyper::StatusCode::Ok {
                    future::ok(response)
                } else {
                    future::err(ErrorKind::HttpError(status).into())
                }
            })
            .and_then(|response: hyper::Response| response.body().concat2().from_err())
            .map(|body| body.to_vec())
            .then(move |outcome| {
                // The caller may have dropped its receiving end; that is not fatal.
                if response_tx.send(outcome).is_err() {
                    warn!("Unable to send response back to caller");
                }
                Ok(())
            })
    });
    Box::new(processing)
}
/// A handle for sending JSON-RPC requests to one specific URI over HTTP.
///
/// Requests are not sent by the handle directly; they are queued on
/// `request_tx` and the response is awaited on a one-shot channel.
#[derive(Debug, Clone)]
pub struct HttpHandle {
// Queue on which outgoing requests are handed over for processing.
request_tx: CoreSender,
// Destination every request created by this handle is sent to.
uri: Uri,
// Shared counter used to hand out request ids.
id: Arc<AtomicUsize>,
// Custom headers added to every request created by this handle.
headers: header::Headers,
}
impl HttpHandle {
    /// Stores `header` so that it is added to every subsequent request made
    /// through this handle.
    ///
    /// Returns `&mut Self` so calls can be chained.
    pub fn set_header<H: header::Header>(&mut self, header: H) -> &mut Self {
        self.headers.set(header);
        self
    }

    /// Builds the POST request carrying `body` to this handle's URI.
    ///
    /// `Content-Type` (JSON) and `Content-Length` are set first and the headers
    /// configured on the handle are applied afterwards. NOTE(review): this
    /// presumably lets custom headers override the two defaults; confirm
    /// against hyper's `Headers::extend`.
    fn create_request(&self, body: Vec<u8>) -> Request {
        let content_length = body.len() as u64;
        let mut post = hyper::Request::new(hyper::Method::Post, self.uri.clone());
        {
            // Scoped so the mutable borrow of `post` ends before the body is set.
            let request_headers = post.headers_mut();
            request_headers.set(hyper::header::ContentType::json());
            request_headers.set(hyper::header::ContentLength(content_length));
            request_headers.extend(self.headers.iter());
        }
        post.set_body(body);
        post
    }
}
impl Transport for HttpHandle {
    type Error = Error;
    type Future = Box<Future<Item = Vec<u8>, Error = Self::Error> + Send>;

    /// Returns the current value of the shared id counter and increments it.
    fn get_next_id(&mut self) -> u64 {
        let current = self.id.fetch_add(1, Ordering::SeqCst);
        current as u64
    }

    /// Queues `json_data` as the body of a POST request and returns a future
    /// resolving to the raw response body.
    ///
    /// The returned future fails with `ErrorKind::TokioCoreError` if the request
    /// cannot be queued or if the response channel is closed without a reply;
    /// otherwise it resolves to whatever result was sent back for the request.
    fn send(&self, json_data: Vec<u8>) -> Self::Future {
        let (response_tx, response_rx) = oneshot::channel();
        let message = (self.create_request(json_data), response_tx);
        let enqueued = future::result(self.request_tx.unbounded_send(message));
        let response = enqueued
            .map_err(|send_error| {
                Error::with_chain(
                    send_error,
                    ErrorKind::TokioCoreError("Not listening for requests"),
                )
            })
            .and_then(move |()| {
                response_rx.map_err(|canceled| {
                    Error::with_chain(
                        canceled,
                        ErrorKind::TokioCoreError("Died without returning response"),
                    )
                })
            })
            // The channel delivers a `Result`; flatten it into the future's outcome.
            .and_then(future::result);
        Box::new(response)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use hyper::client::HttpConnector;
    use std::io;

    /// A transport can be created on an externally owned core.
    #[test]
    fn new_shared() {
        let core = Core::new().unwrap();
        let transport = HttpTransport::new().shared(&core.handle());
        transport.unwrap();
    }

    /// A transport can be created on its own thread.
    #[test]
    fn new_standalone() {
        let transport = HttpTransport::new().standalone();
        transport.unwrap();
    }

    /// A closure can be used as the client creator.
    #[test]
    fn new_custom_client() {
        let builder = HttpTransportBuilder::with_client(|handle: &Handle| {
            let client = Client::configure().keep_alive(false).build(handle);
            Ok(client) as Result<_>
        });
        builder.standalone().unwrap();
    }

    /// An error from the client creator is reported as `ClientCreatorError`.
    #[test]
    fn failing_client_creator() {
        let builder = HttpTransportBuilder::with_client(|_: &Handle| {
            let failure = io::Error::new(io::ErrorKind::Other, "Dummy error");
            Err(failure) as ::std::result::Result<Client<HttpConnector, hyper::Body>, io::Error>
        });
        let error = builder.standalone().unwrap_err();
        match *error.kind() {
            ErrorKind::ClientCreatorError => (),
            ref kind => panic!("invalid error kind response: {:?}", kind),
        }
    }
}