#![doc = include_str!("../README.md")]
use std::{future::Future, time::Duration};
use async_scoped::spawner::{Blocker, FuncSpawner, Spawner};
use async_scoped::Scope;
use derive_builder::Builder;
use error::Result;
use once_cell::sync::Lazy;
use tokio::runtime::{Builder as TokioBuilder, Handle, Runtime};
use tokio::task as tokio_task;
use transport::{Transport, TransportReceiver, TransportSender};
pub mod error;
pub mod transport;
/// Process-wide fallback runtime used by executors that were not given an explicit [`Handle`].
///
/// Built lazily on first access as a multi-threaded Tokio runtime with four worker
/// threads, a 60-second thread keep-alive, and both the time and I/O drivers enabled.
///
/// # Panics
///
/// The first access panics if the runtime cannot be built.
static DEFAULT_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    TokioBuilder::new_multi_thread()
        .worker_threads(4)
        .thread_keep_alive(Duration::from_secs(60))
        .enable_time()
        .enable_io()
        .build()
        // Without a runtime nothing can be executed, so failing here is unrecoverable.
        .expect("failed to build the default Tokio runtime")
});
fn default_rt_handle() -> Handle {
DEFAULT_RUNTIME.handle().clone()
}
/// Executor that runs futures on a Tokio runtime and hands their results back through a transport.
///
/// A builder type is generated by `derive_builder` using the owned pattern; when
/// `rt_handle` is not supplied to the builder it falls back to `default_rt_handle()`.
#[derive(Clone, Builder)]
#[builder(pattern = "owned")]
pub struct TokioExecutor<TR> {
/// Handle of the Tokio runtime that spawned work is submitted to.
#[builder(default = "default_rt_handle()")]
rt_handle: Handle,
/// Transport used to create the channel that carries a future's result back to the caller.
transport: TR,
}
impl<TR> TokioExecutor<TR> {
    /// Creates an executor that delivers results through `transport` and runs work on
    /// the shared default runtime.
    pub fn new(transport: TR) -> Self {
        let rt_handle = default_rt_handle();
        Self {
            rt_handle,
            transport,
        }
    }
}
impl<TR> Default for TokioExecutor<TR>
where
    TR: Default,
{
    /// Builds an executor on the shared default runtime with a default-constructed transport.
    fn default() -> Self {
        // The runtime handle is obtained before the transport is constructed.
        let rt_handle = default_rt_handle();
        let transport = TR::default();
        Self {
            rt_handle,
            transport,
        }
    }
}
impl<TR: Transport> TokioExecutor<TR> {
pub fn exec<T: Send, F: Future<Output = T> + Send>(&self, f: F) -> Result<T> {
let (tx, rx) = self.transport.create_channel();
let mut scope = unsafe { Scope::create(self.spawner()) };
let future = async move {
tx.send(f.await)
.expect("Failed to send future result through transport sender")
};
scope.spawn(future);
rx.receive()
}
fn spawner(&self) -> TokioSpawner {
TokioSpawner(self.rt_handle.clone())
}
}
/// Adapter around a Tokio runtime [`Handle`] that implements the `async_scoped`
/// spawner and blocker traits.
struct TokioSpawner(Handle);
/// Spawns futures as tasks on the wrapped runtime handle.
// NOTE(review): `Spawner` is an unsafe trait; confirm this impl meets the contract
// stated in the async_scoped documentation.
unsafe impl<T> Spawner<T> for TokioSpawner
where
    T: Send + 'static,
{
    type FutureOutput = std::result::Result<T, tokio_task::JoinError>;
    type SpawnHandle = tokio_task::JoinHandle<T>;

    fn spawn<F>(&self, f: F) -> Self::SpawnHandle
    where
        F: Future<Output = T> + Send + 'static,
    {
        let Self(handle) = self;
        handle.spawn(f)
    }
}
/// Runs closures through the wrapped handle's `spawn_blocking`.
// NOTE(review): `FuncSpawner` is an unsafe trait; confirm this impl meets the contract
// stated in the async_scoped documentation.
unsafe impl<T> FuncSpawner<T> for TokioSpawner
where
    T: Send + 'static,
{
    type FutureOutput = std::result::Result<T, tokio_task::JoinError>;
    type SpawnHandle = tokio_task::JoinHandle<T>;

    fn spawn_func<F>(&self, f: F) -> Self::SpawnHandle
    where
        F: FnOnce() -> T + Send + 'static,
    {
        let Self(handle) = self;
        handle.spawn_blocking(f)
    }
}
unsafe impl Blocker for TokioSpawner {
fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
self.0.block_on(f)
}
}