jsonrpsee_core/server/helpers.rs
1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::time::Duration;
28
29use jsonrpsee_types::{ErrorCode, ErrorObject, Id, InvalidRequest, Response, ResponsePayload};
30use serde_json::value::RawValue;
31use tokio::sync::mpsc;
32
33use super::{DisconnectError, SendTimeoutError, TrySendError};
34
35/// Sink that is used to send back the result to the server for a specific method.
36#[derive(Clone, Debug)]
37pub struct MethodSink {
38 /// Channel sender.
39 tx: mpsc::Sender<Box<RawValue>>,
40 /// Max response size in bytes for a executed call.
41 max_response_size: u32,
42}
43
44impl MethodSink {
45 /// Create a new `MethodSink` with unlimited response size.
46 pub fn new(tx: mpsc::Sender<Box<RawValue>>) -> Self {
47 MethodSink { tx, max_response_size: u32::MAX }
48 }
49
50 /// Create a new `MethodSink` with a limited response size.
51 pub fn new_with_limit(tx: mpsc::Sender<Box<RawValue>>, max_response_size: u32) -> Self {
52 MethodSink { tx, max_response_size }
53 }
54
55 /// Returns whether this channel is closed without needing a context.
56 pub fn is_closed(&self) -> bool {
57 self.tx.is_closed()
58 }
59
60 /// Same as [`tokio::sync::mpsc::Sender::closed`].
61 ///
62 /// # Cancel safety
63 /// This method is cancel safe. Once the channel is closed,
64 /// it stays closed forever and all future calls to closed will return immediately.
65 pub async fn closed(&self) {
66 self.tx.closed().await
67 }
68
69 /// Get the max response size.
70 pub const fn max_response_size(&self) -> u32 {
71 self.max_response_size
72 }
73
74 /// Attempts to send out the message immediately and fails if the underlying
75 /// connection has been closed or if the message buffer is full.
76 ///
77 /// Returns the message if the send fails such that either can be thrown away or re-sent later.
78 pub fn try_send(&mut self, msg: Box<RawValue>) -> Result<(), TrySendError> {
79 self.tx.try_send(msg).map_err(Into::into)
80 }
81
82 /// Async send which will wait until there is space in channel buffer or that the subscription is disconnected.
83 pub async fn send(&self, msg: Box<RawValue>) -> Result<(), DisconnectError> {
84 self.tx.send(msg).await.map_err(Into::into)
85 }
86
87 /// Send a JSON-RPC error to the client
88 pub async fn send_error<'a>(&self, id: Id<'a>, err: ErrorObject<'a>) -> Result<(), DisconnectError> {
89 let payload = ResponsePayload::<()>::error_borrowed(err);
90 let json = serde_json::value::to_raw_value(&Response::new(payload, id)).expect("valid JSON; qed");
91
92 self.send(json).await
93 }
94
95 /// Similar to `MethodSink::send` but only waits for a limited time.
96 pub async fn send_timeout(&self, msg: Box<RawValue>, timeout: Duration) -> Result<(), SendTimeoutError> {
97 self.tx.send_timeout(msg, timeout).await.map_err(Into::into)
98 }
99
100 /// Get the capacity of the channel.
101 pub fn capacity(&self) -> usize {
102 self.tx.capacity()
103 }
104
105 /// Get the max capacity of the channel.
106 pub fn max_capacity(&self) -> usize {
107 self.tx.max_capacity()
108 }
109
110 /// Waits for there to be space on the return channel.
111 pub async fn has_capacity(&self) -> Result<(), DisconnectError> {
112 match self.tx.reserve().await {
113 // The permit is thrown away here because it's just
114 // a way to ensure that the return buffer has space.
115 Ok(_) => Ok(()),
116 Err(_) => Err(DisconnectError(RawValue::NULL.to_owned().into())),
117 }
118 }
119}
120
121/// Figure out if this is a sufficiently complete request that we can extract an [`Id`] out of, or just plain
122/// unparseable garbage.
123pub fn prepare_error(data: &[u8]) -> (Id<'_>, ErrorCode) {
124 match serde_json::from_slice::<InvalidRequest>(data) {
125 Ok(InvalidRequest { id }) => (id, ErrorCode::InvalidRequest),
126 Err(_) => (Id::Null, ErrorCode::ParseError),
127 }
128}