1use {
2 crate::{
3 accounts_index::{AccountsIndexConfig, IndexLimitMb, IndexValue},
4 bucket_map_holder_stats::BucketMapHolderStats,
5 in_mem_accounts_index::InMemAccountsIndex,
6 waitable_condvar::WaitableCondvar,
7 },
8 solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig},
9 safecoin_measure::measure::Measure,
10 solana_sdk::{
11 clock::{Slot, SLOT_MS},
12 timing::AtomicInterval,
13 },
14 std::{
15 fmt::Debug,
16 sync::{
17 atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
18 Arc,
19 },
20 time::Duration,
21 },
22};
23pub type Age = u8;
24
25const AGE_MS: u64 = SLOT_MS; pub const DEFAULT_DISK_INDEX: Option<usize> = Some(10_000);
29
30pub struct BucketMapHolder<T: IndexValue> {
31 pub disk: Option<BucketMap<(Slot, T)>>,
32
33 pub count_buckets_flushed: AtomicUsize,
34 pub age: AtomicU8,
35 pub stats: BucketMapHolderStats,
36
37 age_timer: AtomicInterval,
38
39 pub wait_dirty_or_aged: Arc<WaitableCondvar>,
41 next_bucket_to_flush: AtomicUsize,
42 bins: usize,
43
44 pub threads: usize,
45
46 pub mem_budget_mb: Option<usize>,
49
50 pub ages_to_stay_in_cache: Age,
52
53 startup: AtomicBool,
58}
59
60impl<T: IndexValue> Debug for BucketMapHolder<T> {
61 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 Ok(())
63 }
64}
65
66#[allow(clippy::mutex_atomic)]
67impl<T: IndexValue> BucketMapHolder<T> {
68 pub fn is_disk_index_enabled(&self) -> bool {
70 self.disk.is_some()
71 }
72
73 pub fn increment_age(&self) {
74 let previous = self.count_buckets_flushed.swap(0, Ordering::AcqRel);
78 self.age.fetch_add(1, Ordering::Release);
81 assert!(
82 previous >= self.bins,
83 "previous: {}, bins: {}",
84 previous,
85 self.bins
86 ); self.wait_dirty_or_aged.notify_all(); }
89
90 pub fn future_age_to_flush(&self) -> Age {
91 self.current_age().wrapping_add(self.ages_to_stay_in_cache)
92 }
93
94 fn has_age_interval_elapsed(&self) -> bool {
95 self.age_timer.should_update(self.age_interval_ms())
97 }
98
99 pub fn get_startup(&self) -> bool {
101 self.startup.load(Ordering::Relaxed)
102 }
103
104 pub fn set_startup(&self, value: bool) {
108 if !value {
109 self.wait_for_idle();
110 }
111 self.startup.store(value, Ordering::Relaxed)
112 }
113
114 pub(crate) fn wait_for_idle(&self) {
116 assert!(self.get_startup());
117 if self.disk.is_none() {
118 return;
119 }
120
121 let end_age = self.current_age().wrapping_add(2);
124 loop {
125 self.wait_dirty_or_aged
126 .wait_timeout(Duration::from_millis(self.age_interval_ms()));
127 if end_age == self.current_age() {
128 break;
129 }
130 }
131 }
132
133 pub fn current_age(&self) -> Age {
134 self.age.load(Ordering::Acquire)
135 }
136
137 pub fn bucket_flushed_at_current_age(&self, can_advance_age: bool) {
138 let count_buckets_flushed = 1 + self.count_buckets_flushed.fetch_add(1, Ordering::AcqRel);
139 if can_advance_age {
140 self.maybe_advance_age_internal(
141 self.all_buckets_flushed_at_current_age_internal(count_buckets_flushed),
142 );
143 }
144 }
145
146 pub fn all_buckets_flushed_at_current_age(&self) -> bool {
148 self.all_buckets_flushed_at_current_age_internal(self.count_buckets_flushed())
149 }
150
151 fn all_buckets_flushed_at_current_age_internal(&self, count_buckets_flushed: usize) -> bool {
153 count_buckets_flushed >= self.bins
154 }
155
156 pub fn count_buckets_flushed(&self) -> usize {
157 self.count_buckets_flushed.load(Ordering::Acquire)
158 }
159
160 pub fn maybe_advance_age(&self) -> bool {
162 self.maybe_advance_age_internal(self.all_buckets_flushed_at_current_age())
163 }
164
165 fn maybe_advance_age_internal(&self, all_buckets_flushed_at_current_age: bool) -> bool {
167 if all_buckets_flushed_at_current_age && self.has_age_interval_elapsed() {
169 self.increment_age();
170 true
171 } else {
172 false
173 }
174 }
175
176 pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
177 const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
178 let ages_to_stay_in_cache = config
179 .as_ref()
180 .and_then(|config| config.ages_to_stay_in_cache)
181 .unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE);
182
183 let mut bucket_config = BucketMapConfig::new(bins);
184 bucket_config.drives = config.as_ref().and_then(|config| config.drives.clone());
185 let mem_budget_mb = match config
186 .as_ref()
187 .map(|config| &config.index_limit_mb)
188 .unwrap_or(&IndexLimitMb::Unspecified)
189 {
190 IndexLimitMb::Limit(mb) => Some(*mb),
192 IndexLimitMb::InMemOnly => None,
194 IndexLimitMb::Unspecified => {
196 let mut use_default = true;
198 if !config
199 .as_ref()
200 .map(|config| config.started_from_validator)
201 .unwrap_or_default()
202 {
203 if let Ok(_limit) = std::env::var("SAFECOIN_TEST_ACCOUNTS_INDEX_MEMORY_LIMIT_MB")
204 {
205 use_default = false;
209 }
210 }
211 if use_default {
212 DEFAULT_DISK_INDEX
214 } else {
215 None
216 }
217 }
218 };
219
220 let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config));
222 Self {
223 disk,
224 ages_to_stay_in_cache,
225 count_buckets_flushed: AtomicUsize::default(),
226 age: AtomicU8::default(),
227 stats: BucketMapHolderStats::new(bins),
228 wait_dirty_or_aged: Arc::default(),
229 next_bucket_to_flush: AtomicUsize::new(0),
230 age_timer: AtomicInterval::default(),
231 bins,
232 startup: AtomicBool::default(),
233 mem_budget_mb,
234 threads,
235 }
236 }
237
238 pub fn next_bucket_to_flush(&self) -> usize {
241 self.next_bucket_to_flush
242 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |bucket| {
243 Some((bucket + 1) % self.bins)
244 })
245 .unwrap()
246 }
247
248 fn age_interval_ms(&self) -> u64 {
251 AGE_MS
252 }
253
254 fn throttling_wait_ms_internal(
256 &self,
257 interval_ms: u64,
258 elapsed_ms: u64,
259 bins_flushed: u64,
260 ) -> Option<u64> {
261 let target_percent = 90; let remaining_ms = (interval_ms * target_percent / 100).saturating_sub(elapsed_ms);
263 let remaining_bins = (self.bins as u64).saturating_sub(bins_flushed);
264 if remaining_bins == 0 || remaining_ms == 0 || elapsed_ms == 0 || bins_flushed == 0 {
265 return None;
267 }
268 let ms_per_s = 1_000;
269 let rate_bins_per_s = bins_flushed * ms_per_s / elapsed_ms;
270 let expected_bins_processed_in_remaining_time = rate_bins_per_s * remaining_ms / ms_per_s;
271 if expected_bins_processed_in_remaining_time > remaining_bins {
272 Some(1)
274 } else {
275 None
277 }
278 }
279
280 fn throttling_wait_ms(&self) -> Option<u64> {
284 let interval_ms = self.age_interval_ms();
285 let elapsed_ms = self.age_timer.elapsed_ms();
286 let bins_flushed = self.count_buckets_flushed() as u64;
287 self.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed)
288 }
289
290 fn should_thread_sleep(&self) -> bool {
292 let bins_flushed = self.count_buckets_flushed();
293 if bins_flushed >= self.bins {
294 true
296 } else {
297 let active = self.stats.active_threads.load(Ordering::Relaxed);
299 bins_flushed.saturating_add(active as usize) >= self.bins
300 }
301 }
302
303 pub fn background(
305 &self,
306 exit: Arc<AtomicBool>,
307 in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
308 can_advance_age: bool,
309 ) {
310 let bins = in_mem.len();
311 let flush = self.disk.is_some();
312 let mut throttling_wait_ms = None;
313 loop {
314 if !flush {
315 self.wait_dirty_or_aged.wait_timeout(Duration::from_millis(
316 self.stats.remaining_until_next_interval(),
317 ));
318 } else if self.should_thread_sleep() || throttling_wait_ms.is_some() {
319 let mut wait = std::cmp::min(
320 self.age_timer
321 .remaining_until_next_interval(self.age_interval_ms()),
322 self.stats.remaining_until_next_interval(),
323 );
324 if !can_advance_age {
325 wait = wait.max(1);
327 }
328 if let Some(throttling_wait_ms) = throttling_wait_ms {
329 self.stats
330 .bg_throttling_wait_us
331 .fetch_add(throttling_wait_ms * 1000, Ordering::Relaxed);
332 wait = std::cmp::min(throttling_wait_ms, wait);
333 }
334
335 let mut m = Measure::start("wait");
336 self.wait_dirty_or_aged
337 .wait_timeout(Duration::from_millis(wait));
338 m.stop();
339 self.stats
340 .bg_waiting_us
341 .fetch_add(m.as_us(), Ordering::Relaxed);
342 if can_advance_age {
344 self.maybe_advance_age();
345 }
346 }
347 throttling_wait_ms = None;
348
349 if exit.load(Ordering::Relaxed) {
350 break;
351 }
352
353 self.stats.active_threads.fetch_add(1, Ordering::Relaxed);
354 for _ in 0..bins {
355 if flush {
356 let index = self.next_bucket_to_flush();
357 in_mem[index].flush(can_advance_age);
358 }
359 self.stats.report_stats(self);
360 if self.all_buckets_flushed_at_current_age() {
361 break;
362 }
363 throttling_wait_ms = self.throttling_wait_ms();
364 if throttling_wait_ms.is_some() {
365 break;
366 }
367 }
368 self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
369 }
370 }
371}
372
#[cfg(test)]
pub mod tests {
    use {super::*, rayon::prelude::*, std::time::Instant};

    /// Many threads pulling bins concurrently must visit every bin equally often.
    #[test]
    fn test_next_bucket_to_flush() {
        solana_logger::setup();
        let bins = 4;
        let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let visited: Vec<AtomicUsize> = (0..bins).map(|_| AtomicUsize::default()).collect();
        let iterations = bins * 30;
        let threads = bins * 4;
        let expected = threads * iterations / bins;

        (0..threads).into_par_iter().for_each(|_| {
            for _ in 0..iterations {
                let bin = holder.next_bucket_to_flush();
                visited[bin].fetch_add(1, Ordering::Relaxed);
            }
        });

        for (bin, count) in visited.iter().enumerate() {
            assert_eq!(count.load(Ordering::Relaxed), expected, "bin: {}", bin);
        }
    }

    /// The age wraps modulo 256 and the flushed count is reset on every increment.
    #[test]
    fn test_age_increment() {
        solana_logger::setup();
        let bins = 4;
        let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        for age in 0..513 {
            assert_eq!(holder.current_age(), (age % 256) as Age);

            // Nothing has been flushed at this age yet.
            (0..bins).for_each(|_| assert!(!holder.all_buckets_flushed_at_current_age()));

            // Mark every bin flushed directly so `increment_age` does not panic.
            holder
                .count_buckets_flushed
                .fetch_add(bins, Ordering::Release);
            holder.increment_age();
        }
    }

    /// Exercises the throttling decision for several elapsed-time / progress mixes.
    #[test]
    fn test_throttle() {
        solana_logger::setup();
        let bins = 128;
        let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let bins = holder.bins as u64;
        let interval_ms = holder.age_interval_ms();

        // (elapsed_ms, bins_flushed, expected result)
        let cases = [
            // 89% of the interval gone, one bin left: no throttling.
            (interval_ms * 89 / 100, bins - 1, None),
            // 10% of the interval gone, one bin left: throttle.
            (interval_ms / 10, bins - 1, Some(1)),
            // 5% of the interval gone, 8% of bins done: throttle.
            (interval_ms * 5 / 100, bins * 8 / 100, Some(1)),
            // 11% of the interval gone, 12% of bins done: no throttling.
            (interval_ms * 11 / 100, bins * 12 / 100, None),
        ];
        for &(elapsed_ms, bins_flushed, expected) in cases.iter() {
            let result = holder.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
            assert_eq!(result, expected);
        }
    }

    /// An explicit limit, even `0`, enables the disk index.
    #[test]
    fn test_disk_index_enabled() {
        let bins = 1;
        let config = AccountsIndexConfig {
            index_limit_mb: IndexLimitMb::Limit(0),
            ..AccountsIndexConfig::default()
        };
        let holder = BucketMapHolder::<u64>::new(bins, &Some(config), 1);
        assert!(holder.is_disk_index_enabled());
    }

    /// With the single bin always flushed, the age must reach `expected` well
    /// within a generous time limit.
    #[test]
    fn test_age_time() {
        solana_logger::setup();
        let bins = 1;
        let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let threads = 2;
        let time = AGE_MS * 8 / 3;
        let expected = (time / AGE_MS) as Age;
        let start = Instant::now();
        // Age 0 is done: its only bin is flushed.
        holder.bucket_flushed_at_current_age(true);
        (0..threads).into_par_iter().for_each(|_| {
            // Allow far more time than strictly needed so thread starvation does not fail the test.
            let limit_ms = (time as u128) * 100;
            while start.elapsed().as_millis() < limit_ms {
                if holder.maybe_advance_age() {
                    holder.bucket_flushed_at_current_age(true);
                }

                if holder.current_age() >= expected {
                    break;
                }
            }
        });
        assert!(
            holder.current_age() >= expected,
            "{}, {}",
            holder.current_age(),
            expected
        );
    }

    /// After all bins are flushed and an interval passes, the age advances once and
    /// the flushed count starts over.
    #[test]
    fn test_age_broad() {
        solana_logger::setup();
        let bins = 4;
        let holder = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        assert_eq!(holder.current_age(), 0);
        for _ in 0..bins {
            assert!(!holder.all_buckets_flushed_at_current_age());
            holder.bucket_flushed_at_current_age(true);
        }
        std::thread::sleep(Duration::from_millis(AGE_MS * 2));
        holder.maybe_advance_age();
        assert_eq!(holder.current_age(), 1);
        assert!(!holder.all_buckets_flushed_at_current_age());
    }
}