agave_validator/
dashboard.rs

1use {
2    crate::{
3        admin_rpc_service, format_name_value, new_spinner_progress_bar, println_name_value,
4        ProgressBar,
5    },
6    console::style,
7    solana_clock::Slot,
8    solana_commitment_config::CommitmentConfig,
9    solana_core::validator::ValidatorStartProgress,
10    solana_native_token::Sol,
11    solana_pubkey::Pubkey,
12    solana_rpc_client::rpc_client::RpcClient,
13    solana_rpc_client_api::{client_error, request, response::RpcContactInfo},
14    solana_validator_exit::Exit,
15    std::{
16        net::SocketAddr,
17        path::{Path, PathBuf},
18        sync::{
19            atomic::{AtomicBool, Ordering},
20            Arc,
21        },
22        thread,
23        time::{Duration, SystemTime},
24    },
25};
26
/// Terminal status display for a validator.
///
/// Built with `Dashboard::new`, which prints the ledger/log locations, and driven by
/// `Dashboard::run`, which prints the validator's details and keeps a spinner line updated
/// with its latest stats.
pub struct Dashboard {
    /// Spinner created in `new` with the message "Initializing..."; `run` drops it before
    /// creating its own spinners.
    progress_bar: ProgressBar,
    /// Ledger directory; `run` passes it on when waiting for the validator to start.
    ledger_path: PathBuf,
    /// Set to `true` by the exit hook registered in `new`; `run` stops looping once it is set.
    exit: Arc<AtomicBool>,
}
32
33impl Dashboard {
34    pub fn new(
35        ledger_path: &Path,
36        log_path: Option<&Path>,
37        validator_exit: Option<&mut Exit>,
38    ) -> Self {
39        println_name_value("Ledger location:", &format!("{}", ledger_path.display()));
40        if let Some(log_path) = log_path {
41            println_name_value("Log:", &format!("{}", log_path.display()));
42        }
43
44        let progress_bar = new_spinner_progress_bar();
45        progress_bar.set_message("Initializing...");
46
47        let exit = Arc::new(AtomicBool::new(false));
48        if let Some(validator_exit) = validator_exit {
49            let exit = exit.clone();
50            validator_exit.register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
51        }
52
53        Self {
54            exit,
55            ledger_path: ledger_path.to_path_buf(),
56            progress_bar,
57        }
58    }
59
60    pub fn run(self, refresh_interval: Duration) {
61        let Self {
62            exit,
63            ledger_path,
64            progress_bar,
65            ..
66        } = self;
67        drop(progress_bar);
68
69        let runtime = admin_rpc_service::runtime();
70        while !exit.load(Ordering::Relaxed) {
71            let progress_bar = new_spinner_progress_bar();
72            progress_bar.set_message("Connecting...");
73
74            let Some((rpc_addr, start_time)) = runtime.block_on(wait_for_validator_startup(
75                &ledger_path,
76                &exit,
77                progress_bar,
78                refresh_interval,
79            )) else {
80                continue;
81            };
82
83            let rpc_client = RpcClient::new_socket(rpc_addr);
84            let mut identity = match rpc_client.get_identity() {
85                Ok(identity) => identity,
86                Err(err) => {
87                    println!("Failed to get validator identity over RPC: {err}");
88                    continue;
89                }
90            };
91            println_name_value("Identity:", &identity.to_string());
92
93            if let Ok(genesis_hash) = rpc_client.get_genesis_hash() {
94                println_name_value("Genesis Hash:", &genesis_hash.to_string());
95            }
96
97            if let Some(contact_info) = get_contact_info(&rpc_client, &identity) {
98                println_name_value(
99                    "Version:",
100                    &contact_info.version.unwrap_or_else(|| "?".to_string()),
101                );
102                if let Some(shred_version) = contact_info.shred_version {
103                    println_name_value("Shred Version:", &shred_version.to_string());
104                }
105                if let Some(gossip) = contact_info.gossip {
106                    println_name_value("Gossip Address:", &gossip.to_string());
107                }
108                if let Some(tpu) = contact_info.tpu {
109                    println_name_value("TPU Address:", &tpu.to_string());
110                }
111                if let Some(rpc) = contact_info.rpc {
112                    println_name_value("JSON RPC URL:", &format!("http://{rpc}"));
113                }
114                if let Some(pubsub) = contact_info.pubsub {
115                    println_name_value("WebSocket PubSub URL:", &format!("ws://{pubsub}"));
116                }
117            }
118
119            let progress_bar = new_spinner_progress_bar();
120            let mut snapshot_slot_info = None;
121            for i in 0.. {
122                if exit.load(Ordering::Relaxed) {
123                    break;
124                }
125                if i % 10 == 0 {
126                    snapshot_slot_info = rpc_client.get_highest_snapshot_slot().ok();
127                }
128
129                let new_identity = rpc_client.get_identity().unwrap_or(identity);
130                if identity != new_identity {
131                    identity = new_identity;
132                    progress_bar.println(format_name_value("Identity:", &identity.to_string()));
133                }
134
135                match get_validator_stats(&rpc_client, &identity) {
136                    Ok((
137                        processed_slot,
138                        confirmed_slot,
139                        finalized_slot,
140                        transaction_count,
141                        identity_balance,
142                        health,
143                    )) => {
144                        let uptime = {
145                            let uptime =
146                                chrono::Duration::from_std(start_time.elapsed().unwrap()).unwrap();
147
148                            format!(
149                                "{:02}:{:02}:{:02} ",
150                                uptime.num_hours(),
151                                uptime.num_minutes() % 60,
152                                uptime.num_seconds() % 60
153                            )
154                        };
155
156                        progress_bar.set_message(format!(
157                            "{}{}| Processed Slot: {} | Confirmed Slot: {} | Finalized Slot: {} | \
158                             Full Snapshot Slot: {} | Incremental Snapshot Slot: {} | \
159                             Transactions: {} | {}",
160                            uptime,
161                            if health == "ok" {
162                                "".to_string()
163                            } else {
164                                format!("| {} ", style(health).bold().red())
165                            },
166                            processed_slot,
167                            confirmed_slot,
168                            finalized_slot,
169                            snapshot_slot_info
170                                .as_ref()
171                                .map(|snapshot_slot_info| snapshot_slot_info.full.to_string())
172                                .unwrap_or_else(|| '-'.to_string()),
173                            snapshot_slot_info
174                                .as_ref()
175                                .and_then(|snapshot_slot_info| snapshot_slot_info
176                                    .incremental
177                                    .map(|incremental| incremental.to_string()))
178                                .unwrap_or_else(|| '-'.to_string()),
179                            transaction_count,
180                            identity_balance
181                        ));
182                        thread::sleep(refresh_interval);
183                    }
184                    Err(err) => {
185                        progress_bar.abandon_with_message(format!("RPC connection failure: {err}"));
186                        break;
187                    }
188                }
189            }
190        }
191    }
192}
193
194async fn wait_for_validator_startup(
195    ledger_path: &Path,
196    exit: &AtomicBool,
197    progress_bar: ProgressBar,
198    refresh_interval: Duration,
199) -> Option<(SocketAddr, SystemTime)> {
200    let mut admin_client = None;
201    loop {
202        if exit.load(Ordering::Relaxed) {
203            return None;
204        }
205
206        if admin_client.is_none() {
207            match admin_rpc_service::connect(ledger_path).await {
208                Ok(new_admin_client) => admin_client = Some(new_admin_client),
209                Err(err) => {
210                    progress_bar.set_message(format!("Unable to connect to validator: {err}"));
211                    thread::sleep(refresh_interval);
212                    continue;
213                }
214            }
215        }
216
217        match admin_client.as_ref().unwrap().start_progress().await {
218            Ok(start_progress) => {
219                if start_progress == ValidatorStartProgress::Running {
220                    let admin_client = admin_client.take().unwrap();
221
222                    let validator_info = async move {
223                        let rpc_addr = admin_client.rpc_addr().await?;
224                        let start_time = admin_client.start_time().await?;
225                        Ok::<_, jsonrpc_core_client::RpcError>((rpc_addr, start_time))
226                    }
227                    .await;
228                    match validator_info {
229                        Ok((None, _)) => progress_bar.set_message("RPC service not available"),
230                        Ok((Some(rpc_addr), start_time)) => return Some((rpc_addr, start_time)),
231                        Err(err) => {
232                            progress_bar
233                                .set_message(format!("Failed to get validator info: {err}"));
234                        }
235                    }
236                } else {
237                    progress_bar.set_message(format!("Validator startup: {start_progress:?}..."));
238                }
239            }
240            Err(err) => {
241                admin_client = None;
242                progress_bar.set_message(format!("Failed to get validator start progress: {err}"));
243            }
244        }
245        thread::sleep(refresh_interval);
246    }
247}
248
249fn get_contact_info(rpc_client: &RpcClient, identity: &Pubkey) -> Option<RpcContactInfo> {
250    rpc_client
251        .get_cluster_nodes()
252        .ok()
253        .unwrap_or_default()
254        .into_iter()
255        .find(|node| node.pubkey == identity.to_string())
256}
257
258fn get_validator_stats(
259    rpc_client: &RpcClient,
260    identity: &Pubkey,
261) -> client_error::Result<(Slot, Slot, Slot, u64, Sol, String)> {
262    let finalized_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::finalized())?;
263    let confirmed_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::confirmed())?;
264    let processed_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;
265    let transaction_count =
266        rpc_client.get_transaction_count_with_commitment(CommitmentConfig::processed())?;
267    let identity_balance = rpc_client
268        .get_balance_with_commitment(identity, CommitmentConfig::confirmed())?
269        .value;
270
271    let health = match rpc_client.get_health() {
272        Ok(()) => "ok".to_string(),
273        Err(err) => {
274            if let client_error::ErrorKind::RpcError(request::RpcError::RpcResponseError {
275                code: _,
276                message: _,
277                data:
278                    request::RpcResponseErrorData::NodeUnhealthy {
279                        num_slots_behind: Some(num_slots_behind),
280                    },
281            }) = err.kind()
282            {
283                format!("{num_slots_behind} slots behind")
284            } else {
285                "health unknown".to_string()
286            }
287        }
288    };
289
290    Ok((
291        processed_slot,
292        confirmed_slot,
293        finalized_slot,
294        transaction_count,
295        Sol(identity_balance),
296        health,
297    ))
298}