#[cfg(unix)]
extern crate tokio_uds;
use std::collections::BTreeMap;
use std::io::{self, Read, Write};
use std::path::Path;
use std::sync::{atomic, Arc};
#[cfg(unix)]
use self::tokio_uds::UnixStream;
use api::SubscriptionId;
use futures::sync::{mpsc, oneshot};
use futures::{self, Future, Stream};
use helpers;
use parking_lot::Mutex;
use rpc;
use transports::Result;
use transports::shared::{EventLoopHandle, Response};
use transports::tokio_core::reactor;
use transports::tokio_io::AsyncRead;
use transports::tokio_io::io::{ReadHalf, WriteHalf};
use {BatchTransport, DuplexTransport, Error, ErrorKind, RequestId, Transport};
macro_rules! try_nb {
($e:expr) => (match $e {
Ok(t) => t,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return Ok(futures::Async::NotReady)
}
Err(e) => {
warn!("Unexpected IO error: {:?}", e);
return Err(())
},
})
}
type Pending = oneshot::Sender<Result<Vec<Result<rpc::Value>>>>;
type Subscription = mpsc::UnboundedSender<rpc::Value>;
pub type IpcTask<F> = Response<F, Vec<Result<rpc::Value>>>;
#[derive(Debug, Clone)]
pub struct Ipc {
id: Arc<atomic::AtomicUsize>,
pending: Arc<Mutex<BTreeMap<RequestId, Pending>>>,
subscriptions: Arc<Mutex<BTreeMap<SubscriptionId, Subscription>>>,
write_sender: mpsc::UnboundedSender<Vec<u8>>,
}
impl Ipc {
pub fn new<P>(path: P) -> Result<(EventLoopHandle, Self)>
where
P: AsRef<Path>,
{
let path = path.as_ref().to_owned();
EventLoopHandle::spawn(move |handle| Self::with_event_loop(&path, &handle).map_err(Into::into))
}
#[cfg(unix)]
pub fn with_event_loop<P>(path: P, handle: &reactor::Handle) -> Result<Self>
where
P: AsRef<Path>,
{
trace!("Connecting to: {:?}", path.as_ref());
let stream = UnixStream::connect(path, handle)?;
Self::with_stream(stream, handle)
}
#[cfg(unix)]
fn with_stream(stream: UnixStream, handle: &reactor::Handle) -> Result<Self> {
let (read, write) = stream.split();
let (write_sender, write_receiver) = mpsc::unbounded();
let pending: Arc<Mutex<BTreeMap<RequestId, Pending>>> = Default::default();
let subscriptions: Arc<Mutex<BTreeMap<SubscriptionId, Subscription>>> = Default::default();
let r = ReadStream {
read,
pending: pending.clone(),
subscriptions: subscriptions.clone(),
buffer: vec![],
current_pos: 0,
};
let w = WriteStream {
write,
incoming: write_receiver,
state: WriteState::WaitingForRequest,
};
handle.spawn(r);
handle.spawn(w);
Ok(Ipc {
id: Arc::new(atomic::AtomicUsize::new(1)),
write_sender,
pending,
subscriptions,
})
}
#[cfg(not(unix))]
pub fn with_event_loop<P>(_path: P, _handle: &reactor::Handle) -> Result<Self> {
return Err(ErrorKind::Transport("IPC transport is only supported on Unix".into()).into());
}
fn send_request<F, O>(&self, id: RequestId, request: rpc::Request, extract: F) -> IpcTask<F>
where
F: Fn(Vec<Result<rpc::Value>>) -> O,
{
let request = helpers::to_string(&request);
debug!("[{}] Calling: {}", id, request);
let (tx, rx) = futures::oneshot();
self.pending.lock().insert(id, tx);
let result = self.write_sender
.unbounded_send(request.into_bytes())
.map_err(|_| ErrorKind::Io(io::ErrorKind::BrokenPipe.into()).into());
Response::new(id, result, rx, extract)
}
}
impl Transport for Ipc {
type Out = IpcTask<fn(Vec<Result<rpc::Value>>) -> Result<rpc::Value>>;
fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
let id = self.id.fetch_add(1, atomic::Ordering::AcqRel);
let request = helpers::build_request(id, method, params);
(id, request)
}
fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out {
self.send_request(id, rpc::Request::Single(request), single_response)
}
}
fn single_response(response: Vec<Result<rpc::Value>>) -> Result<rpc::Value> {
match response.into_iter().next() {
Some(res) => res,
None => Err(ErrorKind::InvalidResponse("Expected single, got batch.".into()).into()),
}
}
impl BatchTransport for Ipc {
type Batch = IpcTask<fn(Vec<Result<rpc::Value>>) -> Result<Vec<Result<rpc::Value>>>>;
fn send_batch<T>(&self, requests: T) -> Self::Batch
where
T: IntoIterator<Item = (RequestId, rpc::Call)>,
{
let mut it = requests.into_iter();
let (id, first) = it.next()
.map(|x| (x.0, Some(x.1)))
.unwrap_or_else(|| (0, None));
let requests = first.into_iter().chain(it.map(|x| x.1)).collect();
self.send_request(id, rpc::Request::Batch(requests), Ok)
}
}
impl DuplexTransport for Ipc {
type NotificationStream = Box<Stream<Item = rpc::Value, Error = Error> + Send + 'static>;
fn subscribe(&self, id: &SubscriptionId) -> Self::NotificationStream {
let (tx, rx) = mpsc::unbounded();
if self.subscriptions.lock().insert(id.clone(), tx).is_some() {
warn!("Replacing already-registered subscription with id {:?}", id)
}
Box::new(rx.map_err(|()| ErrorKind::Transport("No data available".into()).into()))
}
fn unsubscribe(&self, id: &SubscriptionId) {
self.subscriptions.lock().remove(id);
}
}
enum WriteState {
WaitingForRequest,
Writing { buffer: Vec<u8>, current_pos: usize },
}
#[cfg(unix)]
struct WriteStream {
write: WriteHalf<UnixStream>,
incoming: mpsc::UnboundedReceiver<Vec<u8>>,
state: WriteState,
}
#[cfg(unix)]
impl Future for WriteStream {
type Item = ();
type Error = ();
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
loop {
self.state = match self.state {
WriteState::WaitingForRequest => {
let to_send = try_ready!(self.incoming.poll());
if let Some(to_send) = to_send {
trace!(
"Got new message to write: {:?}",
String::from_utf8_lossy(&to_send)
);
WriteState::Writing {
buffer: to_send,
current_pos: 0,
}
} else {
return Ok(futures::Async::NotReady);
}
}
WriteState::Writing {
ref buffer,
ref mut current_pos,
} => {
while *current_pos < buffer.len() {
let n = try_nb!(self.write.write(&buffer[*current_pos..]));
*current_pos += n;
if n == 0 {
warn!("IO Error: Zero write.");
return Err(());
}
}
WriteState::WaitingForRequest
}
};
}
}
}
#[cfg(unix)]
struct ReadStream {
read: ReadHalf<UnixStream>,
pending: Arc<Mutex<BTreeMap<RequestId, Pending>>>,
subscriptions: Arc<Mutex<BTreeMap<SubscriptionId, Subscription>>>,
buffer: Vec<u8>,
current_pos: usize,
}
#[cfg(unix)]
impl Future for ReadStream {
type Item = ();
type Error = ();
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
const DEFAULT_BUF_SIZE: usize = 4096;
let mut new_write_size = 128;
loop {
if self.current_pos == self.buffer.len() {
if new_write_size < DEFAULT_BUF_SIZE {
new_write_size *= 2;
}
self.buffer.resize(self.current_pos + new_write_size, 0);
}
let read = try_nb!(self.read.read(&mut self.buffer[self.current_pos..]));
if read == 0 {
return Ok(futures::Async::NotReady);
}
let mut min = self.current_pos;
self.current_pos += read;
while let Some((response, len)) = Self::extract_response(&self.buffer[0..self.current_pos], min) {
self.respond(response);
for i in len..self.current_pos {
self.buffer.swap(i, i - len);
}
let new_len = self.current_pos - len;
self.buffer.truncate(new_len + new_write_size);
self.current_pos = new_len;
min = 0;
}
}
}
}
enum Message {
Rpc(Vec<rpc::Output>),
Notification(rpc::Notification),
}
#[cfg(unix)]
impl ReadStream {
fn respond(&self, response: Message) {
match response {
Message::Rpc(outputs) => {
let id = match outputs.get(0) {
Some(&rpc::Output::Success(ref success)) => success.id.clone(),
Some(&rpc::Output::Failure(ref failure)) => failure.id.clone(),
None => rpc::Id::Num(0),
};
if let rpc::Id::Num(num) = id {
if let Some(request) = self.pending.lock().remove(&(num as usize)) {
trace!("Responding to (id: {:?}) with {:?}", num, outputs);
if let Err(err) = request.send(helpers::to_results_from_outputs(outputs)) {
warn!("Sending a response to deallocated channel: {:?}", err);
}
} else {
warn!("Got response for unknown request (id: {:?})", num);
}
} else {
warn!("Got unsupported response (id: {:?})", id);
}
}
Message::Notification(notification) => {
if let Some(rpc::Params::Map(params)) = notification.params {
let id = params.get("subscription");
let result = params.get("result");
if let (Some(&rpc::Value::String(ref id)), Some(result)) = (id, result) {
let id: SubscriptionId = id.clone().into();
if let Some(stream) = self.subscriptions.lock().get(&id) {
if let Err(e) = stream.unbounded_send(result.clone()) {
error!("Error sending notification (id: {:?}): {:?}", id, e);
}
} else {
warn!("Got notification for unknown subscription (id: {:?})", id);
}
} else {
error!("Got unsupported notification (id: {:?})", id);
}
}
}
}
}
fn extract_response(buf: &[u8], min: usize) -> Option<(Message, usize)> {
for pos in (min..buf.len()).rev() {
if buf[pos] == b']' || buf[pos] == b'}' {
let pos = pos + 1;
match helpers::to_response_from_slice(&buf[0..pos]) {
Ok(rpc::Response::Single(output)) => return Some((Message::Rpc(vec![output]), pos)),
Ok(rpc::Response::Batch(outputs)) => return Some((Message::Rpc(outputs), pos)),
_ => {}
}
match helpers::to_notification_from_slice(&buf[0..pos]) {
Ok(notification) => return Some((Message::Notification(notification), pos)),
_ => {}
}
}
}
None
}
}
#[cfg(all(test, unix))]
mod tests {
extern crate tokio_core;
extern crate tokio_uds;
use std::io::{self, Read, Write};
use super::Ipc;
use futures::{self, Future};
use rpc;
use Transport;
#[test]
fn should_send_a_request() {
let mut eloop = tokio_core::reactor::Core::new().unwrap();
let handle = eloop.handle();
let (server, client) = tokio_uds::UnixStream::pair(&handle).unwrap();
let ipc = Ipc::with_stream(client, &handle).unwrap();
eloop.remote().spawn(move |_| {
struct Task {
server: tokio_uds::UnixStream,
}
impl Future for Task {
type Item = ();
type Error = ();
fn poll(&mut self) -> futures::Poll<(), ()> {
let mut data = [0; 2048];
let read = try_nb!(self.server.read(&mut data));
let request = String::from_utf8(data[0..read].to_vec()).unwrap();
assert_eq!(
&request,
r#"{"jsonrpc":"2.0","method":"eth_accounts","params":["1"],"id":1}"#
);
let response = r#"{"jsonrpc":"2.0","id":1,"result":"x"}"#;
self.server.write_all(response.as_bytes()).unwrap();
self.server.flush().unwrap();
Ok(futures::Async::Ready(()))
}
}
Task { server: server }
});
let res = ipc.execute("eth_accounts", vec![rpc::Value::String("1".into())]);
assert_eq!(eloop.run(res), Ok(rpc::Value::String("x".into())));
}
#[test]
fn should_handle_double_response() {
let mut eloop = tokio_core::reactor::Core::new().unwrap();
let handle = eloop.handle();
let (server, client) = tokio_uds::UnixStream::pair(&handle).unwrap();
let ipc = Ipc::with_stream(client, &handle).unwrap();
eloop.remote().spawn(move |_| {
struct Task {
server: tokio_uds::UnixStream,
}
impl Future for Task {
type Item = ();
type Error = ();
fn poll(&mut self) -> futures::Poll<(), ()> {
let mut data = [0; 2048];
let read = try_nb!(self.server.read(&mut data));
let request = String::from_utf8(data[0..read].to_vec()).unwrap();
assert_eq!(
&request,
r#"{"jsonrpc":"2.0","method":"eth_accounts","params":["1"],"id":1}{"jsonrpc":"2.0","method":"eth_accounts","params":["1"],"id":2}"#
);
let response = r#"{"jsonrpc":"2.0","id":1,"result":"x"}{"jsonrpc":"2.0","id":2,"result":"x"}"#;
self.server.write_all(response.as_bytes()).unwrap();
self.server.flush().unwrap();
Ok(futures::Async::Ready(()))
}
}
Task { server: server }
});
let res1 = ipc.execute("eth_accounts", vec![rpc::Value::String("1".into())]);
let res2 = ipc.execute("eth_accounts", vec![rpc::Value::String("1".into())]);
assert_eq!(
eloop.run(res1.join(res2)),
Ok((
rpc::Value::String("x".into()),
rpc::Value::String("x".into())
))
);
}
}