1#![cfg_attr(
2 not(feature = "agave-unstable-api"),
3 deprecated(
4 since = "3.1.0",
5 note = "This crate has been marked for formal inclusion in the Agave Unstable API. From \
6 v4.0.0 onward, the `agave-unstable-api` crate feature must be specified to \
7 acknowledge use of an interface that may break without warning."
8 )
9)]
10use {
11 anyhow::Ok,
12 serde::{Deserialize, Serialize},
13 std::{collections::HashMap, ops::Deref, sync::Arc},
14};
15
16pub mod native_thread_runtime;
17pub mod policy;
18pub mod rayon_runtime;
19pub mod tokio_runtime;
20
21pub use {
22 native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime},
23 policy::CoreAllocation,
24 rayon_runtime::{RayonConfig, RayonRuntime},
25 tokio_runtime::{TokioConfig, TokioRuntime},
26};
27
/// Character budget for thread names.
///
/// NOTE(review): this constant is not referenced anywhere in this file;
/// presumably the runtime modules use it to bound the names they give to
/// worker threads — confirm against their usage.
pub const MAX_THREAD_NAME_CHARS: usize = 12;
29
/// Shared state behind a [`ThreadManager`]: the constructed runtimes of each
/// kind, keyed by the name they were configured under, plus one alias table
/// per kind that translates a lookup name into one of those keys.
#[derive(Default, Debug)]
pub struct ThreadManagerInner {
    /// Tokio runtimes, keyed by their configured name.
    pub tokio_runtimes: HashMap<String, TokioRuntime>,
    /// Lookup name -> key in `tokio_runtimes`.
    pub tokio_runtime_mapping: HashMap<String, String>,

    /// Native thread runtimes, keyed by their configured name.
    pub native_thread_runtimes: HashMap<String, NativeThreadRuntime>,
    /// Lookup name -> key in `native_thread_runtimes`.
    pub native_runtime_mapping: HashMap<String, String>,

    /// Rayon runtimes, keyed by their configured name.
    pub rayon_runtimes: HashMap<String, RayonRuntime>,
    /// Lookup name -> key in `rayon_runtimes`.
    pub rayon_runtime_mapping: HashMap<String, String>,
}
41
42impl ThreadManagerInner {
43 fn populate_mappings(&mut self, config: &ThreadManagerConfig) {
45 for name in config.native_configs.keys() {
48 self.native_runtime_mapping
49 .insert(name.clone(), name.clone());
50 }
51 for (k, v) in config.native_runtime_mapping.iter() {
52 self.native_runtime_mapping.insert(k.clone(), v.clone());
53 }
54
55 for name in config.tokio_configs.keys() {
56 self.tokio_runtime_mapping
57 .insert(name.clone(), name.clone());
58 }
59 for (k, v) in config.tokio_runtime_mapping.iter() {
60 self.tokio_runtime_mapping.insert(k.clone(), v.clone());
61 }
62
63 for name in config.rayon_configs.keys() {
64 self.rayon_runtime_mapping
65 .insert(name.clone(), name.clone());
66 }
67 for (k, v) in config.rayon_runtime_mapping.iter() {
68 self.rayon_runtime_mapping.insert(k.clone(), v.clone());
69 }
70 }
71}
72
/// Handle to a set of named thread runtimes.
///
/// The state lives in an [`Arc`], so cloning a `ThreadManager` yields another
/// handle to the same [`ThreadManagerInner`] rather than a copy of it.
#[derive(Default, Debug, Clone)]
pub struct ThreadManager {
    /// Shared, immutable runtime tables.
    inner: Arc<ThreadManagerInner>,
}
77
78impl Deref for ThreadManager {
79 type Target = ThreadManagerInner;
80
81 fn deref(&self) -> &Self::Target {
82 &self.inner
83 }
84}
85
/// Serializable description of the runtimes a [`ThreadManager`] should build.
///
/// The container-level `#[serde(default)]` means any field missing from the
/// deserialized input takes its value from this type's `Default` impl.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct ThreadManagerConfig {
    /// Native thread runtime configs, keyed by runtime name.
    pub native_configs: HashMap<String, NativeConfig>,
    /// Extra lookup names for native runtimes: alias -> runtime name.
    pub native_runtime_mapping: HashMap<String, String>,

    /// Rayon runtime configs, keyed by runtime name.
    pub rayon_configs: HashMap<String, RayonConfig>,
    /// Extra lookup names for rayon runtimes: alias -> runtime name.
    pub rayon_runtime_mapping: HashMap<String, String>,

    /// Tokio runtime configs, keyed by runtime name.
    pub tokio_configs: HashMap<String, TokioConfig>,
    /// Extra lookup names for tokio runtimes: alias -> runtime name.
    pub tokio_runtime_mapping: HashMap<String, String>,

    /// Core allocation applied by `ThreadManager::set_process_affinity`.
    pub default_core_allocation: CoreAllocation,
}
100
101impl Default for ThreadManagerConfig {
102 fn default() -> Self {
103 Self {
104 native_configs: HashMap::from([("default".to_owned(), NativeConfig::default())]),
105 native_runtime_mapping: HashMap::new(),
106 rayon_configs: HashMap::from([("default".to_owned(), RayonConfig::default())]),
107 rayon_runtime_mapping: HashMap::new(),
108 tokio_configs: HashMap::from([("default".to_owned(), TokioConfig::default())]),
109 tokio_runtime_mapping: HashMap::new(),
110 default_core_allocation: CoreAllocation::OsDefault,
111 }
112 }
113}
114
115impl ThreadManager {
116 fn lookup<'a, T>(
118 &'a self,
119 name: &str,
120 mapping: &HashMap<String, String>,
121 runtimes: &'a HashMap<String, T>,
122 ) -> Option<&'a T> {
123 match mapping.get(name) {
124 Some(n) => runtimes.get(n),
125 None => match mapping.get("default") {
126 Some(n) => {
127 log::warn!("Falling back to default runtime for {name}");
128 runtimes.get(n)
129 }
130 None => None,
131 },
132 }
133 }
134
135 pub fn try_get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
136 self.lookup(
137 name,
138 &self.native_runtime_mapping,
139 &self.native_thread_runtimes,
140 )
141 }
142 pub fn get_native(&self, name: &str) -> &NativeThreadRuntime {
143 if let Some(runtime) = self.try_get_native(name) {
144 runtime
145 } else {
146 panic!("Native thread pool for {name} can not be found!");
147 }
148 }
149
150 pub fn try_get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
151 self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes)
152 }
153
154 pub fn get_rayon(&self, name: &str) -> &RayonRuntime {
155 if let Some(runtime) = self.try_get_rayon(name) {
156 runtime
157 } else {
158 panic!("Rayon thread pool for {name} can not be found!");
159 }
160 }
161
162 pub fn try_get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
163 self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes)
164 }
165
166 pub fn get_tokio(&self, name: &str) -> &TokioRuntime {
167 if let Some(runtime) = self.try_get_tokio(name) {
168 runtime
169 } else {
170 panic!("Tokio thread pool for {name} can not be found!");
171 }
172 }
173
174 pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
175 let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();
176 crate::policy::set_thread_affinity(&chosen_cores_mask);
177 Ok(chosen_cores_mask)
178 }
179
180 pub fn new(config: ThreadManagerConfig) -> anyhow::Result<Self> {
181 let mut core_allocations = HashMap::<String, Vec<usize>>::new();
182 Self::set_process_affinity(&config)?;
183 let mut manager = ThreadManagerInner::default();
184 manager.populate_mappings(&config);
185 for (name, cfg) in config.native_configs.iter() {
186 let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
187 manager.native_thread_runtimes.insert(name.clone(), nrt);
188 }
189 for (name, cfg) in config.rayon_configs.iter() {
190 let rrt = RayonRuntime::new(name.clone(), cfg.clone())?;
191 manager.rayon_runtimes.insert(name.clone(), rrt);
192 }
193
194 for (name, cfg) in config.tokio_configs.iter() {
195 let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;
196
197 core_allocations.insert(name.clone(), cfg.core_allocation.as_core_mask_vector());
198 manager.tokio_runtimes.insert(name.clone(), tokiort);
199 }
200 Ok(Self {
201 inner: Arc::new(manager),
202 })
203 }
204}
205
#[cfg(test)]
mod tests {
    use crate::ThreadManagerConfig;
    #[cfg(target_os = "linux")]
    use {
        crate::{CoreAllocation, NativeConfig, RayonConfig, ThreadManager},
        std::collections::HashMap,
    };

    /// Each example config referenced here must deserialize into a
    /// `ThreadManagerConfig`.
    #[test]
    fn test_config_files() {
        let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));

        for exp in [
            "examples/core_contention_dedicated_set.toml",
            "examples/core_contention_contending_set.toml",
        ] {
            println!("Loading config {exp}");
            let contents = std::fs::read_to_string(manifest_dir.join(exp)).unwrap();
            let cfg: ThreadManagerConfig = toml::from_str(&contents).unwrap();
            println!("{cfg:?}");
        }
    }

    /// Asserts that the calling thread's affinity equals `expect_cores`,
    /// failing with `error_msg` otherwise.
    #[cfg(target_os = "linux")]
    fn validate_affinity(expect_cores: &[usize], error_msg: &str) {
        let actual_cores = affinity::get_thread_affinity().unwrap();
        assert_eq!(actual_cores, expect_cores, "{error_msg}");
    }

    /// Threads spawned on a native runtime must report that runtime's
    /// configured priority.
    ///
    /// NOTE(review): ignored by default; presumably because setting thread
    /// priority needs privileges that are not always available — confirm.
    #[test]
    #[cfg(target_os = "linux")]
    #[ignore]
    fn test_thread_priority() {
        use thread_priority::{get_thread_priority, thread_native_id, ThreadPriority};

        let priority_high = 10;
        let priority_default = crate::policy::DEFAULT_PRIORITY;
        let priority_low = 1;

        let high_cfg = NativeConfig {
            priority: priority_high,
            ..Default::default()
        };
        let low_cfg = NativeConfig {
            priority: priority_low,
            ..Default::default()
        };
        let conf = ThreadManagerConfig {
            native_configs: HashMap::from([
                ("high".to_owned(), high_cfg),
                ("default".to_owned(), NativeConfig::default()),
                ("low".to_owned(), low_cfg),
            ]),
            ..Default::default()
        };

        let manager = ThreadManager::new(conf).unwrap();
        let high = manager.get_native("high");
        let low = manager.get_native("low");
        let default = manager.get_native("default");

        // Each pool is checked on its own: spawn one thread, wait for it, and
        // only then move on to the next pool.
        let high_handle = high
            .spawn(move || {
                let observed = get_thread_priority(thread_native_id()).unwrap();
                let expected = ThreadPriority::Crossplatform((priority_high).try_into().unwrap());
                assert_eq!(observed, expected);
            })
            .unwrap();
        high_handle.join().unwrap();

        let low_handle = low
            .spawn(move || {
                let observed = get_thread_priority(thread_native_id()).unwrap();
                let expected = ThreadPriority::Crossplatform((priority_low).try_into().unwrap());
                assert_eq!(observed, expected);
            })
            .unwrap();
        low_handle.join().unwrap();

        let default_handle = default
            .spawn(move || {
                let observed = get_thread_priority(thread_native_id()).unwrap();
                let expected =
                    ThreadPriority::Crossplatform((priority_default).try_into().unwrap());
                assert_eq!(observed, expected);
            })
            .unwrap();
        default_handle.join().unwrap();
    }

    /// Managed threads must run on their pool's cores, while plain
    /// `std::thread` threads (and threads they spawn) must run on the
    /// default core allocation.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_process_affinity() {
        let pool1 = NativeConfig {
            core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 },
            max_threads: 5,
            ..Default::default()
        };
        let conf = ThreadManagerConfig {
            native_configs: HashMap::from([("pool1".to_owned(), pool1)]),
            default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
            native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]),
            ..Default::default()
        };

        let manager = ThreadManager::new(conf).unwrap();
        // "test" is only an alias; it must resolve to "pool1".
        let runtime = manager.get_native("test");

        let managed = runtime
            .spawn(|| {
                validate_affinity(&[0, 1, 2, 3], "Managed thread allocation should be 0-3");
            })
            .unwrap();

        let unmanaged = std::thread::spawn(|| {
            validate_affinity(&[4, 5, 6, 7], "Default thread allocation should be 4-7");

            std::thread::spawn(|| {
                validate_affinity(
                    &[4, 5, 6, 7],
                    "Nested thread allocation should still be 4-7",
                );
            })
            .join()
            .unwrap();
        });

        managed.join().unwrap();
        unmanaged.join().unwrap();
    }

    /// Every worker of a rayon runtime must run on that runtime's cores.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_rayon_affinity() {
        let rayon_cfg = RayonConfig {
            core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
            worker_threads: 3,
            ..Default::default()
        };
        let conf = ThreadManagerConfig {
            rayon_configs: HashMap::from([("test".to_owned(), rayon_cfg)]),
            default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
            ..Default::default()
        };

        let manager = ThreadManager::new(conf).unwrap();
        let rayon_runtime = manager.get_rayon("test");

        // `broadcast` runs the closure on each worker thread of the pool.
        let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| {
            println!("Rayon thread {} reporting", ctx.index());
            validate_affinity(&[1, 2, 3], "Rayon thread allocation should still be 1-3");
        });
    }
}