use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use tokio::sync::{Mutex, Notify, RwLock, oneshot};
use bitrouter_core::api::mcp::gateway::McpClientRequestHandler;
use bitrouter_core::api::mcp::types::McpGatewayError;
use bitrouter_core::api::mcp::types::{
CallToolParams, CallToolResult, ClientCapabilities, ClientInfo, CreateMessageParams,
ElicitationCapability, ElicitationCreateParams, GetPromptParams, InitializeParams,
InitializeResult, JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
ListPromptsResult, ListResourceTemplatesResult, ListResourcesResult, ListToolsParams,
ListToolsResult, McpGetPromptResult, McpPrompt, McpResource, McpResourceContent,
McpResourceTemplate, McpTool, McpToolCallResult, ReadResourceParams, ReadResourceResult,
SamplingCapability, SseJsonRpcMessage,
};
const PROTOCOL_VERSION: &str = "2025-03-26";
static REQUEST_ID: AtomicI64 = AtomicI64::new(1);
struct McpSession {
session_id: Option<String>,
protocol_version: Option<String>,
}
pub struct NotifyHandles {
pub tool: Arc<Notify>,
pub resource: Arc<Notify>,
pub prompt: Arc<Notify>,
}
pub struct McpHttpClient {
http: reqwest::Client,
url: String,
name: String,
session: Arc<RwLock<McpSession>>,
handler: Option<Arc<dyn McpClientRequestHandler>>,
pending: Arc<RwLock<HashMap<JsonRpcId, oneshot::Sender<JsonRpcResponse>>>>,
notify: Option<NotifyHandles>,
sse_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
impl McpHttpClient {
pub fn new(
name: impl Into<String>,
url: impl Into<String>,
headers: &HashMap<String, String>,
handler: Option<Arc<dyn McpClientRequestHandler>>,
notify: Option<NotifyHandles>,
) -> Result<Self, McpGatewayError> {
let name = name.into();
let mut header_map = reqwest::header::HeaderMap::new();
for (k, v) in headers {
let header_name: reqwest::header::HeaderName =
k.parse().map_err(|e| McpGatewayError::UpstreamConnect {
name: name.clone(),
reason: format!("invalid header name '{k}': {e}"),
})?;
let header_value: reqwest::header::HeaderValue =
v.parse().map_err(|e| McpGatewayError::UpstreamConnect {
name: name.clone(),
reason: format!("invalid header value for '{k}': {e}"),
})?;
header_map.insert(header_name, header_value);
}
let http = reqwest::Client::builder()
.default_headers(header_map)
.build()
.map_err(|e| McpGatewayError::UpstreamConnect {
name: name.clone(),
reason: format!("failed to build HTTP client: {e}"),
})?;
Ok(Self {
http,
url: url.into(),
name,
session: Arc::new(RwLock::new(McpSession {
session_id: None,
protocol_version: None,
})),
handler,
pending: Arc::new(RwLock::new(HashMap::new())),
notify,
sse_task: Mutex::new(None),
})
}
async fn rpc_call(
&self,
method: &str,
params: serde_json::Value,
) -> Result<serde_json::Value, McpGatewayError> {
let (value, _headers) = self.rpc_call_with_headers(method, params).await?;
Ok(value)
}
async fn rpc_call_with_headers(
&self,
method: &str,
params: serde_json::Value,
) -> Result<(serde_json::Value, reqwest::header::HeaderMap), McpGatewayError> {
let id = JsonRpcId::Number(REQUEST_ID.fetch_add(1, Ordering::Relaxed));
let request = JsonRpcRequest {
jsonrpc: "2.0".to_owned(),
id: id.clone(),
method: method.to_owned(),
params: Some(params),
};
let (tx, rx) = oneshot::channel();
{
let mut pending = self.pending.write().await;
pending.insert(id.clone(), tx);
}
let session = self.session.read().await;
let mut builder = self
.http
.post(&self.url)
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream");
if let Some(ref sid) = session.session_id {
builder = builder.header("Mcp-Session-Id", sid);
}
if let Some(ref version) = session.protocol_version {
builder = builder.header("MCP-Protocol-Version", version);
}
drop(session);
let response =
builder
.json(&request)
.send()
.await
.map_err(|e| McpGatewayError::HttpTransport {
name: self.name.clone(),
reason: format!("failed to send {method} request: {e}"),
})?;
let status = response.status();
if status.as_u16() == 404 {
self.remove_pending(&id).await;
return Err(McpGatewayError::SessionExpired {
name: self.name.clone(),
});
}
if !status.is_success() {
self.remove_pending(&id).await;
let body = response.text().await.unwrap_or_default();
return Err(McpGatewayError::HttpTransport {
name: self.name.clone(),
reason: format!("HTTP {status} for {method}: {body}"),
});
}
let response_headers = response.headers().clone();
let content_type = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_owned();
if content_type.contains("text/event-stream") {
let body = match response.text().await {
Ok(b) => b,
Err(e) => {
self.remove_pending(&id).await;
return Err(McpGatewayError::HttpTransport {
name: self.name.clone(),
reason: format!("failed to read SSE body for {method}: {e}"),
});
}
};
self.process_sse_events(&body).await;
} else {
let rpc_response = match response.json::<JsonRpcResponse>().await {
Ok(r) => r,
Err(e) => {
self.remove_pending(&id).await;
return Err(McpGatewayError::HttpTransport {
name: self.name.clone(),
reason: format!("failed to parse JSON response for {method}: {e}"),
});
}
};
self.resolve_pending(rpc_response).await;
}
let rpc_response = rx.await.map_err(|_| McpGatewayError::HttpTransport {
name: self.name.clone(),
reason: format!("{method} response channel closed (server may have disconnected)"),
})?;
if let Some(error) = rpc_response.error {
return Err(McpGatewayError::UpstreamCall {
name: self.name.clone(),
reason: format!("{method} error ({}): {}", error.code, error.message),
});
}
let result = rpc_response.result.unwrap_or(serde_json::Value::Null);
Ok((result, response_headers))
}
async fn rpc_notify(
&self,
method: &str,
params: Option<serde_json::Value>,
) -> Result<(), McpGatewayError> {
let notification = JsonRpcNotification {
jsonrpc: "2.0".to_owned(),
method: method.to_owned(),
params,
};
let session = self.session.read().await;
let mut builder = self
.http
.post(&self.url)
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream");
if let Some(ref sid) = session.session_id {
builder = builder.header("Mcp-Session-Id", sid);
}
if let Some(ref version) = session.protocol_version {
builder = builder.header("MCP-Protocol-Version", version);
}
drop(session);
let response = builder.json(¬ification).send().await.map_err(|e| {
McpGatewayError::HttpTransport {
name: self.name.clone(),
reason: format!("failed to send notification {method}: {e}"),
}
})?;
let status = response.status();
if !status.is_success() && status.as_u16() != 202 {
let body = response.text().await.unwrap_or_default();
return Err(McpGatewayError::HttpTransport {
name: self.name.clone(),
reason: format!("notification {method} returned HTTP {status}: {body}"),
});
}
Ok(())
}
fn call_error(&self, reason: String) -> McpGatewayError {
McpGatewayError::UpstreamCall {
name: self.name.clone(),
reason,
}
}
async fn process_sse_events(&self, body: &str) {
for event_data in parse_sse_events(body) {
match serde_json::from_str::<SseJsonRpcMessage>(&event_data) {
Ok(msg) => self.dispatch_sse_message(msg).await,
Err(e) => {
tracing::warn!(
upstream = %self.name,
error = %e,
"failed to parse SSE event"
);
}
}
}
}
async fn dispatch_sse_message(&self, msg: SseJsonRpcMessage) {
match msg {
SseJsonRpcMessage::Response(resp) => {
self.resolve_pending(resp).await;
}
SseJsonRpcMessage::Request(req) => {
let response =
dispatch_server_request(&self.name, self.handler.as_deref(), req).await;
self.post_json_rpc_response(&response).await;
}
SseJsonRpcMessage::Notification(notif) => {
self.handle_notification(¬if);
}
}
}
async fn resolve_pending(&self, response: JsonRpcResponse) {
let mut pending = self.pending.write().await;
if let Some(tx) = pending.remove(&response.id) {
let _ = tx.send(response);
} else {
tracing::debug!(
upstream = %self.name,
id = ?response.id,
"received response for unknown request ID"
);
}
}
async fn remove_pending(&self, id: &JsonRpcId) {
let mut pending = self.pending.write().await;
pending.remove(id);
}
fn handle_notification(&self, notif: &JsonRpcNotification) {
if let Some(ref handles) = self.notify {
match notif.method.as_str() {
"notifications/tools/list_changed" => handles.tool.notify_one(),
"notifications/resources/list_changed" => handles.resource.notify_one(),
"notifications/prompts/list_changed" => handles.prompt.notify_one(),
_ => {
tracing::trace!(
upstream = %self.name,
method = %notif.method,
"ignoring unhandled notification"
);
}
}
}
}
async fn post_json_rpc_response(&self, response: &JsonRpcResponse) {
let session = self.session.read().await;
let mut builder = self
.http
.post(&self.url)
.header("Content-Type", "application/json");
if let Some(ref sid) = session.session_id {
builder = builder.header("Mcp-Session-Id", sid);
}
if let Some(ref version) = session.protocol_version {
builder = builder.header("MCP-Protocol-Version", version);
}
drop(session);
match builder.json(response).send().await {
Ok(resp) if resp.status().is_success() || resp.status().as_u16() == 202 => {}
Ok(resp) => {
tracing::warn!(
upstream = %self.name,
status = %resp.status(),
"unexpected status when posting response to server"
);
}
Err(e) => {
tracing::warn!(
upstream = %self.name,
error = %e,
"failed to post response to server"
);
}
}
}
async fn spawn_sse_listener(&self) {
let session = self.session.read().await;
let Some(ref session_id) = session.session_id else {
return;
};
let http = self.http.clone();
let url = self.url.clone();
let session_id = session_id.clone();
let protocol_version = session.protocol_version.clone();
drop(session);
let ctx = SseListenerContext {
name: self.name.clone(),
pending: Arc::clone(&self.pending),
handler: self.handler.clone(),
notify: self.notify.as_ref().map(|h| NotifyHandles {
tool: Arc::clone(&h.tool),
resource: Arc::clone(&h.resource),
prompt: Arc::clone(&h.prompt),
}),
http: self.http.clone(),
url: self.url.clone(),
session: Arc::clone(&self.session),
};
let handle = tokio::spawn(async move {
let mut builder = http
.get(&url)
.header("Accept", "text/event-stream")
.header("Mcp-Session-Id", &session_id);
if let Some(ref version) = protocol_version {
builder = builder.header("MCP-Protocol-Version", version);
}
let response = match builder.send().await {
Ok(r) => r,
Err(e) => {
tracing::debug!(
upstream = %ctx.name,
error = %e,
"failed to open SSE stream (server may not support GET streaming)"
);
return;
}
};
if !response.status().is_success() {
tracing::debug!(
upstream = %ctx.name,
status = %response.status(),
"SSE GET stream returned non-success status"
);
return;
}
use tokio_stream::StreamExt;
let mut byte_stream = response.bytes_stream();
let mut current_data = String::new();
let mut leftover = String::new();
loop {
let chunk = byte_stream.next().await;
let bytes = match chunk {
Some(Ok(b)) => b,
Some(Err(e)) => {
tracing::debug!(
upstream = %ctx.name,
error = %e,
"SSE stream read error"
);
break;
}
None => {
if !current_data.is_empty()
&& let Ok(msg) =
serde_json::from_str::<SseJsonRpcMessage>(¤t_data)
{
ctx.dispatch(msg).await;
}
break;
}
};
let text = match std::str::from_utf8(&bytes) {
Ok(s) => s,
Err(_) => continue,
};
leftover.push_str(text);
while let Some(newline_pos) = leftover.find('\n') {
let line: String = leftover[..newline_pos].trim_end_matches('\r').to_owned();
leftover = leftover[newline_pos + 1..].to_owned();
if let Some(data) = line.strip_prefix("data:") {
let data = data.trim_start();
if !current_data.is_empty() {
current_data.push('\n');
}
current_data.push_str(data);
} else if line.is_empty() && !current_data.is_empty() {
let event_data = std::mem::take(&mut current_data);
match serde_json::from_str::<SseJsonRpcMessage>(&event_data) {
Ok(msg) => ctx.dispatch(msg).await,
Err(e) => {
tracing::warn!(
upstream = %ctx.name,
error = %e,
"failed to parse SSE event from GET stream"
);
}
}
}
}
}
});
let mut task = self.sse_task.lock().await;
*task = Some(handle);
}
}
async fn dispatch_server_request(
server_name: &str,
handler: Option<&dyn McpClientRequestHandler>,
request: JsonRpcRequest,
) -> JsonRpcResponse {
let Some(handler) = handler else {
return JsonRpcResponse::error(
request.id,
-32601,
"server→client requests are not supported".to_owned(),
None,
);
};
let params_value = request.params.unwrap_or(serde_json::Value::Null);
match request.method.as_str() {
"sampling/createMessage" => {
match serde_json::from_value::<CreateMessageParams>(params_value) {
Ok(params) => match handler.handle_sampling(server_name, params).await {
Ok(result) => match serde_json::to_value(&result) {
Ok(v) => JsonRpcResponse::success(request.id, v),
Err(e) => JsonRpcResponse::error(
request.id,
-32603,
format!("failed to serialize sampling result: {e}"),
None,
),
},
Err(e) => JsonRpcResponse::error(request.id, e.code, e.message, e.data),
},
Err(e) => JsonRpcResponse::error(
request.id,
-32602,
format!("invalid sampling/createMessage params: {e}"),
None,
),
}
}
"elicitation/create" => {
match serde_json::from_value::<ElicitationCreateParams>(params_value) {
Ok(params) => match handler.handle_elicitation(server_name, params).await {
Ok(result) => match serde_json::to_value(&result) {
Ok(v) => JsonRpcResponse::success(request.id, v),
Err(e) => JsonRpcResponse::error(
request.id,
-32603,
format!("failed to serialize elicitation result: {e}"),
None,
),
},
Err(e) => JsonRpcResponse::error(request.id, e.code, e.message, e.data),
},
Err(e) => JsonRpcResponse::error(
request.id,
-32602,
format!("invalid elicitation/create params: {e}"),
None,
),
}
}
other => {
JsonRpcResponse::error(request.id, -32601, format!("unknown method: {other}"), None)
}
}
}
async fn post_response_to_server(
http: &reqwest::Client,
url: &str,
session: &RwLock<McpSession>,
name: &str,
response: &JsonRpcResponse,
) {
let session = session.read().await;
let mut builder = http.post(url).header("Content-Type", "application/json");
if let Some(ref sid) = session.session_id {
builder = builder.header("Mcp-Session-Id", sid);
}
if let Some(ref version) = session.protocol_version {
builder = builder.header("MCP-Protocol-Version", version);
}
drop(session);
match builder.json(response).send().await {
Ok(resp) if resp.status().is_success() || resp.status().as_u16() == 202 => {}
Ok(resp) => {
tracing::warn!(
upstream = %name,
status = %resp.status(),
"unexpected status when posting response to server"
);
}
Err(e) => {
tracing::warn!(
upstream = %name,
error = %e,
"failed to post response to server"
);
}
}
}
struct SseListenerContext {
name: String,
pending: Arc<RwLock<HashMap<JsonRpcId, oneshot::Sender<JsonRpcResponse>>>>,
handler: Option<Arc<dyn McpClientRequestHandler>>,
notify: Option<NotifyHandles>,
http: reqwest::Client,
url: String,
session: Arc<RwLock<McpSession>>,
}
impl SseListenerContext {
async fn dispatch(&self, msg: SseJsonRpcMessage) {
match msg {
SseJsonRpcMessage::Response(resp) => {
let mut p = self.pending.write().await;
if let Some(tx) = p.remove(&resp.id) {
let _ = tx.send(resp);
}
}
SseJsonRpcMessage::Request(req) => {
let response =
dispatch_server_request(&self.name, self.handler.as_deref(), req).await;
post_response_to_server(
&self.http,
&self.url,
&self.session,
&self.name,
&response,
)
.await;
}
SseJsonRpcMessage::Notification(notif) => {
if let Some(handles) = &self.notify {
match notif.method.as_str() {
"notifications/tools/list_changed" => handles.tool.notify_one(),
"notifications/resources/list_changed" => handles.resource.notify_one(),
"notifications/prompts/list_changed" => handles.prompt.notify_one(),
_ => {}
}
}
}
}
}
}
impl super::McpTransport for McpHttpClient {
async fn initialize(&self) -> Result<InitializeResult, McpGatewayError> {
let params = InitializeParams {
protocol_version: PROTOCOL_VERSION.to_owned(),
capabilities: ClientCapabilities {
sampling: self.handler.as_ref().map(|_| SamplingCapability::default()),
elicitation: self
.handler
.as_ref()
.map(|_| ElicitationCapability::default()),
},
client_info: ClientInfo {
name: "bitrouter".to_owned(),
version: Some(env!("CARGO_PKG_VERSION").to_owned()),
},
};
let params_value =
serde_json::to_value(¶ms).map_err(|e| McpGatewayError::UpstreamConnect {
name: self.name.clone(),
reason: format!("failed to serialize initialize params: {e}"),
})?;
let (result_value, response_headers) = self
.rpc_call_with_headers("initialize", params_value)
.await?;
if let Some(session_id) = response_headers
.get("mcp-session-id")
.and_then(|v| v.to_str().ok())
{
let mut session = self.session.write().await;
session.session_id = Some(session_id.to_owned());
}
let init_result: InitializeResult =
serde_json::from_value(result_value).map_err(|e| McpGatewayError::UpstreamConnect {
name: self.name.clone(),
reason: format!("failed to parse initialize result: {e}"),
})?;
{
let mut session = self.session.write().await;
session.protocol_version = Some(init_result.protocol_version.clone());
}
self.rpc_notify("notifications/initialized", None).await?;
self.spawn_sse_listener().await;
Ok(init_result)
}
async fn terminate(&self) {
{
let mut task = self.sse_task.lock().await;
if let Some(handle) = task.take() {
handle.abort();
}
}
let session = self.session.read().await;
let mut builder = self.http.delete(&self.url);
if let Some(ref sid) = session.session_id {
builder = builder.header("Mcp-Session-Id", sid);
}
let _ = builder.send().await;
}
async fn list_tools(&self) -> Result<Vec<McpTool>, McpGatewayError> {
let mut all = Vec::new();
let mut cursor: Option<String> = None;
loop {
let params = ListToolsParams {
cursor: cursor.clone(),
};
let value = self
.rpc_call(
"tools/list",
serde_json::to_value(¶ms).map_err(|e| {
self.call_error(format!("failed to serialize tools/list params: {e}"))
})?,
)
.await?;
let result: ListToolsResult = serde_json::from_value(value)
.map_err(|e| self.call_error(format!("failed to parse tools/list result: {e}")))?;
all.extend(result.tools);
cursor = result.next_cursor;
if cursor.is_none() {
break;
}
}
Ok(all)
}
async fn call_tool(
&self,
name: &str,
arguments: Option<serde_json::Map<String, serde_json::Value>>,
) -> Result<McpToolCallResult, McpGatewayError> {
let params = CallToolParams {
name: name.to_owned(),
arguments,
};
let value = self
.rpc_call(
"tools/call",
serde_json::to_value(¶ms).map_err(|e| {
self.call_error(format!("failed to serialize tools/call params: {e}"))
})?,
)
.await?;
let result: CallToolResult = serde_json::from_value(value)
.map_err(|e| self.call_error(format!("failed to parse tools/call result: {e}")))?;
Ok(result)
}
async fn list_resources(&self) -> Result<Vec<McpResource>, McpGatewayError> {
let mut all = Vec::new();
let mut cursor: Option<String> = None;
loop {
let params = serde_json::json!({ "cursor": cursor });
let value = self.rpc_call("resources/list", params).await?;
let result: ListResourcesResult = serde_json::from_value(value).map_err(|e| {
self.call_error(format!("failed to parse resources/list result: {e}"))
})?;
all.extend(result.resources);
cursor = result.next_cursor;
if cursor.is_none() {
break;
}
}
Ok(all)
}
async fn read_resource(&self, uri: &str) -> Result<Vec<McpResourceContent>, McpGatewayError> {
let params = ReadResourceParams {
uri: uri.to_owned(),
};
let value = self
.rpc_call(
"resources/read",
serde_json::to_value(¶ms).map_err(|e| {
self.call_error(format!("failed to serialize resources/read params: {e}"))
})?,
)
.await?;
let result: ReadResourceResult = serde_json::from_value(value)
.map_err(|e| self.call_error(format!("failed to parse resources/read result: {e}")))?;
Ok(result.contents)
}
async fn list_resource_templates(&self) -> Result<Vec<McpResourceTemplate>, McpGatewayError> {
let mut all = Vec::new();
let mut cursor: Option<String> = None;
loop {
let params = serde_json::json!({ "cursor": cursor });
let value = self.rpc_call("resources/templates/list", params).await?;
let result: ListResourceTemplatesResult =
serde_json::from_value(value).map_err(|e| {
self.call_error(format!(
"failed to parse resources/templates/list result: {e}"
))
})?;
all.extend(result.resource_templates);
cursor = result.next_cursor;
if cursor.is_none() {
break;
}
}
Ok(all)
}
async fn list_prompts(&self) -> Result<Vec<McpPrompt>, McpGatewayError> {
let mut all = Vec::new();
let mut cursor: Option<String> = None;
loop {
let params = serde_json::json!({ "cursor": cursor });
let value = self.rpc_call("prompts/list", params).await?;
let result: ListPromptsResult = serde_json::from_value(value).map_err(|e| {
self.call_error(format!("failed to parse prompts/list result: {e}"))
})?;
all.extend(result.prompts);
cursor = result.next_cursor;
if cursor.is_none() {
break;
}
}
Ok(all)
}
async fn get_prompt(
&self,
name: &str,
arguments: Option<HashMap<String, String>>,
) -> Result<McpGetPromptResult, McpGatewayError> {
let params = GetPromptParams {
name: name.to_owned(),
arguments,
};
let value = self
.rpc_call(
"prompts/get",
serde_json::to_value(¶ms).map_err(|e| {
self.call_error(format!("failed to serialize prompts/get params: {e}"))
})?,
)
.await?;
let result: McpGetPromptResult = serde_json::from_value(value)
.map_err(|e| self.call_error(format!("failed to parse prompts/get result: {e}")))?;
Ok(result)
}
}
fn parse_sse_events(body: &str) -> Vec<String> {
let mut events = Vec::new();
let mut current_data = String::new();
for line in body.lines() {
if let Some(data) = line.strip_prefix("data:") {
let data = data.trim_start();
if !current_data.is_empty() {
current_data.push('\n');
}
current_data.push_str(data);
} else if line.is_empty() {
if !current_data.is_empty() {
events.push(std::mem::take(&mut current_data));
}
}
}
if !current_data.is_empty() {
events.push(current_data);
}
events
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_sse_simple_json_response() {
let body = "data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
let events = parse_sse_events(body);
assert_eq!(events.len(), 1);
let resp: JsonRpcResponse =
serde_json::from_str(&events[0]).expect("should parse as response");
assert_eq!(resp.id, JsonRpcId::Number(1));
assert!(resp.result.is_some());
}
#[test]
fn parse_sse_multiple_events() {
let body = "\
data: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/tools/list_changed\"}\n\
\n\
data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"ok\":true}}\n\
\n";
let events = parse_sse_events(body);
assert_eq!(events.len(), 2);
}
#[test]
fn parse_sse_with_event_and_id_fields() {
let body = "\
event: message\n\
id: evt-1\n\
data: {\"jsonrpc\":\"2.0\",\"id\":5,\"result\":{}}\n\
\n";
let events = parse_sse_events(body);
assert_eq!(events.len(), 1);
let resp: JsonRpcResponse =
serde_json::from_str(&events[0]).expect("should parse as response");
assert_eq!(resp.id, JsonRpcId::Number(5));
}
#[test]
fn parse_sse_no_trailing_newline() {
let body = "data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}";
let events = parse_sse_events(body);
assert_eq!(events.len(), 1);
}
#[test]
fn parse_sse_empty_body() {
let events = parse_sse_events("");
assert!(events.is_empty());
}
#[test]
fn request_id_increments() {
let id1 = REQUEST_ID.fetch_add(1, Ordering::Relaxed);
let id2 = REQUEST_ID.fetch_add(1, Ordering::Relaxed);
assert!(id2 > id1);
}
#[test]
fn sse_message_discriminates_response() {
let json = r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}"#;
let msg: SseJsonRpcMessage = serde_json::from_str(json).expect("parse");
assert!(matches!(msg, SseJsonRpcMessage::Response(_)));
}
#[test]
fn sse_message_discriminates_request() {
let json = r#"{"jsonrpc":"2.0","id":2,"method":"sampling/createMessage","params":{"messages":[],"maxTokens":100}}"#;
let msg: SseJsonRpcMessage = serde_json::from_str(json).expect("parse");
assert!(matches!(msg, SseJsonRpcMessage::Request(_)));
}
#[test]
fn sse_message_discriminates_notification() {
let json = r#"{"jsonrpc":"2.0","method":"notifications/tools/list_changed"}"#;
let msg: SseJsonRpcMessage = serde_json::from_str(json).expect("parse");
assert!(matches!(msg, SseJsonRpcMessage::Notification(_)));
}
}