1use std::cell::Cell;
8use std::cell::RefCell;
9use std::collections::HashMap;
10use std::ffi::c_void;
11use std::mem::take;
12use std::pin::Pin;
13use std::ptr::NonNull;
14use std::rc::Rc;
15use std::sync::Arc;
16use std::task::Context;
17use std::task::Poll;
18use std::thread;
19
20use parking_lot::Mutex;
21
22use crate::futures::channel::mpsc;
23use crate::futures::channel::mpsc::UnboundedReceiver;
24use crate::futures::channel::mpsc::UnboundedSender;
25use crate::futures::channel::oneshot;
26use crate::futures::prelude::*;
27use crate::futures::stream::FuturesUnordered;
28use crate::futures::stream::StreamExt;
29use crate::futures::task;
30use crate::serde_json::json;
31
/// Classifies a message emitted by an inspector session.
#[derive(Debug)]
pub enum InspectorMsgKind {
  /// A message that is not a reply to a specific call.
  Notification,
  /// A reply message; the payload is the call id it answers.
  Message(i32),
}
37
/// A message emitted by an inspector session, tagged with its kind.
#[derive(Debug)]
pub struct InspectorMsg {
  /// Whether this is a notification or a reply to a call.
  pub kind: InspectorMsgKind,
  /// The message payload as text.
  pub content: String,
}
43
44impl InspectorMsg {
45 pub fn notification(content: serde_json::Value) -> Self {
47 Self {
48 kind: InspectorMsgKind::Notification,
49 content: content.to_string(),
50 }
51 }
52}
53
/// Sender half over which a session emits [`InspectorMsg`]s.
pub type SessionProxySender = UnboundedSender<InspectorMsg>;
/// Receiver half from which a session reads incoming messages as strings.
pub type SessionProxyReceiver = UnboundedReceiver<String>;
58
/// The channel endpoints carried by an [`InspectorSessionProxy`].
pub enum InspectorSessionChannels {
  /// Endpoints for a session that is connected to the V8 inspector directly.
  Regular {
    /// Outgoing messages produced by the session.
    tx: SessionProxySender,
    /// Incoming messages to be processed by the session.
    rx: SessionProxyReceiver,
  },
  /// Endpoints held on the main side of a main <-> worker session pair.
  Worker {
    /// Sends raw message strings towards the worker.
    main_to_worker_tx: UnboundedSender<String>,
    /// Receives messages coming back from the worker.
    worker_to_main_rx: UnboundedReceiver<InspectorMsg>,
    /// URL reported for the worker target.
    worker_url: String,
  },
}
76
/// A request to register a session: its channel endpoints plus its kind.
/// Values of this type are sent over the `new_session_tx` channel and
/// consumed in `poll_sessions`.
pub struct InspectorSessionProxy {
  /// Channel endpoints for the session.
  pub channels: InspectorSessionChannels,
  /// Blocking / non-blocking behaviour of the session.
  pub kind: InspectorSessionKind,
}
82
83pub fn create_worker_inspector_session_pair(
85 worker_url: String,
86) -> (InspectorSessionProxy, InspectorSessionProxy) {
87 let (worker_to_main_tx, worker_to_main_rx) =
88 mpsc::unbounded::<InspectorMsg>();
89 let (main_to_worker_tx, main_to_worker_rx) = mpsc::unbounded::<String>();
90
91 let main_side = InspectorSessionProxy {
92 channels: InspectorSessionChannels::Worker {
93 main_to_worker_tx,
94 worker_to_main_rx,
95 worker_url,
96 },
97 kind: InspectorSessionKind::NonBlocking {
98 wait_for_disconnect: false,
99 },
100 };
101
102 let worker_side = InspectorSessionProxy {
103 channels: InspectorSessionChannels::Regular {
104 tx: worker_to_main_tx,
105 rx: main_to_worker_rx,
106 },
107 kind: InspectorSessionKind::NonBlocking {
108 wait_for_disconnect: false,
109 },
110 };
111
112 (main_side, worker_side)
113}
114
/// Callback a session invokes to deliver an outgoing [`InspectorMsg`].
pub type InspectorSessionSend = Box<dyn Fn(InspectorMsg)>;
116
/// State of the session-polling machinery, stored in `InspectorWakerInner`.
#[derive(Clone, Copy, Debug)]
enum PollState {
  /// Not polling; `poll_sessions` leaves this state when it exits its loop.
  Idle,
  /// Set by `wake_by_ref` after a wake-up was requested.
  Woken,
  /// `poll_sessions` is currently running.
  Polling,
  /// `poll_sessions` has parked the current thread while it must block.
  Parked,
  /// Set when the owning `JsRuntimeInspector` is dropped.
  Dropped,
}
125
/// Owner of the V8 inspector instance and of the state shared with the
/// inspector client and its sessions.
pub struct JsRuntimeInspector {
  /// The V8 inspector created in `new`.
  v8_inspector: Rc<v8::inspector::V8Inspector>,
  /// Sender used to register new sessions; cloned by `get_session_sender`.
  new_session_tx: UnboundedSender<InspectorSessionProxy>,
  /// Fired (if present) when this inspector is dropped.
  deregister_tx: RefCell<Option<oneshot::Sender<()>>>,
  /// State shared with `JsRuntimeInspectorClient`.
  state: Rc<JsRuntimeInspectorState>,
}
142
143impl Drop for JsRuntimeInspector {
144 fn drop(&mut self) {
145 self
149 .state
150 .waker
151 .update(|w| w.poll_state = PollState::Dropped);
152
153 self.state.sessions.borrow_mut().drop_sessions();
158
159 if let Some(deregister_tx) = self.deregister_tx.borrow_mut().take() {
163 let _ = deregister_tx.send(());
164 }
165 }
166}
167
/// State shared between `JsRuntimeInspector`, the V8 inspector client and
/// the sessions it creates.
#[derive(Clone)]
struct JsRuntimeInspectorState {
  /// Raw isolate pointer; used to rebuild an isolate handle in
  /// `ensure_default_context_in_group`.
  isolate_ptr: v8::UnsafeRawIsolatePtr,
  /// Global handle to the context handed back to V8 as the default context.
  context: v8::Global<v8::Context>,
  /// Flags controlling whether `poll_sessions` blocks.
  flags: Rc<RefCell<InspectorFlags>>,
  /// Waker passed to futures polled from `poll_sessions`.
  waker: Arc<InspectorWaker>,
  /// All sessions known to this inspector.
  sessions: Rc<RefCell<SessionContainer>>,
  /// `true` while `InspectorSession::dispatch_message` is running.
  is_dispatching_message: Rc<RefCell<bool>>,
  /// Queue of `(session id, message)` pairs waiting to be forwarded to
  /// workers; drained in `poll_sessions`.
  pending_worker_messages: Arc<Mutex<Vec<(String, String)>>>,
  /// Set by the `NodeWorker.enable` method.
  nodeworker_enabled: Rc<Cell<bool>>,
  /// Set by the `Target.setAutoAttach` method.
  auto_attach_enabled: Rc<Cell<bool>>,
  /// Set by the `Target.setDiscoverTargets` method.
  discover_targets_enabled: Rc<Cell<bool>>,
}
181
/// V8 inspector client; a thin wrapper around the shared inspector state.
struct JsRuntimeInspectorClient(Rc<JsRuntimeInspectorState>);
183
184impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspectorClient {
185 fn run_message_loop_on_pause(&self, context_group_id: i32) {
186 assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
187 self.0.flags.borrow_mut().on_pause = true;
188 let _ = self.0.poll_sessions(None);
189 }
190
191 fn quit_message_loop_on_pause(&self) {
192 self.0.flags.borrow_mut().on_pause = false;
193 }
194
195 fn run_if_waiting_for_debugger(&self, context_group_id: i32) {
196 assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
197 let mut flags = self.0.flags.borrow_mut();
198 flags.waiting_for_session = false;
199 flags.paused_on_start = false;
200 }
201
202 fn ensure_default_context_in_group(
203 &self,
204 context_group_id: i32,
205 ) -> Option<v8::Local<'_, v8::Context>> {
206 assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
207 let context = self.0.context.clone();
208 let mut isolate =
209 unsafe { v8::Isolate::from_raw_isolate_ptr(self.0.isolate_ptr) };
210 let isolate = &mut isolate;
211 v8::callback_scope!(unsafe scope, isolate);
212 let local = v8::Local::new(scope, context);
213 Some(unsafe { local.extend_lifetime_unchecked() })
214 }
215
216 fn resource_name_to_url(
217 &self,
218 resource_name: &v8::inspector::StringView,
219 ) -> Option<v8::UniquePtr<v8::inspector::StringBuffer>> {
220 let resource_name = resource_name.to_string();
221 #[allow(
222 clippy::disallowed_methods,
223 reason = "Url::from_file_path is more efficient when not using the error and this code doesn't need Wasm support"
224 )]
225 let url = url::Url::from_file_path(resource_name).ok()?;
226 let src_view = v8::inspector::StringView::from(url.as_str().as_bytes());
227 Some(v8::inspector::StringBuffer::create(src_view))
228 }
229}
230
impl JsRuntimeInspectorState {
  /// Drives all inspector sessions until nothing more can make progress.
  ///
  /// Each pass of the inner loop:
  /// 1. promotes the session waiting in `handshake` (if any) to an
  ///    established session,
  /// 2. accepts one new session proxy from `session_rx`,
  /// 3. forwards messages arriving from worker targets to the main session,
  /// 4. polls the established session futures and removes finished ones.
  ///
  /// After the inner loop, queued frontend-to-worker messages are flushed and
  /// the poll state is updated. If `on_pause` or `waiting_for_session` is set
  /// the current thread is parked and polling resumes afterwards; otherwise
  /// the function returns.
  ///
  /// `invoker_cx`, when given, supplies the task waker that is stored so that
  /// a later wake-up can notify the invoking task.
  ///
  /// Returns `Err(())` if the session container is already borrowed (i.e. a
  /// re-entrant call), otherwise `Ok(Poll::Pending)`.
  #[allow(clippy::result_unit_err, reason = "error details not needed")]
  pub fn poll_sessions(
    &self,
    mut invoker_cx: Option<&mut Context>,
  ) -> Result<Poll<()>, ()> {
    // Bail out instead of panicking when the container is already borrowed.
    let Ok(mut sessions) = self.sessions.try_borrow_mut() else {
      return Err(());
    };

    // Entering the poll loop is only valid from `Idle` or `Woken`.
    self.waker.update(|w| {
      match w.poll_state {
        PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling,
        _ => unreachable!(),
      };
    });

    // All futures below are polled with the inspector's own waker.
    let waker_ref = task::waker_ref(&self.waker);
    let cx = &mut Context::from_waker(&waker_ref);

    loop {
      loop {
        // Step 1: start pumping messages for a freshly created session.
        if let Some(session) = sessions.handshake.take() {
          let id = sessions.next_local_id;
          sessions.next_local_id += 1;
          let mut fut =
            pump_inspector_session_messages(session.clone(), id).boxed_local();
          // A future that completes on its first poll is not registered.
          if fut.poll_unpin(cx).is_pending() {
            sessions.established.push(fut);
            sessions.local.insert(id, session);

            // The first established session becomes the main session.
            if sessions.main_session_id.is_none() {
              sessions.main_session_id = Some(id);
            }
          }

          continue;
        }

        // Step 2: accept a newly requested session.
        if let Poll::Ready(Some(session_proxy)) =
          sessions.session_rx.poll_next_unpin(cx)
        {
          match session_proxy.channels {
            InspectorSessionChannels::Worker {
              main_to_worker_tx,
              worker_to_main_rx,
              worker_url,
            } => {
              // Worker targets draw their id from the local session counter.
              let worker_id = sessions.next_local_id;
              sessions.next_local_id += 1;

              sessions.register_worker_session(worker_id, worker_url.clone());

              sessions.register_worker_channels(
                worker_id,
                main_to_worker_tx,
                worker_to_main_rx,
              );

              // Announce the new target to the main session, if there is one.
              if let Some(main_id) = sessions.main_session_id
                && let Some(main_session) = sessions.local.get(&main_id)
                && let Some(ts) =
                  sessions.target_sessions.get(&format!("{}", worker_id))
              {
                if self.discover_targets_enabled.get() {
                  (main_session.state.send)(InspectorMsg::notification(
                    json!({
                      "method": "Target.targetCreated",
                      "params": { "targetInfo": ts.target_info(false) }
                    }),
                  ));
                }

                if self.auto_attach_enabled.get() {
                  ts.attached.set(true);
                  (main_session.state.send)(InspectorMsg::notification(
                    json!({
                      "method": "Target.attachedToTarget",
                      "params": {
                        "sessionId": ts.session_id,
                        "targetInfo": ts.target_info(true),
                        "waitingForDebugger": false
                      }
                    }),
                  ));
                }
              }

              continue;
            }
            InspectorSessionChannels::Regular { tx, rx } => {
              let session = InspectorSession::new(
                sessions.v8_inspector.as_ref().unwrap().clone(),
                self.is_dispatching_message.clone(),
                Box::new(move |msg| {
                  let _ = tx.unbounded_send(msg);
                }),
                Some(rx),
                session_proxy.kind,
                self.sessions.clone(),
                self.pending_worker_messages.clone(),
                self.nodeworker_enabled.clone(),
                self.auto_attach_enabled.clone(),
                self.discover_targets_enabled.clone(),
                self.flags.clone(),
              );

              // Picked up by step 1 on the next pass of the inner loop.
              let prev = sessions.handshake.replace(session);
              assert!(prev.is_none());
              continue;
            }
          }
        }

        // Step 3: relay worker messages to the main session.
        if let Some(main_id) = sessions.main_session_id {
          let main_session_send =
            sessions.local.get(&main_id).map(|s| s.state.send.clone());

          if let Some(send) = main_session_send {
            let mut has_worker_message = false;
            let mut terminated_workers = Vec::new();

            for target_session in sessions.target_sessions.values() {
              if !target_session.has_channels() {
                continue;
              }
              match target_session.poll_from_worker(cx) {
                Poll::Ready(Some(msg)) => {
                  if !self.nodeworker_enabled.get() {
                    // Tag the message with the worker's session id before
                    // forwarding it.
                    if let Ok(mut parsed) =
                      serde_json::from_str::<serde_json::Value>(&msg.content)
                    {
                      // Valid JSON that is not an object is not forwarded.
                      if let Some(obj) = parsed.as_object_mut() {
                        obj.insert(
                          "sessionId".to_string(),
                          json!(target_session.session_id),
                        );
                        let flattened_msg = parsed.to_string();

                        send(InspectorMsg {
                          kind: msg.kind,
                          content: flattened_msg,
                        });
                      }
                    } else {
                      // Content that is not valid JSON is forwarded unchanged.
                      send(msg);
                    }
                  } else {
                    // With NodeWorker enabled the message is wrapped in a
                    // `NodeWorker.receivedMessageFromWorker` notification.
                    let wrapped_nodeworker = json!({
                      "method": "NodeWorker.receivedMessageFromWorker",
                      "params": {
                        "sessionId": target_session.session_id,
                        "message": msg.content,
                        "workerId": target_session.target_id
                      }
                    });

                    send(InspectorMsg {
                      kind: InspectorMsgKind::Notification,
                      content: wrapped_nodeworker.to_string(),
                    });
                  }

                  has_worker_message = true;
                }
                Poll::Ready(None) => {
                  // The worker's channel closed: tell the frontend.
                  if self.nodeworker_enabled.get() {
                    send(InspectorMsg::notification(json!({
                      "method": "NodeWorker.detachedFromWorker",
                      "params": {
                        "sessionId": target_session.session_id
                      }
                    })));
                  } else if self.auto_attach_enabled.get()
                    || self.discover_targets_enabled.get()
                  {
                    send(InspectorMsg::notification(json!({
                      "method": "Target.targetDestroyed",
                      "params": {
                        "targetId": target_session.target_id
                      }
                    })));
                  }
                  terminated_workers.push(target_session.session_id.clone());
                }
                Poll::Pending => {}
              }
            }

            // Removal is deferred until the map is no longer being iterated.
            for session_id in terminated_workers {
              sessions.target_sessions.remove(&session_id);
              has_worker_message = true;
            }

            // Something happened; run the inner loop again from the top.
            if has_worker_message {
              continue;
            }
          }
        }

        // Step 4: poll established sessions; a ready item is the id of a
        // session whose message pump has finished.
        match sessions.established.poll_next_unpin(cx) {
          Poll::Ready(Some(completed_id)) => {
            sessions.local.remove(&completed_id);
            // Pick any remaining session as the new main session.
            if sessions.main_session_id == Some(completed_id) {
              sessions.main_session_id = sessions.local.keys().next().copied();
            }
            continue;
          }
          Poll::Ready(None) => {
            break;
          }
          Poll::Pending => {
            break;
          }
        };
      }

      let should_block = {
        let flags = self.flags.borrow();
        flags.on_pause || flags.waiting_for_session
      };

      // Take the queued messages while holding the lock only briefly.
      let pending_messages: Vec<(String, String)> = {
        let mut queue = self.pending_worker_messages.lock();
        queue.drain(..).collect()
      };

      // Messages addressed to an unknown session id are dropped.
      for (session_id, message) in pending_messages {
        if let Some(target_session) = sessions.target_sessions.get(&session_id)
        {
          target_session.send_to_worker(message);
        }
      }

      let new_state = self.waker.update(|w| {
        match w.poll_state {
          PollState::Woken => {
            // A wake-up arrived while polling: go around again.
            w.poll_state = PollState::Polling;
          }
          PollState::Polling if !should_block => {
            w.poll_state = PollState::Idle;
            // Remember the invoker's waker so a later wake can notify it.
            if let Some(cx) = invoker_cx.take() {
              w.task_waker.replace(cx.waker().clone());
            }
            // Stored so the waker can hand this state to a V8 interrupt.
            w.inspector_state_ptr = NonNull::new(self as *const _ as *mut Self);
          }
          PollState::Polling if should_block => {
            w.poll_state = PollState::Parked;
            // Recorded so the waker knows which thread to unpark.
            w.parked_thread.replace(thread::current());
          }
          _ => unreachable!(),
        };
        w.poll_state
      });
      match new_state {
        PollState::Idle => break,
        PollState::Polling => continue,
        PollState::Parked => thread::park(),
        _ => unreachable!(),
      };
    }

    Ok(Poll::Pending)
  }
}
562
563impl JsRuntimeInspector {
564 const CONTEXT_GROUP_ID: i32 = 1;
567
  /// Creates the inspector for the given isolate and context.
  ///
  /// Steps, in order: build the shared state (with a placeholder session
  /// container), create the V8 inspector with a client wrapping that state,
  /// replace the placeholder with the real session container, register the
  /// context with V8, and poll sessions once.
  ///
  /// * `is_main_runtime` selects the context name (`"main realm"` vs.
  ///   `"worker [N]"`) and the aux data JSON passed to V8.
  /// * `worker_id` is used in the worker context name; `1` is used if absent.
  ///
  /// # Panics
  ///
  /// Panics if the initial `poll_sessions` call returns an error.
  pub fn new(
    isolate_ptr: v8::UnsafeRawIsolatePtr,
    scope: &mut v8::PinScope,
    context: v8::Local<v8::Context>,
    is_main_runtime: bool,
    worker_id: Option<u32>,
  ) -> Rc<Self> {
    let (new_session_tx, new_session_rx) =
      mpsc::unbounded::<InspectorSessionProxy>();

    let waker = InspectorWaker::new(scope.thread_safe_handle());
    let state = Rc::new(JsRuntimeInspectorState {
      waker,
      flags: Default::default(),
      isolate_ptr,
      context: v8::Global::new(scope, context),
      // The real container needs the V8 inspector, which does not exist yet.
      sessions: Rc::new(
        RefCell::new(SessionContainer::temporary_placeholder()),
      ),
      is_dispatching_message: Default::default(),
      pending_worker_messages: Arc::new(Mutex::new(Vec::new())),
      nodeworker_enabled: Rc::new(Cell::new(false)),
      auto_attach_enabled: Rc::new(Cell::new(false)),
      discover_targets_enabled: Rc::new(Cell::new(false)),
    });
    let client = Box::new(JsRuntimeInspectorClient(state.clone()));
    let v8_inspector_client = v8::inspector::V8InspectorClient::new(client);
    let v8_inspector = Rc::new(v8::inspector::V8Inspector::create(
      scope,
      v8_inspector_client,
    ));

    // Swap the placeholder for the real container now that V8 is ready.
    *state.sessions.borrow_mut() =
      SessionContainer::new(v8_inspector.clone(), new_session_rx);

    let context_name_bytes = if is_main_runtime {
      &b"main realm"[..]
    } else {
      &format!("worker [{}]", worker_id.unwrap_or(1)).into_bytes()
    };

    let context_name = v8::inspector::StringView::from(context_name_bytes);
    let aux_data = if is_main_runtime {
      r#"{"isDefault": true, "type": "default"}"#
    } else {
      r#"{"isDefault": false, "type": "worker"}"#
    };
    let aux_data_view = v8::inspector::StringView::from(aux_data.as_bytes());
    v8_inspector.context_created(
      context,
      Self::CONTEXT_GROUP_ID,
      context_name,
      aux_data_view,
    );

    // Initial poll; the returned `Poll` value is not needed.
    let _ = state.poll_sessions(None).unwrap();

    Rc::new(Self {
      v8_inspector,
      state,
      new_session_tx,
      deregister_tx: RefCell::new(None),
    })
  }
640
641 pub fn is_dispatching_message(&self) -> bool {
642 *self.state.is_dispatching_message.borrow()
643 }
644
645 pub fn context_destroyed(
646 &self,
647 scope: &mut v8::PinScope<'_, '_>,
648 context: v8::Global<v8::Context>,
649 ) {
650 let context = v8::Local::new(scope, context);
651 self.v8_inspector.context_destroyed(context);
652 }
653
654 pub fn exception_thrown(
655 &self,
656 scope: &mut v8::PinScope<'_, '_>,
657 exception: v8::Local<'_, v8::Value>,
658 in_promise: bool,
659 ) {
660 let context = scope.get_current_context();
661 let message = v8::Exception::create_message(scope, exception);
662 let stack_trace = message.get_stack_trace(scope);
663 let stack_trace = self.v8_inspector.create_stack_trace(stack_trace);
664 self.v8_inspector.exception_thrown(
665 context,
666 if in_promise {
667 v8::inspector::StringView::from("Uncaught (in promise)".as_bytes())
668 } else {
669 v8::inspector::StringView::from("Uncaught".as_bytes())
670 },
671 exception,
672 v8::inspector::StringView::from("".as_bytes()),
673 v8::inspector::StringView::from("".as_bytes()),
674 0,
675 0,
676 stack_trace,
677 0,
678 );
679 }
680
681 pub fn broadcast_context_destroyed(&self) {
687 let sessions = self.state.sessions.borrow();
688 for session in sessions.local.values() {
689 (session.state.send)(InspectorMsg::notification(json!({
690 "method": "Runtime.executionContextDestroyed",
691 "params": { "executionContextId": Self::CONTEXT_GROUP_ID }
692 })));
693 }
694 }
695
696 pub fn wait_for_sessions_disconnect(&self) {
709 loop {
710 {
711 let sessions = self.state.sessions.borrow();
712 if sessions.local.is_empty() && sessions.established.is_empty() {
713 break;
714 }
715 }
716 let _ = self.state.poll_sessions(None);
717 }
718 }
719
720 pub fn sessions_state(&self) -> SessionsState {
721 self.state.sessions.borrow().sessions_state()
722 }
723
724 pub fn poll_sessions_from_event_loop(&self, cx: &mut Context) {
725 let _ = self.state.poll_sessions(Some(cx)).unwrap();
726 }
727
728 pub fn wait_for_session(&self) {
731 loop {
732 if let Some(_session) =
733 self.state.sessions.borrow_mut().local.values().next()
734 {
735 self.state.flags.borrow_mut().waiting_for_session = false;
736 break;
737 } else {
738 self.state.flags.borrow_mut().waiting_for_session = true;
739 let _ = self.state.poll_sessions(None).unwrap();
740 }
741 }
742 }
743
744 pub fn wait_for_session_and_break_on_next_statement(&self) {
751 self.state.flags.borrow_mut().paused_on_start = true;
752 loop {
764 if !self.state.flags.borrow().paused_on_start {
765 break;
766 }
767 self.state.flags.borrow_mut().waiting_for_session = true;
768 let _ = self.state.poll_sessions(None).unwrap();
769 }
770 let sessions = self.state.sessions.borrow();
778 let resumed_by = self.state.flags.borrow().resumed_by_session_id;
779 let session = resumed_by
780 .and_then(|id| sessions.local.get(&id))
781 .or_else(|| sessions.local.values().next());
782 if let Some(session) = session {
783 let reason = v8::inspector::StringView::from(&b"debugCommand"[..]);
784 let detail = v8::inspector::StringView::empty();
785 session
786 .v8_session
787 .schedule_pause_on_next_statement(reason, detail);
788 }
789 self.state.flags.borrow_mut().resumed_by_session_id = None;
791 }
792
793 pub fn get_session_sender(&self) -> UnboundedSender<InspectorSessionProxy> {
795 self.new_session_tx.clone()
796 }
797
798 pub fn add_deregister_handler(&self) -> oneshot::Receiver<()> {
802 let maybe_deregister_tx = self.deregister_tx.borrow_mut().take();
803 if let Some(deregister_tx) = maybe_deregister_tx
804 && !deregister_tx.is_canceled()
805 {
806 panic!("Inspector deregister handler already exists and is alive.");
807 }
808 let (tx, rx) = oneshot::channel::<()>();
809 self.deregister_tx.borrow_mut().replace(tx);
810 rx
811 }
812
813 pub fn create_local_session(
814 inspector: Rc<JsRuntimeInspector>,
815 callback: InspectorSessionSend,
816 kind: InspectorSessionKind,
817 ) -> LocalInspectorSession {
818 let (session_id, sessions) = {
819 let sessions = inspector.state.sessions.clone();
820
821 let inspector_session = InspectorSession::new(
822 inspector.v8_inspector.clone(),
823 inspector.state.is_dispatching_message.clone(),
824 callback,
825 None,
826 kind,
827 sessions.clone(),
828 inspector.state.pending_worker_messages.clone(),
829 inspector.state.nodeworker_enabled.clone(),
830 inspector.state.auto_attach_enabled.clone(),
831 inspector.state.discover_targets_enabled.clone(),
832 inspector.state.flags.clone(),
833 );
834
835 let session_id = {
836 let mut s = sessions.borrow_mut();
837 let id = s.next_local_id;
838 s.next_local_id += 1;
839 assert!(s.local.insert(id, inspector_session).is_none());
840 id
841 };
842
843 take(&mut inspector.state.flags.borrow_mut().waiting_for_session);
844 (session_id, sessions)
845 };
846
847 LocalInspectorSession::new(session_id, sessions)
848 }
849}
850
/// Flags shared between the inspector, its client and its sessions.
#[derive(Default)]
struct InspectorFlags {
  /// While set, `poll_sessions` parks the thread instead of returning.
  waiting_for_session: bool,
  /// Set by `run_message_loop_on_pause`, cleared by
  /// `quit_message_loop_on_pause`; also makes `poll_sessions` block.
  on_pause: bool,
  /// Set by `wait_for_session_and_break_on_next_statement`; cleared when a
  /// session asks to run if waiting for the debugger.
  paused_on_start: bool,
  /// Id of the session that sent `Runtime.runIfWaitingForDebugger`, if any.
  resumed_by_session_id: Option<i32>,
}
863
/// Snapshot of which kinds of sessions exist, as computed by
/// `SessionContainer::sessions_state`.
#[derive(Debug)]
pub struct SessionsState {
  /// Any established, handshaking or local session exists.
  pub has_active: bool,
  /// At least one local session is `Blocking`.
  pub has_blocking: bool,
  /// At least one local session is `NonBlocking`.
  pub has_nonblocking: bool,
  /// At least one local session is `NonBlocking` with
  /// `wait_for_disconnect: true`.
  pub has_nonblocking_wait_for_disconnect: bool,
}
871
/// Holds every session known to one inspector.
pub struct SessionContainer {
  /// The V8 inspector; `None` in the placeholder and after `drop_sessions`.
  v8_inspector: Option<Rc<v8::inspector::V8Inspector>>,
  /// Incoming requests to register new sessions.
  session_rx: UnboundedReceiver<InspectorSessionProxy>,
  /// A session that was created but whose message pump has not started yet.
  handshake: Option<Rc<InspectorSession>>,
  /// Message-pump futures of running sessions; each yields its session id.
  established: FuturesUnordered<InspectorSessionPumpMessages>,
  /// Next id handed out to a local session or a worker target.
  next_local_id: i32,
  /// Sessions by id.
  local: HashMap<i32, Rc<InspectorSession>>,

  /// Worker targets by session id string.
  target_sessions: HashMap<String, Rc<TargetSession>>,
  /// Id of the session that receives worker notifications, if any.
  main_session_id: Option<i32>,
  /// Next number used in worker titles.
  next_worker_id: u32,
}
886
/// Main-side channel endpoints for talking to one worker.
struct MainWorkerChannels {
  /// Sends raw message strings to the worker.
  main_to_worker_tx: UnboundedSender<String>,
  /// Receives messages coming from the worker.
  worker_to_main_rx: UnboundedReceiver<InspectorMsg>,
}
891
/// Bookkeeping for one worker target as seen from the main inspector.
struct TargetSession {
  /// Target id reported to the frontend.
  target_id: String,
  /// Session id reported to the frontend; also the key in `target_sessions`.
  session_id: String,
  /// Numeric id the target was registered under.
  local_session_id: i32,
  /// Number shown in the worker's title.
  worker_id: u32,
  /// Channel endpoints; `None` until `register_worker_channels` sets them.
  main_worker_channels: RefCell<Option<MainWorkerChannels>>,
  /// URL reported for the worker.
  url: String,
  /// Whether an `attachedToTarget` notification has been issued.
  attached: Cell<bool>,
}
904
905impl TargetSession {
906 fn title(&self) -> String {
909 format!("worker [{}]", self.worker_id)
910 }
911
912 fn send_to_worker(&self, message: String) {
914 if let Some(channels) = self.main_worker_channels.borrow().as_ref() {
915 let _ = channels.main_to_worker_tx.unbounded_send(message);
916 }
917 }
918
919 fn has_channels(&self) -> bool {
921 self.main_worker_channels.borrow().is_some()
922 }
923
924 fn poll_from_worker(&self, cx: &mut Context) -> Poll<Option<InspectorMsg>> {
928 self
929 .main_worker_channels
930 .borrow_mut()
931 .as_mut()
932 .expect("poll_from_worker called before channels were registered")
933 .worker_to_main_rx
934 .poll_next_unpin(cx)
935 }
936}
937
938impl SessionContainer {
939 fn new(
940 v8_inspector: Rc<v8::inspector::V8Inspector>,
941 new_session_rx: UnboundedReceiver<InspectorSessionProxy>,
942 ) -> Self {
943 Self {
944 v8_inspector: Some(v8_inspector),
945 session_rx: new_session_rx,
946 handshake: None,
947 established: FuturesUnordered::new(),
948 next_local_id: 1,
949 local: HashMap::new(),
950
951 target_sessions: HashMap::new(),
952 main_session_id: None,
953 next_worker_id: 1, }
955 }
956
957 fn drop_sessions(&mut self) {
962 self.v8_inspector = Default::default();
963 self.handshake.take();
964 self.established.clear();
965 self.local.clear();
966 }
967
968 fn sessions_state(&self) -> SessionsState {
969 SessionsState {
970 has_active: !self.established.is_empty()
971 || self.handshake.is_some()
972 || !self.local.is_empty(),
973 has_blocking: self
974 .local
975 .values()
976 .any(|s| matches!(s.state.kind, InspectorSessionKind::Blocking)),
977 has_nonblocking: self.local.values().any(|s| {
978 matches!(s.state.kind, InspectorSessionKind::NonBlocking { .. })
979 }),
980 has_nonblocking_wait_for_disconnect: self.local.values().any(|s| {
981 matches!(
982 s.state.kind,
983 InspectorSessionKind::NonBlocking {
984 wait_for_disconnect: true
985 }
986 )
987 }),
988 }
989 }
990
991 fn temporary_placeholder() -> Self {
996 let (_tx, rx) = mpsc::unbounded::<InspectorSessionProxy>();
997 Self {
998 v8_inspector: Default::default(),
999 session_rx: rx,
1000 handshake: None,
1001 established: FuturesUnordered::new(),
1002 next_local_id: 1,
1003 local: HashMap::new(),
1004
1005 target_sessions: HashMap::new(),
1006 main_session_id: None,
1007 next_worker_id: 1,
1008 }
1009 }
1010
1011 pub fn dispatch_message_from_frontend(
1012 &mut self,
1013 session_id: i32,
1014 message: String,
1015 ) {
1016 let session = self.local.get(&session_id).unwrap();
1017 session.dispatch_message(message);
1018 }
1019
1020 fn register_worker_session(
1022 &mut self,
1023 local_session_id: i32,
1024 worker_url: String,
1025 ) -> u32 {
1026 let worker_id = self.next_worker_id;
1028 self.next_worker_id += 1;
1029
1030 let target_id = format!("{}", local_session_id);
1032 let session_id = format!("{}", local_session_id);
1033
1034 let target_session = Rc::new(TargetSession {
1035 target_id: target_id.clone(),
1036 session_id: session_id.clone(),
1037 local_session_id,
1038 worker_id,
1039 main_worker_channels: RefCell::new(None),
1040 url: worker_url.clone(),
1041 attached: Cell::new(false),
1042 });
1043 self
1044 .target_sessions
1045 .insert(session_id.clone(), target_session.clone());
1046
1047 worker_id
1048 }
1049
1050 pub fn register_worker_channels(
1053 &mut self,
1054 local_session_id: i32,
1055 main_to_worker_tx: UnboundedSender<String>,
1056 worker_to_main_rx: UnboundedReceiver<InspectorMsg>,
1057 ) -> bool {
1058 for target_session in self.target_sessions.values() {
1060 if target_session.local_session_id == local_session_id {
1061 *target_session.main_worker_channels.borrow_mut() =
1062 Some(MainWorkerChannels {
1063 main_to_worker_tx,
1064 worker_to_main_rx,
1065 });
1066 return true;
1067 }
1068 }
1069 false
1070 }
1071}
1072
/// Mutable state of `InspectorWaker`, guarded by its mutex.
struct InspectorWakerInner {
  /// Current state of the polling state machine.
  poll_state: PollState,
  /// Waker of the task that last invoked `poll_sessions` with a context.
  task_waker: Option<task::Waker>,
  /// Thread parked inside `poll_sessions`, to be unparked on wake.
  parked_thread: Option<thread::Thread>,
  /// Pointer to the inspector state, handed to a V8 interrupt on wake.
  inspector_state_ptr: Option<NonNull<JsRuntimeInspectorState>>,
  /// Handle used to request a V8 interrupt.
  isolate_handle: v8::IsolateHandle,
}
1080
// `NonNull` is not `Send`, so `Send` has to be asserted manually here.
// NOTE(review): soundness presumably rests on `inspector_state_ptr` only
// being dereferenced inside the V8 interrupt handler -- confirm that the
// handler always runs on the isolate's thread.
unsafe impl Send for InspectorWakerInner {}
1083
/// Waker used for all futures polled by `poll_sessions`; its state lives
/// behind a mutex.
struct InspectorWaker(Mutex<InspectorWakerInner>);
1085
1086impl InspectorWaker {
1087 fn new(isolate_handle: v8::IsolateHandle) -> Arc<Self> {
1088 let inner = InspectorWakerInner {
1089 poll_state: PollState::Idle,
1090 task_waker: None,
1091 parked_thread: None,
1092 inspector_state_ptr: None,
1093 isolate_handle,
1094 };
1095 Arc::new(Self(Mutex::new(inner)))
1096 }
1097
1098 fn update<F, R>(&self, update_fn: F) -> R
1099 where
1100 F: FnOnce(&mut InspectorWakerInner) -> R,
1101 {
1102 let mut g = self.0.lock();
1103 update_fn(&mut g)
1104 }
1105}
1106
impl task::ArcWake for InspectorWaker {
  /// Reacts to a wake-up according to the current poll state, then sets the
  /// state to `Woken`.
  ///
  /// * `Idle`: wakes the stored task waker (if any) and, if a state pointer
  ///   is stored, requests a V8 interrupt that calls `poll_sessions`.
  /// * `Parked`: unparks the parked thread.
  /// * Any other state: no extra action.
  fn wake_by_ref(arc_self: &Arc<Self>) {
    arc_self.update(|w| {
      match w.poll_state {
        PollState::Idle => {
          // Notify the task that last polled with a context, once.
          if let Some(waker) = w.task_waker.take() {
            waker.wake()
          }
          // The pointer is consumed so at most one interrupt is requested
          // per stored pointer.
          if let Some(arg) = w
            .inspector_state_ptr
            .take()
            .map(|ptr| ptr.as_ptr() as *mut c_void)
          {
            w.isolate_handle.request_interrupt(handle_interrupt, arg);
          }
          unsafe extern "C" fn handle_interrupt(
            _isolate: v8::UnsafeRawIsolatePtr,
            arg: *mut c_void,
          ) {
            // NOTE(review): assumes the state behind `arg` is still alive
            // when the interrupt fires -- confirm.
            let inspector_state =
              unsafe { &*(arg as *mut JsRuntimeInspectorState) };
            let _ = inspector_state.poll_sessions(None);
          }
        }
        PollState::Parked => {
          let parked_thread = w.parked_thread.take().unwrap();
          // A parked thread cannot be the one delivering the wake-up.
          assert_ne!(parked_thread.id(), thread::current().id());
          parked_thread.unpark();
        }
        _ => {}
      };
      w.poll_state = PollState::Woken;
    });
  }
}
1148
/// How a session relates to program execution.
#[derive(Clone, Copy, Debug)]
pub enum InspectorSessionKind {
  /// A blocking session.
  Blocking,
  /// A non-blocking session.
  NonBlocking {
    /// Reported via `SessionsState::has_nonblocking_wait_for_disconnect`.
    wait_for_disconnect: bool,
  },
}
1154
/// Per-session state; a clone of it is handed to V8 as the session channel.
#[derive(Clone)]
struct InspectorSessionState {
  /// Shared flag that is `true` while a message is being dispatched.
  is_dispatching_message: Rc<RefCell<bool>>,
  /// Callback that delivers outgoing messages.
  send: Rc<InspectorSessionSend>,
  /// Incoming message receiver; taken by the message pump when it starts.
  rx: Rc<RefCell<Option<SessionProxyReceiver>>>,
  /// Blocking / non-blocking behaviour of the session.
  kind: InspectorSessionKind,
  /// The container this session belongs to.
  sessions: Rc<RefCell<SessionContainer>>,
  /// Queue of `(session id, message)` pairs destined for workers.
  pending_worker_messages: Arc<Mutex<Vec<(String, String)>>>,
  /// Shared flag set by `NodeWorker.enable`.
  nodeworker_enabled: Rc<Cell<bool>>,
  /// Shared flag set by `Target.setAutoAttach`.
  auto_attach_enabled: Rc<Cell<bool>>,
  /// Shared flag set by `Target.setDiscoverTargets`.
  discover_targets_enabled: Rc<Cell<bool>>,
  /// Per-session flag toggled by `NodeRuntime.enable` / `.disable`.
  noderuntime_enabled: Cell<bool>,
  /// Inspector-wide flags.
  flags: Rc<RefCell<InspectorFlags>>,
}
1178
/// One connected inspector session: the V8 session plus its state.
struct InspectorSession {
  /// The underlying V8 inspector session.
  v8_session: v8::inspector::V8InspectorSession,
  /// State shared with the V8 channel created for this session.
  state: InspectorSessionState,
}
1185
1186impl InspectorSession {
1187 const CONTEXT_GROUP_ID: i32 = 1;
1188
1189 #[allow(clippy::too_many_arguments, reason = "construction")]
1190 pub fn new(
1191 v8_inspector: Rc<v8::inspector::V8Inspector>,
1192 is_dispatching_message: Rc<RefCell<bool>>,
1193 send: InspectorSessionSend,
1194 rx: Option<SessionProxyReceiver>,
1195 kind: InspectorSessionKind,
1196 sessions: Rc<RefCell<SessionContainer>>,
1197 pending_worker_messages: Arc<Mutex<Vec<(String, String)>>>,
1198 nodeworker_enabled: Rc<Cell<bool>>,
1199 auto_attach_enabled: Rc<Cell<bool>>,
1200 discover_targets_enabled: Rc<Cell<bool>>,
1201 flags: Rc<RefCell<InspectorFlags>>,
1202 ) -> Rc<Self> {
1203 let state = InspectorSessionState {
1204 is_dispatching_message,
1205 send: Rc::new(send),
1206 rx: Rc::new(RefCell::new(rx)),
1207 kind,
1208 sessions,
1209 pending_worker_messages,
1210 nodeworker_enabled,
1211 auto_attach_enabled,
1212 discover_targets_enabled,
1213 noderuntime_enabled: Cell::new(false),
1214 flags,
1215 };
1216
1217 let v8_session = v8_inspector.connect(
1218 Self::CONTEXT_GROUP_ID,
1219 v8::inspector::Channel::new(Box::new(state.clone())),
1220 v8::inspector::StringView::empty(),
1221 v8::inspector::V8InspectorClientTrustLevel::FullyTrusted,
1222 );
1223
1224 Rc::new(Self { v8_session, state })
1225 }
1226
1227 fn dispatch_message(&self, msg: String) {
1229 *self.state.is_dispatching_message.borrow_mut() = true;
1230 let msg = v8::inspector::StringView::from(msg.as_bytes());
1231 self.v8_session.dispatch_protocol_message(msg);
1232 *self.state.is_dispatching_message.borrow_mut() = false;
1233 }
1234
1235 fn queue_worker_message(&self, session_id: &str, message: String) {
1237 self
1238 .state
1239 .pending_worker_messages
1240 .lock()
1241 .push((session_id.to_string(), message));
1242 }
1243
1244 fn notify_workers<F>(&self, mut f: F)
1246 where
1247 F: FnMut(&TargetSession, &dyn Fn(InspectorMsg)) + 'static,
1248 {
1249 let sessions = self.state.sessions.clone();
1250 let send = self.state.send.clone();
1251 deno_core::unsync::spawn(async move {
1252 let sessions = sessions.borrow();
1253 for ts in sessions.target_sessions.values() {
1254 f(ts, &|msg| send(msg));
1255 }
1256 });
1257 }
1258}
1259
1260impl InspectorSessionState {
1261 fn send_message(
1262 &self,
1263 msg_kind: InspectorMsgKind,
1264 msg: v8::UniquePtr<v8::inspector::StringBuffer>,
1265 ) {
1266 let msg = msg.unwrap().string().to_string();
1267 (self.send)(InspectorMsg {
1268 kind: msg_kind,
1269 content: msg,
1270 });
1271 }
1272}
1273
1274impl v8::inspector::ChannelImpl for InspectorSessionState {
1275 fn send_response(
1276 &self,
1277 call_id: i32,
1278 message: v8::UniquePtr<v8::inspector::StringBuffer>,
1279 ) {
1280 self.send_message(InspectorMsgKind::Message(call_id), message);
1281 }
1282
1283 fn send_notification(
1284 &self,
1285 message: v8::UniquePtr<v8::inspector::StringBuffer>,
1286 ) {
1287 self.send_message(InspectorMsgKind::Notification, message);
1288 }
1289
1290 fn flush_protocol_notifications(&self) {}
1291}
/// Boxed message-pump future of a session; it resolves to the session's id.
type InspectorSessionPumpMessages = Pin<Box<dyn Future<Output = i32>>>;
1293fn get_str_param(params: &Option<serde_json::Value>, key: &str) -> String {
1295 params
1296 .as_ref()
1297 .and_then(|p| p.get(key))
1298 .and_then(|v| v.as_str())
1299 .unwrap_or_default()
1300 .to_owned()
1301}
1302
1303fn get_bool_param(params: &Option<serde_json::Value>, key: &str) -> bool {
1305 params
1306 .as_ref()
1307 .and_then(|p| p.get(key))
1308 .and_then(|v| v.as_bool())
1309 .unwrap_or(false)
1310}
1311
1312impl TargetSession {
1313 fn target_info(&self, attached: bool) -> serde_json::Value {
1315 json!({
1316 "targetId": self.target_id,
1317 "type": "node_worker",
1318 "title": self.title(),
1319 "url": self.url,
1320 "attached": attached,
1321 "canAccessOpener": true
1322 })
1323 }
1324
1325 fn worker_info(&self) -> serde_json::Value {
1327 json!({
1328 "workerId": self.target_id,
1329 "type": "node_worker",
1330 "title": self.title(),
1331 "url": self.url
1332 })
1333 }
1334}
1335
1336async fn pump_inspector_session_messages(
1337 session: Rc<InspectorSession>,
1338 session_id: i32,
1339) -> i32 {
1340 let mut rx = session.state.rx.borrow_mut().take().unwrap();
1341
1342 while let Some(msg) = rx.next().await {
1343 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&msg) else {
1344 session.dispatch_message(msg);
1345 continue;
1346 };
1347
1348 if let Some(session_id) = parsed.get("sessionId").and_then(|s| s.as_str()) {
1350 let mut worker_msg = parsed.clone();
1351 if let Some(obj) = worker_msg.as_object_mut() {
1352 obj.remove("sessionId");
1353 session.queue_worker_message(session_id, worker_msg.to_string());
1354 }
1355 continue;
1356 }
1357
1358 let Some(method) = parsed.get("method").and_then(|m| m.as_str()) else {
1359 session.dispatch_message(msg);
1360 continue;
1361 };
1362
1363 let params = parsed.get("params").cloned();
1364 let msg_id = parsed.get("id").cloned();
1365
1366 match method {
1367 "NodeRuntime.enable" => {
1368 session.state.noderuntime_enabled.set(true);
1369 let flags = session.state.flags.borrow();
1374 if flags.waiting_for_session || flags.paused_on_start {
1375 drop(flags);
1376 (session.state.send)(InspectorMsg::notification(json!({
1377 "method": "NodeRuntime.waitingForDebugger"
1378 })));
1379 }
1380 }
1381 "NodeRuntime.disable" => {
1382 session.state.noderuntime_enabled.set(false);
1383 }
1384 "Runtime.runIfWaitingForDebugger" => {
1385 {
1390 let mut flags = session.state.flags.borrow_mut();
1391 flags.paused_on_start = false;
1392 flags.waiting_for_session = false;
1393 flags.resumed_by_session_id = Some(session_id);
1394 }
1395 session.dispatch_message(msg);
1397 continue;
1398 }
1399 "NodeWorker.enable" => {
1400 session.state.nodeworker_enabled.set(true);
1401 session.notify_workers(|ts, send| {
1402 send(InspectorMsg::notification(json!({
1403 "method": "NodeWorker.attachedToWorker",
1404 "params": {
1405 "sessionId": ts.session_id,
1406 "workerInfo": ts.worker_info(),
1407 "waitingForDebugger": false
1408 }
1409 })));
1410 });
1411 }
1412 "NodeWorker.sendMessageToWorker" | "Target.sendMessageToTarget" => {
1413 session.queue_worker_message(
1414 &get_str_param(¶ms, "sessionId"),
1415 get_str_param(¶ms, "message"),
1416 );
1417 }
1418 "Target.setDiscoverTargets" => {
1419 let discover = get_bool_param(¶ms, "discover");
1420 session.state.discover_targets_enabled.set(discover);
1421
1422 if discover {
1423 session.notify_workers(|ts, send| {
1424 send(InspectorMsg::notification(json!({
1425 "method": "Target.targetCreated",
1426 "params": { "targetInfo": ts.target_info(false) }
1427 })));
1428 });
1429 }
1430 }
1431 "Target.setAutoAttach" => {
1432 let auto_attach = get_bool_param(¶ms, "autoAttach");
1433 let send = session.state.send.clone();
1434 let sessions = session.state.sessions.clone();
1435 session.state.auto_attach_enabled.set(auto_attach);
1436 if auto_attach {
1437 deno_core::unsync::spawn(async move {
1438 let sessions = sessions.borrow();
1439 for ts in sessions.target_sessions.values() {
1440 if ts.attached.replace(true) {
1441 continue; }
1443 send(InspectorMsg::notification(json!({
1444 "method": "Target.attachedToTarget",
1445 "params": {
1446 "sessionId": ts.session_id,
1447 "targetInfo": ts.target_info(true),
1448 "waitingForDebugger": false
1449 }
1450 })));
1451 }
1452 });
1453 }
1454 }
1455 _ => {
1456 session.dispatch_message(msg);
1457 continue;
1458 }
1459 }
1460
1461 if let Some(id) = msg_id {
1463 let call_id = id.as_i64().unwrap_or(0) as i32;
1464 (session.state.send)(InspectorMsg {
1465 kind: InspectorMsgKind::Message(call_id),
1466 content: json!({
1467 "id": id,
1468 "result": {}
1469 })
1470 .to_string(),
1471 });
1472 }
1473 }
1474 session_id
1475}
1476
/// Handle to a session created with `create_local_session`; messages are
/// dispatched to it by id through the shared session container.
pub struct LocalInspectorSession {
  /// Container in which the session is registered.
  sessions: Rc<RefCell<SessionContainer>>,
  /// Id of the session within the container.
  session_id: i32,
}
1485
1486impl LocalInspectorSession {
1487 pub fn new(session_id: i32, sessions: Rc<RefCell<SessionContainer>>) -> Self {
1488 Self {
1489 sessions,
1490 session_id,
1491 }
1492 }
1493
1494 pub fn dispatch(&mut self, msg: String) {
1495 self
1496 .sessions
1497 .borrow_mut()
1498 .dispatch_message_from_frontend(self.session_id, msg);
1499 }
1500
1501 pub fn post_message<T: serde::Serialize>(
1502 &mut self,
1503 id: i32,
1504 method: &str,
1505 params: Option<T>,
1506 ) {
1507 let message = json!({
1508 "id": id,
1509 "method": method,
1510 "params": params,
1511 });
1512
1513 let stringified_msg = serde_json::to_string(&message).unwrap();
1514 self.dispatch(stringified_msg);
1515 }
1516}