deno_runtime/
web_worker.rs

1// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
2
3use deno_broadcast_channel::InMemoryBroadcastChannel;
4use deno_cache::CreateCache;
5use deno_cache::SqliteBackedCache;
6use deno_core::error::AnyError;
7use deno_core::error::JsError;
8use deno_core::futures::channel::mpsc;
9use deno_core::futures::future::poll_fn;
10use deno_core::futures::stream::StreamExt;
11use deno_core::futures::task::AtomicWaker;
12use deno_core::located_script_name;
13use deno_core::serde::Deserialize;
14use deno_core::serde::Serialize;
15use deno_core::serde_json::json;
16use deno_core::v8;
17use deno_core::CancelHandle;
18use deno_core::CompiledWasmModuleStore;
19use deno_core::DetachedBuffer;
20use deno_core::Extension;
21use deno_core::FeatureChecker;
22use deno_core::GetErrorClassFn;
23use deno_core::JsRuntime;
24use deno_core::ModuleCodeString;
25use deno_core::ModuleId;
26use deno_core::ModuleLoader;
27use deno_core::ModuleSpecifier;
28use deno_core::PollEventLoopOptions;
29use deno_core::RuntimeOptions;
30use deno_core::SharedArrayBufferStore;
31use deno_cron::local::LocalCronHandler;
32use deno_fs::FileSystem;
33use deno_http::DefaultHttpPropertyExtractor;
34use deno_io::Stdio;
35use deno_kv::dynamic::MultiBackendDbHandler;
36use deno_node::NodeExtInitServices;
37use deno_permissions::PermissionsContainer;
38use deno_terminal::colors;
39use deno_tls::RootCertStoreProvider;
40use deno_tls::TlsKeys;
41use deno_web::create_entangled_message_port;
42use deno_web::serialize_transferables;
43use deno_web::BlobStore;
44use deno_web::JsMessageData;
45use deno_web::MessagePort;
46use deno_web::Transferable;
47use log::debug;
48use std::cell::RefCell;
49use std::fmt;
50use std::rc::Rc;
51use std::sync::atomic::AtomicBool;
52use std::sync::atomic::AtomicU32;
53use std::sync::atomic::Ordering;
54use std::sync::Arc;
55use std::task::Context;
56use std::task::Poll;
57
58use crate::inspector_server::InspectorServer;
59use crate::ops;
60use crate::ops::process::NpmProcessStateProviderRc;
61use crate::ops::worker_host::WorkersTable;
62use crate::shared::maybe_transpile_source;
63use crate::shared::runtime;
64use crate::tokio_util::create_and_run_current_thread;
65use crate::worker::create_op_metrics;
66use crate::worker::import_meta_resolve_callback;
67use crate::worker::validate_import_attributes_callback;
68use crate::worker::FormatJsErrorFn;
69use crate::BootstrapOptions;
70
/// Initial data handed to a worker when it is bootstrapped.
///
/// `WebWorker::bootstrap` turns this into a `JsMessageData` and passes it to
/// the JS bootstrap function as the last argument.
pub struct WorkerMetadata {
  /// Payload buffer; becomes the `data` field of the `JsMessageData`.
  pub buffer: DetachedBuffer,
  /// Transferables that accompany the buffer; they are run through
  /// `serialize_transferables` before being handed to JS.
  pub transferables: Vec<Transferable>,
}
75
76static WORKER_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
77
78#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
79pub struct WorkerId(u32);
80impl WorkerId {
81  pub fn new() -> WorkerId {
82    let id = WORKER_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
83    WorkerId(id)
84  }
85}
86impl fmt::Display for WorkerId {
87  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88    write!(f, "worker-{}", self.0)
89  }
90}
91impl Default for WorkerId {
92  fn default() -> Self {
93    Self::new()
94  }
95}
96
/// Kind of worker script. With `rename_all = "lowercase"` the variants are
/// (de)serialized as `"classic"` and `"module"`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WebWorkerType {
  /// Classic worker.
  Classic,
  /// Module worker.
  Module,
}
103
/// Events that are sent to host from child
/// worker.
///
/// The `Serialize` impl in this file encodes each event as a
/// `(type_id, payload)` pair.
pub enum WorkerControlEvent {
  /// An error raised in the worker; serialized with type id 2.
  Error(AnyError),
  /// Serialized with type id 1.
  // NOTE(review): presumably an error the worker cannot recover from —
  // confirm against the code that produces and consumes this variant.
  TerminalError(AnyError),
  /// No payload; serialized with type id 3.
  Close,
}
111
112use deno_core::serde::Serializer;
113
114impl Serialize for WorkerControlEvent {
115  fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
116  where
117    S: Serializer,
118  {
119    let type_id = match &self {
120      WorkerControlEvent::TerminalError(_) => 1_i32,
121      WorkerControlEvent::Error(_) => 2_i32,
122      WorkerControlEvent::Close => 3_i32,
123    };
124
125    match self {
126      WorkerControlEvent::TerminalError(error)
127      | WorkerControlEvent::Error(error) => {
128        let value = match error.downcast_ref::<JsError>() {
129          Some(js_error) => {
130            let frame = js_error.frames.iter().find(|f| match &f.file_name {
131              Some(s) => !s.trim_start_matches('[').starts_with("ext:"),
132              None => false,
133            });
134            json!({
135              "message": js_error.exception_message,
136              "fileName": frame.map(|f| f.file_name.as_ref()),
137              "lineNumber": frame.map(|f| f.line_number.as_ref()),
138              "columnNumber": frame.map(|f| f.column_number.as_ref()),
139            })
140          }
141          None => json!({
142            "message": error.to_string(),
143          }),
144        };
145
146        Serialize::serialize(&(type_id, value), serializer)
147      }
148      _ => Serialize::serialize(&(type_id, ()), serializer),
149    }
150  }
151}
152
// Channels used for communication with worker's parent
//
// This is the worker-side handle: it is stored in the worker's op state and
// in the `WebWorker` itself (see `WebWorker::from_options`).
#[derive(Clone)]
pub struct WebWorkerInternalHandle {
  // Control-event channel towards the parent; closed by `terminate()`.
  sender: mpsc::Sender<WorkerControlEvent>,
  // One end of the entangled message port pair made in `create_handles`.
  pub port: Rc<MessagePort>,
  // Cancelled as the first step of `terminate()`.
  pub cancel: Rc<CancelHandle>,
  // Set by `WebWorkerHandle::terminate`; read by `terminate_if_needed()`.
  termination_signal: Arc<AtomicBool>,
  // True once the worker is terminated or being terminated.
  has_terminated: Arc<AtomicBool>,
  // Registered by the worker's event loop poll and woken on termination.
  terminate_waker: Arc<AtomicWaker>,
  // Used to stop JavaScript execution in the worker's isolate.
  isolate_handle: v8::IsolateHandle,
  // Worker name as given in `WebWorkerOptions::name`.
  pub name: String,
  pub worker_type: WebWorkerType,
}
166
167impl WebWorkerInternalHandle {
168  /// Post WorkerEvent to parent as a worker
169  pub fn post_event(
170    &self,
171    event: WorkerControlEvent,
172  ) -> Result<(), mpsc::TrySendError<WorkerControlEvent>> {
173    let mut sender = self.sender.clone();
174    // If the channel is closed,
175    // the worker must have terminated but the termination message has not yet been received.
176    //
177    // Therefore just treat it as if the worker has terminated and return.
178    if sender.is_closed() {
179      self.has_terminated.store(true, Ordering::SeqCst);
180      return Ok(());
181    }
182    sender.try_send(event)
183  }
184
185  /// Check if this worker is terminated or being terminated
186  pub fn is_terminated(&self) -> bool {
187    self.has_terminated.load(Ordering::SeqCst)
188  }
189
190  /// Check if this worker must terminate (because the termination signal is
191  /// set), and terminates it if so. Returns whether the worker is terminated or
192  /// being terminated, as with [`Self::is_terminated()`].
193  pub fn terminate_if_needed(&mut self) -> bool {
194    let has_terminated = self.is_terminated();
195
196    if !has_terminated && self.termination_signal.load(Ordering::SeqCst) {
197      self.terminate();
198      return true;
199    }
200
201    has_terminated
202  }
203
204  /// Terminate the worker
205  /// This function will set terminated to true, terminate the isolate and close the message channel
206  pub fn terminate(&mut self) {
207    self.cancel.cancel();
208    self.terminate_waker.wake();
209
210    // This function can be called multiple times by whomever holds
211    // the handle. However only a single "termination" should occur so
212    // we need a guard here.
213    let already_terminated = self.has_terminated.swap(true, Ordering::SeqCst);
214
215    if !already_terminated {
216      // Stop javascript execution
217      self.isolate_handle.terminate_execution();
218    }
219
220    // Wake parent by closing the channel
221    self.sender.close_channel();
222  }
223}
224
/// Parent-side handle in the form in which `create_handles` produces it.
///
/// It is converted into a [`WebWorkerHandle`] through the `From` impl in this
/// file, which wraps `port` and `receiver` in `Rc`.
pub struct SendableWebWorkerHandle {
  // One end of the entangled message port pair made in `create_handles`.
  port: MessagePort,
  // Receiving end of the worker's control-event channel.
  receiver: mpsc::Receiver<WorkerControlEvent>,
  // Shared with the worker-side handle; set to request termination.
  termination_signal: Arc<AtomicBool>,
  // Shared with the worker-side handle; true once termination has started.
  has_terminated: Arc<AtomicBool>,
  // Shared waker used to wake the worker's event loop.
  terminate_waker: Arc<AtomicWaker>,
  // Used to stop JavaScript execution in the worker's isolate.
  isolate_handle: v8::IsolateHandle,
}
233
234impl From<SendableWebWorkerHandle> for WebWorkerHandle {
235  fn from(handle: SendableWebWorkerHandle) -> Self {
236    WebWorkerHandle {
237      receiver: Rc::new(RefCell::new(handle.receiver)),
238      port: Rc::new(handle.port),
239      termination_signal: handle.termination_signal,
240      has_terminated: handle.has_terminated,
241      terminate_waker: handle.terminate_waker,
242      isolate_handle: handle.isolate_handle,
243    }
244  }
245}
246
/// This is the handle to the web worker that the parent thread uses to
/// communicate with the worker. It is created from a `SendableWebWorkerHandle`
/// which is sent to the parent thread from the worker thread where it is
/// created. The reason for this separation is that the handle first needs to be
/// `Send` when transferring between threads, and then must be `Clone` when it
/// has arrived on the parent thread. It can not be both at once without large
/// amounts of Arc<Mutex> and other fun stuff.
#[derive(Clone)]
pub struct WebWorkerHandle {
  /// Message port of this handle; disentangled by `terminate()`.
  pub port: Rc<MessagePort>,
  // Control-event receiver, shared between clones of this handle.
  receiver: Rc<RefCell<mpsc::Receiver<WorkerControlEvent>>>,
  // Set by `terminate()` to ask the worker to shut down.
  termination_signal: Arc<AtomicBool>,
  // True once the worker is terminated or being terminated.
  has_terminated: Arc<AtomicBool>,
  // Woken by `terminate()` so the worker's event loop is polled again.
  terminate_waker: Arc<AtomicWaker>,
  // Used to stop JavaScript execution in the worker's isolate.
  isolate_handle: v8::IsolateHandle,
}
263
264impl WebWorkerHandle {
265  /// Get the WorkerEvent with lock
266  /// Return error if more than one listener tries to get event
267  #[allow(clippy::await_holding_refcell_ref)] // TODO(ry) remove!
268  pub async fn get_control_event(&self) -> Option<WorkerControlEvent> {
269    let mut receiver = self.receiver.borrow_mut();
270    receiver.next().await
271  }
272
273  /// Terminate the worker
274  /// This function will set the termination signal, close the message channel,
275  /// and schedule to terminate the isolate after two seconds.
276  pub fn terminate(self) {
277    use std::thread::sleep;
278    use std::thread::spawn;
279    use std::time::Duration;
280
281    let schedule_termination =
282      !self.termination_signal.swap(true, Ordering::SeqCst);
283
284    self.port.disentangle();
285
286    if schedule_termination && !self.has_terminated.load(Ordering::SeqCst) {
287      // Wake up the worker's event loop so it can terminate.
288      self.terminate_waker.wake();
289
290      let has_terminated = self.has_terminated.clone();
291
292      // Schedule to terminate the isolate's execution.
293      spawn(move || {
294        sleep(Duration::from_secs(2));
295
296        // A worker's isolate can only be terminated once, so we need a guard
297        // here.
298        let already_terminated = has_terminated.swap(true, Ordering::SeqCst);
299
300        if !already_terminated {
301          // Stop javascript execution
302          self.isolate_handle.terminate_execution();
303        }
304      });
305    }
306  }
307}
308
309fn create_handles(
310  isolate_handle: v8::IsolateHandle,
311  name: String,
312  worker_type: WebWorkerType,
313) -> (WebWorkerInternalHandle, SendableWebWorkerHandle) {
314  let (parent_port, worker_port) = create_entangled_message_port();
315  let (ctrl_tx, ctrl_rx) = mpsc::channel::<WorkerControlEvent>(1);
316  let termination_signal = Arc::new(AtomicBool::new(false));
317  let has_terminated = Arc::new(AtomicBool::new(false));
318  let terminate_waker = Arc::new(AtomicWaker::new());
319  let internal_handle = WebWorkerInternalHandle {
320    name,
321    port: Rc::new(parent_port),
322    termination_signal: termination_signal.clone(),
323    has_terminated: has_terminated.clone(),
324    terminate_waker: terminate_waker.clone(),
325    isolate_handle: isolate_handle.clone(),
326    cancel: CancelHandle::new_rc(),
327    sender: ctrl_tx,
328    worker_type,
329  };
330  let external_handle = SendableWebWorkerHandle {
331    receiver: ctrl_rx,
332    port: worker_port,
333    termination_signal,
334    has_terminated,
335    terminate_waker,
336    isolate_handle,
337  };
338  (internal_handle, external_handle)
339}
340
/// Services consumed by `WebWorker::from_options` while the worker's
/// extensions and `JsRuntime` are being set up.
pub struct WebWorkerServiceOptions {
  /// Passed to the `deno_web` extension.
  pub blob_store: Arc<BlobStore>,
  /// Passed to the `deno_broadcast_channel` extension.
  pub broadcast_channel: InMemoryBroadcastChannel,
  /// Forwarded to `RuntimeOptions::compiled_wasm_module_store`.
  pub compiled_wasm_module_store: Option<CompiledWasmModuleStore>,
  /// Forwarded to `RuntimeOptions::feature_checker`.
  pub feature_checker: Arc<FeatureChecker>,
  /// File system passed to the `deno_fs` and `deno_node` extensions.
  pub fs: Arc<dyn FileSystem>,
  /// When set, the worker's inspector is registered with this server.
  pub maybe_inspector_server: Option<Arc<InspectorServer>>,
  /// Forwarded to `RuntimeOptions::module_loader`.
  pub module_loader: Rc<dyn ModuleLoader>,
  /// Passed to the `deno_node` extension.
  pub node_services: Option<NodeExtInitServices>,
  /// Passed to the `deno_process` ops.
  pub npm_process_state_provider: Option<NpmProcessStateProviderRc>,
  /// Stored in the op state through the `deno_permissions_web_worker`
  /// extension.
  pub permissions: PermissionsContainer,
  /// Shared by the fetch, websocket, net and kv extensions.
  pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
  /// Forwarded to `RuntimeOptions::shared_array_buffer_store`.
  pub shared_array_buffer_store: Option<SharedArrayBufferStore>,
}
355
/// Per-worker configuration consumed by `WebWorker::from_options`.
pub struct WebWorkerOptions {
  /// Worker name; copied into the worker and its internal handle.
  pub name: String,
  /// Main module specifier; also used as the name under which the inspector
  /// is registered.
  pub main_module: ModuleSpecifier,
  pub worker_id: WorkerId,
  /// Returned from `from_options` and then applied by `WebWorker::bootstrap`.
  pub bootstrap: BootstrapOptions,
  /// Additional extensions, appended after the built-in ones.
  pub extensions: Vec<Extension>,
  /// When present, JS/ESM sources of the built-in extensions are dropped
  /// before the runtime is created.
  pub startup_snapshot: Option<&'static [u8]>,
  /// Forwarded to the fetch, websocket, net and kv extensions.
  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
  /// Optional isolate creation parameters, such as heap limits.
  pub create_params: Option<v8::CreateParams>,
  /// Forwarded to the crypto and kv extensions.
  pub seed: Option<u64>,
  /// Passed to the `deno_worker_host` ops.
  pub create_web_worker_cb: Arc<ops::worker_host::CreateWebWorkerCb>,
  /// Passed to the `deno_worker_host` ops.
  pub format_js_error_fn: Option<Arc<FormatJsErrorFn>>,
  pub worker_type: WebWorkerType,
  /// Forwarded to `RuntimeOptions::get_error_class_fn`.
  pub get_error_class_fn: Option<GetErrorClassFn>,
  /// When set, a `SqliteBackedCache` factory for this directory is handed
  /// to the `deno_cache` extension.
  pub cache_storage_dir: Option<std::path::PathBuf>,
  /// Passed to the `deno_io` extension.
  pub stdio: Stdio,
  /// Passed to `create_op_metrics`.
  pub strace_ops: Option<Vec<String>>,
  /// Changes when the worker's event loop poll reports completion; see
  /// `WebWorker::poll_event_loop`.
  pub close_on_idle: bool,
  /// Initial worker data, consumed by `WebWorker::bootstrap`.
  pub maybe_worker_metadata: Option<WorkerMetadata>,
  /// When true, an op stack trace callback that forwards the stack to
  /// `deno_permissions::prompter::set_current_stacktrace` is installed.
  pub enable_stack_trace_arg_in_ops: bool,
}
378
/// This struct is an implementation of `Worker` Web API
///
/// Each `WebWorker` is either a child of `MainWorker` or other
/// `WebWorker`.
pub struct WebWorker {
  id: WorkerId,
  pub js_runtime: JsRuntime,
  pub name: String,
  // Copied from `WebWorkerOptions::close_on_idle`.
  close_on_idle: bool,
  // Set once evaluation of the main module has produced a result.
  has_executed_main_module: bool,
  // Worker-side handle; a clone of it also lives in the op state.
  internal_handle: WebWorkerInternalHandle,
  pub worker_type: WebWorkerType,
  pub main_module: ModuleSpecifier,
  // `globalThis.pollForMessages`, captured by `bootstrap` and consumed by
  // `start_polling_for_messages`.
  poll_for_messages_fn: Option<v8::Global<v8::Value>>,
  // `globalThis.hasMessageEventListener`, captured by `bootstrap`.
  has_message_event_listener_fn: Option<v8::Global<v8::Value>>,
  // `globalThis.bootstrap.workerRuntime`; taken when `bootstrap` runs.
  bootstrap_fn_global: Option<v8::Global<v8::Function>>,
  // Consumed when `bootstrap_fn` is called
  maybe_worker_metadata: Option<WorkerMetadata>,
}
398
impl Drop for WebWorker {
  /// Clears the `package.json` thread-local cache when the worker is dropped.
  fn drop(&mut self) {
    // clean up the package.json thread local cache
    node_resolver::PackageJsonThreadLocalCache::clear();
  }
}
405
406impl WebWorker {
407  pub fn bootstrap_from_options(
408    services: WebWorkerServiceOptions,
409    options: WebWorkerOptions,
410  ) -> (Self, SendableWebWorkerHandle) {
411    let (mut worker, handle, bootstrap_options) =
412      Self::from_options(services, options);
413    worker.bootstrap(&bootstrap_options);
414    (worker, handle)
415  }
416
  /// Builds a `WebWorker` without bootstrapping it.
  ///
  /// Assembles the extension list, creates the `JsRuntime`, registers the
  /// inspector when an inspector server is supplied, creates the handle pair
  /// and looks up `globalThis.bootstrap.workerRuntime`.
  ///
  /// Returns the worker, the sendable handle for the parent, and the
  /// bootstrap options that still have to be applied with `bootstrap`.
  ///
  /// # Panics
  ///
  /// Panics if `globalThis.bootstrap` is not an object or its `workerRuntime`
  /// property is not a function.
  fn from_options(
    services: WebWorkerServiceOptions,
    mut options: WebWorkerOptions,
  ) -> (Self, SendableWebWorkerHandle, BootstrapOptions) {
    deno_core::extension!(deno_permissions_web_worker,
      options = {
        permissions: PermissionsContainer,
        enable_testing_features: bool,
      },
      state = |state, options| {
        state.put::<PermissionsContainer>(options.permissions);
        state.put(ops::TestingFeaturesEnabled(options.enable_testing_features));
      },
    );

    // Permissions: many ops depend on this
    let enable_testing_features = options.bootstrap.enable_testing_features;
    // Only build a cache factory when a storage directory was configured.
    let create_cache = options.cache_storage_dir.map(|storage_dir| {
      let create_cache_fn = move || SqliteBackedCache::new(storage_dir.clone());
      CreateCache(Arc::new(create_cache_fn))
    });

    // NOTE(bartlomieju): ordering is important here, keep it in sync with
    // `runtime/worker.rs` and `runtime/snapshot.rs`!

    let mut extensions = vec![
      deno_telemetry::deno_telemetry::init_ops_and_esm(),
      // Web APIs
      deno_webidl::deno_webidl::init_ops_and_esm(),
      deno_console::deno_console::init_ops_and_esm(),
      deno_url::deno_url::init_ops_and_esm(),
      deno_web::deno_web::init_ops_and_esm::<PermissionsContainer>(
        services.blob_store,
        Some(options.main_module.clone()),
      ),
      deno_webgpu::deno_webgpu::init_ops_and_esm(),
      deno_canvas::deno_canvas::init_ops_and_esm(),
      deno_fetch::deno_fetch::init_ops_and_esm::<PermissionsContainer>(
        deno_fetch::Options {
          user_agent: options.bootstrap.user_agent.clone(),
          root_cert_store_provider: services.root_cert_store_provider.clone(),
          unsafely_ignore_certificate_errors: options
            .unsafely_ignore_certificate_errors
            .clone(),
          file_fetch_handler: Rc::new(deno_fetch::FsFetchHandler),
          ..Default::default()
        },
      ),
      deno_cache::deno_cache::init_ops_and_esm::<SqliteBackedCache>(
        create_cache,
      ),
      deno_websocket::deno_websocket::init_ops_and_esm::<PermissionsContainer>(
        options.bootstrap.user_agent.clone(),
        services.root_cert_store_provider.clone(),
        options.unsafely_ignore_certificate_errors.clone(),
      ),
      deno_webstorage::deno_webstorage::init_ops_and_esm(None).disable(),
      deno_crypto::deno_crypto::init_ops_and_esm(options.seed),
      deno_broadcast_channel::deno_broadcast_channel::init_ops_and_esm(
        services.broadcast_channel,
      ),
      deno_ffi::deno_ffi::init_ops_and_esm::<PermissionsContainer>(),
      deno_net::deno_net::init_ops_and_esm::<PermissionsContainer>(
        services.root_cert_store_provider.clone(),
        options.unsafely_ignore_certificate_errors.clone(),
      ),
      deno_tls::deno_tls::init_ops_and_esm(),
      deno_kv::deno_kv::init_ops_and_esm(
        MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(
          None,
          options.seed,
          deno_kv::remote::HttpOptions {
            user_agent: options.bootstrap.user_agent.clone(),
            root_cert_store_provider: services.root_cert_store_provider,
            unsafely_ignore_certificate_errors: options
              .unsafely_ignore_certificate_errors
              .clone(),
            client_cert_chain_and_key: TlsKeys::Null,
            proxy: None,
          },
        ),
        deno_kv::KvConfig::builder().build(),
      ),
      deno_cron::deno_cron::init_ops_and_esm(LocalCronHandler::new()),
      deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
      deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(
        deno_http::Options::default(),
      ),
      deno_io::deno_io::init_ops_and_esm(Some(options.stdio)),
      deno_fs::deno_fs::init_ops_and_esm::<PermissionsContainer>(
        services.fs.clone(),
      ),
      deno_node::deno_node::init_ops_and_esm::<PermissionsContainer>(
        services.node_services,
        services.fs,
      ),
      // Runtime ops that are always initialized for WebWorkers
      ops::runtime::deno_runtime::init_ops_and_esm(options.main_module.clone()),
      ops::worker_host::deno_worker_host::init_ops_and_esm(
        options.create_web_worker_cb,
        options.format_js_error_fn,
      ),
      ops::fs_events::deno_fs_events::init_ops_and_esm(),
      ops::os::deno_os_worker::init_ops_and_esm(),
      ops::permissions::deno_permissions::init_ops_and_esm(),
      ops::process::deno_process::init_ops_and_esm(
        services.npm_process_state_provider,
      ),
      ops::signal::deno_signal::init_ops_and_esm(),
      ops::tty::deno_tty::init_ops_and_esm(),
      ops::http::deno_http_runtime::init_ops_and_esm(),
      ops::bootstrap::deno_bootstrap::init_ops_and_esm(
        if options.startup_snapshot.is_some() {
          None
        } else {
          Some(Default::default())
        },
      ),
      deno_permissions_web_worker::init_ops_and_esm(
        services.permissions,
        enable_testing_features,
      ),
      runtime::init_ops_and_esm(),
      ops::web_worker::deno_web_worker::init_ops_and_esm(),
    ];

    #[cfg(feature = "hmr")]
    assert!(
      cfg!(not(feature = "only_snapshotted_js_sources")),
      "'hmr' is incompatible with 'only_snapshotted_js_sources'."
    );

    // With a startup snapshot, drop the JS/ESM sources and the ESM entry
    // point of every built-in extension (presumably because the snapshot
    // already contains them - confirm against `runtime/snapshot.rs`).
    for extension in &mut extensions {
      if options.startup_snapshot.is_some() {
        extension.js_files = std::borrow::Cow::Borrowed(&[]);
        extension.esm_files = std::borrow::Cow::Borrowed(&[]);
        extension.esm_entry_point = None;
      }
    }

    // Caller-supplied extensions go last and keep their sources.
    extensions.extend(std::mem::take(&mut options.extensions));

    #[cfg(feature = "only_snapshotted_js_sources")]
    options.startup_snapshot.as_ref().expect("A user snapshot was not provided, even though 'only_snapshotted_js_sources' is used.");

    // Get our op metrics
    let (op_summary_metrics, op_metrics_factory_fn) = create_op_metrics(
      options.bootstrap.enable_op_summary_metrics,
      options.strace_ops,
    );

    let mut js_runtime = JsRuntime::new(RuntimeOptions {
      module_loader: Some(services.module_loader),
      startup_snapshot: options.startup_snapshot,
      create_params: options.create_params,
      get_error_class_fn: options.get_error_class_fn,
      shared_array_buffer_store: services.shared_array_buffer_store,
      compiled_wasm_module_store: services.compiled_wasm_module_store,
      extensions,
      extension_transpiler: Some(Rc::new(|specifier, source| {
        maybe_transpile_source(specifier, source)
      })),
      inspector: true,
      feature_checker: Some(services.feature_checker),
      op_metrics_factory_fn,
      import_meta_resolve_callback: Some(Box::new(
        import_meta_resolve_callback,
      )),
      validate_import_attributes_cb: Some(Box::new(
        validate_import_attributes_callback,
      )),
      import_assertions_support: deno_core::ImportAssertionsSupport::Error,
      maybe_op_stack_trace_callback: if options.enable_stack_trace_arg_in_ops {
        Some(Box::new(|stack| {
          deno_permissions::prompter::set_current_stacktrace(stack)
        }))
      } else {
        None
      },
      ..Default::default()
    });

    if let Some(op_summary_metrics) = op_summary_metrics {
      js_runtime.op_state().borrow_mut().put(op_summary_metrics);
    }

    // Put inspector handle into the op state so we can put a breakpoint when
    // executing a CJS entrypoint.
    let op_state = js_runtime.op_state();
    let inspector = js_runtime.inspector();
    op_state.borrow_mut().put(inspector);

    if let Some(server) = services.maybe_inspector_server {
      server.register_inspector(
        options.main_module.to_string(),
        &mut js_runtime,
        false,
      );
    }

    // Create the handle pair; a clone of the internal handle is stored in the
    // op state, the original is kept by the worker itself.
    let (internal_handle, external_handle) = {
      let handle = js_runtime.v8_isolate().thread_safe_handle();
      let (internal_handle, external_handle) =
        create_handles(handle, options.name.clone(), options.worker_type);
      let op_state = js_runtime.op_state();
      let mut op_state = op_state.borrow_mut();
      op_state.put(internal_handle.clone());
      (internal_handle, external_handle)
    };

    // Look up `globalThis.bootstrap.workerRuntime` and keep it as a global
    // handle until `bootstrap` is called.
    let bootstrap_fn_global = {
      let context = js_runtime.main_context();
      let scope = &mut js_runtime.handle_scope();
      let context_local = v8::Local::new(scope, context);
      let global_obj = context_local.global(scope);
      let bootstrap_str =
        v8::String::new_external_onebyte_static(scope, b"bootstrap").unwrap();
      let bootstrap_ns: v8::Local<v8::Object> = global_obj
        .get(scope, bootstrap_str.into())
        .unwrap()
        .try_into()
        .unwrap();
      let main_runtime_str =
        v8::String::new_external_onebyte_static(scope, b"workerRuntime")
          .unwrap();
      let bootstrap_fn =
        bootstrap_ns.get(scope, main_runtime_str.into()).unwrap();
      let bootstrap_fn =
        v8::Local::<v8::Function>::try_from(bootstrap_fn).unwrap();
      v8::Global::new(scope, bootstrap_fn)
    };

    (
      Self {
        id: options.worker_id,
        js_runtime,
        name: options.name,
        internal_handle,
        worker_type: options.worker_type,
        main_module: options.main_module,
        poll_for_messages_fn: None,
        has_message_event_listener_fn: None,
        bootstrap_fn_global: Some(bootstrap_fn_global),
        close_on_idle: options.close_on_idle,
        has_executed_main_module: false,
        maybe_worker_metadata: options.maybe_worker_metadata,
      },
      external_handle,
      options.bootstrap,
    )
  }
668
669  pub fn bootstrap(&mut self, options: &BootstrapOptions) {
670    let op_state = self.js_runtime.op_state();
671    op_state.borrow_mut().put(options.clone());
672    // Instead of using name for log we use `worker-${id}` because
673    // WebWorkers can have empty string as name.
674    {
675      let scope = &mut self.js_runtime.handle_scope();
676      let args = options.as_v8(scope);
677      let bootstrap_fn = self.bootstrap_fn_global.take().unwrap();
678      let bootstrap_fn = v8::Local::new(scope, bootstrap_fn);
679      let undefined = v8::undefined(scope);
680      let mut worker_data: v8::Local<v8::Value> = v8::undefined(scope).into();
681      if let Some(data) = self.maybe_worker_metadata.take() {
682        let js_transferables = serialize_transferables(
683          &mut op_state.borrow_mut(),
684          data.transferables,
685        );
686        let js_message_data = JsMessageData {
687          data: data.buffer,
688          transferables: js_transferables,
689        };
690        worker_data =
691          deno_core::serde_v8::to_v8(scope, js_message_data).unwrap();
692      }
693      let name_str: v8::Local<v8::Value> =
694        v8::String::new(scope, &self.name).unwrap().into();
695      let id_str: v8::Local<v8::Value> =
696        v8::String::new(scope, &format!("{}", self.id))
697          .unwrap()
698          .into();
699      let id: v8::Local<v8::Value> =
700        v8::Integer::new(scope, self.id.0 as i32).into();
701      bootstrap_fn
702        .call(
703          scope,
704          undefined.into(),
705          &[args, name_str, id_str, id, worker_data],
706        )
707        .unwrap();
708
709      let context = scope.get_current_context();
710      let global = context.global(scope);
711      let poll_for_messages_str =
712        v8::String::new_external_onebyte_static(scope, b"pollForMessages")
713          .unwrap();
714      let poll_for_messages_fn = global
715        .get(scope, poll_for_messages_str.into())
716        .expect("get globalThis.pollForMessages");
717      global.delete(scope, poll_for_messages_str.into());
718      self.poll_for_messages_fn =
719        Some(v8::Global::new(scope, poll_for_messages_fn));
720
721      let has_message_event_listener_str =
722        v8::String::new_external_onebyte_static(
723          scope,
724          b"hasMessageEventListener",
725        )
726        .unwrap();
727      let has_message_event_listener_fn = global
728        .get(scope, has_message_event_listener_str.into())
729        .expect("get globalThis.hasMessageEventListener");
730      global.delete(scope, has_message_event_listener_str.into());
731      self.has_message_event_listener_fn =
732        Some(v8::Global::new(scope, has_message_event_listener_fn));
733    }
734  }
735
736  /// See [JsRuntime::execute_script](deno_core::JsRuntime::execute_script)
737  pub fn execute_script(
738    &mut self,
739    name: &'static str,
740    source_code: ModuleCodeString,
741  ) -> Result<(), AnyError> {
742    self.js_runtime.execute_script(name, source_code)?;
743    Ok(())
744  }
745
746  /// Loads and instantiates specified JavaScript module as "main" module.
747  pub async fn preload_main_module(
748    &mut self,
749    module_specifier: &ModuleSpecifier,
750  ) -> Result<ModuleId, AnyError> {
751    self.js_runtime.load_main_es_module(module_specifier).await
752  }
753
754  /// Loads and instantiates specified JavaScript module as "side" module.
755  pub async fn preload_side_module(
756    &mut self,
757    module_specifier: &ModuleSpecifier,
758  ) -> Result<ModuleId, AnyError> {
759    self.js_runtime.load_side_es_module(module_specifier).await
760  }
761
  /// Loads, instantiates and executes specified JavaScript module.
  ///
  /// This method assumes that worker can't be terminated when executing
  /// side module code.
  ///
  /// Evaluation is raced against the runtime's event loop; an event loop
  /// error is returned as-is, otherwise the evaluation result is returned.
  pub async fn execute_side_module(
    &mut self,
    module_specifier: &ModuleSpecifier,
  ) -> Result<(), AnyError> {
    let id = self.preload_side_module(module_specifier).await?;
    let mut receiver = self.js_runtime.mod_evaluate(id);
    tokio::select! {
      // `biased` makes the branches get polled in the order written, so the
      // evaluation result is checked before the event loop is driven.
      biased;

      maybe_result = &mut receiver => {
        debug!("received module evaluate {:#?}", maybe_result);
        maybe_result
      }

      event_loop_result = self.js_runtime.run_event_loop(PollEventLoopOptions::default()) => {
        // The event loop finished first: surface its error, otherwise wait
        // for the evaluation result.
        event_loop_result?;
        receiver.await
      }
    }
  }
786
  /// Loads, instantiates and executes specified JavaScript module.
  ///
  /// This module will have "import.meta.main" equal to true.
  ///
  /// Evaluation of the already loaded module `id` is raced against this
  /// worker's event loop. If the event loop finishes because the worker was
  /// terminated, `Ok(())` is returned without waiting for the evaluation
  /// result.
  pub async fn execute_main_module(
    &mut self,
    id: ModuleId,
  ) -> Result<(), AnyError> {
    let mut receiver = self.js_runtime.mod_evaluate(id);
    let poll_options = PollEventLoopOptions::default();

    tokio::select! {
      // `biased` makes the branches get polled in the order written, so the
      // evaluation result is checked before the event loop is driven.
      biased;

      maybe_result = &mut receiver => {
        debug!("received worker module evaluate {:#?}", maybe_result);
        // Read by `poll_event_loop` for the `close_on_idle` check.
        self.has_executed_main_module = true;
        maybe_result
      }

      event_loop_result = self.run_event_loop(poll_options) => {
        // A terminated worker reports success; any event loop result is
        // ignored in that case.
        if self.internal_handle.is_terminated() {
           return Ok(());
        }
        event_loop_result?;
        receiver.await
      }
    }
  }
815
816  fn poll_event_loop(
817    &mut self,
818    cx: &mut Context,
819    poll_options: PollEventLoopOptions,
820  ) -> Poll<Result<(), AnyError>> {
821    // If awakened because we are terminating, just return Ok
822    if self.internal_handle.terminate_if_needed() {
823      return Poll::Ready(Ok(()));
824    }
825
826    self.internal_handle.terminate_waker.register(cx.waker());
827
828    match self.js_runtime.poll_event_loop(cx, poll_options) {
829      Poll::Ready(r) => {
830        // If js ended because we are terminating, just return Ok
831        if self.internal_handle.terminate_if_needed() {
832          return Poll::Ready(Ok(()));
833        }
834
835        if let Err(e) = r {
836          return Poll::Ready(Err(e));
837        }
838
839        if self.close_on_idle {
840          return Poll::Ready(Ok(()));
841        }
842
843        // TODO(mmastrac): we don't want to test this w/classic workers because
844        // WPT triggers a failure here. This is only exposed via --enable-testing-features-do-not-use.
845        if self.worker_type == WebWorkerType::Module {
846          panic!(
847            "coding error: either js is polling or the worker is terminated"
848          );
849        } else {
850          log::error!("classic worker terminated unexpectedly");
851          Poll::Ready(Ok(()))
852        }
853      }
854      Poll::Pending => {
855        // This is special code path for workers created from `node:worker_threads`
856        // module that have different semantics than Web workers.
857        // We want the worker thread to terminate automatically if we've done executing
858        // Top-Level await, there are no child workers spawned by that workers
859        // and there's no "message" event listener.
860        if self.close_on_idle
861          && self.has_executed_main_module
862          && !self.has_child_workers()
863          && !self.has_message_event_listener()
864        {
865          Poll::Ready(Ok(()))
866        } else {
867          Poll::Pending
868        }
869      }
870    }
871  }
872
873  pub async fn run_event_loop(
874    &mut self,
875    poll_options: PollEventLoopOptions,
876  ) -> Result<(), AnyError> {
877    poll_fn(|cx| self.poll_event_loop(cx, poll_options)).await
878  }
879
880  // Starts polling for messages from worker host from JavaScript.
881  fn start_polling_for_messages(&mut self) {
882    let poll_for_messages_fn = self.poll_for_messages_fn.take().unwrap();
883    let scope = &mut self.js_runtime.handle_scope();
884    let poll_for_messages =
885      v8::Local::<v8::Value>::new(scope, poll_for_messages_fn);
886    let fn_ = v8::Local::<v8::Function>::try_from(poll_for_messages).unwrap();
887    let undefined = v8::undefined(scope);
888    // This call may return `None` if worker is terminated.
889    fn_.call(scope, undefined.into(), &[]);
890  }
891
892  fn has_message_event_listener(&mut self) -> bool {
893    let has_message_event_listener_fn =
894      self.has_message_event_listener_fn.as_ref().unwrap();
895    let scope = &mut self.js_runtime.handle_scope();
896    let has_message_event_listener =
897      v8::Local::<v8::Value>::new(scope, has_message_event_listener_fn);
898    let fn_ =
899      v8::Local::<v8::Function>::try_from(has_message_event_listener).unwrap();
900    let undefined = v8::undefined(scope);
901    // This call may return `None` if worker is terminated.
902    match fn_.call(scope, undefined.into(), &[]) {
903      Some(result) => result.is_true(),
904      None => false,
905    }
906  }
907
908  fn has_child_workers(&mut self) -> bool {
909    !self
910      .js_runtime
911      .op_state()
912      .borrow()
913      .borrow::<WorkersTable>()
914      .is_empty()
915  }
916}
917
918fn print_worker_error(
919  error: &AnyError,
920  name: &str,
921  format_js_error_fn: Option<&FormatJsErrorFn>,
922) {
923  let error_str = match format_js_error_fn {
924    Some(format_js_error_fn) => match error.downcast_ref::<JsError>() {
925      Some(js_error) => format_js_error_fn(js_error),
926      None => error.to_string(),
927    },
928    None => error.to_string(),
929  };
930  log::error!(
931    "{}: Uncaught (in worker \"{}\") {}",
932    colors::red_bold("error"),
933    name,
934    error_str.trim_start_matches("Uncaught "),
935  );
936}
937
938/// This function should be called from a thread dedicated to this worker.
939// TODO(bartlomieju): check if order of actions is aligned to Worker spec
940pub fn run_web_worker(
941  mut worker: WebWorker,
942  specifier: ModuleSpecifier,
943  mut maybe_source_code: Option<String>,
944  format_js_error_fn: Option<Arc<FormatJsErrorFn>>,
945) -> Result<(), AnyError> {
946  let name = worker.name.to_string();
947
948  // TODO(bartlomieju): run following block using "select!"
949  // with terminate
950
951  let fut = async move {
952    let internal_handle = worker.internal_handle.clone();
953
954    // Execute provided source code immediately
955    let result = if let Some(source_code) = maybe_source_code.take() {
956      let r = worker.execute_script(located_script_name!(), source_code.into());
957      worker.start_polling_for_messages();
958      r
959    } else {
960      // TODO(bartlomieju): add "type": "classic", ie. ability to load
961      // script instead of module
962      match worker.preload_main_module(&specifier).await {
963        Ok(id) => {
964          worker.start_polling_for_messages();
965          worker.execute_main_module(id).await
966        }
967        Err(e) => Err(e),
968      }
969    };
970
971    // If sender is closed it means that worker has already been closed from
972    // within using "globalThis.close()"
973    if internal_handle.is_terminated() {
974      return Ok(());
975    }
976
977    let result = if result.is_ok() {
978      worker
979        .run_event_loop(PollEventLoopOptions {
980          wait_for_inspector: true,
981          ..Default::default()
982        })
983        .await
984    } else {
985      result
986    };
987
988    if let Err(e) = result {
989      print_worker_error(&e, &name, format_js_error_fn.as_deref());
990      internal_handle
991        .post_event(WorkerControlEvent::TerminalError(e))
992        .expect("Failed to post message to host");
993
994      // Failure to execute script is a terminal error, bye, bye.
995      return Ok(());
996    }
997
998    debug!("Worker thread shuts down {}", &name);
999    result
1000  };
1001  create_and_run_current_thread(fut)
1002}