#[cfg(feature = "select")]
pub mod select;
#[cfg(feature = "async")]
pub mod r#async;
#[cfg(feature = "select")]
pub use select::Selector;
use std::{
collections::VecDeque,
sync::{Arc, Condvar, Mutex, WaitTimeoutResult, atomic::{AtomicUsize, Ordering}},
time::{Duration, Instant},
cell::{UnsafeCell, RefCell},
marker::PhantomData,
thread,
};
#[cfg(windows)]
use std::sync::{Mutex as InnerMutex, MutexGuard};
#[cfg(not(windows))]
use spin::{Mutex as InnerMutex, MutexGuard};
#[cfg(feature = "async")]
use std::task::Waker;
#[cfg(feature = "select")]
use crate::select::Token;
#[cfg(feature = "async")]
use crate::r#async::RecvFuture;
use std::cell::Cell;
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvError {
Disconnected,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TrySendError<T> {
Full(T),
Disconnected(T),
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TryRecvError {
Empty,
Disconnected,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvTimeoutError {
Timeout,
Disconnected,
}
#[derive(Default)]
struct Signal<T = ()> {
lock: Mutex<T>,
trigger: Condvar,
waiters: AtomicUsize,
}
impl<T> Signal<T> {
fn wait<G>(&self, sync_guard: G) {
let guard = self.lock.lock().unwrap();
self.waiters.fetch_add(1, Ordering::Relaxed);
drop(sync_guard);
let guard = self.trigger.wait(guard).unwrap();
self.waiters.fetch_sub(1, Ordering::Relaxed);
}
fn do_then_wait_while<G>(&self, sync_guard: G, first: impl FnOnce(&mut T), cond: impl FnMut(&mut T) -> bool) {
let mut guard = self.lock.lock().unwrap();
self.waiters.fetch_add(1, Ordering::Relaxed);
drop(sync_guard);
first(&mut *guard);
let guard = self.trigger.wait_while(guard, cond).unwrap();
self.waiters.fetch_sub(1, Ordering::Relaxed);
}
fn wait_then<G, R>(&self, sync_guard: G, then: impl FnOnce(&mut T) -> R) -> R {
let guard = self.lock.lock().unwrap();
self.waiters.fetch_add(1, Ordering::Relaxed);
drop(sync_guard);
let mut guard = self.trigger.wait(guard).unwrap();
self.waiters.fetch_sub(1, Ordering::Relaxed);
then(&mut *guard)
}
fn wait_timeout<G>(&self, dur: Duration, sync_guard: G) -> WaitTimeoutResult {
let guard = self.lock.lock().unwrap();
self.waiters.fetch_add(1, Ordering::Relaxed);
drop(sync_guard);
let (guard, timeout) = self.trigger.wait_timeout(guard, dur).unwrap();
self.waiters.fetch_sub(1, Ordering::Relaxed);
timeout
}
fn notify_one<G>(&self, sync_guard: G) {
if self.waiters.load(Ordering::Relaxed) > 0 {
drop(sync_guard);
let _guard = self.lock.lock().unwrap();
self.trigger.notify_one();
}
}
fn notify_one_with<G>(&self, f: impl FnOnce(&mut T), sync_guard: G) {
if self.waiters.load(Ordering::Relaxed) > 0 {
drop(sync_guard);
let mut guard = self.lock.lock().unwrap();
f(&mut *guard);
self.trigger.notify_one();
}
}
fn notify_all<G>(&self, sync_guard: G) {
if self.waiters.load(Ordering::Relaxed) > 0 {
drop(sync_guard);
let _guard = self.lock.lock().unwrap();
self.trigger.notify_all();
}
}
}
struct Queue<T>(VecDeque<T>, Option<usize>);
impl<T> Queue<T> {
fn new(cap: Option<usize>) -> Self { Self(VecDeque::new(), cap) }
fn len(&self) -> usize { self.0.len() }
fn push(&mut self, x: T) -> Result<(), T> {
if self.1 == Some(self.0.len()) {
Err(x)
} else {
self.0.push_back(x);
Ok(())
}
}
fn pop(&mut self) -> Option<T> {
self.0.pop_front()
}
fn swap(&mut self, buf: &mut VecDeque<T>) {
if !self.1.is_some() {
std::mem::swap(&mut self.0, buf);
}
}
fn take(&mut self) -> Self {
std::mem::replace(self, Queue(VecDeque::new(), self.1))
}
}
struct Inner<T> {
queue: Queue<T>,
#[cfg(feature = "select")]
send_selector_counter: usize,
#[cfg(feature = "select")]
send_selectors: Vec<(usize, Arc<Signal<Token>>, Token)>,
#[cfg(feature = "select")]
recv_selector: Option<(Arc<Signal<Token>>, Token)>,
#[cfg(feature = "async")]
recv_waker: Option<Waker>,
sender_count: usize,
listen_mode: usize,
}
struct Shared<T> {
inner: InnerMutex<Inner<T>>,
send_signal: Signal,
recv_signal: Option<Signal>,
rendezvous_signal: Option<Signal<Option<T>>>,
}
impl<T> Shared<T> {
fn new(cap: Option<usize>) -> Self {
Self {
inner: InnerMutex::new(Inner {
queue: Queue::new(cap),
#[cfg(feature = "select")]
send_selector_counter: 0,
#[cfg(feature = "select")]
send_selectors: Vec::new(),
#[cfg(feature = "select")]
recv_selector: None,
#[cfg(feature = "async")]
recv_waker: None,
sender_count: 1,
listen_mode: 1,
}),
send_signal: Signal::default(),
recv_signal: if cap.is_some() { Some(Signal::default()) } else { None },
rendezvous_signal: if cap == Some(0) { Some(Signal::default()) } else { None },
}
}
#[inline]
fn lock_inner(&self) -> MutexGuard<Inner<T>> {
#[cfg(windows)] { self.inner.lock().unwrap() }
#[cfg(not(windows))] { self.inner.lock() }
}
#[inline]
#[cfg(not(windows))]
fn wait_inner(&self) -> MutexGuard<'_, Inner<T>> {
let mut i = 0;
loop {
for _ in 0..5 {
if let Some(inner) = self.inner.try_lock() {
return inner;
}
thread::yield_now();
}
thread::sleep(Duration::from_nanos(i * 50));
i += 1;
}
}
#[inline]
#[cfg(windows)]
fn wait_inner(&self) -> MutexGuard<'_, Inner<T>> {
self.lock_inner()
}
#[inline]
#[cfg(feature = "async")]
fn poll_inner(&self) -> Option<MutexGuard<'_, Inner<T>>> {
#[cfg(windows)] { self.inner.try_lock().ok() }
#[cfg(not(windows))] { self.inner.try_lock() }
}
#[inline]
fn try_send(&self, msg: T) -> Result<(), (MutexGuard<Inner<T>>, TrySendError<T>)> {
let mut inner = self.wait_inner();
if inner.listen_mode == 0 {
return Err((inner, TrySendError::Disconnected(msg)));
}
match inner.queue.push(msg) {
Err(msg) => return Err((inner, TrySendError::Full(msg))),
Ok(()) => {},
};
#[cfg(feature = "select")]
{
if let Some((signal, token)) = &inner.recv_selector {
signal.notify_one_with(|t| *t = *token, ());
}
}
#[cfg(feature = "async")]
{
if let Some(recv_waker) = &inner.recv_waker {
recv_waker.wake_by_ref();
}
}
self.send_signal.notify_one(inner);
Ok(())
}
#[inline]
fn send(&self, mut msg: T) -> Result<(), SendError<T>> {
loop {
let inner = match self.try_send(msg) {
Ok(()) => return Ok(()),
Err((_, TrySendError::Disconnected(msg))) => return Err(SendError(msg)),
Err((inner, TrySendError::Full(m))) => if let Some(sig) = self.rendezvous_signal.as_ref() {
sig.do_then_wait_while(inner, |msg| {
*msg = Some(m);
self.send_signal.notify_one(());
}, |msg| msg.is_some());
return Ok(());
} else {
msg = m;
inner
},
};
if let Some(recv_signal) = self.recv_signal.as_ref() {
recv_signal.wait(inner);
}
}
}
#[inline]
fn all_senders_disconnected(&self) {
self.send_signal.notify_all(self.inner.lock());
#[cfg(feature = "async")]
{
if let Some(recv_waker) = &self.lock_inner().recv_waker {
recv_waker.wake_by_ref();
}
}
}
#[inline]
fn receiver_disconnected(&self) {
if let Some(recv_signal) = self.recv_signal.as_ref() {
recv_signal.notify_all(self.inner.lock());
}
}
#[inline]
fn take_remaining(&self) -> Queue<T> {
self.wait_inner().queue.take()
}
#[inline]
fn try_recv<'a>(
&'a self,
take_inner: impl FnOnce() -> MutexGuard<'a, Inner<T>>,
buf: &mut VecDeque<T>,
finished: &Cell<bool>,
) -> Result<T, (MutexGuard<Inner<T>>, TryRecvError)> {
if let Some(msg) = buf.pop_front() {
return Ok(msg);
}
let mut inner = take_inner();
if let Some(rendezvous_signal) = self.rendezvous_signal.as_ref() {
let mut msg = None;
rendezvous_signal.notify_one_with(|m| msg = m.take(), ());
if let Some(msg) = msg {
return Ok(msg);
} else {
return Err((inner, TryRecvError::Empty));
}
}
let msg = match inner.queue.pop() {
Some(msg) => msg,
None if inner.sender_count == 0 => {
finished.set(true);
return Err((inner, TryRecvError::Disconnected));
},
None => return Err((inner, TryRecvError::Empty)),
};
inner.queue.swap(buf);
#[cfg(feature = "select")]
{
inner
.send_selectors
.iter()
.for_each(|(_, signal, token)| {
signal.notify_one_with(|t| *t = *token, ());
});
}
if let Some(recv_signal) = self.recv_signal.as_ref() {
recv_signal.notify_one(inner);
}
Ok(msg)
}
#[inline]
fn recv(
&self,
buf: &mut VecDeque<T>,
finished: &Cell<bool>,
) -> Result<T, RecvError> {
loop {
let mut i = 0;
let inner = loop {
match self.try_recv(|| self.wait_inner(), buf, finished) {
Ok(msg) => return Ok(msg),
Err((_, TryRecvError::Disconnected)) => return Err(RecvError::Disconnected),
Err((inner, TryRecvError::Empty)) if i == 3 => break inner,
Err((_, TryRecvError::Empty)) => {},
};
thread::yield_now();
i += 1;
};
self.send_signal.wait(inner);
}
}
#[inline]
fn recv_deadline(
&self,
deadline: Instant,
buf: &mut VecDeque<T>,
finished: &Cell<bool>,
) -> Result<T, RecvTimeoutError> {
let mut inner = match self.try_recv(|| self.wait_inner(), buf, finished) {
Ok(msg) => return Ok(msg),
Err((_, TryRecvError::Disconnected)) => return Err(RecvTimeoutError::Disconnected),
Err((inner, TryRecvError::Empty)) => inner,
};
loop {
let now = Instant::now();
let timeout = if now >= deadline {
break Err(RecvTimeoutError::Timeout);
} else {
deadline.duration_since(now)
};
let timeout = self.send_signal.wait_timeout(timeout, inner);
if timeout.timed_out() {
break Err(RecvTimeoutError::Timeout);
}
inner = match self.try_recv(|| self.wait_inner(), buf, finished) {
Ok(msg) => return Ok(msg),
Err((inner, TryRecvError::Empty)) => inner,
Err((_, TryRecvError::Disconnected)) => return Err(RecvTimeoutError::Disconnected),
};
}
}
#[cfg(feature = "select")]
#[inline]
fn connect_send_selector(&self, signal: Arc<Signal<Token>>, token: Token) -> usize {
let mut inner = self.lock_inner();
inner.send_selector_counter += 1;
let id = inner.send_selector_counter;
inner.send_selectors.push((id, signal, token));
id
}
#[cfg(feature = "select")]
#[inline]
fn disconnect_send_selector(&self, id: usize) {
self.lock_inner().send_selectors.retain(|(s_id, _, _)| s_id != &id);
}
#[cfg(feature = "select")]
#[inline]
fn connect_recv_selector(&self, signal: Arc<Signal<Token>>, token: Token) {
self.lock_inner().recv_selector = Some((signal, token));
}
#[cfg(feature = "select")]
#[inline]
fn disconnect_recv_selector(&self) {
self.lock_inner().recv_selector = None;
}
}
pub struct Sender<T> {
shared: Arc<Shared<T>>,
}
impl<T> Sender<T> {
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
self.shared.send(msg)
}
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.shared.try_send(msg).map(|_| ()).map_err(|(_, err)| err)
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.shared.wait_inner().sender_count += 1;
Self { shared: self.shared.clone() }
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
if {
let mut inner = self.shared.wait_inner();
inner.sender_count -= 1;
inner.sender_count
} == 0 {
self.shared.all_senders_disconnected();
}
}
}
pub struct Receiver<T> {
shared: Arc<Shared<T>>,
buffer: RefCell<VecDeque<T>>,
_phantom_cell: UnsafeCell<()>,
finished: Cell<bool>,
}
impl<T> Receiver<T> {
pub fn recv(&self) -> Result<T, RecvError> {
self.shared.recv(&mut self.buffer.borrow_mut(), &self.finished)
}
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
self.shared.recv_deadline(
Instant::now().checked_add(timeout).unwrap(),
&mut self.buffer.borrow_mut(),
&self.finished
)
}
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
self.shared.recv_deadline(
deadline,
&mut self.buffer.borrow_mut(),
&self.finished
)
}
#[cfg(feature = "async")]
pub fn recv_async(&mut self) -> RecvFuture<T> {
RecvFuture::new(self)
}
pub fn try_recv(&self) -> Result<T, TryRecvError> {
self
.shared
.try_recv(
|| self.shared.wait_inner(),
&mut self.buffer.borrow_mut(),
&self.finished
)
.map_err(|(_, err)| err)
}
pub fn iter(&self) -> Iter<T> {
Iter { receiver: &self }
}
pub fn try_iter(&self) -> TryIter<T> {
TryIter { receiver: &self }
}
pub fn drain(&self) -> Drain<T> {
Drain { queue: self.shared.take_remaining(), _phantom: PhantomData }
}
}
impl<T> IntoIterator for Receiver<T> {
type Item = T;
type IntoIter = IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
IntoIter { receiver: self }
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.shared.wait_inner().listen_mode = 0;
self.shared.receiver_disconnected();
}
}
pub struct Iter<'a, T> {
receiver: &'a Receiver<T>,
}
impl<'a, T> Iterator for Iter<'a, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
}
}
pub struct TryIter<'a, T> {
receiver: &'a Receiver<T>,
}
impl<'a, T> Iterator for TryIter<'a, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.try_recv().ok()
}
}
pub struct Drain<'a, T> {
queue: Queue<T>,
_phantom: PhantomData<&'a ()>,
}
impl<'a, T> Iterator for Drain<'a, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.queue.pop()
}
}
impl<'a, T> ExactSizeIterator for Drain<'a, T> {
fn len(&self) -> usize {
self.queue.len()
}
}
pub struct IntoIter<T> {
receiver: Receiver<T>,
}
impl<T> Iterator for IntoIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
}
}
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Shared::new(None));
(
Sender { shared: shared.clone() },
Receiver {
shared,
buffer: RefCell::new(VecDeque::new()),
finished: Cell::new(false),
_phantom_cell: UnsafeCell::new(())
},
)
}
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Shared::new(Some(cap)));
(
Sender { shared: shared.clone() },
Receiver {
shared,
buffer: RefCell::new(VecDeque::new()),
finished: Cell::new(false),
_phantom_cell: UnsafeCell::new(())
},
)
}