agave_thread_manager/
rayon_runtime.rs1use {
2 crate::{
3 policy::{apply_policy, parse_policy, CoreAllocation},
4 MAX_THREAD_NAME_CHARS,
5 },
6 anyhow::Ok,
7 serde::{Deserialize, Serialize},
8 std::{
9 ops::Deref,
10 sync::{Arc, Mutex},
11 },
12};
13
14#[derive(Clone, Debug, Serialize, Deserialize)]
15#[serde(default)]
16pub struct RayonConfig {
17 pub worker_threads: usize,
18 pub priority: u8,
20 pub policy: String,
21 pub stack_size_bytes: usize,
22 pub core_allocation: CoreAllocation,
23}
24
25impl Default for RayonConfig {
26 fn default() -> Self {
27 Self {
28 core_allocation: CoreAllocation::OsDefault,
29 worker_threads: 16,
30 priority: crate::policy::DEFAULT_PRIORITY,
31 policy: "BATCH".to_owned(),
32 stack_size_bytes: 2 * 1024 * 1024,
33 }
34 }
35}
36
37#[derive(Debug)]
38pub struct RayonRuntimeInner {
39 pub rayon_pool: rayon::ThreadPool,
40 pub config: RayonConfig,
41}
42impl Deref for RayonRuntimeInner {
43 type Target = rayon::ThreadPool;
44
45 fn deref(&self) -> &Self::Target {
46 &self.rayon_pool
47 }
48}
49
50#[derive(Debug, Clone)]
51pub struct RayonRuntime {
52 inner: Arc<RayonRuntimeInner>,
53}
54
55impl Deref for RayonRuntime {
56 type Target = RayonRuntimeInner;
57
58 fn deref(&self) -> &Self::Target {
59 self.inner.deref()
60 }
61}
62
63impl RayonRuntime {
64 pub fn new(name: String, config: RayonConfig) -> anyhow::Result<Self> {
65 debug_assert!(name.len() < MAX_THREAD_NAME_CHARS, "Thread name too long");
66 let core_allocation = config.core_allocation.clone();
67 let chosen_cores_mask = Mutex::new(core_allocation.as_core_mask_vector());
68 let priority = config.priority;
69 let policy = parse_policy(&config.policy);
70 let rayon_pool = rayon::ThreadPoolBuilder::new()
71 .num_threads(config.worker_threads)
72 .thread_name(move |i| format!("{}_{}", &name, i))
73 .stack_size(config.stack_size_bytes)
74 .start_handler(move |_idx| {
75 apply_policy(&core_allocation, policy, priority, &chosen_cores_mask);
76 })
77 .build()?;
78 Ok(Self {
79 inner: Arc::new(RayonRuntimeInner { rayon_pool, config }),
80 })
81 }
82
83 #[cfg(feature = "dev-context-only-utils")]
84 pub fn new_for_tests(name: &str) -> Self {
85 Self::new(name.to_owned(), RayonConfig::default())
86 .expect("Failed to create rayon runtime for tests")
87 }
88}