use std::{
io::{Read, Write},
net::{TcpListener, TcpStream},
process::{Command, Stdio},
sync::mpsc,
thread,
time::Duration,
};
use serde_json::Value;
use tokio_tungstenite::tungstenite::{accept, Message as WsFrame};
use crate::support::TestWorkspace;
pub(crate) fn spawn_http_server(status: u16, reason: &str, content_type: &str, body: &str) -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
let reason = reason.to_string();
let content_type = content_type.to_string();
let body = body.to_string();
thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("connection should be accepted");
let mut buffer = [0_u8; 1024];
let _ = stream.read(&mut buffer);
let response = format!(
"HTTP/1.1 {status} {reason}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
);
stream
.write_all(response.as_bytes())
.expect("response should be written");
});
format!("http://{}", address)
}
pub(crate) fn spawn_header_echo_server(header_name: &str) -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
let header_name = header_name.to_string();
thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("connection should be accepted");
let request = read_http_request(&mut stream);
let header_value = request
.lines()
.find_map(|line| {
let (name, value) = line.split_once(':')?;
if name.trim().eq_ignore_ascii_case(&header_name) {
Some(value.trim().to_string())
} else {
None
}
})
.expect("expected request header to be present");
let body = serde_json::json!({ "value": header_value }).to_string();
write_http_response(
&mut stream,
200,
"OK",
Some("application/json"),
&body,
);
});
format!("http://{}", address)
}
pub(crate) fn spawn_cookie_session_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
thread::spawn(move || {
let (mut login_stream, _) = listener.accept().expect("login should connect");
let login_request = read_http_request(&mut login_stream);
assert!(login_request.starts_with("POST /login HTTP/1.1"));
assert!(
!login_request.to_ascii_lowercase().contains("cookie:"),
"initial login request should not include a cookie"
);
let login_body = r#"{"ok":true}"#;
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nSet-Cookie: session=abc123; Path=/; HttpOnly\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{login_body}",
login_body.len()
);
login_stream
.write_all(response.as_bytes())
.expect("login response should be written");
let (mut profile_stream, _) = listener.accept().expect("profile should connect");
let profile_request = read_http_request(&mut profile_stream);
let profile_request_lower = profile_request.to_ascii_lowercase();
assert!(profile_request.starts_with("GET /profile HTTP/1.1"));
assert!(
profile_request_lower.contains("cookie: session=abc123"),
"expected cookie header in profile request, got: {profile_request}"
);
write_http_response(
&mut profile_stream,
200,
"OK",
Some("application/json"),
r#"{"authenticated":true}"#,
);
});
format!("http://{}", address)
}
pub(crate) fn spawn_mcp_http_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
thread::spawn(move || {
let (mut initialize_stream, _) = listener.accept().expect("initialize should connect");
let initialize_request = read_http_request(&mut initialize_stream);
let initialize_request_lower = initialize_request.to_ascii_lowercase();
assert!(initialize_request_lower.contains("authorization: bearer test-token"));
assert!(initialize_request.contains("\"method\":\"initialize\""));
assert!(initialize_request.contains("\"protocolVersion\":\"2025-11-25\""));
assert!(initialize_request.contains("\"name\":\"hen\""));
assert!(initialize_request.contains(env!("CARGO_PKG_VERSION")));
write_http_response(
&mut initialize_stream,
200,
"OK",
Some("application/json"),
r#"{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-11-25","capabilities":{},"serverInfo":{"name":"fixture-mcp","version":"1.0.0"}}}"#,
);
let (mut initialized_stream, _) = listener.accept().expect("initialized notification should connect");
let initialized_request = read_http_request(&mut initialized_stream);
let initialized_request_lower = initialized_request.to_ascii_lowercase();
assert!(initialized_request_lower.contains("authorization: bearer test-token"));
assert!(initialized_request.contains("\"method\":\"notifications/initialized\""));
write_http_response(&mut initialized_stream, 202, "Accepted", None, "");
let (mut list_tools_stream, _) = listener.accept().expect("tools/list should connect");
let list_tools_request = read_http_request(&mut list_tools_stream);
let list_tools_request_lower = list_tools_request.to_ascii_lowercase();
assert!(list_tools_request_lower.contains("authorization: bearer test-token"));
assert!(list_tools_request.contains("\"method\":\"tools/list\""));
write_http_response(
&mut list_tools_stream,
200,
"OK",
Some("application/json"),
r#"{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"search"}]}}"#,
);
let (mut list_resources_stream, _) = listener.accept().expect("resources/list should connect");
let list_resources_request = read_http_request(&mut list_resources_stream);
let list_resources_request_lower = list_resources_request.to_ascii_lowercase();
assert!(list_resources_request_lower.contains("authorization: bearer test-token"));
assert!(list_resources_request.contains("\"method\":\"resources/list\""));
write_http_response(
&mut list_resources_stream,
200,
"OK",
Some("application/json"),
r#"{"jsonrpc":"2.0","id":3,"result":{"resources":[{"uri":"file:///fixture.txt","name":"Fixture Resource"}]}}"#,
);
let (mut tool_call_stream, _) = listener.accept().expect("tools/call should connect");
let tool_call_request = read_http_request(&mut tool_call_stream);
let tool_call_request_lower = tool_call_request.to_ascii_lowercase();
assert!(tool_call_request_lower.contains("authorization: bearer test-token"));
assert!(tool_call_request.contains("\"method\":\"tools/call\""));
assert!(tool_call_request.contains("\"name\":\"search\""));
assert!(tool_call_request.contains("\"query\":\"hedgehog\""));
write_http_response(
&mut tool_call_stream,
200,
"OK",
Some("application/json"),
r#"{"jsonrpc":"2.0","id":4,"result":{"content":[{"type":"text","text":"search result"}]}}"#,
);
});
format!("http://{}", address)
}
pub(crate) fn spawn_mcp_protocol_error_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
thread::spawn(move || {
let (mut initialize_stream, _) = listener.accept().expect("initialize should connect");
let initialize_request = read_http_request(&mut initialize_stream);
let initialize_request_lower = initialize_request.to_ascii_lowercase();
assert!(initialize_request_lower.contains("authorization: bearer test-token"));
assert!(initialize_request.contains("\"method\":\"initialize\""));
write_http_response(
&mut initialize_stream,
200,
"OK",
Some("application/json"),
r#"{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-11-25","capabilities":{},"serverInfo":{"name":"fixture-mcp","version":"1.0.0"}}}"#,
);
let (mut initialized_stream, _) = listener.accept().expect("initialized notification should connect");
let initialized_request = read_http_request(&mut initialized_stream);
let initialized_request_lower = initialized_request.to_ascii_lowercase();
assert!(initialized_request_lower.contains("authorization: bearer test-token"));
assert!(initialized_request.contains("\"method\":\"notifications/initialized\""));
write_http_response(&mut initialized_stream, 202, "Accepted", None, "");
let (mut tool_call_stream, _) = listener.accept().expect("tools/call should connect");
let tool_call_request = read_http_request(&mut tool_call_stream);
let tool_call_request_lower = tool_call_request.to_ascii_lowercase();
assert!(tool_call_request_lower.contains("authorization: bearer test-token"));
assert!(tool_call_request.contains("\"method\":\"tools/call\""));
assert!(tool_call_request.contains("\"name\":\"missing-tool\""));
write_http_response(
&mut tool_call_stream,
200,
"OK",
Some("application/json"),
r#"{"jsonrpc":"2.0","id":2,"error":{"code":-32601,"message":"Tool not found"}}"#,
);
});
format!("http://{}", address)
}
pub(crate) fn spawn_mcp_sse_http_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
thread::spawn(move || {
let (mut initialize_stream, _) = listener.accept().expect("initialize should connect");
let initialize_request = read_http_request(&mut initialize_stream);
assert!(initialize_request.contains("\"method\":\"initialize\""));
write_http_response(
&mut initialize_stream,
200,
"OK",
Some("text/event-stream"),
concat!(
"event: message\r\n",
"data:\r\n\r\n",
"event: message\r\n",
"data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"protocolVersion\":\"2025-06-18\",\"capabilities\":{\"tools\":{\"listChanged\":true}},\"serverInfo\":{\"name\":\"fixture-mcp\",\"version\":\"1.0.0\"}}}\r\n\r\n"
),
);
let (mut initialized_stream, _) = listener.accept().expect("initialized notification should connect");
let initialized_request = read_http_request(&mut initialized_stream);
assert!(initialized_request.contains("\"method\":\"notifications/initialized\""));
write_http_response(&mut initialized_stream, 202, "Accepted", None, "");
});
format!("http://{}", address)
}
pub(crate) fn spawn_sse_http_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("stream should connect");
let request = read_http_request(&mut stream);
let request_lower = request.to_ascii_lowercase();
assert!(request.starts_with("GET / HTTP/1.1") || request.starts_with("GET / "));
assert!(request_lower.contains("accept: text/event-stream"));
write_http_response(
&mut stream,
200,
"OK",
Some("text/event-stream"),
concat!(
"event: price\r\n",
"id: evt-1\r\n",
"data: {\"symbol\":\"AAPL\",\"price\":182.4}\r\n\r\n"
),
);
});
format!("http://{}", address)
}
pub(crate) fn spawn_ws_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
thread::spawn(move || {
let (stream, _) = listener.accept().expect("socket should connect");
let mut socket = accept(stream).expect("handshake should succeed");
let message = socket.read().expect("message should be readable");
let text = match message {
WsFrame::Text(text) => text.to_string(),
other => panic!("expected text message, got {:?}", other),
};
let payload: Value = serde_json::from_str(&text).expect("payload should be valid json");
assert_eq!(payload["type"], "hello");
assert_eq!(payload["room"], "prices");
socket
.send(WsFrame::Text(
r#"{"type":"ack","room":"prices"}"#.to_string().into(),
))
.expect("ack should be sent");
});
format!("ws://{}", address)
}
pub(crate) fn spawn_oauth_client_credentials_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
let base_url = format!("http://{}", address);
let token_url = format!("{}/token", base_url);
thread::spawn(move || {
let (mut discovery_stream, _) = listener.accept().expect("discovery should connect");
let discovery_request = read_http_request(&mut discovery_stream);
assert!(
discovery_request.starts_with("GET /.well-known/openid-configuration HTTP/"),
"request: {discovery_request}"
);
write_http_response(
&mut discovery_stream,
200,
"OK",
Some("application/json"),
&serde_json::json!({ "token_endpoint": token_url }).to_string(),
);
let (mut token_stream, _) = listener.accept().expect("token should connect");
let token_request = read_http_request(&mut token_stream);
assert!(
token_request.starts_with("POST /token HTTP/"),
"request: {token_request}"
);
let token_body = request_body(&token_request);
assert!(token_body.contains("grant_type=client_credentials"), "body: {token_body}");
assert!(token_body.contains("client_id=hen-client"), "body: {token_body}");
assert!(token_body.contains("client_secret=hen-secret"), "body: {token_body}");
assert!(token_body.contains("scope=read%3Afixtures"), "body: {token_body}");
assert!(token_body.contains("audience=fixtures"), "body: {token_body}");
write_http_response(
&mut token_stream,
200,
"OK",
Some("application/json"),
r#"{"access_token":"oauth-access-token","token_type":"Bearer","expires_in":3600}"#,
);
let (mut resource_one_stream, _) = listener.accept().expect("first resource should connect");
let resource_one_request = read_http_request(&mut resource_one_stream);
assert!(
resource_one_request.starts_with("GET /resource/one HTTP/"),
"request: {resource_one_request}"
);
assert_eq!(
request_header_value(&resource_one_request, "Authorization").as_deref(),
Some("Bearer oauth-access-token")
);
assert_eq!(
request_header_value(&resource_one_request, "X-Token-Type").as_deref(),
Some("Bearer")
);
write_http_response(
&mut resource_one_stream,
200,
"OK",
Some("application/json"),
r#"{"path":"one","authorization":"Bearer oauth-access-token","token_type":"Bearer"}"#,
);
let (mut resource_two_stream, _) = listener.accept().expect("second resource should connect");
let resource_two_request = read_http_request(&mut resource_two_stream);
assert!(
resource_two_request.starts_with("GET /resource/two HTTP/"),
"request: {resource_two_request}"
);
assert_eq!(
request_header_value(&resource_two_request, "Authorization").as_deref(),
Some("Bearer oauth-access-token")
);
write_http_response(
&mut resource_two_stream,
200,
"OK",
Some("application/json"),
r#"{"path":"two","authorization":"Bearer oauth-access-token"}"#,
);
let (mut resource_three_stream, _) = listener.accept().expect("third resource should connect");
let resource_three_request = read_http_request(&mut resource_three_stream);
assert!(
resource_three_request.starts_with("GET /resource/three HTTP/"),
"request: {resource_three_request}"
);
assert_eq!(
request_header_value(&resource_three_request, "Authorization").as_deref(),
Some("Bearer oauth-access-token")
);
write_http_response(
&mut resource_three_stream,
200,
"OK",
Some("application/json"),
r#"{"path":"three","authorization":"Bearer oauth-access-token"}"#,
);
});
base_url
}
pub(crate) fn spawn_oauth_refresh_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
thread::spawn(move || {
let (mut token_one_stream, _) = listener.accept().expect("first token request should connect");
let token_one_request = read_http_request(&mut token_one_stream);
assert!(
token_one_request.starts_with("POST /token HTTP/"),
"request: {token_one_request}"
);
let token_one_body = request_body(&token_one_request);
assert!(token_one_body.contains("grant_type=refresh_token"), "body: {token_one_body}");
assert!(token_one_body.contains("refresh_token=seed-refresh"), "body: {token_one_body}");
assert!(token_one_body.contains("client_id=hen-client"), "body: {token_one_body}");
write_http_response(
&mut token_one_stream,
200,
"OK",
Some("application/json"),
r#"{"access_token":"first-access-token","refresh_token":"next-refresh-token","token_type":"Bearer","expires_in":0}"#,
);
let (mut resource_one_stream, _) = listener.accept().expect("first resource should connect");
let resource_one_request = read_http_request(&mut resource_one_stream);
assert!(
resource_one_request.starts_with("GET /resource/one HTTP/"),
"request: {resource_one_request}"
);
assert_eq!(
request_header_value(&resource_one_request, "Authorization").as_deref(),
Some("Bearer first-access-token")
);
assert_eq!(
request_header_value(&resource_one_request, "X-Refresh-Token").as_deref(),
Some("next-refresh-token")
);
write_http_response(
&mut resource_one_stream,
200,
"OK",
Some("application/json"),
r#"{"path":"one","authorization":"Bearer first-access-token"}"#,
);
let (mut token_two_stream, _) = listener.accept().expect("second token request should connect");
let token_two_request = read_http_request(&mut token_two_stream);
assert!(
token_two_request.starts_with("POST /token HTTP/"),
"request: {token_two_request}"
);
let token_two_body = request_body(&token_two_request);
assert!(token_two_body.contains("grant_type=refresh_token"), "body: {token_two_body}");
assert!(token_two_body.contains("refresh_token=next-refresh-token"), "body: {token_two_body}");
assert!(token_two_body.contains("client_id=hen-client"), "body: {token_two_body}");
write_http_response(
&mut token_two_stream,
200,
"OK",
Some("application/json"),
r#"{"access_token":"second-access-token","token_type":"Bearer","expires_in":3600}"#,
);
let (mut resource_two_stream, _) = listener.accept().expect("second resource should connect");
let resource_two_request = read_http_request(&mut resource_two_stream);
assert!(
resource_two_request.starts_with("GET /resource/two HTTP/"),
"request: {resource_two_request}"
);
assert_eq!(
request_header_value(&resource_two_request, "Authorization").as_deref(),
Some("Bearer second-access-token")
);
write_http_response(
&mut resource_two_stream,
200,
"OK",
Some("application/json"),
r#"{"path":"two","authorization":"Bearer second-access-token"}"#,
);
});
format!("http://{}", address)
}
fn read_http_request(stream: &mut TcpStream) -> String {
let mut buffer = Vec::new();
let mut chunk = [0_u8; 1024];
let mut headers_end = None;
let mut content_length = 0usize;
loop {
let bytes_read = stream.read(&mut chunk).expect("request should be readable");
if bytes_read == 0 {
break;
}
buffer.extend_from_slice(&chunk[..bytes_read]);
if headers_end.is_none() {
if let Some(end) = find_header_terminator(&buffer) {
headers_end = Some(end);
let headers = String::from_utf8_lossy(&buffer[..end]);
content_length = parse_content_length(headers.as_ref());
}
}
if let Some(end) = headers_end {
if buffer.len() >= end + content_length {
break;
}
}
}
String::from_utf8(buffer).expect("request should be utf-8")
}
fn request_body(request: &str) -> &str {
request
.split_once("\r\n\r\n")
.map(|(_, body)| body)
.unwrap_or_default()
}
fn request_header_value(request: &str, header_name: &str) -> Option<String> {
request.lines().find_map(|line| {
let (name, value) = line.split_once(':')?;
if name.trim().eq_ignore_ascii_case(header_name) {
Some(value.trim().to_string())
} else {
None
}
})
}
fn find_header_terminator(buffer: &[u8]) -> Option<usize> {
buffer
.windows(4)
.position(|window| window == b"\r\n\r\n")
.map(|position| position + 4)
}
fn parse_content_length(headers: &str) -> usize {
headers
.lines()
.find_map(|line| {
let (name, value) = line.split_once(':')?;
if name.eq_ignore_ascii_case("content-length") {
value.trim().parse::<usize>().ok()
} else {
None
}
})
.unwrap_or(0)
}
fn write_http_response(
stream: &mut TcpStream,
status: u16,
reason: &str,
content_type: Option<&str>,
body: &str,
) {
let content_type_header = content_type
.map(|value| format!("Content-Type: {value}\r\n"))
.unwrap_or_default();
let response = format!(
"HTTP/1.1 {status} {reason}\r\n{content_type_header}Content-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
);
stream
.write_all(response.as_bytes())
.expect("response should be written");
}
#[cfg(unix)]
pub(crate) fn spawn_blocking_http_server(started_tx: mpsc::Sender<()>) -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("address should be available");
thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("connection should be accepted");
let mut buffer = [0_u8; 1024];
let _ = stream.read(&mut buffer);
started_tx
.send(())
.expect("test should observe the blocking request");
thread::sleep(Duration::from_secs(5));
let body = r#"{"ok":true}"#;
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
);
let _ = stream.write_all(response.as_bytes());
});
format!("http://{}", address)
}
#[cfg(unix)]
pub(crate) fn run_until_signal(
workspace: &TestWorkspace,
args: &[&str],
signal: &str,
started_rx: mpsc::Receiver<()>,
) -> std::process::Output {
let child = Command::new(env!("CARGO_BIN_EXE_hen"))
.args(args)
.current_dir(workspace.root())
.env("NO_COLOR", "1")
.env("TERM", "dumb")
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("hen command should execute");
if started_rx.recv_timeout(Duration::from_secs(5)).is_err() {
let output = child
.wait_with_output()
.expect("child output should be available");
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
panic!(
"second request should start before signalling\nstatus: {:?}\nstdout: {}\nstderr: {}",
output.status.code(),
stdout,
stderr,
);
}
thread::sleep(Duration::from_millis(200));
let status = Command::new("kill")
.args([signal, &child.id().to_string()])
.status()
.expect("signal command should execute");
assert!(status.success(), "failed to send signal {signal}");
child
.wait_with_output()
.expect("child output should be available")
}