use std::fmt;
use std::iter::FusedIterator;
use std::mem;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::Arc;
use std::time::{Duration, Instant};
use context::Context;
use counter;
use err::{RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError};
use flavors;
use select::{Operation, SelectHandle, Token};
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
let (s, r) = counter::new(flavors::list::Channel::new());
let s = Sender {
flavor: SenderFlavor::List(s),
};
let r = Receiver {
flavor: ReceiverFlavor::List(r),
};
(s, r)
}
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
if cap == 0 {
let (s, r) = counter::new(flavors::zero::Channel::new());
let s = Sender {
flavor: SenderFlavor::Zero(s),
};
let r = Receiver {
flavor: ReceiverFlavor::Zero(r),
};
(s, r)
} else {
let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
let s = Sender {
flavor: SenderFlavor::Array(s),
};
let r = Receiver {
flavor: ReceiverFlavor::Array(r),
};
(s, r)
}
}
pub fn after(duration: Duration) -> Receiver<Instant> {
Receiver {
flavor: ReceiverFlavor::After(Arc::new(flavors::after::Channel::new(duration))),
}
}
pub fn never<T>() -> Receiver<T> {
Receiver {
flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
}
}
pub fn tick(duration: Duration) -> Receiver<Instant> {
Receiver {
flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
}
}
pub struct Sender<T> {
flavor: SenderFlavor<T>,
}
enum SenderFlavor<T> {
Array(counter::Sender<flavors::array::Channel<T>>),
List(counter::Sender<flavors::list::Channel<T>>),
Zero(counter::Sender<flavors::zero::Channel<T>>),
}
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
impl<T> UnwindSafe for Sender<T> {}
impl<T> RefUnwindSafe for Sender<T> {}
impl<T> Sender<T> {
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.try_send(msg),
SenderFlavor::List(chan) => chan.try_send(msg),
SenderFlavor::Zero(chan) => chan.try_send(msg),
}
}
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.send(msg, None),
SenderFlavor::List(chan) => chan.send(msg, None),
SenderFlavor::Zero(chan) => chan.send(msg, None),
}
.map_err(|err| match err {
SendTimeoutError::Disconnected(msg) => SendError(msg),
SendTimeoutError::Timeout(_) => unreachable!(),
})
}
pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
let deadline = Instant::now() + timeout;
match &self.flavor {
SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
}
}
pub fn is_empty(&self) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.is_empty(),
SenderFlavor::List(chan) => chan.is_empty(),
SenderFlavor::Zero(chan) => chan.is_empty(),
}
}
pub fn is_full(&self) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.is_full(),
SenderFlavor::List(chan) => chan.is_full(),
SenderFlavor::Zero(chan) => chan.is_full(),
}
}
pub fn len(&self) -> usize {
match &self.flavor {
SenderFlavor::Array(chan) => chan.len(),
SenderFlavor::List(chan) => chan.len(),
SenderFlavor::Zero(chan) => chan.len(),
}
}
pub fn capacity(&self) -> Option<usize> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.capacity(),
SenderFlavor::List(chan) => chan.capacity(),
SenderFlavor::Zero(chan) => chan.capacity(),
}
}
pub fn same_channel(&self, other: &Sender<T>) -> bool {
match (&self.flavor, &other.flavor) {
(SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
(SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
(SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
_ => false,
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
SenderFlavor::List(chan) => chan.release(|c| c.disconnect()),
SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
}
}
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
let flavor = match &self.flavor {
SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
};
Sender { flavor }
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Sender { .. }")
}
}
pub struct Receiver<T> {
flavor: ReceiverFlavor<T>,
}
enum ReceiverFlavor<T> {
Array(counter::Receiver<flavors::array::Channel<T>>),
List(counter::Receiver<flavors::list::Channel<T>>),
Zero(counter::Receiver<flavors::zero::Channel<T>>),
After(Arc<flavors::after::Channel>),
Tick(Arc<flavors::tick::Channel>),
Never(flavors::never::Channel<T>),
}
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
impl<T> UnwindSafe for Receiver<T> {}
impl<T> RefUnwindSafe for Receiver<T> {}
impl<T> Receiver<T> {
pub fn try_recv(&self) -> Result<T, TryRecvError> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.try_recv(),
ReceiverFlavor::List(chan) => chan.try_recv(),
ReceiverFlavor::Zero(chan) => chan.try_recv(),
ReceiverFlavor::After(chan) => {
let msg = chan.try_recv();
unsafe {
mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
&msg,
)
}
}
ReceiverFlavor::Tick(chan) => {
let msg = chan.try_recv();
unsafe {
mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
&msg,
)
}
}
ReceiverFlavor::Never(chan) => chan.try_recv(),
}
}
pub fn recv(&self) -> Result<T, RecvError> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.recv(None),
ReceiverFlavor::List(chan) => chan.recv(None),
ReceiverFlavor::Zero(chan) => chan.recv(None),
ReceiverFlavor::After(chan) => {
let msg = chan.recv(None);
unsafe {
mem::transmute_copy::<
Result<Instant, RecvTimeoutError>,
Result<T, RecvTimeoutError>,
>(&msg)
}
}
ReceiverFlavor::Tick(chan) => {
let msg = chan.recv(None);
unsafe {
mem::transmute_copy::<
Result<Instant, RecvTimeoutError>,
Result<T, RecvTimeoutError>,
>(&msg)
}
}
ReceiverFlavor::Never(chan) => chan.recv(None),
}
.map_err(|_| RecvError)
}
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
let deadline = Instant::now() + timeout;
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
ReceiverFlavor::After(chan) => {
let msg = chan.recv(Some(deadline));
unsafe {
mem::transmute_copy::<
Result<Instant, RecvTimeoutError>,
Result<T, RecvTimeoutError>,
>(&msg)
}
}
ReceiverFlavor::Tick(chan) => {
let msg = chan.recv(Some(deadline));
unsafe {
mem::transmute_copy::<
Result<Instant, RecvTimeoutError>,
Result<T, RecvTimeoutError>,
>(&msg)
}
}
ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
}
}
pub fn is_empty(&self) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.is_empty(),
ReceiverFlavor::List(chan) => chan.is_empty(),
ReceiverFlavor::Zero(chan) => chan.is_empty(),
ReceiverFlavor::After(chan) => chan.is_empty(),
ReceiverFlavor::Tick(chan) => chan.is_empty(),
ReceiverFlavor::Never(chan) => chan.is_empty(),
}
}
pub fn is_full(&self) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.is_full(),
ReceiverFlavor::List(chan) => chan.is_full(),
ReceiverFlavor::Zero(chan) => chan.is_full(),
ReceiverFlavor::After(chan) => chan.is_full(),
ReceiverFlavor::Tick(chan) => chan.is_full(),
ReceiverFlavor::Never(chan) => chan.is_full(),
}
}
pub fn len(&self) -> usize {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.len(),
ReceiverFlavor::List(chan) => chan.len(),
ReceiverFlavor::Zero(chan) => chan.len(),
ReceiverFlavor::After(chan) => chan.len(),
ReceiverFlavor::Tick(chan) => chan.len(),
ReceiverFlavor::Never(chan) => chan.len(),
}
}
pub fn capacity(&self) -> Option<usize> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.capacity(),
ReceiverFlavor::List(chan) => chan.capacity(),
ReceiverFlavor::Zero(chan) => chan.capacity(),
ReceiverFlavor::After(chan) => chan.capacity(),
ReceiverFlavor::Tick(chan) => chan.capacity(),
ReceiverFlavor::Never(chan) => chan.capacity(),
}
}
pub fn iter(&self) -> Iter<T> {
Iter { receiver: self }
}
pub fn try_iter(&self) -> TryIter<T> {
TryIter { receiver: self }
}
pub fn same_channel(&self, other: &Receiver<T>) -> bool {
match (&self.flavor, &other.flavor) {
(ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
(ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
(ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
(ReceiverFlavor::After(a), ReceiverFlavor::After(b)) => Arc::ptr_eq(a, b),
(ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
(ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
_ => false,
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::After(_) => {}
ReceiverFlavor::Tick(_) => {}
ReceiverFlavor::Never(_) => {}
}
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let flavor = match &self.flavor {
ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
ReceiverFlavor::After(chan) => ReceiverFlavor::After(chan.clone()),
ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
};
Receiver { flavor }
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Receiver { .. }")
}
}
impl<'a, T> IntoIterator for &'a Receiver<T> {
type Item = T;
type IntoIter = Iter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<T> IntoIterator for Receiver<T> {
type Item = T;
type IntoIter = IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
IntoIter { receiver: self }
}
}
pub struct Iter<'a, T: 'a> {
receiver: &'a Receiver<T>,
}
impl<'a, T> FusedIterator for Iter<'a, T> {}
impl<'a, T> Iterator for Iter<'a, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
}
}
impl<'a, T> fmt::Debug for Iter<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Iter { .. }")
}
}
pub struct TryIter<'a, T: 'a> {
receiver: &'a Receiver<T>,
}
impl<'a, T> Iterator for TryIter<'a, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.try_recv().ok()
}
}
impl<'a, T> fmt::Debug for TryIter<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("TryIter { .. }")
}
}
pub struct IntoIter<T> {
receiver: Receiver<T>,
}
impl<T> FusedIterator for IntoIter<T> {}
impl<T> Iterator for IntoIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
}
}
impl<T> fmt::Debug for IntoIter<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("IntoIter { .. }")
}
}
impl<T> SelectHandle for Sender<T> {
fn try_select(&self, token: &mut Token) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().try_select(token),
SenderFlavor::List(chan) => chan.sender().try_select(token),
SenderFlavor::Zero(chan) => chan.sender().try_select(token),
}
}
fn deadline(&self) -> Option<Instant> {
None
}
fn register(&self, oper: Operation, cx: &Context) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
SenderFlavor::List(chan) => chan.sender().register(oper, cx),
SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
}
}
fn unregister(&self, oper: Operation) {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().unregister(oper),
SenderFlavor::List(chan) => chan.sender().unregister(oper),
SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
}
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
SenderFlavor::List(chan) => chan.sender().accept(token, cx),
SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
}
}
fn is_ready(&self) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().is_ready(),
SenderFlavor::List(chan) => chan.sender().is_ready(),
SenderFlavor::Zero(chan) => chan.sender().is_ready(),
}
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
}
}
fn unwatch(&self, oper: Operation) {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
SenderFlavor::List(chan) => chan.sender().unwatch(oper),
SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
}
}
}
impl<T> SelectHandle for Receiver<T> {
fn try_select(&self, token: &mut Token) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
ReceiverFlavor::After(chan) => chan.try_select(token),
ReceiverFlavor::Tick(chan) => chan.try_select(token),
ReceiverFlavor::Never(chan) => chan.try_select(token),
}
}
fn deadline(&self) -> Option<Instant> {
match &self.flavor {
ReceiverFlavor::Array(_) => None,
ReceiverFlavor::List(_) => None,
ReceiverFlavor::Zero(_) => None,
ReceiverFlavor::After(chan) => chan.deadline(),
ReceiverFlavor::Tick(chan) => chan.deadline(),
ReceiverFlavor::Never(chan) => chan.deadline(),
}
}
fn register(&self, oper: Operation, cx: &Context) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
ReceiverFlavor::After(chan) => chan.register(oper, cx),
ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
ReceiverFlavor::Never(chan) => chan.register(oper, cx),
}
}
fn unregister(&self, oper: Operation) {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
ReceiverFlavor::After(chan) => chan.unregister(oper),
ReceiverFlavor::Tick(chan) => chan.unregister(oper),
ReceiverFlavor::Never(chan) => chan.unregister(oper),
}
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
ReceiverFlavor::After(chan) => chan.accept(token, cx),
ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
ReceiverFlavor::Never(chan) => chan.accept(token, cx),
}
}
fn is_ready(&self) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
ReceiverFlavor::After(chan) => chan.is_ready(),
ReceiverFlavor::Tick(chan) => chan.is_ready(),
ReceiverFlavor::Never(chan) => chan.is_ready(),
}
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
ReceiverFlavor::After(chan) => chan.watch(oper, cx),
ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
}
}
fn unwatch(&self, oper: Operation) {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
ReceiverFlavor::After(chan) => chan.unwatch(oper),
ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
ReceiverFlavor::Never(chan) => chan.unwatch(oper),
}
}
}
pub unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
match &s.flavor {
SenderFlavor::Array(chan) => chan.write(token, msg),
SenderFlavor::List(chan) => chan.write(token, msg),
SenderFlavor::Zero(chan) => chan.write(token, msg),
}
}
pub unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
match &r.flavor {
ReceiverFlavor::Array(chan) => chan.read(token),
ReceiverFlavor::List(chan) => chan.read(token),
ReceiverFlavor::Zero(chan) => chan.read(token),
ReceiverFlavor::After(chan) => {
mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
}
ReceiverFlavor::Tick(chan) => {
mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
}
ReceiverFlavor::Never(chan) => chan.read(token),
}
}