use std::fmt;
use std::iter::FusedIterator;
use std::mem;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::context::Context;
use crate::counter;
use crate::err::{
RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
};
use crate::flavors;
use crate::select::{Operation, SelectHandle, Token};
/// Builds a channel backed by the list flavor and returns its two halves.
///
/// A fresh `flavors::list::Channel` is handed to `counter::new`, and the resulting
/// pair is wrapped in the public [`Sender`] and [`Receiver`] types.
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = counter::new(flavors::list::Channel::new());
    (
        Sender {
            flavor: SenderFlavor::List(tx),
        },
        Receiver {
            flavor: ReceiverFlavor::List(rx),
        },
    )
}
/// Builds a channel for the requested capacity and returns its two halves.
///
/// * `cap == 0` selects the zero flavor (`flavors::zero::Channel`).
/// * Any other `cap` selects the array flavor, created with
///   `flavors::array::Channel::with_capacity(cap)`.
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
    match cap {
        0 => {
            let (tx, rx) = counter::new(flavors::zero::Channel::new());
            (
                Sender {
                    flavor: SenderFlavor::Zero(tx),
                },
                Receiver {
                    flavor: ReceiverFlavor::Zero(rx),
                },
            )
        }
        _ => {
            let (tx, rx) = counter::new(flavors::array::Channel::with_capacity(cap));
            (
                Sender {
                    flavor: SenderFlavor::Array(tx),
                },
                Receiver {
                    flavor: ReceiverFlavor::Array(rx),
                },
            )
        }
    }
}
/// Returns a receiver backed by an `at`-flavor channel whose deadline is
/// `Instant::now() + duration`.
///
/// If that sum is not representable (`checked_add` yields `None`), the result of
/// [`never`] is returned instead.
pub fn after(duration: Duration) -> Receiver<Instant> {
    if let Some(deadline) = Instant::now().checked_add(duration) {
        let channel = flavors::at::Channel::new_deadline(deadline);
        Receiver {
            flavor: ReceiverFlavor::At(Arc::new(channel)),
        }
    } else {
        never()
    }
}
/// Returns a receiver backed by an `at`-flavor channel whose deadline is `when`.
pub fn at(when: Instant) -> Receiver<Instant> {
    let channel = Arc::new(flavors::at::Channel::new_deadline(when));
    Receiver {
        flavor: ReceiverFlavor::At(channel),
    }
}
/// Returns a receiver backed by a freshly created `never`-flavor channel.
pub fn never<T>() -> Receiver<T> {
    let flavor = ReceiverFlavor::Never(flavors::never::Channel::new());
    Receiver { flavor }
}
/// Returns a receiver backed by a `tick`-flavor channel.
///
/// The channel is built from the first delivery time (`Instant::now() + duration`)
/// and `duration` itself. If the first delivery time is not representable
/// (`checked_add` yields `None`), the result of [`never`] is returned instead.
pub fn tick(duration: Duration) -> Receiver<Instant> {
    if let Some(delivery_time) = Instant::now().checked_add(duration) {
        let channel = flavors::tick::Channel::new(delivery_time, duration);
        Receiver {
            flavor: ReceiverFlavor::Tick(Arc::new(channel)),
        }
    } else {
        never()
    }
}
/// The sending half of a channel.
///
/// This is a thin wrapper: every operation dispatches on `flavor` to the
/// concrete channel implementation.
pub struct Sender<T> {
/// The concrete channel implementation backing this sender.
flavor: SenderFlavor<T>,
}
/// The channel implementations a `Sender` can wrap.
///
/// Each variant holds the sending side produced by `counter::new` for that flavor.
enum SenderFlavor<T> {
/// Array flavor; `bounded` picks this for a non-zero capacity.
Array(counter::Sender<flavors::array::Channel<T>>),
/// List flavor; `unbounded` picks this.
List(counter::Sender<flavors::list::Channel<T>>),
/// Zero flavor; `bounded` picks this for capacity zero.
Zero(counter::Sender<flavors::zero::Channel<T>>),
}
// `Sender<T>` is declared `Send` and `Sync` whenever `T: Send`.
// NOTE(review): the soundness of these two `unsafe impl`s rests on the flavor and
// `counter` implementations, which are not visible in this file; confirm there.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
// `Sender<T>` is declared unwind-safe for every `T`, both by value and by reference.
impl<T> UnwindSafe for Sender<T> {}
impl<T> RefUnwindSafe for Sender<T> {}
impl<T> Sender<T> {
    /// Forwards `msg` to the `try_send` of whichever flavor backs this sender.
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        match &self.flavor {
            SenderFlavor::Array(inner) => inner.try_send(msg),
            SenderFlavor::List(inner) => inner.try_send(msg),
            SenderFlavor::Zero(inner) => inner.try_send(msg),
        }
    }

    /// Sends `msg` through the backing flavor with no deadline.
    ///
    /// # Errors
    ///
    /// A `SendTimeoutError::Disconnected` from the flavor is returned as
    /// [`SendError`], carrying the message back to the caller.
    ///
    /// # Panics
    ///
    /// Hits `unreachable!()` if the flavor reports a timeout, which is not expected
    /// because no deadline is passed.
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
        let outcome = match &self.flavor {
            SenderFlavor::Array(inner) => inner.send(msg, None),
            SenderFlavor::List(inner) => inner.send(msg, None),
            SenderFlavor::Zero(inner) => inner.send(msg, None),
        };
        match outcome {
            Ok(()) => Ok(()),
            Err(SendTimeoutError::Disconnected(msg)) => Err(SendError(msg)),
            Err(SendTimeoutError::Timeout(_)) => unreachable!(),
        }
    }

    /// Sends `msg`, using `Instant::now() + timeout` as the deadline.
    ///
    /// If that deadline is not representable (`checked_add` yields `None`), this
    /// falls back to [`Sender::send`] and converts its error with
    /// `SendTimeoutError::from`.
    pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
        if let Some(deadline) = Instant::now().checked_add(timeout) {
            self.send_deadline(msg, deadline)
        } else {
            self.send(msg).map_err(SendTimeoutError::from)
        }
    }

    /// Sends `msg` through the backing flavor, passing `deadline` along.
    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
        let limit = Some(deadline);
        match &self.flavor {
            SenderFlavor::Array(inner) => inner.send(msg, limit),
            SenderFlavor::List(inner) => inner.send(msg, limit),
            SenderFlavor::Zero(inner) => inner.send(msg, limit),
        }
    }

    /// Returns what the backing flavor's `is_empty` reports.
    pub fn is_empty(&self) -> bool {
        match &self.flavor {
            SenderFlavor::Array(inner) => inner.is_empty(),
            SenderFlavor::List(inner) => inner.is_empty(),
            SenderFlavor::Zero(inner) => inner.is_empty(),
        }
    }

    /// Returns what the backing flavor's `is_full` reports.
    pub fn is_full(&self) -> bool {
        match &self.flavor {
            SenderFlavor::Array(inner) => inner.is_full(),
            SenderFlavor::List(inner) => inner.is_full(),
            SenderFlavor::Zero(inner) => inner.is_full(),
        }
    }

    /// Returns what the backing flavor's `len` reports.
    pub fn len(&self) -> usize {
        match &self.flavor {
            SenderFlavor::Array(inner) => inner.len(),
            SenderFlavor::List(inner) => inner.len(),
            SenderFlavor::Zero(inner) => inner.len(),
        }
    }

    /// Returns what the backing flavor's `capacity` reports.
    pub fn capacity(&self) -> Option<usize> {
        match &self.flavor {
            SenderFlavor::Array(inner) => inner.capacity(),
            SenderFlavor::List(inner) => inner.capacity(),
            SenderFlavor::Zero(inner) => inner.capacity(),
        }
    }

    /// Returns `true` when both senders use the same flavor and their inner
    /// `counter::Sender` handles compare equal; differing flavors are never equal.
    pub fn same_channel(&self, other: &Sender<T>) -> bool {
        match (&self.flavor, &other.flavor) {
            (SenderFlavor::Array(lhs), SenderFlavor::Array(rhs)) => lhs == rhs,
            (SenderFlavor::List(lhs), SenderFlavor::List(rhs)) => lhs == rhs,
            (SenderFlavor::Zero(lhs), SenderFlavor::Zero(rhs)) => lhs == rhs,
            _ => false,
        }
    }
}
impl<T> Drop for Sender<T> {
    /// Releases this sender's handle on the backing channel.
    ///
    /// Each flavor's `release` is given a closure that disconnects the channel:
    /// `disconnect` for the array and zero flavors, `disconnect_senders` for the
    /// list flavor.
    fn drop(&mut self) {
        // NOTE(review): `release` is an unsafe call whose contract lives in
        // `counter`; presumably it must run at most once per handle, which `drop`
        // provides. Confirm against `counter`.
        unsafe {
            match &self.flavor {
                SenderFlavor::Array(inner) => inner.release(|channel| channel.disconnect()),
                SenderFlavor::List(inner) => {
                    inner.release(|channel| channel.disconnect_senders())
                }
                SenderFlavor::Zero(inner) => inner.release(|channel| channel.disconnect()),
            }
        }
    }
}
impl<T> Clone for Sender<T> {
    /// Produces another sender for the same channel by calling `acquire` on the
    /// inner handle and re-wrapping it in the same flavor.
    fn clone(&self) -> Self {
        Sender {
            flavor: match &self.flavor {
                SenderFlavor::Array(inner) => SenderFlavor::Array(inner.acquire()),
                SenderFlavor::List(inner) => SenderFlavor::List(inner.acquire()),
                SenderFlavor::Zero(inner) => SenderFlavor::Zero(inner.acquire()),
            },
        }
    }
}
impl<T> fmt::Debug for Sender<T> {
    /// Writes a fixed placeholder; no channel state is exposed.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        let placeholder = "Sender { .. }";
        out.pad(placeholder)
    }
}
/// The receiving half of a channel.
///
/// This is a thin wrapper: every operation dispatches on `flavor` to the
/// concrete channel implementation.
pub struct Receiver<T> {
/// The concrete channel implementation backing this receiver.
flavor: ReceiverFlavor<T>,
}
/// The channel implementations a `Receiver` can wrap.
enum ReceiverFlavor<T> {
/// Array flavor; `bounded` picks this for a non-zero capacity.
Array(counter::Receiver<flavors::array::Channel<T>>),
/// List flavor; `unbounded` picks this.
List(counter::Receiver<flavors::list::Channel<T>>),
/// Zero flavor; `bounded` picks this for capacity zero.
Zero(counter::Receiver<flavors::zero::Channel<T>>),
/// Shared `at`-flavor channel; built by `after` and `at`, which return `Receiver<Instant>`.
At(Arc<flavors::at::Channel>),
/// Shared `tick`-flavor channel; built by `tick`, which returns `Receiver<Instant>`.
Tick(Arc<flavors::tick::Channel>),
/// `never`-flavor channel; built by `never`.
Never(flavors::never::Channel<T>),
}
// `Receiver<T>` is declared `Send` and `Sync` whenever `T: Send`.
// NOTE(review): the soundness of these two `unsafe impl`s rests on the flavor and
// `counter` implementations, which are not visible in this file; confirm there.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
// `Receiver<T>` is declared unwind-safe for every `T`, both by value and by reference.
impl<T> UnwindSafe for Receiver<T> {}
impl<T> RefUnwindSafe for Receiver<T> {}
impl<T> Receiver<T> {
    /// Forwards to the `try_recv` of whichever flavor backs this receiver.
    ///
    /// The `at` and `tick` flavors produce `Instant` values, which are reinterpreted
    /// as `T` (see the safety notes in the body).
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.try_recv(),
            ReceiverFlavor::List(inner) => inner.try_recv(),
            ReceiverFlavor::Zero(inner) => inner.try_recv(),
            ReceiverFlavor::At(inner) => {
                let outcome: Result<Instant, TryRecvError> = inner.try_recv();
                // SAFETY: the constructors in this file (`after`, `at`) only build
                // `At` receivers as `Receiver<Instant>`, so `T` is expected to be
                // `Instant` and source and destination types coincide.
                unsafe { mem::transmute_copy::<_, Result<T, TryRecvError>>(&outcome) }
            }
            ReceiverFlavor::Tick(inner) => {
                let outcome: Result<Instant, TryRecvError> = inner.try_recv();
                // SAFETY: `tick` is the only constructor in this file that builds a
                // `Tick` receiver, and it returns `Receiver<Instant>`.
                unsafe { mem::transmute_copy::<_, Result<T, TryRecvError>>(&outcome) }
            }
            ReceiverFlavor::Never(inner) => inner.try_recv(),
        }
    }

    /// Receives from the backing flavor with no deadline.
    ///
    /// # Errors
    ///
    /// Any error reported by the flavor is collapsed into [`RecvError`].
    pub fn recv(&self) -> Result<T, RecvError> {
        let outcome: Result<T, RecvTimeoutError> = match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.recv(None),
            ReceiverFlavor::List(inner) => inner.recv(None),
            ReceiverFlavor::Zero(inner) => inner.recv(None),
            ReceiverFlavor::At(inner) => {
                let raw: Result<Instant, RecvTimeoutError> = inner.recv(None);
                // SAFETY: `At` receivers are only built as `Receiver<Instant>` in
                // this file, so `T` is expected to be `Instant`.
                unsafe { mem::transmute_copy::<_, Result<T, RecvTimeoutError>>(&raw) }
            }
            ReceiverFlavor::Tick(inner) => {
                let raw: Result<Instant, RecvTimeoutError> = inner.recv(None);
                // SAFETY: `Tick` receivers are only built as `Receiver<Instant>` in
                // this file, so `T` is expected to be `Instant`.
                unsafe { mem::transmute_copy::<_, Result<T, RecvTimeoutError>>(&raw) }
            }
            ReceiverFlavor::Never(inner) => inner.recv(None),
        };
        outcome.map_err(|_| RecvError)
    }

    /// Receives, using `Instant::now() + timeout` as the deadline.
    ///
    /// If that deadline is not representable (`checked_add` yields `None`), this
    /// falls back to [`Receiver::recv`] and converts its error with
    /// `RecvTimeoutError::from`.
    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
        if let Some(deadline) = Instant::now().checked_add(timeout) {
            self.recv_deadline(deadline)
        } else {
            self.recv().map_err(RecvTimeoutError::from)
        }
    }

    /// Receives from the backing flavor, passing `deadline` along.
    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
        let limit = Some(deadline);
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.recv(limit),
            ReceiverFlavor::List(inner) => inner.recv(limit),
            ReceiverFlavor::Zero(inner) => inner.recv(limit),
            ReceiverFlavor::At(inner) => {
                let raw: Result<Instant, RecvTimeoutError> = inner.recv(limit);
                // SAFETY: `At` receivers are only built as `Receiver<Instant>` in
                // this file, so `T` is expected to be `Instant`.
                unsafe { mem::transmute_copy::<_, Result<T, RecvTimeoutError>>(&raw) }
            }
            ReceiverFlavor::Tick(inner) => {
                let raw: Result<Instant, RecvTimeoutError> = inner.recv(limit);
                // SAFETY: `Tick` receivers are only built as `Receiver<Instant>` in
                // this file, so `T` is expected to be `Instant`.
                unsafe { mem::transmute_copy::<_, Result<T, RecvTimeoutError>>(&raw) }
            }
            ReceiverFlavor::Never(inner) => inner.recv(limit),
        }
    }

    /// Returns what the backing flavor's `is_empty` reports.
    pub fn is_empty(&self) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.is_empty(),
            ReceiverFlavor::List(inner) => inner.is_empty(),
            ReceiverFlavor::Zero(inner) => inner.is_empty(),
            ReceiverFlavor::At(inner) => inner.is_empty(),
            ReceiverFlavor::Tick(inner) => inner.is_empty(),
            ReceiverFlavor::Never(inner) => inner.is_empty(),
        }
    }

    /// Returns what the backing flavor's `is_full` reports.
    pub fn is_full(&self) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.is_full(),
            ReceiverFlavor::List(inner) => inner.is_full(),
            ReceiverFlavor::Zero(inner) => inner.is_full(),
            ReceiverFlavor::At(inner) => inner.is_full(),
            ReceiverFlavor::Tick(inner) => inner.is_full(),
            ReceiverFlavor::Never(inner) => inner.is_full(),
        }
    }

    /// Returns what the backing flavor's `len` reports.
    pub fn len(&self) -> usize {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.len(),
            ReceiverFlavor::List(inner) => inner.len(),
            ReceiverFlavor::Zero(inner) => inner.len(),
            ReceiverFlavor::At(inner) => inner.len(),
            ReceiverFlavor::Tick(inner) => inner.len(),
            ReceiverFlavor::Never(inner) => inner.len(),
        }
    }

    /// Returns what the backing flavor's `capacity` reports.
    pub fn capacity(&self) -> Option<usize> {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.capacity(),
            ReceiverFlavor::List(inner) => inner.capacity(),
            ReceiverFlavor::Zero(inner) => inner.capacity(),
            ReceiverFlavor::At(inner) => inner.capacity(),
            ReceiverFlavor::Tick(inner) => inner.capacity(),
            ReceiverFlavor::Never(inner) => inner.capacity(),
        }
    }

    /// Returns a borrowing iterator whose `next` calls [`Receiver::recv`].
    pub fn iter(&self) -> Iter<'_, T> {
        let receiver = self;
        Iter { receiver }
    }

    /// Returns a borrowing iterator whose `next` calls [`Receiver::try_recv`].
    pub fn try_iter(&self) -> TryIter<'_, T> {
        let receiver = self;
        TryIter { receiver }
    }

    /// Reports whether both receivers refer to the same channel.
    ///
    /// Array, list and zero flavors compare their inner handles; `at` and `tick`
    /// compare `Arc` pointers; two `never` receivers always compare equal; differing
    /// flavors are never equal.
    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
        match (&self.flavor, &other.flavor) {
            (ReceiverFlavor::Array(lhs), ReceiverFlavor::Array(rhs)) => lhs == rhs,
            (ReceiverFlavor::List(lhs), ReceiverFlavor::List(rhs)) => lhs == rhs,
            (ReceiverFlavor::Zero(lhs), ReceiverFlavor::Zero(rhs)) => lhs == rhs,
            (ReceiverFlavor::At(lhs), ReceiverFlavor::At(rhs)) => Arc::ptr_eq(lhs, rhs),
            (ReceiverFlavor::Tick(lhs), ReceiverFlavor::Tick(rhs)) => Arc::ptr_eq(lhs, rhs),
            (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
            _ => false,
        }
    }
}
impl<T> Drop for Receiver<T> {
    /// Releases this receiver's handle on the backing channel.
    ///
    /// Array, list and zero flavors call `release` with a closure that disconnects
    /// the channel (`disconnect_receivers` for the list flavor). The `at`, `tick`
    /// and `never` flavors need no explicit action here.
    fn drop(&mut self) {
        // NOTE(review): `release` is an unsafe call whose contract lives in
        // `counter`; presumably it must run at most once per handle, which `drop`
        // provides. Confirm against `counter`.
        unsafe {
            match &self.flavor {
                ReceiverFlavor::Array(inner) => inner.release(|channel| channel.disconnect()),
                ReceiverFlavor::List(inner) => {
                    inner.release(|channel| channel.disconnect_receivers())
                }
                ReceiverFlavor::Zero(inner) => inner.release(|channel| channel.disconnect()),
                ReceiverFlavor::At(_) | ReceiverFlavor::Tick(_) | ReceiverFlavor::Never(_) => {}
            }
        }
    }
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let flavor = match &self.flavor {
ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
};
Receiver { flavor }
}
}
impl<T> fmt::Debug for Receiver<T> {
    /// Writes a fixed placeholder; no channel state is exposed.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        let placeholder = "Receiver { .. }";
        out.pad(placeholder)
    }
}
impl<'a, T> IntoIterator for &'a Receiver<T> {
type Item = T;
type IntoIter = Iter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<T> IntoIterator for Receiver<T> {
    type Item = T;
    type IntoIter = IntoIter<T>;

    /// Moves the receiver into an owning [`IntoIter`].
    fn into_iter(self) -> Self::IntoIter {
        let receiver = self;
        IntoIter { receiver }
    }
}
/// Borrowing iterator over a `Receiver`; returned by `Receiver::iter`.
pub struct Iter<'a, T> {
/// The receiver that `next` pulls messages from.
receiver: &'a Receiver<T>,
}
// Marks `Iter` as a fused iterator.
impl<T> FusedIterator for Iter<'_, T> {}
impl<T> Iterator for Iter<'_, T> {
    type Item = T;

    /// Calls `recv` on the borrowed receiver; any error ends the iteration with `None`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.receiver.recv() {
            Ok(msg) => Some(msg),
            Err(_) => None,
        }
    }
}
impl<T> fmt::Debug for Iter<'_, T> {
    /// Writes a fixed placeholder; no iterator state is exposed.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        let placeholder = "Iter { .. }";
        out.pad(placeholder)
    }
}
/// Borrowing iterator over a `Receiver`; returned by `Receiver::try_iter`.
pub struct TryIter<'a, T> {
/// The receiver that `next` pulls messages from.
receiver: &'a Receiver<T>,
}
impl<T> Iterator for TryIter<'_, T> {
    type Item = T;

    /// Calls `try_recv` on the borrowed receiver; any error yields `None`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.receiver.try_recv() {
            Ok(msg) => Some(msg),
            Err(_) => None,
        }
    }
}
impl<T> fmt::Debug for TryIter<'_, T> {
    /// Writes a fixed placeholder; no iterator state is exposed.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        let placeholder = "TryIter { .. }";
        out.pad(placeholder)
    }
}
/// Owning iterator over a `Receiver`; produced by `Receiver::into_iter`.
pub struct IntoIter<T> {
/// The receiver that `next` pulls messages from.
receiver: Receiver<T>,
}
// Marks `IntoIter` as a fused iterator.
impl<T> FusedIterator for IntoIter<T> {}
impl<T> Iterator for IntoIter<T> {
    type Item = T;

    /// Calls `recv` on the owned receiver; any error ends the iteration with `None`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.receiver.recv() {
            Ok(msg) => Some(msg),
            Err(_) => None,
        }
    }
}
impl<T> fmt::Debug for IntoIter<T> {
    /// Writes a fixed placeholder; no iterator state is exposed.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        let placeholder = "IntoIter { .. }";
        out.pad(placeholder)
    }
}
impl<T> SelectHandle for Sender<T> {
fn try_select(&self, token: &mut Token) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().try_select(token),
SenderFlavor::List(chan) => chan.sender().try_select(token),
SenderFlavor::Zero(chan) => chan.sender().try_select(token),
}
}
fn deadline(&self) -> Option<Instant> {
None
}
fn register(&self, oper: Operation, cx: &Context) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
SenderFlavor::List(chan) => chan.sender().register(oper, cx),
SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
}
}
fn unregister(&self, oper: Operation) {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().unregister(oper),
SenderFlavor::List(chan) => chan.sender().unregister(oper),
SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
}
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
SenderFlavor::List(chan) => chan.sender().accept(token, cx),
SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
}
}
fn is_ready(&self) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().is_ready(),
SenderFlavor::List(chan) => chan.sender().is_ready(),
SenderFlavor::Zero(chan) => chan.sender().is_ready(),
}
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
}
}
fn unwatch(&self, oper: Operation) {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
SenderFlavor::List(chan) => chan.sender().unwatch(oper),
SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
}
}
}
/// Selection support for receivers.
///
/// Array, list and zero flavors forward to their `receiver()` handle; the `at`,
/// `tick` and `never` flavors are called directly on the channel.
impl<T> SelectHandle for Receiver<T> {
    fn try_select(&self, token: &mut Token) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.receiver().try_select(token),
            ReceiverFlavor::List(inner) => inner.receiver().try_select(token),
            ReceiverFlavor::Zero(inner) => inner.receiver().try_select(token),
            ReceiverFlavor::At(inner) => inner.try_select(token),
            ReceiverFlavor::Tick(inner) => inner.try_select(token),
            ReceiverFlavor::Never(inner) => inner.try_select(token),
        }
    }

    /// Only the `at`, `tick` and `never` flavors are asked for a deadline; the
    /// array, list and zero flavors always report `None`.
    fn deadline(&self) -> Option<Instant> {
        match &self.flavor {
            ReceiverFlavor::Array(_) | ReceiverFlavor::List(_) | ReceiverFlavor::Zero(_) => None,
            ReceiverFlavor::At(inner) => inner.deadline(),
            ReceiverFlavor::Tick(inner) => inner.deadline(),
            ReceiverFlavor::Never(inner) => inner.deadline(),
        }
    }

    fn register(&self, operation: Operation, context: &Context) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.receiver().register(operation, context),
            ReceiverFlavor::List(inner) => inner.receiver().register(operation, context),
            ReceiverFlavor::Zero(inner) => inner.receiver().register(operation, context),
            ReceiverFlavor::At(inner) => inner.register(operation, context),
            ReceiverFlavor::Tick(inner) => inner.register(operation, context),
            ReceiverFlavor::Never(inner) => inner.register(operation, context),
        }
    }

    fn unregister(&self, operation: Operation) {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.receiver().unregister(operation),
            ReceiverFlavor::List(inner) => inner.receiver().unregister(operation),
            ReceiverFlavor::Zero(inner) => inner.receiver().unregister(operation),
            ReceiverFlavor::At(inner) => inner.unregister(operation),
            ReceiverFlavor::Tick(inner) => inner.unregister(operation),
            ReceiverFlavor::Never(inner) => inner.unregister(operation),
        }
    }

    fn accept(&self, token: &mut Token, context: &Context) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.receiver().accept(token, context),
            ReceiverFlavor::List(inner) => inner.receiver().accept(token, context),
            ReceiverFlavor::Zero(inner) => inner.receiver().accept(token, context),
            ReceiverFlavor::At(inner) => inner.accept(token, context),
            ReceiverFlavor::Tick(inner) => inner.accept(token, context),
            ReceiverFlavor::Never(inner) => inner.accept(token, context),
        }
    }

    fn is_ready(&self) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.receiver().is_ready(),
            ReceiverFlavor::List(inner) => inner.receiver().is_ready(),
            ReceiverFlavor::Zero(inner) => inner.receiver().is_ready(),
            ReceiverFlavor::At(inner) => inner.is_ready(),
            ReceiverFlavor::Tick(inner) => inner.is_ready(),
            ReceiverFlavor::Never(inner) => inner.is_ready(),
        }
    }

    fn watch(&self, operation: Operation, context: &Context) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.receiver().watch(operation, context),
            ReceiverFlavor::List(inner) => inner.receiver().watch(operation, context),
            ReceiverFlavor::Zero(inner) => inner.receiver().watch(operation, context),
            ReceiverFlavor::At(inner) => inner.watch(operation, context),
            ReceiverFlavor::Tick(inner) => inner.watch(operation, context),
            ReceiverFlavor::Never(inner) => inner.watch(operation, context),
        }
    }

    fn unwatch(&self, operation: Operation) {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.receiver().unwatch(operation),
            ReceiverFlavor::List(inner) => inner.receiver().unwatch(operation),
            ReceiverFlavor::Zero(inner) => inner.receiver().unwatch(operation),
            ReceiverFlavor::At(inner) => inner.unwatch(operation),
            ReceiverFlavor::Tick(inner) => inner.unwatch(operation),
            ReceiverFlavor::Never(inner) => inner.unwatch(operation),
        }
    }
}
/// Writes `msg` into the channel behind `s` by forwarding to the flavor's `write`.
///
/// Returns whatever the flavor returns: `Ok(())`, or `Err` carrying a `T` back.
///
/// # Safety
///
/// This only forwards to the flavor-level `write`, so the caller must uphold that
/// function's contract. NOTE(review): presumably `token` must come from a
/// selection on this same sender; confirm against the flavor implementations.
pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
    let flavor = &s.flavor;
    match flavor {
        SenderFlavor::Array(inner) => inner.write(token, msg),
        SenderFlavor::List(inner) => inner.write(token, msg),
        SenderFlavor::Zero(inner) => inner.write(token, msg),
    }
}
/// Reads a message from the channel behind `r` by forwarding to the flavor's `read`.
///
/// The `at` and `tick` flavors produce `Result<Instant, ()>`, which is
/// reinterpreted as `Result<T, ()>`.
///
/// # Safety
///
/// This forwards to the flavor-level `read`, so the caller must uphold that
/// function's contract. NOTE(review): presumably `token` must come from a
/// selection on this same receiver; confirm against the flavor implementations.
/// The reinterpretation for `at`/`tick` relies on `T` being `Instant`, which holds
/// for the constructors in this file (`after`, `at`, `tick`).
pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
    let flavor = &r.flavor;
    match flavor {
        ReceiverFlavor::Array(inner) => inner.read(token),
        ReceiverFlavor::List(inner) => inner.read(token),
        ReceiverFlavor::Zero(inner) => inner.read(token),
        ReceiverFlavor::At(inner) => {
            let outcome: Result<Instant, ()> = inner.read(token);
            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&outcome)
        }
        ReceiverFlavor::Tick(inner) => {
            let outcome: Result<Instant, ()> = inner.read(token);
            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&outcome)
        }
        ReceiverFlavor::Never(inner) => inner.read(token),
    }
}