agave_thread_manager/
lib.rs

1use {
2    anyhow::Ok,
3    serde::{Deserialize, Serialize},
4    std::{collections::HashMap, ops::Deref, sync::Arc},
5};
6
7pub mod native_thread_runtime;
8pub mod policy;
9pub mod rayon_runtime;
10pub mod tokio_runtime;
11
12pub use {
13    native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime},
14    policy::CoreAllocation,
15    rayon_runtime::{RayonConfig, RayonRuntime},
16    tokio_runtime::{TokioConfig, TokioRuntime},
17};
18
/// Upper bound on the length, in characters, of thread names (as the constant's name says).
///
/// NOTE(review): not referenced in this file; presumably enforced by the runtime modules
/// when they name the threads they spawn — confirm at the use sites.
pub const MAX_THREAD_NAME_CHARS: usize = 12;
20
21#[derive(Default, Debug)]
22pub struct ThreadManagerInner {
23    pub tokio_runtimes: HashMap<String, TokioRuntime>,
24    pub tokio_runtime_mapping: HashMap<String, String>,
25
26    pub native_thread_runtimes: HashMap<String, NativeThreadRuntime>,
27    pub native_runtime_mapping: HashMap<String, String>,
28
29    pub rayon_runtimes: HashMap<String, RayonRuntime>,
30    pub rayon_runtime_mapping: HashMap<String, String>,
31}
32
33impl ThreadManagerInner {
34    /// Populates mappings with copies of config names, overrides as appropriate
35    fn populate_mappings(&mut self, config: &ThreadManagerConfig) {
36        //TODO: this should probably be cleaned up with a macro at some point...
37
38        for name in config.native_configs.keys() {
39            self.native_runtime_mapping
40                .insert(name.clone(), name.clone());
41        }
42        for (k, v) in config.native_runtime_mapping.iter() {
43            self.native_runtime_mapping.insert(k.clone(), v.clone());
44        }
45
46        for name in config.tokio_configs.keys() {
47            self.tokio_runtime_mapping
48                .insert(name.clone(), name.clone());
49        }
50        for (k, v) in config.tokio_runtime_mapping.iter() {
51            self.tokio_runtime_mapping.insert(k.clone(), v.clone());
52        }
53
54        for name in config.rayon_configs.keys() {
55            self.rayon_runtime_mapping
56                .insert(name.clone(), name.clone());
57        }
58        for (k, v) in config.rayon_runtime_mapping.iter() {
59            self.rayon_runtime_mapping.insert(k.clone(), v.clone());
60        }
61    }
62}
63
64#[derive(Default, Debug, Clone)]
65pub struct ThreadManager {
66    inner: Arc<ThreadManagerInner>,
67}
68
69impl Deref for ThreadManager {
70    type Target = ThreadManagerInner;
71
72    fn deref(&self) -> &Self::Target {
73        &self.inner
74    }
75}
76
77#[derive(Clone, Debug, Serialize, Deserialize)]
78#[serde(default)]
79pub struct ThreadManagerConfig {
80    pub native_configs: HashMap<String, NativeConfig>,
81    pub native_runtime_mapping: HashMap<String, String>,
82
83    pub rayon_configs: HashMap<String, RayonConfig>,
84    pub rayon_runtime_mapping: HashMap<String, String>,
85
86    pub tokio_configs: HashMap<String, TokioConfig>,
87    pub tokio_runtime_mapping: HashMap<String, String>,
88
89    pub default_core_allocation: CoreAllocation,
90}
91
92impl Default for ThreadManagerConfig {
93    fn default() -> Self {
94        Self {
95            native_configs: HashMap::from([("default".to_owned(), NativeConfig::default())]),
96            native_runtime_mapping: HashMap::new(),
97            rayon_configs: HashMap::from([("default".to_owned(), RayonConfig::default())]),
98            rayon_runtime_mapping: HashMap::new(),
99            tokio_configs: HashMap::from([("default".to_owned(), TokioConfig::default())]),
100            tokio_runtime_mapping: HashMap::new(),
101            default_core_allocation: CoreAllocation::OsDefault,
102        }
103    }
104}
105
106impl ThreadManager {
107    /// Will lookup a runtime by given name. If not found, will try to lookup by name "default". If all fails, returns None.
108    fn lookup<'a, T>(
109        &'a self,
110        name: &str,
111        mapping: &HashMap<String, String>,
112        runtimes: &'a HashMap<String, T>,
113    ) -> Option<&'a T> {
114        match mapping.get(name) {
115            Some(n) => runtimes.get(n),
116            None => match mapping.get("default") {
117                Some(n) => {
118                    log::warn!("Falling back to default runtime for {name}");
119                    runtimes.get(n)
120                }
121                None => None,
122            },
123        }
124    }
125
126    pub fn try_get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
127        self.lookup(
128            name,
129            &self.native_runtime_mapping,
130            &self.native_thread_runtimes,
131        )
132    }
133    pub fn get_native(&self, name: &str) -> &NativeThreadRuntime {
134        if let Some(runtime) = self.try_get_native(name) {
135            runtime
136        } else {
137            panic!("Native thread pool for {name} can not be found!");
138        }
139    }
140
141    pub fn try_get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
142        self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes)
143    }
144
145    pub fn get_rayon(&self, name: &str) -> &RayonRuntime {
146        if let Some(runtime) = self.try_get_rayon(name) {
147            runtime
148        } else {
149            panic!("Rayon thread pool for {name} can not be found!");
150        }
151    }
152
153    pub fn try_get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
154        self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes)
155    }
156
157    pub fn get_tokio(&self, name: &str) -> &TokioRuntime {
158        if let Some(runtime) = self.try_get_tokio(name) {
159            runtime
160        } else {
161            panic!("Tokio thread pool for {name} can not be found!");
162        }
163    }
164
165    pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
166        let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();
167        crate::policy::set_thread_affinity(&chosen_cores_mask);
168        Ok(chosen_cores_mask)
169    }
170
171    pub fn new(config: ThreadManagerConfig) -> anyhow::Result<Self> {
172        let mut core_allocations = HashMap::<String, Vec<usize>>::new();
173        Self::set_process_affinity(&config)?;
174        let mut manager = ThreadManagerInner::default();
175        manager.populate_mappings(&config);
176        for (name, cfg) in config.native_configs.iter() {
177            let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
178            manager.native_thread_runtimes.insert(name.clone(), nrt);
179        }
180        for (name, cfg) in config.rayon_configs.iter() {
181            let rrt = RayonRuntime::new(name.clone(), cfg.clone())?;
182            manager.rayon_runtimes.insert(name.clone(), rrt);
183        }
184
185        for (name, cfg) in config.tokio_configs.iter() {
186            let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;
187
188            core_allocations.insert(name.clone(), cfg.core_allocation.as_core_mask_vector());
189            manager.tokio_runtimes.insert(name.clone(), tokiort);
190        }
191        Ok(Self {
192            inner: Arc::new(manager),
193        })
194    }
195}
196
#[cfg(test)]
mod tests {
    use crate::ThreadManagerConfig;
    #[cfg(target_os = "linux")]
    use {
        crate::{CoreAllocation, NativeConfig, RayonConfig, ThreadManager},
        std::collections::HashMap,
    };

    /// Every bundled example config must deserialize into a `ThreadManagerConfig`.
    #[test]
    fn test_config_files() {
        let experiments = [
            "examples/core_contention_dedicated_set.toml",
            "examples/core_contention_contending_set.toml",
        ];

        for exp in experiments {
            println!("Loading config {exp}");
            let conffile = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(exp);
            // Report which file failed instead of a bare `unwrap` on an io/parse error.
            let buf = std::fs::read_to_string(&conffile)
                .unwrap_or_else(|e| panic!("failed to read {}: {e}", conffile.display()));
            let cfg: ThreadManagerConfig = toml::from_str(&buf)
                .unwrap_or_else(|e| panic!("failed to parse {}: {e}", conffile.display()));
            println!("{cfg:?}");
        }
    }

    // Nobody runs Agave on windows, and on Mac we can not set mask affinity without patching external crate
    /// Asserts that the current thread's CPU affinity equals `expect_cores`.
    #[cfg(target_os = "linux")]
    fn validate_affinity(expect_cores: &[usize], error_msg: &str) {
        let affinity = affinity::get_thread_affinity().unwrap();
        assert_eq!(affinity, expect_cores, "{error_msg}");
    }

    /// Threads spawned by native runtimes must run at their configured priority.
    #[test]
    #[cfg(target_os = "linux")]
    #[ignore] //test ignored for now as thread priority requires kernel support and extra permissions
    fn test_thread_priority() {
        let priority_high = 10;
        let priority_default = crate::policy::DEFAULT_PRIORITY;
        let priority_low = 1;
        let conf = ThreadManagerConfig {
            native_configs: HashMap::from([
                (
                    "high".to_owned(),
                    NativeConfig {
                        priority: priority_high,
                        ..Default::default()
                    },
                ),
                ("default".to_owned(), NativeConfig::default()),
                (
                    "low".to_owned(),
                    NativeConfig {
                        priority: priority_low,
                        ..Default::default()
                    },
                ),
            ]),
            ..Default::default()
        };

        let manager = ThreadManager::new(conf).unwrap();
        let high = manager.get_native("high");
        let low = manager.get_native("low");
        let default = manager.get_native("default");

        high.spawn(move || {
            let prio =
                thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
            assert_eq!(
                prio,
                thread_priority::ThreadPriority::Crossplatform((priority_high).try_into().unwrap())
            );
        })
        .unwrap()
        .join()
        .unwrap();
        low.spawn(move || {
            let prio =
                thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
            assert_eq!(
                prio,
                thread_priority::ThreadPriority::Crossplatform((priority_low).try_into().unwrap())
            );
        })
        .unwrap()
        .join()
        .unwrap();
        default
            .spawn(move || {
                let prio =
                    thread_priority::get_thread_priority(thread_priority::thread_native_id())
                        .unwrap();
                assert_eq!(
                    prio,
                    thread_priority::ThreadPriority::Crossplatform(
                        (priority_default).try_into().unwrap()
                    )
                );
            })
            .unwrap()
            .join()
            .unwrap();
    }

    /// Managed threads use their pool's cores; unmanaged threads spawned after
    /// `ThreadManager::new` (and their children) use the default core allocation.
    /// Requires at least 8 CPUs.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_process_affinity() {
        let conf = ThreadManagerConfig {
            native_configs: HashMap::from([(
                "pool1".to_owned(),
                NativeConfig {
                    core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 },
                    max_threads: 5,
                    ..Default::default()
                },
            )]),
            default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
            native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]),
            ..Default::default()
        };

        let manager = ThreadManager::new(conf).unwrap();
        let runtime = manager.get_native("test");

        let thread1 = runtime
            .spawn(|| {
                validate_affinity(&[0, 1, 2, 3], "Managed thread allocation should be 0-3");
            })
            .unwrap();

        let thread2 = std::thread::spawn(|| {
            validate_affinity(&[4, 5, 6, 7], "Default thread allocation should be 4-7");

            let inner_thread = std::thread::spawn(|| {
                validate_affinity(
                    &[4, 5, 6, 7],
                    "Nested thread allocation should still be 4-7",
                );
            });
            inner_thread.join().unwrap();
        });
        thread1.join().unwrap();
        thread2.join().unwrap();
    }

    /// Every rayon worker must be pinned to the pool's dedicated core set.
    /// Requires at least 8 CPUs.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_rayon_affinity() {
        let conf = ThreadManagerConfig {
            rayon_configs: HashMap::from([(
                "test".to_owned(),
                RayonConfig {
                    core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
                    worker_threads: 3,
                    ..Default::default()
                },
            )]),
            default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },

            ..Default::default()
        };

        let manager = ThreadManager::new(conf).unwrap();
        let rayon_runtime = manager.get_rayon("test");

        // `broadcast` runs the closure once on every worker of the pool.
        let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| {
            println!("Rayon thread {} reporting", ctx.index());
            validate_affinity(&[1, 2, 3], "Rayon thread allocation should still be 1-3");
        });
    }
}
373        });
374    }
375}