Skip to main content

agave_validator/cli/
thread_args.rs

1//! Arguments for controlling the number of threads allocated for various tasks
2
3use {
4    clap::{Arg, ArgMatches, value_t_or_exit},
5    solana_accounts_db::{accounts_db, accounts_index},
6    solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
7    solana_core::banking_stage::BankingStage,
8    solana_rayon_threadlimit::get_thread_count,
9    std::{num::NonZeroUsize, ops::RangeInclusive},
10};
11
12// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
13pub struct DefaultThreadArgs {
14    pub accounts_db_background_threads: String,
15    pub accounts_db_foreground_threads: String,
16    pub accounts_index_flush_threads: String,
17    pub block_production_num_workers: String,
18    pub ip_echo_server_threads: String,
19    pub rayon_global_threads: String,
20    pub replay_forks_threads: String,
21    pub replay_transactions_threads: String,
22    pub tpu_sigverify_threads: String,
23    pub tpu_transaction_forward_receive_threads: String,
24    pub tpu_transaction_receive_threads: String,
25    pub tpu_vote_transaction_receive_threads: String,
26    pub tvu_receive_threads: String,
27    pub tvu_retransmit_threads: String,
28    pub tvu_sigverify_threads: String,
29    pub tvu_bls_sigverify_threads: String,
30}
31
32impl Default for DefaultThreadArgs {
33    fn default() -> Self {
34        Self {
35            accounts_db_background_threads: AccountsDbBackgroundThreadsArg::bounded_default()
36                .to_string(),
37            accounts_db_foreground_threads: AccountsDbForegroundThreadsArg::bounded_default()
38                .to_string(),
39            accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
40                .to_string(),
41            block_production_num_workers: BankingStage::default_num_workers().to_string(),
42            ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
43            rayon_global_threads: RayonGlobalThreadsArg::bounded_default().to_string(),
44            replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
45            replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
46                .to_string(),
47            tpu_sigverify_threads: TpuSigverifyThreadsArg::bounded_default().to_string(),
48            tpu_transaction_forward_receive_threads:
49                TpuTransactionForwardReceiveThreadArgs::bounded_default().to_string(),
50            tpu_transaction_receive_threads: TpuTransactionReceiveThreads::bounded_default()
51                .to_string(),
52            tpu_vote_transaction_receive_threads:
53                TpuVoteTransactionReceiveThreads::bounded_default().to_string(),
54            tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
55            tvu_retransmit_threads: TvuRetransmitThreadsArg::bounded_default().to_string(),
56            tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
57            tvu_bls_sigverify_threads: TvuBlsShredSigverifyThreadsArg::bounded_default()
58                .to_string(),
59        }
60    }
61}
62
63pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
64    vec![
65        new_thread_arg::<AccountsDbBackgroundThreadsArg>(&defaults.accounts_db_background_threads),
66        new_thread_arg::<AccountsDbForegroundThreadsArg>(&defaults.accounts_db_foreground_threads),
67        new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_index_flush_threads),
68        new_thread_arg::<BlockProductionNumWorkersArg>(&defaults.block_production_num_workers),
69        new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
70        new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
71        new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
72        new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
73        new_thread_arg::<TpuSigverifyThreadsArg>(&defaults.tpu_sigverify_threads),
74        new_thread_arg::<TpuTransactionForwardReceiveThreadArgs>(
75            &defaults.tpu_transaction_forward_receive_threads,
76        ),
77        new_thread_arg::<TpuTransactionReceiveThreads>(&defaults.tpu_transaction_receive_threads),
78        new_thread_arg::<TpuVoteTransactionReceiveThreads>(
79            &defaults.tpu_vote_transaction_receive_threads,
80        ),
81        new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
82        new_thread_arg::<TvuRetransmitThreadsArg>(&defaults.tvu_retransmit_threads),
83        new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
84        new_thread_arg::<TvuBlsShredSigverifyThreadsArg>(&defaults.tvu_bls_sigverify_threads),
85    ]
86}
87
88pub(crate) fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
89    Arg::with_name(T::NAME)
90        .long(T::LONG_NAME)
91        .takes_value(true)
92        .value_name("NUMBER")
93        .default_value(default)
94        .validator(|num| is_within_range(num, T::range()))
95        .hidden(hidden_unless_forced())
96        .help(T::HELP)
97}
98
99pub struct NumThreadConfig {
100    pub accounts_db_background_threads: NonZeroUsize,
101    pub accounts_db_foreground_threads: NonZeroUsize,
102    pub accounts_index_flush_threads: NonZeroUsize,
103    pub block_production_num_workers: NonZeroUsize,
104    pub ip_echo_server_threads: NonZeroUsize,
105    pub rayon_global_threads: NonZeroUsize,
106    pub replay_forks_threads: NonZeroUsize,
107    pub replay_transactions_threads: NonZeroUsize,
108    pub tpu_sigverify_threads: NonZeroUsize,
109    pub tpu_transaction_forward_receive_threads: NonZeroUsize,
110    pub tpu_transaction_receive_threads: NonZeroUsize,
111    pub tpu_vote_transaction_receive_threads: NonZeroUsize,
112    pub tvu_receive_threads: NonZeroUsize,
113    pub tvu_retransmit_threads: NonZeroUsize,
114    pub tvu_sigverify_threads: NonZeroUsize,
115    pub tvu_bls_sigverify_threads: NonZeroUsize,
116}
117
118pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
119    NumThreadConfig {
120        accounts_db_background_threads: value_t_or_exit!(
121            matches,
122            AccountsDbBackgroundThreadsArg::NAME,
123            NonZeroUsize
124        ),
125        accounts_db_foreground_threads: value_t_or_exit!(
126            matches,
127            AccountsDbForegroundThreadsArg::NAME,
128            NonZeroUsize
129        ),
130        accounts_index_flush_threads: value_t_or_exit!(
131            matches,
132            AccountsIndexFlushThreadsArg::NAME,
133            NonZeroUsize
134        ),
135        block_production_num_workers: value_t_or_exit!(
136            matches,
137            BlockProductionNumWorkersArg::NAME,
138            NonZeroUsize
139        ),
140        ip_echo_server_threads: value_t_or_exit!(
141            matches,
142            IpEchoServerThreadsArg::NAME,
143            NonZeroUsize
144        ),
145        rayon_global_threads: value_t_or_exit!(matches, RayonGlobalThreadsArg::NAME, NonZeroUsize),
146        replay_forks_threads: value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize),
147        replay_transactions_threads: value_t_or_exit!(
148            matches,
149            ReplayTransactionsThreadsArg::NAME,
150            NonZeroUsize
151        ),
152        tpu_sigverify_threads: value_t_or_exit!(
153            matches,
154            TpuSigverifyThreadsArg::NAME,
155            NonZeroUsize
156        ),
157        tpu_transaction_forward_receive_threads: value_t_or_exit!(
158            matches,
159            TpuTransactionForwardReceiveThreadArgs::NAME,
160            NonZeroUsize
161        ),
162        tpu_transaction_receive_threads: value_t_or_exit!(
163            matches,
164            TpuTransactionReceiveThreads::NAME,
165            NonZeroUsize
166        ),
167        tpu_vote_transaction_receive_threads: value_t_or_exit!(
168            matches,
169            TpuVoteTransactionReceiveThreads::NAME,
170            NonZeroUsize
171        ),
172        tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
173        tvu_retransmit_threads: value_t_or_exit!(
174            matches,
175            TvuRetransmitThreadsArg::NAME,
176            NonZeroUsize
177        ),
178        tvu_sigverify_threads: value_t_or_exit!(
179            matches,
180            TvuShredSigverifyThreadsArg::NAME,
181            NonZeroUsize
182        ),
183        tvu_bls_sigverify_threads: value_t_or_exit!(
184            matches,
185            TvuBlsShredSigverifyThreadsArg::NAME,
186            NonZeroUsize
187        ),
188    }
189}
190
191/// Configuration for CLAP arguments that control the number of threads for various functions
192pub trait ThreadArg {
193    /// The argument's name
194    const NAME: &'static str;
195    /// The argument's long name
196    const LONG_NAME: &'static str;
197    /// The argument's help message
198    const HELP: &'static str;
199
200    /// The default number of threads
201    fn default() -> usize;
202    /// The default number of threads, bounded by Self::max()
203    /// This prevents potential CLAP issues on low core count machines where
204    /// a fixed value in Self::default() could be greater than Self::max()
205    fn bounded_default() -> usize {
206        std::cmp::min(Self::default(), Self::max())
207    }
208    /// The minimum allowed number of threads (inclusive)
209    fn min() -> usize {
210        1
211    }
212    /// The maximum allowed number of threads (inclusive)
213    fn max() -> usize {
214        // By default, no thread pool should scale over the number of the machine's threads
215        num_cpus::get()
216    }
217    /// The range of allowed number of threads (inclusive on both ends)
218    fn range() -> RangeInclusive<usize> {
219        RangeInclusive::new(Self::min(), Self::max())
220    }
221}
222
223struct AccountsDbBackgroundThreadsArg;
224impl ThreadArg for AccountsDbBackgroundThreadsArg {
225    const NAME: &'static str = "accounts_db_background_threads";
226    const LONG_NAME: &'static str = "accounts-db-background-threads";
227    const HELP: &'static str = "Number of threads to use for AccountsDb background tasks";
228
229    fn default() -> usize {
230        accounts_db::quarter_thread_count()
231    }
232}
233
234struct AccountsDbForegroundThreadsArg;
235impl ThreadArg for AccountsDbForegroundThreadsArg {
236    const NAME: &'static str = "accounts_db_foreground_threads";
237    const LONG_NAME: &'static str = "accounts-db-foreground-threads";
238    const HELP: &'static str =
239        "Number of threads to use for AccountsDb foreground tasks, e.g. transaction processing";
240
241    fn default() -> usize {
242        accounts_db::default_num_foreground_threads()
243    }
244}
245
246struct AccountsIndexFlushThreadsArg;
247impl ThreadArg for AccountsIndexFlushThreadsArg {
248    const NAME: &'static str = "accounts_index_flush_threads";
249    const LONG_NAME: &'static str = "accounts-index-flush-threads";
250    const HELP: &'static str = "Number of threads to use for flushing the accounts index";
251
252    fn default() -> usize {
253        accounts_index::default_num_flush_threads().get()
254    }
255}
256
257struct BlockProductionNumWorkersArg;
258impl ThreadArg for BlockProductionNumWorkersArg {
259    const NAME: &'static str = "block_production_num_workers";
260    const LONG_NAME: &'static str = "block-production-num-workers";
261    const HELP: &'static str = "Number of worker threads to use for block production";
262
263    fn default() -> usize {
264        BankingStage::default_num_workers().get()
265    }
266
267    fn min() -> usize {
268        1
269    }
270
271    fn max() -> usize {
272        BankingStage::max_num_workers().get()
273    }
274}
275
276struct IpEchoServerThreadsArg;
277impl ThreadArg for IpEchoServerThreadsArg {
278    const NAME: &'static str = "ip_echo_server_threads";
279    const LONG_NAME: &'static str = "ip-echo-server-threads";
280    const HELP: &'static str = "Number of threads to use for the IP echo server";
281
282    fn default() -> usize {
283        solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
284    }
285}
286
287struct RayonGlobalThreadsArg;
288impl ThreadArg for RayonGlobalThreadsArg {
289    const NAME: &'static str = "rayon_global_threads";
290    const LONG_NAME: &'static str = "rayon-global-threads";
291    const HELP: &'static str = "Number of threads to use for the global rayon thread pool";
292
293    fn default() -> usize {
294        num_cpus::get()
295    }
296}
297
298struct ReplayForksThreadsArg;
299impl ThreadArg for ReplayForksThreadsArg {
300    const NAME: &'static str = "replay_forks_threads";
301    const LONG_NAME: &'static str = "replay-forks-threads";
302    const HELP: &'static str = "Number of threads to use for replay of blocks on different forks";
303
304    fn default() -> usize {
305        // Default to single threaded fork execution
306        1
307    }
308    fn max() -> usize {
309        // Choose a value that is small enough to limit the overhead of having a large thread pool
310        // while also being large enough to allow replay of all active forks in most scenarios
311        4
312    }
313}
314
315struct ReplayTransactionsThreadsArg;
316impl ThreadArg for ReplayTransactionsThreadsArg {
317    const NAME: &'static str = "replay_transactions_threads";
318    const LONG_NAME: &'static str = "replay-transactions-threads";
319    const HELP: &'static str = "Number of threads to use for transaction replay";
320
321    fn default() -> usize {
322        num_cpus::get()
323    }
324}
325
326struct TpuSigverifyThreadsArg;
327impl ThreadArg for TpuSigverifyThreadsArg {
328    const NAME: &'static str = "tpu_sigverify_threads";
329    const LONG_NAME: &'static str = "tpu-sigverify-threads";
330    const HELP: &'static str =
331        "Number of threads to use for performing signature verification of received transactions";
332
333    fn default() -> usize {
334        get_thread_count()
335    }
336}
337
338struct TpuTransactionForwardReceiveThreadArgs;
339impl ThreadArg for TpuTransactionForwardReceiveThreadArgs {
340    const NAME: &'static str = "tpu_transaction_forward_receive_threads";
341    const LONG_NAME: &'static str = "tpu-transaction-forward-receive-threads";
342    const HELP: &'static str =
343        "Number of threads to use for receiving transactions on the TPU forwards port";
344
345    fn default() -> usize {
346        solana_streamer::quic::default_num_tpu_transaction_forward_receive_threads()
347    }
348}
349
350struct TpuTransactionReceiveThreads;
351impl ThreadArg for TpuTransactionReceiveThreads {
352    const NAME: &'static str = "tpu_transaction_receive_threads";
353    const LONG_NAME: &'static str = "tpu-transaction-receive-threads";
354    const HELP: &'static str =
355        "Number of threads to use for receiving transactions on the TPU port";
356
357    fn default() -> usize {
358        solana_streamer::quic::default_num_tpu_transaction_receive_threads()
359    }
360}
361
362struct TpuVoteTransactionReceiveThreads;
363impl ThreadArg for TpuVoteTransactionReceiveThreads {
364    const NAME: &'static str = "tpu_vote_transaction_receive_threads";
365    const LONG_NAME: &'static str = "tpu-vote-transaction-receive-threads";
366    const HELP: &'static str =
367        "Number of threads to use for receiving transactions on the TPU vote port";
368
369    fn default() -> usize {
370        solana_streamer::quic::default_num_tpu_vote_transaction_receive_threads()
371    }
372}
373
374struct TvuReceiveThreadsArg;
375impl ThreadArg for TvuReceiveThreadsArg {
376    const NAME: &'static str = "tvu_receive_threads";
377    const LONG_NAME: &'static str = "tvu-receive-threads";
378    const HELP: &'static str =
379        "Number of threads (and sockets) to use for receiving shreds on the TVU port";
380
381    fn default() -> usize {
382        solana_gossip::cluster_info::DEFAULT_NUM_TVU_RECEIVE_SOCKETS.get()
383    }
384    fn min() -> usize {
385        solana_gossip::cluster_info::MINIMUM_NUM_TVU_RECEIVE_SOCKETS.get()
386    }
387}
388
389struct TvuRetransmitThreadsArg;
390impl ThreadArg for TvuRetransmitThreadsArg {
391    const NAME: &'static str = "tvu_retransmit_threads";
392    const LONG_NAME: &'static str = "tvu-retransmit-threads";
393    const HELP: &'static str = "Number of threads (and sockets) to use for retransmitting shreds";
394
395    fn default() -> usize {
396        solana_gossip::cluster_info::DEFAULT_NUM_TVU_RETRANSMIT_SOCKETS.get()
397    }
398
399    fn min() -> usize {
400        solana_gossip::cluster_info::MINIMUM_NUM_TVU_RETRANSMIT_SOCKETS.get()
401    }
402}
403
404struct TvuShredSigverifyThreadsArg;
405impl ThreadArg for TvuShredSigverifyThreadsArg {
406    const NAME: &'static str = "tvu_shred_sigverify_threads";
407    const LONG_NAME: &'static str = "tvu-shred-sigverify-threads";
408    const HELP: &'static str =
409        "Number of threads to use for performing signature verification of received shreds";
410
411    fn default() -> usize {
412        get_thread_count()
413    }
414}
415
416struct TvuBlsShredSigverifyThreadsArg;
417impl ThreadArg for TvuBlsShredSigverifyThreadsArg {
418    const NAME: &'static str = "tvu_bls_shred_sigverify_threads";
419    const LONG_NAME: &'static str = "tvu-bls-shred-sigverify-threads";
420    const HELP: &'static str = "Number of threads to use for performing BLS signature \
421                                verification of received Alpenglow consensus messages";
422
423    fn default() -> usize {
424        get_thread_count()
425    }
426}