/// Name of the environment variable consulted by [`par_map_chunks`]: when the
/// variable is set, chunks are processed sequentially on the calling thread
/// instead of being handed to rayon.
pub const SCIRS2_FORCE_SERIAL_ENV: &str = "SCIRS2_FORCE_SERIAL";
/// Applies `f` to each `chunk_size`-element chunk of `input` and gathers the
/// returned vectors into one `Vec<U>`.
///
/// Execution strategy, in order of precedence:
///
/// 1. If the environment variable named by [`SCIRS2_FORCE_SERIAL_ENV`] is set
///    (to any value, including one that is not valid Unicode), the chunks are
///    processed sequentially on the calling thread.
/// 2. On Linux builds with both the `memory_efficient` and `libc` features, if
///    a NUMA topology is detected and a dedicated rayon pool can be built, the
///    work runs on that pool. The pool has one worker per CPU listed in the
///    topology (at least one), and worker `i` is pinned to the CPU list of
///    node `i % node_count`.
/// 3. Otherwise the work runs on rayon's current thread pool.
///
/// The last chunk may be shorter than `chunk_size`. An empty `input` yields an
/// empty vector.
///
/// # Panics
///
/// Panics if `chunk_size` is zero.
pub fn par_map_chunks<T, U, F>(input: &[T], chunk_size: usize, f: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&[T]) -> Vec<U> + Sync + Send,
{
    assert!(chunk_size > 0, "par_map_chunks: chunk_size must be > 0");

    // `var_os` rather than `var`: `var` reports a variable that is set to a
    // non-Unicode value as an error, which would silently ignore the override.
    if std::env::var_os(SCIRS2_FORCE_SERIAL_ENV).is_some() {
        return input.chunks(chunk_size).flat_map(|c| f(c)).collect();
    }

    if input.is_empty() {
        return Vec::new();
    }

    #[cfg(all(target_os = "linux", feature = "memory_efficient", feature = "libc"))]
    {
        use crate::memory_efficient::numa_topology::NumaTopology;

        if let Some(topo) = NumaTopology::detect() {
            let node_count = topo.num_nodes();
            let total_cpus: usize = topo.nodes.iter().map(|n| n.cpu_list.len()).sum();
            let num_threads = total_cpus.max(1);
            // Owned copy of the per-node CPU lists so the start handler can
            // outlive `topo`.
            let per_node_cpus: Vec<Vec<usize>> =
                topo.nodes.iter().map(|n| n.cpu_list.clone()).collect();

            // NOTE(review): a fresh pool is built on every call; consider
            // caching it if this function turns out to be called frequently.
            let pool_result = rayon::ThreadPoolBuilder::new()
                .num_threads(num_threads)
                .start_handler(move |tid| {
                    if node_count == 0 {
                        return;
                    }
                    // Spread workers round-robin across the NUMA nodes.
                    let node_idx = tid % node_count;
                    if let Some(cpu_list) = per_node_cpus.get(node_idx) {
                        pin_thread_to_cpus(cpu_list);
                    }
                })
                .build();

            // A pool that cannot be built is not fatal: fall through to the
            // default rayon pool below.
            if let Ok(pool) = pool_result {
                return pool.install(|| {
                    use rayon::prelude::*;
                    input
                        .par_chunks(chunk_size)
                        .flat_map(|chunk| f(chunk))
                        .collect()
                });
            }
        }
    }

    {
        use rayon::prelude::*;
        input.par_chunks(chunk_size).flat_map(|c| f(c)).collect()
    }
}
/// Best-effort attempt to restrict the calling thread's CPU affinity to the
/// CPUs in `cpu_list`.
///
/// An empty list is a no-op. CPU indices at or above `libc::CPU_SETSIZE` are
/// skipped. The result of `pthread_setaffinity_np` is discarded, so a failure
/// to pin is silent.
#[cfg(all(target_os = "linux", feature = "libc"))]
fn pin_thread_to_cpus(cpu_list: &[usize]) {
    if cpu_list.is_empty() {
        return;
    }

    let cpu_limit = libc::CPU_SETSIZE as usize;

    // SAFETY: `cpu_set_t` is a plain bitmask, so the zeroed value is valid;
    // `CPU_SET` only receives indices below `CPU_SETSIZE`; the pointer handed
    // to `pthread_setaffinity_np` refers to a live local whose size matches
    // the size argument.
    unsafe {
        let mut mask: libc::cpu_set_t = std::mem::zeroed();
        libc::CPU_ZERO(&mut mask);

        for cpu in cpu_list.iter().copied().filter(|&cpu| cpu < cpu_limit) {
            libc::CPU_SET(cpu, &mut mask);
        }

        // Pinning is an optimisation only; the return code is ignored on purpose.
        let _ = libc::pthread_setaffinity_np(
            libc::pthread_self(),
            std::mem::size_of::<libc::cpu_set_t>(),
            &mask as *const libc::cpu_set_t,
        );
    }
}