Skip to main content

objectiveai_sdk/mcp/
connection.rs

1//! MCP connection for communicating with an MCP server.
2//!
3//! [`Connection`] is a cheaply-clonable handle around an internal
4//! [`ConnectionInner`]. The last drop of the inner `Arc` drops
5//! [`ConnectionInner`]'s `_listener_cancel_guard` field (a
6//! [`tokio_util::sync::DropGuard`]), which cancels the listener task's
7//! [`tokio_util::sync::CancellationToken`] — the SSE listener exits the
8//! instant any in-flight reconnect, sleep, or read is cancelled, with no
9//! zombie 401 retries against a now-dead proxy session.
10
11use std::ops::Deref;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::{Arc, RwLock as StdRwLock, Weak};
14use std::time::Duration;
15
16use indexmap::IndexMap;
17use tokio::sync::RwLock;
18use tokio_util::sync::{CancellationToken, DropGuard};
19
/// Callback fired by [`Connection`] when the upstream MCP server emits
/// `notifications/tools/list_changed` or `notifications/resources/list_changed`.
///
/// **Timing:** runs after the corresponding cache's write lock is taken
/// but *before* the network paginate that replaces it. That ordering
/// matches the moment the staleness window opens — anyone blocked on the
/// read lock won't return until the new list lands. The callback should
/// not call back into `list_tools` / `list_resources`: doing so would
/// re-take the lock the listener already holds and deadlock.
///
/// Stored behind an `Arc` so the listener task can cheaply clone it out
/// of the lock and call it without holding the read guard.
pub type ListChangedCallback = Arc<dyn Fn() + Send + Sync + 'static>;

/// A registered-or-not callback slot. Wrapper so [`ConnectionInner`] can
/// keep `#[derive(Debug)]` (a raw `dyn Fn` isn't `Debug`).
///
/// Lock poisoning is recovered from rather than propagated. The only
/// state behind the lock is an `Option<Arc<_>>` that is replaced or
/// cloned wholesale, so a panic on another thread can never leave it
/// half-updated. Propagating the poison would instead make every later
/// `set` / `get` panic, including the listener task's reads.
struct CallbackSlot(StdRwLock<Option<ListChangedCallback>>);

impl CallbackSlot {
    /// An empty slot (no callback registered).
    fn new() -> Self {
        Self(StdRwLock::new(None))
    }

    /// Install `callback`, replacing any previously registered one.
    ///
    /// The previous callback is dropped only *after* the write guard is
    /// released (the guard is a temporary of the `let` statement). Its
    /// destructor, which runs whatever the closure captured, therefore
    /// never executes while the lock is held.
    fn set(&self, callback: ListChangedCallback) {
        let previous = self
            .0
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .replace(callback);
        drop(previous);
    }

    /// Cheap clone-out of the current callback (if any). The `Arc` clone
    /// lets callers release the read guard before invoking the callback.
    fn get(&self) -> Option<ListChangedCallback> {
        self.0
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .as_ref()
            .map(Arc::clone)
    }
}

impl std::fmt::Debug for CallbackSlot {
    /// Prints only whether a callback is registered; the callback itself
    /// is opaque. Recovers from poisoning like `get` does, so the flag is
    /// accurate even after a panic elsewhere.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let set = self
            .0
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .is_some();
        f.debug_struct("CallbackSlot").field("set", &set).finish()
    }
}
60
61/// An active connection to an MCP server using the Streamable HTTP transport.
62///
63/// Cheaply clonable (one `Arc` bump). When the last clone is dropped, the
64/// inner `Arc` ref count hits zero, the `_listener_cancel_guard` field is
65/// dropped, and the SSE listener task is
66/// cancelled — exiting any in-flight `lines.next_line()`, reconnect
67/// `send()`, or backoff `sleep` *immediately* without retrying against
68/// the now-dead proxy session.
69///
70/// Use the public methods (`list_tools`, `call_tool`, `list_resources`,
71/// `read_resource`, `call_tool_as_message`, `tool_key`) for the upstream
72/// MCP protocol surface. The inner state ([`ConnectionInner`]) is also
73/// reachable via `Deref` for read-only field access (e.g.
74/// `connection.url`, `connection.initialize_result.server_info.name`),
75/// but its methods are private — you must go through `Connection`.
76#[derive(Debug)]
77pub struct Connection {
78    inner: Arc<ConnectionInner>,
79}
80
81impl Clone for Connection {
82    fn clone(&self) -> Self {
83        Self {
84            inner: Arc::clone(&self.inner),
85        }
86    }
87}
88
89// No `Drop` for `Connection`: cancellation happens deterministically
90// when the last `Arc<ConnectionInner>` clone is dropped, which drops
91// the `_listener_cancel_guard` field and cancels the listener token.
92
93impl Deref for Connection {
94    type Target = ConnectionInner;
95    fn deref(&self) -> &ConnectionInner {
96        &self.inner
97    }
98}
99
100impl Connection {
101    /// Tear this connection down explicitly.
102    ///
103    /// 1. Cancels the long-lived list-changed listener task immediately
104    ///    (drops the [`DropGuard`] in
105    ///    [`ConnectionInner::_listener_cancel_guard`]), so by the time
106    ///    the HTTP DELETE goes out the listener isn't still holding an
107    ///    SSE read open against the upstream we're about to close.
108    /// 2. Issues `DELETE /` to the upstream with this connection's
109    ///    `Mcp-Session-Id` and the same merged header set every other
110    ///    RPC stamps. Reuses [`ConnectionInner::call_timeout`].
111    /// 3. Treats `404 / 401 / 403` as success — the upstream is
112    ///    unreachable from these credentials anyway, which is the
113    ///    desired terminal state. Other non-2xx surfaces as
114    ///    [`super::Error::BadStatus`].
115    ///
116    /// Takes `&self`: the listener cancel is in-place, and dropping
117    /// the surrounding `Arc<ConnectionInner>` (which closes the rest
118    /// of the connection's owned state) is the caller's responsibility
119    /// — usually by dropping the `Arc<Session>` holding it. Stateless
120    /// callers that don't hold a `Connection` should use
121    /// [`Client::delete`](super::Client::delete) instead.
122    ///
123    /// **In-flight RPC ordering.** This method does not block on
124    /// in-flight `call_tool` / `read_resource` / `list_tools` /
125    /// `list_resources` calls on the same connection. If one is
126    /// outstanding when `delete` lands, the upstream may see DELETE
127    /// before the RPC's reply makes it back; the in-flight call then
128    /// surfaces as a closed-connection error to whoever started it.
129    /// That's the spec-correct order (client said terminate) — drain
130    /// on the caller side first if you need different semantics.
131    pub async fn delete(&self) -> Result<(), super::Error> {
132        // 1. Drop the listener-cancel guard. Releasing the `DropGuard`
133        //    cancels the sibling `CancellationToken` the listener task
134        //    holds; the listener `tokio::select!`s against it on every
135        //    blocking await and exits inside one scheduler tick.
136        if let Ok(mut guard) = self.inner._listener_cancel_guard.lock() {
137            let _ = guard.take();
138        }
139
140        // 2. Build + send HTTP DELETE. Mirrors `Client::connect_once`'s
141        //    request-stamp shape: header loop first, explicit
142        //    `Mcp-Session-Id` always wins.
143        let request = self
144            .inner
145            .http_client
146            .delete(&self.inner.url)
147            .timeout(self.inner.call_timeout)
148            .headers(self.inner.build_request_headers(None, None).await);
149        let response = request.send().await.map_err(|source| {
150            super::Error::Request {
151                url: self.inner.url.clone(),
152                source,
153            }
154        })?;
155
156        // 3. 404 / 401 / 403 → success; other non-2xx → real error.
157        let status = response.status();
158        if matches!(
159            status,
160            reqwest::StatusCode::NOT_FOUND
161                | reqwest::StatusCode::UNAUTHORIZED
162                | reqwest::StatusCode::FORBIDDEN
163        ) {
164            return Ok(());
165        }
166        if !status.is_success() {
167            let body = response.text().await.unwrap_or_default();
168            return Err(super::Error::BadStatus {
169                url: self.inner.url.clone(),
170                code: status,
171                body: body.chars().take(800).collect(),
172            });
173        }
174        Ok(())
175    }
176
177    pub(super) async fn new(
178        http_client: reqwest::Client,
179        url: String,
180        session_id: String,
181        headers: IndexMap<String, String>,
182        backoff_current_interval: Duration,
183        backoff_initial_interval: Duration,
184        backoff_randomization_factor: f64,
185        backoff_multiplier: f64,
186        backoff_max_interval: Duration,
187        backoff_max_elapsed_time: Duration,
188        call_timeout: Duration,
189        initialize_result: super::initialize_result::InitializeResult,
190        initial_sse_lines: Option<super::LinesStream>,
191    ) -> Self {
192        let inner = ConnectionInner::new(
193            http_client,
194            url,
195            session_id,
196            headers,
197            backoff_current_interval,
198            backoff_initial_interval,
199            backoff_randomization_factor,
200            backoff_multiplier,
201            backoff_max_interval,
202            backoff_max_elapsed_time,
203            call_timeout,
204            initialize_result,
205            initial_sse_lines,
206        )
207        .await;
208        Self { inner }
209    }
210
211    #[cfg(test)]
212    pub(crate) fn new_for_test(name: String, url: String) -> Self {
213        Self {
214            inner: ConnectionInner::new_for_test(name, url),
215        }
216    }
217
218    #[cfg(test)]
219    pub(crate) fn new_for_test_with_caps(
220        name: String,
221        url: String,
222        capabilities: super::initialize_result::ServerCapabilities,
223    ) -> Self {
224        Self {
225            inner: ConnectionInner::new_for_test_with_caps(
226                name,
227                url,
228                capabilities,
229            ),
230        }
231    }
232
233    /// Returns a key identifying this connection for tool namespacing.
234    pub fn tool_key(&self) -> String {
235        self.inner.tool_key()
236    }
237
238    /// Returns the session ID for this connection.
239    pub fn session_id(&self) -> &str {
240        self.inner.session_id()
241    }
242
243    /// Returns all tools from the upstream server.
244    pub async fn list_tools(
245        &self,
246    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
247        self.inner.list_tools().await
248    }
249
250    /// Calls a tool on the upstream server.
251    pub async fn call_tool(
252        &self,
253        params: &super::tool::CallToolRequestParams,
254    ) -> Result<super::tool::CallToolResult, super::Error> {
255        self.inner.call_tool(params).await
256    }
257
258    /// Calls a tool and converts the result into a [`ToolMessage`].
259    pub async fn call_tool_as_message(
260        &self,
261        params: &super::tool::CallToolRequestParams,
262        tool_call_id: String,
263    ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
264    {
265        self.inner.call_tool_as_message(params, tool_call_id).await
266    }
267
268    /// Returns all resources from the upstream server.
269    pub async fn list_resources(
270        &self,
271    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
272        self.inner.list_resources().await
273    }
274
275    /// Reads a resource from the upstream server.
276    pub async fn read_resource(
277        &self,
278        uri: &str,
279    ) -> Result<super::resource::ReadResourceResult, super::Error> {
280        self.inner.read_resource(uri).await
281    }
282
283    /// Register a callback to fire whenever the upstream emits
284    /// `notifications/tools/list_changed`.
285    ///
286    /// **Timing:** the callback runs *after* the tool cache's write lock
287    /// is acquired but *before* the network paginate that replaces it.
288    /// That means readers blocked on the read lock won't return until the
289    /// new list is in place, and the callback observes the moment the
290    /// staleness window opens. The proxy uses this to emit its own
291    /// `notifications/tools/list_changed` to downstream clients at the
292    /// right instant.
293    ///
294    /// Replaces any previously-registered tools-list-changed callback.
295    /// All clones of this `Connection` share the same callback slot.
296    pub fn set_on_tools_list_changed<F>(&self, callback: F)
297    where
298        F: Fn() + Send + Sync + 'static,
299    {
300        self.inner.on_tools_list_changed.set(Arc::new(callback));
301    }
302
303    /// Register a callback to fire whenever the upstream emits
304    /// `notifications/resources/list_changed`. Same timing contract as
305    /// [`Connection::set_on_tools_list_changed`].
306    ///
307    /// Replaces any previously-registered resources-list-changed callback.
308    /// All clones of this `Connection` share the same callback slot.
309    pub fn set_on_resources_list_changed<F>(&self, callback: F)
310    where
311        F: Fn() + Send + Sync + 'static,
312    {
313        self.inner.on_resources_list_changed.set(Arc::new(callback));
314    }
315
316    /// Atomically replace the connection's [`ConnectionInner::extra_headers`]
317    /// bag. Every subsequent outbound HTTP request from this connection
318    /// stamps the new map AFTER `headers`, with `HeaderMap::insert`
319    /// REPLACE semantics — keys in `extras` override the same key in
320    /// the per-URL `headers` bag set at `Client::connect`. Caller
321    /// supplies the FULL replacement map; missing keys are dropped
322    /// (no merge).
323    ///
324    /// Used by the proxy to inject session-global headers
325    /// (`X-OBJECTIVEAI-RESPONSE-ID`, `X-OBJECTIVEAI-RESPONSE-IDS`)
326    /// that re-set on every inbound `initialize`, without re-dialing
327    /// the upstream.
328    pub async fn set_extra_headers(
329        &self,
330        extras: IndexMap<String, String>,
331    ) {
332        *self.inner.extra_headers.write().await = extras;
333    }
334}
335
/// The actual connection state. Behind an `Arc` inside [`Connection`].
///
/// Fields are public for read-only access (callers reach them via
/// `Connection`'s `Deref`), but every method on this type is private.
/// The public surface lives on [`Connection`] and delegates through.
#[derive(Debug)]
pub struct ConnectionInner {
    /// HTTP client used to send this connection's requests.
    pub http_client: reqwest::Client,
    /// Upstream MCP endpoint that this connection's requests are sent to.
    pub url: String,
    /// Session id, stamped as `Mcp-Session-Id` on every outbound request
    /// (last, so no other header bag can shadow it).
    pub session_id: String,
    /// All HTTP headers stamped on every outbound request (POST / GET /
    /// DELETE) this connection makes. This is the same merged map
    /// (defaults + caller overrides) the `Client` built once during
    /// connect. `Mcp-Session-Id`, `Content-Type`, and `Accept` are still
    /// set by the request builders and override anything in `headers`.
    pub headers: IndexMap<String, String>,
    /// Mutable per-request override layer stamped AFTER `headers` on
    /// every outbound HTTP request. The request-builder uses
    /// `reqwest::header::HeaderMap::insert` semantics, so any key
    /// present in `extra_headers` REPLACES the same key in `headers`.
    /// Used by the proxy to inject session-global headers
    /// (`X-OBJECTIVEAI-RESPONSE-ID` etc.) that override per-URL
    /// values without re-dialing. Empty by default; set via
    /// [`Connection::set_extra_headers`].
    pub extra_headers: RwLock<IndexMap<String, String>>,

    // Exponential-backoff parameters, copied verbatim into the
    // `backoff::ExponentialBackoff` built by `backoff()` for retried
    // (idempotent) RPCs.
    pub backoff_current_interval: Duration,
    pub backoff_initial_interval: Duration,
    pub backoff_randomization_factor: f64,
    pub backoff_multiplier: f64,
    pub backoff_max_interval: Duration,
    pub backoff_max_elapsed_time: Duration,
    /// Per-request timeout applied to RPC POSTs and to
    /// [`Connection::delete`]'s DELETE.
    pub call_timeout: Duration,

    /// The server's capabilities and info from the initialize response.
    pub initialize_result: super::initialize_result::InitializeResult,

    /// Auto-incrementing request ID (starts at 2; 1 was used for initialize).
    next_id: AtomicU64,

    /// All tools from the server, populated by background pagination.
    ///
    /// `None` = cache cleared. This is either pre-populate (between
    /// [`Self::new`] and the first `refresh_tools_signaling`) or
    /// post-drop (the listener empties this the moment its SSE
    /// stream ends, so `list_tools` will re-paginate against the
    /// upstream rather than return stale state). `Some(_)` = last
    /// known result, `Ok` or `Err`.
    tools:
        RwLock<Option<Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>>>>,
    /// All resources from the server, populated by background pagination.
    /// Same `None`/`Some` semantics as [`Self::tools`].
    resources: RwLock<
        Option<Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>>>,
    >,

    /// Cancellation token for the long-lived `listen_for_list_changes`
    /// task. The listener selects this against every blocking await
    /// (read, reconnect-send, backoff-sleep) and returns the instant it
    /// fires.
    ///
    /// Held inside the connection as a [`DropGuard`]. The moment the
    /// last `Arc<ConnectionInner>` clone is dropped (i.e. no external
    /// `Connection` handle remains), `Drop` runs on the guard, the token
    /// cancels, and the listener task tears down. The listener itself
    /// holds a sibling `CancellationToken` (a clone), not the guard, so
    /// its task does not extend the connection's lifetime.
    ///
    /// Wrapped in `Mutex<Option<_>>` so explicit teardown paths
    /// ([`Connection::delete`]) can drop the guard in place. That fires
    /// the cancel token *before* the surrounding `Arc<ConnectionInner>`
    /// goes away. Regular drop still works: `Mutex<Option<DropGuard>>`
    /// drops its inner `DropGuard` automatically when the mutex itself
    /// drops, so the listener is still cancelled on the last `Arc` drop.
    /// Test connections store `None` here (they spawn no listener).
    _listener_cancel_guard: std::sync::Mutex<Option<DropGuard>>,

    /// Optional callback fired when the listener handles an upstream
    /// `notifications/tools/list_changed`. It fires after the tool cache's
    /// write lock is taken but before the refresh paginate replaces it;
    /// see the timing contract on [`ListChangedCallback`].
    /// Set via [`Connection::set_on_tools_list_changed`].
    on_tools_list_changed: CallbackSlot,

    /// Optional callback fired when the listener handles an upstream
    /// `notifications/resources/list_changed`. It has the same timing
    /// contract as `on_tools_list_changed` (see [`ListChangedCallback`]).
    /// Set via [`Connection::set_on_resources_list_changed`].
    on_resources_list_changed: CallbackSlot,
}
424
425impl ConnectionInner {
426    /// Creates a minimal connection for unit testing. Declares both
427    /// `tools` and `resources` capabilities with `list_changed:
428    /// Some(true)` so callers exercise the present-cap +
429    /// list_changed-enabled paths in `list_*`, `refresh_*`, and
430    /// `subscribe_*`. For other capability shapes use
431    /// `new_for_test_with_caps`.
432    #[cfg(test)]
433    fn new_for_test(name: String, url: String) -> Arc<Self> {
434        Self::new_for_test_with_caps(
435            name,
436            url,
437            super::initialize_result::ServerCapabilities {
438                experimental: None,
439                logging: None,
440                completions: None,
441                prompts: None,
442                resources: Some(
443                    super::initialize_result::ResourcesCapability {
444                        subscribe: None,
445                        list_changed: Some(true),
446                    },
447                ),
448                tools: Some(super::initialize_result::ToolsCapability {
449                    list_changed: Some(true),
450                }),
451                tasks: None,
452            },
453        )
454    }
455
456    /// Creates a minimal connection for unit testing with an explicit
457    /// `ServerCapabilities`. Used by the capability-gating tests to
458    /// drive each gate's absent-cap branch.
459    #[cfg(test)]
460    fn new_for_test_with_caps(
461        name: String,
462        url: String,
463        capabilities: super::initialize_result::ServerCapabilities,
464    ) -> Arc<Self> {
465        Arc::new(Self {
466            http_client: reqwest::Client::new(),
467            url,
468            session_id: String::new(),
469            headers: IndexMap::new(),
470            extra_headers: RwLock::new(IndexMap::new()),
471            backoff_current_interval: Duration::from_millis(500),
472            backoff_initial_interval: Duration::from_millis(500),
473            backoff_randomization_factor: 0.5,
474            backoff_multiplier: 1.5,
475            backoff_max_interval: Duration::from_secs(60),
476            backoff_max_elapsed_time: Duration::from_secs(900),
477            call_timeout: Duration::from_secs(30),
478            initialize_result: super::initialize_result::InitializeResult {
479                protocol_version: "2025-03-26".into(),
480                capabilities,
481                server_info: super::initialize_result::Implementation {
482                    name,
483                    title: None,
484                    version: "0.0.0".into(),
485                    website_url: None,
486                    description: None,
487                    icons: None,
488                },
489                instructions: None,
490                _meta: None,
491            },
492            next_id: AtomicU64::new(2),
493            // Test connection has no listener and never refreshes; seed
494            // with an empty Ok so `list_tools` doesn't try to paginate.
495            tools: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
496            resources: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
497            _listener_cancel_guard: std::sync::Mutex::new(None),
498            on_tools_list_changed: CallbackSlot::new(),
499            on_resources_list_changed: CallbackSlot::new(),
500        })
501    }
502
503    /// Creates a new connection and spawns background tasks to paginate
504    /// all tools and resources. Called internally by
505    /// [`Client::connect`](super::Client::connect) (via [`Connection::new`]).
506    ///
507    /// `initial_sse_lines`, if `Some`, is a pre-opened SSE line reader
508    /// that the list-changed listener will read from immediately on its
509    /// first iteration, instead of opening its own GET `/`. The caller
510    /// is responsible for arranging for one of these to exist whenever
511    /// the upstream advertises `tools.list_changed` or
512    /// `resources.list_changed` — see
513    /// [`Client::connect`](super::Client::connect).
514    async fn new(
515        http_client: reqwest::Client,
516        url: String,
517        session_id: String,
518        headers: IndexMap<String, String>,
519        backoff_current_interval: Duration,
520        backoff_initial_interval: Duration,
521        backoff_randomization_factor: f64,
522        backoff_multiplier: f64,
523        backoff_max_interval: Duration,
524        backoff_max_elapsed_time: Duration,
525        call_timeout: Duration,
526        initialize_result: super::initialize_result::InitializeResult,
527        initial_sse_lines: Option<super::LinesStream>,
528    ) -> Arc<Self> {
529        // Cancel-the-listener machinery: store the DropGuard inside the
530        // inner so the cancellation fires deterministically when the
531        // last external `Arc<ConnectionInner>` clone drops. Hand the
532        // listener task a sibling clone (no guard) — that way the
533        // listener task's lifetime does not extend the connection.
534        let listener_cancel = CancellationToken::new();
535        let listener_cancel_for_task = listener_cancel.clone();
536        let conn = Arc::new(Self {
537            http_client,
538            url,
539            session_id,
540            headers,
541            extra_headers: RwLock::new(IndexMap::new()),
542            backoff_current_interval,
543            backoff_initial_interval,
544            backoff_randomization_factor,
545            backoff_multiplier,
546            backoff_max_interval,
547            backoff_max_elapsed_time,
548            call_timeout,
549            initialize_result,
550            next_id: AtomicU64::new(2),
551            // Start empty; `refresh_tools_signaling` below installs
552            // `Some(_)` before `new` returns (the lock-handoff oneshot
553            // gates the return on the writer holding the lock).
554            tools: RwLock::new(None),
555            resources: RwLock::new(None),
556            _listener_cancel_guard: std::sync::Mutex::new(Some(
557                listener_cancel.drop_guard(),
558            )),
559            on_tools_list_changed: CallbackSlot::new(),
560            on_resources_list_changed: CallbackSlot::new(),
561        });
562
563        // Spawn background tool lister if the server supports tools.
564        //
565        // We don't return until the spawned task has acquired the write
566        // lock. Otherwise a caller that immediately reads `list_tools()`
567        // could race the writer — `tokio::spawn` only queues the task,
568        // and a fast reader can acquire the read lock before the writer
569        // has run its first instruction. The reader would then see the
570        // initial empty `Vec` and return that, even though a full
571        // populate is in flight.
572        //
573        // The `RwLockWriteGuard` itself isn't `Send`-friendly enough to
574        // pass back, so we use a oneshot to signal "I'm holding the
575        // lock now"; once we receive that, the cache is exclusively
576        // owned by the writer and any subsequent `read().await` from
577        // the caller is guaranteed to wait for the populate to finish.
578        if conn.initialize_result.capabilities.tools.is_some() {
579            let conn = Arc::clone(&conn);
580            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
581            tokio::spawn(async move {
582                conn.refresh_tools_signaling(lock_held_tx, None).await;
583            });
584            // Wait for the writer to hold the lock before returning.
585            let _ = lock_held_rx.await;
586        }
587
588        // Spawn background resource lister if the server supports
589        // resources. Same lock-handoff contract as tools above.
590        if conn.initialize_result.capabilities.resources.is_some() {
591            let conn = Arc::clone(&conn);
592            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
593            tokio::spawn(async move {
594                conn.refresh_resources_signaling(lock_held_tx, None).await;
595            });
596            let _ = lock_held_rx.await;
597        }
598
599        // Spawn the list-changed listener iff the caller handed us a
600        // pre-opened SSE stream. The connection is naive about
601        // `tools.list_changed` / `resources.list_changed` capabilities —
602        // [`Client::connect`](super::Client::connect) translates them
603        // into "did or didn't open a stream for us." If we get a stream,
604        // we listen on it; if we don't, there's nothing to listen for.
605        if let Some(initial_lines) = initial_sse_lines {
606            // Hand the listener a `Weak` so the spawned task itself does
607            // not keep the connection alive. `listener_cancel_for_task`
608            // is a sibling clone of the connection's own
609            // `_listener_cancel_guard` token — when the last external
610            // `Arc<ConnectionInner>` clone is dropped, the inner's Drop
611            // releases the guard and the listener wakes from any
612            // pending await (read, send, sleep) and exits immediately.
613            let weak = Arc::downgrade(&conn);
614            tokio::spawn(async move {
615                Self::listen_for_list_changes(
616                    weak,
617                    listener_cancel_for_task,
618                    initial_lines,
619                )
620                .await;
621            });
622        }
623
624        conn
625    }
626
627    /// Server declared a `tools` capability in its `InitializeResult`.
628    /// Gates `list_tools`, `call_tool`, `refresh_tools`. When `false` the
629    /// upstream cannot service `tools/*` RPCs at all and any attempt would
630    /// either hang in idempotent backoff (`tools/list`) or fail with
631    /// `SessionExpired` (`tools/call`).
632    fn has_tools_cap(&self) -> bool {
633        self.initialize_result.capabilities.tools.is_some()
634    }
635
636    /// Server declared a `resources` capability in its `InitializeResult`.
637    /// Gates `list_resources`, `read_resource`, `refresh_resources`, and
638    /// the post-`call_tool` ResourceLink resolution. Same hang-or-fail
639    /// shape as `has_tools_cap` when absent.
640    fn has_resources_cap(&self) -> bool {
641        self.initialize_result.capabilities.resources.is_some()
642    }
643
644    /// Creates an exponential backoff configuration from the connection's fields.
645    fn backoff(&self) -> backoff::ExponentialBackoff {
646        backoff::ExponentialBackoff {
647            current_interval: self.backoff_current_interval,
648            initial_interval: self.backoff_initial_interval,
649            randomization_factor: self.backoff_randomization_factor,
650            multiplier: self.backoff_multiplier,
651            max_interval: self.backoff_max_interval,
652            start_time: std::time::Instant::now(),
653            max_elapsed_time: Some(self.backoff_max_elapsed_time),
654            clock: backoff::SystemClock::default(),
655        }
656    }
657
658    /// Builds a POST request with all required headers and the call timeout.
659    async fn post(&self) -> reqwest::RequestBuilder {
660        self.http_client
661            .post(&self.url)
662            .timeout(self.call_timeout)
663            .headers(
664                self.build_request_headers(
665                    Some("application/json"),
666                    Some("application/json, text/event-stream"),
667                )
668                .await,
669            )
670    }
671
672    /// Build the `HeaderMap` stamped on every outbound request. Order
673    /// of insertion drives override semantics — `HeaderMap::insert`
674    /// REPLACES existing values for the same key:
675    ///
676    /// 1. Content-Type / Accept (when supplied by the caller).
677    /// 2. `self.headers` (the per-URL bag set at `Client::connect`).
678    /// 3. `self.extra_headers` (the mutable, session-global overrides
679    ///    — proxies use this for `X-OBJECTIVEAI-RESPONSE-ID` etc).
680    /// 4. `Mcp-Session-Id` (the connection's own session id, always
681    ///    last so it can never be shadowed).
682    async fn build_request_headers(
683        &self,
684        content_type: Option<&str>,
685        accept: Option<&str>,
686    ) -> reqwest::header::HeaderMap {
687        use reqwest::header::{
688            ACCEPT, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue,
689        };
690        let mut hmap = HeaderMap::new();
691        if let Some(ct) = content_type {
692            if let Ok(hv) = HeaderValue::from_str(ct) {
693                hmap.insert(CONTENT_TYPE, hv);
694            }
695        }
696        if let Some(a) = accept {
697            if let Ok(hv) = HeaderValue::from_str(a) {
698                hmap.insert(ACCEPT, hv);
699            }
700        }
701        for (k, v) in &self.headers {
702            if let (Ok(hn), Ok(hv)) = (
703                HeaderName::try_from(k.as_str()),
704                HeaderValue::from_str(v),
705            ) {
706                hmap.insert(hn, hv);
707            }
708        }
709        let extras = self.extra_headers.read().await;
710        for (k, v) in extras.iter() {
711            if let (Ok(hn), Ok(hv)) = (
712                HeaderName::try_from(k.as_str()),
713                HeaderValue::from_str(v),
714            ) {
715                hmap.insert(hn, hv);
716            }
717        }
718        drop(extras);
719        if let Ok(hv) = HeaderValue::from_str(&self.session_id) {
720            hmap.insert(HeaderName::from_static("mcp-session-id"), hv);
721        }
722        hmap
723    }
724
725    /// Sends a JSON-RPC request, retrying transient errors when
726    /// `idempotent` is `true`.
727    ///
728    /// Idempotent methods (`tools/list`, `resources/list`,
729    /// `resources/read`, etc.) retry every transient error — network,
730    /// HTTP status, malformed body, JSON-RPC error, session expiration —
731    /// until the backoff's `max_elapsed_time` is exceeded.
732    ///
733    /// Non-idempotent methods (`tools/call`) make exactly one attempt.
734    /// Retrying a `tools/call` is unsafe: a tool may have mutated remote
735    /// state during the first attempt before the response was lost, and
736    /// re-firing the call would mutate state again. Each retry of
737    /// `AppendTask` advances `state.tasks.len()` an extra step, so the
738    /// agent sees a different return value than expected and the
739    /// pid-derived mock seed at the next step diverges. See
740    /// `objectiveai-api/src/agent/completions/client.rs` (sequential
741    /// dispatch) and `mock/client.rs::mock.seed_derive` for the
742    /// downstream consequence.
743    async fn rpc<P: serde::Serialize, R: serde::de::DeserializeOwned>(
744        &self,
745        method: &str,
746        params: &P,
747        idempotent: bool,
748    ) -> Result<R, super::Error> {
749        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
750        let body = serde_json::json!({
751            "jsonrpc": "2.0",
752            "id": id,
753            "method": method,
754            "params": params,
755        });
756
757        let attempt_one = || async {
758            let url = self.url.clone();
759            let response =
760                self.post().await.json(&body).send().await.map_err(|source| {
761                    backoff::Error::transient(super::Error::Request {
762                        url: url.clone(),
763                        source,
764                    })
765                })?;
766
767            if response.status() == reqwest::StatusCode::NOT_FOUND {
768                return Err(backoff::Error::transient(
769                    super::Error::SessionExpired { url: url.clone() },
770                ));
771            }
772            if !response.status().is_success() {
773                let code = response.status();
774                let body = response.text().await.unwrap_or_default();
775                return Err(backoff::Error::transient(
776                    super::Error::BadStatus {
777                        url: url.clone(),
778                        code,
779                        body,
780                    },
781                ));
782            }
783
784            let rpc_response: super::JsonRpcResponse<R> =
785                super::parse_streamable_http_response(&url, response)
786                    .await
787                    .map_err(backoff::Error::transient)?;
788
789            match rpc_response {
790                super::JsonRpcResponse::Success { result, .. } => Ok(result),
791                super::JsonRpcResponse::Error { error, .. } => {
792                    Err(backoff::Error::transient(super::Error::JsonRpc {
793                        url: url.clone(),
794                        code: error.code,
795                        message: error.message,
796                        data: error.data,
797                    }))
798                }
799            }
800        };
801
802        if idempotent {
803            backoff::future::retry(self.backoff(), attempt_one).await
804        } else {
805            attempt_one().await.map_err(|e| match e {
806                backoff::Error::Permanent(err)
807                | backoff::Error::Transient { err, .. } => err,
808            })
809        }
810    }
811
812    /// Returns a key identifying this connection for tool namespacing.
813    fn tool_key(&self) -> String {
814        format!("{}-{}", self.initialize_result.server_info.name, self.url)
815    }
816
817    /// Returns the session ID for this connection.
818    fn session_id(&self) -> &str {
819        &self.session_id
820    }
821
822    /// Sends a `tools/list` RPC call for a single page.
823    async fn rpc_list_tools(
824        &self,
825        cursor: Option<&str>,
826    ) -> Result<super::tool::ListToolsResult, super::Error> {
827        self.rpc(
828            "tools/list",
829            &super::tool::ListToolsRequest {
830                cursor: cursor.map(String::from),
831            },
832            true,
833        )
834        .await
835    }
836
837    /// Returns all tools from the server.
838    ///
839    /// Blocks until background pagination completes, then returns a
840    /// cheap `Arc` clone of the result. If the cache is currently
841    /// empty (e.g. because the listener detected its SSE stream drop
842    /// and cleared it) this paginates inline against the upstream —
843    /// the caller gets fresh data on the happy path, or the live
844    /// upstream error if the connection is genuinely down, rather
845    /// than stale pre-drop tools.
846    async fn list_tools(
847        &self,
848    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
849        if !self.has_tools_cap() {
850            return Ok(Arc::new(Vec::new()));
851        }
852        if let Some(cached) = self.tools.read().await.as_ref() {
853            return cached.clone();
854        }
855        // Cache cleared; refresh inline. Concurrent callers may each
856        // refresh — wasteful, but the proxy fans out across distinct
857        // upstreams so a single Connection rarely sees concurrent
858        // `list_tools` calls.
859        self.refresh_tools(None).await;
860        self.tools
861            .read()
862            .await
863            .as_ref()
864            .expect("refresh_tools installs Some")
865            .clone()
866    }
867
868    /// Calls a tool on the MCP server. The returned
869    /// `CallToolResult.content` is **fully resolved**: every
870    /// `ContentBlock::ResourceLink { uri }` whose URI appears in
871    /// `list_resources` has already been replaced by one or more
872    /// `ContentBlock::EmbeddedResource` blocks carrying the fetched
873    /// contents (via `read_resource`). Unknown-URI links pass through
874    /// untouched — the upstream server may have its own out-of-band
875    /// resolution path the caller should preserve.
876    ///
877    /// Resolving inside `call_tool` (rather than downstream in
878    /// `call_tool_as_message`) means every consumer of the result
879    /// sees `EmbeddedResource` shapes uniformly; the stateless
880    /// `From<ContentBlock>` impl is then enough to convert the whole
881    /// result to `RichContent` with no further connection work.
882    async fn call_tool(
883        &self,
884        params: &super::tool::CallToolRequestParams,
885    ) -> Result<super::tool::CallToolResult, super::Error> {
886        if !self.has_tools_cap() {
887            return Err(super::Error::UnsupportedCapability {
888                capability: "tools",
889            });
890        }
891        let mut result: super::tool::CallToolResult =
892            self.rpc("tools/call", params, false).await?;
893
894        // Build the known-resource URI set for ResourceLink
895        // resolution. `list_resources` failure → empty set (same
896        // safe fallback the resolution path used previously).
897        let known_uris: std::collections::HashSet<String> =
898            match self.list_resources().await {
899                Ok(rs) => rs.iter().map(|r| r.uri.clone()).collect(),
900                Err(_) => std::collections::HashSet::new(),
901            };
902
903        // Walk the blocks, replacing each resolvable ResourceLink
904        // with one EmbeddedResource per returned ResourceContentsUnion.
905        // Everything else (Text, Image, Audio, EmbeddedResource,
906        // unknown-URI ResourceLinks) passes through.
907        let mut resolved: Vec<super::tool::ContentBlock> =
908            Vec::with_capacity(result.content.len());
909        for block in std::mem::take(&mut result.content) {
910            match block {
911                super::tool::ContentBlock::ResourceLink(link)
912                    if known_uris.contains(&link.uri) =>
913                {
914                    let read = self.read_resource(&link.uri).await?;
915                    for contents in read.contents {
916                        resolved.push(
917                            super::tool::ContentBlock::EmbeddedResource(
918                                super::tool::EmbeddedResource {
919                                    resource: contents,
920                                    // ResourceLink's annotations
921                                    // don't have a perfect home on
922                                    // EmbeddedResource — both fields
923                                    // exist but they describe different
924                                    // shapes (the link vs the inlined
925                                    // contents). Drop them on the way
926                                    // in; the EmbeddedResource is now
927                                    // a fresh authoritative block.
928                                    annotations: None,
929                                    _meta: None,
930                                },
931                            ),
932                        );
933                    }
934                }
935                other => resolved.push(other),
936            }
937        }
938        result.content = resolved;
939        Ok(result)
940    }
941
942    /// Calls a tool and converts the (already-resolved) result into a
943    /// [`ToolMessage`]. Resource resolution happens inside
944    /// [`Self::call_tool`] — by the time we get the blocks here every
945    /// resolvable `ResourceLink` has already been replaced with an
946    /// `EmbeddedResource`, so the conversion is a pure stateless
947    /// element-wise map through [`From<ContentBlock> for
948    /// RichContentPart`](crate::agent::completions::message::RichContentPart).
949    ///
950    /// Content-block mapping (handled by the `From` impl):
951    /// - `text` → text part
952    /// - `image` → image_url part (data URL)
953    /// - `audio` → input_audio part
954    /// - `embedded_resource` (text) → text part
955    /// - `embedded_resource` (blob, image mime) → image_url part
956    /// - `embedded_resource` (blob, audio mime) → input_audio part
957    /// - `embedded_resource` (blob, video mime) → input_video part
958    /// - `embedded_resource` (blob, other mime) → file part
959    /// - `resource_link` (unknown URI) → JSON-text fallback
960    ///   (resolvable URIs were already resolved upstream)
961    async fn call_tool_as_message(
962        &self,
963        params: &super::tool::CallToolRequestParams,
964        tool_call_id: String,
965    ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
966    {
967        use crate::agent::completions::message::{
968            RichContentPart, ToolMessage, ToolResponseMetadata,
969        };
970
971        let result = self.call_tool(params).await?;
972
973        let parts: Vec<RichContentPart> =
974            result.content.into_iter().map(Into::into).collect();
975
976        // Lossy-decode the MCP `_meta` extension bag into our typed
977        // `ToolResponseMetadata`. Unknown keys (set by non-objectiveai
978        // upstreams) are silently dropped. Decoding failure leaves
979        // metadata as `None`.
980        let metadata = result._meta.as_ref().and_then(|m| {
981            serde_json::from_value::<ToolResponseMetadata>(
982                serde_json::to_value(m).ok()?,
983            )
984            .ok()
985        });
986
987        Ok(ToolMessage {
988            content: parts.into(),
989            tool_call_id,
990            metadata,
991        })
992    }
993
994    /// Sends a `resources/list` RPC call for a single page.
995    async fn rpc_list_resources(
996        &self,
997        cursor: Option<&str>,
998    ) -> Result<super::resource::ListResourcesResult, super::Error> {
999        self.rpc(
1000            "resources/list",
1001            &super::resource::ListResourcesRequest {
1002                cursor: cursor.map(String::from),
1003            },
1004            true,
1005        )
1006        .await
1007    }
1008
1009    /// Returns all resources from the server.
1010    ///
1011    /// Same cache-or-refresh semantics as [`Self::list_tools`]: a
1012    /// cleared cache (post-drop or pre-populate) triggers an inline
1013    /// paginate against the upstream.
1014    async fn list_resources(
1015        &self,
1016    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1017        if !self.has_resources_cap() {
1018            return Ok(Arc::new(Vec::new()));
1019        }
1020        if let Some(cached) = self.resources.read().await.as_ref() {
1021            return cached.clone();
1022        }
1023        self.refresh_resources(None).await;
1024        self.resources
1025            .read()
1026            .await
1027            .as_ref()
1028            .expect("refresh_resources installs Some")
1029            .clone()
1030    }
1031
1032    /// Reads a resource from the MCP server.
1033    async fn read_resource(
1034        &self,
1035        uri: &str,
1036    ) -> Result<super::resource::ReadResourceResult, super::Error> {
1037        if !self.has_resources_cap() {
1038            return Err(super::Error::UnsupportedCapability {
1039                capability: "resources",
1040            });
1041        }
1042        self.rpc(
1043            "resources/read",
1044            &super::resource::ReadResourceRequestParams {
1045                uri: uri.to_string(),
1046            },
1047            true,
1048        )
1049        .await
1050    }
1051
1052    /// Re-fetches all tools from the server, replacing the cached list.
1053    ///
1054    /// Optionally fires `on_change` *after* the write lock is acquired but
1055    /// *before* the network paginate begins, so the callback observes the
1056    /// "list change is in flight" edge — readers blocked on the read lock
1057    /// won't return until the new list lands. The proxy uses this to
1058    /// re-emit `notifications/tools/list_changed` to its downstream client
1059    /// at the moment the staleness window opens.
1060    async fn refresh_tools(&self, on_change: Option<ListChangedCallback>) {
1061        if !self.has_tools_cap() {
1062            // No tools capability — install an empty Vec so the cache
1063            // contract holds (`list_tools`'s `.expect("refresh_tools
1064            // installs Some")` etc.) and return without paginating or
1065            // signalling. No `notify_waiters` and no `on_change` —
1066            // nothing real changed.
1067            let mut guard = self.tools.write().await;
1068            *guard = Some(Ok(Arc::new(Vec::new())));
1069            return;
1070        }
1071        // Listener-driven refresh. Visibility contract: any caller
1072        // that issues `list_tools()` after a `tools/list_changed`
1073        // notification has been observed must see the post-swap
1074        // value, not stale data — so the write lock has to gate
1075        // readers across the upstream paginate.
1076        //
1077        // Performance contract: don't serialise paginate *behind*
1078        // lock-acquisition latency. We start `tools.write()` and the
1079        // upstream paginate **concurrently** with `tokio::join!`. The
1080        // write-lock acquire blocks new `list_tools()` readers
1081        // immediately (preserving visibility) and runs in parallel
1082        // with whatever drain time the in-flight readers need; the
1083        // paginate runs alongside. Total wall-clock is
1084        // `max(drain_time, paginate_time)` instead of the sum.
1085        //
1086        // `on_change` fires under the write guard, *after* `*guard =
1087        // result`, so anyone the callback wakes queues on the read lock,
1088        // waits for the guard to drop, and observes the post-swap state.
1089        let (mut guard, result) =
1090            tokio::join!(self.tools.write(), self.paginate_tools(),);
1091        *guard = Some(result);
1092        if let Some(cb) = on_change {
1093            cb();
1094        }
1095    }
1096
1097    /// Page-by-page fetch of the upstream tool list, no locks held.
1098    /// Shared between the `_signaling` (initial-populate, holds lock
1099    /// for the original "block fast readers" contract) and `refresh_*`
1100    /// (listener-driven, lock-only-around-install) variants.
1101    async fn paginate_tools(
1102        &self,
1103    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1104        let mut all_tools = Vec::new();
1105        let mut cursor: Option<String> = None;
1106        loop {
1107            match self.rpc_list_tools(cursor.as_deref()).await {
1108                Ok(page) => {
1109                    all_tools.extend(page.tools);
1110                    cursor = page.next_cursor;
1111                    if cursor.is_none() {
1112                        return Ok(Arc::new(all_tools));
1113                    }
1114                }
1115                Err(e) => return Err(Arc::new(e)),
1116            }
1117        }
1118    }
1119
1120    /// Same as [`Self::refresh_tools`] but fires `lock_held` once the
1121    /// write lock has been acquired so the caller can synchronise on
1122    /// "writer is in possession of the cache" before returning. Used by
1123    /// `ConnectionInner::new` to prevent a fast reader from acquiring
1124    /// the read lock before this writer has even started.
1125    ///
1126    /// **Caller invariant:** the spawn site in `ConnectionInner::new`
1127    /// must gate this call on `capabilities.tools.is_some()`. This
1128    /// method assumes the tools capability is present and issues
1129    /// `tools/list` RPCs unconditionally — running it against a
1130    /// no-tools server triggers the 15-min idempotent-backoff storm.
1131    async fn refresh_tools_signaling(
1132        &self,
1133        lock_held: tokio::sync::oneshot::Sender<()>,
1134        on_change: Option<ListChangedCallback>,
1135    ) {
1136        let mut guard = self.tools.write().await;
1137        // Signal `lock_held` while we hold the write lock and *before*
1138        // installing the new list, so the `on_change` callback observes
1139        // the "list change in flight" edge — its readers queue behind
1140        // this write guard and always see the post-swap state.
1141        let _ = lock_held.send(());
1142        if let Some(cb) = on_change {
1143            cb();
1144        }
1145        let mut all_tools = Vec::new();
1146        let mut cursor: Option<String> = None;
1147        let result = loop {
1148            match self.rpc_list_tools(cursor.as_deref()).await {
1149                Ok(page) => {
1150                    all_tools.extend(page.tools);
1151                    cursor = page.next_cursor;
1152                    if cursor.is_none() {
1153                        break Ok(Arc::new(all_tools));
1154                    }
1155                }
1156                Err(e) => break Err(Arc::new(e)),
1157            }
1158        };
1159        *guard = Some(result);
1160    }
1161
1162    /// Re-fetches all resources from the server, replacing the cached list.
1163    /// See [`ConnectionInner::refresh_tools`] for the callback timing
1164    /// contract.
1165    async fn refresh_resources(&self, on_change: Option<ListChangedCallback>) {
1166        if !self.has_resources_cap() {
1167            // Symmetric to `refresh_tools` — see that gate.
1168            let mut guard = self.resources.write().await;
1169            *guard = Some(Ok(Arc::new(Vec::new())));
1170            return;
1171        }
1172        // Same paginate-while-acquiring-the-write-lock pattern as
1173        // `refresh_tools` — see that comment for the visibility +
1174        // performance rationale.
1175        let (mut guard, result) =
1176            tokio::join!(self.resources.write(), self.paginate_resources(),);
1177        *guard = Some(result);
1178        if let Some(cb) = on_change {
1179            cb();
1180        }
1181    }
1182
1183    async fn paginate_resources(
1184        &self,
1185    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1186        let mut all_resources = Vec::new();
1187        let mut cursor: Option<String> = None;
1188        loop {
1189            match self.rpc_list_resources(cursor.as_deref()).await {
1190                Ok(page) => {
1191                    all_resources.extend(page.resources);
1192                    cursor = page.next_cursor;
1193                    if cursor.is_none() {
1194                        return Ok(Arc::new(all_resources));
1195                    }
1196                }
1197                Err(e) => return Err(Arc::new(e)),
1198            }
1199        }
1200    }
1201
1202    /// Resource counterpart of [`Self::refresh_tools_signaling`]. The
1203    /// same spawn-site-gate invariant applies: the caller must gate
1204    /// the spawn on `capabilities.resources.is_some()`.
1205    async fn refresh_resources_signaling(
1206        &self,
1207        lock_held: tokio::sync::oneshot::Sender<()>,
1208        on_change: Option<ListChangedCallback>,
1209    ) {
1210        let mut guard = self.resources.write().await;
1211        // See `refresh_tools_signaling` — signal `lock_held` under the
1212        // write lock, before install, so the `on_change` callback's
1213        // readers see the post-swap state.
1214        let _ = lock_held.send(());
1215        if let Some(cb) = on_change {
1216            cb();
1217        }
1218        let mut all_resources = Vec::new();
1219        let mut cursor: Option<String> = None;
1220        let result = loop {
1221            match self.rpc_list_resources(cursor.as_deref()).await {
1222                Ok(page) => {
1223                    all_resources.extend(page.resources);
1224                    cursor = page.next_cursor;
1225                    if cursor.is_none() {
1226                        break Ok(Arc::new(all_resources));
1227                    }
1228                }
1229                Err(e) => break Err(Arc::new(e)),
1230            }
1231        };
1232        *guard = Some(result);
1233    }
1234
1235    /// Builds a GET request to the MCP endpoint for receiving server
1236    /// notifications via SSE.
1237    async fn get(&self) -> reqwest::RequestBuilder {
1238        self.http_client
1239            .get(&self.url)
1240            .headers(
1241                self.build_request_headers(None, Some("text/event-stream"))
1242                    .await,
1243            )
1244    }
1245
1246    /// Listens for `notifications/tools/list_changed` and
1247    /// `notifications/resources/list_changed` on an SSE stream. On each
1248    /// notification, write-locks and re-fetches the full list.
1249    ///
1250    /// `initial_lines` is the pre-opened SSE line reader handed in by
1251    /// [`Client::connect`](super::Client::connect) — that stream is
1252    /// consumed first. When it ends (or any later GET reconnect ends),
1253    /// we sleep `backoff_initial_interval` and open a fresh GET `/` SSE
1254    /// stream.
1255    ///
1256    /// Takes a [`Weak<Self>`] (not `Arc<Self>`) so the spawned task
1257    /// doesn't itself keep the [`Connection`] alive, and a
1258    /// [`CancellationToken`] sibling clone of the connection's
1259    /// [`DropGuard`] so the task tears down the instant the last
1260    /// external `Arc<ConnectionInner>` clone is dropped — every
1261    /// blocking await (line read, reconnect send, backoff sleep) is
1262    /// raced against `cancel.cancelled()` and exits without any zombie
1263    /// retries against a now-dead session.
1264    async fn listen_for_list_changes(
1265        weak: Weak<Self>,
1266        cancel: CancellationToken,
1267        initial_lines: super::LinesStream,
1268    ) {
1269        // First iteration: use the pre-opened SSE stream the client
1270        // handed us. After that, fall back to opening fresh GET / SSE
1271        // streams as the upstream connection cycles.
1272        let mut next_lines: Option<super::LinesStream> = Some(initial_lines);
1273        // One-shot guard for the catch-up refresh: false on the very
1274        // first iteration (the caller's pre-opened SSE stream — its
1275        // associated cache was just populated by `Client::connect`'s
1276        // initial pagination, so re-fetching there would just be a
1277        // wasted round-trip), true thereafter. Every stream end —
1278        // whether `Ok(None)` (clean close) or `Err(_)` (read failure)
1279        // — drops back here, which we treat as an implicit
1280        // list-changed notification: the upstream's broadcast (in
1281        // particular the proxy's per-session `tokio::broadcast`) is
1282        // lossy for moments when this listener has zero active
1283        // subscribers, so anything that fired during our disconnect
1284        // window may have been dropped.
1285        //
1286        // ORDER MATTERS. The refresh must run AFTER we've re-opened
1287        // the GET / SSE stream — i.e. after we're a subscriber again
1288        // — and BEFORE we enter the inner read loop. If we refreshed
1289        // before the resubscribe, a notification that fired between
1290        // our refresh-completion and our subscribe would be lost the
1291        // same way as the original disconnect-window drops; doing it
1292        // after means a notification fired DURING the refresh lands
1293        // in the new subscriber's buffer (broadcast::Sender::send
1294        // backs onto each receiver's channel-capacity slot) and gets
1295        // consumed by the inner loop on its next read.
1296        let mut is_reconnect = false;
1297
1298        loop {
1299            // The token cancels deterministically when the last
1300            // `Arc<ConnectionInner>` clone is dropped (see
1301            // `_listener_cancel_guard`). Check once per outer
1302            // iteration, but the real protection is the cancel arms in
1303            // every blocking await below — those exit immediately on
1304            // cancel.
1305            if cancel.is_cancelled() {
1306                return;
1307            }
1308            let Some(this) = weak.upgrade() else { return };
1309            let backoff_delay = this.backoff_initial_interval;
1310
1311            let mut lines = match next_lines.take() {
1312                Some(l) => l,
1313                None => {
1314                    // Race the upstream GET against cancellation — if
1315                    // the connection drops mid-reconnect, exit
1316                    // immediately rather than waiting for the request
1317                    // to complete or time out (otherwise produces a
1318                    // burst of 401 retries against a now-dead session
1319                    // under heavy churn).
1320                    let send_outcome = tokio::select! {
1321                        out = async {
1322                            this.get().await.send().await
1323                        } => out,
1324                        _ = cancel.cancelled() => {
1325                            drop(this);
1326                            return;
1327                        }
1328                    };
1329                    let response = match send_outcome {
1330                        Ok(r) if r.status().is_success() => r,
1331                        _ => {
1332                            drop(this);
1333                            // Sleep with cancel-arm: instant exit on
1334                            // drop, no zombie retries.
1335                            tokio::select! {
1336                                _ = tokio::time::sleep(backoff_delay) => {}
1337                                _ = cancel.cancelled() => return,
1338                            }
1339                            continue;
1340                        }
1341                    };
1342                    super::lines_from_response(response)
1343                }
1344            };
1345
1346            // Catch-up refresh on every reconnect — the implicit
1347            // list-changed treatment for the just-failed stream. See
1348            // the `is_reconnect` doc-comment above for the
1349            // refresh-AFTER-resubscribe rationale.
1350            if is_reconnect {
1351                // tools and resources are independent locks; run the
1352                // catch-up refreshes concurrently so disconnect
1353                // recovery isn't sequential.
1354                let _ = tokio::join!(
1355                    this.refresh_tools(this.on_tools_list_changed.get()),
1356                    this.refresh_resources(
1357                        this.on_resources_list_changed.get()
1358                    ),
1359                );
1360            }
1361            is_reconnect = true;
1362
1363            'inner: loop {
1364                tokio::select! {
1365                    line_result = lines.next_line() => {
1366                        match line_result {
1367                            Ok(Some(line)) => {
1368                                // SSE data lines start with "data: ".
1369                                let Some(data) = line.strip_prefix("data: ") else {
1370                                    continue 'inner;
1371                                };
1372                                let method = match serde_json::from_str::<super::JsonRpcNotification>(data) {
1373                                    Ok(n) => n.method,
1374                                    Err(_) => continue 'inner,
1375                                };
1376                                match method.as_str() {
1377                                    "notifications/tools/list_changed" => {
1378                                        // refresh_tools fires the
1379                                        // callback after the cache is
1380                                        // installed, so the proxy's
1381                                        // downstream
1382                                        // notifications/tools/list_changed
1383                                        // emission lines up with the
1384                                        // staleness window opening.
1385                                        this.refresh_tools(
1386                                            this.on_tools_list_changed.get(),
1387                                        )
1388                                        .await;
1389                                    }
1390                                    "notifications/resources/list_changed" => {
1391                                        this.refresh_resources(
1392                                            this.on_resources_list_changed.get(),
1393                                        )
1394                                        .await;
1395                                    }
1396                                    _ => {}
1397                                }
1398                            }
1399                            // Stream ended cleanly or errored — break out
1400                            // to the outer loop so we either reconnect or,
1401                            // if everyone's gone, exit at the top.
1402                            _ => break 'inner,
1403                        }
1404                    }
1405                    // Cancellation: the connection's last clone has
1406                    // dropped. Tear down immediately.
1407                    _ = cancel.cancelled() => {
1408                        drop(this);
1409                        return;
1410                    }
1411                }
1412            }
1413
1414            // Stream dropped — empty the per-Connection caches so the
1415            // next `list_tools` / `list_resources` paginates inline
1416            // against the (possibly still-dead) upstream rather than
1417            // returning whatever was cached before the drop. The
1418            // `is_reconnect` catch-up at the top of the next outer
1419            // iteration will repopulate `Some(_)` if the reconnect
1420            // succeeds; if the reconnect keeps failing, the cache
1421            // stays `None` and `list_*` callers paginate themselves.
1422            *this.tools.write().await = None;
1423            *this.resources.write().await = None;
1424
1425            // Stream ended — drop the strong ref before sleeping so the
1426            // next iteration's weak-upgrade can detect liveness honestly.
1427            drop(this);
1428            tokio::select! {
1429                _ = tokio::time::sleep(backoff_delay) => {}
1430                _ = cancel.cancelled() => return,
1431            }
1432        }
1433    }
1434}
1435
#[cfg(test)]
mod capability_gate_tests {
    //! Capability-gate tests for [`Connection`].
    //!
    //! Every test builds a connection whose server declared neither the
    //! `tools` nor the `resources` capability. The tests check the following:
    //!
    //! - `list_*` short-circuit to an empty list before the cache is consulted.
    //! - `call_tool` / `read_resource` fail with `UnsupportedCapability`.
    //! - `refresh_*` install an empty list without paginating.

    use super::*;
    use crate::mcp::initialize_result::{
        ResourcesCapability, ServerCapabilities, ToolsCapability,
    };
    use crate::mcp::tool::CallToolRequestParams;

    /// Builds a `ServerCapabilities` with the given `tools` / `resources`
    /// shapes and every other capability set to `None`. Each gate test
    /// passes its own combination to exercise a specific cap-absent
    /// branch.
    fn caps(
        tools: Option<ToolsCapability>,
        resources: Option<ResourcesCapability>,
    ) -> ServerCapabilities {
        ServerCapabilities {
            experimental: None,
            logging: None,
            completions: None,
            prompts: None,
            resources,
            tools,
            tasks: None,
        }
    }

    /// Test connection whose server declared no `tools` and no
    /// `resources` capability, so every gated entry point must
    /// short-circuit. The URL is a dummy; no test expects a request to
    /// reach it.
    fn conn_without_caps() -> Connection {
        Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        )
    }

    /// An arbitrary cached error used to poison a cache slot. If a gate
    /// consulted the cache instead of short-circuiting, the caller would
    /// observe this `Err` rather than the expected empty list.
    fn poisoned() -> Arc<super::super::Error> {
        Arc::new(super::super::Error::NoSessionId {
            url: "http://x".into(),
            body: String::new(),
        })
    }

    /// 3.1 — `list_tools` short-circuits to `Ok(empty)` when the server
    /// declared no `tools` capability, *before* the cache is consulted.
    /// We poison the cache with `Err` to prove the gate fires first.
    #[tokio::test]
    async fn list_tools_returns_empty_when_tools_cap_absent() {
        let conn = conn_without_caps();
        *conn.inner.tools.write().await = Some(Err(poisoned()));

        let got = conn.list_tools().await.unwrap();
        assert!(got.is_empty());
    }

    /// 3.2 — symmetric to 3.1 for `list_resources`.
    #[tokio::test]
    async fn list_resources_returns_empty_when_resources_cap_absent() {
        let conn = conn_without_caps();
        *conn.inner.resources.write().await = Some(Err(poisoned()));

        let got = conn.list_resources().await.unwrap();
        assert!(got.is_empty());
    }

    /// 3.3 — `read_resource` errors with `UnsupportedCapability` when
    /// the server declared no `resources` capability, without hitting
    /// the network.
    #[tokio::test]
    async fn read_resource_errors_when_resources_cap_absent() {
        let conn = conn_without_caps();
        let got = conn.read_resource("file://nope").await;
        assert!(matches!(
            got,
            Err(super::super::Error::UnsupportedCapability {
                capability: "resources"
            })
        ));
    }

    /// 3.4 — `call_tool` errors with `UnsupportedCapability` when the
    /// server declared no `tools` capability.
    #[tokio::test]
    async fn call_tool_errors_when_tools_cap_absent() {
        let conn = conn_without_caps();
        let params = CallToolRequestParams {
            name: "any".into(),
            arguments: None,
            _meta: None,
            task: None,
        };
        let got = conn.call_tool(&params).await;
        assert!(matches!(
            got,
            Err(super::super::Error::UnsupportedCapability {
                capability: "tools"
            })
        ));
    }

    /// 3.5 — `refresh_tools` installs `Some(Ok(empty))` and returns
    /// without paginating when the server declared no `tools`
    /// capability. Clearing the cache first proves the install ran.
    #[tokio::test]
    async fn refresh_tools_installs_empty_when_tools_cap_absent() {
        let conn = conn_without_caps();
        *conn.inner.tools.write().await = None;

        conn.inner.refresh_tools(None).await;

        let guard = conn.inner.tools.read().await;
        let v = guard
            .as_ref()
            .expect("refresh installed Some")
            .as_ref()
            .expect("refresh installed Ok");
        assert!(v.is_empty());
    }

    /// 3.6 — symmetric to 3.5 for `refresh_resources`.
    #[tokio::test]
    async fn refresh_resources_installs_empty_when_resources_cap_absent() {
        let conn = conn_without_caps();
        *conn.inner.resources.write().await = None;

        conn.inner.refresh_resources(None).await;

        let guard = conn.inner.resources.read().await;
        let v = guard
            .as_ref()
            .expect("refresh installed Some")
            .as_ref()
            .expect("refresh installed Ok");
        assert!(v.is_empty());
    }
}