jsonrpc_rs/server/
session.rs1use futures::{SinkExt, TryStreamExt};
2
3use crate::{
4 channel::{RPCData, TransportChannel},
5 map_error, Error, ErrorCode, RPCResult, Request, Response,
6};
7
8use super::handler::*;
9
10pub struct ServiceSession<C: TransportChannel> {
11 id: String,
12 input: C::Input,
13 output: C::Output,
14 methods: HandlerClonerRegister<ServerHandler>,
15 async_methods: HandlerClonerRegister<AsyncServerHandler>,
16}
17
18impl<C: TransportChannel> ServiceSession<C> {
19 pub(crate) fn new(
20 id: String,
21 input: C::Input,
22 output: C::Output,
23 methods: HandlerClonerRegister<ServerHandler>,
24 async_methods: HandlerClonerRegister<AsyncServerHandler>,
25 ) -> Self {
26 Self {
27 id,
28 input,
29 output,
30 methods,
31 async_methods,
32 }
33 }
34 pub async fn run(&mut self) -> RPCResult<()> {
35 while let Some(next) = self.input.try_next().await.map_err(map_error)? {
36 let request = serde_json::from_slice::<Request<&str, serde_json::Value>>(&next)?;
37
38 if let Some(mut handler) = self.methods.clone_from(request.method) {
39 self.handle_resp(
40 request.id,
41 request.method,
42 handler(request.id, request.params),
43 )
44 .await?;
45 } else if let Some(mut handler) = self.async_methods.clone_from(request.method) {
46 self.handle_resp(
47 request.id,
48 request.method,
49 handler(request.id, request.params).await,
50 )
51 .await?;
52 }
53 }
54
55 log::info!("Server session {} stop.", self.id);
56
57 Ok(())
58 }
59
60 async fn handle_resp(
61 &mut self,
62 id: Option<usize>,
63 method: &str,
64 result: RPCResult<Option<RPCData>>,
65 ) -> RPCResult<()> {
66 match result {
67 Ok(Some(response)) => {
68 self.output.send(response).await.map_err(map_error)?;
69 }
70 Err(code) => {
71 if let Some(id) = id {
72 let resp = Self::new_error_resp(id, code.code, Some(code.message));
73 self.output.send(resp).await.map_err(map_error)?;
74 } else {
75 log::trace!("Method {} call return error, {}", method, code);
76 }
77 }
78 _ => {}
79 }
80
81 Ok(())
82 }
83
84 fn new_error_resp(id: usize, code: ErrorCode, message: Option<String>) -> RPCData {
85 let response = Response::<String, (), ()> {
86 id,
87 error: Some(Error {
88 code: code.clone(),
89 message: message.unwrap_or(code.to_string()),
90 data: None,
91 }),
92 ..Default::default()
93 };
94
95 serde_json::to_vec(&response)
96 .expect("Inner error, serialize jsonrpc response")
97 .into()
98 }
99}