use super::buffer::PooledBuffer;
use crossbeam_utils::CachePadded;
use std::{
alloc::Layout,
cell::Cell,
mem::MaybeUninit,
num::{NonZeroU32, NonZeroUsize},
sync::atomic::Ordering,
};
cfg_if::cfg_if! {
if #[cfg(feature = "loom")] {
use loom::{
cell::UnsafeCell,
sync::atomic::{AtomicU64, AtomicUsize},
};
} else {
use std::{
cell::UnsafeCell,
sync::atomic::{AtomicU64, AtomicUsize},
};
}
}
/// Number of slot bits tracked by a single bitmap word (the width of a `u64`).
const SLOT_BITMAP_WORD_BITS: usize = u64::BITS as usize;
/// log2 of [`SLOT_BITMAP_WORD_BITS`]; the number of bits `SlotBitmapProbe::bit_offset`
/// keeps when deriving a per-thread starting bit.
const SLOT_BITMAP_WORD_SHIFT: u32 = SLOT_BITMAP_WORD_BITS.trailing_zeros();
/// Largest bitmap word count for which `Freelist::put_batch` stages its per-word masks
/// in an inline stack array; bitmaps with more words use a heap-allocated `Vec` instead.
const INLINE_PUT_BATCH_MASKS: usize = 128;
/// A fixed-capacity freelist of [`PooledBuffer`]s tracked by an atomic bitmap.
///
/// Every buffer is identified by a slot id. A set bit in `words` means the slot's
/// buffer is currently parked in `storage` and may be claimed by `take`/`take_batch`;
/// a clear bit means the slot is either not yet returned or was never created.
pub struct Freelist {
// Layout used to allocate every buffer (`try_create`) and to free it (`drain`).
layout: Layout,
// Count of slot ids handed out by `try_create`; never exceeds `storage.len()`.
created: AtomicUsize,
// Free-slot bitmap, one cache-padded word per stripe; the length is a power of two.
words: Box<[CachePadded<AtomicU64>]>,
// One cell per slot; initialized only while the slot's bit is set in `words`.
storage: Box<[UnsafeCell<MaybeUninit<PooledBuffer>>]>,
// `words.len() - 1`; `slot & word_mask` selects the slot's bitmap word.
word_mask: usize,
// log2 of `words.len()`; `slot >> word_shift` selects the bit within that word.
word_shift: u32,
}
// SAFETY (NOTE(review)): `storage` holds `UnsafeCell`s, so these impls are what make a
// `Freelist` shareable between threads. Within this file a slot's cell is written only
// by `park` (before the slot's bit is published with a `Release` `fetch_or`) and read
// only by `unpark` (after the bit was cleared by an `Acquire` read-modify-write).
// Soundness presumably also depends on `PooledBuffer` being safe to hand to another
// thread and on callers never returning the same slot twice concurrently — confirm
// against `PooledBuffer` and the call sites.
unsafe impl Send for Freelist {}
unsafe impl Sync for Freelist {}
impl Freelist {
pub fn new(
capacity: NonZeroU32,
parallelism: NonZeroUsize,
layout: Layout,
prefill: bool,
) -> Self {
assert!(layout.size() > 0, "layout size must be non-zero");
let capacity = capacity.get() as usize;
let max_stripes = 1usize << capacity.ilog2();
let target_stripes = parallelism
.get()
.checked_next_power_of_two()
.unwrap_or(max_stripes)
.min(max_stripes);
let word_count = target_stripes
.max(capacity.div_ceil(SLOT_BITMAP_WORD_BITS))
.next_power_of_two();
let word_shift = word_count.trailing_zeros();
let word_mask = word_count - 1;
let words = (0..word_count)
.map(|_| CachePadded::new(AtomicU64::new(0)))
.collect::<Vec<_>>()
.into_boxed_slice();
let storage = (0..capacity)
.map(|_| UnsafeCell::new(MaybeUninit::uninit()))
.collect::<Vec<_>>()
.into_boxed_slice();
let freelist = Self {
layout,
created: AtomicUsize::new(0),
words,
storage,
word_mask,
word_shift,
};
if prefill {
freelist.put_batch((0..capacity).map(|_| {
freelist
.try_create(false)
.expect("prefill creates exactly capacity buffers")
}));
}
freelist
}
/// Reserves the next never-used slot id and allocates a buffer for it.
///
/// Returns the slot id together with a new buffer (zero-initialized when `zeroed`
/// is true), or `None` once as many slots as the storage holds have been created.
/// The buffer is handed to the caller; it is not parked in the freelist.
#[inline(always)]
pub(super) fn try_create(&self, zeroed: bool) -> Option<(u32, PooledBuffer)> {
    let limit = self.storage.len();
    // Bump the counter only while there is still an unused slot id.
    let reserve = |created: usize| {
        if created < limit {
            Some(created + 1)
        } else {
            None
        }
    };
    let Ok(previous) = self
        .created
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, reserve)
    else {
        return None;
    };
    // The pre-increment value is the id of the slot that was just reserved.
    let slot = previous as u32;
    let buffer = match zeroed {
        true => PooledBuffer::new_zeroed(self.layout),
        false => PooledBuffer::new(self.layout),
    };
    Some((slot, buffer))
}
/// Maps a slot id to `(bitmap word index, single-bit mask within that word)`.
///
/// The low bits of the slot id pick the word and the remaining high bits pick the
/// bit, so consecutive slot ids land in different words.
#[inline(always)]
const fn slot_word(&self, slot: u32) -> (usize, u64) {
    let index = slot as usize;
    let bit = index >> self.word_shift;
    (index & self.word_mask, 1u64 << bit)
}
/// Inverse of `slot_word`: rebuilds the slot id from a word index and a bit position.
#[inline(always)]
const fn slot_index(&self, word_index: usize, bit: usize) -> u32 {
    let high = bit << self.word_shift;
    (high | word_index) as u32
}
/// Returns `buffer` to the freelist under `slot` and marks the slot free.
///
/// The buffer is parked in the slot's storage cell first; the slot's bit is then set
/// with a `Release` `fetch_or`, so the parked value is written before the bit becomes
/// visible to an `Acquire` claim in `take`/`take_batch`/`drain`.
///
/// # Panics
///
/// Panics if `slot` is outside the storage (raised by `park`) or if the slot's bit was
/// already set, i.e. the slot was returned while still marked free.
#[inline]
pub fn put(&self, slot: u32, buffer: PooledBuffer) {
// Park before publishing: the cell must hold the buffer when the bit turns visible.
self.park(slot, buffer);
let (word_index, mask) = self.slot_word(slot);
let previous = self.words[word_index].fetch_or(mask, Ordering::Release);
assert_eq!(
previous & mask,
0,
"returned slot must not already be marked free"
);
}
/// Returns several `(slot, buffer)` pairs to the freelist at once.
///
/// An empty batch is a no-op and a single-entry batch is forwarded to `put`. For two
/// or more entries the buffers are parked while their bits are accumulated in a
/// scratch mask per bitmap word, and each touched word is then published by
/// `put_entries`. The scratch masks live in an inline stack array when the bitmap has
/// at most `INLINE_PUT_BATCH_MASKS` words and in a heap `Vec` otherwise.
///
/// # Panics
///
/// Panics under the same conditions as `put`.
#[inline]
pub fn put_batch(&self, entries: impl IntoIterator<Item = (u32, PooledBuffer)>) {
let mut entries = entries.into_iter();
// Peel off the first two entries so the empty and single-entry cases skip the
// scratch-mask setup entirely.
let Some((slot, buffer)) = entries.next() else {
return;
};
let Some((next_slot, next_buffer)) = entries.next() else {
self.put(slot, buffer);
return;
};
let word_count = self.words.len();
if word_count <= INLINE_PUT_BATCH_MASKS {
let mut masks = MaybeUninit::<[u64; INLINE_PUT_BATCH_MASKS]>::uninit();
// SAFETY: `word_count <= INLINE_PUT_BATCH_MASKS`, so the first `word_count`
// `u64`s lie inside the array. They are zeroed by `write_bytes` before the slice
// is formed, and the slice never covers the still-uninitialized tail. The
// backing array stays alive until the end of this branch.
let masks = unsafe {
let ptr = masks.as_mut_ptr().cast::<u64>();
ptr.write_bytes(0, word_count);
std::slice::from_raw_parts_mut(ptr, word_count)
};
self.put_entries(masks, slot, buffer, next_slot, next_buffer, entries);
} else {
// Too many words for the inline array: fall back to a heap allocation.
let mut masks = vec![0u64; word_count];
self.put_entries(
masks.as_mut_slice(),
slot,
buffer,
next_slot,
next_buffer,
entries,
);
}
}
/// Parks every entry of a batch and then publishes the accumulated bits word by word.
///
/// `masks` must hold one zeroed `u64` per bitmap word. The first two entries are passed
/// separately because `put_batch` has already pulled them out of the iterator.
///
/// # Panics
///
/// Panics if a slot id is outside the storage (raised by `park`) or if any published
/// bit was already set in the shared bitmap.
#[inline(always)]
fn put_entries(
&self,
masks: &mut [u64],
slot: u32,
buffer: PooledBuffer,
next_slot: u32,
next_buffer: PooledBuffer,
entries: impl Iterator<Item = (u32, PooledBuffer)>,
) {
// Phase 1: park all buffers and collect their bits locally (no shared writes yet).
self.stage_put(masks, slot, buffer);
self.stage_put(masks, next_slot, next_buffer);
for (slot, buffer) in entries {
self.stage_put(masks, slot, buffer);
}
// Phase 2: one `Release` `fetch_or` per touched word publishes all of its slots.
for (word_index, &mask) in masks.iter().enumerate() {
if mask == 0 {
continue;
}
let previous = self.words[word_index].fetch_or(mask, Ordering::Release);
assert_eq!(
previous & mask,
0,
"returned slot batch must not already contain a free slot"
);
}
}
/// Parks `buffer` in `slot`'s storage cell and records the slot's bit in the local
/// per-word `masks`, without touching the shared bitmap.
///
/// # Panics
///
/// Panics if `slot` is outside the storage, or if the same slot was already staged
/// in this batch. Without the second check a repeated slot would overwrite the
/// buffer parked a moment earlier (losing it, since writing to the cell does not
/// drop the previous contents) and the publish step could not notice, because the
/// bit is only OR'd into the mask once.
#[inline(always)]
fn stage_put(&self, masks: &mut [u64], slot: u32, buffer: PooledBuffer) {
    // Validate the slot id up front so an out-of-range id fails with the same
    // message `park` would raise, before any bit arithmetic is attempted.
    assert!(
        (slot as usize) < self.storage.len(),
        "slot id must refer to an allocated slot"
    );
    let (word_index, mask) = self.slot_word(slot);
    // Mirror the double-return check of `put` for duplicates inside one batch.
    assert_eq!(
        masks[word_index] & mask,
        0,
        "returned slot batch must not contain the same slot twice"
    );
    self.park(slot, buffer);
    masks[word_index] |= mask;
}
/// Claims one free slot and returns its id together with the parked buffer.
///
/// Every bitmap word is visited once, starting at the calling thread's home word as
/// chosen by `SlotBitmapProbe`. Returns `None` if no set bit could be claimed during
/// that single pass.
#[inline]
pub fn take(&self) -> Option<(u32, PooledBuffer)> {
let probe = SlotBitmapProbe::new(self.word_mask, self.word_shift);
for scanned in 0..self.words.len() {
let word_index = probe.word_index(scanned);
let word_ref = &self.words[word_index];
// Relaxed snapshot: only a hint for which bit to try; the claim below synchronizes.
let mut word = word_ref.load(Ordering::Relaxed);
while word != 0 {
let bit = probe.select_set_bit(word);
let mask = 1u64 << bit;
// Clear the candidate bit; `Acquire` pairs with the `Release` `fetch_or` that
// published the slot.
let observed = word_ref.fetch_and(!mask, Ordering::Acquire);
if observed & mask != 0 {
// The bit was still set when it was cleared, so this thread owns the slot.
let slot = self.slot_index(word_index, bit);
return Some((slot, self.unpark(slot)));
}
// Another thread claimed the bit first; retry with the fresher word value.
word = observed & !mask;
}
}
None
}
/// Claims up to `max` free slots, passing each claimed `(slot, buffer)` to `on_entry`.
///
/// Returns the number of slots claimed, which equals the number of `on_entry` calls.
/// `max == 1` is forwarded to `take`; `max == 0` claims nothing. Like `take`, every
/// bitmap word is visited at most once, starting at the calling thread's home word.
#[inline]
pub fn take_batch(&self, max: usize, mut on_entry: impl FnMut(u32, PooledBuffer)) -> usize {
if max == 1 {
let Some((slot, buffer)) = self.take() else {
return 0;
};
on_entry(slot, buffer);
return 1;
}
let probe = SlotBitmapProbe::new(self.word_mask, self.word_shift);
let mut filled = 0;
for scanned in 0..self.words.len() {
if filled == max {
break;
}
let word_index = probe.word_index(scanned);
let word_ref = &self.words[word_index];
// Relaxed snapshot: only used to choose candidate bits.
let mut word = word_ref.load(Ordering::Relaxed);
while word != 0 && filled < max {
// Pick at most the number of bits still needed from the snapshot.
let claim = probe.select_set_bits(word, max - filled);
// Clear all candidates in one RMW; only bits that were still set are ours.
let observed = word_ref.fetch_and(!claim, Ordering::Acquire);
let mut claimed = observed & claim;
while claimed != 0 {
let bit = claimed.trailing_zeros() as usize;
let slot = self.slot_index(word_index, bit);
on_entry(slot, self.unpark(slot));
// Drop the lowest set bit and move on to the next claimed slot.
claimed &= claimed - 1;
filled += 1;
}
// Continue with whatever else was set in the word at the time of the RMW.
word = observed & !claim;
}
}
filled
}
/// Claims every slot currently marked free and deallocates its buffer.
///
/// Returns the number of buffers that were deallocated. Slots whose buffers are not
/// parked at the time their word is swapped are left untouched.
#[inline]
pub fn drain(&self) -> usize {
let mut drained = 0;
for (word_index, word) in self.words.iter().enumerate() {
// Take the whole word at once; every bit that was set is now owned by this call.
let mut claimed = word.swap(0, Ordering::Acquire);
while claimed != 0 {
let bit = claimed.trailing_zeros() as usize;
let slot = self.slot_index(word_index, bit);
let buffer = self.unpark(slot);
// SAFETY (NOTE(review)): presumably `deallocate` requires the layout the buffer
// was allocated with. Buffers made by `try_create` use `self.layout`; this assumes
// callers only return buffers obtained from this freelist — confirm.
unsafe { buffer.deallocate(self.layout) };
// Drop the lowest set bit and continue with the next claimed slot.
claimed &= claimed - 1;
drained += 1;
}
}
drained
}
/// Writes `buffer` into `slot`'s storage cell without publishing the slot.
///
/// Any previous contents of the cell are overwritten without being dropped.
///
/// # Panics
///
/// Panics if `slot` is not a valid index into the storage.
#[inline(always)]
fn park(&self, slot: u32, buffer: PooledBuffer) {
let cell = self
.storage
.get(slot as usize)
.expect("slot id must refer to an allocated slot");
// The two branches differ only in how the cell is accessed: the loom `UnsafeCell`
// exposes its pointer through `with_mut` instead of `get`.
cfg_if::cfg_if! {
if #[cfg(not(feature = "loom"))] {
// SAFETY (NOTE(review)): relies on the caller holding `slot` exclusively (its
// bit is not set), so no other thread accesses this cell — confirm at call sites.
unsafe {
(*cell.get()).write(buffer);
}
} else {
cell.with_mut(|ptr| {
// SAFETY (NOTE(review)): same exclusivity assumption as the non-loom branch.
unsafe { (*ptr).write(buffer) };
});
}
}
}
/// Moves the buffer out of `slot`'s storage cell and returns it.
///
/// The cell is logically uninitialized afterwards; the caller must have claimed the
/// slot (cleared its bit) so that the cell is not read a second time.
///
/// # Panics
///
/// Panics if `slot` is not a valid index into the storage.
#[inline(always)]
fn unpark(&self, slot: u32) -> PooledBuffer {
let cell = self
.storage
.get(slot as usize)
.expect("slot id must refer to an allocated slot");
// The two branches differ only in how the cell is accessed: the loom `UnsafeCell`
// exposes its pointer through `with_mut` instead of `get`.
cfg_if::cfg_if! {
if #[cfg(not(feature = "loom"))] {
// SAFETY (NOTE(review)): relies on the cell having been initialized by `park` and
// on the caller having just claimed the slot's bit — confirm at call sites.
unsafe { (*cell.get()).assume_init_read() }
} else {
cell.with_mut(|ptr| {
// SAFETY (NOTE(review)): same initialization/ownership assumption as above.
unsafe { (*ptr).assume_init_read() }
})
}
}
}
}
impl Drop for Freelist {
    /// Deallocates every buffer that is still parked when the freelist goes away.
    fn drop(&mut self) {
        // The number of drained buffers is of no interest during teardown.
        let _ = self.drain();
    }
}
/// Per-call scan plan derived from the calling thread's id: which bitmap word to start
/// at and which bit position to prefer inside each word.
struct SlotBitmapProbe {
// Index of the first word to scan (thread id masked by `word_mask`).
start_word: usize,
// Word count minus one; used to wrap the scan around the bitmap.
word_mask: usize,
// Rotation applied to a word before picking its lowest set bit.
bit_offset: u32,
}
// Global counter from which `SlotBitmapProbe::new` assigns thread ids. The loom build
// declares it through `loom::lazy_static!`; the regular build uses a plain static.
cfg_if::cfg_if! {
if #[cfg(not(feature = "loom"))] {
static NEXT_SLOT_BITMAP_THREAD_ID: AtomicUsize = AtomicUsize::new(0);
} else {
loom::lazy_static! {
static ref NEXT_SLOT_BITMAP_THREAD_ID: AtomicUsize = AtomicUsize::new(0);
}
}
}
// Per-thread cache of the id drawn from `NEXT_SLOT_BITMAP_THREAD_ID`; `None` until the
// thread builds its first probe. The loom build uses `loom::thread_local!`.
cfg_if::cfg_if! {
if #[cfg(not(feature = "loom"))] {
thread_local! {
static TLS_SLOT_BITMAP_THREAD_ID: Cell<Option<usize>> = const { Cell::new(None) };
}
} else {
loom::thread_local! {
static TLS_SLOT_BITMAP_THREAD_ID: Cell<Option<usize>> = Cell::new(None);
}
}
}
impl SlotBitmapProbe {
    /// Builds the probe for the calling thread.
    ///
    /// The first call on a thread draws a fresh id from the global counter and caches
    /// it in thread-local storage; later calls reuse the cached id.
    #[inline(always)]
    fn new(word_mask: usize, word_shift: u32) -> Self {
        let id = TLS_SLOT_BITMAP_THREAD_ID.with(|cached| match cached.get() {
            Some(existing) => existing,
            None => {
                let fresh = NEXT_SLOT_BITMAP_THREAD_ID.fetch_add(1, Ordering::Relaxed);
                cached.set(Some(fresh));
                fresh
            }
        });
        Self {
            start_word: id & word_mask,
            word_mask,
            bit_offset: Self::bit_offset(id, word_shift),
        }
    }

    /// Derives the preferred starting bit from the part of the thread id that is not
    /// used to pick the home word.
    ///
    /// The low `SLOT_BITMAP_WORD_SHIFT` bits of `thread_id >> word_shift` are
    /// bit-reversed, so successive threads sharing a home word get offsets that are
    /// far apart (0, 32, 16, 48, ...).
    #[inline(always)]
    const fn bit_offset(thread_id: usize, word_shift: u32) -> u32 {
        let collision_round = thread_id >> word_shift;
        let mirrored = collision_round.reverse_bits();
        (mirrored >> (usize::BITS - SLOT_BITMAP_WORD_SHIFT)) as u32
    }

    /// Returns the index of the `scanned`-th word of the scan, wrapping around the
    /// bitmap.
    #[inline(always)]
    const fn word_index(&self, scanned: usize) -> usize {
        self.word_mask & (self.start_word + scanned)
    }

    /// Returns the position of the first set bit of `word` at or after `bit_offset`,
    /// wrapping around to bit 0.
    #[inline(always)]
    const fn select_set_bit(&self, word: u64) -> usize {
        // After rotating, the preferred start sits at bit 0, so the trailing-zero
        // count is the distance from the offset to the first set bit.
        let distance = word.rotate_right(self.bit_offset).trailing_zeros();
        let wrap = SLOT_BITMAP_WORD_BITS as u32 - 1;
        ((distance + self.bit_offset) & wrap) as usize
    }

    /// Returns a mask of up to `limit` set bits of `word`, chosen in the same order
    /// `select_set_bit` would visit them.
    #[inline]
    const fn select_set_bits(&self, word: u64, limit: usize) -> u64 {
        let rotated = word.rotate_right(self.bit_offset);
        let mut pending = rotated;
        let mut picked = 0;
        while pending != 0 && picked < limit {
            // Strip the lowest remaining set bit.
            pending &= pending - 1;
            picked += 1;
        }
        // The bits stripped from `pending` are exactly the selected ones.
        (rotated ^ pending).rotate_left(self.bit_offset)
    }
}
#[cfg(test)]
pub(super) mod tests {
use super::*;
use commonware_utils::{NZUsize, NZU32};
use std::sync::{
atomic::{AtomicUsize as StdAtomicUsize, Ordering as AtomicOrdering},
Arc, Barrier,
};
/// Test helper: number of slots `freelist` has handed out through `try_create`.
pub fn created(freelist: &Freelist) -> usize {
    let counter = &freelist.created;
    counter.load(Ordering::Relaxed)
}
/// Test helper: number of slots currently marked free, counted across all bitmap words.
pub fn len(freelist: &Freelist) -> usize {
    let mut free = 0usize;
    for word in freelist.words.iter() {
        free += word.load(Ordering::Acquire).count_ones() as usize;
    }
    free
}
/// Test helper: number of words in the slot bitmap of `freelist`.
pub fn num_words(freelist: &Freelist) -> usize {
    let bitmap = &freelist.words;
    bitmap.len()
}
/// Layout shared by the unit tests: 64 bytes, 64-byte aligned.
// `match` is used instead of `unwrap` so the value can be built in a `const` context.
const TEST_LAYOUT: Layout = match Layout::from_size_align(64, 64) {
Ok(layout) => layout,
Err(_) => unreachable!(),
};
/// `try_create` hands out slot ids in order up to the capacity, returning buffers does
/// not free up creation budget, and `prefill` creates and parks every slot up front.
#[test]
fn test_freelist_try_create_tracks_capacity_and_prefill() {
let set = Freelist::new(NZU32!(2), NZUsize!(1), TEST_LAYOUT, false);
assert_eq!(created(&set), 0);
let (slot0, buffer0) = set.try_create(false).expect("first creation");
let (slot1, buffer1) = set.try_create(false).expect("second creation");
assert_eq!(slot0, 0);
assert_eq!(slot1, 1);
assert_eq!(created(&set), 2);
// Capacity is exhausted, so a third creation must fail.
assert!(set.try_create(false).is_none());
set.put(slot0, buffer0);
set.put(slot1, buffer1);
assert_eq!(len(&set), 2);
assert_eq!(created(&set), 2);
// Parking buffers does not lower the created count.
assert!(set.try_create(false).is_none());
let prefilled = Freelist::new(NZU32!(2), NZUsize!(1), TEST_LAYOUT, true);
assert_eq!(created(&prefilled), 2);
assert_eq!(len(&prefilled), 2);
}
/// After three slots are returned, three `take` calls yield each slot exactly once and
/// a fourth yields nothing.
#[test]
fn test_freelist_returns_each_slot_once() {
let set = Freelist::new(NZU32!(3), NZUsize!(1), TEST_LAYOUT, false);
let (slot0, buffer0) = set.try_create(false).unwrap();
let (slot1, buffer1) = set.try_create(false).unwrap();
let (slot2, buffer2) = set.try_create(false).unwrap();
assert_eq!([slot0, slot1, slot2], [0, 1, 2]);
set.put(slot0, buffer0);
set.put(slot1, buffer1);
set.put(slot2, buffer2);
let mut seen = [false; 3];
let mut taken = Vec::new();
for _ in 0..3 {
let (slot, buffer) = set.take().expect("slot should be available");
assert!(!seen[slot as usize]);
seen[slot as usize] = true;
taken.push((slot, buffer));
}
assert_eq!(len(&set), 0);
assert!(seen.into_iter().all(|seen| seen));
assert!(set.take().is_none());
// Hand the buffers back so they are parked again before the freelist is dropped.
for (slot, buffer) in taken {
set.put(slot, buffer);
}
}
/// For a table of `(capacity, parallelism, expected word count)` cases, the bitmap has
/// the expected power-of-two word count and `slot_word`/`slot_index` round-trip every
/// slot id.
#[test]
fn test_freelist_uses_striped_power_of_two_words() {
let cases = [
(1, 1, 1),
(2, 2, 2),
(3, 4, 2),
(4, 4, 4),
(12, 9, 8),
(16, 8, 8),
(64, 8, 8),
(512, 8, 8),
(513, 8, 16),
(4097, 8, 128),
];
for (capacity, parallelism, expected_words) in cases {
let set = Freelist::new(NZU32!(capacity), NZUsize!(parallelism), TEST_LAYOUT, false);
assert_eq!(num_words(&set), expected_words);
assert!(num_words(&set).is_power_of_two());
for slot in 0..capacity {
let (word_index, mask) = set.slot_word(slot);
let bit = mask.trailing_zeros() as usize;
assert!(word_index < expected_words);
assert_eq!(set.slot_index(word_index, bit), slot);
}
}
}
/// Exercises the three `put_batch` paths: an empty batch, a single entry (forwarded to
/// `put`) and a multi-entry batch, checking the free count and the taken slots each time.
#[test]
fn test_freelist_put_batch_handles_empty_single_and_multi_entry_paths() {
let set = Freelist::new(NZU32!(8), NZUsize!(8), TEST_LAYOUT, false);
// Empty batch: nothing becomes free.
set.put_batch(Vec::new());
assert_eq!(len(&set), 0);
let mut created = Vec::new();
for expected in 0..8 {
let (slot, buffer) = set.try_create(false).unwrap();
assert_eq!(slot, expected);
created.push(Some(buffer));
}
// Single-entry batch.
let buffer = created[3].take().unwrap();
set.put_batch(vec![(3, buffer)]);
assert_eq!(len(&set), 1);
let mut taken = Vec::new();
assert_eq!(
set.take_batch(1, |slot, buffer| taken.push((slot, buffer))),
1
);
assert_eq!(taken.len(), 1);
assert_eq!(taken[0].0, 3);
let single = taken.pop().expect("single entry was taken");
// Multi-entry batch.
let mut batch = Vec::new();
for slot in [1, 5, 7] {
let buffer = created[slot as usize].take().unwrap();
batch.push((slot, buffer));
}
set.put_batch(batch);
assert_eq!(len(&set), 3);
assert_eq!(
set.take_batch(3, |slot, buffer| taken.push((slot, buffer))),
3
);
let mut slots = taken.iter().map(|(slot, _)| *slot).collect::<Vec<_>>();
slots.sort_unstable();
assert_eq!(slots, vec![1, 5, 7]);
assert_eq!(len(&set), 0);
// Park every buffer again before the freelist is dropped.
set.put(single.0, single.1);
for (slot, buffer) in taken {
set.put(slot, buffer);
}
for (slot, buffer) in created.into_iter().enumerate() {
if let Some(buffer) = buffer {
set.put(slot as u32, buffer);
}
}
}
/// With more bitmap words than `INLINE_PUT_BATCH_MASKS`, `put_batch` takes its
/// heap-allocated mask path and still publishes exactly the batched slots.
#[test]
fn test_freelist_put_batch_uses_heap_masks_when_word_count_exceeds_inline_capacity() {
let set = Freelist::new(NZU32!(8193), NZUsize!(65), TEST_LAYOUT, false);
assert!(num_words(&set) > INLINE_PUT_BATCH_MASKS);
let mut created = Vec::new();
for expected in 0..8193 {
let (slot, buffer) = set.try_create(false).expect("slot");
assert_eq!(slot, expected);
created.push(Some(buffer));
}
let mut batch = Vec::new();
for slot in [0, 1, 64, 8192] {
let buffer = created[slot as usize].take().expect("slot buffer");
batch.push((slot, buffer));
}
set.put_batch(batch);
assert_eq!(len(&set), 4);
let mut taken = Vec::new();
// Asking for more than is available returns only the four free slots.
assert_eq!(
set.take_batch(8, |slot, buffer| taken.push((slot, buffer))),
4
);
let mut slots = taken.iter().map(|(slot, _)| *slot).collect::<Vec<_>>();
slots.sort_unstable();
assert_eq!(slots, vec![0, 1, 64, 8192]);
assert_eq!(len(&set), 0);
// Park every buffer again before the freelist is dropped.
for (slot, buffer) in taken {
set.put(slot, buffer);
}
for (slot, buffer) in created.into_iter().enumerate() {
if let Some(buffer) = buffer {
set.put(slot as u32, buffer);
}
}
}
/// `drain` reports (and releases) exactly the slots that are parked; a slot that is
/// still checked out is unaffected and can be returned afterwards.
#[test]
fn test_freelist_drain_returns_all_available_slots() {
let set = Freelist::new(NZU32!(4), NZUsize!(4), TEST_LAYOUT, false);
let (slot0, buffer0) = set.try_create(false).unwrap();
let (slot1, buffer1) = set.try_create(false).unwrap();
let (slot2, buffer2) = set.try_create(false).unwrap();
let (slot3, buffer3) = set.try_create(false).unwrap();
assert_eq!([slot0, slot1, slot2, slot3], [0, 1, 2, 3]);
// Slot 1 is deliberately kept out of the freelist during the drain.
set.put(slot0, buffer0);
set.put(slot2, buffer2);
set.put(slot3, buffer3);
assert_eq!(set.drain(), 3);
assert_eq!(len(&set), 0);
set.put(slot1, buffer1);
}
/// `take_batch` with `max` of 0, 1 and more than is available: it claims nothing, one
/// slot, and then only the remaining slots; further calls on the empty list return 0.
#[test]
fn test_freelist_take_batch_handles_zero_single_and_partial_fill() {
let set = Freelist::new(NZU32!(4), NZUsize!(4), TEST_LAYOUT, false);
for expected in [0, 1, 2] {
let (slot, buffer) = set.try_create(false).expect("slot");
assert_eq!(slot, expected);
set.put(slot, buffer);
}
let mut taken = Vec::new();
let mut record = |slot, buffer| taken.push((slot, buffer));
assert_eq!(set.take_batch(0, &mut record), 0);
assert_eq!(set.take_batch(1, &mut record), 1);
assert_eq!(set.take_batch(8, &mut record), 2);
assert_eq!(set.take_batch(8, &mut record), 0);
assert_eq!(set.take_batch(1, &mut record), 0);
assert_eq!(taken.len(), 3);
let mut slots = taken.iter().map(|(slot, _)| *slot).collect::<Vec<_>>();
slots.sort_unstable();
assert_eq!(slots, vec![0, 1, 2]);
// Park every buffer again before the freelist is dropped.
for (slot, buffer) in taken {
set.put(slot, buffer);
}
}
/// When the calling thread's home word alone can satisfy the request, `take_batch`
/// takes both slots from that word and leaves every other word untouched.
#[test]
fn test_freelist_take_batch_breaks_after_filling_target_in_home_word() {
// 16 prefilled slots over 8 words: two free bits (0 and 1) per word.
let set = Freelist::new(NZU32!(16), NZUsize!(8), TEST_LAYOUT, true);
let start_word = SlotBitmapProbe::new(set.word_mask, set.word_shift).word_index(0);
let slot0 = set.slot_index(start_word, 0);
let slot1 = set.slot_index(start_word, 1);
let mut taken = Vec::new();
assert_eq!(
set.take_batch(2, |slot, buffer| taken.push((slot, buffer))),
2
);
let mut slots = taken.iter().map(|(slot, _)| *slot).collect::<Vec<_>>();
slots.sort_unstable();
assert_eq!(slots, vec![slot0, slot1]);
assert_eq!(len(&set), 14);
for (slot, buffer) in taken {
set.put(slot, buffer);
}
}
/// `take_batch` stops inside a word once `max` slots are claimed: with three free bits
/// in the home word, a batch of two leaves the third for a following `take`.
#[test]
fn test_freelist_take_batch_stops_mid_word_when_limit_is_reached() {
// 24 prefilled slots over 8 words: three free bits (0, 1 and 2) per word.
let set = Freelist::new(NZU32!(24), NZUsize!(8), TEST_LAYOUT, true);
let start_word = SlotBitmapProbe::new(set.word_mask, set.word_shift).word_index(0);
let slots = [
set.slot_index(start_word, 0),
set.slot_index(start_word, 1),
set.slot_index(start_word, 2),
];
let mut taken = Vec::new();
assert_eq!(
set.take_batch(2, |slot, buffer| taken.push((slot, buffer))),
2
);
assert_eq!(len(&set), 22);
let remaining = set.take().expect("one slot should remain free");
// Together, the batch and the single take must cover exactly the home word's slots.
let mut seen = taken.iter().map(|(slot, _)| *slot).collect::<Vec<_>>();
seen.push(remaining.0);
seen.sort_unstable();
assert_eq!(seen, slots);
for (slot, buffer) in taken {
set.put(slot, buffer);
}
set.put(remaining.0, remaining.1);
}
/// `select_set_bit` and `select_set_bits` start searching at the probe's `bit_offset`,
/// honor the selection limit, and wrap around the end of the word.
#[test]
fn test_slot_bitmap_probe_selectors_respect_offset_and_limit() {
let word = (1u64 << 1) | (1u64 << 5) | (1u64 << 9) | (1u64 << 20);
let probe_0 = SlotBitmapProbe {
start_word: 0,
word_mask: 0,
bit_offset: 0,
};
let probe_6 = SlotBitmapProbe {
start_word: 0,
word_mask: 0,
bit_offset: 6,
};
assert_eq!(probe_0.select_set_bit(word), 1);
// With offset 6, bits 1 and 5 lie behind the start, so bit 9 comes first.
assert_eq!(probe_6.select_set_bit(word), 9);
let selected = probe_6.select_set_bits(word, 2);
assert_eq!(selected.count_ones(), 2);
assert_eq!(selected & !word, 0);
assert_eq!(selected, (1u64 << 9) | (1u64 << 20));
let probe_32 = SlotBitmapProbe {
start_word: 0,
word_mask: 0,
bit_offset: 32,
};
// Bit 40 is found first, then the search wraps around to bit 4.
let wrap_word = (1u64 << 4) | (1u64 << 40);
let selected = probe_32.select_set_bits(wrap_word, 2);
assert_eq!(selected, wrap_word);
}
/// Thread ids that map to the same home word (ids a multiple of the word count apart)
/// receive bit offsets in bit-reversed order, for both 8-word and 64-word bitmaps.
#[test]
fn test_slot_bitmap_probe_offsets_spread_home_word_collisions() {
assert_eq!(
[
SlotBitmapProbe::bit_offset(0, 3),
SlotBitmapProbe::bit_offset(8, 3),
SlotBitmapProbe::bit_offset(16, 3),
SlotBitmapProbe::bit_offset(24, 3),
SlotBitmapProbe::bit_offset(32, 3),
SlotBitmapProbe::bit_offset(40, 3),
SlotBitmapProbe::bit_offset(48, 3),
SlotBitmapProbe::bit_offset(56, 3),
],
[0, 32, 16, 48, 8, 40, 24, 56]
);
assert_eq!(
[
SlotBitmapProbe::bit_offset(0, 6),
SlotBitmapProbe::bit_offset(64, 6),
SlotBitmapProbe::bit_offset(128, 6),
SlotBitmapProbe::bit_offset(192, 6),
SlotBitmapProbe::bit_offset(256, 6),
SlotBitmapProbe::bit_offset(320, 6),
SlotBitmapProbe::bit_offset(384, 6),
SlotBitmapProbe::bit_offset(448, 6),
],
[0, 32, 16, 48, 8, 40, 24, 56]
);
}
/// Sixteen threads released by a barrier race to `take` the only free slot; exactly one
/// must succeed. The scenario is repeated 32 times to give the race a chance to occur.
#[test]
fn test_freelist_take_retries_after_losing_a_same_bit_race() {
for _ in 0..32 {
let set = Arc::new(Freelist::new(NZU32!(1), NZUsize!(1), TEST_LAYOUT, false));
let (slot, buffer) = set.try_create(false).unwrap();
assert_eq!(slot, 0);
set.put(slot, buffer);
let barrier = Arc::new(Barrier::new(16));
let successes = Arc::new(StdAtomicUsize::new(0));
// The winner sends its claimed entry back so the main thread can re-park it.
let (claimed_tx, claimed_rx) = std::sync::mpsc::channel();
let mut handles = Vec::new();
for _ in 0..16 {
let set = Arc::clone(&set);
let barrier = Arc::clone(&barrier);
let successes = Arc::clone(&successes);
let claimed_tx = claimed_tx.clone();
handles.push(std::thread::spawn(move || {
barrier.wait();
if let Some(entry) = set.take() {
successes.fetch_add(1, AtomicOrdering::Relaxed);
claimed_tx.send(entry).expect("send claimed entry");
}
}));
}
for handle in handles {
handle.join().expect("worker should not panic");
}
assert_eq!(successes.load(AtomicOrdering::Relaxed), 1);
assert_eq!(len(&set), 0);
let claimed = claimed_rx.recv().expect("one thread claimed the slot");
assert!(claimed_rx.try_recv().is_err());
set.put(claimed.0, claimed.1);
}
}
/// Dropping a freelist that still has parked buffers must complete without panicking
/// (the `Drop` impl drains them).
#[test]
fn test_freelist_drop_drains_remaining_buffers() {
let set = Freelist::new(NZU32!(2), NZUsize!(2), TEST_LAYOUT, false);
for expected in [0, 1] {
let (slot, buffer) = set.try_create(false).expect("slot");
assert_eq!(slot, expected);
set.put(slot, buffer);
}
drop(set);
}
}
#[cfg(all(test, feature = "loom"))]
mod loom_tests {
use super::*;
use commonware_utils::{sync::Mutex, NZUsize, NZU32};
use loom::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
thread,
};
/// Builds an empty (not prefilled) freelist with parallelism 1 and a 64-byte,
/// 64-byte-aligned layout; the tests call it with small capacities that fit one word.
fn single_word_freelist(capacity: u32) -> Freelist {
    let layout = Layout::from_size_align(64, 64).unwrap();
    Freelist::new(NZU32!(capacity), NZUsize!(1), layout, false)
}
/// Bitmap shapes the loom models are run against, named by how many words the bitmap
/// has and how many slots share each word.
#[derive(Clone, Copy, Debug)]
enum Geometry {
// Capacity 1: one word, one slot.
SingleWordSingleBit,
// Capacity 2, parallelism 1: one word holding two slots.
SingleWordMultiBit,
// Capacity 4, parallelism 4: four words with one slot each.
MultiWordSingleBit,
// Capacity 4, parallelism 2: two words with two slots each.
MultiWordMultiBit,
}
impl Geometry {
    /// Builds an empty freelist with this geometry's capacity and parallelism.
    fn freelist(self) -> Freelist {
        match self {
            Self::SingleWordSingleBit => single_word_freelist(1),
            Self::SingleWordMultiBit => single_word_freelist(2),
            Self::MultiWordSingleBit | Self::MultiWordMultiBit => {
                // Both striped shapes hold four slots; only the stripe count differs.
                let parallelism = if matches!(self, Self::MultiWordSingleBit) {
                    NZUsize!(4)
                } else {
                    NZUsize!(2)
                };
                let layout = Layout::from_size_align(64, 64).unwrap();
                Freelist::new(NZU32!(4), parallelism, layout, false)
            }
        }
    }

    /// Slot ids belonging to this geometry. For the two-word shape they are listed
    /// word by word (slots 0 and 2 share word 0, slots 1 and 3 share word 1).
    fn slots(self) -> &'static [u32] {
        const ONE: &[u32] = &[0];
        const TWO: &[u32] = &[0, 1];
        const FOUR_IN_ORDER: &[u32] = &[0, 1, 2, 3];
        const FOUR_BY_WORD: &[u32] = &[0, 2, 1, 3];
        match self {
            Self::SingleWordSingleBit => ONE,
            Self::SingleWordMultiBit => TWO,
            Self::MultiWordSingleBit => FOUR_IN_ORDER,
            Self::MultiWordMultiBit => FOUR_BY_WORD,
        }
    }

    /// Bit set with one bit per slot id of this geometry (bit `n` set for slot `n`).
    fn slot_mask(self) -> usize {
        let mut mask = 0usize;
        for &slot in self.slots() {
            mask |= 1usize << slot;
        }
        mask
    }
}
/// Every geometry.
const ALL_GEOMETRIES: [Geometry; 4] = [
Geometry::SingleWordSingleBit,
Geometry::SingleWordMultiBit,
Geometry::MultiWordSingleBit,
Geometry::MultiWordMultiBit,
];
/// Geometries with more than one slot, i.e. those where a batch can span several slots.
const BATCH_GEOMETRIES: [Geometry; 3] = [
Geometry::SingleWordMultiBit,
Geometry::MultiWordSingleBit,
Geometry::MultiWordMultiBit,
];
/// Geometries whose bitmap has more than one word.
const STRIPED_GEOMETRIES: [Geometry; 2] =
[Geometry::MultiWordSingleBit, Geometry::MultiWordMultiBit];
/// Geometries in which a bitmap word holds more than one slot.
const MULTI_BIT_GEOMETRIES: [Geometry; 2] =
[Geometry::SingleWordMultiBit, Geometry::MultiWordMultiBit];
/// Runs `test` as a separate loom model for each geometry, giving every model
/// execution a freshly built freelist of that geometry.
fn model<F>(geometries: &[Geometry], test: F)
where
    F: Fn(Geometry, Arc<Freelist>) + Clone + Send + Sync + 'static,
{
    geometries.iter().copied().for_each(|geometry| {
        // Each `loom::model` call needs its own owned copy of the scenario.
        let scenario = test.clone();
        loom::model(move || {
            let freelist = Arc::new(geometry.freelist());
            scenario(geometry, freelist);
        });
    });
}
/// Collects buffers taken during a model run and returns them to the freelist when
/// dropped (see the `Drop` impl).
struct Leases {
// Freelist the collected buffers are returned to.
freelist: Arc<Freelist>,
// `(slot, buffer)` pairs currently held on behalf of the test.
buffers: Mutex<Vec<(u32, PooledBuffer)>>,
}
impl Leases {
    /// Creates an empty lease holder bound to `freelist`.
    fn new(freelist: Arc<Freelist>) -> Arc<Self> {
        let buffers = Mutex::new(Vec::new());
        Arc::new(Self { freelist, buffers })
    }

    /// Creates every buffer the freelist can hold and returns them together with an
    /// empty lease holder for the same freelist.
    fn reserve(freelist: Arc<Freelist>) -> (Arc<Self>, Vec<(u32, PooledBuffer)>) {
        let created = Self::entries(&freelist);
        let leases = Self::new(freelist);
        (leases, created)
    }

    /// Calls `try_create` until the freelist reports it is exhausted, collecting the
    /// created `(slot, buffer)` pairs in creation order.
    fn entries(freelist: &Freelist) -> Vec<(u32, PooledBuffer)> {
        std::iter::from_fn(|| freelist.try_create(false)).collect()
    }

    /// Records a taken buffer so it is handed back when the holder is dropped.
    fn push(&self, slot: u32, buffer: PooledBuffer) {
        let mut held = self.buffers.lock();
        held.push((slot, buffer));
    }

    /// Like `push`, but first asserts that `slot` belongs to the `expected` bit set and
    /// has not been recorded in `seen` before.
    fn push_expected(
        &self,
        seen: &AtomicUsize,
        expected: usize,
        slot: u32,
        buffer: PooledBuffer,
    ) {
        let bit = 1usize << slot;
        assert_ne!(expected & bit, 0);
        // Mark the slot as seen; a bit that was already set means a duplicate claim.
        let earlier = seen.fetch_or(bit, Ordering::Relaxed);
        assert_eq!(earlier & bit, 0);
        self.push(slot, buffer);
    }
}
impl Drop for Leases {
    /// Returns every held buffer to the freelist, in the order they were recorded.
    fn drop(&mut self) {
        let mut held = self.buffers.lock();
        for (slot, buffer) in held.drain(..) {
            self.freelist.put(slot, buffer);
        }
    }
}
/// A reader spinning on `take` while a writer calls `put` must eventually receive
/// exactly the slot that was put, for every geometry.
#[test]
fn put_publishes_before_take() {
model(&ALL_GEOMETRIES, |geometry, freelist| {
let slot = geometry.slots()[0];
let (_, buffer) = freelist.try_create(false).unwrap();
let leases = Leases::new(freelist.clone());
let writer = thread::spawn({
let freelist = freelist.clone();
move || freelist.put(slot, buffer)
});
let reader = thread::spawn({
let freelist = freelist.clone();
let leases = leases.clone();
move || loop {
if let Some((taken, buffer)) = freelist.take() {
assert_eq!(taken, slot);
leases.push(taken, buffer);
break;
}
// Nothing published yet: let the writer make progress.
thread::yield_now();
}
});
writer.join().unwrap();
reader.join().unwrap();
});
}
/// Two threads `put` different slots of the same bitmap word concurrently; afterwards
/// both slots must be free and claimable exactly once.
#[test]
fn concurrent_puts_merge_disjoint_bits() {
loom::model(|| {
let freelist = Arc::new(single_word_freelist(2));
let seen = Arc::new(AtomicUsize::new(0));
let expected = 0b11;
let (leases, mut entries) = Leases::reserve(freelist.clone());
let (slot0, buffer0) = entries.pop().unwrap();
let (slot1, buffer1) = entries.pop().unwrap();
let first = thread::spawn({
let freelist = freelist.clone();
move || freelist.put(slot0, buffer0)
});
let second = thread::spawn({
let freelist = freelist.clone();
move || freelist.put(slot1, buffer1)
});
first.join().unwrap();
second.join().unwrap();
assert_eq!(
freelist.take_batch(2, |slot, buffer| {
leases.push_expected(&seen, expected, slot, buffer)
}),
2
);
assert_eq!(seen.load(Ordering::Relaxed), expected);
// Everything was claimed above, so nothing is left to drain.
assert_eq!(freelist.drain(), 0);
});
}
/// Two threads `put_batch` disjoint pairs of slots into the same bitmap word
/// concurrently; afterwards all four slots must be free and claimable exactly once.
#[test]
fn concurrent_put_batches_merge_disjoint_bits() {
loom::model(|| {
let freelist = Arc::new(single_word_freelist(4));
let seen = Arc::new(AtomicUsize::new(0));
let expected = 0b1111;
let (leases, mut entries) = Leases::reserve(freelist.clone());
// Split the four created entries into two batches of two.
let second_entries = entries.split_off(2);
let first = thread::spawn({
let freelist = freelist.clone();
move || freelist.put_batch(entries)
});
let second = thread::spawn({
let freelist = freelist.clone();
move || freelist.put_batch(second_entries)
});
first.join().unwrap();
second.join().unwrap();
assert_eq!(
freelist.take_batch(4, |slot, buffer| {
leases.push_expected(&seen, expected, slot, buffer)
}),
4
);
assert_eq!(seen.load(Ordering::Relaxed), expected);
assert_eq!(freelist.drain(), 0);
});
}
/// A `put` and a `take` run concurrently on a word that already has one free slot: the
/// taker must get a slot, and in the end both slots are claimed exactly once.
#[test]
fn put_and_take_compose_on_partially_free_word() {
loom::model(|| {
let freelist = Arc::new(single_word_freelist(2));
let (leases, mut entries) = Leases::reserve(freelist.clone());
let initial_entry = entries.pop().unwrap();
let writer_entry = entries.pop().unwrap();
assert!(entries.pop().is_none());
// One slot is free before the threads start.
freelist.put(initial_entry.0, initial_entry.1);
let seen = Arc::new(AtomicUsize::new(0));
let expected = 0b11;
let writer = thread::spawn({
let freelist = freelist.clone();
move || freelist.put(writer_entry.0, writer_entry.1)
});
let taker = thread::spawn({
let freelist = freelist.clone();
let seen = seen.clone();
let leases = leases.clone();
move || {
let (slot, buffer) = freelist.take().expect("slot 0 starts free");
leases.push_expected(&seen, expected, slot, buffer);
}
});
writer.join().unwrap();
taker.join().unwrap();
// Collect whichever slot the taker did not claim.
while let Some((slot, buffer)) = freelist.take() {
leases.push_expected(&seen, expected, slot, buffer);
}
assert_eq!(seen.load(Ordering::Relaxed), expected);
assert_eq!(freelist.drain(), 0);
});
}
/// Same scenario as the single-take variant, but the concurrent consumer uses
/// `take_batch(2, ..)`, which may claim one or both slots depending on the interleaving.
#[test]
fn put_and_take_batch_compose_on_partially_free_word() {
loom::model(|| {
let freelist = Arc::new(single_word_freelist(2));
let (leases, mut entries) = Leases::reserve(freelist.clone());
let initial_entry = entries.pop().unwrap();
let writer_entry = entries.pop().unwrap();
assert!(entries.pop().is_none());
// One slot is free before the threads start.
freelist.put(initial_entry.0, initial_entry.1);
let seen = Arc::new(AtomicUsize::new(0));
let expected = 0b11;
let writer = thread::spawn({
let freelist = freelist.clone();
move || freelist.put(writer_entry.0, writer_entry.1)
});
let batch_taker = thread::spawn({
let freelist = freelist.clone();
let seen = seen.clone();
let leases = leases.clone();
move || {
let count = freelist.take_batch(2, |slot, buffer| {
leases.push_expected(&seen, expected, slot, buffer);
});
// At least the initially free slot, at most both.
assert!((1..=2).contains(&count));
}
});
writer.join().unwrap();
batch_taker.join().unwrap();
// Collect whatever the batch taker did not claim.
while let Some((slot, buffer)) = freelist.take() {
leases.push_expected(&seen, expected, slot, buffer);
}
assert_eq!(seen.load(Ordering::Relaxed), expected);
assert_eq!(freelist.drain(), 0);
});
}
/// A two-entry `put_batch` races with `drain` on a word that already has one free slot.
/// The concurrent drain must see either only the initial slot or all three, and the
/// total drained across both drains must be three.
#[test]
fn put_batch_and_drain_compose_on_partially_free_word() {
loom::model(|| {
let freelist = Arc::new(single_word_freelist(3));
let mut entries = Leases::entries(&freelist);
let writer_entry0 = entries.pop().unwrap();
let writer_entry1 = entries.pop().unwrap();
let initial_entry = entries.pop().unwrap();
assert!(entries.pop().is_none());
freelist.put(initial_entry.0, initial_entry.1);
let drained = Arc::new(AtomicUsize::new(0));
let writer = thread::spawn({
let freelist = freelist.clone();
move || freelist.put_batch([writer_entry0, writer_entry1])
});
let drainer = thread::spawn({
let freelist = freelist.clone();
let drained = drained.clone();
move || {
let count = freelist.drain();
// The batch publishes both of its bits with a single `fetch_or` on the one
// word, so the drain can never observe just one of the two batched slots.
assert!(matches!(count, 1 | 3));
drained.store(count, Ordering::Relaxed);
}
});
writer.join().unwrap();
drainer.join().unwrap();
let total = drained.load(Ordering::Relaxed) + freelist.drain();
assert_eq!(total, 3);
assert_eq!(freelist.drain(), 0);
});
}
/// Two threads race to `take` a single free slot; `push_expected` asserts that the slot
/// is never handed out twice, and afterwards it must have been claimed.
#[test]
fn two_takers_cannot_claim_one_slot() {
loom::model(|| {
let freelist = Arc::new(single_word_freelist(2));
// Only one of the two possible slots is ever created and freed.
let (_, buffer) = freelist.try_create(false).unwrap();
freelist.put(0, buffer);
let seen = Arc::new(AtomicUsize::new(0));
let expected = 0b1;
let mut handles = Vec::new();
let leases = Leases::new(freelist.clone());
for _ in 0..2 {
handles.push(thread::spawn({
let freelist = freelist.clone();
let seen = seen.clone();
let leases = leases.clone();
move || {
if let Some((slot, buffer)) = freelist.take() {
leases.push_expected(&seen, expected, slot, buffer);
}
}
}));
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(seen.load(Ordering::Relaxed), expected);
});
}
/// Two threads spin on `take` for the only slot. The first to succeed puts the slot
/// straight back, so the other thread must be able to claim the republished slot;
/// exactly two transfers happen in total.
#[test]
fn stale_candidate_can_claim_republished_same_slot() {
loom::model(|| {
let freelist = Arc::new(single_word_freelist(1));
let (leases, mut entries) = Leases::reserve(freelist.clone());
let entry = entries.pop().unwrap();
assert!(entries.pop().is_none());
freelist.put(entry.0, entry.1);
let transfers = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for _ in 0..2 {
handles.push(thread::spawn({
let freelist = freelist.clone();
let transfers = transfers.clone();
let leases = leases.clone();
move || loop {
if let Some((slot, buffer)) = freelist.take() {
assert_eq!(slot, 0);
let transfer = transfers.fetch_add(1, Ordering::Relaxed) + 1;
if transfer == 1 {
// First claimant republishes the slot for the other thread.
freelist.put(slot, buffer);
} else {
// Second claimant keeps it until the leases are dropped.
leases.push(slot, buffer);
}
break;
}
thread::yield_now();
}
}));
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(transfers.load(Ordering::Relaxed), 2);
assert_eq!(freelist.drain(), 0);
});
}
/// Two slots are published by one `put_batch`, then two threads each `take` once; both
/// slots must be claimed, each exactly once.
// NOTE(review): presumably this targets the case where one taker's `fetch_and` lands
// between the batch's publishing `fetch_or` and the other taker's claim — confirm intent.
#[test]
fn batch_claims_survive_intervening_rmw_sequence() {
loom::model(|| {
let freelist = Arc::new(single_word_freelist(2));
let (leases, entries) = Leases::reserve(freelist.clone());
freelist.put_batch(entries);
let seen = Arc::new(AtomicUsize::new(0));
let expected = 0b11;
let mut handles = Vec::new();
for _ in 0..2 {
handles.push(thread::spawn({
let freelist = freelist.clone();
let seen = seen.clone();
let leases = leases.clone();
move || {
if let Some((slot, buffer)) = freelist.take() {
leases.push_expected(&seen, expected, slot, buffer);
}
}
}));
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(seen.load(Ordering::Relaxed), expected);
});
}
/// A `take_batch` for all slots races with a single `take`. Between them every slot is
/// claimed exactly once, and the count returned by `take_batch` matches the number of
/// callback invocations.
#[test]
fn take_and_take_batch_do_not_duplicate_slots() {
model(&BATCH_GEOMETRIES, |geometry, freelist| {
let slots = geometry.slots();
let expected = geometry.slot_mask();
let (leases, entries) = Leases::reserve(freelist.clone());
freelist.put_batch(entries);
let seen = Arc::new(AtomicUsize::new(0));
let batch_count = Arc::new(AtomicUsize::new(0));
let batch_callbacks = Arc::new(AtomicUsize::new(0));
let batch_taker = thread::spawn({
let freelist = freelist.clone();
let seen = seen.clone();
let batch_count = batch_count.clone();
let batch_callbacks = batch_callbacks.clone();
let leases = leases.clone();
move || {
let count = freelist.take_batch(slots.len(), |slot, buffer| {
batch_callbacks.fetch_add(1, Ordering::Relaxed);
leases.push_expected(&seen, expected, slot, buffer);
});
batch_count.store(count, Ordering::Relaxed);
}
});
let single_taker = thread::spawn({
let freelist = freelist.clone();
let seen = seen.clone();
let leases = leases.clone();
move || {
if let Some((slot, buffer)) = freelist.take() {
leases.push_expected(&seen, expected, slot, buffer);
}
}
});
batch_taker.join().unwrap();
single_taker.join().unwrap();
assert_eq!(seen.load(Ordering::Relaxed), expected);
assert!(batch_count.load(Ordering::Relaxed) <= slots.len());
// The reported count and the number of callbacks must agree.
assert_eq!(
batch_count.load(Ordering::Relaxed),
batch_callbacks.load(Ordering::Relaxed)
);
});
}
/// Two threads each run `take_batch` for all slots on a fully free list; together they
/// must claim every slot exactly once, and their returned counts must add up to the
/// number of slots.
#[test]
fn two_take_batches_do_not_duplicate_slots() {
model(&MULTI_BIT_GEOMETRIES, |geometry, freelist| {
let slots = geometry.slots();
let expected = geometry.slot_mask();
let (leases, entries) = Leases::reserve(freelist.clone());
freelist.put_batch(entries);
let seen = Arc::new(AtomicUsize::new(0));
let total = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for _ in 0..2 {
handles.push(thread::spawn({
let freelist = freelist.clone();
let seen = seen.clone();
let total = total.clone();
let leases = leases.clone();
move || {
let count = freelist.take_batch(slots.len(), |slot, buffer| {
leases.push_expected(&seen, expected, slot, buffer);
});
total.fetch_add(count, Ordering::Relaxed);
}
}));
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(seen.load(Ordering::Relaxed), expected);
assert_eq!(total.load(Ordering::Relaxed), slots.len());
});
}
/// Runs two concurrent `take_batch(2, ..)` calls against the freelist built by
/// `single_word_freelist(3)` after populating it via `put_batch`.
///
/// After both threads join, the `seen` accumulator must equal `0b111`, the two
/// return values must sum to 3, and a final `drain` must report 0.
///
/// NOTE(review): `single_word_freelist` is defined outside this view;
/// presumably it yields three slots sharing one bitmap word — confirm.
#[test]
fn two_take_batches_continue_after_losing_selected_bits() {
    loom::model(|| {
        let freelist = Arc::new(single_word_freelist(3));
        let (leases, entries) = Leases::reserve(freelist.clone());
        freelist.put_batch(entries);

        let seen = Arc::new(AtomicUsize::new(0));
        let total = Arc::new(AtomicUsize::new(0));
        let expected = 0b111;

        // `collect` is eager, so both threads are spawned before any join.
        let takers: Vec<_> = (0..2)
            .map(|_| {
                let pool = Arc::clone(&freelist);
                let observed = Arc::clone(&seen);
                let claimed_total = Arc::clone(&total);
                let held = leases.clone();
                thread::spawn(move || {
                    let claimed = pool.take_batch(2, |slot, buffer| {
                        held.push_expected(&observed, expected, slot, buffer);
                    });
                    claimed_total.fetch_add(claimed, Ordering::Relaxed);
                })
            })
            .collect();

        takers
            .into_iter()
            .for_each(|taker| taker.join().unwrap());

        assert_eq!(seen.load(Ordering::Relaxed), expected);
        assert_eq!(total.load(Ordering::Relaxed), 3);
        assert_eq!(freelist.drain(), 0);
    });
}
/// Runs a `put_batch` writer concurrently with a reader that repeatedly calls
/// `take_batch` until the `seen` accumulator equals the geometry's slot mask.
///
/// The reader yields whenever a batch call claims nothing. After both threads
/// join, `seen` must equal the slot mask.
#[test]
fn put_batch_publishes_to_take_batch() {
    model(&BATCH_GEOMETRIES, |geometry, freelist| {
        let seen = Arc::new(AtomicUsize::new(0));
        let slots = geometry.slots();
        let expected = geometry.slot_mask();
        let (leases, entries) = Leases::reserve(freelist.clone());

        let writer = {
            let pool = freelist.clone();
            thread::spawn(move || pool.put_batch(entries))
        };

        let reader = {
            let pool = freelist.clone();
            let observed = Arc::clone(&seen);
            let held = leases.clone();
            thread::spawn(move || loop {
                // Stop once every expected slot bit has been recorded.
                if observed.load(Ordering::Relaxed) == expected {
                    break;
                }
                let claimed = pool.take_batch(slots.len(), |slot, buffer| {
                    held.push_expected(&observed, expected, slot, buffer);
                });
                // Nothing visible yet: let the writer make progress.
                if claimed == 0 {
                    thread::yield_now();
                }
            })
        };

        writer.join().unwrap();
        reader.join().unwrap();

        assert_eq!(seen.load(Ordering::Relaxed), expected);
    });
}
/// Runs a single `put` concurrently with a thread that calls `drain` in a loop
/// until it reports a non-zero count.
///
/// Any non-zero drain result must be exactly 1. After both threads join, the
/// recorded count must be 1 and a final `drain` must report 0.
#[test]
fn put_publishes_to_drain() {
    model(&ALL_GEOMETRIES, |geometry, freelist| {
        let drained = Arc::new(AtomicUsize::new(0));
        let slot = geometry.slots()[0];
        let (_, buffer) = freelist.try_create(false).unwrap();

        let writer = {
            let pool = freelist.clone();
            thread::spawn(move || pool.put(slot, buffer))
        };

        let drainer = {
            let pool = freelist.clone();
            let recorded = Arc::clone(&drained);
            thread::spawn(move || {
                while recorded.load(Ordering::Relaxed) == 0 {
                    match pool.drain() {
                        // Nothing published yet: let the writer run.
                        0 => thread::yield_now(),
                        count => {
                            assert_eq!(count, 1);
                            recorded.store(count, Ordering::Relaxed);
                        }
                    }
                }
            })
        };

        writer.join().unwrap();
        drainer.join().unwrap();

        assert_eq!(drained.load(Ordering::Relaxed), 1);
        assert_eq!(freelist.drain(), 0);
    });
}
/// Runs a `put_batch` writer concurrently with a thread that calls `drain`
/// repeatedly until the accumulated count reaches the geometry's slot count.
///
/// The running total must never exceed the slot count. After both threads
/// join, the total must equal the slot count and a final `drain` must
/// report 0.
#[test]
fn put_batch_publishes_to_drain() {
    model(&BATCH_GEOMETRIES, |geometry, freelist| {
        let drained = Arc::new(AtomicUsize::new(0));
        let slots = geometry.slots();
        let expected = slots.len();
        let entries = Leases::entries(&freelist);

        let writer = {
            let pool = freelist.clone();
            thread::spawn(move || pool.put_batch(entries))
        };

        let drainer = {
            let pool = freelist.clone();
            let running = Arc::clone(&drained);
            thread::spawn(move || {
                while running.load(Ordering::Relaxed) < expected {
                    match pool.drain() {
                        // Nothing published yet: let the writer run.
                        0 => thread::yield_now(),
                        count => {
                            let previous = running.fetch_add(count, Ordering::Relaxed);
                            assert!(previous + count <= expected);
                        }
                    }
                }
            })
        };

        writer.join().unwrap();
        drainer.join().unwrap();

        assert_eq!(drained.load(Ordering::Relaxed), expected);
        assert_eq!(freelist.drain(), 0);
    });
}
/// Runs a writer that issues one `put` per reserved entry concurrently with a
/// reader that calls `take` until the `seen` accumulator equals the geometry's
/// slot mask.
///
/// The reader yields whenever `take` returns `None`. After both threads join,
/// `seen` must equal the slot mask and a final `drain` must report 0.
#[test]
fn puts_and_take_scan_across_stripes() {
    model(&STRIPED_GEOMETRIES, |geometry, freelist| {
        let seen = Arc::new(AtomicUsize::new(0));
        let expected = geometry.slot_mask();
        let (leases, entries) = Leases::reserve(freelist.clone());

        let writer = {
            let pool = freelist.clone();
            thread::spawn(move || {
                for (slot, buffer) in entries {
                    pool.put(slot, buffer);
                }
            })
        };

        let reader = {
            let pool = freelist.clone();
            let observed = Arc::clone(&seen);
            let held = leases.clone();
            thread::spawn(move || {
                while observed.load(Ordering::Relaxed) != expected {
                    match pool.take() {
                        Some((slot, buffer)) => {
                            held.push_expected(&observed, expected, slot, buffer);
                        }
                        // Nothing visible yet: let the writer make progress.
                        None => thread::yield_now(),
                    }
                }
            })
        };

        writer.join().unwrap();
        reader.join().unwrap();

        assert_eq!(seen.load(Ordering::Relaxed), expected);
        assert_eq!(freelist.drain(), 0);
    });
}
/// Races one `drain` call against one `take` call on a freelist that was
/// populated via `put_batch` beforehand.
///
/// After both threads join, a further `drain` must report 0, and the drained
/// count plus the number of bits set in the `taken` accumulator must equal the
/// geometry's slot count.
#[test]
fn drain_and_take_do_not_duplicate_or_lose_slots() {
    model(&BATCH_GEOMETRIES, |geometry, freelist| {
        let slots = geometry.slots();
        let expected = slots.len();
        let expected_mask = geometry.slot_mask();
        let (leases, entries) = Leases::reserve(freelist.clone());
        freelist.put_batch(entries);

        let drained = Arc::new(AtomicUsize::new(0));
        let taken = Arc::new(AtomicUsize::new(0));

        let drainer = {
            let pool = freelist.clone();
            let drained_out = Arc::clone(&drained);
            thread::spawn(move || {
                let count = pool.drain();
                drained_out.store(count, Ordering::Relaxed);
            })
        };

        let taker = {
            let pool = freelist.clone();
            let taken_bits = Arc::clone(&taken);
            let held = leases.clone();
            thread::spawn(move || {
                if let Some((slot, buffer)) = pool.take() {
                    held.push_expected(&taken_bits, expected_mask, slot, buffer);
                }
            })
        };

        drainer.join().unwrap();
        taker.join().unwrap();

        assert_eq!(freelist.drain(), 0);

        // Every slot must be accounted for exactly once across both threads.
        let drained_count = drained.load(Ordering::Relaxed);
        let taken_count = taken.load(Ordering::Relaxed).count_ones() as usize;
        assert_eq!(drained_count + taken_count, expected);
    });
}
/// Runs two concurrent `drain` calls against a freelist that was populated via
/// `put_batch` beforehand.
///
/// After both threads join, the two drain counts must sum to the geometry's
/// slot count and a further `drain` must report 0.
#[test]
fn two_drains_do_not_duplicate_or_lose_slots() {
    model(&BATCH_GEOMETRIES, |geometry, freelist| {
        let slots = geometry.slots();
        let expected = slots.len();
        let entries = Leases::entries(&freelist);
        freelist.put_batch(entries);

        let total = Arc::new(AtomicUsize::new(0));

        // `collect` is eager, so both threads are spawned before any join.
        let drainers: Vec<_> = (0..2)
            .map(|_| {
                let pool = freelist.clone();
                let drained_total = Arc::clone(&total);
                thread::spawn(move || {
                    let count = pool.drain();
                    drained_total.fetch_add(count, Ordering::Relaxed);
                })
            })
            .collect();

        for drainer in drainers {
            drainer.join().unwrap();
        }

        assert_eq!(total.load(Ordering::Relaxed), expected);
        assert_eq!(freelist.drain(), 0);
    });
}
/// Races one `drain` call against one `take_batch` call (asking for every
/// slot) on a freelist that was populated via `put_batch` beforehand.
///
/// After both threads join, a further `drain` must report 0, the `take_batch`
/// return value must equal the number of bits set in the `taken_slots`
/// accumulator, and the drained count plus the taken count must equal the
/// geometry's slot count.
#[test]
fn drain_and_take_batch_do_not_duplicate_or_lose_slots() {
    model(&BATCH_GEOMETRIES, |geometry, freelist| {
        let slots = geometry.slots();
        let expected = slots.len();
        let expected_mask = geometry.slot_mask();
        let (leases, entries) = Leases::reserve(freelist.clone());
        freelist.put_batch(entries);

        let drained = Arc::new(AtomicUsize::new(0));
        let taken = Arc::new(AtomicUsize::new(0));
        let taken_slots = Arc::new(AtomicUsize::new(0));

        let drainer = {
            let pool = freelist.clone();
            let drained_out = Arc::clone(&drained);
            thread::spawn(move || {
                let count = pool.drain();
                drained_out.store(count, Ordering::Relaxed);
            })
        };

        let batch_taker = {
            let pool = freelist.clone();
            let taken_out = Arc::clone(&taken);
            let slot_bits = Arc::clone(&taken_slots);
            let held = leases.clone();
            thread::spawn(move || {
                let claimed = pool.take_batch(expected, |slot, buffer| {
                    held.push_expected(&slot_bits, expected_mask, slot, buffer);
                });
                taken_out.store(claimed, Ordering::Relaxed);
            })
        };

        drainer.join().unwrap();
        batch_taker.join().unwrap();

        assert_eq!(freelist.drain(), 0);

        // The batch return value must agree with the slots actually recorded.
        let recorded = taken_slots.load(Ordering::Relaxed).count_ones() as usize;
        assert_eq!(taken.load(Ordering::Relaxed), recorded);

        // Every slot must be accounted for exactly once across both threads.
        let accounted = drained.load(Ordering::Relaxed) + taken.load(Ordering::Relaxed);
        assert_eq!(accounted, expected);
    });
}
}