use crate::rpc_subscriptions::RpcSubscriptions;
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_client::rpc_response::{SlotTransactionStats, SlotUpdate};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{clock::Slot, timing::timestamp};
use std::{
    collections::HashSet,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, RwLock,
    },
    thread::{self, Builder, JoinHandle},
    time::Duration,
};
/// Holder for the bank that the tracker currently treats as the highest
/// optimistically confirmed bank.
///
/// It is normally shared as `Arc<RwLock<OptimisticallyConfirmedBank>>` (see
/// `locked_from_bank_forks_root`) so the tracker thread can replace `bank`
/// while other components read it.
pub struct OptimisticallyConfirmedBank {
    /// The bank currently cached; `OptimisticallyConfirmedBankTracker` only
    /// replaces it with a bank whose slot is strictly greater.
    pub bank: Arc<Bank>,
}
impl OptimisticallyConfirmedBank {
    /// Builds a shareable, lock-protected `OptimisticallyConfirmedBank` that
    /// starts out pointing at the current root bank of `bank_forks`.
    ///
    /// # Panics
    ///
    /// Panics if the `bank_forks` lock is poisoned.
    pub fn locked_from_bank_forks_root(bank_forks: &Arc<RwLock<BankForks>>) -> Arc<RwLock<Self>> {
        // Seed the cache with whatever bank is the root right now.
        let bank = bank_forks.read().unwrap().root_bank();
        let initial = Self { bank };
        Arc::new(RwLock::new(initial))
    }
}
/// Events consumed by `OptimisticallyConfirmedBankTracker::process_notification`.
pub enum BankNotification {
    /// A slot was reported as optimistically confirmed. Only the slot number
    /// is carried; the tracker looks the bank up in `BankForks` on receipt.
    OptimisticallyConfirmed(Slot),
    /// A bank was reported as frozen. The tracker publishes a
    /// `SlotUpdate::Frozen` for it (when it has a parent) and promotes it if
    /// its slot was waiting in the pending set.
    Frozen(Arc<Bank>),
    /// A bank was reported as the new root. The tracker adopts it if its slot
    /// is higher than the cached one and discards pending slots at or below it.
    Root(Arc<Bank>),
}
impl std::fmt::Debug for BankNotification {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            BankNotification::OptimisticallyConfirmed(slot) => {
                write!(f, "OptimisticallyConfirmed({:?})", slot)
            }
            BankNotification::Frozen(bank) => write!(f, "Frozen({})", bank.slot()),
            BankNotification::Root(bank) => write!(f, "Root({})", bank.slot()),
        }
    }
}
/// Receiving half of the crossbeam channel that carries `BankNotification`s.
pub type BankNotificationReceiver = Receiver<BankNotification>;
/// Sending half of the crossbeam channel that carries `BankNotification`s.
pub type BankNotificationSender = Sender<BankNotification>;
/// Handle to the background thread that consumes `BankNotification`s and
/// keeps the shared `OptimisticallyConfirmedBank` up to date.
pub struct OptimisticallyConfirmedBankTracker {
    /// Join handle of the thread spawned by `new`.
    thread_hdl: JoinHandle<()>,
}
impl OptimisticallyConfirmedBankTracker {
    pub fn new(
        receiver: BankNotificationReceiver,
        exit: &Arc<AtomicBool>,
        bank_forks: Arc<RwLock<BankForks>>,
        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
        subscriptions: Arc<RpcSubscriptions>,
    ) -> Self {
        let exit_ = exit.clone();
        let mut pending_optimistically_confirmed_banks = HashSet::new();
        let thread_hdl = Builder::new()
            .name("solana-optimistic-bank-tracker".to_string())
            .spawn(move || loop {
                if exit_.load(Ordering::Relaxed) {
                    break;
                }
                if let Err(RecvTimeoutError::Disconnected) = Self::recv_notification(
                    &receiver,
                    &bank_forks,
                    &optimistically_confirmed_bank,
                    &subscriptions,
                    &mut pending_optimistically_confirmed_banks,
                ) {
                    break;
                }
            })
            .unwrap();
        Self { thread_hdl }
    }
    fn recv_notification(
        receiver: &Receiver<BankNotification>,
        bank_forks: &Arc<RwLock<BankForks>>,
        optimistically_confirmed_bank: &Arc<RwLock<OptimisticallyConfirmedBank>>,
        subscriptions: &Arc<RpcSubscriptions>,
        mut pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
    ) -> Result<(), RecvTimeoutError> {
        let notification = receiver.recv_timeout(Duration::from_secs(1))?;
        Self::process_notification(
            notification,
            bank_forks,
            optimistically_confirmed_bank,
            subscriptions,
            &mut pending_optimistically_confirmed_banks,
        );
        Ok(())
    }
    pub(crate) fn process_notification(
        notification: BankNotification,
        bank_forks: &Arc<RwLock<BankForks>>,
        optimistically_confirmed_bank: &Arc<RwLock<OptimisticallyConfirmedBank>>,
        subscriptions: &Arc<RpcSubscriptions>,
        pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
    ) {
        debug!("received bank notification: {:?}", notification);
        match notification {
            BankNotification::OptimisticallyConfirmed(slot) => {
                if let Some(bank) = bank_forks
                    .read()
                    .unwrap()
                    .get(slot)
                    .filter(|b| b.is_frozen())
                {
                    let mut w_optimistically_confirmed_bank =
                        optimistically_confirmed_bank.write().unwrap();
                    if bank.slot() > w_optimistically_confirmed_bank.bank.slot() {
                        w_optimistically_confirmed_bank.bank = bank.clone();
                        subscriptions.notify_gossip_subscribers(slot);
                    }
                    drop(w_optimistically_confirmed_bank);
                } else if slot > bank_forks.read().unwrap().root_bank().slot() {
                    pending_optimistically_confirmed_banks.insert(slot);
                } else {
                    inc_new_counter_info!("dropped-already-rooted-optimistic-bank-notification", 1);
                }
                
                subscriptions.notify_slot_update(SlotUpdate::OptimisticConfirmation {
                    slot,
                    timestamp: timestamp(),
                });
            }
            BankNotification::Frozen(bank) => {
                let frozen_slot = bank.slot();
                if let Some(parent) = bank.parent() {
                    let num_successful_transactions = bank
                        .transaction_count()
                        .saturating_sub(parent.transaction_count());
                    subscriptions.notify_slot_update(SlotUpdate::Frozen {
                        slot: frozen_slot,
                        timestamp: timestamp(),
                        stats: SlotTransactionStats {
                            num_transaction_entries: bank.transaction_entries_count(),
                            num_successful_transactions,
                            num_failed_transactions: bank.transaction_error_count(),
                            max_transactions_per_entry: bank.transactions_per_entry_max(),
                        },
                    });
                }
                if pending_optimistically_confirmed_banks.remove(&bank.slot()) {
                    let mut w_optimistically_confirmed_bank =
                        optimistically_confirmed_bank.write().unwrap();
                    if frozen_slot > w_optimistically_confirmed_bank.bank.slot() {
                        w_optimistically_confirmed_bank.bank = bank;
                        subscriptions.notify_gossip_subscribers(frozen_slot);
                    }
                    drop(w_optimistically_confirmed_bank);
                }
            }
            BankNotification::Root(bank) => {
                let root_slot = bank.slot();
                let mut w_optimistically_confirmed_bank =
                    optimistically_confirmed_bank.write().unwrap();
                if root_slot > w_optimistically_confirmed_bank.bank.slot() {
                    w_optimistically_confirmed_bank.bank = bank;
                }
                drop(w_optimistically_confirmed_bank);
                pending_optimistically_confirmed_banks.retain(|&s| s > root_slot);
            }
        }
    }
    pub fn close(self) -> thread::Result<()> {
        self.join()
    }
    pub fn join(self) -> thread::Result<()> {
        self.thread_hdl.join()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
    use solana_runtime::{
        accounts_background_service::AbsRequestSender, commitment::BlockCommitmentCache,
    };
    use solana_sdk::pubkey::Pubkey;
    /// Drives `process_notification` through a sequence of notifications and
    /// checks the cached bank and the pending-slot set after each step.
    #[test]
    fn test_process_notification() {
        let exit = Arc::new(AtomicBool::new(false));
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
        let bank = Bank::new(&genesis_config);
        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
        // Build a linear chain of banks for slots 0 -> 1 -> 2 -> 3.
        let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
        let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
        bank_forks.write().unwrap().insert(bank1);
        let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
        let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
        bank_forks.write().unwrap().insert(bank2);
        let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
        let bank3 = Bank::new_from_parent(&bank2, &Pubkey::default(), 3);
        bank_forks.write().unwrap().insert(bank3);
        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
        let subscriptions = Arc::new(RpcSubscriptions::new(
            &exit,
            bank_forks.clone(),
            block_commitment_cache,
            optimistically_confirmed_bank.clone(),
        ));
        let mut pending_optimistically_confirmed_banks = HashSet::new();
        // The cache starts at the root bank (slot 0).
        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0);
        // Confirming slot 2 advances the cached bank to slot 2.
        OptimisticallyConfirmedBankTracker::process_notification(
            BankNotification::OptimisticallyConfirmed(2),
            &bank_forks,
            &optimistically_confirmed_bank,
            &subscriptions,
            &mut pending_optimistically_confirmed_banks,
        );
        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
        // A confirmation for a lower slot must not move the cached bank backwards.
        OptimisticallyConfirmedBankTracker::process_notification(
            BankNotification::OptimisticallyConfirmed(1),
            &bank_forks,
            &optimistically_confirmed_bank,
            &subscriptions,
            &mut pending_optimistically_confirmed_banks,
        );
        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
        // Slot 3 is present in bank_forks but is not picked up as a frozen bank
        // here (presumably because nothing has frozen it yet), so it is parked
        // in the pending set instead of being cached.
        OptimisticallyConfirmedBankTracker::process_notification(
            BankNotification::OptimisticallyConfirmed(3),
            &bank_forks,
            &optimistically_confirmed_bank,
            &subscriptions,
            &mut pending_optimistically_confirmed_banks,
        );
        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
        assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
        assert_eq!(pending_optimistically_confirmed_banks.contains(&3), true);
        // A Frozen notification for the pending slot 3 promotes it to the cached bank.
        let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
        OptimisticallyConfirmedBankTracker::process_notification(
            BankNotification::Frozen(bank3),
            &bank_forks,
            &optimistically_confirmed_bank,
            &subscriptions,
            &mut pending_optimistically_confirmed_banks,
        );
        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
        // Add slot 4 and confirm it: like slot 3 earlier, it ends up pending
        // and the cached bank stays at slot 3.
        let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
        let bank4 = Bank::new_from_parent(&bank3, &Pubkey::default(), 4);
        bank_forks.write().unwrap().insert(bank4);
        OptimisticallyConfirmedBankTracker::process_notification(
            BankNotification::OptimisticallyConfirmed(4),
            &bank_forks,
            &optimistically_confirmed_bank,
            &subscriptions,
            &mut pending_optimistically_confirmed_banks,
        );
        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
        assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
        assert_eq!(pending_optimistically_confirmed_banks.contains(&4), true);
        // A Root notification for the higher slot 5 replaces the cached bank
        // and clears pending slots at or below the new root (slot 4 here).
        let bank4 = bank_forks.read().unwrap().get(4).unwrap().clone();
        let bank5 = Bank::new_from_parent(&bank4, &Pubkey::default(), 5);
        bank_forks.write().unwrap().insert(bank5);
        let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
        OptimisticallyConfirmedBankTracker::process_notification(
            BankNotification::Root(bank5),
            &bank_forks,
            &optimistically_confirmed_bank,
            &subscriptions,
            &mut pending_optimistically_confirmed_banks,
        );
        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
        assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
        assert_eq!(pending_optimistically_confirmed_banks.contains(&4), false);
        // Fork slots 6 and 7 off slot 5 and root slot 7. A confirmation for
        // slot 6, which is not above the root, must neither change the cached
        // bank nor be added to the pending set.
        let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
        let bank6 = Bank::new_from_parent(&bank5, &Pubkey::default(), 6);
        bank_forks.write().unwrap().insert(bank6);
        let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
        let bank7 = Bank::new_from_parent(&bank5, &Pubkey::default(), 7);
        bank_forks.write().unwrap().insert(bank7);
        bank_forks
            .write()
            .unwrap()
            .set_root(7, &AbsRequestSender::default(), None);
        OptimisticallyConfirmedBankTracker::process_notification(
            BankNotification::OptimisticallyConfirmed(6),
            &bank_forks,
            &optimistically_confirmed_bank,
            &subscriptions,
            &mut pending_optimistically_confirmed_banks,
        );
        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
        assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
        assert_eq!(pending_optimistically_confirmed_banks.contains(&6), false);
    }
}