Skip to main content

objectiveai_sdk/mcp/
client.rs

1//! MCP client for creating connections to MCP servers.
2
3use std::time::Duration;
4
5use indexmap::IndexMap;
6
7/// Client for creating MCP connections.
8///
9/// Holds shared configuration (HTTP client, headers, backoff parameters)
10/// and creates [`Connection`](super::Connection) instances via
11/// [`connect`](Client::connect).
12#[derive(Debug, Clone)]
13pub struct Client {
14    /// HTTP client for making requests.
15    pub http_client: reqwest::Client,
16    /// User-Agent header value.
17    pub user_agent: String,
18    /// X-Title header value.
19    pub x_title: String,
20    /// Referer header value.
21    pub http_referer: String,
22    /// Timeout for the initial connection (initialize request).
23    pub connect_timeout: Duration,
24
25    /// Current backoff interval for retry logic.
26    pub backoff_current_interval: Duration,
27    /// Initial backoff interval for retry logic.
28    pub backoff_initial_interval: Duration,
29    /// Randomization factor for backoff jitter.
30    pub backoff_randomization_factor: f64,
31    /// Multiplier for exponential backoff growth.
32    pub backoff_multiplier: f64,
33    /// Maximum backoff interval.
34    pub backoff_max_interval: Duration,
35    /// Maximum total time to spend on retries.
36    pub backoff_max_elapsed_time: Duration,
37    /// Timeout for individual RPC calls after connection is established.
38    pub call_timeout: Duration,
39}
40
41/// Serializable MCP client tuning — the canonical `mcp_backoff` blob, as a
42/// single JSON object. Covers the full retry/timeout knob set [`Client`]
43/// uses: the connect + per-call timeouts plus the six exponential-backoff
44/// parameters. [`Default`] is the project-wide default (connect 60000ms,
45/// 100ms / 100ms / 0.5 / 1.5 / 1000ms / 40000ms, call 60000ms) — the same
46/// values the api and proxy already use when their `MCP_*` env vars are
47/// unset. Field order mirrors [`Client::new`]'s timeout/backoff arguments.
48#[derive(
49    Debug, Clone, Copy, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
50)]
51#[schemars(rename = "mcp.Backoff")]
52pub struct Backoff {
53    pub connect_timeout_ms: u64,
54    pub current_interval_ms: u64,
55    pub initial_interval_ms: u64,
56    pub randomization_factor: f64,
57    pub multiplier: f64,
58    pub max_interval_ms: u64,
59    pub max_elapsed_time_ms: u64,
60    pub call_timeout_ms: u64,
61}
62
63impl Default for Backoff {
64    fn default() -> Self {
65        Self {
66            connect_timeout_ms: 60000,
67            current_interval_ms: 100,
68            initial_interval_ms: 100,
69            randomization_factor: 0.5,
70            multiplier: 1.5,
71            max_interval_ms: 1000,
72            max_elapsed_time_ms: 40000,
73            call_timeout_ms: 60000,
74        }
75    }
76}
77
78impl Client {
79    /// Creates a new MCP client.
80    pub fn new(
81        http_client: reqwest::Client,
82        user_agent: String,
83        x_title: String,
84        http_referer: String,
85        connect_timeout: Duration,
86        backoff_current_interval: Duration,
87        backoff_initial_interval: Duration,
88        backoff_randomization_factor: f64,
89        backoff_multiplier: f64,
90        backoff_max_interval: Duration,
91        backoff_max_elapsed_time: Duration,
92        call_timeout: Duration,
93    ) -> Self {
94        Self {
95            http_client,
96            user_agent,
97            x_title,
98            http_referer,
99            connect_timeout,
100            backoff_current_interval,
101            backoff_initial_interval,
102            backoff_randomization_factor,
103            backoff_multiplier,
104            backoff_max_interval,
105            backoff_max_elapsed_time,
106            call_timeout,
107        }
108    }
109
110    /// Build the canonical header map that the client stamps on every
111    /// request opened under this client / connection. The supplied
112    /// caller map (if any) wins on conflict — defaults are inserted
113    /// only when the caller didn't already provide them. The merged
114    /// map is computed once at the top of `connect_once` and reused
115    /// across all three handshake requests + handed to the resulting
116    /// [`Connection`] for every later RPC.
117    fn headers(
118        &self,
119        supplied: Option<IndexMap<String, String>>,
120    ) -> IndexMap<String, String> {
121        let mut out = supplied.unwrap_or_default();
122        out.entry("User-Agent".to_string())
123            .or_insert_with(|| self.user_agent.clone());
124        out.entry("X-Title".to_string())
125            .or_insert_with(|| self.x_title.clone());
126        out.entry("Referer".to_string())
127            .or_insert_with(|| self.http_referer.clone());
128        out.entry("HTTP-Referer".to_string())
129            .or_insert_with(|| self.http_referer.clone());
130        out
131    }
132
133    /// Connects to an MCP server using the Streamable HTTP transport.
134    ///
135    /// Sends an `initialize` JSON-RPC request to the server and extracts
136    /// the `Mcp-Session-Id` from the response. Returns a [`Connection`]
137    /// that can be used to list/call tools and list/read resources.
138    ///
139    /// `headers` are forwarded on every request this connection makes
140    /// to the upstream — both the initial `initialize` POST and every
141    /// subsequent RPC. The client merges its own defaults
142    /// (`User-Agent`, `X-Title`, `Referer`, `HTTP-Referer`) into this
143    /// map, but caller-supplied values for any of those win on
144    /// conflict. `Authorization` (when needed) is just another entry
145    /// in `headers`. The `Mcp-Session-Id` header is reserved — pass it
146    /// via `session_id` instead so the explicit argument can never be
147    /// clobbered by the headers map.
148    ///
149    /// ## SSE handoff
150    ///
151    /// `Accept` is `text/event-stream, application/json` — stream first
152    /// — so the server is encouraged to keep the underlying connection
153    /// open. If the response comes back as SSE we read the initialize
154    /// event off the stream and hand the *still-open* line reader to the
155    /// returned [`Connection`]'s list-changed listener. The listener
156    /// starts reading from that pre-opened stream immediately, which
157    /// closes the race where a peer (e.g. an in-process rmcp upstream)
158    /// would broadcast `notifications/tools/list_changed` before our
159    /// listener had managed to open its own GET `/` SSE.
160    ///
161    /// If the response is unary JSON and the server advertises either
162    /// `tools.list_changed` or `resources.list_changed`, we proactively
163    /// open a GET `/` SSE stream *before returning* and hand it to the
164    /// listener for the same reason. If neither capability is set, no
165    /// listener is needed and we return without touching SSE.
166    pub async fn connect(
167        &self,
168        url: String,
169        session_id: Option<String>,
170        headers: Option<IndexMap<String, String>>,
171    ) -> Result<super::Connection, super::Error> {
172        // Merge the caller's headers with the client's defaults once,
173        // then reuse the same merged map across every retry of the
174        // handshake AND hand it to the resulting Connection for every
175        // later RPC. Caller-supplied headers always win over defaults.
176        let headers = self.headers(headers);
177
178        // One outer backoff retry around all three handshake steps —
179        // initialize POST, notifications/initialized POST, GET / SSE
180        // (when capabilities require it). On a failure of any step we
181        // restart from scratch: a partial handshake leaves server-side
182        // session state we can't reuse, so retrying just the failed
183        // step would reference a session the server already discarded.
184        // Every error is treated as transient — the loop only gives up
185        // when the backoff's `max_elapsed_time` is exceeded.
186        let mut backoff = backoff::ExponentialBackoff {
187            current_interval: self.backoff_current_interval,
188            initial_interval: self.backoff_initial_interval,
189            randomization_factor: self.backoff_randomization_factor,
190            multiplier: self.backoff_multiplier,
191            max_interval: self.backoff_max_interval,
192            start_time: std::time::Instant::now(),
193            max_elapsed_time: Some(self.backoff_max_elapsed_time),
194            clock: backoff::SystemClock::default(),
195        };
196
197        loop {
198            match self
199                .connect_once(&url, session_id.as_deref(), &headers)
200                .await
201            {
202                Ok(conn) => return Ok(conn),
203                Err(e) => {
204                    use backoff::backoff::Backoff;
205                    match backoff.next_backoff() {
206                        Some(d) => tokio::time::sleep(d).await,
207                        None => return Err(e),
208                    }
209                }
210            }
211        }
212    }
213
214    /// Issue a stateless HTTP `DELETE /` to one upstream MCP server,
215    /// telling it to terminate the session identified by `session_id`.
216    ///
217    /// This is the low-level primitive — no backoff, no listener
218    /// teardown, no connection state. Caller is responsible for any
219    /// retry semantics. For the stateful tear-down path that also
220    /// cancels the connection's own active streams and absorbs
221    /// upstream `404 / 401 / 403` as success, use
222    /// [`Connection::delete`](super::Connection::delete) instead.
223    ///
224    /// `headers` is merged with the same defaults `connect` applies
225    /// (`User-Agent`, `X-Title`, `Referer`, `HTTP-Referer`). The
226    /// explicit `session_id` argument always wins over any
227    /// `Mcp-Session-Id` entry that happens to appear in `headers` — the
228    /// shape mirrors `connect`'s argument split for the same reason.
229    ///
230    /// Returns `Ok(())` on any 2xx status. Any non-2xx (including
231    /// `404 Not Found`) surfaces as [`Error::BadStatus`]. Network /
232    /// transport failures surface as [`Error::Request`].
233    pub async fn delete(
234        &self,
235        url: String,
236        session_id: String,
237        headers: Option<IndexMap<String, String>>,
238    ) -> Result<(), super::Error> {
239        let headers = self.headers(headers);
240        let mut request = self
241            .http_client
242            .delete(&url)
243            .timeout(self.call_timeout)
244            .header("Mcp-Session-Id", &session_id);
245        for (name, value) in &headers {
246            // Explicit `session_id` arg always wins.
247            if name.eq_ignore_ascii_case("Mcp-Session-Id") {
248                continue;
249            }
250            request = request.header(name, value);
251        }
252        let response = request.send().await.map_err(|source| {
253            super::Error::Request {
254                url: url.clone(),
255                source,
256            }
257        })?;
258        if !response.status().is_success() {
259            let code = response.status();
260            let body = response.text().await.unwrap_or_default();
261            return Err(super::Error::BadStatus {
262                url,
263                code,
264                body: body.chars().take(800).collect(),
265            });
266        }
267        Ok(())
268    }
269
270    /// One pass through the full Streamable-HTTP handshake. Caller
271    /// applies the outer backoff retry loop in [`Self::connect`].
272    /// `headers` is the already-merged map (defaults + caller overrides
273    /// from [`Self::headers`]), reused on every request without further
274    /// processing. `Mcp-Session-Id` is applied AFTER the headers loop
275    /// so it always wins over any same-named entry in `headers`.
276    async fn connect_once(
277        &self,
278        url: &str,
279        session_id: Option<&str>,
280        headers: &IndexMap<String, String>,
281    ) -> Result<super::Connection, super::Error> {
282        let init_request = serde_json::json!({
283            "jsonrpc": "2.0",
284            "id": 1,
285            "method": "initialize",
286            "params": {
287                "protocolVersion": "2025-06-18",
288                "capabilities": {},
289                "clientInfo": {
290                    "name": "objectiveai",
291                    "version": env!("CARGO_PKG_VERSION"),
292                }
293            }
294        });
295
296        let mut request = self
297            .http_client
298            .post(url)
299            .timeout(self.connect_timeout)
300            .header("Content-Type", "application/json")
301            .header("Accept", "text/event-stream, application/json")
302            .json(&init_request);
303
304        for (name, value) in headers {
305            request = request.header(name, value);
306        }
307        // Mcp-Session-Id is applied last so the explicit `session_id`
308        // argument always wins over any same-named entry in `headers`.
309        if let Some(sid) = session_id {
310            request = request.header("Mcp-Session-Id", sid);
311        }
312
313        let response = request.send().await.map_err(|source| {
314            super::Error::Connection {
315                url: url.to_string(),
316                source,
317            }
318        })?;
319
320        if !response.status().is_success() {
321            let code = response.status();
322            let body = response.text().await.unwrap_or_default();
323            return Err(super::Error::BadStatus {
324                url: url.to_string(),
325                code,
326                body,
327            });
328        }
329
330        // Extract session ID from response header.
331        //
332        // For *new* sessions the server must mint and return a session
333        // id. For *existing* sessions (caller passed `session_id` in)
334        // many servers — including rmcp's `StreamableHttpService` on
335        // its existing-session branch — don't echo the header back
336        // because nothing changed. When the caller already knew the
337        // session id, fall back to it instead of erroring; that's the
338        // value we'll be stamping on every subsequent request anyway.
339        let resolved_session_id = match response
340            .headers()
341            .get("Mcp-Session-Id")
342            .and_then(|v| v.to_str().ok())
343            .map(String::from)
344        {
345            Some(s) => s,
346            None => match session_id {
347                Some(provided) => provided.to_string(),
348                None => {
349                    let body = response.text().await.unwrap_or_default();
350                    return Err(super::Error::NoSessionId {
351                        url: url.to_string(),
352                        body: body.chars().take(800).collect(),
353                    });
354                }
355            },
356        };
357
358        // Did the server return SSE or unary JSON? rmcp's
359        // `StreamableHttpService` always returns SSE; many other servers
360        // reply with bare JSON.
361        let is_sse = response
362            .headers()
363            .get(reqwest::header::CONTENT_TYPE)
364            .and_then(|v| v.to_str().ok())
365            .map(|v| v.starts_with("text/event-stream"))
366            .unwrap_or(false);
367
368        // Parse the initialize response. SSE path consumes one event
369        // from the stream and keeps the rest of the stream alive for
370        // the listener; unary path consumes the whole body.
371        let (initialize_result, mut initial_sse_lines) = if is_sse {
372            let mut lines = super::lines_from_response(response);
373            let rpc_response: super::JsonRpcResponse<
374                super::initialize_result::InitializeResult,
375            > = super::read_next_sse_event(url, &mut lines).await?;
376            let result = match rpc_response {
377                super::JsonRpcResponse::Success { result, .. } => result,
378                super::JsonRpcResponse::Error { error, .. } => {
379                    return Err(super::Error::JsonRpc {
380                        url: url.to_string(),
381                        code: error.code,
382                        message: error.message,
383                        data: error.data,
384                    });
385                }
386            };
387            (result, Some(lines))
388        } else {
389            let rpc_response: super::JsonRpcResponse<
390                super::initialize_result::InitializeResult,
391            > = super::parse_streamable_http_response(url, response).await?;
392            let result = match rpc_response {
393                super::JsonRpcResponse::Success { result, .. } => result,
394                super::JsonRpcResponse::Error { error, .. } => {
395                    return Err(super::Error::JsonRpc {
396                        url: url.to_string(),
397                        code: error.code,
398                        message: error.message,
399                        data: error.data,
400                    });
401                }
402            };
403            (result, None)
404        };
405
406        // Whether we need a notification SSE channel at all.
407        let needs_sse = initialize_result
408            .capabilities
409            .tools
410            .as_ref()
411            .and_then(|t| t.list_changed)
412            .unwrap_or(false)
413            || initialize_result
414                .capabilities
415                .resources
416                .as_ref()
417                .and_then(|r| r.list_changed)
418                .unwrap_or(false);
419
420        // Send `notifications/initialized` BEFORE any other request.
421        // rmcp's per-session worker is in `expect_notification("initialized")`
422        // at this point — anything else (a `tools/list`, an opportunistic
423        // GET `/`) that lands during that window pushes a non-notification
424        // through the worker, makes `serve_server_with_ct_inner` return
425        // `Err(ExpectedInitializedNotification(...))`, drops the
426        // WorkerTransport, cancels the worker via its drop_guard, and
427        // tears the whole session down. Every later POST then 500s with
428        // "Session service terminated."
429        //
430        // We don't have a `Connection` yet — building one would spawn
431        // `refresh_tools` / `refresh_resources` background tasks that
432        // race with this notification, which is exactly the bug we're
433        // avoiding. We therefore POST inline here.
434        let init_notification_body = serde_json::json!({
435            "jsonrpc": "2.0",
436            "method": "notifications/initialized",
437            "params": {},
438        });
439        let mut notify_request = self
440            .http_client
441            .post(url)
442            .timeout(self.call_timeout)
443            .header("Content-Type", "application/json")
444            .header("Accept", "application/json, text/event-stream");
445        for (name, value) in headers {
446            notify_request = notify_request.header(name, value);
447        }
448        notify_request =
449            notify_request.header("Mcp-Session-Id", &resolved_session_id);
450        let notify_response = notify_request
451            .json(&init_notification_body)
452            .send()
453            .await
454            .map_err(|source| super::Error::Request {
455                url: url.to_string(),
456                source,
457            })?;
458        if !notify_response.status().is_success() {
459            let code = notify_response.status();
460            let body = notify_response.text().await.unwrap_or_default();
461            return Err(super::Error::BadStatus {
462                url: url.to_string(),
463                code,
464                body,
465            });
466        }
467
468        // Now safe to drop the init-side SSE stream; rmcp's session
469        // worker is past the init handshake and we have no further use
470        // for it (notifications come on the GET / stream below).
471        drop(initial_sse_lines.take());
472
473        // Now that the server's session worker is past the
474        // `expect_notification` gate, it's safe to open the proactive
475        // GET `/` SSE stream the listener will read
476        // `notifications/{tools,resources}/list_changed` from. Capability
477        // inspection only gates whether we open this stream; the
478        // `Connection` itself is naive about capabilities.
479        let initial_sse_lines: Option<super::LinesStream> = if needs_sse {
480            let mut get_request = self
481                .http_client
482                .get(url)
483                .timeout(self.connect_timeout)
484                .header("Accept", "text/event-stream");
485            for (name, value) in headers {
486                get_request = get_request.header(name, value);
487            }
488            get_request =
489                get_request.header("Mcp-Session-Id", &resolved_session_id);
490            let get_response = get_request.send().await.map_err(|source| {
491                super::Error::Connection {
492                    url: url.to_string(),
493                    source,
494                }
495            })?;
496            if !get_response.status().is_success() {
497                let code = get_response.status();
498                let body = get_response.text().await.unwrap_or_default();
499                return Err(super::Error::BadStatus {
500                    url: url.to_string(),
501                    code,
502                    body,
503                });
504            }
505            Some(super::lines_from_response(get_response))
506        } else {
507            None
508        };
509
510        // Construct the Connection at the very end. This is the only
511        // place in `connect` where the listener task and the
512        // `refresh_tools` / `refresh_resources` background tasks get
513        // spawned — by now the upstream is fully past its init
514        // handshake, so any of those POSTs land safely.
515        let connection = super::Connection::new(
516            self.http_client.clone(),
517            url.to_string(),
518            resolved_session_id,
519            headers.clone(),
520            self.backoff_current_interval,
521            self.backoff_initial_interval,
522            self.backoff_randomization_factor,
523            self.backoff_multiplier,
524            self.backoff_max_interval,
525            self.backoff_max_elapsed_time,
526            self.call_timeout,
527            initialize_result,
528            initial_sse_lines,
529        )
530        .await;
531
532        Ok(connection)
533    }
534}