use {
crate::{
admin_rpc_service,
commands::{FromClapArgMatches, Result},
new_spinner_progress_bar, println_name_value,
},
clap::{App, Arg, ArgMatches, SubCommand, value_t_or_exit},
console::style,
jsonrpc_core::ErrorCode,
jsonrpc_core_client::RpcError,
solana_clap_utils::{
input_parsers::pubkey_of,
input_validators::{is_parsable, is_pubkey_or_keypair, is_valid_percentage},
},
solana_clock::{DEFAULT_S_PER_SLOT, Slot},
solana_commitment_config::CommitmentConfig,
solana_pubkey::Pubkey,
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::config::RpcLeaderScheduleConfig,
std::{
collections::VecDeque,
path::Path,
time::{Duration, SystemTime},
},
};
// Name of the clap subcommand built by `command()`.
const COMMAND: &str = "wait-for-restart-window";
// Default for `--min-idle-time`, in minutes. Kept as a string because it is handed to clap's
// `default_value`, which takes `&str`.
const DEFAULT_MIN_IDLE_TIME: &str = "10";
// Default for `--max-delinquent-stake`, in percent. Kept as a string for the same reason.
const DEFAULT_MAX_DELINQUENT_STAKE: &str = "5";
/// Parsed command-line arguments of the `wait-for-restart-window` subcommand.
#[derive(Debug, PartialEq)]
pub struct WaitForRestartWindowArgs {
/// `--min-idle-time`: minimum time, in minutes, that the validator should not be leader
/// before restarting.
pub min_idle_time: usize,
/// `--identity`: validator identity to monitor; `None` when the flag is not given.
pub identity: Option<Pubkey>,
/// `--max-delinquent-stake`: maximum delinquent stake, in percent, permitted for a restart.
pub max_delinquent_stake: u8,
/// `--skip-new-snapshot-check`: skip the check for a new snapshot.
pub skip_new_snapshot_check: bool,
/// `--skip-health-check`: skip the health check.
pub skip_health_check: bool,
}
impl FromClapArgMatches for WaitForRestartWindowArgs {
    /// Builds the argument struct from the matches of the subcommand defined by `command()`.
    ///
    /// The numeric options are read through clap's `value_t_or_exit!`, so a value that does not
    /// parse is handled by that macro rather than surfacing as an `Err` from this function.
    fn from_clap_arg_match(matches: &ArgMatches) -> Result<Self> {
        // Options carrying a value.
        let min_idle_time = value_t_or_exit!(matches, "min_idle_time", usize);
        let identity = pubkey_of(matches, "identity");
        let max_delinquent_stake = value_t_or_exit!(matches, "max_delinquent_stake", u8);

        // Boolean flags: present means `true`.
        let skip_new_snapshot_check = matches.is_present("skip_new_snapshot_check");
        let skip_health_check = matches.is_present("skip_health_check");

        Ok(Self {
            min_idle_time,
            identity,
            max_delinquent_stake,
            skip_new_snapshot_check,
            skip_health_check,
        })
    }
}
/// Builds the clap definition of the `wait-for-restart-window` subcommand.
///
/// Options:
/// * `--min-idle-time MINUTES` (default [`DEFAULT_MIN_IDLE_TIME`])
/// * `--identity ADDRESS` (optional, pubkey or keypair)
/// * `--max-delinquent-stake PERCENT` (default [`DEFAULT_MAX_DELINQUENT_STAKE`])
/// * `--skip-new-snapshot-check` (flag)
/// * `--skip-health-check` (flag)
pub(crate) fn command<'a>() -> App<'a, 'a> {
    SubCommand::with_name(COMMAND)
        .about("Monitor the validator for a good time to restart")
        .arg(
            Arg::with_name("min_idle_time")
                .long("min-idle-time")
                .takes_value(true)
                .validator(is_parsable::<usize>)
                .value_name("MINUTES")
                .default_value(DEFAULT_MIN_IDLE_TIME)
                .help("Minimum time that the validator should not be leader before restarting"),
        )
        .arg(
            Arg::with_name("identity")
                .long("identity")
                .value_name("ADDRESS")
                .takes_value(true)
                .validator(is_pubkey_or_keypair)
                .help("Validator identity to monitor [default: your validator]"),
        )
        .arg(
            Arg::with_name("max_delinquent_stake")
                .long("max-delinquent-stake")
                .takes_value(true)
                .validator(is_valid_percentage)
                .value_name("PERCENT")
                .default_value(DEFAULT_MAX_DELINQUENT_STAKE)
                .help("The maximum delinquent stake % permitted for a restart"),
        )
        .arg(
            Arg::with_name("skip_new_snapshot_check")
                .long("skip-new-snapshot-check")
                .help("Skip check for a new snapshot"),
        )
        .arg(
            Arg::with_name("skip_health_check")
                .long("skip-health-check")
                .help("Skip health check"),
        )
        // The sentence previously read "then this not a good time"; the missing verb is restored.
        .after_help(
            "Note: If this command exits with a non-zero status then this is not a good time for \
             a restart",
        )
}
pub fn execute(matches: &ArgMatches, ledger_path: &Path) -> Result<()> {
let wait_for_restart_window_args = WaitForRestartWindowArgs::from_clap_arg_match(matches)?;
wait_for_restart_window(
ledger_path,
wait_for_restart_window_args.identity,
wait_for_restart_window_args.min_idle_time,
wait_for_restart_window_args.max_delinquent_stake,
wait_for_restart_window_args.skip_new_snapshot_check,
wait_for_restart_window_args.skip_health_check,
)?;
Ok(())
}
/// Decides whether the "wait for a new snapshot" check should be skipped.
///
/// * An explicit request (`skip_new_snapshot_check_arg == true`) always wins.
/// * Otherwise the check is skipped only when the validator reported that it is not generating
///   snapshots (`Some(false)`).
/// * `None` means the validator could not answer the question; the check stays enabled.
///
/// A short explanation is printed to stdout whenever the decision is derived from
/// `Some(false)` or `None`.
fn should_skip_snapshot_check(
    skip_new_snapshot_check_arg: bool,
    is_generating_snapshots: Option<bool>,
) -> bool {
    // The user asked to skip: nothing else to look at, and nothing is printed.
    if skip_new_snapshot_check_arg {
        return true;
    }

    let Some(generating) = is_generating_snapshots else {
        println!(
            "Validator doesn't support isGeneratingSnapshots RPC method, Assuming \
             snapshots are being generated and leaving snapshot check enabled..."
        );
        return false;
    };

    if generating {
        return false;
    }

    println!("Validator is not generating snapshots. Skipping new snapshot check...");
    true
}
/// Blocks until the monitored validator reaches a good moment to restart, then returns `Ok(())`.
///
/// The function connects to the admin RPC service found under `ledger_path` to learn whether the
/// validator is generating snapshots and what its RPC address is. It then polls that RPC
/// endpoint every five seconds. The loop keeps waiting while any of these holds:
///
/// * the node reports unhealthy (unless `skip_health_check`),
/// * the current epoch has fewer than the minimum number of idle slots left,
/// * `identity` has a leader slot within the minimum idle time,
/// * no new snapshot has been seen since monitoring started (only when monitoring the local
///   identity and the snapshot check is enabled),
/// * delinquent stake is at or above `max_delinquency_percentage`,
/// * an incremental snapshot was seen earlier but the latest snapshot info has none.
///
/// # Arguments
///
/// * `ledger_path` - ledger directory used to reach the admin RPC service.
/// * `identity` - validator to monitor; defaults to the identity reported by the RPC node.
/// * `min_idle_time_in_minutes` - required leader-free time, converted to slots with
///   `DEFAULT_S_PER_SLOT`.
/// * `max_delinquency_percentage` - delinquent stake threshold, in percent.
/// * `skip_new_snapshot_check` - do not wait for a new snapshot. Note that when this is in
///   effect the loop exits as soon as a leader-schedule hole is found, without evaluating the
///   delinquency threshold.
/// * `skip_health_check` - treat the node as healthy without asking it.
///
/// # Errors
///
/// Returns an error when the admin RPC requests fail (a `MethodNotFound` answer to the
/// "is generating snapshots" request is tolerated), when the validator RPC is unavailable, when
/// an identity / epoch-info / vote-account / leader-schedule RPC request fails, or when the
/// monitored validator has leader slots in the epoch but no idle window longer than the
/// requested minimum.
pub fn wait_for_restart_window(
    ledger_path: &Path,
    identity: Option<Pubkey>,
    min_idle_time_in_minutes: usize,
    max_delinquency_percentage: u8,
    skip_new_snapshot_check: bool,
    skip_health_check: bool,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
    let sleep_interval = Duration::from_secs(5);

    let min_idle_slots = (min_idle_time_in_minutes as f64 * 60. / DEFAULT_S_PER_SLOT) as Slot;

    let is_generating_snapshots_result = admin_rpc_service::runtime().block_on(async {
        let admin_client = admin_rpc_service::connect(ledger_path).await?;
        admin_client.is_generating_snapshots().await
    });

    // `None` means the validator does not implement the request at all.
    let is_generating_snapshots = match is_generating_snapshots_result {
        Ok(val) => Some(val),
        Err(RpcError::JsonRpcError(ref e)) if e.code == ErrorCode::MethodNotFound => None,
        Err(err) => {
            return Err(
                format!("Failed to check if validator is generating snapshots: {err}").into(),
            );
        }
    };

    let skip_new_snapshot_check =
        should_skip_snapshot_check(skip_new_snapshot_check, is_generating_snapshots);

    let admin_client = admin_rpc_service::connect(ledger_path);
    let rpc_addr = admin_rpc_service::runtime()
        .block_on(async move { admin_client.await?.rpc_addr().await })
        .map_err(|err| format!("validator RPC address request failed: {err}"))?
        .ok_or_else(|| "validator RPC is unavailable".to_string())?;
    let rpc_client = RpcClient::new_socket(rpc_addr);

    let my_identity = rpc_client.get_identity()?;
    let identity = identity.unwrap_or(my_identity);
    let monitoring_another_validator = identity != my_identity;
    // The leader schedule returned by the RPC node is keyed by the base58 identity string.
    let identity_str = identity.to_string();
    println_name_value("Identity:", &identity_str);
    println_name_value(
        "Minimum Idle Time:",
        &format!("{min_idle_slots} slots (~{min_idle_time_in_minutes} minutes)"),
    );
    println!("Maximum permitted delinquency: {max_delinquency_percentage}%");

    let mut current_epoch = None;
    // Remaining leader slots (absolute) of `identity` in the current epoch, ascending.
    let mut leader_schedule = VecDeque::new();
    // Snapshot slot observed the first time a restart looked possible.
    let mut restart_snapshot = None;
    // `(start_slot, length_in_slots)` of every sufficiently long idle window still ahead.
    let mut upcoming_idle_windows = VecDeque::new();

    let progress_bar = new_spinner_progress_bar();
    let monitor_start_time = SystemTime::now();

    let mut seen_incremental_snapshot = false;
    loop {
        // A failed snapshot query is not fatal; it simply reads as "no snapshot info".
        let snapshot_slot_info = rpc_client.get_highest_snapshot_slot().ok();
        let snapshot_slot_info_has_incremental = snapshot_slot_info
            .as_ref()
            .map(|info| info.incremental.is_some())
            .unwrap_or(false);
        seen_incremental_snapshot |= snapshot_slot_info_has_incremental;

        let epoch_info = rpc_client.get_epoch_info_with_commitment(CommitmentConfig::processed())?;
        let healthy = skip_health_check || rpc_client.get_health().is_ok();

        // Delinquent share of all activated stake, as a fraction in `0.0..=1.0`.
        let delinquent_stake_fraction = {
            let vote_accounts = rpc_client.get_vote_accounts()?;
            let current_stake: u64 = vote_accounts
                .current
                .iter()
                .map(|va| va.activated_stake)
                .sum();
            let delinquent_stake: u64 = vote_accounts
                .delinquent
                .iter()
                .map(|va| va.activated_stake)
                .sum();
            let total_stake = current_stake + delinquent_stake;
            if total_stake == 0 {
                // Avoid 0/0 = NaN, which would be displayed as "NaN%".
                0.
            } else {
                delinquent_stake as f64 / total_stake as f64
            }
        };

        // (Re)load the leader schedule on the first pass and whenever the epoch rolls over.
        if current_epoch != Some(epoch_info.epoch) {
            progress_bar.set_message(format!(
                "Fetching leader schedule for epoch {}...",
                epoch_info.epoch
            ));
            let first_slot_in_epoch = epoch_info.absolute_slot - epoch_info.slot_index;
            leader_schedule = rpc_client
                .get_leader_schedule_with_config(
                    Some(first_slot_in_epoch),
                    RpcLeaderScheduleConfig {
                        identity: Some(identity_str.clone()),
                        ..RpcLeaderScheduleConfig::default()
                    },
                )?
                .ok_or_else(|| {
                    format!("Unable to get leader schedule from slot {first_slot_in_epoch}")
                })?
                .get(&identity_str)
                .cloned()
                .unwrap_or_default()
                .into_iter()
                // The schedule holds slot indexes relative to the start of the epoch.
                .map(|slot_index| first_slot_in_epoch.saturating_add(slot_index as u64))
                .filter(|slot| *slot > epoch_info.absolute_slot)
                .collect::<VecDeque<_>>();

            upcoming_idle_windows.clear();
            {
                let mut max_idle_window = 0;
                let mut idle_window_start_slot = epoch_info.absolute_slot;
                for &next_leader_slot in &leader_schedule {
                    let idle_window = next_leader_slot.saturating_sub(idle_window_start_slot);
                    max_idle_window = max_idle_window.max(idle_window);
                    // NOTE(review): strictly greater here, while the per-iteration check below
                    // accepts `idle_slots >= min_idle_slots` -- confirm which one is intended.
                    if idle_window > min_idle_slots {
                        upcoming_idle_windows.push_back((idle_window_start_slot, idle_window));
                    }
                    idle_window_start_slot = next_leader_slot;
                }

                let has_leader_slots = !leader_schedule.is_empty();
                if has_leader_slots && upcoming_idle_windows.is_empty() {
                    return Err(format!(
                        "Validator has no idle window of at least {min_idle_slots} slots. Largest \
                         idle window for epoch {} is {max_idle_window} slots",
                        epoch_info.epoch,
                    )
                    .into());
                }
            }

            current_epoch = Some(epoch_info.epoch);
        }

        let status = {
            if !healthy {
                style("Node is unhealthy").red().to_string()
            } else {
                // Wait for a hole in the leader schedule before restarting the node.
                let in_leader_schedule_hole = if epoch_info.slot_index + min_idle_slots
                    > epoch_info.slots_in_epoch
                {
                    Err("Current epoch is almost complete".to_string())
                } else {
                    // Drop leader slots that are already behind the processed slot.
                    while matches!(
                        leader_schedule.front(),
                        Some(slot) if *slot < epoch_info.absolute_slot
                    ) {
                        leader_schedule.pop_front();
                    }
                    // Drop idle windows that have already started. This must remove from the
                    // FRONT: the previous `Vec::pop()` removed the last (furthest) window
                    // while testing the first one, which emptied the whole list as soon as
                    // the first window became stale.
                    while matches!(
                        upcoming_idle_windows.front(),
                        Some((slot, _)) if *slot < epoch_info.absolute_slot
                    ) {
                        upcoming_idle_windows.pop_front();
                    }

                    match leader_schedule.front() {
                        // No leader slots left in this epoch.
                        None => Ok(()),
                        Some(next_leader_slot) => {
                            let idle_slots =
                                next_leader_slot.saturating_sub(epoch_info.absolute_slot);
                            if idle_slots >= min_idle_slots {
                                Ok(())
                            } else {
                                Err(match upcoming_idle_windows.front() {
                                    Some((starting_slot, length_in_slots)) => {
                                        format!(
                                            "Next idle window in {} slots, for {} slots",
                                            starting_slot.saturating_sub(epoch_info.absolute_slot),
                                            length_in_slots
                                        )
                                    }
                                    None => format!(
                                        "Validator will be leader soon. Next leader slot is \
                                         {next_leader_slot}"
                                    ),
                                })
                            }
                        }
                    }
                };

                match in_leader_schedule_hole {
                    Ok(()) => {
                        if skip_new_snapshot_check {
                            // Restart! The delinquency threshold below is not consulted on
                            // this path.
                            break;
                        }
                        // Prefer the incremental snapshot slot, falling back to the full one.
                        let snapshot_slot = snapshot_slot_info.map(|snapshot_slot_info| {
                            snapshot_slot_info
                                .incremental
                                .unwrap_or(snapshot_slot_info.full)
                        });
                        if restart_snapshot.is_none() {
                            restart_snapshot = snapshot_slot;
                        }
                        if restart_snapshot == snapshot_slot && !monitoring_another_validator {
                            "Waiting for a new snapshot".to_string()
                        } else if delinquent_stake_fraction
                            >= (max_delinquency_percentage as f64 / 100.)
                        {
                            style("Delinquency too high").red().to_string()
                        } else if seen_incremental_snapshot && !snapshot_slot_info_has_incremental
                        {
                            "Waiting for incremental snapshot".to_string()
                        } else {
                            break; // Restart!
                        }
                    }
                    Err(why) => style(why).yellow().to_string(),
                }
            }
        };

        let elapsed_text = {
            // `SystemTime::elapsed` fails when the wall clock was set back past the start time;
            // show 00:00:00 in that case instead of panicking in the middle of monitoring.
            let elapsed =
                chrono::Duration::from_std(monitor_start_time.elapsed().unwrap_or_default())
                    .expect("elapsed monitoring time fits in a chrono::Duration");
            format!(
                "{:02}:{:02}:{:02}",
                elapsed.num_hours(),
                elapsed.num_minutes() % 60,
                elapsed.num_seconds() % 60
            )
        };
        // Snapshot slots are only meaningful for the local validator.
        let snapshot_text = if monitoring_another_validator {
            "".to_string()
        } else {
            format!(
                "| Full Snapshot Slot: {} | Incremental Snapshot Slot: {}",
                snapshot_slot_info
                    .as_ref()
                    .map(|snapshot_slot_info| snapshot_slot_info.full.to_string())
                    .unwrap_or_else(|| '-'.to_string()),
                snapshot_slot_info
                    .as_ref()
                    .and_then(|snapshot_slot_info| snapshot_slot_info
                        .incremental
                        .map(|incremental| incremental.to_string()))
                    .unwrap_or_else(|| '-'.to_string()),
            )
        };
        progress_bar.set_message(format!(
            "{} | Processed Slot: {} {} | {:.2}% delinquent stake | {}",
            elapsed_text,
            epoch_info.absolute_slot,
            snapshot_text,
            delinquent_stake_fraction * 100.,
            status
        ));
        std::thread::sleep(sleep_interval);
    }
    drop(progress_bar);
    println!("{}", style("Ready to restart").green());
    Ok(())
}
#[cfg(test)]
mod tests {
    use {super::*, crate::commands::tests::verify_args_struct_by_command, std::str::FromStr};

    // Test-only `Default`: mirrors what clap produces when the subcommand is run with no options.
    impl Default for WaitForRestartWindowArgs {
        fn default() -> Self {
            WaitForRestartWindowArgs {
                min_idle_time: DEFAULT_MIN_IDLE_TIME
                    .parse()
                    .expect("invalid DEFAULT_MIN_IDLE_TIME"),
                identity: None,
                max_delinquent_stake: DEFAULT_MAX_DELINQUENT_STAKE
                    .parse()
                    .expect("invalid DEFAULT_MAX_DELINQUENT_STAKE"),
                skip_new_snapshot_check: false,
                skip_health_check: false,
            }
        }
    }

    #[test]
    fn verify_args_struct_by_command_wait_for_restart_window_default() {
        verify_args_struct_by_command(
            command(),
            vec![COMMAND],
            WaitForRestartWindowArgs::default(),
        );
    }

    #[test]
    fn verify_args_struct_by_command_wait_for_restart_window_skip_new_snapshot_check() {
        verify_args_struct_by_command(
            command(),
            vec![COMMAND, "--skip-new-snapshot-check"],
            WaitForRestartWindowArgs {
                skip_new_snapshot_check: true,
                ..WaitForRestartWindowArgs::default()
            },
        );
    }

    #[test]
    fn verify_args_struct_by_command_wait_for_restart_window_skip_health_check() {
        verify_args_struct_by_command(
            command(),
            vec![COMMAND, "--skip-health-check"],
            WaitForRestartWindowArgs {
                skip_health_check: true,
                ..WaitForRestartWindowArgs::default()
            },
        );
    }

    #[test]
    fn verify_args_struct_by_command_wait_for_restart_window_min_idle_time() {
        verify_args_struct_by_command(
            command(),
            vec![COMMAND, "--min-idle-time", "60"],
            WaitForRestartWindowArgs {
                min_idle_time: 60,
                ..WaitForRestartWindowArgs::default()
            },
        );
    }

    #[test]
    fn verify_args_struct_by_command_wait_for_restart_window_identity() {
        verify_args_struct_by_command(
            command(),
            vec![
                COMMAND,
                "--identity",
                "ch1do11111111111111111111111111111111111111",
            ],
            WaitForRestartWindowArgs {
                identity: Some(
                    Pubkey::from_str("ch1do11111111111111111111111111111111111111").unwrap(),
                ),
                ..WaitForRestartWindowArgs::default()
            },
        );
    }

    #[test]
    fn verify_args_struct_by_command_wait_for_restart_window_max_delinquent_stake() {
        verify_args_struct_by_command(
            command(),
            vec![COMMAND, "--max-delinquent-stake", "10"],
            WaitForRestartWindowArgs {
                max_delinquent_stake: 10,
                ..WaitForRestartWindowArgs::default()
            },
        );
    }

    // With the flag set, the check is skipped regardless of what the validator reports.
    #[test]
    fn test_should_skip_snapshot_check_with_arg_true() {
        assert!(should_skip_snapshot_check(true, Some(false)));
        assert!(should_skip_snapshot_check(true, Some(true)));
        assert!(should_skip_snapshot_check(true, None));
    }

    // Without the flag, the check is skipped only when the validator reports that it is not
    // generating snapshots.
    #[test]
    fn test_should_skip_snapshot_check_with_arg_false() {
        assert!(should_skip_snapshot_check(false, Some(false)));
        assert!(!should_skip_snapshot_check(false, Some(true)));
        assert!(!should_skip_snapshot_check(false, None));
    }
}