#![cfg_attr(not(test), no_std)]
#![deny(missing_debug_implementations, missing_docs)]
extern crate alloc;
use crossbeam_utils::CachePadded;
use alloc::sync::Arc;
use core::{
cell::UnsafeCell,
fmt,
ops::{Deref, DerefMut},
sync::atomic::{AtomicU8, Ordering},
};
/// A triple buffer: a single-producer single-consumer exchange cell where the
/// producer can always overwrite the pending value and the consumer can always
/// fetch the latest published value, with neither side blocking the other.
#[derive(Debug)]
pub struct TripleBuffer<T: Send> {
/// Producer endpoint (owns the index of the current write buffer).
input: Input<T>,
/// Consumer endpoint (owns the index of the current read buffer).
output: Output<T>,
}
impl<T: Clone + Send> TripleBuffer<T> {
pub fn new(initial: &T) -> Self {
Self::new_impl(|| initial.clone())
}
}
impl<T: Default + Send> Default for TripleBuffer<T> {
    /// Construct a triple buffer whose three internal buffers are all
    /// default-initialized.
    fn default() -> Self {
        Self::new_impl(|| T::default())
    }
}
impl<T: Send> TripleBuffer<T> {
    /// Shared constructor: fill the three buffers by calling `generator` once
    /// per buffer, then set up the initial assignment (back buffer = 0,
    /// producer writes buffer 1, consumer reads buffer 2).
    fn new_impl(mut generator: impl FnMut() -> T) -> Self {
        let shared = Arc::new(SharedState::new(|_i| generator(), 0));
        let input = Input {
            shared: Arc::clone(&shared),
            input_idx: 1,
        };
        let output = Output {
            shared,
            output_idx: 2,
        };
        TripleBuffer { input, output }
    }
    /// Tear the buffer apart into its producer and consumer endpoints, which
    /// can then be sent to different threads.
    pub fn split(self) -> (Input<T>, Output<T>) {
        (self.input, self.output)
    }
}
/// Convenience shorthand for `TripleBuffer::new(initial).split()`.
pub fn triple_buffer<T: Clone + Send>(initial: &T) -> (Input<T>, Output<T>) {
    let buffer = TripleBuffer::new(initial);
    buffer.split()
}
#[doc(hidden)]
impl<T: Clone + Send> Clone for TripleBuffer<T> {
/// Deep-clone the triple buffer into a brand new, independent shared state
/// (mainly useful for tests; hence `#[doc(hidden)]`).
fn clone(&self) -> Self {
// SAFETY: `SharedState::clone` reads all three buffers through their
// `UnsafeCell`s, which is only sound if no thread is concurrently writing
// them. NOTE(review): `&self` here does not by itself rule out a writer on
// another thread — confirm this helper is only used while both endpoints
// are quiescent (as the tests below do).
let shared_state = Arc::new(unsafe { (*self.input.shared).clone() });
TripleBuffer {
input: Input {
shared: shared_state.clone(),
input_idx: self.input.input_idx,
},
output: Output {
shared: shared_state,
output_idx: self.output.output_idx,
},
}
}
}
#[doc(hidden)]
impl<T: PartialEq + Send> PartialEq for TripleBuffer<T> {
    /// Two triple buffers are equal iff their shared states (buffer contents
    /// plus back-buffer descriptor) and both endpoint indices match.
    fn eq(&self, other: &Self) -> bool {
        // SAFETY note: `SharedState::eq` reads the buffers through their
        // `UnsafeCell`s; callers must not be mutating them concurrently.
        if !unsafe { (*self.input.shared).eq(&*other.input.shared) } {
            return false;
        }
        self.input.input_idx == other.input.input_idx
            && self.output.output_idx == other.output.output_idx
    }
}
/// Producer endpoint of a triple buffer: lets one thread submit new values.
#[derive(Debug)]
pub struct Input<T: Send> {
/// State shared with the consumer endpoint.
shared: Arc<SharedState<T>>,
/// Index of the buffer this producer currently writes into.
input_idx: BufferIndex,
}
impl<T: Send> Input<T> {
/// Overwrite the write buffer with `value` and publish it to the consumer.
pub fn write(&mut self, value: T) {
*self.input_buffer_mut() = value;
self.publish();
}
/// Tell whether the last published value has been fetched by the consumer
/// (i.e. the back buffer's dirty bit is clear).
///
/// Only a hint: the consumer may fetch right after this returns.
pub fn consumed(&self) -> bool {
let back_info = self.shared.back_info.load(Ordering::Relaxed);
back_info & BACK_DIRTY_BIT == 0
}
/// Shared access to the current write buffer.
pub fn input_buffer(&self) -> &T {
let input_ptr = self.shared.buffers[self.input_idx as usize].get();
// SAFETY: `input_idx` designates the buffer owned by this producer
// endpoint; `publish`/`update` only hand over the *other* buffers, so
// no other thread accesses it while we hold it.
unsafe { &*input_ptr }
}
/// Exclusive access to the current write buffer, for in-place updates.
pub fn input_buffer_mut(&mut self) -> &mut T {
let input_ptr = self.shared.buffers[self.input_idx as usize].get();
// SAFETY: same ownership argument as `input_buffer`, and `&mut self`
// rules out any outstanding borrow through this endpoint.
unsafe { &mut *input_ptr }
}
/// Publish the write buffer: atomically trade it for the back buffer,
/// setting the dirty bit so the consumer knows a fresh value is available.
///
/// Returns `true` if the previously published value was overwritten
/// without ever having been fetched by the consumer.
pub fn publish(&mut self) -> bool {
// AcqRel swap: Release our writes to the buffer we hand over, Acquire
// the consumer's prior accesses to the buffer we take back.
let former_back_info = self
.shared
.back_info
.swap(self.input_idx | BACK_DIRTY_BIT, Ordering::AcqRel);
// The former back buffer becomes our next write buffer
self.input_idx = former_back_info & BACK_INDEX_MASK;
former_back_info & BACK_DIRTY_BIT != 0
}
/// RAII alternative to `write`: mutate the write buffer through the
/// returned guard, which publishes automatically when dropped.
pub fn input_buffer_publisher(&mut self) -> InputPublishGuard<'_, T> {
InputPublishGuard { reference: self }
}
}
/// Smart pointer over the producer's write buffer which publishes the buffer
/// when dropped (see `Input::input_buffer_publisher`).
pub struct InputPublishGuard<'a, T: 'a + Send> {
/// Producer endpoint the guard mutably borrows for its whole lifetime.
reference: &'a mut Input<T>,
}
impl<T: Send> Deref for InputPublishGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
self.reference.input_buffer()
}
}
impl<T: Send> DerefMut for InputPublishGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
self.reference.input_buffer_mut()
}
}
impl<T: Send> Drop for InputPublishGuard<'_, T> {
#[inline]
fn drop(&mut self) {
self.reference.publish();
}
}
impl<T: Send + fmt::Debug> fmt::Debug for InputPublishGuard<'_, T> {
    /// Forward `Debug` formatting to the guarded buffer contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Resolves via the `T: fmt::Debug` bound on this impl
        (**self).fmt(f)
    }
}
impl<T: fmt::Display + Send> fmt::Display for InputPublishGuard<'_, T> {
    /// Forward `Display` formatting to the guarded buffer contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&**self, f)
    }
}
/// Consumer endpoint of a triple buffer: lets one thread fetch the latest
/// published value.
#[derive(Debug)]
pub struct Output<T: Send> {
/// State shared with the producer endpoint.
shared: Arc<SharedState<T>>,
/// Index of the buffer this consumer currently reads from.
output_idx: BufferIndex,
}
impl<T: Send> Output<T> {
/// Fetch the latest published value and return a reference to it.
///
/// Equivalent to `update()` followed by `output_buffer()`.
pub fn read(&mut self) -> &T {
self.update();
self.output_buffer_mut()
}
/// Tell whether the producer has published a value that we have not
/// fetched yet (i.e. the back buffer's dirty bit is set).
///
/// Only a hint: the producer may publish right after this returns.
pub fn updated(&self) -> bool {
let back_info = self.shared.back_info.load(Ordering::Relaxed);
back_info & BACK_DIRTY_BIT != 0
}
/// Deprecated alias of `output_buffer`.
#[deprecated = "Please use output_buffer() instead"]
pub fn peek_output_buffer(&self) -> &T {
self.output_buffer()
}
/// Shared access to the current read buffer (does not check for updates).
pub fn output_buffer(&self) -> &T {
let output_ptr = self.shared.buffers[self.output_idx as usize].get();
// SAFETY: `output_idx` designates the buffer owned by this consumer
// endpoint; `publish`/`update` only trade the *other* buffers, so no
// other thread accesses it while we hold it.
unsafe { &*output_ptr }
}
/// Exclusive access to the current read buffer, e.g. to recycle its
/// contents before the next update.
pub fn output_buffer_mut(&mut self) -> &mut T {
let output_ptr = self.shared.buffers[self.output_idx as usize].get();
// SAFETY: same ownership argument as `output_buffer`, and `&mut self`
// rules out any outstanding borrow through this endpoint.
unsafe { &mut *output_ptr }
}
/// If the producer has published a new value, trade our read buffer for
/// the back buffer (which clears the dirty bit). Returns whether a buffer
/// swap actually took place.
pub fn update(&mut self) -> bool {
let updated = self.updated();
if updated {
let shared_state = &(*self.shared);
// AcqRel swap: Acquire the producer's writes to the buffer we take,
// Release our prior accesses to the buffer we hand back.
let former_back_info = shared_state
.back_info
.swap(self.output_idx, Ordering::AcqRel);
self.output_idx = former_back_info & BACK_INDEX_MASK;
}
updated
}
}
/// State shared between the producer and consumer endpoints.
#[derive(Debug)]
struct SharedState<T: Send> {
/// The three data buffers, cache-padded so that the producer's, consumer's
/// and back buffer do not falsely share cache lines.
buffers: [CachePadded<UnsafeCell<T>>; 3],
/// Packed descriptor of the back buffer: index in the low two bits plus
/// the dirty flag (see `BACK_INDEX_MASK` / `BACK_DIRTY_BIT`).
back_info: CachePadded<AtomicBackBufferInfo>,
}
#[doc(hidden)]
impl<T: Send> SharedState<T> {
    /// Construct a shared state whose i-th buffer holds `gen_buf_data(i)` and
    /// whose back-buffer descriptor starts out as `back_info`.
    fn new(mut gen_buf_data: impl FnMut(usize) -> T, back_info: BackBufferInfo) -> Self {
        let mut build_buffer = |idx| CachePadded::new(UnsafeCell::new(gen_buf_data(idx)));
        let buffers = [build_buffer(0), build_buffer(1), build_buffer(2)];
        Self {
            buffers,
            back_info: CachePadded::new(AtomicBackBufferInfo::new(back_info)),
        }
    }
}
#[doc(hidden)]
impl<T: Clone + Send> SharedState<T> {
/// Deep-clone the shared state (all three buffers + back-buffer info).
///
/// # Safety
///
/// The caller must guarantee that no thread is concurrently writing any of
/// the buffers, since this reads all three through their `UnsafeCell`s.
unsafe fn clone(&self) -> Self {
Self::new(
|i| unsafe { (*self.buffers[i].get()).clone() },
self.back_info.load(Ordering::Relaxed),
)
}
}
#[doc(hidden)]
impl<T: PartialEq + Send> SharedState<T> {
    /// Structural equality over the three buffers and the back-buffer info.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that no thread is concurrently writing to
    /// either shared state, since this reads every buffer through its
    /// `UnsafeCell`.
    unsafe fn eq(&self, other: &Self) -> bool {
        let buffers_equal = self
            .buffers
            .iter()
            .zip(other.buffers.iter())
            .all(|(lhs, rhs)| unsafe { *lhs.get() == *rhs.get() });
        buffers_equal
            && (self.back_info.load(Ordering::Relaxed) == other.back_info.load(Ordering::Relaxed))
    }
}
// SAFETY: the endpoints hand buffers over exclusively via atomic swaps on
// `back_info` (see `Input::publish` / `Output::update`), so each buffer is
// accessed by at most one thread at a time; sharing `SharedState` across
// threads therefore cannot cause a data race as long as `T: Send`.
unsafe impl<T: Send> Sync for SharedState<T> {}
// Index of a buffer within `SharedState::buffers` (0, 1 or 2)
type BufferIndex = u8;
// Packed back-buffer descriptor: buffer index in the low bits + dirty flag
type BackBufferInfo = BufferIndex;
// Atomic counterpart of `BackBufferInfo`
type AtomicBackBufferInfo = AtomicU8;
// BACK_INDEX_MASK extracts the back buffer index (low 2 bits);
// BACK_DIRTY_BIT flags that the back buffer holds a not-yet-read value.
const BACK_INDEX_MASK: u8 = 0b11; const BACK_DIRTY_BIT: u8 = 0b100;
#[cfg(test)]
mod tests {
use super::{BACK_DIRTY_BIT, BACK_INDEX_MASK, BufferIndex, SharedState, TripleBuffer};
use std::{fmt::Debug, ops::Deref, sync::atomic::Ordering, thread, time::Duration};
use testbench::race_cell::{RaceCell, Racey};
/// A freshly created buffer is clean and reads back its initial value.
#[test]
fn initial_state() {
    let mut buffer = TripleBuffer::new(&42);
    check_buf_state(&mut buffer, false);
    let read_back = *buffer.output.read();
    assert_eq!(read_back, 42);
}
/// `SharedState::eq` must detect a difference in any single buffer slot, in
/// the dirty bit alone, and in the back buffer index alone.
#[test]
fn partial_eq_shared() {
    let dummy_state = SharedState::<u16>::new(|i| [111, 222, 333][i], 0b10);
    // Reflexivity
    assert!(unsafe { dummy_state.eq(&dummy_state) });
    // One differing buffer (first/second/third) breaks equality
    assert!(unsafe { !dummy_state.eq(&SharedState::<u16>::new(|i| [114, 222, 333][i], 0b10)) });
    assert!(unsafe { !dummy_state.eq(&SharedState::<u16>::new(|i| [111, 225, 333][i], 0b10)) });
    assert!(unsafe { !dummy_state.eq(&SharedState::<u16>::new(|i| [111, 222, 336][i], 0b10)) });
    // A differing dirty bit alone breaks equality.
    // BUGFIX: this used `BACK_DIRTY_BIT & 0b10`, which evaluates to 0, so it
    // compared against back_info 0 (duplicating the index test below) instead
    // of "same back index, dirty bit set" as intended.
    assert!(unsafe {
        !dummy_state.eq(&SharedState::<u16>::new(
            |i| [111, 222, 333][i],
            BACK_DIRTY_BIT | 0b10,
        ))
    });
    // A differing back buffer index alone breaks equality
    assert!(unsafe { !dummy_state.eq(&SharedState::<u16>::new(|i| [111, 222, 333][i], 0b01)) });
}
/// `TripleBuffer` equality must account for buffer contents as well as for
/// both endpoint indices.
#[test]
fn partial_eq() {
    // A buffer always equals itself
    let reference = TripleBuffer::new(&"test");
    assert_eq!(reference, reference);
    // Identical indices but different contents -> not equal
    let different_data = TripleBuffer::new(&"taste");
    assert_eq!(reference.input.input_idx, different_data.input.input_idx);
    assert_eq!(reference.output.output_idx, different_data.output.output_idx);
    assert!(reference != different_data);
    // Identical contents and indices -> equal
    let mut tweaked = TripleBuffer::new(&"test");
    assert_eq!(reference, tweaked);
    // Changing only the input index breaks equality
    let saved_input_idx = tweaked.input.input_idx;
    tweaked.input.input_idx = tweaked.output.output_idx;
    assert!(reference != tweaked);
    // Changing only the output index breaks equality
    tweaked.input.input_idx = saved_input_idx;
    tweaked.output.output_idx = saved_input_idx;
    assert!(reference != tweaked);
}
/// Cloning a `SharedState` must preserve buffer contents and back-buffer
/// info, dirty bit included.
#[test]
fn clone_shared() {
    // BUGFIX: the back info was built with `BACK_DIRTY_BIT & 0b01` (== 0), so
    // the clone was never exercised against a dirty back buffer. Use `|` to
    // set the dirty bit on top of back index 0b01 as intended.
    let dummy_state = SharedState::<u8>::new(|i| [123, 231, 132][i], BACK_DIRTY_BIT | 0b01);
    let dummy_state_copy = unsafe { dummy_state.clone() };
    // Cloning must not have altered the original state...
    assert!(unsafe {
        dummy_state.eq(&SharedState::<u8>::new(
            |i| [123, 231, 132][i],
            BACK_DIRTY_BIT | 0b01,
        ))
    });
    // ...and the copy must compare equal to it
    assert!(unsafe { dummy_state.eq(&dummy_state_copy) });
}
/// Cloning a `TripleBuffer` must yield an independent shared state with
/// identical contents, back-buffer info (dirty bit included) and endpoint
/// indices, leaving the original untouched.
#[test]
fn clone() {
    // Set up a buffer with fully known contents and state
    let mut buf = TripleBuffer::new(&4.2);
    unsafe {
        *buf.input.shared.buffers[0].get() = 1.2;
        *buf.input.shared.buffers[1].get() = 3.4;
        *buf.input.shared.buffers[2].get() = 5.6;
    }
    // BUGFIX: this stored `BACK_DIRTY_BIT & 0b01` (== 0), i.e. "back buffer
    // 0, clean", instead of the intended "back buffer 1, dirty" (`|`).
    buf.input
        .shared
        .back_info
        .store(BACK_DIRTY_BIT | 0b01, Ordering::Relaxed);
    buf.input.input_idx = 0b10;
    buf.output.output_idx = 0b00;
    // Clone it: both clone endpoints share one new state block, which is
    // distinct from the original's
    let buf_clone = buf.clone();
    assert_eq!(
        as_ptr(&buf_clone.input.shared),
        as_ptr(&buf_clone.output.shared)
    );
    assert_ne!(as_ptr(&buf_clone.input.shared), as_ptr(&buf.input.shared));
    assert_ne!(as_ptr(&buf_clone.output.shared), as_ptr(&buf.output.shared));
    // The clone compares equal to the original...
    assert_eq!(buf, buf_clone);
    // ...and the original's contents, back info and indices are unchanged
    unsafe {
        assert_eq!(*buf.input.shared.buffers[0].get(), 1.2);
        assert_eq!(*buf.input.shared.buffers[1].get(), 3.4);
        assert_eq!(*buf.input.shared.buffers[2].get(), 5.6);
    }
    assert_eq!(
        buf.input.shared.back_info.load(Ordering::Relaxed),
        BACK_DIRTY_BIT | 0b01
    );
    assert_eq!(buf.input.input_idx, 0b10);
    assert_eq!(buf.output.output_idx, 0b00);
}
/// Exercise the full publish/update swap cycle, checking the buffer state
/// after every step against a hand-built expected state.
#[test]
fn swaps() {
// Record the initial buffer assignment
let mut buf = TripleBuffer::new(&[123, 456]);
let old_buf = buf.clone();
let old_input_idx = old_buf.input.input_idx;
let old_shared = &old_buf.input.shared;
let old_back_info = old_shared.back_info.load(Ordering::Relaxed);
let old_back_idx = old_back_info & BACK_INDEX_MASK;
let old_output_idx = old_buf.output.output_idx;
// Updating without a pending publish is a no-op
assert!(!buf.output.update());
assert_eq!(buf, old_buf);
check_buf_state(&mut buf, false);
// First publish: the input and back buffers trade places, the dirty bit
// is set, and `publish` reports that nothing was overwritten
assert!(!buf.input.publish());
let mut expected_buf = old_buf.clone();
expected_buf.input.input_idx = old_back_idx;
expected_buf
.input
.shared
.back_info
.store(old_input_idx | BACK_DIRTY_BIT, Ordering::Relaxed);
assert_eq!(buf, expected_buf);
check_buf_state(&mut buf, true);
// Second publish without a read in between: the pending value is
// overwritten, so `publish` returns true and the buffers swap back
assert!(buf.input.publish());
let mut expected_buf = old_buf.clone();
expected_buf.input.input_idx = old_input_idx;
expected_buf
.input
.shared
.back_info
.store(old_back_idx | BACK_DIRTY_BIT, Ordering::Relaxed);
assert_eq!(buf, expected_buf);
check_buf_state(&mut buf, true);
// An update now trades the output and back buffers and clears the dirty bit
assert!(buf.output.update());
expected_buf.output.output_idx = old_back_idx;
expected_buf
.output
.shared
.back_info
.store(old_output_idx, Ordering::Relaxed);
assert_eq!(buf, expected_buf);
check_buf_state(&mut buf, false);
}
/// Guarded writes into a `Vec` buffer: the guard publishes only on drop, and
/// stale data accumulates in whichever buffer the producer gets recycled.
#[test]
fn vec_guarded_write() {
let mut buf = TripleBuffer::new(&vec![]);
// While the guard is alive, pushes are not yet published
{
let mut buffer = buf.input.input_buffer_publisher();
buffer.push(0);
buffer.push(1);
buffer.push(2);
let back_info = buffer.reference.shared.back_info.load(Ordering::Relaxed);
let back_buffer_dirty = back_info & BACK_DIRTY_BIT != 0;
assert!(!back_buffer_dirty);
}
// Dropping the guard publishes the accumulated data
check_buf_state(&mut buf, true); assert_eq!(*buf.output.read(), vec![0, 1, 2]);
check_buf_state(&mut buf, false);
// Publish twice without reading: each value lands in a different buffer...
{
buf.input.input_buffer_publisher().push(3);
}
check_buf_state(&mut buf, true);
{
buf.input.input_buffer_publisher().push(4);
}
// ...so only the latest publish is visible ([4], not [3])
assert_eq!(*buf.output.read(), vec![4]);
check_buf_state(&mut buf, false);
// The next write recycles the buffer still holding the stale [3],
// which is why the reader then observes [3, 5]
{
buf.input.input_buffer_publisher().push(5);
}
assert_eq!(*buf.output.read(), vec![3, 5]);
check_buf_state(&mut buf, false);
// Clearing the recycled buffer before writing yields only the new data
{
let mut buffer = buf.input.input_buffer_publisher();
buffer.clear();
buffer.push(6);
}
assert_eq!(*buf.output.read(), vec![6]);
check_buf_state(&mut buf, false);
}
/// An uncontended `write` must update the write buffer and publish it.
#[test]
fn sequential_write() {
    let mut buf = TripleBuffer::new(&false);
    let snapshot = buf.clone();
    buf.input.write(true);
    // Writing is equivalent to mutating the input buffer, then publishing
    let mut expected = snapshot;
    *expected.input.input_buffer_mut() = true;
    expected.input.publish();
    assert_eq!(buf, expected);
    check_buf_state(&mut buf, true);
}
/// Writing through the RAII guard must behave exactly like write-then-publish.
#[test]
fn sequential_guarded_write() {
    let mut buf = TripleBuffer::new(&false);
    let snapshot = buf.clone();
    *buf.input.input_buffer_publisher() = true;
    // The guard path must match the manual mutate + publish path
    let mut expected = snapshot;
    *expected.input.input_buffer_mut() = true;
    expected.input.publish();
    assert_eq!(buf, expected);
    check_buf_state(&mut buf, true);
}
/// An uncontended `read` must fetch the latest value once, then keep
/// returning it without further state changes.
#[test]
fn sequential_read() {
    let mut buf = TripleBuffer::new(&1.0);
    buf.input.write(4.2);
    // The first read picks up the published value and swaps buffers...
    {
        let snapshot = buf.clone();
        assert_eq!(*buf.output.read(), 4.2);
        let mut expected = snapshot;
        assert!(expected.output.update());
        assert_eq!(buf, expected);
        check_buf_state(&mut buf, false);
    }
    // ...subsequent reads return the same value and leave the state alone.
    {
        let snapshot = buf.clone();
        assert_eq!(*buf.output.read(), 4.2);
        assert_eq!(buf, snapshot);
        check_buf_state(&mut buf, false);
    }
}
/// Reads behave identically whether the value was published via `write` or
/// via the RAII publish guard.
#[test]
fn sequential_guarded_read() {
    let mut buf = TripleBuffer::new(&1.0);
    *buf.input.input_buffer_publisher() = 4.2;
    // The first read picks up the guard-published value and swaps buffers...
    {
        let snapshot = buf.clone();
        assert_eq!(*buf.output.read(), 4.2);
        let mut expected = snapshot;
        assert!(expected.output.update());
        assert_eq!(buf, expected);
        check_buf_state(&mut buf, false);
    }
    // ...subsequent reads return the same value and leave the state alone.
    {
        let snapshot = buf.clone();
        assert_eq!(*buf.output.read(), 4.2);
        assert_eq!(buf, snapshot);
        check_buf_state(&mut buf, false);
    }
}
/// Torture test: a producer spamming writes while a consumer reads in a tight
/// loop. `RaceCell` detects torn reads, and the values read must never go
/// backwards. Run with `--ignored` on a multi-core machine with minimal
/// background load.
#[test]
#[ignore]
fn contended_concurrent_read_write() {
#[cfg(not(feature = "miri"))]
const TEST_WRITE_COUNT: usize = 100_000_000;
#[cfg(feature = "miri")]
const TEST_WRITE_COUNT: usize = 3_000;
let buf = TripleBuffer::new(&RaceCell::new(0));
let (mut buf_input, mut buf_output) = buf.split();
let mut last_value = 0usize;
testbench::concurrent_test_2(
move || {
for value in 1..=TEST_WRITE_COUNT {
buf_input.write(RaceCell::new(value));
}
},
move || {
while last_value < TEST_WRITE_COUNT {
let new_racey_value = buf_output.read().get();
match new_racey_value {
Racey::Consistent(new_value) => {
// Under contention values may skip ahead, but never regress
assert!((new_value >= last_value) && (new_value <= TEST_WRITE_COUNT));
last_value = new_value;
}
Racey::Inconsistent => {
panic!("Inconsistent state exposed by the buffer!");
}
}
}
},
);
}
/// Like `contended_concurrent_read_write`, but the producer sleeps between
/// writes so the consumer is expected to observe every single value (steps of
/// at most 1). Run with `--ignored` on a multi-core machine.
#[test]
#[ignore]
fn uncontended_concurrent_read_write() {
#[cfg(not(feature = "miri"))]
const TEST_WRITE_COUNT: usize = 625;
#[cfg(feature = "miri")]
const TEST_WRITE_COUNT: usize = 200;
let buf = TripleBuffer::new(&RaceCell::new(0));
let (mut buf_input, mut buf_output) = buf.split();
let mut last_value = 0usize;
testbench::concurrent_test_2(
move || {
for value in 1..=TEST_WRITE_COUNT {
buf_input.write(RaceCell::new(value));
// Give the consumer ample time to catch this value
thread::yield_now();
thread::sleep(Duration::from_millis(32));
}
},
move || {
while last_value < TEST_WRITE_COUNT {
let new_racey_value = buf_output.read().get();
match new_racey_value {
Racey::Consistent(new_value) => {
// Without contention, no published value should be skipped
assert!((new_value >= last_value) && (new_value - last_value <= 1));
last_value = new_value;
}
Racey::Inconsistent => {
panic!("Inconsistent state exposed by the buffer!");
}
}
}
},
);
}
/// Bidirectional torture test: the producer inspects the values it gets back
/// through buffer recycling, while the consumer occasionally writes into its
/// read buffer before updating. Run with `--ignored` on a multi-core machine.
#[test]
#[ignore]
fn concurrent_bidirectional_exchange() {
#[cfg(not(feature = "miri"))]
const TEST_WRITE_COUNT: usize = 100_000_000;
#[cfg(feature = "miri")]
const TEST_WRITE_COUNT: usize = 3_000;
let buf = TripleBuffer::new(&RaceCell::new(0));
let (mut buf_input, mut buf_output) = buf.split();
testbench::concurrent_test_2(
move || {
for new_value in 1..=TEST_WRITE_COUNT {
// Whatever buffer we recycled must hold an untorn value that
// does not run ahead of what we have written so far
match buf_input.input_buffer_mut().get() {
Racey::Consistent(curr_value) => {
assert!(curr_value <= new_value);
}
Racey::Inconsistent => {
panic!("Inconsistent state exposed by the buffer!");
}
}
buf_input.write(RaceCell::new(new_value));
}
},
move || {
let mut last_value = 0usize;
while last_value < TEST_WRITE_COUNT {
// Peek without updating: values must be untorn and in range
match buf_output.output_buffer().get() {
Racey::Consistent(new_value) => {
assert!((new_value >= last_value) && (new_value <= TEST_WRITE_COUNT));
last_value = new_value;
}
Racey::Inconsistent => {
panic!("Inconsistent state exposed by the buffer!");
}
}
// Occasionally scribble into the read buffer before updating,
// to exercise the consumer-to-producer recycling path
if buf_output.updated() {
buf_output.output_buffer_mut().set(last_value / 2);
buf_output.update();
}
}
},
);
}
/// Tell whether a buffer index designates one of the three buffers.
#[allow(unused_comparisons)]
fn index_in_range(idx: BufferIndex) -> bool {
    matches!(idx, 0..=2)
}
/// Extract a comparable raw pointer from anything that derefs to a target.
fn as_ptr<P: Deref>(ref_like: &P) -> *const P::Target {
    let target: &P::Target = ref_like;
    target as *const P::Target
}
/// Check the invariants that must hold for any quiescent triple buffer: both
/// endpoints share one state block, the input/output/back indices form a
/// permutation of {0, 1, 2}, the dirty bit matches `expected_dirty_bit`, and
/// the read-only accessors do not mutate observable state.
fn check_buf_state<T>(buf: &mut TripleBuffer<T>, expected_dirty_bit: bool)
where
T: Clone + Debug + PartialEq + Send,
{
// Snapshot so we can verify that accessors leave the state untouched
let initial_buf = buf.clone();
// Both endpoints must point to the same shared state
assert_eq!(as_ptr(&buf.input.shared), as_ptr(&buf.output.shared));
let back_info = buf.input.shared.back_info.load(Ordering::Relaxed);
let back_idx = back_info & BACK_INDEX_MASK;
let back_buffer_dirty = back_info & BACK_DIRTY_BIT != 0;
// The three indices must be valid and pairwise distinct
assert!(index_in_range(buf.input.input_idx));
assert!(index_in_range(buf.output.output_idx));
assert!(index_in_range(back_idx));
assert!(buf.input.input_idx != buf.output.output_idx);
assert!(buf.input.input_idx != back_idx);
assert!(buf.output.output_idx != back_idx);
assert_eq!(back_buffer_dirty, expected_dirty_bit);
// Accessors must target the right buffers and be side-effect free
assert_eq!(
as_ptr(&buf.input.input_buffer_mut()),
buf.input.shared.buffers[buf.input.input_idx as usize].get()
);
assert_eq!(*buf, initial_buf);
assert_eq!(!buf.input.consumed(), expected_dirty_bit);
assert_eq!(*buf, initial_buf);
assert_eq!(
as_ptr(&buf.output.output_buffer()),
buf.output.shared.buffers[buf.output.output_idx as usize].get()
);
assert_eq!(*buf, initial_buf);
assert_eq!(buf.output.updated(), expected_dirty_bit);
assert_eq!(*buf, initial_buf);
}
}