// agave_thread_manager/rayon_runtime.rs

use {
    crate::{
        policy::{apply_policy, parse_policy, CoreAllocation},
        MAX_THREAD_NAME_CHARS,
    },
    anyhow::Ok,
    serde::{Deserialize, Serialize},
    std::{
        ops::Deref,
        sync::{Arc, Mutex},
    },
};
13
14#[derive(Clone, Debug, Serialize, Deserialize)]
15#[serde(default)]
16pub struct RayonConfig {
17    pub worker_threads: usize,
18    /// Priority in range 0..99
19    pub priority: u8,
20    pub policy: String,
21    pub stack_size_bytes: usize,
22    pub core_allocation: CoreAllocation,
23}
24
25impl Default for RayonConfig {
26    fn default() -> Self {
27        Self {
28            core_allocation: CoreAllocation::OsDefault,
29            worker_threads: 16,
30            priority: crate::policy::DEFAULT_PRIORITY,
31            policy: "BATCH".to_owned(),
32            stack_size_bytes: 2 * 1024 * 1024,
33        }
34    }
35}
36
37#[derive(Debug)]
38pub struct RayonRuntimeInner {
39    pub rayon_pool: rayon::ThreadPool,
40    pub config: RayonConfig,
41}
42impl Deref for RayonRuntimeInner {
43    type Target = rayon::ThreadPool;
44
45    fn deref(&self) -> &Self::Target {
46        &self.rayon_pool
47    }
48}
49
50#[derive(Debug, Clone)]
51pub struct RayonRuntime {
52    inner: Arc<RayonRuntimeInner>,
53}
54
55impl Deref for RayonRuntime {
56    type Target = RayonRuntimeInner;
57
58    fn deref(&self) -> &Self::Target {
59        self.inner.deref()
60    }
61}
62
63impl RayonRuntime {
64    pub fn new(name: String, config: RayonConfig) -> anyhow::Result<Self> {
65        debug_assert!(name.len() < MAX_THREAD_NAME_CHARS, "Thread name too long");
66        let core_allocation = config.core_allocation.clone();
67        let chosen_cores_mask = Mutex::new(core_allocation.as_core_mask_vector());
68        let priority = config.priority;
69        let policy = parse_policy(&config.policy);
70        let rayon_pool = rayon::ThreadPoolBuilder::new()
71            .num_threads(config.worker_threads)
72            .thread_name(move |i| format!("{}_{}", &name, i))
73            .stack_size(config.stack_size_bytes)
74            .start_handler(move |_idx| {
75                apply_policy(&core_allocation, policy, priority, &chosen_cores_mask);
76            })
77            .build()?;
78        Ok(Self {
79            inner: Arc::new(RayonRuntimeInner { rayon_pool, config }),
80        })
81    }
82
83    #[cfg(feature = "dev-context-only-utils")]
84    pub fn new_for_tests(name: &str) -> Self {
85        Self::new(name.to_owned(), RayonConfig::default())
86            .expect("Failed to create rayon runtime for tests")
87    }
88}