use crossbeam_epoch::{self as epoch, Atomic, Owned};
use crossbeam_utils::{Backoff, CachePadded};
use std::cell::{Cell, UnsafeCell};
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop};
use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
use std::{cmp, fmt, ptr};
// Minimum capacity of a worker queue buffer.
const MIN_CAP: usize = 64;
// Maximum number of tasks that can be stolen in a single batch.
const MAX_BATCH: usize = 32;
// If a buffer of at least this many bytes is retired, the thread-local epoch garbage is
// flushed eagerly so the old buffer is reclaimed sooner.
const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
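/// A fixed-capacity ring buffer holding the tasks of a worker queue.
///
/// The capacity is always a power of two so that indices can wrap with a bitmask.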
struct Buffer<T> {
ptr: *mut T,
cap: usize,
}
unsafe impl<T> Send for Buffer<T> {}
impl<T> Buffer<T> {
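    /// Allocates a new buffer with the given power-of-two capacity. The slots are uninitialized.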
fn alloc(cap: usize) -> Buffer<T> {
debug_assert_eq!(cap, cap.next_power_of_two());
let mut v = Vec::with_capacity(cap);
let ptr = v.as_mut_ptr();
mem::forget(v);
Buffer { ptr, cap }
}
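    /// Deallocates the buffer memory without dropping any tasks still stored in it.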
unsafe fn dealloc(self) {
drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
}
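    /// Returns a pointer to the slot at `index`, reduced modulo the capacity.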
unsafe fn at(&self, index: isize) -> *mut T {
self.ptr.offset(index & (self.cap - 1) as isize)
}
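    /// Writes `task` into the slot at `index`.
    ///
    /// The write is volatile because a stealer may be racing on the same slot; the race is
    /// resolved afterwards by a compare-and-swap on the front index.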
unsafe fn write(&self, index: isize, task: T) {
ptr::write_volatile(self.at(index), task)
}
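    /// Reads the task at `index`.
    ///
    /// The read is volatile because the owner may be concurrently overwriting the slot; a stale
    /// read is discarded when the subsequent compare-and-swap on the front index fails.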
unsafe fn read(&self, index: isize) -> T {
ptr::read_volatile(self.at(index))
}
}
impl<T> Clone for Buffer<T> {
fn clone(&self) -> Buffer<T> {
Buffer {
ptr: self.ptr,
cap: self.cap,
}
}
}
impl<T> Copy for Buffer<T> {}
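/// Queue state shared between a `Worker` and its `Stealer`s: the front and back indices and the
/// currently installed buffer.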
struct Inner<T> {
front: AtomicIsize,
back: AtomicIsize,
buffer: CachePadded<Atomic<Buffer<T>>>,
}
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
let b = self.back.load(Ordering::Relaxed);
let f = self.front.load(Ordering::Relaxed);
unsafe {
let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
let mut i = f;
while i != b {
ptr::drop_in_place(buffer.deref().at(i));
i = i.wrapping_add(1);
}
buffer.into_owned().into_box().dealloc();
}
}
}
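/// The order in which a worker pops tasks from its own queue.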
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Flavor {
Fifo,
Lifo,
}
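/// A single-owner worker queue of tasks.
///
/// The owning thread pushes and pops tasks locally; other threads take tasks through
/// `Stealer` handles created with [`Worker::stealer`]. The pop order is chosen at
/// construction time: [`Worker::new_fifo`] or [`Worker::new_lifo`].
///
/// A minimal usage sketch (the `deque` module path is an assumption; adjust the `use`
/// to wherever this module is exposed):
///
/// ```ignore
/// use deque::Worker;
///
/// let w = Worker::new_fifo();
/// let s = w.stealer();
///
/// w.push(1);
/// w.push(2);
///
/// assert_eq!(s.steal().success(), Some(1));
/// assert_eq!(w.pop(), Some(2));
/// ```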
pub struct Worker<T> {
inner: Arc<CachePadded<Inner<T>>>,
buffer: Cell<Buffer<T>>,
flavor: Flavor,
    _marker: PhantomData<*mut ()>,
}
unsafe impl<T: Send> Send for Worker<T> {}
impl<T> Worker<T> {
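    /// Creates a FIFO worker queue: the owner pushes at the back and pops from the front.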
pub fn new_fifo() -> Worker<T> {
let buffer = Buffer::alloc(MIN_CAP);
let inner = Arc::new(CachePadded::new(Inner {
front: AtomicIsize::new(0),
back: AtomicIsize::new(0),
buffer: CachePadded::new(Atomic::new(buffer)),
}));
Worker {
inner,
buffer: Cell::new(buffer),
flavor: Flavor::Fifo,
_marker: PhantomData,
}
}
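    /// Creates a LIFO worker queue: the owner pushes and pops at the back.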
pub fn new_lifo() -> Worker<T> {
let buffer = Buffer::alloc(MIN_CAP);
let inner = Arc::new(CachePadded::new(Inner {
front: AtomicIsize::new(0),
back: AtomicIsize::new(0),
buffer: CachePadded::new(Atomic::new(buffer)),
}));
Worker {
inner,
buffer: Cell::new(buffer),
flavor: Flavor::Lifo,
_marker: PhantomData,
}
}
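    /// Returns a best-effort snapshot of the number of tasks currently in the queue.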
pub fn worker_run_queue_size(&self) -> usize {
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::SeqCst);
match b.wrapping_sub(f) {
x if x <= 0 => 0_usize,
y => y as usize,
}
}
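    /// Creates a `Stealer` handle that other threads can use to take tasks from this queue.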
pub fn stealer(&self) -> Stealer<T> {
Stealer {
inner: self.inner.clone(),
flavor: self.flavor,
}
}
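    /// Replaces the buffer with a new one of capacity `new_cap`, copying all live tasks over.
    ///
    /// The old buffer is retired through the epoch GC so that concurrent stealers holding a
    /// reference to it can still finish their reads safely.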
#[cold]
unsafe fn resize(&self, new_cap: usize) {
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::Relaxed);
let buffer = self.buffer.get();
let new = Buffer::alloc(new_cap);
let mut i = f;
while i != b {
ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
i = i.wrapping_add(1);
}
let guard = &epoch::pin();
self.buffer.replace(new);
let old =
self.inner
.buffer
.swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
guard.flush();
}
}
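    /// Grows the buffer, if necessary, so that at least `reserve_cap` more tasks fit without resizing.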
fn reserve(&self, reserve_cap: usize) {
if reserve_cap > 0 {
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::SeqCst);
let len = b.wrapping_sub(f) as usize;
let cap = self.buffer.get().cap;
if cap.saturating_sub(len) < reserve_cap {
let mut new_cap = cap * 2;
while new_cap.saturating_sub(len) < reserve_cap {
new_cap = new_cap.wrapping_mul(2);
}
unsafe {
self.resize(new_cap);
}
}
}
}
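    /// Returns `true` if the queue appears empty at the time of the call.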
pub fn is_empty(&self) -> bool {
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::SeqCst);
b.wrapping_sub(f) <= 0
}
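    /// Pushes a task onto the back of the queue, growing the buffer if it is full.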
pub fn push(&self, task: T) {
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::Acquire);
let mut buffer = self.buffer.get();
let len = b.wrapping_sub(f);
if len >= buffer.cap as isize {
unsafe {
self.resize(2 * buffer.cap);
}
buffer = self.buffer.get();
}
unsafe {
buffer.write(b, task);
}
atomic::fence(Ordering::Release);
self.inner.back.store(b.wrapping_add(1), Ordering::Release);
}
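    /// Pops a task from the queue: from the front for a FIFO worker, from the back for a LIFO
    /// worker. Returns `None` if the queue is empty.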
pub fn pop(&self) -> Option<T> {
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::Relaxed);
let len = b.wrapping_sub(f);
if len <= 0 {
return None;
}
match self.flavor {
Flavor::Fifo => {
let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
let new_f = f.wrapping_add(1);
if b.wrapping_sub(new_f) < 0 {
self.inner.front.store(f, Ordering::Relaxed);
return None;
}
unsafe {
let buffer = self.buffer.get();
let task = buffer.read(f);
if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
self.resize(buffer.cap / 2);
}
Some(task)
}
}
Flavor::Lifo => {
let b = b.wrapping_sub(1);
self.inner.back.store(b, Ordering::Relaxed);
atomic::fence(Ordering::SeqCst);
let f = self.inner.front.load(Ordering::Relaxed);
let len = b.wrapping_sub(f);
if len < 0 {
self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
None
} else {
let buffer = self.buffer.get();
let mut task = unsafe { Some(buffer.read(b)) };
if len == 0 {
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
mem::forget(task.take());
}
self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
} else {
if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
unsafe {
self.resize(buffer.cap / 2);
}
}
}
task
}
}
}
}
}
impl<T> fmt::Debug for Worker<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Worker { .. }")
}
}
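/// A handle for stealing tasks from a `Worker` on other threads.
///
/// Stealers can be cloned and shared freely. Every stealing method races with the queue owner
/// and with other stealers, so results are reported through [`Steal`].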
pub struct Stealer<T> {
inner: Arc<CachePadded<Inner<T>>>,
flavor: Flavor,
}
unsafe impl<T: Send> Send for Stealer<T> {}
unsafe impl<T: Send> Sync for Stealer<T> {}
impl<T> Stealer<T> {
pub fn is_empty(&self) -> bool {
let f = self.inner.front.load(Ordering::Acquire);
atomic::fence(Ordering::SeqCst);
let b = self.inner.back.load(Ordering::Acquire);
b.wrapping_sub(f) <= 0
}
pub fn run_queue_size(&self) -> usize {
let b = self.inner.back.load(Ordering::Acquire);
atomic::fence(Ordering::SeqCst);
let f = self.inner.front.load(Ordering::Acquire);
match b.wrapping_sub(f) {
x if x <= 0 => 0_usize,
y => y as usize,
}
}
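    /// Steals a single task from the front of the queue.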
pub fn steal(&self) -> Steal<T> {
let f = self.inner.front.load(Ordering::Acquire);
if epoch::is_pinned() {
atomic::fence(Ordering::SeqCst);
}
let guard = &epoch::pin();
let b = self.inner.back.load(Ordering::Acquire);
if b.wrapping_sub(f) <= 0 {
return Steal::Empty;
}
let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
let task = unsafe { buffer.deref().read(f) };
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
mem::forget(task);
return Steal::Retry;
}
Steal::Success(task)
}
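    /// Steals a batch of tasks (at most half of the queue, capped at `MAX_BATCH`) and pushes
    /// them into `dest`.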
pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
let mut f = self.inner.front.load(Ordering::Acquire);
if epoch::is_pinned() {
atomic::fence(Ordering::SeqCst);
}
let guard = &epoch::pin();
let b = self.inner.back.load(Ordering::Acquire);
let len = b.wrapping_sub(f);
if len <= 0 {
return Steal::Empty;
}
let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH);
dest.reserve(batch_size);
let mut batch_size = batch_size as isize;
let dest_buffer = dest.buffer.get();
let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
match self.flavor {
Flavor::Fifo => {
match dest.flavor {
Flavor::Fifo => {
for i in 0..batch_size {
unsafe {
let task = buffer.deref().read(f.wrapping_add(i));
dest_buffer.write(dest_b.wrapping_add(i), task);
}
}
}
Flavor::Lifo => {
for i in 0..batch_size {
unsafe {
let task = buffer.deref().read(f.wrapping_add(i));
dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
}
}
}
}
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
return Steal::Retry;
}
dest_b = dest_b.wrapping_add(batch_size);
}
Flavor::Lifo => {
let size = batch_size;
for i in 0..size {
if i > 0 {
atomic::fence(Ordering::SeqCst);
let b = self.inner.back.load(Ordering::Acquire);
if b.wrapping_sub(f) <= 0 {
batch_size = i;
break;
}
}
let task = unsafe { buffer.deref().read(f) };
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
mem::forget(task);
batch_size = i;
break;
}
unsafe {
dest_buffer.write(dest_b, task);
}
f = f.wrapping_add(1);
dest_b = dest_b.wrapping_add(1);
}
if batch_size == 0 {
return Steal::Retry;
}
if dest.flavor == Flavor::Fifo {
for i in 0..batch_size / 2 {
unsafe {
let i1 = dest_b.wrapping_sub(batch_size - i);
let i2 = dest_b.wrapping_sub(i + 1);
let t1 = dest_buffer.read(i1);
let t2 = dest_buffer.read(i2);
dest_buffer.write(i1, t2);
dest_buffer.write(i2, t1);
}
}
}
}
}
atomic::fence(Ordering::Release);
dest.inner.back.store(dest_b, Ordering::Release);
Steal::Success(())
}
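    /// Steals a batch of tasks, pushes them into `dest`, and additionally returns one stolen
    /// task directly to the caller.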
pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
let mut f = self.inner.front.load(Ordering::Acquire);
if epoch::is_pinned() {
atomic::fence(Ordering::SeqCst);
}
let guard = &epoch::pin();
let b = self.inner.back.load(Ordering::Acquire);
let len = b.wrapping_sub(f);
if len <= 0 {
return Steal::Empty;
}
let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1);
dest.reserve(batch_size);
let mut batch_size = batch_size as isize;
let dest_buffer = dest.buffer.get();
let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
let mut task = unsafe { buffer.deref().read(f) };
match self.flavor {
Flavor::Fifo => {
match dest.flavor {
Flavor::Fifo => {
for i in 0..batch_size {
unsafe {
let task = buffer.deref().read(f.wrapping_add(i + 1));
dest_buffer.write(dest_b.wrapping_add(i), task);
}
}
}
Flavor::Lifo => {
for i in 0..batch_size {
unsafe {
let task = buffer.deref().read(f.wrapping_add(i + 1));
dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
}
}
}
}
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size + 1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
mem::forget(task);
return Steal::Retry;
}
dest_b = dest_b.wrapping_add(batch_size);
}
Flavor::Lifo => {
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
mem::forget(task);
return Steal::Retry;
}
f = f.wrapping_add(1);
let size = batch_size;
for i in 0..size {
atomic::fence(Ordering::SeqCst);
let b = self.inner.back.load(Ordering::Acquire);
if b.wrapping_sub(f) <= 0 {
batch_size = i;
break;
}
let tmp = unsafe { buffer.deref().read(f) };
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
mem::forget(tmp);
batch_size = i;
break;
}
unsafe {
dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
}
f = f.wrapping_add(1);
dest_b = dest_b.wrapping_add(1);
}
if dest.flavor == Flavor::Fifo {
for i in 0..batch_size / 2 {
unsafe {
let i1 = dest_b.wrapping_sub(batch_size - i);
let i2 = dest_b.wrapping_sub(i + 1);
let t1 = dest_buffer.read(i1);
let t2 = dest_buffer.read(i2);
dest_buffer.write(i1, t2);
dest_buffer.write(i2, t1);
}
}
}
}
}
atomic::fence(Ordering::Release);
dest.inner.back.store(dest_b, Ordering::Release);
Steal::Success(task)
}
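    /// Like `steal_batch_and_pop`, but lets the caller cap the batch at `amount` tasks.
    /// The effective batch is still limited to roughly half of the queue and to `MAX_BATCH - 1`.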
pub fn steal_batch_and_pop_with_amount(&self, dest: &Worker<T>, amount: usize) -> Steal<T> {
let mut f = self.inner.front.load(Ordering::Acquire);
if epoch::is_pinned() {
atomic::fence(Ordering::SeqCst);
}
let guard = &epoch::pin();
let mut amount = amount;
let b = self.inner.back.load(Ordering::Acquire);
let len = b.wrapping_sub(f);
if len <= 0 {
return Steal::Empty;
}
        // Never take more than roughly half of the remaining tasks; one task is returned directly.
        let half_len = (len as usize - 1) / 2;
        if amount > half_len {
            amount = half_len;
        }
let batch_size = cmp::min(amount, MAX_BATCH - 1);
dest.reserve(batch_size);
let mut batch_size = batch_size as isize;
let dest_buffer = dest.buffer.get();
let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
let mut task = unsafe { buffer.deref().read(f) };
match self.flavor {
Flavor::Fifo => {
match dest.flavor {
Flavor::Fifo => {
for i in 0..batch_size {
unsafe {
let task = buffer.deref().read(f.wrapping_add(i + 1));
dest_buffer.write(dest_b.wrapping_add(i), task);
}
}
}
Flavor::Lifo => {
for i in 0..batch_size {
unsafe {
let task = buffer.deref().read(f.wrapping_add(i + 1));
dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
}
}
}
}
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size + 1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
mem::forget(task);
return Steal::Retry;
}
dest_b = dest_b.wrapping_add(batch_size);
}
Flavor::Lifo => {
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
mem::forget(task);
return Steal::Retry;
}
f = f.wrapping_add(1);
let size = batch_size;
for i in 0..size {
atomic::fence(Ordering::SeqCst);
let b = self.inner.back.load(Ordering::Acquire);
if b.wrapping_sub(f) <= 0 {
batch_size = i;
break;
}
let tmp = unsafe { buffer.deref().read(f) };
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
mem::forget(tmp);
batch_size = i;
break;
}
unsafe {
dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
}
f = f.wrapping_add(1);
dest_b = dest_b.wrapping_add(1);
}
if dest.flavor == Flavor::Fifo {
for i in 0..batch_size / 2 {
unsafe {
let i1 = dest_b.wrapping_sub(batch_size - i);
let i2 = dest_b.wrapping_sub(i + 1);
let t1 = dest_buffer.read(i1);
let t2 = dest_buffer.read(i2);
dest_buffer.write(i1, t2);
dest_buffer.write(i2, t1);
}
}
}
}
}
atomic::fence(Ordering::Release);
dest.inner.back.store(dest_b, Ordering::Release);
Steal::Success(task)
}
}
impl<T> Clone for Stealer<T> {
fn clone(&self) -> Stealer<T> {
Stealer {
inner: self.inner.clone(),
flavor: self.flavor,
}
}
}
impl<T> fmt::Debug for Stealer<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Stealer { .. }")
}
}
// Slot state bits: set once a task has been written, read, or when the block is being retired.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;
// Each injector block holds `BLOCK_CAP` slots; indices advance by one "lap" per block.
const LAP: usize = 64;
const BLOCK_CAP: usize = LAP - 1;
// Indices are shifted left by `SHIFT`; the lowest bit (`HAS_NEXT`) marks that a next block is
// already installed.
const SHIFT: usize = 1;
const HAS_NEXT: usize = 1;
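/// One slot of an injector block: a task plus state bits tracking whether it has been written,
/// read, or marked for destruction.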
struct Slot<T> {
task: UnsafeCell<ManuallyDrop<T>>,
state: AtomicUsize,
}
impl<T> Slot<T> {
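    /// Spins (with backoff) until the producer has set the `WRITE` flag for this slot.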
fn wait_write(&self) {
let backoff = Backoff::new();
while self.state.load(Ordering::Acquire) & WRITE == 0 {
backoff.snooze();
}
}
}
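/// A node in the injector's singly linked list, holding `BLOCK_CAP` task slots.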
struct Block<T> {
next: AtomicPtr<Block<T>>,
slots: [Slot<T>; BLOCK_CAP],
}
impl<T> Block<T> {
fn new() -> Block<T> {
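        // An all-zero block is the intended empty state: `next` is a null pointer and every
        // slot's state has no flags set. A slot's task is never read before its WRITE flag is set.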
unsafe { mem::zeroed() }
}
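    /// Spins (with backoff) until the pointer to the next block has been installed.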
fn wait_next(&self) -> *mut Block<T> {
let backoff = Backoff::new();
loop {
let next = self.next.load(Ordering::Acquire);
if !next.is_null() {
return next;
}
backoff.snooze();
}
}
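    /// Marks the first `count` slots of the block as destroyed.
    ///
    /// If some slot is still being read, the reader that finishes last takes over and frees the
    /// block; otherwise the block is deallocated here.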
unsafe fn destroy(this: *mut Block<T>, count: usize) {
for i in (0..count).rev() {
let slot = (*this).slots.get_unchecked(i);
if slot.state.load(Ordering::Acquire) & READ == 0
&& slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
{
return;
}
}
drop(Box::from_raw(this));
}
}
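/// One end (head or tail) of the injector queue: a packed index plus a pointer to the block it
/// falls in.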
struct Position<T> {
index: AtomicUsize,
block: AtomicPtr<Block<T>>,
}
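/// An unbounded, multi-producer multi-consumer injector queue.
///
/// Tasks are stored in a linked list of fixed-size blocks. Any thread may push, and any thread
/// may steal tasks one at a time or in batches into a `Worker`.
///
/// A minimal usage sketch (the `deque` module path is an assumption; adjust the `use`
/// to wherever this module is exposed):
///
/// ```ignore
/// use deque::{Injector, Steal, Worker};
///
/// let injector = Injector::new();
/// for task in 1..=4 {
///     injector.push(task);
/// }
///
/// let local = Worker::new_fifo();
/// assert_eq!(injector.steal_batch_and_pop(&local), Steal::Success(1));
/// assert_eq!(local.pop(), Some(2));
/// ```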
pub struct Injector<T> {
head: CachePadded<Position<T>>,
tail: CachePadded<Position<T>>,
_marker: PhantomData<T>,
}
unsafe impl<T: Send> Send for Injector<T> {}
unsafe impl<T: Send> Sync for Injector<T> {}
impl<T> Default for Injector<T> {
fn default() -> Self {
let block = Box::into_raw(Box::new(Block::<T>::new()));
Self {
head: CachePadded::new(Position {
block: AtomicPtr::new(block),
index: AtomicUsize::new(0),
}),
tail: CachePadded::new(Position {
block: AtomicPtr::new(block),
index: AtomicUsize::new(0),
}),
_marker: PhantomData,
}
}
}
impl<T> Injector<T> {
pub fn new() -> Self {
Self::default()
}
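    /// Pushes a task onto the tail of the queue, linking in a new block when the current one
    /// fills up.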
pub fn push(&self, task: T) {
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
let mut block = self.tail.block.load(Ordering::Acquire);
let mut next_block = None;
loop {
let offset = (tail >> SHIFT) % LAP;
if offset == BLOCK_CAP {
backoff.snooze();
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
continue;
}
if offset + 1 == BLOCK_CAP && next_block.is_none() {
next_block = Some(Box::new(Block::<T>::new()));
}
let new_tail = tail + (1 << SHIFT);
match self.tail.index.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => unsafe {
if offset + 1 == BLOCK_CAP {
let next_block = Box::into_raw(next_block.unwrap());
let next_index = new_tail.wrapping_add(1 << SHIFT);
self.tail.block.store(next_block, Ordering::Release);
self.tail.index.store(next_index, Ordering::Release);
(*block).next.store(next_block, Ordering::Release);
}
let slot = (*block).slots.get_unchecked(offset);
slot.task.get().write(ManuallyDrop::new(task));
slot.state.fetch_or(WRITE, Ordering::Release);
return;
},
Err(t) => {
tail = t;
block = self.tail.block.load(Ordering::Acquire);
backoff.spin();
}
}
}
}
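    /// Steals a single task from the head of the queue.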
pub fn steal(&self) -> Steal<T> {
let mut head;
let mut block;
let mut offset;
let backoff = Backoff::new();
loop {
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
offset = (head >> SHIFT) % LAP;
if offset == BLOCK_CAP {
backoff.snooze();
} else {
break;
}
}
let mut new_head = head + (1 << SHIFT);
if new_head & HAS_NEXT == 0 {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::Relaxed);
if head >> SHIFT == tail >> SHIFT {
return Steal::Empty;
}
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= HAS_NEXT;
}
}
if self
.head
.index
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
.is_err()
{
return Steal::Retry;
}
unsafe {
if offset + 1 == BLOCK_CAP {
let next = (*block).wait_next();
let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
next_index |= HAS_NEXT;
}
self.head.block.store(next, Ordering::Release);
self.head.index.store(next_index, Ordering::Release);
}
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
if offset + 1 == BLOCK_CAP {
Block::destroy(block, offset);
} else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset);
}
Steal::Success(task)
}
}
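    /// Steals a batch of tasks from the head of the queue and pushes them into `dest`.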
pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
let mut head;
let mut block;
let mut offset;
let backoff = Backoff::new();
loop {
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
offset = (head >> SHIFT) % LAP;
if offset == BLOCK_CAP {
backoff.snooze();
} else {
break;
}
}
let mut new_head = head;
let advance;
if new_head & HAS_NEXT == 0 {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::Relaxed);
if head >> SHIFT == tail >> SHIFT {
return Steal::Empty;
}
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= HAS_NEXT;
advance = (BLOCK_CAP - offset).min(MAX_BATCH);
} else {
let len = (tail - head) >> SHIFT;
advance = ((len + 1) / 2).min(MAX_BATCH);
}
} else {
advance = (BLOCK_CAP - offset).min(MAX_BATCH);
}
new_head += advance << SHIFT;
let new_offset = offset + advance;
if self
.head
.index
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
.is_err()
{
return Steal::Retry;
}
let batch_size = new_offset - offset;
dest.reserve(batch_size);
let dest_buffer = dest.buffer.get();
let dest_b = dest.inner.back.load(Ordering::Relaxed);
unsafe {
if new_offset == BLOCK_CAP {
let next = (*block).wait_next();
let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
next_index |= HAS_NEXT;
}
self.head.block.store(next, Ordering::Release);
self.head.index.store(next_index, Ordering::Release);
}
match dest.flavor {
Flavor::Fifo => {
for i in 0..batch_size {
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
}
}
Flavor::Lifo => {
for i in 0..batch_size {
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
}
}
}
atomic::fence(Ordering::Release);
dest.inner
.back
.store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
if new_offset == BLOCK_CAP {
Block::destroy(block, offset);
} else {
for i in offset..new_offset {
let slot = (*block).slots.get_unchecked(i);
if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset);
break;
}
}
}
Steal::Success(())
}
}
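    /// Steals a batch of tasks into `dest` and additionally returns one stolen task directly to
    /// the caller.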
pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
let mut head;
let mut block;
let mut offset;
let backoff = Backoff::new();
loop {
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
offset = (head >> SHIFT) % LAP;
if offset == BLOCK_CAP {
backoff.snooze();
} else {
break;
}
}
let mut new_head = head;
let advance;
if new_head & HAS_NEXT == 0 {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::Relaxed);
if head >> SHIFT == tail >> SHIFT {
return Steal::Empty;
}
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= HAS_NEXT;
advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
} else {
let len = (tail - head) >> SHIFT;
advance = ((len + 1) / 2).min(MAX_BATCH + 1);
}
} else {
advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
}
new_head += advance << SHIFT;
let new_offset = offset + advance;
if self
.head
.index
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
.is_err()
{
return Steal::Retry;
}
let batch_size = new_offset - offset - 1;
dest.reserve(batch_size);
let dest_buffer = dest.buffer.get();
let dest_b = dest.inner.back.load(Ordering::Relaxed);
unsafe {
if new_offset == BLOCK_CAP {
let next = (*block).wait_next();
let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
next_index |= HAS_NEXT;
}
self.head.block.store(next, Ordering::Release);
self.head.index.store(next_index, Ordering::Release);
}
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
match dest.flavor {
Flavor::Fifo => {
for i in 0..batch_size {
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
}
}
Flavor::Lifo => {
for i in 0..batch_size {
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
}
}
}
atomic::fence(Ordering::Release);
dest.inner
.back
.store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
if new_offset == BLOCK_CAP {
Block::destroy(block, offset);
} else {
for i in offset..new_offset {
let slot = (*block).slots.get_unchecked(i);
if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset);
break;
}
}
}
Steal::Success(task)
}
}
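    /// Returns `true` if the queue appears empty at the time of the call.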
pub fn is_empty(&self) -> bool {
let head = self.head.index.load(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::SeqCst);
head >> SHIFT == tail >> SHIFT
}
}
impl<T> Drop for Injector<T> {
fn drop(&mut self) {
let mut head = self.head.index.load(Ordering::Relaxed);
let mut tail = self.tail.index.load(Ordering::Relaxed);
let mut block = self.head.block.load(Ordering::Relaxed);
head &= !((1 << SHIFT) - 1);
tail &= !((1 << SHIFT) - 1);
unsafe {
while head != tail {
let offset = (head >> SHIFT) % LAP;
if offset < BLOCK_CAP {
let slot = (*block).slots.get_unchecked(offset);
ManuallyDrop::drop(&mut *(*slot).task.get());
} else {
let next = (*block).next.load(Ordering::Relaxed);
drop(Box::from_raw(block));
block = next;
}
head = head.wrapping_add(1 << SHIFT);
}
drop(Box::from_raw(block));
}
}
}
impl<T> fmt::Debug for Injector<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Worker { .. }")
}
}
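/// The outcome of a steal attempt.
///
/// A steal either succeeds with a task, observes an empty queue, or loses a race and asks the
/// caller to retry.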
#[must_use]
#[derive(PartialEq, Eq, Copy, Clone)]
pub enum Steal<T> {
Empty,
Success(T),
Retry,
}
impl<T> Steal<T> {
pub fn is_empty(&self) -> bool {
match self {
Steal::Empty => true,
_ => false,
}
}
pub fn is_success(&self) -> bool {
match self {
Steal::Success(_) => true,
_ => false,
}
}
pub fn is_retry(&self) -> bool {
match self {
Steal::Retry => true,
_ => false,
}
}
pub fn success(self) -> Option<T> {
match self {
Steal::Success(res) => Some(res),
_ => None,
}
}
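    /// Chains another steal attempt: returns `self` if it is `Success`, otherwise evaluates `f`,
    /// preferring its `Success` and otherwise reporting `Retry` if either attempt was a retry.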
pub fn or_else<F>(self, f: F) -> Steal<T>
where
F: FnOnce() -> Steal<T>,
{
match self {
Steal::Empty => f(),
Steal::Success(_) => self,
Steal::Retry => {
if let Steal::Success(res) = f() {
Steal::Success(res)
} else {
Steal::Retry
}
}
}
}
}
impl<T> fmt::Debug for Steal<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Steal::Empty => f.pad("Empty"),
Steal::Success(_) => f.pad("Success(..)"),
Steal::Retry => f.pad("Retry"),
}
}
}
impl<T> FromIterator<Steal<T>> for Steal<T> {
fn from_iter<I>(iter: I) -> Steal<T>
where
I: IntoIterator<Item = Steal<T>>,
{
let mut retry = false;
for s in iter {
match &s {
Steal::Empty => {}
Steal::Success(_) => return s,
Steal::Retry => retry = true,
}
}
if retry {
Steal::Retry
} else {
Steal::Empty
}
}
}