forest/rpc/
mod.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer;
5mod auth_layer;
6mod channel;
7mod client;
8mod filter_layer;
9mod filter_list;
10mod log_layer;
11mod metrics_layer;
12mod request;
13mod segregation_layer;
14mod set_extension_layer;
15
16use crate::rpc::eth::types::RandomHexStringIdProvider;
17use crate::shim::clock::ChainEpoch;
18use clap::ValueEnum as _;
19pub use client::Client;
20pub use error::ServerError;
21use eth::filter::EthEventHandler;
22use filter_layer::FilterLayer;
23pub use filter_list::FilterList;
24use futures::FutureExt as _;
25use jsonrpsee::server::ServerConfig;
26use log_layer::LogLayer;
27use reflect::Ctx;
28pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt};
29pub use request::Request;
30use schemars::Schema;
31use segregation_layer::SegregationLayer;
32use set_extension_layer::SetExtensionLayer;
33mod error;
34mod reflect;
35use ahash::HashMap;
36mod registry;
37pub mod types;
38
39pub use methods::*;
40
41/// Protocol or transport-specific error
42pub use jsonrpsee::core::ClientError;
43
/// Sentinel epoch value of `-1`.
///
/// NOTE(review): judging by the name this means "no lookback limit" for
/// searches that accept a lookback window; no user is visible in this file,
/// so confirm against the call sites.
const LOOKBACK_NO_LIMIT: ChainEpoch = -1;
45
/// Invokes the macro `callback` once for each type that implements
/// [`RpcMethod`], passing the path of that type as the only argument.
///
/// This is a macro because there is no way to abstract the `ARITY` on that
/// trait.
///
/// All methods should be entered here: the prelude re-exports, the method
/// info listing, the server registration and the OpenRPC document in this
/// module are all generated by expanding this list.
#[macro_export]
macro_rules! for_each_rpc_method {
    ($callback:path) => {
        // auth vertical
        $callback!($crate::rpc::auth::AuthNew);
        $callback!($crate::rpc::auth::AuthVerify);

        // beacon vertical
        $callback!($crate::rpc::beacon::BeaconGetEntry);

        // chain vertical
        $callback!($crate::rpc::chain::ChainPruneSnapshot);
        $callback!($crate::rpc::chain::ChainExport);
        $callback!($crate::rpc::chain::ChainGetBlock);
        $callback!($crate::rpc::chain::ChainGetBlockMessages);
        $callback!($crate::rpc::chain::ChainGetEvents);
        $callback!($crate::rpc::chain::ChainGetGenesis);
        $callback!($crate::rpc::chain::ChainGetFinalizedTipset);
        $callback!($crate::rpc::chain::ChainGetMessage);
        $callback!($crate::rpc::chain::ChainGetMessagesInTipset);
        $callback!($crate::rpc::chain::ChainGetMinBaseFee);
        $callback!($crate::rpc::chain::ChainGetParentMessages);
        $callback!($crate::rpc::chain::ChainGetParentReceipts);
        $callback!($crate::rpc::chain::ChainGetPath);
        $callback!($crate::rpc::chain::ChainGetTipSet);
        $callback!($crate::rpc::chain::ChainGetTipSetV2);
        $callback!($crate::rpc::chain::ChainGetTipSetAfterHeight);
        $callback!($crate::rpc::chain::ChainGetTipSetByHeight);
        $callback!($crate::rpc::chain::ChainHasObj);
        $callback!($crate::rpc::chain::ChainHead);
        $callback!($crate::rpc::chain::ChainReadObj);
        $callback!($crate::rpc::chain::ChainSetHead);
        $callback!($crate::rpc::chain::ChainStatObj);
        $callback!($crate::rpc::chain::ChainTipSetWeight);
        $callback!($crate::rpc::chain::ForestChainExport);
        $callback!($crate::rpc::chain::ForestChainExportDiff);
        $callback!($crate::rpc::chain::ForestChainExportStatus);
        $callback!($crate::rpc::chain::ForestChainExportCancel);

        // common vertical
        $callback!($crate::rpc::common::Session);
        $callback!($crate::rpc::common::Shutdown);
        $callback!($crate::rpc::common::StartTime);
        $callback!($crate::rpc::common::Version);

        // eth vertical
        $callback!($crate::rpc::eth::EthAccounts);
        $callback!($crate::rpc::eth::EthAddressToFilecoinAddress);
        $callback!($crate::rpc::eth::EthBlockNumber);
        $callback!($crate::rpc::eth::EthCall);
        $callback!($crate::rpc::eth::EthChainId);
        $callback!($crate::rpc::eth::EthEstimateGas);
        $callback!($crate::rpc::eth::EthFeeHistory);
        $callback!($crate::rpc::eth::EthGasPrice);
        $callback!($crate::rpc::eth::EthGetBalance);
        $callback!($crate::rpc::eth::EthGetBlockByHash);
        $callback!($crate::rpc::eth::EthGetBlockByNumber);
        $callback!($crate::rpc::eth::EthGetBlockReceipts);
        $callback!($crate::rpc::eth::EthGetBlockReceiptsLimited);
        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByHash);
        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumber);
        $callback!($crate::rpc::eth::EthGetCode);
        $callback!($crate::rpc::eth::EthGetLogs);
        $callback!($crate::rpc::eth::EthGetFilterLogs);
        $callback!($crate::rpc::eth::EthGetFilterChanges);
        $callback!($crate::rpc::eth::EthGetMessageCidByTransactionHash);
        $callback!($crate::rpc::eth::EthGetStorageAt);
        $callback!($crate::rpc::eth::EthGetTransactionByHash);
        $callback!($crate::rpc::eth::EthGetTransactionByHashLimited);
        $callback!($crate::rpc::eth::EthGetTransactionCount);
        $callback!($crate::rpc::eth::EthGetTransactionHashByCid);
        $callback!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndex);
        $callback!($crate::rpc::eth::EthGetTransactionByBlockHashAndIndex);
        $callback!($crate::rpc::eth::EthMaxPriorityFeePerGas);
        $callback!($crate::rpc::eth::EthProtocolVersion);
        $callback!($crate::rpc::eth::EthGetTransactionReceipt);
        $callback!($crate::rpc::eth::EthGetTransactionReceiptLimited);
        $callback!($crate::rpc::eth::EthNewFilter);
        $callback!($crate::rpc::eth::EthNewPendingTransactionFilter);
        $callback!($crate::rpc::eth::EthNewBlockFilter);
        $callback!($crate::rpc::eth::EthUninstallFilter);
        $callback!($crate::rpc::eth::EthUnsubscribe);
        $callback!($crate::rpc::eth::EthSubscribe);
        $callback!($crate::rpc::eth::EthSyncing);
        $callback!($crate::rpc::eth::EthTraceBlock);
        $callback!($crate::rpc::eth::EthTraceFilter);
        $callback!($crate::rpc::eth::EthTraceTransaction);
        $callback!($crate::rpc::eth::EthTraceReplayBlockTransactions);
        $callback!($crate::rpc::eth::Web3ClientVersion);
        $callback!($crate::rpc::eth::EthSendRawTransaction);

        // gas vertical
        $callback!($crate::rpc::gas::GasEstimateFeeCap);
        $callback!($crate::rpc::gas::GasEstimateGasLimit);
        $callback!($crate::rpc::gas::GasEstimateGasPremium);
        $callback!($crate::rpc::gas::GasEstimateMessageGas);

        // market vertical
        $callback!($crate::rpc::market::MarketAddBalance);

        // miner vertical
        $callback!($crate::rpc::miner::MinerCreateBlock);
        $callback!($crate::rpc::miner::MinerGetBaseInfo);

        // mpool vertical
        $callback!($crate::rpc::mpool::MpoolBatchPush);
        $callback!($crate::rpc::mpool::MpoolBatchPushUntrusted);
        $callback!($crate::rpc::mpool::MpoolGetNonce);
        $callback!($crate::rpc::mpool::MpoolPending);
        $callback!($crate::rpc::mpool::MpoolPush);
        $callback!($crate::rpc::mpool::MpoolPushMessage);
        $callback!($crate::rpc::mpool::MpoolPushUntrusted);
        $callback!($crate::rpc::mpool::MpoolSelect);

        // msig vertical
        $callback!($crate::rpc::msig::MsigGetAvailableBalance);
        $callback!($crate::rpc::msig::MsigGetPending);
        $callback!($crate::rpc::msig::MsigGetVested);
        $callback!($crate::rpc::msig::MsigGetVestingSchedule);

        // net vertical
        $callback!($crate::rpc::net::NetAddrsListen);
        $callback!($crate::rpc::net::NetAgentVersion);
        $callback!($crate::rpc::net::NetAutoNatStatus);
        $callback!($crate::rpc::net::NetConnect);
        $callback!($crate::rpc::net::NetDisconnect);
        $callback!($crate::rpc::net::NetFindPeer);
        $callback!($crate::rpc::net::NetInfo);
        $callback!($crate::rpc::net::NetListening);
        $callback!($crate::rpc::net::NetPeers);
        $callback!($crate::rpc::net::NetProtectAdd);
        $callback!($crate::rpc::net::NetProtectList);
        $callback!($crate::rpc::net::NetProtectRemove);
        $callback!($crate::rpc::net::NetVersion);

        // node vertical
        $callback!($crate::rpc::node::NodeStatus);

        // state vertical
        $callback!($crate::rpc::state::StateAccountKey);
        $callback!($crate::rpc::state::StateCall);
        $callback!($crate::rpc::state::StateCirculatingSupply);
        $callback!($crate::rpc::state::ForestStateCompute);
        $callback!($crate::rpc::state::StateCompute);
        $callback!($crate::rpc::state::StateDealProviderCollateralBounds);
        $callback!($crate::rpc::state::StateFetchRoot);
        $callback!($crate::rpc::state::StateGetActor);
        $callback!($crate::rpc::state::StateGetAllAllocations);
        $callback!($crate::rpc::state::StateGetAllClaims);
        $callback!($crate::rpc::state::StateGetAllocation);
        $callback!($crate::rpc::state::StateGetAllocationForPendingDeal);
        $callback!($crate::rpc::state::StateGetAllocationIdForPendingDeal);
        $callback!($crate::rpc::state::StateGetAllocations);
        $callback!($crate::rpc::state::StateGetBeaconEntry);
        $callback!($crate::rpc::state::StateGetClaim);
        $callback!($crate::rpc::state::StateGetClaims);
        $callback!($crate::rpc::state::StateGetNetworkParams);
        $callback!($crate::rpc::state::StateGetRandomnessDigestFromBeacon);
        $callback!($crate::rpc::state::StateGetRandomnessDigestFromTickets);
        $callback!($crate::rpc::state::StateGetRandomnessFromBeacon);
        $callback!($crate::rpc::state::StateGetRandomnessFromTickets);
        $callback!($crate::rpc::state::StateGetReceipt);
        $callback!($crate::rpc::state::StateListActors);
        $callback!($crate::rpc::state::StateListMessages);
        $callback!($crate::rpc::state::StateListMiners);
        $callback!($crate::rpc::state::StateLookupID);
        $callback!($crate::rpc::state::StateLookupRobustAddress);
        $callback!($crate::rpc::state::StateMarketBalance);
        $callback!($crate::rpc::state::StateMarketDeals);
        $callback!($crate::rpc::state::StateMarketParticipants);
        $callback!($crate::rpc::state::StateMarketStorageDeal);
        $callback!($crate::rpc::state::StateMinerActiveSectors);
        $callback!($crate::rpc::state::StateMinerAllocated);
        $callback!($crate::rpc::state::StateMinerAvailableBalance);
        $callback!($crate::rpc::state::StateMinerDeadlines);
        $callback!($crate::rpc::state::StateMinerFaults);
        $callback!($crate::rpc::state::StateMinerInfo);
        $callback!($crate::rpc::state::StateMinerInitialPledgeCollateral);
        $callback!($crate::rpc::state::StateMinerPartitions);
        $callback!($crate::rpc::state::StateMinerPower);
        $callback!($crate::rpc::state::StateMinerPreCommitDepositForPower);
        $callback!($crate::rpc::state::StateMinerProvingDeadline);
        $callback!($crate::rpc::state::StateMinerRecoveries);
        $callback!($crate::rpc::state::StateMinerSectorAllocated);
        $callback!($crate::rpc::state::StateMinerSectorCount);
        $callback!($crate::rpc::state::StateMinerSectors);
        $callback!($crate::rpc::state::StateNetworkName);
        $callback!($crate::rpc::state::StateNetworkVersion);
        $callback!($crate::rpc::state::StateActorInfo);
        $callback!($crate::rpc::state::StateReadState);
        $callback!($crate::rpc::state::StateDecodeParams);
        $callback!($crate::rpc::state::StateReplay);
        $callback!($crate::rpc::state::StateSearchMsg);
        $callback!($crate::rpc::state::StateSearchMsgLimited);
        $callback!($crate::rpc::state::StateSectorExpiration);
        $callback!($crate::rpc::state::StateSectorGetInfo);
        $callback!($crate::rpc::state::StateSectorPartition);
        $callback!($crate::rpc::state::StateSectorPreCommitInfo);
        $callback!($crate::rpc::state::StateSectorPreCommitInfoV0);
        $callback!($crate::rpc::state::StateVerifiedClientStatus);
        $callback!($crate::rpc::state::StateVerifiedRegistryRootKey);
        $callback!($crate::rpc::state::StateVerifierStatus);
        $callback!($crate::rpc::state::StateVMCirculatingSupplyInternal);
        $callback!($crate::rpc::state::StateWaitMsg);
        $callback!($crate::rpc::state::StateWaitMsgV0);
        $callback!($crate::rpc::state::StateMinerInitialPledgeForSector);

        // sync vertical
        $callback!($crate::rpc::sync::SyncCheckBad);
        $callback!($crate::rpc::sync::SyncMarkBad);
        $callback!($crate::rpc::sync::SyncSnapshotProgress);
        $callback!($crate::rpc::sync::SyncStatus);
        $callback!($crate::rpc::sync::SyncSubmitBlock);

        // wallet vertical
        $callback!($crate::rpc::wallet::WalletBalance);
        $callback!($crate::rpc::wallet::WalletDefaultAddress);
        $callback!($crate::rpc::wallet::WalletDelete);
        $callback!($crate::rpc::wallet::WalletExport);
        $callback!($crate::rpc::wallet::WalletHas);
        $callback!($crate::rpc::wallet::WalletImport);
        $callback!($crate::rpc::wallet::WalletList);
        $callback!($crate::rpc::wallet::WalletNew);
        $callback!($crate::rpc::wallet::WalletSetDefault);
        $callback!($crate::rpc::wallet::WalletSign);
        $callback!($crate::rpc::wallet::WalletSignMessage);
        $callback!($crate::rpc::wallet::WalletValidateAddress);
        $callback!($crate::rpc::wallet::WalletVerify);

        // f3 vertical
        $callback!($crate::rpc::f3::GetRawNetworkName);
        $callback!($crate::rpc::f3::F3GetCertificate);
        $callback!($crate::rpc::f3::F3GetECPowerTable);
        $callback!($crate::rpc::f3::F3GetF3PowerTable);
        $callback!($crate::rpc::f3::F3GetF3PowerTableByInstance);
        $callback!($crate::rpc::f3::F3IsRunning);
        $callback!($crate::rpc::f3::F3GetProgress);
        $callback!($crate::rpc::f3::F3GetManifest);
        $callback!($crate::rpc::f3::F3ListParticipants);
        $callback!($crate::rpc::f3::F3GetLatestCertificate);
        $callback!($crate::rpc::f3::F3GetOrRenewParticipationTicket);
        $callback!($crate::rpc::f3::F3Participate);
        $callback!($crate::rpc::f3::F3ExportLatestSnapshot);
        $callback!($crate::rpc::f3::GetHead);
        $callback!($crate::rpc::f3::GetParent);
        $callback!($crate::rpc::f3::GetParticipatingMinerIDs);
        $callback!($crate::rpc::f3::GetPowerTable);
        $callback!($crate::rpc::f3::GetTipset);
        $callback!($crate::rpc::f3::GetTipsetByEpoch);
        $callback!($crate::rpc::f3::Finalize);
        $callback!($crate::rpc::f3::ProtectPeer);
        $callback!($crate::rpc::f3::SignMessage);

        // misc vertical
        $callback!($crate::rpc::misc::GetActorEventsRaw);
    };
}
310pub(crate) use for_each_rpc_method;
311use sync::SnapshotProgressTracker;
312use tower_http::compression::CompressionLayer;
313use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
314
#[allow(unused)]
/// All handler definitions.
///
/// Re-exports every type listed in [`for_each_rpc_method`], plus the
/// [`RpcMethodExt`] trait (anonymously), so that a single glob import brings
/// all RPC methods into scope.
///
/// Usage guide:
/// ```ignore
/// use crate::rpc::{self, prelude::*};
///
/// let client = rpc::Client::from(..);
/// ChainHead::call(&client, ()).await?;
/// fn foo() -> rpc::ClientError {..}
/// fn bar() -> rpc::ServerError {..}
/// ```
pub mod prelude {
    use super::*;

    // Imported as `_` so the extension methods are usable without exporting
    // the trait name itself.
    pub use reflect::RpcMethodExt as _;

    // Callback for `for_each_rpc_method!`: re-export one method type.
    macro_rules! export {
        ($ty:ty) => {
            pub use $ty;
        };
    }

    for_each_rpc_method!(export);
}
340
341/// Collects all the RPC method names and permission available in the Forest
342pub fn collect_rpc_method_info() -> Vec<(&'static str, Permission)> {
343    use crate::rpc::RpcMethod;
344
345    let mut methods = Vec::new();
346
347    macro_rules! add_method {
348        ($ty:ty) => {
349            methods.push((<$ty>::NAME, <$ty>::PERMISSION));
350        };
351    }
352
353    for_each_rpc_method!(add_method);
354
355    methods
356}
357
/// Implementations of the RPC methods, one submodule per API vertical.
///
/// # Handling types
/// - If a `struct` or `enum` is only used in the RPC API, it should live in `src/rpc`.
///   - If it is used in only one API vertical (e.g. `auth` or `chain`), then it should live
///     in either:
///     - `src/rpc/methods/auth.rs` (if there are only a few).
///     - `src/rpc/methods/auth/types.rs` (if there are so many that they would cause clutter).
///   - If it is used _across_ API verticals, it should live in `src/rpc/types.rs`.
///
/// # Interactions with the [`lotus_json`] APIs
/// - Types may have fields which must go through [`LotusJson`],
///   and MUST reflect that in their [`JsonSchema`].
///   You have two options for this:
///   - Use `#[attributes]` to control serialization and schema generation:
///     ```ignore
///     #[derive(Deserialize, Serialize, JsonSchema)]
///     struct Foo {
///         #[serde(with = "crate::lotus_json")] // perform the conversion
///         #[schemars(with = "LotusJson<Cid>")] // advertise the schema to be converted
///         cid: Cid, // use the native type in application logic
///     }
///     ```
///   - Use [`LotusJson`] directly. This means that serialization and the [`JsonSchema`]
///     will never go out of sync.
///     ```ignore
///     #[derive(Deserialize, Serialize, JsonSchema)]
///     struct Foo {
///         cid: LotusJson<Cid>, // use the shim type in application logic, manually performing conversions
///     }
///     ```
///
/// [`lotus_json`]: crate::lotus_json
/// [`HasLotusJson`]: crate::lotus_json::HasLotusJson
/// [`LotusJson`]: crate::lotus_json::LotusJson
/// [`JsonSchema`]: schemars::JsonSchema
mod methods {
    pub mod auth;
    pub mod beacon;
    pub mod chain;
    pub mod common;
    pub mod eth;
    pub mod f3;
    pub mod gas;
    pub mod market;
    pub mod miner;
    pub mod misc;
    pub mod mpool;
    pub mod msig;
    pub mod net;
    pub mod node;
    pub mod state;
    pub mod sync;
    pub mod wallet;
}
413
414use crate::rpc::auth_layer::AuthLayer;
415pub use crate::rpc::channel::CANCEL_METHOD_NAME;
416use crate::rpc::channel::RpcModule as FilRpcModule;
417use crate::rpc::eth::pubsub::EthPubSub;
418use crate::rpc::metrics_layer::MetricsLayer;
419use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore};
420
421use crate::blocks::FullTipset;
422use fvm_ipld_blockstore::Blockstore;
423use jsonrpsee::{
424    Methods,
425    core::middleware::RpcServiceBuilder,
426    server::{RpcModule, Server, StopHandle, TowerServiceBuilder, stop_channel},
427};
428use parking_lot::RwLock;
429use std::env;
430use std::net::SocketAddr;
431use std::sync::{Arc, LazyLock};
432use std::time::Duration;
433use tokio::sync::mpsc;
434use tower::Service;
435
436use crate::rpc::sync::SnapshotProgressState;
437use openrpc_types::{self, ParamStructure};
438
/// Default port number (`2345`).
///
/// NOTE(review): presumably the default TCP port of the RPC endpoint; it is
/// not referenced in this file, so confirm against the configuration code.
pub const DEFAULT_PORT: u16 = 2345;
440
/// Request timeout, read lazily (on first access) from the
/// `FOREST_RPC_DEFAULT_TIMEOUT` environment variable as a whole number of
/// seconds.
///
/// Falls back to 60 seconds when the variable is unset or cannot be parsed as
/// an unsigned integer.
static DEFAULT_REQUEST_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let configured_secs = env::var("FOREST_RPC_DEFAULT_TIMEOUT")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    match configured_secs {
        Some(secs) => Duration::from_secs(secs),
        None => Duration::from_secs(60),
    }
});
448
/// Maximum request body size (64 MiB) handed to the `jsonrpsee` server
/// configuration in `start_rpc`.
const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024;
/// Maximum response body size; kept equal to [`MAX_REQUEST_BODY_SIZE`].
const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE;
451
/// This is where you store persistent data, or at least access to stateful
/// data.
///
/// One instance is wrapped in an `Arc` by `start_rpc` and shared by all RPC
/// modules.
pub struct RPCState<DB> {
    /// Shared key store; `start_rpc` also hands a clone to the per-connection
    /// auth layer.
    pub keystore: Arc<RwLock<KeyStore>>,
    /// State manager; the accessor methods on this type reach the beacon
    /// schedule, chain store, chain config and blockstore through it.
    pub state_manager: Arc<crate::state_manager::StateManager<DB>>,
    /// Message pool backed by an RPC provider over `DB`.
    pub mpool: Arc<crate::message_pool::MessagePool<crate::message_pool::MpoolRpcProvider<DB>>>,
    /// Optional bad-block cache.
    // NOTE(review): the conditions under which this is `None` are not visible here.
    pub bad_blocks: Option<Arc<crate::chain_sync::BadBlockCache>>,
    /// Cache of messages per tipset.
    pub msgs_in_tipset: Arc<crate::chain::store::MsgsInTipsetCache>,
    /// Chain synchronisation status.
    pub sync_status: crate::chain_sync::SyncStatus,
    /// Handler for Ethereum-style event filters.
    pub eth_event_handler: Arc<EthEventHandler>,
    /// Network context of the chain syncer; also the source of `network_send()`.
    pub sync_network_context: SyncNetworkContext<DB>,
    /// Channel for sending full tipsets.
    // NOTE(review): presumably consumed by the chain follower; confirm the receiver.
    pub tipset_send: flume::Sender<Arc<FullTipset>>,
    /// Start time (UTC).
    // NOTE(review): presumably the node/daemon start time; confirm at the construction site.
    pub start_time: chrono::DateTime<chrono::Utc>,
    /// Tracker whose state is exposed via `get_snapshot_progress_tracker()`.
    pub snapshot_progress_tracker: SnapshotProgressTracker,
    /// Sender used to signal shutdown.
    // NOTE(review): the receiving side is outside this file.
    pub shutdown: mpsc::Sender<()>,
}
468
469impl<DB: Blockstore> RPCState<DB> {
470    pub fn beacon(&self) -> &Arc<crate::beacon::BeaconSchedule> {
471        self.state_manager.beacon_schedule()
472    }
473
474    pub fn chain_store(&self) -> &Arc<crate::chain::ChainStore<DB>> {
475        self.state_manager.chain_store()
476    }
477
478    pub fn chain_index(&self) -> &Arc<crate::chain::index::ChainIndex<Arc<DB>>> {
479        self.chain_store().chain_index()
480    }
481
482    pub fn chain_config(&self) -> &Arc<crate::networks::ChainConfig> {
483        self.state_manager.chain_config()
484    }
485
486    pub fn store(&self) -> &DB {
487        self.chain_store().blockstore()
488    }
489
490    pub fn store_owned(&self) -> Arc<DB> {
491        self.state_manager.blockstore_owned()
492    }
493
494    pub fn network_send(&self) -> &flume::Sender<crate::libp2p::NetworkMessage> {
495        self.sync_network_context.network_send()
496    }
497
498    pub fn get_snapshot_progress_tracker(&self) -> SnapshotProgressState {
499        self.snapshot_progress_tracker.state()
500    }
501}
502
/// Bundle of values that `start_rpc` clones for every incoming request in
/// order to build the per-request `jsonrpsee` service.
#[derive(Clone)]
struct PerConnection<RpcMiddleware, HttpMiddleware> {
    /// Handle whose `shutdown()` future ends the accept loop and is passed to
    /// each built service.
    stop_handle: StopHandle,
    /// Pre-configured service builder (server config + HTTP middleware); the
    /// RPC middleware is attached per request.
    svc_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
    /// Key store given to the auth layer.
    keystore: Arc<RwLock<KeyStore>>,
}
509
510pub async fn start_rpc<DB>(
511    state: RPCState<DB>,
512    rpc_endpoint: SocketAddr,
513    filter_list: Option<FilterList>,
514) -> anyhow::Result<()>
515where
516    DB: Blockstore + Send + Sync + 'static,
517{
518    let filter_list = filter_list.unwrap_or_default();
519    // `Arc` is needed because we will share the state between two modules
520    let state = Arc::new(state);
521    let keystore = state.keystore.clone();
522    let mut modules = create_modules(state.clone());
523
524    let mut pubsub_module = FilRpcModule::default();
525    pubsub_module.register_channel("Filecoin.ChainNotify", {
526        let state_clone = state.clone();
527        move |params| chain::chain_notify(params, &state_clone)
528    })?;
529
530    for module in modules.values_mut() {
531        // register eth subscription APIs
532        module.merge(EthPubSub::new(state.clone()).into_rpc())?;
533        module.merge(pubsub_module.clone())?;
534    }
535
536    let methods: Arc<HashMap<ApiPaths, Methods>> =
537        Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());
538
539    let (stop_handle, _server_handle) = stop_channel();
540
541    let per_conn = PerConnection {
542        stop_handle: stop_handle.clone(),
543        svc_builder: Server::builder()
544            .set_config(
545                ServerConfig::builder()
546                    // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
547                    .max_request_body_size(MAX_REQUEST_BODY_SIZE)
548                    .max_response_body_size(MAX_RESPONSE_BODY_SIZE)
549                    .set_id_provider(RandomHexStringIdProvider::new())
550                    .build(),
551            )
552            .set_http_middleware(
553                tower::ServiceBuilder::new()
554                    .layer(CompressionLayer::new())
555                    // Mark the `Authorization` request header as sensitive so it doesn't show in logs
556                    .layer(SetSensitiveRequestHeadersLayer::new(std::iter::once(
557                        http::header::AUTHORIZATION,
558                    ))),
559            )
560            .to_service_builder(),
561        keystore,
562    };
563
564    let listener = tokio::net::TcpListener::bind(rpc_endpoint).await.unwrap();
565    tracing::info!("Ready for RPC connections");
566    loop {
567        let sock = tokio::select! {
568        res = listener.accept() => {
569            match res {
570              Ok((stream, _remote_addr)) => stream,
571              Err(e) => {
572                tracing::error!("failed to accept v4 connection: {:?}", e);
573                continue;
574              }
575            }
576          }
577          _ = per_conn.stop_handle.clone().shutdown() => break,
578        };
579
580        let svc = tower::service_fn({
581            let methods = methods.clone();
582            let per_conn = per_conn.clone();
583            let filter_list = filter_list.clone();
584            move |req| {
585                let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
586                let path = ApiPaths::from_uri(req.uri()).unwrap_or(ApiPaths::V1);
587                let methods = methods.get(&path).cloned().unwrap_or_default();
588                let PerConnection {
589                    stop_handle,
590                    svc_builder,
591                    keystore,
592                } = per_conn.clone();
593                // NOTE, the rpc middleware must be initialized here to be able to be created once per connection
594                // with data from the connection such as the headers in this example
595                let headers = req.headers().clone();
596                let rpc_middleware = RpcServiceBuilder::new()
597                    .layer(SetExtensionLayer { path })
598                    .layer(SegregationLayer)
599                    .layer(FilterLayer::new(filter_list.clone()))
600                    .layer(AuthLayer {
601                        headers,
602                        keystore: keystore.clone(),
603                    })
604                    .layer(LogLayer::default())
605                    .layer(MetricsLayer::default());
606                let mut jsonrpsee_svc = svc_builder
607                    .set_rpc_middleware(rpc_middleware)
608                    .build(methods, stop_handle);
609
610                if is_websocket {
611                    // Utilize the session close future to know when the actual WebSocket
612                    // session was closed.
613                    let session_close = jsonrpsee_svc.on_session_closed();
614
615                    // A little bit weird API but the response to HTTP request must be returned below
616                    // and we spawn a task to register when the session is closed.
617                    tokio::spawn(async move {
618                        session_close.await;
619                        tracing::trace!("Closed WebSocket connection");
620                    });
621
622                    async move {
623                        tracing::trace!("Opened WebSocket connection");
624                        // https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
625                        // to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to a concrete type
626                        // as workaround.
627                        jsonrpsee_svc
628                            .call(req)
629                            .await
630                            .map_err(|e| anyhow::anyhow!("{:?}", e))
631                    }
632                    .boxed()
633                } else {
634                    // HTTP.
635                    async move {
636                        tracing::trace!("Opened HTTP connection");
637                        let rp = jsonrpsee_svc.call(req).await;
638                        tracing::trace!("Closed HTTP connection");
639                        // https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
640                        // to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to a concrete type
641                        // as workaround.
642                        rp.map_err(|e| anyhow::anyhow!("{:?}", e))
643                    }
644                    .boxed()
645                }
646            }
647        });
648
649        tokio::spawn(jsonrpsee::server::serve_with_graceful_shutdown(
650            sock,
651            svc,
652            stop_handle.clone().shutdown(),
653        ));
654    }
655
656    Ok(())
657}
658
659fn create_modules<DB>(state: Arc<RPCState<DB>>) -> HashMap<ApiPaths, RpcModule<RPCState<DB>>>
660where
661    DB: Blockstore + Send + Sync + 'static,
662{
663    let mut modules = HashMap::default();
664    for api_version in ApiPaths::value_variants() {
665        modules.insert(*api_version, RpcModule::from_arc(state.clone()));
666    }
667    macro_rules! register {
668        ($ty:ty) => {
669            // Register only non-subscription RPC methods.
670            // Subscription methods are registered separately in the RPC module.
671            if !<$ty>::SUBSCRIPTION {
672                <$ty>::register(&mut modules, ParamStructure::ByPosition).unwrap();
673            }
674        };
675    }
676    for_each_rpc_method!(register);
677    modules
678}
679
680/// If `include` is not [`None`], only methods that are listed will be returned
681pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenRPC {
682    use schemars::generate::{SchemaGenerator, SchemaSettings};
683
684    let mut methods = vec![];
685    // spec says draft07
686    let mut settings = SchemaSettings::draft07();
687    // ..but uses `components`
688    settings.definitions_path = "#/components/schemas/".into();
689    let mut generator = SchemaGenerator::new(settings);
690    macro_rules! callback {
691        ($ty:ty) => {
692            if <$ty>::API_PATHS.contains(path) {
693                match include {
694                    Some(include) => match include.contains(&<$ty>::NAME) {
695                        true => {
696                            methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
697                                &mut generator,
698                                ParamStructure::ByPosition,
699                                &<$ty>::NAME,
700                            )));
701                            if let Some(alias) = &<$ty>::NAME_ALIAS {
702                                methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
703                                    &mut generator,
704                                    ParamStructure::ByPosition,
705                                    &alias,
706                                )));
707                            }
708                        }
709                        false => {}
710                    },
711                    None => {
712                        methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
713                            &mut generator,
714                            ParamStructure::ByPosition,
715                            &<$ty>::NAME,
716                        )));
717                        if let Some(alias) = &<$ty>::NAME_ALIAS {
718                            methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
719                                &mut generator,
720                                ParamStructure::ByPosition,
721                                &alias,
722                            )));
723                        }
724                    }
725                }
726            }
727        };
728    }
729    for_each_rpc_method!(callback);
730    openrpc_types::OpenRPC {
731        methods,
732        components: Some(openrpc_types::Components {
733            schemas: Some(
734                generator
735                    .take_definitions(false)
736                    .into_iter()
737                    .filter_map(|(k, v)| {
738                        if let Ok(v) = Schema::try_from(v) {
739                            Some((k, v))
740                        } else {
741                            None
742                        }
743                    })
744                    .collect(),
745            ),
746            ..Default::default()
747        }),
748        openrpc: openrpc_types::OPEN_RPC_SPECIFICATION_VERSION,
749        info: openrpc_types::Info {
750            title: String::from("forest"),
751            version: env!("CARGO_PKG_VERSION").into(),
752            ..Default::default()
753        },
754        ..Default::default()
755    }
756}
757
#[cfg(test)]
mod tests {
    use crate::rpc::ApiPaths;

    // Generates the OpenRPC document for the V0 and V1 API paths and snapshots it.
    //
    // `cargo test --lib -- --exact 'rpc::tests::openrpc'`
    // `cargo insta review`
    //
    // NOTE(review): the test is marked `#[ignore]`, so the command above
    // presumably also needs `--ignored` (or `--include-ignored`) to run it.
    #[test]
    #[ignore = "https://github.com/ChainSafe/forest/issues/4032"]
    fn openrpc() {
        for path in [ApiPaths::V0, ApiPaths::V1] {
            let _spec = super::openrpc(path, None);
            // TODO(forest): https://github.com/ChainSafe/forest/issues/4032
            //               this is disabled because it causes lots of merge
            //               conflicts.
            //               We should consider re-enabling it when our RPC is
            //               more stable.
            //               (We still run this test to make sure we're not
            //               violating other invariants)
            // NOTE(review): the comment above says the snapshot check is
            // disabled, yet the assertion below is live and the whole test is
            // ignored instead — confirm which of the two is intended.
            insta::assert_yaml_snapshot!(_spec);
        }
    }
}