jsonrpsee_core/server/
method_response.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use crate::server::{BoundedWriter, LOG_TARGET};
28use std::task::Poll;
29
30use futures_util::{Future, FutureExt};
31use http::Extensions;
32use jsonrpsee_types::error::{
33	reject_too_big_batch_response, ErrorCode, ErrorObject, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG,
34};
35use jsonrpsee_types::{ErrorObjectOwned, Id, Response, ResponsePayload as InnerResponsePayload};
36use serde::Serialize;
37use serde_json::value::to_raw_value;
38
/// Discriminates which sort of request a response belongs to.
#[derive(Debug, Clone)]
enum ResponseKind {
	/// Response to an ordinary method call.
	MethodCall,
	/// Response to a subscription call.
	Subscription,
	/// Response to a batch request.
	Batch,
}
45
46/// Represents a response to a method call.
47///
48/// NOTE: A subscription is also a method call but it's
49/// possible determine whether a method response
50/// is "subscription" or "ordinary method call"
51/// by calling [`MethodResponse::is_subscription`]
52#[derive(Debug)]
53pub struct MethodResponse {
54	/// Serialized JSON-RPC response,
55	result: String,
56	/// Indicates whether the call was successful or not.
57	success_or_error: MethodResponseResult,
58	/// Indicates whether the call was a subscription response.
59	kind: ResponseKind,
60	/// Optional callback that may be utilized to notif
61	/// that the method response has been processed
62	on_close: Option<MethodResponseNotifyTx>,
63	/// The response's extensions.
64	extensions: Extensions,
65}
66
67impl MethodResponse {
68	/// Returns whether the call was successful.
69	pub fn is_success(&self) -> bool {
70		self.success_or_error.is_success()
71	}
72
73	/// Returns whether the call failed.
74	pub fn is_error(&self) -> bool {
75		self.success_or_error.is_error()
76	}
77
78	/// Returns whether the response is a subscription response.
79	pub fn is_subscription(&self) -> bool {
80		matches!(self.kind, ResponseKind::Subscription)
81	}
82
83	/// Returns whether the response is a method response.
84	pub fn is_method_call(&self) -> bool {
85		matches!(self.kind, ResponseKind::MethodCall)
86	}
87
88	/// Returns whether the response is a batch response.
89	pub fn is_batch(&self) -> bool {
90		matches!(self.kind, ResponseKind::Batch)
91	}
92
93	/// Consume the method response and extract the serialized response.
94	pub fn into_result(self) -> String {
95		self.result
96	}
97
98	/// Extract the serialized response as a String.
99	pub fn to_result(&self) -> String {
100		self.result.clone()
101	}
102
103	/// Consume the method response and extract the parts.
104	pub fn into_parts(self) -> (String, Option<MethodResponseNotifyTx>) {
105		(self.result, self.on_close)
106	}
107
108	/// Get the error code
109	///
110	/// Returns `Some(error code)` if the call failed.
111	pub fn as_error_code(&self) -> Option<i32> {
112		self.success_or_error.as_error_code()
113	}
114
115	/// Get a reference to the serialized response.
116	pub fn as_result(&self) -> &str {
117		&self.result
118	}
119
120	/// Create a method response from [`BatchResponse`].
121	pub fn from_batch(batch: BatchResponse) -> Self {
122		Self {
123			result: batch.0,
124			success_or_error: MethodResponseResult::Success,
125			kind: ResponseKind::Batch,
126			on_close: None,
127			extensions: Extensions::new(),
128		}
129	}
130
131	/// This is similar to [`MethodResponse::response`] but sets a flag to indicate
132	/// that response is a subscription.
133	pub fn subscription_response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
134	where
135		T: Serialize + Clone,
136	{
137		let mut rp = Self::response(id, result, max_response_size);
138		rp.kind = ResponseKind::Subscription;
139		rp
140	}
141
142	/// Create a new method response.
143	///
144	/// If the serialization of `result` exceeds `max_response_size` then
145	/// the response is changed to an JSON-RPC error object.
146	pub fn response<T>(id: Id, rp: ResponsePayload<T>, max_response_size: usize) -> Self
147	where
148		T: Serialize + Clone,
149	{
150		let mut writer = BoundedWriter::new(max_response_size);
151
152		let success_or_error = if let InnerResponsePayload::Error(ref e) = rp.inner {
153			MethodResponseResult::Failed(e.code())
154		} else {
155			MethodResponseResult::Success
156		};
157
158		let kind = ResponseKind::MethodCall;
159
160		match serde_json::to_writer(&mut writer, &Response::new(rp.inner, id.clone())) {
161			Ok(_) => {
162				// Safety - serde_json does not emit invalid UTF-8.
163				let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };
164
165				Self { result, success_or_error, kind, on_close: rp.on_exit, extensions: Extensions::new() }
166			}
167			Err(err) => {
168				tracing::error!(target: LOG_TARGET, "Error serializing response: {:?}", err);
169
170				if err.is_io() {
171					let data = to_raw_value(&format!("Exceeded max limit of {max_response_size}")).ok();
172					let err_code = OVERSIZED_RESPONSE_CODE;
173
174					let err = InnerResponsePayload::<()>::error_borrowed(ErrorObject::borrowed(
175						err_code,
176						OVERSIZED_RESPONSE_MSG,
177						data.as_deref(),
178					));
179					let result =
180						serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");
181
182					Self {
183						result,
184						success_or_error: MethodResponseResult::Failed(err_code),
185						kind,
186						on_close: rp.on_exit,
187						extensions: Extensions::new(),
188					}
189				} else {
190					let err = ErrorCode::InternalError;
191					let payload = jsonrpsee_types::ResponsePayload::<()>::error(err);
192					let result =
193						serde_json::to_string(&Response::new(payload, id)).expect("JSON serialization infallible; qed");
194					Self {
195						result,
196						success_or_error: MethodResponseResult::Failed(err.code()),
197						kind,
198						on_close: rp.on_exit,
199						extensions: Extensions::new(),
200					}
201				}
202			}
203		}
204	}
205
206	/// This is similar to [`MethodResponse::error`] but sets a flag to indicate
207	/// that error is a subscription.
208	pub fn subscription_error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
209		let mut rp = Self::error(id, err);
210		rp.kind = ResponseKind::Subscription;
211		rp
212	}
213
214	/// Create a [`MethodResponse`] from a JSON-RPC error.
215	pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
216		let err: ErrorObject = err.into();
217		let err_code = err.code();
218		let err = InnerResponsePayload::<()>::error_borrowed(err);
219		let result = serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");
220		Self {
221			result,
222			success_or_error: MethodResponseResult::Failed(err_code),
223			kind: ResponseKind::MethodCall,
224			on_close: None,
225			extensions: Extensions::new(),
226		}
227	}
228
229	/// Returns a reference to the associated extensions.
230	pub fn extensions(&self) -> &Extensions {
231		&self.extensions
232	}
233
234	/// Returns a reference to the associated extensions.
235	pub fn extensions_mut(&mut self) -> &mut Extensions {
236		&mut self.extensions
237	}
238
239	/// Consumes the method response and returns a new one with the given extensions.
240	pub fn with_extensions(self, extensions: Extensions) -> Self {
241		Self { extensions, ..self }
242	}
243}
244
/// Outcome of a method call: it either succeeded or failed with an error code.
#[derive(Debug, Copy, Clone)]
enum MethodResponseResult {
	/// The method call succeeded.
	Success,
	/// The method call failed; carries the error code.
	Failed(i32),
}

impl MethodResponseResult {
	/// Returns `true` if the call succeeded.
	fn is_success(&self) -> bool {
		matches!(self, Self::Success)
	}

	/// Returns `true` if the call failed.
	fn is_error(&self) -> bool {
		matches!(self, Self::Failed(_))
	}

	/// Get the error code.
	///
	/// Returns `Some(error code)` if the call failed, otherwise `None`.
	fn as_error_code(&self) -> Option<i32> {
		match *self {
			Self::Failed(code) => Some(code),
			Self::Success => None,
		}
	}
}
275
/// Incrementally assembles a `BatchResponse`.
#[derive(Debug, Clone, Default)]
pub struct BatchResponseBuilder {
	/// The serialized JSON-RPC batch response built so far.
	result: String,
	/// Upper bound on the size of the serialized batch.
	max_response_size: usize,
}
284
285impl BatchResponseBuilder {
286	/// Create a new batch response builder with limit.
287	pub fn new_with_limit(limit: usize) -> Self {
288		let mut initial = String::with_capacity(2048);
289		initial.push('[');
290
291		Self { result: initial, max_response_size: limit }
292	}
293
294	/// Append a result from an individual method to the batch response.
295	///
296	/// Fails if the max limit is exceeded and returns to error response to
297	/// return early in order to not process method call responses which are thrown away anyway.
298	pub fn append(&mut self, response: &MethodResponse) -> Result<(), MethodResponse> {
299		// `,` will occupy one extra byte for each entry
300		// on the last item the `,` is replaced by `]`.
301		let len = response.result.len() + self.result.len() + 1;
302
303		if len > self.max_response_size {
304			Err(MethodResponse::error(Id::Null, reject_too_big_batch_response(self.max_response_size)))
305		} else {
306			self.result.push_str(&response.result);
307			self.result.push(',');
308			Ok(())
309		}
310	}
311
312	/// Check if the batch is empty.
313	pub fn is_empty(&self) -> bool {
314		self.result.len() <= 1
315	}
316
317	/// Finish the batch response
318	pub fn finish(mut self) -> BatchResponse {
319		if self.result.len() == 1 {
320			BatchResponse(batch_response_error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)))
321		} else {
322			self.result.pop();
323			self.result.push(']');
324			BatchResponse(self.result)
325		}
326	}
327}
328
/// A batch response in its serialized form.
#[derive(Debug, Clone)]
pub struct BatchResponse(String);
332
333/// Create a JSON-RPC error response.
334pub fn batch_response_error(id: Id, err: impl Into<ErrorObject<'static>>) -> String {
335	let err = InnerResponsePayload::<()>::error_borrowed(err);
336	serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed")
337}
338
339/// Similar to [`jsonrpsee_types::ResponsePayload`] but possible to with an async-like
340/// API to detect when a method response has been sent.
341#[derive(Debug)]
342pub struct ResponsePayload<'a, T>
343where
344	T: Clone,
345{
346	inner: InnerResponsePayload<'a, T>,
347	on_exit: Option<MethodResponseNotifyTx>,
348}
349
350impl<'a, T: Clone> From<InnerResponsePayload<'a, T>> for ResponsePayload<'a, T> {
351	fn from(inner: InnerResponsePayload<'a, T>) -> Self {
352		Self { inner, on_exit: None }
353	}
354}
355
356impl<'a, T> ResponsePayload<'a, T>
357where
358	T: Clone,
359{
360	/// Create a successful owned response payload.
361	pub fn success(t: T) -> Self {
362		InnerResponsePayload::success(t).into()
363	}
364
365	/// Create a successful borrowed response payload.
366	pub fn success_borrowed(t: &'a T) -> Self {
367		InnerResponsePayload::success_borrowed(t).into()
368	}
369
370	/// Create an error response payload.
371	pub fn error(e: impl Into<ErrorObjectOwned>) -> Self {
372		InnerResponsePayload::error(e.into()).into()
373	}
374
375	/// Create a borrowd error response payload.
376	pub fn error_borrowed(e: impl Into<ErrorObject<'a>>) -> Self {
377		InnerResponsePayload::error_borrowed(e.into()).into()
378	}
379
380	/// Consumes the [`ResponsePayload`] and produces new [`ResponsePayload`] and a future
381	/// [`MethodResponseFuture`] that will be resolved once the response has been processed.
382	///
383	/// If this has been called more than once then this will overwrite
384	/// the old result the previous future(s) will be resolved with error.
385	pub fn notify_on_completion(mut self) -> (Self, MethodResponseFuture) {
386		let (tx, rx) = response_channel();
387		self.on_exit = Some(tx);
388		(self, rx)
389	}
390
391	/// Convert the response payload into owned.
392	pub fn into_owned(self) -> ResponsePayload<'static, T> {
393		ResponsePayload { inner: self.inner.into_owned(), on_exit: self.on_exit }
394	}
395}
396
397impl<'a, T> From<ErrorCode> for ResponsePayload<'a, T>
398where
399	T: Clone,
400{
401	fn from(code: ErrorCode) -> Self {
402		let err: ErrorObject = code.into();
403		Self::error(err)
404	}
405}
406
407/// Create a channel to be used in combination with [`ResponsePayload`] to
408/// notify when a method call has been processed.
409fn response_channel() -> (MethodResponseNotifyTx, MethodResponseFuture) {
410	let (tx, rx) = tokio::sync::oneshot::channel();
411	(MethodResponseNotifyTx(tx), MethodResponseFuture(rx))
412}
413
414/// Sends a message once the method response has been processed.
415#[derive(Debug)]
416pub struct MethodResponseNotifyTx(tokio::sync::oneshot::Sender<NotifyMsg>);
417
418impl MethodResponseNotifyTx {
419	/// Send a notify message.
420	pub fn notify(self, is_success: bool) {
421		let msg = if is_success { NotifyMsg::Ok } else { NotifyMsg::Err };
422		_ = self.0.send(msg);
423	}
424}
425
426/// Future that resolves when the method response has been processed.
427#[derive(Debug)]
428pub struct MethodResponseFuture(tokio::sync::oneshot::Receiver<NotifyMsg>);
429
/// A message that tells whether the notification
/// was successful or not.
#[derive(Debug, Copy, Clone)]
pub enum NotifyMsg {
	/// The response was successfully processed.
	Ok,
	/// The response was of the wrong kind,
	/// such as an error response when
	/// a successful response was expected.
	Err,
}
441
/// Method response error.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so that it can be
/// shown to users and propagated with `?` into generic error types.
#[derive(Debug, Copy, Clone)]
pub enum MethodResponseError {
	/// The connection was closed.
	Closed,
	/// The response was a JSON-RPC error.
	JsonRpcError,
}

impl std::fmt::Display for MethodResponseError {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		let msg = match self {
			Self::Closed => "the connection was closed",
			Self::JsonRpcError => "the response was a JSON-RPC error",
		};
		f.write_str(msg)
	}
}

impl std::error::Error for MethodResponseError {}
450
451impl Future for MethodResponseFuture {
452	type Output = Result<(), MethodResponseError>;
453
454	fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
455		match self.0.poll_unpin(cx) {
456			Poll::Ready(Ok(NotifyMsg::Ok)) => Poll::Ready(Ok(())),
457			Poll::Ready(Ok(NotifyMsg::Err)) => Poll::Ready(Err(MethodResponseError::JsonRpcError)),
458			Poll::Ready(Err(_)) => Poll::Ready(Err(MethodResponseError::Closed)),
459			Poll::Pending => Poll::Pending,
460		}
461	}
462}
463
#[cfg(test)]
mod tests {
	use super::{BatchResponseBuilder, MethodResponse, ResponsePayload};
	use jsonrpsee_types::Id;

	/// Serialized length of `{"jsonrpc":"2.0","id":1,"result":"a"}`.
	const SMALL_RESPONSE_LEN: usize = 37;

	/// Build the small successful response shared by the batch tests.
	fn small_response() -> MethodResponse {
		MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a"), usize::MAX)
	}

	#[test]
	fn batch_with_single_works() {
		let response = small_response();
		assert_eq!(response.result.len(), SMALL_RESPONSE_LEN);

		// A batch adds two bytes for the surrounding `[]`.
		let mut builder = BatchResponseBuilder::new_with_limit(39);
		builder.append(&response).unwrap();
		let finished = builder.finish();

		assert_eq!(finished.0, r#"[{"jsonrpc":"2.0","id":1,"result":"a"}]"#)
	}

	#[test]
	fn batch_with_multiple_works() {
		let response = small_response();
		assert_eq!(response.result.len(), SMALL_RESPONSE_LEN);

		// A batch adds two bytes for the `[]` and one byte for each `,` separating entries,
		// so the total is 2 + (37 * n) + (n-1).
		let limit = 2 + (37 * 2) + 1;
		let mut builder = BatchResponseBuilder::new_with_limit(limit);
		for _ in 0..2 {
			builder.append(&response).unwrap();
		}
		let finished = builder.finish();

		assert_eq!(finished.0, r#"[{"jsonrpc":"2.0","id":1,"result":"a"},{"jsonrpc":"2.0","id":1,"result":"a"}]"#)
	}

	#[test]
	fn batch_empty_err() {
		let finished = BatchResponseBuilder::new_with_limit(1024).finish();

		let expected = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"Invalid request"}}"#;
		assert_eq!(finished.0, expected);
	}

	#[test]
	fn batch_too_big() {
		let payload = "a".repeat(28);
		let response = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&payload), 128);
		assert_eq!(response.result.len(), 64);

		let rejected = BatchResponseBuilder::new_with_limit(63).append(&response).unwrap_err();

		let expected = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32011,"message":"The batch response was too large","data":"Exceeded max limit of 63"}}"#;
		assert_eq!(rejected.result, expected);
	}
}