solana_runtime/
bucket_map_holder.rs

use {
    crate::{
        accounts_index::{AccountsIndexConfig, IndexLimitMb, IndexValue},
        bucket_map_holder_stats::BucketMapHolderStats,
        in_mem_accounts_index::InMemAccountsIndex,
        waitable_condvar::WaitableCondvar,
    },
    solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig},
    safecoin_measure::measure::Measure,
    solana_sdk::{
        clock::{Slot, SLOT_MS},
        timing::AtomicInterval,
    },
    std::{
        fmt::Debug,
        sync::{
            atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
            Arc,
        },
        time::Duration,
    },
};

pub type Age = u8;

const AGE_MS: u64 = SLOT_MS; // match one age per slot time

// 10 GB limit for the in-mem index (the value is in MB). In practice, we don't get this
// high; it tunes how aggressively to keep items we expect to use soon in memory.
pub const DEFAULT_DISK_INDEX: Option<usize> = Some(10_000);

pub struct BucketMapHolder<T: IndexValue> {
    pub disk: Option<BucketMap<(Slot, T)>>,

    pub count_buckets_flushed: AtomicUsize,
    pub age: AtomicU8,
    pub stats: BucketMapHolderStats,

    age_timer: AtomicInterval,

    // used by bg processing to know when any bucket has become dirty
    pub wait_dirty_or_aged: Arc<WaitableCondvar>,
    next_bucket_to_flush: AtomicUsize,
    bins: usize,

    pub threads: usize,

    // how many MB are we allowed to keep in the in-mem index?
    // The rest goes to disk.
    pub mem_budget_mb: Option<usize>,

    /// how many ages an item remains in the cache after the last time it is used
    pub ages_to_stay_in_cache: Age,

    /// startup is a special time for flush to focus on moving everything to disk as fast and efficiently as possible
    /// with fewer thread count limitations. LRU and access patterns are not important. Freeing memory
    /// and writing to disk in parallel are.
    /// Note startup is an optimization and is not required for correctness.
    startup: AtomicBool,
}

impl<T: IndexValue> Debug for BucketMapHolder<T> {
    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }
}

#[allow(clippy::mutex_atomic)]
impl<T: IndexValue> BucketMapHolder<T> {
    /// is the accounts index using disk as a backing store?
    pub fn is_disk_index_enabled(&self) -> bool {
        self.disk.is_some()
    }

    pub fn increment_age(&self) {
        // since we are about to change age, there are now 0 buckets that have been flushed at this age
        // this should happen before the age.fetch_add
        // Otherwise, as soon as we increment the age, another thread could detect that the age has
        // moved forward, race us to flush a newly eligible bucket, and increment the flushed count
        // before we swap it out.
        let previous = self.count_buckets_flushed.swap(0, Ordering::AcqRel);
        // fetch_add is defined to wrap.
        // That's what we want. 0..255, then back to 0.
        self.age.fetch_add(1, Ordering::Release);
        assert!(
            previous >= self.bins,
            "previous: {}, bins: {}",
            previous,
            self.bins
        ); // we should not have increased age before the previous age was fully flushed
        self.wait_dirty_or_aged.notify_all(); // notify all because we can age scan in parallel
    }

    pub fn future_age_to_flush(&self) -> Age {
        self.current_age().wrapping_add(self.ages_to_stay_in_cache)
    }
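
    // For example, with ages_to_stay_in_cache = 5 (the default in new()) and current_age = 253,
    // future_age_to_flush() wraps: 253u8.wrapping_add(5) == 2. Code that must tolerate this wrap
    // (e.g., wait_for_idle below) compares ages with == rather than >=.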

    fn has_age_interval_elapsed(&self) -> bool {
        // note that when this returns true, the state of age_timer is modified
        self.age_timer.should_update(self.age_interval_ms())
    }

    /// used by bg processes to determine # of active threads and how aggressively to flush
    pub fn get_startup(&self) -> bool {
        self.startup.load(Ordering::Relaxed)
    }

    /// startup=true causes:
    ///      in mem to act in a way that flushes to disk asap
    /// startup=false is 'normal' operation
    pub fn set_startup(&self, value: bool) {
        if !value {
            self.wait_for_idle();
        }
        self.startup.store(value, Ordering::Relaxed)
    }

    /// return when the bg threads have reached an 'idle' state
    pub(crate) fn wait_for_idle(&self) {
        assert!(self.get_startup());
        if self.disk.is_none() {
            return;
        }

        // once the age has incremented twice, we know that we have made it through scanning
        // all bins since we started waiting, so we are then 'idle'
        let end_age = self.current_age().wrapping_add(2);
        loop {
            self.wait_dirty_or_aged
                .wait_timeout(Duration::from_millis(self.age_interval_ms()));
            if end_age == self.current_age() {
                break;
            }
        }
    }
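
    // Worked example: if current_age is 254 when wait_for_idle() is called, end_age wraps to 0.
    // The first increment may land mid-scan, but by the second increment every bin has been
    // scanned start-to-finish at least once since we began waiting, so exiting then is safe.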

    pub fn current_age(&self) -> Age {
        self.age.load(Ordering::Acquire)
    }

    pub fn bucket_flushed_at_current_age(&self, can_advance_age: bool) {
        let count_buckets_flushed = 1 + self.count_buckets_flushed.fetch_add(1, Ordering::AcqRel);
        if can_advance_age {
            self.maybe_advance_age_internal(
                self.all_buckets_flushed_at_current_age_internal(count_buckets_flushed),
            );
        }
    }

    /// have all buckets been flushed at the current age?
    pub fn all_buckets_flushed_at_current_age(&self) -> bool {
        self.all_buckets_flushed_at_current_age_internal(self.count_buckets_flushed())
    }

    /// have all buckets been flushed at the current age?
    fn all_buckets_flushed_at_current_age_internal(&self, count_buckets_flushed: usize) -> bool {
        count_buckets_flushed >= self.bins
    }

    pub fn count_buckets_flushed(&self) -> usize {
        self.count_buckets_flushed.load(Ordering::Acquire)
    }

    /// if all buckets are flushed at the current age and time has elapsed, then advance age
    pub fn maybe_advance_age(&self) -> bool {
        self.maybe_advance_age_internal(self.all_buckets_flushed_at_current_age())
    }

    /// if all buckets are flushed at the current age and time has elapsed, then advance age
    fn maybe_advance_age_internal(&self, all_buckets_flushed_at_current_age: bool) -> bool {
        // call has_age_interval_elapsed last since calling it modifies state on success
        if all_buckets_flushed_at_current_age && self.has_age_interval_elapsed() {
            self.increment_age();
            true
        } else {
            false
        }
    }

    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
        const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
        let ages_to_stay_in_cache = config
            .as_ref()
            .and_then(|config| config.ages_to_stay_in_cache)
            .unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE);

        let mut bucket_config = BucketMapConfig::new(bins);
        bucket_config.drives = config.as_ref().and_then(|config| config.drives.clone());
        let mem_budget_mb = match config
            .as_ref()
            .map(|config| &config.index_limit_mb)
            .unwrap_or(&IndexLimitMb::Unspecified)
        {
            // creator said to use the disk index with a specific limit
            IndexLimitMb::Limit(mb) => Some(*mb),
            // creator said InMemOnly, so no disk index
            IndexLimitMb::InMemOnly => None,
            // the caller didn't specify whether to use the disk index
            IndexLimitMb::Unspecified => {
                // check the env var if we were not started from a validator
                let mut use_default = true;
                if !config
                    .as_ref()
                    .map(|config| config.started_from_validator)
                    .unwrap_or_default()
                {
                    if let Ok(_limit) = std::env::var("SAFECOIN_TEST_ACCOUNTS_INDEX_MEMORY_LIMIT_MB")
                    {
                        // Note this env var means the opposite of the default. The default now is that
                        // the disk index is on. So, if this env var is set, the mem budget was not set,
                        // and we were NOT started from a validator, then DO NOT allocate disk buckets.
                        // We do not want the env var to have an effect when running the validator
                        // (only tests, benches, etc.).
                        use_default = false;
                    }
                }
                if use_default {
                    // if the validator specifies neither a disk index limit nor in-mem only, then this is the default
                    DEFAULT_DISK_INDEX
                } else {
                    None
                }
            }
        };

        // only allocate if mem_budget_mb is Some
        let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config));
        Self {
            disk,
            ages_to_stay_in_cache,
            count_buckets_flushed: AtomicUsize::default(),
            age: AtomicU8::default(),
            stats: BucketMapHolderStats::new(bins),
            wait_dirty_or_aged: Arc::default(),
            next_bucket_to_flush: AtomicUsize::new(0),
            age_timer: AtomicInterval::default(),
            bins,
            startup: AtomicBool::default(),
            mem_budget_mb,
            threads,
        }
    }
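
    // Illustrative construction (the values here are hypothetical; callers pass their own
    // bin count, config, and thread count):
    //   let config = AccountsIndexConfig {
    //       index_limit_mb: IndexLimitMb::Limit(1_000), // 1 GB in-mem budget, rest on disk
    //       ..AccountsIndexConfig::default()
    //   };
    //   let holder = BucketMapHolder::<u64>::new(4, &Some(config), 1);
    //   assert!(holder.is_disk_index_enabled());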

    // get the next bucket to flush, with the idea that the previous bucket
    // is perhaps being flushed by another thread already.
    pub fn next_bucket_to_flush(&self) -> usize {
        self.next_bucket_to_flush
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |bucket| {
                Some((bucket + 1) % self.bins)
            })
            .unwrap()
    }
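
    // With bins = 4, successive calls return 0, 1, 2, 3, 0, 1, ... even across threads: each
    // atomic fetch_update hands its caller a distinct position in the round-robin sequence
    // (see test_next_bucket_to_flush below).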

    /// prepare for this to be dynamic if necessary
    /// For example, maybe startup has a shorter age interval.
    fn age_interval_ms(&self) -> u64 {
        AGE_MS
    }

    /// return an amount of ms to sleep
    fn throttling_wait_ms_internal(
        &self,
        interval_ms: u64,
        elapsed_ms: u64,
        bins_flushed: u64,
    ) -> Option<u64> {
        let target_percent = 90; // aim to finish in 90% of the allocated time
        let remaining_ms = (interval_ms * target_percent / 100).saturating_sub(elapsed_ms);
        let remaining_bins = (self.bins as u64).saturating_sub(bins_flushed);
        if remaining_bins == 0 || remaining_ms == 0 || elapsed_ms == 0 || bins_flushed == 0 {
            // any of these conditions results in 'do not wait due to progress'
            return None;
        }
        let ms_per_s = 1_000;
        let rate_bins_per_s = bins_flushed * ms_per_s / elapsed_ms;
        let expected_bins_processed_in_remaining_time = rate_bins_per_s * remaining_ms / ms_per_s;
        if expected_bins_processed_in_remaining_time > remaining_bins {
            // wait because we predict we will finish prior to the target
            Some(1)
        } else {
            // do not wait because we predict we will finish after the target
            None
        }
    }
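
    // Worked example, assuming bins = 128 and interval_ms = 400 (SLOT_MS at the default tick
    // configuration): after 40 ms with 127 bins flushed, remaining_ms = 360 - 40 = 320,
    // rate = 127 * 1000 / 40 = 3175 bins/s, and the expected 3175 * 320 / 1000 = 1016 bins
    // dwarf the 1 remaining bin, so we wait (Some(1)). After 356 ms with the same 127 bins
    // flushed, the expected 127 * 1000 / 356 * 4 / 1000 = 1 bin is not > 1, so we do not wait.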

    /// Check progress this age.
    /// Return ms to wait to get closer to the wait target and spread out work over the entire age interval.
    /// Goal is to avoid cpu spikes at the beginning of the age interval.
    fn throttling_wait_ms(&self) -> Option<u64> {
        let interval_ms = self.age_interval_ms();
        let elapsed_ms = self.age_timer.elapsed_ms();
        let bins_flushed = self.count_buckets_flushed() as u64;
        self.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed)
    }

    /// true if this thread can sleep
    fn should_thread_sleep(&self) -> bool {
        let bins_flushed = self.count_buckets_flushed();
        if bins_flushed >= self.bins {
            // all bins flushed, so this thread can sleep
            true
        } else {
            // if at least 1 thread is running for each bin that still needs to be flushed, then this thread can sleep
            let active = self.stats.active_threads.load(Ordering::Relaxed);
            bins_flushed.saturating_add(active as usize) >= self.bins
        }
    }
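
    // For example, with bins = 16, bins_flushed = 10, and 6 active threads, the 6 remaining
    // bins are already covered (10 + 6 >= 16), so an additional thread may sleep. With only
    // 3 active threads (10 + 3 < 16), it should keep working.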

    // intended to execute in a bg thread
    pub fn background(
        &self,
        exit: Arc<AtomicBool>,
        in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
        can_advance_age: bool,
    ) {
        let bins = in_mem.len();
        let flush = self.disk.is_some();
        let mut throttling_wait_ms = None;
        loop {
            if !flush {
                self.wait_dirty_or_aged.wait_timeout(Duration::from_millis(
                    self.stats.remaining_until_next_interval(),
                ));
            } else if self.should_thread_sleep() || throttling_wait_ms.is_some() {
                let mut wait = std::cmp::min(
                    self.age_timer
                        .remaining_until_next_interval(self.age_interval_ms()),
                    self.stats.remaining_until_next_interval(),
                );
                if !can_advance_age {
                    // if this thread cannot advance age, then make sure we don't sleep for 0 ms
                    wait = wait.max(1);
                }
                if let Some(throttling_wait_ms) = throttling_wait_ms {
                    self.stats
                        .bg_throttling_wait_us
                        .fetch_add(throttling_wait_ms * 1000, Ordering::Relaxed);
                    wait = std::cmp::min(throttling_wait_ms, wait);
                }

                let mut m = Measure::start("wait");
                self.wait_dirty_or_aged
                    .wait_timeout(Duration::from_millis(wait));
                m.stop();
                self.stats
                    .bg_waiting_us
                    .fetch_add(m.as_us(), Ordering::Relaxed);
                // likely some time has elapsed. May have been waiting for the age time interval to elapse.
                if can_advance_age {
                    self.maybe_advance_age();
                }
            }
            throttling_wait_ms = None;

            if exit.load(Ordering::Relaxed) {
                break;
            }

            self.stats.active_threads.fetch_add(1, Ordering::Relaxed);
            for _ in 0..bins {
                if flush {
                    let index = self.next_bucket_to_flush();
                    in_mem[index].flush(can_advance_age);
                }
                self.stats.report_stats(self);
                if self.all_buckets_flushed_at_current_age() {
                    break;
                }
                throttling_wait_ms = self.throttling_wait_ms();
                if throttling_wait_ms.is_some() {
                    break;
                }
            }
            self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
        }
    }
}
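
// Illustrative spawn pattern (a hypothetical sketch; the real wiring lives in the accounts
// index, which owns the in-mem bins and the exit flag):
//   let exit = Arc::new(AtomicBool::new(false));
//   let holder = Arc::new(BucketMapHolder::<u64>::new(bins, &config, threads));
//   for i in 0..threads {
//       let (holder, exit, in_mem) = (Arc::clone(&holder), Arc::clone(&exit), in_mem.clone());
//       let can_advance_age = i == 0; // only one thread advances the age (see test comment below)
//       std::thread::spawn(move || holder.background(exit, in_mem, can_advance_age));
//   }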

#[cfg(test)]
pub mod tests {
    use {super::*, rayon::prelude::*, std::time::Instant};

    #[test]
    fn test_next_bucket_to_flush() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let visited = (0..bins)
            .map(|_| AtomicUsize::default())
            .collect::<Vec<_>>();
        let iterations = bins * 30;
        let threads = bins * 4;
        let expected = threads * iterations / bins;

        (0..threads).into_par_iter().for_each(|_| {
            (0..iterations).for_each(|_| {
                let bin = test.next_bucket_to_flush();
                visited[bin].fetch_add(1, Ordering::Relaxed);
            });
        });
        visited.iter().enumerate().for_each(|(bin, visited)| {
            assert_eq!(visited.load(Ordering::Relaxed), expected, "bin: {}", bin)
        });
    }

    #[test]
    fn test_age_increment() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        for age in 0..513 {
            assert_eq!(test.current_age(), (age % 256) as Age);

            // not all bins have been flushed yet at this age
            for _ in 0..bins {
                assert!(!test.all_buckets_flushed_at_current_age());
                // we cannot call test.bucket_flushed_at_current_age() here because, depending on
                // timing, it may fire and advance the age itself
            }

            // this would normally happen once time went off and all buckets had been flushed at the previous age
            test.count_buckets_flushed
                .fetch_add(bins, Ordering::Release);
            test.increment_age();
        }
    }

    #[test]
    fn test_throttle() {
        solana_logger::setup();
        let bins = 128;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let bins = test.bins as u64;
        let interval_ms = test.age_interval_ms();
        // 89% of the time elapsed, all but 1 bin flushed; should not wait since we'll end up right on time
        let elapsed_ms = interval_ms * 89 / 100;
        let bins_flushed = bins - 1;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, None);
        // 10% of the time elapsed, all but 1 bin flushed; should wait
        let elapsed_ms = interval_ms / 10;
        let bins_flushed = bins - 1;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, Some(1));
        // 5% of the time, 8% of the bins; should wait. The target is 90%. These #s roughly work.
        let elapsed_ms = interval_ms * 5 / 100;
        let bins_flushed = bins * 8 / 100;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, Some(1));
        // 11% of the time, 12% of the bins; should NOT wait. The target is 90%. These #s roughly work.
        let elapsed_ms = interval_ms * 11 / 100;
        let bins_flushed = bins * 12 / 100;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, None);
    }

    #[test]
    fn test_disk_index_enabled() {
        let bins = 1;
        let config = AccountsIndexConfig {
            index_limit_mb: IndexLimitMb::Limit(0),
            ..AccountsIndexConfig::default()
        };
        let test = BucketMapHolder::<u64>::new(bins, &Some(config), 1);
        assert!(test.is_disk_index_enabled());
    }

    #[test]
    fn test_age_time() {
        solana_logger::setup();
        let bins = 1;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let threads = 2;
        let time = AGE_MS * 8 / 3;
        let expected = (time / AGE_MS) as Age;
        let now = Instant::now();
        test.bucket_flushed_at_current_age(true); // done with age 0
        (0..threads).into_par_iter().for_each(|_| {
            // This test used to be more strict with time, but in a parallel, multi-test environment,
            // threads sometimes starve and this test intermittently fails. So, give it more time than it should require.
            // This may be aggravated by the strategy of only allowing thread 0 to advance the age.
            while now.elapsed().as_millis() < (time as u128) * 100 {
                if test.maybe_advance_age() {
                    test.bucket_flushed_at_current_age(true);
                }

                if test.current_age() >= expected {
                    break;
                }
            }
        });
        assert!(
            test.current_age() >= expected,
            "{}, {}",
            test.current_age(),
            expected
        );
    }

    #[test]
    fn test_age_broad() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        assert_eq!(test.current_age(), 0);
        for _ in 0..bins {
            assert!(!test.all_buckets_flushed_at_current_age());
            test.bucket_flushed_at_current_age(true);
        }
        std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2));
        test.maybe_advance_age();
        assert_eq!(test.current_age(), 1);
        assert!(!test.all_buckets_flushed_at_current_age());
    }
}