solana_validator/
dashboard.rs

1use {
2    crate::{
3        admin_rpc_service, format_name_value, new_spinner_progress_bar, println_name_value,
4        ProgressBar,
5    },
6    console::style,
7    solana_core::validator::ValidatorStartProgress,
8    solana_rpc_client::rpc_client::RpcClient,
9    solana_rpc_client_api::{client_error, request, response::RpcContactInfo},
10    solana_sdk::{
11        clock::Slot, commitment_config::CommitmentConfig, exit::Exit, native_token::Sol,
12        pubkey::Pubkey,
13    },
14    std::{
15        io,
16        net::SocketAddr,
17        path::{Path, PathBuf},
18        sync::{
19            atomic::{AtomicBool, Ordering},
20            Arc,
21        },
22        thread,
23        time::{Duration, SystemTime},
24    },
25};
26
27pub struct Dashboard {
28    progress_bar: ProgressBar,
29    ledger_path: PathBuf,
30    exit: Arc<AtomicBool>,
31}
32
33impl Dashboard {
34    pub fn new(
35        ledger_path: &Path,
36        log_path: Option<&Path>,
37        validator_exit: Option<&mut Exit>,
38    ) -> Result<Self, io::Error> {
39        println_name_value("Ledger location:", &format!("{}", ledger_path.display()));
40        if let Some(log_path) = log_path {
41            println_name_value("Log:", &format!("{}", log_path.display()));
42        }
43
44        let progress_bar = new_spinner_progress_bar();
45        progress_bar.set_message("Initializing...");
46
47        let exit = Arc::new(AtomicBool::new(false));
48        if let Some(validator_exit) = validator_exit {
49            let exit = exit.clone();
50            validator_exit.register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
51        }
52
53        Ok(Self {
54            exit,
55            ledger_path: ledger_path.to_path_buf(),
56            progress_bar,
57        })
58    }
59
60    pub fn run(self, refresh_interval: Duration) {
61        let Self {
62            exit,
63            ledger_path,
64            progress_bar,
65            ..
66        } = self;
67        drop(progress_bar);
68
69        let runtime = admin_rpc_service::runtime();
70        while !exit.load(Ordering::Relaxed) {
71            let progress_bar = new_spinner_progress_bar();
72            progress_bar.set_message("Connecting...");
73
74            let Some((rpc_addr, start_time)) = runtime.block_on(wait_for_validator_startup(
75                &ledger_path,
76                &exit,
77                progress_bar,
78                refresh_interval,
79            )) else {
80                continue;
81            };
82
83            let rpc_client = RpcClient::new_socket(rpc_addr);
84            let mut identity = match rpc_client.get_identity() {
85                Ok(identity) => identity,
86                Err(err) => {
87                    println!("Failed to get validator identity over RPC: {err}");
88                    continue;
89                }
90            };
91            println_name_value("Identity:", &identity.to_string());
92
93            if let Ok(genesis_hash) = rpc_client.get_genesis_hash() {
94                println_name_value("Genesis Hash:", &genesis_hash.to_string());
95            }
96
97            if let Some(contact_info) = get_contact_info(&rpc_client, &identity) {
98                println_name_value(
99                    "Version:",
100                    &contact_info.version.unwrap_or_else(|| "?".to_string()),
101                );
102                if let Some(shred_version) = contact_info.shred_version {
103                    println_name_value("Shred Version:", &shred_version.to_string());
104                }
105                if let Some(gossip) = contact_info.gossip {
106                    println_name_value("Gossip Address:", &gossip.to_string());
107                }
108                if let Some(tpu) = contact_info.tpu {
109                    println_name_value("TPU Address:", &tpu.to_string());
110                }
111                if let Some(rpc) = contact_info.rpc {
112                    println_name_value("JSON RPC URL:", &format!("http://{rpc}"));
113                }
114                if let Some(pubsub) = contact_info.pubsub {
115                    println_name_value("WebSocket PubSub URL:", &format!("ws://{pubsub}"));
116                }
117            }
118
119            let progress_bar = new_spinner_progress_bar();
120            let mut snapshot_slot_info = None;
121            for i in 0.. {
122                if exit.load(Ordering::Relaxed) {
123                    break;
124                }
125                if i % 10 == 0 {
126                    snapshot_slot_info = rpc_client.get_highest_snapshot_slot().ok();
127                }
128
129                let new_identity = rpc_client.get_identity().unwrap_or(identity);
130                if identity != new_identity {
131                    identity = new_identity;
132                    progress_bar.println(format_name_value("Identity:", &identity.to_string()));
133                }
134
135                match get_validator_stats(&rpc_client, &identity) {
136                    Ok((
137                        processed_slot,
138                        confirmed_slot,
139                        finalized_slot,
140                        transaction_count,
141                        identity_balance,
142                        health,
143                    )) => {
144                        let uptime = {
145                            let uptime =
146                                chrono::Duration::from_std(start_time.elapsed().unwrap()).unwrap();
147
148                            format!(
149                                "{:02}:{:02}:{:02} ",
150                                uptime.num_hours(),
151                                uptime.num_minutes() % 60,
152                                uptime.num_seconds() % 60
153                            )
154                        };
155
156                        progress_bar.set_message(format!(
157                            "{}{}| \
158                                    Processed Slot: {} | Confirmed Slot: {} | Finalized Slot: {} | \
159                                    Full Snapshot Slot: {} | Incremental Snapshot Slot: {} | \
160                                    Transactions: {} | {}",
161                            uptime,
162                            if health == "ok" {
163                                "".to_string()
164                            } else {
165                                format!("| {} ", style(health).bold().red())
166                            },
167                            processed_slot,
168                            confirmed_slot,
169                            finalized_slot,
170                            snapshot_slot_info
171                                .as_ref()
172                                .map(|snapshot_slot_info| snapshot_slot_info.full.to_string())
173                                .unwrap_or_else(|| '-'.to_string()),
174                            snapshot_slot_info
175                                .as_ref()
176                                .and_then(|snapshot_slot_info| snapshot_slot_info
177                                    .incremental
178                                    .map(|incremental| incremental.to_string()))
179                                .unwrap_or_else(|| '-'.to_string()),
180                            transaction_count,
181                            identity_balance
182                        ));
183                        thread::sleep(refresh_interval);
184                    }
185                    Err(err) => {
186                        progress_bar.abandon_with_message(format!("RPC connection failure: {err}"));
187                        break;
188                    }
189                }
190            }
191        }
192    }
193}
194
195async fn wait_for_validator_startup(
196    ledger_path: &Path,
197    exit: &AtomicBool,
198    progress_bar: ProgressBar,
199    refresh_interval: Duration,
200) -> Option<(SocketAddr, SystemTime)> {
201    let mut admin_client = None;
202    loop {
203        if exit.load(Ordering::Relaxed) {
204            return None;
205        }
206
207        if admin_client.is_none() {
208            match admin_rpc_service::connect(ledger_path).await {
209                Ok(new_admin_client) => admin_client = Some(new_admin_client),
210                Err(err) => {
211                    progress_bar.set_message(format!("Unable to connect to validator: {err}"));
212                    thread::sleep(refresh_interval);
213                    continue;
214                }
215            }
216        }
217
218        match admin_client.as_ref().unwrap().start_progress().await {
219            Ok(start_progress) => {
220                if start_progress == ValidatorStartProgress::Running {
221                    let admin_client = admin_client.take().unwrap();
222
223                    let validator_info = async move {
224                        let rpc_addr = admin_client.rpc_addr().await?;
225                        let start_time = admin_client.start_time().await?;
226                        Ok::<_, jsonrpc_core_client::RpcError>((rpc_addr, start_time))
227                    }
228                    .await;
229                    match validator_info {
230                        Ok((None, _)) => progress_bar.set_message("RPC service not available"),
231                        Ok((Some(rpc_addr), start_time)) => return Some((rpc_addr, start_time)),
232                        Err(err) => {
233                            progress_bar
234                                .set_message(format!("Failed to get validator info: {err}"));
235                        }
236                    }
237                } else {
238                    progress_bar.set_message(format!("Validator startup: {start_progress:?}..."));
239                }
240            }
241            Err(err) => {
242                admin_client = None;
243                progress_bar.set_message(format!("Failed to get validator start progress: {err}"));
244            }
245        }
246        thread::sleep(refresh_interval);
247    }
248}
249
250fn get_contact_info(rpc_client: &RpcClient, identity: &Pubkey) -> Option<RpcContactInfo> {
251    rpc_client
252        .get_cluster_nodes()
253        .ok()
254        .unwrap_or_default()
255        .into_iter()
256        .find(|node| node.pubkey == identity.to_string())
257}
258
259fn get_validator_stats(
260    rpc_client: &RpcClient,
261    identity: &Pubkey,
262) -> client_error::Result<(Slot, Slot, Slot, u64, Sol, String)> {
263    let finalized_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::finalized())?;
264    let confirmed_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::confirmed())?;
265    let processed_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;
266    let transaction_count =
267        rpc_client.get_transaction_count_with_commitment(CommitmentConfig::processed())?;
268    let identity_balance = rpc_client
269        .get_balance_with_commitment(identity, CommitmentConfig::confirmed())?
270        .value;
271
272    let health = match rpc_client.get_health() {
273        Ok(()) => "ok".to_string(),
274        Err(err) => {
275            if let client_error::ErrorKind::RpcError(request::RpcError::RpcResponseError {
276                code: _,
277                message: _,
278                data:
279                    request::RpcResponseErrorData::NodeUnhealthy {
280                        num_slots_behind: Some(num_slots_behind),
281                    },
282            }) = &err.kind
283            {
284                format!("{num_slots_behind} slots behind")
285            } else {
286                "health unknown".to_string()
287            }
288        }
289    };
290
291    Ok((
292        processed_slot,
293        confirmed_slot,
294        finalized_slot,
295        transaction_count,
296        Sol(identity_balance),
297        health,
298    ))
299}