jsonrpc-rs 0.1.6

Futures base jsonrpc server/client framework
Documentation
use futures::TryStreamExt;

use crate::{channel::TransportChannel, map_error, RPCResult, Response};

use super::user_event::RPCCompletedQ;

pub async fn recv_loop<C: TransportChannel, S: AsRef<str>>(
    client_id: S,
    mut input: C::Input,
    completed_q: RPCCompletedQ,
) -> RPCResult<()> {
    loop {
        let data = match input.try_next().await {
            Ok(Some(data)) => data,
            Err(err) => {
                log::error!("Error raise from input stream {}", err);
                completed_q.cancel_all();
                break;
            }
            _ => {
                break;
            }
        };

        let response =
            serde_json::from_slice::<Response<String, serde_json::Value, serde_json::Value>>(&data)
                .map_err(map_error);

        match response {
            Ok(response) => {
                log::trace!("parsed response: {:?}", response);
                if let Some(result) = response.result {
                    log::trace!("response {} with result: {}", response.id, result);
                    completed_q.complete_one(response.id, Ok(result));
                } else if let Some(err) = response.error {
                    log::trace!("response {} with error: {}", response.id, err);
                    completed_q.complete_one(response.id, Err(err));
                } else {
                    completed_q.complete_one(response.id, Ok(serde_json::Value::Null));
                    log::trace!("response {} with null result", response.id);
                }
            }
            Err(err) => {
                log::error!("parse response error,{}", err);
                log::error!("response {}", String::from_utf8_lossy(&data));
                completed_q.cancel_all();
                return Err(err);
            }
        }
    }

    completed_q.cancel_all();

    log::info!("rpc client {} recv_loop stop.", client_id.as_ref());

    Ok(())
}