deno_runtime/ops/
worker_host.rs

1// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
2
3use crate::ops::TestingFeaturesEnabled;
4use crate::web_worker::run_web_worker;
5use crate::web_worker::SendableWebWorkerHandle;
6use crate::web_worker::WebWorker;
7use crate::web_worker::WebWorkerHandle;
8use crate::web_worker::WebWorkerType;
9use crate::web_worker::WorkerControlEvent;
10use crate::web_worker::WorkerId;
11use crate::web_worker::WorkerMetadata;
12use crate::worker::FormatJsErrorFn;
13use deno_core::op2;
14use deno_core::serde::Deserialize;
15use deno_core::CancelFuture;
16use deno_core::CancelHandle;
17use deno_core::ModuleSpecifier;
18use deno_core::OpState;
19use deno_permissions::ChildPermissionsArg;
20use deno_permissions::PermissionsContainer;
21use deno_web::deserialize_js_transferables;
22use deno_web::JsMessageData;
23use deno_web::MessagePortError;
24use log::debug;
25use std::cell::RefCell;
26use std::collections::HashMap;
27use std::rc::Rc;
28use std::sync::Arc;
29
30pub const UNSTABLE_FEATURE_NAME: &str = "worker-options";
31
32pub struct CreateWebWorkerArgs {
33  pub name: String,
34  pub worker_id: WorkerId,
35  pub parent_permissions: PermissionsContainer,
36  pub permissions: PermissionsContainer,
37  pub main_module: ModuleSpecifier,
38  pub worker_type: WebWorkerType,
39  pub close_on_idle: bool,
40  pub maybe_worker_metadata: Option<WorkerMetadata>,
41}
42
43pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
44  + Sync
45  + Send;
46
47/// A holder for callback that is used to create a new
48/// WebWorker. It's a struct instead of a type alias
49/// because `GothamState` used in `OpState` overrides
50/// value if type aliases have the same underlying type
51#[derive(Clone)]
52struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>);
53
54#[derive(Clone)]
55struct FormatJsErrorFnHolder(Option<Arc<FormatJsErrorFn>>);
56
57pub struct WorkerThread {
58  worker_handle: WebWorkerHandle,
59  cancel_handle: Rc<CancelHandle>,
60
61  // A WorkerThread that hasn't been explicitly terminated can only be removed
62  // from the WorkersTable once close messages have been received for both the
63  // control and message channels. See `close_channel`.
64  ctrl_closed: bool,
65  message_closed: bool,
66}
67
68impl WorkerThread {
69  fn terminate(self) {
70    // Cancel recv ops when terminating the worker, so they don't show up as
71    // pending ops.
72    self.cancel_handle.cancel();
73  }
74}
75
76impl Drop for WorkerThread {
77  fn drop(&mut self) {
78    self.worker_handle.clone().terminate();
79  }
80}
81
82pub type WorkersTable = HashMap<WorkerId, WorkerThread>;
83
84deno_core::extension!(
85  deno_worker_host,
86  ops = [
87    op_create_worker,
88    op_host_terminate_worker,
89    op_host_post_message,
90    op_host_recv_ctrl,
91    op_host_recv_message,
92  ],
93  options = {
94    create_web_worker_cb: Arc<CreateWebWorkerCb>,
95    format_js_error_fn: Option<Arc<FormatJsErrorFn>>,
96  },
97  state = |state, options| {
98    state.put::<WorkersTable>(WorkersTable::default());
99
100    let create_web_worker_cb_holder =
101      CreateWebWorkerCbHolder(options.create_web_worker_cb);
102    state.put::<CreateWebWorkerCbHolder>(create_web_worker_cb_holder);
103    let format_js_error_fn_holder =
104      FormatJsErrorFnHolder(options.format_js_error_fn);
105    state.put::<FormatJsErrorFnHolder>(format_js_error_fn_holder);
106  },
107);
108
109#[derive(Deserialize)]
110#[serde(rename_all = "camelCase")]
111pub struct CreateWorkerArgs {
112  has_source_code: bool,
113  name: Option<String>,
114  permissions: Option<ChildPermissionsArg>,
115  source_code: String,
116  specifier: String,
117  worker_type: WebWorkerType,
118  close_on_idle: bool,
119}
120
121#[derive(Debug, thiserror::Error)]
122pub enum CreateWorkerError {
123  #[error("Classic workers are not supported.")]
124  ClassicWorkers,
125  #[error(transparent)]
126  Permission(deno_permissions::ChildPermissionError),
127  #[error(transparent)]
128  ModuleResolution(#[from] deno_core::ModuleResolutionError),
129  #[error(transparent)]
130  MessagePort(#[from] MessagePortError),
131  #[error("{0}")]
132  Io(#[from] std::io::Error),
133}
134
135/// Create worker as the host
136#[op2(stack_trace)]
137#[serde]
138fn op_create_worker(
139  state: &mut OpState,
140  #[serde] args: CreateWorkerArgs,
141  #[serde] maybe_worker_metadata: Option<JsMessageData>,
142) -> Result<WorkerId, CreateWorkerError> {
143  let specifier = args.specifier.clone();
144  let maybe_source_code = if args.has_source_code {
145    Some(args.source_code.clone())
146  } else {
147    None
148  };
149  let args_name = args.name;
150  let worker_type = args.worker_type;
151  if let WebWorkerType::Classic = worker_type {
152    if let TestingFeaturesEnabled(false) = state.borrow() {
153      return Err(CreateWorkerError::ClassicWorkers);
154    }
155  }
156
157  if args.permissions.is_some() {
158    super::check_unstable(
159      state,
160      UNSTABLE_FEATURE_NAME,
161      "Worker.deno.permissions",
162    );
163  }
164  let parent_permissions = state.borrow_mut::<PermissionsContainer>();
165  let worker_permissions = if let Some(child_permissions_arg) = args.permissions
166  {
167    parent_permissions
168      .create_child_permissions(child_permissions_arg)
169      .map_err(CreateWorkerError::Permission)?
170  } else {
171    parent_permissions.clone()
172  };
173  let parent_permissions = parent_permissions.clone();
174  let create_web_worker_cb = state.borrow::<CreateWebWorkerCbHolder>().clone();
175  let format_js_error_fn = state.borrow::<FormatJsErrorFnHolder>().clone();
176  let worker_id = WorkerId::new();
177
178  let module_specifier = deno_core::resolve_url(&specifier)?;
179  let worker_name = args_name.unwrap_or_default();
180
181  let (handle_sender, handle_receiver) =
182    std::sync::mpsc::sync_channel::<SendableWebWorkerHandle>(1);
183
184  // Setup new thread
185  let thread_builder = std::thread::Builder::new().name(format!("{worker_id}"));
186  let maybe_worker_metadata = if let Some(data) = maybe_worker_metadata {
187    let transferables =
188      deserialize_js_transferables(state, data.transferables)?;
189    Some(WorkerMetadata {
190      buffer: data.data,
191      transferables,
192    })
193  } else {
194    None
195  };
196  // Spawn it
197  thread_builder.spawn(move || {
198    // Any error inside this block is terminal:
199    // - JS worker is useless - meaning it throws an exception and can't do anything else,
200    //  all action done upon it should be noops
201    // - newly spawned thread exits
202
203    let (worker, external_handle) =
204      (create_web_worker_cb.0)(CreateWebWorkerArgs {
205        name: worker_name,
206        worker_id,
207        parent_permissions,
208        permissions: worker_permissions,
209        main_module: module_specifier.clone(),
210        worker_type,
211        close_on_idle: args.close_on_idle,
212        maybe_worker_metadata,
213      });
214
215    // Send thread safe handle from newly created worker to host thread
216    handle_sender.send(external_handle).unwrap();
217    drop(handle_sender);
218
219    // At this point the only method of communication with host
220    // is using `worker.internal_channels`.
221    //
222    // Host can already push messages and interact with worker.
223    run_web_worker(
224      worker,
225      module_specifier,
226      maybe_source_code,
227      format_js_error_fn.0,
228    )
229  })?;
230
231  // Receive WebWorkerHandle from newly created worker
232  let worker_handle = handle_receiver.recv().unwrap();
233
234  let worker_thread = WorkerThread {
235    worker_handle: worker_handle.into(),
236    cancel_handle: CancelHandle::new_rc(),
237    ctrl_closed: false,
238    message_closed: false,
239  };
240
241  // At this point all interactions with worker happen using thread
242  // safe handler returned from previous function calls
243  state
244    .borrow_mut::<WorkersTable>()
245    .insert(worker_id, worker_thread);
246
247  Ok(worker_id)
248}
249
250#[op2]
251fn op_host_terminate_worker(state: &mut OpState, #[serde] id: WorkerId) {
252  if let Some(worker_thread) = state.borrow_mut::<WorkersTable>().remove(&id) {
253    worker_thread.terminate();
254  } else {
255    debug!("tried to terminate non-existent worker {}", id);
256  }
257}
258
259enum WorkerChannel {
260  Ctrl,
261  Messages,
262}
263
264/// Close a worker's channel. If this results in both of a worker's channels
265/// being closed, the worker will be removed from the workers table.
266fn close_channel(
267  state: Rc<RefCell<OpState>>,
268  id: WorkerId,
269  channel: WorkerChannel,
270) {
271  use std::collections::hash_map::Entry;
272
273  let mut s = state.borrow_mut();
274  let workers = s.borrow_mut::<WorkersTable>();
275
276  // `Worker.terminate()` might have been called already, meaning that we won't
277  // find the worker in the table - in that case ignore.
278  if let Entry::Occupied(mut entry) = workers.entry(id) {
279    let terminate = {
280      let worker_thread = entry.get_mut();
281      match channel {
282        WorkerChannel::Ctrl => {
283          worker_thread.ctrl_closed = true;
284          worker_thread.message_closed
285        }
286        WorkerChannel::Messages => {
287          worker_thread.message_closed = true;
288          worker_thread.ctrl_closed
289        }
290      }
291    };
292
293    if terminate {
294      entry.remove().terminate();
295    }
296  }
297}
298
299/// Get control event from guest worker as host
300#[op2(async)]
301#[serde]
302async fn op_host_recv_ctrl(
303  state: Rc<RefCell<OpState>>,
304  #[serde] id: WorkerId,
305) -> WorkerControlEvent {
306  let (worker_handle, cancel_handle) = {
307    let state = state.borrow();
308    let workers_table = state.borrow::<WorkersTable>();
309    let maybe_handle = workers_table.get(&id);
310    if let Some(handle) = maybe_handle {
311      (handle.worker_handle.clone(), handle.cancel_handle.clone())
312    } else {
313      // If handle was not found it means worker has already shutdown
314      return WorkerControlEvent::Close;
315    }
316  };
317
318  let maybe_event = worker_handle
319    .get_control_event()
320    .or_cancel(cancel_handle)
321    .await;
322  match maybe_event {
323    Ok(Some(event)) => {
324      // Terminal error means that worker should be removed from worker table.
325      if let WorkerControlEvent::TerminalError(_) = &event {
326        close_channel(state, id, WorkerChannel::Ctrl);
327      }
328      event
329    }
330    Ok(None) => {
331      // If there was no event from worker it means it has already been closed.
332      close_channel(state, id, WorkerChannel::Ctrl);
333      WorkerControlEvent::Close
334    }
335    Err(_) => {
336      // The worker was terminated.
337      WorkerControlEvent::Close
338    }
339  }
340}
341
342#[op2(async)]
343#[serde]
344async fn op_host_recv_message(
345  state: Rc<RefCell<OpState>>,
346  #[serde] id: WorkerId,
347) -> Result<Option<JsMessageData>, MessagePortError> {
348  let (worker_handle, cancel_handle) = {
349    let s = state.borrow();
350    let workers_table = s.borrow::<WorkersTable>();
351    let maybe_handle = workers_table.get(&id);
352    if let Some(handle) = maybe_handle {
353      (handle.worker_handle.clone(), handle.cancel_handle.clone())
354    } else {
355      // If handle was not found it means worker has already shutdown
356      return Ok(None);
357    }
358  };
359
360  let ret = worker_handle
361    .port
362    .recv(state.clone())
363    .or_cancel(cancel_handle)
364    .await;
365  match ret {
366    Ok(Ok(ret)) => {
367      if ret.is_none() {
368        close_channel(state, id, WorkerChannel::Messages);
369      }
370      Ok(ret)
371    }
372    Ok(Err(err)) => Err(err),
373    Err(_) => {
374      // The worker was terminated.
375      Ok(None)
376    }
377  }
378}
379
380/// Post message to guest worker as host
381#[op2]
382fn op_host_post_message(
383  state: &mut OpState,
384  #[serde] id: WorkerId,
385  #[serde] data: JsMessageData,
386) -> Result<(), MessagePortError> {
387  if let Some(worker_thread) = state.borrow::<WorkersTable>().get(&id) {
388    debug!("post message to worker {}", id);
389    let worker_handle = worker_thread.worker_handle.clone();
390    worker_handle.port.send(state, data)?;
391  } else {
392    debug!("tried to post message to non-existent worker {}", id);
393  }
394  Ok(())
395}