#![allow(dead_code)]
use std::any::Any;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::error::Error;
use std::fmt;
use std::mem;
use std::rc::{Rc, Weak};
use futures::task::{self, Task};
use futures::{Async, AsyncSink, Poll, StartSend, Sink, Stream};
/// Creates a bounded channel and returns its receiving half.
///
/// `buffer` becomes the channel's capacity: a send is refused with
/// `AsyncSink::NotReady` while the number of queued messages equals it.
/// No sender exists yet; obtain one with `Receiver::sender`.
pub fn channel<T>(buffer: usize) -> Receiver<T> {
    let capacity = Some(buffer);
    channel_(capacity)
}
/// Shared constructor behind `channel` and `unbounded`.
///
/// `buffer` is stored as the capacity (`None` means no limit). The returned
/// receiver starts in the open state with an empty queue and a sender count
/// of zero.
fn channel_<T>(buffer: Option<usize>) -> Receiver<T> {
    let initial = Shared {
        buffer: VecDeque::new(),
        capacity: buffer,
        blocked_senders: VecDeque::new(),
        blocked_recv: None,
        sender_count: 0,
    };
    let state = State::Open(Rc::new(RefCell::new(initial)));
    Receiver { state }
}
/// State shared between a `Receiver` (strong `Rc`) and its `Sender`s (`Weak`).
#[derive(Debug)]
struct Shared<T> {
/// Queued messages: senders push at the back, the receiver pops from the front.
buffer: VecDeque<T>,
/// `Some(n)`: a send is refused while `buffer.len() == n`. `None`: no limit.
capacity: Option<usize>,
/// Tasks of senders whose send was refused because the buffer was full.
/// One is woken per message received; all are woken when the receiver closes.
blocked_senders: VecDeque<Task>,
/// Task of the receiver if it polled an empty buffer; taken and notified
/// when a message is sent or when the sender count drops to zero.
blocked_recv: Option<Task>,
/// Number of live `Sender`s attached to this state (incremented on
/// creation/clone, decremented in `Sender::drop`).
sender_count: usize,
}
/// Sending half of the channel.
///
/// Holds only a `Weak` pointer to the shared state, so a sender never keeps
/// the channel alive; once the receiver releases its `Rc`, upgrades fail and
/// sends return `SendError`.
#[derive(Debug)]
pub struct Sender<T> {
// Weak handle to the state owned by the `Receiver`.
shared: Weak<RefCell<Shared<T>>>,
}
impl<T> Sender<T> {
    /// Returns `true` while the shared channel state is still alive, i.e. the
    /// weak pointer held by this sender can be upgraded.
    pub fn connected(&self) -> bool {
        self.shared.upgrade().is_some()
    }

    /// Attempts to queue `msg`.
    ///
    /// * `Err(SendError(msg))` if the shared state is gone.
    /// * `Ok(AsyncSink::NotReady(msg))` if the channel is bounded and full;
    ///   the current task is queued in `blocked_senders` so it can be woken.
    /// * `Ok(AsyncSink::Ready)` once the message is in the buffer; a receiver
    ///   task parked on an empty buffer is notified.
    fn do_send(&self, msg: T) -> StartSend<T, SendError<T>> {
        let cell = match self.shared.upgrade() {
            Some(cell) => cell,
            None => return Err(SendError(msg)),
        };
        let mut inner = cell.borrow_mut();

        let full = match inner.capacity {
            Some(limit) => inner.buffer.len() == limit,
            None => false,
        };
        if full {
            inner.blocked_senders.push_back(task::current());
            return Ok(AsyncSink::NotReady(msg));
        }

        inner.buffer.push_back(msg);
        let waiting = inner.blocked_recv.take();
        // Release the RefCell borrow before waking anybody.
        drop(inner);
        if let Some(receiver_task) = waiting {
            receiver_task.notify();
        }
        Ok(AsyncSink::Ready)
    }
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
let result = Sender { shared: Weak::clone(&self.shared) };
if let Some(shared) = self.shared.upgrade() {
shared.borrow_mut().sender_count += 1;
}
result
}
}
impl<T> Sink for Sender<T> {
type SinkItem = T;
type SinkError = SendError<T>;
fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
self.do_send(msg)
}
fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
Ok(Async::Ready(()))
}
fn close(&mut self) -> Poll<(), SendError<T>> {
Ok(Async::Ready(()))
}
}
impl<T> Drop for Sender<T> {
    /// Decrements the shared sender count and, when this was the last sender,
    /// wakes a receiver task parked on the empty buffer so it can observe the
    /// end of the stream. Does nothing if the channel state is already gone.
    fn drop(&mut self) {
        if let Some(cell) = self.shared.upgrade() {
            let mut inner = cell.borrow_mut();
            inner.sender_count -= 1;
            let parked = if inner.sender_count == 0 {
                inner.blocked_recv.take()
            } else {
                None
            };
            // Release the RefCell borrow before waking the receiver.
            drop(inner);
            if let Some(receiver_task) = parked {
                receiver_task.notify();
            }
        }
    }
}
/// Receiving half of the channel; also the factory for `Sender`s
/// (see `Receiver::sender`).
#[derive(Debug)]
pub struct Receiver<T> {
// Either the live shared state or, after `close`, the leftover messages.
state: State<T>,
}
/// Internal state of a `Receiver`.
#[derive(Debug)]
enum State<T> {
/// Channel is open; the receiver holds the only strong reference to the
/// shared state.
Open(Rc<RefCell<Shared<T>>>),
/// Channel was closed; holds the messages that were still buffered at that
/// moment so they can be drained by `poll`.
Closed(VecDeque<T>),
}
impl<T> Receiver<T> {
    /// Reports whether anything can still arrive from this receiver.
    ///
    /// While open: `true` when at least one sender is counted.
    /// While closed: `true` when leftover messages remain to be drained.
    pub fn connected(&mut self) -> bool {
        match self.state {
            State::Open(ref shared) => shared.borrow().sender_count > 0,
            State::Closed(ref leftover) => !leftover.is_empty(),
        }
    }

    /// Creates a new sender attached to this receiver.
    ///
    /// If the receiver was closed, the channel is reopened: the leftover
    /// messages become the buffer of a fresh shared state. That fresh state
    /// always has `capacity: None`, whatever capacity the channel had before
    /// it was closed.
    pub fn sender(&mut self) -> Sender<T> {
        let leftover = match self.state {
            State::Open(ref shared) => {
                shared.borrow_mut().sender_count += 1;
                return Sender { shared: Rc::downgrade(shared) };
            }
            State::Closed(ref mut buf) => mem::replace(buf, VecDeque::new()),
        };

        let reopened = Rc::new(RefCell::new(Shared {
            buffer: leftover,
            capacity: None,
            blocked_senders: VecDeque::new(),
            blocked_recv: None,
            sender_count: 1,
        }));
        let sender = Sender { shared: Rc::downgrade(&reopened) };
        self.state = State::Open(reopened);
        sender
    }

    /// Closes the channel.
    ///
    /// Buffered messages are kept so `poll` can still drain them. The strong
    /// reference to the shared state is released, and every sender task that
    /// was blocked on a full buffer is woken. Closing an already closed
    /// receiver is a no-op.
    pub fn close(&mut self) {
        let (leftover, blocked) = match self.state {
            State::Closed(_) => return,
            State::Open(ref shared) => {
                let mut inner = shared.borrow_mut();
                let leftover = mem::replace(&mut inner.buffer, VecDeque::new());
                let blocked = mem::replace(&mut inner.blocked_senders, VecDeque::new());
                (leftover, blocked)
            }
        };

        // Replacing the state drops the `Rc` held by the open state.
        self.state = State::Closed(leftover);
        for sender_task in blocked {
            sender_task.notify();
        }
    }

    /// Returns `true` once the receiver is in the closed state.
    pub fn is_closed(&self) -> bool {
        match self.state {
            State::Open(_) => false,
            State::Closed(_) => true,
        }
    }
}
impl<T> Stream for Receiver<T> {
    type Item = T;
    type Error = ();

    /// Yields the next message.
    ///
    /// * Closed receiver: drains leftover messages, then yields `None`.
    /// * Open, and no other `Rc`/`Weak` handle to the shared state exists
    ///   (so no sender is alive): drains the buffer, then yields `None`.
    /// * Open with senders: pops a message and wakes one blocked sender, or
    ///   parks the current task and returns `NotReady` when the buffer is empty.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let cell = match self.state {
            State::Closed(ref mut leftover) => {
                return Ok(Async::Ready(leftover.pop_front()));
            }
            State::Open(ref mut cell) => cell,
        };

        // `Rc::get_mut` only succeeds when this is the sole pointer, strong
        // or weak, to the shared state.
        if let Some(sole) = Rc::get_mut(cell) {
            let next = sole.borrow_mut().buffer.pop_front();
            return Ok(Async::Ready(next));
        }

        let mut inner = cell.borrow_mut();
        let next = inner.buffer.pop_front();
        match next {
            Some(msg) => {
                // A slot just freed up: let one waiting sender retry.
                let unblocked = inner.blocked_senders.pop_front();
                drop(inner);
                if let Some(sender_task) = unblocked {
                    sender_task.notify();
                }
                Ok(Async::Ready(Some(msg)))
            }
            None => {
                inner.blocked_recv = Some(task::current());
                Ok(Async::NotReady)
            }
        }
    }
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.close();
}
}
/// Sending half of an unbounded channel: a thin wrapper around `Sender`.
#[derive(Debug)]
pub struct UnboundedSender<T>(Sender<T>);
impl<T> Clone for UnboundedSender<T> {
fn clone(&self) -> Self {
UnboundedSender(self.0.clone())
}
}
impl<T> Sink for UnboundedSender<T> {
type SinkItem = T;
type SinkError = SendError<T>;
fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
self.0.start_send(msg)
}
fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
Ok(Async::Ready(()))
}
fn close(&mut self) -> Poll<(), SendError<T>> {
Ok(Async::Ready(()))
}
}
impl<'a, T> Sink for &'a UnboundedSender<T> {
    type SinkItem = T;
    type SinkError = SendError<T>;

    /// Sends through a shared reference by calling the wrapped sender's
    /// `do_send` directly.
    fn start_send(&mut self, msg: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        let sender: &UnboundedSender<T> = *self;
        sender.0.do_send(msg)
    }

    /// Always ready; nothing is buffered on the sender side.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        Ok(Async::Ready(()))
    }

    /// Completes immediately without touching any channel state.
    fn close(&mut self) -> Poll<(), Self::SinkError> {
        Ok(Async::Ready(()))
    }
}
impl<T> UnboundedSender<T> {
pub fn connected(&self) -> bool {
self.0.connected()
}
pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
let shared = match self.0.shared.upgrade() {
Some(shared) => shared,
None => return Err(SendError(msg)),
};
let mut shared = shared.borrow_mut();
shared.buffer.push_back(msg);
if let Some(task) = shared.blocked_recv.take() {
drop(shared);
task.notify();
}
Ok(())
}
}
/// Receiving half of an unbounded channel: a thin wrapper around `Receiver`
/// whose methods all delegate to the wrapped value.
#[derive(Debug)]
pub struct UnboundedReceiver<T>(Receiver<T>);
impl<T> UnboundedReceiver<T> {
pub fn connected(&mut self) -> bool {
self.0.connected()
}
pub fn sender(&mut self) -> UnboundedSender<T> {
UnboundedSender(self.0.sender())
}
pub fn close(&mut self) {
self.0.close();
}
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}
}
impl<T> Stream for UnboundedReceiver<T> {
type Item = T;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll()
}
}
/// Creates a channel with no capacity limit and returns its receiving half.
///
/// No sender exists yet; obtain one with `UnboundedReceiver::sender`.
pub fn unbounded<T>() -> UnboundedReceiver<T> {
    let receiver = channel_(None);
    UnboundedReceiver(receiver)
}
/// Error returned when a message cannot be sent because the channel state is
/// gone. It carries the unsent message, which `into_inner` gives back.
pub struct SendError<T>(T);

impl<T> fmt::Debug for SendError<T> {
    /// Formats as a tuple struct with a `"..."` placeholder instead of the
    /// payload, so `T` does not need to implement `Debug`.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut tuple = fmt.debug_tuple("SendError");
        tuple.field(&"...");
        tuple.finish()
    }
}

impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed human-readable message; the payload is not shown.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str("send failed because receiver is gone")
    }
}

impl<T: Any> Error for SendError<T> {
    /// Same fixed text as the `Display` implementation.
    fn description(&self) -> &str {
        "send failed because receiver is gone"
    }
}

impl<T> SendError<T> {
    /// Consumes the error and returns the message that could not be sent.
    pub fn into_inner(self) -> T {
        let SendError(unsent) = self;
        unsent
    }
}