1use async_std::stream::StreamExt;
2use async_std::task;
3use std::future::Future;
4use std::sync::Arc;
5use tide::{Endpoint, Request};
6use tide_websockets::{Message as WsMessage, WebSocket};
7use yerpc::{RpcClient, RpcServer, RpcSession};
8
9pub fn yerpc_handler<State, Server, Fun, Fut>(handler: Fun) -> impl Endpoint<State>
15where
16 State: Send + Sync + Clone + 'static,
17 Fun: Fn(Request<State>, RpcClient) -> Fut + Sync + Send + 'static,
18 Fut: Future<Output = anyhow::Result<Server>> + Send + 'static,
19 Server: RpcServer,
20{
21 let handler = Arc::new(handler);
22 WebSocket::new(move |request: Request<State>, mut stream| {
23 let handler = handler.clone();
24 async move {
25 let (client, mut outgoing) = RpcClient::new();
26 let server = (handler)(request, client.clone()).await?;
27 let session = RpcSession::new(client, server);
28 task::spawn({
29 let stream = stream.clone();
30 async move {
31 while let Some(message) = outgoing.next().await {
32 let message = serde_json::to_string(&message)?;
33 stream.send(WsMessage::Text(message)).await?;
35 }
36 let res: Result<(), anyhow::Error> = Ok(());
37 res
38 }
39 });
40 while let Some(Ok(WsMessage::Text(input))) = stream.next().await {
41 session.handle_incoming(&input).await;
42 }
43 Ok(())
44 }
45 })
46}