Skip to main content

deno_core/
inspector.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2
3//! The documentation for the inspector API is sparse, but these are helpful:
4//! <https://chromedevtools.github.io/devtools-protocol/>
5//! <https://web.archive.org/web/20210918052901/https://hyperandroid.com/2020/02/12/v8-inspector-from-an-embedder-standpoint/>
6
7use crate::futures::channel::mpsc;
8use crate::futures::channel::mpsc::UnboundedReceiver;
9use crate::futures::channel::mpsc::UnboundedSender;
10use crate::futures::channel::oneshot;
11use crate::futures::prelude::*;
12use crate::futures::stream::FuturesUnordered;
13use crate::futures::stream::StreamExt;
14use crate::futures::task;
15use crate::serde_json::json;
16
17use parking_lot::Mutex;
18use std::cell::Cell;
19use std::cell::RefCell;
20use std::collections::HashMap;
21use std::ffi::c_void;
22use std::mem::take;
23use std::pin::Pin;
24use std::ptr::NonNull;
25use std::rc::Rc;
26use std::sync::Arc;
27use std::task::Context;
28use std::task::Poll;
29use std::thread;
30
31#[derive(Debug)]
32pub enum InspectorMsgKind {
33  Notification,
34  Message(i32),
35}
36
37#[derive(Debug)]
38pub struct InspectorMsg {
39  pub kind: InspectorMsgKind,
40  pub content: String,
41}
42
43impl InspectorMsg {
44  /// Create a notification message from a JSON value
45  pub fn notification(content: serde_json::Value) -> Self {
46    Self {
47      kind: InspectorMsgKind::Notification,
48      content: content.to_string(),
49    }
50  }
51}
52
53// TODO(bartlomieju): remove this
54pub type SessionProxySender = UnboundedSender<InspectorMsg>;
55// TODO(bartlomieju): remove this
56pub type SessionProxyReceiver = UnboundedReceiver<String>;
57
58/// Channels for an inspector session
59pub enum InspectorSessionChannels {
60  /// Regular inspector session with bidirectional communication
61  Regular {
62    tx: SessionProxySender,
63    rx: SessionProxyReceiver,
64  },
65  /// Worker inspector session with separate channels for main<->worker communication
66  Worker {
67    /// Main thread sends commands TO worker
68    main_to_worker_tx: UnboundedSender<String>,
69    /// Worker sends responses/events TO main thread
70    worker_to_main_rx: UnboundedReceiver<InspectorMsg>,
71    /// Worker URL for identification
72    worker_url: String,
73  },
74}
75
76/// Encapsulates channels and metadata for creating an inspector session
77pub struct InspectorSessionProxy {
78  pub channels: InspectorSessionChannels,
79  pub kind: InspectorSessionKind,
80}
81
82/// Creates a pair of inspector session proxies for worker-main communication.
83pub fn create_worker_inspector_session_pair(
84  worker_url: String,
85) -> (InspectorSessionProxy, InspectorSessionProxy) {
86  let (worker_to_main_tx, worker_to_main_rx) =
87    mpsc::unbounded::<InspectorMsg>();
88  let (main_to_worker_tx, main_to_worker_rx) = mpsc::unbounded::<String>();
89
90  let main_side = InspectorSessionProxy {
91    channels: InspectorSessionChannels::Worker {
92      main_to_worker_tx,
93      worker_to_main_rx,
94      worker_url,
95    },
96    kind: InspectorSessionKind::NonBlocking {
97      wait_for_disconnect: false,
98    },
99  };
100
101  let worker_side = InspectorSessionProxy {
102    channels: InspectorSessionChannels::Regular {
103      tx: worker_to_main_tx,
104      rx: main_to_worker_rx,
105    },
106    kind: InspectorSessionKind::NonBlocking {
107      wait_for_disconnect: false,
108    },
109  };
110
111  (main_side, worker_side)
112}
113
114pub type InspectorSessionSend = Box<dyn Fn(InspectorMsg)>;
115
116#[derive(Clone, Copy, Debug)]
117enum PollState {
118  Idle,
119  Woken,
120  Polling,
121  Parked,
122  Dropped,
123}
124
125/// This structure is used responsible for providing inspector interface
126/// to the `JsRuntime`.
127///
128/// It stores an instance of `v8::inspector::V8Inspector` and additionally
129/// implements `v8::inspector::V8InspectorClientImpl`.
130///
131/// After creating this structure it's possible to connect multiple sessions
132/// to the inspector, in case of Deno it's either: a "websocket session" that
133/// provides integration with Chrome Devtools, or an "in-memory session" that
134/// is used for REPL or coverage collection.
135pub struct JsRuntimeInspector {
136  v8_inspector: Rc<v8::inspector::V8Inspector>,
137  new_session_tx: UnboundedSender<InspectorSessionProxy>,
138  deregister_tx: RefCell<Option<oneshot::Sender<()>>>,
139  state: Rc<JsRuntimeInspectorState>,
140}
141
142impl Drop for JsRuntimeInspector {
143  fn drop(&mut self) {
144    // Since the waker is cloneable, it might outlive the inspector itself.
145    // Set the poll state to 'dropped' so it doesn't attempt to request an
146    // interrupt from the isolate.
147    self
148      .state
149      .waker
150      .update(|w| w.poll_state = PollState::Dropped);
151
152    // V8 automatically deletes all sessions when an `V8Inspector` instance is
153    // deleted, however InspectorSession also has a drop handler that cleans
154    // up after itself. To avoid a double free, make sure the inspector is
155    // dropped last.
156    self.state.sessions.borrow_mut().drop_sessions();
157
158    // Notify counterparty that this instance is being destroyed. Ignoring
159    // result because counterparty waiting for the signal might have already
160    // dropped the other end of channel.
161    if let Some(deregister_tx) = self.deregister_tx.borrow_mut().take() {
162      let _ = deregister_tx.send(());
163    }
164  }
165}
166
167#[derive(Clone)]
168struct JsRuntimeInspectorState {
169  isolate_ptr: v8::UnsafeRawIsolatePtr,
170  context: v8::Global<v8::Context>,
171  flags: Rc<RefCell<InspectorFlags>>,
172  waker: Arc<InspectorWaker>,
173  sessions: Rc<RefCell<SessionContainer>>,
174  is_dispatching_message: Rc<RefCell<bool>>,
175  pending_worker_messages: Arc<Mutex<Vec<(String, String)>>>,
176  nodeworker_enabled: Rc<Cell<bool>>,
177  auto_attach_enabled: Rc<Cell<bool>>,
178  discover_targets_enabled: Rc<Cell<bool>>,
179}
180
181struct JsRuntimeInspectorClient(Rc<JsRuntimeInspectorState>);
182
183impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspectorClient {
184  fn run_message_loop_on_pause(&self, context_group_id: i32) {
185    assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
186    self.0.flags.borrow_mut().on_pause = true;
187    let _ = self.0.poll_sessions(None);
188  }
189
190  fn quit_message_loop_on_pause(&self) {
191    self.0.flags.borrow_mut().on_pause = false;
192  }
193
194  fn run_if_waiting_for_debugger(&self, context_group_id: i32) {
195    assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
196    self.0.flags.borrow_mut().waiting_for_session = false;
197  }
198
199  fn ensure_default_context_in_group(
200    &self,
201    context_group_id: i32,
202  ) -> Option<v8::Local<'_, v8::Context>> {
203    assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
204    let context = self.0.context.clone();
205    let mut isolate =
206      unsafe { v8::Isolate::from_raw_isolate_ptr(self.0.isolate_ptr) };
207    let isolate = &mut isolate;
208    v8::callback_scope!(unsafe scope, isolate);
209    let local = v8::Local::new(scope, context);
210    Some(unsafe { local.extend_lifetime_unchecked() })
211  }
212
213  fn resource_name_to_url(
214    &self,
215    resource_name: &v8::inspector::StringView,
216  ) -> Option<v8::UniquePtr<v8::inspector::StringBuffer>> {
217    let resource_name = resource_name.to_string();
218    let url = url::Url::from_file_path(resource_name).ok()?;
219    let src_view = v8::inspector::StringView::from(url.as_str().as_bytes());
220    Some(v8::inspector::StringBuffer::create(src_view))
221  }
222}
223
224impl JsRuntimeInspectorState {
225  #[allow(clippy::result_unit_err)]
226  pub fn poll_sessions(
227    &self,
228    mut invoker_cx: Option<&mut Context>,
229  ) -> Result<Poll<()>, ()> {
230    // The futures this function uses do not have re-entrant poll() functions.
231    // However it is can happen that poll_sessions() gets re-entered, e.g.
232    // when an interrupt request is honored while the inspector future is polled
233    // by the task executor. We let the caller know by returning some error.
234    let Ok(mut sessions) = self.sessions.try_borrow_mut() else {
235      return Err(());
236    };
237
238    self.waker.update(|w| {
239      match w.poll_state {
240        PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling,
241        _ => unreachable!(),
242      };
243    });
244
245    // Create a new Context object that will make downstream futures
246    // use the InspectorWaker when they are ready to be polled again.
247    let waker_ref = task::waker_ref(&self.waker);
248    let cx = &mut Context::from_waker(&waker_ref);
249
250    loop {
251      loop {
252        // Do one "handshake" with a newly connected session at a time.
253        if let Some(session) = sessions.handshake.take() {
254          let mut fut =
255            pump_inspector_session_messages(session.clone()).boxed_local();
256          // Only add to established if the future is still pending.
257          // If the channel is already closed (e.g., worker terminated quickly),
258          // the future may complete immediately on first poll. Pushing a
259          // completed future to FuturesUnordered would cause a panic when
260          // polled again.
261          if fut.poll_unpin(cx).is_pending() {
262            sessions.established.push(fut);
263          }
264          let id = sessions.next_local_id;
265          sessions.next_local_id += 1;
266          sessions.local.insert(id, session);
267
268          // Track the first session as the main session for Target events
269          if sessions.main_session_id.is_none() {
270            sessions.main_session_id = Some(id);
271          }
272
273          continue;
274        }
275
276        // Accept new connections.
277        if let Poll::Ready(Some(session_proxy)) =
278          sessions.session_rx.poll_next_unpin(cx)
279        {
280          match session_proxy.channels {
281            InspectorSessionChannels::Worker {
282              main_to_worker_tx,
283              worker_to_main_rx,
284              worker_url,
285            } => {
286              // Get the next local ID for this worker
287              let worker_id = sessions.next_local_id;
288              sessions.next_local_id += 1;
289
290              sessions.register_worker_session(worker_id, worker_url.clone());
291
292              // Register the worker channels
293              sessions.register_worker_channels(
294                worker_id,
295                main_to_worker_tx,
296                worker_to_main_rx,
297              );
298
299              // Notify the main session about worker target creation/attachment.
300              //
301              // discover_targets_enabled: Set to true when the debugger calls
302              // Target.setDiscoverTargets. When enabled, the inspector sends
303              // Target.targetCreated events for new workers.
304              //
305              // auto_attach_enabled: Set to true when the debugger calls
306              // Target.setAutoAttach. When enabled, the inspector automatically
307              // attaches to new workers and sends Target.attachedToTarget events.
308              if let Some(main_id) = sessions.main_session_id
309                && let Some(main_session) = sessions.local.get(&main_id)
310                && let Some(ts) =
311                  sessions.target_sessions.get(&format!("{}", worker_id))
312              {
313                if self.discover_targets_enabled.get() {
314                  (main_session.state.send)(InspectorMsg::notification(
315                    json!({
316                      "method": "Target.targetCreated",
317                      "params": { "targetInfo": ts.target_info(false) }
318                    }),
319                  ));
320                }
321
322                if self.auto_attach_enabled.get() {
323                  ts.attached.set(true);
324                  (main_session.state.send)(InspectorMsg::notification(
325                    json!({
326                      "method": "Target.attachedToTarget",
327                      "params": {
328                        "sessionId": ts.session_id,
329                        "targetInfo": ts.target_info(true),
330                        "waitingForDebugger": false
331                      }
332                    }),
333                  ));
334                }
335              }
336
337              continue;
338            }
339            InspectorSessionChannels::Regular { tx, rx } => {
340              // Normal session (not a worker)
341              let session = InspectorSession::new(
342                sessions.v8_inspector.as_ref().unwrap().clone(),
343                self.is_dispatching_message.clone(),
344                Box::new(move |msg| {
345                  let _ = tx.unbounded_send(msg);
346                }),
347                Some(rx),
348                session_proxy.kind,
349                self.sessions.clone(),
350                self.pending_worker_messages.clone(),
351                self.nodeworker_enabled.clone(),
352                self.auto_attach_enabled.clone(),
353                self.discover_targets_enabled.clone(),
354              );
355
356              let prev = sessions.handshake.replace(session);
357              assert!(prev.is_none());
358              continue;
359            }
360          }
361        }
362
363        // Poll worker message channels - forward messages from workers to main session
364        if let Some(main_id) = sessions.main_session_id {
365          // Get main session send function before mutably iterating over target_sessions
366          let main_session_send =
367            sessions.local.get(&main_id).map(|s| s.state.send.clone());
368
369          if let Some(send) = main_session_send {
370            let mut has_worker_message = false;
371            let mut terminated_workers = Vec::new();
372
373            for target_session in sessions.target_sessions.values() {
374              // Skip sessions that haven't had their channels registered
375              if !target_session.has_channels() {
376                continue;
377              }
378              match target_session.poll_from_worker(cx) {
379                Poll::Ready(Some(msg)) => {
380                  // CDP Flattened Session Mode: Add sessionId at top level
381                  // Used by Chrome DevTools (when NodeWorker.enable has NOT been called)
382                  // VSCode uses NodeWorker.receivedMessageFromWorker instead
383                  if !self.nodeworker_enabled.get() {
384                    if let Ok(mut parsed) =
385                      serde_json::from_str::<serde_json::Value>(&msg.content)
386                    {
387                      if let Some(obj) = parsed.as_object_mut() {
388                        obj.insert(
389                          "sessionId".to_string(),
390                          json!(target_session.session_id),
391                        );
392                        let flattened_msg = parsed.to_string();
393
394                        // Send the flattened response with sessionId at top level
395                        send(InspectorMsg {
396                          kind: msg.kind,
397                          content: flattened_msg,
398                        });
399                      }
400                    } else {
401                      // Fallback: send as-is if not valid JSON
402                      send(msg);
403                    }
404                  } else {
405                    let wrapped_nodeworker = json!({
406                      "method": "NodeWorker.receivedMessageFromWorker",
407                      "params": {
408                        "sessionId": target_session.session_id,
409                        "message": msg.content,
410                        "workerId": target_session.target_id
411                      }
412                    });
413
414                    send(InspectorMsg {
415                      kind: InspectorMsgKind::Notification,
416                      content: wrapped_nodeworker.to_string(),
417                    });
418                  }
419
420                  has_worker_message = true;
421                }
422                Poll::Ready(None) => {
423                  // Worker channel closed - worker has terminated
424                  // Notify debugger based on which protocol is in use
425                  if self.nodeworker_enabled.get() {
426                    // VSCode / Node.js style
427                    send(InspectorMsg::notification(json!({
428                      "method": "NodeWorker.detachedFromWorker",
429                      "params": {
430                        "sessionId": target_session.session_id
431                      }
432                    })));
433                  } else if self.auto_attach_enabled.get()
434                    || self.discover_targets_enabled.get()
435                  {
436                    // Chrome DevTools style
437                    send(InspectorMsg::notification(json!({
438                      "method": "Target.targetDestroyed",
439                      "params": {
440                        "targetId": target_session.target_id
441                      }
442                    })));
443                  }
444                  terminated_workers.push(target_session.session_id.clone());
445                }
446                Poll::Pending => {}
447              }
448            }
449
450            // Clean up terminated worker sessions
451            for session_id in terminated_workers {
452              sessions.target_sessions.remove(&session_id);
453              has_worker_message = true; // Trigger re-poll
454            }
455
456            if has_worker_message {
457              continue;
458            }
459          }
460        }
461
462        // Poll established sessions.
463        match sessions.established.poll_next_unpin(cx) {
464          Poll::Ready(Some(())) => {
465            continue;
466          }
467          Poll::Ready(None) => {
468            break;
469          }
470          Poll::Pending => {
471            break;
472          }
473        };
474      }
475
476      let should_block = {
477        let flags = self.flags.borrow();
478        flags.on_pause || flags.waiting_for_session
479      };
480
481      // Process any queued NodeWorker messages after polling completes
482      // Drain from the thread-safe Mutex queue (doesn't require borrowing sessions)
483      let pending_messages: Vec<(String, String)> = {
484        let mut queue = self.pending_worker_messages.lock();
485        queue.drain(..).collect()
486      };
487
488      for (session_id, message) in pending_messages {
489        if let Some(target_session) = sessions.target_sessions.get(&session_id)
490        {
491          target_session.send_to_worker(message);
492        }
493      }
494
495      let new_state = self.waker.update(|w| {
496        match w.poll_state {
497          PollState::Woken => {
498            // The inspector was woken while the session handler was being
499            // polled, so we poll it another time.
500            w.poll_state = PollState::Polling;
501          }
502          PollState::Polling if !should_block => {
503            // The session handler doesn't need to be polled any longer, and
504            // there's no reason to block (execution is not paused), so this
505            // function is about to return.
506            w.poll_state = PollState::Idle;
507            // Register the task waker that can be used to wake the parent
508            // task that will poll the inspector future.
509            if let Some(cx) = invoker_cx.take() {
510              w.task_waker.replace(cx.waker().clone());
511            }
512            // Register the address of the inspector, which allows the waker
513            // to request an interrupt from the isolate.
514            w.inspector_state_ptr = NonNull::new(self as *const _ as *mut Self);
515          }
516          PollState::Polling if should_block => {
517            // Isolate execution has been paused but there are no more
518            // events to process, so this thread will be parked. Therefore,
519            // store the current thread handle in the waker so it knows
520            // which thread to unpark when new events arrive.
521            w.poll_state = PollState::Parked;
522            w.parked_thread.replace(thread::current());
523          }
524          _ => unreachable!(),
525        };
526        w.poll_state
527      });
528      match new_state {
529        PollState::Idle => break,            // Yield to task.
530        PollState::Polling => continue,      // Poll the session handler again.
531        PollState::Parked => thread::park(), // Park the thread.
532        _ => unreachable!(),
533      };
534    }
535
536    Ok(Poll::Pending)
537  }
538}
539
540impl JsRuntimeInspector {
541  /// Currently Deno supports only a single context in `JsRuntime`
542  /// and thus it's id is provided as an associated constant.
543  const CONTEXT_GROUP_ID: i32 = 1;
544
545  pub fn new(
546    isolate_ptr: v8::UnsafeRawIsolatePtr,
547    scope: &mut v8::PinScope,
548    context: v8::Local<v8::Context>,
549    is_main_runtime: bool,
550    worker_id: Option<u32>,
551  ) -> Rc<Self> {
552    let (new_session_tx, new_session_rx) =
553      mpsc::unbounded::<InspectorSessionProxy>();
554
555    let waker = InspectorWaker::new(scope.thread_safe_handle());
556    let state = Rc::new(JsRuntimeInspectorState {
557      waker,
558      flags: Default::default(),
559      isolate_ptr,
560      context: v8::Global::new(scope, context),
561      sessions: Rc::new(
562        RefCell::new(SessionContainer::temporary_placeholder()),
563      ),
564      is_dispatching_message: Default::default(),
565      pending_worker_messages: Arc::new(Mutex::new(Vec::new())),
566      nodeworker_enabled: Rc::new(Cell::new(false)),
567      auto_attach_enabled: Rc::new(Cell::new(false)),
568      discover_targets_enabled: Rc::new(Cell::new(false)),
569    });
570    let client = Box::new(JsRuntimeInspectorClient(state.clone()));
571    let v8_inspector_client = v8::inspector::V8InspectorClient::new(client);
572    let v8_inspector = Rc::new(v8::inspector::V8Inspector::create(
573      scope,
574      v8_inspector_client,
575    ));
576
577    *state.sessions.borrow_mut() =
578      SessionContainer::new(v8_inspector.clone(), new_session_rx);
579
580    // Tell the inspector about the main realm.
581    let context_name_bytes = if is_main_runtime {
582      &b"main realm"[..]
583    } else {
584      &format!("worker [{}]", worker_id.unwrap_or(1)).into_bytes()
585    };
586
587    let context_name = v8::inspector::StringView::from(context_name_bytes);
588    // NOTE(bartlomieju): this is what Node.js does and it turns out some
589    // debuggers (like VSCode) rely on this information to disconnect after
590    // program completes.
591    // The auxData structure should match {isDefault: boolean, type: 'default'|'isolated'|'worker'}
592    // For Chrome DevTools to properly show workers in the execution context dropdown.
593    let aux_data = if is_main_runtime {
594      r#"{"isDefault": true, "type": "default"}"#
595    } else {
596      r#"{"isDefault": false, "type": "worker"}"#
597    };
598    let aux_data_view = v8::inspector::StringView::from(aux_data.as_bytes());
599    v8_inspector.context_created(
600      context,
601      Self::CONTEXT_GROUP_ID,
602      context_name,
603      aux_data_view,
604    );
605
606    // Poll the session handler so we will get notified whenever there is
607    // new incoming debugger activity.
608    let _ = state.poll_sessions(None).unwrap();
609
610    Rc::new(Self {
611      v8_inspector,
612      state,
613      new_session_tx,
614      deregister_tx: RefCell::new(None),
615    })
616  }
617
618  pub fn is_dispatching_message(&self) -> bool {
619    *self.state.is_dispatching_message.borrow()
620  }
621
622  pub fn context_destroyed(
623    &self,
624    scope: &mut v8::PinScope<'_, '_>,
625    context: v8::Global<v8::Context>,
626  ) {
627    let context = v8::Local::new(scope, context);
628    self.v8_inspector.context_destroyed(context);
629  }
630
631  pub fn exception_thrown(
632    &self,
633    scope: &mut v8::PinScope<'_, '_>,
634    exception: v8::Local<'_, v8::Value>,
635    in_promise: bool,
636  ) {
637    let context = scope.get_current_context();
638    let message = v8::Exception::create_message(scope, exception);
639    let stack_trace = message.get_stack_trace(scope);
640    let stack_trace = self.v8_inspector.create_stack_trace(stack_trace);
641    self.v8_inspector.exception_thrown(
642      context,
643      if in_promise {
644        v8::inspector::StringView::from("Uncaught (in promise)".as_bytes())
645      } else {
646        v8::inspector::StringView::from("Uncaught".as_bytes())
647      },
648      exception,
649      v8::inspector::StringView::from("".as_bytes()),
650      v8::inspector::StringView::from("".as_bytes()),
651      0,
652      0,
653      stack_trace,
654      0,
655    );
656  }
657
658  pub fn sessions_state(&self) -> SessionsState {
659    self.state.sessions.borrow().sessions_state()
660  }
661
662  pub fn poll_sessions_from_event_loop(&self, cx: &mut Context) {
663    let _ = self.state.poll_sessions(Some(cx)).unwrap();
664  }
665
666  /// This function blocks the thread until at least one inspector client has
667  /// established a websocket connection.
668  pub fn wait_for_session(&self) {
669    loop {
670      if let Some(_session) =
671        self.state.sessions.borrow_mut().local.values().next()
672      {
673        self.state.flags.borrow_mut().waiting_for_session = false;
674        break;
675      } else {
676        self.state.flags.borrow_mut().waiting_for_session = true;
677        let _ = self.state.poll_sessions(None).unwrap();
678      }
679    }
680  }
681
682  /// This function blocks the thread until at least one inspector client has
683  /// established a websocket connection.
684  ///
685  /// After that, it instructs V8 to pause at the next statement.
686  /// Frontend must send "Runtime.runIfWaitingForDebugger" message to resume
687  /// execution.
688  pub fn wait_for_session_and_break_on_next_statement(&self) {
689    loop {
690      if let Some(session) =
691        self.state.sessions.borrow_mut().local.values().next()
692      {
693        break session.break_on_next_statement();
694      } else {
695        self.state.flags.borrow_mut().waiting_for_session = true;
696        let _ = self.state.poll_sessions(None).unwrap();
697      }
698    }
699  }
700
701  /// Obtain a sender for proxy channels.
702  pub fn get_session_sender(&self) -> UnboundedSender<InspectorSessionProxy> {
703    self.new_session_tx.clone()
704  }
705
706  /// Create a channel that notifies the frontend when inspector is dropped.
707  ///
708  /// NOTE: Only a single handler is currently available.
709  pub fn add_deregister_handler(&self) -> oneshot::Receiver<()> {
710    let maybe_deregister_tx = self.deregister_tx.borrow_mut().take();
711    if let Some(deregister_tx) = maybe_deregister_tx
712      && !deregister_tx.is_canceled()
713    {
714      panic!("Inspector deregister handler already exists and is alive.");
715    }
716    let (tx, rx) = oneshot::channel::<()>();
717    self.deregister_tx.borrow_mut().replace(tx);
718    rx
719  }
720
721  pub fn create_local_session(
722    inspector: Rc<JsRuntimeInspector>,
723    callback: InspectorSessionSend,
724    kind: InspectorSessionKind,
725  ) -> LocalInspectorSession {
726    let (session_id, sessions) = {
727      let sessions = inspector.state.sessions.clone();
728
729      let inspector_session = InspectorSession::new(
730        inspector.v8_inspector.clone(),
731        inspector.state.is_dispatching_message.clone(),
732        callback,
733        None,
734        kind,
735        sessions.clone(),
736        inspector.state.pending_worker_messages.clone(),
737        inspector.state.nodeworker_enabled.clone(),
738        inspector.state.auto_attach_enabled.clone(),
739        inspector.state.discover_targets_enabled.clone(),
740      );
741
742      let session_id = {
743        let mut s = sessions.borrow_mut();
744        let id = s.next_local_id;
745        s.next_local_id += 1;
746        assert!(s.local.insert(id, inspector_session).is_none());
747        id
748      };
749
750      take(&mut inspector.state.flags.borrow_mut().waiting_for_session);
751      (session_id, sessions)
752    };
753
754    LocalInspectorSession::new(session_id, sessions)
755  }
756}
757
758#[derive(Default)]
759struct InspectorFlags {
760  waiting_for_session: bool,
761  on_pause: bool,
762}
763
764#[derive(Debug)]
765pub struct SessionsState {
766  pub has_active: bool,
767  pub has_blocking: bool,
768  pub has_nonblocking: bool,
769  pub has_nonblocking_wait_for_disconnect: bool,
770}
771
772/// A helper structure that helps coordinate sessions during different
773/// parts of their lifecycle.
774pub struct SessionContainer {
775  v8_inspector: Option<Rc<v8::inspector::V8Inspector>>,
776  session_rx: UnboundedReceiver<InspectorSessionProxy>,
777  handshake: Option<Rc<InspectorSession>>,
778  established: FuturesUnordered<InspectorSessionPumpMessages>,
779  next_local_id: i32,
780  local: HashMap<i32, Rc<InspectorSession>>,
781
782  target_sessions: HashMap<String, Rc<TargetSession>>, // sessionId -> TargetSession
783  main_session_id: Option<i32>, // The first session that should receive Target events
784  next_worker_id: u32, // Sequential ID for worker display naming (1, 2, 3, ...)
785}
786
787struct MainWorkerChannels {
788  main_to_worker_tx: UnboundedSender<String>,
789  worker_to_main_rx: UnboundedReceiver<InspectorMsg>,
790}
791
792/// Represents a CDP Target session (e.g., a worker)
793struct TargetSession {
794  target_id: String,
795  session_id: String,
796  local_session_id: i32,
797  /// Sequential worker ID for display (1, 2, 3, ...) - independent from session IDs
798  worker_id: u32,
799  main_worker_channels: RefCell<Option<MainWorkerChannels>>,
800  url: String,
801  /// Track if we've already sent attachedToTarget for this session
802  attached: Cell<bool>,
803}
804
805impl TargetSession {
806  /// Get a display title for the worker using Node.js style naming
807  /// e.g., "worker [1]", "worker [2]"
808  fn title(&self) -> String {
809    format!("worker [{}]", self.worker_id)
810  }
811
812  /// Send a message to the worker (main → worker direction)
813  fn send_to_worker(&self, message: String) {
814    if let Some(channels) = self.main_worker_channels.borrow().as_ref() {
815      let _ = channels.main_to_worker_tx.unbounded_send(message);
816    }
817  }
818
819  /// Returns true if worker channels have been registered
820  fn has_channels(&self) -> bool {
821    self.main_worker_channels.borrow().is_some()
822  }
823
824  /// Poll for messages from the worker (worker → main direction).
825  /// Panics if channels have not been registered yet - caller should
826  /// check has_channels() first.
827  fn poll_from_worker(&self, cx: &mut Context) -> Poll<Option<InspectorMsg>> {
828    self
829      .main_worker_channels
830      .borrow_mut()
831      .as_mut()
832      .expect("poll_from_worker called before channels were registered")
833      .worker_to_main_rx
834      .poll_next_unpin(cx)
835  }
836}
837
838impl SessionContainer {
839  fn new(
840    v8_inspector: Rc<v8::inspector::V8Inspector>,
841    new_session_rx: UnboundedReceiver<InspectorSessionProxy>,
842  ) -> Self {
843    Self {
844      v8_inspector: Some(v8_inspector),
845      session_rx: new_session_rx,
846      handshake: None,
847      established: FuturesUnordered::new(),
848      next_local_id: 1,
849      local: HashMap::new(),
850
851      target_sessions: HashMap::new(),
852      main_session_id: None,
853      next_worker_id: 1, // Workers are numbered starting from 1
854    }
855  }
856
857  /// V8 automatically deletes all sessions when an `V8Inspector` instance is
858  /// deleted, however InspectorSession also has a drop handler that cleans
859  /// up after itself. To avoid a double free, we need to manually drop
860  /// all sessions before dropping the inspector instance.
861  fn drop_sessions(&mut self) {
862    self.v8_inspector = Default::default();
863    self.handshake.take();
864    self.established.clear();
865    self.local.clear();
866  }
867
868  fn sessions_state(&self) -> SessionsState {
869    SessionsState {
870      has_active: !self.established.is_empty()
871        || self.handshake.is_some()
872        || !self.local.is_empty(),
873      has_blocking: self
874        .local
875        .values()
876        .any(|s| matches!(s.state.kind, InspectorSessionKind::Blocking)),
877      has_nonblocking: self.local.values().any(|s| {
878        matches!(s.state.kind, InspectorSessionKind::NonBlocking { .. })
879      }),
880      has_nonblocking_wait_for_disconnect: self.local.values().any(|s| {
881        matches!(
882          s.state.kind,
883          InspectorSessionKind::NonBlocking {
884            wait_for_disconnect: true
885          }
886        )
887      }),
888    }
889  }
890
891  /// A temporary placeholder that should be used before actual
892  /// instance of V8Inspector is created. It's used in favor
893  /// of `Default` implementation to signal that it's not meant
894  /// for actual use.
895  fn temporary_placeholder() -> Self {
896    let (_tx, rx) = mpsc::unbounded::<InspectorSessionProxy>();
897    Self {
898      v8_inspector: Default::default(),
899      session_rx: rx,
900      handshake: None,
901      established: FuturesUnordered::new(),
902      next_local_id: 1,
903      local: HashMap::new(),
904
905      target_sessions: HashMap::new(),
906      main_session_id: None,
907      next_worker_id: 1,
908    }
909  }
910
911  pub fn dispatch_message_from_frontend(
912    &mut self,
913    session_id: i32,
914    message: String,
915  ) {
916    let session = self.local.get(&session_id).unwrap();
917    session.dispatch_message(message);
918  }
919
920  /// Register a worker session and return the assigned worker ID
921  fn register_worker_session(
922    &mut self,
923    local_session_id: i32,
924    worker_url: String,
925  ) -> u32 {
926    // Assign a sequential worker ID for display purposes
927    let worker_id = self.next_worker_id;
928    self.next_worker_id += 1;
929
930    // Use the local_session_id for internal session routing
931    let target_id = format!("{}", local_session_id);
932    let session_id = format!("{}", local_session_id);
933
934    let target_session = Rc::new(TargetSession {
935      target_id: target_id.clone(),
936      session_id: session_id.clone(),
937      local_session_id,
938      worker_id,
939      main_worker_channels: RefCell::new(None),
940      url: worker_url.clone(),
941      attached: Cell::new(false),
942    });
943    self
944      .target_sessions
945      .insert(session_id.clone(), target_session.clone());
946
947    worker_id
948  }
949
950  /// Register the communication channels for a worker's V8 inspector
951  /// This is called from the worker side to establish bidirectional communication
952  pub fn register_worker_channels(
953    &mut self,
954    local_session_id: i32,
955    main_to_worker_tx: UnboundedSender<String>,
956    worker_to_main_rx: UnboundedReceiver<InspectorMsg>,
957  ) -> bool {
958    // Find the target session for this local session ID
959    for target_session in self.target_sessions.values() {
960      if target_session.local_session_id == local_session_id {
961        *target_session.main_worker_channels.borrow_mut() =
962          Some(MainWorkerChannels {
963            main_to_worker_tx,
964            worker_to_main_rx,
965          });
966        return true;
967      }
968    }
969    false
970  }
971}
972
973struct InspectorWakerInner {
974  poll_state: PollState,
975  task_waker: Option<task::Waker>,
976  parked_thread: Option<thread::Thread>,
977  inspector_state_ptr: Option<NonNull<JsRuntimeInspectorState>>,
978  isolate_handle: v8::IsolateHandle,
979}
980
981// SAFETY: unsafe trait must have unsafe implementation
982unsafe impl Send for InspectorWakerInner {}
983
984struct InspectorWaker(Mutex<InspectorWakerInner>);
985
986impl InspectorWaker {
987  fn new(isolate_handle: v8::IsolateHandle) -> Arc<Self> {
988    let inner = InspectorWakerInner {
989      poll_state: PollState::Idle,
990      task_waker: None,
991      parked_thread: None,
992      inspector_state_ptr: None,
993      isolate_handle,
994    };
995    Arc::new(Self(Mutex::new(inner)))
996  }
997
998  fn update<F, R>(&self, update_fn: F) -> R
999  where
1000    F: FnOnce(&mut InspectorWakerInner) -> R,
1001  {
1002    let mut g = self.0.lock();
1003    update_fn(&mut g)
1004  }
1005}
1006
1007impl task::ArcWake for InspectorWaker {
1008  fn wake_by_ref(arc_self: &Arc<Self>) {
1009    arc_self.update(|w| {
1010      match w.poll_state {
1011        PollState::Idle => {
1012          // Wake the task, if any, that has polled the Inspector future last.
1013          if let Some(waker) = w.task_waker.take() {
1014            waker.wake()
1015          }
1016          // Request an interrupt from the isolate if it's running and there's
1017          // not unhandled interrupt request in flight.
1018          if let Some(arg) = w
1019            .inspector_state_ptr
1020            .take()
1021            .map(|ptr| ptr.as_ptr() as *mut c_void)
1022          {
1023            w.isolate_handle.request_interrupt(handle_interrupt, arg);
1024          }
1025          unsafe extern "C" fn handle_interrupt(
1026            _isolate: v8::UnsafeRawIsolatePtr,
1027            arg: *mut c_void,
1028          ) {
1029            // SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the
1030            // pointer to the latter is valid as long as waker is alive.
1031            let inspector_state =
1032              unsafe { &*(arg as *mut JsRuntimeInspectorState) };
1033            let _ = inspector_state.poll_sessions(None);
1034          }
1035        }
1036        PollState::Parked => {
1037          // Unpark the isolate thread.
1038          let parked_thread = w.parked_thread.take().unwrap();
1039          assert_ne!(parked_thread.id(), thread::current().id());
1040          parked_thread.unpark();
1041        }
1042        _ => {}
1043      };
1044      w.poll_state = PollState::Woken;
1045    });
1046  }
1047}
1048
1049#[derive(Clone, Copy, Debug)]
1050pub enum InspectorSessionKind {
1051  Blocking,
1052  NonBlocking { wait_for_disconnect: bool },
1053}
1054
1055#[derive(Clone)]
1056struct InspectorSessionState {
1057  is_dispatching_message: Rc<RefCell<bool>>,
1058  send: Rc<InspectorSessionSend>,
1059  rx: Rc<RefCell<Option<SessionProxyReceiver>>>,
1060  // Describes if session should keep event loop alive, eg. a local REPL
1061  // session should keep event loop alive, but a Websocket session shouldn't.
1062  kind: InspectorSessionKind,
1063  sessions: Rc<RefCell<SessionContainer>>,
1064  // Thread-safe queue for NodeWorker messages that need to be sent to workers
1065  pending_worker_messages: Arc<Mutex<Vec<(String, String)>>>,
1066  // Track whether NodeWorker.enable has been called (enables VSCode-style worker debugging)
1067  nodeworker_enabled: Rc<Cell<bool>>,
1068  // Track whether Target.setAutoAttach has been called (enables worker auto-attach)
1069  auto_attach_enabled: Rc<Cell<bool>>,
1070  // Track whether Target.setDiscoverTargets has been called (enables target discovery)
1071  discover_targets_enabled: Rc<Cell<bool>>,
1072}
1073
1074/// An inspector session that proxies messages to concrete "transport layer",
1075/// eg. Websocket or another set of channels.
1076struct InspectorSession {
1077  v8_session: v8::inspector::V8InspectorSession,
1078  state: InspectorSessionState,
1079}
1080
1081impl InspectorSession {
1082  const CONTEXT_GROUP_ID: i32 = 1;
1083
1084  #[allow(clippy::too_many_arguments)]
1085  pub fn new(
1086    v8_inspector: Rc<v8::inspector::V8Inspector>,
1087    is_dispatching_message: Rc<RefCell<bool>>,
1088    send: InspectorSessionSend,
1089    rx: Option<SessionProxyReceiver>,
1090    kind: InspectorSessionKind,
1091    sessions: Rc<RefCell<SessionContainer>>,
1092    pending_worker_messages: Arc<Mutex<Vec<(String, String)>>>,
1093    nodeworker_enabled: Rc<Cell<bool>>,
1094    auto_attach_enabled: Rc<Cell<bool>>,
1095    discover_targets_enabled: Rc<Cell<bool>>,
1096  ) -> Rc<Self> {
1097    let state = InspectorSessionState {
1098      is_dispatching_message,
1099      send: Rc::new(send),
1100      rx: Rc::new(RefCell::new(rx)),
1101      kind,
1102      sessions,
1103      pending_worker_messages,
1104      nodeworker_enabled,
1105      auto_attach_enabled,
1106      discover_targets_enabled,
1107    };
1108
1109    let v8_session = v8_inspector.connect(
1110      Self::CONTEXT_GROUP_ID,
1111      v8::inspector::Channel::new(Box::new(state.clone())),
1112      v8::inspector::StringView::empty(),
1113      v8::inspector::V8InspectorClientTrustLevel::FullyTrusted,
1114    );
1115
1116    Rc::new(Self { v8_session, state })
1117  }
1118
1119  // Dispatch message to V8 session
1120  fn dispatch_message(&self, msg: String) {
1121    *self.state.is_dispatching_message.borrow_mut() = true;
1122    let msg = v8::inspector::StringView::from(msg.as_bytes());
1123    self.v8_session.dispatch_protocol_message(msg);
1124    *self.state.is_dispatching_message.borrow_mut() = false;
1125  }
1126
1127  pub fn break_on_next_statement(&self) {
1128    let reason = v8::inspector::StringView::from(&b"debugCommand"[..]);
1129    let detail = v8::inspector::StringView::empty();
1130    self
1131      .v8_session
1132      .schedule_pause_on_next_statement(reason, detail);
1133  }
1134
1135  /// Queue a message to be sent to a worker
1136  fn queue_worker_message(&self, session_id: &str, message: String) {
1137    self
1138      .state
1139      .pending_worker_messages
1140      .lock()
1141      .push((session_id.to_string(), message));
1142  }
1143
1144  /// Notify all registered workers via a callback
1145  fn notify_workers<F>(&self, mut f: F)
1146  where
1147    F: FnMut(&TargetSession, &dyn Fn(InspectorMsg)) + 'static,
1148  {
1149    let sessions = self.state.sessions.clone();
1150    let send = self.state.send.clone();
1151    deno_core::unsync::spawn(async move {
1152      let sessions = sessions.borrow();
1153      for ts in sessions.target_sessions.values() {
1154        f(ts, &|msg| send(msg));
1155      }
1156    });
1157  }
1158}
1159
1160impl InspectorSessionState {
1161  fn send_message(
1162    &self,
1163    msg_kind: InspectorMsgKind,
1164    msg: v8::UniquePtr<v8::inspector::StringBuffer>,
1165  ) {
1166    let msg = msg.unwrap().string().to_string();
1167    (self.send)(InspectorMsg {
1168      kind: msg_kind,
1169      content: msg,
1170    });
1171  }
1172}
1173
1174impl v8::inspector::ChannelImpl for InspectorSessionState {
1175  fn send_response(
1176    &self,
1177    call_id: i32,
1178    message: v8::UniquePtr<v8::inspector::StringBuffer>,
1179  ) {
1180    self.send_message(InspectorMsgKind::Message(call_id), message);
1181  }
1182
1183  fn send_notification(
1184    &self,
1185    message: v8::UniquePtr<v8::inspector::StringBuffer>,
1186  ) {
1187    self.send_message(InspectorMsgKind::Notification, message);
1188  }
1189
1190  fn flush_protocol_notifications(&self) {}
1191}
1192type InspectorSessionPumpMessages = Pin<Box<dyn Future<Output = ()>>>;
1193/// Helper to extract a string param from CDP params
1194fn get_str_param(params: &Option<serde_json::Value>, key: &str) -> String {
1195  params
1196    .as_ref()
1197    .and_then(|p| p.get(key))
1198    .and_then(|v| v.as_str())
1199    .unwrap_or_default()
1200    .to_owned()
1201}
1202
1203/// Helper to extract a bool param from CDP params
1204fn get_bool_param(params: &Option<serde_json::Value>, key: &str) -> bool {
1205  params
1206    .as_ref()
1207    .and_then(|p| p.get(key))
1208    .and_then(|v| v.as_bool())
1209    .unwrap_or(false)
1210}
1211
1212impl TargetSession {
1213  /// Build target info JSON for CDP events
1214  fn target_info(&self, attached: bool) -> serde_json::Value {
1215    json!({
1216      "targetId": self.target_id,
1217      "type": "node_worker",
1218      "title": self.title(),
1219      "url": self.url,
1220      "attached": attached,
1221      "canAccessOpener": true
1222    })
1223  }
1224
1225  /// Build worker info JSON for NodeWorker events
1226  fn worker_info(&self) -> serde_json::Value {
1227    json!({
1228      "workerId": self.target_id,
1229      "type": "node_worker",
1230      "title": self.title(),
1231      "url": self.url
1232    })
1233  }
1234}
1235
1236async fn pump_inspector_session_messages(session: Rc<InspectorSession>) {
1237  let mut rx = session.state.rx.borrow_mut().take().unwrap();
1238
1239  while let Some(msg) = rx.next().await {
1240    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&msg) else {
1241      session.dispatch_message(msg);
1242      continue;
1243    };
1244
1245    // CDP Flattened Session Mode: route messages with top-level sessionId to workers
1246    if let Some(session_id) = parsed.get("sessionId").and_then(|s| s.as_str()) {
1247      let mut worker_msg = parsed.clone();
1248      if let Some(obj) = worker_msg.as_object_mut() {
1249        obj.remove("sessionId");
1250        session.queue_worker_message(session_id, worker_msg.to_string());
1251      }
1252      continue;
1253    }
1254
1255    let Some(method) = parsed.get("method").and_then(|m| m.as_str()) else {
1256      session.dispatch_message(msg);
1257      continue;
1258    };
1259
1260    let params = parsed.get("params").cloned();
1261    let msg_id = parsed.get("id").cloned();
1262
1263    match method {
1264      "NodeWorker.enable" => {
1265        session.state.nodeworker_enabled.set(true);
1266        session.notify_workers(|ts, send| {
1267          send(InspectorMsg::notification(json!({
1268            "method": "NodeWorker.attachedToWorker",
1269            "params": {
1270              "sessionId": ts.session_id,
1271              "workerInfo": ts.worker_info(),
1272              "waitingForDebugger": false
1273            }
1274          })));
1275        });
1276      }
1277      "NodeWorker.sendMessageToWorker" | "Target.sendMessageToTarget" => {
1278        session.queue_worker_message(
1279          &get_str_param(&params, "sessionId"),
1280          get_str_param(&params, "message"),
1281        );
1282      }
1283      "Target.setDiscoverTargets" => {
1284        let discover = get_bool_param(&params, "discover");
1285        session.state.discover_targets_enabled.set(discover);
1286
1287        if discover {
1288          session.notify_workers(|ts, send| {
1289            send(InspectorMsg::notification(json!({
1290              "method": "Target.targetCreated",
1291              "params": { "targetInfo": ts.target_info(false) }
1292            })));
1293          });
1294        }
1295      }
1296      "Target.setAutoAttach" => {
1297        let auto_attach = get_bool_param(&params, "autoAttach");
1298        let send = session.state.send.clone();
1299        let sessions = session.state.sessions.clone();
1300        session.state.auto_attach_enabled.set(auto_attach);
1301        if auto_attach {
1302          deno_core::unsync::spawn(async move {
1303            let sessions = sessions.borrow();
1304            for ts in sessions.target_sessions.values() {
1305              if ts.attached.replace(true) {
1306                continue; // Skip if already attached
1307              }
1308              send(InspectorMsg::notification(json!({
1309                "method": "Target.attachedToTarget",
1310                "params": {
1311                  "sessionId": ts.session_id,
1312                  "targetInfo": ts.target_info(true),
1313                  "waitingForDebugger": false
1314                }
1315              })));
1316            }
1317          });
1318        }
1319      }
1320      _ => {
1321        session.dispatch_message(msg);
1322        continue;
1323      }
1324    }
1325
1326    // Send response after handling the command
1327    if let Some(id) = msg_id {
1328      let call_id = id.as_i64().unwrap_or(0) as i32;
1329      (session.state.send)(InspectorMsg {
1330        kind: InspectorMsgKind::Message(call_id),
1331        content: json!({
1332          "id": id,
1333          "result": {}
1334        })
1335        .to_string(),
1336      });
1337    }
1338  }
1339}
1340
1341/// A local inspector session that can be used to send and receive protocol messages directly on
1342/// the same thread as an isolate.
1343///
1344/// Does not provide any abstraction over CDP messages.
1345pub struct LocalInspectorSession {
1346  sessions: Rc<RefCell<SessionContainer>>,
1347  session_id: i32,
1348}
1349
1350impl LocalInspectorSession {
1351  pub fn new(session_id: i32, sessions: Rc<RefCell<SessionContainer>>) -> Self {
1352    Self {
1353      sessions,
1354      session_id,
1355    }
1356  }
1357
1358  pub fn dispatch(&mut self, msg: String) {
1359    self
1360      .sessions
1361      .borrow_mut()
1362      .dispatch_message_from_frontend(self.session_id, msg);
1363  }
1364
1365  pub fn post_message<T: serde::Serialize>(
1366    &mut self,
1367    id: i32,
1368    method: &str,
1369    params: Option<T>,
1370  ) {
1371    let message = json!({
1372        "id": id,
1373        "method": method,
1374        "params": params,
1375    });
1376
1377    let stringified_msg = serde_json::to_string(&message).unwrap();
1378    self.dispatch(stringified_msg);
1379  }
1380}