Skip to main content

model_context_protocol/client/
http.rs

1//! HTTP Transport for connecting to MCP servers.
2//!
3//! Communicates with MCP servers via HTTP using JSON-RPC over POST requests.
4//! This is used to connect to MCP servers that expose an HTTP endpoint.
5
6use crate::protocol::*;
7use crate::transport::{McpTransport, McpTransportError, TransportTypeId};
8use async_trait::async_trait;
9use serde_json::Value;
10use std::sync::atomic::{AtomicI64, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13
14/// HTTP-based MCP transport for communicating with HTTP servers.
15pub struct HttpTransport {
16    endpoint: String,
17    client: reqwest::Client,
18    next_id: Arc<AtomicI64>,
19}
20
21impl HttpTransport {
22    /// Create a new HTTP transport.
23    pub fn new(endpoint: impl Into<String>) -> Self {
24        let client = reqwest::Client::builder()
25            .timeout(Duration::from_secs(30))
26            .build()
27            .expect("Failed to create HTTP client");
28
29        Self {
30            endpoint: endpoint.into(),
31            client,
32            next_id: Arc::new(AtomicI64::new(1)),
33        }
34    }
35
36    /// Create with custom timeout.
37    pub fn with_timeout(
38        endpoint: impl Into<String>,
39        timeout: Duration,
40    ) -> Result<Self, McpTransportError> {
41        let client = reqwest::Client::builder()
42            .timeout(timeout)
43            .build()
44            .map_err(|e| {
45                McpTransportError::TransportError(format!("Failed to create HTTP client: {}", e))
46            })?;
47
48        Ok(Self {
49            endpoint: endpoint.into(),
50            client,
51            next_id: Arc::new(AtomicI64::new(1)),
52        })
53    }
54
55    /// Send a JSON-RPC request via HTTP POST.
56    pub async fn send_request(
57        &self,
58        method: &str,
59        params: Option<Value>,
60    ) -> Result<Value, McpTransportError> {
61        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
62        let request = JsonRpcRequest::new(JsonRpcId::Number(id), method, params);
63
64        let response = self
65            .client
66            .post(&self.endpoint)
67            .json(&request)
68            .send()
69            .await
70            .map_err(|e| {
71                McpTransportError::TransportError(format!("HTTP request failed: {}", e))
72            })?;
73
74        if !response.status().is_success() {
75            return Err(McpTransportError::TransportError(format!(
76                "HTTP error: {} - {}",
77                response.status(),
78                response.text().await.unwrap_or_default()
79            )));
80        }
81
82        let json_response: JsonRpcResponse = response.json().await.map_err(|e| {
83            McpTransportError::TransportError(format!("Failed to parse JSON response: {}", e))
84        })?;
85
86        match json_response.payload {
87            JsonRpcPayload::Success { result } => Ok(result),
88            JsonRpcPayload::Error { error } => Err(McpTransportError::ServerError(format!(
89                "MCP Error: {}",
90                error
91            ))),
92        }
93    }
94
95    /// Health check - verify the endpoint is reachable.
96    pub async fn health_check(&self) -> bool {
97        self.client
98            .head(&self.endpoint)
99            .send()
100            .await
101            .map(|r| r.status().is_success())
102            .unwrap_or(false)
103    }
104
105    /// Get the endpoint URL.
106    pub fn endpoint(&self) -> &str {
107        &self.endpoint
108    }
109}
110
111/// Adapter that wraps HttpTransport and implements McpTransport.
112pub struct HttpTransportAdapter {
113    inner: HttpTransport,
114}
115
116impl HttpTransportAdapter {
117    /// Create a new HTTP transport adapter.
118    pub fn new(endpoint: impl Into<String>) -> Self {
119        Self {
120            inner: HttpTransport::new(endpoint),
121        }
122    }
123
124    /// Create with custom timeout.
125    pub fn with_timeout(
126        endpoint: impl Into<String>,
127        timeout: Duration,
128    ) -> Result<Self, McpTransportError> {
129        Ok(Self {
130            inner: HttpTransport::with_timeout(endpoint, timeout)?,
131        })
132    }
133}
134
135#[async_trait]
136impl McpTransport for HttpTransportAdapter {
137    async fn list_tools(&self) -> Result<Vec<McpToolDefinition>, McpTransportError> {
138        let result = self
139            .inner
140            .send_request("tools/list", Some(serde_json::json!({})))
141            .await?;
142
143        let list_result: ListToolsResult = serde_json::from_value(result)?;
144
145        Ok(list_result.tools)
146    }
147
148    async fn call_tool(&self, name: &str, args: Value) -> Result<Value, McpTransportError> {
149        let params = CallToolParams {
150            name: name.to_string(),
151            arguments: Some(args),
152            task: None,
153            meta: None,
154        };
155
156        let result = self
157            .inner
158            .send_request("tools/call", Some(serde_json::to_value(&params)?))
159            .await?;
160
161        let call_result: CallToolResult = serde_json::from_value(result)?;
162
163        if call_result.is_error == Some(true) {
164            let error_text = call_result
165                .content
166                .first()
167                .and_then(|c| c.as_text())
168                .unwrap_or("Unknown error");
169            return Err(McpTransportError::ServerError(error_text.to_string()));
170        }
171
172        let text = call_result
173            .content
174            .iter()
175            .filter_map(|c| c.as_text())
176            .collect::<Vec<_>>()
177            .join("\n");
178
179        Ok(Value::String(text))
180    }
181
182    async fn shutdown(&self) -> Result<(), McpTransportError> {
183        // HTTP transport doesn't need explicit shutdown
184        Ok(())
185    }
186
187    fn is_alive(&self) -> bool {
188        // For HTTP, we'd need to do a health check
189        // For now, assume alive (caller can use health_check() if needed)
190        true
191    }
192
193    fn transport_type(&self) -> TransportTypeId {
194        TransportTypeId::Http
195    }
196}
197
198#[cfg(test)]
199mod tests {
200    use super::*;
201
202    #[tokio::test]
203    async fn test_http_transport_creation() {
204        let transport = HttpTransport::new("http://localhost:8080/mcp");
205        assert_eq!(transport.endpoint(), "http://localhost:8080/mcp");
206    }
207
208    #[tokio::test]
209    async fn test_http_transport_with_timeout() {
210        let transport =
211            HttpTransport::with_timeout("http://localhost:8080/mcp", Duration::from_secs(10));
212        assert!(transport.is_ok());
213    }
214
215    #[test]
216    fn test_transport_type() {
217        assert_eq!(TransportTypeId::Http.to_string(), "http");
218    }
219}