rpc_toolkit/server/
mod.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use futures::future::{join_all, BoxFuture};
5use futures::{Future, FutureExt, Stream, StreamExt};
6use imbl_value::Value;
7use yajrc::{RpcError, RpcMethod};
8
9use crate::util::{invalid_request, JobRunner};
10use crate::{AnyHandler, Empty, HandleAny, HandleAnyArgs, ParentHandler};
11
/// RPC method shape used by this server: `yajrc::GenericRpcMethod`
/// instantiated with `String`, `Value` and `Value`.
pub type GenericRpcMethod = yajrc::GenericRpcMethod<String, Value, Value>;
/// A single request for a [`GenericRpcMethod`]; the server reads its `id`,
/// `method` and `params` fields.
pub type RpcRequest = yajrc::RpcRequest<GenericRpcMethod>;
/// Response to an [`RpcRequest`]: an `id` plus a `result` that is either a
/// `Value` or an `RpcError`.
pub type RpcResponse = yajrc::RpcResponse<GenericRpcMethod>;
/// Incoming payload that is either one [`RpcRequest`] (`Single`) or several
/// (`Batch`).
pub type SingleOrBatchRpcRequest = yajrc::SingleOrBatchRpcRequest<GenericRpcMethod>;
16
17pub mod http;
18pub mod socket;
19
20pub use http::*;
21pub use socket::*;
22
23pub struct Server<Context: crate::Context> {
24    make_ctx: Arc<dyn Fn() -> BoxFuture<'static, Result<Context, RpcError>> + Send + Sync>,
25    root_handler: Arc<AnyHandler<Context, Empty, ParentHandler<Context>>>,
26}
27impl<Context: crate::Context> Clone for Server<Context> {
28    fn clone(&self) -> Self {
29        Self {
30            make_ctx: self.make_ctx.clone(),
31            root_handler: self.root_handler.clone(),
32        }
33    }
34}
35impl<Context: crate::Context> Server<Context> {
36    pub fn new<
37        MakeCtx: Fn() -> Fut + Send + Sync + 'static,
38        Fut: Future<Output = Result<Context, RpcError>> + Send + 'static,
39    >(
40        make_ctx: MakeCtx,
41        root_handler: ParentHandler<Context>,
42    ) -> Self {
43        Server {
44            make_ctx: Arc::new(move || make_ctx().boxed()),
45            root_handler: Arc::new(AnyHandler::new(root_handler)),
46        }
47    }
48
49    pub fn handle_command(
50        &self,
51        method: &str,
52        params: Value,
53    ) -> impl Future<Output = Result<Value, RpcError>> + Send + 'static {
54        let (make_ctx, root_handler, method) = (
55            self.make_ctx.clone(),
56            self.root_handler.clone(),
57            self.root_handler.method_from_dots(method),
58        );
59
60        async move {
61            root_handler
62                .handle_async(HandleAnyArgs {
63                    context: make_ctx().await?,
64                    parent_method: VecDeque::new(),
65                    method: method.ok_or_else(|| yajrc::METHOD_NOT_FOUND_ERROR)?,
66                    params,
67                    inherited: crate::Empty {},
68                })
69                .await
70        }
71    }
72
73    fn handle_single_request(
74        &self,
75        RpcRequest { id, method, params }: RpcRequest,
76    ) -> impl Future<Output = RpcResponse> + Send + 'static {
77        let handle = (|| Ok::<_, RpcError>(self.handle_command(method.as_str(), params)))();
78        async move {
79            RpcResponse {
80                id,
81                result: match handle {
82                    Ok(handle) => handle.await,
83                    Err(e) => Err(e),
84                },
85            }
86        }
87    }
88
89    pub fn handle(
90        &self,
91        request: Result<Value, RpcError>,
92    ) -> BoxFuture<'static, Result<Value, imbl_value::Error>> {
93        match request.and_then(|request| {
94            imbl_value::from_value::<SingleOrBatchRpcRequest>(request).map_err(invalid_request)
95        }) {
96            Ok(SingleOrBatchRpcRequest::Single(req)) => {
97                let fut = self.handle_single_request(req);
98                async { imbl_value::to_value(&fut.await) }.boxed()
99            }
100            Ok(SingleOrBatchRpcRequest::Batch(reqs)) => {
101                let futs: Vec<_> = reqs
102                    .into_iter()
103                    .map(|req| self.handle_single_request(req))
104                    .collect();
105                async { imbl_value::to_value(&join_all(futs).await) }.boxed()
106            }
107            Err(e) => async {
108                imbl_value::to_value(&RpcResponse {
109                    id: None,
110                    result: Err(e),
111                })
112            }
113            .boxed(),
114        }
115    }
116
117    pub fn stream<'a>(
118        &'a self,
119        requests: impl Stream<Item = Result<Value, RpcError>> + Send + 'a,
120    ) -> impl Stream<Item = Result<Value, imbl_value::Error>> + 'a {
121        async_stream::try_stream! {
122            let mut runner = JobRunner::new();
123            let requests = requests.fuse().map(|req| self.handle(req));
124            tokio::pin!(requests);
125
126            while let Some(res) = runner.next_result(&mut requests).await.transpose()? {
127                yield res;
128            }
129        }
130    }
131}