agave_validator/
admin_rpc_service.rs

1use {
2    crossbeam_channel::Sender,
3    jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
4    jsonrpc_core_client::{transports::ipc, RpcError},
5    jsonrpc_derive::rpc,
6    jsonrpc_ipc_server::{
7        tokio::sync::oneshot::channel as oneshot_channel, RequestContext, ServerBuilder,
8    },
9    log::*,
10    serde::{de::Deserializer, Deserialize, Serialize},
11    solana_accounts_db::accounts_index::AccountIndex,
12    solana_core::{
13        admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
14        banking_stage::{
15            transaction_scheduler::scheduler_controller::SchedulerConfig, BankingStage,
16        },
17        consensus::{tower_storage::TowerStorage, Tower},
18        repair::repair_service,
19        validator::{
20            BlockProductionMethod, SchedulerPacing, TransactionStructure, ValidatorStartProgress,
21        },
22    },
23    solana_geyser_plugin_manager::GeyserPluginManagerRequest,
24    solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
25    solana_keypair::{read_keypair_file, Keypair},
26    solana_pubkey::Pubkey,
27    solana_rpc::rpc::verify_pubkey,
28    solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
29    solana_signer::Signer,
30    solana_validator_exit::Exit,
31    std::{
32        collections::{HashMap, HashSet},
33        env, error,
34        fmt::{self, Display},
35        net::{IpAddr, SocketAddr},
36        num::NonZeroUsize,
37        path::{Path, PathBuf},
38        sync::{
39            atomic::{AtomicBool, Ordering},
40            Arc, RwLock,
41        },
42        thread::{self, Builder},
43        time::{Duration, SystemTime},
44    },
45    tokio::runtime::Runtime,
46};
47
48#[derive(Clone)]
49pub struct AdminRpcRequestMetadata {
50    pub rpc_addr: Option<SocketAddr>,
51    pub start_time: SystemTime,
52    pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
53    pub validator_exit: Arc<RwLock<Exit>>,
54    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
55    pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
56    pub tower_storage: Arc<dyn TowerStorage>,
57    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
58    pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
59    pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
60}
61
62impl Metadata for AdminRpcRequestMetadata {}
63
64impl AdminRpcRequestMetadata {
65    fn with_post_init<F, R>(&self, func: F) -> Result<R>
66    where
67        F: FnOnce(&AdminRpcRequestMetadataPostInit) -> Result<R>,
68    {
69        if let Some(post_init) = self.post_init.read().unwrap().as_ref() {
70            func(post_init)
71        } else {
72            Err(jsonrpc_core::error::Error::invalid_params(
73                "Retry once validator start up is complete",
74            ))
75        }
76    }
77}
78
79#[derive(Debug, Deserialize, Serialize)]
80pub struct AdminRpcContactInfo {
81    pub id: String,
82    pub gossip: SocketAddr,
83    pub tvu: SocketAddr,
84    pub tvu_quic: SocketAddr,
85    pub serve_repair_quic: SocketAddr,
86    pub tpu: SocketAddr,
87    pub tpu_forwards: SocketAddr,
88    pub tpu_vote: SocketAddr,
89    pub rpc: SocketAddr,
90    pub rpc_pubsub: SocketAddr,
91    pub serve_repair: SocketAddr,
92    pub last_updated_timestamp: u64,
93    pub shred_version: u16,
94}
95
96#[derive(Debug, Deserialize, Serialize)]
97pub struct AdminRpcRepairWhitelist {
98    pub whitelist: Vec<Pubkey>,
99}
100
101impl From<ContactInfo> for AdminRpcContactInfo {
102    fn from(node: ContactInfo) -> Self {
103        macro_rules! unwrap_socket {
104            ($name:ident) => {
105                node.$name().unwrap_or(SOCKET_ADDR_UNSPECIFIED)
106            };
107            ($name:ident, $protocol:expr) => {
108                node.$name($protocol).unwrap_or(SOCKET_ADDR_UNSPECIFIED)
109            };
110        }
111        Self {
112            id: node.pubkey().to_string(),
113            last_updated_timestamp: node.wallclock(),
114            gossip: unwrap_socket!(gossip),
115            tvu: unwrap_socket!(tvu, Protocol::UDP),
116            tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
117            serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
118            tpu: unwrap_socket!(tpu, Protocol::UDP),
119            tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
120            tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
121            rpc: unwrap_socket!(rpc),
122            rpc_pubsub: unwrap_socket!(rpc_pubsub),
123            serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
124            shred_version: node.shred_version(),
125        }
126    }
127}
128
129impl Display for AdminRpcContactInfo {
130    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
131        writeln!(f, "Identity: {}", self.id)?;
132        writeln!(f, "Gossip: {}", self.gossip)?;
133        writeln!(f, "TVU: {}", self.tvu)?;
134        writeln!(f, "TVU QUIC: {}", self.tvu_quic)?;
135        writeln!(f, "TPU: {}", self.tpu)?;
136        writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;
137        writeln!(f, "TPU Votes: {}", self.tpu_vote)?;
138        writeln!(f, "RPC: {}", self.rpc)?;
139        writeln!(f, "RPC Pubsub: {}", self.rpc_pubsub)?;
140        writeln!(f, "Serve Repair: {}", self.serve_repair)?;
141        writeln!(f, "Last Updated Timestamp: {}", self.last_updated_timestamp)?;
142        writeln!(f, "Shred Version: {}", self.shred_version)
143    }
144}
145impl solana_cli_output::VerboseDisplay for AdminRpcContactInfo {}
146impl solana_cli_output::QuietDisplay for AdminRpcContactInfo {}
147
148impl Display for AdminRpcRepairWhitelist {
149    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
150        writeln!(f, "Repair whitelist: {:?}", &self.whitelist)
151    }
152}
153impl solana_cli_output::VerboseDisplay for AdminRpcRepairWhitelist {}
154impl solana_cli_output::QuietDisplay for AdminRpcRepairWhitelist {}
155
156#[rpc]
157pub trait AdminRpc {
158    type Metadata;
159
160    /// Initiates validator exit; exit is asynchronous so the validator
161    /// will almost certainly still be running when this method returns
162    #[rpc(meta, name = "exit")]
163    fn exit(&self, meta: Self::Metadata) -> Result<()>;
164
165    /// Return the process id (pid)
166    #[rpc(meta, name = "pid")]
167    fn pid(&self, meta: Self::Metadata) -> Result<u32>;
168
169    #[rpc(meta, name = "reloadPlugin")]
170    fn reload_plugin(
171        &self,
172        meta: Self::Metadata,
173        name: String,
174        config_file: String,
175    ) -> BoxFuture<Result<()>>;
176
177    #[rpc(meta, name = "unloadPlugin")]
178    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
179
180    #[rpc(meta, name = "loadPlugin")]
181    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
182
183    #[rpc(meta, name = "listPlugins")]
184    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
185
186    #[rpc(meta, name = "rpcAddress")]
187    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
188
189    #[rpc(name = "setLogFilter")]
190    fn set_log_filter(&self, filter: String) -> Result<()>;
191
192    #[rpc(meta, name = "startTime")]
193    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;
194
195    #[rpc(meta, name = "startProgress")]
196    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;
197
198    #[rpc(meta, name = "addAuthorizedVoter")]
199    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;
200
201    #[rpc(meta, name = "addAuthorizedVoterFromBytes")]
202    fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec<u8>)
203        -> Result<()>;
204
205    #[rpc(meta, name = "removeAllAuthorizedVoters")]
206    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;
207
208    #[rpc(meta, name = "setIdentity")]
209    fn set_identity(
210        &self,
211        meta: Self::Metadata,
212        keypair_file: String,
213        require_tower: bool,
214    ) -> Result<()>;
215
216    #[rpc(meta, name = "setIdentityFromBytes")]
217    fn set_identity_from_bytes(
218        &self,
219        meta: Self::Metadata,
220        identity_keypair: Vec<u8>,
221        require_tower: bool,
222    ) -> Result<()>;
223
224    #[rpc(meta, name = "setStakedNodesOverrides")]
225    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;
226
227    #[rpc(meta, name = "contactInfo")]
228    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;
229
230    #[rpc(meta, name = "selectActiveInterface")]
231    fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()>;
232
233    #[rpc(meta, name = "repairShredFromPeer")]
234    fn repair_shred_from_peer(
235        &self,
236        meta: Self::Metadata,
237        pubkey: Option<Pubkey>,
238        slot: u64,
239        shred_index: u64,
240    ) -> Result<()>;
241
242    #[rpc(meta, name = "repairWhitelist")]
243    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;
244
245    #[rpc(meta, name = "setRepairWhitelist")]
246    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()>;
247
248    #[rpc(meta, name = "getSecondaryIndexKeySize")]
249    fn get_secondary_index_key_size(
250        &self,
251        meta: Self::Metadata,
252        pubkey_str: String,
253    ) -> Result<HashMap<RpcAccountIndex, usize>>;
254
255    #[rpc(meta, name = "setPublicTpuAddress")]
256    fn set_public_tpu_address(
257        &self,
258        meta: Self::Metadata,
259        public_tpu_addr: SocketAddr,
260    ) -> Result<()>;
261
262    #[rpc(meta, name = "setPublicTpuForwardsAddress")]
263    fn set_public_tpu_forwards_address(
264        &self,
265        meta: Self::Metadata,
266        public_tpu_forwards_addr: SocketAddr,
267    ) -> Result<()>;
268
269    #[rpc(meta, name = "manageBlockProduction")]
270    fn manage_block_production(
271        &self,
272        meta: Self::Metadata,
273        block_production_method: BlockProductionMethod,
274        transaction_struct: TransactionStructure,
275        num_workers: NonZeroUsize,
276        scheduler_pacing: SchedulerPacing,
277    ) -> Result<()>;
278}
279
280pub struct AdminRpcImpl;
281impl AdminRpc for AdminRpcImpl {
282    type Metadata = AdminRpcRequestMetadata;
283
284    fn exit(&self, meta: Self::Metadata) -> Result<()> {
285        debug!("exit admin rpc request received");
286
287        thread::Builder::new()
288            .name("solProcessExit".into())
289            .spawn(move || {
290                // Delay exit signal until this RPC request completes, otherwise the caller of `exit` might
291                // receive a confusing error as the validator shuts down before a response is sent back.
292                thread::sleep(Duration::from_millis(100));
293
294                info!("validator exit requested");
295                meta.validator_exit.write().unwrap().exit();
296
297                if !meta.validator_exit_backpressure.is_empty() {
298                    let service_names = meta.validator_exit_backpressure.keys();
299                    info!("Wait for these services to complete: {service_names:?}");
300                    loop {
301                        // The initial sleep is a grace period to allow the services to raise their
302                        // backpressure flags.
303                        // Subsequent sleeps are to throttle how often we check and log.
304                        thread::sleep(Duration::from_secs(1));
305
306                        let mut any_flags_raised = false;
307                        for (name, flag) in meta.validator_exit_backpressure.iter() {
308                            let is_flag_raised = flag.load(Ordering::Relaxed);
309                            if is_flag_raised {
310                                info!("{name}'s exit backpressure flag is raised");
311                                any_flags_raised = true;
312                            }
313                        }
314                        if !any_flags_raised {
315                            break;
316                        }
317                    }
318                    info!("All services have completed");
319                }
320
321                // TODO: Debug why Exit doesn't always cause the validator to fully exit
322                // (rocksdb background processing or some other stuck thread perhaps?).
323                //
324                // If the process is still alive after five seconds, exit harder
325                thread::sleep(Duration::from_secs(
326                    env::var("SOLANA_VALIDATOR_EXIT_TIMEOUT")
327                        .ok()
328                        .and_then(|x| x.parse().ok())
329                        .unwrap_or(5),
330                ));
331                warn!("validator exit timeout");
332                std::process::exit(0);
333            })
334            .unwrap();
335
336        Ok(())
337    }
338
339    fn pid(&self, _meta: Self::Metadata) -> Result<u32> {
340        Ok(std::process::id())
341    }
342
343    fn reload_plugin(
344        &self,
345        meta: Self::Metadata,
346        name: String,
347        config_file: String,
348    ) -> BoxFuture<Result<()>> {
349        Box::pin(async move {
350            // Construct channel for plugin to respond to this particular rpc request instance
351            let (response_sender, response_receiver) = oneshot_channel();
352
353            // Send request to plugin manager if there is a geyser service
354            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
355                rpc_to_manager_sender
356                    .send(GeyserPluginManagerRequest::ReloadPlugin {
357                        name,
358                        config_file,
359                        response_sender,
360                    })
361                    .expect("GeyerPluginService should never drop request receiver");
362            } else {
363                return Err(jsonrpc_core::Error {
364                    code: ErrorCode::InvalidRequest,
365                    message: "No geyser plugin service".to_string(),
366                    data: None,
367                });
368            }
369
370            // Await response from plugin manager
371            response_receiver
372                .await
373                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
374        })
375    }
376
377    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
378        Box::pin(async move {
379            // Construct channel for plugin to respond to this particular rpc request instance
380            let (response_sender, response_receiver) = oneshot_channel();
381
382            // Send request to plugin manager if there is a geyser service
383            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
384                rpc_to_manager_sender
385                    .send(GeyserPluginManagerRequest::LoadPlugin {
386                        config_file,
387                        response_sender,
388                    })
389                    .expect("GeyerPluginService should never drop request receiver");
390            } else {
391                return Err(jsonrpc_core::Error {
392                    code: ErrorCode::InvalidRequest,
393                    message: "No geyser plugin service".to_string(),
394                    data: None,
395                });
396            }
397
398            // Await response from plugin manager
399            response_receiver
400                .await
401                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
402        })
403    }
404
405    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
406        Box::pin(async move {
407            // Construct channel for plugin to respond to this particular rpc request instance
408            let (response_sender, response_receiver) = oneshot_channel();
409
410            // Send request to plugin manager if there is a geyser service
411            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
412                rpc_to_manager_sender
413                    .send(GeyserPluginManagerRequest::UnloadPlugin {
414                        name,
415                        response_sender,
416                    })
417                    .expect("GeyerPluginService should never drop request receiver");
418            } else {
419                return Err(jsonrpc_core::Error {
420                    code: ErrorCode::InvalidRequest,
421                    message: "No geyser plugin service".to_string(),
422                    data: None,
423                });
424            }
425
426            // Await response from plugin manager
427            response_receiver
428                .await
429                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
430        })
431    }
432
433    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
434        Box::pin(async move {
435            // Construct channel for plugin to respond to this particular rpc request instance
436            let (response_sender, response_receiver) = oneshot_channel();
437
438            // Send request to plugin manager
439            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
440                rpc_to_manager_sender
441                    .send(GeyserPluginManagerRequest::ListPlugins { response_sender })
442                    .expect("GeyerPluginService should never drop request receiver");
443            } else {
444                return Err(jsonrpc_core::Error {
445                    code: ErrorCode::InvalidRequest,
446                    message: "No geyser plugin service".to_string(),
447                    data: None,
448                });
449            }
450
451            // Await response from plugin manager
452            response_receiver
453                .await
454                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
455        })
456    }
457
458    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
459        debug!("rpc_addr admin rpc request received");
460        Ok(meta.rpc_addr)
461    }
462
463    fn set_log_filter(&self, filter: String) -> Result<()> {
464        debug!("set_log_filter admin rpc request received");
465        agave_logger::setup_with(&filter);
466        Ok(())
467    }
468
469    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
470        debug!("start_time admin rpc request received");
471        Ok(meta.start_time)
472    }
473
474    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
475        debug!("start_progress admin rpc request received");
476        Ok(*meta.start_progress.read().unwrap())
477    }
478
479    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
480        debug!("add_authorized_voter request received");
481
482        let authorized_voter = read_keypair_file(keypair_file)
483            .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{err}")))?;
484
485        AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
486    }
487
488    fn add_authorized_voter_from_bytes(
489        &self,
490        meta: Self::Metadata,
491        keypair: Vec<u8>,
492    ) -> Result<()> {
493        debug!("add_authorized_voter_from_bytes request received");
494
495        let authorized_voter = Keypair::try_from(keypair.as_ref()).map_err(|err| {
496            jsonrpc_core::error::Error::invalid_params(format!(
497                "Failed to read authorized voter keypair from provided byte array: {err}"
498            ))
499        })?;
500
501        AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
502    }
503
504    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
505        debug!("remove_all_authorized_voters received");
506        meta.authorized_voter_keypairs.write().unwrap().clear();
507        Ok(())
508    }
509
510    fn set_identity(
511        &self,
512        meta: Self::Metadata,
513        keypair_file: String,
514        require_tower: bool,
515    ) -> Result<()> {
516        debug!("set_identity request received");
517
518        let identity_keypair = read_keypair_file(&keypair_file).map_err(|err| {
519            jsonrpc_core::error::Error::invalid_params(format!(
520                "Failed to read identity keypair from {keypair_file}: {err}"
521            ))
522        })?;
523
524        AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
525    }
526
527    fn set_identity_from_bytes(
528        &self,
529        meta: Self::Metadata,
530        identity_keypair: Vec<u8>,
531        require_tower: bool,
532    ) -> Result<()> {
533        debug!("set_identity_from_bytes request received");
534
535        let identity_keypair = Keypair::try_from(identity_keypair.as_ref()).map_err(|err| {
536            jsonrpc_core::error::Error::invalid_params(format!(
537                "Failed to read identity keypair from provided byte array: {err}"
538            ))
539        })?;
540
541        AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
542    }
543
544    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
545        let loaded_config = load_staked_nodes_overrides(&path)
546            .map_err(|err| {
547                error!(
548                    "Failed to load staked nodes overrides from {}: {}",
549                    &path, err
550                );
551                jsonrpc_core::error::Error::internal_error()
552            })?
553            .staked_map_id;
554        let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
555        write_staked_nodes.clear();
556        write_staked_nodes.extend(loaded_config);
557        info!("Staked nodes overrides loaded from {path}");
558        debug!("overrides map: {write_staked_nodes:?}");
559        Ok(())
560    }
561
562    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
563        meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
564    }
565
566    fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()> {
567        debug!("select_active_interface received: {interface}");
568        meta.with_post_init(|post_init| {
569            let node = post_init.node.as_ref().ok_or_else(|| {
570                jsonrpc_core::Error::invalid_params("`Node` not initialized in post_init")
571            })?;
572
573            node.switch_active_interface(interface, &post_init.cluster_info)
574                .map_err(|e| {
575                    jsonrpc_core::Error::invalid_params(format!(
576                        "Switching failed due to error {e}"
577                    ))
578                })?;
579            info!("Switched primary interface to {interface}");
580            Ok(())
581        })
582    }
583
584    fn repair_shred_from_peer(
585        &self,
586        meta: Self::Metadata,
587        pubkey: Option<Pubkey>,
588        slot: u64,
589        shred_index: u64,
590    ) -> Result<()> {
591        debug!("repair_shred_from_peer request received");
592
593        meta.with_post_init(|post_init| {
594            repair_service::RepairService::request_repair_for_shred_from_peer(
595                post_init.cluster_info.clone(),
596                post_init.cluster_slots.clone(),
597                pubkey,
598                slot,
599                shred_index,
600                &post_init.repair_socket.clone(),
601                post_init.outstanding_repair_requests.clone(),
602            );
603            Ok(())
604        })
605    }
606
607    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
608        debug!("repair_whitelist request received");
609
610        meta.with_post_init(|post_init| {
611            let whitelist: Vec<_> = post_init
612                .repair_whitelist
613                .read()
614                .unwrap()
615                .iter()
616                .copied()
617                .collect();
618            Ok(AdminRpcRepairWhitelist { whitelist })
619        })
620    }
621
622    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()> {
623        debug!("set_repair_whitelist request received");
624
625        let whitelist: HashSet<Pubkey> = whitelist.into_iter().collect();
626        meta.with_post_init(|post_init| {
627            *post_init.repair_whitelist.write().unwrap() = whitelist;
628            warn!(
629                "Repair whitelist set to {:?}",
630                &post_init.repair_whitelist.read().unwrap()
631            );
632            Ok(())
633        })
634    }
635
636    fn get_secondary_index_key_size(
637        &self,
638        meta: Self::Metadata,
639        pubkey_str: String,
640    ) -> Result<HashMap<RpcAccountIndex, usize>> {
641        debug!("get_secondary_index_key_size rpc request received: {pubkey_str:?}");
642        let index_key = verify_pubkey(&pubkey_str)?;
643        meta.with_post_init(|post_init| {
644            let bank = post_init.bank_forks.read().unwrap().root_bank();
645
646            // Take ref to enabled AccountSecondaryIndexes
647            let enabled_account_indexes = &bank.accounts().accounts_db.account_indexes;
648
649            // Exit if secondary indexes are not enabled
650            if enabled_account_indexes.is_empty() {
651                debug!("get_secondary_index_key_size: secondary index not enabled.");
652                return Ok(HashMap::new());
653            };
654
655            // Make sure the requested key is not explicitly excluded
656            if !enabled_account_indexes.include_key(&index_key) {
657                return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
658                    index_key: index_key.to_string(),
659                }
660                .into());
661            }
662
663            // Grab a ref to the AccountsDbfor this Bank
664            let accounts_index = &bank.accounts().accounts_db.accounts_index;
665
666            // Find the size of the key in every index where it exists
667            let found_sizes = enabled_account_indexes
668                .indexes
669                .iter()
670                .filter_map(|index| {
671                    accounts_index
672                        .get_index_key_size(index, &index_key)
673                        .map(|size| (rpc_account_index_from_account_index(index), size))
674                })
675                .collect::<HashMap<_, _>>();
676
677            // Note: Will return an empty HashMap if no keys are found.
678            if found_sizes.is_empty() {
679                debug!("get_secondary_index_key_size: key not found in the secondary index.");
680            }
681            Ok(found_sizes)
682        })
683    }
684
685    fn set_public_tpu_address(
686        &self,
687        meta: Self::Metadata,
688        public_tpu_addr: SocketAddr,
689    ) -> Result<()> {
690        debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
691
692        meta.with_post_init(|post_init| {
693            post_init
694                .cluster_info
695                .my_contact_info()
696                .tpu(Protocol::UDP)
697                .ok_or_else(|| {
698                    error!(
699                        "The public TPU address isn't being published. The node is likely in \
700                         repair mode. See help for --restricted-repair-only-mode for more \
701                         information."
702                    );
703                    jsonrpc_core::error::Error::internal_error()
704                })?;
705            post_init
706                .cluster_info
707                .set_tpu(public_tpu_addr)
708                .map_err(|err| {
709                    error!("Failed to set public TPU address to {public_tpu_addr}: {err}");
710                    jsonrpc_core::error::Error::internal_error()
711                })?;
712            let my_contact_info = post_init.cluster_info.my_contact_info();
713            warn!(
714                "Public TPU addresses set to {:?} (udp) and {:?} (quic)",
715                my_contact_info.tpu(Protocol::UDP),
716                my_contact_info.tpu(Protocol::QUIC),
717            );
718            Ok(())
719        })
720    }
721
722    fn set_public_tpu_forwards_address(
723        &self,
724        meta: Self::Metadata,
725        public_tpu_forwards_addr: SocketAddr,
726    ) -> Result<()> {
727        debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
728
729        meta.with_post_init(|post_init| {
730            post_init
731                .cluster_info
732                .my_contact_info()
733                .tpu_forwards(Protocol::UDP)
734                .ok_or_else(|| {
735                    error!(
736                        "The public TPU Forwards address isn't being published. The node is \
737                         likely in repair mode. See help for --restricted-repair-only-mode for \
738                         more information."
739                    );
740                    jsonrpc_core::error::Error::internal_error()
741                })?;
742            post_init
743                .cluster_info
744                .set_tpu_forwards(public_tpu_forwards_addr)
745                .map_err(|err| {
746                    error!("Failed to set public TPU address to {public_tpu_forwards_addr}: {err}");
747                    jsonrpc_core::error::Error::internal_error()
748                })?;
749            let my_contact_info = post_init.cluster_info.my_contact_info();
750            warn!(
751                "Public TPU Forwards addresses set to {:?} (udp) and {:?} (quic)",
752                my_contact_info.tpu_forwards(Protocol::UDP),
753                my_contact_info.tpu_forwards(Protocol::QUIC),
754            );
755            Ok(())
756        })
757    }
758
759    fn manage_block_production(
760        &self,
761        meta: Self::Metadata,
762        block_production_method: BlockProductionMethod,
763        transaction_struct: TransactionStructure,
764        num_workers: NonZeroUsize,
765        scheduler_pacing: SchedulerPacing,
766    ) -> Result<()> {
767        debug!("manage_block_production rpc request received");
768
769        if num_workers > BankingStage::max_num_workers() {
770            return Err(jsonrpc_core::error::Error::invalid_params(format!(
771                "Number of workers ({}) exceeds maximum allowed ({})",
772                num_workers,
773                BankingStage::max_num_workers()
774            )));
775        }
776
777        if transaction_struct != TransactionStructure::View {
778            warn!("TransactionStructure::Sdk has no effect on block production");
779        }
780
781        meta.with_post_init(|post_init| {
782            let mut banking_stage = post_init.banking_stage.write().unwrap();
783            let Some(banking_stage) = banking_stage.as_mut() else {
784                error!("banking stage is not initialized");
785                return Err(jsonrpc_core::error::Error::internal_error());
786            };
787
788            banking_stage
789                .spawn_internal_threads(
790                    block_production_method,
791                    num_workers,
792                    SchedulerConfig { scheduler_pacing },
793                )
794                .map_err(|err| {
795                    error!("Failed to spawn new non-vote threads: {err:?}");
796                    jsonrpc_core::error::Error::internal_error()
797                })?;
798
799            Ok(())
800        })
801    }
802}
803
804impl AdminRpcImpl {
805    fn add_authorized_voter_keypair(
806        meta: AdminRpcRequestMetadata,
807        authorized_voter: Keypair,
808    ) -> Result<()> {
809        let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
810
811        if authorized_voter_keypairs
812            .iter()
813            .any(|x| x.pubkey() == authorized_voter.pubkey())
814        {
815            Err(jsonrpc_core::error::Error::invalid_params(
816                "Authorized voter already present",
817            ))
818        } else {
819            authorized_voter_keypairs.push(Arc::new(authorized_voter));
820            Ok(())
821        }
822    }
823
824    fn set_identity_keypair(
825        meta: AdminRpcRequestMetadata,
826        identity_keypair: Keypair,
827        require_tower: bool,
828    ) -> Result<()> {
829        meta.with_post_init(|post_init| {
830            if require_tower {
831                let _ = Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey())
832                    .map_err(|err| {
833                        jsonrpc_core::error::Error::invalid_params(format!(
834                            "Unable to load tower file for identity {}: {}",
835                            identity_keypair.pubkey(),
836                            err
837                        ))
838                    })?;
839            }
840
841            for (key, notifier) in &*post_init.notifies.read().unwrap() {
842                if let Err(err) = notifier.update_key(&identity_keypair) {
843                    error!("Error updating network layer keypair: {err} on {key:?}");
844                }
845            }
846
847            solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
848            post_init
849                .cluster_info
850                .set_keypair(Arc::new(identity_keypair));
851            warn!("Identity set to {}", post_init.cluster_info.id());
852            Ok(())
853        })
854    }
855}
856
857fn rpc_account_index_from_account_index(account_index: &AccountIndex) -> RpcAccountIndex {
858    match account_index {
859        AccountIndex::ProgramId => RpcAccountIndex::ProgramId,
860        AccountIndex::SplTokenOwner => RpcAccountIndex::SplTokenOwner,
861        AccountIndex::SplTokenMint => RpcAccountIndex::SplTokenMint,
862    }
863}
864
865// Start the Admin RPC interface
866pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
867    let admin_rpc_path = admin_rpc_path(ledger_path);
868
869    let event_loop = tokio::runtime::Builder::new_multi_thread()
870        .thread_name("solAdminRpcEl")
871        .worker_threads(3) // Three still seems like a lot, and better than the default of available core count
872        .enable_all()
873        .build()
874        .unwrap();
875
876    Builder::new()
877        .name("solAdminRpc".to_string())
878        .spawn(move || {
879            let mut io = MetaIoHandler::default();
880            io.extend_with(AdminRpcImpl.to_delegate());
881
882            let validator_exit = metadata.validator_exit.clone();
883            let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
884                metadata.clone()
885            })
886            .event_loop_executor(event_loop.handle().clone())
887            .start(&format!("{}", admin_rpc_path.display()));
888
889            match server {
890                Err(err) => {
891                    warn!("Unable to start admin rpc service: {err:?}");
892                }
893                Ok(server) => {
894                    info!("started admin rpc service!");
895                    let close_handle = server.close_handle();
896                    validator_exit
897                        .write()
898                        .unwrap()
899                        .register_exit(Box::new(move || {
900                            close_handle.close();
901                        }));
902
903                    server.wait();
904                }
905            }
906        })
907        .unwrap();
908}
909
/// Returns the IPC endpoint of the admin RPC service for the given ledger.
///
/// On Windows this is a named pipe whose name is derived from the last component of
/// `ledger_path`; everywhere else it is the `admin.rpc` entry inside the ledger
/// directory.
fn admin_rpc_path(ledger_path: &Path) -> PathBuf {
    #[cfg(target_family = "windows")]
    {
        // More information about the wackiness of pipe names over at
        // https://docs.microsoft.com/en-us/windows/win32/ipc/pipe-names
        let pipe_name = match ledger_path.file_name() {
            Some(ledger_filename) => format!(
                "\\\\.\\pipe\\{}-admin.rpc",
                ledger_filename.to_string_lossy()
            ),
            None => String::from("\\\\.\\pipe\\admin.rpc"),
        };
        PathBuf::from(pipe_name)
    }
    #[cfg(not(target_family = "windows"))]
    {
        let mut socket_path = ledger_path.to_path_buf();
        socket_path.push("admin.rpc");
        socket_path
    }
}
929
930// Connect to the Admin RPC interface
931pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
932    let admin_rpc_path = admin_rpc_path(ledger_path);
933    if !admin_rpc_path.exists() {
934        Err(RpcError::Client(format!(
935            "{} does not exist",
936            admin_rpc_path.display()
937        )))
938    } else {
939        ipc::connect::<_, gen_client::Client>(&format!("{}", admin_rpc_path.display())).await
940    }
941}
942
943// Create a runtime for use by client side admin RPC interface calls
944pub fn runtime() -> Runtime {
945    tokio::runtime::Builder::new_multi_thread()
946        .thread_name("solAdminRpcRt")
947        .enable_all()
948        // The agave-validator subcommands make few admin RPC calls and block
949        // on the results so two workers is plenty
950        .worker_threads(2)
951        .build()
952        .expect("new tokio runtime")
953}
954
/// Staked-node override table as read from a configuration document
/// (`load_staked_nodes_overrides` parses it from YAML).
955#[derive(Default, Deserialize, Clone)]
956pub struct StakedNodesOverrides {
    /// Value per node pubkey. Keys are written as strings in the document and parsed
    /// into `Pubkey` by `deserialize_pubkey_map`.
    /// NOTE(review): the `u64` is presumably the stake to assume for that node;
    /// confirm against the consumers of this map.
957    #[serde(deserialize_with = "deserialize_pubkey_map")]
958    pub staked_map_id: HashMap<Pubkey, u64>,
959}
960
961pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
962where
963    D: Deserializer<'de>,
964{
965    let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
966    let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
967    for (key, value) in container.iter() {
968        let typed_key = Pubkey::try_from(key.as_str())
969            .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
970        container_typed.insert(typed_key, *value);
971    }
972    Ok(container_typed)
973}
974
975pub fn load_staked_nodes_overrides(
976    path: &String,
977) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
978    debug!("Loading staked nodes overrides configuration from {path}");
979    if Path::new(&path).exists() {
980        let file = std::fs::File::open(path)?;
981        Ok(serde_yaml::from_reader(file)?)
982    } else {
983        Err(format!("Staked nodes overrides provided '{path}' a non-existing file path.").into())
984    }
985}
986
987#[cfg(test)]
988mod tests {
989    use {
990        super::*,
991        serde_json::Value,
992        solana_account::{Account, AccountSharedData},
993        solana_accounts_db::{
994            accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
995            accounts_index::AccountSecondaryIndexes,
996        },
997        solana_core::{
998            admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
999            consensus::tower_storage::NullTowerStorage,
1000            validator::{Validator, ValidatorConfig, ValidatorTpuConfig},
1001        },
1002        solana_gossip::{cluster_info::ClusterInfo, node::Node},
1003        solana_ledger::{
1004            create_new_tmp_ledger,
1005            genesis_utils::{
1006                create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo,
1007            },
1008        },
1009        solana_net_utils::sockets::bind_to_localhost_unique,
1010        solana_program_option::COption,
1011        solana_program_pack::Pack,
1012        solana_pubkey::Pubkey,
1013        solana_rpc::rpc::create_validator_exit,
1014        solana_runtime::{
1015            bank::{Bank, BankTestConfig},
1016            bank_forks::BankForks,
1017        },
1018        solana_streamer::socket::SocketAddrSpace,
1019        solana_system_interface::program as system_program,
1020        solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
1021        spl_generic_token::token,
1022        spl_token_2022_interface::state::{
1023            Account as TokenAccount, AccountState as TokenAccountState, Mint,
1024        },
1025        std::{collections::HashSet, fs::remove_dir_all, sync::atomic::AtomicBool},
1026    };
1027
/// Options accepted by `RpcHandler::start_with_config` when building a test fixture.
1028    #[derive(Default)]
1029    struct TestConfig {
    /// Secondary indexes placed into the `AccountsDbConfig` of the fixture's bank.
1030        account_indexes: AccountSecondaryIndexes,
1031    }
1032
/// Test fixture: an admin RPC handler backed by a bank but no running validator.
1033    struct RpcHandler {
    /// JSON-RPC handler with the `AdminRpcImpl` delegate registered.
1034        io: MetaIoHandler<AdminRpcRequestMetadata>,
    /// Request metadata to pass along with every request sent to `io`.
1035        meta: AdminRpcRequestMetadata,
    /// The bank forks that are also referenced from `meta`'s post-init state.
1036        bank_forks: Arc<RwLock<BankForks>>,
1037    }
1038
1039    impl RpcHandler {
1040        fn _start() -> Self {
1041            Self::start_with_config(TestConfig::default())
1042        }
1043
1044        fn start_with_config(config: TestConfig) -> Self {
1045            let keypair = Arc::new(Keypair::new());
1046            let cluster_info = Arc::new(ClusterInfo::new(
1047                ContactInfo::new(
1048                    keypair.pubkey(),
1049                    solana_time_utils::timestamp(), // wallclock
1050                    0u16,                           // shred_version
1051                ),
1052                keypair,
1053                SocketAddrSpace::Unspecified,
1054            ));
1055            let exit = Arc::new(AtomicBool::new(false));
1056            let validator_exit = create_validator_exit(exit);
1057            let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig {
1058                accounts_db_config: AccountsDbConfig {
1059                    account_indexes: Some(config.account_indexes),
1060                    ..ACCOUNTS_DB_CONFIG_FOR_TESTING
1061                },
1062            });
1063            let vote_account = vote_keypair.pubkey();
1064            let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1065            let repair_whitelist = Arc::new(RwLock::new(HashSet::new()));
1066            let meta = AdminRpcRequestMetadata {
1067                rpc_addr: None,
1068                start_time: SystemTime::now(),
1069                start_progress,
1070                validator_exit,
1071                validator_exit_backpressure: HashMap::default(),
1072                authorized_voter_keypairs: Arc::new(RwLock::new(vec![vote_keypair])),
1073                tower_storage: Arc::new(NullTowerStorage {}),
1074                post_init: Arc::new(RwLock::new(Some(AdminRpcRequestMetadataPostInit {
1075                    cluster_info,
1076                    bank_forks: bank_forks.clone(),
1077                    vote_account,
1078                    repair_whitelist,
1079                    notifies: Arc::new(RwLock::new(KeyUpdaters::default())),
1080                    repair_socket: Arc::new(bind_to_localhost_unique().expect("should bind")),
1081                    outstanding_repair_requests: Arc::<
1082                        RwLock<repair_service::OutstandingShredRepairs>,
1083                    >::default(),
1084                    cluster_slots: Arc::new(
1085                        solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default_for_tests(),
1086                    ),
1087                    node: None,
1088                    banking_stage: Arc::new(RwLock::new(None)),
1089                }))),
1090                staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1091                rpc_to_plugin_manager_sender: None,
1092            };
1093            let mut io = MetaIoHandler::default();
1094            io.extend_with(AdminRpcImpl.to_delegate());
1095
1096            Self {
1097                io,
1098                meta,
1099                bank_forks,
1100            }
1101        }
1102
1103        fn root_bank(&self) -> Arc<Bank> {
1104            self.bank_forks.read().unwrap().root_bank()
1105        }
1106    }
1107
1108    fn new_bank_forks_with_config(
1109        config: BankTestConfig,
1110    ) -> (Arc<RwLock<BankForks>>, Arc<Keypair>) {
1111        let GenesisConfigInfo {
1112            genesis_config,
1113            voting_keypair,
1114            ..
1115        } = create_genesis_config(1_000_000_000);
1116
1117        let bank = Bank::new_with_config_for_tests(&genesis_config, config);
1118        (BankForks::new_rw_arc(bank), Arc::new(voting_keypair))
1119    }
1120
/// Checks the `getSecondaryIndexKeySize` admin RPC against a bank holding two
/// wallets, three SPL token accounts and two mints, once with every secondary index
/// enabled and once with none.
#[test]
fn test_secondary_index_key_sizes() {
    for secondary_index_enabled in [true, false] {
        let account_indexes = if secondary_index_enabled {
            AccountSecondaryIndexes {
                keys: None,
                indexes: HashSet::from([
                    AccountIndex::ProgramId,
                    AccountIndex::SplTokenMint,
                    AccountIndex::SplTokenOwner,
                ]),
            }
        } else {
            AccountSecondaryIndexes::default()
        };

        // RPC & Bank Setup
        let rpc = RpcHandler::start_with_config(TestConfig { account_indexes });

        let bank = rpc.root_bank();
        let RpcHandler { io, meta, .. } = rpc;

        // Sends one `getSecondaryIndexKeySize` request for `key` and decodes the
        // `result` member of the response into per-index sizes.
        let secondary_index_key_sizes = |key: &Pubkey| -> HashMap<RpcAccountIndex, usize> {
            let req = format!(
                r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{key}"]}}"#,
            );
            let res = io.handle_request_sync(&req, meta.clone());
            let result: Value = serde_json::from_str(&res.expect("actual response"))
                .expect("actual response deserialization");
            serde_json::from_value(result["result"].clone()).unwrap()
        };

        // Pubkeys
        let token_account1_pubkey = Pubkey::new_unique();
        let token_account2_pubkey = Pubkey::new_unique();
        let token_account3_pubkey = Pubkey::new_unique();
        let mint1_pubkey = Pubkey::new_unique();
        let mint2_pubkey = Pubkey::new_unique();
        let wallet1_pubkey = Pubkey::new_unique();
        let wallet2_pubkey = Pubkey::new_unique();
        let non_existent_pubkey = Pubkey::new_unique();
        let delegate = Pubkey::new_unique();

        // Baseline counts taken before any account is added; they stay at zero (and
        // unused) when secondary indexes are disabled.
        let (num_default_spl_token_program_accounts, num_default_system_program_accounts) =
            if secondary_index_enabled {
                // Count SPL Token Program Default Accounts
                let sizes = secondary_index_key_sizes(&token::id());
                assert_eq!(sizes.len(), 1);
                let spl_token_defaults = *sizes.get(&RpcAccountIndex::ProgramId).unwrap();

                // Count System Program Default Accounts
                let sizes = secondary_index_key_sizes(&system_program::id());
                assert_eq!(sizes.len(), 1);
                let system_defaults = *sizes.get(&RpcAccountIndex::ProgramId).unwrap();

                (spl_token_defaults, system_defaults)
            } else {
                // Test first with no accounts added & no secondary indexes enabled:
                let sizes = secondary_index_key_sizes(&token_account1_pubkey);
                assert!(sizes.is_empty());
                (0, 0)
            };

        // Add 2 basic wallet accounts
        let wallet1_account = AccountSharedData::from(Account {
            lamports: 11111111,
            owner: system_program::id(),
            ..Account::default()
        });
        bank.store_account(&wallet1_pubkey, &wallet1_account);
        let wallet2_account = AccountSharedData::from(Account {
            lamports: 11111111,
            owner: system_program::id(),
            ..Account::default()
        });
        bank.store_account(&wallet2_pubkey, &wallet2_account);

        // Add a token account
        let mut account1_data = vec![0; TokenAccount::get_packed_len()];
        let token_account1 = TokenAccount {
            mint: mint1_pubkey,
            owner: wallet1_pubkey,
            delegate: COption::Some(delegate),
            amount: 420,
            state: TokenAccountState::Initialized,
            is_native: COption::None,
            delegated_amount: 30,
            close_authority: COption::Some(wallet1_pubkey),
        };
        TokenAccount::pack(token_account1, &mut account1_data).unwrap();
        let token_account1 = AccountSharedData::from(Account {
            lamports: 111,
            data: account1_data.to_vec(),
            owner: token::id(),
            ..Account::default()
        });
        bank.store_account(&token_account1_pubkey, &token_account1);

        // Add the mint
        let mut mint1_data = vec![0; Mint::get_packed_len()];
        let mint1_state = Mint {
            mint_authority: COption::Some(wallet1_pubkey),
            supply: 500,
            decimals: 2,
            is_initialized: true,
            freeze_authority: COption::Some(wallet1_pubkey),
        };
        Mint::pack(mint1_state, &mut mint1_data).unwrap();
        let mint_account1 = AccountSharedData::from(Account {
            lamports: 222,
            data: mint1_data.to_vec(),
            owner: token::id(),
            ..Account::default()
        });
        bank.store_account(&mint1_pubkey, &mint_account1);

        // Add another token account with the different owner, but same delegate, and mint
        let mut account2_data = vec![0; TokenAccount::get_packed_len()];
        let token_account2 = TokenAccount {
            mint: mint1_pubkey,
            owner: wallet2_pubkey,
            delegate: COption::Some(delegate),
            amount: 420,
            state: TokenAccountState::Initialized,
            is_native: COption::None,
            delegated_amount: 30,
            close_authority: COption::Some(wallet2_pubkey),
        };
        TokenAccount::pack(token_account2, &mut account2_data).unwrap();
        let token_account2 = AccountSharedData::from(Account {
            lamports: 333,
            data: account2_data.to_vec(),
            owner: token::id(),
            ..Account::default()
        });
        bank.store_account(&token_account2_pubkey, &token_account2);

        // Add another token account with the same owner and delegate but different mint
        let mut account3_data = vec![0; TokenAccount::get_packed_len()];
        let token_account3 = TokenAccount {
            mint: mint2_pubkey,
            owner: wallet2_pubkey,
            delegate: COption::Some(delegate),
            amount: 42,
            state: TokenAccountState::Initialized,
            is_native: COption::None,
            delegated_amount: 30,
            close_authority: COption::Some(wallet2_pubkey),
        };
        TokenAccount::pack(token_account3, &mut account3_data).unwrap();
        let token_account3 = AccountSharedData::from(Account {
            lamports: 444,
            data: account3_data.to_vec(),
            owner: token::id(),
            ..Account::default()
        });
        bank.store_account(&token_account3_pubkey, &token_account3);

        // Add the new mint
        let mut mint2_data = vec![0; Mint::get_packed_len()];
        let mint2_state = Mint {
            mint_authority: COption::Some(wallet2_pubkey),
            supply: 200,
            decimals: 3,
            is_initialized: true,
            freeze_authority: COption::Some(wallet2_pubkey),
        };
        Mint::pack(mint2_state, &mut mint2_data).unwrap();
        let mint_account2 = AccountSharedData::from(Account {
            lamports: 555,
            data: mint2_data.to_vec(),
            owner: token::id(),
            ..Account::default()
        });
        bank.store_account(&mint2_pubkey, &mint_account2);

        // Accounts should now look like the following:
        //
        //                   -----system_program------
        //                  /                         \
        //                 /-(owns)                    \-(owns)
        //                /                             \
        //             wallet1                   ---wallet2---
        //               /                      /             \
        //              /-(SPL::owns)          /-(SPL::owns)   \-(SPL::owns)
        //             /                      /                 \
        //      token_account1         token_account2       token_account3
        //            \                     /                   /
        //             \-(SPL::mint)       /-(SPL::mint)       /-(SPL::mint)
        //              \                 /                   /
        //               --mint_account1--               mint_account2

        if secondary_index_enabled {
            // ----------- Test for a non-existent key -----------
            let sizes = secondary_index_key_sizes(&non_existent_pubkey);
            assert!(sizes.is_empty());

            // --------------- Test Queries ---------------
            // 1) Wallet1 - Owns 1 SPL Token
            let sizes = secondary_index_key_sizes(&wallet1_pubkey);
            assert_eq!(sizes.len(), 1);
            assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 1);

            // 2) Wallet2 - Owns 2 SPL Tokens
            let sizes = secondary_index_key_sizes(&wallet2_pubkey);
            assert_eq!(sizes.len(), 1);
            assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 2);

            // 3) Mint1 - Is in 2 SPL Accounts
            let sizes = secondary_index_key_sizes(&mint1_pubkey);
            assert_eq!(sizes.len(), 1);
            assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 2);

            // 4) Mint2 - Is in 1 SPL Account
            let sizes = secondary_index_key_sizes(&mint2_pubkey);
            assert_eq!(sizes.len(), 1);
            assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 1);

            // 5) SPL Token Program - owns its default accounts plus the 5 created
            //    above (3 token accounts and 2 mints).
            let sizes = secondary_index_key_sizes(&token::id());
            assert_eq!(sizes.len(), 1);
            assert_eq!(
                *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
                num_default_spl_token_program_accounts + 5
            );

            // 6) System Program - owns its default accounts plus the 2 wallets
            //    created above.
            let sizes = secondary_index_key_sizes(&system_program::id());
            assert_eq!(sizes.len(), 1);
            assert_eq!(
                *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
                num_default_system_program_accounts + 2
            );
        } else {
            // ------------ Secondary Indexes Disabled ------------
            let sizes = secondary_index_key_sizes(&token_account2_pubkey);
            assert!(sizes.is_empty());
        }
    }
}
1426
// This test checks that the rpc call to `set_identity` works as expected with
// Bank but without validator.
#[test]
fn test_set_identity() {
    let RpcHandler { io, meta, .. } = RpcHandler::start_with_config(TestConfig::default());

    // Set the identity from the raw keypair bytes, without requiring a tower.
    let new_identity = Keypair::new();
    let keypair_bytes_json = format!("{:?}", new_identity.to_bytes());
    let set_identity_req = format!(
        r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{keypair_bytes_json}, false]}}"#,
    );
    let set_identity_res = io
        .handle_request_sync(&set_identity_req, meta.clone())
        .expect("actual response");
    let set_identity_json: Value =
        serde_json::from_str(&set_identity_res).expect("actual response deserialization");

    let expected_json: Value = serde_json::from_str(
        r#"{
            "id": 1,
            "jsonrpc": "2.0",
            "result": null
        }"#,
    )
    .expect("Failed to parse expected response");
    assert_eq!(set_identity_json, expected_json);

    // The contact info must now advertise the new identity.
    let contact_info_req =
        String::from(r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#);
    let contact_info_res = io
        .handle_request_sync(&contact_info_req, meta.clone())
        .expect("actual response");
    let contact_info_json: Value =
        serde_json::from_str(&contact_info_res).expect("actual response deserialization");
    let reported_id = contact_info_json["result"]["id"]
        .as_str()
        .expect("Expected a string");
    assert_eq!(reported_id, new_identity.pubkey().to_string());
}
1469
    /// Test fixture that pairs an admin JSON-RPC handler with the request
    /// metadata and ledger directory of a validator started for the test.
    struct TestValidatorWithAdminRpc {
        /// Request metadata; a clone is passed along with every request sent
        /// through `handle_request`.
        meta: AdminRpcRequestMetadata,
        /// JSON-RPC handler extended with the `AdminRpcImpl` delegate.
        io: MetaIoHandler<AdminRpcRequestMetadata>,
        /// Ledger directory created for the validator; removed again by the
        /// fixture's `Drop` implementation.
        validator_ledger_path: PathBuf,
    }
1475
1476    impl TestValidatorWithAdminRpc {
1477        fn new() -> Self {
1478            let leader_keypair = Keypair::new();
1479            let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
1480
1481            let validator_keypair = Keypair::new();
1482            let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
1483            let genesis_config =
1484                create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
1485                    .genesis_config;
1486            let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
1487
1488            let voting_keypair = Arc::new(Keypair::new());
1489            let voting_pubkey = voting_keypair.pubkey();
1490            let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1491            let validator_config = ValidatorConfig {
1492                rpc_addrs: Some((
1493                    validator_node.info.rpc().unwrap(),
1494                    validator_node.info.rpc_pubsub().unwrap(),
1495                )),
1496                ..ValidatorConfig::default_for_test()
1497            };
1498            let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1499
1500            let post_init = Arc::new(RwLock::new(None));
1501            let meta = AdminRpcRequestMetadata {
1502                rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
1503                start_time: SystemTime::now(),
1504                start_progress: start_progress.clone(),
1505                validator_exit: validator_config.validator_exit.clone(),
1506                validator_exit_backpressure: HashMap::default(),
1507                authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1508                tower_storage: Arc::new(NullTowerStorage {}),
1509                post_init: post_init.clone(),
1510                staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1511                rpc_to_plugin_manager_sender: None,
1512            };
1513
1514            let _validator = Validator::new(
1515                validator_node,
1516                Arc::new(validator_keypair),
1517                &validator_ledger_path,
1518                &voting_pubkey,
1519                authorized_voter_keypairs,
1520                vec![leader_node.info],
1521                &validator_config,
1522                true, // should_check_duplicate_instance
1523                None, // rpc_to_plugin_manager_receiver
1524                start_progress.clone(),
1525                SocketAddrSpace::Unspecified,
1526                ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
1527                post_init.clone(),
1528            )
1529            .expect("assume successful validator start");
1530            assert_eq!(
1531                *start_progress.read().unwrap(),
1532                ValidatorStartProgress::Running
1533            );
1534            let post_init = post_init.read().unwrap();
1535
1536            assert!(post_init.is_some());
1537            let post_init = post_init.as_ref().unwrap();
1538            let notifies = post_init.notifies.read().unwrap();
1539            let updater_keys: HashSet<KeyUpdaterType> =
1540                notifies.into_iter().map(|(key, _)| key.clone()).collect();
1541            assert_eq!(
1542                updater_keys,
1543                HashSet::from_iter(vec![
1544                    KeyUpdaterType::Tpu,
1545                    KeyUpdaterType::TpuForwards,
1546                    KeyUpdaterType::TpuVote,
1547                    KeyUpdaterType::Forward,
1548                    KeyUpdaterType::RpcService
1549                ])
1550            );
1551            let mut io = MetaIoHandler::default();
1552            io.extend_with(AdminRpcImpl.to_delegate());
1553            Self {
1554                meta,
1555                io,
1556                validator_ledger_path,
1557            }
1558        }
1559
1560        fn handle_request(&self, request: &str) -> Option<String> {
1561            self.io.handle_request_sync(request, self.meta.clone())
1562        }
1563    }
1564
1565    impl Drop for TestValidatorWithAdminRpc {
1566        fn drop(&mut self) {
1567            remove_dir_all(self.validator_ledger_path.clone()).unwrap();
1568        }
1569    }
1570
    // This test checks that the `set_identity` call works with a working validator and client.
1572    #[test]
1573    fn test_set_identity_with_validator() {
1574        let test_validator = TestValidatorWithAdminRpc::new();
1575        let expected_validator_id = Keypair::new();
1576        let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1577
1578        let set_id_request = format!(
1579            r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1580        );
1581        let response = test_validator.handle_request(&set_id_request);
1582        let actual_parsed_response: Value =
1583            serde_json::from_str(&response.expect("actual response"))
1584                .expect("actual response deserialization");
1585
1586        let expected_parsed_response: Value = serde_json::from_str(
1587            r#"{
1588                "id": 1,
1589                "jsonrpc": "2.0",
1590                "result": null
1591            }"#,
1592        )
1593        .expect("Failed to parse expected response");
1594        assert_eq!(actual_parsed_response, expected_parsed_response);
1595
1596        let contact_info_request =
1597            r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1598        let response = test_validator.handle_request(&contact_info_request);
1599        let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1600            .expect("actual response deserialization");
1601        let actual_validator_id = parsed_response["result"]["id"]
1602            .as_str()
1603            .expect("Expected a string");
1604        assert_eq!(
1605            actual_validator_id,
1606            expected_validator_id.pubkey().to_string()
1607        );
1608
1609        let contact_info_request =
1610            r#"{"jsonrpc":"2.0","id":1,"method":"exit","params":[]}"#.to_string();
1611        let exit_response = test_validator.handle_request(&contact_info_request);
1612        let actual_parsed_response: Value =
1613            serde_json::from_str(&exit_response.expect("actual response"))
1614                .expect("actual response deserialization");
1615        assert_eq!(actual_parsed_response, expected_parsed_response);
1616    }
1617}