Skip to main content

taktora_executor/
thread_attrs.rs

1//! Thread tuning knobs. Most fields are best-effort; unsupported platforms
2//! emit a `tracing::warn!` and proceed with defaults.
3
4#[cfg(feature = "thread_attrs")]
5mod inner {
6    /// Builder-style thread-tuning bag.
7    #[derive(Clone, Debug, Default)]
8    #[non_exhaustive]
9    pub struct ThreadAttributes {
10        pub(crate) name_prefix: Option<String>,
11        pub(crate) affinity_mask: Option<Vec<usize>>,
12        pub(crate) priority: Option<i32>,
13    }
14
15    impl ThreadAttributes {
16        /// Build a new attributes bag with no settings.
17        #[must_use]
18        pub fn new() -> Self {
19            Self::default()
20        }
21
22        /// Set a prefix used for worker thread names; final name is `<prefix>-<index>`.
23        #[must_use]
24        pub fn name_prefix(mut self, p: impl Into<String>) -> Self {
25            self.name_prefix = Some(p.into());
26            self
27        }
28
29        /// Pin each worker `i` to `cores[i % cores.len()]`.
30        #[must_use]
31        pub fn affinity_mask(mut self, cores: Vec<usize>) -> Self {
32            self.affinity_mask = Some(cores);
33            self
34        }
35
36        /// `SCHED_FIFO` priority on Linux; ignored on platforms that don't
37        /// support it. The user's process must have `CAP_SYS_NICE` / equivalent.
38        #[must_use]
39        pub const fn priority(mut self, p: i32) -> Self {
40            self.priority = Some(p);
41            self
42        }
43
44        /// Apply the recorded attributes to the current thread.
45        pub(crate) fn apply_to_self(&self, worker_index: usize) {
46            if let Some(mask) = &self.affinity_mask {
47                let ids = core_affinity::get_core_ids().unwrap_or_default();
48                if let Some(core) = mask.get(worker_index % mask.len()) {
49                    if let Some(c) = ids.get(*core) {
50                        let _ = core_affinity::set_for_current(*c);
51                    }
52                }
53            }
54            #[cfg(target_os = "linux")]
55            if let Some(prio) = self.priority {
56                set_sched_fifo(prio);
57            }
58            // Suppress unused-variable warning on non-Linux targets.
59            #[cfg(not(target_os = "linux"))]
60            let _ = self.priority;
61        }
62    }
63
64    #[cfg(target_os = "linux")]
65    #[allow(unsafe_code)]
66    fn set_sched_fifo(prio: i32) {
67        use std::mem::MaybeUninit;
68        let mut param: MaybeUninit<libc::sched_param> = MaybeUninit::zeroed();
69        // SAFETY: pthread_setschedparam takes a pointer to sched_param;
70        // the param is zero-initialised then we set sched_priority before
71        // passing it. Failure (e.g. no CAP_SYS_NICE) is silently ignored.
72        unsafe {
73            (*param.as_mut_ptr()).sched_priority = prio;
74            let _ =
75                libc::pthread_setschedparam(libc::pthread_self(), libc::SCHED_FIFO, param.as_ptr());
76        }
77    }
78}
79
80#[cfg(feature = "thread_attrs")]
81pub use inner::ThreadAttributes;
82
83#[cfg(not(feature = "thread_attrs"))]
84mod stub {
85    /// Disabled stub. Enable the `thread_attrs` feature for real settings.
86    #[derive(Clone, Debug, Default)]
87    pub struct ThreadAttributes;
88
89    impl ThreadAttributes {
90        /// Build a new attributes bag (no-op when feature is off).
91        #[must_use]
92        pub const fn new() -> Self {
93            Self
94        }
95
96        #[allow(clippy::unused_self)]
97        pub(crate) const fn apply_to_self(&self, _i: usize) {}
98    }
99}
100
101#[cfg(not(feature = "thread_attrs"))]
102pub use stub::ThreadAttributes;