my_ecs/default/worker.rs
//! Provides [`Work`](crate::ecs::worker::Work) implementations and a worker pool.
//!
//! If the build target is native, the module exposes a worker type which is
//! based on [`std::thread::Thread`]. If the build target is web, the module
//! exposes a web worker instead.
6
/// Common interface for worker pool implementations.
///
/// A pool can be built from, and turned back into, a plain `Vec<W>` of
/// workers.
pub trait AsWorkerPool<W>: From<Vec<W>> + Into<Vec<W>> {
    /// Creates an empty worker pool.
    ///
    /// # Examples
    ///
    /// ```
    /// use my_ecs::prelude::*;
    ///
    /// let pool = WorkerPool::new();
    /// assert!(pool.is_empty());
    /// ```
    fn new() -> Self;

    /// Creates a worker pool with as many workers as there are available
    /// logical CPUs.
    ///
    /// The number of logical CPUs depends on the platform this crate runs
    /// on. This method guarantees that the returned worker pool has at least
    /// one worker in it even if it failed to get the number of logical CPUs.
    ///
    /// # Examples
    ///
    /// ```
    /// use my_ecs::prelude::*;
    ///
    /// let pool = WorkerPool::with_all_cpus();
    /// assert!(!pool.is_empty());
    /// ```
    fn with_all_cpus() -> Self {
        // `available_parallelism` yields a `NonZeroUsize`; a query failure
        // falls back to one worker without resorting to `unsafe`.
        #[cfg(not(target_arch = "wasm32"))]
        let len = std::thread::available_parallelism().map_or(1, |cpus| cpus.get());

        #[cfg(target_arch = "wasm32")]
        let len = crate::util::web::available_parallelism();

        Self::with_len(len)
    }

    /// Creates a worker pool with `len` workers.
    ///
    /// # Examples
    ///
    /// ```
    /// use my_ecs::prelude::*;
    ///
    /// let pool = WorkerPool::with_len(1);
    /// assert_eq!(pool.len(), 1);
    /// ```
    fn with_len(len: usize) -> Self;

    /// Returns the number of workers in the worker pool.
    fn len(&self) -> usize;

    /// Returns true if the worker pool doesn't contain any workers.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Appends a worker to the worker pool.
    ///
    /// # Examples
    ///
    /// ```
    /// use my_ecs::prelude::*;
    ///
    /// let mut pool = WorkerPool::new();
    /// assert!(pool.is_empty());
    ///
    /// let worker = WorkerBuilder::new("name").spawn().unwrap();
    /// pool.append(worker);
    /// assert_eq!(pool.len(), 1);
    /// ```
    fn append(&mut self, worker: W);
}
86
87#[cfg(not(target_arch = "wasm32"))]
88pub use non_web::*;
89
90#[cfg(target_arch = "wasm32")]
91pub use web::*;
92
93#[cfg(not(target_arch = "wasm32"))]
94mod non_web {
95 use super::*;
96 use crate::{ds::ManagedConstPtr, ecs::prelude::*, util};
97 use std::{
98 fmt,
99 sync::mpsc::{self, Sender},
100 thread::{Builder, JoinHandle},
101 };
102
103 /// A data type holding [`Worker`]s.
104 #[derive(Debug)]
105 #[repr(transparent)]
106 pub struct WorkerPool {
107 workers: Vec<Worker>,
108 }
109
110 impl AsWorkerPool<Worker> for WorkerPool {
111 fn new() -> Self {
112 Self {
113 workers: Vec::new(),
114 }
115 }
116
117 fn with_len(len: usize) -> Self {
118 let mut this = Self::new();
119
120 let mut name = "worker0".to_owned();
121 for _ in 0..len {
122 let worker = WorkerBuilder::new(&name).spawn().unwrap();
123 this.append(worker);
124 util::str::increase_rnumber(&mut name);
125 }
126
127 this
128 }
129
130 fn len(&self) -> usize {
131 self.workers.len()
132 }
133
134 fn append(&mut self, worker: Worker) {
135 self.workers.push(worker);
136 }
137 }
138
139 impl Default for WorkerPool {
140 fn default() -> Self {
141 Self::new()
142 }
143 }
144
145 impl From<Vec<Worker>> for WorkerPool {
146 fn from(value: Vec<Worker>) -> Self {
147 Self { workers: value }
148 }
149 }
150
151 impl From<WorkerPool> for Vec<Worker> {
152 fn from(value: WorkerPool) -> Self {
153 value.workers
154 }
155 }
156
/// Builder of [`Worker`]s.
///
/// Configure the builder, then call [`WorkerBuilder::spawn`] to get a
/// [`Worker`].
#[derive(Debug)]
pub struct WorkerBuilder<'a> {
    /// Thread builder carrying the thread name and stack size.
    inner: Builder,
    /// Worker name; copied into the spawned [`Worker`].
    name: &'a str,
}
165
166 impl<'a> WorkerBuilder<'a> {
167 /// Creates a new [`WorkerBuilder`].
168 pub fn new(name: &'a str) -> Self {
169 Self {
170 inner: Builder::new().name(name.to_owned()),
171 name,
172 }
173 }
174
175 /// Sets worker's stack size in bytes.
176 pub fn stack_size(self, size: usize) -> Self {
177 Self {
178 inner: self.inner.stack_size(size),
179 name: self.name,
180 }
181 }
182
183 /// Spawns a new [`Worker`] from the builder.
184 pub fn spawn(self) -> Result<Worker, std::io::Error> {
185 Worker::spawn(self)
186 }
187 }
188
189 /// Worker handle.
190 ///
191 /// When [`Worker`] is dropped, it waits for its associated worker to
192 /// finish.
193 pub struct Worker {
194 name: Box<str>,
195 tx: Sender<Option<ManagedConstPtr<SubContext>>>,
196 join_handle: Option<JoinHandle<()>>,
197 }
198
199 impl Worker {
200 fn spawn(builder: WorkerBuilder) -> Result<Self, std::io::Error> {
201 let (tx, rx) = mpsc::channel::<Option<ManagedConstPtr<SubContext>>>();
202 let join_handle = builder.inner.spawn(move || {
203 while let Some(cx) = rx.recv().unwrap() {
204 SubContext::execute(cx);
205 }
206 })?;
207 Ok(Self {
208 name: builder.name.into(),
209 tx,
210 join_handle: Some(join_handle),
211 })
212 }
213 }
214
215 impl fmt::Debug for Worker {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 f.debug_struct("Worker")
218 .field("name", &self.name)
219 .finish_non_exhaustive()
220 }
221 }
222
223 impl Drop for Worker {
224 fn drop(&mut self) {
225 // `rx` could be broken if worker panics.
226 let _ = self.tx.send(None);
227 // Safety: `join_handle` must have been filled.
228 let join_handle = unsafe { self.join_handle.take().unwrap_unchecked() };
229 let _ = join_handle.join();
230 }
231 }
232
233 impl Work for Worker {
234 fn unpark(&mut self, cx: ManagedConstPtr<SubContext>) -> bool {
235 let res = self.tx.send(Some(cx));
236 res.is_ok()
237 }
238
239 fn park(&mut self) -> bool {
240 true
241 }
242
243 fn name(&self) -> &str {
244 &self.name
245 }
246 }
247}
248
249#[cfg(target_arch = "wasm32")]
250mod web {
251 use super::*;
252 use crate::{
253 ds::{ManagedConstPtr, NonNullExt},
254 ecs::prelude::*,
255 util::{macros::impl_from_for_enum, prelude::*},
256 };
257 use std::{
258 cell::RefCell,
259 collections::VecDeque,
260 fmt,
261 future::Future,
262 mem,
263 mem::ManuallyDrop,
264 ops::{Deref, DerefMut},
265 pin::Pin,
266 rc::Rc,
267 sync::{
268 atomic::{AtomicBool, Ordering},
269 Arc, OnceLock,
270 },
271 };
272 use wasm_bindgen::prelude::*;
273
274 /// A data type holding [`Worker`]s.
275 #[derive(Debug)]
276 #[repr(transparent)]
277 pub struct WorkerPool {
278 workers: Vec<Worker>,
279 }
280
281 impl WorkerPool {
282 fn clear(&mut self) {
283 self.workers.clear();
284 }
285 }
286
287 impl AsWorkerPool<Worker> for WorkerPool {
288 fn new() -> Self {
289 Self {
290 workers: Vec::new(),
291 }
292 }
293
294 fn with_len(len: usize) -> Self {
295 let mut this = Self::new();
296
297 let mut name = "worker0".to_owned();
298 for _ in 0..len {
299 let worker = WorkerBuilder::new(&name).spawn().unwrap();
300 this.append(worker);
301 str_util::increase_rnumber(&mut name);
302 }
303
304 this
305 }
306
307 fn len(&self) -> usize {
308 self.workers.len()
309 }
310
311 fn append(&mut self, worker: Worker) {
312 self.workers.push(worker);
313 }
314 }
315
316 impl Default for WorkerPool {
317 fn default() -> Self {
318 Self::new()
319 }
320 }
321
322 impl From<Vec<Worker>> for WorkerPool {
323 fn from(value: Vec<Worker>) -> Self {
324 Self { workers: value }
325 }
326 }
327
328 impl From<WorkerPool> for Vec<Worker> {
329 fn from(value: WorkerPool) -> Self {
330 value.workers
331 }
332 }
333
334 /// [`MainWorker`] builder.
335 ///
336 /// You can spawn [`MainWorker`] from this builder.
337 #[derive(Debug)]
338 #[repr(transparent)]
339 pub struct MainWorkerBuilder<'a> {
340 inner: WorkerBuilder<'a>,
341 }
342
343 impl<'a> MainWorkerBuilder<'a> {
344 /// Creates a [`WorkerBuilder`] with default name 'main-worker'.
345 pub fn new() -> Self {
346 let inner = WorkerBuilder::new("main-worker").with_listen("mainOnMessage");
347 Self { inner }
348 }
349
350 /// Creates a [`WorkerBuilder`] with the given name.
351 pub fn with_name(self, name: &'a str) -> Self {
352 Self {
353 inner: self.inner.with_name(name),
354 }
355 }
356
357 /// Creates a [`WorkerBuilder`] with the given name of initialization
358 /// function.
359 ///
360 /// Default initialization function is 'mainOnMessage'.
361 pub fn with_init(self, init: &'a str) -> Self {
362 Self {
363 inner: self.inner.with_init(init),
364 }
365 }
366
367 /// Spawns a [`MainWorker`].
368 pub fn spawn(self) -> Result<MainWorker, JsValue> {
369 MainWorker::spawn(self)
370 }
371 }
372
373 impl<'a> Default for MainWorkerBuilder<'a> {
374 fn default() -> Self {
375 Self::new()
376 }
377 }
378
379 /// Main worker handle.
380 ///
381 /// Main worker is a web worker that is parent of other sub workers. Main
382 /// worker is responsible for communication with window context, spawning
383 /// sub workers, creating ecs instance, and running ecs. You can think of
384 /// main worker as main thread where 'main' function runs on native
385 /// environment.
386 ///
387 /// # Common worker hierarchy
388 ///
389 /// window - main worker - sub workers
390 ///
391 /// # Why we need main worker
392 ///
393 /// Ecs instance blocks sometimes to wait for messages from sub workers. But
394 /// browsers doesn't allow us to block on window context. So we need an
395 /// extra web worker.
396 #[derive(Debug)]
397 #[repr(transparent)]
398 pub struct MainWorker {
399 inner: Worker,
400 }
401
402 impl MainWorker {
403 fn spawn(builder: MainWorkerBuilder) -> Result<Self, JsValue> {
404 Ok(Self {
405 inner: builder.inner.spawn()?,
406 })
407 }
408
409 /// Spawns sub workers as many as the given number on the main worker.
410 ///
411 /// Sub workers are behind the main worker so that you cannot
412 /// communicate with them directly.
413 ///
414 /// # Examples
415 ///
416 /// ```
417 /// use my_ecs::prelude::*;
418 ///
419 /// let main = MainWorkerBuilder::new().spawn().unwrap();
420 /// let num_cpus = web_util::available_parallelism();
421 /// main.spawn_children(num_cpus);
422 /// ```
423 pub fn spawn_children(&self, num: usize) {
424 self.delegate(
425 |arg| {
426 let num: f64 = arg.unchecked_into_f64();
427 let num = num as usize;
428 JS_MAIN_CX.with_borrow_mut(|cx| {
429 for _ in 0..num {
430 str_util::increase_rnumber(&mut cx.child_name);
431 let worker = WorkerBuilder::new(&cx.child_name).spawn().unwrap();
432 cx.pool.append(worker);
433 }
434 });
435 },
436 num.into(),
437 );
438 }
439
440 /// Sends the main worker a function that initializes ecs instance.
441 ///
442 /// The main worker will execute the function and store the returned ecs
443 /// instance once it's ready.
444 ///
445 /// # Examples
446 ///
447 /// ```
448 /// use my_ecs::prelude::*;
449 ///
450 /// let main = MainWorkerBuilder::new().spawn().unwrap();
451 /// main.init_app(|pool| {
452 /// let num_workers = pool.len();
453 /// Ecs::default(pool, [num_workers])
454 /// });
455 /// ```
456 pub fn init_app<F, R>(&self, f: F)
457 where
458 F: FnOnce(WorkerPool) -> R + 'static,
459 R: Into<LeakedEcsApp>,
460 {
461 let f = move |pool: WorkerPool| -> LeakedEcsApp { f(pool).into() };
462 let f: DynFnOnce<WorkerPool, LeakedEcsApp> = ManuallyDrop::new(Box::new(f));
463 helper(self, f);
464
465 // === Internal helper functions ===
466
467 fn helper(this: &MainWorker, f: DynFnOnce<WorkerPool, LeakedEcsApp>) {
468 let arg = DynFnOnceCodec::encode_into_array(f);
469 this.delegate(
470 |arg| {
471 JS_MAIN_CX.with_borrow_mut(|cx| {
472 let arg: js_sys::Uint32Array = arg.unchecked_into();
473 // Safety: `arg` is `f`.
474 unsafe {
475 let f = DynFnOnceCodec::decode_from_array(&arg)
476 .cast::<WorkerPool, LeakedEcsApp>();
477 cx.schedule_fn(f);
478 };
479 cx.consume_if_ready();
480 });
481 },
482 arg.into(),
483 );
484 }
485 }
486
487 /// Sends the main worker a function that accesses ecs instance.
488 ///
489 /// But if main worker doesn't have ecs instance, the function will be
490 /// dropped without execution. Don't forget to call
491 /// [`MainWorker::init_app`] beforehand.
492 ///
493 /// # Examples
494 ///
495 /// ```
496 /// use my_ecs::prelude::*;
497 ///
498 /// let main = MainWorkerBuilder::new().spawn().unwrap();
499 ///
500 /// main.init_app(|pool| {
501 /// let num_workers = pool.len();
502 /// Ecs::default(pool, [num_workers])
503 /// });
504 ///
505 /// main.with_app(|app| { /* ... */ });
506 /// ```
507 pub fn with_app<F>(&self, f: F)
508 where
509 F: FnOnce(EcsExt<'static>) + 'static,
510 {
511 let f: DynFnOnce<EcsExt<'static>, ()> = ManuallyDrop::new(Box::new(f));
512 helper(self, f);
513
514 // === Internal helper functions ===
515
516 fn helper(this: &MainWorker, f: DynFnOnce<EcsExt<'static>, ()>) {
517 let arg = DynFnOnceCodec::encode_into_array(f);
518 this.delegate(
519 |arg| {
520 JS_MAIN_CX.with_borrow_mut(|cx| {
521 let arg: js_sys::Uint32Array = arg.unchecked_into();
522 // Safety: `arg` is `f`.
523 unsafe {
524 let f = DynFnOnceCodec::decode_from_array(&arg)
525 .cast::<EcsExt<'static>, ()>();
526 cx.schedule_fn(f);
527 };
528 cx.consume_if_ready();
529 });
530 },
531 arg.into(),
532 );
533 }
534 }
535
536 /// Executes the given future on the main worker using JS runtime.
537 ///
538 /// This method doesn't block, but the main worker stacks other
539 /// requested functions rather than executing them until the given
540 /// future is completed.
541 ///
542 /// Web APIs which return `Promise` should be called carefully. They
543 /// eagerly put tasks to JS runtime queue, and the tasks cannot make
544 /// more progress while Rust wasm holds CPU. That means that Rust wasm
545 /// should stop its processing to complete JS `Promise`. In other words,
546 /// Rust wasm cannot wait to be woken up by `Promise`.
547 ///
548 /// # Examples
549 ///
550 /// ```
551 /// use my_ecs::prelude::*;
552 /// use web_sys::{WorkerGlobalScope, Response};
553 /// use wasm_bindgen_futures::JsFuture;
554 ///
555 /// let main = MainWorkerBuilder::new().spawn().unwrap();
556 ///
557 /// main.init_app(|pool| {
558 /// let num_workers = pool.len();
559 /// Ecs::default(pool, [num_workers])
560 /// });
561 ///
562 /// main.with_app_await(|app| async {
563 /// let global: WorkerGlobalScope = js_sys::global().unchecked_into();
564 /// let promise = global.fetch_with_str("<some-url>");
565 /// let resp = JsFuture::from(promise).await.unwrap();
566 /// });
567 /// ```
568 pub fn with_app_await<F, Fut>(&self, f: F)
569 where
570 F: FnOnce(EcsExt<'static>) -> Fut + 'static,
571 Fut: Future<Output = ()> + 'static,
572 {
573 type Arg = EcsExt<'static>;
574 type R = Pin<Box<dyn Future<Output = ()>>>;
575
576 let f = |ecs: Arg| async move {
577 f(ecs).await;
578
579 // Resumes the main worker, so that it can consume buffered
580 // functions.
581 JS_MAIN_CX.with_borrow_mut(|cx| {
582 cx.resume();
583 cx.consume_if_ready();
584 });
585 };
586 let f: Box<dyn FnOnce(Arg) -> R> = Box::new(move |ecs| Box::pin(f(ecs)));
587 let f: DynFnOnce<Arg, R> = ManuallyDrop::new(f);
588
589 helper(self, f);
590
591 // === Internal helper functions ===
592
593 fn helper(this: &MainWorker, f: DynFnOnce<Arg, R>) {
594 let arg = DynFnOnceCodec::encode_into_array(f);
595 this.delegate(
596 |arg| {
597 JS_MAIN_CX.with_borrow_mut(|cx| {
598 let arg: js_sys::Uint32Array = arg.unchecked_into();
599 // Safety: `arg` is `f`.
600 unsafe {
601 let f = DynFnOnceCodec::decode_from_array(&arg).cast::<Arg, R>();
602 cx.schedule_fn(f);
603 };
604 cx.consume_if_ready();
605 });
606 },
607 arg.into(),
608 );
609 }
610 }
611
612 /// Sends the main worker a function to call it on the main worker
613 /// context.
614 ///
615 /// The function will be called once the main worker is ready.
616 pub fn delegate(&self, f: fn(arg: JsValue), arg: JsValue) {
617 MessageFn { f }.post_to(&self.handle(), arg);
618 }
619 }
620
621 impl Drop for MainWorker {
622 fn drop(&mut self) {
623 JS_MAIN_CX.with_borrow_mut(|cx| {
624 cx.pool.clear();
625 });
626 }
627 }
628
629 impl Deref for MainWorker {
630 type Target = Worker;
631
632 fn deref(&self) -> &Self::Target {
633 &self.inner
634 }
635 }
636
637 impl DerefMut for MainWorker {
638 fn deref_mut(&mut self) -> &mut Self::Target {
639 &mut self.inner
640 }
641 }
642
643 /// Listener of messages from the main worker.
644 ///
645 /// This function is exposed to JS, and it looks like,
646 ///
647 /// ```js
648 /// // index.js
649 /// const main_worker = new Worker('worker.js');
650 /// main_worker.onmessage = (msg) => {
651 /// // this function
652 /// };
653 ///
654 /// // worker.js
655 /// postMessage('to window');
656 /// ```
657 #[wasm_bindgen(js_name = mainOnMessage)]
658 pub fn main_on_message(msg: JsValue) {
659 if let Some(arr) = msg.dyn_ref::<js_sys::Array>() {
660 let header = arr.get(0);
661 match MessageHeader::from_js_value(header).0 {
662 MessageHeader::FN_INNER => {
663 let f = MessageFn::read_body(arr).f;
664 let arg = MessageFn::read_argument(arr);
665 f(arg);
666 }
667 _ => {
668 crate::log!("[W] unknown message on main worker");
669 }
670 }
671 } else {
672 web_util::worker_post_message(&msg).unwrap();
673 }
674 }
675
676 thread_local! {
677 static JS_MAIN_CX: RefCell<MainWorkerContext> = RefCell::new(
678 MainWorkerContext::new()
679 );
680
681 static CONSUME_IF_READY: RefCell<Closure<dyn FnMut()>> = RefCell::new(
682 Closure::new(|| {
683 JS_MAIN_CX.with_borrow_mut(|cx| cx.consume_if_ready());
684 })
685 );
686 }
687
688 enum FnOnMain {
689 InitEcs(DynFnOnceExt<WorkerPool, LeakedEcsApp>),
690 WithEcs(DynFnOnceExt<EcsExt<'static>, ()>),
691 WithEcsAwait(DynFnOnceExt<EcsExt<'static>, Pin<Box<dyn Future<Output = ()>>>>),
692 }
693
694 impl_from_for_enum!(
695 "outer" = FnOnMain; "var" = InitEcs;
696 "inner" = DynFnOnceExt<WorkerPool, LeakedEcsApp>
697 );
698 impl_from_for_enum!(
699 "outer" = FnOnMain; "var" = WithEcs;
700 "inner" = DynFnOnceExt<EcsExt<'static>, ()>
701 );
702 impl_from_for_enum!(
703 "outer" = FnOnMain; "var" = WithEcsAwait;
704 "inner" = DynFnOnceExt<EcsExt<'static>, Pin<Box<dyn Future<Output = ()>>>>
705 );
706
707 struct MainWorkerContext {
708 /// Worker pool.
709 pool: WorkerPool,
710 ecs: Option<LeakedEcsApp>,
711 /// Child worker name that will be given to the next spawned child worker.
712 child_name: String,
713 pending: VecDeque<FnOnMain>,
714 pause: Arc<AtomicBool>,
715 }
716
717 impl MainWorkerContext {
718 fn new() -> Self {
719 Self {
720 pool: WorkerPool::new(),
721 ecs: None,
722 child_name: "sub-worker0".to_owned(),
723 pending: VecDeque::new(),
724 pause: Arc::new(AtomicBool::new(false)),
725 }
726 }
727
728 fn schedule_fn<F>(&mut self, f: F)
729 where
730 F: Into<FnOnMain>,
731 {
732 self.pending.push_back(f.into());
733 }
734
735 fn consume_if_ready(&mut self) {
736 // If child workers are not ready yet, we need to give CPU to JS
737 // runtime.
738 if !self.is_ready() {
739 CONSUME_IF_READY.with_borrow(|ready_run| {
740 const WAIT_MS: i32 = 10;
741 let cb = ready_run.as_ref().unchecked_ref();
742 let global = web_util::worker_global();
743 global
744 .set_timeout_with_callback_and_timeout_and_arguments_0(cb, WAIT_MS)
745 .unwrap();
746 });
747 return;
748 }
749
750 if self.is_paused() {
751 return;
752 }
753
754 while let Some(f) = self.pending.pop_front() {
755 match f {
756 FnOnMain::InitEcs(f) => {
757 let pool = mem::take(&mut self.pool);
758 let ecs = f.call(pool);
759 self.ecs = Some(ecs);
760 }
761 FnOnMain::WithEcs(f) => {
762 if let Some(ecs) = self.ecs.as_ref() {
763 // Safety: We're accessing valid ecs once at a time.
764 unsafe { f.call(ecs.get()) };
765 }
766 }
767 FnOnMain::WithEcsAwait(f) => {
768 if let Some(ecs) = self.ecs.as_ref() {
769 // Safety: We're accessing valid ecs once at a time.
770 let fut = unsafe { f.call(ecs.get()) };
771 wasm_bindgen_futures::spawn_local(fut);
772
773 // Stops consuming further before being resumed.
774 self.pause();
775 break;
776 }
777 }
778 }
779 }
780 }
781
782 fn is_ready(&self) -> bool {
783 let children = &self.pool.workers;
784 children.iter().all(|child| child.is_ready())
785 }
786
787 fn is_paused(&self) -> bool {
788 self.pause.load(Ordering::Relaxed)
789 }
790
791 fn pause(&self) {
792 self.pause.store(true, Ordering::Relaxed);
793 }
794
795 fn resume(&self) {
796 self.pause.store(false, Ordering::Relaxed);
797 }
798 }
799
800 impl fmt::Debug for MainWorkerContext {
801 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
802 f.debug_struct("MainWorkerContext")
803 .field("pool", &self.pool)
804 .finish_non_exhaustive()
805 }
806 }
807
808 impl Default for MainWorkerContext {
809 fn default() -> Self {
810 Self::new()
811 }
812 }
813
/// Builder of [`Worker`]s.
///
/// Configure the builder, then call [`WorkerBuilder::spawn`] to get a
/// [`Worker`].
#[derive(Debug)]
pub struct WorkerBuilder<'a> {
    /// Worker name.
    name: &'a str,
    /// Worker script source; `None` selects [`DEFAULT_WORKER_SCRIPT`].
    script: Option<&'a str>,
    /// Name sent to the worker as its `listen` entry.
    listen: &'a str,
    /// Name sent to the worker as its `init` entry.
    init: &'a str,
}
824
825 impl<'a> WorkerBuilder<'a> {
826 /// Default message listener of the worker.
827 /// See [`worker_on_message`].
828 const DEFAULT_LISTEN: &'static str = "workerOnMessage";
829 const DEFAULT_INIT: &'static str = "workerInit";
830
831 /// Creates a new [`WorkerBuilder`].
832 pub const fn new(name: &'a str) -> Self {
833 Self {
834 name,
835 script: None,
836 init: Self::DEFAULT_INIT,
837 listen: Self::DEFAULT_LISTEN,
838 }
839 }
840
841 /// Creates a new [`WorkerBuilder`] with the given name.
842 pub const fn with_name(mut self, name: &'a str) -> Self {
843 self.name = name;
844 self
845 }
846
847 /// Creates a new [`WorkerBuilder`] with the given script.
848 pub const fn with_script(mut self, script: &'a str) -> Self {
849 self.script = Some(script);
850 self
851 }
852
853 /// Creates a new [`WorkerBuilder`] with the given name of
854 /// initialization function.
855 pub const fn with_init(mut self, init: &'a str) -> Self {
856 self.init = init;
857 self
858 }
859
860 /// Creates a new [`WorkerBuilder`] with the given `onmessage` listener.
861 pub const fn with_listen(mut self, listen: &'a str) -> Self {
862 self.listen = listen;
863 self
864 }
865
866 /// Spawns a new [`Worker`] from the builder.
867 pub fn spawn(self) -> Result<Worker, JsValue> {
868 Worker::spawn(self)
869 }
870 }
871
872 /// Worker handle.
873 pub struct Worker {
874 /// JS worker handle.
875 handle: web_sys::Worker,
876
877 /// Worker name. You can see this name in browser's dev tool.
878 name: Box<str>,
879
880 /// Callback for worker's first response, which is a notification of
881 /// the worker's readiness.
882 ///
883 /// The callback will be replaced with [`Worker::on_message`] once it
884 /// called.
885 _on_ready: Closure<dyn FnMut()>,
886
887 /// Callback for worker response.
888 #[allow(clippy::type_complexity)]
889 on_message: Rc<RefCell<Closure<dyn FnMut(web_sys::MessageEvent)>>>,
890
891 /// Determines the worker is spawned and ready to listen to message.
892 ready: Arc<AtomicBool>,
893 }
894
895 impl fmt::Debug for Worker {
896 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
897 f.debug_struct("Worker")
898 .field("name", &self.name)
899 .field("ready", &self.ready.as_ref())
900 .finish_non_exhaustive()
901 }
902 }
903
904 impl Worker {
905 fn spawn(builder: WorkerBuilder) -> Result<Self, JsValue> {
906 // Creates a new worker.
907 let handle = create_worker(builder.name, builder.script)?;
908 let ready = Arc::new(AtomicBool::new(false));
909 let on_message = Rc::new(RefCell::new(Closure::new(|_| {})));
910 let c_handle = handle.clone();
911 let c_ready = Arc::clone(&ready);
912 let c_on_message = Rc::clone(&on_message);
913
914 // Listens to worker's ready notification.
915 let on_ready = Closure::new(move || {
916 let on_message = c_on_message.borrow();
917 c_handle.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
918 c_ready.store(true, Ordering::Release);
919 });
920 handle.set_onmessage(Some(on_ready.as_ref().unchecked_ref()));
921
922 // Sets 'WBG_INIT' if it wasn't set yet.
923 let wasm_init = WBG_INIT.get_or_init(|| DEFAULT_WBG_INIT.to_owned());
924
925 // TODO: For now, we assume that wasm use shared memory.
926 // Initializes the worker.
927 use js_sys::{Object, Reflect};
928 let msg = Object::new();
929 Reflect::set(&msg, &"module".into(), &wasm_bindgen::module())?;
930 Reflect::set(&msg, &"memory".into(), &wasm_bindgen::memory())?;
931 Reflect::set(&msg, &"url".into(), &IMPORT_META_URL.with(JsValue::clone))?;
932 Reflect::set(&msg, &"wasmInit".into(), &wasm_init.into())?;
933 Reflect::set(&msg, &"init".into(), &builder.init.into())?;
934 Reflect::set(&msg, &"listen".into(), &builder.listen.into())?;
935 handle.post_message(&msg)?;
936
937 Ok(Self {
938 handle,
939 name: builder.name.into(),
940 _on_ready: on_ready,
941 on_message,
942 ready,
943 })
944 }
945
946 /// Returns JS worker handle.
947 pub fn handle(&self) -> web_sys::Worker {
948 self.handle.clone()
949 }
950
951 /// Returns true if the worker has been fully initialized and ready to
952 /// process messages.
953 pub fn is_ready(&self) -> bool {
954 self.ready.load(Ordering::Relaxed)
955 }
956
957 pub fn set_on_message<F>(&self, mut cb: F)
958 where
959 F: FnMut(JsValue) + 'static,
960 {
961 let cb: Closure<dyn FnMut(web_sys::MessageEvent)> =
962 Closure::new(move |ev: web_sys::MessageEvent| cb(ev.data()));
963
964 if self.is_ready() {
965 self.handle.set_onmessage(Some(cb.as_ref().unchecked_ref()));
966 }
967 *self.on_message.borrow_mut() = cb;
968 }
969
970 /// Sends the worker a message.
971 pub fn post_message(&self, msg: &JsValue) -> Result<(), JsValue> {
972 self.handle.post_message(msg)
973 }
974 }
975
976 impl Work for Worker {
977 fn unpark(&mut self, cx: ManagedConstPtr<SubContext>) -> bool {
978 let ptr = cx.as_ptr();
979
980 #[cfg(feature = "check")]
981 drop(cx);
982
983 let res = self.handle.post_message(&JsValue::from(ptr));
984 res.is_ok()
985 }
986
987 fn park(&mut self) -> bool {
988 true
989 }
990
991 fn name(&self) -> &str {
992 &self.name
993 }
994 }
995
996 impl Drop for Worker {
997 /// Terminates web worker *immediately*.
998 fn drop(&mut self) {
999 self.handle.terminate();
1000 }
1001 }
1002
/// Name of the init function exported by the wasm glue JS file.
///
/// Clients can set this value before they spawn the first worker. If it is
/// not set by then, [`DEFAULT_WBG_INIT`] is stored as the default value.
///
/// # Example
///
/// ```rust
/// use my_ecs::default::prelude::*;
///
/// // Some bundlers may minify export name to '_' or 'default'.
/// WBG_INIT.set("_".to_owned()).unwrap();
/// WorkerBuilder::new("worker").spawn().unwrap();
/// ```
// NOTE(review): the example assumes `WBG_INIT` is reachable through the
// prelude imported above — confirm.
pub static WBG_INIT: OnceLock<String> = OnceLock::new();
1016
1017 /// wasm-bindgen will generate "__wbg_init" as default export function.
1018 /// But, I expect that it will be exported as 'default'.
1019 pub const DEFAULT_WBG_INIT: &str = "default";
1020
1021 pub const DEFAULT_WORKER_SCRIPT: &str = include_str!("worker.js");
1022
1023 // Some bundlers could warn about circular dependency caused by worker
1024 // due to the cycle that looks like
1025 // "Rust wasm - (bind) -> worker.js -> (import) -> wasm".
1026 //
1027 // But, if the worker JS file is substituted with a created object,
1028 // we can avoid the warning.
1029 //
1030 // However, in that case, we need to set bundler to cooperate with the
1031 // created worker object.
1032 // For instance, in Vite(v5.4.2), you may need following settings.
1033 //
1034 // build: {
1035 // rollupOptions: {
1036 // // We need to split wasm glue module into a separate chunk
1037 // // 1. Not to include window context data.
1038 // // * wasm glue module will be imported in worker context.
1039 // // * In worker context, we can't access something like document.
1040 // // 2. To preserve indirectly used exports.
1041 // // * Rollup doesn't know that we're going to access some exported
1042 // // objects, workerOnMessage for instance, in wasm code.
1043 // // * So we need to make Rollup not to drop those objects.
1044 // //
1045 // // First of all, we need to make a new entry point for wasm.
1046 // input: {
1047 // wasm: 'pkg/wasm-index.js', // path to wasm glue file.
1048 // ...
1049 // },
1050 // ...
1051 //
1052 // // Then, put the following.
1053 // // * https://rollupjs.org/configuration-options/#preserveentrysignatures
1054 // // Although Rollup says default is already 'exports-only',
1055 // // but I guess Vite 5.4.2 changes it to `false`.
1056 // preserveEntrySignatures: 'exports-only',
1057 // },
1058 // ...
1059 // }
1060 fn create_worker(name: &str, script: Option<&str>) -> Result<web_sys::Worker, JsValue> {
1061 let opt = web_sys::WorkerOptions::new();
1062 opt.set_name(name);
1063 opt.set_type(web_sys::WorkerType::Module);
1064 web_sys::Worker::new_with_options(&script_url(script), &opt)
1065 }
1066
1067 #[wasm_bindgen]
1068 extern "C" {
1069 /// URL of wasm glue JS file.
1070 //
1071 // We need this URL of wasm glue JS file in order to import it dynamically in workers.
1072 // So that workers can share the same wasm module and memory.
1073 // But note that bundler may evaluate "import.meta.url" statically during bundling,
1074 // which is not what we want, we need to evaluate it at runtime.
1075 // Therefore, you need to configure your bundler not to do it.
1076 // (e.g. Webpack does it basically, but Vite(v5.1.6) doesn't do it)
1077 #[wasm_bindgen(thread_local_v2, js_namespace = ["import", "meta"], js_name = url)]
1078 static IMPORT_META_URL: JsValue;
1079 }
1080
1081 fn script_url(script: Option<&str>) -> String {
1082 let script = script.unwrap_or(DEFAULT_WORKER_SCRIPT);
1083 let blob_parts = js_sys::Array::new_with_length(1);
1084 blob_parts.set(0, JsValue::from_str(script));
1085
1086 let options = web_sys::BlobPropertyBag::new();
1087 options.set_type("application/javascript");
1088
1089 let blob = web_sys::Blob::new_with_str_sequence_and_options(&blob_parts, &options).unwrap();
1090 web_sys::Url::create_object_url_with_blob(&blob).unwrap()
1091 }
1092
1093 #[wasm_bindgen(js_name = workerInit)]
1094 pub fn worker_init() {
1095 // Nothing to do as default behavior.
1096 }
1097
1098 /// # Safety
1099 ///
1100 /// Undefined behavior if the pointer is not valid or aliased.
1101 #[wasm_bindgen(js_name = workerOnMessage)]
1102 pub unsafe fn worker_on_message(cx: *mut SubContext) {
1103 let cx = ManagedConstPtr::new(NonNullExt::new_unchecked(cx));
1104 SubContext::execute(cx);
1105 }
1106
/// Message event header.
///
/// The inner value is reinterpreted as f64 and vice versa. You must
/// guarantee that its f64 representation is neither NaN nor Inf. Otherwise
/// someone (maybe JS) can change its bit pattern into another one that
/// still means NaN or Inf.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MessageHeader(pub u64);
1115
1116 impl MessageHeader {
1117 const MAGIC: u64 = 0x0EC5 << (16 * 3);
1118
1119 const BASE: u64 = Self::MAGIC;
1120
1121 const FN_INNER: u64 = Self::BASE + 1;
1122 const FN: Self = Self(Self::FN_INNER);
1123
1124 pub fn into_js_value(self) -> JsValue {
1125 let value = f64::from_bits(self.0);
1126 debug_assert!(value.is_finite()); // Nan or Inf are now allowed.
1127 JsValue::from_f64(value)
1128 }
1129
1130 pub fn from_js_value(value: JsValue) -> Self {
1131 Self(value.unchecked_into_f64().to_bits())
1132 }
1133 }
1134
1135 const F64_SIZE: usize = mem::size_of::<f64>();
1136 const F64_ROUND_UP: usize = F64_SIZE - 1;
1137 const HEADER_LEN: usize = (mem::size_of::<MessageHeader>() + F64_ROUND_UP) / F64_SIZE;
1138 const U32_SIZE: usize = mem::size_of::<u32>();
1139 const U32_ROUND_UP: usize = U32_SIZE - 1;
1140
/// Instant message for sending a function.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
struct MessageFn {
    /// Function pointer.
    //
    // NOTE: We cannot turn an address into f64 directly.
    // The address could be 'inf' or 'nan' in its f64 representation.
    // See https://en.wikipedia.org/wiki/IEEE_754-1985
    // Such numbers keep the meaning of 'inf' or 'nan', but their bit
    // patterns can freely be changed to other ones with the same meaning.
    // So we put this value in a Uint32Array instead.
    //
    // Reportedly, OSes use a 48-bit address space on 64-bit machines.
    // Even so, it could be dangerous to convert addresses to f64
    // when OSes fill the preceding bits with 1.
    f: fn(arg: JsValue),
}
1159
1160 impl MessageFn {
1161 const HEADER: MessageHeader = MessageHeader::FN;
1162
1163 /// Returns minimum length of [`js_sys::Array`] for this message.
1164 const fn message_array_len() -> u32 {
1165 // 3, message looks like [header, [body array], arg].
1166 const BODY_LEN: u32 = 1;
1167 const ARG_LEN: u32 = 1;
1168 HEADER_LEN as u32 + BODY_LEN + ARG_LEN
1169 }
1170
1171 fn post_to(self, worker: &web_sys::Worker, arg: JsValue) {
1172 let buf = js_sys::Array::new_with_length(Self::message_array_len());
1173 Self::write_header(&buf);
1174 self.write_body(&buf);
1175 Self::write_argument(&buf, arg);
1176 worker.post_message(&buf).unwrap();
1177 }
1178
1179 fn write_header(buf: &js_sys::Array) {
1180 buf.set(0, Self::HEADER.into_js_value());
1181 }
1182
1183 fn write_body(&self, buf: &js_sys::Array) {
1184 let body = FnCodec::encode_into_array(self.f);
1185 buf.set(1, body.into());
1186 }
1187
1188 fn write_argument(buf: &js_sys::Array, arg: JsValue) {
1189 buf.set(2, arg);
1190 }
1191
1192 fn read_body(buf: &js_sys::Array) -> Self {
1193 debug_assert!(buf.length() >= Self::message_array_len());
1194 assert_eq!(Self::HEADER, MessageHeader::from_js_value(buf.get(0)));
1195
1196 let body: js_sys::Uint32Array = buf.get(1).unchecked_into();
1197
1198 // Safety: We checked the message header.
1199 unsafe {
1200 let f = FnCodec::decode_from_array(&body);
1201 Self {
1202 f: mem::transmute::<fn(), fn(JsValue)>(f),
1203 }
1204 }
1205 }
1206
1207 fn read_argument(buf: &js_sys::Array) -> JsValue {
1208 buf.get(2)
1209 }
1210 }
1211
/// Union for viewing a function pointer as an array of `u32` words and the
/// other way around.
#[repr(C)]
union FnCodec {
    /// Function pointer view.
    src: fn(),
    /// `u32` word view of the same bytes.
    dst: [u32; Self::len()],
}
1217
1218 impl FnCodec {
1219 const fn len() -> usize {
1220 // 2 (on 64 bit) or 1 (on 32 bit)
1221 (mem::size_of::<fn()>() + U32_ROUND_UP) / U32_SIZE
1222 }
1223
1224 #[inline]
1225 fn encode_into_array<Arg, R>(f: fn(Arg) -> R) -> js_sys::Uint32Array {
1226 let arr = js_sys::Uint32Array::new_with_length(Self::len() as u32);
1227 arr.copy_from(&Self::encode(f));
1228 arr
1229 }
1230
1231 #[inline]
1232 const fn encode<Arg, R>(f: fn(Arg) -> R) -> [u32; Self::len()] {
1233 // Safety: Function pointer can safely become u32 array.
1234 unsafe {
1235 Self {
1236 src: mem::transmute::<fn(Arg) -> R, fn()>(f),
1237 }
1238 .dst
1239 }
1240 }
1241
1242 /// # Safety
1243 ///
1244 /// Undefined behavior if the given data is not a valid function pointer.
1245 //
1246 // Why we don't return `fn(Arg) -> R` like encode_into_array().
1247 // - Imagine we're returning `fn(Arg) -> R`.
1248 // If caller designated `&i32` to `Arg` and `()` to `R`,
1249 // then return type will be `fn(&'x i32)`, where 'x is defined by the caller.
1250 // Then, caller can call `fn(&'x i32)` directly, but cannot do other things
1251 // like pushing the function pointer into `Vec<for<'a> fn(&'a i32)>`.
1252 // Because the Vec requires `for<'a>`, which is quite generic,
1253 // but caller can't convert 'x, which is less generic, into `for<'a>`.
1254 // - Therefore, generic is not sufficient.
1255 #[inline]
1256 unsafe fn decode_from_array(arr: &js_sys::Uint32Array) -> fn() {
1257 let mut buf: [u32; Self::len()] = [0; Self::len()];
1258 arr.copy_to(&mut buf);
1259 Self::decode(buf)
1260 }
1261
1262 /// # Safety
1263 ///
1264 /// Undefined behavior if the given data is not a valid function pointer.
1265 #[inline]
1266 const unsafe fn decode(encoded: [u32; Self::len()]) -> fn() {
1267 Self { dst: encoded }.src
1268 }
1269 }
1270
/// Union for viewing a [`DynFnOnce`] as an array of `u32` words and the
/// other way around.
#[repr(C)]
union DynFnOnceCodec {
    /// [`DynFnOnce`] view, with argument and return types erased to `()`.
    src: DynFnOnce<(), ()>,
    /// `u32` word view of the same bytes.
    dst: [u32; Self::len()],
}
1276
1277 impl DynFnOnceCodec {
1278 const fn len() -> usize {
1279 (mem::size_of::<DynFnOnce<(), ()>>() + U32_ROUND_UP) / U32_SIZE
1280 }
1281
1282 #[inline]
1283 fn encode_into_array<Arg, R>(f: DynFnOnce<Arg, R>) -> js_sys::Uint32Array {
1284 let arr = js_sys::Uint32Array::new_with_length(Self::len() as u32);
1285 arr.copy_from(&Self::encode(f));
1286 arr
1287 }
1288
1289 #[inline]
1290 const fn encode<Arg, R>(f: DynFnOnce<Arg, R>) -> [u32; Self::len()] {
1291 // Safety: Dynamic function pointer can safely become u32 array.
1292 unsafe {
1293 Self {
1294 src: mem::transmute::<DynFnOnce<Arg, R>, DynFnOnce<(), ()>>(f),
1295 }
1296 .dst
1297 }
1298 }
1299
1300 /// # Safety
1301 ///
1302 /// Undefined behavior if the given data is not a valid [`DynFnOnce`].
1303 /// Also, return value must be cast as the original type.
1304 #[inline]
1305 unsafe fn decode_from_array(arr: &js_sys::Uint32Array) -> DynFnOnceExt<(), ()> {
1306 let mut buf: [u32; Self::len()] = [0; Self::len()];
1307 arr.copy_to(&mut buf);
1308 Self::decode(buf)
1309 }
1310
1311 /// # Safety
1312 ///
1313 /// Undefined behavior if the given data is not a valid [`DynFnOnce`].
1314 /// Also, return value must be cast as the original type.
1315 #[inline]
1316 const unsafe fn decode(encoded: [u32; Self::len()]) -> DynFnOnceExt<(), ()> {
1317 let f = Self { dst: encoded }.src;
1318 DynFnOnceExt(f)
1319 }
1320 }
1321
/// Boxed `FnOnce(Arg) -> R` trait object wrapped in [`ManuallyDrop`], so the
/// box is not dropped automatically.
type DynFnOnce<Arg, R> = ManuallyDrop<Box<dyn FnOnce(Arg) -> R>>;
1323
/// Transparent wrapper around a [`DynFnOnce`].
///
/// Unlike the bare [`DynFnOnce`], the wrapper drops the boxed closure when
/// it is dropped itself.
#[repr(transparent)]
struct DynFnOnceExt<Arg, R>(DynFnOnce<Arg, R>);
1326
1327 impl<Arg, R> DynFnOnceExt<Arg, R> {
1328 /// # Safety
1329 ///
1330 /// Undefined behavior if the type is not correct.
1331 unsafe fn cast<ToArg, ToR>(self) -> DynFnOnceExt<ToArg, ToR> {
1332 unsafe { mem::transmute::<DynFnOnceExt<Arg, R>, DynFnOnceExt<ToArg, ToR>>(self) }
1333 }
1334
1335 fn call(self, arg: Arg) -> R {
1336 unsafe {
1337 let inner: DynFnOnce<Arg, R> = mem::transmute(self);
1338 let f = ManuallyDrop::into_inner(inner);
1339 f(arg)
1340 }
1341 }
1342 }
1343
1344 impl<Arg, R> Drop for DynFnOnceExt<Arg, R> {
1345 fn drop(&mut self) {
1346 let inner = &mut self.0;
1347 unsafe { ManuallyDrop::drop(inner) };
1348 }
1349 }
1350}