use crate::*;
impl Clone for Task {
#[inline(always)]
fn clone(&self) -> Self {
Self {
pool: self.get_pool().clone(),
notify: self.get_notify(),
counter: AtomicUsize::new(self.get_counter().load(atomic::Ordering::Relaxed)),
shutdown: self.get_shutdown(),
}
}
}
impl Default for Task {
#[inline(always)]
fn default() -> Self {
let worker_count: usize = Handle::try_current()
.map(|handle: Handle| handle.metrics().num_workers())
.unwrap_or_default()
.max(1);
let notify: &'static Notify = Box::leak(Box::new(Notify::new()));
let shutdown: &'static AtomicBool = Box::leak(Box::new(AtomicBool::new(false)));
let mut pool: Vec<UnboundedSender<AsyncTask>> = Vec::with_capacity(worker_count);
for _ in 0..worker_count {
let (sender, mut receiver): (UnboundedSender<AsyncTask>, UnboundedReceiver<AsyncTask>) =
unbounded_channel();
pool.push(sender);
spawn_blocking(move || {
Handle::current().block_on(LocalSet::new().run_until(async move {
loop {
if shutdown.load(atomic::Ordering::Relaxed) {
break;
}
match receiver.try_recv() {
Ok(task) => {
spawn_local(task);
}
Err(_) => {
notify.notified().await;
}
}
}
}));
});
}
Self {
pool,
notify,
counter: AtomicUsize::new(0),
shutdown,
}
}
}
impl Drop for Task {
#[inline(always)]
fn drop(&mut self) {
self.shutdown();
}
}
impl Task {
pub fn try_spawn_local<F>(&self, index_opt: Option<usize>, hook: F) -> bool
where
F: Future<Output = ()> + Send + 'static,
{
if self.get_pool().is_empty() {
return false;
}
let index: usize = index_opt
.unwrap_or(self.get_counter().fetch_add(1, atomic::Ordering::Relaxed))
.wrapping_rem(self.get_pool().len());
if let Some(sender) = self.get_pool().get(index) {
let result: bool = sender.send(Box::pin(hook)).is_ok();
if result {
self.get_notify().notify_one();
}
return result;
}
false
}
#[inline(always)]
pub fn shutdown(&self) {
self.get_shutdown().store(true, atomic::Ordering::Relaxed);
self.get_notify().notify_waiters();
}
}