use std::prelude::v1::*;
use alloc::sync::Arc;
use std::pin::Pin;
use std::task::{Context, Poll};
use async_trait::async_trait;
use futures_task::SpawnError;
use tokio::runtime::Handle;
use crate::{Future, Timer};
use crate::{Executor, ExecutorPerform};
#[cfg(feature = "exec_tokio")]
/// Adapter that exposes a Tokio runtime through this crate's executor and
/// timer traits.
///
/// Both fields are optional because the same type is built in two ways in
/// this file: with a runtime handle and no timer, or with a timer and no
/// handle.
pub struct TokioExecutor {
    /// Handle of the Tokio runtime used for spawning and blocking, if any.
    handle: Option<Handle>,
    /// Interval driving the timer API; `None` when no timer is active.
    timer: Option<tokio::time::Interval>,
}
#[cfg(feature = "exec_tokio")]
#[async_trait]
impl Executor<Handle> for TokioExecutor {
    /// Builds an executor bound to the Tokio runtime of the calling context.
    ///
    /// # Errors
    /// Returns `SpawnError::shutdown()` when no Tokio runtime context is
    /// available, instead of panicking.
    async fn context() -> Result<Self, SpawnError> {
        Self::context_sync()
    }

    /// Replaces the stored runtime handle with `executor`.
    async fn set_context(&mut self, executor: Handle) {
        self.set_context_sync(executor);
    }

    /// Async counterpart of `enter_context_sync`; see the note there.
    async fn enter_context(&self) {
        self.enter_context_sync();
    }

    /// Returns the stored runtime handle.
    ///
    /// # Panics
    /// Panics with "No executor available" if no handle is stored.
    async fn get_executor(&self) -> &Handle {
        self.get_executor_sync()
    }

    /// Builds an executor bound to the Tokio runtime of the calling context.
    ///
    /// # Errors
    /// Returns `SpawnError::shutdown()` when no Tokio runtime context is
    /// available. `Handle::current()` would panic in that situation, which
    /// defeats the purpose of the `Result` return type.
    fn context_sync() -> Result<Self, SpawnError> {
        let handle = Handle::try_current().map_err(|_| SpawnError::shutdown())?;
        Ok(Self {
            handle: Some(handle),
            timer: None,
        })
    }

    /// Replaces the stored runtime handle with `executor`.
    fn set_context_sync(&mut self, executor: Handle) {
        self.handle = Some(executor);
    }

    /// Enters the stored runtime's context, if a handle is stored.
    ///
    /// NOTE(review): the guard returned by `Handle::enter` is bound to `_`
    /// and therefore dropped at the end of the statement, so the runtime
    /// context does not stay entered once this method returns. Keeping it
    /// entered would require returning the guard, which the current trait
    /// signature does not allow.
    fn enter_context_sync(&self) {
        if let Some(handle) = &self.handle {
            let _ = handle.enter();
        }
    }

    /// Returns the stored runtime handle.
    ///
    /// # Panics
    /// Panics with "No executor available" if no handle is stored.
    fn get_executor_sync(&self) -> &Handle {
        self.handle.as_ref().expect("No executor available")
    }
}
#[cfg(feature = "exec_tokio")]
#[async_trait]
impl ExecutorPerform<Handle> for TokioExecutor {
    /// Spawns `future` with `tokio::task::spawn` and wraps the join handle in
    /// a shared [`Task`]. Always returns `Ok`.
    async fn spawn_in_context<F>(future: F) -> Result<Arc<dyn crate::Task<F::Output, Output = F::Output>>, SpawnError>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let join_handle = tokio::task::spawn(future);
        Ok(Arc::new(Task::new(join_handle)))
    }

    /// Spawns `future` on the runtime handle obtained from `executor` and
    /// wraps the join handle in a shared [`Task`]. Always returns `Ok`.
    async fn spawn_from_executor<E, F>(executor: &E, future: F) -> Result<Arc<dyn crate::Task<F::Output, Output = F::Output>>, SpawnError>
    where
        E: Executor<Handle>,
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let runtime = executor.get_executor().await;
        let join_handle = runtime.spawn(future);
        Ok(Arc::new(Task::new(join_handle)))
    }

    /// Runs `future` to completion with `Handle::block_on` on the handle
    /// obtained from `executor` and returns its output.
    ///
    /// NOTE(review): Tokio documents `Handle::block_on` as panicking when
    /// called from within an asynchronous execution context; confirm how
    /// callers drive this async method.
    async fn block_from_executor<E, F>(executor: &E, future: F) -> F::Output
    where
        E: Executor<Handle>,
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        executor.get_executor().await.block_on(future)
    }

    /// Synchronous variant of `spawn_in_context`: spawns `future` with
    /// `tokio::task::spawn` and wraps the join handle in a shared [`Task`].
    fn spawn_in_context_sync<F>(future: F) -> Result<Arc<dyn crate::Task<F::Output, Output = F::Output>>, SpawnError>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let join_handle = tokio::task::spawn(future);
        Ok(Arc::new(Task::new(join_handle)))
    }

    /// Synchronous variant of `spawn_from_executor`, using
    /// `get_executor_sync` to obtain the runtime handle.
    fn spawn_from_executor_sync<E, F>(executor: &E, future: F) -> Result<Arc<dyn crate::Task<F::Output, Output = F::Output>>, SpawnError>
    where
        E: Executor<Handle>,
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let runtime = executor.get_executor_sync();
        let join_handle = runtime.spawn(future);
        Ok(Arc::new(Task::new(join_handle)))
    }

    /// Blocks the calling thread on `future` using the handle returned by
    /// `get_executor_sync` and returns the future's output.
    fn block_from_executor_sync<E, F>(executor: &E, future: F) -> F::Output
    where
        E: Executor<Handle>,
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        executor.get_executor_sync().block_on(future)
    }
}
#[cfg(feature = "exec_tokio")]
#[async_trait]
impl Timer for TokioExecutor {
    /// Creates a timer with the given period in milliseconds.
    ///
    /// This currently builds the same repeating interval as `interval`;
    /// nothing in this implementation stops it after the first tick.
    async fn once(duration_millis: u32) -> Self {
        Self::interval(duration_millis).await
    }

    /// Creates a repeating timer with the given period in milliseconds.
    ///
    /// A period of `0` is treated as 1 ms (see `interval_sync`).
    async fn interval(duration_millis: u32) -> Self {
        Self::interval_sync(duration_millis)
    }

    /// Drops the underlying interval; a later `tick` will panic.
    async fn cancel(&mut self) {
        self.cancel_sync();
    }

    /// Synchronous variant of `once`; builds the same interval as
    /// `interval_sync`.
    fn once_sync(duration_millis: u32) -> Self {
        Self::interval_sync(duration_millis)
    }

    /// Creates a repeating timer with the given period in milliseconds.
    ///
    /// The returned value carries no runtime handle (`handle` is `None`).
    fn interval_sync(duration_millis: u32) -> Self {
        // `tokio::time::interval` panics when the period is zero, so a
        // request for 0 ms is clamped to the smallest supported period.
        let period_millis = u64::from(duration_millis).max(1);
        let period = tokio::time::Duration::from_millis(period_millis);
        Self {
            handle: None,
            timer: Some(tokio::time::interval(period)),
        }
    }

    /// Drops the underlying interval; a later `tick` will panic.
    fn cancel_sync(&mut self) {
        self.timer = None;
    }

    /// Waits for the next tick and returns the milliseconds elapsed since
    /// the instant reported by that tick.
    ///
    /// If the elapsed value does not fit in a `u32`, `0` is returned.
    ///
    /// # Panics
    /// Panics with "Timer has been dropped" when no interval is stored,
    /// for example after `cancel`.
    async fn tick(&mut self) -> u32 {
        let instant = match &mut self.timer {
            None => panic!("Timer has been dropped"),
            Some(timer) => timer.tick().await,
        };
        u32::try_from(instant.elapsed().as_millis()).unwrap_or_default()
    }
}
#[cfg(feature = "exec_tokio")]
impl Clone for TokioExecutor {
    /// Clones the runtime handle only; the copy never carries a timer, so
    /// its `timer` field is always `None`.
    fn clone(&self) -> Self {
        let handle = self.handle.as_ref().cloned();
        Self { handle, timer: None }
    }
}
#[cfg(feature = "exec_tokio")]
/// A spawned Tokio task, wrapping its `JoinHandle`.
pub struct Task<T> {
    /// Set by `detach`; when `false`, the trait-level `drop` method aborts
    /// the underlying task.
    detached: bool,
    /// Join handle of the spawned Tokio task.
    handle: tokio::task::JoinHandle<T>,
}
#[cfg(feature = "exec_tokio")]
impl<T> Task<T> {
    /// Wraps `handle` in a `Task` that starts out not detached.
    pub const fn new(handle: tokio::task::JoinHandle<T>) -> Self {
        let detached = false;
        Self { detached, handle }
    }
}
#[cfg(feature = "exec_tokio")]
impl<T> Future for Task<T> {
    type Output = T;

    /// Polls the wrapped join handle and forwards its successful output.
    ///
    /// # Panics
    /// Panics with "Task canceled or panicked: …" when the join handle
    /// resolves to an error.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        match Pin::new(&mut this.handle).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(task)) => Poll::Ready(task),
            Poll::Ready(Err(e)) => panic!("Task canceled or panicked: {}", e),
        }
    }
}
#[cfg(feature = "exec_tokio")]
#[async_trait]
impl<T: Send + 'static> crate::Task<T> for Task<T> {
    /// Awaits the wrapped join handle and returns the task's output.
    ///
    /// # Panics
    /// Panics with "Problem obtaining output: …" when the join handle
    /// resolves to an error.
    async fn output(self) -> T {
        self.handle
            .await
            .unwrap_or_else(|e| panic!("Problem obtaining output: {:?}", e))
    }

    /// Consumes the task after flagging it as detached; `abort` is never
    /// called on this path.
    fn detach(mut self) {
        self.detached = true;
    }

    /// Consumes the task, aborting the underlying Tokio task unless it has
    /// been flagged as detached.
    fn drop(self) {
        if self.detached {
            return;
        }
        self.handle.abort();
    }
}
#[cfg(all(feature = "hyper_executor", feature = "exec_tokio"))]
impl<F> hyper::rt::Executor<F> for TokioExecutor
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    /// Spawns `fut` as a fire-and-forget task for hyper.
    ///
    /// The task is spawned on this executor's own runtime handle when one is
    /// stored, so it lands on the configured runtime and does not depend on
    /// an ambient runtime context. Without a stored handle it falls back to
    /// spawning in the current context.
    fn execute(&self, fut: F) {
        match &self.handle {
            Some(handle) => {
                // The join handle is intentionally discarded; the task keeps running.
                let _ = handle.spawn(fut);
            }
            None => {
                let _ = Self::spawn_in_context_sync(fut);
            }
        }
    }
}