use futures::{StreamExt, SinkExt};
use jsonrpc_core_client::RpcError;
use std::sync::Arc;
use cs_utils::futures::GenericCodec;
use jsonrpc_core::{MetaIoHandler, Metadata, Middleware, Error};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;
pub struct AsyncDuplexRpcServer<M: Metadata = (), T: Middleware<M> = jsonrpc_core::NoopMiddleware> {
handler: Arc<MetaIoHandler<M, T>>,
}
impl<M: Metadata, T: Middleware<M>> AsyncDuplexRpcServer<M, T>
where
M: Default,
T::Future: Unpin,
T::CallFuture: Unpin,
{
pub fn new(
handler: impl Into<MetaIoHandler<M, T>>
) -> Self {
return AsyncDuplexRpcServer {
handler: Arc::new(handler.into()),
};
}
pub fn build<TAsyncDuplex: AsyncRead + AsyncWrite + Send + 'static>(
self,
async_duplex: TAsyncDuplex,
) -> Self {
let handler = Arc::clone(&self.handler);
let framed_remote_stream = Framed::new(
async_duplex,
GenericCodec::<String>::new(),
);
let (mut sink, mut source) = framed_remote_stream.split();
tokio::spawn(async move {
loop {
let request = source
.next().await
.expect("End of the stream.")
.expect("Received error on the stream.");
let maybe_response = handler
.handle_request(&request, Default::default()).await;
let response = match maybe_response {
Some(resp) => resp,
None => {
let error = RpcError::JsonRpcError(
Error::method_not_found(),
);
error.to_string()
},
};
sink
.send(response).await
.expect("Failed send RPC response to the stream.");
}
});
return self;
}
}
#[cfg(test)]
mod tests {
use cs_utils::futures::GenericCodec;
use futures::{StreamExt, SinkExt};
use jsonrpc_core::IoHandler;
use tokio::io::duplex;
use tokio_util::codec::Framed;
use crate::AsyncDuplexRpcServer;
#[tokio::test]
async fn returns_no_method_found_error() {
let (server_stream, client_stream) = duplex(1024);
let server_io = IoHandler::new();
let _server = AsyncDuplexRpcServer::new(server_io)
.build(server_stream);
let framed_remote_stream = Framed::new(client_stream, GenericCodec::<String>::new());
let (mut sink, mut source) = framed_remote_stream.split();
tokio::try_join!(
tokio::spawn(async move {
let message = r#"{ version: \"2\", method: \"unknown_method\", args: [] }"#;
sink
.send(message.to_string()).await
.unwrap();
}),
tokio::spawn(async move {
let response = source.next().await.unwrap().unwrap();
assert_eq!(
response,
r#"{"jsonrpc":"2.0","error":{"code":-32700,"message":"Parse error"},"id":null}"#,
);
}),
).unwrap();
}
}