jsonrpsee_core/server/
helpers.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::io;
28use std::time::Duration;
29
30use jsonrpsee_types::{ErrorCode, ErrorObject, Id, InvalidRequest, Response, ResponsePayload};
31use tokio::sync::mpsc;
32
33use super::{DisconnectError, SendTimeoutError, SubscriptionMessage, TrySendError};
34
35/// Bounded writer that allows writing at most `max_len` bytes.
36///
37/// ```
38///    use std::io::Write;
39///
40///    use jsonrpsee_core::server::helpers::BoundedWriter;
41///
42///    let mut writer = BoundedWriter::new(10);
43///    (&mut writer).write("hello".as_bytes()).unwrap();
44///    assert_eq!(std::str::from_utf8(&writer.into_bytes()).unwrap(), "hello");
45/// ```
46#[derive(Debug, Clone)]
47pub struct BoundedWriter {
48	max_len: usize,
49	buf: Vec<u8>,
50}
51
52impl BoundedWriter {
53	/// Create a new bounded writer.
54	pub fn new(max_len: usize) -> Self {
55		Self { max_len, buf: Vec::with_capacity(128) }
56	}
57
58	/// Consume the writer and extract the written bytes.
59	pub fn into_bytes(self) -> Vec<u8> {
60		self.buf
61	}
62}
63
64impl<'a> io::Write for &'a mut BoundedWriter {
65	fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
66		let len = self.buf.len() + buf.len();
67		if self.max_len >= len {
68			self.buf.extend_from_slice(buf);
69			Ok(buf.len())
70		} else {
71			Err(io::Error::new(io::ErrorKind::OutOfMemory, "Memory capacity exceeded"))
72		}
73	}
74
75	fn flush(&mut self) -> io::Result<()> {
76		Ok(())
77	}
78}
79
80/// Sink that is used to send back the result to the server for a specific method.
81#[derive(Clone, Debug)]
82pub struct MethodSink {
83	/// Channel sender.
84	tx: mpsc::Sender<String>,
85	/// Max response size in bytes for a executed call.
86	max_response_size: u32,
87}
88
89impl MethodSink {
90	/// Create a new `MethodSink` with unlimited response size.
91	pub fn new(tx: mpsc::Sender<String>) -> Self {
92		MethodSink { tx, max_response_size: u32::MAX }
93	}
94
95	/// Create a new `MethodSink` with a limited response size.
96	pub fn new_with_limit(tx: mpsc::Sender<String>, max_response_size: u32) -> Self {
97		MethodSink { tx, max_response_size }
98	}
99
100	/// Returns whether this channel is closed without needing a context.
101	pub fn is_closed(&self) -> bool {
102		self.tx.is_closed()
103	}
104
105	/// Same as [`tokio::sync::mpsc::Sender::closed`].
106	///
107	/// # Cancel safety
108	/// This method is cancel safe. Once the channel is closed,
109	/// it stays closed forever and all future calls to closed will return immediately.
110	pub async fn closed(&self) {
111		self.tx.closed().await
112	}
113
114	/// Get the max response size.
115	pub const fn max_response_size(&self) -> u32 {
116		self.max_response_size
117	}
118
119	/// Attempts to send out the message immediately and fails if the underlying
120	/// connection has been closed or if the message buffer is full.
121	///
122	/// Returns the message if the send fails such that either can be thrown away or re-sent later.
123	pub fn try_send(&mut self, msg: String) -> Result<(), TrySendError> {
124		self.tx.try_send(msg).map_err(Into::into)
125	}
126
127	/// Async send which will wait until there is space in channel buffer or that the subscription is disconnected.
128	pub async fn send(&self, msg: String) -> Result<(), DisconnectError> {
129		self.tx.send(msg).await.map_err(Into::into)
130	}
131
132	/// Send a JSON-RPC error to the client
133	pub async fn send_error<'a>(&self, id: Id<'a>, err: ErrorObject<'a>) -> Result<(), DisconnectError> {
134		let payload = ResponsePayload::<()>::error_borrowed(err);
135		let json = serde_json::to_string(&Response::new(payload, id)).expect("valid JSON; qed");
136
137		self.send(json).await
138	}
139
140	/// Similar to `MethodSink::send` but only waits for a limited time.
141	pub async fn send_timeout(&self, msg: String, timeout: Duration) -> Result<(), SendTimeoutError> {
142		self.tx.send_timeout(msg, timeout).await.map_err(Into::into)
143	}
144
145	/// Get the capacity of the channel.
146	pub fn capacity(&self) -> usize {
147		self.tx.capacity()
148	}
149
150	/// Get the max capacity of the channel.
151	pub fn max_capacity(&self) -> usize {
152		self.tx.max_capacity()
153	}
154
155	/// Waits for there to be space on the return channel.
156	pub async fn has_capacity(&self) -> Result<(), DisconnectError> {
157		match self.tx.reserve().await {
158			// The permit is thrown away here because it's just
159			// a way to ensure that the return buffer has space.
160			Ok(_) => Ok(()),
161			Err(_) => Err(DisconnectError(SubscriptionMessage::empty())),
162		}
163	}
164}
165
166/// Figure out if this is a sufficiently complete request that we can extract an [`Id`] out of, or just plain
167/// unparseable garbage.
168pub fn prepare_error(data: &[u8]) -> (Id<'_>, ErrorCode) {
169	match serde_json::from_slice::<InvalidRequest>(data) {
170		Ok(InvalidRequest { id }) => (id, ErrorCode::InvalidRequest),
171		Err(_) => (Id::Null, ErrorCode::ParseError),
172	}
173}
174
175#[cfg(test)]
176mod tests {
177	use crate::server::BoundedWriter;
178	use jsonrpsee_types::{Id, Response, ResponsePayload};
179
180	#[test]
181	fn bounded_serializer_work() {
182		let mut writer = BoundedWriter::new(100);
183		let result = ResponsePayload::success(&"success");
184		let rp = &Response::new(result, Id::Number(1));
185
186		assert!(serde_json::to_writer(&mut writer, rp).is_ok());
187		assert_eq!(String::from_utf8(writer.into_bytes()).unwrap(), r#"{"jsonrpc":"2.0","id":1,"result":"success"}"#);
188	}
189
190	#[test]
191	fn bounded_serializer_cap_works() {
192		let mut writer = BoundedWriter::new(100);
193		// NOTE: `"` is part of the serialization so 101 characters.
194		assert!(serde_json::to_writer(&mut writer, &"x".repeat(99)).is_err());
195	}
196}