use std::cell::{Cell, UnsafeCell};
use std::fmt::Arguments;
use std::mem;
use std::panic::{self, AssertUnwindSafe};
use std::pin::Pin;
use std::ptr;
use std::thread;
use crossbeam_channel::{unbounded, Sender};
use futures::future::FutureExt;
use lazy_static::lazy_static;
use super::task;
use super::{JoinHandle, Task};
use crate::future::Future;
use crate::io;
pub fn current() -> Task {
get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task")
}
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
spawn_with_builder(Builder::new(), future, "spawn")
}
pub fn block_on<F, T>(future: F) -> T
where
F: Future<Output = T> + Send,
T: Send,
{
unsafe {
let out = &mut UnsafeCell::new(None);
let future = {
let out = out.get();
async move {
let v = AssertUnwindSafe(future).catch_unwind().await;
*out = Some(v);
}
};
futures::pin_mut!(future);
let future = mem::transmute::<
Pin<&mut dyn Future<Output = ()>>,
Pin<&'static mut (dyn Future<Output = ()> + Send)>,
>(future);
futures::executor::block_on(spawn_with_builder(Builder::new(), future, "block_on"));
match (*out.get()).take().unwrap() {
Ok(v) => v,
Err(err) => panic::resume_unwind(err),
}
}
}
#[derive(Debug)]
pub struct Builder {
pub(crate) name: Option<String>,
}
impl Builder {
pub fn new() -> Builder {
Builder { name: None }
}
pub fn name(mut self, name: String) -> Builder {
self.name = Some(name);
self
}
pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Ok(spawn_with_builder(self, future, "spawn"))
}
}
pub(crate) fn spawn_with_builder<F, T>(
builder: Builder,
future: F,
fn_name: &'static str,
) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let Builder { name } = builder;
type Job = async_task::Task<task::Tag>;
lazy_static! {
static ref QUEUE: Sender<Job> = {
let (sender, receiver) = unbounded::<Job>();
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::Builder::new()
.name("async-task-driver".to_string())
.spawn(|| {
TAG.with(|tag| {
for job in receiver {
tag.set(job.tag());
abort_on_panic(|| job.run());
tag.set(ptr::null());
}
});
})
.expect("cannot start a thread driving tasks");
}
sender
};
}
let tag = task::Tag::new(name);
let schedule = |job| QUEUE.send(job).unwrap();
let child_id = tag.task_id().as_u64();
let parent_id = get_task(|t| t.id().as_u64()).unwrap_or(0);
print(
format_args!("{}", fn_name),
LogData {
parent_id,
child_id,
},
);
let future = async move {
let res = future.await;
abort_on_panic(|| get_task(|task| task.metadata().local_map.clear()));
print(
format_args!("{} completed", fn_name),
LogData {
parent_id,
child_id,
},
);
res
};
let (task, handle) = async_task::spawn(future, schedule, tag);
task.schedule();
JoinHandle::new(handle)
}
thread_local! {
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
}
pub(crate) fn get_task<F: FnOnce(&Task) -> R, R>(f: F) -> Option<R> {
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
match res {
Ok(Some(val)) => Some(val),
Ok(None) | Err(_) => None,
}
}
#[inline]
fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
struct Bomb;
impl Drop for Bomb {
fn drop(&mut self) {
std::process::abort();
}
}
let bomb = Bomb;
let t = f();
mem::forget(bomb);
t
}
struct LogData {
parent_id: u64,
child_id: u64,
}
impl<'a> log::kv::Source for LogData {
fn visit<'kvs>(
&'kvs self,
visitor: &mut dyn log::kv::Visitor<'kvs>,
) -> Result<(), log::kv::Error> {
visitor.visit_pair("parent_id".into(), self.parent_id.into())?;
visitor.visit_pair("child_id".into(), self.child_id.into())?;
Ok(())
}
}
fn print(msg: Arguments<'_>, key_values: impl log::kv::Source) {
log::logger().log(
&log::Record::builder()
.args(msg)
.key_values(&key_values)
.level(log::Level::Trace)
.target(module_path!())
.module_path(Some(module_path!()))
.file(Some(file!()))
.line(Some(line!()))
.build(),
);
}