Skip to main content

synwire_agent/mcp/
http.rs

1//! HTTP and SSE MCP transports.
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use serde_json::Value;
7use tokio::sync::Mutex;
8
9use synwire_core::BoxFuture;
10use synwire_core::agents::error::AgentError;
11use synwire_core::mcp::traits::{
12    McpConnectionState, McpServerStatus, McpToolDescriptor, McpTransport,
13};
14
15// ---------------------------------------------------------------------------
16// HttpMcpTransport
17// ---------------------------------------------------------------------------
18
19/// MCP transport that communicates with an HTTP-based MCP server.
20#[derive(Debug)]
21pub struct HttpMcpTransport {
22    name: String,
23    base_url: String,
24    auth_token: Option<String>,
25    /// Per-request timeout applied to each HTTP call via
26    /// `reqwest::RequestBuilder::timeout`.  Also set as the global client
27    /// timeout at construction time for defence-in-depth.
28    timeout: std::time::Duration,
29    client: reqwest::Client,
30    state: Arc<Mutex<McpConnectionState>>,
31    calls_succeeded: AtomicU64,
32    calls_failed: AtomicU64,
33    enabled: Arc<std::sync::atomic::AtomicBool>,
34}
35
36impl HttpMcpTransport {
37    /// Create a new HTTP MCP transport, returning an error if the HTTP client
38    /// cannot be built (e.g. TLS initialisation failure).
39    pub fn try_new(
40        name: impl Into<String>,
41        base_url: impl Into<String>,
42        auth_token: Option<String>,
43        timeout_secs: Option<u64>,
44    ) -> Result<Self, AgentError> {
45        let timeout = std::time::Duration::from_secs(timeout_secs.unwrap_or(30));
46        let client = reqwest::Client::builder()
47            .timeout(timeout)
48            .build()
49            .map_err(|e| AgentError::Tool(format!("failed to build HTTP client: {e}")))?;
50
51        Ok(Self {
52            name: name.into(),
53            base_url: base_url.into(),
54            auth_token,
55            timeout,
56            client,
57            state: Arc::new(Mutex::new(McpConnectionState::Disconnected)),
58            calls_succeeded: AtomicU64::new(0),
59            calls_failed: AtomicU64::new(0),
60            enabled: Arc::new(std::sync::atomic::AtomicBool::new(true)),
61        })
62    }
63
64    async fn post(&self, path: &str, body: Value) -> Result<Value, AgentError> {
65        let url = format!("{}{}", self.base_url.trim_end_matches('/'), path);
66        let mut req = self.client.post(&url).json(&body).timeout(self.timeout);
67        if let Some(token) = &self.auth_token {
68            req = req.bearer_auth(token);
69        }
70        let resp = req
71            .send()
72            .await
73            .map_err(|e| AgentError::Tool(e.to_string()))?;
74        if !resp.status().is_success() {
75            let status = resp.status();
76            let text = resp.text().await.unwrap_or_default();
77            return Err(AgentError::Tool(format!("MCP HTTP error {status}: {text}")));
78        }
79        resp.json::<Value>()
80            .await
81            .map_err(|e| AgentError::Tool(e.to_string()))
82    }
83}
84
85impl McpTransport for HttpMcpTransport {
86    fn connect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
87        Box::pin(async move {
88            *self.state.lock().await = McpConnectionState::Connecting;
89            // Perform a health-check by listing tools.
90            match self.post("/tools/list", serde_json::json!({})).await {
91                Ok(_) => {
92                    *self.state.lock().await = McpConnectionState::Connected;
93                    tracing::info!(server = %self.name, "MCP HTTP server connected");
94                    Ok(())
95                }
96                Err(e) => {
97                    *self.state.lock().await = McpConnectionState::Disconnected;
98                    Err(e)
99                }
100            }
101        })
102    }
103
104    fn reconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
105        Box::pin(async move {
106            *self.state.lock().await = McpConnectionState::Reconnecting;
107            self.connect().await
108        })
109    }
110
111    fn status(&self) -> BoxFuture<'_, McpServerStatus> {
112        Box::pin(async move {
113            McpServerStatus {
114                name: self.name.clone(),
115                state: *self.state.lock().await,
116                calls_succeeded: self.calls_succeeded.load(Ordering::Relaxed),
117                calls_failed: self.calls_failed.load(Ordering::Relaxed),
118                enabled: self.enabled.load(Ordering::Relaxed),
119            }
120        })
121    }
122
123    fn list_tools(&self) -> BoxFuture<'_, Result<Vec<McpToolDescriptor>, AgentError>> {
124        Box::pin(async move {
125            let resp = self.post("/tools/list", serde_json::json!({})).await?;
126            let tools = resp["tools"]
127                .as_array()
128                .cloned()
129                .unwrap_or_default()
130                .into_iter()
131                .filter_map(|t| serde_json::from_value(t).ok())
132                .collect();
133            Ok(tools)
134        })
135    }
136
137    fn call_tool(
138        &self,
139        tool_name: &str,
140        arguments: Value,
141    ) -> BoxFuture<'_, Result<Value, AgentError>> {
142        let tool_name = tool_name.to_string();
143        Box::pin(async move {
144            let result = self
145                .post(
146                    "/tools/call",
147                    serde_json::json!({ "name": tool_name, "arguments": arguments }),
148                )
149                .await;
150            match &result {
151                Ok(_) => {
152                    let _ = self.calls_succeeded.fetch_add(1, Ordering::Relaxed);
153                }
154                Err(_) => {
155                    let _ = self.calls_failed.fetch_add(1, Ordering::Relaxed);
156                }
157            }
158            result.and_then(|r| {
159                r.get("result")
160                    .cloned()
161                    .ok_or_else(|| AgentError::Tool("MCP response missing 'result' field".into()))
162            })
163        })
164    }
165
166    fn disconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
167        Box::pin(async move {
168            *self.state.lock().await = McpConnectionState::Shutdown;
169            tracing::info!(server = %self.name, "MCP HTTP server disconnected");
170            Ok(())
171        })
172    }
173}