use ::alloc::{alloc, sync::Arc};
use core::{
alloc::Layout,
cell::Cell,
mem, ptr,
sync::atomic::{AtomicUsize, Ordering},
usize,
};
/// Cache-line length in bytes that the queue pads for, chosen per target
/// architecture. The values are hard-coded assumptions of this file.
///
/// Exactly one of the four definitions below is compiled in: three explicit
/// architecture groups, and a fallback for every architecture not named by
/// any of them.
#[cfg(target_arch = "s390x")]
const CACHELINE_LEN: usize = 256;

#[cfg(any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64"))]
const CACHELINE_LEN: usize = 128;

#[cfg(any(
    target_arch = "arm",
    target_arch = "mips",
    target_arch = "mips64",
    target_arch = "riscv64"
))]
const CACHELINE_LEN: usize = 32;

// Fallback: any architecture that is not listed in one of the groups above.
#[cfg(not(any(
    target_arch = "s390x",
    target_arch = "x86_64",
    target_arch = "aarch64",
    target_arch = "powerpc64",
    target_arch = "arm",
    target_arch = "mips",
    target_arch = "mips64",
    target_arch = "riscv64"
)))]
const CACHELINE_LEN: usize = 64;

/// The same length expressed in `usize`-sized words; used to size the padding
/// arrays inside `Buffer`.
const CACHELINE: usize = CACHELINE_LEN / core::mem::size_of::<usize>();
/// Shared ring-buffer state referenced by both `Producer` and `Consumer`.
///
/// `#[repr(C)]` keeps the fields in declaration order, so the padding arrays
/// split the struct into three groups. Each group is two pointer-sized fields
/// followed by `CACHELINE - 2` words of padding, i.e. `CACHELINE_LEN` bytes;
/// the `buffer_size` test checks that the whole struct is `3 * CACHELINE_LEN`
/// bytes.
///
/// NOTE(review): the struct carries no `#[repr(align(..))]`, so the groups are
/// a cache-line *length* apart but the struct itself is not forced to start on
/// a cache-line boundary — confirm whether that is intended.
#[repr(C)]
pub struct Buffer<T> {
/// Start of the slot storage returned by `allocate_buffer`.
buffer: *mut T,
/// Number of slots; `make` rounds it up to a power of two so that
/// `capacity - 1` can be used as an index mask.
capacity: usize,
/// Filler completing the first group (never read).
_padding1: [usize; CACHELINE - 2],
/// Position of the next element to pop; stored by `try_pop`, loaded by
/// `try_push`.
head: AtomicUsize,
/// Copy of `tail` cached by `try_pop`, refreshed only when the cached value
/// says the queue is empty.
shadow_tail: Cell<usize>,
/// Filler completing the second group (never read).
_padding2: [usize; CACHELINE - 2],
/// Position of the next free slot; stored by `try_push`, loaded by
/// `try_pop`.
tail: AtomicUsize,
/// Copy of `head` cached by `try_push`, refreshed only when the cached value
/// says the queue is full.
shadow_head: Cell<usize>,
/// Filler completing the third group (never read).
_padding3: [usize; CACHELINE - 2],
}
/// Receiving handle of the queue, created by `make`.
///
/// Holds one of the two `Arc` references to the shared `Buffer`; its methods
/// forward to the buffer's pop-side operations.
pub struct Consumer<T> {
/// Shared queue state; the other reference lives in the paired `Producer`.
buffer: Arc<Buffer<T>>,
}
/// Sending handle of the queue, created by `make`.
///
/// Holds one of the two `Arc` references to the shared `Buffer`; its methods
/// forward to the buffer's push-side operations.
pub struct Producer<T> {
/// Shared queue state; the other reference lives in the paired `Consumer`.
buffer: Arc<Buffer<T>>,
}
// `Buffer<T>` contains a raw pointer and `Cell`s, so the handles are not
// `Send` automatically; these impls opt in manually, and only for `T: Send`
// because elements are moved from the producer's thread to the consumer's.
//
// SAFETY (NOTE(review): rationale inferred from the visible code — confirm):
// `make` hands out exactly one `Producer` and one `Consumer`, neither handle
// is `Clone` or `Sync` in the code shown here, the producer side only writes
// `tail`/`shadow_head` and the consumer side only writes `head`/`shadow_tail`,
// and the two sides communicate through the atomic `head`/`tail` fields.
unsafe impl<T: Send> Send for Consumer<T> {}
unsafe impl<T: Send> Send for Producer<T> {}
impl<T> Buffer<T> {
    /// Removes and returns the oldest element, or `None` if the queue is empty.
    ///
    /// This is the consumer-side operation: it is the only method that stores
    /// to `head` and `shadow_tail`.
    pub fn try_pop(&self) -> Option<T> {
        // Only this side ever stores `head`, so a relaxed load is enough.
        let current_head = self.head.load(Ordering::Relaxed);

        if current_head == self.shadow_tail.get() {
            // The cached tail says "empty": refresh it. `Acquire` pairs with
            // the `Release` store in `try_push`, making the element written
            // before that store visible here.
            self.shadow_tail.set(self.tail.load(Ordering::Acquire));
            if current_head == self.shadow_tail.get() {
                return None;
            }
        }

        // SAFETY: `head != tail`, so the slot at `current_head` was initialised
        // by `try_push` and has not been read out yet.
        let v = unsafe { ptr::read(self.load(current_head)) };
        // `Release` publishes that the slot has been vacated; it pairs with the
        // `Acquire` load of `head` in `try_push`.
        self.head.store(current_head.wrapping_add(1), Ordering::Release);
        Some(v)
    }

    /// Removes and returns the oldest element, busy-waiting until one exists.
    pub fn pop(&self) -> T {
        loop {
            if let Some(v) = self.try_pop() {
                return v;
            }
        }
    }

    /// Appends `v` to the queue.
    ///
    /// Returns `None` on success, or hands the value back as `Some(v)` when
    /// the queue is full.
    ///
    /// This is the producer-side operation: it is the only method that stores
    /// to `tail` and `shadow_head`.
    pub fn try_push(&self, v: T) -> Option<T> {
        // Only this side ever stores `tail`, so a relaxed load is enough.
        let current_tail = self.tail.load(Ordering::Relaxed);

        // The positions advance with `wrapping_add`, so the fill level has to
        // be computed with a wrapping subtraction; `shadow_head + capacity`
        // would overflow once the counters approach `usize::MAX`.
        if current_tail.wrapping_sub(self.shadow_head.get()) >= self.capacity {
            // The cached head says "full": refresh it. `Acquire` pairs with the
            // `Release` store in `try_pop`, so the consumer's read of a slot
            // happens-before this side overwrites that slot.
            self.shadow_head.set(self.head.load(Ordering::Acquire));
            if current_tail.wrapping_sub(self.shadow_head.get()) >= self.capacity {
                return Some(v);
            }
        }

        // SAFETY: fewer than `capacity` elements are in flight, so the slot at
        // `current_tail` holds no live value.
        unsafe {
            self.store(current_tail, v);
        }
        // `Release` publishes the element written above to the consumer.
        self.tail.store(current_tail.wrapping_add(1), Ordering::Release);
        None
    }

    /// Appends `v` to the queue, busy-waiting while the queue is full.
    pub fn push(&self, v: T) {
        let mut pending = v;
        while let Some(rejected) = self.try_push(pending) {
            pending = rejected;
        }
    }

    /// Returns a reference to the slot that position `pos` maps to.
    ///
    /// # Safety
    ///
    /// The slot must currently hold an initialised `T`.
    #[inline]
    unsafe fn load(&self, pos: usize) -> &T {
        // `capacity` is a power of two, so masking wraps `pos` into range.
        &*self.buffer.add(pos & (self.capacity - 1))
    }

    /// Moves `v` into the slot that position `pos` maps to without dropping
    /// whatever bytes were there before.
    ///
    /// # Safety
    ///
    /// The slot must not hold a live value.
    #[inline]
    unsafe fn store(&self, pos: usize, v: T) {
        // Write through the raw pointer: the slot may be uninitialised, so no
        // `&mut T` is created for it.
        ptr::write(self.buffer.add(pos & (self.capacity - 1)), v);
    }
}
impl<T> Drop for Buffer<T> {
    /// Drops every element still queued and releases the slot storage.
    fn drop(&mut self) {
        // Popping moves each remaining element out so its destructor runs.
        while self.try_pop().is_some() {}

        let layout = buffer_layout::<T>(self.capacity);
        // Zero-sized layouts were never passed to the allocator (see
        // `allocate_buffer`), so there is nothing to free for them.
        if layout.size() != 0 {
            // SAFETY: `buffer` came from `alloc::alloc` with this same layout
            // in `allocate_buffer`, and is freed exactly once, here.
            unsafe {
                alloc::dealloc(self.buffer as *mut u8, layout);
            }
        }
    }
}

/// Creates a queue with room for at least `min_capacity` elements and returns
/// its two handles.
///
/// The capacity is rounded up to the next power of two (a request of `0`
/// yields a capacity of `1`).
///
/// # Panics
///
/// Panics with "capacity overflow" if the rounded capacity, or the byte size
/// of the storage, cannot be represented.
pub fn make<T>(min_capacity: usize) -> (Producer<T>, Consumer<T>) {
    // `next_power_of_two` returns 0 on overflow in release builds, which would
    // produce an unusable queue; fail loudly instead.
    let capacity = min_capacity
        .checked_next_power_of_two()
        .expect("capacity overflow");
    // SAFETY: the returned pointer is stored in the `Buffer` below, whose
    // `Drop` impl releases it with the matching layout.
    let ptr = unsafe { allocate_buffer(capacity) };
    let arc = Arc::new(Buffer {
        buffer: ptr,
        capacity,
        _padding1: [0; CACHELINE - 2],
        head: AtomicUsize::new(0),
        shadow_tail: Cell::new(0),
        _padding2: [0; CACHELINE - 2],
        tail: AtomicUsize::new(0),
        shadow_head: Cell::new(0),
        _padding3: [0; CACHELINE - 2],
    });
    (
        Producer {
            buffer: Arc::clone(&arc),
        },
        Consumer { buffer: arc },
    )
}

/// Layout of the storage for `capacity` slots of `T`; shared by allocation and
/// deallocation so the two always agree.
fn buffer_layout<T>(capacity: usize) -> Layout {
    Layout::array::<T>(capacity).expect("capacity overflow")
}

/// Allocates uninitialised storage for `capacity` slots, rounded up to a power
/// of two.
///
/// When the storage would be zero bytes (zero-sized `T`) no allocation is made
/// and a dangling, well-aligned pointer is returned, because calling the
/// global allocator with a zero-size layout is undefined behaviour.
///
/// # Safety
///
/// The caller owns the returned memory and must release it with the layout
/// `buffer_layout::<T>` gives for the rounded capacity, and must not call
/// `dealloc` at all when that layout is zero-sized.
unsafe fn allocate_buffer<T>(capacity: usize) -> *mut T {
    let adjusted_size = capacity
        .checked_next_power_of_two()
        .expect("capacity overflow");
    let layout = buffer_layout::<T>(adjusted_size);
    if layout.size() == 0 {
        return ptr::NonNull::<T>::dangling().as_ptr();
    }

    let raw = alloc::alloc(layout);
    if raw.is_null() {
        alloc::handle_alloc_error(layout);
    }
    raw as *mut T
}
impl<T> Producer<T> {
    /// Appends `v`, busy-waiting while the queue is full.
    #[inline]
    pub fn push(&self, v: T) {
        self.buffer.push(v);
    }

    /// Appends `v` if there is room; returns `None` on success or gives the
    /// value back as `Some(v)` when the queue is full.
    #[inline]
    pub fn try_push(&self, v: T) -> Option<T> {
        self.buffer.try_push(v)
    }

    /// Number of slots in the queue.
    pub fn capacity(&self) -> usize {
        self.buffer.capacity
    }

    /// Number of elements in the queue as observed by this call.
    pub fn size(&self) -> usize {
        let tail = self.buffer.tail.load(Ordering::Acquire);
        let head = self.buffer.head.load(Ordering::Acquire);
        // Both positions advance with `wrapping_add`, so their distance must
        // be taken with a wrapping subtraction to stay correct after a wrap.
        tail.wrapping_sub(head)
    }

    /// Number of slots that were free as observed by this call.
    pub fn free_space(&self) -> usize {
        self.capacity() - self.size()
    }
}
impl<T> Consumer<T> {
    /// Removes and returns the oldest element, busy-waiting until one exists.
    #[inline]
    pub fn pop(&self) -> T {
        self.buffer.pop()
    }

    /// Removes and returns the oldest element, or `None` if the queue is
    /// empty.
    #[inline]
    pub fn try_pop(&self) -> Option<T> {
        self.buffer.try_pop()
    }

    /// Number of slots in the queue.
    pub fn capacity(&self) -> usize {
        self.buffer.capacity
    }

    /// Number of elements in the queue as observed by this call.
    pub fn size(&self) -> usize {
        let tail = self.buffer.tail.load(Ordering::Acquire);
        let head = self.buffer.head.load(Ordering::Acquire);
        // Both positions advance with `wrapping_add`, so their distance must
        // be taken with a wrapping subtraction to stay correct after a wrap.
        tail.wrapping_sub(head)
    }
}
#[cfg(test)]
mod tests {
    #![allow(unused_imports)]
    use super::{Buffer, CACHELINE_LEN};

    // `make(10)` rounds the requested capacity up to the next power of two.
    const ROUNDED_CAPACITY: usize = 16;

    /// The three padded groups must add up to exactly three cache lines.
    #[test]
    fn buffer_size() {
        assert_eq!(::core::mem::size_of::<Buffer<()>>(), 3 * CACHELINE_LEN);
    }

    /// Pushing grows `size()` by one each time and leaves `capacity()` alone.
    #[test]
    fn producer_push() {
        let (p, _) = super::make(10);
        for i in 0..9 {
            p.push(i);
            assert_eq!(p.capacity(), ROUNDED_CAPACITY);
            assert_eq!(p.size(), i + 1);
        }
    }

    /// Elements come out in insertion order and `size()` shrinks accordingly.
    #[test]
    fn consumer_pop() {
        let (p, c) = super::make(10);
        for i in 0..9 {
            p.push(i);
            assert_eq!(p.capacity(), ROUNDED_CAPACITY);
            assert_eq!(p.size(), i + 1);
        }
        for i in 0..9 {
            assert_eq!(c.size(), 9 - i);
            let t = c.pop();
            assert_eq!(c.capacity(), ROUNDED_CAPACITY);
            assert_eq!(c.size(), 9 - i - 1);
            assert_eq!(t, i);
        }
    }

    /// Once every slot is occupied, `try_push` hands the value back.
    #[test]
    fn try_push() {
        let (p, _) = super::make(10);
        let capacity = p.capacity();
        assert_eq!(capacity, ROUNDED_CAPACITY);
        // Fill every slot, not just the requested minimum.
        for i in 0..capacity {
            p.push(i);
            assert_eq!(p.size(), i + 1);
        }
        assert_eq!(p.free_space(), 0);
        assert_eq!(
            p.try_push(capacity),
            Some(capacity),
            "Queue should not have accepted another write!"
        );
    }

    /// `try_pop` returns `None` on an empty queue and the value otherwise.
    #[test]
    fn try_poll() {
        let (p, c) = super::make(10);
        assert!(c.try_pop().is_none(), "Queue was empty but a value was read!");
        p.push(123);
        assert_eq!(
            c.try_pop(),
            Some(123),
            "Queue was not empty but poll() returned nothing!"
        );
        assert!(c.try_pop().is_none(), "Queue was empty but a value was read!");
    }

    /// Values pushed from another thread arrive complete and in order.
    #[test]
    fn threaded() {
        use std::thread;
        let (p, c) = super::make(500);
        let producer = thread::spawn(move || {
            for i in 0..100000 {
                p.push(i);
            }
        });
        for i in 0..100000 {
            let t = c.pop();
            assert_eq!(t, i);
        }
        // Surface a producer-side panic instead of silently detaching.
        producer.join().expect("producer thread panicked");
    }
}