agave_validator/cli/
thread_args.rs

1//! Arguments for controlling the number of threads allocated for various tasks
2
3use {
4    clap::{value_t_or_exit, Arg, ArgMatches},
5    solana_accounts_db::{accounts_db, accounts_index},
6    solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
7    solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
8    std::{num::NonZeroUsize, ops::RangeInclusive},
9};
10
/// Owned, string-rendered default values for every thread-count argument
///
/// CLAP's `Arg::default_value()` borrows a `&str`, so the rendered defaults must live somewhere
/// that outlives the `Arg`s built from them; this struct is that storage.
#[derive(Debug)]
pub struct DefaultThreadArgs {
    /// Default value for the `accounts_db_clean_threads` argument
    pub accounts_db_clean_threads: String,
    /// Default value for the `accounts_db_foreground_threads` argument
    pub accounts_db_foreground_threads: String,
    /// Default value for the `accounts_db_hash_threads` argument
    pub accounts_db_hash_threads: String,
    /// Default value for the `accounts_index_flush_threads` argument
    pub accounts_index_flush_threads: String,
    /// Default value for the `ip_echo_server_threads` argument
    pub ip_echo_server_threads: String,
    /// Default value for the `rayon_global_threads` argument
    pub rayon_global_threads: String,
    /// Default value for the `replay_forks_threads` argument
    pub replay_forks_threads: String,
    /// Default value for the `replay_transactions_threads` argument
    pub replay_transactions_threads: String,
    /// Default value for the `rocksdb_compaction_threads` argument
    pub rocksdb_compaction_threads: String,
    /// Default value for the `rocksdb_flush_threads` argument
    pub rocksdb_flush_threads: String,
    /// Default value for the `tvu_receive_threads` argument
    pub tvu_receive_threads: String,
    /// Default value for the `tvu_retransmit_threads` argument
    pub tvu_retransmit_threads: String,
    /// Default value for the `tvu_shred_sigverify_threads` argument
    pub tvu_sigverify_threads: String,
}
27
28impl Default for DefaultThreadArgs {
29    fn default() -> Self {
30        Self {
31            accounts_db_clean_threads: AccountsDbCleanThreadsArg::bounded_default().to_string(),
32            accounts_db_foreground_threads: AccountsDbForegroundThreadsArg::bounded_default()
33                .to_string(),
34            accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(),
35            accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
36                .to_string(),
37            ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
38            rayon_global_threads: RayonGlobalThreadsArg::bounded_default().to_string(),
39            replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
40            replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
41                .to_string(),
42            rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
43            rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
44            tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
45            tvu_retransmit_threads: TvuRetransmitThreadsArg::bounded_default().to_string(),
46            tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
47        }
48    }
49}
50
51pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
52    vec![
53        new_thread_arg::<AccountsDbCleanThreadsArg>(&defaults.accounts_db_clean_threads),
54        new_thread_arg::<AccountsDbForegroundThreadsArg>(&defaults.accounts_db_foreground_threads),
55        new_thread_arg::<AccountsDbHashThreadsArg>(&defaults.accounts_db_hash_threads),
56        new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_index_flush_threads),
57        new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
58        new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
59        new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
60        new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
61        new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
62        new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
63        new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
64        new_thread_arg::<TvuRetransmitThreadsArg>(&defaults.tvu_retransmit_threads),
65        new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
66    ]
67}
68
69fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
70    Arg::with_name(T::NAME)
71        .long(T::LONG_NAME)
72        .takes_value(true)
73        .value_name("NUMBER")
74        .default_value(default)
75        .validator(|num| is_within_range(num, T::range()))
76        .hidden(hidden_unless_forced())
77        .help(T::HELP)
78}
79
/// Parsed thread counts for every configurable thread pool
///
/// Every count is a `NonZeroUsize`, so a pool can never be configured with zero threads.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NumThreadConfig {
    /// Parsed value of the `accounts_db_clean_threads` argument
    pub accounts_db_clean_threads: NonZeroUsize,
    /// Parsed value of the `accounts_db_foreground_threads` argument
    pub accounts_db_foreground_threads: NonZeroUsize,
    /// Parsed value of the `accounts_db_hash_threads` argument
    pub accounts_db_hash_threads: NonZeroUsize,
    /// Parsed value of the `accounts_index_flush_threads` argument
    pub accounts_index_flush_threads: NonZeroUsize,
    /// Parsed value of the `ip_echo_server_threads` argument
    pub ip_echo_server_threads: NonZeroUsize,
    /// Parsed value of the `rayon_global_threads` argument
    pub rayon_global_threads: NonZeroUsize,
    /// Parsed value of the `replay_forks_threads` argument
    pub replay_forks_threads: NonZeroUsize,
    /// Parsed value of the `replay_transactions_threads` argument
    pub replay_transactions_threads: NonZeroUsize,
    /// Parsed value of the `rocksdb_compaction_threads` argument
    pub rocksdb_compaction_threads: NonZeroUsize,
    /// Parsed value of the `rocksdb_flush_threads` argument
    pub rocksdb_flush_threads: NonZeroUsize,
    /// Parsed value of the `tvu_receive_threads` argument
    pub tvu_receive_threads: NonZeroUsize,
    /// Parsed value of the `tvu_retransmit_threads` argument
    pub tvu_retransmit_threads: NonZeroUsize,
    /// Parsed value of the `tvu_shred_sigverify_threads` argument
    pub tvu_sigverify_threads: NonZeroUsize,
}
95
96pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
97    NumThreadConfig {
98        accounts_db_clean_threads: value_t_or_exit!(
99            matches,
100            AccountsDbCleanThreadsArg::NAME,
101            NonZeroUsize
102        ),
103        accounts_db_foreground_threads: value_t_or_exit!(
104            matches,
105            AccountsDbForegroundThreadsArg::NAME,
106            NonZeroUsize
107        ),
108        accounts_db_hash_threads: value_t_or_exit!(
109            matches,
110            AccountsDbHashThreadsArg::NAME,
111            NonZeroUsize
112        ),
113        accounts_index_flush_threads: value_t_or_exit!(
114            matches,
115            AccountsIndexFlushThreadsArg::NAME,
116            NonZeroUsize
117        ),
118        ip_echo_server_threads: value_t_or_exit!(
119            matches,
120            IpEchoServerThreadsArg::NAME,
121            NonZeroUsize
122        ),
123        rayon_global_threads: value_t_or_exit!(matches, RayonGlobalThreadsArg::NAME, NonZeroUsize),
124        replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
125            NonZeroUsize::new(4).expect("4 is non-zero")
126        } else {
127            value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize)
128        },
129        replay_transactions_threads: value_t_or_exit!(
130            matches,
131            ReplayTransactionsThreadsArg::NAME,
132            NonZeroUsize
133        ),
134        rocksdb_compaction_threads: value_t_or_exit!(
135            matches,
136            RocksdbCompactionThreadsArg::NAME,
137            NonZeroUsize
138        ),
139        rocksdb_flush_threads: value_t_or_exit!(
140            matches,
141            RocksdbFlushThreadsArg::NAME,
142            NonZeroUsize
143        ),
144        tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
145        tvu_retransmit_threads: value_t_or_exit!(
146            matches,
147            TvuRetransmitThreadsArg::NAME,
148            NonZeroUsize
149        ),
150        tvu_sigverify_threads: value_t_or_exit!(
151            matches,
152            TvuShredSigverifyThreadsArg::NAME,
153            NonZeroUsize
154        ),
155    }
156}
157
158/// Configuration for CLAP arguments that control the number of threads for various functions
159trait ThreadArg {
160    /// The argument's name
161    const NAME: &'static str;
162    /// The argument's long name
163    const LONG_NAME: &'static str;
164    /// The argument's help message
165    const HELP: &'static str;
166
167    /// The default number of threads
168    fn default() -> usize;
169    /// The default number of threads, bounded by Self::max()
170    /// This prevents potential CLAP issues on low core count machines where
171    /// a fixed value in Self::default() could be greater than Self::max()
172    fn bounded_default() -> usize {
173        std::cmp::min(Self::default(), Self::max())
174    }
175    /// The minimum allowed number of threads (inclusive)
176    fn min() -> usize {
177        1
178    }
179    /// The maximum allowed number of threads (inclusive)
180    fn max() -> usize {
181        // By default, no thread pool should scale over the number of the machine's threads
182        get_max_thread_count()
183    }
184    /// The range of allowed number of threads (inclusive on both ends)
185    fn range() -> RangeInclusive<usize> {
186        RangeInclusive::new(Self::min(), Self::max())
187    }
188}
189
190struct AccountsDbCleanThreadsArg;
191impl ThreadArg for AccountsDbCleanThreadsArg {
192    const NAME: &'static str = "accounts_db_clean_threads";
193    const LONG_NAME: &'static str = "accounts-db-clean-threads";
194    const HELP: &'static str = "Number of threads to use for cleaning AccountsDb";
195
196    fn default() -> usize {
197        accounts_db::quarter_thread_count()
198    }
199}
200
201struct AccountsDbForegroundThreadsArg;
202impl ThreadArg for AccountsDbForegroundThreadsArg {
203    const NAME: &'static str = "accounts_db_foreground_threads";
204    const LONG_NAME: &'static str = "accounts-db-foreground-threads";
205    const HELP: &'static str = "Number of threads to use for AccountsDb block processing";
206
207    fn default() -> usize {
208        accounts_db::default_num_foreground_threads()
209    }
210}
211
212struct AccountsDbHashThreadsArg;
213impl ThreadArg for AccountsDbHashThreadsArg {
214    const NAME: &'static str = "accounts_db_hash_threads";
215    const LONG_NAME: &'static str = "accounts-db-hash-threads";
216    const HELP: &'static str = "Number of threads to use for background accounts hashing";
217
218    fn default() -> usize {
219        accounts_db::default_num_hash_threads().get()
220    }
221}
222
223struct AccountsIndexFlushThreadsArg;
224impl ThreadArg for AccountsIndexFlushThreadsArg {
225    const NAME: &'static str = "accounts_index_flush_threads";
226    const LONG_NAME: &'static str = "accounts-index-flush-threads";
227    const HELP: &'static str = "Number of threads to use for flushing the accounts index";
228
229    fn default() -> usize {
230        accounts_index::default_num_flush_threads().get()
231    }
232}
233
234struct IpEchoServerThreadsArg;
235impl ThreadArg for IpEchoServerThreadsArg {
236    const NAME: &'static str = "ip_echo_server_threads";
237    const LONG_NAME: &'static str = "ip-echo-server-threads";
238    const HELP: &'static str = "Number of threads to use for the IP echo server";
239
240    fn default() -> usize {
241        solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
242    }
243    fn min() -> usize {
244        solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get()
245    }
246}
247
248struct RayonGlobalThreadsArg;
249impl ThreadArg for RayonGlobalThreadsArg {
250    const NAME: &'static str = "rayon_global_threads";
251    const LONG_NAME: &'static str = "rayon-global-threads";
252    const HELP: &'static str = "Number of threads to use for the global rayon thread pool";
253
254    fn default() -> usize {
255        get_max_thread_count()
256    }
257}
258
259struct ReplayForksThreadsArg;
260impl ThreadArg for ReplayForksThreadsArg {
261    const NAME: &'static str = "replay_forks_threads";
262    const LONG_NAME: &'static str = "replay-forks-threads";
263    const HELP: &'static str = "Number of threads to use for replay of blocks on different forks";
264
265    fn default() -> usize {
266        // Default to single threaded fork execution
267        1
268    }
269    fn max() -> usize {
270        // Choose a value that is small enough to limit the overhead of having a large thread pool
271        // while also being large enough to allow replay of all active forks in most scenarios
272        4
273    }
274}
275
276struct ReplayTransactionsThreadsArg;
277impl ThreadArg for ReplayTransactionsThreadsArg {
278    const NAME: &'static str = "replay_transactions_threads";
279    const LONG_NAME: &'static str = "replay-transactions-threads";
280    const HELP: &'static str = "Number of threads to use for transaction replay";
281
282    fn default() -> usize {
283        get_max_thread_count()
284    }
285}
286
287struct RocksdbCompactionThreadsArg;
288impl ThreadArg for RocksdbCompactionThreadsArg {
289    const NAME: &'static str = "rocksdb_compaction_threads";
290    const LONG_NAME: &'static str = "rocksdb-compaction-threads";
291    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";
292
293    fn default() -> usize {
294        solana_ledger::blockstore::default_num_compaction_threads().get()
295    }
296}
297
298struct RocksdbFlushThreadsArg;
299impl ThreadArg for RocksdbFlushThreadsArg {
300    const NAME: &'static str = "rocksdb_flush_threads";
301    const LONG_NAME: &'static str = "rocksdb-flush-threads";
302    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";
303
304    fn default() -> usize {
305        solana_ledger::blockstore::default_num_flush_threads().get()
306    }
307}
308
309struct TvuReceiveThreadsArg;
310impl ThreadArg for TvuReceiveThreadsArg {
311    const NAME: &'static str = "tvu_receive_threads";
312    const LONG_NAME: &'static str = "tvu-receive-threads";
313    const HELP: &'static str =
314        "Number of threads (and sockets) to use for receiving shreds on the TVU port";
315
316    fn default() -> usize {
317        solana_gossip::cluster_info::DEFAULT_NUM_TVU_RECEIVE_SOCKETS.get()
318    }
319    fn min() -> usize {
320        solana_gossip::cluster_info::MINIMUM_NUM_TVU_RECEIVE_SOCKETS.get()
321    }
322}
323
324struct TvuRetransmitThreadsArg;
325impl ThreadArg for TvuRetransmitThreadsArg {
326    const NAME: &'static str = "tvu_retransmit_threads";
327    const LONG_NAME: &'static str = "tvu-retransmit-threads";
328    const HELP: &'static str = "Number of threads (and sockets) to use for retransmitting shreds";
329
330    fn default() -> usize {
331        solana_gossip::cluster_info::DEFAULT_NUM_TVU_RETRANSMIT_SOCKETS.get()
332    }
333
334    fn min() -> usize {
335        solana_gossip::cluster_info::MINIMUM_NUM_TVU_RETRANSMIT_SOCKETS.get()
336    }
337}
338
339struct TvuShredSigverifyThreadsArg;
340impl ThreadArg for TvuShredSigverifyThreadsArg {
341    const NAME: &'static str = "tvu_shred_sigverify_threads";
342    const LONG_NAME: &'static str = "tvu-shred-sigverify-threads";
343    const HELP: &'static str =
344        "Number of threads to use for performing signature verification of received shreds";
345
346    fn default() -> usize {
347        get_thread_count()
348    }
349}