extern crate alloc;
use alloc::vec::Vec;
use core::borrow::Borrow;
use core::ops::{Deref, Index};
use core::mem::{ManuallyDrop, size_of};
use core::sync::atomic::{AtomicUsize, AtomicPtr, Ordering, spin_loop_hint};
/// Fixed-capacity, append-only buffer whose `push` takes `&self`, so elements
/// can be appended through a shared reference.
///
/// The backing storage is the allocation of a `Vec<T>`; the buffer never
/// reallocates, which is why references handed out by `get` stay valid while
/// further elements are pushed.
///
/// NOTE(review): every field is a `usize` or an atomic, so the auto traits make
/// this type `Send + Sync` for any `T` — confirm whether a restriction to
/// `T: Send`/`T: Sync` is intended.
pub struct Syncbuf<T> {
/// Number of element slots in the allocation. No code in this file modifies
/// it after construction.
pub(crate) capacity: usize,
/// Number of published elements. Readers only access indices below this value.
pub(crate) len: AtomicUsize,
/// Reservation counter used by `push` to hand out slot indices. It can run
/// ahead of `len` while writes are in flight and can exceed `capacity` when
/// pushes are rejected.
pub(crate) working_len: AtomicUsize,
/// Pointer to the first slot of the allocation taken over from a `Vec<T>`.
pub(crate) buf: AtomicPtr<T>,
}
impl<T> Syncbuf<T> {
    /// Creates an empty buffer with room for at least `capacity` elements.
    ///
    /// The storage is obtained from `Vec::with_capacity` and handed to the
    /// `From<Vec<T>>` conversion.
    ///
    /// # Panics
    ///
    /// Panics if `T` is zero-sized or if `capacity` is zero, because the
    /// `From<Vec<T>>` conversion rejects vectors without backing storage.
    pub fn with_capacity(capacity: usize) -> Syncbuf<T> {
        Syncbuf::from(Vec::with_capacity(capacity))
    }

    /// Appends `elem` and returns the index it was stored at.
    ///
    /// Slots are reserved with an atomic counter and published strictly in
    /// reservation order, so this call spins until every push that reserved a
    /// lower index has finished publishing.
    ///
    /// # Errors
    ///
    /// Returns `Err(elem)`, handing the value back, when no free slot is left.
    pub fn push(&self, elem: T) -> Result<usize, T> {
        // Reject without reserving when the buffer is already full. Otherwise
        // every failed push would keep incrementing the reservation counter,
        // which could eventually wrap around to an in-range index.
        if self.working_len.load(Ordering::SeqCst) >= self.capacity {
            return Err(elem);
        }

        let idx = self.working_len.fetch_add(1, Ordering::SeqCst);
        if idx >= self.capacity {
            // Lost the race for the last slot against a concurrent push.
            return Err(elem);
        }

        // Test builds only: widen the window between reserving and publishing
        // so that the multi-threaded tests exercise the ordering logic.
        #[cfg(test)]
        {
            extern crate std;
            std::thread::sleep(std::time::Duration::from_millis(10));
        }

        // SAFETY: `idx < capacity`, so the slot lies inside the allocation.
        // `fetch_add` handed this index to this call only, and readers never
        // touch indices at or above `len`, which is still `<= idx` here.
        unsafe {
            self.buf.load(Ordering::Relaxed).add(idx).write(elem);
        }

        // Publish in reservation order: `len` may only move from `idx` to
        // `idx + 1`, so wait until all lower slots have been published.
        while self
            .len
            .compare_exchange_weak(idx, idx + 1, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            spin_loop_hint();
        }
        Ok(idx)
    }

    /// Returns a reference to the element at `index`, or `None` if no element
    /// has been published at that index yet.
    pub fn get(&self, index: usize) -> Option<&T> {
        if index < self.len.load(Ordering::SeqCst) {
            // SAFETY: `index < len`, and `push` raises `len` past an index
            // only after the element at that index has been written.
            unsafe { self.buf.load(Ordering::Relaxed).add(index).as_ref() }
        } else {
            None
        }
    }

    /// Returns the most recently published element, or `None` if the buffer is
    /// empty.
    pub fn last(&self) -> Option<&T> {
        self.last_item().map(|(_, elem)| elem)
    }

    /// Returns the index of the most recently published element together with
    /// a reference to it, or `None` if the buffer is empty.
    pub fn last_item(&self) -> Option<(usize, &T)> {
        let last = self.len.load(Ordering::SeqCst).checked_sub(1)?;
        self.get(last).map(|elem| (last, elem))
    }

    /// Returns the number of published elements.
    ///
    /// The load uses `Acquire` so that a caller who observes a length also
    /// observes the element writes that were published before it; the slice
    /// view of the buffer relies on this.
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Acquire)
    }

    /// Returns the total number of element slots.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns an iterator over the published elements, starting at index 0.
    ///
    /// The iterator looks each element up through `get`, so elements pushed
    /// while iterating are yielded as well.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter { orig: self, idx: 0 }
    }
}
impl<T> From<Vec<T>> for Syncbuf<T> {
    /// Takes over the allocation of `v` without copying.
    ///
    /// The elements already in `v` become the initial contents of the buffer,
    /// and the vector's capacity becomes the buffer's fixed capacity.
    ///
    /// # Panics
    ///
    /// Panics if `T` is a zero-sized type, or if `v` has a capacity of zero
    /// (there would be no slot to push into).
    fn from(v: Vec<T>) -> Self {
        // The two rejected cases are reported separately so that an empty
        // `Vec` of an ordinary type is not misreported as a ZST problem.
        if size_of::<T>() == 0 {
            panic!("Syncbuf does not support ZSTs");
        }
        if v.capacity() == 0 {
            panic!("Syncbuf requires a non-zero capacity");
        }

        // Ownership of the allocation moves into the Syncbuf, so the Vec's
        // destructor must not run.
        let mut v = ManuallyDrop::new(v);
        let initial_len = v.len();
        Syncbuf {
            capacity: v.capacity(),
            buf: AtomicPtr::new(v.as_mut_ptr()),
            len: AtomicUsize::new(initial_len),
            working_len: AtomicUsize::new(initial_len),
        }
    }
}
/// Iterator over the elements of a `Syncbuf`, created by `Syncbuf::iter`.
///
/// It yields `&T` items by looking up successive indices in the buffer it
/// borrows.
pub struct Iter<'i, T: 'i> {
/// The buffer being iterated.
orig: &'i Syncbuf<T>,
/// Index that the next call to `next` will look up.
idx: usize,
}
impl<'i, T> Iterator for Iter<'i, T> {
    type Item = &'i T;

    /// Yields the element at the current position, or `None` if nothing has
    /// been published there yet.
    ///
    /// The position only advances when an element is returned. A `None`
    /// therefore leaves the iterator where it is, and an element pushed later
    /// is yielded by a subsequent call instead of being skipped.
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.orig.get(self.idx)?;
        self.idx += 1;
        Some(item)
    }

    /// Lower bound: elements currently published beyond the position.
    /// Upper bound: slots remaining up to the capacity, since more elements
    /// may still be pushed while iterating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Saturating arithmetic keeps this method panic-free under all
        // circumstances; `size_hint` must never fail.
        let published = self.orig.len().saturating_sub(self.idx);
        let possible = self.orig.capacity.saturating_sub(self.idx);
        (published, Some(possible))
    }
}
impl<T, I> Index<I> for Syncbuf<T>
where
    I: core::slice::SliceIndex<[T]>,
{
    type Output = I::Output;

    /// Indexes into the slice view of the buffer, so every index kind that
    /// works on `[T]` (single positions and ranges) works here too.
    ///
    /// # Panics
    ///
    /// Panics when `index` is out of bounds for the slice view, exactly as
    /// indexing a slice does.
    fn index(&self, index: I) -> &Self::Output {
        // Deref coercion produces the slice view of the published elements.
        let elems: &[T] = self;
        &elems[index]
    }
}
impl<T> Deref for Syncbuf<T> {
    type Target = [T];

    /// Returns a slice over the elements published at the time of the call.
    ///
    /// Elements pushed afterwards are not part of the returned slice; deref
    /// again to see them.
    fn deref(&self) -> &Self::Target {
        // `Acquire` pairs with the publishing compare-exchange in `push`, so
        // the writes of all `len` elements happen-before the reads made
        // through the slice. A `Relaxed` load would not establish that.
        let len = self.len.load(Ordering::Acquire);
        let ptr = self.buf.load(Ordering::Relaxed);
        // SAFETY: `ptr` is the start of the buffer's allocation and the first
        // `len` slots are initialised, because `push` raises `len` only after
        // writing the element. Published slots are never written again.
        unsafe { core::slice::from_raw_parts(ptr, len) }
    }
}
impl<T> Borrow<[T]> for Syncbuf<T> {
    /// Borrows the published elements as a slice.
    fn borrow(&self) -> &[T] {
        // Explicit reborrow through `Deref`: the full slice view.
        &**self
    }
}
impl<T> Into<Vec<T>> for Syncbuf<T> {
    /// Converts the buffer back into a `Vec<T>` holding the published
    /// elements, reusing the allocation without copying.
    fn into(self) -> Vec<T> {
        // The Vec takes over the allocation, so the Syncbuf destructor (which
        // would free it) must not run.
        let this = ManuallyDrop::new(self);
        let ptr = this.buf.load(Ordering::Relaxed);
        let len = this.len();
        let cap = this.capacity;
        // SAFETY: pointer and capacity were taken from a `Vec<T>` by the
        // `From<Vec<T>>` conversion, and the first `len` slots are
        // initialised. `self` is owned here, so no push can be in flight.
        unsafe { Vec::from_raw_parts(ptr, len, cap) }
    }
}
impl<T> Drop for Syncbuf<T> {
    /// Drops the published elements and frees the allocation by rebuilding
    /// the `Vec<T>` it came from and letting that vector's destructor run.
    fn drop(&mut self) {
        let ptr = self.buf.load(Ordering::Relaxed);
        let len = self.len.load(Ordering::Relaxed);
        let cap = self.capacity;
        // SAFETY: pointer and capacity were taken from a `Vec<T>` by the
        // `From<Vec<T>>` conversion, and the first `len` slots are
        // initialised. `&mut self` guarantees no push is in flight.
        let reclaimed = unsafe { Vec::from_raw_parts(ptr, len, cap) };
        core::mem::drop(reclaimed);
    }
}
impl<T: core::fmt::Debug> core::fmt::Debug for Syncbuf<T> {
    /// Formats the buffer as `Syncbuf <capacity> { a, b, c }`, with each
    /// element rendered through its own `Debug` implementation.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "Syncbuf {} {{ ", self.capacity)?;
        for (position, elem) in self.iter().enumerate() {
            // Separator goes before every element except the first.
            if position > 0 {
                write!(f, ", ")?;
            }
            write!(f, "{:?}", elem)?;
        }
        write!(f, " }}")
    }
}
#[cfg(test)]
mod tests {
    extern crate std;

    use super::*;
    use alloc::borrow::ToOwned;
    use alloc::string::String;
    use alloc::vec;

    /// Pushes return consecutive indices until the buffer is full, after
    /// which the rejected value is handed back.
    #[test]
    fn push() {
        let buf: Syncbuf<usize> = Syncbuf::with_capacity(4);
        for (expected_idx, value) in [400usize, 1, 2, 9].iter().enumerate() {
            assert_eq!(buf.push(*value), Ok(expected_idx));
        }
        assert_eq!(buf.push(33), Err(33));
    }

    /// A reference obtained before a push is still usable after it.
    #[test]
    fn refs_not_invalidated() {
        let buf: Syncbuf<String> = Syncbuf::with_capacity(4);
        assert_eq!(buf.push("held".to_owned()), Ok(0));
        let held = buf.get(0).unwrap();
        assert_eq!(buf.push("added".to_owned()), Ok(1));
        assert_eq!(held, "held");
    }

    /// Converting from a vector keeps its elements and length.
    #[test]
    fn from_vec() {
        let source = vec!["foo", "bar", "bat"];
        let expected_len = source.len();
        let buf = Syncbuf::from(source);
        assert_eq!(buf.len(), expected_len);
        assert_eq!(buf.get(2), Some(&"bat"));
    }

    /// Vec -> Syncbuf -> Vec -> Syncbuf keeps the contents intact.
    #[test]
    fn into_vec() {
        let words = vec!["bingo".to_owned(), "bango".to_owned(), "bongo".to_owned()];
        let first: Syncbuf<_> = words.into();
        assert_eq!(first[2], "bongo");

        let round_trip: Vec<String> = first.into();
        assert_eq!(round_trip[2], "bongo");

        let second: Syncbuf<_> = round_trip.into();
        assert_eq!(second.len(), 3);
        assert_eq!(second[1], "bango");
    }

    /// In-range indexing works; indexing past the end panics like a slice.
    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn index() {
        let buf = Syncbuf::from(vec!["6", "9"]);
        assert_eq!(buf[0], "6");
        assert_eq!(buf[1], "9");
        assert_eq!(buf[2], "panik");
    }

    /// Many threads push concurrently; each must read back its own value at
    /// the index `push` returned, and no push may be lost.
    #[test]
    fn many_threads() {
        use std::sync::Arc;
        use std::thread::{sleep, JoinHandle};
        use std::time::Duration;

        const THREADS: usize = 20;
        const PUSHES: usize = 200;

        let shared = Arc::new(Syncbuf::<i32>::with_capacity(100000));

        let workers: Vec<JoinHandle<_>> = (0..THREADS as i32)
            .map(|id| {
                let shared = Arc::clone(&shared);
                std::thread::spawn(move || {
                    for _ in 0..PUSHES {
                        sleep(Duration::from_millis(2));
                        let idx = shared.push(id).unwrap();
                        let stored = shared.get(idx).unwrap();
                        sleep(Duration::from_millis(2));
                        assert_eq!(*stored, id);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(shared.len(), THREADS * PUSHES);
        assert_eq!(shared.working_len.load(Ordering::Relaxed), THREADS * PUSHES);
    }
}