my_ecs/default/
worker.rs

1//! Provides [`Work`](crate::ecs::worker::Work) implementations and worker pool.
2//!
3//! If build target is native, the module exposes worker type which based on
4//! [`std::thread::Thread`]. While the build target is web, the module exposes
5//! web worker instead.
6
7/// Common interface for worker pool implementations.
8pub trait AsWorkerPool<W>: From<Vec<W>> + Into<Vec<W>> {
9    /// Creates an empty worker pool.
10    ///
11    /// # Examples
12    ///
13    /// ```
14    /// use my_ecs::prelude::*;
15    ///
16    /// let pool = WorkerPool::new();
17    /// assert!(pool.is_empty());
18    /// ```
19    fn new() -> Self;
20
21    /// Creates worker pool with workers as many as number of available logical
22    /// cpus.
23    ///
24    /// Number of logical cpus depends on platform which this crate runs on.
25    /// This method guarantees the returned worker pool to have at least one
26    /// worker in it even if it failed to get the number of logical cpus.
27    ///
28    /// # Examples
29    ///
30    /// ```
31    /// use my_ecs::prelude::*;
32    ///
33    /// let pool = WorkerPool::with_all_cpus();
34    /// assert!(!pool.is_empty());
35    /// ```
36    fn with_all_cpus() -> Self {
37        #[cfg(not(target_arch = "wasm32"))]
38        let len = {
39            std::thread::available_parallelism()
40                .unwrap_or(unsafe { std::num::NonZeroUsize::new_unchecked(1) })
41                .get()
42        };
43
44        #[cfg(target_arch = "wasm32")]
45        let len = crate::util::web::available_parallelism();
46
47        Self::with_len(len)
48    }
49
50    /// Creates worker pool with `len` workers.
51    ///
52    /// # Examples
53    ///
54    /// ```
55    /// use my_ecs::prelude::*;
56    ///
57    /// let pool = WorkerPool::with_len(1);
58    /// assert_eq!(pool.len() , 1);
59    /// ```
60    fn with_len(len: usize) -> Self;
61
62    /// Returns number of workers in the worker pool.
63    fn len(&self) -> usize;
64
65    /// Returns true if the worker pool doesn't contain any workers in it.
66    fn is_empty(&self) -> bool {
67        self.len() == 0
68    }
69
70    /// Appends a worker in the worker pool.
71    ///
72    /// # Examples
73    ///
74    /// ```
75    /// use my_ecs::prelude::*;
76    ///
77    /// let mut pool = WorkerPool::new();
78    /// assert!(pool.is_empty());
79    ///
80    /// let worker = WorkerBuilder::new("name").spawn().unwrap();
81    /// pool.append(worker);
82    /// assert_eq!(pool.len(), 1);
83    /// ```
84    fn append(&mut self, worker: W);
85}
86
87#[cfg(not(target_arch = "wasm32"))]
88pub use non_web::*;
89
90#[cfg(target_arch = "wasm32")]
91pub use web::*;
92
93#[cfg(not(target_arch = "wasm32"))]
94mod non_web {
95    use super::*;
96    use crate::{ds::ManagedConstPtr, ecs::prelude::*, util};
97    use std::{
98        fmt,
99        sync::mpsc::{self, Sender},
100        thread::{Builder, JoinHandle},
101    };
102
103    /// A data type holding [`Worker`]s.
104    #[derive(Debug)]
105    #[repr(transparent)]
106    pub struct WorkerPool {
107        workers: Vec<Worker>,
108    }
109
110    impl AsWorkerPool<Worker> for WorkerPool {
111        fn new() -> Self {
112            Self {
113                workers: Vec::new(),
114            }
115        }
116
117        fn with_len(len: usize) -> Self {
118            let mut this = Self::new();
119
120            let mut name = "worker0".to_owned();
121            for _ in 0..len {
122                let worker = WorkerBuilder::new(&name).spawn().unwrap();
123                this.append(worker);
124                util::str::increase_rnumber(&mut name);
125            }
126
127            this
128        }
129
130        fn len(&self) -> usize {
131            self.workers.len()
132        }
133
134        fn append(&mut self, worker: Worker) {
135            self.workers.push(worker);
136        }
137    }
138
139    impl Default for WorkerPool {
140        fn default() -> Self {
141            Self::new()
142        }
143    }
144
145    impl From<Vec<Worker>> for WorkerPool {
146        fn from(value: Vec<Worker>) -> Self {
147            Self { workers: value }
148        }
149    }
150
151    impl From<WorkerPool> for Vec<Worker> {
152        fn from(value: WorkerPool) -> Self {
153            value.workers
154        }
155    }
156
157    /// [`Worker`] builder.
158    ///
159    /// You can spawn [`Worker`] from this builder.
160    #[derive(Debug)]
161    pub struct WorkerBuilder<'a> {
162        inner: Builder,
163        name: &'a str,
164    }
165
166    impl<'a> WorkerBuilder<'a> {
167        /// Creates a new [`WorkerBuilder`].
168        pub fn new(name: &'a str) -> Self {
169            Self {
170                inner: Builder::new().name(name.to_owned()),
171                name,
172            }
173        }
174
175        /// Sets worker's stack size in bytes.
176        pub fn stack_size(self, size: usize) -> Self {
177            Self {
178                inner: self.inner.stack_size(size),
179                name: self.name,
180            }
181        }
182
183        /// Spawns a new [`Worker`] from the builder.
184        pub fn spawn(self) -> Result<Worker, std::io::Error> {
185            Worker::spawn(self)
186        }
187    }
188
189    /// Worker handle.
190    ///
191    /// When [`Worker`] is dropped, it waits for its associated worker to
192    /// finish.
193    pub struct Worker {
194        name: Box<str>,
195        tx: Sender<Option<ManagedConstPtr<SubContext>>>,
196        join_handle: Option<JoinHandle<()>>,
197    }
198
199    impl Worker {
200        fn spawn(builder: WorkerBuilder) -> Result<Self, std::io::Error> {
201            let (tx, rx) = mpsc::channel::<Option<ManagedConstPtr<SubContext>>>();
202            let join_handle = builder.inner.spawn(move || {
203                while let Some(cx) = rx.recv().unwrap() {
204                    SubContext::execute(cx);
205                }
206            })?;
207            Ok(Self {
208                name: builder.name.into(),
209                tx,
210                join_handle: Some(join_handle),
211            })
212        }
213    }
214
215    impl fmt::Debug for Worker {
216        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217            f.debug_struct("Worker")
218                .field("name", &self.name)
219                .finish_non_exhaustive()
220        }
221    }
222
223    impl Drop for Worker {
224        fn drop(&mut self) {
225            // `rx` could be broken if worker panics.
226            let _ = self.tx.send(None);
227            // Safety: `join_handle` must have been filled.
228            let join_handle = unsafe { self.join_handle.take().unwrap_unchecked() };
229            let _ = join_handle.join();
230        }
231    }
232
233    impl Work for Worker {
234        fn unpark(&mut self, cx: ManagedConstPtr<SubContext>) -> bool {
235            let res = self.tx.send(Some(cx));
236            res.is_ok()
237        }
238
239        fn park(&mut self) -> bool {
240            true
241        }
242
243        fn name(&self) -> &str {
244            &self.name
245        }
246    }
247}
248
249#[cfg(target_arch = "wasm32")]
250mod web {
251    use super::*;
252    use crate::{
253        ds::{ManagedConstPtr, NonNullExt},
254        ecs::prelude::*,
255        util::{macros::impl_from_for_enum, prelude::*},
256    };
257    use std::{
258        cell::RefCell,
259        collections::VecDeque,
260        fmt,
261        future::Future,
262        mem,
263        mem::ManuallyDrop,
264        ops::{Deref, DerefMut},
265        pin::Pin,
266        rc::Rc,
267        sync::{
268            atomic::{AtomicBool, Ordering},
269            Arc, OnceLock,
270        },
271    };
272    use wasm_bindgen::prelude::*;
273
274    /// A data type holding [`Worker`]s.
275    #[derive(Debug)]
276    #[repr(transparent)]
277    pub struct WorkerPool {
278        workers: Vec<Worker>,
279    }
280
281    impl WorkerPool {
282        fn clear(&mut self) {
283            self.workers.clear();
284        }
285    }
286
287    impl AsWorkerPool<Worker> for WorkerPool {
288        fn new() -> Self {
289            Self {
290                workers: Vec::new(),
291            }
292        }
293
294        fn with_len(len: usize) -> Self {
295            let mut this = Self::new();
296
297            let mut name = "worker0".to_owned();
298            for _ in 0..len {
299                let worker = WorkerBuilder::new(&name).spawn().unwrap();
300                this.append(worker);
301                str_util::increase_rnumber(&mut name);
302            }
303
304            this
305        }
306
307        fn len(&self) -> usize {
308            self.workers.len()
309        }
310
311        fn append(&mut self, worker: Worker) {
312            self.workers.push(worker);
313        }
314    }
315
316    impl Default for WorkerPool {
317        fn default() -> Self {
318            Self::new()
319        }
320    }
321
322    impl From<Vec<Worker>> for WorkerPool {
323        fn from(value: Vec<Worker>) -> Self {
324            Self { workers: value }
325        }
326    }
327
328    impl From<WorkerPool> for Vec<Worker> {
329        fn from(value: WorkerPool) -> Self {
330            value.workers
331        }
332    }
333
334    /// [`MainWorker`] builder.
335    ///
336    /// You can spawn [`MainWorker`] from this builder.
337    #[derive(Debug)]
338    #[repr(transparent)]
339    pub struct MainWorkerBuilder<'a> {
340        inner: WorkerBuilder<'a>,
341    }
342
343    impl<'a> MainWorkerBuilder<'a> {
344        /// Creates a [`WorkerBuilder`] with default name 'main-worker'.
345        pub fn new() -> Self {
346            let inner = WorkerBuilder::new("main-worker").with_listen("mainOnMessage");
347            Self { inner }
348        }
349
350        /// Creates a [`WorkerBuilder`] with the given name.
351        pub fn with_name(self, name: &'a str) -> Self {
352            Self {
353                inner: self.inner.with_name(name),
354            }
355        }
356
357        /// Creates a [`WorkerBuilder`] with the given name of initialization
358        /// function.
359        ///
360        /// Default initialization function is 'mainOnMessage'.
361        pub fn with_init(self, init: &'a str) -> Self {
362            Self {
363                inner: self.inner.with_init(init),
364            }
365        }
366
367        /// Spawns a [`MainWorker`].
368        pub fn spawn(self) -> Result<MainWorker, JsValue> {
369            MainWorker::spawn(self)
370        }
371    }
372
373    impl<'a> Default for MainWorkerBuilder<'a> {
374        fn default() -> Self {
375            Self::new()
376        }
377    }
378
379    /// Main worker handle.
380    ///
381    /// Main worker is a web worker that is parent of other sub workers. Main
382    /// worker is responsible for communication with window context, spawning
383    /// sub workers, creating ecs instance, and running ecs. You can think of
384    /// main worker as main thread where 'main' function runs on native
385    /// environment.
386    ///
387    /// # Common worker hierarchy
388    ///
389    /// window - main worker - sub workers
390    ///
391    /// # Why we need main worker
392    ///
393    /// Ecs instance blocks sometimes to wait for messages from sub workers. But
394    /// browsers doesn't allow us to block on window context. So we need an
395    /// extra web worker.
396    #[derive(Debug)]
397    #[repr(transparent)]
398    pub struct MainWorker {
399        inner: Worker,
400    }
401
402    impl MainWorker {
403        fn spawn(builder: MainWorkerBuilder) -> Result<Self, JsValue> {
404            Ok(Self {
405                inner: builder.inner.spawn()?,
406            })
407        }
408
409        /// Spawns sub workers as many as the given number on the main worker.
410        ///
411        /// Sub workers are behind the main worker so that you cannot
412        /// communicate with them directly.
413        ///
414        /// # Examples
415        ///
416        /// ```
417        /// use my_ecs::prelude::*;
418        ///
419        /// let main = MainWorkerBuilder::new().spawn().unwrap();
420        /// let num_cpus = web_util::available_parallelism();
421        /// main.spawn_children(num_cpus);
422        /// ```
423        pub fn spawn_children(&self, num: usize) {
424            self.delegate(
425                |arg| {
426                    let num: f64 = arg.unchecked_into_f64();
427                    let num = num as usize;
428                    JS_MAIN_CX.with_borrow_mut(|cx| {
429                        for _ in 0..num {
430                            str_util::increase_rnumber(&mut cx.child_name);
431                            let worker = WorkerBuilder::new(&cx.child_name).spawn().unwrap();
432                            cx.pool.append(worker);
433                        }
434                    });
435                },
436                num.into(),
437            );
438        }
439
440        /// Sends the main worker a function that initializes ecs instance.
441        ///
442        /// The main worker will execute the function and store the returned ecs
443        /// instance once it's ready.
444        ///
445        /// # Examples
446        ///
447        /// ```
448        /// use my_ecs::prelude::*;
449        ///
450        /// let main = MainWorkerBuilder::new().spawn().unwrap();
451        /// main.init_app(|pool| {
452        ///     let num_workers = pool.len();
453        ///     Ecs::default(pool, [num_workers])
454        /// });
455        /// ```
456        pub fn init_app<F, R>(&self, f: F)
457        where
458            F: FnOnce(WorkerPool) -> R + 'static,
459            R: Into<LeakedEcsApp>,
460        {
461            let f = move |pool: WorkerPool| -> LeakedEcsApp { f(pool).into() };
462            let f: DynFnOnce<WorkerPool, LeakedEcsApp> = ManuallyDrop::new(Box::new(f));
463            helper(self, f);
464
465            // === Internal helper functions ===
466
467            fn helper(this: &MainWorker, f: DynFnOnce<WorkerPool, LeakedEcsApp>) {
468                let arg = DynFnOnceCodec::encode_into_array(f);
469                this.delegate(
470                    |arg| {
471                        JS_MAIN_CX.with_borrow_mut(|cx| {
472                            let arg: js_sys::Uint32Array = arg.unchecked_into();
473                            // Safety: `arg` is `f`.
474                            unsafe {
475                                let f = DynFnOnceCodec::decode_from_array(&arg)
476                                    .cast::<WorkerPool, LeakedEcsApp>();
477                                cx.schedule_fn(f);
478                            };
479                            cx.consume_if_ready();
480                        });
481                    },
482                    arg.into(),
483                );
484            }
485        }
486
487        /// Sends the main worker a function that accesses ecs instance.
488        ///
489        /// But if main worker doesn't have ecs instance, the function will be
490        /// dropped without execution. Don't forget to call
491        /// [`MainWorker::init_app`] beforehand.
492        ///
493        /// # Examples
494        ///
495        /// ```
496        /// use my_ecs::prelude::*;
497        ///
498        /// let main = MainWorkerBuilder::new().spawn().unwrap();
499        ///
500        /// main.init_app(|pool| {
501        ///     let num_workers = pool.len();
502        ///     Ecs::default(pool, [num_workers])
503        /// });
504        ///
505        /// main.with_app(|app| { /* ... */ });
506        /// ```
507        pub fn with_app<F>(&self, f: F)
508        where
509            F: FnOnce(EcsExt<'static>) + 'static,
510        {
511            let f: DynFnOnce<EcsExt<'static>, ()> = ManuallyDrop::new(Box::new(f));
512            helper(self, f);
513
514            // === Internal helper functions ===
515
516            fn helper(this: &MainWorker, f: DynFnOnce<EcsExt<'static>, ()>) {
517                let arg = DynFnOnceCodec::encode_into_array(f);
518                this.delegate(
519                    |arg| {
520                        JS_MAIN_CX.with_borrow_mut(|cx| {
521                            let arg: js_sys::Uint32Array = arg.unchecked_into();
522                            // Safety: `arg` is `f`.
523                            unsafe {
524                                let f = DynFnOnceCodec::decode_from_array(&arg)
525                                    .cast::<EcsExt<'static>, ()>();
526                                cx.schedule_fn(f);
527                            };
528                            cx.consume_if_ready();
529                        });
530                    },
531                    arg.into(),
532                );
533            }
534        }
535
536        /// Executes the given future on the main worker using JS runtime.
537        ///
538        /// This method doesn't block, but the main worker stacks other
539        /// requested functions rather than executing them until the given
540        /// future is completed.
541        ///
542        /// Web APIs which return `Promise` should be called carefully. They
543        /// eagerly put tasks to JS runtime queue, and the tasks cannot make
544        /// more progress while Rust wasm holds CPU. That means that Rust wasm
545        /// should stop its processing to complete JS `Promise`. In other words,
546        /// Rust wasm cannot wait to be woken up by `Promise`.
547        ///
548        /// # Examples
549        ///
550        /// ```
551        /// use my_ecs::prelude::*;
552        /// use web_sys::{WorkerGlobalScope, Response};
553        /// use wasm_bindgen_futures::JsFuture;
554        ///
555        /// let main = MainWorkerBuilder::new().spawn().unwrap();
556        ///
557        /// main.init_app(|pool| {
558        ///     let num_workers = pool.len();
559        ///     Ecs::default(pool, [num_workers])
560        /// });
561        ///
562        /// main.with_app_await(|app| async {
563        ///     let global: WorkerGlobalScope = js_sys::global().unchecked_into();
564        ///     let promise = global.fetch_with_str("<some-url>");
565        ///     let resp = JsFuture::from(promise).await.unwrap();
566        /// });
567        /// ```
568        pub fn with_app_await<F, Fut>(&self, f: F)
569        where
570            F: FnOnce(EcsExt<'static>) -> Fut + 'static,
571            Fut: Future<Output = ()> + 'static,
572        {
573            type Arg = EcsExt<'static>;
574            type R = Pin<Box<dyn Future<Output = ()>>>;
575
576            let f = |ecs: Arg| async move {
577                f(ecs).await;
578
579                // Resumes the main worker, so that it can consume buffered
580                // functions.
581                JS_MAIN_CX.with_borrow_mut(|cx| {
582                    cx.resume();
583                    cx.consume_if_ready();
584                });
585            };
586            let f: Box<dyn FnOnce(Arg) -> R> = Box::new(move |ecs| Box::pin(f(ecs)));
587            let f: DynFnOnce<Arg, R> = ManuallyDrop::new(f);
588
589            helper(self, f);
590
591            // === Internal helper functions ===
592
593            fn helper(this: &MainWorker, f: DynFnOnce<Arg, R>) {
594                let arg = DynFnOnceCodec::encode_into_array(f);
595                this.delegate(
596                    |arg| {
597                        JS_MAIN_CX.with_borrow_mut(|cx| {
598                            let arg: js_sys::Uint32Array = arg.unchecked_into();
599                            // Safety: `arg` is `f`.
600                            unsafe {
601                                let f = DynFnOnceCodec::decode_from_array(&arg).cast::<Arg, R>();
602                                cx.schedule_fn(f);
603                            };
604                            cx.consume_if_ready();
605                        });
606                    },
607                    arg.into(),
608                );
609            }
610        }
611
612        /// Sends the main worker a function to call it on the main worker
613        /// context.
614        ///
615        /// The function will be called once the main worker is ready.
616        pub fn delegate(&self, f: fn(arg: JsValue), arg: JsValue) {
617            MessageFn { f }.post_to(&self.handle(), arg);
618        }
619    }
620
621    impl Drop for MainWorker {
622        fn drop(&mut self) {
623            JS_MAIN_CX.with_borrow_mut(|cx| {
624                cx.pool.clear();
625            });
626        }
627    }
628
629    impl Deref for MainWorker {
630        type Target = Worker;
631
632        fn deref(&self) -> &Self::Target {
633            &self.inner
634        }
635    }
636
637    impl DerefMut for MainWorker {
638        fn deref_mut(&mut self) -> &mut Self::Target {
639            &mut self.inner
640        }
641    }
642
643    /// Listener of messages from the main worker.
644    ///
645    /// This function is exposed to JS, and it looks like,
646    ///
647    /// ```js
648    /// // index.js
649    /// const main_worker = new Worker('worker.js');
650    /// main_worker.onmessage = (msg) => {
651    ///     // this function
652    /// };
653    ///
654    /// // worker.js
655    /// postMessage('to window');
656    /// ```
657    #[wasm_bindgen(js_name = mainOnMessage)]
658    pub fn main_on_message(msg: JsValue) {
659        if let Some(arr) = msg.dyn_ref::<js_sys::Array>() {
660            let header = arr.get(0);
661            match MessageHeader::from_js_value(header).0 {
662                MessageHeader::FN_INNER => {
663                    let f = MessageFn::read_body(arr).f;
664                    let arg = MessageFn::read_argument(arr);
665                    f(arg);
666                }
667                _ => {
668                    crate::log!("[W] unknown message on main worker");
669                }
670            }
671        } else {
672            web_util::worker_post_message(&msg).unwrap();
673        }
674    }
675
676    thread_local! {
677        static JS_MAIN_CX: RefCell<MainWorkerContext> = RefCell::new(
678            MainWorkerContext::new()
679        );
680
681        static CONSUME_IF_READY: RefCell<Closure<dyn FnMut()>> = RefCell::new(
682            Closure::new(|| {
683                JS_MAIN_CX.with_borrow_mut(|cx| cx.consume_if_ready());
684            })
685        );
686    }
687
688    enum FnOnMain {
689        InitEcs(DynFnOnceExt<WorkerPool, LeakedEcsApp>),
690        WithEcs(DynFnOnceExt<EcsExt<'static>, ()>),
691        WithEcsAwait(DynFnOnceExt<EcsExt<'static>, Pin<Box<dyn Future<Output = ()>>>>),
692    }
693
694    impl_from_for_enum!(
695        "outer" = FnOnMain; "var" = InitEcs;
696        "inner" = DynFnOnceExt<WorkerPool, LeakedEcsApp>
697    );
698    impl_from_for_enum!(
699        "outer" = FnOnMain; "var" = WithEcs;
700        "inner" = DynFnOnceExt<EcsExt<'static>, ()>
701    );
702    impl_from_for_enum!(
703        "outer" = FnOnMain; "var" = WithEcsAwait;
704        "inner" = DynFnOnceExt<EcsExt<'static>, Pin<Box<dyn Future<Output = ()>>>>
705    );
706
707    struct MainWorkerContext {
708        /// Worker pool.
709        pool: WorkerPool,
710        ecs: Option<LeakedEcsApp>,
711        /// Child worker name that will be given to the next spawned child worker.
712        child_name: String,
713        pending: VecDeque<FnOnMain>,
714        pause: Arc<AtomicBool>,
715    }
716
717    impl MainWorkerContext {
718        fn new() -> Self {
719            Self {
720                pool: WorkerPool::new(),
721                ecs: None,
722                child_name: "sub-worker0".to_owned(),
723                pending: VecDeque::new(),
724                pause: Arc::new(AtomicBool::new(false)),
725            }
726        }
727
728        fn schedule_fn<F>(&mut self, f: F)
729        where
730            F: Into<FnOnMain>,
731        {
732            self.pending.push_back(f.into());
733        }
734
735        fn consume_if_ready(&mut self) {
736            // If child workers are not ready yet, we need to give CPU to JS
737            // runtime.
738            if !self.is_ready() {
739                CONSUME_IF_READY.with_borrow(|ready_run| {
740                    const WAIT_MS: i32 = 10;
741                    let cb = ready_run.as_ref().unchecked_ref();
742                    let global = web_util::worker_global();
743                    global
744                        .set_timeout_with_callback_and_timeout_and_arguments_0(cb, WAIT_MS)
745                        .unwrap();
746                });
747                return;
748            }
749
750            if self.is_paused() {
751                return;
752            }
753
754            while let Some(f) = self.pending.pop_front() {
755                match f {
756                    FnOnMain::InitEcs(f) => {
757                        let pool = mem::take(&mut self.pool);
758                        let ecs = f.call(pool);
759                        self.ecs = Some(ecs);
760                    }
761                    FnOnMain::WithEcs(f) => {
762                        if let Some(ecs) = self.ecs.as_ref() {
763                            // Safety: We're accessing valid ecs once at a time.
764                            unsafe { f.call(ecs.get()) };
765                        }
766                    }
767                    FnOnMain::WithEcsAwait(f) => {
768                        if let Some(ecs) = self.ecs.as_ref() {
769                            // Safety: We're accessing valid ecs once at a time.
770                            let fut = unsafe { f.call(ecs.get()) };
771                            wasm_bindgen_futures::spawn_local(fut);
772
773                            // Stops consuming further before being resumed.
774                            self.pause();
775                            break;
776                        }
777                    }
778                }
779            }
780        }
781
782        fn is_ready(&self) -> bool {
783            let children = &self.pool.workers;
784            children.iter().all(|child| child.is_ready())
785        }
786
787        fn is_paused(&self) -> bool {
788            self.pause.load(Ordering::Relaxed)
789        }
790
791        fn pause(&self) {
792            self.pause.store(true, Ordering::Relaxed);
793        }
794
795        fn resume(&self) {
796            self.pause.store(false, Ordering::Relaxed);
797        }
798    }
799
800    impl fmt::Debug for MainWorkerContext {
801        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
802            f.debug_struct("MainWorkerContext")
803                .field("pool", &self.pool)
804                .finish_non_exhaustive()
805        }
806    }
807
808    impl Default for MainWorkerContext {
809        fn default() -> Self {
810            Self::new()
811        }
812    }
813
814    /// [`Worker`] builder.
815    ///
816    /// You can spawn [`Worker`] from this builder.
817    #[derive(Debug)]
818    pub struct WorkerBuilder<'a> {
819        name: &'a str,
820        script: Option<&'a str>,
821        listen: &'a str,
822        init: &'a str,
823    }
824
825    impl<'a> WorkerBuilder<'a> {
826        /// Default message listener of the worker.
827        /// See [`worker_on_message`].
828        const DEFAULT_LISTEN: &'static str = "workerOnMessage";
829        const DEFAULT_INIT: &'static str = "workerInit";
830
831        /// Creates a new [`WorkerBuilder`].
832        pub const fn new(name: &'a str) -> Self {
833            Self {
834                name,
835                script: None,
836                init: Self::DEFAULT_INIT,
837                listen: Self::DEFAULT_LISTEN,
838            }
839        }
840
841        /// Creates a new [`WorkerBuilder`] with the given name.
842        pub const fn with_name(mut self, name: &'a str) -> Self {
843            self.name = name;
844            self
845        }
846
847        /// Creates a new [`WorkerBuilder`] with the given script.
848        pub const fn with_script(mut self, script: &'a str) -> Self {
849            self.script = Some(script);
850            self
851        }
852
853        /// Creates a new [`WorkerBuilder`] with the given name of
854        /// initialization function.
855        pub const fn with_init(mut self, init: &'a str) -> Self {
856            self.init = init;
857            self
858        }
859
860        /// Creates a new [`WorkerBuilder`] with the given `onmessage` listener.
861        pub const fn with_listen(mut self, listen: &'a str) -> Self {
862            self.listen = listen;
863            self
864        }
865
866        /// Spawns a new [`Worker`] from the builder.
867        pub fn spawn(self) -> Result<Worker, JsValue> {
868            Worker::spawn(self)
869        }
870    }
871
872    /// Worker handle.
873    pub struct Worker {
874        /// JS worker handle.
875        handle: web_sys::Worker,
876
877        /// Worker name. You can see this name in browser's dev tool.
878        name: Box<str>,
879
880        /// Callback for worker's first response, which is a notification of
881        /// the worker's readiness.
882        ///
883        /// The callback will be replaced with [`Worker::on_message`] once it
884        /// called.
885        _on_ready: Closure<dyn FnMut()>,
886
887        /// Callback for worker response.
888        #[allow(clippy::type_complexity)]
889        on_message: Rc<RefCell<Closure<dyn FnMut(web_sys::MessageEvent)>>>,
890
891        /// Determines the worker is spawned and ready to listen to message.
892        ready: Arc<AtomicBool>,
893    }
894
895    impl fmt::Debug for Worker {
896        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
897            f.debug_struct("Worker")
898                .field("name", &self.name)
899                .field("ready", &self.ready.as_ref())
900                .finish_non_exhaustive()
901        }
902    }
903
904    impl Worker {
905        fn spawn(builder: WorkerBuilder) -> Result<Self, JsValue> {
906            // Creates a new worker.
907            let handle = create_worker(builder.name, builder.script)?;
908            let ready = Arc::new(AtomicBool::new(false));
909            let on_message = Rc::new(RefCell::new(Closure::new(|_| {})));
910            let c_handle = handle.clone();
911            let c_ready = Arc::clone(&ready);
912            let c_on_message = Rc::clone(&on_message);
913
914            // Listens to worker's ready notification.
915            let on_ready = Closure::new(move || {
916                let on_message = c_on_message.borrow();
917                c_handle.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
918                c_ready.store(true, Ordering::Release);
919            });
920            handle.set_onmessage(Some(on_ready.as_ref().unchecked_ref()));
921
922            // Sets 'WBG_INIT' if it wasn't set yet.
923            let wasm_init = WBG_INIT.get_or_init(|| DEFAULT_WBG_INIT.to_owned());
924
925            // TODO: For now, we assume that wasm use shared memory.
926            // Initializes the worker.
927            use js_sys::{Object, Reflect};
928            let msg = Object::new();
929            Reflect::set(&msg, &"module".into(), &wasm_bindgen::module())?;
930            Reflect::set(&msg, &"memory".into(), &wasm_bindgen::memory())?;
931            Reflect::set(&msg, &"url".into(), &IMPORT_META_URL.with(JsValue::clone))?;
932            Reflect::set(&msg, &"wasmInit".into(), &wasm_init.into())?;
933            Reflect::set(&msg, &"init".into(), &builder.init.into())?;
934            Reflect::set(&msg, &"listen".into(), &builder.listen.into())?;
935            handle.post_message(&msg)?;
936
937            Ok(Self {
938                handle,
939                name: builder.name.into(),
940                _on_ready: on_ready,
941                on_message,
942                ready,
943            })
944        }
945
946        /// Returns JS worker handle.
947        pub fn handle(&self) -> web_sys::Worker {
948            self.handle.clone()
949        }
950
951        /// Returns true if the worker has been fully initialized and ready to
952        /// process messages.
953        pub fn is_ready(&self) -> bool {
954            self.ready.load(Ordering::Relaxed)
955        }
956
957        pub fn set_on_message<F>(&self, mut cb: F)
958        where
959            F: FnMut(JsValue) + 'static,
960        {
961            let cb: Closure<dyn FnMut(web_sys::MessageEvent)> =
962                Closure::new(move |ev: web_sys::MessageEvent| cb(ev.data()));
963
964            if self.is_ready() {
965                self.handle.set_onmessage(Some(cb.as_ref().unchecked_ref()));
966            }
967            *self.on_message.borrow_mut() = cb;
968        }
969
970        /// Sends the worker a message.
971        pub fn post_message(&self, msg: &JsValue) -> Result<(), JsValue> {
972            self.handle.post_message(msg)
973        }
974    }
975
976    impl Work for Worker {
977        fn unpark(&mut self, cx: ManagedConstPtr<SubContext>) -> bool {
978            let ptr = cx.as_ptr();
979
980            #[cfg(feature = "check")]
981            drop(cx);
982
983            let res = self.handle.post_message(&JsValue::from(ptr));
984            res.is_ok()
985        }
986
987        fn park(&mut self) -> bool {
988            true
989        }
990
991        fn name(&self) -> &str {
992            &self.name
993        }
994    }
995
996    impl Drop for Worker {
997        /// Terminates web worker *immediately*.
998        fn drop(&mut self) {
999            self.handle.terminate();
1000        }
1001    }
1002
1003    /// Clients can modify init function of wasm glue JS file before they call [`Worker::spawn`].
1004    /// If you don't set this value, [`DEFAULT_WBG_INIT`] will be set as default value.
1005    ///
1006    /// # Example
1007    ///
1008    /// ```rust
1009    /// use my_ecs::default::prelude::*;
1010    ///
1011    /// // Some bundlers may minify export name to '_' or 'default'.
1012    /// crate::WBG_INIT.set("_".to_owned()).unwrap();
1013    /// Worker::spawn("worker", 0).unwrap();
1014    /// ```
1015    pub static WBG_INIT: OnceLock<String> = OnceLock::new();
1016
1017    /// wasm-bindgen will generate "__wbg_init" as default export function.
1018    /// But, I expect that it will be exported as 'default'.
1019    pub const DEFAULT_WBG_INIT: &str = "default";
1020
1021    pub const DEFAULT_WORKER_SCRIPT: &str = include_str!("worker.js");
1022
1023    // Some bundlers could warn about circular dependency caused by worker
1024    // due to the cycle that looks like
1025    // "Rust wasm - (bind) -> worker.js -> (import) -> wasm".
1026    //
1027    // But, if the worker JS file is substituted with a created object,
1028    // we can avoid the warning.
1029    //
1030    // However, in that case, we need to set bundler to cooperate with the
1031    // created worker object.
1032    // For instance, in Vite(v5.4.2), you may need following settings.
1033    //
1034    // build: {
1035    //   rollupOptions: {
1036    //       // We need to split wasm glue module into a separate chunk
1037    //       // 1. Not to include window context data.
1038    //       //    * wasm glue module will be imported in worker context.
1039    //       //    * In worker context, we can't access something like document.
1040    //       // 2. To preserve indirectly used exports.
1041    //       //    * Rollup doesn't know that we're going to access some exported
1042    //       //      objects, workerOnMessage for instance, in wasm code.
1043    //       //    * So we need to make Rollup not to drop those objects.
1044    //       //
1045    //       // First of all, we need to make a new entry point for wasm.
1046    //       input: {
1047    //         wasm: 'pkg/wasm-index.js', // path to wasm glue file.
1048    //         ...
1049    //       },
1050    //       ...
1051    //
1052    //       // Then, put the following.
1053    //       // * https://rollupjs.org/configuration-options/#preserveentrysignatures
1054    //       //   Although Rollup says default is already 'exports-only',
1055    //       //   but I guess Vite 5.4.2 changes it to `false`.
1056    //       preserveEntrySignatures: 'exports-only',
1057    //     },
1058    //   ...
1059    // }
1060    fn create_worker(name: &str, script: Option<&str>) -> Result<web_sys::Worker, JsValue> {
1061        let opt = web_sys::WorkerOptions::new();
1062        opt.set_name(name);
1063        opt.set_type(web_sys::WorkerType::Module);
1064        web_sys::Worker::new_with_options(&script_url(script), &opt)
1065    }
1066
1067    #[wasm_bindgen]
1068    extern "C" {
1069        /// URL of wasm glue JS file.
1070        //
1071        // We need this URL of wasm glue JS file in order to import it dynamically in workers.
1072        // So that workers can share the same wasm module and memory.
1073        // But note that bundler may evaluate "import.meta.url" statically during bundling,
1074        // which is not what we want, we need to evaluate it at runtime.
1075        // Therefore, you need to configure your bundler not to do it.
1076        // (e.g. Webpack does it basically, but Vite(v5.1.6) doesn't do it)
1077        #[wasm_bindgen(thread_local_v2, js_namespace = ["import", "meta"], js_name = url)]
1078        static IMPORT_META_URL: JsValue;
1079    }
1080
1081    fn script_url(script: Option<&str>) -> String {
1082        let script = script.unwrap_or(DEFAULT_WORKER_SCRIPT);
1083        let blob_parts = js_sys::Array::new_with_length(1);
1084        blob_parts.set(0, JsValue::from_str(script));
1085
1086        let options = web_sys::BlobPropertyBag::new();
1087        options.set_type("application/javascript");
1088
1089        let blob = web_sys::Blob::new_with_str_sequence_and_options(&blob_parts, &options).unwrap();
1090        web_sys::Url::create_object_url_with_blob(&blob).unwrap()
1091    }
1092
1093    #[wasm_bindgen(js_name = workerInit)]
1094    pub fn worker_init() {
1095        // Nothing to do as default behavior.
1096    }
1097
1098    /// # Safety
1099    ///
1100    /// Undefined behavior if the pointer is not valid or aliased.
1101    #[wasm_bindgen(js_name = workerOnMessage)]
1102    pub unsafe fn worker_on_message(cx: *mut SubContext) {
1103        let cx = ManagedConstPtr::new(NonNullExt::new_unchecked(cx));
1104        SubContext::execute(cx);
1105    }
1106
1107    /// Message event header.
1108    ///
1109    /// Inner value will be transmuted into f64 and vice versa. You must
1110    /// guarantee that f64 representation is not Nan (or Inf). If so,
1111    /// someone(maybe JS) can change its bit expression into something else with
1112    /// just preserving the meaning, Nan or Inf or something like that.
1113    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1114    pub struct MessageHeader(pub u64);
1115
1116    impl MessageHeader {
1117        const MAGIC: u64 = 0x0EC5 << (16 * 3);
1118
1119        const BASE: u64 = Self::MAGIC;
1120
1121        const FN_INNER: u64 = Self::BASE + 1;
1122        const FN: Self = Self(Self::FN_INNER);
1123
1124        pub fn into_js_value(self) -> JsValue {
1125            let value = f64::from_bits(self.0);
1126            debug_assert!(value.is_finite()); // Nan or Inf are now allowed.
1127            JsValue::from_f64(value)
1128        }
1129
1130        pub fn from_js_value(value: JsValue) -> Self {
1131            Self(value.unchecked_into_f64().to_bits())
1132        }
1133    }
1134
1135    const F64_SIZE: usize = mem::size_of::<f64>();
1136    const F64_ROUND_UP: usize = F64_SIZE - 1;
1137    const HEADER_LEN: usize = (mem::size_of::<MessageHeader>() + F64_ROUND_UP) / F64_SIZE;
1138    const U32_SIZE: usize = mem::size_of::<u32>();
1139    const U32_ROUND_UP: usize = U32_SIZE - 1;
1140
1141    /// Instant message for sending a function.
1142    #[derive(Debug, Clone, Copy)]
1143    #[repr(C)]
1144    struct MessageFn {
1145        /// Function pointer.
1146        //
1147        // NOTE: We cannot turn an address into f64 directly.
1148        // Address could be 'inf' or 'nan' in f64 representation.
1149        // See https://en.wikipedia.org/wiki/IEEE_754-1985
1150        // Those numbers will keep the meaning of 'inf' or 'nan' though,
1151        // they can be freely different number in perspective of bit.
1152        // So we put this value in Uint32Array.
1153        //
1154        // According to googling, OSes use 48-bit address space on 64 bit machines.
1155        // Even if so, it could be dangerous to convert addresses to f64
1156        // when OSes fill preceding bits to 1.
1157        f: fn(arg: JsValue),
1158    }
1159
1160    impl MessageFn {
1161        const HEADER: MessageHeader = MessageHeader::FN;
1162
1163        /// Returns minimum length of [`js_sys::Array`] for this message.
1164        const fn message_array_len() -> u32 {
1165            // 3, message looks like [header, [body array], arg].
1166            const BODY_LEN: u32 = 1;
1167            const ARG_LEN: u32 = 1;
1168            HEADER_LEN as u32 + BODY_LEN + ARG_LEN
1169        }
1170
1171        fn post_to(self, worker: &web_sys::Worker, arg: JsValue) {
1172            let buf = js_sys::Array::new_with_length(Self::message_array_len());
1173            Self::write_header(&buf);
1174            self.write_body(&buf);
1175            Self::write_argument(&buf, arg);
1176            worker.post_message(&buf).unwrap();
1177        }
1178
1179        fn write_header(buf: &js_sys::Array) {
1180            buf.set(0, Self::HEADER.into_js_value());
1181        }
1182
1183        fn write_body(&self, buf: &js_sys::Array) {
1184            let body = FnCodec::encode_into_array(self.f);
1185            buf.set(1, body.into());
1186        }
1187
1188        fn write_argument(buf: &js_sys::Array, arg: JsValue) {
1189            buf.set(2, arg);
1190        }
1191
1192        fn read_body(buf: &js_sys::Array) -> Self {
1193            debug_assert!(buf.length() >= Self::message_array_len());
1194            assert_eq!(Self::HEADER, MessageHeader::from_js_value(buf.get(0)));
1195
1196            let body: js_sys::Uint32Array = buf.get(1).unchecked_into();
1197
1198            // Safety: We checked the message header.
1199            unsafe {
1200                let f = FnCodec::decode_from_array(&body);
1201                Self {
1202                    f: mem::transmute::<fn(), fn(JsValue)>(f),
1203                }
1204            }
1205        }
1206
1207        fn read_argument(buf: &js_sys::Array) -> JsValue {
1208            buf.get(2)
1209        }
1210    }
1211
1212    #[repr(C)]
1213    union FnCodec {
1214        src: fn(),
1215        dst: [u32; Self::len()],
1216    }
1217
1218    impl FnCodec {
1219        const fn len() -> usize {
1220            // 2 (on 64 bit) or 1 (on 32 bit)
1221            (mem::size_of::<fn()>() + U32_ROUND_UP) / U32_SIZE
1222        }
1223
1224        #[inline]
1225        fn encode_into_array<Arg, R>(f: fn(Arg) -> R) -> js_sys::Uint32Array {
1226            let arr = js_sys::Uint32Array::new_with_length(Self::len() as u32);
1227            arr.copy_from(&Self::encode(f));
1228            arr
1229        }
1230
1231        #[inline]
1232        const fn encode<Arg, R>(f: fn(Arg) -> R) -> [u32; Self::len()] {
1233            // Safety: Function pointer can safely become u32 array.
1234            unsafe {
1235                Self {
1236                    src: mem::transmute::<fn(Arg) -> R, fn()>(f),
1237                }
1238                .dst
1239            }
1240        }
1241
1242        /// # Safety
1243        ///
1244        /// Undefined behavior if the given data is not a valid function pointer.
1245        //
1246        // Why we don't return `fn(Arg) -> R` like encode_into_array().
1247        // - Imagine we're returning `fn(Arg) -> R`.
1248        //   If caller designated `&i32` to `Arg` and `()` to `R`,
1249        //   then return type will be `fn(&'x i32)`, where 'x is defined by the caller.
1250        //   Then, caller can call `fn(&'x i32)` directly, but cannot do other things
1251        //   like pushing the function pointer into `Vec<for<'a> fn(&'a i32)>`.
1252        //   Because the Vec requires `for<'a>`, which is quite generic,
1253        //   but caller can't convert 'x, which is less generic, into `for<'a>`.
1254        // - Therefore, generic is not sufficient.
1255        #[inline]
1256        unsafe fn decode_from_array(arr: &js_sys::Uint32Array) -> fn() {
1257            let mut buf: [u32; Self::len()] = [0; Self::len()];
1258            arr.copy_to(&mut buf);
1259            Self::decode(buf)
1260        }
1261
1262        /// # Safety
1263        ///
1264        /// Undefined behavior if the given data is not a valid function pointer.
1265        #[inline]
1266        const unsafe fn decode(encoded: [u32; Self::len()]) -> fn() {
1267            Self { dst: encoded }.src
1268        }
1269    }
1270
1271    #[repr(C)]
1272    union DynFnOnceCodec {
1273        src: DynFnOnce<(), ()>,
1274        dst: [u32; Self::len()],
1275    }
1276
1277    impl DynFnOnceCodec {
1278        const fn len() -> usize {
1279            (mem::size_of::<DynFnOnce<(), ()>>() + U32_ROUND_UP) / U32_SIZE
1280        }
1281
1282        #[inline]
1283        fn encode_into_array<Arg, R>(f: DynFnOnce<Arg, R>) -> js_sys::Uint32Array {
1284            let arr = js_sys::Uint32Array::new_with_length(Self::len() as u32);
1285            arr.copy_from(&Self::encode(f));
1286            arr
1287        }
1288
1289        #[inline]
1290        const fn encode<Arg, R>(f: DynFnOnce<Arg, R>) -> [u32; Self::len()] {
1291            // Safety: Dynamic function pointer can safely become u32 array.
1292            unsafe {
1293                Self {
1294                    src: mem::transmute::<DynFnOnce<Arg, R>, DynFnOnce<(), ()>>(f),
1295                }
1296                .dst
1297            }
1298        }
1299
1300        /// # Safety
1301        ///
1302        /// Undefined behavior if the given data is not a valid [`DynFnOnce`].
1303        /// Also, return value must be cast as the original type.
1304        #[inline]
1305        unsafe fn decode_from_array(arr: &js_sys::Uint32Array) -> DynFnOnceExt<(), ()> {
1306            let mut buf: [u32; Self::len()] = [0; Self::len()];
1307            arr.copy_to(&mut buf);
1308            Self::decode(buf)
1309        }
1310
1311        /// # Safety
1312        ///
1313        /// Undefined behavior if the given data is not a valid [`DynFnOnce`].
1314        /// Also, return value must be cast as the original type.
1315        #[inline]
1316        const unsafe fn decode(encoded: [u32; Self::len()]) -> DynFnOnceExt<(), ()> {
1317            let f = Self { dst: encoded }.src;
1318            DynFnOnceExt(f)
1319        }
1320    }
1321
1322    type DynFnOnce<Arg, R> = ManuallyDrop<Box<dyn FnOnce(Arg) -> R>>;
1323
1324    #[repr(transparent)]
1325    struct DynFnOnceExt<Arg, R>(DynFnOnce<Arg, R>);
1326
1327    impl<Arg, R> DynFnOnceExt<Arg, R> {
1328        /// # Safety
1329        ///
1330        /// Undefined behavior if the type is not correct.
1331        unsafe fn cast<ToArg, ToR>(self) -> DynFnOnceExt<ToArg, ToR> {
1332            unsafe { mem::transmute::<DynFnOnceExt<Arg, R>, DynFnOnceExt<ToArg, ToR>>(self) }
1333        }
1334
1335        fn call(self, arg: Arg) -> R {
1336            unsafe {
1337                let inner: DynFnOnce<Arg, R> = mem::transmute(self);
1338                let f = ManuallyDrop::into_inner(inner);
1339                f(arg)
1340            }
1341        }
1342    }
1343
1344    impl<Arg, R> Drop for DynFnOnceExt<Arg, R> {
1345        fn drop(&mut self) {
1346            let inner = &mut self.0;
1347            unsafe { ManuallyDrop::drop(inner) };
1348        }
1349    }
1350}