1use {
2 crossbeam_channel::Sender,
3 jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
4 jsonrpc_core_client::{RpcError, transports::ipc},
5 jsonrpc_derive::rpc,
6 jsonrpc_ipc_server::{
7 RequestContext, ServerBuilder, tokio::sync::oneshot::channel as oneshot_channel,
8 },
9 log::*,
10 serde::{Deserialize, Serialize, de::Deserializer},
11 solana_accounts_db::accounts_index::AccountIndex,
12 solana_core::{
13 admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
14 banking_stage::{
15 BankingControlMsg, BankingStage,
16 transaction_scheduler::scheduler_controller::SchedulerConfig,
17 },
18 consensus::{Tower, tower_storage::TowerStorage},
19 repair::repair_service,
20 validator::{
21 BlockProductionMethod, SchedulerPacing, TransactionStructure, ValidatorStartProgress,
22 },
23 },
24 solana_geyser_plugin_manager::GeyserPluginManagerRequest,
25 solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
26 solana_keypair::{Keypair, read_keypair_file},
27 solana_metrics::{datapoint_info, datapoint_warn},
28 solana_pubkey::Pubkey,
29 solana_rpc::rpc::verify_pubkey,
30 solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
31 solana_runtime::snapshot_controller::SnapshotController,
32 solana_signer::Signer,
33 solana_validator_exit::Exit,
34 std::{
35 collections::{HashMap, HashSet},
36 env, error,
37 fmt::{self, Display},
38 net::{IpAddr, SocketAddr},
39 num::NonZeroUsize,
40 path::{Path, PathBuf},
41 sync::{
42 Arc, RwLock,
43 atomic::{AtomicBool, Ordering},
44 },
45 thread::{self, Builder},
46 time::{Duration, Instant, SystemTime},
47 },
48 tokio::runtime::Runtime,
49};
50
51#[derive(Clone)]
52pub struct AdminRpcRequestMetadata {
53 pub rpc_addr: Option<SocketAddr>,
54 pub start_time: SystemTime,
55 pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
56 pub validator_exit: Arc<RwLock<Exit>>,
57 pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
58 pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
59 pub tower_storage: Arc<dyn TowerStorage>,
60 pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
61 pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
62 pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
63}
64
65impl Metadata for AdminRpcRequestMetadata {}
66
67impl AdminRpcRequestMetadata {
68 fn with_post_init<F, R>(&self, func: F) -> Result<R>
69 where
70 F: FnOnce(&AdminRpcRequestMetadataPostInit) -> Result<R>,
71 {
72 if let Some(post_init) = self.post_init.read().unwrap().as_ref() {
73 func(post_init)
74 } else {
75 Err(jsonrpc_core::error::Error::invalid_params(
76 "Retry once validator start up is complete",
77 ))
78 }
79 }
80
81 fn snapshot_controller(&self) -> Option<Arc<SnapshotController>> {
82 self.with_post_init(|post_init| Ok(post_init.snapshot_controller.clone()))
83 .map_err(|_| {
84 warn!("snapshot_controller unavailable, shutting down without taking snapshot");
86 })
87 .ok()
88 }
89}
90
91#[derive(Debug, Deserialize, Serialize)]
92pub struct AdminRpcContactInfo {
93 pub id: String,
94 pub gossip: SocketAddr,
95 pub tvu: SocketAddr,
96 pub serve_repair_quic: SocketAddr,
97 pub tpu: SocketAddr,
98 pub tpu_forwards: SocketAddr,
99 pub tpu_vote: SocketAddr,
100 pub rpc: SocketAddr,
101 pub rpc_pubsub: SocketAddr,
102 pub serve_repair: SocketAddr,
103 pub last_updated_timestamp: u64,
104 pub shred_version: u16,
105}
106
107#[derive(Debug, Deserialize, Serialize)]
108pub struct AdminRpcRepairWhitelist {
109 pub whitelist: Vec<Pubkey>,
110}
111
112impl From<ContactInfo> for AdminRpcContactInfo {
113 fn from(node: ContactInfo) -> Self {
114 macro_rules! unwrap_socket {
115 ($name:ident) => {
116 node.$name().unwrap_or(SOCKET_ADDR_UNSPECIFIED)
117 };
118 ($name:ident, $protocol:expr) => {
119 node.$name($protocol).unwrap_or(SOCKET_ADDR_UNSPECIFIED)
120 };
121 }
122 Self {
123 id: node.pubkey().to_string(),
124 last_updated_timestamp: node.wallclock(),
125 gossip: unwrap_socket!(gossip),
126 tvu: unwrap_socket!(tvu, Protocol::UDP),
127 serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
128 tpu: unwrap_socket!(tpu, Protocol::UDP),
129 tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
130 tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
131 rpc: unwrap_socket!(rpc),
132 rpc_pubsub: unwrap_socket!(rpc_pubsub),
133 serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
134 shred_version: node.shred_version(),
135 }
136 }
137}
138
139impl Display for AdminRpcContactInfo {
140 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
141 writeln!(f, "Identity: {}", self.id)?;
142 writeln!(f, "Gossip: {}", self.gossip)?;
143 writeln!(f, "TVU: {}", self.tvu)?;
144 writeln!(f, "TPU: {}", self.tpu)?;
145 writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;
146 writeln!(f, "TPU Votes: {}", self.tpu_vote)?;
147 writeln!(f, "RPC: {}", self.rpc)?;
148 writeln!(f, "RPC Pubsub: {}", self.rpc_pubsub)?;
149 writeln!(f, "Serve Repair: {}", self.serve_repair)?;
150 writeln!(f, "Last Updated Timestamp: {}", self.last_updated_timestamp)?;
151 writeln!(f, "Shred Version: {}", self.shred_version)
152 }
153}
154impl solana_cli_output::VerboseDisplay for AdminRpcContactInfo {}
155impl solana_cli_output::QuietDisplay for AdminRpcContactInfo {}
156
157impl Display for AdminRpcRepairWhitelist {
158 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
159 writeln!(f, "Repair whitelist: {:?}", &self.whitelist)
160 }
161}
162impl solana_cli_output::VerboseDisplay for AdminRpcRepairWhitelist {}
163impl solana_cli_output::QuietDisplay for AdminRpcRepairWhitelist {}
164
165#[rpc]
166pub trait AdminRpc {
167 type Metadata;
168
169 #[rpc(meta, name = "exit")]
172 fn exit(&self, meta: Self::Metadata) -> Result<()>;
173
174 #[rpc(meta, name = "pid")]
176 fn pid(&self, meta: Self::Metadata) -> Result<u32>;
177
178 #[rpc(meta, name = "reloadPlugin")]
179 fn reload_plugin(
180 &self,
181 meta: Self::Metadata,
182 name: String,
183 config_file: String,
184 ) -> BoxFuture<Result<()>>;
185
186 #[rpc(meta, name = "unloadPlugin")]
187 fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
188
189 #[rpc(meta, name = "loadPlugin")]
190 fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
191
192 #[rpc(meta, name = "listPlugins")]
193 fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
194
195 #[rpc(meta, name = "rpcAddress")]
196 fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
197
198 #[rpc(name = "setLogFilter")]
199 fn set_log_filter(&self, filter: String) -> Result<()>;
200
201 #[rpc(meta, name = "startTime")]
202 fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;
203
204 #[rpc(meta, name = "startProgress")]
205 fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;
206
207 #[rpc(meta, name = "addAuthorizedVoter")]
208 fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;
209
210 #[rpc(meta, name = "addAuthorizedVoterFromBytes")]
211 fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec<u8>)
212 -> Result<()>;
213
214 #[rpc(meta, name = "removeAllAuthorizedVoters")]
215 fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;
216
217 #[rpc(meta, name = "setIdentity")]
218 fn set_identity(
219 &self,
220 meta: Self::Metadata,
221 keypair_file: String,
222 require_tower: bool,
223 ) -> Result<()>;
224
225 #[rpc(meta, name = "setIdentityFromBytes")]
226 fn set_identity_from_bytes(
227 &self,
228 meta: Self::Metadata,
229 identity_keypair: Vec<u8>,
230 require_tower: bool,
231 ) -> Result<()>;
232
233 #[rpc(meta, name = "setStakedNodesOverrides")]
234 fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;
235
236 #[rpc(meta, name = "contactInfo")]
237 fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;
238
239 #[rpc(meta, name = "selectActiveInterface")]
240 fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()>;
241
242 #[rpc(meta, name = "repairShredFromPeer")]
243 fn repair_shred_from_peer(
244 &self,
245 meta: Self::Metadata,
246 pubkey: Option<Pubkey>,
247 slot: u64,
248 shred_index: u64,
249 ) -> Result<()>;
250
251 #[rpc(meta, name = "repairWhitelist")]
252 fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;
253
254 #[rpc(meta, name = "setRepairWhitelist")]
255 fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()>;
256
257 #[rpc(meta, name = "getSecondaryIndexKeySize")]
258 fn get_secondary_index_key_size(
259 &self,
260 meta: Self::Metadata,
261 pubkey_str: String,
262 ) -> Result<HashMap<RpcAccountIndex, usize>>;
263
264 #[rpc(meta, name = "setPublicTpuAddress")]
265 fn set_public_tpu_address(
266 &self,
267 meta: Self::Metadata,
268 public_tpu_addr: SocketAddr,
269 ) -> Result<()>;
270
271 #[rpc(meta, name = "setPublicTpuForwardsAddress")]
272 fn set_public_tpu_forwards_address(
273 &self,
274 meta: Self::Metadata,
275 public_tpu_forwards_addr: SocketAddr,
276 ) -> Result<()>;
277
278 #[rpc(meta, name = "setPublicTvuAddress")]
279 fn set_public_tvu_address(
280 &self,
281 meta: Self::Metadata,
282 public_tvu_addr: SocketAddr,
283 ) -> Result<()>;
284
285 #[rpc(meta, name = "manageBlockProduction")]
286 fn manage_block_production(
287 &self,
288 meta: Self::Metadata,
289 block_production_method: BlockProductionMethod,
290 transaction_struct: TransactionStructure,
291 num_workers: NonZeroUsize,
292 scheduler_pacing: SchedulerPacing,
293 ) -> Result<()>;
294
295 #[rpc(meta, name = "isGeneratingSnapshots")]
296 fn is_generating_snapshots(&self, meta: Self::Metadata) -> Result<bool>;
297}
298
/// Stateless implementor of [`AdminRpc`]; all state arrives through the request metadata.
pub struct AdminRpcImpl;
300impl AdminRpc for AdminRpcImpl {
301 type Metadata = AdminRpcRequestMetadata;
302
303 fn exit(&self, meta: Self::Metadata) -> Result<()> {
304 debug!("exit admin rpc request received");
305
306 thread::Builder::new()
307 .name("solProcessExit".into())
308 .spawn(move || {
309 let start_time = Instant::now();
310
311 if let Some(snapshot_controller) = meta.snapshot_controller() {
313 let latest_snapshot_slot = snapshot_controller.latest_bank_snapshot_slot();
314
315 info!("Requesting fastboot snapshot before exit");
316 snapshot_controller.request_fastboot_snapshot();
317
318 let timeout = Duration::from_secs(5);
322 while snapshot_controller.latest_bank_snapshot_slot() == latest_snapshot_slot {
323 if start_time.elapsed() > timeout {
324 warn!("Timeout waiting for snapshot to complete");
325 datapoint_warn!(
326 "admin-rpc-snapshot-timeout",
327 ("timeout_us", start_time.elapsed().as_micros(), i64)
328 );
329 break;
330 }
331 thread::sleep(Duration::from_millis(100));
332 }
333 info!(
334 "Requesting fastboot snapshot before exit... Done in {:?}",
335 start_time.elapsed()
336 );
337 }
338
339 if start_time.elapsed().as_millis() < 100 {
343 thread::sleep(Duration::from_millis(100));
344 }
345
346 info!("validator exit requested");
347 meta.validator_exit.write().unwrap().exit();
348
349 if !meta.validator_exit_backpressure.is_empty() {
350 let service_names = meta.validator_exit_backpressure.keys();
351 info!("Wait for these services to complete: {service_names:?}");
352 loop {
353 thread::sleep(Duration::from_secs(1));
357
358 let mut any_flags_raised = false;
359 for (name, flag) in meta.validator_exit_backpressure.iter() {
360 let is_flag_raised = flag.load(Ordering::Relaxed);
361 if is_flag_raised {
362 info!("{name}'s exit backpressure flag is raised");
363 any_flags_raised = true;
364 }
365 }
366 if !any_flags_raised {
367 break;
368 }
369 }
370 info!("All services have completed");
371 }
372
373 thread::sleep(Duration::from_secs(
378 env::var("SOLANA_VALIDATOR_EXIT_TIMEOUT")
379 .ok()
380 .and_then(|x| x.parse().ok())
381 .unwrap_or(5),
382 ));
383 warn!("validator exit timeout");
384 std::process::exit(0);
385 })
386 .unwrap();
387
388 Ok(())
389 }
390
391 fn pid(&self, _meta: Self::Metadata) -> Result<u32> {
392 Ok(std::process::id())
393 }
394
395 fn reload_plugin(
396 &self,
397 meta: Self::Metadata,
398 name: String,
399 config_file: String,
400 ) -> BoxFuture<Result<()>> {
401 Box::pin(async move {
402 let (response_sender, response_receiver) = oneshot_channel();
404
405 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
407 rpc_to_manager_sender
408 .send(GeyserPluginManagerRequest::ReloadPlugin {
409 name,
410 config_file,
411 response_sender,
412 })
413 .expect("GeyerPluginService should never drop request receiver");
414 } else {
415 return Err(jsonrpc_core::Error {
416 code: ErrorCode::InvalidRequest,
417 message: "No geyser plugin service".to_string(),
418 data: None,
419 });
420 }
421
422 response_receiver
424 .await
425 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
426 })
427 }
428
429 fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
430 Box::pin(async move {
431 let (response_sender, response_receiver) = oneshot_channel();
433
434 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
436 rpc_to_manager_sender
437 .send(GeyserPluginManagerRequest::LoadPlugin {
438 config_file,
439 response_sender,
440 })
441 .expect("GeyerPluginService should never drop request receiver");
442 } else {
443 return Err(jsonrpc_core::Error {
444 code: ErrorCode::InvalidRequest,
445 message: "No geyser plugin service".to_string(),
446 data: None,
447 });
448 }
449
450 response_receiver
452 .await
453 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
454 })
455 }
456
457 fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
458 Box::pin(async move {
459 let (response_sender, response_receiver) = oneshot_channel();
461
462 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
464 rpc_to_manager_sender
465 .send(GeyserPluginManagerRequest::UnloadPlugin {
466 name,
467 response_sender,
468 })
469 .expect("GeyerPluginService should never drop request receiver");
470 } else {
471 return Err(jsonrpc_core::Error {
472 code: ErrorCode::InvalidRequest,
473 message: "No geyser plugin service".to_string(),
474 data: None,
475 });
476 }
477
478 response_receiver
480 .await
481 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
482 })
483 }
484
485 fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
486 Box::pin(async move {
487 let (response_sender, response_receiver) = oneshot_channel();
489
490 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
492 rpc_to_manager_sender
493 .send(GeyserPluginManagerRequest::ListPlugins { response_sender })
494 .expect("GeyerPluginService should never drop request receiver");
495 } else {
496 return Err(jsonrpc_core::Error {
497 code: ErrorCode::InvalidRequest,
498 message: "No geyser plugin service".to_string(),
499 data: None,
500 });
501 }
502
503 response_receiver
505 .await
506 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
507 })
508 }
509
510 fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
511 debug!("rpc_addr admin rpc request received");
512 Ok(meta.rpc_addr)
513 }
514
515 fn set_log_filter(&self, filter: String) -> Result<()> {
516 debug!("set_log_filter admin rpc request received");
517 agave_logger::setup_with(&filter);
518 Ok(())
519 }
520
521 fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
522 debug!("start_time admin rpc request received");
523 Ok(meta.start_time)
524 }
525
526 fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
527 debug!("start_progress admin rpc request received");
528 Ok(*meta.start_progress.read().unwrap())
529 }
530
531 fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
532 debug!("add_authorized_voter request received");
533
534 let authorized_voter = read_keypair_file(keypair_file)
535 .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{err}")))?;
536
537 AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
538 }
539
540 fn add_authorized_voter_from_bytes(
541 &self,
542 meta: Self::Metadata,
543 keypair: Vec<u8>,
544 ) -> Result<()> {
545 debug!("add_authorized_voter_from_bytes request received");
546
547 let authorized_voter = Keypair::try_from(keypair.as_ref()).map_err(|err| {
548 jsonrpc_core::error::Error::invalid_params(format!(
549 "Failed to read authorized voter keypair from provided byte array: {err}"
550 ))
551 })?;
552
553 AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
554 }
555
556 fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
557 debug!("remove_all_authorized_voters received");
558 meta.authorized_voter_keypairs.write().unwrap().clear();
559 Ok(())
560 }
561
562 fn set_identity(
563 &self,
564 meta: Self::Metadata,
565 keypair_file: String,
566 require_tower: bool,
567 ) -> Result<()> {
568 debug!("set_identity request received");
569
570 let identity_keypair = read_keypair_file(&keypair_file).map_err(|err| {
571 jsonrpc_core::error::Error::invalid_params(format!(
572 "Failed to read identity keypair from {keypair_file}: {err}"
573 ))
574 })?;
575
576 AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
577 }
578
579 fn set_identity_from_bytes(
580 &self,
581 meta: Self::Metadata,
582 identity_keypair: Vec<u8>,
583 require_tower: bool,
584 ) -> Result<()> {
585 debug!("set_identity_from_bytes request received");
586
587 let identity_keypair = Keypair::try_from(identity_keypair.as_ref()).map_err(|err| {
588 jsonrpc_core::error::Error::invalid_params(format!(
589 "Failed to read identity keypair from provided byte array: {err}"
590 ))
591 })?;
592
593 AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
594 }
595
596 fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
597 let loaded_config = load_staked_nodes_overrides(&path)
598 .map_err(|err| {
599 error!(
600 "Failed to load staked nodes overrides from {}: {}",
601 &path, err
602 );
603 jsonrpc_core::error::Error::internal_error()
604 })?
605 .staked_map_id;
606 let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
607 write_staked_nodes.clear();
608 write_staked_nodes.extend(loaded_config);
609 info!("Staked nodes overrides loaded from {path}");
610 debug!("overrides map: {write_staked_nodes:?}");
611 Ok(())
612 }
613
614 fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
615 meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
616 }
617
618 fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()> {
619 debug!("select_active_interface received: {interface}");
620 meta.with_post_init(|post_init| {
621 let node = post_init.node.as_ref().ok_or_else(|| {
622 jsonrpc_core::Error::invalid_params("`Node` not initialized in post_init")
623 })?;
624
625 node.switch_active_interface(interface, &post_init.cluster_info)
626 .map_err(|e| {
627 jsonrpc_core::Error::invalid_params(format!(
628 "Switching failed due to error {e}"
629 ))
630 })?;
631 info!("Switched primary interface to {interface}");
632 Ok(())
633 })
634 }
635
636 fn repair_shred_from_peer(
637 &self,
638 meta: Self::Metadata,
639 pubkey: Option<Pubkey>,
640 slot: u64,
641 shred_index: u64,
642 ) -> Result<()> {
643 debug!("repair_shred_from_peer request received");
644
645 meta.with_post_init(|post_init| {
646 repair_service::RepairService::request_repair_for_shred_from_peer(
647 post_init.cluster_info.clone(),
648 post_init.cluster_slots.clone(),
649 pubkey,
650 slot,
651 shred_index,
652 &post_init.repair_socket,
653 post_init.outstanding_repair_requests.clone(),
654 );
655 Ok(())
656 })
657 }
658
659 fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
660 debug!("repair_whitelist request received");
661
662 meta.with_post_init(|post_init| {
663 let whitelist: Vec<_> = post_init
664 .repair_whitelist
665 .read()
666 .unwrap()
667 .iter()
668 .copied()
669 .collect();
670 Ok(AdminRpcRepairWhitelist { whitelist })
671 })
672 }
673
674 fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()> {
675 debug!("set_repair_whitelist request received");
676
677 let whitelist: HashSet<Pubkey> = whitelist.into_iter().collect();
678 meta.with_post_init(|post_init| {
679 *post_init.repair_whitelist.write().unwrap() = whitelist;
680 warn!(
681 "Repair whitelist set to {:?}",
682 &post_init.repair_whitelist.read().unwrap()
683 );
684 Ok(())
685 })
686 }
687
688 fn get_secondary_index_key_size(
689 &self,
690 meta: Self::Metadata,
691 pubkey_str: String,
692 ) -> Result<HashMap<RpcAccountIndex, usize>> {
693 debug!("get_secondary_index_key_size rpc request received: {pubkey_str:?}");
694 let index_key = verify_pubkey(&pubkey_str)?;
695 meta.with_post_init(|post_init| {
696 let bank = post_init.bank_forks.read().unwrap().root_bank();
697
698 let enabled_account_indexes = &bank.accounts().accounts_db.account_indexes;
700
701 if enabled_account_indexes.is_empty() {
703 debug!("get_secondary_index_key_size: secondary index not enabled.");
704 return Ok(HashMap::new());
705 };
706
707 if !enabled_account_indexes.include_key(&index_key) {
709 return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
710 index_key: index_key.to_string(),
711 }
712 .into());
713 }
714
715 let accounts_index = &bank.accounts().accounts_db.accounts_index;
717
718 let found_sizes = enabled_account_indexes
720 .indexes
721 .iter()
722 .filter_map(|index| {
723 accounts_index
724 .get_index_key_size(index, &index_key)
725 .map(|size| (rpc_account_index_from_account_index(index), size))
726 })
727 .collect::<HashMap<_, _>>();
728
729 if found_sizes.is_empty() {
731 debug!("get_secondary_index_key_size: key not found in the secondary index.");
732 }
733 Ok(found_sizes)
734 })
735 }
736
737 fn set_public_tpu_address(
738 &self,
739 meta: Self::Metadata,
740 public_tpu_addr: SocketAddr,
741 ) -> Result<()> {
742 debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
743
744 meta.with_post_init(|post_init| {
745 post_init
746 .cluster_info
747 .my_contact_info()
748 .tpu(Protocol::QUIC)
749 .ok_or_else(|| {
750 error!(
751 "The public TPU QUIC address isn't being published. The node is likely in \
752 repair mode. See help for --restricted-repair-only-mode for more \
753 information."
754 );
755 jsonrpc_core::error::Error::internal_error()
756 })?;
757 post_init
758 .cluster_info
759 .set_tpu_quic(public_tpu_addr)
760 .map_err(|err| {
761 error!("Failed to set public TPU QUIC address to {public_tpu_addr}: {err}");
762 jsonrpc_core::error::Error::internal_error()
763 })?;
764 let my_contact_info = post_init.cluster_info.my_contact_info();
765 warn!(
766 "Public TPU addresses set to {:?} (quic)",
767 my_contact_info.tpu(Protocol::QUIC),
768 );
769 Ok(())
770 })
771 }
772
773 fn set_public_tpu_forwards_address(
774 &self,
775 meta: Self::Metadata,
776 public_tpu_forwards_addr: SocketAddr,
777 ) -> Result<()> {
778 debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
779
780 meta.with_post_init(|post_init| {
781 post_init
782 .cluster_info
783 .my_contact_info()
784 .tpu_forwards(Protocol::QUIC)
785 .ok_or_else(|| {
786 error!(
787 "The public TPU Forwards address isn't being published. The node is \
788 likely in repair mode. See help for --restricted-repair-only-mode for \
789 more information."
790 );
791 jsonrpc_core::error::Error::internal_error()
792 })?;
793 post_init
794 .cluster_info
795 .set_tpu_forwards_quic(public_tpu_forwards_addr)
796 .map_err(|err| {
797 error!(
798 "Failed to set public TPU QUIC address to {public_tpu_forwards_addr}: \
799 {err}"
800 );
801 jsonrpc_core::error::Error::internal_error()
802 })?;
803 let my_contact_info = post_init.cluster_info.my_contact_info();
804 warn!(
805 "Public TPU Forwards address set to {:?} (quic)",
806 my_contact_info.tpu_forwards(Protocol::QUIC),
807 );
808 Ok(())
809 })
810 }
811
812 fn set_public_tvu_address(
813 &self,
814 meta: Self::Metadata,
815 public_tvu_addr: SocketAddr,
816 ) -> Result<()> {
817 debug!("set_public_tvu_address rpc request received: {public_tvu_addr}");
818
819 meta.with_post_init(|post_init| {
820 post_init
821 .cluster_info
822 .my_contact_info()
823 .tvu(Protocol::UDP)
824 .ok_or_else(|| {
825 error!(
826 "The public TVU address isn't being published. The node is likely in \
827 repair mode. See help for --restricted-repair-only-mode for more \
828 information."
829 );
830 jsonrpc_core::error::Error::internal_error()
831 })?;
832 post_init
833 .cluster_info
834 .set_tvu_socket(public_tvu_addr)
835 .map_err(|err| {
836 error!("Failed to set public TVU address to {public_tvu_addr}: {err}");
837 jsonrpc_core::error::Error::internal_error()
838 })?;
839 let my_contact_info = post_init.cluster_info.my_contact_info();
840 warn!(
841 "Public TVU addresses set to {:?}",
842 my_contact_info.tvu(Protocol::UDP),
843 );
844 Ok(())
845 })
846 }
847
848 fn manage_block_production(
849 &self,
850 meta: Self::Metadata,
851 block_production_method: BlockProductionMethod,
852 transaction_struct: TransactionStructure,
853 num_workers: NonZeroUsize,
854 scheduler_pacing: SchedulerPacing,
855 ) -> Result<()> {
856 debug!("manage_block_production rpc request received");
857
858 if num_workers > BankingStage::max_num_workers() {
859 return Err(jsonrpc_core::error::Error::invalid_params(format!(
860 "Number of workers ({}) exceeds maximum allowed ({})",
861 num_workers,
862 BankingStage::max_num_workers()
863 )));
864 }
865
866 if transaction_struct != TransactionStructure::View {
867 warn!("TransactionStructure::Sdk has no effect on block production");
868 }
869
870 meta.with_post_init(|post_init| {
871 if post_init
872 .banking_control_sender
873 .try_send(BankingControlMsg::Internal {
874 block_production_method,
875 num_workers,
876 config: SchedulerConfig { scheduler_pacing },
877 })
878 .is_err()
879 {
880 error!("Banking stage already switching schedulers");
881
882 return Err(jsonrpc_core::error::Error::internal_error());
883 }
884
885 Ok(())
886 })
887 }
888
889 fn is_generating_snapshots(&self, meta: Self::Metadata) -> Result<bool> {
890 if let Some(snapshot_controller) = meta.snapshot_controller() {
891 Ok(snapshot_controller.is_generating_snapshots())
892 } else {
893 Err(jsonrpc_core::error::Error::invalid_params(
894 "snapshot_controller unavailable",
895 ))
896 }
897 }
898}
899
900impl AdminRpcImpl {
901 fn add_authorized_voter_keypair(
902 meta: AdminRpcRequestMetadata,
903 authorized_voter: Keypair,
904 ) -> Result<()> {
905 let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
906
907 if authorized_voter_keypairs
908 .iter()
909 .any(|x| x.pubkey() == authorized_voter.pubkey())
910 {
911 Err(jsonrpc_core::error::Error::invalid_params(
912 "Authorized voter already present",
913 ))
914 } else {
915 authorized_voter_keypairs.push(Arc::new(authorized_voter));
916 Ok(())
917 }
918 }
919
920 fn set_identity_keypair(
921 meta: AdminRpcRequestMetadata,
922 identity_keypair: Keypair,
923 require_tower: bool,
924 ) -> Result<()> {
925 meta.with_post_init(|post_init| {
926 if require_tower {
927 let _ = Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey())
928 .map_err(|err| {
929 jsonrpc_core::error::Error::invalid_params(format!(
930 "Unable to load tower file for identity {}: {}",
931 identity_keypair.pubkey(),
932 err
933 ))
934 })?;
935 }
936
937 for (key, notifier) in &*post_init.notifies.read().unwrap() {
938 if let Err(err) = notifier.update_key(&identity_keypair) {
939 error!("Error updating network layer keypair: {err} on {key:?}");
940 }
941 }
942
943 let old_identity = post_init.cluster_info.id();
944 let new_identity = identity_keypair.pubkey();
945 solana_metrics::set_host_id(new_identity.to_string());
946 datapoint_info!(
948 "validator-set_identity",
949 ("old_id", old_identity.to_string(), String),
950 ("new_id", new_identity.to_string(), String),
951 ("version", solana_version::version!(), String),
952 );
953 post_init
954 .cluster_info
955 .set_keypair(Arc::new(identity_keypair));
956 warn!("Identity set to {new_identity}");
957 Ok(())
958 })
959 }
960}
961
962fn rpc_account_index_from_account_index(account_index: &AccountIndex) -> RpcAccountIndex {
963 match account_index {
964 AccountIndex::ProgramId => RpcAccountIndex::ProgramId,
965 AccountIndex::SplTokenOwner => RpcAccountIndex::SplTokenOwner,
966 AccountIndex::SplTokenMint => RpcAccountIndex::SplTokenMint,
967 }
968}
969
970pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
972 let admin_rpc_path = admin_rpc_path(ledger_path);
973
974 let event_loop = tokio::runtime::Builder::new_multi_thread()
975 .thread_name("solAdminRpcEl")
976 .worker_threads(3) .enable_all()
978 .build()
979 .unwrap();
980
981 Builder::new()
982 .name("solAdminRpc".to_string())
983 .spawn(move || {
984 let mut io = MetaIoHandler::default();
985 io.extend_with(AdminRpcImpl.to_delegate());
986
987 let validator_exit = metadata.validator_exit.clone();
988 let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
989 metadata.clone()
990 })
991 .event_loop_executor(event_loop.handle().clone())
992 .start(&format!("{}", admin_rpc_path.display()));
993
994 match server {
995 Err(err) => {
996 warn!("Unable to start admin rpc service: {err:?}");
997 }
998 Ok(server) => {
999 info!("started admin rpc service!");
1000 let close_handle = server.close_handle();
1001 validator_exit
1002 .write()
1003 .unwrap()
1004 .register_exit(Box::new(move || {
1005 close_handle.close();
1006 }));
1007
1008 server.wait();
1009 }
1010 }
1011 })
1012 .unwrap();
1013}
1014
/// Computes the IPC endpoint path of the admin RPC service for `ledger_path`.
///
/// On Windows this is a name under `\\.\pipe\` derived from the last
/// component of the ledger path; elsewhere it is `admin.rpc` inside the
/// ledger directory.
fn admin_rpc_path(ledger_path: &Path) -> PathBuf {
    #[cfg(target_family = "windows")]
    {
        match ledger_path.file_name() {
            Some(ledger_filename) => PathBuf::from(format!(
                "\\\\.\\pipe\\{}-admin.rpc",
                ledger_filename.to_string_lossy()
            )),
            // No final path component to derive a name from.
            None => PathBuf::from("\\\\.\\pipe\\admin.rpc"),
        }
    }
    #[cfg(not(target_family = "windows"))]
    {
        ledger_path.join("admin.rpc")
    }
}
1034
1035pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
1037 let admin_rpc_path = admin_rpc_path(ledger_path);
1038 if !admin_rpc_path.exists() {
1039 Err(RpcError::Client(format!(
1040 "{} does not exist",
1041 admin_rpc_path.display()
1042 )))
1043 } else {
1044 ipc::connect::<_, gen_client::Client>(&format!("{}", admin_rpc_path.display())).await
1045 }
1046}
1047
1048pub fn runtime() -> Runtime {
1050 tokio::runtime::Builder::new_multi_thread()
1051 .thread_name("solAdminRpcRt")
1052 .enable_all()
1053 .worker_threads(2)
1056 .build()
1057 .expect("new tokio runtime")
1058}
1059
1060#[derive(Default, Deserialize, Clone)]
1061pub struct StakedNodesOverrides {
1062 #[serde(deserialize_with = "deserialize_pubkey_map")]
1063 pub staked_map_id: HashMap<Pubkey, u64>,
1064}
1065
1066pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
1067where
1068 D: Deserializer<'de>,
1069{
1070 let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
1071 let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
1072 for (key, value) in container.iter() {
1073 let typed_key = Pubkey::try_from(key.as_str())
1074 .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
1075 container_typed.insert(typed_key, *value);
1076 }
1077 Ok(container_typed)
1078}
1079
1080pub fn load_staked_nodes_overrides(
1081 path: &String,
1082) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
1083 debug!("Loading staked nodes overrides configuration from {path}");
1084 if Path::new(&path).exists() {
1085 let file = std::fs::File::open(path)?;
1086 Ok(serde_yaml::from_reader(file)?)
1087 } else {
1088 Err(format!("Staked nodes overrides provided '{path}' a non-existing file path.").into())
1089 }
1090}
1091
1092#[cfg(test)]
1093mod tests {
1094 use {
1095 super::*,
1096 agave_snapshots::snapshot_config::SnapshotConfig,
1097 crossbeam_channel::unbounded,
1098 serde_json::Value,
1099 solana_account::{Account, AccountSharedData},
1100 solana_accounts_db::{
1101 accounts_db::{ACCOUNTS_DB_CONFIG_FOR_TESTING, AccountsDbConfig},
1102 accounts_index::AccountSecondaryIndexes,
1103 },
1104 solana_core::{
1105 admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
1106 consensus::tower_storage::NullTowerStorage,
1107 validator::{Validator, ValidatorConfig, ValidatorTpuConfig},
1108 },
1109 solana_gossip::{cluster_info::ClusterInfo, node::Node},
1110 solana_ledger::{
1111 create_new_tmp_ledger,
1112 genesis_utils::{
1113 GenesisConfigInfo, create_genesis_config, create_genesis_config_with_leader,
1114 },
1115 },
1116 solana_net_utils::{SocketAddrSpace, sockets::bind_to_localhost_unique},
1117 solana_program_option::COption,
1118 solana_program_pack::Pack,
1119 solana_pubkey::Pubkey,
1120 solana_rpc::rpc::create_validator_exit,
1121 solana_runtime::{
1122 bank::{Bank, BankTestConfig},
1123 bank_forks::BankForks,
1124 },
1125 solana_system_interface::program as system_program,
1126 spl_generic_token::token,
1127 spl_token_2022_interface::state::{
1128 Account as TokenAccount, AccountState as TokenAccountState, Mint,
1129 },
1130 std::{collections::HashSet, fs::remove_dir_all, sync::atomic::AtomicBool},
1131 tokio::sync::mpsc,
1132 };
1133
    /// Options for building the [`RpcHandler`] test fixture.
    #[derive(Default)]
    struct TestConfig {
        /// Secondary account indexes handed to the test bank's accounts-db configuration.
        account_indexes: AccountSecondaryIndexes,
    }
1138
    /// In-process admin RPC fixture: a JSON-RPC handler together with the request metadata
    /// and bank forks that back it.
    struct RpcHandler {
        /// Handler with the admin RPC methods registered.
        io: MetaIoHandler<AdminRpcRequestMetadata>,
        /// Metadata passed along with each request.
        meta: AdminRpcRequestMetadata,
        /// Bank forks, also referenced from `meta`'s post-init state.
        bank_forks: Arc<RwLock<BankForks>>,
    }
1144
1145 impl RpcHandler {
1146 fn _start() -> Self {
1147 Self::start_with_config(TestConfig::default())
1148 }
1149
1150 fn start_with_config(config: TestConfig) -> Self {
1151 let keypair = Arc::new(Keypair::new());
1152 let cluster_info = Arc::new(ClusterInfo::new(
1153 ContactInfo::new(
1154 keypair.pubkey(),
1155 solana_time_utils::timestamp(), 0u16, ),
1158 keypair,
1159 SocketAddrSpace::Unspecified,
1160 ));
1161 let exit = Arc::new(AtomicBool::new(false));
1162 let validator_exit = create_validator_exit(exit);
1163 let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig {
1164 accounts_db_config: AccountsDbConfig {
1165 account_indexes: Some(config.account_indexes),
1166 ..ACCOUNTS_DB_CONFIG_FOR_TESTING
1167 },
1168 });
1169
1170 let (snapshot_request_sender, _) = unbounded();
1171 let snapshot_controller = Arc::new(SnapshotController::new(
1172 snapshot_request_sender.clone(),
1173 SnapshotConfig::default(),
1174 bank_forks.read().unwrap().root(),
1175 ));
1176
1177 let vote_account = vote_keypair.pubkey();
1178 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1179 let repair_whitelist = Arc::new(RwLock::new(HashSet::new()));
1180 let meta = AdminRpcRequestMetadata {
1181 rpc_addr: None,
1182 start_time: SystemTime::now(),
1183 start_progress,
1184 validator_exit,
1185 validator_exit_backpressure: HashMap::default(),
1186 authorized_voter_keypairs: Arc::new(RwLock::new(vec![vote_keypair])),
1187 tower_storage: Arc::new(NullTowerStorage {}),
1188 post_init: Arc::new(RwLock::new(Some(AdminRpcRequestMetadataPostInit {
1189 cluster_info,
1190 bank_forks: bank_forks.clone(),
1191 vote_account,
1192 repair_whitelist,
1193 notifies: Arc::new(RwLock::new(KeyUpdaters::default())),
1194 repair_socket: Arc::new(bind_to_localhost_unique().expect("should bind")),
1195 outstanding_repair_requests: Arc::<
1196 RwLock<repair_service::OutstandingShredRepairs>,
1197 >::default(),
1198 cluster_slots: Arc::new(
1199 solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default_for_tests(),
1200 ),
1201 node: None,
1202 banking_control_sender: mpsc::channel(1).0,
1203 snapshot_controller,
1204 }))),
1205 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1206 rpc_to_plugin_manager_sender: None,
1207 };
1208 let mut io = MetaIoHandler::default();
1209 io.extend_with(AdminRpcImpl.to_delegate());
1210
1211 Self {
1212 io,
1213 meta,
1214 bank_forks,
1215 }
1216 }
1217
1218 fn root_bank(&self) -> Arc<Bank> {
1219 self.bank_forks.read().unwrap().root_bank()
1220 }
1221 }
1222
1223 fn new_bank_forks_with_config(
1224 config: BankTestConfig,
1225 ) -> (Arc<RwLock<BankForks>>, Arc<Keypair>) {
1226 let GenesisConfigInfo {
1227 genesis_config,
1228 voting_keypair,
1229 ..
1230 } = create_genesis_config(1_000_000_000);
1231
1232 let bank = Bank::new_with_config_for_tests(&genesis_config, config);
1233 (BankForks::new_rw_arc(bank), Arc::new(voting_keypair))
1234 }
1235
1236 #[test]
1237 fn test_secondary_index_key_sizes() {
1238 for secondary_index_enabled in [true, false] {
1239 let account_indexes = if secondary_index_enabled {
1240 AccountSecondaryIndexes {
1241 keys: None,
1242 indexes: HashSet::from([
1243 AccountIndex::ProgramId,
1244 AccountIndex::SplTokenMint,
1245 AccountIndex::SplTokenOwner,
1246 ]),
1247 }
1248 } else {
1249 AccountSecondaryIndexes::default()
1250 };
1251
1252 let rpc = RpcHandler::start_with_config(TestConfig { account_indexes });
1254
1255 let bank = rpc.root_bank();
1256 let RpcHandler { io, meta, .. } = rpc;
1257
1258 let token_account1_pubkey = Pubkey::new_unique();
1260 let token_account2_pubkey = Pubkey::new_unique();
1261 let token_account3_pubkey = Pubkey::new_unique();
1262 let mint1_pubkey = Pubkey::new_unique();
1263 let mint2_pubkey = Pubkey::new_unique();
1264 let wallet1_pubkey = Pubkey::new_unique();
1265 let wallet2_pubkey = Pubkey::new_unique();
1266 let non_existent_pubkey = Pubkey::new_unique();
1267 let delegate = Pubkey::new_unique();
1268
1269 let mut num_default_spl_token_program_accounts = 0;
1270 let mut num_default_system_program_accounts = 0;
1271
1272 if !secondary_index_enabled {
1273 let req = format!(
1275 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account1_pubkey}"]}}"#,
1276 );
1277 let res = io.handle_request_sync(&req, meta.clone());
1278 let result: Value = serde_json::from_str(&res.expect("actual response"))
1279 .expect("actual response deserialization");
1280 let sizes: HashMap<RpcAccountIndex, usize> =
1281 serde_json::from_value(result["result"].clone()).unwrap();
1282 assert!(sizes.is_empty());
1283 } else {
1284 let req = format!(
1286 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1287 token::id(),
1288 );
1289 let res = io.handle_request_sync(&req, meta.clone());
1290 let result: Value = serde_json::from_str(&res.expect("actual response"))
1291 .expect("actual response deserialization");
1292 let sizes: HashMap<RpcAccountIndex, usize> =
1293 serde_json::from_value(result["result"].clone()).unwrap();
1294 assert_eq!(sizes.len(), 1);
1295 num_default_spl_token_program_accounts =
1296 *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1297 let req = format!(
1299 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1300 system_program::id(),
1301 );
1302 let res = io.handle_request_sync(&req, meta.clone());
1303 let result: Value = serde_json::from_str(&res.expect("actual response"))
1304 .expect("actual response deserialization");
1305 let sizes: HashMap<RpcAccountIndex, usize> =
1306 serde_json::from_value(result["result"].clone()).unwrap();
1307 assert_eq!(sizes.len(), 1);
1308 num_default_system_program_accounts =
1309 *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1310 }
1311
1312 let wallet1_account = AccountSharedData::from(Account {
1314 lamports: 11111111,
1315 owner: system_program::id(),
1316 ..Account::default()
1317 });
1318 bank.store_account(&wallet1_pubkey, &wallet1_account);
1319 let wallet2_account = AccountSharedData::from(Account {
1320 lamports: 11111111,
1321 owner: system_program::id(),
1322 ..Account::default()
1323 });
1324 bank.store_account(&wallet2_pubkey, &wallet2_account);
1325
1326 let mut account1_data = vec![0; TokenAccount::get_packed_len()];
1328 let token_account1 = TokenAccount {
1329 mint: mint1_pubkey,
1330 owner: wallet1_pubkey,
1331 delegate: COption::Some(delegate),
1332 amount: 420,
1333 state: TokenAccountState::Initialized,
1334 is_native: COption::None,
1335 delegated_amount: 30,
1336 close_authority: COption::Some(wallet1_pubkey),
1337 };
1338 TokenAccount::pack(token_account1, &mut account1_data).unwrap();
1339 let token_account1 = AccountSharedData::from(Account {
1340 lamports: 111,
1341 data: account1_data.to_vec(),
1342 owner: token::id(),
1343 ..Account::default()
1344 });
1345 bank.store_account(&token_account1_pubkey, &token_account1);
1346
1347 let mut mint1_data = vec![0; Mint::get_packed_len()];
1349 let mint1_state = Mint {
1350 mint_authority: COption::Some(wallet1_pubkey),
1351 supply: 500,
1352 decimals: 2,
1353 is_initialized: true,
1354 freeze_authority: COption::Some(wallet1_pubkey),
1355 };
1356 Mint::pack(mint1_state, &mut mint1_data).unwrap();
1357 let mint_account1 = AccountSharedData::from(Account {
1358 lamports: 222,
1359 data: mint1_data.to_vec(),
1360 owner: token::id(),
1361 ..Account::default()
1362 });
1363 bank.store_account(&mint1_pubkey, &mint_account1);
1364
1365 let mut account2_data = vec![0; TokenAccount::get_packed_len()];
1367 let token_account2 = TokenAccount {
1368 mint: mint1_pubkey,
1369 owner: wallet2_pubkey,
1370 delegate: COption::Some(delegate),
1371 amount: 420,
1372 state: TokenAccountState::Initialized,
1373 is_native: COption::None,
1374 delegated_amount: 30,
1375 close_authority: COption::Some(wallet2_pubkey),
1376 };
1377 TokenAccount::pack(token_account2, &mut account2_data).unwrap();
1378 let token_account2 = AccountSharedData::from(Account {
1379 lamports: 333,
1380 data: account2_data.to_vec(),
1381 owner: token::id(),
1382 ..Account::default()
1383 });
1384 bank.store_account(&token_account2_pubkey, &token_account2);
1385
1386 let mut account3_data = vec![0; TokenAccount::get_packed_len()];
1388 let token_account3 = TokenAccount {
1389 mint: mint2_pubkey,
1390 owner: wallet2_pubkey,
1391 delegate: COption::Some(delegate),
1392 amount: 42,
1393 state: TokenAccountState::Initialized,
1394 is_native: COption::None,
1395 delegated_amount: 30,
1396 close_authority: COption::Some(wallet2_pubkey),
1397 };
1398 TokenAccount::pack(token_account3, &mut account3_data).unwrap();
1399 let token_account3 = AccountSharedData::from(Account {
1400 lamports: 444,
1401 data: account3_data.to_vec(),
1402 owner: token::id(),
1403 ..Account::default()
1404 });
1405 bank.store_account(&token_account3_pubkey, &token_account3);
1406
1407 let mut mint2_data = vec![0; Mint::get_packed_len()];
1409 let mint2_state = Mint {
1410 mint_authority: COption::Some(wallet2_pubkey),
1411 supply: 200,
1412 decimals: 3,
1413 is_initialized: true,
1414 freeze_authority: COption::Some(wallet2_pubkey),
1415 };
1416 Mint::pack(mint2_state, &mut mint2_data).unwrap();
1417 let mint_account2 = AccountSharedData::from(Account {
1418 lamports: 555,
1419 data: mint2_data.to_vec(),
1420 owner: token::id(),
1421 ..Account::default()
1422 });
1423 bank.store_account(&mint2_pubkey, &mint_account2);
1424
1425 if secondary_index_enabled {
1442 let req = format!(
1444 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{non_existent_pubkey}"]}}"#,
1445 );
1446 let res = io.handle_request_sync(&req, meta.clone());
1447 let result: Value = serde_json::from_str(&res.expect("actual response"))
1448 .expect("actual response deserialization");
1449 let sizes: HashMap<RpcAccountIndex, usize> =
1450 serde_json::from_value(result["result"].clone()).unwrap();
1451 assert!(sizes.is_empty());
1452 let req = format!(
1455 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet1_pubkey}"]}}"#,
1456 );
1457 let res = io.handle_request_sync(&req, meta.clone());
1458 let result: Value = serde_json::from_str(&res.expect("actual response"))
1459 .expect("actual response deserialization");
1460 let sizes: HashMap<RpcAccountIndex, usize> =
1461 serde_json::from_value(result["result"].clone()).unwrap();
1462 assert_eq!(sizes.len(), 1);
1463 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 1);
1464 let req = format!(
1466 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet2_pubkey}"]}}"#,
1467 );
1468 let res = io.handle_request_sync(&req, meta.clone());
1469 let result: Value = serde_json::from_str(&res.expect("actual response"))
1470 .expect("actual response deserialization");
1471 let sizes: HashMap<RpcAccountIndex, usize> =
1472 serde_json::from_value(result["result"].clone()).unwrap();
1473 assert_eq!(sizes.len(), 1);
1474 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 2);
1475 let req = format!(
1477 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint1_pubkey}"]}}"#,
1478 );
1479 let res = io.handle_request_sync(&req, meta.clone());
1480 let result: Value = serde_json::from_str(&res.expect("actual response"))
1481 .expect("actual response deserialization");
1482 let sizes: HashMap<RpcAccountIndex, usize> =
1483 serde_json::from_value(result["result"].clone()).unwrap();
1484 assert_eq!(sizes.len(), 1);
1485 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 2);
1486 let req = format!(
1488 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint2_pubkey}"]}}"#,
1489 );
1490 let res = io.handle_request_sync(&req, meta.clone());
1491 let result: Value = serde_json::from_str(&res.expect("actual response"))
1492 .expect("actual response deserialization");
1493 let sizes: HashMap<RpcAccountIndex, usize> =
1494 serde_json::from_value(result["result"].clone()).unwrap();
1495 assert_eq!(sizes.len(), 1);
1496 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 1);
1497 let req = format!(
1499 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1500 token::id(),
1501 );
1502 let res = io.handle_request_sync(&req, meta.clone());
1503 let result: Value = serde_json::from_str(&res.expect("actual response"))
1504 .expect("actual response deserialization");
1505 let sizes: HashMap<RpcAccountIndex, usize> =
1506 serde_json::from_value(result["result"].clone()).unwrap();
1507 assert_eq!(sizes.len(), 1);
1508 assert_eq!(
1509 *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1510 (num_default_spl_token_program_accounts + 5)
1511 );
1512 let req = format!(
1514 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1515 system_program::id(),
1516 );
1517 let res = io.handle_request_sync(&req, meta.clone());
1518 let result: Value = serde_json::from_str(&res.expect("actual response"))
1519 .expect("actual response deserialization");
1520 let sizes: HashMap<RpcAccountIndex, usize> =
1521 serde_json::from_value(result["result"].clone()).unwrap();
1522 assert_eq!(sizes.len(), 1);
1523 assert_eq!(
1524 *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1525 (num_default_system_program_accounts + 2)
1526 );
1527 } else {
1528 let req = format!(
1530 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account2_pubkey}"]}}"#,
1531 );
1532 let res = io.handle_request_sync(&req, meta.clone());
1533 let result: Value = serde_json::from_str(&res.expect("actual response"))
1534 .expect("actual response deserialization");
1535 let sizes: HashMap<RpcAccountIndex, usize> =
1536 serde_json::from_value(result["result"].clone()).unwrap();
1537 assert!(sizes.is_empty());
1538 }
1539 }
1540 }
1541
1542 #[test]
1545 fn test_set_identity() {
1546 let rpc = RpcHandler::start_with_config(TestConfig::default());
1547
1548 let RpcHandler { io, meta, .. } = rpc;
1549
1550 let expected_validator_id = Keypair::new();
1551 let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1552
1553 let set_id_request = format!(
1554 r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1555 );
1556 let response = io.handle_request_sync(&set_id_request, meta.clone());
1557 let actual_parsed_response: Value =
1558 serde_json::from_str(&response.expect("actual response"))
1559 .expect("actual response deserialization");
1560
1561 let expected_parsed_response: Value = serde_json::from_str(
1562 r#"{
1563 "id": 1,
1564 "jsonrpc": "2.0",
1565 "result": null
1566 }"#,
1567 )
1568 .expect("Failed to parse expected response");
1569 assert_eq!(actual_parsed_response, expected_parsed_response);
1570
1571 let contact_info_request =
1572 r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1573 let response = io.handle_request_sync(&contact_info_request, meta.clone());
1574 let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1575 .expect("actual response deserialization");
1576 let actual_validator_id = parsed_response["result"]["id"]
1577 .as_str()
1578 .expect("Expected a string");
1579 assert_eq!(
1580 actual_validator_id,
1581 expected_validator_id.pubkey().to_string()
1582 );
1583 }
1584
    /// Test fixture pairing a started validator's admin RPC metadata with a handler that has
    /// the admin RPC methods registered. The ledger directory is removed on drop.
    struct TestValidatorWithAdminRpc {
        /// Metadata cloned into every request.
        meta: AdminRpcRequestMetadata,
        /// Handler with the admin RPC methods registered.
        io: MetaIoHandler<AdminRpcRequestMetadata>,
        /// Temporary ledger directory created for the validator.
        validator_ledger_path: PathBuf,
    }
1590
1591 impl TestValidatorWithAdminRpc {
1592 fn new() -> Self {
1593 let leader_keypair = Keypair::new();
1594 let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
1595
1596 let validator_keypair = Keypair::new();
1597 let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
1598 let genesis_config =
1599 create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
1600 .genesis_config;
1601 let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
1602
1603 let voting_keypair = Arc::new(Keypair::new());
1604 let voting_pubkey = voting_keypair.pubkey();
1605 let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1606 let validator_config = ValidatorConfig {
1607 rpc_addrs: Some((
1608 validator_node.info.rpc().unwrap(),
1609 validator_node.info.rpc_pubsub().unwrap(),
1610 )),
1611 ..ValidatorConfig::default_for_test()
1612 };
1613 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1614
1615 let post_init = Arc::new(RwLock::new(None));
1616 let meta = AdminRpcRequestMetadata {
1617 rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
1618 start_time: SystemTime::now(),
1619 start_progress: start_progress.clone(),
1620 validator_exit: validator_config.validator_exit.clone(),
1621 validator_exit_backpressure: HashMap::default(),
1622 authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1623 tower_storage: Arc::new(NullTowerStorage {}),
1624 post_init: post_init.clone(),
1625 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1626 rpc_to_plugin_manager_sender: None,
1627 };
1628
1629 let _validator = Validator::new(
1630 validator_node,
1631 Arc::new(validator_keypair),
1632 &validator_ledger_path,
1633 &voting_pubkey,
1634 authorized_voter_keypairs,
1635 vec![leader_node.info],
1636 &validator_config,
1637 true, None, start_progress.clone(),
1640 SocketAddrSpace::Unspecified,
1641 ValidatorTpuConfig::new_for_tests(),
1642 post_init.clone(),
1643 None,
1644 )
1645 .expect("assume successful validator start");
1646 assert_eq!(
1647 *start_progress.read().unwrap(),
1648 ValidatorStartProgress::Running
1649 );
1650 let post_init = post_init.read().unwrap();
1651
1652 assert!(post_init.is_some());
1653 let post_init = post_init.as_ref().unwrap();
1654 let notifies = post_init.notifies.read().unwrap();
1655 let updater_keys: HashSet<KeyUpdaterType> =
1656 notifies.into_iter().map(|(key, _)| key.clone()).collect();
1657 assert_eq!(
1658 updater_keys,
1659 HashSet::from_iter(vec![
1660 KeyUpdaterType::Tpu,
1661 KeyUpdaterType::TpuForwards,
1662 KeyUpdaterType::TpuVote,
1663 KeyUpdaterType::Forward,
1664 KeyUpdaterType::RpcService,
1665 KeyUpdaterType::Bls,
1666 KeyUpdaterType::BlsConnectionCache,
1667 ])
1668 );
1669 let mut io = MetaIoHandler::default();
1670 io.extend_with(AdminRpcImpl.to_delegate());
1671 Self {
1672 meta,
1673 io,
1674 validator_ledger_path,
1675 }
1676 }
1677
1678 fn handle_request(&self, request: &str) -> Option<String> {
1679 self.io.handle_request_sync(request, self.meta.clone())
1680 }
1681 }
1682
1683 impl Drop for TestValidatorWithAdminRpc {
1684 fn drop(&mut self) {
1685 remove_dir_all(self.validator_ledger_path.clone()).unwrap();
1686 }
1687 }
1688
1689 #[test]
1690 fn test_no_post_init_no_snapshot_controller() {
1691 let validator_exit = create_validator_exit(Arc::new(AtomicBool::new(false)));
1692 let voting_keypair = Arc::new(Keypair::new());
1693 let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1694 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1695
1696 let post_init = Arc::new(RwLock::new(None));
1697 let meta = AdminRpcRequestMetadata {
1698 rpc_addr: None,
1699 start_time: SystemTime::now(),
1700 start_progress: start_progress.clone(),
1701 validator_exit,
1702 validator_exit_backpressure: HashMap::default(),
1703 authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1704 tower_storage: Arc::new(NullTowerStorage {}),
1705 post_init: post_init.clone(),
1706 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1707 rpc_to_plugin_manager_sender: None,
1708 };
1709
1710 let snapshot_controller = meta.snapshot_controller();
1711 assert!(snapshot_controller.is_none());
1712 }
1713
1714 #[test]
1716 fn test_set_identity_with_validator() {
1717 let test_validator = TestValidatorWithAdminRpc::new();
1718 let expected_validator_id = Keypair::new();
1719 let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1720
1721 let set_id_request = format!(
1722 r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1723 );
1724 let response = test_validator.handle_request(&set_id_request);
1725 let actual_parsed_response: Value =
1726 serde_json::from_str(&response.expect("actual response"))
1727 .expect("actual response deserialization");
1728
1729 let expected_parsed_response: Value = serde_json::from_str(
1730 r#"{
1731 "id": 1,
1732 "jsonrpc": "2.0",
1733 "result": null
1734 }"#,
1735 )
1736 .expect("Failed to parse expected response");
1737 assert_eq!(actual_parsed_response, expected_parsed_response);
1738
1739 let contact_info_request =
1740 r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1741 let response = test_validator.handle_request(&contact_info_request);
1742 let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1743 .expect("actual response deserialization");
1744 let actual_validator_id = parsed_response["result"]["id"]
1745 .as_str()
1746 .expect("Expected a string");
1747 assert_eq!(
1748 actual_validator_id,
1749 expected_validator_id.pubkey().to_string()
1750 );
1751
1752 let contact_info_request =
1753 r#"{"jsonrpc":"2.0","id":1,"method":"exit","params":[]}"#.to_string();
1754 let exit_response = test_validator.handle_request(&contact_info_request);
1755 let actual_parsed_response: Value =
1756 serde_json::from_str(&exit_response.expect("actual response"))
1757 .expect("actual response deserialization");
1758 assert_eq!(actual_parsed_response, expected_parsed_response);
1759 }
1760
1761 #[test]
1762 fn test_is_generating_snapshots() {
1763 let rpc = RpcHandler::start_with_config(TestConfig::default());
1765 let RpcHandler { io, meta, .. } = rpc;
1766
1767 let request = r#"{"jsonrpc":"2.0","id":1,"method":"isGeneratingSnapshots","params":[]}"#;
1768 let response = io.handle_request_sync(request, meta.clone());
1769 let result: Value = serde_json::from_str(&response.expect("actual response"))
1770 .expect("actual response deserialization");
1771
1772 assert!(result["result"].is_boolean());
1774 assert!(result["result"].as_bool().unwrap());
1776 }
1777
1778 #[test]
1779 fn test_is_generating_snapshots_no_controller() {
1780 let rpc = RpcHandler::start_with_config(TestConfig::default());
1782 let RpcHandler { io, .. } = rpc;
1783
1784 let request = r#"{"jsonrpc":"2.0","id":1,"method":"isGeneratingSnapshots","params":[]}"#;
1786 let validator_exit = create_validator_exit(Arc::new(AtomicBool::new(false)));
1787 let authorized_voter_keypairs = Arc::new(RwLock::new(vec![Arc::new(Keypair::new())]));
1788 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1789
1790 let meta_no_post_init = AdminRpcRequestMetadata {
1791 rpc_addr: None,
1792 start_time: SystemTime::now(),
1793 start_progress,
1794 validator_exit,
1795 validator_exit_backpressure: HashMap::default(),
1796 authorized_voter_keypairs,
1797 tower_storage: Arc::new(NullTowerStorage {}),
1798 post_init: Arc::new(RwLock::new(None)),
1799 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1800 rpc_to_plugin_manager_sender: None,
1801 };
1802
1803 let response = io.handle_request_sync(request, meta_no_post_init);
1804 let result: Value = serde_json::from_str(&response.expect("actual response"))
1805 .expect("actual response deserialization");
1806
1807 assert!(result["error"].is_object());
1809 assert_eq!(
1810 result["error"]["message"].as_str().unwrap(),
1811 "snapshot_controller unavailable"
1812 );
1813 }
1814}