use core::cmp::Ordering as Cmp;
#[cfg(feature = "std")]
use crate::mpmc::queue::FutexState;
use crate::{atomic::Ordering, mpmc::queue::QueuePtr};
#[derive(Clone)]
pub struct Sender<T> {
ptr: QueuePtr<T>,
local_tail: usize,
}
impl<T> Sender<T> {
pub(crate) fn new(queue_ptr: QueuePtr<T>) -> Self {
Self {
ptr: queue_ptr,
local_tail: 0,
}
}
pub fn send(&mut self, value: T) {
self.send_with_spin_count(value, 128, 1);
}
pub fn send_with_spin_count(&mut self, mut value: T, spin_limit: u32, yield_limit: u32) {
let mut backoff = crate::ParkingBackoff::new(spin_limit, yield_limit);
loop {
if let Err(ret) = self.try_send(value) {
value = ret;
#[cfg(feature = "std")]
if backoff.backoff() && self.ptr.prepare_wait(FutexState::SendersWaiting) {
if let Err(ret) = self.try_send(value) {
value = ret;
self.ptr.wait(FutexState::SendersWaiting);
} else {
return;
}
}
#[cfg(not(feature = "std"))]
backoff.backoff();
} else {
return;
}
}
}
pub fn try_send(&mut self, value: T) -> Result<(), T> {
let mut backoff = crate::ExponentialBackoff::new(6, 10);
let cell = loop {
let cell = self.ptr.cell_at(self.local_tail);
let epoch = cell.epoch().load(Ordering::Acquire);
match epoch.cmp(&self.local_tail) {
Cmp::Less => return Err(value),
Cmp::Equal => {
let next_epoch = self.local_tail.wrapping_add(1);
match self.ptr.tail().compare_exchange_weak(
self.local_tail,
next_epoch,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => {
self.local_tail = next_epoch;
break cell;
}
Err(cur_tail) => self.local_tail = cur_tail,
}
}
Cmp::Greater => self.local_tail = self.ptr.tail().load(Ordering::Relaxed),
};
backoff.backoff();
};
cell.set(value);
cell.epoch().store(self.local_tail, Ordering::Release);
#[cfg(feature = "std")]
self.ptr.wake();
Ok(())
}
}
unsafe impl<T: Send> Send for Sender<T> {}