synwire_agent/mcp/
http.rs1use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use serde_json::Value;
7use tokio::sync::Mutex;
8
9use synwire_core::BoxFuture;
10use synwire_core::agents::error::AgentError;
11use synwire_core::mcp::traits::{
12 McpConnectionState, McpServerStatus, McpToolDescriptor, McpTransport,
13};
14
15#[derive(Debug)]
21pub struct HttpMcpTransport {
22 name: String,
23 base_url: String,
24 auth_token: Option<String>,
25 timeout: std::time::Duration,
29 client: reqwest::Client,
30 state: Arc<Mutex<McpConnectionState>>,
31 calls_succeeded: AtomicU64,
32 calls_failed: AtomicU64,
33 enabled: Arc<std::sync::atomic::AtomicBool>,
34}
35
36impl HttpMcpTransport {
37 pub fn try_new(
40 name: impl Into<String>,
41 base_url: impl Into<String>,
42 auth_token: Option<String>,
43 timeout_secs: Option<u64>,
44 ) -> Result<Self, AgentError> {
45 let timeout = std::time::Duration::from_secs(timeout_secs.unwrap_or(30));
46 let client = reqwest::Client::builder()
47 .timeout(timeout)
48 .build()
49 .map_err(|e| AgentError::Tool(format!("failed to build HTTP client: {e}")))?;
50
51 Ok(Self {
52 name: name.into(),
53 base_url: base_url.into(),
54 auth_token,
55 timeout,
56 client,
57 state: Arc::new(Mutex::new(McpConnectionState::Disconnected)),
58 calls_succeeded: AtomicU64::new(0),
59 calls_failed: AtomicU64::new(0),
60 enabled: Arc::new(std::sync::atomic::AtomicBool::new(true)),
61 })
62 }
63
64 async fn post(&self, path: &str, body: Value) -> Result<Value, AgentError> {
65 let url = format!("{}{}", self.base_url.trim_end_matches('/'), path);
66 let mut req = self.client.post(&url).json(&body).timeout(self.timeout);
67 if let Some(token) = &self.auth_token {
68 req = req.bearer_auth(token);
69 }
70 let resp = req
71 .send()
72 .await
73 .map_err(|e| AgentError::Tool(e.to_string()))?;
74 if !resp.status().is_success() {
75 let status = resp.status();
76 let text = resp.text().await.unwrap_or_default();
77 return Err(AgentError::Tool(format!("MCP HTTP error {status}: {text}")));
78 }
79 resp.json::<Value>()
80 .await
81 .map_err(|e| AgentError::Tool(e.to_string()))
82 }
83}
84
85impl McpTransport for HttpMcpTransport {
86 fn connect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
87 Box::pin(async move {
88 *self.state.lock().await = McpConnectionState::Connecting;
89 match self.post("/tools/list", serde_json::json!({})).await {
91 Ok(_) => {
92 *self.state.lock().await = McpConnectionState::Connected;
93 tracing::info!(server = %self.name, "MCP HTTP server connected");
94 Ok(())
95 }
96 Err(e) => {
97 *self.state.lock().await = McpConnectionState::Disconnected;
98 Err(e)
99 }
100 }
101 })
102 }
103
104 fn reconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
105 Box::pin(async move {
106 *self.state.lock().await = McpConnectionState::Reconnecting;
107 self.connect().await
108 })
109 }
110
111 fn status(&self) -> BoxFuture<'_, McpServerStatus> {
112 Box::pin(async move {
113 McpServerStatus {
114 name: self.name.clone(),
115 state: *self.state.lock().await,
116 calls_succeeded: self.calls_succeeded.load(Ordering::Relaxed),
117 calls_failed: self.calls_failed.load(Ordering::Relaxed),
118 enabled: self.enabled.load(Ordering::Relaxed),
119 }
120 })
121 }
122
123 fn list_tools(&self) -> BoxFuture<'_, Result<Vec<McpToolDescriptor>, AgentError>> {
124 Box::pin(async move {
125 let resp = self.post("/tools/list", serde_json::json!({})).await?;
126 let tools = resp["tools"]
127 .as_array()
128 .cloned()
129 .unwrap_or_default()
130 .into_iter()
131 .filter_map(|t| serde_json::from_value(t).ok())
132 .collect();
133 Ok(tools)
134 })
135 }
136
137 fn call_tool(
138 &self,
139 tool_name: &str,
140 arguments: Value,
141 ) -> BoxFuture<'_, Result<Value, AgentError>> {
142 let tool_name = tool_name.to_string();
143 Box::pin(async move {
144 let result = self
145 .post(
146 "/tools/call",
147 serde_json::json!({ "name": tool_name, "arguments": arguments }),
148 )
149 .await;
150 match &result {
151 Ok(_) => {
152 let _ = self.calls_succeeded.fetch_add(1, Ordering::Relaxed);
153 }
154 Err(_) => {
155 let _ = self.calls_failed.fetch_add(1, Ordering::Relaxed);
156 }
157 }
158 result.and_then(|r| {
159 r.get("result")
160 .cloned()
161 .ok_or_else(|| AgentError::Tool("MCP response missing 'result' field".into()))
162 })
163 })
164 }
165
166 fn disconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
167 Box::pin(async move {
168 *self.state.lock().await = McpConnectionState::Shutdown;
169 tracing::info!(server = %self.name, "MCP HTTP server disconnected");
170 Ok(())
171 })
172 }
173}