// objectiveai_sdk/mcp/connection.rs
1//! MCP connection for communicating with an MCP server.
2//!
3//! [`Connection`] is a cheaply-clonable handle around an internal
4//! [`ConnectionInner`]. The last drop of the inner `Arc` runs
5//! [`ConnectionInner`]'s `Drop`, which cancels the listener task's
6//! [`tokio_util::sync::CancellationToken`] (held in `_listener_cancel_guard`
7//! as a [`tokio_util::sync::DropGuard`]) — the SSE listener exits the
8//! instant any in-flight reconnect, sleep, or read is cancelled, with no
9//! zombie 401 retries against a now-dead proxy session.
10
11use std::ops::Deref;
12use std::sync::{Arc, RwLock as StdRwLock, Weak};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::Duration;
15
16use indexmap::IndexMap;
17use tokio::sync::{Notify, RwLock};
18use tokio_util::sync::{CancellationToken, DropGuard};
19
/// Callback fired by [`Connection`] when the upstream MCP server emits
/// `notifications/tools/list_changed` or `notifications/resources/list_changed`.
///
/// **Timing:** runs after the corresponding cache's write lock is taken
/// but *before* the network paginate that replaces it. That ordering
/// matches the moment the staleness window opens — anyone blocked on the
/// read lock won't return until the new list lands. The callback should
/// not call back into `list_tools` / `list_resources`: doing so would
/// re-take the lock the listener already holds and deadlock.
///
/// Stored behind an `Arc` so the listener task can cheaply clone it out
/// of the lock and call it without holding the read guard.
pub type ListChangedCallback = Arc<dyn Fn() + Send + Sync + 'static>;

/// A registered-or-not callback slot. Wrapper so [`ConnectionInner`] can
/// keep `#[derive(Debug)]` (a raw `dyn Fn` isn't `Debug`).
///
/// All accessors tolerate lock poisoning: the stored
/// `Option<ListChangedCallback>` is always structurally valid (a panic
/// while holding the guard can't leave it half-written), so a poisoned
/// lock is recovered via [`std::sync::PoisonError::into_inner`] rather
/// than propagating the panic. Previously only `Debug` tolerated
/// poisoning while `set`/`get` would panic — they are now consistent.
struct CallbackSlot(StdRwLock<Option<ListChangedCallback>>);

impl CallbackSlot {
    /// Creates an empty slot (no callback registered).
    fn new() -> Self {
        Self(StdRwLock::new(None))
    }

    /// Replaces the registered callback. Never panics, even if the lock
    /// was poisoned by an earlier panic.
    fn set(&self, callback: ListChangedCallback) {
        let mut guard = self
            .0
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *guard = Some(callback);
    }

    /// Cheap clone-out of the current callback (if any). The `Arc` clone
    /// lets us release the read guard before invoking the callback.
    /// Never panics, even if the lock was poisoned.
    fn get(&self) -> Option<ListChangedCallback> {
        self.0
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }
}

impl std::fmt::Debug for CallbackSlot {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Recover from poisoning so the reported `set` flag is accurate
        // instead of silently defaulting to `false`.
        let set = self
            .0
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .is_some();
        f.debug_struct("CallbackSlot").field("set", &set).finish()
    }
}
60
/// An active connection to an MCP server using the Streamable HTTP transport.
///
/// Cheaply clonable (one `Arc` bump). When the last clone is dropped, the
/// inner `Arc` ref count hits zero, [`ConnectionInner::Drop`] runs, the
/// listener-cancel `DropGuard` is dropped, and the SSE listener task is
/// cancelled — exiting any in-flight `lines.next_line()`, reconnect
/// `send()`, or backoff `sleep` *immediately* without retrying against
/// the now-dead proxy session.
///
/// Use the public methods (`list_tools`, `call_tool`, `list_resources`,
/// `read_resource`, `call_tool_as_message`, `tool_key`) for the upstream
/// MCP protocol surface. The inner state ([`ConnectionInner`]) is also
/// reachable via `Deref` for read-only field access (e.g.
/// `connection.url`, `connection.initialize_result.server_info.name`),
/// but its methods are private — you must go through `Connection`.
#[derive(Debug)]
pub struct Connection {
    // Shared state. Intentionally the only field: all clones of this
    // handle point at the same `ConnectionInner`, so callback slots,
    // caches, and the listener-cancel guard are shared across clones.
    inner: Arc<ConnectionInner>,
}
80
81impl Clone for Connection {
82    fn clone(&self) -> Self {
83        Self { inner: Arc::clone(&self.inner) }
84    }
85}
86
87// No `Drop` for `Connection`: cancellation happens deterministically
88// when the last `Arc<ConnectionInner>` clone is dropped, which runs
89// `ConnectionInner::drop` and releases the cancel-token DropGuard.
90
91impl Deref for Connection {
92    type Target = ConnectionInner;
93    fn deref(&self) -> &ConnectionInner {
94        &self.inner
95    }
96}
97
98impl Connection {
99    pub(super) async fn new(
100        http_client: reqwest::Client,
101        url: String,
102        session_id: String,
103        headers: IndexMap<String, String>,
104        backoff_current_interval: Duration,
105        backoff_initial_interval: Duration,
106        backoff_randomization_factor: f64,
107        backoff_multiplier: f64,
108        backoff_max_interval: Duration,
109        backoff_max_elapsed_time: Duration,
110        call_timeout: Duration,
111        initialize_result: super::initialize_result::InitializeResult,
112        initial_sse_lines: Option<super::LinesStream>,
113    ) -> Self {
114        let inner = ConnectionInner::new(
115            http_client,
116            url,
117            session_id,
118            headers,
119            backoff_current_interval,
120            backoff_initial_interval,
121            backoff_randomization_factor,
122            backoff_multiplier,
123            backoff_max_interval,
124            backoff_max_elapsed_time,
125            call_timeout,
126            initialize_result,
127            initial_sse_lines,
128        )
129        .await;
130        Self { inner }
131    }
132
133
134    pub(super) fn new_mock(url: String) -> Self {
135        Self { inner: ConnectionInner::new_mock(url) }
136    }
137
138    #[cfg(test)]
139    pub(crate) fn new_for_test(name: String, url: String) -> Self {
140        Self { inner: ConnectionInner::new_for_test(name, url) }
141    }
142
143    /// Send a JSON-RPC notification to the upstream. Used by `Client`
144    /// right after `initialize` to send `notifications/initialized`.
145    pub(super) async fn notify<P: serde::Serialize>(
146        &self,
147        method: &str,
148        params: &P,
149    ) -> Result<(), super::Error> {
150        self.inner.notify(method, params).await
151    }
152
153    /// Returns a key identifying this connection for tool namespacing.
154    pub fn tool_key(&self) -> String {
155        self.inner.tool_key()
156    }
157
158    /// Returns the session ID for this connection.
159    pub fn session_id(&self) -> &str {
160        self.inner.session_id()
161    }
162
163    /// Returns all tools from the upstream server.
164    pub async fn list_tools(
165        &self,
166    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
167        self.inner.list_tools().await
168    }
169
170    /// Calls a tool on the upstream server.
171    pub async fn call_tool(
172        &self,
173        params: &super::tool::CallToolRequestParams,
174    ) -> Result<super::tool::CallToolResult, super::Error> {
175        self.inner.call_tool(params).await
176    }
177
178    /// Calls a tool and converts the result into a [`ToolMessage`].
179    pub async fn call_tool_as_message(
180        &self,
181        params: &super::tool::CallToolRequestParams,
182        tool_call_id: String,
183    ) -> Result<
184        crate::agent::completions::message::ToolMessage,
185        super::Error,
186    > {
187        self.inner.call_tool_as_message(params, tool_call_id).await
188    }
189
190    /// Returns all resources from the upstream server.
191    pub async fn list_resources(
192        &self,
193    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
194        self.inner.list_resources().await
195    }
196
197    /// Returns the cached tool list as soon as it differs from `current`,
198    /// or waits up to `timeout` for the next `notifications/tools/list_changed`
199    /// from the upstream server before re-reading.
200    ///
201    /// Wakes the moment a refresh writer takes the cache write lock, so
202    /// the post-wake `read` is guaranteed to observe the new list rather
203    /// than racing against the install. Safe to call from any number of
204    /// tasks concurrently.
205    pub async fn subscribe_tools(
206        &self,
207        current: &[super::tool::Tool],
208        timeout: Duration,
209    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
210        self.inner.subscribe_tools(current, timeout).await
211    }
212
213    /// Resource counterpart of [`Connection::subscribe_tools`].
214    pub async fn subscribe_resources(
215        &self,
216        current: &[super::resource::Resource],
217        timeout: Duration,
218    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
219        self.inner.subscribe_resources(current, timeout).await
220    }
221
222    /// Reads a resource from the upstream server.
223    pub async fn read_resource(
224        &self,
225        uri: &str,
226    ) -> Result<super::resource::ReadResourceResult, super::Error> {
227        self.inner.read_resource(uri).await
228    }
229
230    /// Register a callback to fire whenever the upstream emits
231    /// `notifications/tools/list_changed`.
232    ///
233    /// **Timing:** the callback runs *after* the tool cache's write lock
234    /// is acquired but *before* the network paginate that replaces it.
235    /// That means readers blocked on the read lock won't return until the
236    /// new list is in place, and the callback observes the moment the
237    /// staleness window opens. The proxy uses this to emit its own
238    /// `notifications/tools/list_changed` to downstream clients at the
239    /// right instant.
240    ///
241    /// Replaces any previously-registered tools-list-changed callback.
242    /// All clones of this `Connection` share the same callback slot.
243    pub fn set_on_tools_list_changed<F>(&self, callback: F)
244    where
245        F: Fn() + Send + Sync + 'static,
246    {
247        self.inner.on_tools_list_changed.set(Arc::new(callback));
248    }
249
250    /// Register a callback to fire whenever the upstream emits
251    /// `notifications/resources/list_changed`. Same timing contract as
252    /// [`Connection::set_on_tools_list_changed`].
253    ///
254    /// Replaces any previously-registered resources-list-changed callback.
255    /// All clones of this `Connection` share the same callback slot.
256    pub fn set_on_resources_list_changed<F>(&self, callback: F)
257    where
258        F: Fn() + Send + Sync + 'static,
259    {
260        self.inner.on_resources_list_changed.set(Arc::new(callback));
261    }
262}
263
/// The actual connection state. Behind an `Arc` inside [`Connection`].
///
/// Fields are public for read-only access (callers reach them via
/// `Connection`'s `Deref`), but every method on this type is private —
/// the public surface lives on [`Connection`] and delegates through.
#[derive(Debug)]
pub struct ConnectionInner {
    /// HTTP client used for every POST/GET this connection makes.
    pub http_client: reqwest::Client,
    /// Endpoint URL of the upstream MCP server.
    pub url: String,
    /// Session id for this connection; stamped last on every request so
    /// it can't be overridden by an entry in `headers`.
    pub session_id: String,
    /// All HTTP headers stamped on every POST / GET this connection
    /// makes — the same merged map (defaults + caller overrides) the
    /// `Client` built once during connect. `Mcp-Session-Id`,
    /// `Content-Type`, and `Accept` are still set by the request
    /// builders and override anything in `headers`.
    pub headers: IndexMap<String, String>,

    // Retry-policy knobs; assembled into a `backoff::ExponentialBackoff`
    // by `Self::backoff()` for each retried operation.
    pub backoff_current_interval: Duration,
    pub backoff_initial_interval: Duration,
    pub backoff_randomization_factor: f64,
    pub backoff_multiplier: f64,
    pub backoff_max_interval: Duration,
    pub backoff_max_elapsed_time: Duration,
    /// Per-request timeout applied to every POST (see `Self::post`).
    pub call_timeout: Duration,

    /// The server's capabilities and info from the initialize response.
    pub initialize_result: super::initialize_result::InitializeResult,

    /// If true, all RPC/notify calls are no-ops. Used for mock orchestrator URLs.
    mock: bool,

    /// Auto-incrementing request ID (starts at 2; 1 was used for initialize).
    next_id: AtomicU64,

    /// All tools from the server, populated by background pagination.
    tools: RwLock<Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>>>,
    /// All resources from the server, populated by background pagination.
    resources:
        RwLock<Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>>>,

    /// Cancellation token for the long-lived `listen_for_list_changes`
    /// task. The listener selects this against every blocking await
    /// (read, reconnect-send, backoff-sleep) and returns the instant it
    /// fires.
    ///
    /// Held inside the connection as a [`DropGuard`] so that the moment
    /// the last `Arc<ConnectionInner>` clone is dropped — i.e. the
    /// moment no external `Connection` handle remains — `Drop` runs on
    /// the guard, the token cancels, and the listener task tears down.
    /// The listener itself holds a sibling `CancellationToken` (clone),
    /// not the guard, so its task does not extend the connection's
    /// lifetime.
    ///
    /// `None` for mock and test connections, which spawn no listener.
    _listener_cancel_guard: Option<DropGuard>,

    /// Optional callback fired *after* the listener has refreshed the
    /// tool cache in response to an upstream `notifications/tools/list_changed`.
    /// Set via [`Connection::set_on_tools_list_changed`].
    on_tools_list_changed: CallbackSlot,

    /// Optional callback fired *after* the listener has refreshed the
    /// resource cache in response to an upstream
    /// `notifications/resources/list_changed`.
    /// Set via [`Connection::set_on_resources_list_changed`].
    on_resources_list_changed: CallbackSlot,

    /// Wakes any task awaiting in [`Connection::subscribe_tools`]. Fired
    /// from inside `refresh_tools_signaling` the moment the writer
    /// acquires the cache write lock — *before* the new list is
    /// installed. A woken subscriber's next `read().await` blocks behind
    /// the writer's guard, so it always observes the post-swap state.
    tools_changed: Notify,

    /// Resource counterpart of [`Self::tools_changed`].
    resources_changed: Notify,
}
339
340impl ConnectionInner {
341    /// Creates a mock connection that never makes network requests.
342    /// All RPC calls return empty/default results.
343    fn new_mock(url: String) -> Arc<Self> {
344        Arc::new(Self {
345            http_client: reqwest::Client::new(),
346            url,
347            session_id: String::new(),
348            headers: IndexMap::new(),
349            backoff_current_interval: Duration::ZERO,
350            backoff_initial_interval: Duration::ZERO,
351            backoff_randomization_factor: 0.0,
352            backoff_multiplier: 1.0,
353            backoff_max_interval: Duration::ZERO,
354            backoff_max_elapsed_time: Duration::ZERO,
355            call_timeout: Duration::ZERO,
356            initialize_result: super::initialize_result::InitializeResult {
357                protocol_version: "2025-03-26".into(),
358                capabilities: super::initialize_result::ServerCapabilities {
359                    experimental: None,
360                    logging: None,
361                    completions: None,
362                    prompts: None,
363                    resources: None,
364                    tools: None,
365                    tasks: None,
366                },
367                server_info: super::initialize_result::Implementation {
368                    name: "mock".into(),
369                    title: None,
370                    version: "0.0.0".into(),
371                    website_url: None,
372                    description: None,
373                    icons: None,
374                },
375                instructions: None,
376                _meta: None,
377            },
378            mock: true,
379            next_id: AtomicU64::new(2),
380            tools: RwLock::new(Ok(Arc::new(Vec::new()))),
381            resources: RwLock::new(Ok(Arc::new(Vec::new()))),
382            _listener_cancel_guard: None,
383            on_tools_list_changed: CallbackSlot::new(),
384            on_resources_list_changed: CallbackSlot::new(),
385            tools_changed: Notify::new(),
386            resources_changed: Notify::new(),
387        })
388    }
389
390    /// Creates a minimal connection for unit testing.
391    #[cfg(test)]
392    fn new_for_test(name: String, url: String) -> Arc<Self> {
393        Arc::new(Self {
394            http_client: reqwest::Client::new(),
395            url,
396            session_id: String::new(),
397            headers: IndexMap::new(),
398            backoff_current_interval: Duration::from_millis(500),
399            backoff_initial_interval: Duration::from_millis(500),
400            backoff_randomization_factor: 0.5,
401            backoff_multiplier: 1.5,
402            backoff_max_interval: Duration::from_secs(60),
403            backoff_max_elapsed_time: Duration::from_secs(900),
404            call_timeout: Duration::from_secs(30),
405            initialize_result: super::initialize_result::InitializeResult {
406                protocol_version: "2025-03-26".into(),
407                capabilities:
408                    super::initialize_result::ServerCapabilities {
409                        experimental: None,
410                        logging: None,
411                        completions: None,
412                        prompts: None,
413                        resources: None,
414                        tools: None,
415                        tasks: None,
416                    },
417                server_info: super::initialize_result::Implementation {
418                    name,
419                    title: None,
420                    version: "0.0.0".into(),
421                    website_url: None,
422                    description: None,
423                    icons: None,
424                },
425                instructions: None,
426                _meta: None,
427            },
428            mock: false,
429            next_id: AtomicU64::new(2),
430            tools: RwLock::new(Ok(Arc::new(Vec::new()))),
431            resources: RwLock::new(Ok(Arc::new(Vec::new()))),
432            _listener_cancel_guard: None,
433            on_tools_list_changed: CallbackSlot::new(),
434            on_resources_list_changed: CallbackSlot::new(),
435            tools_changed: Notify::new(),
436            resources_changed: Notify::new(),
437        })
438    }
439
    /// Creates a new connection and spawns background tasks to paginate
    /// all tools and resources. Called internally by
    /// [`Client::connect`](super::Client::connect) (via [`Connection::new`]).
    ///
    /// `initial_sse_lines`, if `Some`, is a pre-opened SSE line reader
    /// that the list-changed listener will read from immediately on its
    /// first iteration, instead of opening its own GET `/`. The caller
    /// is responsible for arranging for one of these to exist whenever
    /// the upstream advertises `tools.list_changed` or
    /// `resources.list_changed` — see
    /// [`Client::connect`](super::Client::connect).
    async fn new(
        http_client: reqwest::Client,
        url: String,
        session_id: String,
        headers: IndexMap<String, String>,
        backoff_current_interval: Duration,
        backoff_initial_interval: Duration,
        backoff_randomization_factor: f64,
        backoff_multiplier: f64,
        backoff_max_interval: Duration,
        backoff_max_elapsed_time: Duration,
        call_timeout: Duration,
        initialize_result: super::initialize_result::InitializeResult,
        initial_sse_lines: Option<super::LinesStream>,
    ) -> Arc<Self> {
        // Cancel-the-listener machinery: store the DropGuard inside the
        // inner so the cancellation fires deterministically when the
        // last external `Arc<ConnectionInner>` clone drops. Hand the
        // listener task a sibling clone (no guard) — that way the
        // listener task's lifetime does not extend the connection.
        let listener_cancel = CancellationToken::new();
        let listener_cancel_for_task = listener_cancel.clone();
        let conn = Arc::new(Self {
            http_client,
            url,
            session_id,
            headers,
            backoff_current_interval,
            backoff_initial_interval,
            backoff_randomization_factor,
            backoff_multiplier,
            backoff_max_interval,
            backoff_max_elapsed_time,
            call_timeout,
            initialize_result,
            mock: false,
            // 1 was consumed by the initialize request; RPC ids continue from 2.
            next_id: AtomicU64::new(2),
            // Both caches start as empty-Ok; the spawned paginators below
            // replace them while holding the write lock.
            tools: RwLock::new(Ok(Arc::new(Vec::new()))),
            resources: RwLock::new(Ok(Arc::new(Vec::new()))),
            _listener_cancel_guard: Some(listener_cancel.drop_guard()),
            on_tools_list_changed: CallbackSlot::new(),
            on_resources_list_changed: CallbackSlot::new(),
            tools_changed: Notify::new(),
            resources_changed: Notify::new(),
        });

        // Spawn background tool lister if the server supports tools.
        //
        // We don't return until the spawned task has acquired the write
        // lock. Otherwise a caller that immediately reads `list_tools()`
        // could race the writer — `tokio::spawn` only queues the task,
        // and a fast reader can acquire the read lock before the writer
        // has run its first instruction. The reader would then see the
        // initial empty `Vec` and return that, even though a full
        // populate is in flight.
        //
        // The `RwLockWriteGuard` itself isn't `Send`-friendly enough to
        // pass back, so we use a oneshot to signal "I'm holding the
        // lock now"; once we receive that, the cache is exclusively
        // owned by the writer and any subsequent `read().await` from
        // the caller is guaranteed to wait for the populate to finish.
        if conn.initialize_result.capabilities.tools.is_some() {
            let conn = Arc::clone(&conn);
            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
            tokio::spawn(async move {
                conn.refresh_tools_signaling(lock_held_tx, None).await;
            });
            // Wait for the writer to hold the lock before returning.
            let _ = lock_held_rx.await;
        }

        // Spawn background resource lister if the server supports
        // resources. Same lock-handoff contract as tools above.
        if conn.initialize_result.capabilities.resources.is_some() {
            let conn = Arc::clone(&conn);
            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
            tokio::spawn(async move {
                conn.refresh_resources_signaling(lock_held_tx, None).await;
            });
            let _ = lock_held_rx.await;
        }

        // Spawn the list-changed listener iff the caller handed us a
        // pre-opened SSE stream. The connection is naive about
        // `tools.list_changed` / `resources.list_changed` capabilities —
        // [`Client::connect`](super::Client::connect) translates them
        // into "did or didn't open a stream for us." If we get a stream,
        // we listen on it; if we don't, there's nothing to listen for.
        if let Some(initial_lines) = initial_sse_lines {
            // Hand the listener a `Weak` so the spawned task itself does
            // not keep the connection alive. `listener_cancel_for_task`
            // is a sibling clone of the connection's own
            // `_listener_cancel_guard` token — when the last external
            // `Arc<ConnectionInner>` clone is dropped, the inner's Drop
            // releases the guard and the listener wakes from any
            // pending await (read, send, sleep) and exits immediately.
            let weak = Arc::downgrade(&conn);
            tokio::spawn(async move {
                Self::listen_for_list_changes(
                    weak,
                    listener_cancel_for_task,
                    initial_lines,
                )
                .await;
            });
        }

        conn
    }
560
561    /// Creates an exponential backoff configuration from the connection's fields.
562    fn backoff(&self) -> backoff::ExponentialBackoff {
563        backoff::ExponentialBackoff {
564            current_interval: self.backoff_current_interval,
565            initial_interval: self.backoff_initial_interval,
566            randomization_factor: self.backoff_randomization_factor,
567            multiplier: self.backoff_multiplier,
568            max_interval: self.backoff_max_interval,
569            start_time: std::time::Instant::now(),
570            max_elapsed_time: Some(self.backoff_max_elapsed_time),
571            clock: backoff::SystemClock::default(),
572        }
573    }
574
575    /// Builds a POST request with all required headers and the call timeout.
576    fn post(&self) -> reqwest::RequestBuilder {
577        let mut request = self
578            .http_client
579            .post(&self.url)
580            .timeout(self.call_timeout)
581            .header("Content-Type", "application/json")
582            .header("Accept", "application/json, text/event-stream");
583        for (name, value) in &self.headers {
584            request = request.header(name, value);
585        }
586        // Mcp-Session-Id is applied last so a same-named entry in
587        // `headers` (e.g. the proxy's encoded session id) can never
588        // override the connection's own session id.
589        request = request.header("Mcp-Session-Id", &self.session_id);
590        request
591    }
592
    /// Sends a JSON-RPC request, retrying transient errors when
    /// `idempotent` is `true`.
    ///
    /// Idempotent methods (`tools/list`, `resources/list`,
    /// `resources/read`, etc.) retry every transient error — network,
    /// HTTP status, malformed body, JSON-RPC error, session expiration —
    /// until the backoff's `max_elapsed_time` is exceeded.
    ///
    /// Non-idempotent methods (`tools/call`) make exactly one attempt.
    /// Retrying a `tools/call` is unsafe: a tool may have mutated remote
    /// state during the first attempt before the response was lost, and
    /// re-firing the call would mutate state again. Each retry of
    /// `AppendTask` advances `state.tasks.len()` an extra step, so the
    /// agent sees a different return value than expected and the
    /// pid-derived mock seed at the next step diverges. See
    /// `objectiveai-api/src/agent/completions/client.rs` (sequential
    /// dispatch) and `mock/client.rs::mock.seed_derive` for the
    /// downstream consequence.
    async fn rpc<P: serde::Serialize, R: serde::de::DeserializeOwned>(
        &self,
        method: &str,
        params: &P,
        idempotent: bool,
    ) -> Result<R, super::Error> {
        // The id is allocated once, outside the retry loop: every retry
        // resends the same logical request with the same id.
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let body = serde_json::json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params,
        });

        // One attempt = send, classify the HTTP status, parse the
        // streamable-HTTP response. Every failure is wrapped as
        // `transient` so the idempotent path can keep retrying; the
        // non-idempotent path unwraps the classification below.
        let attempt_one = || async {
            let url = self.url.clone();
            let response = self.post().json(&body).send().await.map_err(|source| {
                backoff::Error::transient(super::Error::Request {
                    url: url.clone(),
                    source,
                })
            })?;

            // 404 from the upstream means the session is gone.
            if response.status() == reqwest::StatusCode::NOT_FOUND {
                return Err(backoff::Error::transient(
                    super::Error::SessionExpired { url: url.clone() },
                ));
            }
            if !response.status().is_success() {
                let code = response.status();
                let body = response.text().await.unwrap_or_default();
                return Err(backoff::Error::transient(
                    super::Error::BadStatus { url: url.clone(), code, body },
                ));
            }

            let rpc_response: super::JsonRpcResponse<R> =
                super::parse_streamable_http_response(&url, response)
                    .await
                    .map_err(backoff::Error::transient)?;

            match rpc_response {
                super::JsonRpcResponse::Success { result, .. } => Ok(result),
                super::JsonRpcResponse::Error { error, .. } => {
                    Err(backoff::Error::transient(super::Error::JsonRpc {
                        url: url.clone(),
                        code: error.code,
                        message: error.message,
                        data: error.data,
                    }))
                }
            }
        };

        if idempotent {
            backoff::future::retry(self.backoff(), attempt_one).await
        } else {
            // Single attempt: strip the backoff wrapper off whichever
            // variant the attempt produced and surface the inner error.
            attempt_one().await.map_err(|e| match e {
                backoff::Error::Permanent(err) | backoff::Error::Transient { err, .. } => err,
            })
        }
    }
673
674    /// Sends a JSON-RPC notification (no response expected) with the
675    /// same exponential-backoff retry policy as [`Self::rpc`]. Every
676    /// error is transient; the loop gives up only when the backoff's
677    /// `max_elapsed_time` is exceeded.
678    async fn notify<P: serde::Serialize>(
679        &self,
680        method: &str,
681        params: &P,
682    ) -> Result<(), super::Error> {
683        if self.mock { return Ok(()); }
684        let body = serde_json::json!({
685            "jsonrpc": "2.0",
686            "method": method,
687            "params": params,
688        });
689
690        backoff::future::retry(self.backoff(), || async {
691            let url = self.url.clone();
692            let response = self.post().json(&body).send().await.map_err(|source| {
693                backoff::Error::transient(super::Error::Request {
694                    url: url.clone(),
695                    source,
696                })
697            })?;
698
699            if response.status() == reqwest::StatusCode::NOT_FOUND {
700                return Err(backoff::Error::transient(
701                    super::Error::SessionExpired { url: url.clone() },
702                ));
703            }
704            if !response.status().is_success() {
705                let code = response.status();
706                let body = response.text().await.unwrap_or_default();
707                return Err(backoff::Error::transient(
708                    super::Error::BadStatus { url: url.clone(), code, body },
709                ));
710            }
711
712            Ok(())
713        })
714        .await
715    }
716
717    /// Returns a key identifying this connection for tool namespacing.
718    fn tool_key(&self) -> String {
719        format!("{}-{}", self.initialize_result.server_info.name, self.url)
720    }
721
722    /// Returns the session ID for this connection.
723    fn session_id(&self) -> &str {
724        &self.session_id
725    }
726
727    /// Sends a `tools/list` RPC call for a single page.
728    async fn rpc_list_tools(
729        &self,
730        cursor: Option<&str>,
731    ) -> Result<super::tool::ListToolsResult, super::Error> {
732        self.rpc(
733            "tools/list",
734            &super::tool::ListToolsRequest {
735                cursor: cursor.map(String::from),
736            },
737            true,
738        )
739        .await
740    }
741
742    /// Returns all tools from the server.
743    ///
744    /// Blocks until background pagination completes, then returns a
745    /// cheap `Arc` clone of the result.
746    async fn list_tools(
747        &self,
748    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
749        self.tools.read().await.clone()
750    }
751
752    /// Calls a tool on the MCP server.
753    async fn call_tool(
754        &self,
755        params: &super::tool::CallToolRequestParams,
756    ) -> Result<super::tool::CallToolResult, super::Error> {
757        if self.mock {
758            return Ok(super::tool::CallToolResult {
759                content: vec![super::tool::ContentBlock::Text(super::tool::TextContent {
760                    text: "mock".to_string(),
761                    annotations: None,
762                    _meta: None,
763                })],
764                structured_content: None,
765                is_error: None,
766                _meta: None,
767            });
768        }
769        self.rpc("tools/call", params, false).await
770    }
771
    /// Calls a tool and converts the result into a [`ToolMessage`].
    ///
    /// Content blocks are mapped as follows:
    /// - `text` → text part
    /// - `image` → image_url part (data URL)
    /// - `audio` → input_audio part
    /// - `resource` (embedded text) → text part
    /// - `resource` (embedded blob, image mime) → image_url part (data URL)
    /// - `resource` (embedded blob, other mime) → file part
    /// - `resource_link` → if the URI appears in `list_resources`, fetches
    ///   via `read_resource` and inlines the content using the same
    ///   text/blob rules; otherwise serializes the link as JSON text
    ///
    /// NOTE(review): an earlier version of this doc claimed that when
    /// `is_error` is set the content is prefixed with an error indicator,
    /// but the body below never inspects `result.is_error` — confirm the
    /// intended contract and either implement the prefix or keep this
    /// doc as-is.
    async fn call_tool_as_message(
        &self,
        params: &super::tool::CallToolRequestParams,
        tool_call_id: String,
    ) -> Result<
        crate::agent::completions::message::ToolMessage,
        super::Error,
    > {
        use crate::agent::completions::message::{
            File, ImageUrl, InputAudio, RichContent, RichContentPart,
            ToolMessage,
        };
        use super::shared::ResourceContentsUnion;
        use super::tool::ContentBlock;

        // A failed tool call aborts message construction entirely.
        let result = self.call_tool(params).await?;

        // Build the set of known resource URIs for resource_link resolution.
        // A failed `list_resources` degrades gracefully: every link then
        // takes the serialize-as-JSON fallback below.
        let known_resource_uris: std::collections::HashSet<String> =
            match self.list_resources().await {
                Ok(resources) => {
                    resources.iter().map(|r| r.uri.clone()).collect()
                }
                Err(_) => std::collections::HashSet::new(),
            };

        /// Converts a `ResourceContentsUnion` into a single rich content
        /// part. Text resources become text parts. Blob resources with an
        /// image MIME type become image_url parts (data URL); all other blobs
        /// become file parts.
        fn resource_contents_to_part(
            contents: &ResourceContentsUnion,
        ) -> RichContentPart {
            match contents {
                ResourceContentsUnion::Text(text) => {
                    RichContentPart::Text {
                        text: text.text.clone(),
                    }
                }
                ResourceContentsUnion::Blob(blob) => {
                    // Missing MIME type is treated as an opaque binary.
                    let mime = blob
                        .base
                        .mime_type
                        .as_deref()
                        .unwrap_or("application/octet-stream");

                    if mime.starts_with("image/") {
                        RichContentPart::ImageUrl {
                            image_url: ImageUrl {
                                url: format!(
                                    "data:{};base64,{}",
                                    mime, blob.blob
                                ),
                                detail: None,
                            },
                        }
                    } else {
                        // Extract a filename from the URI path, if any.
                        let filename = blob
                            .base
                            .uri
                            .rsplit('/')
                            .next()
                            .filter(|s| !s.is_empty())
                            .map(String::from);

                        RichContentPart::File {
                            file: File {
                                file_data: Some(blob.blob.clone()),
                                filename,
                                file_id: None,
                                file_url: None,
                            },
                        }
                    }
                }
            }
        }

        let mut parts: Vec<RichContentPart> = Vec::new();

        for block in &result.content {
            match block {
                ContentBlock::Text(text) => {
                    parts.push(RichContentPart::Text {
                        text: text.text.clone(),
                    });
                }
                ContentBlock::Image(image) => {
                    parts.push(RichContentPart::ImageUrl {
                        image_url: ImageUrl {
                            url: format!(
                                "data:{};base64,{}",
                                image.mime_type, image.data
                            ),
                            detail: None,
                        },
                    });
                }
                ContentBlock::Audio(audio) => {
                    parts.push(RichContentPart::InputAudio {
                        input_audio: InputAudio {
                            data: audio.data.clone(),
                            format: audio.mime_type.clone(),
                        },
                    });
                }
                ContentBlock::EmbeddedResource(embedded) => {
                    parts.push(resource_contents_to_part(
                        &embedded.resource,
                    ));
                }
                ContentBlock::ResourceLink(link) => {
                    if known_resource_uris.contains(&link.uri) {
                        // Fetch the resource and inline its contents.
                        // NOTE: a read failure here aborts the whole
                        // message via `?`, unlike the best-effort
                        // `list_resources` above.
                        let read_result =
                            self.read_resource(&link.uri).await?;
                        for contents in &read_result.contents {
                            parts.push(
                                resource_contents_to_part(contents),
                            );
                        }
                    } else {
                        // Not a known resource; serialize as JSON text.
                        parts.push(RichContentPart::Text {
                            text: serde_json::to_string(link)
                                .unwrap_or_default(),
                        });
                    }
                }
            }
        }

        // Collapse a lone text part to plain text; anything else stays a
        // parts list. Zero parts become an empty text message.
        let content = match parts.len() {
            0 => RichContent::Text(String::new()),
            1 => match parts.remove(0) {
                RichContentPart::Text { text } => RichContent::Text(text),
                other => RichContent::Parts(vec![other]),
            },
            _ => RichContent::Parts(parts),
        };

        Ok(ToolMessage {
            content,
            tool_call_id,
        })
    }
934
935    /// Sends a `resources/list` RPC call for a single page.
936    async fn rpc_list_resources(
937        &self,
938        cursor: Option<&str>,
939    ) -> Result<super::resource::ListResourcesResult, super::Error> {
940        self.rpc(
941            "resources/list",
942            &super::resource::ListResourcesRequest {
943                cursor: cursor.map(String::from),
944            },
945            true,
946        )
947        .await
948    }
949
950    /// Returns all resources from the server.
951    ///
952    /// Blocks until background pagination completes, then returns a
953    /// cheap `Arc` clone of the result.
954    async fn list_resources(
955        &self,
956    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
957        self.resources.read().await.clone()
958    }
959
    /// Returns the cached tool list as soon as it differs from `current`,
    /// or — if it equals `current` right now — waits up to `timeout` for
    /// the cache to change and then returns whatever it sees.
    ///
    /// An `Err` cache is treated as "different from any caller snapshot"
    /// and returned immediately.
    ///
    /// Concurrency-safe: any number of concurrent subscribers wait on
    /// independent `Notified` futures and read the cache through the
    /// shared `RwLock`. A timeout that fires alone is not an error — we
    /// re-read the cache and return whatever's there.
    async fn subscribe_tools(
        &self,
        current: &[super::tool::Tool],
        timeout: Duration,
    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
        // Arm BEFORE reading. `enable()` registers the future in the
        // wait queue without polling, so a `notify_waiters` racing
        // between our read and our await still wakes us.
        let notified = self.tools_changed.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();

        let initial = self.tools.read().await.clone();
        match &initial {
            // Cache still equals the caller's snapshot — fall through
            // and wait for a change (or the timeout).
            Ok(arc) if arc.as_slice() == current => {}
            // Different list, or an `Err` cache: return it right away.
            _ => return initial,
        }

        // Result deliberately discarded: a bare timeout is not an error.
        // Either way we fall through to a fresh read below.
        let _ = tokio::time::timeout(timeout, notified).await;

        self.tools.read().await.clone()
    }
993
    /// Resource counterpart of [`Self::subscribe_tools`].
    ///
    /// Returns the cached resource list as soon as it differs from
    /// `current`, or waits up to `timeout` for a change. An `Err` cache
    /// returns immediately; a bare timeout is not an error.
    async fn subscribe_resources(
        &self,
        current: &[super::resource::Resource],
        timeout: Duration,
    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
        // Arm before the first read — `enable()` registers in the wait
        // queue without polling, so a `notify_waiters` racing between
        // our read and our await still wakes us.
        let notified = self.resources_changed.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();

        let initial = self.resources.read().await.clone();
        match &initial {
            // Equal snapshot: fall through and wait.
            Ok(arc) if arc.as_slice() == current => {}
            // Different, or an `Err` cache: return immediately.
            _ => return initial,
        }

        // Timeout alone is fine — re-read below and return what's there.
        let _ = tokio::time::timeout(timeout, notified).await;

        self.resources.read().await.clone()
    }
1014
1015    /// Reads a resource from the MCP server.
1016    async fn read_resource(
1017        &self,
1018        uri: &str,
1019    ) -> Result<super::resource::ReadResourceResult, super::Error> {
1020        self.rpc(
1021            "resources/read",
1022            &super::resource::ReadResourceRequestParams {
1023                uri: uri.to_string(),
1024            },
1025            true,
1026        )
1027        .await
1028    }
1029
    /// Re-fetches all tools from the server, replacing the cached list.
    ///
    /// Optionally fires `on_change` under the write guard *after* the
    /// new list has been installed and `notify_waiters` has fired, so
    /// anyone the callback wakes queues on the read lock behind this
    /// guard and observes the post-swap list.
    ///
    /// NOTE(review): this "after install" timing differs from the
    /// "after lock, before paginate" edge described on
    /// [`ListChangedCallback`] and implemented by
    /// [`Self::refresh_tools_signaling`] — confirm which edge the
    /// proxy's downstream re-emission actually relies on.
    async fn refresh_tools(&self, on_change: Option<ListChangedCallback>) {
        // Listener-driven refresh. Visibility contract: any caller
        // that issues `list_tools()` after a `tools/list_changed`
        // notification has been observed must see the post-swap
        // value, not stale data — so the write lock has to gate
        // readers across the upstream paginate.
        //
        // Performance contract: don't serialise paginate *behind*
        // lock-acquisition latency. We start `tools.write()` and the
        // upstream paginate **concurrently** with `tokio::join!`. The
        // write-lock acquire blocks new `list_tools()` readers
        // immediately (preserving visibility) and runs in parallel
        // with whatever drain time the in-flight readers need; the
        // paginate runs alongside. Total wall-clock is
        // `max(drain_time, paginate_time)` instead of the sum.
        //
        // `notify_waiters` and `on_change` fire under the write
        // guard, *after* `*guard = result`, so anyone awoken by them
        // queues on the read lock, waits for the guard to drop, and
        // observes the post-swap state.
        let (mut guard, result) = tokio::join!(
            self.tools.write(),
            self.paginate_tools(),
        );
        *guard = result;
        self.tools_changed.notify_waiters();
        if let Some(cb) = on_change {
            cb();
        }
    }
1068
1069    /// Page-by-page fetch of the upstream tool list, no locks held.
1070    /// Shared between the `_signaling` (initial-populate, holds lock
1071    /// for the original "block fast readers" contract) and `refresh_*`
1072    /// (listener-driven, lock-only-around-install) variants.
1073    async fn paginate_tools(
1074        &self,
1075    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1076        let mut all_tools = Vec::new();
1077        let mut cursor: Option<String> = None;
1078        loop {
1079            match self.rpc_list_tools(cursor.as_deref()).await {
1080                Ok(page) => {
1081                    all_tools.extend(page.tools);
1082                    cursor = page.next_cursor;
1083                    if cursor.is_none() {
1084                        return Ok(Arc::new(all_tools));
1085                    }
1086                }
1087                Err(e) => return Err(Arc::new(e)),
1088            }
1089        }
1090    }
1091
1092    /// Same as [`Self::refresh_tools`] but fires `lock_held` once the
1093    /// write lock has been acquired so the caller can synchronise on
1094    /// "writer is in possession of the cache" before returning. Used by
1095    /// `ConnectionInner::new` to prevent a fast reader from acquiring
1096    /// the read lock before this writer has even started.
1097    async fn refresh_tools_signaling(
1098        &self,
1099        lock_held: tokio::sync::oneshot::Sender<()>,
1100        on_change: Option<ListChangedCallback>,
1101    ) {
1102        let mut guard = self.tools.write().await;
1103        // Fire `tools_changed` while we hold the write lock and *before*
1104        // installing the new list. Any subscriber woken now must take a
1105        // read lock to observe the result, and that read lock is queued
1106        // behind this write guard — so they always see the post-swap
1107        // state, never mid-swap.
1108        self.tools_changed.notify_waiters();
1109        let _ = lock_held.send(());
1110        if let Some(cb) = on_change {
1111            cb();
1112        }
1113        let mut all_tools = Vec::new();
1114        let mut cursor: Option<String> = None;
1115        let result = loop {
1116            match self.rpc_list_tools(cursor.as_deref()).await {
1117                Ok(page) => {
1118                    all_tools.extend(page.tools);
1119                    cursor = page.next_cursor;
1120                    if cursor.is_none() {
1121                        break Ok(Arc::new(all_tools));
1122                    }
1123                }
1124                Err(e) => break Err(Arc::new(e)),
1125            }
1126        };
1127        *guard = result;
1128    }
1129
    /// Re-fetches all resources from the server, replacing the cached list.
    /// See [`ConnectionInner::refresh_tools`] for the callback timing
    /// contract.
    async fn refresh_resources(&self, on_change: Option<ListChangedCallback>) {
        // Same paginate-while-acquiring-the-write-lock pattern as
        // `refresh_tools` — see that comment for the visibility +
        // performance rationale.
        let (mut guard, result) = tokio::join!(
            self.resources.write(),
            self.paginate_resources(),
        );
        *guard = result;
        // Wake subscribers and fire the callback under the write guard,
        // after the swap — awoken readers queue on the read lock behind
        // this guard and observe the post-swap list.
        self.resources_changed.notify_waiters();
        if let Some(cb) = on_change {
            cb();
        }
    }
1147
1148    async fn paginate_resources(
1149        &self,
1150    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1151        let mut all_resources = Vec::new();
1152        let mut cursor: Option<String> = None;
1153        loop {
1154            match self.rpc_list_resources(cursor.as_deref()).await {
1155                Ok(page) => {
1156                    all_resources.extend(page.resources);
1157                    cursor = page.next_cursor;
1158                    if cursor.is_none() {
1159                        return Ok(Arc::new(all_resources));
1160                    }
1161                }
1162                Err(e) => return Err(Arc::new(e)),
1163            }
1164        }
1165    }
1166
1167    /// Resource counterpart of [`Self::refresh_tools_signaling`].
1168    async fn refresh_resources_signaling(
1169        &self,
1170        lock_held: tokio::sync::oneshot::Sender<()>,
1171        on_change: Option<ListChangedCallback>,
1172    ) {
1173        let mut guard = self.resources.write().await;
1174        // See `refresh_tools_signaling` — fire under the write lock,
1175        // before install, so subscribers' next read sees the post-swap
1176        // state.
1177        self.resources_changed.notify_waiters();
1178        let _ = lock_held.send(());
1179        if let Some(cb) = on_change {
1180            cb();
1181        }
1182        let mut all_resources = Vec::new();
1183        let mut cursor: Option<String> = None;
1184        let result = loop {
1185            match self.rpc_list_resources(cursor.as_deref()).await {
1186                Ok(page) => {
1187                    all_resources.extend(page.resources);
1188                    cursor = page.next_cursor;
1189                    if cursor.is_none() {
1190                        break Ok(Arc::new(all_resources));
1191                    }
1192                }
1193                Err(e) => break Err(Arc::new(e)),
1194            }
1195        };
1196        *guard = result;
1197    }
1198
1199    /// Builds a GET request to the MCP endpoint for receiving server
1200    /// notifications via SSE.
1201    fn get(&self) -> reqwest::RequestBuilder {
1202        let mut request = self
1203            .http_client
1204            .get(&self.url)
1205            .header("Accept", "text/event-stream");
1206        for (name, value) in &self.headers {
1207            request = request.header(name, value);
1208        }
1209        // Mcp-Session-Id last so it always wins over `headers`.
1210        request = request.header("Mcp-Session-Id", &self.session_id);
1211        request
1212    }
1213
    /// Listens for `notifications/tools/list_changed` and
    /// `notifications/resources/list_changed` on an SSE stream. On each
    /// notification, write-locks and re-fetches the full list.
    ///
    /// `initial_lines` is the pre-opened SSE line reader handed in by
    /// [`Client::connect`](super::Client::connect) — that stream is
    /// consumed first. When it ends (or any later GET reconnect ends),
    /// we sleep `backoff_initial_interval` and open a fresh GET `/` SSE
    /// stream.
    ///
    /// Takes a [`Weak<Self>`] (not `Arc<Self>`) so the spawned task
    /// doesn't itself keep the [`Connection`] alive, and a
    /// [`CancellationToken`] sibling clone of the connection's
    /// [`DropGuard`] so the task tears down the instant the last
    /// external `Arc<ConnectionInner>` clone is dropped — every
    /// blocking await (line read, reconnect send, backoff sleep) is
    /// raced against `cancel.cancelled()` and exits without any zombie
    /// retries against a now-dead session.
    async fn listen_for_list_changes(
        weak: Weak<Self>,
        cancel: CancellationToken,
        initial_lines: super::LinesStream,
    ) {
        // First iteration: use the pre-opened SSE stream the client
        // handed us. After that, fall back to opening fresh GET / SSE
        // streams as the upstream connection cycles.
        let mut next_lines: Option<super::LinesStream> = Some(initial_lines);
        // One-shot guard for the catch-up refresh: false on the very
        // first iteration (the caller's pre-opened SSE stream — its
        // associated cache was just populated by `Client::connect`'s
        // initial pagination, so re-fetching there would just be a
        // wasted round-trip), true thereafter. Every stream end —
        // whether `Ok(None)` (clean close) or `Err(_)` (read failure)
        // — drops back here, which we treat as an implicit
        // list-changed notification: the upstream's broadcast (in
        // particular the proxy's per-session `tokio::broadcast`) is
        // lossy for moments when this listener has zero active
        // subscribers, so anything that fired during our disconnect
        // window may have been dropped.
        //
        // ORDER MATTERS. The refresh must run AFTER we've re-opened
        // the GET / SSE stream — i.e. after we're a subscriber again
        // — and BEFORE we enter the inner read loop. If we refreshed
        // before the resubscribe, a notification that fired between
        // our refresh-completion and our subscribe would be lost the
        // same way as the original disconnect-window drops; doing it
        // after means a notification fired DURING the refresh lands
        // in the new subscriber's buffer (broadcast::Sender::send
        // backs onto each receiver's channel-capacity slot) and gets
        // consumed by the inner loop on its next read.
        let mut is_reconnect = false;

        loop {
            // The token cancels deterministically when the last
            // `Arc<ConnectionInner>` clone is dropped (see
            // `_listener_cancel_guard`). Check once per outer
            // iteration, but the real protection is the cancel arms in
            // every blocking await below — those exit immediately on
            // cancel.
            if cancel.is_cancelled() {
                return;
            }
            // All strong refs gone: the connection is dead, stop listening.
            let Some(this) = weak.upgrade() else { return };
            let backoff_delay = this.backoff_initial_interval;

            let mut lines = match next_lines.take() {
                Some(l) => l,
                None => {
                    // Race the upstream GET against cancellation — if
                    // the connection drops mid-reconnect, exit
                    // immediately rather than waiting for the request
                    // to complete or time out (otherwise produces a
                    // burst of 401 retries against a now-dead session
                    // under heavy churn).
                    let send_outcome = tokio::select! {
                        out = this.get().send() => out,
                        _ = cancel.cancelled() => {
                            drop(this);
                            return;
                        }
                    };
                    // Any transport error or non-2xx status: back off
                    // and retry the GET on the next outer iteration.
                    let response = match send_outcome {
                        Ok(r) if r.status().is_success() => r,
                        _ => {
                            drop(this);
                            // Sleep with cancel-arm: instant exit on
                            // drop, no zombie retries.
                            tokio::select! {
                                _ = tokio::time::sleep(backoff_delay) => {}
                                _ = cancel.cancelled() => return,
                            }
                            continue;
                        }
                    };
                    super::lines_from_response(response)
                }
            };

            // Catch-up refresh on every reconnect — the implicit
            // list-changed treatment for the just-failed stream. See
            // the `is_reconnect` doc-comment above for the
            // refresh-AFTER-resubscribe rationale.
            if is_reconnect {
                // tools and resources are independent locks; run the
                // catch-up refreshes concurrently so disconnect
                // recovery isn't sequential.
                let _ = tokio::join!(
                    this.refresh_tools(this.on_tools_list_changed.get()),
                    this.refresh_resources(this.on_resources_list_changed.get()),
                );
            }
            is_reconnect = true;

            'inner: loop {
                tokio::select! {
                    line_result = lines.next_line() => {
                        match line_result {
                            Ok(Some(line)) => {
                                // SSE data lines start with "data: ".
                                // Everything else (event:, id:, blank
                                // keep-alives) is skipped.
                                // NOTE(review): SSE also allows "data:"
                                // with no space after the colon; such
                                // lines are skipped here — confirm the
                                // upstream always emits "data: ".
                                let Some(data) = line.strip_prefix("data: ") else {
                                    continue 'inner;
                                };
                                // Non-notification / unparseable payloads
                                // are silently ignored.
                                let method = match serde_json::from_str::<super::JsonRpcNotification>(data) {
                                    Ok(n) => n.method,
                                    Err(_) => continue 'inner,
                                };
                                match method.as_str() {
                                    "notifications/tools/list_changed" => {
                                        // refresh_tools fires the
                                        // callback after the cache is
                                        // installed, so the proxy's
                                        // downstream
                                        // notifications/tools/list_changed
                                        // emission lines up with the
                                        // staleness window opening.
                                        this.refresh_tools(
                                            this.on_tools_list_changed.get(),
                                        )
                                        .await;
                                    }
                                    "notifications/resources/list_changed" => {
                                        this.refresh_resources(
                                            this.on_resources_list_changed.get(),
                                        )
                                        .await;
                                    }
                                    _ => {}
                                }
                            }
                            // Stream ended cleanly or errored — break out
                            // to the outer loop so we either reconnect or,
                            // if everyone's gone, exit at the top.
                            _ => break 'inner,
                        }
                    }
                    // Cancellation: the connection's last clone has
                    // dropped. Tear down immediately.
                    _ = cancel.cancelled() => {
                        drop(this);
                        return;
                    }
                }
            }

            // Stream ended — drop the strong ref before sleeping so the
            // next iteration's weak-upgrade can detect liveness honestly.
            drop(this);
            tokio::select! {
                _ = tokio::time::sleep(backoff_delay) => {}
                _ = cancel.cancelled() => return,
            }
        }
    }
1387}
1388
1389#[cfg(test)]
1390mod subscribe_tests {
1391    use super::*;
1392    use crate::mcp::tool::{Tool, ToolSchemaObject, ToolSchemaType};
1393
1394    fn tool(name: &str) -> Tool {
1395        Tool {
1396            name: name.to_string(),
1397            title: None,
1398            description: None,
1399            icons: None,
1400            input_schema: ToolSchemaObject {
1401                r#type: ToolSchemaType::Object,
1402                properties: None,
1403                required: None,
1404                extra: IndexMap::new(),
1405            },
1406            output_schema: None,
1407            annotations: None,
1408            execution: None,
1409            _meta: None,
1410        }
1411    }
1412
1413    /// First read shows a different list — return immediately, never wait.
1414    #[tokio::test]
1415    async fn subscribe_tools_returns_immediately_when_cache_differs() {
1416        let conn = Connection::new_for_test("t".into(), "http://x".into());
1417        *conn.inner.tools.write().await = Ok(Arc::new(vec![tool("a")]));
1418
1419        let start = std::time::Instant::now();
1420        let got = conn
1421            .subscribe_tools(&[tool("b")], Duration::from_secs(5))
1422            .await
1423            .unwrap();
1424        assert!(start.elapsed() < Duration::from_millis(100));
1425        assert_eq!(got.as_slice(), &[tool("a")]);
1426    }
1427
1428    /// Cached `Err` is treated as "different from any caller snapshot."
1429    #[tokio::test]
1430    async fn subscribe_tools_returns_err_immediately() {
1431        let conn = Connection::new_for_test("t".into(), "http://x".into());
1432        let err = super::super::Error::NoSessionId {
1433            url: "http://x".into(),
1434            body: String::new(),
1435        };
1436        *conn.inner.tools.write().await = Err(Arc::new(err));
1437
1438        let start = std::time::Instant::now();
1439        let got = conn
1440            .subscribe_tools(&[], Duration::from_secs(5))
1441            .await;
1442        assert!(start.elapsed() < Duration::from_millis(100));
1443        assert!(got.is_err());
1444    }
1445
    /// Cache equals snapshot, then a writer fires the notify under the
    /// write lock and installs a new list. The subscriber wakes, then its
    /// re-read blocks behind the writer's guard, observes the new list.
    #[tokio::test]
    async fn subscribe_tools_wakes_on_change_and_reads_post_swap() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Ok(Arc::new(vec![tool("a")]));

        let conn_for_subscriber = conn.clone();
        let subscriber = tokio::spawn(async move {
            conn_for_subscriber
                .subscribe_tools(&[tool("a")], Duration::from_secs(5))
                .await
                .unwrap()
        });

        // Give the subscriber a moment to arm `notified()` and finish
        // its first read so it's parked on the timeout.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Simulate `refresh_tools_signaling`: take the write lock, fire
        // `tools_changed` *while holding* the write lock, then install
        // the new value before releasing. This is exactly the ordering
        // that the real refresh path uses.
        {
            let mut guard = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            // Hold briefly to make absolutely sure the subscriber is
            // racing the read lock against our drop.
            tokio::time::sleep(Duration::from_millis(20)).await;
            *guard = Ok(Arc::new(vec![tool("b")]));
        }

        // Outer 2s timeout guards against a hung subscriber turning a
        // logic regression into a silently long-running test.
        let got = tokio::time::timeout(Duration::from_secs(2), subscriber)
            .await
            .expect("subscriber returned in time")
            .expect("subscriber didn't panic");
        assert_eq!(got.as_slice(), &[tool("b")]);
    }
1485
1486    /// Cache equals snapshot, no notification arrives — timeout, return
1487    /// the still-equal list (not an error).
1488    #[tokio::test]
1489    async fn subscribe_tools_times_out_and_returns_unchanged_list() {
1490        let conn = Connection::new_for_test("t".into(), "http://x".into());
1491        *conn.inner.tools.write().await = Ok(Arc::new(vec![tool("a")]));
1492
1493        let start = std::time::Instant::now();
1494        let got = conn
1495            .subscribe_tools(&[tool("a")], Duration::from_millis(50))
1496            .await
1497            .unwrap();
1498        let elapsed = start.elapsed();
1499        assert!(elapsed >= Duration::from_millis(40), "elapsed: {elapsed:?}");
1500        assert!(elapsed < Duration::from_millis(500), "elapsed: {elapsed:?}");
1501        assert_eq!(got.as_slice(), &[tool("a")]);
1502    }
1503
1504    /// Two concurrent subscribers both wake on a single notify_waiters
1505    /// and both observe the post-swap list.
1506    #[tokio::test]
1507    async fn subscribe_tools_supports_concurrent_subscribers() {
1508        let conn = Connection::new_for_test("t".into(), "http://x".into());
1509        *conn.inner.tools.write().await = Ok(Arc::new(vec![tool("a")]));
1510
1511        let c1 = conn.clone();
1512        let c2 = conn.clone();
1513        let s1 = tokio::spawn(async move {
1514            c1.subscribe_tools(&[tool("a")], Duration::from_secs(5))
1515                .await
1516                .unwrap()
1517        });
1518        let s2 = tokio::spawn(async move {
1519            c2.subscribe_tools(&[tool("a")], Duration::from_secs(5))
1520                .await
1521                .unwrap()
1522        });
1523
1524        tokio::time::sleep(Duration::from_millis(50)).await;
1525
1526        {
1527            let mut guard = conn.inner.tools.write().await;
1528            conn.inner.tools_changed.notify_waiters();
1529            *guard = Ok(Arc::new(vec![tool("c")]));
1530        }
1531
1532        let (r1, r2) = tokio::join!(s1, s2);
1533        let r1 = r1.unwrap();
1534        let r2 = r2.unwrap();
1535        assert_eq!(r1.as_slice(), &[tool("c")]);
1536        assert_eq!(r2.as_slice(), &[tool("c")]);
1537    }
1538}