#![deny(missing_docs)]

//! A bounded, wait-free history log for real-time applications
//!
//! One producer thread writes batches of entries into a circular buffer
//! while any number of consumer threads concurrently read back the most
//! recent entries, with detection of buffer overruns.
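//!
//! # Examples
//!
//! A minimal single-threaded sketch; the crate name `rt_history` used below
//! is an assumption, adjust it to match your `Cargo.toml`:
//!
//! ```
//! use rt_history::RTHistory;
//!
//! // Build a history log with room for at least 4 entries, then split it
//! // into its producer and consumer interfaces
//! let (mut input, output) = RTHistory::<u32>::new(4).split();
//!
//! // The producer pushes a batch of entries...
//! input.write(&[1, 2, 3]);
//!
//! // ...and a consumer reads back the two most recent ones, along with a
//! // clock value counting how many entries were ever written
//! let mut buf = [0; 2];
//! let clock = output.read(&mut buf[..]).expect("single-threaded, so no overrun");
//! assert_eq!(buf, [2, 3]);
//! assert_eq!(clock, 3);
//! ```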

use atomic::{Atomic, Ordering};
use bytemuck::NoUninit;
use std::sync::Arc;
use thiserror::Error;

/// Re-export of [`bytemuck`], whose [`NoUninit`] trait bounds this crate's APIs
pub use bytemuck;

/// State shared between the producer and the consumers of a history log
struct SharedState<T: NoUninit + Sync> {
    /// Circular buffer of entries, whose length is a power of two
    data: Box<[Atomic<T>]>,
    /// Base-2 logarithm of the buffer length
    data_len_pow2: u32,
    /// Number of entries that were fully written and may be read
    readable: Atomic<usize>,
    /// Number of entries whose writing has at least started; entries between
    /// `readable` and `writing` may be torn
    writing: Atomic<usize>,
}

impl<T: NoUninit + Sync> SharedState<T> {
    /// Length of the circular buffer
    #[inline]
    fn data_len(&self) -> usize {
        // Recomputing the length from its base-2 logarithm lets the
        // compiler turn the `% data_len` operations below into bit masking
        let data_len = 1 << self.data_len_pow2;
        debug_assert_eq!(self.data.len(), data_len);
        data_len
    }
}

/// Producer interface to the history log
pub struct Input<T: NoUninit + Sync>(Arc<SharedState<T>>);

impl<T: NoUninit + Sync> Input<T> {
    /// Number of entries the log can hold
    pub fn capacity(&self) -> usize {
        self.0.data_len()
    }
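
    /// Write a batch of entries into the history log
    ///
    /// Entries are copied into the circular buffer, overwriting the oldest
    /// ones once the buffer is full.
    ///
    /// # Panics
    ///
    /// Panics if `input` holds more entries than the log's capacity.
    ///
    /// # Examples
    ///
    /// A minimal sketch of the wrap-around behavior (crate name `rt_history`
    /// assumed):
    ///
    /// ```
    /// # use rt_history::RTHistory;
    /// let (mut input, output) = RTHistory::<u8>::new(2).split();
    /// input.write(&[1, 2]);
    /// input.write(&[3]); // Overwrites the oldest entry, 1
    /// let mut buf = [0; 2];
    /// output.read(&mut buf[..]).unwrap();
    /// assert_eq!(buf, [2, 3]);
    /// ```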
    pub fn write(&mut self, input: &[T]) {
        let data_len = self.0.data_len();
        assert!(
            input.len() <= data_len,
            "History is shorter than provided input"
        );

        // Advertise the upcoming write so that concurrent readers can tell
        // that these entries are being overwritten. The Release fence pairs
        // with the Acquire fence in `Output::read`: a reader that observes
        // any of the data stores below will also observe the updated
        // `writing` counter.
        let old_writing = self.0.writing.load(Ordering::Relaxed);
        let new_writing = old_writing.wrapping_add(input.len());
        self.0.writing.store(new_writing, Ordering::Relaxed);
        atomic::fence(Ordering::Release);

        // Copy the part of the input that fits before the end of the buffer...
        let old_writing_idx = old_writing % data_len;
        let new_writing_idx = new_writing % data_len;
        let first_output_len = input.len().min(data_len - old_writing_idx);
        let first_output = &self.0.data[old_writing_idx..old_writing_idx + first_output_len];
        let (first_input, second_input) = input.split_at(first_output_len);
        for (src, dst) in first_input.iter().zip(first_output.iter()) {
            dst.store(*src, Ordering::Relaxed);
        }

        // ...then wrap around to the start of the buffer if needed
        if first_output_len < input.len() {
            debug_assert!(old_writing_idx >= new_writing_idx);
            debug_assert_eq!(new_writing_idx, second_input.len());
            let second_output = &self.0.data[..new_writing_idx];
            for (src, dst) in second_input.iter().zip(second_output.iter()) {
                dst.store(*src, Ordering::Relaxed);
            }
        }

        // There is only one writer, so `readable` cannot have changed
        debug_assert_eq!(self.0.readable.load(Ordering::Relaxed), old_writing);

        // Publish the new entries to readers
        self.0.readable.store(new_writing, Ordering::Release);
    }
}

/// Number of entries that were written to the history log so far
///
/// This clock is returned by [`Output::read`] so that successive readouts
/// can be related to one another.
pub type Clock = usize;
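
/// Error returned when the writer overwrote entries as they were being read
///
/// # Examples
///
/// The error is ready for display to end users (crate name `rt_history`
/// assumed):
///
/// ```
/// # use rt_history::Overrun;
/// let err = Overrun { clock: 42, excess_entries: 3 };
/// assert_eq!(
///     err.to_string(),
///     "A buffer overrun occurred while reading history of entry 42, \
///      writer overwrote 3 entries"
/// );
/// ```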
#[derive(Clone, Copy, Debug, Error, PartialEq, Eq)]
#[error(
    "A buffer overrun occurred while reading history of entry {clock}, writer overwrote {excess_entries} entries"
)]
pub struct Overrun {
    /// Clock value of the attempted readout
    pub clock: Clock,
    /// Number of requested entries that were overwritten during the readout
    pub excess_entries: Clock,
}

/// Consumer interface to the history log
///
/// Can be cloned so that several threads read the history concurrently.
#[derive(Clone)]
pub struct Output<T: NoUninit + Sync>(Arc<SharedState<T>>);

impl<T: NoUninit + Sync> Output<T> {
    /// Number of entries the log can hold
    pub fn capacity(&self) -> usize {
        self.0.data_len()
    }
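
    /// Read the most recent entries from the history log
    ///
    /// On success, `output` is filled with the last `output.len()` entries,
    /// oldest first, and the returned [`Clock`] tells how many entries were
    /// ever written. If the writer overwrote part of the requested range
    /// during the readout, an [`Overrun`] error is returned instead.
    ///
    /// # Panics
    ///
    /// Panics if `output` requests more entries than the log's capacity.
    ///
    /// # Examples
    ///
    /// A minimal sketch (crate name `rt_history` assumed); without a
    /// concurrent writer, a readout can never overrun:
    ///
    /// ```
    /// # use rt_history::RTHistory;
    /// let (mut input, output) = RTHistory::<u16>::new(4).split();
    /// input.write(&[10, 20]);
    /// let mut buf = [0; 2];
    /// match output.read(&mut buf[..]) {
    ///     Ok(clock) => {
    ///         assert_eq!(clock, 2);
    ///         assert_eq!(buf, [10, 20]);
    ///     }
    ///     Err(overrun) => panic!("unexpected {overrun}"),
    /// }
    /// ```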
    pub fn read(&self, output: &mut [T]) -> Result<Clock, Overrun> {
        let data_len = self.0.data_len();
        assert!(
            output.len() <= data_len,
            "History is shorter than requested output"
        );

        // Determine which entries may be read. The Acquire load pairs with
        // the Release store in `Input::write` that published these entries.
        let last_readable = self.0.readable.load(Ordering::Acquire);
        let first_readable = last_readable.wrapping_sub(output.len());
        let last_readable_idx = last_readable % data_len;
        let first_readable_idx = first_readable % data_len;

        // Copy the part of the readout that fits before the end of the buffer...
        let output_len = output.len();
        let first_input_len = output_len.min(data_len - first_readable_idx);
        let first_input = &self.0.data[first_readable_idx..first_readable_idx + first_input_len];
        let (first_output, second_output) = output.split_at_mut(first_input_len);
        for (src, dst) in first_input.iter().zip(first_output.iter_mut()) {
            *dst = src.load(Ordering::Relaxed);
        }

        // ...then wrap around to the start of the buffer if needed
        if first_input_len < output_len {
            debug_assert!(first_readable_idx >= last_readable_idx);
            debug_assert_eq!(last_readable_idx, second_output.len());
            let second_input = &self.0.data[..last_readable_idx];
            for (src, dst) in second_input.iter().zip(second_output.iter_mut()) {
                *dst = src.load(Ordering::Relaxed);
            }
        }

        // Check whether the writer overwrote any of the entries that were
        // just read. The Acquire fence pairs with the Release fence in
        // `Input::write`: if any torn data was read above, the `writing`
        // counter loaded below is guaranteed to reflect that write.
        atomic::fence(Ordering::Acquire);
        let last_writing = self.0.writing.load(Ordering::Relaxed);
        let excess_entries = last_writing
            .wrapping_sub(first_readable)
            .saturating_sub(data_len);
        if excess_entries > 0 {
            Err(Overrun {
                clock: last_readable,
                excess_entries,
            })
        } else {
            Ok(last_readable)
        }
    }
}
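
/// Bounded history log, to be [`split`](RTHistory::split) into a producer
/// and a consumer interface
///
/// # Examples
///
/// The capacity is rounded up to the next power of two (crate name
/// `rt_history` assumed):
///
/// ```
/// # use rt_history::RTHistory;
/// let history = RTHistory::<f32>::new(3);
/// assert_eq!(history.capacity(), 4);
/// ```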
pub struct RTHistory<T: NoUninit + Sync>(Arc<SharedState<T>>);

impl<T: NoUninit + Default + Sync> RTHistory<T> {
    /// Set up a history log that can hold at least `min_entries` entries
    ///
    /// # Panics
    ///
    /// Panics if atomic accesses to `T` are not lock-free.
    pub fn new(min_entries: usize) -> Self {
        assert!(
            Atomic::<T>::is_lock_free(),
            "Cannot build a lock-free history log if type T does not have lock-free atomics"
        );

        // Round the capacity up to a power of two so that indexing can use
        // bit masking instead of a true modulo
        let data_len = min_entries.next_power_of_two();
        let data_len_pow2 = data_len.trailing_zeros();
Self(Arc::new(SharedState {
data: std::iter::repeat_with(Atomic::<T>::default)
.take(data_len)
.collect(),
data_len_pow2,
readable: Atomic::<usize>::new(0),
writing: Atomic::<usize>::new(0),
}))
}
}

impl<T: NoUninit + Sync> RTHistory<T> {
    /// Number of entries the log can hold
    pub fn capacity(&self) -> usize {
        self.0.data_len()
    }
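
    /// Split the history log into a producer and a consumer interface
    ///
    /// # Examples
    ///
    /// A sketch of cross-thread use with scoped threads (crate name
    /// `rt_history` assumed):
    ///
    /// ```
    /// # use rt_history::RTHistory;
    /// let (mut input, output) = RTHistory::<u64>::new(16).split();
    /// std::thread::scope(|s| {
    ///     s.spawn(move || input.write(&[1, 2, 3]));
    /// });
    /// let mut buf = [0; 3];
    /// let clock = output.read(&mut buf[..]).unwrap();
    /// assert_eq!(clock, 3);
    /// assert_eq!(buf, [1, 2, 3]);
    /// ```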
pub fn split(self) -> (Input<T>, Output<T>) {
(Input(self.0.clone()), Output(self.0))
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use std::{
        panic::{self, AssertUnwindSafe},
        sync::atomic::AtomicUsize,
    };

    /// Generates capacities and buffer sizes from zero to a million entries
    fn num_entries() -> impl Strategy<Value = usize> {
        0usize..(1024 * 1024)
    }
proptest! {
        // A freshly created history log is zero-initialized, has the
        // requested capacity, and both ends share the same state
        #[test]
        fn new_split_clock(min_entries in num_entries()) {
let history = RTHistory::<f32>::new(min_entries);
prop_assert!(history
.0
.data
.iter()
.map(|x| x.load(Ordering::Relaxed))
.all(|f| f == 0.0));
prop_assert_eq!(history.0.data_len(), min_entries.next_power_of_two());
prop_assert_eq!(history.0.data_len(), history.0.data.len());
prop_assert_eq!(history.0.readable.load(Ordering::Relaxed), 0);
prop_assert_eq!(history.0.writing.load(Ordering::Relaxed), 0);
let (input, output) = history.split();
            prop_assert!(Arc::ptr_eq(&input.0, &output.0));
}
        // Two successive writes followed by a readout behave as expected
        #[test]
        fn writes_and_read(
min_entries in num_entries(),
write1: Vec<u32>,
write2: Vec<u32>,
read_size in num_entries(),
) {
let (mut input, output) = RTHistory::<u32>::new(min_entries).split();
            // Performs a write that may legitimately panic if the input is
            // larger than the history's capacity, in which case the test is
            // cut short
            macro_rules! checked_write {
                ($write:expr, $input:expr) => {
                    if panic::catch_unwind(AssertUnwindSafe(|| $input.write(&$write[..])))
                        .is_err()
                    {
                        prop_assert!($write.len() > $input.0.data_len());
                        return Ok(());
                    }
                    prop_assert!($write.len() <= $input.0.data_len());
                };
            }
            // After the first write, the buffer must start with write1's
            // data, followed by untouched zeroes
            checked_write!(write1, input);
prop_assert!(input
.0
.data
.iter()
.take(write1.len())
.zip(&write1)
.all(|(a, &x)| a.load(Ordering::Relaxed) == x));
prop_assert!(input
.0
.data
.iter()
.skip(write1.len())
.all(|a| a.load(Ordering::Relaxed) == 0));
prop_assert_eq!(input.0.readable.load(Ordering::Relaxed), write1.len());
prop_assert_eq!(input.0.writing.load(Ordering::Relaxed), write1.len());

            // After the second write, the buffer must hold write1 followed
            // by write2, except where wrap-around made write2's tail
            // overwrite the oldest entries
            checked_write!(write2, input);
            let new_readable = write1.len() + write2.len();
            let data_len = input.0.data_len();
            let overwritten = new_readable.saturating_sub(data_len);
prop_assert!(input
.0
.data
.iter()
.take(overwritten)
.zip(write2[write2.len() - overwritten..].iter())
.all(|(a, &x2)| a.load(Ordering::Relaxed) == x2));
prop_assert!(input
.0
.data
.iter()
.skip(overwritten)
.take(write1.len().saturating_sub(overwritten))
.zip(write1.iter().skip(overwritten))
.all(|(a, &x1)| a.load(Ordering::Relaxed) == x1));
prop_assert!(input
.0
.data
.iter()
.skip(write1.len())
.take(write2.len())
.zip(&write2)
.all(|(a, &x2)| a.load(Ordering::Relaxed) == x2));
prop_assert!(input
.0
.data
.iter()
.skip(new_readable)
.all(|a| a.load(Ordering::Relaxed) == 0));
prop_assert_eq!(input.0.readable.load(Ordering::Relaxed), new_readable);
prop_assert_eq!(input.0.writing.load(Ordering::Relaxed), new_readable);

            // A readout must not alter the buffer contents or counters
            let data_backup = input
.0
.data
.iter()
.map(|a| a.load(Ordering::Relaxed))
.collect::<Box<[_]>>();
let mut read = vec![0; read_size];
            // Reading may legitimately panic if the requested output is
            // larger than the history's capacity
            match panic::catch_unwind(AssertUnwindSafe(|| output.read(&mut read[..]))) {
Err(_) => {
prop_assert!(read.len() > output.0.data_len());
return Ok(());
}
Ok(result) => {
prop_assert!(read.len() <= output.0.data_len());
prop_assert_eq!(result, Ok(new_readable));
}
}
prop_assert!(input
.0
.data
.iter()
.zip(data_backup.iter().copied())
.all(|(a, x)| a.load(Ordering::Relaxed) == x));
prop_assert_eq!(input.0.readable.load(Ordering::Relaxed), new_readable);
prop_assert_eq!(input.0.writing.load(Ordering::Relaxed), new_readable);

            // From newest to oldest, the read entries must be write2's
            // data, then write1's, then the buffer's initial zeroes
            prop_assert!(read
.iter()
.rev()
.zip(
write2
.iter()
.rev()
.chain(write1.iter().rev())
.chain(std::iter::repeat(&0))
)
.all(|(&x, &y)| x == y));
}
}

    /// Long-running producer/consumer stress test, ignored by default; run
    /// it via `cargo test -- --ignored`
    #[test]
    #[ignore]
    fn concurrent_test() {
        const PRODUCER_SIZE: usize = 1 << 3;
        const CONSUMER_SIZE: usize = PRODUCER_SIZE << 1;
        const BUFFER_SIZE: usize = CONSUMER_SIZE << 2;
        const NUM_ELEMS: usize = 1 << 31;

        let (mut input, output) = RTHistory::<u64>::new(BUFFER_SIZE).split();

        // The producer emits the sequence 1, 2, 3, ... in fixed-size batches
        let mut last_emitted = 0;
        let producer = move || {
while last_emitted < NUM_ELEMS as u64 {
let mut buf = [0; PRODUCER_SIZE];
for dst in &mut buf {
last_emitted += 1;
*dst = last_emitted;
}
input.write(&buf[..]);
}
};

        // Each consumer tracks how often it outpaced the writer (underrun)
        // and how often the writer outpaced it (overrun)
        #[allow(clippy::declare_interior_mutable_const)]
        const XRUN_CTR_INIT: AtomicUsize = AtomicUsize::new(0);
        let underrun_ctrs = [XRUN_CTR_INIT; 2];
        let overrun_ctrs = [XRUN_CTR_INIT; 2];
        let gen_consumer = |idx| {
            let num_underruns: &AtomicUsize = &underrun_ctrs[idx];
            let num_overruns: &AtomicUsize = &overrun_ctrs[idx];
            let output = output.clone();

            // Each consumer repeatedly reads the history until the producer
            // is done, tracking the clock to detect under- and overruns
            move || {
let mut last_clock = 0;
let mut last_underrun = usize::MAX;
let mut buf = [0; CONSUMER_SIZE];
while last_clock < NUM_ELEMS {
let (clock, valid) = match output.read(&mut buf[..]) {
Ok(clock) => (clock, &buf[..]),
Err(Overrun {
clock,
excess_entries,
}) => {
                        // The writer only advances in whole batches
                        assert!(excess_entries > 0);
                        assert_eq!(excess_entries % PRODUCER_SIZE, 0);
                        num_overruns.fetch_add(1, Ordering::Relaxed);
                        // Conservatively discard the overwritten entries
                        // (plus one extra) from the start of the readout
                        if excess_entries < buf.len() {
                            (clock, &buf[excess_entries + 1..])
                        } else {
                            (clock, &[][..])
                        }
}
};
                // If the clock did not move, the reader outpaced the writer;
                // count at most one underrun per clock value
                if clock == last_clock {
                    if clock != last_underrun {
                        num_underruns.fetch_add(1, Ordering::Relaxed);
                        last_underrun = clock;
                    }
                } else {
                    assert!(clock > last_clock);
                    last_clock = clock;
                }
                // The producer emits 1, 2, 3, ..., so the readout's valid
                // entries must count down from the clock value
                for (expected, &actual) in (1..=clock).rev().zip(valid.iter().rev()) {
                    assert_eq!(expected as u64, actual);
                }
}
}
};
testbench::concurrent_test_3(producer, gen_consumer(0), gen_consumer(1));

        // Sanity check: both kinds of xruns should have occurred, but not
        // on the overwhelming majority of readouts
        const NUM_READOUTS: usize = NUM_ELEMS / CONSUMER_SIZE;
underrun_ctrs
.into_iter()
.map(|a| a.load(Ordering::Relaxed))
.enumerate()
.for_each(|(idx, underrun_ctr)| {
println!("consumer {idx}: {underrun_ctr}/{NUM_READOUTS} underruns");
assert!(underrun_ctr > NUM_READOUTS / 10000);
assert!(underrun_ctr < NUM_READOUTS / 5);
});
overrun_ctrs
.into_iter()
.map(|a| a.load(Ordering::Relaxed))
.enumerate()
.for_each(|(idx, overrun_ctr)| {
println!("consumer {idx}: {overrun_ctr}/{NUM_READOUTS} overruns");
assert!(overrun_ctr > NUM_READOUTS / 10000);
assert!(overrun_ctr < NUM_READOUTS / 5);
});
}
}