#[cfg(feature = "select")]
pub mod select;
#[cfg(feature = "async")]
pub mod r#async;
mod signal;
#[cfg(feature = "select")]
pub use select::Selector;
use std::{
collections::VecDeque,
sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}},
time::{Duration, Instant},
marker::PhantomData,
thread,
fmt,
};
use spinning_top::{Spinlock, SpinlockGuard};
use crate::signal::{Signal, SyncSignal};
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct SendError<T>(pub T);
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"SendError(..)".fmt(f)
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"sending on a closed channel".fmt(f)
}
}
impl<T> std::error::Error for SendError<T> {}
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TrySendError<T> {
Full(T),
Disconnected(T),
}
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TrySendError::Full(..) => "Full(..)".fmt(f),
TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
}
}
}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TrySendError::Full(..) => "sending on a full channel".fmt(f),
TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
}
}
}
impl<T> std::error::Error for TrySendError<T> {}
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum SendTimeoutError<T> {
Timeout(T),
Disconnected(T),
}
impl<T> fmt::Debug for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"SendTimeoutError(..)".fmt(f)
}
}
impl<T> fmt::Display for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SendTimeoutError::Timeout(..) => "timed out sending on a full channel".fmt(f),
SendTimeoutError::Disconnected(..) => "sending on a closed channel".fmt(f),
}
}
}
impl<T> std::error::Error for SendTimeoutError<T> {}
enum TrySendTimeoutError<T> {
Full(T),
Disconnected(T),
Timeout(T),
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvError {
Disconnected,
}
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RecvError::Disconnected => "receiving on a closed channel".fmt(f),
}
}
}
impl std::error::Error for RecvError {}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TryRecvError {
Empty,
Disconnected,
}
impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TryRecvError::Empty => "receiving on an empty channel".fmt(f),
TryRecvError::Disconnected => "channel is empty and closed".fmt(f),
}
}
}
impl std::error::Error for TryRecvError {}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvTimeoutError {
Timeout,
Disconnected,
}
impl fmt::Display for RecvTimeoutError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RecvTimeoutError::Timeout => "timed out waiting on a channel".fmt(f),
RecvTimeoutError::Disconnected => "channel is empty and closed".fmt(f),
}
}
}
impl std::error::Error for RecvTimeoutError {}
enum TryRecvTimeoutError {
Empty,
Timeout,
Disconnected,
}
struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);
impl<T, S: ?Sized + Signal> Hook<T, S> {
pub fn slot(msg: Option<T>, signal: S) -> Arc<Self> where S: Sized {
Arc::new(Self(Some(Spinlock::new(msg)), signal))
}
pub fn trigger(signal: S) -> Arc<Self> where S: Sized {
Arc::new(Self(None, signal))
}
pub fn signal(&self) -> &S {
&self.1
}
pub fn fire_nothing(&self) -> bool {
self.signal().fire()
}
pub fn fire_recv(&self) -> T {
let msg = self.0.as_ref().unwrap().lock().take().unwrap();
let _ = self.signal().fire();
msg
}
pub fn fire_send(&self, msg: T) -> (Option<T>, bool) {
let ret = match &self.0 {
Some(hook) => {
*hook.lock() = Some(msg);
None
},
None => Some(msg),
};
let was_stream = self.signal().fire();
(ret, was_stream)
}
pub fn is_empty(&self) -> bool {
self.0.as_ref().map(|s| s.lock().is_none()).unwrap_or(true)
}
pub fn try_take(&self) -> Option<T> {
self.0.as_ref().and_then(|s| s.lock().take())
}
}
impl<T> Hook<T, SyncSignal> {
pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
loop {
let disconnected = abort.load(Ordering::SeqCst);
let msg = self.0.as_ref().unwrap().lock().take();
if let Some(msg) = msg {
break Some(msg);
} else if disconnected {
break None;
} else {
self.signal().wait()
}
}
}
pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
loop {
let disconnected = abort.load(Ordering::SeqCst);
let msg = self.0.as_ref().unwrap().lock().take();
if let Some(msg) = msg {
break Ok(msg);
} else if disconnected {
break Err(false);
} else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
self.signal().wait_timeout(dur);
} else {
break Err(true);
}
}
}
pub fn wait_send(&self, abort: &AtomicBool) {
loop {
let disconnected = abort.load(Ordering::SeqCst);
if disconnected || self.0.as_ref().unwrap().lock().is_none() {
break;
}
self.signal().wait();
}
}
pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
loop {
let disconnected = abort.load(Ordering::SeqCst);
if self.0.as_ref().unwrap().lock().is_none() {
break Ok(());
} else if disconnected {
break Err(false);
} else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
self.signal().wait_timeout(dur);
} else {
break Err(true);
}
}
}
}
#[inline]
#[cfg(not(windows))]
fn wait_lock<'a, T>(lock: &'a Spinlock<T>) -> SpinlockGuard<'a, T> {
let mut i = 4;
loop {
for _ in 0..10 {
if let Some(guard) = lock.try_lock() {
return guard;
}
thread::yield_now();
}
thread::sleep(Duration::from_nanos(1 << i));
i += 1;
}
}
#[inline]
#[cfg(windows)]
fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
lock.lock().unwrap()
}
#[cfg(windows)]
use std::sync::{Mutex, MutexGuard};
#[cfg(not(windows))]
type ChanLock<T> = Spinlock<T>;
#[cfg(windows)]
type ChanLock<T> = Mutex<T>;
struct Chan<T> {
sending: Option<(usize, VecDeque<Arc<Hook<T, dyn signal::Signal>>>)>,
queue: VecDeque<T>,
waiting: VecDeque<Arc<Hook<T, dyn signal::Signal>>>,
}
impl<T> Chan<T> {
fn pull_pending(&mut self, pull_extra: bool) {
if let Some((cap, sending)) = &mut self.sending {
let effective_cap = *cap + pull_extra as usize;
while self.queue.len() < effective_cap {
if let Some(s) = sending.pop_front() {
self.queue.push_back(s.fire_recv());
} else {
break;
}
}
}
}
fn try_wake_receiver_if_pending(&mut self) {
if !self.queue.is_empty() {
while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {}
}
}
}
struct Shared<T> {
chan: ChanLock<Chan<T>>,
disconnected: AtomicBool,
sender_count: AtomicUsize,
receiver_count: AtomicUsize,
}
impl<T> Shared<T> {
fn new(cap: Option<usize>) -> Self {
Self {
chan: ChanLock::new(Chan {
sending: cap.map(|cap| (cap, VecDeque::new())),
queue: VecDeque::new(),
waiting: VecDeque::new(),
}),
disconnected: AtomicBool::new(false),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
}
}
fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
&self,
msg: T,
should_block: bool,
make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
) -> R {
let mut chan = wait_lock(&self.chan);
if self.is_disconnected() {
Err(TrySendTimeoutError::Disconnected(msg)).into()
} else if !chan.waiting.is_empty() {
let mut msg = Some(msg);
loop {
match chan.waiting.pop_front().map(|r| r.fire_send(msg.take().unwrap())) {
None if msg.is_none() => break,
None => {
chan.queue.push_front(msg.unwrap());
break;
}
Some((Some(m), true)) => {
msg.replace(m);
continue
},
Some((Some(m), false)) => {
chan.queue.push_front(m);
break
}
Some((None, _)) => break,
}
}
Ok(()).into()
} else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
chan.queue.push_back(msg);
Ok(()).into()
} else if should_block {
let hook = make_signal(msg);
chan.sending.as_mut().unwrap().1.push_back(hook.clone());
drop(chan);
do_block(hook)
} else {
Err(TrySendTimeoutError::Full(msg)).into()
}
}
fn send_sync(
&self,
msg: T,
block: Option<Option<Instant>>,
) -> Result<(), TrySendTimeoutError<T>> {
self.send(
msg,
block.is_some(),
|msg| Hook::slot(Some(msg), SyncSignal::default()),
|hook| if let Some(deadline) = block.unwrap() {
hook.wait_deadline_send(&self.disconnected, deadline)
.or_else(|timed_out| {
if timed_out {
let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
wait_lock(&self.chan).sending
.as_mut()
.unwrap().1
.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
}
hook.try_take().map(|msg| if self.is_disconnected() {
Err(TrySendTimeoutError::Disconnected(msg))
} else {
Err(TrySendTimeoutError::Timeout(msg))
})
.unwrap_or(Ok(()))
})
} else {
hook.wait_send(&self.disconnected);
match hook.try_take() {
Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
None => Ok(()),
}
},
)
}
fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
&self,
should_block: bool,
make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
) -> R {
let mut chan = wait_lock(&self.chan);
chan.pull_pending(true);
if let Some(msg) = chan.queue.pop_front() {
Ok(msg).into()
} else if self.is_disconnected() {
Err(TryRecvTimeoutError::Disconnected).into()
} else if should_block {
let hook = make_signal();
chan.waiting.push_back(hook.clone());
drop(chan);
do_block(hook)
} else {
Err(TryRecvTimeoutError::Empty).into()
}
}
fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
self.recv(
block.is_some(),
|| Hook::slot(None, SyncSignal::default()),
|hook| if let Some(deadline) = block.unwrap() {
hook.wait_deadline_recv(&self.disconnected, deadline)
.or_else(|timed_out| {
if timed_out {
let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
wait_lock(&self.chan).waiting
.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
}
match hook.try_take() {
Some(msg) => Ok(msg),
None => {
let disconnected = self.is_disconnected();
if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
Ok(msg)
} else if disconnected {
Err(TryRecvTimeoutError::Disconnected)
} else {
Err(TryRecvTimeoutError::Timeout)
}
},
}
})
} else {
hook.wait_recv(&self.disconnected)
.or_else(|| wait_lock(&self.chan).queue.pop_front())
.ok_or(TryRecvTimeoutError::Disconnected)
},
)
}
fn disconnect_all(&self) {
self.disconnected.store(true, Ordering::Relaxed);
let mut chan = wait_lock(&self.chan);
chan.pull_pending(false);
chan.sending.as_ref().map(|(_, sending)| sending.iter().for_each(|hook| {
hook.signal().fire();
}));
chan.waiting.iter().for_each(|hook| { hook.signal().fire(); });
}
fn is_disconnected(&self) -> bool {
self.disconnected.load(Ordering::SeqCst)
}
fn is_empty(&self) -> bool {
self.len() == 0
}
fn is_full(&self) -> bool {
self.capacity().map(|cap| cap == self.len()).unwrap_or(false)
}
fn len(&self) -> usize {
let mut chan = wait_lock(&self.chan);
chan.pull_pending(false);
chan.queue.len()
}
fn capacity(&self) -> Option<usize> {
wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
}
}
pub struct Sender<T> {
shared: Arc<Shared<T>>,
}
impl<T> Sender<T> {
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.shared.send_sync(msg, None).map_err(|err| match err {
TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
_ => unreachable!(),
})
}
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
self.shared.send_sync(msg, Some(None)).map_err(|err| match err {
TrySendTimeoutError::Disconnected(msg) => SendError(msg),
_ => unreachable!(),
})
}
pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err {
TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
_ => unreachable!(),
})
}
pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
self.send_deadline(msg, Instant::now().checked_add(dur).unwrap())
}
pub fn is_disconnected(&self) -> bool {
self.shared.is_disconnected()
}
pub fn is_empty(&self) -> bool {
self.shared.is_empty()
}
pub fn is_full(&self) -> bool {
self.shared.is_full()
}
pub fn len(&self) -> usize {
self.shared.len()
}
pub fn capacity(&self) -> Option<usize> {
self.shared.capacity()
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
Self { shared: self.shared.clone() }
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Sender").finish()
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 {
self.shared.disconnect_all();
}
}
}
pub struct Receiver<T> {
shared: Arc<Shared<T>>,
}
impl<T> Receiver<T> {
pub fn try_recv(&self) -> Result<T, TryRecvError> {
self.shared.recv_sync(None).map_err(|err| match err {
TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
TryRecvTimeoutError::Empty => TryRecvError::Empty,
_ => unreachable!(),
})
}
pub fn recv(&self) -> Result<T, RecvError> {
self.shared.recv_sync(Some(None)).map_err(|err| match err {
TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
_ => unreachable!(),
})
}
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err {
TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
_ => unreachable!(),
})
}
pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
self.recv_deadline(Instant::now().checked_add(dur).unwrap())
}
pub fn iter(&self) -> Iter<T> {
Iter { receiver: &self }
}
pub fn try_iter(&self) -> TryIter<T> {
TryIter { receiver: &self }
}
pub fn drain(&self) -> Drain<T> {
let mut chan = wait_lock(&self.shared.chan);
chan.pull_pending(false);
let queue = std::mem::take(&mut chan.queue);
Drain { queue, _phantom: PhantomData }
}
pub fn is_disconnected(&self) -> bool {
self.shared.is_disconnected()
}
pub fn is_empty(&self) -> bool {
self.shared.is_empty()
}
pub fn is_full(&self) -> bool {
self.shared.is_full()
}
pub fn len(&self) -> usize {
self.shared.len()
}
pub fn capacity(&self) -> Option<usize> {
self.shared.capacity()
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
self.shared.receiver_count.fetch_add(1, Ordering::Relaxed);
Self { shared: self.shared.clone() }
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Receiver").finish()
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 {
self.shared.disconnect_all();
}
}
}
impl<'a, T> IntoIterator for &'a Receiver<T> {
type Item = T;
type IntoIter = Iter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
Iter { receiver: self }
}
}
impl<T> IntoIterator for Receiver<T> {
type Item = T;
type IntoIter = IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
IntoIter { receiver: self }
}
}
pub struct Iter<'a, T> {
receiver: &'a Receiver<T>,
}
impl<'a, T> Iterator for Iter<'a, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
}
}
pub struct TryIter<'a, T> {
receiver: &'a Receiver<T>,
}
impl<'a, T> Iterator for TryIter<'a, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.try_recv().ok()
}
}
#[derive(Debug)]
pub struct Drain<'a, T> {
queue: VecDeque<T>,
_phantom: PhantomData<&'a ()>,
}
impl<'a, T> Iterator for Drain<'a, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.queue.pop_front()
}
}
impl<'a, T> ExactSizeIterator for Drain<'a, T> {
fn len(&self) -> usize {
self.queue.len()
}
}
pub struct IntoIter<T> {
receiver: Receiver<T>,
}
impl<T> Iterator for IntoIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
}
}
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Shared::new(None));
(
Sender { shared: shared.clone() },
Receiver { shared },
)
}
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Shared::new(Some(cap)));
(
Sender { shared: shared.clone() },
Receiver { shared },
)
}