//! Single-producer multiple-consumer buffer
//!
//! This crate provides a synchronization primitive through which one writer
//! thread can continuously publish the latest version of a value, while
//! several reader threads access the most recent published version without
//! locking. Extra read buffers can be allocated to keep the writer wait-free
//! in the presence of a bounded number of concurrent readers.

#![deny(missing_docs)]

#[cfg(test)]
extern crate testbench;
use std::cell::UnsafeCell;
use std::ops::BitAnd;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
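/// Single-producer multiple-consumer buffer, useful for sharing the latest
/// value of some resource between one writer thread and many reader threads
///
/// The buffer is accessed through a producer interface and a consumer
/// interface, which `split` extracts so that they can be sent to different
/// threads. A minimal round-trip sketch, assuming this crate is built under
/// the name `spmc_buffer`:
///
/// ```
/// let buf = spmc_buffer::SPMCBuffer::new(0, 42);
/// let (mut input, mut output) = buf.split();
/// input.write(43);
/// assert_eq!(*output.read(), 43);
/// ```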
#[derive(Debug)]
pub struct SPMCBuffer<T: Send + Sync> {
input: SPMCBufferInput<T>,
output: SPMCBufferOutput<T>,
}
impl<T: Clone + Send + Sync> SPMCBuffer<T> {
    /// Create a new SPMC buffer with a given number of extra read buffers,
    /// all buffers being initialized to a clone of `initial`
    pub fn new(read_buffers: usize, initial: T) -> Self {
Self::new_impl(read_buffers, || initial.clone())
}
}
impl<T: Default + Send + Sync> SPMCBuffer<T> {
    /// Create a new SPMC buffer with a given number of extra read buffers,
    /// all buffers being initialized to T's default value
    pub fn with_default(read_buffers: usize) -> Self {
        Self::new_impl(read_buffers, T::default)
}
}
impl<T: Send + Sync> SPMCBuffer<T> {
    /// Common constructor, using a functor to initialize the buffers
    fn new_impl<F>(read_buffers: usize, mut generator: F) -> Self
        where F: FnMut() -> T
    {
        // Check that the requested buffer count fits in the shared index.
        // Two buffers are always needed, plus one per extra concurrent reader
        // that must be tolerated without ever blocking the writer.
        assert!(read_buffers <= MAX_READ_BUFFERS);
        let num_buffers = 2 + read_buffers;
let buffers =
(0..num_buffers).map(|_| Buffer {
data: UnsafeCell::new(generator()),
done_readers: AtomicRefCount::new(0),
})
.collect();
        // Buffer 0 initially holds the latest value, with one registered
        // reader (the initial output interface)
        let shared_state =
            Arc::new(SPMCBufferSharedState {
                buffers,
                latest_info: AtomicSharedIndex::new(1),
            });
let mut result = SPMCBuffer {
input: SPMCBufferInput {
shared: shared_state.clone(),
reader_counts: vec![0; num_buffers],
},
output: SPMCBufferOutput {
shared: shared_state,
read_idx: 0,
},
};
        // Mark the initial latest buffer as unwritable until it has been
        // superseded by a newer value
        result.input.reader_counts[0] = INFINITE_REFCOUNT;
result
}
    /// Extract the input and output interfaces, so that they can be sent to
    /// the producer and consumer threads
    pub fn split(self) -> (SPMCBufferInput<T>, SPMCBufferOutput<T>) {
(self.input, self.output)
}
}
impl<T: Clone + Send + Sync> Clone for SPMCBuffer<T> {
    fn clone(&self) -> Self {
        // Deep-cloning the shared state is safe here because &self guarantees
        // that no thread is concurrently writing to this buffer
        let shared_state = Arc::new(unsafe { (*self.input.shared).clone() });
SPMCBuffer {
input: SPMCBufferInput {
shared: shared_state.clone(),
reader_counts: self.input.reader_counts.clone(),
},
output: SPMCBufferOutput {
shared: shared_state,
read_idx: self.output.read_idx,
},
}
}
}
impl<T: PartialEq + Send + Sync> PartialEq for SPMCBuffer<T> {
    fn eq(&self, other: &Self) -> bool {
        // Comparing the shared states is safe for the same reason that
        // cloning them is: &self and &other rule out concurrent writers
        let shared_states_equal =
            unsafe { (*self.input.shared).eq(&*other.input.shared) };
shared_states_equal &&
(self.input.reader_counts == other.input.reader_counts) &&
(self.output.read_idx == other.output.read_idx)
}
}
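/// Producer interface to an SPMC buffer
///
/// The producer thread uses this interface to publish new values, which
/// replace the previous ones as the data seen by consumers. As a sketch
/// (again assuming the crate name `spmc_buffer`), the input half is typically
/// moved to a dedicated writer thread:
///
/// ```
/// use std::thread;
///
/// let buf = spmc_buffer::SPMCBuffer::new(0, 0);
/// let (mut input, mut output) = buf.split();
/// let writer = thread::spawn(move || input.write(42));
/// writer.join().unwrap();
/// assert_eq!(*output.read(), 42);
/// ```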
#[derive(Debug)]
pub struct SPMCBufferInput<T: Send + Sync> {
    shared: Arc<SPMCBufferSharedState<T>>, // Reference to the shared state
    reader_counts: Vec<RefCount>, // Cached count of the readers which may
                                  // still be accessing each buffer
}
impl<T: Send + Sync> SPMCBufferInput<T> {
    /// Write a new value into the SPMC buffer
    pub fn write(&mut self, value: T) {
        // Access the shared state
        let shared_state = &*self.shared;

        // Find a buffer which no reader can see anymore: one whose cached
        // reader count matches its done_readers counter
        let write_idx: usize;
        loop {
            let mut buf_rc_iter =
                shared_state.buffers.iter().zip(self.reader_counts.iter());
            let write_pos = buf_rc_iter.position(|(buffer, refcount)| {
                *refcount == buffer.done_readers.load(Ordering::Relaxed)
            });
if let Some(idx) = write_pos {
write_idx = idx;
break;
            } else {
                // All buffers are still in use: yield to the readers, which
                // must catch up before anything can be overwritten
                thread::yield_now();
            }
}
        // Move the new value into the free buffer, and reset its
        // done_readers count for the next readout cycle
        let write_buffer = &shared_state.buffers[write_idx];
        let write_ptr = write_buffer.data.get();
        unsafe {
            *write_ptr = value;
        }
        write_buffer.done_readers.store(0, Ordering::Relaxed);
        // Publish the buffer as the new latest one, with a reader count of
        // zero, and fetch the metadata of the buffer it supersedes
        let former_latest_info = shared_state.latest_info.swap(
            write_idx * SHARED_INDEX_MULTIPLIER,
            Ordering::Release,
        );

        // Check that the former reader count did not overflow
        debug_assert!(former_latest_info.bitand(SHARED_OVERFLOW_BIT) == 0);

        // Cache the reader count of the former latest buffer, and mark the
        // new latest buffer as unwritable until a newer value supersedes it
        let former_idx = former_latest_info.bitand(SHARED_INDEX_MASK) /
                         SHARED_INDEX_MULTIPLIER;
        let former_readcount = former_latest_info.bitand(SHARED_READCOUNT_MASK);
        self.reader_counts[former_idx] = former_readcount;
        self.reader_counts[write_idx] = INFINITE_REFCOUNT;
}
}
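/// Consumer interface to an SPMC buffer
///
/// Each consumer thread should own its output interface; additional ones are
/// obtained by cloning an existing interface. A two-consumer sketch, with the
/// crate name again assumed to be `spmc_buffer`:
///
/// ```
/// let buf = spmc_buffer::SPMCBuffer::new(1, 0);
/// let (mut input, mut output1) = buf.split();
/// let mut output2 = output1.clone();
/// input.write(7);
/// assert_eq!(*output1.read(), 7);
/// assert_eq!(*output2.read(), 7);
/// ```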
#[derive(Debug)]
pub struct SPMCBufferOutput<T: Send + Sync> {
    shared: Arc<SPMCBufferSharedState<T>>, // Reference to the shared state
    read_idx: BufferIndex,                 // Index of the buffer which this
                                           // consumer is currently reading
}
impl<T: Send + Sync> SPMCBufferOutput<T> {
    /// Access the latest value written into the SPMC buffer
    pub fn read(&mut self) -> &T {
        // Access the shared state
        let shared_state = &*self.shared;

        // Check whether the producer has published a newer buffer
        let latest_info = shared_state.latest_info.load(Ordering::Relaxed);
        let update_available = latest_info.bitand(SHARED_INDEX_MASK) !=
                               (self.read_idx * SHARED_INDEX_MULTIPLIER);
        if update_available {
            // If so, register as a reader of the newest buffer by
            // incrementing its reader count, and discard our former buffer
            let latest_info = shared_state.latest_info.fetch_add(
                1,
                Ordering::Acquire,
            );
            unsafe {
                self.discard_read_buffer(Ordering::Relaxed);
            }

            // Check that the reader count did not overflow, then switch to
            // the newest buffer
            debug_assert!((latest_info + 1).bitand(SHARED_OVERFLOW_BIT) == 0);
            self.read_idx = latest_info.bitand(SHARED_INDEX_MASK) /
                            SHARED_INDEX_MULTIPLIER;
        }

        // Access the current read buffer
        let read_ptr = shared_state.buffers[self.read_idx].data.get();
        unsafe { &*read_ptr }
}
    /// Signal that we are done with our current read buffer
    ///
    /// This is unsafe because once it has been called, the producer may
    /// overwrite the buffer at self.read_idx at any time.
    unsafe fn discard_read_buffer(&self, order: Ordering) {
        self.shared.buffers[self.read_idx].done_readers.fetch_add(1, order);
}
}
impl<T: Send + Sync> Clone for SPMCBufferOutput<T> {
    fn clone(&self) -> Self {
        // Register the new consumer as a reader of the latest buffer by
        // incrementing its reader count, then start it on that buffer
        let shared_state = self.shared.clone();
        let latest_info = shared_state.latest_info.fetch_add(
            1,
            Ordering::Acquire,
        );
        let new_read_idx = latest_info.bitand(SHARED_INDEX_MASK) /
                           SHARED_INDEX_MULTIPLIER;
SPMCBufferOutput {
shared: shared_state,
read_idx: new_read_idx,
}
}
}
impl<T: Send + Sync> Drop for SPMCBufferOutput<T> {
    fn drop(&mut self) {
        // Let the producer know that our read buffer can be reused
        unsafe {
            self.discard_read_buffer(Ordering::Release);
        }
}
}
#[derive(Debug)]
struct SPMCBufferSharedState<T: Send + Sync> {
    buffers: Vec<Buffer<T>>,        // Data buffers, shared by all parties
    latest_info: AtomicSharedIndex, // Index of the buffer holding the latest
                                    // value, packed with its reader count
}
impl<T: Clone + Send + Sync> SPMCBufferSharedState<T> {
    /// Cloning the shared state is unsafe because the caller must ensure
    /// that no thread is concurrently writing to the buffer contents
    unsafe fn clone(&self) -> Self {
let buffers = self.buffers.iter().map(|b| b.clone()).collect();
SPMCBufferSharedState {
buffers,
latest_info: AtomicSharedIndex::new(
self.latest_info.load(Ordering::Relaxed)
),
}
}
}
impl<T: PartialEq + Send + Sync> SPMCBufferSharedState<T> {
    /// Comparison is unsafe for the same reason that cloning is: the caller
    /// must rule out concurrent writes to the buffer contents
    unsafe fn eq(&self, other: &Self) -> bool {
        // Compare the buffer contents, then the latest-buffer metadata
        let buffers_equal = self.buffers.iter()
                                .zip(other.buffers.iter())
                                .all(|(buf1, buf2)| buf1.eq(buf2));
buffers_equal &&
(self.latest_info.load(Ordering::Relaxed) ==
other.latest_info.load(Ordering::Relaxed))
}
}
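// Sharing the state between threads should be safe, since readers and the
// writer synchronize their accesses to the buffers through the latest_info
// word and the done_readers counters.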
unsafe impl<T: Send + Sync> Sync for SPMCBufferSharedState<T> {}
#[derive(Debug)]
struct Buffer<T: Send + Sync> {
    data: UnsafeCell<T>,          // Data that consumers may be reading
    done_readers: AtomicRefCount, // Number of readers that are done with this
                                  // buffer and will not access it anymore
}
impl<T: Clone + Send + Sync> Buffer<T> {
    /// Cloning a buffer is unsafe: the caller must guarantee that no thread
    /// is concurrently writing to its data
    unsafe fn clone(&self) -> Self {
Buffer {
data: UnsafeCell::new((*self.data.get()).clone()),
done_readers: AtomicRefCount::new(self.done_readers
.load(Ordering::Relaxed)),
}
}
}
impl<T: PartialEq + Send + Sync> Buffer<T> {
    /// Comparing buffers is unsafe for the same reason that cloning them is
    unsafe fn eq(&self, other: &Self) -> bool {
let dr1 = self.done_readers.load(Ordering::Relaxed);
let dr2 = other.done_readers.load(Ordering::Relaxed);
(*self.data.get() == *other.data.get()) && (dr1 == dr2)
}
}
/// Index of a buffer in the shared buffer list
type BufferIndex = usize;

/// Count of the readers which may still access a given buffer
type RefCount = usize;

/// Sentinel refcount marking the latest buffer, which the producer may never
/// reuse until a newer value supersedes it
const INFINITE_REFCOUNT: RefCount = 0xffff;
type AtomicRefCount = AtomicUsize;

/// Packed word combining the index of the latest buffer with the count of
/// readers currently accessing it
type SharedIndex = usize;
type AtomicSharedIndex = AtomicUsize;
const SHARED_READCOUNT_MASK: SharedIndex = 0b0000_0001_1111_1111;
const SHARED_OVERFLOW_BIT: SharedIndex = 0b0000_0010_0000_0000;
const SHARED_INDEX_MASK: SharedIndex = 0b1111_1100_0000_0000;
const SHARED_INDEX_MULTIPLIER: SharedIndex = 0b0000_0100_0000_0000;

/// Maximal supported number of buffers, and of extra read buffers
const MAX_BUFFERS: usize = SHARED_INDEX_MASK / SHARED_INDEX_MULTIPLIER + 1;
const MAX_READ_BUFFERS: usize = MAX_BUFFERS - 2;
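// Layout of the shared index word, as defined by the masks above, from least
// to most significant bits (an illustrative summary, not extra machinery):
//
//   bits 0-8: count of readers currently accessing the latest buffer
//   bit    9: overflow canary for that reader count
//   bits 10+: index of the buffer holding the latest value
//
// For example, buffer index 3 with 5 active readers would be encoded as
// 3 * SHARED_INDEX_MULTIPLIER + 5 = 0b0000_1100_0000_0101.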
#[cfg(test)]
mod tests {
use std::ops::BitAnd;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::atomic::Ordering;
use std::thread;
use std::time::Duration;
use testbench;
use testbench::race_cell::{Racey, UsizeRaceCell};
#[test]
fn initial_state() {
test_initialization(0);
test_initialization(1);
test_initialization(::MAX_READ_BUFFERS);
}
#[test]
#[should_panic]
fn too_many_readers() {
test_initialization(::MAX_READ_BUFFERS + 1);
}
#[test]
fn write_write_sequence() {
let mut buf = ::SPMCBuffer::new(0, 1.0);
let old_buf = buf.clone();
buf.input.write(4.2);
{
let mut expected_buf = old_buf.clone();
            let expected_shared = &expected_buf.input.shared;
let old_read_idx = old_buf.output.read_idx;
let write_idx = 1 - old_read_idx;
let write_ptr = expected_shared.buffers[write_idx].data.get();
unsafe {
*write_ptr = 4.2;
}
let new_latest_info = write_idx * ::SHARED_INDEX_MULTIPLIER;
expected_shared.latest_info.store(new_latest_info,
Ordering::Relaxed);
expected_buf.input.reader_counts[write_idx] = ::INFINITE_REFCOUNT;
expected_buf.input.reader_counts[old_read_idx] = 1;
assert_eq!(buf, expected_buf);
}
{
let sync = Arc::new((Mutex::new(0), Condvar::new()));
let writer_sync = sync.clone();
let (mut buf_input, mut buf_output) = buf.split();
let writer = thread::spawn(move || {
*writer_sync.0.lock().unwrap() = 1;
buf_input.write(2.4);
*writer_sync.0.lock().unwrap() = 2;
writer_sync.1.notify_all();
});
let shared_lock = sync.0.lock().unwrap();
let wait_result =
sync.1.wait_timeout(shared_lock, Duration::from_millis(100));
let (shared_lock, timeout_result) = wait_result.unwrap();
assert!(timeout_result.timed_out());
assert_eq!(*shared_lock, 1);
let _ = buf_output.read();
let wait_result =
sync.1.wait_timeout(shared_lock, Duration::from_millis(100));
let (shared_lock, timeout_result) = wait_result.unwrap();
assert!(!timeout_result.timed_out());
assert_eq!(*shared_lock, 2);
writer.join().unwrap();
}
}
#[test]
fn write_read_read_sequence() {
let mut buf = ::SPMCBuffer::new(0, false);
buf.input.write(true);
{
let old_buf = buf.clone();
            let old_shared = &old_buf.input.shared;
let result = *buf.output.read();
assert_eq!(result, true);
let mut expected_buf = old_buf.clone();
            let expected_shared = &expected_buf.input.shared;
let old_read_idx = old_buf.output.read_idx;
expected_shared.buffers[old_read_idx]
.done_readers
.store(1, Ordering::Relaxed);
let latest_idx = old_shared.latest_info
.load(Ordering::Relaxed)
.bitand(::SHARED_INDEX_MASK) /
::SHARED_INDEX_MULTIPLIER;
expected_buf.output.read_idx = latest_idx;
expected_shared.latest_info.fetch_add(1, Ordering::Relaxed);
assert_eq!(buf, expected_buf);
}
{
let old_buf = buf.clone();
let result = *buf.output.read();
assert_eq!(result, true);
assert_eq!(buf, old_buf);
}
}
#[test]
fn dirty_read_write_sequence() {
let mut buf = ::SPMCBuffer::new(0, [1, 2, 3]);
buf.input.write([4, 5, 6]);
let _ = buf.output.read();
let old_buf = buf.clone();
buf.input.write([7, 8, 9]);
{
let mut expected_buf = old_buf.clone();
            let expected_shared = &expected_buf.input.shared;
let old_read_idx = old_buf.output.read_idx;
let write_idx = 1 - old_read_idx;
            let write_buffer = &expected_shared.buffers[write_idx];
let write_ptr = write_buffer.data.get();
unsafe {
*write_ptr = [7, 8, 9];
}
write_buffer.done_readers.store(0, Ordering::Relaxed);
let new_latest_info = write_idx * ::SHARED_INDEX_MULTIPLIER;
expected_shared.latest_info.store(new_latest_info,
Ordering::Relaxed);
expected_buf.input.reader_counts[write_idx] = ::INFINITE_REFCOUNT;
expected_buf.input.reader_counts[old_read_idx] = 1;
assert_eq!(buf, expected_buf);
}
}
#[test]
fn spawn_new_reader() {
let buf = ::SPMCBuffer::new(0, (64, 4.6));
let old_buf = buf.clone();
let new_output = buf.output.clone();
{
let expected_buf = old_buf.clone();
            let expected_shared = &expected_buf.input.shared;
let old_latest = expected_shared.latest_info
.fetch_add(1, Ordering::Relaxed);
let latest_idx = old_latest.bitand(::SHARED_INDEX_MASK) /
::SHARED_INDEX_MULTIPLIER;
assert_eq!(new_output.read_idx, latest_idx);
assert_eq!(buf, expected_buf);
}
}
#[test]
#[ignore]
fn uncontended_concurrent_access() {
test_rate_limited_writes(false);
test_rate_limited_writes(true);
}
#[test]
#[ignore]
fn contended_concurrent_access() {
test_max_rate_writes(false);
test_max_rate_writes(true);
}
fn test_initialization(read_buffers: usize) {
let buf = ::SPMCBuffer::new(read_buffers, 42);
        let buf_shared = &*buf.input.shared;
let num_buffers = buf_shared.buffers.len();
assert_eq!(num_buffers, 2 + read_buffers);
let latest_info = buf_shared.latest_info.load(Ordering::Relaxed);
let reader_count = latest_info.bitand(::SHARED_READCOUNT_MASK);
assert_eq!(reader_count, 1);
let overflow = latest_info.bitand(::SHARED_OVERFLOW_BIT) != 0;
assert!(!overflow);
let latest_idx = latest_info.bitand(::SHARED_INDEX_MASK) /
::SHARED_INDEX_MULTIPLIER;
assert!(latest_idx < num_buffers);
assert_eq!(buf.output.read_idx, latest_idx);
        let buffers = &buf_shared.buffers;
let read_ptr = buffers[latest_idx].data.get();
assert_eq!(unsafe { *read_ptr }, 42);
for buffer in buffers {
assert_eq!(buffer.done_readers.load(Ordering::Relaxed), 0);
}
        for (index, refcount) in buf.input.reader_counts.iter().enumerate() {
            if index != latest_idx {
                assert_eq!(*refcount, 0);
            } else {
                assert_eq!(*refcount, ::INFINITE_REFCOUNT);
            }
        }
}
fn test_rate_limited_writes(wait_free_regime: bool) {
const TEST_WRITE_COUNT: usize = 100;
run_concurrent_test(
wait_free_regime,
UsizeRaceCell::new(0),
|mut buf_input: ::SPMCBufferInput<UsizeRaceCell>| {
for value in 1..(TEST_WRITE_COUNT + 1) {
buf_input.write(UsizeRaceCell::new(value));
thread::yield_now();
thread::sleep(Duration::from_millis(100));
}
},
|mut buf_output: ::SPMCBufferOutput<UsizeRaceCell>| {
let mut last_value = 0usize;
while last_value != TEST_WRITE_COUNT {
let new_racey_value = buf_output.read().get();
match new_racey_value {
Racey::Consistent(new_value) => {
assert!((new_value >= last_value) &&
(new_value - last_value <= 1));
last_value = new_value;
}
Racey::Inconsistent => {
panic!("Inconsistent state exposed by the buffer!");
}
}
}
}
);
}
fn test_max_rate_writes(wait_free_regime: bool) {
const TEST_WRITE_COUNT: usize = 20_000_000;
run_concurrent_test(
wait_free_regime,
UsizeRaceCell::new(0),
|mut buf_input: ::SPMCBufferInput<UsizeRaceCell>| {
for value in 1..(TEST_WRITE_COUNT + 1) {
buf_input.write(UsizeRaceCell::new(value));
}
},
|mut buf_output: ::SPMCBufferOutput<UsizeRaceCell>| {
let mut last_value = 0usize;
while last_value != TEST_WRITE_COUNT {
let new_racey_value = buf_output.read().get();
match new_racey_value {
Racey::Consistent(new_value) => {
assert!((new_value >= last_value) &&
(new_value <= TEST_WRITE_COUNT));
last_value = new_value;
}
Racey::Inconsistent => {
panic!("Inconsistent state exposed by the buffer!");
}
}
}
}
);
}
fn run_concurrent_test<T, P, C>(wait_free_regime: bool,
initial: T,
producer: P,
consumer: C)
where T: Clone + Send + Sync + 'static,
P: FnOnce(::SPMCBufferInput<T>) + Send + 'static,
C: Fn(::SPMCBufferOutput<T>) + Send + Sync + 'static
{
let wf_conc_readers = if wait_free_regime { 2 } else { 1 };
let buffer = ::SPMCBuffer::new(wf_conc_readers, initial);
let (buf_input, buf_output1) = buffer.split();
let buf_output2 = buf_output1.clone();
let consumer1 = Arc::new(consumer);
let consumer2 = consumer1.clone();
testbench::concurrent_test_3(move || producer(buf_input),
move || consumer1(buf_output1),
move || consumer2(buf_output2));
}
}
#[cfg(test)]
mod benchmarks {
use testbench;
#[test]
#[ignore]
fn clean_read() {
let mut buf = ::SPMCBuffer::new(1, 0u32);
        testbench::benchmark(3_000_000_000u32, || {
            let read = *buf.output.read();
            // Consume the value so that the read cannot be optimized out
            assert!(read < u32::max_value());
        });
}
#[test]
#[ignore]
fn write() {
let mut buf = ::SPMCBuffer::new(1, 0u32);
let mut iter = 1u32;
testbench::benchmark(300_000_000u32, || {
buf.input.write(iter);
iter += 1;
});
}
#[test]
#[ignore]
fn write_and_dirty_read() {
let mut buf = ::SPMCBuffer::new(1, 0u32);
let mut iter = 1u32;
testbench::benchmark(140_000_000u32, || {
buf.input.write(iter);
iter += 1;
let read = *buf.output.read();
assert!(read < u32::max_value());
});
}
#[test]
#[ignore]
fn concurrent_read() {
let buf = ::SPMCBuffer::new(1, 0u32);
let (mut buf_input, mut buf_output) = buf.split();
let mut counter = 0u32;
testbench::concurrent_benchmark(
80_000_000u32,
move || {
let read = *buf_output.read();
assert!(read < u32::max_value());
},
move || {
buf_input.write(counter);
counter = (counter + 1) % u32::max_value();
}
);
}
#[test]
#[ignore]
fn concurrent_write() {
let buf = ::SPMCBuffer::new(1, 0u32);
let (mut buf_input, mut buf_output) = buf.split();
let mut iter = 1u32;
testbench::concurrent_benchmark(
30_000_000u32,
move || {
buf_input.write(iter);
iter += 1;
},
move || {
let read = *buf_output.read();
assert!(read < u32::max_value());
}
);
}
}