// model_context_protocol/client/http.rs
use crate::protocol::*;
use crate::transport::{McpTransport, McpTransportError, TransportTypeId};
use async_trait::async_trait;
use serde_json::Value;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
13
14pub struct HttpTransport {
16 endpoint: String,
17 client: reqwest::Client,
18 next_id: Arc<AtomicI64>,
19}
20
21impl HttpTransport {
22 pub fn new(endpoint: impl Into<String>) -> Self {
24 let client = reqwest::Client::builder()
25 .timeout(Duration::from_secs(30))
26 .build()
27 .expect("Failed to create HTTP client");
28
29 Self {
30 endpoint: endpoint.into(),
31 client,
32 next_id: Arc::new(AtomicI64::new(1)),
33 }
34 }
35
36 pub fn with_timeout(
38 endpoint: impl Into<String>,
39 timeout: Duration,
40 ) -> Result<Self, McpTransportError> {
41 let client = reqwest::Client::builder()
42 .timeout(timeout)
43 .build()
44 .map_err(|e| {
45 McpTransportError::TransportError(format!("Failed to create HTTP client: {}", e))
46 })?;
47
48 Ok(Self {
49 endpoint: endpoint.into(),
50 client,
51 next_id: Arc::new(AtomicI64::new(1)),
52 })
53 }
54
55 pub async fn send_request(
57 &self,
58 method: &str,
59 params: Option<Value>,
60 ) -> Result<Value, McpTransportError> {
61 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
62 let request = JsonRpcRequest::new(JsonRpcId::Number(id), method, params);
63
64 let response = self
65 .client
66 .post(&self.endpoint)
67 .json(&request)
68 .send()
69 .await
70 .map_err(|e| {
71 McpTransportError::TransportError(format!("HTTP request failed: {}", e))
72 })?;
73
74 if !response.status().is_success() {
75 return Err(McpTransportError::TransportError(format!(
76 "HTTP error: {} - {}",
77 response.status(),
78 response.text().await.unwrap_or_default()
79 )));
80 }
81
82 let json_response: JsonRpcResponse = response.json().await.map_err(|e| {
83 McpTransportError::TransportError(format!("Failed to parse JSON response: {}", e))
84 })?;
85
86 match json_response.payload {
87 JsonRpcPayload::Success { result } => Ok(result),
88 JsonRpcPayload::Error { error } => Err(McpTransportError::ServerError(format!(
89 "MCP Error: {}",
90 error
91 ))),
92 }
93 }
94
95 pub async fn health_check(&self) -> bool {
97 self.client
98 .head(&self.endpoint)
99 .send()
100 .await
101 .map(|r| r.status().is_success())
102 .unwrap_or(false)
103 }
104
105 pub fn endpoint(&self) -> &str {
107 &self.endpoint
108 }
109}
110
111pub struct HttpTransportAdapter {
113 inner: HttpTransport,
114}
115
116impl HttpTransportAdapter {
117 pub fn new(endpoint: impl Into<String>) -> Self {
119 Self {
120 inner: HttpTransport::new(endpoint),
121 }
122 }
123
124 pub fn with_timeout(
126 endpoint: impl Into<String>,
127 timeout: Duration,
128 ) -> Result<Self, McpTransportError> {
129 Ok(Self {
130 inner: HttpTransport::with_timeout(endpoint, timeout)?,
131 })
132 }
133}
134
135#[async_trait]
136impl McpTransport for HttpTransportAdapter {
137 async fn list_tools(&self) -> Result<Vec<McpToolDefinition>, McpTransportError> {
138 let result = self
139 .inner
140 .send_request("tools/list", Some(serde_json::json!({})))
141 .await?;
142
143 let list_result: ListToolsResult = serde_json::from_value(result)?;
144
145 Ok(list_result.tools)
146 }
147
148 async fn call_tool(&self, name: &str, args: Value) -> Result<Value, McpTransportError> {
149 let params = CallToolParams {
150 name: name.to_string(),
151 arguments: Some(args),
152 task: None,
153 meta: None,
154 };
155
156 let result = self
157 .inner
158 .send_request("tools/call", Some(serde_json::to_value(¶ms)?))
159 .await?;
160
161 let call_result: CallToolResult = serde_json::from_value(result)?;
162
163 if call_result.is_error == Some(true) {
164 let error_text = call_result
165 .content
166 .first()
167 .and_then(|c| c.as_text())
168 .unwrap_or("Unknown error");
169 return Err(McpTransportError::ServerError(error_text.to_string()));
170 }
171
172 let text = call_result
173 .content
174 .iter()
175 .filter_map(|c| c.as_text())
176 .collect::<Vec<_>>()
177 .join("\n");
178
179 Ok(Value::String(text))
180 }
181
182 async fn shutdown(&self) -> Result<(), McpTransportError> {
183 Ok(())
185 }
186
187 fn is_alive(&self) -> bool {
188 true
191 }
192
193 fn transport_type(&self) -> TransportTypeId {
194 TransportTypeId::Http
195 }
196}
197
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint used by the construction tests; nothing connects to it.
    const ENDPOINT: &str = "http://localhost:8080/mcp";

    /// A freshly built transport reports the endpoint it was given.
    #[tokio::test]
    async fn test_http_transport_creation() {
        let transport = HttpTransport::new(ENDPOINT);
        assert_eq!(transport.endpoint(), ENDPOINT);
    }

    /// Building a transport with an explicit timeout succeeds.
    #[tokio::test]
    async fn test_http_transport_with_timeout() {
        let built = HttpTransport::with_timeout(ENDPOINT, Duration::from_secs(10));
        assert!(built.is_ok());
    }

    /// The HTTP transport type renders as the string `"http"`.
    #[test]
    fn test_transport_type() {
        let rendered = TransportTypeId::Http.to_string();
        assert_eq!(rendered, "http");
    }
}