1use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer;
5mod auth_layer;
6mod channel;
7mod client;
8mod compression_layer;
9mod filter_layer;
10mod filter_list;
11pub mod json_validator;
12mod log_layer;
13mod metrics_layer;
14mod request;
15mod segregation_layer;
16mod set_extension_layer;
17mod validation_layer;
18
19use crate::rpc::eth::types::RandomHexStringIdProvider;
20use crate::shim::clock::ChainEpoch;
21use clap::ValueEnum as _;
22pub use client::Client;
23pub use error::ServerError;
24use eth::filter::EthEventHandler;
25use filter_layer::FilterLayer;
26pub use filter_list::FilterList;
27use futures::FutureExt as _;
28use jsonrpsee::server::ServerConfig;
29use log_layer::LogLayer;
30use reflect::Ctx;
31pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt};
32pub use request::Request;
33use schemars::Schema;
34use segregation_layer::SegregationLayer;
35use set_extension_layer::SetExtensionLayer;
36mod error;
37mod reflect;
38use ahash::HashMap;
39mod registry;
40pub mod types;
41
42pub use methods::*;
43
44pub use jsonrpsee::core::ClientError;
46
47pub const LOOKBACK_NO_LIMIT: ChainEpoch = -1;
49
50#[macro_export]
58macro_rules! for_each_rpc_method {
59 ($callback:path) => {
60 $callback!($crate::rpc::auth::AuthNew);
62 $callback!($crate::rpc::auth::AuthVerify);
63
64 $callback!($crate::rpc::beacon::BeaconGetEntry);
66
67 $callback!($crate::rpc::chain::ChainPruneSnapshot);
69 $callback!($crate::rpc::chain::ChainExport);
70 $callback!($crate::rpc::chain::ChainGetBlock);
71 $callback!($crate::rpc::chain::ChainGetBlockMessages);
72 $callback!($crate::rpc::chain::ChainGetEvents);
73 $callback!($crate::rpc::chain::ChainGetGenesis);
74 $callback!($crate::rpc::chain::ChainGetFinalizedTipset);
75 $callback!($crate::rpc::chain::ChainGetMessage);
76 $callback!($crate::rpc::chain::ChainGetMessagesInTipset);
77 $callback!($crate::rpc::chain::ChainGetMinBaseFee);
78 $callback!($crate::rpc::chain::ChainGetParentMessages);
79 $callback!($crate::rpc::chain::ChainGetParentReceipts);
80 $callback!($crate::rpc::chain::ChainGetPath);
81 $callback!($crate::rpc::chain::ChainGetTipSet);
82 $callback!($crate::rpc::chain::ChainGetTipSetV2);
83 $callback!($crate::rpc::chain::ChainGetTipSetFinalityStatus);
84 $callback!($crate::rpc::chain::ChainGetTipSetAfterHeight);
85 $callback!($crate::rpc::chain::ChainGetTipSetByHeight);
86 $callback!($crate::rpc::chain::ChainHasObj);
87 $callback!($crate::rpc::chain::ChainHead);
88 $callback!($crate::rpc::chain::ChainReadObj);
89 $callback!($crate::rpc::chain::ChainSetHead);
90 $callback!($crate::rpc::chain::ChainStatObj);
91 $callback!($crate::rpc::chain::ChainTipSetWeight);
92 $callback!($crate::rpc::chain::ForestChainExport);
93 $callback!($crate::rpc::chain::ForestChainExportDiff);
94 $callback!($crate::rpc::chain::ForestChainExportStatus);
95 $callback!($crate::rpc::chain::ForestChainExportCancel);
96 $callback!($crate::rpc::chain::ChainGetTipsetByParentState);
97
98 $callback!($crate::rpc::common::Session);
100 $callback!($crate::rpc::common::Shutdown);
101 $callback!($crate::rpc::common::StartTime);
102 $callback!($crate::rpc::common::Version);
103
104 $callback!($crate::rpc::eth::EthAccounts);
106 $callback!($crate::rpc::eth::EthAddressToFilecoinAddress);
107 $callback!($crate::rpc::eth::FilecoinAddressToEthAddress);
108 $callback!($crate::rpc::eth::EthBlockNumber);
109 $callback!($crate::rpc::eth::EthCall);
110 $callback!($crate::rpc::eth::EthChainId);
111 $callback!($crate::rpc::eth::EthEstimateGas);
112 $callback!($crate::rpc::eth::EthFeeHistory);
113 $callback!($crate::rpc::eth::EthGasPrice);
114 $callback!($crate::rpc::eth::EthGetBalance);
115 $callback!($crate::rpc::eth::EthGetBlockByHash);
116 $callback!($crate::rpc::eth::EthGetBlockByNumber);
117 $callback!($crate::rpc::eth::EthGetBlockReceipts);
118 $callback!($crate::rpc::eth::EthGetBlockReceiptsLimited);
119 $callback!($crate::rpc::eth::EthGetBlockTransactionCountByHash);
120 $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumber);
121 $callback!($crate::rpc::eth::EthGetCode);
122 $callback!($crate::rpc::eth::EthGetLogs);
123 $callback!($crate::rpc::eth::EthGetFilterLogs);
124 $callback!($crate::rpc::eth::EthGetFilterChanges);
125 $callback!($crate::rpc::eth::EthGetMessageCidByTransactionHash);
126 $callback!($crate::rpc::eth::EthGetStorageAt);
127 $callback!($crate::rpc::eth::EthGetTransactionByHash);
128 $callback!($crate::rpc::eth::EthGetTransactionByHashLimited);
129 $callback!($crate::rpc::eth::EthGetTransactionCount);
130 $callback!($crate::rpc::eth::EthGetTransactionHashByCid);
131 $callback!($crate::rpc::eth::EthGetTransactionByBlockNumberAndIndex);
132 $callback!($crate::rpc::eth::EthGetTransactionByBlockHashAndIndex);
133 $callback!($crate::rpc::eth::EthMaxPriorityFeePerGas);
134 $callback!($crate::rpc::eth::EthProtocolVersion);
135 $callback!($crate::rpc::eth::EthGetTransactionReceipt);
136 $callback!($crate::rpc::eth::EthGetTransactionReceiptLimited);
137 $callback!($crate::rpc::eth::EthNewFilter);
138 $callback!($crate::rpc::eth::EthNewPendingTransactionFilter);
139 $callback!($crate::rpc::eth::EthNewBlockFilter);
140 $callback!($crate::rpc::eth::EthUninstallFilter);
141 $callback!($crate::rpc::eth::EthUnsubscribe);
142 $callback!($crate::rpc::eth::EthSubscribe);
143 $callback!($crate::rpc::eth::EthSyncing);
144 $callback!($crate::rpc::eth::EthTraceBlock);
145 $callback!($crate::rpc::eth::EthTraceCall);
146 $callback!($crate::rpc::eth::EthTraceFilter);
147 $callback!($crate::rpc::eth::EthTraceTransaction);
148 $callback!($crate::rpc::eth::EthDebugTraceTransaction);
149 $callback!($crate::rpc::eth::EthTraceReplayBlockTransactions);
150 $callback!($crate::rpc::eth::Web3ClientVersion);
151 $callback!($crate::rpc::eth::EthSendRawTransaction);
152 $callback!($crate::rpc::eth::EthSendRawTransactionUntrusted);
153
154 $callback!($crate::rpc::gas::GasEstimateFeeCap);
156 $callback!($crate::rpc::gas::GasEstimateGasLimit);
157 $callback!($crate::rpc::gas::GasEstimateGasPremium);
158 $callback!($crate::rpc::gas::GasEstimateMessageGas);
159
160 $callback!($crate::rpc::market::MarketAddBalance);
162
163 $callback!($crate::rpc::miner::MinerCreateBlock);
165 $callback!($crate::rpc::miner::MinerGetBaseInfo);
166
167 $callback!($crate::rpc::mpool::MpoolBatchPush);
169 $callback!($crate::rpc::mpool::MpoolBatchPushUntrusted);
170 $callback!($crate::rpc::mpool::MpoolGetNonce);
171 $callback!($crate::rpc::mpool::MpoolPending);
172 $callback!($crate::rpc::mpool::MpoolPush);
173 $callback!($crate::rpc::mpool::MpoolPushMessage);
174 $callback!($crate::rpc::mpool::MpoolPushUntrusted);
175 $callback!($crate::rpc::mpool::MpoolSelect);
176
177 $callback!($crate::rpc::msig::MsigGetAvailableBalance);
179 $callback!($crate::rpc::msig::MsigGetPending);
180 $callback!($crate::rpc::msig::MsigGetVested);
181 $callback!($crate::rpc::msig::MsigGetVestingSchedule);
182
183 $callback!($crate::rpc::net::NetAddrsListen);
185 $callback!($crate::rpc::net::NetAgentVersion);
186 $callback!($crate::rpc::net::NetAutoNatStatus);
187 $callback!($crate::rpc::net::NetConnect);
188 $callback!($crate::rpc::net::NetDisconnect);
189 $callback!($crate::rpc::net::NetFindPeer);
190 $callback!($crate::rpc::net::NetInfo);
191 $callback!($crate::rpc::net::NetListening);
192 $callback!($crate::rpc::net::NetPeers);
193 $callback!($crate::rpc::net::NetProtectAdd);
194 $callback!($crate::rpc::net::NetProtectList);
195 $callback!($crate::rpc::net::NetProtectRemove);
196 $callback!($crate::rpc::net::NetVersion);
197 $callback!($crate::rpc::net::NetChainExchange);
198
199 $callback!($crate::rpc::node::NodeStatus);
201
202 $callback!($crate::rpc::state::StateAccountKey);
204 $callback!($crate::rpc::state::StateCall);
205 $callback!($crate::rpc::state::StateCirculatingSupply);
206 $callback!($crate::rpc::state::ForestStateCompute);
207 $callback!($crate::rpc::state::StateCompute);
208 $callback!($crate::rpc::state::StateDealProviderCollateralBounds);
209 $callback!($crate::rpc::state::StateFetchRoot);
210 $callback!($crate::rpc::state::StateGetActor);
211 $callback!($crate::rpc::state::StateGetActorV2);
212 $callback!($crate::rpc::state::StateGetID);
213 $callback!($crate::rpc::state::StateGetAllAllocations);
214 $callback!($crate::rpc::state::StateGetAllClaims);
215 $callback!($crate::rpc::state::StateGetAllocation);
216 $callback!($crate::rpc::state::StateGetAllocationForPendingDeal);
217 $callback!($crate::rpc::state::StateGetAllocationIdForPendingDeal);
218 $callback!($crate::rpc::state::StateGetAllocations);
219 $callback!($crate::rpc::state::StateGetBeaconEntry);
220 $callback!($crate::rpc::state::StateGetClaim);
221 $callback!($crate::rpc::state::StateGetClaims);
222 $callback!($crate::rpc::state::StateGetNetworkParams);
223 $callback!($crate::rpc::state::StateGetRandomnessDigestFromBeacon);
224 $callback!($crate::rpc::state::StateGetRandomnessDigestFromTickets);
225 $callback!($crate::rpc::state::StateGetRandomnessFromBeacon);
226 $callback!($crate::rpc::state::StateGetRandomnessFromTickets);
227 $callback!($crate::rpc::state::StateGetReceipt);
228 $callback!($crate::rpc::state::StateListActors);
229 $callback!($crate::rpc::state::StateListMessages);
230 $callback!($crate::rpc::state::StateListMiners);
231 $callback!($crate::rpc::state::StateLookupID);
232 $callback!($crate::rpc::state::StateLookupRobustAddress);
233 $callback!($crate::rpc::state::StateMarketBalance);
234 $callback!($crate::rpc::state::StateMarketDeals);
235 $callback!($crate::rpc::state::StateMarketParticipants);
236 $callback!($crate::rpc::state::StateMarketStorageDeal);
237 $callback!($crate::rpc::state::StateMinerActiveSectors);
238 $callback!($crate::rpc::state::StateMinerAllocated);
239 $callback!($crate::rpc::state::StateMinerAvailableBalance);
240 $callback!($crate::rpc::state::StateMinerDeadlines);
241 $callback!($crate::rpc::state::StateMinerFaults);
242 $callback!($crate::rpc::state::StateMinerInfo);
243 $callback!($crate::rpc::state::StateMinerInitialPledgeCollateral);
244 $callback!($crate::rpc::state::StateMinerPartitions);
245 $callback!($crate::rpc::state::StateMinerPower);
246 $callback!($crate::rpc::state::StateMinerPreCommitDepositForPower);
247 $callback!($crate::rpc::state::StateMinerProvingDeadline);
248 $callback!($crate::rpc::state::StateMinerRecoveries);
249 $callback!($crate::rpc::state::StateMinerSectorAllocated);
250 $callback!($crate::rpc::state::StateMinerSectorCount);
251 $callback!($crate::rpc::state::StateMinerSectors);
252 $callback!($crate::rpc::state::StateNetworkName);
253 $callback!($crate::rpc::state::StateNetworkVersion);
254 $callback!($crate::rpc::state::StateActorInfo);
255 $callback!($crate::rpc::state::StateReadState);
256 $callback!($crate::rpc::state::StateDecodeParams);
257 $callback!($crate::rpc::state::StateReplay);
258 $callback!($crate::rpc::state::StateSearchMsg);
259 $callback!($crate::rpc::state::StateSearchMsgLimited);
260 $callback!($crate::rpc::state::StateSectorExpiration);
261 $callback!($crate::rpc::state::StateSectorGetInfo);
262 $callback!($crate::rpc::state::StateSectorPartition);
263 $callback!($crate::rpc::state::StateSectorPreCommitInfo);
264 $callback!($crate::rpc::state::StateSectorPreCommitInfoV0);
265 $callback!($crate::rpc::state::StateVerifiedClientStatus);
266 $callback!($crate::rpc::state::StateVerifiedRegistryRootKey);
267 $callback!($crate::rpc::state::StateVerifierStatus);
268 $callback!($crate::rpc::state::StateVMCirculatingSupplyInternal);
269 $callback!($crate::rpc::state::StateWaitMsg);
270 $callback!($crate::rpc::state::StateWaitMsgV0);
271 $callback!($crate::rpc::state::StateMinerInitialPledgeForSector);
272
273 $callback!($crate::rpc::sync::SyncCheckBad);
275 $callback!($crate::rpc::sync::SyncMarkBad);
276 $callback!($crate::rpc::sync::SyncSnapshotProgress);
277 $callback!($crate::rpc::sync::SyncStatus);
278 $callback!($crate::rpc::sync::SyncSubmitBlock);
279
280 $callback!($crate::rpc::wallet::WalletBalance);
282 $callback!($crate::rpc::wallet::WalletDefaultAddress);
283 $callback!($crate::rpc::wallet::WalletDelete);
284 $callback!($crate::rpc::wallet::WalletExport);
285 $callback!($crate::rpc::wallet::WalletHas);
286 $callback!($crate::rpc::wallet::WalletImport);
287 $callback!($crate::rpc::wallet::WalletList);
288 $callback!($crate::rpc::wallet::WalletNew);
289 $callback!($crate::rpc::wallet::WalletSetDefault);
290 $callback!($crate::rpc::wallet::WalletSign);
291 $callback!($crate::rpc::wallet::WalletSignMessage);
292 $callback!($crate::rpc::wallet::WalletValidateAddress);
293 $callback!($crate::rpc::wallet::WalletVerify);
294
295 $callback!($crate::rpc::f3::GetRawNetworkName);
297 $callback!($crate::rpc::f3::F3GetCertificate);
298 $callback!($crate::rpc::f3::F3GetECPowerTable);
299 $callback!($crate::rpc::f3::F3GetF3PowerTable);
300 $callback!($crate::rpc::f3::F3GetF3PowerTableByInstance);
301 $callback!($crate::rpc::f3::F3IsRunning);
302 $callback!($crate::rpc::f3::F3GetProgress);
303 $callback!($crate::rpc::f3::F3GetManifest);
304 $callback!($crate::rpc::f3::F3ListParticipants);
305 $callback!($crate::rpc::f3::F3GetLatestCertificate);
306 $callback!($crate::rpc::f3::F3GetOrRenewParticipationTicket);
307 $callback!($crate::rpc::f3::F3Participate);
308 $callback!($crate::rpc::f3::F3ExportLatestSnapshot);
309 $callback!($crate::rpc::f3::GetHead);
310 $callback!($crate::rpc::f3::GetParent);
311 $callback!($crate::rpc::f3::GetParticipatingMinerIDs);
312 $callback!($crate::rpc::f3::GetPowerTable);
313 $callback!($crate::rpc::f3::GetTipset);
314 $callback!($crate::rpc::f3::GetTipsetByEpoch);
315 $callback!($crate::rpc::f3::Finalize);
316 $callback!($crate::rpc::f3::ProtectPeer);
317 $callback!($crate::rpc::f3::SignMessage);
318
319 $callback!($crate::rpc::misc::GetActorEventsRaw);
321 };
322}
323use compression_layer::{COMPRESS_MIN_BODY_SIZE, CompressionLayer};
324pub(crate) use for_each_rpc_method;
325use sync::SnapshotProgressTracker;
326use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
327
328#[allow(unused)]
329pub mod prelude {
341 use super::*;
342
343 pub use reflect::RpcMethodExt as _;
344
345 macro_rules! export {
346 ($ty:ty) => {
347 pub use $ty;
348 };
349 }
350
351 for_each_rpc_method!(export);
352}
353
354pub fn collect_rpc_method_info() -> Vec<(&'static str, Permission)> {
356 use crate::rpc::RpcMethod;
357
358 let mut methods = Vec::new();
359
360 macro_rules! add_method {
361 ($ty:ty) => {
362 methods.push((<$ty>::NAME, <$ty>::PERMISSION));
363 };
364 }
365
366 for_each_rpc_method!(add_method);
367
368 methods
369}
370
371mod methods {
408 pub mod auth;
409 pub mod beacon;
410 pub mod chain;
411 pub mod common;
412 pub mod eth;
413 pub mod f3;
414 pub mod gas;
415 pub mod market;
416 pub mod miner;
417 pub mod misc;
418 pub mod mpool;
419 pub mod msig;
420 pub mod net;
421 pub mod node;
422 pub mod state;
423 pub mod sync;
424 pub mod wallet;
425}
426
427use crate::rpc::auth_layer::AuthLayer;
428pub use crate::rpc::channel::CANCEL_METHOD_NAME;
429use crate::rpc::channel::RpcModule as FilRpcModule;
430use crate::rpc::eth::pubsub::EthPubSub;
431use crate::rpc::metrics_layer::MetricsLayer;
432use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore};
433
434use crate::blocks::FullTipset;
435use fvm_ipld_blockstore::Blockstore;
436use jsonrpsee::{
437 Methods,
438 core::middleware::RpcServiceBuilder,
439 server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
440};
441use parking_lot::RwLock;
442use std::env;
443use std::sync::{Arc, LazyLock};
444use std::time::Duration;
445use tokio::sync::mpsc;
446use tower::Service;
447
448use crate::rpc::sync::SnapshotProgressState;
449use openrpc_types::{self, ParamStructure};
450
451pub const DEFAULT_PORT: u16 = 2345;
452
453static DEFAULT_REQUEST_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
455 env::var("FOREST_RPC_DEFAULT_TIMEOUT")
456 .ok()
457 .and_then(|it| Duration::from_secs(it.parse().ok()?).into())
458 .unwrap_or(Duration::from_secs(60))
459});
460
461static DEFAULT_MAX_CONNECTIONS: LazyLock<u32> = LazyLock::new(|| {
464 env::var("FOREST_RPC_MAX_CONNECTIONS")
465 .ok()
466 .and_then(|it| it.parse().ok())
467 .unwrap_or(1000)
468});
469
470const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024;
471const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE;
472
473pub struct RPCState<DB> {
476 pub keystore: Arc<RwLock<KeyStore>>,
477 pub state_manager: Arc<crate::state_manager::StateManager<DB>>,
478 pub mpool: Arc<crate::message_pool::MessagePool<Arc<crate::chain::ChainStore<DB>>>>,
479 pub bad_blocks: Option<Arc<crate::chain_sync::BadBlockCache>>,
480 pub sync_status: crate::chain_sync::SyncStatus,
481 pub eth_event_handler: Arc<EthEventHandler>,
482 pub sync_network_context: SyncNetworkContext<DB>,
483 pub tipset_send: flume::Sender<FullTipset>,
484 pub start_time: chrono::DateTime<chrono::Utc>,
485 pub snapshot_progress_tracker: SnapshotProgressTracker,
486 pub shutdown: mpsc::Sender<()>,
487 pub mpool_locker: crate::message_pool::MpoolLocker,
488 pub nonce_tracker: crate::message_pool::NonceTracker,
489 pub temp_dir: Arc<std::path::PathBuf>,
490}
491
492impl<DB: Blockstore> RPCState<DB> {
493 pub fn beacon(&self) -> &Arc<crate::beacon::BeaconSchedule> {
494 self.state_manager.beacon_schedule()
495 }
496
497 pub fn chain_store(&self) -> &Arc<crate::chain::ChainStore<DB>> {
498 self.state_manager.chain_store()
499 }
500
501 pub fn chain_index(&self) -> &crate::chain::index::ChainIndex<DB> {
502 self.chain_store().chain_index()
503 }
504
505 pub fn chain_config(&self) -> &Arc<crate::networks::ChainConfig> {
506 self.state_manager.chain_config()
507 }
508
509 pub fn store(&self) -> &Arc<DB> {
510 self.chain_store().blockstore()
511 }
512
513 pub fn store_owned(&self) -> Arc<DB> {
514 self.state_manager.blockstore_owned()
515 }
516
517 pub fn network_send(&self) -> &flume::Sender<crate::libp2p::NetworkMessage> {
518 self.sync_network_context.network_send()
519 }
520
521 pub fn get_snapshot_progress_tracker(&self) -> SnapshotProgressState {
522 self.snapshot_progress_tracker.state()
523 }
524}
525
526#[derive(Clone)]
527struct PerConnection<RpcMiddleware, HttpMiddleware> {
528 stop_handle: StopHandle,
529 svc_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
530 keystore: Arc<RwLock<KeyStore>>,
531}
532
533pub async fn start_rpc<DB>(
534 state: RPCState<DB>,
535 rpc_listener: tokio::net::TcpListener,
536 stop_handle: StopHandle,
537 filter_list: Option<FilterList>,
538) -> anyhow::Result<()>
539where
540 DB: Blockstore + Send + Sync + 'static,
541{
542 let filter_list = filter_list.unwrap_or_default();
543 let state = Arc::new(state);
545 let keystore = state.keystore.clone();
546 let mut modules = create_modules(state.clone());
547
548 let mut pubsub_module = FilRpcModule::default();
549 pubsub_module.register_channel("Filecoin.ChainNotify", {
550 let state_clone = state.clone();
551 move |params| chain::chain_notify(params, &state_clone)
552 })?;
553
554 for module in modules.values_mut() {
555 module.merge(EthPubSub::new(state.clone()).into_rpc())?;
557 module.merge(pubsub_module.clone())?;
558 }
559
560 let methods: Arc<HashMap<ApiPaths, Methods>> =
561 Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());
562
563 let per_conn = PerConnection {
564 stop_handle: stop_handle.clone(),
565 svc_builder: Server::builder()
566 .set_config(
567 ServerConfig::builder()
568 .max_request_body_size(MAX_REQUEST_BODY_SIZE)
570 .max_response_body_size(MAX_RESPONSE_BODY_SIZE)
571 .max_connections(*DEFAULT_MAX_CONNECTIONS)
572 .set_id_provider(RandomHexStringIdProvider::new())
573 .build(),
574 )
575 .set_http_middleware(
576 tower::ServiceBuilder::new()
577 .option_layer(COMPRESS_MIN_BODY_SIZE.map(CompressionLayer::new))
578 .layer(SetSensitiveRequestHeadersLayer::new(std::iter::once(
580 http::header::AUTHORIZATION,
581 ))),
582 )
583 .to_service_builder(),
584 keystore,
585 };
586 tracing::info!("Ready for RPC connections");
587 loop {
588 let sock = tokio::select! {
589 res = rpc_listener.accept() => {
590 match res {
591 Ok((stream, _remote_addr)) => stream,
592 Err(e) => {
593 tracing::error!("failed to accept v4 connection: {:?}", e);
594 continue;
595 }
596 }
597 }
598 _ = per_conn.stop_handle.clone().shutdown() => break,
599 };
600
601 let svc = tower::service_fn({
602 let methods = methods.clone();
603 let per_conn = per_conn.clone();
604 let filter_list = filter_list.clone();
605 move |req| {
606 let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
607 let path = if let Ok(p) = ApiPaths::from_uri(req.uri()) {
608 p
609 } else {
610 return async move {
611 Ok(http::Response::builder()
612 .status(http::StatusCode::NOT_FOUND)
613 .body(Default::default())
614 .unwrap_or_else(|_| http::Response::new(Default::default())))
615 }
616 .boxed();
617 };
618 let methods = methods.get(&path).cloned().unwrap_or_default();
619 let PerConnection {
620 stop_handle,
621 svc_builder,
622 keystore,
623 } = per_conn.clone();
624 let headers = req.headers().clone();
627 let rpc_middleware = RpcServiceBuilder::new()
628 .layer(SetExtensionLayer { path })
629 .layer(SegregationLayer)
630 .layer(FilterLayer::new(filter_list.clone()))
631 .layer(validation_layer::JsonValidationLayer)
632 .layer(AuthLayer {
633 headers,
634 keystore: keystore.clone(),
635 })
636 .layer(LogLayer::default())
637 .layer(MetricsLayer::default());
638 let mut jsonrpsee_svc = svc_builder
639 .set_rpc_middleware(rpc_middleware)
640 .build(methods, stop_handle);
641
642 if is_websocket {
643 let session_close = jsonrpsee_svc.on_session_closed();
646
647 tokio::spawn(async move {
650 session_close.await;
651 tracing::trace!("Closed WebSocket connection");
652 });
653
654 async move {
655 tracing::trace!("Opened WebSocket connection");
656 jsonrpsee_svc
660 .call(req)
661 .await
662 .map_err(|e| anyhow::anyhow!("{:?}", e))
663 }
664 .boxed()
665 } else {
666 async move {
668 tracing::trace!("Opened HTTP connection");
669 let rp = jsonrpsee_svc.call(req).await;
670 tracing::trace!("Closed HTTP connection");
671 rp.map_err(|e| anyhow::anyhow!("{:?}", e))
675 }
676 .boxed()
677 }
678 }
679 });
680
681 tokio::spawn(jsonrpsee::server::serve_with_graceful_shutdown(
682 sock,
683 svc,
684 stop_handle.clone().shutdown(),
685 ));
686 }
687
688 Ok(())
689}
690
691fn create_modules<DB>(state: Arc<RPCState<DB>>) -> HashMap<ApiPaths, RpcModule<RPCState<DB>>>
692where
693 DB: Blockstore + Send + Sync + 'static,
694{
695 let mut modules = HashMap::default();
696 for api_version in ApiPaths::value_variants() {
697 modules.insert(*api_version, RpcModule::from_arc(state.clone()));
698 }
699 macro_rules! register {
700 ($ty:ty) => {
701 if !<$ty>::SUBSCRIPTION {
704 <$ty>::register(&mut modules, ParamStructure::ByPosition).unwrap();
705 }
706 };
707 }
708 for_each_rpc_method!(register);
709 modules
710}
711
712pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenRPC {
714 use schemars::generate::{SchemaGenerator, SchemaSettings};
715
716 let mut methods = vec![];
717 let mut settings = SchemaSettings::draft07();
719 settings.definitions_path = "#/components/schemas/".into();
721 let mut generator = SchemaGenerator::new(settings);
722 macro_rules! callback {
723 ($ty:ty) => {
724 if <$ty>::API_PATHS.contains(path) {
725 match include {
726 Some(include) => match include.contains(&<$ty>::NAME) {
727 true => {
728 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
729 &mut generator,
730 ParamStructure::ByPosition,
731 &<$ty>::NAME,
732 )));
733 if let Some(alias) = &<$ty>::NAME_ALIAS {
734 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
735 &mut generator,
736 ParamStructure::ByPosition,
737 &alias,
738 )));
739 }
740 }
741 false => {}
742 },
743 None => {
744 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
745 &mut generator,
746 ParamStructure::ByPosition,
747 &<$ty>::NAME,
748 )));
749 if let Some(alias) = &<$ty>::NAME_ALIAS {
750 methods.push(openrpc_types::ReferenceOr::Item(<$ty>::openrpc(
751 &mut generator,
752 ParamStructure::ByPosition,
753 &alias,
754 )));
755 }
756 }
757 }
758 }
759 };
760 }
761 for_each_rpc_method!(callback);
762 openrpc_types::OpenRPC {
763 methods,
764 components: Some(openrpc_types::Components {
765 schemas: Some(
766 generator
767 .take_definitions(false)
768 .into_iter()
769 .filter_map(|(k, v)| {
770 if let Ok(v) = Schema::try_from(v) {
771 Some((k, v))
772 } else {
773 None
774 }
775 })
776 .collect(),
777 ),
778 ..Default::default()
779 }),
780 openrpc: openrpc_types::OPEN_RPC_SPECIFICATION_VERSION,
781 info: openrpc_types::Info {
782 title: String::from("forest"),
783 version: env!("CARGO_PKG_VERSION").into(),
784 ..Default::default()
785 },
786 ..Default::default()
787 }
788}
789
790#[cfg(test)]
791mod tests {
792 use super::*;
793 use crate::{
794 db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
795 tool::offline_server::server::offline_rpc_state,
796 };
797 use jsonrpsee::server::stop_channel;
798 use std::net::{Ipv4Addr, SocketAddr};
799 use tokio::task::JoinSet;
800
801 #[test]
806 fn openrpc_v0() {
807 openrpc(ApiPaths::V0);
808 }
809
810 #[test]
811 fn openrpc_v1() {
812 openrpc(ApiPaths::V1);
813 }
814
815 #[test]
816 fn openrpc_v2() {
817 openrpc(ApiPaths::V2);
818 }
819
820 fn openrpc(path: ApiPaths) {
821 let spec = super::openrpc(path, None);
822 insta::assert_yaml_snapshot!(path.path(), spec);
823 }
824
825 #[test]
826 fn test_rpc_server() {
827 const TIMEOUT: Duration = Duration::from_secs(5);
828 let (done_tx, done_rx) = flume::bounded(1);
829 let rt = tokio::runtime::Builder::new_multi_thread()
830 .enable_all()
831 .build()
832 .unwrap();
833 rt.block_on(async move { test_rpc_server_inner(done_tx).await });
834 done_rx.recv().unwrap();
835 rt.shutdown_timeout(TIMEOUT);
837 }
838
839 async fn test_rpc_server_inner(done_tx: flume::Sender<()>) {
840 let chain = NetworkChain::Calibnet;
841 let db = Arc::new(MemoryDB::default());
842 let mut services = JoinSet::new();
843 let (state, mut shutdown_recv) = offline_rpc_state(chain, db, None, None, &mut services)
844 .await
845 .unwrap();
846 let block_delay_secs = state.chain_config().block_delay_secs;
847 let shutdown_send = state.shutdown.clone();
848 let jwt_read_permissions = vec!["read".to_owned()];
849 let jwt_read = super::methods::auth::AuthNew::create_token(
850 &state.keystore.read(),
851 chrono::Duration::hours(1),
852 jwt_read_permissions.clone(),
853 )
854 .unwrap();
855 let rpc_listener =
856 tokio::net::TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
857 .await
858 .unwrap();
859 let rpc_address = rpc_listener.local_addr().unwrap();
860 let (stop_handle, server_handle) = stop_channel();
861
862 let handle = tokio::spawn(start_rpc(state, rpc_listener, stop_handle, None));
865
866 println!("sending a few http requests");
867
868 let client = Client::from_url(
869 format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
870 .parse()
871 .unwrap(),
872 );
873
874 let response = super::methods::common::Version::call(&client, ())
875 .await
876 .unwrap();
877 assert_eq!(
878 &response.version,
879 &*crate::utils::version::FOREST_VERSION_STRING
880 );
881 assert_eq!(response.block_delay, block_delay_secs);
882 assert_eq!(response.api_version, ShiftingVersion::new(2, 3, 0));
883
884 let response = super::methods::auth::AuthVerify::call(&client, (jwt_read.clone(),))
885 .await
886 .unwrap();
887 assert_eq!(response, jwt_read_permissions);
888
889 drop(client);
890
891 println!("sending a few websocket requests");
892
893 let client = Client::from_url(
894 format!("ws://{}:{}/", rpc_address.ip(), rpc_address.port())
895 .parse()
896 .unwrap(),
897 );
898
899 let response = super::methods::auth::AuthVerify::call(&client, (jwt_read,))
900 .await
901 .unwrap();
902 assert_eq!(response, jwt_read_permissions);
903
904 drop(client);
905
906 println!("sending shutdown signal");
908 shutdown_send.send(()).await.unwrap();
909 println!("waiting on shutdown receiver");
910 shutdown_recv.recv().await;
911 println!("sending server stop signal");
912 server_handle.stop().unwrap();
913 println!("waiting on graceful shutdown");
914 handle.await.unwrap().unwrap();
915 println!("done");
916 done_tx.send(()).unwrap();
917 }
918}