solana_runtime/
accounts_index_storage.rs1use {
2 crate::{
3 accounts_index::{AccountsIndexConfig, IndexValue},
4 bucket_map_holder::BucketMapHolder,
5 in_mem_accounts_index::InMemAccountsIndex,
6 waitable_condvar::WaitableCondvar,
7 },
8 std::{
9 fmt::Debug,
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc, Mutex,
13 },
14 thread::{Builder, JoinHandle},
15 },
16};
17
18pub struct AccountsIndexStorage<T: IndexValue> {
20 _bg_threads: BgThreads,
21
22 pub storage: Arc<BucketMapHolder<T>>,
23 pub in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
24 exit: Arc<AtomicBool>,
25
26 startup_worker_threads: Mutex<Option<BgThreads>>,
28}
29
30impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
31 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 Ok(())
33 }
34}
35
36struct BgThreads {
38 exit: Arc<AtomicBool>,
39 handles: Option<Vec<JoinHandle<()>>>,
40 wait: Arc<WaitableCondvar>,
41}
42
43impl Drop for BgThreads {
44 fn drop(&mut self) {
45 self.exit.store(true, Ordering::Relaxed);
46 self.wait.notify_all();
47 if let Some(handles) = self.handles.take() {
48 handles
49 .into_iter()
50 .for_each(|handle| handle.join().unwrap());
51 }
52 }
53}
54
55impl BgThreads {
56 fn new<T: IndexValue>(
57 storage: &Arc<BucketMapHolder<T>>,
58 in_mem: &[Arc<InMemAccountsIndex<T>>],
59 threads: usize,
60 can_advance_age: bool,
61 exit: &Arc<AtomicBool>,
62 ) -> Self {
63 let local_exit = Arc::new(AtomicBool::default());
65 let handles = Some(
66 (0..threads)
67 .map(|idx| {
68 let can_advance_age = can_advance_age && idx == 0;
70 let storage_ = Arc::clone(storage);
71 let local_exit_ = Arc::clone(&local_exit);
72 let system_exit_ = Arc::clone(exit);
73 let in_mem_ = in_mem.to_vec();
74
75 Builder::new()
77 .name(format!("solIdxFlusher{idx:02}"))
78 .spawn(move || {
79 storage_.background(
80 vec![local_exit_, system_exit_],
81 in_mem_,
82 can_advance_age,
83 );
84 })
85 .unwrap()
86 })
87 .collect(),
88 );
89
90 BgThreads {
91 exit: local_exit,
92 handles,
93 wait: Arc::clone(&storage.wait_dirty_or_aged),
94 }
95 }
96}
97
/// Mode requested through `AccountsIndexStorage::set_startup`.
///
/// The enum is a plain, field-less tag, so it derives the usual value-type traits
/// (`Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`) to make it loggable and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Startup {
    /// Not in startup: `set_startup` turns the storage's startup flag off, drops any
    /// extra startup threads, and shrinks the in-memory bins.
    Normal,
    /// In startup: `set_startup` turns the storage's startup flag on.
    Startup,
    /// In startup, and `set_startup` additionally spawns an extra batch of background
    /// threads that is kept until `Normal` is requested.
    StartupWithExtraThreads,
}
111
112impl<T: IndexValue> AccountsIndexStorage<T> {
113 pub fn set_startup(&self, startup: Startup) {
118 let value = !matches!(startup, Startup::Normal);
119 if matches!(startup, Startup::StartupWithExtraThreads) {
120 *self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
122 &self.storage,
123 &self.in_mem,
124 Self::num_threads(),
125 false, &self.exit,
127 ));
128 }
129 self.storage.set_startup(value);
130 if !value {
131 *self.startup_worker_threads.lock().unwrap() = None;
134 self.shrink_to_fit();
136 }
137 }
138
139 pub fn get_startup_remaining_items_to_flush_estimate(&self) -> usize {
141 self.storage
142 .disk
143 .as_ref()
144 .map(|_| self.storage.stats.get_remaining_items_to_flush_estimate())
145 .unwrap_or_default()
146 }
147
148 fn shrink_to_fit(&self) {
149 self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
150 }
151
152 fn num_threads() -> usize {
153 std::cmp::max(2, num_cpus::get() / 4)
154 }
155
156 pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, exit: &Arc<AtomicBool>) -> Self {
158 let threads = config
159 .as_ref()
160 .and_then(|config| config.flush_threads)
161 .unwrap_or_else(Self::num_threads);
162
163 let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
164
165 let in_mem = (0..bins)
166 .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
167 .collect::<Vec<_>>();
168
169 Self {
170 _bg_threads: BgThreads::new(&storage, &in_mem, threads, true, exit),
171 storage,
172 in_mem,
173 startup_worker_threads: Mutex::default(),
174 exit: Arc::clone(exit),
175 }
176 }
177}