use std::fmt;
use std::error::Error;
use std::any::Any;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex};
use std::thread;
use std::usize;
use sync::mpsc::queue::{Queue, PopResult};
use task::{self, Task};
use {Async, AsyncSink, Poll, StartSend, Sink, Stream};
mod queue;
/// The sending half of a channel, as returned by `channel` (and wrapped by
/// `UnboundedSender` for `unbounded`).
pub struct Sender<T> {
// State shared with the `Receiver` and with every clone of this sender.
inner: Arc<Inner<T>>,
// Slot holding this handle's parked task. `park` stores into it and pushes a
// clone of the `Arc` onto `inner.parked_queue`; the receiver empties the slot
// when it unparks the sender.
sender_task: SenderTask,
// Set by `park` (to whether the channel was still open at that point) and
// cleared by `poll_unparked` once the task slot has been emptied.
maybe_parked: bool,
}
/// The sending half of a channel created by `unbounded`.
///
/// A thin wrapper around `Sender` whose shared state has no buffer limit
/// (`Inner::buffer` is `None`).
pub struct UnboundedSender<T>(Sender<T>);
/// Compile-time check that `UnboundedSender<u32>` is `Send`, `Sync` and
/// `Clone`. Never called at runtime; it only has to type-check.
fn _assert_kinds() {
    fn require_send<S: Send>() {}
    fn require_sync<S: Sync>() {}
    fn require_clone<C: Clone>() {}

    require_clone::<UnboundedSender<u32>>();
    require_sync::<UnboundedSender<u32>>();
    require_send::<UnboundedSender<u32>>();
}
/// The receiving half of a channel, as returned by `channel` (and wrapped by
/// `UnboundedReceiver` for `unbounded`).
pub struct Receiver<T> {
// State shared with every `Sender` handle of this channel.
inner: Arc<Inner<T>>,
}
/// The receiving half of a channel created by `unbounded`.
///
/// A thin wrapper that forwards `close` and `poll` to the inner `Receiver`.
pub struct UnboundedReceiver<T>(Receiver<T>);
/// Error returned when a message could not be sent because the channel is no
/// longer open. It carries the unsent message, which can be recovered with
/// `into_inner`.
pub struct SendError<T>(T);
impl<T> fmt::Debug for SendError<T> {
    /// Formats as `SendError("...")`. The payload is deliberately not printed,
    /// so `T` does not need to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut tuple = f.debug_tuple("SendError");
        tuple.field(&"...");
        tuple.finish()
    }
}
impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed, payload-independent description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("send failed because receiver is gone")
    }
}
// `T: Any` restricts this impl to payload types that satisfy `Any`.
impl<T: Any> Error for SendError<T>
{
/// Returns the same fixed text that the `Display` impl writes.
fn description(&self) -> &str {
"send failed because receiver is gone"
}
}
impl<T> SendError<T> {
pub fn into_inner(self) -> T {
self.0
}
}
/// State shared (through an `Arc`) by all senders and the receiver of one
/// channel.
struct Inner<T> {
// `Some(n)` for channels created by `channel(n)`, `None` for `unbounded()`.
// Senders must park once the message count exceeds this value.
buffer: Option<usize>,
// Packed open flag + message count; see `encode_state` / `decode_state`.
// Starts at `INIT_STATE`.
state: AtomicUsize,
// Queue of pending messages. A `None` entry is pushed when the last sender
// is dropped (`Drop for Sender` calls `do_send(None, false)`).
message_queue: Queue<Option<T>>,
// Task slots of senders that parked; pushed by `Sender::park`, popped by
// `Receiver::unpark_one` and `Receiver::close`.
parked_queue: Queue<SenderTask>,
// Number of live `Sender` handles: starts at 1, incremented by `clone`,
// decremented by `drop`.
num_senders: AtomicUsize,
// Receiver task bookkeeping, written by `Sender::signal` and
// `Receiver::try_park`.
recv_task: Mutex<ReceiverTask>,
}
/// Decoded form of the packed `Inner::state` word (see `decode_state` and
/// `encode_state`).
#[derive(Debug, Clone, Copy)]
struct State {
// True while the `OPEN_MASK` bit is set in the packed word.
is_open: bool,
// Message count stored in the remaining bits of the packed word.
num_messages: usize,
}
/// Receiver-side bookkeeping, kept behind `Inner::recv_task`.
struct ReceiverTask {
// Set to true by `Sender::signal`; reset to false by `Receiver::try_park`
// when it observes the flag.
unparked: bool,
// Task stored by `Receiver::try_park` and taken (then unparked) by
// `Sender::signal`.
task: Option<Task>,
}
/// Outcome of `Receiver::try_park`.
enum TryPark {
/// The receiver's task was stored; `poll` returns `NotReady`.
Parked,
/// The state is closed and the message count is zero; `poll` ends the stream.
Closed,
/// A sender signalled since the last park; `poll` retries the queue.
NotEmpty,
}
/// Bit of the packed `state` word that records whether the channel is open.
///
/// This is the most significant bit of `usize`, so every bit below it is
/// available to the message count and the count can never carry into the flag.
/// A hard-coded `1 << 31` is only the top bit on 32-bit targets: on 64-bit
/// targets the count could grow into bit 31, and on 16-bit targets the shift
/// overflows.
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
/// Initial packed state: open, zero messages.
const INIT_STATE: usize = OPEN_MASK;
/// Mask of the message-count bits; also the largest representable count.
const MAX_CAPACITY: usize = !(OPEN_MASK);
/// Exclusive upper bound on the buffer size accepted by `channel`.
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
/// Shared slot holding a parked sender's task. One `Arc` clone is held by the
/// `Sender`, another is pushed onto `Inner::parked_queue` when the sender
/// parks. The slot is `None` when no task is recorded.
type SenderTask = Arc<Mutex<Option<Task>>>;
/// Creates a bounded channel whose senders start parking once more than
/// `buffer` messages are outstanding.
///
/// # Panics
///
/// Panics unless `buffer < MAX_BUFFER`.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
    if buffer >= MAX_BUFFER {
        panic!("requested buffer size too large");
    }
    channel2(Some(buffer))
}
/// Creates a channel with no buffer limit and wraps both halves in their
/// `Unbounded*` newtypes.
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
    let halves = channel2(None);
    (UnboundedSender(halves.0), UnboundedReceiver(halves.1))
}
/// Shared constructor behind `channel` and `unbounded`.
///
/// Builds the shared `Inner` (open, no messages, one sender, receiver not
/// parked) and returns the first `Sender` together with the `Receiver`.
/// `buffer` is `Some(limit)` for bounded channels and `None` for unbounded.
fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
    let recv_task = Mutex::new(ReceiverTask {
        unparked: false,
        task: None,
    });
    let shared = Arc::new(Inner {
        buffer: buffer,
        state: AtomicUsize::new(INIT_STATE),
        message_queue: Queue::new(),
        parked_queue: Queue::new(),
        // The sender returned below is the one and only handle so far.
        num_senders: AtomicUsize::new(1),
        recv_task: recv_task,
    });

    let rx = Receiver {
        inner: shared.clone(),
    };
    let tx = Sender {
        inner: shared,
        sender_task: Arc::new(Mutex::new(None)),
        maybe_parked: false,
    };
    (tx, rx)
}
impl<T> Sender<T> {
/// Reserves a slot in the shared state and enqueues `msg`.
///
/// `msg == None` doubles as the close request: it is passed to
/// `inc_num_messages` as `close = true`. `can_park` is forwarded to `park`
/// when the buffer limit has been exceeded.
///
/// Returns `Err(SendError(msg))` when the state is already closed and a real
/// message was supplied; a `None` sent to a closed channel yields `Ok(())`.
fn do_send(&mut self, msg: Option<T>, can_park: bool) -> Result<(), SendError<T>> {
let park_self = match self.inc_num_messages(msg.is_none()) {
Some(park_self) => park_self,
None => {
// Channel already closed: hand a real message back, ignore a close request.
if let Some(msg) = msg {
return Err(SendError(msg));
} else {
return Ok(());
}
}
};
// The slot is already reserved, so the message is still pushed below even
// when this sender has to park.
if park_self {
self.park(can_park);
}
self.queue_push_and_signal(msg);
Ok(())
}
/// Non-parking send used by `UnboundedSender`.
///
/// Returns `Err(SendError(msg))` if the channel is closed. Asserts that
/// `inc_num_messages` did not request parking, which holds when
/// `inner.buffer` is `None`.
fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> {
match self.inc_num_messages(false) {
Some(park_self) => assert!(!park_self),
None => return Err(SendError(msg)),
};
self.queue_push_and_signal(Some(msg));
Ok(())
}
/// Pushes `msg` onto the message queue, then wakes the receiver via `signal`.
fn queue_push_and_signal(&self, msg: Option<T>) {
self.inner.message_queue.push(msg);
self.signal();
}
/// Atomically increments the message count, clearing the open flag in the
/// same update when `close` is true.
///
/// Returns `None` if the channel was already closed, otherwise
/// `Some(park_self)` where `park_self` is true when the new count exceeds the
/// configured buffer (always false when there is no buffer limit).
///
/// Panics if the count has already reached `MAX_CAPACITY`.
fn inc_num_messages(&self, close: bool) -> Option<bool> {
let mut curr = self.inner.state.load(SeqCst);
// Compare-exchange retry loop: recompute from the freshly observed value
// whenever another thread changed the state first.
loop {
let mut state = decode_state(curr);
if !state.is_open {
return None;
}
assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
sending this messages would overflow the state");
state.num_messages += 1;
if close {
state.is_open = false;
}
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
Ok(_) => {
let park_self = match self.inner.buffer {
Some(buffer) => state.num_messages > buffer,
None => false,
};
return Some(park_self)
}
Err(actual) => curr = actual,
}
}
}
/// Marks the receiver as unparked and wakes its stored task, if any.
///
/// Does nothing when the `unparked` flag is already set.
fn signal(&self) {
// Take the task while holding the lock, but call `unpark` only after the
// guard has been dropped at the end of this block.
let task = {
let mut recv_task = self.inner.recv_task.lock().unwrap();
if recv_task.unparked {
return;
}
recv_task.unparked = true;
recv_task.task.take()
};
if let Some(task) = task {
task.unpark();
}
}
/// Records this sender as parked and enqueues its task slot for the receiver.
///
/// When `can_park` is false (as in `Drop for Sender`) no task is captured and
/// the slot is set to `None`.
fn park(&mut self, can_park: bool) {
// NOTE(review): `task::park()` presumably returns a handle to the current
// task -- confirm against the `task` module.
let task = if can_park {
Some(task::park())
} else {
None
};
*self.sender_task.lock().unwrap() = task;
let t = self.sender_task.clone();
self.inner.parked_queue.push(t);
// Re-read the state after enqueueing: if the channel is no longer open the
// sender is not considered parked.
let state = decode_state(self.inner.state.load(SeqCst));
self.maybe_parked = state.is_open;
}
/// Checks whether this sender may proceed with a send.
///
/// Returns `Ready` when the sender is not marked as parked, or when its task
/// slot has been emptied (the receiver takes the task when unparking).
/// Otherwise stores a fresh `task::park()` in the slot and returns `NotReady`.
fn poll_unparked(&mut self) -> Async<()> {
if self.maybe_parked {
let mut task = self.sender_task.lock().unwrap();
if task.is_none() {
self.maybe_parked = false;
return Async::Ready(())
}
// Still parked: replace the stored task with one from the current call.
*task = Some(task::park());
Async::NotReady
} else {
Async::Ready(())
}
}
}
impl<T> Sink for Sender<T> {
    type SinkItem = T;
    type SinkError = SendError<T>;

    /// Attempts to send `msg`.
    ///
    /// Hands the message back as `AsyncSink::NotReady(msg)` while this sender
    /// is still parked; otherwise sends it (allowing the sender to park) and
    /// reports `AsyncSink::Ready`, or the `SendError` from `do_send`.
    fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
        match self.poll_unparked() {
            Async::NotReady => Ok(AsyncSink::NotReady(msg)),
            Async::Ready(()) => self.do_send(Some(msg), true).map(|()| AsyncSink::Ready),
        }
    }

    /// Always ready: there is nothing to flush.
    fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
        Ok(Async::Ready(()))
    }
}
impl<T> UnboundedSender<T> {
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
self.0.do_send_nb(msg)
}
}
impl<T> Sink for UnboundedSender<T> {
type SinkItem = T;
type SinkError = SendError<T>;
fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
self.0.start_send(msg)
}
fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
self.0.poll_complete()
}
}
impl<'a, T> Sink for &'a UnboundedSender<T> {
    type SinkItem = T;
    type SinkError = SendError<T>;

    /// Sends through a shared reference using the non-parking path, so the
    /// result is either `AsyncSink::Ready` or a `SendError`.
    fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
        match self.0.do_send_nb(msg) {
            Ok(()) => Ok(AsyncSink::Ready),
            Err(err) => Err(err),
        }
    }

    /// Always ready: there is nothing to flush.
    fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
        Ok(Async::Ready(()))
    }
}
impl<T> Clone for UnboundedSender<T> {
fn clone(&self) -> UnboundedSender<T> {
UnboundedSender(self.0.clone())
}
}
impl<T> Clone for Sender<T> {
    /// Creates another handle to the same channel.
    ///
    /// The shared sender count is incremented with a compare-exchange loop so
    /// that it can never exceed `Inner::max_senders()`. The new handle gets its
    /// own, empty task slot and starts out unparked.
    ///
    /// # Panics
    ///
    /// Panics if the number of live senders has already reached the maximum.
    fn clone(&self) -> Sender<T> {
        // The limit depends only on the immutable `buffer`, so compute it once.
        let max_senders = self.inner.max_senders();
        let mut curr = self.inner.num_senders.load(SeqCst);
        loop {
            if curr == max_senders {
                panic!("cannot clone `Sender` -- too many outstanding senders");
            }
            debug_assert!(curr < max_senders);
            let next = curr + 1;
            // `compare_exchange` replaces the deprecated `compare_and_swap` and
            // matches the other state updates in this file.
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    return Sender {
                        inner: self.inner.clone(),
                        sender_task: Arc::new(Mutex::new(None)),
                        maybe_parked: false,
                    };
                }
                // Lost the race: retry from the value another thread stored.
                Err(actual) => curr = actual,
            }
        }
    }
}
impl<T> Drop for Sender<T> {
    /// Decrements the shared sender count. The last sender to go away also
    /// sends the `None` close message (without parking) so the receiver can
    /// observe the end of the stream.
    fn drop(&mut self) {
        let senders_before = self.inner.num_senders.fetch_sub(1, SeqCst);
        if senders_before != 1 {
            return;
        }
        // The result is deliberately ignored.
        let _ = self.do_send(None, false);
    }
}
impl<T> Receiver<T> {
/// Closes the channel from the receiving side.
///
/// Clears the open flag (leaving the message count untouched), then drains
/// the parked-sender queue and unparks every task found there. Messages that
/// are already queued are not removed by this method.
pub fn close(&mut self) {
let mut curr = self.inner.state.load(SeqCst);
// Compare-exchange loop that only flips the open flag; stops early if
// someone else already closed the channel.
loop {
let mut state = decode_state(curr);
if !state.is_open {
break
}
state.is_open = false;
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
Ok(_) => break,
Err(actual) => curr = actual,
}
}
// Wake every parked sender.
// NOTE(review): `Queue::pop` is unsafe; presumably it requires a single
// consumer, which this receiver is meant to be -- confirm against the
// `queue` module.
loop {
match unsafe { self.inner.parked_queue.pop() } {
PopResult::Data(task) => {
let task = task.lock().unwrap().take();
if let Some(task) = task {
task.unpark();
}
}
PopResult::Empty => break,
PopResult::Inconsistent => thread::yield_now(),
}
}
}
/// Pops the next entry from the message queue.
///
/// Returns `Ready(entry)` (the entry itself may be `None`, the close marker
/// pushed by the last sender) or `NotReady` when the queue is empty. On an
/// `Inconsistent` result the thread yields and the pop is retried.
fn next_message(&mut self) -> Async<Option<T>> {
loop {
match unsafe { self.inner.message_queue.pop() } {
PopResult::Data(msg) => {
return Async::Ready(msg);
}
PopResult::Empty => {
return Async::NotReady;
}
PopResult::Inconsistent => {
thread::yield_now();
}
}
}
}
/// Pops at most one parked sender and unparks its task, if it still has one.
///
/// Taking the task out of the slot is what `Sender::poll_unparked` later
/// observes as "no longer parked".
fn unpark_one(&mut self) {
loop {
match unsafe { self.inner.parked_queue.pop() } {
PopResult::Data(task) => {
let task = task.lock().unwrap().take();
if let Some(task) = task {
task.unpark();
}
return;
}
PopResult::Empty => {
return;
}
PopResult::Inconsistent => {
thread::yield_now();
}
}
}
}
/// Tries to park the receiver after the message queue was found empty.
///
/// Returns `Closed` when the state is closed with a zero message count,
/// `NotEmpty` when a sender has signalled since the last park (the flag is
/// reset here), and otherwise stores `task::park()` and returns `Parked`.
fn try_park(&self) -> TryPark {
let curr = self.inner.state.load(SeqCst);
let state = decode_state(curr);
if !state.is_open && state.num_messages == 0 {
return TryPark::Closed;
}
let mut recv_task = self.inner.recv_task.lock().unwrap();
if recv_task.unparked {
recv_task.unparked = false;
return TryPark::NotEmpty;
}
recv_task.task = Some(task::park());
TryPark::Parked
}
/// Atomically decrements the message count in the shared state.
///
/// The subtraction assumes the count is non-zero; in this file it is only
/// called from `poll` after a message has been popped.
fn dec_num_messages(&self) {
let mut curr = self.inner.state.load(SeqCst);
loop {
let mut state = decode_state(curr);
state.num_messages -= 1;
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
Ok(_) => break,
Err(actual) => curr = actual,
}
}
}
}
impl<T> Stream for Receiver<T> {
    type Item = T;
    type Error = ();

    /// Polls for the next message.
    ///
    /// When an entry is available, one parked sender is released, the message
    /// count is decremented and the entry is returned (a `None` entry ends the
    /// stream). When the queue is empty the receiver tries to park: it returns
    /// `NotReady` once parked, ends the stream if the channel is closed and
    /// drained, or retries if a sender signalled in the meantime.
    fn poll(&mut self) -> Poll<Option<T>, ()> {
        loop {
            if let Async::Ready(entry) = self.next_message() {
                self.unpark_one();
                self.dec_num_messages();
                return Ok(Async::Ready(entry));
            }

            match self.try_park() {
                TryPark::Parked => return Ok(Async::NotReady),
                TryPark::Closed => return Ok(Async::Ready(None)),
                // A sender signalled while we were checking: look again.
                TryPark::NotEmpty => {}
            }
        }
    }
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.close();
while self.next_message().is_ready() {
}
}
}
impl<T> UnboundedReceiver<T> {
pub fn close(&mut self) {
self.0.close();
}
}
impl<T> Stream for UnboundedReceiver<T> {
type Item = T;
type Error = ();
fn poll(&mut self) -> Poll<Option<T>, ()> {
self.0.poll()
}
}
impl<T> Inner<T> {
    /// Upper limit on the number of live `Sender` handles.
    ///
    /// Bounded channels allow `MAX_CAPACITY - buffer` senders; channels
    /// without a buffer limit allow `MAX_BUFFER`.
    fn max_senders(&self) -> usize {
        if let Some(buffer) = self.buffer {
            MAX_CAPACITY - buffer
        } else {
            MAX_BUFFER
        }
    }
}
// Manual `Send`/`Sync` impls for the shared channel state; both only require
// `T: Send`.
// NOTE(review): soundness presumably rests on `Queue` tolerating concurrent
// pushes from any thread while being popped only by the single receiver --
// confirm against the `queue` module.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
/// Unpacks a raw `state` word into its open flag and message count.
fn decode_state(num: usize) -> State {
    // `OPEN_MASK` is a single bit, so "bit set" is the same as "non-zero".
    let is_open = (num & OPEN_MASK) != 0;
    let num_messages = num & MAX_CAPACITY;
    State {
        is_open: is_open,
        num_messages: num_messages,
    }
}
/// Packs a `State` into a raw word: the message count, with the `OPEN_MASK`
/// bit added when the channel is open.
fn encode_state(state: &State) -> usize {
    if state.is_open {
        state.num_messages | OPEN_MASK
    } else {
        state.num_messages
    }
}