1use crate::ops::TestingFeaturesEnabled;
4use crate::web_worker::run_web_worker;
5use crate::web_worker::SendableWebWorkerHandle;
6use crate::web_worker::WebWorker;
7use crate::web_worker::WebWorkerHandle;
8use crate::web_worker::WebWorkerType;
9use crate::web_worker::WorkerControlEvent;
10use crate::web_worker::WorkerId;
11use crate::web_worker::WorkerMetadata;
12use crate::worker::FormatJsErrorFn;
13use deno_core::op2;
14use deno_core::serde::Deserialize;
15use deno_core::CancelFuture;
16use deno_core::CancelHandle;
17use deno_core::ModuleSpecifier;
18use deno_core::OpState;
19use deno_permissions::ChildPermissionsArg;
20use deno_permissions::PermissionsContainer;
21use deno_web::deserialize_js_transferables;
22use deno_web::JsMessageData;
23use deno_web::MessagePortError;
24use log::debug;
25use std::cell::RefCell;
26use std::collections::HashMap;
27use std::rc::Rc;
28use std::sync::Arc;
29
/// Name of the unstable feature that `op_create_worker` passes to
/// `check_unstable` when a worker is created with explicit permissions
/// (`Worker.deno.permissions`).
pub const UNSTABLE_FEATURE_NAME: &str = "worker-options";
31
32pub struct CreateWebWorkerArgs {
33 pub name: String,
34 pub worker_id: WorkerId,
35 pub parent_permissions: PermissionsContainer,
36 pub permissions: PermissionsContainer,
37 pub main_module: ModuleSpecifier,
38 pub worker_type: WebWorkerType,
39 pub close_on_idle: bool,
40 pub maybe_worker_metadata: Option<WorkerMetadata>,
41}
42
43pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
44 + Sync
45 + Send;
46
47#[derive(Clone)]
52struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>);
53
54#[derive(Clone)]
55struct FormatJsErrorFnHolder(Option<Arc<FormatJsErrorFn>>);
56
57pub struct WorkerThread {
58 worker_handle: WebWorkerHandle,
59 cancel_handle: Rc<CancelHandle>,
60
61 ctrl_closed: bool,
65 message_closed: bool,
66}
67
68impl WorkerThread {
69 fn terminate(self) {
70 self.cancel_handle.cancel();
73 }
74}
75
76impl Drop for WorkerThread {
77 fn drop(&mut self) {
78 self.worker_handle.clone().terminate();
79 }
80}
81
82pub type WorkersTable = HashMap<WorkerId, WorkerThread>;
83
84deno_core::extension!(
85 deno_worker_host,
86 ops = [
87 op_create_worker,
88 op_host_terminate_worker,
89 op_host_post_message,
90 op_host_recv_ctrl,
91 op_host_recv_message,
92 ],
93 options = {
94 create_web_worker_cb: Arc<CreateWebWorkerCb>,
95 format_js_error_fn: Option<Arc<FormatJsErrorFn>>,
96 },
97 state = |state, options| {
98 state.put::<WorkersTable>(WorkersTable::default());
99
100 let create_web_worker_cb_holder =
101 CreateWebWorkerCbHolder(options.create_web_worker_cb);
102 state.put::<CreateWebWorkerCbHolder>(create_web_worker_cb_holder);
103 let format_js_error_fn_holder =
104 FormatJsErrorFnHolder(options.format_js_error_fn);
105 state.put::<FormatJsErrorFnHolder>(format_js_error_fn_holder);
106 },
107);
108
109#[derive(Deserialize)]
110#[serde(rename_all = "camelCase")]
111pub struct CreateWorkerArgs {
112 has_source_code: bool,
113 name: Option<String>,
114 permissions: Option<ChildPermissionsArg>,
115 source_code: String,
116 specifier: String,
117 worker_type: WebWorkerType,
118 close_on_idle: bool,
119}
120
121#[derive(Debug, thiserror::Error)]
122pub enum CreateWorkerError {
123 #[error("Classic workers are not supported.")]
124 ClassicWorkers,
125 #[error(transparent)]
126 Permission(deno_permissions::ChildPermissionError),
127 #[error(transparent)]
128 ModuleResolution(#[from] deno_core::ModuleResolutionError),
129 #[error(transparent)]
130 MessagePort(#[from] MessagePortError),
131 #[error("{0}")]
132 Io(#[from] std::io::Error),
133}
134
135#[op2(stack_trace)]
137#[serde]
138fn op_create_worker(
139 state: &mut OpState,
140 #[serde] args: CreateWorkerArgs,
141 #[serde] maybe_worker_metadata: Option<JsMessageData>,
142) -> Result<WorkerId, CreateWorkerError> {
143 let specifier = args.specifier.clone();
144 let maybe_source_code = if args.has_source_code {
145 Some(args.source_code.clone())
146 } else {
147 None
148 };
149 let args_name = args.name;
150 let worker_type = args.worker_type;
151 if let WebWorkerType::Classic = worker_type {
152 if let TestingFeaturesEnabled(false) = state.borrow() {
153 return Err(CreateWorkerError::ClassicWorkers);
154 }
155 }
156
157 if args.permissions.is_some() {
158 super::check_unstable(
159 state,
160 UNSTABLE_FEATURE_NAME,
161 "Worker.deno.permissions",
162 );
163 }
164 let parent_permissions = state.borrow_mut::<PermissionsContainer>();
165 let worker_permissions = if let Some(child_permissions_arg) = args.permissions
166 {
167 parent_permissions
168 .create_child_permissions(child_permissions_arg)
169 .map_err(CreateWorkerError::Permission)?
170 } else {
171 parent_permissions.clone()
172 };
173 let parent_permissions = parent_permissions.clone();
174 let create_web_worker_cb = state.borrow::<CreateWebWorkerCbHolder>().clone();
175 let format_js_error_fn = state.borrow::<FormatJsErrorFnHolder>().clone();
176 let worker_id = WorkerId::new();
177
178 let module_specifier = deno_core::resolve_url(&specifier)?;
179 let worker_name = args_name.unwrap_or_default();
180
181 let (handle_sender, handle_receiver) =
182 std::sync::mpsc::sync_channel::<SendableWebWorkerHandle>(1);
183
184 let thread_builder = std::thread::Builder::new().name(format!("{worker_id}"));
186 let maybe_worker_metadata = if let Some(data) = maybe_worker_metadata {
187 let transferables =
188 deserialize_js_transferables(state, data.transferables)?;
189 Some(WorkerMetadata {
190 buffer: data.data,
191 transferables,
192 })
193 } else {
194 None
195 };
196 thread_builder.spawn(move || {
198 let (worker, external_handle) =
204 (create_web_worker_cb.0)(CreateWebWorkerArgs {
205 name: worker_name,
206 worker_id,
207 parent_permissions,
208 permissions: worker_permissions,
209 main_module: module_specifier.clone(),
210 worker_type,
211 close_on_idle: args.close_on_idle,
212 maybe_worker_metadata,
213 });
214
215 handle_sender.send(external_handle).unwrap();
217 drop(handle_sender);
218
219 run_web_worker(
224 worker,
225 module_specifier,
226 maybe_source_code,
227 format_js_error_fn.0,
228 )
229 })?;
230
231 let worker_handle = handle_receiver.recv().unwrap();
233
234 let worker_thread = WorkerThread {
235 worker_handle: worker_handle.into(),
236 cancel_handle: CancelHandle::new_rc(),
237 ctrl_closed: false,
238 message_closed: false,
239 };
240
241 state
244 .borrow_mut::<WorkersTable>()
245 .insert(worker_id, worker_thread);
246
247 Ok(worker_id)
248}
249
250#[op2]
251fn op_host_terminate_worker(state: &mut OpState, #[serde] id: WorkerId) {
252 if let Some(worker_thread) = state.borrow_mut::<WorkersTable>().remove(&id) {
253 worker_thread.terminate();
254 } else {
255 debug!("tried to terminate non-existent worker {}", id);
256 }
257}
258
/// The two host-facing channels of a worker whose closure is tracked
/// separately.
enum WorkerChannel {
  /// Control-event channel, read by `op_host_recv_ctrl`.
  Ctrl,
  /// Message channel, read by `op_host_recv_message`.
  Messages,
}
263
264fn close_channel(
267 state: Rc<RefCell<OpState>>,
268 id: WorkerId,
269 channel: WorkerChannel,
270) {
271 use std::collections::hash_map::Entry;
272
273 let mut s = state.borrow_mut();
274 let workers = s.borrow_mut::<WorkersTable>();
275
276 if let Entry::Occupied(mut entry) = workers.entry(id) {
279 let terminate = {
280 let worker_thread = entry.get_mut();
281 match channel {
282 WorkerChannel::Ctrl => {
283 worker_thread.ctrl_closed = true;
284 worker_thread.message_closed
285 }
286 WorkerChannel::Messages => {
287 worker_thread.message_closed = true;
288 worker_thread.ctrl_closed
289 }
290 }
291 };
292
293 if terminate {
294 entry.remove().terminate();
295 }
296 }
297}
298
299#[op2(async)]
301#[serde]
302async fn op_host_recv_ctrl(
303 state: Rc<RefCell<OpState>>,
304 #[serde] id: WorkerId,
305) -> WorkerControlEvent {
306 let (worker_handle, cancel_handle) = {
307 let state = state.borrow();
308 let workers_table = state.borrow::<WorkersTable>();
309 let maybe_handle = workers_table.get(&id);
310 if let Some(handle) = maybe_handle {
311 (handle.worker_handle.clone(), handle.cancel_handle.clone())
312 } else {
313 return WorkerControlEvent::Close;
315 }
316 };
317
318 let maybe_event = worker_handle
319 .get_control_event()
320 .or_cancel(cancel_handle)
321 .await;
322 match maybe_event {
323 Ok(Some(event)) => {
324 if let WorkerControlEvent::TerminalError(_) = &event {
326 close_channel(state, id, WorkerChannel::Ctrl);
327 }
328 event
329 }
330 Ok(None) => {
331 close_channel(state, id, WorkerChannel::Ctrl);
333 WorkerControlEvent::Close
334 }
335 Err(_) => {
336 WorkerControlEvent::Close
338 }
339 }
340}
341
342#[op2(async)]
343#[serde]
344async fn op_host_recv_message(
345 state: Rc<RefCell<OpState>>,
346 #[serde] id: WorkerId,
347) -> Result<Option<JsMessageData>, MessagePortError> {
348 let (worker_handle, cancel_handle) = {
349 let s = state.borrow();
350 let workers_table = s.borrow::<WorkersTable>();
351 let maybe_handle = workers_table.get(&id);
352 if let Some(handle) = maybe_handle {
353 (handle.worker_handle.clone(), handle.cancel_handle.clone())
354 } else {
355 return Ok(None);
357 }
358 };
359
360 let ret = worker_handle
361 .port
362 .recv(state.clone())
363 .or_cancel(cancel_handle)
364 .await;
365 match ret {
366 Ok(Ok(ret)) => {
367 if ret.is_none() {
368 close_channel(state, id, WorkerChannel::Messages);
369 }
370 Ok(ret)
371 }
372 Ok(Err(err)) => Err(err),
373 Err(_) => {
374 Ok(None)
376 }
377 }
378}
379
380#[op2]
382fn op_host_post_message(
383 state: &mut OpState,
384 #[serde] id: WorkerId,
385 #[serde] data: JsMessageData,
386) -> Result<(), MessagePortError> {
387 if let Some(worker_thread) = state.borrow::<WorkersTable>().get(&id) {
388 debug!("post message to worker {}", id);
389 let worker_handle = worker_thread.worker_handle.clone();
390 worker_handle.port.send(state, data)?;
391 } else {
392 debug!("tried to post message to non-existent worker {}", id);
393 }
394 Ok(())
395}