objectiveai_sdk/mcp/connection.rs
//! MCP connection for communicating with an MCP server.
//!
//! [`Connection`] is a cheaply-clonable handle around an internal
//! [`ConnectionInner`]. The last drop of the inner `Arc` drops
//! [`ConnectionInner`]'s `_listener_cancel_guard` field (a
//! [`tokio_util::sync::DropGuard`]), which cancels the listener task's
//! [`tokio_util::sync::CancellationToken`]. The SSE listener then exits
//! immediately, abandoning any in-flight reconnect, backoff sleep, or read,
//! so no zombie 401 retries are sent against a now-dead proxy session.
//! [`Connection::delete`] drops the same guard early, so an explicit
//! teardown cancels the listener without waiting for the last `Arc` drop.
10
11use std::ops::Deref;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::{Arc, RwLock as StdRwLock, Weak};
14use std::time::Duration;
15
16use indexmap::IndexMap;
17use tokio::sync::RwLock;
18use tokio_util::sync::{CancellationToken, DropGuard};
19
/// Callback fired by [`Connection`] when the upstream MCP server emits
/// `notifications/tools/list_changed` or `notifications/resources/list_changed`.
///
/// **Timing:** runs after the corresponding cache's write lock is taken
/// but *before* the network paginate that replaces it. That ordering
/// matches the moment the staleness window opens — anyone blocked on the
/// read lock won't return until the new list lands. The callback should
/// not call back into `list_tools` / `list_resources`: doing so would
/// re-take the lock the listener already holds and deadlock.
///
/// Stored behind an `Arc` so the listener task can cheaply clone it out
/// of the lock and call it without holding the read guard.
pub type ListChangedCallback = Arc<dyn Fn() + Send + Sync + 'static>;

/// A registered-or-not callback slot. Wrapper so [`ConnectionInner`] can
/// keep `#[derive(Debug)]` (a raw `dyn Fn` isn't `Debug`).
///
/// Lock poisoning is deliberately tolerated. The protected value is a plain
/// `Option<Arc<_>>`, which is valid in every state. A panic while a guard
/// was held must not turn every later [`CallbackSlot::get`] (called from
/// the listener task) into a panic of its own.
struct CallbackSlot(StdRwLock<Option<ListChangedCallback>>);

impl CallbackSlot {
    /// An empty slot (no callback registered).
    fn new() -> Self {
        Self(StdRwLock::new(None))
    }

    /// Install `callback`, replacing any previously-registered one.
    fn set(&self, callback: ListChangedCallback) {
        // The write guard is a temporary, released at the end of this
        // statement. The displaced callback is dropped only afterwards, so
        // a destructor in its captured state can never run — or panic —
        // while the lock is held.
        let previous = self
            .0
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .replace(callback);
        drop(previous);
    }

    /// Cheap clone-out of the current callback (if any). The `Arc` clone
    /// lets callers release the read guard before invoking the callback.
    fn get(&self) -> Option<ListChangedCallback> {
        self.0
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }
}

impl std::fmt::Debug for CallbackSlot {
    /// Reports only whether a callback is registered (the closure itself
    /// is not `Debug`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let set = self
            .0
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .is_some();
        f.debug_struct("CallbackSlot").field("set", &set).finish()
    }
}
60
61/// An active connection to an MCP server using the Streamable HTTP transport.
62///
63/// Cheaply clonable (one `Arc` bump). When the last clone is dropped, the
64/// inner `Arc` ref count hits zero, the `_listener_cancel_guard` field is
65/// dropped, and the SSE listener task is
66/// cancelled — exiting any in-flight `lines.next_line()`, reconnect
67/// `send()`, or backoff `sleep` *immediately* without retrying against
68/// the now-dead proxy session.
69///
70/// Use the public methods (`list_tools`, `call_tool`, `list_resources`,
71/// `read_resource`, `call_tool_as_message`, `tool_key`) for the upstream
72/// MCP protocol surface. The inner state ([`ConnectionInner`]) is also
73/// reachable via `Deref` for read-only field access (e.g.
74/// `connection.url`, `connection.initialize_result.server_info.name`),
75/// but its methods are private — you must go through `Connection`.
76#[derive(Debug)]
77pub struct Connection {
78 inner: Arc<ConnectionInner>,
79}
80
81impl Clone for Connection {
82 fn clone(&self) -> Self {
83 Self {
84 inner: Arc::clone(&self.inner),
85 }
86 }
87}
88
89// No `Drop` for `Connection`: cancellation happens deterministically
90// when the last `Arc<ConnectionInner>` clone is dropped, which drops
91// the `_listener_cancel_guard` field and cancels the listener token.
92
93impl Deref for Connection {
94 type Target = ConnectionInner;
95 fn deref(&self) -> &ConnectionInner {
96 &self.inner
97 }
98}
99
100impl Connection {
101 /// Tear this connection down explicitly.
102 ///
103 /// 1. Cancels the long-lived list-changed listener task immediately
104 /// (drops the [`DropGuard`] in
105 /// [`ConnectionInner::_listener_cancel_guard`]), so by the time
106 /// the HTTP DELETE goes out the listener isn't still holding an
107 /// SSE read open against the upstream we're about to close.
108 /// 2. Issues `DELETE /` to the upstream with this connection's
109 /// `Mcp-Session-Id` and the same merged header set every other
110 /// RPC stamps. Reuses [`ConnectionInner::call_timeout`].
111 /// 3. Treats `404 / 401 / 403` as success — the upstream is
112 /// unreachable from these credentials anyway, which is the
113 /// desired terminal state. Other non-2xx surfaces as
114 /// [`super::Error::BadStatus`].
115 ///
116 /// Takes `&self`: the listener cancel is in-place, and dropping
117 /// the surrounding `Arc<ConnectionInner>` (which closes the rest
118 /// of the connection's owned state) is the caller's responsibility
119 /// — usually by dropping the `Arc<Session>` holding it. Stateless
120 /// callers that don't hold a `Connection` should use
121 /// [`Client::delete`](super::Client::delete) instead.
122 ///
123 /// **In-flight RPC ordering.** This method does not block on
124 /// in-flight `call_tool` / `read_resource` / `list_tools` /
125 /// `list_resources` calls on the same connection. If one is
126 /// outstanding when `delete` lands, the upstream may see DELETE
127 /// before the RPC's reply makes it back; the in-flight call then
128 /// surfaces as a closed-connection error to whoever started it.
129 /// That's the spec-correct order (client said terminate) — drain
130 /// on the caller side first if you need different semantics.
131 pub async fn delete(&self) -> Result<(), super::Error> {
132 // 1. Drop the listener-cancel guard. Releasing the `DropGuard`
133 // cancels the sibling `CancellationToken` the listener task
134 // holds; the listener `tokio::select!`s against it on every
135 // blocking await and exits inside one scheduler tick.
136 if let Ok(mut guard) = self.inner._listener_cancel_guard.lock() {
137 let _ = guard.take();
138 }
139
140 // 2. Build + send HTTP DELETE. Mirrors `Client::connect_once`'s
141 // request-stamp shape: header loop first, explicit
142 // `Mcp-Session-Id` always wins.
143 let request = self
144 .inner
145 .http_client
146 .delete(&self.inner.url)
147 .timeout(self.inner.call_timeout)
148 .headers(self.inner.build_request_headers(None, None).await);
149 let response = request.send().await.map_err(|source| {
150 super::Error::Request {
151 url: self.inner.url.clone(),
152 source,
153 }
154 })?;
155
156 // 3. 404 / 401 / 403 → success; other non-2xx → real error.
157 let status = response.status();
158 if matches!(
159 status,
160 reqwest::StatusCode::NOT_FOUND
161 | reqwest::StatusCode::UNAUTHORIZED
162 | reqwest::StatusCode::FORBIDDEN
163 ) {
164 return Ok(());
165 }
166 if !status.is_success() {
167 let body = response.text().await.unwrap_or_default();
168 return Err(super::Error::BadStatus {
169 url: self.inner.url.clone(),
170 code: status,
171 body: body.chars().take(800).collect(),
172 });
173 }
174 Ok(())
175 }
176
177 pub(super) async fn new(
178 http_client: reqwest::Client,
179 url: String,
180 session_id: String,
181 headers: IndexMap<String, String>,
182 backoff_current_interval: Duration,
183 backoff_initial_interval: Duration,
184 backoff_randomization_factor: f64,
185 backoff_multiplier: f64,
186 backoff_max_interval: Duration,
187 backoff_max_elapsed_time: Duration,
188 call_timeout: Duration,
189 initialize_result: super::initialize_result::InitializeResult,
190 initial_sse_lines: Option<super::LinesStream>,
191 ) -> Self {
192 let inner = ConnectionInner::new(
193 http_client,
194 url,
195 session_id,
196 headers,
197 backoff_current_interval,
198 backoff_initial_interval,
199 backoff_randomization_factor,
200 backoff_multiplier,
201 backoff_max_interval,
202 backoff_max_elapsed_time,
203 call_timeout,
204 initialize_result,
205 initial_sse_lines,
206 )
207 .await;
208 Self { inner }
209 }
210
211 #[cfg(test)]
212 pub(crate) fn new_for_test(name: String, url: String) -> Self {
213 Self {
214 inner: ConnectionInner::new_for_test(name, url),
215 }
216 }
217
218 #[cfg(test)]
219 pub(crate) fn new_for_test_with_caps(
220 name: String,
221 url: String,
222 capabilities: super::initialize_result::ServerCapabilities,
223 ) -> Self {
224 Self {
225 inner: ConnectionInner::new_for_test_with_caps(
226 name,
227 url,
228 capabilities,
229 ),
230 }
231 }
232
233 /// Returns a key identifying this connection for tool namespacing.
234 pub fn tool_key(&self) -> String {
235 self.inner.tool_key()
236 }
237
238 /// Returns the session ID for this connection.
239 pub fn session_id(&self) -> &str {
240 self.inner.session_id()
241 }
242
243 /// Returns all tools from the upstream server.
244 pub async fn list_tools(
245 &self,
246 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
247 self.inner.list_tools().await
248 }
249
250 /// Calls a tool on the upstream server.
251 pub async fn call_tool(
252 &self,
253 params: &super::tool::CallToolRequestParams,
254 ) -> Result<super::tool::CallToolResult, super::Error> {
255 self.inner.call_tool(params).await
256 }
257
258 /// Calls a tool and converts the result into a [`ToolMessage`].
259 pub async fn call_tool_as_message(
260 &self,
261 params: &super::tool::CallToolRequestParams,
262 tool_call_id: String,
263 ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
264 {
265 self.inner.call_tool_as_message(params, tool_call_id).await
266 }
267
268 /// Returns all resources from the upstream server.
269 pub async fn list_resources(
270 &self,
271 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
272 self.inner.list_resources().await
273 }
274
275 /// Reads a resource from the upstream server.
276 pub async fn read_resource(
277 &self,
278 uri: &str,
279 ) -> Result<super::resource::ReadResourceResult, super::Error> {
280 self.inner.read_resource(uri).await
281 }
282
283 /// Register a callback to fire whenever the upstream emits
284 /// `notifications/tools/list_changed`.
285 ///
286 /// **Timing:** the callback runs *after* the tool cache's write lock
287 /// is acquired but *before* the network paginate that replaces it.
288 /// That means readers blocked on the read lock won't return until the
289 /// new list is in place, and the callback observes the moment the
290 /// staleness window opens. The proxy uses this to emit its own
291 /// `notifications/tools/list_changed` to downstream clients at the
292 /// right instant.
293 ///
294 /// Replaces any previously-registered tools-list-changed callback.
295 /// All clones of this `Connection` share the same callback slot.
296 pub fn set_on_tools_list_changed<F>(&self, callback: F)
297 where
298 F: Fn() + Send + Sync + 'static,
299 {
300 self.inner.on_tools_list_changed.set(Arc::new(callback));
301 }
302
303 /// Register a callback to fire whenever the upstream emits
304 /// `notifications/resources/list_changed`. Same timing contract as
305 /// [`Connection::set_on_tools_list_changed`].
306 ///
307 /// Replaces any previously-registered resources-list-changed callback.
308 /// All clones of this `Connection` share the same callback slot.
309 pub fn set_on_resources_list_changed<F>(&self, callback: F)
310 where
311 F: Fn() + Send + Sync + 'static,
312 {
313 self.inner.on_resources_list_changed.set(Arc::new(callback));
314 }
315
316 /// Atomically replace the connection's [`ConnectionInner::extra_headers`]
317 /// bag. Every subsequent outbound HTTP request from this connection
318 /// stamps the new map AFTER `headers`, with `HeaderMap::insert`
319 /// REPLACE semantics — keys in `extras` override the same key in
320 /// the per-URL `headers` bag set at `Client::connect`. Caller
321 /// supplies the FULL replacement map; missing keys are dropped
322 /// (no merge).
323 ///
324 /// Used by the proxy to inject session-global headers
325 /// (`X-OBJECTIVEAI-RESPONSE-ID`, `X-OBJECTIVEAI-RESPONSE-IDS`)
326 /// that re-set on every inbound `initialize`, without re-dialing
327 /// the upstream.
328 pub async fn set_extra_headers(
329 &self,
330 extras: IndexMap<String, String>,
331 ) {
332 *self.inner.extra_headers.write().await = extras;
333 }
334}
335
336/// The actual connection state. Behind an `Arc` inside [`Connection`].
337///
338/// Fields are public for read-only access (callers reach them via
339/// `Connection`'s `Deref`), but every method on this type is private —
340/// the public surface lives on [`Connection`] and delegates through.
341#[derive(Debug)]
342pub struct ConnectionInner {
343 pub http_client: reqwest::Client,
344 pub url: String,
345 pub session_id: String,
346 /// All HTTP headers stamped on every POST / GET this connection
347 /// makes — the same merged map (defaults + caller overrides) the
348 /// `Client` built once during connect. `Mcp-Session-Id`,
349 /// `Content-Type`, and `Accept` are still set by the request
350 /// builders and override anything in `headers`.
351 pub headers: IndexMap<String, String>,
352 /// Mutable per-request override layer stamped AFTER `headers` on
353 /// every outbound HTTP request. The request-builder uses
354 /// `reqwest::header::HeaderMap::insert` semantics so any key
355 /// present in `extra_headers` REPLACES the same key in `headers`.
356 /// Used by the proxy to inject session-global headers
357 /// (`X-OBJECTIVEAI-RESPONSE-ID` etc.) that override per-URL
358 /// values without re-dialing. Empty by default; set via
359 /// [`Connection::set_extra_headers`].
360 pub extra_headers: RwLock<IndexMap<String, String>>,
361
362 pub backoff_current_interval: Duration,
363 pub backoff_initial_interval: Duration,
364 pub backoff_randomization_factor: f64,
365 pub backoff_multiplier: f64,
366 pub backoff_max_interval: Duration,
367 pub backoff_max_elapsed_time: Duration,
368 pub call_timeout: Duration,
369
370 /// The server's capabilities and info from the initialize response.
371 pub initialize_result: super::initialize_result::InitializeResult,
372
373 /// Auto-incrementing request ID (starts at 2; 1 was used for initialize).
374 next_id: AtomicU64,
375
376 /// All tools from the server, populated by background pagination.
377 ///
378 /// `None` = cache cleared — either pre-populate (between
379 /// [`Self::new`] and the first `refresh_tools_signaling`) or
380 /// post-drop (the listener empties this the moment its SSE
381 /// stream ends so `list_tools` will re-paginate against the
382 /// upstream rather than return stale state). `Some(_)` = last
383 /// known result, `Ok` or `Err`.
384 tools:
385 RwLock<Option<Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>>>>,
386 /// All resources from the server, populated by background pagination.
387 /// Same `None`/`Some` semantics as [`Self::tools`].
388 resources: RwLock<
389 Option<Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>>>,
390 >,
391
392 /// Cancellation token for the long-lived `listen_for_list_changes`
393 /// task. The listener selects this against every blocking await
394 /// (read, reconnect-send, backoff-sleep) and returns the instant it
395 /// fires.
396 ///
397 /// Held inside the connection as a [`DropGuard`] so that the moment
398 /// the last `Arc<ConnectionInner>` clone is dropped — i.e. the
399 /// moment no external `Connection` handle remains — `Drop` runs on
400 /// the guard, the token cancels, and the listener task tears down.
401 /// The listener itself holds a sibling `CancellationToken` (clone),
402 /// not the guard, so its task does not extend the connection's
403 /// lifetime.
404 ///
405 /// Wrapped in `Mutex<Option<_>>` so explicit teardown paths
406 /// ([`Connection::delete`]) can drop the guard in place — firing
407 /// the cancel token *before* the surrounding `Arc<ConnectionInner>`
408 /// goes away. Regular drop still works: `Mutex<Option<DropGuard>>`
409 /// drops its inner `DropGuard` automatically when the mutex itself
410 /// drops, so the listener is still cancelled on the last `Arc` drop.
411 _listener_cancel_guard: std::sync::Mutex<Option<DropGuard>>,
412
413 /// Optional callback fired *after* the listener has refreshed the
414 /// tool cache in response to an upstream `notifications/tools/list_changed`.
415 /// Set via [`Connection::set_on_tools_list_changed`].
416 on_tools_list_changed: CallbackSlot,
417
418 /// Optional callback fired *after* the listener has refreshed the
419 /// resource cache in response to an upstream
420 /// `notifications/resources/list_changed`.
421 /// Set via [`Connection::set_on_resources_list_changed`].
422 on_resources_list_changed: CallbackSlot,
423}
424
425impl ConnectionInner {
426 /// Creates a minimal connection for unit testing. Declares both
427 /// `tools` and `resources` capabilities with `list_changed:
428 /// Some(true)` so callers exercise the present-cap +
429 /// list_changed-enabled paths in `list_*`, `refresh_*`, and
430 /// `subscribe_*`. For other capability shapes use
431 /// `new_for_test_with_caps`.
432 #[cfg(test)]
433 fn new_for_test(name: String, url: String) -> Arc<Self> {
434 Self::new_for_test_with_caps(
435 name,
436 url,
437 super::initialize_result::ServerCapabilities {
438 experimental: None,
439 logging: None,
440 completions: None,
441 prompts: None,
442 resources: Some(
443 super::initialize_result::ResourcesCapability {
444 subscribe: None,
445 list_changed: Some(true),
446 },
447 ),
448 tools: Some(super::initialize_result::ToolsCapability {
449 list_changed: Some(true),
450 }),
451 tasks: None,
452 },
453 )
454 }
455
456 /// Creates a minimal connection for unit testing with an explicit
457 /// `ServerCapabilities`. Used by the capability-gating tests to
458 /// drive each gate's absent-cap branch.
459 #[cfg(test)]
460 fn new_for_test_with_caps(
461 name: String,
462 url: String,
463 capabilities: super::initialize_result::ServerCapabilities,
464 ) -> Arc<Self> {
465 Arc::new(Self {
466 http_client: reqwest::Client::new(),
467 url,
468 session_id: String::new(),
469 headers: IndexMap::new(),
470 extra_headers: RwLock::new(IndexMap::new()),
471 backoff_current_interval: Duration::from_millis(500),
472 backoff_initial_interval: Duration::from_millis(500),
473 backoff_randomization_factor: 0.5,
474 backoff_multiplier: 1.5,
475 backoff_max_interval: Duration::from_secs(60),
476 backoff_max_elapsed_time: Duration::from_secs(900),
477 call_timeout: Duration::from_secs(30),
478 initialize_result: super::initialize_result::InitializeResult {
479 protocol_version: "2025-03-26".into(),
480 capabilities,
481 server_info: super::initialize_result::Implementation {
482 name,
483 title: None,
484 version: "0.0.0".into(),
485 website_url: None,
486 description: None,
487 icons: None,
488 },
489 instructions: None,
490 _meta: None,
491 },
492 next_id: AtomicU64::new(2),
493 // Test connection has no listener and never refreshes; seed
494 // with an empty Ok so `list_tools` doesn't try to paginate.
495 tools: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
496 resources: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
497 _listener_cancel_guard: std::sync::Mutex::new(None),
498 on_tools_list_changed: CallbackSlot::new(),
499 on_resources_list_changed: CallbackSlot::new(),
500 })
501 }
502
503 /// Creates a new connection and spawns background tasks to paginate
504 /// all tools and resources. Called internally by
505 /// [`Client::connect`](super::Client::connect) (via [`Connection::new`]).
506 ///
507 /// `initial_sse_lines`, if `Some`, is a pre-opened SSE line reader
508 /// that the list-changed listener will read from immediately on its
509 /// first iteration, instead of opening its own GET `/`. The caller
510 /// is responsible for arranging for one of these to exist whenever
511 /// the upstream advertises `tools.list_changed` or
512 /// `resources.list_changed` — see
513 /// [`Client::connect`](super::Client::connect).
514 async fn new(
515 http_client: reqwest::Client,
516 url: String,
517 session_id: String,
518 headers: IndexMap<String, String>,
519 backoff_current_interval: Duration,
520 backoff_initial_interval: Duration,
521 backoff_randomization_factor: f64,
522 backoff_multiplier: f64,
523 backoff_max_interval: Duration,
524 backoff_max_elapsed_time: Duration,
525 call_timeout: Duration,
526 initialize_result: super::initialize_result::InitializeResult,
527 initial_sse_lines: Option<super::LinesStream>,
528 ) -> Arc<Self> {
529 // Cancel-the-listener machinery: store the DropGuard inside the
530 // inner so the cancellation fires deterministically when the
531 // last external `Arc<ConnectionInner>` clone drops. Hand the
532 // listener task a sibling clone (no guard) — that way the
533 // listener task's lifetime does not extend the connection.
534 let listener_cancel = CancellationToken::new();
535 let listener_cancel_for_task = listener_cancel.clone();
536 let conn = Arc::new(Self {
537 http_client,
538 url,
539 session_id,
540 headers,
541 extra_headers: RwLock::new(IndexMap::new()),
542 backoff_current_interval,
543 backoff_initial_interval,
544 backoff_randomization_factor,
545 backoff_multiplier,
546 backoff_max_interval,
547 backoff_max_elapsed_time,
548 call_timeout,
549 initialize_result,
550 next_id: AtomicU64::new(2),
551 // Start empty; `refresh_tools_signaling` below installs
552 // `Some(_)` before `new` returns (the lock-handoff oneshot
553 // gates the return on the writer holding the lock).
554 tools: RwLock::new(None),
555 resources: RwLock::new(None),
556 _listener_cancel_guard: std::sync::Mutex::new(Some(
557 listener_cancel.drop_guard(),
558 )),
559 on_tools_list_changed: CallbackSlot::new(),
560 on_resources_list_changed: CallbackSlot::new(),
561 });
562
563 // Spawn background tool lister if the server supports tools.
564 //
565 // We don't return until the spawned task has acquired the write
566 // lock. Otherwise a caller that immediately reads `list_tools()`
567 // could race the writer — `tokio::spawn` only queues the task,
568 // and a fast reader can acquire the read lock before the writer
569 // has run its first instruction. The reader would then see the
570 // initial empty `Vec` and return that, even though a full
571 // populate is in flight.
572 //
573 // The `RwLockWriteGuard` itself isn't `Send`-friendly enough to
574 // pass back, so we use a oneshot to signal "I'm holding the
575 // lock now"; once we receive that, the cache is exclusively
576 // owned by the writer and any subsequent `read().await` from
577 // the caller is guaranteed to wait for the populate to finish.
578 if conn.initialize_result.capabilities.tools.is_some() {
579 let conn = Arc::clone(&conn);
580 let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
581 tokio::spawn(async move {
582 conn.refresh_tools_signaling(lock_held_tx, None).await;
583 });
584 // Wait for the writer to hold the lock before returning.
585 let _ = lock_held_rx.await;
586 }
587
588 // Spawn background resource lister if the server supports
589 // resources. Same lock-handoff contract as tools above.
590 if conn.initialize_result.capabilities.resources.is_some() {
591 let conn = Arc::clone(&conn);
592 let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
593 tokio::spawn(async move {
594 conn.refresh_resources_signaling(lock_held_tx, None).await;
595 });
596 let _ = lock_held_rx.await;
597 }
598
599 // Spawn the list-changed listener iff the caller handed us a
600 // pre-opened SSE stream. The connection is naive about
601 // `tools.list_changed` / `resources.list_changed` capabilities —
602 // [`Client::connect`](super::Client::connect) translates them
603 // into "did or didn't open a stream for us." If we get a stream,
604 // we listen on it; if we don't, there's nothing to listen for.
605 if let Some(initial_lines) = initial_sse_lines {
606 // Hand the listener a `Weak` so the spawned task itself does
607 // not keep the connection alive. `listener_cancel_for_task`
608 // is a sibling clone of the connection's own
609 // `_listener_cancel_guard` token — when the last external
610 // `Arc<ConnectionInner>` clone is dropped, the inner's Drop
611 // releases the guard and the listener wakes from any
612 // pending await (read, send, sleep) and exits immediately.
613 let weak = Arc::downgrade(&conn);
614 tokio::spawn(async move {
615 Self::listen_for_list_changes(
616 weak,
617 listener_cancel_for_task,
618 initial_lines,
619 )
620 .await;
621 });
622 }
623
624 conn
625 }
626
627 /// Server declared a `tools` capability in its `InitializeResult`.
628 /// Gates `list_tools`, `call_tool`, `refresh_tools`. When `false` the
629 /// upstream cannot service `tools/*` RPCs at all and any attempt would
630 /// either hang in idempotent backoff (`tools/list`) or fail with
631 /// `SessionExpired` (`tools/call`).
632 fn has_tools_cap(&self) -> bool {
633 self.initialize_result.capabilities.tools.is_some()
634 }
635
636 /// Server declared a `resources` capability in its `InitializeResult`.
637 /// Gates `list_resources`, `read_resource`, `refresh_resources`, and
638 /// the post-`call_tool` ResourceLink resolution. Same hang-or-fail
639 /// shape as `has_tools_cap` when absent.
640 fn has_resources_cap(&self) -> bool {
641 self.initialize_result.capabilities.resources.is_some()
642 }
643
644 /// Creates an exponential backoff configuration from the connection's fields.
645 fn backoff(&self) -> backoff::ExponentialBackoff {
646 backoff::ExponentialBackoff {
647 current_interval: self.backoff_current_interval,
648 initial_interval: self.backoff_initial_interval,
649 randomization_factor: self.backoff_randomization_factor,
650 multiplier: self.backoff_multiplier,
651 max_interval: self.backoff_max_interval,
652 start_time: std::time::Instant::now(),
653 max_elapsed_time: Some(self.backoff_max_elapsed_time),
654 clock: backoff::SystemClock::default(),
655 }
656 }
657
658 /// Builds a POST request with all required headers and the call timeout.
659 async fn post(&self) -> reqwest::RequestBuilder {
660 self.http_client
661 .post(&self.url)
662 .timeout(self.call_timeout)
663 .headers(
664 self.build_request_headers(
665 Some("application/json"),
666 Some("application/json, text/event-stream"),
667 )
668 .await,
669 )
670 }
671
672 /// Build the `HeaderMap` stamped on every outbound request. Order
673 /// of insertion drives override semantics — `HeaderMap::insert`
674 /// REPLACES existing values for the same key:
675 ///
676 /// 1. Content-Type / Accept (when supplied by the caller).
677 /// 2. `self.headers` (the per-URL bag set at `Client::connect`).
678 /// 3. `self.extra_headers` (the mutable, session-global overrides
679 /// — proxies use this for `X-OBJECTIVEAI-RESPONSE-ID` etc).
680 /// 4. `Mcp-Session-Id` (the connection's own session id, always
681 /// last so it can never be shadowed).
682 async fn build_request_headers(
683 &self,
684 content_type: Option<&str>,
685 accept: Option<&str>,
686 ) -> reqwest::header::HeaderMap {
687 use reqwest::header::{
688 ACCEPT, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue,
689 };
690 let mut hmap = HeaderMap::new();
691 if let Some(ct) = content_type {
692 if let Ok(hv) = HeaderValue::from_str(ct) {
693 hmap.insert(CONTENT_TYPE, hv);
694 }
695 }
696 if let Some(a) = accept {
697 if let Ok(hv) = HeaderValue::from_str(a) {
698 hmap.insert(ACCEPT, hv);
699 }
700 }
701 for (k, v) in &self.headers {
702 if let (Ok(hn), Ok(hv)) = (
703 HeaderName::try_from(k.as_str()),
704 HeaderValue::from_str(v),
705 ) {
706 hmap.insert(hn, hv);
707 }
708 }
709 let extras = self.extra_headers.read().await;
710 for (k, v) in extras.iter() {
711 if let (Ok(hn), Ok(hv)) = (
712 HeaderName::try_from(k.as_str()),
713 HeaderValue::from_str(v),
714 ) {
715 hmap.insert(hn, hv);
716 }
717 }
718 drop(extras);
719 if let Ok(hv) = HeaderValue::from_str(&self.session_id) {
720 hmap.insert(HeaderName::from_static("mcp-session-id"), hv);
721 }
722 hmap
723 }
724
725 /// Sends a JSON-RPC request, retrying transient errors when
726 /// `idempotent` is `true`.
727 ///
728 /// Idempotent methods (`tools/list`, `resources/list`,
729 /// `resources/read`, etc.) retry every transient error — network,
730 /// HTTP status, malformed body, JSON-RPC error, session expiration —
731 /// until the backoff's `max_elapsed_time` is exceeded.
732 ///
733 /// Non-idempotent methods (`tools/call`) make exactly one attempt.
734 /// Retrying a `tools/call` is unsafe: a tool may have mutated remote
735 /// state during the first attempt before the response was lost, and
736 /// re-firing the call would mutate state again. Each retry of
737 /// `AppendTask` advances `state.tasks.len()` an extra step, so the
738 /// agent sees a different return value than expected and the
739 /// pid-derived mock seed at the next step diverges. See
740 /// `objectiveai-api/src/agent/completions/client.rs` (sequential
741 /// dispatch) and `mock/client.rs::mock.seed_derive` for the
742 /// downstream consequence.
743 async fn rpc<P: serde::Serialize, R: serde::de::DeserializeOwned>(
744 &self,
745 method: &str,
746 params: &P,
747 idempotent: bool,
748 ) -> Result<R, super::Error> {
749 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
750 let body = serde_json::json!({
751 "jsonrpc": "2.0",
752 "id": id,
753 "method": method,
754 "params": params,
755 });
756
757 let attempt_one = || async {
758 let url = self.url.clone();
759 let response =
760 self.post().await.json(&body).send().await.map_err(|source| {
761 backoff::Error::transient(super::Error::Request {
762 url: url.clone(),
763 source,
764 })
765 })?;
766
767 if response.status() == reqwest::StatusCode::NOT_FOUND {
768 return Err(backoff::Error::transient(
769 super::Error::SessionExpired { url: url.clone() },
770 ));
771 }
772 if !response.status().is_success() {
773 let code = response.status();
774 let body = response.text().await.unwrap_or_default();
775 return Err(backoff::Error::transient(
776 super::Error::BadStatus {
777 url: url.clone(),
778 code,
779 body,
780 },
781 ));
782 }
783
784 let rpc_response: super::JsonRpcResponse<R> =
785 super::parse_streamable_http_response(&url, response)
786 .await
787 .map_err(backoff::Error::transient)?;
788
789 match rpc_response {
790 super::JsonRpcResponse::Success { result, .. } => Ok(result),
791 super::JsonRpcResponse::Error { error, .. } => {
792 Err(backoff::Error::transient(super::Error::JsonRpc {
793 url: url.clone(),
794 code: error.code,
795 message: error.message,
796 data: error.data,
797 }))
798 }
799 }
800 };
801
802 if idempotent {
803 backoff::future::retry(self.backoff(), attempt_one).await
804 } else {
805 attempt_one().await.map_err(|e| match e {
806 backoff::Error::Permanent(err)
807 | backoff::Error::Transient { err, .. } => err,
808 })
809 }
810 }
811
812 /// Returns a key identifying this connection for tool namespacing.
813 fn tool_key(&self) -> String {
814 format!("{}-{}", self.initialize_result.server_info.name, self.url)
815 }
816
817 /// Returns the session ID for this connection.
818 fn session_id(&self) -> &str {
819 &self.session_id
820 }
821
822 /// Sends a `tools/list` RPC call for a single page.
823 async fn rpc_list_tools(
824 &self,
825 cursor: Option<&str>,
826 ) -> Result<super::tool::ListToolsResult, super::Error> {
827 self.rpc(
828 "tools/list",
829 &super::tool::ListToolsRequest {
830 cursor: cursor.map(String::from),
831 },
832 true,
833 )
834 .await
835 }
836
837 /// Returns all tools from the server.
838 ///
839 /// Blocks until background pagination completes, then returns a
840 /// cheap `Arc` clone of the result. If the cache is currently
841 /// empty (e.g. because the listener detected its SSE stream drop
842 /// and cleared it) this paginates inline against the upstream —
843 /// the caller gets fresh data on the happy path, or the live
844 /// upstream error if the connection is genuinely down, rather
845 /// than stale pre-drop tools.
846 async fn list_tools(
847 &self,
848 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
849 if !self.has_tools_cap() {
850 return Ok(Arc::new(Vec::new()));
851 }
852 if let Some(cached) = self.tools.read().await.as_ref() {
853 return cached.clone();
854 }
855 // Cache cleared; refresh inline. Concurrent callers may each
856 // refresh — wasteful, but the proxy fans out across distinct
857 // upstreams so a single Connection rarely sees concurrent
858 // `list_tools` calls.
859 self.refresh_tools(None).await;
860 self.tools
861 .read()
862 .await
863 .as_ref()
864 .expect("refresh_tools installs Some")
865 .clone()
866 }
867
868 /// Calls a tool on the MCP server. The returned
869 /// `CallToolResult.content` is **fully resolved**: every
870 /// `ContentBlock::ResourceLink { uri }` whose URI appears in
871 /// `list_resources` has already been replaced by one or more
872 /// `ContentBlock::EmbeddedResource` blocks carrying the fetched
873 /// contents (via `read_resource`). Unknown-URI links pass through
874 /// untouched — the upstream server may have its own out-of-band
875 /// resolution path the caller should preserve.
876 ///
877 /// Resolving inside `call_tool` (rather than downstream in
878 /// `call_tool_as_message`) means every consumer of the result
879 /// sees `EmbeddedResource` shapes uniformly; the stateless
880 /// `From<ContentBlock>` impl is then enough to convert the whole
881 /// result to `RichContent` with no further connection work.
882 async fn call_tool(
883 &self,
884 params: &super::tool::CallToolRequestParams,
885 ) -> Result<super::tool::CallToolResult, super::Error> {
886 if !self.has_tools_cap() {
887 return Err(super::Error::UnsupportedCapability {
888 capability: "tools",
889 });
890 }
891 let mut result: super::tool::CallToolResult =
892 self.rpc("tools/call", params, false).await?;
893
894 // Build the known-resource URI set for ResourceLink
895 // resolution. `list_resources` failure → empty set (same
896 // safe fallback the resolution path used previously).
897 let known_uris: std::collections::HashSet<String> =
898 match self.list_resources().await {
899 Ok(rs) => rs.iter().map(|r| r.uri.clone()).collect(),
900 Err(_) => std::collections::HashSet::new(),
901 };
902
903 // Walk the blocks, replacing each resolvable ResourceLink
904 // with one EmbeddedResource per returned ResourceContentsUnion.
905 // Everything else (Text, Image, Audio, EmbeddedResource,
906 // unknown-URI ResourceLinks) passes through.
907 let mut resolved: Vec<super::tool::ContentBlock> =
908 Vec::with_capacity(result.content.len());
909 for block in std::mem::take(&mut result.content) {
910 match block {
911 super::tool::ContentBlock::ResourceLink(link)
912 if known_uris.contains(&link.uri) =>
913 {
914 let read = self.read_resource(&link.uri).await?;
915 for contents in read.contents {
916 resolved.push(
917 super::tool::ContentBlock::EmbeddedResource(
918 super::tool::EmbeddedResource {
919 resource: contents,
920 // ResourceLink's annotations
921 // don't have a perfect home on
922 // EmbeddedResource — both fields
923 // exist but they describe different
924 // shapes (the link vs the inlined
925 // contents). Drop them on the way
926 // in; the EmbeddedResource is now
927 // a fresh authoritative block.
928 annotations: None,
929 _meta: None,
930 },
931 ),
932 );
933 }
934 }
935 other => resolved.push(other),
936 }
937 }
938 result.content = resolved;
939 Ok(result)
940 }
941
942 /// Calls a tool and converts the (already-resolved) result into a
943 /// [`ToolMessage`]. Resource resolution happens inside
944 /// [`Self::call_tool`] — by the time we get the blocks here every
945 /// resolvable `ResourceLink` has already been replaced with an
946 /// `EmbeddedResource`, so the conversion is a pure stateless
947 /// element-wise map through [`From<ContentBlock> for
948 /// RichContentPart`](crate::agent::completions::message::RichContentPart).
949 ///
950 /// Content-block mapping (handled by the `From` impl):
951 /// - `text` → text part
952 /// - `image` → image_url part (data URL)
953 /// - `audio` → input_audio part
954 /// - `embedded_resource` (text) → text part
955 /// - `embedded_resource` (blob, image mime) → image_url part
956 /// - `embedded_resource` (blob, audio mime) → input_audio part
957 /// - `embedded_resource` (blob, video mime) → input_video part
958 /// - `embedded_resource` (blob, other mime) → file part
959 /// - `resource_link` (unknown URI) → JSON-text fallback
960 /// (resolvable URIs were already resolved upstream)
961 async fn call_tool_as_message(
962 &self,
963 params: &super::tool::CallToolRequestParams,
964 tool_call_id: String,
965 ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
966 {
967 use crate::agent::completions::message::{
968 RichContentPart, ToolMessage, ToolResponseMetadata,
969 };
970
971 let result = self.call_tool(params).await?;
972
973 let parts: Vec<RichContentPart> =
974 result.content.into_iter().map(Into::into).collect();
975
976 // Lossy-decode the MCP `_meta` extension bag into our typed
977 // `ToolResponseMetadata`. Unknown keys (set by non-objectiveai
978 // upstreams) are silently dropped. Decoding failure leaves
979 // metadata as `None`.
980 let metadata = result._meta.as_ref().and_then(|m| {
981 serde_json::from_value::<ToolResponseMetadata>(
982 serde_json::to_value(m).ok()?,
983 )
984 .ok()
985 });
986
987 Ok(ToolMessage {
988 content: parts.into(),
989 tool_call_id,
990 metadata,
991 })
992 }
993
994 /// Sends a `resources/list` RPC call for a single page.
995 async fn rpc_list_resources(
996 &self,
997 cursor: Option<&str>,
998 ) -> Result<super::resource::ListResourcesResult, super::Error> {
999 self.rpc(
1000 "resources/list",
1001 &super::resource::ListResourcesRequest {
1002 cursor: cursor.map(String::from),
1003 },
1004 true,
1005 )
1006 .await
1007 }
1008
1009 /// Returns all resources from the server.
1010 ///
1011 /// Same cache-or-refresh semantics as [`Self::list_tools`]: a
1012 /// cleared cache (post-drop or pre-populate) triggers an inline
1013 /// paginate against the upstream.
1014 async fn list_resources(
1015 &self,
1016 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1017 if !self.has_resources_cap() {
1018 return Ok(Arc::new(Vec::new()));
1019 }
1020 if let Some(cached) = self.resources.read().await.as_ref() {
1021 return cached.clone();
1022 }
1023 self.refresh_resources(None).await;
1024 self.resources
1025 .read()
1026 .await
1027 .as_ref()
1028 .expect("refresh_resources installs Some")
1029 .clone()
1030 }
1031
1032 /// Reads a resource from the MCP server.
1033 async fn read_resource(
1034 &self,
1035 uri: &str,
1036 ) -> Result<super::resource::ReadResourceResult, super::Error> {
1037 if !self.has_resources_cap() {
1038 return Err(super::Error::UnsupportedCapability {
1039 capability: "resources",
1040 });
1041 }
1042 self.rpc(
1043 "resources/read",
1044 &super::resource::ReadResourceRequestParams {
1045 uri: uri.to_string(),
1046 },
1047 true,
1048 )
1049 .await
1050 }
1051
1052 /// Re-fetches all tools from the server, replacing the cached list.
1053 ///
1054 /// Optionally fires `on_change` *after* the write lock is acquired but
1055 /// *before* the network paginate begins, so the callback observes the
1056 /// "list change is in flight" edge — readers blocked on the read lock
1057 /// won't return until the new list lands. The proxy uses this to
1058 /// re-emit `notifications/tools/list_changed` to its downstream client
1059 /// at the moment the staleness window opens.
1060 async fn refresh_tools(&self, on_change: Option<ListChangedCallback>) {
1061 if !self.has_tools_cap() {
1062 // No tools capability — install an empty Vec so the cache
1063 // contract holds (`list_tools`'s `.expect("refresh_tools
1064 // installs Some")` etc.) and return without paginating or
1065 // signalling. No `notify_waiters` and no `on_change` —
1066 // nothing real changed.
1067 let mut guard = self.tools.write().await;
1068 *guard = Some(Ok(Arc::new(Vec::new())));
1069 return;
1070 }
1071 // Listener-driven refresh. Visibility contract: any caller
1072 // that issues `list_tools()` after a `tools/list_changed`
1073 // notification has been observed must see the post-swap
1074 // value, not stale data — so the write lock has to gate
1075 // readers across the upstream paginate.
1076 //
1077 // Performance contract: don't serialise paginate *behind*
1078 // lock-acquisition latency. We start `tools.write()` and the
1079 // upstream paginate **concurrently** with `tokio::join!`. The
1080 // write-lock acquire blocks new `list_tools()` readers
1081 // immediately (preserving visibility) and runs in parallel
1082 // with whatever drain time the in-flight readers need; the
1083 // paginate runs alongside. Total wall-clock is
1084 // `max(drain_time, paginate_time)` instead of the sum.
1085 //
1086 // `on_change` fires under the write guard, *after* `*guard =
1087 // result`, so anyone the callback wakes queues on the read lock,
1088 // waits for the guard to drop, and observes the post-swap state.
1089 let (mut guard, result) =
1090 tokio::join!(self.tools.write(), self.paginate_tools(),);
1091 *guard = Some(result);
1092 if let Some(cb) = on_change {
1093 cb();
1094 }
1095 }
1096
1097 /// Page-by-page fetch of the upstream tool list, no locks held.
1098 /// Shared between the `_signaling` (initial-populate, holds lock
1099 /// for the original "block fast readers" contract) and `refresh_*`
1100 /// (listener-driven, lock-only-around-install) variants.
1101 async fn paginate_tools(
1102 &self,
1103 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1104 let mut all_tools = Vec::new();
1105 let mut cursor: Option<String> = None;
1106 loop {
1107 match self.rpc_list_tools(cursor.as_deref()).await {
1108 Ok(page) => {
1109 all_tools.extend(page.tools);
1110 cursor = page.next_cursor;
1111 if cursor.is_none() {
1112 return Ok(Arc::new(all_tools));
1113 }
1114 }
1115 Err(e) => return Err(Arc::new(e)),
1116 }
1117 }
1118 }
1119
1120 /// Same as [`Self::refresh_tools`] but fires `lock_held` once the
1121 /// write lock has been acquired so the caller can synchronise on
1122 /// "writer is in possession of the cache" before returning. Used by
1123 /// `ConnectionInner::new` to prevent a fast reader from acquiring
1124 /// the read lock before this writer has even started.
1125 ///
1126 /// **Caller invariant:** the spawn site in `ConnectionInner::new`
1127 /// must gate this call on `capabilities.tools.is_some()`. This
1128 /// method assumes the tools capability is present and issues
1129 /// `tools/list` RPCs unconditionally — running it against a
1130 /// no-tools server triggers the 15-min idempotent-backoff storm.
1131 async fn refresh_tools_signaling(
1132 &self,
1133 lock_held: tokio::sync::oneshot::Sender<()>,
1134 on_change: Option<ListChangedCallback>,
1135 ) {
1136 let mut guard = self.tools.write().await;
1137 // Signal `lock_held` while we hold the write lock and *before*
1138 // installing the new list, so the `on_change` callback observes
1139 // the "list change in flight" edge — its readers queue behind
1140 // this write guard and always see the post-swap state.
1141 let _ = lock_held.send(());
1142 if let Some(cb) = on_change {
1143 cb();
1144 }
1145 let mut all_tools = Vec::new();
1146 let mut cursor: Option<String> = None;
1147 let result = loop {
1148 match self.rpc_list_tools(cursor.as_deref()).await {
1149 Ok(page) => {
1150 all_tools.extend(page.tools);
1151 cursor = page.next_cursor;
1152 if cursor.is_none() {
1153 break Ok(Arc::new(all_tools));
1154 }
1155 }
1156 Err(e) => break Err(Arc::new(e)),
1157 }
1158 };
1159 *guard = Some(result);
1160 }
1161
1162 /// Re-fetches all resources from the server, replacing the cached list.
1163 /// See [`ConnectionInner::refresh_tools`] for the callback timing
1164 /// contract.
1165 async fn refresh_resources(&self, on_change: Option<ListChangedCallback>) {
1166 if !self.has_resources_cap() {
1167 // Symmetric to `refresh_tools` — see that gate.
1168 let mut guard = self.resources.write().await;
1169 *guard = Some(Ok(Arc::new(Vec::new())));
1170 return;
1171 }
1172 // Same paginate-while-acquiring-the-write-lock pattern as
1173 // `refresh_tools` — see that comment for the visibility +
1174 // performance rationale.
1175 let (mut guard, result) =
1176 tokio::join!(self.resources.write(), self.paginate_resources(),);
1177 *guard = Some(result);
1178 if let Some(cb) = on_change {
1179 cb();
1180 }
1181 }
1182
1183 async fn paginate_resources(
1184 &self,
1185 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1186 let mut all_resources = Vec::new();
1187 let mut cursor: Option<String> = None;
1188 loop {
1189 match self.rpc_list_resources(cursor.as_deref()).await {
1190 Ok(page) => {
1191 all_resources.extend(page.resources);
1192 cursor = page.next_cursor;
1193 if cursor.is_none() {
1194 return Ok(Arc::new(all_resources));
1195 }
1196 }
1197 Err(e) => return Err(Arc::new(e)),
1198 }
1199 }
1200 }
1201
1202 /// Resource counterpart of [`Self::refresh_tools_signaling`]. The
1203 /// same spawn-site-gate invariant applies: the caller must gate
1204 /// the spawn on `capabilities.resources.is_some()`.
1205 async fn refresh_resources_signaling(
1206 &self,
1207 lock_held: tokio::sync::oneshot::Sender<()>,
1208 on_change: Option<ListChangedCallback>,
1209 ) {
1210 let mut guard = self.resources.write().await;
1211 // See `refresh_tools_signaling` — signal `lock_held` under the
1212 // write lock, before install, so the `on_change` callback's
1213 // readers see the post-swap state.
1214 let _ = lock_held.send(());
1215 if let Some(cb) = on_change {
1216 cb();
1217 }
1218 let mut all_resources = Vec::new();
1219 let mut cursor: Option<String> = None;
1220 let result = loop {
1221 match self.rpc_list_resources(cursor.as_deref()).await {
1222 Ok(page) => {
1223 all_resources.extend(page.resources);
1224 cursor = page.next_cursor;
1225 if cursor.is_none() {
1226 break Ok(Arc::new(all_resources));
1227 }
1228 }
1229 Err(e) => break Err(Arc::new(e)),
1230 }
1231 };
1232 *guard = Some(result);
1233 }
1234
1235 /// Builds a GET request to the MCP endpoint for receiving server
1236 /// notifications via SSE.
1237 async fn get(&self) -> reqwest::RequestBuilder {
1238 self.http_client
1239 .get(&self.url)
1240 .headers(
1241 self.build_request_headers(None, Some("text/event-stream"))
1242 .await,
1243 )
1244 }
1245
1246 /// Listens for `notifications/tools/list_changed` and
1247 /// `notifications/resources/list_changed` on an SSE stream. On each
1248 /// notification, write-locks and re-fetches the full list.
1249 ///
1250 /// `initial_lines` is the pre-opened SSE line reader handed in by
1251 /// [`Client::connect`](super::Client::connect) — that stream is
1252 /// consumed first. When it ends (or any later GET reconnect ends),
1253 /// we sleep `backoff_initial_interval` and open a fresh GET `/` SSE
1254 /// stream.
1255 ///
1256 /// Takes a [`Weak<Self>`] (not `Arc<Self>`) so the spawned task
1257 /// doesn't itself keep the [`Connection`] alive, and a
1258 /// [`CancellationToken`] sibling clone of the connection's
1259 /// [`DropGuard`] so the task tears down the instant the last
1260 /// external `Arc<ConnectionInner>` clone is dropped — every
1261 /// blocking await (line read, reconnect send, backoff sleep) is
1262 /// raced against `cancel.cancelled()` and exits without any zombie
1263 /// retries against a now-dead session.
1264 async fn listen_for_list_changes(
1265 weak: Weak<Self>,
1266 cancel: CancellationToken,
1267 initial_lines: super::LinesStream,
1268 ) {
1269 // First iteration: use the pre-opened SSE stream the client
1270 // handed us. After that, fall back to opening fresh GET / SSE
1271 // streams as the upstream connection cycles.
1272 let mut next_lines: Option<super::LinesStream> = Some(initial_lines);
1273 // One-shot guard for the catch-up refresh: false on the very
1274 // first iteration (the caller's pre-opened SSE stream — its
1275 // associated cache was just populated by `Client::connect`'s
1276 // initial pagination, so re-fetching there would just be a
1277 // wasted round-trip), true thereafter. Every stream end —
1278 // whether `Ok(None)` (clean close) or `Err(_)` (read failure)
1279 // — drops back here, which we treat as an implicit
1280 // list-changed notification: the upstream's broadcast (in
1281 // particular the proxy's per-session `tokio::broadcast`) is
1282 // lossy for moments when this listener has zero active
1283 // subscribers, so anything that fired during our disconnect
1284 // window may have been dropped.
1285 //
1286 // ORDER MATTERS. The refresh must run AFTER we've re-opened
1287 // the GET / SSE stream — i.e. after we're a subscriber again
1288 // — and BEFORE we enter the inner read loop. If we refreshed
1289 // before the resubscribe, a notification that fired between
1290 // our refresh-completion and our subscribe would be lost the
1291 // same way as the original disconnect-window drops; doing it
1292 // after means a notification fired DURING the refresh lands
1293 // in the new subscriber's buffer (broadcast::Sender::send
1294 // backs onto each receiver's channel-capacity slot) and gets
1295 // consumed by the inner loop on its next read.
1296 let mut is_reconnect = false;
1297
1298 loop {
1299 // The token cancels deterministically when the last
1300 // `Arc<ConnectionInner>` clone is dropped (see
1301 // `_listener_cancel_guard`). Check once per outer
1302 // iteration, but the real protection is the cancel arms in
1303 // every blocking await below — those exit immediately on
1304 // cancel.
1305 if cancel.is_cancelled() {
1306 return;
1307 }
1308 let Some(this) = weak.upgrade() else { return };
1309 let backoff_delay = this.backoff_initial_interval;
1310
1311 let mut lines = match next_lines.take() {
1312 Some(l) => l,
1313 None => {
1314 // Race the upstream GET against cancellation — if
1315 // the connection drops mid-reconnect, exit
1316 // immediately rather than waiting for the request
1317 // to complete or time out (otherwise produces a
1318 // burst of 401 retries against a now-dead session
1319 // under heavy churn).
1320 let send_outcome = tokio::select! {
1321 out = async {
1322 this.get().await.send().await
1323 } => out,
1324 _ = cancel.cancelled() => {
1325 drop(this);
1326 return;
1327 }
1328 };
1329 let response = match send_outcome {
1330 Ok(r) if r.status().is_success() => r,
1331 _ => {
1332 drop(this);
1333 // Sleep with cancel-arm: instant exit on
1334 // drop, no zombie retries.
1335 tokio::select! {
1336 _ = tokio::time::sleep(backoff_delay) => {}
1337 _ = cancel.cancelled() => return,
1338 }
1339 continue;
1340 }
1341 };
1342 super::lines_from_response(response)
1343 }
1344 };
1345
1346 // Catch-up refresh on every reconnect — the implicit
1347 // list-changed treatment for the just-failed stream. See
1348 // the `is_reconnect` doc-comment above for the
1349 // refresh-AFTER-resubscribe rationale.
1350 if is_reconnect {
1351 // tools and resources are independent locks; run the
1352 // catch-up refreshes concurrently so disconnect
1353 // recovery isn't sequential.
1354 let _ = tokio::join!(
1355 this.refresh_tools(this.on_tools_list_changed.get()),
1356 this.refresh_resources(
1357 this.on_resources_list_changed.get()
1358 ),
1359 );
1360 }
1361 is_reconnect = true;
1362
1363 'inner: loop {
1364 tokio::select! {
1365 line_result = lines.next_line() => {
1366 match line_result {
1367 Ok(Some(line)) => {
1368 // SSE data lines start with "data: ".
1369 let Some(data) = line.strip_prefix("data: ") else {
1370 continue 'inner;
1371 };
1372 let method = match serde_json::from_str::<super::JsonRpcNotification>(data) {
1373 Ok(n) => n.method,
1374 Err(_) => continue 'inner,
1375 };
1376 match method.as_str() {
1377 "notifications/tools/list_changed" => {
1378 // refresh_tools fires the
1379 // callback after the cache is
1380 // installed, so the proxy's
1381 // downstream
1382 // notifications/tools/list_changed
1383 // emission lines up with the
1384 // staleness window opening.
1385 this.refresh_tools(
1386 this.on_tools_list_changed.get(),
1387 )
1388 .await;
1389 }
1390 "notifications/resources/list_changed" => {
1391 this.refresh_resources(
1392 this.on_resources_list_changed.get(),
1393 )
1394 .await;
1395 }
1396 _ => {}
1397 }
1398 }
1399 // Stream ended cleanly or errored — break out
1400 // to the outer loop so we either reconnect or,
1401 // if everyone's gone, exit at the top.
1402 _ => break 'inner,
1403 }
1404 }
1405 // Cancellation: the connection's last clone has
1406 // dropped. Tear down immediately.
1407 _ = cancel.cancelled() => {
1408 drop(this);
1409 return;
1410 }
1411 }
1412 }
1413
1414 // Stream dropped — empty the per-Connection caches so the
1415 // next `list_tools` / `list_resources` paginates inline
1416 // against the (possibly still-dead) upstream rather than
1417 // returning whatever was cached before the drop. The
1418 // `is_reconnect` catch-up at the top of the next outer
1419 // iteration will repopulate `Some(_)` if the reconnect
1420 // succeeds; if the reconnect keeps failing, the cache
1421 // stays `None` and `list_*` callers paginate themselves.
1422 *this.tools.write().await = None;
1423 *this.resources.write().await = None;
1424
1425 // Stream ended — drop the strong ref before sleeping so the
1426 // next iteration's weak-upgrade can detect liveness honestly.
1427 drop(this);
1428 tokio::select! {
1429 _ = tokio::time::sleep(backoff_delay) => {}
1430 _ = cancel.cancelled() => return,
1431 }
1432 }
1433 }
1434}
1435
#[cfg(test)]
mod capability_gate_tests {
    use super::*;
    use crate::mcp::initialize_result::{
        ResourcesCapability, ServerCapabilities, ToolsCapability,
    };
    use crate::mcp::tool::CallToolRequestParams;

    /// Builds a `ServerCapabilities` with the given `tools` / `resources`
    /// shapes and every other capability set to `None`. Each gate test
    /// passes its own combination to exercise a specific cap-absent
    /// branch.
    fn caps(
        tools: Option<ToolsCapability>,
        resources: Option<ResourcesCapability>,
    ) -> ServerCapabilities {
        ServerCapabilities {
            experimental: None,
            logging: None,
            completions: None,
            prompts: None,
            resources,
            tools,
            tasks: None,
        }
    }

    /// An arbitrary error used to poison a cache slot. If a gate test
    /// ever observes it, the capability gate did not short-circuit.
    fn poison() -> Arc<super::super::Error> {
        Arc::new(super::super::Error::NoSessionId {
            url: "http://x".into(),
            body: String::new(),
        })
    }

    /// 3.1 — `list_tools` short-circuits to `Ok(empty)` when the server
    /// declared no `tools` capability, *before* the cache is consulted.
    /// We poison the cache with `Err` to prove the gate fires first.
    #[tokio::test]
    async fn list_tools_returns_empty_when_tools_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        *conn.inner.tools.write().await = Some(Err(poison()));

        let got = conn.list_tools().await.unwrap();
        assert!(got.is_empty());
    }

    /// 3.2 — symmetric to 3.1 for `list_resources`.
    #[tokio::test]
    async fn list_resources_returns_empty_when_resources_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        *conn.inner.resources.write().await = Some(Err(poison()));

        let got = conn.list_resources().await.unwrap();
        assert!(got.is_empty());
    }

    /// 3.3 — `read_resource` errors with `UnsupportedCapability` when
    /// the server declared no `resources` capability, without hitting
    /// the network.
    #[tokio::test]
    async fn read_resource_errors_when_resources_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        let got = conn.read_resource("file://nope").await;
        assert!(matches!(
            got,
            Err(super::super::Error::UnsupportedCapability {
                capability: "resources"
            })
        ));
    }

    /// 3.4 — `call_tool` errors with `UnsupportedCapability` when the
    /// server declared no `tools` capability.
    #[tokio::test]
    async fn call_tool_errors_when_tools_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        let params = CallToolRequestParams {
            name: "any".into(),
            arguments: None,
            _meta: None,
            task: None,
        };
        let got = conn.call_tool(&params).await;
        assert!(matches!(
            got,
            Err(super::super::Error::UnsupportedCapability {
                capability: "tools"
            })
        ));
    }

    /// 3.5 — `refresh_tools` installs `Some(Ok(empty))` and returns
    /// without paginating when the server declared no `tools`
    /// capability. Clearing the cache first proves the install ran.
    #[tokio::test]
    async fn refresh_tools_installs_empty_when_tools_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        *conn.inner.tools.write().await = None;

        conn.inner.refresh_tools(None).await;

        let guard = conn.inner.tools.read().await;
        let v = guard
            .as_ref()
            .expect("refresh installed Some")
            .as_ref()
            .expect("refresh installed Ok");
        assert!(v.is_empty());
    }

    /// 3.6 — symmetric to 3.5 for `refresh_resources`.
    #[tokio::test]
    async fn refresh_resources_installs_empty_when_resources_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        *conn.inner.resources.write().await = None;

        conn.inner.refresh_resources(None).await;

        let guard = conn.inner.resources.read().await;
        let v = guard
            .as_ref()
            .expect("refresh installed Some")
            .as_ref()
            .expect("refresh installed Ok");
        assert!(v.is_empty());
    }
}