Skip to main content

objectiveai_sdk/mcp/
client.rs

1//! MCP client for creating connections to MCP servers.
2
3use std::time::Duration;
4
5use indexmap::IndexMap;
6
7/// Client for creating MCP connections.
8///
9/// Holds shared configuration (HTTP client, headers, backoff parameters)
10/// and creates [`Connection`](super::Connection) instances via
11/// [`connect`](Client::connect).
12#[derive(Debug, Clone)]
13pub struct Client {
14    /// HTTP client for making requests.
15    pub http_client: reqwest::Client,
16    /// User-Agent header value.
17    pub user_agent: String,
18    /// X-Title header value.
19    pub x_title: String,
20    /// Referer header value.
21    pub http_referer: String,
22    /// Timeout for the initial connection (initialize request).
23    pub connect_timeout: Duration,
24
25    /// Current backoff interval for retry logic.
26    pub backoff_current_interval: Duration,
27    /// Initial backoff interval for retry logic.
28    pub backoff_initial_interval: Duration,
29    /// Randomization factor for backoff jitter.
30    pub backoff_randomization_factor: f64,
31    /// Multiplier for exponential backoff growth.
32    pub backoff_multiplier: f64,
33    /// Maximum backoff interval.
34    pub backoff_max_interval: Duration,
35    /// Maximum total time to spend on retries.
36    pub backoff_max_elapsed_time: Duration,
37    /// Timeout for individual RPC calls after connection is established.
38    pub call_timeout: Duration,
39}
40
41impl Client {
42    /// Creates a new MCP client.
43    pub fn new(
44        http_client: reqwest::Client,
45        user_agent: String,
46        x_title: String,
47        http_referer: String,
48        connect_timeout: Duration,
49        backoff_current_interval: Duration,
50        backoff_initial_interval: Duration,
51        backoff_randomization_factor: f64,
52        backoff_multiplier: f64,
53        backoff_max_interval: Duration,
54        backoff_max_elapsed_time: Duration,
55        call_timeout: Duration,
56    ) -> Self {
57        Self {
58            http_client,
59            user_agent,
60            x_title,
61            http_referer,
62            connect_timeout,
63            backoff_current_interval,
64            backoff_initial_interval,
65            backoff_randomization_factor,
66            backoff_multiplier,
67            backoff_max_interval,
68            backoff_max_elapsed_time,
69            call_timeout,
70        }
71    }
72
73    /// Build the canonical header map that the client stamps on every
74    /// request opened under this client / connection. The supplied
75    /// caller map (if any) wins on conflict — defaults are inserted
76    /// only when the caller didn't already provide them. The merged
77    /// map is computed once at the top of `connect_once` and reused
78    /// across all three handshake requests + handed to the resulting
79    /// [`Connection`] for every later RPC.
80    fn headers(
81        &self,
82        supplied: Option<IndexMap<String, String>>,
83    ) -> IndexMap<String, String> {
84        let mut out = supplied.unwrap_or_default();
85        out.entry("User-Agent".to_string())
86            .or_insert_with(|| self.user_agent.clone());
87        out.entry("X-Title".to_string())
88            .or_insert_with(|| self.x_title.clone());
89        out.entry("Referer".to_string())
90            .or_insert_with(|| self.http_referer.clone());
91        out.entry("HTTP-Referer".to_string())
92            .or_insert_with(|| self.http_referer.clone());
93        out
94    }
95
96    /// Connects to an MCP server using the Streamable HTTP transport.
97    ///
98    /// Sends an `initialize` JSON-RPC request to the server and extracts
99    /// the `Mcp-Session-Id` from the response. Returns a [`Connection`]
100    /// that can be used to list/call tools and list/read resources.
101    ///
102    /// `headers` are forwarded on every request this connection makes
103    /// to the upstream — both the initial `initialize` POST and every
104    /// subsequent RPC. The client merges its own defaults
105    /// (`User-Agent`, `X-Title`, `Referer`, `HTTP-Referer`) into this
106    /// map, but caller-supplied values for any of those win on
107    /// conflict. `Authorization` (when needed) is just another entry
108    /// in `headers`. The `Mcp-Session-Id` header is reserved — pass it
109    /// via `session_id` instead so the explicit argument can never be
110    /// clobbered by the headers map.
111    ///
112    /// ## SSE handoff
113    ///
114    /// `Accept` is `text/event-stream, application/json` — stream first
115    /// — so the server is encouraged to keep the underlying connection
116    /// open. If the response comes back as SSE we read the initialize
117    /// event off the stream and hand the *still-open* line reader to the
118    /// returned [`Connection`]'s list-changed listener. The listener
119    /// starts reading from that pre-opened stream immediately, which
120    /// closes the race where a peer (e.g. an in-process rmcp upstream)
121    /// would broadcast `notifications/tools/list_changed` before our
122    /// listener had managed to open its own GET `/` SSE.
123    ///
124    /// If the response is unary JSON and the server advertises either
125    /// `tools.list_changed` or `resources.list_changed`, we proactively
126    /// open a GET `/` SSE stream *before returning* and hand it to the
127    /// listener for the same reason. If neither capability is set, no
128    /// listener is needed and we return without touching SSE.
129    pub async fn connect(
130        &self,
131        url: String,
132        session_id: Option<String>,
133        headers: Option<IndexMap<String, String>>,
134    ) -> Result<super::Connection, super::Error> {
135        // Merge the caller's headers with the client's defaults once,
136        // then reuse the same merged map across every retry of the
137        // handshake AND hand it to the resulting Connection for every
138        // later RPC. Caller-supplied headers always win over defaults.
139        let headers = self.headers(headers);
140
141        // One outer backoff retry around all three handshake steps —
142        // initialize POST, notifications/initialized POST, GET / SSE
143        // (when capabilities require it). On a failure of any step we
144        // restart from scratch: a partial handshake leaves server-side
145        // session state we can't reuse, so retrying just the failed
146        // step would reference a session the server already discarded.
147        // Every error is treated as transient — the loop only gives up
148        // when the backoff's `max_elapsed_time` is exceeded.
149        let mut backoff = backoff::ExponentialBackoff {
150            current_interval: self.backoff_current_interval,
151            initial_interval: self.backoff_initial_interval,
152            randomization_factor: self.backoff_randomization_factor,
153            multiplier: self.backoff_multiplier,
154            max_interval: self.backoff_max_interval,
155            start_time: std::time::Instant::now(),
156            max_elapsed_time: Some(self.backoff_max_elapsed_time),
157            clock: backoff::SystemClock::default(),
158        };
159
160        loop {
161            match self
162                .connect_once(&url, session_id.as_deref(), &headers)
163                .await
164            {
165                Ok(conn) => return Ok(conn),
166                Err(e) => {
167                    use backoff::backoff::Backoff;
168                    match backoff.next_backoff() {
169                        Some(d) => tokio::time::sleep(d).await,
170                        None => return Err(e),
171                    }
172                }
173            }
174        }
175    }
176
177    /// Issue a stateless HTTP `DELETE /` to one upstream MCP server,
178    /// telling it to terminate the session identified by `session_id`.
179    ///
180    /// This is the low-level primitive — no backoff, no listener
181    /// teardown, no connection state. Caller is responsible for any
182    /// retry semantics. For the stateful tear-down path that also
183    /// cancels the connection's own active streams and absorbs
184    /// upstream `404 / 401 / 403` as success, use
185    /// [`Connection::delete`](super::Connection::delete) instead.
186    ///
187    /// `headers` is merged with the same defaults `connect` applies
188    /// (`User-Agent`, `X-Title`, `Referer`, `HTTP-Referer`). The
189    /// explicit `session_id` argument always wins over any
190    /// `Mcp-Session-Id` entry that happens to appear in `headers` — the
191    /// shape mirrors `connect`'s argument split for the same reason.
192    ///
193    /// Returns `Ok(())` on any 2xx status. Any non-2xx (including
194    /// `404 Not Found`) surfaces as [`Error::BadStatus`]. Network /
195    /// transport failures surface as [`Error::Request`].
196    pub async fn delete(
197        &self,
198        url: String,
199        session_id: String,
200        headers: Option<IndexMap<String, String>>,
201    ) -> Result<(), super::Error> {
202        let headers = self.headers(headers);
203        let mut request = self
204            .http_client
205            .delete(&url)
206            .timeout(self.call_timeout)
207            .header("Mcp-Session-Id", &session_id);
208        for (name, value) in &headers {
209            // Explicit `session_id` arg always wins.
210            if name.eq_ignore_ascii_case("Mcp-Session-Id") {
211                continue;
212            }
213            request = request.header(name, value);
214        }
215        let response = request.send().await.map_err(|source| {
216            super::Error::Request {
217                url: url.clone(),
218                source,
219            }
220        })?;
221        if !response.status().is_success() {
222            let code = response.status();
223            let body = response.text().await.unwrap_or_default();
224            return Err(super::Error::BadStatus {
225                url,
226                code,
227                body: body.chars().take(800).collect(),
228            });
229        }
230        Ok(())
231    }
232
233    /// One pass through the full Streamable-HTTP handshake. Caller
234    /// applies the outer backoff retry loop in [`Self::connect`].
235    /// `headers` is the already-merged map (defaults + caller overrides
236    /// from [`Self::headers`]), reused on every request without further
237    /// processing. `Mcp-Session-Id` is applied AFTER the headers loop
238    /// so it always wins over any same-named entry in `headers`.
239    async fn connect_once(
240        &self,
241        url: &str,
242        session_id: Option<&str>,
243        headers: &IndexMap<String, String>,
244    ) -> Result<super::Connection, super::Error> {
245        let init_request = serde_json::json!({
246            "jsonrpc": "2.0",
247            "id": 1,
248            "method": "initialize",
249            "params": {
250                "protocolVersion": "2025-06-18",
251                "capabilities": {},
252                "clientInfo": {
253                    "name": "objectiveai",
254                    "version": env!("CARGO_PKG_VERSION"),
255                }
256            }
257        });
258
259        let mut request = self
260            .http_client
261            .post(url)
262            .timeout(self.connect_timeout)
263            .header("Content-Type", "application/json")
264            .header("Accept", "text/event-stream, application/json")
265            .json(&init_request);
266
267        for (name, value) in headers {
268            request = request.header(name, value);
269        }
270        // Mcp-Session-Id is applied last so the explicit `session_id`
271        // argument always wins over any same-named entry in `headers`.
272        if let Some(sid) = session_id {
273            request = request.header("Mcp-Session-Id", sid);
274        }
275
276        let response = request.send().await.map_err(|source| {
277            super::Error::Connection {
278                url: url.to_string(),
279                source,
280            }
281        })?;
282
283        if !response.status().is_success() {
284            let code = response.status();
285            let body = response.text().await.unwrap_or_default();
286            return Err(super::Error::BadStatus {
287                url: url.to_string(),
288                code,
289                body,
290            });
291        }
292
293        // Extract session ID from response header.
294        //
295        // For *new* sessions the server must mint and return a session
296        // id. For *existing* sessions (caller passed `session_id` in)
297        // many servers — including rmcp's `StreamableHttpService` on
298        // its existing-session branch — don't echo the header back
299        // because nothing changed. When the caller already knew the
300        // session id, fall back to it instead of erroring; that's the
301        // value we'll be stamping on every subsequent request anyway.
302        let resolved_session_id = match response
303            .headers()
304            .get("Mcp-Session-Id")
305            .and_then(|v| v.to_str().ok())
306            .map(String::from)
307        {
308            Some(s) => s,
309            None => match session_id {
310                Some(provided) => provided.to_string(),
311                None => {
312                    let body = response.text().await.unwrap_or_default();
313                    return Err(super::Error::NoSessionId {
314                        url: url.to_string(),
315                        body: body.chars().take(800).collect(),
316                    });
317                }
318            },
319        };
320
321        // Did the server return SSE or unary JSON? rmcp's
322        // `StreamableHttpService` always returns SSE; many other servers
323        // reply with bare JSON.
324        let is_sse = response
325            .headers()
326            .get(reqwest::header::CONTENT_TYPE)
327            .and_then(|v| v.to_str().ok())
328            .map(|v| v.starts_with("text/event-stream"))
329            .unwrap_or(false);
330
331        // Parse the initialize response. SSE path consumes one event
332        // from the stream and keeps the rest of the stream alive for
333        // the listener; unary path consumes the whole body.
334        let (initialize_result, mut initial_sse_lines) = if is_sse {
335            let mut lines = super::lines_from_response(response);
336            let rpc_response: super::JsonRpcResponse<
337                super::initialize_result::InitializeResult,
338            > = super::read_next_sse_event(url, &mut lines).await?;
339            let result = match rpc_response {
340                super::JsonRpcResponse::Success { result, .. } => result,
341                super::JsonRpcResponse::Error { error, .. } => {
342                    return Err(super::Error::JsonRpc {
343                        url: url.to_string(),
344                        code: error.code,
345                        message: error.message,
346                        data: error.data,
347                    });
348                }
349            };
350            (result, Some(lines))
351        } else {
352            let rpc_response: super::JsonRpcResponse<
353                super::initialize_result::InitializeResult,
354            > = super::parse_streamable_http_response(url, response).await?;
355            let result = match rpc_response {
356                super::JsonRpcResponse::Success { result, .. } => result,
357                super::JsonRpcResponse::Error { error, .. } => {
358                    return Err(super::Error::JsonRpc {
359                        url: url.to_string(),
360                        code: error.code,
361                        message: error.message,
362                        data: error.data,
363                    });
364                }
365            };
366            (result, None)
367        };
368
369        // Whether we need a notification SSE channel at all.
370        let needs_sse = initialize_result
371            .capabilities
372            .tools
373            .as_ref()
374            .and_then(|t| t.list_changed)
375            .unwrap_or(false)
376            || initialize_result
377                .capabilities
378                .resources
379                .as_ref()
380                .and_then(|r| r.list_changed)
381                .unwrap_or(false);
382
383        // Send `notifications/initialized` BEFORE any other request.
384        // rmcp's per-session worker is in `expect_notification("initialized")`
385        // at this point — anything else (a `tools/list`, an opportunistic
386        // GET `/`) that lands during that window pushes a non-notification
387        // through the worker, makes `serve_server_with_ct_inner` return
388        // `Err(ExpectedInitializedNotification(...))`, drops the
389        // WorkerTransport, cancels the worker via its drop_guard, and
390        // tears the whole session down. Every later POST then 500s with
391        // "Session service terminated."
392        //
393        // We don't have a `Connection` yet — building one would spawn
394        // `refresh_tools` / `refresh_resources` background tasks that
395        // race with this notification, which is exactly the bug we're
396        // avoiding. We therefore POST inline here.
397        let init_notification_body = serde_json::json!({
398            "jsonrpc": "2.0",
399            "method": "notifications/initialized",
400            "params": {},
401        });
402        let mut notify_request = self
403            .http_client
404            .post(url)
405            .timeout(self.call_timeout)
406            .header("Content-Type", "application/json")
407            .header("Accept", "application/json, text/event-stream");
408        for (name, value) in headers {
409            notify_request = notify_request.header(name, value);
410        }
411        notify_request =
412            notify_request.header("Mcp-Session-Id", &resolved_session_id);
413        let notify_response = notify_request
414            .json(&init_notification_body)
415            .send()
416            .await
417            .map_err(|source| super::Error::Request {
418                url: url.to_string(),
419                source,
420            })?;
421        if !notify_response.status().is_success() {
422            let code = notify_response.status();
423            let body = notify_response.text().await.unwrap_or_default();
424            return Err(super::Error::BadStatus {
425                url: url.to_string(),
426                code,
427                body,
428            });
429        }
430
431        // Now safe to drop the init-side SSE stream; rmcp's session
432        // worker is past the init handshake and we have no further use
433        // for it (notifications come on the GET / stream below).
434        drop(initial_sse_lines.take());
435
436        // Now that the server's session worker is past the
437        // `expect_notification` gate, it's safe to open the proactive
438        // GET `/` SSE stream the listener will read
439        // `notifications/{tools,resources}/list_changed` from. Capability
440        // inspection only gates whether we open this stream; the
441        // `Connection` itself is naive about capabilities.
442        let initial_sse_lines: Option<super::LinesStream> = if needs_sse {
443            let mut get_request = self
444                .http_client
445                .get(url)
446                .timeout(self.connect_timeout)
447                .header("Accept", "text/event-stream");
448            for (name, value) in headers {
449                get_request = get_request.header(name, value);
450            }
451            get_request =
452                get_request.header("Mcp-Session-Id", &resolved_session_id);
453            let get_response = get_request.send().await.map_err(|source| {
454                super::Error::Connection {
455                    url: url.to_string(),
456                    source,
457                }
458            })?;
459            if !get_response.status().is_success() {
460                let code = get_response.status();
461                let body = get_response.text().await.unwrap_or_default();
462                return Err(super::Error::BadStatus {
463                    url: url.to_string(),
464                    code,
465                    body,
466                });
467            }
468            Some(super::lines_from_response(get_response))
469        } else {
470            None
471        };
472
473        // Construct the Connection at the very end. This is the only
474        // place in `connect` where the listener task and the
475        // `refresh_tools` / `refresh_resources` background tasks get
476        // spawned — by now the upstream is fully past its init
477        // handshake, so any of those POSTs land safely.
478        //
479        // `was_resumed` is true when the caller passed an existing
480        // `session_id` into `connect` — i.e. this is a re-attach to a
481        // pre-existing upstream session, not a fresh mint. The bit
482        // rides into `ConnectionInner` as `is_reconnect` and gates
483        // the drop-time orphan-DELETE that releases resumed sessions
484        // nobody ended up using.
485        let was_resumed = session_id.is_some();
486        let connection = super::Connection::new(
487            self.http_client.clone(),
488            url.to_string(),
489            resolved_session_id,
490            headers.clone(),
491            self.backoff_current_interval,
492            self.backoff_initial_interval,
493            self.backoff_randomization_factor,
494            self.backoff_multiplier,
495            self.backoff_max_interval,
496            self.backoff_max_elapsed_time,
497            self.call_timeout,
498            initialize_result,
499            initial_sse_lines,
500            was_resumed,
501        )
502        .await;
503
504        Ok(connection)
505    }
506}