1use {
2 crossbeam_channel::Sender,
3 jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
4 jsonrpc_core_client::{transports::ipc, RpcError},
5 jsonrpc_derive::rpc,
6 jsonrpc_ipc_server::{
7 tokio::sync::oneshot::channel as oneshot_channel, RequestContext, ServerBuilder,
8 },
9 log::*,
10 serde::{de::Deserializer, Deserialize, Serialize},
11 solana_accounts_db::accounts_index::AccountIndex,
12 solana_core::{
13 admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
14 banking_stage::{
15 transaction_scheduler::scheduler_controller::SchedulerConfig, BankingStage,
16 },
17 consensus::{tower_storage::TowerStorage, Tower},
18 repair::repair_service,
19 validator::{
20 BlockProductionMethod, SchedulerPacing, TransactionStructure, ValidatorStartProgress,
21 },
22 },
23 solana_geyser_plugin_manager::GeyserPluginManagerRequest,
24 solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
25 solana_keypair::{read_keypair_file, Keypair},
26 solana_pubkey::Pubkey,
27 solana_rpc::rpc::verify_pubkey,
28 solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
29 solana_signer::Signer,
30 solana_validator_exit::Exit,
31 std::{
32 collections::{HashMap, HashSet},
33 env, error,
34 fmt::{self, Display},
35 net::{IpAddr, SocketAddr},
36 num::NonZeroUsize,
37 path::{Path, PathBuf},
38 sync::{
39 atomic::{AtomicBool, Ordering},
40 Arc, RwLock,
41 },
42 thread::{self, Builder},
43 time::{Duration, SystemTime},
44 },
45 tokio::runtime::Runtime,
46};
47
/// Shared state handed to every admin RPC request handler.
///
/// The struct is `Clone`; the server's meta extractor clones it for each incoming request.
#[derive(Clone)]
pub struct AdminRpcRequestMetadata {
    /// Address returned by the `rpcAddress` method, if any.
    pub rpc_addr: Option<SocketAddr>,
    /// Timestamp returned by the `startTime` method.
    pub start_time: SystemTime,
    /// Value returned by the `startProgress` method.
    pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
    /// Exit handle triggered by the `exit` method; the server also registers its close hook here.
    pub validator_exit: Arc<RwLock<Exit>>,
    /// Flags keyed by service name; `exit` polls them and waits until none is raised.
    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
    /// Keypairs managed by the add/remove authorized voter methods.
    pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
    /// Storage consulted when an identity change is requested with `require_tower` set.
    pub tower_storage: Arc<dyn TowerStorage>,
    /// Map whose contents are replaced by `setStakedNodesOverrides`.
    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
    /// State required by many methods; while it is `None` those methods return a retry error.
    pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
    /// Channel to the geyser plugin manager; when `None` the plugin methods return an error.
    pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
}
61
// Marker impl: lets `AdminRpcRequestMetadata` be used as the metadata type of the RPC handler.
impl Metadata for AdminRpcRequestMetadata {}
63
64impl AdminRpcRequestMetadata {
65 fn with_post_init<F, R>(&self, func: F) -> Result<R>
66 where
67 F: FnOnce(&AdminRpcRequestMetadataPostInit) -> Result<R>,
68 {
69 if let Some(post_init) = self.post_init.read().unwrap().as_ref() {
70 func(post_init)
71 } else {
72 Err(jsonrpc_core::error::Error::invalid_params(
73 "Retry once validator start up is complete",
74 ))
75 }
76 }
77}
78
/// Serializable snapshot of a node's contact info, as returned by the `contactInfo` method.
///
/// When built from a `ContactInfo`, any socket the node does not provide is filled with
/// `SOCKET_ADDR_UNSPECIFIED`.
#[derive(Debug, Deserialize, Serialize)]
pub struct AdminRpcContactInfo {
    /// Node public key, rendered as a string.
    pub id: String,
    /// Gossip socket.
    pub gossip: SocketAddr,
    /// TVU socket (UDP).
    pub tvu: SocketAddr,
    /// TVU socket (QUIC).
    pub tvu_quic: SocketAddr,
    /// Serve-repair socket (QUIC).
    pub serve_repair_quic: SocketAddr,
    /// TPU socket (UDP).
    pub tpu: SocketAddr,
    /// TPU forwards socket (UDP).
    pub tpu_forwards: SocketAddr,
    /// TPU vote socket (UDP).
    pub tpu_vote: SocketAddr,
    /// RPC socket.
    pub rpc: SocketAddr,
    /// RPC pubsub socket.
    pub rpc_pubsub: SocketAddr,
    /// Serve-repair socket (UDP).
    pub serve_repair: SocketAddr,
    /// The contact info's wallclock value.
    pub last_updated_timestamp: u64,
    /// The contact info's shred version.
    pub shred_version: u16,
}
95
/// Response payload of the `repairWhitelist` method.
#[derive(Debug, Deserialize, Serialize)]
pub struct AdminRpcRepairWhitelist {
    /// Public keys currently in the repair whitelist.
    pub whitelist: Vec<Pubkey>,
}
100
101impl From<ContactInfo> for AdminRpcContactInfo {
102 fn from(node: ContactInfo) -> Self {
103 macro_rules! unwrap_socket {
104 ($name:ident) => {
105 node.$name().unwrap_or(SOCKET_ADDR_UNSPECIFIED)
106 };
107 ($name:ident, $protocol:expr) => {
108 node.$name($protocol).unwrap_or(SOCKET_ADDR_UNSPECIFIED)
109 };
110 }
111 Self {
112 id: node.pubkey().to_string(),
113 last_updated_timestamp: node.wallclock(),
114 gossip: unwrap_socket!(gossip),
115 tvu: unwrap_socket!(tvu, Protocol::UDP),
116 tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
117 serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
118 tpu: unwrap_socket!(tpu, Protocol::UDP),
119 tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
120 tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
121 rpc: unwrap_socket!(rpc),
122 rpc_pubsub: unwrap_socket!(rpc_pubsub),
123 serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
124 shred_version: node.shred_version(),
125 }
126 }
127}
128
129impl Display for AdminRpcContactInfo {
130 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
131 writeln!(f, "Identity: {}", self.id)?;
132 writeln!(f, "Gossip: {}", self.gossip)?;
133 writeln!(f, "TVU: {}", self.tvu)?;
134 writeln!(f, "TVU QUIC: {}", self.tvu_quic)?;
135 writeln!(f, "TPU: {}", self.tpu)?;
136 writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;
137 writeln!(f, "TPU Votes: {}", self.tpu_vote)?;
138 writeln!(f, "RPC: {}", self.rpc)?;
139 writeln!(f, "RPC Pubsub: {}", self.rpc_pubsub)?;
140 writeln!(f, "Serve Repair: {}", self.serve_repair)?;
141 writeln!(f, "Last Updated Timestamp: {}", self.last_updated_timestamp)?;
142 writeln!(f, "Shred Version: {}", self.shred_version)
143 }
144}
// Empty impls: no trait methods are overridden for the verbose and quiet output modes.
impl solana_cli_output::VerboseDisplay for AdminRpcContactInfo {}
impl solana_cli_output::QuietDisplay for AdminRpcContactInfo {}
147
148impl Display for AdminRpcRepairWhitelist {
149 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
150 writeln!(f, "Repair whitelist: {:?}", &self.whitelist)
151 }
152}
// Empty impls: no trait methods are overridden for the verbose and quiet output modes.
impl solana_cli_output::VerboseDisplay for AdminRpcRepairWhitelist {}
impl solana_cli_output::QuietDisplay for AdminRpcRepairWhitelist {}
155
/// JSON-RPC interface of the validator's admin service.
///
/// Methods marked `meta` receive the per-request [`AdminRpcRequestMetadata`].
#[rpc]
pub trait AdminRpc {
    type Metadata;

    /// Requests validator exit. The shutdown runs on a background thread, so this call
    /// returns before the validator has stopped.
    #[rpc(meta, name = "exit")]
    fn exit(&self, meta: Self::Metadata) -> Result<()>;

    /// Returns the process id of the running process.
    #[rpc(meta, name = "pid")]
    fn pid(&self, meta: Self::Metadata) -> Result<u32>;

    /// Asks the geyser plugin manager to reload plugin `name` with `config_file`.
    #[rpc(meta, name = "reloadPlugin")]
    fn reload_plugin(
        &self,
        meta: Self::Metadata,
        name: String,
        config_file: String,
    ) -> BoxFuture<Result<()>>;

    /// Asks the geyser plugin manager to unload plugin `name`.
    #[rpc(meta, name = "unloadPlugin")]
    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;

    /// Asks the geyser plugin manager to load a plugin from `config_file`; resolves to the
    /// string the manager replies with.
    #[rpc(meta, name = "loadPlugin")]
    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;

    /// Asks the geyser plugin manager for its plugin list.
    #[rpc(meta, name = "listPlugins")]
    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;

    /// Returns the RPC address stored in the metadata, if any.
    #[rpc(meta, name = "rpcAddress")]
    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;

    /// Reconfigures logging with the given filter string.
    #[rpc(name = "setLogFilter")]
    fn set_log_filter(&self, filter: String) -> Result<()>;

    /// Returns the start time stored in the metadata.
    #[rpc(meta, name = "startTime")]
    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;

    /// Returns the current start progress value.
    #[rpc(meta, name = "startProgress")]
    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;

    /// Adds the keypair read from `keypair_file` to the authorized voters.
    #[rpc(meta, name = "addAuthorizedVoter")]
    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;

    /// Adds the keypair decoded from the `keypair` bytes to the authorized voters.
    #[rpc(meta, name = "addAuthorizedVoterFromBytes")]
    fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec<u8>)
        -> Result<()>;

    /// Clears the list of authorized voters.
    #[rpc(meta, name = "removeAllAuthorizedVoters")]
    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;

    /// Switches the validator identity to the keypair read from `keypair_file`; with
    /// `require_tower` set, a tower for that identity must be restorable first.
    #[rpc(meta, name = "setIdentity")]
    fn set_identity(
        &self,
        meta: Self::Metadata,
        keypair_file: String,
        require_tower: bool,
    ) -> Result<()>;

    /// Same as `setIdentity`, but the keypair is supplied as raw bytes.
    #[rpc(meta, name = "setIdentityFromBytes")]
    fn set_identity_from_bytes(
        &self,
        meta: Self::Metadata,
        identity_keypair: Vec<u8>,
        require_tower: bool,
    ) -> Result<()>;

    /// Replaces the staked nodes overrides with the configuration loaded from `path`.
    #[rpc(meta, name = "setStakedNodesOverrides")]
    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;

    /// Returns this node's contact info.
    #[rpc(meta, name = "contactInfo")]
    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;

    /// Switches the node's active interface to `interface`.
    #[rpc(meta, name = "selectActiveInterface")]
    fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()>;

    /// Requests repair of the shred at (`slot`, `shred_index`), optionally from `pubkey`.
    #[rpc(meta, name = "repairShredFromPeer")]
    fn repair_shred_from_peer(
        &self,
        meta: Self::Metadata,
        pubkey: Option<Pubkey>,
        slot: u64,
        shred_index: u64,
    ) -> Result<()>;

    /// Returns the current repair whitelist.
    #[rpc(meta, name = "repairWhitelist")]
    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;

    /// Replaces the repair whitelist with `whitelist`.
    #[rpc(meta, name = "setRepairWhitelist")]
    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()>;

    /// Returns, per enabled secondary index, the size recorded for the key `pubkey_str`.
    #[rpc(meta, name = "getSecondaryIndexKeySize")]
    fn get_secondary_index_key_size(
        &self,
        meta: Self::Metadata,
        pubkey_str: String,
    ) -> Result<HashMap<RpcAccountIndex, usize>>;

    /// Sets the node's TPU address to `public_tpu_addr`.
    #[rpc(meta, name = "setPublicTpuAddress")]
    fn set_public_tpu_address(
        &self,
        meta: Self::Metadata,
        public_tpu_addr: SocketAddr,
    ) -> Result<()>;

    /// Sets the node's TPU forwards address to `public_tpu_forwards_addr`.
    #[rpc(meta, name = "setPublicTpuForwardsAddress")]
    fn set_public_tpu_forwards_address(
        &self,
        meta: Self::Metadata,
        public_tpu_forwards_addr: SocketAddr,
    ) -> Result<()>;

    /// Respawns the banking stage's internal threads with the given production settings.
    #[rpc(meta, name = "manageBlockProduction")]
    fn manage_block_production(
        &self,
        meta: Self::Metadata,
        block_production_method: BlockProductionMethod,
        transaction_struct: TransactionStructure,
        num_workers: NonZeroUsize,
        scheduler_pacing: SchedulerPacing,
    ) -> Result<()>;
}
279
/// Stateless implementation of [`AdminRpc`]; all state arrives through the request metadata.
pub struct AdminRpcImpl;
281impl AdminRpc for AdminRpcImpl {
282 type Metadata = AdminRpcRequestMetadata;
283
284 fn exit(&self, meta: Self::Metadata) -> Result<()> {
285 debug!("exit admin rpc request received");
286
287 thread::Builder::new()
288 .name("solProcessExit".into())
289 .spawn(move || {
290 thread::sleep(Duration::from_millis(100));
293
294 info!("validator exit requested");
295 meta.validator_exit.write().unwrap().exit();
296
297 if !meta.validator_exit_backpressure.is_empty() {
298 let service_names = meta.validator_exit_backpressure.keys();
299 info!("Wait for these services to complete: {service_names:?}");
300 loop {
301 thread::sleep(Duration::from_secs(1));
305
306 let mut any_flags_raised = false;
307 for (name, flag) in meta.validator_exit_backpressure.iter() {
308 let is_flag_raised = flag.load(Ordering::Relaxed);
309 if is_flag_raised {
310 info!("{name}'s exit backpressure flag is raised");
311 any_flags_raised = true;
312 }
313 }
314 if !any_flags_raised {
315 break;
316 }
317 }
318 info!("All services have completed");
319 }
320
321 thread::sleep(Duration::from_secs(
326 env::var("SOLANA_VALIDATOR_EXIT_TIMEOUT")
327 .ok()
328 .and_then(|x| x.parse().ok())
329 .unwrap_or(5),
330 ));
331 warn!("validator exit timeout");
332 std::process::exit(0);
333 })
334 .unwrap();
335
336 Ok(())
337 }
338
339 fn pid(&self, _meta: Self::Metadata) -> Result<u32> {
340 Ok(std::process::id())
341 }
342
343 fn reload_plugin(
344 &self,
345 meta: Self::Metadata,
346 name: String,
347 config_file: String,
348 ) -> BoxFuture<Result<()>> {
349 Box::pin(async move {
350 let (response_sender, response_receiver) = oneshot_channel();
352
353 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
355 rpc_to_manager_sender
356 .send(GeyserPluginManagerRequest::ReloadPlugin {
357 name,
358 config_file,
359 response_sender,
360 })
361 .expect("GeyerPluginService should never drop request receiver");
362 } else {
363 return Err(jsonrpc_core::Error {
364 code: ErrorCode::InvalidRequest,
365 message: "No geyser plugin service".to_string(),
366 data: None,
367 });
368 }
369
370 response_receiver
372 .await
373 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
374 })
375 }
376
377 fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
378 Box::pin(async move {
379 let (response_sender, response_receiver) = oneshot_channel();
381
382 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
384 rpc_to_manager_sender
385 .send(GeyserPluginManagerRequest::LoadPlugin {
386 config_file,
387 response_sender,
388 })
389 .expect("GeyerPluginService should never drop request receiver");
390 } else {
391 return Err(jsonrpc_core::Error {
392 code: ErrorCode::InvalidRequest,
393 message: "No geyser plugin service".to_string(),
394 data: None,
395 });
396 }
397
398 response_receiver
400 .await
401 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
402 })
403 }
404
405 fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
406 Box::pin(async move {
407 let (response_sender, response_receiver) = oneshot_channel();
409
410 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
412 rpc_to_manager_sender
413 .send(GeyserPluginManagerRequest::UnloadPlugin {
414 name,
415 response_sender,
416 })
417 .expect("GeyerPluginService should never drop request receiver");
418 } else {
419 return Err(jsonrpc_core::Error {
420 code: ErrorCode::InvalidRequest,
421 message: "No geyser plugin service".to_string(),
422 data: None,
423 });
424 }
425
426 response_receiver
428 .await
429 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
430 })
431 }
432
433 fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
434 Box::pin(async move {
435 let (response_sender, response_receiver) = oneshot_channel();
437
438 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
440 rpc_to_manager_sender
441 .send(GeyserPluginManagerRequest::ListPlugins { response_sender })
442 .expect("GeyerPluginService should never drop request receiver");
443 } else {
444 return Err(jsonrpc_core::Error {
445 code: ErrorCode::InvalidRequest,
446 message: "No geyser plugin service".to_string(),
447 data: None,
448 });
449 }
450
451 response_receiver
453 .await
454 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
455 })
456 }
457
458 fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
459 debug!("rpc_addr admin rpc request received");
460 Ok(meta.rpc_addr)
461 }
462
/// Reconfigures logging with the given filter string.
///
/// Takes no metadata; `filter` is passed by reference to `agave_logger::setup_with`.
fn set_log_filter(&self, filter: String) -> Result<()> {
    debug!("set_log_filter admin rpc request received");
    agave_logger::setup_with(&filter);
    Ok(())
}
468
469 fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
470 debug!("start_time admin rpc request received");
471 Ok(meta.start_time)
472 }
473
474 fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
475 debug!("start_progress admin rpc request received");
476 Ok(*meta.start_progress.read().unwrap())
477 }
478
479 fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
480 debug!("add_authorized_voter request received");
481
482 let authorized_voter = read_keypair_file(keypair_file)
483 .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{err}")))?;
484
485 AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
486 }
487
488 fn add_authorized_voter_from_bytes(
489 &self,
490 meta: Self::Metadata,
491 keypair: Vec<u8>,
492 ) -> Result<()> {
493 debug!("add_authorized_voter_from_bytes request received");
494
495 let authorized_voter = Keypair::try_from(keypair.as_ref()).map_err(|err| {
496 jsonrpc_core::error::Error::invalid_params(format!(
497 "Failed to read authorized voter keypair from provided byte array: {err}"
498 ))
499 })?;
500
501 AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
502 }
503
504 fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
505 debug!("remove_all_authorized_voters received");
506 meta.authorized_voter_keypairs.write().unwrap().clear();
507 Ok(())
508 }
509
510 fn set_identity(
511 &self,
512 meta: Self::Metadata,
513 keypair_file: String,
514 require_tower: bool,
515 ) -> Result<()> {
516 debug!("set_identity request received");
517
518 let identity_keypair = read_keypair_file(&keypair_file).map_err(|err| {
519 jsonrpc_core::error::Error::invalid_params(format!(
520 "Failed to read identity keypair from {keypair_file}: {err}"
521 ))
522 })?;
523
524 AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
525 }
526
527 fn set_identity_from_bytes(
528 &self,
529 meta: Self::Metadata,
530 identity_keypair: Vec<u8>,
531 require_tower: bool,
532 ) -> Result<()> {
533 debug!("set_identity_from_bytes request received");
534
535 let identity_keypair = Keypair::try_from(identity_keypair.as_ref()).map_err(|err| {
536 jsonrpc_core::error::Error::invalid_params(format!(
537 "Failed to read identity keypair from provided byte array: {err}"
538 ))
539 })?;
540
541 AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
542 }
543
544 fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
545 let loaded_config = load_staked_nodes_overrides(&path)
546 .map_err(|err| {
547 error!(
548 "Failed to load staked nodes overrides from {}: {}",
549 &path, err
550 );
551 jsonrpc_core::error::Error::internal_error()
552 })?
553 .staked_map_id;
554 let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
555 write_staked_nodes.clear();
556 write_staked_nodes.extend(loaded_config);
557 info!("Staked nodes overrides loaded from {path}");
558 debug!("overrides map: {write_staked_nodes:?}");
559 Ok(())
560 }
561
562 fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
563 meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
564 }
565
566 fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()> {
567 debug!("select_active_interface received: {interface}");
568 meta.with_post_init(|post_init| {
569 let node = post_init.node.as_ref().ok_or_else(|| {
570 jsonrpc_core::Error::invalid_params("`Node` not initialized in post_init")
571 })?;
572
573 node.switch_active_interface(interface, &post_init.cluster_info)
574 .map_err(|e| {
575 jsonrpc_core::Error::invalid_params(format!(
576 "Switching failed due to error {e}"
577 ))
578 })?;
579 info!("Switched primary interface to {interface}");
580 Ok(())
581 })
582 }
583
584 fn repair_shred_from_peer(
585 &self,
586 meta: Self::Metadata,
587 pubkey: Option<Pubkey>,
588 slot: u64,
589 shred_index: u64,
590 ) -> Result<()> {
591 debug!("repair_shred_from_peer request received");
592
593 meta.with_post_init(|post_init| {
594 repair_service::RepairService::request_repair_for_shred_from_peer(
595 post_init.cluster_info.clone(),
596 post_init.cluster_slots.clone(),
597 pubkey,
598 slot,
599 shred_index,
600 &post_init.repair_socket.clone(),
601 post_init.outstanding_repair_requests.clone(),
602 );
603 Ok(())
604 })
605 }
606
607 fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
608 debug!("repair_whitelist request received");
609
610 meta.with_post_init(|post_init| {
611 let whitelist: Vec<_> = post_init
612 .repair_whitelist
613 .read()
614 .unwrap()
615 .iter()
616 .copied()
617 .collect();
618 Ok(AdminRpcRepairWhitelist { whitelist })
619 })
620 }
621
622 fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()> {
623 debug!("set_repair_whitelist request received");
624
625 let whitelist: HashSet<Pubkey> = whitelist.into_iter().collect();
626 meta.with_post_init(|post_init| {
627 *post_init.repair_whitelist.write().unwrap() = whitelist;
628 warn!(
629 "Repair whitelist set to {:?}",
630 &post_init.repair_whitelist.read().unwrap()
631 );
632 Ok(())
633 })
634 }
635
636 fn get_secondary_index_key_size(
637 &self,
638 meta: Self::Metadata,
639 pubkey_str: String,
640 ) -> Result<HashMap<RpcAccountIndex, usize>> {
641 debug!("get_secondary_index_key_size rpc request received: {pubkey_str:?}");
642 let index_key = verify_pubkey(&pubkey_str)?;
643 meta.with_post_init(|post_init| {
644 let bank = post_init.bank_forks.read().unwrap().root_bank();
645
646 let enabled_account_indexes = &bank.accounts().accounts_db.account_indexes;
648
649 if enabled_account_indexes.is_empty() {
651 debug!("get_secondary_index_key_size: secondary index not enabled.");
652 return Ok(HashMap::new());
653 };
654
655 if !enabled_account_indexes.include_key(&index_key) {
657 return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
658 index_key: index_key.to_string(),
659 }
660 .into());
661 }
662
663 let accounts_index = &bank.accounts().accounts_db.accounts_index;
665
666 let found_sizes = enabled_account_indexes
668 .indexes
669 .iter()
670 .filter_map(|index| {
671 accounts_index
672 .get_index_key_size(index, &index_key)
673 .map(|size| (rpc_account_index_from_account_index(index), size))
674 })
675 .collect::<HashMap<_, _>>();
676
677 if found_sizes.is_empty() {
679 debug!("get_secondary_index_key_size: key not found in the secondary index.");
680 }
681 Ok(found_sizes)
682 })
683 }
684
685 fn set_public_tpu_address(
686 &self,
687 meta: Self::Metadata,
688 public_tpu_addr: SocketAddr,
689 ) -> Result<()> {
690 debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
691
692 meta.with_post_init(|post_init| {
693 post_init
694 .cluster_info
695 .my_contact_info()
696 .tpu(Protocol::UDP)
697 .ok_or_else(|| {
698 error!(
699 "The public TPU address isn't being published. The node is likely in \
700 repair mode. See help for --restricted-repair-only-mode for more \
701 information."
702 );
703 jsonrpc_core::error::Error::internal_error()
704 })?;
705 post_init
706 .cluster_info
707 .set_tpu(public_tpu_addr)
708 .map_err(|err| {
709 error!("Failed to set public TPU address to {public_tpu_addr}: {err}");
710 jsonrpc_core::error::Error::internal_error()
711 })?;
712 let my_contact_info = post_init.cluster_info.my_contact_info();
713 warn!(
714 "Public TPU addresses set to {:?} (udp) and {:?} (quic)",
715 my_contact_info.tpu(Protocol::UDP),
716 my_contact_info.tpu(Protocol::QUIC),
717 );
718 Ok(())
719 })
720 }
721
722 fn set_public_tpu_forwards_address(
723 &self,
724 meta: Self::Metadata,
725 public_tpu_forwards_addr: SocketAddr,
726 ) -> Result<()> {
727 debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
728
729 meta.with_post_init(|post_init| {
730 post_init
731 .cluster_info
732 .my_contact_info()
733 .tpu_forwards(Protocol::UDP)
734 .ok_or_else(|| {
735 error!(
736 "The public TPU Forwards address isn't being published. The node is \
737 likely in repair mode. See help for --restricted-repair-only-mode for \
738 more information."
739 );
740 jsonrpc_core::error::Error::internal_error()
741 })?;
742 post_init
743 .cluster_info
744 .set_tpu_forwards(public_tpu_forwards_addr)
745 .map_err(|err| {
746 error!("Failed to set public TPU address to {public_tpu_forwards_addr}: {err}");
747 jsonrpc_core::error::Error::internal_error()
748 })?;
749 let my_contact_info = post_init.cluster_info.my_contact_info();
750 warn!(
751 "Public TPU Forwards addresses set to {:?} (udp) and {:?} (quic)",
752 my_contact_info.tpu_forwards(Protocol::UDP),
753 my_contact_info.tpu_forwards(Protocol::QUIC),
754 );
755 Ok(())
756 })
757 }
758
759 fn manage_block_production(
760 &self,
761 meta: Self::Metadata,
762 block_production_method: BlockProductionMethod,
763 transaction_struct: TransactionStructure,
764 num_workers: NonZeroUsize,
765 scheduler_pacing: SchedulerPacing,
766 ) -> Result<()> {
767 debug!("manage_block_production rpc request received");
768
769 if num_workers > BankingStage::max_num_workers() {
770 return Err(jsonrpc_core::error::Error::invalid_params(format!(
771 "Number of workers ({}) exceeds maximum allowed ({})",
772 num_workers,
773 BankingStage::max_num_workers()
774 )));
775 }
776
777 if transaction_struct != TransactionStructure::View {
778 warn!("TransactionStructure::Sdk has no effect on block production");
779 }
780
781 meta.with_post_init(|post_init| {
782 let mut banking_stage = post_init.banking_stage.write().unwrap();
783 let Some(banking_stage) = banking_stage.as_mut() else {
784 error!("banking stage is not initialized");
785 return Err(jsonrpc_core::error::Error::internal_error());
786 };
787
788 banking_stage
789 .spawn_internal_threads(
790 block_production_method,
791 num_workers,
792 SchedulerConfig { scheduler_pacing },
793 )
794 .map_err(|err| {
795 error!("Failed to spawn new non-vote threads: {err:?}");
796 jsonrpc_core::error::Error::internal_error()
797 })?;
798
799 Ok(())
800 })
801 }
802}
803
804impl AdminRpcImpl {
805 fn add_authorized_voter_keypair(
806 meta: AdminRpcRequestMetadata,
807 authorized_voter: Keypair,
808 ) -> Result<()> {
809 let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
810
811 if authorized_voter_keypairs
812 .iter()
813 .any(|x| x.pubkey() == authorized_voter.pubkey())
814 {
815 Err(jsonrpc_core::error::Error::invalid_params(
816 "Authorized voter already present",
817 ))
818 } else {
819 authorized_voter_keypairs.push(Arc::new(authorized_voter));
820 Ok(())
821 }
822 }
823
824 fn set_identity_keypair(
825 meta: AdminRpcRequestMetadata,
826 identity_keypair: Keypair,
827 require_tower: bool,
828 ) -> Result<()> {
829 meta.with_post_init(|post_init| {
830 if require_tower {
831 let _ = Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey())
832 .map_err(|err| {
833 jsonrpc_core::error::Error::invalid_params(format!(
834 "Unable to load tower file for identity {}: {}",
835 identity_keypair.pubkey(),
836 err
837 ))
838 })?;
839 }
840
841 for (key, notifier) in &*post_init.notifies.read().unwrap() {
842 if let Err(err) = notifier.update_key(&identity_keypair) {
843 error!("Error updating network layer keypair: {err} on {key:?}");
844 }
845 }
846
847 solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
848 post_init
849 .cluster_info
850 .set_keypair(Arc::new(identity_keypair));
851 warn!("Identity set to {}", post_init.cluster_info.id());
852 Ok(())
853 })
854 }
855}
856
857fn rpc_account_index_from_account_index(account_index: &AccountIndex) -> RpcAccountIndex {
858 match account_index {
859 AccountIndex::ProgramId => RpcAccountIndex::ProgramId,
860 AccountIndex::SplTokenOwner => RpcAccountIndex::SplTokenOwner,
861 AccountIndex::SplTokenMint => RpcAccountIndex::SplTokenMint,
862 }
863}
864
865pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
867 let admin_rpc_path = admin_rpc_path(ledger_path);
868
869 let event_loop = tokio::runtime::Builder::new_multi_thread()
870 .thread_name("solAdminRpcEl")
871 .worker_threads(3) .enable_all()
873 .build()
874 .unwrap();
875
876 Builder::new()
877 .name("solAdminRpc".to_string())
878 .spawn(move || {
879 let mut io = MetaIoHandler::default();
880 io.extend_with(AdminRpcImpl.to_delegate());
881
882 let validator_exit = metadata.validator_exit.clone();
883 let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
884 metadata.clone()
885 })
886 .event_loop_executor(event_loop.handle().clone())
887 .start(&format!("{}", admin_rpc_path.display()));
888
889 match server {
890 Err(err) => {
891 warn!("Unable to start admin rpc service: {err:?}");
892 }
893 Ok(server) => {
894 info!("started admin rpc service!");
895 let close_handle = server.close_handle();
896 validator_exit
897 .write()
898 .unwrap()
899 .register_exit(Box::new(move || {
900 close_handle.close();
901 }));
902
903 server.wait();
904 }
905 }
906 })
907 .unwrap();
908}
909
/// Returns the IPC endpoint path of the admin RPC service for `ledger_path`.
///
/// On Windows this is a named pipe derived from the ledger directory's file name (or a
/// fixed pipe name when there is none); elsewhere it is `admin.rpc` inside the ledger
/// directory.
fn admin_rpc_path(ledger_path: &Path) -> PathBuf {
    #[cfg(target_family = "windows")]
    {
        let pipe_name = match ledger_path.file_name() {
            Some(ledger_filename) => format!(
                "\\\\.\\pipe\\{}-admin.rpc",
                ledger_filename.to_string_lossy()
            ),
            None => String::from("\\\\.\\pipe\\admin.rpc"),
        };
        PathBuf::from(pipe_name)
    }
    #[cfg(not(target_family = "windows"))]
    {
        let mut socket_path = ledger_path.to_path_buf();
        socket_path.push("admin.rpc");
        socket_path
    }
}
929
930pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
932 let admin_rpc_path = admin_rpc_path(ledger_path);
933 if !admin_rpc_path.exists() {
934 Err(RpcError::Client(format!(
935 "{} does not exist",
936 admin_rpc_path.display()
937 )))
938 } else {
939 ipc::connect::<_, gen_client::Client>(&format!("{}", admin_rpc_path.display())).await
940 }
941}
942
943pub fn runtime() -> Runtime {
945 tokio::runtime::Builder::new_multi_thread()
946 .thread_name("solAdminRpcRt")
947 .enable_all()
948 .worker_threads(2)
951 .build()
952 .expect("new tokio runtime")
953}
954
/// Staked nodes overrides configuration, as deserialized from a file.
#[derive(Default, Deserialize, Clone)]
pub struct StakedNodesOverrides {
    /// Override value per node public key; keys are parsed from strings in the input.
    #[serde(deserialize_with = "deserialize_pubkey_map")]
    pub staked_map_id: HashMap<Pubkey, u64>,
}
960
961pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
962where
963 D: Deserializer<'de>,
964{
965 let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
966 let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
967 for (key, value) in container.iter() {
968 let typed_key = Pubkey::try_from(key.as_str())
969 .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
970 container_typed.insert(typed_key, *value);
971 }
972 Ok(container_typed)
973}
974
975pub fn load_staked_nodes_overrides(
976 path: &String,
977) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
978 debug!("Loading staked nodes overrides configuration from {path}");
979 if Path::new(&path).exists() {
980 let file = std::fs::File::open(path)?;
981 Ok(serde_yaml::from_reader(file)?)
982 } else {
983 Err(format!("Staked nodes overrides provided '{path}' a non-existing file path.").into())
984 }
985}
986
987#[cfg(test)]
988mod tests {
989 use {
990 super::*,
991 serde_json::Value,
992 solana_account::{Account, AccountSharedData},
993 solana_accounts_db::{
994 accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
995 accounts_index::AccountSecondaryIndexes,
996 },
997 solana_core::{
998 admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
999 consensus::tower_storage::NullTowerStorage,
1000 validator::{Validator, ValidatorConfig, ValidatorTpuConfig},
1001 },
1002 solana_gossip::{cluster_info::ClusterInfo, node::Node},
1003 solana_ledger::{
1004 create_new_tmp_ledger,
1005 genesis_utils::{
1006 create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo,
1007 },
1008 },
1009 solana_net_utils::sockets::bind_to_localhost_unique,
1010 solana_program_option::COption,
1011 solana_program_pack::Pack,
1012 solana_pubkey::Pubkey,
1013 solana_rpc::rpc::create_validator_exit,
1014 solana_runtime::{
1015 bank::{Bank, BankTestConfig},
1016 bank_forks::BankForks,
1017 },
1018 solana_streamer::socket::SocketAddrSpace,
1019 solana_system_interface::program as system_program,
1020 solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
1021 spl_generic_token::token,
1022 spl_token_2022_interface::state::{
1023 Account as TokenAccount, AccountState as TokenAccountState, Mint,
1024 },
1025 std::{collections::HashSet, fs::remove_dir_all, sync::atomic::AtomicBool},
1026 };
1027
/// Knobs for building a test [`RpcHandler`].
#[derive(Default)]
struct TestConfig {
    /// Secondary index configuration passed into the test bank's accounts db config.
    account_indexes: AccountSecondaryIndexes,
}
1032
/// Test fixture bundling an RPC handler with the metadata and bank forks behind it.
struct RpcHandler {
    /// Handler with the `AdminRpcImpl` delegate registered.
    io: MetaIoHandler<AdminRpcRequestMetadata>,
    /// Metadata built for the test.
    meta: AdminRpcRequestMetadata,
    /// Bank forks also referenced from the metadata's post-init state.
    bank_forks: Arc<RwLock<BankForks>>,
}
1038
impl RpcHandler {
    /// Builds a handler with the default test configuration.
    fn _start() -> Self {
        Self::start_with_config(TestConfig::default())
    }

    /// Builds a handler whose bank uses the secondary indexes from `config`.
    ///
    /// The metadata is created with populated post-init state, a null tower storage, no
    /// node, no banking stage and no geyser plugin channel.
    fn start_with_config(config: TestConfig) -> Self {
        let keypair = Arc::new(Keypair::new());
        let cluster_info = Arc::new(ClusterInfo::new(
            ContactInfo::new(
                keypair.pubkey(),
                solana_time_utils::timestamp(),
                0u16,
            ),
            keypair,
            SocketAddrSpace::Unspecified,
        ));
        let exit = Arc::new(AtomicBool::new(false));
        let validator_exit = create_validator_exit(exit);
        let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig {
            accounts_db_config: AccountsDbConfig {
                account_indexes: Some(config.account_indexes),
                ..ACCOUNTS_DB_CONFIG_FOR_TESTING
            },
        });
        let vote_account = vote_keypair.pubkey();
        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
        let repair_whitelist = Arc::new(RwLock::new(HashSet::new()));
        let meta = AdminRpcRequestMetadata {
            rpc_addr: None,
            start_time: SystemTime::now(),
            start_progress,
            validator_exit,
            validator_exit_backpressure: HashMap::default(),
            // The genesis voting keypair starts out as the only authorized voter.
            authorized_voter_keypairs: Arc::new(RwLock::new(vec![vote_keypair])),
            tower_storage: Arc::new(NullTowerStorage {}),
            post_init: Arc::new(RwLock::new(Some(AdminRpcRequestMetadataPostInit {
                cluster_info,
                bank_forks: bank_forks.clone(),
                vote_account,
                repair_whitelist,
                notifies: Arc::new(RwLock::new(KeyUpdaters::default())),
                repair_socket: Arc::new(bind_to_localhost_unique().expect("should bind")),
                outstanding_repair_requests: Arc::<
                    RwLock<repair_service::OutstandingShredRepairs>,
                >::default(),
                cluster_slots: Arc::new(
                    solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default_for_tests(),
                ),
                node: None,
                banking_stage: Arc::new(RwLock::new(None)),
            }))),
            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
            rpc_to_plugin_manager_sender: None,
        };
        let mut io = MetaIoHandler::default();
        io.extend_with(AdminRpcImpl.to_delegate());

        Self {
            io,
            meta,
            bank_forks,
        }
    }

    /// Returns the root bank of the fixture's bank forks.
    fn root_bank(&self) -> Arc<Bank> {
        self.bank_forks.read().unwrap().root_bank()
    }
}
1107
1108 fn new_bank_forks_with_config(
1109 config: BankTestConfig,
1110 ) -> (Arc<RwLock<BankForks>>, Arc<Keypair>) {
1111 let GenesisConfigInfo {
1112 genesis_config,
1113 voting_keypair,
1114 ..
1115 } = create_genesis_config(1_000_000_000);
1116
1117 let bank = Bank::new_with_config_for_tests(&genesis_config, config);
1118 (BankForks::new_rw_arc(bank), Arc::new(voting_keypair))
1119 }
1120
/// Checks `getSecondaryIndexKeySize` with the account secondary indexes both on and off.
///
/// With `ProgramId`, `SplTokenMint` and `SplTokenOwner` enabled, the test records the
/// `ProgramId` sizes already present for the token and system programs, stores two wallets,
/// three token accounts and two mints, and then expects:
/// * an empty map for a key that was never stored,
/// * `SplTokenOwner` sizes of 1 (wallet1) and 2 (wallet2),
/// * `SplTokenMint` sizes of 2 (mint1) and 1 (mint2),
/// * `ProgramId` growth of 5 for the token program and 2 for the system program.
///
/// With `AccountSecondaryIndexes::default()` the queries issued before and after the stores
/// must both return an empty map.
#[test]
fn test_secondary_index_key_sizes() {
    for secondary_index_enabled in [true, false] {
        let account_indexes = if secondary_index_enabled {
            AccountSecondaryIndexes {
                keys: None,
                indexes: HashSet::from([
                    AccountIndex::ProgramId,
                    AccountIndex::SplTokenMint,
                    AccountIndex::SplTokenOwner,
                ]),
            }
        } else {
            AccountSecondaryIndexes::default()
        };

        let rpc = RpcHandler::start_with_config(TestConfig { account_indexes });
        let bank = rpc.root_bank();
        let RpcHandler { io, meta, .. } = rpc;

        let token_account1_pubkey = Pubkey::new_unique();
        let token_account2_pubkey = Pubkey::new_unique();
        let token_account3_pubkey = Pubkey::new_unique();
        let mint1_pubkey = Pubkey::new_unique();
        let mint2_pubkey = Pubkey::new_unique();
        let wallet1_pubkey = Pubkey::new_unique();
        let wallet2_pubkey = Pubkey::new_unique();
        let non_existent_pubkey = Pubkey::new_unique();
        let delegate = Pubkey::new_unique();

        // Sends one `getSecondaryIndexKeySize` request for `key` and decodes the `result`
        // member of the response into a per-index size map.
        let secondary_index_key_sizes = |key: &Pubkey| -> HashMap<RpcAccountIndex, usize> {
            let req = format!(
                r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{key}"]}}"#,
            );
            let res = io.handle_request_sync(&req, meta.clone());
            let result: Value = serde_json::from_str(&res.expect("actual response"))
                .expect("actual response deserialization");
            serde_json::from_value(result["result"].clone()).unwrap()
        };

        // Same query, for keys that must be reported by exactly one index; returns the size
        // reported under `index`.
        let sole_index_size = |key: &Pubkey, index: RpcAccountIndex| -> usize {
            let sizes = secondary_index_key_sizes(key);
            assert_eq!(sizes.len(), 1, "unexpected number of indexes for key {}", key);
            *sizes.get(&index).unwrap()
        };

        // Baseline taken before any account is stored by this test.
        let (num_default_spl_token_program_accounts, num_default_system_program_accounts) =
            if secondary_index_enabled {
                (
                    sole_index_size(&token::id(), RpcAccountIndex::ProgramId),
                    sole_index_size(&system_program::id(), RpcAccountIndex::ProgramId),
                )
            } else {
                assert!(secondary_index_key_sizes(&token_account1_pubkey).is_empty());
                (0, 0)
            };

        // Stores a system-owned wallet account.
        let store_wallet = |pubkey: &Pubkey| {
            let account = AccountSharedData::from(Account {
                lamports: 11111111,
                owner: system_program::id(),
                ..Account::default()
            });
            bank.store_account(pubkey, &account);
        };

        // Stores an initialized token account for `mint`, owned (and closable) by `owner`.
        let store_token_account =
            |pubkey: &Pubkey, mint: Pubkey, owner: Pubkey, amount: u64, lamports: u64| {
                let mut data = vec![0; TokenAccount::get_packed_len()];
                let state = TokenAccount {
                    mint,
                    owner,
                    delegate: COption::Some(delegate),
                    amount,
                    state: TokenAccountState::Initialized,
                    is_native: COption::None,
                    delegated_amount: 30,
                    close_authority: COption::Some(owner),
                };
                TokenAccount::pack(state, &mut data).unwrap();
                let account = AccountSharedData::from(Account {
                    lamports,
                    data,
                    owner: token::id(),
                    ..Account::default()
                });
                bank.store_account(pubkey, &account);
            };

        // Stores an initialized mint whose mint and freeze authority are both `authority`.
        let store_mint =
            |pubkey: &Pubkey, authority: Pubkey, supply: u64, decimals: u8, lamports: u64| {
                let mut data = vec![0; Mint::get_packed_len()];
                let state = Mint {
                    mint_authority: COption::Some(authority),
                    supply,
                    decimals,
                    is_initialized: true,
                    freeze_authority: COption::Some(authority),
                };
                Mint::pack(state, &mut data).unwrap();
                let account = AccountSharedData::from(Account {
                    lamports,
                    data,
                    owner: token::id(),
                    ..Account::default()
                });
                bank.store_account(pubkey, &account);
            };

        store_wallet(&wallet1_pubkey);
        store_wallet(&wallet2_pubkey);
        store_token_account(&token_account1_pubkey, mint1_pubkey, wallet1_pubkey, 420, 111);
        store_mint(&mint1_pubkey, wallet1_pubkey, 500, 2, 222);
        store_token_account(&token_account2_pubkey, mint1_pubkey, wallet2_pubkey, 420, 333);
        store_token_account(&token_account3_pubkey, mint2_pubkey, wallet2_pubkey, 42, 444);
        store_mint(&mint2_pubkey, wallet2_pubkey, 200, 3, 555);

        if secondary_index_enabled {
            // A key that was never stored is not reported by any index.
            assert!(secondary_index_key_sizes(&non_existent_pubkey).is_empty());

            // wallet1 owns one token account, wallet2 owns two.
            assert_eq!(
                sole_index_size(&wallet1_pubkey, RpcAccountIndex::SplTokenOwner),
                1
            );
            assert_eq!(
                sole_index_size(&wallet2_pubkey, RpcAccountIndex::SplTokenOwner),
                2
            );

            // mint1 backs two token accounts, mint2 backs one.
            assert_eq!(
                sole_index_size(&mint1_pubkey, RpcAccountIndex::SplTokenMint),
                2
            );
            assert_eq!(
                sole_index_size(&mint2_pubkey, RpcAccountIndex::SplTokenMint),
                1
            );

            // Three token accounts plus two mints were stored under the token program.
            assert_eq!(
                sole_index_size(&token::id(), RpcAccountIndex::ProgramId),
                (num_default_spl_token_program_accounts + 5)
            );
            // Two wallets were stored under the system program.
            assert_eq!(
                sole_index_size(&system_program::id(), RpcAccountIndex::ProgramId),
                (num_default_system_program_accounts + 2)
            );
        } else {
            assert!(secondary_index_key_sizes(&token_account2_pubkey).is_empty());
        }
    }
}
1426
/// `setIdentityFromBytes` on the bare `RpcHandler` fixture answers with a null result, and a
/// following `contactInfo` call reports the new identity's pubkey as its `id`.
#[test]
fn test_set_identity() {
    let rpc = RpcHandler::start_with_config(TestConfig::default());
    let RpcHandler { io, meta, .. } = rpc;

    // Runs one request through the handler and parses the JSON reply.
    let call = |request: &str| -> Value {
        let raw = io
            .handle_request_sync(request, meta.clone())
            .expect("actual response");
        serde_json::from_str(&raw).expect("actual response deserialization")
    };

    let new_identity = Keypair::new();
    // The RPC takes the keypair as a JSON array of bytes, which matches the `Debug` output
    // of the byte array.
    let identity_bytes = format!("{:?}", new_identity.to_bytes());

    let set_identity_reply = call(&format!(
        r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{identity_bytes}, false]}}"#,
    ));
    let null_result_reply: Value = serde_json::from_str(
        r#"{
            "id": 1,
            "jsonrpc": "2.0",
            "result": null
        }"#,
    )
    .expect("Failed to parse expected response");
    assert_eq!(set_identity_reply, null_result_reply);

    let contact_info_reply =
        call(r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#);
    let reported_identity = contact_info_reply["result"]["id"]
        .as_str()
        .expect("Expected a string");
    assert_eq!(reported_identity, new_identity.pubkey().to_string());
}
1469
1470 struct TestValidatorWithAdminRpc {
1471 meta: AdminRpcRequestMetadata,
1472 io: MetaIoHandler<AdminRpcRequestMetadata>,
1473 validator_ledger_path: PathBuf,
1474 }
1475
1476 impl TestValidatorWithAdminRpc {
1477 fn new() -> Self {
1478 let leader_keypair = Keypair::new();
1479 let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
1480
1481 let validator_keypair = Keypair::new();
1482 let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
1483 let genesis_config =
1484 create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
1485 .genesis_config;
1486 let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
1487
1488 let voting_keypair = Arc::new(Keypair::new());
1489 let voting_pubkey = voting_keypair.pubkey();
1490 let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1491 let validator_config = ValidatorConfig {
1492 rpc_addrs: Some((
1493 validator_node.info.rpc().unwrap(),
1494 validator_node.info.rpc_pubsub().unwrap(),
1495 )),
1496 ..ValidatorConfig::default_for_test()
1497 };
1498 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1499
1500 let post_init = Arc::new(RwLock::new(None));
1501 let meta = AdminRpcRequestMetadata {
1502 rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
1503 start_time: SystemTime::now(),
1504 start_progress: start_progress.clone(),
1505 validator_exit: validator_config.validator_exit.clone(),
1506 validator_exit_backpressure: HashMap::default(),
1507 authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1508 tower_storage: Arc::new(NullTowerStorage {}),
1509 post_init: post_init.clone(),
1510 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1511 rpc_to_plugin_manager_sender: None,
1512 };
1513
1514 let _validator = Validator::new(
1515 validator_node,
1516 Arc::new(validator_keypair),
1517 &validator_ledger_path,
1518 &voting_pubkey,
1519 authorized_voter_keypairs,
1520 vec![leader_node.info],
1521 &validator_config,
1522 true, None, start_progress.clone(),
1525 SocketAddrSpace::Unspecified,
1526 ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
1527 post_init.clone(),
1528 )
1529 .expect("assume successful validator start");
1530 assert_eq!(
1531 *start_progress.read().unwrap(),
1532 ValidatorStartProgress::Running
1533 );
1534 let post_init = post_init.read().unwrap();
1535
1536 assert!(post_init.is_some());
1537 let post_init = post_init.as_ref().unwrap();
1538 let notifies = post_init.notifies.read().unwrap();
1539 let updater_keys: HashSet<KeyUpdaterType> =
1540 notifies.into_iter().map(|(key, _)| key.clone()).collect();
1541 assert_eq!(
1542 updater_keys,
1543 HashSet::from_iter(vec![
1544 KeyUpdaterType::Tpu,
1545 KeyUpdaterType::TpuForwards,
1546 KeyUpdaterType::TpuVote,
1547 KeyUpdaterType::Forward,
1548 KeyUpdaterType::RpcService
1549 ])
1550 );
1551 let mut io = MetaIoHandler::default();
1552 io.extend_with(AdminRpcImpl.to_delegate());
1553 Self {
1554 meta,
1555 io,
1556 validator_ledger_path,
1557 }
1558 }
1559
1560 fn handle_request(&self, request: &str) -> Option<String> {
1561 self.io.handle_request_sync(request, self.meta.clone())
1562 }
1563 }
1564
1565 impl Drop for TestValidatorWithAdminRpc {
1566 fn drop(&mut self) {
1567 remove_dir_all(self.validator_ledger_path.clone()).unwrap();
1568 }
1569 }
1570
/// Against a running validator: `setIdentityFromBytes` answers with a null result,
/// `contactInfo` then reports the new identity's pubkey, and `exit` also answers with a null
/// result.
#[test]
fn test_set_identity_with_validator() {
    let test_validator = TestValidatorWithAdminRpc::new();

    // Runs one request through the fixture and parses the JSON reply.
    let call = |request: &str| -> Value {
        let raw = test_validator
            .handle_request(request)
            .expect("actual response");
        serde_json::from_str(&raw).expect("actual response deserialization")
    };

    let new_identity = Keypair::new();
    let identity_bytes = format!("{:?}", new_identity.to_bytes());

    let set_identity_reply = call(&format!(
        r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{identity_bytes}, false]}}"#,
    ));
    let null_result_reply: Value = serde_json::from_str(
        r#"{
            "id": 1,
            "jsonrpc": "2.0",
            "result": null
        }"#,
    )
    .expect("Failed to parse expected response");
    assert_eq!(set_identity_reply, null_result_reply);

    let contact_info_reply =
        call(r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#);
    let reported_identity = contact_info_reply["result"]["id"]
        .as_str()
        .expect("Expected a string");
    assert_eq!(reported_identity, new_identity.pubkey().to_string());

    let exit_reply = call(r#"{"jsonrpc":"2.0","id":1,"method":"exit","params":[]}"#);
    assert_eq!(exit_reply, null_result_reply);
}
1617}