use async_trait::async_trait;
use mcp_core_fishcode2025::protocol::JsonRpcMessage;
use std::collections::HashMap;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock};
/// Boxed, type-erased error that is both `Send` and `Sync`.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// Errors produced by the transport layer.
///
/// `Display` text for every variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum Error {
    /// An underlying I/O failure; convertible from [`std::io::Error`] via `?`.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// The transport was never connected, or has already been closed.
    #[error("Transport was not connected or is already closed")]
    NotConnected,

    /// A channel endpoint was dropped before the message or its reply could
    /// be delivered.
    #[error("Channel closed")]
    ChannelClosed,

    /// JSON (de)serialization failed; convertible from [`serde_json::Error`]
    /// via `?`.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// The message is neither a request nor a notification and therefore
    /// cannot be sent.
    #[error("Unsupported message type. JsonRpcMessage can only be Request or Notification.")]
    UnsupportedMessage,

    /// Failure related to a stdio child process, with a free-form description.
    #[error("Stdio process error: {0}")]
    StdioProcessError(String),

    /// Failure of an SSE connection, with a free-form description.
    #[error("SSE connection error: {0}")]
    SseConnection(String),

    /// An HTTP-level failure carrying a numeric status and a message.
    #[error("HTTP error: {status} - {message}")]
    HttpError { status: u16, message: String },
}
/// A JSON-RPC message queued for a transport, optionally paired with the
/// channel on which its reply should be delivered.
#[derive(Debug)]
pub struct TransportMessage {
    /// The JSON-RPC payload to transmit.
    pub message: JsonRpcMessage,
    /// One-shot sender for the reply (or the error that replaced it).
    ///
    /// `None` when the sender of the message is not waiting for a reply.
    pub response_tx: Option<oneshot::Sender<Result<JsonRpcMessage, Error>>>,
}
/// A transport that can be started to obtain a handle for sending messages.
#[async_trait]
pub trait Transport {
    /// The handle type produced by [`Transport::start`].
    type Handle: TransportHandle;

    /// Starts the transport and returns a handle for sending messages.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the transport cannot be started; the exact
    /// variants depend on the implementation.
    async fn start(&self) -> Result<Self::Handle, Error>;

    /// Closes the transport.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when shutting down fails; the exact variants
    /// depend on the implementation.
    async fn close(&self) -> Result<(), Error>;
}
/// A cloneable, thread-safe handle used to send messages over a transport.
#[async_trait]
pub trait TransportHandle: Send + Sync + Clone + 'static {
    /// Sends `message` and resolves to the resulting JSON-RPC message.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the message cannot be delivered or no reply
    /// can be obtained; the exact variants depend on the implementation.
    async fn send(&self, message: JsonRpcMessage) -> Result<JsonRpcMessage, Error>;
}
pub async fn send_message(
sender: &mpsc::Sender<TransportMessage>,
message: JsonRpcMessage,
) -> Result<JsonRpcMessage, Error> {
match message {
JsonRpcMessage::Request(request) => {
let (respond_to, response) = oneshot::channel();
let msg = TransportMessage {
message: JsonRpcMessage::Request(request),
response_tx: Some(respond_to),
};
sender.send(msg).await.map_err(|_| Error::ChannelClosed)?;
Ok(response.await.map_err(|_| Error::ChannelClosed)??)
}
JsonRpcMessage::Notification(notification) => {
let msg = TransportMessage {
message: JsonRpcMessage::Notification(notification),
response_tx: None,
};
sender.send(msg).await.map_err(|_| Error::ChannelClosed)?;
Ok(JsonRpcMessage::Nil)
}
_ => Err(Error::UnsupportedMessage),
}
}
/// Registry of in-flight requests, keyed by request id.
///
/// Each entry holds the one-shot sender on which that request's reply (or
/// error) is to be delivered.
pub struct PendingRequests {
    /// Map from request id to the reply channel; guarded by an async lock.
    requests: RwLock<HashMap<String, oneshot::Sender<Result<JsonRpcMessage, Error>>>>,
}
impl Default for PendingRequests {
fn default() -> Self {
Self::new()
}
}
impl PendingRequests {
    /// Creates a registry with no pending requests.
    pub fn new() -> Self {
        let requests = RwLock::new(HashMap::new());
        Self { requests }
    }

    /// Registers `sender` as the reply channel for the request `id`.
    ///
    /// An existing entry with the same `id` is replaced.
    pub async fn insert(&self, id: String, sender: oneshot::Sender<Result<JsonRpcMessage, Error>>) {
        let mut pending = self.requests.write().await;
        pending.insert(id, sender);
    }

    /// Delivers `response` to the request registered under `id` and removes
    /// that entry.
    ///
    /// Does nothing when `id` is not registered. A failed delivery (the
    /// receiving side was already dropped) is deliberately ignored.
    pub async fn respond(&self, id: &str, response: Result<JsonRpcMessage, Error>) {
        let mut pending = self.requests.write().await;
        if let Some(reply_tx) = pending.remove(id) {
            // Nobody is left to notify if the receiver is gone.
            let _ = reply_tx.send(response);
        }
    }

    /// Removes every pending request, dropping its reply sender without
    /// delivering a response.
    pub async fn clear(&self) {
        let mut pending = self.requests.write().await;
        pending.clear();
    }
}
// Submodule `stdio`; its `StdioTransport` is re-exported at this level.
// NOTE(review): presumably the stdio-based `Transport` implementation — confirm in `stdio.rs`.
pub mod stdio;
pub use stdio::StdioTransport;
// Submodule `sse`; its `SseTransport` is re-exported at this level.
// NOTE(review): presumably the SSE-based `Transport` implementation — confirm in `sse.rs`.
pub mod sse;
pub use sse::SseTransport;