gemachain_runtime/
bucket_map_holder.rs

1use crate::accounts_index::{AccountsIndexConfig, IndexValue};
2use crate::bucket_map_holder_stats::BucketMapHolderStats;
3use crate::in_mem_accounts_index::{InMemAccountsIndex, SlotT};
4use crate::waitable_condvar::WaitableCondvar;
5use gemachain_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
6use gemachain_measure::measure::Measure;
7use gemachain_sdk::clock::SLOT_MS;
8use gemachain_sdk::timing::AtomicInterval;
9use std::fmt::Debug;
10use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13pub type Age = u8;
14
15pub const AGE_MS: u64 = SLOT_MS; // match one age per slot time
16
17pub struct BucketMapHolder<T: IndexValue> {
18    pub disk: Option<BucketMap<SlotT<T>>>,
19
20    pub count_ages_flushed: AtomicUsize,
21    pub age: AtomicU8,
22    pub stats: BucketMapHolderStats,
23
24    age_timer: AtomicInterval,
25
26    // used by bg processing to know when any bucket has become dirty
27    pub wait_dirty_or_aged: WaitableCondvar,
28    next_bucket_to_flush: Mutex<usize>,
29    bins: usize,
30
31    _threads: usize,
32
33    // how much mb are we allowed to keep in the in-mem index?
34    // Rest goes to disk.
35    pub mem_budget_mb: Option<usize>,
36    ages_to_stay_in_cache: Age,
37
38    /// startup is a special time for flush to focus on moving everything to disk as fast and efficiently as possible
39    /// with less thread count limitations. LRU and access patterns are not important. Freeing memory
40    /// and writing to disk in parallel are.
41    /// Note startup is an optimization and is not required for correctness.
42    startup: AtomicBool,
43}
44
45impl<T: IndexValue> Debug for BucketMapHolder<T> {
46    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        Ok(())
48    }
49}
50
51#[allow(clippy::mutex_atomic)]
52impl<T: IndexValue> BucketMapHolder<T> {
53    pub fn increment_age(&self) {
54        // since we are about to change age, there are now 0 buckets that have been flushed at this age
55        // this should happen before the age.fetch_add
56        let previous = self.count_ages_flushed.swap(0, Ordering::Acquire);
57        // fetch_add is defined to wrap.
58        // That's what we want. 0..255, then back to 0.
59        self.age.fetch_add(1, Ordering::Release);
60        assert!(previous >= self.bins); // we should not have increased age before previous age was fully flushed
61        self.wait_dirty_or_aged.notify_all(); // notify all because we can age scan in parallel
62    }
63
64    pub fn future_age_to_flush(&self) -> Age {
65        self.current_age().wrapping_add(self.ages_to_stay_in_cache)
66    }
67
68    fn has_age_interval_elapsed(&self) -> bool {
69        // note that when this returns true, state of age_timer is modified
70        self.age_timer.should_update(AGE_MS)
71    }
72
73    /// used by bg processes to determine # active threads and how aggressively to flush
74    pub fn get_startup(&self) -> bool {
75        self.startup.load(Ordering::Relaxed)
76    }
77
78    pub fn set_startup(&self, value: bool) {
79        if !value {
80            self.wait_for_idle();
81        }
82        self.startup.store(value, Ordering::Relaxed)
83    }
84
85    pub(crate) fn wait_for_idle(&self) {
86        assert!(self.get_startup());
87    }
88
89    pub fn current_age(&self) -> Age {
90        self.age.load(Ordering::Acquire)
91    }
92
93    pub fn bucket_flushed_at_current_age(&self) {
94        self.count_ages_flushed.fetch_add(1, Ordering::Release);
95        self.maybe_advance_age();
96    }
97
98    // have all buckets been flushed at the current age?
99    pub fn all_buckets_flushed_at_current_age(&self) -> bool {
100        self.count_ages_flushed() >= self.bins
101    }
102
103    pub fn count_ages_flushed(&self) -> usize {
104        self.count_ages_flushed.load(Ordering::Acquire)
105    }
106
107    pub fn maybe_advance_age(&self) -> bool {
108        // check has_age_interval_elapsed last as calling it modifies state on success
109        if self.all_buckets_flushed_at_current_age() && self.has_age_interval_elapsed() {
110            self.increment_age();
111            true
112        } else {
113            false
114        }
115    }
116
117    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
118        const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
119        let ages_to_stay_in_cache = config
120            .as_ref()
121            .and_then(|config| config.ages_to_stay_in_cache)
122            .unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE);
123
124        let mut bucket_config = BucketMapConfig::new(bins);
125        bucket_config.drives = config.as_ref().and_then(|config| config.drives.clone());
126        let mem_budget_mb = config.as_ref().and_then(|config| config.index_limit_mb);
127        // only allocate if mem_budget_mb is Some
128        let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config));
129        Self {
130            disk,
131            ages_to_stay_in_cache,
132            count_ages_flushed: AtomicUsize::default(),
133            age: AtomicU8::default(),
134            stats: BucketMapHolderStats::new(bins),
135            wait_dirty_or_aged: WaitableCondvar::default(),
136            next_bucket_to_flush: Mutex::new(0),
137            age_timer: AtomicInterval::default(),
138            bins,
139            startup: AtomicBool::default(),
140            mem_budget_mb,
141            _threads: threads,
142        }
143    }
144
145    // get the next bucket to flush, with the idea that the previous bucket
146    // is perhaps being flushed by another thread already.
147    pub fn next_bucket_to_flush(&self) -> usize {
148        // could be lock-free as an optimization
149        // wrapping is tricky
150        let mut lock = self.next_bucket_to_flush.lock().unwrap();
151        let result = *lock;
152        *lock = (result + 1) % self.bins;
153        result
154    }
155
156    // intended to execute in a bg thread
157    pub fn background(&self, exit: Arc<AtomicBool>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>) {
158        let bins = in_mem.len();
159        let flush = self.disk.is_some();
160        loop {
161            if self.all_buckets_flushed_at_current_age() {
162                let wait = std::cmp::min(
163                    self.age_timer.remaining_until_next_interval(AGE_MS),
164                    self.stats.remaining_until_next_interval(),
165                );
166
167                let mut m = Measure::start("wait");
168                self.wait_dirty_or_aged
169                    .wait_timeout(Duration::from_millis(wait));
170                m.stop();
171                self.stats
172                    .bg_waiting_us
173                    .fetch_add(m.as_us(), Ordering::Relaxed);
174            }
175
176            if exit.load(Ordering::Relaxed) {
177                break;
178            }
179
180            self.stats.active_threads.fetch_add(1, Ordering::Relaxed);
181            for _ in 0..bins {
182                if flush {
183                    let index = self.next_bucket_to_flush();
184                    in_mem[index].flush();
185                }
186                self.stats.report_stats(self);
187                if self.all_buckets_flushed_at_current_age() {
188                    break;
189                }
190            }
191            self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
192        }
193    }
194}
195
#[cfg(test)]
pub mod tests {
    use super::*;
    use rayon::prelude::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Instant;

    /// Holder over `bins` buckets with the default index config and one thread,
    /// which is what every test below uses.
    fn default_holder(bins: usize) -> BucketMapHolder<u64> {
        BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1)
    }

    #[test]
    fn test_next_bucket_to_flush() {
        gemachain_logger::setup();
        let bins = 4;
        let holder = default_holder(bins);
        let iterations = bins * 30;
        let threads = bins * 4;
        // round-robin hand-out means every bin is visited the same number of times
        let expected = threads * iterations / bins;
        let hits: Vec<AtomicUsize> = (0..bins).map(|_| AtomicUsize::default()).collect();

        (0..threads).into_par_iter().for_each(|_| {
            for _ in 0..iterations {
                let bin = holder.next_bucket_to_flush();
                hits[bin].fetch_add(1, Ordering::Relaxed);
            }
        });

        for (bin, count) in hits.iter().enumerate() {
            assert_eq!(count.load(Ordering::Relaxed), expected, "bin: {}", bin);
        }
    }

    #[test]
    fn test_age_increment() {
        gemachain_logger::setup();
        let bins = 4;
        let holder = default_holder(bins);
        for age in 0..513 {
            assert_eq!(holder.current_age(), (age % 256) as Age);

            // nothing has been reported flushed at this age yet
            for _ in 0..bins {
                assert!(!holder.all_buckets_flushed_at_current_age());
                // holder.bucket_flushed_at_current_age() is not called here: depending on
                // timing it could advance the age by itself
            }

            // stand-in for "the timer fired and every bucket was flushed at this age"
            holder.count_ages_flushed.fetch_add(bins, Ordering::Release);
            holder.increment_age();
        }
    }

    #[test]
    fn test_age_time() {
        gemachain_logger::setup();
        let bins = 1;
        let holder = default_holder(bins);
        let threads = 2;
        let time = AGE_MS * 5 / 2;
        let expected = (time / AGE_MS) as Age;
        let start = Instant::now();
        holder.bucket_flushed_at_current_age(); // done with age 0
        (0..threads).into_par_iter().for_each(|_| {
            while start.elapsed().as_millis() < u128::from(time) {
                if holder.maybe_advance_age() {
                    holder.bucket_flushed_at_current_age();
                }
            }
        });
        assert_eq!(holder.current_age(), expected);
    }

    #[test]
    fn test_age_broad() {
        gemachain_logger::setup();
        let bins = 4;
        let holder = default_holder(bins);
        assert_eq!(holder.current_age(), 0);
        for _ in 0..bins {
            assert!(!holder.all_buckets_flushed_at_current_age());
            holder.bucket_flushed_at_current_age();
        }
        std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2));
        holder.maybe_advance_age();
        assert_eq!(holder.current_age(), 1);
        assert!(!holder.all_buckets_flushed_at_current_age());
    }
}
282}