use crate::{
Backoff, Box,
read_guard::BatchReader,
spsc::{self, shards::ShardsPtr},
};
/// Consumer side of a sharded queue: it owns one [`spsc::Receiver`] per shard
/// and polls them in round-robin order.
pub struct Receiver<T> {
    /// One single-shard receiver per shard, indexed by shard number.
    receivers: Box<[spsc::Receiver<T>]>,
    /// Number of shards; the index at which `next_shard` wraps back to zero.
    max_shards: usize,
    /// Index of the shard that the next receive attempt polls first.
    next_shard: usize,
}
impl<T> Receiver<T> {
    /// Builds a receiver that owns one [`spsc::Receiver`] for every shard
    /// index in `0..max_shards`, taking each queue pointer from `shards`.
    ///
    /// Polling starts at shard 0.
    pub(crate) fn new(shards: ShardsPtr<T>, max_shards: usize) -> Self {
        // Collecting straight into the boxed slice keeps construction in safe
        // code; if building a later receiver panics, the ones already built
        // are dropped instead of being leaked inside uninitialised storage.
        let receivers: Box<[_]> = (0..max_shards)
            .map(|i| spsc::Receiver::new(shards.clone_queue_ptr(i)))
            .collect();

        Self {
            receivers,
            max_shards,
            next_shard: 0,
        }
    }

    /// Waits until an item is available and returns it.
    ///
    /// Equivalent to [`recv_with_spin_count`](Self::recv_with_spin_count)
    /// with a spin count of 128.
    ///
    /// # Panics
    ///
    /// Panics if the receiver has zero shards (see [`try_recv`](Self::try_recv)).
    pub fn recv(&mut self) -> T {
        self.recv_with_spin_count(128)
    }

    /// Waits until an item is available and returns it, backing off between
    /// unsuccessful polls.
    ///
    /// `spin_count` is forwarded to `Backoff::with_spin_count`. The call only
    /// returns once [`try_recv`](Self::try_recv) yields an item; there is no
    /// timeout.
    ///
    /// # Panics
    ///
    /// Panics if the receiver has zero shards (see [`try_recv`](Self::try_recv)).
    pub fn recv_with_spin_count(&mut self, spin_count: u32) -> T {
        let mut backoff = Backoff::with_spin_count(spin_count);
        loop {
            if let Some(item) = self.try_recv() {
                return item;
            }
            backoff.backoff();
        }
    }

    /// Makes one pass over the shards and returns the first item found.
    ///
    /// The scan begins at the shard remembered from the previous call and
    /// wraps around. On success the cursor stays on the shard that produced
    /// the item, so that shard is polled first next time. Returns `None` once
    /// every shard has been polled without yielding an item.
    ///
    /// # Panics
    ///
    /// Panics with an out-of-bounds index if the receiver has zero shards.
    pub fn try_recv(&mut self) -> Option<T> {
        let start = self.next_shard;
        loop {
            if let Some(item) = self.receivers[self.next_shard].try_recv() {
                return Some(item);
            }

            // Step to the next shard, wrapping at the end of the slice.
            self.next_shard += 1;
            if self.next_shard == self.max_shards {
                self.next_shard = 0;
            }

            // Back at the starting shard: a full pass found nothing.
            if self.next_shard == start {
                return None;
            }
        }
    }

    /// Wraps this receiver in a `ReadGuard` that borrows it mutably for the
    /// guard's lifetime.
    pub fn read_guard(&mut self) -> crate::read_guard::ReadGuard<'_, Self> {
        crate::read_guard::ReadGuard::new(self)
    }
}
// NOTE(review): `BatchReader` is an unsafe trait whose contract is declared in
// `crate::read_guard`; this impl forwards both operations to the shard that
// `next_shard` currently points at — confirm that satisfies the trait's rules.
unsafe impl<T> BatchReader for Receiver<T> {
    type Item = T;

    /// Returns the readable slice of the first shard that has data.
    ///
    /// Shards are polled starting from the remembered cursor, wrapping around.
    /// When a non-empty slice is found the cursor is left on that shard, which
    /// is the shard `advance` then operates on. After one full pass without
    /// data an empty slice is returned.
    ///
    /// # Panics
    ///
    /// Panics with an out-of-bounds index if the receiver has zero shards.
    fn read_buffer(&mut self) -> &[T] {
        let first_polled = self.next_shard;
        loop {
            let pending = self.receivers[self.next_shard].read_buffer();

            if pending.is_empty() {
                // Nothing here: step to the following shard, wrapping at the end.
                let following = self.next_shard + 1;
                self.next_shard = if following == self.max_shards {
                    0
                } else {
                    following
                };

                // Every shard has been polled once without finding data.
                if self.next_shard == first_polled {
                    return &[];
                }
                continue;
            }

            // SAFETY: the source and target types are identical, so the
            // transmute can only change the slice's lifetime. The slice comes
            // from a receiver owned by `self`, and the signature ties the
            // returned reference to this call's `&mut self` borrow.
            // NOTE(review): relies on the inner `read_buffer` slice staying
            // valid for that whole borrow — confirm against `spsc::Receiver`.
            return unsafe { core::mem::transmute::<&[T], &[T]>(pending) };
        }
    }

    /// Forwards `len` to `advance` on the shard the cursor currently points at.
    ///
    /// # Safety
    ///
    /// The caller must uphold whatever the inner receiver's `advance` requires.
    /// NOTE(review): presumably `len` must not exceed the length of the slice
    /// most recently returned by `read_buffer` — confirm against the
    /// `BatchReader` trait documentation.
    unsafe fn advance(&mut self, len: usize) {
        let current = &mut self.receivers[self.next_shard];
        unsafe { current.advance(len) };
    }
}
// SAFETY: a `Receiver<T>` hands out owned `T` values (see `try_recv`), so moving
// it to another thread moves the place where those values are taken over. That
// is only sound when `T` itself is `Send`, hence the bound.
// NOTE(review): presumably the inner `spsc::Receiver`s hold raw queue pointers,
// which is why `Send` is not derived automatically — confirm against `spsc`.
unsafe impl<T: Send> Send for Receiver<T> {}