agave_validator/cli/
thread_args.rs

1//! Arguments for controlling the number of threads allocated for various tasks
2
3use {
4    clap::{value_t_or_exit, Arg, ArgMatches},
5    solana_accounts_db::{accounts_db, accounts_index},
6    solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
7    solana_core::banking_stage::BankingStage,
8    solana_rayon_threadlimit::get_thread_count,
9    std::{num::NonZeroUsize, ops::RangeInclusive},
10};
11
12// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
13pub struct DefaultThreadArgs {
14    pub accounts_db_background_threads: String,
15    pub accounts_db_foreground_threads: String,
16    pub accounts_db_hash_threads: String,
17    pub accounts_index_flush_threads: String,
18    pub block_production_num_workers: String,
19    pub ip_echo_server_threads: String,
20    pub rayon_global_threads: String,
21    pub replay_forks_threads: String,
22    pub replay_transactions_threads: String,
23    pub rocksdb_compaction_threads: String,
24    pub rocksdb_flush_threads: String,
25    pub tpu_transaction_forward_receive_threads: String,
26    pub tpu_transaction_receive_threads: String,
27    pub tpu_vote_transaction_receive_threads: String,
28    pub tvu_receive_threads: String,
29    pub tvu_retransmit_threads: String,
30    pub tvu_sigverify_threads: String,
31}
32
33impl Default for DefaultThreadArgs {
34    fn default() -> Self {
35        Self {
36            accounts_db_background_threads: AccountsDbBackgroundThreadsArg::bounded_default()
37                .to_string(),
38            accounts_db_foreground_threads: AccountsDbForegroundThreadsArg::bounded_default()
39                .to_string(),
40            accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(),
41            accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
42                .to_string(),
43            block_production_num_workers: BankingStage::default_num_workers().to_string(),
44            ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
45            rayon_global_threads: RayonGlobalThreadsArg::bounded_default().to_string(),
46            replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
47            replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
48                .to_string(),
49            rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
50            rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
51            tpu_transaction_forward_receive_threads:
52                TpuTransactionForwardReceiveThreadArgs::bounded_default().to_string(),
53            tpu_transaction_receive_threads: TpuTransactionReceiveThreads::bounded_default()
54                .to_string(),
55            tpu_vote_transaction_receive_threads:
56                TpuVoteTransactionReceiveThreads::bounded_default().to_string(),
57            tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
58            tvu_retransmit_threads: TvuRetransmitThreadsArg::bounded_default().to_string(),
59            tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
60        }
61    }
62}
63
64pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
65    vec![
66        new_thread_arg::<AccountsDbBackgroundThreadsArg>(&defaults.accounts_db_background_threads),
67        new_thread_arg::<AccountsDbForegroundThreadsArg>(&defaults.accounts_db_foreground_threads),
68        new_thread_arg::<AccountsDbHashThreadsArg>(&defaults.accounts_db_hash_threads),
69        new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_index_flush_threads),
70        new_thread_arg::<BlockProductionNumWorkersArg>(&defaults.block_production_num_workers),
71        new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
72        new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
73        new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
74        new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
75        new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
76        new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
77        new_thread_arg::<TpuTransactionForwardReceiveThreadArgs>(
78            &defaults.tpu_transaction_forward_receive_threads,
79        ),
80        new_thread_arg::<TpuTransactionReceiveThreads>(&defaults.tpu_transaction_receive_threads),
81        new_thread_arg::<TpuVoteTransactionReceiveThreads>(
82            &defaults.tpu_vote_transaction_receive_threads,
83        ),
84        new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
85        new_thread_arg::<TvuRetransmitThreadsArg>(&defaults.tvu_retransmit_threads),
86        new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
87    ]
88}
89
90fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
91    Arg::with_name(T::NAME)
92        .long(T::LONG_NAME)
93        .takes_value(true)
94        .value_name("NUMBER")
95        .default_value(default)
96        .validator(|num| is_within_range(num, T::range()))
97        .hidden(hidden_unless_forced())
98        .help(T::HELP)
99}
100
101pub struct NumThreadConfig {
102    pub accounts_db_background_threads: NonZeroUsize,
103    pub accounts_db_foreground_threads: NonZeroUsize,
104    pub accounts_db_hash_threads: NonZeroUsize,
105    pub accounts_index_flush_threads: NonZeroUsize,
106    pub block_production_num_workers: NonZeroUsize,
107    pub ip_echo_server_threads: NonZeroUsize,
108    pub rayon_global_threads: NonZeroUsize,
109    pub replay_forks_threads: NonZeroUsize,
110    pub replay_transactions_threads: NonZeroUsize,
111    pub tpu_transaction_forward_receive_threads: NonZeroUsize,
112    pub tpu_transaction_receive_threads: NonZeroUsize,
113    pub tpu_vote_transaction_receive_threads: NonZeroUsize,
114    pub tvu_receive_threads: NonZeroUsize,
115    pub tvu_retransmit_threads: NonZeroUsize,
116    pub tvu_sigverify_threads: NonZeroUsize,
117}
118
119pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
120    let accounts_db_background_threads = {
121        if matches.is_present("accounts_db_clean_threads") {
122            value_t_or_exit!(matches, "accounts_db_clean_threads", NonZeroUsize)
123        } else {
124            value_t_or_exit!(matches, AccountsDbBackgroundThreadsArg::NAME, NonZeroUsize)
125        }
126    };
127    NumThreadConfig {
128        accounts_db_background_threads,
129        accounts_db_foreground_threads: value_t_or_exit!(
130            matches,
131            AccountsDbForegroundThreadsArg::NAME,
132            NonZeroUsize
133        ),
134        accounts_db_hash_threads: value_t_or_exit!(
135            matches,
136            AccountsDbHashThreadsArg::NAME,
137            NonZeroUsize
138        ),
139        accounts_index_flush_threads: value_t_or_exit!(
140            matches,
141            AccountsIndexFlushThreadsArg::NAME,
142            NonZeroUsize
143        ),
144        block_production_num_workers: value_t_or_exit!(
145            matches,
146            BlockProductionNumWorkersArg::NAME,
147            NonZeroUsize
148        ),
149        ip_echo_server_threads: value_t_or_exit!(
150            matches,
151            IpEchoServerThreadsArg::NAME,
152            NonZeroUsize
153        ),
154        rayon_global_threads: value_t_or_exit!(matches, RayonGlobalThreadsArg::NAME, NonZeroUsize),
155        replay_forks_threads: value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize),
156        replay_transactions_threads: value_t_or_exit!(
157            matches,
158            ReplayTransactionsThreadsArg::NAME,
159            NonZeroUsize
160        ),
161        tpu_transaction_forward_receive_threads: value_t_or_exit!(
162            matches,
163            TpuTransactionForwardReceiveThreadArgs::NAME,
164            NonZeroUsize
165        ),
166        tpu_transaction_receive_threads: value_t_or_exit!(
167            matches,
168            TpuTransactionReceiveThreads::NAME,
169            NonZeroUsize
170        ),
171        tpu_vote_transaction_receive_threads: value_t_or_exit!(
172            matches,
173            TpuVoteTransactionReceiveThreads::NAME,
174            NonZeroUsize
175        ),
176        tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
177        tvu_retransmit_threads: value_t_or_exit!(
178            matches,
179            TvuRetransmitThreadsArg::NAME,
180            NonZeroUsize
181        ),
182        tvu_sigverify_threads: value_t_or_exit!(
183            matches,
184            TvuShredSigverifyThreadsArg::NAME,
185            NonZeroUsize
186        ),
187    }
188}
189
190/// Configuration for CLAP arguments that control the number of threads for various functions
191pub trait ThreadArg {
192    /// The argument's name
193    const NAME: &'static str;
194    /// The argument's long name
195    const LONG_NAME: &'static str;
196    /// The argument's help message
197    const HELP: &'static str;
198
199    /// The default number of threads
200    fn default() -> usize;
201    /// The default number of threads, bounded by Self::max()
202    /// This prevents potential CLAP issues on low core count machines where
203    /// a fixed value in Self::default() could be greater than Self::max()
204    fn bounded_default() -> usize {
205        std::cmp::min(Self::default(), Self::max())
206    }
207    /// The minimum allowed number of threads (inclusive)
208    fn min() -> usize {
209        1
210    }
211    /// The maximum allowed number of threads (inclusive)
212    fn max() -> usize {
213        // By default, no thread pool should scale over the number of the machine's threads
214        num_cpus::get()
215    }
216    /// The range of allowed number of threads (inclusive on both ends)
217    fn range() -> RangeInclusive<usize> {
218        RangeInclusive::new(Self::min(), Self::max())
219    }
220}
221
222struct AccountsDbBackgroundThreadsArg;
223impl ThreadArg for AccountsDbBackgroundThreadsArg {
224    const NAME: &'static str = "accounts_db_background_threads";
225    const LONG_NAME: &'static str = "accounts-db-background-threads";
226    const HELP: &'static str = "Number of threads to use for AccountsDb background tasks";
227
228    fn default() -> usize {
229        accounts_db::quarter_thread_count()
230    }
231}
232
233struct AccountsDbForegroundThreadsArg;
234impl ThreadArg for AccountsDbForegroundThreadsArg {
235    const NAME: &'static str = "accounts_db_foreground_threads";
236    const LONG_NAME: &'static str = "accounts-db-foreground-threads";
237    const HELP: &'static str =
238        "Number of threads to use for AccountsDb foreground tasks, e.g. transaction processing";
239
240    fn default() -> usize {
241        accounts_db::default_num_foreground_threads()
242    }
243}
244
245struct AccountsDbHashThreadsArg;
246impl ThreadArg for AccountsDbHashThreadsArg {
247    const NAME: &'static str = "accounts_db_hash_threads";
248    const LONG_NAME: &'static str = "accounts-db-hash-threads";
249    const HELP: &'static str = "Number of threads to use for accounts hash verification at startup";
250
251    fn default() -> usize {
252        accounts_db::default_num_hash_threads().get()
253    }
254}
255
256struct AccountsIndexFlushThreadsArg;
257impl ThreadArg for AccountsIndexFlushThreadsArg {
258    const NAME: &'static str = "accounts_index_flush_threads";
259    const LONG_NAME: &'static str = "accounts-index-flush-threads";
260    const HELP: &'static str = "Number of threads to use for flushing the accounts index";
261
262    fn default() -> usize {
263        accounts_index::default_num_flush_threads().get()
264    }
265}
266
267struct BlockProductionNumWorkersArg;
268impl ThreadArg for BlockProductionNumWorkersArg {
269    const NAME: &'static str = "block_production_num_workers";
270    const LONG_NAME: &'static str = "block-production-num-workers";
271    const HELP: &'static str = "Number of worker threads to use for block production";
272
273    fn default() -> usize {
274        BankingStage::default_num_workers().get()
275    }
276
277    fn min() -> usize {
278        1
279    }
280
281    fn max() -> usize {
282        BankingStage::max_num_workers().get()
283    }
284}
285
286struct IpEchoServerThreadsArg;
287impl ThreadArg for IpEchoServerThreadsArg {
288    const NAME: &'static str = "ip_echo_server_threads";
289    const LONG_NAME: &'static str = "ip-echo-server-threads";
290    const HELP: &'static str = "Number of threads to use for the IP echo server";
291
292    fn default() -> usize {
293        solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
294    }
295    fn min() -> usize {
296        solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get()
297    }
298}
299
300struct RayonGlobalThreadsArg;
301impl ThreadArg for RayonGlobalThreadsArg {
302    const NAME: &'static str = "rayon_global_threads";
303    const LONG_NAME: &'static str = "rayon-global-threads";
304    const HELP: &'static str = "Number of threads to use for the global rayon thread pool";
305
306    fn default() -> usize {
307        num_cpus::get()
308    }
309}
310
311struct ReplayForksThreadsArg;
312impl ThreadArg for ReplayForksThreadsArg {
313    const NAME: &'static str = "replay_forks_threads";
314    const LONG_NAME: &'static str = "replay-forks-threads";
315    const HELP: &'static str = "Number of threads to use for replay of blocks on different forks";
316
317    fn default() -> usize {
318        // Default to single threaded fork execution
319        1
320    }
321    fn max() -> usize {
322        // Choose a value that is small enough to limit the overhead of having a large thread pool
323        // while also being large enough to allow replay of all active forks in most scenarios
324        4
325    }
326}
327
328struct ReplayTransactionsThreadsArg;
329impl ThreadArg for ReplayTransactionsThreadsArg {
330    const NAME: &'static str = "replay_transactions_threads";
331    const LONG_NAME: &'static str = "replay-transactions-threads";
332    const HELP: &'static str = "Number of threads to use for transaction replay";
333
334    fn default() -> usize {
335        num_cpus::get()
336    }
337}
338
339pub struct RocksdbCompactionThreadsArg;
340impl ThreadArg for RocksdbCompactionThreadsArg {
341    const NAME: &'static str = "rocksdb_compaction_threads";
342    const LONG_NAME: &'static str = "rocksdb-compaction-threads";
343    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";
344
345    fn default() -> usize {
346        solana_ledger::blockstore::default_num_compaction_threads().get()
347    }
348}
349
350pub struct RocksdbFlushThreadsArg;
351impl ThreadArg for RocksdbFlushThreadsArg {
352    const NAME: &'static str = "rocksdb_flush_threads";
353    const LONG_NAME: &'static str = "rocksdb-flush-threads";
354    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";
355
356    fn default() -> usize {
357        solana_ledger::blockstore::default_num_flush_threads().get()
358    }
359}
360
361struct TpuTransactionForwardReceiveThreadArgs;
362impl ThreadArg for TpuTransactionForwardReceiveThreadArgs {
363    const NAME: &'static str = "tpu_transaction_forward_receive_threads";
364    const LONG_NAME: &'static str = "tpu-transaction-forward-receive-threads";
365    const HELP: &'static str =
366        "Number of threads to use for receiving transactions on the TPU fowards port";
367
368    fn default() -> usize {
369        solana_streamer::quic::default_num_tpu_transaction_forward_receive_threads()
370    }
371}
372
373struct TpuTransactionReceiveThreads;
374impl ThreadArg for TpuTransactionReceiveThreads {
375    const NAME: &'static str = "tpu_transaction_receive_threads";
376    const LONG_NAME: &'static str = "tpu-transaction-receive-threads";
377    const HELP: &'static str =
378        "Number of threads to use for receiving transactions on the TPU port";
379
380    fn default() -> usize {
381        solana_streamer::quic::default_num_tpu_transaction_receive_threads()
382    }
383}
384
385struct TpuVoteTransactionReceiveThreads;
386impl ThreadArg for TpuVoteTransactionReceiveThreads {
387    const NAME: &'static str = "tpu_vote_transaction_receive_threads";
388    const LONG_NAME: &'static str = "tpu-vote-transaction-receive-threads";
389    const HELP: &'static str =
390        "Number of threads to use for receiving transactions on the TPU vote port";
391
392    fn default() -> usize {
393        solana_streamer::quic::default_num_tpu_vote_transaction_receive_threads()
394    }
395}
396
397struct TvuReceiveThreadsArg;
398impl ThreadArg for TvuReceiveThreadsArg {
399    const NAME: &'static str = "tvu_receive_threads";
400    const LONG_NAME: &'static str = "tvu-receive-threads";
401    const HELP: &'static str =
402        "Number of threads (and sockets) to use for receiving shreds on the TVU port";
403
404    fn default() -> usize {
405        solana_gossip::cluster_info::DEFAULT_NUM_TVU_RECEIVE_SOCKETS.get()
406    }
407    fn min() -> usize {
408        solana_gossip::cluster_info::MINIMUM_NUM_TVU_RECEIVE_SOCKETS.get()
409    }
410}
411
412struct TvuRetransmitThreadsArg;
413impl ThreadArg for TvuRetransmitThreadsArg {
414    const NAME: &'static str = "tvu_retransmit_threads";
415    const LONG_NAME: &'static str = "tvu-retransmit-threads";
416    const HELP: &'static str = "Number of threads (and sockets) to use for retransmitting shreds";
417
418    fn default() -> usize {
419        solana_gossip::cluster_info::DEFAULT_NUM_TVU_RETRANSMIT_SOCKETS.get()
420    }
421
422    fn min() -> usize {
423        solana_gossip::cluster_info::MINIMUM_NUM_TVU_RETRANSMIT_SOCKETS.get()
424    }
425}
426
427struct TvuShredSigverifyThreadsArg;
428impl ThreadArg for TvuShredSigverifyThreadsArg {
429    const NAME: &'static str = "tvu_shred_sigverify_threads";
430    const LONG_NAME: &'static str = "tvu-shred-sigverify-threads";
431    const HELP: &'static str =
432        "Number of threads to use for performing signature verification of received shreds";
433
434    fn default() -> usize {
435        get_thread_count()
436    }
437}