1use crate::futures::channel::mpsc;
8use crate::futures::channel::mpsc::UnboundedReceiver;
9use crate::futures::channel::mpsc::UnboundedSender;
10use crate::futures::channel::oneshot;
11use crate::futures::prelude::*;
12use crate::futures::stream::FuturesUnordered;
13use crate::futures::stream::StreamExt;
14use crate::futures::task;
15use crate::serde_json::json;
16
17use parking_lot::Mutex;
18use std::cell::Cell;
19use std::cell::RefCell;
20use std::collections::HashMap;
21use std::ffi::c_void;
22use std::mem::take;
23use std::pin::Pin;
24use std::ptr::NonNull;
25use std::rc::Rc;
26use std::sync::Arc;
27use std::task::Context;
28use std::task::Poll;
29use std::thread;
30
/// Classifies an [`InspectorMsg`] emitted by an inspector session.
///
/// The enum is plain data (a unit variant and an `i32`), so it derives
/// `Clone`, `Copy`, `PartialEq` and `Eq` in addition to `Debug`; this lets
/// callers compare and duplicate kinds without moving the owning message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InspectorMsgKind {
  /// An unsolicited message (used by `send_notification` and by the
  /// synthetic `Target.*` / `NodeWorker.*` events built in this file).
  Notification,
  /// A reply to a request; the payload is the call id of that request.
  Message(i32),
}
36
/// A single outgoing inspector message together with its classification.
#[derive(Debug)]
pub struct InspectorMsg {
  /// Whether this is a notification or the reply to a numbered call.
  pub kind: InspectorMsgKind,
  /// The message text. `InspectorMsg::notification` fills it with the string
  /// form of a JSON value.
  pub content: String,
}
42
43impl InspectorMsg {
44 pub fn notification(content: serde_json::Value) -> Self {
46 Self {
47 kind: InspectorMsgKind::Notification,
48 content: content.to_string(),
49 }
50 }
51}
52
/// Sending half through which a session pushes [`InspectorMsg`]s to its peer.
pub type SessionProxySender = UnboundedSender<InspectorMsg>;
/// Receiving half on which a session gets incoming messages as raw strings.
pub type SessionProxyReceiver = UnboundedReceiver<String>;
57
/// The channel endpoints carried by an [`InspectorSessionProxy`].
pub enum InspectorSessionChannels {
  /// Endpoints for a session that is connected directly to this inspector:
  /// `poll_sessions` turns it into an `InspectorSession`.
  Regular {
    /// Outgoing messages produced by the session.
    tx: SessionProxySender,
    /// Incoming raw messages to be dispatched to the session.
    rx: SessionProxyReceiver,
  },
  /// The main-runtime side of a worker link: `poll_sessions` registers it as
  /// a target session instead of creating an `InspectorSession`.
  Worker {
    /// Sends raw messages from the main runtime to the worker.
    main_to_worker_tx: UnboundedSender<String>,
    /// Receives the worker's messages on the main runtime.
    worker_to_main_rx: UnboundedReceiver<InspectorMsg>,
    /// URL reported for the worker in target/worker info payloads.
    worker_url: String,
  },
}
75
/// Everything needed to register a new session with a `JsRuntimeInspector`;
/// values of this type are sent over the inspector's new-session channel.
pub struct InspectorSessionProxy {
  /// Channel endpoints for the session (regular or worker link).
  pub channels: InspectorSessionChannels,
  /// Blocking behaviour requested for the session.
  pub kind: InspectorSessionKind,
}
81
82pub fn create_worker_inspector_session_pair(
84 worker_url: String,
85) -> (InspectorSessionProxy, InspectorSessionProxy) {
86 let (worker_to_main_tx, worker_to_main_rx) =
87 mpsc::unbounded::<InspectorMsg>();
88 let (main_to_worker_tx, main_to_worker_rx) = mpsc::unbounded::<String>();
89
90 let main_side = InspectorSessionProxy {
91 channels: InspectorSessionChannels::Worker {
92 main_to_worker_tx,
93 worker_to_main_rx,
94 worker_url,
95 },
96 kind: InspectorSessionKind::NonBlocking {
97 wait_for_disconnect: false,
98 },
99 };
100
101 let worker_side = InspectorSessionProxy {
102 channels: InspectorSessionChannels::Regular {
103 tx: worker_to_main_tx,
104 rx: main_to_worker_rx,
105 },
106 kind: InspectorSessionKind::NonBlocking {
107 wait_for_disconnect: false,
108 },
109 };
110
111 (main_side, worker_side)
112}
113
/// Callback a session invokes for every outgoing [`InspectorMsg`].
pub type InspectorSessionSend = Box<dyn Fn(InspectorMsg)>;
115
/// State of the session-polling state machine stored in `InspectorWaker`.
#[derive(Clone, Copy, Debug)]
enum PollState {
  /// Not polling; `poll_sessions` left this state after finishing without
  /// needing to block. A wake-up in this state notifies the task waker
  /// and/or requests an isolate interrupt.
  Idle,
  /// The waker fired; set unconditionally at the end of `wake_by_ref`.
  Woken,
  /// `poll_sessions` is currently running its poll loop.
  Polling,
  /// `poll_sessions` parked the thread (paused or waiting for a session); a
  /// wake-up unparks it.
  Parked,
  /// Set by `JsRuntimeInspector::drop`.
  Dropped,
}
124
/// Owner of the V8 inspector instance and of the sessions attached to it.
pub struct JsRuntimeInspector {
  /// The V8 inspector; also shared with every `InspectorSession`.
  v8_inspector: Rc<v8::inspector::V8Inspector>,
  /// Sender handed out by `get_session_sender` for registering new sessions.
  new_session_tx: UnboundedSender<InspectorSessionProxy>,
  /// One-shot sender fired on drop, if `add_deregister_handler` was called.
  deregister_tx: RefCell<Option<oneshot::Sender<()>>>,
  /// State shared with the V8 inspector client and the waker.
  state: Rc<JsRuntimeInspectorState>,
}
141
142impl Drop for JsRuntimeInspector {
143 fn drop(&mut self) {
144 self
148 .state
149 .waker
150 .update(|w| w.poll_state = PollState::Dropped);
151
152 self.state.sessions.borrow_mut().drop_sessions();
157
158 if let Some(deregister_tx) = self.deregister_tx.borrow_mut().take() {
162 let _ = deregister_tx.send(());
163 }
164 }
165}
166
/// State shared between `JsRuntimeInspector`, the V8 inspector client and
/// the interrupt callback installed by the waker.
#[derive(Clone)]
struct JsRuntimeInspectorState {
  /// Raw isolate pointer, used to rebuild an isolate handle when V8 asks
  /// for the default context.
  isolate_ptr: v8::UnsafeRawIsolatePtr,
  /// The context returned from `ensure_default_context_in_group`.
  context: v8::Global<v8::Context>,
  /// `on_pause` / `waiting_for_session` flags; either one makes
  /// `poll_sessions` block.
  flags: Rc<RefCell<InspectorFlags>>,
  /// Waker and poll-state machine driving `poll_sessions`.
  waker: Arc<InspectorWaker>,
  /// All sessions known to this inspector.
  sessions: Rc<RefCell<SessionContainer>>,
  /// `true` while a session is inside `dispatch_protocol_message`.
  is_dispatching_message: Rc<RefCell<bool>>,
  /// `(session_id, message)` pairs queued by sessions and forwarded to
  /// workers by `poll_sessions`.
  pending_worker_messages: Arc<Mutex<Vec<(String, String)>>>,
  /// Set when a frontend sends `NodeWorker.enable`.
  nodeworker_enabled: Rc<Cell<bool>>,
  /// Mirrors the `autoAttach` value of the last `Target.setAutoAttach`.
  auto_attach_enabled: Rc<Cell<bool>>,
  /// Mirrors the `discover` value of the last `Target.setDiscoverTargets`.
  discover_targets_enabled: Rc<Cell<bool>>,
}
180
/// Newtype over the shared inspector state that implements the callbacks V8
/// invokes on its inspector client.
struct JsRuntimeInspectorClient(Rc<JsRuntimeInspectorState>);
182
impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspectorClient {
  /// Sets the `on_pause` flag and polls sessions. While the flag is set,
  /// `poll_sessions` parks the thread between wake-ups instead of returning.
  fn run_message_loop_on_pause(&self, context_group_id: i32) {
    assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
    self.0.flags.borrow_mut().on_pause = true;
    // The result is ignored: `Err` only means the session container is
    // already borrowed by an outer `poll_sessions` call.
    let _ = self.0.poll_sessions(None);
  }

  /// Clears the `on_pause` flag so the blocking poll loop can finish.
  fn quit_message_loop_on_pause(&self) {
    self.0.flags.borrow_mut().on_pause = false;
  }

  /// Clears the `waiting_for_session` flag.
  fn run_if_waiting_for_debugger(&self, context_group_id: i32) {
    assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
    self.0.flags.borrow_mut().waiting_for_session = false;
  }

  /// Returns the stored context as a local handle.
  fn ensure_default_context_in_group(
    &self,
    context_group_id: i32,
  ) -> Option<v8::Local<'_, v8::Context>> {
    assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
    let context = self.0.context.clone();
    // NOTE(review): relies on `isolate_ptr` still pointing at a live isolate
    // whenever V8 invokes this callback — confirm against the owner's
    // lifetime.
    let mut isolate =
      unsafe { v8::Isolate::from_raw_isolate_ptr(self.0.isolate_ptr) };
    let isolate = &mut isolate;
    v8::callback_scope!(unsafe scope, isolate);
    let local = v8::Local::new(scope, context);
    // NOTE(review): the handle is made to outlive `scope`; presumably sound
    // because the caller's handle scope keeps it alive — confirm.
    Some(unsafe { local.extend_lifetime_unchecked() })
  }

  /// Converts a resource name, interpreted as a file path, into a URL
  /// string. Returns `None` when `Url::from_file_path` rejects the name.
  fn resource_name_to_url(
    &self,
    resource_name: &v8::inspector::StringView,
  ) -> Option<v8::UniquePtr<v8::inspector::StringBuffer>> {
    let resource_name = resource_name.to_string();
    let url = url::Url::from_file_path(resource_name).ok()?;
    let src_view = v8::inspector::StringView::from(url.as_str().as_bytes());
    Some(v8::inspector::StringBuffer::create(src_view))
  }
}
223
224impl JsRuntimeInspectorState {
225 #[allow(clippy::result_unit_err)]
226 pub fn poll_sessions(
227 &self,
228 mut invoker_cx: Option<&mut Context>,
229 ) -> Result<Poll<()>, ()> {
230 let Ok(mut sessions) = self.sessions.try_borrow_mut() else {
235 return Err(());
236 };
237
238 self.waker.update(|w| {
239 match w.poll_state {
240 PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling,
241 _ => unreachable!(),
242 };
243 });
244
245 let waker_ref = task::waker_ref(&self.waker);
248 let cx = &mut Context::from_waker(&waker_ref);
249
250 loop {
251 loop {
252 if let Some(session) = sessions.handshake.take() {
254 let mut fut =
255 pump_inspector_session_messages(session.clone()).boxed_local();
256 if fut.poll_unpin(cx).is_pending() {
262 sessions.established.push(fut);
263 }
264 let id = sessions.next_local_id;
265 sessions.next_local_id += 1;
266 sessions.local.insert(id, session);
267
268 if sessions.main_session_id.is_none() {
270 sessions.main_session_id = Some(id);
271 }
272
273 continue;
274 }
275
276 if let Poll::Ready(Some(session_proxy)) =
278 sessions.session_rx.poll_next_unpin(cx)
279 {
280 match session_proxy.channels {
281 InspectorSessionChannels::Worker {
282 main_to_worker_tx,
283 worker_to_main_rx,
284 worker_url,
285 } => {
286 let worker_id = sessions.next_local_id;
288 sessions.next_local_id += 1;
289
290 sessions.register_worker_session(worker_id, worker_url.clone());
291
292 sessions.register_worker_channels(
294 worker_id,
295 main_to_worker_tx,
296 worker_to_main_rx,
297 );
298
299 if let Some(main_id) = sessions.main_session_id
309 && let Some(main_session) = sessions.local.get(&main_id)
310 && let Some(ts) =
311 sessions.target_sessions.get(&format!("{}", worker_id))
312 {
313 if self.discover_targets_enabled.get() {
314 (main_session.state.send)(InspectorMsg::notification(
315 json!({
316 "method": "Target.targetCreated",
317 "params": { "targetInfo": ts.target_info(false) }
318 }),
319 ));
320 }
321
322 if self.auto_attach_enabled.get() {
323 ts.attached.set(true);
324 (main_session.state.send)(InspectorMsg::notification(
325 json!({
326 "method": "Target.attachedToTarget",
327 "params": {
328 "sessionId": ts.session_id,
329 "targetInfo": ts.target_info(true),
330 "waitingForDebugger": false
331 }
332 }),
333 ));
334 }
335 }
336
337 continue;
338 }
339 InspectorSessionChannels::Regular { tx, rx } => {
340 let session = InspectorSession::new(
342 sessions.v8_inspector.as_ref().unwrap().clone(),
343 self.is_dispatching_message.clone(),
344 Box::new(move |msg| {
345 let _ = tx.unbounded_send(msg);
346 }),
347 Some(rx),
348 session_proxy.kind,
349 self.sessions.clone(),
350 self.pending_worker_messages.clone(),
351 self.nodeworker_enabled.clone(),
352 self.auto_attach_enabled.clone(),
353 self.discover_targets_enabled.clone(),
354 );
355
356 let prev = sessions.handshake.replace(session);
357 assert!(prev.is_none());
358 continue;
359 }
360 }
361 }
362
363 if let Some(main_id) = sessions.main_session_id {
365 let main_session_send =
367 sessions.local.get(&main_id).map(|s| s.state.send.clone());
368
369 if let Some(send) = main_session_send {
370 let mut has_worker_message = false;
371 let mut terminated_workers = Vec::new();
372
373 for target_session in sessions.target_sessions.values() {
374 if !target_session.has_channels() {
376 continue;
377 }
378 match target_session.poll_from_worker(cx) {
379 Poll::Ready(Some(msg)) => {
380 if !self.nodeworker_enabled.get() {
384 if let Ok(mut parsed) =
385 serde_json::from_str::<serde_json::Value>(&msg.content)
386 {
387 if let Some(obj) = parsed.as_object_mut() {
388 obj.insert(
389 "sessionId".to_string(),
390 json!(target_session.session_id),
391 );
392 let flattened_msg = parsed.to_string();
393
394 send(InspectorMsg {
396 kind: msg.kind,
397 content: flattened_msg,
398 });
399 }
400 } else {
401 send(msg);
403 }
404 } else {
405 let wrapped_nodeworker = json!({
406 "method": "NodeWorker.receivedMessageFromWorker",
407 "params": {
408 "sessionId": target_session.session_id,
409 "message": msg.content,
410 "workerId": target_session.target_id
411 }
412 });
413
414 send(InspectorMsg {
415 kind: InspectorMsgKind::Notification,
416 content: wrapped_nodeworker.to_string(),
417 });
418 }
419
420 has_worker_message = true;
421 }
422 Poll::Ready(None) => {
423 if self.nodeworker_enabled.get() {
426 send(InspectorMsg::notification(json!({
428 "method": "NodeWorker.detachedFromWorker",
429 "params": {
430 "sessionId": target_session.session_id
431 }
432 })));
433 } else if self.auto_attach_enabled.get()
434 || self.discover_targets_enabled.get()
435 {
436 send(InspectorMsg::notification(json!({
438 "method": "Target.targetDestroyed",
439 "params": {
440 "targetId": target_session.target_id
441 }
442 })));
443 }
444 terminated_workers.push(target_session.session_id.clone());
445 }
446 Poll::Pending => {}
447 }
448 }
449
450 for session_id in terminated_workers {
452 sessions.target_sessions.remove(&session_id);
453 has_worker_message = true; }
455
456 if has_worker_message {
457 continue;
458 }
459 }
460 }
461
462 match sessions.established.poll_next_unpin(cx) {
464 Poll::Ready(Some(())) => {
465 continue;
466 }
467 Poll::Ready(None) => {
468 break;
469 }
470 Poll::Pending => {
471 break;
472 }
473 };
474 }
475
476 let should_block = {
477 let flags = self.flags.borrow();
478 flags.on_pause || flags.waiting_for_session
479 };
480
481 let pending_messages: Vec<(String, String)> = {
484 let mut queue = self.pending_worker_messages.lock();
485 queue.drain(..).collect()
486 };
487
488 for (session_id, message) in pending_messages {
489 if let Some(target_session) = sessions.target_sessions.get(&session_id)
490 {
491 target_session.send_to_worker(message);
492 }
493 }
494
495 let new_state = self.waker.update(|w| {
496 match w.poll_state {
497 PollState::Woken => {
498 w.poll_state = PollState::Polling;
501 }
502 PollState::Polling if !should_block => {
503 w.poll_state = PollState::Idle;
507 if let Some(cx) = invoker_cx.take() {
510 w.task_waker.replace(cx.waker().clone());
511 }
512 w.inspector_state_ptr = NonNull::new(self as *const _ as *mut Self);
515 }
516 PollState::Polling if should_block => {
517 w.poll_state = PollState::Parked;
522 w.parked_thread.replace(thread::current());
523 }
524 _ => unreachable!(),
525 };
526 w.poll_state
527 });
528 match new_state {
529 PollState::Idle => break, PollState::Polling => continue, PollState::Parked => thread::park(), _ => unreachable!(),
533 };
534 }
535
536 Ok(Poll::Pending)
537 }
538}
539
540impl JsRuntimeInspector {
541 const CONTEXT_GROUP_ID: i32 = 1;
544
545 pub fn new(
546 isolate_ptr: v8::UnsafeRawIsolatePtr,
547 scope: &mut v8::PinScope,
548 context: v8::Local<v8::Context>,
549 is_main_runtime: bool,
550 worker_id: Option<u32>,
551 ) -> Rc<Self> {
552 let (new_session_tx, new_session_rx) =
553 mpsc::unbounded::<InspectorSessionProxy>();
554
555 let waker = InspectorWaker::new(scope.thread_safe_handle());
556 let state = Rc::new(JsRuntimeInspectorState {
557 waker,
558 flags: Default::default(),
559 isolate_ptr,
560 context: v8::Global::new(scope, context),
561 sessions: Rc::new(
562 RefCell::new(SessionContainer::temporary_placeholder()),
563 ),
564 is_dispatching_message: Default::default(),
565 pending_worker_messages: Arc::new(Mutex::new(Vec::new())),
566 nodeworker_enabled: Rc::new(Cell::new(false)),
567 auto_attach_enabled: Rc::new(Cell::new(false)),
568 discover_targets_enabled: Rc::new(Cell::new(false)),
569 });
570 let client = Box::new(JsRuntimeInspectorClient(state.clone()));
571 let v8_inspector_client = v8::inspector::V8InspectorClient::new(client);
572 let v8_inspector = Rc::new(v8::inspector::V8Inspector::create(
573 scope,
574 v8_inspector_client,
575 ));
576
577 *state.sessions.borrow_mut() =
578 SessionContainer::new(v8_inspector.clone(), new_session_rx);
579
580 let context_name_bytes = if is_main_runtime {
582 &b"main realm"[..]
583 } else {
584 &format!("worker [{}]", worker_id.unwrap_or(1)).into_bytes()
585 };
586
587 let context_name = v8::inspector::StringView::from(context_name_bytes);
588 let aux_data = if is_main_runtime {
594 r#"{"isDefault": true, "type": "default"}"#
595 } else {
596 r#"{"isDefault": false, "type": "worker"}"#
597 };
598 let aux_data_view = v8::inspector::StringView::from(aux_data.as_bytes());
599 v8_inspector.context_created(
600 context,
601 Self::CONTEXT_GROUP_ID,
602 context_name,
603 aux_data_view,
604 );
605
606 let _ = state.poll_sessions(None).unwrap();
609
610 Rc::new(Self {
611 v8_inspector,
612 state,
613 new_session_tx,
614 deregister_tx: RefCell::new(None),
615 })
616 }
617
618 pub fn is_dispatching_message(&self) -> bool {
619 *self.state.is_dispatching_message.borrow()
620 }
621
622 pub fn context_destroyed(
623 &self,
624 scope: &mut v8::PinScope<'_, '_>,
625 context: v8::Global<v8::Context>,
626 ) {
627 let context = v8::Local::new(scope, context);
628 self.v8_inspector.context_destroyed(context);
629 }
630
631 pub fn exception_thrown(
632 &self,
633 scope: &mut v8::PinScope<'_, '_>,
634 exception: v8::Local<'_, v8::Value>,
635 in_promise: bool,
636 ) {
637 let context = scope.get_current_context();
638 let message = v8::Exception::create_message(scope, exception);
639 let stack_trace = message.get_stack_trace(scope);
640 let stack_trace = self.v8_inspector.create_stack_trace(stack_trace);
641 self.v8_inspector.exception_thrown(
642 context,
643 if in_promise {
644 v8::inspector::StringView::from("Uncaught (in promise)".as_bytes())
645 } else {
646 v8::inspector::StringView::from("Uncaught".as_bytes())
647 },
648 exception,
649 v8::inspector::StringView::from("".as_bytes()),
650 v8::inspector::StringView::from("".as_bytes()),
651 0,
652 0,
653 stack_trace,
654 0,
655 );
656 }
657
658 pub fn sessions_state(&self) -> SessionsState {
659 self.state.sessions.borrow().sessions_state()
660 }
661
662 pub fn poll_sessions_from_event_loop(&self, cx: &mut Context) {
663 let _ = self.state.poll_sessions(Some(cx)).unwrap();
664 }
665
666 pub fn wait_for_session(&self) {
669 loop {
670 if let Some(_session) =
671 self.state.sessions.borrow_mut().local.values().next()
672 {
673 self.state.flags.borrow_mut().waiting_for_session = false;
674 break;
675 } else {
676 self.state.flags.borrow_mut().waiting_for_session = true;
677 let _ = self.state.poll_sessions(None).unwrap();
678 }
679 }
680 }
681
682 pub fn wait_for_session_and_break_on_next_statement(&self) {
689 loop {
690 if let Some(session) =
691 self.state.sessions.borrow_mut().local.values().next()
692 {
693 break session.break_on_next_statement();
694 } else {
695 self.state.flags.borrow_mut().waiting_for_session = true;
696 let _ = self.state.poll_sessions(None).unwrap();
697 }
698 }
699 }
700
701 pub fn get_session_sender(&self) -> UnboundedSender<InspectorSessionProxy> {
703 self.new_session_tx.clone()
704 }
705
706 pub fn add_deregister_handler(&self) -> oneshot::Receiver<()> {
710 let maybe_deregister_tx = self.deregister_tx.borrow_mut().take();
711 if let Some(deregister_tx) = maybe_deregister_tx
712 && !deregister_tx.is_canceled()
713 {
714 panic!("Inspector deregister handler already exists and is alive.");
715 }
716 let (tx, rx) = oneshot::channel::<()>();
717 self.deregister_tx.borrow_mut().replace(tx);
718 rx
719 }
720
721 pub fn create_local_session(
722 inspector: Rc<JsRuntimeInspector>,
723 callback: InspectorSessionSend,
724 kind: InspectorSessionKind,
725 ) -> LocalInspectorSession {
726 let (session_id, sessions) = {
727 let sessions = inspector.state.sessions.clone();
728
729 let inspector_session = InspectorSession::new(
730 inspector.v8_inspector.clone(),
731 inspector.state.is_dispatching_message.clone(),
732 callback,
733 None,
734 kind,
735 sessions.clone(),
736 inspector.state.pending_worker_messages.clone(),
737 inspector.state.nodeworker_enabled.clone(),
738 inspector.state.auto_attach_enabled.clone(),
739 inspector.state.discover_targets_enabled.clone(),
740 );
741
742 let session_id = {
743 let mut s = sessions.borrow_mut();
744 let id = s.next_local_id;
745 s.next_local_id += 1;
746 assert!(s.local.insert(id, inspector_session).is_none());
747 id
748 };
749
750 take(&mut inspector.state.flags.borrow_mut().waiting_for_session);
751 (session_id, sessions)
752 };
753
754 LocalInspectorSession::new(session_id, sessions)
755 }
756}
757
/// Flags that make `poll_sessions` block the thread instead of returning.
#[derive(Default)]
struct InspectorFlags {
  /// Set while the runtime waits for a session to connect.
  waiting_for_session: bool,
  /// Set between `run_message_loop_on_pause` and
  /// `quit_message_loop_on_pause`.
  on_pause: bool,
}
763
/// Snapshot of which kinds of sessions are currently connected, as computed
/// by `SessionContainer::sessions_state`.
#[derive(Debug)]
pub struct SessionsState {
  /// Any session exists (established pump, pending handshake, or local).
  pub has_active: bool,
  /// At least one local session is `Blocking`.
  pub has_blocking: bool,
  /// At least one local session is `NonBlocking`.
  pub has_nonblocking: bool,
  /// At least one local session is `NonBlocking` with
  /// `wait_for_disconnect: true`.
  pub has_nonblocking_wait_for_disconnect: bool,
}
771
/// All sessions attached to one inspector, plus the bookkeeping for worker
/// target sessions.
pub struct SessionContainer {
  /// The V8 inspector; `None` in the placeholder and after `drop_sessions`.
  v8_inspector: Option<Rc<v8::inspector::V8Inspector>>,
  /// Receives proxies for newly registered sessions.
  session_rx: UnboundedReceiver<InspectorSessionProxy>,
  /// A session that was created but whose message pump has not started yet.
  handshake: Option<Rc<InspectorSession>>,
  /// Message pumps of sessions that completed the handshake.
  established: FuturesUnordered<InspectorSessionPumpMessages>,
  /// Next id handed out for local sessions and worker links.
  next_local_id: i32,
  /// Sessions by local id.
  local: HashMap<i32, Rc<InspectorSession>>,

  /// Worker target sessions, keyed by session id string.
  target_sessions: HashMap<String, Rc<TargetSession>>,
  /// Id of the first session that completed its handshake; it receives
  /// worker traffic.
  main_session_id: Option<i32>,
  /// Next worker number, used in worker titles.
  next_worker_id: u32,
}
786
/// The main-runtime ends of the two channels linking it to a worker.
struct MainWorkerChannels {
  /// Sends raw messages to the worker.
  main_to_worker_tx: UnboundedSender<String>,
  /// Receives the worker's outgoing messages.
  worker_to_main_rx: UnboundedReceiver<InspectorMsg>,
}
791
/// Main-runtime bookkeeping for one worker that is exposed to frontends as
/// a target.
struct TargetSession {
  /// Id reported as `targetId` / `workerId`.
  target_id: String,
  /// Id reported as `sessionId`; also the key in `target_sessions`.
  session_id: String,
  /// Numeric id the two string ids above are derived from.
  local_session_id: i32,
  /// Sequential worker number used in the title.
  worker_id: u32,
  /// Channels to the worker; `None` until `register_worker_channels` runs.
  main_worker_channels: RefCell<Option<MainWorkerChannels>>,
  /// URL reported for the worker.
  url: String,
  /// Whether `Target.attachedToTarget` has already been sent for it.
  attached: Cell<bool>,
}
804
805impl TargetSession {
806 fn title(&self) -> String {
809 format!("worker [{}]", self.worker_id)
810 }
811
812 fn send_to_worker(&self, message: String) {
814 if let Some(channels) = self.main_worker_channels.borrow().as_ref() {
815 let _ = channels.main_to_worker_tx.unbounded_send(message);
816 }
817 }
818
819 fn has_channels(&self) -> bool {
821 self.main_worker_channels.borrow().is_some()
822 }
823
824 fn poll_from_worker(&self, cx: &mut Context) -> Poll<Option<InspectorMsg>> {
828 self
829 .main_worker_channels
830 .borrow_mut()
831 .as_mut()
832 .expect("poll_from_worker called before channels were registered")
833 .worker_to_main_rx
834 .poll_next_unpin(cx)
835 }
836}
837
838impl SessionContainer {
839 fn new(
840 v8_inspector: Rc<v8::inspector::V8Inspector>,
841 new_session_rx: UnboundedReceiver<InspectorSessionProxy>,
842 ) -> Self {
843 Self {
844 v8_inspector: Some(v8_inspector),
845 session_rx: new_session_rx,
846 handshake: None,
847 established: FuturesUnordered::new(),
848 next_local_id: 1,
849 local: HashMap::new(),
850
851 target_sessions: HashMap::new(),
852 main_session_id: None,
853 next_worker_id: 1, }
855 }
856
857 fn drop_sessions(&mut self) {
862 self.v8_inspector = Default::default();
863 self.handshake.take();
864 self.established.clear();
865 self.local.clear();
866 }
867
868 fn sessions_state(&self) -> SessionsState {
869 SessionsState {
870 has_active: !self.established.is_empty()
871 || self.handshake.is_some()
872 || !self.local.is_empty(),
873 has_blocking: self
874 .local
875 .values()
876 .any(|s| matches!(s.state.kind, InspectorSessionKind::Blocking)),
877 has_nonblocking: self.local.values().any(|s| {
878 matches!(s.state.kind, InspectorSessionKind::NonBlocking { .. })
879 }),
880 has_nonblocking_wait_for_disconnect: self.local.values().any(|s| {
881 matches!(
882 s.state.kind,
883 InspectorSessionKind::NonBlocking {
884 wait_for_disconnect: true
885 }
886 )
887 }),
888 }
889 }
890
891 fn temporary_placeholder() -> Self {
896 let (_tx, rx) = mpsc::unbounded::<InspectorSessionProxy>();
897 Self {
898 v8_inspector: Default::default(),
899 session_rx: rx,
900 handshake: None,
901 established: FuturesUnordered::new(),
902 next_local_id: 1,
903 local: HashMap::new(),
904
905 target_sessions: HashMap::new(),
906 main_session_id: None,
907 next_worker_id: 1,
908 }
909 }
910
911 pub fn dispatch_message_from_frontend(
912 &mut self,
913 session_id: i32,
914 message: String,
915 ) {
916 let session = self.local.get(&session_id).unwrap();
917 session.dispatch_message(message);
918 }
919
920 fn register_worker_session(
922 &mut self,
923 local_session_id: i32,
924 worker_url: String,
925 ) -> u32 {
926 let worker_id = self.next_worker_id;
928 self.next_worker_id += 1;
929
930 let target_id = format!("{}", local_session_id);
932 let session_id = format!("{}", local_session_id);
933
934 let target_session = Rc::new(TargetSession {
935 target_id: target_id.clone(),
936 session_id: session_id.clone(),
937 local_session_id,
938 worker_id,
939 main_worker_channels: RefCell::new(None),
940 url: worker_url.clone(),
941 attached: Cell::new(false),
942 });
943 self
944 .target_sessions
945 .insert(session_id.clone(), target_session.clone());
946
947 worker_id
948 }
949
950 pub fn register_worker_channels(
953 &mut self,
954 local_session_id: i32,
955 main_to_worker_tx: UnboundedSender<String>,
956 worker_to_main_rx: UnboundedReceiver<InspectorMsg>,
957 ) -> bool {
958 for target_session in self.target_sessions.values() {
960 if target_session.local_session_id == local_session_id {
961 *target_session.main_worker_channels.borrow_mut() =
962 Some(MainWorkerChannels {
963 main_to_worker_tx,
964 worker_to_main_rx,
965 });
966 return true;
967 }
968 }
969 false
970 }
971}
972
/// Mutable state behind `InspectorWaker`'s mutex.
struct InspectorWakerInner {
  /// Current state of the polling state machine.
  poll_state: PollState,
  /// Waker of the task that last called `poll_sessions` with a context;
  /// woken (and cleared) on the next wake-up in `Idle` state.
  task_waker: Option<task::Waker>,
  /// Thread parked inside `poll_sessions`, to be unparked on wake-up.
  parked_thread: Option<thread::Thread>,
  /// Pointer to the inspector state, passed to the isolate interrupt
  /// callback on the next wake-up in `Idle` state.
  inspector_state_ptr: Option<NonNull<JsRuntimeInspectorState>>,
  /// Handle used to request an interrupt on the isolate.
  isolate_handle: v8::IsolateHandle,
}

// `NonNull` is not `Send`, so this has to be asserted manually.
// NOTE(review): the pointer is only dereferenced inside the interrupt
// callback; presumably that always runs on the isolate's own thread —
// confirm before relying on it.
unsafe impl Send for InspectorWakerInner {}
983
/// Waker used for polling inspector sessions; all of its state sits behind
/// a mutex because `wake_by_ref` only gets shared access through an `Arc`.
struct InspectorWaker(Mutex<InspectorWakerInner>);
985
986impl InspectorWaker {
987 fn new(isolate_handle: v8::IsolateHandle) -> Arc<Self> {
988 let inner = InspectorWakerInner {
989 poll_state: PollState::Idle,
990 task_waker: None,
991 parked_thread: None,
992 inspector_state_ptr: None,
993 isolate_handle,
994 };
995 Arc::new(Self(Mutex::new(inner)))
996 }
997
998 fn update<F, R>(&self, update_fn: F) -> R
999 where
1000 F: FnOnce(&mut InspectorWakerInner) -> R,
1001 {
1002 let mut g = self.0.lock();
1003 update_fn(&mut g)
1004 }
1005}
1006
impl task::ArcWake for InspectorWaker {
  /// Reacts to a wake-up according to the current poll state, then marks
  /// the state as `Woken`.
  fn wake_by_ref(arc_self: &Arc<Self>) {
    arc_self.update(|w| {
      match w.poll_state {
        PollState::Idle => {
          // Notify the task that last polled with a context, if any.
          if let Some(waker) = w.task_waker.take() {
            waker.wake()
          }
          // Ask the isolate to interrupt running JavaScript and poll the
          // sessions from the interrupt callback. The pointer is taken, so
          // at most one interrupt is requested per stored pointer.
          if let Some(arg) = w
            .inspector_state_ptr
            .take()
            .map(|ptr| ptr.as_ptr() as *mut c_void)
          {
            w.isolate_handle.request_interrupt(handle_interrupt, arg);
          }
          unsafe extern "C" fn handle_interrupt(
            _isolate: v8::UnsafeRawIsolatePtr,
            arg: *mut c_void,
          ) {
            // NOTE(review): assumes the `JsRuntimeInspectorState` behind
            // `arg` is still alive when the interrupt fires — confirm.
            let inspector_state =
              unsafe { &*(arg as *mut JsRuntimeInspectorState) };
            // `Err` (re-entrant call) is deliberately ignored.
            let _ = inspector_state.poll_sessions(None);
          }
        }
        PollState::Parked => {
          // Wake the thread blocked in `poll_sessions`; it must not be the
          // current thread, which would mean waking ourselves.
          let parked_thread = w.parked_thread.take().unwrap();
          assert_ne!(parked_thread.id(), thread::current().id());
          parked_thread.unpark();
        }
        _ => {}
      };
      // NOTE(review): this runs for every prior state, so it also
      // overwrites `Dropped`.
      w.poll_state = PollState::Woken;
    });
  }
}
1048
/// Blocking behaviour requested for an inspector session; reported in
/// aggregate through `SessionsState`.
#[derive(Clone, Copy, Debug)]
pub enum InspectorSessionKind {
  /// Counted in `SessionsState::has_blocking`.
  Blocking,
  /// Counted in `SessionsState::has_nonblocking`; with
  /// `wait_for_disconnect: true` also in
  /// `has_nonblocking_wait_for_disconnect`.
  NonBlocking { wait_for_disconnect: bool },
}
1054
/// Per-session state; a clone of it is handed to V8 as the session's
/// channel implementation.
#[derive(Clone)]
struct InspectorSessionState {
  /// Shared flag that is `true` while a message is being dispatched to V8.
  is_dispatching_message: Rc<RefCell<bool>>,
  /// Callback for outgoing messages.
  send: Rc<InspectorSessionSend>,
  /// Incoming message receiver; taken by the message pump when it starts.
  /// `None` for sessions created through `create_local_session`.
  rx: Rc<RefCell<Option<SessionProxyReceiver>>>,
  /// Blocking behaviour of this session.
  kind: InspectorSessionKind,
  /// The container this session belongs to.
  sessions: Rc<RefCell<SessionContainer>>,
  /// Queue of `(session_id, message)` pairs destined for workers.
  pending_worker_messages: Arc<Mutex<Vec<(String, String)>>>,
  /// Shared "`NodeWorker.enable` was received" flag.
  nodeworker_enabled: Rc<Cell<bool>>,
  /// Shared `Target.setAutoAttach` flag.
  auto_attach_enabled: Rc<Cell<bool>>,
  /// Shared `Target.setDiscoverTargets` flag.
  discover_targets_enabled: Rc<Cell<bool>>,
}
1073
/// One connection to the V8 inspector.
struct InspectorSession {
  /// The underlying V8 session that protocol messages are dispatched to.
  v8_session: v8::inspector::V8InspectorSession,
  /// Channels, flags and callbacks belonging to this session.
  state: InspectorSessionState,
}
1080
1081impl InspectorSession {
1082 const CONTEXT_GROUP_ID: i32 = 1;
1083
1084 #[allow(clippy::too_many_arguments)]
1085 pub fn new(
1086 v8_inspector: Rc<v8::inspector::V8Inspector>,
1087 is_dispatching_message: Rc<RefCell<bool>>,
1088 send: InspectorSessionSend,
1089 rx: Option<SessionProxyReceiver>,
1090 kind: InspectorSessionKind,
1091 sessions: Rc<RefCell<SessionContainer>>,
1092 pending_worker_messages: Arc<Mutex<Vec<(String, String)>>>,
1093 nodeworker_enabled: Rc<Cell<bool>>,
1094 auto_attach_enabled: Rc<Cell<bool>>,
1095 discover_targets_enabled: Rc<Cell<bool>>,
1096 ) -> Rc<Self> {
1097 let state = InspectorSessionState {
1098 is_dispatching_message,
1099 send: Rc::new(send),
1100 rx: Rc::new(RefCell::new(rx)),
1101 kind,
1102 sessions,
1103 pending_worker_messages,
1104 nodeworker_enabled,
1105 auto_attach_enabled,
1106 discover_targets_enabled,
1107 };
1108
1109 let v8_session = v8_inspector.connect(
1110 Self::CONTEXT_GROUP_ID,
1111 v8::inspector::Channel::new(Box::new(state.clone())),
1112 v8::inspector::StringView::empty(),
1113 v8::inspector::V8InspectorClientTrustLevel::FullyTrusted,
1114 );
1115
1116 Rc::new(Self { v8_session, state })
1117 }
1118
1119 fn dispatch_message(&self, msg: String) {
1121 *self.state.is_dispatching_message.borrow_mut() = true;
1122 let msg = v8::inspector::StringView::from(msg.as_bytes());
1123 self.v8_session.dispatch_protocol_message(msg);
1124 *self.state.is_dispatching_message.borrow_mut() = false;
1125 }
1126
1127 pub fn break_on_next_statement(&self) {
1128 let reason = v8::inspector::StringView::from(&b"debugCommand"[..]);
1129 let detail = v8::inspector::StringView::empty();
1130 self
1131 .v8_session
1132 .schedule_pause_on_next_statement(reason, detail);
1133 }
1134
1135 fn queue_worker_message(&self, session_id: &str, message: String) {
1137 self
1138 .state
1139 .pending_worker_messages
1140 .lock()
1141 .push((session_id.to_string(), message));
1142 }
1143
1144 fn notify_workers<F>(&self, mut f: F)
1146 where
1147 F: FnMut(&TargetSession, &dyn Fn(InspectorMsg)) + 'static,
1148 {
1149 let sessions = self.state.sessions.clone();
1150 let send = self.state.send.clone();
1151 deno_core::unsync::spawn(async move {
1152 let sessions = sessions.borrow();
1153 for ts in sessions.target_sessions.values() {
1154 f(ts, &|msg| send(msg));
1155 }
1156 });
1157 }
1158}
1159
1160impl InspectorSessionState {
1161 fn send_message(
1162 &self,
1163 msg_kind: InspectorMsgKind,
1164 msg: v8::UniquePtr<v8::inspector::StringBuffer>,
1165 ) {
1166 let msg = msg.unwrap().string().to_string();
1167 (self.send)(InspectorMsg {
1168 kind: msg_kind,
1169 content: msg,
1170 });
1171 }
1172}
1173
1174impl v8::inspector::ChannelImpl for InspectorSessionState {
1175 fn send_response(
1176 &self,
1177 call_id: i32,
1178 message: v8::UniquePtr<v8::inspector::StringBuffer>,
1179 ) {
1180 self.send_message(InspectorMsgKind::Message(call_id), message);
1181 }
1182
1183 fn send_notification(
1184 &self,
1185 message: v8::UniquePtr<v8::inspector::StringBuffer>,
1186 ) {
1187 self.send_message(InspectorMsgKind::Notification, message);
1188 }
1189
1190 fn flush_protocol_notifications(&self) {}
1191}
/// Boxed, non-`Send` future that pumps one session's incoming messages;
/// stored in `SessionContainer::established`.
type InspectorSessionPumpMessages = Pin<Box<dyn Future<Output = ()>>>;
1193fn get_str_param(params: &Option<serde_json::Value>, key: &str) -> String {
1195 params
1196 .as_ref()
1197 .and_then(|p| p.get(key))
1198 .and_then(|v| v.as_str())
1199 .unwrap_or_default()
1200 .to_owned()
1201}
1202
1203fn get_bool_param(params: &Option<serde_json::Value>, key: &str) -> bool {
1205 params
1206 .as_ref()
1207 .and_then(|p| p.get(key))
1208 .and_then(|v| v.as_bool())
1209 .unwrap_or(false)
1210}
1211
1212impl TargetSession {
1213 fn target_info(&self, attached: bool) -> serde_json::Value {
1215 json!({
1216 "targetId": self.target_id,
1217 "type": "node_worker",
1218 "title": self.title(),
1219 "url": self.url,
1220 "attached": attached,
1221 "canAccessOpener": true
1222 })
1223 }
1224
1225 fn worker_info(&self) -> serde_json::Value {
1227 json!({
1228 "workerId": self.target_id,
1229 "type": "node_worker",
1230 "title": self.title(),
1231 "url": self.url
1232 })
1233 }
1234}
1235
1236async fn pump_inspector_session_messages(session: Rc<InspectorSession>) {
1237 let mut rx = session.state.rx.borrow_mut().take().unwrap();
1238
1239 while let Some(msg) = rx.next().await {
1240 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&msg) else {
1241 session.dispatch_message(msg);
1242 continue;
1243 };
1244
1245 if let Some(session_id) = parsed.get("sessionId").and_then(|s| s.as_str()) {
1247 let mut worker_msg = parsed.clone();
1248 if let Some(obj) = worker_msg.as_object_mut() {
1249 obj.remove("sessionId");
1250 session.queue_worker_message(session_id, worker_msg.to_string());
1251 }
1252 continue;
1253 }
1254
1255 let Some(method) = parsed.get("method").and_then(|m| m.as_str()) else {
1256 session.dispatch_message(msg);
1257 continue;
1258 };
1259
1260 let params = parsed.get("params").cloned();
1261 let msg_id = parsed.get("id").cloned();
1262
1263 match method {
1264 "NodeWorker.enable" => {
1265 session.state.nodeworker_enabled.set(true);
1266 session.notify_workers(|ts, send| {
1267 send(InspectorMsg::notification(json!({
1268 "method": "NodeWorker.attachedToWorker",
1269 "params": {
1270 "sessionId": ts.session_id,
1271 "workerInfo": ts.worker_info(),
1272 "waitingForDebugger": false
1273 }
1274 })));
1275 });
1276 }
1277 "NodeWorker.sendMessageToWorker" | "Target.sendMessageToTarget" => {
1278 session.queue_worker_message(
1279 &get_str_param(¶ms, "sessionId"),
1280 get_str_param(¶ms, "message"),
1281 );
1282 }
1283 "Target.setDiscoverTargets" => {
1284 let discover = get_bool_param(¶ms, "discover");
1285 session.state.discover_targets_enabled.set(discover);
1286
1287 if discover {
1288 session.notify_workers(|ts, send| {
1289 send(InspectorMsg::notification(json!({
1290 "method": "Target.targetCreated",
1291 "params": { "targetInfo": ts.target_info(false) }
1292 })));
1293 });
1294 }
1295 }
1296 "Target.setAutoAttach" => {
1297 let auto_attach = get_bool_param(¶ms, "autoAttach");
1298 let send = session.state.send.clone();
1299 let sessions = session.state.sessions.clone();
1300 session.state.auto_attach_enabled.set(auto_attach);
1301 if auto_attach {
1302 deno_core::unsync::spawn(async move {
1303 let sessions = sessions.borrow();
1304 for ts in sessions.target_sessions.values() {
1305 if ts.attached.replace(true) {
1306 continue; }
1308 send(InspectorMsg::notification(json!({
1309 "method": "Target.attachedToTarget",
1310 "params": {
1311 "sessionId": ts.session_id,
1312 "targetInfo": ts.target_info(true),
1313 "waitingForDebugger": false
1314 }
1315 })));
1316 }
1317 });
1318 }
1319 }
1320 _ => {
1321 session.dispatch_message(msg);
1322 continue;
1323 }
1324 }
1325
1326 if let Some(id) = msg_id {
1328 let call_id = id.as_i64().unwrap_or(0) as i32;
1329 (session.state.send)(InspectorMsg {
1330 kind: InspectorMsgKind::Message(call_id),
1331 content: json!({
1332 "id": id,
1333 "result": {}
1334 })
1335 .to_string(),
1336 });
1337 }
1338 }
1339}
1340
/// Handle to an in-process inspector session created by
/// `JsRuntimeInspector::create_local_session`.
pub struct LocalInspectorSession {
  /// Container holding the session.
  sessions: Rc<RefCell<SessionContainer>>,
  /// Local id of the session inside `sessions`.
  session_id: i32,
}
1349
1350impl LocalInspectorSession {
1351 pub fn new(session_id: i32, sessions: Rc<RefCell<SessionContainer>>) -> Self {
1352 Self {
1353 sessions,
1354 session_id,
1355 }
1356 }
1357
1358 pub fn dispatch(&mut self, msg: String) {
1359 self
1360 .sessions
1361 .borrow_mut()
1362 .dispatch_message_from_frontend(self.session_id, msg);
1363 }
1364
1365 pub fn post_message<T: serde::Serialize>(
1366 &mut self,
1367 id: i32,
1368 method: &str,
1369 params: Option<T>,
1370 ) {
1371 let message = json!({
1372 "id": id,
1373 "method": method,
1374 "params": params,
1375 });
1376
1377 let stringified_msg = serde_json::to_string(&message).unwrap();
1378 self.dispatch(stringified_msg);
1379 }
1380}