mcp_client/transport/
mod.rs1use async_trait::async_trait;
2use mcp_spec::protocol::JsonRpcMessage;
3use std::collections::HashMap;
4use thiserror::Error;
5use tokio::sync::{mpsc, oneshot, RwLock};
6
/// Boxed, type-erased error that can be sent and shared across threads.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
8#[derive(Debug, Error)]
10pub enum Error {
11 #[error("I/O error: {0}")]
12 Io(#[from] std::io::Error),
13
14 #[error("Transport was not connected or is already closed")]
15 NotConnected,
16
17 #[error("Channel closed")]
18 ChannelClosed,
19
20 #[error("Serialization error: {0}")]
21 Serialization(#[from] serde_json::Error),
22
23 #[error("Unsupported message type. JsonRpcMessage can only be Request or Notification.")]
24 UnsupportedMessage,
25
26 #[error("Stdio process error: {0}")]
27 StdioProcessError(String),
28
29 #[error("SSE connection error: {0}")]
30 SseConnection(String),
31
32 #[error("HTTP error: {status} - {message}")]
33 HttpError { status: u16, message: String },
34}
35
36#[derive(Debug)]
38pub struct TransportMessage {
39 pub message: JsonRpcMessage,
41 pub response_tx: Option<oneshot::Sender<Result<JsonRpcMessage, Error>>>,
43}
44
45#[async_trait]
47pub trait Transport {
48 type Handle: TransportHandle;
49
50 async fn start(&self) -> Result<Self::Handle, Error>;
53
54 async fn close(&self) -> Result<(), Error>;
56}
57
58#[async_trait]
59pub trait TransportHandle: Send + Sync + Clone + 'static {
60 async fn send(&self, message: JsonRpcMessage) -> Result<JsonRpcMessage, Error>;
61}
62
63pub async fn send_message(
65 sender: &mpsc::Sender<TransportMessage>,
66 message: JsonRpcMessage,
67) -> Result<JsonRpcMessage, Error> {
68 match message {
69 JsonRpcMessage::Request(request) => {
70 let (respond_to, response) = oneshot::channel();
71 let msg = TransportMessage {
72 message: JsonRpcMessage::Request(request),
73 response_tx: Some(respond_to),
74 };
75 sender.send(msg).await.map_err(|_| Error::ChannelClosed)?;
76 Ok(response.await.map_err(|_| Error::ChannelClosed)??)
77 }
78 JsonRpcMessage::Notification(notification) => {
79 let msg = TransportMessage {
80 message: JsonRpcMessage::Notification(notification),
81 response_tx: None,
82 };
83 sender.send(msg).await.map_err(|_| Error::ChannelClosed)?;
84 Ok(JsonRpcMessage::Nil)
85 }
86 _ => Err(Error::UnsupportedMessage),
87 }
88}
89
90pub struct PendingRequests {
92 requests: RwLock<HashMap<String, oneshot::Sender<Result<JsonRpcMessage, Error>>>>,
93}
94
95impl Default for PendingRequests {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101impl PendingRequests {
102 pub fn new() -> Self {
103 Self {
104 requests: RwLock::new(HashMap::new()),
105 }
106 }
107
108 pub async fn insert(&self, id: String, sender: oneshot::Sender<Result<JsonRpcMessage, Error>>) {
109 self.requests.write().await.insert(id, sender);
110 }
111
112 pub async fn respond(&self, id: &str, response: Result<JsonRpcMessage, Error>) {
113 if let Some(tx) = self.requests.write().await.remove(id) {
114 let _ = tx.send(response);
115 }
116 }
117
118 pub async fn clear(&self) {
119 self.requests.write().await.clear();
120 }
121}
122
123pub mod stdio;
124pub use stdio::StdioTransport;
125
126pub mod sse;
127pub use sse::SseTransport;