1use crate::db::EthMappingsStore;
5use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer;
6mod auth_layer;
7mod channel;
8mod client;
9mod compression_layer;
10mod filter_layer;
11mod filter_list;
12pub mod json_validator;
13mod log_layer;
14mod metrics_layer;
15mod request;
16mod segregation_layer;
17mod set_extension_layer;
18mod validation_layer;
19
20use crate::rpc::eth::types::RandomHexStringIdProvider;
21use crate::shim::clock::ChainEpoch;
22use clap::ValueEnum as _;
23pub use client::Client;
24pub use error::ServerError;
25use eth::filter::EthEventHandler;
26use filter_layer::FilterLayer;
27pub use filter_list::FilterList;
28use futures::FutureExt as _;
29use jsonrpsee::server::ServerConfig;
30use log_layer::LogLayer;
31use reflect::Ctx;
32pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt};
33pub use request::Request;
34use schemars::Schema;
35use segregation_layer::SegregationLayer;
36use set_extension_layer::SetExtensionLayer;
37mod error;
38mod reflect;
39use ahash::HashMap;
40mod registry;
41pub mod types;
42
43pub use methods::*;
44
45pub use jsonrpsee::core::ClientError;
47
/// Sentinel epoch value (`-1`).
///
/// NOTE(review): judging by the name, this presumably stands for "no limit"
/// on how far back a chain look-up may search — confirm against the call sites.
pub const LOOKBACK_NO_LIMIT: ChainEpoch = -1;
50
/// Invokes `$callback!` once for every RPC method type listed below, passing
/// the fully qualified path of the type as the macro's only argument.
///
/// This list is the single place where RPC methods are enumerated: in this
/// file it drives the `prelude` re-exports, `collect_rpc_method_info`,
/// `create_modules` and `openrpc`. A method type that is not listed here is
/// not seen by any of them.
#[macro_export]
macro_rules! for_each_rpc_method {
    ($callback:path) => {
        // `auth` methods
        $callback!($crate::rpc::auth::AuthNew);
        $callback!($crate::rpc::auth::AuthVerify);

        // `beacon` methods
        $callback!($crate::rpc::beacon::BeaconGetEntry);

        // `chain` methods
        $callback!($crate::rpc::chain::ChainPruneSnapshot);
        $callback!($crate::rpc::chain::ChainExport);
        $callback!($crate::rpc::chain::ChainGetBlock);
        $callback!($crate::rpc::chain::ChainGetBlockMessages);
        $callback!($crate::rpc::chain::ChainGetEvents);
        $callback!($crate::rpc::chain::ChainGetGenesis);
        $callback!($crate::rpc::chain::ChainGetFinalizedTipset);
        $callback!($crate::rpc::chain::ChainGetMessage);
        $callback!($crate::rpc::chain::ChainGetMessagesInTipset);
        $callback!($crate::rpc::chain::ChainGetMinBaseFee);
        $callback!($crate::rpc::chain::ChainGetParentMessages);
        $callback!($crate::rpc::chain::ChainGetParentReceipts);
        $callback!($crate::rpc::chain::ChainGetPath);
        $callback!($crate::rpc::chain::ChainGetTipSet);
        $callback!($crate::rpc::chain::ChainGetTipSetV2);
        $callback!($crate::rpc::chain::ChainGetTipSetFinalityStatus);
        $callback!($crate::rpc::chain::ChainGetTipSetAfterHeight);
        $callback!($crate::rpc::chain::ChainGetTipSetByHeight);
        $callback!($crate::rpc::chain::ChainHasObj);
        $callback!($crate::rpc::chain::ChainHead);
        $callback!($crate::rpc::chain::ChainReadObj);
        $callback!($crate::rpc::chain::ChainSetHead);
        $callback!($crate::rpc::chain::ChainStatObj);
        $callback!($crate::rpc::chain::ChainTipSetWeight);
        $callback!($crate::rpc::chain::ForestChainExport);
        $callback!($crate::rpc::chain::ForestChainExportDiff);
        $callback!($crate::rpc::chain::ForestChainExportStatus);
        $callback!($crate::rpc::chain::ForestChainExportCancel);
        $callback!($crate::rpc::chain::ChainGetTipsetByParentState);

        // `common` methods
        $callback!($crate::rpc::common::Session);
        $callback!($crate::rpc::common::Shutdown);
        $callback!($crate::rpc::common::StartTime);
        $callback!($crate::rpc::common::Version);

        // `eth` methods
        $callback!($crate::rpc::eth::EthAccounts);
        $callback!($crate::rpc::eth::EthAddressToFilecoinAddress);
        $callback!($crate::rpc::eth::FilecoinAddressToEthAddress);
        $callback!($crate::rpc::eth::EthBlockNumber);
        $callback!($crate::rpc::eth::EthCall);
        $callback!($crate::rpc::eth::EthChainId);
        $callback!($crate::rpc::eth::EthEstimateGas);
        $callback!($crate::rpc::eth::EthFeeHistory);
        $callback!($crate::rpc::eth::EthGasPrice);
        $callback!($crate::rpc::eth::EthGetBalance);
        $callback!($crate::rpc::eth::EthGetBlockByHash);
        $callback!($crate::rpc::eth::EthGetBlockByNumber);
        $callback!($crate::rpc::eth::EthGetBlockReceipts);
        $callback!($crate::rpc::eth::EthGetBlockReceiptsLimited);
        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByHash);
        $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumber);
        $callback!($crate::rpc::eth::EthGetCode);
        $callback!($crate::rpc::eth::EthGetLogs);
        $callback!($crate::rpc::eth::EthGetFilterLogs);
        $callback!($crate::rpc::eth::EthGetFilterChanges);
        $callback!($crate::rpc::eth::EthGetMessageCidByTransactionHash);
        $callback!($crate::rpc::eth::EthGetStorageAt);
        $callback!($crate::rpc::eth::EthGetTransactionByHash);
        $callback!($crate::rpc::eth::EthGetTransactionByHashLimited);
        $callback!($crate::rpc::eth::EthGetTransactionCount);
        $callback!($crate::rpc::eth::EthGetTransactionHashByCid);
        $callback!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndex);
        $callback!($crate::rpc::eth::EthGetTransactionByBlockHashAndIndex);
        $callback!($crate::rpc::eth::EthMaxPriorityFeePerGas);
        $callback!($crate::rpc::eth::EthProtocolVersion);
        $callback!($crate::rpc::eth::EthGetTransactionReceipt);
        $callback!($crate::rpc::eth::EthGetTransactionReceiptLimited);
        $callback!($crate::rpc::eth::EthNewFilter);
        $callback!($crate::rpc::eth::EthNewPendingTransactionFilter);
        $callback!($crate::rpc::eth::EthNewBlockFilter);
        $callback!($crate::rpc::eth::EthUninstallFilter);
        $callback!($crate::rpc::eth::EthUnsubscribe);
        $callback!($crate::rpc::eth::EthSubscribe);
        $callback!($crate::rpc::eth::EthSyncing);
        $callback!($crate::rpc::eth::EthTraceBlock);
        $callback!($crate::rpc::eth::EthTraceCall);
        $callback!($crate::rpc::eth::EthTraceFilter);
        $callback!($crate::rpc::eth::EthTraceTransaction);
        $callback!($crate::rpc::eth::EthDebugTraceTransaction);
        $callback!($crate::rpc::eth::EthTraceReplayBlockTransactions);
        $callback!($crate::rpc::eth::Web3ClientVersion);
        $callback!($crate::rpc::eth::EthSendRawTransaction);
        $callback!($crate::rpc::eth::EthSendRawTransactionUntrusted);

        // `gas` methods
        $callback!($crate::rpc::gas::GasEstimateFeeCap);
        $callback!($crate::rpc::gas::GasEstimateGasLimit);
        $callback!($crate::rpc::gas::GasEstimateGasPremium);
        $callback!($crate::rpc::gas::GasEstimateMessageGas);

        // `market` methods
        $callback!($crate::rpc::market::MarketAddBalance);

        // `miner` methods
        $callback!($crate::rpc::miner::MinerCreateBlock);
        $callback!($crate::rpc::miner::MinerGetBaseInfo);

        // `mpool` methods
        $callback!($crate::rpc::mpool::MpoolBatchPush);
        $callback!($crate::rpc::mpool::MpoolBatchPushUntrusted);
        $callback!($crate::rpc::mpool::MpoolGetNonce);
        $callback!($crate::rpc::mpool::MpoolPending);
        $callback!($crate::rpc::mpool::MpoolPush);
        $callback!($crate::rpc::mpool::MpoolPushMessage);
        $callback!($crate::rpc::mpool::MpoolPushUntrusted);
        $callback!($crate::rpc::mpool::MpoolSelect);

        // `msig` methods
        $callback!($crate::rpc::msig::MsigGetAvailableBalance);
        $callback!($crate::rpc::msig::MsigGetPending);
        $callback!($crate::rpc::msig::MsigGetVested);
        $callback!($crate::rpc::msig::MsigGetVestingSchedule);

        // `net` methods
        $callback!($crate::rpc::net::NetAddrsListen);
        $callback!($crate::rpc::net::NetAgentVersion);
        $callback!($crate::rpc::net::NetAutoNatStatus);
        $callback!($crate::rpc::net::NetConnect);
        $callback!($crate::rpc::net::NetDisconnect);
        $callback!($crate::rpc::net::NetFindPeer);
        $callback!($crate::rpc::net::NetInfo);
        $callback!($crate::rpc::net::NetListening);
        $callback!($crate::rpc::net::NetPeers);
        $callback!($crate::rpc::net::NetProtectAdd);
        $callback!($crate::rpc::net::NetProtectList);
        $callback!($crate::rpc::net::NetProtectRemove);
        $callback!($crate::rpc::net::NetVersion);
        $callback!($crate::rpc::net::NetChainExchange);

        // `node` methods
        $callback!($crate::rpc::node::NodeStatus);

        // `state` methods
        $callback!($crate::rpc::state::StateAccountKey);
        $callback!($crate::rpc::state::StateCall);
        $callback!($crate::rpc::state::StateCirculatingSupply);
        $callback!($crate::rpc::state::ForestStateCompute);
        $callback!($crate::rpc::state::StateCompute);
        $callback!($crate::rpc::state::StateDealProviderCollateralBounds);
        $callback!($crate::rpc::state::StateFetchRoot);
        $callback!($crate::rpc::state::StateGetActor);
        $callback!($crate::rpc::state::StateGetActorV2);
        $callback!($crate::rpc::state::StateGetID);
        $callback!($crate::rpc::state::StateGetAllAllocations);
        $callback!($crate::rpc::state::StateGetAllClaims);
        $callback!($crate::rpc::state::StateGetAllocation);
        $callback!($crate::rpc::state::StateGetAllocationForPendingDeal);
        $callback!($crate::rpc::state::StateGetAllocationIdForPendingDeal);
        $callback!($crate::rpc::state::StateGetAllocations);
        $callback!($crate::rpc::state::StateGetBeaconEntry);
        $callback!($crate::rpc::state::StateGetClaim);
        $callback!($crate::rpc::state::StateGetClaims);
        $callback!($crate::rpc::state::StateGetNetworkParams);
        $callback!($crate::rpc::state::StateGetRandomnessDigestFromBeacon);
        $callback!($crate::rpc::state::StateGetRandomnessDigestFromTickets);
        $callback!($crate::rpc::state::StateGetRandomnessFromBeacon);
        $callback!($crate::rpc::state::StateGetRandomnessFromTickets);
        $callback!($crate::rpc::state::StateGetReceipt);
        $callback!($crate::rpc::state::StateListActors);
        $callback!($crate::rpc::state::StateListMessages);
        $callback!($crate::rpc::state::StateListMiners);
        $callback!($crate::rpc::state::StateLookupID);
        $callback!($crate::rpc::state::StateLookupRobustAddress);
        $callback!($crate::rpc::state::StateMarketBalance);
        $callback!($crate::rpc::state::StateMarketDeals);
        $callback!($crate::rpc::state::StateMarketParticipants);
        $callback!($crate::rpc::state::StateMarketStorageDeal);
        $callback!($crate::rpc::state::StateMinerActiveSectors);
        $callback!($crate::rpc::state::StateMinerAllocated);
        $callback!($crate::rpc::state::StateMinerAvailableBalance);
        $callback!($crate::rpc::state::StateMinerDeadlines);
        $callback!($crate::rpc::state::StateMinerFaults);
        $callback!($crate::rpc::state::StateMinerInfo);
        $callback!($crate::rpc::state::StateMinerInitialPledgeCollateral);
        $callback!($crate::rpc::state::StateMinerPartitions);
        $callback!($crate::rpc::state::StateMinerPower);
        $callback!($crate::rpc::state::StateMinerPreCommitDepositForPower);
        $callback!($crate::rpc::state::StateMinerProvingDeadline);
        $callback!($crate::rpc::state::StateMinerRecoveries);
        $callback!($crate::rpc::state::StateMinerSectorAllocated);
        $callback!($crate::rpc::state::StateMinerSectorCount);
        $callback!($crate::rpc::state::StateMinerSectors);
        $callback!($crate::rpc::state::StateNetworkName);
        $callback!($crate::rpc::state::StateNetworkVersion);
        $callback!($crate::rpc::state::StateActorInfo);
        $callback!($crate::rpc::state::StateReadState);
        $callback!($crate::rpc::state::StateDecodeParams);
        $callback!($crate::rpc::state::StateReplay);
        $callback!($crate::rpc::state::StateSearchMsg);
        $callback!($crate::rpc::state::StateSearchMsgLimited);
        $callback!($crate::rpc::state::StateSectorExpiration);
        $callback!($crate::rpc::state::StateSectorGetInfo);
        $callback!($crate::rpc::state::StateSectorPartition);
        $callback!($crate::rpc::state::StateSectorPreCommitInfo);
        $callback!($crate::rpc::state::StateSectorPreCommitInfoV0);
        $callback!($crate::rpc::state::StateVerifiedClientStatus);
        $callback!($crate::rpc::state::StateVerifiedRegistryRootKey);
        $callback!($crate::rpc::state::StateVerifierStatus);
        $callback!($crate::rpc::state::StateVMCirculatingSupplyInternal);
        $callback!($crate::rpc::state::StateWaitMsg);
        $callback!($crate::rpc::state::StateWaitMsgV0);
        $callback!($crate::rpc::state::StateMinerInitialPledgeForSector);

        // `sync` methods
        $callback!($crate::rpc::sync::SyncCheckBad);
        $callback!($crate::rpc::sync::SyncMarkBad);
        $callback!($crate::rpc::sync::SyncSnapshotProgress);
        $callback!($crate::rpc::sync::SyncStatus);
        $callback!($crate::rpc::sync::SyncSubmitBlock);

        // `wallet` methods
        $callback!($crate::rpc::wallet::WalletBalance);
        $callback!($crate::rpc::wallet::WalletDefaultAddress);
        $callback!($crate::rpc::wallet::WalletDelete);
        $callback!($crate::rpc::wallet::WalletExport);
        $callback!($crate::rpc::wallet::WalletHas);
        $callback!($crate::rpc::wallet::WalletImport);
        $callback!($crate::rpc::wallet::WalletList);
        $callback!($crate::rpc::wallet::WalletNew);
        $callback!($crate::rpc::wallet::WalletSetDefault);
        $callback!($crate::rpc::wallet::WalletSign);
        $callback!($crate::rpc::wallet::WalletSignMessage);
        $callback!($crate::rpc::wallet::WalletValidateAddress);
        $callback!($crate::rpc::wallet::WalletVerify);

        // `f3` methods
        $callback!($crate::rpc::f3::GetRawNetworkName);
        $callback!($crate::rpc::f3::F3GetCertificate);
        $callback!($crate::rpc::f3::F3GetECPowerTable);
        $callback!($crate::rpc::f3::F3GetF3PowerTable);
        $callback!($crate::rpc::f3::F3GetF3PowerTableByInstance);
        $callback!($crate::rpc::f3::F3IsRunning);
        $callback!($crate::rpc::f3::F3GetProgress);
        $callback!($crate::rpc::f3::F3GetManifest);
        $callback!($crate::rpc::f3::F3ListParticipants);
        $callback!($crate::rpc::f3::F3GetLatestCertificate);
        $callback!($crate::rpc::f3::F3GetOrRenewParticipationTicket);
        $callback!($crate::rpc::f3::F3Participate);
        $callback!($crate::rpc::f3::F3ExportLatestSnapshot);
        $callback!($crate::rpc::f3::GetHead);
        $callback!($crate::rpc::f3::GetParent);
        $callback!($crate::rpc::f3::GetParticipatingMinerIDs);
        $callback!($crate::rpc::f3::GetPowerTable);
        $callback!($crate::rpc::f3::GetTipset);
        $callback!($crate::rpc::f3::GetTipsetByEpoch);
        $callback!($crate::rpc::f3::Finalize);
        $callback!($crate::rpc::f3::ProtectPeer);
        $callback!($crate::rpc::f3::SignMessage);

        // `misc` methods
        $callback!($crate::rpc::misc::GetActorEventsRaw);
    };
}
324use compression_layer::{COMPRESS_MIN_BODY_SIZE, CompressionLayer};
325pub(crate) use for_each_rpc_method;
326use sync::SnapshotProgressTracker;
327use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
328
329#[allow(unused)]
330pub mod prelude {
342 use super::*;
343
344 pub use reflect::RpcMethodExt as _;
345
346 macro_rules! export {
347 ($ty:ty) => {
348 pub use $ty;
349 };
350 }
351
352 for_each_rpc_method!(export);
353}
354
355pub fn collect_rpc_method_info() -> Vec<(&'static str, Permission)> {
357 use crate::rpc::RpcMethod;
358
359 let mut methods = Vec::new();
360
361 macro_rules! add_method {
362 ($ty:ty) => {
363 methods.push((<$ty>::NAME, <$ty>::PERMISSION));
364 };
365 }
366
367 for_each_rpc_method!(add_method);
368
369 methods
370}
371
/// Container for the RPC method implementations, split into one sub-module
/// per method group. The module itself is private; its contents are exposed
/// at the parent level through `pub use methods::*`.
mod methods {
    pub mod auth;
    pub mod beacon;
    pub mod chain;
    pub mod common;
    pub mod eth;
    pub mod f3;
    pub mod gas;
    pub mod market;
    pub mod miner;
    pub mod misc;
    pub mod mpool;
    pub mod msig;
    pub mod net;
    pub mod node;
    pub mod state;
    pub mod sync;
    pub mod wallet;
}
427
428use crate::rpc::auth_layer::AuthLayer;
429pub use crate::rpc::channel::CANCEL_METHOD_NAME;
430use crate::rpc::channel::RpcModule as FilRpcModule;
431use crate::rpc::eth::pubsub::EthPubSub;
432use crate::rpc::metrics_layer::MetricsLayer;
433use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore};
434
435use crate::blocks::FullTipset;
436use crate::utils::misc::env::env_or_default;
437use fvm_ipld_blockstore::Blockstore;
438use jsonrpsee::{
439 Methods,
440 core::middleware::RpcServiceBuilder,
441 server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
442};
443use parking_lot::RwLock;
444use std::env;
445use std::sync::{Arc, LazyLock};
446use std::time::Duration;
447use tokio::sync::mpsc;
448use tower::Service;
449
450use crate::rpc::sync::SnapshotProgressState;
451use openrpc_types::{self, ParamStructure};
452
/// Default port number (`2345`).
///
/// NOTE(review): presumably the port the RPC server listens on when none is
/// configured — confirm against the call sites.
pub const DEFAULT_PORT: u16 = 2345;
454
/// Default request timeout, computed once on first access.
///
/// The value comes from the `FOREST_RPC_DEFAULT_TIMEOUT` environment variable,
/// read as a whole number of seconds. When the variable is unset or does not
/// parse, the timeout is 60 seconds.
static DEFAULT_REQUEST_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
    let fallback = Duration::from_secs(60);
    match env::var("FOREST_RPC_DEFAULT_TIMEOUT") {
        Ok(raw) => raw
            .parse::<u64>()
            .map(Duration::from_secs)
            .unwrap_or(fallback),
        Err(_) => fallback,
    }
});
462
/// Returns the maximum number of connections the RPC server should accept.
///
/// The value is resolved once, on the first call, from the
/// `FOREST_RPC_MAX_CONNECTIONS` environment variable. It is `1000` when the
/// variable is unset or is not a valid `u32`. Later calls return the cached
/// value.
pub fn default_max_connections() -> u32 {
    static MAX_CONNECTIONS: LazyLock<u32> = LazyLock::new(|| {
        match env::var("FOREST_RPC_MAX_CONNECTIONS") {
            Ok(raw) => raw.parse::<u32>().unwrap_or(1000),
            Err(_) => 1000,
        }
    });
    *MAX_CONNECTIONS
}
477
/// Maximum request body size handed to the server configuration in
/// `start_rpc`: 64 MiB.
const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024;

/// Maximum response body size handed to the server configuration in
/// `start_rpc`, resolved lazily on first access. Defaults to
/// `MAX_REQUEST_BODY_SIZE`.
///
/// NOTE(review): `env_or_default` presumably lets the
/// `FOREST_RPC_MAX_RESPONSE_BODY_SIZE` environment variable override the
/// default — confirm against that helper.
static MAX_RESPONSE_BODY_SIZE: LazyLock<u32> =
    LazyLock::new(|| env_or_default("FOREST_RPC_MAX_RESPONSE_BODY_SIZE", MAX_REQUEST_BODY_SIZE));
488
/// Shared state made available to the RPC method handlers. `start_rpc` wraps
/// it in an `Arc` and hands a clone to every RPC module.
///
/// `DB` is the blockstore type backing the state manager and chain store.
pub struct RPCState<DB> {
    /// Key store behind a shared read/write lock; `start_rpc` also passes it
    /// to the per-request `AuthLayer`.
    pub keystore: Arc<RwLock<KeyStore>>,
    /// State manager; the accessor methods on this type reach the chain
    /// store, chain config and beacon schedule through it.
    pub state_manager: Arc<crate::state_manager::StateManager<DB>>,
    /// Message pool backed by the chain store.
    pub mpool: Arc<crate::message_pool::MessagePool<Arc<crate::chain::ChainStore<DB>>>>,
    /// Optional bad-block cache (`None` when no cache is available).
    pub bad_blocks: Option<Arc<crate::chain_sync::BadBlockCache>>,
    /// Chain sync status handle.
    pub sync_status: crate::chain_sync::SyncStatus,
    /// Handler for Ethereum-style event filters.
    pub eth_event_handler: Arc<EthEventHandler>,
    /// Network context of the chain syncer; `network_send` is borrowed from it.
    pub sync_network_context: SyncNetworkContext<DB>,
    /// Sender half of a channel carrying `FullTipset` values.
    pub tipset_send: flume::Sender<FullTipset>,
    /// UTC timestamp. NOTE(review): presumably the node start time — confirm.
    pub start_time: chrono::DateTime<chrono::Utc>,
    /// Tracker whose current state is exposed by
    /// `get_snapshot_progress_tracker`.
    pub snapshot_progress_tracker: SnapshotProgressTracker,
    /// Sender half of a unit channel. NOTE(review): presumably used to request
    /// a shutdown — confirm against the receiver.
    pub shutdown: mpsc::Sender<()>,
    /// Message pool locker.
    pub mpool_locker: crate::message_pool::MpoolLocker,
    /// Nonce tracker for the message pool.
    pub nonce_tracker: crate::message_pool::NonceTracker,
    /// Shared path. NOTE(review): presumably a scratch directory for temporary
    /// files — confirm.
    pub temp_dir: Arc<std::path::PathBuf>,
}
507
/// Convenience accessors that forward to the components held by the state.
impl<DB: Blockstore> RPCState<DB> {
    /// Returns the beacon schedule held by the state manager.
    pub fn beacon(&self) -> &Arc<crate::beacon::BeaconSchedule> {
        self.state_manager.beacon_schedule()
    }

    /// Returns the chain store held by the state manager.
    pub fn chain_store(&self) -> &Arc<crate::chain::ChainStore<DB>> {
        self.state_manager.chain_store()
    }

    /// Returns the chain index of the chain store.
    pub fn chain_index(&self) -> &crate::chain::index::ChainIndex<DB> {
        self.chain_store().chain_index()
    }

    /// Returns the chain configuration held by the state manager.
    pub fn chain_config(&self) -> &Arc<crate::networks::ChainConfig> {
        self.state_manager.chain_config()
    }

    /// Borrows the blockstore of the chain store.
    pub fn store(&self) -> &Arc<DB> {
        self.chain_store().blockstore()
    }

    /// Returns an owned `Arc` handle to the blockstore, obtained from the
    /// state manager.
    pub fn store_owned(&self) -> Arc<DB> {
        self.state_manager.blockstore_owned()
    }

    /// Returns the sender for `NetworkMessage`s owned by the sync network
    /// context.
    pub fn network_send(&self) -> &flume::Sender<crate::libp2p::NetworkMessage> {
        self.sync_network_context.network_send()
    }

    /// Returns the current state reported by the snapshot progress tracker.
    pub fn get_snapshot_progress_tracker(&self) -> SnapshotProgressState {
        self.snapshot_progress_tracker.state()
    }
}
541
/// Bundle of values that `start_rpc` clones for each accepted connection and
/// again for each request served on it.
#[derive(Clone)]
struct PerConnection<RpcMiddleware, HttpMiddleware> {
    /// Handle passed to the built service and used to detect server shutdown.
    stop_handle: StopHandle,
    /// Pre-configured service builder; the RPC middleware is attached per
    /// request before building.
    svc_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
    /// Key store forwarded to the `AuthLayer`.
    keystore: Arc<RwLock<KeyStore>>,
}
548
/// Serves JSON-RPC over HTTP and WebSocket on `rpc_listener` until
/// `stop_handle` signals shutdown.
///
/// * `state` – shared handler state; wrapped in an `Arc` and given to every
///   RPC module.
/// * `rpc_listener` – already-bound TCP listener to accept connections from.
/// * `stop_handle` – its `shutdown()` future ends the accept loop and is also
///   handed to every spawned connection task for graceful shutdown.
/// * `filter_list` – method filter given to the `FilterLayer`; `None` uses
///   `FilterList::default()`.
///
/// # Errors
///
/// Returns an error if registering the `Filecoin.ChainNotify` channel or
/// merging the pubsub modules fails. Errors from `accept` are logged and the
/// loop continues. Once shutdown is signalled the function returns `Ok(())`.
pub async fn start_rpc<DB>(
    state: RPCState<DB>,
    rpc_listener: tokio::net::TcpListener,
    stop_handle: StopHandle,
    filter_list: Option<FilterList>,
) -> anyhow::Result<()>
where
    DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
{
    let filter_list = filter_list.unwrap_or_default();
    let state = Arc::new(state);
    let keystore = state.keystore.clone();
    // One module per API path, pre-populated with the non-subscription methods.
    let mut modules = create_modules(state.clone());

    // Channel-based `Filecoin.ChainNotify` lives in its own module so it can
    // be merged into every API path below.
    let mut pubsub_module = FilRpcModule::default();
    pubsub_module.register_channel("Filecoin.ChainNotify", {
        let state_clone = state.clone();
        move |params| chain::chain_notify(params, &state_clone)
    })?;

    // Every API path gets the Ethereum pubsub methods and the channel module.
    for module in modules.values_mut() {
        module.merge(EthPubSub::new(state.clone()).into_rpc())?;
        module.merge(pubsub_module.clone())?;
    }

    // Freeze the modules into shareable `Methods`, keyed by API path.
    let methods: Arc<HashMap<ApiPaths, Methods>> =
        Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());

    let per_conn = PerConnection {
        stop_handle: stop_handle.clone(),
        svc_builder: Server::builder()
            .set_config(
                ServerConfig::builder()
                    .max_request_body_size(MAX_REQUEST_BODY_SIZE)
                    .max_response_body_size(*MAX_RESPONSE_BODY_SIZE)
                    .max_connections(default_max_connections())
                    .set_id_provider(RandomHexStringIdProvider::new())
                    .build(),
            )
            .set_http_middleware(
                tower::ServiceBuilder::new()
                    // Compression is only installed when a minimum body size
                    // is configured.
                    .option_layer(COMPRESS_MIN_BODY_SIZE.map(CompressionLayer::new))
                    // Flag the `Authorization` header as sensitive.
                    .layer(SetSensitiveRequestHeadersLayer::new(std::iter::once(
                        http::header::AUTHORIZATION,
                    ))),
            )
            .to_service_builder(),
        keystore,
    };
    tracing::info!("Ready for RPC connections");
    loop {
        // Wait for either a new connection or the shutdown signal.
        let sock = tokio::select! {
            res = rpc_listener.accept() => {
                match res {
                    Ok((stream, _remote_addr)) => stream,
                    Err(e) => {
                        // A failed accept is logged; the server keeps running.
                        tracing::error!("failed to accept v4 connection: {:?}", e);
                        continue;
                    }
                }
            }
            _ = per_conn.stop_handle.clone().shutdown() => break,
        };

        // Service invoked for every HTTP request arriving on this connection.
        let svc = tower::service_fn({
            let methods = methods.clone();
            let per_conn = per_conn.clone();
            let filter_list = filter_list.clone();
            move |req| {
                let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
                // The URI selects the API path; unrecognised URIs are
                // answered with 404.
                let path = if let Ok(p) = ApiPaths::from_uri(req.uri()) {
                    p
                } else {
                    return async move {
                        Ok(http::Response::builder()
                            .status(http::StatusCode::NOT_FOUND)
                            .body(Default::default())
                            .unwrap_or_else(|_| http::Response::new(Default::default())))
                    }
                    .boxed();
                };
                // A path without registered methods falls back to an empty set.
                let methods = methods.get(&path).cloned().unwrap_or_default();
                let PerConnection {
                    stop_handle,
                    svc_builder,
                    keystore,
                } = per_conn.clone();
                // Captured so the auth layer can inspect the request headers.
                let headers = req.headers().clone();
                // RPC middleware stack, layered in the order listed.
                let rpc_middleware = RpcServiceBuilder::new()
                    .layer(SetExtensionLayer { path })
                    .layer(SegregationLayer)
                    .layer(FilterLayer::new(filter_list.clone()))
                    .layer(validation_layer::JsonValidationLayer)
                    .layer(AuthLayer {
                        headers,
                        keystore: keystore.clone(),
                    })
                    .layer(LogLayer::default())
                    .layer(MetricsLayer::default());
                let mut jsonrpsee_svc = svc_builder
                    .set_rpc_middleware(rpc_middleware)
                    .build(methods, stop_handle);

                if is_websocket {
                    // Must be obtained before the service handles the request.
                    let session_close = jsonrpsee_svc.on_session_closed();

                    // Background task that only logs when the session ends.
                    tokio::spawn(async move {
                        session_close.await;
                        tracing::trace!("Closed WebSocket connection");
                    });

                    async move {
                        tracing::trace!("Opened WebSocket connection");
                        jsonrpsee_svc
                            .call(req)
                            .await
                            .map_err(|e| anyhow::anyhow!("{:?}", e))
                    }
                    .boxed()
                } else {
                    async move {
                        tracing::trace!("Opened HTTP connection");
                        let rp = jsonrpsee_svc.call(req).await;
                        tracing::trace!("Closed HTTP connection");
                        rp.map_err(|e| anyhow::anyhow!("{:?}", e))
                    }
                    .boxed()
                }
            }
        });

        // Serve the connection on its own task; it is not awaited here.
        tokio::spawn(jsonrpsee::server::serve_with_graceful_shutdown(
            sock,
            svc,
            stop_handle.clone().shutdown(),
        ));
    }

    Ok(())
}
706
707fn create_modules<DB>(state: Arc<RPCState<DB>>) -> HashMap<ApiPaths, RpcModule<RPCState<DB>>>
708where
709 DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
710{
711 let mut modules = HashMap::default();
712 for api_version in ApiPaths::value_variants() {
713 modules.insert(*api_version, RpcModule::from_arc(state.clone()));
714 }
715 macro_rules! register {
716 ($ty:ty) => {
717 if !<$ty>::SUBSCRIPTION {
720 <$ty>::register(&mut modules, ParamStructure::ByPosition).unwrap();
721 }
722 };
723 }
724 for_each_rpc_method!(register);
725 modules
726}
727
728pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenRPC {
730 use schemars::generate::{SchemaGenerator, SchemaSettings};
731
732 let mut methods = vec![];
733 let mut settings = SchemaSettings::draft07();
735 settings.definitions_path = "#/components/schemas/".into();
737 let mut generator = SchemaGenerator::new(settings);
738 macro_rules! callback {
739 ($ty:ty) => {
740 if <$ty>::API_PATHS.contains(path) {
741 match include {
742 Some(include) => match include.contains(&<$ty>::NAME) {
743 true => {
744 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
745 &mut generator,
746 ParamStructure::ByPosition,
747 &<$ty>::NAME,
748 )));
749 if let Some(alias) = &<$ty>::NAME_ALIAS {
750 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
751 &mut generator,
752 ParamStructure::ByPosition,
753 &alias,
754 )));
755 }
756 }
757 false => {}
758 },
759 None => {
760 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
761 &mut generator,
762 ParamStructure::ByPosition,
763 &<$ty>::NAME,
764 )));
765 if let Some(alias) = &<$ty>::NAME_ALIAS {
766 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
767 &mut generator,
768 ParamStructure::ByPosition,
769 &alias,
770 )));
771 }
772 }
773 }
774 }
775 };
776 }
777 for_each_rpc_method!(callback);
778 openrpc_types::OpenRPC {
779 methods,
780 components: Some(openrpc_types::Components {
781 schemas: Some(
782 generator
783 .take_definitions(false)
784 .into_iter()
785 .filter_map(|(k, v)| {
786 if let Ok(v) = Schema::try_from(v) {
787 Some((k, v))
788 } else {
789 None
790 }
791 })
792 .collect(),
793 ),
794 ..Default::default()
795 }),
796 openrpc: openrpc_types::OPEN_RPC_SPECIFICATION_VERSION,
797 info: openrpc_types::Info {
798 title: String::from("forest"),
799 version: env!("CARGO_PKG_VERSION").into(),
800 ..Default::default()
801 },
802 ..Default::default()
803 }
804}
805
/// Snapshot tests for the generated OpenRPC documents and an end-to-end smoke
/// test of the RPC server over HTTP and WebSocket.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
        tool::offline_server::server::offline_rpc_state,
    };
    use jsonrpsee::server::stop_channel;
    use std::net::{Ipv4Addr, SocketAddr};
    use tokio::task::JoinSet;

    /// Snapshot of the OpenRPC document for API path `V0`.
    #[test]
    fn openrpc_v0() {
        openrpc(ApiPaths::V0);
    }

    /// Snapshot of the OpenRPC document for API path `V1`.
    #[test]
    fn openrpc_v1() {
        openrpc(ApiPaths::V1);
    }

    /// Snapshot of the OpenRPC document for API path `V2`.
    #[test]
    fn openrpc_v2() {
        openrpc(ApiPaths::V2);
    }

    /// Generates the unfiltered spec for `path` and compares it against the
    /// stored YAML snapshot named after `path.path()`.
    fn openrpc(path: ApiPaths) {
        let spec = super::openrpc(path, None);
        insta::assert_yaml_snapshot!(path.path(), spec);
    }

    /// Runs the async server test on a dedicated multi-threaded runtime, waits
    /// for its completion signal, then shuts the runtime down with a 5-second
    /// timeout.
    #[test]
    fn test_rpc_server() {
        const TIMEOUT: Duration = Duration::from_secs(5);
        let (done_tx, done_rx) = flume::bounded(1);
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move { test_rpc_server_inner(done_tx).await });
        done_rx.recv().unwrap();
        rt.shutdown_timeout(TIMEOUT);
    }

    /// Starts the server on an ephemeral localhost port with an offline
    /// Calibnet state, exercises `Version` and `AuthVerify` over HTTP and
    /// `AuthVerify` over WebSocket, then shuts everything down and signals
    /// completion on `done_tx`.
    async fn test_rpc_server_inner(done_tx: flume::Sender<()>) {
        let chain = NetworkChain::Calibnet;
        let db = Arc::new(MemoryDB::default());
        let mut services = JoinSet::new();
        let (state, mut shutdown_recv) = offline_rpc_state(chain, db, None, None, &mut services)
            .await
            .unwrap();
        let block_delay_secs = state.chain_config().block_delay_secs;
        let shutdown_send = state.shutdown.clone();
        // Token carrying only the "read" permission, valid for one hour.
        let jwt_read_permissions = vec!["read".to_owned()];
        let jwt_read = super::methods::auth::AuthNew::create_token(
            &state.keystore.read(),
            chrono::Duration::hours(1),
            jwt_read_permissions.clone(),
        )
        .unwrap();
        // Port 0 lets the OS pick a free port.
        let rpc_listener =
            tokio::net::TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
                .await
                .unwrap();
        let rpc_address = rpc_listener.local_addr().unwrap();
        let (stop_handle, server_handle) = stop_channel();

        // The server runs in the background while the clients below talk to it.
        let handle = tokio::spawn(start_rpc(state, rpc_listener, stop_handle, None));

        println!("sending a few http requests");

        let client = Client::from_url(
            format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
                .parse()
                .unwrap(),
        );

        let response = super::methods::common::Version::call(&client, ())
            .await
            .unwrap();
        assert_eq!(
            &response.version,
            &*crate::utils::version::FOREST_VERSION_STRING
        );
        assert_eq!(response.block_delay, block_delay_secs);
        assert_eq!(response.api_version, ShiftingVersion::new(2, 3, 0));

        // Verifying the token must return exactly the permissions it was
        // created with.
        let response = super::methods::auth::AuthVerify::call(&client, (jwt_read.clone(),))
            .await
            .unwrap();
        assert_eq!(response, jwt_read_permissions);

        drop(client);

        println!("sending a few websocket requests");

        let client = Client::from_url(
            format!("ws://{}:{}/", rpc_address.ip(), rpc_address.port())
                .parse()
                .unwrap(),
        );

        let response = super::methods::auth::AuthVerify::call(&client, (jwt_read,))
            .await
            .unwrap();
        assert_eq!(response, jwt_read_permissions);

        drop(client);

        println!("sending shutdown signal");
        shutdown_send.send(()).await.unwrap();
        println!("waiting on shutdown receiver");
        shutdown_recv.recv().await;
        println!("sending server stop signal");
        server_handle.stop().unwrap();
        println!("waiting on graceful shutdown");
        // First `unwrap` is the join result, second is `start_rpc`'s own result.
        handle.await.unwrap().unwrap();
        println!("done");
        done_tx.send(()).unwrap();
    }
}