use core::cmp::Ordering as Cmp;
use crate::{atomic::Ordering, mpmc::queue::QueuePtr};
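
// Producer half of a bounded MPMC queue. Each cell carries an epoch (sequence)
// counter in the style of Vyukov-like bounded queues: a producer may write to the
// slot at index `t` only while that cell's epoch equals `t`; it claims the slot by
// CAS-ing the shared tail from `t` to `t + 1`, writes the value, and publishes it
// by storing epoch `t + 1` with Release ordering, which pairs with a consumer's
// Acquire load of the epoch.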
/// Producing half of the queue. Cloning a `Sender` yields another independent
/// producer handle onto the same queue.
#[derive(Clone)]
pub struct Sender<T> {
    ptr: QueuePtr<T>,
    /// Cached copy of the shared tail index, refreshed whenever it falls behind.
    local_tail: usize,
}
impl<T> Sender<T> {
    pub(crate) fn new(queue_ptr: QueuePtr<T>) -> Self {
        Self {
            ptr: queue_ptr,
            local_tail: 0,
        }
    }
    /// Sends `value`, spinning with the default backoff budget until a slot is free.
    pub fn send(&mut self, value: T) {
        self.send_with_spin_count(value, 6, 10);
    }
    /// Sends `value`, retrying `try_send` with exponential backoff. `spin_limit`
    /// and `yield_limit` size the backoff budget used between attempts.
    pub fn send_with_spin_count(&mut self, mut value: T, spin_limit: u32, yield_limit: u32) {
        let mut backoff = crate::ExponentialBackoff::new(spin_limit, yield_limit);
        loop {
            match self.try_send(value) {
                Ok(()) => return,
                Err(ret) => {
                    // `try_send` hands the value back on failure; keep it for the next attempt.
                    value = ret;
                    // Once the backoff budget is exhausted, start a fresh round
                    // rather than giving up.
                    if backoff.backoff() {
                        backoff.reset();
                    }
                }
            }
        }
    }
    /// Attempts to send `value`. Returns `Err(value)` if the queue is full; may
    /// spin briefly while racing other producers for a slot.
    pub fn try_send(&mut self, value: T) -> Result<(), T> {
        let mut backoff = crate::ExponentialBackoff::new(6, 10);
        let cell = loop {
            let cell = self.ptr.cell_at(self.local_tail);
            let epoch = cell.epoch().load(Ordering::Acquire);
            match epoch.cmp(&self.local_tail) {
                // The cell's epoch lags our tail: the slot has not been consumed
                // since the previous lap, so the queue is full.
                Cmp::Less => return Err(value),
                Cmp::Equal => {
                    // The cell is free at this position; claim it by advancing the shared tail.
                    let next_epoch = self.local_tail.wrapping_add(1);
                    match self.ptr.tail().compare_exchange_weak(
                        self.local_tail,
                        next_epoch,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => {
                            self.local_tail = next_epoch;
                            break cell;
                        }
                        // Lost the race (or the weak CAS failed spuriously);
                        // retry from the tail we actually observed.
                        Err(cur_tail) => self.local_tail = cur_tail,
                    }
                }
                // Our cached tail is stale; catch up with the shared tail.
                Cmp::Greater => self.local_tail = self.ptr.tail().load(Ordering::Relaxed),
            };
            // Back off a little before re-examining the cell.
            backoff.backoff();
        };
        // Write the value, then publish it by bumping the cell's epoch. The Release
        // store pairs with a consumer's Acquire load of the epoch.
        cell.set(value);
        cell.epoch().store(self.local_tail, Ordering::Release);
        Ok(())
    }
}
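
// A self-contained, single-threaded sketch of the per-cell epoch handshake that
// `try_send` performs above, written against plain atomics. It is illustrative
// only: the real `QueuePtr` and cell types live elsewhere in the crate and are
// not reproduced here, and this hypothetical `Cell` stores an `Option<u32>` just
// to keep the example small.
#[cfg(test)]
mod epoch_handshake_sketch {
    use core::cell::UnsafeCell;
    use core::sync::atomic::{AtomicUsize, Ordering};

    // One queue slot: an epoch counter plus storage for the value.
    struct Cell {
        epoch: AtomicUsize,
        value: UnsafeCell<Option<u32>>,
    }

    #[test]
    fn claim_then_publish() {
        let tail = AtomicUsize::new(0);
        let cell = Cell {
            epoch: AtomicUsize::new(0), // epoch == index means "free for a producer"
            value: UnsafeCell::new(None),
        };

        let local_tail = 0usize;
        // 1. The cell's epoch matches the producer's tail, so the slot is writable.
        assert_eq!(cell.epoch.load(Ordering::Acquire), local_tail);
        // 2. Claim the slot by advancing the shared tail.
        assert!(tail
            .compare_exchange(local_tail, local_tail + 1, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok());
        // 3. Write the value, then publish it by bumping the epoch with Release.
        unsafe { *cell.value.get() = Some(42) };
        cell.epoch.store(local_tail + 1, Ordering::Release);
        // A consumer that observes epoch == index + 1 (Acquire) may now read the value.
        assert_eq!(cell.epoch.load(Ordering::Acquire), local_tail + 1);
    }
}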
// SAFETY: a `Sender<T>` only moves `T` values into the shared queue, so the
// handle itself may cross threads whenever `T: Send`.
unsafe impl<T: Send> Send for Sender<T> {}
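
// `crate::ExponentialBackoff` is defined elsewhere in the crate; the senders above
// only rely on `new(spin_limit, yield_limit)`, on `backoff()` returning `true`
// once the spin/yield budget is exhausted, and on `reset()`. The module below is a
// minimal local sketch of that assumed interface so the retry loops can be read in
// isolation; it is illustrative and not the crate's real implementation (for
// instance, it only spins where the real type presumably yields the thread).
#[cfg(test)]
mod backoff_interface_sketch {
    /// Illustrative stand-in for the interface `Sender` expects from
    /// `crate::ExponentialBackoff`.
    struct SketchBackoff {
        step: u32,
        spin_limit: u32,
        yield_limit: u32,
    }

    impl SketchBackoff {
        fn new(spin_limit: u32, yield_limit: u32) -> Self {
            Self { step: 0, spin_limit, yield_limit }
        }

        /// Burns a little time and reports `true` once the whole budget is spent.
        fn backoff(&mut self) -> bool {
            if self.step >= self.spin_limit + self.yield_limit {
                return true;
            }
            // Spin a bounded, exponentially growing number of times per step.
            for _ in 0..(1u32 << self.step.min(10)) {
                core::hint::spin_loop();
            }
            self.step += 1;
            false
        }

        fn reset(&mut self) {
            self.step = 0;
        }
    }

    #[test]
    fn budget_is_exhausted_then_reset() {
        let mut b = SketchBackoff::new(2, 3);
        for _ in 0..5 {
            assert!(!b.backoff()); // five calls fit inside the 2 + 3 budget
        }
        assert!(b.backoff()); // the sixth call reports exhaustion
        b.reset();
        assert!(!b.backoff()); // after `reset` the budget starts over
    }
}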