1use {
2 anyhow::Ok,
3 serde::{Deserialize, Serialize},
4 std::{collections::HashMap, ops::Deref, sync::Arc},
5};
6
7pub mod native_thread_runtime;
8pub mod policy;
9pub mod rayon_runtime;
10pub mod tokio_runtime;
11
12pub use {
13 native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime},
14 policy::CoreAllocation,
15 rayon_runtime::{RayonConfig, RayonRuntime},
16 tokio_runtime::{TokioConfig, TokioRuntime},
17};
18
19pub const MAX_THREAD_NAME_CHARS: usize = 12;
20
21#[derive(Default, Debug)]
22pub struct ThreadManagerInner {
23 pub tokio_runtimes: HashMap<String, TokioRuntime>,
24 pub tokio_runtime_mapping: HashMap<String, String>,
25
26 pub native_thread_runtimes: HashMap<String, NativeThreadRuntime>,
27 pub native_runtime_mapping: HashMap<String, String>,
28
29 pub rayon_runtimes: HashMap<String, RayonRuntime>,
30 pub rayon_runtime_mapping: HashMap<String, String>,
31}
32
33impl ThreadManagerInner {
34 fn populate_mappings(&mut self, config: &ThreadManagerConfig) {
36 for name in config.native_configs.keys() {
39 self.native_runtime_mapping
40 .insert(name.clone(), name.clone());
41 }
42 for (k, v) in config.native_runtime_mapping.iter() {
43 self.native_runtime_mapping.insert(k.clone(), v.clone());
44 }
45
46 for name in config.tokio_configs.keys() {
47 self.tokio_runtime_mapping
48 .insert(name.clone(), name.clone());
49 }
50 for (k, v) in config.tokio_runtime_mapping.iter() {
51 self.tokio_runtime_mapping.insert(k.clone(), v.clone());
52 }
53
54 for name in config.rayon_configs.keys() {
55 self.rayon_runtime_mapping
56 .insert(name.clone(), name.clone());
57 }
58 for (k, v) in config.rayon_runtime_mapping.iter() {
59 self.rayon_runtime_mapping.insert(k.clone(), v.clone());
60 }
61 }
62}
63
64#[derive(Default, Debug, Clone)]
65pub struct ThreadManager {
66 inner: Arc<ThreadManagerInner>,
67}
68
69impl Deref for ThreadManager {
70 type Target = ThreadManagerInner;
71
72 fn deref(&self) -> &Self::Target {
73 &self.inner
74 }
75}
76
77#[derive(Clone, Debug, Serialize, Deserialize)]
78#[serde(default)]
79pub struct ThreadManagerConfig {
80 pub native_configs: HashMap<String, NativeConfig>,
81 pub native_runtime_mapping: HashMap<String, String>,
82
83 pub rayon_configs: HashMap<String, RayonConfig>,
84 pub rayon_runtime_mapping: HashMap<String, String>,
85
86 pub tokio_configs: HashMap<String, TokioConfig>,
87 pub tokio_runtime_mapping: HashMap<String, String>,
88
89 pub default_core_allocation: CoreAllocation,
90}
91
92impl Default for ThreadManagerConfig {
93 fn default() -> Self {
94 Self {
95 native_configs: HashMap::from([("default".to_owned(), NativeConfig::default())]),
96 native_runtime_mapping: HashMap::new(),
97 rayon_configs: HashMap::from([("default".to_owned(), RayonConfig::default())]),
98 rayon_runtime_mapping: HashMap::new(),
99 tokio_configs: HashMap::from([("default".to_owned(), TokioConfig::default())]),
100 tokio_runtime_mapping: HashMap::new(),
101 default_core_allocation: CoreAllocation::OsDefault,
102 }
103 }
104}
105
106impl ThreadManager {
107 fn lookup<'a, T>(
109 &'a self,
110 name: &str,
111 mapping: &HashMap<String, String>,
112 runtimes: &'a HashMap<String, T>,
113 ) -> Option<&'a T> {
114 match mapping.get(name) {
115 Some(n) => runtimes.get(n),
116 None => match mapping.get("default") {
117 Some(n) => {
118 log::warn!("Falling back to default runtime for {name}");
119 runtimes.get(n)
120 }
121 None => None,
122 },
123 }
124 }
125
126 pub fn try_get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
127 self.lookup(
128 name,
129 &self.native_runtime_mapping,
130 &self.native_thread_runtimes,
131 )
132 }
133 pub fn get_native(&self, name: &str) -> &NativeThreadRuntime {
134 if let Some(runtime) = self.try_get_native(name) {
135 runtime
136 } else {
137 panic!("Native thread pool for {name} can not be found!");
138 }
139 }
140
141 pub fn try_get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
142 self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes)
143 }
144
145 pub fn get_rayon(&self, name: &str) -> &RayonRuntime {
146 if let Some(runtime) = self.try_get_rayon(name) {
147 runtime
148 } else {
149 panic!("Rayon thread pool for {name} can not be found!");
150 }
151 }
152
153 pub fn try_get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
154 self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes)
155 }
156
157 pub fn get_tokio(&self, name: &str) -> &TokioRuntime {
158 if let Some(runtime) = self.try_get_tokio(name) {
159 runtime
160 } else {
161 panic!("Tokio thread pool for {name} can not be found!");
162 }
163 }
164
165 pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
166 let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();
167 crate::policy::set_thread_affinity(&chosen_cores_mask);
168 Ok(chosen_cores_mask)
169 }
170
171 pub fn new(config: ThreadManagerConfig) -> anyhow::Result<Self> {
172 let mut core_allocations = HashMap::<String, Vec<usize>>::new();
173 Self::set_process_affinity(&config)?;
174 let mut manager = ThreadManagerInner::default();
175 manager.populate_mappings(&config);
176 for (name, cfg) in config.native_configs.iter() {
177 let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
178 manager.native_thread_runtimes.insert(name.clone(), nrt);
179 }
180 for (name, cfg) in config.rayon_configs.iter() {
181 let rrt = RayonRuntime::new(name.clone(), cfg.clone())?;
182 manager.rayon_runtimes.insert(name.clone(), rrt);
183 }
184
185 for (name, cfg) in config.tokio_configs.iter() {
186 let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;
187
188 core_allocations.insert(name.clone(), cfg.core_allocation.as_core_mask_vector());
189 manager.tokio_runtimes.insert(name.clone(), tokiort);
190 }
191 Ok(Self {
192 inner: Arc::new(manager),
193 })
194 }
195}
196
197#[cfg(test)]
198mod tests {
199 use {crate::ThreadManagerConfig, std::io::Read};
200 #[cfg(target_os = "linux")]
201 use {
202 crate::{CoreAllocation, NativeConfig, RayonConfig, ThreadManager},
203 std::collections::HashMap,
204 };
205
206 #[test]
207 fn test_config_files() {
208 let experiments = [
209 "examples/core_contention_dedicated_set.toml",
210 "examples/core_contention_contending_set.toml",
211 ];
212
213 for exp in experiments {
214 println!("Loading config {exp}");
215 let mut conffile = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
216 conffile.push(exp);
217 let mut buf = String::new();
218 std::fs::File::open(conffile)
219 .unwrap()
220 .read_to_string(&mut buf)
221 .unwrap();
222 let cfg: ThreadManagerConfig = toml::from_str(&buf).unwrap();
223 println!("{:?}", cfg);
224 }
225 }
226 #[cfg(target_os = "linux")]
228 fn validate_affinity(expect_cores: &[usize], error_msg: &str) {
229 let affinity = affinity::get_thread_affinity().unwrap();
230 assert_eq!(affinity, expect_cores, "{}", error_msg);
231 }
232 #[test]
233 #[cfg(target_os = "linux")]
234 #[ignore] fn test_thread_priority() {
236 let priority_high = 10;
237 let priority_default = crate::policy::DEFAULT_PRIORITY;
238 let priority_low = 1;
239 let conf = ThreadManagerConfig {
240 native_configs: HashMap::from([
241 (
242 "high".to_owned(),
243 NativeConfig {
244 priority: priority_high,
245 ..Default::default()
246 },
247 ),
248 (
249 "default".to_owned(),
250 NativeConfig {
251 ..Default::default()
252 },
253 ),
254 (
255 "low".to_owned(),
256 NativeConfig {
257 priority: priority_low,
258 ..Default::default()
259 },
260 ),
261 ]),
262 ..Default::default()
263 };
264
265 let manager = ThreadManager::new(conf).unwrap();
266 let high = manager.get_native("high");
267 let low = manager.get_native("low");
268 let default = manager.get_native("default");
269
270 high.spawn(move || {
271 let prio =
272 thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
273 assert_eq!(
274 prio,
275 thread_priority::ThreadPriority::Crossplatform((priority_high).try_into().unwrap())
276 );
277 })
278 .unwrap()
279 .join()
280 .unwrap();
281 low.spawn(move || {
282 let prio =
283 thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
284 assert_eq!(
285 prio,
286 thread_priority::ThreadPriority::Crossplatform((priority_low).try_into().unwrap())
287 );
288 })
289 .unwrap()
290 .join()
291 .unwrap();
292 default
293 .spawn(move || {
294 let prio =
295 thread_priority::get_thread_priority(thread_priority::thread_native_id())
296 .unwrap();
297 assert_eq!(
298 prio,
299 thread_priority::ThreadPriority::Crossplatform(
300 (priority_default).try_into().unwrap()
301 )
302 );
303 })
304 .unwrap()
305 .join()
306 .unwrap();
307 }
308
309 #[cfg(target_os = "linux")]
310 #[test]
311 fn test_process_affinity() {
312 let conf = ThreadManagerConfig {
313 native_configs: HashMap::from([(
314 "pool1".to_owned(),
315 NativeConfig {
316 core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 },
317 max_threads: 5,
318 ..Default::default()
319 },
320 )]),
321 default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
322 native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]),
323 ..Default::default()
324 };
325
326 let manager = ThreadManager::new(conf).unwrap();
327 let runtime = manager.get_native("test");
328
329 let thread1 = runtime
330 .spawn(|| {
331 validate_affinity(&[0, 1, 2, 3], "Managed thread allocation should be 0-3");
332 })
333 .unwrap();
334
335 let thread2 = std::thread::spawn(|| {
336 validate_affinity(&[4, 5, 6, 7], "Default thread allocation should be 4-7");
337
338 let inner_thread = std::thread::spawn(|| {
339 validate_affinity(
340 &[4, 5, 6, 7],
341 "Nested thread allocation should still be 4-7",
342 );
343 });
344 inner_thread.join().unwrap();
345 });
346 thread1.join().unwrap();
347 thread2.join().unwrap();
348 }
349
350 #[cfg(target_os = "linux")]
351 #[test]
352 fn test_rayon_affinity() {
353 let conf = ThreadManagerConfig {
354 rayon_configs: HashMap::from([(
355 "test".to_owned(),
356 RayonConfig {
357 core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
358 worker_threads: 3,
359 ..Default::default()
360 },
361 )]),
362 default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
363
364 ..Default::default()
365 };
366
367 let manager = ThreadManager::new(conf).unwrap();
368 let rayon_runtime = manager.get_rayon("test");
369
370 let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| {
371 println!("Rayon thread {} reporting", ctx.index());
372 validate_affinity(&[1, 2, 3], "Rayon thread allocation should still be 1-3");
373 });
374 }
375}