Skip to main content

model_context_protocol/
http.rs

1//! HTTP Transport for MCP Servers
2//!
3//! Communicates with MCP servers via HTTP using JSON-RPC over POST requests.
4//! This is used for MCP servers that expose an HTTP endpoint.
5
6#[cfg(feature = "http")]
7use async_trait::async_trait;
8#[cfg(feature = "http")]
9use serde_json::Value;
10#[cfg(feature = "http")]
11use std::sync::atomic::{AtomicI64, Ordering};
12#[cfg(feature = "http")]
13use std::sync::Arc;
14#[cfg(feature = "http")]
15use std::time::Duration;
16
17#[cfg(feature = "http")]
18use crate::protocol::*;
19#[cfg(feature = "http")]
20use crate::transport::{McpTransport, McpTransportError, TransportTypeId};
21
22/// HTTP-based MCP transport for communicating with HTTP servers.
23#[cfg(feature = "http")]
24pub struct HttpTransport {
25    endpoint: String,
26    client: reqwest::Client,
27    next_id: Arc<AtomicI64>,
28}
29
30#[cfg(feature = "http")]
31impl HttpTransport {
32    /// Create a new HTTP transport.
33    pub fn new(endpoint: impl Into<String>) -> Self {
34        let client = reqwest::Client::builder()
35            .timeout(Duration::from_secs(30))
36            .build()
37            .expect("Failed to create HTTP client");
38
39        Self {
40            endpoint: endpoint.into(),
41            client,
42            next_id: Arc::new(AtomicI64::new(1)),
43        }
44    }
45
46    /// Create with custom timeout.
47    pub fn with_timeout(
48        endpoint: impl Into<String>,
49        timeout: Duration,
50    ) -> Result<Self, McpTransportError> {
51        let client = reqwest::Client::builder()
52            .timeout(timeout)
53            .build()
54            .map_err(|e| {
55                McpTransportError::TransportError(format!("Failed to create HTTP client: {}", e))
56            })?;
57
58        Ok(Self {
59            endpoint: endpoint.into(),
60            client,
61            next_id: Arc::new(AtomicI64::new(1)),
62        })
63    }
64
65    /// Send a JSON-RPC request via HTTP POST.
66    pub async fn send_request(
67        &self,
68        method: &str,
69        params: Option<Value>,
70    ) -> Result<Value, McpTransportError> {
71        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
72        let request = JsonRpcRequest::new(JsonRpcId::Number(id), method, params);
73
74        let response = self
75            .client
76            .post(&self.endpoint)
77            .json(&request)
78            .send()
79            .await
80            .map_err(|e| {
81                McpTransportError::TransportError(format!("HTTP request failed: {}", e))
82            })?;
83
84        if !response.status().is_success() {
85            return Err(McpTransportError::TransportError(format!(
86                "HTTP error: {} - {}",
87                response.status(),
88                response.text().await.unwrap_or_default()
89            )));
90        }
91
92        let json_response: JsonRpcResponse = response.json().await.map_err(|e| {
93            McpTransportError::TransportError(format!("Failed to parse JSON response: {}", e))
94        })?;
95
96        match json_response.payload {
97            JsonRpcPayload::Success { result } => Ok(result),
98            JsonRpcPayload::Error { error } => Err(McpTransportError::ServerError(format!(
99                "MCP Error: {}",
100                error
101            ))),
102        }
103    }
104
105    /// Health check - verify the endpoint is reachable.
106    pub async fn health_check(&self) -> bool {
107        self.client
108            .head(&self.endpoint)
109            .send()
110            .await
111            .map(|r| r.status().is_success())
112            .unwrap_or(false)
113    }
114
115    /// Get the endpoint URL.
116    pub fn endpoint(&self) -> &str {
117        &self.endpoint
118    }
119}
120
121/// Adapter that wraps HttpTransport and implements McpTransport.
122#[cfg(feature = "http")]
123pub struct HttpTransportAdapter {
124    inner: HttpTransport,
125}
126
127#[cfg(feature = "http")]
128impl HttpTransportAdapter {
129    /// Create a new HTTP transport adapter.
130    pub fn new(endpoint: impl Into<String>) -> Self {
131        Self {
132            inner: HttpTransport::new(endpoint),
133        }
134    }
135
136    /// Create with custom timeout.
137    pub fn with_timeout(
138        endpoint: impl Into<String>,
139        timeout: Duration,
140    ) -> Result<Self, McpTransportError> {
141        Ok(Self {
142            inner: HttpTransport::with_timeout(endpoint, timeout)?,
143        })
144    }
145}
146
147#[cfg(feature = "http")]
148#[async_trait]
149impl McpTransport for HttpTransportAdapter {
150    async fn list_tools(&self) -> Result<Vec<ToolDefinition>, McpTransportError> {
151        let result = self
152            .inner
153            .send_request("tools/list", Some(serde_json::json!({})))
154            .await?;
155
156        let list_result: ListToolsResult = serde_json::from_value(result)?;
157
158        Ok(list_result
159            .tools
160            .into_iter()
161            .map(ToolDefinition::from)
162            .collect())
163    }
164
165    async fn call_tool(&self, name: &str, args: Value) -> Result<Value, McpTransportError> {
166        let params = CallToolParams {
167            name: name.to_string(),
168            arguments: Some(args),
169        };
170
171        let result = self
172            .inner
173            .send_request("tools/call", Some(serde_json::to_value(&params)?))
174            .await?;
175
176        let call_result: CallToolResult = serde_json::from_value(result)?;
177
178        if call_result.is_error == Some(true) {
179            let error_text = call_result
180                .content
181                .first()
182                .and_then(|c| c.as_text())
183                .unwrap_or("Unknown error");
184            return Err(McpTransportError::ServerError(error_text.to_string()));
185        }
186
187        let text = call_result
188            .content
189            .iter()
190            .filter_map(|c| c.as_text())
191            .collect::<Vec<_>>()
192            .join("\n");
193
194        Ok(Value::String(text))
195    }
196
197    async fn shutdown(&self) -> Result<(), McpTransportError> {
198        // HTTP transport doesn't need explicit shutdown
199        Ok(())
200    }
201
202    fn is_alive(&self) -> bool {
203        // For HTTP, we'd need to do a health check
204        // For now, assume alive (caller can use health_check() if needed)
205        true
206    }
207
208    fn transport_type(&self) -> TransportTypeId {
209        TransportTypeId::Http
210    }
211}
212
213#[cfg(all(test, feature = "http"))]
214mod tests {
215    use super::*;
216
217    #[tokio::test]
218    async fn test_http_transport_creation() {
219        let transport = HttpTransport::new("http://localhost:8080/mcp");
220        assert_eq!(transport.endpoint(), "http://localhost:8080/mcp");
221    }
222
223    #[tokio::test]
224    async fn test_http_transport_with_timeout() {
225        let transport =
226            HttpTransport::with_timeout("http://localhost:8080/mcp", Duration::from_secs(10));
227        assert!(transport.is_ok());
228    }
229
230    #[test]
231    fn test_transport_type() {
232        assert_eq!(TransportTypeId::Http.to_string(), "http");
233    }
234}