use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
use std::sync::atomic::{AtomicPtr, AtomicBool};
use std::sync::{Arc, Weak};
use std::usize;
use {task, Stream, Future, Poll, Async};
use executor::{Notify, UnsafeNotify, NotifyHandle};
use task_impl::{self, AtomicTask};
/// A set of futures that is driven through its `Stream` implementation.
///
/// Futures are added with `push`. Polling the stream polls the futures whose
/// nodes are sitting in the shared readiness queue and yields the value of
/// each one that completes; the stream ends once no futures are left.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesUnordered<F> {
// State shared with every node (nodes hold a `Weak` to it): the parent task
// slot and the readiness queue.
inner: Arc<Inner<F>>,
// Number of nodes currently linked into the `head_all` list.
len: usize,
// Head of the doubly linked list of all nodes owned by this set; null when
// the list is empty.
head_all: *const Node<F>,
}
// The raw `head_all` pointer (and the `UnsafeCell`s reachable through
// `inner`) suppress the automatic `Send`/`Sync` impls, so they are asserted
// by hand here, bounded by the future type.
// NOTE(review): soundness relies on the node list only being mutated through
// `&mut self` (`push`, `poll`, `iter_mut`, `drop`) — re-check when extending
// the `&self` API.
unsafe impl<T: Send> Send for FuturesUnordered<T> {}
unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}
// State shared between a `FuturesUnordered` (through an `Arc`) and each of
// its nodes (through a `Weak`).
#[allow(missing_debug_implementations)]
struct Inner<T> {
// Task slot: `register()` is called on it at the start of `poll`, and
// `Node::notify` calls `notify()` on it after enqueueing a node.
parent: AtomicTask,
// Producer end of the readiness queue; `enqueue` swaps new nodes in here.
head_readiness: AtomicPtr<Node<T>>,
// Consumer end of the readiness queue; only read and written by `dequeue`.
tail_readiness: UnsafeCell<*const Node<T>>,
// Placeholder node both queue ends point at initially; it is created with
// no future and an empty `Weak` back-reference.
stub: Arc<Node<T>>,
}
// One entry of the set: owns a future and carries the links for both the
// "all nodes" list and the readiness queue.
struct Node<T> {
// The future itself; `None` while it is being polled and after it has been
// released.
future: UnsafeCell<Option<T>>,
// Next node in the owner's doubly linked "all nodes" list (null at the end
// or when unlinked).
next_all: UnsafeCell<*const Node<T>>,
// Previous node in the "all nodes" list (null at the head or when unlinked).
prev_all: UnsafeCell<*const Node<T>>,
// Successor in the readiness queue; written by `Inner::enqueue`.
next_readiness: AtomicPtr<Node<T>>,
// Back-reference to the shared state; `Node::notify` does nothing once it
// can no longer be upgraded.
queue: Weak<Inner<T>>,
// True while the node is in (or about to enter) the readiness queue;
// `Node::notify` only enqueues when it flips this from false to true.
queued: AtomicBool,
}
// Outcome of `Inner::dequeue`.
enum Dequeue<T> {
// A node was removed from the readiness queue.
Data(*const Node<T>),
// The queue holds nothing but the stub node.
Empty,
// The tail has no successor yet although the head has moved on (an
// `enqueue` is between its swap and its link store), or the re-check after
// re-inserting the stub still found no successor.
Inconsistent,
}
impl<T> FuturesUnordered<T>
    where T: Future,
{
    /// Creates an empty set.
    ///
    /// The shared readiness queue starts out containing only a placeholder
    /// ("stub") node: both the producer end and the consumer end point at it,
    /// and it carries no future.
    pub fn new() -> FuturesUnordered<T> {
        let placeholder = Arc::new(Node {
            future: UnsafeCell::new(None),
            next_all: UnsafeCell::new(ptr::null()),
            prev_all: UnsafeCell::new(ptr::null()),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            queued: AtomicBool::new(true),
            queue: Weak::new(),
        });

        // Both queue ends refer to the placeholder by raw address; the `Arc`
        // stored in `Inner::stub` keeps that address alive.
        let placeholder_ptr: *const Node<T> = &*placeholder;
        let shared = Inner {
            parent: AtomicTask::new(),
            head_readiness: AtomicPtr::new(placeholder_ptr as *mut _),
            tail_readiness: UnsafeCell::new(placeholder_ptr),
            stub: placeholder,
        };

        FuturesUnordered {
            inner: Arc::new(shared),
            len: 0,
            head_all: ptr::null(),
        }
    }
}
impl<T> FuturesUnordered<T> {
pub fn len(&self) -> usize {
self.len
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub fn push(&mut self, future: T) {
let node = Arc::new(Node {
future: UnsafeCell::new(Some(future)),
next_all: UnsafeCell::new(ptr::null_mut()),
prev_all: UnsafeCell::new(ptr::null_mut()),
next_readiness: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
queue: Arc::downgrade(&self.inner),
});
let ptr = self.link(node);
self.inner.enqueue(ptr);
}
pub fn iter_mut(&mut self) -> IterMut<T> {
IterMut {
node: self.head_all,
len: self.len,
_marker: PhantomData
}
}
fn release_node(&mut self, node: Arc<Node<T>>) {
let prev = node.queued.swap(true, SeqCst);
unsafe {
drop((*node.future.get()).take());
}
if prev {
mem::forget(node);
}
}
fn link(&mut self, node: Arc<Node<T>>) -> *const Node<T> {
let ptr = arc2ptr(node);
unsafe {
*(*ptr).next_all.get() = self.head_all;
if !self.head_all.is_null() {
*(*self.head_all).prev_all.get() = ptr;
}
}
self.head_all = ptr;
self.len += 1;
return ptr
}
unsafe fn unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>> {
let node = ptr2arc(node);
let next = *node.next_all.get();
let prev = *node.prev_all.get();
*node.next_all.get() = ptr::null_mut();
*node.prev_all.get() = ptr::null_mut();
if !next.is_null() {
*(*next).prev_all.get() = prev;
}
if !prev.is_null() {
*(*prev).next_all.get() = next;
} else {
self.head_all = next;
}
self.len -= 1;
return node
}
}
impl<T> Stream for FuturesUnordered<T>
    where T: Future
{
    type Item = T::Item;
    type Error = T::Error;

    /// Polls the futures whose nodes are in the readiness queue.
    ///
    /// Returns `Ready(Some(item))` or `Err(e)` as soon as one polled future
    /// completes, `Ready(None)` when the queue is empty and no futures are
    /// left in the set, and `NotReady` otherwise.
    fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
        // Presumably records the current task in `parent` so that
        // `Node::notify` can wake it later — behaviour of `AtomicTask` is
        // defined outside this file.
        self.inner.parent.register();

        loop {
            let node = match unsafe { self.inner.dequeue() } {
                Dequeue::Empty => {
                    if self.is_empty() {
                        return Ok(Async::Ready(None));
                    } else {
                        return Ok(Async::NotReady)
                    }
                }
                Dequeue::Inconsistent => {
                    // A producer is mid-enqueue; ask to be polled again
                    // instead of spinning here.
                    task::current().notify();
                    return Ok(Async::NotReady);
                }
                Dequeue::Data(node) => node,
            };

            debug_assert!(node != self.inner.stub());

            unsafe {
                let mut future = match (*(*node).future.get()).take() {
                    Some(future) => future,

                    // The future was already dropped by `release_node`, which
                    // left its reference with the queue. Take that reference
                    // back (dropping it at the end of this arm) and move on.
                    None => {
                        let node = ptr2arc(node);
                        assert!((*node.next_all.get()).is_null());
                        assert!((*node.prev_all.get()).is_null());
                        continue
                    }
                };

                // Clear `queued` before polling so that a notification
                // arriving during `poll` enqueues the node again.
                let prev = (*node).queued.swap(false, SeqCst);
                assert!(prev);

                // Guard for the unlinked node: if `future.poll()` panics, or
                // the future completes, dropping the guard releases the node
                // instead of leaving it unlinked but still referenced.
                struct Bomb<'a, T: 'a> {
                    queue: &'a mut FuturesUnordered<T>,
                    node: Option<Arc<Node<T>>>,
                }

                impl<'a, T> Drop for Bomb<'a, T> {
                    fn drop(&mut self) {
                        if let Some(node) = self.node.take() {
                            self.queue.release_node(node);
                        }
                    }
                }

                let mut bomb = Bomb {
                    node: Some(self.unlink(node)),
                    queue: self,
                };

                // Poll the future with a notify handle bound to this node.
                let res = {
                    let notify = NodeToHandle(bomb.node.as_ref().unwrap());
                    task_impl::with_notify(&notify, 0, || {
                        future.poll()
                    })
                };

                let ret = match res {
                    Ok(Async::NotReady) => {
                        // Not finished: put the future back into its node and
                        // relink the node; disarming the guard keeps it alive.
                        let node = bomb.node.take().unwrap();
                        *node.future.get() = Some(future);
                        bomb.queue.link(node);
                        continue
                    }
                    Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
                    Err(e) => Err(e),
                };
                return ret
            }
        }
    }
}
impl<T: Debug> Debug for FuturesUnordered<T> {
    /// Writes a fixed placeholder; the contained futures are not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("FuturesUnordered {{ ... }}"))
    }
}
impl<T> Drop for FuturesUnordered<T> {
    /// Unlinks every remaining node, front to back, and releases it, which
    /// drops the future it still holds.
    fn drop(&mut self) {
        loop {
            let head = self.head_all;
            if head.is_null() {
                break;
            }
            let node = unsafe { self.unlink(head) };
            self.release_node(node);
        }
    }
}
impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
    /// Builds a set by pushing every future produced by `iter`, in iteration
    /// order, onto a freshly created `FuturesUnordered`.
    fn from_iter<T>(iter: T) -> Self
        where T: IntoIterator<Item = F>
    {
        iter.into_iter().fold(FuturesUnordered::new(), |mut set, future| {
            set.push(future);
            set
        })
    }
}
/// Mutable iterator over the futures of a `FuturesUnordered`, created by
/// `FuturesUnordered::iter_mut`.
#[derive(Debug)]
pub struct IterMut<'a, F: 'a> {
// Node whose future is yielded next; null once the iterator is exhausted.
node: *const Node<F>,
// Number of futures still to be yielded; reported by `size_hint`.
len: usize,
// Ties the iterator to a mutable borrow of the set it walks.
_marker: PhantomData<&'a mut FuturesUnordered<F>>
}
impl<'a, F> Iterator for IterMut<'a, F> {
    type Item = &'a mut F;

    /// Yields the future stored in the current node and advances along the
    /// `next_all` links. Panics if the current node holds no future.
    fn next(&mut self) -> Option<&'a mut F> {
        if self.node.is_null() {
            return None;
        }

        unsafe {
            let current = &*self.node;
            let future = (*current.future.get()).as_mut().unwrap();
            self.node = *current.next_all.get();
            self.len -= 1;
            Some(future)
        }
    }

    /// Exact: both bounds are the number of futures not yet yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.len;
        (remaining, Some(remaining))
    }
}
// `size_hint` returns the same value for both bounds, so the default
// `ExactSizeIterator` methods apply unchanged.
impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}
// Readiness queue operations.
// NOTE(review): the structure looks like an intrusive multi-producer,
// single-consumer linked queue with a stub node (cf. Vyukov's MPSC queue) —
// confirm before relying on that description.
impl<T> Inner<T> {
// Appends `node` at the producer end of the readiness queue.
//
// The node's `queued` flag is expected to be set already (checked in debug
// builds only).
fn enqueue(&self, node: *const Node<T>) {
unsafe {
debug_assert!((*node).queued.load(Relaxed));
// The new node becomes the last element, so it has no successor yet.
(*node).next_readiness.store(ptr::null_mut(), Relaxed);
let node = node as *mut _;
// Publish the node as the new head, then link the previous head to it.
// Between these two steps the chain is broken; `dequeue` reports that
// window as `Inconsistent`.
let prev = self.head_readiness.swap(node, AcqRel);
(*prev).next_readiness.store(node, Release);
}
}
// Removes the node at the consumer end of the readiness queue, if any.
//
// NOTE(review): `tail_readiness` is read and written without any
// synchronization, so callers presumably must guarantee that only one
// thread dequeues at a time — confirm against the call sites.
unsafe fn dequeue(&self) -> Dequeue<T> {
let mut tail = *self.tail_readiness.get();
let mut next = (*tail).next_readiness.load(Acquire);
// The stub carries no data: skip over it, or report `Empty` when nothing
// follows it.
if tail == self.stub() {
if next.is_null() {
return Dequeue::Empty;
}
*self.tail_readiness.get() = next;
tail = next;
next = (*next).next_readiness.load(Acquire);
}
// `tail` has a successor: advance past it and hand it out.
if !next.is_null() {
*self.tail_readiness.get() = next;
debug_assert!(tail != self.stub());
return Dequeue::Data(tail);
}
// No successor, but the head is some other node: an `enqueue` has swapped
// the head and not yet stored the link.
if self.head_readiness.load(Acquire) as *const _ != tail {
return Dequeue::Inconsistent;
}
// `tail` is the last node. Re-insert the stub behind it so that `tail`
// gains a successor and can be handed out.
self.enqueue(self.stub());
next = (*tail).next_readiness.load(Acquire);
if !next.is_null() {
*self.tail_readiness.get() = next;
return Dequeue::Data(tail);
}
// Still no successor visible after re-inserting the stub.
Dequeue::Inconsistent
}
// Raw address of the stub node.
fn stub(&self) -> *const Node<T> {
&*self.stub
}
}
impl<T> Drop for Inner<T> {
    /// Drains the readiness queue, dropping the strong reference held for
    /// every node still in it. Aborts if the queue is found inconsistent.
    fn drop(&mut self) {
        loop {
            let ptr = match unsafe { self.dequeue() } {
                Dequeue::Data(ptr) => ptr,
                Dequeue::Inconsistent => abort("inconsistent in drop"),
                Dequeue::Empty => break,
            };
            drop(unsafe { ptr2arc(ptr) });
        }
    }
}
// Borrowed view of a node's `Arc`; it is passed to `task_impl::with_notify`
// in `poll` and can be converted into an owning `NotifyHandle` via `From`.
#[allow(missing_debug_implementations)]
struct NodeToHandle<'a, T: 'a>(&'a Arc<Node<T>>);
impl<'a, T> Clone for NodeToHandle<'a, T> {
fn clone(&self) -> Self {
NodeToHandle(self.0)
}
}
// Converts the borrowed node reference into a `NotifyHandle` that owns one
// strong reference to the node.
impl<'a, T> From<NodeToHandle<'a, T>> for NotifyHandle {
fn from(handle: NodeToHandle<'a, T>) -> NotifyHandle {
unsafe {
// Take a new strong reference for the handle to own.
let ptr = handle.0.clone();
// Reinterpret the `Arc` value itself (not the address of the node) as a
// raw `ArcNode` pointer. The `ArcNode` trait impls reverse this by
// reinterpreting their `self` address as an `Arc<Node<T>>`.
let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr);
NotifyHandle::new(hide_lt(ptr))
}
}
}
// Stand-in type for the raw pointer produced by transmuting an
// `Arc<Node<T>>`; it has no data of its own and is only used through
// pointers whose address is really the `Arc`'s bit pattern.
struct ArcNode<T>(PhantomData<T>);
// NOTE(review): asserted for every `T`, including types that are not
// themselves `Send`/`Sync`; presumably justified because a handle only
// touches the node's atomics and the shared queue, never the future —
// confirm.
unsafe impl<T> Send for ArcNode<T> {}
unsafe impl<T> Sync for ArcNode<T> {}
impl<T> Notify for ArcNode<T> {
// Forwards the notification to `Node::notify`; the id is ignored.
fn notify(&self, _id: usize) {
unsafe {
// `self`'s address is the bit pattern of an `Arc<Node<T>>`. Put that
// address in a local and view the local as an `&Arc<Node<T>>`; the
// reference count is not changed and nothing is dropped.
let me: *const ArcNode<T> = self;
let me: *const *const ArcNode<T> = &me;
let me = me as *const Arc<Node<T>>;
Node::notify(&*me)
}
}
}
unsafe impl<T> UnsafeNotify for ArcNode<T> {
// Produces another handle for the same node; the conversion via
// `NodeToHandle` clones the `Arc`, so the new handle owns its own strong
// reference.
unsafe fn clone_raw(&self) -> NotifyHandle {
// View `self`'s address as a borrowed `Arc<Node<T>>` (no ownership taken).
let me: *const ArcNode<T> = self;
let me: *const *const ArcNode<T> = &me;
let me = &*(me as *const Arc<Node<T>>);
NodeToHandle(me).into()
}
// Releases the strong reference owned by this handle.
unsafe fn drop_raw(&self) {
// Treat the local holding `self`'s address as an owned `Arc<Node<T>>`
// and run its destructor in place.
let mut me: *const ArcNode<T> = self;
let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
ptr::drop_in_place(me);
}
}
// Converts the concrete `ArcNode` pointer into an `UnsafeNotify` trait
// object pointer.
// NOTE(review): judging by the name, the transmute exists to erase the
// trait object's lifetime bound so that a non-`'static` `T` can be stored
// behind the returned pointer — confirm.
unsafe fn hide_lt<T>(p: *mut ArcNode<T>) -> *mut UnsafeNotify {
mem::transmute(p as *mut UnsafeNotify)
}
impl<T> Node<T> {
    /// Marks the node as queued and, if it was not queued before, puts it on
    /// the readiness queue and notifies the parent task.
    ///
    /// Does nothing when the shared state has already been dropped.
    fn notify(me: &Arc<Node<T>>) {
        if let Some(inner) = me.queue.upgrade() {
            // Only the caller that flips the flag from `false` to `true`
            // enqueues, so the node enters the queue at most once per cycle.
            let already_queued = me.queued.swap(true, SeqCst);
            if !already_queued {
                inner.enqueue(&**me);
                inner.parent.notify();
            }
        }
    }
}
impl<T> Drop for Node<T> {
    /// Aborts the process if the node is destroyed while it still holds a
    /// future; the future is expected to have been taken out beforehand.
    fn drop(&mut self) {
        let still_present = unsafe { (*self.future.get()).is_some() };
        if still_present {
            abort("future still here when dropping");
        }
    }
}
/// Turns an `Arc<T>` into the raw address of the value it owns.
///
/// The strong reference is deliberately leaked rather than released; hand
/// the address to `ptr2arc` later to get it back.
fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
    let raw: *const T = &*ptr;
    mem::forget(ptr);
    raw
}
/// Rebuilds the `Arc<T>` that owns the value `ptr` points at.
///
/// This is the inverse of `arc2ptr`: it takes over one strong reference
/// without touching the reference count.
///
/// Uses `Arc::from_raw` rather than measuring the header offset through a
/// fabricated `Arc` at a dummy address, which dereferenced an invalid pointer
/// and depended on `Arc`'s private layout.
///
/// # Safety
///
/// `ptr` must be the address of the value inside a live `Arc<T>` allocation,
/// and the caller must hand over exactly one strong reference (for example
/// one previously leaked by `arc2ptr`). Calling this twice for the same
/// leaked reference leads to a double free.
unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
    Arc::from_raw(ptr)
}
// Panics with message `s` while a guard whose destructor also panics is
// alive. Unwinding from the first panic runs that destructor, and a panic
// raised during unwinding makes the Rust runtime abort the process, so this
// failure cannot be caught and resumed from.
fn abort(s: &str) -> ! {
// Guard type whose only job is to panic when dropped.
struct DoublePanic;
impl Drop for DoublePanic {
fn drop(&mut self) {
panic!("panicking twice to abort the program");
}
}
// Must be created before the panic below so that it is dropped during
// unwinding.
let _bomb = DoublePanic;
panic!("{}", s);
}