futures 0.1.19

An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces.
Documentation
//! Execution of futures on a single thread
//!
//! This module has no special handling of any blocking operations other than
//! futures-aware inter-thread communications, and is not intended to be used to
//! manage I/O. For futures that do I/O you'll likely want to use `tokio-core`.

use std::cell::{Cell, RefCell};
use std::sync::{Arc, Mutex, mpsc};

use futures::executor::{self, Spawn, Notify};
use futures::future::{Executor, ExecuteError};
use futures::{Future, Async};

/// Main loop object
pub struct Core {
    tx: mpsc::Sender<usize>,
    rx: mpsc::Receiver<usize>,
    notify: Arc<MyNotify>,

    // Slab of running futures used to track what's running and what slots are
    // empty. Slot indexes are then sent along tx/rx above to indicate which
    // future is ready to get polled.
    tasks: RefCell<Vec<Slot>>,
    next_vacant: Cell<usize>,
}

enum Slot {
    Vacant { next_vacant: usize },
    Running(Option<Spawn<Box<Future<Item = (), Error = ()>>>>),
}

impl Core {
    /// Create a new `Core`.
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        Core {
            notify: Arc::new(MyNotify {
                tx: Mutex::new(tx.clone()),
            }),
            tx: tx,
            rx: rx,
            next_vacant: Cell::new(0),
            tasks: RefCell::new(Vec::new()),
        }
    }

    /// Spawn a future to be executed by a future call to `run`.
    ///
    /// The future `f` provided will not be executed until `run` is called
    /// below. While futures passed to `run` are executing, the future provided
    /// here will be executed concurrently as well.
    pub fn spawn<F>(&self, f: F)
        where F: Future<Item=(), Error=()> + 'static
    {
        let idx = self.next_vacant.get();
        let mut tasks = self.tasks.borrow_mut();
        match tasks.get_mut(idx) {
            Some(&mut Slot::Vacant { next_vacant }) => {
                self.next_vacant.set(next_vacant);
            }
            Some(&mut Slot::Running (_)) => {
                panic!("vacant points to running future")
            }
            None => {
                assert_eq!(idx, tasks.len());
                tasks.push(Slot::Vacant { next_vacant: 0 });
                self.next_vacant.set(idx + 1);
            }
        }
        tasks[idx] = Slot::Running(Some(executor::spawn(Box::new(f))));
        self.tx.send(idx).unwrap();
    }

    /// Run the loop until the future `f` completes.
    ///
    /// This method will block the current thread until the future `f` has
    /// resolved. While waiting on `f` to finish it will also execute any
    /// futures spawned via `spawn` above.
    pub fn run<F>(&self, f: F) -> Result<F::Item, F::Error>
        where F: Future,
    {
        let id = usize::max_value();
        self.tx.send(id).unwrap();
        let mut f = executor::spawn(f);
        loop {
            if self.turn() {
                match f.poll_future_notify(&self.notify, id)? {
                    Async::Ready(e) => return Ok(e),
                    Async::NotReady => {}
                }
            }
        }
    }

    /// "Turns" this event loop one tick.
    ///
    /// This'll block the current thread until something happens, and once an
    /// event happens this will act on that event.
    ///
    /// # Return value
    ///
    /// Returns `true` if the future passed to `run` should be polled or `false`
    /// otherwise.
    fn turn(&self) -> bool {
        let task_id = self.rx.recv().unwrap();
        if task_id == usize::max_value() {
            return true
        }

        // This may be a spurious wakeup so we're not guaranteed to have a
        // future associated with `task_id`, so do a fallible lookup.
        //
        // Note that we don't want to borrow `self.tasks` for too long so we
        // try to extract the future here and leave behind a tombstone future
        // which'll get replaced or removed later. This is how we support
        // spawn-in-run.
        let mut future = match self.tasks.borrow_mut().get_mut(task_id) {
            Some(&mut Slot::Running(ref mut future)) => future.take().unwrap(),
            Some(&mut Slot::Vacant { .. }) => return false,
            None => return false,
        };

        // Drive this future forward. If it's done we remove it and if it's not
        // done then we put it back in the tasks array.
        let done = match future.poll_future_notify(&self.notify, task_id) {
            Ok(Async::Ready(())) | Err(()) => true,
            Ok(Async::NotReady) => false,
        };
        let mut tasks = self.tasks.borrow_mut();
        if done {
            tasks[task_id] = Slot::Vacant { next_vacant: self.next_vacant.get() };
            self.next_vacant.set(task_id);
        } else {
            tasks[task_id] = Slot::Running(Some(future));
        }

        return false
    }
}

impl<F> Executor<F> for Core
    where F: Future<Item = (), Error = ()> + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        self.spawn(future);
        Ok(())
    }
}

struct MyNotify {
    // TODO: it's pretty unfortunate to use a `Mutex` here where the `Sender`
    //       itself is basically `Sync` as-is. Ideally this'd use something like
    //       an off-the-shelf mpsc queue as well as `thread::park` and
    //       `Thread::unpark`.
    tx: Mutex<mpsc::Sender<usize>>,
}

impl Notify for MyNotify {
    fn notify(&self, id: usize) {
        drop(self.tx.lock().unwrap().send(id));
    }
}