yerpc_tide/
lib.rs

1use async_std::stream::StreamExt;
2use async_std::task;
3use std::future::Future;
4use std::sync::Arc;
5use tide::{Endpoint, Request};
6use tide_websockets::{Message as WsMessage, WebSocket};
7use yerpc::{RpcClient, RpcServer, RpcSession};
8
9/// A Tide endpoint for a JSON-RPC 2.0 websocket.
10///
11/// The `handler` closure has to return a type that implements [yerpc::RpcHandler].
12/// Either implement that manually or use `yerpc_derive::rpc`.
13/// See the [webserver example](../../examples/webserver) for a usage example.
14pub fn yerpc_handler<State, Server, Fun, Fut>(handler: Fun) -> impl Endpoint<State>
15where
16    State: Send + Sync + Clone + 'static,
17    Fun: Fn(Request<State>, RpcClient) -> Fut + Sync + Send + 'static,
18    Fut: Future<Output = anyhow::Result<Server>> + Send + 'static,
19    Server: RpcServer,
20{
21    let handler = Arc::new(handler);
22    WebSocket::new(move |request: Request<State>, mut stream| {
23        let handler = handler.clone();
24        async move {
25            let (client, mut outgoing) = RpcClient::new();
26            let server = (handler)(request, client.clone()).await?;
27            let session = RpcSession::new(client, server);
28            task::spawn({
29                let stream = stream.clone();
30                async move {
31                    while let Some(message) = outgoing.next().await {
32                        let message = serde_json::to_string(&message)?;
33                        // Abort on error.
34                        stream.send(WsMessage::Text(message)).await?;
35                    }
36                    let res: Result<(), anyhow::Error> = Ok(());
37                    res
38                }
39            });
40            while let Some(Ok(WsMessage::Text(input))) = stream.next().await {
41                session.handle_incoming(&input).await;
42            }
43            Ok(())
44        }
45    })
46}