Skip to main content

agave_validator/
admin_rpc_service.rs

1use {
2    crossbeam_channel::Sender,
3    jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
4    jsonrpc_core_client::{RpcError, transports::ipc},
5    jsonrpc_derive::rpc,
6    jsonrpc_ipc_server::{
7        RequestContext, ServerBuilder, tokio::sync::oneshot::channel as oneshot_channel,
8    },
9    log::*,
10    serde::{Deserialize, Serialize, de::Deserializer},
11    solana_accounts_db::accounts_index::AccountIndex,
12    solana_core::{
13        admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
14        banking_stage::{
15            BankingControlMsg, BankingStage,
16            transaction_scheduler::scheduler_controller::SchedulerConfig,
17        },
18        consensus::{Tower, tower_storage::TowerStorage},
19        repair::repair_service,
20        validator::{
21            BlockProductionMethod, SchedulerPacing, TransactionStructure, ValidatorStartProgress,
22        },
23    },
24    solana_geyser_plugin_manager::GeyserPluginManagerRequest,
25    solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
26    solana_keypair::{Keypair, read_keypair_file},
27    solana_metrics::{datapoint_info, datapoint_warn},
28    solana_pubkey::Pubkey,
29    solana_rpc::rpc::verify_pubkey,
30    solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
31    solana_runtime::snapshot_controller::SnapshotController,
32    solana_signer::Signer,
33    solana_validator_exit::Exit,
34    std::{
35        collections::{HashMap, HashSet},
36        env, error,
37        fmt::{self, Display},
38        net::{IpAddr, SocketAddr},
39        num::NonZeroUsize,
40        path::{Path, PathBuf},
41        sync::{
42            Arc, RwLock,
43            atomic::{AtomicBool, Ordering},
44        },
45        thread::{self, Builder},
46        time::{Duration, Instant, SystemTime},
47    },
48    tokio::runtime::Runtime,
49};
50
51#[derive(Clone)]
52pub struct AdminRpcRequestMetadata {
53    pub rpc_addr: Option<SocketAddr>,
54    pub start_time: SystemTime,
55    pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
56    pub validator_exit: Arc<RwLock<Exit>>,
57    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
58    pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
59    pub tower_storage: Arc<dyn TowerStorage>,
60    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
61    pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
62    pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
63}
64
65impl Metadata for AdminRpcRequestMetadata {}
66
67impl AdminRpcRequestMetadata {
68    fn with_post_init<F, R>(&self, func: F) -> Result<R>
69    where
70        F: FnOnce(&AdminRpcRequestMetadataPostInit) -> Result<R>,
71    {
72        if let Some(post_init) = self.post_init.read().unwrap().as_ref() {
73            func(post_init)
74        } else {
75            Err(jsonrpc_core::error::Error::invalid_params(
76                "Retry once validator start up is complete",
77            ))
78        }
79    }
80
81    fn snapshot_controller(&self) -> Option<Arc<SnapshotController>> {
82        self.with_post_init(|post_init| Ok(post_init.snapshot_controller.clone()))
83            .map_err(|_| {
84                // The error from with_post_init is not relevant, as it is meant for RPC callers
85                warn!("snapshot_controller unavailable, shutting down without taking snapshot");
86            })
87            .ok()
88    }
89}
90
91#[derive(Debug, Deserialize, Serialize)]
92pub struct AdminRpcContactInfo {
93    pub id: String,
94    pub gossip: SocketAddr,
95    pub tvu: SocketAddr,
96    pub serve_repair_quic: SocketAddr,
97    pub tpu: SocketAddr,
98    pub tpu_forwards: SocketAddr,
99    pub tpu_vote: SocketAddr,
100    pub rpc: SocketAddr,
101    pub rpc_pubsub: SocketAddr,
102    pub serve_repair: SocketAddr,
103    pub last_updated_timestamp: u64,
104    pub shred_version: u16,
105}
106
107#[derive(Debug, Deserialize, Serialize)]
108pub struct AdminRpcRepairWhitelist {
109    pub whitelist: Vec<Pubkey>,
110}
111
112impl From<ContactInfo> for AdminRpcContactInfo {
113    fn from(node: ContactInfo) -> Self {
114        macro_rules! unwrap_socket {
115            ($name:ident) => {
116                node.$name().unwrap_or(SOCKET_ADDR_UNSPECIFIED)
117            };
118            ($name:ident, $protocol:expr) => {
119                node.$name($protocol).unwrap_or(SOCKET_ADDR_UNSPECIFIED)
120            };
121        }
122        Self {
123            id: node.pubkey().to_string(),
124            last_updated_timestamp: node.wallclock(),
125            gossip: unwrap_socket!(gossip),
126            tvu: unwrap_socket!(tvu, Protocol::UDP),
127            serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
128            tpu: unwrap_socket!(tpu, Protocol::UDP),
129            tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
130            tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
131            rpc: unwrap_socket!(rpc),
132            rpc_pubsub: unwrap_socket!(rpc_pubsub),
133            serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
134            shred_version: node.shred_version(),
135        }
136    }
137}
138
139impl Display for AdminRpcContactInfo {
140    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
141        writeln!(f, "Identity: {}", self.id)?;
142        writeln!(f, "Gossip: {}", self.gossip)?;
143        writeln!(f, "TVU: {}", self.tvu)?;
144        writeln!(f, "TPU: {}", self.tpu)?;
145        writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;
146        writeln!(f, "TPU Votes: {}", self.tpu_vote)?;
147        writeln!(f, "RPC: {}", self.rpc)?;
148        writeln!(f, "RPC Pubsub: {}", self.rpc_pubsub)?;
149        writeln!(f, "Serve Repair: {}", self.serve_repair)?;
150        writeln!(f, "Last Updated Timestamp: {}", self.last_updated_timestamp)?;
151        writeln!(f, "Shred Version: {}", self.shred_version)
152    }
153}
154impl solana_cli_output::VerboseDisplay for AdminRpcContactInfo {}
155impl solana_cli_output::QuietDisplay for AdminRpcContactInfo {}
156
157impl Display for AdminRpcRepairWhitelist {
158    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
159        writeln!(f, "Repair whitelist: {:?}", &self.whitelist)
160    }
161}
162impl solana_cli_output::VerboseDisplay for AdminRpcRepairWhitelist {}
163impl solana_cli_output::QuietDisplay for AdminRpcRepairWhitelist {}
164
165#[rpc]
166pub trait AdminRpc {
167    type Metadata;
168
169    /// Initiates validator exit; exit is asynchronous so the validator
170    /// will almost certainly still be running when this method returns
171    #[rpc(meta, name = "exit")]
172    fn exit(&self, meta: Self::Metadata) -> Result<()>;
173
174    /// Return the process id (pid)
175    #[rpc(meta, name = "pid")]
176    fn pid(&self, meta: Self::Metadata) -> Result<u32>;
177
178    #[rpc(meta, name = "reloadPlugin")]
179    fn reload_plugin(
180        &self,
181        meta: Self::Metadata,
182        name: String,
183        config_file: String,
184    ) -> BoxFuture<Result<()>>;
185
186    #[rpc(meta, name = "unloadPlugin")]
187    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
188
189    #[rpc(meta, name = "loadPlugin")]
190    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
191
192    #[rpc(meta, name = "listPlugins")]
193    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
194
195    #[rpc(meta, name = "rpcAddress")]
196    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
197
198    #[rpc(name = "setLogFilter")]
199    fn set_log_filter(&self, filter: String) -> Result<()>;
200
201    #[rpc(meta, name = "startTime")]
202    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;
203
204    #[rpc(meta, name = "startProgress")]
205    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;
206
207    #[rpc(meta, name = "addAuthorizedVoter")]
208    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;
209
210    #[rpc(meta, name = "addAuthorizedVoterFromBytes")]
211    fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec<u8>)
212    -> Result<()>;
213
214    #[rpc(meta, name = "removeAllAuthorizedVoters")]
215    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;
216
217    #[rpc(meta, name = "setIdentity")]
218    fn set_identity(
219        &self,
220        meta: Self::Metadata,
221        keypair_file: String,
222        require_tower: bool,
223    ) -> Result<()>;
224
225    #[rpc(meta, name = "setIdentityFromBytes")]
226    fn set_identity_from_bytes(
227        &self,
228        meta: Self::Metadata,
229        identity_keypair: Vec<u8>,
230        require_tower: bool,
231    ) -> Result<()>;
232
233    #[rpc(meta, name = "setStakedNodesOverrides")]
234    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;
235
236    #[rpc(meta, name = "contactInfo")]
237    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;
238
239    #[rpc(meta, name = "selectActiveInterface")]
240    fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()>;
241
242    #[rpc(meta, name = "repairShredFromPeer")]
243    fn repair_shred_from_peer(
244        &self,
245        meta: Self::Metadata,
246        pubkey: Option<Pubkey>,
247        slot: u64,
248        shred_index: u64,
249    ) -> Result<()>;
250
251    #[rpc(meta, name = "repairWhitelist")]
252    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;
253
254    #[rpc(meta, name = "setRepairWhitelist")]
255    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()>;
256
257    #[rpc(meta, name = "getSecondaryIndexKeySize")]
258    fn get_secondary_index_key_size(
259        &self,
260        meta: Self::Metadata,
261        pubkey_str: String,
262    ) -> Result<HashMap<RpcAccountIndex, usize>>;
263
264    #[rpc(meta, name = "setPublicTpuAddress")]
265    fn set_public_tpu_address(
266        &self,
267        meta: Self::Metadata,
268        public_tpu_addr: SocketAddr,
269    ) -> Result<()>;
270
271    #[rpc(meta, name = "setPublicTpuForwardsAddress")]
272    fn set_public_tpu_forwards_address(
273        &self,
274        meta: Self::Metadata,
275        public_tpu_forwards_addr: SocketAddr,
276    ) -> Result<()>;
277
278    #[rpc(meta, name = "setPublicTvuAddress")]
279    fn set_public_tvu_address(
280        &self,
281        meta: Self::Metadata,
282        public_tvu_addr: SocketAddr,
283    ) -> Result<()>;
284
285    #[rpc(meta, name = "manageBlockProduction")]
286    fn manage_block_production(
287        &self,
288        meta: Self::Metadata,
289        block_production_method: BlockProductionMethod,
290        transaction_struct: TransactionStructure,
291        num_workers: NonZeroUsize,
292        scheduler_pacing: SchedulerPacing,
293    ) -> Result<()>;
294
295    #[rpc(meta, name = "isGeneratingSnapshots")]
296    fn is_generating_snapshots(&self, meta: Self::Metadata) -> Result<bool>;
297}
298
/// Stateless implementor of [`AdminRpc`]; handlers work from the request metadata.
pub struct AdminRpcImpl;
300impl AdminRpc for AdminRpcImpl {
301    type Metadata = AdminRpcRequestMetadata;
302
303    fn exit(&self, meta: Self::Metadata) -> Result<()> {
304        debug!("exit admin rpc request received");
305
306        thread::Builder::new()
307            .name("solProcessExit".into())
308            .spawn(move || {
309                let start_time = Instant::now();
310
311                // Trigger a fastboot snapshot before exiting
312                if let Some(snapshot_controller) = meta.snapshot_controller() {
313                    let latest_snapshot_slot = snapshot_controller.latest_bank_snapshot_slot();
314
315                    info!("Requesting fastboot snapshot before exit");
316                    snapshot_controller.request_fastboot_snapshot();
317
318                    // Wait up to 5s for a snapshot to finish. This should allow time for the
319                    // fastboot snapshot to complete without stalling exit indefinitely.
320                    // The timeout will be hit in the event new roots are not being created.
321                    let timeout = Duration::from_secs(5);
322                    while snapshot_controller.latest_bank_snapshot_slot() == latest_snapshot_slot {
323                        if start_time.elapsed() > timeout {
324                            warn!("Timeout waiting for snapshot to complete");
325                            datapoint_warn!(
326                                "admin-rpc-snapshot-timeout",
327                                ("timeout_us", start_time.elapsed().as_micros(), i64)
328                            );
329                            break;
330                        }
331                        thread::sleep(Duration::from_millis(100));
332                    }
333                    info!(
334                        "Requesting fastboot snapshot before exit... Done in {:?}",
335                        start_time.elapsed()
336                    );
337                }
338
339                // Delay exit signal until this RPC request completes, otherwise the caller of `exit` might
340                // receive a confusing error as the validator shuts down before a response is sent back.
341                // If elapsed time has already taken 100ms, there is no need for further delay
342                if start_time.elapsed().as_millis() < 100 {
343                    thread::sleep(Duration::from_millis(100));
344                }
345
346                info!("validator exit requested");
347                meta.validator_exit.write().unwrap().exit();
348
349                if !meta.validator_exit_backpressure.is_empty() {
350                    let service_names = meta.validator_exit_backpressure.keys();
351                    info!("Wait for these services to complete: {service_names:?}");
352                    loop {
353                        // The initial sleep is a grace period to allow the services to raise their
354                        // backpressure flags.
355                        // Subsequent sleeps are to throttle how often we check and log.
356                        thread::sleep(Duration::from_secs(1));
357
358                        let mut any_flags_raised = false;
359                        for (name, flag) in meta.validator_exit_backpressure.iter() {
360                            let is_flag_raised = flag.load(Ordering::Relaxed);
361                            if is_flag_raised {
362                                info!("{name}'s exit backpressure flag is raised");
363                                any_flags_raised = true;
364                            }
365                        }
366                        if !any_flags_raised {
367                            break;
368                        }
369                    }
370                    info!("All services have completed");
371                }
372
373                // TODO: Debug why Exit doesn't always cause the validator to fully exit
374                // (rocksdb background processing or some other stuck thread perhaps?).
375                //
376                // If the process is still alive after five seconds, exit harder
377                thread::sleep(Duration::from_secs(
378                    env::var("SOLANA_VALIDATOR_EXIT_TIMEOUT")
379                        .ok()
380                        .and_then(|x| x.parse().ok())
381                        .unwrap_or(5),
382                ));
383                warn!("validator exit timeout");
384                std::process::exit(0);
385            })
386            .unwrap();
387
388        Ok(())
389    }
390
391    fn pid(&self, _meta: Self::Metadata) -> Result<u32> {
392        Ok(std::process::id())
393    }
394
395    fn reload_plugin(
396        &self,
397        meta: Self::Metadata,
398        name: String,
399        config_file: String,
400    ) -> BoxFuture<Result<()>> {
401        Box::pin(async move {
402            // Construct channel for plugin to respond to this particular rpc request instance
403            let (response_sender, response_receiver) = oneshot_channel();
404
405            // Send request to plugin manager if there is a geyser service
406            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
407                rpc_to_manager_sender
408                    .send(GeyserPluginManagerRequest::ReloadPlugin {
409                        name,
410                        config_file,
411                        response_sender,
412                    })
413                    .expect("GeyerPluginService should never drop request receiver");
414            } else {
415                return Err(jsonrpc_core::Error {
416                    code: ErrorCode::InvalidRequest,
417                    message: "No geyser plugin service".to_string(),
418                    data: None,
419                });
420            }
421
422            // Await response from plugin manager
423            response_receiver
424                .await
425                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
426        })
427    }
428
429    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
430        Box::pin(async move {
431            // Construct channel for plugin to respond to this particular rpc request instance
432            let (response_sender, response_receiver) = oneshot_channel();
433
434            // Send request to plugin manager if there is a geyser service
435            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
436                rpc_to_manager_sender
437                    .send(GeyserPluginManagerRequest::LoadPlugin {
438                        config_file,
439                        response_sender,
440                    })
441                    .expect("GeyerPluginService should never drop request receiver");
442            } else {
443                return Err(jsonrpc_core::Error {
444                    code: ErrorCode::InvalidRequest,
445                    message: "No geyser plugin service".to_string(),
446                    data: None,
447                });
448            }
449
450            // Await response from plugin manager
451            response_receiver
452                .await
453                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
454        })
455    }
456
457    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
458        Box::pin(async move {
459            // Construct channel for plugin to respond to this particular rpc request instance
460            let (response_sender, response_receiver) = oneshot_channel();
461
462            // Send request to plugin manager if there is a geyser service
463            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
464                rpc_to_manager_sender
465                    .send(GeyserPluginManagerRequest::UnloadPlugin {
466                        name,
467                        response_sender,
468                    })
469                    .expect("GeyerPluginService should never drop request receiver");
470            } else {
471                return Err(jsonrpc_core::Error {
472                    code: ErrorCode::InvalidRequest,
473                    message: "No geyser plugin service".to_string(),
474                    data: None,
475                });
476            }
477
478            // Await response from plugin manager
479            response_receiver
480                .await
481                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
482        })
483    }
484
485    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
486        Box::pin(async move {
487            // Construct channel for plugin to respond to this particular rpc request instance
488            let (response_sender, response_receiver) = oneshot_channel();
489
490            // Send request to plugin manager
491            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
492                rpc_to_manager_sender
493                    .send(GeyserPluginManagerRequest::ListPlugins { response_sender })
494                    .expect("GeyerPluginService should never drop request receiver");
495            } else {
496                return Err(jsonrpc_core::Error {
497                    code: ErrorCode::InvalidRequest,
498                    message: "No geyser plugin service".to_string(),
499                    data: None,
500                });
501            }
502
503            // Await response from plugin manager
504            response_receiver
505                .await
506                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
507        })
508    }
509
510    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
511        debug!("rpc_addr admin rpc request received");
512        Ok(meta.rpc_addr)
513    }
514
515    fn set_log_filter(&self, filter: String) -> Result<()> {
516        debug!("set_log_filter admin rpc request received");
517        agave_logger::setup_with(&filter);
518        Ok(())
519    }
520
521    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
522        debug!("start_time admin rpc request received");
523        Ok(meta.start_time)
524    }
525
526    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
527        debug!("start_progress admin rpc request received");
528        Ok(*meta.start_progress.read().unwrap())
529    }
530
531    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
532        debug!("add_authorized_voter request received");
533
534        let authorized_voter = read_keypair_file(keypair_file)
535            .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{err}")))?;
536
537        AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
538    }
539
540    fn add_authorized_voter_from_bytes(
541        &self,
542        meta: Self::Metadata,
543        keypair: Vec<u8>,
544    ) -> Result<()> {
545        debug!("add_authorized_voter_from_bytes request received");
546
547        let authorized_voter = Keypair::try_from(keypair.as_ref()).map_err(|err| {
548            jsonrpc_core::error::Error::invalid_params(format!(
549                "Failed to read authorized voter keypair from provided byte array: {err}"
550            ))
551        })?;
552
553        AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
554    }
555
556    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
557        debug!("remove_all_authorized_voters received");
558        meta.authorized_voter_keypairs.write().unwrap().clear();
559        Ok(())
560    }
561
562    fn set_identity(
563        &self,
564        meta: Self::Metadata,
565        keypair_file: String,
566        require_tower: bool,
567    ) -> Result<()> {
568        debug!("set_identity request received");
569
570        let identity_keypair = read_keypair_file(&keypair_file).map_err(|err| {
571            jsonrpc_core::error::Error::invalid_params(format!(
572                "Failed to read identity keypair from {keypair_file}: {err}"
573            ))
574        })?;
575
576        AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
577    }
578
579    fn set_identity_from_bytes(
580        &self,
581        meta: Self::Metadata,
582        identity_keypair: Vec<u8>,
583        require_tower: bool,
584    ) -> Result<()> {
585        debug!("set_identity_from_bytes request received");
586
587        let identity_keypair = Keypair::try_from(identity_keypair.as_ref()).map_err(|err| {
588            jsonrpc_core::error::Error::invalid_params(format!(
589                "Failed to read identity keypair from provided byte array: {err}"
590            ))
591        })?;
592
593        AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
594    }
595
596    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
597        let loaded_config = load_staked_nodes_overrides(&path)
598            .map_err(|err| {
599                error!(
600                    "Failed to load staked nodes overrides from {}: {}",
601                    &path, err
602                );
603                jsonrpc_core::error::Error::internal_error()
604            })?
605            .staked_map_id;
606        let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
607        write_staked_nodes.clear();
608        write_staked_nodes.extend(loaded_config);
609        info!("Staked nodes overrides loaded from {path}");
610        debug!("overrides map: {write_staked_nodes:?}");
611        Ok(())
612    }
613
614    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
615        meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
616    }
617
618    fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()> {
619        debug!("select_active_interface received: {interface}");
620        meta.with_post_init(|post_init| {
621            let node = post_init.node.as_ref().ok_or_else(|| {
622                jsonrpc_core::Error::invalid_params("`Node` not initialized in post_init")
623            })?;
624
625            node.switch_active_interface(interface, &post_init.cluster_info)
626                .map_err(|e| {
627                    jsonrpc_core::Error::invalid_params(format!(
628                        "Switching failed due to error {e}"
629                    ))
630                })?;
631            info!("Switched primary interface to {interface}");
632            Ok(())
633        })
634    }
635
636    fn repair_shred_from_peer(
637        &self,
638        meta: Self::Metadata,
639        pubkey: Option<Pubkey>,
640        slot: u64,
641        shred_index: u64,
642    ) -> Result<()> {
643        debug!("repair_shred_from_peer request received");
644
645        meta.with_post_init(|post_init| {
646            repair_service::RepairService::request_repair_for_shred_from_peer(
647                post_init.cluster_info.clone(),
648                post_init.cluster_slots.clone(),
649                pubkey,
650                slot,
651                shred_index,
652                &post_init.repair_socket,
653                post_init.outstanding_repair_requests.clone(),
654            );
655            Ok(())
656        })
657    }
658
659    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
660        debug!("repair_whitelist request received");
661
662        meta.with_post_init(|post_init| {
663            let whitelist: Vec<_> = post_init
664                .repair_whitelist
665                .read()
666                .unwrap()
667                .iter()
668                .copied()
669                .collect();
670            Ok(AdminRpcRepairWhitelist { whitelist })
671        })
672    }
673
674    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()> {
675        debug!("set_repair_whitelist request received");
676
677        let whitelist: HashSet<Pubkey> = whitelist.into_iter().collect();
678        meta.with_post_init(|post_init| {
679            *post_init.repair_whitelist.write().unwrap() = whitelist;
680            warn!(
681                "Repair whitelist set to {:?}",
682                &post_init.repair_whitelist.read().unwrap()
683            );
684            Ok(())
685        })
686    }
687
688    fn get_secondary_index_key_size(
689        &self,
690        meta: Self::Metadata,
691        pubkey_str: String,
692    ) -> Result<HashMap<RpcAccountIndex, usize>> {
693        debug!("get_secondary_index_key_size rpc request received: {pubkey_str:?}");
694        let index_key = verify_pubkey(&pubkey_str)?;
695        meta.with_post_init(|post_init| {
696            let bank = post_init.bank_forks.read().unwrap().root_bank();
697
698            // Take ref to enabled AccountSecondaryIndexes
699            let enabled_account_indexes = &bank.accounts().accounts_db.account_indexes;
700
701            // Exit if secondary indexes are not enabled
702            if enabled_account_indexes.is_empty() {
703                debug!("get_secondary_index_key_size: secondary index not enabled.");
704                return Ok(HashMap::new());
705            };
706
707            // Make sure the requested key is not explicitly excluded
708            if !enabled_account_indexes.include_key(&index_key) {
709                return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
710                    index_key: index_key.to_string(),
711                }
712                .into());
713            }
714
715            // Grab a ref to the AccountsDbfor this Bank
716            let accounts_index = &bank.accounts().accounts_db.accounts_index;
717
718            // Find the size of the key in every index where it exists
719            let found_sizes = enabled_account_indexes
720                .indexes
721                .iter()
722                .filter_map(|index| {
723                    accounts_index
724                        .get_index_key_size(index, &index_key)
725                        .map(|size| (rpc_account_index_from_account_index(index), size))
726                })
727                .collect::<HashMap<_, _>>();
728
729            // Note: Will return an empty HashMap if no keys are found.
730            if found_sizes.is_empty() {
731                debug!("get_secondary_index_key_size: key not found in the secondary index.");
732            }
733            Ok(found_sizes)
734        })
735    }
736
737    fn set_public_tpu_address(
738        &self,
739        meta: Self::Metadata,
740        public_tpu_addr: SocketAddr,
741    ) -> Result<()> {
742        debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
743
744        meta.with_post_init(|post_init| {
745            post_init
746                .cluster_info
747                .my_contact_info()
748                .tpu(Protocol::QUIC)
749                .ok_or_else(|| {
750                    error!(
751                        "The public TPU QUIC address isn't being published. The node is likely in \
752                         repair mode. See help for --restricted-repair-only-mode for more \
753                         information."
754                    );
755                    jsonrpc_core::error::Error::internal_error()
756                })?;
757            post_init
758                .cluster_info
759                .set_tpu_quic(public_tpu_addr)
760                .map_err(|err| {
761                    error!("Failed to set public TPU QUIC address to {public_tpu_addr}: {err}");
762                    jsonrpc_core::error::Error::internal_error()
763                })?;
764            let my_contact_info = post_init.cluster_info.my_contact_info();
765            warn!(
766                "Public TPU addresses set to {:?} (quic)",
767                my_contact_info.tpu(Protocol::QUIC),
768            );
769            Ok(())
770        })
771    }
772
773    fn set_public_tpu_forwards_address(
774        &self,
775        meta: Self::Metadata,
776        public_tpu_forwards_addr: SocketAddr,
777    ) -> Result<()> {
778        debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
779
780        meta.with_post_init(|post_init| {
781            post_init
782                .cluster_info
783                .my_contact_info()
784                .tpu_forwards(Protocol::QUIC)
785                .ok_or_else(|| {
786                    error!(
787                        "The public TPU Forwards address isn't being published. The node is \
788                         likely in repair mode. See help for --restricted-repair-only-mode for \
789                         more information."
790                    );
791                    jsonrpc_core::error::Error::internal_error()
792                })?;
793            post_init
794                .cluster_info
795                .set_tpu_forwards_quic(public_tpu_forwards_addr)
796                .map_err(|err| {
797                    error!(
798                        "Failed to set public TPU QUIC address to {public_tpu_forwards_addr}: \
799                         {err}"
800                    );
801                    jsonrpc_core::error::Error::internal_error()
802                })?;
803            let my_contact_info = post_init.cluster_info.my_contact_info();
804            warn!(
805                "Public TPU Forwards address set to {:?} (quic)",
806                my_contact_info.tpu_forwards(Protocol::QUIC),
807            );
808            Ok(())
809        })
810    }
811
812    fn set_public_tvu_address(
813        &self,
814        meta: Self::Metadata,
815        public_tvu_addr: SocketAddr,
816    ) -> Result<()> {
817        debug!("set_public_tvu_address rpc request received: {public_tvu_addr}");
818
819        meta.with_post_init(|post_init| {
820            post_init
821                .cluster_info
822                .my_contact_info()
823                .tvu(Protocol::UDP)
824                .ok_or_else(|| {
825                    error!(
826                        "The public TVU address isn't being published. The node is likely in \
827                         repair mode. See help for --restricted-repair-only-mode for more \
828                         information."
829                    );
830                    jsonrpc_core::error::Error::internal_error()
831                })?;
832            post_init
833                .cluster_info
834                .set_tvu_socket(public_tvu_addr)
835                .map_err(|err| {
836                    error!("Failed to set public TVU address to {public_tvu_addr}: {err}");
837                    jsonrpc_core::error::Error::internal_error()
838                })?;
839            let my_contact_info = post_init.cluster_info.my_contact_info();
840            warn!(
841                "Public TVU addresses set to {:?}",
842                my_contact_info.tvu(Protocol::UDP),
843            );
844            Ok(())
845        })
846    }
847
848    fn manage_block_production(
849        &self,
850        meta: Self::Metadata,
851        block_production_method: BlockProductionMethod,
852        transaction_struct: TransactionStructure,
853        num_workers: NonZeroUsize,
854        scheduler_pacing: SchedulerPacing,
855    ) -> Result<()> {
856        debug!("manage_block_production rpc request received");
857
858        if num_workers > BankingStage::max_num_workers() {
859            return Err(jsonrpc_core::error::Error::invalid_params(format!(
860                "Number of workers ({}) exceeds maximum allowed ({})",
861                num_workers,
862                BankingStage::max_num_workers()
863            )));
864        }
865
866        if transaction_struct != TransactionStructure::View {
867            warn!("TransactionStructure::Sdk has no effect on block production");
868        }
869
870        meta.with_post_init(|post_init| {
871            if post_init
872                .banking_control_sender
873                .try_send(BankingControlMsg::Internal {
874                    block_production_method,
875                    num_workers,
876                    config: SchedulerConfig { scheduler_pacing },
877                })
878                .is_err()
879            {
880                error!("Banking stage already switching schedulers");
881
882                return Err(jsonrpc_core::error::Error::internal_error());
883            }
884
885            Ok(())
886        })
887    }
888
889    fn is_generating_snapshots(&self, meta: Self::Metadata) -> Result<bool> {
890        if let Some(snapshot_controller) = meta.snapshot_controller() {
891            Ok(snapshot_controller.is_generating_snapshots())
892        } else {
893            Err(jsonrpc_core::error::Error::invalid_params(
894                "snapshot_controller unavailable",
895            ))
896        }
897    }
898}
899
900impl AdminRpcImpl {
901    fn add_authorized_voter_keypair(
902        meta: AdminRpcRequestMetadata,
903        authorized_voter: Keypair,
904    ) -> Result<()> {
905        let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
906
907        if authorized_voter_keypairs
908            .iter()
909            .any(|x| x.pubkey() == authorized_voter.pubkey())
910        {
911            Err(jsonrpc_core::error::Error::invalid_params(
912                "Authorized voter already present",
913            ))
914        } else {
915            authorized_voter_keypairs.push(Arc::new(authorized_voter));
916            Ok(())
917        }
918    }
919
920    fn set_identity_keypair(
921        meta: AdminRpcRequestMetadata,
922        identity_keypair: Keypair,
923        require_tower: bool,
924    ) -> Result<()> {
925        meta.with_post_init(|post_init| {
926            if require_tower {
927                let _ = Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey())
928                    .map_err(|err| {
929                        jsonrpc_core::error::Error::invalid_params(format!(
930                            "Unable to load tower file for identity {}: {}",
931                            identity_keypair.pubkey(),
932                            err
933                        ))
934                    })?;
935            }
936
937            for (key, notifier) in &*post_init.notifies.read().unwrap() {
938                if let Err(err) = notifier.update_key(&identity_keypair) {
939                    error!("Error updating network layer keypair: {err} on {key:?}");
940                }
941            }
942
943            let old_identity = post_init.cluster_info.id();
944            let new_identity = identity_keypair.pubkey();
945            solana_metrics::set_host_id(new_identity.to_string());
946            // Emit the datapoint after updating metrics to emit the new pubkey
947            datapoint_info!(
948                "validator-set_identity",
949                ("old_id", old_identity.to_string(), String),
950                ("new_id", new_identity.to_string(), String),
951                ("version", solana_version::version!(), String),
952            );
953            post_init
954                .cluster_info
955                .set_keypair(Arc::new(identity_keypair));
956            warn!("Identity set to {new_identity}");
957            Ok(())
958        })
959    }
960}
961
962fn rpc_account_index_from_account_index(account_index: &AccountIndex) -> RpcAccountIndex {
963    match account_index {
964        AccountIndex::ProgramId => RpcAccountIndex::ProgramId,
965        AccountIndex::SplTokenOwner => RpcAccountIndex::SplTokenOwner,
966        AccountIndex::SplTokenMint => RpcAccountIndex::SplTokenMint,
967    }
968}
969
970// Start the Admin RPC interface
971pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
972    let admin_rpc_path = admin_rpc_path(ledger_path);
973
974    let event_loop = tokio::runtime::Builder::new_multi_thread()
975        .thread_name("solAdminRpcEl")
976        .worker_threads(3) // Three still seems like a lot, and better than the default of available core count
977        .enable_all()
978        .build()
979        .unwrap();
980
981    Builder::new()
982        .name("solAdminRpc".to_string())
983        .spawn(move || {
984            let mut io = MetaIoHandler::default();
985            io.extend_with(AdminRpcImpl.to_delegate());
986
987            let validator_exit = metadata.validator_exit.clone();
988            let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
989                metadata.clone()
990            })
991            .event_loop_executor(event_loop.handle().clone())
992            .start(&format!("{}", admin_rpc_path.display()));
993
994            match server {
995                Err(err) => {
996                    warn!("Unable to start admin rpc service: {err:?}");
997                }
998                Ok(server) => {
999                    info!("started admin rpc service!");
1000                    let close_handle = server.close_handle();
1001                    validator_exit
1002                        .write()
1003                        .unwrap()
1004                        .register_exit(Box::new(move || {
1005                            close_handle.close();
1006                        }));
1007
1008                    server.wait();
1009                }
1010            }
1011        })
1012        .unwrap();
1013}
1014
/// Returns the location of the admin RPC IPC endpoint for `ledger_path`.
///
/// On Windows this is a named pipe whose name is built from the final
/// component of `ledger_path` (or a fixed name when there is none); on every
/// other target it is the `admin.rpc` entry inside `ledger_path`.
fn admin_rpc_path(ledger_path: &Path) -> PathBuf {
    #[cfg(target_family = "windows")]
    {
        // More information about the wackiness of pipe names over at
        // https://docs.microsoft.com/en-us/windows/win32/ipc/pipe-names
        let pipe_name = match ledger_path.file_name() {
            Some(ledger_filename) => format!(
                "\\\\.\\pipe\\{}-admin.rpc",
                ledger_filename.to_string_lossy()
            ),
            None => String::from("\\\\.\\pipe\\admin.rpc"),
        };
        PathBuf::from(pipe_name)
    }
    #[cfg(not(target_family = "windows"))]
    {
        ledger_path.join("admin.rpc")
    }
}
1034
1035// Connect to the Admin RPC interface
1036pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
1037    let admin_rpc_path = admin_rpc_path(ledger_path);
1038    if !admin_rpc_path.exists() {
1039        Err(RpcError::Client(format!(
1040            "{} does not exist",
1041            admin_rpc_path.display()
1042        )))
1043    } else {
1044        ipc::connect::<_, gen_client::Client>(&format!("{}", admin_rpc_path.display())).await
1045    }
1046}
1047
1048// Create a runtime for use by client side admin RPC interface calls
1049pub fn runtime() -> Runtime {
1050    tokio::runtime::Builder::new_multi_thread()
1051        .thread_name("solAdminRpcRt")
1052        .enable_all()
1053        // The agave-validator subcommands make few admin RPC calls and block
1054        // on the results so two workers is plenty
1055        .worker_threads(2)
1056        .build()
1057        .expect("new tokio runtime")
1058}
1059
1060#[derive(Default, Deserialize, Clone)]
1061pub struct StakedNodesOverrides {
1062    #[serde(deserialize_with = "deserialize_pubkey_map")]
1063    pub staked_map_id: HashMap<Pubkey, u64>,
1064}
1065
1066pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
1067where
1068    D: Deserializer<'de>,
1069{
1070    let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
1071    let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
1072    for (key, value) in container.iter() {
1073        let typed_key = Pubkey::try_from(key.as_str())
1074            .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
1075        container_typed.insert(typed_key, *value);
1076    }
1077    Ok(container_typed)
1078}
1079
1080pub fn load_staked_nodes_overrides(
1081    path: &String,
1082) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
1083    debug!("Loading staked nodes overrides configuration from {path}");
1084    if Path::new(&path).exists() {
1085        let file = std::fs::File::open(path)?;
1086        Ok(serde_yaml::from_reader(file)?)
1087    } else {
1088        Err(format!("Staked nodes overrides provided '{path}' a non-existing file path.").into())
1089    }
1090}
1091
1092#[cfg(test)]
1093mod tests {
1094    use {
1095        super::*,
1096        agave_snapshots::snapshot_config::SnapshotConfig,
1097        crossbeam_channel::unbounded,
1098        serde_json::Value,
1099        solana_account::{Account, AccountSharedData},
1100        solana_accounts_db::{
1101            accounts_db::{ACCOUNTS_DB_CONFIG_FOR_TESTING, AccountsDbConfig},
1102            accounts_index::AccountSecondaryIndexes,
1103        },
1104        solana_core::{
1105            admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
1106            consensus::tower_storage::NullTowerStorage,
1107            validator::{Validator, ValidatorConfig, ValidatorTpuConfig},
1108        },
1109        solana_gossip::{cluster_info::ClusterInfo, node::Node},
1110        solana_ledger::{
1111            create_new_tmp_ledger,
1112            genesis_utils::{
1113                GenesisConfigInfo, create_genesis_config, create_genesis_config_with_leader,
1114            },
1115        },
1116        solana_net_utils::{SocketAddrSpace, sockets::bind_to_localhost_unique},
1117        solana_program_option::COption,
1118        solana_program_pack::Pack,
1119        solana_pubkey::Pubkey,
1120        solana_rpc::rpc::create_validator_exit,
1121        solana_runtime::{
1122            bank::{Bank, BankTestConfig},
1123            bank_forks::BankForks,
1124        },
1125        solana_system_interface::program as system_program,
1126        spl_generic_token::token,
1127        spl_token_2022_interface::state::{
1128            Account as TokenAccount, AccountState as TokenAccountState, Mint,
1129        },
1130        std::{collections::HashSet, fs::remove_dir_all, sync::atomic::AtomicBool},
1131        tokio::sync::mpsc,
1132    };
1133
    /// Options accepted by `RpcHandler::start_with_config`.
    #[derive(Default)]
    struct TestConfig {
        /// Secondary indexes passed into the accounts-db config of the test bank.
        account_indexes: AccountSecondaryIndexes,
    }
1138
    /// Test fixture bundling an admin RPC handler with the state it operates on.
    struct RpcHandler {
        /// JSON-RPC handler with the `AdminRpcImpl` delegate registered.
        io: MetaIoHandler<AdminRpcRequestMetadata>,
        /// Request metadata to pass along with each request.
        meta: AdminRpcRequestMetadata,
        /// Bank forks shared with `meta`'s post-init state.
        bank_forks: Arc<RwLock<BankForks>>,
    }
1144
1145    impl RpcHandler {
1146        fn _start() -> Self {
1147            Self::start_with_config(TestConfig::default())
1148        }
1149
1150        fn start_with_config(config: TestConfig) -> Self {
1151            let keypair = Arc::new(Keypair::new());
1152            let cluster_info = Arc::new(ClusterInfo::new(
1153                ContactInfo::new(
1154                    keypair.pubkey(),
1155                    solana_time_utils::timestamp(), // wallclock
1156                    0u16,                           // shred_version
1157                ),
1158                keypair,
1159                SocketAddrSpace::Unspecified,
1160            ));
1161            let exit = Arc::new(AtomicBool::new(false));
1162            let validator_exit = create_validator_exit(exit);
1163            let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig {
1164                accounts_db_config: AccountsDbConfig {
1165                    account_indexes: Some(config.account_indexes),
1166                    ..ACCOUNTS_DB_CONFIG_FOR_TESTING
1167                },
1168            });
1169
1170            let (snapshot_request_sender, _) = unbounded();
1171            let snapshot_controller = Arc::new(SnapshotController::new(
1172                snapshot_request_sender.clone(),
1173                SnapshotConfig::default(),
1174                bank_forks.read().unwrap().root(),
1175            ));
1176
1177            let vote_account = vote_keypair.pubkey();
1178            let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1179            let repair_whitelist = Arc::new(RwLock::new(HashSet::new()));
1180            let meta = AdminRpcRequestMetadata {
1181                rpc_addr: None,
1182                start_time: SystemTime::now(),
1183                start_progress,
1184                validator_exit,
1185                validator_exit_backpressure: HashMap::default(),
1186                authorized_voter_keypairs: Arc::new(RwLock::new(vec![vote_keypair])),
1187                tower_storage: Arc::new(NullTowerStorage {}),
1188                post_init: Arc::new(RwLock::new(Some(AdminRpcRequestMetadataPostInit {
1189                    cluster_info,
1190                    bank_forks: bank_forks.clone(),
1191                    vote_account,
1192                    repair_whitelist,
1193                    notifies: Arc::new(RwLock::new(KeyUpdaters::default())),
1194                    repair_socket: Arc::new(bind_to_localhost_unique().expect("should bind")),
1195                    outstanding_repair_requests: Arc::<
1196                        RwLock<repair_service::OutstandingShredRepairs>,
1197                    >::default(),
1198                    cluster_slots: Arc::new(
1199                        solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default_for_tests(),
1200                    ),
1201                    node: None,
1202                    banking_control_sender: mpsc::channel(1).0,
1203                    snapshot_controller,
1204                }))),
1205                staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1206                rpc_to_plugin_manager_sender: None,
1207            };
1208            let mut io = MetaIoHandler::default();
1209            io.extend_with(AdminRpcImpl.to_delegate());
1210
1211            Self {
1212                io,
1213                meta,
1214                bank_forks,
1215            }
1216        }
1217
1218        fn root_bank(&self) -> Arc<Bank> {
1219            self.bank_forks.read().unwrap().root_bank()
1220        }
1221    }
1222
1223    fn new_bank_forks_with_config(
1224        config: BankTestConfig,
1225    ) -> (Arc<RwLock<BankForks>>, Arc<Keypair>) {
1226        let GenesisConfigInfo {
1227            genesis_config,
1228            voting_keypair,
1229            ..
1230        } = create_genesis_config(1_000_000_000);
1231
1232        let bank = Bank::new_with_config_for_tests(&genesis_config, config);
1233        (BankForks::new_rw_arc(bank), Arc::new(voting_keypair))
1234    }
1235
1236    #[test]
1237    fn test_secondary_index_key_sizes() {
1238        for secondary_index_enabled in [true, false] {
1239            let account_indexes = if secondary_index_enabled {
1240                AccountSecondaryIndexes {
1241                    keys: None,
1242                    indexes: HashSet::from([
1243                        AccountIndex::ProgramId,
1244                        AccountIndex::SplTokenMint,
1245                        AccountIndex::SplTokenOwner,
1246                    ]),
1247                }
1248            } else {
1249                AccountSecondaryIndexes::default()
1250            };
1251
1252            // RPC & Bank Setup
1253            let rpc = RpcHandler::start_with_config(TestConfig { account_indexes });
1254
1255            let bank = rpc.root_bank();
1256            let RpcHandler { io, meta, .. } = rpc;
1257
1258            // Pubkeys
1259            let token_account1_pubkey = Pubkey::new_unique();
1260            let token_account2_pubkey = Pubkey::new_unique();
1261            let token_account3_pubkey = Pubkey::new_unique();
1262            let mint1_pubkey = Pubkey::new_unique();
1263            let mint2_pubkey = Pubkey::new_unique();
1264            let wallet1_pubkey = Pubkey::new_unique();
1265            let wallet2_pubkey = Pubkey::new_unique();
1266            let non_existent_pubkey = Pubkey::new_unique();
1267            let delegate = Pubkey::new_unique();
1268
1269            let mut num_default_spl_token_program_accounts = 0;
1270            let mut num_default_system_program_accounts = 0;
1271
1272            if !secondary_index_enabled {
1273                // Test first with no accounts added & no secondary indexes enabled:
1274                let req = format!(
1275                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account1_pubkey}"]}}"#,
1276                );
1277                let res = io.handle_request_sync(&req, meta.clone());
1278                let result: Value = serde_json::from_str(&res.expect("actual response"))
1279                    .expect("actual response deserialization");
1280                let sizes: HashMap<RpcAccountIndex, usize> =
1281                    serde_json::from_value(result["result"].clone()).unwrap();
1282                assert!(sizes.is_empty());
1283            } else {
1284                // Count SPL Token Program Default Accounts
1285                let req = format!(
1286                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1287                    token::id(),
1288                );
1289                let res = io.handle_request_sync(&req, meta.clone());
1290                let result: Value = serde_json::from_str(&res.expect("actual response"))
1291                    .expect("actual response deserialization");
1292                let sizes: HashMap<RpcAccountIndex, usize> =
1293                    serde_json::from_value(result["result"].clone()).unwrap();
1294                assert_eq!(sizes.len(), 1);
1295                num_default_spl_token_program_accounts =
1296                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1297                // Count System Program Default Accounts
1298                let req = format!(
1299                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1300                    system_program::id(),
1301                );
1302                let res = io.handle_request_sync(&req, meta.clone());
1303                let result: Value = serde_json::from_str(&res.expect("actual response"))
1304                    .expect("actual response deserialization");
1305                let sizes: HashMap<RpcAccountIndex, usize> =
1306                    serde_json::from_value(result["result"].clone()).unwrap();
1307                assert_eq!(sizes.len(), 1);
1308                num_default_system_program_accounts =
1309                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1310            }
1311
1312            // Add 2 basic wallet accounts
1313            let wallet1_account = AccountSharedData::from(Account {
1314                lamports: 11111111,
1315                owner: system_program::id(),
1316                ..Account::default()
1317            });
1318            bank.store_account(&wallet1_pubkey, &wallet1_account);
1319            let wallet2_account = AccountSharedData::from(Account {
1320                lamports: 11111111,
1321                owner: system_program::id(),
1322                ..Account::default()
1323            });
1324            bank.store_account(&wallet2_pubkey, &wallet2_account);
1325
1326            // Add a token account
1327            let mut account1_data = vec![0; TokenAccount::get_packed_len()];
1328            let token_account1 = TokenAccount {
1329                mint: mint1_pubkey,
1330                owner: wallet1_pubkey,
1331                delegate: COption::Some(delegate),
1332                amount: 420,
1333                state: TokenAccountState::Initialized,
1334                is_native: COption::None,
1335                delegated_amount: 30,
1336                close_authority: COption::Some(wallet1_pubkey),
1337            };
1338            TokenAccount::pack(token_account1, &mut account1_data).unwrap();
1339            let token_account1 = AccountSharedData::from(Account {
1340                lamports: 111,
1341                data: account1_data.to_vec(),
1342                owner: token::id(),
1343                ..Account::default()
1344            });
1345            bank.store_account(&token_account1_pubkey, &token_account1);
1346
1347            // Add the mint
1348            let mut mint1_data = vec![0; Mint::get_packed_len()];
1349            let mint1_state = Mint {
1350                mint_authority: COption::Some(wallet1_pubkey),
1351                supply: 500,
1352                decimals: 2,
1353                is_initialized: true,
1354                freeze_authority: COption::Some(wallet1_pubkey),
1355            };
1356            Mint::pack(mint1_state, &mut mint1_data).unwrap();
1357            let mint_account1 = AccountSharedData::from(Account {
1358                lamports: 222,
1359                data: mint1_data.to_vec(),
1360                owner: token::id(),
1361                ..Account::default()
1362            });
1363            bank.store_account(&mint1_pubkey, &mint_account1);
1364
1365            // Add another token account with the different owner, but same delegate, and mint
1366            let mut account2_data = vec![0; TokenAccount::get_packed_len()];
1367            let token_account2 = TokenAccount {
1368                mint: mint1_pubkey,
1369                owner: wallet2_pubkey,
1370                delegate: COption::Some(delegate),
1371                amount: 420,
1372                state: TokenAccountState::Initialized,
1373                is_native: COption::None,
1374                delegated_amount: 30,
1375                close_authority: COption::Some(wallet2_pubkey),
1376            };
1377            TokenAccount::pack(token_account2, &mut account2_data).unwrap();
1378            let token_account2 = AccountSharedData::from(Account {
1379                lamports: 333,
1380                data: account2_data.to_vec(),
1381                owner: token::id(),
1382                ..Account::default()
1383            });
1384            bank.store_account(&token_account2_pubkey, &token_account2);
1385
1386            // Add another token account with the same owner and delegate but different mint
1387            let mut account3_data = vec![0; TokenAccount::get_packed_len()];
1388            let token_account3 = TokenAccount {
1389                mint: mint2_pubkey,
1390                owner: wallet2_pubkey,
1391                delegate: COption::Some(delegate),
1392                amount: 42,
1393                state: TokenAccountState::Initialized,
1394                is_native: COption::None,
1395                delegated_amount: 30,
1396                close_authority: COption::Some(wallet2_pubkey),
1397            };
1398            TokenAccount::pack(token_account3, &mut account3_data).unwrap();
1399            let token_account3 = AccountSharedData::from(Account {
1400                lamports: 444,
1401                data: account3_data.to_vec(),
1402                owner: token::id(),
1403                ..Account::default()
1404            });
1405            bank.store_account(&token_account3_pubkey, &token_account3);
1406
1407            // Add the new mint
1408            let mut mint2_data = vec![0; Mint::get_packed_len()];
1409            let mint2_state = Mint {
1410                mint_authority: COption::Some(wallet2_pubkey),
1411                supply: 200,
1412                decimals: 3,
1413                is_initialized: true,
1414                freeze_authority: COption::Some(wallet2_pubkey),
1415            };
1416            Mint::pack(mint2_state, &mut mint2_data).unwrap();
1417            let mint_account2 = AccountSharedData::from(Account {
1418                lamports: 555,
1419                data: mint2_data.to_vec(),
1420                owner: token::id(),
1421                ..Account::default()
1422            });
1423            bank.store_account(&mint2_pubkey, &mint_account2);
1424
1425            // Accounts should now look like the following:
1426            //
1427            //                   -----system_program------
1428            //                  /                         \
1429            //                 /-(owns)                    \-(owns)
1430            //                /                             \
1431            //             wallet1                   ---wallet2---
1432            //               /                      /             \
1433            //              /-(SPL::owns)          /-(SPL::owns)   \-(SPL::owns)
1434            //             /                      /                 \
1435            //      token_account1         token_account2       token_account3
1436            //            \                     /                   /
1437            //             \-(SPL::mint)       /-(SPL::mint)       /-(SPL::mint)
1438            //              \                 /                   /
1439            //               --mint_account1--               mint_account2
1440
1441            if secondary_index_enabled {
1442                // ----------- Test for a non-existent key -----------
1443                let req = format!(
1444                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{non_existent_pubkey}"]}}"#,
1445                );
1446                let res = io.handle_request_sync(&req, meta.clone());
1447                let result: Value = serde_json::from_str(&res.expect("actual response"))
1448                    .expect("actual response deserialization");
1449                let sizes: HashMap<RpcAccountIndex, usize> =
1450                    serde_json::from_value(result["result"].clone()).unwrap();
1451                assert!(sizes.is_empty());
1452                // --------------- Test Queries ---------------
1453                // 1) Wallet1 - Owns 1 SPL Token
1454                let req = format!(
1455                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet1_pubkey}"]}}"#,
1456                );
1457                let res = io.handle_request_sync(&req, meta.clone());
1458                let result: Value = serde_json::from_str(&res.expect("actual response"))
1459                    .expect("actual response deserialization");
1460                let sizes: HashMap<RpcAccountIndex, usize> =
1461                    serde_json::from_value(result["result"].clone()).unwrap();
1462                assert_eq!(sizes.len(), 1);
1463                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 1);
1464                // 2) Wallet2 - Owns 2 SPL Tokens
1465                let req = format!(
1466                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet2_pubkey}"]}}"#,
1467                );
1468                let res = io.handle_request_sync(&req, meta.clone());
1469                let result: Value = serde_json::from_str(&res.expect("actual response"))
1470                    .expect("actual response deserialization");
1471                let sizes: HashMap<RpcAccountIndex, usize> =
1472                    serde_json::from_value(result["result"].clone()).unwrap();
1473                assert_eq!(sizes.len(), 1);
1474                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 2);
1475                // 3) Mint1 - Is in 2 SPL Accounts
1476                let req = format!(
1477                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint1_pubkey}"]}}"#,
1478                );
1479                let res = io.handle_request_sync(&req, meta.clone());
1480                let result: Value = serde_json::from_str(&res.expect("actual response"))
1481                    .expect("actual response deserialization");
1482                let sizes: HashMap<RpcAccountIndex, usize> =
1483                    serde_json::from_value(result["result"].clone()).unwrap();
1484                assert_eq!(sizes.len(), 1);
1485                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 2);
1486                // 4) Mint2 - Is in 1 SPL Account
1487                let req = format!(
1488                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint2_pubkey}"]}}"#,
1489                );
1490                let res = io.handle_request_sync(&req, meta.clone());
1491                let result: Value = serde_json::from_str(&res.expect("actual response"))
1492                    .expect("actual response deserialization");
1493                let sizes: HashMap<RpcAccountIndex, usize> =
1494                    serde_json::from_value(result["result"].clone()).unwrap();
1495                assert_eq!(sizes.len(), 1);
1496                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 1);
1497                // 5) SPL Token Program Owns 6 Accounts - 1 Default, 5 created above.
1498                let req = format!(
1499                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1500                    token::id(),
1501                );
1502                let res = io.handle_request_sync(&req, meta.clone());
1503                let result: Value = serde_json::from_str(&res.expect("actual response"))
1504                    .expect("actual response deserialization");
1505                let sizes: HashMap<RpcAccountIndex, usize> =
1506                    serde_json::from_value(result["result"].clone()).unwrap();
1507                assert_eq!(sizes.len(), 1);
1508                assert_eq!(
1509                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1510                    (num_default_spl_token_program_accounts + 5)
1511                );
1512                // 5) System Program Owns 4 Accounts + 2 Default, 2 created above.
1513                let req = format!(
1514                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1515                    system_program::id(),
1516                );
1517                let res = io.handle_request_sync(&req, meta.clone());
1518                let result: Value = serde_json::from_str(&res.expect("actual response"))
1519                    .expect("actual response deserialization");
1520                let sizes: HashMap<RpcAccountIndex, usize> =
1521                    serde_json::from_value(result["result"].clone()).unwrap();
1522                assert_eq!(sizes.len(), 1);
1523                assert_eq!(
1524                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1525                    (num_default_system_program_accounts + 2)
1526                );
1527            } else {
1528                // ------------ Secondary Indexes Disabled ------------
1529                let req = format!(
1530                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account2_pubkey}"]}}"#,
1531                );
1532                let res = io.handle_request_sync(&req, meta.clone());
1533                let result: Value = serde_json::from_str(&res.expect("actual response"))
1534                    .expect("actual response deserialization");
1535                let sizes: HashMap<RpcAccountIndex, usize> =
1536                    serde_json::from_value(result["result"].clone()).unwrap();
1537                assert!(sizes.is_empty());
1538            }
1539        }
1540    }
1541
    // This test checks that the rpc call to `set_identity` works as expected with
    // a Bank but without a validator.
1544    #[test]
1545    fn test_set_identity() {
1546        let rpc = RpcHandler::start_with_config(TestConfig::default());
1547
1548        let RpcHandler { io, meta, .. } = rpc;
1549
1550        let expected_validator_id = Keypair::new();
1551        let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1552
1553        let set_id_request = format!(
1554            r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1555        );
1556        let response = io.handle_request_sync(&set_id_request, meta.clone());
1557        let actual_parsed_response: Value =
1558            serde_json::from_str(&response.expect("actual response"))
1559                .expect("actual response deserialization");
1560
1561        let expected_parsed_response: Value = serde_json::from_str(
1562            r#"{
1563                "id": 1,
1564                "jsonrpc": "2.0",
1565                "result": null
1566            }"#,
1567        )
1568        .expect("Failed to parse expected response");
1569        assert_eq!(actual_parsed_response, expected_parsed_response);
1570
1571        let contact_info_request =
1572            r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1573        let response = io.handle_request_sync(&contact_info_request, meta.clone());
1574        let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1575            .expect("actual response deserialization");
1576        let actual_validator_id = parsed_response["result"]["id"]
1577            .as_str()
1578            .expect("Expected a string");
1579        assert_eq!(
1580            actual_validator_id,
1581            expected_validator_id.pubkey().to_string()
1582        );
1583    }
1584
    /// Test fixture bundling the admin RPC request metadata, the JSON-RPC
    /// handler that serves the admin methods, and the ledger directory of the
    /// validator started for the test.
    struct TestValidatorWithAdminRpc {
        // Metadata that is cloned into every request sent via `handle_request`.
        meta: AdminRpcRequestMetadata,
        // JSON-RPC dispatcher the admin RPC implementation is registered on.
        io: MetaIoHandler<AdminRpcRequestMetadata>,
        // Ledger directory created for the test; deleted when the fixture is dropped.
        validator_ledger_path: PathBuf,
    }
1590
    impl TestValidatorWithAdminRpc {
        /// Starts a validator on localhost against a fresh temporary ledger and
        /// returns a fixture whose admin RPC metadata shares state with it.
        ///
        /// Panics if the validator fails to start, if it does not report
        /// `ValidatorStartProgress::Running`, or if the post-init key updaters
        /// do not match the expected set.
        fn new() -> Self {
            // The leader only contributes its contact info as the cluster entrypoint
            // and its pubkey to the genesis config.
            let leader_keypair = Keypair::new();
            let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());

            let validator_keypair = Keypair::new();
            let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
            let genesis_config =
                create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
                    .genesis_config;
            let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);

            let voting_keypair = Arc::new(Keypair::new());
            let voting_pubkey = voting_keypair.pubkey();
            let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
            // Default test config, except that the RPC and pubsub addresses come
            // from the validator node's contact info.
            let validator_config = ValidatorConfig {
                rpc_addrs: Some((
                    validator_node.info.rpc().unwrap(),
                    validator_node.info.rpc_pubsub().unwrap(),
                )),
                ..ValidatorConfig::default_for_test()
            };
            let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));

            // Starts out empty; the same handle is passed to `Validator::new` below
            // and is asserted to be populated once that call returns.
            let post_init = Arc::new(RwLock::new(None));
            // The metadata holds clones of the handles given to the validator
            // (start progress, exit, authorized voters, post-init), so admin RPC
            // calls act on the validator started below.
            let meta = AdminRpcRequestMetadata {
                rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
                start_time: SystemTime::now(),
                start_progress: start_progress.clone(),
                validator_exit: validator_config.validator_exit.clone(),
                validator_exit_backpressure: HashMap::default(),
                authorized_voter_keypairs: authorized_voter_keypairs.clone(),
                tower_storage: Arc::new(NullTowerStorage {}),
                post_init: post_init.clone(),
                staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
                rpc_to_plugin_manager_sender: None,
            };

            // NOTE(review): `_validator` is a named binding, so the value is dropped
            // when `new` returns rather than immediately; presumably the validator's
            // services keep running after that — confirm.
            let _validator = Validator::new(
                validator_node,
                Arc::new(validator_keypair),
                &validator_ledger_path,
                &voting_pubkey,
                authorized_voter_keypairs,
                vec![leader_node.info],
                &validator_config,
                true, // should_check_duplicate_instance
                None, // rpc_to_plugin_manager_receiver
                start_progress.clone(),
                SocketAddrSpace::Unspecified,
                ValidatorTpuConfig::new_for_tests(),
                post_init.clone(),
                None,
            )
            .expect("assume successful validator start");
            assert_eq!(
                *start_progress.read().unwrap(),
                ValidatorStartProgress::Running
            );
            let post_init = post_init.read().unwrap();

            // The validator start-up must have filled in the post-init data.
            assert!(post_init.is_some());
            let post_init = post_init.as_ref().unwrap();
            let notifies = post_init.notifies.read().unwrap();
            // Compare as a set: only which updater keys are registered matters,
            // not their order.
            let updater_keys: HashSet<KeyUpdaterType> =
                notifies.into_iter().map(|(key, _)| key.clone()).collect();
            assert_eq!(
                updater_keys,
                HashSet::from_iter(vec![
                    KeyUpdaterType::Tpu,
                    KeyUpdaterType::TpuForwards,
                    KeyUpdaterType::TpuVote,
                    KeyUpdaterType::Forward,
                    KeyUpdaterType::RpcService,
                    KeyUpdaterType::Bls,
                    KeyUpdaterType::BlsConnectionCache,
                ])
            );
            // Register the admin RPC implementation on a fresh handler.
            let mut io = MetaIoHandler::default();
            io.extend_with(AdminRpcImpl.to_delegate());
            Self {
                meta,
                io,
                validator_ledger_path,
            }
        }

        /// Dispatches a raw JSON-RPC request string synchronously, using a clone
        /// of the fixture's metadata, and returns the handler's optional response.
        fn handle_request(&self, request: &str) -> Option<String> {
            self.io.handle_request_sync(request, self.meta.clone())
        }
    }
1682
1683    impl Drop for TestValidatorWithAdminRpc {
1684        fn drop(&mut self) {
1685            remove_dir_all(self.validator_ledger_path.clone()).unwrap();
1686        }
1687    }
1688
1689    #[test]
1690    fn test_no_post_init_no_snapshot_controller() {
1691        let validator_exit = create_validator_exit(Arc::new(AtomicBool::new(false)));
1692        let voting_keypair = Arc::new(Keypair::new());
1693        let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1694        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1695
1696        let post_init = Arc::new(RwLock::new(None));
1697        let meta = AdminRpcRequestMetadata {
1698            rpc_addr: None,
1699            start_time: SystemTime::now(),
1700            start_progress: start_progress.clone(),
1701            validator_exit,
1702            validator_exit_backpressure: HashMap::default(),
1703            authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1704            tower_storage: Arc::new(NullTowerStorage {}),
1705            post_init: post_init.clone(),
1706            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1707            rpc_to_plugin_manager_sender: None,
1708        };
1709
1710        let snapshot_controller = meta.snapshot_controller();
1711        assert!(snapshot_controller.is_none());
1712    }
1713
1714    // This test checks that `set_identity` call works with working validator and client.
1715    #[test]
1716    fn test_set_identity_with_validator() {
1717        let test_validator = TestValidatorWithAdminRpc::new();
1718        let expected_validator_id = Keypair::new();
1719        let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1720
1721        let set_id_request = format!(
1722            r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1723        );
1724        let response = test_validator.handle_request(&set_id_request);
1725        let actual_parsed_response: Value =
1726            serde_json::from_str(&response.expect("actual response"))
1727                .expect("actual response deserialization");
1728
1729        let expected_parsed_response: Value = serde_json::from_str(
1730            r#"{
1731                "id": 1,
1732                "jsonrpc": "2.0",
1733                "result": null
1734            }"#,
1735        )
1736        .expect("Failed to parse expected response");
1737        assert_eq!(actual_parsed_response, expected_parsed_response);
1738
1739        let contact_info_request =
1740            r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1741        let response = test_validator.handle_request(&contact_info_request);
1742        let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1743            .expect("actual response deserialization");
1744        let actual_validator_id = parsed_response["result"]["id"]
1745            .as_str()
1746            .expect("Expected a string");
1747        assert_eq!(
1748            actual_validator_id,
1749            expected_validator_id.pubkey().to_string()
1750        );
1751
1752        let contact_info_request =
1753            r#"{"jsonrpc":"2.0","id":1,"method":"exit","params":[]}"#.to_string();
1754        let exit_response = test_validator.handle_request(&contact_info_request);
1755        let actual_parsed_response: Value =
1756            serde_json::from_str(&exit_response.expect("actual response"))
1757                .expect("actual response deserialization");
1758        assert_eq!(actual_parsed_response, expected_parsed_response);
1759    }
1760
1761    #[test]
1762    fn test_is_generating_snapshots() {
1763        // Test with snapshots enabled
1764        let rpc = RpcHandler::start_with_config(TestConfig::default());
1765        let RpcHandler { io, meta, .. } = rpc;
1766
1767        let request = r#"{"jsonrpc":"2.0","id":1,"method":"isGeneratingSnapshots","params":[]}"#;
1768        let response = io.handle_request_sync(request, meta.clone());
1769        let result: Value = serde_json::from_str(&response.expect("actual response"))
1770            .expect("actual response deserialization");
1771
1772        // Should return a boolean result indicating if snapshots are being generated
1773        assert!(result["result"].is_boolean());
1774        // Verify that snapshots are being generated since the test setup includes a snapshot controller
1775        assert!(result["result"].as_bool().unwrap());
1776    }
1777
1778    #[test]
1779    fn test_is_generating_snapshots_no_controller() {
1780        // Test with snapshots enabled
1781        let rpc = RpcHandler::start_with_config(TestConfig::default());
1782        let RpcHandler { io, .. } = rpc;
1783
1784        // Test with no post_init (snapshot_controller unavailable)
1785        let request = r#"{"jsonrpc":"2.0","id":1,"method":"isGeneratingSnapshots","params":[]}"#;
1786        let validator_exit = create_validator_exit(Arc::new(AtomicBool::new(false)));
1787        let authorized_voter_keypairs = Arc::new(RwLock::new(vec![Arc::new(Keypair::new())]));
1788        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1789
1790        let meta_no_post_init = AdminRpcRequestMetadata {
1791            rpc_addr: None,
1792            start_time: SystemTime::now(),
1793            start_progress,
1794            validator_exit,
1795            validator_exit_backpressure: HashMap::default(),
1796            authorized_voter_keypairs,
1797            tower_storage: Arc::new(NullTowerStorage {}),
1798            post_init: Arc::new(RwLock::new(None)),
1799            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1800            rpc_to_plugin_manager_sender: None,
1801        };
1802
1803        let response = io.handle_request_sync(request, meta_no_post_init);
1804        let result: Value = serde_json::from_str(&response.expect("actual response"))
1805            .expect("actual response deserialization");
1806
1807        // Should return an error when snapshot_controller is unavailable
1808        assert!(result["error"].is_object());
1809        assert_eq!(
1810            result["error"]["message"].as_str().unwrap(),
1811            "snapshot_controller unavailable"
1812        );
1813    }
1814}