agave_thread_manager/
lib.rs

1#![cfg_attr(
2    not(feature = "agave-unstable-api"),
3    deprecated(
4        since = "3.1.0",
5        note = "This crate has been marked for formal inclusion in the Agave Unstable API. From \
6                v4.0.0 onward, the `agave-unstable-api` crate feature must be specified to \
7                acknowledge use of an interface that may break without warning."
8    )
9)]
10use {
11    anyhow::Ok,
12    serde::{Deserialize, Serialize},
13    std::{collections::HashMap, ops::Deref, sync::Arc},
14};
15
16pub mod native_thread_runtime;
17pub mod policy;
18pub mod rayon_runtime;
19pub mod tokio_runtime;
20
21pub use {
22    native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime},
23    policy::CoreAllocation,
24    rayon_runtime::{RayonConfig, RayonRuntime},
25    tokio_runtime::{TokioConfig, TokioRuntime},
26};
27
/// Maximum number of characters allowed in a runtime / thread name.
///
/// NOTE(review): the limit is not enforced in this file; presumably the runtime modules check
/// it. The value 12 looks chosen to leave room for a numeric suffix within Linux's
/// 15-character thread-name limit — confirm.
pub const MAX_THREAD_NAME_CHARS: usize = 12;
29
/// State shared by every clone of a [`ThreadManager`].
///
/// Each runtime kind has two maps:
/// - the constructed runtimes, keyed by the name of the config that produced them;
/// - a lookup table mapping a requested name to a key in that runtime map.
#[derive(Default, Debug)]
pub struct ThreadManagerInner {
    /// Tokio runtimes, keyed by config name.
    pub tokio_runtimes: HashMap<String, TokioRuntime>,
    /// Requested name -> key in `tokio_runtimes`.
    pub tokio_runtime_mapping: HashMap<String, String>,

    /// Native thread runtimes, keyed by config name.
    pub native_thread_runtimes: HashMap<String, NativeThreadRuntime>,
    /// Requested name -> key in `native_thread_runtimes`.
    pub native_runtime_mapping: HashMap<String, String>,

    /// Rayon thread pools, keyed by config name.
    pub rayon_runtimes: HashMap<String, RayonRuntime>,
    /// Requested name -> key in `rayon_runtimes`.
    pub rayon_runtime_mapping: HashMap<String, String>,
}
41
42impl ThreadManagerInner {
43    /// Populates mappings with copies of config names, overrides as appropriate
44    fn populate_mappings(&mut self, config: &ThreadManagerConfig) {
45        //TODO: this should probably be cleaned up with a macro at some point...
46
47        for name in config.native_configs.keys() {
48            self.native_runtime_mapping
49                .insert(name.clone(), name.clone());
50        }
51        for (k, v) in config.native_runtime_mapping.iter() {
52            self.native_runtime_mapping.insert(k.clone(), v.clone());
53        }
54
55        for name in config.tokio_configs.keys() {
56            self.tokio_runtime_mapping
57                .insert(name.clone(), name.clone());
58        }
59        for (k, v) in config.tokio_runtime_mapping.iter() {
60            self.tokio_runtime_mapping.insert(k.clone(), v.clone());
61        }
62
63        for name in config.rayon_configs.keys() {
64            self.rayon_runtime_mapping
65                .insert(name.clone(), name.clone());
66        }
67        for (k, v) in config.rayon_runtime_mapping.iter() {
68            self.rayon_runtime_mapping.insert(k.clone(), v.clone());
69        }
70    }
71}
72
/// Handle to a set of named thread runtimes (native threads, Rayon pools and Tokio
/// runtimes) built from a [`ThreadManagerConfig`].
///
/// Clones share one [`ThreadManagerInner`] through an [`Arc`], so cloning only bumps a
/// reference count. The inner state is reachable through `Deref`.
#[derive(Default, Debug, Clone)]
pub struct ThreadManager {
    inner: Arc<ThreadManagerInner>,
}
77
78impl Deref for ThreadManager {
79    type Target = ThreadManagerInner;
80
81    fn deref(&self) -> &Self::Target {
82        &self.inner
83    }
84}
85
/// Configuration consumed by [`ThreadManager::new`].
///
/// Because of `#[serde(default)]`, any field absent from the serialized form is taken from
/// [`ThreadManagerConfig::default`]. A field that *is* present replaces the default wholesale.
/// For example, supplying `native_configs` drops the built-in `"default"` native runtime
/// unless that entry is listed again.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct ThreadManagerConfig {
    /// Native thread runtimes to build, keyed by runtime name.
    pub native_configs: HashMap<String, NativeConfig>,
    /// Extra lookup names for native runtimes (requested name -> runtime name).
    /// Applied on top of the identity entry each configured runtime gets, and can override it.
    pub native_runtime_mapping: HashMap<String, String>,

    /// Rayon thread pools to build, keyed by pool name.
    pub rayon_configs: HashMap<String, RayonConfig>,
    /// Extra lookup names for Rayon pools (requested name -> pool name).
    pub rayon_runtime_mapping: HashMap<String, String>,

    /// Tokio runtimes to build, keyed by runtime name.
    pub tokio_configs: HashMap<String, TokioConfig>,
    /// Extra lookup names for Tokio runtimes (requested name -> runtime name).
    pub tokio_runtime_mapping: HashMap<String, String>,

    /// Core allocation applied to the thread that calls [`ThreadManager::new`], before any
    /// runtime is built.
    pub default_core_allocation: CoreAllocation,
}
100
101impl Default for ThreadManagerConfig {
102    fn default() -> Self {
103        Self {
104            native_configs: HashMap::from([("default".to_owned(), NativeConfig::default())]),
105            native_runtime_mapping: HashMap::new(),
106            rayon_configs: HashMap::from([("default".to_owned(), RayonConfig::default())]),
107            rayon_runtime_mapping: HashMap::new(),
108            tokio_configs: HashMap::from([("default".to_owned(), TokioConfig::default())]),
109            tokio_runtime_mapping: HashMap::new(),
110            default_core_allocation: CoreAllocation::OsDefault,
111        }
112    }
113}
114
115impl ThreadManager {
116    /// Will lookup a runtime by given name. If not found, will try to lookup by name "default". If all fails, returns None.
117    fn lookup<'a, T>(
118        &'a self,
119        name: &str,
120        mapping: &HashMap<String, String>,
121        runtimes: &'a HashMap<String, T>,
122    ) -> Option<&'a T> {
123        match mapping.get(name) {
124            Some(n) => runtimes.get(n),
125            None => match mapping.get("default") {
126                Some(n) => {
127                    log::warn!("Falling back to default runtime for {name}");
128                    runtimes.get(n)
129                }
130                None => None,
131            },
132        }
133    }
134
135    pub fn try_get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
136        self.lookup(
137            name,
138            &self.native_runtime_mapping,
139            &self.native_thread_runtimes,
140        )
141    }
142    pub fn get_native(&self, name: &str) -> &NativeThreadRuntime {
143        if let Some(runtime) = self.try_get_native(name) {
144            runtime
145        } else {
146            panic!("Native thread pool for {name} can not be found!");
147        }
148    }
149
150    pub fn try_get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
151        self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes)
152    }
153
154    pub fn get_rayon(&self, name: &str) -> &RayonRuntime {
155        if let Some(runtime) = self.try_get_rayon(name) {
156            runtime
157        } else {
158            panic!("Rayon thread pool for {name} can not be found!");
159        }
160    }
161
162    pub fn try_get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
163        self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes)
164    }
165
166    pub fn get_tokio(&self, name: &str) -> &TokioRuntime {
167        if let Some(runtime) = self.try_get_tokio(name) {
168            runtime
169        } else {
170            panic!("Tokio thread pool for {name} can not be found!");
171        }
172    }
173
174    pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
175        let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();
176        crate::policy::set_thread_affinity(&chosen_cores_mask);
177        Ok(chosen_cores_mask)
178    }
179
180    pub fn new(config: ThreadManagerConfig) -> anyhow::Result<Self> {
181        let mut core_allocations = HashMap::<String, Vec<usize>>::new();
182        Self::set_process_affinity(&config)?;
183        let mut manager = ThreadManagerInner::default();
184        manager.populate_mappings(&config);
185        for (name, cfg) in config.native_configs.iter() {
186            let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
187            manager.native_thread_runtimes.insert(name.clone(), nrt);
188        }
189        for (name, cfg) in config.rayon_configs.iter() {
190            let rrt = RayonRuntime::new(name.clone(), cfg.clone())?;
191            manager.rayon_runtimes.insert(name.clone(), rrt);
192        }
193
194        for (name, cfg) in config.tokio_configs.iter() {
195            let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;
196
197            core_allocations.insert(name.clone(), cfg.core_allocation.as_core_mask_vector());
198            manager.tokio_runtimes.insert(name.clone(), tokiort);
199        }
200        Ok(Self {
201            inner: Arc::new(manager),
202        })
203    }
204}
205
#[cfg(test)]
mod tests {
    use crate::ThreadManagerConfig;
    #[cfg(target_os = "linux")]
    use {
        crate::{CoreAllocation, NativeConfig, RayonConfig, ThreadManager},
        std::collections::HashMap,
    };

    /// Every example config listed here must deserialize into a [`ThreadManagerConfig`].
    #[test]
    fn test_config_files() {
        let experiments = [
            "examples/core_contention_dedicated_set.toml",
            "examples/core_contention_contending_set.toml",
        ];

        for exp in experiments {
            println!("Loading config {exp}");
            let conffile = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(exp);
            // Name the offending file in the panic instead of surfacing a bare io/toml error.
            let buf = std::fs::read_to_string(&conffile)
                .unwrap_or_else(|e| panic!("failed to read {}: {e}", conffile.display()));
            let cfg: ThreadManagerConfig = toml::from_str(&buf)
                .unwrap_or_else(|e| panic!("failed to parse {}: {e}", conffile.display()));
            println!("{cfg:?}");
        }
    }

    /// Asserts that the calling thread's CPU affinity is exactly `expect_cores`.
    // Affinity checks are Linux-only: Agave is not run on Windows, and on macOS mask affinity
    // can not be set without patching an external crate.
    #[cfg(target_os = "linux")]
    fn validate_affinity(expect_cores: &[usize], error_msg: &str) {
        let affinity = affinity::get_thread_affinity().unwrap();
        assert_eq!(affinity, expect_cores, "{error_msg}");
    }

    #[test]
    #[cfg(target_os = "linux")]
    #[ignore] // thread priority requires kernel support and extra permissions
    fn test_thread_priority() {
        let priority_high = 10;
        let priority_default = crate::policy::DEFAULT_PRIORITY;
        let priority_low = 1;
        let conf = ThreadManagerConfig {
            native_configs: HashMap::from([
                (
                    "high".to_owned(),
                    NativeConfig {
                        priority: priority_high,
                        ..Default::default()
                    },
                ),
                ("default".to_owned(), NativeConfig::default()),
                (
                    "low".to_owned(),
                    NativeConfig {
                        priority: priority_low,
                        ..Default::default()
                    },
                ),
            ]),
            ..Default::default()
        };

        let manager = ThreadManager::new(conf).unwrap();
        let high = manager.get_native("high");
        let low = manager.get_native("low");
        let default = manager.get_native("default");

        high.spawn(move || {
            let prio =
                thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
            assert_eq!(
                prio,
                thread_priority::ThreadPriority::Crossplatform(priority_high.try_into().unwrap())
            );
        })
        .unwrap()
        .join()
        .unwrap();
        low.spawn(move || {
            let prio =
                thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
            assert_eq!(
                prio,
                thread_priority::ThreadPriority::Crossplatform(priority_low.try_into().unwrap())
            );
        })
        .unwrap()
        .join()
        .unwrap();
        default
            .spawn(move || {
                let prio =
                    thread_priority::get_thread_priority(thread_priority::thread_native_id())
                        .unwrap();
                assert_eq!(
                    prio,
                    thread_priority::ThreadPriority::Crossplatform(
                        priority_default.try_into().unwrap()
                    )
                );
            })
            .unwrap()
            .join()
            .unwrap();
    }

    /// Managed threads get their pool's cores; all other threads keep the default mask.
    // NOTE(review): assumes the host has at least 8 cores (default set is cores 4..8).
    #[cfg(target_os = "linux")]
    #[test]
    fn test_process_affinity() {
        let conf = ThreadManagerConfig {
            native_configs: HashMap::from([(
                "pool1".to_owned(),
                NativeConfig {
                    core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 },
                    max_threads: 5,
                    ..Default::default()
                },
            )]),
            default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
            native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]),
            ..Default::default()
        };

        let manager = ThreadManager::new(conf).unwrap();
        let runtime = manager.get_native("test");

        let thread1 = runtime
            .spawn(|| {
                validate_affinity(&[0, 1, 2, 3], "Managed thread allocation should be 0-3");
            })
            .unwrap();

        let thread2 = std::thread::spawn(|| {
            validate_affinity(&[4, 5, 6, 7], "Default thread allocation should be 4-7");

            let inner_thread = std::thread::spawn(|| {
                validate_affinity(
                    &[4, 5, 6, 7],
                    "Nested thread allocation should still be 4-7",
                );
            });
            inner_thread.join().unwrap();
        });
        thread1.join().unwrap();
        thread2.join().unwrap();
    }

    /// Every Rayon worker runs on the pool's dedicated core set.
    // NOTE(review): assumes the host has at least 8 cores (default set is cores 4..8).
    #[cfg(target_os = "linux")]
    #[test]
    fn test_rayon_affinity() {
        let conf = ThreadManagerConfig {
            rayon_configs: HashMap::from([(
                "test".to_owned(),
                RayonConfig {
                    core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
                    worker_threads: 3,
                    ..Default::default()
                },
            )]),
            default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
            ..Default::default()
        };

        let manager = ThreadManager::new(conf).unwrap();
        let rayon_runtime = manager.get_rayon("test");

        // `broadcast` runs the closure once on every worker of the pool.
        let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| {
            println!("Rayon thread {} reporting", ctx.index());
            validate_affinity(&[1, 2, 3], "Rayon thread allocation should still be 1-3");
        });
    }
}