multiplexed_connection/utils/
async_duplex_rpc_server.rs

1use futures::{StreamExt, SinkExt};
2use jsonrpc_core_client::RpcError;
3use std::sync::Arc;
4
5use cs_utils::futures::GenericCodec;
6use jsonrpc_core::{MetaIoHandler, Metadata, Middleware, Error};
7use tokio::io::{AsyncRead, AsyncWrite};
8use tokio_util::codec::Framed;
9
10/// AsyncDuplex server builder
11pub struct AsyncDuplexRpcServer<M: Metadata = (), T: Middleware<M> = jsonrpc_core::NoopMiddleware> {
12	handler: Arc<MetaIoHandler<M, T>>,
13}
14
15impl<M: Metadata, T: Middleware<M>> AsyncDuplexRpcServer<M, T>
16where
17	M: Default,
18	T::Future: Unpin,
19	T::CallFuture: Unpin,
20{
21	/// Returns a new server instance
22	pub fn new(
23        handler: impl Into<MetaIoHandler<M, T>>
24    ) -> Self {
25		return AsyncDuplexRpcServer {
26			handler: Arc::new(handler.into()),
27		};
28	}
29
30	/// Returns a server future that needs to be polled in order to make progress.
31	pub fn build<TAsyncDuplex: AsyncRead + AsyncWrite + Send + 'static>(
32        self,
33        async_duplex: TAsyncDuplex,
34    ) -> Self {
35		let handler = Arc::clone(&self.handler);
36
37        let framed_remote_stream = Framed::new(
38            async_duplex,
39            GenericCodec::<String>::new(),
40        );
41        let (mut sink, mut source) = framed_remote_stream.split();
42
43        tokio::spawn(async move {
44            // TODO: propagate errors?
45            loop {
46                let request = source
47                    .next().await
48                    .expect("End of the stream.")
49                    .expect("Received error on the stream.");
50
51                // got RPC request
52                let maybe_response = handler
53                    .handle_request(&request, Default::default()).await;
54
55                let response = match maybe_response {
56                    Some(resp) => resp,
57                    None => {
58                        let error = RpcError::JsonRpcError(
59                            Error::method_not_found(),
60                        );
61
62                        error.to_string()
63                    },
64                };
65
66                sink
67                    .send(response).await
68                    .expect("Failed send RPC response to the stream.");
69            }
70        });
71
72        return self;
73	}
74}
75
#[cfg(test)]
mod tests {
    use cs_utils::futures::GenericCodec;
    use futures::{SinkExt, StreamExt};
    use jsonrpc_core::IoHandler;
    use tokio::io::duplex;
    use tokio_util::codec::Framed;

    use super::AsyncDuplexRpcServer;

    /// Sends `request` to a server that has no methods registered and returns
    /// the first response frame it writes back.
    async fn exchange(request: &str) -> String {
        let (server_stream, client_stream) = duplex(1024);
        let server_io = IoHandler::new();

        let _server = AsyncDuplexRpcServer::new(server_io).build(server_stream);

        let framed_remote_stream = Framed::new(client_stream, GenericCodec::<String>::new());
        let (mut sink, mut source) = framed_remote_stream.split();

        sink.send(request.to_string()).await.unwrap();

        source.next().await.unwrap().unwrap()
    }

    /// A well-formed call to a method that is not registered must be answered
    /// with the JSON-RPC "Method not found" error, echoing the request id.
    #[tokio::test]
    async fn returns_no_method_found_error() {
        let response =
            exchange(r#"{"jsonrpc":"2.0","method":"unknown_method","params":[],"id":1}"#).await;

        assert_eq!(
            response,
            r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#,
        );
    }

    /// A payload that is not valid JSON (unquoted keys) must be answered with
    /// the JSON-RPC "Parse error" and a null id.
    #[tokio::test]
    async fn returns_parse_error_for_malformed_request() {
        let response = exchange(r#"{ version: "2", method: "unknown_method", args: [] }"#).await;

        assert_eq!(
            response,
            r#"{"jsonrpc":"2.0","error":{"code":-32700,"message":"Parse error"},"id":null}"#,
        );
    }
}