1use std::error::Error as StdError;
28use std::future::Future;
29use std::net::{SocketAddr, TcpListener as StdTcpListener};
30use std::pin::Pin;
31use std::sync::Arc;
32use std::sync::atomic::AtomicU32;
33use std::task::Poll;
34use std::time::Duration;
35
36use crate::future::{ConnectionGuard, ServerHandle, SessionClose, SessionClosedFuture, StopHandle, session_close};
37use crate::middleware::rpc::{RpcService, RpcServiceCfg};
38use crate::transport::ws::BackgroundTaskParams;
39use crate::transport::{http, ws};
40use crate::utils::deserialize_with_ext;
41use crate::{Extensions, HttpBody, HttpRequest, HttpResponse, LOG_TARGET};
42
43use futures_util::future::{self, Either, FutureExt};
44use futures_util::io::{BufReader, BufWriter};
45use hyper::body::Bytes;
46use hyper_util::rt::{TokioExecutor, TokioIo};
47use jsonrpsee_core::id_providers::RandomIntegerIdProvider;
48use jsonrpsee_core::middleware::{Batch, BatchEntry, BatchEntryErr, RpcServiceBuilder, RpcServiceT};
49use jsonrpsee_core::server::helpers::prepare_error;
50use jsonrpsee_core::server::{BoundedSubscriptions, ConnectionId, MethodResponse, MethodSink, Methods};
51use jsonrpsee_core::traits::IdProvider;
52use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES};
53use jsonrpsee_types::error::{
54 BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG, ErrorCode, reject_too_big_batch_request,
55};
56use jsonrpsee_types::{ErrorObject, Id};
57use soketto::handshake::http::is_upgrade_request;
58use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
59use tokio::sync::{OwnedSemaphorePermit, mpsc, watch};
60use tokio_util::compat::TokioAsyncReadCompatExt;
61use tower::layer::util::Identity;
62use tower::{Layer, Service};
63use tracing::{Instrument, instrument};
64
65const MAX_CONNECTIONS: u32 = 100;
67
68type Notif<'a> = Option<std::borrow::Cow<'a, JsonRawValue>>;
69
70pub struct Server<HttpMiddleware = Identity, RpcMiddleware = Identity> {
72 listener: TcpListener,
73 server_cfg: ServerConfig,
74 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
75 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
76}
77
78impl Server<Identity, Identity> {
79 pub fn builder() -> Builder<Identity, Identity> {
81 Builder::new()
82 }
83}
84
85impl<RpcMiddleware, HttpMiddleware> std::fmt::Debug for Server<RpcMiddleware, HttpMiddleware> {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.debug_struct("Server").field("listener", &self.listener).field("server_cfg", &self.server_cfg).finish()
88 }
89}
90
91impl<RpcMiddleware, HttpMiddleware> Server<RpcMiddleware, HttpMiddleware> {
92 pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
94 self.listener.local_addr()
95 }
96}
97
98impl<HttpMiddleware, RpcMiddleware, Body> Server<HttpMiddleware, RpcMiddleware>
99where
100 RpcMiddleware: tower::Layer<RpcService> + Clone + Send + 'static,
101 <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
102 HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
103 <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service:
104 Send + Clone + Service<HttpRequest, Response = HttpResponse<Body>, Error = BoxError>,
105 <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<HttpRequest>>::Future: Send,
106 Body: http_body::Body<Data = Bytes> + Send + 'static,
107 <Body as http_body::Body>::Error: Into<BoxError>,
108 <Body as http_body::Body>::Data: Send,
109{
110 pub fn start(mut self, methods: impl Into<Methods>) -> ServerHandle {
114 let methods = methods.into();
115 let (stop_tx, stop_rx) = watch::channel(());
116
117 let stop_handle = StopHandle::new(stop_rx);
118
119 match self.server_cfg.tokio_runtime.take() {
120 Some(rt) => rt.spawn(self.start_inner(methods, stop_handle)),
121 None => tokio::spawn(self.start_inner(methods, stop_handle)),
122 };
123
124 ServerHandle::new(stop_tx)
125 }
126
127 async fn start_inner(self, methods: Methods, stop_handle: StopHandle) {
128 let mut id: u32 = 0;
129 let connection_guard = ConnectionGuard::new(self.server_cfg.max_connections as usize);
130 let listener = self.listener;
131
132 let stopped = stop_handle.clone().shutdown();
133 tokio::pin!(stopped);
134
135 let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
136
137 loop {
138 match try_accept_conn(&listener, stopped).await {
139 AcceptConnection::Established { socket, remote_addr, stop } => {
140 process_connection(ProcessConnection {
141 http_middleware: &self.http_middleware,
142 rpc_middleware: self.rpc_middleware.clone(),
143 remote_addr,
144 methods: methods.clone(),
145 stop_handle: stop_handle.clone(),
146 conn_id: id,
147 server_cfg: self.server_cfg.clone(),
148 conn_guard: &connection_guard,
149 socket,
150 drop_on_completion: drop_on_completion.clone(),
151 });
152 id = id.wrapping_add(1);
153 stopped = stop;
154 }
155 AcceptConnection::Err((e, stop)) => {
156 tracing::debug!(target: LOG_TARGET, "Error while awaiting a new connection: {:?}", e);
157 stopped = stop;
158 }
159 AcceptConnection::Shutdown => break,
160 }
161 }
162
163 drop(drop_on_completion);
165
166 while process_connection_awaiter.recv().await.is_some() {
168 }
171 }
172}
173
174#[derive(Debug, Clone)]
176pub struct ServerConfig {
177 pub(crate) max_request_body_size: u32,
179 pub(crate) max_response_body_size: u32,
181 pub(crate) max_connections: u32,
183 pub(crate) max_subscriptions_per_connection: u32,
185 pub(crate) batch_requests_config: BatchRequestConfig,
187 pub(crate) tokio_runtime: Option<tokio::runtime::Handle>,
189 pub(crate) enable_http: bool,
191 pub(crate) enable_ws: bool,
193 pub(crate) message_buffer_capacity: u32,
195 pub(crate) ping_config: Option<PingConfig>,
197 pub(crate) id_provider: Arc<dyn IdProvider>,
199 pub(crate) tcp_no_delay: bool,
201}
202
203#[derive(Debug, Clone)]
205pub struct ServerConfigBuilder {
206 max_request_body_size: u32,
208 max_response_body_size: u32,
210 max_connections: u32,
212 max_subscriptions_per_connection: u32,
214 batch_requests_config: BatchRequestConfig,
216 tokio_runtime: Option<tokio::runtime::Handle>,
218 enable_http: bool,
220 enable_ws: bool,
222 message_buffer_capacity: u32,
224 ping_config: Option<PingConfig>,
226 id_provider: Arc<dyn IdProvider>,
228 tcp_no_delay: bool,
230}
231
232#[derive(Debug, Clone)]
234pub struct TowerServiceBuilder<RpcMiddleware, HttpMiddleware> {
235 pub(crate) server_cfg: ServerConfig,
237 pub(crate) rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
239 pub(crate) http_middleware: tower::ServiceBuilder<HttpMiddleware>,
241 pub(crate) conn_id: Arc<AtomicU32>,
243 pub(crate) conn_guard: ConnectionGuard,
245}
246
247#[derive(Debug, Copy, Clone)]
249pub enum BatchRequestConfig {
250 Disabled,
252 Limit(u32),
254 Unlimited,
256}
257
258#[derive(Debug, Clone)]
261pub struct ConnectionState {
262 pub(crate) stop_handle: StopHandle,
264 pub(crate) conn_id: u32,
266 pub(crate) _conn_permit: Arc<OwnedSemaphorePermit>,
268}
269
270impl ConnectionState {
271 pub fn new(stop_handle: StopHandle, conn_id: u32, conn_permit: OwnedSemaphorePermit) -> ConnectionState {
273 Self { stop_handle, conn_id, _conn_permit: Arc::new(conn_permit) }
274 }
275}
276
277#[derive(Debug, Copy, Clone)]
290pub struct PingConfig {
291 pub(crate) ping_interval: Duration,
293 pub(crate) inactive_limit: Duration,
295 pub(crate) max_failures: usize,
297}
298
299impl Default for PingConfig {
300 fn default() -> Self {
301 Self { ping_interval: Duration::from_secs(30), max_failures: 1, inactive_limit: Duration::from_secs(40) }
302 }
303}
304
305impl PingConfig {
306 pub fn new() -> Self {
308 Self::default()
309 }
310
311 pub fn ping_interval(mut self, ping_interval: Duration) -> Self {
313 self.ping_interval = ping_interval;
314 self
315 }
316
317 pub fn inactive_limit(mut self, inactivity_limit: Duration) -> Self {
323 self.inactive_limit = inactivity_limit;
324 self
325 }
326
327 pub fn max_failures(mut self, max: usize) -> Self {
334 assert!(max > 0);
335 self.max_failures = max;
336 self
337 }
338}
339
340impl Default for ServerConfig {
341 fn default() -> Self {
342 ServerConfig::builder().build()
343 }
344}
345
346impl ServerConfig {
347 pub fn builder() -> ServerConfigBuilder {
349 ServerConfigBuilder::default()
350 }
351}
352
353impl Default for ServerConfigBuilder {
354 fn default() -> Self {
355 ServerConfigBuilder {
356 max_request_body_size: TEN_MB_SIZE_BYTES,
357 max_response_body_size: TEN_MB_SIZE_BYTES,
358 max_connections: MAX_CONNECTIONS,
359 max_subscriptions_per_connection: 1024,
360 batch_requests_config: BatchRequestConfig::Unlimited,
361 tokio_runtime: None,
362 enable_http: true,
363 enable_ws: true,
364 message_buffer_capacity: 1024,
365 ping_config: None,
366 id_provider: Arc::new(RandomIntegerIdProvider),
367 tcp_no_delay: true,
368 }
369 }
370}
371
372impl ServerConfigBuilder {
373 pub fn new() -> Self {
375 Self::default()
376 }
377
378 pub fn max_request_body_size(mut self, size: u32) -> Self {
380 self.max_request_body_size = size;
381 self
382 }
383
384 pub fn max_response_body_size(mut self, size: u32) -> Self {
386 self.max_response_body_size = size;
387 self
388 }
389
390 pub fn max_connections(mut self, max: u32) -> Self {
392 self.max_connections = max;
393 self
394 }
395
396 pub fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
398 self.max_subscriptions_per_connection = max;
399 self
400 }
401
402 pub fn set_batch_request_config(mut self, cfg: BatchRequestConfig) -> Self {
407 self.batch_requests_config = cfg;
408 self
409 }
410
411 pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
415 self.tokio_runtime = Some(rt);
416 self
417 }
418
419 pub fn http_only(mut self) -> Self {
423 self.enable_http = true;
424 self.enable_ws = false;
425 self
426 }
427
428 pub fn ws_only(mut self) -> Self {
434 self.enable_http = false;
435 self.enable_ws = true;
436 self
437 }
438
439 pub fn set_message_buffer_capacity(mut self, c: u32) -> Self {
458 assert!(c > 0, "buffer capacity must be set to > 0");
459 self.message_buffer_capacity = c;
460 self
461 }
462
463 pub fn enable_ws_ping(mut self, config: PingConfig) -> Self {
478 self.ping_config = Some(config);
479 self
480 }
481
482 pub fn disable_ws_ping(mut self) -> Self {
486 self.ping_config = None;
487 self
488 }
489
490 pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
511 self.id_provider = Arc::new(id_provider);
512 self
513 }
514
515 pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
519 self.tcp_no_delay = no_delay;
520 self
521 }
522
523 pub fn build(self) -> ServerConfig {
525 ServerConfig {
526 max_request_body_size: self.max_request_body_size,
527 max_response_body_size: self.max_response_body_size,
528 max_connections: self.max_connections,
529 max_subscriptions_per_connection: self.max_subscriptions_per_connection,
530 batch_requests_config: self.batch_requests_config,
531 tokio_runtime: self.tokio_runtime,
532 enable_http: self.enable_http,
533 enable_ws: self.enable_ws,
534 message_buffer_capacity: self.message_buffer_capacity,
535 ping_config: self.ping_config,
536 id_provider: self.id_provider,
537 tcp_no_delay: self.tcp_no_delay,
538 }
539 }
540}
541
542#[derive(Debug)]
544pub struct Builder<HttpMiddleware, RpcMiddleware> {
545 server_cfg: ServerConfig,
546 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
547 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
548}
549
550impl Default for Builder<Identity, Identity> {
551 fn default() -> Self {
552 Builder {
553 server_cfg: ServerConfig::default(),
554 rpc_middleware: RpcServiceBuilder::new(),
555 http_middleware: tower::ServiceBuilder::new(),
556 }
557 }
558}
559
560impl Builder<Identity, Identity> {
561 pub fn new() -> Self {
563 Self::default()
564 }
565
566 pub fn with_config(config: ServerConfig) -> Self {
568 Self { server_cfg: config, ..Default::default() }
569 }
570}
571
572impl<RpcMiddleware, HttpMiddleware> TowerServiceBuilder<RpcMiddleware, HttpMiddleware> {
573 pub fn build(
575 self,
576 methods: impl Into<Methods>,
577 stop_handle: StopHandle,
578 ) -> TowerService<RpcMiddleware, HttpMiddleware> {
579 let conn_id = self.conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
580
581 let rpc_middleware = TowerServiceNoHttp {
582 rpc_middleware: self.rpc_middleware,
583 inner: ServiceData {
584 methods: methods.into(),
585 stop_handle,
586 conn_id,
587 conn_guard: self.conn_guard,
588 server_cfg: self.server_cfg,
589 },
590 on_session_close: None,
591 };
592
593 TowerService { rpc_middleware, http_middleware: self.http_middleware }
594 }
595
596 pub fn connection_id(mut self, id: u32) -> Self {
600 self.conn_id = Arc::new(AtomicU32::new(id));
601 self
602 }
603
604 pub fn max_connections(mut self, limit: u32) -> Self {
606 self.conn_guard = ConnectionGuard::new(limit as usize);
607 self
608 }
609
610 pub fn set_rpc_middleware<T>(self, rpc_middleware: RpcServiceBuilder<T>) -> TowerServiceBuilder<T, HttpMiddleware> {
612 TowerServiceBuilder {
613 server_cfg: self.server_cfg,
614 rpc_middleware,
615 http_middleware: self.http_middleware,
616 conn_id: self.conn_id,
617 conn_guard: self.conn_guard,
618 }
619 }
620
621 pub fn set_http_middleware<T>(
623 self,
624 http_middleware: tower::ServiceBuilder<T>,
625 ) -> TowerServiceBuilder<RpcMiddleware, T> {
626 TowerServiceBuilder {
627 server_cfg: self.server_cfg,
628 rpc_middleware: self.rpc_middleware,
629 http_middleware,
630 conn_id: self.conn_id,
631 conn_guard: self.conn_guard,
632 }
633 }
634}
635
636impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
637 pub fn set_config(mut self, cfg: ServerConfig) -> Self {
639 self.server_cfg = cfg;
640 self
641 }
642
643 pub fn set_rpc_middleware<T>(self, rpc_middleware: RpcServiceBuilder<T>) -> Builder<HttpMiddleware, T> {
710 Builder { server_cfg: self.server_cfg, rpc_middleware, http_middleware: self.http_middleware }
711 }
712
713 pub fn set_http_middleware<T>(self, http_middleware: tower::ServiceBuilder<T>) -> Builder<T, RpcMiddleware> {
736 Builder { server_cfg: self.server_cfg, http_middleware, rpc_middleware: self.rpc_middleware }
737 }
738
739 pub fn to_service_builder(self) -> TowerServiceBuilder<RpcMiddleware, HttpMiddleware> {
815 let max_conns = self.server_cfg.max_connections as usize;
816
817 TowerServiceBuilder {
818 server_cfg: self.server_cfg,
819 rpc_middleware: self.rpc_middleware,
820 http_middleware: self.http_middleware,
821 conn_id: Arc::new(AtomicU32::new(0)),
822 conn_guard: ConnectionGuard::new(max_conns),
823 }
824 }
825
826 pub async fn build(self, addrs: impl ToSocketAddrs) -> std::io::Result<Server<HttpMiddleware, RpcMiddleware>> {
843 let listener = TcpListener::bind(addrs).await?;
844
845 Ok(Server {
846 listener,
847 server_cfg: self.server_cfg,
848 rpc_middleware: self.rpc_middleware,
849 http_middleware: self.http_middleware,
850 })
851 }
852
853 pub fn build_from_tcp(
877 self,
878 listener: impl Into<StdTcpListener>,
879 ) -> std::io::Result<Server<HttpMiddleware, RpcMiddleware>> {
880 let listener = TcpListener::from_std(listener.into())?;
881
882 Ok(Server {
883 listener,
884 server_cfg: self.server_cfg,
885 rpc_middleware: self.rpc_middleware,
886 http_middleware: self.http_middleware,
887 })
888 }
889}
890
891#[derive(Debug, Clone)]
893struct ServiceData {
894 methods: Methods,
896 stop_handle: StopHandle,
898 conn_id: u32,
900 conn_guard: ConnectionGuard,
902 server_cfg: ServerConfig,
904}
905
906#[derive(Debug, Clone)]
911pub struct TowerService<RpcMiddleware, HttpMiddleware> {
912 rpc_middleware: TowerServiceNoHttp<RpcMiddleware>,
913 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
914}
915
916impl<RpcMiddleware, HttpMiddleware> TowerService<RpcMiddleware, HttpMiddleware> {
917 pub fn on_session_closed(&mut self) -> SessionClosedFuture {
923 if let Some(n) = self.rpc_middleware.on_session_close.as_mut() {
924 n.closed()
926 } else {
927 let (session_close, fut) = session_close();
928 self.rpc_middleware.on_session_close = Some(session_close);
929 fut
930 }
931 }
932}
933
934impl<RequestBody, ResponseBody, RpcMiddleware, HttpMiddleware> Service<HttpRequest<RequestBody>> for TowerService<RpcMiddleware, HttpMiddleware>
935where
936 RpcMiddleware: tower::Layer<RpcService> + Clone,
937 <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT + Send + Sync + 'static,
938 HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
939 <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service:
940 Send + Service<HttpRequest<RequestBody>, Response = HttpResponse<ResponseBody>, Error = Box<(dyn StdError + Send + Sync + 'static)>>,
941 <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<HttpRequest<RequestBody>>>::Future:
942 Send + 'static,
943 RequestBody: http_body::Body<Data = Bytes> + Send + 'static,
944 RequestBody::Error: Into<BoxError>,
945{
946 type Response = HttpResponse<ResponseBody>;
947 type Error = BoxError;
948 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
949
950 fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
951 Poll::Ready(Ok(()))
952 }
953
954 fn call(&mut self, request: HttpRequest<RequestBody>) -> Self::Future {
955 Box::pin(self.http_middleware.service(self.rpc_middleware.clone()).call(request))
956 }
957}
958
959#[derive(Debug, Clone)]
964pub struct TowerServiceNoHttp<L> {
965 inner: ServiceData,
966 rpc_middleware: RpcServiceBuilder<L>,
967 on_session_close: Option<SessionClose>,
968}
969
970impl<Body, RpcMiddleware> Service<HttpRequest<Body>> for TowerServiceNoHttp<RpcMiddleware>
971where
972 RpcMiddleware: tower::Layer<RpcService>,
973 <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT<
974 MethodResponse = MethodResponse,
975 BatchResponse = MethodResponse,
976 NotificationResponse = MethodResponse,
977 > + Send
978 + Sync
979 + 'static,
980 Body: http_body::Body<Data = Bytes> + Send + 'static,
981 Body::Error: Into<BoxError>,
982{
983 type Response = HttpResponse;
984
985 type Error = BoxError;
988
989 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
990
991 fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
992 Poll::Ready(Ok(()))
993 }
994
995 fn call(&mut self, request: HttpRequest<Body>) -> Self::Future {
996 let mut request = request.map(HttpBody::new);
997
998 let conn_guard = &self.inner.conn_guard;
999 let stop_handle = self.inner.stop_handle.clone();
1000 let conn_id = self.inner.conn_id;
1001 let on_session_close = self.on_session_close.take();
1002
1003 tracing::trace!(target: LOG_TARGET, "{:?}", request);
1004
1005 let Some(conn_permit) = conn_guard.try_acquire() else {
1006 return async move { Ok(http::response::too_many_requests()) }.boxed();
1007 };
1008
1009 let conn = ConnectionState::new(stop_handle.clone(), conn_id, conn_permit);
1010
1011 let max_conns = conn_guard.max_connections();
1012 let curr_conns = max_conns - conn_guard.available_connections();
1013 tracing::debug!(target: LOG_TARGET, "Accepting new connection {}/{}", curr_conns, max_conns);
1014
1015 let req_ext = request.extensions_mut();
1016 req_ext.insert::<ConnectionGuard>(conn_guard.clone());
1017 req_ext.insert::<ConnectionId>(conn.conn_id.into());
1018
1019 let is_upgrade_request = is_upgrade_request(&request);
1020
1021 if self.inner.server_cfg.enable_ws && is_upgrade_request {
1022 let this = self.inner.clone();
1023
1024 let mut server = soketto::handshake::http::Server::new();
1025
1026 let response = match server.receive_request(&request) {
1027 Ok(response) => {
1028 let (tx, rx) = mpsc::channel(this.server_cfg.message_buffer_capacity as usize);
1029 let sink = MethodSink::new(tx);
1030
1031 let (pending_calls, pending_calls_completed) = mpsc::channel::<()>(1);
1035
1036 let cfg = RpcServiceCfg::CallsAndSubscriptions {
1037 bounded_subscriptions: BoundedSubscriptions::new(
1038 this.server_cfg.max_subscriptions_per_connection,
1039 ),
1040 id_provider: this.server_cfg.id_provider.clone(),
1041 sink: sink.clone(),
1042 _pending_calls: pending_calls,
1043 };
1044
1045 let rpc_service = RpcService::new(
1046 this.methods.clone(),
1047 this.server_cfg.max_response_body_size as usize,
1048 this.conn_id.into(),
1049 cfg,
1050 );
1051
1052 let rpc_service = self.rpc_middleware.service(rpc_service);
1053
1054 tokio::spawn(
1055 async move {
1056 let extensions = request.extensions().clone();
1057
1058 let upgraded = match hyper::upgrade::on(request).await {
1059 Ok(u) => u,
1060 Err(e) => {
1061 tracing::debug!(target: LOG_TARGET, "Could not upgrade connection: {}", e);
1062 return;
1063 }
1064 };
1065
1066 let io = hyper_util::rt::TokioIo::new(upgraded);
1067
1068 let stream = BufReader::new(BufWriter::new(io.compat()));
1069 let mut ws_builder = server.into_builder(stream);
1070 ws_builder.set_max_message_size(this.server_cfg.max_request_body_size as usize);
1071 let (sender, receiver) = ws_builder.finish();
1072
1073 let params = BackgroundTaskParams {
1074 server_cfg: this.server_cfg,
1075 conn,
1076 ws_sender: sender,
1077 ws_receiver: receiver,
1078 rpc_service,
1079 sink,
1080 rx,
1081 pending_calls_completed,
1082 on_session_close,
1083 extensions,
1084 };
1085
1086 ws::background_task(params).await;
1087 }
1088 .in_current_span(),
1089 );
1090
1091 response.map(|()| HttpBody::empty())
1092 }
1093 Err(e) => {
1094 tracing::debug!(target: LOG_TARGET, "Could not upgrade connection: {}", e);
1095 HttpResponse::new(HttpBody::from(format!("Could not upgrade connection: {e}")))
1096 }
1097 };
1098
1099 async { Ok(response) }.boxed()
1100 } else if self.inner.server_cfg.enable_http && !is_upgrade_request {
1101 let this = &self.inner;
1102 let max_response_size = this.server_cfg.max_response_body_size;
1103 let max_request_size = this.server_cfg.max_request_body_size;
1104 let methods = this.methods.clone();
1105 let batch_config = this.server_cfg.batch_requests_config;
1106
1107 let rpc_service = self.rpc_middleware.service(RpcService::new(
1108 methods,
1109 max_response_size as usize,
1110 this.conn_id.into(),
1111 RpcServiceCfg::OnlyCalls,
1112 ));
1113
1114 Box::pin(async move {
1115 let rp = http::call_with_service(request, batch_config, max_request_size, rpc_service).await;
1116 drop(conn);
1119 Ok(rp)
1120 })
1121 } else {
1122 Box::pin(async { Ok(http::response::denied()) })
1125 }
1126 }
1127}
1128
1129struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
1130 http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
1131 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
1132 conn_guard: &'a ConnectionGuard,
1133 conn_id: u32,
1134 server_cfg: ServerConfig,
1135 stop_handle: StopHandle,
1136 socket: TcpStream,
1137 drop_on_completion: mpsc::Sender<()>,
1138 remote_addr: SocketAddr,
1139 methods: Methods,
1140}
1141
1142#[instrument(name = "connection", skip_all, fields(remote_addr = %params.remote_addr, conn_id = %params.conn_id), level = "INFO")]
1143fn process_connection<'a, RpcMiddleware, HttpMiddleware, Body>(params: ProcessConnection<HttpMiddleware, RpcMiddleware>)
1144where
1145 RpcMiddleware: 'static,
1146 HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
1147 <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service:
1148 Send + 'static + Clone + Service<HttpRequest, Response = HttpResponse<Body>, Error = BoxError>,
1149 <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<HttpRequest>>::Future:
1150 Send + 'static,
1151 Body: http_body::Body<Data = Bytes> + Send + 'static,
1152 <Body as http_body::Body>::Error: Into<BoxError>,
1153 <Body as http_body::Body>::Data: Send,
1154{
1155 let ProcessConnection {
1156 http_middleware,
1157 rpc_middleware,
1158 conn_guard,
1159 conn_id,
1160 server_cfg,
1161 socket,
1162 stop_handle,
1163 drop_on_completion,
1164 methods,
1165 ..
1166 } = params;
1167
1168 if let Err(e) = socket.set_nodelay(server_cfg.tcp_no_delay) {
1169 tracing::warn!(target: LOG_TARGET, "Could not set NODELAY on socket: {:?}", e);
1170 return;
1171 }
1172
1173 let tower_service = TowerServiceNoHttp {
1174 inner: ServiceData {
1175 server_cfg,
1176 methods,
1177 stop_handle: stop_handle.clone(),
1178 conn_id,
1179 conn_guard: conn_guard.clone(),
1180 },
1181 rpc_middleware,
1182 on_session_close: None,
1183 };
1184
1185 let service = http_middleware.service(tower_service);
1186
1187 tokio::spawn(async {
1188 let service = crate::utils::TowerToHyperService::new(service);
1190 let io = TokioIo::new(socket);
1191 let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
1192
1193 let conn = builder.serve_connection_with_upgrades(io, service);
1194 let stopped = stop_handle.shutdown();
1195
1196 tokio::pin!(stopped, conn);
1197
1198 let res = match future::select(conn, stopped).await {
1199 Either::Left((conn, _)) => conn,
1200 Either::Right((_, mut conn)) => {
1201 conn.as_mut().graceful_shutdown();
1204 conn.await
1205 }
1206 };
1207
1208 if let Err(e) = res {
1209 tracing::debug!(target: LOG_TARGET, "HTTP serve connection failed {:?}", e);
1210 }
1211 drop(drop_on_completion)
1212 });
1213}
1214
1215enum AcceptConnection<S> {
1216 Shutdown,
1217 Established { socket: TcpStream, remote_addr: SocketAddr, stop: S },
1218 Err((std::io::Error, S)),
1219}
1220
1221async fn try_accept_conn<S>(listener: &TcpListener, stopped: S) -> AcceptConnection<S>
1222where
1223 S: Future + Unpin,
1224{
1225 let accept = listener.accept();
1226 tokio::pin!(accept);
1227
1228 match futures_util::future::select(accept, stopped).await {
1229 Either::Left((res, stop)) => match res {
1230 Ok((socket, remote_addr)) => AcceptConnection::Established { socket, remote_addr, stop },
1231 Err(e) => AcceptConnection::Err((e, stop)),
1232 },
1233 Either::Right(_) => AcceptConnection::Shutdown,
1234 }
1235}
1236
1237pub(crate) async fn handle_rpc_call<S>(
1238 body: &[u8],
1239 is_single: bool,
1240 batch_config: BatchRequestConfig,
1241 rpc_service: &S,
1242 extensions: Extensions,
1243) -> MethodResponse
1244where
1245 S: RpcServiceT<
1246 MethodResponse = MethodResponse,
1247 BatchResponse = MethodResponse,
1248 NotificationResponse = MethodResponse,
1249 > + Send,
1250{
1251 if is_single {
1253 if let Ok(req) = deserialize_with_ext::call::from_slice(body, &extensions) {
1254 rpc_service.call(req).await
1255 } else if let Ok(notif) = deserialize_with_ext::notif::from_slice::<Notif>(body, &extensions) {
1256 rpc_service.notification(notif).await
1257 } else {
1258 let (id, code) = prepare_error(body);
1259 MethodResponse::error(id, ErrorObject::from(code))
1260 }
1261 }
1262 else {
1264 let max_len = match batch_config {
1265 BatchRequestConfig::Disabled => {
1266 let rp = MethodResponse::error(
1267 Id::Null,
1268 ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG, None),
1269 );
1270 return rp;
1271 }
1272 BatchRequestConfig::Limit(limit) => limit as usize,
1273 BatchRequestConfig::Unlimited => usize::MAX,
1274 };
1275
1276 if let Ok(unchecked_batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(body) {
1277 if unchecked_batch.len() > max_len {
1278 return MethodResponse::error(Id::Null, reject_too_big_batch_request(max_len));
1279 }
1280
1281 let mut batch = Vec::with_capacity(unchecked_batch.len());
1282
1283 for call in unchecked_batch {
1284 if let Ok(req) = deserialize_with_ext::call::from_str(call.get(), &extensions) {
1285 batch.push(Ok(BatchEntry::Call(req)));
1286 } else if let Ok(notif) = deserialize_with_ext::notif::from_str::<Notif>(call.get(), &extensions) {
1287 batch.push(Ok(BatchEntry::Notification(notif)));
1288 } else {
1289 let id = match serde_json::from_str::<jsonrpsee_types::InvalidRequest>(call.get()) {
1290 Ok(err) => err.id,
1291 Err(_) => Id::Null,
1292 };
1293
1294 batch.push(Err(BatchEntryErr::new(id, ErrorCode::InvalidRequest.into())));
1295 }
1296 }
1297
1298 rpc_service.batch(Batch::from(batch)).await
1299 } else {
1300 MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::ParseError))
1301 }
1302 }
1303}