use crate::{
atomic::Ordering,
queue::{Ownership, RefCounted},
read_guard::BatchReader,
spsc::queue::QueuePtr,
};
#[cfg(feature = "async")]
use core::{
pin::Pin,
task::{Context, Poll},
};
pub struct Receiver<T, O: Ownership = RefCounted> {
pub ptr: QueuePtr<T, O>,
pub local_tail: usize,
pub local_head: usize,
}
impl<T, O: Ownership> Receiver<T, O> {
pub(crate) fn new(queue_ptr: QueuePtr<T, O>) -> Self {
Self {
ptr: queue_ptr,
local_tail: 0,
local_head: 0,
}
}
pub fn try_recv(&mut self) -> Option<T> {
if self.local_head == self.local_tail {
self.load_tail();
if self.local_head == self.local_tail {
return None;
}
}
let ret = unsafe { self.ptr.get(self.local_head) };
let new_head = self.local_head.wrapping_add(1);
self.store_head(new_head);
self.local_head = new_head;
#[cfg(feature = "async")]
self.ptr.wake_sender();
Some(ret)
}
pub fn recv(&mut self) -> T {
self.recv_with_spin_count(128)
}
pub fn recv_with_spin_count(&mut self, spin_count: u32) -> T {
let mut backoff = crate::Backoff::with_spin_count(spin_count);
while self.local_head == self.local_tail {
backoff.backoff();
self.load_tail();
}
let ret = unsafe { self.ptr.get(self.local_head) };
let new_head = self.local_head.wrapping_add(1);
self.store_head(new_head);
self.local_head = new_head;
#[cfg(feature = "async")]
self.ptr.wake_sender();
ret
}
pub fn read_guard(&mut self) -> crate::read_guard::ReadGuard<'_, Self> {
crate::read_guard::ReadGuard::new(self)
}
#[cfg(feature = "async")]
pub async fn recv_async(&mut self) -> T {
futures::future::poll_fn(|cx| self.poll_recv(cx)).await
}
#[cfg(feature = "async")]
fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<T> {
if self.local_head == self.local_tail {
self.load_tail();
if self.local_head == self.local_tail {
self.ptr.register_receiver_waker(cx.waker());
self.local_tail = self.ptr.tail().load(Ordering::SeqCst);
if self.local_head == self.local_tail {
return Poll::Pending;
}
}
}
let ret = unsafe { self.ptr.get(self.local_head) };
let new_head = self.local_head.wrapping_add(1);
self.store_head(new_head);
self.local_head = new_head;
self.ptr.wake_sender();
Poll::Ready(ret)
}
#[inline(always)]
fn store_head(&self, value: usize) {
self.ptr.head().store(value, Ordering::Release);
}
#[inline(always)]
fn load_tail(&mut self) {
self.local_tail = self.ptr.tail().load(Ordering::Acquire);
}
#[inline(always)]
pub(crate) fn refresh_head(&mut self) {
self.local_head = self.ptr.head().load(Ordering::Acquire);
if self.local_tail < self.local_head {
self.local_tail = self.ptr.tail().load(Ordering::Acquire);
}
}
}
unsafe impl<T: Send, O: Ownership> Send for Receiver<T, O> {}
impl<T, O: Ownership> Unpin for Receiver<T, O> {}
#[cfg(feature = "async")]
impl<T, O: Ownership> futures::Stream for Receiver<T, O> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_recv(cx).map(Some)
}
}
unsafe impl<T, O: Ownership> BatchReader for Receiver<T, O> {
type Item = T;
#[inline]
fn read_buffer(&mut self) -> &[T] {
let mut available = self.local_tail.wrapping_sub(self.local_head);
if available == 0 {
self.load_tail();
available = self.local_tail.wrapping_sub(self.local_head);
}
let start = self.local_head & self.ptr.mask;
let contiguous = self.ptr.capacity - start;
let len = available.min(contiguous);
unsafe {
let ptr = self.ptr.exact_at(start);
core::slice::from_raw_parts(ptr.as_ptr(), len)
}
}
#[inline(always)]
unsafe fn advance(&mut self, n: usize) {
#[cfg(debug_assertions)]
{
let start = self.local_head & self.ptr.mask;
let contiguous = self.ptr.capacity - start;
let available = contiguous.min(self.local_tail.wrapping_sub(self.local_head));
assert!(
n <= available,
"advancing ({n}) more than available space ({available})"
);
}
let new_head = self.local_head.wrapping_add(n);
self.store_head(new_head);
self.local_head = new_head;
#[cfg(feature = "async")]
self.ptr.wake_sender();
}
}