1use {
2 crossbeam_channel::Sender,
3 jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
4 jsonrpc_core_client::{transports::ipc, RpcError},
5 jsonrpc_derive::rpc,
6 jsonrpc_ipc_server::{
7 tokio::sync::oneshot::channel as oneshot_channel, RequestContext, ServerBuilder,
8 },
9 jsonrpc_server_utils::tokio,
10 log::*,
11 serde::{de::Deserializer, Deserialize, Serialize},
12 solana_accounts_db::accounts_index::AccountIndex,
13 solana_core::{
14 admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
15 consensus::{tower_storage::TowerStorage, Tower},
16 repair::repair_service,
17 validator::ValidatorStartProgress,
18 },
19 solana_geyser_plugin_manager::GeyserPluginManagerRequest,
20 solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
21 solana_rpc::rpc::verify_pubkey,
22 solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
23 solana_sdk::{
24 exit::Exit,
25 pubkey::Pubkey,
26 signature::{read_keypair_file, Keypair, Signer},
27 },
28 std::{
29 collections::{HashMap, HashSet},
30 error,
31 fmt::{self, Display},
32 net::SocketAddr,
33 path::{Path, PathBuf},
34 sync::{Arc, RwLock},
35 thread::{self, Builder},
36 time::{Duration, SystemTime},
37 },
38};
39
40#[derive(Clone)]
41pub struct AdminRpcRequestMetadata {
42 pub rpc_addr: Option<SocketAddr>,
43 pub start_time: SystemTime,
44 pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
45 pub validator_exit: Arc<RwLock<Exit>>,
46 pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
47 pub tower_storage: Arc<dyn TowerStorage>,
48 pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
49 pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
50 pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
51}
52
53impl Metadata for AdminRpcRequestMetadata {}
54
55impl AdminRpcRequestMetadata {
56 fn with_post_init<F, R>(&self, func: F) -> Result<R>
57 where
58 F: FnOnce(&AdminRpcRequestMetadataPostInit) -> Result<R>,
59 {
60 if let Some(post_init) = self.post_init.read().unwrap().as_ref() {
61 func(post_init)
62 } else {
63 Err(jsonrpc_core::error::Error::invalid_params(
64 "Retry once validator start up is complete",
65 ))
66 }
67 }
68}
69
70#[derive(Debug, Deserialize, Serialize)]
71pub struct AdminRpcContactInfo {
72 pub id: String,
73 pub gossip: SocketAddr,
74 pub tvu: SocketAddr,
75 pub tvu_quic: SocketAddr,
76 pub serve_repair_quic: SocketAddr,
77 pub tpu: SocketAddr,
78 pub tpu_forwards: SocketAddr,
79 pub tpu_vote: SocketAddr,
80 pub rpc: SocketAddr,
81 pub rpc_pubsub: SocketAddr,
82 pub serve_repair: SocketAddr,
83 pub last_updated_timestamp: u64,
84 pub shred_version: u16,
85}
86
87#[derive(Debug, Deserialize, Serialize)]
88pub struct AdminRpcRepairWhitelist {
89 pub whitelist: Vec<Pubkey>,
90}
91
92impl From<ContactInfo> for AdminRpcContactInfo {
93 fn from(node: ContactInfo) -> Self {
94 macro_rules! unwrap_socket {
95 ($name:ident) => {
96 node.$name().unwrap_or(SOCKET_ADDR_UNSPECIFIED)
97 };
98 ($name:ident, $protocol:expr) => {
99 node.$name($protocol).unwrap_or(SOCKET_ADDR_UNSPECIFIED)
100 };
101 }
102 Self {
103 id: node.pubkey().to_string(),
104 last_updated_timestamp: node.wallclock(),
105 gossip: unwrap_socket!(gossip),
106 tvu: unwrap_socket!(tvu, Protocol::UDP),
107 tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
108 serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
109 tpu: unwrap_socket!(tpu, Protocol::UDP),
110 tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
111 tpu_vote: unwrap_socket!(tpu_vote),
112 rpc: unwrap_socket!(rpc),
113 rpc_pubsub: unwrap_socket!(rpc_pubsub),
114 serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
115 shred_version: node.shred_version(),
116 }
117 }
118}
119
120impl Display for AdminRpcContactInfo {
121 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122 writeln!(f, "Identity: {}", self.id)?;
123 writeln!(f, "Gossip: {}", self.gossip)?;
124 writeln!(f, "TVU: {}", self.tvu)?;
125 writeln!(f, "TVU QUIC: {}", self.tvu_quic)?;
126 writeln!(f, "TPU: {}", self.tpu)?;
127 writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;
128 writeln!(f, "TPU Votes: {}", self.tpu_vote)?;
129 writeln!(f, "RPC: {}", self.rpc)?;
130 writeln!(f, "RPC Pubsub: {}", self.rpc_pubsub)?;
131 writeln!(f, "Serve Repair: {}", self.serve_repair)?;
132 writeln!(f, "Last Updated Timestamp: {}", self.last_updated_timestamp)?;
133 writeln!(f, "Shred Version: {}", self.shred_version)
134 }
135}
136
137impl Display for AdminRpcRepairWhitelist {
138 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
139 writeln!(f, "Repair whitelist: {:?}", &self.whitelist)
140 }
141}
142
143#[rpc]
144pub trait AdminRpc {
145 type Metadata;
146
147 #[rpc(meta, name = "exit")]
148 fn exit(&self, meta: Self::Metadata) -> Result<()>;
149
150 #[rpc(meta, name = "reloadPlugin")]
151 fn reload_plugin(
152 &self,
153 meta: Self::Metadata,
154 name: String,
155 config_file: String,
156 ) -> BoxFuture<Result<()>>;
157
158 #[rpc(meta, name = "unloadPlugin")]
159 fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
160
161 #[rpc(meta, name = "loadPlugin")]
162 fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
163
164 #[rpc(meta, name = "listPlugins")]
165 fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
166
167 #[rpc(meta, name = "rpcAddress")]
168 fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
169
170 #[rpc(name = "setLogFilter")]
171 fn set_log_filter(&self, filter: String) -> Result<()>;
172
173 #[rpc(meta, name = "startTime")]
174 fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;
175
176 #[rpc(meta, name = "startProgress")]
177 fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;
178
179 #[rpc(meta, name = "addAuthorizedVoter")]
180 fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;
181
182 #[rpc(meta, name = "addAuthorizedVoterFromBytes")]
183 fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec<u8>)
184 -> Result<()>;
185
186 #[rpc(meta, name = "removeAllAuthorizedVoters")]
187 fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;
188
189 #[rpc(meta, name = "setIdentity")]
190 fn set_identity(
191 &self,
192 meta: Self::Metadata,
193 keypair_file: String,
194 require_tower: bool,
195 ) -> Result<()>;
196
197 #[rpc(meta, name = "setIdentityFromBytes")]
198 fn set_identity_from_bytes(
199 &self,
200 meta: Self::Metadata,
201 identity_keypair: Vec<u8>,
202 require_tower: bool,
203 ) -> Result<()>;
204
205 #[rpc(meta, name = "setStakedNodesOverrides")]
206 fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;
207
208 #[rpc(meta, name = "contactInfo")]
209 fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;
210
211 #[rpc(meta, name = "repairShredFromPeer")]
212 fn repair_shred_from_peer(
213 &self,
214 meta: Self::Metadata,
215 pubkey: Option<Pubkey>,
216 slot: u64,
217 shred_index: u64,
218 ) -> Result<()>;
219
220 #[rpc(meta, name = "repairWhitelist")]
221 fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;
222
223 #[rpc(meta, name = "setRepairWhitelist")]
224 fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()>;
225
226 #[rpc(meta, name = "getSecondaryIndexKeySize")]
227 fn get_secondary_index_key_size(
228 &self,
229 meta: Self::Metadata,
230 pubkey_str: String,
231 ) -> Result<HashMap<RpcAccountIndex, usize>>;
232
233 #[rpc(meta, name = "setPublicTpuAddress")]
234 fn set_public_tpu_address(
235 &self,
236 meta: Self::Metadata,
237 public_tpu_addr: SocketAddr,
238 ) -> Result<()>;
239
240 #[rpc(meta, name = "setPublicTpuForwardsAddress")]
241 fn set_public_tpu_forwards_address(
242 &self,
243 meta: Self::Metadata,
244 public_tpu_forwards_addr: SocketAddr,
245 ) -> Result<()>;
246}
247
/// Stateless implementor of [`AdminRpc`]; all state arrives through the
/// request metadata.
pub struct AdminRpcImpl;
249impl AdminRpc for AdminRpcImpl {
250 type Metadata = AdminRpcRequestMetadata;
251
252 fn exit(&self, meta: Self::Metadata) -> Result<()> {
253 debug!("exit admin rpc request received");
254
255 thread::Builder::new()
256 .name("solProcessExit".into())
257 .spawn(move || {
258 thread::sleep(Duration::from_millis(100));
261
262 warn!("validator exit requested");
263 meta.validator_exit.write().unwrap().exit();
264
265 thread::sleep(Duration::from_secs(5));
270 warn!("validator exit timeout");
271 std::process::exit(0);
272 })
273 .unwrap();
274 Ok(())
275 }
276
277 fn reload_plugin(
278 &self,
279 meta: Self::Metadata,
280 name: String,
281 config_file: String,
282 ) -> BoxFuture<Result<()>> {
283 Box::pin(async move {
284 let (response_sender, response_receiver) = oneshot_channel();
286
287 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
289 rpc_to_manager_sender
290 .send(GeyserPluginManagerRequest::ReloadPlugin {
291 name,
292 config_file,
293 response_sender,
294 })
295 .expect("GeyerPluginService should never drop request receiver");
296 } else {
297 return Err(jsonrpc_core::Error {
298 code: ErrorCode::InvalidRequest,
299 message: "No geyser plugin service".to_string(),
300 data: None,
301 });
302 }
303
304 response_receiver
306 .await
307 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
308 })
309 }
310
311 fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
312 Box::pin(async move {
313 let (response_sender, response_receiver) = oneshot_channel();
315
316 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
318 rpc_to_manager_sender
319 .send(GeyserPluginManagerRequest::LoadPlugin {
320 config_file,
321 response_sender,
322 })
323 .expect("GeyerPluginService should never drop request receiver");
324 } else {
325 return Err(jsonrpc_core::Error {
326 code: ErrorCode::InvalidRequest,
327 message: "No geyser plugin service".to_string(),
328 data: None,
329 });
330 }
331
332 response_receiver
334 .await
335 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
336 })
337 }
338
339 fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
340 Box::pin(async move {
341 let (response_sender, response_receiver) = oneshot_channel();
343
344 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
346 rpc_to_manager_sender
347 .send(GeyserPluginManagerRequest::UnloadPlugin {
348 name,
349 response_sender,
350 })
351 .expect("GeyerPluginService should never drop request receiver");
352 } else {
353 return Err(jsonrpc_core::Error {
354 code: ErrorCode::InvalidRequest,
355 message: "No geyser plugin service".to_string(),
356 data: None,
357 });
358 }
359
360 response_receiver
362 .await
363 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
364 })
365 }
366
367 fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
368 Box::pin(async move {
369 let (response_sender, response_receiver) = oneshot_channel();
371
372 if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
374 rpc_to_manager_sender
375 .send(GeyserPluginManagerRequest::ListPlugins { response_sender })
376 .expect("GeyerPluginService should never drop request receiver");
377 } else {
378 return Err(jsonrpc_core::Error {
379 code: ErrorCode::InvalidRequest,
380 message: "No geyser plugin service".to_string(),
381 data: None,
382 });
383 }
384
385 response_receiver
387 .await
388 .expect("GeyerPluginService's oneshot sender shouldn't drop early")
389 })
390 }
391
392 fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
393 debug!("rpc_addr admin rpc request received");
394 Ok(meta.rpc_addr)
395 }
396
397 fn set_log_filter(&self, filter: String) -> Result<()> {
398 debug!("set_log_filter admin rpc request received");
399 solana_logger::setup_with(&filter);
400 Ok(())
401 }
402
403 fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
404 debug!("start_time admin rpc request received");
405 Ok(meta.start_time)
406 }
407
408 fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
409 debug!("start_progress admin rpc request received");
410 Ok(*meta.start_progress.read().unwrap())
411 }
412
413 fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
414 debug!("add_authorized_voter request received");
415
416 let authorized_voter = read_keypair_file(keypair_file)
417 .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{err}")))?;
418
419 AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
420 }
421
422 fn add_authorized_voter_from_bytes(
423 &self,
424 meta: Self::Metadata,
425 keypair: Vec<u8>,
426 ) -> Result<()> {
427 debug!("add_authorized_voter_from_bytes request received");
428
429 let authorized_voter = Keypair::from_bytes(&keypair).map_err(|err| {
430 jsonrpc_core::error::Error::invalid_params(format!(
431 "Failed to read authorized voter keypair from provided byte array: {err}"
432 ))
433 })?;
434
435 AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
436 }
437
438 fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
439 debug!("remove_all_authorized_voters received");
440 meta.authorized_voter_keypairs.write().unwrap().clear();
441 Ok(())
442 }
443
444 fn set_identity(
445 &self,
446 meta: Self::Metadata,
447 keypair_file: String,
448 require_tower: bool,
449 ) -> Result<()> {
450 debug!("set_identity request received");
451
452 let identity_keypair = read_keypair_file(&keypair_file).map_err(|err| {
453 jsonrpc_core::error::Error::invalid_params(format!(
454 "Failed to read identity keypair from {keypair_file}: {err}"
455 ))
456 })?;
457
458 AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
459 }
460
461 fn set_identity_from_bytes(
462 &self,
463 meta: Self::Metadata,
464 identity_keypair: Vec<u8>,
465 require_tower: bool,
466 ) -> Result<()> {
467 debug!("set_identity_from_bytes request received");
468
469 let identity_keypair = Keypair::from_bytes(&identity_keypair).map_err(|err| {
470 jsonrpc_core::error::Error::invalid_params(format!(
471 "Failed to read identity keypair from provided byte array: {err}"
472 ))
473 })?;
474
475 AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
476 }
477
478 fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
479 let loaded_config = load_staked_nodes_overrides(&path)
480 .map_err(|err| {
481 error!(
482 "Failed to load staked nodes overrides from {}: {}",
483 &path, err
484 );
485 jsonrpc_core::error::Error::internal_error()
486 })?
487 .staked_map_id;
488 let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
489 write_staked_nodes.clear();
490 write_staked_nodes.extend(loaded_config);
491 info!("Staked nodes overrides loaded from {}", path);
492 debug!("overrides map: {:?}", write_staked_nodes);
493 Ok(())
494 }
495
496 fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
497 meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
498 }
499
500 fn repair_shred_from_peer(
501 &self,
502 meta: Self::Metadata,
503 pubkey: Option<Pubkey>,
504 slot: u64,
505 shred_index: u64,
506 ) -> Result<()> {
507 debug!("repair_shred_from_peer request received");
508
509 meta.with_post_init(|post_init| {
510 repair_service::RepairService::request_repair_for_shred_from_peer(
511 post_init.cluster_info.clone(),
512 post_init.cluster_slots.clone(),
513 pubkey,
514 slot,
515 shred_index,
516 &post_init.repair_socket.clone(),
517 post_init.outstanding_repair_requests.clone(),
518 );
519 Ok(())
520 })
521 }
522
523 fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
524 debug!("repair_whitelist request received");
525
526 meta.with_post_init(|post_init| {
527 let whitelist: Vec<_> = post_init
528 .repair_whitelist
529 .read()
530 .unwrap()
531 .iter()
532 .copied()
533 .collect();
534 Ok(AdminRpcRepairWhitelist { whitelist })
535 })
536 }
537
538 fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()> {
539 debug!("set_repair_whitelist request received");
540
541 let whitelist: HashSet<Pubkey> = whitelist.into_iter().collect();
542 meta.with_post_init(|post_init| {
543 *post_init.repair_whitelist.write().unwrap() = whitelist;
544 warn!(
545 "Repair whitelist set to {:?}",
546 &post_init.repair_whitelist.read().unwrap()
547 );
548 Ok(())
549 })
550 }
551
552 fn get_secondary_index_key_size(
553 &self,
554 meta: Self::Metadata,
555 pubkey_str: String,
556 ) -> Result<HashMap<RpcAccountIndex, usize>> {
557 debug!(
558 "get_secondary_index_key_size rpc request received: {:?}",
559 pubkey_str
560 );
561 let index_key = verify_pubkey(&pubkey_str)?;
562 meta.with_post_init(|post_init| {
563 let bank = post_init.bank_forks.read().unwrap().root_bank();
564
565 let enabled_account_indexes = &bank.accounts().accounts_db.account_indexes;
567
568 if enabled_account_indexes.is_empty() {
570 debug!("get_secondary_index_key_size: secondary index not enabled.");
571 return Ok(HashMap::new());
572 };
573
574 if !enabled_account_indexes.include_key(&index_key) {
576 return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
577 index_key: index_key.to_string(),
578 }
579 .into());
580 }
581
582 let accounts_index = &bank.accounts().accounts_db.accounts_index;
584
585 let found_sizes = enabled_account_indexes
587 .indexes
588 .iter()
589 .filter_map(|index| {
590 accounts_index
591 .get_index_key_size(index, &index_key)
592 .map(|size| (rpc_account_index_from_account_index(index), size))
593 })
594 .collect::<HashMap<_, _>>();
595
596 if found_sizes.is_empty() {
598 debug!("get_secondary_index_key_size: key not found in the secondary index.");
599 }
600 Ok(found_sizes)
601 })
602 }
603
604 fn set_public_tpu_address(
605 &self,
606 meta: Self::Metadata,
607 public_tpu_addr: SocketAddr,
608 ) -> Result<()> {
609 debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
610
611 meta.with_post_init(|post_init| {
612 post_init
613 .cluster_info
614 .my_contact_info()
615 .tpu(Protocol::UDP)
616 .map_err(|err| {
617 error!(
618 "The public TPU address isn't being published. \
619 The node is likely in repair mode. \
620 See help for --restricted-repair-only-mode for more information. \
621 {err}"
622 );
623 jsonrpc_core::error::Error::internal_error()
624 })?;
625 post_init
626 .cluster_info
627 .set_tpu(public_tpu_addr)
628 .map_err(|err| {
629 error!("Failed to set public TPU address to {public_tpu_addr}: {err}");
630 jsonrpc_core::error::Error::internal_error()
631 })?;
632 let my_contact_info = post_init.cluster_info.my_contact_info();
633 warn!(
634 "Public TPU addresses set to {:?} (udp) and {:?} (quic)",
635 my_contact_info.tpu(Protocol::UDP),
636 my_contact_info.tpu(Protocol::QUIC),
637 );
638 Ok(())
639 })
640 }
641
642 fn set_public_tpu_forwards_address(
643 &self,
644 meta: Self::Metadata,
645 public_tpu_forwards_addr: SocketAddr,
646 ) -> Result<()> {
647 debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
648
649 meta.with_post_init(|post_init| {
650 post_init
651 .cluster_info
652 .my_contact_info()
653 .tpu_forwards(Protocol::UDP)
654 .map_err(|err| {
655 error!(
656 "The public TPU Forwards address isn't being published. \
657 The node is likely in repair mode. \
658 See help for --restricted-repair-only-mode for more information. \
659 {err}"
660 );
661 jsonrpc_core::error::Error::internal_error()
662 })?;
663 post_init
664 .cluster_info
665 .set_tpu_forwards(public_tpu_forwards_addr)
666 .map_err(|err| {
667 error!("Failed to set public TPU address to {public_tpu_forwards_addr}: {err}");
668 jsonrpc_core::error::Error::internal_error()
669 })?;
670 let my_contact_info = post_init.cluster_info.my_contact_info();
671 warn!(
672 "Public TPU Forwards addresses set to {:?} (udp) and {:?} (quic)",
673 my_contact_info.tpu_forwards(Protocol::UDP),
674 my_contact_info.tpu_forwards(Protocol::QUIC),
675 );
676 Ok(())
677 })
678 }
679}
680
681impl AdminRpcImpl {
682 fn add_authorized_voter_keypair(
683 meta: AdminRpcRequestMetadata,
684 authorized_voter: Keypair,
685 ) -> Result<()> {
686 let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
687
688 if authorized_voter_keypairs
689 .iter()
690 .any(|x| x.pubkey() == authorized_voter.pubkey())
691 {
692 Err(jsonrpc_core::error::Error::invalid_params(
693 "Authorized voter already present",
694 ))
695 } else {
696 authorized_voter_keypairs.push(Arc::new(authorized_voter));
697 Ok(())
698 }
699 }
700
701 fn set_identity_keypair(
702 meta: AdminRpcRequestMetadata,
703 identity_keypair: Keypair,
704 require_tower: bool,
705 ) -> Result<()> {
706 meta.with_post_init(|post_init| {
707 if require_tower {
708 let _ = Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey())
709 .map_err(|err| {
710 jsonrpc_core::error::Error::invalid_params(format!(
711 "Unable to load tower file for identity {}: {}",
712 identity_keypair.pubkey(),
713 err
714 ))
715 })?;
716 }
717
718 for n in post_init.notifies.iter() {
719 if let Err(err) = n.update_key(&identity_keypair) {
720 error!("Error updating network layer keypair: {err}");
721 }
722 }
723
724 solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
725 post_init
726 .cluster_info
727 .set_keypair(Arc::new(identity_keypair));
728 warn!("Identity set to {}", post_init.cluster_info.id());
729 Ok(())
730 })
731 }
732}
733
734fn rpc_account_index_from_account_index(account_index: &AccountIndex) -> RpcAccountIndex {
735 match account_index {
736 AccountIndex::ProgramId => RpcAccountIndex::ProgramId,
737 AccountIndex::SplTokenOwner => RpcAccountIndex::SplTokenOwner,
738 AccountIndex::SplTokenMint => RpcAccountIndex::SplTokenMint,
739 }
740}
741
742pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
744 let admin_rpc_path = admin_rpc_path(ledger_path);
745
746 let event_loop = tokio::runtime::Builder::new_multi_thread()
747 .thread_name("solAdminRpcEl")
748 .worker_threads(3) .enable_all()
750 .build()
751 .unwrap();
752
753 Builder::new()
754 .name("solAdminRpc".to_string())
755 .spawn(move || {
756 let mut io = MetaIoHandler::default();
757 io.extend_with(AdminRpcImpl.to_delegate());
758
759 let validator_exit = metadata.validator_exit.clone();
760 let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
761 metadata.clone()
762 })
763 .event_loop_executor(event_loop.handle().clone())
764 .start(&format!("{}", admin_rpc_path.display()));
765
766 match server {
767 Err(err) => {
768 warn!("Unable to start admin rpc service: {:?}", err);
769 }
770 Ok(server) => {
771 info!("started admin rpc service!");
772 let close_handle = server.close_handle();
773 validator_exit
774 .write()
775 .unwrap()
776 .register_exit(Box::new(move || {
777 close_handle.close();
778 }));
779
780 server.wait();
781 }
782 }
783 })
784 .unwrap();
785}
786
/// Computes the IPC endpoint used by the admin RPC service.
///
/// On Windows this is a named pipe whose name is derived from the final
/// component of `ledger_path` (or a fixed pipe name when there is none);
/// elsewhere it is `admin.rpc` inside `ledger_path`.
fn admin_rpc_path(ledger_path: &Path) -> PathBuf {
    #[cfg(target_family = "windows")]
    {
        match ledger_path.file_name() {
            Some(ledger_filename) => PathBuf::from(format!(
                "\\\\.\\pipe\\{}-admin.rpc",
                ledger_filename.to_string_lossy()
            )),
            None => PathBuf::from("\\\\.\\pipe\\admin.rpc"),
        }
    }
    #[cfg(not(target_family = "windows"))]
    {
        let mut socket_path = ledger_path.to_path_buf();
        socket_path.push("admin.rpc");
        socket_path
    }
}
806
807pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
809 let admin_rpc_path = admin_rpc_path(ledger_path);
810 if !admin_rpc_path.exists() {
811 Err(RpcError::Client(format!(
812 "{} does not exist",
813 admin_rpc_path.display()
814 )))
815 } else {
816 ipc::connect::<_, gen_client::Client>(&format!("{}", admin_rpc_path.display())).await
817 }
818}
819
820pub fn runtime() -> jsonrpc_server_utils::tokio::runtime::Runtime {
821 jsonrpc_server_utils::tokio::runtime::Runtime::new().expect("new tokio runtime")
822}
823
824#[derive(Default, Deserialize, Clone)]
825pub struct StakedNodesOverrides {
826 #[serde(deserialize_with = "deserialize_pubkey_map")]
827 pub staked_map_id: HashMap<Pubkey, u64>,
828}
829
830pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
831where
832 D: Deserializer<'de>,
833{
834 let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
835 let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
836 for (key, value) in container.iter() {
837 let typed_key = Pubkey::try_from(key.as_str())
838 .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
839 container_typed.insert(typed_key, *value);
840 }
841 Ok(container_typed)
842}
843
844pub fn load_staked_nodes_overrides(
845 path: &String,
846) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
847 debug!("Loading staked nodes overrides configuration from {}", path);
848 if Path::new(&path).exists() {
849 let file = std::fs::File::open(path)?;
850 Ok(serde_yaml::from_reader(file)?)
851 } else {
852 Err(format!("Staked nodes overrides provided '{path}' a non-existing file path.").into())
853 }
854}
855
856#[cfg(test)]
857mod tests {
858 use {
859 super::*,
860 serde_json::Value,
861 solana_accounts_db::{accounts_index::AccountSecondaryIndexes, inline_spl_token},
862 solana_core::consensus::tower_storage::NullTowerStorage,
863 solana_gossip::cluster_info::ClusterInfo,
864 solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
865 solana_rpc::rpc::create_validator_exit,
866 solana_runtime::{
867 bank::{Bank, BankTestConfig},
868 bank_forks::BankForks,
869 },
870 solana_sdk::{
871 account::{Account, AccountSharedData},
872 pubkey::Pubkey,
873 system_program,
874 },
875 solana_streamer::socket::SocketAddrSpace,
876 spl_token_2022::{
877 solana_program::{program_option::COption, program_pack::Pack},
878 state::{Account as TokenAccount, AccountState as TokenAccountState, Mint},
879 },
880 std::{collections::HashSet, sync::atomic::AtomicBool},
881 };
882
883 #[derive(Default)]
884 struct TestConfig {
885 account_indexes: AccountSecondaryIndexes,
886 }
887
888 struct RpcHandler {
889 io: MetaIoHandler<AdminRpcRequestMetadata>,
890 meta: AdminRpcRequestMetadata,
891 bank_forks: Arc<RwLock<BankForks>>,
892 }
893
894 impl RpcHandler {
895 fn _start() -> Self {
896 Self::start_with_config(TestConfig::default())
897 }
898
899 fn start_with_config(config: TestConfig) -> Self {
900 let keypair = Arc::new(Keypair::new());
901 let cluster_info = Arc::new(ClusterInfo::new(
902 ContactInfo::new(
903 keypair.pubkey(),
904 solana_sdk::timing::timestamp(), 0u16, ),
907 keypair,
908 SocketAddrSpace::Unspecified,
909 ));
910 let exit = Arc::new(AtomicBool::new(false));
911 let validator_exit = create_validator_exit(exit);
912 let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig {
913 secondary_indexes: config.account_indexes,
914 });
915 let vote_account = vote_keypair.pubkey();
916 let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
917 let repair_whitelist = Arc::new(RwLock::new(HashSet::new()));
918 let meta = AdminRpcRequestMetadata {
919 rpc_addr: None,
920 start_time: SystemTime::now(),
921 start_progress,
922 validator_exit,
923 authorized_voter_keypairs: Arc::new(RwLock::new(vec![vote_keypair])),
924 tower_storage: Arc::new(NullTowerStorage {}),
925 post_init: Arc::new(RwLock::new(Some(AdminRpcRequestMetadataPostInit {
926 cluster_info,
927 bank_forks: bank_forks.clone(),
928 vote_account,
929 repair_whitelist,
930 notifies: Vec::new(),
931 repair_socket: Arc::new(std::net::UdpSocket::bind("0.0.0.0:0").unwrap()),
932 outstanding_repair_requests: Arc::<
933 RwLock<repair_service::OutstandingShredRepairs>,
934 >::default(),
935 cluster_slots: Arc::new(
936 solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default(),
937 ),
938 }))),
939 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
940 rpc_to_plugin_manager_sender: None,
941 };
942 let mut io = MetaIoHandler::default();
943 io.extend_with(AdminRpcImpl.to_delegate());
944
945 Self {
946 io,
947 meta,
948 bank_forks,
949 }
950 }
951
952 fn root_bank(&self) -> Arc<Bank> {
953 self.bank_forks.read().unwrap().root_bank()
954 }
955 }
956
957 fn new_bank_forks_with_config(
958 config: BankTestConfig,
959 ) -> (Arc<RwLock<BankForks>>, Arc<Keypair>) {
960 let GenesisConfigInfo {
961 genesis_config,
962 voting_keypair,
963 ..
964 } = create_genesis_config(1_000_000_000);
965
966 let bank = Bank::new_for_tests_with_config(&genesis_config, config);
967 (BankForks::new_rw_arc(bank), Arc::new(voting_keypair))
968 }
969
970 #[test]
971 fn test_secondary_index_key_sizes() {
972 for secondary_index_enabled in [true, false] {
973 let account_indexes = if secondary_index_enabled {
974 AccountSecondaryIndexes {
975 keys: None,
976 indexes: HashSet::from([
977 AccountIndex::ProgramId,
978 AccountIndex::SplTokenMint,
979 AccountIndex::SplTokenOwner,
980 ]),
981 }
982 } else {
983 AccountSecondaryIndexes::default()
984 };
985
986 let rpc = RpcHandler::start_with_config(TestConfig { account_indexes });
988
989 let bank = rpc.root_bank();
990 let RpcHandler { io, meta, .. } = rpc;
991
992 let token_account1_pubkey = Pubkey::new_unique();
994 let token_account2_pubkey = Pubkey::new_unique();
995 let token_account3_pubkey = Pubkey::new_unique();
996 let mint1_pubkey = Pubkey::new_unique();
997 let mint2_pubkey = Pubkey::new_unique();
998 let wallet1_pubkey = Pubkey::new_unique();
999 let wallet2_pubkey = Pubkey::new_unique();
1000 let non_existent_pubkey = Pubkey::new_unique();
1001 let delegate = Pubkey::new_unique();
1002
1003 let mut num_default_spl_token_program_accounts = 0;
1004 let mut num_default_system_program_accounts = 0;
1005
1006 if !secondary_index_enabled {
1007 let req = format!(
1009 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account1_pubkey}"]}}"#,
1010 );
1011 let res = io.handle_request_sync(&req, meta.clone());
1012 let result: Value = serde_json::from_str(&res.expect("actual response"))
1013 .expect("actual response deserialization");
1014 let sizes: HashMap<RpcAccountIndex, usize> =
1015 serde_json::from_value(result["result"].clone()).unwrap();
1016 assert!(sizes.is_empty());
1017 } else {
1018 let req = format!(
1020 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1021 inline_spl_token::id(),
1022 );
1023 let res = io.handle_request_sync(&req, meta.clone());
1024 let result: Value = serde_json::from_str(&res.expect("actual response"))
1025 .expect("actual response deserialization");
1026 let sizes: HashMap<RpcAccountIndex, usize> =
1027 serde_json::from_value(result["result"].clone()).unwrap();
1028 assert_eq!(sizes.len(), 1);
1029 num_default_spl_token_program_accounts =
1030 *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1031 let req = format!(
1033 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1034 system_program::id(),
1035 );
1036 let res = io.handle_request_sync(&req, meta.clone());
1037 let result: Value = serde_json::from_str(&res.expect("actual response"))
1038 .expect("actual response deserialization");
1039 let sizes: HashMap<RpcAccountIndex, usize> =
1040 serde_json::from_value(result["result"].clone()).unwrap();
1041 assert_eq!(sizes.len(), 1);
1042 num_default_system_program_accounts =
1043 *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1044 }
1045
1046 let wallet1_account = AccountSharedData::from(Account {
1048 lamports: 11111111,
1049 owner: system_program::id(),
1050 ..Account::default()
1051 });
1052 bank.store_account(&wallet1_pubkey, &wallet1_account);
1053 let wallet2_account = AccountSharedData::from(Account {
1054 lamports: 11111111,
1055 owner: system_program::id(),
1056 ..Account::default()
1057 });
1058 bank.store_account(&wallet2_pubkey, &wallet2_account);
1059
1060 let mut account1_data = vec![0; TokenAccount::get_packed_len()];
1062 let token_account1 = TokenAccount {
1063 mint: mint1_pubkey,
1064 owner: wallet1_pubkey,
1065 delegate: COption::Some(delegate),
1066 amount: 420,
1067 state: TokenAccountState::Initialized,
1068 is_native: COption::None,
1069 delegated_amount: 30,
1070 close_authority: COption::Some(wallet1_pubkey),
1071 };
1072 TokenAccount::pack(token_account1, &mut account1_data).unwrap();
1073 let token_account1 = AccountSharedData::from(Account {
1074 lamports: 111,
1075 data: account1_data.to_vec(),
1076 owner: inline_spl_token::id(),
1077 ..Account::default()
1078 });
1079 bank.store_account(&token_account1_pubkey, &token_account1);
1080
1081 let mut mint1_data = vec![0; Mint::get_packed_len()];
1083 let mint1_state = Mint {
1084 mint_authority: COption::Some(wallet1_pubkey),
1085 supply: 500,
1086 decimals: 2,
1087 is_initialized: true,
1088 freeze_authority: COption::Some(wallet1_pubkey),
1089 };
1090 Mint::pack(mint1_state, &mut mint1_data).unwrap();
1091 let mint_account1 = AccountSharedData::from(Account {
1092 lamports: 222,
1093 data: mint1_data.to_vec(),
1094 owner: inline_spl_token::id(),
1095 ..Account::default()
1096 });
1097 bank.store_account(&mint1_pubkey, &mint_account1);
1098
1099 let mut account2_data = vec![0; TokenAccount::get_packed_len()];
1101 let token_account2 = TokenAccount {
1102 mint: mint1_pubkey,
1103 owner: wallet2_pubkey,
1104 delegate: COption::Some(delegate),
1105 amount: 420,
1106 state: TokenAccountState::Initialized,
1107 is_native: COption::None,
1108 delegated_amount: 30,
1109 close_authority: COption::Some(wallet2_pubkey),
1110 };
1111 TokenAccount::pack(token_account2, &mut account2_data).unwrap();
1112 let token_account2 = AccountSharedData::from(Account {
1113 lamports: 333,
1114 data: account2_data.to_vec(),
1115 owner: inline_spl_token::id(),
1116 ..Account::default()
1117 });
1118 bank.store_account(&token_account2_pubkey, &token_account2);
1119
1120 let mut account3_data = vec![0; TokenAccount::get_packed_len()];
1122 let token_account3 = TokenAccount {
1123 mint: mint2_pubkey,
1124 owner: wallet2_pubkey,
1125 delegate: COption::Some(delegate),
1126 amount: 42,
1127 state: TokenAccountState::Initialized,
1128 is_native: COption::None,
1129 delegated_amount: 30,
1130 close_authority: COption::Some(wallet2_pubkey),
1131 };
1132 TokenAccount::pack(token_account3, &mut account3_data).unwrap();
1133 let token_account3 = AccountSharedData::from(Account {
1134 lamports: 444,
1135 data: account3_data.to_vec(),
1136 owner: inline_spl_token::id(),
1137 ..Account::default()
1138 });
1139 bank.store_account(&token_account3_pubkey, &token_account3);
1140
1141 let mut mint2_data = vec![0; Mint::get_packed_len()];
1143 let mint2_state = Mint {
1144 mint_authority: COption::Some(wallet2_pubkey),
1145 supply: 200,
1146 decimals: 3,
1147 is_initialized: true,
1148 freeze_authority: COption::Some(wallet2_pubkey),
1149 };
1150 Mint::pack(mint2_state, &mut mint2_data).unwrap();
1151 let mint_account2 = AccountSharedData::from(Account {
1152 lamports: 555,
1153 data: mint2_data.to_vec(),
1154 owner: inline_spl_token::id(),
1155 ..Account::default()
1156 });
1157 bank.store_account(&mint2_pubkey, &mint_account2);
1158
1159 if secondary_index_enabled {
1176 let req = format!(
1178 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{non_existent_pubkey}"]}}"#,
1179 );
1180 let res = io.handle_request_sync(&req, meta.clone());
1181 let result: Value = serde_json::from_str(&res.expect("actual response"))
1182 .expect("actual response deserialization");
1183 let sizes: HashMap<RpcAccountIndex, usize> =
1184 serde_json::from_value(result["result"].clone()).unwrap();
1185 assert!(sizes.is_empty());
1186 let req = format!(
1189 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet1_pubkey}"]}}"#,
1190 );
1191 let res = io.handle_request_sync(&req, meta.clone());
1192 let result: Value = serde_json::from_str(&res.expect("actual response"))
1193 .expect("actual response deserialization");
1194 let sizes: HashMap<RpcAccountIndex, usize> =
1195 serde_json::from_value(result["result"].clone()).unwrap();
1196 assert_eq!(sizes.len(), 1);
1197 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 1);
1198 let req = format!(
1200 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet2_pubkey}"]}}"#,
1201 );
1202 let res = io.handle_request_sync(&req, meta.clone());
1203 let result: Value = serde_json::from_str(&res.expect("actual response"))
1204 .expect("actual response deserialization");
1205 let sizes: HashMap<RpcAccountIndex, usize> =
1206 serde_json::from_value(result["result"].clone()).unwrap();
1207 assert_eq!(sizes.len(), 1);
1208 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 2);
1209 let req = format!(
1211 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint1_pubkey}"]}}"#,
1212 );
1213 let res = io.handle_request_sync(&req, meta.clone());
1214 let result: Value = serde_json::from_str(&res.expect("actual response"))
1215 .expect("actual response deserialization");
1216 let sizes: HashMap<RpcAccountIndex, usize> =
1217 serde_json::from_value(result["result"].clone()).unwrap();
1218 assert_eq!(sizes.len(), 1);
1219 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 2);
1220 let req = format!(
1222 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint2_pubkey}"]}}"#,
1223 );
1224 let res = io.handle_request_sync(&req, meta.clone());
1225 let result: Value = serde_json::from_str(&res.expect("actual response"))
1226 .expect("actual response deserialization");
1227 let sizes: HashMap<RpcAccountIndex, usize> =
1228 serde_json::from_value(result["result"].clone()).unwrap();
1229 assert_eq!(sizes.len(), 1);
1230 assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 1);
1231 let req = format!(
1233 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1234 inline_spl_token::id(),
1235 );
1236 let res = io.handle_request_sync(&req, meta.clone());
1237 let result: Value = serde_json::from_str(&res.expect("actual response"))
1238 .expect("actual response deserialization");
1239 let sizes: HashMap<RpcAccountIndex, usize> =
1240 serde_json::from_value(result["result"].clone()).unwrap();
1241 assert_eq!(sizes.len(), 1);
1242 assert_eq!(
1243 *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1244 (num_default_spl_token_program_accounts + 5)
1245 );
1246 let req = format!(
1248 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1249 system_program::id(),
1250 );
1251 let res = io.handle_request_sync(&req, meta.clone());
1252 let result: Value = serde_json::from_str(&res.expect("actual response"))
1253 .expect("actual response deserialization");
1254 let sizes: HashMap<RpcAccountIndex, usize> =
1255 serde_json::from_value(result["result"].clone()).unwrap();
1256 assert_eq!(sizes.len(), 1);
1257 assert_eq!(
1258 *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1259 (num_default_system_program_accounts + 2)
1260 );
1261 } else {
1262 let req = format!(
1264 r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account2_pubkey}"]}}"#,
1265 );
1266 let res = io.handle_request_sync(&req, meta.clone());
1267 let result: Value = serde_json::from_str(&res.expect("actual response"))
1268 .expect("actual response deserialization");
1269 let sizes: HashMap<RpcAccountIndex, usize> =
1270 serde_json::from_value(result["result"].clone()).unwrap();
1271 assert!(sizes.is_empty());
1272 }
1273 }
1274 }
1275}