#![warn(missing_docs)]
#![no_std]
#![cfg_attr(feature = "nightly", feature(const_ptr_write))]
#![cfg_attr(feature = "nightly", feature(const_mut_refs))]
#![cfg_attr(feature = "nightly", feature(const_ptr_read))]
#![cfg_attr(feature = "nightly", feature(const_refs_to_cell))]
#[cfg(not(loom))]
use core::cell::UnsafeCell;
use core::default::Default;
use core::fmt::{Debug, Display};
use core::mem::MaybeUninit;
use core::ops::{Deref, DerefMut};
use core::ptr::{read_volatile, write_bytes, write_volatile};
#[cfg(not(loom))]
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[cfg(loom)]
use loom::cell::UnsafeCell;
#[cfg(loom)]
use loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[derive(Debug)]
pub struct RingBuffer<T: Copy, const N: usize> {
locked: Padded<AtomicBool>,
version: Padded<AtomicUsize>,
index: Padded<AtomicUsize>,
data: [Block<T>; N],
}
impl<T: Copy, const N: usize> Default for RingBuffer<T, N> {
fn default() -> Self {
Self::new()
}
}
unsafe impl<T: Copy, const N: usize> Send for RingBuffer<T, N> {}
unsafe impl<T: Copy, const N: usize> Sync for RingBuffer<T, N> {}
impl<T: Copy, const N: usize> RingBuffer<T, N> {
#[cfg(feature = "nightly")]
#[cfg(not(loom))]
pub const fn new() -> RingBuffer<T, N> {
let data: [Block<T>; N] = unsafe {
let mut data: [MaybeUninit<Block<T>>; N] = MaybeUninit::uninit().assume_init();
write_bytes(&mut data, 0, 1);
let init = core::ptr::read(
(&data as *const [MaybeUninit<Block<T>>; N]).cast::<[Block<T>; N]>(),
);
core::mem::forget(data);
init
};
RingBuffer {
locked: Padded(AtomicBool::new(false)),
version: Padded(AtomicUsize::new(0)),
index: Padded(AtomicUsize::new(0)),
data,
}
}
#[cfg(not(feature = "nightly"))]
#[cfg(not(loom))]
pub fn new() -> RingBuffer<T, N> {
let data: [Block<T>; N] = unsafe {
let mut data: [MaybeUninit<Block<T>>; N] = MaybeUninit::uninit().assume_init();
write_bytes(&mut data, 0, 1);
let init = core::ptr::read(
(&data as *const [MaybeUninit<Block<T>>; N]).cast::<[Block<T>; N]>(),
);
core::mem::forget(data);
init
};
RingBuffer {
locked: Padded(AtomicBool::new(false)),
version: Padded(AtomicUsize::new(0)),
index: Padded(AtomicUsize::new(0)),
data,
}
}
#[cfg(loom)]
pub fn new() -> RingBuffer<T, N> {
let data: [Block<T>; N] = unsafe {
let mut data: [MaybeUninit<Block<T>>; N] = MaybeUninit::uninit().assume_init();
for b in data.iter_mut() {
core::ptr::write(
(b as *mut MaybeUninit<Block<T>>).cast::<Block<T>>(),
Block {
seq: AtomicUsize::new(0),
message: UnsafeCell::new(MaybeUninit::uninit().assume_init()),
},
)
}
let init = core::ptr::read(
(&data as *const [MaybeUninit<Block<T>>; N]).cast::<[Block<T>; N]>(),
);
core::mem::forget(data);
init
};
RingBuffer {
locked: Padded(AtomicBool::new(false)),
version: Padded(AtomicUsize::new(0)),
index: Padded(AtomicUsize::new(0)),
data,
}
}
#[inline]
pub fn try_lock(&self) -> Result<WriteGuard<'_, T, N>, ()> {
if !self.locked.swap(true, Ordering::Acquire) {
Ok(WriteGuard { buffer: self })
} else {
Err(())
}
}
#[inline]
pub fn reader(&self) -> SharedReader<'_, T, N> {
SharedReader {
buffer: Padded(self),
index: Padded(AtomicUsize::new(0)),
version: Padded(AtomicUsize::new(self.version.load(Ordering::Relaxed))),
}
}
#[inline]
fn start_write(&self) -> usize {
let index = self.index.load(Ordering::Relaxed);
let seq = self.data[index].seq.fetch_add(1, Ordering::Relaxed);
assert!(seq % 2 == 0);
let ver = self.version.load(Ordering::Relaxed);
self.version
.store(core::cmp::max(ver, seq + 2), Ordering::Relaxed);
index
}
#[inline]
fn end_write(&self, index: usize) {
self.index.store((index + 1) % N, Ordering::Relaxed);
let seq = self.data[index].seq.fetch_add(1, Ordering::Release);
assert!(seq % 2 == 1);
}
}
#[derive(Debug)]
pub struct SharedReader<'read, T: Copy, const N: usize> {
buffer: Padded<&'read RingBuffer<T, N>>,
index: Padded<AtomicUsize>,
version: Padded<AtomicUsize>,
}
impl<'read, T: Copy, const N: usize> Clone for SharedReader<'read, T, N> {
fn clone(&self) -> Self {
SharedReader {
buffer: Padded(&self.buffer),
index: Padded(AtomicUsize::new(self.index.load(Ordering::Relaxed))),
version: Padded(AtomicUsize::new(self.version.load(Ordering::Relaxed))),
}
}
}
unsafe impl<'read, T: Copy, const N: usize> Send for SharedReader<'read, T, N> {}
impl<'read, T: Copy, const N: usize> SharedReader<'read, T, N> {
pub fn pop_front(&self) -> Option<T> {
let mut i = self.index.load(Ordering::Acquire);
loop {
let ver = self.version.load(Ordering::Relaxed);
let seq1 = unsafe {
Self::check_version(
self.buffer
.data
.get_unchecked(i)
.seq
.load(Ordering::Acquire),
ver,
i,
)?
};
self.version
.compare_exchange(ver, seq1, Ordering::Relaxed, Ordering::Relaxed)
.ok()?;
if let Err(new) =
self.index
.compare_exchange(i, (i + 1) % N, Ordering::Release, Ordering::Acquire)
{
i = new;
continue;
}
#[cfg(not(loom))]
let data: T =
unsafe { read_volatile(self.buffer.data.get_unchecked(i).message.get().cast()) };
let seq2 = unsafe {
self.buffer
.data
.get_unchecked(i)
.seq
.load(Ordering::Relaxed)
};
if seq1 != seq2 {
return None;
}
#[cfg(not(loom))]
return Some(data);
#[cfg(loom)]
return None;
}
}
#[inline]
fn check_version(mut seq: usize, ver: usize, i: usize) -> Option<usize> {
if seq & 1 != 0 {
return None;
}
seq &= usize::MAX - 1;
if (i == 0 && seq == ver) || seq < ver {
return None;
}
Some(seq)
}
}
#[derive(Debug)]
pub struct WriteGuard<'write, T: Copy, const N: usize> {
buffer: &'write RingBuffer<T, N>,
}
unsafe impl<'read, T: Copy, const N: usize> Send for WriteGuard<'read, T, N> {}
impl<'write, T: Copy, const N: usize> WriteGuard<'write, T, N> {
pub fn push_back(&mut self, val: T) {
let i = self.buffer.start_write();
#[cfg(not(loom))]
unsafe {
write_volatile(self.buffer.data[i].message.get().cast(), val)
};
#[cfg(loom)]
unsafe {
self.buffer.data[i]
.message
.with_mut(|p| write_volatile(p.cast(), val))
};
self.buffer.end_write(i);
}
}
impl<'write, T: Copy, const N: usize> Drop for WriteGuard<'write, T, N> {
fn drop(&mut self) {
self.buffer.locked.store(false, Ordering::Release);
}
}
struct Block<T: Copy> {
seq: AtomicUsize,
message: UnsafeCell<MaybeUninit<T>>,
}
impl<T: Copy> Debug for Block<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Block")
.field("seq", &self.seq.load(Ordering::Relaxed))
.finish()
}
}
#[cfg_attr(
any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
),
repr(align(128))
)]
#[cfg_attr(
any(
target_arch = "arm",
target_arch = "mips",
target_arch = "mips64",
target_arch = "riscv64",
),
repr(align(32))
)]
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
#[cfg_attr(
not(any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
target_arch = "arm",
target_arch = "mips",
target_arch = "mips64",
target_arch = "riscv64",
target_arch = "s390x",
)),
repr(align(64))
)]
#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)]
struct Padded<T>(T);
impl<T> Padded<T> {
const fn new(t: T) -> Self {
Padded(t)
}
}
impl<T> Deref for Padded<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for Padded<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T> Debug for Padded<T>
where
T: Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.write_fmt(format_args!("{:?}", self.0))
}
}
impl<T> Display for Padded<T>
where
T: Display,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.write_fmt(format_args!("{}", self.0))
}
}
impl<T> From<T> for Padded<T> {
fn from(value: T) -> Self {
Padded::new(value)
}
}
mod test {
#[allow(unused_imports)]
use super::*;
extern crate std;
#[allow(unused_imports)]
use std::println;
#[test]
fn test_nightly() {
if cfg!(feature = "nightly") {
println!("nightly")
}
}
#[test]
fn test_new_buffer() {
let _ = RingBuffer::<u32, 32>::new();
}
#[test]
fn test_write() {
let buffer = RingBuffer::<_, 32>::new();
let mut writer = buffer.try_lock().unwrap();
for i in 0..32 {
writer.push_back(i)
}
println!("buffer: {buffer:?}");
}
#[test]
fn test_empty_queue() {
let buffer = RingBuffer::<u8, 32>::new();
let reader = buffer.reader();
assert!(reader.pop_front().is_none());
}
#[test]
fn test_lock() {
let buffer = RingBuffer::<(), 32>::new();
let _writer = buffer.try_lock().unwrap();
assert!(buffer.try_lock().is_err());
}
#[test]
fn test_read() {
let buffer = RingBuffer::<_, 32>::new();
let mut writer = buffer.try_lock().unwrap();
for i in 0..32 {
writer.push_back(i)
}
let reader = buffer.reader();
while let Some(i) = reader.pop_front() {
println!("val: {i}");
}
}
#[test]
fn test_multi_reader() {
let buffer = RingBuffer::<_, 128>::new();
let mut writer = buffer.try_lock().unwrap();
let reader = buffer.reader();
std::thread::scope(|s| {
let reader = &reader;
for t in 0..8 {
s.spawn(move || {
for _ in 0..100 {
if let Some(val) = reader.pop_front() {
println!("t: {t}, val: {val:?}");
};
}
});
}
for _ in 0..100 {
writer.push_back([0, 32, 31, 903, 1, 4, 23, 12, 4, 21]);
}
});
}
#[test]
fn test_ping() {
for _ in 0..1000 {
let b1 = RingBuffer::<_, 128>::new();
let read = AtomicBool::new(false);
let mut writer = b1.try_lock().unwrap();
let reader = b1.reader();
std::thread::scope(|s| {
let reader = &reader;
let read = &read;
for t in 0..8 {
s.spawn(move || {
while !read.load(Ordering::SeqCst) {
if reader.pop_front().is_some() {
read.store(true, Ordering::SeqCst);
} else {
std::thread::yield_now();
}
}
for _ in 0..100 {
if let Some(val) = reader.pop_front() {
println!("t: {t}, val: {val:?}");
};
}
});
}
writer.push_back(());
});
}
}
}