use crate::enter;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_task::{waker_ref, ArcWake};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use futures_util::pin_mut;
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;
use std::cell::RefCell;
use std::ops::{Deref, DerefMut};
use std::rc::{Rc, Weak};
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::thread::{self, Thread};
#[derive(Debug)]
pub struct LocalPool {
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
incoming: Rc<Incoming>,
}
#[derive(Clone, Debug)]
pub struct LocalSpawner {
incoming: Weak<Incoming>,
}
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
pub(crate) struct ThreadNotify {
thread: Thread,
unparked: AtomicBool,
}
thread_local! {
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
thread: thread::current(),
unparked: AtomicBool::new(false),
});
}
impl ArcWake for ThreadNotify {
fn wake_by_ref(arc_self: &Arc<Self>) {
let unparked = arc_self.unparked.swap(true, Ordering::Relaxed);
if !unparked {
arc_self.thread.unpark();
}
}
}
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = f(&mut cx) {
return t;
}
let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
if !unparked {
thread::park();
thread_notify.unparked.store(false, Ordering::Release);
}
}
})
}
fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
f(&mut cx)
})
}
impl LocalPool {
pub fn new() -> LocalPool {
LocalPool {
pool: FuturesUnordered::new(),
incoming: Default::default(),
}
}
pub fn spawner(&self) -> LocalSpawner {
LocalSpawner {
incoming: Rc::downgrade(&self.incoming),
}
}
pub fn run(&mut self) {
run_executor(|cx| self.poll_pool(cx))
}
pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
pin_mut!(future);
run_executor(|cx| {
{
let result = future.as_mut().poll(cx);
if let Poll::Ready(output) = result {
return Poll::Ready(output);
}
}
let _ = self.poll_pool(cx);
Poll::Pending
})
}
pub fn try_run_one(&mut self) -> bool {
poll_executor(|ctx| {
loop {
let ret = self.poll_pool_once(ctx);
if let Poll::Ready(Some(_)) = ret {
return true;
}
if self.incoming.borrow().is_empty() {
return false;
}
}
})
}
pub fn run_until_stalled(&mut self) {
poll_executor(|ctx| {
let _ = self.poll_pool(ctx);
});
}
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop {
let ret = self.poll_pool_once(cx);
if !self.incoming.borrow().is_empty() {
continue;
}
match ret {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
_ => {}
}
}
}
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
{
let mut incoming = self.incoming.borrow_mut();
for task in incoming.drain(..) {
self.pool.push(task)
}
}
self.pool.poll_next_unpin(cx)
}
}
impl Default for LocalPool {
fn default() -> Self {
Self::new()
}
}
pub fn block_on<F: Future>(f: F) -> F::Output {
pin_mut!(f);
run_executor(|cx| f.as_mut().poll(cx))
}
pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
BlockingStream { stream }
}
#[derive(Debug)]
pub struct BlockingStream<S: Stream + Unpin> {
stream: S,
}
impl<S: Stream + Unpin> Deref for BlockingStream<S> {
type Target = S;
fn deref(&self) -> &Self::Target {
&self.stream
}
}
impl<S: Stream + Unpin> DerefMut for BlockingStream<S> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.stream
}
}
impl<S: Stream + Unpin> BlockingStream<S> {
pub fn into_inner(self) -> S {
self.stream
}
}
impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
type Item = S::Item;
fn next(&mut self) -> Option<Self::Item> {
LocalPool::new().run_until(self.stream.next())
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
impl Spawn for LocalSpawner {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future.into());
Ok(())
} else {
Err(SpawnError::shutdown())
}
}
fn status(&self) -> Result<(), SpawnError> {
if self.incoming.upgrade().is_some() {
Ok(())
} else {
Err(SpawnError::shutdown())
}
}
}
impl LocalSpawn for LocalSpawner {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future);
Ok(())
} else {
Err(SpawnError::shutdown())
}
}
fn status_local(&self) -> Result<(), SpawnError> {
if self.incoming.upgrade().is_some() {
Ok(())
} else {
Err(SpawnError::shutdown())
}
}
}