mcp_client/transport/
mod.rs

1use async_trait::async_trait;
2use mcp_spec::protocol::JsonRpcMessage;
3use std::collections::HashMap;
4use thiserror::Error;
5use tokio::sync::{mpsc, oneshot, RwLock};
6
/// Boxed, type-erased error that is safe to move and share across threads.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
8/// A generic error type for transport operations.
9#[derive(Debug, Error)]
10pub enum Error {
11    #[error("I/O error: {0}")]
12    Io(#[from] std::io::Error),
13
14    #[error("Transport was not connected or is already closed")]
15    NotConnected,
16
17    #[error("Channel closed")]
18    ChannelClosed,
19
20    #[error("Serialization error: {0}")]
21    Serialization(#[from] serde_json::Error),
22
23    #[error("Unsupported message type. JsonRpcMessage can only be Request or Notification.")]
24    UnsupportedMessage,
25
26    #[error("Stdio process error: {0}")]
27    StdioProcessError(String),
28
29    #[error("SSE connection error: {0}")]
30    SseConnection(String),
31
32    #[error("HTTP error: {status} - {message}")]
33    HttpError { status: u16, message: String },
34}
35
36/// A message that can be sent through the transport
37#[derive(Debug)]
38pub struct TransportMessage {
39    /// The JSON-RPC message to send
40    pub message: JsonRpcMessage,
41    /// Channel to receive the response on (None for notifications)
42    pub response_tx: Option<oneshot::Sender<Result<JsonRpcMessage, Error>>>,
43}
44
45/// A generic asynchronous transport trait with channel-based communication
46#[async_trait]
47pub trait Transport {
48    type Handle: TransportHandle;
49
50    /// Start the transport and establish the underlying connection.
51    /// Returns the transport handle for sending messages.
52    async fn start(&self) -> Result<Self::Handle, Error>;
53
54    /// Close the transport and free any resources.
55    async fn close(&self) -> Result<(), Error>;
56}
57
58#[async_trait]
59pub trait TransportHandle: Send + Sync + Clone + 'static {
60    async fn send(&self, message: JsonRpcMessage) -> Result<JsonRpcMessage, Error>;
61}
62
63// Helper function that contains the common send implementation
64pub async fn send_message(
65    sender: &mpsc::Sender<TransportMessage>,
66    message: JsonRpcMessage,
67) -> Result<JsonRpcMessage, Error> {
68    match message {
69        JsonRpcMessage::Request(request) => {
70            let (respond_to, response) = oneshot::channel();
71            let msg = TransportMessage {
72                message: JsonRpcMessage::Request(request),
73                response_tx: Some(respond_to),
74            };
75            sender.send(msg).await.map_err(|_| Error::ChannelClosed)?;
76            Ok(response.await.map_err(|_| Error::ChannelClosed)??)
77        }
78        JsonRpcMessage::Notification(notification) => {
79            let msg = TransportMessage {
80                message: JsonRpcMessage::Notification(notification),
81                response_tx: None,
82            };
83            sender.send(msg).await.map_err(|_| Error::ChannelClosed)?;
84            Ok(JsonRpcMessage::Nil)
85        }
86        _ => Err(Error::UnsupportedMessage),
87    }
88}
89
90// A data structure to store pending requests and their response channels
91pub struct PendingRequests {
92    requests: RwLock<HashMap<String, oneshot::Sender<Result<JsonRpcMessage, Error>>>>,
93}
94
95impl Default for PendingRequests {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
101impl PendingRequests {
102    pub fn new() -> Self {
103        Self {
104            requests: RwLock::new(HashMap::new()),
105        }
106    }
107
108    pub async fn insert(&self, id: String, sender: oneshot::Sender<Result<JsonRpcMessage, Error>>) {
109        self.requests.write().await.insert(id, sender);
110    }
111
112    pub async fn respond(&self, id: &str, response: Result<JsonRpcMessage, Error>) {
113        if let Some(tx) = self.requests.write().await.remove(id) {
114            let _ = tx.send(response);
115        }
116    }
117
118    pub async fn clear(&self) {
119        self.requests.write().await.clear();
120    }
121}
122
123pub mod stdio;
124pub use stdio::StdioTransport;
125
126pub mod sse;
127pub use sse::SseTransport;