1use {
4 clap::{value_t_or_exit, Arg, ArgMatches},
5 solana_accounts_db::{accounts_db, accounts_index},
6 solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
7 solana_core::banking_stage::BankingStage,
8 solana_rayon_threadlimit::get_thread_count,
9 std::{num::NonZeroUsize, ops::RangeInclusive},
10};
11
12pub struct DefaultThreadArgs {
14 pub accounts_db_background_threads: String,
15 pub accounts_db_foreground_threads: String,
16 pub accounts_db_hash_threads: String,
17 pub accounts_index_flush_threads: String,
18 pub block_production_num_workers: String,
19 pub ip_echo_server_threads: String,
20 pub rayon_global_threads: String,
21 pub replay_forks_threads: String,
22 pub replay_transactions_threads: String,
23 pub rocksdb_compaction_threads: String,
24 pub rocksdb_flush_threads: String,
25 pub tpu_transaction_forward_receive_threads: String,
26 pub tpu_transaction_receive_threads: String,
27 pub tpu_vote_transaction_receive_threads: String,
28 pub tvu_receive_threads: String,
29 pub tvu_retransmit_threads: String,
30 pub tvu_sigverify_threads: String,
31}
32
33impl Default for DefaultThreadArgs {
34 fn default() -> Self {
35 Self {
36 accounts_db_background_threads: AccountsDbBackgroundThreadsArg::bounded_default()
37 .to_string(),
38 accounts_db_foreground_threads: AccountsDbForegroundThreadsArg::bounded_default()
39 .to_string(),
40 accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(),
41 accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
42 .to_string(),
43 block_production_num_workers: BankingStage::default_num_workers().to_string(),
44 ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
45 rayon_global_threads: RayonGlobalThreadsArg::bounded_default().to_string(),
46 replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
47 replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
48 .to_string(),
49 rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
50 rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
51 tpu_transaction_forward_receive_threads:
52 TpuTransactionForwardReceiveThreadArgs::bounded_default().to_string(),
53 tpu_transaction_receive_threads: TpuTransactionReceiveThreads::bounded_default()
54 .to_string(),
55 tpu_vote_transaction_receive_threads:
56 TpuVoteTransactionReceiveThreads::bounded_default().to_string(),
57 tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
58 tvu_retransmit_threads: TvuRetransmitThreadsArg::bounded_default().to_string(),
59 tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
60 }
61 }
62}
63
64pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
65 vec![
66 new_thread_arg::<AccountsDbBackgroundThreadsArg>(&defaults.accounts_db_background_threads),
67 new_thread_arg::<AccountsDbForegroundThreadsArg>(&defaults.accounts_db_foreground_threads),
68 new_thread_arg::<AccountsDbHashThreadsArg>(&defaults.accounts_db_hash_threads),
69 new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_index_flush_threads),
70 new_thread_arg::<BlockProductionNumWorkersArg>(&defaults.block_production_num_workers),
71 new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
72 new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
73 new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
74 new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
75 new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
76 new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
77 new_thread_arg::<TpuTransactionForwardReceiveThreadArgs>(
78 &defaults.tpu_transaction_forward_receive_threads,
79 ),
80 new_thread_arg::<TpuTransactionReceiveThreads>(&defaults.tpu_transaction_receive_threads),
81 new_thread_arg::<TpuVoteTransactionReceiveThreads>(
82 &defaults.tpu_vote_transaction_receive_threads,
83 ),
84 new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
85 new_thread_arg::<TvuRetransmitThreadsArg>(&defaults.tvu_retransmit_threads),
86 new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
87 ]
88}
89
90fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
91 Arg::with_name(T::NAME)
92 .long(T::LONG_NAME)
93 .takes_value(true)
94 .value_name("NUMBER")
95 .default_value(default)
96 .validator(|num| is_within_range(num, T::range()))
97 .hidden(hidden_unless_forced())
98 .help(T::HELP)
99}
100
101pub struct NumThreadConfig {
102 pub accounts_db_background_threads: NonZeroUsize,
103 pub accounts_db_foreground_threads: NonZeroUsize,
104 pub accounts_db_hash_threads: NonZeroUsize,
105 pub accounts_index_flush_threads: NonZeroUsize,
106 pub block_production_num_workers: NonZeroUsize,
107 pub ip_echo_server_threads: NonZeroUsize,
108 pub rayon_global_threads: NonZeroUsize,
109 pub replay_forks_threads: NonZeroUsize,
110 pub replay_transactions_threads: NonZeroUsize,
111 pub tpu_transaction_forward_receive_threads: NonZeroUsize,
112 pub tpu_transaction_receive_threads: NonZeroUsize,
113 pub tpu_vote_transaction_receive_threads: NonZeroUsize,
114 pub tvu_receive_threads: NonZeroUsize,
115 pub tvu_retransmit_threads: NonZeroUsize,
116 pub tvu_sigverify_threads: NonZeroUsize,
117}
118
119pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
120 let accounts_db_background_threads = {
121 if matches.is_present("accounts_db_clean_threads") {
122 value_t_or_exit!(matches, "accounts_db_clean_threads", NonZeroUsize)
123 } else {
124 value_t_or_exit!(matches, AccountsDbBackgroundThreadsArg::NAME, NonZeroUsize)
125 }
126 };
127 NumThreadConfig {
128 accounts_db_background_threads,
129 accounts_db_foreground_threads: value_t_or_exit!(
130 matches,
131 AccountsDbForegroundThreadsArg::NAME,
132 NonZeroUsize
133 ),
134 accounts_db_hash_threads: value_t_or_exit!(
135 matches,
136 AccountsDbHashThreadsArg::NAME,
137 NonZeroUsize
138 ),
139 accounts_index_flush_threads: value_t_or_exit!(
140 matches,
141 AccountsIndexFlushThreadsArg::NAME,
142 NonZeroUsize
143 ),
144 block_production_num_workers: value_t_or_exit!(
145 matches,
146 BlockProductionNumWorkersArg::NAME,
147 NonZeroUsize
148 ),
149 ip_echo_server_threads: value_t_or_exit!(
150 matches,
151 IpEchoServerThreadsArg::NAME,
152 NonZeroUsize
153 ),
154 rayon_global_threads: value_t_or_exit!(matches, RayonGlobalThreadsArg::NAME, NonZeroUsize),
155 replay_forks_threads: value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize),
156 replay_transactions_threads: value_t_or_exit!(
157 matches,
158 ReplayTransactionsThreadsArg::NAME,
159 NonZeroUsize
160 ),
161 tpu_transaction_forward_receive_threads: value_t_or_exit!(
162 matches,
163 TpuTransactionForwardReceiveThreadArgs::NAME,
164 NonZeroUsize
165 ),
166 tpu_transaction_receive_threads: value_t_or_exit!(
167 matches,
168 TpuTransactionReceiveThreads::NAME,
169 NonZeroUsize
170 ),
171 tpu_vote_transaction_receive_threads: value_t_or_exit!(
172 matches,
173 TpuVoteTransactionReceiveThreads::NAME,
174 NonZeroUsize
175 ),
176 tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
177 tvu_retransmit_threads: value_t_or_exit!(
178 matches,
179 TvuRetransmitThreadsArg::NAME,
180 NonZeroUsize
181 ),
182 tvu_sigverify_threads: value_t_or_exit!(
183 matches,
184 TvuShredSigverifyThreadsArg::NAME,
185 NonZeroUsize
186 ),
187 }
188}
189
190pub trait ThreadArg {
192 const NAME: &'static str;
194 const LONG_NAME: &'static str;
196 const HELP: &'static str;
198
199 fn default() -> usize;
201 fn bounded_default() -> usize {
205 std::cmp::min(Self::default(), Self::max())
206 }
207 fn min() -> usize {
209 1
210 }
211 fn max() -> usize {
213 num_cpus::get()
215 }
216 fn range() -> RangeInclusive<usize> {
218 RangeInclusive::new(Self::min(), Self::max())
219 }
220}
221
222struct AccountsDbBackgroundThreadsArg;
223impl ThreadArg for AccountsDbBackgroundThreadsArg {
224 const NAME: &'static str = "accounts_db_background_threads";
225 const LONG_NAME: &'static str = "accounts-db-background-threads";
226 const HELP: &'static str = "Number of threads to use for AccountsDb background tasks";
227
228 fn default() -> usize {
229 accounts_db::quarter_thread_count()
230 }
231}
232
233struct AccountsDbForegroundThreadsArg;
234impl ThreadArg for AccountsDbForegroundThreadsArg {
235 const NAME: &'static str = "accounts_db_foreground_threads";
236 const LONG_NAME: &'static str = "accounts-db-foreground-threads";
237 const HELP: &'static str =
238 "Number of threads to use for AccountsDb foreground tasks, e.g. transaction processing";
239
240 fn default() -> usize {
241 accounts_db::default_num_foreground_threads()
242 }
243}
244
245struct AccountsDbHashThreadsArg;
246impl ThreadArg for AccountsDbHashThreadsArg {
247 const NAME: &'static str = "accounts_db_hash_threads";
248 const LONG_NAME: &'static str = "accounts-db-hash-threads";
249 const HELP: &'static str = "Number of threads to use for accounts hash verification at startup";
250
251 fn default() -> usize {
252 accounts_db::default_num_hash_threads().get()
253 }
254}
255
256struct AccountsIndexFlushThreadsArg;
257impl ThreadArg for AccountsIndexFlushThreadsArg {
258 const NAME: &'static str = "accounts_index_flush_threads";
259 const LONG_NAME: &'static str = "accounts-index-flush-threads";
260 const HELP: &'static str = "Number of threads to use for flushing the accounts index";
261
262 fn default() -> usize {
263 accounts_index::default_num_flush_threads().get()
264 }
265}
266
267struct BlockProductionNumWorkersArg;
268impl ThreadArg for BlockProductionNumWorkersArg {
269 const NAME: &'static str = "block_production_num_workers";
270 const LONG_NAME: &'static str = "block-production-num-workers";
271 const HELP: &'static str = "Number of worker threads to use for block production";
272
273 fn default() -> usize {
274 BankingStage::default_num_workers().get()
275 }
276
277 fn min() -> usize {
278 1
279 }
280
281 fn max() -> usize {
282 BankingStage::max_num_workers().get()
283 }
284}
285
286struct IpEchoServerThreadsArg;
287impl ThreadArg for IpEchoServerThreadsArg {
288 const NAME: &'static str = "ip_echo_server_threads";
289 const LONG_NAME: &'static str = "ip-echo-server-threads";
290 const HELP: &'static str = "Number of threads to use for the IP echo server";
291
292 fn default() -> usize {
293 solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
294 }
295 fn min() -> usize {
296 solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get()
297 }
298}
299
300struct RayonGlobalThreadsArg;
301impl ThreadArg for RayonGlobalThreadsArg {
302 const NAME: &'static str = "rayon_global_threads";
303 const LONG_NAME: &'static str = "rayon-global-threads";
304 const HELP: &'static str = "Number of threads to use for the global rayon thread pool";
305
306 fn default() -> usize {
307 num_cpus::get()
308 }
309}
310
311struct ReplayForksThreadsArg;
312impl ThreadArg for ReplayForksThreadsArg {
313 const NAME: &'static str = "replay_forks_threads";
314 const LONG_NAME: &'static str = "replay-forks-threads";
315 const HELP: &'static str = "Number of threads to use for replay of blocks on different forks";
316
317 fn default() -> usize {
318 1
320 }
321 fn max() -> usize {
322 4
325 }
326}
327
328struct ReplayTransactionsThreadsArg;
329impl ThreadArg for ReplayTransactionsThreadsArg {
330 const NAME: &'static str = "replay_transactions_threads";
331 const LONG_NAME: &'static str = "replay-transactions-threads";
332 const HELP: &'static str = "Number of threads to use for transaction replay";
333
334 fn default() -> usize {
335 num_cpus::get()
336 }
337}
338
339pub struct RocksdbCompactionThreadsArg;
340impl ThreadArg for RocksdbCompactionThreadsArg {
341 const NAME: &'static str = "rocksdb_compaction_threads";
342 const LONG_NAME: &'static str = "rocksdb-compaction-threads";
343 const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";
344
345 fn default() -> usize {
346 solana_ledger::blockstore::default_num_compaction_threads().get()
347 }
348}
349
350pub struct RocksdbFlushThreadsArg;
351impl ThreadArg for RocksdbFlushThreadsArg {
352 const NAME: &'static str = "rocksdb_flush_threads";
353 const LONG_NAME: &'static str = "rocksdb-flush-threads";
354 const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";
355
356 fn default() -> usize {
357 solana_ledger::blockstore::default_num_flush_threads().get()
358 }
359}
360
361struct TpuTransactionForwardReceiveThreadArgs;
362impl ThreadArg for TpuTransactionForwardReceiveThreadArgs {
363 const NAME: &'static str = "tpu_transaction_forward_receive_threads";
364 const LONG_NAME: &'static str = "tpu-transaction-forward-receive-threads";
365 const HELP: &'static str =
366 "Number of threads to use for receiving transactions on the TPU fowards port";
367
368 fn default() -> usize {
369 solana_streamer::quic::default_num_tpu_transaction_forward_receive_threads()
370 }
371}
372
373struct TpuTransactionReceiveThreads;
374impl ThreadArg for TpuTransactionReceiveThreads {
375 const NAME: &'static str = "tpu_transaction_receive_threads";
376 const LONG_NAME: &'static str = "tpu-transaction-receive-threads";
377 const HELP: &'static str =
378 "Number of threads to use for receiving transactions on the TPU port";
379
380 fn default() -> usize {
381 solana_streamer::quic::default_num_tpu_transaction_receive_threads()
382 }
383}
384
385struct TpuVoteTransactionReceiveThreads;
386impl ThreadArg for TpuVoteTransactionReceiveThreads {
387 const NAME: &'static str = "tpu_vote_transaction_receive_threads";
388 const LONG_NAME: &'static str = "tpu-vote-transaction-receive-threads";
389 const HELP: &'static str =
390 "Number of threads to use for receiving transactions on the TPU vote port";
391
392 fn default() -> usize {
393 solana_streamer::quic::default_num_tpu_vote_transaction_receive_threads()
394 }
395}
396
397struct TvuReceiveThreadsArg;
398impl ThreadArg for TvuReceiveThreadsArg {
399 const NAME: &'static str = "tvu_receive_threads";
400 const LONG_NAME: &'static str = "tvu-receive-threads";
401 const HELP: &'static str =
402 "Number of threads (and sockets) to use for receiving shreds on the TVU port";
403
404 fn default() -> usize {
405 solana_gossip::cluster_info::DEFAULT_NUM_TVU_RECEIVE_SOCKETS.get()
406 }
407 fn min() -> usize {
408 solana_gossip::cluster_info::MINIMUM_NUM_TVU_RECEIVE_SOCKETS.get()
409 }
410}
411
412struct TvuRetransmitThreadsArg;
413impl ThreadArg for TvuRetransmitThreadsArg {
414 const NAME: &'static str = "tvu_retransmit_threads";
415 const LONG_NAME: &'static str = "tvu-retransmit-threads";
416 const HELP: &'static str = "Number of threads (and sockets) to use for retransmitting shreds";
417
418 fn default() -> usize {
419 solana_gossip::cluster_info::DEFAULT_NUM_TVU_RETRANSMIT_SOCKETS.get()
420 }
421
422 fn min() -> usize {
423 solana_gossip::cluster_info::MINIMUM_NUM_TVU_RETRANSMIT_SOCKETS.get()
424 }
425}
426
427struct TvuShredSigverifyThreadsArg;
428impl ThreadArg for TvuShredSigverifyThreadsArg {
429 const NAME: &'static str = "tvu_shred_sigverify_threads";
430 const LONG_NAME: &'static str = "tvu-shred-sigverify-threads";
431 const HELP: &'static str =
432 "Number of threads to use for performing signature verification of received shreds";
433
434 fn default() -> usize {
435 get_thread_count()
436 }
437}