Skip to main content

phi_core/mcp/
client.rs

1//! High-level MCP client.
2/*
3ARCHITECTURE: McpClient — the MCP protocol layer
4
5`McpClient` is a stateful wrapper that manages the full MCP connection lifecycle:
6  1. Connect: `connect_stdio()` spawns a process; `connect_http()` opens an HTTP session
7  2. Handshake: `initialize()` sends the `initialize` + `notifications/initialized` messages
8  3. Discover: `list_tools()` fetches available tools from the server
9  4. Execute: `call_tool(name, arguments)` invokes a specific tool
10  5. Shutdown: `close()` kills the process or closes the HTTP connection
11
12The MCP "handshake" is mandatory — servers refuse requests unless the client
13first sends `initialize` with the protocol version and client capabilities.
14After the server responds, we send `notifications/initialized` (a one-way notification).
15
16RUST QUIRK: `Arc<Mutex<Box<dyn McpTransport>>>` — three layers of wrapping, each with a purpose
17
18Let's unpack from the inside out:
19  `Box<dyn McpTransport>`      — heap-allocated trait object (type-erased transport)
20  `Mutex<Box<dyn McpTransport>>` — exclusive access lock (one request at a time)
21    The MCP stdio transport is NOT safe for concurrent requests — each request/response
22    must complete before the next starts. `Mutex` enforces this.
23  `Arc<Mutex<Box<dyn McpTransport>>>` — shared ownership
24    `McpClient` is cloned and passed to multiple `McpToolAdapter` instances.
25    `Arc` lets all adapters share the same underlying transport safely.
26    `.clone()` on `Arc` just bumps a reference count — cheap.
27
28Python analogy:
29  self._transport = threading.Lock()  # wrapping a transport object
30  # Arc is implicit in Python (reference counting + GIL)
31*/
32
33use super::transport::{HttpTransport, McpTransport, StdioTransport, DEFAULT_REQUEST_TIMEOUT};
34use super::types::*;
35use std::collections::HashMap;
36use std::sync::Arc;
37use std::time::Duration;
38use tokio::sync::Mutex;
39
40/// Configuration knobs for [`McpClient`] construction.
41///
42/// Currently only carries the per-request timeout, but kept as a struct so future
43/// options can be added without breaking the public API.
44#[derive(Debug, Clone)]
45pub struct McpClientConfig {
46    /// Per-request timeout applied to every transport `send()` call.
47    pub request_timeout: Duration,
48}
49
50impl Default for McpClientConfig {
51    fn default() -> Self {
52        Self {
53            request_timeout: DEFAULT_REQUEST_TIMEOUT,
54        }
55    }
56}
57
58/// High-level MCP client that manages connection lifecycle and protocol.
59pub struct McpClient {
60    transport: Arc<Mutex<Box<dyn McpTransport>>>, // shared, locked, type-erased transport
61    server_info: Option<ServerInfo>,              // populated after initialize()
62    capabilities: Option<ServerCapabilities>,     // populated after initialize()
63}
64
65impl McpClient {
66    /// Connect to an MCP server via stdio (spawn a child process).
67    ///
68    /// Uses the default per-request timeout (`DEFAULT_REQUEST_TIMEOUT`, 30 s).
69    /// For a custom timeout, use [`McpClient::connect_stdio_with_config`].
70    pub async fn connect_stdio(
71        command: &str, // EXECUTABLE — binary to spawn as the MCP server subprocess
72        args: &[&str], // ARGV — command-line args for the subprocess
73        env: Option<HashMap<String, String>>, // ENV OVERRIDES — extra env vars; None = inherit parent env
74    ) -> Result<Self, McpError> {
75        Self::connect_stdio_with_config(command, args, env, McpClientConfig::default()).await
76    }
77
78    /// Connect to an MCP server via stdio with custom configuration.
79    pub async fn connect_stdio_with_config(
80        command: &str,
81        args: &[&str],
82        env: Option<HashMap<String, String>>,
83        config: McpClientConfig,
84    ) -> Result<Self, McpError> {
85        let transport = StdioTransport::new(command, args, env)
86            .await?
87            .with_timeout(config.request_timeout);
88        let mut client = Self {
89            transport: Arc::new(Mutex::new(Box::new(transport))),
90            server_info: None,
91            capabilities: None,
92        };
93        client.initialize().await?;
94        Ok(client)
95    }
96
97    /// Connect to an MCP server via HTTP.
98    ///
99    /// Uses the default per-request timeout (`DEFAULT_REQUEST_TIMEOUT`, 30 s).
100    /// For a custom timeout, use [`McpClient::connect_http_with_config`].
101    pub async fn connect_http(url: &str) -> Result<Self, McpError> {
102        Self::connect_http_with_config(url, McpClientConfig::default()).await
103    }
104
105    /// Connect to an MCP server via HTTP with custom configuration.
106    pub async fn connect_http_with_config(
107        url: &str,
108        config: McpClientConfig,
109    ) -> Result<Self, McpError> {
110        let transport = HttpTransport::new_with_timeout(url, config.request_timeout)?;
111        let mut client = Self {
112            transport: Arc::new(Mutex::new(Box::new(transport))),
113            server_info: None,
114            capabilities: None,
115        };
116        client.initialize().await?;
117        Ok(client)
118    }
119
120    /// Create from an existing transport (useful for testing).
121    pub fn from_transport(transport: Box<dyn McpTransport>) -> Self {
122        Self {
123            transport: Arc::new(Mutex::new(transport)),
124            server_info: None,
125            capabilities: None,
126        }
127    }
128
129    /// Initialize the MCP connection (handshake).
130    pub async fn initialize(&mut self) -> Result<ServerInfo, McpError> {
131        let params = serde_json::json!({
132            "protocolVersion": "2024-11-05",
133            "capabilities": {},
134            "clientInfo": ClientInfo::default()
135        });
136
137        let request = JsonRpcRequest::new("initialize", Some(params));
138        let response = self.send_request(request).await?;
139
140        let result: InitializeResult = serde_json::from_value(response)?;
141        self.server_info = Some(result.server_info.clone());
142        self.capabilities = Some(result.capabilities);
143
144        // Send initialized notification (no response expected, but we send it as a request
145        // since our transport is request/response. Some servers ignore the id on notifications.)
146        let notify = JsonRpcRequest::new("notifications/initialized", None);
147        // Best-effort: ignore errors on the notification
148        let _ = self.send_request(notify).await;
149
150        Ok(result.server_info)
151    }
152
153    /// List available tools from the server.
154    pub async fn list_tools(&self) -> Result<Vec<McpToolInfo>, McpError> {
155        let request = JsonRpcRequest::new("tools/list", Some(serde_json::json!({})));
156        let response = self.send_request(request).await?;
157
158        let result: ToolsListResult = serde_json::from_value(response)?;
159        Ok(result.tools)
160    }
161
162    /// Call a tool on the server.
163    /*
164    DESIGN: Why `name` AND `arguments` are separate parameters
165      `name`      = SELECTOR — which tool on the MCP server to invoke (like a function name)
166      `arguments` = INPUT    — the JSON arguments for that specific invocation
167    Mirrors the same registry-vs-invocation split as AgentTool: the server has a registry of
168    named tools; each call selects one by name and provides the arguments for that call.
169    */
170    pub async fn call_tool(
171        &self,
172        name: &str, // SELECTOR — tool name on the MCP server (must match tools/list result)
173        arguments: serde_json::Value, // INPUT — JSON arguments for this invocation (schema defined by the server)
174    ) -> Result<McpToolCallResult, McpError> {
175        let params = serde_json::json!({
176            "name": name,
177            "arguments": arguments
178        });
179
180        let request = JsonRpcRequest::new("tools/call", Some(params));
181        let response = self.send_request(request).await?;
182
183        let result: McpToolCallResult = serde_json::from_value(response)?;
184        Ok(result)
185    }
186
187    /// Close the connection.
188    pub async fn close(&self) -> Result<(), McpError> {
189        self.transport.lock().await.close().await
190    }
191
192    /// Get server info (available after initialize).
193    pub fn server_info(&self) -> Option<&ServerInfo> {
194        self.server_info.as_ref()
195    }
196
197    /// Send a request and extract the result value, classifying errors.
198    /*
199    RUST QUIRK: `self.transport.lock().await` — async mutex acquisition
200
201    `tokio::sync::Mutex::lock()` returns a future that resolves when the lock is acquired.
202    `.await` suspends the current task (not the OS thread!) until the lock is free.
203    Returns `MutexGuard<Box<dyn McpTransport>>` — auto-unlocks when guard is dropped.
204
205    The guard is held for the duration of `transport.send(request).await?`, then dropped.
206    This is important: if we made TWO requests concurrently, the second would wait here
207    until the first's guard drops. Serial request ordering is enforced.
208
209    ARCHITECTURE: Error classification
210    JSON-RPC errors come in two forms:
211      1. Transport error: network failure, process died, parse error
212         → `McpError::Transport(String)` or `McpError::Protocol(String)`
213      2. Application error: server-side error (tool not found, invalid arguments)
214         → `response.error` is populated, returned as `McpError::JsonRpc { code, message }`
215
216    After extracting the result, we also handle the case where NEITHER `result` nor `error`
217    is present — technically invalid JSON-RPC but defensive against buggy servers.
218    */
219    async fn send_request(&self, request: JsonRpcRequest) -> Result<serde_json::Value, McpError> {
220        let transport = self.transport.lock().await; // acquire exclusive lock
221        let response = transport.send(request).await?; // blocks until response arrives
222
223        if let Some(error) = response.error {
224            return Err(McpError::JsonRpc {
225                code: error.code,
226                message: error.message,
227            });
228        }
229
230        response
231            .result
232            .ok_or_else(|| McpError::Protocol("Response has neither result nor error".into()))
233    }
234}
235
236#[cfg(test)]
237mod tests {
238    use super::*;
239
240    // Integration test would require a running MCP server.
241    // Unit tests for the client logic are covered via mock transport in tool_adapter tests.
242
243    #[test]
244    fn test_client_info_default() {
245        let info = ClientInfo::default();
246        assert_eq!(info.name, "phi-core");
247        assert!(!info.version.is_empty());
248    }
249}