multiplexed_connection/utils/
async_duplex_rpc_server.rs1use futures::{StreamExt, SinkExt};
2use jsonrpc_core_client::RpcError;
3use std::sync::Arc;
4
5use cs_utils::futures::GenericCodec;
6use jsonrpc_core::{MetaIoHandler, Metadata, Middleware, Error};
7use tokio::io::{AsyncRead, AsyncWrite};
8use tokio_util::codec::Framed;
9
10pub struct AsyncDuplexRpcServer<M: Metadata = (), T: Middleware<M> = jsonrpc_core::NoopMiddleware> {
12 handler: Arc<MetaIoHandler<M, T>>,
13}
14
15impl<M: Metadata, T: Middleware<M>> AsyncDuplexRpcServer<M, T>
16where
17 M: Default,
18 T::Future: Unpin,
19 T::CallFuture: Unpin,
20{
21 pub fn new(
23 handler: impl Into<MetaIoHandler<M, T>>
24 ) -> Self {
25 return AsyncDuplexRpcServer {
26 handler: Arc::new(handler.into()),
27 };
28 }
29
30 pub fn build<TAsyncDuplex: AsyncRead + AsyncWrite + Send + 'static>(
32 self,
33 async_duplex: TAsyncDuplex,
34 ) -> Self {
35 let handler = Arc::clone(&self.handler);
36
37 let framed_remote_stream = Framed::new(
38 async_duplex,
39 GenericCodec::<String>::new(),
40 );
41 let (mut sink, mut source) = framed_remote_stream.split();
42
43 tokio::spawn(async move {
44 loop {
46 let request = source
47 .next().await
48 .expect("End of the stream.")
49 .expect("Received error on the stream.");
50
51 let maybe_response = handler
53 .handle_request(&request, Default::default()).await;
54
55 let response = match maybe_response {
56 Some(resp) => resp,
57 None => {
58 let error = RpcError::JsonRpcError(
59 Error::method_not_found(),
60 );
61
62 error.to_string()
63 },
64 };
65
66 sink
67 .send(response).await
68 .expect("Failed send RPC response to the stream.");
69 }
70 });
71
72 return self;
73 }
74}
75
#[cfg(test)]
mod tests {
    use cs_utils::futures::GenericCodec;
    use futures::{StreamExt, SinkExt};
    use jsonrpc_core::IoHandler;
    use tokio::io::duplex;
    use tokio_util::codec::Framed;

    use crate::AsyncDuplexRpcServer;

    /// Starts a server with an empty `IoHandler` on one end of an in-memory
    /// duplex stream, sends `request` from the other end and returns the
    /// first response frame received back.
    async fn exchange(request: &str) -> String {
        let (server_stream, client_stream) = duplex(1024);
        let server_io = IoHandler::new();

        let _server = AsyncDuplexRpcServer::new(server_io)
            .build(server_stream);

        let framed_remote_stream = Framed::new(client_stream, GenericCodec::<String>::new());
        let (mut sink, mut source) = framed_remote_stream.split();

        sink
            .send(request.to_string()).await
            .expect("Failed to send the request to the server.");

        source
            .next().await
            .expect("Stream ended before a response arrived.")
            .expect("Failed to read the response from the stream.")
    }

    /// A request that is not valid JSON must be answered with a JSON-RPC
    /// "Parse error" response.
    #[tokio::test]
    async fn returns_parse_error_for_malformed_request() {
        // Deliberately malformed: unquoted keys and literal backslashes
        // (this is a raw string, so `\"` is not an escape sequence).
        let message = r#"{ version: \"2\", method: \"unknown_method\", args: [] }"#;

        let response = exchange(message).await;

        assert_eq!(
            response,
            r#"{"jsonrpc":"2.0","error":{"code":-32700,"message":"Parse error"},"id":null}"#,
        );
    }

    /// A well-formed call to a method that was never registered must be
    /// answered with a JSON-RPC "Method not found" response carrying the
    /// request id.
    #[tokio::test]
    async fn returns_no_method_found_error() {
        let message = r#"{"jsonrpc":"2.0","method":"unknown_method","params":[],"id":1}"#;

        let response = exchange(message).await;

        assert_eq!(
            response,
            r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#,
        );
    }
}