use super::types::*;
use anyhow::Result;
use async_trait::async_trait;
use serde_json::Value;
use std::io::{BufRead, Write};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;
use tracing::{debug, error, trace, warn};
#[async_trait]
pub trait Transport: Send + Sync {
async fn send_request(&self, request: JsonRpcRequest) -> Result<()>;
async fn send_response(&self, response: JsonRpcResponse) -> Result<()>;
async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()>;
async fn receive(&self) -> Result<Option<McpMessage>>;
async fn close(&self) -> Result<()>;
}
#[derive(Debug, Clone)]
pub enum McpMessage {
Request(JsonRpcRequest),
Response(JsonRpcResponse),
Notification(JsonRpcNotification),
}
impl McpMessage {
pub fn from_json(value: Value) -> Result<Self> {
if value.get("id").is_some() {
if value.get("method").is_some() {
let request: JsonRpcRequest = serde_json::from_value(value)?;
Ok(McpMessage::Request(request))
} else {
let response: JsonRpcResponse = serde_json::from_value(value)?;
Ok(McpMessage::Response(response))
}
} else {
let notification: JsonRpcNotification = serde_json::from_value(value)?;
Ok(McpMessage::Notification(notification))
}
}
}
pub struct StdioTransport {
#[allow(dead_code)]
tx: mpsc::Sender<String>,
rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
}
#[derive(Debug, Default, Clone)]
pub struct NullTransport;
impl NullTransport {
pub fn new() -> Self {
Self
}
}
impl Default for StdioTransport {
fn default() -> Self {
Self::new()
}
}
impl StdioTransport {
pub fn new() -> Self {
let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
let (read_tx, read_rx) = mpsc::channel::<String>(100);
std::thread::spawn(move || {
let mut stdout = std::io::stdout().lock();
while let Some(msg) = write_rx.blocking_recv() {
trace!("MCP TX: {}", msg);
if let Err(e) = writeln!(stdout, "{}", msg) {
error!("Failed to write to stdout: {}", e);
break;
}
if let Err(e) = stdout.flush() {
error!("Failed to flush stdout: {}", e);
break;
}
}
});
std::thread::spawn(move || {
let stdin = std::io::stdin();
let reader = stdin.lock();
for line in reader.lines() {
match line {
Ok(msg) if !msg.is_empty() => {
trace!("MCP RX: {}", msg);
if read_tx.blocking_send(msg).is_err() {
break;
}
}
Ok(_) => continue, Err(e) => {
error!("Failed to read from stdin: {}", e);
break;
}
}
}
});
Self {
tx: write_tx,
rx: tokio::sync::Mutex::new(read_rx),
}
}
async fn send_json(&self, value: Value) -> Result<()> {
let json = serde_json::to_string(&value)?;
self.tx.send(json).await?;
Ok(())
}
}
#[async_trait]
impl Transport for StdioTransport {
async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
self.send_json(serde_json::to_value(&request)?).await
}
async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
self.send_json(serde_json::to_value(&response)?).await
}
async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
self.send_json(serde_json::to_value(¬ification)?).await
}
async fn receive(&self) -> Result<Option<McpMessage>> {
let mut rx = self.rx.lock().await;
match rx.recv().await {
Some(line) => {
let value: Value = serde_json::from_str(&line)?;
let msg = McpMessage::from_json(value)?;
Ok(Some(msg))
}
None => Ok(None),
}
}
async fn close(&self) -> Result<()> {
Ok(())
}
}
#[async_trait]
impl Transport for NullTransport {
async fn send_request(&self, _request: JsonRpcRequest) -> Result<()> {
Ok(())
}
async fn send_response(&self, _response: JsonRpcResponse) -> Result<()> {
Ok(())
}
async fn send_notification(&self, _notification: JsonRpcNotification) -> Result<()> {
Ok(())
}
async fn receive(&self) -> Result<Option<McpMessage>> {
Ok(None)
}
async fn close(&self) -> Result<()> {
Ok(())
}
}
pub struct SseTransport {
endpoint: String,
client: reqwest::Client,
_tx: mpsc::Sender<String>,
rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
}
impl SseTransport {
pub async fn new(endpoint: String) -> Result<Self> {
let client = reqwest::Client::new();
let (write_tx, _write_rx) = mpsc::channel::<String>(100);
let (read_tx, read_rx) = mpsc::channel::<String>(100);
let endpoint_clone = endpoint.clone();
let read_tx_clone = read_tx;
tokio::spawn(async move {
debug!("SSE transport connecting to: {}", endpoint_clone);
let _ = read_tx_clone;
});
Ok(Self {
endpoint,
client,
_tx: write_tx,
rx: tokio::sync::Mutex::new(read_rx),
})
}
async fn send_json(&self, value: Value) -> Result<()> {
let json = serde_json::to_string(&value)?;
debug!("SSE TX: {}", json);
self.client
.post(&self.endpoint)
.header("Content-Type", "application/json")
.body(json)
.send()
.await?;
Ok(())
}
}
#[async_trait]
impl Transport for SseTransport {
async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
self.send_json(serde_json::to_value(&request)?).await
}
async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
self.send_json(serde_json::to_value(&response)?).await
}
async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
self.send_json(serde_json::to_value(¬ification)?).await
}
async fn receive(&self) -> Result<Option<McpMessage>> {
let mut rx = self.rx.lock().await;
match rx.recv().await {
Some(line) => {
let value: Value = serde_json::from_str(&line)?;
let msg = McpMessage::from_json(value)?;
Ok(Some(msg))
}
None => Ok(None),
}
}
async fn close(&self) -> Result<()> {
Ok(())
}
}
pub struct ProcessTransport {
_child: tokio::process::Child,
tx: mpsc::Sender<String>,
rx: tokio::sync::Mutex<mpsc::Receiver<String>>,
}
impl ProcessTransport {
pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
use tokio::process::Command;
let mut child = Command::new(command)
.args(args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("No stdout"))?;
let mut stdin = child
.stdin
.take()
.ok_or_else(|| anyhow::anyhow!("No stdin"))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| anyhow::anyhow!("No stderr"))?;
let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
let (read_tx, read_rx) = mpsc::channel::<String>(100);
tokio::spawn(async move {
let mut reader = BufReader::new(stderr);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break,
Ok(_) => {
let trimmed = line.trim();
if !trimmed.is_empty() {
warn!(target: "mcp_subprocess", "{trimmed}");
}
}
Err(_) => break,
}
}
});
tokio::spawn(async move {
while let Some(msg) = write_rx.recv().await {
trace!("Process TX: {}", msg);
if let Err(e) = stdin.write_all(format!("{}\n", msg).as_bytes()).await {
error!("Failed to write to process stdin: {}", e);
break;
}
if let Err(e) = stdin.flush().await {
error!("Failed to flush process stdin: {}", e);
break;
}
}
});
tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, Ok(_) => {
let trimmed = line.trim();
if !trimmed.is_empty() {
trace!("Process RX: {}", trimmed);
if read_tx.send(trimmed.to_string()).await.is_err() {
break;
}
}
}
Err(e) => {
error!("Failed to read from process stdout: {}", e);
break;
}
}
}
});
Ok(Self {
_child: child,
tx: write_tx,
rx: tokio::sync::Mutex::new(read_rx),
})
}
async fn send_json(&self, value: Value) -> Result<()> {
let json = serde_json::to_string(&value)?;
self.tx.send(json).await?;
Ok(())
}
}
#[async_trait]
impl Transport for ProcessTransport {
async fn send_request(&self, request: JsonRpcRequest) -> Result<()> {
self.send_json(serde_json::to_value(&request)?).await
}
async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
self.send_json(serde_json::to_value(&response)?).await
}
async fn send_notification(&self, notification: JsonRpcNotification) -> Result<()> {
self.send_json(serde_json::to_value(¬ification)?).await
}
async fn receive(&self) -> Result<Option<McpMessage>> {
let mut rx = self.rx.lock().await;
match rx.recv().await {
Some(line) => {
let value: Value = serde_json::from_str(&line)?;
let msg = McpMessage::from_json(value)?;
Ok(Some(msg))
}
None => Ok(None),
}
}
async fn close(&self) -> Result<()> {
Ok(())
}
}