use std::iter;
use std::thread;
use crossbeam_deque::{Injector, Stealer, Worker};
use kv_log_macro::trace;
use lazy_static::lazy_static;
use super::sleepers::Sleepers;
use super::task;
use super::task_local;
use super::worker;
use super::{Builder, JoinHandle};
use crate::future::Future;
use crate::utils::abort_on_panic;
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(future).expect("cannot spawn future")
}
pub(crate) struct Pool {
pub injector: Injector<task::Runnable>,
pub stealers: Vec<Stealer<task::Runnable>>,
pub sleepers: Sleepers,
}
impl Pool {
pub fn spawn<F, T>(&self, future: F, builder: Builder) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let tag = task::Tag::new(builder.name);
let child_id = tag.task_id().as_u64();
let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0);
trace!("spawn", {
parent_id: parent_id,
child_id: child_id,
});
let future = unsafe { task_local::add_finalizer(future) };
let future = async move {
let res = future.await;
trace!("spawn completed", {
parent_id: parent_id,
child_id: child_id,
});
res
};
let (task, handle) = async_task::spawn(future, worker::schedule, tag);
task.schedule();
JoinHandle::new(handle)
}
pub fn find_task(&self, local: &Worker<task::Runnable>) -> Option<task::Runnable> {
local.pop().or_else(|| {
iter::repeat_with(|| {
self.injector
.steal_batch_and_pop(local)
.or_else(|| {
self.stealers
.iter()
.map(|s| s.steal_batch_and_pop(local))
.collect()
})
})
.find(|s| !s.is_retry())
.and_then(|s| s.success())
})
}
}
#[inline]
pub(crate) fn get() -> &'static Pool {
lazy_static! {
static ref POOL: Pool = {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
thread::Builder::new()
.name("async-task-driver".to_string())
.spawn(|| abort_on_panic(|| worker::main_loop(worker)))
.expect("cannot start a thread driving tasks");
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
};
}
&*POOL
}