// agave_thread_manager/policy.rs

// Platform-specific scheduling types: Linux pulls in the real `thread_priority`
// types, every other target gets an empty stand-in struct of the same name so
// that the signatures further down still compile.
cfg_if::cfg_if! {
    if #[cfg(target_os = "linux")]{
        use thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy};
    }
    else{
        /// Empty placeholder used in place of `thread_priority::ThreadSchedulePolicy`
        /// on non-Linux targets.
        #[derive(Clone, Copy)]
        pub(crate) struct ThreadSchedulePolicy {}
    }
}
10use {
11    serde::{Deserialize, Serialize},
12    std::sync::{Mutex, OnceLock},
13};
14
/// Cached core count; filled in on first use with the value returned by
/// `num_cpus::get` (see `CoreAllocation::as_core_mask_vector`).
static CORE_COUNT: OnceLock<usize> = OnceLock::new();

/// Default thread priority value.
pub const DEFAULT_PRIORITY: u8 = 0;
18
/// Describes how threads are placed on CPU cores.
///
/// For the range-based variants, `as_core_mask_vector` expands the range as
/// `min..max`, so `max` itself is NOT part of the resulting core list.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub enum CoreAllocation {
    /// Use the OS default allocation (i.e. do not alter core affinity).
    #[default]
    OsDefault,
    /// Pin each thread to a single core taken from the given range. The number of
    /// cores should be >= the number of threads.
    PinnedCores { min: usize, max: usize },
    /// Pin the threads to a shared set of cores given by the range.
    DedicatedCoreSet { min: usize, max: usize },
}
29
30impl CoreAllocation {
31    /// Converts into a vector of core IDs. OsDefault is converted to vector with all core IDs.
32    pub fn as_core_mask_vector(&self) -> Vec<usize> {
33        match *self {
34            CoreAllocation::PinnedCores { min, max } => (min..max).collect(),
35            CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(),
36            CoreAllocation::OsDefault => Vec::from_iter(0..*CORE_COUNT.get_or_init(num_cpus::get)),
37        }
38    }
39}
40cfg_if::cfg_if! {
41    if #[cfg(target_os = "linux")]{
42
43        pub fn set_thread_affinity(cores: &[usize]) {
44            assert!(
45                !cores.is_empty(),
46                "Can not call setaffinity with empty cores mask"
47            );
48            if let Err(e) = affinity::set_thread_affinity(cores) {
49                let thread = std::thread::current();
50                panic!(
51                    "Can not set core affinity {:?} for thread {:?} named {:?}, error {}",
52                    cores,
53                    thread.id(),
54                    thread.name(),
55                    e
56                );
57            }
58        }
59        fn apply_thread_scheduler_policy(policy: ThreadSchedulePolicy, priority: u8) {
60            if let Err(e) = std::thread::current().set_priority_and_policy(
61                policy,
62                thread_priority::ThreadPriority::Crossplatform((priority).try_into().expect("Priority value outside of OS-supported range")),
63            ) {
64                panic!("Can not set thread priority, OS error {e:?}");
65            }
66        }
67        pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy {
68            match policy.to_uppercase().as_ref() {
69                "BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
70                "OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other),
71                "IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle),
72                _ => panic!("Could not parse the policy"),
73            }
74        }
75    }
76    else{
77
        /// No-op on non-Linux targets: the requested cores are ignored.
        pub fn set_thread_affinity(_cores: &[usize]) {}

        /// Non-Linux stand-in: ignores the policy name and returns the empty
        /// placeholder `ThreadSchedulePolicy`.
        pub(crate) fn parse_policy(_policy: &str) -> ThreadSchedulePolicy {
            ThreadSchedulePolicy {}
        }
        /// No-op on non-Linux targets: both the policy and the priority are ignored.
        fn apply_thread_scheduler_policy(_policy: ThreadSchedulePolicy, _priority: u8) {}
84    }
85}
86
87///Applies policy to the calling thread
88pub(crate) fn apply_policy(
89    alloc: &CoreAllocation,
90    policy: ThreadSchedulePolicy,
91    priority: u8,
92    chosen_cores_mask: &Mutex<Vec<usize>>,
93) {
94    apply_thread_scheduler_policy(policy, priority);
95    match alloc {
96        CoreAllocation::PinnedCores { min: _, max: _ } => {
97            let mut lg = chosen_cores_mask
98                .lock()
99                .expect("Can not lock core mask mutex");
100            let core = lg
101                .pop()
102                .expect("Not enough cores provided for pinned allocation");
103            set_thread_affinity(&[core]);
104        }
105        CoreAllocation::DedicatedCoreSet { min: _, max: _ } => {
106            let lg = chosen_cores_mask
107                .lock()
108                .expect("Can not lock core mask mutex");
109            set_thread_affinity(&lg);
110        }
111        CoreAllocation::OsDefault => {}
112    }
113}