const LOG_TARGET: &str = "jsonrpsee-core";
use std::io;
use std::task::Poll;
use crate::traits::ToJson;
use futures_util::{Future, FutureExt};
use http::Extensions;
use jsonrpsee_types::error::{
ErrorCode, ErrorObject, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG, reject_too_big_batch_response,
};
use jsonrpsee_types::{ErrorObjectOwned, Id, Response, ResponsePayload as InnerResponsePayload};
use serde::Serialize;
use serde_json::value::{RawValue, to_raw_value};
#[derive(Debug, Clone)]
enum ResponseKind {
MethodCall,
Subscription,
Batch,
Notification,
}
#[derive(Debug)]
pub struct MethodResponse {
json: Box<RawValue>,
success_or_error: MethodResponseResult,
kind: ResponseKind,
on_close: Option<MethodResponseNotifyTx>,
extensions: Extensions,
}
impl AsRef<str> for MethodResponse {
fn as_ref(&self) -> &str {
self.json.get()
}
}
impl ToJson for MethodResponse {
fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
Ok(self.json.clone())
}
}
impl std::fmt::Display for MethodResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.json)
}
}
impl MethodResponse {
pub fn is_success(&self) -> bool {
self.success_or_error.is_success()
}
pub fn is_error(&self) -> bool {
self.success_or_error.is_error()
}
pub fn is_subscription(&self) -> bool {
matches!(self.kind, ResponseKind::Subscription)
}
pub fn is_method_call(&self) -> bool {
matches!(self.kind, ResponseKind::MethodCall)
}
pub fn is_notification(&self) -> bool {
matches!(self.kind, ResponseKind::Notification)
}
pub fn is_batch(&self) -> bool {
matches!(self.kind, ResponseKind::Batch)
}
pub fn into_json(self) -> Box<RawValue> {
self.json
}
pub fn to_json(&self) -> Box<RawValue> {
self.json.clone()
}
pub fn into_parts(self) -> (Box<RawValue>, Option<MethodResponseNotifyTx>, Extensions) {
(self.json, self.on_close, self.extensions)
}
pub fn as_error_code(&self) -> Option<i32> {
self.success_or_error.as_error_code()
}
pub fn as_json(&self) -> &RawValue {
&self.json
}
pub fn from_batch(batch: BatchResponse) -> Self {
Self {
json: batch.json,
success_or_error: MethodResponseResult::Success,
kind: ResponseKind::Batch,
on_close: None,
extensions: batch.extensions,
}
}
pub fn subscription_response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
where
T: Serialize + Clone,
{
let mut rp = Self::response(id, result, max_response_size);
rp.kind = ResponseKind::Subscription;
rp
}
pub fn response<T>(id: Id, rp: ResponsePayload<T>, max_response_size: usize) -> Self
where
T: Serialize + Clone,
{
let mut writer = BoundedWriter::new(max_response_size);
let success_or_error = if let InnerResponsePayload::Error(ref e) = rp.inner {
MethodResponseResult::Failed(e.code())
} else {
MethodResponseResult::Success
};
let kind = ResponseKind::MethodCall;
match serde_json::to_writer(&mut writer, &Response::new(rp.inner, id.clone())) {
Ok(_) => {
let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };
let json = RawValue::from_string(result).expect("Valid JSON String; qed");
Self { json, success_or_error, kind, on_close: rp.on_exit, extensions: Extensions::new() }
}
Err(err) => {
tracing::error!(target: LOG_TARGET, "Error serializing response: {:?}", err);
if err.is_io() {
let data = to_raw_value(&format!("Exceeded max limit of {max_response_size}")).ok();
let err_code = OVERSIZED_RESPONSE_CODE;
let err = InnerResponsePayload::<()>::error_borrowed(ErrorObject::borrowed(
err_code,
OVERSIZED_RESPONSE_MSG,
data.as_deref(),
));
let json = serde_json::value::to_raw_value(&Response::new(err, id))
.expect("JSON serialization infallible; qed");
Self {
json,
success_or_error: MethodResponseResult::Failed(err_code),
kind,
on_close: rp.on_exit,
extensions: Extensions::new(),
}
} else {
let err = ErrorCode::InternalError;
let payload = jsonrpsee_types::ResponsePayload::<()>::error(err);
let json = serde_json::value::to_raw_value(&Response::new(payload, id))
.expect("JSON serialization infallible; qed");
Self {
json,
success_or_error: MethodResponseResult::Failed(err.code()),
kind,
on_close: rp.on_exit,
extensions: Extensions::new(),
}
}
}
}
}
pub fn subscription_error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
let mut rp = Self::error(id, err);
rp.kind = ResponseKind::Subscription;
rp
}
pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
let err: ErrorObject = err.into();
let err_code = err.code();
let err = InnerResponsePayload::<()>::error_borrowed(err);
let json =
serde_json::value::to_raw_value(&Response::new(err, id)).expect("JSON serialization infallible; qed");
Self {
json,
success_or_error: MethodResponseResult::Failed(err_code),
kind: ResponseKind::MethodCall,
on_close: None,
extensions: Extensions::new(),
}
}
pub fn notification() -> Self {
Self {
json: RawValue::NULL.to_owned(),
success_or_error: MethodResponseResult::Success,
kind: ResponseKind::Notification,
on_close: None,
extensions: Extensions::new(),
}
}
pub fn extensions(&self) -> &Extensions {
&self.extensions
}
pub fn extensions_mut(&mut self) -> &mut Extensions {
&mut self.extensions
}
pub fn with_extensions(self, extensions: Extensions) -> Self {
Self { extensions, ..self }
}
}
#[derive(Debug, Copy, Clone)]
enum MethodResponseResult {
Success,
Failed(i32),
}
impl MethodResponseResult {
fn is_success(&self) -> bool {
matches!(self, MethodResponseResult::Success)
}
fn is_error(&self) -> bool {
matches!(self, MethodResponseResult::Failed(_))
}
fn as_error_code(&self) -> Option<i32> {
match self {
Self::Failed(e) => Some(*e),
_ => None,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct BatchResponseBuilder {
result: String,
max_response_size: usize,
extensions: Extensions,
}
impl BatchResponseBuilder {
pub fn new_with_limit(limit: usize) -> Self {
let mut initial = String::with_capacity(2048);
initial.push('[');
Self { result: initial, max_response_size: limit, extensions: Extensions::new() }
}
pub fn append(&mut self, response: MethodResponse) -> Result<(), MethodResponse> {
let len = response.json.get().len() + self.result.len() + 1;
self.extensions.extend(response.extensions);
if len > self.max_response_size {
Err(MethodResponse::error(Id::Null, reject_too_big_batch_response(self.max_response_size)))
} else {
self.result.push_str(response.json.get());
self.result.push(',');
Ok(())
}
}
pub fn is_empty(&self) -> bool {
self.result.len() <= 1
}
pub fn finish(mut self) -> BatchResponse {
if self.result.len() == 1 {
BatchResponse {
json: batch_response_error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)),
extensions: self.extensions,
}
} else {
self.result.pop();
self.result.push(']');
let json = RawValue::from_string(self.result).expect("BatchResponse builds a valid JSON String; qed");
BatchResponse { json, extensions: self.extensions }
}
}
}
#[derive(Debug, Clone)]
pub struct BatchResponse {
json: Box<RawValue>,
extensions: Extensions,
}
pub fn batch_response_error(id: Id, err: impl Into<ErrorObject<'static>>) -> Box<RawValue> {
let err = InnerResponsePayload::<()>::error_borrowed(err);
serde_json::value::to_raw_value(&Response::new(err, id)).expect("JSON serialization infallible; qed")
}
#[derive(Debug)]
pub struct ResponsePayload<'a, T>
where
T: Clone,
{
inner: InnerResponsePayload<'a, T>,
on_exit: Option<MethodResponseNotifyTx>,
}
impl<'a, T: Clone> From<InnerResponsePayload<'a, T>> for ResponsePayload<'a, T> {
fn from(inner: InnerResponsePayload<'a, T>) -> Self {
Self { inner, on_exit: None }
}
}
impl<'a, T> ResponsePayload<'a, T>
where
T: Clone,
{
pub fn success(t: T) -> Self {
InnerResponsePayload::success(t).into()
}
pub fn success_borrowed(t: &'a T) -> Self {
InnerResponsePayload::success_borrowed(t).into()
}
pub fn error(e: impl Into<ErrorObjectOwned>) -> Self {
InnerResponsePayload::error(e.into()).into()
}
pub fn error_borrowed(e: impl Into<ErrorObject<'a>>) -> Self {
InnerResponsePayload::error_borrowed(e.into()).into()
}
pub fn notify_on_completion(mut self) -> (Self, MethodResponseFuture) {
let (tx, rx) = response_channel();
self.on_exit = Some(tx);
(self, rx)
}
pub fn into_owned(self) -> ResponsePayload<'static, T> {
ResponsePayload { inner: self.inner.into_owned(), on_exit: self.on_exit }
}
}
impl<T> From<ErrorCode> for ResponsePayload<'_, T>
where
T: Clone,
{
fn from(code: ErrorCode) -> Self {
let err: ErrorObject = code.into();
Self::error(err)
}
}
fn response_channel() -> (MethodResponseNotifyTx, MethodResponseFuture) {
let (tx, rx) = tokio::sync::oneshot::channel();
(MethodResponseNotifyTx(tx), MethodResponseFuture(rx))
}
#[derive(Debug)]
pub struct MethodResponseNotifyTx(tokio::sync::oneshot::Sender<NotifyMsg>);
impl MethodResponseNotifyTx {
pub fn notify(self, is_success: bool) {
let msg = if is_success { NotifyMsg::Ok } else { NotifyMsg::Err };
_ = self.0.send(msg);
}
}
#[derive(Debug)]
pub struct MethodResponseFuture(tokio::sync::oneshot::Receiver<NotifyMsg>);
#[derive(Debug, Copy, Clone)]
pub enum NotifyMsg {
Ok,
Err,
}
#[derive(Debug, Copy, Clone)]
pub enum MethodResponseError {
Closed,
JsonRpcError,
}
impl Future for MethodResponseFuture {
type Output = Result<(), MethodResponseError>;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
match self.0.poll_unpin(cx) {
Poll::Ready(Ok(NotifyMsg::Ok)) => Poll::Ready(Ok(())),
Poll::Ready(Ok(NotifyMsg::Err)) => Poll::Ready(Err(MethodResponseError::JsonRpcError)),
Poll::Ready(Err(_)) => Poll::Ready(Err(MethodResponseError::Closed)),
Poll::Pending => Poll::Pending,
}
}
}
#[derive(Debug, Clone)]
struct BoundedWriter {
max_len: usize,
buf: Vec<u8>,
}
impl BoundedWriter {
pub fn new(max_len: usize) -> Self {
Self { max_len, buf: Vec::with_capacity(128) }
}
pub fn into_bytes(self) -> Vec<u8> {
self.buf
}
}
impl io::Write for &mut BoundedWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let len = self.buf.len() + buf.len();
if self.max_len >= len {
self.buf.extend_from_slice(buf);
Ok(buf.len())
} else {
Err(io::Error::new(io::ErrorKind::OutOfMemory, "Memory capacity exceeded"))
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::{BatchResponseBuilder, BoundedWriter, Id, MethodResponse, ResponsePayload};
#[test]
fn batch_with_single_works() {
let method = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a"), usize::MAX);
assert_eq!(method.json.get().len(), 37);
let mut builder = BatchResponseBuilder::new_with_limit(39);
builder.append(method).unwrap();
let batch = builder.finish();
assert_eq!(batch.json.get(), r#"[{"jsonrpc":"2.0","id":1,"result":"a"}]"#)
}
#[test]
fn batch_with_multiple_works() {
let m1 = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a"), usize::MAX);
let m11 = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a"), usize::MAX);
assert_eq!(m1.json.get().len(), 37);
let limit = 2 + (37 * 2) + 1;
let mut builder = BatchResponseBuilder::new_with_limit(limit);
builder.append(m1).unwrap();
builder.append(m11).unwrap();
let batch = builder.finish();
assert_eq!(batch.json.get(), r#"[{"jsonrpc":"2.0","id":1,"result":"a"},{"jsonrpc":"2.0","id":1,"result":"a"}]"#)
}
#[test]
fn batch_empty_err() {
let batch = BatchResponseBuilder::new_with_limit(1024).finish();
let exp_err = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"Invalid request"}}"#;
assert_eq!(batch.json.get(), exp_err);
}
#[test]
fn batch_too_big() {
let method = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a".repeat(28)), 128);
assert_eq!(method.json.get().len(), 64);
let batch = BatchResponseBuilder::new_with_limit(63).append(method).unwrap_err();
let exp_err = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32011,"message":"The batch response was too large","data":"Exceeded max limit of 63"}}"#;
assert_eq!(batch.json.get(), exp_err);
}
#[test]
fn bounded_serializer_work() {
use jsonrpsee_types::{Response, ResponsePayload};
let mut writer = BoundedWriter::new(100);
let result = ResponsePayload::success(&"success");
let rp = &Response::new(result, Id::Number(1));
assert!(serde_json::to_writer(&mut writer, rp).is_ok());
assert_eq!(String::from_utf8(writer.into_bytes()).unwrap(), r#"{"jsonrpc":"2.0","id":1,"result":"success"}"#);
}
#[test]
fn bounded_serializer_cap_works() {
let mut writer = BoundedWriter::new(100);
assert!(serde_json::to_writer(&mut writer, &"x".repeat(99)).is_err());
}
}