1use {
2 log::{debug, error, warn},
3 std::{
4 collections::HashMap,
5 ops::Deref,
6 sync::{atomic::Ordering, Arc},
7 },
8};
9
10pub mod config;
11pub mod native_thread_runtime;
12pub mod policy;
13pub mod rayon_runtime;
14pub mod tokio_runtime;
15
16pub use {
17 config::ThreadManagerConfig,
18 native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime},
19 policy::CoreAllocation,
20 rayon_runtime::{RayonConfig, RayonRuntime},
21 tokio_runtime::{TokioConfig, TokioRuntime},
22};
23
24pub const MAX_THREAD_NAME_CHARS: usize = 16;
25
26#[derive(Default, Debug)]
27pub struct ThreadManagerInner {
28 pub tokio_runtimes: HashMap<String, TokioRuntime>,
29 pub tokio_runtime_mapping: HashMap<String, String>,
30
31 pub native_thread_runtimes: HashMap<String, NativeThreadRuntime>,
32 pub native_runtime_mapping: HashMap<String, String>,
33
34 pub rayon_runtimes: HashMap<String, RayonRuntime>,
35 pub rayon_runtime_mapping: HashMap<String, String>,
36}
37
38impl ThreadManagerInner {
39 fn populate_mappings(&mut self, config: &ThreadManagerConfig) {
41 for name in config.native_configs.keys() {
44 self.native_runtime_mapping
45 .insert(name.clone(), name.clone());
46 }
47 for (k, v) in config.native_runtime_mapping.iter() {
48 self.native_runtime_mapping.insert(k.clone(), v.clone());
49 }
50
51 for name in config.tokio_configs.keys() {
52 self.tokio_runtime_mapping
53 .insert(name.clone(), name.clone());
54 }
55 for (k, v) in config.tokio_runtime_mapping.iter() {
56 self.tokio_runtime_mapping.insert(k.clone(), v.clone());
57 }
58
59 for name in config.rayon_configs.keys() {
60 self.rayon_runtime_mapping
61 .insert(name.clone(), name.clone());
62 }
63 for (k, v) in config.rayon_runtime_mapping.iter() {
64 self.rayon_runtime_mapping.insert(k.clone(), v.clone());
65 }
66 }
67}
68
69#[derive(Default, Debug, Clone)]
70pub struct ThreadManager {
71 inner: Arc<ThreadManagerInner>,
72}
73
74impl Deref for ThreadManager {
75 type Target = ThreadManagerInner;
76
77 fn deref(&self) -> &Self::Target {
78 &self.inner
79 }
80}
81
82impl ThreadManager {
83 fn lookup<'a, T>(
85 &'a self,
86 name: &str,
87 mapping: &HashMap<String, String>,
88 runtimes: &'a HashMap<String, T>,
89 ) -> Option<&'a T> {
90 match mapping.get(name) {
91 Some(n) => runtimes.get(n),
92 None => match mapping.get("default") {
93 Some(n) => {
94 warn!("Falling back to default runtime for {name}");
95 runtimes.get(n)
96 }
97 None => None,
98 },
99 }
100 }
101
102 pub fn try_get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
103 self.lookup(
104 name,
105 &self.native_runtime_mapping,
106 &self.native_thread_runtimes,
107 )
108 }
109 pub fn get_native(&self, name: &str) -> &NativeThreadRuntime {
110 if let Some(runtime) = self.try_get_native(name) {
111 runtime
112 } else {
113 panic!("Native thread pool for {name} can not be found!");
114 }
115 }
116
117 pub fn try_get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
118 self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes)
119 }
120
121 pub fn get_rayon(&self, name: &str) -> &RayonRuntime {
122 if let Some(runtime) = self.try_get_rayon(name) {
123 runtime
124 } else {
125 panic!("Rayon thread pool for {name} can not be found!");
126 }
127 }
128
129 pub fn try_get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
130 self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes)
131 }
132
133 pub fn get_tokio(&self, name: &str) -> &TokioRuntime {
134 if let Some(runtime) = self.try_get_tokio(name) {
135 runtime
136 } else {
137 panic!("Tokio thread pool for {name} can not be found!");
138 }
139 }
140
141 pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
142 let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();
143 crate::policy::set_thread_affinity(&chosen_cores_mask);
144 Ok(chosen_cores_mask)
145 }
146
147 pub fn new(config: &ThreadManagerConfig) -> anyhow::Result<Self> {
148 let mut core_allocations = HashMap::<String, Vec<usize>>::new();
149 Self::set_process_affinity(config)?;
150 let mut manager = ThreadManagerInner::default();
151 manager.populate_mappings(config);
152 for (name, cfg) in config.native_configs.iter() {
153 let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
154 manager.native_thread_runtimes.insert(name.clone(), nrt);
155 }
156 for (name, cfg) in config.rayon_configs.iter() {
157 let rrt = RayonRuntime::new(name.clone(), cfg.clone())?;
158 manager.rayon_runtimes.insert(name.clone(), rrt);
159 }
160
161 for (name, cfg) in config.tokio_configs.iter() {
162 let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;
163
164 core_allocations.insert(name.clone(), cfg.core_allocation.as_core_mask_vector());
165 manager.tokio_runtimes.insert(name.clone(), tokiort);
166 }
167 Ok(Self {
168 inner: Arc::new(manager),
169 })
170 }
171
172 pub fn destroy(self) {
173 let Ok(mut inner) = Arc::try_unwrap(self.inner) else {
174 error!(
175 "References to Thread Manager are still active, clean shutdown may not be possible!"
176 );
177 return;
178 };
179
180 for (name, runtime) in inner.tokio_runtimes.drain() {
181 let active_cnt = runtime.counters.active_threads_cnt.load(Ordering::SeqCst);
182 match active_cnt {
183 0 => debug!("Shutting down Tokio runtime {name}"),
184 _ => warn!("Tokio runtime {name} has active workers during shutdown!"),
185 }
186 runtime.tokio.shutdown_background();
187 }
188 for (name, runtime) in inner.native_thread_runtimes.drain() {
189 let active_cnt = runtime.running_count.load(Ordering::SeqCst);
190 match active_cnt {
191 0 => debug!("Shutting down Native thread pool {name}"),
192 _ => warn!("Native pool {name} has active threads during shutdown!"),
193 }
194 }
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use {crate::ThreadManagerConfig, std::io::Read};
201 #[cfg(target_os = "linux")]
202 use {
203 crate::{CoreAllocation, NativeConfig, RayonConfig, ThreadManager},
204 std::collections::HashMap,
205 };
206
207 #[test]
208 fn test_config_files() {
209 let experiments = [
210 "examples/core_contention_dedicated_set.toml",
211 "examples/core_contention_contending_set.toml",
212 ];
213
214 for exp in experiments {
215 println!("Loading config {exp}");
216 let mut conffile = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
217 conffile.push(exp);
218 let mut buf = String::new();
219 std::fs::File::open(conffile)
220 .unwrap()
221 .read_to_string(&mut buf)
222 .unwrap();
223 let cfg: ThreadManagerConfig = toml::from_str(&buf).unwrap();
224 println!("{:?}", cfg);
225 }
226 }
227 #[cfg(target_os = "linux")]
229 fn validate_affinity(expect_cores: &[usize], error_msg: &str) {
230 let affinity = affinity::get_thread_affinity().unwrap();
231 assert_eq!(affinity, expect_cores, "{}", error_msg);
232 }
233 #[test]
234 #[cfg(target_os = "linux")]
235 #[ignore] fn test_thread_priority() {
237 let priority_high = 10;
238 let priority_default = crate::policy::DEFAULT_PRIORITY;
239 let priority_low = 1;
240 let conf = ThreadManagerConfig {
241 native_configs: HashMap::from([
242 (
243 "high".to_owned(),
244 NativeConfig {
245 priority: priority_high,
246 ..Default::default()
247 },
248 ),
249 (
250 "default".to_owned(),
251 NativeConfig {
252 ..Default::default()
253 },
254 ),
255 (
256 "low".to_owned(),
257 NativeConfig {
258 priority: priority_low,
259 ..Default::default()
260 },
261 ),
262 ]),
263 ..Default::default()
264 };
265
266 let manager = ThreadManager::new(&conf).unwrap();
267 let high = manager.get_native("high");
268 let low = manager.get_native("low");
269 let default = manager.get_native("default");
270
271 high.spawn(move || {
272 let prio =
273 thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
274 assert_eq!(
275 prio,
276 thread_priority::ThreadPriority::Crossplatform((priority_high).try_into().unwrap())
277 );
278 })
279 .unwrap()
280 .join()
281 .unwrap();
282 low.spawn(move || {
283 let prio =
284 thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
285 assert_eq!(
286 prio,
287 thread_priority::ThreadPriority::Crossplatform((priority_low).try_into().unwrap())
288 );
289 })
290 .unwrap()
291 .join()
292 .unwrap();
293 default
294 .spawn(move || {
295 let prio =
296 thread_priority::get_thread_priority(thread_priority::thread_native_id())
297 .unwrap();
298 assert_eq!(
299 prio,
300 thread_priority::ThreadPriority::Crossplatform(
301 (priority_default).try_into().unwrap()
302 )
303 );
304 })
305 .unwrap()
306 .join()
307 .unwrap();
308 }
309
310 #[cfg(target_os = "linux")]
311 #[test]
312 fn test_process_affinity() {
313 let conf = ThreadManagerConfig {
314 native_configs: HashMap::from([(
315 "pool1".to_owned(),
316 NativeConfig {
317 core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 },
318 max_threads: 5,
319 ..Default::default()
320 },
321 )]),
322 default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
323 native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]),
324 ..Default::default()
325 };
326
327 let manager = ThreadManager::new(&conf).unwrap();
328 let runtime = manager.get_native("test");
329
330 let thread1 = runtime
331 .spawn(|| {
332 validate_affinity(&[0, 1, 2, 3], "Managed thread allocation should be 0-3");
333 })
334 .unwrap();
335
336 let thread2 = std::thread::spawn(|| {
337 validate_affinity(&[4, 5, 6, 7], "Default thread allocation should be 4-7");
338
339 let inner_thread = std::thread::spawn(|| {
340 validate_affinity(
341 &[4, 5, 6, 7],
342 "Nested thread allocation should still be 4-7",
343 );
344 });
345 inner_thread.join().unwrap();
346 });
347 thread1.join().unwrap();
348 thread2.join().unwrap();
349 }
350
351 #[cfg(target_os = "linux")]
352 #[test]
353 fn test_rayon_affinity() {
354 let conf = ThreadManagerConfig {
355 rayon_configs: HashMap::from([(
356 "test".to_owned(),
357 RayonConfig {
358 core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
359 worker_threads: 3,
360 ..Default::default()
361 },
362 )]),
363 default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
364
365 ..Default::default()
366 };
367
368 let manager = ThreadManager::new(&conf).unwrap();
369 let rayon_runtime = manager.get_rayon("test");
370
371 let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| {
372 println!("Rayon thread {} reporting", ctx.index());
373 validate_affinity(&[1, 2, 3], "Rayon thread allocation should still be 1-3");
374 });
375 }
376}