1use deno_broadcast_channel::InMemoryBroadcastChannel;
4use deno_cache::CreateCache;
5use deno_cache::SqliteBackedCache;
6use deno_core::error::AnyError;
7use deno_core::error::JsError;
8use deno_core::futures::channel::mpsc;
9use deno_core::futures::future::poll_fn;
10use deno_core::futures::stream::StreamExt;
11use deno_core::futures::task::AtomicWaker;
12use deno_core::located_script_name;
13use deno_core::serde::Deserialize;
14use deno_core::serde::Serialize;
15use deno_core::serde_json::json;
16use deno_core::v8;
17use deno_core::CancelHandle;
18use deno_core::CompiledWasmModuleStore;
19use deno_core::DetachedBuffer;
20use deno_core::Extension;
21use deno_core::FeatureChecker;
22use deno_core::GetErrorClassFn;
23use deno_core::JsRuntime;
24use deno_core::ModuleCodeString;
25use deno_core::ModuleId;
26use deno_core::ModuleLoader;
27use deno_core::ModuleSpecifier;
28use deno_core::PollEventLoopOptions;
29use deno_core::RuntimeOptions;
30use deno_core::SharedArrayBufferStore;
31use deno_cron::local::LocalCronHandler;
32use deno_fs::FileSystem;
33use deno_http::DefaultHttpPropertyExtractor;
34use deno_io::Stdio;
35use deno_kv::dynamic::MultiBackendDbHandler;
36use deno_node::NodeExtInitServices;
37use deno_permissions::PermissionsContainer;
38use deno_terminal::colors;
39use deno_tls::RootCertStoreProvider;
40use deno_tls::TlsKeys;
41use deno_web::create_entangled_message_port;
42use deno_web::serialize_transferables;
43use deno_web::BlobStore;
44use deno_web::JsMessageData;
45use deno_web::MessagePort;
46use deno_web::Transferable;
47use log::debug;
48use std::cell::RefCell;
49use std::fmt;
50use std::rc::Rc;
51use std::sync::atomic::AtomicBool;
52use std::sync::atomic::AtomicU32;
53use std::sync::atomic::Ordering;
54use std::sync::Arc;
55use std::task::Context;
56use std::task::Poll;
57
58use crate::inspector_server::InspectorServer;
59use crate::ops;
60use crate::ops::process::NpmProcessStateProviderRc;
61use crate::ops::worker_host::WorkersTable;
62use crate::shared::maybe_transpile_source;
63use crate::shared::runtime;
64use crate::tokio_util::create_and_run_current_thread;
65use crate::worker::create_op_metrics;
66use crate::worker::import_meta_resolve_callback;
67use crate::worker::validate_import_attributes_callback;
68use crate::worker::FormatJsErrorFn;
69use crate::BootstrapOptions;
70
/// Payload handed to a worker when it is bootstrapped.
///
/// `WebWorker::bootstrap` consumes this value: `transferables` are run
/// through `serialize_transferables` against the worker's op state and the
/// result, together with `buffer`, is passed to the JavaScript bootstrap
/// function as its last argument.
pub struct WorkerMetadata {
  /// Detached buffer forwarded as the `data` of the bootstrap message.
  pub buffer: DetachedBuffer,
  /// Transferables serialized alongside `buffer`.
  pub transferables: Vec<Transferable>,
}
75
// Process-wide source of `WorkerId` values; starts at 1 and is advanced by
// `WorkerId::new`.
static WORKER_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
77
/// Numeric identifier of a worker. Its `Display` form is `worker-<n>`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct WorkerId(u32);
80impl WorkerId {
81 pub fn new() -> WorkerId {
82 let id = WORKER_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
83 WorkerId(id)
84 }
85}
86impl fmt::Display for WorkerId {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 write!(f, "worker-{}", self.0)
89 }
90}
91impl Default for WorkerId {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
/// Kind of worker being run.
///
/// Serialized by serde as the lowercase variant name (`"classic"` or
/// `"module"`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WebWorkerType {
  Classic,
  Module,
}
103
/// Control events a worker posts through
/// `WebWorkerInternalHandle::post_event` and that are read back with
/// `WebWorkerHandle::get_control_event`.
///
/// The `Serialize` impl encodes each event as a `(type_id, payload)` pair.
pub enum WorkerControlEvent {
  /// An error carried as an `AnyError`; serialized with type id 2.
  Error(AnyError),
  /// Posted by `run_web_worker` when running the worker's code or its event
  /// loop fails; serialized with type id 1.
  TerminalError(AnyError),
  /// Carries no payload; serialized with type id 3.
  Close,
}
111
112use deno_core::serde::Serializer;
113
114impl Serialize for WorkerControlEvent {
115 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
116 where
117 S: Serializer,
118 {
119 let type_id = match &self {
120 WorkerControlEvent::TerminalError(_) => 1_i32,
121 WorkerControlEvent::Error(_) => 2_i32,
122 WorkerControlEvent::Close => 3_i32,
123 };
124
125 match self {
126 WorkerControlEvent::TerminalError(error)
127 | WorkerControlEvent::Error(error) => {
128 let value = match error.downcast_ref::<JsError>() {
129 Some(js_error) => {
130 let frame = js_error.frames.iter().find(|f| match &f.file_name {
131 Some(s) => !s.trim_start_matches('[').starts_with("ext:"),
132 None => false,
133 });
134 json!({
135 "message": js_error.exception_message,
136 "fileName": frame.map(|f| f.file_name.as_ref()),
137 "lineNumber": frame.map(|f| f.line_number.as_ref()),
138 "columnNumber": frame.map(|f| f.column_number.as_ref()),
139 })
140 }
141 None => json!({
142 "message": error.to_string(),
143 }),
144 };
145
146 Serialize::serialize(&(type_id, value), serializer)
147 }
148 _ => Serialize::serialize(&(type_id, ()), serializer),
149 }
150 }
151}
152
/// Worker-side handle, created by `create_handles` together with its
/// `SendableWebWorkerHandle` counterpart. A clone is stored in the worker's
/// op state by `WebWorker::from_options`.
#[derive(Clone)]
pub struct WebWorkerInternalHandle {
  /// Sending half of the control channel used by `post_event`.
  sender: mpsc::Sender<WorkerControlEvent>,
  /// First port of the entangled pair made in `create_handles`.
  pub port: Rc<MessagePort>,
  /// Cancelled by `terminate`.
  pub cancel: Rc<CancelHandle>,
  /// Set by `WebWorkerHandle::terminate` to request termination; checked by
  /// `terminate_if_needed`.
  termination_signal: Arc<AtomicBool>,
  /// Set once termination has been carried out (or the control channel was
  /// found closed in `post_event`).
  has_terminated: Arc<AtomicBool>,
  /// Waker registered by `WebWorker::poll_event_loop` and woken on
  /// termination.
  terminate_waker: Arc<AtomicWaker>,
  /// Used to call `terminate_execution` on the worker's isolate.
  isolate_handle: v8::IsolateHandle,
  pub name: String,
  pub worker_type: WebWorkerType,
}
166
167impl WebWorkerInternalHandle {
168 pub fn post_event(
170 &self,
171 event: WorkerControlEvent,
172 ) -> Result<(), mpsc::TrySendError<WorkerControlEvent>> {
173 let mut sender = self.sender.clone();
174 if sender.is_closed() {
179 self.has_terminated.store(true, Ordering::SeqCst);
180 return Ok(());
181 }
182 sender.try_send(event)
183 }
184
185 pub fn is_terminated(&self) -> bool {
187 self.has_terminated.load(Ordering::SeqCst)
188 }
189
190 pub fn terminate_if_needed(&mut self) -> bool {
194 let has_terminated = self.is_terminated();
195
196 if !has_terminated && self.termination_signal.load(Ordering::SeqCst) {
197 self.terminate();
198 return true;
199 }
200
201 has_terminated
202 }
203
204 pub fn terminate(&mut self) {
207 self.cancel.cancel();
208 self.terminate_waker.wake();
209
210 let already_terminated = self.has_terminated.swap(true, Ordering::SeqCst);
214
215 if !already_terminated {
216 self.isolate_handle.terminate_execution();
218 }
219
220 self.sender.close_channel();
222 }
223}
224
/// External-side handle as produced by `create_handles`.
///
/// Unlike `WebWorkerHandle` it holds the port and receiver directly rather
/// than behind `Rc`; the `From` impl wraps them to build a
/// `WebWorkerHandle`.
pub struct SendableWebWorkerHandle {
  /// Second port of the entangled pair made in `create_handles`.
  port: MessagePort,
  /// Receiving half of the control channel.
  receiver: mpsc::Receiver<WorkerControlEvent>,
  termination_signal: Arc<AtomicBool>,
  has_terminated: Arc<AtomicBool>,
  terminate_waker: Arc<AtomicWaker>,
  isolate_handle: v8::IsolateHandle,
}
233
234impl From<SendableWebWorkerHandle> for WebWorkerHandle {
235 fn from(handle: SendableWebWorkerHandle) -> Self {
236 WebWorkerHandle {
237 receiver: Rc::new(RefCell::new(handle.receiver)),
238 port: Rc::new(handle.port),
239 termination_signal: handle.termination_signal,
240 has_terminated: handle.has_terminated,
241 terminate_waker: handle.terminate_waker,
242 isolate_handle: handle.isolate_handle,
243 }
244 }
245}
246
/// External-side handle to a worker, built from a `SendableWebWorkerHandle`.
///
/// Clones share the same port and control-event receiver through `Rc`.
#[derive(Clone)]
pub struct WebWorkerHandle {
  pub port: Rc<MessagePort>,
  /// Control-event receiver read by `get_control_event`.
  receiver: Rc<RefCell<mpsc::Receiver<WorkerControlEvent>>>,
  /// Set by `terminate` to request termination.
  termination_signal: Arc<AtomicBool>,
  /// Shared flag recording that termination has been carried out.
  has_terminated: Arc<AtomicBool>,
  /// Woken by `terminate`.
  terminate_waker: Arc<AtomicWaker>,
  /// Used by `terminate` to stop the isolate's execution after a delay.
  isolate_handle: v8::IsolateHandle,
}
263
264impl WebWorkerHandle {
265 #[allow(clippy::await_holding_refcell_ref)] pub async fn get_control_event(&self) -> Option<WorkerControlEvent> {
269 let mut receiver = self.receiver.borrow_mut();
270 receiver.next().await
271 }
272
273 pub fn terminate(self) {
277 use std::thread::sleep;
278 use std::thread::spawn;
279 use std::time::Duration;
280
281 let schedule_termination =
282 !self.termination_signal.swap(true, Ordering::SeqCst);
283
284 self.port.disentangle();
285
286 if schedule_termination && !self.has_terminated.load(Ordering::SeqCst) {
287 self.terminate_waker.wake();
289
290 let has_terminated = self.has_terminated.clone();
291
292 spawn(move || {
294 sleep(Duration::from_secs(2));
295
296 let already_terminated = has_terminated.swap(true, Ordering::SeqCst);
299
300 if !already_terminated {
301 self.isolate_handle.terminate_execution();
303 }
304 });
305 }
306 }
307}
308
309fn create_handles(
310 isolate_handle: v8::IsolateHandle,
311 name: String,
312 worker_type: WebWorkerType,
313) -> (WebWorkerInternalHandle, SendableWebWorkerHandle) {
314 let (parent_port, worker_port) = create_entangled_message_port();
315 let (ctrl_tx, ctrl_rx) = mpsc::channel::<WorkerControlEvent>(1);
316 let termination_signal = Arc::new(AtomicBool::new(false));
317 let has_terminated = Arc::new(AtomicBool::new(false));
318 let terminate_waker = Arc::new(AtomicWaker::new());
319 let internal_handle = WebWorkerInternalHandle {
320 name,
321 port: Rc::new(parent_port),
322 termination_signal: termination_signal.clone(),
323 has_terminated: has_terminated.clone(),
324 terminate_waker: terminate_waker.clone(),
325 isolate_handle: isolate_handle.clone(),
326 cancel: CancelHandle::new_rc(),
327 sender: ctrl_tx,
328 worker_type,
329 };
330 let external_handle = SendableWebWorkerHandle {
331 receiver: ctrl_rx,
332 port: worker_port,
333 termination_signal,
334 has_terminated,
335 terminate_waker,
336 isolate_handle,
337 };
338 (internal_handle, external_handle)
339}
340
/// Services consumed by `WebWorker::from_options` when wiring up the
/// worker's extensions and runtime.
pub struct WebWorkerServiceOptions {
  /// Passed to the `deno_web` extension.
  pub blob_store: Arc<BlobStore>,
  /// Passed to the `deno_broadcast_channel` extension.
  pub broadcast_channel: InMemoryBroadcastChannel,
  /// Forwarded to `RuntimeOptions`.
  pub compiled_wasm_module_store: Option<CompiledWasmModuleStore>,
  /// Forwarded to `RuntimeOptions`.
  pub feature_checker: Arc<FeatureChecker>,
  /// Passed to the `deno_fs` and `deno_node` extensions.
  pub fs: Arc<dyn FileSystem>,
  /// When set, the worker's inspector is registered with this server.
  pub maybe_inspector_server: Option<Arc<InspectorServer>>,
  /// Forwarded to `RuntimeOptions`.
  pub module_loader: Rc<dyn ModuleLoader>,
  /// Passed to the `deno_node` extension.
  pub node_services: Option<NodeExtInitServices>,
  /// Passed to the process ops extension.
  pub npm_process_state_provider: Option<NpmProcessStateProviderRc>,
  /// Stored in the worker's op state.
  pub permissions: PermissionsContainer,
  /// Shared by the fetch, websocket, net and kv extensions.
  pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
  /// Forwarded to `RuntimeOptions`.
  pub shared_array_buffer_store: Option<SharedArrayBufferStore>,
}
355
/// Per-worker configuration consumed by `WebWorker::from_options`.
pub struct WebWorkerOptions {
  pub name: String,
  pub main_module: ModuleSpecifier,
  pub worker_id: WorkerId,
  /// Handed back by `from_options` and passed to `WebWorker::bootstrap`.
  pub bootstrap: BootstrapOptions,
  /// Extra extensions appended after the built-in ones.
  pub extensions: Vec<Extension>,
  /// When set, the JS/ESM sources of the built-in extensions are cleared
  /// before the runtime is created.
  pub startup_snapshot: Option<&'static [u8]>,
  /// Passed to the fetch, websocket, net and kv extensions.
  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
  pub create_params: Option<v8::CreateParams>,
  /// Passed to the crypto and kv extensions.
  pub seed: Option<u64>,
  /// Passed to the worker-host ops extension.
  pub create_web_worker_cb: Arc<ops::worker_host::CreateWebWorkerCb>,
  /// Passed to the worker-host ops extension.
  pub format_js_error_fn: Option<Arc<FormatJsErrorFn>>,
  pub worker_type: WebWorkerType,
  pub get_error_class_fn: Option<GetErrorClassFn>,
  /// When set, a `SqliteBackedCache` factory for this directory is given to
  /// the cache extension.
  pub cache_storage_dir: Option<std::path::PathBuf>,
  pub stdio: Stdio,
  /// Passed to `create_op_metrics`.
  pub strace_ops: Option<Vec<String>>,
  /// Lets `poll_event_loop` finish once the worker is idle (see that method
  /// for the exact conditions).
  pub close_on_idle: bool,
  /// Consumed by `WebWorker::bootstrap` to build the worker data argument.
  pub maybe_worker_metadata: Option<WorkerMetadata>,
  /// When `true`, an op stack-trace callback that forwards to the
  /// permission prompter is installed.
  pub enable_stack_trace_arg_in_ops: bool,
}
378
/// A worker: a `JsRuntime` plus the state needed to bootstrap it, drive its
/// event loop and react to termination requests.
pub struct WebWorker {
  id: WorkerId,
  pub js_runtime: JsRuntime,
  pub name: String,
  /// See `poll_event_loop` for how this affects event-loop completion.
  close_on_idle: bool,
  /// Set by `execute_main_module` when module evaluation completes first.
  has_executed_main_module: bool,
  internal_handle: WebWorkerInternalHandle,
  pub worker_type: WebWorkerType,
  pub main_module: ModuleSpecifier,
  /// `globalThis.pollForMessages`, captured (and removed from the global)
  /// during `bootstrap`; taken by `start_polling_for_messages`.
  poll_for_messages_fn: Option<v8::Global<v8::Value>>,
  /// `globalThis.hasMessageEventListener`, captured (and removed from the
  /// global) during `bootstrap`.
  has_message_event_listener_fn: Option<v8::Global<v8::Value>>,
  /// `globalThis.bootstrap.workerRuntime`; taken by `bootstrap`.
  bootstrap_fn_global: Option<v8::Global<v8::Function>>,
  /// Taken by `bootstrap` to build the worker data argument.
  maybe_worker_metadata: Option<WorkerMetadata>,
}
398
impl Drop for WebWorker {
  fn drop(&mut self) {
    // Clear the thread-local package.json cache when the worker goes away.
    // NOTE(review): presumably because each worker runs on its own thread
    // and the cache would otherwise outlive it -- confirm.
    node_resolver::PackageJsonThreadLocalCache::clear();
  }
}
405
406impl WebWorker {
407 pub fn bootstrap_from_options(
408 services: WebWorkerServiceOptions,
409 options: WebWorkerOptions,
410 ) -> (Self, SendableWebWorkerHandle) {
411 let (mut worker, handle, bootstrap_options) =
412 Self::from_options(services, options);
413 worker.bootstrap(&bootstrap_options);
414 (worker, handle)
415 }
416
  /// Creates the `JsRuntime`, handles and `WebWorker` value, without running
  /// the JavaScript bootstrap function.
  ///
  /// Returns the worker, the sendable external handle and the bootstrap
  /// options that should later be passed to `bootstrap`.
  ///
  /// # Panics
  ///
  /// Panics if `globalThis.bootstrap.workerRuntime` cannot be looked up as a
  /// function in the freshly created runtime.
  fn from_options(
    services: WebWorkerServiceOptions,
    mut options: WebWorkerOptions,
  ) -> (Self, SendableWebWorkerHandle, BootstrapOptions) {
    // Local extension whose only job is to seed the op state with the
    // permissions container and the testing-features flag.
    deno_core::extension!(deno_permissions_web_worker,
      options = {
        permissions: PermissionsContainer,
        enable_testing_features: bool,
      },
      state = |state, options| {
        state.put::<PermissionsContainer>(options.permissions);
        state.put(ops::TestingFeaturesEnabled(options.enable_testing_features));
      },
    );

    let enable_testing_features = options.bootstrap.enable_testing_features;
    // A cache factory is only provided when a storage directory was given.
    let create_cache = options.cache_storage_dir.map(|storage_dir| {
      let create_cache_fn = move || SqliteBackedCache::new(storage_dir.clone());
      CreateCache(Arc::new(create_cache_fn))
    });

    // Built-in extensions.
    // NOTE(review): the list order is kept as-is; presumably later
    // extensions depend on earlier ones -- confirm before reordering.
    let mut extensions = vec![
      deno_telemetry::deno_telemetry::init_ops_and_esm(),
      deno_webidl::deno_webidl::init_ops_and_esm(),
      deno_console::deno_console::init_ops_and_esm(),
      deno_url::deno_url::init_ops_and_esm(),
      deno_web::deno_web::init_ops_and_esm::<PermissionsContainer>(
        services.blob_store,
        Some(options.main_module.clone()),
      ),
      deno_webgpu::deno_webgpu::init_ops_and_esm(),
      deno_canvas::deno_canvas::init_ops_and_esm(),
      deno_fetch::deno_fetch::init_ops_and_esm::<PermissionsContainer>(
        deno_fetch::Options {
          user_agent: options.bootstrap.user_agent.clone(),
          root_cert_store_provider: services.root_cert_store_provider.clone(),
          unsafely_ignore_certificate_errors: options
            .unsafely_ignore_certificate_errors
            .clone(),
          file_fetch_handler: Rc::new(deno_fetch::FsFetchHandler),
          ..Default::default()
        },
      ),
      deno_cache::deno_cache::init_ops_and_esm::<SqliteBackedCache>(
        create_cache,
      ),
      deno_websocket::deno_websocket::init_ops_and_esm::<PermissionsContainer>(
        options.bootstrap.user_agent.clone(),
        services.root_cert_store_provider.clone(),
        options.unsafely_ignore_certificate_errors.clone(),
      ),
      // Web storage is registered but disabled for workers.
      deno_webstorage::deno_webstorage::init_ops_and_esm(None).disable(),
      deno_crypto::deno_crypto::init_ops_and_esm(options.seed),
      deno_broadcast_channel::deno_broadcast_channel::init_ops_and_esm(
        services.broadcast_channel,
      ),
      deno_ffi::deno_ffi::init_ops_and_esm::<PermissionsContainer>(),
      deno_net::deno_net::init_ops_and_esm::<PermissionsContainer>(
        services.root_cert_store_provider.clone(),
        options.unsafely_ignore_certificate_errors.clone(),
      ),
      deno_tls::deno_tls::init_ops_and_esm(),
      deno_kv::deno_kv::init_ops_and_esm(
        MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(
          None,
          options.seed,
          deno_kv::remote::HttpOptions {
            user_agent: options.bootstrap.user_agent.clone(),
            root_cert_store_provider: services.root_cert_store_provider,
            unsafely_ignore_certificate_errors: options
              .unsafely_ignore_certificate_errors
              .clone(),
            client_cert_chain_and_key: TlsKeys::Null,
            proxy: None,
          },
        ),
        deno_kv::KvConfig::builder().build(),
      ),
      deno_cron::deno_cron::init_ops_and_esm(LocalCronHandler::new()),
      deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
      deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(
        deno_http::Options::default(),
      ),
      deno_io::deno_io::init_ops_and_esm(Some(options.stdio)),
      deno_fs::deno_fs::init_ops_and_esm::<PermissionsContainer>(
        services.fs.clone(),
      ),
      deno_node::deno_node::init_ops_and_esm::<PermissionsContainer>(
        services.node_services,
        services.fs,
      ),
      ops::runtime::deno_runtime::init_ops_and_esm(options.main_module.clone()),
      ops::worker_host::deno_worker_host::init_ops_and_esm(
        options.create_web_worker_cb,
        options.format_js_error_fn,
      ),
      ops::fs_events::deno_fs_events::init_ops_and_esm(),
      ops::os::deno_os_worker::init_ops_and_esm(),
      ops::permissions::deno_permissions::init_ops_and_esm(),
      ops::process::deno_process::init_ops_and_esm(
        services.npm_process_state_provider,
      ),
      ops::signal::deno_signal::init_ops_and_esm(),
      ops::tty::deno_tty::init_ops_and_esm(),
      ops::http::deno_http_runtime::init_ops_and_esm(),
      // The bootstrap extension only gets a default argument when no startup
      // snapshot is in use.
      ops::bootstrap::deno_bootstrap::init_ops_and_esm(
        if options.startup_snapshot.is_some() {
          None
        } else {
          Some(Default::default())
        },
      ),
      deno_permissions_web_worker::init_ops_and_esm(
        services.permissions,
        enable_testing_features,
      ),
      runtime::init_ops_and_esm(),
      ops::web_worker::deno_web_worker::init_ops_and_esm(),
    ];

    #[cfg(feature = "hmr")]
    assert!(
      cfg!(not(feature = "only_snapshotted_js_sources")),
      "'hmr' is incompatible with 'only_snapshotted_js_sources'."
    );

    // With a startup snapshot, the built-in extensions keep their ops but
    // have their JS/ESM sources and entry points cleared. This runs before
    // the caller-supplied extensions are appended, so those are untouched.
    for extension in &mut extensions {
      if options.startup_snapshot.is_some() {
        extension.js_files = std::borrow::Cow::Borrowed(&[]);
        extension.esm_files = std::borrow::Cow::Borrowed(&[]);
        extension.esm_entry_point = None;
      }
    }

    extensions.extend(std::mem::take(&mut options.extensions));

    #[cfg(feature = "only_snapshotted_js_sources")]
    options.startup_snapshot.as_ref().expect("A user snapshot was not provided, even though 'only_snapshotted_js_sources' is used.");

    let (op_summary_metrics, op_metrics_factory_fn) = create_op_metrics(
      options.bootstrap.enable_op_summary_metrics,
      options.strace_ops,
    );

    let mut js_runtime = JsRuntime::new(RuntimeOptions {
      module_loader: Some(services.module_loader),
      startup_snapshot: options.startup_snapshot,
      create_params: options.create_params,
      get_error_class_fn: options.get_error_class_fn,
      shared_array_buffer_store: services.shared_array_buffer_store,
      compiled_wasm_module_store: services.compiled_wasm_module_store,
      extensions,
      extension_transpiler: Some(Rc::new(|specifier, source| {
        maybe_transpile_source(specifier, source)
      })),
      inspector: true,
      feature_checker: Some(services.feature_checker),
      op_metrics_factory_fn,
      import_meta_resolve_callback: Some(Box::new(
        import_meta_resolve_callback,
      )),
      validate_import_attributes_cb: Some(Box::new(
        validate_import_attributes_callback,
      )),
      import_assertions_support: deno_core::ImportAssertionsSupport::Error,
      maybe_op_stack_trace_callback: if options.enable_stack_trace_arg_in_ops {
        Some(Box::new(|stack| {
          deno_permissions::prompter::set_current_stacktrace(stack)
        }))
      } else {
        None
      },
      ..Default::default()
    });

    if let Some(op_summary_metrics) = op_summary_metrics {
      js_runtime.op_state().borrow_mut().put(op_summary_metrics);
    }

    // Make the inspector available through the op state.
    let op_state = js_runtime.op_state();
    let inspector = js_runtime.inspector();
    op_state.borrow_mut().put(inspector);

    if let Some(server) = services.maybe_inspector_server {
      server.register_inspector(
        options.main_module.to_string(),
        &mut js_runtime,
        false,
      );
    }

    // Create both handles; a clone of the internal one lives in the op state.
    let (internal_handle, external_handle) = {
      let handle = js_runtime.v8_isolate().thread_safe_handle();
      let (internal_handle, external_handle) =
        create_handles(handle, options.name.clone(), options.worker_type);
      let op_state = js_runtime.op_state();
      let mut op_state = op_state.borrow_mut();
      op_state.put(internal_handle.clone());
      (internal_handle, external_handle)
    };

    // Look up `globalThis.bootstrap.workerRuntime` and keep it as a global
    // handle so `bootstrap` can call it later.
    let bootstrap_fn_global = {
      let context = js_runtime.main_context();
      let scope = &mut js_runtime.handle_scope();
      let context_local = v8::Local::new(scope, context);
      let global_obj = context_local.global(scope);
      let bootstrap_str =
        v8::String::new_external_onebyte_static(scope, b"bootstrap").unwrap();
      let bootstrap_ns: v8::Local<v8::Object> = global_obj
        .get(scope, bootstrap_str.into())
        .unwrap()
        .try_into()
        .unwrap();
      let main_runtime_str =
        v8::String::new_external_onebyte_static(scope, b"workerRuntime")
          .unwrap();
      let bootstrap_fn =
        bootstrap_ns.get(scope, main_runtime_str.into()).unwrap();
      let bootstrap_fn =
        v8::Local::<v8::Function>::try_from(bootstrap_fn).unwrap();
      v8::Global::new(scope, bootstrap_fn)
    };

    (
      Self {
        id: options.worker_id,
        js_runtime,
        name: options.name,
        internal_handle,
        worker_type: options.worker_type,
        main_module: options.main_module,
        poll_for_messages_fn: None,
        has_message_event_listener_fn: None,
        bootstrap_fn_global: Some(bootstrap_fn_global),
        close_on_idle: options.close_on_idle,
        has_executed_main_module: false,
        maybe_worker_metadata: options.maybe_worker_metadata,
      },
      external_handle,
      options.bootstrap,
    )
  }
668
669 pub fn bootstrap(&mut self, options: &BootstrapOptions) {
670 let op_state = self.js_runtime.op_state();
671 op_state.borrow_mut().put(options.clone());
672 {
675 let scope = &mut self.js_runtime.handle_scope();
676 let args = options.as_v8(scope);
677 let bootstrap_fn = self.bootstrap_fn_global.take().unwrap();
678 let bootstrap_fn = v8::Local::new(scope, bootstrap_fn);
679 let undefined = v8::undefined(scope);
680 let mut worker_data: v8::Local<v8::Value> = v8::undefined(scope).into();
681 if let Some(data) = self.maybe_worker_metadata.take() {
682 let js_transferables = serialize_transferables(
683 &mut op_state.borrow_mut(),
684 data.transferables,
685 );
686 let js_message_data = JsMessageData {
687 data: data.buffer,
688 transferables: js_transferables,
689 };
690 worker_data =
691 deno_core::serde_v8::to_v8(scope, js_message_data).unwrap();
692 }
693 let name_str: v8::Local<v8::Value> =
694 v8::String::new(scope, &self.name).unwrap().into();
695 let id_str: v8::Local<v8::Value> =
696 v8::String::new(scope, &format!("{}", self.id))
697 .unwrap()
698 .into();
699 let id: v8::Local<v8::Value> =
700 v8::Integer::new(scope, self.id.0 as i32).into();
701 bootstrap_fn
702 .call(
703 scope,
704 undefined.into(),
705 &[args, name_str, id_str, id, worker_data],
706 )
707 .unwrap();
708
709 let context = scope.get_current_context();
710 let global = context.global(scope);
711 let poll_for_messages_str =
712 v8::String::new_external_onebyte_static(scope, b"pollForMessages")
713 .unwrap();
714 let poll_for_messages_fn = global
715 .get(scope, poll_for_messages_str.into())
716 .expect("get globalThis.pollForMessages");
717 global.delete(scope, poll_for_messages_str.into());
718 self.poll_for_messages_fn =
719 Some(v8::Global::new(scope, poll_for_messages_fn));
720
721 let has_message_event_listener_str =
722 v8::String::new_external_onebyte_static(
723 scope,
724 b"hasMessageEventListener",
725 )
726 .unwrap();
727 let has_message_event_listener_fn = global
728 .get(scope, has_message_event_listener_str.into())
729 .expect("get globalThis.hasMessageEventListener");
730 global.delete(scope, has_message_event_listener_str.into());
731 self.has_message_event_listener_fn =
732 Some(v8::Global::new(scope, has_message_event_listener_fn));
733 }
734 }
735
736 pub fn execute_script(
738 &mut self,
739 name: &'static str,
740 source_code: ModuleCodeString,
741 ) -> Result<(), AnyError> {
742 self.js_runtime.execute_script(name, source_code)?;
743 Ok(())
744 }
745
746 pub async fn preload_main_module(
748 &mut self,
749 module_specifier: &ModuleSpecifier,
750 ) -> Result<ModuleId, AnyError> {
751 self.js_runtime.load_main_es_module(module_specifier).await
752 }
753
754 pub async fn preload_side_module(
756 &mut self,
757 module_specifier: &ModuleSpecifier,
758 ) -> Result<ModuleId, AnyError> {
759 self.js_runtime.load_side_es_module(module_specifier).await
760 }
761
762 pub async fn execute_side_module(
767 &mut self,
768 module_specifier: &ModuleSpecifier,
769 ) -> Result<(), AnyError> {
770 let id = self.preload_side_module(module_specifier).await?;
771 let mut receiver = self.js_runtime.mod_evaluate(id);
772 tokio::select! {
773 biased;
774
775 maybe_result = &mut receiver => {
776 debug!("received module evaluate {:#?}", maybe_result);
777 maybe_result
778 }
779
780 event_loop_result = self.js_runtime.run_event_loop(PollEventLoopOptions::default()) => {
781 event_loop_result?;
782 receiver.await
783 }
784 }
785 }
786
787 pub async fn execute_main_module(
791 &mut self,
792 id: ModuleId,
793 ) -> Result<(), AnyError> {
794 let mut receiver = self.js_runtime.mod_evaluate(id);
795 let poll_options = PollEventLoopOptions::default();
796
797 tokio::select! {
798 biased;
799
800 maybe_result = &mut receiver => {
801 debug!("received worker module evaluate {:#?}", maybe_result);
802 self.has_executed_main_module = true;
803 maybe_result
804 }
805
806 event_loop_result = self.run_event_loop(poll_options) => {
807 if self.internal_handle.is_terminated() {
808 return Ok(());
809 }
810 event_loop_result?;
811 receiver.await
812 }
813 }
814 }
815
816 fn poll_event_loop(
817 &mut self,
818 cx: &mut Context,
819 poll_options: PollEventLoopOptions,
820 ) -> Poll<Result<(), AnyError>> {
821 if self.internal_handle.terminate_if_needed() {
823 return Poll::Ready(Ok(()));
824 }
825
826 self.internal_handle.terminate_waker.register(cx.waker());
827
828 match self.js_runtime.poll_event_loop(cx, poll_options) {
829 Poll::Ready(r) => {
830 if self.internal_handle.terminate_if_needed() {
832 return Poll::Ready(Ok(()));
833 }
834
835 if let Err(e) = r {
836 return Poll::Ready(Err(e));
837 }
838
839 if self.close_on_idle {
840 return Poll::Ready(Ok(()));
841 }
842
843 if self.worker_type == WebWorkerType::Module {
846 panic!(
847 "coding error: either js is polling or the worker is terminated"
848 );
849 } else {
850 log::error!("classic worker terminated unexpectedly");
851 Poll::Ready(Ok(()))
852 }
853 }
854 Poll::Pending => {
855 if self.close_on_idle
861 && self.has_executed_main_module
862 && !self.has_child_workers()
863 && !self.has_message_event_listener()
864 {
865 Poll::Ready(Ok(()))
866 } else {
867 Poll::Pending
868 }
869 }
870 }
871 }
872
873 pub async fn run_event_loop(
874 &mut self,
875 poll_options: PollEventLoopOptions,
876 ) -> Result<(), AnyError> {
877 poll_fn(|cx| self.poll_event_loop(cx, poll_options)).await
878 }
879
880 fn start_polling_for_messages(&mut self) {
882 let poll_for_messages_fn = self.poll_for_messages_fn.take().unwrap();
883 let scope = &mut self.js_runtime.handle_scope();
884 let poll_for_messages =
885 v8::Local::<v8::Value>::new(scope, poll_for_messages_fn);
886 let fn_ = v8::Local::<v8::Function>::try_from(poll_for_messages).unwrap();
887 let undefined = v8::undefined(scope);
888 fn_.call(scope, undefined.into(), &[]);
890 }
891
892 fn has_message_event_listener(&mut self) -> bool {
893 let has_message_event_listener_fn =
894 self.has_message_event_listener_fn.as_ref().unwrap();
895 let scope = &mut self.js_runtime.handle_scope();
896 let has_message_event_listener =
897 v8::Local::<v8::Value>::new(scope, has_message_event_listener_fn);
898 let fn_ =
899 v8::Local::<v8::Function>::try_from(has_message_event_listener).unwrap();
900 let undefined = v8::undefined(scope);
901 match fn_.call(scope, undefined.into(), &[]) {
903 Some(result) => result.is_true(),
904 None => false,
905 }
906 }
907
908 fn has_child_workers(&mut self) -> bool {
909 !self
910 .js_runtime
911 .op_state()
912 .borrow()
913 .borrow::<WorkersTable>()
914 .is_empty()
915 }
916}
917
918fn print_worker_error(
919 error: &AnyError,
920 name: &str,
921 format_js_error_fn: Option<&FormatJsErrorFn>,
922) {
923 let error_str = match format_js_error_fn {
924 Some(format_js_error_fn) => match error.downcast_ref::<JsError>() {
925 Some(js_error) => format_js_error_fn(js_error),
926 None => error.to_string(),
927 },
928 None => error.to_string(),
929 };
930 log::error!(
931 "{}: Uncaught (in worker \"{}\") {}",
932 colors::red_bold("error"),
933 name,
934 error_str.trim_start_matches("Uncaught "),
935 );
936}
937
938pub fn run_web_worker(
941 mut worker: WebWorker,
942 specifier: ModuleSpecifier,
943 mut maybe_source_code: Option<String>,
944 format_js_error_fn: Option<Arc<FormatJsErrorFn>>,
945) -> Result<(), AnyError> {
946 let name = worker.name.to_string();
947
948 let fut = async move {
952 let internal_handle = worker.internal_handle.clone();
953
954 let result = if let Some(source_code) = maybe_source_code.take() {
956 let r = worker.execute_script(located_script_name!(), source_code.into());
957 worker.start_polling_for_messages();
958 r
959 } else {
960 match worker.preload_main_module(&specifier).await {
963 Ok(id) => {
964 worker.start_polling_for_messages();
965 worker.execute_main_module(id).await
966 }
967 Err(e) => Err(e),
968 }
969 };
970
971 if internal_handle.is_terminated() {
974 return Ok(());
975 }
976
977 let result = if result.is_ok() {
978 worker
979 .run_event_loop(PollEventLoopOptions {
980 wait_for_inspector: true,
981 ..Default::default()
982 })
983 .await
984 } else {
985 result
986 };
987
988 if let Err(e) = result {
989 print_worker_error(&e, &name, format_js_error_fn.as_deref());
990 internal_handle
991 .post_event(WorkerControlEvent::TerminalError(e))
992 .expect("Failed to post message to host");
993
994 return Ok(());
996 }
997
998 debug!("Worker thread shuts down {}", &name);
999 result
1000 };
1001 create_and_run_current_thread(fut)
1002}