1const LOG_TARGET: &str = "jsonrpsee-core";
30
31use std::io;
32use std::task::Poll;
33
34use crate::traits::ToJson;
35
36use futures_util::{Future, FutureExt};
37use http::Extensions;
38use jsonrpsee_types::error::{
39 ErrorCode, ErrorObject, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG, reject_too_big_batch_response,
40};
41use jsonrpsee_types::{ErrorObjectOwned, Id, Response, ResponsePayload as InnerResponsePayload};
42use serde::Serialize;
43use serde_json::value::{RawValue, to_raw_value};
44
45#[derive(Debug, Clone)]
46enum ResponseKind {
47 MethodCall,
48 Subscription,
49 Batch,
50 Notification,
51}
52
53#[derive(Debug)]
60pub struct MethodResponse {
61 json: Box<RawValue>,
63 success_or_error: MethodResponseResult,
65 kind: ResponseKind,
67 on_close: Option<MethodResponseNotifyTx>,
70 extensions: Extensions,
72}
73
74impl AsRef<str> for MethodResponse {
75 fn as_ref(&self) -> &str {
76 self.json.get()
77 }
78}
79
80impl ToJson for MethodResponse {
81 fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
82 Ok(self.json.clone())
83 }
84}
85
86impl std::fmt::Display for MethodResponse {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 write!(f, "{}", self.json)
89 }
90}
91
92impl MethodResponse {
93 pub fn is_success(&self) -> bool {
95 self.success_or_error.is_success()
96 }
97
98 pub fn is_error(&self) -> bool {
100 self.success_or_error.is_error()
101 }
102
103 pub fn is_subscription(&self) -> bool {
105 matches!(self.kind, ResponseKind::Subscription)
106 }
107
108 pub fn is_method_call(&self) -> bool {
110 matches!(self.kind, ResponseKind::MethodCall)
111 }
112
113 pub fn is_notification(&self) -> bool {
115 matches!(self.kind, ResponseKind::Notification)
116 }
117
118 pub fn is_batch(&self) -> bool {
120 matches!(self.kind, ResponseKind::Batch)
121 }
122
123 pub fn into_json(self) -> Box<RawValue> {
125 self.json
126 }
127
128 pub fn to_json(&self) -> Box<RawValue> {
130 self.json.clone()
131 }
132
133 pub fn into_parts(self) -> (Box<RawValue>, Option<MethodResponseNotifyTx>, Extensions) {
135 (self.json, self.on_close, self.extensions)
136 }
137
138 pub fn as_error_code(&self) -> Option<i32> {
142 self.success_or_error.as_error_code()
143 }
144
145 pub fn as_json(&self) -> &RawValue {
147 &self.json
148 }
149
150 pub fn from_batch(batch: BatchResponse) -> Self {
152 Self {
153 json: batch.json,
154 success_or_error: MethodResponseResult::Success,
155 kind: ResponseKind::Batch,
156 on_close: None,
157 extensions: batch.extensions,
158 }
159 }
160
161 pub fn subscription_response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
164 where
165 T: Serialize + Clone,
166 {
167 let mut rp = Self::response(id, result, max_response_size);
168 rp.kind = ResponseKind::Subscription;
169 rp
170 }
171
172 pub fn response<T>(id: Id, rp: ResponsePayload<T>, max_response_size: usize) -> Self
177 where
178 T: Serialize + Clone,
179 {
180 let mut writer = BoundedWriter::new(max_response_size);
181
182 let success_or_error = if let InnerResponsePayload::Error(ref e) = rp.inner {
183 MethodResponseResult::Failed(e.code())
184 } else {
185 MethodResponseResult::Success
186 };
187
188 let kind = ResponseKind::MethodCall;
189
190 match serde_json::to_writer(&mut writer, &Response::new(rp.inner, id.clone())) {
191 Ok(_) => {
192 let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };
194 let json = RawValue::from_string(result).expect("Valid JSON String; qed");
195
196 Self { json, success_or_error, kind, on_close: rp.on_exit, extensions: Extensions::new() }
197 }
198 Err(err) => {
199 tracing::error!(target: LOG_TARGET, "Error serializing response: {:?}", err);
200
201 if err.is_io() {
202 let data = to_raw_value(&format!("Exceeded max limit of {max_response_size}")).ok();
203 let err_code = OVERSIZED_RESPONSE_CODE;
204
205 let err = InnerResponsePayload::<()>::error_borrowed(ErrorObject::borrowed(
206 err_code,
207 OVERSIZED_RESPONSE_MSG,
208 data.as_deref(),
209 ));
210 let json = serde_json::value::to_raw_value(&Response::new(err, id))
211 .expect("JSON serialization infallible; qed");
212
213 Self {
214 json,
215 success_or_error: MethodResponseResult::Failed(err_code),
216 kind,
217 on_close: rp.on_exit,
218 extensions: Extensions::new(),
219 }
220 } else {
221 let err = ErrorCode::InternalError;
222 let payload = jsonrpsee_types::ResponsePayload::<()>::error(err);
223 let json = serde_json::value::to_raw_value(&Response::new(payload, id))
224 .expect("JSON serialization infallible; qed");
225 Self {
226 json,
227 success_or_error: MethodResponseResult::Failed(err.code()),
228 kind,
229 on_close: rp.on_exit,
230 extensions: Extensions::new(),
231 }
232 }
233 }
234 }
235 }
236
237 pub fn subscription_error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
240 let mut rp = Self::error(id, err);
241 rp.kind = ResponseKind::Subscription;
242 rp
243 }
244
245 pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
247 let err: ErrorObject = err.into();
248 let err_code = err.code();
249 let err = InnerResponsePayload::<()>::error_borrowed(err);
250 let json =
251 serde_json::value::to_raw_value(&Response::new(err, id)).expect("JSON serialization infallible; qed");
252 Self {
253 json,
254 success_or_error: MethodResponseResult::Failed(err_code),
255 kind: ResponseKind::MethodCall,
256 on_close: None,
257 extensions: Extensions::new(),
258 }
259 }
260
261 pub fn notification() -> Self {
263 Self {
264 json: RawValue::NULL.to_owned(),
265 success_or_error: MethodResponseResult::Success,
266 kind: ResponseKind::Notification,
267 on_close: None,
268 extensions: Extensions::new(),
269 }
270 }
271
272 pub fn extensions(&self) -> &Extensions {
274 &self.extensions
275 }
276
277 pub fn extensions_mut(&mut self) -> &mut Extensions {
279 &mut self.extensions
280 }
281
282 pub fn with_extensions(self, extensions: Extensions) -> Self {
284 Self { extensions, ..self }
285 }
286}
287
288#[derive(Debug, Copy, Clone)]
290enum MethodResponseResult {
291 Success,
293 Failed(i32),
295}
296
297impl MethodResponseResult {
298 fn is_success(&self) -> bool {
300 matches!(self, MethodResponseResult::Success)
301 }
302
303 fn is_error(&self) -> bool {
305 matches!(self, MethodResponseResult::Failed(_))
306 }
307
308 fn as_error_code(&self) -> Option<i32> {
312 match self {
313 Self::Failed(e) => Some(*e),
314 _ => None,
315 }
316 }
317}
318
319#[derive(Debug, Clone, Default)]
321pub struct BatchResponseBuilder {
322 result: String,
324 max_response_size: usize,
326 extensions: Extensions,
328}
329
330impl BatchResponseBuilder {
331 pub fn new_with_limit(limit: usize) -> Self {
333 let mut initial = String::with_capacity(2048);
334 initial.push('[');
335
336 Self { result: initial, max_response_size: limit, extensions: Extensions::new() }
337 }
338
339 pub fn append(&mut self, response: MethodResponse) -> Result<(), MethodResponse> {
344 let len = response.json.get().len() + self.result.len() + 1;
347 self.extensions.extend(response.extensions);
348
349 if len > self.max_response_size {
350 Err(MethodResponse::error(Id::Null, reject_too_big_batch_response(self.max_response_size)))
351 } else {
352 self.result.push_str(response.json.get());
353 self.result.push(',');
354 Ok(())
355 }
356 }
357
358 pub fn is_empty(&self) -> bool {
360 self.result.len() <= 1
361 }
362
363 pub fn finish(mut self) -> BatchResponse {
365 if self.result.len() == 1 {
366 BatchResponse {
367 json: batch_response_error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)),
368 extensions: self.extensions,
369 }
370 } else {
371 self.result.pop();
372 self.result.push(']');
373 let json = RawValue::from_string(self.result).expect("BatchResponse builds a valid JSON String; qed");
374 BatchResponse { json, extensions: self.extensions }
375 }
376 }
377}
378
379#[derive(Debug, Clone)]
381pub struct BatchResponse {
382 json: Box<RawValue>,
383 extensions: Extensions,
384}
385
386pub fn batch_response_error(id: Id, err: impl Into<ErrorObject<'static>>) -> Box<RawValue> {
388 let err = InnerResponsePayload::<()>::error_borrowed(err);
389 serde_json::value::to_raw_value(&Response::new(err, id)).expect("JSON serialization infallible; qed")
390}
391
392#[derive(Debug)]
395pub struct ResponsePayload<'a, T>
396where
397 T: Clone,
398{
399 inner: InnerResponsePayload<'a, T>,
400 on_exit: Option<MethodResponseNotifyTx>,
401}
402
403impl<'a, T: Clone> From<InnerResponsePayload<'a, T>> for ResponsePayload<'a, T> {
404 fn from(inner: InnerResponsePayload<'a, T>) -> Self {
405 Self { inner, on_exit: None }
406 }
407}
408
409impl<'a, T> ResponsePayload<'a, T>
410where
411 T: Clone,
412{
413 pub fn success(t: T) -> Self {
415 InnerResponsePayload::success(t).into()
416 }
417
418 pub fn success_borrowed(t: &'a T) -> Self {
420 InnerResponsePayload::success_borrowed(t).into()
421 }
422
423 pub fn error(e: impl Into<ErrorObjectOwned>) -> Self {
425 InnerResponsePayload::error(e.into()).into()
426 }
427
428 pub fn error_borrowed(e: impl Into<ErrorObject<'a>>) -> Self {
430 InnerResponsePayload::error_borrowed(e.into()).into()
431 }
432
433 pub fn notify_on_completion(mut self) -> (Self, MethodResponseFuture) {
439 let (tx, rx) = response_channel();
440 self.on_exit = Some(tx);
441 (self, rx)
442 }
443
444 pub fn into_owned(self) -> ResponsePayload<'static, T> {
446 ResponsePayload { inner: self.inner.into_owned(), on_exit: self.on_exit }
447 }
448}
449
450impl<T> From<ErrorCode> for ResponsePayload<'_, T>
451where
452 T: Clone,
453{
454 fn from(code: ErrorCode) -> Self {
455 let err: ErrorObject = code.into();
456 Self::error(err)
457 }
458}
459
460fn response_channel() -> (MethodResponseNotifyTx, MethodResponseFuture) {
463 let (tx, rx) = tokio::sync::oneshot::channel();
464 (MethodResponseNotifyTx(tx), MethodResponseFuture(rx))
465}
466
467#[derive(Debug)]
469pub struct MethodResponseNotifyTx(tokio::sync::oneshot::Sender<NotifyMsg>);
470
471impl MethodResponseNotifyTx {
472 pub fn notify(self, is_success: bool) {
474 let msg = if is_success { NotifyMsg::Ok } else { NotifyMsg::Err };
475 _ = self.0.send(msg);
476 }
477}
478
479#[derive(Debug)]
481pub struct MethodResponseFuture(tokio::sync::oneshot::Receiver<NotifyMsg>);
482
483#[derive(Debug, Copy, Clone)]
486pub enum NotifyMsg {
487 Ok,
489 Err,
493}
494
495#[derive(Debug, Copy, Clone)]
497pub enum MethodResponseError {
498 Closed,
500 JsonRpcError,
502}
503
504impl Future for MethodResponseFuture {
505 type Output = Result<(), MethodResponseError>;
506
507 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
508 match self.0.poll_unpin(cx) {
509 Poll::Ready(Ok(NotifyMsg::Ok)) => Poll::Ready(Ok(())),
510 Poll::Ready(Ok(NotifyMsg::Err)) => Poll::Ready(Err(MethodResponseError::JsonRpcError)),
511 Poll::Ready(Err(_)) => Poll::Ready(Err(MethodResponseError::Closed)),
512 Poll::Pending => Poll::Pending,
513 }
514 }
515}
516
517#[derive(Debug, Clone)]
519struct BoundedWriter {
520 max_len: usize,
521 buf: Vec<u8>,
522}
523
524impl BoundedWriter {
525 pub fn new(max_len: usize) -> Self {
527 Self { max_len, buf: Vec::with_capacity(128) }
528 }
529
530 pub fn into_bytes(self) -> Vec<u8> {
532 self.buf
533 }
534}
535
536impl io::Write for &mut BoundedWriter {
537 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
538 let len = self.buf.len() + buf.len();
539 if self.max_len >= len {
540 self.buf.extend_from_slice(buf);
541 Ok(buf.len())
542 } else {
543 Err(io::Error::new(io::ErrorKind::OutOfMemory, "Memory capacity exceeded"))
544 }
545 }
546
547 fn flush(&mut self) -> io::Result<()> {
548 Ok(())
549 }
550}
551
552#[cfg(test)]
553mod tests {
554 use super::{BatchResponseBuilder, BoundedWriter, Id, MethodResponse, ResponsePayload};
555
556 #[test]
557 fn batch_with_single_works() {
558 let method = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a"), usize::MAX);
559 assert_eq!(method.json.get().len(), 37);
560
561 let mut builder = BatchResponseBuilder::new_with_limit(39);
563 builder.append(method).unwrap();
564 let batch = builder.finish();
565
566 assert_eq!(batch.json.get(), r#"[{"jsonrpc":"2.0","id":1,"result":"a"}]"#)
567 }
568
569 #[test]
570 fn batch_with_multiple_works() {
571 let m1 = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a"), usize::MAX);
572 let m11 = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a"), usize::MAX);
573 assert_eq!(m1.json.get().len(), 37);
574
575 let limit = 2 + (37 * 2) + 1;
578 let mut builder = BatchResponseBuilder::new_with_limit(limit);
579 builder.append(m1).unwrap();
580 builder.append(m11).unwrap();
581 let batch = builder.finish();
582
583 assert_eq!(batch.json.get(), r#"[{"jsonrpc":"2.0","id":1,"result":"a"},{"jsonrpc":"2.0","id":1,"result":"a"}]"#)
584 }
585
586 #[test]
587 fn batch_empty_err() {
588 let batch = BatchResponseBuilder::new_with_limit(1024).finish();
589
590 let exp_err = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"Invalid request"}}"#;
591 assert_eq!(batch.json.get(), exp_err);
592 }
593
594 #[test]
595 fn batch_too_big() {
596 let method = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a".repeat(28)), 128);
597 assert_eq!(method.json.get().len(), 64);
598
599 let batch = BatchResponseBuilder::new_with_limit(63).append(method).unwrap_err();
600
601 let exp_err = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32011,"message":"The batch response was too large","data":"Exceeded max limit of 63"}}"#;
602 assert_eq!(batch.json.get(), exp_err);
603 }
604
605 #[test]
606 fn bounded_serializer_work() {
607 use jsonrpsee_types::{Response, ResponsePayload};
608
609 let mut writer = BoundedWriter::new(100);
610 let result = ResponsePayload::success(&"success");
611 let rp = &Response::new(result, Id::Number(1));
612
613 assert!(serde_json::to_writer(&mut writer, rp).is_ok());
614 assert_eq!(String::from_utf8(writer.into_bytes()).unwrap(), r#"{"jsonrpc":"2.0","id":1,"result":"success"}"#);
615 }
616
617 #[test]
618 fn bounded_serializer_cap_works() {
619 let mut writer = BoundedWriter::new(100);
620 assert!(serde_json::to_writer(&mut writer, &"x".repeat(99)).is_err());
622 }
623}