1use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer;
5mod auth_layer;
6mod channel;
7mod client;
8mod filter_layer;
9mod filter_list;
10pub mod json_validator;
11mod log_layer;
12mod metrics_layer;
13mod request;
14mod segregation_layer;
15mod set_extension_layer;
16mod validation_layer;
17
18use crate::rpc::eth::types::RandomHexStringIdProvider;
19use crate::shim::clock::ChainEpoch;
20use clap::ValueEnum as _;
21pub use client::Client;
22pub use error::ServerError;
23use eth::filter::EthEventHandler;
24use filter_layer::FilterLayer;
25pub use filter_list::FilterList;
26use futures::FutureExt as _;
27use jsonrpsee::server::ServerConfig;
28use log_layer::LogLayer;
29use reflect::Ctx;
30pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt};
31pub use request::Request;
32use schemars::Schema;
33use segregation_layer::SegregationLayer;
34use set_extension_layer::SetExtensionLayer;
35mod error;
36mod reflect;
37use ahash::HashMap;
38mod registry;
39pub mod types;
40
41pub use methods::*;
42
43pub use jsonrpsee::core::ClientError;
45
46pub const LOOKBACK_NO_LIMIT: ChainEpoch = -1;
48
49#[macro_export]
57macro_rules! for_each_rpc_method {
58 ($callback:path) => {
59 $callback!($crate::rpc::auth::AuthNew);
61 $callback!($crate::rpc::auth::AuthVerify);
62
63 $callback!($crate::rpc::beacon::BeaconGetEntry);
65
66 $callback!($crate::rpc::chain::ChainPruneSnapshot);
68 $callback!($crate::rpc::chain::ChainExport);
69 $callback!($crate::rpc::chain::ChainGetBlock);
70 $callback!($crate::rpc::chain::ChainGetBlockMessages);
71 $callback!($crate::rpc::chain::ChainGetEvents);
72 $callback!($crate::rpc::chain::ChainGetGenesis);
73 $callback!($crate::rpc::chain::ChainGetFinalizedTipset);
74 $callback!($crate::rpc::chain::ChainGetMessage);
75 $callback!($crate::rpc::chain::ChainGetMessagesInTipset);
76 $callback!($crate::rpc::chain::ChainGetMinBaseFee);
77 $callback!($crate::rpc::chain::ChainGetParentMessages);
78 $callback!($crate::rpc::chain::ChainGetParentReceipts);
79 $callback!($crate::rpc::chain::ChainGetPath);
80 $callback!($crate::rpc::chain::ChainGetTipSet);
81 $callback!($crate::rpc::chain::ChainGetTipSetV2);
82 $callback!($crate::rpc::chain::ChainGetTipSetAfterHeight);
83 $callback!($crate::rpc::chain::ChainGetTipSetByHeight);
84 $callback!($crate::rpc::chain::ChainHasObj);
85 $callback!($crate::rpc::chain::ChainHead);
86 $callback!($crate::rpc::chain::ChainReadObj);
87 $callback!($crate::rpc::chain::ChainSetHead);
88 $callback!($crate::rpc::chain::ChainStatObj);
89 $callback!($crate::rpc::chain::ChainTipSetWeight);
90 $callback!($crate::rpc::chain::ForestChainExport);
91 $callback!($crate::rpc::chain::ForestChainExportDiff);
92 $callback!($crate::rpc::chain::ForestChainExportStatus);
93 $callback!($crate::rpc::chain::ForestChainExportCancel);
94 $callback!($crate::rpc::chain::ChainGetTipsetByParentState);
95
96 $callback!($crate::rpc::common::Session);
98 $callback!($crate::rpc::common::Shutdown);
99 $callback!($crate::rpc::common::StartTime);
100 $callback!($crate::rpc::common::Version);
101
102 $callback!($crate::rpc::eth::EthAccounts);
104 $callback!($crate::rpc::eth::EthAddressToFilecoinAddress);
105 $callback!($crate::rpc::eth::FilecoinAddressToEthAddress);
106 $callback!($crate::rpc::eth::EthBlockNumber);
107 $callback!($crate::rpc::eth::EthCall);
108 $callback!($crate::rpc::eth::EthCallV2);
109 $callback!($crate::rpc::eth::EthChainId);
110 $callback!($crate::rpc::eth::EthEstimateGas);
111 $callback!($crate::rpc::eth::EthEstimateGasV2);
112 $callback!($crate::rpc::eth::EthFeeHistory);
113 $callback!($crate::rpc::eth::EthFeeHistoryV2);
114 $callback!($crate::rpc::eth::EthGasPrice);
115 $callback!($crate::rpc::eth::EthGetBalance);
116 $callback!($crate::rpc::eth::EthGetBalanceV2);
117 $callback!($crate::rpc::eth::EthGetBlockByHash);
118 $callback!($crate::rpc::eth::EthGetBlockByNumber);
119 $callback!($crate::rpc::eth::EthGetBlockByNumberV2);
120 $callback!($crate::rpc::eth::EthGetBlockReceipts);
121 $callback!($crate::rpc::eth::EthGetBlockReceiptsV2);
122 $callback!($crate::rpc::eth::EthGetBlockReceiptsLimited);
123 $callback!($crate::rpc::eth::EthGetBlockReceiptsLimitedV2);
124 $callback!($crate::rpc::eth::EthGetBlockTransactionCountByHash);
125 $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumber);
126 $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumberV2);
127 $callback!($crate::rpc::eth::EthGetCode);
128 $callback!($crate::rpc::eth::EthGetCodeV2);
129 $callback!($crate::rpc::eth::EthGetLogs);
130 $callback!($crate::rpc::eth::EthGetFilterLogs);
131 $callback!($crate::rpc::eth::EthGetFilterChanges);
132 $callback!($crate::rpc::eth::EthGetMessageCidByTransactionHash);
133 $callback!($crate::rpc::eth::EthGetStorageAt);
134 $callback!($crate::rpc::eth::EthGetStorageAtV2);
135 $callback!($crate::rpc::eth::EthGetTransactionByHash);
136 $callback!($crate::rpc::eth::EthGetTransactionByHashLimited);
137 $callback!($crate::rpc::eth::EthGetTransactionCount);
138 $callback!($crate::rpc::eth::EthGetTransactionCountV2);
139 $callback!($crate::rpc::eth::EthGetTransactionHashByCid);
140 $callback!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndex);
141 $callback!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndexV2);
142 $callback!($crate::rpc::eth::EthGetTransactionByBlockHashAndIndex);
143 $callback!($crate::rpc::eth::EthMaxPriorityFeePerGas);
144 $callback!($crate::rpc::eth::EthProtocolVersion);
145 $callback!($crate::rpc::eth::EthGetTransactionReceipt);
146 $callback!($crate::rpc::eth::EthGetTransactionReceiptLimited);
147 $callback!($crate::rpc::eth::EthNewFilter);
148 $callback!($crate::rpc::eth::EthNewPendingTransactionFilter);
149 $callback!($crate::rpc::eth::EthNewBlockFilter);
150 $callback!($crate::rpc::eth::EthUninstallFilter);
151 $callback!($crate::rpc::eth::EthUnsubscribe);
152 $callback!($crate::rpc::eth::EthSubscribe);
153 $callback!($crate::rpc::eth::EthSyncing);
154 $callback!($crate::rpc::eth::EthTraceBlock);
155 $callback!($crate::rpc::eth::EthTraceBlockV2);
156 $callback!($crate::rpc::eth::EthTraceCall);
157 $callback!($crate::rpc::eth::EthTraceFilter);
158 $callback!($crate::rpc::eth::EthTraceFilterV2);
159 $callback!($crate::rpc::eth::EthTraceTransaction);
160 $callback!($crate::rpc::eth::EthTraceReplayBlockTransactions);
161 $callback!($crate::rpc::eth::EthTraceReplayBlockTransactionsV2);
162 $callback!($crate::rpc::eth::Web3ClientVersion);
163 $callback!($crate::rpc::eth::EthSendRawTransaction);
164 $callback!($crate::rpc::eth::EthSendRawTransactionUntrusted);
165
166 $callback!($crate::rpc::gas::GasEstimateFeeCap);
168 $callback!($crate::rpc::gas::GasEstimateGasLimit);
169 $callback!($crate::rpc::gas::GasEstimateGasPremium);
170 $callback!($crate::rpc::gas::GasEstimateMessageGas);
171
172 $callback!($crate::rpc::market::MarketAddBalance);
174
175 $callback!($crate::rpc::miner::MinerCreateBlock);
177 $callback!($crate::rpc::miner::MinerGetBaseInfo);
178
179 $callback!($crate::rpc::mpool::MpoolBatchPush);
181 $callback!($crate::rpc::mpool::MpoolBatchPushUntrusted);
182 $callback!($crate::rpc::mpool::MpoolGetNonce);
183 $callback!($crate::rpc::mpool::MpoolPending);
184 $callback!($crate::rpc::mpool::MpoolPush);
185 $callback!($crate::rpc::mpool::MpoolPushMessage);
186 $callback!($crate::rpc::mpool::MpoolPushUntrusted);
187 $callback!($crate::rpc::mpool::MpoolSelect);
188
189 $callback!($crate::rpc::msig::MsigGetAvailableBalance);
191 $callback!($crate::rpc::msig::MsigGetPending);
192 $callback!($crate::rpc::msig::MsigGetVested);
193 $callback!($crate::rpc::msig::MsigGetVestingSchedule);
194
195 $callback!($crate::rpc::net::NetAddrsListen);
197 $callback!($crate::rpc::net::NetAgentVersion);
198 $callback!($crate::rpc::net::NetAutoNatStatus);
199 $callback!($crate::rpc::net::NetConnect);
200 $callback!($crate::rpc::net::NetDisconnect);
201 $callback!($crate::rpc::net::NetFindPeer);
202 $callback!($crate::rpc::net::NetInfo);
203 $callback!($crate::rpc::net::NetListening);
204 $callback!($crate::rpc::net::NetPeers);
205 $callback!($crate::rpc::net::NetProtectAdd);
206 $callback!($crate::rpc::net::NetProtectList);
207 $callback!($crate::rpc::net::NetProtectRemove);
208 $callback!($crate::rpc::net::NetVersion);
209
210 $callback!($crate::rpc::node::NodeStatus);
212
213 $callback!($crate::rpc::state::StateAccountKey);
215 $callback!($crate::rpc::state::StateCall);
216 $callback!($crate::rpc::state::StateCirculatingSupply);
217 $callback!($crate::rpc::state::ForestStateCompute);
218 $callback!($crate::rpc::state::StateCompute);
219 $callback!($crate::rpc::state::StateDealProviderCollateralBounds);
220 $callback!($crate::rpc::state::StateFetchRoot);
221 $callback!($crate::rpc::state::StateGetActor);
222 $callback!($crate::rpc::state::StateGetActorV2);
223 $callback!($crate::rpc::state::StateGetID);
224 $callback!($crate::rpc::state::StateGetAllAllocations);
225 $callback!($crate::rpc::state::StateGetAllClaims);
226 $callback!($crate::rpc::state::StateGetAllocation);
227 $callback!($crate::rpc::state::StateGetAllocationForPendingDeal);
228 $callback!($crate::rpc::state::StateGetAllocationIdForPendingDeal);
229 $callback!($crate::rpc::state::StateGetAllocations);
230 $callback!($crate::rpc::state::StateGetBeaconEntry);
231 $callback!($crate::rpc::state::StateGetClaim);
232 $callback!($crate::rpc::state::StateGetClaims);
233 $callback!($crate::rpc::state::StateGetNetworkParams);
234 $callback!($crate::rpc::state::StateGetRandomnessDigestFromBeacon);
235 $callback!($crate::rpc::state::StateGetRandomnessDigestFromTickets);
236 $callback!($crate::rpc::state::StateGetRandomnessFromBeacon);
237 $callback!($crate::rpc::state::StateGetRandomnessFromTickets);
238 $callback!($crate::rpc::state::StateGetReceipt);
239 $callback!($crate::rpc::state::StateListActors);
240 $callback!($crate::rpc::state::StateListMessages);
241 $callback!($crate::rpc::state::StateListMiners);
242 $callback!($crate::rpc::state::StateLookupID);
243 $callback!($crate::rpc::state::StateLookupRobustAddress);
244 $callback!($crate::rpc::state::StateMarketBalance);
245 $callback!($crate::rpc::state::StateMarketDeals);
246 $callback!($crate::rpc::state::StateMarketParticipants);
247 $callback!($crate::rpc::state::StateMarketStorageDeal);
248 $callback!($crate::rpc::state::StateMinerActiveSectors);
249 $callback!($crate::rpc::state::StateMinerAllocated);
250 $callback!($crate::rpc::state::StateMinerAvailableBalance);
251 $callback!($crate::rpc::state::StateMinerDeadlines);
252 $callback!($crate::rpc::state::StateMinerFaults);
253 $callback!($crate::rpc::state::StateMinerInfo);
254 $callback!($crate::rpc::state::StateMinerInitialPledgeCollateral);
255 $callback!($crate::rpc::state::StateMinerPartitions);
256 $callback!($crate::rpc::state::StateMinerPower);
257 $callback!($crate::rpc::state::StateMinerPreCommitDepositForPower);
258 $callback!($crate::rpc::state::StateMinerProvingDeadline);
259 $callback!($crate::rpc::state::StateMinerRecoveries);
260 $callback!($crate::rpc::state::StateMinerSectorAllocated);
261 $callback!($crate::rpc::state::StateMinerSectorCount);
262 $callback!($crate::rpc::state::StateMinerSectors);
263 $callback!($crate::rpc::state::StateNetworkName);
264 $callback!($crate::rpc::state::StateNetworkVersion);
265 $callback!($crate::rpc::state::StateActorInfo);
266 $callback!($crate::rpc::state::StateReadState);
267 $callback!($crate::rpc::state::StateDecodeParams);
268 $callback!($crate::rpc::state::StateReplay);
269 $callback!($crate::rpc::state::StateSearchMsg);
270 $callback!($crate::rpc::state::StateSearchMsgLimited);
271 $callback!($crate::rpc::state::StateSectorExpiration);
272 $callback!($crate::rpc::state::StateSectorGetInfo);
273 $callback!($crate::rpc::state::StateSectorPartition);
274 $callback!($crate::rpc::state::StateSectorPreCommitInfo);
275 $callback!($crate::rpc::state::StateSectorPreCommitInfoV0);
276 $callback!($crate::rpc::state::StateVerifiedClientStatus);
277 $callback!($crate::rpc::state::StateVerifiedRegistryRootKey);
278 $callback!($crate::rpc::state::StateVerifierStatus);
279 $callback!($crate::rpc::state::StateVMCirculatingSupplyInternal);
280 $callback!($crate::rpc::state::StateWaitMsg);
281 $callback!($crate::rpc::state::StateWaitMsgV0);
282 $callback!($crate::rpc::state::StateMinerInitialPledgeForSector);
283
284 $callback!($crate::rpc::sync::SyncCheckBad);
286 $callback!($crate::rpc::sync::SyncMarkBad);
287 $callback!($crate::rpc::sync::SyncSnapshotProgress);
288 $callback!($crate::rpc::sync::SyncStatus);
289 $callback!($crate::rpc::sync::SyncSubmitBlock);
290
291 $callback!($crate::rpc::wallet::WalletBalance);
293 $callback!($crate::rpc::wallet::WalletDefaultAddress);
294 $callback!($crate::rpc::wallet::WalletDelete);
295 $callback!($crate::rpc::wallet::WalletExport);
296 $callback!($crate::rpc::wallet::WalletHas);
297 $callback!($crate::rpc::wallet::WalletImport);
298 $callback!($crate::rpc::wallet::WalletList);
299 $callback!($crate::rpc::wallet::WalletNew);
300 $callback!($crate::rpc::wallet::WalletSetDefault);
301 $callback!($crate::rpc::wallet::WalletSign);
302 $callback!($crate::rpc::wallet::WalletSignMessage);
303 $callback!($crate::rpc::wallet::WalletValidateAddress);
304 $callback!($crate::rpc::wallet::WalletVerify);
305
306 $callback!($crate::rpc::f3::GetRawNetworkName);
308 $callback!($crate::rpc::f3::F3GetCertificate);
309 $callback!($crate::rpc::f3::F3GetECPowerTable);
310 $callback!($crate::rpc::f3::F3GetF3PowerTable);
311 $callback!($crate::rpc::f3::F3GetF3PowerTableByInstance);
312 $callback!($crate::rpc::f3::F3IsRunning);
313 $callback!($crate::rpc::f3::F3GetProgress);
314 $callback!($crate::rpc::f3::F3GetManifest);
315 $callback!($crate::rpc::f3::F3ListParticipants);
316 $callback!($crate::rpc::f3::F3GetLatestCertificate);
317 $callback!($crate::rpc::f3::F3GetOrRenewParticipationTicket);
318 $callback!($crate::rpc::f3::F3Participate);
319 $callback!($crate::rpc::f3::F3ExportLatestSnapshot);
320 $callback!($crate::rpc::f3::GetHead);
321 $callback!($crate::rpc::f3::GetParent);
322 $callback!($crate::rpc::f3::GetParticipatingMinerIDs);
323 $callback!($crate::rpc::f3::GetPowerTable);
324 $callback!($crate::rpc::f3::GetTipset);
325 $callback!($crate::rpc::f3::GetTipsetByEpoch);
326 $callback!($crate::rpc::f3::Finalize);
327 $callback!($crate::rpc::f3::ProtectPeer);
328 $callback!($crate::rpc::f3::SignMessage);
329
330 $callback!($crate::rpc::misc::GetActorEventsRaw);
332 };
333}
334pub(crate) use for_each_rpc_method;
335use sync::SnapshotProgressTracker;
336use tower_http::compression::CompressionLayer;
337use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
338
339#[allow(unused)]
340pub mod prelude {
352 use super::*;
353
354 pub use reflect::RpcMethodExt as _;
355
356 macro_rules! export {
357 ($ty:ty) => {
358 pub use $ty;
359 };
360 }
361
362 for_each_rpc_method!(export);
363}
364
365pub fn collect_rpc_method_info() -> Vec<(&'static str, Permission)> {
367 use crate::rpc::RpcMethod;
368
369 let mut methods = Vec::new();
370
371 macro_rules! add_method {
372 ($ty:ty) => {
373 methods.push((<$ty>::NAME, <$ty>::PERMISSION));
374 };
375 }
376
377 for_each_rpc_method!(add_method);
378
379 methods
380}
381
382mod methods {
419 pub mod auth;
420 pub mod beacon;
421 pub mod chain;
422 pub mod common;
423 pub mod eth;
424 pub mod f3;
425 pub mod gas;
426 pub mod market;
427 pub mod miner;
428 pub mod misc;
429 pub mod mpool;
430 pub mod msig;
431 pub mod net;
432 pub mod node;
433 pub mod state;
434 pub mod sync;
435 pub mod wallet;
436}
437
438use crate::rpc::auth_layer::AuthLayer;
439pub use crate::rpc::channel::CANCEL_METHOD_NAME;
440use crate::rpc::channel::RpcModule as FilRpcModule;
441use crate::rpc::eth::pubsub::EthPubSub;
442use crate::rpc::metrics_layer::MetricsLayer;
443use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore};
444
445use crate::blocks::FullTipset;
446use fvm_ipld_blockstore::Blockstore;
447use jsonrpsee::{
448 Methods,
449 core::middleware::RpcServiceBuilder,
450 server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
451};
452use parking_lot::RwLock;
453use std::env;
454use std::sync::{Arc, LazyLock};
455use std::time::Duration;
456use tokio::sync::mpsc;
457use tower::Service;
458
459use crate::rpc::sync::SnapshotProgressState;
460use openrpc_types::{self, ParamStructure};
461
462pub const DEFAULT_PORT: u16 = 2345;
463
464static DEFAULT_REQUEST_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
466 env::var("FOREST_RPC_DEFAULT_TIMEOUT")
467 .ok()
468 .and_then(|it| Duration::from_secs(it.parse().ok()?).into())
469 .unwrap_or(Duration::from_secs(60))
470});
471
472static DEFAULT_MAX_CONNECTIONS: LazyLock<u32> = LazyLock::new(|| {
475 env::var("FOREST_RPC_MAX_CONNECTIONS")
476 .ok()
477 .and_then(|it| it.parse().ok())
478 .unwrap_or(1000)
479});
480
481const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024;
482const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE;
483
484pub struct RPCState<DB> {
487 pub keystore: Arc<RwLock<KeyStore>>,
488 pub state_manager: Arc<crate::state_manager::StateManager<DB>>,
489 pub mpool: Arc<crate::message_pool::MessagePool<crate::message_pool::MpoolRpcProvider<DB>>>,
490 pub bad_blocks: Option<Arc<crate::chain_sync::BadBlockCache>>,
491 pub msgs_in_tipset: Arc<crate::chain::store::MsgsInTipsetCache>,
492 pub sync_status: crate::chain_sync::SyncStatus,
493 pub eth_event_handler: Arc<EthEventHandler>,
494 pub sync_network_context: SyncNetworkContext<DB>,
495 pub tipset_send: flume::Sender<FullTipset>,
496 pub start_time: chrono::DateTime<chrono::Utc>,
497 pub snapshot_progress_tracker: SnapshotProgressTracker,
498 pub shutdown: mpsc::Sender<()>,
499}
500
501impl<DB: Blockstore> RPCState<DB> {
502 pub fn beacon(&self) -> &Arc<crate::beacon::BeaconSchedule> {
503 self.state_manager.beacon_schedule()
504 }
505
506 pub fn chain_store(&self) -> &Arc<crate::chain::ChainStore<DB>> {
507 self.state_manager.chain_store()
508 }
509
510 pub fn chain_index(&self) -> &Arc<crate::chain::index::ChainIndex<Arc<DB>>> {
511 self.chain_store().chain_index()
512 }
513
514 pub fn chain_config(&self) -> &Arc<crate::networks::ChainConfig> {
515 self.state_manager.chain_config()
516 }
517
518 pub fn store(&self) -> &Arc<DB> {
519 self.chain_store().blockstore()
520 }
521
522 pub fn store_owned(&self) -> Arc<DB> {
523 self.state_manager.blockstore_owned()
524 }
525
526 pub fn network_send(&self) -> &flume::Sender<crate::libp2p::NetworkMessage> {
527 self.sync_network_context.network_send()
528 }
529
530 pub fn get_snapshot_progress_tracker(&self) -> SnapshotProgressState {
531 self.snapshot_progress_tracker.state()
532 }
533}
534
535#[derive(Clone)]
536struct PerConnection<RpcMiddleware, HttpMiddleware> {
537 stop_handle: StopHandle,
538 svc_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
539 keystore: Arc<RwLock<KeyStore>>,
540}
541
542pub async fn start_rpc<DB>(
543 state: RPCState<DB>,
544 rpc_listener: tokio::net::TcpListener,
545 stop_handle: StopHandle,
546 filter_list: Option<FilterList>,
547) -> anyhow::Result<()>
548where
549 DB: Blockstore + Send + Sync + 'static,
550{
551 let filter_list = filter_list.unwrap_or_default();
552 let state = Arc::new(state);
554 let keystore = state.keystore.clone();
555 let mut modules = create_modules(state.clone());
556
557 let mut pubsub_module = FilRpcModule::default();
558 pubsub_module.register_channel("Filecoin.ChainNotify", {
559 let state_clone = state.clone();
560 move |params| chain::chain_notify(params, &state_clone)
561 })?;
562
563 for module in modules.values_mut() {
564 module.merge(EthPubSub::new(state.clone()).into_rpc())?;
566 module.merge(pubsub_module.clone())?;
567 }
568
569 let methods: Arc<HashMap<ApiPaths, Methods>> =
570 Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());
571
572 let per_conn = PerConnection {
573 stop_handle: stop_handle.clone(),
574 svc_builder: Server::builder()
575 .set_config(
576 ServerConfig::builder()
577 .max_request_body_size(MAX_REQUEST_BODY_SIZE)
579 .max_response_body_size(MAX_RESPONSE_BODY_SIZE)
580 .max_connections(*DEFAULT_MAX_CONNECTIONS)
581 .set_id_provider(RandomHexStringIdProvider::new())
582 .build(),
583 )
584 .set_http_middleware(
585 tower::ServiceBuilder::new()
586 .layer(CompressionLayer::new())
587 .layer(SetSensitiveRequestHeadersLayer::new(std::iter::once(
589 http::header::AUTHORIZATION,
590 ))),
591 )
592 .to_service_builder(),
593 keystore,
594 };
595 tracing::info!("Ready for RPC connections");
596 loop {
597 let sock = tokio::select! {
598 res = rpc_listener.accept() => {
599 match res {
600 Ok((stream, _remote_addr)) => stream,
601 Err(e) => {
602 tracing::error!("failed to accept v4 connection: {:?}", e);
603 continue;
604 }
605 }
606 }
607 _ = per_conn.stop_handle.clone().shutdown() => break,
608 };
609
610 let svc = tower::service_fn({
611 let methods = methods.clone();
612 let per_conn = per_conn.clone();
613 let filter_list = filter_list.clone();
614 move |req| {
615 let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
616 let path = if let Ok(p) = ApiPaths::from_uri(req.uri()) {
617 p
618 } else {
619 return async move {
620 Ok(http::Response::builder()
621 .status(http::StatusCode::NOT_FOUND)
622 .body(Default::default())
623 .unwrap_or_else(|_| http::Response::new(Default::default())))
624 }
625 .boxed();
626 };
627 let methods = methods.get(&path).cloned().unwrap_or_default();
628 let PerConnection {
629 stop_handle,
630 svc_builder,
631 keystore,
632 } = per_conn.clone();
633 let headers = req.headers().clone();
636 let rpc_middleware = RpcServiceBuilder::new()
637 .layer(SetExtensionLayer { path })
638 .layer(SegregationLayer)
639 .layer(FilterLayer::new(filter_list.clone()))
640 .layer(validation_layer::JsonValidationLayer)
641 .layer(AuthLayer {
642 headers,
643 keystore: keystore.clone(),
644 })
645 .layer(LogLayer::default())
646 .layer(MetricsLayer::default());
647 let mut jsonrpsee_svc = svc_builder
648 .set_rpc_middleware(rpc_middleware)
649 .build(methods, stop_handle);
650
651 if is_websocket {
652 let session_close = jsonrpsee_svc.on_session_closed();
655
656 tokio::spawn(async move {
659 session_close.await;
660 tracing::trace!("Closed WebSocket connection");
661 });
662
663 async move {
664 tracing::trace!("Opened WebSocket connection");
665 jsonrpsee_svc
669 .call(req)
670 .await
671 .map_err(|e| anyhow::anyhow!("{:?}", e))
672 }
673 .boxed()
674 } else {
675 async move {
677 tracing::trace!("Opened HTTP connection");
678 let rp = jsonrpsee_svc.call(req).await;
679 tracing::trace!("Closed HTTP connection");
680 rp.map_err(|e| anyhow::anyhow!("{:?}", e))
684 }
685 .boxed()
686 }
687 }
688 });
689
690 tokio::spawn(jsonrpsee::server::serve_with_graceful_shutdown(
691 sock,
692 svc,
693 stop_handle.clone().shutdown(),
694 ));
695 }
696
697 Ok(())
698}
699
700fn create_modules<DB>(state: Arc<RPCState<DB>>) -> HashMap<ApiPaths, RpcModule<RPCState<DB>>>
701where
702 DB: Blockstore + Send + Sync + 'static,
703{
704 let mut modules = HashMap::default();
705 for api_version in ApiPaths::value_variants() {
706 modules.insert(*api_version, RpcModule::from_arc(state.clone()));
707 }
708 macro_rules! register {
709 ($ty:ty) => {
710 if !<$ty>::SUBSCRIPTION {
713 <$ty>::register(&mut modules, ParamStructure::ByPosition).unwrap();
714 }
715 };
716 }
717 for_each_rpc_method!(register);
718 modules
719}
720
721pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenRPC {
723 use schemars::generate::{SchemaGenerator, SchemaSettings};
724
725 let mut methods = vec![];
726 let mut settings = SchemaSettings::draft07();
728 settings.definitions_path = "#/components/schemas/".into();
730 let mut generator = SchemaGenerator::new(settings);
731 macro_rules! callback {
732 ($ty:ty) => {
733 if <$ty>::API_PATHS.contains(path) {
734 match include {
735 Some(include) => match include.contains(&<$ty>::NAME) {
736 true => {
737 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
738 &mut generator,
739 ParamStructure::ByPosition,
740 &<$ty>::NAME,
741 )));
742 if let Some(alias) = &<$ty>::NAME_ALIAS {
743 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
744 &mut generator,
745 ParamStructure::ByPosition,
746 &alias,
747 )));
748 }
749 }
750 false => {}
751 },
752 None => {
753 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
754 &mut generator,
755 ParamStructure::ByPosition,
756 &<$ty>::NAME,
757 )));
758 if let Some(alias) = &<$ty>::NAME_ALIAS {
759 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
760 &mut generator,
761 ParamStructure::ByPosition,
762 &alias,
763 )));
764 }
765 }
766 }
767 }
768 };
769 }
770 for_each_rpc_method!(callback);
771 openrpc_types::OpenRPC {
772 methods,
773 components: Some(openrpc_types::Components {
774 schemas: Some(
775 generator
776 .take_definitions(false)
777 .into_iter()
778 .filter_map(|(k, v)| {
779 if let Ok(v) = Schema::try_from(v) {
780 Some((k, v))
781 } else {
782 None
783 }
784 })
785 .collect(),
786 ),
787 ..Default::default()
788 }),
789 openrpc: openrpc_types::OPEN_RPC_SPECIFICATION_VERSION,
790 info: openrpc_types::Info {
791 title: String::from("forest"),
792 version: env!("CARGO_PKG_VERSION").into(),
793 ..Default::default()
794 },
795 ..Default::default()
796 }
797}
798
799#[cfg(test)]
800mod tests {
801 use super::*;
802 use crate::{
803 db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
804 tool::offline_server::server::offline_rpc_state,
805 };
806 use jsonrpsee::server::stop_channel;
807 use std::net::{Ipv4Addr, SocketAddr};
808 use tokio::task::JoinSet;
809
810 #[test]
815 fn openrpc_v0() {
816 openrpc(ApiPaths::V0);
817 }
818
819 #[test]
820 fn openrpc_v1() {
821 openrpc(ApiPaths::V1);
822 }
823
824 #[test]
825 fn openrpc_v2() {
826 openrpc(ApiPaths::V2);
827 }
828
829 fn openrpc(path: ApiPaths) {
830 let spec = super::openrpc(path, None);
831 insta::assert_yaml_snapshot!(path.path(), spec);
832 }
833
834 #[test]
835 fn test_rpc_server() {
836 const TIMEOUT: Duration = Duration::from_secs(5);
837 let (done_tx, done_rx) = flume::bounded(1);
838 let rt = tokio::runtime::Builder::new_multi_thread()
839 .enable_all()
840 .build()
841 .unwrap();
842 rt.block_on(async move { test_rpc_server_inner(done_tx).await });
843 done_rx.recv().unwrap();
844 rt.shutdown_timeout(TIMEOUT);
846 }
847
848 async fn test_rpc_server_inner(done_tx: flume::Sender<()>) {
849 let chain = NetworkChain::Calibnet;
850 let db = Arc::new(MemoryDB::default());
851 let mut services = JoinSet::new();
852 let (state, mut shutdown_recv) = offline_rpc_state(chain, db, None, None, &mut services)
853 .await
854 .unwrap();
855 let block_delay_secs = state.chain_config().block_delay_secs;
856 let shutdown_send = state.shutdown.clone();
857 let jwt_read_permissions = vec!["read".to_owned()];
858 let jwt_read = super::methods::auth::AuthNew::create_token(
859 &state.keystore.read(),
860 chrono::Duration::hours(1),
861 jwt_read_permissions.clone(),
862 )
863 .unwrap();
864 let rpc_listener =
865 tokio::net::TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
866 .await
867 .unwrap();
868 let rpc_address = rpc_listener.local_addr().unwrap();
869 let (stop_handle, server_handle) = stop_channel();
870
871 let handle = tokio::spawn(start_rpc(state, rpc_listener, stop_handle, None));
874
875 println!("sending a few http requests");
876
877 let client = Client::from_url(
878 format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
879 .parse()
880 .unwrap(),
881 );
882
883 let response = super::methods::common::Version::call(&client, ())
884 .await
885 .unwrap();
886 assert_eq!(
887 &response.version,
888 &*crate::utils::version::FOREST_VERSION_STRING
889 );
890 assert_eq!(response.block_delay, block_delay_secs);
891 assert_eq!(response.api_version, ShiftingVersion::new(2, 3, 0));
892
893 let response = super::methods::auth::AuthVerify::call(&client, (jwt_read.clone(),))
894 .await
895 .unwrap();
896 assert_eq!(response, jwt_read_permissions);
897
898 drop(client);
899
900 println!("sending a few websocket requests");
901
902 let client = Client::from_url(
903 format!("ws://{}:{}/", rpc_address.ip(), rpc_address.port())
904 .parse()
905 .unwrap(),
906 );
907
908 let response = super::methods::auth::AuthVerify::call(&client, (jwt_read,))
909 .await
910 .unwrap();
911 assert_eq!(response, jwt_read_permissions);
912
913 drop(client);
914
915 println!("sending shutdown signal");
917 shutdown_send.send(()).await.unwrap();
918 println!("waiting on shutdown receiver");
919 shutdown_recv.recv().await;
920 println!("sending server stop signal");
921 server_handle.stop().unwrap();
922 println!("waiting on graceful shutdown");
923 handle.await.unwrap().unwrap();
924 println!("done");
925 done_tx.send(()).unwrap();
926 }
927}