1#![warn(missing_docs)]
22
23pub mod middleware;
24pub mod utils;
25
26use std::{error::Error as StdError, net::SocketAddr, time::Duration};
27
28use jsonrpsee::{
29 core::BoxError,
30 server::{
31 serve_with_graceful_shutdown, stop_channel, ws, PingConfig, ServerHandle, StopHandle,
32 },
33 Methods, RpcModule,
34};
35use tower::Service;
36use utils::{
37 build_rpc_api, deny_unsafe, format_listen_addrs, get_proxy_ip, ListenAddrError, RpcSettings,
38};
39
40pub use ip_network::IpNetwork;
41pub use jsonrpsee::{
42 core::id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
43 server::{middleware::rpc::RpcServiceBuilder, BatchRequestConfig},
44};
45pub use middleware::{Metrics, MiddlewareLayer, NodeHealthProxyLayer, RpcMetrics};
46pub use utils::{RpcEndpoint, RpcMethods};
47
48const MEGABYTE: u32 = 1024 * 1024;
49
50pub struct Server {
52 handle: ServerHandle,
54 listen_addrs: Vec<SocketAddr>,
56}
57
58impl Server {
59 pub fn new(handle: ServerHandle, listen_addrs: Vec<SocketAddr>) -> Server {
61 Server { handle, listen_addrs }
62 }
63
64 pub fn handle(&self) -> &ServerHandle {
67 &self.handle
68 }
69
70 pub fn listen_addrs(&self) -> &[SocketAddr] {
72 &self.listen_addrs
73 }
74}
75
76impl Drop for Server {
77 fn drop(&mut self) {
78 let _ = self.handle.stop();
80 }
81}
82
83pub trait SubscriptionIdProvider:
85 jsonrpsee::core::traits::IdProvider + dyn_clone::DynClone
86{
87}
88
89dyn_clone::clone_trait_object!(SubscriptionIdProvider);
90
91#[derive(Debug)]
93pub struct Config<M: Send + Sync + 'static> {
94 pub endpoints: Vec<RpcEndpoint>,
96 pub metrics: Option<RpcMetrics>,
98 pub rpc_api: RpcModule<M>,
100 pub id_provider: Option<Box<dyn SubscriptionIdProvider>>,
102 pub tokio_handle: tokio::runtime::Handle,
104 pub request_logger_limit: u32,
106}
107
108#[derive(Debug, Clone)]
109struct PerConnection {
110 methods: Methods,
111 stop_handle: StopHandle,
112 metrics: Option<RpcMetrics>,
113 tokio_handle: tokio::runtime::Handle,
114}
115
116pub async fn start_server<M>(config: Config<M>) -> Result<Server, Box<dyn StdError + Send + Sync>>
118where
119 M: Send + Sync,
120{
121 let Config { endpoints, metrics, tokio_handle, rpc_api, id_provider, request_logger_limit } =
122 config;
123
124 let (stop_handle, server_handle) = stop_channel();
125 let cfg = PerConnection {
126 methods: build_rpc_api(rpc_api).into(),
127 metrics,
128 tokio_handle: tokio_handle.clone(),
129 stop_handle,
130 };
131
132 let mut local_addrs = Vec::new();
133
134 for endpoint in endpoints {
135 let allowed_to_fail = endpoint.is_optional;
136 let local_addr = endpoint.listen_addr;
137
138 let mut listener = match endpoint.bind().await {
139 Ok(l) => l,
140 Err(e) if allowed_to_fail => {
141 log::debug!(target: "rpc", "JSON-RPC server failed to bind optional address: {:?}, error: {:?}", local_addr, e);
142 continue;
143 },
144 Err(e) => return Err(e),
145 };
146 let local_addr = listener.local_addr();
147 local_addrs.push(local_addr);
148 let cfg = cfg.clone();
149
150 let RpcSettings {
151 batch_config,
152 max_connections,
153 max_payload_in_mb,
154 max_payload_out_mb,
155 max_buffer_capacity_per_connection,
156 max_subscriptions_per_connection,
157 rpc_methods,
158 rate_limit_trust_proxy_headers,
159 rate_limit_whitelisted_ips,
160 host_filter,
161 cors,
162 rate_limit,
163 } = listener.rpc_settings();
164
165 let http_middleware = tower::ServiceBuilder::new()
166 .option_layer(host_filter)
167 .layer(NodeHealthProxyLayer::default())
170 .layer(cors);
171
172 let mut builder = jsonrpsee::server::Server::builder()
173 .max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
174 .max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
175 .max_connections(max_connections)
176 .max_subscriptions_per_connection(max_subscriptions_per_connection)
177 .enable_ws_ping(
178 PingConfig::new()
179 .ping_interval(Duration::from_secs(30))
180 .inactive_limit(Duration::from_secs(60))
181 .max_failures(3),
182 )
183 .set_http_middleware(http_middleware)
184 .set_message_buffer_capacity(max_buffer_capacity_per_connection)
185 .set_batch_request_config(batch_config)
186 .custom_tokio_runtime(cfg.tokio_handle.clone());
187
188 if let Some(provider) = id_provider.clone() {
189 builder = builder.set_id_provider(provider);
190 } else {
191 builder = builder.set_id_provider(RandomStringIdProvider::new(16));
192 };
193
194 let service_builder = builder.to_service_builder();
195 let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);
196
197 tokio_handle.spawn(async move {
198 loop {
199 let (sock, remote_addr) = tokio::select! {
200 res = listener.accept() => {
201 match res {
202 Ok(s) => s,
203 Err(e) => {
204 log::debug!(target: "rpc", "Failed to accept connection: {:?}", e);
205 continue;
206 }
207 }
208 }
209 _ = cfg.stop_handle.clone().shutdown() => break,
210 };
211
212 let ip = remote_addr.ip();
213 let cfg2 = cfg.clone();
214 let service_builder2 = service_builder.clone();
215 let rate_limit_whitelisted_ips2 = rate_limit_whitelisted_ips.clone();
216
217 let svc =
218 tower::service_fn(move |mut req: http::Request<hyper::body::Incoming>| {
219 req.extensions_mut().insert(deny_unsafe);
220
221 let PerConnection { methods, metrics, tokio_handle, stop_handle } =
222 cfg2.clone();
223 let service_builder = service_builder2.clone();
224
225 let proxy_ip =
226 if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };
227
228 let rate_limit_cfg = if rate_limit_whitelisted_ips2
229 .iter()
230 .any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
231 {
232 log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
233 None
234 } else {
235 if !rate_limit_whitelisted_ips2.is_empty() {
236 log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
237 }
238 rate_limit
239 };
240
241 let is_websocket = ws::is_upgrade_request(&req);
242 let transport_label = if is_websocket { "ws" } else { "http" };
243
244 let middleware_layer = match (metrics, rate_limit_cfg) {
245 (None, None) => None,
246 (Some(metrics), None) => Some(
247 MiddlewareLayer::new()
248 .with_metrics(Metrics::new(metrics, transport_label)),
249 ),
250 (None, Some(rate_limit)) =>
251 Some(MiddlewareLayer::new().with_rate_limit_per_minute(rate_limit)),
252 (Some(metrics), Some(rate_limit)) => Some(
253 MiddlewareLayer::new()
254 .with_metrics(Metrics::new(metrics, transport_label))
255 .with_rate_limit_per_minute(rate_limit),
256 ),
257 };
258
259 let rpc_middleware = RpcServiceBuilder::new()
260 .rpc_logger(request_logger_limit)
261 .option_layer(middleware_layer.clone());
262 let mut svc = service_builder
263 .set_rpc_middleware(rpc_middleware)
264 .build(methods, stop_handle);
265
266 async move {
267 if is_websocket {
268 let on_disconnect = svc.on_session_closed();
269
270 tokio_handle.spawn(async move {
272 let now = std::time::Instant::now();
273 middleware_layer.as_ref().map(|m| m.ws_connect());
274 on_disconnect.await;
275 middleware_layer.as_ref().map(|m| m.ws_disconnect(now));
276 });
277 }
278
279 svc.call(req).await.map_err(|e| BoxError::from(e))
283 }
284 });
285
286 cfg.tokio_handle.spawn(serve_with_graceful_shutdown(
287 sock,
288 svc,
289 cfg.stop_handle.clone().shutdown(),
290 ));
291 }
292 });
293 }
294
295 if local_addrs.is_empty() {
296 return Err(Box::new(ListenAddrError));
297 }
298
299 log::info!("Running JSON-RPC server: addr={}", format_listen_addrs(&local_addrs));
308
309 Ok(Server::new(server_handle, local_addrs))
310}