jsonrpc-rs 0.1.6

Futures base jsonrpc server/client framework
Documentation
use futures::{SinkExt, TryStreamExt};

use crate::{
    channel::{RPCData, TransportChannel},
    map_error, Error, ErrorCode, RPCResult, Request, Response,
};

use super::handler::*;

pub struct ServiceSession<C: TransportChannel> {
    id: String,
    input: C::Input,
    output: C::Output,
    methods: HandlerClonerRegister<ServerHandler>,
    async_methods: HandlerClonerRegister<AsyncServerHandler>,
}

impl<C: TransportChannel> ServiceSession<C> {
    pub(crate) fn new(
        id: String,
        input: C::Input,
        output: C::Output,
        methods: HandlerClonerRegister<ServerHandler>,
        async_methods: HandlerClonerRegister<AsyncServerHandler>,
    ) -> Self {
        Self {
            id,
            input,
            output,
            methods,
            async_methods,
        }
    }
    pub async fn run(&mut self) -> RPCResult<()> {
        while let Some(next) = self.input.try_next().await.map_err(map_error)? {
            let request = serde_json::from_slice::<Request<&str, serde_json::Value>>(&next)?;

            if let Some(mut handler) = self.methods.clone_from(request.method) {
                self.handle_resp(
                    request.id,
                    request.method,
                    handler(request.id, request.params),
                )
                .await?;
            } else if let Some(mut handler) = self.async_methods.clone_from(request.method) {
                self.handle_resp(
                    request.id,
                    request.method,
                    handler(request.id, request.params).await,
                )
                .await?;
            }
        }

        log::info!("Server session {} stop.", self.id);

        Ok(())
    }

    async fn handle_resp(
        &mut self,
        id: Option<usize>,
        method: &str,
        result: RPCResult<Option<RPCData>>,
    ) -> RPCResult<()> {
        match result {
            Ok(Some(response)) => {
                self.output.send(response).await.map_err(map_error)?;
            }
            Err(code) => {
                if let Some(id) = id {
                    let resp = Self::new_error_resp(id, code.code, Some(code.message));
                    self.output.send(resp).await.map_err(map_error)?;
                } else {
                    log::trace!("Method {} call return error, {}", method, code);
                }
            }
            _ => {}
        }

        Ok(())
    }

    fn new_error_resp(id: usize, code: ErrorCode, message: Option<String>) -> RPCData {
        let response = Response::<String, (), ()> {
            id,
            error: Some(Error {
                code: code.clone(),
                message: message.unwrap_or(code.to_string()),
                data: None,
            }),
            ..Default::default()
        };

        serde_json::to_vec(&response)
            .expect("Inner error, serialize jsonrpc response")
            .into()
    }
}