use crate::*;
impl Default for Task {
#[inline(always)]
fn default() -> Self {
let worker_count: usize = Handle::try_current()
.map(|handle: Handle| handle.metrics().num_workers())
.unwrap_or_default()
.max(1);
Self::new(worker_count)
}
}
impl Drop for Task {
#[inline(always)]
fn drop(&mut self) {
self.shutdown();
}
}
impl Task {
pub fn new(worker_count: usize) -> Self {
let mut pool: Vec<UnboundedSender<AsyncTask>> = Vec::with_capacity(worker_count);
let counter: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
let shutdown: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let notifies: Vec<Arc<Notify>> =
(0..worker_count).map(|_| Arc::new(Notify::new())).collect();
for notify in notifies.iter().take(worker_count) {
let (sender, mut receiver): (UnboundedSender<AsyncTask>, UnboundedReceiver<AsyncTask>) =
unbounded_channel();
pool.push(sender);
let shutdown_clone: Arc<AtomicBool> = shutdown.clone();
let notify_clone: Arc<Notify> = notify.clone();
spawn_blocking(move || {
Handle::current().block_on(LocalSet::new().run_until(async move {
loop {
if shutdown_clone.load(atomic::Ordering::Relaxed) {
break;
}
match receiver.try_recv() {
Ok(task) => {
spawn_local(task);
}
Err(_) => {
notify_clone.notified().await;
}
}
}
}));
});
}
Self {
pool,
counter,
shutdown,
notifies,
}
}
pub fn try_spawn_local<F>(&self, index_opt: Option<usize>, hook: F) -> bool
where
F: Future<Output = ()> + Send + 'static,
{
if self.get_pool().is_empty() {
return false;
}
let index: usize = index_opt
.unwrap_or(self.get_counter().fetch_add(1, atomic::Ordering::Relaxed))
.wrapping_rem(self.get_pool().len());
if let Some(sender) = self.get_pool().get(index) {
let result: bool = sender.send(Box::pin(hook)).is_ok();
if result && let Some(notify) = self.get_notifies().get(index) {
notify.notify_one()
}
return result;
}
false
}
#[inline(always)]
pub fn shutdown(&self) {
self.get_shutdown().store(true, atomic::Ordering::Relaxed);
self.get_notifies()
.iter()
.for_each(|notify: &Arc<Notify>| notify.notify_one());
}
}