jsonrpc_rs/server/
session.rs

1use futures::{SinkExt, TryStreamExt};
2
3use crate::{
4    channel::{RPCData, TransportChannel},
5    map_error, Error, ErrorCode, RPCResult, Request, Response,
6};
7
8use super::handler::*;
9
10pub struct ServiceSession<C: TransportChannel> {
11    id: String,
12    input: C::Input,
13    output: C::Output,
14    methods: HandlerClonerRegister<ServerHandler>,
15    async_methods: HandlerClonerRegister<AsyncServerHandler>,
16}
17
18impl<C: TransportChannel> ServiceSession<C> {
19    pub(crate) fn new(
20        id: String,
21        input: C::Input,
22        output: C::Output,
23        methods: HandlerClonerRegister<ServerHandler>,
24        async_methods: HandlerClonerRegister<AsyncServerHandler>,
25    ) -> Self {
26        Self {
27            id,
28            input,
29            output,
30            methods,
31            async_methods,
32        }
33    }
34    pub async fn run(&mut self) -> RPCResult<()> {
35        while let Some(next) = self.input.try_next().await.map_err(map_error)? {
36            let request = serde_json::from_slice::<Request<&str, serde_json::Value>>(&next)?;
37
38            if let Some(mut handler) = self.methods.clone_from(request.method) {
39                self.handle_resp(
40                    request.id,
41                    request.method,
42                    handler(request.id, request.params),
43                )
44                .await?;
45            } else if let Some(mut handler) = self.async_methods.clone_from(request.method) {
46                self.handle_resp(
47                    request.id,
48                    request.method,
49                    handler(request.id, request.params).await,
50                )
51                .await?;
52            }
53        }
54
55        log::info!("Server session {} stop.", self.id);
56
57        Ok(())
58    }
59
60    async fn handle_resp(
61        &mut self,
62        id: Option<usize>,
63        method: &str,
64        result: RPCResult<Option<RPCData>>,
65    ) -> RPCResult<()> {
66        match result {
67            Ok(Some(response)) => {
68                self.output.send(response).await.map_err(map_error)?;
69            }
70            Err(code) => {
71                if let Some(id) = id {
72                    let resp = Self::new_error_resp(id, code.code, Some(code.message));
73                    self.output.send(resp).await.map_err(map_error)?;
74                } else {
75                    log::trace!("Method {} call return error, {}", method, code);
76                }
77            }
78            _ => {}
79        }
80
81        Ok(())
82    }
83
84    fn new_error_resp(id: usize, code: ErrorCode, message: Option<String>) -> RPCData {
85        let response = Response::<String, (), ()> {
86            id,
87            error: Some(Error {
88                code: code.clone(),
89                message: message.unwrap_or(code.to_string()),
90                data: None,
91            }),
92            ..Default::default()
93        };
94
95        serde_json::to_vec(&response)
96            .expect("Inner error, serialize jsonrpc response")
97            .into()
98    }
99}