artwrap 0.3.3

Minimal asynchronous smol/wasm wrapper for unified accessing selected primitives.
Documentation
use std::{
    sync::{
        Arc, LazyLock,
        atomic::{AtomicBool, Ordering},
    },
    thread,
};

use async_executor::{Executor, LocalExecutor, Task};
pub(crate) use async_io::block_on;
pub(crate) use blocking::unblock;
use dashmap::DashMap;
use event_listener::Event;

thread_local! {
    static LOCAL_EXECUTOR: LocalExecutor<'static> = LocalExecutor::new();
}

pub(crate) fn spawn_local<F>(f: F) -> Task<F::Output>
where
    F: Future + 'static,
{
    LOCAL_EXECUTOR.with(|executor| executor.spawn(f))
}

static EXECUTOR: LazyLock<Arc<Executor<'static>>> = LazyLock::new(|| Arc::new(Executor::new()));

static STATS_ACTIVE: AtomicBool = AtomicBool::new(false);
static STATS: LazyLock<DashMap<thread::ThreadId, u64>> = LazyLock::new(|| DashMap::new());

pub fn executor() -> &'static Executor<'static> {
    &*EXECUTOR
}

pub fn stats_active() -> &'static AtomicBool {
    &STATS_ACTIVE
}

pub fn stats() -> &'static DashMap<thread::ThreadId, u64> {
    &STATS
}

pub(crate) fn spawn<F>(f: F) -> Task<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send,
{
    if stats_active().load(Ordering::Relaxed) {
        let id = thread::current_id();
        *stats().entry(id).or_default() += 1;
    }

    executor().spawn(f)
}

pub fn with_main<T, F: FnOnce() -> T>(f: F) -> T {
    with_main_async(|| async { f() })
}

pub fn with_main_async<T, F: AsyncFnOnce() -> T>(f: F) -> T {
    with_thread_pool(|| block_on(executor().run(f())))
}

fn with_thread_pool<T>(f: impl FnOnce() -> T) -> T {
    let stopper = WaitForStop::new();

    thread::scope(|scope| {
        let num_threads = thread::available_parallelism().map_or(1, |num| num.get());
        for i in 0..num_threads {
            let stopper = &stopper;

            thread::Builder::new()
                .name(format!("artwrap-worker-{i}"))
                .spawn_scoped(scope, || {
                    block_on(executor().run(stopper.wait()));
                })
                .expect("failed to spawn thread");
        }

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));

        stopper.stop();

        match result {
            Ok(value) => value,
            Err(err) => std::panic::resume_unwind(err),
        }
    })
}

struct WaitForStop {
    stopped: AtomicBool,
    events: Event,
}

impl WaitForStop {
    #[inline]
    fn new() -> Self {
        Self {
            stopped: AtomicBool::new(false),
            events: Event::new(),
        }
    }

    #[inline]
    async fn wait(&self) {
        loop {
            if self.stopped.load(Ordering::Relaxed) {
                return;
            }

            event_listener::listener!(&self.events => listener);

            if self.stopped.load(Ordering::Acquire) {
                return;
            }

            listener.await;
        }
    }

    #[inline]
    fn stop(&self) {
        self.stopped.store(true, Ordering::SeqCst);
        self.events.notify_additional(usize::MAX);
    }
}