model_context_protocol/
http.rs1#[cfg(feature = "http")]
7use async_trait::async_trait;
8#[cfg(feature = "http")]
9use serde_json::Value;
10#[cfg(feature = "http")]
11use std::sync::atomic::{AtomicI64, Ordering};
12#[cfg(feature = "http")]
13use std::sync::Arc;
14#[cfg(feature = "http")]
15use std::time::Duration;
16
17#[cfg(feature = "http")]
18use crate::protocol::*;
19#[cfg(feature = "http")]
20use crate::transport::{McpTransport, McpTransportError, TransportTypeId};
21
#[cfg(feature = "http")]
/// JSON-RPC client that reaches an MCP server by POSTing to a single HTTP endpoint.
///
/// Cloning is cheap: `reqwest::Client` is internally reference-counted and the
/// request-id counter sits behind an `Arc`, so every clone draws ids from the
/// same sequence.
#[derive(Debug, Clone)]
pub struct HttpTransport {
    /// URL that every request is sent to.
    endpoint: String,
    /// HTTP client configured (including its timeout) at construction time.
    client: reqwest::Client,
    /// Id assigned to the next outgoing JSON-RPC request; starts at 1.
    next_id: Arc<AtomicI64>,
}
29
#[cfg(feature = "http")]
impl HttpTransport {
    /// Builds the `reqwest` client shared by both constructors.
    fn build_client(timeout: Duration) -> Result<reqwest::Client, reqwest::Error> {
        reqwest::Client::builder().timeout(timeout).build()
    }

    /// Assembles a transport around an already-built client; request ids start at 1.
    fn from_client(endpoint: impl Into<String>, client: reqwest::Client) -> Self {
        Self {
            endpoint: endpoint.into(),
            client,
            next_id: Arc::new(AtomicI64::new(1)),
        }
    }

    /// Creates a transport for `endpoint` whose HTTP client uses a 30-second timeout.
    ///
    /// # Panics
    ///
    /// Panics if the underlying HTTP client cannot be built. Use
    /// [`HttpTransport::with_timeout`] to receive that failure as an error instead.
    pub fn new(endpoint: impl Into<String>) -> Self {
        let client =
            Self::build_client(Duration::from_secs(30)).expect("Failed to create HTTP client");
        Self::from_client(endpoint, client)
    }

    /// Creates a transport for `endpoint` with a caller-supplied request timeout.
    ///
    /// # Errors
    ///
    /// Returns `McpTransportError::TransportError` if the HTTP client cannot be built.
    pub fn with_timeout(
        endpoint: impl Into<String>,
        timeout: Duration,
    ) -> Result<Self, McpTransportError> {
        match Self::build_client(timeout) {
            Ok(client) => Ok(Self::from_client(endpoint, client)),
            Err(e) => Err(McpTransportError::TransportError(format!(
                "Failed to create HTTP client: {}",
                e
            ))),
        }
    }

    /// Sends one JSON-RPC request and returns the `result` value of the response.
    ///
    /// The request is POSTed as JSON to the configured endpoint with a freshly
    /// allocated numeric id.
    ///
    /// # Errors
    ///
    /// * `McpTransportError::TransportError` if the request cannot be sent, the
    ///   HTTP status is not a success status, or the body does not parse as a
    ///   JSON-RPC response.
    /// * `McpTransportError::ServerError` if the response carries an error payload.
    pub async fn send_request(
        &self,
        method: &str,
        params: Option<Value>,
    ) -> Result<Value, McpTransportError> {
        let request_id = JsonRpcId::Number(self.next_id.fetch_add(1, Ordering::SeqCst));
        let request = JsonRpcRequest::new(request_id, method, params);

        let sent = self
            .client
            .post(&self.endpoint)
            .json(&request)
            .send()
            .await;
        let http_response = sent.map_err(|e| {
            McpTransportError::TransportError(format!("HTTP request failed: {}", e))
        })?;

        // Capture the status first: reading the body below consumes the response.
        let status = http_response.status();
        if !status.is_success() {
            let body_text = http_response.text().await.unwrap_or_default();
            return Err(McpTransportError::TransportError(format!(
                "HTTP error: {} - {}",
                status, body_text
            )));
        }

        let decoded = http_response.json::<JsonRpcResponse>().await;
        let rpc_response = decoded.map_err(|e| {
            McpTransportError::TransportError(format!("Failed to parse JSON response: {}", e))
        })?;

        match rpc_response.payload {
            JsonRpcPayload::Error { error } => Err(McpTransportError::ServerError(format!(
                "MCP Error: {}",
                error
            ))),
            JsonRpcPayload::Success { result } => Ok(result),
        }
    }

    /// Probes the endpoint with an HTTP `HEAD` request.
    ///
    /// Returns `true` only when the request completes with a success status;
    /// any send failure or non-success status yields `false`.
    pub async fn health_check(&self) -> bool {
        match self.client.head(&self.endpoint).send().await {
            Ok(reply) => reply.status().is_success(),
            Err(_) => false,
        }
    }

    /// Returns the endpoint URL this transport sends requests to.
    pub fn endpoint(&self) -> &str {
        self.endpoint.as_str()
    }
}
120
#[cfg(feature = "http")]
/// Wraps an [`HttpTransport`] so it can be used through the [`McpTransport`] trait.
pub struct HttpTransportAdapter {
    /// The HTTP transport that actually performs the requests.
    inner: HttpTransport,
}

#[cfg(feature = "http")]
impl HttpTransportAdapter {
    /// Creates an adapter for `endpoint` using [`HttpTransport::new`].
    ///
    /// # Panics
    ///
    /// Panics under the same condition as [`HttpTransport::new`].
    pub fn new(endpoint: impl Into<String>) -> Self {
        let inner = HttpTransport::new(endpoint);
        Self { inner }
    }

    /// Creates an adapter for `endpoint` with a caller-supplied request timeout.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by [`HttpTransport::with_timeout`].
    pub fn with_timeout(
        endpoint: impl Into<String>,
        timeout: Duration,
    ) -> Result<Self, McpTransportError> {
        HttpTransport::with_timeout(endpoint, timeout).map(|inner| Self { inner })
    }
}
146
#[cfg(feature = "http")]
#[async_trait]
impl McpTransport for HttpTransportAdapter {
    /// Requests `tools/list` from the server and converts each returned tool
    /// into a [`ToolDefinition`].
    ///
    /// # Errors
    ///
    /// Propagates transport/server errors from the request and fails if the
    /// result does not deserialize as a `ListToolsResult`.
    async fn list_tools(&self) -> Result<Vec<ToolDefinition>, McpTransportError> {
        let result = self
            .inner
            .send_request("tools/list", Some(serde_json::json!({})))
            .await?;

        let list_result: ListToolsResult = serde_json::from_value(result)?;

        Ok(list_result
            .tools
            .into_iter()
            .map(ToolDefinition::from)
            .collect())
    }

    /// Invokes the tool `name` with `args` via `tools/call`.
    ///
    /// On success, returns a `Value::String` holding every text content item of
    /// the result joined with `"\n"`; non-text items are skipped.
    ///
    /// # Errors
    ///
    /// * Propagates transport/server errors from the request and any
    ///   (de)serialization failure.
    /// * Returns `McpTransportError::ServerError` when the result is flagged as
    ///   an error; the message is the first content item's text, or
    ///   `"Unknown error"` if there is none.
    async fn call_tool(&self, name: &str, args: Value) -> Result<Value, McpTransportError> {
        let params = CallToolParams {
            name: name.to_string(),
            arguments: Some(args),
        };

        // Serialize by reference; the original token here was a mis-encoded `&params`.
        let result = self
            .inner
            .send_request("tools/call", Some(serde_json::to_value(&params)?))
            .await?;

        let call_result: CallToolResult = serde_json::from_value(result)?;

        if call_result.is_error == Some(true) {
            let error_text = call_result
                .content
                .first()
                .and_then(|c| c.as_text())
                .unwrap_or("Unknown error");
            return Err(McpTransportError::ServerError(error_text.to_string()));
        }

        let text = call_result
            .content
            .iter()
            .filter_map(|c| c.as_text())
            .collect::<Vec<_>>()
            .join("\n");

        Ok(Value::String(text))
    }

    /// No-op: this adapter performs no teardown and always returns `Ok(())`.
    async fn shutdown(&self) -> Result<(), McpTransportError> {
        Ok(())
    }

    /// Always reports `true`; no network probe is performed here.
    fn is_alive(&self) -> bool {
        true
    }

    /// Identifies this transport as [`TransportTypeId::Http`].
    fn transport_type(&self) -> TransportTypeId {
        TransportTypeId::Http
    }
}
212
#[cfg(all(test, feature = "http"))]
mod tests {
    use super::*;

    /// Endpoint handed to the constructors; these tests never send a request to it.
    const ENDPOINT: &str = "http://localhost:8080/mcp";

    /// `new` stores the endpoint it was given.
    #[tokio::test]
    async fn test_http_transport_creation() {
        let created = HttpTransport::new(ENDPOINT);
        assert_eq!(created.endpoint(), ENDPOINT);
    }

    /// `with_timeout` succeeds for an ordinary timeout value.
    #[tokio::test]
    async fn test_http_transport_with_timeout() {
        let built = HttpTransport::with_timeout(ENDPOINT, Duration::from_secs(10));
        assert!(built.is_ok());
    }

    /// The HTTP transport type id renders as `"http"`.
    #[test]
    fn test_transport_type() {
        let rendered = TransportTypeId::Http.to_string();
        assert_eq!(rendered, "http");
    }
}