objectiveai_sdk/mcp/client.rs
1//! MCP client for creating connections to MCP servers.
2
3use std::time::Duration;
4
5use indexmap::IndexMap;
6
7/// Client for creating MCP connections.
8///
9/// Holds shared configuration (HTTP client, headers, backoff parameters)
10/// and creates [`Connection`](super::Connection) instances via
11/// [`connect`](Client::connect).
12#[derive(Debug, Clone)]
13pub struct Client {
14 /// HTTP client for making requests.
15 pub http_client: reqwest::Client,
16 /// User-Agent header value.
17 pub user_agent: String,
18 /// X-Title header value.
19 pub x_title: String,
20 /// Referer header value.
21 pub http_referer: String,
22 /// Timeout for the initial connection (initialize request).
23 pub connect_timeout: Duration,
24
25 /// Current backoff interval for retry logic.
26 pub backoff_current_interval: Duration,
27 /// Initial backoff interval for retry logic.
28 pub backoff_initial_interval: Duration,
29 /// Randomization factor for backoff jitter.
30 pub backoff_randomization_factor: f64,
31 /// Multiplier for exponential backoff growth.
32 pub backoff_multiplier: f64,
33 /// Maximum backoff interval.
34 pub backoff_max_interval: Duration,
35 /// Maximum total time to spend on retries.
36 pub backoff_max_elapsed_time: Duration,
37 /// Timeout for individual RPC calls after connection is established.
38 pub call_timeout: Duration,
39}
40
41/// Serializable MCP client tuning — the canonical `mcp_backoff` blob, as a
42/// single JSON object. Covers the full retry/timeout knob set [`Client`]
43/// uses: the connect + per-call timeouts plus the six exponential-backoff
44/// parameters. [`Default`] is the project-wide default (connect 60000ms,
45/// 100ms / 100ms / 0.5 / 1.5 / 1000ms / 40000ms, call 60000ms) — the same
46/// values the api and proxy already use when their `MCP_*` env vars are
47/// unset. Field order mirrors [`Client::new`]'s timeout/backoff arguments.
48#[derive(
49 Debug, Clone, Copy, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
50)]
51#[schemars(rename = "mcp.Backoff")]
52pub struct Backoff {
53 pub connect_timeout_ms: u64,
54 pub current_interval_ms: u64,
55 pub initial_interval_ms: u64,
56 pub randomization_factor: f64,
57 pub multiplier: f64,
58 pub max_interval_ms: u64,
59 pub max_elapsed_time_ms: u64,
60 pub call_timeout_ms: u64,
61}
62
63impl Default for Backoff {
64 fn default() -> Self {
65 Self {
66 connect_timeout_ms: 60000,
67 current_interval_ms: 100,
68 initial_interval_ms: 100,
69 randomization_factor: 0.5,
70 multiplier: 1.5,
71 max_interval_ms: 1000,
72 max_elapsed_time_ms: 40000,
73 call_timeout_ms: 60000,
74 }
75 }
76}
77
78impl Client {
79 /// Creates a new MCP client.
80 pub fn new(
81 http_client: reqwest::Client,
82 user_agent: String,
83 x_title: String,
84 http_referer: String,
85 connect_timeout: Duration,
86 backoff_current_interval: Duration,
87 backoff_initial_interval: Duration,
88 backoff_randomization_factor: f64,
89 backoff_multiplier: f64,
90 backoff_max_interval: Duration,
91 backoff_max_elapsed_time: Duration,
92 call_timeout: Duration,
93 ) -> Self {
94 Self {
95 http_client,
96 user_agent,
97 x_title,
98 http_referer,
99 connect_timeout,
100 backoff_current_interval,
101 backoff_initial_interval,
102 backoff_randomization_factor,
103 backoff_multiplier,
104 backoff_max_interval,
105 backoff_max_elapsed_time,
106 call_timeout,
107 }
108 }
109
110 /// Build the canonical header map that the client stamps on every
111 /// request opened under this client / connection. The supplied
112 /// caller map (if any) wins on conflict — defaults are inserted
113 /// only when the caller didn't already provide them. The merged
114 /// map is computed once at the top of `connect_once` and reused
115 /// across all three handshake requests + handed to the resulting
116 /// [`Connection`] for every later RPC.
117 fn headers(
118 &self,
119 supplied: Option<IndexMap<String, String>>,
120 ) -> IndexMap<String, String> {
121 let mut out = supplied.unwrap_or_default();
122 out.entry("User-Agent".to_string())
123 .or_insert_with(|| self.user_agent.clone());
124 out.entry("X-Title".to_string())
125 .or_insert_with(|| self.x_title.clone());
126 out.entry("Referer".to_string())
127 .or_insert_with(|| self.http_referer.clone());
128 out.entry("HTTP-Referer".to_string())
129 .or_insert_with(|| self.http_referer.clone());
130 out
131 }
132
133 /// Connects to an MCP server using the Streamable HTTP transport.
134 ///
135 /// Sends an `initialize` JSON-RPC request to the server and extracts
136 /// the `Mcp-Session-Id` from the response. Returns a [`Connection`]
137 /// that can be used to list/call tools and list/read resources.
138 ///
139 /// `headers` are forwarded on every request this connection makes
140 /// to the upstream — both the initial `initialize` POST and every
141 /// subsequent RPC. The client merges its own defaults
142 /// (`User-Agent`, `X-Title`, `Referer`, `HTTP-Referer`) into this
143 /// map, but caller-supplied values for any of those win on
144 /// conflict. `Authorization` (when needed) is just another entry
145 /// in `headers`. The `Mcp-Session-Id` header is reserved — pass it
146 /// via `session_id` instead so the explicit argument can never be
147 /// clobbered by the headers map.
148 ///
149 /// ## SSE handoff
150 ///
151 /// `Accept` is `text/event-stream, application/json` — stream first
152 /// — so the server is encouraged to keep the underlying connection
153 /// open. If the response comes back as SSE we read the initialize
154 /// event off the stream and hand the *still-open* line reader to the
155 /// returned [`Connection`]'s list-changed listener. The listener
156 /// starts reading from that pre-opened stream immediately, which
157 /// closes the race where a peer (e.g. an in-process rmcp upstream)
158 /// would broadcast `notifications/tools/list_changed` before our
159 /// listener had managed to open its own GET `/` SSE.
160 ///
161 /// If the response is unary JSON and the server advertises either
162 /// `tools.list_changed` or `resources.list_changed`, we proactively
163 /// open a GET `/` SSE stream *before returning* and hand it to the
164 /// listener for the same reason. If neither capability is set, no
165 /// listener is needed and we return without touching SSE.
166 pub async fn connect(
167 &self,
168 url: String,
169 session_id: Option<String>,
170 headers: Option<IndexMap<String, String>>,
171 ) -> Result<super::Connection, super::Error> {
172 // Merge the caller's headers with the client's defaults once,
173 // then reuse the same merged map across every retry of the
174 // handshake AND hand it to the resulting Connection for every
175 // later RPC. Caller-supplied headers always win over defaults.
176 let headers = self.headers(headers);
177
178 // One outer backoff retry around all three handshake steps —
179 // initialize POST, notifications/initialized POST, GET / SSE
180 // (when capabilities require it). On a failure of any step we
181 // restart from scratch: a partial handshake leaves server-side
182 // session state we can't reuse, so retrying just the failed
183 // step would reference a session the server already discarded.
184 // Every error is treated as transient — the loop only gives up
185 // when the backoff's `max_elapsed_time` is exceeded.
186 let mut backoff = backoff::ExponentialBackoff {
187 current_interval: self.backoff_current_interval,
188 initial_interval: self.backoff_initial_interval,
189 randomization_factor: self.backoff_randomization_factor,
190 multiplier: self.backoff_multiplier,
191 max_interval: self.backoff_max_interval,
192 start_time: std::time::Instant::now(),
193 max_elapsed_time: Some(self.backoff_max_elapsed_time),
194 clock: backoff::SystemClock::default(),
195 };
196
197 loop {
198 match self
199 .connect_once(&url, session_id.as_deref(), &headers)
200 .await
201 {
202 Ok(conn) => return Ok(conn),
203 Err(e) => {
204 use backoff::backoff::Backoff;
205 match backoff.next_backoff() {
206 Some(d) => tokio::time::sleep(d).await,
207 None => return Err(e),
208 }
209 }
210 }
211 }
212 }
213
214 /// Issue a stateless HTTP `DELETE /` to one upstream MCP server,
215 /// telling it to terminate the session identified by `session_id`.
216 ///
217 /// This is the low-level primitive — no backoff, no listener
218 /// teardown, no connection state. Caller is responsible for any
219 /// retry semantics. For the stateful tear-down path that also
220 /// cancels the connection's own active streams and absorbs
221 /// upstream `404 / 401 / 403` as success, use
222 /// [`Connection::delete`](super::Connection::delete) instead.
223 ///
224 /// `headers` is merged with the same defaults `connect` applies
225 /// (`User-Agent`, `X-Title`, `Referer`, `HTTP-Referer`). The
226 /// explicit `session_id` argument always wins over any
227 /// `Mcp-Session-Id` entry that happens to appear in `headers` — the
228 /// shape mirrors `connect`'s argument split for the same reason.
229 ///
230 /// Returns `Ok(())` on any 2xx status. Any non-2xx (including
231 /// `404 Not Found`) surfaces as [`Error::BadStatus`]. Network /
232 /// transport failures surface as [`Error::Request`].
233 pub async fn delete(
234 &self,
235 url: String,
236 session_id: String,
237 headers: Option<IndexMap<String, String>>,
238 ) -> Result<(), super::Error> {
239 let headers = self.headers(headers);
240 let mut request = self
241 .http_client
242 .delete(&url)
243 .timeout(self.call_timeout)
244 .header("Mcp-Session-Id", &session_id);
245 for (name, value) in &headers {
246 // Explicit `session_id` arg always wins.
247 if name.eq_ignore_ascii_case("Mcp-Session-Id") {
248 continue;
249 }
250 request = request.header(name, value);
251 }
252 let response = request.send().await.map_err(|source| {
253 super::Error::Request {
254 url: url.clone(),
255 source,
256 }
257 })?;
258 if !response.status().is_success() {
259 let code = response.status();
260 let body = response.text().await.unwrap_or_default();
261 return Err(super::Error::BadStatus {
262 url,
263 code,
264 body: body.chars().take(800).collect(),
265 });
266 }
267 Ok(())
268 }
269
270 /// One pass through the full Streamable-HTTP handshake. Caller
271 /// applies the outer backoff retry loop in [`Self::connect`].
272 /// `headers` is the already-merged map (defaults + caller overrides
273 /// from [`Self::headers`]), reused on every request without further
274 /// processing. `Mcp-Session-Id` is applied AFTER the headers loop
275 /// so it always wins over any same-named entry in `headers`.
276 async fn connect_once(
277 &self,
278 url: &str,
279 session_id: Option<&str>,
280 headers: &IndexMap<String, String>,
281 ) -> Result<super::Connection, super::Error> {
282 let init_request = serde_json::json!({
283 "jsonrpc": "2.0",
284 "id": 1,
285 "method": "initialize",
286 "params": {
287 "protocolVersion": "2025-06-18",
288 "capabilities": {},
289 "clientInfo": {
290 "name": "objectiveai",
291 "version": env!("CARGO_PKG_VERSION"),
292 }
293 }
294 });
295
296 let mut request = self
297 .http_client
298 .post(url)
299 .timeout(self.connect_timeout)
300 .header("Content-Type", "application/json")
301 .header("Accept", "text/event-stream, application/json")
302 .json(&init_request);
303
304 for (name, value) in headers {
305 request = request.header(name, value);
306 }
307 // Mcp-Session-Id is applied last so the explicit `session_id`
308 // argument always wins over any same-named entry in `headers`.
309 if let Some(sid) = session_id {
310 request = request.header("Mcp-Session-Id", sid);
311 }
312
313 let response = request.send().await.map_err(|source| {
314 super::Error::Connection {
315 url: url.to_string(),
316 source,
317 }
318 })?;
319
320 if !response.status().is_success() {
321 let code = response.status();
322 let body = response.text().await.unwrap_or_default();
323 return Err(super::Error::BadStatus {
324 url: url.to_string(),
325 code,
326 body,
327 });
328 }
329
330 // Extract session ID from response header.
331 //
332 // For *new* sessions the server must mint and return a session
333 // id. For *existing* sessions (caller passed `session_id` in)
334 // many servers — including rmcp's `StreamableHttpService` on
335 // its existing-session branch — don't echo the header back
336 // because nothing changed. When the caller already knew the
337 // session id, fall back to it instead of erroring; that's the
338 // value we'll be stamping on every subsequent request anyway.
339 let resolved_session_id = match response
340 .headers()
341 .get("Mcp-Session-Id")
342 .and_then(|v| v.to_str().ok())
343 .map(String::from)
344 {
345 Some(s) => s,
346 None => match session_id {
347 Some(provided) => provided.to_string(),
348 None => {
349 let body = response.text().await.unwrap_or_default();
350 return Err(super::Error::NoSessionId {
351 url: url.to_string(),
352 body: body.chars().take(800).collect(),
353 });
354 }
355 },
356 };
357
358 // Did the server return SSE or unary JSON? rmcp's
359 // `StreamableHttpService` always returns SSE; many other servers
360 // reply with bare JSON.
361 let is_sse = response
362 .headers()
363 .get(reqwest::header::CONTENT_TYPE)
364 .and_then(|v| v.to_str().ok())
365 .map(|v| v.starts_with("text/event-stream"))
366 .unwrap_or(false);
367
368 // Parse the initialize response. SSE path consumes one event
369 // from the stream and keeps the rest of the stream alive for
370 // the listener; unary path consumes the whole body.
371 let (initialize_result, mut initial_sse_lines) = if is_sse {
372 let mut lines = super::lines_from_response(response);
373 let rpc_response: super::JsonRpcResponse<
374 super::initialize_result::InitializeResult,
375 > = super::read_next_sse_event(url, &mut lines).await?;
376 let result = match rpc_response {
377 super::JsonRpcResponse::Success { result, .. } => result,
378 super::JsonRpcResponse::Error { error, .. } => {
379 return Err(super::Error::JsonRpc {
380 url: url.to_string(),
381 code: error.code,
382 message: error.message,
383 data: error.data,
384 });
385 }
386 };
387 (result, Some(lines))
388 } else {
389 let rpc_response: super::JsonRpcResponse<
390 super::initialize_result::InitializeResult,
391 > = super::parse_streamable_http_response(url, response).await?;
392 let result = match rpc_response {
393 super::JsonRpcResponse::Success { result, .. } => result,
394 super::JsonRpcResponse::Error { error, .. } => {
395 return Err(super::Error::JsonRpc {
396 url: url.to_string(),
397 code: error.code,
398 message: error.message,
399 data: error.data,
400 });
401 }
402 };
403 (result, None)
404 };
405
406 // Whether we need a notification SSE channel at all.
407 let needs_sse = initialize_result
408 .capabilities
409 .tools
410 .as_ref()
411 .and_then(|t| t.list_changed)
412 .unwrap_or(false)
413 || initialize_result
414 .capabilities
415 .resources
416 .as_ref()
417 .and_then(|r| r.list_changed)
418 .unwrap_or(false);
419
420 // Send `notifications/initialized` BEFORE any other request.
421 // rmcp's per-session worker is in `expect_notification("initialized")`
422 // at this point — anything else (a `tools/list`, an opportunistic
423 // GET `/`) that lands during that window pushes a non-notification
424 // through the worker, makes `serve_server_with_ct_inner` return
425 // `Err(ExpectedInitializedNotification(...))`, drops the
426 // WorkerTransport, cancels the worker via its drop_guard, and
427 // tears the whole session down. Every later POST then 500s with
428 // "Session service terminated."
429 //
430 // We don't have a `Connection` yet — building one would spawn
431 // `refresh_tools` / `refresh_resources` background tasks that
432 // race with this notification, which is exactly the bug we're
433 // avoiding. We therefore POST inline here.
434 let init_notification_body = serde_json::json!({
435 "jsonrpc": "2.0",
436 "method": "notifications/initialized",
437 "params": {},
438 });
439 let mut notify_request = self
440 .http_client
441 .post(url)
442 .timeout(self.call_timeout)
443 .header("Content-Type", "application/json")
444 .header("Accept", "application/json, text/event-stream");
445 for (name, value) in headers {
446 notify_request = notify_request.header(name, value);
447 }
448 notify_request =
449 notify_request.header("Mcp-Session-Id", &resolved_session_id);
450 let notify_response = notify_request
451 .json(&init_notification_body)
452 .send()
453 .await
454 .map_err(|source| super::Error::Request {
455 url: url.to_string(),
456 source,
457 })?;
458 if !notify_response.status().is_success() {
459 let code = notify_response.status();
460 let body = notify_response.text().await.unwrap_or_default();
461 return Err(super::Error::BadStatus {
462 url: url.to_string(),
463 code,
464 body,
465 });
466 }
467
468 // Now safe to drop the init-side SSE stream; rmcp's session
469 // worker is past the init handshake and we have no further use
470 // for it (notifications come on the GET / stream below).
471 drop(initial_sse_lines.take());
472
473 // Now that the server's session worker is past the
474 // `expect_notification` gate, it's safe to open the proactive
475 // GET `/` SSE stream the listener will read
476 // `notifications/{tools,resources}/list_changed` from. Capability
477 // inspection only gates whether we open this stream; the
478 // `Connection` itself is naive about capabilities.
479 let initial_sse_lines: Option<super::LinesStream> = if needs_sse {
480 let mut get_request = self
481 .http_client
482 .get(url)
483 .timeout(self.connect_timeout)
484 .header("Accept", "text/event-stream");
485 for (name, value) in headers {
486 get_request = get_request.header(name, value);
487 }
488 get_request =
489 get_request.header("Mcp-Session-Id", &resolved_session_id);
490 let get_response = get_request.send().await.map_err(|source| {
491 super::Error::Connection {
492 url: url.to_string(),
493 source,
494 }
495 })?;
496 if !get_response.status().is_success() {
497 let code = get_response.status();
498 let body = get_response.text().await.unwrap_or_default();
499 return Err(super::Error::BadStatus {
500 url: url.to_string(),
501 code,
502 body,
503 });
504 }
505 Some(super::lines_from_response(get_response))
506 } else {
507 None
508 };
509
510 // Construct the Connection at the very end. This is the only
511 // place in `connect` where the listener task and the
512 // `refresh_tools` / `refresh_resources` background tasks get
513 // spawned — by now the upstream is fully past its init
514 // handshake, so any of those POSTs land safely.
515 let connection = super::Connection::new(
516 self.http_client.clone(),
517 url.to_string(),
518 resolved_session_id,
519 headers.clone(),
520 self.backoff_current_interval,
521 self.backoff_initial_interval,
522 self.backoff_randomization_factor,
523 self.backoff_multiplier,
524 self.backoff_max_interval,
525 self.backoff_max_elapsed_time,
526 self.call_timeout,
527 initialize_result,
528 initial_sse_lines,
529 )
530 .await;
531
532 Ok(connection)
533 }
534}