agave_validator/
admin_rpc_service.rs

1use {
2    crossbeam_channel::Sender,
3    jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
4    jsonrpc_core_client::{transports::ipc, RpcError},
5    jsonrpc_derive::rpc,
6    jsonrpc_ipc_server::{
7        tokio::sync::oneshot::channel as oneshot_channel, RequestContext, ServerBuilder,
8    },
9    log::*,
10    serde::{de::Deserializer, Deserialize, Serialize},
11    solana_accounts_db::accounts_index::AccountIndex,
12    solana_core::{
13        admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
14        consensus::{tower_storage::TowerStorage, Tower},
15        repair::repair_service,
16        validator::ValidatorStartProgress,
17    },
18    solana_geyser_plugin_manager::GeyserPluginManagerRequest,
19    solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
20    solana_keypair::{read_keypair_file, Keypair},
21    solana_pubkey::Pubkey,
22    solana_rpc::rpc::verify_pubkey,
23    solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
24    solana_signer::Signer,
25    solana_validator_exit::Exit,
26    std::{
27        collections::{HashMap, HashSet},
28        env, error,
29        fmt::{self, Display},
30        net::SocketAddr,
31        path::{Path, PathBuf},
32        sync::{
33            atomic::{AtomicBool, Ordering},
34            Arc, RwLock,
35        },
36        thread::{self, Builder},
37        time::{Duration, SystemTime},
38    },
39    tokio::runtime::Runtime,
40};
41
42#[derive(Clone)]
43pub struct AdminRpcRequestMetadata {
44    pub rpc_addr: Option<SocketAddr>,
45    pub start_time: SystemTime,
46    pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
47    pub validator_exit: Arc<RwLock<Exit>>,
48    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
49    pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
50    pub tower_storage: Arc<dyn TowerStorage>,
51    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
52    pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
53    pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
54}
55
56impl Metadata for AdminRpcRequestMetadata {}
57
58impl AdminRpcRequestMetadata {
59    fn with_post_init<F, R>(&self, func: F) -> Result<R>
60    where
61        F: FnOnce(&AdminRpcRequestMetadataPostInit) -> Result<R>,
62    {
63        if let Some(post_init) = self.post_init.read().unwrap().as_ref() {
64            func(post_init)
65        } else {
66            Err(jsonrpc_core::error::Error::invalid_params(
67                "Retry once validator start up is complete",
68            ))
69        }
70    }
71}
72
73#[derive(Debug, Deserialize, Serialize)]
74pub struct AdminRpcContactInfo {
75    pub id: String,
76    pub gossip: SocketAddr,
77    pub tvu: SocketAddr,
78    pub tvu_quic: SocketAddr,
79    pub serve_repair_quic: SocketAddr,
80    pub tpu: SocketAddr,
81    pub tpu_forwards: SocketAddr,
82    pub tpu_vote: SocketAddr,
83    pub rpc: SocketAddr,
84    pub rpc_pubsub: SocketAddr,
85    pub serve_repair: SocketAddr,
86    pub last_updated_timestamp: u64,
87    pub shred_version: u16,
88}
89
90#[derive(Debug, Deserialize, Serialize)]
91pub struct AdminRpcRepairWhitelist {
92    pub whitelist: Vec<Pubkey>,
93}
94
95impl From<ContactInfo> for AdminRpcContactInfo {
96    fn from(node: ContactInfo) -> Self {
97        macro_rules! unwrap_socket {
98            ($name:ident) => {
99                node.$name().unwrap_or(SOCKET_ADDR_UNSPECIFIED)
100            };
101            ($name:ident, $protocol:expr) => {
102                node.$name($protocol).unwrap_or(SOCKET_ADDR_UNSPECIFIED)
103            };
104        }
105        Self {
106            id: node.pubkey().to_string(),
107            last_updated_timestamp: node.wallclock(),
108            gossip: unwrap_socket!(gossip),
109            tvu: unwrap_socket!(tvu, Protocol::UDP),
110            tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
111            serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
112            tpu: unwrap_socket!(tpu, Protocol::UDP),
113            tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
114            tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
115            rpc: unwrap_socket!(rpc),
116            rpc_pubsub: unwrap_socket!(rpc_pubsub),
117            serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
118            shred_version: node.shred_version(),
119        }
120    }
121}
122
123impl Display for AdminRpcContactInfo {
124    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
125        writeln!(f, "Identity: {}", self.id)?;
126        writeln!(f, "Gossip: {}", self.gossip)?;
127        writeln!(f, "TVU: {}", self.tvu)?;
128        writeln!(f, "TVU QUIC: {}", self.tvu_quic)?;
129        writeln!(f, "TPU: {}", self.tpu)?;
130        writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;
131        writeln!(f, "TPU Votes: {}", self.tpu_vote)?;
132        writeln!(f, "RPC: {}", self.rpc)?;
133        writeln!(f, "RPC Pubsub: {}", self.rpc_pubsub)?;
134        writeln!(f, "Serve Repair: {}", self.serve_repair)?;
135        writeln!(f, "Last Updated Timestamp: {}", self.last_updated_timestamp)?;
136        writeln!(f, "Shred Version: {}", self.shred_version)
137    }
138}
139impl solana_cli_output::VerboseDisplay for AdminRpcContactInfo {}
140impl solana_cli_output::QuietDisplay for AdminRpcContactInfo {}
141
142impl Display for AdminRpcRepairWhitelist {
143    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144        writeln!(f, "Repair whitelist: {:?}", &self.whitelist)
145    }
146}
147impl solana_cli_output::VerboseDisplay for AdminRpcRepairWhitelist {}
148impl solana_cli_output::QuietDisplay for AdminRpcRepairWhitelist {}
149
150#[rpc]
151pub trait AdminRpc {
152    type Metadata;
153
154    #[rpc(meta, name = "exit")]
155    fn exit(&self, meta: Self::Metadata) -> Result<()>;
156
157    #[rpc(meta, name = "reloadPlugin")]
158    fn reload_plugin(
159        &self,
160        meta: Self::Metadata,
161        name: String,
162        config_file: String,
163    ) -> BoxFuture<Result<()>>;
164
165    #[rpc(meta, name = "unloadPlugin")]
166    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
167
168    #[rpc(meta, name = "loadPlugin")]
169    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
170
171    #[rpc(meta, name = "listPlugins")]
172    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
173
174    #[rpc(meta, name = "rpcAddress")]
175    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
176
177    #[rpc(name = "setLogFilter")]
178    fn set_log_filter(&self, filter: String) -> Result<()>;
179
180    #[rpc(meta, name = "startTime")]
181    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;
182
183    #[rpc(meta, name = "startProgress")]
184    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;
185
186    #[rpc(meta, name = "addAuthorizedVoter")]
187    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;
188
189    #[rpc(meta, name = "addAuthorizedVoterFromBytes")]
190    fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec<u8>)
191        -> Result<()>;
192
193    #[rpc(meta, name = "removeAllAuthorizedVoters")]
194    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;
195
196    #[rpc(meta, name = "setIdentity")]
197    fn set_identity(
198        &self,
199        meta: Self::Metadata,
200        keypair_file: String,
201        require_tower: bool,
202    ) -> Result<()>;
203
204    #[rpc(meta, name = "setIdentityFromBytes")]
205    fn set_identity_from_bytes(
206        &self,
207        meta: Self::Metadata,
208        identity_keypair: Vec<u8>,
209        require_tower: bool,
210    ) -> Result<()>;
211
212    #[rpc(meta, name = "setStakedNodesOverrides")]
213    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;
214
215    #[rpc(meta, name = "contactInfo")]
216    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;
217
218    #[rpc(meta, name = "repairShredFromPeer")]
219    fn repair_shred_from_peer(
220        &self,
221        meta: Self::Metadata,
222        pubkey: Option<Pubkey>,
223        slot: u64,
224        shred_index: u64,
225    ) -> Result<()>;
226
227    #[rpc(meta, name = "repairWhitelist")]
228    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;
229
230    #[rpc(meta, name = "setRepairWhitelist")]
231    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()>;
232
233    #[rpc(meta, name = "getSecondaryIndexKeySize")]
234    fn get_secondary_index_key_size(
235        &self,
236        meta: Self::Metadata,
237        pubkey_str: String,
238    ) -> Result<HashMap<RpcAccountIndex, usize>>;
239
240    #[rpc(meta, name = "setPublicTpuAddress")]
241    fn set_public_tpu_address(
242        &self,
243        meta: Self::Metadata,
244        public_tpu_addr: SocketAddr,
245    ) -> Result<()>;
246
247    #[rpc(meta, name = "setPublicTpuForwardsAddress")]
248    fn set_public_tpu_forwards_address(
249        &self,
250        meta: Self::Metadata,
251        public_tpu_forwards_addr: SocketAddr,
252    ) -> Result<()>;
253}
254
255pub struct AdminRpcImpl;
256impl AdminRpc for AdminRpcImpl {
257    type Metadata = AdminRpcRequestMetadata;
258
259    fn exit(&self, meta: Self::Metadata) -> Result<()> {
260        debug!("exit admin rpc request received");
261
262        thread::Builder::new()
263            .name("solProcessExit".into())
264            .spawn(move || {
265                // Delay exit signal until this RPC request completes, otherwise the caller of `exit` might
266                // receive a confusing error as the validator shuts down before a response is sent back.
267                thread::sleep(Duration::from_millis(100));
268
269                warn!("validator exit requested");
270                meta.validator_exit.write().unwrap().exit();
271
272                if !meta.validator_exit_backpressure.is_empty() {
273                    let service_names = meta.validator_exit_backpressure.keys();
274                    info!("Wait for these services to complete: {service_names:?}");
275                    loop {
276                        // The initial sleep is a grace period to allow the services to raise their
277                        // backpressure flags.
278                        // Subsequent sleeps are to throttle how often we check and log.
279                        thread::sleep(Duration::from_secs(1));
280
281                        let mut any_flags_raised = false;
282                        for (name, flag) in meta.validator_exit_backpressure.iter() {
283                            let is_flag_raised = flag.load(Ordering::Relaxed);
284                            if is_flag_raised {
285                                info!("{name}'s exit backpressure flag is raised");
286                                any_flags_raised = true;
287                            }
288                        }
289                        if !any_flags_raised {
290                            break;
291                        }
292                    }
293                    info!("All services have completed");
294                }
295
296                // TODO: Debug why Exit doesn't always cause the validator to fully exit
297                // (rocksdb background processing or some other stuck thread perhaps?).
298                //
299                // If the process is still alive after five seconds, exit harder
300                thread::sleep(Duration::from_secs(
301                    env::var("SOLANA_VALIDATOR_EXIT_TIMEOUT")
302                        .ok()
303                        .and_then(|x| x.parse().ok())
304                        .unwrap_or(5),
305                ));
306                warn!("validator exit timeout");
307                std::process::exit(0);
308            })
309            .unwrap();
310
311        Ok(())
312    }
313
314    fn reload_plugin(
315        &self,
316        meta: Self::Metadata,
317        name: String,
318        config_file: String,
319    ) -> BoxFuture<Result<()>> {
320        Box::pin(async move {
321            // Construct channel for plugin to respond to this particular rpc request instance
322            let (response_sender, response_receiver) = oneshot_channel();
323
324            // Send request to plugin manager if there is a geyser service
325            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
326                rpc_to_manager_sender
327                    .send(GeyserPluginManagerRequest::ReloadPlugin {
328                        name,
329                        config_file,
330                        response_sender,
331                    })
332                    .expect("GeyerPluginService should never drop request receiver");
333            } else {
334                return Err(jsonrpc_core::Error {
335                    code: ErrorCode::InvalidRequest,
336                    message: "No geyser plugin service".to_string(),
337                    data: None,
338                });
339            }
340
341            // Await response from plugin manager
342            response_receiver
343                .await
344                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
345        })
346    }
347
348    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
349        Box::pin(async move {
350            // Construct channel for plugin to respond to this particular rpc request instance
351            let (response_sender, response_receiver) = oneshot_channel();
352
353            // Send request to plugin manager if there is a geyser service
354            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
355                rpc_to_manager_sender
356                    .send(GeyserPluginManagerRequest::LoadPlugin {
357                        config_file,
358                        response_sender,
359                    })
360                    .expect("GeyerPluginService should never drop request receiver");
361            } else {
362                return Err(jsonrpc_core::Error {
363                    code: ErrorCode::InvalidRequest,
364                    message: "No geyser plugin service".to_string(),
365                    data: None,
366                });
367            }
368
369            // Await response from plugin manager
370            response_receiver
371                .await
372                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
373        })
374    }
375
376    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
377        Box::pin(async move {
378            // Construct channel for plugin to respond to this particular rpc request instance
379            let (response_sender, response_receiver) = oneshot_channel();
380
381            // Send request to plugin manager if there is a geyser service
382            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
383                rpc_to_manager_sender
384                    .send(GeyserPluginManagerRequest::UnloadPlugin {
385                        name,
386                        response_sender,
387                    })
388                    .expect("GeyerPluginService should never drop request receiver");
389            } else {
390                return Err(jsonrpc_core::Error {
391                    code: ErrorCode::InvalidRequest,
392                    message: "No geyser plugin service".to_string(),
393                    data: None,
394                });
395            }
396
397            // Await response from plugin manager
398            response_receiver
399                .await
400                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
401        })
402    }
403
404    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
405        Box::pin(async move {
406            // Construct channel for plugin to respond to this particular rpc request instance
407            let (response_sender, response_receiver) = oneshot_channel();
408
409            // Send request to plugin manager
410            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
411                rpc_to_manager_sender
412                    .send(GeyserPluginManagerRequest::ListPlugins { response_sender })
413                    .expect("GeyerPluginService should never drop request receiver");
414            } else {
415                return Err(jsonrpc_core::Error {
416                    code: ErrorCode::InvalidRequest,
417                    message: "No geyser plugin service".to_string(),
418                    data: None,
419                });
420            }
421
422            // Await response from plugin manager
423            response_receiver
424                .await
425                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
426        })
427    }
428
429    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
430        debug!("rpc_addr admin rpc request received");
431        Ok(meta.rpc_addr)
432    }
433
434    fn set_log_filter(&self, filter: String) -> Result<()> {
435        debug!("set_log_filter admin rpc request received");
436        solana_logger::setup_with(&filter);
437        Ok(())
438    }
439
440    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
441        debug!("start_time admin rpc request received");
442        Ok(meta.start_time)
443    }
444
445    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
446        debug!("start_progress admin rpc request received");
447        Ok(*meta.start_progress.read().unwrap())
448    }
449
450    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
451        debug!("add_authorized_voter request received");
452
453        let authorized_voter = read_keypair_file(keypair_file)
454            .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{err}")))?;
455
456        AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
457    }
458
459    fn add_authorized_voter_from_bytes(
460        &self,
461        meta: Self::Metadata,
462        keypair: Vec<u8>,
463    ) -> Result<()> {
464        debug!("add_authorized_voter_from_bytes request received");
465
466        let authorized_voter = Keypair::from_bytes(&keypair).map_err(|err| {
467            jsonrpc_core::error::Error::invalid_params(format!(
468                "Failed to read authorized voter keypair from provided byte array: {err}"
469            ))
470        })?;
471
472        AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
473    }
474
475    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
476        debug!("remove_all_authorized_voters received");
477        meta.authorized_voter_keypairs.write().unwrap().clear();
478        Ok(())
479    }
480
481    fn set_identity(
482        &self,
483        meta: Self::Metadata,
484        keypair_file: String,
485        require_tower: bool,
486    ) -> Result<()> {
487        debug!("set_identity request received");
488
489        let identity_keypair = read_keypair_file(&keypair_file).map_err(|err| {
490            jsonrpc_core::error::Error::invalid_params(format!(
491                "Failed to read identity keypair from {keypair_file}: {err}"
492            ))
493        })?;
494
495        AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
496    }
497
498    fn set_identity_from_bytes(
499        &self,
500        meta: Self::Metadata,
501        identity_keypair: Vec<u8>,
502        require_tower: bool,
503    ) -> Result<()> {
504        debug!("set_identity_from_bytes request received");
505
506        let identity_keypair = Keypair::from_bytes(&identity_keypair).map_err(|err| {
507            jsonrpc_core::error::Error::invalid_params(format!(
508                "Failed to read identity keypair from provided byte array: {err}"
509            ))
510        })?;
511
512        AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
513    }
514
515    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
516        let loaded_config = load_staked_nodes_overrides(&path)
517            .map_err(|err| {
518                error!(
519                    "Failed to load staked nodes overrides from {}: {}",
520                    &path, err
521                );
522                jsonrpc_core::error::Error::internal_error()
523            })?
524            .staked_map_id;
525        let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
526        write_staked_nodes.clear();
527        write_staked_nodes.extend(loaded_config);
528        info!("Staked nodes overrides loaded from {}", path);
529        debug!("overrides map: {:?}", write_staked_nodes);
530        Ok(())
531    }
532
533    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
534        meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
535    }
536
537    fn repair_shred_from_peer(
538        &self,
539        meta: Self::Metadata,
540        pubkey: Option<Pubkey>,
541        slot: u64,
542        shred_index: u64,
543    ) -> Result<()> {
544        debug!("repair_shred_from_peer request received");
545
546        meta.with_post_init(|post_init| {
547            repair_service::RepairService::request_repair_for_shred_from_peer(
548                post_init.cluster_info.clone(),
549                post_init.cluster_slots.clone(),
550                pubkey,
551                slot,
552                shred_index,
553                &post_init.repair_socket.clone(),
554                post_init.outstanding_repair_requests.clone(),
555            );
556            Ok(())
557        })
558    }
559
560    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
561        debug!("repair_whitelist request received");
562
563        meta.with_post_init(|post_init| {
564            let whitelist: Vec<_> = post_init
565                .repair_whitelist
566                .read()
567                .unwrap()
568                .iter()
569                .copied()
570                .collect();
571            Ok(AdminRpcRepairWhitelist { whitelist })
572        })
573    }
574
575    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()> {
576        debug!("set_repair_whitelist request received");
577
578        let whitelist: HashSet<Pubkey> = whitelist.into_iter().collect();
579        meta.with_post_init(|post_init| {
580            *post_init.repair_whitelist.write().unwrap() = whitelist;
581            warn!(
582                "Repair whitelist set to {:?}",
583                &post_init.repair_whitelist.read().unwrap()
584            );
585            Ok(())
586        })
587    }
588
589    fn get_secondary_index_key_size(
590        &self,
591        meta: Self::Metadata,
592        pubkey_str: String,
593    ) -> Result<HashMap<RpcAccountIndex, usize>> {
594        debug!(
595            "get_secondary_index_key_size rpc request received: {:?}",
596            pubkey_str
597        );
598        let index_key = verify_pubkey(&pubkey_str)?;
599        meta.with_post_init(|post_init| {
600            let bank = post_init.bank_forks.read().unwrap().root_bank();
601
602            // Take ref to enabled AccountSecondaryIndexes
603            let enabled_account_indexes = &bank.accounts().accounts_db.account_indexes;
604
605            // Exit if secondary indexes are not enabled
606            if enabled_account_indexes.is_empty() {
607                debug!("get_secondary_index_key_size: secondary index not enabled.");
608                return Ok(HashMap::new());
609            };
610
611            // Make sure the requested key is not explicitly excluded
612            if !enabled_account_indexes.include_key(&index_key) {
613                return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
614                    index_key: index_key.to_string(),
615                }
616                .into());
617            }
618
619            // Grab a ref to the AccountsDbfor this Bank
620            let accounts_index = &bank.accounts().accounts_db.accounts_index;
621
622            // Find the size of the key in every index where it exists
623            let found_sizes = enabled_account_indexes
624                .indexes
625                .iter()
626                .filter_map(|index| {
627                    accounts_index
628                        .get_index_key_size(index, &index_key)
629                        .map(|size| (rpc_account_index_from_account_index(index), size))
630                })
631                .collect::<HashMap<_, _>>();
632
633            // Note: Will return an empty HashMap if no keys are found.
634            if found_sizes.is_empty() {
635                debug!("get_secondary_index_key_size: key not found in the secondary index.");
636            }
637            Ok(found_sizes)
638        })
639    }
640
641    fn set_public_tpu_address(
642        &self,
643        meta: Self::Metadata,
644        public_tpu_addr: SocketAddr,
645    ) -> Result<()> {
646        debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
647
648        meta.with_post_init(|post_init| {
649            post_init
650                .cluster_info
651                .my_contact_info()
652                .tpu(Protocol::UDP)
653                .ok_or_else(|| {
654                    error!(
655                        "The public TPU address isn't being published. The node is likely in \
656                         repair mode. See help for --restricted-repair-only-mode for more \
657                         information."
658                    );
659                    jsonrpc_core::error::Error::internal_error()
660                })?;
661            post_init
662                .cluster_info
663                .set_tpu(public_tpu_addr)
664                .map_err(|err| {
665                    error!("Failed to set public TPU address to {public_tpu_addr}: {err}");
666                    jsonrpc_core::error::Error::internal_error()
667                })?;
668            let my_contact_info = post_init.cluster_info.my_contact_info();
669            warn!(
670                "Public TPU addresses set to {:?} (udp) and {:?} (quic)",
671                my_contact_info.tpu(Protocol::UDP),
672                my_contact_info.tpu(Protocol::QUIC),
673            );
674            Ok(())
675        })
676    }
677
678    fn set_public_tpu_forwards_address(
679        &self,
680        meta: Self::Metadata,
681        public_tpu_forwards_addr: SocketAddr,
682    ) -> Result<()> {
683        debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
684
685        meta.with_post_init(|post_init| {
686            post_init
687                .cluster_info
688                .my_contact_info()
689                .tpu_forwards(Protocol::UDP)
690                .ok_or_else(|| {
691                    error!(
692                        "The public TPU Forwards address isn't being published. The node is \
693                         likely in repair mode. See help for --restricted-repair-only-mode for \
694                         more information."
695                    );
696                    jsonrpc_core::error::Error::internal_error()
697                })?;
698            post_init
699                .cluster_info
700                .set_tpu_forwards(public_tpu_forwards_addr)
701                .map_err(|err| {
702                    error!("Failed to set public TPU address to {public_tpu_forwards_addr}: {err}");
703                    jsonrpc_core::error::Error::internal_error()
704                })?;
705            let my_contact_info = post_init.cluster_info.my_contact_info();
706            warn!(
707                "Public TPU Forwards addresses set to {:?} (udp) and {:?} (quic)",
708                my_contact_info.tpu_forwards(Protocol::UDP),
709                my_contact_info.tpu_forwards(Protocol::QUIC),
710            );
711            Ok(())
712        })
713    }
714}
715
716impl AdminRpcImpl {
717    fn add_authorized_voter_keypair(
718        meta: AdminRpcRequestMetadata,
719        authorized_voter: Keypair,
720    ) -> Result<()> {
721        let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
722
723        if authorized_voter_keypairs
724            .iter()
725            .any(|x| x.pubkey() == authorized_voter.pubkey())
726        {
727            Err(jsonrpc_core::error::Error::invalid_params(
728                "Authorized voter already present",
729            ))
730        } else {
731            authorized_voter_keypairs.push(Arc::new(authorized_voter));
732            Ok(())
733        }
734    }
735
736    fn set_identity_keypair(
737        meta: AdminRpcRequestMetadata,
738        identity_keypair: Keypair,
739        require_tower: bool,
740    ) -> Result<()> {
741        meta.with_post_init(|post_init| {
742            if require_tower {
743                let _ = Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey())
744                    .map_err(|err| {
745                        jsonrpc_core::error::Error::invalid_params(format!(
746                            "Unable to load tower file for identity {}: {}",
747                            identity_keypair.pubkey(),
748                            err
749                        ))
750                    })?;
751            }
752
753            for (key, notifier) in &*post_init.notifies.read().unwrap() {
754                if let Err(err) = notifier.update_key(&identity_keypair) {
755                    error!("Error updating network layer keypair: {err} on {key:?}");
756                }
757            }
758
759            solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
760            post_init
761                .cluster_info
762                .set_keypair(Arc::new(identity_keypair));
763            warn!("Identity set to {}", post_init.cluster_info.id());
764            Ok(())
765        })
766    }
767}
768
769fn rpc_account_index_from_account_index(account_index: &AccountIndex) -> RpcAccountIndex {
770    match account_index {
771        AccountIndex::ProgramId => RpcAccountIndex::ProgramId,
772        AccountIndex::SplTokenOwner => RpcAccountIndex::SplTokenOwner,
773        AccountIndex::SplTokenMint => RpcAccountIndex::SplTokenMint,
774    }
775}
776
777// Start the Admin RPC interface
778pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
779    let admin_rpc_path = admin_rpc_path(ledger_path);
780
781    let event_loop = tokio::runtime::Builder::new_multi_thread()
782        .thread_name("solAdminRpcEl")
783        .worker_threads(3) // Three still seems like a lot, and better than the default of available core count
784        .enable_all()
785        .build()
786        .unwrap();
787
788    Builder::new()
789        .name("solAdminRpc".to_string())
790        .spawn(move || {
791            let mut io = MetaIoHandler::default();
792            io.extend_with(AdminRpcImpl.to_delegate());
793
794            let validator_exit = metadata.validator_exit.clone();
795            let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
796                metadata.clone()
797            })
798            .event_loop_executor(event_loop.handle().clone())
799            .start(&format!("{}", admin_rpc_path.display()));
800
801            match server {
802                Err(err) => {
803                    warn!("Unable to start admin rpc service: {:?}", err);
804                }
805                Ok(server) => {
806                    info!("started admin rpc service!");
807                    let close_handle = server.close_handle();
808                    validator_exit
809                        .write()
810                        .unwrap()
811                        .register_exit(Box::new(move || {
812                            close_handle.close();
813                        }));
814
815                    server.wait();
816                }
817            }
818        })
819        .unwrap();
820}
821
822fn admin_rpc_path(ledger_path: &Path) -> PathBuf {
823    #[cfg(target_family = "windows")]
824    {
825        // More information about the wackiness of pipe names over at
826        // https://docs.microsoft.com/en-us/windows/win32/ipc/pipe-names
827        if let Some(ledger_filename) = ledger_path.file_name() {
828            PathBuf::from(format!(
829                "\\\\.\\pipe\\{}-admin.rpc",
830                ledger_filename.to_string_lossy()
831            ))
832        } else {
833            PathBuf::from("\\\\.\\pipe\\admin.rpc")
834        }
835    }
836    #[cfg(not(target_family = "windows"))]
837    {
838        ledger_path.join("admin.rpc")
839    }
840}
841
842// Connect to the Admin RPC interface
843pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
844    let admin_rpc_path = admin_rpc_path(ledger_path);
845    if !admin_rpc_path.exists() {
846        Err(RpcError::Client(format!(
847            "{} does not exist",
848            admin_rpc_path.display()
849        )))
850    } else {
851        ipc::connect::<_, gen_client::Client>(&format!("{}", admin_rpc_path.display())).await
852    }
853}
854
855pub fn runtime() -> Runtime {
856    tokio::runtime::Builder::new_multi_thread()
857        .thread_name("solAdminRpcRt")
858        .enable_all()
859        .build()
860        .expect("new tokio runtime")
861}
862
863#[derive(Default, Deserialize, Clone)]
864pub struct StakedNodesOverrides {
865    #[serde(deserialize_with = "deserialize_pubkey_map")]
866    pub staked_map_id: HashMap<Pubkey, u64>,
867}
868
869pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
870where
871    D: Deserializer<'de>,
872{
873    let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
874    let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
875    for (key, value) in container.iter() {
876        let typed_key = Pubkey::try_from(key.as_str())
877            .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
878        container_typed.insert(typed_key, *value);
879    }
880    Ok(container_typed)
881}
882
883pub fn load_staked_nodes_overrides(
884    path: &String,
885) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
886    debug!("Loading staked nodes overrides configuration from {}", path);
887    if Path::new(&path).exists() {
888        let file = std::fs::File::open(path)?;
889        Ok(serde_yaml::from_reader(file)?)
890    } else {
891        Err(format!("Staked nodes overrides provided '{path}' a non-existing file path.").into())
892    }
893}
894
895#[cfg(test)]
896mod tests {
897    use {
898        super::*,
899        serde_json::Value,
900        solana_account::{Account, AccountSharedData},
901        solana_accounts_db::{
902            accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
903            accounts_index::AccountSecondaryIndexes,
904        },
905        solana_core::{
906            admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
907            consensus::tower_storage::NullTowerStorage,
908            validator::{Validator, ValidatorConfig, ValidatorTpuConfig},
909        },
910        solana_gossip::cluster_info::{ClusterInfo, Node},
911        solana_ledger::{
912            create_new_tmp_ledger,
913            genesis_utils::{
914                create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo,
915            },
916        },
917        solana_net_utils::bind_to_unspecified,
918        solana_program_option::COption,
919        solana_program_pack::Pack,
920        solana_pubkey::Pubkey,
921        solana_rpc::rpc::create_validator_exit,
922        solana_runtime::{
923            bank::{Bank, BankTestConfig},
924            bank_forks::BankForks,
925        },
926        solana_streamer::socket::SocketAddrSpace,
927        solana_system_interface::program as system_program,
928        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
929        spl_generic_token::token,
930        spl_token_2022::state::{Account as TokenAccount, AccountState as TokenAccountState, Mint},
931        std::{collections::HashSet, fs::remove_dir_all, sync::atomic::AtomicBool},
932    };
933
934    #[derive(Default)]
935    struct TestConfig {
936        account_indexes: AccountSecondaryIndexes,
937    }
938
939    struct RpcHandler {
940        io: MetaIoHandler<AdminRpcRequestMetadata>,
941        meta: AdminRpcRequestMetadata,
942        bank_forks: Arc<RwLock<BankForks>>,
943    }
944
945    impl RpcHandler {
946        fn _start() -> Self {
947            Self::start_with_config(TestConfig::default())
948        }
949
950        fn start_with_config(config: TestConfig) -> Self {
951            let keypair = Arc::new(Keypair::new());
952            let cluster_info = Arc::new(ClusterInfo::new(
953                ContactInfo::new(
954                    keypair.pubkey(),
955                    solana_time_utils::timestamp(), // wallclock
956                    0u16,                           // shred_version
957                ),
958                keypair,
959                SocketAddrSpace::Unspecified,
960            ));
961            let exit = Arc::new(AtomicBool::new(false));
962            let validator_exit = create_validator_exit(exit);
963            let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig {
964                accounts_db_config: AccountsDbConfig {
965                    account_indexes: Some(config.account_indexes),
966                    ..ACCOUNTS_DB_CONFIG_FOR_TESTING
967                },
968            });
969            let vote_account = vote_keypair.pubkey();
970            let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
971            let repair_whitelist = Arc::new(RwLock::new(HashSet::new()));
972            let meta = AdminRpcRequestMetadata {
973                rpc_addr: None,
974                start_time: SystemTime::now(),
975                start_progress,
976                validator_exit,
977                validator_exit_backpressure: HashMap::default(),
978                authorized_voter_keypairs: Arc::new(RwLock::new(vec![vote_keypair])),
979                tower_storage: Arc::new(NullTowerStorage {}),
980                post_init: Arc::new(RwLock::new(Some(AdminRpcRequestMetadataPostInit {
981                    cluster_info,
982                    bank_forks: bank_forks.clone(),
983                    vote_account,
984                    repair_whitelist,
985                    notifies: Arc::new(RwLock::new(KeyUpdaters::default())),
986                    repair_socket: Arc::new(bind_to_unspecified().unwrap()),
987                    outstanding_repair_requests: Arc::<
988                        RwLock<repair_service::OutstandingShredRepairs>,
989                    >::default(),
990                    cluster_slots: Arc::new(
991                        solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default(),
992                    ),
993                }))),
994                staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
995                rpc_to_plugin_manager_sender: None,
996            };
997            let mut io = MetaIoHandler::default();
998            io.extend_with(AdminRpcImpl.to_delegate());
999
1000            Self {
1001                io,
1002                meta,
1003                bank_forks,
1004            }
1005        }
1006
1007        fn root_bank(&self) -> Arc<Bank> {
1008            self.bank_forks.read().unwrap().root_bank()
1009        }
1010    }
1011
1012    fn new_bank_forks_with_config(
1013        config: BankTestConfig,
1014    ) -> (Arc<RwLock<BankForks>>, Arc<Keypair>) {
1015        let GenesisConfigInfo {
1016            genesis_config,
1017            voting_keypair,
1018            ..
1019        } = create_genesis_config(1_000_000_000);
1020
1021        let bank = Bank::new_with_config_for_tests(&genesis_config, config);
1022        (BankForks::new_rw_arc(bank), Arc::new(voting_keypair))
1023    }
1024
1025    #[test]
1026    fn test_secondary_index_key_sizes() {
1027        for secondary_index_enabled in [true, false] {
1028            let account_indexes = if secondary_index_enabled {
1029                AccountSecondaryIndexes {
1030                    keys: None,
1031                    indexes: HashSet::from([
1032                        AccountIndex::ProgramId,
1033                        AccountIndex::SplTokenMint,
1034                        AccountIndex::SplTokenOwner,
1035                    ]),
1036                }
1037            } else {
1038                AccountSecondaryIndexes::default()
1039            };
1040
1041            // RPC & Bank Setup
1042            let rpc = RpcHandler::start_with_config(TestConfig { account_indexes });
1043
1044            let bank = rpc.root_bank();
1045            let RpcHandler { io, meta, .. } = rpc;
1046
1047            // Pubkeys
1048            let token_account1_pubkey = Pubkey::new_unique();
1049            let token_account2_pubkey = Pubkey::new_unique();
1050            let token_account3_pubkey = Pubkey::new_unique();
1051            let mint1_pubkey = Pubkey::new_unique();
1052            let mint2_pubkey = Pubkey::new_unique();
1053            let wallet1_pubkey = Pubkey::new_unique();
1054            let wallet2_pubkey = Pubkey::new_unique();
1055            let non_existent_pubkey = Pubkey::new_unique();
1056            let delegate = Pubkey::new_unique();
1057
1058            let mut num_default_spl_token_program_accounts = 0;
1059            let mut num_default_system_program_accounts = 0;
1060
1061            if !secondary_index_enabled {
1062                // Test first with no accounts added & no secondary indexes enabled:
1063                let req = format!(
1064                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account1_pubkey}"]}}"#,
1065                );
1066                let res = io.handle_request_sync(&req, meta.clone());
1067                let result: Value = serde_json::from_str(&res.expect("actual response"))
1068                    .expect("actual response deserialization");
1069                let sizes: HashMap<RpcAccountIndex, usize> =
1070                    serde_json::from_value(result["result"].clone()).unwrap();
1071                assert!(sizes.is_empty());
1072            } else {
1073                // Count SPL Token Program Default Accounts
1074                let req = format!(
1075                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1076                    token::id(),
1077                );
1078                let res = io.handle_request_sync(&req, meta.clone());
1079                let result: Value = serde_json::from_str(&res.expect("actual response"))
1080                    .expect("actual response deserialization");
1081                let sizes: HashMap<RpcAccountIndex, usize> =
1082                    serde_json::from_value(result["result"].clone()).unwrap();
1083                assert_eq!(sizes.len(), 1);
1084                num_default_spl_token_program_accounts =
1085                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1086                // Count System Program Default Accounts
1087                let req = format!(
1088                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1089                    system_program::id(),
1090                );
1091                let res = io.handle_request_sync(&req, meta.clone());
1092                let result: Value = serde_json::from_str(&res.expect("actual response"))
1093                    .expect("actual response deserialization");
1094                let sizes: HashMap<RpcAccountIndex, usize> =
1095                    serde_json::from_value(result["result"].clone()).unwrap();
1096                assert_eq!(sizes.len(), 1);
1097                num_default_system_program_accounts =
1098                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1099            }
1100
1101            // Add 2 basic wallet accounts
1102            let wallet1_account = AccountSharedData::from(Account {
1103                lamports: 11111111,
1104                owner: system_program::id(),
1105                ..Account::default()
1106            });
1107            bank.store_account(&wallet1_pubkey, &wallet1_account);
1108            let wallet2_account = AccountSharedData::from(Account {
1109                lamports: 11111111,
1110                owner: system_program::id(),
1111                ..Account::default()
1112            });
1113            bank.store_account(&wallet2_pubkey, &wallet2_account);
1114
1115            // Add a token account
1116            let mut account1_data = vec![0; TokenAccount::get_packed_len()];
1117            let token_account1 = TokenAccount {
1118                mint: mint1_pubkey,
1119                owner: wallet1_pubkey,
1120                delegate: COption::Some(delegate),
1121                amount: 420,
1122                state: TokenAccountState::Initialized,
1123                is_native: COption::None,
1124                delegated_amount: 30,
1125                close_authority: COption::Some(wallet1_pubkey),
1126            };
1127            TokenAccount::pack(token_account1, &mut account1_data).unwrap();
1128            let token_account1 = AccountSharedData::from(Account {
1129                lamports: 111,
1130                data: account1_data.to_vec(),
1131                owner: token::id(),
1132                ..Account::default()
1133            });
1134            bank.store_account(&token_account1_pubkey, &token_account1);
1135
1136            // Add the mint
1137            let mut mint1_data = vec![0; Mint::get_packed_len()];
1138            let mint1_state = Mint {
1139                mint_authority: COption::Some(wallet1_pubkey),
1140                supply: 500,
1141                decimals: 2,
1142                is_initialized: true,
1143                freeze_authority: COption::Some(wallet1_pubkey),
1144            };
1145            Mint::pack(mint1_state, &mut mint1_data).unwrap();
1146            let mint_account1 = AccountSharedData::from(Account {
1147                lamports: 222,
1148                data: mint1_data.to_vec(),
1149                owner: token::id(),
1150                ..Account::default()
1151            });
1152            bank.store_account(&mint1_pubkey, &mint_account1);
1153
1154            // Add another token account with the different owner, but same delegate, and mint
1155            let mut account2_data = vec![0; TokenAccount::get_packed_len()];
1156            let token_account2 = TokenAccount {
1157                mint: mint1_pubkey,
1158                owner: wallet2_pubkey,
1159                delegate: COption::Some(delegate),
1160                amount: 420,
1161                state: TokenAccountState::Initialized,
1162                is_native: COption::None,
1163                delegated_amount: 30,
1164                close_authority: COption::Some(wallet2_pubkey),
1165            };
1166            TokenAccount::pack(token_account2, &mut account2_data).unwrap();
1167            let token_account2 = AccountSharedData::from(Account {
1168                lamports: 333,
1169                data: account2_data.to_vec(),
1170                owner: token::id(),
1171                ..Account::default()
1172            });
1173            bank.store_account(&token_account2_pubkey, &token_account2);
1174
1175            // Add another token account with the same owner and delegate but different mint
1176            let mut account3_data = vec![0; TokenAccount::get_packed_len()];
1177            let token_account3 = TokenAccount {
1178                mint: mint2_pubkey,
1179                owner: wallet2_pubkey,
1180                delegate: COption::Some(delegate),
1181                amount: 42,
1182                state: TokenAccountState::Initialized,
1183                is_native: COption::None,
1184                delegated_amount: 30,
1185                close_authority: COption::Some(wallet2_pubkey),
1186            };
1187            TokenAccount::pack(token_account3, &mut account3_data).unwrap();
1188            let token_account3 = AccountSharedData::from(Account {
1189                lamports: 444,
1190                data: account3_data.to_vec(),
1191                owner: token::id(),
1192                ..Account::default()
1193            });
1194            bank.store_account(&token_account3_pubkey, &token_account3);
1195
1196            // Add the new mint
1197            let mut mint2_data = vec![0; Mint::get_packed_len()];
1198            let mint2_state = Mint {
1199                mint_authority: COption::Some(wallet2_pubkey),
1200                supply: 200,
1201                decimals: 3,
1202                is_initialized: true,
1203                freeze_authority: COption::Some(wallet2_pubkey),
1204            };
1205            Mint::pack(mint2_state, &mut mint2_data).unwrap();
1206            let mint_account2 = AccountSharedData::from(Account {
1207                lamports: 555,
1208                data: mint2_data.to_vec(),
1209                owner: token::id(),
1210                ..Account::default()
1211            });
1212            bank.store_account(&mint2_pubkey, &mint_account2);
1213
1214            // Accounts should now look like the following:
1215            //
1216            //                   -----system_program------
1217            //                  /                         \
1218            //                 /-(owns)                    \-(owns)
1219            //                /                             \
1220            //             wallet1                   ---wallet2---
1221            //               /                      /             \
1222            //              /-(SPL::owns)          /-(SPL::owns)   \-(SPL::owns)
1223            //             /                      /                 \
1224            //      token_account1         token_account2       token_account3
1225            //            \                     /                   /
1226            //             \-(SPL::mint)       /-(SPL::mint)       /-(SPL::mint)
1227            //              \                 /                   /
1228            //               --mint_account1--               mint_account2
1229
1230            if secondary_index_enabled {
1231                // ----------- Test for a non-existent key -----------
1232                let req = format!(
1233                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{non_existent_pubkey}"]}}"#,
1234                );
1235                let res = io.handle_request_sync(&req, meta.clone());
1236                let result: Value = serde_json::from_str(&res.expect("actual response"))
1237                    .expect("actual response deserialization");
1238                let sizes: HashMap<RpcAccountIndex, usize> =
1239                    serde_json::from_value(result["result"].clone()).unwrap();
1240                assert!(sizes.is_empty());
1241                // --------------- Test Queries ---------------
1242                // 1) Wallet1 - Owns 1 SPL Token
1243                let req = format!(
1244                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet1_pubkey}"]}}"#,
1245                );
1246                let res = io.handle_request_sync(&req, meta.clone());
1247                let result: Value = serde_json::from_str(&res.expect("actual response"))
1248                    .expect("actual response deserialization");
1249                let sizes: HashMap<RpcAccountIndex, usize> =
1250                    serde_json::from_value(result["result"].clone()).unwrap();
1251                assert_eq!(sizes.len(), 1);
1252                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 1);
1253                // 2) Wallet2 - Owns 2 SPL Tokens
1254                let req = format!(
1255                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet2_pubkey}"]}}"#,
1256                );
1257                let res = io.handle_request_sync(&req, meta.clone());
1258                let result: Value = serde_json::from_str(&res.expect("actual response"))
1259                    .expect("actual response deserialization");
1260                let sizes: HashMap<RpcAccountIndex, usize> =
1261                    serde_json::from_value(result["result"].clone()).unwrap();
1262                assert_eq!(sizes.len(), 1);
1263                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 2);
1264                // 3) Mint1 - Is in 2 SPL Accounts
1265                let req = format!(
1266                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint1_pubkey}"]}}"#,
1267                );
1268                let res = io.handle_request_sync(&req, meta.clone());
1269                let result: Value = serde_json::from_str(&res.expect("actual response"))
1270                    .expect("actual response deserialization");
1271                let sizes: HashMap<RpcAccountIndex, usize> =
1272                    serde_json::from_value(result["result"].clone()).unwrap();
1273                assert_eq!(sizes.len(), 1);
1274                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 2);
1275                // 4) Mint2 - Is in 1 SPL Account
1276                let req = format!(
1277                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint2_pubkey}"]}}"#,
1278                );
1279                let res = io.handle_request_sync(&req, meta.clone());
1280                let result: Value = serde_json::from_str(&res.expect("actual response"))
1281                    .expect("actual response deserialization");
1282                let sizes: HashMap<RpcAccountIndex, usize> =
1283                    serde_json::from_value(result["result"].clone()).unwrap();
1284                assert_eq!(sizes.len(), 1);
1285                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 1);
1286                // 5) SPL Token Program Owns 6 Accounts - 1 Default, 5 created above.
1287                let req = format!(
1288                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1289                    token::id(),
1290                );
1291                let res = io.handle_request_sync(&req, meta.clone());
1292                let result: Value = serde_json::from_str(&res.expect("actual response"))
1293                    .expect("actual response deserialization");
1294                let sizes: HashMap<RpcAccountIndex, usize> =
1295                    serde_json::from_value(result["result"].clone()).unwrap();
1296                assert_eq!(sizes.len(), 1);
1297                assert_eq!(
1298                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1299                    (num_default_spl_token_program_accounts + 5)
1300                );
1301                // 5) System Program Owns 4 Accounts + 2 Default, 2 created above.
1302                let req = format!(
1303                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1304                    system_program::id(),
1305                );
1306                let res = io.handle_request_sync(&req, meta.clone());
1307                let result: Value = serde_json::from_str(&res.expect("actual response"))
1308                    .expect("actual response deserialization");
1309                let sizes: HashMap<RpcAccountIndex, usize> =
1310                    serde_json::from_value(result["result"].clone()).unwrap();
1311                assert_eq!(sizes.len(), 1);
1312                assert_eq!(
1313                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1314                    (num_default_system_program_accounts + 2)
1315                );
1316            } else {
1317                // ------------ Secondary Indexes Disabled ------------
1318                let req = format!(
1319                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account2_pubkey}"]}}"#,
1320                );
1321                let res = io.handle_request_sync(&req, meta.clone());
1322                let result: Value = serde_json::from_str(&res.expect("actual response"))
1323                    .expect("actual response deserialization");
1324                let sizes: HashMap<RpcAccountIndex, usize> =
1325                    serde_json::from_value(result["result"].clone()).unwrap();
1326                assert!(sizes.is_empty());
1327            }
1328        }
1329    }
1330
1331    // This test checks that the rpc call to `set_identity` works a expected with
1332    // Bank but without validator.
1333    #[test]
1334    fn test_set_identity() {
1335        let rpc = RpcHandler::start_with_config(TestConfig::default());
1336
1337        let RpcHandler { io, meta, .. } = rpc;
1338
1339        let expected_validator_id = Keypair::new();
1340        let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1341
1342        let set_id_request = format!(
1343            r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1344        );
1345        let response = io.handle_request_sync(&set_id_request, meta.clone());
1346        let actual_parsed_response: Value =
1347            serde_json::from_str(&response.expect("actual response"))
1348                .expect("actual response deserialization");
1349
1350        let expected_parsed_response: Value = serde_json::from_str(
1351            r#"{
1352                "id": 1,
1353                "jsonrpc": "2.0",
1354                "result": null
1355            }"#,
1356        )
1357        .expect("Failed to parse expected response");
1358        assert_eq!(actual_parsed_response, expected_parsed_response);
1359
1360        let contact_info_request =
1361            r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1362        let response = io.handle_request_sync(&contact_info_request, meta.clone());
1363        let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1364            .expect("actual response deserialization");
1365        let actual_validator_id = parsed_response["result"]["id"]
1366            .as_str()
1367            .expect("Expected a string");
1368        assert_eq!(
1369            actual_validator_id,
1370            expected_validator_id.pubkey().to_string()
1371        );
1372    }
1373
1374    struct TestValidatorWithAdminRpc {
1375        meta: AdminRpcRequestMetadata,
1376        io: MetaIoHandler<AdminRpcRequestMetadata>,
1377        validator_ledger_path: PathBuf,
1378    }
1379
1380    impl TestValidatorWithAdminRpc {
1381        fn new() -> Self {
1382            let leader_keypair = Keypair::new();
1383            let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
1384
1385            let validator_keypair = Keypair::new();
1386            let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
1387            let genesis_config =
1388                create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
1389                    .genesis_config;
1390            let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
1391
1392            let voting_keypair = Arc::new(Keypair::new());
1393            let voting_pubkey = voting_keypair.pubkey();
1394            let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1395            let validator_config = ValidatorConfig {
1396                rpc_addrs: Some((
1397                    validator_node.info.rpc().unwrap(),
1398                    validator_node.info.rpc_pubsub().unwrap(),
1399                )),
1400                ..ValidatorConfig::default_for_test()
1401            };
1402            let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1403
1404            let post_init = Arc::new(RwLock::new(None));
1405            let meta = AdminRpcRequestMetadata {
1406                rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
1407                start_time: SystemTime::now(),
1408                start_progress: start_progress.clone(),
1409                validator_exit: validator_config.validator_exit.clone(),
1410                validator_exit_backpressure: HashMap::default(),
1411                authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1412                tower_storage: Arc::new(NullTowerStorage {}),
1413                post_init: post_init.clone(),
1414                staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1415                rpc_to_plugin_manager_sender: None,
1416            };
1417
1418            let _validator = Validator::new(
1419                validator_node,
1420                Arc::new(validator_keypair),
1421                &validator_ledger_path,
1422                &voting_pubkey,
1423                authorized_voter_keypairs,
1424                vec![leader_node.info],
1425                &validator_config,
1426                true, // should_check_duplicate_instance
1427                None, // rpc_to_plugin_manager_receiver
1428                start_progress.clone(),
1429                SocketAddrSpace::Unspecified,
1430                ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
1431                post_init.clone(),
1432            )
1433            .expect("assume successful validator start");
1434            assert_eq!(
1435                *start_progress.read().unwrap(),
1436                ValidatorStartProgress::Running
1437            );
1438            let post_init = post_init.read().unwrap();
1439
1440            assert!(post_init.is_some());
1441            let post_init = post_init.as_ref().unwrap();
1442            let notifies = post_init.notifies.read().unwrap();
1443            let updater_keys: HashSet<KeyUpdaterType> =
1444                notifies.into_iter().map(|(key, _)| key.clone()).collect();
1445            assert_eq!(
1446                updater_keys,
1447                HashSet::from_iter(vec![
1448                    KeyUpdaterType::Tpu,
1449                    KeyUpdaterType::TpuForwards,
1450                    KeyUpdaterType::TpuVote,
1451                    KeyUpdaterType::Forward,
1452                    KeyUpdaterType::RpcService
1453                ])
1454            );
1455            let mut io = MetaIoHandler::default();
1456            io.extend_with(AdminRpcImpl.to_delegate());
1457            Self {
1458                meta,
1459                io,
1460                validator_ledger_path,
1461            }
1462        }
1463
1464        fn handle_request(&self, request: &str) -> Option<String> {
1465            self.io.handle_request_sync(request, self.meta.clone())
1466        }
1467    }
1468
1469    impl Drop for TestValidatorWithAdminRpc {
1470        fn drop(&mut self) {
1471            remove_dir_all(self.validator_ledger_path.clone()).unwrap();
1472        }
1473    }
1474
1475    // This test checks that `set_identity` call works with working validator and client.
1476    #[test]
1477    fn test_set_identity_with_validator() {
1478        let test_validator = TestValidatorWithAdminRpc::new();
1479        let expected_validator_id = Keypair::new();
1480        let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1481
1482        let set_id_request = format!(
1483            r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1484        );
1485        let response = test_validator.handle_request(&set_id_request);
1486        let actual_parsed_response: Value =
1487            serde_json::from_str(&response.expect("actual response"))
1488                .expect("actual response deserialization");
1489
1490        let expected_parsed_response: Value = serde_json::from_str(
1491            r#"{
1492                "id": 1,
1493                "jsonrpc": "2.0",
1494                "result": null
1495            }"#,
1496        )
1497        .expect("Failed to parse expected response");
1498        assert_eq!(actual_parsed_response, expected_parsed_response);
1499
1500        let contact_info_request =
1501            r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1502        let response = test_validator.handle_request(&contact_info_request);
1503        let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1504            .expect("actual response deserialization");
1505        let actual_validator_id = parsed_response["result"]["id"]
1506            .as_str()
1507            .expect("Expected a string");
1508        assert_eq!(
1509            actual_validator_id,
1510            expected_validator_id.pubkey().to_string()
1511        );
1512
1513        let contact_info_request =
1514            r#"{"jsonrpc":"2.0","id":1,"method":"exit","params":[]}"#.to_string();
1515        let exit_response = test_validator.handle_request(&contact_info_request);
1516        let actual_parsed_response: Value =
1517            serde_json::from_str(&exit_response.expect("actual response"))
1518                .expect("actual response deserialization");
1519        assert_eq!(actual_parsed_response, expected_parsed_response);
1520    }
1521}