agave_thread_manager/
policy.rs1cfg_if::cfg_if! {
2 if #[cfg(target_os = "linux")]{
3 use thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy};
4 }
5 else{
/// Placeholder compiled only when the target OS is not Linux, taking the place of
/// `thread_priority::ThreadSchedulePolicy` (which the Linux branch imports under the same name).
/// It has no fields and carries no information.
#[derive(Clone, Copy)]
pub(crate) struct ThreadSchedulePolicy {}
8 }
9}
10use {
11 serde::{Deserialize, Serialize},
12 std::sync::{Mutex, OnceLock},
13};
14
/// Process-wide cache for the core count. It is filled at most once, on first use by
/// `CoreAllocation::as_core_mask_vector`, with the value returned by `num_cpus::get`.
static CORE_COUNT: OnceLock<usize> = OnceLock::new();

/// Default thread priority value.
// NOTE(review): not referenced in this file; presumably consumed by configuration defaults
// elsewhere in the crate - confirm.
pub const DEFAULT_PRIORITY: u8 = 0;
18
/// Describes how threads are mapped onto CPU cores.
///
/// For the ranged variants, `as_core_mask_vector` expands `min`/`max` to the half-open range
/// `min..max`, i.e. `max` itself is not included.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub enum CoreAllocation {
    /// Leave core affinity untouched: `apply_policy` sets no affinity for this variant.
    #[default]
    OsDefault,
    /// Pin each thread to a single core: `apply_policy` pops one core ID off the shared mask
    /// for every thread it is applied to, so the mask needs at least one entry per thread.
    PinnedCores { min: usize, max: usize },
    /// Share one core set between threads: `apply_policy` sets the thread's affinity to the
    /// whole shared mask without consuming it.
    DedicatedCoreSet { min: usize, max: usize },
}
29
30impl CoreAllocation {
31 pub fn as_core_mask_vector(&self) -> Vec<usize> {
33 match *self {
34 CoreAllocation::PinnedCores { min, max } => (min..max).collect(),
35 CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(),
36 CoreAllocation::OsDefault => Vec::from_iter(0..*CORE_COUNT.get_or_init(num_cpus::get)),
37 }
38 }
39}
40cfg_if::cfg_if! {
41 if #[cfg(target_os = "linux")]{
42
43 pub fn set_thread_affinity(cores: &[usize]) {
44 assert!(
45 !cores.is_empty(),
46 "Can not call setaffinity with empty cores mask"
47 );
48 if let Err(e) = affinity::set_thread_affinity(cores) {
49 let thread = std::thread::current();
50 panic!(
51 "Can not set core affinity {:?} for thread {:?} named {:?}, error {}",
52 cores,
53 thread.id(),
54 thread.name(),
55 e
56 );
57 }
58 }
59 fn apply_thread_scheduler_policy(policy: ThreadSchedulePolicy, priority: u8) {
60 if let Err(e) = std::thread::current().set_priority_and_policy(
61 policy,
62 thread_priority::ThreadPriority::Crossplatform((priority).try_into().expect("Priority value outside of OS-supported range")),
63 ) {
64 panic!("Can not set thread priority, OS error {:?}", e);
65 }
66 }
67 pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy {
68 match policy.to_uppercase().as_ref() {
69 "BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
70 "OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other),
71 "IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle),
72 _ => panic!("Could not parse the policy"),
73 }
74 }
75 }
76 else{
77
/// No-op stand-in compiled when the target OS is not Linux: the requested cores are ignored.
pub fn set_thread_affinity(_cores: &[usize]) {}
79
/// Stand-in compiled when the target OS is not Linux: the policy string is ignored (no
/// validation happens) and the empty placeholder `ThreadSchedulePolicy` is returned.
pub(crate) fn parse_policy(_policy: &str) -> ThreadSchedulePolicy {
    ThreadSchedulePolicy {}
}
/// No-op stand-in compiled when the target OS is not Linux: policy and priority are ignored.
fn apply_thread_scheduler_policy(_policy: ThreadSchedulePolicy, _priority: u8) {}
84 }
85}
86
87pub(crate) fn apply_policy(
89 alloc: &CoreAllocation,
90 policy: ThreadSchedulePolicy,
91 priority: u8,
92 chosen_cores_mask: &Mutex<Vec<usize>>,
93) {
94 apply_thread_scheduler_policy(policy, priority);
95 match alloc {
96 CoreAllocation::PinnedCores { min: _, max: _ } => {
97 let mut lg = chosen_cores_mask
98 .lock()
99 .expect("Can not lock core mask mutex");
100 let core = lg
101 .pop()
102 .expect("Not enough cores provided for pinned allocation");
103 set_thread_affinity(&[core]);
104 }
105 CoreAllocation::DedicatedCoreSet { min: _, max: _ } => {
106 let lg = chosen_cores_mask
107 .lock()
108 .expect("Can not lock core mask mutex");
109 set_thread_affinity(&lg);
110 }
111 CoreAllocation::OsDefault => {}
112 }
113}