use log::*;
use serde::{Deserialize, Serialize};
use solana_client::{client_error::Result as ClientResult, rpc_client::RpcClient};
use solana_metrics::{datapoint_error, datapoint_info};
use solana_sdk::{
clock::Slot, native_token::LAMPORTS_PER_SOL, program_utils::limited_deserialize,
pubkey::Pubkey, signature::Signature, transaction::Transaction,
};
use solana_stake_program::{stake_instruction::StakeInstruction, stake_state::Lockup};
use solana_transaction_status::{ConfirmedBlock, RpcTransactionStatusMeta, TransactionEncoding};
use std::{collections::HashMap, thread::sleep, time::Duration};
/// A public key in its string form (as produced by `Pubkey::to_string()`); used
/// as the key type of the monitored-account map.
pub type PubkeyString = String;
/// A transaction signature in its string form (as produced by
/// `Signature::to_string()`).
pub type SignatureString = String;
/// The kind of event recorded in a monitored account's transaction history.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub enum AccountOperation {
/// The account started being tracked because of a stake `Initialize` instruction.
Initialize,
/// A stake `Withdraw` instruction was seen for an account that was compliant;
/// recording it goes together with clearing `compliant_since`.
Withdraw,
/// The account was the source of a stake `Split` instruction.
SplitSource,
/// The account started being tracked as the destination of a stake `Split`
/// instruction.
SplitDestination,
/// The account was added through `AccountsInfo::enroll_system_account`.
SystemAccountEnroll,
/// The account's post-transaction balance was found to be at least
/// `LAMPORTS_PER_SOL` below its tracked `lamports`.
FailedToMaintainMinimumBalance,
}
/// One entry in a monitored account's history: what happened, in which slot,
/// and in which transaction.
#[derive(Serialize, Deserialize, Debug)]
pub struct AccountTransactionInfo {
/// The event that was recorded.
pub op: AccountOperation,
/// The slot in which the event was observed.
pub slot: Slot,
/// String form of the first signature of the transaction that caused the
/// event. System-account enrollment stores `Signature::default()` here
/// because no transaction is involved.
pub signature: SignatureString,
}
/// The monitoring state kept for a single account.
#[derive(Serialize, Deserialize, Debug)]
pub struct AccountInfo {
/// `Some(slot)` while the account is considered compliant, holding the slot
/// at which it was enrolled/initialized (or, for a split destination, the
/// value inherited from the split source). `None` once the account is
/// non-compliant.
pub compliant_since: Option<Slot>,
/// The balance, in lamports, the account is expected to maintain. It is set
/// when the account starts being tracked and reduced when the account is the
/// source of a split.
pub lamports: u64,
/// History of the events recorded for this account, in the order they were
/// observed.
pub transactions: Vec<AccountTransactionInfo>,
}
/// The complete monitor state: how far the ledger has been processed and what
/// is known about every monitored account.
#[derive(Serialize, Deserialize, Default, Debug)]
pub struct AccountsInfo {
/// The most recent slot that `process_slots` has accounted for.
pub slot: Slot,
/// Monitored accounts, keyed by the string form of their public key.
pub account_info: HashMap<PubkeyString, AccountInfo>,
}
impl AccountsInfo {
    /// Starts monitoring `account_address` as a system account.
    ///
    /// The account is recorded as compliant since `slot`, with `lamports` as the
    /// balance it is expected to maintain. Its history starts with a single
    /// `SystemAccountEnroll` entry carrying the default (all-zero) signature,
    /// since enrollment is not tied to a transaction.
    ///
    /// Any state previously stored for the same address is replaced.
    pub fn enroll_system_account(&mut self, account_address: &Pubkey, slot: Slot, lamports: u64) {
        let enrollment = AccountTransactionInfo {
            op: AccountOperation::SystemAccountEnroll,
            slot,
            signature: Signature::default().to_string(),
        };
        let enrolled_account = AccountInfo {
            compliant_since: Some(slot),
            lamports,
            transactions: vec![enrollment],
        };
        let key = account_address.to_string();
        self.account_info.insert(key, enrolled_account);
    }
}
/// Applies the effects of one transaction to the monitored `accounts`.
///
/// The transaction is examined in two passes:
///
/// 1. Its instructions are walked from last to first looking for stake program
///    instructions. Only the final stake instruction of the transaction is
///    interpreted; every earlier one is reported as a `stake-monitor-failure`
///    datapoint and skipped.
///    * `Initialize` starts tracking the stake account (instruction account 0)
///      with its post-transaction balance. It is compliant only when the
///      lockup equals `Lockup::default()`.
///    * `Split` is honored only when the source (instruction account 0) is
///      tracked and compliant: the split amount is deducted from the source's
///      tracked balance and the destination (instruction account 1) starts
///      being tracked with that amount, inheriting `compliant_since`.
///    * `Withdraw` marks a tracked, compliant account as non-compliant.
///    * `Authorize`, `DelegateStake`, `Deactivate` and `SetLockup` are ignored.
/// 2. Every tracked account referenced by the transaction has its
///    post-transaction balance compared with its tracked balance; a compliant
///    account whose balance is at least `LAMPORTS_PER_SOL` below the tracked
///    value becomes non-compliant (`FailedToMaintainMinimumBalance`).
///
/// # Arguments
/// * `slot` - slot the transaction was confirmed in; stored in history entries.
/// * `transaction` - the decoded transaction.
/// * `meta` - status metadata; only `post_balances` is read here.
/// * `accounts` - monitored accounts, updated in place.
///
/// # Panics
/// Panics if `transaction.signatures` is empty, or if an index taken from the
/// transaction is out of range for `message.account_keys`,
/// `instruction.accounts` or `meta.post_balances`.
fn process_transaction(
    slot: Slot,
    transaction: &Transaction,
    meta: &RpcTransactionStatusMeta,
    accounts: &mut HashMap<PubkeyString, AccountInfo>,
) {
    let mut last_instruction = true;
    let message = &transaction.message;
    let signature = transaction.signatures[0].to_string();

    // Pass 1: look for stake operations, starting from the end of the transaction.
    for instruction in message.instructions.iter().rev() {
        let program_pubkey = message.account_keys[instruction.program_id_index as usize];
        if program_pubkey != solana_stake_program::id() {
            continue;
        }

        // Only the last stake instruction of a transaction is interpreted; any
        // earlier one is reported and skipped.
        if !last_instruction {
            datapoint_error!(
                "stake-monitor-failure",
                ("slot", slot, i64),
                ("err", "Stake instruction ignored", String)
            );
            continue;
        }
        last_instruction = false;

        match limited_deserialize::<StakeInstruction>(&instruction.data) {
            Err(err) => datapoint_error!(
                "stake-monitor-failure",
                ("slot", slot, i64),
                (
                    "err",
                    format!("Failed to deserialize stake instruction: {}", err),
                    String
                )
            ),
            Ok(stake_instruction) => {
                match stake_instruction {
                    StakeInstruction::Initialize(_authorized, lockup) => {
                        // The stake account being initialized is instruction account 0.
                        let stake_account_index = instruction.accounts[0] as usize;
                        let stake_pubkey = message.account_keys[stake_account_index].to_string();

                        // The amount to maintain is the account's post-transaction balance.
                        let lamports = meta.post_balances[stake_account_index];

                        accounts.insert(
                            stake_pubkey,
                            AccountInfo {
                                // Any non-default lockup makes the account
                                // non-compliant from the start.
                                compliant_since: if lockup != Lockup::default() {
                                    None
                                } else {
                                    Some(slot)
                                },
                                lamports,
                                transactions: vec![AccountTransactionInfo {
                                    op: AccountOperation::Initialize,
                                    slot,
                                    signature: signature.clone(),
                                }],
                            },
                        );
                    }
                    StakeInstruction::Authorize(_, _)
                    | StakeInstruction::DelegateStake
                    | StakeInstruction::Deactivate => {
                        // These instructions do not affect compliance.
                    }
                    StakeInstruction::Split(lamports) => {
                        // Instruction account 0 is the source, account 1 the destination.
                        let source_stake_account_index = instruction.accounts[0] as usize;
                        let split_stake_account_index = instruction.accounts[1] as usize;

                        let source_stake_pubkey =
                            message.account_keys[source_stake_account_index].to_string();
                        let split_stake_pubkey =
                            message.account_keys[split_stake_account_index].to_string();

                        if let Some(source_account_info) = accounts.get_mut(&source_stake_pubkey)
                        {
                            if source_account_info.compliant_since.is_some() {
                                source_account_info
                                    .transactions
                                    .push(AccountTransactionInfo {
                                        op: AccountOperation::SplitSource,
                                        slot,
                                        signature: signature.clone(),
                                    });

                                // The tracked balance is only a snapshot, so the split
                                // amount may exceed it. Saturate instead of underflowing:
                                // a plain `-=` would panic in debug builds and wrap to a
                                // huge value in release builds, which would then trip the
                                // balance check below.
                                source_account_info.lamports =
                                    source_account_info.lamports.saturating_sub(lamports);

                                let split_account_info = AccountInfo {
                                    compliant_since: source_account_info.compliant_since,
                                    lamports,
                                    transactions: vec![AccountTransactionInfo {
                                        op: AccountOperation::SplitDestination,
                                        slot,
                                        signature: signature.clone(),
                                    }],
                                };
                                accounts.insert(split_stake_pubkey, split_account_info);
                            }
                        }
                    }
                    StakeInstruction::Withdraw(_) => {
                        // The account being withdrawn from is instruction account 0.
                        let stake_account_index = instruction.accounts[0] as usize;
                        let stake_pubkey = message.account_keys[stake_account_index].to_string();

                        if let Some(account_info) = accounts.get_mut(&stake_pubkey) {
                            if account_info.compliant_since.is_some() {
                                // Any withdrawal ends compliance.
                                account_info.compliant_since = None;
                                account_info.transactions.push(AccountTransactionInfo {
                                    op: AccountOperation::Withdraw,
                                    slot,
                                    signature: signature.clone(),
                                });
                            }
                        }
                    }
                    StakeInstruction::SetLockup(_lockup_args) => {
                        // Lockup changes after initialization are not tracked.
                    }
                }
            }
        }
    }

    // Pass 2: make sure the balance of every monitored account touched by this
    // transaction is still within one SOL of its tracked balance.
    for (index, account_pubkey) in message.account_keys.iter().enumerate() {
        if let Some(account_info) = accounts.get_mut(&account_pubkey.to_string()) {
            let post_balance = meta.post_balances[index];
            if account_info.compliant_since.is_some()
                && post_balance <= account_info.lamports.saturating_sub(LAMPORTS_PER_SOL)
            {
                account_info.compliant_since = None;
                account_info.transactions.push(AccountTransactionInfo {
                    op: AccountOperation::FailedToMaintainMinimumBalance,
                    slot,
                    signature: signature.clone(),
                });
            }
        }
    }
}
/// Feeds every successful transaction of `confirmed_block` to
/// `process_transaction`, updating `accounts` in place.
///
/// A transaction is skipped when:
/// * its status metadata is missing (reported as a `stake-monitor-failure`
///   datapoint),
/// * its metadata records an error, i.e. the transaction failed (skipped
///   without a report),
/// * it cannot be decoded (reported), or
/// * its signature verification fails (reported).
///
/// # Arguments
/// * `slot` - slot of the block; attached to datapoints and history entries.
/// * `confirmed_block` - the block whose transactions are processed.
/// * `accounts` - monitored accounts, updated in place.
fn process_confirmed_block(
    slot: Slot,
    confirmed_block: ConfirmedBlock,
    accounts: &mut HashMap<PubkeyString, AccountInfo>,
) {
    for rpc_transaction in confirmed_block.transactions {
        let meta = match rpc_transaction.meta {
            Some(meta) => meta,
            None => {
                datapoint_error!(
                    "stake-monitor-failure",
                    ("slot", slot, i64),
                    ("err", "Transaction meta not available", String)
                );
                continue;
            }
        };

        // Failed transactions are not processed.
        if meta.err.is_some() {
            continue;
        }

        match rpc_transaction.transaction.decode() {
            Some(transaction) => {
                if transaction.verify().is_ok() {
                    process_transaction(slot, &transaction, &meta, accounts);
                } else {
                    datapoint_error!(
                        "stake-monitor-failure",
                        ("slot", slot, i64),
                        ("err", "Transaction signature verification failed", String)
                    );
                }
            }
            None => {
                // Report undecodable transactions like every other failure in
                // this function instead of dropping them silently: a skipped
                // transaction may hide a withdrawal from a monitored account.
                datapoint_error!(
                    "stake-monitor-failure",
                    ("slot", slot, i64),
                    ("err", "Transaction decode failed", String)
                );
            }
        }
    }
}
/// Fetches every confirmed block in the slot range `start_slot..=end_slot`.
///
/// The list of confirmed slots in the range is requested first, then each block
/// is downloaded in turn using the binary transaction encoding.
///
/// # Returns
/// The `(slot, block)` pairs in the order the RPC node listed the slots.
///
/// # Errors
/// Returns the first RPC error encountered; blocks fetched before the failure
/// are discarded.
fn load_blocks(
    rpc_client: &RpcClient,
    start_slot: Slot,
    end_slot: Slot,
) -> ClientResult<Vec<(Slot, ConfirmedBlock)>> {
    info!(
        "Loading confirmed blocks between slots: {} - {}",
        start_slot, end_slot
    );

    rpc_client
        .get_confirmed_blocks(start_slot, Some(end_slot))?
        .into_iter()
        .map(|slot| {
            rpc_client
                .get_confirmed_block_with_encoding(slot, TransactionEncoding::Binary)
                .map(|block| (slot, block))
        })
        // Collecting into a `Result` stops at the first failed download.
        .collect()
}
/// Advances the monitor by processing the confirmed blocks of the next batch
/// of slots.
///
/// The batch ends at `accounts_info.slot + batch_size` (value of
/// `accounts_info.slot` on entry). The function loops until
/// `accounts_info.slot + 1` reaches that end slot, and on each iteration it:
///
/// * asks the RPC node for its current slot and waits 5 seconds if nothing
///   newer than `accounts_info.slot` is available (a failed `get_slot()` is
///   reported and treated as slot 0, which also results in a wait);
/// * loads the confirmed blocks between the next unprocessed slot and the end
///   slot, passes each to `process_confirmed_block` and records its slot in
///   `accounts_info.slot`;
/// * if no block was returned and the node is already past the end slot,
///   treats the whole range as skipped and jumps to the end slot;
/// * if no block was returned and the node has not yet passed the end slot,
///   waits 1 second before polling again;
/// * on an RPC error while loading blocks, reports it and retries after
///   1 second.
///
/// This call blocks for as long as it takes the cluster to reach the end slot.
///
/// # Arguments
/// * `rpc_client` - client used for all RPC requests.
/// * `accounts_info` - monitor state, updated in place.
/// * `batch_size` - number of slots to advance by.
pub fn process_slots(rpc_client: &RpcClient, accounts_info: &mut AccountsInfo, batch_size: u64) {
    let end_slot = accounts_info.slot + batch_size;
    loop {
        let start_slot = accounts_info.slot + 1;
        info!("start_slot:{} - end_slot:{}", start_slot, end_slot);
        // NOTE(review): with `batch_size <= 1` this is true on the first
        // iteration, so nothing is fetched at all — confirm whether the end
        // slot is meant to be exclusive here.
        if start_slot >= end_slot {
            break;
        }

        let latest_available_slot = rpc_client.get_slot().unwrap_or_else(|err| {
            datapoint_error!(
                "stake-monitor-failure",
                ("err", format!("get_slot() failed: {}", err), String)
            );
            0
        });

        if accounts_info.slot >= latest_available_slot {
            info!("Waiting for a slot greater than {}...", accounts_info.slot);
            sleep(Duration::from_secs(5));
            continue;
        }

        match load_blocks(rpc_client, start_slot, end_slot) {
            Ok(blocks) => {
                info!("Loaded {} blocks", blocks.len());

                let made_progress = if !blocks.is_empty() {
                    for (slot, block) in blocks {
                        process_confirmed_block(slot, block, &mut accounts_info.account_info);
                        accounts_info.slot = slot;
                    }
                    true
                } else if end_slot < latest_available_slot {
                    // The node is past the end slot and reported no blocks, so
                    // there is nothing left to process in this range.
                    accounts_info.slot = end_slot;
                    true
                } else {
                    false
                };

                datapoint_info!("stake-monitor-slot", ("slot", accounts_info.slot, i64));

                if !made_progress {
                    // Nothing to process yet and the node has not passed the
                    // end slot: back off instead of re-polling in a tight loop.
                    sleep(Duration::from_secs(1));
                }
            }
            Err(err) => {
                datapoint_error!(
                    "stake-monitor-failure",
                    (
                        "err",
                        format!(
                            "failed to get blocks in range ({},{}): {}",
                            start_slot, end_slot, err
                        ),
                        String
                    )
                );
                sleep(Duration::from_secs(1));
            }
        }
    }
}