use async_trait::async_trait;
use tokio::sync::mpsc;
use crate::context::notification_channel;
use crate::error::Result;
use crate::jsonrpc::JsonRpcService;
use crate::protocol::{JsonRpcRequest, JsonRpcResponse, McpNotification};
use crate::router::McpRouter;
use super::transport::ClientTransport;
pub struct ChannelTransport {
request_tx: mpsc::Sender<String>,
response_rx: mpsc::Receiver<String>,
connected: bool,
}
impl ChannelTransport {
pub fn new(router: McpRouter) -> Self {
let (request_tx, mut request_rx) = mpsc::channel::<String>(64);
let (response_tx, response_rx) = mpsc::channel::<String>(64);
let (notification_tx, _notification_rx) = notification_channel(64);
let router = router.with_notification_sender(notification_tx);
let mut service = JsonRpcService::new(router.clone());
tokio::spawn(async move {
while let Some(raw_request) = request_rx.recv().await {
let req: JsonRpcRequest = match serde_json::from_str(&raw_request) {
Ok(r) => r,
Err(e) => {
tracing::error!("ChannelTransport: failed to parse request: {}", e);
continue;
}
};
if req.method == "notifications/initialized" {
router.handle_notification(McpNotification::Initialized);
continue;
}
if req.method.starts_with("notifications/") {
continue;
}
let response = service.call_single(req).await;
let json = match response {
Ok(resp) => match serde_json::to_string(&resp) {
Ok(j) => j,
Err(e) => {
tracing::error!(
"ChannelTransport: failed to serialize response: {}",
e
);
continue;
}
},
Err(e) => {
let err_resp = JsonRpcResponse::error(
None,
tower_mcp_types::JsonRpcError::internal_error(e.to_string()),
);
match serde_json::to_string(&err_resp) {
Ok(j) => j,
Err(_) => continue,
}
}
};
if response_tx.send(json).await.is_err() {
break; }
}
});
Self {
request_tx,
response_rx,
connected: true,
}
}
}
#[async_trait]
impl ClientTransport for ChannelTransport {
async fn send(&mut self, message: &str) -> Result<()> {
self.request_tx
.send(message.to_string())
.await
.map_err(|_| crate::error::Error::internal("ChannelTransport: server task dropped"))?;
Ok(())
}
async fn recv(&mut self) -> Result<Option<String>> {
match self.response_rx.recv().await {
Some(msg) => Ok(Some(msg)),
None => {
self.connected = false;
Ok(None)
}
}
}
fn is_connected(&self) -> bool {
self.connected
}
async fn close(&mut self) -> Result<()> {
self.connected = false;
Ok(())
}
}