1#![warn(missing_docs)]
10
11pub mod middleware;
12pub mod utils;
13
14use std::{error::Error as StdError, net::SocketAddr, time::Duration};
15
16use jsonrpsee::{
17 core::BoxError,
18 server::{
19 serve_with_graceful_shutdown, stop_channel, ws, PingConfig, ServerHandle, StopHandle,
20 },
21 Methods, RpcModule,
22};
23use tower::Service;
24use utils::{
25 build_rpc_api, deny_unsafe, format_listen_addrs, get_proxy_ip, ListenAddrError, RpcSettings,
26};
27
28pub use ip_network::IpNetwork;
29pub use jsonrpsee::{
30 core::id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
31 server::{middleware::rpc::RpcServiceBuilder, BatchRequestConfig},
32};
33pub use middleware::{Metrics, MiddlewareLayer, NodeHealthProxyLayer, RpcMetrics};
34pub use utils::{RpcEndpoint, RpcMethods};
35
/// Number of bytes in one megabyte (2^20); multiplies the payload limits that are configured
/// in megabytes.
const MEGABYTE: u32 = 1 << 20;
37
/// A started JSON-RPC server: the handle controlling it plus the addresses it listens on.
///
/// Dropping the value calls `stop()` on the handle (see the `Drop` impl).
pub struct Server {
    /// Handle used to signal the server to stop.
    handle: ServerHandle,
    /// Socket addresses the server was bound to.
    listen_addrs: Vec<SocketAddr>,
}
45
46impl Server {
47 pub fn new(handle: ServerHandle, listen_addrs: Vec<SocketAddr>) -> Server {
49 Server { handle, listen_addrs }
50 }
51
52 pub fn handle(&self) -> &ServerHandle {
55 &self.handle
56 }
57
58 pub fn listen_addrs(&self) -> &[SocketAddr] {
60 &self.listen_addrs
61 }
62}
63
impl Drop for Server {
    /// Signals the server to stop when the `Server` value goes out of scope.
    fn drop(&mut self) {
        // Best effort: the result of `stop()` is deliberately discarded because `drop` cannot
        // report errors. NOTE(review): presumably the only failure is "already stopped" —
        // confirm against the jsonrpsee `ServerHandle::stop` docs.
        let _ = self.handle.stop();
    }
}
70
/// A subscription ID provider that can also be cloned as a trait object.
///
/// Combines jsonrpsee's `IdProvider` with `dyn_clone::DynClone` so that a
/// `Box<dyn SubscriptionIdProvider>` can be duplicated; `start_server` clones the configured
/// provider once per endpoint.
pub trait SubscriptionIdProvider:
    jsonrpsee::core::traits::IdProvider + dyn_clone::DynClone
{
}

// Generates the `Clone` impl for `Box<dyn SubscriptionIdProvider>`.
dyn_clone::clone_trait_object!(SubscriptionIdProvider);
78
/// Everything `start_server` needs to bring up the JSON-RPC server.
#[derive(Debug)]
pub struct Config<M: Send + Sync + 'static> {
    /// Endpoints to bind; one listener (and accept loop) is started per endpoint.
    pub endpoints: Vec<RpcEndpoint>,
    /// Optional metrics; when present, a metrics middleware is attached to every connection.
    pub metrics: Option<RpcMetrics>,
    /// The RPC module exposing the API methods served on every endpoint.
    pub rpc_api: RpcModule<M>,
    /// Custom subscription ID provider. When `None`, a `RandomStringIdProvider` of length 16
    /// is used.
    pub id_provider: Option<Box<dyn SubscriptionIdProvider>>,
    /// Tokio runtime handle on which the accept loops and connection tasks are spawned.
    pub tokio_handle: tokio::runtime::Handle,
    /// Value forwarded to the RPC logger middleware (`rpc_logger`).
    /// NOTE(review): presumably the maximum logged message length — confirm against jsonrpsee.
    pub request_logger_limit: u32,
}
95
/// State shared by all connections; cloned for each endpoint, each accepted connection and
/// each incoming request.
#[derive(Debug, Clone)]
struct PerConnection {
    /// The RPC methods served to the connection.
    methods: Methods,
    /// Stop handle whose `shutdown()` future ends the accept loop and the served connections.
    stop_handle: StopHandle,
    /// Optional metrics used to build the per-request metrics middleware.
    metrics: Option<RpcMetrics>,
    /// Runtime handle used to spawn per-connection tasks.
    tokio_handle: tokio::runtime::Handle,
}
103
104pub async fn start_server<M>(config: Config<M>) -> Result<Server, Box<dyn StdError + Send + Sync>>
106where
107 M: Send + Sync,
108{
109 let Config { endpoints, metrics, tokio_handle, rpc_api, id_provider, request_logger_limit } =
110 config;
111
112 let (stop_handle, server_handle) = stop_channel();
113 let cfg = PerConnection {
114 methods: build_rpc_api(rpc_api).into(),
115 metrics,
116 tokio_handle: tokio_handle.clone(),
117 stop_handle,
118 };
119
120 let mut local_addrs = Vec::new();
121
122 for endpoint in endpoints {
123 let allowed_to_fail = endpoint.is_optional;
124 let local_addr = endpoint.listen_addr;
125
126 let mut listener = match endpoint.bind().await {
127 Ok(l) => l,
128 Err(e) if allowed_to_fail => {
129 log::debug!(target: "rpc", "JSON-RPC server failed to bind optional address: {:?}, error: {:?}", local_addr, e);
130 continue;
131 },
132 Err(e) => return Err(e),
133 };
134 let local_addr = listener.local_addr();
135 local_addrs.push(local_addr);
136 let cfg = cfg.clone();
137
138 let RpcSettings {
139 batch_config,
140 max_connections,
141 max_payload_in_mb,
142 max_payload_out_mb,
143 max_buffer_capacity_per_connection,
144 max_subscriptions_per_connection,
145 rpc_methods,
146 rate_limit_trust_proxy_headers,
147 rate_limit_whitelisted_ips,
148 host_filter,
149 cors,
150 rate_limit,
151 } = listener.rpc_settings();
152
153 let http_middleware = tower::ServiceBuilder::new()
154 .option_layer(host_filter)
155 .layer(NodeHealthProxyLayer::default())
158 .layer(cors);
159
160 let mut builder = jsonrpsee::server::Server::builder()
161 .max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
162 .max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
163 .max_connections(max_connections)
164 .max_subscriptions_per_connection(max_subscriptions_per_connection)
165 .enable_ws_ping(
166 PingConfig::new()
167 .ping_interval(Duration::from_secs(30))
168 .inactive_limit(Duration::from_secs(60))
169 .max_failures(3),
170 )
171 .set_http_middleware(http_middleware)
172 .set_message_buffer_capacity(max_buffer_capacity_per_connection)
173 .set_batch_request_config(batch_config)
174 .custom_tokio_runtime(cfg.tokio_handle.clone());
175
176 if let Some(provider) = id_provider.clone() {
177 builder = builder.set_id_provider(provider);
178 } else {
179 builder = builder.set_id_provider(RandomStringIdProvider::new(16));
180 };
181
182 let service_builder = builder.to_service_builder();
183 let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);
184
185 tokio_handle.spawn(async move {
186 loop {
187 let (sock, remote_addr) = tokio::select! {
188 res = listener.accept() => {
189 match res {
190 Ok(s) => s,
191 Err(e) => {
192 log::debug!(target: "rpc", "Failed to accept connection: {:?}", e);
193 continue;
194 }
195 }
196 }
197 _ = cfg.stop_handle.clone().shutdown() => break,
198 };
199
200 let ip = remote_addr.ip();
201 let cfg2 = cfg.clone();
202 let service_builder2 = service_builder.clone();
203 let rate_limit_whitelisted_ips2 = rate_limit_whitelisted_ips.clone();
204
205 let svc =
206 tower::service_fn(move |mut req: http::Request<hyper::body::Incoming>| {
207 req.extensions_mut().insert(deny_unsafe);
208
209 let PerConnection { methods, metrics, tokio_handle, stop_handle } =
210 cfg2.clone();
211 let service_builder = service_builder2.clone();
212
213 let proxy_ip =
214 if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };
215
216 let rate_limit_cfg = if rate_limit_whitelisted_ips2
217 .iter()
218 .any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
219 {
220 log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
221 None
222 } else {
223 if !rate_limit_whitelisted_ips2.is_empty() {
224 log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
225 }
226 rate_limit
227 };
228
229 let is_websocket = ws::is_upgrade_request(&req);
230 let transport_label = if is_websocket { "ws" } else { "http" };
231
232 let middleware_layer = match (metrics, rate_limit_cfg) {
233 (None, None) => None,
234 (Some(metrics), None) => Some(
235 MiddlewareLayer::new()
236 .with_metrics(Metrics::new(metrics, transport_label)),
237 ),
238 (None, Some(rate_limit)) =>
239 Some(MiddlewareLayer::new().with_rate_limit_per_minute(rate_limit)),
240 (Some(metrics), Some(rate_limit)) => Some(
241 MiddlewareLayer::new()
242 .with_metrics(Metrics::new(metrics, transport_label))
243 .with_rate_limit_per_minute(rate_limit),
244 ),
245 };
246
247 let rpc_middleware = RpcServiceBuilder::new()
248 .rpc_logger(request_logger_limit)
249 .option_layer(middleware_layer.clone());
250 let mut svc = service_builder
251 .set_rpc_middleware(rpc_middleware)
252 .build(methods, stop_handle);
253
254 async move {
255 if is_websocket {
256 let on_disconnect = svc.on_session_closed();
257
258 tokio_handle.spawn(async move {
260 let now = std::time::Instant::now();
261 middleware_layer.as_ref().map(|m| m.ws_connect());
262 on_disconnect.await;
263 middleware_layer.as_ref().map(|m| m.ws_disconnect(now));
264 });
265 }
266
267 svc.call(req).await.map_err(|e| BoxError::from(e))
271 }
272 });
273
274 cfg.tokio_handle.spawn(serve_with_graceful_shutdown(
275 sock,
276 svc,
277 cfg.stop_handle.clone().shutdown(),
278 ));
279 }
280 });
281 }
282
283 if local_addrs.is_empty() {
284 return Err(Box::new(ListenAddrError));
285 }
286
287 log::info!("Running JSON-RPC server: addr={}", format_listen_addrs(&local_addrs));
296
297 Ok(Server::new(server_handle, local_addrs))
298}