agave_validator/cli/
thread_args.rs

1//! Arguments for controlling the number of threads allocated for various tasks
2
3use {
4    clap::{value_t_or_exit, Arg, ArgMatches},
5    solana_accounts_db::{accounts_db, accounts_index},
6    solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
7    solana_core::banking_stage::BankingStage,
8    solana_rayon_threadlimit::get_thread_count,
9    std::{num::NonZeroUsize, ops::RangeInclusive},
10};
11
/// String-rendered default values for the thread-count arguments.
///
/// CLAP's `default_value` borrows a `&str` that has to live as long as the `Arg` built from
/// it, so the rendered defaults are owned here and borrowed by `thread_args`.
pub struct DefaultThreadArgs {
    /// Default for `--accounts-db-background-threads`
    pub accounts_db_background_threads: String,
    /// Default for `--accounts-db-foreground-threads`
    pub accounts_db_foreground_threads: String,
    /// Default for `--accounts-index-flush-threads`
    pub accounts_index_flush_threads: String,
    /// Default for `--block-production-num-workers`
    pub block_production_num_workers: String,
    /// Default for `--ip-echo-server-threads`
    pub ip_echo_server_threads: String,
    /// Default for `--rayon-global-threads`
    pub rayon_global_threads: String,
    /// Default for `--replay-forks-threads`
    pub replay_forks_threads: String,
    /// Default for `--replay-transactions-threads`
    pub replay_transactions_threads: String,
    /// Default for `--rocksdb-compaction-threads`
    pub rocksdb_compaction_threads: String,
    /// Default for `--rocksdb-flush-threads`
    pub rocksdb_flush_threads: String,
    /// Default for `--tpu-transaction-forward-receive-threads`
    pub tpu_transaction_forward_receive_threads: String,
    /// Default for `--tpu-transaction-receive-threads`
    pub tpu_transaction_receive_threads: String,
    /// Default for `--tpu-vote-transaction-receive-threads`
    pub tpu_vote_transaction_receive_threads: String,
    /// Default for `--tvu-receive-threads`
    pub tvu_receive_threads: String,
    /// Default for `--tvu-retransmit-threads`
    pub tvu_retransmit_threads: String,
    /// Default for `--tvu-shred-sigverify-threads`
    pub tvu_sigverify_threads: String,
}
31
32impl Default for DefaultThreadArgs {
33    fn default() -> Self {
34        Self {
35            accounts_db_background_threads: AccountsDbBackgroundThreadsArg::bounded_default()
36                .to_string(),
37            accounts_db_foreground_threads: AccountsDbForegroundThreadsArg::bounded_default()
38                .to_string(),
39            accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
40                .to_string(),
41            block_production_num_workers: BankingStage::default_num_workers().to_string(),
42            ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
43            rayon_global_threads: RayonGlobalThreadsArg::bounded_default().to_string(),
44            replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
45            replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
46                .to_string(),
47            rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
48            rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
49            tpu_transaction_forward_receive_threads:
50                TpuTransactionForwardReceiveThreadArgs::bounded_default().to_string(),
51            tpu_transaction_receive_threads: TpuTransactionReceiveThreads::bounded_default()
52                .to_string(),
53            tpu_vote_transaction_receive_threads:
54                TpuVoteTransactionReceiveThreads::bounded_default().to_string(),
55            tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
56            tvu_retransmit_threads: TvuRetransmitThreadsArg::bounded_default().to_string(),
57            tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
58        }
59    }
60}
61
62pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
63    vec![
64        new_thread_arg::<AccountsDbBackgroundThreadsArg>(&defaults.accounts_db_background_threads),
65        new_thread_arg::<AccountsDbForegroundThreadsArg>(&defaults.accounts_db_foreground_threads),
66        new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_index_flush_threads),
67        new_thread_arg::<BlockProductionNumWorkersArg>(&defaults.block_production_num_workers),
68        new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
69        new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
70        new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
71        new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
72        new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
73        new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
74        new_thread_arg::<TpuTransactionForwardReceiveThreadArgs>(
75            &defaults.tpu_transaction_forward_receive_threads,
76        ),
77        new_thread_arg::<TpuTransactionReceiveThreads>(&defaults.tpu_transaction_receive_threads),
78        new_thread_arg::<TpuVoteTransactionReceiveThreads>(
79            &defaults.tpu_vote_transaction_receive_threads,
80        ),
81        new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
82        new_thread_arg::<TvuRetransmitThreadsArg>(&defaults.tvu_retransmit_threads),
83        new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
84    ]
85}
86
87fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
88    Arg::with_name(T::NAME)
89        .long(T::LONG_NAME)
90        .takes_value(true)
91        .value_name("NUMBER")
92        .default_value(default)
93        .validator(|num| is_within_range(num, T::range()))
94        .hidden(hidden_unless_forced())
95        .help(T::HELP)
96}
97
/// Thread counts parsed from the command line, one non-zero value per configurable task.
pub struct NumThreadConfig {
    /// Threads for AccountsDb background tasks
    pub accounts_db_background_threads: NonZeroUsize,
    /// Threads for AccountsDb foreground tasks
    pub accounts_db_foreground_threads: NonZeroUsize,
    /// Threads for flushing the accounts index
    pub accounts_index_flush_threads: NonZeroUsize,
    /// Worker threads for block production
    pub block_production_num_workers: NonZeroUsize,
    /// Threads for the IP echo server
    pub ip_echo_server_threads: NonZeroUsize,
    /// Threads for the global rayon thread pool
    pub rayon_global_threads: NonZeroUsize,
    /// Threads for replaying blocks on different forks
    pub replay_forks_threads: NonZeroUsize,
    /// Threads for transaction replay
    pub replay_transactions_threads: NonZeroUsize,
    /// Threads for receiving transactions on the TPU forwards port
    pub tpu_transaction_forward_receive_threads: NonZeroUsize,
    /// Threads for receiving transactions on the TPU port
    pub tpu_transaction_receive_threads: NonZeroUsize,
    /// Threads for receiving transactions on the TPU vote port
    pub tpu_vote_transaction_receive_threads: NonZeroUsize,
    /// Threads (and sockets) for receiving shreds on the TVU port
    pub tvu_receive_threads: NonZeroUsize,
    /// Threads (and sockets) for retransmitting shreds
    pub tvu_retransmit_threads: NonZeroUsize,
    /// Threads for signature verification of received shreds
    pub tvu_sigverify_threads: NonZeroUsize,
}
114
115pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
116    let accounts_db_background_threads = {
117        if matches.is_present("accounts_db_clean_threads") {
118            value_t_or_exit!(matches, "accounts_db_clean_threads", NonZeroUsize)
119        } else {
120            value_t_or_exit!(matches, AccountsDbBackgroundThreadsArg::NAME, NonZeroUsize)
121        }
122    };
123    NumThreadConfig {
124        accounts_db_background_threads,
125        accounts_db_foreground_threads: value_t_or_exit!(
126            matches,
127            AccountsDbForegroundThreadsArg::NAME,
128            NonZeroUsize
129        ),
130        accounts_index_flush_threads: value_t_or_exit!(
131            matches,
132            AccountsIndexFlushThreadsArg::NAME,
133            NonZeroUsize
134        ),
135        block_production_num_workers: value_t_or_exit!(
136            matches,
137            BlockProductionNumWorkersArg::NAME,
138            NonZeroUsize
139        ),
140        ip_echo_server_threads: value_t_or_exit!(
141            matches,
142            IpEchoServerThreadsArg::NAME,
143            NonZeroUsize
144        ),
145        rayon_global_threads: value_t_or_exit!(matches, RayonGlobalThreadsArg::NAME, NonZeroUsize),
146        replay_forks_threads: value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize),
147        replay_transactions_threads: value_t_or_exit!(
148            matches,
149            ReplayTransactionsThreadsArg::NAME,
150            NonZeroUsize
151        ),
152        tpu_transaction_forward_receive_threads: value_t_or_exit!(
153            matches,
154            TpuTransactionForwardReceiveThreadArgs::NAME,
155            NonZeroUsize
156        ),
157        tpu_transaction_receive_threads: value_t_or_exit!(
158            matches,
159            TpuTransactionReceiveThreads::NAME,
160            NonZeroUsize
161        ),
162        tpu_vote_transaction_receive_threads: value_t_or_exit!(
163            matches,
164            TpuVoteTransactionReceiveThreads::NAME,
165            NonZeroUsize
166        ),
167        tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
168        tvu_retransmit_threads: value_t_or_exit!(
169            matches,
170            TvuRetransmitThreadsArg::NAME,
171            NonZeroUsize
172        ),
173        tvu_sigverify_threads: value_t_or_exit!(
174            matches,
175            TvuShredSigverifyThreadsArg::NAME,
176            NonZeroUsize
177        ),
178    }
179}
180
181/// Configuration for CLAP arguments that control the number of threads for various functions
182pub trait ThreadArg {
183    /// The argument's name
184    const NAME: &'static str;
185    /// The argument's long name
186    const LONG_NAME: &'static str;
187    /// The argument's help message
188    const HELP: &'static str;
189
190    /// The default number of threads
191    fn default() -> usize;
192    /// The default number of threads, bounded by Self::max()
193    /// This prevents potential CLAP issues on low core count machines where
194    /// a fixed value in Self::default() could be greater than Self::max()
195    fn bounded_default() -> usize {
196        std::cmp::min(Self::default(), Self::max())
197    }
198    /// The minimum allowed number of threads (inclusive)
199    fn min() -> usize {
200        1
201    }
202    /// The maximum allowed number of threads (inclusive)
203    fn max() -> usize {
204        // By default, no thread pool should scale over the number of the machine's threads
205        num_cpus::get()
206    }
207    /// The range of allowed number of threads (inclusive on both ends)
208    fn range() -> RangeInclusive<usize> {
209        RangeInclusive::new(Self::min(), Self::max())
210    }
211}
212
213struct AccountsDbBackgroundThreadsArg;
214impl ThreadArg for AccountsDbBackgroundThreadsArg {
215    const NAME: &'static str = "accounts_db_background_threads";
216    const LONG_NAME: &'static str = "accounts-db-background-threads";
217    const HELP: &'static str = "Number of threads to use for AccountsDb background tasks";
218
219    fn default() -> usize {
220        accounts_db::quarter_thread_count()
221    }
222}
223
224struct AccountsDbForegroundThreadsArg;
225impl ThreadArg for AccountsDbForegroundThreadsArg {
226    const NAME: &'static str = "accounts_db_foreground_threads";
227    const LONG_NAME: &'static str = "accounts-db-foreground-threads";
228    const HELP: &'static str =
229        "Number of threads to use for AccountsDb foreground tasks, e.g. transaction processing";
230
231    fn default() -> usize {
232        accounts_db::default_num_foreground_threads()
233    }
234}
235
236struct AccountsIndexFlushThreadsArg;
237impl ThreadArg for AccountsIndexFlushThreadsArg {
238    const NAME: &'static str = "accounts_index_flush_threads";
239    const LONG_NAME: &'static str = "accounts-index-flush-threads";
240    const HELP: &'static str = "Number of threads to use for flushing the accounts index";
241
242    fn default() -> usize {
243        accounts_index::default_num_flush_threads().get()
244    }
245}
246
247struct BlockProductionNumWorkersArg;
248impl ThreadArg for BlockProductionNumWorkersArg {
249    const NAME: &'static str = "block_production_num_workers";
250    const LONG_NAME: &'static str = "block-production-num-workers";
251    const HELP: &'static str = "Number of worker threads to use for block production";
252
253    fn default() -> usize {
254        BankingStage::default_num_workers().get()
255    }
256
257    fn min() -> usize {
258        1
259    }
260
261    fn max() -> usize {
262        BankingStage::max_num_workers().get()
263    }
264}
265
266struct IpEchoServerThreadsArg;
267impl ThreadArg for IpEchoServerThreadsArg {
268    const NAME: &'static str = "ip_echo_server_threads";
269    const LONG_NAME: &'static str = "ip-echo-server-threads";
270    const HELP: &'static str = "Number of threads to use for the IP echo server";
271
272    fn default() -> usize {
273        solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
274    }
275    fn min() -> usize {
276        solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get()
277    }
278}
279
280struct RayonGlobalThreadsArg;
281impl ThreadArg for RayonGlobalThreadsArg {
282    const NAME: &'static str = "rayon_global_threads";
283    const LONG_NAME: &'static str = "rayon-global-threads";
284    const HELP: &'static str = "Number of threads to use for the global rayon thread pool";
285
286    fn default() -> usize {
287        num_cpus::get()
288    }
289}
290
291struct ReplayForksThreadsArg;
292impl ThreadArg for ReplayForksThreadsArg {
293    const NAME: &'static str = "replay_forks_threads";
294    const LONG_NAME: &'static str = "replay-forks-threads";
295    const HELP: &'static str = "Number of threads to use for replay of blocks on different forks";
296
297    fn default() -> usize {
298        // Default to single threaded fork execution
299        1
300    }
301    fn max() -> usize {
302        // Choose a value that is small enough to limit the overhead of having a large thread pool
303        // while also being large enough to allow replay of all active forks in most scenarios
304        4
305    }
306}
307
308struct ReplayTransactionsThreadsArg;
309impl ThreadArg for ReplayTransactionsThreadsArg {
310    const NAME: &'static str = "replay_transactions_threads";
311    const LONG_NAME: &'static str = "replay-transactions-threads";
312    const HELP: &'static str = "Number of threads to use for transaction replay";
313
314    fn default() -> usize {
315        num_cpus::get()
316    }
317}
318
319pub struct RocksdbCompactionThreadsArg;
320impl ThreadArg for RocksdbCompactionThreadsArg {
321    const NAME: &'static str = "rocksdb_compaction_threads";
322    const LONG_NAME: &'static str = "rocksdb-compaction-threads";
323    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";
324
325    fn default() -> usize {
326        solana_ledger::blockstore::default_num_compaction_threads().get()
327    }
328}
329
330pub struct RocksdbFlushThreadsArg;
331impl ThreadArg for RocksdbFlushThreadsArg {
332    const NAME: &'static str = "rocksdb_flush_threads";
333    const LONG_NAME: &'static str = "rocksdb-flush-threads";
334    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";
335
336    fn default() -> usize {
337        solana_ledger::blockstore::default_num_flush_threads().get()
338    }
339}
340
341struct TpuTransactionForwardReceiveThreadArgs;
342impl ThreadArg for TpuTransactionForwardReceiveThreadArgs {
343    const NAME: &'static str = "tpu_transaction_forward_receive_threads";
344    const LONG_NAME: &'static str = "tpu-transaction-forward-receive-threads";
345    const HELP: &'static str =
346        "Number of threads to use for receiving transactions on the TPU fowards port";
347
348    fn default() -> usize {
349        solana_streamer::quic::default_num_tpu_transaction_forward_receive_threads()
350    }
351}
352
353struct TpuTransactionReceiveThreads;
354impl ThreadArg for TpuTransactionReceiveThreads {
355    const NAME: &'static str = "tpu_transaction_receive_threads";
356    const LONG_NAME: &'static str = "tpu-transaction-receive-threads";
357    const HELP: &'static str =
358        "Number of threads to use for receiving transactions on the TPU port";
359
360    fn default() -> usize {
361        solana_streamer::quic::default_num_tpu_transaction_receive_threads()
362    }
363}
364
365struct TpuVoteTransactionReceiveThreads;
366impl ThreadArg for TpuVoteTransactionReceiveThreads {
367    const NAME: &'static str = "tpu_vote_transaction_receive_threads";
368    const LONG_NAME: &'static str = "tpu-vote-transaction-receive-threads";
369    const HELP: &'static str =
370        "Number of threads to use for receiving transactions on the TPU vote port";
371
372    fn default() -> usize {
373        solana_streamer::quic::default_num_tpu_vote_transaction_receive_threads()
374    }
375}
376
377struct TvuReceiveThreadsArg;
378impl ThreadArg for TvuReceiveThreadsArg {
379    const NAME: &'static str = "tvu_receive_threads";
380    const LONG_NAME: &'static str = "tvu-receive-threads";
381    const HELP: &'static str =
382        "Number of threads (and sockets) to use for receiving shreds on the TVU port";
383
384    fn default() -> usize {
385        solana_gossip::cluster_info::DEFAULT_NUM_TVU_RECEIVE_SOCKETS.get()
386    }
387    fn min() -> usize {
388        solana_gossip::cluster_info::MINIMUM_NUM_TVU_RECEIVE_SOCKETS.get()
389    }
390}
391
392struct TvuRetransmitThreadsArg;
393impl ThreadArg for TvuRetransmitThreadsArg {
394    const NAME: &'static str = "tvu_retransmit_threads";
395    const LONG_NAME: &'static str = "tvu-retransmit-threads";
396    const HELP: &'static str = "Number of threads (and sockets) to use for retransmitting shreds";
397
398    fn default() -> usize {
399        solana_gossip::cluster_info::DEFAULT_NUM_TVU_RETRANSMIT_SOCKETS.get()
400    }
401
402    fn min() -> usize {
403        solana_gossip::cluster_info::MINIMUM_NUM_TVU_RETRANSMIT_SOCKETS.get()
404    }
405}
406
407struct TvuShredSigverifyThreadsArg;
408impl ThreadArg for TvuShredSigverifyThreadsArg {
409    const NAME: &'static str = "tvu_shred_sigverify_threads";
410    const LONG_NAME: &'static str = "tvu-shred-sigverify-threads";
411    const HELP: &'static str =
412        "Number of threads to use for performing signature verification of received shreds";
413
414    fn default() -> usize {
415        get_thread_count()
416    }
417}