use std::cell::Cell;
use futures::sync::oneshot;
use futures::{Async, Future, Poll};
use tokio_threadpool;
use error::fail;
use error::Error;
#[doc(no_inline)]
pub use futures::sync::oneshot::SpawnHandle;
#[doc(no_inline)]
pub use tokio::executor::DefaultExecutor;
#[doc(no_inline)]
pub use tokio::spawn;
#[doc(no_inline)]
pub use tokio_threadpool::BlockingError;
#[derive(Debug, Copy, Clone)]
pub(crate) enum RuntimeMode {
ThreadPool,
CurrentThread,
}
thread_local!(static MODE: Cell<Option<RuntimeMode>> = Cell::new(None));
pub(crate) fn with_set_runtime_mode<R>(mode: RuntimeMode, f: impl FnOnce() -> R) -> R {
#[allow(missing_debug_implementations)]
struct SetOnDrop(Option<RuntimeMode>);
impl Drop for SetOnDrop {
fn drop(&mut self) {
MODE.with(|mode| mode.set(self.0));
}
}
let mode = MODE.with(|m| m.replace(Some(mode)));
let _prev = SetOnDrop(mode);
match mode {
Some(..) => panic!("The runtime mode has already set on the current context."),
None => f(),
}
}
pub fn blocking<R>(f: impl FnOnce() -> R) -> Poll<R, BlockingError> {
match MODE.with(|mode| mode.get()) {
Some(RuntimeMode::ThreadPool) | None => tokio_threadpool::blocking(f),
Some(RuntimeMode::CurrentThread) => Ok(Async::Ready(f())),
}
}
pub fn blocking_section<F, T, E>(f: F) -> BlockingSection<F>
where
F: FnOnce() -> Result<T, E>,
E: Into<Error>,
{
BlockingSection { op: Some(f) }
}
#[derive(Debug)]
pub struct BlockingSection<F> {
op: Option<F>,
}
impl<F, T, E> Future for BlockingSection<F>
where
F: FnOnce() -> Result<T, E>,
E: Into<Error>,
{
type Item = T;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let result = try_ready!(
blocking(|| {
let op = self.op.take().unwrap();
op()
}).map_err(fail)
);
result.map(Async::Ready).map_err(Into::into)
}
}
#[inline]
pub fn spawn_with_handle<F>(future: F) -> SpawnHandle<F::Item, F::Error>
where
F: Future + Send + 'static,
F::Item: Send + 'static,
F::Error: Send + 'static,
{
oneshot::spawn(future, &DefaultExecutor::current())
}