jsonrpsee_core/server/
helpers.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::time::Duration;
28
29use jsonrpsee_types::{ErrorCode, ErrorObject, Id, InvalidRequest, Response, ResponsePayload};
30use serde_json::value::RawValue;
31use tokio::sync::mpsc;
32
33use super::{DisconnectError, SendTimeoutError, TrySendError};
34
35/// Sink that is used to send back the result to the server for a specific method.
36#[derive(Clone, Debug)]
37pub struct MethodSink {
38	/// Channel sender.
39	tx: mpsc::Sender<Box<RawValue>>,
40	/// Max response size in bytes for a executed call.
41	max_response_size: u32,
42}
43
44impl MethodSink {
45	/// Create a new `MethodSink` with unlimited response size.
46	pub fn new(tx: mpsc::Sender<Box<RawValue>>) -> Self {
47		MethodSink { tx, max_response_size: u32::MAX }
48	}
49
50	/// Create a new `MethodSink` with a limited response size.
51	pub fn new_with_limit(tx: mpsc::Sender<Box<RawValue>>, max_response_size: u32) -> Self {
52		MethodSink { tx, max_response_size }
53	}
54
55	/// Returns whether this channel is closed without needing a context.
56	pub fn is_closed(&self) -> bool {
57		self.tx.is_closed()
58	}
59
60	/// Same as [`tokio::sync::mpsc::Sender::closed`].
61	///
62	/// # Cancel safety
63	/// This method is cancel safe. Once the channel is closed,
64	/// it stays closed forever and all future calls to closed will return immediately.
65	pub async fn closed(&self) {
66		self.tx.closed().await
67	}
68
69	/// Get the max response size.
70	pub const fn max_response_size(&self) -> u32 {
71		self.max_response_size
72	}
73
74	/// Attempts to send out the message immediately and fails if the underlying
75	/// connection has been closed or if the message buffer is full.
76	///
77	/// Returns the message if the send fails such that either can be thrown away or re-sent later.
78	pub fn try_send(&mut self, msg: Box<RawValue>) -> Result<(), TrySendError> {
79		self.tx.try_send(msg).map_err(Into::into)
80	}
81
82	/// Async send which will wait until there is space in channel buffer or that the subscription is disconnected.
83	pub async fn send(&self, msg: Box<RawValue>) -> Result<(), DisconnectError> {
84		self.tx.send(msg).await.map_err(Into::into)
85	}
86
87	/// Send a JSON-RPC error to the client
88	pub async fn send_error<'a>(&self, id: Id<'a>, err: ErrorObject<'a>) -> Result<(), DisconnectError> {
89		let payload = ResponsePayload::<()>::error_borrowed(err);
90		let json = serde_json::value::to_raw_value(&Response::new(payload, id)).expect("valid JSON; qed");
91
92		self.send(json).await
93	}
94
95	/// Similar to `MethodSink::send` but only waits for a limited time.
96	pub async fn send_timeout(&self, msg: Box<RawValue>, timeout: Duration) -> Result<(), SendTimeoutError> {
97		self.tx.send_timeout(msg, timeout).await.map_err(Into::into)
98	}
99
100	/// Get the capacity of the channel.
101	pub fn capacity(&self) -> usize {
102		self.tx.capacity()
103	}
104
105	/// Get the max capacity of the channel.
106	pub fn max_capacity(&self) -> usize {
107		self.tx.max_capacity()
108	}
109
110	/// Waits for there to be space on the return channel.
111	pub async fn has_capacity(&self) -> Result<(), DisconnectError> {
112		match self.tx.reserve().await {
113			// The permit is thrown away here because it's just
114			// a way to ensure that the return buffer has space.
115			Ok(_) => Ok(()),
116			Err(_) => Err(DisconnectError(RawValue::NULL.to_owned().into())),
117		}
118	}
119}
120
121/// Figure out if this is a sufficiently complete request that we can extract an [`Id`] out of, or just plain
122/// unparseable garbage.
123pub fn prepare_error(data: &[u8]) -> (Id<'_>, ErrorCode) {
124	match serde_json::from_slice::<InvalidRequest>(data) {
125		Ok(InvalidRequest { id }) => (id, ErrorCode::InvalidRequest),
126		Err(_) => (Id::Null, ErrorCode::ParseError),
127	}
128}