1use std::future::Future;
28use std::net::{SocketAddr, TcpListener as StdTcpListener};
29use std::pin::Pin;
30use std::task::{Context, Poll};
31
32use crate::response;
33use crate::response::{internal_error, malformed};
34use futures_channel::mpsc;
35use futures_util::future::FutureExt;
36use futures_util::stream::{StreamExt, TryStreamExt};
37use hyper::header::{HeaderMap, HeaderValue};
38use hyper::server::conn::AddrStream;
39use hyper::server::{conn::AddrIncoming, Builder as HyperBuilder};
40use hyper::service::{make_service_fn, service_fn};
41use hyper::{Error as HyperError, Method};
42use jsonrpsee_core::error::{Error, GenericTransportError};
43use jsonrpsee_core::http_helpers::{self, read_body};
44use jsonrpsee_core::middleware::{self, HttpMiddleware as Middleware};
45use jsonrpsee_core::server::access_control::AccessControl;
46use jsonrpsee_core::server::helpers::{prepare_error, MethodResponse};
47use jsonrpsee_core::server::helpers::{BatchResponse, BatchResponseBuilder};
48use jsonrpsee_core::server::resource_limiting::Resources;
49use jsonrpsee_core::server::rpc_module::{MethodKind, Methods};
50use jsonrpsee_core::tracing::{rx_log_from_json, rx_log_from_str, tx_log_from_str, RpcTracing};
51use jsonrpsee_core::TEN_MB_SIZE_BYTES;
52use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
53use jsonrpsee_types::{Id, Notification, Params, Request};
54use serde_json::value::RawValue;
55use tokio::net::{TcpListener, ToSocketAddrs};
56use tracing_futures::Instrument;
57
58type Notif<'a> = Notification<'a, Option<&'a RawValue>>;
59
60#[derive(Debug)]
62pub struct Builder<M = ()> {
63 access_control: AccessControl,
65 resources: Resources,
66 max_request_body_size: u32,
67 max_response_body_size: u32,
68 batch_requests_supported: bool,
69 tokio_runtime: Option<tokio::runtime::Handle>,
71 middleware: M,
72 max_log_length: u32,
73 health_api: Option<HealthApi>,
74}
75
76impl Default for Builder {
77 fn default() -> Self {
78 Self {
79 access_control: AccessControl::default(),
80 max_request_body_size: TEN_MB_SIZE_BYTES,
81 max_response_body_size: TEN_MB_SIZE_BYTES,
82 batch_requests_supported: true,
83 resources: Resources::default(),
84 tokio_runtime: None,
85 middleware: (),
86 max_log_length: 4096,
87 health_api: None,
88 }
89 }
90}
91
92impl Builder {
93 pub fn new() -> Self {
95 Self::default()
96 }
97}
98
99impl<M> Builder<M> {
100 pub fn set_middleware<T: Middleware>(self, middleware: T) -> Builder<T> {
141 Builder {
142 access_control: self.access_control,
143 max_request_body_size: self.max_request_body_size,
144 max_response_body_size: self.max_response_body_size,
145 batch_requests_supported: self.batch_requests_supported,
146 resources: self.resources,
147 tokio_runtime: self.tokio_runtime,
148 middleware,
149 max_log_length: self.max_log_length,
150 health_api: self.health_api,
151 }
152 }
153
154 pub fn max_request_body_size(mut self, size: u32) -> Self {
156 self.max_request_body_size = size;
157 self
158 }
159
160 pub fn max_response_body_size(mut self, size: u32) -> Self {
162 self.max_response_body_size = size;
163 self
164 }
165
166 pub fn set_access_control(mut self, acl: AccessControl) -> Self {
168 self.access_control = acl;
169 self
170 }
171
172 pub fn batch_requests_supported(mut self, supported: bool) -> Self {
175 self.batch_requests_supported = supported;
176 self
177 }
178
179 pub fn register_resource(mut self, label: &'static str, capacity: u16, default: u16) -> Result<Self, Error> {
185 self.resources.register(label, capacity, default)?;
186
187 Ok(self)
188 }
189
190 pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
194 self.tokio_runtime = Some(rt);
195 self
196 }
197
198 pub fn health_api(mut self, path: impl Into<String>, method: impl Into<String>) -> Result<Self, Error> {
205 let path = path.into();
206
207 if !path.starts_with('/') {
208 return Err(Error::Custom(format!("Health endpoint path must start with `/` to work, got: {}", path)));
209 }
210
211 self.health_api = Some(HealthApi { path, method: method.into() });
212 Ok(self)
213 }
214
215 pub fn build_from_hyper(
248 self,
249 listener: hyper::server::Builder<AddrIncoming>,
250 local_addr: SocketAddr,
251 ) -> Result<Server<M>, Error> {
252 Ok(Server {
253 access_control: self.access_control,
254 listener,
255 local_addr: Some(local_addr),
256 max_request_body_size: self.max_request_body_size,
257 max_response_body_size: self.max_response_body_size,
258 batch_requests_supported: self.batch_requests_supported,
259 resources: self.resources,
260 tokio_runtime: self.tokio_runtime,
261 middleware: self.middleware,
262 max_log_length: self.max_log_length,
263 health_api: self.health_api,
264 })
265 }
266
267 pub fn build_from_tcp(self, listener: impl Into<StdTcpListener>) -> Result<Server<M>, Error> {
292 let listener = listener.into();
293 let local_addr = listener.local_addr().ok();
294
295 let listener = hyper::Server::from_tcp(listener)?;
296
297 Ok(Server {
298 listener,
299 local_addr,
300 access_control: self.access_control,
301 max_request_body_size: self.max_request_body_size,
302 max_response_body_size: self.max_response_body_size,
303 batch_requests_supported: self.batch_requests_supported,
304 resources: self.resources,
305 tokio_runtime: self.tokio_runtime,
306 middleware: self.middleware,
307 max_log_length: self.max_log_length,
308 health_api: self.health_api,
309 })
310 }
311
312 pub async fn build(self, addrs: impl ToSocketAddrs) -> Result<Server<M>, Error> {
328 let listener = TcpListener::bind(addrs).await?.into_std()?;
329
330 let local_addr = listener.local_addr().ok();
331 let listener = hyper::Server::from_tcp(listener)?.tcp_nodelay(true);
332
333 Ok(Server {
334 listener,
335 local_addr,
336 access_control: self.access_control,
337 max_request_body_size: self.max_request_body_size,
338 max_response_body_size: self.max_response_body_size,
339 batch_requests_supported: self.batch_requests_supported,
340 resources: self.resources,
341 tokio_runtime: self.tokio_runtime,
342 middleware: self.middleware,
343 max_log_length: self.max_log_length,
344 health_api: self.health_api,
345 })
346 }
347}
348
/// Settings of the optional health endpoint.
#[derive(Clone, Debug)]
struct HealthApi {
    /// URI path that a `GET` request must match exactly.
    path: String,
    /// Name of the RPC method invoked to answer the health request.
    method: String,
}
354
355#[derive(Debug)]
357pub struct ServerHandle {
358 stop_sender: mpsc::Sender<()>,
359 pub(crate) handle: Option<tokio::task::JoinHandle<()>>,
360}
361
362impl ServerHandle {
363 pub fn stop(mut self) -> Result<tokio::task::JoinHandle<()>, Error> {
365 let stop = self.stop_sender.try_send(()).map(|_| self.handle.take());
366 match stop {
367 Ok(Some(handle)) => Ok(handle),
368 _ => Err(Error::AlreadyStopped),
369 }
370 }
371}
372
373impl Future for ServerHandle {
374 type Output = ();
375
376 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
377 let handle = match &mut self.handle {
378 Some(handle) => handle,
379 None => return Poll::Ready(()),
380 };
381
382 handle.poll_unpin(cx).map(|_| ())
383 }
384}
385
386#[derive(Debug)]
388pub struct Server<M = ()> {
389 listener: HyperBuilder<AddrIncoming>,
391 local_addr: Option<SocketAddr>,
393 max_request_body_size: u32,
395 max_response_body_size: u32,
397 max_log_length: u32,
401 batch_requests_supported: bool,
403 access_control: AccessControl,
405 resources: Resources,
407 tokio_runtime: Option<tokio::runtime::Handle>,
409 middleware: M,
410 health_api: Option<HealthApi>,
411}
412
413impl<M: Middleware> Server<M> {
414 pub fn local_addr(&self) -> Result<SocketAddr, Error> {
416 self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into()))
417 }
418
419 pub fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
421 let max_request_body_size = self.max_request_body_size;
422 let max_response_body_size = self.max_response_body_size;
423 let max_log_length = self.max_log_length;
424 let acl = self.access_control;
425 let (tx, mut rx) = mpsc::channel(1);
426 let listener = self.listener;
427 let resources = self.resources;
428 let middleware = self.middleware;
429 let batch_requests_supported = self.batch_requests_supported;
430 let methods = methods.into().initialize_resources(&resources)?;
431 let health_api = self.health_api;
432
433 let make_service = make_service_fn(move |conn: &AddrStream| {
434 let remote_addr = conn.remote_addr();
435 let methods = methods.clone();
436 let acl = acl.clone();
437 let resources = resources.clone();
438 let middleware = middleware.clone();
439 let health_api = health_api.clone();
440
441 async move {
442 Ok::<_, HyperError>(service_fn(move |request| {
443 let request_start = middleware.on_request(remote_addr, request.headers());
444
445 let methods = methods.clone();
446 let acl = acl.clone();
447 let resources = resources.clone();
448 let middleware = middleware.clone();
449 let health_api = health_api.clone();
450
451 async move {
454 let keys = request.headers().keys().map(|k| k.as_str());
455 let cors_request_headers = http_helpers::get_cors_request_headers(request.headers());
456
457 let host = match http_helpers::read_header_value(request.headers(), "host") {
458 Some(origin) => origin,
459 None => return Ok(malformed()),
460 };
461 let maybe_origin = http_helpers::read_header_value(request.headers(), "origin");
462
463 if let Err(e) = acl.verify_host(host) {
464 tracing::warn!("Denied request: {:?}", e);
465 return Ok(response::host_not_allowed());
466 }
467
468 if let Err(e) = acl.verify_origin(maybe_origin, host) {
469 tracing::warn!("Denied request: {:?}", e);
470 return Ok(response::invalid_allow_origin());
471 }
472
473 if let Err(e) = acl.verify_headers(keys, cors_request_headers) {
474 tracing::warn!("Denied request: {:?}", e);
475 return Ok(response::invalid_allow_headers());
476 }
477
478 match *request.method() {
480 Method::OPTIONS => {
483 let origin = match maybe_origin {
484 Some(origin) => origin,
485 None => return Ok(malformed()),
486 };
487
488 let allowed_headers = acl.allowed_headers().to_cors_header_value();
489 let allowed_header_bytes = allowed_headers.as_bytes();
490
491 let res = hyper::Response::builder()
492 .header("access-control-allow-origin", origin)
493 .header("access-control-allow-methods", "POST")
494 .header("access-control-allow-headers", allowed_header_bytes)
495 .body(hyper::Body::empty())
496 .unwrap_or_else(|e| {
497 tracing::error!("Error forming preflight response: {}", e);
498 internal_error()
499 });
500
501 Ok(res)
502 }
503 Method::POST if content_type_is_json(&request) => {
507 let origin = return_origin_if_different_from_host(request.headers()).cloned();
508 let mut res = process_validated_request(ProcessValidatedRequest {
509 request,
510 middleware,
511 methods,
512 resources,
513 max_request_body_size,
514 max_response_body_size,
515 max_log_length,
516 batch_requests_supported,
517 request_start,
518 })
519 .await?;
520
521 if let Some(origin) = origin {
522 res.headers_mut().insert("access-control-allow-origin", origin);
523 }
524 Ok(res)
525 }
526 Method::GET => match health_api.as_ref() {
527 Some(health) if health.path.as_str() == request.uri().path() => {
528 process_health_request(
529 health,
530 middleware,
531 methods,
532 max_response_body_size,
533 request_start,
534 max_log_length,
535 )
536 .await
537 }
538 _ => Ok(response::method_not_allowed()),
539 },
540 Method::POST => Ok(response::unsupported_content_type()),
542 _ => Ok(response::method_not_allowed()),
543 }
544 }
545 }))
546 }
547 });
548
549 let rt = match self.tokio_runtime.take() {
550 Some(rt) => rt,
551 None => tokio::runtime::Handle::current(),
552 };
553
554 let handle = rt.spawn(async move {
555 let server = listener.serve(make_service);
556 let _ = server.with_graceful_shutdown(async move { rx.next().await.map_or((), |_| ()) }).await;
557 });
558
559 Ok(ServerHandle { handle: Some(handle), stop_sender: tx })
560 }
561}
562
563fn return_origin_if_different_from_host(headers: &HeaderMap) -> Option<&HeaderValue> {
566 if let (Some(origin), Some(host)) = (headers.get("origin"), headers.get("host")) {
567 if origin != host {
568 Some(origin)
569 } else {
570 None
571 }
572 } else {
573 None
574 }
575}
576
577fn content_type_is_json(request: &hyper::Request<hyper::Body>) -> bool {
579 is_json(request.headers().get("content-type"))
580}
581
582fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool {
584 match content_type.and_then(|val| val.to_str().ok()) {
585 Some(content)
586 if content.eq_ignore_ascii_case("application/json")
587 || content.eq_ignore_ascii_case("application/json; charset=utf-8")
588 || content.eq_ignore_ascii_case("application/json;charset=utf-8") =>
589 {
590 true
591 }
592 _ => false,
593 }
594}
595
596struct ProcessValidatedRequest<M: Middleware> {
597 request: hyper::Request<hyper::Body>,
598 middleware: M,
599 methods: Methods,
600 resources: Resources,
601 max_request_body_size: u32,
602 max_response_body_size: u32,
603 max_log_length: u32,
604 batch_requests_supported: bool,
605 request_start: M::Instant,
606}
607
608async fn process_validated_request<M: Middleware>(
610 input: ProcessValidatedRequest<M>,
611) -> Result<hyper::Response<hyper::Body>, HyperError> {
612 let ProcessValidatedRequest {
613 request,
614 middleware,
615 methods,
616 resources,
617 max_request_body_size,
618 max_response_body_size,
619 max_log_length,
620 batch_requests_supported,
621 request_start,
622 } = input;
623
624 let (parts, body) = request.into_parts();
625
626 let (body, is_single) = match read_body(&parts.headers, body, max_request_body_size).await {
627 Ok(r) => r,
628 Err(GenericTransportError::TooLarge) => return Ok(response::too_large(max_request_body_size)),
629 Err(GenericTransportError::Malformed) => return Ok(response::malformed()),
630 Err(GenericTransportError::Inner(e)) => {
631 tracing::error!("Internal error reading request body: {}", e);
632 return Ok(response::internal_error());
633 }
634 };
635
636 if is_single {
638 let call = CallData {
639 conn_id: 0,
640 middleware: &middleware,
641 methods: &methods,
642 max_response_body_size,
643 max_log_length,
644 resources: &resources,
645 request_start,
646 };
647 let response = process_single_request(body, call).await;
648 middleware.on_response(&response.result, request_start);
649 Ok(response::ok_response(response.result))
650 }
651 else if !batch_requests_supported {
653 let err = MethodResponse::error(
654 Id::Null,
655 ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
656 );
657 middleware.on_response(&err.result, request_start);
658 Ok(response::ok_response(err.result))
659 }
660 else {
662 let response = process_batch_request(Batch {
663 data: body,
664 call: CallData {
665 conn_id: 0,
666 middleware: &middleware,
667 methods: &methods,
668 max_response_body_size,
669 max_log_length,
670 resources: &resources,
671 request_start,
672 },
673 })
674 .await;
675 middleware.on_response(&response.result, request_start);
676 Ok(response::ok_response(response.result))
677 }
678}
679
680async fn process_health_request<M: Middleware>(
681 health_api: &HealthApi,
682 middleware: M,
683 methods: Methods,
684 max_response_body_size: u32,
685 request_start: M::Instant,
686 max_log_length: u32,
687) -> Result<hyper::Response<hyper::Body>, HyperError> {
688 let trace = RpcTracing::method_call(&health_api.method);
689 async {
690 tx_log_from_str("HTTP health API", max_log_length);
691 let response = match methods.method_with_name(&health_api.method) {
692 None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)),
693 Some((_name, method_callback)) => match method_callback.inner() {
694 MethodKind::Sync(callback) => {
695 (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize)
696 }
697 MethodKind::Async(callback) => {
698 (callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await
699 }
700 MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
701 MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError))
702 }
703 },
704 };
705
706 rx_log_from_str(&response.result, max_log_length);
707 middleware.on_result(&health_api.method, response.success, request_start);
708 middleware.on_response(&response.result, request_start);
709
710 if response.success {
711 #[derive(serde::Deserialize)]
712 struct RpcPayload<'a> {
713 #[serde(borrow)]
714 result: &'a serde_json::value::RawValue,
715 }
716
717 let payload: RpcPayload = serde_json::from_str(&response.result)
718 .expect("valid JSON-RPC response must have a result field and be valid JSON; qed");
719 Ok(response::ok_response(payload.result.to_string()))
720 } else {
721 Ok(response::internal_error())
722 }
723 }
724 .instrument(trace.into_span())
725 .await
726}
727
728#[derive(Debug, Clone)]
729struct Batch<'a, M: Middleware> {
730 data: Vec<u8>,
731 call: CallData<'a, M>,
732}
733
734#[derive(Debug, Clone)]
735struct CallData<'a, M: Middleware> {
736 conn_id: usize,
737 middleware: &'a M,
738 methods: &'a Methods,
739 max_response_body_size: u32,
740 max_log_length: u32,
741 resources: &'a Resources,
742 request_start: M::Instant,
743}
744
745#[derive(Debug, Clone)]
746struct Call<'a, M: Middleware> {
747 params: Params<'a>,
748 name: &'a str,
749 call: CallData<'a, M>,
750 id: Id<'a>,
751}
752
753async fn process_batch_request<M>(b: Batch<'_, M>) -> BatchResponse
757where
758 M: Middleware,
759{
760 let Batch { data, call } = b;
761
762 if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&data) {
763 let max_response_size = call.max_response_body_size;
764 let batch = batch.into_iter().map(|req| Ok((req, call.clone())));
765
766 let batch_stream = futures_util::stream::iter(batch);
767
768 let trace = RpcTracing::batch();
769 return async {
770 let batch_response = batch_stream
771 .try_fold(
772 BatchResponseBuilder::new_with_limit(max_response_size as usize),
773 |batch_response, (req, call)| async move {
774 let params = Params::new(req.params.map(|params| params.get()));
775 let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await;
776 batch_response.append(&response)
777 },
778 )
779 .await;
780
781 match batch_response {
782 Ok(batch) => batch.finish(),
783 Err(batch_err) => batch_err,
784 }
785 }
786 .instrument(trace.into_span())
787 .await;
788 }
789
790 if let Ok(batch) = serde_json::from_slice::<Vec<Notif>>(&data) {
791 return if !batch.is_empty() {
792 BatchResponse { result: "".to_string(), success: true }
793 } else {
794 BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
795 };
796 }
797
798 let (id, code) = prepare_error(&data);
802 BatchResponse::error(id, ErrorObject::from(code))
803}
804
805async fn process_single_request<M: Middleware>(data: Vec<u8>, call: CallData<'_, M>) -> MethodResponse {
806 if let Ok(req) = serde_json::from_slice::<Request>(&data) {
807 let trace = RpcTracing::method_call(&req.method);
808 async {
809 rx_log_from_json(&req, call.max_log_length);
810 let params = Params::new(req.params.map(|params| params.get()));
811 let name = &req.method;
812 let id = req.id;
813 execute_call(Call { name, params, id, call }).await
814 }
815 .instrument(trace.into_span())
816 .await
817 } else if let Ok(req) = serde_json::from_slice::<Notif>(&data) {
818 let trace = RpcTracing::notification(&req.method);
819 let span = trace.into_span();
820 let _enter = span.enter();
821 rx_log_from_json(&req, call.max_log_length);
822
823 MethodResponse { result: String::new(), success: true }
824 } else {
825 let (id, code) = prepare_error(&data);
826 MethodResponse::error(id, ErrorObject::from(code))
827 }
828}
829
830async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResponse {
831 let Call { name, id, params, call } = c;
832 let CallData { resources, methods, middleware, max_response_body_size, max_log_length, conn_id, request_start } =
833 call;
834
835 let response = match methods.method_with_name(name) {
836 None => {
837 middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
838 MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound))
839 }
840 Some((name, method)) => match &method.inner() {
841 MethodKind::Sync(callback) => {
842 middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
843
844 match method.claim(name, resources) {
845 Ok(guard) => {
846 let r = (callback)(id, params, max_response_body_size as usize);
847 drop(guard);
848 r
849 }
850 Err(err) => {
851 tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
852 MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
853 }
854 }
855 }
856 MethodKind::Async(callback) => {
857 middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
858 match method.claim(name, resources) {
859 Ok(guard) => {
860 let id = id.into_owned();
861 let params = params.into_owned();
862
863 (callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await
864 }
865 Err(err) => {
866 tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
867 MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
868 }
869 }
870 }
871 MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
872 middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
873 tracing::error!("Subscriptions not supported on HTTP");
874 MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
875 }
876 },
877 };
878
879 tx_log_from_str(&response.result, max_log_length);
880 middleware.on_result(name, response.success, request_start);
881 response
882}