buffett-core 0.1.1

Core library for Bitconch:buffett
use crate::tx_vault::Bank;
use crate::socket_streamer::BlobFetchStage;
use crate::crdt::Crdt;
use replicate_stage::ReplicateStage;
use retransmit_stage::{RetransmitStage, RetransmitStageReturnType};
use crate::service::Service;
use buffett_crypto::signature::Keypair;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use crate::window::SharedWindow;

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum TvuReturnType {
    LeaderRotation(u64),
}

pub struct Tvu {
    replicate_stage: ReplicateStage,
    fetch_stage: BlobFetchStage,
    retransmit_stage: RetransmitStage,
    exit: Arc<AtomicBool>,
}

impl Tvu {
    
    #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
    pub fn new(
        keypair: Arc<Keypair>,
        bank: &Arc<Bank>,
        entry_height: u64,
        crdt: Arc<RwLock<Crdt>>,
        window: SharedWindow,
        replicate_sockets: Vec<UdpSocket>,
        repair_socket: UdpSocket,
        retransmit_socket: UdpSocket,
        ledger_path: Option<&str>,
    ) -> Self {
        let exit = Arc::new(AtomicBool::new(false));

        let repair_socket = Arc::new(repair_socket);
        let mut blob_sockets: Vec<Arc<UdpSocket>> =
            replicate_sockets.into_iter().map(Arc::new).collect();
        blob_sockets.push(repair_socket.clone());
        let (fetch_stage, blob_fetch_receiver) =
            BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
        
        let (retransmit_stage, blob_window_receiver) = RetransmitStage::new(
            &crdt,
            window,
            entry_height,
            Arc::new(retransmit_socket),
            repair_socket,
            blob_fetch_receiver,
        );

        let replicate_stage = ReplicateStage::new(
            keypair,
            bank.clone(),
            crdt,
            blob_window_receiver,
            ledger_path,
            exit.clone(),
        );

        Tvu {
            replicate_stage,
            fetch_stage,
            retransmit_stage,
            exit,
        }
    }

    pub fn exit(&self) -> () {
        self.exit.store(true, Ordering::Relaxed);
    }

    pub fn close(self) -> thread::Result<Option<TvuReturnType>> {
        self.fetch_stage.close();
        self.join()
    }
}

impl Service for Tvu {
    type JoinReturnType = Option<TvuReturnType>;

    fn join(self) -> thread::Result<Option<TvuReturnType>> {
        self.replicate_stage.join()?;
        self.fetch_stage.join()?;
        match self.retransmit_stage.join()? {
            Some(RetransmitStageReturnType::LeaderRotation(entry_height)) => {
                Ok(Some(TvuReturnType::LeaderRotation(entry_height)))
            }
            _ => Ok(None),
        }
    }
}