1use {
4 clap::{value_t_or_exit, Arg, ArgMatches},
5 solana_accounts_db::{accounts_db, accounts_index},
6 solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
7 solana_core::banking_stage::BankingStage,
8 solana_rayon_threadlimit::get_thread_count,
9 std::{num::NonZeroUsize, ops::RangeInclusive},
10};
11
/// Default value of every thread-count argument, pre-rendered as a string.
///
/// The values are owned `String`s so that [`thread_args`] can lend them to clap as `&str`
/// defaults for as long as this struct is alive.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DefaultThreadArgs {
    /// Default for `--accounts-db-background-threads`.
    pub accounts_db_background_threads: String,
    /// Default for `--accounts-db-foreground-threads`.
    pub accounts_db_foreground_threads: String,
    /// Default for `--accounts-index-flush-threads`.
    pub accounts_index_flush_threads: String,
    /// Default for `--block-production-num-workers`.
    pub block_production_num_workers: String,
    /// Default for `--ip-echo-server-threads`.
    pub ip_echo_server_threads: String,
    /// Default for `--rayon-global-threads`.
    pub rayon_global_threads: String,
    /// Default for `--replay-forks-threads`.
    pub replay_forks_threads: String,
    /// Default for `--replay-transactions-threads`.
    pub replay_transactions_threads: String,
    /// Default for `--rocksdb-compaction-threads`.
    pub rocksdb_compaction_threads: String,
    /// Default for `--rocksdb-flush-threads`.
    pub rocksdb_flush_threads: String,
    /// Default for `--tpu-transaction-forward-receive-threads`.
    pub tpu_transaction_forward_receive_threads: String,
    /// Default for `--tpu-transaction-receive-threads`.
    pub tpu_transaction_receive_threads: String,
    /// Default for `--tpu-vote-transaction-receive-threads`.
    pub tpu_vote_transaction_receive_threads: String,
    /// Default for `--tvu-receive-threads`.
    pub tvu_receive_threads: String,
    /// Default for `--tvu-retransmit-threads`.
    pub tvu_retransmit_threads: String,
    /// Default for `--tvu-shred-sigverify-threads`.
    pub tvu_sigverify_threads: String,
}
31
32impl Default for DefaultThreadArgs {
33 fn default() -> Self {
34 Self {
35 accounts_db_background_threads: AccountsDbBackgroundThreadsArg::bounded_default()
36 .to_string(),
37 accounts_db_foreground_threads: AccountsDbForegroundThreadsArg::bounded_default()
38 .to_string(),
39 accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
40 .to_string(),
41 block_production_num_workers: BankingStage::default_num_workers().to_string(),
42 ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
43 rayon_global_threads: RayonGlobalThreadsArg::bounded_default().to_string(),
44 replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
45 replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
46 .to_string(),
47 rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
48 rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
49 tpu_transaction_forward_receive_threads:
50 TpuTransactionForwardReceiveThreadArgs::bounded_default().to_string(),
51 tpu_transaction_receive_threads: TpuTransactionReceiveThreads::bounded_default()
52 .to_string(),
53 tpu_vote_transaction_receive_threads:
54 TpuVoteTransactionReceiveThreads::bounded_default().to_string(),
55 tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
56 tvu_retransmit_threads: TvuRetransmitThreadsArg::bounded_default().to_string(),
57 tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
58 }
59 }
60}
61
62pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
63 vec![
64 new_thread_arg::<AccountsDbBackgroundThreadsArg>(&defaults.accounts_db_background_threads),
65 new_thread_arg::<AccountsDbForegroundThreadsArg>(&defaults.accounts_db_foreground_threads),
66 new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_index_flush_threads),
67 new_thread_arg::<BlockProductionNumWorkersArg>(&defaults.block_production_num_workers),
68 new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
69 new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
70 new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
71 new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
72 new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
73 new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
74 new_thread_arg::<TpuTransactionForwardReceiveThreadArgs>(
75 &defaults.tpu_transaction_forward_receive_threads,
76 ),
77 new_thread_arg::<TpuTransactionReceiveThreads>(&defaults.tpu_transaction_receive_threads),
78 new_thread_arg::<TpuVoteTransactionReceiveThreads>(
79 &defaults.tpu_vote_transaction_receive_threads,
80 ),
81 new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
82 new_thread_arg::<TvuRetransmitThreadsArg>(&defaults.tvu_retransmit_threads),
83 new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
84 ]
85}
86
87fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
88 Arg::with_name(T::NAME)
89 .long(T::LONG_NAME)
90 .takes_value(true)
91 .value_name("NUMBER")
92 .default_value(default)
93 .validator(|num| is_within_range(num, T::range()))
94 .hidden(hidden_unless_forced())
95 .help(T::HELP)
96}
97
/// Thread counts produced by [`parse_num_threads_args`], one field per parsed argument.
///
/// Every count is a `NonZeroUsize`, so a value of zero is unrepresentable. The RocksDB
/// compaction/flush arguments are not part of this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NumThreadConfig {
    /// Threads for AccountsDb background tasks.
    pub accounts_db_background_threads: NonZeroUsize,
    /// Threads for AccountsDb foreground tasks.
    pub accounts_db_foreground_threads: NonZeroUsize,
    /// Threads for flushing the accounts index.
    pub accounts_index_flush_threads: NonZeroUsize,
    /// Worker threads for block production.
    pub block_production_num_workers: NonZeroUsize,
    /// Threads for the IP echo server.
    pub ip_echo_server_threads: NonZeroUsize,
    /// Threads for the global rayon thread pool.
    pub rayon_global_threads: NonZeroUsize,
    /// Threads for replay of blocks on different forks.
    pub replay_forks_threads: NonZeroUsize,
    /// Threads for transaction replay.
    pub replay_transactions_threads: NonZeroUsize,
    /// Threads for receiving transactions on the TPU forwards port.
    pub tpu_transaction_forward_receive_threads: NonZeroUsize,
    /// Threads for receiving transactions on the TPU port.
    pub tpu_transaction_receive_threads: NonZeroUsize,
    /// Threads for receiving transactions on the TPU vote port.
    pub tpu_vote_transaction_receive_threads: NonZeroUsize,
    /// Threads (and sockets) for receiving shreds on the TVU port.
    pub tvu_receive_threads: NonZeroUsize,
    /// Threads (and sockets) for retransmitting shreds.
    pub tvu_retransmit_threads: NonZeroUsize,
    /// Threads for signature verification of received shreds.
    pub tvu_sigverify_threads: NonZeroUsize,
}
114
115pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
116 let accounts_db_background_threads = {
117 if matches.is_present("accounts_db_clean_threads") {
118 value_t_or_exit!(matches, "accounts_db_clean_threads", NonZeroUsize)
119 } else {
120 value_t_or_exit!(matches, AccountsDbBackgroundThreadsArg::NAME, NonZeroUsize)
121 }
122 };
123 NumThreadConfig {
124 accounts_db_background_threads,
125 accounts_db_foreground_threads: value_t_or_exit!(
126 matches,
127 AccountsDbForegroundThreadsArg::NAME,
128 NonZeroUsize
129 ),
130 accounts_index_flush_threads: value_t_or_exit!(
131 matches,
132 AccountsIndexFlushThreadsArg::NAME,
133 NonZeroUsize
134 ),
135 block_production_num_workers: value_t_or_exit!(
136 matches,
137 BlockProductionNumWorkersArg::NAME,
138 NonZeroUsize
139 ),
140 ip_echo_server_threads: value_t_or_exit!(
141 matches,
142 IpEchoServerThreadsArg::NAME,
143 NonZeroUsize
144 ),
145 rayon_global_threads: value_t_or_exit!(matches, RayonGlobalThreadsArg::NAME, NonZeroUsize),
146 replay_forks_threads: value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize),
147 replay_transactions_threads: value_t_or_exit!(
148 matches,
149 ReplayTransactionsThreadsArg::NAME,
150 NonZeroUsize
151 ),
152 tpu_transaction_forward_receive_threads: value_t_or_exit!(
153 matches,
154 TpuTransactionForwardReceiveThreadArgs::NAME,
155 NonZeroUsize
156 ),
157 tpu_transaction_receive_threads: value_t_or_exit!(
158 matches,
159 TpuTransactionReceiveThreads::NAME,
160 NonZeroUsize
161 ),
162 tpu_vote_transaction_receive_threads: value_t_or_exit!(
163 matches,
164 TpuVoteTransactionReceiveThreads::NAME,
165 NonZeroUsize
166 ),
167 tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
168 tvu_retransmit_threads: value_t_or_exit!(
169 matches,
170 TvuRetransmitThreadsArg::NAME,
171 NonZeroUsize
172 ),
173 tvu_sigverify_threads: value_t_or_exit!(
174 matches,
175 TvuShredSigverifyThreadsArg::NAME,
176 NonZeroUsize
177 ),
178 }
179}
180
181pub trait ThreadArg {
183 const NAME: &'static str;
185 const LONG_NAME: &'static str;
187 const HELP: &'static str;
189
190 fn default() -> usize;
192 fn bounded_default() -> usize {
196 std::cmp::min(Self::default(), Self::max())
197 }
198 fn min() -> usize {
200 1
201 }
202 fn max() -> usize {
204 num_cpus::get()
206 }
207 fn range() -> RangeInclusive<usize> {
209 RangeInclusive::new(Self::min(), Self::max())
210 }
211}
212
213struct AccountsDbBackgroundThreadsArg;
214impl ThreadArg for AccountsDbBackgroundThreadsArg {
215 const NAME: &'static str = "accounts_db_background_threads";
216 const LONG_NAME: &'static str = "accounts-db-background-threads";
217 const HELP: &'static str = "Number of threads to use for AccountsDb background tasks";
218
219 fn default() -> usize {
220 accounts_db::quarter_thread_count()
221 }
222}
223
224struct AccountsDbForegroundThreadsArg;
225impl ThreadArg for AccountsDbForegroundThreadsArg {
226 const NAME: &'static str = "accounts_db_foreground_threads";
227 const LONG_NAME: &'static str = "accounts-db-foreground-threads";
228 const HELP: &'static str =
229 "Number of threads to use for AccountsDb foreground tasks, e.g. transaction processing";
230
231 fn default() -> usize {
232 accounts_db::default_num_foreground_threads()
233 }
234}
235
236struct AccountsIndexFlushThreadsArg;
237impl ThreadArg for AccountsIndexFlushThreadsArg {
238 const NAME: &'static str = "accounts_index_flush_threads";
239 const LONG_NAME: &'static str = "accounts-index-flush-threads";
240 const HELP: &'static str = "Number of threads to use for flushing the accounts index";
241
242 fn default() -> usize {
243 accounts_index::default_num_flush_threads().get()
244 }
245}
246
247struct BlockProductionNumWorkersArg;
248impl ThreadArg for BlockProductionNumWorkersArg {
249 const NAME: &'static str = "block_production_num_workers";
250 const LONG_NAME: &'static str = "block-production-num-workers";
251 const HELP: &'static str = "Number of worker threads to use for block production";
252
253 fn default() -> usize {
254 BankingStage::default_num_workers().get()
255 }
256
257 fn min() -> usize {
258 1
259 }
260
261 fn max() -> usize {
262 BankingStage::max_num_workers().get()
263 }
264}
265
266struct IpEchoServerThreadsArg;
267impl ThreadArg for IpEchoServerThreadsArg {
268 const NAME: &'static str = "ip_echo_server_threads";
269 const LONG_NAME: &'static str = "ip-echo-server-threads";
270 const HELP: &'static str = "Number of threads to use for the IP echo server";
271
272 fn default() -> usize {
273 solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
274 }
275 fn min() -> usize {
276 solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get()
277 }
278}
279
280struct RayonGlobalThreadsArg;
281impl ThreadArg for RayonGlobalThreadsArg {
282 const NAME: &'static str = "rayon_global_threads";
283 const LONG_NAME: &'static str = "rayon-global-threads";
284 const HELP: &'static str = "Number of threads to use for the global rayon thread pool";
285
286 fn default() -> usize {
287 num_cpus::get()
288 }
289}
290
291struct ReplayForksThreadsArg;
292impl ThreadArg for ReplayForksThreadsArg {
293 const NAME: &'static str = "replay_forks_threads";
294 const LONG_NAME: &'static str = "replay-forks-threads";
295 const HELP: &'static str = "Number of threads to use for replay of blocks on different forks";
296
297 fn default() -> usize {
298 1
300 }
301 fn max() -> usize {
302 4
305 }
306}
307
308struct ReplayTransactionsThreadsArg;
309impl ThreadArg for ReplayTransactionsThreadsArg {
310 const NAME: &'static str = "replay_transactions_threads";
311 const LONG_NAME: &'static str = "replay-transactions-threads";
312 const HELP: &'static str = "Number of threads to use for transaction replay";
313
314 fn default() -> usize {
315 num_cpus::get()
316 }
317}
318
319pub struct RocksdbCompactionThreadsArg;
320impl ThreadArg for RocksdbCompactionThreadsArg {
321 const NAME: &'static str = "rocksdb_compaction_threads";
322 const LONG_NAME: &'static str = "rocksdb-compaction-threads";
323 const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";
324
325 fn default() -> usize {
326 solana_ledger::blockstore::default_num_compaction_threads().get()
327 }
328}
329
330pub struct RocksdbFlushThreadsArg;
331impl ThreadArg for RocksdbFlushThreadsArg {
332 const NAME: &'static str = "rocksdb_flush_threads";
333 const LONG_NAME: &'static str = "rocksdb-flush-threads";
334 const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";
335
336 fn default() -> usize {
337 solana_ledger::blockstore::default_num_flush_threads().get()
338 }
339}
340
341struct TpuTransactionForwardReceiveThreadArgs;
342impl ThreadArg for TpuTransactionForwardReceiveThreadArgs {
343 const NAME: &'static str = "tpu_transaction_forward_receive_threads";
344 const LONG_NAME: &'static str = "tpu-transaction-forward-receive-threads";
345 const HELP: &'static str =
346 "Number of threads to use for receiving transactions on the TPU fowards port";
347
348 fn default() -> usize {
349 solana_streamer::quic::default_num_tpu_transaction_forward_receive_threads()
350 }
351}
352
353struct TpuTransactionReceiveThreads;
354impl ThreadArg for TpuTransactionReceiveThreads {
355 const NAME: &'static str = "tpu_transaction_receive_threads";
356 const LONG_NAME: &'static str = "tpu-transaction-receive-threads";
357 const HELP: &'static str =
358 "Number of threads to use for receiving transactions on the TPU port";
359
360 fn default() -> usize {
361 solana_streamer::quic::default_num_tpu_transaction_receive_threads()
362 }
363}
364
365struct TpuVoteTransactionReceiveThreads;
366impl ThreadArg for TpuVoteTransactionReceiveThreads {
367 const NAME: &'static str = "tpu_vote_transaction_receive_threads";
368 const LONG_NAME: &'static str = "tpu-vote-transaction-receive-threads";
369 const HELP: &'static str =
370 "Number of threads to use for receiving transactions on the TPU vote port";
371
372 fn default() -> usize {
373 solana_streamer::quic::default_num_tpu_vote_transaction_receive_threads()
374 }
375}
376
377struct TvuReceiveThreadsArg;
378impl ThreadArg for TvuReceiveThreadsArg {
379 const NAME: &'static str = "tvu_receive_threads";
380 const LONG_NAME: &'static str = "tvu-receive-threads";
381 const HELP: &'static str =
382 "Number of threads (and sockets) to use for receiving shreds on the TVU port";
383
384 fn default() -> usize {
385 solana_gossip::cluster_info::DEFAULT_NUM_TVU_RECEIVE_SOCKETS.get()
386 }
387 fn min() -> usize {
388 solana_gossip::cluster_info::MINIMUM_NUM_TVU_RECEIVE_SOCKETS.get()
389 }
390}
391
392struct TvuRetransmitThreadsArg;
393impl ThreadArg for TvuRetransmitThreadsArg {
394 const NAME: &'static str = "tvu_retransmit_threads";
395 const LONG_NAME: &'static str = "tvu-retransmit-threads";
396 const HELP: &'static str = "Number of threads (and sockets) to use for retransmitting shreds";
397
398 fn default() -> usize {
399 solana_gossip::cluster_info::DEFAULT_NUM_TVU_RETRANSMIT_SOCKETS.get()
400 }
401
402 fn min() -> usize {
403 solana_gossip::cluster_info::MINIMUM_NUM_TVU_RETRANSMIT_SOCKETS.get()
404 }
405}
406
407struct TvuShredSigverifyThreadsArg;
408impl ThreadArg for TvuShredSigverifyThreadsArg {
409 const NAME: &'static str = "tvu_shred_sigverify_threads";
410 const LONG_NAME: &'static str = "tvu-shred-sigverify-threads";
411 const HELP: &'static str =
412 "Number of threads to use for performing signature verification of received shreds";
413
414 fn default() -> usize {
415 get_thread_count()
416 }
417}