phi_core/mcp/client.rs
1//! High-level MCP client.
2/*
3ARCHITECTURE: McpClient — the MCP protocol layer
4
5`McpClient` is a stateful wrapper that manages the full MCP connection lifecycle:
6 1. Connect: `connect_stdio()` spawns a process; `connect_http()` opens an HTTP session
7 2. Handshake: `initialize()` sends the `initialize` + `notifications/initialized` messages
8 3. Discover: `list_tools()` fetches available tools from the server
9 4. Execute: `call_tool(name, arguments)` invokes a specific tool
10 5. Shutdown: `close()` kills the process or closes the HTTP connection
11
12The MCP "handshake" is mandatory — servers refuse requests unless the client
13first sends `initialize` with the protocol version and client capabilities.
14After the server responds, we send `notifications/initialized` (a one-way notification).
15
16RUST QUIRK: `Arc<Mutex<Box<dyn McpTransport>>>` — three layers of wrapping, each with a purpose
17
18Let's unpack from the inside out:
19 `Box<dyn McpTransport>` — heap-allocated trait object (type-erased transport)
20 `Mutex<Box<dyn McpTransport>>` — exclusive access lock (one request at a time)
21 The MCP stdio transport is NOT safe for concurrent requests — each request/response
22 must complete before the next starts. `Mutex` enforces this.
23 `Arc<Mutex<Box<dyn McpTransport>>>` — shared ownership
24 `McpClient` is cloned and passed to multiple `McpToolAdapter` instances.
25 `Arc` lets all adapters share the same underlying transport safely.
26 `.clone()` on `Arc` just bumps a reference count — cheap.
27
28Python analogy:
29 self._transport = threading.Lock() # wrapping a transport object
30 # Arc is implicit in Python (reference counting + GIL)
31*/
32
33use super::transport::{HttpTransport, McpTransport, StdioTransport, DEFAULT_REQUEST_TIMEOUT};
34use super::types::*;
35use std::collections::HashMap;
36use std::sync::Arc;
37use std::time::Duration;
38use tokio::sync::Mutex;
39
/// Configuration knobs for [`McpClient`] construction.
///
/// Currently only carries the per-request timeout, but kept as a struct so future
/// options can be added without breaking the public API.
///
/// Derives `PartialEq`/`Eq` in addition to `Debug`/`Clone` so that configurations can be
/// compared directly (e.g. in tests, or to detect that a setting changed).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct McpClientConfig {
    /// Per-request timeout handed to the transport when the client is constructed.
    pub request_timeout: Duration,
}
49
50impl Default for McpClientConfig {
51 fn default() -> Self {
52 Self {
53 request_timeout: DEFAULT_REQUEST_TIMEOUT,
54 }
55 }
56}
57
58/// High-level MCP client that manages connection lifecycle and protocol.
59pub struct McpClient {
60 transport: Arc<Mutex<Box<dyn McpTransport>>>, // shared, locked, type-erased transport
61 server_info: Option<ServerInfo>, // populated after initialize()
62 capabilities: Option<ServerCapabilities>, // populated after initialize()
63}
64
65impl McpClient {
66 /// Connect to an MCP server via stdio (spawn a child process).
67 ///
68 /// Uses the default per-request timeout (`DEFAULT_REQUEST_TIMEOUT`, 30 s).
69 /// For a custom timeout, use [`McpClient::connect_stdio_with_config`].
70 pub async fn connect_stdio(
71 command: &str, // EXECUTABLE — binary to spawn as the MCP server subprocess
72 args: &[&str], // ARGV — command-line args for the subprocess
73 env: Option<HashMap<String, String>>, // ENV OVERRIDES — extra env vars; None = inherit parent env
74 ) -> Result<Self, McpError> {
75 Self::connect_stdio_with_config(command, args, env, McpClientConfig::default()).await
76 }
77
78 /// Connect to an MCP server via stdio with custom configuration.
79 pub async fn connect_stdio_with_config(
80 command: &str,
81 args: &[&str],
82 env: Option<HashMap<String, String>>,
83 config: McpClientConfig,
84 ) -> Result<Self, McpError> {
85 let transport = StdioTransport::new(command, args, env)
86 .await?
87 .with_timeout(config.request_timeout);
88 let mut client = Self {
89 transport: Arc::new(Mutex::new(Box::new(transport))),
90 server_info: None,
91 capabilities: None,
92 };
93 client.initialize().await?;
94 Ok(client)
95 }
96
97 /// Connect to an MCP server via HTTP.
98 ///
99 /// Uses the default per-request timeout (`DEFAULT_REQUEST_TIMEOUT`, 30 s).
100 /// For a custom timeout, use [`McpClient::connect_http_with_config`].
101 pub async fn connect_http(url: &str) -> Result<Self, McpError> {
102 Self::connect_http_with_config(url, McpClientConfig::default()).await
103 }
104
105 /// Connect to an MCP server via HTTP with custom configuration.
106 pub async fn connect_http_with_config(
107 url: &str,
108 config: McpClientConfig,
109 ) -> Result<Self, McpError> {
110 let transport = HttpTransport::new_with_timeout(url, config.request_timeout)?;
111 let mut client = Self {
112 transport: Arc::new(Mutex::new(Box::new(transport))),
113 server_info: None,
114 capabilities: None,
115 };
116 client.initialize().await?;
117 Ok(client)
118 }
119
120 /// Create from an existing transport (useful for testing).
121 pub fn from_transport(transport: Box<dyn McpTransport>) -> Self {
122 Self {
123 transport: Arc::new(Mutex::new(transport)),
124 server_info: None,
125 capabilities: None,
126 }
127 }
128
129 /// Initialize the MCP connection (handshake).
130 pub async fn initialize(&mut self) -> Result<ServerInfo, McpError> {
131 let params = serde_json::json!({
132 "protocolVersion": "2024-11-05",
133 "capabilities": {},
134 "clientInfo": ClientInfo::default()
135 });
136
137 let request = JsonRpcRequest::new("initialize", Some(params));
138 let response = self.send_request(request).await?;
139
140 let result: InitializeResult = serde_json::from_value(response)?;
141 self.server_info = Some(result.server_info.clone());
142 self.capabilities = Some(result.capabilities);
143
144 // Send initialized notification (no response expected, but we send it as a request
145 // since our transport is request/response. Some servers ignore the id on notifications.)
146 let notify = JsonRpcRequest::new("notifications/initialized", None);
147 // Best-effort: ignore errors on the notification
148 let _ = self.send_request(notify).await;
149
150 Ok(result.server_info)
151 }
152
153 /// List available tools from the server.
154 pub async fn list_tools(&self) -> Result<Vec<McpToolInfo>, McpError> {
155 let request = JsonRpcRequest::new("tools/list", Some(serde_json::json!({})));
156 let response = self.send_request(request).await?;
157
158 let result: ToolsListResult = serde_json::from_value(response)?;
159 Ok(result.tools)
160 }
161
162 /// Call a tool on the server.
163 /*
164 DESIGN: Why `name` AND `arguments` are separate parameters
165 `name` = SELECTOR — which tool on the MCP server to invoke (like a function name)
166 `arguments` = INPUT — the JSON arguments for that specific invocation
167 Mirrors the same registry-vs-invocation split as AgentTool: the server has a registry of
168 named tools; each call selects one by name and provides the arguments for that call.
169 */
170 pub async fn call_tool(
171 &self,
172 name: &str, // SELECTOR — tool name on the MCP server (must match tools/list result)
173 arguments: serde_json::Value, // INPUT — JSON arguments for this invocation (schema defined by the server)
174 ) -> Result<McpToolCallResult, McpError> {
175 let params = serde_json::json!({
176 "name": name,
177 "arguments": arguments
178 });
179
180 let request = JsonRpcRequest::new("tools/call", Some(params));
181 let response = self.send_request(request).await?;
182
183 let result: McpToolCallResult = serde_json::from_value(response)?;
184 Ok(result)
185 }
186
187 /// Close the connection.
188 pub async fn close(&self) -> Result<(), McpError> {
189 self.transport.lock().await.close().await
190 }
191
192 /// Get server info (available after initialize).
193 pub fn server_info(&self) -> Option<&ServerInfo> {
194 self.server_info.as_ref()
195 }
196
197 /// Send a request and extract the result value, classifying errors.
198 /*
199 RUST QUIRK: `self.transport.lock().await` — async mutex acquisition
200
201 `tokio::sync::Mutex::lock()` returns a future that resolves when the lock is acquired.
202 `.await` suspends the current task (not the OS thread!) until the lock is free.
203 Returns `MutexGuard<Box<dyn McpTransport>>` — auto-unlocks when guard is dropped.
204
205 The guard is held for the duration of `transport.send(request).await?`, then dropped.
206 This is important: if we made TWO requests concurrently, the second would wait here
207 until the first's guard drops. Serial request ordering is enforced.
208
209 ARCHITECTURE: Error classification
210 JSON-RPC errors come in two forms:
211 1. Transport error: network failure, process died, parse error
212 → `McpError::Transport(String)` or `McpError::Protocol(String)`
213 2. Application error: server-side error (tool not found, invalid arguments)
214 → `response.error` is populated, returned as `McpError::JsonRpc { code, message }`
215
216 After extracting the result, we also handle the case where NEITHER `result` nor `error`
217 is present — technically invalid JSON-RPC but defensive against buggy servers.
218 */
219 async fn send_request(&self, request: JsonRpcRequest) -> Result<serde_json::Value, McpError> {
220 let transport = self.transport.lock().await; // acquire exclusive lock
221 let response = transport.send(request).await?; // blocks until response arrives
222
223 if let Some(error) = response.error {
224 return Err(McpError::JsonRpc {
225 code: error.code,
226 message: error.message,
227 });
228 }
229
230 response
231 .result
232 .ok_or_else(|| McpError::Protocol("Response has neither result nor error".into()))
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end coverage needs a running MCP server, so it is left to integration tests.
    // The client logic itself is exercised through the mock transport in the tool_adapter tests.

    /// The default client identity must be named "phi-core" and carry a non-empty version.
    #[test]
    fn test_client_info_default() {
        let client_info = ClientInfo::default();
        assert_eq!(client_info.name, "phi-core");

        let version_is_blank = client_info.version.is_empty();
        assert!(!version_is_blank);
    }
}
249}