use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::sync::atomic::Ordering;
use crate::{Consumer, CopyToUninit, Producer};
#[allow(unused_imports)]
use crate::RingBuffer;
impl<T> Producer<T> {
    /// Returns `n` slots (initialized with their [`Default`] value) for writing.
    ///
    /// The slots are filled with `T::default()` before the chunk is handed out
    /// (see the `From<WriteChunkUninit>` impl below).
    ///
    /// # Errors
    ///
    /// Returns [`ChunkError::TooFewSlots`] (holding the number of currently
    /// available slots) if fewer than `n` slots are available.
    pub fn write_chunk(&mut self, n: usize) -> Result<WriteChunk<'_, T>, ChunkError>
    where
        T: Default,
    {
        self.write_chunk_uninit(n).map(WriteChunk::from)
    }

    /// Returns `n` (uninitialized) slots for writing.
    ///
    /// # Errors
    ///
    /// Returns [`ChunkError::TooFewSlots`] (holding the number of currently
    /// available slots) if fewer than `n` slots are available.
    pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
        let tail = self.cached_tail.get();
        // Fast path: check free space against the locally cached head position.
        if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
            // The cached head may be stale; re-load the shared head
            // (Acquire pairs with the consumer's Release store) and re-check.
            let head = self.buffer.head.load(Ordering::Acquire);
            self.cached_head.set(head);
            let slots = self.buffer.capacity - self.buffer.distance(head, tail);
            if slots < n {
                return Err(ChunkError::TooFewSlots(slots));
            }
        }
        // Map the abstract tail position to an index into the backing array.
        let tail = self.buffer.collapse_position(tail);
        // The chunk may wrap around the end of the buffer: `first_len` slots
        // up to the end, the remaining `n - first_len` from the beginning.
        let first_len = n.min(self.buffer.capacity - tail);
        Ok(WriteChunkUninit {
            // SAFETY: `tail` has been collapsed to an in-bounds index, so the
            // offset stays within the buffer's allocation.
            first_ptr: unsafe { self.buffer.data_ptr.add(tail) },
            first_len,
            second_ptr: self.buffer.data_ptr,
            second_len: n - first_len,
            producer: self,
        })
    }
}
impl<T: Copy> Producer<T> {
    /// Writes as many items from `slice` as there is currently space for.
    ///
    /// Returns the two halves of `slice`: the prefix that was pushed and the
    /// remainder that did not fit.
    pub fn push_partial_slice<'a>(&mut self, slice: &'a [T]) -> (&'a [T], &'a [T]) {
        // If the cached count doesn't already prove enough space, fall back to
        // `slots()` (defined elsewhere; presumably refreshes the cached head)
        // and clamp to what is actually free.
        let slots = if self.cached_slots() < slice.len() {
            slice.len().min(self.slots())
        } else {
            slice.len()
        };
        let (pushed, remainder) = slice.split_at(slots);
        match self.push_entire_slice(pushed) {
            Ok(()) => {}
            // SAFETY: `slots` was clamped to the available space just above,
            // so pushing `pushed` cannot fail.
            Err(_) => unsafe { core::hint::unreachable_unchecked() },
        };
        (pushed, remainder)
    }

    /// Writes all items from `slice` into the ring buffer.
    ///
    /// # Errors
    ///
    /// Returns [`ChunkError::TooFewSlots`] (and writes nothing) if there is
    /// not enough space for the whole slice.
    pub fn push_entire_slice(&mut self, slice: &[T]) -> Result<(), ChunkError> {
        let mut chunk = self.write_chunk_uninit(slice.len())?;
        let (one, two) = chunk.as_mut_slices();
        // Copy into the two (possibly wrapped) halves of the chunk.
        let mid = one.len();
        slice[..mid].copy_to_uninit(one);
        slice[mid..].copy_to_uninit(two);
        // SAFETY: every slot of the chunk has just been initialized.
        unsafe { chunk.commit_all() };
        Ok(())
    }
}
impl<T> Consumer<T> {
    /// Returns `n` slots for reading.
    ///
    /// # Errors
    ///
    /// Returns [`ChunkError::TooFewSlots`] (holding the number of currently
    /// available slots) if fewer than `n` slots are available.
    pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
        let head = self.cached_head.get();
        // Fast path: check available items against the locally cached tail.
        if self.buffer.distance(head, self.cached_tail.get()) < n {
            // The cached tail may be stale; re-load the shared tail
            // (Acquire pairs with the producer's Release store) and re-check.
            let tail = self.buffer.tail.load(Ordering::Acquire);
            self.cached_tail.set(tail);
            let slots = self.buffer.distance(head, tail);
            if slots < n {
                return Err(ChunkError::TooFewSlots(slots));
            }
        }
        // Map the abstract head position to an index into the backing array.
        let head = self.buffer.collapse_position(head);
        // The chunk may wrap around the end of the buffer.
        let first_len = n.min(self.buffer.capacity - head);
        Ok(ReadChunk {
            // SAFETY: `head` has been collapsed to an in-bounds index, so the
            // offset stays within the buffer's allocation.
            first_ptr: unsafe { self.buffer.data_ptr.add(head) },
            first_len,
            second_ptr: self.buffer.data_ptr,
            second_len: n - first_len,
            consumer: self,
        })
    }
}
impl<T: Copy> Consumer<T> {
    /// Reads as many items as are currently available into `slice`.
    ///
    /// Returns the two halves of `slice`: the prefix that was filled with
    /// popped items and the untouched remainder.
    pub fn pop_partial_slice<'a>(&mut self, slice: &'a mut [T]) -> (&'a mut [T], &'a mut [T]) {
        // Cast `&mut [T]` to `&mut [MaybeUninit<T>]` and back: sound because
        // `MaybeUninit<T>` is guaranteed to have the same layout as `T`, and
        // both returned halves are initialized (the remainder was initialized
        // to begin with and is never de-initialized).
        let (popped, remainder) =
            unsafe { self.pop_partial_slice_uninit(&mut *(slice as *mut [_] as *mut _)) };
        (popped, unsafe { &mut *(remainder as *mut _ as *mut [_]) })
    }

    /// Reads as many items as are currently available into the
    /// (possibly uninitialized) `slice`.
    ///
    /// Returns the now-initialized prefix holding the popped items and the
    /// still-uninitialized remainder.
    #[inline]
    pub fn pop_partial_slice_uninit<'a>(
        &mut self,
        slice: &'a mut [MaybeUninit<T>],
    ) -> (&'a mut [T], &'a mut [MaybeUninit<T>]) {
        // If the cached count doesn't already prove enough items, fall back to
        // `slots()` (defined elsewhere; presumably refreshes the cached tail)
        // and clamp to what is actually available.
        let slots = if self.cached_slots() < slice.len() {
            slice.len().min(self.slots())
        } else {
            slice.len()
        };
        let (buffer, remainder) = slice.split_at_mut(slots);
        let popped = match self.pop_entire_slice_uninit(buffer) {
            Ok(popped) => popped,
            // SAFETY: `slots` was clamped to the number of available items
            // just above, so popping `buffer.len()` items cannot fail.
            Err(_) => unsafe { core::hint::unreachable_unchecked() },
        };
        (popped, remainder)
    }

    /// Fills all of `slice` with items from the ring buffer.
    ///
    /// # Errors
    ///
    /// Returns [`ChunkError::TooFewSlots`] (and reads nothing) if fewer than
    /// `slice.len()` items are available.
    pub fn pop_entire_slice(&mut self, slice: &mut [T]) -> Result<(), ChunkError> {
        // Same layout-compatible cast as in `pop_partial_slice`; the returned
        // reference is discarded because `slice` is already usable as `[T]`.
        let _ = unsafe { self.pop_entire_slice_uninit(&mut *(slice as *mut [_] as *mut _))? };
        Ok(())
    }

    /// Fills all of the (possibly uninitialized) `slice` with items from the
    /// ring buffer, returning it as an initialized `&mut [T]`.
    ///
    /// # Errors
    ///
    /// Returns [`ChunkError::TooFewSlots`] (and reads nothing) if fewer than
    /// `slice.len()` items are available.
    pub fn pop_entire_slice_uninit<'a>(
        &mut self,
        slice: &'a mut [MaybeUninit<T>],
    ) -> Result<&'a mut [T], ChunkError> {
        let chunk = self.read_chunk(slice.len())?;
        let (one, two) = chunk.as_slices();
        // Copy out of the two (possibly wrapped) halves of the chunk.
        let mid = one.len();
        one.copy_to_uninit(&mut slice[..mid]);
        two.copy_to_uninit(&mut slice[mid..]);
        chunk.commit_all();
        // SAFETY: every element of `slice` has just been initialized, and
        // `MaybeUninit<T>` has the same layout as `T`.
        Ok(unsafe { &mut *(slice as *mut _ as *mut [_]) })
    }
}
/// Structure for writing into multiple (already initialized) slots in one go.
///
/// The inner `Option` is `Some` until the chunk is committed (or dropped);
/// `PhantomData<T>` presumably expresses that the chunk owns the initialized
/// `T` values it may drop — TODO confirm against the crate's variance needs.
#[derive(Debug, PartialEq, Eq)]
pub struct WriteChunk<'a, T>(Option<WriteChunkUninit<'a, T>>, PhantomData<T>);
impl<T> Drop for WriteChunk<'_, T> {
    /// Drops all elements of an un-committed chunk.
    ///
    /// `commit()`/`commit_all()` `take()` the inner chunk first, so this
    /// only runs for chunks that were never committed; their slots are not
    /// made available to the consumer.
    fn drop(&mut self) {
        if let Some(mut chunk) = self.0.take() {
            // SAFETY: every slot was initialized when the `WriteChunk` was
            // created (see the `From<WriteChunkUninit>` impl), and
            // `drop_suffix(0)` drops all of them.
            unsafe { chunk.drop_suffix(0) };
        }
    }
}
impl<'a, T> From<WriteChunkUninit<'a, T>> for WriteChunk<'a, T>
where
    T: Default,
{
    /// Initializes every slot of the chunk with `T::default()` and wraps it
    /// in a [`WriteChunk`].
    fn from(chunk: WriteChunkUninit<'a, T>) -> Self {
        // Fill the first part, then the (possibly empty) second part.
        for &(ptr, len) in &[
            (chunk.first_ptr, chunk.first_len),
            (chunk.second_ptr, chunk.second_len),
        ] {
            for i in 0..len {
                // SAFETY: `i < len`, so the write stays within the slots
                // reserved for this chunk.
                unsafe { ptr.add(i).write(T::default()) };
            }
        }
        WriteChunk(Some(chunk), PhantomData)
    }
}
impl<T> WriteChunk<'_, T>
where
    T: Default,
{
    /// Returns the chunk as two mutable (initialized) slices.
    ///
    /// The second slice is non-empty only if the chunk wraps around the end
    /// of the ring buffer.
    pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
        // The inner `Option` is only taken by `commit*()`, which consume
        // `self`, so it is still `Some` here.
        let chunk = self.0.as_ref().unwrap();
        // SAFETY: all slots were initialized in `From<WriteChunkUninit>`,
        // and this chunk has exclusive access to them.
        unsafe {
            (
                core::slice::from_raw_parts_mut(chunk.first_ptr, chunk.first_len),
                core::slice::from_raw_parts_mut(chunk.second_ptr, chunk.second_len),
            )
        }
    }

    /// Makes the first `n` slots available to the consumer and drops the
    /// remaining (initialized) elements.
    ///
    /// # Panics
    ///
    /// Panics if `n` is greater than the chunk size.
    pub fn commit(mut self, n: usize) {
        // Taking the inner chunk disarms this type's `Drop` impl.
        let mut chunk = self.0.take().unwrap();
        // SAFETY: all slots are initialized; drop the un-committed suffix
        // first, then publish the first `n` slots.
        unsafe {
            chunk.drop_suffix(n);
            chunk.commit(n);
        }
    }

    /// Makes the whole chunk available to the consumer.
    pub fn commit_all(mut self) {
        // Taking the inner chunk disarms this type's `Drop` impl.
        let chunk = self.0.take().unwrap();
        // SAFETY: all slots were initialized on construction.
        unsafe { chunk.commit_all() };
    }

    /// Returns the total number of slots in the chunk.
    #[must_use]
    pub fn len(&self) -> usize {
        self.0.as_ref().unwrap().len()
    }

    /// Returns `true` if the chunk contains no slots.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.0.as_ref().unwrap().is_empty()
    }
}
/// Structure for writing into multiple (possibly uninitialized) slots in one go.
#[derive(Debug, PartialEq, Eq)]
pub struct WriteChunkUninit<'a, T> {
    // First part of the chunk: starts at the (collapsed) tail position.
    first_ptr: *mut T,
    first_len: usize,
    // Second part: starts at the beginning of the backing array;
    // `second_len` is `0` if the chunk does not wrap around.
    second_ptr: *mut T,
    second_len: usize,
    // Keeps the producer borrowed while the chunk is alive; used on commit
    // to advance and publish the tail.
    producer: &'a Producer<T>,
}
unsafe impl<T: Send> Send for WriteChunkUninit<'_, T> {}
impl<T> WriteChunkUninit<'_, T> {
    /// Returns the chunk as two mutable slices of (possibly uninitialized)
    /// slots.
    ///
    /// The second slice is non-empty only if the chunk wraps around the end
    /// of the ring buffer.
    pub fn as_mut_slices(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
        // SAFETY: the pointer/length pairs describe slots exclusively
        // reserved for this chunk; `MaybeUninit<T>` has the same layout
        // as `T`.
        unsafe {
            (
                core::slice::from_raw_parts_mut(self.first_ptr.cast(), self.first_len),
                core::slice::from_raw_parts_mut(self.second_ptr.cast(), self.second_len),
            )
        }
    }

    /// Makes the first `n` slots of the chunk available to the consumer.
    ///
    /// # Safety
    ///
    /// The first `n` slots must have been initialized.
    ///
    /// # Panics
    ///
    /// Panics if `n` is greater than the chunk size.
    pub unsafe fn commit(self, n: usize) {
        assert!(n <= self.len(), "cannot commit more than chunk size");
        // SAFETY: `n` is within the chunk; initialization is the caller's
        // obligation.
        unsafe { self.commit_unchecked(n) };
    }

    /// Makes the whole chunk available to the consumer.
    ///
    /// # Safety
    ///
    /// All slots of the chunk must have been initialized.
    pub unsafe fn commit_all(self) {
        let slots = self.len();
        unsafe { self.commit_unchecked(slots) };
    }

    /// Advances the tail by `n` and publishes it (Release pairs with the
    /// consumer's Acquire load). Returns `n` for convenience.
    unsafe fn commit_unchecked(self, n: usize) -> usize {
        let p = self.producer;
        let tail = p.buffer.increment(p.cached_tail.get(), n);
        p.buffer.tail.store(tail, Ordering::Release);
        p.cached_tail.set(tail);
        n
    }

    /// Fills the chunk from `iter` and commits exactly as many slots as the
    /// iterator produced (at most the chunk size). Returns that count.
    pub fn fill_from_iter<I>(self, iter: I) -> usize
    where
        I: IntoIterator<Item = T>,
    {
        let mut iter = iter.into_iter();
        let mut iterated = 0;
        // Write into the first part, then the second, stopping early if the
        // iterator runs dry.
        'outer: for &(ptr, len) in &[
            (self.first_ptr, self.first_len),
            (self.second_ptr, self.second_len),
        ] {
            for i in 0..len {
                match iter.next() {
                    Some(item) => {
                        // SAFETY: `i < len`, so the write stays within the
                        // slots reserved for this chunk.
                        unsafe { ptr.add(i).write(item) };
                        iterated += 1;
                    }
                    None => break 'outer,
                }
            }
        }
        // SAFETY: exactly `iterated` slots have just been initialized.
        unsafe { self.commit_unchecked(iterated) }
    }

    /// Returns the total number of slots in the chunk.
    #[must_use]
    pub fn len(&self) -> usize {
        self.first_len + self.second_len
    }

    /// Returns `true` if the chunk contains no slots.
    ///
    /// Only the first part is checked; this assumes the invariant "second
    /// part non-empty implies first part non-empty", which appears to hold
    /// for chunks built by `write_chunk_uninit` — TODO confirm the range of
    /// `collapse_position`.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.first_len == 0
    }

    /// Drops the initialized elements `n..len()` without committing anything.
    ///
    /// # Safety
    ///
    /// Elements `n..len()` must be initialized.
    unsafe fn drop_suffix(&mut self, n: usize) {
        // Drop the tail of the first part, ...
        for i in n..self.first_len {
            unsafe { self.first_ptr.add(i).drop_in_place() };
        }
        // ... then the tail of the second part (its start index accounts for
        // elements already covered by the first part).
        for i in n.saturating_sub(self.first_len)..self.second_len {
            unsafe { self.second_ptr.add(i).drop_in_place() };
        }
    }
}
/// Structure for reading from multiple slots in one go.
#[derive(Debug, PartialEq, Eq)]
pub struct ReadChunk<'a, T> {
    // First part of the chunk: starts at the (collapsed) head position.
    first_ptr: *mut T,
    first_len: usize,
    // Second part: starts at the beginning of the backing array;
    // `second_len` is `0` if the chunk does not wrap around.
    second_ptr: *mut T,
    second_len: usize,
    // Keeps the consumer borrowed while the chunk is alive; used on commit
    // to advance and publish the head.
    consumer: &'a Consumer<T>,
}
unsafe impl<T: Send> Send for ReadChunk<'_, T> {}
impl<T> ReadChunk<'_, T> {
    /// Returns the chunk as two read-only slices.
    ///
    /// The second slice is non-empty only if the chunk wraps around the end
    /// of the ring buffer.
    #[must_use]
    pub fn as_slices(&self) -> (&[T], &[T]) {
        // SAFETY: the slots were initialized by the producer before being
        // published and are exclusively reserved for this chunk.
        unsafe {
            (
                core::slice::from_raw_parts(self.first_ptr, self.first_len),
                core::slice::from_raw_parts(self.second_ptr, self.second_len),
            )
        }
    }

    /// Returns the chunk as two mutable slices.
    #[must_use]
    pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
        // SAFETY: as in `as_slices`; `&mut self` guarantees exclusivity.
        unsafe {
            (
                core::slice::from_raw_parts_mut(self.first_ptr, self.first_len),
                core::slice::from_raw_parts_mut(self.second_ptr, self.second_len),
            )
        }
    }

    /// Drops the first `n` items of the chunk and makes their slots
    /// available for writing again; the rest stays in the ring buffer.
    ///
    /// # Panics
    ///
    /// Panics if `n` is greater than the chunk size.
    pub fn commit(self, n: usize) {
        assert!(n <= self.len(), "cannot commit more than chunk size");
        // SAFETY: `n` has been checked against the chunk size.
        unsafe { self.commit_unchecked(n) };
    }

    /// Drops all items of the chunk and makes their slots available again.
    pub fn commit_all(self) {
        let slots = self.len();
        // SAFETY: `slots` is exactly the chunk size.
        unsafe { self.commit_unchecked(slots) };
    }

    /// Drops the first `n` items in place (running their destructors), then
    /// advances the head by `n` and publishes it (Release pairs with the
    /// producer's Acquire load). Returns `n`.
    unsafe fn commit_unchecked(self, n: usize) -> usize {
        // Drop up to `n` items from the first part, ...
        let first_len = self.first_len.min(n);
        for i in 0..first_len {
            unsafe { self.first_ptr.add(i).drop_in_place() };
        }
        // ... then whatever remains of `n` from the second part.
        let second_len = self.second_len.min(n - first_len);
        for i in 0..second_len {
            unsafe { self.second_ptr.add(i).drop_in_place() };
        }
        let c = self.consumer;
        let head = c.buffer.increment(c.cached_head.get(), n);
        c.buffer.head.store(head, Ordering::Release);
        c.cached_head.set(head);
        n
    }

    /// Returns the total number of slots in the chunk.
    #[must_use]
    pub fn len(&self) -> usize {
        self.first_len + self.second_len
    }

    /// Returns `true` if the chunk contains no slots.
    ///
    /// Only the first part is checked; this assumes "second part non-empty
    /// implies first part non-empty", which appears to hold for chunks built
    /// by `read_chunk` — TODO confirm the range of `collapse_position`.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.first_len == 0
    }
}
impl<'a, T> IntoIterator for ReadChunk<'a, T> {
type Item = T;
type IntoIter = ReadChunkIntoIter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
Self::IntoIter {
chunk: self,
iterated: 0,
}
}
}
/// An iterator that moves items out of a [`ReadChunk`] by value.
#[derive(Debug)]
pub struct ReadChunkIntoIter<'a, T> {
    chunk: ReadChunk<'a, T>,
    // Number of items moved out so far; committed to the buffer on drop.
    iterated: usize,
}
impl<T> Drop for ReadChunkIntoIter<'_, T> {
    /// Advances the head by the number of items that were actually iterated.
    ///
    /// Non-iterated items remain in the ring buffer; they were never moved
    /// out by `next()`, so nothing is dropped here.
    fn drop(&mut self) {
        let c = &self.chunk.consumer;
        let head = c.buffer.increment(c.cached_head.get(), self.iterated);
        // Release pairs with the producer's Acquire load of `head`.
        c.buffer.head.store(head, Ordering::Release);
        c.cached_head.set(head);
    }
}
impl<T> Iterator for ReadChunkIntoIter<'_, T> {
    type Item = T;

    /// Moves the next item out of the chunk by value.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // Select the slot in the first or second (wrapped) part of the
        // chunk, or signal exhaustion.
        let ptr = if self.iterated < self.chunk.first_len {
            unsafe { self.chunk.first_ptr.add(self.iterated) }
        } else if self.iterated < self.chunk.first_len + self.chunk.second_len {
            unsafe {
                self.chunk
                    .second_ptr
                    .add(self.iterated - self.chunk.first_len)
            }
        } else {
            return None;
        };
        self.iterated += 1;
        // SAFETY: the slot is initialized and will not be read again,
        // because `iterated` was just advanced past it; its space is
        // returned to the producer only in `Drop`.
        Some(unsafe { ptr.read() })
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Exact remaining count (backs the `ExactSizeIterator` impl below).
        let remaining = self.chunk.first_len + self.chunk.second_len - self.iterated;
        (remaining, Some(remaining))
    }
}
// `size_hint()` is exact, and `next()` keeps returning `None` once the
// chunk is exhausted, so both marker traits are satisfied.
impl<T> ExactSizeIterator for ReadChunkIntoIter<'_, T> {}
impl<T> core::iter::FusedIterator for ReadChunkIntoIter<'_, T> {}
#[cfg(feature = "std")]
impl std::io::Write for Producer<u8> {
    /// Writes as many bytes as currently fit without blocking.
    ///
    /// Returns `ErrorKind::WouldBlock` if the ring buffer is full and `buf`
    /// is non-empty; an empty `buf` always succeeds with `Ok(0)`.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let (pushed, _) = self.push_partial_slice(buf);
        if pushed.is_empty() {
            Err(std::io::ErrorKind::WouldBlock.into())
        } else {
            Ok(pushed.len())
        }
    }

    /// Nothing to flush: written bytes are published to the consumer as
    /// soon as they are pushed.
    #[inline]
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
#[cfg(feature = "std")]
impl std::io::Read for Consumer<u8> {
    /// Reads as many bytes as are currently available without blocking.
    ///
    /// Returns `ErrorKind::WouldBlock` if the ring buffer is empty and `buf`
    /// is non-empty; an empty `buf` always succeeds with `Ok(0)`.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let (popped, _) = self.pop_partial_slice(buf);
        if popped.is_empty() {
            Err(std::io::ErrorKind::WouldBlock.into())
        } else {
            Ok(popped.len())
        }
    }
}
/// Error type for chunk-based operations.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ChunkError {
    /// Fewer than the requested number of slots were available;
    /// the payload is the number of slots that *were* available.
    TooFewSlots(usize),
}

#[cfg(feature = "std")]
impl std::error::Error for ChunkError {}

impl fmt::Display for ChunkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChunkError::TooFewSlots(n) => {
                // `write!` formats directly into `f`; the previous version
                // built an intermediate `alloc::format!` String on every
                // call, allocating for no benefit.
                write!(f, "only {} slots available in ring buffer", n)
            }
        }
    }
}