use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering, spin_loop_hint};
/// A fixed-capacity, lock-free multi-producer multi-consumer ring buffer.
///
/// Capacity is always rounded up to a power of two; one slot is kept empty to
/// distinguish "full" from "empty". Each side (read/write) packs its index,
/// in-progress count, and done count into one atomic word (see `Counters`).
pub struct AtomicRingBuffer<T: Sized> {
    // Packed read-side state: read index, reads in flight, reads finished.
    read_counters: CounterStore,
    // Packed write-side state: write index, writes in flight, writes finished.
    write_counters: CounterStore,
    // Raw heap-allocated backing slice; cells between read and write index hold live values.
    mem: *mut [T],
    // Expresses logical ownership of the `T` values for drop-check purposes.
    _marker: PhantomData<T>,
}
// SAFETY: the atomic counter protocol hands each cell to at most one writer or
// one reader at a time, so the buffer may be sent and shared across threads
// whenever the element type itself is `Send`.
unsafe impl<T: Send> Send for AtomicRingBuffer<T> {}
unsafe impl<T: Send> Sync for AtomicRingBuffer<T> {}
const MAXIMUM_IN_PROGRESS: u8 = 16;
impl<T: Default> AtomicRingBuffer<T> {
    /// Reserves the next write slot, default-initializes it, and lets `writer`
    /// fill it in place through a `&mut T`.
    ///
    /// Returns `Err(())` when the buffer is full.
    #[inline(always)]
    pub fn try_write<F: FnOnce(&mut T)>(&self, writer: F) -> Result<(), ()> {
        self.try_unsafe_write(|cell| unsafe {
            // Zero-sized types have no storage to initialize.
            if mem::size_of::<T>() != 0 {
                // Make the cell a valid T before exposing it as a reference;
                // any previous occupant was already moved out or dropped by a reader.
                ptr::write(cell, Default::default());
            }
            writer(&mut (*cell));
        })
    }
}
impl<T: Sized> AtomicRingBuffer<T> {
    /// Creates a buffer with `capacity.next_power_of_two()` slots, of which
    /// one is always kept empty to tell "full" apart from "empty".
    ///
    /// # Panics
    /// Panics when the capacity would not fit into the index bits of the
    /// packed counter word (everything above the two 8-bit counts).
    pub fn with_capacity(capacity: usize) -> AtomicRingBuffer<T> {
        if capacity > (::std::usize::MAX >> 16) + 1 {
            panic!("too large!");
        }
        // Power-of-two capacity lets index wrap-around be a simple bit mask.
        let cap = capacity.next_power_of_two();
        let mut content: Vec<T> = Vec::with_capacity(cap);
        // The cells stay logically uninitialized: readers only touch cells a
        // writer has completed, and Drop frees the slice without dropping them.
        unsafe { content.set_len(cap); }
        let mem = Box::into_raw(content.into_boxed_slice());
        AtomicRingBuffer {
            mem,
            read_counters: CounterStore::new(),
            write_counters: CounterStore::new(),
            _marker: PhantomData,
        }
    }
    /// Attempts to move `content` into the buffer; returns it back as `Err`
    /// when the buffer is full.
    #[inline(always)]
    pub fn try_push(&self, content: T) -> Result<(), T> {
        self.try_unsafe_write_or(content, |cell, content| {
            // Zero-sized values occupy no storage; nothing to write.
            if mem::size_of::<T>() != 0 {
                unsafe { ptr::write(cell, content); }
            }
        }, |content| { content })
    }
    /// Reserves the next write slot and passes its raw pointer to `writer`.
    ///
    /// The closure must fully initialize the cell (e.g. via `ptr::write`) and
    /// must not read or drop the cell's previous (uninitialized) contents.
    #[inline(always)]
    pub fn try_unsafe_write<F: FnOnce(*mut T)>(&self, writer: F) -> Result<(), ()> {
        self.try_unsafe_write_or((), |dst, _| { writer(dst) }, |_| {})
    }
    /// Core write path: reserves a slot and runs `writer(cell, content)` on
    /// success, or hands `content` to `err` when the buffer is full.
    #[inline(always)]
    pub fn try_unsafe_write_or<CNT, OK, ERR, W: FnOnce(*mut T, CNT) -> OK, E: FnOnce(CNT) -> ERR>(&self, content: CNT, writer: W, err: E) -> Result<OK, ERR> {
        let cap_mask = self.cap_mask();
        // Full when the slot after the candidate write index is the read index.
        let error_condition = |to_write_index: usize, _: u8| { to_write_index.wrapping_add(1) & cap_mask == self.read_counters.load(Ordering::SeqCst).index() };
        if let Ok((write_counters, to_write_index)) = self.write_counters.increment_in_progress(error_condition, cap_mask) {
            let ok = unsafe {
                let cell = self.cell(to_write_index);
                writer(cell, content)
            };
            // Publish the completed write so readers may consume the cell.
            self.write_counters.increment_done(write_counters, to_write_index, cap_mask);
            Ok(ok)
        } else {
            Err(err(content))
        }
    }
    /// Pushes `content`, evicting the oldest element when the buffer is full.
    ///
    /// Under contention another thread may win the freed slot, so this loops
    /// until the push succeeds.
    #[inline(always)]
    pub fn push_overwrite(&self, content: T) {
        let mut cont = content;
        loop {
            match self.try_push(cont) {
                Ok(_) => return,
                Err(ret) => {
                    // Make room by dropping the oldest element, then retry.
                    self.remove_if_full();
                    cont = ret;
                }
            }
        }
    }
    /// Removes and returns the oldest element, or `None` when empty.
    #[inline]
    pub fn try_pop(&self) -> Option<T> {
        self.try_read_nodrop(|cell|
            if ::std::mem::size_of::<T>() != 0 {
                // Move the value out; the cell becomes logically uninitialized.
                unsafe { ptr::read(cell) }
            } else {
                // For a zero-sized T, a zeroed value is the (only) valid value.
                unsafe { ::std::mem::zeroed() }
            })
    }
    /// Applies `reader` to the oldest element, then drops it in place.
    #[inline]
    pub fn try_read<U, F: FnOnce(&mut T) -> U>(&self, reader: F) -> Option<U> {
        self.try_read_nodrop(|cell| unsafe {
            let result = reader(&mut (*cell));
            // The element is consumed: run its destructor without moving it.
            ptr::drop_in_place(cell);
            result
        })
    }
    /// Core read path: reserves the oldest element and passes it to `reader`.
    ///
    /// The element is NOT dropped here; the closure is responsible for moving
    /// it out (`try_pop`) or dropping it (`try_read`).
    #[inline]
    pub fn try_read_nodrop<U, F: FnOnce(&mut T) -> U>(&self, reader: F) -> Option<U> {
        let cap_mask = self.cap_mask();
        // Empty when the candidate read index has caught up with the write index.
        let error_condition = |to_read_index: usize, _: u8| { to_read_index == self.write_counters.load(Ordering::SeqCst).index() };
        if let Ok((read_counters, to_read_index)) = self.read_counters.increment_in_progress(error_condition, cap_mask) {
            let popped = unsafe {
                let cell = self.cell(to_read_index);
                reader(&mut (*cell))
            };
            // Mark the slot as free for writers again.
            self.read_counters.increment_done(read_counters, to_read_index, cap_mask);
            Some(popped)
        } else {
            None
        }
    }
    /// Number of committed elements; reads in progress are already excluded
    /// by `counter_len`. Only a snapshot while other threads push/pop.
    #[inline]
    pub fn len(&self) -> usize {
        let read_counters = self.read_counters.load(Ordering::SeqCst);
        let write_counters = self.write_counters.load(Ordering::SeqCst);
        counter_len(read_counters, write_counters, self.capacity())
    }
    /// True when `len()` is zero (approximate under concurrency).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Total number of slots (always a power of two); usable capacity is one less.
    #[inline(always)]
    pub fn capacity(&self) -> usize {
        unsafe { (*self.mem).len() }
    }
    /// Number of pushes that can currently succeed, accounting for writes in flight.
    ///
    /// NOTE(review): the final subtraction can underflow if the two counter
    /// words are observed mid-update — treat the result as advisory under
    /// contention; confirm intended guarantees.
    #[inline]
    pub fn remaining_cap(&self) -> usize {
        let read_counters = self.read_counters.load(Ordering::SeqCst);
        let write_counters = self.write_counters.load(Ordering::SeqCst);
        let cap = self.capacity();
        let read_index = read_counters.index();
        let write_index = write_counters.index();
        let len = if read_index <= write_index { write_index - read_index } else { write_index + cap - read_index };
        // One slot always stays empty to tell "full" from "empty".
        cap - 1 - len - write_counters.in_process_count() as usize
    }
    /// Pops (and drops) every element currently in the buffer.
    #[inline]
    pub fn clear(&self) {
        while let Some(_) = self.try_pop() {}
    }
    /// Size in bytes of the backing slice allocation.
    pub fn memory_usage(&self) -> usize {
        unsafe { mem::size_of_val(&(*self.mem)) }
    }
    /// Raw pointer to the cell at `index`. Caller must hold the corresponding
    /// in-progress reservation so no other thread touches this cell.
    #[inline(always)]
    unsafe fn cell(&self, index: usize) -> *mut T {
        (&mut (*self.mem)[index])
    }
    /// Capacity is a power of two, so `index & cap_mask` wraps indices.
    #[inline(always)]
    fn cap_mask(&self) -> usize {
        self.capacity() - 1
    }
    /// Attempts to pop the oldest element to make room for `push_overwrite`.
    ///
    /// NOTE(review): the guard bails out when any read is in flight, or when
    /// `read_index + 1 == write_index` (i.e. a single element remains) —
    /// presumably to avoid racing a concurrent drain; confirm against
    /// `push_overwrite`'s retry loop before relying on this in isolation.
    fn remove_if_full(&self) -> Option<T> {
        let cap_mask = self.cap_mask();
        let error_condition = |to_read_index: usize, read_in_process_count: u8| { read_in_process_count > 0 || to_read_index.wrapping_add(1) & cap_mask == self.write_counters.load(Ordering::Acquire).index() };
        if let Ok((read_counters, to_read_index)) = self.read_counters.increment_in_progress(error_condition, cap_mask) {
            let popped = unsafe {
                if ::std::mem::size_of::<T>() != 0 {
                    // Move the evicted value out so it can be dropped by the caller.
                    ptr::read(self.cell(to_read_index))
                } else {
                    ::std::mem::zeroed()
                }
            };
            self.read_counters.increment_done(read_counters, to_read_index, cap_mask);
            Some(popped)
        } else {
            None
        }
    }
}
impl<T> Drop for AtomicRingBuffer<T> {
    fn drop(&mut self) {
        // Pop (and thereby drop) every remaining element first ...
        self.clear();
        // ... then free the backing allocation WITHOUT dropping its cells:
        // they are now moved-out or were never initialized, so `set_len(0)`
        // makes the Vec's own Drop skip them and only release the memory.
        unsafe { Box::from_raw(self.mem).into_vec().set_len(0); }
    }
}
impl<T> fmt::Debug for AtomicRingBuffer<T> {
    /// `{:?}` prints capacity and length only; `{:#?}` additionally dumps
    /// both packed counter words field by field.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if !f.alternate() {
            return write!(f, "AtomicRingBuffer cap: {} len: {}", self.capacity(), self.len());
        }
        let cap = self.capacity();
        let r = self.read_counters.load(Ordering::Relaxed);
        let w = self.write_counters.load(Ordering::Relaxed);
        write!(
            f,
            "AtomicRingBuffer cap: {} len: {} read_index: {}, read_in_process_count: {}, read_done_count: {}, write_index: {}, write_in_process_count: {}, write_done_count: {}",
            cap,
            self.len(),
            r.index(),
            r.in_process_count(),
            r.done_count(),
            w.index(),
            w.in_process_count(),
            w.done_count()
        )
    }
}
/// One packed counter word: bits 0-7 hold the in-process count, bits 8-15 the
/// done count, and all remaining high bits the ring index.
#[derive(Eq, PartialEq, Copy, Clone)]
pub struct Counters(usize);
impl Counters {
    /// Ring index stored in the bits above the two count bytes.
    #[inline(always)]
    const fn index(self) -> usize {
        self.0 >> 16
    }
    /// Operations currently in flight (low byte).
    #[inline(always)]
    const fn in_process_count(self) -> u8 {
        (self.0 & 0xFF) as u8
    }
    /// In-flight operations that have already finished (second byte).
    #[inline(always)]
    const fn done_count(self) -> u8 {
        ((self.0 >> 8) & 0xFF) as u8
    }
}
impl From<usize> for Counters {
fn from(val: usize) -> Counters {
Counters(val)
}
}
impl From<Counters> for usize {
fn from(val: Counters) -> usize {
val.0
}
}
/// Number of fully committed elements between two counter snapshots:
/// the index distance (accounting for wrap-around via `cap`) minus the
/// reads that are still in flight.
fn counter_len(read_counters: Counters, write_counters: Counters, cap: usize) -> usize {
    let r = read_counters.index();
    let w = write_counters.index();
    let raw = if r <= w { w - r } else { w + cap - r };
    raw - (read_counters.in_process_count() as usize)
}
/// One atomically updated word holding a packed `Counters` value for one side
/// (read or write) of the ring buffer.
struct CounterStore {
    counters: AtomicUsize,
}
impl CounterStore {
    /// A zeroed store: index 0, nothing in flight, nothing done.
    pub const fn new() -> CounterStore {
        CounterStore { counters: AtomicUsize::new(0) }
    }
    /// Atomically loads the packed counter word.
    #[inline(always)]
    pub fn load(&self, ordering: Ordering) -> Counters {
        Counters(self.counters.load(ordering))
    }
    /// Reserves the next slot for this side.
    ///
    /// Spins while `MAXIMUM_IN_PROGRESS` operations are already in flight,
    /// returns `Err(())` when `error_condition(candidate_index, in_progress)`
    /// reports the buffer empty/full, otherwise CAS-increments the in-progress
    /// byte and returns the new counters plus the reserved cell index.
    ///
    /// NOTE(review): `spin_loop_hint` is deprecated in newer Rust in favor of
    /// `std::hint::spin_loop` — confirm the minimum supported toolchain.
    #[inline(always)]
    pub fn increment_in_progress<F>(&self, error_condition: F, cap_mask: usize) -> Result<(Counters, usize), ()>
    where F: Fn(usize, u8) -> bool {
        let mut counters = self.load(Ordering::Acquire);
        loop {
            let in_progress_count = counters.in_process_count();
            // The in-progress byte is saturated; wait for others to finish.
            if in_progress_count == MAXIMUM_IN_PROGRESS {
                spin_loop_hint();
                counters = self.load(Ordering::Acquire);
                continue;
            }
            // Candidate slot: base index advanced past all in-flight operations.
            let index = counters.index().wrapping_add(in_progress_count as usize) & cap_mask;
            if error_condition(index, in_progress_count) {
                return Err(());
            }
            // Low byte +1 = one more operation in progress.
            let new_counters = Counters(counters.0.wrapping_add(1));
            match self.counters.compare_exchange_weak(counters.0, new_counters.0, Ordering::Acquire, Ordering::Relaxed) {
                Ok(_) => return Ok((new_counters, index)),
                Err(updated) => counters = Counters(updated)
            };
        }
    }
    /// Retires the operation reserved at `index`: decrements the in-progress
    /// byte and advances the base index once all earlier in-flight operations
    /// have finished (out-of-order completions bump the done byte instead).
    #[inline(always)]
    pub fn increment_done(&self, mut counters: Counters, index: usize, cap_mask: usize) {
        // Fast path: the done byte is zero, our slot is the base index, and
        // advancing cannot wrap — then a single fetch_add both bumps the index
        // (+1 << 16) and retires one in-progress entry (-1 on the low byte).
        // NOTE(review): the 0x00FF_FFFF_FFFF_FF00 mask presumes a 64-bit
        // usize; this literal would not compile on a 32-bit target — confirm
        // intended platforms.
        if (counters.0 & 0x00FF_FFFF_FFFF_FF00 == (index << 16)) && (index < cap_mask) {
            self.counters.fetch_add((1 << 16) - 1, Ordering::Release);
            return;
        }
        loop {
            let in_process_count = counters.in_process_count();
            let new_counters = Counters(if counters.done_count().wrapping_add(1) == in_process_count {
                // We are the last straggler: fold every finished operation into
                // the base index and clear both count bytes.
                (counters.index().wrapping_add(in_process_count as usize) & cap_mask) << 16
            } else if counters.index() == index {
                // We finished the oldest slot: advance the index (masked for
                // wrap-around) and retire one in-progress entry.
                counters.0.wrapping_add((1 << 16) - 1) & ((cap_mask << 16) | 0xFFFF)
            } else {
                // Finished out of order: just bump the done byte.
                counters.0.wrapping_add(1 << 8)
            });
            match self.counters.compare_exchange_weak(counters.0, new_counters.0, Ordering::Release, Ordering::Relaxed) {
                Ok(_) => return,
                Err(updated) => counters = Counters(updated)
            };
        }
    }
}
#[cfg(test)]
mod tests {
#[test]
pub fn test_increments() {
    // Walks both counter stores through reserve/complete cycles and checks the
    // packed fields after every step. Snapshot tuples read:
    // (len, read_index, read_in_process, read_done, write_index, write_in_process, write_done).
    let read_counter_store = super::CounterStore::new();
    let write_counter_store = super::CounterStore::new();
    let cap = 16;
    let cap_mask = 0xf;
    let error_condition = |_, _| { false };
    // Each pass advances both indices by 3, exercising wrap-around of the 16-slot ring.
    for i in 0..8 {
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((0, (0 + i * 3) % 16, 0, 0, (0 + i * 3) % 16, 0, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        // Reserve three writes back to back.
        write_counter_store.increment_in_progress(error_condition, cap_mask).expect("..");
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((0, (0 + i * 3) % 16, 0, 0, (0 + i * 3) % 16, 1, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        write_counter_store.increment_in_progress(error_condition, cap_mask).expect("..");
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((0, (0 + i * 3) % 16, 0, 0, (0 + i * 3) % 16, 2, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        write_counter_store.increment_in_progress(error_condition, cap_mask).expect("..");
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((0, (0 + i * 3) % 16, 0, 0, (0 + i * 3) % 16, 3, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        // Complete the writes out of order: oldest, newest, then the middle one.
        write_counter_store.increment_done(write_counters, (0 + i * 3) % 16, cap_mask);
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((1, (0 + i * 3) % 16, 0, 0, (1 + i * 3) % 16, 2, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        write_counter_store.increment_done(write_counters, (2 + i * 3) % 16, cap_mask);
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((1, (0 + i * 3) % 16, 0, 0, (1 + i * 3) % 16, 2, 1), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        write_counter_store.increment_done(write_counters, (1 + i * 3) % 16, cap_mask);
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((3, (0 + i * 3) % 16, 0, 0, (3 + i * 3) % 16, 0, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        // Now reserve three reads; each in-flight read lowers the visible len.
        read_counter_store.increment_in_progress(error_condition, cap_mask).expect("..");
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((2, (0 + i * 3) % 16, 1, 0, (3 + i * 3) % 16, 0, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        read_counter_store.increment_in_progress(error_condition, cap_mask).expect("..");
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((1, (0 + i * 3) % 16, 2, 0, (3 + i * 3) % 16, 0, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        read_counter_store.increment_in_progress(error_condition, cap_mask).expect("..");
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((0, (0 + i * 3) % 16, 3, 0, (3 + i * 3) % 16, 0, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        // Complete the reads out of order: middle first, then oldest, then newest.
        read_counter_store.increment_done(read_counters, (1 + i * 3) % 16, cap_mask);
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((0, (0 + i * 3) % 16, 3, 1, (3 + i * 3) % 16, 0, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        read_counter_store.increment_done(read_counters, (0 + i * 3) % 16, cap_mask);
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((0, (1 + i * 3) % 16, 2, 1, (3 + i * 3) % 16, 0, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
        read_counter_store.increment_done(read_counters, (2 + i * 3) % 16, cap_mask);
        let read_counters = read_counter_store.load(super::Ordering::Relaxed);
        let write_counters = write_counter_store.load(super::Ordering::Relaxed);
        assert_eq!((0, (3 + i * 3) % 16, 0, 0, (3 + i * 3) % 16, 0, 0), (super::counter_len(read_counters, write_counters, cap), read_counters.index(), read_counters.in_process_count(), read_counters.done_count(), write_counters.index(), write_counters.in_process_count(), write_counters.done_count()));
    }
}
#[test]
pub fn test_pushpop() {
    // 900 rounds up to a 1024-slot ring (1023 usable).
    let ring = super::AtomicRingBuffer::with_capacity(900);
    assert_eq!(1024, ring.capacity());
    assert_eq!(None, ring.try_pop());
    ring.push_overwrite(1);
    assert_eq!(Some(1), ring.try_pop());
    assert_eq!(None, ring.try_pop());
    // Lockstep push/pop across many wrap-arounds.
    for i in 0..5000 {
        ring.push_overwrite(i);
        assert_eq!(Some(i), ring.try_pop());
        assert_eq!(None, ring.try_pop());
    }
    // Overfill: only the newest capacity-1 elements survive.
    for i in 0..199999 {
        ring.push_overwrite(i);
    }
    assert_eq!(ring.capacity(), ring.len() + 1);
    assert_eq!(199999 - (ring.capacity() - 1), ring.try_pop().unwrap());
    assert_eq!(Ok(()), ring.try_push(199999));
    for i in 200000 - (ring.capacity() - 1)..200000 {
        assert_eq!(i, ring.try_pop().unwrap());
    }
    // Fill to the brim, then rotate one element at a time at full occupancy.
    for i in 0..1023 {
        ring.try_push(i).expect("push")
    }
    assert_eq!(1024, ring.capacity());
    assert_eq!(1023, ring.len());
    for i in 0..1023 {
        assert_eq!(ring.try_pop(), Some(i));
        ring.try_push(i).expect("push")
    }
}
#[test]
pub fn test_pushpop_large() {
    // Same exercise as `test_pushpop`, but 65535 rounds up to a 65536-slot ring.
    let buf = super::AtomicRingBuffer::with_capacity(65535);
    assert_eq!(buf.try_pop(), None);
    buf.push_overwrite(1);
    assert_eq!(buf.try_pop(), Some(1));
    // Push/pop in lockstep: the buffer never holds more than one element.
    for v in 0..200000 {
        buf.push_overwrite(v);
        assert_eq!(buf.try_pop(), Some(v));
    }
    // Overfill: only the newest `capacity - 1` values survive, in order.
    for v in 0..200000 {
        buf.push_overwrite(v);
    }
    assert_eq!(buf.capacity(), buf.len() + 1);
    let first_kept = 200000 - (buf.capacity() - 1);
    for v in first_kept..200000 {
        assert_eq!(buf.try_pop().unwrap(), v);
    }
}
#[test]
pub fn test_pushpop_large2() {
    // Same as `test_pushpop_large`, but the requested capacity is already a
    // power of two, so no rounding occurs.
    let buf = super::AtomicRingBuffer::with_capacity(65536);
    assert_eq!(buf.try_pop(), None);
    buf.push_overwrite(1);
    assert_eq!(buf.try_pop(), Some(1));
    // Push/pop in lockstep across several wrap-arounds.
    for v in 0..200000 {
        buf.push_overwrite(v);
        assert_eq!(buf.try_pop(), Some(v));
    }
    // Overfill: only the newest `capacity - 1` values survive, in order.
    for v in 0..200000 {
        buf.push_overwrite(v);
    }
    assert_eq!(buf.capacity(), buf.len() + 1);
    let first_kept = 200000 - (buf.capacity() - 1);
    for v in first_kept..200000 {
        assert_eq!(buf.try_pop().unwrap(), v);
    }
}
#[test]
pub fn test_pushpop_large2_zerotype() {
    // Zero-sized elements: the backing allocation is 0 bytes, but the packed
    // counters must still track length correctly.
    #[derive(Eq, PartialEq, Debug)]
    struct ZeroType {}
    let ring = super::AtomicRingBuffer::with_capacity(65536);
    assert_eq!(0, ring.memory_usage());
    assert_eq!(None, ring.try_pop());
    ring.push_overwrite(ZeroType {});
    assert_eq!(Some(ZeroType {}), ring.try_pop());
    for _i in 0..200000 {
        ring.push_overwrite(ZeroType {});
        assert_eq!(Some(ZeroType {}), ring.try_pop());
    }
    for _i in 0..200000 {
        ring.push_overwrite(ZeroType {});
    }
    assert_eq!(ring.capacity(), ring.len() + 1);
    for _i in 200000 - (ring.capacity() - 1)..200000 {
        assert_eq!(ZeroType {}, ring.try_pop().unwrap());
    }
}
#[test]
pub fn test_threaded() {
    // Stress test: 100 threads repeatedly pop two values and push them back,
    // so the multiset of buffer contents must be preserved throughout.
    let cap = 65535;
    let buf: super::AtomicRingBuffer<usize> = super::AtomicRingBuffer::with_capacity(cap);
    for i in 0..cap {
        buf.try_push(i).expect("init");
    }
    let arc = ::std::sync::Arc::new(buf);
    let mut handles = Vec::new();
    // Deadline-based run; every worker spins until the deadline passes.
    let end = ::std::time::Instant::now() + ::std::time::Duration::from_millis(10000);
    for _thread_num in 0..100 {
        let buf = ::std::sync::Arc::clone(&arc);
        handles.push(::std::thread::spawn(move || {
            while ::std::time::Instant::now() < end {
                let a = pop_wait(&buf);
                let b = pop_wait(&buf);
                // Spin until there is room again; other threads are popping.
                while buf.try_push(a).is_err() {}
                while buf.try_push(b).is_err() {}
            }
        }));
    }
    for handle in handles {
        handle.join().expect("join");
    }
    assert_eq!(arc.len(), cap);
    // Every original value must still be present exactly once.
    let expected: Vec<usize> = (0..cap).collect();
    let mut actual: Vec<usize> = Vec::new();
    for _ in 0..cap {
        actual.push(arc.try_pop().expect("check"));
    }
    // usize is totally ordered: no need for partial_cmp + unwrap.
    actual.sort_unstable();
    assert_eq!(actual, expected);
}
#[test]
pub fn test_push_overwrite() {
    // Capacity 3 rounds up to 4 slots (3 usable).
    let ring = super::AtomicRingBuffer::with_capacity(3);
    ring.push_overwrite(10);
    assert_eq!(Some(10), ring.try_pop());
    assert_eq!(None, ring.try_pop());
    for i in 0..3 {
        assert_eq!(Ok(()), ring.try_push(i));
    }
    assert_eq!(3, ring.len());
    // The buffer is full: overwrite must evict the oldest element (0).
    ring.push_overwrite(3);
    assert_eq!(Some(1), ring.try_pop());
    assert_eq!(Some(2), ring.try_pop());
    assert_eq!(Some(3), ring.try_pop());
}
// Counts every TestType destructor call for the drop-count tests below.
// NOTE(review): the three drop-count tests reset and read this shared static,
// and the default test harness runs tests in parallel — they can race.
// Consider `--test-threads=1` or a guard mutex if they turn flaky.
static DROP_COUNT: ::std::sync::atomic::AtomicUsize = ::std::sync::atomic::AtomicUsize::new(0);
#[allow(dead_code)]
#[derive(Debug, Default)]
struct TestType {
    // Payload only exists so the element is not zero-sized.
    some: usize
}
impl Drop for TestType {
    fn drop(&mut self) {
        DROP_COUNT.fetch_add(1, ::std::sync::atomic::Ordering::Relaxed);
    }
}
#[test]
pub fn test_dropcount() {
    // Each pushed element must be dropped exactly once: one via try_pop
    // (the returned value goes out of scope), one via the buffer's Drop.
    DROP_COUNT.store(0, ::std::sync::atomic::Ordering::Relaxed);
    {
        let buf: super::AtomicRingBuffer<TestType> = super::AtomicRingBuffer::with_capacity(1024);
        buf.try_push(TestType { some: 0 }).expect("push");
        buf.try_push(TestType { some: 0 }).expect("push");
        assert_eq!(0, DROP_COUNT.load(::std::sync::atomic::Ordering::Relaxed));
        buf.try_pop();
        assert_eq!(1, DROP_COUNT.load(::std::sync::atomic::Ordering::Relaxed));
    }
    assert_eq!(2, DROP_COUNT.load(::std::sync::atomic::Ordering::Relaxed));
}
#[test]
pub fn test_inline_dropcount() {
    // Same drop accounting as test_dropcount, but elements are constructed
    // in place via try_write and consumed via try_read (drop_in_place).
    DROP_COUNT.store(0, ::std::sync::atomic::Ordering::Relaxed);
    {
        let buf: super::AtomicRingBuffer<TestType> = super::AtomicRingBuffer::with_capacity(1024);
        buf.try_write(|w| { w.some = 0 }).expect("push");
        buf.try_write(|w| { w.some = 0 }).expect("push");
        assert_eq!(0, DROP_COUNT.load(::std::sync::atomic::Ordering::Relaxed));
        buf.try_read(|_| {});
        assert_eq!(1, DROP_COUNT.load(::std::sync::atomic::Ordering::Relaxed));
    }
    assert_eq!(2, DROP_COUNT.load(::std::sync::atomic::Ordering::Relaxed));
}
#[test]
pub fn test_unsafe_dropcount() {
    // Same drop accounting again, but elements are written through the raw
    // pointer API (try_unsafe_write + ptr::write).
    DROP_COUNT.store(0, ::std::sync::atomic::Ordering::Relaxed);
    {
        let buf: super::AtomicRingBuffer<TestType> = super::AtomicRingBuffer::with_capacity(1024);
        buf.try_unsafe_write(|w| unsafe { ::std::ptr::write(w, TestType { some: 0 }) }).expect("push");
        buf.try_unsafe_write(|w| unsafe { ::std::ptr::write(w, TestType { some: 0 }) }).expect("push");
        assert_eq!(0, DROP_COUNT.load(::std::sync::atomic::Ordering::Relaxed));
        buf.try_read(|_| {});
        assert_eq!(1, DROP_COUNT.load(::std::sync::atomic::Ordering::Relaxed));
    }
    assert_eq!(2, DROP_COUNT.load(::std::sync::atomic::Ordering::Relaxed));
}
/// Busy-waits until an element can be popped, then returns it.
fn pop_wait(buf: &::std::sync::Arc<super::AtomicRingBuffer<usize>>) -> usize {
    loop {
        if let Some(v) = buf.try_pop() {
            return v;
        }
    }
}
}