use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use crate::errors::KclError;
#[derive(Debug, Clone)]
pub struct AsyncTasks {
tx: mpsc::UnboundedSender<Result<(), KclError>>,
rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<Result<(), KclError>>>>,
spawned: Arc<AtomicUsize>,
notifier: Arc<Notify>,
}
unsafe impl Send for AsyncTasks {}
unsafe impl Sync for AsyncTasks {}
impl Default for AsyncTasks {
fn default() -> Self {
Self::new()
}
}
impl AsyncTasks {
pub fn new() -> Self {
console_error_panic_hook::set_once();
let (tx, rx) = mpsc::unbounded_channel();
Self {
tx,
rx: Arc::new(tokio::sync::Mutex::new(rx)),
spawned: Arc::new(AtomicUsize::new(0)),
notifier: Arc::new(Notify::new()),
}
}
pub async fn spawn<F>(&mut self, fut: F)
where
F: std::future::Future<Output = anyhow::Result<(), KclError>> + Send + 'static,
{
self.spawned.fetch_add(1, Ordering::Relaxed);
let tx = self.tx.clone();
let notify = self.notifier.clone();
wasm_bindgen_futures::spawn_local(async move {
console_error_panic_hook::set_once();
let _ = tx.send(fut.await); notify.notify_one(); });
}
pub async fn join_all(&mut self) -> anyhow::Result<(), KclError> {
let total = self.spawned.load(Ordering::Acquire);
if total == 0 {
return Ok(());
}
let mut done = 0;
while done < total {
{
let mut rx = self.rx.lock().await;
while let Ok(res) = rx.try_recv() {
done += 1;
res?; }
}
if done >= total {
break;
}
futures_lite::future::yield_now().await;
{
let mut rx = self.rx.lock().await;
while let Ok(res) = rx.try_recv() {
done += 1;
res?; if done >= total {
break;
}
}
}
if done < total {
self.notifier.notified().await;
}
}
Ok(())
}
pub async fn clear(&mut self) {
self.spawned.store(0, Ordering::Release);
let mut rx = self.rx.lock().await;
while rx.try_recv().is_ok() {}
}
}