1use {
4 clap::{value_t_or_exit, Arg, ArgMatches},
5 solana_accounts_db::{accounts_db, accounts_index},
6 solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
7 solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
8 std::{num::NonZeroUsize, ops::RangeInclusive},
9};
10
11pub struct DefaultThreadArgs {
13 pub accounts_db_clean_threads: String,
14 pub accounts_db_foreground_threads: String,
15 pub accounts_db_hash_threads: String,
16 pub accounts_index_flush_threads: String,
17 pub ip_echo_server_threads: String,
18 pub rayon_global_threads: String,
19 pub replay_forks_threads: String,
20 pub replay_transactions_threads: String,
21 pub rocksdb_compaction_threads: String,
22 pub rocksdb_flush_threads: String,
23 pub tvu_receive_threads: String,
24 pub tvu_retransmit_threads: String,
25 pub tvu_sigverify_threads: String,
26}
27
28impl Default for DefaultThreadArgs {
29 fn default() -> Self {
30 Self {
31 accounts_db_clean_threads: AccountsDbCleanThreadsArg::bounded_default().to_string(),
32 accounts_db_foreground_threads: AccountsDbForegroundThreadsArg::bounded_default()
33 .to_string(),
34 accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(),
35 accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
36 .to_string(),
37 ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
38 rayon_global_threads: RayonGlobalThreadsArg::bounded_default().to_string(),
39 replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
40 replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
41 .to_string(),
42 rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
43 rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
44 tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
45 tvu_retransmit_threads: TvuRetransmitThreadsArg::bounded_default().to_string(),
46 tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
47 }
48 }
49}
50
51pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
52 vec![
53 new_thread_arg::<AccountsDbCleanThreadsArg>(&defaults.accounts_db_clean_threads),
54 new_thread_arg::<AccountsDbForegroundThreadsArg>(&defaults.accounts_db_foreground_threads),
55 new_thread_arg::<AccountsDbHashThreadsArg>(&defaults.accounts_db_hash_threads),
56 new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_index_flush_threads),
57 new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
58 new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
59 new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
60 new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
61 new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
62 new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
63 new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
64 new_thread_arg::<TvuRetransmitThreadsArg>(&defaults.tvu_retransmit_threads),
65 new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
66 ]
67}
68
69fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
70 Arg::with_name(T::NAME)
71 .long(T::LONG_NAME)
72 .takes_value(true)
73 .value_name("NUMBER")
74 .default_value(default)
75 .validator(|num| is_within_range(num, T::range()))
76 .hidden(hidden_unless_forced())
77 .help(T::HELP)
78}
79
80pub struct NumThreadConfig {
81 pub accounts_db_clean_threads: NonZeroUsize,
82 pub accounts_db_foreground_threads: NonZeroUsize,
83 pub accounts_db_hash_threads: NonZeroUsize,
84 pub accounts_index_flush_threads: NonZeroUsize,
85 pub ip_echo_server_threads: NonZeroUsize,
86 pub rayon_global_threads: NonZeroUsize,
87 pub replay_forks_threads: NonZeroUsize,
88 pub replay_transactions_threads: NonZeroUsize,
89 pub rocksdb_compaction_threads: NonZeroUsize,
90 pub rocksdb_flush_threads: NonZeroUsize,
91 pub tvu_receive_threads: NonZeroUsize,
92 pub tvu_retransmit_threads: NonZeroUsize,
93 pub tvu_sigverify_threads: NonZeroUsize,
94}
95
96pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
97 NumThreadConfig {
98 accounts_db_clean_threads: value_t_or_exit!(
99 matches,
100 AccountsDbCleanThreadsArg::NAME,
101 NonZeroUsize
102 ),
103 accounts_db_foreground_threads: value_t_or_exit!(
104 matches,
105 AccountsDbForegroundThreadsArg::NAME,
106 NonZeroUsize
107 ),
108 accounts_db_hash_threads: value_t_or_exit!(
109 matches,
110 AccountsDbHashThreadsArg::NAME,
111 NonZeroUsize
112 ),
113 accounts_index_flush_threads: value_t_or_exit!(
114 matches,
115 AccountsIndexFlushThreadsArg::NAME,
116 NonZeroUsize
117 ),
118 ip_echo_server_threads: value_t_or_exit!(
119 matches,
120 IpEchoServerThreadsArg::NAME,
121 NonZeroUsize
122 ),
123 rayon_global_threads: value_t_or_exit!(matches, RayonGlobalThreadsArg::NAME, NonZeroUsize),
124 replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
125 NonZeroUsize::new(4).expect("4 is non-zero")
126 } else {
127 value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize)
128 },
129 replay_transactions_threads: value_t_or_exit!(
130 matches,
131 ReplayTransactionsThreadsArg::NAME,
132 NonZeroUsize
133 ),
134 rocksdb_compaction_threads: value_t_or_exit!(
135 matches,
136 RocksdbCompactionThreadsArg::NAME,
137 NonZeroUsize
138 ),
139 rocksdb_flush_threads: value_t_or_exit!(
140 matches,
141 RocksdbFlushThreadsArg::NAME,
142 NonZeroUsize
143 ),
144 tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
145 tvu_retransmit_threads: value_t_or_exit!(
146 matches,
147 TvuRetransmitThreadsArg::NAME,
148 NonZeroUsize
149 ),
150 tvu_sigverify_threads: value_t_or_exit!(
151 matches,
152 TvuShredSigverifyThreadsArg::NAME,
153 NonZeroUsize
154 ),
155 }
156}
157
158trait ThreadArg {
160 const NAME: &'static str;
162 const LONG_NAME: &'static str;
164 const HELP: &'static str;
166
167 fn default() -> usize;
169 fn bounded_default() -> usize {
173 std::cmp::min(Self::default(), Self::max())
174 }
175 fn min() -> usize {
177 1
178 }
179 fn max() -> usize {
181 get_max_thread_count()
183 }
184 fn range() -> RangeInclusive<usize> {
186 RangeInclusive::new(Self::min(), Self::max())
187 }
188}
189
190struct AccountsDbCleanThreadsArg;
191impl ThreadArg for AccountsDbCleanThreadsArg {
192 const NAME: &'static str = "accounts_db_clean_threads";
193 const LONG_NAME: &'static str = "accounts-db-clean-threads";
194 const HELP: &'static str = "Number of threads to use for cleaning AccountsDb";
195
196 fn default() -> usize {
197 accounts_db::quarter_thread_count()
198 }
199}
200
201struct AccountsDbForegroundThreadsArg;
202impl ThreadArg for AccountsDbForegroundThreadsArg {
203 const NAME: &'static str = "accounts_db_foreground_threads";
204 const LONG_NAME: &'static str = "accounts-db-foreground-threads";
205 const HELP: &'static str = "Number of threads to use for AccountsDb block processing";
206
207 fn default() -> usize {
208 accounts_db::default_num_foreground_threads()
209 }
210}
211
212struct AccountsDbHashThreadsArg;
213impl ThreadArg for AccountsDbHashThreadsArg {
214 const NAME: &'static str = "accounts_db_hash_threads";
215 const LONG_NAME: &'static str = "accounts-db-hash-threads";
216 const HELP: &'static str = "Number of threads to use for background accounts hashing";
217
218 fn default() -> usize {
219 accounts_db::default_num_hash_threads().get()
220 }
221}
222
223struct AccountsIndexFlushThreadsArg;
224impl ThreadArg for AccountsIndexFlushThreadsArg {
225 const NAME: &'static str = "accounts_index_flush_threads";
226 const LONG_NAME: &'static str = "accounts-index-flush-threads";
227 const HELP: &'static str = "Number of threads to use for flushing the accounts index";
228
229 fn default() -> usize {
230 accounts_index::default_num_flush_threads().get()
231 }
232}
233
234struct IpEchoServerThreadsArg;
235impl ThreadArg for IpEchoServerThreadsArg {
236 const NAME: &'static str = "ip_echo_server_threads";
237 const LONG_NAME: &'static str = "ip-echo-server-threads";
238 const HELP: &'static str = "Number of threads to use for the IP echo server";
239
240 fn default() -> usize {
241 solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
242 }
243 fn min() -> usize {
244 solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get()
245 }
246}
247
248struct RayonGlobalThreadsArg;
249impl ThreadArg for RayonGlobalThreadsArg {
250 const NAME: &'static str = "rayon_global_threads";
251 const LONG_NAME: &'static str = "rayon-global-threads";
252 const HELP: &'static str = "Number of threads to use for the global rayon thread pool";
253
254 fn default() -> usize {
255 get_max_thread_count()
256 }
257}
258
259struct ReplayForksThreadsArg;
260impl ThreadArg for ReplayForksThreadsArg {
261 const NAME: &'static str = "replay_forks_threads";
262 const LONG_NAME: &'static str = "replay-forks-threads";
263 const HELP: &'static str = "Number of threads to use for replay of blocks on different forks";
264
265 fn default() -> usize {
266 1
268 }
269 fn max() -> usize {
270 4
273 }
274}
275
276struct ReplayTransactionsThreadsArg;
277impl ThreadArg for ReplayTransactionsThreadsArg {
278 const NAME: &'static str = "replay_transactions_threads";
279 const LONG_NAME: &'static str = "replay-transactions-threads";
280 const HELP: &'static str = "Number of threads to use for transaction replay";
281
282 fn default() -> usize {
283 get_max_thread_count()
284 }
285}
286
287struct RocksdbCompactionThreadsArg;
288impl ThreadArg for RocksdbCompactionThreadsArg {
289 const NAME: &'static str = "rocksdb_compaction_threads";
290 const LONG_NAME: &'static str = "rocksdb-compaction-threads";
291 const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";
292
293 fn default() -> usize {
294 solana_ledger::blockstore::default_num_compaction_threads().get()
295 }
296}
297
298struct RocksdbFlushThreadsArg;
299impl ThreadArg for RocksdbFlushThreadsArg {
300 const NAME: &'static str = "rocksdb_flush_threads";
301 const LONG_NAME: &'static str = "rocksdb-flush-threads";
302 const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";
303
304 fn default() -> usize {
305 solana_ledger::blockstore::default_num_flush_threads().get()
306 }
307}
308
309struct TvuReceiveThreadsArg;
310impl ThreadArg for TvuReceiveThreadsArg {
311 const NAME: &'static str = "tvu_receive_threads";
312 const LONG_NAME: &'static str = "tvu-receive-threads";
313 const HELP: &'static str =
314 "Number of threads (and sockets) to use for receiving shreds on the TVU port";
315
316 fn default() -> usize {
317 solana_gossip::cluster_info::DEFAULT_NUM_TVU_RECEIVE_SOCKETS.get()
318 }
319 fn min() -> usize {
320 solana_gossip::cluster_info::MINIMUM_NUM_TVU_RECEIVE_SOCKETS.get()
321 }
322}
323
324struct TvuRetransmitThreadsArg;
325impl ThreadArg for TvuRetransmitThreadsArg {
326 const NAME: &'static str = "tvu_retransmit_threads";
327 const LONG_NAME: &'static str = "tvu-retransmit-threads";
328 const HELP: &'static str = "Number of threads (and sockets) to use for retransmitting shreds";
329
330 fn default() -> usize {
331 solana_gossip::cluster_info::DEFAULT_NUM_TVU_RETRANSMIT_SOCKETS.get()
332 }
333
334 fn min() -> usize {
335 solana_gossip::cluster_info::MINIMUM_NUM_TVU_RETRANSMIT_SOCKETS.get()
336 }
337}
338
339struct TvuShredSigverifyThreadsArg;
340impl ThreadArg for TvuShredSigverifyThreadsArg {
341 const NAME: &'static str = "tvu_shred_sigverify_threads";
342 const LONG_NAME: &'static str = "tvu-shred-sigverify-threads";
343 const HELP: &'static str =
344 "Number of threads to use for performing signature verification of received shreds";
345
346 fn default() -> usize {
347 get_thread_count()
348 }
349}