1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use {
    crate::accountsdb_repl_server::{self, ReplicaSlotConfirmationServer},
    crossbeam_channel::Receiver,
    gemachain_sdk::{clock::Slot, commitment_config::CommitmentLevel},
    std::{
        collections::VecDeque,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, RwLock,
        },
        thread::{self, sleep, Builder, JoinHandle},
        time::Duration,
    },
    tonic,
};

/// The structure modelling the slots eligible for replication and
/// their states.
#[derive(Default, Clone)]
struct ReplicaEligibleSlotSet {
    slot_set: Arc<RwLock<VecDeque<(Slot, CommitmentLevel)>>>,
}

pub(crate) struct ReplicaSlotConfirmationServerImpl {
    eligible_slot_set: ReplicaEligibleSlotSet,
    confirmed_bank_receiver_service: Option<JoinHandle<()>>,
    cleanup_service: Option<JoinHandle<()>>,
    exit_updated_slot_server: Arc<AtomicBool>,
}

impl ReplicaSlotConfirmationServer for ReplicaSlotConfirmationServerImpl {
    fn get_confirmed_slots(
        &self,
        request: &accountsdb_repl_server::ReplicaSlotConfirmationRequest,
    ) -> Result<accountsdb_repl_server::ReplicaSlotConfirmationResponse, tonic::Status> {
        let slot_set = self.eligible_slot_set.slot_set.read().unwrap();
        let updated_slots: Vec<u64> = slot_set
            .iter()
            .filter(|(slot, _)| *slot > request.last_replicated_slot)
            .map(|(slot, _)| *slot)
            .collect();

        Ok(accountsdb_repl_server::ReplicaSlotConfirmationResponse { updated_slots })
    }

    fn join(&mut self) -> thread::Result<()> {
        self.exit_updated_slot_server.store(true, Ordering::Relaxed);
        self.confirmed_bank_receiver_service
            .take()
            .map(JoinHandle::join)
            .unwrap()
            .expect("confirmed_bank_receiver_service");

        self.cleanup_service.take().map(JoinHandle::join).unwrap()
    }
}

const MAX_ELIGIBLE_SLOT_SET_SIZE: usize = 262144;

impl ReplicaSlotConfirmationServerImpl {
    pub fn new(confirmed_bank_receiver: Receiver<Slot>) -> Self {
        let eligible_slot_set = ReplicaEligibleSlotSet::default();
        let exit_updated_slot_server = Arc::new(AtomicBool::new(false));

        Self {
            eligible_slot_set: eligible_slot_set.clone(),
            confirmed_bank_receiver_service: Some(Self::run_confirmed_bank_receiver(
                confirmed_bank_receiver,
                eligible_slot_set.clone(),
                exit_updated_slot_server.clone(),
            )),
            cleanup_service: Some(Self::run_cleanup_service(
                eligible_slot_set,
                MAX_ELIGIBLE_SLOT_SET_SIZE,
                exit_updated_slot_server.clone(),
            )),
            exit_updated_slot_server,
        }
    }

    fn run_confirmed_bank_receiver(
        confirmed_bank_receiver: Receiver<Slot>,
        eligible_slot_set: ReplicaEligibleSlotSet,
        exit: Arc<AtomicBool>,
    ) -> JoinHandle<()> {
        Builder::new()
            .name("confirmed_bank_receiver".to_string())
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    if let Ok(slot) = confirmed_bank_receiver.recv() {
                        let mut slot_set = eligible_slot_set.slot_set.write().unwrap();
                        slot_set.push_back((slot, CommitmentLevel::Confirmed));
                    }
                }
            })
            .unwrap()
    }

    fn run_cleanup_service(
        eligible_slot_set: ReplicaEligibleSlotSet,
        max_set_size: usize,
        exit: Arc<AtomicBool>,
    ) -> JoinHandle<()> {
        Builder::new()
            .name("cleanup_service".to_string())
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    let mut slot_set = eligible_slot_set.slot_set.write().unwrap();
                    let count_to_drain = slot_set.len().saturating_sub(max_set_size);
                    if count_to_drain > 0 {
                        drop(slot_set.drain(..count_to_drain));
                    }
                    drop(slot_set);
                    sleep(Duration::from_millis(200));
                }
            })
            .unwrap()
    }
}