use std::fmt::Display;
use std::num::NonZero;
use std::thread;
use itertools::Itertools;
use nonempty::NonEmpty;
use crate::pal::Platform;
use crate::{Processor, ProcessorSetBuilder, SystemHardware};
#[derive(Clone, Debug)]
pub struct ProcessorSet {
processors: NonEmpty<Processor>,
hardware: SystemHardware,
}
impl ProcessorSet {
#[must_use]
pub(crate) fn new(processors: NonEmpty<Processor>, hardware: SystemHardware) -> Self {
Self {
processors,
hardware,
}
}
#[must_use]
pub fn to_builder(&self) -> ProcessorSetBuilder {
ProcessorSetBuilder::with_internals(self.hardware.clone())
.source_processors(&self.processors)
}
#[must_use]
#[cfg_attr(test, mutants::skip)] pub fn take(&self, count: NonZero<usize>) -> Option<Self> {
self.to_builder().take(count)
}
#[must_use]
pub fn decompose(&self) -> NonEmpty<Self> {
self.processors
.clone()
.map(|processor| Self::new(NonEmpty::singleton(processor), self.hardware.clone()))
}
#[must_use]
#[cfg_attr(test, mutants::skip)] pub fn filter(&self, predicate: impl Fn(&Processor) -> bool) -> Option<Self> {
self.to_builder().filter(predicate).take_all()
}
#[must_use]
#[inline]
#[expect(clippy::len_without_is_empty, reason = "never empty by definition")]
pub fn len(&self) -> usize {
self.processors.len()
}
#[must_use]
#[inline]
pub fn iter(&self) -> nonempty::Iter<'_, Processor> {
self.processors.iter()
}
#[must_use]
#[inline]
pub fn processors(&self) -> &NonEmpty<Processor> {
&self.processors
}
#[must_use]
#[inline]
pub(crate) fn into_processors(self) -> NonEmpty<Processor> {
self.processors
}
pub fn pin_current_thread_to(&self) {
self.hardware
.platform()
.pin_current_thread_to(&self.processors);
if self.processors.len() == 1 {
let processor = self.processors.first();
self.hardware
.update_pin_status(Some(processor.id()), Some(processor.memory_region_id()));
} else if self
.processors
.iter()
.map(Processor::memory_region_id)
.unique()
.count()
== 1
{
let memory_region_id = self.processors.first().memory_region_id();
self.hardware
.update_pin_status(None, Some(memory_region_id));
} else {
self.hardware.update_pin_status(None, None);
}
}
#[cfg_attr(test, mutants::skip)] pub fn spawn_threads<E, R>(&self, entrypoint: E) -> Box<[thread::JoinHandle<R>]>
where
E: Fn(Processor) -> R + Send + Clone + 'static,
R: Send + 'static,
{
self.processors()
.iter()
.map(|processor| {
thread::spawn({
let processor = processor.clone();
let entrypoint = entrypoint.clone();
let hardware = self.hardware.clone();
move || {
let set =
Self::new(NonEmpty::singleton(processor.clone()), hardware.clone());
set.pin_current_thread_to();
entrypoint(processor)
}
})
})
.collect::<Vec<_>>()
.into_boxed_slice()
}
pub fn spawn_thread<E, R>(&self, entrypoint: E) -> thread::JoinHandle<R>
where
E: FnOnce(Self) -> R + Send + 'static,
R: Send + 'static,
{
let set = self.clone();
thread::spawn(move || {
set.pin_current_thread_to();
entrypoint(set)
})
}
}
impl From<ProcessorSet> for ProcessorSetBuilder {
#[inline]
#[cfg_attr(test, mutants::skip)] fn from(value: ProcessorSet) -> Self {
value.to_builder()
}
}
impl From<&ProcessorSet> for ProcessorSetBuilder {
#[inline]
#[cfg_attr(test, mutants::skip)] fn from(value: &ProcessorSet) -> Self {
value.to_builder()
}
}
impl AsRef<Self> for ProcessorSet {
#[inline]
#[cfg_attr(test, mutants::skip)] fn as_ref(&self) -> &Self {
self
}
}
impl Display for ProcessorSet {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let list = cpulist::emit(self.processors.iter().map(Processor::id));
write!(f, " {list} ({} processors)", self.len())
}
}
impl IntoIterator for ProcessorSet {
type IntoIter = <NonEmpty<Processor> as IntoIterator>::IntoIter;
type Item = Processor;
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.processors.into_iter()
}
}
impl<'a> IntoIterator for &'a ProcessorSet {
type IntoIter = nonempty::Iter<'a, Processor>;
type Item = &'a Processor;
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.processors.iter()
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use new_zealand::nz;
use static_assertions::assert_impl_all;
use testing::with_watchdog;
use super::*;
use crate::EfficiencyClass;
use crate::fake::{HardwareBuilder, ProcessorBuilder};
assert_impl_all!(ProcessorSet: UnwindSafe, RefUnwindSafe);
#[test]
fn smoke_test() {
with_watchdog(|| {
let hardware = SystemHardware::fake(
HardwareBuilder::new()
.processor(
ProcessorBuilder::new().efficiency_class(EfficiencyClass::Efficiency),
)
.processor(
ProcessorBuilder::new().efficiency_class(EfficiencyClass::Performance),
),
);
let processor_set = hardware.processors();
assert_eq!(processor_set.len(), 2);
let mut processor_iter = processor_set.processors().iter();
let p1 = processor_iter.next().unwrap();
assert_eq!(p1.id(), 0);
assert_eq!(p1.memory_region_id(), 0);
assert_eq!(p1.efficiency_class(), EfficiencyClass::Efficiency);
let p2 = processor_iter.next().unwrap();
assert_eq!(p2.id(), 1);
assert_eq!(p2.memory_region_id(), 0);
assert_eq!(p2.efficiency_class(), EfficiencyClass::Performance);
assert!(processor_iter.next().is_none());
processor_set.pin_current_thread_to();
assert!(!hardware.is_thread_processor_pinned());
assert!(hardware.is_thread_memory_region_pinned());
let threads_spawned = Arc::new(AtomicUsize::new(0));
let threads_spawned_clone = Arc::clone(&threads_spawned);
let non_copy_value = "foo".to_string();
processor_set
.spawn_thread({
fn process_string(_s: String) {}
move |spawned_processor_set| {
assert_eq!(spawned_processor_set.len(), 2);
process_string(non_copy_value);
threads_spawned_clone.fetch_add(1, Ordering::Relaxed);
}
})
.join()
.unwrap();
assert_eq!(threads_spawned.load(Ordering::Relaxed), 1);
let threads_spawned = Arc::new(AtomicUsize::new(0));
processor_set
.spawn_threads({
let threads_spawned = Arc::clone(&threads_spawned);
move |_| {
threads_spawned.fetch_add(1, Ordering::Relaxed);
}
})
.into_vec()
.into_iter()
.for_each(|h| h.join().unwrap());
assert_eq!(threads_spawned.load(Ordering::Relaxed), 2);
let cloned_processor_set = processor_set.clone();
assert_eq!(cloned_processor_set.len(), 2);
});
}
#[test]
fn display_shows_cpulist_and_count() {
let hardware = SystemHardware::fake(HardwareBuilder::from_counts(nz!(4), nz!(1)));
let processor_set = hardware.processors();
let display_output = processor_set.to_string();
assert!(
display_output.contains("0-3"),
"display output should contain cpulist format: {display_output}"
);
assert!(
display_output.contains("4 processors"),
"display output should contain processor count: {display_output}"
);
}
#[test]
fn iter_returns_all_processors() {
let hardware = SystemHardware::fake(HardwareBuilder::from_counts(nz!(3), nz!(1)));
let processor_set = hardware.processors();
let ids_via_iter: foldhash::HashSet<_> = processor_set.iter().map(Processor::id).collect();
assert_eq!(ids_via_iter.len(), 3);
assert!(ids_via_iter.contains(&0));
assert!(ids_via_iter.contains(&1));
assert!(ids_via_iter.contains(&2));
}
#[test]
fn into_iterator_for_owned_processor_set() {
let hardware = SystemHardware::fake(HardwareBuilder::from_counts(nz!(3), nz!(1)));
let processor_set = hardware.processors();
let ids: foldhash::HashSet<_> = processor_set.into_iter().map(|p| p.id()).collect();
assert_eq!(ids.len(), 3);
assert!(ids.contains(&0));
assert!(ids.contains(&1));
assert!(ids.contains(&2));
}
#[test]
fn into_iterator_for_ref_processor_set() {
let hardware = SystemHardware::fake(HardwareBuilder::from_counts(nz!(3), nz!(1)));
let processor_set = hardware.processors();
let mut ids = foldhash::HashSet::default();
for processor in &processor_set {
ids.insert(processor.id());
}
assert_eq!(ids.len(), 3);
assert!(ids.contains(&0));
assert!(ids.contains(&1));
assert!(ids.contains(&2));
assert_eq!(processor_set.len(), 3);
}
#[test]
fn from_owned_processor_set_creates_builder() {
let hardware = SystemHardware::fake(HardwareBuilder::from_counts(nz!(4), nz!(1)));
let processor_set = hardware.processors();
let builder: ProcessorSetBuilder = processor_set.into();
let rebuilt = builder.take_all().unwrap();
assert_eq!(rebuilt.len(), 4);
}
#[test]
fn from_ref_processor_set_creates_builder() {
let hardware = SystemHardware::fake(HardwareBuilder::from_counts(nz!(4), nz!(1)));
let processor_set = hardware.processors();
let builder: ProcessorSetBuilder = (&processor_set).into();
let rebuilt = builder.take_all().unwrap();
assert_eq!(rebuilt.len(), 4);
assert_eq!(processor_set.len(), 4);
}
#[test]
fn decompose_single_processor() {
let hardware = SystemHardware::fake(HardwareBuilder::from_counts(nz!(1), nz!(1)));
let processor_set = hardware.processors();
let decomposed = processor_set.decompose();
assert_eq!(decomposed.len(), 1);
assert_eq!(decomposed.first().len(), 1);
assert_eq!(decomposed.first().processors().first().id(), 0);
}
#[test]
fn decompose_multiple_processors() {
let hardware = SystemHardware::fake(HardwareBuilder::from_counts(nz!(3), nz!(1)));
let processor_set = hardware.processors();
let decomposed = processor_set.decompose();
assert_eq!(decomposed.len(), 3);
for set in &decomposed {
assert_eq!(set.len(), 1);
}
let ids: foldhash::HashSet<_> = decomposed
.iter()
.map(|set| set.processors().first().id())
.collect();
assert_eq!(ids.len(), 3);
assert!(ids.contains(&0));
assert!(ids.contains(&1));
assert!(ids.contains(&2));
}
#[test]
fn decompose_preserves_memory_regions() {
let hardware = SystemHardware::fake(
HardwareBuilder::new()
.processor(ProcessorBuilder::new().id(0).memory_region(0))
.processor(ProcessorBuilder::new().id(1).memory_region(1)),
);
let decomposed = hardware.processors().decompose();
assert_eq!(decomposed.len(), 2);
let pairs: foldhash::HashSet<_> = decomposed
.iter()
.map(|set| {
assert_eq!(set.len(), 1);
let p = set.processors().first();
(p.id(), p.memory_region_id())
})
.collect();
assert!(pairs.contains(&(0, 0)));
assert!(pairs.contains(&(1, 1)));
}
#[test]
fn decompose_sets_can_pin() {
let hardware = SystemHardware::fake(HardwareBuilder::from_counts(nz!(2), nz!(1)));
let decomposed = hardware.processors().decompose();
decomposed.first().pin_current_thread_to();
assert!(hardware.is_thread_processor_pinned());
assert!(hardware.is_thread_memory_region_pinned());
}
#[test]
fn pin_to_multiple_memory_regions_clears_region_pin() {
let hardware = SystemHardware::fake(
HardwareBuilder::new()
.processor(ProcessorBuilder::new().id(0).memory_region(0))
.processor(ProcessorBuilder::new().id(1).memory_region(1)),
);
let processor_set = hardware.processors();
assert_eq!(processor_set.len(), 2);
processor_set.pin_current_thread_to();
assert!(!hardware.is_thread_processor_pinned());
assert!(!hardware.is_thread_memory_region_pinned());
}
#[test]
fn to_builder_preserves_processors() {
let set = SystemHardware::current().processors().take(nz!(1)).unwrap();
let builder = set.to_builder();
let set2 = builder.take_all().unwrap();
assert_eq!(set2.len(), 1);
let processor1 = set.processors().first();
let processor2 = set2.processors().first();
assert_eq!(processor1, processor2);
}
#[test]
fn inherit_on_pinned() {
with_watchdog(|| {
thread::spawn(|| {
let hw = SystemHardware::current();
let one = hw.processors().take(nz!(1)).unwrap();
one.pin_current_thread_to();
let current_thread_allowed = hw
.processors()
.to_builder()
.where_available_for_current_thread()
.take_all()
.unwrap();
assert_eq!(current_thread_allowed.len(), 1);
assert_eq!(
current_thread_allowed.processors().first(),
one.processors().first()
);
})
.join()
.unwrap();
});
}
#[test]
fn filter_basic() {
let hardware = SystemHardware::fake(HardwareBuilder::from_counts(nz!(6), nz!(1)));
let all = hardware.processors();
assert_eq!(all.len(), 6);
let even_ids = all.filter(|p| p.id() % 2 == 0).unwrap();
assert_eq!(even_ids.len(), 3);
let ids: foldhash::HashSet<_> = even_ids.iter().map(Processor::id).collect();
assert!(ids.contains(&0));
assert!(ids.contains(&2));
assert!(ids.contains(&4));
}
#[test]
fn filter_returns_none_when_no_matches() {
let hardware = SystemHardware::fake(
HardwareBuilder::new()
.processor(
ProcessorBuilder::new()
.id(0)
.efficiency_class(EfficiencyClass::Efficiency),
)
.processor(
ProcessorBuilder::new()
.id(1)
.efficiency_class(EfficiencyClass::Efficiency),
),
);
let all = hardware.processors();
let performance = all.filter(|p| p.efficiency_class() == EfficiencyClass::Performance);
assert!(performance.is_none());
}
#[test]
fn filter_on_subset() {
let hardware = SystemHardware::fake(
HardwareBuilder::new()
.processor(ProcessorBuilder::new().id(0))
.processor(ProcessorBuilder::new().id(1))
.processor(ProcessorBuilder::new().id(2))
.processor(ProcessorBuilder::new().id(3)),
);
let subset = hardware.processors();
assert_eq!(subset.len(), 4);
let even_from_subset = subset.filter(|p| p.id() % 2 == 0).unwrap();
let subset_ids: foldhash::HashSet<_> = subset.iter().map(Processor::id).collect();
let even_ids: foldhash::HashSet<_> = even_from_subset.iter().map(Processor::id).collect();
for id in &even_ids {
assert_eq!(id % 2, 0);
}
for id in &even_ids {
assert!(subset_ids.contains(id));
}
assert_eq!(even_ids.len(), 2);
assert!(even_ids.contains(&0));
assert!(even_ids.contains(&2));
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests_fallback {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, mpsc};
use std::thread;
use new_zealand::nz;
use testing::with_watchdog;
use crate::SystemHardware;
#[test]
fn smoke_test() {
let hw = SystemHardware::fallback();
let processor_set = hw.processors();
assert!(processor_set.len() >= 1);
let mut processor_iter = processor_set.processors().iter();
let p1 = processor_iter.next().unwrap();
assert_eq!(p1.id(), 0);
assert_eq!(p1.memory_region_id(), 0);
processor_set.pin_current_thread_to();
let threads_spawned = Arc::new(AtomicUsize::new(0));
let threads_spawned_clone = Arc::clone(&threads_spawned);
processor_set
.spawn_thread(move |_| {
threads_spawned_clone.fetch_add(1, Ordering::Relaxed);
})
.join()
.unwrap();
assert_eq!(threads_spawned.load(Ordering::Relaxed), 1);
}
#[test]
fn pin_updates_tracker() {
let hw = SystemHardware::fallback();
let processor_set = hw.processors().take(nz!(1)).unwrap();
processor_set.pin_current_thread_to();
assert!(hw.is_thread_processor_pinned());
assert!(hw.is_thread_memory_region_pinned());
}
#[test]
fn spawn_thread_pins_correctly() {
with_watchdog(|| {
let hw = SystemHardware::fallback();
let processor_set = hw.processors().take(nz!(1)).unwrap();
let (tx, rx) = mpsc::channel();
processor_set
.spawn_thread(move |_| {
let is_pinned = hw.is_thread_processor_pinned();
tx.send(is_pinned).unwrap();
})
.join()
.unwrap();
let is_pinned = rx.recv().unwrap();
assert!(is_pinned);
});
}
#[test]
fn spawn_threads_pins_all_correctly() {
with_watchdog(|| {
let hw = SystemHardware::fallback();
let processor_set = hw.processors().take(nz!(2));
let Some(processor_set) = processor_set else {
return;
};
let threads = processor_set.spawn_threads(move |_| hw.is_thread_processor_pinned());
for thread in threads {
let is_pinned = thread.join().unwrap();
assert!(is_pinned);
}
});
}
#[test]
fn inherit_on_pinned() {
thread::spawn(|| {
let hw = SystemHardware::fallback();
let one = hw.processors().take(nz!(1)).unwrap();
one.pin_current_thread_to();
let current_thread_allowed = hw
.processors()
.to_builder()
.where_available_for_current_thread()
.take_all()
.unwrap();
assert_eq!(current_thread_allowed.len(), 1);
})
.join()
.unwrap();
}
#[test]
fn to_builder_preserves_processors() {
let hw = SystemHardware::fallback();
let set = hw.processors().to_builder().take(nz!(1)).unwrap();
let builder = set.to_builder();
let set2 = builder.take_all().unwrap();
assert_eq!(set2.len(), 1);
let processor1 = set.processors().first();
let processor2 = set2.processors().first();
assert_eq!(processor1.id(), processor2.id());
}
#[test]
fn take_all_returns_all_processors() {
let hw = SystemHardware::fallback();
let all_set = hw.processors();
let expected_count = thread::available_parallelism()
.map(std::num::NonZero::get)
.unwrap_or(1);
assert_eq!(all_set.len(), expected_count);
}
}