agave_thread_manager/
tokio_runtime.rs1use {
2 crate::{
3 policy::{apply_policy, parse_policy, CoreAllocation},
4 MAX_THREAD_NAME_CHARS,
5 },
6 serde::{Deserialize, Serialize},
7 solana_metrics::datapoint_info,
8 std::{
9 ops::Deref,
10 sync::{
11 atomic::{AtomicU64, AtomicUsize, Ordering},
12 Arc, Mutex,
13 },
14 time::Duration,
15 },
16 thread_priority::ThreadExt,
17};
18
19#[derive(Clone, Debug, Serialize, Deserialize)]
20#[serde(default)]
21pub struct TokioConfig {
22 pub worker_threads: usize,
24 pub max_blocking_threads: usize,
26 pub priority: u8,
28 pub policy: String,
29 pub stack_size_bytes: usize,
30 pub event_interval: u32,
31 pub core_allocation: CoreAllocation,
32}
33
34impl Default for TokioConfig {
35 fn default() -> Self {
36 Self {
37 core_allocation: CoreAllocation::OsDefault,
38 worker_threads: 8,
39 max_blocking_threads: 1,
40 priority: crate::policy::DEFAULT_PRIORITY,
41 policy: "OTHER".to_owned(),
42 stack_size_bytes: 2 * 1024 * 1024,
43 event_interval: 61,
44 }
45 }
46}
47
48#[derive(Debug)]
49pub struct TokioRuntime {
50 pub tokio: tokio::runtime::Runtime,
51 pub config: TokioConfig,
52 pub counters: Arc<ThreadCounters>,
53}
54
55impl Deref for TokioRuntime {
56 type Target = tokio::runtime::Runtime;
57
58 fn deref(&self) -> &Self::Target {
59 &self.tokio
60 }
61}
62
63impl TokioRuntime {
64 pub fn start_metrics_sampling(&self, period: Duration) {
67 let counters = self.counters.clone();
68 self.tokio.spawn(metrics_sampler(counters, period));
69 }
70
71 pub fn new(name: String, cfg: TokioConfig) -> anyhow::Result<Self> {
72 debug_assert!(name.len() < MAX_THREAD_NAME_CHARS, "Thread name too long");
73 let num_workers = if cfg.worker_threads == 0 {
74 num_cpus::get()
75 } else {
76 cfg.worker_threads
77 };
78 let chosen_cores_mask = cfg.core_allocation.as_core_mask_vector();
79
80 let base_name = name.clone();
81 let mut builder = match num_workers {
82 1 => tokio::runtime::Builder::new_current_thread(),
83 _ => {
84 let mut builder = tokio::runtime::Builder::new_multi_thread();
85 builder.worker_threads(num_workers);
86 builder
87 }
88 };
89 let atomic_id: AtomicUsize = AtomicUsize::new(0);
90
91 let counters = Arc::new(ThreadCounters {
92 namespace: format!("thread-manager-tokio-{}", &base_name).leak(),
94 total_threads_cnt: cfg.worker_threads as u64,
95 active_threads_cnt: AtomicU64::new(
96 (num_workers.wrapping_add(cfg.max_blocking_threads)) as u64,
97 ),
98 });
99 builder
100 .event_interval(cfg.event_interval)
101 .thread_name_fn(move || {
102 let id = atomic_id.fetch_add(1, Ordering::Relaxed);
103 format!("{}-{}", base_name, id)
104 })
105 .on_thread_park({
106 let counters = counters.clone();
107 move || {
108 counters.on_park();
109 }
110 })
111 .on_thread_unpark({
112 let counters = counters.clone();
113 move || {
114 counters.on_unpark();
115 }
116 })
117 .thread_stack_size(cfg.stack_size_bytes)
118 .enable_all()
119 .max_blocking_threads(cfg.max_blocking_threads);
120
121 let c = cfg.clone();
123 let chosen_cores_mask = Mutex::new(chosen_cores_mask);
124 builder.on_thread_start(move || {
125 let cur_thread = std::thread::current();
126 let _tid = cur_thread
127 .get_native_id()
128 .expect("Can not get thread id for newly created thread");
129
130 apply_policy(
131 &c.core_allocation,
132 parse_policy(&c.policy),
133 c.priority,
134 &chosen_cores_mask,
135 );
136 });
137 Ok(TokioRuntime {
138 tokio: builder.build()?,
139 config: cfg.clone(),
140 counters,
141 })
142 }
143
144 #[cfg(feature = "dev-context-only-utils")]
146 pub fn new_for_tests() -> Self {
147 let cfg = TokioConfig {
148 worker_threads: 2,
149 ..Default::default()
150 };
151 TokioRuntime::new("solNetTest".to_owned(), cfg.clone())
152 .expect("Failed to create Tokio runtime for tests")
153 }
154}
155
/// Bookkeeping used to estimate how many of a runtime's threads are busy.
#[derive(Debug)]
pub struct ThreadCounters {
    /// Name under which the counters are reported as a metrics datapoint.
    pub namespace: &'static str,
    /// Fixed thread count that the number of parked threads is derived from.
    pub total_threads_cnt: u64,
    /// Running count of threads currently considered active.
    pub active_threads_cnt: AtomicU64,
}

impl ThreadCounters {
    /// Records that a thread has parked by decrementing the active count.
    pub fn on_park(&self) {
        let active = &self.active_threads_cnt;
        active.fetch_sub(1, Ordering::Relaxed);
    }

    /// Records that a thread has resumed by incrementing the active count.
    pub fn on_unpark(&self) {
        let active = &self.active_threads_cnt;
        active.fetch_add(1, Ordering::Relaxed);
    }
}
173
174async fn metrics_sampler(counters: Arc<ThreadCounters>, period: Duration) {
175 let mut interval = tokio::time::interval(period);
176 loop {
177 interval.tick().await;
178 let active = counters.active_threads_cnt.load(Ordering::Relaxed) as i64;
179 let parked = (counters.total_threads_cnt as i64).saturating_sub(active);
180 datapoint_info!(
181 counters.namespace,
182 ("threads_parked", parked, i64),
183 ("threads_active", active, i64),
184 );
185 }
186}