1use {
4 crate::{
5 cluster_tpu_info::ClusterTpuInfo,
6 max_slots::MaxSlots,
7 optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
8 rpc::{rpc_accounts::*, rpc_accounts_scan::*, rpc_bank::*, rpc_full::*, rpc_minimal::*, *},
9 rpc_cache::LargestAccountsCache,
10 rpc_health::*,
11 },
12 crossbeam_channel::unbounded,
13 jsonrpc_core::{futures::prelude::*, MetaIoHandler},
14 jsonrpc_http_server::{
15 hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware,
16 RequestMiddlewareAction, ServerBuilder,
17 },
18 regex::Regex,
19 solana_cli_output::display::build_balance_message,
20 solana_client::connection_cache::{ConnectionCache, Protocol},
21 solana_genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH,
22 solana_gossip::cluster_info::ClusterInfo,
23 solana_hash::Hash,
24 solana_keypair::Keypair,
25 solana_ledger::{
26 bigtable_upload::ConfirmedBlockUploadConfig,
27 bigtable_upload_service::BigTableUploadService, blockstore::Blockstore,
28 leader_schedule_cache::LeaderScheduleCache,
29 },
30 solana_metrics::inc_new_counter_info,
31 solana_perf::thread::renice_this_thread,
32 solana_poh::poh_recorder::PohRecorder,
33 solana_quic_definitions::NotifyKeyUpdate,
34 solana_runtime::{
35 bank::Bank,
36 bank_forks::BankForks,
37 commitment::BlockCommitmentCache,
38 non_circulating_supply::calculate_non_circulating_supply,
39 prioritization_fee_cache::PrioritizationFeeCache,
40 snapshot_archive_info::SnapshotArchiveInfoGetter,
41 snapshot_config::SnapshotConfig,
42 snapshot_utils::{self, SnapshotInterval},
43 },
44 solana_send_transaction_service::{
45 send_transaction_service::{self, SendTransactionService},
46 transaction_client::{ConnectionCacheClient, TpuClientNextClient, TransactionClient},
47 },
48 solana_storage_bigtable::CredentialType,
49 solana_validator_exit::Exit,
50 std::{
51 net::{SocketAddr, UdpSocket},
52 path::{Path, PathBuf},
53 pin::Pin,
54 sync::{
55 atomic::{AtomicBool, AtomicU64, Ordering},
56 Arc, RwLock,
57 },
58 task::{Context, Poll},
59 thread::{self, Builder, JoinHandle},
60 time::{Duration, Instant},
61 },
62 tokio::runtime::{Builder as TokioBuilder, Handle as RuntimeHandle, Runtime as TokioRuntime},
63 tokio_util::{
64 bytes::Bytes,
65 codec::{BytesCodec, FramedRead},
66 sync::CancellationToken,
67 },
68};
69
70const FULL_SNAPSHOT_REQUEST_PATH: &str = "/snapshot.tar.bz2";
71const INCREMENTAL_SNAPSHOT_REQUEST_PATH: &str = "/incremental-snapshot.tar.bz2";
72const LARGEST_ACCOUNTS_CACHE_DURATION: u64 = 60 * 60 * 2;
73const FALLBACK_FULL_SNAPSHOT_TIMEOUT_SECS: Duration = Duration::from_secs(12_000);
77const FALLBACK_INCREMENTAL_SNAPSHOT_TIMEOUT_SECS: Duration = Duration::from_secs(250);
79
80enum SnapshotKind {
81 Full,
82 Incremental,
83}
84
85struct TimeoutStream<S> {
86 inner: S,
87 deadline: Instant,
88}
89
90impl<S> TimeoutStream<S> {
91 fn new(inner: S, timeout: Duration) -> Self {
92 Self {
93 inner,
94 deadline: Instant::now() + timeout,
95 }
96 }
97}
98
99impl<S> Stream for TimeoutStream<S>
100where
101 S: Stream<Item = std::io::Result<Bytes>> + Unpin,
102{
103 type Item = std::io::Result<Bytes>;
104
105 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106 if Instant::now() >= self.deadline {
107 return Poll::Ready(Some(Err(std::io::Error::new(
108 std::io::ErrorKind::TimedOut,
109 "snapshot transfer deadline exceeded",
110 ))));
111 }
112 Pin::new(&mut self.inner).poll_next(cx)
113 }
114}
115
116pub struct JsonRpcService {
117 thread_hdl: JoinHandle<()>,
118
119 #[cfg(test)]
120 pub request_processor: JsonRpcRequestProcessor, close_handle: Option<CloseHandle>,
123
124 client_updater: Arc<dyn NotifyKeyUpdate + Send + Sync>,
125}
126
127struct RpcRequestMiddleware {
128 ledger_path: PathBuf,
129 full_snapshot_archive_path_regex: Regex,
130 incremental_snapshot_archive_path_regex: Regex,
131 snapshot_config: Option<SnapshotConfig>,
132 bank_forks: Arc<RwLock<BankForks>>,
133 health: Arc<RpcHealth>,
134}
135
136impl RpcRequestMiddleware {
137 pub fn new(
138 ledger_path: PathBuf,
139 snapshot_config: Option<SnapshotConfig>,
140 bank_forks: Arc<RwLock<BankForks>>,
141 health: Arc<RpcHealth>,
142 ) -> Self {
143 Self {
144 ledger_path,
145 full_snapshot_archive_path_regex: Regex::new(
146 snapshot_utils::FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX,
147 )
148 .unwrap(),
149 incremental_snapshot_archive_path_regex: Regex::new(
150 snapshot_utils::INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX,
151 )
152 .unwrap(),
153 snapshot_config,
154 bank_forks,
155 health,
156 }
157 }
158
159 fn redirect(location: &str) -> hyper::Response<hyper::Body> {
160 hyper::Response::builder()
161 .status(hyper::StatusCode::SEE_OTHER)
162 .header(hyper::header::LOCATION, location)
163 .body(hyper::Body::from(String::from(location)))
164 .unwrap()
165 }
166
167 fn not_found() -> hyper::Response<hyper::Body> {
168 hyper::Response::builder()
169 .status(hyper::StatusCode::NOT_FOUND)
170 .body(hyper::Body::empty())
171 .unwrap()
172 }
173
174 fn internal_server_error() -> hyper::Response<hyper::Body> {
175 hyper::Response::builder()
176 .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
177 .body(hyper::Body::empty())
178 .unwrap()
179 }
180
181 fn strip_leading_slash(path: &str) -> Option<&str> {
182 path.strip_prefix('/')
183 }
184
185 fn is_file_get_path(&self, path: &str) -> bool {
186 if path == DEFAULT_GENESIS_DOWNLOAD_PATH {
187 return true;
188 }
189
190 if self.snapshot_config.is_none() {
191 return false;
192 }
193
194 let Some(path) = Self::strip_leading_slash(path) else {
195 return false;
196 };
197
198 self.full_snapshot_archive_path_regex.is_match(path)
199 || self.incremental_snapshot_archive_path_regex.is_match(path)
200 }
201
202 #[cfg(unix)]
203 async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio::fs::File> {
204 tokio::fs::OpenOptions::new()
205 .read(true)
206 .write(false)
207 .create(false)
208 .custom_flags(libc::O_NOFOLLOW)
209 .open(path)
210 .await
211 }
212
213 #[cfg(not(unix))]
214 async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio::fs::File> {
215 tokio::fs::File::open(path).await
217 }
218
219 fn find_snapshot_file<P>(&self, stem: P) -> (PathBuf, SnapshotKind)
220 where
221 P: AsRef<Path>,
222 {
223 let is_full = self
224 .full_snapshot_archive_path_regex
225 .is_match(Path::new("").join(&stem).to_str().unwrap());
226 let root = if is_full {
227 &self
228 .snapshot_config
229 .as_ref()
230 .unwrap()
231 .full_snapshot_archives_dir
232 } else {
233 &self
234 .snapshot_config
235 .as_ref()
236 .unwrap()
237 .incremental_snapshot_archives_dir
238 };
239 let local_path = root.join(&stem);
240 let path = if local_path.exists() {
241 local_path
242 } else {
243 snapshot_utils::build_snapshot_archives_remote_dir(root).join(stem)
245 };
246 (
247 path,
248 if is_full {
249 SnapshotKind::Full
250 } else {
251 SnapshotKind::Incremental
252 },
253 )
254 }
255
256 fn process_file_get(&self, path: &str) -> RequestMiddlewareAction {
257 let (filename, snapshot_type) = {
258 let stem = Self::strip_leading_slash(path).expect("path already verified");
259 match path {
260 DEFAULT_GENESIS_DOWNLOAD_PATH => {
261 inc_new_counter_info!("rpc-get_genesis", 1);
262 (self.ledger_path.join(stem), None)
263 }
264 _ => {
265 inc_new_counter_info!("rpc-get_snapshot", 1);
266 let (path, snapshot_type) = self.find_snapshot_file(stem);
267 (path, Some(snapshot_type))
268 }
269 }
270 };
271 let file_length = std::fs::metadata(&filename)
272 .map(|m| m.len())
273 .unwrap_or(0)
274 .to_string();
275 info!("get {path} -> {filename:?} ({file_length} bytes)");
276
277 if cfg!(not(test)) {
278 assert!(
279 self.snapshot_config.is_some(),
280 "snapshot_config should never be None outside of tests"
281 );
282 }
283 let snapshot_timeout = self.snapshot_config.as_ref().and_then(|config| {
284 snapshot_type.map(|st| {
285 let interval = match st {
286 SnapshotKind::Full => config.full_snapshot_archive_interval,
287 SnapshotKind::Incremental => config.incremental_snapshot_archive_interval,
288 };
289 let computed = match interval {
290 SnapshotInterval::Disabled => Duration::ZERO,
291 SnapshotInterval::Slots(slots) => Duration::from_millis(
292 slots
293 .get()
294 .saturating_mul(solana_clock::DEFAULT_MS_PER_SLOT),
295 ),
296 };
297 let fallback = match st {
298 SnapshotKind::Full => FALLBACK_FULL_SNAPSHOT_TIMEOUT_SECS,
299 SnapshotKind::Incremental => FALLBACK_INCREMENTAL_SNAPSHOT_TIMEOUT_SECS,
300 };
301 std::cmp::max(computed, fallback)
302 })
303 });
304
305 RequestMiddlewareAction::Respond {
306 should_validate_hosts: true,
307 response: Box::pin(async move {
308 match Self::open_no_follow(filename).await {
309 Err(err) => Ok(if err.kind() == std::io::ErrorKind::NotFound {
310 Self::not_found()
311 } else {
312 Self::internal_server_error()
313 }),
314 Ok(file) => {
315 let stream =
316 FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze());
317 let body = if let Some(timeout) = snapshot_timeout {
318 hyper::Body::wrap_stream(TimeoutStream::new(stream, timeout))
319 } else {
320 hyper::Body::wrap_stream(stream)
321 };
322 Ok(hyper::Response::builder()
323 .header(hyper::header::CONTENT_LENGTH, file_length)
324 .body(body)
325 .unwrap())
326 }
327 }
328 }),
329 }
330 }
331
332 fn health_check(&self) -> &'static str {
333 let response = match self.health.check() {
334 RpcHealthStatus::Ok => "ok",
335 RpcHealthStatus::Behind { .. } => "behind",
336 RpcHealthStatus::Unknown => "unknown",
337 };
338 info!("health check: {response}");
339 response
340 }
341}
342
343impl RequestMiddleware for RpcRequestMiddleware {
344 fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
345 trace!("request uri: {}", request.uri());
346
347 if let Some(ref snapshot_config) = self.snapshot_config {
348 if request.uri().path() == FULL_SNAPSHOT_REQUEST_PATH
349 || request.uri().path() == INCREMENTAL_SNAPSHOT_REQUEST_PATH
350 {
351 let full_snapshot_archive_info =
353 snapshot_utils::get_highest_full_snapshot_archive_info(
354 &snapshot_config.full_snapshot_archives_dir,
355 );
356 let snapshot_archive_info =
357 if let Some(full_snapshot_archive_info) = full_snapshot_archive_info {
358 if request.uri().path() == FULL_SNAPSHOT_REQUEST_PATH {
359 Some(full_snapshot_archive_info.snapshot_archive_info().clone())
360 } else {
361 snapshot_utils::get_highest_incremental_snapshot_archive_info(
362 &snapshot_config.incremental_snapshot_archives_dir,
363 full_snapshot_archive_info.slot(),
364 )
365 .map(|incremental_snapshot_archive_info| {
366 incremental_snapshot_archive_info
367 .snapshot_archive_info()
368 .clone()
369 })
370 }
371 } else {
372 None
373 };
374 return if let Some(snapshot_archive_info) = snapshot_archive_info {
375 RpcRequestMiddleware::redirect(&format!(
376 "/{}",
377 snapshot_archive_info
378 .path
379 .file_name()
380 .unwrap_or_else(|| std::ffi::OsStr::new(""))
381 .to_str()
382 .unwrap_or("")
383 ))
384 } else {
385 RpcRequestMiddleware::not_found()
386 }
387 .into();
388 }
389 }
390
391 if let Some(path) = match_supply_path(request.uri().path()) {
392 process_rest(&self.bank_forks, path)
393 } else if self.is_file_get_path(request.uri().path()) {
394 self.process_file_get(request.uri().path())
395 } else if request.uri().path() == "/health" {
396 hyper::Response::builder()
397 .status(hyper::StatusCode::OK)
398 .body(hyper::Body::from(self.health_check()))
399 .unwrap()
400 .into()
401 } else {
402 request.into()
403 }
404 }
405}
406
407fn match_supply_path(path: &str) -> Option<&str> {
408 match path {
409 "/v0/circulating-supply" | "/v0/total-supply" => Some(path),
410 _ => None,
411 }
412}
413
414#[derive(Debug)]
415pub enum SupplyCalcError {
416 Scan(String),
417}
418
419async fn calculate_circulating_supply_async(bank: &Arc<Bank>) -> Result<u64, SupplyCalcError> {
420 let total_supply = bank.capitalization();
421 let bank = Arc::clone(bank);
422 let non_circulating_supply =
423 tokio::task::spawn_blocking(move || calculate_non_circulating_supply(&bank))
424 .await
425 .expect("Failed to spawn blocking task")
426 .map_err(|e| SupplyCalcError::Scan(e.to_string()))?;
427
428 Ok(total_supply.saturating_sub(non_circulating_supply.lamports))
429}
430
431async fn handle_rest(bank_forks: &Arc<RwLock<BankForks>>, path: &str) -> Option<String> {
432 match path {
433 "/v0/circulating-supply" => {
434 let bank = bank_forks.read().unwrap().root_bank();
435 let supply_result = calculate_circulating_supply_async(&bank).await;
436 match supply_result {
437 Ok(supply) => Some(build_balance_message(supply, false, false)),
438 Err(_) => None,
439 }
440 }
441 "/v0/total-supply" => {
442 let bank = bank_forks.read().unwrap().root_bank();
443 let total_supply = bank.capitalization();
444 Some(build_balance_message(total_supply, false, false))
445 }
446 _ => None,
447 }
448}
449
450fn process_rest(bank_forks: &Arc<RwLock<BankForks>>, path: &str) -> RequestMiddlewareAction {
451 let bank_forks = bank_forks.clone();
452 let path = path.to_string();
453
454 RequestMiddlewareAction::Respond {
455 should_validate_hosts: true,
456 response: Box::pin(async move {
457 let result = handle_rest(&bank_forks, path.as_str()).await;
458 match result {
459 Some(s) => Ok(hyper::Response::builder()
460 .status(hyper::StatusCode::OK)
461 .body(hyper::Body::from(s))
462 .unwrap()),
463 None => Ok(RpcRequestMiddleware::not_found()),
464 }
465 }),
466 }
467}
468
469pub struct JsonRpcServiceConfig<'a> {
473 pub rpc_addr: SocketAddr,
474 pub rpc_config: JsonRpcConfig,
475 pub snapshot_config: Option<SnapshotConfig>,
476 pub bank_forks: Arc<RwLock<BankForks>>,
477 pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
478 pub blockstore: Arc<Blockstore>,
479 pub cluster_info: Arc<ClusterInfo>,
480 pub poh_recorder: Option<Arc<RwLock<PohRecorder>>>,
481 pub genesis_hash: Hash,
482 pub ledger_path: PathBuf,
483 pub validator_exit: Arc<RwLock<Exit>>,
484 pub exit: Arc<AtomicBool>,
485 pub override_health_check: Arc<AtomicBool>,
486 pub startup_verification_complete: Arc<AtomicBool>,
487 pub optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
488 pub send_transaction_service_config: send_transaction_service::Config,
489 pub max_slots: Arc<MaxSlots>,
490 pub leader_schedule_cache: Arc<LeaderScheduleCache>,
491 pub max_complete_transaction_status_slot: Arc<AtomicU64>,
492 pub prioritization_fee_cache: Arc<PrioritizationFeeCache>,
493 pub client_option: ClientOption<'a>,
494}
495
496pub enum ClientOption<'a> {
503 ConnectionCache(Arc<ConnectionCache>),
504 TpuClientNext(&'a Keypair, UdpSocket, RuntimeHandle, CancellationToken),
505}
506
507impl JsonRpcService {
508 pub fn new_with_config(config: JsonRpcServiceConfig) -> Result<Self, String> {
509 let runtime = service_runtime(
510 config.rpc_config.rpc_threads,
511 config.rpc_config.rpc_blocking_threads,
512 config.rpc_config.rpc_niceness_adj,
513 );
514 let leader_info = config
515 .poh_recorder
516 .map(|recorder| ClusterTpuInfo::new(config.cluster_info.clone(), recorder));
517
518 match config.client_option {
519 ClientOption::ConnectionCache(connection_cache) => {
520 let my_tpu_address = config
521 .cluster_info
522 .my_contact_info()
523 .tpu(connection_cache.protocol())
524 .ok_or(format!(
525 "Invalid {:?} socket address for TPU",
526 connection_cache.protocol()
527 ))?;
528 let client = ConnectionCacheClient::new(
529 connection_cache,
530 my_tpu_address,
531 config.send_transaction_service_config.tpu_peers.clone(),
532 leader_info,
533 config.send_transaction_service_config.leader_forward_count,
534 );
535 let json_rpc_service = Self::new_with_client(
536 config.rpc_addr,
537 config.rpc_config,
538 config.snapshot_config,
539 config.bank_forks,
540 config.block_commitment_cache,
541 config.blockstore,
542 config.cluster_info,
543 config.genesis_hash,
544 config.ledger_path.as_path(),
545 config.validator_exit,
546 config.exit,
547 config.override_health_check,
548 config.startup_verification_complete,
549 config.optimistically_confirmed_bank,
550 config.send_transaction_service_config,
551 config.max_slots,
552 config.leader_schedule_cache,
553 client.clone(),
554 config.max_complete_transaction_status_slot,
555 config.prioritization_fee_cache,
556 runtime,
557 )?;
558 Ok(json_rpc_service)
559 }
560 ClientOption::TpuClientNext(
561 identity_keypair,
562 tpu_client_socket,
563 client_runtime,
564 cancel,
565 ) => {
566 let my_tpu_address = config
567 .cluster_info
568 .my_contact_info()
569 .tpu(Protocol::QUIC)
570 .ok_or(format!(
571 "Invalid {:?} socket address for TPU",
572 Protocol::QUIC
573 ))?;
574 let client = TpuClientNextClient::new(
575 client_runtime,
576 my_tpu_address,
577 config.send_transaction_service_config.tpu_peers.clone(),
578 leader_info,
579 config.send_transaction_service_config.leader_forward_count,
580 Some(identity_keypair),
581 tpu_client_socket,
582 cancel,
583 );
584
585 let json_rpc_service = Self::new_with_client(
586 config.rpc_addr,
587 config.rpc_config.clone(),
588 config.snapshot_config,
589 config.bank_forks.clone(),
590 config.block_commitment_cache.clone(),
591 config.blockstore.clone(),
592 config.cluster_info.clone(),
593 config.genesis_hash,
594 config.ledger_path.as_path(),
595 config.validator_exit,
596 config.exit,
597 config.override_health_check,
598 config.startup_verification_complete,
599 config.optimistically_confirmed_bank,
600 config.send_transaction_service_config,
601 config.max_slots,
602 config.leader_schedule_cache,
603 client,
604 config.max_complete_transaction_status_slot,
605 config.prioritization_fee_cache,
606 runtime,
607 )?;
608 Ok(json_rpc_service)
609 }
610 }
611 }
612
613 #[allow(clippy::too_many_arguments)]
614 pub fn new(
615 rpc_addr: SocketAddr,
616 config: JsonRpcConfig,
617 snapshot_config: Option<SnapshotConfig>,
618 bank_forks: Arc<RwLock<BankForks>>,
619 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
620 blockstore: Arc<Blockstore>,
621 cluster_info: Arc<ClusterInfo>,
622 poh_recorder: Option<Arc<RwLock<PohRecorder>>>,
623 genesis_hash: Hash,
624 ledger_path: &Path,
625 validator_exit: Arc<RwLock<Exit>>,
626 exit: Arc<AtomicBool>,
627 override_health_check: Arc<AtomicBool>,
628 startup_verification_complete: Arc<AtomicBool>,
629 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
630 send_transaction_service_config: send_transaction_service::Config,
631 max_slots: Arc<MaxSlots>,
632 leader_schedule_cache: Arc<LeaderScheduleCache>,
633 connection_cache: Arc<ConnectionCache>,
634 max_complete_transaction_status_slot: Arc<AtomicU64>,
635 prioritization_fee_cache: Arc<PrioritizationFeeCache>,
636 ) -> Result<Self, String> {
637 let runtime = service_runtime(
638 config.rpc_threads,
639 config.rpc_blocking_threads,
640 config.rpc_niceness_adj,
641 );
642
643 let tpu_address = cluster_info
644 .my_contact_info()
645 .tpu(connection_cache.protocol())
646 .ok_or_else(|| {
647 format!(
648 "Invalid {:?} socket address for TPU",
649 connection_cache.protocol()
650 )
651 })?;
652
653 let leader_info =
654 poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
655 let client = ConnectionCacheClient::new(
656 connection_cache,
657 tpu_address,
658 send_transaction_service_config.tpu_peers.clone(),
659 leader_info,
660 send_transaction_service_config.leader_forward_count,
661 );
662 let json_rpc_service = Self::new_with_client(
663 rpc_addr,
664 config,
665 snapshot_config,
666 bank_forks,
667 block_commitment_cache,
668 blockstore,
669 cluster_info,
670 genesis_hash,
671 ledger_path,
672 validator_exit,
673 exit,
674 override_health_check,
675 startup_verification_complete,
676 optimistically_confirmed_bank,
677 send_transaction_service_config,
678 max_slots,
679 leader_schedule_cache,
680 client.clone(),
681 max_complete_transaction_status_slot,
682 prioritization_fee_cache,
683 runtime,
684 )?;
685 Ok(json_rpc_service)
686 }
687
688 #[allow(clippy::too_many_arguments)]
689 fn new_with_client<
690 Client: TransactionClient
691 + NotifyKeyUpdate
692 + Clone
693 + std::marker::Send
694 + std::marker::Sync
695 + 'static,
696 >(
697 rpc_addr: SocketAddr,
698 config: JsonRpcConfig,
699 snapshot_config: Option<SnapshotConfig>,
700 bank_forks: Arc<RwLock<BankForks>>,
701 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
702 blockstore: Arc<Blockstore>,
703 cluster_info: Arc<ClusterInfo>,
704 genesis_hash: Hash,
705 ledger_path: &Path,
706 validator_exit: Arc<RwLock<Exit>>,
707 exit: Arc<AtomicBool>,
708 override_health_check: Arc<AtomicBool>,
709 startup_verification_complete: Arc<AtomicBool>,
710 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
711 send_transaction_service_config: send_transaction_service::Config,
712 max_slots: Arc<MaxSlots>,
713 leader_schedule_cache: Arc<LeaderScheduleCache>,
714 client: Client,
715 max_complete_transaction_status_slot: Arc<AtomicU64>,
716 prioritization_fee_cache: Arc<PrioritizationFeeCache>,
717 runtime: Arc<TokioRuntime>,
718 ) -> Result<Self, String> {
719 info!("rpc bound to {rpc_addr:?}");
720 info!("rpc configuration: {config:?}");
721 let rpc_niceness_adj = config.rpc_niceness_adj;
722
723 let health = Arc::new(RpcHealth::new(
724 Arc::clone(&optimistically_confirmed_bank),
725 Arc::clone(&blockstore),
726 config.health_check_slot_distance,
727 override_health_check,
728 startup_verification_complete,
729 ));
730
731 let largest_accounts_cache = Arc::new(RwLock::new(LargestAccountsCache::new(
732 LARGEST_ACCOUNTS_CACHE_DURATION,
733 )));
734
735 let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false));
736
737 let (bigtable_ledger_storage, _bigtable_ledger_upload_service) =
738 if let Some(RpcBigtableConfig {
739 enable_bigtable_ledger_upload,
740 ref bigtable_instance_name,
741 ref bigtable_app_profile_id,
742 timeout,
743 max_message_size,
744 }) = config.rpc_bigtable_config
745 {
746 let bigtable_config = solana_storage_bigtable::LedgerStorageConfig {
747 read_only: !enable_bigtable_ledger_upload,
748 timeout,
749 credential_type: CredentialType::Filepath(None),
750 instance_name: bigtable_instance_name.clone(),
751 app_profile_id: bigtable_app_profile_id.clone(),
752 max_message_size,
753 };
754 runtime
755 .block_on(solana_storage_bigtable::LedgerStorage::new_with_config(
756 bigtable_config,
757 ))
758 .map(|bigtable_ledger_storage| {
759 info!("BigTable ledger storage initialized");
760
761 let bigtable_ledger_upload_service = if enable_bigtable_ledger_upload {
762 Some(Arc::new(BigTableUploadService::new_with_config(
763 runtime.clone(),
764 bigtable_ledger_storage.clone(),
765 blockstore.clone(),
766 block_commitment_cache.clone(),
767 max_complete_transaction_status_slot.clone(),
768 ConfirmedBlockUploadConfig::default(),
769 exit_bigtable_ledger_upload_service.clone(),
770 )))
771 } else {
772 None
773 };
774
775 (
776 Some(bigtable_ledger_storage),
777 bigtable_ledger_upload_service,
778 )
779 })
780 .unwrap_or_else(|err| {
781 error!("Failed to initialize BigTable ledger storage: {err:?}");
782 (None, None)
783 })
784 } else {
785 (None, None)
786 };
787
788 let full_api = config.full_api;
789 let max_request_body_size = config
790 .max_request_body_size
791 .unwrap_or(MAX_REQUEST_BODY_SIZE);
792 let (request_processor, receiver) = JsonRpcRequestProcessor::new(
793 config,
794 snapshot_config.clone(),
795 bank_forks.clone(),
796 block_commitment_cache,
797 blockstore,
798 validator_exit.clone(),
799 health.clone(),
800 cluster_info.clone(),
801 genesis_hash,
802 bigtable_ledger_storage,
803 optimistically_confirmed_bank,
804 largest_accounts_cache,
805 max_slots,
806 leader_schedule_cache,
807 max_complete_transaction_status_slot,
808 prioritization_fee_cache,
809 Arc::clone(&runtime),
810 );
811
812 let _send_transaction_service = Arc::new(SendTransactionService::new_with_client(
813 &bank_forks,
814 receiver,
815 client.clone(),
816 send_transaction_service_config,
817 exit,
818 ));
819
820 #[cfg(test)]
821 let test_request_processor = request_processor.clone();
822
823 let ledger_path = ledger_path.to_path_buf();
824
825 let (close_handle_sender, close_handle_receiver) = unbounded();
826 let thread_hdl = Builder::new()
827 .name("solJsonRpcSvc".to_string())
828 .spawn(move || {
829 renice_this_thread(rpc_niceness_adj).unwrap();
830
831 let mut io = MetaIoHandler::default();
832
833 io.extend_with(rpc_minimal::MinimalImpl.to_delegate());
834 if full_api {
835 io.extend_with(rpc_bank::BankDataImpl.to_delegate());
836 io.extend_with(rpc_accounts::AccountsDataImpl.to_delegate());
837 io.extend_with(rpc_accounts_scan::AccountsScanImpl.to_delegate());
838 io.extend_with(rpc_full::FullImpl.to_delegate());
839 }
840
841 let request_middleware = RpcRequestMiddleware::new(
842 ledger_path,
843 snapshot_config,
844 bank_forks.clone(),
845 health.clone(),
846 );
847 let server = ServerBuilder::with_meta_extractor(
848 io,
849 move |req: &hyper::Request<hyper::Body>| {
850 let xbigtable = req.headers().get("x-bigtable");
851 if xbigtable.is_some_and(|v| v == "disabled") {
852 request_processor.clone_without_bigtable()
853 } else {
854 request_processor.clone()
855 }
856 },
857 )
858 .event_loop_executor(runtime.handle().clone())
859 .threads(1)
860 .cors(DomainsValidation::AllowOnly(vec![
861 AccessControlAllowOrigin::Any,
862 ]))
863 .cors_max_age(86400)
864 .request_middleware(request_middleware)
865 .max_request_body_size(max_request_body_size)
866 .start_http(&rpc_addr);
867
868 if let Err(e) = server {
869 warn!(
870 "JSON RPC service unavailable error: {e:?}. Also, check that port {} is \
871 not already in use by another application",
872 rpc_addr.port()
873 );
874 close_handle_sender.send(Err(e.to_string())).unwrap();
875 return;
876 }
877
878 let server = server.unwrap();
879 close_handle_sender.send(Ok(server.close_handle())).unwrap();
880 server.wait();
881 exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed);
882 })
883 .unwrap();
884
885 let close_handle = close_handle_receiver.recv().unwrap()?;
886 let close_handle_ = close_handle.clone();
887 validator_exit
888 .write()
889 .unwrap()
890 .register_exit(Box::new(move || {
891 close_handle_.close();
892 }));
893 Ok(Self {
894 thread_hdl,
895 #[cfg(test)]
896 request_processor: test_request_processor,
897 close_handle: Some(close_handle),
898 client_updater: Arc::new(client) as Arc<dyn NotifyKeyUpdate + Send + Sync>,
899 })
900 }
901
902 pub fn exit(&mut self) {
903 if let Some(c) = self.close_handle.take() {
904 c.close()
905 }
906 }
907
908 pub fn join(mut self) -> thread::Result<()> {
909 self.exit();
910 self.thread_hdl.join()
911 }
912
913 pub fn get_client_key_updater(&self) -> Arc<dyn NotifyKeyUpdate + Send + Sync> {
914 self.client_updater.clone()
915 }
916}
917
918pub fn service_runtime(
919 rpc_threads: usize,
920 rpc_blocking_threads: usize,
921 rpc_niceness_adj: i8,
922) -> Arc<TokioRuntime> {
923 let rpc_threads = 1.max(rpc_threads);
940 let rpc_blocking_threads = 1.max(rpc_blocking_threads);
941 let runtime = Arc::new(
942 TokioBuilder::new_multi_thread()
943 .worker_threads(rpc_threads)
944 .max_blocking_threads(rpc_blocking_threads)
945 .on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap())
946 .thread_name("solRpcEl")
947 .enable_all()
948 .build()
949 .expect("Runtime"),
950 );
951 runtime
952}
953
954#[cfg(test)]
955mod tests {
956 use {
957 super::*,
958 crate::rpc::{create_validator_exit, tests::new_test_cluster_info},
959 solana_cluster_type::ClusterType,
960 solana_genesis_config::DEFAULT_GENESIS_ARCHIVE,
961 solana_ledger::{
962 genesis_utils::{create_genesis_config, GenesisConfigInfo},
963 get_tmp_ledger_path_auto_delete,
964 },
965 solana_rpc_client_api::config::RpcContextConfig,
966 solana_runtime::bank::Bank,
967 solana_signer::Signer,
968 std::{
969 io::Write,
970 net::{IpAddr, Ipv4Addr},
971 },
972 tokio::runtime::Runtime,
973 };
974
975 #[test]
976 fn test_rpc_new() {
977 let GenesisConfigInfo {
978 genesis_config,
979 mint_keypair,
980 ..
981 } = create_genesis_config(10_000);
982 let exit = Arc::new(AtomicBool::new(false));
983 let validator_exit = create_validator_exit(exit.clone());
984 let bank = Bank::new_for_tests(&genesis_config);
985 let cluster_info = Arc::new(new_test_cluster_info());
986 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
987 let port_range = solana_net_utils::sockets::localhost_port_range_for_tests();
988 let rpc_addr = SocketAddr::new(
989 ip_addr,
990 solana_net_utils::find_available_port_in_range(ip_addr, port_range).unwrap(),
991 );
992 let bank_forks = BankForks::new_rw_arc(bank);
993 let ledger_path = get_tmp_ledger_path_auto_delete!();
994 let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
995 let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
996 let optimistically_confirmed_bank =
997 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
998 let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test"));
999 let mut rpc_service = JsonRpcService::new(
1000 rpc_addr,
1001 JsonRpcConfig::default(),
1002 None,
1003 bank_forks,
1004 block_commitment_cache,
1005 blockstore,
1006 cluster_info,
1007 None,
1008 Hash::default(),
1009 &PathBuf::from("farf"),
1010 validator_exit,
1011 exit,
1012 Arc::new(AtomicBool::new(false)),
1013 Arc::new(AtomicBool::new(true)),
1014 optimistically_confirmed_bank,
1015 send_transaction_service::Config {
1016 retry_rate_ms: 1000,
1017 leader_forward_count: 1,
1018 ..send_transaction_service::Config::default()
1019 },
1020 Arc::new(MaxSlots::default()),
1021 Arc::new(LeaderScheduleCache::default()),
1022 connection_cache,
1023 Arc::new(AtomicU64::default()),
1024 Arc::new(PrioritizationFeeCache::default()),
1025 )
1026 .expect("assume successful JsonRpcService start");
1027 let thread = rpc_service.thread_hdl.thread();
1028 assert_eq!(thread.name().unwrap(), "solJsonRpcSvc");
1029
1030 assert_eq!(
1031 10_000,
1032 rpc_service
1033 .request_processor
1034 .get_balance(&mint_keypair.pubkey(), RpcContextConfig::default())
1035 .unwrap()
1036 .value
1037 );
1038 rpc_service.exit();
1039 rpc_service.join().unwrap();
1040 }
1041
1042 fn create_bank_forks() -> Arc<RwLock<BankForks>> {
1043 let GenesisConfigInfo {
1044 mut genesis_config, ..
1045 } = create_genesis_config(10_000);
1046 genesis_config.cluster_type = ClusterType::MainnetBeta;
1047 let bank = Bank::new_for_tests(&genesis_config);
1048 BankForks::new_rw_arc(bank)
1049 }
1050
1051 #[test]
1052 fn test_process_rest_api() {
1053 let bank_forks = create_bank_forks();
1054 let runtime = tokio::runtime::Runtime::new().unwrap();
1055
1056 runtime.block_on(async {
1057 assert_eq!(
1058 None,
1059 handle_rest(&bank_forks, "not-a-supported-rest-api").await
1060 );
1061
1062 let circulating_supply = handle_rest(&bank_forks, "/v0/circulating-supply").await;
1063 assert!(circulating_supply.is_some());
1064
1065 let total_supply = handle_rest(&bank_forks, "/v0/total-supply").await;
1066 assert!(total_supply.is_some());
1067
1068 assert_eq!(
1069 handle_rest(&bank_forks, "/v0/circulating-supply").await,
1070 handle_rest(&bank_forks, "/v0/total-supply").await
1071 );
1072 });
1073 }
1074
1075 #[test]
1076 fn test_strip_prefix() {
1077 assert_eq!(RpcRequestMiddleware::strip_leading_slash("/"), Some(""));
1078 assert_eq!(RpcRequestMiddleware::strip_leading_slash("//"), Some("/"));
1079 assert_eq!(
1080 RpcRequestMiddleware::strip_leading_slash("/abc"),
1081 Some("abc")
1082 );
1083 assert_eq!(
1084 RpcRequestMiddleware::strip_leading_slash("//abc"),
1085 Some("/abc")
1086 );
1087 assert_eq!(
1088 RpcRequestMiddleware::strip_leading_slash("/./abc"),
1089 Some("./abc")
1090 );
1091 assert_eq!(
1092 RpcRequestMiddleware::strip_leading_slash("/../abc"),
1093 Some("../abc")
1094 );
1095
1096 assert_eq!(RpcRequestMiddleware::strip_leading_slash(""), None);
1097 assert_eq!(RpcRequestMiddleware::strip_leading_slash("./"), None);
1098 assert_eq!(RpcRequestMiddleware::strip_leading_slash("../"), None);
1099 assert_eq!(RpcRequestMiddleware::strip_leading_slash("."), None);
1100 assert_eq!(RpcRequestMiddleware::strip_leading_slash(".."), None);
1101 assert_eq!(RpcRequestMiddleware::strip_leading_slash("abc"), None);
1102 }
1103
1104 #[test]
1105 fn test_is_file_get_path() {
1106 let ledger_path = get_tmp_ledger_path_auto_delete!();
1107 let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
1108 let bank_forks = create_bank_forks();
1109 let optimistically_confirmed_bank =
1110 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1111 let health = RpcHealth::stub(optimistically_confirmed_bank, blockstore);
1112
1113 let bank_forks = create_bank_forks();
1114 let rrm = RpcRequestMiddleware::new(
1115 ledger_path.path().to_path_buf(),
1116 None,
1117 bank_forks.clone(),
1118 health.clone(),
1119 );
1120 let rrm_with_snapshot_config = RpcRequestMiddleware::new(
1121 ledger_path.path().to_path_buf(),
1122 Some(SnapshotConfig::default()),
1123 bank_forks,
1124 health,
1125 );
1126
1127 assert!(rrm.is_file_get_path(DEFAULT_GENESIS_DOWNLOAD_PATH));
1128 assert!(!rrm.is_file_get_path(DEFAULT_GENESIS_ARCHIVE));
1129 assert!(!rrm.is_file_get_path("//genesis.tar.bz2"));
1130 assert!(!rrm.is_file_get_path("/../genesis.tar.bz2"));
1131
1132 assert!(!rrm.is_file_get_path("/snapshot.tar.bz2"));
1134 assert!(!rrm.is_file_get_path("/incremental-snapshot.tar.bz2"));
1135
1136 assert!(!rrm.is_file_get_path(
1137 "/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1138 ));
1139 assert!(!rrm.is_file_get_path(
1140 "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1141 ));
1142
1143 assert!(rrm_with_snapshot_config.is_file_get_path(
1144 "/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1145 ));
1146 assert!(rrm_with_snapshot_config.is_file_get_path(
1147 "/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.lz4"
1148 ));
1149 assert!(!rrm_with_snapshot_config.is_file_get_path(
1150 "/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
1151 ));
1152 assert!(!rrm_with_snapshot_config
1153 .is_file_get_path("/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.gz"));
1154 assert!(!rrm_with_snapshot_config
1155 .is_file_get_path("/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar"));
1156
1157 assert!(rrm_with_snapshot_config.is_file_get_path(
1158 "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1159 ));
1160 assert!(rrm_with_snapshot_config.is_file_get_path(
1161 "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.lz4"
1162 ));
1163 assert!(!rrm_with_snapshot_config.is_file_get_path(
1164 "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
1165 ));
1166 assert!(!rrm_with_snapshot_config.is_file_get_path(
1167 "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.gz"
1168 ));
1169 assert!(!rrm_with_snapshot_config.is_file_get_path(
1170 "/incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar"
1171 ));
1172
1173 assert!(!rrm_with_snapshot_config.is_file_get_path(
1174 "/snapshot-notaslotnumber-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1175 ));
1176 assert!(!rrm_with_snapshot_config.is_file_get_path(
1177 "/incremental-snapshot-notaslotnumber-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1178 ));
1179 assert!(!rrm_with_snapshot_config.is_file_get_path(
1180 "/incremental-snapshot-100-notaslotnumber-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1181 ));
1182
1183 assert!(
1184 !rrm_with_snapshot_config.is_file_get_path("../../../test/snapshot-123-xxx.tar.zst")
1185 );
1186 assert!(!rrm_with_snapshot_config
1187 .is_file_get_path("../../../test/incremental-snapshot-123-456-xxx.tar.zst"));
1188
1189 assert!(!rrm.is_file_get_path("/"));
1190 assert!(!rrm.is_file_get_path("//"));
1191 assert!(!rrm.is_file_get_path("/."));
1192 assert!(!rrm.is_file_get_path("/./"));
1193 assert!(!rrm.is_file_get_path("/.."));
1194 assert!(!rrm.is_file_get_path("/../"));
1195 assert!(!rrm.is_file_get_path("."));
1196 assert!(!rrm.is_file_get_path("./"));
1197 assert!(!rrm.is_file_get_path(".//"));
1198 assert!(!rrm.is_file_get_path(".."));
1199 assert!(!rrm.is_file_get_path("../"));
1200 assert!(!rrm.is_file_get_path("..//"));
1201 assert!(!rrm.is_file_get_path("🎣"));
1202
1203 assert!(!rrm_with_snapshot_config.is_file_get_path(
1204 "//snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1205 ));
1206 assert!(!rrm_with_snapshot_config.is_file_get_path(
1207 "/./snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1208 ));
1209 assert!(!rrm_with_snapshot_config.is_file_get_path(
1210 "/../snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1211 ));
1212 assert!(!rrm_with_snapshot_config.is_file_get_path(
1213 "//incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1214 ));
1215 assert!(!rrm_with_snapshot_config.is_file_get_path(
1216 "/./incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1217 ));
1218 assert!(!rrm_with_snapshot_config.is_file_get_path(
1219 "/../incremental-snapshot-100-200-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
1220 ));
1221 }
1222
1223 #[test]
1224 fn test_process_file_get() {
1225 let runtime = Runtime::new().unwrap();
1226
1227 let ledger_path = get_tmp_ledger_path_auto_delete!();
1228 let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
1229 let genesis_path = ledger_path.path().join(DEFAULT_GENESIS_ARCHIVE);
1230 let bank_forks = create_bank_forks();
1231 let optimistically_confirmed_bank =
1232 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1233 let rrm = RpcRequestMiddleware::new(
1234 ledger_path.path().to_path_buf(),
1235 None,
1236 bank_forks,
1237 RpcHealth::stub(optimistically_confirmed_bank, blockstore),
1238 );
1239
1240 let action = rrm.process_file_get(DEFAULT_GENESIS_DOWNLOAD_PATH);
1242 if let RequestMiddlewareAction::Respond { response, .. } = action {
1243 let response = runtime.block_on(response);
1244 let response = response.unwrap();
1245 assert_ne!(response.status(), 200);
1246 } else {
1247 panic!("Unexpected RequestMiddlewareAction variant");
1248 }
1249
1250 {
1251 let mut file = std::fs::File::create(&genesis_path).unwrap();
1252 file.write_all(b"should be ok").unwrap();
1253 }
1254
1255 let action = rrm.process_file_get(DEFAULT_GENESIS_DOWNLOAD_PATH);
1257 if let RequestMiddlewareAction::Respond { response, .. } = action {
1258 let response = runtime.block_on(response);
1259 let response = response.unwrap();
1260 assert_eq!(response.status(), 200);
1261 } else {
1262 panic!("Unexpected RequestMiddlewareAction variant");
1263 }
1264
1265 std::fs::remove_file(&genesis_path).unwrap();
1266 {
1267 let mut file = std::fs::File::create(ledger_path.path().join("wrong")).unwrap();
1268 file.write_all(b"wrong file").unwrap();
1269 }
1270 symlink::symlink_file("wrong", &genesis_path).unwrap();
1271
1272 let action = rrm.process_file_get(DEFAULT_GENESIS_DOWNLOAD_PATH);
1274 if let RequestMiddlewareAction::Respond { response, .. } = action {
1275 let response = runtime.block_on(response);
1276 let response = response.unwrap();
1277 assert_ne!(response.status(), 200);
1278 } else {
1279 panic!("Unexpected RequestMiddlewareAction variant");
1280 }
1281 }
1282}