#![allow(
clippy::too_many_lines,
clippy::doc_markdown,
clippy::uninlined_format_args,
clippy::single_char_pattern,
clippy::cast_precision_loss,
clippy::unwrap_used,
clippy::expect_used,
clippy::needless_pass_by_value
)]
use super::client::McpClient;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
pub fn with_stub_server<F: FnOnce(&str)>(body: F) {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("addr");
let base = format!("http://{addr}");
let stop = Arc::new(AtomicBool::new(false));
let stop_clone = stop.clone();
let handle = thread::spawn(move || {
listener.set_nonblocking(true).expect("listener nonblocking");
while !stop_clone.load(Ordering::Acquire) {
match listener.accept() {
Ok((stream, _)) => {
thread::spawn(move || handle_stub_conn(stream));
},
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(std::time::Duration::from_millis(10));
},
Err(_) => break,
}
}
});
body(&base);
stop.store(true, Ordering::Release);
let _ = TcpStream::connect(addr);
let _ = handle.join();
}
fn handle_stub_conn(mut stream: TcpStream) {
let mut buf = [0u8; 4096];
let Ok(n) = stream.read(&mut buf) else { return };
let request = String::from_utf8_lossy(&buf[..n]);
let mut lines = request.lines();
let request_line = lines.next().unwrap_or("");
let mut parts = request_line.split_whitespace();
let method = parts.next().unwrap_or("GET").to_string();
let path = parts.next().unwrap_or("/").to_string();
let mut headers: Vec<(String, String)> = Vec::new();
let mut content_length = 0usize;
for line in lines.by_ref() {
if line.is_empty() {
break;
}
if let Some((k, v)) = line.split_once(':') {
let k = k.trim().to_string();
let v = v.trim().to_string();
if k.eq_ignore_ascii_case("content-length") {
content_length = v.parse().unwrap_or(0);
}
headers.push((k, v));
}
}
let mut body = vec![0u8; content_length];
if content_length > 0 {
if let Some(idx) = request.find("\r\n\r\n") {
let body_start = idx + 4;
let already = (n - body_start.min(n)).min(content_length);
if already > 0 {
body[..already].copy_from_slice(&buf[body_start..body_start + already]);
}
if already < content_length {
let _ = stream.read_exact(&mut body[already..]);
}
}
}
let response = build_stub_response(&method, &path, &headers, &body);
let _ = stream.write_all(&response);
let _ = stream.flush();
}
fn build_stub_response(method: &str, path: &str, headers: &[(String, String)], body: &[u8]) -> Vec<u8> {
match (method, path) {
("GET", "/redirect") => {
b"HTTP/1.1 302 Found\r\nLocation: /landed\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec()
},
("GET", "/landed") => http_response("text/plain", b"landed"),
("GET", "/api/users") => http_response("application/json", br#"{"users":["alice","bob"]}"#),
("POST", "/echo") => http_response("text/plain", body),
("GET", "/multi-cookie") => {
let body = b"cookies-set";
let mut out = format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nSet-Cookie: a=1; Path=/\r\nSet-Cookie: b=2; Path=/\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
)
.into_bytes();
out.extend_from_slice(body);
out
},
("GET", "/ua-marker") => {
let ua = headers
.iter()
.find(|(k, _)| k.eq_ignore_ascii_case("user-agent"))
.map_or("", |(_, v)| v.as_str());
let payload = format!("UA={ua}");
http_response("text/plain", payload.as_bytes())
},
_ => http_status_response(404, "Not Found", "text/plain", b"not found"),
}
}
fn http_response(content_type: &str, body: &[u8]) -> Vec<u8> {
http_status_response(200, "OK", content_type, body)
}
fn http_status_response(status: u16, status_text: &str, content_type: &str, body: &[u8]) -> Vec<u8> {
let mut out = format!(
"HTTP/1.1 {status} {status_text}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
)
.into_bytes();
out.extend_from_slice(body);
out
}
pub fn test_network_redirect_chain(c: &mut McpClient) {
with_stub_server(|base| {
let landed = format!("{base}/landed");
c.nav_url("about:blank");
let script = format!(
r"
const wait = page.waitForResponse({landed:?}, 10000);
await page.goto({base:?} + '/redirect');
const resp = await wait;
const req = resp.request();
const prev = req.redirectedFrom();
if (!prev) {{
throw new Error('redirectedFrom() should expose the prior request');
}}
const prevResp = await prev.response();
return {{
finalUrl: resp.url(),
finalStatus: resp.status(),
prevUrl: prev.url(),
prevStatus: prevResp ? prevResp.status() : null,
redirectedFromIsNull: req.redirectedFrom() === null,
}};
",
landed = landed,
base = base,
);
let v = c.script_value(&script);
assert_eq!(v["finalUrl"].as_str(), Some(landed.as_str()), "final url: {v}");
assert_eq!(v["finalStatus"].as_i64(), Some(200), "final status: {v}");
assert!(
v["prevUrl"].as_str().is_some_and(|s| s.ends_with("/redirect")),
"prev url should be the 302: {v}",
);
assert_eq!(v["prevStatus"].as_i64(), Some(302), "prev status: {v}");
assert!(
!v["redirectedFromIsNull"].as_bool().unwrap_or(true),
"chain link present: {v}",
);
});
}
pub fn test_network_request_failure(c: &mut McpClient) {
if c.backend == "webkit" {
return test_network_request_failure_via_refused_port(c);
}
with_stub_server(|base| {
c.nav_url(&format!("{base}/landed"));
let script = r#"
await page.route('**/api/blocked-by-route', (route) => {
route.abort('blockedbyclient');
});
try {
const failedPromise = page.waitForEvent('requestfailed', 10000).catch(() => null);
const fetchOutcome = await page.evaluate(
"fetch('/api/blocked-by-route').then(() => 'ok').catch(() => 'blocked')"
);
const failedEvent = await failedPromise;
let failureText = null;
let failedUrl = null;
if (failedEvent && typeof failedEvent.failure === 'function') {
const f = await failedEvent.failure();
failureText = f ? f.errorText : null;
failedUrl = failedEvent.url();
}
return { fetchOutcome, failureText, failedUrl };
} finally {
await page.unroute('**/api/blocked-by-route');
}
"#;
let v = c.script_value(script);
assert_eq!(
v["fetchOutcome"].as_str(),
Some("blocked"),
"fetch should be blocked: {v}"
);
assert!(
v["failedUrl"]
.as_str()
.is_some_and(|u| u.contains("/api/blocked-by-route")),
"failedUrl should match the aborted request: {v}",
);
assert!(
v["failureText"].as_str().is_some_and(|t| {
t.contains("blockedbyclient") || t.contains("net::ERR_BLOCKED") || t.contains("NS_ERROR_ABORT")
}),
"failureText should carry the abort reason: {v}",
);
});
}
fn test_network_request_failure_via_refused_port(c: &mut McpClient) {
with_stub_server(|base| {
c.nav_url(&format!("{base}/landed"));
let script = r#"
const failedPromise = page.waitForEvent('requestfailed', 10000).catch(() => null);
const fetchOutcome = await page.evaluate(
"fetch('http://127.0.0.1:65530/blocked').then(() => 'ok').catch(() => 'blocked')"
);
const failedEvent = await failedPromise;
let failureText = null;
let failedUrl = null;
if (failedEvent && typeof failedEvent.failure === 'function') {
const f = await failedEvent.failure();
failureText = f ? f.errorText : null;
failedUrl = failedEvent.url();
}
return { fetchOutcome, failureText, failedUrl };
"#;
let v = c.script_value(script);
assert_eq!(v["fetchOutcome"].as_str(), Some("blocked"), "fetch should fail: {v}");
assert!(
v["failedUrl"].as_str().is_some_and(|u| u.contains("/blocked")),
"failedUrl should match the failed request: {v}",
);
assert!(
v["failureText"].as_str().is_some_and(|t| !t.is_empty()),
"failureText should be set: {v}",
);
});
}
pub fn test_network_response_body(c: &mut McpClient) {
with_stub_server(|base| {
c.nav_url(&format!("{base}/landed"));
let script = r#"
const wait = page.waitForResponse('**/api/users', 10000);
await page.evaluate("fetch('/api/users').then(r => r.text())");
const resp = await wait;
const text = await resp.text();
const json = await resp.json();
const headerValue = await resp.headerValue('content-type');
return {
status: resp.status(),
bodyText: text,
users: json.users,
headerValue,
};
"#;
let body_supported = c.backend == "cdp-pipe" || c.backend == "cdp-raw" || c.backend == "webkit";
if body_supported {
let v = c.script_value(script);
assert_eq!(v["status"].as_i64(), Some(200), "status: {v}");
assert!(
v["bodyText"].as_str().is_some_and(|s| s.contains("alice")),
"body text: {v}",
);
let users = v["users"].as_array().expect("users array");
assert_eq!(users.len(), 2, "users length: {v}");
assert!(
v["headerValue"]
.as_str()
.is_some_and(|s| s.contains("application/json")),
"content-type: {v}",
);
} else {
let payload = c.script(script);
assert_eq!(payload["status"].as_str(), Some("error"), "should error: {payload}");
let msg = payload["error"]["message"].as_str().unwrap_or("");
assert!(
msg.contains("not supported")
|| msg.contains("Unsupported")
|| msg.contains("unsupported")
|| msg.contains("unavailable"),
"{} body should surface typed Unsupported: {msg}",
c.backend,
);
}
});
}
pub fn test_network_post_data(c: &mut McpClient) {
with_stub_server(|base| {
c.nav_url(&format!("{base}/landed"));
let script = r"
const wait = page.waitForRequest('**/echo', 10000);
await page.evaluate(`
fetch('/echo', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ ping: 'pong', n: 7 }),
}).then(r => r.text())
`);
const req = await wait;
const data = req.postData();
const parsed = req.postDataJSON();
return {
method: req.method(),
data,
parsedPing: parsed ? parsed.ping : null,
parsedN: parsed ? parsed.n : null,
};
";
let v = c.script_value(script);
assert_eq!(v["method"].as_str(), Some("POST"), "method: {v}");
if c.backend == "cdp-pipe" || c.backend == "cdp-raw" || c.backend == "webkit" {
assert!(
v["data"].as_str().is_some_and(|s| s.contains("\"ping\":\"pong\"")),
"data: {v}",
);
assert_eq!(v["parsedPing"].as_str(), Some("pong"), "parsedPing: {v}");
assert_eq!(v["parsedN"].as_i64(), Some(7), "parsedN: {v}");
} else {
assert!(
v["data"].is_null() || v["data"].as_str() == Some(""),
"{} backend cannot capture postData yet: {v}",
c.backend,
);
}
});
}
pub fn test_network_headers(c: &mut McpClient) {
with_stub_server(|base| {
c.nav_url(&format!("{base}/landed"));
let script = r#"
const cookieWait = page.waitForResponse('**/multi-cookie', 10000);
const uaWait = page.waitForRequest('**/ua-marker', 10000);
await page.evaluate("fetch('/multi-cookie').then(r => r.text())");
const cookieResp = await cookieWait;
await page.evaluate("fetch('/ua-marker').then(r => r.text())");
const uaReq = await uaWait;
const uaHeaders = uaReq.headers();
const uaName = Object.keys(uaHeaders).find(k => k.toLowerCase() === 'user-agent');
const cookieHeaders = await cookieResp.headersArray();
const cookieEntries = cookieHeaders.filter(h => h.name.toLowerCase() === 'set-cookie');
const setCookieJoined = await cookieResp.headerValue('set-cookie');
return {
userAgent: uaName ? uaHeaders[uaName] : null,
setCookieCount: cookieEntries.length,
setCookieJoined,
};
"#;
let v = c.script_value(script);
if c.backend == "cdp-pipe" || c.backend == "cdp-raw" || c.backend == "bidi" || c.backend == "webkit" {
assert!(
v["userAgent"].as_str().is_some_and(|s| !s.is_empty()),
"User-Agent header should be set on {}: {v}",
c.backend,
);
} else {
assert!(
v["userAgent"].is_null() || v["userAgent"].as_str() == Some(""),
"WebKit cannot surface browser-set request headers: {v}",
);
}
if c.backend == "cdp-pipe" || c.backend == "cdp-raw" {
assert_eq!(
v["setCookieCount"].as_i64(),
Some(2),
"two Set-Cookie entries preserved: {v}",
);
}
assert!(
v["setCookieJoined"]
.as_str()
.is_some_and(|s| s.contains("a=1") && s.contains("b=2")),
"{} joined set-cookie should carry both values: {v}",
c.backend,
);
});
}
pub fn test_network_websocket(c: &mut McpClient) {
if c.backend == "bidi" || c.backend == "webkit" {
c.nav_url("about:blank");
let script = "const r = await page.waitForEvent('websocket', 500).catch(e => ({ error: String(e) }));\
return r && r.error ? { error: r.error } : { ok: true };";
let v = c.script_value(script);
assert!(
v["error"].as_str().is_some_and(|s| {
let lc = s.to_ascii_lowercase();
lc.contains("timeout") || lc.contains("waiting for event")
}),
"BiDi/WebKit WebSocket should reject with typed timeout: {v}",
);
return;
}
let (ws_url, ws_stop) = spawn_ws_echo();
let (http_url, http_stop) = spawn_http_blank();
c.nav_url(&http_url);
let script = format!(
r#"
const wsPromise = page.waitForEvent('websocket', 10000);
await page.evaluate(`
window.__ws = new WebSocket({ws_url:?});
window.__opened = new Promise((res) => {{ window.__ws.onopen = () => res(); }});
null
`);
const ws = await wsPromise;
const recvPromise = ws.waitForEvent('framereceived', 10000);
await page.evaluate("window.__opened.then(() => window.__ws.send('hello-ws')); null");
const frame = await recvPromise;
return {{
url: ws.url(),
payload: frame.payload,
isClosed: ws.isClosed(),
}};
"#,
ws_url = ws_url,
);
let v = c.script_value(&script);
assert!(v["url"].as_str().is_some_and(|s| s.starts_with("ws://")), "ws url: {v}");
assert_eq!(v["payload"].as_str(), Some("hello-ws"), "echoed payload: {v}");
let _ = ws_stop.send(());
let _ = http_stop.send(());
}
fn spawn_http_blank() -> (String, std::sync::mpsc::Sender<()>) {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener as TokioListener;
let std_listener = TcpListener::bind("127.0.0.1:0").expect("http bind");
let addr = std_listener.local_addr().expect("addr");
let url = format!("http://{addr}/");
std_listener.set_nonblocking(true).expect("nonblocking");
let (tx, rx) = std::sync::mpsc::channel::<()>();
thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("http runtime");
runtime.block_on(async move {
let listener = TokioListener::from_std(std_listener).expect("from_std");
loop {
if rx.try_recv().is_ok() {
break;
}
tokio::select! {
accept = listener.accept() => {
if let Ok((mut stream, _)) = accept {
tokio::spawn(async move {
let mut buf = [0u8; 1024];
let _ = stream.read(&mut buf).await;
let body = b"<!doctype html><title>ferri</title>";
let resp = format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n",
body.len(),
);
let _ = stream.write_all(resp.as_bytes()).await;
let _ = stream.write_all(body).await;
let _ = stream.shutdown().await;
});
}
},
() = tokio::time::sleep(std::time::Duration::from_millis(50)) => {},
}
}
});
});
(url, tx)
}
fn spawn_ws_echo() -> (String, std::sync::mpsc::Sender<()>) {
use futures::{SinkExt, StreamExt};
use tokio::net::TcpListener as TokioListener;
let std_listener = TcpListener::bind("127.0.0.1:0").expect("ws bind");
let addr = std_listener.local_addr().expect("addr");
let url = format!("ws://{addr}/");
std_listener.set_nonblocking(true).expect("nonblocking");
let (tx, rx) = std::sync::mpsc::channel::<()>();
thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("ws runtime");
runtime.block_on(async move {
let listener = TokioListener::from_std(std_listener).expect("from_std");
loop {
if rx.try_recv().is_ok() {
break;
}
tokio::select! {
accept = listener.accept() => {
if let Ok((stream, _)) = accept {
tokio::spawn(async move {
let Ok(mut ws) = tokio_tungstenite::accept_async(stream).await else { return };
while let Some(Ok(msg)) = ws.next().await {
if msg.is_text() || msg.is_binary() {
let _ = ws.send(msg).await;
}
if ws.send(tokio_tungstenite::tungstenite::Message::Close(None)).await.is_ok() {
break;
}
}
});
}
},
() = tokio::time::sleep(std::time::Duration::from_millis(50)) => {},
}
}
});
});
(url, tx)
}