objectiveai_sdk/mcp/connection.rs
1//! MCP connection for communicating with an MCP server.
2//!
3//! [`Connection`] is a cheaply-clonable handle around an internal
4//! [`ConnectionInner`]. The last drop of the inner `Arc` runs
5//! [`ConnectionInner`]'s `Drop`, which cancels the listener task's
6//! [`tokio_util::sync::CancellationToken`] (held in `_listener_cancel_guard`
7//! as a [`tokio_util::sync::DropGuard`]) — the SSE listener exits the
8//! instant any in-flight reconnect, sleep, or read is cancelled, with no
9//! zombie 401 retries against a now-dead proxy session.
10
11use std::ops::Deref;
12use std::sync::{Arc, RwLock as StdRwLock, Weak};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::Duration;
15
16use indexmap::IndexMap;
17use tokio::sync::{Notify, RwLock};
18use tokio_util::sync::{CancellationToken, DropGuard};
19
/// Callback fired by [`Connection`] when the upstream MCP server emits
/// `notifications/tools/list_changed` or `notifications/resources/list_changed`.
///
/// **Timing:** invoked after the corresponding cache's write lock has
/// been acquired but *before* the network paginate that replaces the
/// cache. That is exactly the instant the staleness window opens —
/// readers blocked on the read lock will not resume until the new list
/// has landed. The callback must not call back into `list_tools` /
/// `list_resources`: the listener already holds the write lock, so
/// doing so would deadlock.
///
/// Stored behind an `Arc` so the listener task can cheaply clone it out
/// of the lock and invoke it without holding the read guard.
pub type ListChangedCallback = Arc<dyn Fn() + Send + Sync + 'static>;

/// A slot that may or may not hold a registered callback. Wrapping the
/// lock lets [`ConnectionInner`] keep `#[derive(Debug)]` even though a
/// raw `dyn Fn` is not `Debug`.
struct CallbackSlot(StdRwLock<Option<ListChangedCallback>>);

impl CallbackSlot {
    /// Creates an empty slot with no callback registered.
    fn new() -> Self {
        CallbackSlot(StdRwLock::new(None))
    }

    /// Installs `callback`, replacing whatever was registered before.
    fn set(&self, callback: ListChangedCallback) {
        let mut slot = self.0.write().unwrap();
        *slot = Some(callback);
    }

    /// Clones out the currently-registered callback, if any. The `Arc`
    /// clone is cheap and lets the caller drop the read guard before
    /// actually invoking the callback.
    fn get(&self) -> Option<ListChangedCallback> {
        self.0.read().unwrap().as_ref().map(Arc::clone)
    }
}

impl std::fmt::Debug for CallbackSlot {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A poisoned lock is reported as `set: false` rather than
        // propagating a panic out of a Debug impl.
        let set = match self.0.read() {
            Ok(guard) => guard.is_some(),
            Err(_) => false,
        };
        f.debug_struct("CallbackSlot").field("set", &set).finish()
    }
}
60
/// An active connection to an MCP server using the Streamable HTTP transport.
///
/// Cheaply clonable (one `Arc` bump). When the last clone is dropped, the
/// inner `Arc` ref count hits zero, [`ConnectionInner::Drop`] runs, the
/// listener-cancel `DropGuard` is dropped, and the SSE listener task is
/// cancelled — exiting any in-flight `lines.next_line()`, reconnect
/// `send()`, or backoff `sleep` *immediately* without retrying against
/// the now-dead proxy session.
///
/// Use the public methods (`list_tools`, `call_tool`, `list_resources`,
/// `read_resource`, `call_tool_as_message`, `tool_key`) for the upstream
/// MCP protocol surface. The inner state ([`ConnectionInner`]) is also
/// reachable via `Deref` for read-only field access (e.g.
/// `connection.url`, `connection.initialize_result.server_info.name`),
/// but its methods are private — you must go through `Connection`.
#[derive(Debug)]
pub struct Connection {
    /// Shared state; the last `Arc` drop runs [`ConnectionInner`]'s
    /// `Drop`, which cancels the SSE listener task.
    inner: Arc<ConnectionInner>,
}
80
81impl Clone for Connection {
82 fn clone(&self) -> Self {
83 Self { inner: Arc::clone(&self.inner) }
84 }
85}
86
87// No `Drop` for `Connection`: cancellation happens deterministically
88// when the last `Arc<ConnectionInner>` clone is dropped, which runs
89// `ConnectionInner::drop` and releases the cancel-token DropGuard.
90
91impl Deref for Connection {
92 type Target = ConnectionInner;
93 fn deref(&self) -> &ConnectionInner {
94 &self.inner
95 }
96}
97
98impl Connection {
99 pub(super) async fn new(
100 http_client: reqwest::Client,
101 url: String,
102 session_id: String,
103 headers: IndexMap<String, String>,
104 backoff_current_interval: Duration,
105 backoff_initial_interval: Duration,
106 backoff_randomization_factor: f64,
107 backoff_multiplier: f64,
108 backoff_max_interval: Duration,
109 backoff_max_elapsed_time: Duration,
110 call_timeout: Duration,
111 initialize_result: super::initialize_result::InitializeResult,
112 initial_sse_lines: Option<super::LinesStream>,
113 ) -> Self {
114 let inner = ConnectionInner::new(
115 http_client,
116 url,
117 session_id,
118 headers,
119 backoff_current_interval,
120 backoff_initial_interval,
121 backoff_randomization_factor,
122 backoff_multiplier,
123 backoff_max_interval,
124 backoff_max_elapsed_time,
125 call_timeout,
126 initialize_result,
127 initial_sse_lines,
128 )
129 .await;
130 Self { inner }
131 }
132
133
134 pub(super) fn new_mock(url: String) -> Self {
135 Self { inner: ConnectionInner::new_mock(url) }
136 }
137
138 #[cfg(test)]
139 pub(crate) fn new_for_test(name: String, url: String) -> Self {
140 Self { inner: ConnectionInner::new_for_test(name, url) }
141 }
142
143 /// Send a JSON-RPC notification to the upstream. Used by `Client`
144 /// right after `initialize` to send `notifications/initialized`.
145 pub(super) async fn notify<P: serde::Serialize>(
146 &self,
147 method: &str,
148 params: &P,
149 ) -> Result<(), super::Error> {
150 self.inner.notify(method, params).await
151 }
152
153 /// Returns a key identifying this connection for tool namespacing.
154 pub fn tool_key(&self) -> String {
155 self.inner.tool_key()
156 }
157
158 /// Returns the session ID for this connection.
159 pub fn session_id(&self) -> &str {
160 self.inner.session_id()
161 }
162
163 /// Returns all tools from the upstream server.
164 pub async fn list_tools(
165 &self,
166 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
167 self.inner.list_tools().await
168 }
169
170 /// Calls a tool on the upstream server.
171 pub async fn call_tool(
172 &self,
173 params: &super::tool::CallToolRequestParams,
174 ) -> Result<super::tool::CallToolResult, super::Error> {
175 self.inner.call_tool(params).await
176 }
177
178 /// Calls a tool and converts the result into a [`ToolMessage`].
179 pub async fn call_tool_as_message(
180 &self,
181 params: &super::tool::CallToolRequestParams,
182 tool_call_id: String,
183 ) -> Result<
184 crate::agent::completions::message::ToolMessage,
185 super::Error,
186 > {
187 self.inner.call_tool_as_message(params, tool_call_id).await
188 }
189
190 /// Returns all resources from the upstream server.
191 pub async fn list_resources(
192 &self,
193 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
194 self.inner.list_resources().await
195 }
196
197 /// Returns the cached tool list as soon as it differs from `current`,
198 /// or waits up to `timeout` for the next `notifications/tools/list_changed`
199 /// from the upstream server before re-reading.
200 ///
201 /// Wakes the moment a refresh writer takes the cache write lock, so
202 /// the post-wake `read` is guaranteed to observe the new list rather
203 /// than racing against the install. Safe to call from any number of
204 /// tasks concurrently.
205 pub async fn subscribe_tools(
206 &self,
207 current: &[super::tool::Tool],
208 timeout: Duration,
209 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
210 self.inner.subscribe_tools(current, timeout).await
211 }
212
213 /// Resource counterpart of [`Connection::subscribe_tools`].
214 pub async fn subscribe_resources(
215 &self,
216 current: &[super::resource::Resource],
217 timeout: Duration,
218 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
219 self.inner.subscribe_resources(current, timeout).await
220 }
221
222 /// Reads a resource from the upstream server.
223 pub async fn read_resource(
224 &self,
225 uri: &str,
226 ) -> Result<super::resource::ReadResourceResult, super::Error> {
227 self.inner.read_resource(uri).await
228 }
229
230 /// Register a callback to fire whenever the upstream emits
231 /// `notifications/tools/list_changed`.
232 ///
233 /// **Timing:** the callback runs *after* the tool cache's write lock
234 /// is acquired but *before* the network paginate that replaces it.
235 /// That means readers blocked on the read lock won't return until the
236 /// new list is in place, and the callback observes the moment the
237 /// staleness window opens. The proxy uses this to emit its own
238 /// `notifications/tools/list_changed` to downstream clients at the
239 /// right instant.
240 ///
241 /// Replaces any previously-registered tools-list-changed callback.
242 /// All clones of this `Connection` share the same callback slot.
243 pub fn set_on_tools_list_changed<F>(&self, callback: F)
244 where
245 F: Fn() + Send + Sync + 'static,
246 {
247 self.inner.on_tools_list_changed.set(Arc::new(callback));
248 }
249
250 /// Register a callback to fire whenever the upstream emits
251 /// `notifications/resources/list_changed`. Same timing contract as
252 /// [`Connection::set_on_tools_list_changed`].
253 ///
254 /// Replaces any previously-registered resources-list-changed callback.
255 /// All clones of this `Connection` share the same callback slot.
256 pub fn set_on_resources_list_changed<F>(&self, callback: F)
257 where
258 F: Fn() + Send + Sync + 'static,
259 {
260 self.inner.on_resources_list_changed.set(Arc::new(callback));
261 }
262}
263
/// The actual connection state. Behind an `Arc` inside [`Connection`].
///
/// Fields are public for read-only access (callers reach them via
/// `Connection`'s `Deref`), but every method on this type is private —
/// the public surface lives on [`Connection`] and delegates through.
#[derive(Debug)]
pub struct ConnectionInner {
    /// HTTP client used for every POST/GET this connection makes.
    pub http_client: reqwest::Client,
    /// Base URL of the upstream MCP server.
    pub url: String,
    /// Value stamped as the `Mcp-Session-Id` header on every request.
    pub session_id: String,
    /// All HTTP headers stamped on every POST / GET this connection
    /// makes — the same merged map (defaults + caller overrides) the
    /// `Client` built once during connect. `Mcp-Session-Id`,
    /// `Content-Type`, and `Accept` are still set by the request
    /// builders and override anything in `headers`.
    pub headers: IndexMap<String, String>,

    // Exponential-backoff retry configuration for idempotent RPCs;
    // consumed by `ConnectionInner::backoff`.
    pub backoff_current_interval: Duration,
    pub backoff_initial_interval: Duration,
    pub backoff_randomization_factor: f64,
    pub backoff_multiplier: f64,
    pub backoff_max_interval: Duration,
    pub backoff_max_elapsed_time: Duration,
    /// Per-request timeout applied to every POST built by `post()`.
    pub call_timeout: Duration,

    /// The server's capabilities and info from the initialize response.
    pub initialize_result: super::initialize_result::InitializeResult,

    /// If true, all RPC/notify calls are no-ops. Used for mock orchestrator URLs.
    mock: bool,

    /// Auto-incrementing request ID (starts at 2; 1 was used for initialize).
    next_id: AtomicU64,

    /// All tools from the server, populated by background pagination.
    tools: RwLock<Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>>>,
    /// All resources from the server, populated by background pagination.
    resources:
        RwLock<Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>>>,

    /// Cancellation token for the long-lived `listen_for_list_changes`
    /// task. The listener selects this against every blocking await
    /// (read, reconnect-send, backoff-sleep) and returns the instant it
    /// fires.
    ///
    /// Held inside the connection as a [`DropGuard`] so that the moment
    /// the last `Arc<ConnectionInner>` clone is dropped — i.e. the
    /// moment no external `Connection` handle remains — `Drop` runs on
    /// the guard, the token cancels, and the listener task tears down.
    /// The listener itself holds a sibling `CancellationToken` (clone),
    /// not the guard, so its task does not extend the connection's
    /// lifetime.
    ///
    /// `None` for mock and test connections, which spawn no listener.
    _listener_cancel_guard: Option<DropGuard>,

    /// Optional callback fired *after* the listener has refreshed the
    /// tool cache in response to an upstream `notifications/tools/list_changed`.
    /// Set via [`Connection::set_on_tools_list_changed`].
    on_tools_list_changed: CallbackSlot,

    /// Optional callback fired *after* the listener has refreshed the
    /// resource cache in response to an upstream
    /// `notifications/resources/list_changed`.
    /// Set via [`Connection::set_on_resources_list_changed`].
    on_resources_list_changed: CallbackSlot,

    /// Wakes any task awaiting in [`Connection::subscribe_tools`]. Fired
    /// from inside `refresh_tools_signaling` the moment the writer
    /// acquires the cache write lock — *before* the new list is
    /// installed. A woken subscriber's next `read().await` blocks behind
    /// the writer's guard, so it always observes the post-swap state.
    tools_changed: Notify,

    /// Resource counterpart of [`Self::tools_changed`].
    resources_changed: Notify,
}
339
340impl ConnectionInner {
    /// Creates a mock connection that never makes network requests.
    /// All RPC calls return empty/default results.
    ///
    /// Unlike [`Self::new`], no background paginators and no SSE
    /// listener are spawned, so the tool/resource caches stay at their
    /// initial `Ok(empty)` state forever.
    fn new_mock(url: String) -> Arc<Self> {
        Arc::new(Self {
            http_client: reqwest::Client::new(),
            url,
            // No real session: mock requests never leave the process.
            session_id: String::new(),
            headers: IndexMap::new(),
            // Backoff configuration is irrelevant for a mock — no call
            // is ever retried (or made at all).
            backoff_current_interval: Duration::ZERO,
            backoff_initial_interval: Duration::ZERO,
            backoff_randomization_factor: 0.0,
            backoff_multiplier: 1.0,
            backoff_max_interval: Duration::ZERO,
            backoff_max_elapsed_time: Duration::ZERO,
            call_timeout: Duration::ZERO,
            // Synthetic initialize result advertising no capabilities.
            initialize_result: super::initialize_result::InitializeResult {
                protocol_version: "2025-03-26".into(),
                capabilities: super::initialize_result::ServerCapabilities {
                    experimental: None,
                    logging: None,
                    completions: None,
                    prompts: None,
                    resources: None,
                    tools: None,
                    tasks: None,
                },
                server_info: super::initialize_result::Implementation {
                    name: "mock".into(),
                    title: None,
                    version: "0.0.0".into(),
                    website_url: None,
                    description: None,
                    icons: None,
                },
                instructions: None,
                _meta: None,
            },
            // The flag that makes `notify` / `call_tool` short-circuit.
            mock: true,
            next_id: AtomicU64::new(2),
            tools: RwLock::new(Ok(Arc::new(Vec::new()))),
            resources: RwLock::new(Ok(Arc::new(Vec::new()))),
            // No listener task exists, so there is nothing to cancel.
            _listener_cancel_guard: None,
            on_tools_list_changed: CallbackSlot::new(),
            on_resources_list_changed: CallbackSlot::new(),
            tools_changed: Notify::new(),
            resources_changed: Notify::new(),
        })
    }
389
    /// Creates a minimal connection for unit testing.
    ///
    /// Like [`Self::new_mock`] it spawns no background tasks, but note
    /// `mock: false` — RPC methods on a test connection would attempt
    /// real network calls against `url` if invoked.
    #[cfg(test)]
    fn new_for_test(name: String, url: String) -> Arc<Self> {
        Arc::new(Self {
            http_client: reqwest::Client::new(),
            url,
            session_id: String::new(),
            headers: IndexMap::new(),
            // Representative (non-zero) backoff/timeout values so retry
            // behavior can be exercised in tests.
            backoff_current_interval: Duration::from_millis(500),
            backoff_initial_interval: Duration::from_millis(500),
            backoff_randomization_factor: 0.5,
            backoff_multiplier: 1.5,
            backoff_max_interval: Duration::from_secs(60),
            backoff_max_elapsed_time: Duration::from_secs(900),
            call_timeout: Duration::from_secs(30),
            initialize_result: super::initialize_result::InitializeResult {
                protocol_version: "2025-03-26".into(),
                capabilities:
                    super::initialize_result::ServerCapabilities {
                        experimental: None,
                        logging: None,
                        completions: None,
                        prompts: None,
                        resources: None,
                        tools: None,
                        tasks: None,
                    },
                server_info: super::initialize_result::Implementation {
                    name,
                    title: None,
                    version: "0.0.0".into(),
                    website_url: None,
                    description: None,
                    icons: None,
                },
                instructions: None,
                _meta: None,
            },
            mock: false,
            next_id: AtomicU64::new(2),
            tools: RwLock::new(Ok(Arc::new(Vec::new()))),
            resources: RwLock::new(Ok(Arc::new(Vec::new()))),
            // No listener task exists, so there is nothing to cancel.
            _listener_cancel_guard: None,
            on_tools_list_changed: CallbackSlot::new(),
            on_resources_list_changed: CallbackSlot::new(),
            tools_changed: Notify::new(),
            resources_changed: Notify::new(),
        })
    }
439
    /// Creates a new connection and spawns background tasks to paginate
    /// all tools and resources. Called internally by
    /// [`Client::connect`](super::Client::connect) (via [`Connection::new`]).
    ///
    /// `initial_sse_lines`, if `Some`, is a pre-opened SSE line reader
    /// that the list-changed listener will read from immediately on its
    /// first iteration, instead of opening its own GET `/`. The caller
    /// is responsible for arranging for one of these to exist whenever
    /// the upstream advertises `tools.list_changed` or
    /// `resources.list_changed` — see
    /// [`Client::connect`](super::Client::connect).
    async fn new(
        http_client: reqwest::Client,
        url: String,
        session_id: String,
        headers: IndexMap<String, String>,
        backoff_current_interval: Duration,
        backoff_initial_interval: Duration,
        backoff_randomization_factor: f64,
        backoff_multiplier: f64,
        backoff_max_interval: Duration,
        backoff_max_elapsed_time: Duration,
        call_timeout: Duration,
        initialize_result: super::initialize_result::InitializeResult,
        initial_sse_lines: Option<super::LinesStream>,
    ) -> Arc<Self> {
        // Cancel-the-listener machinery: store the DropGuard inside the
        // inner so the cancellation fires deterministically when the
        // last external `Arc<ConnectionInner>` clone drops. Hand the
        // listener task a sibling clone (no guard) — that way the
        // listener task's lifetime does not extend the connection.
        let listener_cancel = CancellationToken::new();
        let listener_cancel_for_task = listener_cancel.clone();
        let conn = Arc::new(Self {
            http_client,
            url,
            session_id,
            headers,
            backoff_current_interval,
            backoff_initial_interval,
            backoff_randomization_factor,
            backoff_multiplier,
            backoff_max_interval,
            backoff_max_elapsed_time,
            call_timeout,
            initialize_result,
            mock: false,
            next_id: AtomicU64::new(2),
            tools: RwLock::new(Ok(Arc::new(Vec::new()))),
            resources: RwLock::new(Ok(Arc::new(Vec::new()))),
            _listener_cancel_guard: Some(listener_cancel.drop_guard()),
            on_tools_list_changed: CallbackSlot::new(),
            on_resources_list_changed: CallbackSlot::new(),
            tools_changed: Notify::new(),
            resources_changed: Notify::new(),
        });

        // Spawn background tool lister if the server supports tools.
        //
        // We don't return until the spawned task has acquired the write
        // lock. Otherwise a caller that immediately reads `list_tools()`
        // could race the writer — `tokio::spawn` only queues the task,
        // and a fast reader can acquire the read lock before the writer
        // has run its first instruction. The reader would then see the
        // initial empty `Vec` and return that, even though a full
        // populate is in flight.
        //
        // The `RwLockWriteGuard` itself isn't `Send`-friendly enough to
        // pass back, so we use a oneshot to signal "I'm holding the
        // lock now"; once we receive that, the cache is exclusively
        // owned by the writer and any subsequent `read().await` from
        // the caller is guaranteed to wait for the populate to finish.
        if conn.initialize_result.capabilities.tools.is_some() {
            let conn = Arc::clone(&conn);
            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
            tokio::spawn(async move {
                conn.refresh_tools_signaling(lock_held_tx, None).await;
            });
            // Wait for the writer to hold the lock before returning.
            let _ = lock_held_rx.await;
        }

        // Spawn background resource lister if the server supports
        // resources. Same lock-handoff contract as tools above.
        if conn.initialize_result.capabilities.resources.is_some() {
            let conn = Arc::clone(&conn);
            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
            tokio::spawn(async move {
                conn.refresh_resources_signaling(lock_held_tx, None).await;
            });
            // Wait for the writer to hold the lock before returning.
            let _ = lock_held_rx.await;
        }

        // Spawn the list-changed listener iff the caller handed us a
        // pre-opened SSE stream. The connection is naive about
        // `tools.list_changed` / `resources.list_changed` capabilities —
        // [`Client::connect`](super::Client::connect) translates them
        // into "did or didn't open a stream for us." If we get a stream,
        // we listen on it; if we don't, there's nothing to listen for.
        if let Some(initial_lines) = initial_sse_lines {
            // Hand the listener a `Weak` so the spawned task itself does
            // not keep the connection alive. `listener_cancel_for_task`
            // is a sibling clone of the connection's own
            // `_listener_cancel_guard` token — when the last external
            // `Arc<ConnectionInner>` clone is dropped, the inner's Drop
            // releases the guard and the listener wakes from any
            // pending await (read, send, sleep) and exits immediately.
            let weak = Arc::downgrade(&conn);
            tokio::spawn(async move {
                Self::listen_for_list_changes(
                    weak,
                    listener_cancel_for_task,
                    initial_lines,
                )
                .await;
            });
        }

        conn
    }
560
561 /// Creates an exponential backoff configuration from the connection's fields.
562 fn backoff(&self) -> backoff::ExponentialBackoff {
563 backoff::ExponentialBackoff {
564 current_interval: self.backoff_current_interval,
565 initial_interval: self.backoff_initial_interval,
566 randomization_factor: self.backoff_randomization_factor,
567 multiplier: self.backoff_multiplier,
568 max_interval: self.backoff_max_interval,
569 start_time: std::time::Instant::now(),
570 max_elapsed_time: Some(self.backoff_max_elapsed_time),
571 clock: backoff::SystemClock::default(),
572 }
573 }
574
575 /// Builds a POST request with all required headers and the call timeout.
576 fn post(&self) -> reqwest::RequestBuilder {
577 let mut request = self
578 .http_client
579 .post(&self.url)
580 .timeout(self.call_timeout)
581 .header("Content-Type", "application/json")
582 .header("Accept", "application/json, text/event-stream");
583 for (name, value) in &self.headers {
584 request = request.header(name, value);
585 }
586 // Mcp-Session-Id is applied last so a same-named entry in
587 // `headers` (e.g. the proxy's encoded session id) can never
588 // override the connection's own session id.
589 request = request.header("Mcp-Session-Id", &self.session_id);
590 request
591 }
592
    /// Sends a JSON-RPC request, retrying transient errors when
    /// `idempotent` is `true`.
    ///
    /// Idempotent methods (`tools/list`, `resources/list`,
    /// `resources/read`, etc.) retry every transient error — network,
    /// HTTP status, malformed body, JSON-RPC error, session expiration —
    /// until the backoff's `max_elapsed_time` is exceeded.
    ///
    /// Non-idempotent methods (`tools/call`) make exactly one attempt.
    /// Retrying a `tools/call` is unsafe: a tool may have mutated remote
    /// state during the first attempt before the response was lost, and
    /// re-firing the call would mutate state again. Each retry of
    /// `AppendTask` advances `state.tasks.len()` an extra step, so the
    /// agent sees a different return value than expected and the
    /// pid-derived mock seed at the next step diverges. See
    /// `objectiveai-api/src/agent/completions/client.rs` (sequential
    /// dispatch) and `mock/client.rs::mock.seed_derive` for the
    /// downstream consequence.
    async fn rpc<P: serde::Serialize, R: serde::de::DeserializeOwned>(
        &self,
        method: &str,
        params: &P,
        idempotent: bool,
    ) -> Result<R, super::Error> {
        // Relaxed suffices: ids only need uniqueness, not any ordering
        // relative to other memory operations.
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let body = serde_json::json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params,
        });

        // One full request/response round-trip; every failure is
        // classified `transient` so the idempotent path retries it.
        let attempt_one = || async {
            let url = self.url.clone();
            let response = self.post().json(&body).send().await.map_err(|source| {
                backoff::Error::transient(super::Error::Request {
                    url: url.clone(),
                    source,
                })
            })?;

            // A 404 is mapped to `SessionExpired` rather than `BadStatus`.
            if response.status() == reqwest::StatusCode::NOT_FOUND {
                return Err(backoff::Error::transient(
                    super::Error::SessionExpired { url: url.clone() },
                ));
            }
            if !response.status().is_success() {
                let code = response.status();
                let body = response.text().await.unwrap_or_default();
                return Err(backoff::Error::transient(
                    super::Error::BadStatus { url: url.clone(), code, body },
                ));
            }

            let rpc_response: super::JsonRpcResponse<R> =
                super::parse_streamable_http_response(&url, response)
                    .await
                    .map_err(backoff::Error::transient)?;

            match rpc_response {
                super::JsonRpcResponse::Success { result, .. } => Ok(result),
                super::JsonRpcResponse::Error { error, .. } => {
                    Err(backoff::Error::transient(super::Error::JsonRpc {
                        url: url.clone(),
                        code: error.code,
                        message: error.message,
                        data: error.data,
                    }))
                }
            }
        };

        if idempotent {
            backoff::future::retry(self.backoff(), attempt_one).await
        } else {
            // Single attempt: strip whichever backoff wrapper the
            // closure produced and surface the inner error directly.
            attempt_one().await.map_err(|e| match e {
                backoff::Error::Permanent(err) | backoff::Error::Transient { err, .. } => err,
            })
        }
    }
673
    /// Sends a JSON-RPC notification (no response expected) with the
    /// same exponential-backoff retry policy as [`Self::rpc`]. Every
    /// error is transient; the loop gives up only when the backoff's
    /// `max_elapsed_time` is exceeded.
    async fn notify<P: serde::Serialize>(
        &self,
        method: &str,
        params: &P,
    ) -> Result<(), super::Error> {
        // Mock connections never touch the network.
        if self.mock { return Ok(()); }
        // No "id" field: this is a JSON-RPC notification, so only the
        // HTTP status is checked — the response body is ignored.
        let body = serde_json::json!({
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
        });

        backoff::future::retry(self.backoff(), || async {
            let url = self.url.clone();
            let response = self.post().json(&body).send().await.map_err(|source| {
                backoff::Error::transient(super::Error::Request {
                    url: url.clone(),
                    source,
                })
            })?;

            // A 404 is mapped to `SessionExpired` rather than `BadStatus`.
            if response.status() == reqwest::StatusCode::NOT_FOUND {
                return Err(backoff::Error::transient(
                    super::Error::SessionExpired { url: url.clone() },
                ));
            }
            if !response.status().is_success() {
                let code = response.status();
                let body = response.text().await.unwrap_or_default();
                return Err(backoff::Error::transient(
                    super::Error::BadStatus { url: url.clone(), code, body },
                ));
            }

            Ok(())
        })
        .await
    }
716
717 /// Returns a key identifying this connection for tool namespacing.
718 fn tool_key(&self) -> String {
719 format!("{}-{}", self.initialize_result.server_info.name, self.url)
720 }
721
722 /// Returns the session ID for this connection.
723 fn session_id(&self) -> &str {
724 &self.session_id
725 }
726
727 /// Sends a `tools/list` RPC call for a single page.
728 async fn rpc_list_tools(
729 &self,
730 cursor: Option<&str>,
731 ) -> Result<super::tool::ListToolsResult, super::Error> {
732 self.rpc(
733 "tools/list",
734 &super::tool::ListToolsRequest {
735 cursor: cursor.map(String::from),
736 },
737 true,
738 )
739 .await
740 }
741
742 /// Returns all tools from the server.
743 ///
744 /// Blocks until background pagination completes, then returns a
745 /// cheap `Arc` clone of the result.
746 async fn list_tools(
747 &self,
748 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
749 self.tools.read().await.clone()
750 }
751
752 /// Calls a tool on the MCP server.
753 async fn call_tool(
754 &self,
755 params: &super::tool::CallToolRequestParams,
756 ) -> Result<super::tool::CallToolResult, super::Error> {
757 if self.mock {
758 return Ok(super::tool::CallToolResult {
759 content: vec![super::tool::ContentBlock::Text(super::tool::TextContent {
760 text: "mock".to_string(),
761 annotations: None,
762 _meta: None,
763 })],
764 structured_content: None,
765 is_error: None,
766 _meta: None,
767 });
768 }
769 self.rpc("tools/call", params, false).await
770 }
771
    /// Calls a tool and converts the result into a [`ToolMessage`].
    ///
    /// Content blocks are mapped as follows:
    /// - `text` → text part
    /// - `image` → image_url part (data URL)
    /// - `audio` → input_audio part
    /// - `resource` (embedded text) → text part
    /// - `resource` (embedded blob, image mime) → image_url part (data URL)
    /// - `resource` (embedded blob, other mime) → file part
    /// - `resource_link` → if the URI appears in `list_resources`, fetches
    ///   via `read_resource` and inlines the content using the same
    ///   text/blob rules; otherwise serializes the link as JSON text
    ///
    /// NOTE(review): `result.is_error` is currently *not* inspected —
    /// an earlier doc claimed error results get a prefixed indicator,
    /// but no such code exists here. Confirm whether that behavior is
    /// still intended.
    async fn call_tool_as_message(
        &self,
        params: &super::tool::CallToolRequestParams,
        tool_call_id: String,
    ) -> Result<
        crate::agent::completions::message::ToolMessage,
        super::Error,
    > {
        use crate::agent::completions::message::{
            File, ImageUrl, InputAudio, RichContent, RichContentPart,
            ToolMessage,
        };
        use super::shared::ResourceContentsUnion;
        use super::tool::ContentBlock;

        let result = self.call_tool(params).await?;

        // Build the set of known resource URIs for resource_link resolution.
        // A failed resource listing degrades gracefully: with an empty
        // set, every resource_link falls through to the JSON-text branch.
        let known_resource_uris: std::collections::HashSet<String> =
            match self.list_resources().await {
                Ok(resources) => {
                    resources.iter().map(|r| r.uri.clone()).collect()
                }
                Err(_) => std::collections::HashSet::new(),
            };

        /// Converts a `ResourceContentsUnion` into a single rich content
        /// part. Text resources become text parts. Blob resources with an
        /// image MIME type become image_url parts (data URL); all other
        /// blobs become file parts.
        fn resource_contents_to_part(
            contents: &ResourceContentsUnion,
        ) -> RichContentPart {
            match contents {
                ResourceContentsUnion::Text(text) => {
                    RichContentPart::Text {
                        text: text.text.clone(),
                    }
                }
                ResourceContentsUnion::Blob(blob) => {
                    // Missing MIME type falls back to a generic binary type.
                    let mime = blob
                        .base
                        .mime_type
                        .as_deref()
                        .unwrap_or("application/octet-stream");

                    if mime.starts_with("image/") {
                        RichContentPart::ImageUrl {
                            image_url: ImageUrl {
                                url: format!(
                                    "data:{};base64,{}",
                                    mime, blob.blob
                                ),
                                detail: None,
                            },
                        }
                    } else {
                        // Extract a filename from the URI path, if any.
                        let filename = blob
                            .base
                            .uri
                            .rsplit('/')
                            .next()
                            .filter(|s| !s.is_empty())
                            .map(String::from);

                        RichContentPart::File {
                            file: File {
                                file_data: Some(blob.blob.clone()),
                                filename,
                                file_id: None,
                                file_url: None,
                            },
                        }
                    }
                }
            }
        }

        let mut parts: Vec<RichContentPart> = Vec::new();

        for block in &result.content {
            match block {
                ContentBlock::Text(text) => {
                    parts.push(RichContentPart::Text {
                        text: text.text.clone(),
                    });
                }
                ContentBlock::Image(image) => {
                    parts.push(RichContentPart::ImageUrl {
                        image_url: ImageUrl {
                            url: format!(
                                "data:{};base64,{}",
                                image.mime_type, image.data
                            ),
                            detail: None,
                        },
                    });
                }
                ContentBlock::Audio(audio) => {
                    parts.push(RichContentPart::InputAudio {
                        input_audio: InputAudio {
                            data: audio.data.clone(),
                            format: audio.mime_type.clone(),
                        },
                    });
                }
                ContentBlock::EmbeddedResource(embedded) => {
                    parts.push(resource_contents_to_part(
                        &embedded.resource,
                    ));
                }
                ContentBlock::ResourceLink(link) => {
                    if known_resource_uris.contains(&link.uri) {
                        // Fetch the resource and inline its contents.
                        // NOTE: a read failure here aborts the whole
                        // conversion via `?`.
                        let read_result =
                            self.read_resource(&link.uri).await?;
                        for contents in &read_result.contents {
                            parts.push(
                                resource_contents_to_part(contents),
                            );
                        }
                    } else {
                        // Not a known resource; serialize as JSON text.
                        parts.push(RichContentPart::Text {
                            text: serde_json::to_string(link)
                                .unwrap_or_default(),
                        });
                    }
                }
            }
        }

        // Collapse: no parts → empty text; exactly one text part →
        // plain string content; anything else → structured parts.
        let content = match parts.len() {
            0 => RichContent::Text(String::new()),
            1 => match parts.remove(0) {
                RichContentPart::Text { text } => RichContent::Text(text),
                other => RichContent::Parts(vec![other]),
            },
            _ => RichContent::Parts(parts),
        };

        Ok(ToolMessage {
            content,
            tool_call_id,
        })
    }
934
935 /// Sends a `resources/list` RPC call for a single page.
936 async fn rpc_list_resources(
937 &self,
938 cursor: Option<&str>,
939 ) -> Result<super::resource::ListResourcesResult, super::Error> {
940 self.rpc(
941 "resources/list",
942 &super::resource::ListResourcesRequest {
943 cursor: cursor.map(String::from),
944 },
945 true,
946 )
947 .await
948 }
949
950 /// Returns all resources from the server.
951 ///
952 /// Blocks until background pagination completes, then returns a
953 /// cheap `Arc` clone of the result.
954 async fn list_resources(
955 &self,
956 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
957 self.resources.read().await.clone()
958 }
959
960 /// Returns the cached tool list as soon as it differs from `current`,
961 /// or — if it equals `current` right now — waits up to `timeout` for
962 /// the cache to change and then returns whatever it sees.
963 ///
964 /// An `Err` cache is treated as "different from any caller snapshot"
965 /// and returned immediately.
966 ///
967 /// Concurrency-safe: any number of concurrent subscribers wait on
968 /// independent `Notified` futures and read the cache through the
969 /// shared `RwLock`. A timeout that fires alone is not an error — we
970 /// re-read the cache and return whatever's there.
971 async fn subscribe_tools(
972 &self,
973 current: &[super::tool::Tool],
974 timeout: Duration,
975 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
976 // Arm BEFORE reading. `enable()` registers the future in the
977 // wait queue without polling, so a `notify_waiters` racing
978 // between our read and our await still wakes us.
979 let notified = self.tools_changed.notified();
980 tokio::pin!(notified);
981 notified.as_mut().enable();
982
983 let initial = self.tools.read().await.clone();
984 match &initial {
985 Ok(arc) if arc.as_slice() == current => {}
986 _ => return initial,
987 }
988
989 let _ = tokio::time::timeout(timeout, notified).await;
990
991 self.tools.read().await.clone()
992 }
993
994 /// Resource counterpart of [`Self::subscribe_tools`].
995 async fn subscribe_resources(
996 &self,
997 current: &[super::resource::Resource],
998 timeout: Duration,
999 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1000 let notified = self.resources_changed.notified();
1001 tokio::pin!(notified);
1002 notified.as_mut().enable();
1003
1004 let initial = self.resources.read().await.clone();
1005 match &initial {
1006 Ok(arc) if arc.as_slice() == current => {}
1007 _ => return initial,
1008 }
1009
1010 let _ = tokio::time::timeout(timeout, notified).await;
1011
1012 self.resources.read().await.clone()
1013 }
1014
1015 /// Reads a resource from the MCP server.
1016 async fn read_resource(
1017 &self,
1018 uri: &str,
1019 ) -> Result<super::resource::ReadResourceResult, super::Error> {
1020 self.rpc(
1021 "resources/read",
1022 &super::resource::ReadResourceRequestParams {
1023 uri: uri.to_string(),
1024 },
1025 true,
1026 )
1027 .await
1028 }
1029
    /// Re-fetches all tools from the server, replacing the cached list.
    ///
    /// Optionally fires `on_change` *after* the new list has been
    /// installed, while the write lock is still held — so anyone the
    /// callback wakes must queue on the read lock and can only observe
    /// the post-swap list. The proxy uses this to re-emit
    /// `notifications/tools/list_changed` to its downstream client once
    /// the refreshed list is actually readable.
1038 async fn refresh_tools(&self, on_change: Option<ListChangedCallback>) {
1039 // Listener-driven refresh. Visibility contract: any caller
1040 // that issues `list_tools()` after a `tools/list_changed`
1041 // notification has been observed must see the post-swap
1042 // value, not stale data — so the write lock has to gate
1043 // readers across the upstream paginate.
1044 //
1045 // Performance contract: don't serialise paginate *behind*
1046 // lock-acquisition latency. We start `tools.write()` and the
1047 // upstream paginate **concurrently** with `tokio::join!`. The
1048 // write-lock acquire blocks new `list_tools()` readers
1049 // immediately (preserving visibility) and runs in parallel
1050 // with whatever drain time the in-flight readers need; the
1051 // paginate runs alongside. Total wall-clock is
1052 // `max(drain_time, paginate_time)` instead of the sum.
1053 //
1054 // `notify_waiters` and `on_change` fire under the write
1055 // guard, *after* `*guard = result`, so anyone awoken by them
1056 // queues on the read lock, waits for the guard to drop, and
1057 // observes the post-swap state.
1058 let (mut guard, result) = tokio::join!(
1059 self.tools.write(),
1060 self.paginate_tools(),
1061 );
1062 *guard = result;
1063 self.tools_changed.notify_waiters();
1064 if let Some(cb) = on_change {
1065 cb();
1066 }
1067 }
1068
1069 /// Page-by-page fetch of the upstream tool list, no locks held.
1070 /// Shared between the `_signaling` (initial-populate, holds lock
1071 /// for the original "block fast readers" contract) and `refresh_*`
1072 /// (listener-driven, lock-only-around-install) variants.
1073 async fn paginate_tools(
1074 &self,
1075 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1076 let mut all_tools = Vec::new();
1077 let mut cursor: Option<String> = None;
1078 loop {
1079 match self.rpc_list_tools(cursor.as_deref()).await {
1080 Ok(page) => {
1081 all_tools.extend(page.tools);
1082 cursor = page.next_cursor;
1083 if cursor.is_none() {
1084 return Ok(Arc::new(all_tools));
1085 }
1086 }
1087 Err(e) => return Err(Arc::new(e)),
1088 }
1089 }
1090 }
1091
1092 /// Same as [`Self::refresh_tools`] but fires `lock_held` once the
1093 /// write lock has been acquired so the caller can synchronise on
1094 /// "writer is in possession of the cache" before returning. Used by
1095 /// `ConnectionInner::new` to prevent a fast reader from acquiring
1096 /// the read lock before this writer has even started.
1097 async fn refresh_tools_signaling(
1098 &self,
1099 lock_held: tokio::sync::oneshot::Sender<()>,
1100 on_change: Option<ListChangedCallback>,
1101 ) {
1102 let mut guard = self.tools.write().await;
1103 // Fire `tools_changed` while we hold the write lock and *before*
1104 // installing the new list. Any subscriber woken now must take a
1105 // read lock to observe the result, and that read lock is queued
1106 // behind this write guard — so they always see the post-swap
1107 // state, never mid-swap.
1108 self.tools_changed.notify_waiters();
1109 let _ = lock_held.send(());
1110 if let Some(cb) = on_change {
1111 cb();
1112 }
1113 let mut all_tools = Vec::new();
1114 let mut cursor: Option<String> = None;
1115 let result = loop {
1116 match self.rpc_list_tools(cursor.as_deref()).await {
1117 Ok(page) => {
1118 all_tools.extend(page.tools);
1119 cursor = page.next_cursor;
1120 if cursor.is_none() {
1121 break Ok(Arc::new(all_tools));
1122 }
1123 }
1124 Err(e) => break Err(Arc::new(e)),
1125 }
1126 };
1127 *guard = result;
1128 }
1129
1130 /// Re-fetches all resources from the server, replacing the cached list.
1131 /// See [`ConnectionInner::refresh_tools`] for the callback timing
1132 /// contract.
1133 async fn refresh_resources(&self, on_change: Option<ListChangedCallback>) {
1134 // Same paginate-while-acquiring-the-write-lock pattern as
1135 // `refresh_tools` — see that comment for the visibility +
1136 // performance rationale.
1137 let (mut guard, result) = tokio::join!(
1138 self.resources.write(),
1139 self.paginate_resources(),
1140 );
1141 *guard = result;
1142 self.resources_changed.notify_waiters();
1143 if let Some(cb) = on_change {
1144 cb();
1145 }
1146 }
1147
1148 async fn paginate_resources(
1149 &self,
1150 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1151 let mut all_resources = Vec::new();
1152 let mut cursor: Option<String> = None;
1153 loop {
1154 match self.rpc_list_resources(cursor.as_deref()).await {
1155 Ok(page) => {
1156 all_resources.extend(page.resources);
1157 cursor = page.next_cursor;
1158 if cursor.is_none() {
1159 return Ok(Arc::new(all_resources));
1160 }
1161 }
1162 Err(e) => return Err(Arc::new(e)),
1163 }
1164 }
1165 }
1166
1167 /// Resource counterpart of [`Self::refresh_tools_signaling`].
1168 async fn refresh_resources_signaling(
1169 &self,
1170 lock_held: tokio::sync::oneshot::Sender<()>,
1171 on_change: Option<ListChangedCallback>,
1172 ) {
1173 let mut guard = self.resources.write().await;
1174 // See `refresh_tools_signaling` — fire under the write lock,
1175 // before install, so subscribers' next read sees the post-swap
1176 // state.
1177 self.resources_changed.notify_waiters();
1178 let _ = lock_held.send(());
1179 if let Some(cb) = on_change {
1180 cb();
1181 }
1182 let mut all_resources = Vec::new();
1183 let mut cursor: Option<String> = None;
1184 let result = loop {
1185 match self.rpc_list_resources(cursor.as_deref()).await {
1186 Ok(page) => {
1187 all_resources.extend(page.resources);
1188 cursor = page.next_cursor;
1189 if cursor.is_none() {
1190 break Ok(Arc::new(all_resources));
1191 }
1192 }
1193 Err(e) => break Err(Arc::new(e)),
1194 }
1195 };
1196 *guard = result;
1197 }
1198
1199 /// Builds a GET request to the MCP endpoint for receiving server
1200 /// notifications via SSE.
1201 fn get(&self) -> reqwest::RequestBuilder {
1202 let mut request = self
1203 .http_client
1204 .get(&self.url)
1205 .header("Accept", "text/event-stream");
1206 for (name, value) in &self.headers {
1207 request = request.header(name, value);
1208 }
1209 // Mcp-Session-Id last so it always wins over `headers`.
1210 request = request.header("Mcp-Session-Id", &self.session_id);
1211 request
1212 }
1213
1214 /// Listens for `notifications/tools/list_changed` and
1215 /// `notifications/resources/list_changed` on an SSE stream. On each
1216 /// notification, write-locks and re-fetches the full list.
1217 ///
1218 /// `initial_lines` is the pre-opened SSE line reader handed in by
1219 /// [`Client::connect`](super::Client::connect) — that stream is
1220 /// consumed first. When it ends (or any later GET reconnect ends),
1221 /// we sleep `backoff_initial_interval` and open a fresh GET `/` SSE
1222 /// stream.
1223 ///
1224 /// Takes a [`Weak<Self>`] (not `Arc<Self>`) so the spawned task
1225 /// doesn't itself keep the [`Connection`] alive, and a
1226 /// [`CancellationToken`] sibling clone of the connection's
1227 /// [`DropGuard`] so the task tears down the instant the last
1228 /// external `Arc<ConnectionInner>` clone is dropped — every
1229 /// blocking await (line read, reconnect send, backoff sleep) is
1230 /// raced against `cancel.cancelled()` and exits without any zombie
1231 /// retries against a now-dead session.
    async fn listen_for_list_changes(
        weak: Weak<Self>,
        cancel: CancellationToken,
        initial_lines: super::LinesStream,
    ) {
        // First iteration: use the pre-opened SSE stream the client
        // handed us. After that, fall back to opening fresh GET / SSE
        // streams as the upstream connection cycles.
        let mut next_lines: Option<super::LinesStream> = Some(initial_lines);
        // One-shot guard for the catch-up refresh: false on the very
        // first iteration (the caller's pre-opened SSE stream — its
        // associated cache was just populated by `Client::connect`'s
        // initial pagination, so re-fetching there would just be a
        // wasted round-trip), true thereafter. Every stream end —
        // whether `Ok(None)` (clean close) or `Err(_)` (read failure)
        // — drops back here, which we treat as an implicit
        // list-changed notification: the upstream's broadcast (in
        // particular the proxy's per-session `tokio::broadcast`) is
        // lossy for moments when this listener has zero active
        // subscribers, so anything that fired during our disconnect
        // window may have been dropped.
        //
        // ORDER MATTERS. The refresh must run AFTER we've re-opened
        // the GET / SSE stream — i.e. after we're a subscriber again
        // — and BEFORE we enter the inner read loop. If we refreshed
        // before the resubscribe, a notification that fired between
        // our refresh-completion and our subscribe would be lost the
        // same way as the original disconnect-window drops; doing it
        // after means a notification fired DURING the refresh lands
        // in the new subscriber's buffer (broadcast::Sender::send
        // backs onto each receiver's channel-capacity slot) and gets
        // consumed by the inner loop on its next read.
        let mut is_reconnect = false;

        loop {
            // The token cancels deterministically when the last
            // `Arc<ConnectionInner>` clone is dropped (see
            // `_listener_cancel_guard`). Check once per outer
            // iteration, but the real protection is the cancel arms in
            // every blocking await below — those exit immediately on
            // cancel.
            if cancel.is_cancelled() {
                return;
            }
            // Hold a strong ref only for the duration of this
            // iteration; it is dropped before every sleep/return so
            // liveness checks stay honest.
            let Some(this) = weak.upgrade() else { return };
            let backoff_delay = this.backoff_initial_interval;

            let mut lines = match next_lines.take() {
                Some(l) => l,
                None => {
                    // Race the upstream GET against cancellation — if
                    // the connection drops mid-reconnect, exit
                    // immediately rather than waiting for the request
                    // to complete or time out (otherwise produces a
                    // burst of 401 retries against a now-dead session
                    // under heavy churn).
                    let send_outcome = tokio::select! {
                        out = this.get().send() => out,
                        _ = cancel.cancelled() => {
                            drop(this);
                            return;
                        }
                    };
                    let response = match send_outcome {
                        Ok(r) if r.status().is_success() => r,
                        // Transport error or non-2xx status: back off
                        // and retry from the top of the outer loop.
                        _ => {
                            drop(this);
                            // Sleep with cancel-arm: instant exit on
                            // drop, no zombie retries.
                            tokio::select! {
                                _ = tokio::time::sleep(backoff_delay) => {}
                                _ = cancel.cancelled() => return,
                            }
                            continue;
                        }
                    };
                    super::lines_from_response(response)
                }
            };

            // Catch-up refresh on every reconnect — the implicit
            // list-changed treatment for the just-failed stream. See
            // the `is_reconnect` doc-comment above for the
            // refresh-AFTER-resubscribe rationale.
            if is_reconnect {
                // tools and resources are independent locks; run the
                // catch-up refreshes concurrently so disconnect
                // recovery isn't sequential.
                let _ = tokio::join!(
                    this.refresh_tools(this.on_tools_list_changed.get()),
                    this.refresh_resources(this.on_resources_list_changed.get()),
                );
            }
            // Every subsequent stream end counts as a reconnect.
            is_reconnect = true;

            'inner: loop {
                tokio::select! {
                    line_result = lines.next_line() => {
                        match line_result {
                            Ok(Some(line)) => {
                                // SSE data lines start with "data: ".
                                let Some(data) = line.strip_prefix("data: ") else {
                                    continue 'inner;
                                };
                                // Non-notification / unparsable payloads
                                // are silently skipped.
                                let method = match serde_json::from_str::<super::JsonRpcNotification>(data) {
                                    Ok(n) => n.method,
                                    Err(_) => continue 'inner,
                                };
                                match method.as_str() {
                                    "notifications/tools/list_changed" => {
                                        // `refresh_tools` installs the
                                        // fresh cache and then fires
                                        // the callback under the write
                                        // guard, so the proxy's
                                        // downstream
                                        // notifications/tools/list_changed
                                        // re-emission happens once the
                                        // new list is readable.
                                        this.refresh_tools(
                                            this.on_tools_list_changed.get(),
                                        )
                                        .await;
                                    }
                                    "notifications/resources/list_changed" => {
                                        this.refresh_resources(
                                            this.on_resources_list_changed.get(),
                                        )
                                        .await;
                                    }
                                    // Other notification methods are
                                    // ignored by this listener.
                                    _ => {}
                                }
                            }
                            // Stream ended cleanly or errored — break out
                            // to the outer loop so we either reconnect or,
                            // if everyone's gone, exit at the top.
                            _ => break 'inner,
                        }
                    }
                    // Cancellation: the connection's last clone has
                    // dropped. Tear down immediately.
                    _ = cancel.cancelled() => {
                        drop(this);
                        return;
                    }
                }
            }

            // Stream ended — drop the strong ref before sleeping so the
            // next iteration's weak-upgrade can detect liveness honestly.
            drop(this);
            tokio::select! {
                _ = tokio::time::sleep(backoff_delay) => {}
                _ = cancel.cancelled() => return,
            }
        }
    }
1387}
1388
#[cfg(test)]
mod subscribe_tests {
    use super::*;
    use crate::mcp::tool::{Tool, ToolSchemaObject, ToolSchemaType};

    // Minimal Tool fixture: only `name` varies, the input schema is an
    // empty object — enough for the slice-equality checks below.
    fn tool(name: &str) -> Tool {
        Tool {
            name: name.to_string(),
            title: None,
            description: None,
            icons: None,
            input_schema: ToolSchemaObject {
                r#type: ToolSchemaType::Object,
                properties: None,
                required: None,
                extra: IndexMap::new(),
            },
            output_schema: None,
            annotations: None,
            execution: None,
            _meta: None,
        }
    }

    /// First read shows a different list — return immediately, never wait.
    #[tokio::test]
    async fn subscribe_tools_returns_immediately_when_cache_differs() {
        // NOTE(review): assumes `new_for_test` builds a connection with
        // no live listener task — confirm against its definition.
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Ok(Arc::new(vec![tool("a")]));

        let start = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[tool("b")], Duration::from_secs(5))
            .await
            .unwrap();
        // Must not have waited out the 5 s subscribe window.
        assert!(start.elapsed() < Duration::from_millis(100));
        assert_eq!(got.as_slice(), &[tool("a")]);
    }

    /// Cached `Err` is treated as "different from any caller snapshot."
    #[tokio::test]
    async fn subscribe_tools_returns_err_immediately() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        let err = super::super::Error::NoSessionId {
            url: "http://x".into(),
            body: String::new(),
        };
        *conn.inner.tools.write().await = Err(Arc::new(err));

        let start = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[], Duration::from_secs(5))
            .await;
        // Err short-circuits: no waiting, error handed straight back.
        assert!(start.elapsed() < Duration::from_millis(100));
        assert!(got.is_err());
    }

    /// Cache equals snapshot, then a writer fires the notify under the
    /// write lock and installs a new list. The subscriber wakes, then its
    /// re-read blocks behind the writer's guard, observes the new list.
    #[tokio::test]
    async fn subscribe_tools_wakes_on_change_and_reads_post_swap() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Ok(Arc::new(vec![tool("a")]));

        let conn_for_subscriber = conn.clone();
        let subscriber = tokio::spawn(async move {
            conn_for_subscriber
                .subscribe_tools(&[tool("a")], Duration::from_secs(5))
                .await
                .unwrap()
        });

        // Give the subscriber a moment to arm `notified()` and finish
        // its first read so it's parked on the timeout.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Simulate `refresh_tools_signaling`: take the write lock, fire
        // `tools_changed` *while holding* the write lock, then install
        // the new value before releasing. This is exactly the ordering
        // that the real refresh path uses.
        {
            let mut guard = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            // Hold briefly to make absolutely sure the subscriber is
            // racing the read lock against our drop.
            tokio::time::sleep(Duration::from_millis(20)).await;
            *guard = Ok(Arc::new(vec![tool("b")]));
        }

        let got = tokio::time::timeout(Duration::from_secs(2), subscriber)
            .await
            .expect("subscriber returned in time")
            .expect("subscriber didn't panic");
        assert_eq!(got.as_slice(), &[tool("b")]);
    }

    /// Cache equals snapshot, no notification arrives — timeout, return
    /// the still-equal list (not an error).
    #[tokio::test]
    async fn subscribe_tools_times_out_and_returns_unchanged_list() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Ok(Arc::new(vec![tool("a")]));

        let start = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[tool("a")], Duration::from_millis(50))
            .await
            .unwrap();
        let elapsed = start.elapsed();
        // Lower bound is slack (40 ms vs the 50 ms timeout) to absorb
        // timer coarseness; upper bound catches a hang.
        assert!(elapsed >= Duration::from_millis(40), "elapsed: {elapsed:?}");
        assert!(elapsed < Duration::from_millis(500), "elapsed: {elapsed:?}");
        assert_eq!(got.as_slice(), &[tool("a")]);
    }

    /// Two concurrent subscribers both wake on a single notify_waiters
    /// and both observe the post-swap list.
    #[tokio::test]
    async fn subscribe_tools_supports_concurrent_subscribers() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Ok(Arc::new(vec![tool("a")]));

        let c1 = conn.clone();
        let c2 = conn.clone();
        let s1 = tokio::spawn(async move {
            c1.subscribe_tools(&[tool("a")], Duration::from_secs(5))
                .await
                .unwrap()
        });
        let s2 = tokio::spawn(async move {
            c2.subscribe_tools(&[tool("a")], Duration::from_secs(5))
                .await
                .unwrap()
        });

        // Let both subscribers arm their `Notified` futures and park.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // One notify_waiters wakes every currently-armed waiter.
        {
            let mut guard = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            *guard = Ok(Arc::new(vec![tool("c")]));
        }

        let (r1, r2) = tokio::join!(s1, s2);
        let r1 = r1.unwrap();
        let r2 = r2.unwrap();
        assert_eq!(r1.as_slice(), &[tool("c")]);
        assert_eq!(r2.as_slice(), &[tool("c")]);
    }
}