rpc_toolkit/server/
mod.rs1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use futures::future::{join_all, BoxFuture};
5use futures::{Future, FutureExt, Stream, StreamExt};
6use imbl_value::Value;
7use yajrc::{RpcError, RpcMethod};
8
9use crate::util::{invalid_request, JobRunner};
10use crate::{AnyHandler, Empty, HandleAny, HandleAnyArgs, ParentHandler};
11
12pub type GenericRpcMethod = yajrc::GenericRpcMethod<String, Value, Value>;
13pub type RpcRequest = yajrc::RpcRequest<GenericRpcMethod>;
14pub type RpcResponse = yajrc::RpcResponse<GenericRpcMethod>;
15pub type SingleOrBatchRpcRequest = yajrc::SingleOrBatchRpcRequest<GenericRpcMethod>;
16
17pub mod http;
18pub mod socket;
19
20pub use http::*;
21pub use socket::*;
22
23pub struct Server<Context: crate::Context> {
24 make_ctx: Arc<dyn Fn() -> BoxFuture<'static, Result<Context, RpcError>> + Send + Sync>,
25 root_handler: Arc<AnyHandler<Context, Empty, ParentHandler<Context>>>,
26}
27impl<Context: crate::Context> Clone for Server<Context> {
28 fn clone(&self) -> Self {
29 Self {
30 make_ctx: self.make_ctx.clone(),
31 root_handler: self.root_handler.clone(),
32 }
33 }
34}
35impl<Context: crate::Context> Server<Context> {
36 pub fn new<
37 MakeCtx: Fn() -> Fut + Send + Sync + 'static,
38 Fut: Future<Output = Result<Context, RpcError>> + Send + 'static,
39 >(
40 make_ctx: MakeCtx,
41 root_handler: ParentHandler<Context>,
42 ) -> Self {
43 Server {
44 make_ctx: Arc::new(move || make_ctx().boxed()),
45 root_handler: Arc::new(AnyHandler::new(root_handler)),
46 }
47 }
48
49 pub fn handle_command(
50 &self,
51 method: &str,
52 params: Value,
53 ) -> impl Future<Output = Result<Value, RpcError>> + Send + 'static {
54 let (make_ctx, root_handler, method) = (
55 self.make_ctx.clone(),
56 self.root_handler.clone(),
57 self.root_handler.method_from_dots(method),
58 );
59
60 async move {
61 root_handler
62 .handle_async(HandleAnyArgs {
63 context: make_ctx().await?,
64 parent_method: VecDeque::new(),
65 method: method.ok_or_else(|| yajrc::METHOD_NOT_FOUND_ERROR)?,
66 params,
67 inherited: crate::Empty {},
68 })
69 .await
70 }
71 }
72
73 fn handle_single_request(
74 &self,
75 RpcRequest { id, method, params }: RpcRequest,
76 ) -> impl Future<Output = RpcResponse> + Send + 'static {
77 let handle = (|| Ok::<_, RpcError>(self.handle_command(method.as_str(), params)))();
78 async move {
79 RpcResponse {
80 id,
81 result: match handle {
82 Ok(handle) => handle.await,
83 Err(e) => Err(e),
84 },
85 }
86 }
87 }
88
89 pub fn handle(
90 &self,
91 request: Result<Value, RpcError>,
92 ) -> BoxFuture<'static, Result<Value, imbl_value::Error>> {
93 match request.and_then(|request| {
94 imbl_value::from_value::<SingleOrBatchRpcRequest>(request).map_err(invalid_request)
95 }) {
96 Ok(SingleOrBatchRpcRequest::Single(req)) => {
97 let fut = self.handle_single_request(req);
98 async { imbl_value::to_value(&fut.await) }.boxed()
99 }
100 Ok(SingleOrBatchRpcRequest::Batch(reqs)) => {
101 let futs: Vec<_> = reqs
102 .into_iter()
103 .map(|req| self.handle_single_request(req))
104 .collect();
105 async { imbl_value::to_value(&join_all(futs).await) }.boxed()
106 }
107 Err(e) => async {
108 imbl_value::to_value(&RpcResponse {
109 id: None,
110 result: Err(e),
111 })
112 }
113 .boxed(),
114 }
115 }
116
117 pub fn stream<'a>(
118 &'a self,
119 requests: impl Stream<Item = Result<Value, RpcError>> + Send + 'a,
120 ) -> impl Stream<Item = Result<Value, imbl_value::Error>> + 'a {
121 async_stream::try_stream! {
122 let mut runner = JobRunner::new();
123 let requests = requests.fuse().map(|req| self.handle(req));
124 tokio::pin!(requests);
125
126 while let Some(res) = runner.next_result(&mut requests).await.transpose()? {
127 yield res;
128 }
129 }
130 }
131}