gloo_worker/actor/
spawner.rs1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::fmt;
4use std::marker::PhantomData;
5use std::rc::{Rc, Weak};
6
7use gloo_utils::window;
8use js_sys::Array;
9use serde::de::Deserialize;
10use serde::ser::Serialize;
11use web_sys::{Blob, BlobPropertyBag, Url, WorkerOptions, WorkerType};
12
13use super::bridge::{CallbackMap, WorkerBridge};
14use super::handler_id::HandlerId;
15use super::messages::FromWorker;
16use super::native_worker::{DedicatedWorker, NativeWorkerExt};
17use super::traits::Worker;
18use super::{Callback, Shared};
19use crate::codec::{Bincode, Codec};
20
21#[derive(Clone)]
23pub struct WorkerSpawner<W, CODEC = Bincode>
24where
25 W: Worker,
26 CODEC: Codec,
27{
28 _marker: PhantomData<(W, CODEC)>,
29 callback: Option<Callback<W::Output>>,
30 with_loader: bool,
31 as_module: bool,
32}
33
34impl<W, CODEC> fmt::Debug for WorkerSpawner<W, CODEC>
35where
36 W: Worker,
37 CODEC: Codec,
38{
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 f.write_str("WorkerScope<_>")
41 }
42}
43
44impl<W, CODEC> Default for WorkerSpawner<W, CODEC>
45where
46 W: Worker + 'static,
47 CODEC: Codec,
48{
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl<W, CODEC> WorkerSpawner<W, CODEC>
55where
56 W: Worker + 'static,
57 CODEC: Codec,
58{
59 pub const fn new() -> Self {
61 Self {
62 _marker: PhantomData,
63 callback: None,
64 with_loader: false,
65 as_module: true,
66 }
67 }
68
69 pub fn encoding<C>(&mut self) -> WorkerSpawner<W, C>
71 where
72 C: Codec,
73 {
74 WorkerSpawner {
75 _marker: PhantomData,
76 callback: self.callback.clone(),
77 with_loader: self.with_loader,
78 as_module: self.as_module,
79 }
80 }
81
82 pub fn callback<F>(&mut self, cb: F) -> &mut Self
84 where
85 F: 'static + Fn(W::Output),
86 {
87 self.callback = Some(Rc::new(cb));
88
89 self
90 }
91
92 pub fn with_loader(&mut self, with_loader: bool) -> &mut Self {
97 self.with_loader = with_loader;
98
99 self
100 }
101
102 pub fn as_module(&mut self, as_module: bool) -> &mut Self {
111 self.as_module = as_module;
112
113 self
114 }
115
116 pub fn spawn(&self, path: &str) -> WorkerBridge<W>
118 where
119 W::Input: Serialize + for<'de> Deserialize<'de>,
120 W::Output: Serialize + for<'de> Deserialize<'de>,
121 {
122 let worker = self.create_worker(path).expect("failed to spawn worker");
123
124 self.spawn_inner(worker)
125 }
126
127 fn spawn_inner(&self, worker: DedicatedWorker) -> WorkerBridge<W>
128 where
129 W::Input: Serialize + for<'de> Deserialize<'de>,
130 W::Output: Serialize + for<'de> Deserialize<'de>,
131 {
132 let pending_queue = Rc::new(RefCell::new(Some(Vec::new())));
133 let handler_id = HandlerId::new();
134 let mut callbacks = HashMap::new();
135
136 if let Some(m) = self.callback.as_ref().map(Rc::downgrade) {
137 callbacks.insert(handler_id, m);
138 }
139
140 let callbacks: Shared<CallbackMap<W>> = Rc::new(RefCell::new(callbacks));
141
142 let handler = {
143 let pending_queue = pending_queue.clone();
144 let callbacks = callbacks.clone();
145
146 let worker = worker.clone();
147
148 move |msg: FromWorker<W>| match msg {
149 FromWorker::WorkerLoaded => {
150 if let Some(pending_queue) = pending_queue.borrow_mut().take() {
151 for to_worker in pending_queue.into_iter() {
152 worker.post_packed_message::<_, CODEC>(to_worker);
153 }
154 }
155 }
156 FromWorker::ProcessOutput(id, output) => {
157 let mut callbacks = callbacks.borrow_mut();
158
159 if let Some(m) = callbacks.get(&id) {
160 if let Some(m) = Weak::upgrade(m) {
161 m(output);
162 } else {
163 callbacks.remove(&id);
164 }
165 }
166 }
167 }
168 };
169
170 worker.set_on_packed_message::<_, CODEC, _>(handler);
171
172 WorkerBridge::<W>::new::<CODEC>(
173 handler_id,
174 worker,
175 pending_queue,
176 callbacks,
177 self.callback.clone(),
178 )
179 }
180
181 fn create_worker(&self, path: &str) -> Option<DedicatedWorker> {
182 let path = if self.with_loader {
183 std::borrow::Cow::Borrowed(path)
184 } else {
185 let js_shim_url = Url::new_with_base(
186 path,
187 &window().location().href().expect("failed to read href."),
188 )
189 .expect("failed to create url for javascript entrypoint")
190 .to_string();
191
192 let wasm_url = js_shim_url.replace(".js", "_bg.wasm");
193
194 let array = Array::new();
195 let shim = if self.as_module {
196 format!(r#"import init from '{js_shim_url}';await init();"#)
197 } else {
198 format!(r#"importScripts("{js_shim_url}");wasm_bindgen("{wasm_url}");"#)
199 };
200 array.push(&shim.into());
201 let opts = BlobPropertyBag::new();
202 opts.set_type("application/javascript");
203 let blob = Blob::new_with_str_sequence_and_options(&array, &opts).unwrap();
204 let url = Url::create_object_url_with_blob(&blob).unwrap();
205 std::borrow::Cow::Owned(url)
206 };
207 let path = path.as_ref();
208
209 if self.as_module {
210 let options = WorkerOptions::new();
211 options.set_type(WorkerType::Module);
212 DedicatedWorker::new_with_options(path, &options).ok()
213 } else {
214 DedicatedWorker::new(path).ok()
215 }
216 }
217}