#![no_std]
#[cfg(feature = "std")]
extern crate std;
#[cfg(feature = "std")]
use std::ffi::CStr;
use core::cell::UnsafeCell;
use core::ffi::{c_char, c_int, c_void};
use core::marker::PhantomData;
use core::ptr::NonNull;
use core::slice;
use core::sync::atomic::{AtomicBool, Ordering};
pub const DEFAULT_ALIGNMENT: usize = 128;
#[repr(align(128))]
#[derive(Clone, Copy, Debug, Default)]
pub struct CacheAligned<T>(pub T);
pub struct BasicSpinMutex<T, const PAUSE: bool> {
locked: AtomicBool,
data: UnsafeCell<T>,
}
impl<T, const PAUSE: bool> BasicSpinMutex<T, PAUSE> {
pub const fn new(data: T) -> Self {
Self {
locked: AtomicBool::new(false),
data: UnsafeCell::new(data),
}
}
pub fn lock(&self) -> BasicSpinMutexGuard<'_, T, PAUSE> {
while self
.locked
.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
if PAUSE {
core::hint::spin_loop();
}
}
BasicSpinMutexGuard { mutex: self }
}
pub fn try_lock(&self) -> Option<BasicSpinMutexGuard<'_, T, PAUSE>> {
if self
.locked
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
Some(BasicSpinMutexGuard { mutex: self })
} else {
None
}
}
pub fn is_locked(&self) -> bool {
self.locked.load(Ordering::Acquire)
}
pub fn into_inner(self) -> T {
self.data.into_inner()
}
pub fn get_mut(&mut self) -> &mut T {
self.data.get_mut()
}
}
unsafe impl<T: Send, const PAUSE: bool> Send for BasicSpinMutex<T, PAUSE> {}
unsafe impl<T: Send, const PAUSE: bool> Sync for BasicSpinMutex<T, PAUSE> {}
pub struct BasicSpinMutexGuard<'a, T, const PAUSE: bool> {
mutex: &'a BasicSpinMutex<T, PAUSE>,
}
impl<'a, T, const PAUSE: bool> BasicSpinMutexGuard<'a, T, PAUSE> {
pub fn get(&self) -> &T {
unsafe { &*self.mutex.data.get() }
}
pub fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.mutex.data.get() }
}
}
impl<'a, T, const PAUSE: bool> core::ops::Deref for BasicSpinMutexGuard<'a, T, PAUSE> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.mutex.data.get() }
}
}
impl<'a, T, const PAUSE: bool> core::ops::DerefMut for BasicSpinMutexGuard<'a, T, PAUSE> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.mutex.data.get() }
}
}
impl<'a, T, const PAUSE: bool> Drop for BasicSpinMutexGuard<'a, T, PAUSE> {
fn drop(&mut self) {
self.mutex.locked.store(false, Ordering::Release);
}
}
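/// `SpinMutex<T>` is `BasicSpinMutex<T, true>`: the busy-wait loop emits a
/// `spin_loop` hint between failed acquisition attempts.
///
/// A minimal usage sketch (marked `ignore` since it assumes the crate is
/// already in scope under its published name):
///
/// ```ignore
/// let counter = SpinMutex::new(0usize);
/// {
///     let mut guard = counter.lock(); // spins until acquired
///     *guard += 1;
/// } // guard drops here and releases the lock
/// assert_eq!(*counter.lock(), 1);
/// ```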
pub type SpinMutex<T> = BasicSpinMutex<T, true>;
#[derive(Copy, Clone, Debug)]
pub struct Prong {
pub task_index: usize,
pub thread_index: usize,
pub colocation_index: usize,
}
#[derive(Debug)]
pub enum Error {
CreationFailed,
SpawnFailed,
InvalidParameter,
UnsupportedPlatform,
}
#[cfg(feature = "std")]
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::CreationFailed => write!(f, "failed to create thread pool"),
Self::SpawnFailed => write!(f, "failed to spawn worker threads"),
Self::InvalidParameter => write!(f, "invalid parameter provided"),
Self::UnsupportedPlatform => write!(f, "platform not supported"),
}
}
}
#[cfg(feature = "std")]
impl std::error::Error for Error {}
extern "C" {
fn fu_version_major() -> c_int;
fn fu_version_minor() -> c_int;
fn fu_version_patch() -> c_int;
fn fu_enabled_numa() -> c_int;
fn fu_capabilities_string() -> *const c_char;
fn fu_count_logical_cores() -> usize;
fn fu_count_colocations() -> usize;
fn fu_count_numa_nodes() -> usize;
fn fu_count_quality_levels() -> usize;
fn fu_volume_any_pages() -> usize;
fn fu_pool_new(name: *const c_char) -> *mut c_void;
fn fu_pool_delete(pool: *mut c_void);
fn fu_pool_spawn(pool: *mut c_void, threads: usize, exclusivity: c_int) -> c_int;
fn fu_pool_terminate(pool: *mut c_void);
fn fu_pool_count_threads(pool: *mut c_void) -> usize;
fn fu_pool_count_colocations(pool: *mut c_void) -> usize;
fn fu_pool_count_threads_in(pool: *mut c_void, colocation_index: usize) -> usize;
fn fu_pool_locate_thread_in(
pool: *mut c_void,
global_thread_index: usize,
colocation_index: usize,
) -> usize;
#[allow(dead_code)]
fn fu_pool_for_threads(
pool: *mut c_void,
callback: extern "C" fn(*mut c_void, usize, usize),
context: *mut c_void,
);
fn fu_pool_for_n(
pool: *mut c_void,
n: usize,
callback: extern "C" fn(*mut c_void, usize, usize, usize),
context: *mut c_void,
);
fn fu_pool_for_n_dynamic(
pool: *mut c_void,
n: usize,
callback: extern "C" fn(*mut c_void, usize, usize, usize),
context: *mut c_void,
);
fn fu_pool_for_slices(
pool: *mut c_void,
n: usize,
callback: extern "C" fn(*mut c_void, usize, usize, usize, usize),
context: *mut c_void,
);
fn fu_pool_unsafe_for_threads(
pool: *mut c_void,
callback: extern "C" fn(*mut c_void, usize, usize),
context: *mut c_void,
);
fn fu_pool_unsafe_join(pool: *mut c_void);
fn fu_pool_sleep(pool: *mut c_void, micros: usize);
fn fu_allocate_at_least(
numa_node_index: usize,
minimum_bytes: usize,
allocated_bytes: *mut usize,
bytes_per_page: *mut usize,
) -> *mut c_void;
fn fu_allocate(numa_node_index: usize, bytes: usize) -> *mut c_void;
fn fu_free(numa_node_index: usize, pointer: *mut c_void, bytes: usize);
fn fu_volume_huge_pages_in(numa_node_index: usize) -> usize;
fn fu_volume_any_pages_in(numa_node_index: usize) -> usize;
}
#[cfg(feature = "std")]
pub fn capabilities_string() -> Option<&'static str> {
unsafe {
let ptr = fu_capabilities_string();
if ptr.is_null() {
None
} else {
CStr::from_ptr(ptr).to_str().ok()
}
}
}
pub fn capabilities_string_ptr() -> *const c_char {
unsafe { fu_capabilities_string() }
}
pub fn volume_any_pages() -> usize {
unsafe { fu_volume_any_pages() }
}
pub fn count_logical_cores() -> usize {
unsafe { fu_count_logical_cores() }
}
pub fn count_numa_nodes() -> usize {
unsafe { fu_count_numa_nodes() }
}
pub fn count_colocations() -> usize {
unsafe { fu_count_colocations() }
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum CallerExclusivity {
Inclusive = 0,
Exclusive = 1,
}
pub fn count_quality_levels() -> usize {
unsafe { fu_count_quality_levels() }
}
pub fn numa_enabled() -> bool {
unsafe { fu_enabled_numa() != 0 }
}
pub fn version_major() -> usize {
unsafe { fu_version_major() as usize }
}
pub fn version_minor() -> usize {
unsafe { fu_version_minor() as usize }
}
pub fn version_patch() -> usize {
unsafe { fu_version_patch() as usize }
}
pub fn version() -> (usize, usize, usize) {
(version_major(), version_minor(), version_patch())
}
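/// An owning handle to a `fu_pool_*` thread pool. Workers are terminated and
/// the pool is deleted when the handle is dropped.
///
/// An illustrative sketch of spawning a pool and running an indexed loop
/// (marked `ignore`: it assumes the crate is in scope and the underlying C
/// library is linked):
///
/// ```ignore
/// let mut pool = ThreadPool::try_spawn(4).expect("spawn four workers");
/// for_n(&mut pool, 1_000, |prong| {
///     // `task_index` is unique across the 1_000 tasks.
///     let _ = prong.task_index;
/// });
/// ```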
pub struct ThreadPool {
inner: *mut c_void,
}
unsafe impl Send for ThreadPool {}
unsafe impl Sync for ThreadPool {}
impl ThreadPool {
pub fn try_spawn_with_exclusivity(
threads: usize,
exclusivity: CallerExclusivity,
) -> Result<Self, Error> {
Self::try_named_spawn_with_exclusivity(None, threads, exclusivity)
}
pub fn try_named_spawn_with_exclusivity(
name: Option<&str>,
threads: usize,
exclusivity: CallerExclusivity,
) -> Result<Self, Error> {
if threads == 0 {
return Err(Error::InvalidParameter);
}
unsafe {
// The 16-byte buffer must stay alive until `fu_pool_new` below has
// consumed the pointer, so it is declared outside the branch that fills it.
let mut name_buffer = [0u8; 16];
let name_ptr = if let Some(name_str) = name {
let name_bytes = name_str.as_bytes();
let copy_len = core::cmp::min(name_bytes.len(), 15);
name_buffer[..copy_len].copy_from_slice(&name_bytes[..copy_len]);
name_buffer.as_ptr() as *const c_char
} else {
core::ptr::null()
};
let inner = fu_pool_new(name_ptr);
if inner.is_null() {
return Err(Error::CreationFailed);
}
let success = fu_pool_spawn(inner, threads, exclusivity as c_int);
if success == 0 {
fu_pool_delete(inner);
return Err(Error::SpawnFailed);
}
Ok(Self { inner })
}
}
pub fn try_spawn(threads: usize) -> Result<Self, Error> {
Self::try_spawn_with_exclusivity(threads, CallerExclusivity::Inclusive)
}
pub fn try_named_spawn(name: &str, threads: usize) -> Result<Self, Error> {
Self::try_named_spawn_with_exclusivity(Some(name), threads, CallerExclusivity::Inclusive)
}
pub fn threads(&self) -> usize {
unsafe { fu_pool_count_threads(self.inner) }
}
pub fn colocations(&self) -> usize {
unsafe { fu_pool_count_colocations(self.inner) }
}
pub fn count_threads_in(&self, colocation_index: usize) -> usize {
unsafe { fu_pool_count_threads_in(self.inner, colocation_index) }
}
pub fn locate_thread_in(&self, global_thread_index: usize, colocation_index: usize) -> usize {
unsafe { fu_pool_locate_thread_in(self.inner, global_thread_index, colocation_index) }
}
pub fn sleep(&mut self, micros: usize) {
unsafe {
fu_pool_sleep(self.inner, micros);
}
}
pub fn for_threads<F>(&mut self, function: F) -> ForThreadsOperation<'_, F>
where
F: Fn(usize, usize) + Sync,
{
ForThreadsOperation::new(self, function)
}
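/// Prepares a static-scheduling loop over `n` tasks. Dispatch is deferred:
/// the work is submitted when the returned `ForNOperation` is dropped,
/// mirroring `for_n_dynamic` and `for_slices`.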
pub fn for_n<F>(&mut self, n: usize, function: F) -> ForNOperation<'_, F>
where
F: Fn(Prong) + Sync,
{
ForNOperation {
pool: self,
n,
function,
}
}
pub fn for_n_dynamic<F>(&mut self, n: usize, function: F) -> ForNDynamicOperation<'_, F>
where
F: Fn(Prong) + Sync,
{
ForNDynamicOperation {
pool: self,
n,
function,
}
}
pub fn for_slices<F>(&mut self, n: usize, function: F) -> ForSlicesOperation<'_, F>
where
F: Fn(Prong, usize) + Sync,
{
ForSlicesOperation {
pool: self,
n,
function,
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
unsafe {
fu_pool_terminate(self.inner);
fu_pool_delete(self.inner);
}
}
}
#[derive(Debug)]
pub struct AllocationResult {
ptr: NonNull<u8>,
allocated_bytes: usize,
bytes_per_page: usize,
numa_node: usize,
}
impl AllocationResult {
pub fn as_mut_slice(&mut self) -> &mut [u8] {
unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.allocated_bytes) }
}
pub fn as_slice(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.allocated_bytes) }
}
pub fn as_ptr(&self) -> *mut u8 {
self.ptr.as_ptr()
}
pub fn allocated_bytes(&self) -> usize {
self.allocated_bytes
}
pub fn bytes_per_page(&self) -> usize {
self.bytes_per_page
}
pub fn numa_node(&self) -> usize {
self.numa_node
}
pub unsafe fn as_mut_slice_of<T>(&mut self) -> &mut [T] {
let element_size = core::mem::size_of::<T>();
let element_count = self.allocated_bytes / element_size;
slice::from_raw_parts_mut(self.ptr.as_ptr() as *mut T, element_count)
}
pub unsafe fn as_slice_of<T>(&self) -> &[T] {
let element_size = core::mem::size_of::<T>();
let element_count = self.allocated_bytes / element_size;
slice::from_raw_parts(self.ptr.as_ptr() as *const T, element_count)
}
}
impl Drop for AllocationResult {
fn drop(&mut self) {
unsafe {
fu_free(
self.numa_node,
self.ptr.as_ptr() as *mut c_void,
self.allocated_bytes,
);
}
}
}
unsafe impl Send for AllocationResult {}
unsafe impl Sync for AllocationResult {}
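/// Allocates memory pinned to a single NUMA node through `fu_allocate` and
/// `fu_allocate_at_least`.
///
/// An illustrative sketch (marked `ignore`: assumes the crate is in scope
/// and that node 0 exists on the host):
///
/// ```ignore
/// let allocator = PinnedAllocator::new(0).expect("NUMA node 0");
/// let mut block = allocator.allocate_at_least(1 << 20).expect("1 MiB");
/// assert!(block.allocated_bytes() >= 1 << 20);
/// block.as_mut_slice().fill(0xAB);
/// // The memory is returned to the node when `block` is dropped.
/// ```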
#[derive(Debug, Clone, Copy)]
pub struct PinnedAllocator {
numa_node: usize,
}
impl PinnedAllocator {
pub fn new(numa_node: usize) -> Option<Self> {
if numa_node >= count_numa_nodes() {
return None;
}
Some(Self { numa_node })
}
pub fn numa_node(&self) -> usize {
self.numa_node
}
pub fn volume_huge_pages(&self) -> usize {
unsafe { fu_volume_huge_pages_in(self.numa_node) }
}
pub fn volume_any_pages(&self) -> usize {
unsafe { fu_volume_any_pages_in(self.numa_node) }
}
pub fn allocate_at_least(&self, minimum_bytes: usize) -> Option<AllocationResult> {
if minimum_bytes == 0 {
return None;
}
let mut allocated_bytes = 0usize;
let mut bytes_per_page = 0usize;
unsafe {
let ptr = fu_allocate_at_least(
self.numa_node,
minimum_bytes,
&mut allocated_bytes as *mut usize,
&mut bytes_per_page as *mut usize,
);
if ptr.is_null() || allocated_bytes == 0 {
return None;
}
Some(AllocationResult {
ptr: NonNull::new_unchecked(ptr as *mut u8),
allocated_bytes,
bytes_per_page,
numa_node: self.numa_node,
})
}
}
pub fn allocate(&self, bytes: usize) -> Option<AllocationResult> {
if bytes == 0 {
return None;
}
unsafe {
let ptr = fu_allocate(self.numa_node, bytes);
if ptr.is_null() {
return None;
}
Some(AllocationResult {
ptr: NonNull::new_unchecked(ptr as *mut u8),
allocated_bytes: bytes,
bytes_per_page: 0,
numa_node: self.numa_node,
})
}
}
pub fn allocate_for<T>(&self, count: usize) -> Option<AllocationResult> {
let bytes = count.checked_mul(core::mem::size_of::<T>())?;
self.allocate(bytes)
}
pub fn allocate_for_at_least<T>(&self, min_count: usize) -> Option<AllocationResult> {
let min_bytes = min_count.checked_mul(core::mem::size_of::<T>())?;
self.allocate_at_least(min_bytes)
}
}
pub fn default_numa_allocator() -> Option<PinnedAllocator> {
PinnedAllocator::new(0)
}
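/// A growable vector backed by NUMA-pinned memory. Fallible operations
/// return `Result` or `Option` instead of panicking on allocation failure.
///
/// An illustrative sketch (marked `ignore`: assumes the crate is in scope):
///
/// ```ignore
/// let allocator = PinnedAllocator::new(0).expect("NUMA node 0");
/// let mut values = PinnedVec::<u64>::with_capacity_in(allocator, 4).expect("capacity");
/// values.push(1).unwrap();
/// values.push(2).unwrap();
/// assert_eq!(values.as_slice(), &[1, 2]);
/// ```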
#[derive(Debug)]
pub struct PinnedVec<T> {
allocator: PinnedAllocator,
allocation: Option<AllocationResult>,
len: usize,
capacity: usize,
_phantom: core::marker::PhantomData<T>,
}
impl<T> PinnedVec<T> {
pub fn new_in(allocator: PinnedAllocator) -> Self {
Self {
allocator,
allocation: None,
len: 0,
capacity: 0,
_phantom: core::marker::PhantomData,
}
}
pub fn with_capacity_in(allocator: PinnedAllocator, capacity: usize) -> Option<Self> {
let mut vec = Self {
allocator,
allocation: None,
len: 0,
capacity: 0,
_phantom: core::marker::PhantomData,
};
if capacity > 0 {
vec.reserve(capacity).ok()?;
}
Some(vec)
}
pub fn len(&self) -> usize {
self.len
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn numa_node(&self) -> usize {
self.allocator.numa_node()
}
pub fn reserve(&mut self, additional: usize) -> Result<(), &'static str> {
let needed_capacity = self
.len
.checked_add(additional)
.ok_or("Capacity overflow")?;
if needed_capacity <= self.capacity {
return Ok(());
}
let new_capacity = needed_capacity.max(self.capacity * 2).max(4);
self.grow_to(new_capacity)
}
fn grow_to(&mut self, new_capacity: usize) -> Result<(), &'static str> {
if new_capacity <= self.capacity {
return Ok(());
}
let new_allocation = self
.allocator
.allocate_for::<T>(new_capacity)
.ok_or("Failed to allocate memory")?;
if let Some(old_allocation) = self.allocation.take() {
unsafe {
let old_ptr = old_allocation.as_ptr() as *const T;
let new_ptr = new_allocation.as_ptr() as *mut T;
core::ptr::copy_nonoverlapping(old_ptr, new_ptr, self.len);
}
}
self.allocation = Some(new_allocation);
self.capacity = new_capacity;
Ok(())
}
pub fn push(&mut self, value: T) -> Result<(), &'static str> {
if self.len >= self.capacity {
self.reserve(1)?;
}
unsafe {
let ptr = self.as_mut_ptr().add(self.len);
core::ptr::write(ptr, value);
}
self.len += 1;
Ok(())
}
pub fn pop(&mut self) -> Option<T> {
if self.len == 0 {
return None;
}
self.len -= 1;
unsafe {
let ptr = self.as_mut_ptr().add(self.len);
Some(core::ptr::read(ptr))
}
}
pub fn clear(&mut self) {
unsafe {
let ptr = self.as_mut_ptr();
for i in 0..self.len {
core::ptr::drop_in_place(ptr.add(i));
}
}
self.len = 0;
}
pub fn as_ptr(&self) -> *const T {
match &self.allocation {
Some(alloc) => alloc.as_ptr() as *const T,
None => core::ptr::NonNull::dangling().as_ptr(),
}
}
pub fn as_mut_ptr(&mut self) -> *mut T {
match &self.allocation {
Some(alloc) => alloc.as_ptr() as *mut T,
None => core::ptr::NonNull::dangling().as_ptr(),
}
}
pub fn sync_ptr(&self) -> SyncMutPtr<T> {
let ptr = match &self.allocation {
Some(alloc) => alloc.as_ptr() as *mut T,
None => core::ptr::NonNull::dangling().as_ptr(),
};
SyncMutPtr::new(ptr)
}
pub fn as_slice(&self) -> &[T] {
unsafe { core::slice::from_raw_parts(self.as_ptr(), self.len) }
}
pub fn as_mut_slice(&mut self) -> &mut [T] {
unsafe { core::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
}
pub fn iter(&self) -> core::slice::Iter<'_, T> {
self.as_slice().iter()
}
pub fn iter_mut(&mut self) -> core::slice::IterMut<'_, T> {
self.as_mut_slice().iter_mut()
}
pub fn par_iter(&self) -> ParallelSlice<'_, T> {
ParallelSlice::new(self.as_slice())
}
pub fn par_iter_mut(&mut self) -> ParallelSliceMut<'_, T>
where
T: Send,
{
ParallelSliceMut::new(self.as_mut_slice())
}
pub fn insert(&mut self, index: usize, element: T) -> Result<(), &'static str> {
if index > self.len {
panic!(
"insertion index (is {}) should be <= len (is {})",
index, self.len
);
}
if self.len >= self.capacity {
self.reserve(1)?;
}
unsafe {
let ptr = self.as_mut_ptr();
core::ptr::copy(ptr.add(index), ptr.add(index + 1), self.len - index);
core::ptr::write(ptr.add(index), element);
}
self.len += 1;
Ok(())
}
pub fn remove(&mut self, index: usize) -> T {
if index >= self.len {
panic!(
"removal index (is {}) should be < len (is {})",
index, self.len
);
}
unsafe {
let ptr = self.as_mut_ptr();
let result = core::ptr::read(ptr.add(index));
core::ptr::copy(ptr.add(index + 1), ptr.add(index), self.len - index - 1);
self.len -= 1;
result
}
}
pub fn extend_from_slice(&mut self, other: &[T]) -> Result<(), &'static str>
where
T: Clone,
{
self.reserve(other.len())?;
for item in other {
self.push(item.clone())?;
}
Ok(())
}
pub fn get<I>(&self, index: I) -> Option<&<I as core::slice::SliceIndex<[T]>>::Output>
where
I: core::slice::SliceIndex<[T]>,
{
self.as_slice().get(index)
}
pub fn get_mut<I>(
&mut self,
index: I,
) -> Option<&mut <I as core::slice::SliceIndex<[T]>>::Output>
where
I: core::slice::SliceIndex<[T]>,
{
self.as_mut_slice().get_mut(index)
}
pub fn first(&self) -> Option<&T> {
self.as_slice().first()
}
pub fn first_mut(&mut self) -> Option<&mut T> {
self.as_mut_slice().first_mut()
}
pub fn last(&self) -> Option<&T> {
self.as_slice().last()
}
pub fn last_mut(&mut self) -> Option<&mut T> {
self.as_mut_slice().last_mut()
}
pub fn swap(&mut self, a: usize, b: usize) {
self.as_mut_slice().swap(a, b)
}
pub fn reverse(&mut self) {
self.as_mut_slice().reverse()
}
pub fn contains(&self, x: &T) -> bool
where
T: PartialEq,
{
self.as_slice().contains(x)
}
pub fn truncate(&mut self, len: usize) {
if len < self.len {
unsafe {
let ptr = self.as_mut_ptr();
for i in len..self.len {
core::ptr::drop_in_place(ptr.add(i));
}
}
self.len = len;
}
}
pub fn resize(&mut self, new_len: usize, value: T) -> Result<(), &'static str>
where
T: Clone,
{
if new_len > self.len {
self.reserve(new_len - self.len)?;
while self.len < new_len {
self.push(value.clone())?;
}
} else {
self.truncate(new_len);
}
Ok(())
}
pub fn resize_with<F>(&mut self, new_len: usize, f: F) -> Result<(), &'static str>
where
F: FnMut() -> T,
{
if new_len > self.len {
self.reserve(new_len - self.len)?;
let mut f = f;
while self.len < new_len {
self.push(f())?;
}
} else {
self.truncate(new_len);
}
Ok(())
}
pub fn fill(&mut self, value: T)
where
T: Clone,
{
self.as_mut_slice().fill(value);
}
pub fn fill_with<F>(&mut self, f: F)
where
F: FnMut() -> T,
{
self.as_mut_slice().fill_with(f);
}
}
impl<T> core::ops::Index<usize> for PinnedVec<T> {
type Output = T;
fn index(&self, index: usize) -> &Self::Output {
&self.as_slice()[index]
}
}
impl<T> core::ops::IndexMut<usize> for PinnedVec<T> {
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
&mut self.as_mut_slice()[index]
}
}
impl<T> Drop for PinnedVec<T> {
fn drop(&mut self) {
self.clear();
}
}
unsafe impl<T: Send> Send for PinnedVec<T> {}
unsafe impl<T: Sync> Sync for PinnedVec<T> {}
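/// Interleaves elements across colocations: element `i` lives in colocation
/// `i % colocations_count` at local index `i / colocations_count`, so pushes
/// spread evenly over NUMA nodes.
///
/// An illustrative sketch (marked `ignore`: needs the C runtime linked so
/// `count_colocations()` reports the host topology):
///
/// ```ignore
/// let mut values = RoundRobinVec::<usize>::new().expect("at least one colocation");
/// for value in 0..8 {
///     values.push(value).unwrap();
/// }
/// assert_eq!(values.len(), 8);
/// assert_eq!(values.get(3), Some(&3));
/// ```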
pub struct RoundRobinVec<T> {
colocations: PinnedVec<PinnedVec<T>>,
total_length: usize,
total_capacity: usize,
}
impl<T> RoundRobinVec<T> {
pub fn new() -> Option<Self> {
let colocations_count = count_colocations();
if colocations_count == 0 {
return None;
}
let container_allocator = PinnedAllocator::new(0)?;
let mut colocations = PinnedVec::with_capacity_in(container_allocator, colocations_count)?;
for colocation_index in 0..colocations_count {
let allocator = PinnedAllocator::new(colocation_index)?;
let vec = PinnedVec::new_in(allocator);
colocations.push(vec).ok()?;
}
let mut total_capacity = 0;
for i in 0..colocations.len() {
total_capacity += colocations[i].capacity();
}
Some(Self {
colocations,
total_length: 0,
total_capacity,
})
}
pub fn with_capacity_per_colocation(capacity_per_colocation: usize) -> Option<Self> {
let colocations_count = count_colocations();
if colocations_count == 0 {
return None;
}
let container_allocator = PinnedAllocator::new(0)?;
let mut colocations = PinnedVec::with_capacity_in(container_allocator, colocations_count)?;
for colocation_index in 0..colocations_count {
let allocator = PinnedAllocator::new(colocation_index)?;
let vec = PinnedVec::with_capacity_in(allocator, capacity_per_colocation)?;
colocations.push(vec).ok()?;
}
let mut total_capacity = 0;
for i in 0..colocations.len() {
total_capacity += colocations[i].capacity();
}
Some(Self {
colocations,
total_length: 0,
total_capacity,
})
}
pub fn colocations_count(&self) -> usize {
self.colocations.len()
}
pub fn len_at(&self, colocation_index: usize) -> usize {
self.colocations[colocation_index].len()
}
pub fn capacity_at(&self, colocation_index: usize) -> usize {
self.colocations[colocation_index].capacity()
}
pub fn len(&self) -> usize {
self.total_length
}
pub fn capacity(&self) -> usize {
self.total_capacity
}
pub fn is_empty(&self) -> bool {
self.total_length == 0
}
pub fn get_colocation(&self, colocation_index: usize) -> Option<&PinnedVec<T>> {
self.colocations.get(colocation_index)
}
pub fn get_colocation_mut(&mut self, colocation_index: usize) -> Option<&mut PinnedVec<T>> {
self.colocations.get_mut(colocation_index)
}
pub fn get(&self, index: usize) -> Option<&T> {
if self.colocations.is_empty() {
return None;
}
let colocations_count = self.colocations.len();
let colocation_index = index % colocations_count;
let local_index = index / colocations_count;
self.colocations.get(colocation_index)?.get(local_index)
}
pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
if self.colocations.is_empty() {
return None;
}
let colocations_count = self.colocations.len();
let colocation_index = index % colocations_count;
let local_index = index / colocations_count;
self.colocations
.get_mut(colocation_index)?
.get_mut(local_index)
}
pub fn push(&mut self, value: T) -> Result<(), &'static str> {
if self.colocations.is_empty() {
return Err("No NUMA nodes available");
}
let target_colocation = self.total_length % self.colocations.len();
let result = self.colocations[target_colocation].push(value);
if result.is_ok() {
self.total_length += 1;
}
result
}
pub fn par_iter(&self) -> ParallelRoundRobin<'_, T>
where
T: Sync,
{
ParallelRoundRobin::new(self)
}
pub fn par_iter_mut(&mut self) -> ParallelRoundRobinMut<'_, T>
where
T: Send,
{
ParallelRoundRobinMut::new(self)
}
pub fn par_for_each<S, F>(&self, pool: &mut ThreadPool, schedule: S, function: F)
where
T: Sync,
S: ParallelSchedule,
F: Fn(&T, Prong) + Sync,
{
self.par_iter().drive(pool, schedule, &function);
}
pub fn par_for_each_mut<S, F>(&mut self, pool: &mut ThreadPool, schedule: S, function: F)
where
T: Send,
S: ParallelSchedule,
F: Fn(&mut T, Prong) + Sync,
{
self.par_iter_mut().drive(pool, schedule, &function);
}
pub fn pop(&mut self) -> Option<T> {
if self.total_length == 0 {
return None;
}
let target_colocation = (self.total_length - 1) % self.colocations.len();
let result = self.colocations[target_colocation].pop();
if result.is_some() {
self.total_length -= 1;
}
result
}
pub fn local_to_global_index(&self, colocation_index: usize, local_index: usize) -> usize {
local_index * self.colocations_count() + colocation_index
}
pub fn global_to_local_index(&self, global_index: usize) -> (usize, usize) {
let colocations_count = self.colocations_count();
let colocation_index = global_index % colocations_count;
let local_index = global_index / colocations_count;
(colocation_index, local_index)
}
pub fn fill(&mut self, value: T, pool: &mut ThreadPool)
where
T: Clone + Send + Sync,
{
let colocations_count = self.colocations_count();
let safe_ptr = SafePtr(self.colocations.as_mut_ptr());
let pool_ptr = SafePtr(pool as *const ThreadPool as *mut ThreadPool);
pool.for_threads(move |thread_index, colocation_index| {
if colocation_index < colocations_count {
let node_vec = safe_ptr.get_mut_at(colocation_index);
let pool = pool_ptr.get_mut();
let threads_in_colocation = pool.count_threads_in(colocation_index);
let thread_local_index = pool.locate_thread_in(thread_index, colocation_index);
let split = IndexedSplit::new(node_vec.len(), threads_in_colocation);
let range = split.get(thread_local_index);
for idx in range {
if let Some(element) = node_vec.get_mut(idx) {
*element = value.clone();
}
}
}
});
}
pub fn fill_with<F>(&mut self, mut f: F, pool: &mut ThreadPool)
where
F: FnMut() -> T + Send + Sync,
T: Send + Sync,
{
let colocations_count = self.colocations_count();
let safe_ptr = SafePtr(self.colocations.as_mut_ptr());
let f_ptr = SafePtr(&mut f as *mut F);
let pool_ptr = SafePtr(pool as *const ThreadPool as *mut ThreadPool);
pool.for_threads(move |thread_index, colocation_index| {
if colocation_index < colocations_count {
let node_vec = safe_ptr.get_mut_at(colocation_index);
let f_ref = f_ptr.get_mut();
let pool = pool_ptr.get_mut();
let threads_in_colocation = pool.count_threads_in(colocation_index);
let thread_local_index = pool.locate_thread_in(thread_index, colocation_index);
let split = IndexedSplit::new(node_vec.len(), threads_in_colocation);
let range = split.get(thread_local_index);
for idx in range {
if let Some(element) = node_vec.get_mut(idx) {
*element = f_ref();
}
}
}
});
}
pub fn clear(&mut self, pool: &mut ThreadPool) {
let colocations_count = self.colocations_count();
let safe_ptr = SafePtr(self.colocations.as_mut_ptr());
let pool_ptr = SafePtr(pool as *const ThreadPool as *mut ThreadPool);
pool.for_threads(move |thread_index, colocation_index| {
if colocation_index < colocations_count {
let node_vec = safe_ptr.get_mut_at(colocation_index);
let pool = pool_ptr.get_mut();
let threads_in_colocation = pool.count_threads_in(colocation_index);
let thread_local_index = pool.locate_thread_in(thread_index, colocation_index);
let split = IndexedSplit::new(node_vec.len(), threads_in_colocation);
let range = split.get(thread_local_index);
unsafe {
let ptr = node_vec.as_mut_ptr();
for idx in range {
core::ptr::drop_in_place(ptr.add(idx));
}
}
}
});
for i in 0..self.colocations.len() {
self.colocations[i].len = 0;
}
self.total_length = 0;
}
pub fn resize(
&mut self,
new_len: usize,
value: T,
pool: &mut ThreadPool,
) -> Result<(), &'static str>
where
T: Clone + Send + Sync,
{
let colocations_count = self.colocations_count();
if colocations_count == 0 {
return Err("No NUMA nodes available");
}
let elements_per_node = new_len / colocations_count;
let extra_elements = new_len % colocations_count;
for i in 0..colocations_count {
let node_len = if i < extra_elements {
elements_per_node + 1
} else {
elements_per_node
};
let current_len = self.colocations[i].len();
if node_len > current_len {
self.colocations[i].reserve(node_len - current_len)?;
}
}
let safe_ptr = SafePtr(self.colocations.as_mut_ptr());
let pool_ptr = SafePtr(pool as *const ThreadPool as *mut ThreadPool);
pool.for_threads(move |thread_index, colocation_index| {
if colocation_index < colocations_count {
let node_vec = safe_ptr.get_mut_at(colocation_index);
let pool = pool_ptr.get_mut();
let node_len = if colocation_index < extra_elements {
elements_per_node + 1
} else {
elements_per_node
};
let current_len = node_vec.len();
let threads_in_colocation = pool.count_threads_in(colocation_index);
let thread_local_index = pool.locate_thread_in(thread_index, colocation_index);
match node_len.cmp(&current_len) {
core::cmp::Ordering::Greater => {
let new_elements = node_len - current_len;
let split = IndexedSplit::new(new_elements, threads_in_colocation);
let range = split.get(thread_local_index);
unsafe {
let ptr = node_vec.as_mut_ptr();
for i in range {
let idx = current_len + i;
core::ptr::write(ptr.add(idx), value.clone());
}
}
}
core::cmp::Ordering::Less => {
let elements_to_drop = current_len - node_len;
let split = IndexedSplit::new(elements_to_drop, threads_in_colocation);
let range = split.get(thread_local_index);
unsafe {
let ptr = node_vec.as_mut_ptr();
for i in range {
let idx = node_len + i;
core::ptr::drop_in_place(ptr.add(idx));
}
}
}
core::cmp::Ordering::Equal => {}
}
}
});
for i in 0..colocations_count {
let node_len = if i < extra_elements {
elements_per_node + 1
} else {
elements_per_node
};
self.colocations[i].len = node_len;
}
self.total_length = new_len;
let mut total_capacity = 0;
for i in 0..colocations_count {
total_capacity += self.colocations[i].capacity();
}
self.total_capacity = total_capacity;
Ok(())
}
}
pub struct SafePtr<T>(*mut T);
unsafe impl<T> Send for SafePtr<T> {}
unsafe impl<T> Sync for SafePtr<T> {}
impl<T> SafePtr<T> {
pub fn new(ptr: *mut T) -> Self {
SafePtr(ptr)
}
#[allow(clippy::mut_from_ref)]
pub fn get_mut_at(&self, index: usize) -> &mut T {
unsafe { &mut *self.0.add(index) }
}
#[allow(clippy::mut_from_ref)]
pub fn get_mut(&self) -> &mut T {
unsafe { &mut *self.0 }
}
}
unsafe impl<T: Send> Send for RoundRobinVec<T> {}
unsafe impl<T: Sync> Sync for RoundRobinVec<T> {}
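/// A raw-pointer wrapper that asserts `Send + Sync` so read-only views can be
/// captured by closures dispatched across pool threads. Callers must ensure
/// the pointee outlives every dispatched task and that indexed reads stay in
/// bounds.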
#[derive(Clone, Copy, Debug)]
pub struct SyncConstPtr<T> {
ptr: *const T,
}
impl<T> SyncConstPtr<T> {
pub fn new(ptr: *const T) -> Self {
Self { ptr }
}
pub unsafe fn get(&self, index: usize) -> &T {
&*self.ptr.add(index)
}
pub fn as_ptr(&self) -> *const T {
self.ptr
}
}
unsafe impl<T> Send for SyncConstPtr<T> {}
unsafe impl<T> Sync for SyncConstPtr<T> {}
#[derive(Clone, Copy)]
pub struct SyncMutPtr<T> {
ptr: *mut T,
_marker: PhantomData<T>,
}
impl<T> SyncMutPtr<T> {
pub const fn new(ptr: *mut T) -> Self {
Self {
ptr,
_marker: PhantomData,
}
}
pub unsafe fn get(&self, index: usize) -> *mut T {
self.ptr.add(index)
}
pub fn as_ptr(&self) -> *mut T {
self.ptr
}
}
unsafe impl<T> Send for SyncMutPtr<T> {}
unsafe impl<T> Sync for SyncMutPtr<T> {}
#[derive(Clone, Copy, Debug)]
pub struct StaticScheduler;
#[derive(Clone, Copy, Debug)]
pub struct DynamicScheduler;
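/// Strategy for mapping tasks onto pool threads. `StaticScheduler` pre-splits
/// the index range via `for_n`, while `DynamicScheduler` hands tasks out on
/// demand via `for_n_dynamic`, which tends to help when per-task cost varies.
///
/// An illustrative sketch (marked `ignore`: assumes the crate and its types
/// are in scope):
///
/// ```ignore
/// let mut pool = spawn(4);
/// let data: Vec<usize> = (0..1_000).collect();
/// (&data[..])
///     .into_par_iter()
///     .with_schedule(&mut pool, DynamicScheduler)
///     .for_each(|value| {
///         let _ = *value;
///     });
/// ```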
pub trait ParallelSchedule: Copy {
fn dispatch<F>(&self, pool: &mut ThreadPool, tasks: usize, function: F)
where
F: Fn(Prong) + Sync;
fn dispatch_slices<F>(&self, pool: &mut ThreadPool, tasks: usize, function: F)
where
F: Fn(Prong, usize) + Sync,
{
self.dispatch(pool, tasks, move |prong| {
function(prong, 1);
});
}
}
impl ParallelSchedule for StaticScheduler {
fn dispatch<F>(&self, pool: &mut ThreadPool, tasks: usize, function: F)
where
F: Fn(Prong) + Sync,
{
if tasks == 0 {
return;
}
let function_ptr = SyncConstPtr::new(&function as *const F);
let _operation = pool.for_n(tasks, move |prong| {
let func = unsafe { &*function_ptr.as_ptr() };
func(prong);
});
}
fn dispatch_slices<F>(&self, pool: &mut ThreadPool, tasks: usize, function: F)
where
F: Fn(Prong, usize) + Sync,
{
if tasks == 0 {
return;
}
let function_ptr = SyncConstPtr::new(&function as *const F);
let _operation = pool.for_slices(tasks, move |prong, count| {
let func = unsafe { &*function_ptr.as_ptr() };
func(prong, count);
});
}
}
impl ParallelSchedule for DynamicScheduler {
fn dispatch<F>(&self, pool: &mut ThreadPool, tasks: usize, function: F)
where
F: Fn(Prong) + Sync,
{
if tasks == 0 {
return;
}
let function_ptr = SyncConstPtr::new(&function as *const F);
let _operation = pool.for_n_dynamic(tasks, move |prong| {
let func = unsafe { &*function_ptr.as_ptr() };
func(prong);
});
}
}
pub trait ParallelIterator: Sized {
type Item;
fn len(&self) -> usize;
fn is_empty(&self) -> bool {
self.len() == 0
}
fn drive<S, F>(self, pool: &mut ThreadPool, schedule: S, consumer: &F)
where
S: ParallelSchedule,
F: Fn(Self::Item, Prong) + Sync;
fn drive_static<F>(self, pool: &mut ThreadPool, consumer: F)
where
F: Fn(Self::Item, Prong) + Sync,
{
self.drive(pool, StaticScheduler, &consumer);
}
fn drive_dynamic<F>(self, pool: &mut ThreadPool, consumer: F)
where
F: Fn(Self::Item, Prong) + Sync,
{
self.drive(pool, DynamicScheduler, &consumer);
}
fn map<M, U>(self, mapper: M) -> Map<Self, M>
where
M: Fn(Self::Item) -> U + Sync,
{
Map { base: self, mapper }
}
fn filter<P>(self, predicate: P) -> Filter<Self, P>
where
P: Fn(&Self::Item) -> bool + Sync,
{
Filter {
base: self,
predicate,
}
}
}
pub trait ParallelIteratorExt: ParallelIterator + Sized {
fn with_pool<'pool>(
self,
pool: &'pool mut ThreadPool,
) -> ParallelRunner<'pool, Self, StaticScheduler> {
ParallelRunner {
pool,
iterator: self,
schedule: StaticScheduler,
}
}
fn with_schedule<'pool, S>(
self,
pool: &'pool mut ThreadPool,
schedule: S,
) -> ParallelRunner<'pool, Self, S>
where
S: ParallelSchedule,
{
ParallelRunner {
pool,
iterator: self,
schedule,
}
}
}
impl<I: ParallelIterator> ParallelIteratorExt for I {}
pub struct ParallelRunner<'pool, I, S> {
pool: &'pool mut ThreadPool,
iterator: I,
schedule: S,
}
impl<'pool, I, S> ParallelRunner<'pool, I, S>
where
I: ParallelIterator,
S: ParallelSchedule,
{
pub fn for_each<F>(self, function: F)
where
F: Fn(I::Item) + Sync,
{
let ParallelRunner {
pool,
iterator,
schedule,
} = self;
let function_ptr = SyncConstPtr::new(&function as *const F);
iterator.drive(pool, schedule, &move |item, _| {
let func = unsafe { &*function_ptr.as_ptr() };
func(item);
});
}
pub fn for_each_with_prong<F>(self, function: F)
where
F: Fn(I::Item, Prong) + Sync,
{
let ParallelRunner {
pool,
iterator,
schedule,
} = self;
let function_ptr = SyncConstPtr::new(&function as *const F);
iterator.drive(pool, schedule, &move |item, prong| {
let func = unsafe { &*function_ptr.as_ptr() };
func(item, prong);
});
}
pub fn fold_with_scratch<T, F>(self, scratch: &mut [T], fold: F)
where
T: Send,
F: Fn(&mut T, I::Item, Prong) + Sync,
{
let ParallelRunner {
pool,
iterator,
schedule,
} = self;
fold_with_scratch(pool, iterator, schedule, scratch, fold);
}
pub fn with_schedule<S2>(self, schedule: S2) -> ParallelRunner<'pool, I, S2>
where
S2: ParallelSchedule,
{
let ParallelRunner { pool, iterator, .. } = self;
ParallelRunner {
pool,
iterator,
schedule,
}
}
}
pub trait IntoParallelIterator {
type Item;
type Iter: ParallelIterator<Item = Self::Item>;
fn into_par_iter(self) -> Self::Iter;
}
impl<'a, T> IntoParallelIterator for &'a [T]
where
T: Sync,
{
type Item = &'a T;
type Iter = ParallelSlice<'a, T>;
fn into_par_iter(self) -> Self::Iter {
ParallelSlice::new(self)
}
}
impl<'a, T> IntoParallelIterator for &'a mut [T]
where
T: Send,
{
type Item = &'a mut T;
type Iter = ParallelSliceMut<'a, T>;
fn into_par_iter(self) -> Self::Iter {
ParallelSliceMut::new(self)
}
}
impl IntoParallelIterator for core::ops::Range<usize> {
type Item = usize;
type Iter = ParallelRange;
fn into_par_iter(self) -> Self::Iter {
ParallelRange::new(self)
}
}
impl<'a, T> IntoParallelIterator for &'a RoundRobinVec<T>
where
T: Sync,
{
type Item = &'a T;
type Iter = ParallelRoundRobin<'a, T>;
fn into_par_iter(self) -> Self::Iter {
ParallelRoundRobin::new(self)
}
}
impl<'a, T> IntoParallelIterator for &'a mut RoundRobinVec<T>
where
T: Send,
{
type Item = &'a mut T;
type Iter = ParallelRoundRobinMut<'a, T>;
fn into_par_iter(self) -> Self::Iter {
ParallelRoundRobinMut::new(self)
}
}
pub struct Map<I, M> {
base: I,
mapper: M,
}
impl<I, M, U> ParallelIterator for Map<I, M>
where
I: ParallelIterator,
M: Fn(I::Item) -> U + Sync,
{
type Item = U;
fn len(&self) -> usize {
self.base.len()
}
fn drive<S, F>(self, pool: &mut ThreadPool, schedule: S, consumer: &F)
where
S: ParallelSchedule,
F: Fn(Self::Item, Prong) + Sync,
{
let Map { base, mapper } = self;
let mapper_ptr = SyncConstPtr::new(&mapper as *const M);
let consumer_ptr = SyncConstPtr::new(consumer as *const F);
let mapped = move |item: I::Item, prong: Prong| {
let mp = unsafe { &*mapper_ptr.as_ptr() };
let next = mp(item);
let consumer_ref = unsafe { &*consumer_ptr.as_ptr() };
consumer_ref(next, prong);
};
base.drive(pool, schedule, &mapped);
}
}
pub struct Filter<I, P> {
base: I,
predicate: P,
}
impl<I, P> ParallelIterator for Filter<I, P>
where
I: ParallelIterator,
P: Fn(&I::Item) -> bool + Sync,
{
type Item = I::Item;
fn len(&self) -> usize {
self.base.len()
}
fn drive<S, F>(self, pool: &mut ThreadPool, schedule: S, consumer: &F)
where
S: ParallelSchedule,
F: Fn(Self::Item, Prong) + Sync,
{
let Filter { base, predicate } = self;
let predicate_ptr = SyncConstPtr::new(&predicate as *const P);
let consumer_ptr = SyncConstPtr::new(consumer as *const F);
let filtered = move |item: I::Item, prong: Prong| {
let pred = unsafe { &*predicate_ptr.as_ptr() };
if pred(&item) {
let consumer_ref = unsafe { &*consumer_ptr.as_ptr() };
consumer_ref(item, prong);
}
};
base.drive(pool, schedule, &filtered);
}
}
#[derive(Clone, Copy)]
pub struct ParallelSlice<'a, T> {
data: &'a [T],
}
impl<'a, T> ParallelSlice<'a, T> {
pub fn new(data: &'a [T]) -> Self {
Self { data }
}
pub fn for_each_static<F>(self, pool: &mut ThreadPool, function: F)
where
T: Sync,
F: Fn(&'a T, Prong) + Sync,
{
self.drive_static(pool, function);
}
pub fn for_each_dynamic<F>(self, pool: &mut ThreadPool, function: F)
where
T: Sync,
F: Fn(&'a T, Prong) + Sync,
{
self.drive_dynamic(pool, function);
}
pub fn zip<'b, U>(self, other: ParallelSlice<'b, U>) -> ParallelSliceZip<'a, 'b, T, U> {
ParallelSliceZip {
left: self,
right: other,
}
}
}
impl<'a, T> ParallelIterator for ParallelSlice<'a, T>
where
T: Sync,
{
type Item = &'a T;
fn len(&self) -> usize {
self.data.len()
}
fn drive<S, F>(self, pool: &mut ThreadPool, schedule: S, consumer: &F)
where
S: ParallelSchedule,
F: Fn(Self::Item, Prong) + Sync,
{
if self.data.is_empty() {
return;
}
let slice = self.data;
let consumer_ptr = SyncConstPtr::new(consumer as *const F);
schedule.dispatch(pool, slice.len(), move |prong| {
let item = unsafe { slice.get_unchecked(prong.task_index) };
let func = unsafe { &*consumer_ptr.as_ptr() };
func(item, prong);
});
}
}
pub struct ParallelSliceZip<'a, 'b, T, U> {
left: ParallelSlice<'a, T>,
right: ParallelSlice<'b, U>,
}
impl<'a, 'b, T, U> ParallelIterator for ParallelSliceZip<'a, 'b, T, U>
where
T: Sync,
U: Sync,
{
type Item = (&'a T, &'b U);
fn len(&self) -> usize {
let len = self.left.data.len();
debug_assert_eq!(len, self.right.data.len());
len
}
fn drive<S, F>(self, pool: &mut ThreadPool, schedule: S, consumer: &F)
where
S: ParallelSchedule,
F: Fn(Self::Item, Prong) + Sync,
{
let len = self.left.data.len();
assert_eq!(len, self.right.data.len(), "zip requires equal lengths");
if len == 0 {
return;
}
let left = self.left.data;
let right = self.right.data;
let consumer_ptr = SyncConstPtr::new(consumer as *const F);
schedule.dispatch(pool, len, move |prong| {
let lhs = unsafe { left.get_unchecked(prong.task_index) };
let rhs = unsafe { right.get_unchecked(prong.task_index) };
let func = unsafe { &*consumer_ptr.as_ptr() };
func((lhs, rhs), prong);
});
}
}
pub struct ParallelSliceMut<'a, T> {
ptr: SyncMutPtr<T>,
len: usize,
_marker: PhantomData<&'a mut [T]>,
}
impl<'a, T> ParallelSliceMut<'a, T> {
pub fn new(data: &'a mut [T]) -> Self {
Self {
ptr: SyncMutPtr::new(data.as_mut_ptr()),
len: data.len(),
_marker: PhantomData,
}
}
pub fn for_each_static<F>(self, pool: &mut ThreadPool, function: F)
where
T: Send,
F: Fn(&'a mut T, Prong) + Sync,
{
self.drive_static(pool, function);
}
pub fn for_each_dynamic<F>(self, pool: &mut ThreadPool, function: F)
where
T: Send,
F: Fn(&'a mut T, Prong) + Sync,
{
self.drive_dynamic(pool, function);
}
}
impl<'a, T> ParallelIterator for ParallelSliceMut<'a, T>
where
T: Send,
{
type Item = &'a mut T;
fn len(&self) -> usize {
self.len
}
fn drive<S, F>(self, pool: &mut ThreadPool, schedule: S, consumer: &F)
where
S: ParallelSchedule,
F: Fn(Self::Item, Prong) + Sync,
{
if self.len == 0 {
return;
}
let ptr = self.ptr;
let consumer_ptr = SyncConstPtr::new(consumer as *const F);
schedule.dispatch(pool, self.len, move |prong| {
let raw = unsafe { ptr.get(prong.task_index) };
let item = unsafe { &mut *raw };
let func = unsafe { &*consumer_ptr.as_ptr() };
func(item, prong);
});
}
}
#[derive(Clone)]
pub struct ParallelRange {
range: core::ops::Range<usize>,
}
impl ParallelRange {
pub fn new(range: core::ops::Range<usize>) -> Self {
Self { range }
}
}
impl ParallelIterator for ParallelRange {
type Item = usize;
fn len(&self) -> usize {
self.range.len()
}
fn drive<S, F>(self, pool: &mut ThreadPool, schedule: S, consumer: &F)
where
S: ParallelSchedule,
F: Fn(Self::Item, Prong) + Sync,
{
let len = self.range.len();
if len == 0 {
return;
}
let start = self.range.start;
let consumer_ptr = SyncConstPtr::new(consumer as *const F);
schedule.dispatch(pool, len, move |mut prong| {
let index = start + prong.task_index;
prong.task_index = index;
let func = unsafe { &*consumer_ptr.as_ptr() };
func(index, prong);
});
}
}
pub struct ParallelExactIter<T, I> {
len: usize,
indexer: I,
_marker: PhantomData<T>,
}
impl<T, I> ParallelExactIter<T, I>
where
I: Fn(usize) -> T + Sync,
{
pub fn new(len: usize, indexer: I) -> Self {
Self {
len,
indexer,
_marker: PhantomData,
}
}
}
impl<T, I> ParallelIterator for ParallelExactIter<T, I>
where
T: Send,
I: Fn(usize) -> T + Sync,
{
type Item = T;
fn len(&self) -> usize {
self.len
}
fn drive<S, F>(self, pool: &mut ThreadPool, schedule: S, consumer: &F)
where
S: ParallelSchedule,
F: Fn(Self::Item, Prong) + Sync,
{
let ParallelExactIter { len, indexer, .. } = self;
if len == 0 {
return;
}
let indexer_ptr = SyncConstPtr::new(&indexer as *const I);
let consumer_ptr = SyncConstPtr::new(consumer as *const F);
schedule.dispatch_slices(pool, len, move |mut prong, count| {
let mut current = prong.task_index;
let idx_fn = unsafe { &*indexer_ptr.as_ptr() };
let consumer_ref = unsafe { &*consumer_ptr.as_ptr() };
for _ in 0..count {
prong.task_index = current;
let value = idx_fn(current);
consumer_ref(value, prong);
current += 1;
}
});
}
}
pub struct ParallelRoundRobin<'a, T> {
colocations_ptr: SyncConstPtr<PinnedVec<T>>,
colocations_len: usize,
total_len: usize,
_marker: PhantomData<&'a [T]>,
}
impl<'a, T> ParallelRoundRobin<'a, T> {
fn new(vec: &'a RoundRobinVec<T>) -> Self {
let slice = vec.colocations.as_slice();
Self {
colocations_ptr: SyncConstPtr::new(slice.as_ptr()),
colocations_len: slice.len(),
total_len: vec.total_length,
_marker: PhantomData,
}
}
}
impl<'a, T> ParallelIterator for ParallelRoundRobin<'a, T>
where
T: Sync,
{
type Item = &'a T;
fn len(&self) -> usize {
self.total_len
}
fn drive<S, F>(self, pool: &mut ThreadPool, schedule: S, consumer: &F)
where
S: ParallelSchedule,
F: Fn(Self::Item, Prong) + Sync,
{
if self.total_len == 0 || self.colocations_len == 0 {
return;
}
let colocations_ptr = self.colocations_ptr;
let colocations_len = self.colocations_len;
let consumer_ptr = SyncConstPtr::new(consumer as *const F);
schedule.dispatch(pool, self.total_len, move |prong| {
let index = prong.task_index;
let colocation_index = index % colocations_len;
let local_index = index / colocations_len;
let base = colocations_ptr.as_ptr();
let colocation = unsafe { &*base.add(colocation_index) };
let slice = colocation.as_slice();
let item = unsafe { slice.get_unchecked(local_index) };
let func = unsafe { &*consumer_ptr.as_ptr() };
func(item, prong);
});
}
}
pub struct ParallelRoundRobinMut<'a, T> {
colocations_ptr: SyncConstPtr<PinnedVec<T>>,
colocations_len: usize,
total_len: usize,
_marker: PhantomData<&'a mut [T]>,
}
impl<'a, T> ParallelRoundRobinMut<'a, T> {
fn new(vec: &'a mut RoundRobinVec<T>) -> Self {
let slice = vec.colocations.as_slice();
Self {
colocations_ptr: SyncConstPtr::new(slice.as_ptr()),
colocations_len: slice.len(),
total_len: vec.total_length,
_marker: PhantomData,
}
}
}
impl<'a, T> ParallelIterator for ParallelRoundRobinMut<'a, T>
where
T: Send,
{
type Item = &'a mut T;
fn len(&self) -> usize {
self.total_len
}
fn drive<S, F>(self, pool: &mut ThreadPool, schedule: S, consumer: &F)
where
S: ParallelSchedule,
F: Fn(Self::Item, Prong) + Sync,
{
if self.total_len == 0 || self.colocations_len == 0 {
return;
}
let colocations_ptr = self.colocations_ptr;
let colocations_len = self.colocations_len;
let consumer_ptr = SyncConstPtr::new(consumer as *const F);
schedule.dispatch(pool, self.total_len, move |prong| {
let index = prong.task_index;
let colocation_index = index % colocations_len;
let local_index = index / colocations_len;
let base = colocations_ptr.as_ptr();
let colocation = unsafe { &*base.add(colocation_index) };
let base_ptr = colocation.sync_ptr();
let raw = unsafe { base_ptr.get(local_index) };
let item = unsafe { &mut *raw };
let func = unsafe { &*consumer_ptr.as_ptr() };
func(item, prong);
});
}
}
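/// Folds every item into a per-thread scratch slot without atomics: slot
/// `prong.thread_index` is only ever written by that thread, and the caller
/// combines the slots afterwards.
///
/// An illustrative parallel-sum sketch (marked `ignore`: assumes the crate is
/// in scope):
///
/// ```ignore
/// let mut pool = spawn(4);
/// let data: Vec<usize> = (0..1_024).collect();
/// let mut scratch = vec![0usize; pool.threads()];
/// fold_with_scratch(
///     &mut pool,
///     (&data[..]).into_par_iter(),
///     StaticScheduler,
///     scratch.as_mut_slice(),
///     |slot, value, _prong| *slot += *value,
/// );
/// let total: usize = scratch.iter().sum();
/// assert_eq!(total, data.iter().sum::<usize>());
/// ```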
pub fn fold_with_scratch<I, S, T, F>(
pool: &mut ThreadPool,
iterator: I,
schedule: S,
scratch: &mut [T],
fold: F,
) where
I: ParallelIterator,
S: ParallelSchedule,
T: Send,
F: Fn(&mut T, I::Item, Prong) + Sync,
{
let scratch_len = scratch.len();
assert!(
scratch_len >= pool.threads(),
"scratch space must cover all threads"
);
let scratch_ptr = SyncMutPtr::new(scratch.as_mut_ptr());
iterator.drive(pool, schedule, &move |item, prong| {
debug_assert!(prong.thread_index < scratch_len);
let slot = unsafe { &mut *scratch_ptr.get(prong.thread_index) };
fold(slot, item, prong);
});
}
pub mod prelude {
pub use super::{
DynamicScheduler, IntoParallelIterator, ParallelIterator, ParallelIteratorExt,
ParallelRunner, StaticScheduler,
};
}
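/// A broadcast to every pool thread. `broadcast` submits the callback, `join`
/// waits for completion, and `Drop` performs whichever of the two the caller
/// skipped, so letting the value fall out of scope runs the closure on all
/// threads exactly once.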
pub struct ForThreadsOperation<'a, F>
where
F: Fn(usize, usize) + Sync,
{
pool: &'a mut ThreadPool,
function: F,
did_broadcast: bool,
did_join: bool,
}
impl<'a, F> ForThreadsOperation<'a, F>
where
F: Fn(usize, usize) + Sync,
{
pub(crate) fn new(pool: &'a mut ThreadPool, function: F) -> Self {
Self {
pool,
function,
did_broadcast: false,
did_join: false,
}
}
pub fn broadcast(&mut self) {
if self.did_broadcast {
return;
}
extern "C" fn trampoline<F>(ctx: *mut c_void, thread_index: usize, colocation_index: usize)
where
F: Fn(usize, usize) + Sync,
{
let f = unsafe { &*(ctx as *const F) };
f(thread_index, colocation_index);
}
unsafe {
let ctx = &self.function as *const F as *mut c_void;
fu_pool_unsafe_for_threads(self.pool.inner, trampoline::<F>, ctx);
self.did_broadcast = true;
}
}
pub fn join(&mut self) {
if !self.did_broadcast {
self.broadcast();
}
if self.did_join {
return;
}
unsafe {
fu_pool_unsafe_join(self.pool.inner);
self.did_join = true;
}
}
}
impl<'a, F> Drop for ForThreadsOperation<'a, F>
where
F: Fn(usize, usize) + Sync,
{
fn drop(&mut self) {
self.join();
}
}
pub struct ForNOperation<'a, F>
where
F: Fn(Prong) + Sync,
{
pool: &'a mut ThreadPool,
n: usize,
function: F,
}
impl<'a, F> Drop for ForNOperation<'a, F>
where
F: Fn(Prong) + Sync,
{
fn drop(&mut self) {
extern "C" fn trampoline<F>(
ctx: *mut c_void,
task_index: usize,
thread_index: usize,
colocation_index: usize,
) where
F: Fn(Prong) + Sync,
{
let f = unsafe { &*(ctx as *const F) };
f(Prong {
task_index,
thread_index,
colocation_index,
});
}
unsafe {
let ctx = &self.function as *const F as *mut c_void;
fu_pool_for_n(self.pool.inner, self.n, trampoline::<F>, ctx);
}
}
}
pub struct ForNDynamicOperation<'a, F>
where
F: Fn(Prong) + Sync,
{
pool: &'a mut ThreadPool,
n: usize,
function: F,
}
impl<'a, F> Drop for ForNDynamicOperation<'a, F>
where
F: Fn(Prong) + Sync,
{
fn drop(&mut self) {
extern "C" fn trampoline<F>(
ctx: *mut c_void,
task_index: usize,
thread_index: usize,
colocation_index: usize,
) where
F: Fn(Prong) + Sync,
{
let f = unsafe { &*(ctx as *const F) };
f(Prong {
task_index,
thread_index,
colocation_index,
});
}
unsafe {
let ctx = &self.function as *const F as *mut c_void;
fu_pool_for_n_dynamic(self.pool.inner, self.n, trampoline::<F>, ctx);
}
}
}
pub struct ForSlicesOperation<'a, F>
where
F: Fn(Prong, usize) + Sync,
{
pool: &'a mut ThreadPool,
n: usize,
function: F,
}
impl<'a, F> Drop for ForSlicesOperation<'a, F>
where
F: Fn(Prong, usize) + Sync,
{
fn drop(&mut self) {
extern "C" fn trampoline<F>(
ctx: *mut c_void,
first_index: usize,
count: usize,
thread_index: usize,
colocation_index: usize,
) where
F: Fn(Prong, usize) + Sync,
{
let f = unsafe { &*(ctx as *const F) };
f(
Prong {
task_index: first_index,
thread_index,
colocation_index,
},
count,
);
}
unsafe {
let ctx = &self.function as *const F as *mut c_void;
fu_pool_for_slices(self.pool.inner, self.n, trampoline::<F>, ctx);
}
}
}
pub fn spawn(threads: usize) -> ThreadPool {
ThreadPool::try_spawn(threads).expect("Failed to spawn ThreadPool")
}
pub fn named_spawn(name: &str, threads: usize) -> ThreadPool {
ThreadPool::try_named_spawn(name, threads).expect("Failed to spawn named ThreadPool")
}
pub fn for_n<F>(pool: &mut ThreadPool, n: usize, function: F)
where
F: Fn(Prong) + Sync,
{
let _operation = pool.for_n(n, function);
}
pub fn for_n_dynamic<F>(pool: &mut ThreadPool, n: usize, function: F)
where
F: Fn(Prong) + Sync,
{
let _operation = pool.for_n_dynamic(n, function);
}
pub fn for_slices<F>(pool: &mut ThreadPool, n: usize, function: F)
where
F: Fn(Prong, usize) + Sync,
{
let _operation = pool.for_slices(n, function);
}
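/// Mutates every element of `data` in parallel; each call receives the
/// element matching `prong.task_index`.
///
/// ```ignore
/// // Illustrative sketch, assuming the crate is in scope.
/// let mut pool = spawn(4);
/// let mut data = vec![0u64; 100];
/// for_each_prong_mut(&mut pool, &mut data, |x, prong| *x = prong.task_index as u64);
/// ```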
pub fn for_each_prong_mut<T, F>(pool: &mut ThreadPool, data: &mut [T], function: F)
where
T: Send + Sync,
F: Fn(&mut T, Prong) + Sync + Send,
{
let ptr = SyncMutPtr::new(data.as_mut_ptr());
let n = data.len();
let _operation = pool.for_n(n, move |prong| {
let item = unsafe { &mut *ptr.get(prong.task_index) };
function(item, prong);
});
}
pub fn for_each_prong_mut_dynamic<T, F>(pool: &mut ThreadPool, data: &mut [T], function: F)
where
T: Send + Sync,
F: Fn(&mut T, Prong) + Sync + Send,
{
let ptr = SyncMutPtr::new(data.as_mut_ptr());
let n = data.len();
let _operation = pool.for_n_dynamic(n, move |prong| {
let item = unsafe { &mut *ptr.get(prong.task_index) };
function(item, prong);
});
}
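/// Splits `tasks_count` items into `threads_count` contiguous ranges whose
/// lengths differ by at most one: the first `remainder` threads receive
/// `quotient + 1` items each, the rest receive `quotient`.
///
/// ```ignore
/// // 10 tasks over 3 threads -> ranges of 4, 3, and 3.
/// let split = IndexedSplit::new(10, 3);
/// assert_eq!(split.get(0), 0..4);
/// assert_eq!(split.get(1), 4..7);
/// assert_eq!(split.get(2), 7..10);
/// ```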
#[derive(Debug, Clone)]
pub struct IndexedSplit {
quotient: usize,
remainder: usize,
}
impl IndexedSplit {
pub fn new(tasks_count: usize, threads_count: usize) -> Self {
assert!(threads_count > 0, "Threads count must be greater than zero");
Self {
quotient: tasks_count / threads_count,
remainder: tasks_count % threads_count,
}
}
pub fn get(&self, thread_index: usize) -> core::ops::Range<usize> {
let begin = self.quotient * thread_index + thread_index.min(self.remainder);
let count = self.quotient + if thread_index < self.remainder { 1 } else { 0 };
begin..(begin + count)
}
}
#[cfg(test)]
#[cfg(feature = "std")]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::vec;
use std::vec::Vec;
#[inline]
fn hw_threads() -> usize {
count_logical_cores().max(1)
}
#[test]
fn capabilities() {
let caps = capabilities_string();
std::println!("Capabilities: {caps:?}");
assert!(caps.is_some());
}
#[test]
fn system_info() {
let cores = count_logical_cores();
let numa = count_numa_nodes();
let colocations = count_colocations();
let qos = count_quality_levels();
std::println!("Cores: {cores}, NUMA: {numa}, Colocations: {colocations}, QoS: {qos}");
assert!(cores > 0);
}
#[test]
fn spawn_and_basic_info() {
let pool = spawn(2);
assert_eq!(pool.threads(), 2);
assert!(pool.colocations() > 0);
}
#[test]
fn for_threads_dispatch() {
let count_threads = hw_threads();
let mut pool = spawn(count_threads);
let visited: Arc<Vec<AtomicBool>> =
Arc::new((0..count_threads).map(|_| AtomicBool::new(false)).collect());
let visited_ref = Arc::clone(&visited);
{
let _op = pool.for_threads(move |thread_index, _colocation| {
if thread_index < visited_ref.len() {
visited_ref[thread_index].store(true, Ordering::Relaxed);
}
});
}
for (i, flag) in visited.iter().enumerate() {
assert!(
flag.load(Ordering::Relaxed),
"thread {i} never reached the callback"
);
}
}
#[test]
fn for_n_static_scheduling() {
const EXPECTED_PARTS: usize = 1_000;
let mut pool = spawn(hw_threads());
let visited: Arc<Vec<AtomicBool>> = Arc::new(
(0..EXPECTED_PARTS)
.map(|_| AtomicBool::new(false))
.collect(),
);
let duplicate = Arc::new(AtomicBool::new(false));
let visited_ref = Arc::clone(&visited);
let duplicate_ref = Arc::clone(&duplicate);
for_n(&mut pool, EXPECTED_PARTS, move |prong| {
let task_index = prong.task_index;
if visited_ref[task_index].swap(true, Ordering::Relaxed) {
duplicate_ref.store(true, Ordering::Relaxed);
}
});
assert!(
!duplicate.load(Ordering::Relaxed),
"static scheduling produced duplicate task IDs"
);
for flag in visited.iter() {
assert!(flag.load(Ordering::Relaxed));
}
}
#[test]
fn for_n_dynamic_scheduling() {
const EXPECTED_PARTS: usize = 1_000;
let mut pool = spawn(hw_threads());
let visited: Arc<Vec<AtomicBool>> = Arc::new(
(0..EXPECTED_PARTS)
.map(|_| AtomicBool::new(false))
.collect(),
);
let duplicate = Arc::new(AtomicBool::new(false));
let visited_ref = Arc::clone(&visited);
let duplicate_ref = Arc::clone(&duplicate);
for_n_dynamic(&mut pool, EXPECTED_PARTS, move |prong| {
let task_index = prong.task_index;
if visited_ref[task_index].swap(true, Ordering::Relaxed) {
duplicate_ref.store(true, Ordering::Relaxed);
}
});
assert!(
!duplicate.load(Ordering::Relaxed),
"dynamic scheduling produced duplicate task IDs"
);
for flag in visited.iter() {
assert!(flag.load(Ordering::Relaxed));
}
}
#[test]
fn for_each_mut() {
const ELEMENTS: usize = 1000;
let mut pool = spawn(hw_threads());
let mut data = std::vec![0u64; ELEMENTS];
for_each_prong_mut(&mut pool, &mut data, |x, prong| {
*x = prong.task_index as u64 * 2;
});
for (i, &value) in data.iter().enumerate() {
assert_eq!(value, i as u64 * 2);
}
}
#[test]
fn closure_objects() {
let mut pool = spawn(hw_threads());
let counter = Arc::new(AtomicUsize::new(0));
let counter_ref = Arc::clone(&counter);
{
let _op = pool.for_n(1000, move |_prong| {
counter_ref.fetch_add(1, Ordering::Relaxed);
});
}
assert_eq!(counter.load(Ordering::Relaxed), 1000);
}
#[test]
fn explicit_broadcast_join() {
let mut pool = spawn(4);
let counter = Arc::new(AtomicUsize::new(0));
let counter_ref = Arc::clone(&counter);
let mut operation = pool.for_threads(move |_thread_index, _colocation| {
counter_ref.fetch_add(1, Ordering::Relaxed);
thread::sleep(Duration::from_millis(10));
});
operation.broadcast();
thread::sleep(Duration::from_millis(5));
operation.join();
assert_eq!(counter.load(Ordering::Relaxed), 4);
}
#[test]
fn join_without_explicit_broadcast() {
let mut pool = spawn(4);
let counter = Arc::new(AtomicUsize::new(0));
let counter_ref = Arc::clone(&counter);
let mut operation = pool.for_threads(move |_thread_index, _colocation| {
counter_ref.fetch_add(1, Ordering::Relaxed);
});
operation.join();
assert_eq!(counter.load(Ordering::Relaxed), 4);
}
#[test]
fn pinned_allocator_creation() {
let numa_count = count_numa_nodes();
assert!(numa_count > 0, "System should have at least one NUMA node");
let allocator = PinnedAllocator::new(0).expect("NUMA node 0 should be available");
assert_eq!(allocator.numa_node(), 0);
let invalid_allocator = PinnedAllocator::new(numa_count + 10);
assert!(
invalid_allocator.is_none(),
"Invalid NUMA node should return None"
);
}
#[test]
fn basic_allocation() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let allocation = allocator
.allocate(1024)
.expect("Failed to allocate 1024 bytes");
assert_eq!(allocation.allocated_bytes(), 1024);
assert_eq!(allocation.numa_node(), 0);
let slice = allocation.as_slice();
assert_eq!(slice.len(), 1024);
}
#[test]
fn allocate_zero_bytes() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let allocation = allocator.allocate(0);
assert!(
allocation.is_none(),
"Allocating 0 bytes should return None"
);
}
#[test]
fn allocate_at_least() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let allocation = allocator
.allocate_at_least(1000)
.expect("Failed to allocate at least 1000 bytes");
assert!(allocation.allocated_bytes() >= 1000);
assert_eq!(allocation.numa_node(), 0);
if allocation.bytes_per_page() > 0 {
assert!(allocation.bytes_per_page() >= 512);
}
}
#[test]
fn pinned_vec_creation() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let vec = PinnedVec::<i32>::new_in(allocator);
assert_eq!(vec.len(), 0);
assert_eq!(vec.capacity(), 0);
assert_eq!(vec.numa_node(), 0);
assert!(vec.is_empty());
}
#[test]
fn pinned_vec_with_capacity() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let vec = PinnedVec::<i32>::with_capacity_in(allocator, 10).expect("Failed to create vec");
assert_eq!(vec.len(), 0);
assert_eq!(vec.capacity(), 10);
assert_eq!(vec.numa_node(), 0);
assert!(vec.is_empty());
}
#[test]
fn pinned_vec_push_pop() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let mut vec = PinnedVec::<i32>::new_in(allocator);
vec.push(42).expect("Failed to push");
assert_eq!(vec.len(), 1);
assert!(!vec.is_empty());
assert_eq!(vec[0], 42);
vec.push(100).expect("Failed to push");
assert_eq!(vec.len(), 2);
assert_eq!(vec[1], 100);
assert_eq!(vec.pop(), Some(100));
assert_eq!(vec.len(), 1);
assert_eq!(vec.pop(), Some(42));
assert_eq!(vec.len(), 0);
assert_eq!(vec.pop(), None);
}
#[test]
fn pinned_vec_indexing() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let mut vec = PinnedVec::<i32>::new_in(allocator);
vec.push(10).expect("Failed to push");
vec.push(20).expect("Failed to push");
vec.push(30).expect("Failed to push");
assert_eq!(vec[0], 10);
assert_eq!(vec[1], 20);
assert_eq!(vec[2], 30);
vec[1] = 25;
assert_eq!(vec[1], 25);
}
#[test]
fn pinned_vec_clear() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let mut vec = PinnedVec::<i32>::new_in(allocator);
vec.push(1).expect("Failed to push");
vec.push(2).expect("Failed to push");
vec.push(3).expect("Failed to push");
assert_eq!(vec.len(), 3);
vec.clear();
assert_eq!(vec.len(), 0);
assert!(vec.is_empty());
}
#[test]
fn pinned_vec_insert_remove() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let mut vec = PinnedVec::<i32>::new_in(allocator);
vec.push(1).expect("Failed to push");
vec.push(3).expect("Failed to push");
vec.insert(1, 2).expect("Failed to insert");
assert_eq!(vec.len(), 3);
assert_eq!(vec[0], 1);
assert_eq!(vec[1], 2);
assert_eq!(vec[2], 3);
let removed = vec.remove(1);
assert_eq!(removed, 2);
assert_eq!(vec.len(), 2);
assert_eq!(vec[0], 1);
assert_eq!(vec[1], 3);
}
#[test]
fn pinned_vec_reserve() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let mut vec = PinnedVec::<i32>::new_in(allocator);
assert_eq!(vec.capacity(), 0);
vec.reserve(10).expect("Failed to reserve");
assert!(vec.capacity() >= 10);
assert_eq!(vec.len(), 0);
for i in 0..10 {
vec.push(i).expect("Failed to push");
}
assert_eq!(vec.len(), 10);
}
#[test]
fn pinned_vec_extend_from_slice() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let mut vec = PinnedVec::<i32>::new_in(allocator);
let data = [1, 2, 3, 4, 5];
vec.extend_from_slice(&data).expect("Failed to extend");
assert_eq!(vec.len(), 5);
for (i, &value) in data.iter().enumerate() {
assert_eq!(vec[i], value);
}
}
#[test]
fn parallel_slice_static_for_each() {
let mut pool = spawn(hw_threads());
let data: Vec<usize> = (0..512).collect();
let total = AtomicUsize::new(0);
(&data[..])
.into_par_iter()
.with_pool(&mut pool)
.for_each(|value| {
total.fetch_add(*value, Ordering::Relaxed);
});
assert_eq!(total.load(Ordering::Relaxed), data.iter().sum());
}
#[test]
fn parallel_slice_mut_dynamic_for_each() {
let mut pool = spawn(hw_threads());
let mut data: Vec<usize> = (0..256).collect();
(&mut data[..])
.into_par_iter()
.with_schedule(&mut pool, DynamicScheduler)
.for_each(|value| {
*value *= 2;
});
for (i, v) in data.iter().enumerate() {
assert_eq!(*v, i * 2);
}
}
#[test]
fn parallel_slice_zip_sum() {
let mut pool = spawn(hw_threads());
let a: Vec<usize> = (0..128).collect();
let b: Vec<usize> = (0..128).rev().collect();
let sums: Arc<Vec<AtomicUsize>> =
Arc::new((0..hw_threads()).map(|_| AtomicUsize::new(0)).collect());
let shared = Arc::clone(&sums);
(&a[..])
.into_par_iter()
.zip((&b[..]).into_par_iter())
.with_pool(&mut pool)
.for_each_with_prong(|(lhs, rhs), prong| {
shared[prong.thread_index % shared.len()].fetch_add(lhs + rhs, Ordering::Relaxed);
});
let total: usize = sums.iter().map(|v| v.load(Ordering::Relaxed)).sum();
let expected: usize = a.iter().zip(b.iter()).map(|(x, y)| x + y).sum();
assert_eq!(total, expected);
}
#[test]
fn parallel_exact_iter_dispatch() {
let mut pool = spawn(hw_threads());
let mut values = vec![0usize; 256];
let ptr = SyncMutPtr::new(values.as_mut_ptr());
(0..values.len())
.into_par_iter()
.with_pool(&mut pool)
.for_each_with_prong(|index, prong| {
let slot = unsafe { &mut *ptr.get(prong.task_index) };
*slot = index * index;
});
for (idx, val) in values.iter().enumerate() {
assert_eq!(*val, idx * idx);
}
}
#[test]
fn round_robin_parallel_mut() {
let mut pool = spawn(hw_threads());
let mut rr_vec =
RoundRobinVec::<usize>::with_capacity_per_colocation(8).expect("round robin vec");
for value in 0..32 {
rr_vec.push(value).expect("push");
}
rr_vec
.par_iter_mut()
.with_pool(&mut pool)
.for_each(|value| {
*value += 1;
});
for index in 0..rr_vec.len() {
assert_eq!(rr_vec.get(index), Some(&(index + 1)));
}
}
#[test]
fn scratch_reduction_collects_sum() {
let mut pool = spawn(hw_threads());
let data: Vec<usize> = (0..1024).collect();
let mut scratch = vec![0usize; pool.threads()];
(&data[..])
.into_par_iter()
.with_pool(&mut pool)
.fold_with_scratch(scratch.as_mut_slice(), |slot, value, _| {
*slot += *value;
});
let total: usize = scratch.iter().sum();
let expected: usize = data.iter().sum();
assert_eq!(total, expected);
}
#[test]
fn pinned_vec_iterators() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let mut vec = PinnedVec::<i32>::new_in(allocator);
for i in 0..5 {
vec.push(i).expect("Failed to push");
}
let collected: Vec<i32> = vec.iter().copied().collect();
let expected = Vec::from([0, 1, 2, 3, 4]);
assert_eq!(collected, expected);
for value in vec.iter_mut() {
*value *= 2;
}
assert_eq!(vec[0], 0);
assert_eq!(vec[1], 2);
assert_eq!(vec[2], 4);
assert_eq!(vec[3], 6);
assert_eq!(vec[4], 8);
}
#[test]
fn pinned_vec_slices() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let mut vec = PinnedVec::<i32>::new_in(allocator);
for i in 0..5 {
vec.push(i).expect("Failed to push");
}
let slice = vec.as_slice();
assert_eq!(slice.len(), 5);
assert_eq!(slice[2], 2);
let mut_slice = vec.as_mut_slice();
mut_slice[2] = 99;
assert_eq!(vec[2], 99);
}
#[test]
fn pinned_vec_growth() {
let allocator = PinnedAllocator::new(0).expect("Failed to create alloc");
let mut vec = PinnedVec::<i32>::new_in(allocator);
for i in 0..100 {
vec.push(i).expect("Failed to push");
}
assert_eq!(vec.len(), 100);
for i in 0..100 {
assert_eq!(vec[i], i as i32);
}
}
#[test]
fn pinned_vec_invalid_numa_node() {
let numa_count = count_numa_nodes();
let allocator = PinnedAllocator::new(numa_count + 1);
assert!(allocator.is_none());
}
#[test]
fn sync_const_ptr() {
let data = Vec::from([1, 2, 3, 4, 5]);
let sync_ptr = SyncConstPtr::new(data.as_ptr());
unsafe {
assert_eq!(*sync_ptr.get(0), 1);
assert_eq!(*sync_ptr.get(2), 3);
assert_eq!(*sync_ptr.get(4), 5);
}
assert_eq!(sync_ptr.as_ptr(), data.as_ptr());
}
#[test]
fn sync_const_ptr_send_sync() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
assert_send::<SyncConstPtr<i32>>();
assert_sync::<SyncConstPtr<i32>>();
}
#[test]
fn pinned_vec_send_sync() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
assert_send::<PinnedVec<i32>>();
assert_sync::<PinnedVec<i32>>();
}
#[test]
fn indexed_split() {
let split = IndexedSplit::new(10, 3);
assert_eq!(split.get(0), 0..4);
assert_eq!(split.get(1), 4..7);
assert_eq!(split.get(2), 7..10);
let split = IndexedSplit::new(12, 3);
assert_eq!(split.get(0), 0..4);
assert_eq!(split.get(1), 4..8);
assert_eq!(split.get(2), 8..12);
let split = IndexedSplit::new(0, 2);
assert_eq!(split.get(0), 0..0);
assert_eq!(split.get(1), 0..0);
let split = IndexedSplit::new(1, 2);
assert_eq!(split.get(0), 0..1);
assert_eq!(split.get(1), 1..1);
}
#[test]
#[should_panic(expected = "Threads count must be greater than zero")]
fn indexed_split_zero_threads() {
IndexedSplit::new(10, 0);
}
}