buffett-core 0.1.1

Core library for Bitconch:buffett
use crate::tx_vault::Bank;
use bincode::deserialize;
use crate::budget_transaction::BudgetTransaction;
use buffett_metrics::counter::Counter;
use crate::entry::Entry;
use log::Level;
use crate::packet::Packets;
use poh_recorder::PohRecorder;
use rayon::prelude::*;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::sigverify_stage::VerifiedPackets;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
use buffett_timing::timing;
use crate::transaction::Transaction;
use buffett_metrics::sub_new_counter_info;

pub const NUM_THREADS: usize = 1;


pub struct BankingStage {
    
    thread_hdls: Vec<JoinHandle<()>>,
}

pub enum Config {
    
    Tick(usize),
    
    Sleep(Duration),
}

impl Default for Config {
    fn default() -> Config {
        
        Config::Sleep(Duration::from_millis(500))
    }
}
impl BankingStage {
    
    pub fn new(
        bank: &Arc<Bank>,
        verified_receiver: Receiver<VerifiedPackets>,
        config: Config,
    ) -> (Self, Receiver<Vec<Entry>>) {
        let (entry_sender, entry_receiver) = channel();
        let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
        let poh = PohRecorder::new(bank.clone(), entry_sender);
        let tick_poh = poh.clone();
        
        let poh_exit = Arc::new(AtomicBool::new(false));
        let banking_exit = poh_exit.clone();
        
        let tick_producer = Builder::new()
            .name("bitconch-banking-stage-tick_producer".to_string())
            .spawn(move || {
                if let Err(e) = Self::tick_producer(&tick_poh, &config, &poh_exit) {
                    match e {
                        Error::SendError => (),
                        _ => error!(
                            "bitconch-banking-stage-tick_producer unexpected error {:?}",
                            e
                        ),
                    }
                }
                debug!("tick producer exiting");
                poh_exit.store(true, Ordering::Relaxed);
            }).unwrap();

        
        let mut thread_hdls: Vec<JoinHandle<()>> = (0..NUM_THREADS)
            .into_iter()
            .map(|_| {
                let thread_bank = bank.clone();
                let thread_verified_receiver = shared_verified_receiver.clone();
                let thread_poh = poh.clone();
                let thread_banking_exit = banking_exit.clone();
                Builder::new()
                    .name("bitconch-banking-stage-tx".to_string())
                    .spawn(move || {
                        loop {
                            if let Err(e) = Self::process_packets(
                                &thread_bank,
                                &thread_verified_receiver,
                                &thread_poh,
                            ) {
                                debug!("got error {:?}", e);
                                match e {
                                    Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                                    Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
                                        break
                                    }
                                    Error::RecvError(_) => break,
                                    Error::SendError => break,
                                    _ => error!("bitconch-banking-stage-tx {:?}", e),
                                }
                            }
                            if thread_banking_exit.load(Ordering::Relaxed) {
                                debug!("tick service exited");
                                break;
                            }
                        }
                        thread_banking_exit.store(true, Ordering::Relaxed);
                    }).unwrap()
            }).collect();
        thread_hdls.push(tick_producer);
        (BankingStage { thread_hdls }, entry_receiver)
    }

    
    fn deserialize_transactions(p: &Packets) -> Vec<Option<(Transaction, SocketAddr)>> {
        p.packets
            .par_iter()
            .map(|x| {
                deserialize(&x.data[0..x.meta.size])
                    .map(|req| (req, x.meta.addr()))
                    .ok()
            }).collect()
    }

    fn tick_producer(poh: &PohRecorder, config: &Config, poh_exit: &AtomicBool) -> Result<()> {
        loop {
            match *config {
                Config::Tick(num) => {
                    for _ in 0..num {
                        poh.hash();
                    }
                }
                Config::Sleep(duration) => {
                    sleep(duration);
                }
            }
            poh.tick()?;
            if poh_exit.load(Ordering::Relaxed) {
                debug!("tick service exited");
                return Ok(());
            }
        }
    }

    fn process_transactions(
        bank: &Arc<Bank>,
        transactions: &[Transaction],
        poh: &PohRecorder,
    ) -> Result<()> {
        debug!("transactions: {}", transactions.len());
        let mut chunk_start = 0;
        while chunk_start != transactions.len() {
            let chunk_end = chunk_start + Entry::num_will_fit(&transactions[chunk_start..]);

            let results = bank.process_transactions(&transactions[chunk_start..chunk_end]);

            let processed_transactions: Vec<_> = transactions[chunk_start..chunk_end]
                .into_iter()
                .enumerate()
                .filter_map(|(i, x)| match results[i] {
                    Ok(_) => Some(x.clone()),
                    Err(ref e) => {
                        debug!("process transaction failed {:?}", e);
                        None
                    }
                }).collect();

            if !processed_transactions.is_empty() {
                let hash = Transaction::hash(&processed_transactions);
                debug!("processed ok: {} {}", processed_transactions.len(), hash);
                poh.record(hash, processed_transactions)?;
            }
            chunk_start = chunk_end;
        }
        debug!("done process_transactions");
        Ok(())
    }

    
    pub fn process_packets(
        bank: &Arc<Bank>,
        verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
        poh: &PohRecorder,
    ) -> Result<()> {
        let recv_start = Instant::now();
        let mms = verified_receiver
            .lock()
            .unwrap()
            .recv_timeout(Duration::from_millis(100))?;
        let mut reqs_len = 0;
        let mms_len = mms.len();
        info!(
            "@{:?} process start stalled for: {:?}ms batches: {}",
            timing::timestamp(),
            timing::duration_in_milliseconds(&recv_start.elapsed()),
            mms.len(),
        );
        sub_new_counter_info!("banking_stage-entries_received", mms_len);
        let bank_starting_tx_count = bank.transaction_count();
        let count = mms.iter().map(|x| x.1.len()).sum();
        let proc_start = Instant::now();
        for (msgs, vers) in mms {
            let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
            reqs_len += transactions.len();

            debug!("transactions received {}", transactions.len());

            let transactions: Vec<_> = transactions
                .into_iter()
                .zip(vers)
                .filter_map(|(tx, ver)| match tx {
                    None => None,
                    Some((tx, _addr)) => if tx.verify_plan() && ver != 0 {
                        Some(tx)
                    } else {
                        None
                    },
                }).collect();
            debug!("verified transactions {}", transactions.len());
            Self::process_transactions(bank, &transactions, poh)?;
        }

        sub_new_counter_info!(
            "banking_stage-time_ms",
            timing::duration_in_milliseconds(&proc_start.elapsed()) as usize
        );
        let total_time_s = timing::duration_in_seconds(&proc_start.elapsed());
        let total_time_ms = timing::duration_in_milliseconds(&proc_start.elapsed());
        info!(
            "Current Timing @{:?} done processing transaction bundle: {} time: {:?}milli-seconds requested: {} requsts per second: {}",
            timing::timestamp(),
            mms_len,
            total_time_ms,
            reqs_len,
            (reqs_len as f32) / (total_time_s)
        );
        sub_new_counter_info!("banking_stage-process_packets", count);
        sub_new_counter_info!(
            "banking_stage-process_transactions",
            bank.transaction_count() - bank_starting_tx_count
        );
        Ok(())
    }
}

impl Service for BankingStage {
    type JoinReturnType = ();

    fn join(self) -> thread::Result<()> {
        for thread_hdl in self.thread_hdls {
            thread_hdl.join()?;
        }
        Ok(())
    }
}