1use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer;
5mod auth_layer;
6mod channel;
7mod client;
8mod filter_layer;
9mod filter_list;
10mod log_layer;
11mod metrics_layer;
12mod request;
13mod segregation_layer;
14mod set_extension_layer;
15
16use crate::rpc::eth::types::RandomHexStringIdProvider;
17use crate::shim::clock::ChainEpoch;
18use clap::ValueEnum as _;
19pub use client::Client;
20pub use error::ServerError;
21use eth::filter::EthEventHandler;
22use filter_layer::FilterLayer;
23pub use filter_list::FilterList;
24use futures::FutureExt as _;
25use jsonrpsee::server::ServerConfig;
26use log_layer::LogLayer;
27use reflect::Ctx;
28pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt};
29pub use request::Request;
30use schemars::Schema;
31use segregation_layer::SegregationLayer;
32use set_extension_layer::SetExtensionLayer;
33mod error;
34mod reflect;
35use ahash::HashMap;
36mod registry;
37pub mod types;
38
39pub use methods::*;
40
41pub use jsonrpsee::core::ClientError;
43
/// Sentinel epoch value `-1`.
///
/// NOTE(review): judging by the name this means "no lookback limit"; the call sites are
/// not visible from this part of the file, so confirm before relying on that reading.
const LOOKBACK_NO_LIMIT: ChainEpoch = -1;
45
/// Invokes the macro named by `$visit` once per RPC method type listed below.
///
/// Each invocation receives a single argument: the `$crate`-qualified path of one
/// method type. Invocations are emitted in exactly the order written here, grouped by
/// the `rpc` submodule the type lives in.
#[macro_export]
macro_rules! for_each_rpc_method {
    ($visit:path) => {
        // auth
        $visit!($crate::rpc::auth::AuthNew);
        $visit!($crate::rpc::auth::AuthVerify);

        // beacon
        $visit!($crate::rpc::beacon::BeaconGetEntry);

        // chain
        $visit!($crate::rpc::chain::ChainPruneSnapshot);
        $visit!($crate::rpc::chain::ChainExport);
        $visit!($crate::rpc::chain::ChainGetBlock);
        $visit!($crate::rpc::chain::ChainGetBlockMessages);
        $visit!($crate::rpc::chain::ChainGetEvents);
        $visit!($crate::rpc::chain::ChainGetGenesis);
        $visit!($crate::rpc::chain::ChainGetFinalizedTipset);
        $visit!($crate::rpc::chain::ChainGetMessage);
        $visit!($crate::rpc::chain::ChainGetMessagesInTipset);
        $visit!($crate::rpc::chain::ChainGetMinBaseFee);
        $visit!($crate::rpc::chain::ChainGetParentMessages);
        $visit!($crate::rpc::chain::ChainGetParentReceipts);
        $visit!($crate::rpc::chain::ChainGetPath);
        $visit!($crate::rpc::chain::ChainGetTipSet);
        $visit!($crate::rpc::chain::ChainGetTipSetV2);
        $visit!($crate::rpc::chain::ChainGetTipSetAfterHeight);
        $visit!($crate::rpc::chain::ChainGetTipSetByHeight);
        $visit!($crate::rpc::chain::ChainHasObj);
        $visit!($crate::rpc::chain::ChainHead);
        $visit!($crate::rpc::chain::ChainReadObj);
        $visit!($crate::rpc::chain::ChainSetHead);
        $visit!($crate::rpc::chain::ChainStatObj);
        $visit!($crate::rpc::chain::ChainTipSetWeight);
        $visit!($crate::rpc::chain::ForestChainExport);
        $visit!($crate::rpc::chain::ForestChainExportDiff);
        $visit!($crate::rpc::chain::ForestChainExportStatus);
        $visit!($crate::rpc::chain::ForestChainExportCancel);

        // common
        $visit!($crate::rpc::common::Session);
        $visit!($crate::rpc::common::Shutdown);
        $visit!($crate::rpc::common::StartTime);
        $visit!($crate::rpc::common::Version);

        // eth
        $visit!($crate::rpc::eth::EthAccounts);
        $visit!($crate::rpc::eth::EthAddressToFilecoinAddress);
        $visit!($crate::rpc::eth::EthBlockNumber);
        $visit!($crate::rpc::eth::EthCall);
        $visit!($crate::rpc::eth::EthChainId);
        $visit!($crate::rpc::eth::EthEstimateGas);
        $visit!($crate::rpc::eth::EthFeeHistory);
        $visit!($crate::rpc::eth::EthGasPrice);
        $visit!($crate::rpc::eth::EthGetBalance);
        $visit!($crate::rpc::eth::EthGetBlockByHash);
        $visit!($crate::rpc::eth::EthGetBlockByNumber);
        $visit!($crate::rpc::eth::EthGetBlockReceipts);
        $visit!($crate::rpc::eth::EthGetBlockReceiptsLimited);
        $visit!($crate::rpc::eth::EthGetBlockTransactionCountByHash);
        $visit!($crate::rpc::eth::EthGetBlockTransactionCountByNumber);
        $visit!($crate::rpc::eth::EthGetCode);
        $visit!($crate::rpc::eth::EthGetLogs);
        $visit!($crate::rpc::eth::EthGetFilterLogs);
        $visit!($crate::rpc::eth::EthGetFilterChanges);
        $visit!($crate::rpc::eth::EthGetMessageCidByTransactionHash);
        $visit!($crate::rpc::eth::EthGetStorageAt);
        $visit!($crate::rpc::eth::EthGetTransactionByHash);
        $visit!($crate::rpc::eth::EthGetTransactionByHashLimited);
        $visit!($crate::rpc::eth::EthGetTransactionCount);
        $visit!($crate::rpc::eth::EthGetTransactionHashByCid);
        $visit!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndex);
        $visit!($crate::rpc::eth::EthGetTransactionByBlockHashAndIndex);
        $visit!($crate::rpc::eth::EthMaxPriorityFeePerGas);
        $visit!($crate::rpc::eth::EthProtocolVersion);
        $visit!($crate::rpc::eth::EthGetTransactionReceipt);
        $visit!($crate::rpc::eth::EthGetTransactionReceiptLimited);
        $visit!($crate::rpc::eth::EthNewFilter);
        $visit!($crate::rpc::eth::EthNewPendingTransactionFilter);
        $visit!($crate::rpc::eth::EthNewBlockFilter);
        $visit!($crate::rpc::eth::EthUninstallFilter);
        $visit!($crate::rpc::eth::EthUnsubscribe);
        $visit!($crate::rpc::eth::EthSubscribe);
        $visit!($crate::rpc::eth::EthSyncing);
        $visit!($crate::rpc::eth::EthTraceBlock);
        $visit!($crate::rpc::eth::EthTraceFilter);
        $visit!($crate::rpc::eth::EthTraceTransaction);
        $visit!($crate::rpc::eth::EthTraceReplayBlockTransactions);
        $visit!($crate::rpc::eth::Web3ClientVersion);
        $visit!($crate::rpc::eth::EthSendRawTransaction);

        // gas
        $visit!($crate::rpc::gas::GasEstimateFeeCap);
        $visit!($crate::rpc::gas::GasEstimateGasLimit);
        $visit!($crate::rpc::gas::GasEstimateGasPremium);
        $visit!($crate::rpc::gas::GasEstimateMessageGas);

        // market
        $visit!($crate::rpc::market::MarketAddBalance);

        // miner
        $visit!($crate::rpc::miner::MinerCreateBlock);
        $visit!($crate::rpc::miner::MinerGetBaseInfo);

        // mpool
        $visit!($crate::rpc::mpool::MpoolBatchPush);
        $visit!($crate::rpc::mpool::MpoolBatchPushUntrusted);
        $visit!($crate::rpc::mpool::MpoolGetNonce);
        $visit!($crate::rpc::mpool::MpoolPending);
        $visit!($crate::rpc::mpool::MpoolPush);
        $visit!($crate::rpc::mpool::MpoolPushMessage);
        $visit!($crate::rpc::mpool::MpoolPushUntrusted);
        $visit!($crate::rpc::mpool::MpoolSelect);

        // msig
        $visit!($crate::rpc::msig::MsigGetAvailableBalance);
        $visit!($crate::rpc::msig::MsigGetPending);
        $visit!($crate::rpc::msig::MsigGetVested);
        $visit!($crate::rpc::msig::MsigGetVestingSchedule);

        // net
        $visit!($crate::rpc::net::NetAddrsListen);
        $visit!($crate::rpc::net::NetAgentVersion);
        $visit!($crate::rpc::net::NetAutoNatStatus);
        $visit!($crate::rpc::net::NetConnect);
        $visit!($crate::rpc::net::NetDisconnect);
        $visit!($crate::rpc::net::NetFindPeer);
        $visit!($crate::rpc::net::NetInfo);
        $visit!($crate::rpc::net::NetListening);
        $visit!($crate::rpc::net::NetPeers);
        $visit!($crate::rpc::net::NetProtectAdd);
        $visit!($crate::rpc::net::NetProtectList);
        $visit!($crate::rpc::net::NetProtectRemove);
        $visit!($crate::rpc::net::NetVersion);

        // node
        $visit!($crate::rpc::node::NodeStatus);

        // state
        $visit!($crate::rpc::state::StateAccountKey);
        $visit!($crate::rpc::state::StateCall);
        $visit!($crate::rpc::state::StateCirculatingSupply);
        $visit!($crate::rpc::state::ForestStateCompute);
        $visit!($crate::rpc::state::StateCompute);
        $visit!($crate::rpc::state::StateDealProviderCollateralBounds);
        $visit!($crate::rpc::state::StateFetchRoot);
        $visit!($crate::rpc::state::StateGetActor);
        $visit!($crate::rpc::state::StateGetAllAllocations);
        $visit!($crate::rpc::state::StateGetAllClaims);
        $visit!($crate::rpc::state::StateGetAllocation);
        $visit!($crate::rpc::state::StateGetAllocationForPendingDeal);
        $visit!($crate::rpc::state::StateGetAllocationIdForPendingDeal);
        $visit!($crate::rpc::state::StateGetAllocations);
        $visit!($crate::rpc::state::StateGetBeaconEntry);
        $visit!($crate::rpc::state::StateGetClaim);
        $visit!($crate::rpc::state::StateGetClaims);
        $visit!($crate::rpc::state::StateGetNetworkParams);
        $visit!($crate::rpc::state::StateGetRandomnessDigestFromBeacon);
        $visit!($crate::rpc::state::StateGetRandomnessDigestFromTickets);
        $visit!($crate::rpc::state::StateGetRandomnessFromBeacon);
        $visit!($crate::rpc::state::StateGetRandomnessFromTickets);
        $visit!($crate::rpc::state::StateGetReceipt);
        $visit!($crate::rpc::state::StateListActors);
        $visit!($crate::rpc::state::StateListMessages);
        $visit!($crate::rpc::state::StateListMiners);
        $visit!($crate::rpc::state::StateLookupID);
        $visit!($crate::rpc::state::StateLookupRobustAddress);
        $visit!($crate::rpc::state::StateMarketBalance);
        $visit!($crate::rpc::state::StateMarketDeals);
        $visit!($crate::rpc::state::StateMarketParticipants);
        $visit!($crate::rpc::state::StateMarketStorageDeal);
        $visit!($crate::rpc::state::StateMinerActiveSectors);
        $visit!($crate::rpc::state::StateMinerAllocated);
        $visit!($crate::rpc::state::StateMinerAvailableBalance);
        $visit!($crate::rpc::state::StateMinerDeadlines);
        $visit!($crate::rpc::state::StateMinerFaults);
        $visit!($crate::rpc::state::StateMinerInfo);
        $visit!($crate::rpc::state::StateMinerInitialPledgeCollateral);
        $visit!($crate::rpc::state::StateMinerPartitions);
        $visit!($crate::rpc::state::StateMinerPower);
        $visit!($crate::rpc::state::StateMinerPreCommitDepositForPower);
        $visit!($crate::rpc::state::StateMinerProvingDeadline);
        $visit!($crate::rpc::state::StateMinerRecoveries);
        $visit!($crate::rpc::state::StateMinerSectorAllocated);
        $visit!($crate::rpc::state::StateMinerSectorCount);
        $visit!($crate::rpc::state::StateMinerSectors);
        $visit!($crate::rpc::state::StateNetworkName);
        $visit!($crate::rpc::state::StateNetworkVersion);
        $visit!($crate::rpc::state::StateActorInfo);
        $visit!($crate::rpc::state::StateReadState);
        $visit!($crate::rpc::state::StateDecodeParams);
        $visit!($crate::rpc::state::StateReplay);
        $visit!($crate::rpc::state::StateSearchMsg);
        $visit!($crate::rpc::state::StateSearchMsgLimited);
        $visit!($crate::rpc::state::StateSectorExpiration);
        $visit!($crate::rpc::state::StateSectorGetInfo);
        $visit!($crate::rpc::state::StateSectorPartition);
        $visit!($crate::rpc::state::StateSectorPreCommitInfo);
        $visit!($crate::rpc::state::StateSectorPreCommitInfoV0);
        $visit!($crate::rpc::state::StateVerifiedClientStatus);
        $visit!($crate::rpc::state::StateVerifiedRegistryRootKey);
        $visit!($crate::rpc::state::StateVerifierStatus);
        $visit!($crate::rpc::state::StateVMCirculatingSupplyInternal);
        $visit!($crate::rpc::state::StateWaitMsg);
        $visit!($crate::rpc::state::StateWaitMsgV0);
        $visit!($crate::rpc::state::StateMinerInitialPledgeForSector);

        // sync
        $visit!($crate::rpc::sync::SyncCheckBad);
        $visit!($crate::rpc::sync::SyncMarkBad);
        $visit!($crate::rpc::sync::SyncSnapshotProgress);
        $visit!($crate::rpc::sync::SyncStatus);
        $visit!($crate::rpc::sync::SyncSubmitBlock);

        // wallet
        $visit!($crate::rpc::wallet::WalletBalance);
        $visit!($crate::rpc::wallet::WalletDefaultAddress);
        $visit!($crate::rpc::wallet::WalletDelete);
        $visit!($crate::rpc::wallet::WalletExport);
        $visit!($crate::rpc::wallet::WalletHas);
        $visit!($crate::rpc::wallet::WalletImport);
        $visit!($crate::rpc::wallet::WalletList);
        $visit!($crate::rpc::wallet::WalletNew);
        $visit!($crate::rpc::wallet::WalletSetDefault);
        $visit!($crate::rpc::wallet::WalletSign);
        $visit!($crate::rpc::wallet::WalletSignMessage);
        $visit!($crate::rpc::wallet::WalletValidateAddress);
        $visit!($crate::rpc::wallet::WalletVerify);

        // f3
        $visit!($crate::rpc::f3::GetRawNetworkName);
        $visit!($crate::rpc::f3::F3GetCertificate);
        $visit!($crate::rpc::f3::F3GetECPowerTable);
        $visit!($crate::rpc::f3::F3GetF3PowerTable);
        $visit!($crate::rpc::f3::F3GetF3PowerTableByInstance);
        $visit!($crate::rpc::f3::F3IsRunning);
        $visit!($crate::rpc::f3::F3GetProgress);
        $visit!($crate::rpc::f3::F3GetManifest);
        $visit!($crate::rpc::f3::F3ListParticipants);
        $visit!($crate::rpc::f3::F3GetLatestCertificate);
        $visit!($crate::rpc::f3::F3GetOrRenewParticipationTicket);
        $visit!($crate::rpc::f3::F3Participate);
        $visit!($crate::rpc::f3::F3ExportLatestSnapshot);
        $visit!($crate::rpc::f3::GetHead);
        $visit!($crate::rpc::f3::GetParent);
        $visit!($crate::rpc::f3::GetParticipatingMinerIDs);
        $visit!($crate::rpc::f3::GetPowerTable);
        $visit!($crate::rpc::f3::GetTipset);
        $visit!($crate::rpc::f3::GetTipsetByEpoch);
        $visit!($crate::rpc::f3::Finalize);
        $visit!($crate::rpc::f3::ProtectPeer);
        $visit!($crate::rpc::f3::SignMessage);

        // misc
        $visit!($crate::rpc::misc::GetActorEventsRaw);
    };
}
310pub(crate) use for_each_rpc_method;
311use sync::SnapshotProgressTracker;
312use tower_http::compression::CompressionLayer;
313use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
314
315#[allow(unused)]
316pub mod prelude {
328 use super::*;
329
330 pub use reflect::RpcMethodExt as _;
331
332 macro_rules! export {
333 ($ty:ty) => {
334 pub use $ty;
335 };
336 }
337
338 for_each_rpc_method!(export);
339}
340
341pub fn collect_rpc_method_info() -> Vec<(&'static str, Permission)> {
343 use crate::rpc::RpcMethod;
344
345 let mut methods = Vec::new();
346
347 macro_rules! add_method {
348 ($ty:ty) => {
349 methods.push((<$ty>::NAME, <$ty>::PERMISSION));
350 };
351 }
352
353 for_each_rpc_method!(add_method);
354
355 methods
356}
357
/// Private container for the per-namespace RPC method modules.
///
/// The module itself is not public; its submodules become reachable as `rpc::auth`,
/// `rpc::chain`, ... through the `pub use methods::*;` re-export earlier in this file.
mod methods {
    pub mod auth;
    pub mod beacon;
    pub mod chain;
    pub mod common;
    pub mod eth;
    pub mod f3;
    pub mod gas;
    pub mod market;
    pub mod miner;
    pub mod misc;
    pub mod mpool;
    pub mod msig;
    pub mod net;
    pub mod node;
    pub mod state;
    pub mod sync;
    pub mod wallet;
}
413
414use crate::rpc::auth_layer::AuthLayer;
415pub use crate::rpc::channel::CANCEL_METHOD_NAME;
416use crate::rpc::channel::RpcModule as FilRpcModule;
417use crate::rpc::eth::pubsub::EthPubSub;
418use crate::rpc::metrics_layer::MetricsLayer;
419use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore};
420
421use crate::blocks::FullTipset;
422use fvm_ipld_blockstore::Blockstore;
423use jsonrpsee::{
424 Methods,
425 core::middleware::RpcServiceBuilder,
426 server::{RpcModule, Server, StopHandle, TowerServiceBuilder, stop_channel},
427};
428use parking_lot::RwLock;
429use std::env;
430use std::net::SocketAddr;
431use std::sync::{Arc, LazyLock};
432use std::time::Duration;
433use tokio::sync::mpsc;
434use tower::Service;
435
436use crate::rpc::sync::SnapshotProgressState;
437use openrpc_types::{self, ParamStructure};
438
/// Default port number (`2345`).
pub const DEFAULT_PORT: u16 = 2345;

/// Default request timeout, computed once on first access.
///
/// The `FOREST_RPC_DEFAULT_TIMEOUT` environment variable is read as a whole number of
/// seconds. When it is unset, not valid Unicode, or does not parse as an unsigned
/// integer, the timeout is 60 seconds.
static DEFAULT_REQUEST_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_secs = env::var("FOREST_RPC_DEFAULT_TIMEOUT")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured_secs {
        Some(secs) => Duration::from_secs(secs),
        None => Duration::from_secs(60),
    }
});

/// Request body size limit passed to the server configuration: 64 MiB.
const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024;
/// Response body size limit passed to the server configuration; equal to the request limit.
const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE;
451
/// State shared by the RPC modules: `create_modules` wraps it in an `Arc` and uses it as
/// the context of every `RpcModule`.
pub struct RPCState<DB> {
    /// Key store behind a shared read-write lock; `start_rpc` also clones this handle into
    /// the per-request auth layer.
    pub keystore: Arc<RwLock<KeyStore>>,
    /// Shared state manager; most accessor methods on this type delegate to it.
    pub state_manager: Arc<crate::state_manager::StateManager<DB>>,
    /// Shared message pool.
    pub mpool: Arc<crate::message_pool::MessagePool<crate::message_pool::MpoolRpcProvider<DB>>>,
    /// Shared bad-block cache, if one is available.
    pub bad_blocks: Option<Arc<crate::chain_sync::BadBlockCache>>,
    /// Shared `MsgsInTipsetCache`.
    pub msgs_in_tipset: Arc<crate::chain::store::MsgsInTipsetCache>,
    /// Chain-sync status.
    pub sync_status: crate::chain_sync::SyncStatus,
    /// Shared Ethereum event handler.
    pub eth_event_handler: Arc<EthEventHandler>,
    /// Sync network context; `network_send()` borrows its sender.
    pub sync_network_context: SyncNetworkContext<DB>,
    /// Sending half of a `flume` channel carrying full tipsets.
    pub tipset_send: flume::Sender<Arc<FullTipset>>,
    /// UTC timestamp — presumably when the node started; confirm where it is set.
    pub start_time: chrono::DateTime<chrono::Utc>,
    /// Tracker whose `state()` is exposed by `get_snapshot_progress_tracker()`.
    pub snapshot_progress_tracker: SnapshotProgressTracker,
    /// Sending half of a unit-valued `mpsc` channel — presumably used to request shutdown;
    /// the receiver is outside this file section.
    pub shutdown: mpsc::Sender<()>,
}
468
469impl<DB: Blockstore> RPCState<DB> {
470 pub fn beacon(&self) -> &Arc<crate::beacon::BeaconSchedule> {
471 self.state_manager.beacon_schedule()
472 }
473
474 pub fn chain_store(&self) -> &Arc<crate::chain::ChainStore<DB>> {
475 self.state_manager.chain_store()
476 }
477
478 pub fn chain_index(&self) -> &Arc<crate::chain::index::ChainIndex<Arc<DB>>> {
479 self.chain_store().chain_index()
480 }
481
482 pub fn chain_config(&self) -> &Arc<crate::networks::ChainConfig> {
483 self.state_manager.chain_config()
484 }
485
486 pub fn store(&self) -> &DB {
487 self.chain_store().blockstore()
488 }
489
490 pub fn store_owned(&self) -> Arc<DB> {
491 self.state_manager.blockstore_owned()
492 }
493
494 pub fn network_send(&self) -> &flume::Sender<crate::libp2p::NetworkMessage> {
495 self.sync_network_context.network_send()
496 }
497
498 pub fn get_snapshot_progress_tracker(&self) -> SnapshotProgressState {
499 self.snapshot_progress_tracker.state()
500 }
501}
502
/// Bundle of values `start_rpc` clones for every accepted connection and again for every
/// request served on it.
#[derive(Clone)]
struct PerConnection<RpcMiddleware, HttpMiddleware> {
    /// Handle whose `shutdown()` future ends the accept loop and the per-connection tasks.
    stop_handle: StopHandle,
    /// Pre-configured service builder; the RPC middleware is attached per request.
    svc_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
    /// Key store handle given to the auth layer.
    keystore: Arc<RwLock<KeyStore>>,
}
509
510pub async fn start_rpc<DB>(
511 state: RPCState<DB>,
512 rpc_endpoint: SocketAddr,
513 filter_list: Option<FilterList>,
514) -> anyhow::Result<()>
515where
516 DB: Blockstore + Send + Sync + 'static,
517{
518 let filter_list = filter_list.unwrap_or_default();
519 let state = Arc::new(state);
521 let keystore = state.keystore.clone();
522 let mut modules = create_modules(state.clone());
523
524 let mut pubsub_module = FilRpcModule::default();
525 pubsub_module.register_channel("Filecoin.ChainNotify", {
526 let state_clone = state.clone();
527 move |params| chain::chain_notify(params, &state_clone)
528 })?;
529
530 for module in modules.values_mut() {
531 module.merge(EthPubSub::new(state.clone()).into_rpc())?;
533 module.merge(pubsub_module.clone())?;
534 }
535
536 let methods: Arc<HashMap<ApiPaths, Methods>> =
537 Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());
538
539 let (stop_handle, _server_handle) = stop_channel();
540
541 let per_conn = PerConnection {
542 stop_handle: stop_handle.clone(),
543 svc_builder: Server::builder()
544 .set_config(
545 ServerConfig::builder()
546 .max_request_body_size(MAX_REQUEST_BODY_SIZE)
548 .max_response_body_size(MAX_RESPONSE_BODY_SIZE)
549 .set_id_provider(RandomHexStringIdProvider::new())
550 .build(),
551 )
552 .set_http_middleware(
553 tower::ServiceBuilder::new()
554 .layer(CompressionLayer::new())
555 .layer(SetSensitiveRequestHeadersLayer::new(std::iter::once(
557 http::header::AUTHORIZATION,
558 ))),
559 )
560 .to_service_builder(),
561 keystore,
562 };
563
564 let listener = tokio::net::TcpListener::bind(rpc_endpoint).await.unwrap();
565 tracing::info!("Ready for RPC connections");
566 loop {
567 let sock = tokio::select! {
568 res = listener.accept() => {
569 match res {
570 Ok((stream, _remote_addr)) => stream,
571 Err(e) => {
572 tracing::error!("failed to accept v4 connection: {:?}", e);
573 continue;
574 }
575 }
576 }
577 _ = per_conn.stop_handle.clone().shutdown() => break,
578 };
579
580 let svc = tower::service_fn({
581 let methods = methods.clone();
582 let per_conn = per_conn.clone();
583 let filter_list = filter_list.clone();
584 move |req| {
585 let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
586 let path = ApiPaths::from_uri(req.uri()).unwrap_or(ApiPaths::V1);
587 let methods = methods.get(&path).cloned().unwrap_or_default();
588 let PerConnection {
589 stop_handle,
590 svc_builder,
591 keystore,
592 } = per_conn.clone();
593 let headers = req.headers().clone();
596 let rpc_middleware = RpcServiceBuilder::new()
597 .layer(SetExtensionLayer { path })
598 .layer(SegregationLayer)
599 .layer(FilterLayer::new(filter_list.clone()))
600 .layer(AuthLayer {
601 headers,
602 keystore: keystore.clone(),
603 })
604 .layer(LogLayer::default())
605 .layer(MetricsLayer::default());
606 let mut jsonrpsee_svc = svc_builder
607 .set_rpc_middleware(rpc_middleware)
608 .build(methods, stop_handle);
609
610 if is_websocket {
611 let session_close = jsonrpsee_svc.on_session_closed();
614
615 tokio::spawn(async move {
618 session_close.await;
619 tracing::trace!("Closed WebSocket connection");
620 });
621
622 async move {
623 tracing::trace!("Opened WebSocket connection");
624 jsonrpsee_svc
628 .call(req)
629 .await
630 .map_err(|e| anyhow::anyhow!("{:?}", e))
631 }
632 .boxed()
633 } else {
634 async move {
636 tracing::trace!("Opened HTTP connection");
637 let rp = jsonrpsee_svc.call(req).await;
638 tracing::trace!("Closed HTTP connection");
639 rp.map_err(|e| anyhow::anyhow!("{:?}", e))
643 }
644 .boxed()
645 }
646 }
647 });
648
649 tokio::spawn(jsonrpsee::server::serve_with_graceful_shutdown(
650 sock,
651 svc,
652 stop_handle.clone().shutdown(),
653 ));
654 }
655
656 Ok(())
657}
658
659fn create_modules<DB>(state: Arc<RPCState<DB>>) -> HashMap<ApiPaths, RpcModule<RPCState<DB>>>
660where
661 DB: Blockstore + Send + Sync + 'static,
662{
663 let mut modules = HashMap::default();
664 for api_version in ApiPaths::value_variants() {
665 modules.insert(*api_version, RpcModule::from_arc(state.clone()));
666 }
667 macro_rules! register {
668 ($ty:ty) => {
669 if !<$ty>::SUBSCRIPTION {
672 <$ty>::register(&mut modules, ParamStructure::ByPosition).unwrap();
673 }
674 };
675 }
676 for_each_rpc_method!(register);
677 modules
678}
679
680pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenRPC {
682 use schemars::generate::{SchemaGenerator, SchemaSettings};
683
684 let mut methods = vec![];
685 let mut settings = SchemaSettings::draft07();
687 settings.definitions_path = "#/components/schemas/".into();
689 let mut generator = SchemaGenerator::new(settings);
690 macro_rules! callback {
691 ($ty:ty) => {
692 if <$ty>::API_PATHS.contains(path) {
693 match include {
694 Some(include) => match include.contains(&<$ty>::NAME) {
695 true => {
696 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
697 &mut generator,
698 ParamStructure::ByPosition,
699 &<$ty>::NAME,
700 )));
701 if let Some(alias) = &<$ty>::NAME_ALIAS {
702 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
703 &mut generator,
704 ParamStructure::ByPosition,
705 &alias,
706 )));
707 }
708 }
709 false => {}
710 },
711 None => {
712 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
713 &mut generator,
714 ParamStructure::ByPosition,
715 &<$ty>::NAME,
716 )));
717 if let Some(alias) = &<$ty>::NAME_ALIAS {
718 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
719 &mut generator,
720 ParamStructure::ByPosition,
721 &alias,
722 )));
723 }
724 }
725 }
726 }
727 };
728 }
729 for_each_rpc_method!(callback);
730 openrpc_types::OpenRPC {
731 methods,
732 components: Some(openrpc_types::Components {
733 schemas: Some(
734 generator
735 .take_definitions(false)
736 .into_iter()
737 .filter_map(|(k, v)| {
738 if let Ok(v) = Schema::try_from(v) {
739 Some((k, v))
740 } else {
741 None
742 }
743 })
744 .collect(),
745 ),
746 ..Default::default()
747 }),
748 openrpc: openrpc_types::OPEN_RPC_SPECIFICATION_VERSION,
749 info: openrpc_types::Info {
750 title: String::from("forest"),
751 version: env!("CARGO_PKG_VERSION").into(),
752 ..Default::default()
753 },
754 ..Default::default()
755 }
756}
757
#[cfg(test)]
mod tests {
    use crate::rpc::ApiPaths;

    /// Generates the OpenRPC document for the `V0` and `V1` API paths and compares each
    /// against its `insta` YAML snapshot. Ignored by default; the reason is tracked in the
    /// issue linked from the `#[ignore]` attribute.
    #[test]
    #[ignore = "https://github.com/ChainSafe/forest/issues/4032"]
    fn openrpc() {
        for path in [ApiPaths::V0, ApiPaths::V1] {
            let _spec = super::openrpc(path, None);
            insta::assert_yaml_snapshot!(_spec);
        }
    }
}