#[macro_use]
pub mod macros;
pub mod builder;
use crate::fixed::traits::Storable;
use crate::fixed::{BitWidth, Error, FixedVec};
use mem_dbg::{DbgFlags, MemDbgImpl, MemSize, SizeFlags};
use num_traits::{Bounded, ToPrimitive, WrappingAdd, WrappingSub};
use parking_lot::Mutex;
use std::fmt;
use std::marker::PhantomData;
use std::ops::{BitAnd, BitOr, BitXor, Deref, DerefMut};
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(feature = "parallel")]
use rayon::prelude::*;
// NOTE(review): the U/S prefixes presumably distinguish unsigned vs signed
// element types; both currently share one implementation because signedness
// is handled by the `Storable` word conversion, not the container — confirm.
pub type UAtomicFixedVec<T> = AtomicFixedVec<T>;
pub type SAtomicFixedVec<T> = AtomicFixedVec<T>;
// Upper bound on the striped-lock pool. Must remain a power of two: lock
// selection uses `word_index & (locks.len() - 1)` as a cheap modulo.
const MAX_LOCKS: usize = 1024;
// Lower bound on the pool size (also a power of two).
const MIN_LOCKS: usize = 2;
// Target number of backing words guarded by each stripe lock.
const WORDS_PER_LOCK: usize = 64;
#[cfg(feature = "parallel")]
/// Mutable-access proxy handed out by [`AtomicFixedVec::par_iter_mut`]: holds
/// a local snapshot of one element and writes it back to the vector when
/// dropped (see the `Drop` impl below).
pub struct AtomicMutProxy<'a, T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    // Owning vector; target of the write-back on drop.
    vec: &'a AtomicFixedVec<T>,
    // Element position inside `vec`.
    index: usize,
    // Local working copy, exposed through `Deref`/`DerefMut`.
    value: T,
}
#[cfg(feature = "parallel")]
// Debug output shows only the cached value; the back-reference to the owning
// vector is intentionally omitted.
impl<T> fmt::Debug for AtomicMutProxy<'_, T>
where
    T: Storable<u64> + Copy + ToPrimitive + fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("AtomicMutProxy");
        out.field("value", &self.value);
        out.finish()
    }
}
#[cfg(feature = "parallel")]
impl<'a, T> AtomicMutProxy<'a, T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    /// Creates a proxy by snapshotting the element at `index` with a relaxed
    /// load.
    fn new(vec: &'a AtomicFixedVec<T>, index: usize) -> Self {
        Self {
            vec,
            index,
            value: vec.load(index, Ordering::Relaxed),
        }
    }

    /// Consumes the proxy and returns the cached value *without* writing it
    /// back: `mem::forget` suppresses the `Drop` impl, which would otherwise
    /// store the value into the vector.
    pub fn into_inner(self) -> T {
        let value = self.value;
        std::mem::forget(self);
        value
    }
}
#[cfg(feature = "parallel")]
// Dereferences to the locally cached copy, not to the underlying atomic
// storage; reads through the proxy never touch the vector.
impl<T> Deref for AtomicMutProxy<'_, T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    type Target = T;

    fn deref(&self) -> &T {
        &self.value
    }
}
#[cfg(feature = "parallel")]
// Mutations go to the local copy; they reach the vector only when the proxy
// is dropped (or never, if `into_inner` is used).
impl<T> DerefMut for AtomicMutProxy<'_, T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    fn deref_mut(&mut self) -> &mut T {
        &mut self.value
    }
}
#[cfg(feature = "parallel")]
// Write-back on drop: commits the (possibly modified) cached value to the
// vector with a relaxed store. `into_inner` bypasses this via `mem::forget`.
impl<T> Drop for AtomicMutProxy<'_, T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    fn drop(&mut self) {
        self.vec.store(self.index, self.value, Ordering::Relaxed);
    }
}
/// A fixed bit-width vector of `T` with atomic element access: elements that
/// fit inside a single `u64` word are accessed lock-free via CAS, while
/// elements straddling a word boundary are serialized through a pool of
/// striped locks.
#[derive(Debug)]
pub struct AtomicFixedVec<T>
where
    T: Storable<u64>,
{
    /// Backing words; elements are bit-packed `bit_width` bits apart.
    pub(crate) storage: Vec<AtomicU64>,
    // Stripe locks guarding word-straddling accesses. The length is always a
    // power of two so a lock can be chosen with `word_index & (len - 1)`.
    locks: Vec<Mutex<()>>,
    // Bits per element (validated to be <= 64 in `new`).
    bit_width: usize,
    // The low `bit_width` bits set; used to extract/insert element values.
    mask: u64,
    // Number of logical elements (not words).
    len: usize,
    _phantom: PhantomData<T>,
}
impl<T> AtomicFixedVec<T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    /// Entry point for constructing a vector; see [`builder::AtomicFixedVecBuilder`].
    #[inline(always)]
    pub fn builder() -> builder::AtomicFixedVecBuilder<T> {
        builder::AtomicFixedVecBuilder::new()
    }
    /// Number of logical elements.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len
    }
    /// `true` when the vector holds no elements.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
    /// Bits used to store each element.
    #[inline(always)]
    pub fn bit_width(&self) -> usize {
        self.bit_width
    }
    /// Raw backing words. Elements are bit-packed, so a single word may hold
    /// (parts of) several elements.
    #[inline(always)]
    pub fn as_slice(&self) -> &[AtomicU64] {
        &self.storage
    }
    /// Atomically loads the element at `index` with the given ordering.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn load(&self, index: usize, order: Ordering) -> T {
        assert!(index < self.len, "load index out of bounds");
        let loaded_word = self.atomic_load(index, order);
        T::from_word(loaded_word)
    }
    /// Like [`Self::load`] without the bounds check.
    ///
    /// # Safety
    /// `index` must be less than `self.len()`; only a `debug_assert!` guards
    /// this in debug builds.
    #[inline(always)]
    pub unsafe fn load_unchecked(&self, index: usize, order: Ordering) -> T {
        debug_assert!(index < self.len, "load_unchecked index out of bounds");
        let loaded_word = self.atomic_load(index, order);
        T::from_word(loaded_word)
    }
    /// Atomically stores `value` at `index` with the given ordering.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn store(&self, index: usize, value: T, order: Ordering) {
        assert!(index < self.len, "store index out of bounds");
        let value_w = T::into_word(value);
        self.atomic_store(index, value_w, order);
    }
    /// Like [`Self::store`] without the bounds check.
    ///
    /// # Safety
    /// `index` must be less than `self.len()`.
    #[inline(always)]
    pub unsafe fn store_unchecked(&self, index: usize, value: T, order: Ordering) {
        debug_assert!(index < self.len, "store_unchecked index out of bounds");
        let value_w = T::into_word(value);
        self.atomic_store(index, value_w, order);
    }
    /// Atomically replaces the element at `index`, returning the previous value.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn swap(&self, index: usize, value: T, order: Ordering) -> T {
        assert!(index < self.len, "swap index out of bounds");
        let value_w = T::into_word(value);
        let old_word = self.atomic_swap(index, value_w, order);
        T::from_word(old_word)
    }
    /// Like [`Self::swap`] without the bounds check.
    ///
    /// # Safety
    /// `index` must be less than `self.len()`.
    #[inline(always)]
    pub unsafe fn swap_unchecked(&self, index: usize, value: T, order: Ordering) -> T {
        debug_assert!(index < self.len, "swap_unchecked index out of bounds");
        let value_w = T::into_word(value);
        let old_word = self.atomic_swap(index, value_w, order);
        T::from_word(old_word)
    }
    /// Atomic compare-and-exchange: stores `new` at `index` iff the current
    /// value equals `current`. Returns `Ok(previous)` on success,
    /// `Err(actual)` on failure — mirroring `std`'s atomics.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn compare_exchange(
        &self,
        index: usize,
        current: T,
        new: T,
        success: Ordering,
        failure: Ordering,
    ) -> Result<T, T> {
        assert!(index < self.len, "compare_exchange index out of bounds");
        let current_w = T::into_word(current);
        let new_w = T::into_word(new);
        match self.atomic_compare_exchange(index, current_w, new_w, success, failure) {
            Ok(w) => Ok(T::from_word(w)),
            Err(w) => Err(T::from_word(w)),
        }
    }
    /// Like [`Self::compare_exchange`] without the bounds check.
    ///
    /// # Safety
    /// `index` must be less than `self.len()`.
    #[inline(always)]
    pub unsafe fn compare_exchange_unchecked(
        &self,
        index: usize,
        current: T,
        new: T,
        success: Ordering,
        failure: Ordering,
    ) -> Result<T, T> {
        debug_assert!(
            index < self.len,
            "compare_exchange_unchecked index out of bounds"
        );
        let current_w = T::into_word(current);
        let new_w = T::into_word(new);
        match self.atomic_compare_exchange(index, current_w, new_w, success, failure) {
            Ok(w) => Ok(T::from_word(w)),
            Err(w) => Err(T::from_word(w)),
        }
    }
    /// Checked indexed read (`SeqCst`); returns `None` when out of bounds.
    #[inline(always)]
    pub fn get(&self, index: usize) -> Option<T> {
        if index >= self.len {
            return None;
        }
        Some(self.load(index, Ordering::SeqCst))
    }
    /// Unchecked indexed read (`SeqCst`).
    ///
    /// # Safety
    /// `index` must be less than `self.len()`.
    #[inline(always)]
    pub unsafe fn get_unchecked(&self, index: usize) -> T {
        unsafe { self.load_unchecked(index, Ordering::SeqCst) }
    }
    /// Sequential iterator over point-in-time snapshots of the elements.
    /// Concurrent writers may change elements between `next` calls.
    pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
        AtomicFixedVecIter {
            vec: self,
            current_index: 0,
        }
    }
    /// Parallel read-only iterator; each element is read with a relaxed load.
    #[cfg(feature = "parallel")]
    pub fn par_iter(&self) -> impl ParallelIterator<Item = T> + '_
    where
        T: Send + Sync,
    {
        (0..self.len())
            .into_par_iter()
            .map(move |i| self.load(i, Ordering::Relaxed))
    }
    /// Parallel mutation: yields [`AtomicMutProxy`] values that write back to
    /// the vector when dropped (relaxed store).
    #[cfg(feature = "parallel")]
    pub fn par_iter_mut(&self) -> impl ParallelIterator<Item = AtomicMutProxy<'_, T>>
    where
        T: Send + Sync,
    {
        (0..self.len())
            .into_par_iter()
            .map(move |i| AtomicMutProxy::new(self, i))
    }
}
impl<T> AtomicFixedVec<T>
where
    T: Storable<u64> + Bounded + Copy + ToPrimitive,
{
    /// Atomically adds `val` to the element at `index` (wrapping arithmetic in
    /// `T`) and returns the previous value. All `fetch_*` ops are CAS loops
    /// (`atomic_rmw`), so `order` acts as the success ordering.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn fetch_add(&self, index: usize, val: T, order: Ordering) -> T
    where
        T: WrappingAdd,
    {
        self.atomic_rmw(index, val, order, |a, b| a.wrapping_add(&b))
    }
    /// Atomically subtracts `val` (wrapping in `T`), returning the previous value.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn fetch_sub(&self, index: usize, val: T, order: Ordering) -> T
    where
        T: WrappingSub,
    {
        self.atomic_rmw(index, val, order, |a, b| a.wrapping_sub(&b))
    }
    /// Atomically bitwise-ANDs `val` into the element, returning the previous value.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn fetch_and(&self, index: usize, val: T, order: Ordering) -> T
    where
        T: BitAnd<Output = T>,
    {
        self.atomic_rmw(index, val, order, |a, b| a & b)
    }
    /// Atomically bitwise-ORs `val` into the element, returning the previous value.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn fetch_or(&self, index: usize, val: T, order: Ordering) -> T
    where
        T: BitOr<Output = T>,
    {
        self.atomic_rmw(index, val, order, |a, b| a | b)
    }
    /// Atomically bitwise-XORs `val` into the element, returning the previous value.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn fetch_xor(&self, index: usize, val: T, order: Ordering) -> T
    where
        T: BitXor<Output = T>,
    {
        self.atomic_rmw(index, val, order, |a, b| a ^ b)
    }
    /// Atomically stores `max(current, val)`, returning the previous value.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn fetch_max(&self, index: usize, val: T, order: Ordering) -> T
    where
        T: Ord,
    {
        self.atomic_rmw(index, val, order, |a, b| a.max(b))
    }
    /// Atomically stores `min(current, val)`, returning the previous value.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    #[inline(always)]
    pub fn fetch_min(&self, index: usize, val: T, order: Ordering) -> T
    where
        T: Ord,
    {
        self.atomic_rmw(index, val, order, |a, b| a.min(b))
    }
    /// Atomically applies `f` to the element until the resulting
    /// compare-exchange succeeds, returning `Ok(previous)`; returns
    /// `Err(observed)` as soon as `f` yields `None`. `f` may be called many
    /// times under contention — mirror of `std`'s `fetch_update`.
    ///
    /// # Panics
    /// Panics if `index >= self.len()`.
    pub fn fetch_update<F>(
        &self,
        index: usize,
        success: Ordering,
        failure: Ordering,
        mut f: F,
    ) -> Result<T, T>
    where
        F: FnMut(T) -> Option<T>,
    {
        // FIX: match `std`'s `fetch_update` contract — the value handed to `f`
        // (and possibly returned in `Err`) is read with the caller-supplied
        // `failure` ordering. Previously this initial read was always
        // `Relaxed`, so an `Err` value could be observed with weaker ordering
        // than the caller requested.
        let mut current = self.load(index, failure);
        loop {
            match f(current) {
                Some(new) => match self.compare_exchange(index, current, new, success, failure) {
                    Ok(old) => return Ok(old),
                    Err(actual) => current = actual,
                },
                None => return Err(current),
            }
        }
    }
}
// Convenience conversion: packs a slice using the minimal bit width that can
// represent every element.
impl<T> TryFrom<&[T]> for AtomicFixedVec<T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    type Error = Error;

    /// Builds a new vector from `slice`; fails with whatever error the
    /// builder reports for the input.
    fn try_from(slice: &[T]) -> Result<Self, Self::Error> {
        let minimal = AtomicFixedVec::builder().bit_width(BitWidth::Minimal);
        minimal.build(slice)
    }
}
impl<T> AtomicFixedVec<T>
where
    T: Storable<u64>,
{
    /// Allocates a zeroed vector of `len` elements of `bit_width` bits each,
    /// together with a striped lock pool for word-straddling accesses.
    ///
    /// # Errors
    /// Returns `Error::InvalidParameters` when `bit_width` exceeds 64.
    pub(crate) fn new(bit_width: usize, len: usize) -> Result<Self, Error> {
        let word_bits = u64::BITS as usize;
        if bit_width > word_bits {
            return Err(Error::InvalidParameters(format!(
                "bit_width ({}) cannot be greater than the word size ({})",
                bit_width,
                u64::BITS
            )));
        }
        // Full-width elements need an explicit all-ones mask: shifting by 64
        // would overflow.
        let mask = if bit_width == word_bits {
            u64::MAX
        } else {
            (1u64 << bit_width).wrapping_sub(1)
        };
        let num_words = len.saturating_mul(bit_width).div_ceil(word_bits);
        // One spare word is appended — presumably padding so word-pair
        // accesses near the end stay in bounds (TODO confirm; the straddling
        // paths themselves never read past the last element's words).
        let buffer_len = if len == 0 { 0 } else { num_words + 1 };
        let storage = (0..buffer_len).map(|_| AtomicU64::new(0)).collect();
        // Size the lock pool to the larger of (words / stripe) and the core
        // count, doubled and rounded to a power of two so lock selection can
        // use a mask; clamped to [MIN_LOCKS, MAX_LOCKS].
        let num_locks = if len == 0 {
            MIN_LOCKS
        } else {
            let num_cores = std::thread::available_parallelism().map_or(MIN_LOCKS, |n| n.get());
            let target_locks = (num_words / WORDS_PER_LOCK).max(1);
            (target_locks.max(num_cores) * 2)
                .next_power_of_two()
                .min(MAX_LOCKS)
        };
        let locks = (0..num_locks).map(|_| Mutex::new(())).collect();
        Ok(Self {
            storage,
            locks,
            bit_width,
            mask,
            len,
            _phantom: PhantomData,
        })
    }
}
impl<T> AtomicFixedVec<T>
where
T: Storable<u64> + Copy + ToPrimitive,
{
#[inline(always)]
fn atomic_load(&self, index: usize, order: Ordering) -> u64 {
let bit_pos = index * self.bit_width;
let word_index = bit_pos / u64::BITS as usize;
let bit_offset = bit_pos % u64::BITS as usize;
if bit_offset + self.bit_width <= u64::BITS as usize {
let word = self.storage[word_index].load(order);
(word >> bit_offset) & self.mask
} else {
let lock_index = word_index & (self.locks.len() - 1);
let _guard = self.locks[lock_index].lock();
let low_word = self.storage[word_index].load(Ordering::Relaxed);
let high_word = self.storage[word_index + 1].load(Ordering::Relaxed);
let combined =
(low_word >> bit_offset) | (high_word << (u64::BITS as usize - bit_offset));
combined & self.mask
}
}
#[inline(always)]
fn atomic_store(&self, index: usize, value: u64, order: Ordering) {
let bit_pos = index * self.bit_width;
let word_index = bit_pos / u64::BITS as usize;
let bit_offset = bit_pos % u64::BITS as usize;
if bit_offset + self.bit_width <= u64::BITS as usize {
let atomic_word_ref = &self.storage[word_index];
let store_mask = self.mask << bit_offset;
let store_value = value << bit_offset;
let mut old_word = atomic_word_ref.load(Ordering::Relaxed);
loop {
let new_word = (old_word & !store_mask) | store_value;
match atomic_word_ref.compare_exchange_weak(
old_word,
new_word,
order,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => old_word = x,
}
}
} else {
let lock_index = word_index & (self.locks.len() - 1);
let _guard = self.locks[lock_index].lock();
let low_word_ref = &self.storage[word_index];
let high_word_ref = &self.storage[word_index + 1];
low_word_ref
.fetch_update(order, Ordering::Relaxed, |mut w| {
w &= !(u64::MAX << bit_offset);
w |= value << bit_offset;
Some(w)
})
.unwrap();
let bits_in_high = (bit_offset + self.bit_width) - u64::BITS as usize;
let high_mask = (1u64 << bits_in_high).wrapping_sub(1);
high_word_ref
.fetch_update(order, Ordering::Relaxed, |mut w| {
w &= !high_mask;
w |= value >> (u64::BITS as usize - bit_offset);
Some(w)
})
.unwrap(); }
}
#[inline(always)]
fn atomic_swap(&self, index: usize, value: u64, order: Ordering) -> u64 {
let bit_pos = index * self.bit_width;
let word_index = bit_pos / u64::BITS as usize;
let bit_offset = bit_pos % u64::BITS as usize;
if bit_offset + self.bit_width <= u64::BITS as usize {
let atomic_word_ref = &self.storage[word_index];
let store_mask = self.mask << bit_offset;
let store_value = value << bit_offset;
let mut old_word = atomic_word_ref.load(Ordering::Relaxed);
loop {
let new_word = (old_word & !store_mask) | store_value;
match atomic_word_ref.compare_exchange_weak(
old_word,
new_word,
order,
Ordering::Relaxed,
) {
Ok(_) => return (old_word >> bit_offset) & self.mask,
Err(x) => old_word = x,
}
}
} else {
let lock_index = word_index & (self.locks.len() - 1);
let _guard = self.locks[lock_index].lock();
let old_val = self.atomic_load(index, Ordering::Relaxed);
self.atomic_store(index, value, order);
old_val
}
}
#[inline(always)]
fn atomic_compare_exchange(
&self,
index: usize,
current: u64,
new: u64,
success: Ordering,
failure: Ordering,
) -> Result<u64, u64> {
let bit_pos = index * self.bit_width;
let word_index = bit_pos / u64::BITS as usize;
let bit_offset = bit_pos % u64::BITS as usize;
if bit_offset + self.bit_width <= u64::BITS as usize {
let atomic_word_ref = &self.storage[word_index];
let store_mask = self.mask << bit_offset;
let new_value_shifted = new << bit_offset;
let mut old_word = atomic_word_ref.load(failure);
loop {
let old_val_extracted = (old_word >> bit_offset) & self.mask;
if old_val_extracted != current {
return Err(old_val_extracted);
}
let new_word = (old_word & !store_mask) | new_value_shifted;
match atomic_word_ref.compare_exchange_weak(old_word, new_word, success, failure) {
Ok(_) => return Ok(current),
Err(x) => old_word = x,
}
}
} else {
let lock_index = word_index & (self.locks.len() - 1);
let _guard = self.locks[lock_index].lock();
let old_val = self.atomic_load(index, failure);
if old_val != current {
return Err(old_val);
}
self.atomic_store(index, new, success);
Ok(current)
}
}
#[inline(always)]
fn atomic_rmw(&self, index: usize, val: T, order: Ordering, op: impl Fn(T, T) -> T) -> T {
let mut current = self.load(index, Ordering::Relaxed);
loop {
let new = op(current, val);
match self.compare_exchange(index, current, new, order, Ordering::Relaxed) {
Ok(old) => return old,
Err(actual) => current = actual,
}
}
}
}
impl<T, W, E> From<FixedVec<T, W, E, Vec<W>>> for AtomicFixedVec<T>
where
    T: Storable<W> + Storable<u64>,
    W: crate::fixed::traits::Word,
    E: dsi_bitstream::prelude::Endianness,
{
    /// Reinterprets the backing buffer of a `FixedVec` as atomic 64-bit words
    /// without copying.
    ///
    /// # Panics
    /// Panics if `W`'s size or alignment differs from `AtomicU64`'s: the raw
    /// `Vec` reinterpretation below would otherwise be undefined behavior
    /// (length/capacity are in units of the element type and the allocation
    /// layout must match).
    fn from(fixed_vec: FixedVec<T, W, E, Vec<W>>) -> Self {
        // FIX: guard the transmute-like conversion. The impl is generic over
        // `W`, but `Vec::from_raw_parts` is only sound when the old and new
        // element types share size and alignment.
        assert_eq!(
            std::mem::size_of::<W>(),
            std::mem::size_of::<AtomicU64>(),
            "FixedVec word type must be 64 bits wide to convert to AtomicFixedVec"
        );
        assert_eq!(
            std::mem::align_of::<W>(),
            std::mem::align_of::<AtomicU64>(),
            "FixedVec word type must have AtomicU64's alignment"
        );
        // SAFETY: size and alignment were checked above, and `AtomicU64` has
        // the same in-memory representation as `u64`. `ManuallyDrop` prevents
        // the original Vec from freeing the buffer we are taking over.
        let storage = unsafe {
            let mut md = std::mem::ManuallyDrop::new(fixed_vec.bits);
            Vec::from_raw_parts(md.as_mut_ptr() as *mut AtomicU64, md.len(), md.capacity())
        };
        // NOTE(review): unlike `Self::new`, no spare padding word is appended
        // here; the straddling accessors only touch words that hold element
        // bits, so this looks safe — confirm against any unchecked callers
        // that may rely on the padding.
        let num_words = (fixed_vec.len * fixed_vec.bit_width).div_ceil(u64::BITS as usize);
        // Same lock-pool sizing policy as `Self::new`.
        let num_locks = if fixed_vec.len == 0 {
            MIN_LOCKS
        } else {
            let num_cores = std::thread::available_parallelism().map_or(MIN_LOCKS, |n| n.get());
            let target_locks = (num_words / WORDS_PER_LOCK).max(1);
            (target_locks.max(num_cores) * 2)
                .next_power_of_two()
                .min(MAX_LOCKS)
        };
        let locks = (0..num_locks).map(|_| Mutex::new(())).collect();
        Self {
            storage,
            locks,
            bit_width: fixed_vec.bit_width,
            mask: fixed_vec.mask.to_u64().unwrap(),
            len: fixed_vec.len,
            _phantom: PhantomData,
        }
    }
}
// Zero-copy conversion back to a plain little-endian `FixedVec` over `u64` words.
impl<T> From<AtomicFixedVec<T>> for FixedVec<T, u64, dsi_bitstream::prelude::LE, Vec<u64>>
where
    T: Storable<u64>,
{
    fn from(atomic_vec: AtomicFixedVec<T>) -> Self {
        // SAFETY: `AtomicU64` is documented to have the same size and
        // alignment (in-memory representation) as `u64`, so reinterpreting the
        // buffer is sound; `ManuallyDrop` prevents a double free of the
        // allocation being handed over.
        let bits = unsafe {
            let mut md = std::mem::ManuallyDrop::new(atomic_vec.storage);
            Vec::from_raw_parts(md.as_mut_ptr() as *mut u64, md.len(), md.capacity())
        };
        // NOTE(review): the atomic vector's buffer may carry one spare padding
        // word (see `AtomicFixedVec::new`); presumably `new_unchecked`
        // tolerates a buffer larger than strictly required — confirm.
        unsafe { FixedVec::new_unchecked(bits, atomic_vec.len, atomic_vec.bit_width) }
    }
}
impl<T> MemSize for AtomicFixedVec<T>
where
    T: Storable<u64>,
{
    /// Accounts for the struct itself, the storage words, and the lock pool;
    /// the lock pool is sized by hand (a `Vec` header plus one `Mutex<()>` per
    /// slot) since it is not delegated to `mem_dbg`.
    fn mem_size_rec(&self, flags: SizeFlags, _refs: &mut mem_dbg::HashMap<usize, usize>) -> usize {
        // With CAPACITY we charge all reserved slots, otherwise only live ones.
        let lock_slots = if flags.contains(SizeFlags::CAPACITY) {
            self.locks.capacity()
        } else {
            self.locks.len()
        };
        core::mem::size_of::<Self>()
            + self.storage.mem_size(flags)
            + core::mem::size_of::<Vec<Mutex<()>>>()
            + lock_slots * core::mem::size_of::<Mutex<()>>()
    }
}
impl<T: Storable<u64>> MemDbgImpl for AtomicFixedVec<T> {
    // Emits the per-field size tree. Field order here fixes the printed
    // order; `locks` holds mutexes that cannot be walked by `mem_dbg`, so its
    // byte size is computed by hand and printed as a plain `usize` node.
    fn _mem_dbg_rec_on(
        &self,
        writer: &mut impl core::fmt::Write,
        total_size: usize,
        max_depth: usize,
        prefix: &mut String,
        _is_last: bool,
        flags: DbgFlags,
        _dbg_refs: &mut mem_dbg::HashSet<usize>,
    ) -> core::fmt::Result {
        self.bit_width
            ._mem_dbg_rec_on(writer, total_size, max_depth, prefix, false, flags, _dbg_refs)?;
        self.len
            ._mem_dbg_rec_on(writer, total_size, max_depth, prefix, false, flags, _dbg_refs)?;
        self.mask
            ._mem_dbg_rec_on(writer, total_size, max_depth, prefix, false, flags, _dbg_refs)?;
        // Lock pool: Vec header plus the full reserved capacity of mutexes.
        let locks_size = core::mem::size_of::<Vec<Mutex<()>>>()
            + self.locks.capacity() * core::mem::size_of::<Mutex<()>>();
        locks_size._mem_dbg_rec_on(writer, total_size, max_depth, prefix, false, flags, _dbg_refs)?;
        // `storage` is emitted last (`is_last = true`) so the tree closes.
        self.storage
            ._mem_dbg_rec_on(writer, total_size, max_depth, prefix, true, flags, _dbg_refs)?;
        Ok(())
    }
}
/// Sequential iterator over an [`AtomicFixedVec`]; each `next` performs a
/// `SeqCst` load, so items are point-in-time snapshots that concurrent
/// writers may change between calls.
pub struct AtomicFixedVecIter<'a, T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    // Source vector being iterated.
    vec: &'a AtomicFixedVec<T>,
    // Position of the next element to yield.
    current_index: usize,
}
impl<T> Iterator for AtomicFixedVecIter<'_, T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    type Item = T;

    /// Yields the element under the cursor (a `SeqCst` load via `get`) and
    /// advances; `None` once the cursor reaches the vector's length.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let value = self.vec.get(self.current_index)?;
        self.current_index += 1;
        Some(value)
    }

    /// Exact bounds: the vector's length is fixed, so the remaining count is
    /// known precisely.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.vec.len().checked_sub(self.current_index).unwrap_or(0);
        (remaining, Some(remaining))
    }
}
impl<T> ExactSizeIterator for AtomicFixedVecIter<'_, T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    /// Remaining element count; saturates at zero should the cursor ever sit
    /// past the end.
    fn len(&self) -> usize {
        match self.vec.len().checked_sub(self.current_index) {
            Some(remaining) => remaining,
            None => 0,
        }
    }
}
// Borrowing iteration (`for x in &vec`); equivalent to `vec.iter()` but with
// a nameable iterator type.
impl<'a, T> IntoIterator for &'a AtomicFixedVec<T>
where
    T: Storable<u64> + Copy + ToPrimitive,
{
    type Item = T;
    type IntoIter = AtomicFixedVecIter<'a, T>;

    fn into_iter(self) -> Self::IntoIter {
        AtomicFixedVecIter {
            current_index: 0,
            vec: self,
        }
    }
}
// Element-wise equality. Note that two vectors holding identical values but
// configured with different bit widths compare unequal, and that the
// comparison is not atomic as a whole — concurrent writers can race it.
impl<T> PartialEq for AtomicFixedVec<T>
where
    T: Storable<u64> + PartialEq + Copy + ToPrimitive,
{
    fn eq(&self, other: &Self) -> bool {
        self.len() == other.len()
            && self.bit_width() == other.bit_width()
            && self.iter().zip(other.iter()).all(|(a, b)| a == b)
    }
}
// Total equality follows from the element-wise `PartialEq` whenever `T: Eq`
// (no partial values such as NaN can occur).
impl<T> Eq for AtomicFixedVec<T> where T: Storable<u64> + Eq + Copy + ToPrimitive {}