1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use {
crate::accountsdb_repl_server::{self, ReplicaSlotConfirmationServer},
crossbeam_channel::Receiver,
gemachain_sdk::{clock::Slot, commitment_config::CommitmentLevel},
std::{
collections::VecDeque,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
tonic,
};
#[derive(Default, Clone)]
struct ReplicaEligibleSlotSet {
slot_set: Arc<RwLock<VecDeque<(Slot, CommitmentLevel)>>>,
}
pub(crate) struct ReplicaSlotConfirmationServerImpl {
eligible_slot_set: ReplicaEligibleSlotSet,
confirmed_bank_receiver_service: Option<JoinHandle<()>>,
cleanup_service: Option<JoinHandle<()>>,
exit_updated_slot_server: Arc<AtomicBool>,
}
impl ReplicaSlotConfirmationServer for ReplicaSlotConfirmationServerImpl {
fn get_confirmed_slots(
&self,
request: &accountsdb_repl_server::ReplicaSlotConfirmationRequest,
) -> Result<accountsdb_repl_server::ReplicaSlotConfirmationResponse, tonic::Status> {
let slot_set = self.eligible_slot_set.slot_set.read().unwrap();
let updated_slots: Vec<u64> = slot_set
.iter()
.filter(|(slot, _)| *slot > request.last_replicated_slot)
.map(|(slot, _)| *slot)
.collect();
Ok(accountsdb_repl_server::ReplicaSlotConfirmationResponse { updated_slots })
}
fn join(&mut self) -> thread::Result<()> {
self.exit_updated_slot_server.store(true, Ordering::Relaxed);
self.confirmed_bank_receiver_service
.take()
.map(JoinHandle::join)
.unwrap()
.expect("confirmed_bank_receiver_service");
self.cleanup_service.take().map(JoinHandle::join).unwrap()
}
}
const MAX_ELIGIBLE_SLOT_SET_SIZE: usize = 262144;
impl ReplicaSlotConfirmationServerImpl {
pub fn new(confirmed_bank_receiver: Receiver<Slot>) -> Self {
let eligible_slot_set = ReplicaEligibleSlotSet::default();
let exit_updated_slot_server = Arc::new(AtomicBool::new(false));
Self {
eligible_slot_set: eligible_slot_set.clone(),
confirmed_bank_receiver_service: Some(Self::run_confirmed_bank_receiver(
confirmed_bank_receiver,
eligible_slot_set.clone(),
exit_updated_slot_server.clone(),
)),
cleanup_service: Some(Self::run_cleanup_service(
eligible_slot_set,
MAX_ELIGIBLE_SLOT_SET_SIZE,
exit_updated_slot_server.clone(),
)),
exit_updated_slot_server,
}
}
fn run_confirmed_bank_receiver(
confirmed_bank_receiver: Receiver<Slot>,
eligible_slot_set: ReplicaEligibleSlotSet,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("confirmed_bank_receiver".to_string())
.spawn(move || {
while !exit.load(Ordering::Relaxed) {
if let Ok(slot) = confirmed_bank_receiver.recv() {
let mut slot_set = eligible_slot_set.slot_set.write().unwrap();
slot_set.push_back((slot, CommitmentLevel::Confirmed));
}
}
})
.unwrap()
}
fn run_cleanup_service(
eligible_slot_set: ReplicaEligibleSlotSet,
max_set_size: usize,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("cleanup_service".to_string())
.spawn(move || {
while !exit.load(Ordering::Relaxed) {
let mut slot_set = eligible_slot_set.slot_set.write().unwrap();
let count_to_drain = slot_set.len().saturating_sub(max_set_size);
if count_to_drain > 0 {
drop(slot_set.drain(..count_to_drain));
}
drop(slot_set);
sleep(Duration::from_millis(200));
}
})
.unwrap()
}
}