use crate::Options;
use crate::stats::{RealtimeStats, Statistics};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
use bytes::Bytes;
use http::{HeaderMap, Request, StatusCode, header};
use rand::Rng;
use rmcp::model::{
CallToolRequest, CallToolRequestParams, ClientCapabilities, Implementation, InitializeRequest,
InitializeRequestParams, InitializeResult, InitializedNotification, JsonObject, JsonRpcRequest,
JsonRpcResponse, ListRootsResult, ListToolsRequest, ListToolsResult, NumberOrString,
};
use rmcp::serde_json::{self, Map, Value};
use crate::client::utils::*;
use crate::fatal;
use const_format::concatcp;
use http_body_util::{BodyExt, Full};
// MIME types used in this file for the `Content-Type` / `Accept` headers of MCP
// requests and for detecting SSE-framed responses.
const MIME_APPLICATION_JSON: &str = "application/json";
const MIME_TEXT_EVENT_STREAM: &str = "text/event-stream";
// Combined `Accept` value ("application/json, text/event-stream") sent when the
// streamable HTTP transport is used.
const MIME_APPLICATION_JSON_AND_EVENT_STREAM: &str =
concatcp!(MIME_APPLICATION_JSON, ", ", MIME_TEXT_EVENT_STREAM);
/// Result of an MCP initialization handshake, consumed by the request loop.
#[derive(Debug, Default)]
pub struct McpSetup {
/// URI associated with the session: the endpoint announced by the server for the
/// SSE transport, or a copy of the request URI for streamable HTTP.
pub uri: hyper::Uri,
/// Pre-serialized JSON-RPC `tools/call` request bodies, one per tool reported
/// by `tools/list`.
pub tool_bodies: Vec<Bytes>,
/// Background task that keeps draining the SSE stream; `None` when the
/// streamable HTTP transport is used.
pub sse_task: Option<tokio::task::JoinHandle<()>>,
/// Value of the `mcp-session-id` response header returned by `initialize`
/// (only populated by the streamable HTTP handshake in this file).
pub session_id: Option<String>,
}
/// Entry point for one MCP load-generating client.
///
/// Dispatches to [`http_hyper_mcp_client`] with the connection builder that
/// matches `opts.http2` (`Http2` when set, `Http1` otherwise) and returns the
/// statistics that client collected.
pub async fn http_hyper_mcp(
    tid: usize,
    cid: usize,
    opts: Arc<Options>,
    rt_stats: &RealtimeStats,
) -> Statistics {
    let options: &Options = &opts;
    if !options.http2 {
        return http_hyper_mcp_client::<Http1>(tid, cid, options, rt_stats).await;
    }
    http_hyper_mcp_client::<Http2>(tid, cid, options, rt_stats).await
}
/// Runs the MCP request loop for a single client and returns its statistics.
///
/// The function first resolves the target URI/endpoint and headers, performs the
/// MCP handshake (up front for the SSE transport, lazily on the first established
/// connection for streamable HTTP), and then keeps POSTing the pre-built
/// `tools/call` bodies until `should_stop` reports that the run is over.
///
/// * `tid` - thread index, only used in the connection banner.
/// * `cid` - client index; selects the URI (`cid % opts.uri.len()`).
/// * `opts` - run options (URIs, transport flags, requests per connection, ...).
/// * `rt_stats` - shared realtime counters updated alongside the local statistics.
///
/// Unrecoverable setup problems (bad URI, header construction, failed MCP
/// initialization) are reported through `fatal!`.
async fn http_hyper_mcp_client<B: HttpConnectionBuilder>(
tid: usize,
cid: usize,
opts: &Options,
rt_stats: &RealtimeStats,
) -> Statistics {
let mut statistics = Statistics::new(opts.latency);
// `total` counts every attempt, including failed connections and failed requests.
let mut total: u32 = 0;
let mut conn_req_count: u32;
let mut banner = HashSet::new();
// Clients are spread over the configured URIs round-robin by client index.
let uri_str = opts.uri[cid % opts.uri.len()].as_str();
let mut uri = uri_str
.parse::<hyper::Uri>()
.unwrap_or_else(|e| fatal!(1, "invalid uri: {e}"));
let (mut host, mut port) =
get_conn_address(&opts, &uri).unwrap_or_else(|| fatal!(1, "no host specified in uri"));
let mut endpoint = build_conn_endpoint(&host, port);
let mut headers =
build_headers(&uri, opts).unwrap_or_else(|e| fatal!(2, "could not build headers: {e}"));
let mut mcp = McpSetup::default();
// SSE transport: the handshake happens once, before the load loop, and yields the
// URI that subsequent POSTs must target.
if opts.mcp_sse {
mcp = mcp_sse_initialize::<B>(&uri_str, opts, &headers).await;
uri = mcp.uri;
// Follow the host/port of the announced endpoint unless the user pinned a host.
if opts.host.is_none() {
if let Some(h) = uri.host() {
host = h.to_owned();
}
if let Some(p) = uri.port_u16() {
port = p;
}
endpoint = build_conn_endpoint(&host, port);
}
}
headers.insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static(MIME_APPLICATION_JSON),
);
// The `Accept` header depends on the transport; `transport` is only used for the banner.
let transport = if opts.mcp_sse {
headers.insert(
header::ACCEPT,
header::HeaderValue::from_static(MIME_APPLICATION_JSON),
);
"sse"
} else {
headers.insert(
header::ACCEPT,
header::HeaderValue::from_static(MIME_APPLICATION_JSON_AND_EVENT_STREAM),
);
"streamableHttp"
};
let clock = quanta::Clock::new();
let start = Instant::now();
// Outer loop: one iteration per (re)connection.
'connection: loop {
if should_stop(total, start, &opts) {
break 'connection;
}
// Print the banner at most once per URI, and only for the first `opts.uri.len()` clients.
if cid < opts.uri.len() && !banner.contains(uri_str) {
banner.insert(uri_str.to_owned());
println!(
"hyper-mcp [{tid:>2}] -> connecting to {}:{}, method = POST uri = {} {} (transport {transport})...",
host,
port,
uri,
B::SCHEME
);
}
let (mut sender, mut conn_task) =
match B::build_connection(endpoint, &mut statistics, rt_stats, &opts).await {
Some(s) => s,
None => {
// A failed connection attempt still counts towards `total`.
total += 1;
continue 'connection;
}
};
statistics.inc_conn();
conn_req_count = 0;
// Streamable HTTP: run the handshake on the first connection only; once tool
// bodies are cached, reconnects skip it and reuse the stored session header.
if !opts.mcp_sse && mcp.tool_bodies.is_empty() {
match mcp_streamable_http_initialize(&uri, &headers, &mut sender, opts).await {
Ok(setup) => {
mcp = setup;
if let Some(ref session_id) = mcp.session_id {
headers.insert(
http::header::HeaderName::from_static("mcp-session-id"),
http::header::HeaderValue::from_str(session_id)
.unwrap_or_else(|e| fatal!(3, "invalid session id: {e}")),
);
}
}
Err(e) => {
fatal!(3, "MCP Streamable HTTP initialization failed: {e}");
}
}
}
let bodies: Vec<Full<Bytes>> = mcp.tool_bodies.clone().into_iter().map(Full::new).collect();
// Inner loop: one iteration per request on the current connection.
loop {
// Rotate through the tool bodies using the global attempt counter.
// NOTE(review): `% bodies.len()` would panic on an empty list; both handshake
// paths in this file reject an empty tool list, so this presumably cannot
// happen here - confirm before relying on the `unwrap_or_else` fallback.
let body = bodies
.get(total as usize % bodies.len())
.cloned()
.unwrap_or_else(|| Full::new(Bytes::from("")));
conn_req_count += 1;
// After `opts.rpc` requests the connection is retired with `Connection: close`.
let is_last = conn_req_count >= opts.rpc;
let mut req_headers = headers.clone();
if is_last {
req_headers.insert(header::CONNECTION, header::HeaderValue::from_static("close"));
}
let mut req = Request::new(body);
*req.method_mut() = http::Method::POST;
*req.uri_mut() = uri.clone();
*req.headers_mut() = req_headers;
// Only sample the clock when latency tracking is enabled.
let start_lat = opts.latency.then_some(clock.raw());
match sender.send_request(req).await {
// 200 and 202 count as success; any other status is recorded as an HTTP status.
Ok(res) => match discard_body(res).await {
Ok(StatusCode::OK) => statistics.inc_ok(rt_stats),
Ok(StatusCode::ACCEPTED) => statistics.inc_ok(rt_stats),
Ok(code) => statistics.set_http_status(code, rt_stats),
Err(ref err) => {
statistics.set_error(err.as_ref(), rt_stats);
total += 1;
continue 'connection;
}
},
Err(ref err) => {
statistics.set_error(err, rt_stats);
total += 1;
continue 'connection;
}
}
// Latency is recorded only for requests whose response was fully consumed;
// the nanosecond delta is divided by 1000 before being stored.
if let Some(start_lat) = start_lat
&& let Some(hist) = &mut statistics.latency
{
hist.record(clock.delta_as_nanos(start_lat, clock.raw()) / 1000)
.ok();
};
total += 1;
if should_stop(total, start, &opts) {
break 'connection;
}
if is_last {
conn_task.abort();
continue 'connection;
} else {
// Wait until the sender can take another request, or reconnect if the
// connection task finishes first.
tokio::select! {
res = sender.ready() => {
if let Err(ref err) = res {
statistics.set_error(err, rt_stats);
continue 'connection;
}
}
_ = &mut conn_task => {
continue 'connection;
}
}
}
}
}
statistics
}
fn create_tool_request(arguments: &Arc<JsonObject>, opts: &Options) -> Option<Map<String, Value>> {
let required = arguments.get("required").and_then(|v| v.as_array())?;
let properties = arguments.get("properties").and_then(|v| v.as_object())?;
let mut generated_request_args = Map::new();
let mut rng = rand::thread_rng();
for required_arg in required {
let field_name = required_arg.as_str()?;
let field_schema = properties.get(field_name)?;
let value = generate_value_from_schema(field_schema, &mut rng, opts, 0);
generated_request_args.insert(field_name.to_string(), value);
}
Some(generated_request_args)
}
/// Maximum schema nesting depth followed by [`generate_value_from_schema`].
const MAX_RECURSION_DEPTH: usize = 10;

/// Generates a random JSON value matching the shape described by `schema`.
///
/// * A `"default"` entry, when present, is returned verbatim.
/// * The `"type"` entry selects the generator; a missing or non-string type is
///   treated as `"string"`, and an unrecognized type yields an empty string.
/// * Strings use `opts.mcp_rand_string_len` when set, otherwise a random length
///   in `5..20`.
/// * Objects only receive their `"required"` fields; arrays get 1 to 3 items.
///
/// Once `depth` reaches [`MAX_RECURSION_DEPTH`] the function returns `Value::Null`
/// instead of recursing further.
fn generate_value_from_schema<R: Rng>(
    schema: &Value,
    rng: &mut R,
    opts: &Options,
    depth: usize,
) -> Value {
    if depth >= MAX_RECURSION_DEPTH {
        return Value::Null;
    }
    if let Some(preset) = schema.get("default") {
        return preset.clone();
    }

    let kind = schema
        .get("type")
        .and_then(Value::as_str)
        .unwrap_or("string");

    match kind {
        "string" => {
            let length = opts
                .mcp_rand_string_len
                .unwrap_or_else(|| rng.gen_range(5..20));
            let mut text = String::new();
            for _ in 0..length {
                text.push(rng.sample(rand::distributions::Alphanumeric) as char);
            }
            Value::String(text)
        }
        "number" => Value::Number(rng.gen_range(-1000..1000i64).into()),
        "integer" => Value::Number(rng.gen_range(0..1000).into()),
        "boolean" => Value::Bool(rng.gen_bool(0.5)),
        "array" => {
            let count = rng.gen_range(1..4);
            let element_schema = schema.get("items");
            let mut elements = Vec::new();
            for _ in 0..count {
                // Without an `items` schema, fall back to an arbitrary primitive.
                let element = match element_schema {
                    Some(inner) => generate_value_from_schema(inner, rng, opts, depth + 1),
                    None => generate_primitive_value(rng),
                };
                elements.push(element);
            }
            Value::Array(elements)
        }
        "object" => {
            let property_schemas = schema.get("properties").and_then(Value::as_object);
            let required_names = schema.get("required").and_then(Value::as_array);
            let mut fields = Map::new();
            if let (Some(property_schemas), Some(required_names)) =
                (property_schemas, required_names)
            {
                // Non-string entries and names without a property schema are skipped.
                for name in required_names.iter().filter_map(Value::as_str) {
                    if let Some(field_schema) = property_schemas.get(name) {
                        let generated =
                            generate_value_from_schema(field_schema, rng, opts, depth + 1);
                        fields.insert(name.to_string(), generated);
                    }
                }
            }
            Value::Object(fields)
        }
        "null" => Value::Null,
        _ => Value::String(String::new()),
    }
}
/// Returns an arbitrary primitive JSON value, picked uniformly among four kinds:
/// the fixed string `"sample"`, a signed number in `-100..100`, a non-negative
/// number in `0..100`, or a random boolean.
fn generate_primitive_value<R: Rng>(rng: &mut R) -> Value {
    let kind = rng.gen_range(0..4);
    if kind == 0 {
        Value::String(String::from("sample"))
    } else if kind == 1 {
        Value::Number(rng.gen_range(-100..100i64).into())
    } else if kind == 2 {
        Value::Number(rng.gen_range(0..100).into())
    } else if kind == 3 {
        Value::Bool(rng.gen_bool(0.5))
    } else {
        // Not produced by the `0..4` range above; kept as a defensive fallback.
        Value::Null
    }
}
/// Performs the MCP handshake for the streamable HTTP transport on `sender`.
///
/// The sequence is:
/// 1. POST `initialize` (JSON-RPC id 1) and remember the `mcp-session-id`
///    response header, if any.
/// 2. POST the `initialized` notification (with the session header attached).
/// 3. POST `tools/list` (JSON-RPC id 2) and pre-serialize one `tools/call`
///    request body per returned tool.
///
/// Responses may arrive either as plain JSON or framed as `text/event-stream`;
/// the latter is unwrapped with `extract_sse_data`.
///
/// # Parameters
/// * `uri` - target of every POST; also copied into the returned [`McpSetup`].
/// * `base_headers` - headers applied to every request.
/// * `sender` - an already established connection.
/// * `opts` - run options, forwarded to the argument generator.
///
/// # Errors
/// Returns an error when a request fails, a response has an unexpected status,
/// a body cannot be read or parsed, the session id is not a valid header value,
/// or the server reports no tools. A failure to serialize a `tools/call` request
/// is reported through `fatal!` instead.
async fn mcp_streamable_http_initialize<S>(
    uri: &hyper::Uri,
    base_headers: &http::HeaderMap,
    sender: &mut S,
    opts: &Options,
) -> Result<McpSetup, Box<dyn std::error::Error + Send + Sync>>
where
    S: RequestSender<Full<Bytes>>,
{
    // --- Step 1: initialize -------------------------------------------------
    let init_params = InitializeRequestParams::new(
        ClientCapabilities::builder().enable_roots().build(),
        Implementation::new("plumbrs".to_string(), env!("CARGO_PKG_VERSION").to_string()),
    );
    let init_request = InitializeRequest::new(init_params);
    let init_jsonrpc = JsonRpcRequest {
        jsonrpc: Default::default(),
        id: NumberOrString::Number(1),
        request: init_request,
    };
    let init_body = serde_json::to_vec(&init_jsonrpc)?;
    let mut req = Request::new(Full::new(Bytes::from(init_body)));
    *req.method_mut() = http::Method::POST;
    *req.uri_mut() = uri.clone();
    *req.headers_mut() = base_headers.clone();
    let response = sender
        .send_request(req)
        .await
        .map_err(|e| format!("initialize request failed: {e}"))?;

    // The session id (if the server assigns one) must accompany all later requests.
    let session_id = response
        .headers()
        .get("mcp-session-id")
        .and_then(|v| v.to_str().ok())
        .map(|s| s.to_string());
    if response.status() != StatusCode::OK {
        return Err(format!(
            "initialize request failed with status: {}",
            response.status()
        )
        .into());
    }
    // A missing or unreadable content type is treated as plain JSON.
    let content_type = response
        .headers()
        .get(http::header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or(MIME_APPLICATION_JSON)
        .to_string();
    let body_bytes = response
        .into_body()
        .collect()
        .await
        .map_err(|e| format!("failed to read initialize response body: {e}"))?
        .to_bytes();
    let json_body = if content_type.contains(MIME_TEXT_EVENT_STREAM) {
        let body_str = String::from_utf8_lossy(&body_bytes);
        extract_sse_data(&body_str)?
    } else {
        String::from_utf8_lossy(&body_bytes).to_string()
    };
    // Parsed only to validate the response shape; the result itself is unused.
    let _init_result: JsonRpcResponse<InitializeResult> = serde_json::from_str(&json_body)
        .map_err(|e| format!("failed to parse initialize response: {e}"))?;
    eprintln!(
        "MCP Streamable HTTP: initialized successfully, session_id={:?}",
        session_id
    );

    let mut headers_with_session = base_headers.clone();
    if let Some(ref sid) = session_id {
        headers_with_session.insert(
            http::header::HeaderName::from_static("mcp-session-id"),
            http::header::HeaderValue::from_str(sid)?,
        );
    }

    // --- Step 2: initialized notification -----------------------------------
    let initialized_notif = InitializedNotification::default();
    let notif_jsonrpc = rmcp::model::JsonRpcNotification {
        jsonrpc: Default::default(),
        notification: initialized_notif,
    };
    let initialized_body = serde_json::to_vec(&notif_jsonrpc)?;
    let mut req = Request::new(Full::new(Bytes::from(initialized_body)));
    *req.method_mut() = http::Method::POST;
    *req.uri_mut() = uri.clone();
    *req.headers_mut() = headers_with_session.clone();
    // The connection is reused, so wait until it can accept the next request.
    sender
        .ready()
        .await
        .map_err(|e| format!("sender not ready: {e}"))?;
    let response = sender
        .send_request(req)
        .await
        .map_err(|e| format!("initialized notification failed: {e}"))?;
    if response.status() != StatusCode::OK
        && response.status() != StatusCode::ACCEPTED
        && response.status() != StatusCode::NO_CONTENT
    {
        return Err(format!(
            "initialized notification failed with status: {}",
            response.status()
        )
        .into());
    }
    // Best-effort drain of the notification response body; its content is irrelevant.
    let _ = response.into_body().collect().await;

    // --- Step 3: tools/list --------------------------------------------------
    let tools_list_request = ListToolsRequest::default();
    let tools_list_jsonrpc = JsonRpcRequest {
        jsonrpc: Default::default(),
        id: NumberOrString::Number(2),
        request: tools_list_request,
    };
    let tools_list_body = serde_json::to_vec(&tools_list_jsonrpc)?;
    let mut req = Request::new(Full::new(Bytes::from(tools_list_body)));
    *req.method_mut() = http::Method::POST;
    *req.uri_mut() = uri.clone();
    *req.headers_mut() = headers_with_session;
    sender
        .ready()
        .await
        .map_err(|e| format!("sender not ready: {e}"))?;
    let response = sender
        .send_request(req)
        .await
        .map_err(|e| format!("tools/list request failed: {e}"))?;
    if response.status() != StatusCode::OK {
        return Err(format!(
            "tools/list request failed with status: {}",
            response.status()
        )
        .into());
    }
    let content_type = response
        .headers()
        .get(http::header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or(MIME_APPLICATION_JSON)
        .to_string();
    let body_bytes = response
        .into_body()
        .collect()
        .await
        .map_err(|e| format!("failed to read tools/list response body: {e}"))?
        .to_bytes();
    let json_body = if content_type.contains(MIME_TEXT_EVENT_STREAM) {
        let body_str = String::from_utf8_lossy(&body_bytes);
        extract_sse_data(&body_str)?
    } else {
        String::from_utf8_lossy(&body_bytes).to_string()
    };
    let tools_result: JsonRpcResponse<ListToolsResult> = serde_json::from_str(&json_body)
        .map_err(|e| format!("failed to parse tools/list response: {e}"))?;
    let tools = &tools_result.result.tools;
    if tools.is_empty() {
        return Err("no tools available from MCP server".into());
    }
    eprintln!(
        "MCP Streamable HTTP: found {} tools: {:?}",
        tools.len(),
        tools.iter().map(|t| t.name.as_ref()).collect::<Vec<_>>()
    );

    // Pre-serialize one `tools/call` request per tool; ids start at 100.
    let tool_bodies: Vec<Bytes> = tools
        .iter()
        .enumerate()
        .map(|(idx, tool)| {
            let call_params = if let Some(args) = create_tool_request(&tool.input_schema, opts) {
                CallToolRequestParams::new(tool.name.clone()).with_arguments(args)
            } else {
                CallToolRequestParams::new(tool.name.clone())
            };
            let call_request = CallToolRequest::new(call_params);
            let call_jsonrpc = JsonRpcRequest {
                jsonrpc: Default::default(),
                id: NumberOrString::Number((idx + 100) as i64),
                request: call_request,
            };
            let json = serde_json::to_vec(&call_jsonrpc).unwrap_or_else(|e| {
                fatal!(
                    3,
                    "failed to serialize tools/call request for {}: {e}",
                    tool.name
                )
            });
            Bytes::from(json)
        })
        .collect();

    Ok(McpSetup {
        uri: uri.clone(),
        tool_bodies,
        sse_task: None,
        session_id,
    })
}
/// Extracts the payload of every `data:` line from an SSE-framed response body.
///
/// A single leading space after `data:` is dropped, and each payload is
/// followed by a newline in the returned string. All other lines (event names,
/// ids, blank separators) are ignored.
///
/// # Errors
/// Returns an error when the body contains no `data:` line at all.
fn extract_sse_data(body: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    let payload: String = body
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .flat_map(|rest| [rest.strip_prefix(' ').unwrap_or(rest), "\n"])
        .collect();

    // Every matched line contributes at least its newline, so an empty payload
    // means that no `data:` line was present.
    if payload.is_empty() {
        Err("no data field found in SSE response".into())
    } else {
        Ok(payload)
    }
}
/// Reads the next SSE event that carries data from `body`.
///
/// `buffer` holds text received but not yet consumed; it persists between
/// calls so that bytes following the returned event are not lost. Complete
/// lines are taken from the buffer first, and more frames are pulled from
/// `body` only when no full line is available.
///
/// The payloads of the event's `data:` lines are returned, each followed by a
/// newline. Blank lines before any data are skipped, and non-`data:` lines are
/// ignored.
///
/// # Errors
/// Returns an error when reading a frame fails or when the stream ends before
/// an event is completed.
async fn read_sse_event(
    body: &mut hyper::body::Incoming,
    buffer: &mut String,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    let mut payload = String::new();
    loop {
        let Some(newline_at) = buffer.find('\n') else {
            // No complete line buffered yet: pull the next frame from the stream.
            let frame = match body.frame().await {
                Some(Ok(frame)) => frame,
                Some(Err(e)) => return Err(format!("SSE read error: {e}").into()),
                None => return Err("SSE stream ended".into()),
            };
            if let Some(chunk) = frame.data_ref() {
                buffer.push_str(&String::from_utf8_lossy(chunk));
            }
            continue;
        };

        let raw_line: String = buffer.drain(..=newline_at).collect();
        let line = raw_line.trim_end();

        // A blank line terminates the event, but only once some data was collected.
        if line.is_empty() {
            if payload.is_empty() {
                continue;
            }
            return Ok(payload);
        }

        if let Some(rest) = line.strip_prefix("data:") {
            payload.push_str(rest.strip_prefix(' ').unwrap_or(rest));
            payload.push('\n');
        }
    }
}
pub async fn mcp_sse_initialize<B>(uri: &str, opts: &Options, headers: &HeaderMap) -> McpSetup
where
B: HttpConnectionBuilder,
{
use crate::stats::{RealtimeStats, Statistics};
let base_uri: hyper::Uri = uri
.parse()
.unwrap_or_else(|e| fatal!(3, "invalid base uri: {e}"));
let host = base_uri
.host()
.unwrap_or_else(|| fatal!(3, "no host in uri"))
.to_owned();
let port = base_uri.port_u16().unwrap_or(80);
let endpoint: &'static str = build_conn_endpoint(&host, port);
let mut stats = Statistics::new(false);
let rt_stats = RealtimeStats::default();
let (mut sse_sender, sse_conn_task) =
B::build_connection::<Full<Bytes>>(endpoint, &mut stats, &rt_stats, opts)
.await
.unwrap_or_else(|| fatal!(3, "SSE connection failed"));
let mut sse_req_builder = Request::builder()
.method(http::Method::GET)
.uri(base_uri.clone())
.header(http::header::HOST, format!("{}:{}", host, port))
.header(http::header::ACCEPT, MIME_TEXT_EVENT_STREAM)
.header(http::header::CACHE_CONTROL, "no-cache");
for (key, value) in headers.iter() {
sse_req_builder = sse_req_builder.header(key, value);
}
let sse_req = sse_req_builder
.body(Full::new(Bytes::new()))
.unwrap_or_else(|e| fatal!(3, "failed to build SSE request: {e}"));
let sse_response = sse_sender
.send_request(sse_req)
.await
.unwrap_or_else(|e| fatal!(3, "SSE handshake request failed: {e}"));
let mut sse_body = sse_response.into_body();
let mut buffer = String::new();
let endpoint_data = read_sse_event(&mut sse_body, &mut buffer)
.await
.unwrap_or_else(|e| fatal!(3, "failed to read SSE handshake event: {e}"));
let new_path = endpoint_data.trim().to_string();
if new_path.is_empty() {
fatal!(3, "could not find endpoint in SSE handshake");
}
let new_uri = if new_path.starts_with("http://") || new_path.starts_with("https://") {
new_path
.parse::<hyper::Uri>()
.unwrap_or_else(|e| fatal!(3, "invalid uri from SSE: {e}"))
} else {
let mut parts = base_uri.clone().into_parts();
parts.path_and_query = Some(
new_path
.parse()
.unwrap_or_else(|e| fatal!(3, "invalid path from SSE: {e}")),
);
hyper::Uri::from_parts(parts).unwrap_or_else(|e| fatal!(3, "invalid new uri: {e}"))
};
let (mut post_sender, _) =
Http1::build_connection::<Full<Bytes>>(endpoint, &mut stats, &rt_stats, opts)
.await
.unwrap_or_else(|| fatal!(3, "POST connection failed"));
async fn send_post<S>(
uri: &hyper::Uri,
headers: &HeaderMap,
body: Vec<u8>,
sender: &mut S,
) -> http::Response<hyper::body::Incoming>
where
S: RequestSender<Full<Bytes>>,
{
let mut req_builder = Request::builder()
.method(http::Method::POST)
.uri(uri.clone())
.header(http::header::CONTENT_TYPE, MIME_APPLICATION_JSON);
for (key, value) in headers.iter() {
req_builder = req_builder.header(key, value);
}
let req = req_builder
.body(Full::new(Bytes::from(body)))
.unwrap_or_else(|e| fatal!(3, "failed to build POST request: {e}"));
sender
.send_request(req)
.await
.unwrap_or_else(|e| fatal!(3, "POST request failed: {e}"))
}
let init_params = InitializeRequestParams::new(
ClientCapabilities::builder().enable_roots().build(),
Implementation::new("plumbrs".to_string(), env!("CARGO_PKG_VERSION").to_string()),
);
let init_request = InitializeRequest::new(init_params);
let init_jsonrpc = JsonRpcRequest {
jsonrpc: Default::default(),
id: NumberOrString::Number(1),
request: init_request,
};
let init_body = serde_json::to_vec(&init_jsonrpc)
.unwrap_or_else(|e| fatal!(3, "failed to serialize initialize request: {e}"));
let _ = send_post(&new_uri, headers, init_body, &mut post_sender).await;
let init_response_body = read_sse_event(&mut sse_body, &mut buffer)
.await
.unwrap_or_else(|e| fatal!(3, "failed to read initialize response: {e}"));
let init_result: JsonRpcResponse<InitializeResult> = serde_json::from_str(&init_response_body)
.unwrap_or_else(|e| fatal!(3, "failed to parse initialize response: {e}"));
let _ = init_result.result;
eprintln!("MCP: initialized successfully");
let initialized_notif = InitializedNotification::default();
let notif_jsonrpc = rmcp::model::JsonRpcNotification {
jsonrpc: Default::default(),
notification: initialized_notif,
};
let initialized_body = serde_json::to_vec(¬if_jsonrpc)
.unwrap_or_else(|e| fatal!(3, "failed to serialize initialized notification: {e}"));
let _ = send_post(&new_uri, headers, initialized_body, &mut post_sender).await;
let tools_list_request = ListToolsRequest::default();
let tools_list_jsonrpc = JsonRpcRequest {
jsonrpc: Default::default(),
id: NumberOrString::Number(2),
request: tools_list_request,
};
let tools_list_body = serde_json::to_vec(&tools_list_jsonrpc)
.unwrap_or_else(|e| fatal!(3, "failed to serialize tools/list request: {e}"));
let _ = send_post(&new_uri, headers, tools_list_body, &mut post_sender).await;
let tools_response_body;
loop {
let data = read_sse_event(&mut sse_body, &mut buffer)
.await
.unwrap_or_else(|e| fatal!(3, "failed to read SSE event (tools/list): {e}"));
if let Ok(server_req) = serde_json::from_str::<serde_json::Value>(&data) {
if let Some(method) = server_req.get("method").and_then(|m| m.as_str()) {
if let Some(req_id) = server_req.get("id") {
if method == "roots/list" {
let roots_result = ListRootsResult::new(vec![]);
let roots_response = JsonRpcResponse {
jsonrpc: Default::default(),
id: req_id
.as_i64()
.map(NumberOrString::Number)
.or_else(|| {
req_id.as_str().map(|s| NumberOrString::String(s.into()))
})
.unwrap_or(NumberOrString::Number(0)),
result: roots_result,
};
let roots_body = serde_json::to_vec(&roots_response).unwrap_or_else(|e| {
fatal!(3, "failed to serialize roots/list response: {e}")
});
let _ = send_post(&new_uri, headers, roots_body, &mut post_sender).await;
continue; }
}
}
if server_req.get("method").is_none() {
tools_response_body = data;
break;
}
}
}
let tools_result: JsonRpcResponse<ListToolsResult> = serde_json::from_str(&tools_response_body)
.unwrap_or_else(|e| fatal!(3, "failed to parse tools/list response: {e}"));
let tools = &tools_result.result.tools;
if tools.is_empty() {
fatal!(3, "no tools available from MCP server");
}
eprintln!(
"MCP: found {} tools: {:?}",
tools.len(),
tools.iter().map(|t| t.name.as_ref()).collect::<Vec<_>>()
);
let tool_bodies: Vec<Bytes> = tools
.iter()
.enumerate()
.map(|(idx, tool)| {
let call_params = if let Some(args) = create_tool_request(&tool.input_schema, opts) {
CallToolRequestParams::new(tool.name.clone()).with_arguments(args)
} else {
CallToolRequestParams::new(tool.name.clone())
};
let call_request = CallToolRequest::new(call_params);
let call_jsonrpc = JsonRpcRequest {
jsonrpc: Default::default(),
id: NumberOrString::Number((idx + 100) as i64),
request: call_request,
};
let json = serde_json::to_vec(&call_jsonrpc).unwrap_or_else(|e| {
fatal!(
3,
"failed to serialize tools/call request for {}: {e}",
tool.name
)
});
Bytes::from(json)
})
.collect();
let sse_task = tokio::spawn(async move {
let _conn = sse_conn_task;
while let Some(frame) = sse_body.frame().await {
if let Err(e) = frame {
eprintln!("SSE: error reading frame: {e}");
}
}
eprintln!("SSE: connection closed.");
});
McpSetup {
uri: new_uri,
tool_bodies,
sse_task: Some(sse_task),
session_id: None,
}
}