buffett-core 0.1.1

Core library for Bitconch:buffett
Documentation
use bincode::deserialize;
use bincode::serialize;
use crate::budget_program::BudgetState;
use crate::budget_transaction::BudgetTransaction;
use buffett_metrics::counter::Counter;
use crate::dynamic_program::DynamicProgram;
use crate::entry::Entry;
use buffett_crypto::hash::{hash, Hash};
use itertools::Itertools;
use crate::ledger::Block;
use log::Level;
use crate::coinery::Mint;
use buffett_budget::payment::Payment;
use buffett_crypto::signature::{Keypair, Signature};
use buffett_interface::account::{Account, KeyedAccount};
use buffett_interface::pubkey::Pubkey;
use std;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::Instant;
use crate::storage_program::StorageProgram;
use crate::system_program::SystemProgram;
use crate::system_transaction::SystemTransaction;
use crate::tictactoe_dashboard_program::TicTacToeDashboardProgram;
use crate::tictactoe_program::TicTacToeProgram;
use buffett_timing::timing::{duration_in_microseconds, timestamp};
use crate::transaction::Transaction;
use crate::window::WINDOW_SIZE;
use buffett_metrics::sub_new_counter_info;

pub const MAX_ENTRY_IDS: usize = 1024 * 16;

pub const VERIFY_BLOCK_SIZE: usize = 16;


#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BankError {
    
    AccountNotFound,

    
    InsufficientFundsForFee,


    DuplicateSignature,


    LastIdNotFound,


    SignatureNotFound,

    
    LedgerVerificationFailed,
    
    UnbalancedTransaction,
    
    ResultWithNegativeTokens,

    UnknownContractId,

    ModifiedContractId,

    ExternalAccountTokenSpend,

    ProgramRuntimeError,
}

pub type Result<T> = result::Result<T, BankError>;
type SignatureStatusMap = HashMap<Signature, Result<()>>;

#[derive(Default)]
struct ErrorCounters {
    account_not_found_validator: usize,
    account_not_found_leader: usize,
    account_not_found_vote: usize,
}


pub struct Bank {
    
    accounts: RwLock<HashMap<Pubkey, Account>>,

    
    last_ids: RwLock<VecDeque<Hash>>,

    
    last_ids_sigs: RwLock<HashMap<Hash, (SignatureStatusMap, u64)>>,

    
    transaction_count: AtomicUsize,

    
    pub is_leader: bool,

    
    finality_time: AtomicUsize,

    
    loaded_contracts: RwLock<HashMap<Pubkey, DynamicProgram>>,
}

impl Default for Bank {
    fn default() -> Self {
        Bank {
            accounts: RwLock::new(HashMap::new()),
            last_ids: RwLock::new(VecDeque::new()),
            last_ids_sigs: RwLock::new(HashMap::new()),
            transaction_count: AtomicUsize::new(0),
            is_leader: true,
            finality_time: AtomicUsize::new(std::usize::MAX),
            loaded_contracts: RwLock::new(HashMap::new()),
        }
    }
}

impl Bank {
    
    pub fn new_default(is_leader: bool) -> Self {
        let mut bank = Bank::default();
        bank.is_leader = is_leader;
        bank
    }
    
    pub fn new_from_deposit(deposit: &Payment) -> Self {
        let bank = Self::default();
        {
            let mut accounts = bank.accounts.write().unwrap();
            let account = accounts.entry(deposit.to).or_insert_with(Account::default);
            Self::apply_payment(deposit, account);
        }
        bank
    }

    
    pub fn new(mint: &Mint) -> Self {
        let deposit = Payment {
            to: mint.pubkey(),
            balance: mint.tokens,
        };
        let bank = Self::new_from_deposit(&deposit);
        bank.register_entry_id(&mint.last_id());
        bank
    }

    
    fn apply_payment(payment: &Payment, account: &mut Account) {
        trace!("apply payments {}", payment.balance);
        account.tokens += payment.balance;
    }

    
    pub fn last_id(&self) -> Hash {
        let last_ids = self.last_ids.read().expect("'last_ids' read lock");
        let last_item = last_ids
            .iter()
            .last()
            .expect("get last item from 'last_ids' list");
        *last_item
    }

    
    fn reserve_signature(signatures: &mut SignatureStatusMap, signature: &Signature) -> Result<()> {
        if let Some(_result) = signatures.get(signature) {
            return Err(BankError::DuplicateSignature);
        }
        signatures.insert(*signature, Ok(()));
        Ok(())
    }

    
    pub fn clear_signatures(&self) {
        for (_, sigs) in self.last_ids_sigs.write().unwrap().iter_mut() {
            sigs.0.clear();
        }
    }

    fn reserve_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) -> Result<()> {
        if let Some(entry) = self
            .last_ids_sigs
            .write()
            .expect("'last_ids' read lock in reserve_signature_with_last_id")
            .get_mut(last_id)
        {
            return Self::reserve_signature(&mut entry.0, signature);
        }
        Err(BankError::LastIdNotFound)
    }

    fn update_signature_status(
        signatures: &mut SignatureStatusMap,
        signature: &Signature,
        result: &Result<()>,
    ) {
        let entry = signatures.entry(*signature).or_insert(Ok(()));
        *entry = result.clone();
    }

    fn update_signature_status_with_last_id(
        &self,
        signature: &Signature,
        result: &Result<()>,
        last_id: &Hash,
    ) {
        if let Some(entry) = self.last_ids_sigs.write().unwrap().get_mut(last_id) {
            Self::update_signature_status(&mut entry.0, signature, result);
        }
    }

    fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
        for (i, tx) in txs.iter().enumerate() {
            self.update_signature_status_with_last_id(&tx.signature, &res[i], &tx.last_id);
        }
    }

    
    pub fn count_valid_ids(&self, ids: &[Hash]) -> Vec<(usize, u64)> {
        let last_ids = self.last_ids_sigs.read().unwrap();
        let mut ret = Vec::new();
        for (i, id) in ids.iter().enumerate() {
            if let Some(entry) = last_ids.get(id) {
                ret.push((i, entry.1));
            }
        }
        ret
    }

    
    pub fn register_entry_id(&self, last_id: &Hash) {
        let mut last_ids = self
            .last_ids
            .write()
            .expect("'last_ids' write lock in register_entry_id");
        let mut last_ids_sigs = self
            .last_ids_sigs
            .write()
            .expect("last_ids_sigs write lock");
        if last_ids.len() >= MAX_ENTRY_IDS {
            let id = last_ids.pop_front().unwrap();
            last_ids_sigs.remove(&id);
        }
        last_ids_sigs.insert(*last_id, (HashMap::new(), timestamp()));
        last_ids.push_back(*last_id);
    }

    
    pub fn process_transaction(&self, tx: &Transaction) -> Result<()> {
        match self.process_transactions(&[tx.clone()])[0] {
            Err(ref e) => {
                info!("a transaction error happened in tx_vault: {:?}", e);
                Err((*e).clone())
            }
            Ok(_) => Ok(()),
        }
    }

    fn load_account(
        &self,
        tx: &Transaction,
        accounts: &HashMap<Pubkey, Account>,
        error_counters: &mut ErrorCounters,
    ) -> Result<Vec<Account>> {
        
        if accounts.get(&tx.keys[0]).is_none() {
            if !self.is_leader {
                error_counters.account_not_found_validator += 1;
            } else {
                error_counters.account_not_found_leader += 1;
            }
            if BudgetState::check_id(&tx.program_id) {
                use buffett_budget::instruction::Instruction;
                if let Some(Instruction::NewVote(_vote)) = tx.instruction() {
                    error_counters.account_not_found_vote += 1;
                }
            }
            Err(BankError::AccountNotFound)
        } else if accounts.get(&tx.keys[0]).unwrap().tokens < tx.fee {
            Err(BankError::InsufficientFundsForFee)
        } else {
            let mut called_accounts: Vec<Account> = tx
                .keys
                .iter()
                .map(|key| accounts.get(key).cloned().unwrap_or_default())
                .collect();
            
            self.reserve_signature_with_last_id(&tx.signature, &tx.last_id)?;
            called_accounts[0].tokens -= tx.fee;
            Ok(called_accounts)
        }
    }

    fn load_accounts(
        &self,
        txs: &[Transaction],
        accounts: &HashMap<Pubkey, Account>,
        error_counters: &mut ErrorCounters,
    ) -> Vec<Result<Vec<Account>>> {
        txs.iter()
            .map(|tx| self.load_account(tx, accounts, error_counters))
            .collect()
    }

    pub fn verify_transaction(
        tx: &Transaction,
        pre_program_id: &Pubkey,
        pre_tokens: i64,
        account: &Account,
    ) -> Result<()> {
        
        if !((*pre_program_id == account.program_id)
            || (SystemProgram::check_id(&tx.program_id)
                && SystemProgram::check_id(&pre_program_id)))
        {
            
            return Err(BankError::ModifiedContractId);
        }
        
        if tx.program_id != account.program_id && pre_tokens > account.tokens {
            return Err(BankError::ExternalAccountTokenSpend);
        }
        if account.tokens < 0 {
            return Err(BankError::ResultWithNegativeTokens);
        }
        Ok(())
    }

    fn loaded_contract(&self, tx: &Transaction, accounts: &mut [Account]) -> bool {
        let loaded_contracts = self.loaded_contracts.write().unwrap();
        match loaded_contracts.get(&tx.program_id) {
            Some(dc) => {
                let mut infos: Vec<_> = (&tx.keys)
                    .into_iter()
                    .zip(accounts)
                    .map(|(key, account)| KeyedAccount { key, account })
                    .collect();

                dc.call(&mut infos, &tx.userdata);
                true
            }
            None => false,
        }
    }

    
    fn execute_transaction(&self, tx: &Transaction, accounts: &mut [Account]) -> Result<()> {
        let pre_total: i64 = accounts.iter().map(|a| a.tokens).sum();
        let pre_data: Vec<_> = accounts
            .iter_mut()
            .map(|a| (a.program_id, a.tokens))
            .collect();

        
        if SystemProgram::check_id(&tx.program_id) {
            SystemProgram::process_transaction(&tx, accounts, &self.loaded_contracts)
        } else if BudgetState::check_id(&tx.program_id) {
            
            if BudgetState::process_transaction(&tx, accounts).is_err() {
                return Err(BankError::ProgramRuntimeError);
            }
        } else if StorageProgram::check_id(&tx.program_id) {
            if StorageProgram::process_transaction(&tx, accounts).is_err() {
                return Err(BankError::ProgramRuntimeError);
            }
        } else if TicTacToeProgram::check_id(&tx.program_id) {
            if TicTacToeProgram::process_transaction(&tx, accounts).is_err() {
                return Err(BankError::ProgramRuntimeError);
            }
        } else if TicTacToeDashboardProgram::check_id(&tx.program_id) {
            if TicTacToeDashboardProgram::process_transaction(&tx, accounts).is_err() {
                return Err(BankError::ProgramRuntimeError);
            }
        } else if self.loaded_contract(&tx, accounts) {
        } else {
            return Err(BankError::UnknownContractId);
        }
        
        for ((pre_program_id, pre_tokens), post_account) in pre_data.iter().zip(accounts.iter()) {
            Self::verify_transaction(&tx, pre_program_id, *pre_tokens, post_account)?;
        }
        
        let post_total: i64 = accounts.iter().map(|a| a.tokens).sum();
        if pre_total != post_total {
            Err(BankError::UnbalancedTransaction)
        } else {
            Ok(())
        }
    }

    pub fn store_accounts(
        txs: &[Transaction],
        res: &[Result<()>],
        loaded: &[Result<Vec<Account>>],
        accounts: &mut HashMap<Pubkey, Account>,
    ) {
        for (i, racc) in loaded.iter().enumerate() {
            if res[i].is_err() || racc.is_err() {
                continue;
            }

            let tx = &txs[i];
            let acc = racc.as_ref().unwrap();
            for (key, account) in tx.keys.iter().zip(acc.iter()) {
                //purge if 0
                if account.tokens == 0 {
                    accounts.remove(&key);
                } else {
                    *accounts.entry(*key).or_insert_with(Account::default) = account.clone();
                    assert_eq!(accounts.get(key).unwrap().tokens, account.tokens);
                }
            }
        }
    }

    
    #[must_use]
    pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
        debug!("processing transactions: {}", txs.len());
        
        let mut accounts = self.accounts.write().unwrap();
        let txs_len = txs.len();
        let mut error_counters = ErrorCounters::default();
        let now = Instant::now();
        let mut loaded_accounts = self.load_accounts(&txs, &accounts, &mut error_counters);
        let load_elapsed = now.elapsed();
        let now = Instant::now();

        let res: Vec<_> = loaded_accounts
            .iter_mut()
            .zip(txs.iter())
            .map(|(acc, tx)| match acc {
                Err(e) => Err(e.clone()),
                Ok(ref mut accounts) => self.execute_transaction(tx, accounts),
            }).collect();
        let execution_elapsed = now.elapsed();
        let now = Instant::now();
        Self::store_accounts(&txs, &res, &loaded_accounts, &mut accounts);
        self.update_transaction_statuses(&txs, &res);
        let write_elapsed = now.elapsed();
        debug!(
            "load: {}us execution: {}us write: {}us txs_len={}",
            duration_in_microseconds(&load_elapsed),
            duration_in_microseconds(&execution_elapsed),
            duration_in_microseconds(&write_elapsed),
            txs_len
        );
        let mut tx_count = 0;
        let mut err_count = 0;
        for r in &res {
            if r.is_ok() {
                tx_count += 1;
            } else {
                if err_count == 0 {
                    debug!("tx error: {:?}", r);
                }
                err_count += 1;
            }
        }
        if err_count > 0 {
            info!("{} errors of {} txs", err_count, err_count + tx_count);
            if !self.is_leader {
                sub_new_counter_info!("bank-process_transactions_err-validator", err_count);
                sub_new_counter_info!(
                    "bank-appy_debits-account_not_found-validator",
                    error_counters.account_not_found_validator
                );
            } else {
                sub_new_counter_info!("bank-process_transactions_err-leader", err_count);
                sub_new_counter_info!(
                    "bank-appy_debits-account_not_found-leader",
                    error_counters.account_not_found_leader
                );
                sub_new_counter_info!(
                    "bank-appy_debits-vote_account_not_found",
                    error_counters.account_not_found_vote
                );
            }
        }
        let cur_tx_count = self.transaction_count.load(Ordering::Relaxed);
        if ((cur_tx_count + tx_count) & !(262_144 - 1)) > cur_tx_count & !(262_144 - 1) {
            info!("accounts.len: {}", accounts.len());
        }
        self.transaction_count
            .fetch_add(tx_count, Ordering::Relaxed);
        res
    }

    pub fn process_entry(&self, entry: &Entry) -> Result<()> {
        if !entry.transactions.is_empty() {
            for result in self.process_transactions(&entry.transactions) {
                result?;
            }
        }
        self.register_entry_id(&entry.id);
        Ok(())
    }

    
    fn process_entries_tail(
        &self,
        entries: Vec<Entry>,
        tail: &mut Vec<Entry>,
        tail_idx: &mut usize,
    ) -> Result<u64> {
        let mut entry_count = 0;

        for entry in entries {
            if tail.len() > *tail_idx {
                tail[*tail_idx] = entry.clone();
            } else {
                tail.push(entry.clone());
            }
            *tail_idx = (*tail_idx + 1) % WINDOW_SIZE as usize;

            entry_count += 1;
            self.process_entry(&entry)?;
        }

        Ok(entry_count)
    }

    
    pub fn process_entries(&self, entries: &[Entry]) -> Result<()> {
        for entry in entries {
            self.process_entry(&entry)?;
        }
        Ok(())
    }

    
    fn process_blocks<I>(
        &self,
        start_hash: Hash,
        entries: I,
        tail: &mut Vec<Entry>,
        tail_idx: &mut usize,
    ) -> Result<u64>
    where
        I: IntoIterator<Item = Entry>,
    {
        
        let mut entry_count = *tail_idx as u64;
        let mut id = start_hash;
        for block in &entries.into_iter().chunks(VERIFY_BLOCK_SIZE) {
            let block: Vec<_> = block.collect();
            if !block.verify(&id) {
                warn!("Ledger proof of history failed at entry: {}", entry_count);
                return Err(BankError::LedgerVerificationFailed);
            }
            id = block.last().unwrap().id;
            entry_count += self.process_entries_tail(block, tail, tail_idx)?;
        }
        Ok(entry_count)
    }

    
    pub fn process_ledger<I>(&self, entries: I) -> Result<(u64, Vec<Entry>)>
    where
        I: IntoIterator<Item = Entry>,
    {
        let mut entries = entries.into_iter();

        
        let entry0 = entries.next().expect("invalid ledger: empty");

        
        let entry1 = entries
            .next()
            .expect("invalid ledger: need at least 2 entries");
        {
            let tx = &entry1.transactions[0];
            assert!(SystemProgram::check_id(&tx.program_id), "Invalid ledger");
            let instruction: SystemProgram = deserialize(&tx.userdata).unwrap();
            let deposit = if let SystemProgram::Move { tokens } = instruction {
                Some(tokens)
            } else {
                None
            }.expect("invalid ledger, needs to start with a contract");
            {
                let mut accounts = self.accounts.write().unwrap();
                let account = accounts.entry(tx.keys[0]).or_insert_with(Account::default);
                account.tokens += deposit;
                trace!("applied genesis payment {:?} => {:?}", deposit, account);
            }
        }
        self.register_entry_id(&entry0.id);
        self.register_entry_id(&entry1.id);
        let entry1_id = entry1.id;

        let mut tail = Vec::with_capacity(WINDOW_SIZE as usize);
        tail.push(entry0);
        tail.push(entry1);
        let mut tail_idx = 2;
        let entry_count = self.process_blocks(entry1_id, entries, &mut tail, &mut tail_idx)?;

        
        if tail.len() == WINDOW_SIZE as usize {
            tail.rotate_left(tail_idx)
        }

        Ok((entry_count, tail))
    }

    
    pub fn transfer(
        &self,
        n: i64,
        keypair: &Keypair,
        to: Pubkey,
        last_id: Hash,
    ) -> Result<Signature> {
        let tx = Transaction::system_new(keypair, to, n, last_id);
        let signature = tx.signature;
        self.process_transaction(&tx).map(|_| signature)
    }

    pub fn read_balance(account: &Account) -> i64 {
        if SystemProgram::check_id(&account.program_id) {
            SystemProgram::get_balance(account)
        } else if BudgetState::check_id(&account.program_id) {
            BudgetState::get_balance(account)
        } else {
            account.tokens
        }
    }
    
    pub fn get_balance(&self, pubkey: &Pubkey) -> i64 {
        self.get_account(pubkey)
            .map(|x| Self::read_balance(&x))
            .unwrap_or(0)
    }

    pub fn get_account(&self, pubkey: &Pubkey) -> Option<Account> {
        let accounts = self
            .accounts
            .read()
            .expect("'accounts' read lock in get_balance");
        accounts.get(pubkey).cloned()
    }

    pub fn transaction_count(&self) -> usize {
        self.transaction_count.load(Ordering::Relaxed)
    }

    pub fn get_signature_status(&self, signature: &Signature) -> Result<()> {
        let last_ids_sigs = self.last_ids_sigs.read().unwrap();
        for (_hash, (signatures, _)) in last_ids_sigs.iter() {
            if let Some(res) = signatures.get(signature) {
                return res.clone();
            }
        }
        Err(BankError::SignatureNotFound)
    }

    pub fn has_signature(&self, signature: &Signature) -> bool {
        self.get_signature_status(signature) != Err(BankError::SignatureNotFound)
    }

    
    pub fn hash_internal_state(&self) -> Hash {
        let mut ordered_accounts = BTreeMap::new();
        for (pubkey, account) in self.accounts.read().unwrap().iter() {
            ordered_accounts.insert(*pubkey, account.clone());
        }
        hash(&serialize(&ordered_accounts).unwrap())
    }

    pub fn finality(&self) -> usize {
        self.finality_time.load(Ordering::Relaxed)
    }

    pub fn set_finality(&self, finality: usize) {
        self.finality_time.store(finality, Ordering::Relaxed);
    }
}