solana_runtime/
accounts_index_storage.rs

1use {
2    crate::{
3        accounts_index::{AccountsIndexConfig, IndexValue},
4        bucket_map_holder::BucketMapHolder,
5        in_mem_accounts_index::InMemAccountsIndex,
6        waitable_condvar::WaitableCondvar,
7    },
8    std::{
9        fmt::Debug,
10        sync::{
11            atomic::{AtomicBool, Ordering},
12            Arc, Mutex,
13        },
14        thread::{Builder, JoinHandle},
15    },
16};
17
18/// Manages the lifetime of the background processing threads.
19pub struct AccountsIndexStorage<T: IndexValue> {
20    _bg_threads: BgThreads,
21
22    pub storage: Arc<BucketMapHolder<T>>,
23    pub in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
24    exit: Arc<AtomicBool>,
25
26    /// set_startup(true) creates bg threads which are kept alive until set_startup(false)
27    startup_worker_threads: Mutex<Option<BgThreads>>,
28}
29
30impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
31    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        Ok(())
33    }
34}
35
36/// low-level managing the bg threads
37struct BgThreads {
38    exit: Arc<AtomicBool>,
39    handles: Option<Vec<JoinHandle<()>>>,
40    wait: Arc<WaitableCondvar>,
41}
42
43impl Drop for BgThreads {
44    fn drop(&mut self) {
45        self.exit.store(true, Ordering::Relaxed);
46        self.wait.notify_all();
47        if let Some(handles) = self.handles.take() {
48            handles
49                .into_iter()
50                .for_each(|handle| handle.join().unwrap());
51        }
52    }
53}
54
55impl BgThreads {
56    fn new<T: IndexValue>(
57        storage: &Arc<BucketMapHolder<T>>,
58        in_mem: &[Arc<InMemAccountsIndex<T>>],
59        threads: usize,
60        can_advance_age: bool,
61        exit: &Arc<AtomicBool>,
62    ) -> Self {
63        // stop signal used for THIS batch of bg threads
64        let local_exit = Arc::new(AtomicBool::default());
65        let handles = Some(
66            (0..threads)
67                .map(|idx| {
68                    // the first thread we start is special
69                    let can_advance_age = can_advance_age && idx == 0;
70                    let storage_ = Arc::clone(storage);
71                    let local_exit_ = Arc::clone(&local_exit);
72                    let system_exit_ = Arc::clone(exit);
73                    let in_mem_ = in_mem.to_vec();
74
75                    // note that using rayon here causes us to exhaust # rayon threads and many tests running in parallel deadlock
76                    Builder::new()
77                        .name(format!("solIdxFlusher{idx:02}"))
78                        .spawn(move || {
79                            storage_.background(
80                                vec![local_exit_, system_exit_],
81                                in_mem_,
82                                can_advance_age,
83                            );
84                        })
85                        .unwrap()
86                })
87                .collect(),
88        );
89
90        BgThreads {
91            exit: local_exit,
92            handles,
93            wait: Arc::clone(&storage.wait_dirty_or_aged),
94        }
95    }
96}
97
/// modes the system can be in
///
/// Fieldless and cheap, so it is `Copy`; `Debug`/`PartialEq` let callers log and
/// compare the requested mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Startup {
    /// not startup, but steady state execution
    Normal,
    /// startup (not steady state execution)
    /// requesting 'startup'-like behavior where in-mem acct idx items are flushed asap
    Startup,
    /// startup (not steady state execution)
    /// but also requesting additional threads to be running to flush the acct idx to disk asap
    /// The idea is that the best perf to ssds will be with multiple threads,
    ///  but during steady state, we can't allocate as many threads because we'd starve the rest of the system.
    StartupWithExtraThreads,
}
111
112impl<T: IndexValue> AccountsIndexStorage<T> {
113    /// startup=true causes:
114    ///      in mem to act in a way that flushes to disk asap
115    ///      also creates some additional bg threads to facilitate flushing to disk asap
116    /// startup=false is 'normal' operation
117    pub fn set_startup(&self, startup: Startup) {
118        let value = !matches!(startup, Startup::Normal);
119        if matches!(startup, Startup::StartupWithExtraThreads) {
120            // create some additional bg threads to help get things to the disk index asap
121            *self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
122                &self.storage,
123                &self.in_mem,
124                Self::num_threads(),
125                false, // cannot advance age from any of these threads
126                &self.exit,
127            ));
128        }
129        self.storage.set_startup(value);
130        if !value {
131            // transitioning from startup to !startup (ie. steady state)
132            // shutdown the bg threads
133            *self.startup_worker_threads.lock().unwrap() = None;
134            // maybe shrink hashmaps
135            self.shrink_to_fit();
136        }
137    }
138
139    /// estimate how many items are still needing to be flushed to the disk cache.
140    pub fn get_startup_remaining_items_to_flush_estimate(&self) -> usize {
141        self.storage
142            .disk
143            .as_ref()
144            .map(|_| self.storage.stats.get_remaining_items_to_flush_estimate())
145            .unwrap_or_default()
146    }
147
148    fn shrink_to_fit(&self) {
149        self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
150    }
151
152    fn num_threads() -> usize {
153        std::cmp::max(2, num_cpus::get() / 4)
154    }
155
156    /// allocate BucketMapHolder and InMemAccountsIndex[]
157    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, exit: &Arc<AtomicBool>) -> Self {
158        let threads = config
159            .as_ref()
160            .and_then(|config| config.flush_threads)
161            .unwrap_or_else(Self::num_threads);
162
163        let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
164
165        let in_mem = (0..bins)
166            .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
167            .collect::<Vec<_>>();
168
169        Self {
170            _bg_threads: BgThreads::new(&storage, &in_mem, threads, true, exit),
171            storage,
172            in_mem,
173            startup_worker_threads: Mutex::default(),
174            exit: Arc::clone(exit),
175        }
176    }
177}