#![feature(test)]
use std::{fmt, ptr};
#[cfg(loom)]
pub(crate) use loom::{
cell::{ConstPtr, MutPtr, UnsafeCell},
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};
#[cfg(not(loom))]
pub(crate) use std::{
cell::UnsafeCell,
ops::{Deref, DerefMut},
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};
const OBJECT_PERMUTATIONS: &[usize; 6] = &[
0b000, 0b001, 0b010, 0b011, 0b100, 0b101, ];
const WRITER_STATE_MAP: &[usize; 16] = &[
0b1011, 0b1011, 0b1011, 0b1011, 0b1001, 0b1001, 0b11111111, 0b11111111, 0b0011, 0b0011, 0b0011, 0b0011, 0b0001, 0b0001, 0b11111111, 0b11111111, ];
const WRITER_CUP_MAP: &[usize; 16] = &[0, 0, 2, 1, 2, 1, 3, 3, 0, 0, 2, 1, 2, 1, 3, 3];
const READER_STATE_MAP: &[usize; 16] = &[
0b0000, 0b0000, 0b0000, 0b0000, 0b0000, 0b0000, 0b11111111, 0b11111111, 0b1001, 0b1001, 0b1110, 0b1110, 0b1110, 0b1110, 0b11111111, 0b11111111, ];
const READER_CUP_MAP: &[usize; 16] = &[2, 1, 1, 2, 0, 0, 3, 3, 2, 1, 1, 2, 0, 0, 3, 3];
struct Cupchan<T> {
cups: [UnsafeCell<T>; 3], state: AtomicUsize,
unconnected: AtomicBool,
}
pub fn cupchan<T: Clone>(initial: T) -> (CupchanWriter<T>, CupchanReader<T>) {
let chan = Cupchan {
cups: [
UnsafeCell::new(initial.clone()),
UnsafeCell::new(initial.clone()),
UnsafeCell::new(initial),
],
state: AtomicUsize::new(OBJECT_PERMUTATIONS[0]), unconnected: AtomicBool::new(false),
};
let chan = Box::leak(Box::new(chan)); (
CupchanWriter {
chan,
current_cup: &chan.cups[0],
},
CupchanReader { chan },
)
}
impl<T: fmt::Debug> fmt::Debug for Cupchan<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Cupchan")
.field("cups", &self.cups)
.field("state", &self.state.load(Ordering::SeqCst))
.field("unconnected", &self.unconnected.load(Ordering::SeqCst))
.finish()
}
}
#[derive(Debug)]
pub struct CupchanWriter<T: 'static> {
chan: &'static Cupchan<T>,
current_cup: &'static UnsafeCell<T>,
}
impl<T> CupchanWriter<T> {
fn new(chan: &'static Cupchan<T>) -> Self {
let cup_index = WRITER_CUP_MAP[chan.state.load(Ordering::Acquire)];
Self {
chan,
current_cup: &chan.cups[cup_index],
}
}
pub fn flush(&mut self) {
let res = self
.chan
.state
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |state| {
Some(state ^ WRITER_STATE_MAP[state])
})
.unwrap();
self.current_cup = &self.chan.cups[WRITER_CUP_MAP[res ^ WRITER_STATE_MAP[res]]];
}
pub fn new_reader(&self) -> Option<CupchanReader<T>> {
if self.chan.unconnected.swap(false, Ordering::SeqCst) {
Some(CupchanReader::new(self.chan))
} else {
None
}
}
#[cfg(loom)]
pub fn loom_ptr(&mut self) -> MutPtr<T> {
self.current_cup.get_mut()
}
}
#[cfg(not(loom))]
impl<T> Deref for CupchanWriter<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.current_cup.get() }
}
}
#[cfg(not(loom))]
impl<T> DerefMut for CupchanWriter<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.current_cup.get() }
}
}
impl<T> Drop for CupchanWriter<T> {
fn drop(&mut self) {
let was_unconnected = self.chan.unconnected.swap(true, Ordering::AcqRel);
if was_unconnected {
unsafe {
let ptr = std::mem::transmute::<_, *mut Cupchan<T>>(self.chan);
ptr::drop_in_place(ptr);
}
}
}
}
unsafe impl<T: Sync + Send> Send for CupchanWriter<T> {}
unsafe impl<T: Sync + Send> Sync for CupchanWriter<T> {}
#[derive(Debug)]
pub struct CupchanReader<T: 'static> {
chan: &'static Cupchan<T>,
}
impl<T> CupchanReader<T> {
fn new(chan: &'static Cupchan<T>) -> Self {
Self { chan }
}
#[inline]
fn read(&self) -> &'static UnsafeCell<T> {
let res = self
.chan
.state
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |state| {
Some(state ^ READER_STATE_MAP[state])
})
.unwrap();
&self.chan.cups[READER_CUP_MAP[res ^ READER_STATE_MAP[res]]]
}
pub fn new_writer(&self) -> Option<CupchanWriter<T>> {
if self.chan.unconnected.swap(false, Ordering::SeqCst) {
Some(CupchanWriter::new(self.chan))
} else {
None
}
}
#[cfg(loom)]
pub fn loom_ptr(&self) -> ConstPtr<T> {
self.read().get()
}
}
#[cfg(not(loom))]
impl<T> Deref for CupchanReader<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &(*self.read().get()) }
}
}
impl<T> Drop for CupchanReader<T> {
fn drop(&mut self) {
let was_unconnected = self.chan.unconnected.swap(true, Ordering::AcqRel);
if was_unconnected {
unsafe {
let ptr = std::mem::transmute::<_, *mut Cupchan<T>>(self.chan);
ptr::drop_in_place(ptr);
}
}
}
}
unsafe impl<T: Sync + Send> Send for CupchanReader<T> {}
unsafe impl<T: Sync + Send> Sync for CupchanReader<T> {}
#[cfg(test)]
mod tests {
extern crate test;
use test::Bencher;
use std::thread;
use crate::cupchan;
#[test]
fn test_chan_sync() {
let (mut writer, reader) = cupchan(0);
*writer = 1;
writer.flush();
assert_eq!(*reader, 1);
*writer = 2;
writer.flush();
assert_eq!(*reader, 2);
drop(reader);
let reader = writer.new_reader().unwrap();
*writer = 3;
writer.flush();
assert_eq!(*reader, 3);
drop(writer)
}
const MAX: usize = 5_000;
#[test]
fn cupchan_async_greedy_reader() {
let (mut writer, reader) = cupchan(0usize);
let join = thread::spawn(move || {
for i in 0..MAX {
*writer = i;
writer.flush();
}
});
let mut current = *reader;
while current < MAX - 1 {
current = *reader;
}
assert!(*reader == MAX - 1);
join.join().unwrap();
}
#[test]
fn cupchan_async_lazy_reader() {
let (mut writer, reader) = cupchan(0usize);
let join = thread::spawn(move || {
for i in 0..MAX {
*writer = i;
writer.flush();
}
});
let mut current = *reader;
while current < MAX - 1 {
thread::yield_now();
current = *reader;
}
assert!(*reader == MAX - 1);
join.join().unwrap();
}
#[test]
fn crossbeam_chan_async() {
let (tx, rx) = crossbeam_channel::bounded(3);
let join = thread::spawn(move || {
for i in 0..MAX {
tx.send(i).unwrap();
}
});
let mut current = 0;
for _ in 0..MAX {
current = rx.recv().unwrap();
}
assert!(current == MAX - 1);
join.join().unwrap();
}
#[test]
fn crossbeam_chan_async_cap_10() {
let (tx, rx) = crossbeam_channel::bounded(10);
let join = thread::spawn(move || {
for i in 0..MAX {
tx.send(i).unwrap();
}
});
let mut current = 0;
for _ in 0..MAX {
current = rx.recv().unwrap();
}
assert!(current == MAX - 1);
join.join().unwrap();
}
#[test]
fn flume_chan_async() {
let (tx, rx) = flume::unbounded();
let join = thread::spawn(move || {
for i in 0..MAX {
tx.send(i).unwrap();
}
});
let mut current = 0;
for _ in 0..MAX {
current = rx.recv().unwrap();
}
assert!(current == MAX - 1);
join.join().unwrap();
}
#[bench]
fn bench_cupchan_greedy(b: &mut Bencher) {
b.iter(|| {
cupchan_async_greedy_reader();
})
}
#[bench]
fn bench_cupchan_lazy(b: &mut Bencher) {
b.iter(|| {
cupchan_async_lazy_reader();
})
}
#[bench]
fn bench_crossbeam_chan_cap_3(b: &mut Bencher) {
b.iter(|| {
crossbeam_chan_async();
})
}
#[bench]
fn bench_crossbeam_chan_cap_10(b: &mut Bencher) {
b.iter(|| {
crossbeam_chan_async_cap_10();
})
}
#[bench]
fn bench_flume_chan(b: &mut Bencher) {
b.iter(|| {
flume_chan_async();
})
}
}