agave_validator/
admin_rpc_service.rs

1use {
2    crossbeam_channel::Sender,
3    jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
4    jsonrpc_core_client::{transports::ipc, RpcError},
5    jsonrpc_derive::rpc,
6    jsonrpc_ipc_server::{
7        tokio::sync::oneshot::channel as oneshot_channel, RequestContext, ServerBuilder,
8    },
9    log::*,
10    serde::{de::Deserializer, Deserialize, Serialize},
11    solana_accounts_db::accounts_index::AccountIndex,
12    solana_core::{
13        admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
14        consensus::{tower_storage::TowerStorage, Tower},
15        repair::repair_service,
16        validator::ValidatorStartProgress,
17    },
18    solana_geyser_plugin_manager::GeyserPluginManagerRequest,
19    solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
20    solana_keypair::{read_keypair_file, Keypair},
21    solana_pubkey::Pubkey,
22    solana_rpc::rpc::verify_pubkey,
23    solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
24    solana_signer::Signer,
25    solana_validator_exit::Exit,
26    std::{
27        collections::{HashMap, HashSet},
28        env, error,
29        fmt::{self, Display},
30        net::{IpAddr, SocketAddr},
31        path::{Path, PathBuf},
32        sync::{
33            atomic::{AtomicBool, Ordering},
34            Arc, RwLock,
35        },
36        thread::{self, Builder},
37        time::{Duration, SystemTime},
38    },
39    tokio::runtime::Runtime,
40};
41
/// Per-request state handed to every admin RPC handler that takes `meta`.
///
/// The value is cloned for each incoming request, so all mutable state is
/// kept behind shared `Arc` handles.
#[derive(Clone)]
pub struct AdminRpcRequestMetadata {
    /// Value returned verbatim by the `rpcAddress` method.
    pub rpc_addr: Option<SocketAddr>,
    /// Value returned verbatim by the `startTime` method.
    pub start_time: SystemTime,
    /// Current start-up progress, read by the `startProgress` method.
    pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
    /// Exit handle whose `exit()` is invoked by the `exit` method.
    pub validator_exit: Arc<RwLock<Exit>>,
    /// Flags, keyed by service name, that the `exit` method polls after
    /// signalling exit; it waits until none of them is raised.
    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
    /// Authorized voter keypairs edited by the add/remove authorized voter methods.
    pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
    /// Storage used to restore a tower when `setIdentity*` is called with `require_tower`.
    pub tower_storage: Arc<dyn TowerStorage>,
    /// Map that `setStakedNodesOverrides` clears and refills from a file.
    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
    /// State that is `None` until it is populated; handlers going through
    /// `with_post_init` return a "retry" error while it is `None`.
    pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
    /// Channel to the geyser plugin manager; `None` makes the plugin methods
    /// fail with "No geyser plugin service".
    pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
}
55
// Marker impl: lets `AdminRpcRequestMetadata` be used as the jsonrpc request metadata type.
impl Metadata for AdminRpcRequestMetadata {}
57
58impl AdminRpcRequestMetadata {
59    fn with_post_init<F, R>(&self, func: F) -> Result<R>
60    where
61        F: FnOnce(&AdminRpcRequestMetadataPostInit) -> Result<R>,
62    {
63        if let Some(post_init) = self.post_init.read().unwrap().as_ref() {
64            func(post_init)
65        } else {
66            Err(jsonrpc_core::error::Error::invalid_params(
67                "Retry once validator start up is complete",
68            ))
69        }
70    }
71}
72
/// Serializable snapshot of a node's contact info, as returned by the
/// `contactInfo` admin RPC method.
///
/// When built from a `ContactInfo`, any socket the node does not provide is
/// filled with `SOCKET_ADDR_UNSPECIFIED`.
#[derive(Debug, Deserialize, Serialize)]
pub struct AdminRpcContactInfo {
    /// Node pubkey rendered as a string.
    pub id: String,
    /// Gossip address.
    pub gossip: SocketAddr,
    /// TVU address (UDP).
    pub tvu: SocketAddr,
    /// TVU address (QUIC).
    pub tvu_quic: SocketAddr,
    /// Serve-repair address (QUIC).
    pub serve_repair_quic: SocketAddr,
    /// TPU address (UDP).
    pub tpu: SocketAddr,
    /// TPU forwards address (UDP).
    pub tpu_forwards: SocketAddr,
    /// TPU vote address (UDP).
    pub tpu_vote: SocketAddr,
    /// RPC address.
    pub rpc: SocketAddr,
    /// RPC pubsub address.
    pub rpc_pubsub: SocketAddr,
    /// Serve-repair address (UDP).
    pub serve_repair: SocketAddr,
    /// The contact info's wallclock value.
    pub last_updated_timestamp: u64,
    /// The node's shred version.
    pub shred_version: u16,
}
89
/// Serializable repair whitelist, as returned by the `repairWhitelist` admin RPC method.
#[derive(Debug, Deserialize, Serialize)]
pub struct AdminRpcRepairWhitelist {
    /// Pubkeys currently in the repair whitelist.
    pub whitelist: Vec<Pubkey>,
}
94
95impl From<ContactInfo> for AdminRpcContactInfo {
96    fn from(node: ContactInfo) -> Self {
97        macro_rules! unwrap_socket {
98            ($name:ident) => {
99                node.$name().unwrap_or(SOCKET_ADDR_UNSPECIFIED)
100            };
101            ($name:ident, $protocol:expr) => {
102                node.$name($protocol).unwrap_or(SOCKET_ADDR_UNSPECIFIED)
103            };
104        }
105        Self {
106            id: node.pubkey().to_string(),
107            last_updated_timestamp: node.wallclock(),
108            gossip: unwrap_socket!(gossip),
109            tvu: unwrap_socket!(tvu, Protocol::UDP),
110            tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
111            serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
112            tpu: unwrap_socket!(tpu, Protocol::UDP),
113            tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
114            tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
115            rpc: unwrap_socket!(rpc),
116            rpc_pubsub: unwrap_socket!(rpc_pubsub),
117            serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
118            shred_version: node.shred_version(),
119        }
120    }
121}
122
123impl Display for AdminRpcContactInfo {
124    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
125        writeln!(f, "Identity: {}", self.id)?;
126        writeln!(f, "Gossip: {}", self.gossip)?;
127        writeln!(f, "TVU: {}", self.tvu)?;
128        writeln!(f, "TVU QUIC: {}", self.tvu_quic)?;
129        writeln!(f, "TPU: {}", self.tpu)?;
130        writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;
131        writeln!(f, "TPU Votes: {}", self.tpu_vote)?;
132        writeln!(f, "RPC: {}", self.rpc)?;
133        writeln!(f, "RPC Pubsub: {}", self.rpc_pubsub)?;
134        writeln!(f, "Serve Repair: {}", self.serve_repair)?;
135        writeln!(f, "Last Updated Timestamp: {}", self.last_updated_timestamp)?;
136        writeln!(f, "Shred Version: {}", self.shred_version)
137    }
138}
// Empty impls: both traits are taken with their provided defaults.
// NOTE(review): presumably the defaults fall back to `Display` — confirm in solana_cli_output.
impl solana_cli_output::VerboseDisplay for AdminRpcContactInfo {}
impl solana_cli_output::QuietDisplay for AdminRpcContactInfo {}
141
142impl Display for AdminRpcRepairWhitelist {
143    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144        writeln!(f, "Repair whitelist: {:?}", &self.whitelist)
145    }
146}
// Empty impls: both traits are taken with their provided defaults.
// NOTE(review): presumably the defaults fall back to `Display` — confirm in solana_cli_output.
impl solana_cli_output::VerboseDisplay for AdminRpcRepairWhitelist {}
impl solana_cli_output::QuietDisplay for AdminRpcRepairWhitelist {}
149
/// Admin RPC interface of the validator.
///
/// Each method is exposed under the JSON-RPC name given in its `#[rpc(...)]`
/// attribute. Methods marked `meta` receive the per-request `Self::Metadata`.
/// The behavior notes below describe the `AdminRpcImpl` implementation in this file.
#[rpc]
pub trait AdminRpc {
    /// Per-request state passed to every `meta` method.
    type Metadata;

    /// Initiates validator exit; exit is asynchronous so the validator
    /// will almost certainly still be running when this method returns
    #[rpc(meta, name = "exit")]
    fn exit(&self, meta: Self::Metadata) -> Result<()>;

    /// Return the process id (pid)
    #[rpc(meta, name = "pid")]
    fn pid(&self, meta: Self::Metadata) -> Result<u32>;

    /// Asks the geyser plugin manager to reload plugin `name` with `config_file`.
    /// Fails with `InvalidRequest` when no geyser plugin service is attached.
    #[rpc(meta, name = "reloadPlugin")]
    fn reload_plugin(
        &self,
        meta: Self::Metadata,
        name: String,
        config_file: String,
    ) -> BoxFuture<Result<()>>;

    /// Asks the geyser plugin manager to unload plugin `name`.
    /// Fails with `InvalidRequest` when no geyser plugin service is attached.
    #[rpc(meta, name = "unloadPlugin")]
    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;

    /// Asks the geyser plugin manager to load a plugin from `config_file` and
    /// returns the string the manager replies with.
    /// Fails with `InvalidRequest` when no geyser plugin service is attached.
    #[rpc(meta, name = "loadPlugin")]
    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;

    /// Asks the geyser plugin manager for its plugin list.
    /// Fails with `InvalidRequest` when no geyser plugin service is attached.
    #[rpc(meta, name = "listPlugins")]
    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;

    /// Returns the `rpc_addr` stored in the request metadata.
    #[rpc(meta, name = "rpcAddress")]
    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;

    /// Passes `filter` to `solana_logger::setup_with`.
    #[rpc(name = "setLogFilter")]
    fn set_log_filter(&self, filter: String) -> Result<()>;

    /// Returns the `start_time` stored in the request metadata.
    #[rpc(meta, name = "startTime")]
    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;

    /// Returns the current value of the shared start-progress state.
    #[rpc(meta, name = "startProgress")]
    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;

    /// Reads a keypair from `keypair_file` and adds it to the authorized voters.
    /// Fails with invalid-params if the file cannot be read or the voter is already present.
    #[rpc(meta, name = "addAuthorizedVoter")]
    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;

    /// Same as `addAuthorizedVoter`, but the keypair is supplied as raw bytes.
    #[rpc(meta, name = "addAuthorizedVoterFromBytes")]
    fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec<u8>)
        -> Result<()>;

    /// Clears the list of authorized voter keypairs.
    #[rpc(meta, name = "removeAllAuthorizedVoters")]
    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;

    /// Reads a keypair from `keypair_file` and makes it the validator identity.
    /// With `require_tower`, the tower for that identity must be restorable
    /// from tower storage or the call fails.
    #[rpc(meta, name = "setIdentity")]
    fn set_identity(
        &self,
        meta: Self::Metadata,
        keypair_file: String,
        require_tower: bool,
    ) -> Result<()>;

    /// Same as `setIdentity`, but the keypair is supplied as raw bytes.
    #[rpc(meta, name = "setIdentityFromBytes")]
    fn set_identity_from_bytes(
        &self,
        meta: Self::Metadata,
        identity_keypair: Vec<u8>,
        require_tower: bool,
    ) -> Result<()>;

    /// Loads staked-nodes overrides from the file at `path` and replaces the
    /// current overrides map with them.
    #[rpc(meta, name = "setStakedNodesOverrides")]
    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;

    /// Returns this node's own contact info.
    #[rpc(meta, name = "contactInfo")]
    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;

    /// Switches the node's active interface to `interface`.
    #[rpc(meta, name = "selectActiveInterface")]
    fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()>;

    /// Requests repair of shred `shred_index` of `slot`, forwarding the
    /// optional `pubkey` to the repair service.
    #[rpc(meta, name = "repairShredFromPeer")]
    fn repair_shred_from_peer(
        &self,
        meta: Self::Metadata,
        pubkey: Option<Pubkey>,
        slot: u64,
        shred_index: u64,
    ) -> Result<()>;

    /// Returns the current repair whitelist.
    #[rpc(meta, name = "repairWhitelist")]
    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;

    /// Replaces the repair whitelist with `whitelist` (duplicates collapse).
    #[rpc(meta, name = "setRepairWhitelist")]
    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()>;

    /// Returns, for each enabled secondary index that contains the key
    /// `pubkey_str`, the size recorded for that key. The map is empty when
    /// secondary indexes are disabled or the key is not found.
    #[rpc(meta, name = "getSecondaryIndexKeySize")]
    fn get_secondary_index_key_size(
        &self,
        meta: Self::Metadata,
        pubkey_str: String,
    ) -> Result<HashMap<RpcAccountIndex, usize>>;

    /// Sets the advertised TPU address to `public_tpu_addr`. Fails when the
    /// node is not currently publishing a TPU address.
    #[rpc(meta, name = "setPublicTpuAddress")]
    fn set_public_tpu_address(
        &self,
        meta: Self::Metadata,
        public_tpu_addr: SocketAddr,
    ) -> Result<()>;

    /// Sets the advertised TPU forwards address to `public_tpu_forwards_addr`.
    /// Fails when the node is not currently publishing a TPU forwards address.
    #[rpc(meta, name = "setPublicTpuForwardsAddress")]
    fn set_public_tpu_forwards_address(
        &self,
        meta: Self::Metadata,
        public_tpu_forwards_addr: SocketAddr,
    ) -> Result<()>;
}
263
/// Stateless implementation of [`AdminRpc`]; all state arrives via the request metadata.
pub struct AdminRpcImpl;
265impl AdminRpc for AdminRpcImpl {
266    type Metadata = AdminRpcRequestMetadata;
267
268    fn exit(&self, meta: Self::Metadata) -> Result<()> {
269        debug!("exit admin rpc request received");
270
271        thread::Builder::new()
272            .name("solProcessExit".into())
273            .spawn(move || {
274                // Delay exit signal until this RPC request completes, otherwise the caller of `exit` might
275                // receive a confusing error as the validator shuts down before a response is sent back.
276                thread::sleep(Duration::from_millis(100));
277
278                info!("validator exit requested");
279                meta.validator_exit.write().unwrap().exit();
280
281                if !meta.validator_exit_backpressure.is_empty() {
282                    let service_names = meta.validator_exit_backpressure.keys();
283                    info!("Wait for these services to complete: {service_names:?}");
284                    loop {
285                        // The initial sleep is a grace period to allow the services to raise their
286                        // backpressure flags.
287                        // Subsequent sleeps are to throttle how often we check and log.
288                        thread::sleep(Duration::from_secs(1));
289
290                        let mut any_flags_raised = false;
291                        for (name, flag) in meta.validator_exit_backpressure.iter() {
292                            let is_flag_raised = flag.load(Ordering::Relaxed);
293                            if is_flag_raised {
294                                info!("{name}'s exit backpressure flag is raised");
295                                any_flags_raised = true;
296                            }
297                        }
298                        if !any_flags_raised {
299                            break;
300                        }
301                    }
302                    info!("All services have completed");
303                }
304
305                // TODO: Debug why Exit doesn't always cause the validator to fully exit
306                // (rocksdb background processing or some other stuck thread perhaps?).
307                //
308                // If the process is still alive after five seconds, exit harder
309                thread::sleep(Duration::from_secs(
310                    env::var("SOLANA_VALIDATOR_EXIT_TIMEOUT")
311                        .ok()
312                        .and_then(|x| x.parse().ok())
313                        .unwrap_or(5),
314                ));
315                warn!("validator exit timeout");
316                std::process::exit(0);
317            })
318            .unwrap();
319
320        Ok(())
321    }
322
323    fn pid(&self, _meta: Self::Metadata) -> Result<u32> {
324        Ok(std::process::id())
325    }
326
327    fn reload_plugin(
328        &self,
329        meta: Self::Metadata,
330        name: String,
331        config_file: String,
332    ) -> BoxFuture<Result<()>> {
333        Box::pin(async move {
334            // Construct channel for plugin to respond to this particular rpc request instance
335            let (response_sender, response_receiver) = oneshot_channel();
336
337            // Send request to plugin manager if there is a geyser service
338            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
339                rpc_to_manager_sender
340                    .send(GeyserPluginManagerRequest::ReloadPlugin {
341                        name,
342                        config_file,
343                        response_sender,
344                    })
345                    .expect("GeyerPluginService should never drop request receiver");
346            } else {
347                return Err(jsonrpc_core::Error {
348                    code: ErrorCode::InvalidRequest,
349                    message: "No geyser plugin service".to_string(),
350                    data: None,
351                });
352            }
353
354            // Await response from plugin manager
355            response_receiver
356                .await
357                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
358        })
359    }
360
361    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
362        Box::pin(async move {
363            // Construct channel for plugin to respond to this particular rpc request instance
364            let (response_sender, response_receiver) = oneshot_channel();
365
366            // Send request to plugin manager if there is a geyser service
367            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
368                rpc_to_manager_sender
369                    .send(GeyserPluginManagerRequest::LoadPlugin {
370                        config_file,
371                        response_sender,
372                    })
373                    .expect("GeyerPluginService should never drop request receiver");
374            } else {
375                return Err(jsonrpc_core::Error {
376                    code: ErrorCode::InvalidRequest,
377                    message: "No geyser plugin service".to_string(),
378                    data: None,
379                });
380            }
381
382            // Await response from plugin manager
383            response_receiver
384                .await
385                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
386        })
387    }
388
389    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
390        Box::pin(async move {
391            // Construct channel for plugin to respond to this particular rpc request instance
392            let (response_sender, response_receiver) = oneshot_channel();
393
394            // Send request to plugin manager if there is a geyser service
395            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
396                rpc_to_manager_sender
397                    .send(GeyserPluginManagerRequest::UnloadPlugin {
398                        name,
399                        response_sender,
400                    })
401                    .expect("GeyerPluginService should never drop request receiver");
402            } else {
403                return Err(jsonrpc_core::Error {
404                    code: ErrorCode::InvalidRequest,
405                    message: "No geyser plugin service".to_string(),
406                    data: None,
407                });
408            }
409
410            // Await response from plugin manager
411            response_receiver
412                .await
413                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
414        })
415    }
416
417    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
418        Box::pin(async move {
419            // Construct channel for plugin to respond to this particular rpc request instance
420            let (response_sender, response_receiver) = oneshot_channel();
421
422            // Send request to plugin manager
423            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
424                rpc_to_manager_sender
425                    .send(GeyserPluginManagerRequest::ListPlugins { response_sender })
426                    .expect("GeyerPluginService should never drop request receiver");
427            } else {
428                return Err(jsonrpc_core::Error {
429                    code: ErrorCode::InvalidRequest,
430                    message: "No geyser plugin service".to_string(),
431                    data: None,
432                });
433            }
434
435            // Await response from plugin manager
436            response_receiver
437                .await
438                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
439        })
440    }
441
442    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
443        debug!("rpc_addr admin rpc request received");
444        Ok(meta.rpc_addr)
445    }
446
447    fn set_log_filter(&self, filter: String) -> Result<()> {
448        debug!("set_log_filter admin rpc request received");
449        solana_logger::setup_with(&filter);
450        Ok(())
451    }
452
453    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
454        debug!("start_time admin rpc request received");
455        Ok(meta.start_time)
456    }
457
458    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
459        debug!("start_progress admin rpc request received");
460        Ok(*meta.start_progress.read().unwrap())
461    }
462
463    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
464        debug!("add_authorized_voter request received");
465
466        let authorized_voter = read_keypair_file(keypair_file)
467            .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{err}")))?;
468
469        AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
470    }
471
472    fn add_authorized_voter_from_bytes(
473        &self,
474        meta: Self::Metadata,
475        keypair: Vec<u8>,
476    ) -> Result<()> {
477        debug!("add_authorized_voter_from_bytes request received");
478
479        let authorized_voter = Keypair::try_from(keypair.as_ref()).map_err(|err| {
480            jsonrpc_core::error::Error::invalid_params(format!(
481                "Failed to read authorized voter keypair from provided byte array: {err}"
482            ))
483        })?;
484
485        AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
486    }
487
488    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
489        debug!("remove_all_authorized_voters received");
490        meta.authorized_voter_keypairs.write().unwrap().clear();
491        Ok(())
492    }
493
494    fn set_identity(
495        &self,
496        meta: Self::Metadata,
497        keypair_file: String,
498        require_tower: bool,
499    ) -> Result<()> {
500        debug!("set_identity request received");
501
502        let identity_keypair = read_keypair_file(&keypair_file).map_err(|err| {
503            jsonrpc_core::error::Error::invalid_params(format!(
504                "Failed to read identity keypair from {keypair_file}: {err}"
505            ))
506        })?;
507
508        AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
509    }
510
511    fn set_identity_from_bytes(
512        &self,
513        meta: Self::Metadata,
514        identity_keypair: Vec<u8>,
515        require_tower: bool,
516    ) -> Result<()> {
517        debug!("set_identity_from_bytes request received");
518
519        let identity_keypair = Keypair::try_from(identity_keypair.as_ref()).map_err(|err| {
520            jsonrpc_core::error::Error::invalid_params(format!(
521                "Failed to read identity keypair from provided byte array: {err}"
522            ))
523        })?;
524
525        AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
526    }
527
528    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
529        let loaded_config = load_staked_nodes_overrides(&path)
530            .map_err(|err| {
531                error!(
532                    "Failed to load staked nodes overrides from {}: {}",
533                    &path, err
534                );
535                jsonrpc_core::error::Error::internal_error()
536            })?
537            .staked_map_id;
538        let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
539        write_staked_nodes.clear();
540        write_staked_nodes.extend(loaded_config);
541        info!("Staked nodes overrides loaded from {path}");
542        debug!("overrides map: {write_staked_nodes:?}");
543        Ok(())
544    }
545
546    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
547        meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
548    }
549
550    fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()> {
551        debug!("select_active_interface received: {interface}");
552        meta.with_post_init(|post_init| {
553            let node = post_init.node.as_ref().ok_or_else(|| {
554                jsonrpc_core::Error::invalid_params("`Node` not initialized in post_init")
555            })?;
556
557            node.switch_active_interface(interface, &post_init.cluster_info)
558                .map_err(|e| {
559                    jsonrpc_core::Error::invalid_params(format!(
560                        "Switching failed due to error {e}"
561                    ))
562                })?;
563            info!("Switched primary interface to {interface}");
564            Ok(())
565        })
566    }
567
568    fn repair_shred_from_peer(
569        &self,
570        meta: Self::Metadata,
571        pubkey: Option<Pubkey>,
572        slot: u64,
573        shred_index: u64,
574    ) -> Result<()> {
575        debug!("repair_shred_from_peer request received");
576
577        meta.with_post_init(|post_init| {
578            repair_service::RepairService::request_repair_for_shred_from_peer(
579                post_init.cluster_info.clone(),
580                post_init.cluster_slots.clone(),
581                pubkey,
582                slot,
583                shred_index,
584                &post_init.repair_socket.clone(),
585                post_init.outstanding_repair_requests.clone(),
586            );
587            Ok(())
588        })
589    }
590
591    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
592        debug!("repair_whitelist request received");
593
594        meta.with_post_init(|post_init| {
595            let whitelist: Vec<_> = post_init
596                .repair_whitelist
597                .read()
598                .unwrap()
599                .iter()
600                .copied()
601                .collect();
602            Ok(AdminRpcRepairWhitelist { whitelist })
603        })
604    }
605
606    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()> {
607        debug!("set_repair_whitelist request received");
608
609        let whitelist: HashSet<Pubkey> = whitelist.into_iter().collect();
610        meta.with_post_init(|post_init| {
611            *post_init.repair_whitelist.write().unwrap() = whitelist;
612            warn!(
613                "Repair whitelist set to {:?}",
614                &post_init.repair_whitelist.read().unwrap()
615            );
616            Ok(())
617        })
618    }
619
620    fn get_secondary_index_key_size(
621        &self,
622        meta: Self::Metadata,
623        pubkey_str: String,
624    ) -> Result<HashMap<RpcAccountIndex, usize>> {
625        debug!("get_secondary_index_key_size rpc request received: {pubkey_str:?}");
626        let index_key = verify_pubkey(&pubkey_str)?;
627        meta.with_post_init(|post_init| {
628            let bank = post_init.bank_forks.read().unwrap().root_bank();
629
630            // Take ref to enabled AccountSecondaryIndexes
631            let enabled_account_indexes = &bank.accounts().accounts_db.account_indexes;
632
633            // Exit if secondary indexes are not enabled
634            if enabled_account_indexes.is_empty() {
635                debug!("get_secondary_index_key_size: secondary index not enabled.");
636                return Ok(HashMap::new());
637            };
638
639            // Make sure the requested key is not explicitly excluded
640            if !enabled_account_indexes.include_key(&index_key) {
641                return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
642                    index_key: index_key.to_string(),
643                }
644                .into());
645            }
646
647            // Grab a ref to the AccountsDbfor this Bank
648            let accounts_index = &bank.accounts().accounts_db.accounts_index;
649
650            // Find the size of the key in every index where it exists
651            let found_sizes = enabled_account_indexes
652                .indexes
653                .iter()
654                .filter_map(|index| {
655                    accounts_index
656                        .get_index_key_size(index, &index_key)
657                        .map(|size| (rpc_account_index_from_account_index(index), size))
658                })
659                .collect::<HashMap<_, _>>();
660
661            // Note: Will return an empty HashMap if no keys are found.
662            if found_sizes.is_empty() {
663                debug!("get_secondary_index_key_size: key not found in the secondary index.");
664            }
665            Ok(found_sizes)
666        })
667    }
668
669    fn set_public_tpu_address(
670        &self,
671        meta: Self::Metadata,
672        public_tpu_addr: SocketAddr,
673    ) -> Result<()> {
674        debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
675
676        meta.with_post_init(|post_init| {
677            post_init
678                .cluster_info
679                .my_contact_info()
680                .tpu(Protocol::UDP)
681                .ok_or_else(|| {
682                    error!(
683                        "The public TPU address isn't being published. The node is likely in \
684                         repair mode. See help for --restricted-repair-only-mode for more \
685                         information."
686                    );
687                    jsonrpc_core::error::Error::internal_error()
688                })?;
689            post_init
690                .cluster_info
691                .set_tpu(public_tpu_addr)
692                .map_err(|err| {
693                    error!("Failed to set public TPU address to {public_tpu_addr}: {err}");
694                    jsonrpc_core::error::Error::internal_error()
695                })?;
696            let my_contact_info = post_init.cluster_info.my_contact_info();
697            warn!(
698                "Public TPU addresses set to {:?} (udp) and {:?} (quic)",
699                my_contact_info.tpu(Protocol::UDP),
700                my_contact_info.tpu(Protocol::QUIC),
701            );
702            Ok(())
703        })
704    }
705
706    fn set_public_tpu_forwards_address(
707        &self,
708        meta: Self::Metadata,
709        public_tpu_forwards_addr: SocketAddr,
710    ) -> Result<()> {
711        debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
712
713        meta.with_post_init(|post_init| {
714            post_init
715                .cluster_info
716                .my_contact_info()
717                .tpu_forwards(Protocol::UDP)
718                .ok_or_else(|| {
719                    error!(
720                        "The public TPU Forwards address isn't being published. The node is \
721                         likely in repair mode. See help for --restricted-repair-only-mode for \
722                         more information."
723                    );
724                    jsonrpc_core::error::Error::internal_error()
725                })?;
726            post_init
727                .cluster_info
728                .set_tpu_forwards(public_tpu_forwards_addr)
729                .map_err(|err| {
730                    error!("Failed to set public TPU address to {public_tpu_forwards_addr}: {err}");
731                    jsonrpc_core::error::Error::internal_error()
732                })?;
733            let my_contact_info = post_init.cluster_info.my_contact_info();
734            warn!(
735                "Public TPU Forwards addresses set to {:?} (udp) and {:?} (quic)",
736                my_contact_info.tpu_forwards(Protocol::UDP),
737                my_contact_info.tpu_forwards(Protocol::QUIC),
738            );
739            Ok(())
740        })
741    }
742}
743
744impl AdminRpcImpl {
745    fn add_authorized_voter_keypair(
746        meta: AdminRpcRequestMetadata,
747        authorized_voter: Keypair,
748    ) -> Result<()> {
749        let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
750
751        if authorized_voter_keypairs
752            .iter()
753            .any(|x| x.pubkey() == authorized_voter.pubkey())
754        {
755            Err(jsonrpc_core::error::Error::invalid_params(
756                "Authorized voter already present",
757            ))
758        } else {
759            authorized_voter_keypairs.push(Arc::new(authorized_voter));
760            Ok(())
761        }
762    }
763
764    fn set_identity_keypair(
765        meta: AdminRpcRequestMetadata,
766        identity_keypair: Keypair,
767        require_tower: bool,
768    ) -> Result<()> {
769        meta.with_post_init(|post_init| {
770            if require_tower {
771                let _ = Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey())
772                    .map_err(|err| {
773                        jsonrpc_core::error::Error::invalid_params(format!(
774                            "Unable to load tower file for identity {}: {}",
775                            identity_keypair.pubkey(),
776                            err
777                        ))
778                    })?;
779            }
780
781            for (key, notifier) in &*post_init.notifies.read().unwrap() {
782                if let Err(err) = notifier.update_key(&identity_keypair) {
783                    error!("Error updating network layer keypair: {err} on {key:?}");
784                }
785            }
786
787            solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
788            post_init
789                .cluster_info
790                .set_keypair(Arc::new(identity_keypair));
791            warn!("Identity set to {}", post_init.cluster_info.id());
792            Ok(())
793        })
794    }
795}
796
797fn rpc_account_index_from_account_index(account_index: &AccountIndex) -> RpcAccountIndex {
798    match account_index {
799        AccountIndex::ProgramId => RpcAccountIndex::ProgramId,
800        AccountIndex::SplTokenOwner => RpcAccountIndex::SplTokenOwner,
801        AccountIndex::SplTokenMint => RpcAccountIndex::SplTokenMint,
802    }
803}
804
805// Start the Admin RPC interface
806pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
807    let admin_rpc_path = admin_rpc_path(ledger_path);
808
809    let event_loop = tokio::runtime::Builder::new_multi_thread()
810        .thread_name("solAdminRpcEl")
811        .worker_threads(3) // Three still seems like a lot, and better than the default of available core count
812        .enable_all()
813        .build()
814        .unwrap();
815
816    Builder::new()
817        .name("solAdminRpc".to_string())
818        .spawn(move || {
819            let mut io = MetaIoHandler::default();
820            io.extend_with(AdminRpcImpl.to_delegate());
821
822            let validator_exit = metadata.validator_exit.clone();
823            let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
824                metadata.clone()
825            })
826            .event_loop_executor(event_loop.handle().clone())
827            .start(&format!("{}", admin_rpc_path.display()));
828
829            match server {
830                Err(err) => {
831                    warn!("Unable to start admin rpc service: {err:?}");
832                }
833                Ok(server) => {
834                    info!("started admin rpc service!");
835                    let close_handle = server.close_handle();
836                    validator_exit
837                        .write()
838                        .unwrap()
839                        .register_exit(Box::new(move || {
840                            close_handle.close();
841                        }));
842
843                    server.wait();
844                }
845            }
846        })
847        .unwrap();
848}
849
/// Returns the location of the admin RPC IPC endpoint for `ledger_path`.
///
/// On Windows this is a named pipe, `\\.\pipe\<ledger dir name>-admin.rpc`, or
/// `\\.\pipe\admin.rpc` when `ledger_path` has no final component. On every
/// other platform it is the file `admin.rpc` inside `ledger_path`.
fn admin_rpc_path(ledger_path: &Path) -> PathBuf {
    #[cfg(target_family = "windows")]
    {
        // More information about the wackiness of pipe names over at
        // https://docs.microsoft.com/en-us/windows/win32/ipc/pipe-names
        let pipe_name = match ledger_path.file_name() {
            Some(ledger_filename) => format!(
                "\\\\.\\pipe\\{}-admin.rpc",
                ledger_filename.to_string_lossy()
            ),
            None => String::from("\\\\.\\pipe\\admin.rpc"),
        };
        PathBuf::from(pipe_name)
    }
    #[cfg(not(target_family = "windows"))]
    {
        let mut rpc_path = ledger_path.to_path_buf();
        rpc_path.push("admin.rpc");
        rpc_path
    }
}
869
870// Connect to the Admin RPC interface
871pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
872    let admin_rpc_path = admin_rpc_path(ledger_path);
873    if !admin_rpc_path.exists() {
874        Err(RpcError::Client(format!(
875            "{} does not exist",
876            admin_rpc_path.display()
877        )))
878    } else {
879        ipc::connect::<_, gen_client::Client>(&format!("{}", admin_rpc_path.display())).await
880    }
881}
882
883// Create a runtime for use by client side admin RPC interface calls
884pub fn runtime() -> Runtime {
885    tokio::runtime::Builder::new_multi_thread()
886        .thread_name("solAdminRpcRt")
887        .enable_all()
888        // The agave-validator subcommands make few admin RPC calls and block
889        // on the results so two workers is plenty
890        .worker_threads(2)
891        .build()
892        .expect("new tokio runtime")
893}
894
895#[derive(Default, Deserialize, Clone)]
896pub struct StakedNodesOverrides {
897    #[serde(deserialize_with = "deserialize_pubkey_map")]
898    pub staked_map_id: HashMap<Pubkey, u64>,
899}
900
901pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
902where
903    D: Deserializer<'de>,
904{
905    let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
906    let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
907    for (key, value) in container.iter() {
908        let typed_key = Pubkey::try_from(key.as_str())
909            .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
910        container_typed.insert(typed_key, *value);
911    }
912    Ok(container_typed)
913}
914
915pub fn load_staked_nodes_overrides(
916    path: &String,
917) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
918    debug!("Loading staked nodes overrides configuration from {path}");
919    if Path::new(&path).exists() {
920        let file = std::fs::File::open(path)?;
921        Ok(serde_yaml::from_reader(file)?)
922    } else {
923        Err(format!("Staked nodes overrides provided '{path}' a non-existing file path.").into())
924    }
925}
926
927#[cfg(test)]
928mod tests {
929    use {
930        super::*,
931        serde_json::Value,
932        solana_account::{Account, AccountSharedData},
933        solana_accounts_db::{
934            accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
935            accounts_index::AccountSecondaryIndexes,
936        },
937        solana_core::{
938            admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
939            consensus::tower_storage::NullTowerStorage,
940            validator::{Validator, ValidatorConfig, ValidatorTpuConfig},
941        },
942        solana_gossip::{cluster_info::ClusterInfo, node::Node},
943        solana_ledger::{
944            create_new_tmp_ledger,
945            genesis_utils::{
946                create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo,
947            },
948        },
949        solana_net_utils::sockets::bind_to_localhost_unique,
950        solana_program_option::COption,
951        solana_program_pack::Pack,
952        solana_pubkey::Pubkey,
953        solana_rpc::rpc::create_validator_exit,
954        solana_runtime::{
955            bank::{Bank, BankTestConfig},
956            bank_forks::BankForks,
957        },
958        solana_streamer::socket::SocketAddrSpace,
959        solana_system_interface::program as system_program,
960        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
961        spl_generic_token::token,
962        spl_token_2022_interface::state::{
963            Account as TokenAccount, AccountState as TokenAccountState, Mint,
964        },
965        std::{collections::HashSet, fs::remove_dir_all, sync::atomic::AtomicBool},
966    };
967
    /// Options for `RpcHandler::start_with_config`.
    #[derive(Default)]
    struct TestConfig {
        /// Secondary indexes handed to the accounts-db config of the test bank.
        account_indexes: AccountSecondaryIndexes,
    }
972
973    struct RpcHandler {
974        io: MetaIoHandler<AdminRpcRequestMetadata>,
975        meta: AdminRpcRequestMetadata,
976        bank_forks: Arc<RwLock<BankForks>>,
977    }
978
979    impl RpcHandler {
980        fn _start() -> Self {
981            Self::start_with_config(TestConfig::default())
982        }
983
984        fn start_with_config(config: TestConfig) -> Self {
985            let keypair = Arc::new(Keypair::new());
986            let cluster_info = Arc::new(ClusterInfo::new(
987                ContactInfo::new(
988                    keypair.pubkey(),
989                    solana_time_utils::timestamp(), // wallclock
990                    0u16,                           // shred_version
991                ),
992                keypair,
993                SocketAddrSpace::Unspecified,
994            ));
995            let exit = Arc::new(AtomicBool::new(false));
996            let validator_exit = create_validator_exit(exit);
997            let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig {
998                accounts_db_config: AccountsDbConfig {
999                    account_indexes: Some(config.account_indexes),
1000                    ..ACCOUNTS_DB_CONFIG_FOR_TESTING
1001                },
1002            });
1003            let vote_account = vote_keypair.pubkey();
1004            let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1005            let repair_whitelist = Arc::new(RwLock::new(HashSet::new()));
1006            let meta = AdminRpcRequestMetadata {
1007                rpc_addr: None,
1008                start_time: SystemTime::now(),
1009                start_progress,
1010                validator_exit,
1011                validator_exit_backpressure: HashMap::default(),
1012                authorized_voter_keypairs: Arc::new(RwLock::new(vec![vote_keypair])),
1013                tower_storage: Arc::new(NullTowerStorage {}),
1014                post_init: Arc::new(RwLock::new(Some(AdminRpcRequestMetadataPostInit {
1015                    cluster_info,
1016                    bank_forks: bank_forks.clone(),
1017                    vote_account,
1018                    repair_whitelist,
1019                    notifies: Arc::new(RwLock::new(KeyUpdaters::default())),
1020                    repair_socket: Arc::new(bind_to_localhost_unique().expect("should bind")),
1021                    outstanding_repair_requests: Arc::<
1022                        RwLock<repair_service::OutstandingShredRepairs>,
1023                    >::default(),
1024                    cluster_slots: Arc::new(
1025                        solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default(),
1026                    ),
1027                    node: None,
1028                }))),
1029                staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1030                rpc_to_plugin_manager_sender: None,
1031            };
1032            let mut io = MetaIoHandler::default();
1033            io.extend_with(AdminRpcImpl.to_delegate());
1034
1035            Self {
1036                io,
1037                meta,
1038                bank_forks,
1039            }
1040        }
1041
1042        fn root_bank(&self) -> Arc<Bank> {
1043            self.bank_forks.read().unwrap().root_bank()
1044        }
1045    }
1046
1047    fn new_bank_forks_with_config(
1048        config: BankTestConfig,
1049    ) -> (Arc<RwLock<BankForks>>, Arc<Keypair>) {
1050        let GenesisConfigInfo {
1051            genesis_config,
1052            voting_keypair,
1053            ..
1054        } = create_genesis_config(1_000_000_000);
1055
1056        let bank = Bank::new_with_config_for_tests(&genesis_config, config);
1057        (BankForks::new_rw_arc(bank), Arc::new(voting_keypair))
1058    }
1059
1060    #[test]
1061    fn test_secondary_index_key_sizes() {
1062        for secondary_index_enabled in [true, false] {
1063            let account_indexes = if secondary_index_enabled {
1064                AccountSecondaryIndexes {
1065                    keys: None,
1066                    indexes: HashSet::from([
1067                        AccountIndex::ProgramId,
1068                        AccountIndex::SplTokenMint,
1069                        AccountIndex::SplTokenOwner,
1070                    ]),
1071                }
1072            } else {
1073                AccountSecondaryIndexes::default()
1074            };
1075
1076            // RPC & Bank Setup
1077            let rpc = RpcHandler::start_with_config(TestConfig { account_indexes });
1078
1079            let bank = rpc.root_bank();
1080            let RpcHandler { io, meta, .. } = rpc;
1081
1082            // Pubkeys
1083            let token_account1_pubkey = Pubkey::new_unique();
1084            let token_account2_pubkey = Pubkey::new_unique();
1085            let token_account3_pubkey = Pubkey::new_unique();
1086            let mint1_pubkey = Pubkey::new_unique();
1087            let mint2_pubkey = Pubkey::new_unique();
1088            let wallet1_pubkey = Pubkey::new_unique();
1089            let wallet2_pubkey = Pubkey::new_unique();
1090            let non_existent_pubkey = Pubkey::new_unique();
1091            let delegate = Pubkey::new_unique();
1092
1093            let mut num_default_spl_token_program_accounts = 0;
1094            let mut num_default_system_program_accounts = 0;
1095
1096            if !secondary_index_enabled {
1097                // Test first with no accounts added & no secondary indexes enabled:
1098                let req = format!(
1099                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account1_pubkey}"]}}"#,
1100                );
1101                let res = io.handle_request_sync(&req, meta.clone());
1102                let result: Value = serde_json::from_str(&res.expect("actual response"))
1103                    .expect("actual response deserialization");
1104                let sizes: HashMap<RpcAccountIndex, usize> =
1105                    serde_json::from_value(result["result"].clone()).unwrap();
1106                assert!(sizes.is_empty());
1107            } else {
1108                // Count SPL Token Program Default Accounts
1109                let req = format!(
1110                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1111                    token::id(),
1112                );
1113                let res = io.handle_request_sync(&req, meta.clone());
1114                let result: Value = serde_json::from_str(&res.expect("actual response"))
1115                    .expect("actual response deserialization");
1116                let sizes: HashMap<RpcAccountIndex, usize> =
1117                    serde_json::from_value(result["result"].clone()).unwrap();
1118                assert_eq!(sizes.len(), 1);
1119                num_default_spl_token_program_accounts =
1120                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1121                // Count System Program Default Accounts
1122                let req = format!(
1123                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1124                    system_program::id(),
1125                );
1126                let res = io.handle_request_sync(&req, meta.clone());
1127                let result: Value = serde_json::from_str(&res.expect("actual response"))
1128                    .expect("actual response deserialization");
1129                let sizes: HashMap<RpcAccountIndex, usize> =
1130                    serde_json::from_value(result["result"].clone()).unwrap();
1131                assert_eq!(sizes.len(), 1);
1132                num_default_system_program_accounts =
1133                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1134            }
1135
1136            // Add 2 basic wallet accounts
1137            let wallet1_account = AccountSharedData::from(Account {
1138                lamports: 11111111,
1139                owner: system_program::id(),
1140                ..Account::default()
1141            });
1142            bank.store_account(&wallet1_pubkey, &wallet1_account);
1143            let wallet2_account = AccountSharedData::from(Account {
1144                lamports: 11111111,
1145                owner: system_program::id(),
1146                ..Account::default()
1147            });
1148            bank.store_account(&wallet2_pubkey, &wallet2_account);
1149
1150            // Add a token account
1151            let mut account1_data = vec![0; TokenAccount::get_packed_len()];
1152            let token_account1 = TokenAccount {
1153                mint: mint1_pubkey,
1154                owner: wallet1_pubkey,
1155                delegate: COption::Some(delegate),
1156                amount: 420,
1157                state: TokenAccountState::Initialized,
1158                is_native: COption::None,
1159                delegated_amount: 30,
1160                close_authority: COption::Some(wallet1_pubkey),
1161            };
1162            TokenAccount::pack(token_account1, &mut account1_data).unwrap();
1163            let token_account1 = AccountSharedData::from(Account {
1164                lamports: 111,
1165                data: account1_data.to_vec(),
1166                owner: token::id(),
1167                ..Account::default()
1168            });
1169            bank.store_account(&token_account1_pubkey, &token_account1);
1170
1171            // Add the mint
1172            let mut mint1_data = vec![0; Mint::get_packed_len()];
1173            let mint1_state = Mint {
1174                mint_authority: COption::Some(wallet1_pubkey),
1175                supply: 500,
1176                decimals: 2,
1177                is_initialized: true,
1178                freeze_authority: COption::Some(wallet1_pubkey),
1179            };
1180            Mint::pack(mint1_state, &mut mint1_data).unwrap();
1181            let mint_account1 = AccountSharedData::from(Account {
1182                lamports: 222,
1183                data: mint1_data.to_vec(),
1184                owner: token::id(),
1185                ..Account::default()
1186            });
1187            bank.store_account(&mint1_pubkey, &mint_account1);
1188
1189            // Add another token account with the different owner, but same delegate, and mint
1190            let mut account2_data = vec![0; TokenAccount::get_packed_len()];
1191            let token_account2 = TokenAccount {
1192                mint: mint1_pubkey,
1193                owner: wallet2_pubkey,
1194                delegate: COption::Some(delegate),
1195                amount: 420,
1196                state: TokenAccountState::Initialized,
1197                is_native: COption::None,
1198                delegated_amount: 30,
1199                close_authority: COption::Some(wallet2_pubkey),
1200            };
1201            TokenAccount::pack(token_account2, &mut account2_data).unwrap();
1202            let token_account2 = AccountSharedData::from(Account {
1203                lamports: 333,
1204                data: account2_data.to_vec(),
1205                owner: token::id(),
1206                ..Account::default()
1207            });
1208            bank.store_account(&token_account2_pubkey, &token_account2);
1209
1210            // Add another token account with the same owner and delegate but different mint
1211            let mut account3_data = vec![0; TokenAccount::get_packed_len()];
1212            let token_account3 = TokenAccount {
1213                mint: mint2_pubkey,
1214                owner: wallet2_pubkey,
1215                delegate: COption::Some(delegate),
1216                amount: 42,
1217                state: TokenAccountState::Initialized,
1218                is_native: COption::None,
1219                delegated_amount: 30,
1220                close_authority: COption::Some(wallet2_pubkey),
1221            };
1222            TokenAccount::pack(token_account3, &mut account3_data).unwrap();
1223            let token_account3 = AccountSharedData::from(Account {
1224                lamports: 444,
1225                data: account3_data.to_vec(),
1226                owner: token::id(),
1227                ..Account::default()
1228            });
1229            bank.store_account(&token_account3_pubkey, &token_account3);
1230
1231            // Add the new mint
1232            let mut mint2_data = vec![0; Mint::get_packed_len()];
1233            let mint2_state = Mint {
1234                mint_authority: COption::Some(wallet2_pubkey),
1235                supply: 200,
1236                decimals: 3,
1237                is_initialized: true,
1238                freeze_authority: COption::Some(wallet2_pubkey),
1239            };
1240            Mint::pack(mint2_state, &mut mint2_data).unwrap();
1241            let mint_account2 = AccountSharedData::from(Account {
1242                lamports: 555,
1243                data: mint2_data.to_vec(),
1244                owner: token::id(),
1245                ..Account::default()
1246            });
1247            bank.store_account(&mint2_pubkey, &mint_account2);
1248
1249            // Accounts should now look like the following:
1250            //
1251            //                   -----system_program------
1252            //                  /                         \
1253            //                 /-(owns)                    \-(owns)
1254            //                /                             \
1255            //             wallet1                   ---wallet2---
1256            //               /                      /             \
1257            //              /-(SPL::owns)          /-(SPL::owns)   \-(SPL::owns)
1258            //             /                      /                 \
1259            //      token_account1         token_account2       token_account3
1260            //            \                     /                   /
1261            //             \-(SPL::mint)       /-(SPL::mint)       /-(SPL::mint)
1262            //              \                 /                   /
1263            //               --mint_account1--               mint_account2
1264
1265            if secondary_index_enabled {
1266                // ----------- Test for a non-existent key -----------
1267                let req = format!(
1268                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{non_existent_pubkey}"]}}"#,
1269                );
1270                let res = io.handle_request_sync(&req, meta.clone());
1271                let result: Value = serde_json::from_str(&res.expect("actual response"))
1272                    .expect("actual response deserialization");
1273                let sizes: HashMap<RpcAccountIndex, usize> =
1274                    serde_json::from_value(result["result"].clone()).unwrap();
1275                assert!(sizes.is_empty());
1276                // --------------- Test Queries ---------------
1277                // 1) Wallet1 - Owns 1 SPL Token
1278                let req = format!(
1279                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet1_pubkey}"]}}"#,
1280                );
1281                let res = io.handle_request_sync(&req, meta.clone());
1282                let result: Value = serde_json::from_str(&res.expect("actual response"))
1283                    .expect("actual response deserialization");
1284                let sizes: HashMap<RpcAccountIndex, usize> =
1285                    serde_json::from_value(result["result"].clone()).unwrap();
1286                assert_eq!(sizes.len(), 1);
1287                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 1);
1288                // 2) Wallet2 - Owns 2 SPL Tokens
1289                let req = format!(
1290                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet2_pubkey}"]}}"#,
1291                );
1292                let res = io.handle_request_sync(&req, meta.clone());
1293                let result: Value = serde_json::from_str(&res.expect("actual response"))
1294                    .expect("actual response deserialization");
1295                let sizes: HashMap<RpcAccountIndex, usize> =
1296                    serde_json::from_value(result["result"].clone()).unwrap();
1297                assert_eq!(sizes.len(), 1);
1298                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 2);
1299                // 3) Mint1 - Is in 2 SPL Accounts
1300                let req = format!(
1301                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint1_pubkey}"]}}"#,
1302                );
1303                let res = io.handle_request_sync(&req, meta.clone());
1304                let result: Value = serde_json::from_str(&res.expect("actual response"))
1305                    .expect("actual response deserialization");
1306                let sizes: HashMap<RpcAccountIndex, usize> =
1307                    serde_json::from_value(result["result"].clone()).unwrap();
1308                assert_eq!(sizes.len(), 1);
1309                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 2);
1310                // 4) Mint2 - Is in 1 SPL Account
1311                let req = format!(
1312                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint2_pubkey}"]}}"#,
1313                );
1314                let res = io.handle_request_sync(&req, meta.clone());
1315                let result: Value = serde_json::from_str(&res.expect("actual response"))
1316                    .expect("actual response deserialization");
1317                let sizes: HashMap<RpcAccountIndex, usize> =
1318                    serde_json::from_value(result["result"].clone()).unwrap();
1319                assert_eq!(sizes.len(), 1);
1320                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 1);
1321                // 5) SPL Token Program Owns 6 Accounts - 1 Default, 5 created above.
1322                let req = format!(
1323                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1324                    token::id(),
1325                );
1326                let res = io.handle_request_sync(&req, meta.clone());
1327                let result: Value = serde_json::from_str(&res.expect("actual response"))
1328                    .expect("actual response deserialization");
1329                let sizes: HashMap<RpcAccountIndex, usize> =
1330                    serde_json::from_value(result["result"].clone()).unwrap();
1331                assert_eq!(sizes.len(), 1);
1332                assert_eq!(
1333                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1334                    (num_default_spl_token_program_accounts + 5)
1335                );
1336                // 5) System Program Owns 4 Accounts + 2 Default, 2 created above.
1337                let req = format!(
1338                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1339                    system_program::id(),
1340                );
1341                let res = io.handle_request_sync(&req, meta.clone());
1342                let result: Value = serde_json::from_str(&res.expect("actual response"))
1343                    .expect("actual response deserialization");
1344                let sizes: HashMap<RpcAccountIndex, usize> =
1345                    serde_json::from_value(result["result"].clone()).unwrap();
1346                assert_eq!(sizes.len(), 1);
1347                assert_eq!(
1348                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1349                    (num_default_system_program_accounts + 2)
1350                );
1351            } else {
1352                // ------------ Secondary Indexes Disabled ------------
1353                let req = format!(
1354                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account2_pubkey}"]}}"#,
1355                );
1356                let res = io.handle_request_sync(&req, meta.clone());
1357                let result: Value = serde_json::from_str(&res.expect("actual response"))
1358                    .expect("actual response deserialization");
1359                let sizes: HashMap<RpcAccountIndex, usize> =
1360                    serde_json::from_value(result["result"].clone()).unwrap();
1361                assert!(sizes.is_empty());
1362            }
1363        }
1364    }
1365
1366    // This test checks that the rpc call to `set_identity` works a expected with
1367    // Bank but without validator.
1368    #[test]
1369    fn test_set_identity() {
1370        let rpc = RpcHandler::start_with_config(TestConfig::default());
1371
1372        let RpcHandler { io, meta, .. } = rpc;
1373
1374        let expected_validator_id = Keypair::new();
1375        let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1376
1377        let set_id_request = format!(
1378            r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1379        );
1380        let response = io.handle_request_sync(&set_id_request, meta.clone());
1381        let actual_parsed_response: Value =
1382            serde_json::from_str(&response.expect("actual response"))
1383                .expect("actual response deserialization");
1384
1385        let expected_parsed_response: Value = serde_json::from_str(
1386            r#"{
1387                "id": 1,
1388                "jsonrpc": "2.0",
1389                "result": null
1390            }"#,
1391        )
1392        .expect("Failed to parse expected response");
1393        assert_eq!(actual_parsed_response, expected_parsed_response);
1394
1395        let contact_info_request =
1396            r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1397        let response = io.handle_request_sync(&contact_info_request, meta.clone());
1398        let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1399            .expect("actual response deserialization");
1400        let actual_validator_id = parsed_response["result"]["id"]
1401            .as_str()
1402            .expect("Expected a string");
1403        assert_eq!(
1404            actual_validator_id,
1405            expected_validator_id.pubkey().to_string()
1406        );
1407    }
1408
    /// Test fixture built by `TestValidatorWithAdminRpc::new`, which starts a
    /// validator on a temporary ledger.
    struct TestValidatorWithAdminRpc {
        /// Request metadata sharing its `post_init` slot with the started validator.
        meta: AdminRpcRequestMetadata,
        /// IO handler with `AdminRpcImpl` registered.
        io: MetaIoHandler<AdminRpcRequestMetadata>,
        /// Path of the temporary ledger the validator was started on.
        validator_ledger_path: PathBuf,
    }
1414
1415    impl TestValidatorWithAdminRpc {
1416        fn new() -> Self {
1417            let leader_keypair = Keypair::new();
1418            let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
1419
1420            let validator_keypair = Keypair::new();
1421            let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
1422            let genesis_config =
1423                create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
1424                    .genesis_config;
1425            let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
1426
1427            let voting_keypair = Arc::new(Keypair::new());
1428            let voting_pubkey = voting_keypair.pubkey();
1429            let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1430            let validator_config = ValidatorConfig {
1431                rpc_addrs: Some((
1432                    validator_node.info.rpc().unwrap(),
1433                    validator_node.info.rpc_pubsub().unwrap(),
1434                )),
1435                ..ValidatorConfig::default_for_test()
1436            };
1437            let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1438
1439            let post_init = Arc::new(RwLock::new(None));
1440            let meta = AdminRpcRequestMetadata {
1441                rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
1442                start_time: SystemTime::now(),
1443                start_progress: start_progress.clone(),
1444                validator_exit: validator_config.validator_exit.clone(),
1445                validator_exit_backpressure: HashMap::default(),
1446                authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1447                tower_storage: Arc::new(NullTowerStorage {}),
1448                post_init: post_init.clone(),
1449                staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1450                rpc_to_plugin_manager_sender: None,
1451            };
1452
1453            let _validator = Validator::new(
1454                validator_node,
1455                Arc::new(validator_keypair),
1456                &validator_ledger_path,
1457                &voting_pubkey,
1458                authorized_voter_keypairs,
1459                vec![leader_node.info],
1460                &validator_config,
1461                true, // should_check_duplicate_instance
1462                None, // rpc_to_plugin_manager_receiver
1463                start_progress.clone(),
1464                SocketAddrSpace::Unspecified,
1465                ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
1466                post_init.clone(),
1467            )
1468            .expect("assume successful validator start");
1469            assert_eq!(
1470                *start_progress.read().unwrap(),
1471                ValidatorStartProgress::Running
1472            );
1473            let post_init = post_init.read().unwrap();
1474
1475            assert!(post_init.is_some());
1476            let post_init = post_init.as_ref().unwrap();
1477            let notifies = post_init.notifies.read().unwrap();
1478            let updater_keys: HashSet<KeyUpdaterType> =
1479                notifies.into_iter().map(|(key, _)| key.clone()).collect();
1480            assert_eq!(
1481                updater_keys,
1482                HashSet::from_iter(vec![
1483                    KeyUpdaterType::Tpu,
1484                    KeyUpdaterType::TpuForwards,
1485                    KeyUpdaterType::TpuVote,
1486                    KeyUpdaterType::Forward,
1487                    KeyUpdaterType::RpcService
1488                ])
1489            );
1490            let mut io = MetaIoHandler::default();
1491            io.extend_with(AdminRpcImpl.to_delegate());
1492            Self {
1493                meta,
1494                io,
1495                validator_ledger_path,
1496            }
1497        }
1498
1499        fn handle_request(&self, request: &str) -> Option<String> {
1500            self.io.handle_request_sync(request, self.meta.clone())
1501        }
1502    }
1503
1504    impl Drop for TestValidatorWithAdminRpc {
1505        fn drop(&mut self) {
1506            remove_dir_all(self.validator_ledger_path.clone()).unwrap();
1507        }
1508    }
1509
1510    // This test checks that `set_identity` call works with working validator and client.
1511    #[test]
1512    fn test_set_identity_with_validator() {
1513        let test_validator = TestValidatorWithAdminRpc::new();
1514        let expected_validator_id = Keypair::new();
1515        let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1516
1517        let set_id_request = format!(
1518            r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1519        );
1520        let response = test_validator.handle_request(&set_id_request);
1521        let actual_parsed_response: Value =
1522            serde_json::from_str(&response.expect("actual response"))
1523                .expect("actual response deserialization");
1524
1525        let expected_parsed_response: Value = serde_json::from_str(
1526            r#"{
1527                "id": 1,
1528                "jsonrpc": "2.0",
1529                "result": null
1530            }"#,
1531        )
1532        .expect("Failed to parse expected response");
1533        assert_eq!(actual_parsed_response, expected_parsed_response);
1534
1535        let contact_info_request =
1536            r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1537        let response = test_validator.handle_request(&contact_info_request);
1538        let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1539            .expect("actual response deserialization");
1540        let actual_validator_id = parsed_response["result"]["id"]
1541            .as_str()
1542            .expect("Expected a string");
1543        assert_eq!(
1544            actual_validator_id,
1545            expected_validator_id.pubkey().to_string()
1546        );
1547
1548        let contact_info_request =
1549            r#"{"jsonrpc":"2.0","id":1,"method":"exit","params":[]}"#.to_string();
1550        let exit_response = test_validator.handle_request(&contact_info_request);
1551        let actual_parsed_response: Value =
1552            serde_json::from_str(&exit_response.expect("actual response"))
1553                .expect("actual response deserialization");
1554        assert_eq!(actual_parsed_response, expected_parsed_response);
1555    }
1556}