Skip to main content

forest/rpc/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer;
5mod auth_layer;
6mod channel;
7mod client;
8mod filter_layer;
9mod filter_list;
10pub mod json_validator;
11mod log_layer;
12mod metrics_layer;
13mod request;
14mod segregation_layer;
15mod set_extension_layer;
16mod validation_layer;
17
18use crate::rpc::eth::types::RandomHexStringIdProvider;
19use crate::shim::clock::ChainEpoch;
20use clap::ValueEnum as _;
21pub use client::Client;
22pub use error::ServerError;
23use eth::filter::EthEventHandler;
24use filter_layer::FilterLayer;
25pub use filter_list::FilterList;
26use futures::FutureExt as _;
27use jsonrpsee::server::ServerConfig;
28use log_layer::LogLayer;
29use reflect::Ctx;
30pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt};
31pub use request::Request;
32use schemars::Schema;
33use segregation_layer::SegregationLayer;
34use set_extension_layer::SetExtensionLayer;
35mod error;
36mod reflect;
37use ahash::HashMap;
38mod registry;
39pub mod types;
40
41pub use methods::*;
42
43/// Protocol or transport-specific error
44pub use jsonrpsee::core::ClientError;
45
46/// Sentinel value, indicating no limit on how far back to search in the chain (all the way to genesis epoch).
47pub const LOOKBACK_NO_LIMIT: ChainEpoch = -1;
48
49/// The macro `callback` will be passed in each type that implements
50/// [`RpcMethod`].
51///
52/// This is a macro because there is no way to abstract the `ARITY` on that
53/// trait.
54///
55/// All methods should be entered here.
56#[macro_export]
57macro_rules! for_each_rpc_method {
58    ($callback:path) => {
59        // auth vertical
60        $callback!($crate::rpc::auth::AuthNew);
61        $callback!($crate::rpc::auth::AuthVerify);
62
63        // beacon vertical
64        $callback!($crate::rpc::beacon::BeaconGetEntry);
65
66        // chain vertical
67        $callback!($crate::rpc::chain::ChainPruneSnapshot);
68        $callback!($crate::rpc::chain::ChainExport);
69        $callback!($crate::rpc::chain::ChainGetBlock);
70        $callback!($crate::rpc::chain::ChainGetBlockMessages);
71        $callback!($crate::rpc::chain::ChainGetEvents);
72        $callback!($crate::rpc::chain::ChainGetGenesis);
73        $callback!($crate::rpc::chain::ChainGetFinalizedTipset);
74        $callback!($crate::rpc::chain::ChainGetMessage);
75        $callback!($crate::rpc::chain::ChainGetMessagesInTipset);
76        $callback!($crate::rpc::chain::ChainGetMinBaseFee);
77        $callback!($crate::rpc::chain::ChainGetParentMessages);
78        $callback!($crate::rpc::chain::ChainGetParentReceipts);
79        $callback!($crate::rpc::chain::ChainGetPath);
80        $callback!($crate::rpc::chain::ChainGetTipSet);
81        $callback!($crate::rpc::chain::ChainGetTipSetV2);
82        $callback!($crate::rpc::chain::ChainGetTipSetAfterHeight);
83        $callback!($crate::rpc::chain::ChainGetTipSetByHeight);
84        $callback!($crate::rpc::chain::ChainHasObj);
85        $callback!($crate::rpc::chain::ChainHead);
86        $callback!($crate::rpc::chain::ChainReadObj);
87        $callback!($crate::rpc::chain::ChainSetHead);
88        $callback!($crate::rpc::chain::ChainStatObj);
89        $callback!($crate::rpc::chain::ChainTipSetWeight);
90        $callback!($crate::rpc::chain::ForestChainExport);
91        $callback!($crate::rpc::chain::ForestChainExportDiff);
92        $callback!($crate::rpc::chain::ForestChainExportStatus);
93        $callback!($crate::rpc::chain::ForestChainExportCancel);
94        $callback!($crate::rpc::chain::ChainGetTipsetByParentState);
95
96        // common vertical
97        $callback!($crate::rpc::common::Session);
98        $callback!($crate::rpc::common::Shutdown);
99        $callback!($crate::rpc::common::StartTime);
100        $callback!($crate::rpc::common::Version);
101
102        // eth vertical
103        $callback!($crate::rpc::eth::EthAccounts);
104        $callback!($crate::rpc::eth::EthAddressToFilecoinAddress);
105        $callback!($crate::rpc::eth::FilecoinAddressToEthAddress);
106        $callback!($crate::rpc::eth::EthBlockNumber);
107        $callback!($crate::rpc::eth::EthCall);
108        $callback!($crate::rpc::eth::EthCallV2);
109        $callback!($crate::rpc::eth::EthChainId);
110        $callback!($crate::rpc::eth::EthEstimateGas);
111        $callback!($crate::rpc::eth::EthEstimateGasV2);
112        $callback!($crate::rpc::eth::EthFeeHistory);
113        $callback!($crate::rpc::eth::EthFeeHistoryV2);
114        $callback!($crate::rpc::eth::EthGasPrice);
115        $callback!($crate::rpc::eth::EthGetBalance);
116        $callback!($crate::rpc::eth::EthGetBalanceV2);
117        $callback!($crate::rpc::eth::EthGetBlockByHash);
118        $callback!($crate::rpc::eth::EthGetBlockByNumber);
119        $callback!($crate::rpc::eth::EthGetBlockByNumberV2);
120        $callback!($crate::rpc::eth::EthGetBlockReceipts);
121        $callback!($crate::rpc::eth::EthGetBlockReceiptsV2);
122        $callback!($crate::rpc::eth::EthGetBlockReceiptsLimited);
123        $callback!($crate::rpc::eth::EthGetBlockReceiptsLimitedV2);
124        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByHash);
125        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumber);
126        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumberV2);
127        $callback!($crate::rpc::eth::EthGetCode);
128        $callback!($crate::rpc::eth::EthGetCodeV2);
129        $callback!($crate::rpc::eth::EthGetLogs);
130        $callback!($crate::rpc::eth::EthGetFilterLogs);
131        $callback!($crate::rpc::eth::EthGetFilterChanges);
132        $callback!($crate::rpc::eth::EthGetMessageCidByTransactionHash);
133        $callback!($crate::rpc::eth::EthGetStorageAt);
134        $callback!($crate::rpc::eth::EthGetStorageAtV2);
135        $callback!($crate::rpc::eth::EthGetTransactionByHash);
136        $callback!($crate::rpc::eth::EthGetTransactionByHashLimited);
137        $callback!($crate::rpc::eth::EthGetTransactionCount);
138        $callback!($crate::rpc::eth::EthGetTransactionCountV2);
139        $callback!($crate::rpc::eth::EthGetTransactionHashByCid);
140        $callback!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndex);
141        $callback!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndexV2);
142        $callback!($crate::rpc::eth::EthGetTransactionByBlockHashAndIndex);
143        $callback!($crate::rpc::eth::EthMaxPriorityFeePerGas);
144        $callback!($crate::rpc::eth::EthProtocolVersion);
145        $callback!($crate::rpc::eth::EthGetTransactionReceipt);
146        $callback!($crate::rpc::eth::EthGetTransactionReceiptLimited);
147        $callback!($crate::rpc::eth::EthNewFilter);
148        $callback!($crate::rpc::eth::EthNewPendingTransactionFilter);
149        $callback!($crate::rpc::eth::EthNewBlockFilter);
150        $callback!($crate::rpc::eth::EthUninstallFilter);
151        $callback!($crate::rpc::eth::EthUnsubscribe);
152        $callback!($crate::rpc::eth::EthSubscribe);
153        $callback!($crate::rpc::eth::EthSyncing);
154        $callback!($crate::rpc::eth::EthTraceBlock);
155        $callback!($crate::rpc::eth::EthTraceBlockV2);
156        $callback!($crate::rpc::eth::EthTraceCall);
157        $callback!($crate::rpc::eth::EthTraceFilter);
158        $callback!($crate::rpc::eth::EthTraceFilterV2);
159        $callback!($crate::rpc::eth::EthTraceTransaction);
160        $callback!($crate::rpc::eth::EthTraceReplayBlockTransactions);
161        $callback!($crate::rpc::eth::EthTraceReplayBlockTransactionsV2);
162        $callback!($crate::rpc::eth::Web3ClientVersion);
163        $callback!($crate::rpc::eth::EthSendRawTransaction);
164        $callback!($crate::rpc::eth::EthSendRawTransactionUntrusted);
165
166        // gas vertical
167        $callback!($crate::rpc::gas::GasEstimateFeeCap);
168        $callback!($crate::rpc::gas::GasEstimateGasLimit);
169        $callback!($crate::rpc::gas::GasEstimateGasPremium);
170        $callback!($crate::rpc::gas::GasEstimateMessageGas);
171
172        // market vertical
173        $callback!($crate::rpc::market::MarketAddBalance);
174
175        // miner vertical
176        $callback!($crate::rpc::miner::MinerCreateBlock);
177        $callback!($crate::rpc::miner::MinerGetBaseInfo);
178
179        // mpool vertical
180        $callback!($crate::rpc::mpool::MpoolBatchPush);
181        $callback!($crate::rpc::mpool::MpoolBatchPushUntrusted);
182        $callback!($crate::rpc::mpool::MpoolGetNonce);
183        $callback!($crate::rpc::mpool::MpoolPending);
184        $callback!($crate::rpc::mpool::MpoolPush);
185        $callback!($crate::rpc::mpool::MpoolPushMessage);
186        $callback!($crate::rpc::mpool::MpoolPushUntrusted);
187        $callback!($crate::rpc::mpool::MpoolSelect);
188
189        // msig vertical
190        $callback!($crate::rpc::msig::MsigGetAvailableBalance);
191        $callback!($crate::rpc::msig::MsigGetPending);
192        $callback!($crate::rpc::msig::MsigGetVested);
193        $callback!($crate::rpc::msig::MsigGetVestingSchedule);
194
195        // net vertical
196        $callback!($crate::rpc::net::NetAddrsListen);
197        $callback!($crate::rpc::net::NetAgentVersion);
198        $callback!($crate::rpc::net::NetAutoNatStatus);
199        $callback!($crate::rpc::net::NetConnect);
200        $callback!($crate::rpc::net::NetDisconnect);
201        $callback!($crate::rpc::net::NetFindPeer);
202        $callback!($crate::rpc::net::NetInfo);
203        $callback!($crate::rpc::net::NetListening);
204        $callback!($crate::rpc::net::NetPeers);
205        $callback!($crate::rpc::net::NetProtectAdd);
206        $callback!($crate::rpc::net::NetProtectList);
207        $callback!($crate::rpc::net::NetProtectRemove);
208        $callback!($crate::rpc::net::NetVersion);
209
210        // node vertical
211        $callback!($crate::rpc::node::NodeStatus);
212
213        // state vertical
214        $callback!($crate::rpc::state::StateAccountKey);
215        $callback!($crate::rpc::state::StateCall);
216        $callback!($crate::rpc::state::StateCirculatingSupply);
217        $callback!($crate::rpc::state::ForestStateCompute);
218        $callback!($crate::rpc::state::StateCompute);
219        $callback!($crate::rpc::state::StateDealProviderCollateralBounds);
220        $callback!($crate::rpc::state::StateFetchRoot);
221        $callback!($crate::rpc::state::StateGetActor);
222        $callback!($crate::rpc::state::StateGetActorV2);
223        $callback!($crate::rpc::state::StateGetID);
224        $callback!($crate::rpc::state::StateGetAllAllocations);
225        $callback!($crate::rpc::state::StateGetAllClaims);
226        $callback!($crate::rpc::state::StateGetAllocation);
227        $callback!($crate::rpc::state::StateGetAllocationForPendingDeal);
228        $callback!($crate::rpc::state::StateGetAllocationIdForPendingDeal);
229        $callback!($crate::rpc::state::StateGetAllocations);
230        $callback!($crate::rpc::state::StateGetBeaconEntry);
231        $callback!($crate::rpc::state::StateGetClaim);
232        $callback!($crate::rpc::state::StateGetClaims);
233        $callback!($crate::rpc::state::StateGetNetworkParams);
234        $callback!($crate::rpc::state::StateGetRandomnessDigestFromBeacon);
235        $callback!($crate::rpc::state::StateGetRandomnessDigestFromTickets);
236        $callback!($crate::rpc::state::StateGetRandomnessFromBeacon);
237        $callback!($crate::rpc::state::StateGetRandomnessFromTickets);
238        $callback!($crate::rpc::state::StateGetReceipt);
239        $callback!($crate::rpc::state::StateListActors);
240        $callback!($crate::rpc::state::StateListMessages);
241        $callback!($crate::rpc::state::StateListMiners);
242        $callback!($crate::rpc::state::StateLookupID);
243        $callback!($crate::rpc::state::StateLookupRobustAddress);
244        $callback!($crate::rpc::state::StateMarketBalance);
245        $callback!($crate::rpc::state::StateMarketDeals);
246        $callback!($crate::rpc::state::StateMarketParticipants);
247        $callback!($crate::rpc::state::StateMarketStorageDeal);
248        $callback!($crate::rpc::state::StateMinerActiveSectors);
249        $callback!($crate::rpc::state::StateMinerAllocated);
250        $callback!($crate::rpc::state::StateMinerAvailableBalance);
251        $callback!($crate::rpc::state::StateMinerDeadlines);
252        $callback!($crate::rpc::state::StateMinerFaults);
253        $callback!($crate::rpc::state::StateMinerInfo);
254        $callback!($crate::rpc::state::StateMinerInitialPledgeCollateral);
255        $callback!($crate::rpc::state::StateMinerPartitions);
256        $callback!($crate::rpc::state::StateMinerPower);
257        $callback!($crate::rpc::state::StateMinerPreCommitDepositForPower);
258        $callback!($crate::rpc::state::StateMinerProvingDeadline);
259        $callback!($crate::rpc::state::StateMinerRecoveries);
260        $callback!($crate::rpc::state::StateMinerSectorAllocated);
261        $callback!($crate::rpc::state::StateMinerSectorCount);
262        $callback!($crate::rpc::state::StateMinerSectors);
263        $callback!($crate::rpc::state::StateNetworkName);
264        $callback!($crate::rpc::state::StateNetworkVersion);
265        $callback!($crate::rpc::state::StateActorInfo);
266        $callback!($crate::rpc::state::StateReadState);
267        $callback!($crate::rpc::state::StateDecodeParams);
268        $callback!($crate::rpc::state::StateReplay);
269        $callback!($crate::rpc::state::StateSearchMsg);
270        $callback!($crate::rpc::state::StateSearchMsgLimited);
271        $callback!($crate::rpc::state::StateSectorExpiration);
272        $callback!($crate::rpc::state::StateSectorGetInfo);
273        $callback!($crate::rpc::state::StateSectorPartition);
274        $callback!($crate::rpc::state::StateSectorPreCommitInfo);
275        $callback!($crate::rpc::state::StateSectorPreCommitInfoV0);
276        $callback!($crate::rpc::state::StateVerifiedClientStatus);
277        $callback!($crate::rpc::state::StateVerifiedRegistryRootKey);
278        $callback!($crate::rpc::state::StateVerifierStatus);
279        $callback!($crate::rpc::state::StateVMCirculatingSupplyInternal);
280        $callback!($crate::rpc::state::StateWaitMsg);
281        $callback!($crate::rpc::state::StateWaitMsgV0);
282        $callback!($crate::rpc::state::StateMinerInitialPledgeForSector);
283
284        // sync vertical
285        $callback!($crate::rpc::sync::SyncCheckBad);
286        $callback!($crate::rpc::sync::SyncMarkBad);
287        $callback!($crate::rpc::sync::SyncSnapshotProgress);
288        $callback!($crate::rpc::sync::SyncStatus);
289        $callback!($crate::rpc::sync::SyncSubmitBlock);
290
291        // wallet vertical
292        $callback!($crate::rpc::wallet::WalletBalance);
293        $callback!($crate::rpc::wallet::WalletDefaultAddress);
294        $callback!($crate::rpc::wallet::WalletDelete);
295        $callback!($crate::rpc::wallet::WalletExport);
296        $callback!($crate::rpc::wallet::WalletHas);
297        $callback!($crate::rpc::wallet::WalletImport);
298        $callback!($crate::rpc::wallet::WalletList);
299        $callback!($crate::rpc::wallet::WalletNew);
300        $callback!($crate::rpc::wallet::WalletSetDefault);
301        $callback!($crate::rpc::wallet::WalletSign);
302        $callback!($crate::rpc::wallet::WalletSignMessage);
303        $callback!($crate::rpc::wallet::WalletValidateAddress);
304        $callback!($crate::rpc::wallet::WalletVerify);
305
306        // f3
307        $callback!($crate::rpc::f3::GetRawNetworkName);
308        $callback!($crate::rpc::f3::F3GetCertificate);
309        $callback!($crate::rpc::f3::F3GetECPowerTable);
310        $callback!($crate::rpc::f3::F3GetF3PowerTable);
311        $callback!($crate::rpc::f3::F3GetF3PowerTableByInstance);
312        $callback!($crate::rpc::f3::F3IsRunning);
313        $callback!($crate::rpc::f3::F3GetProgress);
314        $callback!($crate::rpc::f3::F3GetManifest);
315        $callback!($crate::rpc::f3::F3ListParticipants);
316        $callback!($crate::rpc::f3::F3GetLatestCertificate);
317        $callback!($crate::rpc::f3::F3GetOrRenewParticipationTicket);
318        $callback!($crate::rpc::f3::F3Participate);
319        $callback!($crate::rpc::f3::F3ExportLatestSnapshot);
320        $callback!($crate::rpc::f3::GetHead);
321        $callback!($crate::rpc::f3::GetParent);
322        $callback!($crate::rpc::f3::GetParticipatingMinerIDs);
323        $callback!($crate::rpc::f3::GetPowerTable);
324        $callback!($crate::rpc::f3::GetTipset);
325        $callback!($crate::rpc::f3::GetTipsetByEpoch);
326        $callback!($crate::rpc::f3::Finalize);
327        $callback!($crate::rpc::f3::ProtectPeer);
328        $callback!($crate::rpc::f3::SignMessage);
329
330        // misc
331        $callback!($crate::rpc::misc::GetActorEventsRaw);
332    };
333}
334pub(crate) use for_each_rpc_method;
335use sync::SnapshotProgressTracker;
336use tower_http::compression::CompressionLayer;
337use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
338
339#[allow(unused)]
340/// All handler definitions.
341///
342/// Usage guide:
343/// ```ignore
344/// use crate::rpc::{self, prelude::*};
345///
346/// let client = rpc::Client::from(..);
347/// ChainHead::call(&client, ()).await?;
348/// fn foo() -> rpc::ClientError {..}
349/// fn bar() -> rpc::ServerError {..}
350/// ```
351pub mod prelude {
352    use super::*;
353
354    pub use reflect::RpcMethodExt as _;
355
356    macro_rules! export {
357        ($ty:ty) => {
358            pub use $ty;
359        };
360    }
361
362    for_each_rpc_method!(export);
363}
364
365/// Collects all the RPC method names and permission available in the Forest
366pub fn collect_rpc_method_info() -> Vec<(&'static str, Permission)> {
367    use crate::rpc::RpcMethod;
368
369    let mut methods = Vec::new();
370
371    macro_rules! add_method {
372        ($ty:ty) => {
373            methods.push((<$ty>::NAME, <$ty>::PERMISSION));
374        };
375    }
376
377    for_each_rpc_method!(add_method);
378
379    methods
380}
381
382/// All the methods live in their own folder
383///
384/// # Handling types
385/// - If a `struct` or `enum` is only used in the RPC API, it should live in `src/rpc`.
386///   - If it is used in only one API vertical (i.e `auth` or `chain`), then it should live
387///     in either:
388///     - `src/rpc/methods/auth.rs` (if there are only a few).
389///     - `src/rpc/methods/auth/types.rs` (if there are so many that they would cause clutter).
390///   - If it is used _across_ API verticals, it should live in `src/rpc/types.rs`
391///
392/// # Interactions with the [`lotus_json`] APIs
393/// - Types may have fields which must go through [`LotusJson`],
394///   and MUST reflect that in their [`JsonSchema`].
395///   You have two options for this:
396///   - Use `#[attributes]` to control serialization and schema generation:
397///     ```ignore
398///     #[derive(Deserialize, Serialize, JsonSchema)]
399///     struct Foo {
400///         #[serde(with = "crate::lotus_json")] // perform the conversion
401///         #[schemars(with = "LotusJson<Cid>")] // advertise the schema to be converted
402///         cid: Cid, // use the native type in application logic
403///     }
404///     ```
405///   - Use [`LotusJson`] directly. This means that serialization and the [`JsonSchema`]
406///     will never go out of sync.
407///     ```ignore
408///     #[derive(Deserialize, Serialize, JsonSchema)]
409///     struct Foo {
410///         cid: LotusJson<Cid>, // use the shim type in application logic, manually performing conversions
411///     }
412///     ```
413///
414/// [`lotus_json`]: crate::lotus_json
415/// [`HasLotusJson`]: crate::lotus_json::HasLotusJson
416/// [`LotusJson`]: crate::lotus_json::LotusJson
417/// [`JsonSchema`]: schemars::JsonSchema
418mod methods {
419    pub mod auth;
420    pub mod beacon;
421    pub mod chain;
422    pub mod common;
423    pub mod eth;
424    pub mod f3;
425    pub mod gas;
426    pub mod market;
427    pub mod miner;
428    pub mod misc;
429    pub mod mpool;
430    pub mod msig;
431    pub mod net;
432    pub mod node;
433    pub mod state;
434    pub mod sync;
435    pub mod wallet;
436}
437
438use crate::rpc::auth_layer::AuthLayer;
439pub use crate::rpc::channel::CANCEL_METHOD_NAME;
440use crate::rpc::channel::RpcModule as FilRpcModule;
441use crate::rpc::eth::pubsub::EthPubSub;
442use crate::rpc::metrics_layer::MetricsLayer;
443use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore};
444
445use crate::blocks::FullTipset;
446use fvm_ipld_blockstore::Blockstore;
447use jsonrpsee::{
448    Methods,
449    core::middleware::RpcServiceBuilder,
450    server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
451};
452use parking_lot::RwLock;
453use std::env;
454use std::sync::{Arc, LazyLock};
455use std::time::Duration;
456use tokio::sync::mpsc;
457use tower::Service;
458
459use crate::rpc::sync::SnapshotProgressState;
460use openrpc_types::{self, ParamStructure};
461
462pub const DEFAULT_PORT: u16 = 2345;
463
464/// Request timeout read from environment variables
465static DEFAULT_REQUEST_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
466    env::var("FOREST_RPC_DEFAULT_TIMEOUT")
467        .ok()
468        .and_then(|it| Duration::from_secs(it.parse().ok()?).into())
469        .unwrap_or(Duration::from_secs(60))
470});
471
472/// Default maximum connections for the RPC server. This needs to be high enough to
473/// accommodate the regular usage for RPC providers.
474static DEFAULT_MAX_CONNECTIONS: LazyLock<u32> = LazyLock::new(|| {
475    env::var("FOREST_RPC_MAX_CONNECTIONS")
476        .ok()
477        .and_then(|it| it.parse().ok())
478        .unwrap_or(1000)
479});
480
481const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024;
482const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE;
483
484/// This is where you store persistent data, or at least access to stateful
485/// data.
486pub struct RPCState<DB> {
487    pub keystore: Arc<RwLock<KeyStore>>,
488    pub state_manager: Arc<crate::state_manager::StateManager<DB>>,
489    pub mpool: Arc<crate::message_pool::MessagePool<crate::message_pool::MpoolRpcProvider<DB>>>,
490    pub bad_blocks: Option<Arc<crate::chain_sync::BadBlockCache>>,
491    pub msgs_in_tipset: Arc<crate::chain::store::MsgsInTipsetCache>,
492    pub sync_status: crate::chain_sync::SyncStatus,
493    pub eth_event_handler: Arc<EthEventHandler>,
494    pub sync_network_context: SyncNetworkContext<DB>,
495    pub tipset_send: flume::Sender<FullTipset>,
496    pub start_time: chrono::DateTime<chrono::Utc>,
497    pub snapshot_progress_tracker: SnapshotProgressTracker,
498    pub shutdown: mpsc::Sender<()>,
499}
500
501impl<DB: Blockstore> RPCState<DB> {
502    pub fn beacon(&self) -> &Arc<crate::beacon::BeaconSchedule> {
503        self.state_manager.beacon_schedule()
504    }
505
506    pub fn chain_store(&self) -> &Arc<crate::chain::ChainStore<DB>> {
507        self.state_manager.chain_store()
508    }
509
510    pub fn chain_index(&self) -> &Arc<crate::chain::index::ChainIndex<Arc<DB>>> {
511        self.chain_store().chain_index()
512    }
513
514    pub fn chain_config(&self) -> &Arc<crate::networks::ChainConfig> {
515        self.state_manager.chain_config()
516    }
517
518    pub fn store(&self) -> &Arc<DB> {
519        self.chain_store().blockstore()
520    }
521
522    pub fn store_owned(&self) -> Arc<DB> {
523        self.state_manager.blockstore_owned()
524    }
525
526    pub fn network_send(&self) -> &flume::Sender<crate::libp2p::NetworkMessage> {
527        self.sync_network_context.network_send()
528    }
529
530    pub fn get_snapshot_progress_tracker(&self) -> SnapshotProgressState {
531        self.snapshot_progress_tracker.state()
532    }
533}
534
535#[derive(Clone)]
536struct PerConnection<RpcMiddleware, HttpMiddleware> {
537    stop_handle: StopHandle,
538    svc_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
539    keystore: Arc<RwLock<KeyStore>>,
540}
541
542pub async fn start_rpc<DB>(
543    state: RPCState<DB>,
544    rpc_listener: tokio::net::TcpListener,
545    stop_handle: StopHandle,
546    filter_list: Option<FilterList>,
547) -> anyhow::Result<()>
548where
549    DB: Blockstore + Send + Sync + 'static,
550{
551    let filter_list = filter_list.unwrap_or_default();
552    // `Arc` is needed because we will share the state between two modules
553    let state = Arc::new(state);
554    let keystore = state.keystore.clone();
555    let mut modules = create_modules(state.clone());
556
557    let mut pubsub_module = FilRpcModule::default();
558    pubsub_module.register_channel("Filecoin.ChainNotify", {
559        let state_clone = state.clone();
560        move |params| chain::chain_notify(params, &state_clone)
561    })?;
562
563    for module in modules.values_mut() {
564        // register eth subscription APIs
565        module.merge(EthPubSub::new(state.clone()).into_rpc())?;
566        module.merge(pubsub_module.clone())?;
567    }
568
569    let methods: Arc<HashMap<ApiPaths, Methods>> =
570        Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());
571
572    let per_conn = PerConnection {
573        stop_handle: stop_handle.clone(),
574        svc_builder: Server::builder()
575            .set_config(
576                ServerConfig::builder()
577                    // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
578                    .max_request_body_size(MAX_REQUEST_BODY_SIZE)
579                    .max_response_body_size(MAX_RESPONSE_BODY_SIZE)
580                    .max_connections(*DEFAULT_MAX_CONNECTIONS)
581                    .set_id_provider(RandomHexStringIdProvider::new())
582                    .build(),
583            )
584            .set_http_middleware(
585                tower::ServiceBuilder::new()
586                    .layer(CompressionLayer::new())
587                    // Mark the `Authorization` request header as sensitive so it doesn't show in logs
588                    .layer(SetSensitiveRequestHeadersLayer::new(std::iter::once(
589                        http::header::AUTHORIZATION,
590                    ))),
591            )
592            .to_service_builder(),
593        keystore,
594    };
595    tracing::info!("Ready for RPC connections");
596    loop {
597        let sock = tokio::select! {
598        res = rpc_listener.accept() => {
599            match res {
600              Ok((stream, _remote_addr)) => stream,
601              Err(e) => {
602                tracing::error!("failed to accept v4 connection: {:?}", e);
603                continue;
604              }
605            }
606          }
607          _ = per_conn.stop_handle.clone().shutdown() => break,
608        };
609
610        let svc = tower::service_fn({
611            let methods = methods.clone();
612            let per_conn = per_conn.clone();
613            let filter_list = filter_list.clone();
614            move |req| {
615                let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
616                let path = if let Ok(p) = ApiPaths::from_uri(req.uri()) {
617                    p
618                } else {
619                    return async move {
620                        Ok(http::Response::builder()
621                            .status(http::StatusCode::NOT_FOUND)
622                            .body(Default::default())
623                            .unwrap_or_else(|_| http::Response::new(Default::default())))
624                    }
625                    .boxed();
626                };
627                let methods = methods.get(&path).cloned().unwrap_or_default();
628                let PerConnection {
629                    stop_handle,
630                    svc_builder,
631                    keystore,
632                } = per_conn.clone();
633                // NOTE, the rpc middleware must be initialized here to be able to be created once per connection
634                // with data from the connection such as the headers in this example
635                let headers = req.headers().clone();
636                let rpc_middleware = RpcServiceBuilder::new()
637                    .layer(SetExtensionLayer { path })
638                    .layer(SegregationLayer)
639                    .layer(FilterLayer::new(filter_list.clone()))
640                    .layer(validation_layer::JsonValidationLayer)
641                    .layer(AuthLayer {
642                        headers,
643                        keystore: keystore.clone(),
644                    })
645                    .layer(LogLayer::default())
646                    .layer(MetricsLayer::default());
647                let mut jsonrpsee_svc = svc_builder
648                    .set_rpc_middleware(rpc_middleware)
649                    .build(methods, stop_handle);
650
651                if is_websocket {
652                    // Utilize the session close future to know when the actual WebSocket
653                    // session was closed.
654                    let session_close = jsonrpsee_svc.on_session_closed();
655
656                    // A little bit weird API but the response to HTTP request must be returned below
657                    // and we spawn a task to register when the session is closed.
658                    tokio::spawn(async move {
659                        session_close.await;
660                        tracing::trace!("Closed WebSocket connection");
661                    });
662
663                    async move {
664                        tracing::trace!("Opened WebSocket connection");
665                        // https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
666                        // to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to a concrete type
667                        // as workaround.
668                        jsonrpsee_svc
669                            .call(req)
670                            .await
671                            .map_err(|e| anyhow::anyhow!("{:?}", e))
672                    }
673                    .boxed()
674                } else {
675                    // HTTP.
676                    async move {
677                        tracing::trace!("Opened HTTP connection");
678                        let rp = jsonrpsee_svc.call(req).await;
679                        tracing::trace!("Closed HTTP connection");
680                        // https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
681                        // to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to a concrete type
682                        // as workaround.
683                        rp.map_err(|e| anyhow::anyhow!("{:?}", e))
684                    }
685                    .boxed()
686                }
687            }
688        });
689
690        tokio::spawn(jsonrpsee::server::serve_with_graceful_shutdown(
691            sock,
692            svc,
693            stop_handle.clone().shutdown(),
694        ));
695    }
696
697    Ok(())
698}
699
700fn create_modules<DB>(state: Arc<RPCState<DB>>) -> HashMap<ApiPaths, RpcModule<RPCState<DB>>>
701where
702    DB: Blockstore + Send + Sync + 'static,
703{
704    let mut modules = HashMap::default();
705    for api_version in ApiPaths::value_variants() {
706        modules.insert(*api_version, RpcModule::from_arc(state.clone()));
707    }
708    macro_rules! register {
709        ($ty:ty) => {
710            // Register only non-subscription RPC methods.
711            // Subscription methods are registered separately in the RPC module.
712            if !<$ty>::SUBSCRIPTION {
713                <$ty>::register(&mut modules, ParamStructure::ByPosition).unwrap();
714            }
715        };
716    }
717    for_each_rpc_method!(register);
718    modules
719}
720
721/// If `include` is not [`None`], only methods that are listed will be returned
722pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenRPC {
723    use schemars::generate::{SchemaGenerator, SchemaSettings};
724
725    let mut methods = vec![];
726    // spec says draft07
727    let mut settings = SchemaSettings::draft07();
728    // ..but uses `components`
729    settings.definitions_path = "#/components/schemas/".into();
730    let mut generator = SchemaGenerator::new(settings);
731    macro_rules! callback {
732        ($ty:ty) => {
733            if <$ty>::API_PATHS.contains(path) {
734                match include {
735                    Some(include) => match include.contains(&<$ty>::NAME) {
736                        true => {
737                            methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
738                                &mut generator,
739                                ParamStructure::ByPosition,
740                                &<$ty>::NAME,
741                            )));
742                            if let Some(alias) = &<$ty>::NAME_ALIAS {
743                                methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
744                                    &mut generator,
745                                    ParamStructure::ByPosition,
746                                    &alias,
747                                )));
748                            }
749                        }
750                        false => {}
751                    },
752                    None => {
753                        methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
754                            &mut generator,
755                            ParamStructure::ByPosition,
756                            &<$ty>::NAME,
757                        )));
758                        if let Some(alias) = &<$ty>::NAME_ALIAS {
759                            methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
760                                &mut generator,
761                                ParamStructure::ByPosition,
762                                &alias,
763                            )));
764                        }
765                    }
766                }
767            }
768        };
769    }
770    for_each_rpc_method!(callback);
771    openrpc_types::OpenRPC {
772        methods,
773        components: Some(openrpc_types::Components {
774            schemas: Some(
775                generator
776                    .take_definitions(false)
777                    .into_iter()
778                    .filter_map(|(k, v)| {
779                        if let Ok(v) = Schema::try_from(v) {
780                            Some((k, v))
781                        } else {
782                            None
783                        }
784                    })
785                    .collect(),
786            ),
787            ..Default::default()
788        }),
789        openrpc: openrpc_types::OPEN_RPC_SPECIFICATION_VERSION,
790        info: openrpc_types::Info {
791            title: String::from("forest"),
792            version: env!("CARGO_PKG_VERSION").into(),
793            ..Default::default()
794        },
795        ..Default::default()
796    }
797}
798
799#[cfg(test)]
800mod tests {
801    use super::*;
802    use crate::{
803        db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
804        tool::offline_server::server::offline_rpc_state,
805    };
806    use jsonrpsee::server::stop_channel;
807    use std::net::{Ipv4Addr, SocketAddr};
808    use tokio::task::JoinSet;
809
810    // To update RPC specs:
811    // `cargo test --lib -- rpc::tests::openrpc`
812    // `cargo insta review`
813
814    #[test]
815    fn openrpc_v0() {
816        openrpc(ApiPaths::V0);
817    }
818
819    #[test]
820    fn openrpc_v1() {
821        openrpc(ApiPaths::V1);
822    }
823
824    #[test]
825    fn openrpc_v2() {
826        openrpc(ApiPaths::V2);
827    }
828
829    fn openrpc(path: ApiPaths) {
830        let spec = super::openrpc(path, None);
831        insta::assert_yaml_snapshot!(path.path(), spec);
832    }
833
834    #[test]
835    fn test_rpc_server() {
836        const TIMEOUT: Duration = Duration::from_secs(5);
837        let (done_tx, done_rx) = flume::bounded(1);
838        let rt = tokio::runtime::Builder::new_multi_thread()
839            .enable_all()
840            .build()
841            .unwrap();
842        rt.block_on(async move { test_rpc_server_inner(done_tx).await });
843        done_rx.recv().unwrap();
844        // To mitigate the transient timeout issue
845        rt.shutdown_timeout(TIMEOUT);
846    }
847
848    async fn test_rpc_server_inner(done_tx: flume::Sender<()>) {
849        let chain = NetworkChain::Calibnet;
850        let db = Arc::new(MemoryDB::default());
851        let mut services = JoinSet::new();
852        let (state, mut shutdown_recv) = offline_rpc_state(chain, db, None, None, &mut services)
853            .await
854            .unwrap();
855        let block_delay_secs = state.chain_config().block_delay_secs;
856        let shutdown_send = state.shutdown.clone();
857        let jwt_read_permissions = vec!["read".to_owned()];
858        let jwt_read = super::methods::auth::AuthNew::create_token(
859            &state.keystore.read(),
860            chrono::Duration::hours(1),
861            jwt_read_permissions.clone(),
862        )
863        .unwrap();
864        let rpc_listener =
865            tokio::net::TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
866                .await
867                .unwrap();
868        let rpc_address = rpc_listener.local_addr().unwrap();
869        let (stop_handle, server_handle) = stop_channel();
870
871        // Start an RPC server
872
873        let handle = tokio::spawn(start_rpc(state, rpc_listener, stop_handle, None));
874
875        println!("sending a few http requests");
876
877        let client = Client::from_url(
878            format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
879                .parse()
880                .unwrap(),
881        );
882
883        let response = super::methods::common::Version::call(&client, ())
884            .await
885            .unwrap();
886        assert_eq!(
887            &response.version,
888            &*crate::utils::version::FOREST_VERSION_STRING
889        );
890        assert_eq!(response.block_delay, block_delay_secs);
891        assert_eq!(response.api_version, ShiftingVersion::new(2, 3, 0));
892
893        let response = super::methods::auth::AuthVerify::call(&client, (jwt_read.clone(),))
894            .await
895            .unwrap();
896        assert_eq!(response, jwt_read_permissions);
897
898        drop(client);
899
900        println!("sending a few websocket requests");
901
902        let client = Client::from_url(
903            format!("ws://{}:{}/", rpc_address.ip(), rpc_address.port())
904                .parse()
905                .unwrap(),
906        );
907
908        let response = super::methods::auth::AuthVerify::call(&client, (jwt_read,))
909            .await
910            .unwrap();
911        assert_eq!(response, jwt_read_permissions);
912
913        drop(client);
914
915        // Gracefully shutdown the RPC server
916        println!("sending shutdown signal");
917        shutdown_send.send(()).await.unwrap();
918        println!("waiting on shutdown receiver");
919        shutdown_recv.recv().await;
920        println!("sending server stop signal");
921        server_handle.stop().unwrap();
922        println!("waiting on graceful shutdown");
923        handle.await.unwrap().unwrap();
924        println!("done");
925        done_tx.send(()).unwrap();
926    }
927}