use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use serde_json::Value;
use tokio_tungstenite::{connect_async, tungstenite::Message};
pub struct McpHttpClient {
base_url: String,
client: reqwest::Client,
}
impl McpHttpClient {
pub fn new(base_url: String) -> Self {
Self {
base_url,
client: reqwest::Client::new(),
}
}
pub async fn call_capability(&self, method: &str, params: Value) -> Result<Value> {
let url = format!("{}/mcp/call", self.base_url);
let payload = serde_json::json!({
"method": method,
"params": params
});
let response = self.client.post(&url).json(&payload).send().await?;
let result: Value = response.json().await?;
Ok(result)
}
pub async fn list_capabilities(&self) -> Result<Value> {
let url = format!("{}/mcp/capabilities/list", self.base_url);
let response = self.client.get(&url).send().await?;
let result: Value = response.json().await?;
Ok(result)
}
pub async fn discover_capabilities(&self, requested: Option<Vec<String>>) -> Result<Value> {
let url = format!("{}/mcp/capabilities/discover", self.base_url);
let payload = match requested {
Some(req) => serde_json::json!({ "requested": req }),
None => serde_json::json!({}),
};
let response = self.client.post(&url).json(&payload).send().await?;
let result: Value = response.json().await?;
Ok(result)
}
pub async fn ping(&self) -> Result<Value> {
let url = format!("{}/mcp/ping", self.base_url);
let response = self.client.get(&url).send().await?;
let result: Value = response.json().await?;
Ok(result)
}
}
pub struct McpWebSocketClient {
ws_url: String,
}
impl McpWebSocketClient {
pub fn new(ws_url: String) -> Self {
Self { ws_url }
}
pub async fn call_capability(&self, method: &str, params: Value) -> Result<Value> {
let (mut ws_stream, _) = connect_async(&self.ws_url).await?;
let request_id = uuid::Uuid::new_v4().to_string();
let request = serde_json::json!({
"id": request_id,
"method": method,
"params": params
});
ws_stream.send(Message::Text(request.to_string())).await?;
if let Some(msg) = ws_stream.next().await {
match msg? {
Message::Text(text) => {
let response: Value = serde_json::from_str(&text)?;
if let Some(error_obj) = response.get("error") {
return Err(anyhow::anyhow!("MCP Error: {:?}", error_obj));
}
if let Some(result) = response.get("result") {
Ok(result.clone())
} else {
Ok(response)
}
}
Message::Close(_) => Err(anyhow::anyhow!("Connection closed by server")),
_ => Err(anyhow::anyhow!("Unexpected message type")),
}
} else {
Err(anyhow::anyhow!("No response received"))
}
}
pub async fn connect_and_listen<F>(&self, mut handler: F) -> Result<()>
where
F: FnMut(Value) -> Result<()> + Send + 'static,
{
let (mut ws_stream, _) = connect_async(&self.ws_url).await?;
while let Some(msg) = ws_stream.next().await {
match msg? {
Message::Text(text) => {
let value: Value = serde_json::from_str(&text)?;
handler(value)?;
}
Message::Close(frame) => {
if let Some(close_frame) = frame {
println!("Connection closed: {}", close_frame.reason);
} else {
println!("Connection closed by server");
}
break;
}
_ => {
}
}
}
Ok(())
}
}
pub struct McpSseClient {
sse_url: String,
client: reqwest::Client,
}
impl McpSseClient {
pub fn new(sse_url: String) -> Self {
Self {
sse_url,
client: reqwest::Client::new(),
}
}
pub async fn subscribe_events<F>(&self, mut handler: F) -> Result<()>
where
F: FnMut(Value) -> Result<()> + Send + 'static,
{
use eventsource_stream::Eventsource;
let response = self.client.get(&self.sse_url).send().await?;
let stream = response.bytes_stream().eventsource();
tokio::pin!(stream);
while let Some(event_result) = stream.next().await {
match event_result {
Ok(event) => {
let data: Value = serde_json::from_str(&event.data)?;
handler(data)?;
}
Err(e) => {
eprintln!("SSE Error: {}", e);
break;
}
}
}
Ok(())
}
}
#[derive(Default)]
pub struct McpStdioClient;
impl McpStdioClient {
pub fn new() -> Self {
Self
}
pub fn call_capability_sync(&self, method: &str, params: Value) -> Result<Value> {
let request = serde_json::json!({
"method": method,
"params": params
});
println!("{}", request);
let mut input = String::new();
std::io::stdin().read_line(&mut input)?;
let response: Value = serde_json::from_str(input.trim())?;
if let Some(error_obj) = response.get("error") {
return Err(anyhow::anyhow!("MCP Error: {:?}", error_obj));
}
if let Some(result) = response.get("result") {
Ok(result.clone())
} else {
Ok(response)
}
}
}
pub enum McpTransport {
Http(McpHttpClient),
WebSocket(McpWebSocketClient),
Sse(McpSseClient),
Stdio(McpStdioClient),
}
impl McpTransport {
pub fn new_http(base_url: String) -> Self {
McpTransport::Http(McpHttpClient::new(base_url))
}
pub fn new_websocket(ws_url: String) -> Self {
McpTransport::WebSocket(McpWebSocketClient::new(ws_url))
}
pub fn new_sse(sse_url: String) -> Self {
McpTransport::Sse(McpSseClient::new(sse_url))
}
pub fn new_stdio() -> Self {
McpTransport::Stdio(McpStdioClient::new())
}
pub async fn call_capability(&self, method: &str, params: Value) -> Result<Value> {
match self {
McpTransport::Http(client) => client.call_capability(method, params).await,
McpTransport::WebSocket(client) => client.call_capability(method, params).await,
McpTransport::Sse(_) => Err(anyhow::anyhow!(
"SSE transport doesn't support direct calls"
)),
McpTransport::Stdio(client) => client.call_capability_sync(method, params),
}
}
pub async fn list_capabilities(&self) -> Result<Value> {
match self {
McpTransport::Http(client) => client.list_capabilities().await,
McpTransport::WebSocket(_) => Err(anyhow::anyhow!(
"WebSocket transport doesn't support listing capabilities directly"
)),
McpTransport::Sse(_) => Err(anyhow::anyhow!(
"SSE transport doesn't support listing capabilities"
)),
McpTransport::Stdio(_) => Err(anyhow::anyhow!(
"Stdio transport doesn't support listing capabilities"
)),
}
}
pub async fn discover_capabilities(&self, requested: Option<Vec<String>>) -> Result<Value> {
match self {
McpTransport::Http(client) => client.discover_capabilities(requested).await,
McpTransport::WebSocket(_) => Err(anyhow::anyhow!(
"WebSocket transport doesn't support discovery directly"
)),
McpTransport::Sse(_) => Err(anyhow::anyhow!("SSE transport doesn't support discovery")),
McpTransport::Stdio(_) => {
Err(anyhow::anyhow!("Stdio transport doesn't support discovery"))
}
}
}
pub async fn ping(&self) -> Result<Value> {
match self {
McpTransport::Http(client) => client.ping().await,
_ => Err(anyhow::anyhow!("Ping only supported via HTTP transport")),
}
}
}