Skip to main content

agave_validator/
admin_rpc_service.rs

1use {
2    crossbeam_channel::Sender,
3    jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
4    jsonrpc_core_client::{RpcError, transports::ipc},
5    jsonrpc_derive::rpc,
6    jsonrpc_ipc_server::{
7        RequestContext, ServerBuilder, tokio::sync::oneshot::channel as oneshot_channel,
8    },
9    log::*,
10    serde::{Deserialize, Serialize, de::Deserializer},
11    solana_accounts_db::accounts_index::AccountIndex,
12    solana_clock::Slot,
13    solana_core::{
14        admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
15        banking_stage::{
16            BankingControlMsg, BankingStage,
17            transaction_scheduler::scheduler_controller::SchedulerConfig,
18        },
19        consensus::{Tower, tower_storage::TowerStorage},
20        repair::repair_service,
21        validator::{
22            BlockProductionMethod, SchedulerPacing, TransactionStructure, ValidatorStartProgress,
23        },
24    },
25    solana_geyser_plugin_manager::GeyserPluginManagerRequest,
26    solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
27    solana_keypair::{Keypair, read_keypair_file},
28    solana_metrics::{datapoint_info, datapoint_warn},
29    solana_pubkey::Pubkey,
30    solana_rpc::rpc::verify_pubkey,
31    solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
32    solana_runtime::snapshot_controller::SnapshotController,
33    solana_signer::Signer,
34    solana_validator_exit::Exit,
35    std::{
36        collections::{HashMap, HashSet},
37        env, error,
38        fmt::{self, Display},
39        net::{IpAddr, SocketAddr},
40        num::NonZeroUsize,
41        path::{Path, PathBuf},
42        sync::{
43            Arc, RwLock,
44            atomic::{AtomicBool, Ordering},
45        },
46        thread::{self, Builder},
47        time::{Duration, Instant, SystemTime},
48    },
49    tokio::runtime::Runtime,
50};
51
52#[derive(Clone)]
53pub struct AdminRpcRequestMetadata {
54    pub rpc_addr: Option<SocketAddr>,
55    pub start_time: SystemTime,
56    pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
57    pub validator_exit: Arc<RwLock<Exit>>,
58    pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
59    pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
60    pub tower_storage: Arc<dyn TowerStorage>,
61    pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
62    pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
63    pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
64}
65
66impl Metadata for AdminRpcRequestMetadata {}
67
68impl AdminRpcRequestMetadata {
69    fn with_post_init<F, R>(&self, func: F) -> Result<R>
70    where
71        F: FnOnce(&AdminRpcRequestMetadataPostInit) -> Result<R>,
72    {
73        if let Some(post_init) = self.post_init.read().unwrap().as_ref() {
74            func(post_init)
75        } else {
76            Err(jsonrpc_core::error::Error::invalid_params(
77                "Retry once validator start up is complete",
78            ))
79        }
80    }
81
82    fn snapshot_controller(&self) -> Option<Arc<SnapshotController>> {
83        self.with_post_init(|post_init| Ok(post_init.snapshot_controller.clone()))
84            .map_err(|_| {
85                // The error from with_post_init is not relevant, as it is meant for RPC callers
86                warn!("snapshot_controller unavailable, shutting down without taking snapshot");
87            })
88            .ok()
89    }
90}
91
92#[derive(Debug, Deserialize, Serialize)]
93pub struct AdminRpcContactInfo {
94    pub id: String,
95    pub gossip: SocketAddr,
96    pub tvu: SocketAddr,
97    pub serve_repair_quic: SocketAddr,
98    pub tpu: SocketAddr,
99    pub tpu_forwards: SocketAddr,
100    pub tpu_vote: SocketAddr,
101    pub rpc: SocketAddr,
102    pub rpc_pubsub: SocketAddr,
103    pub serve_repair: SocketAddr,
104    pub last_updated_timestamp: u64,
105    pub shred_version: u16,
106}
107
108#[derive(Debug, Deserialize, Serialize)]
109pub struct AdminRpcRepairWhitelist {
110    pub whitelist: Vec<Pubkey>,
111}
112
113impl From<ContactInfo> for AdminRpcContactInfo {
114    fn from(node: ContactInfo) -> Self {
115        macro_rules! unwrap_socket {
116            ($name:ident) => {
117                node.$name().unwrap_or(SOCKET_ADDR_UNSPECIFIED)
118            };
119            ($name:ident, $protocol:expr) => {
120                node.$name($protocol).unwrap_or(SOCKET_ADDR_UNSPECIFIED)
121            };
122        }
123        Self {
124            id: node.pubkey().to_string(),
125            last_updated_timestamp: node.wallclock(),
126            gossip: unwrap_socket!(gossip),
127            tvu: unwrap_socket!(tvu, Protocol::UDP),
128            serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
129            tpu: unwrap_socket!(tpu, Protocol::UDP),
130            tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
131            tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
132            rpc: unwrap_socket!(rpc),
133            rpc_pubsub: unwrap_socket!(rpc_pubsub),
134            serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
135            shred_version: node.shred_version(),
136        }
137    }
138}
139
140impl Display for AdminRpcContactInfo {
141    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142        writeln!(f, "Identity: {}", self.id)?;
143        writeln!(f, "Gossip: {}", self.gossip)?;
144        writeln!(f, "TVU: {}", self.tvu)?;
145        writeln!(f, "TPU: {}", self.tpu)?;
146        writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;
147        writeln!(f, "TPU Votes: {}", self.tpu_vote)?;
148        writeln!(f, "RPC: {}", self.rpc)?;
149        writeln!(f, "RPC Pubsub: {}", self.rpc_pubsub)?;
150        writeln!(f, "Serve Repair: {}", self.serve_repair)?;
151        writeln!(f, "Last Updated Timestamp: {}", self.last_updated_timestamp)?;
152        writeln!(f, "Shred Version: {}", self.shred_version)
153    }
154}
155impl solana_cli_output::VerboseDisplay for AdminRpcContactInfo {}
156impl solana_cli_output::QuietDisplay for AdminRpcContactInfo {}
157
158impl Display for AdminRpcRepairWhitelist {
159    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160        writeln!(f, "Repair whitelist: {:?}", &self.whitelist)
161    }
162}
163impl solana_cli_output::VerboseDisplay for AdminRpcRepairWhitelist {}
164impl solana_cli_output::QuietDisplay for AdminRpcRepairWhitelist {}
165
166#[rpc]
167pub trait AdminRpc {
168    type Metadata;
169
170    /// Initiates validator exit; exit is asynchronous so the validator
171    /// will almost certainly still be running when this method returns
172    #[rpc(meta, name = "exit")]
173    fn exit(&self, meta: Self::Metadata) -> Result<()>;
174
175    /// Return the process id (pid)
176    #[rpc(meta, name = "pid")]
177    fn pid(&self, meta: Self::Metadata) -> Result<u32>;
178
179    #[rpc(meta, name = "reloadPlugin")]
180    fn reload_plugin(
181        &self,
182        meta: Self::Metadata,
183        name: String,
184        config_file: String,
185    ) -> BoxFuture<Result<()>>;
186
187    #[rpc(meta, name = "unloadPlugin")]
188    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
189
190    #[rpc(meta, name = "loadPlugin")]
191    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
192
193    #[rpc(meta, name = "listPlugins")]
194    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
195
196    #[rpc(meta, name = "rpcAddress")]
197    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
198
199    #[rpc(name = "setLogFilter")]
200    fn set_log_filter(&self, filter: String) -> Result<()>;
201
202    #[rpc(meta, name = "startTime")]
203    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;
204
205    #[rpc(meta, name = "startProgress")]
206    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;
207
208    #[rpc(meta, name = "addAuthorizedVoter")]
209    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;
210
211    #[rpc(meta, name = "addAuthorizedVoterFromBytes")]
212    fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec<u8>)
213    -> Result<()>;
214
215    #[rpc(meta, name = "removeAllAuthorizedVoters")]
216    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;
217
218    #[rpc(meta, name = "setIdentity")]
219    fn set_identity(
220        &self,
221        meta: Self::Metadata,
222        keypair_file: String,
223        require_tower: bool,
224    ) -> Result<()>;
225
226    #[rpc(meta, name = "setIdentityFromBytes")]
227    fn set_identity_from_bytes(
228        &self,
229        meta: Self::Metadata,
230        identity_keypair: Vec<u8>,
231        require_tower: bool,
232    ) -> Result<()>;
233
234    #[rpc(meta, name = "setStakedNodesOverrides")]
235    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;
236
237    #[rpc(meta, name = "contactInfo")]
238    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;
239
240    #[rpc(meta, name = "selectActiveInterface")]
241    fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()>;
242
243    #[rpc(meta, name = "repairShredFromPeer")]
244    fn repair_shred_from_peer(
245        &self,
246        meta: Self::Metadata,
247        pubkey: Option<Pubkey>,
248        slot: u64,
249        shred_index: u64,
250    ) -> Result<()>;
251
252    #[rpc(meta, name = "repairWhitelist")]
253    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;
254
255    #[rpc(meta, name = "setRepairWhitelist")]
256    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()>;
257
258    #[rpc(meta, name = "getSecondaryIndexKeySize")]
259    fn get_secondary_index_key_size(
260        &self,
261        meta: Self::Metadata,
262        pubkey_str: String,
263    ) -> Result<HashMap<RpcAccountIndex, usize>>;
264
265    #[rpc(meta, name = "setPublicTpuAddress")]
266    fn set_public_tpu_address(
267        &self,
268        meta: Self::Metadata,
269        public_tpu_addr: SocketAddr,
270    ) -> Result<()>;
271
272    #[rpc(meta, name = "setPublicTpuForwardsAddress")]
273    fn set_public_tpu_forwards_address(
274        &self,
275        meta: Self::Metadata,
276        public_tpu_forwards_addr: SocketAddr,
277    ) -> Result<()>;
278
279    #[rpc(meta, name = "setPublicTvuAddress")]
280    fn set_public_tvu_address(
281        &self,
282        meta: Self::Metadata,
283        public_tvu_addr: SocketAddr,
284    ) -> Result<()>;
285
286    #[rpc(meta, name = "manageBlockProduction")]
287    fn manage_block_production(
288        &self,
289        meta: Self::Metadata,
290        block_production_method: BlockProductionMethod,
291        transaction_struct: TransactionStructure,
292        num_workers: NonZeroUsize,
293        scheduler_pacing: SchedulerPacing,
294    ) -> Result<()>;
295
296    #[rpc(meta, name = "isGeneratingSnapshots")]
297    fn is_generating_snapshots(&self, meta: Self::Metadata) -> Result<bool>;
298
299    #[rpc(meta, name = "blockstorePurge")]
300    fn blockstore_purge(&self, meta: Self::Metadata, maximum_purge_slot: Slot) -> Result<()>;
301}
302
/// Stateless implementor of [`AdminRpc`]; everything a handler needs arrives
/// through the request metadata.
pub struct AdminRpcImpl;
304impl AdminRpc for AdminRpcImpl {
305    type Metadata = AdminRpcRequestMetadata;
306
307    fn exit(&self, meta: Self::Metadata) -> Result<()> {
308        debug!("exit admin rpc request received");
309
310        thread::Builder::new()
311            .name("solProcessExit".into())
312            .spawn(move || {
313                let start_time = Instant::now();
314
315                // Trigger a fastboot snapshot before exiting
316                if let Some(snapshot_controller) = meta.snapshot_controller() {
317                    let latest_snapshot_slot = snapshot_controller.latest_bank_snapshot_slot();
318
319                    info!("Requesting fastboot snapshot before exit");
320                    snapshot_controller.request_fastboot_snapshot();
321
322                    // Wait up to 5s for a snapshot to finish. This should allow time for the
323                    // fastboot snapshot to complete without stalling exit indefinitely.
324                    // The timeout will be hit in the event new roots are not being created.
325                    let timeout = Duration::from_secs(5);
326                    while snapshot_controller.latest_bank_snapshot_slot() == latest_snapshot_slot {
327                        if start_time.elapsed() > timeout {
328                            warn!("Timeout waiting for snapshot to complete");
329                            datapoint_warn!(
330                                "admin-rpc-snapshot-timeout",
331                                ("timeout_us", start_time.elapsed().as_micros(), i64)
332                            );
333                            break;
334                        }
335                        thread::sleep(Duration::from_millis(100));
336                    }
337                    info!(
338                        "Requesting fastboot snapshot before exit... Done in {:?}",
339                        start_time.elapsed()
340                    );
341                }
342
343                // Delay exit signal until this RPC request completes, otherwise the caller of `exit` might
344                // receive a confusing error as the validator shuts down before a response is sent back.
345                // If elapsed time has already taken 100ms, there is no need for further delay
346                if start_time.elapsed().as_millis() < 100 {
347                    thread::sleep(Duration::from_millis(100));
348                }
349
350                info!("validator exit requested");
351                meta.validator_exit.write().unwrap().exit();
352
353                if !meta.validator_exit_backpressure.is_empty() {
354                    let service_names = meta.validator_exit_backpressure.keys();
355                    info!("Wait for these services to complete: {service_names:?}");
356                    loop {
357                        // The initial sleep is a grace period to allow the services to raise their
358                        // backpressure flags.
359                        // Subsequent sleeps are to throttle how often we check and log.
360                        thread::sleep(Duration::from_secs(1));
361
362                        let mut any_flags_raised = false;
363                        for (name, flag) in meta.validator_exit_backpressure.iter() {
364                            let is_flag_raised = flag.load(Ordering::Relaxed);
365                            if is_flag_raised {
366                                info!("{name}'s exit backpressure flag is raised");
367                                any_flags_raised = true;
368                            }
369                        }
370                        if !any_flags_raised {
371                            break;
372                        }
373                    }
374                    info!("All services have completed");
375                }
376
377                // TODO: Debug why Exit doesn't always cause the validator to fully exit
378                // (rocksdb background processing or some other stuck thread perhaps?).
379                //
380                // If the process is still alive after five seconds, exit harder
381                thread::sleep(Duration::from_secs(
382                    env::var("SOLANA_VALIDATOR_EXIT_TIMEOUT")
383                        .ok()
384                        .and_then(|x| x.parse().ok())
385                        .unwrap_or(5),
386                ));
387                warn!("validator exit timeout");
388                std::process::exit(0);
389            })
390            .unwrap();
391
392        Ok(())
393    }
394
395    fn pid(&self, _meta: Self::Metadata) -> Result<u32> {
396        Ok(std::process::id())
397    }
398
399    fn reload_plugin(
400        &self,
401        meta: Self::Metadata,
402        name: String,
403        config_file: String,
404    ) -> BoxFuture<Result<()>> {
405        Box::pin(async move {
406            // Construct channel for plugin to respond to this particular rpc request instance
407            let (response_sender, response_receiver) = oneshot_channel();
408
409            // Send request to plugin manager if there is a geyser service
410            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
411                rpc_to_manager_sender
412                    .send(GeyserPluginManagerRequest::ReloadPlugin {
413                        name,
414                        config_file,
415                        response_sender,
416                    })
417                    .expect("GeyerPluginService should never drop request receiver");
418            } else {
419                return Err(jsonrpc_core::Error {
420                    code: ErrorCode::InvalidRequest,
421                    message: "No geyser plugin service".to_string(),
422                    data: None,
423                });
424            }
425
426            // Await response from plugin manager
427            response_receiver
428                .await
429                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
430        })
431    }
432
433    fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
434        Box::pin(async move {
435            // Construct channel for plugin to respond to this particular rpc request instance
436            let (response_sender, response_receiver) = oneshot_channel();
437
438            // Send request to plugin manager if there is a geyser service
439            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
440                rpc_to_manager_sender
441                    .send(GeyserPluginManagerRequest::LoadPlugin {
442                        config_file,
443                        response_sender,
444                    })
445                    .expect("GeyerPluginService should never drop request receiver");
446            } else {
447                return Err(jsonrpc_core::Error {
448                    code: ErrorCode::InvalidRequest,
449                    message: "No geyser plugin service".to_string(),
450                    data: None,
451                });
452            }
453
454            // Await response from plugin manager
455            response_receiver
456                .await
457                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
458        })
459    }
460
461    fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
462        Box::pin(async move {
463            // Construct channel for plugin to respond to this particular rpc request instance
464            let (response_sender, response_receiver) = oneshot_channel();
465
466            // Send request to plugin manager if there is a geyser service
467            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
468                rpc_to_manager_sender
469                    .send(GeyserPluginManagerRequest::UnloadPlugin {
470                        name,
471                        response_sender,
472                    })
473                    .expect("GeyerPluginService should never drop request receiver");
474            } else {
475                return Err(jsonrpc_core::Error {
476                    code: ErrorCode::InvalidRequest,
477                    message: "No geyser plugin service".to_string(),
478                    data: None,
479                });
480            }
481
482            // Await response from plugin manager
483            response_receiver
484                .await
485                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
486        })
487    }
488
489    fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
490        Box::pin(async move {
491            // Construct channel for plugin to respond to this particular rpc request instance
492            let (response_sender, response_receiver) = oneshot_channel();
493
494            // Send request to plugin manager
495            if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
496                rpc_to_manager_sender
497                    .send(GeyserPluginManagerRequest::ListPlugins { response_sender })
498                    .expect("GeyerPluginService should never drop request receiver");
499            } else {
500                return Err(jsonrpc_core::Error {
501                    code: ErrorCode::InvalidRequest,
502                    message: "No geyser plugin service".to_string(),
503                    data: None,
504                });
505            }
506
507            // Await response from plugin manager
508            response_receiver
509                .await
510                .expect("GeyerPluginService's oneshot sender shouldn't drop early")
511        })
512    }
513
514    fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
515        debug!("rpc_addr admin rpc request received");
516        Ok(meta.rpc_addr)
517    }
518
519    fn set_log_filter(&self, filter: String) -> Result<()> {
520        debug!("set_log_filter admin rpc request received");
521        agave_logger::setup_with(&filter);
522        Ok(())
523    }
524
525    fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
526        debug!("start_time admin rpc request received");
527        Ok(meta.start_time)
528    }
529
530    fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
531        debug!("start_progress admin rpc request received");
532        Ok(*meta.start_progress.read().unwrap())
533    }
534
535    fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
536        debug!("add_authorized_voter request received");
537
538        let authorized_voter = read_keypair_file(keypair_file)
539            .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{err}")))?;
540
541        AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
542    }
543
544    fn add_authorized_voter_from_bytes(
545        &self,
546        meta: Self::Metadata,
547        keypair: Vec<u8>,
548    ) -> Result<()> {
549        debug!("add_authorized_voter_from_bytes request received");
550
551        let authorized_voter = Keypair::try_from(keypair.as_ref()).map_err(|err| {
552            jsonrpc_core::error::Error::invalid_params(format!(
553                "Failed to read authorized voter keypair from provided byte array: {err}"
554            ))
555        })?;
556
557        AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
558    }
559
560    fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
561        debug!("remove_all_authorized_voters received");
562        meta.authorized_voter_keypairs.write().unwrap().clear();
563        Ok(())
564    }
565
566    fn set_identity(
567        &self,
568        meta: Self::Metadata,
569        keypair_file: String,
570        require_tower: bool,
571    ) -> Result<()> {
572        debug!("set_identity request received");
573
574        let identity_keypair = read_keypair_file(&keypair_file).map_err(|err| {
575            jsonrpc_core::error::Error::invalid_params(format!(
576                "Failed to read identity keypair from {keypair_file}: {err}"
577            ))
578        })?;
579
580        AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
581    }
582
583    fn set_identity_from_bytes(
584        &self,
585        meta: Self::Metadata,
586        identity_keypair: Vec<u8>,
587        require_tower: bool,
588    ) -> Result<()> {
589        debug!("set_identity_from_bytes request received");
590
591        let identity_keypair = Keypair::try_from(identity_keypair.as_ref()).map_err(|err| {
592            jsonrpc_core::error::Error::invalid_params(format!(
593                "Failed to read identity keypair from provided byte array: {err}"
594            ))
595        })?;
596
597        AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
598    }
599
600    fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
601        let loaded_config = load_staked_nodes_overrides(&path)
602            .map_err(|err| {
603                error!(
604                    "Failed to load staked nodes overrides from {}: {}",
605                    &path, err
606                );
607                jsonrpc_core::error::Error::internal_error()
608            })?
609            .staked_map_id;
610        let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
611        write_staked_nodes.clear();
612        write_staked_nodes.extend(loaded_config);
613        info!("Staked nodes overrides loaded from {path}");
614        debug!("overrides map: {write_staked_nodes:?}");
615        Ok(())
616    }
617
618    fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
619        meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
620    }
621
622    fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()> {
623        debug!("select_active_interface received: {interface}");
624        meta.with_post_init(|post_init| {
625            let node = post_init.node.as_ref().ok_or_else(|| {
626                jsonrpc_core::Error::invalid_params("`Node` not initialized in post_init")
627            })?;
628
629            node.switch_active_interface(interface, &post_init.cluster_info)
630                .map_err(|e| {
631                    jsonrpc_core::Error::invalid_params(format!(
632                        "Switching failed due to error {e}"
633                    ))
634                })?;
635            info!("Switched primary interface to {interface}");
636            Ok(())
637        })
638    }
639
640    fn repair_shred_from_peer(
641        &self,
642        meta: Self::Metadata,
643        pubkey: Option<Pubkey>,
644        slot: u64,
645        shred_index: u64,
646    ) -> Result<()> {
647        debug!("repair_shred_from_peer request received");
648
649        meta.with_post_init(|post_init| {
650            repair_service::RepairService::request_repair_for_shred_from_peer(
651                post_init.cluster_info.clone(),
652                post_init.cluster_slots.clone(),
653                pubkey,
654                slot,
655                shred_index,
656                &post_init.repair_socket,
657                post_init.outstanding_repair_requests.clone(),
658            );
659            Ok(())
660        })
661    }
662
663    fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
664        debug!("repair_whitelist request received");
665
666        meta.with_post_init(|post_init| {
667            let whitelist: Vec<_> = post_init
668                .repair_whitelist
669                .read()
670                .unwrap()
671                .iter()
672                .copied()
673                .collect();
674            Ok(AdminRpcRepairWhitelist { whitelist })
675        })
676    }
677
678    fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()> {
679        debug!("set_repair_whitelist request received");
680
681        let whitelist: HashSet<Pubkey> = whitelist.into_iter().collect();
682        meta.with_post_init(|post_init| {
683            *post_init.repair_whitelist.write().unwrap() = whitelist;
684            warn!(
685                "Repair whitelist set to {:?}",
686                &post_init.repair_whitelist.read().unwrap()
687            );
688            Ok(())
689        })
690    }
691
692    fn get_secondary_index_key_size(
693        &self,
694        meta: Self::Metadata,
695        pubkey_str: String,
696    ) -> Result<HashMap<RpcAccountIndex, usize>> {
697        debug!("get_secondary_index_key_size rpc request received: {pubkey_str:?}");
698        let index_key = verify_pubkey(&pubkey_str)?;
699        meta.with_post_init(|post_init| {
700            let bank = post_init.bank_forks.read().unwrap().root_bank();
701
702            // Take ref to enabled AccountSecondaryIndexes
703            let enabled_account_indexes = &bank.accounts().accounts_db.account_indexes;
704
705            // Exit if secondary indexes are not enabled
706            if enabled_account_indexes.is_empty() {
707                debug!("get_secondary_index_key_size: secondary index not enabled.");
708                return Ok(HashMap::new());
709            };
710
711            // Make sure the requested key is not explicitly excluded
712            if !enabled_account_indexes.include_key(&index_key) {
713                return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
714                    index_key: index_key.to_string(),
715                }
716                .into());
717            }
718
719            // Grab a ref to the AccountsDbfor this Bank
720            let accounts_index = &bank.accounts().accounts_db.accounts_index;
721
722            // Find the size of the key in every index where it exists
723            let found_sizes = enabled_account_indexes
724                .indexes
725                .iter()
726                .filter_map(|index| {
727                    accounts_index
728                        .get_index_key_size(index, &index_key)
729                        .map(|size| (rpc_account_index_from_account_index(index), size))
730                })
731                .collect::<HashMap<_, _>>();
732
733            // Note: Will return an empty HashMap if no keys are found.
734            if found_sizes.is_empty() {
735                debug!("get_secondary_index_key_size: key not found in the secondary index.");
736            }
737            Ok(found_sizes)
738        })
739    }
740
741    fn set_public_tpu_address(
742        &self,
743        meta: Self::Metadata,
744        public_tpu_addr: SocketAddr,
745    ) -> Result<()> {
746        debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
747
748        meta.with_post_init(|post_init| {
749            post_init
750                .cluster_info
751                .my_contact_info()
752                .tpu(Protocol::QUIC)
753                .ok_or_else(|| {
754                    error!(
755                        "The public TPU QUIC address isn't being published. The node is likely in \
756                         repair mode. See help for --restricted-repair-only-mode for more \
757                         information."
758                    );
759                    jsonrpc_core::error::Error::internal_error()
760                })?;
761            post_init
762                .cluster_info
763                .set_tpu_quic(public_tpu_addr)
764                .map_err(|err| {
765                    error!("Failed to set public TPU QUIC address to {public_tpu_addr}: {err}");
766                    jsonrpc_core::error::Error::internal_error()
767                })?;
768            let my_contact_info = post_init.cluster_info.my_contact_info();
769            warn!(
770                "Public TPU addresses set to {:?} (quic)",
771                my_contact_info.tpu(Protocol::QUIC),
772            );
773            Ok(())
774        })
775    }
776
777    fn set_public_tpu_forwards_address(
778        &self,
779        meta: Self::Metadata,
780        public_tpu_forwards_addr: SocketAddr,
781    ) -> Result<()> {
782        debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
783
784        meta.with_post_init(|post_init| {
785            post_init
786                .cluster_info
787                .my_contact_info()
788                .tpu_forwards(Protocol::QUIC)
789                .ok_or_else(|| {
790                    error!(
791                        "The public TPU Forwards address isn't being published. The node is \
792                         likely in repair mode. See help for --restricted-repair-only-mode for \
793                         more information."
794                    );
795                    jsonrpc_core::error::Error::internal_error()
796                })?;
797            post_init
798                .cluster_info
799                .set_tpu_forwards_quic(public_tpu_forwards_addr)
800                .map_err(|err| {
801                    error!(
802                        "Failed to set public TPU QUIC address to {public_tpu_forwards_addr}: \
803                         {err}"
804                    );
805                    jsonrpc_core::error::Error::internal_error()
806                })?;
807            let my_contact_info = post_init.cluster_info.my_contact_info();
808            warn!(
809                "Public TPU Forwards address set to {:?} (quic)",
810                my_contact_info.tpu_forwards(Protocol::QUIC),
811            );
812            Ok(())
813        })
814    }
815
816    fn set_public_tvu_address(
817        &self,
818        meta: Self::Metadata,
819        public_tvu_addr: SocketAddr,
820    ) -> Result<()> {
821        debug!("set_public_tvu_address rpc request received: {public_tvu_addr}");
822
823        meta.with_post_init(|post_init| {
824            post_init
825                .cluster_info
826                .my_contact_info()
827                .tvu(Protocol::UDP)
828                .ok_or_else(|| {
829                    error!(
830                        "The public TVU address isn't being published. The node is likely in \
831                         repair mode. See help for --restricted-repair-only-mode for more \
832                         information."
833                    );
834                    jsonrpc_core::error::Error::internal_error()
835                })?;
836            post_init
837                .cluster_info
838                .set_tvu_socket(public_tvu_addr)
839                .map_err(|err| {
840                    error!("Failed to set public TVU address to {public_tvu_addr}: {err}");
841                    jsonrpc_core::error::Error::internal_error()
842                })?;
843            let my_contact_info = post_init.cluster_info.my_contact_info();
844            warn!(
845                "Public TVU addresses set to {:?}",
846                my_contact_info.tvu(Protocol::UDP),
847            );
848            Ok(())
849        })
850    }
851
852    fn manage_block_production(
853        &self,
854        meta: Self::Metadata,
855        block_production_method: BlockProductionMethod,
856        transaction_struct: TransactionStructure,
857        num_workers: NonZeroUsize,
858        scheduler_pacing: SchedulerPacing,
859    ) -> Result<()> {
860        debug!("manage_block_production rpc request received");
861
862        if num_workers > BankingStage::max_num_workers() {
863            return Err(jsonrpc_core::error::Error::invalid_params(format!(
864                "Number of workers ({}) exceeds maximum allowed ({})",
865                num_workers,
866                BankingStage::max_num_workers()
867            )));
868        }
869
870        if transaction_struct != TransactionStructure::View {
871            warn!("TransactionStructure::Sdk has no effect on block production");
872        }
873
874        meta.with_post_init(|post_init| {
875            if post_init
876                .banking_control_sender
877                .try_send(BankingControlMsg::Internal {
878                    block_production_method,
879                    num_workers,
880                    config: SchedulerConfig { scheduler_pacing },
881                })
882                .is_err()
883            {
884                error!("Banking stage already switching schedulers");
885
886                return Err(jsonrpc_core::error::Error::internal_error());
887            }
888
889            Ok(())
890        })
891    }
892
893    fn is_generating_snapshots(&self, meta: Self::Metadata) -> Result<bool> {
894        if let Some(snapshot_controller) = meta.snapshot_controller() {
895            Ok(snapshot_controller.is_generating_snapshots())
896        } else {
897            Err(jsonrpc_core::error::Error::invalid_params(
898                "snapshot_controller unavailable",
899            ))
900        }
901    }
902
903    fn blockstore_purge(&self, meta: Self::Metadata, maximum_purge_slot: Slot) -> Result<()> {
904        meta.with_post_init(|post_init| {
905            post_init
906                .blockstore
907                .send_manual_purge_request(maximum_purge_slot)
908                .map_err(|err| jsonrpc_core::Error {
909                    code: ErrorCode::InvalidRequest,
910                    message: format!("{err}"),
911                    data: None,
912                })
913        })
914    }
915}
916
917impl AdminRpcImpl {
918    fn add_authorized_voter_keypair(
919        meta: AdminRpcRequestMetadata,
920        authorized_voter: Keypair,
921    ) -> Result<()> {
922        let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
923
924        if authorized_voter_keypairs
925            .iter()
926            .any(|x| x.pubkey() == authorized_voter.pubkey())
927        {
928            Err(jsonrpc_core::error::Error::invalid_params(
929                "Authorized voter already present",
930            ))
931        } else {
932            authorized_voter_keypairs.push(Arc::new(authorized_voter));
933            Ok(())
934        }
935    }
936
937    fn set_identity_keypair(
938        meta: AdminRpcRequestMetadata,
939        identity_keypair: Keypair,
940        require_tower: bool,
941    ) -> Result<()> {
942        meta.with_post_init(|post_init| {
943            if require_tower {
944                let _ = Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey())
945                    .map_err(|err| {
946                        jsonrpc_core::error::Error::invalid_params(format!(
947                            "Unable to load tower file for identity {}: {}",
948                            identity_keypair.pubkey(),
949                            err
950                        ))
951                    })?;
952            }
953
954            for (key, notifier) in &*post_init.notifies.read().unwrap() {
955                if let Err(err) = notifier.update_key(&identity_keypair) {
956                    error!("Error updating network layer keypair: {err} on {key:?}");
957                }
958            }
959
960            let old_identity = post_init.cluster_info.id();
961            let new_identity = identity_keypair.pubkey();
962            solana_metrics::set_host_id(new_identity.to_string());
963            // Emit the datapoint after updating metrics to emit the new pubkey
964            datapoint_info!(
965                "validator-set_identity",
966                ("old_id", old_identity.to_string(), String),
967                ("new_id", new_identity.to_string(), String),
968                ("version", solana_version::version!(), String),
969            );
970            post_init
971                .cluster_info
972                .set_keypair(Arc::new(identity_keypair));
973            warn!("Identity set to {new_identity}");
974            Ok(())
975        })
976    }
977}
978
979fn rpc_account_index_from_account_index(account_index: &AccountIndex) -> RpcAccountIndex {
980    match account_index {
981        AccountIndex::ProgramId => RpcAccountIndex::ProgramId,
982        AccountIndex::SplTokenOwner => RpcAccountIndex::SplTokenOwner,
983        AccountIndex::SplTokenMint => RpcAccountIndex::SplTokenMint,
984    }
985}
986
987// Start the Admin RPC interface
988pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
989    let admin_rpc_path = admin_rpc_path(ledger_path);
990
991    let event_loop = tokio::runtime::Builder::new_multi_thread()
992        .thread_name("solAdminRpcEl")
993        .worker_threads(3) // Three still seems like a lot, and better than the default of available core count
994        .enable_all()
995        .build()
996        .unwrap();
997
998    Builder::new()
999        .name("solAdminRpc".to_string())
1000        .spawn(move || {
1001            let mut io = MetaIoHandler::default();
1002            io.extend_with(AdminRpcImpl.to_delegate());
1003
1004            let validator_exit = metadata.validator_exit.clone();
1005            let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
1006                metadata.clone()
1007            })
1008            .event_loop_executor(event_loop.handle().clone())
1009            .start(&format!("{}", admin_rpc_path.display()));
1010
1011            match server {
1012                Err(err) => {
1013                    warn!("Unable to start admin rpc service: {err:?}");
1014                }
1015                Ok(server) => {
1016                    info!("started admin rpc service!");
1017                    let close_handle = server.close_handle();
1018                    validator_exit
1019                        .write()
1020                        .unwrap()
1021                        .register_exit(Box::new(move || {
1022                            close_handle.close();
1023                        }));
1024
1025                    server.wait();
1026                }
1027            }
1028        })
1029        .unwrap();
1030}
1031
/// Returns the IPC endpoint path used by the admin RPC service for
/// `ledger_path`.
///
/// On Windows this is a named pipe derived from the ledger directory's file
/// name; elsewhere it is `admin.rpc` inside the ledger directory.
fn admin_rpc_path(ledger_path: &Path) -> PathBuf {
    #[cfg(target_family = "windows")]
    {
        // More information about the wackiness of pipe names over at
        // https://docs.microsoft.com/en-us/windows/win32/ipc/pipe-names
        match ledger_path.file_name() {
            Some(ledger_filename) => PathBuf::from(format!(
                "\\\\.\\pipe\\{}-admin.rpc",
                ledger_filename.to_string_lossy()
            )),
            None => PathBuf::from("\\\\.\\pipe\\admin.rpc"),
        }
    }
    #[cfg(not(target_family = "windows"))]
    {
        ledger_path.join("admin.rpc")
    }
}
1051
1052// Connect to the Admin RPC interface
1053pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
1054    let admin_rpc_path = admin_rpc_path(ledger_path);
1055    if !admin_rpc_path.exists() {
1056        Err(RpcError::Client(format!(
1057            "{} does not exist",
1058            admin_rpc_path.display()
1059        )))
1060    } else {
1061        ipc::connect::<_, gen_client::Client>(&format!("{}", admin_rpc_path.display())).await
1062    }
1063}
1064
1065// Create a runtime for use by client side admin RPC interface calls
1066pub fn runtime() -> Runtime {
1067    tokio::runtime::Builder::new_multi_thread()
1068        .thread_name("solAdminRpcRt")
1069        .enable_all()
1070        // The agave-validator subcommands make few admin RPC calls and block
1071        // on the results so two workers is plenty
1072        .worker_threads(2)
1073        .build()
1074        .expect("new tokio runtime")
1075}
1076
/// Deserializable configuration holding per-node `u64` values keyed by pubkey.
// NOTE(review): the name suggests these are stake overrides for specific
// nodes — confirm against the consumer of `staked_map_id`.
#[derive(Default, Deserialize, Clone)]
pub struct StakedNodesOverrides {
    /// Pubkey to `u64` map. Keys are written as strings in the serialized
    /// form and converted to `Pubkey` by `deserialize_pubkey_map`.
    #[serde(deserialize_with = "deserialize_pubkey_map")]
    pub staked_map_id: HashMap<Pubkey, u64>,
}
1082
1083pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
1084where
1085    D: Deserializer<'de>,
1086{
1087    let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
1088    let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
1089    for (key, value) in container.iter() {
1090        let typed_key = Pubkey::try_from(key.as_str())
1091            .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
1092        container_typed.insert(typed_key, *value);
1093    }
1094    Ok(container_typed)
1095}
1096
1097pub fn load_staked_nodes_overrides(
1098    path: &String,
1099) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
1100    debug!("Loading staked nodes overrides configuration from {path}");
1101    if Path::new(&path).exists() {
1102        let file = std::fs::File::open(path)?;
1103        Ok(serde_yaml::from_reader(file)?)
1104    } else {
1105        Err(format!("Staked nodes overrides provided '{path}' a non-existing file path.").into())
1106    }
1107}
1108
1109#[cfg(test)]
1110mod tests {
1111    use {
1112        super::*,
1113        agave_snapshots::snapshot_config::SnapshotConfig,
1114        crossbeam_channel::unbounded,
1115        serde_json::Value,
1116        solana_account::{Account, AccountSharedData},
1117        solana_accounts_db::{
1118            accounts_db::{ACCOUNTS_DB_CONFIG_FOR_TESTING, AccountsDbConfig},
1119            accounts_index::AccountSecondaryIndexes,
1120        },
1121        solana_core::{
1122            admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
1123            consensus::tower_storage::NullTowerStorage,
1124            validator::{Validator, ValidatorConfig, ValidatorTpuConfig},
1125        },
1126        solana_gossip::{cluster_info::ClusterInfo, node::Node},
1127        solana_ledger::{
1128            blockstore::Blockstore,
1129            create_new_tmp_ledger,
1130            genesis_utils::{
1131                GenesisConfigInfo, create_genesis_config, create_genesis_config_with_leader,
1132            },
1133            get_tmp_ledger_path_auto_delete,
1134        },
1135        solana_net_utils::{SocketAddrSpace, sockets::bind_to_localhost_unique},
1136        solana_program_option::COption,
1137        solana_program_pack::Pack,
1138        solana_pubkey::Pubkey,
1139        solana_rpc::rpc::create_validator_exit,
1140        solana_runtime::{
1141            bank::{Bank, BankTestConfig},
1142            bank_forks::BankForks,
1143        },
1144        solana_system_interface::program as system_program,
1145        spl_generic_token::token,
1146        spl_token_2022_interface::state::{
1147            Account as TokenAccount, AccountState as TokenAccountState, Mint,
1148        },
1149        std::{collections::HashSet, fs::remove_dir_all, sync::atomic::AtomicBool},
1150        tokio::sync::mpsc,
1151    };
1152
    /// Options for building the `RpcHandler` test fixture.
    #[derive(Default)]
    struct TestConfig {
        // Passed to the test bank as `AccountsDbConfig::account_indexes`.
        account_indexes: AccountSecondaryIndexes,
    }
1157
    /// Test fixture bundling an in-process admin RPC handler with the state
    /// backing it.
    struct RpcHandler {
        // JSON-RPC handler with the `AdminRpcImpl` delegate registered.
        io: MetaIoHandler<AdminRpcRequestMetadata>,
        // Request metadata handed to `io` on every call.
        meta: AdminRpcRequestMetadata,
        // Bank forks shared with `meta`'s post-init state.
        bank_forks: Arc<RwLock<BankForks>>,
    }
1163
    impl RpcHandler {
        /// Builds a fixture with the default `TestConfig`.
        fn _start() -> Self {
            Self::start_with_config(TestConfig::default())
        }

        /// Builds a fixture whose bank uses the account indexes from `config`.
        ///
        /// Creates a fresh identity keypair and cluster info, a test bank,
        /// a snapshot controller, a blockstore in a temporary ledger
        /// directory, and an `AdminRpcRequestMetadata` whose post-init state
        /// is already populated.
        fn start_with_config(config: TestConfig) -> Self {
            let keypair = Arc::new(Keypair::new());
            let cluster_info = Arc::new(ClusterInfo::new(
                ContactInfo::new(
                    keypair.pubkey(),
                    solana_time_utils::timestamp(), // wallclock
                    0u16,                           // shred_version
                ),
                keypair,
                SocketAddrSpace::Unspecified,
            ));
            let exit = Arc::new(AtomicBool::new(false));
            let validator_exit = create_validator_exit(exit);
            // Only the account index selection differs from the testing defaults.
            let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig {
                accounts_db_config: AccountsDbConfig {
                    account_indexes: Some(config.account_indexes),
                    ..ACCOUNTS_DB_CONFIG_FOR_TESTING
                },
            });

            // The receiving half of the snapshot request channel is discarded.
            let (snapshot_request_sender, _) = unbounded();
            let snapshot_controller = Arc::new(SnapshotController::new(
                snapshot_request_sender.clone(),
                SnapshotConfig::default(),
                bank_forks.read().unwrap().root(),
            ));

            let ledger_path = get_tmp_ledger_path_auto_delete!();
            let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());

            let vote_account = vote_keypair.pubkey();
            let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
            let repair_whitelist = Arc::new(RwLock::new(HashSet::new()));
            let meta = AdminRpcRequestMetadata {
                rpc_addr: None,
                start_time: SystemTime::now(),
                start_progress,
                validator_exit,
                validator_exit_backpressure: HashMap::default(),
                authorized_voter_keypairs: Arc::new(RwLock::new(vec![vote_keypair])),
                tower_storage: Arc::new(NullTowerStorage {}),
                // Post-init state is filled in immediately for the tests.
                post_init: Arc::new(RwLock::new(Some(AdminRpcRequestMetadataPostInit {
                    cluster_info,
                    bank_forks: bank_forks.clone(),
                    vote_account,
                    repair_whitelist,
                    notifies: Arc::new(RwLock::new(KeyUpdaters::default())),
                    repair_socket: Arc::new(bind_to_localhost_unique().expect("should bind")),
                    outstanding_repair_requests: Arc::<
                        RwLock<repair_service::OutstandingShredRepairs>,
                    >::default(),
                    cluster_slots: Arc::new(
                        solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default_for_tests(),
                    ),
                    node: None,
                    // Only the sender is kept; the receiver is dropped right away.
                    banking_control_sender: mpsc::channel(1).0,
                    snapshot_controller,
                    blockstore,
                }))),
                staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
                rpc_to_plugin_manager_sender: None,
            };
            let mut io = MetaIoHandler::default();
            io.extend_with(AdminRpcImpl.to_delegate());

            Self {
                io,
                meta,
                bank_forks,
            }
        }

        /// Returns the current root bank from the fixture's bank forks.
        fn root_bank(&self) -> Arc<Bank> {
            self.bank_forks.read().unwrap().root_bank()
        }
    }
1245
1246    fn new_bank_forks_with_config(
1247        config: BankTestConfig,
1248    ) -> (Arc<RwLock<BankForks>>, Arc<Keypair>) {
1249        let GenesisConfigInfo {
1250            genesis_config,
1251            voting_keypair,
1252            ..
1253        } = create_genesis_config(1_000_000_000);
1254
1255        let bank = Bank::new_with_config_for_tests(&genesis_config, config);
1256        (BankForks::new_rw_arc(bank), Arc::new(voting_keypair))
1257    }
1258
1259    #[test]
1260    fn test_secondary_index_key_sizes() {
1261        for secondary_index_enabled in [true, false] {
1262            let account_indexes = if secondary_index_enabled {
1263                AccountSecondaryIndexes {
1264                    keys: None,
1265                    indexes: HashSet::from([
1266                        AccountIndex::ProgramId,
1267                        AccountIndex::SplTokenMint,
1268                        AccountIndex::SplTokenOwner,
1269                    ]),
1270                }
1271            } else {
1272                AccountSecondaryIndexes::default()
1273            };
1274
1275            // RPC & Bank Setup
1276            let rpc = RpcHandler::start_with_config(TestConfig { account_indexes });
1277
1278            let bank = rpc.root_bank();
1279            let RpcHandler { io, meta, .. } = rpc;
1280
1281            // Pubkeys
1282            let token_account1_pubkey = Pubkey::new_unique();
1283            let token_account2_pubkey = Pubkey::new_unique();
1284            let token_account3_pubkey = Pubkey::new_unique();
1285            let mint1_pubkey = Pubkey::new_unique();
1286            let mint2_pubkey = Pubkey::new_unique();
1287            let wallet1_pubkey = Pubkey::new_unique();
1288            let wallet2_pubkey = Pubkey::new_unique();
1289            let non_existent_pubkey = Pubkey::new_unique();
1290            let delegate = Pubkey::new_unique();
1291
1292            let mut num_default_spl_token_program_accounts = 0;
1293            let mut num_default_system_program_accounts = 0;
1294
1295            if !secondary_index_enabled {
1296                // Test first with no accounts added & no secondary indexes enabled:
1297                let req = format!(
1298                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account1_pubkey}"]}}"#,
1299                );
1300                let res = io.handle_request_sync(&req, meta.clone());
1301                let result: Value = serde_json::from_str(&res.expect("actual response"))
1302                    .expect("actual response deserialization");
1303                let sizes: HashMap<RpcAccountIndex, usize> =
1304                    serde_json::from_value(result["result"].clone()).unwrap();
1305                assert!(sizes.is_empty());
1306            } else {
1307                // Count SPL Token Program Default Accounts
1308                let req = format!(
1309                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1310                    token::id(),
1311                );
1312                let res = io.handle_request_sync(&req, meta.clone());
1313                let result: Value = serde_json::from_str(&res.expect("actual response"))
1314                    .expect("actual response deserialization");
1315                let sizes: HashMap<RpcAccountIndex, usize> =
1316                    serde_json::from_value(result["result"].clone()).unwrap();
1317                assert_eq!(sizes.len(), 1);
1318                num_default_spl_token_program_accounts =
1319                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1320                // Count System Program Default Accounts
1321                let req = format!(
1322                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1323                    system_program::id(),
1324                );
1325                let res = io.handle_request_sync(&req, meta.clone());
1326                let result: Value = serde_json::from_str(&res.expect("actual response"))
1327                    .expect("actual response deserialization");
1328                let sizes: HashMap<RpcAccountIndex, usize> =
1329                    serde_json::from_value(result["result"].clone()).unwrap();
1330                assert_eq!(sizes.len(), 1);
1331                num_default_system_program_accounts =
1332                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
1333            }
1334
1335            // Add 2 basic wallet accounts
1336            let wallet1_account = AccountSharedData::from(Account {
1337                lamports: 11111111,
1338                owner: system_program::id(),
1339                ..Account::default()
1340            });
1341            bank.store_account(&wallet1_pubkey, &wallet1_account);
1342            let wallet2_account = AccountSharedData::from(Account {
1343                lamports: 11111111,
1344                owner: system_program::id(),
1345                ..Account::default()
1346            });
1347            bank.store_account(&wallet2_pubkey, &wallet2_account);
1348
1349            // Add a token account
1350            let mut account1_data = vec![0; TokenAccount::get_packed_len()];
1351            let token_account1 = TokenAccount {
1352                mint: mint1_pubkey,
1353                owner: wallet1_pubkey,
1354                delegate: COption::Some(delegate),
1355                amount: 420,
1356                state: TokenAccountState::Initialized,
1357                is_native: COption::None,
1358                delegated_amount: 30,
1359                close_authority: COption::Some(wallet1_pubkey),
1360            };
1361            TokenAccount::pack(token_account1, &mut account1_data).unwrap();
1362            let token_account1 = AccountSharedData::from(Account {
1363                lamports: 111,
1364                data: account1_data.to_vec(),
1365                owner: token::id(),
1366                ..Account::default()
1367            });
1368            bank.store_account(&token_account1_pubkey, &token_account1);
1369
1370            // Add the mint
1371            let mut mint1_data = vec![0; Mint::get_packed_len()];
1372            let mint1_state = Mint {
1373                mint_authority: COption::Some(wallet1_pubkey),
1374                supply: 500,
1375                decimals: 2,
1376                is_initialized: true,
1377                freeze_authority: COption::Some(wallet1_pubkey),
1378            };
1379            Mint::pack(mint1_state, &mut mint1_data).unwrap();
1380            let mint_account1 = AccountSharedData::from(Account {
1381                lamports: 222,
1382                data: mint1_data.to_vec(),
1383                owner: token::id(),
1384                ..Account::default()
1385            });
1386            bank.store_account(&mint1_pubkey, &mint_account1);
1387
1388            // Add another token account with the different owner, but same delegate, and mint
1389            let mut account2_data = vec![0; TokenAccount::get_packed_len()];
1390            let token_account2 = TokenAccount {
1391                mint: mint1_pubkey,
1392                owner: wallet2_pubkey,
1393                delegate: COption::Some(delegate),
1394                amount: 420,
1395                state: TokenAccountState::Initialized,
1396                is_native: COption::None,
1397                delegated_amount: 30,
1398                close_authority: COption::Some(wallet2_pubkey),
1399            };
1400            TokenAccount::pack(token_account2, &mut account2_data).unwrap();
1401            let token_account2 = AccountSharedData::from(Account {
1402                lamports: 333,
1403                data: account2_data.to_vec(),
1404                owner: token::id(),
1405                ..Account::default()
1406            });
1407            bank.store_account(&token_account2_pubkey, &token_account2);
1408
1409            // Add another token account with the same owner and delegate but different mint
1410            let mut account3_data = vec![0; TokenAccount::get_packed_len()];
1411            let token_account3 = TokenAccount {
1412                mint: mint2_pubkey,
1413                owner: wallet2_pubkey,
1414                delegate: COption::Some(delegate),
1415                amount: 42,
1416                state: TokenAccountState::Initialized,
1417                is_native: COption::None,
1418                delegated_amount: 30,
1419                close_authority: COption::Some(wallet2_pubkey),
1420            };
1421            TokenAccount::pack(token_account3, &mut account3_data).unwrap();
1422            let token_account3 = AccountSharedData::from(Account {
1423                lamports: 444,
1424                data: account3_data.to_vec(),
1425                owner: token::id(),
1426                ..Account::default()
1427            });
1428            bank.store_account(&token_account3_pubkey, &token_account3);
1429
1430            // Add the new mint
1431            let mut mint2_data = vec![0; Mint::get_packed_len()];
1432            let mint2_state = Mint {
1433                mint_authority: COption::Some(wallet2_pubkey),
1434                supply: 200,
1435                decimals: 3,
1436                is_initialized: true,
1437                freeze_authority: COption::Some(wallet2_pubkey),
1438            };
1439            Mint::pack(mint2_state, &mut mint2_data).unwrap();
1440            let mint_account2 = AccountSharedData::from(Account {
1441                lamports: 555,
1442                data: mint2_data.to_vec(),
1443                owner: token::id(),
1444                ..Account::default()
1445            });
1446            bank.store_account(&mint2_pubkey, &mint_account2);
1447
1448            // Accounts should now look like the following:
1449            //
1450            //                   -----system_program------
1451            //                  /                         \
1452            //                 /-(owns)                    \-(owns)
1453            //                /                             \
1454            //             wallet1                   ---wallet2---
1455            //               /                      /             \
1456            //              /-(SPL::owns)          /-(SPL::owns)   \-(SPL::owns)
1457            //             /                      /                 \
1458            //      token_account1         token_account2       token_account3
1459            //            \                     /                   /
1460            //             \-(SPL::mint)       /-(SPL::mint)       /-(SPL::mint)
1461            //              \                 /                   /
1462            //               --mint_account1--               mint_account2
1463
1464            if secondary_index_enabled {
1465                // ----------- Test for a non-existent key -----------
1466                let req = format!(
1467                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{non_existent_pubkey}"]}}"#,
1468                );
1469                let res = io.handle_request_sync(&req, meta.clone());
1470                let result: Value = serde_json::from_str(&res.expect("actual response"))
1471                    .expect("actual response deserialization");
1472                let sizes: HashMap<RpcAccountIndex, usize> =
1473                    serde_json::from_value(result["result"].clone()).unwrap();
1474                assert!(sizes.is_empty());
1475                // --------------- Test Queries ---------------
1476                // 1) Wallet1 - Owns 1 SPL Token
1477                let req = format!(
1478                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet1_pubkey}"]}}"#,
1479                );
1480                let res = io.handle_request_sync(&req, meta.clone());
1481                let result: Value = serde_json::from_str(&res.expect("actual response"))
1482                    .expect("actual response deserialization");
1483                let sizes: HashMap<RpcAccountIndex, usize> =
1484                    serde_json::from_value(result["result"].clone()).unwrap();
1485                assert_eq!(sizes.len(), 1);
1486                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 1);
1487                // 2) Wallet2 - Owns 2 SPL Tokens
1488                let req = format!(
1489                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet2_pubkey}"]}}"#,
1490                );
1491                let res = io.handle_request_sync(&req, meta.clone());
1492                let result: Value = serde_json::from_str(&res.expect("actual response"))
1493                    .expect("actual response deserialization");
1494                let sizes: HashMap<RpcAccountIndex, usize> =
1495                    serde_json::from_value(result["result"].clone()).unwrap();
1496                assert_eq!(sizes.len(), 1);
1497                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 2);
1498                // 3) Mint1 - Is in 2 SPL Accounts
1499                let req = format!(
1500                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint1_pubkey}"]}}"#,
1501                );
1502                let res = io.handle_request_sync(&req, meta.clone());
1503                let result: Value = serde_json::from_str(&res.expect("actual response"))
1504                    .expect("actual response deserialization");
1505                let sizes: HashMap<RpcAccountIndex, usize> =
1506                    serde_json::from_value(result["result"].clone()).unwrap();
1507                assert_eq!(sizes.len(), 1);
1508                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 2);
1509                // 4) Mint2 - Is in 1 SPL Account
1510                let req = format!(
1511                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint2_pubkey}"]}}"#,
1512                );
1513                let res = io.handle_request_sync(&req, meta.clone());
1514                let result: Value = serde_json::from_str(&res.expect("actual response"))
1515                    .expect("actual response deserialization");
1516                let sizes: HashMap<RpcAccountIndex, usize> =
1517                    serde_json::from_value(result["result"].clone()).unwrap();
1518                assert_eq!(sizes.len(), 1);
1519                assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 1);
1520                // 5) SPL Token Program Owns 6 Accounts - 1 Default, 5 created above.
1521                let req = format!(
1522                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1523                    token::id(),
1524                );
1525                let res = io.handle_request_sync(&req, meta.clone());
1526                let result: Value = serde_json::from_str(&res.expect("actual response"))
1527                    .expect("actual response deserialization");
1528                let sizes: HashMap<RpcAccountIndex, usize> =
1529                    serde_json::from_value(result["result"].clone()).unwrap();
1530                assert_eq!(sizes.len(), 1);
1531                assert_eq!(
1532                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1533                    (num_default_spl_token_program_accounts + 5)
1534                );
1535                // 5) System Program Owns 4 Accounts + 2 Default, 2 created above.
1536                let req = format!(
1537                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
1538                    system_program::id(),
1539                );
1540                let res = io.handle_request_sync(&req, meta.clone());
1541                let result: Value = serde_json::from_str(&res.expect("actual response"))
1542                    .expect("actual response deserialization");
1543                let sizes: HashMap<RpcAccountIndex, usize> =
1544                    serde_json::from_value(result["result"].clone()).unwrap();
1545                assert_eq!(sizes.len(), 1);
1546                assert_eq!(
1547                    *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
1548                    (num_default_system_program_accounts + 2)
1549                );
1550            } else {
1551                // ------------ Secondary Indexes Disabled ------------
1552                let req = format!(
1553                    r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account2_pubkey}"]}}"#,
1554                );
1555                let res = io.handle_request_sync(&req, meta.clone());
1556                let result: Value = serde_json::from_str(&res.expect("actual response"))
1557                    .expect("actual response deserialization");
1558                let sizes: HashMap<RpcAccountIndex, usize> =
1559                    serde_json::from_value(result["result"].clone()).unwrap();
1560                assert!(sizes.is_empty());
1561            }
1562        }
1563    }
1564
1565    // This test checks that the rpc call to `set_identity` works a expected with
1566    // Bank but without validator.
1567    #[test]
1568    fn test_set_identity() {
1569        let rpc = RpcHandler::start_with_config(TestConfig::default());
1570
1571        let RpcHandler { io, meta, .. } = rpc;
1572
1573        let expected_validator_id = Keypair::new();
1574        let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1575
1576        let set_id_request = format!(
1577            r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1578        );
1579        let response = io.handle_request_sync(&set_id_request, meta.clone());
1580        let actual_parsed_response: Value =
1581            serde_json::from_str(&response.expect("actual response"))
1582                .expect("actual response deserialization");
1583
1584        let expected_parsed_response: Value = serde_json::from_str(
1585            r#"{
1586                "id": 1,
1587                "jsonrpc": "2.0",
1588                "result": null
1589            }"#,
1590        )
1591        .expect("Failed to parse expected response");
1592        assert_eq!(actual_parsed_response, expected_parsed_response);
1593
1594        let contact_info_request =
1595            r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1596        let response = io.handle_request_sync(&contact_info_request, meta.clone());
1597        let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1598            .expect("actual response deserialization");
1599        let actual_validator_id = parsed_response["result"]["id"]
1600            .as_str()
1601            .expect("Expected a string");
1602        assert_eq!(
1603            actual_validator_id,
1604            expected_validator_id.pubkey().to_string()
1605        );
1606    }
1607
1608    struct TestValidatorWithAdminRpc {
1609        meta: AdminRpcRequestMetadata,
1610        io: MetaIoHandler<AdminRpcRequestMetadata>,
1611        validator_ledger_path: PathBuf,
1612    }
1613
1614    impl TestValidatorWithAdminRpc {
1615        fn new() -> Self {
1616            let leader_keypair = Keypair::new();
1617            let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
1618
1619            let validator_keypair = Keypair::new();
1620            let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
1621            let genesis_config =
1622                create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
1623                    .genesis_config;
1624            let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
1625
1626            let voting_keypair = Arc::new(Keypair::new());
1627            let voting_pubkey = voting_keypair.pubkey();
1628            let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1629            let validator_config = ValidatorConfig {
1630                rpc_addrs: Some((
1631                    validator_node.info.rpc().unwrap(),
1632                    validator_node.info.rpc_pubsub().unwrap(),
1633                )),
1634                ..ValidatorConfig::default_for_test()
1635            };
1636            let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1637
1638            let post_init = Arc::new(RwLock::new(None));
1639            let meta = AdminRpcRequestMetadata {
1640                rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
1641                start_time: SystemTime::now(),
1642                start_progress: start_progress.clone(),
1643                validator_exit: validator_config.validator_exit.clone(),
1644                validator_exit_backpressure: HashMap::default(),
1645                authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1646                tower_storage: Arc::new(NullTowerStorage {}),
1647                post_init: post_init.clone(),
1648                staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1649                rpc_to_plugin_manager_sender: None,
1650            };
1651
1652            let _validator = Validator::new(
1653                validator_node,
1654                Arc::new(validator_keypair),
1655                &validator_ledger_path,
1656                &voting_pubkey,
1657                authorized_voter_keypairs,
1658                vec![leader_node.info],
1659                &validator_config,
1660                true, // should_check_duplicate_instance
1661                None, // rpc_to_plugin_manager_receiver
1662                start_progress.clone(),
1663                SocketAddrSpace::Unspecified,
1664                ValidatorTpuConfig::new_for_tests(),
1665                post_init.clone(),
1666                None,
1667            )
1668            .expect("assume successful validator start");
1669            assert_eq!(
1670                *start_progress.read().unwrap(),
1671                ValidatorStartProgress::Running
1672            );
1673            let post_init = post_init.read().unwrap();
1674
1675            assert!(post_init.is_some());
1676            let post_init = post_init.as_ref().unwrap();
1677            let notifies = post_init.notifies.read().unwrap();
1678            let updater_keys: HashSet<KeyUpdaterType> =
1679                notifies.into_iter().map(|(key, _)| key.clone()).collect();
1680            assert_eq!(
1681                updater_keys,
1682                HashSet::from_iter(vec![
1683                    KeyUpdaterType::Tpu,
1684                    KeyUpdaterType::TpuForwards,
1685                    KeyUpdaterType::TpuVote,
1686                    KeyUpdaterType::Forward,
1687                    KeyUpdaterType::RpcService,
1688                    KeyUpdaterType::Bls,
1689                    KeyUpdaterType::BlsConnectionCache,
1690                ])
1691            );
1692            let mut io = MetaIoHandler::default();
1693            io.extend_with(AdminRpcImpl.to_delegate());
1694            Self {
1695                meta,
1696                io,
1697                validator_ledger_path,
1698            }
1699        }
1700
1701        fn handle_request(&self, request: &str) -> Option<String> {
1702            self.io.handle_request_sync(request, self.meta.clone())
1703        }
1704    }
1705
1706    impl Drop for TestValidatorWithAdminRpc {
1707        fn drop(&mut self) {
1708            remove_dir_all(self.validator_ledger_path.clone()).unwrap();
1709        }
1710    }
1711
1712    #[test]
1713    fn test_no_post_init_no_snapshot_controller() {
1714        let validator_exit = create_validator_exit(Arc::new(AtomicBool::new(false)));
1715        let voting_keypair = Arc::new(Keypair::new());
1716        let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
1717        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1718
1719        let post_init = Arc::new(RwLock::new(None));
1720        let meta = AdminRpcRequestMetadata {
1721            rpc_addr: None,
1722            start_time: SystemTime::now(),
1723            start_progress: start_progress.clone(),
1724            validator_exit,
1725            validator_exit_backpressure: HashMap::default(),
1726            authorized_voter_keypairs: authorized_voter_keypairs.clone(),
1727            tower_storage: Arc::new(NullTowerStorage {}),
1728            post_init: post_init.clone(),
1729            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1730            rpc_to_plugin_manager_sender: None,
1731        };
1732
1733        let snapshot_controller = meta.snapshot_controller();
1734        assert!(snapshot_controller.is_none());
1735    }
1736
1737    // This test checks that `set_identity` call works with working validator and client.
1738    #[test]
1739    fn test_set_identity_with_validator() {
1740        let test_validator = TestValidatorWithAdminRpc::new();
1741        let expected_validator_id = Keypair::new();
1742        let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
1743
1744        let set_id_request = format!(
1745            r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
1746        );
1747        let response = test_validator.handle_request(&set_id_request);
1748        let actual_parsed_response: Value =
1749            serde_json::from_str(&response.expect("actual response"))
1750                .expect("actual response deserialization");
1751
1752        let expected_parsed_response: Value = serde_json::from_str(
1753            r#"{
1754                "id": 1,
1755                "jsonrpc": "2.0",
1756                "result": null
1757            }"#,
1758        )
1759        .expect("Failed to parse expected response");
1760        assert_eq!(actual_parsed_response, expected_parsed_response);
1761
1762        let contact_info_request =
1763            r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
1764        let response = test_validator.handle_request(&contact_info_request);
1765        let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
1766            .expect("actual response deserialization");
1767        let actual_validator_id = parsed_response["result"]["id"]
1768            .as_str()
1769            .expect("Expected a string");
1770        assert_eq!(
1771            actual_validator_id,
1772            expected_validator_id.pubkey().to_string()
1773        );
1774
1775        let contact_info_request =
1776            r#"{"jsonrpc":"2.0","id":1,"method":"exit","params":[]}"#.to_string();
1777        let exit_response = test_validator.handle_request(&contact_info_request);
1778        let actual_parsed_response: Value =
1779            serde_json::from_str(&exit_response.expect("actual response"))
1780                .expect("actual response deserialization");
1781        assert_eq!(actual_parsed_response, expected_parsed_response);
1782    }
1783
1784    #[test]
1785    fn test_is_generating_snapshots() {
1786        // Test with snapshots enabled
1787        let rpc = RpcHandler::start_with_config(TestConfig::default());
1788        let RpcHandler { io, meta, .. } = rpc;
1789
1790        let request = r#"{"jsonrpc":"2.0","id":1,"method":"isGeneratingSnapshots","params":[]}"#;
1791        let response = io.handle_request_sync(request, meta.clone());
1792        let result: Value = serde_json::from_str(&response.expect("actual response"))
1793            .expect("actual response deserialization");
1794
1795        // Should return a boolean result indicating if snapshots are being generated
1796        assert!(result["result"].is_boolean());
1797        // Verify that snapshots are being generated since the test setup includes a snapshot controller
1798        assert!(result["result"].as_bool().unwrap());
1799    }
1800
1801    #[test]
1802    fn test_is_generating_snapshots_no_controller() {
1803        // Test with snapshots enabled
1804        let rpc = RpcHandler::start_with_config(TestConfig::default());
1805        let RpcHandler { io, .. } = rpc;
1806
1807        // Test with no post_init (snapshot_controller unavailable)
1808        let request = r#"{"jsonrpc":"2.0","id":1,"method":"isGeneratingSnapshots","params":[]}"#;
1809        let validator_exit = create_validator_exit(Arc::new(AtomicBool::new(false)));
1810        let authorized_voter_keypairs = Arc::new(RwLock::new(vec![Arc::new(Keypair::new())]));
1811        let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
1812
1813        let meta_no_post_init = AdminRpcRequestMetadata {
1814            rpc_addr: None,
1815            start_time: SystemTime::now(),
1816            start_progress,
1817            validator_exit,
1818            validator_exit_backpressure: HashMap::default(),
1819            authorized_voter_keypairs,
1820            tower_storage: Arc::new(NullTowerStorage {}),
1821            post_init: Arc::new(RwLock::new(None)),
1822            staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
1823            rpc_to_plugin_manager_sender: None,
1824        };
1825
1826        let response = io.handle_request_sync(request, meta_no_post_init);
1827        let result: Value = serde_json::from_str(&response.expect("actual response"))
1828            .expect("actual response deserialization");
1829
1830        // Should return an error when snapshot_controller is unavailable
1831        assert!(result["error"].is_object());
1832        assert_eq!(
1833            result["error"]["message"].as_str().unwrap(),
1834            "snapshot_controller unavailable"
1835        );
1836    }
1837}