use std::prelude::v1::*;
use std::cell::{RefCell};
use std::rc::{Rc, Weak};
use futures_core::{Future, Poll, Async, Stream};
use futures_core::task::{Context, Waker, LocalMap};
use futures_core::executor::{Executor, SpawnError};
use futures_core::never::Never;
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;
use thread::ThreadNotify;
use enter;
use ThreadPool;
struct Task {
fut: Box<Future<Item = (), Error = Never>>,
map: LocalMap,
}
pub struct LocalPool {
pool: FuturesUnordered<Task>,
incoming: Rc<Incoming>,
}
#[derive(Clone)]
pub struct LocalExecutor {
incoming: Weak<Incoming>,
}
type Incoming = RefCell<Vec<Task>>;
fn run_executor<T, F: FnMut(&Waker) -> Async<T>>(mut f: F) -> T {
let _enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");
ThreadNotify::with_current(|thread| {
let waker = &Waker::from(thread.clone());
loop {
if let Async::Ready(t) = f(waker) {
return t;
}
thread.park();
}
})
}
impl LocalPool {
pub fn new() -> LocalPool {
LocalPool {
pool: FuturesUnordered::new(),
incoming: Default::default(),
}
}
pub fn executor(&self) -> LocalExecutor {
LocalExecutor {
incoming: Rc::downgrade(&self.incoming)
}
}
pub fn run(&mut self, exec: &mut Executor) {
run_executor(|waker| self.poll_pool(waker, exec))
}
pub fn run_until<F>(&mut self, mut f: F, exec: &mut Executor) -> Result<F::Item, F::Error>
where F: Future
{
let mut main_map = LocalMap::new();
run_executor(|waker| {
{
let mut main_cx = Context::new(&mut main_map, waker, exec);
match f.poll(&mut main_cx) {
Ok(Async::Ready(v)) => return Async::Ready(Ok(v)),
Err(err) => return Async::Ready(Err(err)),
_ => {}
}
}
self.poll_pool(waker, exec);
Async::Pending
})
}
fn poll_pool(&mut self, waker: &Waker, exec: &mut Executor) -> Async<()> {
let mut pool_map = LocalMap::new();
let mut pool_cx = Context::new(&mut pool_map, waker, exec);
loop {
{
let mut incoming = self.incoming.borrow_mut();
for task in incoming.drain(..) {
self.pool.push(task)
}
}
if let Ok(ret) = self.pool.poll_next(&mut pool_cx) {
if !self.incoming.borrow().is_empty() {
continue;
}
match ret {
Async::Pending => return Async::Pending,
Async::Ready(None) => return Async::Ready(()),
_ => {}
}
}
}
}
}
lazy_static! {
static ref GLOBAL_POOL: ThreadPool = ThreadPool::builder()
.name_prefix("block_on-")
.create()
.expect("Unable to create global thread-pool");
}
pub fn block_on<F: Future>(f: F) -> Result<F::Item, F::Error> {
let mut pool = LocalPool::new();
pool.run_until(f, &mut GLOBAL_POOL.clone())
}
pub fn block_on_stream<S: Stream>(s: S) -> BlockingStream<S> {
BlockingStream { stream: Some(s) }
}
pub struct BlockingStream<S: Stream> { stream: Option<S> }
impl<S: Stream> BlockingStream<S> {
pub fn into_inner(self) -> S {
self.stream.expect("BlockingStream shouldn't be empty")
}
}
impl<S: Stream> Iterator for BlockingStream<S> {
type Item = Result<S::Item, S::Error>;
fn next(&mut self) -> Option<Self::Item> {
let s = self.stream.take().expect("BlockingStream shouldn't be empty");
let (item, s) =
match LocalPool::new().run_until(s.next(), &mut GLOBAL_POOL.clone()) {
Ok((Some(item), s)) => (Some(Ok(item)), s),
Ok((None, s)) => (None, s),
Err((e, s)) => (Some(Err(e)), s),
};
self.stream = Some(s);
item
}
}
impl Executor for LocalExecutor {
fn spawn(&mut self, f: Box<Future<Item = (), Error = Never> + Send>) -> Result<(), SpawnError> {
self.spawn_task(Task {
fut: f,
map: LocalMap::new(),
})
}
fn status(&self) -> Result<(), SpawnError> {
if self.incoming.upgrade().is_some() {
Ok(())
} else {
Err(SpawnError::shutdown())
}
}
}
impl LocalExecutor {
fn spawn_task(&self, task: Task) -> Result<(), SpawnError> {
let incoming = self.incoming.upgrade().ok_or(SpawnError::shutdown())?;
incoming.borrow_mut().push(task);
Ok(())
}
pub fn spawn_local<F>(&mut self, f: F) -> Result<(), SpawnError>
where F: Future<Item = (), Error = Never> + 'static
{
self.spawn_task(Task {
fut: Box::new(f),
map: LocalMap::new(),
})
}
}
impl Future for Task {
type Item = ();
type Error = Never;
fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> {
self.fut.poll(&mut cx.with_locals(&mut self.map))
}
}