use std::future::Future;
use std::pin::Pin;
use bytes::Bytes;
use serde_json::Value;
use tracing::{debug, trace};
use turbomcp_protocol::{
InitializeRequest, InitializeResult, MessageId,
jsonrpc::{
JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, JsonRpcResponsePayload,
JsonRpcVersion,
},
};
use turbomcp_transport::{
ChildProcessConfig, ChildProcessTransport, Transport, TransportMessage,
core::TransportMessageMetadata,
};
use uuid::Uuid;
use crate::error::{ProxyError, ProxyResult};
use super::McpBackend;
/// MCP backend that exchanges JSON-RPC messages with a spawned child process
/// through a [`ChildProcessTransport`].
pub struct StdioBackend {
    /// Transport connected to the child process; every request and
    /// notification is sent through it.
    transport: ChildProcessTransport,
    /// Counter from which JSON-RPC request ids are drawn (constructors start it at 1).
    next_id: std::sync::atomic::AtomicU64,
}
impl StdioBackend {
pub async fn new(command: impl Into<String>, args: Vec<String>) -> ProxyResult<Self> {
let config = ChildProcessConfig {
command: command.into(),
args,
working_directory: None,
environment: None,
..Default::default()
};
let transport = ChildProcessTransport::new(config);
transport
.connect()
.await
.map_err(|e| ProxyError::backend(format!("Failed to connect to subprocess: {e}")))?;
Ok(Self {
transport,
next_id: std::sync::atomic::AtomicU64::new(1),
})
}
pub async fn with_working_dir(
command: impl Into<String>,
args: Vec<String>,
working_dir: String,
) -> ProxyResult<Self> {
let config = ChildProcessConfig {
command: command.into(),
args,
working_directory: Some(working_dir),
environment: None,
..Default::default()
};
let transport = ChildProcessTransport::new(config);
transport
.connect()
.await
.map_err(|e| ProxyError::backend(format!("Failed to connect to subprocess: {e}")))?;
Ok(Self {
transport,
next_id: std::sync::atomic::AtomicU64::new(1),
})
}
fn next_message_id(&self) -> u64 {
self.next_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}
async fn send_request(&self, method: &str, params: Value) -> ProxyResult<Value> {
let id = self.next_message_id();
let request = JsonRpcRequest {
jsonrpc: JsonRpcVersion,
#[allow(clippy::cast_possible_wrap)]
id: MessageId::Number(id as i64),
method: method.to_string(),
params: Some(params),
};
trace!(method = %method, id = %id, "Sending introspection request");
let request_json = serde_json::to_string(&request)
.map_err(|e| ProxyError::backend(format!("Failed to serialize request: {e}")))?;
let message = TransportMessage {
id: turbomcp_protocol::MessageId::String(Uuid::new_v4().to_string()),
payload: Bytes::from(request_json.into_bytes()),
metadata: TransportMessageMetadata::default(),
};
self.transport
.send(message)
.await
.map_err(|e| ProxyError::backend(format!("Failed to send message: {e}")))?;
let response_message = self
.transport
.receive()
.await
.map_err(|e| ProxyError::backend(format!("Failed to receive response: {e}")))?
.ok_or_else(|| {
ProxyError::backend("No response received (transport closed)".to_string())
})?;
let response_str = String::from_utf8(response_message.payload.to_vec())
.map_err(|e| ProxyError::backend(format!("Invalid UTF-8 in response: {e}")))?;
trace!(response = %response_str, "Received introspection response");
let response: JsonRpcResponse = serde_json::from_str(&response_str)
.map_err(|e| ProxyError::backend(format!("Failed to parse response: {e}")))?;
match response.payload {
JsonRpcResponsePayload::Success { result } => Ok(result),
JsonRpcResponsePayload::Error { error } => Err(ProxyError::backend(format!(
"Server returned error: {error:?}"
))),
}
}
}
impl McpBackend for StdioBackend {
    /// Sends the `initialize` request, decodes the server's
    /// [`InitializeResult`], then sends the `notifications/initialized`
    /// notification before returning the result.
    ///
    /// # Errors
    ///
    /// Returns a backend [`ProxyError`] if the request cannot be serialized,
    /// the exchange fails, the result cannot be deserialized, or the follow-up
    /// notification cannot be sent.
    fn initialize(
        &mut self,
        request: InitializeRequest,
    ) -> Pin<Box<dyn Future<Output = ProxyResult<InitializeResult>> + Send + '_>> {
        Box::pin(async move {
            debug!("Initializing STDIO backend via turbomcp-transport");

            let params = serde_json::to_value(&request).map_err(|e| {
                ProxyError::backend(format!("Failed to serialize initialize request: {e}"))
            })?;
            let result = self.send_request("initialize", params).await?;
            let init_result: InitializeResult = serde_json::from_value(result).map_err(|e| {
                ProxyError::backend(format!("Failed to deserialize initialize result: {e}"))
            })?;

            debug!(
                server_name = %init_result.server_info.name,
                server_version = %init_result.server_info.version,
                protocol_version = %init_result.protocol_version,
                "Server initialized successfully"
            );

            // Tell the server the handshake is complete before handing back the result.
            self.send_notification("notifications/initialized", serde_json::json!({}))
                .await?;
            Ok(init_result)
        })
    }

    /// Forwards `method` and `params` as a JSON-RPC request and returns the
    /// server's result value.
    fn call_method<'a>(
        &'a mut self,
        method: &'a str,
        params: Value,
    ) -> Pin<Box<dyn Future<Output = ProxyResult<Value>> + Send + 'a>> {
        Box::pin(async move { self.send_request(method, params).await })
    }

    /// Serializes a JSON-RPC notification for `method` with `params` and sends
    /// it over the transport; no response is awaited.
    ///
    /// # Errors
    ///
    /// Returns a backend [`ProxyError`] if serialization or sending fails.
    fn send_notification<'a>(
        &'a mut self,
        method: &'a str,
        params: Value,
    ) -> Pin<Box<dyn Future<Output = ProxyResult<()>> + Send + 'a>> {
        Box::pin(async move {
            let notification = JsonRpcNotification {
                jsonrpc: JsonRpcVersion,
                method: method.to_string(),
                params: Some(params),
            };
            let notification_json = serde_json::to_string(&notification).map_err(|e| {
                ProxyError::backend(format!("Failed to serialize notification: {e}"))
            })?;
            trace!(method = %method, "Sending notification");

            // The transport-level id is a fresh UUID; the notification itself carries no id.
            let message = TransportMessage {
                id: turbomcp_protocol::MessageId::String(Uuid::new_v4().to_string()),
                payload: Bytes::from(notification_json.into_bytes()),
                metadata: TransportMessageMetadata::default(),
            };
            self.transport
                .send(message)
                .await
                .map_err(|e| ProxyError::backend(format!("Failed to send notification: {e}")))?;
            Ok(())
        })
    }

    /// Logs the shutdown and returns `Ok(())`.
    ///
    /// NOTE(review): no transport call is made here; presumably the child
    /// process is cleaned up when the transport is dropped — confirm.
    fn shutdown(&mut self) -> Pin<Box<dyn Future<Output = ProxyResult<()>> + Send + '_>> {
        Box::pin(async move {
            debug!("Shutting down STDIO backend");
            Ok(())
        })
    }

    /// Returns a fixed human-readable label for this backend.
    fn description(&self) -> String {
        "STDIO backend via turbomcp-transport".to_string()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): both tests launch `python server.py`, so they presumably
    // depend on a `python` executable being available — confirm for CI.

    /// Constructing a backend with the default working directory succeeds.
    #[tokio::test]
    async fn test_stdio_backend_creation() {
        let script_args = vec![String::from("server.py")];
        let outcome = StdioBackend::new("python", script_args).await;
        assert!(outcome.is_ok());
    }

    /// Constructing a backend with an explicit working directory succeeds.
    #[tokio::test]
    async fn test_stdio_backend_with_working_dir() {
        let script_args = vec![String::from("server.py")];
        let work_dir = String::from("/tmp");
        let outcome = StdioBackend::with_working_dir("python", script_args, work_dir).await;
        assert!(outcome.is_ok());
    }
}