Skip to main content

forest/rpc/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::db::EthMappingsStore;
5use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer;
6mod auth_layer;
7mod channel;
8mod client;
9mod compression_layer;
10mod filter_layer;
11mod filter_list;
12pub mod json_validator;
13mod log_layer;
14mod metrics_layer;
15mod request;
16mod segregation_layer;
17mod set_extension_layer;
18mod validation_layer;
19
20use crate::rpc::eth::types::RandomHexStringIdProvider;
21use crate::shim::clock::ChainEpoch;
22use clap::ValueEnum as _;
23pub use client::Client;
24pub use error::ServerError;
25use eth::filter::EthEventHandler;
26use filter_layer::FilterLayer;
27pub use filter_list::FilterList;
28use futures::FutureExt as _;
29use jsonrpsee::server::ServerConfig;
30use log_layer::LogLayer;
31use reflect::Ctx;
32pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt};
33pub use request::Request;
34use schemars::Schema;
35use segregation_layer::SegregationLayer;
36use set_extension_layer::SetExtensionLayer;
37mod error;
38mod reflect;
39use ahash::HashMap;
40mod registry;
41pub mod types;
42
43pub use methods::*;
44
45/// Protocol or transport-specific error
46pub use jsonrpsee::core::ClientError;
47
48/// Sentinel value, indicating no limit on how far back to search in the chain (all the way to genesis epoch).
49pub const LOOKBACK_NO_LIMIT: ChainEpoch = -1;
50
/// Invokes the macro named by `$callback` once for every type listed below,
/// passing the type's path as the only argument. Each listed type is expected
/// to implement [`RpcMethod`].
///
/// This is a macro because there is no way to abstract the `ARITY` on that
/// trait.
///
/// All methods should be entered here: the list is what drives the prelude
/// re-exports, method-info collection and server-side registration elsewhere
/// in this module.
#[macro_export]
macro_rules! for_each_rpc_method {
    ($callback:path) => {
        // auth vertical
        $callback!($crate::rpc::auth::AuthNew);
        $callback!($crate::rpc::auth::AuthVerify);

        // beacon vertical
        $callback!($crate::rpc::beacon::BeaconGetEntry);

        // chain vertical
        $callback!($crate::rpc::chain::ChainPruneSnapshot);
        $callback!($crate::rpc::chain::ChainExport);
        $callback!($crate::rpc::chain::ChainGetBlock);
        $callback!($crate::rpc::chain::ChainGetBlockMessages);
        $callback!($crate::rpc::chain::ChainGetEvents);
        $callback!($crate::rpc::chain::ChainGetGenesis);
        $callback!($crate::rpc::chain::ChainGetFinalizedTipset);
        $callback!($crate::rpc::chain::ChainGetMessage);
        $callback!($crate::rpc::chain::ChainGetMessagesInTipset);
        $callback!($crate::rpc::chain::ChainGetMinBaseFee);
        $callback!($crate::rpc::chain::ChainGetParentMessages);
        $callback!($crate::rpc::chain::ChainGetParentReceipts);
        $callback!($crate::rpc::chain::ChainGetPath);
        $callback!($crate::rpc::chain::ChainGetTipSet);
        $callback!($crate::rpc::chain::ChainGetTipSetV2);
        $callback!($crate::rpc::chain::ChainGetTipSetFinalityStatus);
        $callback!($crate::rpc::chain::ChainGetTipSetAfterHeight);
        $callback!($crate::rpc::chain::ChainGetTipSetByHeight);
        $callback!($crate::rpc::chain::ChainHasObj);
        $callback!($crate::rpc::chain::ChainHead);
        $callback!($crate::rpc::chain::ChainReadObj);
        $callback!($crate::rpc::chain::ChainSetHead);
        $callback!($crate::rpc::chain::ChainStatObj);
        $callback!($crate::rpc::chain::ChainTipSetWeight);
        $callback!($crate::rpc::chain::ForestChainExport);
        $callback!($crate::rpc::chain::ForestChainExportDiff);
        $callback!($crate::rpc::chain::ForestChainExportStatus);
        $callback!($crate::rpc::chain::ForestChainExportCancel);
        $callback!($crate::rpc::chain::ChainGetTipsetByParentState);

        // common vertical
        $callback!($crate::rpc::common::Session);
        $callback!($crate::rpc::common::Shutdown);
        $callback!($crate::rpc::common::StartTime);
        $callback!($crate::rpc::common::Version);

        // eth vertical
        $callback!($crate::rpc::eth::EthAccounts);
        $callback!($crate::rpc::eth::EthAddressToFilecoinAddress);
        $callback!($crate::rpc::eth::FilecoinAddressToEthAddress);
        $callback!($crate::rpc::eth::EthBlockNumber);
        $callback!($crate::rpc::eth::EthCall);
        $callback!($crate::rpc::eth::EthChainId);
        $callback!($crate::rpc::eth::EthEstimateGas);
        $callback!($crate::rpc::eth::EthFeeHistory);
        $callback!($crate::rpc::eth::EthGasPrice);
        $callback!($crate::rpc::eth::EthGetBalance);
        $callback!($crate::rpc::eth::EthGetBlockByHash);
        $callback!($crate::rpc::eth::EthGetBlockByNumber);
        $callback!($crate::rpc::eth::EthGetBlockReceipts);
        $callback!($crate::rpc::eth::EthGetBlockReceiptsLimited);
        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByHash);
        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumber);
        $callback!($crate::rpc::eth::EthGetCode);
        $callback!($crate::rpc::eth::EthGetLogs);
        $callback!($crate::rpc::eth::EthGetFilterLogs);
        $callback!($crate::rpc::eth::EthGetFilterChanges);
        $callback!($crate::rpc::eth::EthGetMessageCidByTransactionHash);
        $callback!($crate::rpc::eth::EthGetStorageAt);
        $callback!($crate::rpc::eth::EthGetTransactionByHash);
        $callback!($crate::rpc::eth::EthGetTransactionByHashLimited);
        $callback!($crate::rpc::eth::EthGetTransactionCount);
        $callback!($crate::rpc::eth::EthGetTransactionHashByCid);
        $callback!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndex);
        $callback!($crate::rpc::eth::EthGetTransactionByBlockHashAndIndex);
        $callback!($crate::rpc::eth::EthMaxPriorityFeePerGas);
        $callback!($crate::rpc::eth::EthProtocolVersion);
        $callback!($crate::rpc::eth::EthGetTransactionReceipt);
        $callback!($crate::rpc::eth::EthGetTransactionReceiptLimited);
        $callback!($crate::rpc::eth::EthNewFilter);
        $callback!($crate::rpc::eth::EthNewPendingTransactionFilter);
        $callback!($crate::rpc::eth::EthNewBlockFilter);
        $callback!($crate::rpc::eth::EthUninstallFilter);
        $callback!($crate::rpc::eth::EthUnsubscribe);
        $callback!($crate::rpc::eth::EthSubscribe);
        $callback!($crate::rpc::eth::EthSyncing);
        $callback!($crate::rpc::eth::EthTraceBlock);
        $callback!($crate::rpc::eth::EthTraceCall);
        $callback!($crate::rpc::eth::EthTraceFilter);
        $callback!($crate::rpc::eth::EthTraceTransaction);
        $callback!($crate::rpc::eth::EthDebugTraceTransaction);
        $callback!($crate::rpc::eth::EthTraceReplayBlockTransactions);
        $callback!($crate::rpc::eth::Web3ClientVersion);
        $callback!($crate::rpc::eth::EthSendRawTransaction);
        $callback!($crate::rpc::eth::EthSendRawTransactionUntrusted);

        // gas vertical
        $callback!($crate::rpc::gas::GasEstimateFeeCap);
        $callback!($crate::rpc::gas::GasEstimateGasLimit);
        $callback!($crate::rpc::gas::GasEstimateGasPremium);
        $callback!($crate::rpc::gas::GasEstimateMessageGas);

        // market vertical
        $callback!($crate::rpc::market::MarketAddBalance);

        // miner vertical
        $callback!($crate::rpc::miner::MinerCreateBlock);
        $callback!($crate::rpc::miner::MinerGetBaseInfo);

        // mpool vertical
        $callback!($crate::rpc::mpool::MpoolBatchPush);
        $callback!($crate::rpc::mpool::MpoolBatchPushUntrusted);
        $callback!($crate::rpc::mpool::MpoolGetNonce);
        $callback!($crate::rpc::mpool::MpoolPending);
        $callback!($crate::rpc::mpool::MpoolPush);
        $callback!($crate::rpc::mpool::MpoolPushMessage);
        $callback!($crate::rpc::mpool::MpoolPushUntrusted);
        $callback!($crate::rpc::mpool::MpoolSelect);

        // msig vertical
        $callback!($crate::rpc::msig::MsigGetAvailableBalance);
        $callback!($crate::rpc::msig::MsigGetPending);
        $callback!($crate::rpc::msig::MsigGetVested);
        $callback!($crate::rpc::msig::MsigGetVestingSchedule);

        // net vertical
        $callback!($crate::rpc::net::NetAddrsListen);
        $callback!($crate::rpc::net::NetAgentVersion);
        $callback!($crate::rpc::net::NetAutoNatStatus);
        $callback!($crate::rpc::net::NetConnect);
        $callback!($crate::rpc::net::NetDisconnect);
        $callback!($crate::rpc::net::NetFindPeer);
        $callback!($crate::rpc::net::NetInfo);
        $callback!($crate::rpc::net::NetListening);
        $callback!($crate::rpc::net::NetPeers);
        $callback!($crate::rpc::net::NetProtectAdd);
        $callback!($crate::rpc::net::NetProtectList);
        $callback!($crate::rpc::net::NetProtectRemove);
        $callback!($crate::rpc::net::NetVersion);
        $callback!($crate::rpc::net::NetChainExchange);

        // node vertical
        $callback!($crate::rpc::node::NodeStatus);

        // state vertical
        $callback!($crate::rpc::state::StateAccountKey);
        $callback!($crate::rpc::state::StateCall);
        $callback!($crate::rpc::state::StateCirculatingSupply);
        $callback!($crate::rpc::state::ForestStateCompute);
        $callback!($crate::rpc::state::StateCompute);
        $callback!($crate::rpc::state::StateDealProviderCollateralBounds);
        $callback!($crate::rpc::state::StateFetchRoot);
        $callback!($crate::rpc::state::StateGetActor);
        $callback!($crate::rpc::state::StateGetActorV2);
        $callback!($crate::rpc::state::StateGetID);
        $callback!($crate::rpc::state::StateGetAllAllocations);
        $callback!($crate::rpc::state::StateGetAllClaims);
        $callback!($crate::rpc::state::StateGetAllocation);
        $callback!($crate::rpc::state::StateGetAllocationForPendingDeal);
        $callback!($crate::rpc::state::StateGetAllocationIdForPendingDeal);
        $callback!($crate::rpc::state::StateGetAllocations);
        $callback!($crate::rpc::state::StateGetBeaconEntry);
        $callback!($crate::rpc::state::StateGetClaim);
        $callback!($crate::rpc::state::StateGetClaims);
        $callback!($crate::rpc::state::StateGetNetworkParams);
        $callback!($crate::rpc::state::StateGetRandomnessDigestFromBeacon);
        $callback!($crate::rpc::state::StateGetRandomnessDigestFromTickets);
        $callback!($crate::rpc::state::StateGetRandomnessFromBeacon);
        $callback!($crate::rpc::state::StateGetRandomnessFromTickets);
        $callback!($crate::rpc::state::StateGetReceipt);
        $callback!($crate::rpc::state::StateListActors);
        $callback!($crate::rpc::state::StateListMessages);
        $callback!($crate::rpc::state::StateListMiners);
        $callback!($crate::rpc::state::StateLookupID);
        $callback!($crate::rpc::state::StateLookupRobustAddress);
        $callback!($crate::rpc::state::StateMarketBalance);
        $callback!($crate::rpc::state::StateMarketDeals);
        $callback!($crate::rpc::state::StateMarketParticipants);
        $callback!($crate::rpc::state::StateMarketStorageDeal);
        $callback!($crate::rpc::state::StateMinerActiveSectors);
        $callback!($crate::rpc::state::StateMinerAllocated);
        $callback!($crate::rpc::state::StateMinerAvailableBalance);
        $callback!($crate::rpc::state::StateMinerDeadlines);
        $callback!($crate::rpc::state::StateMinerFaults);
        $callback!($crate::rpc::state::StateMinerInfo);
        $callback!($crate::rpc::state::StateMinerInitialPledgeCollateral);
        $callback!($crate::rpc::state::StateMinerPartitions);
        $callback!($crate::rpc::state::StateMinerPower);
        $callback!($crate::rpc::state::StateMinerPreCommitDepositForPower);
        $callback!($crate::rpc::state::StateMinerProvingDeadline);
        $callback!($crate::rpc::state::StateMinerRecoveries);
        $callback!($crate::rpc::state::StateMinerSectorAllocated);
        $callback!($crate::rpc::state::StateMinerSectorCount);
        $callback!($crate::rpc::state::StateMinerSectors);
        $callback!($crate::rpc::state::StateNetworkName);
        $callback!($crate::rpc::state::StateNetworkVersion);
        $callback!($crate::rpc::state::StateActorInfo);
        $callback!($crate::rpc::state::StateReadState);
        $callback!($crate::rpc::state::StateDecodeParams);
        $callback!($crate::rpc::state::StateReplay);
        $callback!($crate::rpc::state::StateSearchMsg);
        $callback!($crate::rpc::state::StateSearchMsgLimited);
        $callback!($crate::rpc::state::StateSectorExpiration);
        $callback!($crate::rpc::state::StateSectorGetInfo);
        $callback!($crate::rpc::state::StateSectorPartition);
        $callback!($crate::rpc::state::StateSectorPreCommitInfo);
        $callback!($crate::rpc::state::StateSectorPreCommitInfoV0);
        $callback!($crate::rpc::state::StateVerifiedClientStatus);
        $callback!($crate::rpc::state::StateVerifiedRegistryRootKey);
        $callback!($crate::rpc::state::StateVerifierStatus);
        $callback!($crate::rpc::state::StateVMCirculatingSupplyInternal);
        $callback!($crate::rpc::state::StateWaitMsg);
        $callback!($crate::rpc::state::StateWaitMsgV0);
        $callback!($crate::rpc::state::StateMinerInitialPledgeForSector);

        // sync vertical
        $callback!($crate::rpc::sync::SyncCheckBad);
        $callback!($crate::rpc::sync::SyncMarkBad);
        $callback!($crate::rpc::sync::SyncSnapshotProgress);
        $callback!($crate::rpc::sync::SyncStatus);
        $callback!($crate::rpc::sync::SyncSubmitBlock);

        // wallet vertical
        $callback!($crate::rpc::wallet::WalletBalance);
        $callback!($crate::rpc::wallet::WalletDefaultAddress);
        $callback!($crate::rpc::wallet::WalletDelete);
        $callback!($crate::rpc::wallet::WalletExport);
        $callback!($crate::rpc::wallet::WalletHas);
        $callback!($crate::rpc::wallet::WalletImport);
        $callback!($crate::rpc::wallet::WalletList);
        $callback!($crate::rpc::wallet::WalletNew);
        $callback!($crate::rpc::wallet::WalletSetDefault);
        $callback!($crate::rpc::wallet::WalletSign);
        $callback!($crate::rpc::wallet::WalletSignMessage);
        $callback!($crate::rpc::wallet::WalletValidateAddress);
        $callback!($crate::rpc::wallet::WalletVerify);

        // f3 vertical
        $callback!($crate::rpc::f3::GetRawNetworkName);
        $callback!($crate::rpc::f3::F3GetCertificate);
        $callback!($crate::rpc::f3::F3GetECPowerTable);
        $callback!($crate::rpc::f3::F3GetF3PowerTable);
        $callback!($crate::rpc::f3::F3GetF3PowerTableByInstance);
        $callback!($crate::rpc::f3::F3IsRunning);
        $callback!($crate::rpc::f3::F3GetProgress);
        $callback!($crate::rpc::f3::F3GetManifest);
        $callback!($crate::rpc::f3::F3ListParticipants);
        $callback!($crate::rpc::f3::F3GetLatestCertificate);
        $callback!($crate::rpc::f3::F3GetOrRenewParticipationTicket);
        $callback!($crate::rpc::f3::F3Participate);
        $callback!($crate::rpc::f3::F3ExportLatestSnapshot);
        $callback!($crate::rpc::f3::GetHead);
        $callback!($crate::rpc::f3::GetParent);
        $callback!($crate::rpc::f3::GetParticipatingMinerIDs);
        $callback!($crate::rpc::f3::GetPowerTable);
        $callback!($crate::rpc::f3::GetTipset);
        $callback!($crate::rpc::f3::GetTipsetByEpoch);
        $callback!($crate::rpc::f3::Finalize);
        $callback!($crate::rpc::f3::ProtectPeer);
        $callback!($crate::rpc::f3::SignMessage);

        // misc vertical
        $callback!($crate::rpc::misc::GetActorEventsRaw);
    };
}
324use compression_layer::{COMPRESS_MIN_BODY_SIZE, CompressionLayer};
325pub(crate) use for_each_rpc_method;
326use sync::SnapshotProgressTracker;
327use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
328
329#[allow(unused)]
330/// All handler definitions.
331///
332/// Usage guide:
333/// ```ignore
334/// use crate::rpc::{self, prelude::*};
335///
336/// let client = rpc::Client::from(..);
337/// ChainHead::call(&client, ()).await?;
338/// fn foo() -> rpc::ClientError {..}
339/// fn bar() -> rpc::ServerError {..}
340/// ```
341pub mod prelude {
342    use super::*;
343
344    pub use reflect::RpcMethodExt as _;
345
346    macro_rules! export {
347        ($ty:ty) => {
348            pub use $ty;
349        };
350    }
351
352    for_each_rpc_method!(export);
353}
354
355/// Collects all the RPC method names and permission available in the Forest
356pub fn collect_rpc_method_info() -> Vec<(&'static str, Permission)> {
357    use crate::rpc::RpcMethod;
358
359    let mut methods = Vec::new();
360
361    macro_rules! add_method {
362        ($ty:ty) => {
363            methods.push((<$ty>::NAME, <$ty>::PERMISSION));
364        };
365    }
366
367    for_each_rpc_method!(add_method);
368
369    methods
370}
371
/// RPC method implementations, one submodule per API vertical.
///
/// # Handling types
/// - If a `struct` or `enum` is only used in the RPC API, it should live in `src/rpc`.
///   - If it is used in only one API vertical (e.g. `auth` or `chain`), then it should live
///     in either:
///     - `src/rpc/methods/auth.rs` (if there are only a few).
///     - `src/rpc/methods/auth/types.rs` (if there are so many that they would cause clutter).
///   - If it is used _across_ API verticals, it should live in `src/rpc/types.rs`.
///
/// # Interactions with the [`lotus_json`] APIs
/// - Types may have fields which must go through [`LotusJson`],
///   and MUST reflect that in their [`JsonSchema`].
///   You have two options for this:
///   - Use `#[attributes]` to control serialization and schema generation:
///     ```ignore
///     #[derive(Deserialize, Serialize, JsonSchema)]
///     struct Foo {
///         #[serde(with = "crate::lotus_json")] // perform the conversion
///         #[schemars(with = "LotusJson<Cid>")] // advertise the schema to be converted
///         cid: Cid, // use the native type in application logic
///     }
///     ```
///   - Use [`LotusJson`] directly. This means that serialization and the [`JsonSchema`]
///     will never go out of sync.
///     ```ignore
///     #[derive(Deserialize, Serialize, JsonSchema)]
///     struct Foo {
///         cid: LotusJson<Cid>, // use the shim type in application logic, manually performing conversions
///     }
///     ```
///
/// [`lotus_json`]: crate::lotus_json
/// [`HasLotusJson`]: crate::lotus_json::HasLotusJson
/// [`LotusJson`]: crate::lotus_json::LotusJson
/// [`JsonSchema`]: schemars::JsonSchema
mod methods {
    pub mod auth;
    pub mod beacon;
    pub mod chain;
    pub mod common;
    pub mod eth;
    pub mod f3;
    pub mod gas;
    pub mod market;
    pub mod miner;
    pub mod misc;
    pub mod mpool;
    pub mod msig;
    pub mod net;
    pub mod node;
    pub mod state;
    pub mod sync;
    pub mod wallet;
}
427
428use crate::rpc::auth_layer::AuthLayer;
429pub use crate::rpc::channel::CANCEL_METHOD_NAME;
430use crate::rpc::channel::RpcModule as FilRpcModule;
431use crate::rpc::eth::pubsub::EthPubSub;
432use crate::rpc::metrics_layer::MetricsLayer;
433use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore};
434
435use crate::blocks::FullTipset;
436use crate::utils::misc::env::env_or_default;
437use fvm_ipld_blockstore::Blockstore;
438use jsonrpsee::{
439    Methods,
440    core::middleware::RpcServiceBuilder,
441    server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
442};
443use parking_lot::RwLock;
444use std::env;
445use std::sync::{Arc, LazyLock};
446use std::time::Duration;
447use tokio::sync::mpsc;
448use tower::Service;
449
450use crate::rpc::sync::SnapshotProgressState;
451use openrpc_types::{self, ParamStructure};
452
/// Port number `2345`.
///
/// NOTE(review): presumably the port the RPC server listens on when none is
/// configured; the use site is outside this part of the file, so confirm there.
pub const DEFAULT_PORT: u16 = 2345;
454
/// Request timeout, read once from the `FOREST_RPC_DEFAULT_TIMEOUT`
/// environment variable as a whole number of seconds.
///
/// Falls back to 60 seconds when the variable is unset or is not a valid
/// unsigned integer.
static DEFAULT_REQUEST_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    const FALLBACK_SECS: u64 = 60;

    let secs = match env::var("FOREST_RPC_DEFAULT_TIMEOUT") {
        Ok(raw) => raw.parse().unwrap_or(FALLBACK_SECS),
        Err(_) => FALLBACK_SECS,
    };
    Duration::from_secs(secs)
});
462
/// Maximum number of concurrent connections accepted by the RPC server.
///
/// Read once from `FOREST_RPC_MAX_CONNECTIONS`; when the variable is unset or
/// is not a valid `u32`, the result is `1000`. The value also bounds the TCP
/// listen backlog so that bursts of connection attempts do not get silently
/// dropped by the kernel.
pub fn default_max_connections() -> u32 {
    const FALLBACK: u32 = 1000;

    // Parsed on first call and cached for the lifetime of the process.
    static CACHED: LazyLock<u32> =
        LazyLock::new(|| match env::var("FOREST_RPC_MAX_CONNECTIONS") {
            Ok(raw) => raw.parse().unwrap_or(FALLBACK),
            Err(_) => FALLBACK,
        });

    *CACHED
}
477
/// Maximum JSON-RPC request body size in bytes (64 MiB).
///
/// Also serves as the default for `MAX_RESPONSE_BODY_SIZE`.
const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024;
479
/// Maximum JSON-RPC response body size in bytes. Defaults to 64 MiB (the
/// value of `MAX_REQUEST_BODY_SIZE`).
///
/// `eth_getTransactionReceipt` and `eth_getBlockReceipts` can return very
/// large responses for log-heavy transactions (a single tx emitting hundreds
/// of thousands of events can exceed 64 MiB). Operators serving such queries
/// can raise this with `FOREST_RPC_MAX_RESPONSE_BODY_SIZE` (in bytes).
///
/// The variable is read once, on first access.
// NOTE(review): fallback behaviour for an unparsable value is decided by
// `env_or_default`, which is defined outside this file — confirm there.
static MAX_RESPONSE_BODY_SIZE: LazyLock<u32> =
    LazyLock::new(|| env_or_default("FOREST_RPC_MAX_RESPONSE_BODY_SIZE", MAX_REQUEST_BODY_SIZE));
488
/// This is where you store persistent data, or at least access to stateful
/// data.
///
/// `start_rpc` wraps one instance in an [`Arc`] and shares it with every RPC
/// module and with the pub/sub handlers.
pub struct RPCState<DB> {
    /// Key store behind a read-write lock; `start_rpc` also hands a clone of
    /// this handle to the per-connection auth layer.
    pub keystore: Arc<RwLock<KeyStore>>,
    /// State manager; the accessor methods on this type delegate to it for the
    /// beacon schedule, chain store, chain config and blockstore.
    pub state_manager: Arc<crate::state_manager::StateManager<DB>>,
    /// Message pool backed by the chain store.
    pub mpool: Arc<crate::message_pool::MessagePool<Arc<crate::chain::ChainStore<DB>>>>,
    /// Optional bad-block cache.
    // NOTE(review): the conditions under which this is `None` are not visible here.
    pub bad_blocks: Option<Arc<crate::chain_sync::BadBlockCache>>,
    /// Chain-sync status handle.
    pub sync_status: crate::chain_sync::SyncStatus,
    /// Handler for Ethereum-style event filters.
    pub eth_event_handler: Arc<EthEventHandler>,
    /// Network context used by chain sync; also the source of `network_send()`.
    pub sync_network_context: SyncNetworkContext<DB>,
    /// Sending half of a channel carrying full tipsets.
    // NOTE(review): presumably feeds the chain follower — confirm at the receiver.
    pub tipset_send: flume::Sender<FullTipset>,
    /// UTC timestamp.
    // NOTE(review): presumably the moment the node started — confirm at construction.
    pub start_time: chrono::DateTime<chrono::Utc>,
    /// Tracker whose current state is exposed via `get_snapshot_progress_tracker()`.
    pub snapshot_progress_tracker: SnapshotProgressTracker,
    /// Sending half of a unit channel.
    // NOTE(review): presumably used to request node shutdown — confirm at the receiver.
    pub shutdown: mpsc::Sender<()>,
    /// Message-pool locker.
    pub mpool_locker: crate::message_pool::MpoolLocker,
    /// Message-pool nonce tracker.
    pub nonce_tracker: crate::message_pool::NonceTracker,
    /// Shared path to a temporary directory.
    pub temp_dir: Arc<std::path::PathBuf>,
}
507
508impl<DB: Blockstore> RPCState<DB> {
509    pub fn beacon(&self) -> &Arc<crate::beacon::BeaconSchedule> {
510        self.state_manager.beacon_schedule()
511    }
512
513    pub fn chain_store(&self) -> &Arc<crate::chain::ChainStore<DB>> {
514        self.state_manager.chain_store()
515    }
516
517    pub fn chain_index(&self) -> &crate::chain::index::ChainIndex<DB> {
518        self.chain_store().chain_index()
519    }
520
521    pub fn chain_config(&self) -> &Arc<crate::networks::ChainConfig> {
522        self.state_manager.chain_config()
523    }
524
525    pub fn store(&self) -> &Arc<DB> {
526        self.chain_store().blockstore()
527    }
528
529    pub fn store_owned(&self) -> Arc<DB> {
530        self.state_manager.blockstore_owned()
531    }
532
533    pub fn network_send(&self) -> &flume::Sender<crate::libp2p::NetworkMessage> {
534        self.sync_network_context.network_send()
535    }
536
537    pub fn get_snapshot_progress_tracker(&self) -> SnapshotProgressState {
538        self.snapshot_progress_tracker.state()
539    }
540}
541
/// Bundle of values that `start_rpc` clones for every accepted connection and
/// again for every request served on it.
#[derive(Clone)]
struct PerConnection<RpcMiddleware, HttpMiddleware> {
    /// Handle used to observe server shutdown.
    stop_handle: StopHandle,
    /// Pre-configured jsonrpsee service builder; the RPC middleware is attached
    /// per request before the service is built.
    svc_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
    /// Key store passed on to the auth layer.
    keystore: Arc<RwLock<KeyStore>>,
}
548
549pub async fn start_rpc<DB>(
550    state: RPCState<DB>,
551    rpc_listener: tokio::net::TcpListener,
552    stop_handle: StopHandle,
553    filter_list: Option<FilterList>,
554) -> anyhow::Result<()>
555where
556    DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
557{
558    let filter_list = filter_list.unwrap_or_default();
559    // `Arc` is needed because we will share the state between two modules
560    let state = Arc::new(state);
561    let keystore = state.keystore.clone();
562    let mut modules = create_modules(state.clone());
563
564    let mut pubsub_module = FilRpcModule::default();
565    pubsub_module.register_channel("Filecoin.ChainNotify", {
566        let state_clone = state.clone();
567        move |params| chain::chain_notify(params, &state_clone)
568    })?;
569
570    for module in modules.values_mut() {
571        // register eth subscription APIs
572        module.merge(EthPubSub::new(state.clone()).into_rpc())?;
573        module.merge(pubsub_module.clone())?;
574    }
575
576    let methods: Arc<HashMap<ApiPaths, Methods>> =
577        Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());
578
579    let per_conn = PerConnection {
580        stop_handle: stop_handle.clone(),
581        svc_builder: Server::builder()
582            .set_config(
583                ServerConfig::builder()
584                    .max_request_body_size(MAX_REQUEST_BODY_SIZE)
585                    // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
586                    .max_response_body_size(*MAX_RESPONSE_BODY_SIZE)
587                    .max_connections(default_max_connections())
588                    .set_id_provider(RandomHexStringIdProvider::new())
589                    .build(),
590            )
591            .set_http_middleware(
592                tower::ServiceBuilder::new()
593                    .option_layer(COMPRESS_MIN_BODY_SIZE.map(CompressionLayer::new))
594                    // Mark the `Authorization` request header as sensitive so it doesn't show in logs
595                    .layer(SetSensitiveRequestHeadersLayer::new(std::iter::once(
596                        http::header::AUTHORIZATION,
597                    ))),
598            )
599            .to_service_builder(),
600        keystore,
601    };
602    tracing::info!("Ready for RPC connections");
603    loop {
604        let sock = tokio::select! {
605        res = rpc_listener.accept() => {
606            match res {
607              Ok((stream, _remote_addr)) => stream,
608              Err(e) => {
609                tracing::error!("failed to accept v4 connection: {:?}", e);
610                continue;
611              }
612            }
613          }
614          _ = per_conn.stop_handle.clone().shutdown() => break,
615        };
616
617        let svc = tower::service_fn({
618            let methods = methods.clone();
619            let per_conn = per_conn.clone();
620            let filter_list = filter_list.clone();
621            move |req| {
622                let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
623                let path = if let Ok(p) = ApiPaths::from_uri(req.uri()) {
624                    p
625                } else {
626                    return async move {
627                        Ok(http::Response::builder()
628                            .status(http::StatusCode::NOT_FOUND)
629                            .body(Default::default())
630                            .unwrap_or_else(|_| http::Response::new(Default::default())))
631                    }
632                    .boxed();
633                };
634                let methods = methods.get(&path).cloned().unwrap_or_default();
635                let PerConnection {
636                    stop_handle,
637                    svc_builder,
638                    keystore,
639                } = per_conn.clone();
640                // NOTE, the rpc middleware must be initialized here to be able to be created once per connection
641                // with data from the connection such as the headers in this example
642                let headers = req.headers().clone();
643                let rpc_middleware = RpcServiceBuilder::new()
644                    .layer(SetExtensionLayer { path })
645                    .layer(SegregationLayer)
646                    .layer(FilterLayer::new(filter_list.clone()))
647                    .layer(validation_layer::JsonValidationLayer)
648                    .layer(AuthLayer {
649                        headers,
650                        keystore: keystore.clone(),
651                    })
652                    .layer(LogLayer::default())
653                    .layer(MetricsLayer::default());
654                let mut jsonrpsee_svc = svc_builder
655                    .set_rpc_middleware(rpc_middleware)
656                    .build(methods, stop_handle);
657
658                if is_websocket {
659                    // Utilize the session close future to know when the actual WebSocket
660                    // session was closed.
661                    let session_close = jsonrpsee_svc.on_session_closed();
662
663                    // A little bit weird API but the response to HTTP request must be returned below
664                    // and we spawn a task to register when the session is closed.
665                    tokio::spawn(async move {
666                        session_close.await;
667                        tracing::trace!("Closed WebSocket connection");
668                    });
669
670                    async move {
671                        tracing::trace!("Opened WebSocket connection");
672                        // https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
673                        // to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to a concrete type
674                        // as workaround.
675                        jsonrpsee_svc
676                            .call(req)
677                            .await
678                            .map_err(|e| anyhow::anyhow!("{:?}", e))
679                    }
680                    .boxed()
681                } else {
682                    // HTTP.
683                    async move {
684                        tracing::trace!("Opened HTTP connection");
685                        let rp = jsonrpsee_svc.call(req).await;
686                        tracing::trace!("Closed HTTP connection");
687                        // https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
688                        // to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to a concrete type
689                        // as workaround.
690                        rp.map_err(|e| anyhow::anyhow!("{:?}", e))
691                    }
692                    .boxed()
693                }
694            }
695        });
696
697        tokio::spawn(jsonrpsee::server::serve_with_graceful_shutdown(
698            sock,
699            svc,
700            stop_handle.clone().shutdown(),
701        ));
702    }
703
704    Ok(())
705}
706
707fn create_modules<DB>(state: Arc<RPCState<DB>>) -> HashMap<ApiPaths, RpcModule<RPCState<DB>>>
708where
709    DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
710{
711    let mut modules = HashMap::default();
712    for api_version in ApiPaths::value_variants() {
713        modules.insert(*api_version, RpcModule::from_arc(state.clone()));
714    }
715    macro_rules! register {
716        ($ty:ty) => {
717            // Register only non-subscription RPC methods.
718            // Subscription methods are registered separately in the RPC module.
719            if !<$ty>::SUBSCRIPTION {
720                <$ty>::register(&mut modules, ParamStructure::ByPosition).unwrap();
721            }
722        };
723    }
724    for_each_rpc_method!(register);
725    modules
726}
727
728/// If `include` is not [`None`], only methods that are listed will be returned
729pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenRPC {
730    use schemars::generate::{SchemaGenerator, SchemaSettings};
731
732    let mut methods = vec![];
733    // spec says draft07
734    let mut settings = SchemaSettings::draft07();
735    // ..but uses `components`
736    settings.definitions_path = "#/components/schemas/".into();
737    let mut generator = SchemaGenerator::new(settings);
738    macro_rules! callback {
739        ($ty:ty) => {
740            if <$ty>::API_PATHS.contains(path) {
741                match include {
742                    Some(include) => match include.contains(&<$ty>::NAME) {
743                        true => {
744                            methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
745                                &mut generator,
746                                ParamStructure::ByPosition,
747                                &<$ty>::NAME,
748                            )));
749                            if let Some(alias) = &<$ty>::NAME_ALIAS {
750                                methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
751                                    &mut generator,
752                                    ParamStructure::ByPosition,
753                                    &alias,
754                                )));
755                            }
756                        }
757                        false => {}
758                    },
759                    None => {
760                        methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
761                            &mut generator,
762                            ParamStructure::ByPosition,
763                            &<$ty>::NAME,
764                        )));
765                        if let Some(alias) = &<$ty>::NAME_ALIAS {
766                            methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
767                                &mut generator,
768                                ParamStructure::ByPosition,
769                                &alias,
770                            )));
771                        }
772                    }
773                }
774            }
775        };
776    }
777    for_each_rpc_method!(callback);
778    openrpc_types::OpenRPC {
779        methods,
780        components: Some(openrpc_types::Components {
781            schemas: Some(
782                generator
783                    .take_definitions(false)
784                    .into_iter()
785                    .filter_map(|(k, v)| {
786                        if let Ok(v) = Schema::try_from(v) {
787                            Some((k, v))
788                        } else {
789                            None
790                        }
791                    })
792                    .collect(),
793            ),
794            ..Default::default()
795        }),
796        openrpc: openrpc_types::OPEN_RPC_SPECIFICATION_VERSION,
797        info: openrpc_types::Info {
798            title: String::from("forest"),
799            version: env!("CARGO_PKG_VERSION").into(),
800            ..Default::default()
801        },
802        ..Default::default()
803    }
804}
805
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
        tool::offline_server::server::offline_rpc_state,
    };
    use jsonrpsee::server::stop_channel;
    use std::net::{Ipv4Addr, SocketAddr};
    use tokio::task::JoinSet;

    // To update RPC specs:
    // `cargo test --lib -- rpc::tests::openrpc`
    // `cargo insta review`

    /// Snapshots the unfiltered OpenRPC document generated for `path`,
    /// using the API path string as the snapshot name.
    fn openrpc(path: ApiPaths) {
        let spec = super::openrpc(path, None);
        insta::assert_yaml_snapshot!(path.path(), spec);
    }

    #[test]
    fn openrpc_v0() {
        openrpc(ApiPaths::V0);
    }

    #[test]
    fn openrpc_v1() {
        openrpc(ApiPaths::V1);
    }

    #[test]
    fn openrpc_v2() {
        openrpc(ApiPaths::V2);
    }

    /// Runs the end-to-end server test on a dedicated multi-threaded runtime.
    #[test]
    fn test_rpc_server() {
        // To mitigate the transient timeout issue, the runtime is given a bounded
        // amount of time to shut down.
        const TIMEOUT: Duration = Duration::from_secs(5);

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        let (done_tx, done_rx) = flume::bounded(1);

        runtime.block_on(test_rpc_server_inner(done_tx));
        // The inner test signals completion only after the server has shut down.
        done_rx.recv().unwrap();
        runtime.shutdown_timeout(TIMEOUT);
    }

    /// Starts an RPC server on an ephemeral local port, exercises it over HTTP and
    /// WebSocket, then shuts it down and reports completion through `done_tx`.
    async fn test_rpc_server_inner(done_tx: flume::Sender<()>) {
        use super::methods::auth::{AuthNew, AuthVerify};
        use super::methods::common::Version;

        let mut services = JoinSet::new();
        let (state, mut shutdown_recv) = offline_rpc_state(
            NetworkChain::Calibnet,
            Arc::new(MemoryDB::default()),
            None,
            None,
            &mut services,
        )
        .await
        .unwrap();

        // Capture everything needed from `state` before it is moved into the server.
        let expected_block_delay = state.chain_config().block_delay_secs;
        let shutdown_send = state.shutdown.clone();
        let read_permissions = vec!["read".to_owned()];
        let read_token = AuthNew::create_token(
            &state.keystore.read(),
            chrono::Duration::hours(1),
            read_permissions.clone(),
        )
        .unwrap();

        // Port 0 lets the OS pick a free port.
        let listener = tokio::net::TcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)))
            .await
            .unwrap();
        let rpc_address = listener.local_addr().unwrap();
        let (stop_handle, server_handle) = stop_channel();

        // Start an RPC server
        let server_task = tokio::spawn(start_rpc(state, listener, stop_handle, None));

        println!("sending a few http requests");
        {
            let http_client = Client::from_url(
                format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
                    .parse()
                    .unwrap(),
            );

            let info = Version::call(&http_client, ()).await.unwrap();
            assert_eq!(
                &info.version,
                &*crate::utils::version::FOREST_VERSION_STRING
            );
            assert_eq!(info.block_delay, expected_block_delay);
            assert_eq!(info.api_version, ShiftingVersion::new(2, 3, 0));

            let granted = AuthVerify::call(&http_client, (read_token.clone(),))
                .await
                .unwrap();
            assert_eq!(granted, read_permissions);
            // The HTTP client is dropped here, before the WebSocket client is created.
        }

        println!("sending a few websocket requests");
        {
            let ws_client = Client::from_url(
                format!("ws://{}:{}/", rpc_address.ip(), rpc_address.port())
                    .parse()
                    .unwrap(),
            );

            let granted = AuthVerify::call(&ws_client, (read_token,)).await.unwrap();
            assert_eq!(granted, read_permissions);
            // The WebSocket client is dropped here, before shutdown begins.
        }

        // Gracefully shutdown the RPC server
        println!("sending shutdown signal");
        shutdown_send.send(()).await.unwrap();
        println!("waiting on shutdown receiver");
        shutdown_recv.recv().await;
        println!("sending server stop signal");
        server_handle.stop().unwrap();
        println!("waiting on graceful shutdown");
        server_task.await.unwrap().unwrap();
        println!("done");
        done_tx.send(()).unwrap();
    }
}