solana_rpc/
rpc_service.rs

1//! The `rpc_service` module implements the Solana JSON RPC service.
2
3use {
4    crate::{
5        cluster_tpu_info::ClusterTpuInfo,
6        max_slots::MaxSlots,
7        optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
8        rpc::{rpc_accounts::*, rpc_accounts_scan::*, rpc_bank::*, rpc_full::*, rpc_minimal::*, *},
9        rpc_cache::LargestAccountsCache,
10        rpc_health::*,
11    },
12    crossbeam_channel::unbounded,
13    jsonrpc_core::{futures::prelude::*, MetaIoHandler},
14    jsonrpc_http_server::{
15        hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware,
16        RequestMiddlewareAction, ServerBuilder,
17    },
18    regex::Regex,
19    solana_cli_output::display::build_balance_message,
20    solana_client::connection_cache::{ConnectionCache, Protocol},
21    solana_genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH,
22    solana_gossip::cluster_info::ClusterInfo,
23    solana_hash::Hash,
24    solana_keypair::Keypair,
25    solana_ledger::{
26        bigtable_upload::ConfirmedBlockUploadConfig,
27        bigtable_upload_service::BigTableUploadService, blockstore::Blockstore,
28        leader_schedule_cache::LeaderScheduleCache,
29    },
30    solana_metrics::inc_new_counter_info,
31    solana_perf::thread::renice_this_thread,
32    solana_poh::poh_recorder::PohRecorder,
33    solana_quic_definitions::NotifyKeyUpdate,
34    solana_runtime::{
35        bank::Bank,
36        bank_forks::BankForks,
37        commitment::BlockCommitmentCache,
38        non_circulating_supply::calculate_non_circulating_supply,
39        prioritization_fee_cache::PrioritizationFeeCache,
40        snapshot_archive_info::SnapshotArchiveInfoGetter,
41        snapshot_config::SnapshotConfig,
42        snapshot_utils::{self, SnapshotInterval},
43    },
44    solana_send_transaction_service::{
45        send_transaction_service::{self, SendTransactionService},
46        transaction_client::{ConnectionCacheClient, TpuClientNextClient, TransactionClient},
47    },
48    solana_storage_bigtable::CredentialType,
49    solana_validator_exit::Exit,
50    std::{
51        net::{SocketAddr, UdpSocket},
52        path::{Path, PathBuf},
53        pin::Pin,
54        sync::{
55            atomic::{AtomicBool, AtomicU64, Ordering},
56            Arc, RwLock,
57        },
58        task::{Context, Poll},
59        thread::{self, Builder, JoinHandle},
60        time::{Duration, Instant},
61    },
62    tokio::runtime::{Builder as TokioBuilder, Handle as RuntimeHandle, Runtime as TokioRuntime},
63    tokio_util::{
64        bytes::Bytes,
65        codec::{BytesCodec, FramedRead},
66        sync::CancellationToken,
67    },
68};
69
70const FULL_SNAPSHOT_REQUEST_PATH: &str = "/snapshot.tar.bz2";
71const INCREMENTAL_SNAPSHOT_REQUEST_PATH: &str = "/incremental-snapshot.tar.bz2";
72const LARGEST_ACCOUNTS_CACHE_DURATION: u64 = 60 * 60 * 2;
73/// Default minimum snapshot download speed is 10 MB/s
74/// Full snapshots are ~90 GB, incremental are ~1 GB today but both will increase over time
75/// Full: 120 GB / 10 MB/s = 12,000 seconds -> ~30k slots
76const FALLBACK_FULL_SNAPSHOT_TIMEOUT_SECS: Duration = Duration::from_secs(12_000);
77/// Incremental: 2.5 GB / 10 MB/s = 250 seconds -> ~625 slots
78const FALLBACK_INCREMENTAL_SNAPSHOT_TIMEOUT_SECS: Duration = Duration::from_secs(250);
79
80enum SnapshotKind {
81    Full,
82    Incremental,
83}
84
85struct TimeoutStream<S> {
86    inner: S,
87    deadline: Instant,
88}
89
90impl<S> TimeoutStream<S> {
91    fn new(inner: S, timeout: Duration) -> Self {
92        Self {
93            inner,
94            deadline: Instant::now() + timeout,
95        }
96    }
97}
98
99impl<S> Stream for TimeoutStream<S>
100where
101    S: Stream<Item = std::io::Result<Bytes>> + Unpin,
102{
103    type Item = std::io::Result<Bytes>;
104
105    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106        if Instant::now() >= self.deadline {
107            return Poll::Ready(Some(Err(std::io::Error::new(
108                std::io::ErrorKind::TimedOut,
109                "snapshot transfer deadline exceeded",
110            ))));
111        }
112        Pin::new(&mut self.inner).poll_next(cx)
113    }
114}
115
116pub struct JsonRpcService {
117    thread_hdl: JoinHandle<()>,
118
119    #[cfg(test)]
120    pub request_processor: JsonRpcRequestProcessor, // Used only by test_rpc_new()...
121
122    close_handle: Option<CloseHandle>,
123
124    client_updater: Arc<dyn NotifyKeyUpdate + Send + Sync>,
125}
126
127struct RpcRequestMiddleware {
128    ledger_path: PathBuf,
129    full_snapshot_archive_path_regex: Regex,
130    incremental_snapshot_archive_path_regex: Regex,
131    snapshot_config: Option<SnapshotConfig>,
132    bank_forks: Arc<RwLock<BankForks>>,
133    health: Arc<RpcHealth>,
134}
135
136impl RpcRequestMiddleware {
137    pub fn new(
138        ledger_path: PathBuf,
139        snapshot_config: Option<SnapshotConfig>,
140        bank_forks: Arc<RwLock<BankForks>>,
141        health: Arc<RpcHealth>,
142    ) -> Self {
143        Self {
144            ledger_path,
145            full_snapshot_archive_path_regex: Regex::new(
146                snapshot_utils::FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX,
147            )
148            .unwrap(),
149            incremental_snapshot_archive_path_regex: Regex::new(
150                snapshot_utils::INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX,
151            )
152            .unwrap(),
153            snapshot_config,
154            bank_forks,
155            health,
156        }
157    }
158
159    fn redirect(location: &str) -> hyper::Response<hyper::Body> {
160        hyper::Response::builder()
161            .status(hyper::StatusCode::SEE_OTHER)
162            .header(hyper::header::LOCATION, location)
163            .body(hyper::Body::from(String::from(location)))
164            .unwrap()
165    }
166
167    fn not_found() -> hyper::Response<hyper::Body> {
168        hyper::Response::builder()
169            .status(hyper::StatusCode::NOT_FOUND)
170            .body(hyper::Body::empty())
171            .unwrap()
172    }
173
174    fn internal_server_error() -> hyper::Response<hyper::Body> {
175        hyper::Response::builder()
176            .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
177            .body(hyper::Body::empty())
178            .unwrap()
179    }
180
181    fn strip_leading_slash(path: &str) -> Option<&str> {
182        path.strip_prefix('/')
183    }
184
185    fn is_file_get_path(&self, path: &str) -> bool {
186        if path == DEFAULT_GENESIS_DOWNLOAD_PATH {
187            return true;
188        }
189
190        if self.snapshot_config.is_none() {
191            return false;
192        }
193
194        let Some(path) = Self::strip_leading_slash(path) else {
195            return false;
196        };
197
198        self.full_snapshot_archive_path_regex.is_match(path)
199            || self.incremental_snapshot_archive_path_regex.is_match(path)
200    }
201
202    #[cfg(unix)]
203    async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio::fs::File> {
204        tokio::fs::OpenOptions::new()
205            .read(true)
206            .write(false)
207            .create(false)
208            .custom_flags(libc::O_NOFOLLOW)
209            .open(path)
210            .await
211    }
212
213    #[cfg(not(unix))]
214    async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio::fs::File> {
215        // TODO: Is there any way to achieve the same on Windows?
216        tokio::fs::File::open(path).await
217    }
218
219    fn find_snapshot_file<P>(&self, stem: P) -> (PathBuf, SnapshotKind)
220    where
221        P: AsRef<Path>,
222    {
223        let is_full = self
224            .full_snapshot_archive_path_regex
225            .is_match(Path::new("").join(&stem).to_str().unwrap());
226        let root = if is_full {
227            &self
228                .snapshot_config
229                .as_ref()
230                .unwrap()
231                .full_snapshot_archives_dir
232        } else {
233            &self
234                .snapshot_config
235                .as_ref()
236                .unwrap()
237                .incremental_snapshot_archives_dir
238        };
239        let local_path = root.join(&stem);
240        let path = if local_path.exists() {
241            local_path
242        } else {
243            // remote snapshot archive path
244            snapshot_utils::build_snapshot_archives_remote_dir(root).join(stem)
245        };
246        (
247            path,
248            if is_full {
249                SnapshotKind::Full
250            } else {
251                SnapshotKind::Incremental
252            },
253        )
254    }
255
256    fn process_file_get(&self, path: &str) -> RequestMiddlewareAction {
257        let (filename, snapshot_type) = {
258            let stem = Self::strip_leading_slash(path).expect("path already verified");
259            match path {
260                DEFAULT_GENESIS_DOWNLOAD_PATH => {
261                    inc_new_counter_info!("rpc-get_genesis", 1);
262                    (self.ledger_path.join(stem), None)
263                }
264                _ => {
265                    inc_new_counter_info!("rpc-get_snapshot", 1);
266                    let (path, snapshot_type) = self.find_snapshot_file(stem);
267                    (path, Some(snapshot_type))
268                }
269            }
270        };
271        let file_length = std::fs::metadata(&filename)
272            .map(|m| m.len())
273            .unwrap_or(0)
274            .to_string();
275        info!("get {path} -> {filename:?} ({file_length} bytes)");
276
277        if cfg!(not(test)) {
278            assert!(
279                self.snapshot_config.is_some(),
280                "snapshot_config should never be None outside of tests"
281            );
282        }
283        let snapshot_timeout = self.snapshot_config.as_ref().and_then(|config| {
284            snapshot_type.map(|st| {
285                let interval = match st {
286                    SnapshotKind::Full => config.full_snapshot_archive_interval,
287                    SnapshotKind::Incremental => config.incremental_snapshot_archive_interval,
288                };
289                let computed = match interval {
290                    SnapshotInterval::Disabled => Duration::ZERO,
291                    SnapshotInterval::Slots(slots) => Duration::from_millis(
292                        slots
293                            .get()
294                            .saturating_mul(solana_clock::DEFAULT_MS_PER_SLOT),
295                    ),
296                };
297                let fallback = match st {
298                    SnapshotKind::Full => FALLBACK_FULL_SNAPSHOT_TIMEOUT_SECS,
299                    SnapshotKind::Incremental => FALLBACK_INCREMENTAL_SNAPSHOT_TIMEOUT_SECS,
300                };
301                std::cmp::max(computed, fallback)
302            })
303        });
304
305        RequestMiddlewareAction::Respond {
306            should_validate_hosts: true,
307            response: Box::pin(async move {
308                match Self::open_no_follow(filename).await {
309                    Err(err) => Ok(if err.kind() == std::io::ErrorKind::NotFound {
310                        Self::not_found()
311                    } else {
312                        Self::internal_server_error()
313                    }),
314                    Ok(file) => {
315                        let stream =
316                            FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze());
317                        let body = if let Some(timeout) = snapshot_timeout {
318                            hyper::Body::wrap_stream(TimeoutStream::new(stream, timeout))
319                        } else {
320                            hyper::Body::wrap_stream(stream)
321                        };
322                        Ok(hyper::Response::builder()
323                            .header(hyper::header::CONTENT_LENGTH, file_length)
324                            .body(body)
325                            .unwrap())
326                    }
327                }
328            }),
329        }
330    }
331
332    fn health_check(&self) -> &'static str {
333        let response = match self.health.check() {
334            RpcHealthStatus::Ok => "ok",
335            RpcHealthStatus::Behind { .. } => "behind",
336            RpcHealthStatus::Unknown => "unknown",
337        };
338        info!("health check: {response}");
339        response
340    }
341}
342
343impl RequestMiddleware for RpcRequestMiddleware {
344    fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
345        trace!("request uri: {}", request.uri());
346
347        if let Some(ref snapshot_config) = self.snapshot_config {
348            if request.uri().path() == FULL_SNAPSHOT_REQUEST_PATH
349                || request.uri().path() == INCREMENTAL_SNAPSHOT_REQUEST_PATH
350            {
351                // Convenience redirect to the latest snapshot
352                let full_snapshot_archive_info =
353                    snapshot_utils::get_highest_full_snapshot_archive_info(
354                        &snapshot_config.full_snapshot_archives_dir,
355                    );
356                let snapshot_archive_info =
357                    if let Some(full_snapshot_archive_info) = full_snapshot_archive_info {
358                        if request.uri().path() == FULL_SNAPSHOT_REQUEST_PATH {
359                            Some(full_snapshot_archive_info.snapshot_archive_info().clone())
360                        } else {
361                            snapshot_utils::get_highest_incremental_snapshot_archive_info(
362                                &snapshot_config.incremental_snapshot_archives_dir,
363                                full_snapshot_archive_info.slot(),
364                            )
365                            .map(|incremental_snapshot_archive_info| {
366                                incremental_snapshot_archive_info
367                                    .snapshot_archive_info()
368                                    .clone()
369                            })
370                        }
371                    } else {
372                        None
373                    };
374                return if let Some(snapshot_archive_info) = snapshot_archive_info {
375                    RpcRequestMiddleware::redirect(&format!(
376                        "/{}",
377                        snapshot_archive_info
378                            .path
379                            .file_name()
380                            .unwrap_or_else(|| std::ffi::OsStr::new(""))
381                            .to_str()
382                            .unwrap_or("")
383                    ))
384                } else {
385                    RpcRequestMiddleware::not_found()
386                }
387                .into();
388            }
389        }
390
391        if let Some(path) = match_supply_path(request.uri().path()) {
392            process_rest(&self.bank_forks, path)
393        } else if self.is_file_get_path(request.uri().path()) {
394            self.process_file_get(request.uri().path())
395        } else if request.uri().path() == "/health" {
396            hyper::Response::builder()
397                .status(hyper::StatusCode::OK)
398                .body(hyper::Body::from(self.health_check()))
399                .unwrap()
400                .into()
401        } else {
402            request.into()
403        }
404    }
405}
406
407fn match_supply_path(path: &str) -> Option<&str> {
408    match path {
409        "/v0/circulating-supply" | "/v0/total-supply" => Some(path),
410        _ => None,
411    }
412}
413
414#[derive(Debug)]
415pub enum SupplyCalcError {
416    Scan(String),
417}
418
419async fn calculate_circulating_supply_async(bank: &Arc<Bank>) -> Result<u64, SupplyCalcError> {
420    let total_supply = bank.capitalization();
421    let bank = Arc::clone(bank);
422    let non_circulating_supply =
423        tokio::task::spawn_blocking(move || calculate_non_circulating_supply(&bank))
424            .await
425            .expect("Failed to spawn blocking task")
426            .map_err(|e| SupplyCalcError::Scan(e.to_string()))?;
427
428    Ok(total_supply.saturating_sub(non_circulating_supply.lamports))
429}
430
431async fn handle_rest(bank_forks: &Arc<RwLock<BankForks>>, path: &str) -> Option<String> {
432    match path {
433        "/v0/circulating-supply" => {
434            let bank = bank_forks.read().unwrap().root_bank();
435            let supply_result = calculate_circulating_supply_async(&bank).await;
436            match supply_result {
437                Ok(supply) => Some(build_balance_message(supply, false, false)),
438                Err(_) => None,
439            }
440        }
441        "/v0/total-supply" => {
442            let bank = bank_forks.read().unwrap().root_bank();
443            let total_supply = bank.capitalization();
444            Some(build_balance_message(total_supply, false, false))
445        }
446        _ => None,
447    }
448}
449
450fn process_rest(bank_forks: &Arc<RwLock<BankForks>>, path: &str) -> RequestMiddlewareAction {
451    let bank_forks = bank_forks.clone();
452    let path = path.to_string();
453
454    RequestMiddlewareAction::Respond {
455        should_validate_hosts: true,
456        response: Box::pin(async move {
457            let result = handle_rest(&bank_forks, path.as_str()).await;
458            match result {
459                Some(s) => Ok(hyper::Response::builder()
460                    .status(hyper::StatusCode::OK)
461                    .body(hyper::Body::from(s))
462                    .unwrap()),
463                None => Ok(RpcRequestMiddleware::not_found()),
464            }
465        }),
466    }
467}
468
469/// [`JsonRpcServiceConfig`] is a helper structure that simplifies the creation
470/// of a [`JsonRpcService`] with a target TPU client specified by
471/// `client_option`.
472pub struct JsonRpcServiceConfig<'a> {
473    pub rpc_addr: SocketAddr,
474    pub rpc_config: JsonRpcConfig,
475    pub snapshot_config: Option<SnapshotConfig>,
476    pub bank_forks: Arc<RwLock<BankForks>>,
477    pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
478    pub blockstore: Arc<Blockstore>,
479    pub cluster_info: Arc<ClusterInfo>,
480    pub poh_recorder: Option<Arc<RwLock<PohRecorder>>>,
481    pub genesis_hash: Hash,
482    pub ledger_path: PathBuf,
483    pub validator_exit: Arc<RwLock<Exit>>,
484    pub exit: Arc<AtomicBool>,
485    pub override_health_check: Arc<AtomicBool>,
486    pub startup_verification_complete: Arc<AtomicBool>,
487    pub optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
488    pub send_transaction_service_config: send_transaction_service::Config,
489    pub max_slots: Arc<MaxSlots>,
490    pub leader_schedule_cache: Arc<LeaderScheduleCache>,
491    pub max_complete_transaction_status_slot: Arc<AtomicU64>,
492    pub prioritization_fee_cache: Arc<PrioritizationFeeCache>,
493    pub client_option: ClientOption<'a>,
494}
495
496/// [`ClientOption`] enum represents the available client types for TPU
497/// communication:
498/// * [`ConnectionCacheClient`]: Uses a shared [`ConnectionCache`] to manage
499///   connections efficiently.
500/// * [`TpuClientNextClient`]: Relies on the `tpu-client-next` crate and
501///   requires a reference to a [`Keypair`].
502pub enum ClientOption<'a> {
503    ConnectionCache(Arc<ConnectionCache>),
504    TpuClientNext(&'a Keypair, UdpSocket, RuntimeHandle, CancellationToken),
505}
506
507impl JsonRpcService {
508    pub fn new_with_config(config: JsonRpcServiceConfig) -> Result<Self, String> {
509        let runtime = service_runtime(
510            config.rpc_config.rpc_threads,
511            config.rpc_config.rpc_blocking_threads,
512            config.rpc_config.rpc_niceness_adj,
513        );
514        let leader_info = config
515            .poh_recorder
516            .map(|recorder| ClusterTpuInfo::new(config.cluster_info.clone(), recorder));
517
518        match config.client_option {
519            ClientOption::ConnectionCache(connection_cache) => {
520                let my_tpu_address = config
521                    .cluster_info
522                    .my_contact_info()
523                    .tpu(connection_cache.protocol())
524                    .ok_or(format!(
525                        "Invalid {:?} socket address for TPU",
526                        connection_cache.protocol()
527                    ))?;
528                let client = ConnectionCacheClient::new(
529                    connection_cache,
530                    my_tpu_address,
531                    config.send_transaction_service_config.tpu_peers.clone(),
532                    leader_info,
533                    config.send_transaction_service_config.leader_forward_count,
534                );
535                let json_rpc_service = Self::new_with_client(
536                    config.rpc_addr,
537                    config.rpc_config,
538                    config.snapshot_config,
539                    config.bank_forks,
540                    config.block_commitment_cache,
541                    config.blockstore,
542                    config.cluster_info,
543                    config.genesis_hash,
544                    config.ledger_path.as_path(),
545                    config.validator_exit,
546                    config.exit,
547                    config.override_health_check,
548                    config.startup_verification_complete,
549                    config.optimistically_confirmed_bank,
550                    config.send_transaction_service_config,
551                    config.max_slots,
552                    config.leader_schedule_cache,
553                    client.clone(),
554                    config.max_complete_transaction_status_slot,
555                    config.prioritization_fee_cache,
556                    runtime,
557                )?;
558                Ok(json_rpc_service)
559            }
560            ClientOption::TpuClientNext(
561                identity_keypair,
562                tpu_client_socket,
563                client_runtime,
564                cancel,
565            ) => {
566                let my_tpu_address = config
567                    .cluster_info
568                    .my_contact_info()
569                    .tpu(Protocol::QUIC)
570                    .ok_or(format!(
571                        "Invalid {:?} socket address for TPU",
572                        Protocol::QUIC
573                    ))?;
574                let client = TpuClientNextClient::new(
575                    client_runtime,
576                    my_tpu_address,
577                    config.send_transaction_service_config.tpu_peers.clone(),
578                    leader_info,
579                    config.send_transaction_service_config.leader_forward_count,
580                    Some(identity_keypair),
581                    tpu_client_socket,
582                    cancel,
583                );
584
585                let json_rpc_service = Self::new_with_client(
586                    config.rpc_addr,
587                    config.rpc_config.clone(),
588                    config.snapshot_config,
589                    config.bank_forks.clone(),
590                    config.block_commitment_cache.clone(),
591                    config.blockstore.clone(),
592                    config.cluster_info.clone(),
593                    config.genesis_hash,
594                    config.ledger_path.as_path(),
595                    config.validator_exit,
596                    config.exit,
597                    config.override_health_check,
598                    config.startup_verification_complete,
599                    config.optimistically_confirmed_bank,
600                    config.send_transaction_service_config,
601                    config.max_slots,
602                    config.leader_schedule_cache,
603                    client,
604                    config.max_complete_transaction_status_slot,
605                    config.prioritization_fee_cache,
606                    runtime,
607                )?;
608                Ok(json_rpc_service)
609            }
610        }
611    }
612
613    #[allow(clippy::too_many_arguments)]
614    pub fn new(
615        rpc_addr: SocketAddr,
616        config: JsonRpcConfig,
617        snapshot_config: Option<SnapshotConfig>,
618        bank_forks: Arc<RwLock<BankForks>>,
619        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
620        blockstore: Arc<Blockstore>,
621        cluster_info: Arc<ClusterInfo>,
622        poh_recorder: Option<Arc<RwLock<PohRecorder>>>,
623        genesis_hash: Hash,
624        ledger_path: &Path,
625        validator_exit: Arc<RwLock<Exit>>,
626        exit: Arc<AtomicBool>,
627        override_health_check: Arc<AtomicBool>,
628        startup_verification_complete: Arc<AtomicBool>,
629        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
630        send_transaction_service_config: send_transaction_service::Config,
631        max_slots: Arc<MaxSlots>,
632        leader_schedule_cache: Arc<LeaderScheduleCache>,
633        connection_cache: Arc<ConnectionCache>,
634        max_complete_transaction_status_slot: Arc<AtomicU64>,
635        prioritization_fee_cache: Arc<PrioritizationFeeCache>,
636    ) -> Result<Self, String> {
637        let runtime = service_runtime(
638            config.rpc_threads,
639            config.rpc_blocking_threads,
640            config.rpc_niceness_adj,
641        );
642
643        let tpu_address = cluster_info
644            .my_contact_info()
645            .tpu(connection_cache.protocol())
646            .ok_or_else(|| {
647                format!(
648                    "Invalid {:?} socket address for TPU",
649                    connection_cache.protocol()
650                )
651            })?;
652
653        let leader_info =
654            poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
655        let client = ConnectionCacheClient::new(
656            connection_cache,
657            tpu_address,
658            send_transaction_service_config.tpu_peers.clone(),
659            leader_info,
660            send_transaction_service_config.leader_forward_count,
661        );
662        let json_rpc_service = Self::new_with_client(
663            rpc_addr,
664            config,
665            snapshot_config,
666            bank_forks,
667            block_commitment_cache,
668            blockstore,
669            cluster_info,
670            genesis_hash,
671            ledger_path,
672            validator_exit,
673            exit,
674            override_health_check,
675            startup_verification_complete,
676            optimistically_confirmed_bank,
677            send_transaction_service_config,
678            max_slots,
679            leader_schedule_cache,
680            client.clone(),
681            max_complete_transaction_status_slot,
682            prioritization_fee_cache,
683            runtime,
684        )?;
685        Ok(json_rpc_service)
686    }
687
688    #[allow(clippy::too_many_arguments)]
689    fn new_with_client<
690        Client: TransactionClient
691            + NotifyKeyUpdate
692            + Clone
693            + std::marker::Send
694            + std::marker::Sync
695            + 'static,
696    >(
697        rpc_addr: SocketAddr,
698        config: JsonRpcConfig,
699        snapshot_config: Option<SnapshotConfig>,
700        bank_forks: Arc<RwLock<BankForks>>,
701        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
702        blockstore: Arc<Blockstore>,
703        cluster_info: Arc<ClusterInfo>,
704        genesis_hash: Hash,
705        ledger_path: &Path,
706        validator_exit: Arc<RwLock<Exit>>,
707        exit: Arc<AtomicBool>,
708        override_health_check: Arc<AtomicBool>,
709        startup_verification_complete: Arc<AtomicBool>,
710        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
711        send_transaction_service_config: send_transaction_service::Config,
712        max_slots: Arc<MaxSlots>,
713        leader_schedule_cache: Arc<LeaderScheduleCache>,
714        client: Client,
715        max_complete_transaction_status_slot: Arc<AtomicU64>,
716        prioritization_fee_cache: Arc<PrioritizationFeeCache>,
717        runtime: Arc<TokioRuntime>,
718    ) -> Result<Self, String> {
719        info!("rpc bound to {rpc_addr:?}");
720        info!("rpc configuration: {config:?}");
721        let rpc_niceness_adj = config.rpc_niceness_adj;
722
723        let health = Arc::new(RpcHealth::new(
724            Arc::clone(&optimistically_confirmed_bank),
725            Arc::clone(&blockstore),
726            config.health_check_slot_distance,
727            override_health_check,
728            startup_verification_complete,
729        ));
730
731        let largest_accounts_cache = Arc::new(RwLock::new(LargestAccountsCache::new(
732            LARGEST_ACCOUNTS_CACHE_DURATION,
733        )));
734
735        let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false));
736
737        let (bigtable_ledger_storage, _bigtable_ledger_upload_service) =
738            if let Some(RpcBigtableConfig {
739                enable_bigtable_ledger_upload,
740                ref bigtable_instance_name,
741                ref bigtable_app_profile_id,
742                timeout,
743                max_message_size,
744            }) = config.rpc_bigtable_config
745            {
746                let bigtable_config = solana_storage_bigtable::LedgerStorageConfig {
747                    read_only: !enable_bigtable_ledger_upload,
748                    timeout,
749                    credential_type: CredentialType::Filepath(None),
750                    instance_name: bigtable_instance_name.clone(),
751                    app_profile_id: bigtable_app_profile_id.clone(),
752                    max_message_size,
753                };
754                runtime
755                    .block_on(solana_storage_bigtable::LedgerStorage::new_with_config(
756                        bigtable_config,
757                    ))
758                    .map(|bigtable_ledger_storage| {
759                        info!("BigTable ledger storage initialized");
760
761                        let bigtable_ledger_upload_service = if enable_bigtable_ledger_upload {
762                            Some(Arc::new(BigTableUploadService::new_with_config(
763                                runtime.clone(),
764                                bigtable_ledger_storage.clone(),
765                                blockstore.clone(),
766                                block_commitment_cache.clone(),
767                                max_complete_transaction_status_slot.clone(),
768                                ConfirmedBlockUploadConfig::default(),
769                                exit_bigtable_ledger_upload_service.clone(),
770                            )))
771                        } else {
772                            None
773                        };
774
775                        (
776                            Some(bigtable_ledger_storage),
777                            bigtable_ledger_upload_service,
778                        )
779                    })
780                    .unwrap_or_else(|err| {
781                        error!("Failed to initialize BigTable ledger storage: {err:?}");
782                        (None, None)
783                    })
784            } else {
785                (None, None)
786            };
787
788        let full_api = config.full_api;
789        let max_request_body_size = config
790            .max_request_body_size
791            .unwrap_or(MAX_REQUEST_BODY_SIZE);
792        let (request_processor, receiver) = JsonRpcRequestProcessor::new(
793            config,
794            snapshot_config.clone(),
795            bank_forks.clone(),
796            block_commitment_cache,
797            blockstore,
798            validator_exit.clone(),
799            health.clone(),
800            cluster_info.clone(),
801            genesis_hash,
802            bigtable_ledger_storage,
803            optimistically_confirmed_bank,
804            largest_accounts_cache,
805            max_slots,
806            leader_schedule_cache,
807            max_complete_transaction_status_slot,
808            prioritization_fee_cache,
809            Arc::clone(&runtime),
810        );
811
812        let _send_transaction_service = Arc::new(SendTransactionService::new_with_client(
813            &bank_forks,
814            receiver,
815            client.clone(),
816            send_transaction_service_config,
817            exit,
818        ));
819
820        #[cfg(test)]
821        let test_request_processor = request_processor.clone();
822
823        let ledger_path = ledger_path.to_path_buf();
824
825        let (close_handle_sender, close_handle_receiver) = unbounded();
826        let thread_hdl = Builder::new()
827            .name("solJsonRpcSvc".to_string())
828            .spawn(move || {
829                renice_this_thread(rpc_niceness_adj).unwrap();
830
831                let mut io = MetaIoHandler::default();
832
833                io.extend_with(rpc_minimal::MinimalImpl.to_delegate());
834                if full_api {
835                    io.extend_with(rpc_bank::BankDataImpl.to_delegate());
836                    io.extend_with(rpc_accounts::AccountsDataImpl.to_delegate());
837                    io.extend_with(rpc_accounts_scan::AccountsScanImpl.to_delegate());
838                    io.extend_with(rpc_full::FullImpl.to_delegate());
839                }
840
841                let request_middleware = RpcRequestMiddleware::new(
842                    ledger_path,
843                    snapshot_config,
844                    bank_forks.clone(),
845                    health.clone(),
846                );
847                let server = ServerBuilder::with_meta_extractor(
848                    io,
849                    move |req: &hyper::Request<hyper::Body>| {
850                        let xbigtable = req.headers().get("x-bigtable");
851                        if xbigtable.is_some_and(|v| v == "disabled") {
852                            request_processor.clone_without_bigtable()
853                        } else {
854                            request_processor.clone()
855                        }
856                    },
857                )
858                .event_loop_executor(runtime.handle().clone())
859                .threads(1)
860                .cors(DomainsValidation::AllowOnly(vec![
861                    AccessControlAllowOrigin::Any,
862                ]))
863                .cors_max_age(86400)
864                .request_middleware(request_middleware)
865                .max_request_body_size(max_request_body_size)
866                .start_http(&rpc_addr);
867
868                if let Err(e) = server {
869                    warn!(
870                        "JSON RPC service unavailable error: {e:?}. Also, check that port {} is \
871                         not already in use by another application",
872                        rpc_addr.port()
873                    );
874                    close_handle_sender.send(Err(e.to_string())).unwrap();
875                    return;
876                }
877
878                let server = server.unwrap();
879                close_handle_sender.send(Ok(server.close_handle())).unwrap();
880                server.wait();
881                exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed);
882            })
883            .unwrap();
884
885        let close_handle = close_handle_receiver.recv().unwrap()?;
886        let close_handle_ = close_handle.clone();
887        validator_exit
888            .write()
889            .unwrap()
890            .register_exit(Box::new(move || {
891                close_handle_.close();
892            }));
893        Ok(Self {
894            thread_hdl,
895            #[cfg(test)]
896            request_processor: test_request_processor,
897            close_handle: Some(close_handle),
898            client_updater: Arc::new(client) as Arc<dyn NotifyKeyUpdate + Send + Sync>,
899        })
900    }
901
902    pub fn exit(&mut self) {
903        if let Some(c) = self.close_handle.take() {
904            c.close()
905        }
906    }
907
908    pub fn join(mut self) -> thread::Result<()> {
909        self.exit();
910        self.thread_hdl.join()
911    }
912
913    pub fn get_client_key_updater(&self) -> Arc<dyn NotifyKeyUpdate + Send + Sync> {
914        self.client_updater.clone()
915    }
916}
917
918pub fn service_runtime(
919    rpc_threads: usize,
920    rpc_blocking_threads: usize,
921    rpc_niceness_adj: i8,
922) -> Arc<TokioRuntime> {
923    // The jsonrpc_http_server crate supports two execution models:
924    //
925    // - By default, it spawns a number of threads - configured with .threads(N) - and runs a
926    //   single-threaded futures executor in each thread.
927    // - Alternatively when configured with .event_loop_executor(executor) and .threads(1),
928    //   it executes all the tasks on the given executor, not spawning any extra internal threads.
929    //
930    // We use the latter configuration, using a multi threaded tokio runtime as the executor. We
931    // do this so we can configure the number of worker threads, the number of blocking threads
932    // and then use tokio::task::spawn_blocking() to avoid blocking the worker threads on CPU
933    // bound operations like getMultipleAccounts. This results in reduced latency, since fast
934    // rpc calls (the majority) are not blocked by slow CPU bound ones.
935    //
936    // NB: `rpc_blocking_threads` shouldn't be set too high (defaults to num_cpus / 2). Too many
937    // (busy) blocking threads could compete with CPU time with other validator threads and
938    // negatively impact performance.
939    let rpc_threads = 1.max(rpc_threads);
940    let rpc_blocking_threads = 1.max(rpc_blocking_threads);
941    let runtime = Arc::new(
942        TokioBuilder::new_multi_thread()
943            .worker_threads(rpc_threads)
944            .max_blocking_threads(rpc_blocking_threads)
945            .on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap())
946            .thread_name("solRpcEl")
947            .enable_all()
948            .build()
949            .expect("Runtime"),
950    );
951    runtime
952}
953
954#[cfg(test)]
955mod tests {
956    use {
957        super::*,
958        crate::rpc::{create_validator_exit, tests::new_test_cluster_info},
959        solana_cluster_type::ClusterType,
960        solana_genesis_config::DEFAULT_GENESIS_ARCHIVE,
961        solana_ledger::{
962            genesis_utils::{create_genesis_config, GenesisConfigInfo},
963            get_tmp_ledger_path_auto_delete,
964        },
965        solana_rpc_client_api::config::RpcContextConfig,
966        solana_runtime::bank::Bank,
967        solana_signer::Signer,
968        std::{
969            io::Write,
970            net::{IpAddr, Ipv4Addr},
971        },
972        tokio::runtime::Runtime,
973    };
974
975    #[test]
976    fn test_rpc_new() {
977        let GenesisConfigInfo {
978            genesis_config,
979            mint_keypair,
980            ..
981        } = create_genesis_config(10_000);
982        let exit = Arc::new(AtomicBool::new(false));
983        let validator_exit = create_validator_exit(exit.clone());
984        let bank = Bank::new_for_tests(&genesis_config);
985        let cluster_info = Arc::new(new_test_cluster_info());
986        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
987        let port_range = solana_net_utils::sockets::localhost_port_range_for_tests();
988        let rpc_addr = SocketAddr::new(
989            ip_addr,
990            solana_net_utils::find_available_port_in_range(ip_addr, port_range).unwrap(),
991        );
992        let bank_forks = BankForks::new_rw_arc(bank);
993        let ledger_path = get_tmp_ledger_path_auto_delete!();
994        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
995        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
996        let optimistically_confirmed_bank =
997            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
998        let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test"));
999        let mut rpc_service = JsonRpcService::new(
1000            rpc_addr,
1001            JsonRpcConfig::default(),
1002            None,
1003            bank_forks,
1004            block_commitment_cache,
1005            blockstore,
1006            cluster_info,
1007            None,
1008            Hash::default(),
1009            &PathBuf::from("farf"),
1010            validator_exit,
1011            exit,
1012            Arc::new(AtomicBool::new(false)),
1013            Arc::new(AtomicBool::new(true)),
1014            optimistically_confirmed_bank,
1015            send_transaction_service::Config {
1016                retry_rate_ms: 1000,
1017                leader_forward_count: 1,
1018                ..send_transaction_service::Config::default()
1019            },
1020            Arc::new(MaxSlots::default()),
1021            Arc::new(LeaderScheduleCache::default()),
1022            connection_cache,
1023            Arc::new(AtomicU64::default()),
1024            Arc::new(PrioritizationFeeCache::default()),
1025        )
1026        .expect("assume successful JsonRpcService start");
1027        let thread = rpc_service.thread_hdl.thread();
1028        assert_eq!(thread.name().unwrap(), "solJsonRpcSvc");
1029
1030        assert_eq!(
1031            10_000,
1032            rpc_service
1033                .request_processor
1034                .get_balance(&mint_keypair.pubkey(), RpcContextConfig::default())
1035                .unwrap()
1036                .value
1037        );
1038        rpc_service.exit();
1039        rpc_service.join().unwrap();
1040    }
1041
1042    fn create_bank_forks() -> Arc<RwLock<BankForks>> {
1043        let GenesisConfigInfo {
1044            mut genesis_config, ..
1045        } = create_genesis_config(10_000);
1046        genesis_config.cluster_type = ClusterType::MainnetBeta;
1047        let bank = Bank::new_for_tests(&genesis_config);
1048        BankForks::new_rw_arc(bank)
1049    }
1050
1051    #[test]
1052    fn test_process_rest_api() {
1053        let bank_forks = create_bank_forks();
1054        let runtime = tokio::runtime::Runtime::new().unwrap();
1055
1056        runtime.block_on(async {
1057            assert_eq!(
1058                None,
1059                handle_rest(&bank_forks, "not-a-supported-rest-api").await
1060            );
1061
1062            let circulating_supply = handle_rest(&bank_forks, "/v0/circulating-supply").await;
1063            assert!(circulating_supply.is_some());
1064
1065            let total_supply = handle_rest(&bank_forks, "/v0/total-supply").await;
1066            assert!(total_supply.is_some());
1067
1068            assert_eq!(
1069                handle_rest(&bank_forks, "/v0/circulating-supply").await,
1070                handle_rest(&bank_forks, "/v0/total-supply").await
1071            );
1072        });
1073    }
1074
1075    #[test]
1076    fn test_strip_prefix() {
1077        assert_eq!(RpcRequestMiddleware::strip_leading_slash("/"), Some(""));
1078        assert_eq!(RpcRequestMiddleware::strip_leading_slash("//"), Some("/"));
1079        assert_eq!(
1080            RpcRequestMiddleware::strip_leading_slash("/abc"),
1081            Some("abc")
1082        );
1083        assert_eq!(
1084            RpcRequestMiddleware::strip_leading_slash("//abc"),
1085            Some("/abc")
1086        );
1087        assert_eq!(
1088            RpcRequestMiddleware::strip_leading_slash("/./abc"),
1089            Some("./abc")
1090        );
1091        assert_eq!(
1092            RpcRequestMiddleware::strip_leading_slash("/../abc"),
1093            Some("../abc")
1094        );
1095
1096        assert_eq!(RpcRequestMiddleware::strip_leading_slash(""), None);
1097        assert_eq!(RpcRequestMiddleware::strip_leading_slash("./"), None);
1098        assert_eq!(RpcRequestMiddleware::strip_leading_slash("../"), None);
1099        assert_eq!(RpcRequestMiddleware::strip_leading_slash("."), None);
1100        assert_eq!(RpcRequestMiddleware::strip_leading_slash(".."), None);
1101        assert_eq!(RpcRequestMiddleware::strip_leading_slash("abc"), None);
1102    }
1103
1104    #[test]
1105    fn test_is_file_get_path() {
1106        let ledger_path = get_tmp_ledger_path_auto_delete!();
1107        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
1108        let bank_forks = create_bank_forks();
1109        let optimistically_confirmed_bank =
1110            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1111        let health = RpcHealth::stub(optimistically_confirmed_bank, blockstore);
1112
1113        let bank_forks = create_bank_forks();
1114        let rrm = RpcRequestMiddleware::new(
1115            ledger_path.path().to_path_buf(),
1116            None,
1117            bank_forks.clone(),
1118            health.clone(),
1119        );
1120        let rrm_with_snapshot_config = RpcRequestMiddleware::new(
1121            ledger_path.path().to_path_buf(),
1122            Some(SnapshotConfig::default()),
1123            bank_forks,
1124            health,
1125        );
1126
1127        assert!(rrm.is_file_get_path(DEFAULT_GENESIS_DOWNLOAD_PATH));
1128        assert!(!rrm.is_file_get_path(DEFAULT_GENESIS_ARCHIVE));
1129        assert!(!rrm.is_file_get_path("//genesis.tar.bz2"));
1130        assert!(!rrm.is_file_get_path("/../genesis.tar.bz2"));
1131
1132        // These two are redirects
1133        assert!(!rrm.is_file_get_path("/snapshot.tar.bz2"));
1134        assert!(!rrm.is_file_get_path("/incremental-snapshot.tar.bz2"));
1135
1136        assert!(!rrm.is_file_get_path(
1137            "/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1138        ));
1139        assert!(!rrm.is_file_get_path(
1140            "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1141        ));
1142
1143        assert!(rrm_with_snapshot_config.is_file_get_path(
1144            "/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1145        ));
1146        assert!(rrm_with_snapshot_config.is_file_get_path(
1147            "/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.lz4"
1148        ));
1149        assert!(!rrm_with_snapshot_config.is_file_get_path(
1150            "/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
1151        ));
1152        assert!(!rrm_with_snapshot_config
1153            .is_file_get_path("/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.gz"));
1154        assert!(!rrm_with_snapshot_config
1155            .is_file_get_path("/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar"));
1156
1157        assert!(rrm_with_snapshot_config.is_file_get_path(
1158            "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1159        ));
1160        assert!(rrm_with_snapshot_config.is_file_get_path(
1161            "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.lz4"
1162        ));
1163        assert!(!rrm_with_snapshot_config.is_file_get_path(
1164            "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
1165        ));
1166        assert!(!rrm_with_snapshot_config.is_file_get_path(
1167            "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.gz"
1168        ));
1169        assert!(!rrm_with_snapshot_config.is_file_get_path(
1170            "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar"
1171        ));
1172
1173        assert!(!rrm_with_snapshot_config.is_file_get_path(
1174            "/snapshot-notaslotnumber-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1175        ));
1176        assert!(!rrm_with_snapshot_config.is_file_get_path(
1177            "/incremental-snapshot-notaslotnumber-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1178        ));
1179        assert!(!rrm_with_snapshot_config.is_file_get_path(
1180            "/incremental-snapshot-100-notaslotnumber-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1181        ));
1182
1183        assert!(
1184            !rrm_with_snapshot_config.is_file_get_path("../../../test/snapshot-123-xxx.tar.zst")
1185        );
1186        assert!(!rrm_with_snapshot_config
1187            .is_file_get_path("../../../test/incremental-snapshot-123-456-xxx.tar.zst"));
1188
1189        assert!(!rrm.is_file_get_path("/"));
1190        assert!(!rrm.is_file_get_path("//"));
1191        assert!(!rrm.is_file_get_path("/."));
1192        assert!(!rrm.is_file_get_path("/./"));
1193        assert!(!rrm.is_file_get_path("/.."));
1194        assert!(!rrm.is_file_get_path("/../"));
1195        assert!(!rrm.is_file_get_path("."));
1196        assert!(!rrm.is_file_get_path("./"));
1197        assert!(!rrm.is_file_get_path(".//"));
1198        assert!(!rrm.is_file_get_path(".."));
1199        assert!(!rrm.is_file_get_path("../"));
1200        assert!(!rrm.is_file_get_path("..//"));
1201        assert!(!rrm.is_file_get_path("🎣"));
1202
1203        assert!(!rrm_with_snapshot_config.is_file_get_path(
1204            "//snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1205        ));
1206        assert!(!rrm_with_snapshot_config.is_file_get_path(
1207            "/./snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1208        ));
1209        assert!(!rrm_with_snapshot_config.is_file_get_path(
1210            "/../snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1211        ));
1212        assert!(!rrm_with_snapshot_config.is_file_get_path(
1213            "//incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1214        ));
1215        assert!(!rrm_with_snapshot_config.is_file_get_path(
1216            "/./incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1217        ));
1218        assert!(!rrm_with_snapshot_config.is_file_get_path(
1219            "/../incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1220        ));
1221    }
1222
1223    #[test]
1224    fn test_process_file_get() {
1225        let runtime = Runtime::new().unwrap();
1226
1227        let ledger_path = get_tmp_ledger_path_auto_delete!();
1228        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
1229        let genesis_path = ledger_path.path().join(DEFAULT_GENESIS_ARCHIVE);
1230        let bank_forks = create_bank_forks();
1231        let optimistically_confirmed_bank =
1232            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1233        let rrm = RpcRequestMiddleware::new(
1234            ledger_path.path().to_path_buf(),
1235            None,
1236            bank_forks,
1237            RpcHealth::stub(optimistically_confirmed_bank, blockstore),
1238        );
1239
1240        // File does not exist => request should fail.
1241        let action = rrm.process_file_get(DEFAULT_GENESIS_DOWNLOAD_PATH);
1242        if let RequestMiddlewareAction::Respond { response, .. } = action {
1243            let response = runtime.block_on(response);
1244            let response = response.unwrap();
1245            assert_ne!(response.status(), 200);
1246        } else {
1247            panic!("Unexpected RequestMiddlewareAction variant");
1248        }
1249
1250        {
1251            let mut file = std::fs::File::create(&genesis_path).unwrap();
1252            file.write_all(b"should be ok").unwrap();
1253        }
1254
1255        // Normal file exist => request should succeed.
1256        let action = rrm.process_file_get(DEFAULT_GENESIS_DOWNLOAD_PATH);
1257        if let RequestMiddlewareAction::Respond { response, .. } = action {
1258            let response = runtime.block_on(response);
1259            let response = response.unwrap();
1260            assert_eq!(response.status(), 200);
1261        } else {
1262            panic!("Unexpected RequestMiddlewareAction variant");
1263        }
1264
1265        std::fs::remove_file(&genesis_path).unwrap();
1266        {
1267            let mut file = std::fs::File::create(ledger_path.path().join("wrong")).unwrap();
1268            file.write_all(b"wrong file").unwrap();
1269        }
1270        symlink::symlink_file("wrong", &genesis_path).unwrap();
1271
1272        // File is a symbolic link => request should fail.
1273        let action = rrm.process_file_get(DEFAULT_GENESIS_DOWNLOAD_PATH);
1274        if let RequestMiddlewareAction::Respond { response, .. } = action {
1275            let response = runtime.block_on(response);
1276            let response = response.unwrap();
1277            assert_ne!(response.status(), 200);
1278        } else {
1279            panic!("Unexpected RequestMiddlewareAction variant");
1280        }
1281    }
1282}