jsonrpsee_core/server/
method_response.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! Represent a method response.
28
/// `tracing` target attached to the log events emitted from this module.
const LOG_TARGET: &str = "jsonrpsee-core";
30
31use std::io;
32use std::task::Poll;
33
34use crate::traits::ToJson;
35
36use futures_util::{Future, FutureExt};
37use http::Extensions;
38use jsonrpsee_types::error::{
39	ErrorCode, ErrorObject, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG, reject_too_big_batch_response,
40};
41use jsonrpsee_types::{ErrorObjectOwned, Id, Response, ResponsePayload as InnerResponsePayload};
42use serde::Serialize;
43use serde_json::value::{RawValue, to_raw_value};
44
/// The kind of request a [`MethodResponse`] was produced for.
///
/// The enum is field-less, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponseKind {
	/// Response to an ordinary method call.
	MethodCall,
	/// Response to a subscription call.
	Subscription,
	/// Response that wraps a whole batch.
	Batch,
	/// Placeholder response for a notification, i.e. a request that doesn't expect a reply.
	Notification,
}
52
53/// Represents a response to a method call.
54///
55/// NOTE: A subscription is also a method call but it's
56/// possible determine whether a method response
57/// is "subscription" or "ordinary method call"
58/// by calling [`MethodResponse::is_subscription`]
59#[derive(Debug)]
60pub struct MethodResponse {
61	/// Serialized JSON-RPC response,
62	json: Box<RawValue>,
63	/// Indicates whether the call was successful or not.
64	success_or_error: MethodResponseResult,
65	/// Indicates whether the call was a subscription response.
66	kind: ResponseKind,
67	/// Optional callback that may be utilized to notif
68	/// that the method response has been processed
69	on_close: Option<MethodResponseNotifyTx>,
70	/// The response's extensions.
71	extensions: Extensions,
72}
73
74impl AsRef<str> for MethodResponse {
75	fn as_ref(&self) -> &str {
76		self.json.get()
77	}
78}
79
80impl ToJson for MethodResponse {
81	fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
82		Ok(self.json.clone())
83	}
84}
85
86impl std::fmt::Display for MethodResponse {
87	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88		write!(f, "{}", self.json)
89	}
90}
91
92impl MethodResponse {
93	/// Returns whether the call was successful.
94	pub fn is_success(&self) -> bool {
95		self.success_or_error.is_success()
96	}
97
98	/// Returns whether the call failed.
99	pub fn is_error(&self) -> bool {
100		self.success_or_error.is_error()
101	}
102
103	/// Returns whether the response is a subscription response.
104	pub fn is_subscription(&self) -> bool {
105		matches!(self.kind, ResponseKind::Subscription)
106	}
107
108	/// Returns whether the response is a method response.
109	pub fn is_method_call(&self) -> bool {
110		matches!(self.kind, ResponseKind::MethodCall)
111	}
112
113	/// Returns whether the response is a notification response.
114	pub fn is_notification(&self) -> bool {
115		matches!(self.kind, ResponseKind::Notification)
116	}
117
118	/// Returns whether the response is a batch response.
119	pub fn is_batch(&self) -> bool {
120		matches!(self.kind, ResponseKind::Batch)
121	}
122
123	/// Consume the method response and extract the serialized JSON response.
124	pub fn into_json(self) -> Box<RawValue> {
125		self.json
126	}
127
128	/// Get the serialized JSON response.
129	pub fn to_json(&self) -> Box<RawValue> {
130		self.json.clone()
131	}
132
133	/// Consume the method response and extract the parts.
134	pub fn into_parts(self) -> (Box<RawValue>, Option<MethodResponseNotifyTx>, Extensions) {
135		(self.json, self.on_close, self.extensions)
136	}
137
138	/// Get the error code
139	///
140	/// Returns `Some(error code)` if the call failed.
141	pub fn as_error_code(&self) -> Option<i32> {
142		self.success_or_error.as_error_code()
143	}
144
145	/// Get a reference to the serialized JSON response.
146	pub fn as_json(&self) -> &RawValue {
147		&self.json
148	}
149
150	/// Create a method response from [`BatchResponse`].
151	pub fn from_batch(batch: BatchResponse) -> Self {
152		Self {
153			json: batch.json,
154			success_or_error: MethodResponseResult::Success,
155			kind: ResponseKind::Batch,
156			on_close: None,
157			extensions: batch.extensions,
158		}
159	}
160
161	/// This is similar to [`MethodResponse::response`] but sets a flag to indicate
162	/// that response is a subscription.
163	pub fn subscription_response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
164	where
165		T: Serialize + Clone,
166	{
167		let mut rp = Self::response(id, result, max_response_size);
168		rp.kind = ResponseKind::Subscription;
169		rp
170	}
171
172	/// Create a new method response.
173	///
174	/// If the serialization of `result` exceeds `max_response_size` then
175	/// the response is changed to an JSON-RPC error object.
176	pub fn response<T>(id: Id, rp: ResponsePayload<T>, max_response_size: usize) -> Self
177	where
178		T: Serialize + Clone,
179	{
180		let mut writer = BoundedWriter::new(max_response_size);
181
182		let success_or_error = if let InnerResponsePayload::Error(ref e) = rp.inner {
183			MethodResponseResult::Failed(e.code())
184		} else {
185			MethodResponseResult::Success
186		};
187
188		let kind = ResponseKind::MethodCall;
189
190		match serde_json::to_writer(&mut writer, &Response::new(rp.inner, id.clone())) {
191			Ok(_) => {
192				// Safety - serde_json does not emit invalid UTF-8.
193				let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };
194				let json = RawValue::from_string(result).expect("Valid JSON String; qed");
195
196				Self { json, success_or_error, kind, on_close: rp.on_exit, extensions: Extensions::new() }
197			}
198			Err(err) => {
199				tracing::error!(target: LOG_TARGET, "Error serializing response: {:?}", err);
200
201				if err.is_io() {
202					let data = to_raw_value(&format!("Exceeded max limit of {max_response_size}")).ok();
203					let err_code = OVERSIZED_RESPONSE_CODE;
204
205					let err = InnerResponsePayload::<()>::error_borrowed(ErrorObject::borrowed(
206						err_code,
207						OVERSIZED_RESPONSE_MSG,
208						data.as_deref(),
209					));
210					let json = serde_json::value::to_raw_value(&Response::new(err, id))
211						.expect("JSON serialization infallible; qed");
212
213					Self {
214						json,
215						success_or_error: MethodResponseResult::Failed(err_code),
216						kind,
217						on_close: rp.on_exit,
218						extensions: Extensions::new(),
219					}
220				} else {
221					let err = ErrorCode::InternalError;
222					let payload = jsonrpsee_types::ResponsePayload::<()>::error(err);
223					let json = serde_json::value::to_raw_value(&Response::new(payload, id))
224						.expect("JSON serialization infallible; qed");
225					Self {
226						json,
227						success_or_error: MethodResponseResult::Failed(err.code()),
228						kind,
229						on_close: rp.on_exit,
230						extensions: Extensions::new(),
231					}
232				}
233			}
234		}
235	}
236
237	/// This is similar to [`MethodResponse::error`] but sets a flag to indicate
238	/// that error is a subscription.
239	pub fn subscription_error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
240		let mut rp = Self::error(id, err);
241		rp.kind = ResponseKind::Subscription;
242		rp
243	}
244
245	/// Create a [`MethodResponse`] from a JSON-RPC error.
246	pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
247		let err: ErrorObject = err.into();
248		let err_code = err.code();
249		let err = InnerResponsePayload::<()>::error_borrowed(err);
250		let json =
251			serde_json::value::to_raw_value(&Response::new(err, id)).expect("JSON serialization infallible; qed");
252		Self {
253			json,
254			success_or_error: MethodResponseResult::Failed(err_code),
255			kind: ResponseKind::MethodCall,
256			on_close: None,
257			extensions: Extensions::new(),
258		}
259	}
260
261	/// Create notification response which is a response that doesn't expect a reply.
262	pub fn notification() -> Self {
263		Self {
264			json: RawValue::NULL.to_owned(),
265			success_or_error: MethodResponseResult::Success,
266			kind: ResponseKind::Notification,
267			on_close: None,
268			extensions: Extensions::new(),
269		}
270	}
271
272	/// Returns a reference to the associated extensions.
273	pub fn extensions(&self) -> &Extensions {
274		&self.extensions
275	}
276
277	/// Returns a mut reference to the associated extensions.
278	pub fn extensions_mut(&mut self) -> &mut Extensions {
279		&mut self.extensions
280	}
281
282	/// Consumes the method response and returns a new one with the given extensions.
283	pub fn with_extensions(self, extensions: Extensions) -> Self {
284		Self { extensions, ..self }
285	}
286}
287
/// Outcome of a method call: either it succeeded or it failed with an error code.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum MethodResponseResult {
	/// The method call was successful.
	Success,
	/// The method call failed with the contained JSON-RPC error code.
	Failed(i32),
}

impl MethodResponseResult {
	/// Returns whether the call was successful.
	fn is_success(self) -> bool {
		matches!(self, Self::Success)
	}

	/// Returns whether the call failed.
	fn is_error(self) -> bool {
		matches!(self, Self::Failed(_))
	}

	/// Get the error code.
	///
	/// Returns `Some(error code)` if the call failed and `None` if it succeeded.
	fn as_error_code(self) -> Option<i32> {
		// Exhaustive on purpose: adding a variant must force a decision here.
		match self {
			Self::Failed(code) => Some(code),
			Self::Success => None,
		}
	}
}
318
319/// Builder to build a `BatchResponse`.
320#[derive(Debug, Clone, Default)]
321pub struct BatchResponseBuilder {
322	/// Serialized JSON-RPC response,
323	result: String,
324	/// Max limit for the batch
325	max_response_size: usize,
326	/// Extensions for the batch response.
327	extensions: Extensions,
328}
329
330impl BatchResponseBuilder {
331	/// Create a new batch response builder with limit.
332	pub fn new_with_limit(limit: usize) -> Self {
333		let mut initial = String::with_capacity(2048);
334		initial.push('[');
335
336		Self { result: initial, max_response_size: limit, extensions: Extensions::new() }
337	}
338
339	/// Append a result from an individual method to the batch response.
340	///
341	/// Fails if the max limit is exceeded and returns to error response to
342	/// return early in order to not process method call responses which are thrown away anyway.
343	pub fn append(&mut self, response: MethodResponse) -> Result<(), MethodResponse> {
344		// `,` will occupy one extra byte for each entry
345		// on the last item the `,` is replaced by `]`.
346		let len = response.json.get().len() + self.result.len() + 1;
347		self.extensions.extend(response.extensions);
348
349		if len > self.max_response_size {
350			Err(MethodResponse::error(Id::Null, reject_too_big_batch_response(self.max_response_size)))
351		} else {
352			self.result.push_str(response.json.get());
353			self.result.push(',');
354			Ok(())
355		}
356	}
357
358	/// Check if the batch is empty.
359	pub fn is_empty(&self) -> bool {
360		self.result.len() <= 1
361	}
362
363	/// Finish the batch response
364	pub fn finish(mut self) -> BatchResponse {
365		if self.result.len() == 1 {
366			BatchResponse {
367				json: batch_response_error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)),
368				extensions: self.extensions,
369			}
370		} else {
371			self.result.pop();
372			self.result.push(']');
373			let json = RawValue::from_string(self.result).expect("BatchResponse builds a valid JSON String; qed");
374			BatchResponse { json, extensions: self.extensions }
375		}
376	}
377}
378
379/// Serialized batch response.
380#[derive(Debug, Clone)]
381pub struct BatchResponse {
382	json: Box<RawValue>,
383	extensions: Extensions,
384}
385
386/// Create a JSON-RPC error response.
387pub fn batch_response_error(id: Id, err: impl Into<ErrorObject<'static>>) -> Box<RawValue> {
388	let err = InnerResponsePayload::<()>::error_borrowed(err);
389	serde_json::value::to_raw_value(&Response::new(err, id)).expect("JSON serialization infallible; qed")
390}
391
392/// Similar to [`jsonrpsee_types::ResponsePayload`] but possible to with an async-like
393/// API to detect when a method response has been sent.
394#[derive(Debug)]
395pub struct ResponsePayload<'a, T>
396where
397	T: Clone,
398{
399	inner: InnerResponsePayload<'a, T>,
400	on_exit: Option<MethodResponseNotifyTx>,
401}
402
403impl<'a, T: Clone> From<InnerResponsePayload<'a, T>> for ResponsePayload<'a, T> {
404	fn from(inner: InnerResponsePayload<'a, T>) -> Self {
405		Self { inner, on_exit: None }
406	}
407}
408
409impl<'a, T> ResponsePayload<'a, T>
410where
411	T: Clone,
412{
413	/// Create a successful owned response payload.
414	pub fn success(t: T) -> Self {
415		InnerResponsePayload::success(t).into()
416	}
417
418	/// Create a successful borrowed response payload.
419	pub fn success_borrowed(t: &'a T) -> Self {
420		InnerResponsePayload::success_borrowed(t).into()
421	}
422
423	/// Create an error response payload.
424	pub fn error(e: impl Into<ErrorObjectOwned>) -> Self {
425		InnerResponsePayload::error(e.into()).into()
426	}
427
428	/// Create a borrowd error response payload.
429	pub fn error_borrowed(e: impl Into<ErrorObject<'a>>) -> Self {
430		InnerResponsePayload::error_borrowed(e.into()).into()
431	}
432
433	/// Consumes the [`ResponsePayload`] and produces new [`ResponsePayload`] and a future
434	/// [`MethodResponseFuture`] that will be resolved once the response has been processed.
435	///
436	/// If this has been called more than once then this will overwrite
437	/// the old result the previous future(s) will be resolved with error.
438	pub fn notify_on_completion(mut self) -> (Self, MethodResponseFuture) {
439		let (tx, rx) = response_channel();
440		self.on_exit = Some(tx);
441		(self, rx)
442	}
443
444	/// Convert the response payload into owned.
445	pub fn into_owned(self) -> ResponsePayload<'static, T> {
446		ResponsePayload { inner: self.inner.into_owned(), on_exit: self.on_exit }
447	}
448}
449
450impl<T> From<ErrorCode> for ResponsePayload<'_, T>
451where
452	T: Clone,
453{
454	fn from(code: ErrorCode) -> Self {
455		let err: ErrorObject = code.into();
456		Self::error(err)
457	}
458}
459
460/// Create a channel to be used in combination with [`ResponsePayload`] to
461/// notify when a method call has been processed.
462fn response_channel() -> (MethodResponseNotifyTx, MethodResponseFuture) {
463	let (tx, rx) = tokio::sync::oneshot::channel();
464	(MethodResponseNotifyTx(tx), MethodResponseFuture(rx))
465}
466
467/// Sends a message once the method response has been processed.
468#[derive(Debug)]
469pub struct MethodResponseNotifyTx(tokio::sync::oneshot::Sender<NotifyMsg>);
470
471impl MethodResponseNotifyTx {
472	/// Send a notify message.
473	pub fn notify(self, is_success: bool) {
474		let msg = if is_success { NotifyMsg::Ok } else { NotifyMsg::Err };
475		_ = self.0.send(msg);
476	}
477}
478
479/// Future that resolves when the method response has been processed.
480#[derive(Debug)]
481pub struct MethodResponseFuture(tokio::sync::oneshot::Receiver<NotifyMsg>);
482
/// A message that tells whether the notification
/// was successful or not.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NotifyMsg {
	/// The response was successfully processed.
	Ok,
	/// The response was of the wrong kind,
	/// such as an error response when
	/// a successful response was expected.
	Err,
}
494
/// Method response error.
///
/// Implements [`std::error::Error`] so it can be boxed or propagated with `?`
/// like any other error type.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MethodResponseError {
	/// The connection was closed.
	Closed,
	/// The response was a JSON-RPC error.
	JsonRpcError,
}

impl std::fmt::Display for MethodResponseError {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		let msg = match self {
			Self::Closed => "The connection was closed",
			Self::JsonRpcError => "The response was a JSON-RPC error",
		};
		f.write_str(msg)
	}
}

impl std::error::Error for MethodResponseError {}
503
504impl Future for MethodResponseFuture {
505	type Output = Result<(), MethodResponseError>;
506
507	fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
508		match self.0.poll_unpin(cx) {
509			Poll::Ready(Ok(NotifyMsg::Ok)) => Poll::Ready(Ok(())),
510			Poll::Ready(Ok(NotifyMsg::Err)) => Poll::Ready(Err(MethodResponseError::JsonRpcError)),
511			Poll::Ready(Err(_)) => Poll::Ready(Err(MethodResponseError::Closed)),
512			Poll::Pending => Poll::Pending,
513		}
514	}
515}
516
/// Bounded writer that allows writing at most `max_len` bytes.
#[derive(Debug, Clone)]
struct BoundedWriter {
	/// Maximum number of bytes the writer accepts in total.
	max_len: usize,
	/// Bytes written so far.
	buf: Vec<u8>,
}

impl BoundedWriter {
	/// Create a new bounded writer that accepts at most `max_len` bytes.
	pub fn new(max_len: usize) -> Self {
		Self { max_len, buf: Vec::with_capacity(128) }
	}

	/// Consume the writer and extract the written bytes.
	pub fn into_bytes(self) -> Vec<u8> {
		self.buf
	}
}

// Implemented on the type itself; `&mut BoundedWriter` is still a writer
// through the standard library's blanket `impl Write for &mut W`.
impl io::Write for BoundedWriter {
	/// Append `buf` as a whole if it still fits within `max_len`.
	///
	/// Nothing is written and an [`io::ErrorKind::OutOfMemory`] error is
	/// returned when the limit would be exceeded.
	fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
		// `checked_add` makes an (unlikely) length overflow count as "does not fit".
		let fits = matches!(self.buf.len().checked_add(buf.len()), Some(total) if total <= self.max_len);

		if fits {
			self.buf.extend_from_slice(buf);
			Ok(buf.len())
		} else {
			Err(io::Error::new(io::ErrorKind::OutOfMemory, "Memory capacity exceeded"))
		}
	}

	/// Nothing is buffered outside `buf`, so flushing is a no-op.
	fn flush(&mut self) -> io::Result<()> {
		Ok(())
	}
}
551
#[cfg(test)]
mod tests {
	use super::{BatchResponseBuilder, BoundedWriter, Id, MethodResponse, ResponsePayload};

	/// Build the small successful response shared by the batch tests and
	/// check that it serializes to 37 bytes.
	fn small_response() -> MethodResponse {
		let rp = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a"), usize::MAX);
		assert_eq!(rp.json.get().len(), 37);
		rp
	}

	#[test]
	fn batch_with_single_works() {
		// The surrounding `[]` costs two extra bytes on top of the 37-byte entry.
		let mut builder = BatchResponseBuilder::new_with_limit(39);
		builder.append(small_response()).unwrap();

		let serialized = builder.finish();
		assert_eq!(serialized.json.get(), r#"[{"jsonrpc":"2.0","id":1,"result":"a"}]"#);
	}

	#[test]
	fn batch_with_multiple_works() {
		// Two bytes for `[]`, 37 bytes per entry and one `,` between the entries,
		// i.e. 2 + (37 * n) + (n - 1) bytes for n entries.
		let limit = 2 + (37 * 2) + 1;
		let mut builder = BatchResponseBuilder::new_with_limit(limit);
		for entry in [small_response(), small_response()] {
			builder.append(entry).unwrap();
		}

		let serialized = builder.finish();
		assert_eq!(
			serialized.json.get(),
			r#"[{"jsonrpc":"2.0","id":1,"result":"a"},{"jsonrpc":"2.0","id":1,"result":"a"}]"#
		);
	}

	#[test]
	fn batch_empty_err() {
		// Finishing without any appended entry yields an "invalid request" error object.
		let serialized = BatchResponseBuilder::new_with_limit(1024).finish();

		let expected = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"Invalid request"}}"#;
		assert_eq!(serialized.json.get(), expected);
	}

	#[test]
	fn batch_too_big() {
		let entry = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a".repeat(28)), 128);
		assert_eq!(entry.json.get().len(), 64);

		// The 64-byte entry cannot fit into a 63-byte batch.
		let mut builder = BatchResponseBuilder::new_with_limit(63);
		let rejected = builder.append(entry).unwrap_err();

		let expected = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32011,"message":"The batch response was too large","data":"Exceeded max limit of 63"}}"#;
		assert_eq!(rejected.json.get(), expected);
	}

	#[test]
	fn bounded_serializer_work() {
		use jsonrpsee_types::{Response, ResponsePayload};

		let payload = ResponsePayload::success(&"success");
		let response = Response::new(payload, Id::Number(1));

		let mut writer = BoundedWriter::new(100);
		assert!(serde_json::to_writer(&mut writer, &response).is_ok());

		let written = String::from_utf8(writer.into_bytes()).unwrap();
		assert_eq!(written, r#"{"jsonrpc":"2.0","id":1,"result":"success"}"#);
	}

	#[test]
	fn bounded_serializer_cap_works() {
		// NOTE: the surrounding `"` are part of the serialization, so 99 characters become 101 bytes.
		let oversized = "x".repeat(99);
		let mut writer = BoundedWriter::new(100);
		assert!(serde_json::to_writer(&mut writer, &oversized).is_err());
	}
}