many_cpus/
processor_set.rs

1use std::{sync::LazyLock, thread};
2
3use itertools::Itertools;
4use nonempty::NonEmpty;
5
6use crate::{
7    HardwareTrackerClient, HardwareTrackerClientFacade, Processor, ProcessorSetBuilder,
8    pal::{Platform, PlatformFacade},
9};
10
11// https://github.com/cloudhead/nonempty/issues/68
12extern crate alloc;
13
14static ALL_PROCESSORS: LazyLock<ProcessorSet> = LazyLock::new(|| {
15    ProcessorSetBuilder::default()
16        .take_all()
17        .expect("there must be at least one processor - how could this code run if not")
18});
19
20/// One or more processors present on the system and available for use.
21///
22/// You can obtain the full set of available processors via [`ProcessorSet::all()`] or specify more
23/// fine-grained selection criteria via [`ProcessorSet::builder()`]. You can use
24/// [`ProcessorSet::to_builder()`] to further narrow down an existing set.
25///
26/// One you have a [`ProcessorSet`], you can iterate over [`ProcessorSet::processors()`]
27/// to inspect the individual processors in the set or use [`ProcessorSet::spawn_threads()`] to
28/// spawn a set of threads pinned to each of the processors in the set, one thread per processor.
29/// You may also use [``ProcessorSet::spawn_thread()`] to spawn a single thread pinned to all
30/// processors in the set, allowing the thread to move but only between the processors in the set.
31///
32/// # Changes at runtime
33///
34/// It is possible that a system will have processors added or removed at runtime. This is not
35/// supported - any hardware changes made at runtime will not be visible to the `ProcessorSet`
36/// instances. Operations attempted on removed processors may fail with an error or panic or
37/// silently misbehave (e.g. threads never starting). Added processors will not be considered a
38/// member of any set.
39#[derive(Clone, Debug)]
40pub struct ProcessorSet {
41    processors: NonEmpty<Processor>,
42
43    // We use this when we pin a thread, to update the tracker
44    // about the current thread's pinning status.
45    tracker_client: HardwareTrackerClientFacade,
46
47    pal: PlatformFacade,
48}
49
50impl ProcessorSet {
51    /// Gets a [`ProcessorSet`] referencing all present and available processors on the system.
52    ///
53    /// This set does not include processors that are not available to the current process or those
54    /// that are purely theoretical (e.g. processors that may be added later to the system but are
55    /// not yet present).
56    pub fn all() -> &'static Self {
57        &ALL_PROCESSORS
58    }
59
60    /// Creates a builder that can be used to construct a processor set with specific criteria.
61    #[cfg_attr(test, mutants::skip)] // Mutates to itself via Default::default().
62    pub fn builder() -> ProcessorSetBuilder {
63        ProcessorSetBuilder::default()
64    }
65
66    /// Returns a [`ProcessorSetBuilder`] that is narrowed down to all processors in the current
67    /// set, to be used to further narrow down the set to a specific subset.
68    pub fn to_builder(&self) -> ProcessorSetBuilder {
69        ProcessorSetBuilder::with_internals(self.tracker_client.clone(), self.pal.clone())
70            .filter(|p| self.processors.contains(p))
71    }
72
73    pub(crate) fn new(
74        processors: NonEmpty<Processor>,
75        tracker_client: HardwareTrackerClientFacade,
76        pal: PlatformFacade,
77    ) -> Self {
78        Self {
79            processors,
80            tracker_client,
81            pal,
82        }
83    }
84
85    /// Returns a [`ProcessorSet`] containing the provided processors.
86    pub fn from_processors(processors: NonEmpty<Processor>) -> Self {
87        let pal = processors.first().pal.clone();
88        Self::new(processors, HardwareTrackerClientFacade::real(), pal)
89    }
90
91    /// Returns a [`ProcessorSet`] containing a single processor.
92    pub fn from_processor(processor: Processor) -> Self {
93        let pal = processor.pal.clone();
94        Self::new(
95            NonEmpty::singleton(processor),
96            HardwareTrackerClientFacade::real(),
97            pal,
98        )
99    }
100
101    /// Returns the number of processors in the set. A processor set is never empty.
102    #[expect(clippy::len_without_is_empty)] // Never empty by definition.
103    pub fn len(&self) -> usize {
104        self.processors.len()
105    }
106
107    /// Returns a reference to a collection containing all the processors in the set.
108    pub fn processors(&self) -> &NonEmpty<Processor> {
109        &self.processors
110    }
111
112    /// Modifies the affinity of the current thread to execute
113    /// only on the processors in this processor set.
114    ///
115    /// # Behavior with multiple processors
116    ///
117    /// If multiple processors are present in the processor set, they might not be evenly used.
118    /// An arbitrary processor may be preferentially used, with others used only when the preferred
119    /// processor is otherwise busy.
120    pub fn pin_current_thread_to(&self) {
121        self.pal.pin_current_thread_to(&self.processors);
122
123        if self.processors.len() == 1 {
124            // If there is only one processor, both the processor and memory region are known.
125            let processor = self.processors.first();
126
127            self.tracker_client
128                .update_pin_status(Some(processor.id()), Some(processor.memory_region_id()));
129        } else if self
130            .processors
131            .iter()
132            .map(|p| p.memory_region_id())
133            .unique()
134            .count()
135            == 1
136        {
137            // All processors are in the same memory region, so we can at least track that.
138            let memory_region_id = self.processors.first().memory_region_id();
139
140            self.tracker_client
141                .update_pin_status(None, Some(memory_region_id));
142        } else {
143            // We got nothing, have to resolve from scratch every time the data is asked for.
144            self.tracker_client.update_pin_status(None, None);
145        }
146    }
147
148    /// Spawns one thread for each processor in the set, pinned to that processor,
149    /// providing the target processor information to the thread entry point.
150    ///
151    /// Each spawned thread will only be scheduled on one of the processors in the set. When that
152    /// processor is busy, the thread will simply wait for the processor to become available.
153    pub fn spawn_threads<E, R>(&self, entrypoint: E) -> Box<[thread::JoinHandle<R>]>
154    where
155        E: Fn(Processor) -> R + Send + Clone + 'static,
156        R: Send + 'static,
157    {
158        self.processors()
159            .iter()
160            .map(|processor| {
161                thread::spawn({
162                    let processor = processor.clone();
163                    let entrypoint = entrypoint.clone();
164                    let tracker_client = self.tracker_client.clone();
165                    let pal = self.pal.clone();
166
167                    move || {
168                        let set = Self::new(
169                            NonEmpty::from_vec(vec![processor.clone()])
170                                .expect("we provide 1-item vec as input, so it must be non-empty"),
171                            tracker_client.clone(),
172                            pal.clone(),
173                        );
174                        set.pin_current_thread_to();
175                        entrypoint(processor)
176                    }
177                })
178            })
179            .collect::<Vec<_>>()
180            .into_boxed_slice()
181    }
182
183    /// Spawns a single thread pinned to the set. The thread will only be scheduled to execute on
184    /// the processors in the processor set and may freely move between the processors in the set.
185    ///
186    /// # Behavior with multiple processors
187    ///
188    /// If multiple processors are present in the processor set, they might not be evenly used.
189    /// An arbitrary processor may be preferentially used, with others used only when the preferred
190    /// processor is otherwise busy.
191    pub fn spawn_thread<E, R>(&self, entrypoint: E) -> thread::JoinHandle<R>
192    where
193        E: FnOnce(ProcessorSet) -> R + Send + 'static,
194        R: Send + 'static,
195    {
196        let set = self.clone();
197
198        thread::spawn(move || {
199            set.pin_current_thread_to();
200            entrypoint(set)
201        })
202    }
203}
204
205impl From<Processor> for ProcessorSet {
206    fn from(value: Processor) -> Self {
207        Self::from_processor(value)
208    }
209}
210
211impl From<NonEmpty<Processor>> for ProcessorSet {
212    fn from(value: NonEmpty<Processor>) -> Self {
213        Self::from_processors(value)
214    }
215}
216
#[cfg(test)]
mod tests {
    use std::{
        num::NonZero,
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
    };

    use nonempty::nonempty;

    use crate::{
        EfficiencyClass, MockHardwareTrackerClient,
        pal::{FakeProcessor, MockPlatform},
    };

    use super::*;

    /// Exercises the whole public surface of a two-processor set against a mock platform and a
    /// mock hardware tracker client: getters, pinning, both spawn flavors and cloning.
    #[test]
    fn smoke_test() {
        // Takes ownership of its argument; used to show a spawned callback may consume
        // values that are not `Copy`.
        fn process_string(_s: String) {}

        let mut platform = MockPlatform::new();

        // PAL pinning expectations, registered in this order:
        //   1. current thread pinned to the entire set (2 processors),
        //   2. thread from spawn_thread() pinned to the entire set (2 processors),
        //   3. + 4. the two threads from spawn_threads(), each pinned to 1 processor.
        for expected_len in [2, 2, 1, 1] {
            platform
                .expect_pin_current_thread_to_core()
                .withf(move |p| p.len() == expected_len)
                .return_const(());
        }

        let platform = PlatformFacade::from_mock(platform);

        // Two processors in the same memory region, differing only in efficiency class.
        let fake_processors = nonempty![
            FakeProcessor {
                index: 0,
                memory_region: 0,
                efficiency_class: EfficiencyClass::Efficiency,
            },
            FakeProcessor {
                index: 1,
                memory_region: 0,
                efficiency_class: EfficiencyClass::Performance,
            }
        ];

        let processors = fake_processors.map({
            let platform = platform.clone();
            move |fake| Processor::new(fake.into(), platform.clone())
        });

        let mut tracker_client = MockHardwareTrackerClient::new();

        // Pinning to the full set reports only the shared memory region:
        // once for the entrypoint thread, once for spawn_thread().
        tracker_client
            .expect_update_pin_status()
            .times(2)
            .withf(|processor, memory_region| {
                processor.is_none() && matches!(memory_region, Some(0))
            })
            .return_const(());

        // Each thread from spawn_threads() reports its own processor: first processor 0 ...
        tracker_client
            .expect_update_pin_status()
            .times(1)
            .withf(|processor, memory_region| {
                matches!(processor, Some(0)) && matches!(memory_region, Some(0))
            })
            .return_const(());

        // ... then processor 1.
        tracker_client
            .expect_update_pin_status()
            .times(1)
            .withf(|processor, memory_region| {
                matches!(processor, Some(1)) && matches!(memory_region, Some(0))
            })
            .return_const(());

        let tracker_client = HardwareTrackerClientFacade::from_mock(tracker_client);

        let processor_set = ProcessorSet::new(processors, tracker_client, platform);

        // The size getter reports both processors.
        assert_eq!(processor_set.len(), 2);

        // Iteration yields exactly the two processors, in order, with the expected properties.
        let mut members = processor_set.processors().iter();

        let first = members.next().unwrap();
        assert_eq!(first.id(), 0);
        assert_eq!(first.memory_region_id(), 0);
        assert_eq!(first.efficiency_class(), EfficiencyClass::Efficiency);

        let second = members.next().unwrap();
        assert_eq!(second.id(), 1);
        assert_eq!(second.memory_region_id(), 0);
        assert_eq!(second.efficiency_class(), EfficiencyClass::Performance);

        assert!(members.next().is_none());

        // Pinning the current thread goes through the PAL.
        processor_set.pin_current_thread_to();

        // spawn_thread() starts exactly one pinned thread.
        let single_thread_runs = Arc::new(AtomicUsize::new(0));
        let payload = "foo".to_string();

        let handle = processor_set.spawn_thread({
            // One clone for the worker; no more are needed because only one thread is spawned.
            let single_thread_runs = Arc::clone(&single_thread_runs);

            move |processor_set| {
                // The callback is handed the set the thread was pinned to.
                assert_eq!(processor_set.len(), 2);

                // Consuming a !Copy value proves the callback only needs to be callable once.
                process_string(payload);

                single_thread_runs.fetch_add(1, Ordering::Relaxed);
            }
        });
        handle.join().unwrap();

        assert_eq!(single_thread_runs.load(Ordering::Relaxed), 1);

        // spawn_threads() starts one pinned thread per processor.
        let pinned_thread_runs = Arc::new(AtomicUsize::new(0));

        let handles = processor_set.spawn_threads({
            let pinned_thread_runs = Arc::clone(&pinned_thread_runs);
            move |_| {
                pinned_thread_runs.fetch_add(1, Ordering::Relaxed);
            }
        });

        for handle in handles.into_vec() {
            handle.join().unwrap();
        }

        assert_eq!(pinned_thread_runs.load(Ordering::Relaxed), 2);

        // A clone reports the same size as the original.
        let cloned_processor_set = processor_set.clone();

        assert_eq!(cloned_processor_set.len(), 2);
    }

    /// Round-tripping a one-processor set through `to_builder()` yields the same processor.
    #[cfg(not(miri))] // Miri does not support talking to the real platform.
    #[test]
    fn to_builder_preserves_processors() {
        let original = ProcessorSet::builder()
            .take(NonZero::new(1).unwrap())
            .unwrap();

        let builder = original.to_builder();
        let rebuilt = builder.take_all().unwrap();

        assert_eq!(rebuilt.len(), 1);

        let original_processor = original.processors().first();
        let rebuilt_processor = rebuilt.processors().first();

        assert_eq!(original_processor, rebuilt_processor);
    }

    /// After a thread pins itself to one processor, a builder restricted to processors
    /// available to the current thread sees only that processor.
    #[cfg(not(miri))] // Miri does not support talking to the real platform.
    #[test]
    fn inherit_on_pinned() {
        // Run on a dedicated thread so the pinning applies to that thread.
        let worker = thread::spawn(|| {
            let pinned_to = ProcessorSet::builder()
                .take(NonZero::new(1).unwrap())
                .unwrap();

            pinned_to.pin_current_thread_to();

            // Potential false negative here if the system only has one processor but that's fine.
            let allowed = ProcessorSet::builder()
                .where_available_for_current_thread()
                .take_all()
                .unwrap();

            assert_eq!(allowed.len(), 1);
            assert_eq!(
                allowed.processors().first(),
                pinned_to.processors().first()
            );
        });

        worker.join().unwrap();
    }
}