use crate::{atomic::Ordering, mpsc::queue::QueuePtr};
/// The consuming half of the MPSC queue.
///
/// There is a single consumer: `local_head` is plain (non-atomic) state,
/// so only one `Receiver` may exist per queue (enforced elsewhere —
/// presumably by the queue constructor; confirm at the creation site).
pub struct Receiver<T> {
    // Shared handle to the queue's cell storage.
    ptr: QueuePtr<T>,
    // Index of the next slot to read. Monotonically increasing across the
    // queue's whole lifetime (wrapping at usize::MAX), NOT reduced modulo
    // capacity here — `QueuePtr::cell_at` presumably performs the wrap to a
    // physical slot; verify against its definition.
    local_head: usize,
}
impl<T> Receiver<T> {
    /// Creates a receiver that starts consuming at logical slot 0.
    ///
    /// Crate-private: receivers are handed out by the queue constructor,
    /// which is responsible for ensuring there is only ever one.
    pub(crate) fn new(queue_ptr: QueuePtr<T>) -> Self {
        Self {
            ptr: queue_ptr,
            local_head: 0,
        }
    }
    /// Receives the next value, blocking (via spin/backoff) until a
    /// producer has published one. Uses a default spin count of 16
    /// before the backoff strategy escalates.
    pub fn recv(&mut self) -> T {
        self.recv_with_spin_count(16)
    }
    /// Receives the next value, spinning `spin_count` times per backoff
    /// round while waiting for a producer.
    ///
    /// Protocol: slot `local_head` is readable once its epoch counter
    /// reaches `local_head + 1` — the producer bumps the epoch with a
    /// Release store after writing the value, so the Acquire load below
    /// synchronizes-with that write. After taking the value, the epoch is
    /// advanced to `local_head + capacity`, which presumably marks the
    /// slot writable for the producer's next lap — confirm against the
    /// sender side.
    ///
    /// NOTE(review): the `<` comparison assumes epochs never wrap
    /// `usize`; with `wrapping_add` this is only sound for fewer than
    /// ~2^64 total operations (a practical non-issue on 64-bit, but worth
    /// confirming the intent on 32-bit targets).
    pub fn recv_with_spin_count(&mut self, spin_count: u32) -> T {
        let next_head = self.local_head.wrapping_add(1);
        let cell = self.ptr.cell_at(self.local_head);
        let mut backoff = crate::Backoff::with_spin_count(spin_count);
        // Wait until the producer's Release store makes the slot visible.
        while cell.epoch().load(Ordering::Acquire) < next_head {
            backoff.backoff();
        }
        // SAFETY: the epoch check above observed (with Acquire ordering) the
        // producer's Release store that follows its write of this slot, so
        // the value is fully initialized and the producer will not touch the
        // slot again until we bump the epoch below.
        let ret = unsafe { cell.get() };
        // Hand the slot back to producers for the next lap around the ring.
        cell.epoch().store(
            self.local_head.wrapping_add(self.ptr.capacity),
            Ordering::Release,
        );
        self.local_head = next_head;
        ret
    }
    /// Non-blocking variant of [`recv`](Self::recv): returns `None`
    /// immediately if no value has been published to the current slot yet,
    /// otherwise takes the value exactly as `recv_with_spin_count` does.
    pub fn try_recv(&mut self) -> Option<T> {
        let next_head = self.local_head.wrapping_add(1);
        let cell = self.ptr.cell_at(self.local_head);
        // Slot not yet published by a producer — bail out without waiting.
        if cell.epoch().load(Ordering::Acquire) < next_head {
            return None;
        }
        // SAFETY: same invariant as in `recv_with_spin_count` — the Acquire
        // load observed the producer's Release store, so the slot holds a
        // fully written value that is now exclusively ours to move out.
        let ret = unsafe { cell.get() };
        // Release the slot for the producer's next lap.
        cell.epoch().store(
            self.local_head.wrapping_add(self.ptr.capacity),
            Ordering::Release,
        );
        self.local_head = next_head;
        Some(ret)
    }
}
unsafe impl<T: Send> Send for Receiver<T> {}