1use {
2 crossbeam_channel::Sender,
3 jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
4 jsonrpc_core_client::{RpcError, transports::ipc},
5 jsonrpc_derive::rpc,
6 jsonrpc_ipc_server::{
7 RequestContext, ServerBuilder, tokio::sync::oneshot::channel as oneshot_channel,
8 },
9 log::*,
10 serde::{Deserialize, Serialize, de::Deserializer},
11 solana_accounts_db::accounts_index::AccountIndex,
12 solana_clock::Slot,
13 solana_core::{
14 admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
15 banking_stage::{
16 BankingControlMsg, BankingStage,
17 transaction_scheduler::scheduler_controller::SchedulerConfig,
18 },
19 consensus::{Tower, tower_storage::TowerStorage},
20 repair::repair_service,
21 validator::{
22 BlockProductionMethod, SchedulerPacing, TransactionStructure, ValidatorStartProgress,
23 },
24 },
25 solana_geyser_plugin_manager::GeyserPluginManagerRequest,
26 solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
27 solana_keypair::{Keypair, read_keypair_file},
28 solana_metrics::{datapoint_info, datapoint_warn},
29 solana_pubkey::Pubkey,
30 solana_rpc::rpc::verify_pubkey,
31 solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
32 solana_runtime::snapshot_controller::SnapshotController,
33 solana_signer::Signer,
34 solana_validator_exit::Exit,
35 std::{
36 collections::{HashMap, HashSet},
37 env, error,
38 fmt::{self, Display},
39 net::{IpAddr, SocketAddr},
40 num::NonZeroUsize,
41 path::{Path, PathBuf},
42 sync::{
43 Arc, RwLock,
44 atomic::{AtomicBool, Ordering},
45 },
46 thread::{self, Builder},
47 time::{Duration, Instant, SystemTime},
48 },
49 tokio::runtime::Runtime,
50};
51
52#[derive(Clone)]
53pub struct AdminRpcRequestMetadata {
54 pub rpc_addr: Option<SocketAddr>,
55 pub start_time: SystemTime,
56 pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
57 pub validator_exit: Arc<RwLock<Exit>>,
58 pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
59 pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
60 pub tower_storage: Arc<dyn TowerStorage>,
61 pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
62 pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
63 pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
64}
65
66impl Metadata for AdminRpcRequestMetadata {}
67
68impl AdminRpcRequestMetadata {
69 fn with_post_init<F, R>(&self, func: F) -> Result<R>
70 where
71 F: FnOnce(&AdminRpcRequestMetadataPostInit) -> Result<R>,
72 {
73 if let Some(post_init) = self.post_init.read().unwrap().as_ref() {
74 func(post_init)
75 } else {
76 Err(jsonrpc_core::error::Error::invalid_params(
77 "Retry once validator start up is complete",
78 ))
79 }
80 }
81
82 fn snapshot_controller(&self) -> Option<Arc<SnapshotController>> {
83 self.with_post_init(|post_init| Ok(post_init.snapshot_controller.clone()))
84 .map_err(|_| {
85 warn!("snapshot_controller unavailable, shutting down without taking snapshot");
87 })
88 .ok()
89 }
90}
91
92#[derive(Debug, Deserialize, Serialize)]
93pub struct AdminRpcContactInfo {
94 pub id: String,
95 pub gossip: SocketAddr,
96 pub tvu: SocketAddr,
97 pub serve_repair_quic: SocketAddr,
98 pub tpu: SocketAddr,
99 pub tpu_forwards: SocketAddr,
100 pub tpu_vote: SocketAddr,
101 pub rpc: SocketAddr,
102 pub rpc_pubsub: SocketAddr,
103 pub serve_repair: SocketAddr,
104 pub last_updated_timestamp: u64,
105 pub shred_version: u16,
106}
107
108#[derive(Debug, Deserialize, Serialize)]
109pub struct AdminRpcRepairWhitelist {
110 pub whitelist: Vec<Pubkey>,
111}
112
113impl From<ContactInfo> for AdminRpcContactInfo {
114 fn from(node: ContactInfo) -> Self {
115 macro_rules! unwrap_socket {
116 ($name:ident) => {
117 node.$name().unwrap_or(SOCKET_ADDR_UNSPECIFIED)
118 };
119 ($name:ident, $protocol:expr) => {
120 node.$name($protocol).unwrap_or(SOCKET_ADDR_UNSPECIFIED)
121 };
122 }
123 Self {
124 id: node.pubkey().to_string(),
125 last_updated_timestamp: node.wallclock(),
126 gossip: unwrap_socket!(gossip),
127 tvu: unwrap_socket!(tvu, Protocol::UDP),
128 serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
129 tpu: unwrap_socket!(tpu, Protocol::UDP),
130 tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
131 tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
132 rpc: unwrap_socket!(rpc),
133 rpc_pubsub: unwrap_socket!(rpc_pubsub),
134 serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
135 shred_version: node.shred_version(),
136 }
137 }
138}
139
140impl Display for AdminRpcContactInfo {
141 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142 writeln!(f, "Identity: {}", self.id)?;
143 writeln!(f, "Gossip: {}", self.gossip)?;
144 writeln!(f, "TVU: {}", self.tvu)?;
145 writeln!(f, "TPU: {}", self.tpu)?;
146 writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;
147 writeln!(f, "TPU Votes: {}", self.tpu_vote)?;
148 writeln!(f, "RPC: {}", self.rpc)?;
149 writeln!(f, "RPC Pubsub: {}", self.rpc_pubsub)?;
150 writeln!(f, "Serve Repair: {}", self.serve_repair)?;
151 writeln!(f, "Last Updated Timestamp: {}", self.last_updated_timestamp)?;
152 writeln!(f, "Shred Version: {}", self.shred_version)
153 }
154}
155impl solana_cli_output::VerboseDisplay for AdminRpcContactInfo {}
156impl solana_cli_output::QuietDisplay for AdminRpcContactInfo {}
157
158impl Display for AdminRpcRepairWhitelist {
159 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160 writeln!(f, "Repair whitelist: {:?}", &self.whitelist)
161 }
162}
163impl solana_cli_output::VerboseDisplay for AdminRpcRepairWhitelist {}
164impl solana_cli_output::QuietDisplay for AdminRpcRepairWhitelist {}
165
166#[rpc]
167pub trait AdminRpc {
168 type Metadata;
169
170 #[rpc(meta, name = "exit")]
173 fn exit(&self, meta: Self::Metadata) -> Result<()>;
174
175 #[rpc(meta, name = "pid")]
177 fn pid(&self, meta: Self::Metadata) -> Result<u32>;
178
179 #[rpc(meta, name = "reloadPlugin")]
180 fn reload_plugin(
181 &self,
182 meta: Self::Metadata,
183 name: String,
184 config_file: String,
185 ) -> BoxFuture<Result<()>>;
186
187 #[rpc(meta, name = "unloadPlugin")]
188 fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
189
190 #[rpc(meta, name = "loadPlugin")]
191 fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
192
193 #[rpc(meta, name = "listPlugins")]
194 fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
195
196 #[rpc(meta, name = "rpcAddress")]
197 fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
198
199 #[rpc(name = "setLogFilter")]
200 fn set_log_filter(&self, filter: String) -> Result<()>;
201
202 #[rpc(meta, name = "startTime")]
203 fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;
204
205 #[rpc(meta, name = "startProgress")]
206 fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;
207
208 #[rpc(meta, name = "addAuthorizedVoter")]
209 fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;
210
211 #[rpc(meta, name = "addAuthorizedVoterFromBytes")]
212 fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec<u8>)
213 -> Result<()>;
214
215 #[rpc(meta, name = "removeAllAuthorizedVoters")]
216 fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;
217
218 #[rpc(meta, name = "setIdentity")]
219 fn set_identity(
220 &self,
221 meta: Self::Metadata,
222 keypair_file: String,
223 require_tower: bool,
224 ) -> Result<()>;
225
226 #[rpc(meta, name = "setIdentityFromBytes")]
227 fn set_identity_from_bytes(
228 &self,
229 meta: Self::Metadata,
230 identity_keypair: Vec<u8>,
231 require_tower: bool,
232 ) -> Result<()>;
233
234 #[rpc(meta, name = "setStakedNodesOverrides")]
235 fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;
236
237 #[rpc(meta, name = "contactInfo")]
238 fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;
239
240 #[rpc(meta, name = "selectActiveInterface")]
241 fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()>;
242
243 #[rpc(meta, name = "repairShredFromPeer")]
244 fn repair_shred_from_peer(
245 &self,
246 meta: Self::Metadata,
247 pubkey: Option<Pubkey>,
248 slot: u64,
249 shred_index: u64,
250 ) -> Result<()>;
251
252 #[rpc(meta, name = "repairWhitelist")]
253 fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;
254
255 #[rpc(meta, name = "setRepairWhitelist")]
256 fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()>;
257
258 #[rpc(meta, name = "getSecondaryIndexKeySize")]
259 fn get_secondary_index_key_size(
260 &self,
261 meta: Self::Metadata,
262 pubkey_str: String,
263 ) -> Result<HashMap<RpcAccountIndex, usize>>;
264
265 #[rpc(meta, name = "setPublicTpuAddress")]
266 fn set_public_tpu_address(
267 &self,
268 meta: Self::Metadata,
269 public_tpu_addr: SocketAddr,
270 ) -> Result<()>;
271
272 #[rpc(meta, name = "setPublicTpuForwardsAddress")]
273 fn set_public_tpu_forwards_address(
274 &self,
275 meta: Self::Metadata,
276 public_tpu_forwards_addr: SocketAddr,
277 ) -> Result<()>;
278
279 #[rpc(meta, name = "setPublicTvuAddress")]
280 fn set_public_tvu_address(
281 &self,
282 meta: Self::Metadata,
283 public_tvu_addr: SocketAddr,
284 ) -> Result<()>;
285
286 #[rpc(meta, name = "manageBlockProduction")]
287 fn manage_block_production(
288 &self,
289 meta: Self::Metadata,
290 block_production_method: BlockProductionMethod,
291 transaction_struct: TransactionStructure,
292 num_workers: NonZeroUsize,
293 scheduler_pacing: SchedulerPacing,
294 ) -> Result<()>;
295
296 #[rpc(meta, name = "isGeneratingSnapshots")]
297 fn is_generating_snapshots(&self, meta: Self::Metadata) -> Result<bool>;
298
299 #[rpc(meta, name = "blockstorePurge")]
300 fn blockstore_purge(&self, meta: Self::Metadata, maximum_purge_slot: Slot) -> Result<()>;
301}
302
303pub struct AdminRpcImpl;
304impl AdminRpc for AdminRpcImpl {
305 type Metadata = AdminRpcRequestMetadata;
306
307 fn exit(&self, meta: Self::Metadata) -> Result<()> {
308 debug!("exit admin rpc request received");
309
310 thread::Builder::new()
311 .name("solProcessExit".into())
312 .spawn(move || {
313 let start_time = Instant::now();
314
315 if let Some(snapshot_controller) = meta.snapshot_controller() {
317 let latest_snapshot_slot = snapshot_controller.latest_bank_snapshot_slot();
318
319 info!("Requesting fastboot snapshot before exit");
320 snapshot_controller.request_fastboot_snapshot();
321
322 let timeout = Duration::from_secs(5);
326 while snapshot_controller.latest_bank_snapshot_slot() == latest_snapshot_slot {
327 if start_time.elapsed() > timeout {
328 warn!("Timeout waiting for snapshot to complete");
329 datapoint_warn!(
330 "admin-rpc-snapshot-timeout",
331 ("timeout_us", start_time.elapsed().as_micros(), i64)
332 );
333 break;
334 }
335 thread::sleep(Duration::from_millis(100));
336 }
337 info!(
338 "Requesting fastboot snapshot before exit... Done in {:?}",
339 start_time.elapsed()
340 );
341 }
342
343 if start_time.elapsed().as_millis() < 100 {
347 thread::sleep(Duration::from_millis(100));
348 }
349
350 info!("validator exit requested");
351 meta.validator_exit.write().unwrap().exit();
352
353 if !meta.validator_exit_backpressure.is_empty() {
354 let service_names = meta.validator_exit_backpressure.keys();
355 info!("Wait for these services to complete: {service_names:?}");
356 loop {
357 thread::sleep(Duration::from_secs(1));
361
362 let mut any_flags_raised = false;
363 for (name, flag) in meta.validator_exit_backpressure.iter() {
364 let is_flag_raised = flag.load(Ordering::Relaxed);
365 if is_flag_raised {
366 info!("{name}'s exit backpressure flag is raised");
367 any_flags_raised = true;
368 }
369 }
370 if !any_flags_raised {
371 break;
372 }
373 }
374 info!("All services have completed");
375 }
376
377 thread::sleep(Duration::from_secs(
382 env::var("SOLANA_VALIDATOR_EXIT_TIMEOUT")
383 .ok()
384 .and_then(|x| x.parse().ok())
385 .unwrap_or(5),
386 ));
387 warn!("validator exit timeout");
388 std::process::exit(0);
389 })
390 .unwrap();
391
392 Ok(())
393 }
394
395 fn pid(&self, _meta: Self::Metadata) -> Result<u32> {
396 Ok(std::process::id())
397 }
398
399 fn reload_plugin(
400 &self,
401 meta: Self::Metadata,
402 name: String,
403 config_file: String,
404 ) -> BoxFuture<Result<()>> {
405 Box::pin(async move {
406 let (response_sender, response_receiver) = oneshot_channel();
408
409 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
411 rpc_to_manager_sender
412 .send(GeyserPluginManagerRequest::ReloadPlugin {
413 name,
414 config_file,
415 response_sender,
416 })
417 .expect("GeyerPluginService should never drop request receiver");
418 } else {
419 return Err(jsonrpc_core::Error {
420 code: ErrorCode::InvalidRequest,
421 message: "No geyser plugin service".to_string(),
422 data: None,
423 });
424 }
425
426 response_receiver
428 .await
429 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
430 })
431 }
432
433 fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
434 Box::pin(async move {
435 let (response_sender, response_receiver) = oneshot_channel();
437
438 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
440 rpc_to_manager_sender
441 .send(GeyserPluginManagerRequest::LoadPlugin {
442 config_file,
443 response_sender,
444 })
445 .expect("GeyerPluginService should never drop request receiver");
446 } else {
447 return Err(jsonrpc_core::Error {
448 code: ErrorCode::InvalidRequest,
449 message: "No geyser plugin service".to_string(),
450 data: None,
451 });
452 }
453
454 response_receiver
456 .await
457 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
458 })
459 }
460
461 fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
462 Box::pin(async move {
463 let (response_sender, response_receiver) = oneshot_channel();
465
466 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
468 rpc_to_manager_sender
469 .send(GeyserPluginManagerRequest::UnloadPlugin {
470 name,
471 response_sender,
472 })
473 .expect("GeyerPluginService should never drop request receiver");
474 } else {
475 return Err(jsonrpc_core::Error {
476 code: ErrorCode::InvalidRequest,
477 message: "No geyser plugin service".to_string(),
478 data: None,
479 });
480 }
481
482 response_receiver
484 .await
485 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
486 })
487 }
488
489 fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
490 Box::pin(async move {
491 let (response_sender, response_receiver) = oneshot_channel();
493
494 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
496 rpc_to_manager_sender
497 .send(GeyserPluginManagerRequest::ListPlugins { response_sender })
498 .expect("GeyerPluginService should never drop request receiver");
499 } else {
500 return Err(jsonrpc_core::Error {
501 code: ErrorCode::InvalidRequest,
502 message: "No geyser plugin service".to_string(),
503 data: None,
504 });
505 }
506
507 response_receiver
509 .await
510 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
511 })
512 }
513
514 fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
515 debug!("rpc_addr admin rpc request received");
516 Ok(meta.rpc_addr)
517 }
518
519 fn set_log_filter(&self, filter: String) -> Result<()> {
520 debug!("set_log_filter admin rpc request received");
521 agave_logger::setup_with(&filter);
522 Ok(())
523 }
524
525 fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
526 debug!("start_time admin rpc request received");
527 Ok(meta.start_time)
528 }
529
530 fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
531 debug!("start_progress admin rpc request received");
532 Ok(*meta.start_progress.read().unwrap())
533 }
534
535 fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
536 debug!("add_authorized_voter request received");
537
538 let authorized_voter = read_keypair_file(keypair_file)
539 .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{err}")))?;
540
541 AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
542 }
543
544 fn add_authorized_voter_from_bytes(
545 &self,
546 meta: Self::Metadata,
547 keypair: Vec<u8>,
548 ) -> Result<()> {
549 debug!("add_authorized_voter_from_bytes request received");
550
551 let authorized_voter = Keypair::try_from(keypair.as_ref()).map_err(|err| {
552 jsonrpc_core::error::Error::invalid_params(format!(
553 "Failed to read authorized voter keypair from provided byte array: {err}"
554 ))
555 })?;
556
557 AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
558 }
559
560 fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
561 debug!("remove_all_authorized_voters received");
562 meta.authorized_voter_keypairs.write().unwrap().clear();
563 Ok(())
564 }
565
566 fn set_identity(
567 &self,
568 meta: Self::Metadata,
569 keypair_file: String,
570 require_tower: bool,
571 ) -> Result<()> {
572 debug!("set_identity request received");
573
574 let identity_keypair = read_keypair_file(&keypair_file).map_err(|err| {
575 jsonrpc_core::error::Error::invalid_params(format!(
576 "Failed to read identity keypair from {keypair_file}: {err}"
577 ))
578 })?;
579
580 AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
581 }
582
583 fn set_identity_from_bytes(
584 &self,
585 meta: Self::Metadata,
586 identity_keypair: Vec<u8>,
587 require_tower: bool,
588 ) -> Result<()> {
589 debug!("set_identity_from_bytes request received");
590
591 let identity_keypair = Keypair::try_from(identity_keypair.as_ref()).map_err(|err| {
592 jsonrpc_core::error::Error::invalid_params(format!(
593 "Failed to read identity keypair from provided byte array: {err}"
594 ))
595 })?;
596
597 AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
598 }
599
600 fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
601 let loaded_config = load_staked_nodes_overrides(&path)
602 .map_err(|err| {
603 error!(
604 "Failed to load staked nodes overrides from {}: {}",
605 &path, err
606 );
607 jsonrpc_core::error::Error::internal_error()
608 })?
609 .staked_map_id;
610 let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
611 write_staked_nodes.clear();
612 write_staked_nodes.extend(loaded_config);
613 info!("Staked nodes overrides loaded from {path}");
614 debug!("overrides map: {write_staked_nodes:?}");
615 Ok(())
616 }
617
618 fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
619 meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
620 }
621
622 fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()> {
623 debug!("select_active_interface received: {interface}");
624 meta.with_post_init(|post_init| {
625 let node = post_init.node.as_ref().ok_or_else(|| {
626 jsonrpc_core::Error::invalid_params("`Node` not initialized in post_init")
627 })?;
628
629 node.switch_active_interface(interface, &post_init.cluster_info)
630 .map_err(|e| {
631 jsonrpc_core::Error::invalid_params(format!(
632 "Switching failed due to error {e}"
633 ))
634 })?;
635 info!("Switched primary interface to {interface}");
636 Ok(())
637 })
638 }
639
640 fn repair_shred_from_peer(
641 &self,
642 meta: Self::Metadata,
643 pubkey: Option<Pubkey>,
644 slot: u64,
645 shred_index: u64,
646 ) -> Result<()> {
647 debug!("repair_shred_from_peer request received");
648
649 meta.with_post_init(|post_init| {
650 repair_service::RepairService::request_repair_for_shred_from_peer(
651 post_init.cluster_info.clone(),
652 post_init.cluster_slots.clone(),
653 pubkey,
654 slot,
655 shred_index,
656 &post_init.repair_socket,
657 post_init.outstanding_repair_requests.clone(),
658 );
659 Ok(())
660 })
661 }
662
663 fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
664 debug!("repair_whitelist request received");
665
666 meta.with_post_init(|post_init| {
667 let whitelist: Vec<_> = post_init
668 .repair_whitelist
669 .read()
670 .unwrap()
671 .iter()
672 .copied()
673 .collect();
674 Ok(AdminRpcRepairWhitelist { whitelist })
675 })
676 }
677
678 fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()> {
679 debug!("set_repair_whitelist request received");
680
681 let whitelist: HashSet<Pubkey> = whitelist.into_iter().collect();
682 meta.with_post_init(|post_init| {
683 *post_init.repair_whitelist.write().unwrap() = whitelist;
684 warn!(
685 "Repair whitelist set to {:?}",
686 &post_init.repair_whitelist.read().unwrap()
687 );
688 Ok(())
689 })
690 }
691
692 fn get_secondary_index_key_size(
693 &self,
694 meta: Self::Metadata,
695 pubkey_str: String,
696 ) -> Result<HashMap<RpcAccountIndex, usize>> {
697 debug!("get_secondary_index_key_size rpc request received: {pubkey_str:?}");
698 let index_key = verify_pubkey(&pubkey_str)?;
699 meta.with_post_init(|post_init| {
700 let bank = post_init.bank_forks.read().unwrap().root_bank();
701
702 let enabled_account_indexes = &bank.accounts().accounts_db.account_indexes;
704
705 if enabled_account_indexes.is_empty() {
707 debug!("get_secondary_index_key_size: secondary index not enabled.");
708 return Ok(HashMap::new());
709 };
710
711 if !enabled_account_indexes.include_key(&index_key) {
713 return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
714 index_key: index_key.to_string(),
715 }
716 .into());
717 }
718
719 let accounts_index = &bank.accounts().accounts_db.accounts_index;
721
722 let found_sizes = enabled_account_indexes
724 .indexes
725 .iter()
726 .filter_map(|index| {
727 accounts_index
728 .get_index_key_size(index, &index_key)
729 .map(|size| (rpc_account_index_from_account_index(index), size))
730 })
731 .collect::<HashMap<_, _>>();
732
733 if found_sizes.is_empty() {
735 debug!("get_secondary_index_key_size: key not found in the secondary index.");
736 }
737 Ok(found_sizes)
738 })
739 }
740
741 fn set_public_tpu_address(
742 &self,
743 meta: Self::Metadata,
744 public_tpu_addr: SocketAddr,
745 ) -> Result<()> {
746 debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
747
748 meta.with_post_init(|post_init| {
749 post_init
750 .cluster_info
751 .my_contact_info()
752 .tpu(Protocol::QUIC)
753 .ok_or_else(|| {
754 error!(
755 "The public TPU QUIC address isn't being published. The node is likely in \
756 repair mode. See help for --restricted-repair-only-mode for more \
757 information."
758 );
759 jsonrpc_core::error::Error::internal_error()
760 })?;
761 post_init
762 .cluster_info
763 .set_tpu_quic(public_tpu_addr)
764 .map_err(|err| {
765 error!("Failed to set public TPU QUIC address to {public_tpu_addr}: {err}");
766 jsonrpc_core::error::Error::internal_error()
767 })?;
768 let my_contact_info = post_init.cluster_info.my_contact_info();
769 warn!(
770 "Public TPU addresses set to {:?} (quic)",
771 my_contact_info.tpu(Protocol::QUIC),
772 );
773 Ok(())
774 })
775 }
776
777 fn set_public_tpu_forwards_address(
778 &self,
779 meta: Self::Metadata,
780 public_tpu_forwards_addr: SocketAddr,
781 ) -> Result<()> {
782 debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
783
784 meta.with_post_init(|post_init| {
785 post_init
786 .cluster_info
787 .my_contact_info()
788 .tpu_forwards(Protocol::QUIC)
789 .ok_or_else(|| {
790 error!(
791 "The public TPU Forwards address isn't being published. The node is \
792 likely in repair mode. See help for --restricted-repair-only-mode for \
793 more information."
794 );
795 jsonrpc_core::error::Error::internal_error()
796 })?;
797 post_init
798 .cluster_info
799 .set_tpu_forwards_quic(public_tpu_forwards_addr)
800 .map_err(|err| {
801 error!(
802 "Failed to set public TPU QUIC address to {public_tpu_forwards_addr}: \
803 {err}"
804 );
805 jsonrpc_core::error::Error::internal_error()
806 })?;
807 let my_contact_info = post_init.cluster_info.my_contact_info();
808 warn!(
809 "Public TPU Forwards address set to {:?} (quic)",
810 my_contact_info.tpu_forwards(Protocol::QUIC),
811 );
812 Ok(())
813 })
814 }
815
816 fn set_public_tvu_address(
817 &self,
818 meta: Self::Metadata,
819 public_tvu_addr: SocketAddr,
820 ) -> Result<()> {
821 debug!("set_public_tvu_address rpc request received: {public_tvu_addr}");
822
823 meta.with_post_init(|post_init| {
824 post_init
825 .cluster_info
826 .my_contact_info()
827 .tvu(Protocol::UDP)
828 .ok_or_else(|| {
829 error!(
830 "The public TVU address isn't being published. The node is likely in \
831 repair mode. See help for --restricted-repair-only-mode for more \
832 information."
833 );
834 jsonrpc_core::error::Error::internal_error()
835 })?;
836 post_init
837 .cluster_info
838 .set_tvu_socket(public_tvu_addr)
839 .map_err(|err| {
840 error!("Failed to set public TVU address to {public_tvu_addr}: {err}");
841 jsonrpc_core::error::Error::internal_error()
842 })?;
843 let my_contact_info = post_init.cluster_info.my_contact_info();
844 warn!(
845 "Public TVU addresses set to {:?}",
846 my_contact_info.tvu(Protocol::UDP),
847 );
848 Ok(())
849 })
850 }
851
852 fn manage_block_production(
853 &self,
854 meta: Self::Metadata,
855 block_production_method: BlockProductionMethod,
856 transaction_struct: TransactionStructure,
857 num_workers: NonZeroUsize,
858 scheduler_pacing: SchedulerPacing,
859 ) -> Result<()> {
860 debug!("manage_block_production rpc request received");
861
862 if num_workers > BankingStage::max_num_workers() {
863 return Err(jsonrpc_core::error::Error::invalid_params(format!(
864 "Number of workers ({}) exceeds maximum allowed ({})",
865 num_workers,
866 BankingStage::max_num_workers()
867 )));
868 }
869
870 if transaction_struct != TransactionStructure::View {
871 warn!("TransactionStructure::Sdk has no effect on block production");
872 }
873
874 meta.with_post_init(|post_init| {
875 if post_init
876 .banking_control_sender
877 .try_send(BankingControlMsg::Internal {
878 block_production_method,
879 num_workers,
880 config: SchedulerConfig { scheduler_pacing },
881 })
882 .is_err()
883 {
884 error!("Banking stage already switching schedulers");
885
886 return Err(jsonrpc_core::error::Error::internal_error());
887 }
888
889 Ok(())
890 })
891 }
892
893 fn is_generating_snapshots(&self, meta: Self::Metadata) -> Result<bool> {
894 if let Some(snapshot_controller) = meta.snapshot_controller() {
895 Ok(snapshot_controller.is_generating_snapshots())
896 } else {
897 Err(jsonrpc_core::error::Error::invalid_params(
898 "snapshot_controller unavailable",
899 ))
900 }
901 }
902
903 fn blockstore_purge(&self, meta: Self::Metadata, maximum_purge_slot: Slot) -> Result<()> {
904 meta.with_post_init(|post_init| {
905 post_init
906 .blockstore
907 .send_manual_purge_request(maximum_purge_slot)
908 .map_err(|err| jsonrpc_core::Error {
909 code: ErrorCode::InvalidRequest,
910 message: format!("{err}"),
911 data: None,
912 })
913 })
914 }
915}
916
917impl AdminRpcImpl {
918 fn add_authorized_voter_keypair(
919 meta: AdminRpcRequestMetadata,
920 authorized_voter: Keypair,
921 ) -> Result<()> {
922 let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
923
924 if authorized_voter_keypairs
925 .iter()
926 .any(|x| x.pubkey() == authorized_voter.pubkey())
927 {
928 Err(jsonrpc_core::error::Error::invalid_params(
929 "Authorized voter already present",
930 ))
931 } else {
932 authorized_voter_keypairs.push(Arc::new(authorized_voter));
933 Ok(())
934 }
935 }
936
937 fn set_identity_keypair(
938 meta: AdminRpcRequestMetadata,
939 identity_keypair: Keypair,
940 require_tower: bool,
941 ) -> Result<()> {
942 meta.with_post_init(|post_init| {
943 if require_tower {
944 let _ = Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey())
945 .map_err(|err| {
946 jsonrpc_core::error::Error::invalid_params(format!(
947 "Unable to load tower file for identity {}: {}",
948 identity_keypair.pubkey(),
949 err
950 ))
951 })?;
952 }
953
954 for (key, notifier) in &*post_init.notifies.read().unwrap() {
955 if let Err(err) = notifier.update_key(&identity_keypair) {
956 error!("Error updating network layer keypair: {err} on {key:?}");
957 }
958 }
959
960 let old_identity = post_init.cluster_info.id();
961 let new_identity = identity_keypair.pubkey();
962 solana_metrics::set_host_id(new_identity.to_string());
963 datapoint_info!(
965 "validator-set_identity",
966 ("old_id", old_identity.to_string(), String),
967 ("new_id", new_identity.to_string(), String),
968 ("version", solana_version::version!(), String),
969 );
970 post_init
971 .cluster_info
972 .set_keypair(Arc::new(identity_keypair));
973 warn!("Identity set to {new_identity}");
974 Ok(())
975 })
976 }
977}
978
979fn rpc_account_index_from_account_index(account_index: &AccountIndex) -> RpcAccountIndex {
980 match account_index {
981 AccountIndex::ProgramId => RpcAccountIndex::ProgramId,
982 AccountIndex::SplTokenOwner => RpcAccountIndex::SplTokenOwner,
983 AccountIndex::SplTokenMint => RpcAccountIndex::SplTokenMint,
984 }
985}
986
987pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
989 let admin_rpc_path = admin_rpc_path(ledger_path);
990
991 let event_loop = tokio::runtime::Builder::new_multi_thread()
992 .thread_name("solAdminRpcEl")
993 .worker_threads(3) .enable_all()
995 .build()
996 .unwrap();
997
998 Builder::new()
999 .name("solAdminRpc".to_string())
1000 .spawn(move || {
1001 let mut io = MetaIoHandler::default();
1002 io.extend_with(AdminRpcImpl.to_delegate());
1003
1004 let validator_exit = metadata.validator_exit.clone();
1005 let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
1006 metadata.clone()
1007 })
1008 .event_loop_executor(event_loop.handle().clone())
1009 .start(&format!("{}", admin_rpc_path.display()));
1010
1011 match server {
1012 Err(err) => {
1013 warn!("Unable to start admin rpc service: {err:?}");
1014 }
1015 Ok(server) => {
1016 info!("started admin rpc service!");
1017 let close_handle = server.close_handle();
1018 validator_exit
1019 .write()
1020 .unwrap()
1021 .register_exit(Box::new(move || {
1022 close_handle.close();
1023 }));
1024
1025 server.wait();
1026 }
1027 }
1028 })
1029 .unwrap();
1030}
1031
1032fn admin_rpc_path(ledger_path: &Path) -> PathBuf {
1033 #[cfg(target_family = "windows")]
1034 {
1035 if let Some(ledger_filename) = ledger_path.file_name() {
1038 PathBuf::from(format!(
1039 "\\\\.\\pipe\\{}-admin.rpc",
1040 ledger_filename.to_string_lossy()
1041 ))
1042 } else {
1043 PathBuf::from("\\\\.\\pipe\\admin.rpc")
1044 }
1045 }
1046 #[cfg(not(target_family = "windows"))]
1047 {
1048 ledger_path.join("admin.rpc")
1049 }
1050}
1051
1052pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
1054 let admin_rpc_path = admin_rpc_path(ledger_path);
1055 if !admin_rpc_path.exists() {
1056 Err(RpcError::Client(format!(
1057 "{} does not exist",
1058 admin_rpc_path.display()
1059 )))
1060 } else {
1061 ipc::connect::<_, gen_client::Client>(&format!("{}", admin_rpc_path.display())).await
1062 }
1063}
1064
1065pub fn runtime() -> Runtime {
1067 tokio::runtime::Builder::new_multi_thread()
1068 .thread_name("solAdminRpcRt")
1069 .enable_all()
1070 .worker_threads(2)
1073 .build()
1074 .expect("new tokio runtime")
1075}
1076
1077#[derive(Default, Deserialize, Clone)]
1078pub struct StakedNodesOverrides {
1079 #[serde(deserialize_with = "deserialize_pubkey_map")]
1080 pub staked_map_id: HashMap<Pubkey, u64>,
1081}
1082
1083pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
1084where
1085 D: Deserializer<'de>,
1086{
1087 let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
1088 let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
1089 for (key, value) in container.iter() {
1090 let typed_key = Pubkey::try_from(key.as_str())
1091 .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
1092 container_typed.insert(typed_key, *value);
1093 }
1094 Ok(container_typed)
1095}
1096
1097pub fn load_staked_nodes_overrides(
1098 path: &String,
1099) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
1100 debug!("Loading staked nodes overrides configuration from {path}");
1101 if Path::new(&path).exists() {
1102 let file = std::fs::File::open(path)?;
1103 Ok(serde_yaml::from_reader(file)?)
1104 } else {
1105 Err(format!("Staked nodes overrides provided '{path}' a non-existing file path.").into())
1106 }
1107}
1108
1109#[cfg(test)]
1110mod tests {
1111 use {
1112 super::*,
1113 agave_snapshots::snapshot_config::SnapshotConfig,
1114 crossbeam_channel::unbounded,
1115 serde_json::Value,
1116 solana_account::{Account, AccountSharedData},
1117 solana_accounts_db::{
1118 accounts_db::{ACCOUNTS_DB_CONFIG_FOR_TESTING, AccountsDbConfig},
1119 accounts_index::AccountSecondaryIndexes,
1120 },
1121 solana_core::{
1122 admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
1123 consensus::tower_storage::NullTowerStorage,
1124 validator::{Validator, ValidatorConfig, ValidatorTpuConfig},
1125 },
1126 solana_gossip::{cluster_info::ClusterInfo, node::Node},
1127 solana_ledger::{
1128 blockstore::Blockstore,
1129 create_new_tmp_ledger,
1130 genesis_utils::{
1131 GenesisConfigInfo, create_genesis_config, create_genesis_config_with_leader,
1132 },
1133 get_tmp_ledger_path_auto_delete,
1134 },
1135 solana_net_utils::{SocketAddrSpace, sockets::bind_to_localhost_unique},
1136 solana_program_option::COption,
1137 solana_program_pack::Pack,
1138 solana_pubkey::Pubkey,
1139 solana_rpc::rpc::create_validator_exit,
1140 solana_runtime::{
1141 bank::{Bank, BankTestConfig},
1142 bank_forks::BankForks,
1143 },
1144 solana_system_interface::program as system_program,
1145 spl_generic_token::token,
1146 spl_token_2022_interface::state::{
1147 Account as TokenAccount, AccountState as TokenAccountState, Mint,
1148 },
1149 std::{collections::HashSet, fs::remove_dir_all, sync::atomic::AtomicBool},
1150 tokio::sync::mpsc,
1151 };
1152
1153 #[derive(Default)]
1154 struct TestConfig {
1155 account_indexes: AccountSecondaryIndexes,
1156 }
1157
1158 struct RpcHandler {
1159 io: MetaIoHandler<AdminRpcRequestMetadata>,
1160 meta: AdminRpcRequestMetadata,
1161 bank_forks: Arc<RwLock<BankForks>>,
1162 }
1163
1164 impl RpcHandler {
1165 fn _start() -> Self {
1166 Self::start_with_config(TestConfig::default())
1167 }
1168
1169 fn start_with_config(config: TestConfig) -> Self {
1170 let keypair = Arc::new(Keypair::new());
1171 let cluster_info = Arc::new(ClusterInfo::new(
1172 ContactInfo::new(
1173 keypair.pubkey(),
1174 solana_time_utils::timestamp(), 0u16, ),
1177 keypair,
1178 SocketAddrSpace::Unspecified,
1179 ));
1180 let exit = Arc::new(AtomicBool::new(false));
1181 let validator_exit = create_validator_exit(exit);
1182 let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig {
1183 accounts_db_config: AccountsDbConfig {
1184 account_indexes: Some(config.account_indexes),
1185 ..ACCOUNTS_DB_CONFIG_FOR_TESTING
1186 },
1187 });
1188
1189 let (snapshot_request_sender, _) = unbounded();
1190 let snapshot_controller = Arc::new(SnapshotController::new(
1191 snapshot_request_sender.clone(),
1192 SnapshotConfig::default(),
1193 bank_forks.read().unwrap().root(),
1194 ));
1195
1196 let ledger_path = get_tmp_ledger_path_auto_delete!();
1197 let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
1198
1199 let vote_account = vote_keypair.pubkey();
1200 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1201 let repair_whitelist = Arc::new(RwLock::new(HashSet::new()));
1202 let meta = AdminRpcRequestMetadata {
1203 rpc_addr: None,
1204 start_time: SystemTime::now(),
1205 start_progress,
1206 validator_exit,
1207 validator_exit_backpressure: HashMap::default(),
1208 authorized_voter_keypairs: Arc::new(RwLock::new(vec![vote_keypair])),
1209 tower_storage: Arc::new(NullTowerStorage {}),
1210 post_init: Arc::new(RwLock::new(Some(AdminRpcRequestMetadataPostInit {
1211 cluster_info,
1212 bank_forks: bank_forks.clone(),
1213 vote_account,
1214 repair_whitelist,
1215 notifies: Arc::new(RwLock::new(KeyUpdaters::default())),
1216 repair_socket: Arc::new(bind_to_localhost_unique().expect("should bind")),
1217 outstanding_repair_requests: Arc::<
1218 RwLock<repair_service::OutstandingShredRepairs>,
1219 >::default(),
1220 cluster_slots: Arc::new(
1221 solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default_for_tests(),
1222 ),
1223 node: None,
1224 banking_control_sender: mpsc::channel(1).0,
1225 snapshot_controller,
1226 blockstore,
1227 }))),
1228 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1229 rpc_to_plugin_manager_sender: None,
1230 };
1231 let mut io = MetaIoHandler::default();
1232 io.extend_with(AdminRpcImpl.to_delegate());
1233
1234 Self {
1235 io,
1236 meta,
1237 bank_forks,
1238 }
1239 }
1240
1241 fn root_bank(&self) -> Arc<Bank> {
1242 self.bank_forks.read().unwrap().root_bank()
1243 }
1244 }
1245
1246 fn new_bank_forks_with_config(
1247 config: BankTestConfig,
1248 ) -> (Arc<RwLock<BankForks>>, Arc<Keypair>) {
1249 let GenesisConfigInfo {
1250 genesis_config,
1251 voting_keypair,
1252 ..
1253 } = create_genesis_config(1_000_000_000);
1254
1255 let bank = Bank::new_with_config_for_tests(&genesis_config, config);
1256 (BankForks::new_rw_arc(bank), Arc::new(voting_keypair))
1257 }
1258
1259 #[test]
1260 fn test_secondary_index_key_sizes() {
1261 for secondary_index_enabled in [true, false] {
1262 let account_indexes = if secondary_index_enabled {
1263 AccountSecondaryIndexes {
1264 keys: None,
1265 indexes: HashSet::from([
1266 AccountIndex::ProgramId,
1267 AccountIndex::SplTokenMint,
1268 AccountIndex::SplTokenOwner,
1269 ]),
1270 }
1271 } else {
1272 AccountSecondaryIndexes::default()
1273 };
1274
1275 let rpc = RpcHandler::start_with_config(TestConfig { account_indexes });
1277
1278 let bank = rpc.root_bank();
1279 let RpcHandler { io, meta, .. } = rpc;
1280
1281 let token_account1_pubkey = Pubkey::new_unique();
1283 let token_account2_pubkey = Pubkey::new_unique();
1284 let token_account3_pubkey = Pubkey::new_unique();
1285 let mint1_pubkey = Pubkey::new_unique();
1286 let mint2_pubkey = Pubkey::new_unique();
1287 let wallet1_pubkey = Pubkey::new_unique();
1288 let wallet2_pubkey = Pubkey::new_unique();
1289 let non_existent_pubkey = Pubkey::new_unique();
1290 let delegate = Pubkey::new_unique();
1291
1292 let mut num_default_spl_token_program_accounts = 0;
1293 let mut num_default_system_program_accounts = 0;
1294
1295 if !secondary_index_enabled {
1296 let req = format!(
1298 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account1_pubkey}"]}}"#,
1299 );
1300 let res = io.handle_request_sync(&req, meta.clone());
1301 let result: Value = serde_json::from_str(&res.expect("actual response"))
1302 .expect("actual response deserialization");
1303 let sizes: HashMap<RpcAccountIndex, usize> =
1304 serde_json::from_value(result["result"].clone()).unwrap();
1305 assert!(sizes.is_empty());
1306 } else {
1307 let req = format!(
1309 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1310 token::id(),
1311 );
1312 let res = io.handle_request_sync(&req, meta.clone());
1313 let result: Value = serde_json::from_str(&res.expect("actual response"))
1314 .expect("actual response deserialization");
1315 let sizes: HashMap<RpcAccountIndex, usize> =
1316 serde_json::from_value(result["result"].clone()).unwrap();
1317 assert_eq!(sizes.len(), 1);
1318 num_default_spl_token_program_accounts =
1319 *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1320 let req = format!(
1322 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1323 system_program::id(),
1324 );
1325 let res = io.handle_request_sync(&req, meta.clone());
1326 let result: Value = serde_json::from_str(&res.expect("actual response"))
1327 .expect("actual response deserialization");
1328 let sizes: HashMap<RpcAccountIndex, usize> =
1329 serde_json::from_value(result["result"].clone()).unwrap();
1330 assert_eq!(sizes.len(), 1);
1331 num_default_system_program_accounts =
1332 *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1333 }
1334
1335 let wallet1_account = AccountSharedData::from(Account {
1337 lamports: 11111111,
1338 owner: system_program::id(),
1339 ..Account::default()
1340 });
1341 bank.store_account(&wallet1_pubkey, &wallet1_account);
1342 let wallet2_account = AccountSharedData::from(Account {
1343 lamports: 11111111,
1344 owner: system_program::id(),
1345 ..Account::default()
1346 });
1347 bank.store_account(&wallet2_pubkey, &wallet2_account);
1348
1349 let mut account1_data = vec![0; TokenAccount::get_packed_len()];
1351 let token_account1 = TokenAccount {
1352 mint: mint1_pubkey,
1353 owner: wallet1_pubkey,
1354 delegate: COption::Some(delegate),
1355 amount: 420,
1356 state: TokenAccountState::Initialized,
1357 is_native: COption::None,
1358 delegated_amount: 30,
1359 close_authority: COption::Some(wallet1_pubkey),
1360 };
1361 TokenAccount::pack(token_account1, &mut account1_data).unwrap();
1362 let token_account1 = AccountSharedData::from(Account {
1363 lamports: 111,
1364 data: account1_data.to_vec(),
1365 owner: token::id(),
1366 ..Account::default()
1367 });
1368 bank.store_account(&token_account1_pubkey, &token_account1);
1369
1370 let mut mint1_data = vec![0; Mint::get_packed_len()];
1372 let mint1_state = Mint {
1373 mint_authority: COption::Some(wallet1_pubkey),
1374 supply: 500,
1375 decimals: 2,
1376 is_initialized: true,
1377 freeze_authority: COption::Some(wallet1_pubkey),
1378 };
1379 Mint::pack(mint1_state, &mut mint1_data).unwrap();
1380 let mint_account1 = AccountSharedData::from(Account {
1381 lamports: 222,
1382 data: mint1_data.to_vec(),
1383 owner: token::id(),
1384 ..Account::default()
1385 });
1386 bank.store_account(&mint1_pubkey, &mint_account1);
1387
1388 let mut account2_data = vec![0; TokenAccount::get_packed_len()];
1390 let token_account2 = TokenAccount {
1391 mint: mint1_pubkey,
1392 owner: wallet2_pubkey,
1393 delegate: COption::Some(delegate),
1394 amount: 420,
1395 state: TokenAccountState::Initialized,
1396 is_native: COption::None,
1397 delegated_amount: 30,
1398 close_authority: COption::Some(wallet2_pubkey),
1399 };
1400 TokenAccount::pack(token_account2, &mut account2_data).unwrap();
1401 let token_account2 = AccountSharedData::from(Account {
1402 lamports: 333,
1403 data: account2_data.to_vec(),
1404 owner: token::id(),
1405 ..Account::default()
1406 });
1407 bank.store_account(&token_account2_pubkey, &token_account2);
1408
1409 let mut account3_data = vec![0; TokenAccount::get_packed_len()];
1411 let token_account3 = TokenAccount {
1412 mint: mint2_pubkey,
1413 owner: wallet2_pubkey,
1414 delegate: COption::Some(delegate),
1415 amount: 42,
1416 state: TokenAccountState::Initialized,
1417 is_native: COption::None,
1418 delegated_amount: 30,
1419 close_authority: COption::Some(wallet2_pubkey),
1420 };
1421 TokenAccount::pack(token_account3, &mut account3_data).unwrap();
1422 let token_account3 = AccountSharedData::from(Account {
1423 lamports: 444,
1424 data: account3_data.to_vec(),
1425 owner: token::id(),
1426 ..Account::default()
1427 });
1428 bank.store_account(&token_account3_pubkey, &token_account3);
1429
1430 let mut mint2_data = vec![0; Mint::get_packed_len()];
1432 let mint2_state = Mint {
1433 mint_authority: COption::Some(wallet2_pubkey),
1434 supply: 200,
1435 decimals: 3,
1436 is_initialized: true,
1437 freeze_authority: COption::Some(wallet2_pubkey),
1438 };
1439 Mint::pack(mint2_state, &mut mint2_data).unwrap();
1440 let mint_account2 = AccountSharedData::from(Account {
1441 lamports: 555,
1442 data: mint2_data.to_vec(),
1443 owner: token::id(),
1444 ..Account::default()
1445 });
1446 bank.store_account(&mint2_pubkey, &mint_account2);
1447
1448 if secondary_index_enabled {
1465 let req = format!(
1467 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{non_existent_pubkey}"]}}"#,
1468 );
1469 let res = io.handle_request_sync(&req, meta.clone());
1470 let result: Value = serde_json::from_str(&res.expect("actual response"))
1471 .expect("actual response deserialization");
1472 let sizes: HashMap<RpcAccountIndex, usize> =
1473 serde_json::from_value(result["result"].clone()).unwrap();
1474 assert!(sizes.is_empty());
1475 let req = format!(
1478 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet1_pubkey}"]}}"#,
1479 );
1480 let res = io.handle_request_sync(&req, meta.clone());
1481 let result: Value = serde_json::from_str(&res.expect("actual response"))
1482 .expect("actual response deserialization");
1483 let sizes: HashMap<RpcAccountIndex, usize> =
1484 serde_json::from_value(result["result"].clone()).unwrap();
1485 assert_eq!(sizes.len(), 1);
1486 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 1);
1487 let req = format!(
1489 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet2_pubkey}"]}}"#,
1490 );
1491 let res = io.handle_request_sync(&req, meta.clone());
1492 let result: Value = serde_json::from_str(&res.expect("actual response"))
1493 .expect("actual response deserialization");
1494 let sizes: HashMap<RpcAccountIndex, usize> =
1495 serde_json::from_value(result["result"].clone()).unwrap();
1496 assert_eq!(sizes.len(), 1);
1497 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 2);
1498 let req = format!(
1500 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint1_pubkey}"]}}"#,
1501 );
1502 let res = io.handle_request_sync(&req, meta.clone());
1503 let result: Value = serde_json::from_str(&res.expect("actual response"))
1504 .expect("actual response deserialization");
1505 let sizes: HashMap<RpcAccountIndex, usize> =
1506 serde_json::from_value(result["result"].clone()).unwrap();
1507 assert_eq!(sizes.len(), 1);
1508 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 2);
1509 let req = format!(
1511 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint2_pubkey}"]}}"#,
1512 );
1513 let res = io.handle_request_sync(&req, meta.clone());
1514 let result: Value = serde_json::from_str(&res.expect("actual response"))
1515 .expect("actual response deserialization");
1516 let sizes: HashMap<RpcAccountIndex, usize> =
1517 serde_json::from_value(result["result"].clone()).unwrap();
1518 assert_eq!(sizes.len(), 1);
1519 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 1);
1520 let req = format!(
1522 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1523 token::id(),
1524 );
1525 let res = io.handle_request_sync(&req, meta.clone());
1526 let result: Value = serde_json::from_str(&res.expect("actual response"))
1527 .expect("actual response deserialization");
1528 let sizes: HashMap<RpcAccountIndex, usize> =
1529 serde_json::from_value(result["result"].clone()).unwrap();
1530 assert_eq!(sizes.len(), 1);
1531 assert_eq!(
1532 *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1533 (num_default_spl_token_program_accounts + 5)
1534 );
1535 let req = format!(
1537 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1538 system_program::id(),
1539 );
1540 let res = io.handle_request_sync(&req, meta.clone());
1541 let result: Value = serde_json::from_str(&res.expect("actual response"))
1542 .expect("actual response deserialization");
1543 let sizes: HashMap<RpcAccountIndex, usize> =
1544 serde_json::from_value(result["result"].clone()).unwrap();
1545 assert_eq!(sizes.len(), 1);
1546 assert_eq!(
1547 *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1548 (num_default_system_program_accounts + 2)
1549 );
1550 } else {
1551 let req = format!(
1553 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account2_pubkey}"]}}"#,
1554 );
1555 let res = io.handle_request_sync(&req, meta.clone());
1556 let result: Value = serde_json::from_str(&res.expect("actual response"))
1557 .expect("actual response deserialization");
1558 let sizes: HashMap<RpcAccountIndex, usize> =
1559 serde_json::from_value(result["result"].clone()).unwrap();
1560 assert!(sizes.is_empty());
1561 }
1562 }
1563 }
1564
1565 #[test]
1568 fn test_set_identity() {
1569 let rpc = RpcHandler::start_with_config(TestConfig::default());
1570
1571 let RpcHandler { io, meta, .. } = rpc;
1572
1573 let expected_validator_id = Keypair::new();
1574 let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1575
1576 let set_id_request = format!(
1577 r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1578 );
1579 let response = io.handle_request_sync(&set_id_request, meta.clone());
1580 let actual_parsed_response: Value =
1581 serde_json::from_str(&response.expect("actual response"))
1582 .expect("actual response deserialization");
1583
1584 let expected_parsed_response: Value = serde_json::from_str(
1585 r#"{
1586 "id": 1,
1587 "jsonrpc": "2.0",
1588 "result": null
1589 }"#,
1590 )
1591 .expect("Failed to parse expected response");
1592 assert_eq!(actual_parsed_response, expected_parsed_response);
1593
1594 let contact_info_request =
1595 r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1596 let response = io.handle_request_sync(&contact_info_request, meta.clone());
1597 let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1598 .expect("actual response deserialization");
1599 let actual_validator_id = parsed_response["result"]["id"]
1600 .as_str()
1601 .expect("Expected a string");
1602 assert_eq!(
1603 actual_validator_id,
1604 expected_validator_id.pubkey().to_string()
1605 );
1606 }
1607
1608 struct TestValidatorWithAdminRpc {
1609 meta: AdminRpcRequestMetadata,
1610 io: MetaIoHandler<AdminRpcRequestMetadata>,
1611 validator_ledger_path: PathBuf,
1612 }
1613
1614 impl TestValidatorWithAdminRpc {
1615 fn new() -> Self {
1616 let leader_keypair = Keypair::new();
1617 let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
1618
1619 let validator_keypair = Keypair::new();
1620 let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
1621 let genesis_config =
1622 create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
1623 .genesis_config;
1624 let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
1625
1626 let voting_keypair = Arc::new(Keypair::new());
1627 let voting_pubkey = voting_keypair.pubkey();
1628 let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1629 let validator_config = ValidatorConfig {
1630 rpc_addrs: Some((
1631 validator_node.info.rpc().unwrap(),
1632 validator_node.info.rpc_pubsub().unwrap(),
1633 )),
1634 ..ValidatorConfig::default_for_test()
1635 };
1636 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1637
1638 let post_init = Arc::new(RwLock::new(None));
1639 let meta = AdminRpcRequestMetadata {
1640 rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
1641 start_time: SystemTime::now(),
1642 start_progress: start_progress.clone(),
1643 validator_exit: validator_config.validator_exit.clone(),
1644 validator_exit_backpressure: HashMap::default(),
1645 authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1646 tower_storage: Arc::new(NullTowerStorage {}),
1647 post_init: post_init.clone(),
1648 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1649 rpc_to_plugin_manager_sender: None,
1650 };
1651
1652 let _validator = Validator::new(
1653 validator_node,
1654 Arc::new(validator_keypair),
1655 &validator_ledger_path,
1656 &voting_pubkey,
1657 authorized_voter_keypairs,
1658 vec![leader_node.info],
1659 &validator_config,
1660 true, None, start_progress.clone(),
1663 SocketAddrSpace::Unspecified,
1664 ValidatorTpuConfig::new_for_tests(),
1665 post_init.clone(),
1666 None,
1667 )
1668 .expect("assume successful validator start");
1669 assert_eq!(
1670 *start_progress.read().unwrap(),
1671 ValidatorStartProgress::Running
1672 );
1673 let post_init = post_init.read().unwrap();
1674
1675 assert!(post_init.is_some());
1676 let post_init = post_init.as_ref().unwrap();
1677 let notifies = post_init.notifies.read().unwrap();
1678 let updater_keys: HashSet<KeyUpdaterType> =
1679 notifies.into_iter().map(|(key, _)| key.clone()).collect();
1680 assert_eq!(
1681 updater_keys,
1682 HashSet::from_iter(vec![
1683 KeyUpdaterType::Tpu,
1684 KeyUpdaterType::TpuForwards,
1685 KeyUpdaterType::TpuVote,
1686 KeyUpdaterType::Forward,
1687 KeyUpdaterType::RpcService,
1688 KeyUpdaterType::Bls,
1689 KeyUpdaterType::BlsConnectionCache,
1690 ])
1691 );
1692 let mut io = MetaIoHandler::default();
1693 io.extend_with(AdminRpcImpl.to_delegate());
1694 Self {
1695 meta,
1696 io,
1697 validator_ledger_path,
1698 }
1699 }
1700
1701 fn handle_request(&self, request: &str) -> Option<String> {
1702 self.io.handle_request_sync(request, self.meta.clone())
1703 }
1704 }
1705
1706 impl Drop for TestValidatorWithAdminRpc {
1707 fn drop(&mut self) {
1708 remove_dir_all(self.validator_ledger_path.clone()).unwrap();
1709 }
1710 }
1711
1712 #[test]
1713 fn test_no_post_init_no_snapshot_controller() {
1714 let validator_exit = create_validator_exit(Arc::new(AtomicBool::new(false)));
1715 let voting_keypair = Arc::new(Keypair::new());
1716 let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1717 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1718
1719 let post_init = Arc::new(RwLock::new(None));
1720 let meta = AdminRpcRequestMetadata {
1721 rpc_addr: None,
1722 start_time: SystemTime::now(),
1723 start_progress: start_progress.clone(),
1724 validator_exit,
1725 validator_exit_backpressure: HashMap::default(),
1726 authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1727 tower_storage: Arc::new(NullTowerStorage {}),
1728 post_init: post_init.clone(),
1729 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1730 rpc_to_plugin_manager_sender: None,
1731 };
1732
1733 let snapshot_controller = meta.snapshot_controller();
1734 assert!(snapshot_controller.is_none());
1735 }
1736
1737 #[test]
1739 fn test_set_identity_with_validator() {
1740 let test_validator = TestValidatorWithAdminRpc::new();
1741 let expected_validator_id = Keypair::new();
1742 let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1743
1744 let set_id_request = format!(
1745 r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1746 );
1747 let response = test_validator.handle_request(&set_id_request);
1748 let actual_parsed_response: Value =
1749 serde_json::from_str(&response.expect("actual response"))
1750 .expect("actual response deserialization");
1751
1752 let expected_parsed_response: Value = serde_json::from_str(
1753 r#"{
1754 "id": 1,
1755 "jsonrpc": "2.0",
1756 "result": null
1757 }"#,
1758 )
1759 .expect("Failed to parse expected response");
1760 assert_eq!(actual_parsed_response, expected_parsed_response);
1761
1762 let contact_info_request =
1763 r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1764 let response = test_validator.handle_request(&contact_info_request);
1765 let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1766 .expect("actual response deserialization");
1767 let actual_validator_id = parsed_response["result"]["id"]
1768 .as_str()
1769 .expect("Expected a string");
1770 assert_eq!(
1771 actual_validator_id,
1772 expected_validator_id.pubkey().to_string()
1773 );
1774
1775 let contact_info_request =
1776 r#"{"jsonrpc":"2.0","id":1,"method":"exit","params":[]}"#.to_string();
1777 let exit_response = test_validator.handle_request(&contact_info_request);
1778 let actual_parsed_response: Value =
1779 serde_json::from_str(&exit_response.expect("actual response"))
1780 .expect("actual response deserialization");
1781 assert_eq!(actual_parsed_response, expected_parsed_response);
1782 }
1783
1784 #[test]
1785 fn test_is_generating_snapshots() {
1786 let rpc = RpcHandler::start_with_config(TestConfig::default());
1788 let RpcHandler { io, meta, .. } = rpc;
1789
1790 let request = r#"{"jsonrpc":"2.0","id":1,"method":"isGeneratingSnapshots","params":[]}"#;
1791 let response = io.handle_request_sync(request, meta.clone());
1792 let result: Value = serde_json::from_str(&response.expect("actual response"))
1793 .expect("actual response deserialization");
1794
1795 assert!(result["result"].is_boolean());
1797 assert!(result["result"].as_bool().unwrap());
1799 }
1800
1801 #[test]
1802 fn test_is_generating_snapshots_no_controller() {
1803 let rpc = RpcHandler::start_with_config(TestConfig::default());
1805 let RpcHandler { io, .. } = rpc;
1806
1807 let request = r#"{"jsonrpc":"2.0","id":1,"method":"isGeneratingSnapshots","params":[]}"#;
1809 let validator_exit = create_validator_exit(Arc::new(AtomicBool::new(false)));
1810 let authorized_voter_keypairs = Arc::new(RwLock::new(vec![Arc::new(Keypair::new())]));
1811 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1812
1813 let meta_no_post_init = AdminRpcRequestMetadata {
1814 rpc_addr: None,
1815 start_time: SystemTime::now(),
1816 start_progress,
1817 validator_exit,
1818 validator_exit_backpressure: HashMap::default(),
1819 authorized_voter_keypairs,
1820 tower_storage: Arc::new(NullTowerStorage {}),
1821 post_init: Arc::new(RwLock::new(None)),
1822 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1823 rpc_to_plugin_manager_sender: None,
1824 };
1825
1826 let response = io.handle_request_sync(request, meta_no_post_init);
1827 let result: Value = serde_json::from_str(&response.expect("actual response"))
1828 .expect("actual response deserialization");
1829
1830 assert!(result["error"].is_object());
1832 assert_eq!(
1833 result["error"]["message"].as_str().unwrap(),
1834 "snapshot_controller unavailable"
1835 );
1836 }
1837}