gemachain_runtime/
bucket_map_holder.rs1use crate::accounts_index::{AccountsIndexConfig, IndexValue};
2use crate::bucket_map_holder_stats::BucketMapHolderStats;
3use crate::in_mem_accounts_index::{InMemAccountsIndex, SlotT};
4use crate::waitable_condvar::WaitableCondvar;
5use gemachain_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
6use gemachain_measure::measure::Measure;
7use gemachain_sdk::clock::SLOT_MS;
8use gemachain_sdk::timing::AtomicInterval;
9use std::fmt::Debug;
10use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13pub type Age = u8;
14
15pub const AGE_MS: u64 = SLOT_MS; pub struct BucketMapHolder<T: IndexValue> {
18 pub disk: Option<BucketMap<SlotT<T>>>,
19
20 pub count_ages_flushed: AtomicUsize,
21 pub age: AtomicU8,
22 pub stats: BucketMapHolderStats,
23
24 age_timer: AtomicInterval,
25
26 pub wait_dirty_or_aged: WaitableCondvar,
28 next_bucket_to_flush: Mutex<usize>,
29 bins: usize,
30
31 _threads: usize,
32
33 pub mem_budget_mb: Option<usize>,
36 ages_to_stay_in_cache: Age,
37
38 startup: AtomicBool,
43}
44
45impl<T: IndexValue> Debug for BucketMapHolder<T> {
46 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 Ok(())
48 }
49}
50
51#[allow(clippy::mutex_atomic)]
52impl<T: IndexValue> BucketMapHolder<T> {
53 pub fn increment_age(&self) {
54 let previous = self.count_ages_flushed.swap(0, Ordering::Acquire);
57 self.age.fetch_add(1, Ordering::Release);
60 assert!(previous >= self.bins); self.wait_dirty_or_aged.notify_all(); }
63
64 pub fn future_age_to_flush(&self) -> Age {
65 self.current_age().wrapping_add(self.ages_to_stay_in_cache)
66 }
67
68 fn has_age_interval_elapsed(&self) -> bool {
69 self.age_timer.should_update(AGE_MS)
71 }
72
73 pub fn get_startup(&self) -> bool {
75 self.startup.load(Ordering::Relaxed)
76 }
77
78 pub fn set_startup(&self, value: bool) {
79 if !value {
80 self.wait_for_idle();
81 }
82 self.startup.store(value, Ordering::Relaxed)
83 }
84
85 pub(crate) fn wait_for_idle(&self) {
86 assert!(self.get_startup());
87 }
88
89 pub fn current_age(&self) -> Age {
90 self.age.load(Ordering::Acquire)
91 }
92
93 pub fn bucket_flushed_at_current_age(&self) {
94 self.count_ages_flushed.fetch_add(1, Ordering::Release);
95 self.maybe_advance_age();
96 }
97
98 pub fn all_buckets_flushed_at_current_age(&self) -> bool {
100 self.count_ages_flushed() >= self.bins
101 }
102
103 pub fn count_ages_flushed(&self) -> usize {
104 self.count_ages_flushed.load(Ordering::Acquire)
105 }
106
107 pub fn maybe_advance_age(&self) -> bool {
108 if self.all_buckets_flushed_at_current_age() && self.has_age_interval_elapsed() {
110 self.increment_age();
111 true
112 } else {
113 false
114 }
115 }
116
117 pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
118 const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
119 let ages_to_stay_in_cache = config
120 .as_ref()
121 .and_then(|config| config.ages_to_stay_in_cache)
122 .unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE);
123
124 let mut bucket_config = BucketMapConfig::new(bins);
125 bucket_config.drives = config.as_ref().and_then(|config| config.drives.clone());
126 let mem_budget_mb = config.as_ref().and_then(|config| config.index_limit_mb);
127 let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config));
129 Self {
130 disk,
131 ages_to_stay_in_cache,
132 count_ages_flushed: AtomicUsize::default(),
133 age: AtomicU8::default(),
134 stats: BucketMapHolderStats::new(bins),
135 wait_dirty_or_aged: WaitableCondvar::default(),
136 next_bucket_to_flush: Mutex::new(0),
137 age_timer: AtomicInterval::default(),
138 bins,
139 startup: AtomicBool::default(),
140 mem_budget_mb,
141 _threads: threads,
142 }
143 }
144
145 pub fn next_bucket_to_flush(&self) -> usize {
148 let mut lock = self.next_bucket_to_flush.lock().unwrap();
151 let result = *lock;
152 *lock = (result + 1) % self.bins;
153 result
154 }
155
156 pub fn background(&self, exit: Arc<AtomicBool>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>) {
158 let bins = in_mem.len();
159 let flush = self.disk.is_some();
160 loop {
161 if self.all_buckets_flushed_at_current_age() {
162 let wait = std::cmp::min(
163 self.age_timer.remaining_until_next_interval(AGE_MS),
164 self.stats.remaining_until_next_interval(),
165 );
166
167 let mut m = Measure::start("wait");
168 self.wait_dirty_or_aged
169 .wait_timeout(Duration::from_millis(wait));
170 m.stop();
171 self.stats
172 .bg_waiting_us
173 .fetch_add(m.as_us(), Ordering::Relaxed);
174 }
175
176 if exit.load(Ordering::Relaxed) {
177 break;
178 }
179
180 self.stats.active_threads.fetch_add(1, Ordering::Relaxed);
181 for _ in 0..bins {
182 if flush {
183 let index = self.next_bucket_to_flush();
184 in_mem[index].flush();
185 }
186 self.stats.report_stats(self);
187 if self.all_buckets_flushed_at_current_age() {
188 break;
189 }
190 }
191 self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
192 }
193 }
194}
195
#[cfg(test)]
pub mod tests {
    use super::*;
    use rayon::prelude::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Instant;

    /// Many threads pulling bucket indices concurrently must visit every bin
    /// the same number of times.
    #[test]
    fn test_next_bucket_to_flush() {
        gemachain_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let visited: Vec<AtomicUsize> = (0..bins).map(|_| AtomicUsize::default()).collect();
        let iterations = bins * 30;
        let threads = bins * 4;
        let expected = threads * iterations / bins;

        (0..threads).into_par_iter().for_each(|_| {
            for _ in 0..iterations {
                let bin = test.next_bucket_to_flush();
                visited[bin].fetch_add(1, Ordering::Relaxed);
            }
        });

        for (bin, count) in visited.iter().enumerate() {
            assert_eq!(count.load(Ordering::Relaxed), expected, "bin: {}", bin);
        }
    }

    /// The age advances by one per `increment_age` call and wraps after 255.
    #[test]
    fn test_age_increment() {
        gemachain_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        for age in 0..513 {
            let expected_age = (age % 256) as Age;
            assert_eq!(test.current_age(), expected_age);

            // Nothing has been counted as flushed yet at this age.
            (0..bins).for_each(|_| {
                assert!(!test.all_buckets_flushed_at_current_age());
            });

            // Bump the counter directly instead of going through
            // `bucket_flushed_at_current_age`, which may itself advance the age.
            test.count_ages_flushed.fetch_add(bins, Ordering::Release);
            test.increment_age();
        }
    }

    /// With a single bin re-flushed as soon as the age advances, the age reached
    /// after 2.5 intervals is expected to be 2.
    #[test]
    fn test_age_time() {
        gemachain_logger::setup();
        let bins = 1;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let threads = 2;
        let time = AGE_MS * 5 / 2;
        let expected = (time / AGE_MS) as Age;
        let now = Instant::now();
        let run_for_ms = time as u128;

        // Mark the only bucket as flushed at age 0.
        test.bucket_flushed_at_current_age();

        (0..threads).into_par_iter().for_each(|_| {
            while now.elapsed().as_millis() < run_for_ms {
                if test.maybe_advance_age() {
                    test.bucket_flushed_at_current_age();
                }
            }
        });

        assert_eq!(test.current_age(), expected);
    }

    /// Flushing every bucket advances the age exactly once, and the flushed
    /// counter starts over afterwards.
    #[test]
    fn test_age_broad() {
        gemachain_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        assert_eq!(test.current_age(), 0);

        for _ in 0..bins {
            assert!(!test.all_buckets_flushed_at_current_age());
            test.bucket_flushed_at_current_age();
        }

        let two_ages = std::time::Duration::from_millis(AGE_MS * 2);
        std::thread::sleep(two_ages);
        test.maybe_advance_age();

        assert_eq!(test.current_age(), 1);
        assert!(!test.all_buckets_flushed_at_current_age());
    }
}