1use crate::server::{BoundedWriter, LOG_TARGET};
28use std::task::Poll;
29
30use futures_util::{Future, FutureExt};
31use http::Extensions;
32use jsonrpsee_types::error::{
33 reject_too_big_batch_response, ErrorCode, ErrorObject, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG,
34};
35use jsonrpsee_types::{ErrorObjectOwned, Id, Response, ResponsePayload as InnerResponsePayload};
36use serde::Serialize;
37use serde_json::value::to_raw_value;
38
39#[derive(Debug, Clone)]
40enum ResponseKind {
41 MethodCall,
42 Subscription,
43 Batch,
44}
45
46#[derive(Debug)]
53pub struct MethodResponse {
54 result: String,
56 success_or_error: MethodResponseResult,
58 kind: ResponseKind,
60 on_close: Option<MethodResponseNotifyTx>,
63 extensions: Extensions,
65}
66
67impl MethodResponse {
68 pub fn is_success(&self) -> bool {
70 self.success_or_error.is_success()
71 }
72
73 pub fn is_error(&self) -> bool {
75 self.success_or_error.is_error()
76 }
77
78 pub fn is_subscription(&self) -> bool {
80 matches!(self.kind, ResponseKind::Subscription)
81 }
82
83 pub fn is_method_call(&self) -> bool {
85 matches!(self.kind, ResponseKind::MethodCall)
86 }
87
88 pub fn is_batch(&self) -> bool {
90 matches!(self.kind, ResponseKind::Batch)
91 }
92
93 pub fn into_result(self) -> String {
95 self.result
96 }
97
98 pub fn to_result(&self) -> String {
100 self.result.clone()
101 }
102
103 pub fn into_parts(self) -> (String, Option<MethodResponseNotifyTx>) {
105 (self.result, self.on_close)
106 }
107
108 pub fn as_error_code(&self) -> Option<i32> {
112 self.success_or_error.as_error_code()
113 }
114
115 pub fn as_result(&self) -> &str {
117 &self.result
118 }
119
120 pub fn from_batch(batch: BatchResponse) -> Self {
122 Self {
123 result: batch.0,
124 success_or_error: MethodResponseResult::Success,
125 kind: ResponseKind::Batch,
126 on_close: None,
127 extensions: Extensions::new(),
128 }
129 }
130
131 pub fn subscription_response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
134 where
135 T: Serialize + Clone,
136 {
137 let mut rp = Self::response(id, result, max_response_size);
138 rp.kind = ResponseKind::Subscription;
139 rp
140 }
141
142 pub fn response<T>(id: Id, rp: ResponsePayload<T>, max_response_size: usize) -> Self
147 where
148 T: Serialize + Clone,
149 {
150 let mut writer = BoundedWriter::new(max_response_size);
151
152 let success_or_error = if let InnerResponsePayload::Error(ref e) = rp.inner {
153 MethodResponseResult::Failed(e.code())
154 } else {
155 MethodResponseResult::Success
156 };
157
158 let kind = ResponseKind::MethodCall;
159
160 match serde_json::to_writer(&mut writer, &Response::new(rp.inner, id.clone())) {
161 Ok(_) => {
162 let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };
164
165 Self { result, success_or_error, kind, on_close: rp.on_exit, extensions: Extensions::new() }
166 }
167 Err(err) => {
168 tracing::error!(target: LOG_TARGET, "Error serializing response: {:?}", err);
169
170 if err.is_io() {
171 let data = to_raw_value(&format!("Exceeded max limit of {max_response_size}")).ok();
172 let err_code = OVERSIZED_RESPONSE_CODE;
173
174 let err = InnerResponsePayload::<()>::error_borrowed(ErrorObject::borrowed(
175 err_code,
176 OVERSIZED_RESPONSE_MSG,
177 data.as_deref(),
178 ));
179 let result =
180 serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");
181
182 Self {
183 result,
184 success_or_error: MethodResponseResult::Failed(err_code),
185 kind,
186 on_close: rp.on_exit,
187 extensions: Extensions::new(),
188 }
189 } else {
190 let err = ErrorCode::InternalError;
191 let payload = jsonrpsee_types::ResponsePayload::<()>::error(err);
192 let result =
193 serde_json::to_string(&Response::new(payload, id)).expect("JSON serialization infallible; qed");
194 Self {
195 result,
196 success_or_error: MethodResponseResult::Failed(err.code()),
197 kind,
198 on_close: rp.on_exit,
199 extensions: Extensions::new(),
200 }
201 }
202 }
203 }
204 }
205
206 pub fn subscription_error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
209 let mut rp = Self::error(id, err);
210 rp.kind = ResponseKind::Subscription;
211 rp
212 }
213
214 pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
216 let err: ErrorObject = err.into();
217 let err_code = err.code();
218 let err = InnerResponsePayload::<()>::error_borrowed(err);
219 let result = serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");
220 Self {
221 result,
222 success_or_error: MethodResponseResult::Failed(err_code),
223 kind: ResponseKind::MethodCall,
224 on_close: None,
225 extensions: Extensions::new(),
226 }
227 }
228
229 pub fn extensions(&self) -> &Extensions {
231 &self.extensions
232 }
233
234 pub fn extensions_mut(&mut self) -> &mut Extensions {
236 &mut self.extensions
237 }
238
239 pub fn with_extensions(self, extensions: Extensions) -> Self {
241 Self { extensions, ..self }
242 }
243}
244
245#[derive(Debug, Copy, Clone)]
247enum MethodResponseResult {
248 Success,
250 Failed(i32),
252}
253
254impl MethodResponseResult {
255 fn is_success(&self) -> bool {
257 matches!(self, MethodResponseResult::Success)
258 }
259
260 fn is_error(&self) -> bool {
262 matches!(self, MethodResponseResult::Failed(_))
263 }
264
265 fn as_error_code(&self) -> Option<i32> {
269 match self {
270 Self::Failed(e) => Some(*e),
271 _ => None,
272 }
273 }
274}
275
276#[derive(Debug, Clone, Default)]
278pub struct BatchResponseBuilder {
279 result: String,
281 max_response_size: usize,
283}
284
285impl BatchResponseBuilder {
286 pub fn new_with_limit(limit: usize) -> Self {
288 let mut initial = String::with_capacity(2048);
289 initial.push('[');
290
291 Self { result: initial, max_response_size: limit }
292 }
293
294 pub fn append(&mut self, response: &MethodResponse) -> Result<(), MethodResponse> {
299 let len = response.result.len() + self.result.len() + 1;
302
303 if len > self.max_response_size {
304 Err(MethodResponse::error(Id::Null, reject_too_big_batch_response(self.max_response_size)))
305 } else {
306 self.result.push_str(&response.result);
307 self.result.push(',');
308 Ok(())
309 }
310 }
311
312 pub fn is_empty(&self) -> bool {
314 self.result.len() <= 1
315 }
316
317 pub fn finish(mut self) -> BatchResponse {
319 if self.result.len() == 1 {
320 BatchResponse(batch_response_error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)))
321 } else {
322 self.result.pop();
323 self.result.push(']');
324 BatchResponse(self.result)
325 }
326 }
327}
328
329#[derive(Debug, Clone)]
331pub struct BatchResponse(String);
332
333pub fn batch_response_error(id: Id, err: impl Into<ErrorObject<'static>>) -> String {
335 let err = InnerResponsePayload::<()>::error_borrowed(err);
336 serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed")
337}
338
339#[derive(Debug)]
342pub struct ResponsePayload<'a, T>
343where
344 T: Clone,
345{
346 inner: InnerResponsePayload<'a, T>,
347 on_exit: Option<MethodResponseNotifyTx>,
348}
349
350impl<'a, T: Clone> From<InnerResponsePayload<'a, T>> for ResponsePayload<'a, T> {
351 fn from(inner: InnerResponsePayload<'a, T>) -> Self {
352 Self { inner, on_exit: None }
353 }
354}
355
356impl<'a, T> ResponsePayload<'a, T>
357where
358 T: Clone,
359{
360 pub fn success(t: T) -> Self {
362 InnerResponsePayload::success(t).into()
363 }
364
365 pub fn success_borrowed(t: &'a T) -> Self {
367 InnerResponsePayload::success_borrowed(t).into()
368 }
369
370 pub fn error(e: impl Into<ErrorObjectOwned>) -> Self {
372 InnerResponsePayload::error(e.into()).into()
373 }
374
375 pub fn error_borrowed(e: impl Into<ErrorObject<'a>>) -> Self {
377 InnerResponsePayload::error_borrowed(e.into()).into()
378 }
379
380 pub fn notify_on_completion(mut self) -> (Self, MethodResponseFuture) {
386 let (tx, rx) = response_channel();
387 self.on_exit = Some(tx);
388 (self, rx)
389 }
390
391 pub fn into_owned(self) -> ResponsePayload<'static, T> {
393 ResponsePayload { inner: self.inner.into_owned(), on_exit: self.on_exit }
394 }
395}
396
397impl<'a, T> From<ErrorCode> for ResponsePayload<'a, T>
398where
399 T: Clone,
400{
401 fn from(code: ErrorCode) -> Self {
402 let err: ErrorObject = code.into();
403 Self::error(err)
404 }
405}
406
407fn response_channel() -> (MethodResponseNotifyTx, MethodResponseFuture) {
410 let (tx, rx) = tokio::sync::oneshot::channel();
411 (MethodResponseNotifyTx(tx), MethodResponseFuture(rx))
412}
413
414#[derive(Debug)]
416pub struct MethodResponseNotifyTx(tokio::sync::oneshot::Sender<NotifyMsg>);
417
418impl MethodResponseNotifyTx {
419 pub fn notify(self, is_success: bool) {
421 let msg = if is_success { NotifyMsg::Ok } else { NotifyMsg::Err };
422 _ = self.0.send(msg);
423 }
424}
425
426#[derive(Debug)]
428pub struct MethodResponseFuture(tokio::sync::oneshot::Receiver<NotifyMsg>);
429
430#[derive(Debug, Copy, Clone)]
433pub enum NotifyMsg {
434 Ok,
436 Err,
440}
441
442#[derive(Debug, Copy, Clone)]
444pub enum MethodResponseError {
445 Closed,
447 JsonRpcError,
449}
450
451impl Future for MethodResponseFuture {
452 type Output = Result<(), MethodResponseError>;
453
454 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
455 match self.0.poll_unpin(cx) {
456 Poll::Ready(Ok(NotifyMsg::Ok)) => Poll::Ready(Ok(())),
457 Poll::Ready(Ok(NotifyMsg::Err)) => Poll::Ready(Err(MethodResponseError::JsonRpcError)),
458 Poll::Ready(Err(_)) => Poll::Ready(Err(MethodResponseError::Closed)),
459 Poll::Pending => Poll::Pending,
460 }
461 }
462}
463
464#[cfg(test)]
465mod tests {
466 use super::{BatchResponseBuilder, MethodResponse, ResponsePayload};
467 use jsonrpsee_types::Id;
468
469 #[test]
470 fn batch_with_single_works() {
471 let method = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a"), usize::MAX);
472 assert_eq!(method.result.len(), 37);
473
474 let mut builder = BatchResponseBuilder::new_with_limit(39);
476 builder.append(&method).unwrap();
477 let batch = builder.finish();
478
479 assert_eq!(batch.0, r#"[{"jsonrpc":"2.0","id":1,"result":"a"}]"#)
480 }
481
482 #[test]
483 fn batch_with_multiple_works() {
484 let m1 = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a"), usize::MAX);
485 assert_eq!(m1.result.len(), 37);
486
487 let limit = 2 + (37 * 2) + 1;
490 let mut builder = BatchResponseBuilder::new_with_limit(limit);
491 builder.append(&m1).unwrap();
492 builder.append(&m1).unwrap();
493 let batch = builder.finish();
494
495 assert_eq!(batch.0, r#"[{"jsonrpc":"2.0","id":1,"result":"a"},{"jsonrpc":"2.0","id":1,"result":"a"}]"#)
496 }
497
498 #[test]
499 fn batch_empty_err() {
500 let batch = BatchResponseBuilder::new_with_limit(1024).finish();
501
502 let exp_err = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"Invalid request"}}"#;
503 assert_eq!(batch.0, exp_err);
504 }
505
506 #[test]
507 fn batch_too_big() {
508 let method = MethodResponse::response(Id::Number(1), ResponsePayload::success_borrowed(&"a".repeat(28)), 128);
509 assert_eq!(method.result.len(), 64);
510
511 let batch = BatchResponseBuilder::new_with_limit(63).append(&method).unwrap_err();
512
513 let exp_err = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32011,"message":"The batch response was too large","data":"Exceeded max limit of 63"}}"#;
514 assert_eq!(batch.result, exp_err);
515 }
516}