agave_thread_manager/
lib.rs

1use {
2    log::{debug, error, warn},
3    std::{
4        collections::HashMap,
5        ops::Deref,
6        sync::{atomic::Ordering, Arc},
7    },
8};
9
10pub mod config;
11pub mod native_thread_runtime;
12pub mod policy;
13pub mod rayon_runtime;
14pub mod tokio_runtime;
15
16pub use {
17    config::ThreadManagerConfig,
18    native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime},
19    policy::CoreAllocation,
20    rayon_runtime::{RayonConfig, RayonRuntime},
21    tokio_runtime::{TokioConfig, TokioRuntime},
22};
23
/// Upper bound on the length of a thread name, in characters.
// NOTE(review): presumably mirrors the OS limit on thread names (16 bytes on Linux, including
// the trailing NUL) — confirm against the runtime modules that consume this constant.
pub const MAX_THREAD_NAME_CHARS: usize = 16;
25
26#[derive(Default, Debug)]
27pub struct ThreadManagerInner {
28    pub tokio_runtimes: HashMap<String, TokioRuntime>,
29    pub tokio_runtime_mapping: HashMap<String, String>,
30
31    pub native_thread_runtimes: HashMap<String, NativeThreadRuntime>,
32    pub native_runtime_mapping: HashMap<String, String>,
33
34    pub rayon_runtimes: HashMap<String, RayonRuntime>,
35    pub rayon_runtime_mapping: HashMap<String, String>,
36}
37
38impl ThreadManagerInner {
39    /// Populates mappings with copies of config names, overrides as appropriate
40    fn populate_mappings(&mut self, config: &ThreadManagerConfig) {
41        //TODO: this should probably be cleaned up with a macro at some point...
42
43        for name in config.native_configs.keys() {
44            self.native_runtime_mapping
45                .insert(name.clone(), name.clone());
46        }
47        for (k, v) in config.native_runtime_mapping.iter() {
48            self.native_runtime_mapping.insert(k.clone(), v.clone());
49        }
50
51        for name in config.tokio_configs.keys() {
52            self.tokio_runtime_mapping
53                .insert(name.clone(), name.clone());
54        }
55        for (k, v) in config.tokio_runtime_mapping.iter() {
56            self.tokio_runtime_mapping.insert(k.clone(), v.clone());
57        }
58
59        for name in config.rayon_configs.keys() {
60            self.rayon_runtime_mapping
61                .insert(name.clone(), name.clone());
62        }
63        for (k, v) in config.rayon_runtime_mapping.iter() {
64            self.rayon_runtime_mapping.insert(k.clone(), v.clone());
65        }
66    }
67}
68
69#[derive(Default, Debug, Clone)]
70pub struct ThreadManager {
71    inner: Arc<ThreadManagerInner>,
72}
73
74impl Deref for ThreadManager {
75    type Target = ThreadManagerInner;
76
77    fn deref(&self) -> &Self::Target {
78        &self.inner
79    }
80}
81
82impl ThreadManager {
83    /// Will lookup a runtime by given name. If not found, will try to lookup by name "default". If all fails, returns None.
84    fn lookup<'a, T>(
85        &'a self,
86        name: &str,
87        mapping: &HashMap<String, String>,
88        runtimes: &'a HashMap<String, T>,
89    ) -> Option<&'a T> {
90        match mapping.get(name) {
91            Some(n) => runtimes.get(n),
92            None => match mapping.get("default") {
93                Some(n) => {
94                    warn!("Falling back to default runtime for {name}");
95                    runtimes.get(n)
96                }
97                None => None,
98            },
99        }
100    }
101
102    pub fn try_get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
103        self.lookup(
104            name,
105            &self.native_runtime_mapping,
106            &self.native_thread_runtimes,
107        )
108    }
109    pub fn get_native(&self, name: &str) -> &NativeThreadRuntime {
110        if let Some(runtime) = self.try_get_native(name) {
111            runtime
112        } else {
113            panic!("Native thread pool for {name} can not be found!");
114        }
115    }
116
117    pub fn try_get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
118        self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes)
119    }
120
121    pub fn get_rayon(&self, name: &str) -> &RayonRuntime {
122        if let Some(runtime) = self.try_get_rayon(name) {
123            runtime
124        } else {
125            panic!("Rayon thread pool for {name} can not be found!");
126        }
127    }
128
129    pub fn try_get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
130        self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes)
131    }
132
133    pub fn get_tokio(&self, name: &str) -> &TokioRuntime {
134        if let Some(runtime) = self.try_get_tokio(name) {
135            runtime
136        } else {
137            panic!("Tokio thread pool for {name} can not be found!");
138        }
139    }
140
141    pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
142        let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();
143        crate::policy::set_thread_affinity(&chosen_cores_mask);
144        Ok(chosen_cores_mask)
145    }
146
147    pub fn new(config: &ThreadManagerConfig) -> anyhow::Result<Self> {
148        let mut core_allocations = HashMap::<String, Vec<usize>>::new();
149        Self::set_process_affinity(config)?;
150        let mut manager = ThreadManagerInner::default();
151        manager.populate_mappings(config);
152        for (name, cfg) in config.native_configs.iter() {
153            let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
154            manager.native_thread_runtimes.insert(name.clone(), nrt);
155        }
156        for (name, cfg) in config.rayon_configs.iter() {
157            let rrt = RayonRuntime::new(name.clone(), cfg.clone())?;
158            manager.rayon_runtimes.insert(name.clone(), rrt);
159        }
160
161        for (name, cfg) in config.tokio_configs.iter() {
162            let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;
163
164            core_allocations.insert(name.clone(), cfg.core_allocation.as_core_mask_vector());
165            manager.tokio_runtimes.insert(name.clone(), tokiort);
166        }
167        Ok(Self {
168            inner: Arc::new(manager),
169        })
170    }
171
172    pub fn destroy(self) {
173        let Ok(mut inner) = Arc::try_unwrap(self.inner) else {
174            error!(
175                      "References to Thread Manager are still active, clean shutdown may not be possible!"
176                  );
177            return;
178        };
179
180        for (name, runtime) in inner.tokio_runtimes.drain() {
181            let active_cnt = runtime.counters.active_threads_cnt.load(Ordering::SeqCst);
182            match active_cnt {
183                0 => debug!("Shutting down Tokio runtime {name}"),
184                _ => warn!("Tokio runtime {name} has active workers during shutdown!"),
185            }
186            runtime.tokio.shutdown_background();
187        }
188        for (name, runtime) in inner.native_thread_runtimes.drain() {
189            let active_cnt = runtime.running_count.load(Ordering::SeqCst);
190            match active_cnt {
191                0 => debug!("Shutting down Native thread pool {name}"),
192                _ => warn!("Native pool {name} has active threads during shutdown!"),
193            }
194        }
195    }
196}
197
#[cfg(test)]
mod tests {
    use crate::ThreadManagerConfig;
    #[cfg(target_os = "linux")]
    use {
        crate::{CoreAllocation, NativeConfig, RayonConfig, ThreadManager},
        std::collections::HashMap,
    };

    /// Every example config listed below must deserialize into a `ThreadManagerConfig`.
    #[test]
    fn test_config_files() {
        let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));

        for exp in [
            "examples/core_contention_dedicated_set.toml",
            "examples/core_contention_contending_set.toml",
        ] {
            println!("Loading config {exp}");
            let contents = std::fs::read_to_string(manifest_dir.join(exp)).unwrap();
            let cfg: ThreadManagerConfig = toml::from_str(&contents).unwrap();
            println!("{:?}", cfg);
        }
    }

    /// Asserts that the calling thread's affinity is exactly `expect_cores`.
    // Affinity checks are Linux-only: Agave is not run on Windows, and on macOS the affinity
    // mask cannot be set without patching an external crate.
    #[cfg(target_os = "linux")]
    fn validate_affinity(expect_cores: &[usize], error_msg: &str) {
        let actual_cores = affinity::get_thread_affinity().unwrap();
        assert_eq!(actual_cores, expect_cores, "{}", error_msg);
    }

    /// Threads spawned on a native pool must run with the priority configured for that pool.
    #[test]
    #[cfg(target_os = "linux")]
    #[ignore] // ignored for now: thread priority requires kernel support and extra permissions
    fn test_thread_priority() {
        let priority_high = 10;
        let priority_default = crate::policy::DEFAULT_PRIORITY;
        let priority_low = 1;

        let native_configs = HashMap::from([
            (
                "high".to_owned(),
                NativeConfig {
                    priority: priority_high,
                    ..Default::default()
                },
            ),
            ("default".to_owned(), NativeConfig::default()),
            (
                "low".to_owned(),
                NativeConfig {
                    priority: priority_low,
                    ..Default::default()
                },
            ),
        ]);
        let conf = ThreadManagerConfig {
            native_configs,
            ..Default::default()
        };

        let manager = ThreadManager::new(&conf).unwrap();
        let high_pool = manager.get_native("high");
        let low_pool = manager.get_native("low");
        let default_pool = manager.get_native("default");

        // Spawns a thread on `$pool`, waits for it, and checks the priority it observed.
        macro_rules! expect_priority {
            ($pool:expr, $expected:expr) => {
                $pool
                    .spawn(move || {
                        let prio = thread_priority::get_thread_priority(
                            thread_priority::thread_native_id(),
                        )
                        .unwrap();
                        assert_eq!(
                            prio,
                            thread_priority::ThreadPriority::Crossplatform(
                                ($expected).try_into().unwrap()
                            )
                        );
                    })
                    .unwrap()
                    .join()
                    .unwrap();
            };
        }

        expect_priority!(high_pool, priority_high);
        expect_priority!(low_pool, priority_low);
        expect_priority!(default_pool, priority_default);
    }

    /// Managed threads get their pool's cores; plain `std` threads (and their children) get the
    /// default allocation.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_process_affinity() {
        let pool1 = NativeConfig {
            core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 },
            max_threads: 5,
            ..Default::default()
        };
        let conf = ThreadManagerConfig {
            native_configs: HashMap::from([("pool1".to_owned(), pool1)]),
            default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
            native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]),
            ..Default::default()
        };

        let manager = ThreadManager::new(&conf).unwrap();
        let runtime = manager.get_native("test");

        let managed = runtime
            .spawn(|| validate_affinity(&[0, 1, 2, 3], "Managed thread allocation should be 0-3"))
            .unwrap();

        let unmanaged = std::thread::spawn(|| {
            validate_affinity(&[4, 5, 6, 7], "Default thread allocation should be 4-7");

            std::thread::spawn(|| {
                validate_affinity(
                    &[4, 5, 6, 7],
                    "Nested thread allocation should still be 4-7",
                )
            })
            .join()
            .unwrap();
        });

        managed.join().unwrap();
        unmanaged.join().unwrap();
    }

    /// Every Rayon worker must be pinned to the pool's dedicated core set.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_rayon_affinity() {
        let rayon_cfg = RayonConfig {
            core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
            worker_threads: 3,
            ..Default::default()
        };
        let conf = ThreadManagerConfig {
            rayon_configs: HashMap::from([("test".to_owned(), rayon_cfg)]),
            default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
            ..Default::default()
        };

        let manager = ThreadManager::new(&conf).unwrap();
        let rayon_runtime = manager.get_rayon("test");

        // `broadcast` runs the closure once on each worker thread of the pool.
        let _results = rayon_runtime.rayon_pool.broadcast(|ctx| {
            println!("Rayon thread {} reporting", ctx.index());
            validate_affinity(&[1, 2, 3], "Rayon thread allocation should still be 1-3");
        });
    }
}
376}