essrpc 0.2.0

RPC using natural trait definitions and calls.
Documentation
use std::io::{Read, Write};

use serde::{Deserialize, Serialize};
use serde_json::json;
use serde_json::value::Value;
use uuid::Uuid;

use crate::{
    ClientTransport, MethodId, PartialMethodId, RPCError, RPCErrorKind, Result, ServerTransport,
};

pub struct JTXState {
    method: &'static str,
    params: Value,
}

pub struct JRXState {
    json: Value,
}

/// Transport implementation over JSON-RPC. Can be used over any
/// `Read+Write` channel (local socket, internet socket, pipe,
/// etc). Enable the "json_transport" feature to use this.
pub struct JSONTransport<C: Read + Write> {
    channel: C,
}

impl<C: Read + Write> JSONTransport<C> {
    pub fn new(channel: C) -> Self {
        JSONTransport { channel: channel }
    }

    /// Get the underlying read/write channel
    pub fn channel<'a>(&'a self) -> &'a C {
        &self.channel
    }

    // Deserialize a value from the channel
    fn from_channel<T>(&mut self) -> Result<T>
    where
        for<'de> T: serde::Deserialize<'de>,
    {
        read_value_from_json(Read::by_ref(&mut self.channel))
    }
}
impl<C: Read + Write> ClientTransport for JSONTransport<C> {
    type TXState = JTXState;
    type FinalState = ();

    fn tx_begin_call(&mut self, method: MethodId) -> Result<JTXState> {
        Ok(begin_call(method))
    }

    fn tx_add_param(
        &mut self,
        name: &'static str,
        value: impl Serialize,
        state: &mut JTXState,
    ) -> Result<()> {
        add_param(name, value, state)
    }

    fn tx_finalize(&mut self, state: JTXState) -> Result<()> {
        serde_json::to_writer(Write::by_ref(&mut self.channel), &value_for_state(&state))
            .map_err(convert_error)
    }

    fn rx_response<T>(&mut self, _state: ()) -> Result<T>
    where
        for<'de> T: Deserialize<'de>,
    {
        self.from_channel()
    }
}

fn convert_error(e: impl std::error::Error) -> RPCError {
    RPCError::with_cause(
        RPCErrorKind::SerializationError,
        "json serialization or deserialization failed",
        e,
    )
}

fn begin_call(method: MethodId) -> JTXState {
    JTXState {
        method: method.name,
        params: json!({}),
    }
}

fn value_for_state(state: &JTXState) -> serde_json::Value {
    json!({
        "jsonrpc": "2.0",
        "method": state.method,
        "params": state.params,
        "id": format!("{}", Uuid::new_v4())
    })
}

fn add_param(name: &'static str, value: impl Serialize, state: &mut JTXState) -> Result<()> {
    state.params.as_object_mut().unwrap().insert(
        name.to_string(),
        serde_json::to_value(value).map_err(convert_error)?,
    );
    Ok(())
}

fn read_value_from_json<T, R>(reader: R) -> Result<T>
where
    for<'de> T: serde::Deserialize<'de>,
    R: Read,
{
    let read = serde_json::de::IoRead::new(reader);
    let mut de = serde_json::de::Deserializer::new(read);
    serde::de::Deserialize::deserialize(&mut de).map_err(convert_error)
}

impl<C: Read + Write> ServerTransport for JSONTransport<C> {
    type RXState = JRXState;

    fn rx_begin_call(&mut self) -> Result<(PartialMethodId, JRXState)> {
        let value: Value = self.from_channel()?;
        let method = value
            .get("method")
            .ok_or(RPCError::new(
                RPCErrorKind::SerializationError,
                "json is not expected object",
            ))?
            .as_str()
            .ok_or(RPCError::new(
                RPCErrorKind::SerializationError,
                "json method was not string",
            ))?
            .to_string();
        Ok((PartialMethodId::Name(method), JRXState { json: value }))
    }

    fn rx_read_param<T>(&mut self, name: &'static str, state: &mut JRXState) -> Result<T>
    where
        for<'de> T: serde::Deserialize<'de>,
    {
        let param_val = state
            .json
            .get("params")
            .ok_or(RPCError::new(
                RPCErrorKind::SerializationError,
                "json is not expected object",
            ))?
            .get(name)
            .ok_or(RPCError::new(
                RPCErrorKind::SerializationError,
                format!("parameters do not contain {}", name),
            ))?;
        return serde_json::from_value(param_val.clone()).map_err(convert_error);
    }

    fn tx_response(&mut self, value: impl Serialize) -> Result<()> {
        serde_json::to_writer(Write::by_ref(&mut self.channel), &value).map_err(convert_error)
    }
}

#[cfg(feature = "async_client")]
mod async_client {
    use super::*;
    use crate::AsyncClientTransport;
    use futures::{future, Future};
    use std::ops::Deref;

    type FutureBytes = Box<Future<Item = Vec<u8>, Error = RPCError>>;

    /// Like JSONTransport except for use as AsyncClientTransport.
    pub struct JSONAsyncClientTransport<F>
    where
        F: Fn(Vec<u8>) -> FutureBytes,
    {
        transact: F,
    }

    impl<F> JSONAsyncClientTransport<F>
    where
        F: Fn(Vec<u8>) -> FutureBytes,
    {
        /// Create an AsyncJSONTransport. `transact` must be a
        /// function which given the raw bytes to transmit to the server,
        /// returns a future representing the raw bytes returned from the server.
        pub fn new(transact: F) -> Self {
            JSONAsyncClientTransport { transact }
        }
    }

    impl<F> AsyncClientTransport for JSONAsyncClientTransport<F>
    where
        F: Fn(Vec<u8>) -> FutureBytes,
    {
        type TXState = JTXState;
        type FinalState = FutureBytes;

        fn tx_begin_call(&mut self, method: MethodId) -> Result<JTXState> {
            Ok(begin_call(method))
        }

        fn tx_add_param(
            &mut self,
            name: &'static str,
            value: impl Serialize,
            state: &mut JTXState,
        ) -> Result<()> {
            add_param(name, value, state)
        }

        fn tx_finalize(&mut self, state: JTXState) -> Result<FutureBytes> {
            let j = serde_json::to_vec(&value_for_state(&state)).map_err(convert_error)?;
            Ok((self.transact)(j))
        }

        fn rx_response<T>(&mut self, state: FutureBytes) -> Box<Future<Item = T, Error = RPCError>>
        where
            for<'de> T: Deserialize<'de>,
            T: 'static,
        {
            Box::new(state.and_then(|data: Vec<u8>| {
                let ret = read_value_from_json(data.deref());
                match ret {
                    Ok(val) => future::result(val),
                    Err(e) => future::err(e),
                }
            }))
        }
    }
}

#[cfg(feature = "async_client")]
pub use self::async_client::JSONAsyncClientTransport;