objectiveai_sdk/mcp/client.rs
1//! MCP client for creating connections to MCP servers.
2
3use std::time::Duration;
4
5use indexmap::IndexMap;
6
7/// Client for creating MCP connections.
8///
9/// Holds shared configuration (HTTP client, headers, backoff parameters)
10/// and creates [`Connection`](super::Connection) instances via
11/// [`connect`](Client::connect).
12#[derive(Debug, Clone)]
13pub struct Client {
14 /// HTTP client for making requests.
15 pub http_client: reqwest::Client,
16 /// User-Agent header value.
17 pub user_agent: String,
18 /// X-Title header value.
19 pub x_title: String,
20 /// Referer header value.
21 pub http_referer: String,
22 /// Timeout for the initial connection (initialize request).
23 pub connect_timeout: Duration,
24
25 /// Current backoff interval for retry logic.
26 pub backoff_current_interval: Duration,
27 /// Initial backoff interval for retry logic.
28 pub backoff_initial_interval: Duration,
29 /// Randomization factor for backoff jitter.
30 pub backoff_randomization_factor: f64,
31 /// Multiplier for exponential backoff growth.
32 pub backoff_multiplier: f64,
33 /// Maximum backoff interval.
34 pub backoff_max_interval: Duration,
35 /// Maximum total time to spend on retries.
36 pub backoff_max_elapsed_time: Duration,
37 /// Timeout for individual RPC calls after connection is established.
38 pub call_timeout: Duration,
39}
40
41impl Client {
42 /// Creates a new MCP client.
43 pub fn new(
44 http_client: reqwest::Client,
45 user_agent: String,
46 x_title: String,
47 http_referer: String,
48 connect_timeout: Duration,
49 backoff_current_interval: Duration,
50 backoff_initial_interval: Duration,
51 backoff_randomization_factor: f64,
52 backoff_multiplier: f64,
53 backoff_max_interval: Duration,
54 backoff_max_elapsed_time: Duration,
55 call_timeout: Duration,
56 ) -> Self {
57 Self {
58 http_client,
59 user_agent,
60 x_title,
61 http_referer,
62 connect_timeout,
63 backoff_current_interval,
64 backoff_initial_interval,
65 backoff_randomization_factor,
66 backoff_multiplier,
67 backoff_max_interval,
68 backoff_max_elapsed_time,
69 call_timeout,
70 }
71 }
72
73 /// Build the canonical header map that the client stamps on every
74 /// request opened under this client / connection. The supplied
75 /// caller map (if any) wins on conflict — defaults are inserted
76 /// only when the caller didn't already provide them. The merged
77 /// map is computed once at the top of `connect_once` and reused
78 /// across all three handshake requests + handed to the resulting
79 /// [`Connection`] for every later RPC.
80 fn headers(
81 &self,
82 supplied: Option<IndexMap<String, String>>,
83 ) -> IndexMap<String, String> {
84 let mut out = supplied.unwrap_or_default();
85 out.entry("User-Agent".to_string())
86 .or_insert_with(|| self.user_agent.clone());
87 out.entry("X-Title".to_string())
88 .or_insert_with(|| self.x_title.clone());
89 out.entry("Referer".to_string())
90 .or_insert_with(|| self.http_referer.clone());
91 out.entry("HTTP-Referer".to_string())
92 .or_insert_with(|| self.http_referer.clone());
93 out
94 }
95
96 /// Connects to an MCP server using the Streamable HTTP transport.
97 ///
98 /// Sends an `initialize` JSON-RPC request to the server and extracts
99 /// the `Mcp-Session-Id` from the response. Returns a [`Connection`]
100 /// that can be used to list/call tools and list/read resources.
101 ///
102 /// `headers` are forwarded on every request this connection makes
103 /// to the upstream — both the initial `initialize` POST and every
104 /// subsequent RPC. The client merges its own defaults
105 /// (`User-Agent`, `X-Title`, `Referer`, `HTTP-Referer`) into this
106 /// map, but caller-supplied values for any of those win on
107 /// conflict. `Authorization` (when needed) is just another entry
108 /// in `headers`. The `Mcp-Session-Id` header is reserved — pass it
109 /// via `session_id` instead so the explicit argument can never be
110 /// clobbered by the headers map.
111 ///
112 /// ## SSE handoff
113 ///
114 /// `Accept` is `text/event-stream, application/json` — stream first
115 /// — so the server is encouraged to keep the underlying connection
116 /// open. If the response comes back as SSE we read the initialize
117 /// event off the stream and hand the *still-open* line reader to the
118 /// returned [`Connection`]'s list-changed listener. The listener
119 /// starts reading from that pre-opened stream immediately, which
120 /// closes the race where a peer (e.g. an in-process rmcp upstream)
121 /// would broadcast `notifications/tools/list_changed` before our
122 /// listener had managed to open its own GET `/` SSE.
123 ///
124 /// If the response is unary JSON and the server advertises either
125 /// `tools.list_changed` or `resources.list_changed`, we proactively
126 /// open a GET `/` SSE stream *before returning* and hand it to the
127 /// listener for the same reason. If neither capability is set, no
128 /// listener is needed and we return without touching SSE.
129 pub async fn connect(
130 &self,
131 url: String,
132 session_id: Option<String>,
133 headers: Option<IndexMap<String, String>>,
134 ) -> Result<super::Connection, super::Error> {
135 // Merge the caller's headers with the client's defaults once,
136 // then reuse the same merged map across every retry of the
137 // handshake AND hand it to the resulting Connection for every
138 // later RPC. Caller-supplied headers always win over defaults.
139 let headers = self.headers(headers);
140
141 // One outer backoff retry around all three handshake steps —
142 // initialize POST, notifications/initialized POST, GET / SSE
143 // (when capabilities require it). On a failure of any step we
144 // restart from scratch: a partial handshake leaves server-side
145 // session state we can't reuse, so retrying just the failed
146 // step would reference a session the server already discarded.
147 // Every error is treated as transient — the loop only gives up
148 // when the backoff's `max_elapsed_time` is exceeded.
149 let mut backoff = backoff::ExponentialBackoff {
150 current_interval: self.backoff_current_interval,
151 initial_interval: self.backoff_initial_interval,
152 randomization_factor: self.backoff_randomization_factor,
153 multiplier: self.backoff_multiplier,
154 max_interval: self.backoff_max_interval,
155 start_time: std::time::Instant::now(),
156 max_elapsed_time: Some(self.backoff_max_elapsed_time),
157 clock: backoff::SystemClock::default(),
158 };
159
160 loop {
161 match self
162 .connect_once(&url, session_id.as_deref(), &headers)
163 .await
164 {
165 Ok(conn) => return Ok(conn),
166 Err(e) => {
167 use backoff::backoff::Backoff;
168 match backoff.next_backoff() {
169 Some(d) => tokio::time::sleep(d).await,
170 None => return Err(e),
171 }
172 }
173 }
174 }
175 }
176
177 /// Issue a stateless HTTP `DELETE /` to one upstream MCP server,
178 /// telling it to terminate the session identified by `session_id`.
179 ///
180 /// This is the low-level primitive — no backoff, no listener
181 /// teardown, no connection state. Caller is responsible for any
182 /// retry semantics. For the stateful tear-down path that also
183 /// cancels the connection's own active streams and absorbs
184 /// upstream `404 / 401 / 403` as success, use
185 /// [`Connection::delete`](super::Connection::delete) instead.
186 ///
187 /// `headers` is merged with the same defaults `connect` applies
188 /// (`User-Agent`, `X-Title`, `Referer`, `HTTP-Referer`). The
189 /// explicit `session_id` argument always wins over any
190 /// `Mcp-Session-Id` entry that happens to appear in `headers` — the
191 /// shape mirrors `connect`'s argument split for the same reason.
192 ///
193 /// Returns `Ok(())` on any 2xx status. Any non-2xx (including
194 /// `404 Not Found`) surfaces as [`Error::BadStatus`]. Network /
195 /// transport failures surface as [`Error::Request`].
196 pub async fn delete(
197 &self,
198 url: String,
199 session_id: String,
200 headers: Option<IndexMap<String, String>>,
201 ) -> Result<(), super::Error> {
202 let headers = self.headers(headers);
203 let mut request = self
204 .http_client
205 .delete(&url)
206 .timeout(self.call_timeout)
207 .header("Mcp-Session-Id", &session_id);
208 for (name, value) in &headers {
209 // Explicit `session_id` arg always wins.
210 if name.eq_ignore_ascii_case("Mcp-Session-Id") {
211 continue;
212 }
213 request = request.header(name, value);
214 }
215 let response = request.send().await.map_err(|source| {
216 super::Error::Request {
217 url: url.clone(),
218 source,
219 }
220 })?;
221 if !response.status().is_success() {
222 let code = response.status();
223 let body = response.text().await.unwrap_or_default();
224 return Err(super::Error::BadStatus {
225 url,
226 code,
227 body: body.chars().take(800).collect(),
228 });
229 }
230 Ok(())
231 }
232
233 /// One pass through the full Streamable-HTTP handshake. Caller
234 /// applies the outer backoff retry loop in [`Self::connect`].
235 /// `headers` is the already-merged map (defaults + caller overrides
236 /// from [`Self::headers`]), reused on every request without further
237 /// processing. `Mcp-Session-Id` is applied AFTER the headers loop
238 /// so it always wins over any same-named entry in `headers`.
239 async fn connect_once(
240 &self,
241 url: &str,
242 session_id: Option<&str>,
243 headers: &IndexMap<String, String>,
244 ) -> Result<super::Connection, super::Error> {
245 let init_request = serde_json::json!({
246 "jsonrpc": "2.0",
247 "id": 1,
248 "method": "initialize",
249 "params": {
250 "protocolVersion": "2025-06-18",
251 "capabilities": {},
252 "clientInfo": {
253 "name": "objectiveai",
254 "version": env!("CARGO_PKG_VERSION"),
255 }
256 }
257 });
258
259 let mut request = self
260 .http_client
261 .post(url)
262 .timeout(self.connect_timeout)
263 .header("Content-Type", "application/json")
264 .header("Accept", "text/event-stream, application/json")
265 .json(&init_request);
266
267 for (name, value) in headers {
268 request = request.header(name, value);
269 }
270 // Mcp-Session-Id is applied last so the explicit `session_id`
271 // argument always wins over any same-named entry in `headers`.
272 if let Some(sid) = session_id {
273 request = request.header("Mcp-Session-Id", sid);
274 }
275
276 let response = request.send().await.map_err(|source| {
277 super::Error::Connection {
278 url: url.to_string(),
279 source,
280 }
281 })?;
282
283 if !response.status().is_success() {
284 let code = response.status();
285 let body = response.text().await.unwrap_or_default();
286 return Err(super::Error::BadStatus {
287 url: url.to_string(),
288 code,
289 body,
290 });
291 }
292
293 // Extract session ID from response header.
294 //
295 // For *new* sessions the server must mint and return a session
296 // id. For *existing* sessions (caller passed `session_id` in)
297 // many servers — including rmcp's `StreamableHttpService` on
298 // its existing-session branch — don't echo the header back
299 // because nothing changed. When the caller already knew the
300 // session id, fall back to it instead of erroring; that's the
301 // value we'll be stamping on every subsequent request anyway.
302 let resolved_session_id = match response
303 .headers()
304 .get("Mcp-Session-Id")
305 .and_then(|v| v.to_str().ok())
306 .map(String::from)
307 {
308 Some(s) => s,
309 None => match session_id {
310 Some(provided) => provided.to_string(),
311 None => {
312 let body = response.text().await.unwrap_or_default();
313 return Err(super::Error::NoSessionId {
314 url: url.to_string(),
315 body: body.chars().take(800).collect(),
316 });
317 }
318 },
319 };
320
321 // Did the server return SSE or unary JSON? rmcp's
322 // `StreamableHttpService` always returns SSE; many other servers
323 // reply with bare JSON.
324 let is_sse = response
325 .headers()
326 .get(reqwest::header::CONTENT_TYPE)
327 .and_then(|v| v.to_str().ok())
328 .map(|v| v.starts_with("text/event-stream"))
329 .unwrap_or(false);
330
331 // Parse the initialize response. SSE path consumes one event
332 // from the stream and keeps the rest of the stream alive for
333 // the listener; unary path consumes the whole body.
334 let (initialize_result, mut initial_sse_lines) = if is_sse {
335 let mut lines = super::lines_from_response(response);
336 let rpc_response: super::JsonRpcResponse<
337 super::initialize_result::InitializeResult,
338 > = super::read_next_sse_event(url, &mut lines).await?;
339 let result = match rpc_response {
340 super::JsonRpcResponse::Success { result, .. } => result,
341 super::JsonRpcResponse::Error { error, .. } => {
342 return Err(super::Error::JsonRpc {
343 url: url.to_string(),
344 code: error.code,
345 message: error.message,
346 data: error.data,
347 });
348 }
349 };
350 (result, Some(lines))
351 } else {
352 let rpc_response: super::JsonRpcResponse<
353 super::initialize_result::InitializeResult,
354 > = super::parse_streamable_http_response(url, response).await?;
355 let result = match rpc_response {
356 super::JsonRpcResponse::Success { result, .. } => result,
357 super::JsonRpcResponse::Error { error, .. } => {
358 return Err(super::Error::JsonRpc {
359 url: url.to_string(),
360 code: error.code,
361 message: error.message,
362 data: error.data,
363 });
364 }
365 };
366 (result, None)
367 };
368
369 // Whether we need a notification SSE channel at all.
370 let needs_sse = initialize_result
371 .capabilities
372 .tools
373 .as_ref()
374 .and_then(|t| t.list_changed)
375 .unwrap_or(false)
376 || initialize_result
377 .capabilities
378 .resources
379 .as_ref()
380 .and_then(|r| r.list_changed)
381 .unwrap_or(false);
382
383 // Send `notifications/initialized` BEFORE any other request.
384 // rmcp's per-session worker is in `expect_notification("initialized")`
385 // at this point — anything else (a `tools/list`, an opportunistic
386 // GET `/`) that lands during that window pushes a non-notification
387 // through the worker, makes `serve_server_with_ct_inner` return
388 // `Err(ExpectedInitializedNotification(...))`, drops the
389 // WorkerTransport, cancels the worker via its drop_guard, and
390 // tears the whole session down. Every later POST then 500s with
391 // "Session service terminated."
392 //
393 // We don't have a `Connection` yet — building one would spawn
394 // `refresh_tools` / `refresh_resources` background tasks that
395 // race with this notification, which is exactly the bug we're
396 // avoiding. We therefore POST inline here.
397 let init_notification_body = serde_json::json!({
398 "jsonrpc": "2.0",
399 "method": "notifications/initialized",
400 "params": {},
401 });
402 let mut notify_request = self
403 .http_client
404 .post(url)
405 .timeout(self.call_timeout)
406 .header("Content-Type", "application/json")
407 .header("Accept", "application/json, text/event-stream");
408 for (name, value) in headers {
409 notify_request = notify_request.header(name, value);
410 }
411 notify_request =
412 notify_request.header("Mcp-Session-Id", &resolved_session_id);
413 let notify_response = notify_request
414 .json(&init_notification_body)
415 .send()
416 .await
417 .map_err(|source| super::Error::Request {
418 url: url.to_string(),
419 source,
420 })?;
421 if !notify_response.status().is_success() {
422 let code = notify_response.status();
423 let body = notify_response.text().await.unwrap_or_default();
424 return Err(super::Error::BadStatus {
425 url: url.to_string(),
426 code,
427 body,
428 });
429 }
430
431 // Now safe to drop the init-side SSE stream; rmcp's session
432 // worker is past the init handshake and we have no further use
433 // for it (notifications come on the GET / stream below).
434 drop(initial_sse_lines.take());
435
436 // Now that the server's session worker is past the
437 // `expect_notification` gate, it's safe to open the proactive
438 // GET `/` SSE stream the listener will read
439 // `notifications/{tools,resources}/list_changed` from. Capability
440 // inspection only gates whether we open this stream; the
441 // `Connection` itself is naive about capabilities.
442 let initial_sse_lines: Option<super::LinesStream> = if needs_sse {
443 let mut get_request = self
444 .http_client
445 .get(url)
446 .timeout(self.connect_timeout)
447 .header("Accept", "text/event-stream");
448 for (name, value) in headers {
449 get_request = get_request.header(name, value);
450 }
451 get_request =
452 get_request.header("Mcp-Session-Id", &resolved_session_id);
453 let get_response = get_request.send().await.map_err(|source| {
454 super::Error::Connection {
455 url: url.to_string(),
456 source,
457 }
458 })?;
459 if !get_response.status().is_success() {
460 let code = get_response.status();
461 let body = get_response.text().await.unwrap_or_default();
462 return Err(super::Error::BadStatus {
463 url: url.to_string(),
464 code,
465 body,
466 });
467 }
468 Some(super::lines_from_response(get_response))
469 } else {
470 None
471 };
472
473 // Construct the Connection at the very end. This is the only
474 // place in `connect` where the listener task and the
475 // `refresh_tools` / `refresh_resources` background tasks get
476 // spawned — by now the upstream is fully past its init
477 // handshake, so any of those POSTs land safely.
478 //
479 // `was_resumed` is true when the caller passed an existing
480 // `session_id` into `connect` — i.e. this is a re-attach to a
481 // pre-existing upstream session, not a fresh mint. The bit
482 // rides into `ConnectionInner` as `is_reconnect` and gates
483 // the drop-time orphan-DELETE that releases resumed sessions
484 // nobody ended up using.
485 let was_resumed = session_id.is_some();
486 let connection = super::Connection::new(
487 self.http_client.clone(),
488 url.to_string(),
489 resolved_session_id,
490 headers.clone(),
491 self.backoff_current_interval,
492 self.backoff_initial_interval,
493 self.backoff_randomization_factor,
494 self.backoff_multiplier,
495 self.backoff_max_interval,
496 self.backoff_max_elapsed_time,
497 self.call_timeout,
498 initialize_result,
499 initial_sse_lines,
500 was_resumed,
501 )
502 .await;
503
504 Ok(connection)
505 }
506}