extern crate futures;
extern crate index_queue;
extern crate vec_arena;
extern crate void;
use std::fmt;
use std::cell::RefCell;
use std::rc::{self, Rc};
use std::sync::{Arc, Mutex};
use futures::executor::{self, Spawn, Unpark};
use futures::{Async, Future, Poll, future, task};
use index_queue::IndexQueue;
use vec_arena::Arena;
use void::Void;
struct DebugWith<F>(F);
impl<F: Fn(&mut fmt::Formatter) -> fmt::Result> fmt::Debug for DebugWith<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0(f)
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct SpawnId(usize);
impl SpawnId {
fn from_queue_index(queue_index: usize) -> Self {
debug_assert_ne!(queue_index, !0);
SpawnId(queue_index)
}
fn to_queue_index(self) -> usize {
self.0
}
fn main() -> Self {
Self::from_queue_index(0)
}
fn aux(aux_index: usize) -> Self {
debug_assert!(aux_index < !0 - 1);
Self::from_queue_index(aux_index + 1)
}
fn to_aux(self) -> Option<usize> {
if self == Self::main() {
None
} else {
Some(self.to_queue_index() - 1)
}
}
}
struct TicketInner {
id: SpawnId,
queue: Option<Arc<Mutex<IndexQueue>>>,
}
impl fmt::Debug for TicketInner {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let name = if self.queue.is_none() {
"TicketInner[inactive]"
} else {
"TicketInner"
};
f.debug_tuple(name)
.field(&self.id.to_queue_index())
.finish()
}
}
#[derive(Debug)]
struct Ticket(Mutex<TicketInner>);
impl Ticket {
fn deactivate(&self) {
let inner = self.0.lock().unwrap();
inner.queue.as_ref().map(|queue| {
queue.lock().unwrap().remove(inner.id.to_queue_index());
});
}
}
impl Unpark for Ticket {
fn unpark(&self) {
let inner = self.0.lock().unwrap();
inner.queue.as_ref().map(|queue| {
queue.lock().unwrap().push_back(inner.id.to_queue_index());
});
}
}
struct Spawned<F> {
spawn: Spawn<F>,
ticket: Arc<Ticket>,
}
impl<F> fmt::Debug for Spawned<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("Spawned")
.field(&self.ticket)
.finish()
}
}
type SpawnedBox<'a> = Spawned<Box<Future<Item=(), Error=Void> + 'a>>;
#[derive(Default)]
struct Inner<'a> {
spawns: Arena<Option<SpawnedBox<'a>>>,
queue: Arc<Mutex<IndexQueue>>,
}
impl<'a> Inner<'a> {
fn new_ticket(&self, id: SpawnId) -> Arc<Ticket> {
let ticket = Arc::new(Ticket(Mutex::new(TicketInner {
id: id,
queue: Some(self.queue.clone()),
})));
ticket.unpark();
ticket
}
}
impl<'a> fmt::Debug for Inner<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Inner")
.field("spawns", &DebugWith(|f: &mut fmt::Formatter| {
f.debug_list().entries(self.spawns.iter().map(|(i, _)| i))
.finish()
}))
.field("queue", &self.queue)
.finish()
}
}
#[derive(Debug, Clone)]
pub struct Handle<'a>(rc::Weak<RefCell<Inner<'a>>>);
impl<'a> Handle<'a> {
pub fn spawn<F: Future<Item=(), Error=Void> + 'a>(&self, f: F) {
let inner = match self.0.upgrade() {
Some(inner) => inner,
None => return,
};
let mut inner = inner.borrow_mut();
let aux = inner.spawns.insert(None);
let ticket = inner.new_ticket(SpawnId::aux(aux));
inner.spawns[aux] = Some(Spawned {
spawn: executor::spawn(Box::new(f) as Box<_>),
ticket: ticket,
});
}
}
fn yield_turn<T, E>(status: Option<Poll<T, E>>) -> Poll<T, E> {
let result = status.unwrap_or(Ok(Async::NotReady));
if let Ok(Async::NotReady) = result {
task::park().unpark();
}
result
}
#[derive(Debug)]
pub struct RunFuture<'b, 'a: 'b, F> {
core: &'b mut Core<'a>,
spawned: Spawned<F>,
}
impl<'b, 'a, F: Future> RunFuture<'b, 'a, F> {
pub fn run(&mut self) -> Result<F::Item, F::Error> {
loop {
match self.turn().unwrap_or(Ok(Async::NotReady))? {
Async::Ready(x) => return Ok(x),
Async::NotReady => continue,
}
}
}
pub fn turn(&mut self) -> Option<Poll<F::Item, F::Error>> {
self.core.turn_with(Ok(&mut self.spawned))
}
}
impl<'b, 'a, F: Future> Future for RunFuture<'b, 'a, F> {
type Item = F::Item;
type Error = F::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
yield_turn(self.turn())
}
}
#[derive(Debug, Default)]
pub struct Core<'a>(Rc<RefCell<Inner<'a>>>);
impl<'a> Core<'a> {
pub fn handle(&self) -> Handle<'a> {
Handle(Rc::downgrade(&self.0))
}
pub fn run<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> {
self.run_future(f).run()
}
pub fn run_future<'b, F: Future>(&'b mut self, f: F)
-> RunFuture<'b, 'a, F> {
let ticket = {
let inner = self.0.borrow();
let id = SpawnId::main();
inner.queue.lock().unwrap().remove(id.to_queue_index());
inner.new_ticket(id)
};
RunFuture {
core: self,
spawned: Spawned {
spawn: executor::spawn(f),
ticket: ticket,
},
}
}
pub fn turn(&mut self) -> Option<Poll<(), Void>> {
self.turn_with::<future::Empty<(), Void>>(Err(()))
}
fn turn_with<F: Future>(&mut self, main: Result<&mut Spawned<F>, F::Item>)
-> Option<Poll<F::Item, F::Error>> {
let index = {
let inner = self.0.borrow();
let popped = inner.queue.lock().unwrap().pop_front();
match popped {
None => return if inner.spawns.is_empty() {
match main {
Err(item) => Some(Ok(Async::Ready(item))),
Ok(_) => None
}
} else {
None
},
Some(index) => index,
}
};
match SpawnId::from_queue_index(index).to_aux() {
None => {
match main {
Err(_) => Some(Ok(Async::NotReady)),
Ok(main) => {
let ticket = main.ticket.clone();
let poll = main.spawn.poll_future(ticket);
if let Ok(Async::Ready(_)) = poll {
main.ticket.deactivate();
}
Some(poll)
}
}
}
Some(aux) => {
let spawned = self.0.borrow_mut().spawns.get_mut(aux)
.and_then(|x| x.take());
if let Some(mut spawned) = spawned {
let ticket = spawned.ticket.clone();
let poll = spawned.spawn.poll_future(ticket);
let mut inner = self.0.borrow_mut();
if let Ok(Async::Ready(())) = poll {
spawned.ticket.deactivate();
inner.spawns.remove(aux);
} else {
inner.spawns[aux] = Some(spawned);
}
} else {
self.0.borrow_mut().spawns.remove(aux);
}
Some(Ok(Async::NotReady))
}
}
}
}
impl<'a> Future for Core<'a> {
type Item = ();
type Error = Void;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
yield_turn(self.turn())
}
}