//! # LIFO, bounded, work-stealing queue.
//!
//! ## Example
//!
//! ```
//! use std::thread;
//! use st3::lifo::Worker;
//!
//! // Push 4 items into a LIFO queue of capacity 256.
//! let worker = Worker::new(256);
//! worker.push("a").unwrap();
//! worker.push("b").unwrap();
//! worker.push("c").unwrap();
//! worker.push("d").unwrap();
//!
//! // Steal items concurrently.
//! let stealer = worker.stealer();
//! let th = thread::spawn(move || {
//!     let other_worker = Worker::new(256);
//!
//!     // Try to steal half the items and return the actual count of stolen items.
//!     match stealer.steal(&other_worker, |n| n / 2) {
//!         Ok(actual) => actual,
//!         Err(_) => 0,
//!     }
//! });
//!
//! // Pop items concurrently.
//! let mut pop_count = 0;
//! while worker.pop().is_some() {
//!     pop_count += 1;
//! }
//!
//! // Does it add up?
//! let steal_count = th.join().unwrap();
//! assert_eq!(pop_count + steal_count, 4);
//! ```
use alloc::boxed::Box;
use alloc::sync::Arc;
use core::alloc::Layout;
use core::iter::FusedIterator;
use core::mem::{drop, transmute, MaybeUninit};
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};

use crossbeam_utils::CachePadded;

use crate::config::{AtomicUnsignedLong, AtomicUnsignedShort, UnsignedShort};
use crate::loom_exports::cell::UnsafeCell;
use crate::loom_exports::debug_or_loom_assert;
use crate::{allocate_buffer, pack, unpack, StealError};
/// A double-ended LIFO queue.
///
/// The queue tracks its tail and head position within a ring buffer with
/// wrap-around integers, where the least significant bits specify the actual
/// buffer index. All positions have bit widths that are intentionally larger
/// than necessary for buffer indexing because:
/// - an extra bit is needed to disambiguate between empty and full buffers when
///   the start and end positions of the buffer are equal,
/// - the pop count and the head in `pop_count_and_head` are also used as
///   long-cycle counters to prevent ABA issues in the pop CAS and in the
///   stealer CAS.
///
/// The position of the tail can be determined at any moment by subtracting the
/// pop operations counter from the push operations counter.
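///
/// As an illustration, assuming a buffer of capacity 8 (index mask `0b111`),
/// positions `5` and `13` both map to buffer index `5`: equal head and tail
/// positions denote an empty buffer, whereas a tail leading the head by the
/// full capacity (e.g. head `5`, tail `13`) denotes a full one.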
#[derive(Debug)]
struct Queue<T> {
/// Total number of push operations.
push_count: CachePadded<AtomicUnsignedShort>,
/// Total number of pop operations, packed together with the position of the
/// head that a stealer will set once stealing is complete. This head
/// position always coincides with the `head` field below if the last
/// stealing operation has completed.
pop_count_and_head: CachePadded<AtomicUnsignedLong>,
/// Position of the queue head, updated after completion of each stealing
/// operation.
head: CachePadded<AtomicUnsignedShort>,
/// Queue items.
buffer: Box<[UnsafeCell<MaybeUninit<T>>]>,
/// Mask for the buffer index.
mask: UnsignedShort,
}
impl<T> Queue<T> {
/// Read an item at the given position.
///
/// The position is automatically mapped to a valid buffer index using a
/// modulo operation.
///
/// # Safety
///
/// The item at the given position must have been initialized before and
/// cannot have been moved out.
///
/// The caller must guarantee that the item at this position cannot be
/// written to or moved out concurrently.
#[inline]
unsafe fn read_at(&self, position: UnsignedShort) -> T {
let index = (position & self.mask) as usize;
(*self.buffer).as_ref()[index].with(|slot| slot.read().assume_init())
}
/// Write an item at the given position.
///
/// The position is automatically mapped to a valid buffer index using a
/// modulo operation.
///
/// # Note
///
/// If an item is already initialized but was not moved out yet, it will be
/// leaked.
///
/// # Safety
///
/// The caller must guarantee that the item at this position cannot be read
/// or written to concurrently.
#[inline]
unsafe fn write_at(&self, position: UnsignedShort, item: T) {
let index = (position & self.mask) as usize;
(*self.buffer).as_ref()[index].with_mut(|slot| slot.write(MaybeUninit::new(item)));
}
/// Attempt to book `N` items for stealing where `N` is specified by a
/// closure which takes as argument the total count of available items.
///
/// In case of success, the returned triplet is the *current* head, the
/// *next* head and an item count at least equal to 1.
///
/// # Errors
///
/// An error is returned in the following cases:
/// 1) no item could be stolen, either because the queue is empty or because
/// `N` is 0,
/// 2) a concurrent stealing operation is ongoing.
///
/// # Safety
///
/// This function is not strictly unsafe, but because it initiates the
/// stealing operation by modifying the post-stealing head in
/// `pop_count_and_head` without ever updating the `head` atomic variable,
/// its misuse can result in permanently blocking subsequent stealing
/// operations.
fn book_items<C>(
&self,
mut count_fn: C,
max_count: UnsignedShort,
) -> Result<(UnsignedShort, UnsignedShort, UnsignedShort), StealError>
where
C: FnMut(usize) -> usize,
{
// Ordering: Acquire on the `pop_count_and_head` load synchronizes with
// the release at the end of a previous pop operation. It is therefore
// guaranteed that the push count loaded later is at least the same as it
// was when the pop count was set, ensuring in turn that the computed
// tail is not less than the head and therefore the item count does not
// wrap around. For the same reason, the failure ordering on the CAS is
// also Acquire since the push count is loaded again at every CAS
// iteration.
let mut pop_count_and_head = self.pop_count_and_head.load(Acquire);
// Ordering: Acquire on the `head` load synchronizes with a release at
// the end of a previous steal operation. Once this head is confirmed
// equal to the head in `pop_count_and_head`, it is guaranteed
// that the push count loaded later is at least the same as it was on
// the last completed steal operation, ensuring in turn that the
// computed tail is not less than the head and therefore the item count
// does not wrap around. Alternatively, the ordering could be Relaxed if
// the success ordering on the CAS was AcqRel, which would achieve the
// same by synchronizing with the head field of `pop_count_and_head`.
let old_head = self.head.load(Acquire);
loop {
let (pop_count, head) = unpack(pop_count_and_head);
// Bail out if both heads differ because it means another stealing
// operation is concurrently ongoing.
if old_head != head {
return Err(StealError::Busy);
}
// Ordering: Acquire synchronizes with the Release in the push
// method and ensures that all items pushed to the queue are visible.
let push_count = self.push_count.load(Acquire);
let tail = push_count.wrapping_sub(pop_count);
// Note: it is possible for the computed item_count to be spuriously
// greater than the number of available items if, in this iteration
// of the CAS loop, `pop_count_and_head` and `head` are both
// obsolete. This is not an issue, however, since the CAS will then
// fail due to `pop_count_and_head` being obsolete.
let item_count = tail.wrapping_sub(head);
// `item_count` is tested now because `count_fn` may expect
// `item_count > 0`.
if item_count == 0 {
return Err(StealError::Empty);
}
// Unwind safety: it is OK if `count_fn` panics because no state has
// been modified yet.
let count = (count_fn(item_count as usize).min(max_count as usize) as UnsignedShort)
.min(item_count);
// The special case `count_fn() == 0` must be tested specifically,
// because if the compare-exchange succeeds with `count=0`, the new
// value will be the same as the old one so other stealers will not
// detect that stealing is currently ongoing and may try to actually
// steal items and concurrently modify the position of the head.
if count == 0 {
return Err(StealError::Empty);
}
let new_head = head.wrapping_add(count);
let new_pop_count_and_head = pack(pop_count, new_head);
// Attempt to book the slots. Only one stealer can succeed since
// once this atomic is changed, the other thread will necessarily
// observe a mismatch between `head` and the head sub-field of
// `pop_count_and_head`.
//
// Ordering: see justification for Acquire on failure in the first
// load of `pop_count_and_head`. No further synchronization is
// necessary on success.
match self.pop_count_and_head.compare_exchange_weak(
pop_count_and_head,
new_pop_count_and_head,
Acquire,
Acquire,
) {
Ok(_) => return Ok((head, new_head, count)),
// We lost the race to a concurrent pop or steal operation, or
// the CAS failed spuriously; try again.
Err(current) => pop_count_and_head = current,
}
}
}
/// Capacity of the queue.
#[inline]
fn capacity(&self) -> UnsignedShort {
self.mask.wrapping_add(1)
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
let head = self.head.load(Relaxed);
let push_count = self.push_count.load(Relaxed);
let pop_count = unpack(self.pop_count_and_head.load(Relaxed)).0;
let tail = push_count.wrapping_sub(pop_count);
let count = tail.wrapping_sub(head);
for offset in 0..count {
drop(unsafe { self.read_at(head.wrapping_add(offset)) });
}
}
}
/// Handle for single-threaded LIFO push and pop operations.
#[derive(Debug)]
pub struct Worker<T> {
queue: Arc<Queue<T>>,
}
impl<T> Worker<T> {
/// Creates a new queue and returns a `Worker` handle.
///
/// **The capacity of a queue is always a power of two**. It is set to the
/// smallest power of two greater than or equal to the requested minimum
/// capacity.
///
/// # Panics
///
/// This method will panic if the minimum requested capacity is greater than
/// 2³¹ on targets that support 64-bit atomics, or greater than 2¹⁵ on
/// targets that only support 32-bit atomics.
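///
/// # Examples
///
/// A small sketch of the capacity rounding (the item type and requested
/// capacity are arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// // The capacity is rounded up to the next power of two.
/// let worker = Worker::<usize>::new(100);
/// assert_eq!(worker.capacity(), 128);
/// ```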
pub fn new(min_capacity: usize) -> Self {
const MAX_CAPACITY: usize = 1 << (UnsignedShort::BITS - 1);
assert!(
min_capacity <= MAX_CAPACITY,
"the capacity of the queue cannot exceed {}",
MAX_CAPACITY
);
// `next_power_of_two` cannot overflow since `min_capacity` cannot be
// greater than `MAX_CAPACITY`, and the latter is a power of two that
// always fits within an `UnsignedShort`.
let capacity = min_capacity.next_power_of_two();
let buffer = allocate_buffer(capacity);
let mask = capacity as UnsignedShort - 1;
let queue = Arc::new(Queue {
push_count: CachePadded::new(AtomicUnsignedShort::new(0)),
pop_count_and_head: CachePadded::new(AtomicUnsignedLong::new(0)),
head: CachePadded::new(AtomicUnsignedShort::new(0)),
buffer,
mask,
});
Worker { queue }
}
/// Creates a new `Stealer` handle associated with this `Worker`.
///
/// An arbitrary number of `Stealer` handles can be created, either using
/// this method or cloning an existing `Stealer` handle.
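///
/// # Examples
///
/// A small sketch (the item type is arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// let worker = Worker::<usize>::new(256);
/// // All stealer handles are associated with the same queue.
/// let stealer1 = worker.stealer();
/// let stealer2 = stealer1.clone();
/// assert_eq!(stealer1, stealer2);
/// ```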
pub fn stealer(&self) -> Stealer<T> {
Stealer {
queue: self.queue.clone(),
}
}
/// Creates a reference to a `Stealer` handle associated with this `Worker`.
///
/// This is a zero-cost reference-to-reference conversion: the reference
/// count to the underlying queue is not modified. The returned reference
/// can in particular be used to perform a cheap equality check with another
/// `Stealer` and verify that it is associated with the same `Worker`.
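///
/// # Examples
///
/// A small sketch of the equality check (the item type is arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// let worker = Worker::<usize>::new(256);
/// let stealer = worker.stealer();
/// // Verify that the stealer is associated with this worker.
/// assert_eq!(worker.stealer_ref(), &stealer);
/// ```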
pub fn stealer_ref(&self) -> &Stealer<T> {
// Sanity checks to assess that `queue` has indeed the size and
// alignment of a `Stealer` (this assert is optimized away in release mode).
assert_eq!(Layout::for_value(&self.queue), Layout::new::<Stealer<T>>());
// Safety: `self.queue` has the size and alignment of `Stealer` since
// the latter is a `repr(transparent)` type over an `Arc<Queue<T>>`. The
// lifetime of the returned reference is bounded by the lifetime of
// `&self`. The soundness of providing a `Stealer` from a `Worker` is
// already assumed by the `stealer()` method, so providing a short-lived
// reference to a `Stealer` does not in itself modify safety guarantees.
unsafe { transmute::<&'_ Arc<Queue<T>>, &'_ Stealer<T>>(&self.queue) }
}
/// Returns the capacity of the queue.
pub fn capacity(&self) -> usize {
self.queue.capacity() as usize
}
/// Returns the number of items that can be successfully pushed onto the
/// queue.
///
/// Note that the spare capacity may be underestimated due to
/// concurrent stealing operations.
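///
/// # Examples
///
/// A small sketch (capacity and item values are arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// let worker = Worker::new(4);
/// assert_eq!(worker.spare_capacity(), 4);
/// worker.push("a").unwrap();
/// assert_eq!(worker.spare_capacity(), 3);
/// ```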
pub fn spare_capacity(&self) -> usize {
let push_count = self.queue.push_count.load(Relaxed);
let pop_count = unpack(self.queue.pop_count_and_head.load(Relaxed)).0;
let tail = push_count.wrapping_sub(pop_count);
// Ordering: Relaxed ordering is sufficient since no element will be
// read or written.
let head = self.queue.head.load(Relaxed);
// Aggregate count of available items (those which can be popped) and of
// items currently being stolen. Note that even if the value of `head`
// is stale, `len` can never exceed the maximum capacity: it is
// computed on the same thread that pushes items, and `push` would have
// failed if `head` had suggested that there is no spare capacity.
let len = tail.wrapping_sub(head);
(self.queue.capacity() - len) as usize
}
/// Returns true if the queue is empty.
///
/// Note that the queue size is somewhat ill-defined in a multi-threaded
/// context, but it is guaranteed that if `is_empty()` returns true, a
/// subsequent call to `pop()` will fail.
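///
/// # Examples
///
/// A small sketch (the item value is arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// let worker = Worker::new(4);
/// assert!(worker.is_empty());
/// worker.push("a").unwrap();
/// assert!(!worker.is_empty());
/// ```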
pub fn is_empty(&self) -> bool {
let push_count = self.queue.push_count.load(Relaxed);
let (pop_count, head) = unpack(self.queue.pop_count_and_head.load(Relaxed));
let tail = push_count.wrapping_sub(pop_count);
tail == head
}
/// Attempts to push one item at the tail of the queue.
///
/// # Errors
///
/// This will fail if the queue is full, in which case the item is returned
/// as the error field.
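///
/// # Examples
///
/// A small sketch of a push to a full queue (capacity and item values are
/// arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// let worker = Worker::new(2);
/// assert_eq!(worker.push(1), Ok(()));
/// assert_eq!(worker.push(2), Ok(()));
/// // The queue is full: the rejected item is handed back to the caller.
/// assert_eq!(worker.push(3), Err(3));
/// ```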
pub fn push(&self, item: T) -> Result<(), T> {
let push_count = self.queue.push_count.load(Relaxed);
let pop_count = unpack(self.queue.pop_count_and_head.load(Relaxed)).0;
let tail = push_count.wrapping_sub(pop_count);
// Ordering: Acquire ordering is required to synchronize with the
// Release of the `head` atomic at the end of a stealing operation and
// ensure that the stealer has finished copying the items from the
// buffer.
let head = self.queue.head.load(Acquire);
// Check that the buffer is not full.
if tail.wrapping_sub(head) > self.queue.mask {
return Err(item);
}
// Store the item.
unsafe { self.queue.write_at(tail, item) };
// Make the item visible by incrementing the push count.
//
// Ordering: the Release ordering ensures that the subsequent
// acquisition of this atomic by a stealer will make the previous write
// visible.
self.queue
.push_count
.store(push_count.wrapping_add(1), Release);
Ok(())
}
/// Attempts to push the content of an iterator at the tail of the queue.
///
/// It is the responsibility of the caller to ensure that there is enough
/// spare capacity to accommodate all iterator items, for instance by
/// calling [`spare_capacity`](Worker::spare_capacity) beforehand.
/// Otherwise, the iterator is dropped while still holding the excess items.
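///
/// # Examples
///
/// A small sketch (capacity and item values are arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// let worker = Worker::new(4);
/// // There is enough spare capacity (4) for all 3 items.
/// worker.extend(1..=3);
/// // The last item pushed is the first popped.
/// assert_eq!(worker.pop(), Some(3));
/// ```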
pub fn extend<I: IntoIterator<Item = T>>(&self, iter: I) {
let push_count = self.queue.push_count.load(Relaxed);
let pop_count = unpack(self.queue.pop_count_and_head.load(Relaxed)).0;
let mut tail = push_count.wrapping_sub(pop_count);
// Ordering: Acquire ordering is required to synchronize with the
// Release of the `head` atomic at the end of a stealing operation and
// ensure that the stealer has finished copying the items from the
// buffer.
let head = self.queue.head.load(Acquire);
let max_tail = head.wrapping_add(self.queue.capacity());
for item in iter {
// Check whether the buffer is full.
if tail == max_tail {
break;
}
// Store the item.
unsafe { self.queue.write_at(tail, item) };
tail = tail.wrapping_add(1);
}
// Make the items visible by incrementing the push count.
//
// Ordering: the Release ordering ensures that the subsequent
// acquisition of this atomic by a stealer will make the previous write
// visible.
self.queue
.push_count
.store(tail.wrapping_add(pop_count), Release);
}
/// Attempts to pop one item from the tail of the queue.
///
/// This returns `None` if the queue is empty.
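///
/// # Examples
///
/// A small sketch of the LIFO ordering (the item values are arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// let worker = Worker::new(4);
/// worker.push("a").unwrap();
/// worker.push("b").unwrap();
/// // Items are popped from the tail, most recent first.
/// assert_eq!(worker.pop(), Some("b"));
/// assert_eq!(worker.pop(), Some("a"));
/// assert_eq!(worker.pop(), None);
/// ```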
pub fn pop(&self) -> Option<T> {
// Acquire the item to be popped.
//
// Ordering: Relaxed ordering is sufficient since (i) the push and pop
// count are only set by this thread and (ii) no stealer will read this
// slot until it has been written to again by a push operation. In the
// worst case, the head position read below will be obsolete and the
// first CAS will fail.
let mut pop_count_and_head = self.queue.pop_count_and_head.load(Relaxed);
let push_count = self.queue.push_count.load(Relaxed);
let (pop_count, mut head) = unpack(pop_count_and_head);
let tail = push_count.wrapping_sub(pop_count);
let new_pop_count = pop_count.wrapping_add(1);
loop {
// Check if the queue is empty.
if tail == head {
return None;
}
let new_pop_count_and_head = pack(new_pop_count, head);
// Attempt to claim this slot.
//
// Ordering: Release is necessary so that stealers can acquire the
// pop count and be sure that all previous push operations have been
// accounted for; otherwise the calculated tail could end up less
// than the head.
match self.queue.pop_count_and_head.compare_exchange_weak(
pop_count_and_head,
new_pop_count_and_head,
Release,
Relaxed,
) {
Ok(_) => break,
// We lost the race to a stealer or the CAS failed spuriously; try again.
Err(current) => {
pop_count_and_head = current;
head = unpack(current).1;
}
}
}
// Read the item.
unsafe { Some(self.queue.read_at(tail.wrapping_sub(1))) }
}
/// Returns an iterator that steals items from the head of the queue.
///
/// The returned iterator steals up to `N` items, where `N` is specified by
/// a closure which takes as argument the total count of items available for
/// stealing. Upon success, the number of items ultimately stolen can be
/// from 1 to `N`, depending on the number of available items.
///
/// # Beware
///
/// All items stolen by the iterator should be moved out as soon as
/// possible, because until then or until the iterator is dropped, all
/// concurrent stealing operations will fail with [`StealError::Busy`].
///
/// # Leaking
///
/// If the iterator is leaked before all stolen items have been moved out,
/// subsequent stealing operations will permanently fail with
/// [`StealError::Busy`].
///
/// # Errors
///
/// An error is returned in the following cases:
/// 1) no item was stolen, either because the queue is empty or `N` is 0,
/// 2) a concurrent stealing operation is ongoing.
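///
/// # Examples
///
/// A small sketch (capacity and item values are arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// let worker = Worker::new(4);
/// worker.extend(1..=3);
/// // Steal half of the 3 available items, i.e. 1 item (integer division),
/// // starting from the head (the oldest item).
/// let drained: Vec<_> = worker.drain(|n| n / 2).unwrap().collect();
/// assert_eq!(drained, vec![1]);
/// ```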
pub fn drain<C>(&self, count_fn: C) -> Result<Drain<'_, T>, StealError>
where
C: FnMut(usize) -> usize,
{
let (old_head, new_head, _) = self.queue.book_items(count_fn, UnsignedShort::MAX)?;
Ok(Drain {
queue: &self.queue,
current: old_head,
end: new_head,
})
}
}
impl<T> UnwindSafe for Worker<T> {}
impl<T> RefUnwindSafe for Worker<T> {}
unsafe impl<T: Send> Send for Worker<T> {}
/// A draining iterator for [`Worker<T>`].
///
/// This iterator is created by [`Worker::drain`]. See its documentation for
/// more information.
#[derive(Debug)]
pub struct Drain<'a, T> {
queue: &'a Queue<T>,
current: UnsignedShort,
end: UnsignedShort,
}
impl<'a, T> Iterator for Drain<'a, T> {
type Item = T;
fn next(&mut self) -> Option<T> {
if self.current == self.end {
return None;
}
let item = Some(unsafe { self.queue.read_at(self.current) });
self.current = self.current.wrapping_add(1);
// We cannot rely on the caller to call `next` again after the last item
// is yielded, so the head position must be updated immediately when
// yielding the last item.
if self.current == self.end {
// Update the head position.
//
// Ordering: the Release ordering ensures that all items have been moved
// out when a subsequent push operation synchronizes by acquiring
// `head`. It also ensures that the push count seen by a subsequent
// steal operation (which acquires `head`) is at least equal to the one
// seen by the present steal operation.
self.queue.head.store(self.end, Release);
}
item
}
fn size_hint(&self) -> (usize, Option<usize>) {
let sz = self.end.wrapping_sub(self.current) as usize;
(sz, Some(sz))
}
}
impl<'a, T> ExactSizeIterator for Drain<'a, T> {}
impl<'a, T> FusedIterator for Drain<'a, T> {}
impl<'a, T> Drop for Drain<'a, T> {
fn drop(&mut self) {
// Drop all items and make sure the head is updated so that subsequent
// stealing operations can succeed.
for _item in self {}
}
}
impl<'a, T> UnwindSafe for Drain<'a, T> {}
impl<'a, T> RefUnwindSafe for Drain<'a, T> {}
unsafe impl<'a, T: Send> Send for Drain<'a, T> {}
unsafe impl<'a, T: Send> Sync for Drain<'a, T> {}
/// Handle for multi-threaded stealing operations.
#[derive(Debug)]
#[repr(transparent)]
pub struct Stealer<T> {
queue: Arc<Queue<T>>,
}
impl<T> Stealer<T> {
/// Attempts to steal items from the head of the queue and move them to the
/// tail of another queue.
///
/// Up to `N` items are moved to the destination queue, where `N` is
/// specified by a closure which takes as argument the total count of items
/// available for stealing. Upon success, the number of items ultimately
/// transferred to the destination queue can be from 1 to `N`, depending on
/// the number of available items and the capacity of the destination queue;
/// the count of transferred items is returned as the success payload.
///
/// # Errors
///
/// An error is returned in the following cases:
/// 1) no item was stolen, either because the queue is empty, the
/// destination is full or `N` is 0,
/// 2) a concurrent stealing operation is ongoing.
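///
/// # Examples
///
/// A small sketch (capacities and item values are arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// let worker = Worker::new(4);
/// let stealer = worker.stealer();
/// worker.extend(1..=2);
///
/// // Steal all available items.
/// let dest = Worker::new(4);
/// let transferred = stealer.steal(&dest, |n| n).unwrap();
/// assert_eq!(transferred, 2);
/// // The stolen items were pushed to the tail of the destination queue.
/// assert_eq!(dest.pop(), Some(2));
/// assert_eq!(dest.pop(), Some(1));
/// ```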
pub fn steal<C>(&self, dest: &Worker<T>, count_fn: C) -> Result<usize, StealError>
where
C: FnMut(usize) -> usize,
{
// Compute the free capacity of the destination queue.
//
// Note that even if the value of `dest_head` is stale, the subtraction
// that computes `dest_free_capacity` can never overflow: it is
// computed on the same thread that pushes items to the destination
// queue, and `push` would have failed if `dest_head` had suggested
// that there is no spare capacity.
//
// Ordering: see `Worker::push()` method.
let dest_push_count = dest.queue.push_count.load(Relaxed);
let dest_pop_count = unpack(dest.queue.pop_count_and_head.load(Relaxed)).0;
let dest_tail = dest_push_count.wrapping_sub(dest_pop_count);
let dest_head = dest.queue.head.load(Acquire);
let dest_free_capacity = dest.queue.capacity() - dest_tail.wrapping_sub(dest_head);
debug_or_loom_assert!(dest_free_capacity <= dest.queue.capacity());
let (old_head, new_head, transfer_count) =
self.queue.book_items(count_fn, dest_free_capacity)?;
debug_or_loom_assert!(transfer_count <= dest_free_capacity);
// Move the stolen items to the destination queue.
for offset in 0..transfer_count {
unsafe {
let item = self.queue.read_at(old_head.wrapping_add(offset));
dest.queue.write_at(dest_tail.wrapping_add(offset), item);
}
}
// Make the moved items visible by updating the destination tail position.
//
// Ordering: see comments in the `push()` method.
dest.queue
.push_count
.store(dest_push_count.wrapping_add(transfer_count), Release);
// Update the head position.
//
// Ordering: the Release ordering ensures that all items have been moved
// out when a subsequent push operation synchronizes by acquiring
// `head`. It also ensures that the push count seen by a subsequent
// steal operation (which acquires `head`) is at least equal to the one
// seen by the present steal operation.
self.queue.head.store(new_head, Release);
Ok(transfer_count as usize)
}
/// Attempts to steal items from the head of the queue, returning one of
/// them directly and moving the others to the tail of another queue.
///
/// Up to `N` items are stolen (including the one returned directly), where
/// `N` is specified by a closure which takes as argument the total count of
/// items available for stealing. Upon success, one item is returned and
/// from 0 to `N-1` items are moved to the destination queue, depending on
/// the number of available items and the capacity of the destination queue;
/// the number of transferred items is returned as the second field of the
/// success value.
///
/// The returned item is the most recent one among the stolen items.
///
/// # Errors
///
/// An error is returned in the following cases:
/// 1) no item was stolen, either because the queue is empty or `N` is 0,
/// 2) a concurrent stealing operation is ongoing.
///
/// Failure to transfer any item to the destination queue is not considered
/// an error as long as one element could be returned directly. This can
/// occur if the destination queue is full, if the source queue has only one
/// item or if `N` is 1.
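///
/// # Examples
///
/// A small sketch (capacities and item values are arbitrary):
///
/// ```
/// use st3::lifo::Worker;
///
/// let worker = Worker::new(4);
/// let stealer = worker.stealer();
/// worker.extend(1..=3);
///
/// // Steal all 3 items: one is returned directly, the other 2 are moved to
/// // the destination queue.
/// let dest = Worker::new(4);
/// let (item, transferred) = stealer.steal_and_pop(&dest, |n| n).unwrap();
/// assert_eq!(item, 3);
/// assert_eq!(transferred, 2);
/// ```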
pub fn steal_and_pop<C>(&self, dest: &Worker<T>, count_fn: C) -> Result<(T, usize), StealError>
where
C: FnMut(usize) -> usize,
{
// Compute the free capacity of the destination queue.
//
// Ordering: see `Worker::push()` method.
let dest_push_count = dest.queue.push_count.load(Relaxed);
let dest_pop_count = unpack(dest.queue.pop_count_and_head.load(Relaxed)).0;
let dest_tail = dest_push_count.wrapping_sub(dest_pop_count);
let dest_head = dest.queue.head.load(Acquire);
let dest_free_capacity = dest.queue.capacity() - dest_tail.wrapping_sub(dest_head);
debug_or_loom_assert!(dest_free_capacity <= dest.queue.capacity());
let (old_head, new_head, count) =
self.queue.book_items(count_fn, dest_free_capacity + 1)?;
let transfer_count = count - 1;
debug_or_loom_assert!(transfer_count <= dest_free_capacity);
// Move all items but the last to the destination queue.
for offset in 0..transfer_count {
unsafe {
let item = self.queue.read_at(old_head.wrapping_add(offset));
dest.queue.write_at(dest_tail.wrapping_add(offset), item);
}
}
// Read the last item.
let last_item = unsafe { self.queue.read_at(old_head.wrapping_add(transfer_count)) };
// Make the moved items visible by updating the destination tail position.
//
// Ordering: see comments in the `push()` method.
dest.queue
.push_count
.store(dest_push_count.wrapping_add(transfer_count), Release);
// Update the head position.
//
// Ordering: the Release ordering ensures that all items have been moved
// out when a subsequent push operation synchronizes by acquiring
// `head`. It also ensures that the push count seen by a subsequent
// steal operation (which acquires `head`) is at least equal to the one
// seen by the present steal operation.
self.queue.head.store(new_head, Release);
Ok((last_item, transfer_count as usize))
}
}
impl<T> Clone for Stealer<T> {
fn clone(&self) -> Self {
Stealer {
queue: self.queue.clone(),
}
}
}
impl<T> PartialEq for Stealer<T> {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.queue, &other.queue)
}
}
impl<T> Eq for Stealer<T> {}
impl<T> UnwindSafe for Stealer<T> {}
impl<T> RefUnwindSafe for Stealer<T> {}
unsafe impl<T: Send> Send for Stealer<T> {}
unsafe impl<T: Send> Sync for Stealer<T> {}