use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use deno_core::anyhow::Context;
use deno_core::anyhow::anyhow;
use deno_core::anyhow::bail;
use deno_core::error::AnyError;
use deno_core::serde_json;
use deno_core::serde_json::Value;
use deno_core::serde_json::json;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
use fastwebsockets::WebSocket;
use fastwebsockets::WebSocketError;
use fastwebsockets::handshake;
use http_body_util::BodyExt;
use http_body_util::Empty;
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use uuid::Uuid;
/// Settings for the CDP multiplexer started by `spawn_mux`.
#[derive(Clone, Debug)]
pub struct MuxConfig {
/// Address the multiplexer binds its public HTTP/WebSocket listener to.
pub listen: SocketAddr,
/// Upstream inspector address used for the `Deno` target.
pub deno_internal: SocketAddr,
/// Upstream inspector address used for the `Cef` target and for proxied
/// `/devtools/` assets.
pub cef_internal: SocketAddr,
/// When set, `Debugger.enable` + `Debugger.pause` are injected into the CEF
/// upstream before frames are proxied.
pub inspect_brk: bool,
/// Not read anywhere in the multiplexer itself; see the `reason` on the
/// attribute below.
#[allow(
dead_code,
reason = "read by the child process via env vars, not by the mux itself"
)]
pub wait_for_debugger: bool,
}
/// Handle returned by `spawn_mux` for a running multiplexer.
pub struct MuxHandle {
/// Address the listener is actually bound to (taken from `local_addr()`).
pub listen: SocketAddr,
// Never sent on in this file. The accept loop polls the matching receiver, so
// dropping this handle (and with it the sender) is what ends that loop.
_shutdown_tx: oneshot::Sender<()>,
}
pub use deno_lib::util::net::allocate_random_port;
pub async fn spawn_mux(config: MuxConfig) -> Result<MuxHandle, AnyError> {
let listener = TcpListener::bind(config.listen).await.with_context(|| {
format!("failed to bind CDP multiplexer to {}", config.listen)
})?;
let listen = listener.local_addr()?;
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let state = Arc::new(MuxState::new(config.clone(), listen));
tokio::spawn(async move {
let mut shutdown_rx = shutdown_rx;
let mut consecutive_accept_errors: u32 = 0;
loop {
tokio::select! {
_ = &mut shutdown_rx => {
log::debug!("[devtools-mux] shutdown requested");
break;
}
accept = listener.accept() => {
match accept {
Ok((stream, _)) => {
consecutive_accept_errors = 0;
let state = state.clone();
tokio::spawn(async move {
if let Err(err) = serve_connection(stream, state).await {
log::debug!("[devtools-mux] connection error: {err:?}");
}
});
}
Err(err) => {
consecutive_accept_errors =
consecutive_accept_errors.saturating_add(1);
if consecutive_accept_errors == 1
|| consecutive_accept_errors.is_power_of_two()
{
log::error!(
"[devtools-mux] accept failed (attempt {}): {err:?}",
consecutive_accept_errors,
);
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
}
}
}
});
Ok(MuxHandle {
listen,
_shutdown_tx: shutdown_tx,
})
}
/// The debugging targets exposed by the multiplexer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TargetKind {
  Unified,
  Deno,
  Cef,
}

impl TargetKind {
  /// URL path under which this target's WebSocket endpoint is served.
  fn path(self) -> &'static str {
    match self {
      Self::Unified => "/unified",
      Self::Deno => "/deno",
      Self::Cef => "/cef",
    }
  }

  /// Human-readable title reported for this target in the target list.
  fn title(self) -> &'static str {
    match self {
      Self::Unified => "Deno Desktop (unified)",
      Self::Deno => "Deno Runtime",
      Self::Cef => "CEF Renderer",
    }
  }
}
/// Target id under which the Deno runtime is announced to unified-session
/// clients, and which `Target.attachToTarget` requests are matched against.
const DENO_CHILD_TARGET_ID: &str = "deno-runtime-isolate";
/// State shared by every connection served by the multiplexer.
struct MuxState {
/// Configuration the multiplexer was started with.
config: MuxConfig,
/// Address the public listener is bound to; used to build advertised URLs.
listen: SocketAddr,
/// Ids reported for the three entries of the target list.
unified_id: Uuid,
deno_id: Uuid,
cef_id: Uuid,
/// Session id that tags Deno traffic inside a unified session.
deno_session_id: String,
/// Set once a debugger session is marked attached; read by the
/// `/debugger-attached` endpoint.
debugger_attached: Arc<std::sync::atomic::AtomicBool>,
}
impl MuxState {
  /// Builds the shared state with freshly generated target/session ids and
  /// the attached flag cleared.
  fn new(config: MuxConfig, listen: SocketAddr) -> Self {
    use std::sync::atomic::AtomicBool;

    let unified_id = Uuid::new_v4();
    let deno_id = Uuid::new_v4();
    let cef_id = Uuid::new_v4();
    let deno_session_id = Uuid::new_v4().to_string();
    Self {
      config,
      listen,
      unified_id,
      deno_id,
      cef_id,
      deno_session_id,
      debugger_attached: Arc::new(AtomicBool::new(false)),
    }
  }

  /// Maps a request path to the target served there, if any. The match is
  /// exact.
  fn target_for_path(&self, path: &str) -> Option<TargetKind> {
    [TargetKind::Unified, TargetKind::Deno, TargetKind::Cef]
      .into_iter()
      .find(|kind| kind.path() == path)
  }
}
async fn serve_connection(
stream: TcpStream,
state: Arc<MuxState>,
) -> Result<(), AnyError> {
let io = TokioIo::new(stream);
let service = hyper::service::service_fn(move |req| {
let state = state.clone();
async move { Ok::<_, Infallible>(handle_request(req, state).await) }
});
hyper::server::conn::http1::Builder::new()
.serve_connection(io, service)
.with_upgrades()
.await
.map_err(|e| anyhow!("hyper serve error: {e}"))?;
Ok(())
}
/// Routes one HTTP request.
///
/// Only `GET` is accepted. Fixed JSON endpoints and `/debugger-attached` are
/// answered directly, target paths are upgraded to WebSocket, `/devtools/`
/// paths are proxied to the CEF upstream, and everything else is a 404.
async fn handle_request(
  req: hyper::Request<Incoming>,
  state: Arc<MuxState>,
) -> hyper::Response<Full<Bytes>> {
  if req.method() != http::Method::GET {
    return simple_response(
      http::StatusCode::METHOD_NOT_ALLOWED,
      "Not Allowed",
    );
  }

  // Owned copy: `req` is moved into the upgrade handler further down.
  let path = req.uri().path().to_owned();
  let other = path.as_str();

  match other {
    "/json/version" => return json_version(&state),
    "/json" | "/json/list" => return json_list(&state),
    "/json/protocol" => return json_protocol(),
    "/debugger-attached" => {
      let attached = state
        .debugger_attached
        .load(std::sync::atomic::Ordering::SeqCst);
      return if attached {
        simple_response(http::StatusCode::OK, "attached")
      } else {
        simple_response(http::StatusCode::SERVICE_UNAVAILABLE, "waiting")
      };
    }
    _ => {}
  }

  if let Some(kind) = state.target_for_path(other) {
    return match handle_upgrade(req, kind, Arc::clone(&state)) {
      Ok(resp) => resp,
      Err(err) => {
        log::error!("[devtools-mux] upgrade failed for {other}: {err:?}");
        simple_response(http::StatusCode::BAD_REQUEST, "upgrade failed")
      }
    };
  }

  if !other.starts_with("/devtools/") {
    return simple_response(http::StatusCode::NOT_FOUND, "Not Found");
  }

  match proxy_devtools_asset(&req, state.config.cef_internal).await {
    Ok(resp) => resp,
    Err(err) => {
      log::error!("[devtools-mux] devtools asset proxy failed: {err:?}");
      simple_response(http::StatusCode::BAD_GATEWAY, "proxy failed")
    }
  }
}
/// Fetches a `/devtools/` asset from the CEF upstream and relays it to the
/// caller.
///
/// The request's path and query are replayed as a `GET` against
/// `cef_internal`. The upstream body is buffered in full, and the response is
/// rebuilt with the upstream status and headers, minus connection-level and
/// length headers; `Content-Length` is recomputed from the buffered body.
///
/// # Errors
///
/// Fails when the upstream cannot be reached, the HTTP exchange fails, or the
/// relayed response cannot be assembled.
async fn proxy_devtools_asset(
  req: &hyper::Request<Incoming>,
  cef_internal: SocketAddr,
) -> Result<hyper::Response<Full<Bytes>>, AnyError> {
  // Headers describing the upstream connection or body framing rather than
  // the asset. `HeaderName::as_str` is always lower case, so the names can be
  // compared directly without allocating a lowercased copy per header.
  const NOT_FORWARDED: [&str; 9] = [
    "transfer-encoding",
    "content-length",
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailer",
    "upgrade",
  ];

  let path_and_query = req
    .uri()
    .path_and_query()
    .map(|p| p.as_str())
    .unwrap_or("/");
  let stream = TcpStream::connect(cef_internal).await?;
  let io = TokioIo::new(stream);
  let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
    .await
    .map_err(|e| anyhow!("devtools asset handshake failed: {e}"))?;
  // The connection future must be polled for the request below to progress.
  tokio::spawn(async move {
    if let Err(err) = conn.await {
      log::trace!("[devtools-mux] devtools asset conn closed: {err:?}");
    }
  });
  let upstream_req = hyper::Request::builder()
    .method(http::Method::GET)
    .uri(path_and_query)
    .header(http::header::HOST, cef_internal.to_string())
    .body(Empty::<Bytes>::new())?;
  let resp = sender.send_request(upstream_req).await?;
  let (parts, body) = resp.into_parts();
  let bytes = body.collect().await?.to_bytes();

  let mut builder = hyper::Response::builder().status(parts.status);
  for (name, value) in &parts.headers {
    if !NOT_FORWARDED.contains(&name.as_str()) {
      builder = builder.header(name, value);
    }
  }
  // Surface a build failure as an error (-> 502 in the caller) instead of
  // panicking the connection task.
  builder
    .header(http::header::CONTENT_LENGTH, bytes.len())
    .body(Full::new(bytes))
    .context("failed to build proxied devtools asset response")
}
fn json_version(state: &MuxState) -> hyper::Response<Full<Bytes>> {
let body = json!({
"Browser": format!("deno-desktop/{}", env!("CARGO_PKG_VERSION")),
"Protocol-Version": "1.3",
"V8-Version": deno_core::v8::VERSION_STRING,
"webSocketDebuggerUrl": format!(
"ws://{}{}",
state.listen,
TargetKind::Cef.path(),
),
});
json_response(body)
}
fn json_list(state: &MuxState) -> hyper::Response<Full<Bytes>> {
let listen = state.listen.to_string();
let unified_url = format!("ws://{listen}{}", TargetKind::Unified.path());
let deno_url = format!("ws://{listen}{}", TargetKind::Deno.path());
let cef_url = format!("ws://{listen}{}", TargetKind::Cef.path());
let unified_entry = json!({
"id": state.unified_id.to_string(),
"type": "page",
"title": TargetKind::Unified.title(),
"description": "Unified DevTools (CEF page + Deno runtime)",
"url": "deno-desktop://unified",
"faviconUrl": "https://deno.land/favicon.ico",
"devtoolsFrontendUrl": format!(
"devtools://devtools/bundled/inspector.html?ws={}",
strip_scheme(&unified_url),
),
"webSocketDebuggerUrl": unified_url,
});
let deno_entry = json!({
"id": state.deno_id.to_string(),
"type": "node",
"title": TargetKind::Deno.title(),
"description": "Deno runtime V8 isolate (direct)",
"url": format!("deno://{}", state.config.deno_internal),
"faviconUrl": "https://deno.land/favicon.ico",
"devtoolsFrontendUrl": format!(
"devtools://devtools/bundled/js_app.html?ws={}&experiments=true&v8only=true",
strip_scheme(&deno_url),
),
"webSocketDebuggerUrl": deno_url,
});
let cef_entry = json!({
"id": state.cef_id.to_string(),
"type": "page",
"title": TargetKind::Cef.title(),
"description": "CEF renderer V8 isolate (direct)",
"url": format!("cef://{}", state.config.cef_internal),
"faviconUrl": "https://deno.land/favicon.ico",
"devtoolsFrontendUrl": format!(
"devtools://devtools/bundled/inspector.html?ws={}",
strip_scheme(&cef_url),
),
"webSocketDebuggerUrl": cef_url,
});
json_response(Value::Array(vec![unified_entry, deno_entry, cef_entry]))
}
/// Builds the `/json/protocol` response: protocol version 1.3 with an empty
/// domain list.
fn json_protocol() -> hyper::Response<Full<Bytes>> {
  let version = json!({ "major": "1", "minor": "3" });
  let descriptor = json!({
    "version": version,
    "domains": [],
  });
  json_response(descriptor)
}
/// Wraps `value` in a `200 OK` response with an `application/json` content
/// type.
fn json_response(value: Value) -> hyper::Response<Full<Bytes>> {
  let encoded = serde_json::to_vec(&value).unwrap();
  let builder = hyper::Response::builder()
    .status(http::StatusCode::OK)
    .header(http::header::CONTENT_TYPE, "application/json");
  builder.body(Full::new(Bytes::from(encoded))).unwrap()
}
/// Builds a response with the given status and a static text body; no
/// headers are set explicitly.
fn simple_response(
  status: http::StatusCode,
  msg: &'static str,
) -> hyper::Response<Full<Bytes>> {
  let body = Full::new(Bytes::from(msg));
  let builder = hyper::Response::builder().status(status);
  builder.body(body).unwrap()
}
/// Returns `ws_url` without a leading `ws://` or `wss://`; any other input is
/// returned unchanged.
fn strip_scheme(ws_url: &str) -> String {
  for scheme in ["ws://", "wss://"] {
    if let Some(rest) = ws_url.strip_prefix(scheme) {
      return rest.to_string();
    }
  }
  ws_url.to_string()
}
fn handle_upgrade(
mut req: hyper::Request<Incoming>,
kind: TargetKind,
state: Arc<MuxState>,
) -> Result<hyper::Response<Full<Bytes>>, AnyError> {
let (resp, upgrade_fut) = fastwebsockets::upgrade::upgrade(&mut req)
.map_err(|e| anyhow!("not a valid websocket upgrade: {e}"))?;
tokio::spawn(async move {
let client = match upgrade_fut.await {
Ok(ws) => ws,
Err(err) => {
log::error!("[devtools-mux] client upgrade failed: {err:?}");
return;
}
};
match kind {
TargetKind::Unified => {
if let Err(err) = run_unified_session(client, state).await {
log::debug!("[devtools-mux] unified session ended: {err:?}");
}
}
TargetKind::Deno => {
mark_debugger_attached(&state);
match connect_upstream(&state, kind).await {
Ok(upstream) => {
if let Err(err) = proxy_frames(client, upstream).await {
log::debug!("[devtools-mux] proxy ended: {err:?}");
}
}
Err(err) => {
log::error!(
"[devtools-mux] failed to connect upstream for Deno: {err:?}"
);
}
}
}
TargetKind::Cef => match connect_upstream(&state, kind).await {
Ok(mut upstream) => {
if state.config.inspect_brk
&& let Err(err) = inject_cef_pause(&mut upstream).await
{
log::error!(
"[devtools-mux] failed to inject pause into CEF: {err:?}"
);
}
mark_debugger_attached(&state);
if let Err(err) = proxy_frames(client, upstream).await {
log::debug!("[devtools-mux] proxy ended: {err:?}");
}
}
Err(err) => {
log::error!(
"[devtools-mux] failed to connect upstream for CEF: {err:?}"
);
}
},
}
});
let (parts, _) = resp.into_parts();
Ok(hyper::Response::from_parts(parts, Full::new(Bytes::new())))
}
/// Opens a WebSocket to the upstream inspector behind `kind`, retrying until
/// a 30 second deadline.
///
/// Each attempt looks up the upstream's WebSocket URL and connects to it;
/// failed attempts are retried after 250 ms. Every attempt is itself bounded
/// by the overall deadline, so an upstream that accepts the connection but
/// never answers cannot stall the caller past it.
///
/// # Errors
///
/// Fails immediately for `TargetKind::Unified` (it has no single upstream).
/// Otherwise returns the last attempt's error once the deadline passes, or a
/// generic timeout error if no attempt produced one.
async fn connect_upstream(
  state: &MuxState,
  kind: TargetKind,
) -> Result<WebSocket<TokioIo<hyper::upgrade::Upgraded>>, AnyError> {
  const CONNECT_WINDOW: Duration = Duration::from_secs(30);
  const RETRY_DELAY: Duration = Duration::from_millis(250);

  let upstream_host = match kind {
    TargetKind::Deno => state.config.deno_internal,
    TargetKind::Cef => state.config.cef_internal,
    TargetKind::Unified => {
      bail!("connect_upstream cannot be called with the Unified target")
    }
  };

  let mut last_err: Option<AnyError> = None;
  let deadline = tokio::time::Instant::now() + CONNECT_WINDOW;
  while tokio::time::Instant::now() < deadline {
    let attempt = async {
      let ws_url = fetch_upstream_ws_url(upstream_host).await?;
      connect_ws(&ws_url).await
    };
    match tokio::time::timeout_at(deadline, attempt).await {
      Ok(Ok(ws)) => return Ok(ws),
      Ok(Err(err)) => last_err = Some(err),
      // The attempt was cut off by the deadline; report what we have.
      Err(_elapsed) => break,
    }
    tokio::time::sleep(RETRY_DELAY).await;
  }
  Err(last_err.unwrap_or_else(|| anyhow!("upstream connect timed out")))
}
/// Asks the inspector at `host` for its target list and returns the WebSocket
/// URL of the first usable target, rewritten to point at `host`.
///
/// Entries whose `url` contains `/devtools/` or `devtools://` are skipped, as
/// are entries without a string `webSocketDebuggerUrl`.
///
/// # Errors
///
/// Fails when the upstream cannot be reached, answers with a non-success
/// status, returns a body that is not JSON, or lists no usable target.
async fn fetch_upstream_ws_url(host: SocketAddr) -> Result<String, AnyError> {
  let stream = TcpStream::connect(host).await?;
  let io = TokioIo::new(stream);
  let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
    .await
    .map_err(|e| anyhow!("http handshake to {host} failed: {e}"))?;
  // The connection future must be polled for the request below to progress.
  tokio::spawn(async move {
    if let Err(err) = conn.await {
      log::trace!("[devtools-mux] upstream conn closed: {err:?}");
    }
  });
  let req = hyper::Request::builder()
    .method(http::Method::GET)
    .uri("/json/list")
    .header(http::header::HOST, host.to_string())
    .body(Empty::<Bytes>::new())?;
  let resp = sender.send_request(req).await?;
  if !resp.status().is_success() {
    bail!("upstream /json/list at {host} returned {}", resp.status());
  }
  let body = resp.collect().await?.to_bytes();
  let value: Value = serde_json::from_slice(&body)
    .with_context(|| format!("upstream /json/list at {host} not JSON"))?;

  // Resolve the URL to a string inside the search, so that an entry with a
  // malformed (non-string) `webSocketDebuggerUrl` is passed over instead of
  // ending the lookup with an error while a later entry is valid.
  let ws_url = value
    .as_array()
    .into_iter()
    .flatten()
    .filter(|entry| {
      let url = entry.get("url").and_then(|u| u.as_str()).unwrap_or("");
      !(url.contains("/devtools/") || url.contains("devtools://"))
    })
    .find_map(|entry| {
      entry.get("webSocketDebuggerUrl").and_then(|u| u.as_str())
    })
    .ok_or_else(|| {
      anyhow!("no webSocketDebuggerUrl in /json/list at {host}")
    })?;

  Ok(rewrite_ws_host(ws_url, host))
}
/// Rebuilds `ws_url` as a `ws://` URL whose authority is `host`, keeping
/// everything from the first `/` after the scheme onward (or `/` when there
/// is none).
fn rewrite_ws_host(ws_url: &str, host: SocketAddr) -> String {
  let after_scheme = ["ws://", "wss://"]
    .into_iter()
    .find_map(|scheme| ws_url.strip_prefix(scheme))
    .unwrap_or(ws_url);
  let path = match after_scheme.find('/') {
    Some(start) => &after_scheme[start..],
    None => "/",
  };
  format!("ws://{host}{path}")
}
async fn connect_ws(
ws_url: &str,
) -> Result<WebSocket<TokioIo<hyper::upgrade::Upgraded>>, AnyError> {
let url: http::Uri = ws_url.parse()?;
let host = url
.host()
.ok_or_else(|| anyhow!("ws url missing host: {ws_url}"))?;
let port = url
.port_u16()
.ok_or_else(|| anyhow!("ws url missing port: {ws_url}"))?;
let authority = format!("{host}:{port}");
let stream = TcpStream::connect(&authority).await?;
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.path_and_query().map(|p| p.as_str()).unwrap_or("/"))
.header(http::header::HOST, &authority)
.header(http::header::UPGRADE, "websocket")
.header(http::header::CONNECTION, "upgrade")
.header("Sec-WebSocket-Key", handshake::generate_key())
.header("Sec-WebSocket-Version", "13")
.body(Empty::<Bytes>::new())?;
let (ws, _) = handshake::client(&TokioExec, req, stream).await?;
Ok(ws)
}
/// Executor handed to the WebSocket client handshake; runs each future it is
/// given on the Tokio runtime.
struct TokioExec;

impl<F: std::future::Future + Send + 'static> hyper::rt::Executor<F>
  for TokioExec
where
  F::Output: Send + 'static,
{
  fn execute(&self, fut: F) {
    // The join handle is intentionally dropped: the task runs detached.
    tokio::spawn(fut);
  }
}
async fn proxy_frames(
mut client: WebSocket<TokioIo<hyper::upgrade::Upgraded>>,
mut upstream: WebSocket<TokioIo<hyper::upgrade::Upgraded>>,
) -> Result<(), AnyError> {
client.set_auto_close(false);
client.set_auto_pong(false);
upstream.set_auto_close(false);
upstream.set_auto_pong(false);
let (mut client_rx, mut client_tx) = client.split(tokio::io::split);
let (mut up_rx, mut up_tx) = upstream.split(tokio::io::split);
let client_to_up = async {
let mut noop = |_: Frame<'_>| async { Ok::<(), WebSocketError>(()) };
loop {
let frame = match client_rx.read_frame(&mut noop).await {
Ok(f) => f,
Err(err) => {
log::debug!("[devtools-mux] client read: {err:?}");
return;
}
};
let is_close = frame.opcode == OpCode::Close;
if let Err(err) = up_tx.write_frame(frame).await {
log::debug!("[devtools-mux] upstream write: {err:?}");
return;
}
if is_close {
return;
}
}
};
let up_to_client = async {
let mut noop = |_: Frame<'_>| async { Ok::<(), WebSocketError>(()) };
loop {
let frame = match up_rx.read_frame(&mut noop).await {
Ok(f) => f,
Err(err) => {
log::debug!("[devtools-mux] upstream read: {err:?}");
return;
}
};
let is_close = frame.opcode == OpCode::Close;
if let Err(err) = client_tx.write_frame(frame).await {
log::debug!("[devtools-mux] client write: {err:?}");
return;
}
if is_close {
return;
}
}
};
tokio::join!(client_to_up, up_to_client);
Ok(())
}
/// A WebSocket frame that owns its payload, so it can be sent through a
/// channel between tasks.
struct OwnedFrame {
  opcode: OpCode,
  payload: Vec<u8>,
}

impl OwnedFrame {
  /// Wraps `payload` as a text frame.
  fn text(payload: Vec<u8>) -> Self {
    let opcode = OpCode::Text;
    Self { opcode, payload }
  }

  /// Converts into a final (`fin = true`), unmasked frame ready to write.
  fn into_frame(self) -> Frame<'static> {
    let Self { opcode, payload } = self;
    let payload = fastwebsockets::Payload::Owned(payload);
    Frame::new(true, opcode, None, payload)
  }
}
/// Raises the shared "debugger attached" flag reported by the
/// `/debugger-attached` endpoint.
fn mark_debugger_attached(state: &MuxState) {
  use std::sync::atomic::Ordering;

  let flag = &state.debugger_attached;
  flag.store(true, Ordering::SeqCst);
}
/// Sends `Debugger.enable` followed by `Debugger.pause` to the CEF upstream
/// and waits until a message carrying each request id has been read back.
///
/// Every other frame read while waiting (non-text frames, unparsable text,
/// messages with other ids or none) is discarded.
///
/// # Errors
///
/// Fails when writing a command or reading a frame fails.
async fn inject_cef_pause<S>(cef: &mut WebSocket<S>) -> Result<(), AnyError>
where
  S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
  // Negative ids, as in the requests below, to tell the replies apart.
  const ENABLE_ID: i64 = -1;
  const PAUSE_ID: i64 = -2;

  let commands = [
    (ENABLE_ID, "Debugger.enable"),
    (PAUSE_ID, "Debugger.pause"),
  ];
  for (id, method) in commands {
    let command = json!({"id": id, "method": method});
    let payload =
      fastwebsockets::Payload::Owned(serde_json::to_vec(&command).unwrap());
    cef
      .write_frame(Frame::new(true, OpCode::Text, None, payload))
      .await?;
  }

  let mut enable_acked = false;
  let mut pause_acked = false;
  while !enable_acked || !pause_acked {
    let frame = cef.read_frame().await?;
    if frame.opcode != OpCode::Text {
      continue;
    }
    let Ok(message) = serde_json::from_slice::<Value>(&frame.payload) else {
      continue;
    };
    match message.get("id").and_then(|v| v.as_i64()) {
      Some(ENABLE_ID) => enable_acked = true,
      Some(PAUSE_ID) => pause_acked = true,
      _ => {}
    }
  }

  log::debug!(
    "[devtools-mux] injected Debugger.enable + Debugger.pause into CEF"
  );
  Ok(())
}
/// Runs one "unified" DevTools session: a single client WebSocket multiplexed
/// over both the CEF and the Deno upstream inspectors.
///
/// Client text frames are routed by `route_client_text`; upstream text frames
/// are rewritten by `rewrite_text_from_upstream` before reaching the client.
/// The session ends as soon as any of the three reader tasks finishes, at
/// which point all six tasks are aborted.
///
/// Returns an error when connecting to either upstream fails or when the
/// optional pause injection into CEF fails; otherwise returns `Ok(())`.
async fn run_unified_session(
client: WebSocket<TokioIo<hyper::upgrade::Upgraded>>,
state: Arc<MuxState>,
) -> Result<(), AnyError> {
let mut client = client;
// Control frames are forwarded by hand below, so automatic handling is off.
client.set_auto_close(false);
client.set_auto_pong(false);
// Connect to both upstreams concurrently; either failure aborts the session.
let (cef, deno) = tokio::try_join!(
connect_upstream(&state, TargetKind::Cef),
connect_upstream(&state, TargetKind::Deno),
)?;
let mut cef = cef;
let mut deno = deno;
cef.set_auto_close(false);
cef.set_auto_pong(false);
deno.set_auto_close(false);
deno.set_auto_pong(false);
let session_id = state.deno_session_id.clone();
// Unlike the direct CEF target, a failed injection here ends the session.
if state.config.inspect_brk {
inject_cef_pause(&mut cef).await?;
}
mark_debugger_attached(&state);
let (mut client_rx, mut client_tx) = client.split(tokio::io::split);
let (mut cef_rx, mut cef_tx) = cef.split(tokio::io::split);
let (mut deno_rx, mut deno_tx) = deno.split(tokio::io::split);
// One outbound queue per socket; each is drained by its own writer task.
let (client_send, mut client_recv) = mpsc::unbounded_channel::<OwnedFrame>();
let (cef_send, mut cef_recv) = mpsc::unbounded_channel::<OwnedFrame>();
let (deno_send, mut deno_recv) = mpsc::unbounded_channel::<OwnedFrame>();
// Whether the synthetic Deno target has been announced to the client yet.
let deno_announced = Arc::new(std::sync::atomic::AtomicBool::new(false));
// Writer tasks: each stops on a write error or after writing a Close frame.
let client_writer = tokio::spawn(async move {
while let Some(owned) = client_recv.recv().await {
let close = owned.opcode == OpCode::Close;
if let Err(err) = client_tx.write_frame(owned.into_frame()).await {
log::debug!("[devtools-mux] unified client write: {err:?}");
return;
}
if close {
return;
}
}
});
let cef_writer = tokio::spawn(async move {
while let Some(owned) = cef_recv.recv().await {
let close = owned.opcode == OpCode::Close;
if let Err(err) = cef_tx.write_frame(owned.into_frame()).await {
log::debug!("[devtools-mux] unified cef write: {err:?}");
return;
}
if close {
return;
}
}
});
let deno_writer = tokio::spawn(async move {
while let Some(owned) = deno_recv.recv().await {
let close = owned.opcode == OpCode::Close;
if let Err(err) = deno_tx.write_frame(owned.into_frame()).await {
log::debug!("[devtools-mux] unified deno write: {err:?}");
return;
}
if close {
return;
}
}
});
// Client reader: text frames are routed, Close goes to both upstreams and
// ends the task, any other opcode is forwarded to CEF only.
let mut client_reader = {
let client_send = client_send.clone();
let cef_send = cef_send.clone();
let deno_send = deno_send.clone();
let session_id = session_id.clone();
let deno_announced = deno_announced.clone();
tokio::spawn(async move {
let mut noop = |_: Frame<'_>| async { Ok::<(), WebSocketError>(()) };
loop {
let frame = match client_rx.read_frame(&mut noop).await {
Ok(f) => f,
Err(err) => {
log::debug!("[devtools-mux] unified client read: {err:?}");
return;
}
};
let opcode = frame.opcode;
let payload = frame.payload.to_vec();
match opcode {
OpCode::Text => {
route_client_text(
&payload,
&session_id,
&client_send,
&cef_send,
&deno_send,
&deno_announced,
);
}
OpCode::Close => {
let _ = cef_send.send(OwnedFrame {
opcode,
payload: payload.clone(),
});
let _ = deno_send.send(OwnedFrame { opcode, payload });
return;
}
_ => {
let _ = cef_send.send(OwnedFrame { opcode, payload });
}
}
}
})
};
// CEF reader: text frames are rewritten without a session id and with the
// execution-context name "Renderer"; other frames pass through unchanged.
let mut cef_reader = {
let client_send = client_send.clone();
tokio::spawn(async move {
let mut noop = |_: Frame<'_>| async { Ok::<(), WebSocketError>(()) };
loop {
let frame = match cef_rx.read_frame(&mut noop).await {
Ok(f) => f,
Err(err) => {
log::debug!("[devtools-mux] unified cef read: {err:?}");
return;
}
};
let opcode = frame.opcode;
let payload = frame.payload.to_vec();
let owned = if opcode == OpCode::Text {
OwnedFrame::text(rewrite_text_from_upstream(
&payload, None, "Renderer",
))
} else {
OwnedFrame { opcode, payload }
};
if client_send.send(owned).is_err() {
return;
}
}
})
};
// Deno reader: text frames are tagged with the Deno session id and get the
// execution-context name "Deno"; other frames pass through unchanged.
let mut deno_reader = {
let client_send = client_send.clone();
let session_id = session_id.clone();
tokio::spawn(async move {
let mut noop = |_: Frame<'_>| async { Ok::<(), WebSocketError>(()) };
loop {
let frame = match deno_rx.read_frame(&mut noop).await {
Ok(f) => f,
Err(err) => {
log::debug!("[devtools-mux] unified deno read: {err:?}");
return;
}
};
let opcode = frame.opcode;
let payload = frame.payload.to_vec();
let owned = if opcode == OpCode::Text {
OwnedFrame::text(rewrite_text_from_upstream(
&payload,
Some(&session_id),
"Deno",
))
} else {
OwnedFrame { opcode, payload }
};
if client_send.send(owned).is_err() {
return;
}
}
})
};
// Release this function's own senders so that only the reader tasks keep the
// queues open.
drop(client_send);
drop(cef_send);
drop(deno_send);
// Wait for whichever reader finishes first, then tear everything down.
tokio::select! {
_ = &mut client_reader => {},
_ = &mut cef_reader => {},
_ = &mut deno_reader => {},
}
// NOTE(review): writers are aborted immediately, so frames still queued at
// this point (e.g. a just-forwarded Close) may never be written; confirm
// this is intended.
client_reader.abort();
cef_reader.abort();
deno_reader.abort();
client_writer.abort();
cef_writer.abort();
deno_writer.abort();
Ok(())
}
/// Routes one text frame received from a unified-session client.
///
/// - Unparsable payloads are forwarded to CEF untouched.
/// - Messages tagged with the Deno `session_id` go to Deno with the
///   `sessionId` field removed.
/// - The first `Target.setAutoAttach` / `Target.setDiscoverTargets` makes the
///   synthetic Deno target get announced to the client; the message itself is
///   still forwarded to CEF.
/// - `Target.attachToTarget` for the Deno target and
///   `Target.detachFromTarget` for the Deno session are answered locally and
///   not forwarded.
/// - Everything else is forwarded to CEF as the original bytes.
///
/// Send failures on any channel are ignored.
fn route_client_text(
  payload: &[u8],
  session_id: &str,
  client_send: &mpsc::UnboundedSender<OwnedFrame>,
  cef_send: &mpsc::UnboundedSender<OwnedFrame>,
  deno_send: &mpsc::UnboundedSender<OwnedFrame>,
  deno_announced: &Arc<std::sync::atomic::AtomicBool>,
) {
  use std::sync::atomic::Ordering;

  let reply_to_client = |message: &Value| {
    let _ =
      client_send.send(OwnedFrame::text(serde_json::to_vec(message).unwrap()));
  };
  let forward_to_cef = || {
    let _ = cef_send.send(OwnedFrame::text(payload.to_vec()));
  };

  let Ok(mut message) = serde_json::from_slice::<Value>(payload) else {
    forward_to_cef();
    return;
  };

  // Traffic addressed to the Deno session: strip the tag and hand it over.
  if message.get("sessionId").and_then(|v| v.as_str()) == Some(session_id) {
    if let Some(fields) = message.as_object_mut() {
      fields.remove("sessionId");
    }
    let bytes =
      serde_json::to_vec(&message).unwrap_or_else(|_| payload.to_vec());
    let _ = deno_send.send(OwnedFrame::text(bytes));
    return;
  }

  let id = message.get("id").and_then(|v| v.as_i64());
  let method = message.get("method").and_then(|v| v.as_str());
  let params = message.get("params");

  // Announce the Deno target once; `swap` only runs for the trigger methods.
  let triggers_announce = matches!(
    method,
    Some("Target.setAutoAttach" | "Target.setDiscoverTargets")
  );
  if triggers_announce && !deno_announced.swap(true, Ordering::SeqCst) {
    reply_to_client(&attached_to_target_event(session_id));
  }

  match method {
    Some("Target.attachToTarget") => {
      let target_id = params
        .and_then(|p| p.get("targetId"))
        .and_then(|v| v.as_str());
      if target_id == Some(DENO_CHILD_TARGET_ID) {
        if let Some(rid) = id {
          reply_to_client(&json!({
            "id": rid,
            "result": { "sessionId": session_id },
          }));
        }
        return;
      }
    }
    Some("Target.detachFromTarget") => {
      let detach_session = params
        .and_then(|p| p.get("sessionId"))
        .and_then(|v| v.as_str());
      if detach_session == Some(session_id) {
        if let Some(rid) = id {
          reply_to_client(&json!({ "id": rid, "result": {} }));
        }
        reply_to_client(&json!({
          "method": "Target.detachedFromTarget",
          "params": {
            "sessionId": session_id,
            "targetId": DENO_CHILD_TARGET_ID,
          },
        }));
        return;
      }
    }
    _ => {}
  }

  forward_to_cef();
}
/// Rewrites a text frame coming from an upstream before it is sent to the
/// client.
///
/// When `inject_session_id` is given, it is set as the message's `sessionId`.
/// For `Runtime.executionContextCreated` events, the context's `name` is
/// replaced with `context_name`. Payloads that are not a JSON object are
/// returned unchanged.
fn rewrite_text_from_upstream(
  payload: &[u8],
  inject_session_id: Option<&str>,
  context_name: &str,
) -> Vec<u8> {
  let Ok(mut message) = serde_json::from_slice::<Value>(payload) else {
    return payload.to_vec();
  };
  let Some(fields) = message.as_object_mut() else {
    return payload.to_vec();
  };

  if let Some(sid) = inject_session_id {
    fields.insert("sessionId".to_string(), Value::String(sid.to_string()));
  }

  let is_context_created = fields.get("method").and_then(|m| m.as_str())
    == Some("Runtime.executionContextCreated");
  if is_context_created {
    let context = fields
      .get_mut("params")
      .and_then(|p| p.get_mut("context"))
      .and_then(|c| c.as_object_mut());
    if let Some(context) = context {
      context
        .insert("name".to_string(), Value::String(context_name.to_string()));
    }
  }

  // Fall back to the original bytes if re-serialization fails.
  match serde_json::to_vec(&message) {
    Ok(bytes) => bytes,
    Err(_) => payload.to_vec(),
  }
}
/// Builds the `Target.attachedToTarget` event that announces the Deno runtime
/// to the client as an already-attached worker target under `session_id`.
fn attached_to_target_event(session_id: &str) -> Value {
  let target_info = json!({
    "targetId": DENO_CHILD_TARGET_ID,
    "type": "worker",
    "title": "Deno Runtime",
    "url": "deno://runtime",
    "attached": true,
    "canAccessOpener": false,
  });
  let params = json!({
    "sessionId": session_id,
    "targetInfo": target_info,
    "waitingForDebugger": false,
  });
  json!({
    "method": "Target.attachedToTarget",
    "params": params,
  })
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rewrite_ws_host_forces_host() {
let host: SocketAddr = "127.0.0.1:9230".parse().unwrap();
assert_eq!(
rewrite_ws_host("ws://0.0.0.0:9230/devtools/browser/abc", host),
"ws://127.0.0.1:9230/devtools/browser/abc"
);
assert_eq!(
rewrite_ws_host("ws://localhost/ws/deadbeef", host),
"ws://127.0.0.1:9230/ws/deadbeef"
);
}
fn test_config() -> MuxConfig {
MuxConfig {
listen: "127.0.0.1:9229".parse().unwrap(),
deno_internal: "127.0.0.1:9230".parse().unwrap(),
cef_internal: "127.0.0.1:9231".parse().unwrap(),
inspect_brk: false,
wait_for_debugger: false,
}
}
#[test]
fn target_path_round_trip() {
let state = MuxState::new(test_config(), "127.0.0.1:9229".parse().unwrap());
assert_eq!(state.target_for_path("/unified"), Some(TargetKind::Unified));
assert_eq!(state.target_for_path("/deno"), Some(TargetKind::Deno));
assert_eq!(state.target_for_path("/cef"), Some(TargetKind::Cef));
assert_eq!(state.target_for_path("/bogus"), None);
}
#[test]
fn route_client_text_strips_session_and_forwards_to_deno() {
let (client_tx, _client_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (cef_tx, _cef_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (deno_tx, mut deno_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let announced = Arc::new(std::sync::atomic::AtomicBool::new(false));
let session_id = "test-session-123";
let msg = json!({
"id": 1,
"method": "Debugger.enable",
"sessionId": session_id,
});
let payload = serde_json::to_vec(&msg).unwrap();
route_client_text(
&payload, session_id, &client_tx, &cef_tx, &deno_tx, &announced,
);
let frame = deno_rx.try_recv().expect("expected frame on deno channel");
let value: Value = serde_json::from_slice(&frame.payload).unwrap();
assert_eq!(value.get("id").unwrap(), 1);
assert_eq!(value.get("method").unwrap(), "Debugger.enable");
assert!(
value.get("sessionId").is_none(),
"sessionId should be stripped"
);
}
#[test]
fn route_client_text_forwards_non_session_to_cef() {
let (client_tx, _client_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (cef_tx, mut cef_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (deno_tx, _deno_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let announced = Arc::new(std::sync::atomic::AtomicBool::new(true));
let msg = json!({"id": 5, "method": "DOM.getDocument"});
let payload = serde_json::to_vec(&msg).unwrap();
route_client_text(
&payload,
"some-session",
&client_tx,
&cef_tx,
&deno_tx,
&announced,
);
let frame = cef_rx.try_recv().expect("expected frame on cef channel");
let value: Value = serde_json::from_slice(&frame.payload).unwrap();
assert_eq!(value.get("id").unwrap(), 5);
}
#[test]
fn route_client_text_attach_to_deno_target_replies_locally() {
let (client_tx, mut client_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (cef_tx, mut cef_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (deno_tx, _deno_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let announced = Arc::new(std::sync::atomic::AtomicBool::new(true));
let session_id = "deno-sess";
let msg = json!({
"id": 10,
"method": "Target.attachToTarget",
"params": { "targetId": DENO_CHILD_TARGET_ID },
});
let payload = serde_json::to_vec(&msg).unwrap();
route_client_text(
&payload, session_id, &client_tx, &cef_tx, &deno_tx, &announced,
);
let frame = client_rx.try_recv().expect("expected reply on client");
let value: Value = serde_json::from_slice(&frame.payload).unwrap();
assert_eq!(value["id"], 10);
assert_eq!(value["result"]["sessionId"], session_id);
assert!(cef_rx.try_recv().is_err());
}
#[test]
fn route_client_text_lazily_announces_deno() {
let (client_tx, mut client_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (cef_tx, _cef_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (deno_tx, _deno_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let announced = Arc::new(std::sync::atomic::AtomicBool::new(false));
let msg = json!({
"id": 1,
"method": "Target.setAutoAttach",
"params": { "autoAttach": true, "waitForDebuggerOnStart": false },
});
let payload = serde_json::to_vec(&msg).unwrap();
route_client_text(
&payload, "sess", &client_tx, &cef_tx, &deno_tx, &announced,
);
let frame = client_rx.try_recv().expect("expected announce event");
let value: Value = serde_json::from_slice(&frame.payload).unwrap();
assert_eq!(value["method"], "Target.attachedToTarget");
assert_eq!(value["params"]["targetInfo"]["type"], "worker");
assert!(announced.load(std::sync::atomic::Ordering::SeqCst));
route_client_text(
&payload, "sess", &client_tx, &cef_tx, &deno_tx, &announced,
);
assert!(client_rx.try_recv().is_err());
}
#[test]
fn route_client_text_set_discover_targets_also_triggers_announce() {
let (client_tx, mut client_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (cef_tx, _cef_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (deno_tx, _deno_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let announced = Arc::new(std::sync::atomic::AtomicBool::new(false));
let msg = json!({
"id": 7,
"method": "Target.setDiscoverTargets",
"params": { "discover": true },
});
let payload = serde_json::to_vec(&msg).unwrap();
route_client_text(
&payload, "sess", &client_tx, &cef_tx, &deno_tx, &announced,
);
let frame = client_rx
.try_recv()
.expect("setDiscoverTargets should also trigger announce");
let value: Value = serde_json::from_slice(&frame.payload).unwrap();
assert_eq!(value["method"], "Target.attachedToTarget");
assert!(announced.load(std::sync::atomic::Ordering::SeqCst));
}
#[test]
fn route_client_text_attach_to_cef_target_forwards_to_cef() {
let (client_tx, mut client_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (cef_tx, mut cef_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (deno_tx, _deno_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let announced = Arc::new(std::sync::atomic::AtomicBool::new(true));
let msg = json!({
"id": 11,
"method": "Target.attachToTarget",
"params": { "targetId": "some-cef-subframe-target" },
});
let payload = serde_json::to_vec(&msg).unwrap();
route_client_text(
&payload,
"deno-sess",
&client_tx,
&cef_tx,
&deno_tx,
&announced,
);
let forwarded = cef_rx.try_recv().expect("expected frame on cef");
let value: Value = serde_json::from_slice(&forwarded.payload).unwrap();
assert_eq!(value["method"], "Target.attachToTarget");
assert_eq!(value["params"]["targetId"], "some-cef-subframe-target");
assert!(client_rx.try_recv().is_err());
}
#[test]
fn rewrite_injects_session_id() {
let input = json!({"id": 1, "result": {}});
let payload = serde_json::to_vec(&input).unwrap();
let out = rewrite_text_from_upstream(&payload, Some("my-sess"), "Deno");
let value: Value = serde_json::from_slice(&out).unwrap();
assert_eq!(value["sessionId"], "my-sess");
}
/// A `Runtime.executionContextCreated` event must come out with its context
/// name replaced by the label passed to the rewriter.
#[test]
fn rewrite_renames_execution_context() {
  let upstream_bytes = serde_json::to_vec(&json!({
    "method": "Runtime.executionContextCreated",
    "params": {
      "context": {
        "id": 1,
        "origin": "",
        "name": "top",
      },
    },
  }))
  .unwrap();
  let rewritten = rewrite_text_from_upstream(&upstream_bytes, None, "Renderer");
  let parsed: Value = serde_json::from_slice(&rewritten).unwrap();
  assert_eq!(parsed["params"]["context"]["name"], "Renderer");
}
/// Without a session id the rewriter must not add a `sessionId` key at all.
#[test]
fn rewrite_no_session_leaves_field_absent() {
  let upstream_bytes =
    serde_json::to_vec(&json!({"method": "Console.messageAdded"})).unwrap();
  let rewritten = rewrite_text_from_upstream(&upstream_bytes, None, "Renderer");
  let parsed: Value = serde_json::from_slice(&rewritten).unwrap();
  assert!(parsed.get("sessionId").is_none());
}
/// Only execution-context events get renamed: a `name` nested inside an
/// unrelated event (`Target.targetInfoChanged`) must survive untouched.
#[test]
fn rewrite_leaves_non_execution_context_messages_unchanged() {
  let upstream_bytes = serde_json::to_vec(&json!({
    "method": "Target.targetInfoChanged",
    "params": { "targetInfo": { "name": "something" } },
  }))
  .unwrap();
  let rewritten = rewrite_text_from_upstream(&upstream_bytes, None, "Renderer");
  let parsed: Value = serde_json::from_slice(&rewritten).unwrap();
  assert_eq!(parsed["params"]["targetInfo"]["name"], "something");
}
/// Bytes that do not parse as JSON must be handed back exactly as received.
#[test]
fn rewrite_invalid_json_returned_verbatim() {
  let garbage: Vec<u8> = b"totally not json".to_vec();
  let rewritten = rewrite_text_from_upstream(&garbage, Some("sid"), "Renderer");
  assert_eq!(rewritten, garbage);
}
/// Valid JSON whose top level is not an object (here an array) must be
/// handed back exactly as received.
#[test]
fn rewrite_non_object_json_returned_verbatim() {
  let array_bytes = serde_json::to_vec(&json!([1, 2, 3])).unwrap();
  let rewritten =
    rewrite_text_from_upstream(&array_bytes, Some("sid"), "Renderer");
  assert_eq!(rewritten, array_bytes);
}
/// `/json/list` must advertise exactly three targets, in order: the unified
/// page, the Deno runtime, and the CEF renderer.
///
/// For every entry the test checks the type, title and websocket path, that
/// the DevTools frontend URL embeds the websocket host+path, that the id
/// parses as a UUID, and that `description` / `faviconUrl` are strings. It
/// then checks which frontend page each entry launches and that a second call
/// returns the same number of targets with the same ids.
#[tokio::test]
async fn json_list_returns_three_targets() {
  let state = MuxState::new(test_config(), "127.0.0.1:9229".parse().unwrap());
  let resp = json_list(&state);
  let bytes = resp.into_body().collect().await.unwrap().to_bytes();
  let list: Vec<Value> = serde_json::from_slice(&bytes).unwrap();
  assert_eq!(list.len(), 3);

  // (type, title, websocket path suffix) expected at each position.
  let expected = [
    ("page", "Deno Desktop (unified)", "/unified"),
    ("node", "Deno Runtime", "/deno"),
    ("page", "CEF Renderer", "/cef"),
  ];
  for (entry, (kind, title, ws_suffix)) in list.iter().zip(expected) {
    assert_eq!(entry["type"], kind);
    assert_eq!(entry["title"], title);
    assert!(
      entry["webSocketDebuggerUrl"]
        .as_str()
        .unwrap()
        .ends_with(ws_suffix)
    );
  }

  for entry in &list {
    let frontend = entry["devtoolsFrontendUrl"].as_str().unwrap();
    let ws = entry["webSocketDebuggerUrl"].as_str().unwrap();
    let ws_host_path = ws.strip_prefix("ws://").unwrap();
    assert!(
      frontend.contains(ws_host_path),
      "frontend {frontend} must embed ws host+path {ws_host_path}"
    );
    let id = entry["id"].as_str().unwrap();
    Uuid::parse_str(id).unwrap_or_else(|_| panic!("id {id} is not a UUID"));
    assert!(entry["description"].is_string());
    assert!(entry["faviconUrl"].is_string());
  }

  assert!(
    list[1]["devtoolsFrontendUrl"]
      .as_str()
      .unwrap()
      .contains("js_app.html"),
    "deno direct entry should launch js_app.html"
  );
  for page_entry in [&list[0], &list[2]] {
    assert!(
      page_entry["devtoolsFrontendUrl"]
        .as_str()
        .unwrap()
        .contains("inspector.html"),
    );
  }

  let resp2 = json_list(&state);
  let bytes2 = resp2.into_body().collect().await.unwrap().to_bytes();
  let list2: Vec<Value> = serde_json::from_slice(&bytes2).unwrap();
  // `zip` stops at the shorter iterator, so without this check a truncated
  // or empty second listing would make the id comparison pass vacuously.
  assert_eq!(
    list.len(),
    list2.len(),
    "target count changed across /json/list calls"
  );
  for (a, b) in list.iter().zip(list2.iter()) {
    assert_eq!(
      a["id"], b["id"],
      "target id changed across /json/list calls"
    );
  }
}
#[tokio::test]
async fn json_version_shape() {
let state = MuxState::new(test_config(), "127.0.0.1:9229".parse().unwrap());
let resp = json_version(&state);
assert_eq!(resp.status(), http::StatusCode::OK);
let ct = resp.headers().get(http::header::CONTENT_TYPE).unwrap();
assert_eq!(ct, "application/json");
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let value: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(value["Protocol-Version"], "1.3");
assert!(
value["Browser"]
.as_str()
.unwrap()
.starts_with("deno-desktop/")
);
assert!(value["V8-Version"].is_string());
let ws = value["webSocketDebuggerUrl"].as_str().unwrap();
assert!(ws.ends_with("/cef"), "got {ws}");
}
/// Detaching from the session id the router was given must be answered
/// locally: first a reply echoing the request id with an object `result`,
/// then a `Target.detachedFromTarget` event for that session and
/// `DENO_CHILD_TARGET_ID`. Nothing may reach CEF.
#[test]
fn route_client_text_detach_from_deno_session_synthesizes_reply_and_event() {
  let (to_client, mut client_inbox) = mpsc::unbounded_channel::<OwnedFrame>();
  let (to_cef, mut cef_inbox) = mpsc::unbounded_channel::<OwnedFrame>();
  let (to_deno, _deno_inbox) = mpsc::unbounded_channel::<OwnedFrame>();
  let already_announced = Arc::new(std::sync::atomic::AtomicBool::new(true));
  let deno_session = "deno-sess-xyz";

  let request = serde_json::to_vec(&json!({
    "id": 42,
    "method": "Target.detachFromTarget",
    "params": { "sessionId": deno_session },
  }))
  .unwrap();
  route_client_text(
    &request,
    deno_session,
    &to_client,
    &to_cef,
    &to_deno,
    &already_announced,
  );

  // Pops the next frame queued for the client and parses it as JSON.
  let mut next_client_json = |missing: &str| -> Value {
    let frame = client_inbox.try_recv().expect(missing);
    serde_json::from_slice(&frame.payload).unwrap()
  };

  let reply = next_client_json("expected detach reply");
  assert_eq!(reply["id"], 42);
  assert!(reply["result"].is_object());

  let event = next_client_json("expected detached event");
  assert_eq!(event["method"], "Target.detachedFromTarget");
  assert_eq!(event["params"]["sessionId"], deno_session);
  assert_eq!(event["params"]["targetId"], DENO_CHILD_TARGET_ID);

  assert!(cef_inbox.try_recv().is_err());
}
/// A detach request naming a session id other than the one the router was
/// given must be forwarded to CEF, with no locally synthesized reply.
#[test]
fn route_client_text_detach_from_unknown_session_forwards_to_cef() {
  use std::sync::atomic::AtomicBool;

  let (to_client, mut client_inbox) = mpsc::unbounded_channel::<OwnedFrame>();
  let (to_cef, mut cef_inbox) = mpsc::unbounded_channel::<OwnedFrame>();
  let (to_deno, _deno_inbox) = mpsc::unbounded_channel::<OwnedFrame>();
  let already_announced = Arc::new(AtomicBool::new(true));

  let request = serde_json::to_vec(&json!({
    "id": 99,
    "method": "Target.detachFromTarget",
    "params": { "sessionId": "some-cef-session" },
  }))
  .unwrap();
  route_client_text(
    &request,
    "deno-sess",
    &to_client,
    &to_cef,
    &to_deno,
    &already_announced,
  );

  let on_cef = cef_inbox.try_recv().expect("expected frame forwarded");
  let parsed: Value = serde_json::from_slice(&on_cef.payload).unwrap();
  assert_eq!(parsed["id"], 99);
  assert!(client_inbox.try_recv().is_err());
}
/// Client text that is not JSON must be passed to CEF byte-for-byte, and the
/// client must not receive anything in response.
#[test]
fn route_client_text_invalid_json_forwards_to_cef() {
  let (to_client, mut client_inbox) = mpsc::unbounded_channel::<OwnedFrame>();
  let (to_cef, mut cef_inbox) = mpsc::unbounded_channel::<OwnedFrame>();
  let (to_deno, _deno_inbox) = mpsc::unbounded_channel::<OwnedFrame>();
  let already_announced = Arc::new(std::sync::atomic::AtomicBool::new(true));

  let garbage = b"not-json".to_vec();
  route_client_text(
    &garbage,
    "sess",
    &to_client,
    &to_cef,
    &to_deno,
    &already_announced,
  );

  let on_cef = cef_inbox.try_recv().expect("expected frame on cef");
  assert_eq!(on_cef.payload, b"not-json");
  assert!(client_inbox.try_recv().is_err());
}
/// Test-side handle to a mock upstream server created by
/// `spawn_mock_upstream`.
struct MockUpstream {
/// Local address the mock's TCP listener is bound to.
listen: SocketAddr,
/// Frames the mock read from its websocket connection. Wrapped in an async
/// mutex so tests can receive through a shared `Arc<MockUpstream>`.
from_mux: tokio::sync::Mutex<mpsc::UnboundedReceiver<OwnedFrame>>,
/// Frames sent here are written out on the mock's websocket connection.
to_mux: mpsc::UnboundedSender<OwnedFrame>,
}
/// Starts a mock upstream HTTP/websocket server on an ephemeral 127.0.0.1
/// port and returns a handle to it.
///
/// Routes served per connection:
/// - `/json/list`: a one-element JSON array describing `mock-target`, whose
///   `webSocketDebuggerUrl` points back at this server's `/ws`.
/// - `/ws`: websocket upgrade. Frames read from the socket are pushed to
///   `MockUpstream::from_mux`; frames sent on `MockUpstream::to_mux` are
///   written to the socket.
/// - anything else: 404.
///
/// The accept loop runs in a detached task and ends when `accept` fails.
async fn spawn_mock_upstream() -> Arc<MockUpstream> {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let listen = listener.local_addr().unwrap();
// `in_*`: frames arriving from the websocket peer, handed to the test.
// `out_*`: frames the test wants written to the websocket peer.
let (in_tx, in_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let (out_tx, out_rx) = mpsc::unbounded_channel::<OwnedFrame>();
let upstream = Arc::new(MockUpstream {
listen,
from_mux: tokio::sync::Mutex::new(in_rx),
to_mux: out_tx,
});
// There is a single outbound receiver; it sits in an `Option` so that the
// first websocket connection can `take` it. Later connections find `None`.
let shared_out_rx = Arc::new(std::sync::Mutex::new(Some(out_rx)));
tokio::spawn(async move {
loop {
let (stream, _) = match listener.accept().await {
Ok(v) => v,
Err(_) => return,
};
// Per-connection clones moved into the connection task below.
let in_tx = in_tx.clone();
let shared_out_rx = shared_out_rx.clone();
let listen_str = listen.to_string();
tokio::spawn(async move {
let io = TokioIo::new(stream);
let service = hyper::service::service_fn(move |mut req| {
// Per-request clones: the service closure may be called repeatedly.
let in_tx = in_tx.clone();
let shared_out_rx = shared_out_rx.clone();
let listen_str = listen_str.clone();
async move {
let path = req.uri().path().to_string();
if path == "/json/list" {
let body = serde_json::to_vec(&json!([{
"id": "mock-target",
"type": "page",
"webSocketDebuggerUrl": format!("ws://{listen_str}/ws"),
}]))
.unwrap();
return Ok::<_, Infallible>(
hyper::Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap(),
);
}
if path == "/ws" {
let Ok((resp, upgrade_fut)) =
fastwebsockets::upgrade::upgrade(&mut req)
else {
return Ok(simple_response(
http::StatusCode::BAD_REQUEST,
"bad upgrade",
));
};
let in_tx = in_tx.clone();
// The std mutex guard is a temporary dropped at the end of this
// statement, so it is never held across an `.await`.
let out_rx = shared_out_rx.lock().unwrap().take();
// The websocket is driven in its own task; the upgrade response is
// returned to hyper further down.
tokio::spawn(async move {
let mut ws = match upgrade_fut.await {
Ok(w) => w,
Err(_) => return,
};
// Automatic close/pong handling is switched off on this socket.
ws.set_auto_close(false);
ws.set_auto_pong(false);
let (mut rx, mut tx) = ws.split(tokio::io::split);
let reader = async move {
// `read_frame` on the split read half takes a send callback; this
// one does nothing.
let mut noop =
|_: Frame<'_>| async { Ok::<(), WebSocketError>(()) };
loop {
let frame = match rx.read_frame(&mut noop).await {
Ok(f) => f,
Err(_) => return,
};
let owned = OwnedFrame {
opcode: frame.opcode,
payload: frame.payload.to_vec(),
};
// A Close frame is still forwarded to the test before the reader
// stops; a dropped receiver also stops it.
let is_close = owned.opcode == OpCode::Close;
if in_tx.send(owned).is_err() || is_close {
return;
}
}
};
let writer = async move {
// No receiver left (it was taken by an earlier connection):
// this connection has no writer side.
let Some(mut out_rx) = out_rx else { return };
while let Some(owned) = out_rx.recv().await {
let close = owned.opcode == OpCode::Close;
if tx.write_frame(owned.into_frame()).await.is_err() {
return;
}
// Stop after a Close frame has been written.
if close {
return;
}
}
};
// Run both halves concurrently until each has finished.
tokio::join!(reader, writer);
});
// Reuse the upgrade response's head with an empty `Full<Bytes>` body so
// this branch returns the same response type as the other branches.
let (parts, _) = resp.into_parts();
return Ok(hyper::Response::from_parts(
parts,
Full::new(Bytes::new()),
));
}
Ok(simple_response(http::StatusCode::NOT_FOUND, "Not Found"))
}
});
// Connection errors are deliberately ignored in this mock.
let _ = hyper::server::conn::http1::Builder::new()
.serve_connection(io, service)
.with_upgrades()
.await;
});
}
});
upstream
}
/// Opens a websocket to `ws_url` via `connect_ws`, panicking if the
/// connection attempt fails.
async fn test_connect_ws(
  ws_url: &str,
) -> WebSocket<TokioIo<hyper::upgrade::Upgraded>> {
  let attempt = connect_ws(ws_url).await;
  attempt.unwrap()
}
/// Reads frames from `ws` until a text frame arrives and returns its payload
/// parsed as JSON. Frames with any other opcode are discarded.
///
/// Panics if reading a frame fails or the text payload is not valid JSON.
async fn read_text_value<S>(ws: &mut WebSocket<S>) -> Value
where
  S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
  loop {
    let incoming = ws.read_frame().await.unwrap();
    if incoming.opcode == OpCode::Text {
      return serde_json::from_slice(&incoming.payload).unwrap();
    }
  }
}
/// Serializes `v` and sends it on `ws` as a single final, unmasked text
/// frame. Panics if serialization or the write fails.
async fn write_json<S>(ws: &mut WebSocket<S>, v: &Value)
where
  S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
  let encoded = fastwebsockets::Payload::Owned(serde_json::to_vec(v).unwrap());
  let text_frame = Frame::new(true, OpCode::Text, None, encoded);
  ws.write_frame(text_frame).await.unwrap();
}
async fn spawn_test_mux(
cef: &MockUpstream,
deno: &MockUpstream,
inspect_brk: bool,
) -> MuxHandle {
let listen_port = allocate_random_port().unwrap();
spawn_mux(MuxConfig {
listen: format!("127.0.0.1:{listen_port}").parse().unwrap(),
deno_internal: deno.listen,
cef_internal: cef.listen,
inspect_brk,
wait_for_debugger: inspect_brk,
})
.await
.unwrap()
}
/// Issues a single HTTP/1 `GET path` against `addr` over a fresh TCP
/// connection and returns the response status together with the fully
/// collected body. Panics on any connection or protocol error.
async fn http_get(addr: SocketAddr, path: &str) -> (http::StatusCode, Bytes) {
  let tcp = TcpStream::connect(addr).await.unwrap();
  let (mut sender, conn) =
    hyper::client::conn::http1::handshake(TokioIo::new(tcp))
      .await
      .unwrap();
  // The connection future has to be polled for the request to make progress.
  tokio::spawn(async move {
    let _ = conn.await;
  });

  let request = hyper::Request::builder()
    .method(http::Method::GET)
    .uri(path)
    .header(http::header::HOST, addr.to_string())
    .body(Empty::<Bytes>::new())
    .unwrap();
  let response = sender.send_request(request).await.unwrap();
  let status = response.status();
  let body = response.collect().await.unwrap().to_bytes();
  (status, body)
}
/// Exercises the mux's HTTP surface end to end with no debugger client
/// connected: `/json/list` lists three targets, `/json/version` reports
/// protocol 1.3, `/debugger-attached` answers 503, and an unknown path
/// answers 404.
#[tokio::test]
async fn integration_http_endpoints() {
  let cef = spawn_mock_upstream().await;
  let deno = spawn_mock_upstream().await;
  let mux = spawn_test_mux(&cef, &deno, false).await;

  let (list_status, list_body) = http_get(mux.listen, "/json/list").await;
  assert_eq!(list_status, http::StatusCode::OK);
  let targets: Vec<Value> = serde_json::from_slice(&list_body).unwrap();
  assert_eq!(targets.len(), 3);

  let (version_status, version_body) =
    http_get(mux.listen, "/json/version").await;
  assert_eq!(version_status, http::StatusCode::OK);
  let version: Value = serde_json::from_slice(&version_body).unwrap();
  assert_eq!(version["Protocol-Version"], "1.3");

  // Body-less checks: only the status code matters for these paths.
  let status_only = [
    ("/debugger-attached", http::StatusCode::SERVICE_UNAVAILABLE),
    ("/nope", http::StatusCode::NOT_FOUND),
  ];
  for (path, expected) in status_only {
    let (status, _) = http_get(mux.listen, path).await;
    assert_eq!(status, expected);
  }
}
/// End-to-end check of the `/unified` websocket endpoint against two mock
/// upstreams: client requests are routed by `sessionId`, and upstream events
/// are rewritten (context name, `sessionId`) on their way back to the client.
#[tokio::test]
async fn integration_unified_session_routes_and_rewrites() {
let cef = spawn_mock_upstream().await;
let deno = spawn_mock_upstream().await;
let mux = spawn_test_mux(&cef, &deno, false).await;
let ws_url = format!("ws://{}/unified", mux.listen);
let mut client = test_connect_ws(&ws_url).await;
client.set_auto_close(false);
client.set_auto_pong(false);
// Poll `/debugger-attached` (up to 50 tries, 20 ms apart) until it reports
// 200. The outcome of the loop is not asserted; the test proceeds either way.
for _ in 0..50 {
let (status, _) = http_get(mux.listen, "/debugger-attached").await;
if status == http::StatusCode::OK {
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
// Step 1: the client enables auto-attach.
write_json(
&mut client,
&json!({
"id": 1,
"method": "Target.setAutoAttach",
"params": { "autoAttach": true, "waitForDebuggerOnStart": false },
}),
)
.await;
// The first text frame back must be a `Target.attachedToTarget` event for a
// `worker` target; its session id is reused for the rest of the test.
let attached = read_text_value(&mut client).await;
assert_eq!(attached["method"], "Target.attachedToTarget");
let session_id = attached["params"]["sessionId"]
.as_str()
.unwrap()
.to_string();
assert_eq!(attached["params"]["targetInfo"]["type"], "worker");
// The same `Target.setAutoAttach` request must also reach the CEF mock
// (2 s timeout). The lock guard is scoped to this block.
let received = {
let mut rx = cef.from_mux.lock().await;
tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.unwrap()
.unwrap()
};
let received_val: Value =
serde_json::from_slice(&received.payload).unwrap();
assert_eq!(received_val["method"], "Target.setAutoAttach");
// Step 2: a request tagged with the announced session id ...
write_json(
&mut client,
&json!({
"id": 2,
"method": "Debugger.enable",
"sessionId": session_id,
}),
)
.await;
// ... must arrive at the Deno mock with the `sessionId` field removed.
let deno_received = {
let mut rx = deno.from_mux.lock().await;
tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.unwrap()
.unwrap()
};
let deno_val: Value =
serde_json::from_slice(&deno_received.payload).unwrap();
assert_eq!(deno_val["id"], 2);
assert_eq!(deno_val["method"], "Debugger.enable");
assert!(deno_val["sessionId"].is_null());
// Step 3: an execution-context event from CEF reaches the client renamed to
// "Renderer" and without a `sessionId`.
cef
.to_mux
.send(OwnedFrame::text(
serde_json::to_vec(&json!({
"method": "Runtime.executionContextCreated",
"params": { "context": { "id": 1, "origin": "", "name": "top" } },
}))
.unwrap(),
))
.unwrap();
let from_cef = read_text_value(&mut client).await;
assert_eq!(from_cef["method"], "Runtime.executionContextCreated");
assert_eq!(from_cef["params"]["context"]["name"], "Renderer");
assert!(from_cef["sessionId"].is_null());
// Step 4: the same kind of event from Deno reaches the client renamed to
// "Deno" and tagged with the session id announced in step 1.
deno
.to_mux
.send(OwnedFrame::text(
serde_json::to_vec(&json!({
"method": "Runtime.executionContextCreated",
"params": { "context": { "id": 1, "origin": "", "name": "main realm" } },
}))
.unwrap(),
))
.unwrap();
let from_deno = read_text_value(&mut client).await;
assert_eq!(from_deno["method"], "Runtime.executionContextCreated");
assert_eq!(from_deno["params"]["context"]["name"], "Deno");
assert_eq!(from_deno["sessionId"], session_id);
// Explicit teardown order: client first, then the mux handle.
drop(client);
drop(mux);
}
/// With `inspect_brk` enabled, the CEF upstream must see `Debugger.enable`
/// and then `Debugger.pause`, and `/debugger-attached` must stay at 503 until
/// that injection has completed.
#[tokio::test]
async fn integration_inspect_brk_injects_before_marking_attached() {
let cef = spawn_mock_upstream().await;
let deno = spawn_mock_upstream().await;
let mux = spawn_test_mux(&cef, &deno, true).await;
let ws_url = format!("ws://{}/unified", mux.listen);
// The client connects in a background task so the test can watch the CEF
// mock in the meantime; the task is only awaited at the end.
let client_connect =
tokio::spawn(async move { test_connect_ws(&ws_url).await });
// First frame seen by CEF must be `Debugger.enable` (5 s timeout).
let enable_frame = {
let mut rx = cef.from_mux.lock().await;
tokio::time::timeout(Duration::from_secs(5), rx.recv())
.await
.expect("no frame received on CEF upstream")
.unwrap()
};
let enable_val: Value =
serde_json::from_slice(&enable_frame.payload).unwrap();
assert_eq!(enable_val["method"], "Debugger.enable");
// The mock has not replied to anything yet, so the mux must still report
// "not attached".
let (status, _) = http_get(mux.listen, "/debugger-attached").await;
assert_eq!(
status,
http::StatusCode::SERVICE_UNAVAILABLE,
"debugger_attached must not be signalled until pause injection completes"
);
// Second frame seen by CEF must be `Debugger.pause`.
let pause_frame = {
let mut rx = cef.from_mux.lock().await;
tokio::time::timeout(Duration::from_secs(5), rx.recv())
.await
.expect("no pause frame received")
.unwrap()
};
let pause_val: Value =
serde_json::from_slice(&pause_frame.payload).unwrap();
assert_eq!(pause_val["method"], "Debugger.pause");
// Reply from the mock with ids -1 and -2.
// NOTE(review): presumably these are the ids the mux assigns to its two
// injected commands; that code is not visible in this chunk, so confirm.
cef
.to_mux
.send(OwnedFrame::text(
serde_json::to_vec(&json!({"id": -1, "result": {}})).unwrap(),
))
.unwrap();
cef
.to_mux
.send(OwnedFrame::text(
serde_json::to_vec(&json!({"id": -2, "result": {}})).unwrap(),
))
.unwrap();
// After the replies, `/debugger-attached` must flip to 200 within 100 polls
// spaced 20 ms apart.
let mut saw_attached = false;
for _ in 0..100 {
let (status, _) = http_get(mux.listen, "/debugger-attached").await;
if status == http::StatusCode::OK {
saw_attached = true;
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert!(
saw_attached,
"debugger_attached never flipped to 200 after injection completed"
);
// Join the background connect task; a panic inside it surfaces here through
// the `unwrap`.
let _client = client_connect.await.unwrap();
drop(mux);
}
}