use async_task::Runnable;
use futures_lite::future::{Future, FutureExt as _};
use std::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
time::Instant,
};
#[derive(Clone)]
pub struct BackgroundExecutor {
pub background_sender: flume::Sender<Runnable>,
}
type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
impl BackgroundExecutor {
pub fn new() -> Self {
let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
let thread_count = std::thread::available_parallelism()
.map(|i| i.get())
.unwrap_or(1);
let _background_threads = (0..thread_count)
.map(|i| {
let receiver = background_receiver.clone();
std::thread::spawn(move || {
for runable in receiver {
let start = Instant::now();
runable.run();
log::trace!(
"background thread {}:run runable, took:{:?}",
i,
start.elapsed()
);
}
})
})
.collect::<Vec<_>>();
Self { background_sender }
}
pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
where
R: Send + 'static,
{
self.spawn_internal::<R>(Box::pin(future))
}
fn spawn_internal<R: Send + 'static>(&self, future: AnyFuture<R>) -> Task<R> {
let sender = self.background_sender.clone();
let (runnable, task) =
async_task::spawn(future, move |runnable| sender.send(runnable).unwrap());
runnable.schedule();
Task::Spawned(task)
}
}
#[must_use]
#[derive(Debug)]
pub enum Task<T> {
Ready(Option<T>),
Spawned(async_task::Task<T>),
}
impl<T> Task<T> {
pub fn ready(val: T) -> Self {
Task::Ready(Some(val))
}
pub fn detach(self) {
match self {
Task::Ready(_) => {}
Task::Spawned(task) => task.detach(),
}
}
}
impl<E, T> Task<Result<T, E>>
where
T: 'static,
E: 'static + Debug,
{
#[track_caller]
pub fn detach_and_log_err(self) {}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match unsafe { self.get_unchecked_mut() } {
Task::Ready(val) => Poll::Ready(val.take().unwrap()),
Task::Spawned(task) => task.poll(cx),
}
}
}