Skip to main content

forest/rpc/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer;
5mod auth_layer;
6mod channel;
7mod client;
8mod compression_layer;
9mod filter_layer;
10mod filter_list;
11pub mod json_validator;
12mod log_layer;
13mod metrics_layer;
14mod request;
15mod segregation_layer;
16mod set_extension_layer;
17mod validation_layer;
18
19use crate::rpc::eth::types::RandomHexStringIdProvider;
20use crate::shim::clock::ChainEpoch;
21use clap::ValueEnum as _;
22pub use client::Client;
23pub use error::ServerError;
24use eth::filter::EthEventHandler;
25use filter_layer::FilterLayer;
26pub use filter_list::FilterList;
27use futures::FutureExt as _;
28use jsonrpsee::server::ServerConfig;
29use log_layer::LogLayer;
30use reflect::Ctx;
31pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt};
32pub use request::Request;
33use schemars::Schema;
34use segregation_layer::SegregationLayer;
35use set_extension_layer::SetExtensionLayer;
36mod error;
37mod reflect;
38use ahash::HashMap;
39mod registry;
40pub mod types;
41
42pub use methods::*;
43
44/// Protocol or transport-specific error
45pub use jsonrpsee::core::ClientError;
46
47/// Sentinel value, indicating no limit on how far back to search in the chain (all the way to genesis epoch).
48pub const LOOKBACK_NO_LIMIT: ChainEpoch = -1;
49
50/// The macro `callback` will be passed in each type that implements
51/// [`RpcMethod`].
52///
53/// This is a macro because there is no way to abstract the `ARITY` on that
54/// trait.
55///
56/// All methods should be entered here.
57#[macro_export]
58macro_rules! for_each_rpc_method {
59    ($callback:path) => {
60        // auth vertical
61        $callback!($crate::rpc::auth::AuthNew);
62        $callback!($crate::rpc::auth::AuthVerify);
63
64        // beacon vertical
65        $callback!($crate::rpc::beacon::BeaconGetEntry);
66
67        // chain vertical
68        $callback!($crate::rpc::chain::ChainPruneSnapshot);
69        $callback!($crate::rpc::chain::ChainExport);
70        $callback!($crate::rpc::chain::ChainGetBlock);
71        $callback!($crate::rpc::chain::ChainGetBlockMessages);
72        $callback!($crate::rpc::chain::ChainGetEvents);
73        $callback!($crate::rpc::chain::ChainGetGenesis);
74        $callback!($crate::rpc::chain::ChainGetFinalizedTipset);
75        $callback!($crate::rpc::chain::ChainGetMessage);
76        $callback!($crate::rpc::chain::ChainGetMessagesInTipset);
77        $callback!($crate::rpc::chain::ChainGetMinBaseFee);
78        $callback!($crate::rpc::chain::ChainGetParentMessages);
79        $callback!($crate::rpc::chain::ChainGetParentReceipts);
80        $callback!($crate::rpc::chain::ChainGetPath);
81        $callback!($crate::rpc::chain::ChainGetTipSet);
82        $callback!($crate::rpc::chain::ChainGetTipSetV2);
83        $callback!($crate::rpc::chain::ChainGetTipSetFinalityStatus);
84        $callback!($crate::rpc::chain::ChainGetTipSetAfterHeight);
85        $callback!($crate::rpc::chain::ChainGetTipSetByHeight);
86        $callback!($crate::rpc::chain::ChainHasObj);
87        $callback!($crate::rpc::chain::ChainHead);
88        $callback!($crate::rpc::chain::ChainReadObj);
89        $callback!($crate::rpc::chain::ChainSetHead);
90        $callback!($crate::rpc::chain::ChainStatObj);
91        $callback!($crate::rpc::chain::ChainTipSetWeight);
92        $callback!($crate::rpc::chain::ForestChainExport);
93        $callback!($crate::rpc::chain::ForestChainExportDiff);
94        $callback!($crate::rpc::chain::ForestChainExportStatus);
95        $callback!($crate::rpc::chain::ForestChainExportCancel);
96        $callback!($crate::rpc::chain::ChainGetTipsetByParentState);
97
98        // common vertical
99        $callback!($crate::rpc::common::Session);
100        $callback!($crate::rpc::common::Shutdown);
101        $callback!($crate::rpc::common::StartTime);
102        $callback!($crate::rpc::common::Version);
103
104        // eth vertical
105        $callback!($crate::rpc::eth::EthAccounts);
106        $callback!($crate::rpc::eth::EthAddressToFilecoinAddress);
107        $callback!($crate::rpc::eth::FilecoinAddressToEthAddress);
108        $callback!($crate::rpc::eth::EthBlockNumber);
109        $callback!($crate::rpc::eth::EthCall);
110        $callback!($crate::rpc::eth::EthChainId);
111        $callback!($crate::rpc::eth::EthEstimateGas);
112        $callback!($crate::rpc::eth::EthFeeHistory);
113        $callback!($crate::rpc::eth::EthGasPrice);
114        $callback!($crate::rpc::eth::EthGetBalance);
115        $callback!($crate::rpc::eth::EthGetBlockByHash);
116        $callback!($crate::rpc::eth::EthGetBlockByNumber);
117        $callback!($crate::rpc::eth::EthGetBlockReceipts);
118        $callback!($crate::rpc::eth::EthGetBlockReceiptsLimited);
119        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByHash);
120        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumber);
121        $callback!($crate::rpc::eth::EthGetCode);
122        $callback!($crate::rpc::eth::EthGetLogs);
123        $callback!($crate::rpc::eth::EthGetFilterLogs);
124        $callback!($crate::rpc::eth::EthGetFilterChanges);
125        $callback!($crate::rpc::eth::EthGetMessageCidByTransactionHash);
126        $callback!($crate::rpc::eth::EthGetStorageAt);
127        $callback!($crate::rpc::eth::EthGetTransactionByHash);
128        $callback!($crate::rpc::eth::EthGetTransactionByHashLimited);
129        $callback!($crate::rpc::eth::EthGetTransactionCount);
130        $callback!($crate::rpc::eth::EthGetTransactionHashByCid);
131        $callback!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndex);
132        $callback!($crate::rpc::eth::EthGetTransactionByBlockHashAndIndex);
133        $callback!($crate::rpc::eth::EthMaxPriorityFeePerGas);
134        $callback!($crate::rpc::eth::EthProtocolVersion);
135        $callback!($crate::rpc::eth::EthGetTransactionReceipt);
136        $callback!($crate::rpc::eth::EthGetTransactionReceiptLimited);
137        $callback!($crate::rpc::eth::EthNewFilter);
138        $callback!($crate::rpc::eth::EthNewPendingTransactionFilter);
139        $callback!($crate::rpc::eth::EthNewBlockFilter);
140        $callback!($crate::rpc::eth::EthUninstallFilter);
141        $callback!($crate::rpc::eth::EthUnsubscribe);
142        $callback!($crate::rpc::eth::EthSubscribe);
143        $callback!($crate::rpc::eth::EthSyncing);
144        $callback!($crate::rpc::eth::EthTraceBlock);
145        $callback!($crate::rpc::eth::EthTraceCall);
146        $callback!($crate::rpc::eth::EthTraceFilter);
147        $callback!($crate::rpc::eth::EthTraceTransaction);
148        $callback!($crate::rpc::eth::EthDebugTraceTransaction);
149        $callback!($crate::rpc::eth::EthTraceReplayBlockTransactions);
150        $callback!($crate::rpc::eth::Web3ClientVersion);
151        $callback!($crate::rpc::eth::EthSendRawTransaction);
152        $callback!($crate::rpc::eth::EthSendRawTransactionUntrusted);
153
154        // gas vertical
155        $callback!($crate::rpc::gas::GasEstimateFeeCap);
156        $callback!($crate::rpc::gas::GasEstimateGasLimit);
157        $callback!($crate::rpc::gas::GasEstimateGasPremium);
158        $callback!($crate::rpc::gas::GasEstimateMessageGas);
159
160        // market vertical
161        $callback!($crate::rpc::market::MarketAddBalance);
162
163        // miner vertical
164        $callback!($crate::rpc::miner::MinerCreateBlock);
165        $callback!($crate::rpc::miner::MinerGetBaseInfo);
166
167        // mpool vertical
168        $callback!($crate::rpc::mpool::MpoolBatchPush);
169        $callback!($crate::rpc::mpool::MpoolBatchPushUntrusted);
170        $callback!($crate::rpc::mpool::MpoolGetNonce);
171        $callback!($crate::rpc::mpool::MpoolPending);
172        $callback!($crate::rpc::mpool::MpoolPush);
173        $callback!($crate::rpc::mpool::MpoolPushMessage);
174        $callback!($crate::rpc::mpool::MpoolPushUntrusted);
175        $callback!($crate::rpc::mpool::MpoolSelect);
176
177        // msig vertical
178        $callback!($crate::rpc::msig::MsigGetAvailableBalance);
179        $callback!($crate::rpc::msig::MsigGetPending);
180        $callback!($crate::rpc::msig::MsigGetVested);
181        $callback!($crate::rpc::msig::MsigGetVestingSchedule);
182
183        // net vertical
184        $callback!($crate::rpc::net::NetAddrsListen);
185        $callback!($crate::rpc::net::NetAgentVersion);
186        $callback!($crate::rpc::net::NetAutoNatStatus);
187        $callback!($crate::rpc::net::NetConnect);
188        $callback!($crate::rpc::net::NetDisconnect);
189        $callback!($crate::rpc::net::NetFindPeer);
190        $callback!($crate::rpc::net::NetInfo);
191        $callback!($crate::rpc::net::NetListening);
192        $callback!($crate::rpc::net::NetPeers);
193        $callback!($crate::rpc::net::NetProtectAdd);
194        $callback!($crate::rpc::net::NetProtectList);
195        $callback!($crate::rpc::net::NetProtectRemove);
196        $callback!($crate::rpc::net::NetVersion);
197        $callback!($crate::rpc::net::NetChainExchange);
198
199        // node vertical
200        $callback!($crate::rpc::node::NodeStatus);
201
202        // state vertical
203        $callback!($crate::rpc::state::StateAccountKey);
204        $callback!($crate::rpc::state::StateCall);
205        $callback!($crate::rpc::state::StateCirculatingSupply);
206        $callback!($crate::rpc::state::ForestStateCompute);
207        $callback!($crate::rpc::state::StateCompute);
208        $callback!($crate::rpc::state::StateDealProviderCollateralBounds);
209        $callback!($crate::rpc::state::StateFetchRoot);
210        $callback!($crate::rpc::state::StateGetActor);
211        $callback!($crate::rpc::state::StateGetActorV2);
212        $callback!($crate::rpc::state::StateGetID);
213        $callback!($crate::rpc::state::StateGetAllAllocations);
214        $callback!($crate::rpc::state::StateGetAllClaims);
215        $callback!($crate::rpc::state::StateGetAllocation);
216        $callback!($crate::rpc::state::StateGetAllocationForPendingDeal);
217        $callback!($crate::rpc::state::StateGetAllocationIdForPendingDeal);
218        $callback!($crate::rpc::state::StateGetAllocations);
219        $callback!($crate::rpc::state::StateGetBeaconEntry);
220        $callback!($crate::rpc::state::StateGetClaim);
221        $callback!($crate::rpc::state::StateGetClaims);
222        $callback!($crate::rpc::state::StateGetNetworkParams);
223        $callback!($crate::rpc::state::StateGetRandomnessDigestFromBeacon);
224        $callback!($crate::rpc::state::StateGetRandomnessDigestFromTickets);
225        $callback!($crate::rpc::state::StateGetRandomnessFromBeacon);
226        $callback!($crate::rpc::state::StateGetRandomnessFromTickets);
227        $callback!($crate::rpc::state::StateGetReceipt);
228        $callback!($crate::rpc::state::StateListActors);
229        $callback!($crate::rpc::state::StateListMessages);
230        $callback!($crate::rpc::state::StateListMiners);
231        $callback!($crate::rpc::state::StateLookupID);
232        $callback!($crate::rpc::state::StateLookupRobustAddress);
233        $callback!($crate::rpc::state::StateMarketBalance);
234        $callback!($crate::rpc::state::StateMarketDeals);
235        $callback!($crate::rpc::state::StateMarketParticipants);
236        $callback!($crate::rpc::state::StateMarketStorageDeal);
237        $callback!($crate::rpc::state::StateMinerActiveSectors);
238        $callback!($crate::rpc::state::StateMinerAllocated);
239        $callback!($crate::rpc::state::StateMinerAvailableBalance);
240        $callback!($crate::rpc::state::StateMinerDeadlines);
241        $callback!($crate::rpc::state::StateMinerFaults);
242        $callback!($crate::rpc::state::StateMinerInfo);
243        $callback!($crate::rpc::state::StateMinerInitialPledgeCollateral);
244        $callback!($crate::rpc::state::StateMinerPartitions);
245        $callback!($crate::rpc::state::StateMinerPower);
246        $callback!($crate::rpc::state::StateMinerPreCommitDepositForPower);
247        $callback!($crate::rpc::state::StateMinerProvingDeadline);
248        $callback!($crate::rpc::state::StateMinerRecoveries);
249        $callback!($crate::rpc::state::StateMinerSectorAllocated);
250        $callback!($crate::rpc::state::StateMinerSectorCount);
251        $callback!($crate::rpc::state::StateMinerSectors);
252        $callback!($crate::rpc::state::StateNetworkName);
253        $callback!($crate::rpc::state::StateNetworkVersion);
254        $callback!($crate::rpc::state::StateActorInfo);
255        $callback!($crate::rpc::state::StateReadState);
256        $callback!($crate::rpc::state::StateDecodeParams);
257        $callback!($crate::rpc::state::StateReplay);
258        $callback!($crate::rpc::state::StateSearchMsg);
259        $callback!($crate::rpc::state::StateSearchMsgLimited);
260        $callback!($crate::rpc::state::StateSectorExpiration);
261        $callback!($crate::rpc::state::StateSectorGetInfo);
262        $callback!($crate::rpc::state::StateSectorPartition);
263        $callback!($crate::rpc::state::StateSectorPreCommitInfo);
264        $callback!($crate::rpc::state::StateSectorPreCommitInfoV0);
265        $callback!($crate::rpc::state::StateVerifiedClientStatus);
266        $callback!($crate::rpc::state::StateVerifiedRegistryRootKey);
267        $callback!($crate::rpc::state::StateVerifierStatus);
268        $callback!($crate::rpc::state::StateVMCirculatingSupplyInternal);
269        $callback!($crate::rpc::state::StateWaitMsg);
270        $callback!($crate::rpc::state::StateWaitMsgV0);
271        $callback!($crate::rpc::state::StateMinerInitialPledgeForSector);
272
273        // sync vertical
274        $callback!($crate::rpc::sync::SyncCheckBad);
275        $callback!($crate::rpc::sync::SyncMarkBad);
276        $callback!($crate::rpc::sync::SyncSnapshotProgress);
277        $callback!($crate::rpc::sync::SyncStatus);
278        $callback!($crate::rpc::sync::SyncSubmitBlock);
279
280        // wallet vertical
281        $callback!($crate::rpc::wallet::WalletBalance);
282        $callback!($crate::rpc::wallet::WalletDefaultAddress);
283        $callback!($crate::rpc::wallet::WalletDelete);
284        $callback!($crate::rpc::wallet::WalletExport);
285        $callback!($crate::rpc::wallet::WalletHas);
286        $callback!($crate::rpc::wallet::WalletImport);
287        $callback!($crate::rpc::wallet::WalletList);
288        $callback!($crate::rpc::wallet::WalletNew);
289        $callback!($crate::rpc::wallet::WalletSetDefault);
290        $callback!($crate::rpc::wallet::WalletSign);
291        $callback!($crate::rpc::wallet::WalletSignMessage);
292        $callback!($crate::rpc::wallet::WalletValidateAddress);
293        $callback!($crate::rpc::wallet::WalletVerify);
294
295        // f3
296        $callback!($crate::rpc::f3::GetRawNetworkName);
297        $callback!($crate::rpc::f3::F3GetCertificate);
298        $callback!($crate::rpc::f3::F3GetECPowerTable);
299        $callback!($crate::rpc::f3::F3GetF3PowerTable);
300        $callback!($crate::rpc::f3::F3GetF3PowerTableByInstance);
301        $callback!($crate::rpc::f3::F3IsRunning);
302        $callback!($crate::rpc::f3::F3GetProgress);
303        $callback!($crate::rpc::f3::F3GetManifest);
304        $callback!($crate::rpc::f3::F3ListParticipants);
305        $callback!($crate::rpc::f3::F3GetLatestCertificate);
306        $callback!($crate::rpc::f3::F3GetOrRenewParticipationTicket);
307        $callback!($crate::rpc::f3::F3Participate);
308        $callback!($crate::rpc::f3::F3ExportLatestSnapshot);
309        $callback!($crate::rpc::f3::GetHead);
310        $callback!($crate::rpc::f3::GetParent);
311        $callback!($crate::rpc::f3::GetParticipatingMinerIDs);
312        $callback!($crate::rpc::f3::GetPowerTable);
313        $callback!($crate::rpc::f3::GetTipset);
314        $callback!($crate::rpc::f3::GetTipsetByEpoch);
315        $callback!($crate::rpc::f3::Finalize);
316        $callback!($crate::rpc::f3::ProtectPeer);
317        $callback!($crate::rpc::f3::SignMessage);
318
319        // misc
320        $callback!($crate::rpc::misc::GetActorEventsRaw);
321    };
322}
323use compression_layer::{COMPRESS_MIN_BODY_SIZE, CompressionLayer};
324pub(crate) use for_each_rpc_method;
325use sync::SnapshotProgressTracker;
326use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
327
328#[allow(unused)]
329/// All handler definitions.
330///
331/// Usage guide:
332/// ```ignore
333/// use crate::rpc::{self, prelude::*};
334///
335/// let client = rpc::Client::from(..);
336/// ChainHead::call(&client, ()).await?;
337/// fn foo() -> rpc::ClientError {..}
338/// fn bar() -> rpc::ServerError {..}
339/// ```
340pub mod prelude {
341    use super::*;
342
343    pub use reflect::RpcMethodExt as _;
344
345    macro_rules! export {
346        ($ty:ty) => {
347            pub use $ty;
348        };
349    }
350
351    for_each_rpc_method!(export);
352}
353
354/// Collects all the RPC method names and permission available in the Forest
355pub fn collect_rpc_method_info() -> Vec<(&'static str, Permission)> {
356    use crate::rpc::RpcMethod;
357
358    let mut methods = Vec::new();
359
360    macro_rules! add_method {
361        ($ty:ty) => {
362            methods.push((<$ty>::NAME, <$ty>::PERMISSION));
363        };
364    }
365
366    for_each_rpc_method!(add_method);
367
368    methods
369}
370
371/// All the methods live in their own folder
372///
373/// # Handling types
374/// - If a `struct` or `enum` is only used in the RPC API, it should live in `src/rpc`.
375///   - If it is used in only one API vertical (i.e `auth` or `chain`), then it should live
376///     in either:
377///     - `src/rpc/methods/auth.rs` (if there are only a few).
378///     - `src/rpc/methods/auth/types.rs` (if there are so many that they would cause clutter).
379///   - If it is used _across_ API verticals, it should live in `src/rpc/types.rs`
380///
381/// # Interactions with the [`lotus_json`] APIs
382/// - Types may have fields which must go through [`LotusJson`],
383///   and MUST reflect that in their [`JsonSchema`].
384///   You have two options for this:
385///   - Use `#[attributes]` to control serialization and schema generation:
386///     ```ignore
387///     #[derive(Deserialize, Serialize, JsonSchema)]
388///     struct Foo {
389///         #[serde(with = "crate::lotus_json")] // perform the conversion
390///         #[schemars(with = "LotusJson<Cid>")] // advertise the schema to be converted
391///         cid: Cid, // use the native type in application logic
392///     }
393///     ```
394///   - Use [`LotusJson`] directly. This means that serialization and the [`JsonSchema`]
395///     will never go out of sync.
396///     ```ignore
397///     #[derive(Deserialize, Serialize, JsonSchema)]
398///     struct Foo {
399///         cid: LotusJson<Cid>, // use the shim type in application logic, manually performing conversions
400///     }
401///     ```
402///
403/// [`lotus_json`]: crate::lotus_json
404/// [`HasLotusJson`]: crate::lotus_json::HasLotusJson
405/// [`LotusJson`]: crate::lotus_json::LotusJson
406/// [`JsonSchema`]: schemars::JsonSchema
407mod methods {
408    pub mod auth;
409    pub mod beacon;
410    pub mod chain;
411    pub mod common;
412    pub mod eth;
413    pub mod f3;
414    pub mod gas;
415    pub mod market;
416    pub mod miner;
417    pub mod misc;
418    pub mod mpool;
419    pub mod msig;
420    pub mod net;
421    pub mod node;
422    pub mod state;
423    pub mod sync;
424    pub mod wallet;
425}
426
427use crate::rpc::auth_layer::AuthLayer;
428pub use crate::rpc::channel::CANCEL_METHOD_NAME;
429use crate::rpc::channel::RpcModule as FilRpcModule;
430use crate::rpc::eth::pubsub::EthPubSub;
431use crate::rpc::metrics_layer::MetricsLayer;
432use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore};
433
434use crate::blocks::FullTipset;
435use fvm_ipld_blockstore::Blockstore;
436use jsonrpsee::{
437    Methods,
438    core::middleware::RpcServiceBuilder,
439    server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
440};
441use parking_lot::RwLock;
442use std::env;
443use std::sync::{Arc, LazyLock};
444use std::time::Duration;
445use tokio::sync::mpsc;
446use tower::Service;
447
448use crate::rpc::sync::SnapshotProgressState;
449use openrpc_types::{self, ParamStructure};
450
451pub const DEFAULT_PORT: u16 = 2345;
452
453/// Request timeout read from environment variables
454static DEFAULT_REQUEST_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
455    env::var("FOREST_RPC_DEFAULT_TIMEOUT")
456        .ok()
457        .and_then(|it| Duration::from_secs(it.parse().ok()?).into())
458        .unwrap_or(Duration::from_secs(60))
459});
460
461/// Default maximum connections for the RPC server. This needs to be high enough to
462/// accommodate the regular usage for RPC providers.
463static DEFAULT_MAX_CONNECTIONS: LazyLock<u32> = LazyLock::new(|| {
464    env::var("FOREST_RPC_MAX_CONNECTIONS")
465        .ok()
466        .and_then(|it| it.parse().ok())
467        .unwrap_or(1000)
468});
469
470const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024;
471const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE;
472
473/// This is where you store persistent data, or at least access to stateful
474/// data.
475pub struct RPCState<DB> {
476    pub keystore: Arc<RwLock<KeyStore>>,
477    pub state_manager: Arc<crate::state_manager::StateManager<DB>>,
478    pub mpool: Arc<crate::message_pool::MessagePool<Arc<crate::chain::ChainStore<DB>>>>,
479    pub bad_blocks: Option<Arc<crate::chain_sync::BadBlockCache>>,
480    pub sync_status: crate::chain_sync::SyncStatus,
481    pub eth_event_handler: Arc<EthEventHandler>,
482    pub sync_network_context: SyncNetworkContext<DB>,
483    pub tipset_send: flume::Sender<FullTipset>,
484    pub start_time: chrono::DateTime<chrono::Utc>,
485    pub snapshot_progress_tracker: SnapshotProgressTracker,
486    pub shutdown: mpsc::Sender<()>,
487    pub mpool_locker: crate::message_pool::MpoolLocker,
488    pub nonce_tracker: crate::message_pool::NonceTracker,
489    pub temp_dir: Arc<std::path::PathBuf>,
490}
491
492impl<DB: Blockstore> RPCState<DB> {
493    pub fn beacon(&self) -> &Arc<crate::beacon::BeaconSchedule> {
494        self.state_manager.beacon_schedule()
495    }
496
497    pub fn chain_store(&self) -> &Arc<crate::chain::ChainStore<DB>> {
498        self.state_manager.chain_store()
499    }
500
501    pub fn chain_index(&self) -> &crate::chain::index::ChainIndex<DB> {
502        self.chain_store().chain_index()
503    }
504
505    pub fn chain_config(&self) -> &Arc<crate::networks::ChainConfig> {
506        self.state_manager.chain_config()
507    }
508
509    pub fn store(&self) -> &Arc<DB> {
510        self.chain_store().blockstore()
511    }
512
513    pub fn store_owned(&self) -> Arc<DB> {
514        self.state_manager.blockstore_owned()
515    }
516
517    pub fn network_send(&self) -> &flume::Sender<crate::libp2p::NetworkMessage> {
518        self.sync_network_context.network_send()
519    }
520
521    pub fn get_snapshot_progress_tracker(&self) -> SnapshotProgressState {
522        self.snapshot_progress_tracker.state()
523    }
524}
525
526#[derive(Clone)]
527struct PerConnection<RpcMiddleware, HttpMiddleware> {
528    stop_handle: StopHandle,
529    svc_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
530    keystore: Arc<RwLock<KeyStore>>,
531}
532
533pub async fn start_rpc<DB>(
534    state: RPCState<DB>,
535    rpc_listener: tokio::net::TcpListener,
536    stop_handle: StopHandle,
537    filter_list: Option<FilterList>,
538) -> anyhow::Result<()>
539where
540    DB: Blockstore + Send + Sync + 'static,
541{
542    let filter_list = filter_list.unwrap_or_default();
543    // `Arc` is needed because we will share the state between two modules
544    let state = Arc::new(state);
545    let keystore = state.keystore.clone();
546    let mut modules = create_modules(state.clone());
547
548    let mut pubsub_module = FilRpcModule::default();
549    pubsub_module.register_channel("Filecoin.ChainNotify", {
550        let state_clone = state.clone();
551        move |params| chain::chain_notify(params, &state_clone)
552    })?;
553
554    for module in modules.values_mut() {
555        // register eth subscription APIs
556        module.merge(EthPubSub::new(state.clone()).into_rpc())?;
557        module.merge(pubsub_module.clone())?;
558    }
559
560    let methods: Arc<HashMap<ApiPaths, Methods>> =
561        Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());
562
563    let per_conn = PerConnection {
564        stop_handle: stop_handle.clone(),
565        svc_builder: Server::builder()
566            .set_config(
567                ServerConfig::builder()
568                    // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
569                    .max_request_body_size(MAX_REQUEST_BODY_SIZE)
570                    .max_response_body_size(MAX_RESPONSE_BODY_SIZE)
571                    .max_connections(*DEFAULT_MAX_CONNECTIONS)
572                    .set_id_provider(RandomHexStringIdProvider::new())
573                    .build(),
574            )
575            .set_http_middleware(
576                tower::ServiceBuilder::new()
577                    .option_layer(COMPRESS_MIN_BODY_SIZE.map(CompressionLayer::new))
578                    // Mark the `Authorization` request header as sensitive so it doesn't show in logs
579                    .layer(SetSensitiveRequestHeadersLayer::new(std::iter::once(
580                        http::header::AUTHORIZATION,
581                    ))),
582            )
583            .to_service_builder(),
584        keystore,
585    };
586    tracing::info!("Ready for RPC connections");
587    loop {
588        let sock = tokio::select! {
589        res = rpc_listener.accept() => {
590            match res {
591              Ok((stream, _remote_addr)) => stream,
592              Err(e) => {
593                tracing::error!("failed to accept v4 connection: {:?}", e);
594                continue;
595              }
596            }
597          }
598          _ = per_conn.stop_handle.clone().shutdown() => break,
599        };
600
601        let svc = tower::service_fn({
602            let methods = methods.clone();
603            let per_conn = per_conn.clone();
604            let filter_list = filter_list.clone();
605            move |req| {
606                let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
607                let path = if let Ok(p) = ApiPaths::from_uri(req.uri()) {
608                    p
609                } else {
610                    return async move {
611                        Ok(http::Response::builder()
612                            .status(http::StatusCode::NOT_FOUND)
613                            .body(Default::default())
614                            .unwrap_or_else(|_| http::Response::new(Default::default())))
615                    }
616                    .boxed();
617                };
618                let methods = methods.get(&path).cloned().unwrap_or_default();
619                let PerConnection {
620                    stop_handle,
621                    svc_builder,
622                    keystore,
623                } = per_conn.clone();
624                // NOTE, the rpc middleware must be initialized here to be able to be created once per connection
625                // with data from the connection such as the headers in this example
626                let headers = req.headers().clone();
627                let rpc_middleware = RpcServiceBuilder::new()
628                    .layer(SetExtensionLayer { path })
629                    .layer(SegregationLayer)
630                    .layer(FilterLayer::new(filter_list.clone()))
631                    .layer(validation_layer::JsonValidationLayer)
632                    .layer(AuthLayer {
633                        headers,
634                        keystore: keystore.clone(),
635                    })
636                    .layer(LogLayer::default())
637                    .layer(MetricsLayer::default());
638                let mut jsonrpsee_svc = svc_builder
639                    .set_rpc_middleware(rpc_middleware)
640                    .build(methods, stop_handle);
641
642                if is_websocket {
643                    // Utilize the session close future to know when the actual WebSocket
644                    // session was closed.
645                    let session_close = jsonrpsee_svc.on_session_closed();
646
647                    // A little bit weird API but the response to HTTP request must be returned below
648                    // and we spawn a task to register when the session is closed.
649                    tokio::spawn(async move {
650                        session_close.await;
651                        tracing::trace!("Closed WebSocket connection");
652                    });
653
654                    async move {
655                        tracing::trace!("Opened WebSocket connection");
656                        // https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
657                        // to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to a concrete type
658                        // as workaround.
659                        jsonrpsee_svc
660                            .call(req)
661                            .await
662                            .map_err(|e| anyhow::anyhow!("{:?}", e))
663                    }
664                    .boxed()
665                } else {
666                    // HTTP.
667                    async move {
668                        tracing::trace!("Opened HTTP connection");
669                        let rp = jsonrpsee_svc.call(req).await;
670                        tracing::trace!("Closed HTTP connection");
671                        // https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
672                        // to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to a concrete type
673                        // as workaround.
674                        rp.map_err(|e| anyhow::anyhow!("{:?}", e))
675                    }
676                    .boxed()
677                }
678            }
679        });
680
681        tokio::spawn(jsonrpsee::server::serve_with_graceful_shutdown(
682            sock,
683            svc,
684            stop_handle.clone().shutdown(),
685        ));
686    }
687
688    Ok(())
689}
690
691fn create_modules<DB>(state: Arc<RPCState<DB>>) -> HashMap<ApiPaths, RpcModule<RPCState<DB>>>
692where
693    DB: Blockstore + Send + Sync + 'static,
694{
695    let mut modules = HashMap::default();
696    for api_version in ApiPaths::value_variants() {
697        modules.insert(*api_version, RpcModule::from_arc(state.clone()));
698    }
699    macro_rules! register {
700        ($ty:ty) => {
701            // Register only non-subscription RPC methods.
702            // Subscription methods are registered separately in the RPC module.
703            if !<$ty>::SUBSCRIPTION {
704                <$ty>::register(&mut modules, ParamStructure::ByPosition).unwrap();
705            }
706        };
707    }
708    for_each_rpc_method!(register);
709    modules
710}
711
712/// If `include` is not [`None`], only methods that are listed will be returned
713pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenRPC {
714    use schemars::generate::{SchemaGenerator, SchemaSettings};
715
716    let mut methods = vec![];
717    // spec says draft07
718    let mut settings = SchemaSettings::draft07();
719    // ..but uses `components`
720    settings.definitions_path = "#/components/schemas/".into();
721    let mut generator = SchemaGenerator::new(settings);
722    macro_rules! callback {
723        ($ty:ty) => {
724            if <$ty>::API_PATHS.contains(path) {
725                match include {
726                    Some(include) => match include.contains(&<$ty>::NAME) {
727                        true => {
728                            methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
729                                &mut generator,
730                                ParamStructure::ByPosition,
731                                &<$ty>::NAME,
732                            )));
733                            if let Some(alias) = &<$ty>::NAME_ALIAS {
734                                methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
735                                    &mut generator,
736                                    ParamStructure::ByPosition,
737                                    &alias,
738                                )));
739                            }
740                        }
741                        false => {}
742                    },
743                    None => {
744                        methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
745                            &mut generator,
746                            ParamStructure::ByPosition,
747                            &<$ty>::NAME,
748                        )));
749                        if let Some(alias) = &<$ty>::NAME_ALIAS {
750                            methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
751                                &mut generator,
752                                ParamStructure::ByPosition,
753                                &alias,
754                            )));
755                        }
756                    }
757                }
758            }
759        };
760    }
761    for_each_rpc_method!(callback);
762    openrpc_types::OpenRPC {
763        methods,
764        components: Some(openrpc_types::Components {
765            schemas: Some(
766                generator
767                    .take_definitions(false)
768                    .into_iter()
769                    .filter_map(|(k, v)| {
770                        if let Ok(v) = Schema::try_from(v) {
771                            Some((k, v))
772                        } else {
773                            None
774                        }
775                    })
776                    .collect(),
777            ),
778            ..Default::default()
779        }),
780        openrpc: openrpc_types::OPEN_RPC_SPECIFICATION_VERSION,
781        info: openrpc_types::Info {
782            title: String::from("forest"),
783            version: env!("CARGO_PKG_VERSION").into(),
784            ..Default::default()
785        },
786        ..Default::default()
787    }
788}
789
790#[cfg(test)]
791mod tests {
792    use super::*;
793    use crate::{
794        db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
795        tool::offline_server::server::offline_rpc_state,
796    };
797    use jsonrpsee::server::stop_channel;
798    use std::net::{Ipv4Addr, SocketAddr};
799    use tokio::task::JoinSet;
800
801    // To update RPC specs:
802    // `cargo test --lib -- rpc::tests::openrpc`
803    // `cargo insta review`
804
805    #[test]
806    fn openrpc_v0() {
807        openrpc(ApiPaths::V0);
808    }
809
810    #[test]
811    fn openrpc_v1() {
812        openrpc(ApiPaths::V1);
813    }
814
815    #[test]
816    fn openrpc_v2() {
817        openrpc(ApiPaths::V2);
818    }
819
820    fn openrpc(path: ApiPaths) {
821        let spec = super::openrpc(path, None);
822        insta::assert_yaml_snapshot!(path.path(), spec);
823    }
824
825    #[test]
826    fn test_rpc_server() {
827        const TIMEOUT: Duration = Duration::from_secs(5);
828        let (done_tx, done_rx) = flume::bounded(1);
829        let rt = tokio::runtime::Builder::new_multi_thread()
830            .enable_all()
831            .build()
832            .unwrap();
833        rt.block_on(async move { test_rpc_server_inner(done_tx).await });
834        done_rx.recv().unwrap();
835        // To mitigate the transient timeout issue
836        rt.shutdown_timeout(TIMEOUT);
837    }
838
839    async fn test_rpc_server_inner(done_tx: flume::Sender<()>) {
840        let chain = NetworkChain::Calibnet;
841        let db = Arc::new(MemoryDB::default());
842        let mut services = JoinSet::new();
843        let (state, mut shutdown_recv) = offline_rpc_state(chain, db, None, None, &mut services)
844            .await
845            .unwrap();
846        let block_delay_secs = state.chain_config().block_delay_secs;
847        let shutdown_send = state.shutdown.clone();
848        let jwt_read_permissions = vec!["read".to_owned()];
849        let jwt_read = super::methods::auth::AuthNew::create_token(
850            &state.keystore.read(),
851            chrono::Duration::hours(1),
852            jwt_read_permissions.clone(),
853        )
854        .unwrap();
855        let rpc_listener =
856            tokio::net::TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
857                .await
858                .unwrap();
859        let rpc_address = rpc_listener.local_addr().unwrap();
860        let (stop_handle, server_handle) = stop_channel();
861
862        // Start an RPC server
863
864        let handle = tokio::spawn(start_rpc(state, rpc_listener, stop_handle, None));
865
866        println!("sending a few http requests");
867
868        let client = Client::from_url(
869            format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
870                .parse()
871                .unwrap(),
872        );
873
874        let response = super::methods::common::Version::call(&client, ())
875            .await
876            .unwrap();
877        assert_eq!(
878            &response.version,
879            &*crate::utils::version::FOREST_VERSION_STRING
880        );
881        assert_eq!(response.block_delay, block_delay_secs);
882        assert_eq!(response.api_version, ShiftingVersion::new(2, 3, 0));
883
884        let response = super::methods::auth::AuthVerify::call(&client, (jwt_read.clone(),))
885            .await
886            .unwrap();
887        assert_eq!(response, jwt_read_permissions);
888
889        drop(client);
890
891        println!("sending a few websocket requests");
892
893        let client = Client::from_url(
894            format!("ws://{}:{}/", rpc_address.ip(), rpc_address.port())
895                .parse()
896                .unwrap(),
897        );
898
899        let response = super::methods::auth::AuthVerify::call(&client, (jwt_read,))
900            .await
901            .unwrap();
902        assert_eq!(response, jwt_read_permissions);
903
904        drop(client);
905
906        // Gracefully shutdown the RPC server
907        println!("sending shutdown signal");
908        shutdown_send.send(()).await.unwrap();
909        println!("waiting on shutdown receiver");
910        shutdown_recv.recv().await;
911        println!("sending server stop signal");
912        server_handle.stop().unwrap();
913        println!("waiting on graceful shutdown");
914        handle.await.unwrap().unwrap();
915        println!("done");
916        done_tx.send(()).unwrap();
917    }
918}