Skip to main content

objectiveai_sdk/mcp/
connection.rs

1//! MCP connection for communicating with an MCP server.
2//!
3//! [`Connection`] is a cheaply-clonable handle around an internal
4//! [`ConnectionInner`]. The last drop of the inner `Arc` runs
5//! [`ConnectionInner`]'s `Drop`, which cancels the listener task's
6//! [`tokio_util::sync::CancellationToken`] (held in `_listener_cancel_guard`
7//! as a [`tokio_util::sync::DropGuard`]) — the SSE listener exits the
8//! instant any in-flight reconnect, sleep, or read is cancelled, with no
9//! zombie 401 retries against a now-dead proxy session.
10
11use std::ops::Deref;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::{Arc, RwLock as StdRwLock, Weak};
14use std::time::Duration;
15
16use indexmap::IndexMap;
17use tokio::sync::{Notify, RwLock};
18use tokio_util::sync::{CancellationToken, DropGuard};
19
20/// Callback fired by [`Connection`] when the upstream MCP server emits
21/// `notifications/tools/list_changed` or `notifications/resources/list_changed`.
22///
23/// **Timing:** runs after the corresponding cache's write lock is taken
24/// but *before* the network paginate that replaces it. That ordering
25/// matches the moment the staleness window opens — anyone blocked on the
26/// read lock won't return until the new list lands. The callback should
27/// not call back into `list_tools` / `list_resources`: doing so would
28/// re-take the lock the listener already holds and deadlock.
29///
30/// Stored behind an `Arc` so the listener task can cheaply clone it out
31/// of the lock and call it without holding the read guard.
32pub type ListChangedCallback = Arc<dyn Fn() + Send + Sync + 'static>;
33
34/// A registered-or-not callback slot. Wrapper so [`ConnectionInner`] can
35/// keep `#[derive(Debug)]` (a raw `dyn Fn` isn't `Debug`).
36struct CallbackSlot(StdRwLock<Option<ListChangedCallback>>);
37
38impl CallbackSlot {
39    fn new() -> Self {
40        Self(StdRwLock::new(None))
41    }
42
43    fn set(&self, callback: ListChangedCallback) {
44        *self.0.write().unwrap() = Some(callback);
45    }
46
47    /// Cheap clone-out of the current callback (if any). The `Arc` clone
48    /// lets us release the read guard before invoking the callback.
49    fn get(&self) -> Option<ListChangedCallback> {
50        self.0.read().unwrap().clone()
51    }
52}
53
54impl std::fmt::Debug for CallbackSlot {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        let set = self.0.read().map(|g| g.is_some()).unwrap_or(false);
57        f.debug_struct("CallbackSlot").field("set", &set).finish()
58    }
59}
60
61/// An active connection to an MCP server using the Streamable HTTP transport.
62///
63/// Cheaply clonable (one `Arc` bump). When the last clone is dropped, the
64/// inner `Arc` ref count hits zero, [`ConnectionInner::Drop`] runs, the
65/// listener-cancel `DropGuard` is dropped, and the SSE listener task is
66/// cancelled — exiting any in-flight `lines.next_line()`, reconnect
67/// `send()`, or backoff `sleep` *immediately* without retrying against
68/// the now-dead proxy session.
69///
70/// Use the public methods (`list_tools`, `call_tool`, `list_resources`,
71/// `read_resource`, `call_tool_as_message`, `tool_key`) for the upstream
72/// MCP protocol surface. The inner state ([`ConnectionInner`]) is also
73/// reachable via `Deref` for read-only field access (e.g.
74/// `connection.url`, `connection.initialize_result.server_info.name`),
75/// but its methods are private — you must go through `Connection`.
76#[derive(Debug)]
77pub struct Connection {
78    inner: Arc<ConnectionInner>,
79}
80
81impl Clone for Connection {
82    fn clone(&self) -> Self {
83        Self {
84            inner: Arc::clone(&self.inner),
85        }
86    }
87}
88
89// No `Drop` for `Connection`: cancellation happens deterministically
90// when the last `Arc<ConnectionInner>` clone is dropped, which runs
91// `ConnectionInner::drop` and releases the cancel-token DropGuard.
92
93impl Deref for Connection {
94    type Target = ConnectionInner;
95    fn deref(&self) -> &ConnectionInner {
96        &self.inner
97    }
98}
99
100impl Connection {
101    /// Tear this connection down explicitly.
102    ///
103    /// 1. Cancels the long-lived list-changed listener task immediately
104    ///    (drops the [`DropGuard`] in
105    ///    [`ConnectionInner::_listener_cancel_guard`]), so by the time
106    ///    the HTTP DELETE goes out the listener isn't still holding an
107    ///    SSE read open against the upstream we're about to close.
108    /// 2. Issues `DELETE /` to the upstream with this connection's
109    ///    `Mcp-Session-Id` and the same merged header set every other
110    ///    RPC stamps. Reuses [`ConnectionInner::call_timeout`].
111    /// 3. Treats `404 / 401 / 403` as success — the upstream is
112    ///    unreachable from these credentials anyway, which is the
113    ///    desired terminal state. Other non-2xx surfaces as
114    ///    [`super::Error::BadStatus`].
115    ///
116    /// Takes `&self`: the listener cancel is in-place, and dropping
117    /// the surrounding `Arc<ConnectionInner>` (which closes the rest
118    /// of the connection's owned state) is the caller's responsibility
119    /// — usually by dropping the `Arc<Session>` holding it. Stateless
120    /// callers that don't hold a `Connection` should use
121    /// [`Client::delete`](super::Client::delete) instead.
122    ///
123    /// **In-flight RPC ordering.** This method does not block on
124    /// in-flight `call_tool` / `read_resource` / `list_tools` /
125    /// `list_resources` calls on the same connection. If one is
126    /// outstanding when `delete` lands, the upstream may see DELETE
127    /// before the RPC's reply makes it back; the in-flight call then
128    /// surfaces as a closed-connection error to whoever started it.
129    /// That's the spec-correct order (client said terminate) — drain
130    /// on the caller side first if you need different semantics.
131    pub async fn delete(&self) -> Result<(), super::Error> {
132        // 1. Mark the connection as "used" so the drop-time
133        //    orphan-DELETE check (see `ConnectionInner::Drop`) skips
134        //    its own fan-out. Without this, a fresh-mint connection
135        //    that's explicitly torn down via `delete()` would race
136        //    its own drop-time orphan into a duplicate upstream
137        //    DELETE.
138        self.inner.any_calls.store(true, Ordering::Relaxed);
139
140        // 2. Drop the listener-cancel guard. Releasing the `DropGuard`
141        //    cancels the sibling `CancellationToken` the listener task
142        //    holds; the listener `tokio::select!`s against it on every
143        //    blocking await and exits inside one scheduler tick.
144        if let Ok(mut guard) = self.inner._listener_cancel_guard.lock() {
145            let _ = guard.take();
146        }
147
148        // 3. Build + send HTTP DELETE. Mirrors `Client::connect_once`'s
149        //    request-stamp shape: header loop first, explicit
150        //    `Mcp-Session-Id` always wins.
151        let request = self
152            .inner
153            .http_client
154            .delete(&self.inner.url)
155            .timeout(self.inner.call_timeout)
156            .headers(self.inner.build_request_headers(None, None).await);
157        let response = request.send().await.map_err(|source| {
158            super::Error::Request {
159                url: self.inner.url.clone(),
160                source,
161            }
162        })?;
163
164        // 4. 404 / 401 / 403 → success; other non-2xx → real error.
165        let status = response.status();
166        if matches!(
167            status,
168            reqwest::StatusCode::NOT_FOUND
169                | reqwest::StatusCode::UNAUTHORIZED
170                | reqwest::StatusCode::FORBIDDEN
171        ) {
172            return Ok(());
173        }
174        if !status.is_success() {
175            let body = response.text().await.unwrap_or_default();
176            return Err(super::Error::BadStatus {
177                url: self.inner.url.clone(),
178                code: status,
179                body: body.chars().take(800).collect(),
180            });
181        }
182        Ok(())
183    }
184
185    pub(super) async fn new(
186        http_client: reqwest::Client,
187        url: String,
188        session_id: String,
189        headers: IndexMap<String, String>,
190        backoff_current_interval: Duration,
191        backoff_initial_interval: Duration,
192        backoff_randomization_factor: f64,
193        backoff_multiplier: f64,
194        backoff_max_interval: Duration,
195        backoff_max_elapsed_time: Duration,
196        call_timeout: Duration,
197        initialize_result: super::initialize_result::InitializeResult,
198        initial_sse_lines: Option<super::LinesStream>,
199        is_reconnect: bool,
200    ) -> Self {
201        let inner = ConnectionInner::new(
202            http_client,
203            url,
204            session_id,
205            headers,
206            backoff_current_interval,
207            backoff_initial_interval,
208            backoff_randomization_factor,
209            backoff_multiplier,
210            backoff_max_interval,
211            backoff_max_elapsed_time,
212            call_timeout,
213            initialize_result,
214            initial_sse_lines,
215            is_reconnect,
216        )
217        .await;
218        Self { inner }
219    }
220
221    #[cfg(test)]
222    pub(crate) fn new_for_test(name: String, url: String) -> Self {
223        Self {
224            inner: ConnectionInner::new_for_test(name, url),
225        }
226    }
227
228    #[cfg(test)]
229    pub(crate) fn new_for_test_with_caps(
230        name: String,
231        url: String,
232        capabilities: super::initialize_result::ServerCapabilities,
233    ) -> Self {
234        Self {
235            inner: ConnectionInner::new_for_test_with_caps(
236                name,
237                url,
238                capabilities,
239            ),
240        }
241    }
242
243    /// Send a JSON-RPC notification to the upstream. Used by `Client`
244    /// right after `initialize` to send `notifications/initialized`.
245    pub(super) async fn notify<P: serde::Serialize>(
246        &self,
247        method: &str,
248        params: &P,
249    ) -> Result<(), super::Error> {
250        self.inner.notify(method, params).await
251    }
252
253    /// Returns a key identifying this connection for tool namespacing.
254    pub fn tool_key(&self) -> String {
255        self.inner.tool_key()
256    }
257
258    /// Returns the session ID for this connection.
259    pub fn session_id(&self) -> &str {
260        self.inner.session_id()
261    }
262
263    /// Returns all tools from the upstream server.
264    pub async fn list_tools(
265        &self,
266    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
267        self.inner.list_tools().await
268    }
269
270    /// Calls a tool on the upstream server.
271    pub async fn call_tool(
272        &self,
273        params: &super::tool::CallToolRequestParams,
274    ) -> Result<super::tool::CallToolResult, super::Error> {
275        self.inner.call_tool(params).await
276    }
277
278    /// Calls a tool and converts the result into a [`ToolMessage`].
279    pub async fn call_tool_as_message(
280        &self,
281        params: &super::tool::CallToolRequestParams,
282        tool_call_id: String,
283    ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
284    {
285        self.inner.call_tool_as_message(params, tool_call_id).await
286    }
287
288    /// Returns all resources from the upstream server.
289    pub async fn list_resources(
290        &self,
291    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
292        self.inner.list_resources().await
293    }
294
295    /// Returns the cached tool list as soon as it differs from `current`,
296    /// or waits up to `timeout` for the next `notifications/tools/list_changed`
297    /// from the upstream server before re-reading.
298    ///
299    /// Wakes the moment a refresh writer takes the cache write lock, so
300    /// the post-wake `read` is guaranteed to observe the new list rather
301    /// than racing against the install. Safe to call from any number of
302    /// tasks concurrently.
303    pub async fn subscribe_tools(
304        &self,
305        current: &[super::tool::Tool],
306        timeout: Duration,
307    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
308        self.inner.subscribe_tools(current, timeout).await
309    }
310
311    /// Resource counterpart of [`Connection::subscribe_tools`].
312    pub async fn subscribe_resources(
313        &self,
314        current: &[super::resource::Resource],
315        timeout: Duration,
316    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
317        self.inner.subscribe_resources(current, timeout).await
318    }
319
320    /// Atomically drain the proxy's `pending_notifications` queue for
321    /// this session via `GET /notify` and return the queued content
322    /// blocks. A second call returns `[]` until the next out-of-band
323    /// `POST /notify`.
324    ///
325    /// Intended for use at the start of an agent turn so notifications
326    /// queued between turns — when the prior turn ended without a tool
327    /// call, or the user is starting a fresh continuation — surface as
328    /// a user message instead of being lost. The proxy's existing
329    /// `tools/call` response path still drains in-flight notifications
330    /// arriving *during* a turn; this method covers the gap between
331    /// turns.
332    ///
333    /// A 404 from the proxy (session unknown — possible after a proxy
334    /// restart) is mapped to an empty `Vec` so callers do not need to
335    /// distinguish "no notifications" from "lost session" at the use
336    /// site; the next upstream call will surface the lost-session
337    /// condition through its own error path.
338    pub async fn drain_notifications(
339        &self,
340    ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
341        self.inner.drain_notifications().await
342    }
343
344    /// Non-draining peek at the proxy's `pending_notifications` queue
345    /// via `GET /notify/queued`. Returns `true` iff the queue holds at
346    /// least one block. Companion to [`Connection::drain_notifications`]
347    /// for callers that want to know whether queued blocks exist
348    /// without consuming them.
349    ///
350    /// A 404 from the proxy (session unknown — possible after a proxy
351    /// restart) is mapped to `Ok(false)` for the same reason as the
352    /// drain path: callers do not need to distinguish "no
353    /// notifications" from "lost session" at the use site.
354    pub async fn has_pending_notifications(
355        &self,
356    ) -> Result<bool, super::Error> {
357        self.inner.has_pending_notifications().await
358    }
359
360    /// `POST <self.url>/notify` against the ObjectiveAI MCP proxy.
361    /// Appends `blocks` to the proxy's pending-notifications queue for
362    /// this session; they surface as a user message on the next
363    /// `tools/call` response (wrapped in a `<system-reminder>` block)
364    /// or as the head of the next agent turn when drained between turns.
365    ///
366    /// Mirror of [`Connection::drain_notifications`] / [`Connection::has_pending_notifications`]
367    /// for the inbound side. A 404 from the proxy means the session is
368    /// gone — surfaced as `SessionExpired` so callers can distinguish
369    /// "session lost" from "delivery failed" at the use site.
370    pub async fn enqueue_notifications(
371        &self,
372        blocks: &[super::tool::ContentBlock],
373    ) -> Result<(), super::Error> {
374        self.inner.enqueue_notifications(blocks).await
375    }
376
377    /// Reads a resource from the upstream server.
378    pub async fn read_resource(
379        &self,
380        uri: &str,
381    ) -> Result<super::resource::ReadResourceResult, super::Error> {
382        self.inner.read_resource(uri).await
383    }
384
385    /// Register a callback to fire whenever the upstream emits
386    /// `notifications/tools/list_changed`.
387    ///
388    /// **Timing:** the callback runs *after* the tool cache's write lock
389    /// is acquired but *before* the network paginate that replaces it.
390    /// That means readers blocked on the read lock won't return until the
391    /// new list is in place, and the callback observes the moment the
392    /// staleness window opens. The proxy uses this to emit its own
393    /// `notifications/tools/list_changed` to downstream clients at the
394    /// right instant.
395    ///
396    /// Replaces any previously-registered tools-list-changed callback.
397    /// All clones of this `Connection` share the same callback slot.
398    pub fn set_on_tools_list_changed<F>(&self, callback: F)
399    where
400        F: Fn() + Send + Sync + 'static,
401    {
402        self.inner.on_tools_list_changed.set(Arc::new(callback));
403    }
404
405    /// Register a callback to fire whenever the upstream emits
406    /// `notifications/resources/list_changed`. Same timing contract as
407    /// [`Connection::set_on_tools_list_changed`].
408    ///
409    /// Replaces any previously-registered resources-list-changed callback.
410    /// All clones of this `Connection` share the same callback slot.
411    pub fn set_on_resources_list_changed<F>(&self, callback: F)
412    where
413        F: Fn() + Send + Sync + 'static,
414    {
415        self.inner.on_resources_list_changed.set(Arc::new(callback));
416    }
417
418    /// Atomically replace the connection's [`ConnectionInner::extra_headers`]
419    /// bag. Every subsequent outbound HTTP request from this connection
420    /// stamps the new map AFTER `headers`, with `HeaderMap::insert`
421    /// REPLACE semantics — keys in `extras` override the same key in
422    /// the per-URL `headers` bag set at `Client::connect`. Caller
423    /// supplies the FULL replacement map; missing keys are dropped
424    /// (no merge).
425    ///
426    /// Used by the proxy to inject session-global headers
427    /// (`X-OBJECTIVEAI-RESPONSE-ID`, `X-OBJECTIVEAI-RESPONSE-IDS`)
428    /// that re-set on every inbound `initialize`, without re-dialing
429    /// the upstream.
430    pub async fn set_extra_headers(
431        &self,
432        extras: IndexMap<String, String>,
433    ) {
434        *self.inner.extra_headers.write().await = extras;
435    }
436}
437
438/// The actual connection state. Behind an `Arc` inside [`Connection`].
439///
440/// Fields are public for read-only access (callers reach them via
441/// `Connection`'s `Deref`), but every method on this type is private —
442/// the public surface lives on [`Connection`] and delegates through.
443#[derive(Debug)]
444pub struct ConnectionInner {
445    pub http_client: reqwest::Client,
446    pub url: String,
447    pub session_id: String,
448    /// All HTTP headers stamped on every POST / GET this connection
449    /// makes — the same merged map (defaults + caller overrides) the
450    /// `Client` built once during connect. `Mcp-Session-Id`,
451    /// `Content-Type`, and `Accept` are still set by the request
452    /// builders and override anything in `headers`.
453    pub headers: IndexMap<String, String>,
454    /// Mutable per-request override layer stamped AFTER `headers` on
455    /// every outbound HTTP request. The request-builder uses
456    /// `reqwest::header::HeaderMap::insert` semantics so any key
457    /// present in `extra_headers` REPLACES the same key in `headers`.
458    /// Used by the proxy to inject session-global headers
459    /// (`X-OBJECTIVEAI-RESPONSE-ID` etc.) that override per-URL
460    /// values without re-dialing. Empty by default; set via
461    /// [`Connection::set_extra_headers`].
462    pub extra_headers: RwLock<IndexMap<String, String>>,
463
464    pub backoff_current_interval: Duration,
465    pub backoff_initial_interval: Duration,
466    pub backoff_randomization_factor: f64,
467    pub backoff_multiplier: f64,
468    pub backoff_max_interval: Duration,
469    pub backoff_max_elapsed_time: Duration,
470    pub call_timeout: Duration,
471
472    /// The server's capabilities and info from the initialize response.
473    pub initialize_result: super::initialize_result::InitializeResult,
474
475    /// `true` iff this connection was opened by resuming an existing
476    /// upstream session — i.e. [`Client::connect`](super::Client::connect)
477    /// was called with `session_id: Some(...)`. Set once at
478    /// construction; never mutated.
479    ///
480    /// Used by the drop-time orphan-DELETE gate in
481    /// [`ConnectionInner::Drop`]: reconnects are **excluded** from
482    /// orphan DELETE because their `any_calls == false` only means
483    /// "this `Connection` instance didn't use it" — an earlier
484    /// instance that opened the upstream session may have. Only
485    /// freshly-minted connections (`is_reconnect == false`) that no
486    /// one used get the drop-time orphan DELETE.
487    is_reconnect: bool,
488
489    /// `true` once any [`Self::call_tool`] or [`Self::read_resource`]
490    /// has been issued through this connection. Listings
491    /// ([`Self::list_tools`], [`Self::list_resources`]) and the
492    /// proxy-side notification helpers do NOT flip this — only
493    /// deliberate use of the upstream session counts.
494    ///
495    /// Stored atomically because the setters live behind `&self`-only
496    /// call paths; `Ordering::Relaxed` is sufficient (we never
497    /// synchronize anything else against this load/store).
498    ///
499    /// Also flipped to `true` at the top of [`super::Connection::delete`]
500    /// so an explicit teardown of a fresh-mint connection can't race
501    /// the drop-time orphan-DELETE into a double-fire.
502    any_calls: AtomicBool,
503
504    /// Auto-incrementing request ID (starts at 2; 1 was used for initialize).
505    next_id: AtomicU64,
506
507    /// All tools from the server, populated by background pagination.
508    ///
509    /// `None` = cache cleared — either pre-populate (between
510    /// [`Self::new`] and the first `refresh_tools_signaling`) or
511    /// post-drop (the listener empties this the moment its SSE
512    /// stream ends so `list_tools` will re-paginate against the
513    /// upstream rather than return stale state). `Some(_)` = last
514    /// known result, `Ok` or `Err`.
515    tools:
516        RwLock<Option<Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>>>>,
517    /// All resources from the server, populated by background pagination.
518    /// Same `None`/`Some` semantics as [`Self::tools`].
519    resources: RwLock<
520        Option<Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>>>,
521    >,
522
523    /// Cancellation token for the long-lived `listen_for_list_changes`
524    /// task. The listener selects this against every blocking await
525    /// (read, reconnect-send, backoff-sleep) and returns the instant it
526    /// fires.
527    ///
528    /// Held inside the connection as a [`DropGuard`] so that the moment
529    /// the last `Arc<ConnectionInner>` clone is dropped — i.e. the
530    /// moment no external `Connection` handle remains — `Drop` runs on
531    /// the guard, the token cancels, and the listener task tears down.
532    /// The listener itself holds a sibling `CancellationToken` (clone),
533    /// not the guard, so its task does not extend the connection's
534    /// lifetime.
535    ///
536    /// Wrapped in `Mutex<Option<_>>` so explicit teardown paths
537    /// ([`Connection::delete`]) can drop the guard in place — firing
538    /// the cancel token *before* the surrounding `Arc<ConnectionInner>`
539    /// goes away. Regular drop still works: `Mutex<Option<DropGuard>>`
540    /// drops its inner `DropGuard` automatically when the mutex itself
541    /// drops, so existing `Connection::Drop` semantics are unchanged.
542    _listener_cancel_guard: std::sync::Mutex<Option<DropGuard>>,
543
544    /// Optional callback fired *after* the listener has refreshed the
545    /// tool cache in response to an upstream `notifications/tools/list_changed`.
546    /// Set via [`Connection::set_on_tools_list_changed`].
547    on_tools_list_changed: CallbackSlot,
548
549    /// Optional callback fired *after* the listener has refreshed the
550    /// resource cache in response to an upstream
551    /// `notifications/resources/list_changed`.
552    /// Set via [`Connection::set_on_resources_list_changed`].
553    on_resources_list_changed: CallbackSlot,
554
555    /// Wakes any task awaiting in [`Connection::subscribe_tools`]. Fired
556    /// from inside `refresh_tools_signaling` the moment the writer
557    /// acquires the cache write lock — *before* the new list is
558    /// installed. A woken subscriber's next `read().await` blocks behind
559    /// the writer's guard, so it always observes the post-swap state.
560    tools_changed: Notify,
561
562    /// Resource counterpart of [`Self::tools_changed`].
563    resources_changed: Notify,
564}
565
566impl ConnectionInner {
567    /// Creates a minimal connection for unit testing. Declares both
568    /// `tools` and `resources` capabilities with `list_changed:
569    /// Some(true)` so callers exercise the present-cap +
570    /// list_changed-enabled paths in `list_*`, `refresh_*`, and
571    /// `subscribe_*`. For other capability shapes use
572    /// `new_for_test_with_caps`.
573    #[cfg(test)]
574    fn new_for_test(name: String, url: String) -> Arc<Self> {
575        Self::new_for_test_with_caps(
576            name,
577            url,
578            super::initialize_result::ServerCapabilities {
579                experimental: None,
580                logging: None,
581                completions: None,
582                prompts: None,
583                resources: Some(
584                    super::initialize_result::ResourcesCapability {
585                        subscribe: None,
586                        list_changed: Some(true),
587                    },
588                ),
589                tools: Some(super::initialize_result::ToolsCapability {
590                    list_changed: Some(true),
591                }),
592                tasks: None,
593            },
594        )
595    }
596
597    /// Creates a minimal connection for unit testing with an explicit
598    /// `ServerCapabilities`. Used by the capability-gating tests to
599    /// drive each gate's absent-cap branch.
600    #[cfg(test)]
601    fn new_for_test_with_caps(
602        name: String,
603        url: String,
604        capabilities: super::initialize_result::ServerCapabilities,
605    ) -> Arc<Self> {
606        Arc::new(Self {
607            http_client: reqwest::Client::new(),
608            url,
609            session_id: String::new(),
610            headers: IndexMap::new(),
611            extra_headers: RwLock::new(IndexMap::new()),
612            backoff_current_interval: Duration::from_millis(500),
613            backoff_initial_interval: Duration::from_millis(500),
614            backoff_randomization_factor: 0.5,
615            backoff_multiplier: 1.5,
616            backoff_max_interval: Duration::from_secs(60),
617            backoff_max_elapsed_time: Duration::from_secs(900),
618            call_timeout: Duration::from_secs(30),
619            initialize_result: super::initialize_result::InitializeResult {
620                protocol_version: "2025-03-26".into(),
621                capabilities,
622                server_info: super::initialize_result::Implementation {
623                    name,
624                    title: None,
625                    version: "0.0.0".into(),
626                    website_url: None,
627                    description: None,
628                    icons: None,
629                },
630                instructions: None,
631                _meta: None,
632            },
633            is_reconnect: false,
634            any_calls: AtomicBool::new(false),
635            next_id: AtomicU64::new(2),
636            // Test connection has no listener and never refreshes; seed
637            // with an empty Ok so `list_tools` doesn't try to paginate.
638            tools: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
639            resources: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
640            _listener_cancel_guard: std::sync::Mutex::new(None),
641            on_tools_list_changed: CallbackSlot::new(),
642            on_resources_list_changed: CallbackSlot::new(),
643            tools_changed: Notify::new(),
644            resources_changed: Notify::new(),
645        })
646    }
647
648    /// Creates a new connection and spawns background tasks to paginate
649    /// all tools and resources. Called internally by
650    /// [`Client::connect`](super::Client::connect) (via [`Connection::new`]).
651    ///
652    /// `initial_sse_lines`, if `Some`, is a pre-opened SSE line reader
653    /// that the list-changed listener will read from immediately on its
654    /// first iteration, instead of opening its own GET `/`. The caller
655    /// is responsible for arranging for one of these to exist whenever
656    /// the upstream advertises `tools.list_changed` or
657    /// `resources.list_changed` — see
658    /// [`Client::connect`](super::Client::connect).
659    async fn new(
660        http_client: reqwest::Client,
661        url: String,
662        session_id: String,
663        headers: IndexMap<String, String>,
664        backoff_current_interval: Duration,
665        backoff_initial_interval: Duration,
666        backoff_randomization_factor: f64,
667        backoff_multiplier: f64,
668        backoff_max_interval: Duration,
669        backoff_max_elapsed_time: Duration,
670        call_timeout: Duration,
671        initialize_result: super::initialize_result::InitializeResult,
672        initial_sse_lines: Option<super::LinesStream>,
673        is_reconnect: bool,
674    ) -> Arc<Self> {
675        // Cancel-the-listener machinery: store the DropGuard inside the
676        // inner so the cancellation fires deterministically when the
677        // last external `Arc<ConnectionInner>` clone drops. Hand the
678        // listener task a sibling clone (no guard) — that way the
679        // listener task's lifetime does not extend the connection.
680        let listener_cancel = CancellationToken::new();
681        let listener_cancel_for_task = listener_cancel.clone();
682        let conn = Arc::new(Self {
683            http_client,
684            url,
685            session_id,
686            headers,
687            extra_headers: RwLock::new(IndexMap::new()),
688            backoff_current_interval,
689            backoff_initial_interval,
690            backoff_randomization_factor,
691            backoff_multiplier,
692            backoff_max_interval,
693            backoff_max_elapsed_time,
694            call_timeout,
695            initialize_result,
696            is_reconnect,
697            any_calls: AtomicBool::new(false),
698            next_id: AtomicU64::new(2),
699            // Start empty; `refresh_tools_signaling` below installs
700            // `Some(_)` before `new` returns (the lock-handoff oneshot
701            // gates the return on the writer holding the lock).
702            tools: RwLock::new(None),
703            resources: RwLock::new(None),
704            _listener_cancel_guard: std::sync::Mutex::new(Some(
705                listener_cancel.drop_guard(),
706            )),
707            on_tools_list_changed: CallbackSlot::new(),
708            on_resources_list_changed: CallbackSlot::new(),
709            tools_changed: Notify::new(),
710            resources_changed: Notify::new(),
711        });
712
713        // Spawn background tool lister if the server supports tools.
714        //
715        // We don't return until the spawned task has acquired the write
716        // lock. Otherwise a caller that immediately reads `list_tools()`
717        // could race the writer — `tokio::spawn` only queues the task,
718        // and a fast reader can acquire the read lock before the writer
719        // has run its first instruction. The reader would then see the
720        // initial empty `Vec` and return that, even though a full
721        // populate is in flight.
722        //
723        // The `RwLockWriteGuard` itself isn't `Send`-friendly enough to
724        // pass back, so we use a oneshot to signal "I'm holding the
725        // lock now"; once we receive that, the cache is exclusively
726        // owned by the writer and any subsequent `read().await` from
727        // the caller is guaranteed to wait for the populate to finish.
728        if conn.initialize_result.capabilities.tools.is_some() {
729            let conn = Arc::clone(&conn);
730            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
731            tokio::spawn(async move {
732                conn.refresh_tools_signaling(lock_held_tx, None).await;
733            });
734            // Wait for the writer to hold the lock before returning.
735            let _ = lock_held_rx.await;
736        }
737
738        // Spawn background resource lister if the server supports
739        // resources. Same lock-handoff contract as tools above.
740        if conn.initialize_result.capabilities.resources.is_some() {
741            let conn = Arc::clone(&conn);
742            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
743            tokio::spawn(async move {
744                conn.refresh_resources_signaling(lock_held_tx, None).await;
745            });
746            let _ = lock_held_rx.await;
747        }
748
749        // Spawn the list-changed listener iff the caller handed us a
750        // pre-opened SSE stream. The connection is naive about
751        // `tools.list_changed` / `resources.list_changed` capabilities —
752        // [`Client::connect`](super::Client::connect) translates them
753        // into "did or didn't open a stream for us." If we get a stream,
754        // we listen on it; if we don't, there's nothing to listen for.
755        if let Some(initial_lines) = initial_sse_lines {
756            // Hand the listener a `Weak` so the spawned task itself does
757            // not keep the connection alive. `listener_cancel_for_task`
758            // is a sibling clone of the connection's own
759            // `_listener_cancel_guard` token — when the last external
760            // `Arc<ConnectionInner>` clone is dropped, the inner's Drop
761            // releases the guard and the listener wakes from any
762            // pending await (read, send, sleep) and exits immediately.
763            let weak = Arc::downgrade(&conn);
764            tokio::spawn(async move {
765                Self::listen_for_list_changes(
766                    weak,
767                    listener_cancel_for_task,
768                    initial_lines,
769                )
770                .await;
771            });
772        }
773
774        conn
775    }
776
777    /// Server declared a `tools` capability in its `InitializeResult`.
778    /// Gates `list_tools`, `call_tool`, `refresh_tools`. When `false` the
779    /// upstream cannot service `tools/*` RPCs at all and any attempt would
780    /// either hang in idempotent backoff (`tools/list`) or fail with
781    /// `SessionExpired` (`tools/call`).
782    fn has_tools_cap(&self) -> bool {
783        self.initialize_result.capabilities.tools.is_some()
784    }
785
786    /// Server declared a `resources` capability in its `InitializeResult`.
787    /// Gates `list_resources`, `read_resource`, `refresh_resources`, and
788    /// the post-`call_tool` ResourceLink resolution. Same hang-or-fail
789    /// shape as `has_tools_cap` when absent.
790    fn has_resources_cap(&self) -> bool {
791        self.initialize_result.capabilities.resources.is_some()
792    }
793
794    /// Server declared `tools.list_changed: true`. Gates
795    /// `subscribe_tools`'s wait-for-notify branch: when `false` the
796    /// upstream will never push `notifications/tools/list_changed`, so
797    /// awaiting the notify is unreachable and `subscribe_tools` returns
798    /// the current cache immediately.
799    fn has_tools_list_changed(&self) -> bool {
800        matches!(
801            self.initialize_result.capabilities.tools,
802            Some(super::initialize_result::ToolsCapability {
803                list_changed: Some(true),
804            })
805        )
806    }
807
808    /// Server declared `resources.list_changed: true`. Symmetric to
809    /// `has_tools_list_changed`; gates `subscribe_resources`'s
810    /// wait-for-notify branch.
811    fn has_resources_list_changed(&self) -> bool {
812        matches!(
813            self.initialize_result.capabilities.resources,
814            Some(super::initialize_result::ResourcesCapability {
815                list_changed: Some(true),
816                ..
817            })
818        )
819    }
820
821    /// Creates an exponential backoff configuration from the connection's fields.
822    fn backoff(&self) -> backoff::ExponentialBackoff {
823        backoff::ExponentialBackoff {
824            current_interval: self.backoff_current_interval,
825            initial_interval: self.backoff_initial_interval,
826            randomization_factor: self.backoff_randomization_factor,
827            multiplier: self.backoff_multiplier,
828            max_interval: self.backoff_max_interval,
829            start_time: std::time::Instant::now(),
830            max_elapsed_time: Some(self.backoff_max_elapsed_time),
831            clock: backoff::SystemClock::default(),
832        }
833    }
834
835    /// Builds a POST request with all required headers and the call timeout.
836    async fn post(&self) -> reqwest::RequestBuilder {
837        self.http_client
838            .post(&self.url)
839            .timeout(self.call_timeout)
840            .headers(
841                self.build_request_headers(
842                    Some("application/json"),
843                    Some("application/json, text/event-stream"),
844                )
845                .await,
846            )
847    }
848
849    /// Build the `HeaderMap` stamped on every outbound request. Order
850    /// of insertion drives override semantics — `HeaderMap::insert`
851    /// REPLACES existing values for the same key:
852    ///
853    /// 1. Content-Type / Accept (when supplied by the caller).
854    /// 2. `self.headers` (the per-URL bag set at `Client::connect`).
855    /// 3. `self.extra_headers` (the mutable, session-global overrides
856    ///    — proxies use this for `X-OBJECTIVEAI-RESPONSE-ID` etc).
857    /// 4. `Mcp-Session-Id` (the connection's own session id, always
858    ///    last so it can never be shadowed).
859    async fn build_request_headers(
860        &self,
861        content_type: Option<&str>,
862        accept: Option<&str>,
863    ) -> reqwest::header::HeaderMap {
864        use reqwest::header::{
865            ACCEPT, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue,
866        };
867        let mut hmap = HeaderMap::new();
868        if let Some(ct) = content_type {
869            if let Ok(hv) = HeaderValue::from_str(ct) {
870                hmap.insert(CONTENT_TYPE, hv);
871            }
872        }
873        if let Some(a) = accept {
874            if let Ok(hv) = HeaderValue::from_str(a) {
875                hmap.insert(ACCEPT, hv);
876            }
877        }
878        for (k, v) in &self.headers {
879            if let (Ok(hn), Ok(hv)) = (
880                HeaderName::try_from(k.as_str()),
881                HeaderValue::from_str(v),
882            ) {
883                hmap.insert(hn, hv);
884            }
885        }
886        let extras = self.extra_headers.read().await;
887        for (k, v) in extras.iter() {
888            if let (Ok(hn), Ok(hv)) = (
889                HeaderName::try_from(k.as_str()),
890                HeaderValue::from_str(v),
891            ) {
892                hmap.insert(hn, hv);
893            }
894        }
895        drop(extras);
896        if let Ok(hv) = HeaderValue::from_str(&self.session_id) {
897            hmap.insert(HeaderName::from_static("mcp-session-id"), hv);
898        }
899        hmap
900    }
901
902    /// Sends a JSON-RPC request, retrying transient errors when
903    /// `idempotent` is `true`.
904    ///
905    /// Idempotent methods (`tools/list`, `resources/list`,
906    /// `resources/read`, etc.) retry every transient error — network,
907    /// HTTP status, malformed body, JSON-RPC error, session expiration —
908    /// until the backoff's `max_elapsed_time` is exceeded.
909    ///
910    /// Non-idempotent methods (`tools/call`) make exactly one attempt.
911    /// Retrying a `tools/call` is unsafe: a tool may have mutated remote
912    /// state during the first attempt before the response was lost, and
913    /// re-firing the call would mutate state again. Each retry of
914    /// `AppendTask` advances `state.tasks.len()` an extra step, so the
915    /// agent sees a different return value than expected and the
916    /// pid-derived mock seed at the next step diverges. See
917    /// `objectiveai-api/src/agent/completions/client.rs` (sequential
918    /// dispatch) and `mock/client.rs::mock.seed_derive` for the
919    /// downstream consequence.
920    async fn rpc<P: serde::Serialize, R: serde::de::DeserializeOwned>(
921        &self,
922        method: &str,
923        params: &P,
924        idempotent: bool,
925    ) -> Result<R, super::Error> {
926        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
927        let body = serde_json::json!({
928            "jsonrpc": "2.0",
929            "id": id,
930            "method": method,
931            "params": params,
932        });
933
934        let attempt_one = || async {
935            let url = self.url.clone();
936            let response =
937                self.post().await.json(&body).send().await.map_err(|source| {
938                    backoff::Error::transient(super::Error::Request {
939                        url: url.clone(),
940                        source,
941                    })
942                })?;
943
944            if response.status() == reqwest::StatusCode::NOT_FOUND {
945                return Err(backoff::Error::transient(
946                    super::Error::SessionExpired { url: url.clone() },
947                ));
948            }
949            if !response.status().is_success() {
950                let code = response.status();
951                let body = response.text().await.unwrap_or_default();
952                return Err(backoff::Error::transient(
953                    super::Error::BadStatus {
954                        url: url.clone(),
955                        code,
956                        body,
957                    },
958                ));
959            }
960
961            let rpc_response: super::JsonRpcResponse<R> =
962                super::parse_streamable_http_response(&url, response)
963                    .await
964                    .map_err(backoff::Error::transient)?;
965
966            match rpc_response {
967                super::JsonRpcResponse::Success { result, .. } => Ok(result),
968                super::JsonRpcResponse::Error { error, .. } => {
969                    Err(backoff::Error::transient(super::Error::JsonRpc {
970                        url: url.clone(),
971                        code: error.code,
972                        message: error.message,
973                        data: error.data,
974                    }))
975                }
976            }
977        };
978
979        if idempotent {
980            backoff::future::retry(self.backoff(), attempt_one).await
981        } else {
982            attempt_one().await.map_err(|e| match e {
983                backoff::Error::Permanent(err)
984                | backoff::Error::Transient { err, .. } => err,
985            })
986        }
987    }
988
989    /// Sends a JSON-RPC notification (no response expected) with the
990    /// same exponential-backoff retry policy as [`Self::rpc`]. Every
991    /// error is transient; the loop gives up only when the backoff's
992    /// `max_elapsed_time` is exceeded.
993    async fn notify<P: serde::Serialize>(
994        &self,
995        method: &str,
996        params: &P,
997    ) -> Result<(), super::Error> {
998        let body = serde_json::json!({
999            "jsonrpc": "2.0",
1000            "method": method,
1001            "params": params,
1002        });
1003
1004        backoff::future::retry(self.backoff(), || async {
1005            let url = self.url.clone();
1006            let response =
1007                self.post().await.json(&body).send().await.map_err(|source| {
1008                    backoff::Error::transient(super::Error::Request {
1009                        url: url.clone(),
1010                        source,
1011                    })
1012                })?;
1013
1014            if response.status() == reqwest::StatusCode::NOT_FOUND {
1015                return Err(backoff::Error::transient(
1016                    super::Error::SessionExpired { url: url.clone() },
1017                ));
1018            }
1019            if !response.status().is_success() {
1020                let code = response.status();
1021                let body = response.text().await.unwrap_or_default();
1022                return Err(backoff::Error::transient(
1023                    super::Error::BadStatus {
1024                        url: url.clone(),
1025                        code,
1026                        body,
1027                    },
1028                ));
1029            }
1030
1031            Ok(())
1032        })
1033        .await
1034    }
1035
1036    /// `GET <self.url>/notify` against the ObjectiveAI MCP proxy.
1037    /// Atomically drains the proxy's pending-notifications queue for
1038    /// this session and returns the queued content blocks.
1039    ///
1040    /// Single-attempt — the proxy drain is destructive, so a retry
1041    /// after a transient failure would risk silently dropping
1042    /// notifications that the first attempt's response carried but
1043    /// failed to deliver. Networks errors propagate to the caller; the
1044    /// next turn's drain will pick up anything queued in the meantime.
1045    /// A 404 (session unknown) is mapped to `Ok(vec![])` — see the
1046    /// public method's doc on `Connection`.
1047    async fn drain_notifications(
1048        &self,
1049    ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
1050        let url = format!("{}/notify", self.url.trim_end_matches('/'));
1051        let request = self
1052            .http_client
1053            .get(&url)
1054            .timeout(self.call_timeout)
1055            .headers(
1056                self.build_request_headers(None, Some("application/json"))
1057                    .await,
1058            );
1059
1060        let response =
1061            request
1062                .send()
1063                .await
1064                .map_err(|source| super::Error::Request {
1065                    url: url.clone(),
1066                    source,
1067                })?;
1068
1069        if response.status() == reqwest::StatusCode::NOT_FOUND {
1070            return Ok(Vec::new());
1071        }
1072        if !response.status().is_success() {
1073            let code = response.status();
1074            let body = response.text().await.unwrap_or_default();
1075            return Err(super::Error::BadStatus { url, code, body });
1076        }
1077
1078        response
1079            .json::<Vec<super::tool::ContentBlock>>()
1080            .await
1081            .map_err(|source| super::Error::Request { url, source })
1082    }
1083
1084    /// `POST <self.url>/notify` against the ObjectiveAI MCP proxy.
1085    /// Appends `blocks` to the proxy's pending-notifications queue for
1086    /// this session. Single-attempt — the caller decides whether to
1087    /// retry. A 404 (session unknown) surfaces as `SessionExpired`
1088    /// rather than `Ok(())` because the caller is asking for delivery
1089    /// and a lost session means delivery did not happen.
1090    async fn enqueue_notifications(
1091        &self,
1092        blocks: &[super::tool::ContentBlock],
1093    ) -> Result<(), super::Error> {
1094        let url = format!("{}/notify", self.url.trim_end_matches('/'));
1095        let request = self
1096            .http_client
1097            .post(&url)
1098            .timeout(self.call_timeout)
1099            .headers(
1100                self.build_request_headers(
1101                    Some("application/json"),
1102                    Some("application/json"),
1103                )
1104                .await,
1105            )
1106            .json(blocks);
1107
1108        let response =
1109            request
1110                .send()
1111                .await
1112                .map_err(|source| super::Error::Request {
1113                    url: url.clone(),
1114                    source,
1115                })?;
1116
1117        if response.status() == reqwest::StatusCode::NOT_FOUND {
1118            return Err(super::Error::SessionExpired { url });
1119        }
1120        if !response.status().is_success() {
1121            let code = response.status();
1122            let body = response.text().await.unwrap_or_default();
1123            return Err(super::Error::BadStatus { url, code, body });
1124        }
1125
1126        Ok(())
1127    }
1128
1129    /// `GET <self.url>/notify/queued` against the ObjectiveAI MCP proxy.
1130    /// Non-draining peek — returns `true` iff the proxy's
1131    /// pending-notifications queue for this session is non-empty.
1132    /// A 404 (session unknown) is mapped to `Ok(false)` to match the
1133    /// drain path's soft-fallback contract.
1134    async fn has_pending_notifications(&self) -> Result<bool, super::Error> {
1135        let url = format!("{}/notify/queued", self.url.trim_end_matches('/'));
1136        let request = self
1137            .http_client
1138            .get(&url)
1139            .timeout(self.call_timeout)
1140            .headers(
1141                self.build_request_headers(None, Some("application/json"))
1142                    .await,
1143            );
1144
1145        let response =
1146            request
1147                .send()
1148                .await
1149                .map_err(|source| super::Error::Request {
1150                    url: url.clone(),
1151                    source,
1152                })?;
1153
1154        if response.status() == reqwest::StatusCode::NOT_FOUND {
1155            return Ok(false);
1156        }
1157        if !response.status().is_success() {
1158            let code = response.status();
1159            let body = response.text().await.unwrap_or_default();
1160            return Err(super::Error::BadStatus { url, code, body });
1161        }
1162
1163        response
1164            .json::<bool>()
1165            .await
1166            .map_err(|source| super::Error::Request { url, source })
1167    }
1168
1169    /// Returns a key identifying this connection for tool namespacing.
1170    fn tool_key(&self) -> String {
1171        format!("{}-{}", self.initialize_result.server_info.name, self.url)
1172    }
1173
1174    /// Returns the session ID for this connection.
1175    fn session_id(&self) -> &str {
1176        &self.session_id
1177    }
1178
1179    /// Sends a `tools/list` RPC call for a single page.
1180    async fn rpc_list_tools(
1181        &self,
1182        cursor: Option<&str>,
1183    ) -> Result<super::tool::ListToolsResult, super::Error> {
1184        self.rpc(
1185            "tools/list",
1186            &super::tool::ListToolsRequest {
1187                cursor: cursor.map(String::from),
1188            },
1189            true,
1190        )
1191        .await
1192    }
1193
1194    /// Returns all tools from the server.
1195    ///
1196    /// Blocks until background pagination completes, then returns a
1197    /// cheap `Arc` clone of the result. If the cache is currently
1198    /// empty (e.g. because the listener detected its SSE stream drop
1199    /// and cleared it) this paginates inline against the upstream —
1200    /// the caller gets fresh data on the happy path, or the live
1201    /// upstream error if the connection is genuinely down, rather
1202    /// than stale pre-drop tools.
1203    async fn list_tools(
1204        &self,
1205    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1206        if !self.has_tools_cap() {
1207            return Ok(Arc::new(Vec::new()));
1208        }
1209        if let Some(cached) = self.tools.read().await.as_ref() {
1210            return cached.clone();
1211        }
1212        // Cache cleared; refresh inline. Concurrent callers may each
1213        // refresh — wasteful, but the proxy fans out across distinct
1214        // upstreams so a single Connection rarely sees concurrent
1215        // `list_tools` calls.
1216        self.refresh_tools(None).await;
1217        self.tools
1218            .read()
1219            .await
1220            .as_ref()
1221            .expect("refresh_tools installs Some")
1222            .clone()
1223    }
1224
1225    /// Calls a tool on the MCP server. The returned
1226    /// `CallToolResult.content` is **fully resolved**: every
1227    /// `ContentBlock::ResourceLink { uri }` whose URI appears in
1228    /// `list_resources` has already been replaced by one or more
1229    /// `ContentBlock::EmbeddedResource` blocks carrying the fetched
1230    /// contents (via `read_resource`). Unknown-URI links pass through
1231    /// untouched — the upstream server may have its own out-of-band
1232    /// resolution path the caller should preserve.
1233    ///
1234    /// Resolving inside `call_tool` (rather than downstream in
1235    /// `call_tool_as_message`) means every consumer of the result
1236    /// sees `EmbeddedResource` shapes uniformly; the stateless
1237    /// `From<ContentBlock>` impl is then enough to convert the whole
1238    /// result to `RichContent` with no further connection work.
1239    async fn call_tool(
1240        &self,
1241        params: &super::tool::CallToolRequestParams,
1242    ) -> Result<super::tool::CallToolResult, super::Error> {
1243        if !self.has_tools_cap() {
1244            return Err(super::Error::UnsupportedCapability {
1245                capability: "tools",
1246            });
1247        }
1248        // Mark the connection as deliberately used. Flipped at the top
1249        // of the method (not after success) because even a failed
1250        // `tools/call` may have mutated upstream state — we don't want
1251        // the drop-time orphan-DELETE second-guessing that.
1252        self.any_calls.store(true, Ordering::Relaxed);
1253        let mut result: super::tool::CallToolResult =
1254            self.rpc("tools/call", params, false).await?;
1255
1256        // Build the known-resource URI set for ResourceLink
1257        // resolution. `list_resources` failure → empty set (same
1258        // safe fallback the resolution path used previously).
1259        let known_uris: std::collections::HashSet<String> =
1260            match self.list_resources().await {
1261                Ok(rs) => rs.iter().map(|r| r.uri.clone()).collect(),
1262                Err(_) => std::collections::HashSet::new(),
1263            };
1264
1265        // Walk the blocks, replacing each resolvable ResourceLink
1266        // with one EmbeddedResource per returned ResourceContentsUnion.
1267        // Everything else (Text, Image, Audio, EmbeddedResource,
1268        // unknown-URI ResourceLinks) passes through.
1269        let mut resolved: Vec<super::tool::ContentBlock> =
1270            Vec::with_capacity(result.content.len());
1271        for block in std::mem::take(&mut result.content) {
1272            match block {
1273                super::tool::ContentBlock::ResourceLink(link)
1274                    if known_uris.contains(&link.uri) =>
1275                {
1276                    let read = self.read_resource(&link.uri).await?;
1277                    for contents in read.contents {
1278                        resolved.push(
1279                            super::tool::ContentBlock::EmbeddedResource(
1280                                super::tool::EmbeddedResource {
1281                                    resource: contents,
1282                                    // ResourceLink's annotations
1283                                    // don't have a perfect home on
1284                                    // EmbeddedResource — both fields
1285                                    // exist but they describe different
1286                                    // shapes (the link vs the inlined
1287                                    // contents). Drop them on the way
1288                                    // in; the EmbeddedResource is now
1289                                    // a fresh authoritative block.
1290                                    annotations: None,
1291                                    _meta: None,
1292                                },
1293                            ),
1294                        );
1295                    }
1296                }
1297                other => resolved.push(other),
1298            }
1299        }
1300        result.content = resolved;
1301        Ok(result)
1302    }
1303
1304    /// Calls a tool and converts the (already-resolved) result into a
1305    /// [`ToolMessage`]. Resource resolution happens inside
1306    /// [`Self::call_tool`] — by the time we get the blocks here every
1307    /// resolvable `ResourceLink` has already been replaced with an
1308    /// `EmbeddedResource`, so the conversion is a pure stateless
1309    /// element-wise map through [`From<ContentBlock> for
1310    /// RichContentPart`](crate::agent::completions::message::RichContentPart).
1311    ///
1312    /// Content-block mapping (handled by the `From` impl):
1313    /// - `text` → text part
1314    /// - `image` → image_url part (data URL)
1315    /// - `audio` → input_audio part
1316    /// - `embedded_resource` (text) → text part
1317    /// - `embedded_resource` (blob, image mime) → image_url part
1318    /// - `embedded_resource` (blob, audio mime) → input_audio part
1319    /// - `embedded_resource` (blob, video mime) → input_video part
1320    /// - `embedded_resource` (blob, other mime) → file part
1321    /// - `resource_link` (unknown URI) → JSON-text fallback
1322    ///   (resolvable URIs were already resolved upstream)
1323    async fn call_tool_as_message(
1324        &self,
1325        params: &super::tool::CallToolRequestParams,
1326        tool_call_id: String,
1327    ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
1328    {
1329        use crate::agent::completions::message::{
1330            RichContentPart, ToolMessage, ToolResponseMetadata,
1331        };
1332
1333        let result = self.call_tool(params).await?;
1334
1335        let parts: Vec<RichContentPart> =
1336            result.content.into_iter().map(Into::into).collect();
1337
1338        // Lossy-decode the MCP `_meta` extension bag into our typed
1339        // `ToolResponseMetadata`. Unknown keys (set by non-objectiveai
1340        // upstreams) are silently dropped. Decoding failure leaves
1341        // metadata as `None`.
1342        let metadata = result._meta.as_ref().and_then(|m| {
1343            serde_json::from_value::<ToolResponseMetadata>(
1344                serde_json::to_value(m).ok()?,
1345            )
1346            .ok()
1347        });
1348
1349        Ok(ToolMessage {
1350            content: parts.into(),
1351            tool_call_id,
1352            metadata,
1353        })
1354    }
1355
1356    /// Sends a `resources/list` RPC call for a single page.
1357    async fn rpc_list_resources(
1358        &self,
1359        cursor: Option<&str>,
1360    ) -> Result<super::resource::ListResourcesResult, super::Error> {
1361        self.rpc(
1362            "resources/list",
1363            &super::resource::ListResourcesRequest {
1364                cursor: cursor.map(String::from),
1365            },
1366            true,
1367        )
1368        .await
1369    }
1370
1371    /// Returns all resources from the server.
1372    ///
1373    /// Same cache-or-refresh semantics as [`Self::list_tools`]: a
1374    /// cleared cache (post-drop or pre-populate) triggers an inline
1375    /// paginate against the upstream.
1376    async fn list_resources(
1377        &self,
1378    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1379        if !self.has_resources_cap() {
1380            return Ok(Arc::new(Vec::new()));
1381        }
1382        if let Some(cached) = self.resources.read().await.as_ref() {
1383            return cached.clone();
1384        }
1385        self.refresh_resources(None).await;
1386        self.resources
1387            .read()
1388            .await
1389            .as_ref()
1390            .expect("refresh_resources installs Some")
1391            .clone()
1392    }
1393
1394    /// Returns the cached tool list as soon as it differs from `current`,
1395    /// or — if it equals `current` right now — waits up to `timeout` for
1396    /// the cache to change and then returns whatever it sees.
1397    ///
1398    /// An `Err` cache is treated as "different from any caller snapshot"
1399    /// and returned immediately.
1400    ///
1401    /// Concurrency-safe: any number of concurrent subscribers wait on
1402    /// independent `Notified` futures and read the cache through the
1403    /// shared `RwLock`. A timeout that fires alone is not an error — we
1404    /// re-read the cache and return whatever's there.
1405    async fn subscribe_tools(
1406        &self,
1407        current: &[super::tool::Tool],
1408        timeout: Duration,
1409    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1410        if !self.has_tools_list_changed() {
1411            // Server can't push `notifications/tools/list_changed` —
1412            // awaiting the notify is unreachable. Return whatever's
1413            // there right now (`Ok(empty)` on a tools-cap-absent
1414            // server via `list_tools`'s own gate; the real cache
1415            // otherwise).
1416            return self.list_tools().await;
1417        }
1418        // Arm BEFORE reading. `enable()` registers the future in the
1419        // wait queue without polling, so a `notify_waiters` racing
1420        // between our read and our await still wakes us.
1421        let notified = self.tools_changed.notified();
1422        tokio::pin!(notified);
1423        notified.as_mut().enable();
1424
1425        // `list_tools` handles a cleared cache (post-drop) by
1426        // paginating inline. A `None` initial state can't be
1427        // compared to the caller's snapshot meaningfully — promote
1428        // to whatever the refresh installs.
1429        let initial = self.list_tools().await;
1430        match &initial {
1431            Ok(arc) if arc.as_slice() == current => {}
1432            _ => return initial,
1433        }
1434
1435        let _ = tokio::time::timeout(timeout, notified).await;
1436
1437        self.list_tools().await
1438    }
1439
1440    /// Resource counterpart of [`Self::subscribe_tools`].
1441    async fn subscribe_resources(
1442        &self,
1443        current: &[super::resource::Resource],
1444        timeout: Duration,
1445    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1446        if !self.has_resources_list_changed() {
1447            // Symmetric to `subscribe_tools` — see that gate.
1448            return self.list_resources().await;
1449        }
1450        let notified = self.resources_changed.notified();
1451        tokio::pin!(notified);
1452        notified.as_mut().enable();
1453
1454        let initial = self.list_resources().await;
1455        match &initial {
1456            Ok(arc) if arc.as_slice() == current => {}
1457            _ => return initial,
1458        }
1459
1460        let _ = tokio::time::timeout(timeout, notified).await;
1461
1462        self.list_resources().await
1463    }
1464
1465    /// Reads a resource from the MCP server.
1466    async fn read_resource(
1467        &self,
1468        uri: &str,
1469    ) -> Result<super::resource::ReadResourceResult, super::Error> {
1470        if !self.has_resources_cap() {
1471            return Err(super::Error::UnsupportedCapability {
1472                capability: "resources",
1473            });
1474        }
1475        // Mark the connection as deliberately used (same reasoning as
1476        // `call_tool` — see the drop-time orphan-DELETE gate).
1477        self.any_calls.store(true, Ordering::Relaxed);
1478        self.rpc(
1479            "resources/read",
1480            &super::resource::ReadResourceRequestParams {
1481                uri: uri.to_string(),
1482            },
1483            true,
1484        )
1485        .await
1486    }
1487
1488    /// Re-fetches all tools from the server, replacing the cached list.
1489    ///
1490    /// Optionally fires `on_change` *after* the write lock is acquired but
1491    /// *before* the network paginate begins, so the callback observes the
1492    /// "list change is in flight" edge — readers blocked on the read lock
1493    /// won't return until the new list lands. The proxy uses this to
1494    /// re-emit `notifications/tools/list_changed` to its downstream client
1495    /// at the moment the staleness window opens.
1496    async fn refresh_tools(&self, on_change: Option<ListChangedCallback>) {
1497        if !self.has_tools_cap() {
1498            // No tools capability — install an empty Vec so the cache
1499            // contract holds (`list_tools`'s `.expect("refresh_tools
1500            // installs Some")` etc.) and return without paginating or
1501            // signalling. No `notify_waiters` and no `on_change` —
1502            // nothing real changed.
1503            let mut guard = self.tools.write().await;
1504            *guard = Some(Ok(Arc::new(Vec::new())));
1505            return;
1506        }
1507        // Listener-driven refresh. Visibility contract: any caller
1508        // that issues `list_tools()` after a `tools/list_changed`
1509        // notification has been observed must see the post-swap
1510        // value, not stale data — so the write lock has to gate
1511        // readers across the upstream paginate.
1512        //
1513        // Performance contract: don't serialise paginate *behind*
1514        // lock-acquisition latency. We start `tools.write()` and the
1515        // upstream paginate **concurrently** with `tokio::join!`. The
1516        // write-lock acquire blocks new `list_tools()` readers
1517        // immediately (preserving visibility) and runs in parallel
1518        // with whatever drain time the in-flight readers need; the
1519        // paginate runs alongside. Total wall-clock is
1520        // `max(drain_time, paginate_time)` instead of the sum.
1521        //
1522        // `notify_waiters` and `on_change` fire under the write
1523        // guard, *after* `*guard = result`, so anyone awoken by them
1524        // queues on the read lock, waits for the guard to drop, and
1525        // observes the post-swap state.
1526        let (mut guard, result) =
1527            tokio::join!(self.tools.write(), self.paginate_tools(),);
1528        *guard = Some(result);
1529        self.tools_changed.notify_waiters();
1530        if let Some(cb) = on_change {
1531            cb();
1532        }
1533    }
1534
1535    /// Page-by-page fetch of the upstream tool list, no locks held.
1536    /// Shared between the `_signaling` (initial-populate, holds lock
1537    /// for the original "block fast readers" contract) and `refresh_*`
1538    /// (listener-driven, lock-only-around-install) variants.
1539    async fn paginate_tools(
1540        &self,
1541    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1542        let mut all_tools = Vec::new();
1543        let mut cursor: Option<String> = None;
1544        loop {
1545            match self.rpc_list_tools(cursor.as_deref()).await {
1546                Ok(page) => {
1547                    all_tools.extend(page.tools);
1548                    cursor = page.next_cursor;
1549                    if cursor.is_none() {
1550                        return Ok(Arc::new(all_tools));
1551                    }
1552                }
1553                Err(e) => return Err(Arc::new(e)),
1554            }
1555        }
1556    }
1557
1558    /// Same as [`Self::refresh_tools`] but fires `lock_held` once the
1559    /// write lock has been acquired so the caller can synchronise on
1560    /// "writer is in possession of the cache" before returning. Used by
1561    /// `ConnectionInner::new` to prevent a fast reader from acquiring
1562    /// the read lock before this writer has even started.
1563    ///
1564    /// **Caller invariant:** the spawn site in `ConnectionInner::new`
1565    /// must gate this call on `capabilities.tools.is_some()`. This
1566    /// method assumes the tools capability is present and issues
1567    /// `tools/list` RPCs unconditionally — running it against a
1568    /// no-tools server triggers the 15-min idempotent-backoff storm.
1569    async fn refresh_tools_signaling(
1570        &self,
1571        lock_held: tokio::sync::oneshot::Sender<()>,
1572        on_change: Option<ListChangedCallback>,
1573    ) {
1574        let mut guard = self.tools.write().await;
1575        // Fire `tools_changed` while we hold the write lock and *before*
1576        // installing the new list. Any subscriber woken now must take a
1577        // read lock to observe the result, and that read lock is queued
1578        // behind this write guard — so they always see the post-swap
1579        // state, never mid-swap.
1580        self.tools_changed.notify_waiters();
1581        let _ = lock_held.send(());
1582        if let Some(cb) = on_change {
1583            cb();
1584        }
1585        let mut all_tools = Vec::new();
1586        let mut cursor: Option<String> = None;
1587        let result = loop {
1588            match self.rpc_list_tools(cursor.as_deref()).await {
1589                Ok(page) => {
1590                    all_tools.extend(page.tools);
1591                    cursor = page.next_cursor;
1592                    if cursor.is_none() {
1593                        break Ok(Arc::new(all_tools));
1594                    }
1595                }
1596                Err(e) => break Err(Arc::new(e)),
1597            }
1598        };
1599        *guard = Some(result);
1600    }
1601
1602    /// Re-fetches all resources from the server, replacing the cached list.
1603    /// See [`ConnectionInner::refresh_tools`] for the callback timing
1604    /// contract.
1605    async fn refresh_resources(&self, on_change: Option<ListChangedCallback>) {
1606        if !self.has_resources_cap() {
1607            // Symmetric to `refresh_tools` — see that gate.
1608            let mut guard = self.resources.write().await;
1609            *guard = Some(Ok(Arc::new(Vec::new())));
1610            return;
1611        }
1612        // Same paginate-while-acquiring-the-write-lock pattern as
1613        // `refresh_tools` — see that comment for the visibility +
1614        // performance rationale.
1615        let (mut guard, result) =
1616            tokio::join!(self.resources.write(), self.paginate_resources(),);
1617        *guard = Some(result);
1618        self.resources_changed.notify_waiters();
1619        if let Some(cb) = on_change {
1620            cb();
1621        }
1622    }
1623
1624    async fn paginate_resources(
1625        &self,
1626    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1627        let mut all_resources = Vec::new();
1628        let mut cursor: Option<String> = None;
1629        loop {
1630            match self.rpc_list_resources(cursor.as_deref()).await {
1631                Ok(page) => {
1632                    all_resources.extend(page.resources);
1633                    cursor = page.next_cursor;
1634                    if cursor.is_none() {
1635                        return Ok(Arc::new(all_resources));
1636                    }
1637                }
1638                Err(e) => return Err(Arc::new(e)),
1639            }
1640        }
1641    }
1642
1643    /// Resource counterpart of [`Self::refresh_tools_signaling`]. The
1644    /// same spawn-site-gate invariant applies: the caller must gate
1645    /// the spawn on `capabilities.resources.is_some()`.
1646    async fn refresh_resources_signaling(
1647        &self,
1648        lock_held: tokio::sync::oneshot::Sender<()>,
1649        on_change: Option<ListChangedCallback>,
1650    ) {
1651        let mut guard = self.resources.write().await;
1652        // See `refresh_tools_signaling` — fire under the write lock,
1653        // before install, so subscribers' next read sees the post-swap
1654        // state.
1655        self.resources_changed.notify_waiters();
1656        let _ = lock_held.send(());
1657        if let Some(cb) = on_change {
1658            cb();
1659        }
1660        let mut all_resources = Vec::new();
1661        let mut cursor: Option<String> = None;
1662        let result = loop {
1663            match self.rpc_list_resources(cursor.as_deref()).await {
1664                Ok(page) => {
1665                    all_resources.extend(page.resources);
1666                    cursor = page.next_cursor;
1667                    if cursor.is_none() {
1668                        break Ok(Arc::new(all_resources));
1669                    }
1670                }
1671                Err(e) => break Err(Arc::new(e)),
1672            }
1673        };
1674        *guard = Some(result);
1675    }
1676
1677    /// Builds a GET request to the MCP endpoint for receiving server
1678    /// notifications via SSE.
1679    async fn get(&self) -> reqwest::RequestBuilder {
1680        self.http_client
1681            .get(&self.url)
1682            .headers(
1683                self.build_request_headers(None, Some("text/event-stream"))
1684                    .await,
1685            )
1686    }
1687
1688    /// Listens for `notifications/tools/list_changed` and
1689    /// `notifications/resources/list_changed` on an SSE stream. On each
1690    /// notification, write-locks and re-fetches the full list.
1691    ///
1692    /// `initial_lines` is the pre-opened SSE line reader handed in by
1693    /// [`Client::connect`](super::Client::connect) — that stream is
1694    /// consumed first. When it ends (or any later GET reconnect ends),
1695    /// we sleep `backoff_initial_interval` and open a fresh GET `/` SSE
1696    /// stream.
1697    ///
1698    /// Takes a [`Weak<Self>`] (not `Arc<Self>`) so the spawned task
1699    /// doesn't itself keep the [`Connection`] alive, and a
1700    /// [`CancellationToken`] sibling clone of the connection's
1701    /// [`DropGuard`] so the task tears down the instant the last
1702    /// external `Arc<ConnectionInner>` clone is dropped — every
1703    /// blocking await (line read, reconnect send, backoff sleep) is
1704    /// raced against `cancel.cancelled()` and exits without any zombie
1705    /// retries against a now-dead session.
1706    async fn listen_for_list_changes(
1707        weak: Weak<Self>,
1708        cancel: CancellationToken,
1709        initial_lines: super::LinesStream,
1710    ) {
1711        // First iteration: use the pre-opened SSE stream the client
1712        // handed us. After that, fall back to opening fresh GET / SSE
1713        // streams as the upstream connection cycles.
1714        let mut next_lines: Option<super::LinesStream> = Some(initial_lines);
1715        // One-shot guard for the catch-up refresh: false on the very
1716        // first iteration (the caller's pre-opened SSE stream — its
1717        // associated cache was just populated by `Client::connect`'s
1718        // initial pagination, so re-fetching there would just be a
1719        // wasted round-trip), true thereafter. Every stream end —
1720        // whether `Ok(None)` (clean close) or `Err(_)` (read failure)
1721        // — drops back here, which we treat as an implicit
1722        // list-changed notification: the upstream's broadcast (in
1723        // particular the proxy's per-session `tokio::broadcast`) is
1724        // lossy for moments when this listener has zero active
1725        // subscribers, so anything that fired during our disconnect
1726        // window may have been dropped.
1727        //
1728        // ORDER MATTERS. The refresh must run AFTER we've re-opened
1729        // the GET / SSE stream — i.e. after we're a subscriber again
1730        // — and BEFORE we enter the inner read loop. If we refreshed
1731        // before the resubscribe, a notification that fired between
1732        // our refresh-completion and our subscribe would be lost the
1733        // same way as the original disconnect-window drops; doing it
1734        // after means a notification fired DURING the refresh lands
1735        // in the new subscriber's buffer (broadcast::Sender::send
1736        // backs onto each receiver's channel-capacity slot) and gets
1737        // consumed by the inner loop on its next read.
1738        let mut is_reconnect = false;
1739
1740        loop {
1741            // The token cancels deterministically when the last
1742            // `Arc<ConnectionInner>` clone is dropped (see
1743            // `_listener_cancel_guard`). Check once per outer
1744            // iteration, but the real protection is the cancel arms in
1745            // every blocking await below — those exit immediately on
1746            // cancel.
1747            if cancel.is_cancelled() {
1748                return;
1749            }
1750            let Some(this) = weak.upgrade() else { return };
1751            let backoff_delay = this.backoff_initial_interval;
1752
1753            let mut lines = match next_lines.take() {
1754                Some(l) => l,
1755                None => {
1756                    // Race the upstream GET against cancellation — if
1757                    // the connection drops mid-reconnect, exit
1758                    // immediately rather than waiting for the request
1759                    // to complete or time out (otherwise produces a
1760                    // burst of 401 retries against a now-dead session
1761                    // under heavy churn).
1762                    let send_outcome = tokio::select! {
1763                        out = async {
1764                            this.get().await.send().await
1765                        } => out,
1766                        _ = cancel.cancelled() => {
1767                            drop(this);
1768                            return;
1769                        }
1770                    };
1771                    let response = match send_outcome {
1772                        Ok(r) if r.status().is_success() => r,
1773                        _ => {
1774                            drop(this);
1775                            // Sleep with cancel-arm: instant exit on
1776                            // drop, no zombie retries.
1777                            tokio::select! {
1778                                _ = tokio::time::sleep(backoff_delay) => {}
1779                                _ = cancel.cancelled() => return,
1780                            }
1781                            continue;
1782                        }
1783                    };
1784                    super::lines_from_response(response)
1785                }
1786            };
1787
1788            // Catch-up refresh on every reconnect — the implicit
1789            // list-changed treatment for the just-failed stream. See
1790            // the `is_reconnect` doc-comment above for the
1791            // refresh-AFTER-resubscribe rationale.
1792            if is_reconnect {
1793                // tools and resources are independent locks; run the
1794                // catch-up refreshes concurrently so disconnect
1795                // recovery isn't sequential.
1796                let _ = tokio::join!(
1797                    this.refresh_tools(this.on_tools_list_changed.get()),
1798                    this.refresh_resources(
1799                        this.on_resources_list_changed.get()
1800                    ),
1801                );
1802            }
1803            is_reconnect = true;
1804
1805            'inner: loop {
1806                tokio::select! {
1807                    line_result = lines.next_line() => {
1808                        match line_result {
1809                            Ok(Some(line)) => {
1810                                // SSE data lines start with "data: ".
1811                                let Some(data) = line.strip_prefix("data: ") else {
1812                                    continue 'inner;
1813                                };
1814                                let method = match serde_json::from_str::<super::JsonRpcNotification>(data) {
1815                                    Ok(n) => n.method,
1816                                    Err(_) => continue 'inner,
1817                                };
1818                                match method.as_str() {
1819                                    "notifications/tools/list_changed" => {
1820                                        // refresh_tools fires the
1821                                        // callback after the cache is
1822                                        // installed, so the proxy's
1823                                        // downstream
1824                                        // notifications/tools/list_changed
1825                                        // emission lines up with the
1826                                        // staleness window opening.
1827                                        this.refresh_tools(
1828                                            this.on_tools_list_changed.get(),
1829                                        )
1830                                        .await;
1831                                    }
1832                                    "notifications/resources/list_changed" => {
1833                                        this.refresh_resources(
1834                                            this.on_resources_list_changed.get(),
1835                                        )
1836                                        .await;
1837                                    }
1838                                    _ => {}
1839                                }
1840                            }
1841                            // Stream ended cleanly or errored — break out
1842                            // to the outer loop so we either reconnect or,
1843                            // if everyone's gone, exit at the top.
1844                            _ => break 'inner,
1845                        }
1846                    }
1847                    // Cancellation: the connection's last clone has
1848                    // dropped. Tear down immediately.
1849                    _ = cancel.cancelled() => {
1850                        drop(this);
1851                        return;
1852                    }
1853                }
1854            }
1855
1856            // Stream dropped — empty the per-Connection caches so the
1857            // next `list_tools` / `list_resources` paginates inline
1858            // against the (possibly still-dead) upstream rather than
1859            // returning whatever was cached before the drop. The
1860            // `is_reconnect` catch-up at the top of the next outer
1861            // iteration will repopulate `Some(_)` if the reconnect
1862            // succeeds; if the reconnect keeps failing, the cache
1863            // stays `None` and `list_*` callers paginate themselves.
1864            *this.tools.write().await = None;
1865            *this.resources.write().await = None;
1866
1867            // Stream ended — drop the strong ref before sleeping so the
1868            // next iteration's weak-upgrade can detect liveness honestly.
1869            drop(this);
1870            tokio::select! {
1871                _ = tokio::time::sleep(backoff_delay) => {}
1872                _ = cancel.cancelled() => return,
1873            }
1874        }
1875    }
1876}
1877
1878impl Drop for ConnectionInner {
1879    /// Orphan-DELETE hook: when a **freshly-minted** connection (not a
1880    /// resume) is dropped without any deliberate use — no `call_tool`,
1881    /// no `read_resource`, no explicit `Connection::delete` — spawn a
1882    /// fire-and-forget HTTP DELETE so the upstream session we just
1883    /// opened doesn't sit there accruing per-session state for nothing.
1884    ///
1885    /// Reconnect-resumes are deliberately excluded: a reconnect's
1886    /// `any_calls == false` only means *this* `Connection` instance
1887    /// never called anything — the underlying upstream session may
1888    /// well have been used by an earlier `Connection` that opened it,
1889    /// did real work, and let us re-attach. Tearing it down here would
1890    /// kill a still-live session out from under whoever owns it.
1891    /// Reconnects rely on the proxy's explicit `Connection::delete`
1892    /// (or `Client::delete`) for upstream cleanup instead.
1893    ///
1894    /// Skip conditions (any one triggers a no-op):
1895    ///
1896    /// - `mock` is `true` — there was never an HTTP session to begin with.
1897    /// - `is_reconnect` is `true` — see above; the upstream session
1898    ///   pre-existed this connection and isn't ours to tear down on drop.
1899    /// - `any_calls` is `true` — the connection was deliberately used,
1900    ///   or an explicit `Connection::delete` already ran.
1901    /// - No tokio runtime is in scope — `tokio::spawn` would panic.
1902    ///   Silently leak the upstream session in this case (sync
1903    ///   teardown paths e.g. `cfg(test)` blocks not driven by tokio).
1904    ///
1905    /// The listener-cancel `DropGuard` inside `_listener_cancel_guard`
1906    /// fires automatically as part of this same `drop` call, so by the
1907    /// time the orphan DELETE goes out the listener task has already
1908    /// been told to cancel — no SSE/GET race with the upstream DELETE.
1909    fn drop(&mut self) {
1910        if self.is_reconnect {
1911            return;
1912        }
1913        if self.any_calls.load(Ordering::Relaxed) {
1914            return;
1915        }
1916
1917        // Clone out the bits the orphan task needs. None of these are
1918        // big: `reqwest::Client` is itself an `Arc` bump,
1919        // `IndexMap<String, String>` is the per-connection header bag
1920        // (small), and the `String`s are the per-session id + URL.
1921        let http_client = self.http_client.clone();
1922        let url = self.url.clone();
1923        let session_id = self.session_id.clone();
1924        let headers = self.headers.clone();
1925        let timeout = self.call_timeout;
1926
1927        // Spawn only if a tokio runtime is in scope. `tokio::spawn`
1928        // panics outside one — sync teardown paths (e.g. a test that
1929        // builds a `Connection` and lets it drop on a non-async stack)
1930        // would crash without this guard.
1931        if let Ok(handle) = tokio::runtime::Handle::try_current() {
1932            handle.spawn(orphan_delete(
1933                http_client,
1934                url,
1935                session_id,
1936                headers,
1937                timeout,
1938            ));
1939        }
1940    }
1941}
1942
1943/// Fire-and-forget HTTP `DELETE` used by [`ConnectionInner::drop`] to
1944/// release a resumed upstream session that was never used. Mirrors
1945/// [`super::Connection::delete`]'s wire shape (same `Mcp-Session-Id`
1946/// header behavior, same header-loop with the explicit session id
1947/// winning over any `Mcp-Session-Id` entry in `headers`) but never
1948/// surfaces errors — there's no caller left to surface them to. The
1949/// `timeout` (sourced from the originating connection's `call_timeout`)
1950/// caps the request so a hanging upstream can't keep the spawned task
1951/// alive forever.
1952async fn orphan_delete(
1953    http_client: reqwest::Client,
1954    url: String,
1955    session_id: String,
1956    headers: IndexMap<String, String>,
1957    timeout: Duration,
1958) {
1959    let mut request = http_client
1960        .delete(&url)
1961        .timeout(timeout)
1962        .header("Mcp-Session-Id", &session_id);
1963    for (name, value) in &headers {
1964        if name.eq_ignore_ascii_case("Mcp-Session-Id") {
1965            continue;
1966        }
1967        request = request.header(name, value);
1968    }
1969    // Errors silently swallowed: no caller, and the listener-cancel
1970    // guard has already fired (it runs as part of the regular
1971    // `_listener_cancel_guard` drop inside the same `drop` call).
1972    let _ = request.send().await;
1973}
1974
1975#[cfg(test)]
1976mod capability_gate_tests {
1977    use super::*;
1978    use crate::mcp::initialize_result::{
1979        ResourcesCapability, ServerCapabilities, ToolsCapability,
1980    };
1981    use crate::mcp::tool::{
1982        CallToolRequestParams, Tool, ToolSchemaObject, ToolSchemaType,
1983    };
1984
1985    /// Builds a `ServerCapabilities` with the given `tools` / `resources`
1986    /// shapes and every other capability set to `None`. Each gate test
1987    /// passes its own combination to exercise a specific cap-absent
1988    /// branch.
1989    fn caps(
1990        tools: Option<ToolsCapability>,
1991        resources: Option<ResourcesCapability>,
1992    ) -> ServerCapabilities {
1993        ServerCapabilities {
1994            experimental: None,
1995            logging: None,
1996            completions: None,
1997            prompts: None,
1998            resources,
1999            tools,
2000            tasks: None,
2001        }
2002    }
2003
2004    fn tool(name: &str) -> Tool {
2005        Tool {
2006            name: name.to_string(),
2007            title: None,
2008            description: None,
2009            icons: None,
2010            input_schema: ToolSchemaObject {
2011                r#type: ToolSchemaType::Object,
2012                properties: None,
2013                required: None,
2014                extra: IndexMap::new(),
2015            },
2016            output_schema: None,
2017            annotations: None,
2018            execution: None,
2019            _meta: None,
2020        }
2021    }
2022
2023    /// 3.1 — `list_tools` short-circuits to `Ok(empty)` when the server
2024    /// declared no `tools` capability, *before* the cache is consulted.
2025    /// We poison the cache with `Err` to prove the gate fires first.
2026    #[tokio::test]
2027    async fn list_tools_returns_empty_when_tools_cap_absent() {
2028        let conn = Connection::new_for_test_with_caps(
2029            "t".into(),
2030            "http://x".into(),
2031            caps(None, None),
2032        );
2033        let err = super::super::Error::NoSessionId {
2034            url: "http://x".into(),
2035            body: String::new(),
2036        };
2037        *conn.inner.tools.write().await = Some(Err(Arc::new(err)));
2038
2039        let got = conn.list_tools().await.unwrap();
2040        assert!(got.is_empty());
2041    }
2042
2043    /// 3.2 — symmetric to 3.1 for `list_resources`.
2044    #[tokio::test]
2045    async fn list_resources_returns_empty_when_resources_cap_absent() {
2046        let conn = Connection::new_for_test_with_caps(
2047            "t".into(),
2048            "http://x".into(),
2049            caps(None, None),
2050        );
2051        let err = super::super::Error::NoSessionId {
2052            url: "http://x".into(),
2053            body: String::new(),
2054        };
2055        *conn.inner.resources.write().await = Some(Err(Arc::new(err)));
2056
2057        let got = conn.list_resources().await.unwrap();
2058        assert!(got.is_empty());
2059    }
2060
2061    /// 3.3 — `read_resource` errors with `UnsupportedCapability` when
2062    /// the server declared no `resources` capability, without hitting
2063    /// the network.
2064    #[tokio::test]
2065    async fn read_resource_errors_when_resources_cap_absent() {
2066        let conn = Connection::new_for_test_with_caps(
2067            "t".into(),
2068            "http://x".into(),
2069            caps(None, None),
2070        );
2071        let got = conn.read_resource("file://nope").await;
2072        assert!(matches!(
2073            got,
2074            Err(super::super::Error::UnsupportedCapability {
2075                capability: "resources"
2076            })
2077        ));
2078    }
2079
2080    /// 3.4 — `call_tool` errors with `UnsupportedCapability` when the
2081    /// server declared no `tools` capability.
2082    #[tokio::test]
2083    async fn call_tool_errors_when_tools_cap_absent() {
2084        let conn = Connection::new_for_test_with_caps(
2085            "t".into(),
2086            "http://x".into(),
2087            caps(None, None),
2088        );
2089        let params = CallToolRequestParams {
2090            name: "any".into(),
2091            arguments: None,
2092            _meta: None,
2093            task: None,
2094        };
2095        let got = conn.call_tool(&params).await;
2096        assert!(matches!(
2097            got,
2098            Err(super::super::Error::UnsupportedCapability {
2099                capability: "tools"
2100            })
2101        ));
2102    }
2103
2104    /// 3.5 — `refresh_tools` installs `Some(Ok(empty))` and returns
2105    /// without paginating when the server declared no `tools`
2106    /// capability. Clearing the cache first proves the install ran.
2107    #[tokio::test]
2108    async fn refresh_tools_installs_empty_when_tools_cap_absent() {
2109        let conn = Connection::new_for_test_with_caps(
2110            "t".into(),
2111            "http://x".into(),
2112            caps(None, None),
2113        );
2114        *conn.inner.tools.write().await = None;
2115
2116        conn.inner.refresh_tools(None).await;
2117
2118        let guard = conn.inner.tools.read().await;
2119        let v = guard
2120            .as_ref()
2121            .expect("refresh installed Some")
2122            .as_ref()
2123            .expect("refresh installed Ok");
2124        assert!(v.is_empty());
2125    }
2126
2127    /// 3.6 — symmetric to 3.5 for `refresh_resources`.
2128    #[tokio::test]
2129    async fn refresh_resources_installs_empty_when_resources_cap_absent() {
2130        let conn = Connection::new_for_test_with_caps(
2131            "t".into(),
2132            "http://x".into(),
2133            caps(None, None),
2134        );
2135        *conn.inner.resources.write().await = None;
2136
2137        conn.inner.refresh_resources(None).await;
2138
2139        let guard = conn.inner.resources.read().await;
2140        let v = guard
2141            .as_ref()
2142            .expect("refresh installed Some")
2143            .as_ref()
2144            .expect("refresh installed Ok");
2145        assert!(v.is_empty());
2146    }
2147
2148    /// 3.7 — `subscribe_tools` returns immediately (no wait-for-notify)
2149    /// when the server declared `tools` but not
2150    /// `tools.list_changed: Some(true)`. We populate the cache with a
2151    /// non-empty list, pass a long timeout, and expect a fast return
2152    /// with the cache contents.
2153    #[tokio::test]
2154    async fn subscribe_tools_short_circuits_when_list_changed_absent() {
2155        let conn = Connection::new_for_test_with_caps(
2156            "t".into(),
2157            "http://x".into(),
2158            caps(
2159                Some(ToolsCapability { list_changed: None }),
2160                None,
2161            ),
2162        );
2163        *conn.inner.tools.write().await =
2164            Some(Ok(Arc::new(vec![tool("a")])));
2165
2166        let start = std::time::Instant::now();
2167        let got = conn
2168            .subscribe_tools(&[tool("a")], Duration::from_secs(5))
2169            .await
2170            .unwrap();
2171        assert!(
2172            start.elapsed() < Duration::from_millis(100),
2173            "elapsed: {:?}",
2174            start.elapsed()
2175        );
2176        assert_eq!(got.as_slice(), &[tool("a")]);
2177    }
2178
2179    /// 3.8 — symmetric to 3.7 for `subscribe_resources`.
2180    #[tokio::test]
2181    async fn subscribe_resources_short_circuits_when_list_changed_absent() {
2182        use crate::mcp::resource::Resource;
2183        let conn = Connection::new_for_test_with_caps(
2184            "t".into(),
2185            "http://x".into(),
2186            caps(
2187                None,
2188                Some(ResourcesCapability {
2189                    subscribe: None,
2190                    list_changed: None,
2191                }),
2192            ),
2193        );
2194        let res = Resource {
2195            uri: "file://a".into(),
2196            name: "a".into(),
2197            title: None,
2198            description: None,
2199            mime_type: None,
2200            annotations: None,
2201            icons: None,
2202            _meta: None,
2203        };
2204        *conn.inner.resources.write().await =
2205            Some(Ok(Arc::new(vec![res.clone()])));
2206
2207        let start = std::time::Instant::now();
2208        let got = conn
2209            .subscribe_resources(&[res.clone()], Duration::from_secs(5))
2210            .await
2211            .unwrap();
2212        assert!(
2213            start.elapsed() < Duration::from_millis(100),
2214            "elapsed: {:?}",
2215            start.elapsed()
2216        );
2217        assert_eq!(got.as_slice(), &[res]);
2218    }
2219}
2220
2221#[cfg(test)]
2222mod subscribe_tests {
2223    use super::*;
2224    use crate::mcp::tool::{Tool, ToolSchemaObject, ToolSchemaType};
2225
2226    fn tool(name: &str) -> Tool {
2227        Tool {
2228            name: name.to_string(),
2229            title: None,
2230            description: None,
2231            icons: None,
2232            input_schema: ToolSchemaObject {
2233                r#type: ToolSchemaType::Object,
2234                properties: None,
2235                required: None,
2236                extra: IndexMap::new(),
2237            },
2238            output_schema: None,
2239            annotations: None,
2240            execution: None,
2241            _meta: None,
2242        }
2243    }
2244
2245    /// First read shows a different list — return immediately, never wait.
2246    #[tokio::test]
2247    async fn subscribe_tools_returns_immediately_when_cache_differs() {
2248        let conn = Connection::new_for_test("t".into(), "http://x".into());
2249        *conn.inner.tools.write().await = Some(Ok(Arc::new(vec![tool("a")])));
2250
2251        let start = std::time::Instant::now();
2252        let got = conn
2253            .subscribe_tools(&[tool("b")], Duration::from_secs(5))
2254            .await
2255            .unwrap();
2256        assert!(start.elapsed() < Duration::from_millis(100));
2257        assert_eq!(got.as_slice(), &[tool("a")]);
2258    }
2259
2260    /// Cached `Err` is treated as "different from any caller snapshot."
2261    #[tokio::test]
2262    async fn subscribe_tools_returns_err_immediately() {
2263        let conn = Connection::new_for_test("t".into(), "http://x".into());
2264        let err = super::super::Error::NoSessionId {
2265            url: "http://x".into(),
2266            body: String::new(),
2267        };
2268        *conn.inner.tools.write().await = Some(Err(Arc::new(err)));
2269
2270        let start = std::time::Instant::now();
2271        let got = conn.subscribe_tools(&[], Duration::from_secs(5)).await;
2272        assert!(start.elapsed() < Duration::from_millis(100));
2273        assert!(got.is_err());
2274    }
2275
2276    /// Cache equals snapshot, then a writer fires the notify under the
2277    /// write lock and installs a new list. The subscriber wakes, then its
2278    /// re-read blocks behind the writer's guard, observes the new list.
2279    #[tokio::test]
2280    async fn subscribe_tools_wakes_on_change_and_reads_post_swap() {
2281        let conn = Connection::new_for_test("t".into(), "http://x".into());
2282        *conn.inner.tools.write().await = Some(Ok(Arc::new(vec![tool("a")])));
2283
2284        let conn_for_subscriber = conn.clone();
2285        let subscriber = tokio::spawn(async move {
2286            conn_for_subscriber
2287                .subscribe_tools(&[tool("a")], Duration::from_secs(5))
2288                .await
2289                .unwrap()
2290        });
2291
2292        // Give the subscriber a moment to arm `notified()` and finish
2293        // its first read so it's parked on the timeout.
2294        tokio::time::sleep(Duration::from_millis(50)).await;
2295
2296        // Simulate `refresh_tools_signaling`: take the write lock, fire
2297        // `tools_changed` *while holding* the write lock, then install
2298        // the new value before releasing. This is exactly the ordering
2299        // that the real refresh path uses.
2300        {
2301            let mut guard = conn.inner.tools.write().await;
2302            conn.inner.tools_changed.notify_waiters();
2303            // Hold briefly to make absolutely sure the subscriber is
2304            // racing the read lock against our drop.
2305            tokio::time::sleep(Duration::from_millis(20)).await;
2306            *guard = Some(Ok(Arc::new(vec![tool("b")])));
2307        }
2308
2309        let got = tokio::time::timeout(Duration::from_secs(2), subscriber)
2310            .await
2311            .expect("subscriber returned in time")
2312            .expect("subscriber didn't panic");
2313        assert_eq!(got.as_slice(), &[tool("b")]);
2314    }
2315
2316    /// Cache equals snapshot, no notification arrives — timeout, return
2317    /// the still-equal list (not an error).
2318    #[tokio::test]
2319    async fn subscribe_tools_times_out_and_returns_unchanged_list() {
2320        let conn = Connection::new_for_test("t".into(), "http://x".into());
2321        *conn.inner.tools.write().await = Some(Ok(Arc::new(vec![tool("a")])));
2322
2323        let start = std::time::Instant::now();
2324        let got = conn
2325            .subscribe_tools(&[tool("a")], Duration::from_millis(50))
2326            .await
2327            .unwrap();
2328        let elapsed = start.elapsed();
2329        assert!(elapsed >= Duration::from_millis(40), "elapsed: {elapsed:?}");
2330        assert!(elapsed < Duration::from_millis(500), "elapsed: {elapsed:?}");
2331        assert_eq!(got.as_slice(), &[tool("a")]);
2332    }
2333
2334    /// Two concurrent subscribers both wake on a single notify_waiters
2335    /// and both observe the post-swap list.
2336    #[tokio::test]
2337    async fn subscribe_tools_supports_concurrent_subscribers() {
2338        let conn = Connection::new_for_test("t".into(), "http://x".into());
2339        *conn.inner.tools.write().await = Some(Ok(Arc::new(vec![tool("a")])));
2340
2341        let c1 = conn.clone();
2342        let c2 = conn.clone();
2343        let s1 = tokio::spawn(async move {
2344            c1.subscribe_tools(&[tool("a")], Duration::from_secs(5))
2345                .await
2346                .unwrap()
2347        });
2348        let s2 = tokio::spawn(async move {
2349            c2.subscribe_tools(&[tool("a")], Duration::from_secs(5))
2350                .await
2351                .unwrap()
2352        });
2353
2354        tokio::time::sleep(Duration::from_millis(50)).await;
2355
2356        {
2357            let mut guard = conn.inner.tools.write().await;
2358            conn.inner.tools_changed.notify_waiters();
2359            *guard = Some(Ok(Arc::new(vec![tool("c")])));
2360        }
2361
2362        let (r1, r2) = tokio::join!(s1, s2);
2363        let r1 = r1.unwrap();
2364        let r2 = r2.unwrap();
2365        assert_eq!(r1.as_slice(), &[tool("c")]);
2366        assert_eq!(r2.as_slice(), &[tool("c")]);
2367    }
2368}
2369
2370#[cfg(test)]
2371mod drain_notifications_tests {
2372    use super::*;
2373    use crate::mcp::tool::{ContentBlock, TextContent};
2374    use serde_json::json;
2375    use wiremock::matchers::{header, method, path};
2376    use wiremock::{Mock, MockServer, ResponseTemplate};
2377
2378    /// Happy path: proxy returns `[text, text]`, we parse it as two
2379    /// `ContentBlock::Text` and return them in order.
2380    #[tokio::test]
2381    async fn drain_notifications_parses_text_blocks_in_order() {
2382        let server = MockServer::start().await;
2383        Mock::given(method("GET"))
2384            .and(path("/notify"))
2385            .and(header("Mcp-Session-Id", ""))
2386            .respond_with(ResponseTemplate::new(200).set_body_json(json!([
2387                {"type": "text", "text": "first"},
2388                {"type": "text", "text": "second"},
2389            ])))
2390            .mount(&server)
2391            .await;
2392
2393        let conn = Connection::new_for_test("t".into(), server.uri());
2394        let blocks = conn.drain_notifications().await.expect("drain ok");
2395        assert_eq!(blocks.len(), 2);
2396        match &blocks[0] {
2397            ContentBlock::Text(TextContent { text, .. }) => {
2398                assert_eq!(text, "first")
2399            }
2400            other => panic!("expected text, got {other:?}"),
2401        }
2402        match &blocks[1] {
2403            ContentBlock::Text(TextContent { text, .. }) => {
2404                assert_eq!(text, "second")
2405            }
2406            other => panic!("expected text, got {other:?}"),
2407        }
2408    }
2409
2410    /// 404 (proxy lost the session, e.g. after a restart) → empty vec
2411    /// rather than an error. The next upstream call will surface the
2412    /// session-lost condition through its own error path; init-time
2413    /// drain shouldn't be the one to abort the request.
2414    #[tokio::test]
2415    async fn drain_notifications_404_returns_empty() {
2416        let server = MockServer::start().await;
2417        Mock::given(method("GET"))
2418            .and(path("/notify"))
2419            .respond_with(ResponseTemplate::new(404))
2420            .mount(&server)
2421            .await;
2422
2423        let conn = Connection::new_for_test("t".into(), server.uri());
2424        let blocks = conn.drain_notifications().await.expect("404 → ok(empty)");
2425        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
2426    }
2427
2428    /// Empty queue → empty array → empty vec. The most common case.
2429    #[tokio::test]
2430    async fn drain_notifications_empty_queue_returns_empty() {
2431        let server = MockServer::start().await;
2432        Mock::given(method("GET"))
2433            .and(path("/notify"))
2434            .respond_with(ResponseTemplate::new(200).set_body_json(json!([])))
2435            .mount(&server)
2436            .await;
2437
2438        let conn = Connection::new_for_test("t".into(), server.uri());
2439        let blocks = conn.drain_notifications().await.expect("drain ok");
2440        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
2441    }
2442
2443    /// Non-success / non-404 status propagates as `BadStatus`.
2444    #[tokio::test]
2445    async fn drain_notifications_5xx_returns_bad_status() {
2446        let server = MockServer::start().await;
2447        Mock::given(method("GET"))
2448            .and(path("/notify"))
2449            .respond_with(ResponseTemplate::new(500).set_body_string("boom"))
2450            .mount(&server)
2451            .await;
2452
2453        let conn = Connection::new_for_test("t".into(), server.uri());
2454        let err = conn.drain_notifications().await.expect_err("5xx → err");
2455        match err {
2456            super::super::Error::BadStatus { code, body, .. } => {
2457                assert_eq!(code.as_u16(), 500);
2458                assert_eq!(body, "boom");
2459            }
2460            other => panic!("expected BadStatus, got {other:?}"),
2461        }
2462    }
2463
2464}
2465
2466#[cfg(test)]
2467mod has_pending_notifications_tests {
2468    use super::*;
2469    use serde_json::json;
2470    use wiremock::matchers::{header, method, path};
2471    use wiremock::{Mock, MockServer, ResponseTemplate};
2472
2473    /// Happy path: proxy returns `true` → Ok(true).
2474    #[tokio::test]
2475    async fn has_pending_notifications_true() {
2476        let server = MockServer::start().await;
2477        Mock::given(method("GET"))
2478            .and(path("/notify/queued"))
2479            .and(header("Mcp-Session-Id", ""))
2480            .respond_with(ResponseTemplate::new(200).set_body_json(json!(true)))
2481            .mount(&server)
2482            .await;
2483
2484        let conn = Connection::new_for_test("t".into(), server.uri());
2485        let got = conn.has_pending_notifications().await.expect("peek ok");
2486        assert!(got);
2487    }
2488
2489    /// Proxy returns `false` → Ok(false).
2490    #[tokio::test]
2491    async fn has_pending_notifications_false() {
2492        let server = MockServer::start().await;
2493        Mock::given(method("GET"))
2494            .and(path("/notify/queued"))
2495            .respond_with(
2496                ResponseTemplate::new(200).set_body_json(json!(false)),
2497            )
2498            .mount(&server)
2499            .await;
2500
2501        let conn = Connection::new_for_test("t".into(), server.uri());
2502        let got = conn.has_pending_notifications().await.expect("peek ok");
2503        assert!(!got);
2504    }
2505
2506    /// 404 (proxy lost the session) → Ok(false). Same soft-fallback
2507    /// contract as drain_notifications — peek must never abort a
2508    /// request over a missing-session race.
2509    #[tokio::test]
2510    async fn has_pending_notifications_404_returns_false() {
2511        let server = MockServer::start().await;
2512        Mock::given(method("GET"))
2513            .and(path("/notify/queued"))
2514            .respond_with(ResponseTemplate::new(404))
2515            .mount(&server)
2516            .await;
2517
2518        let conn = Connection::new_for_test("t".into(), server.uri());
2519        let got = conn
2520            .has_pending_notifications()
2521            .await
2522            .expect("404 → ok(false)");
2523        assert!(!got);
2524    }
2525
2526    /// Non-success / non-404 status propagates as `BadStatus`.
2527    #[tokio::test]
2528    async fn has_pending_notifications_5xx_returns_bad_status() {
2529        let server = MockServer::start().await;
2530        Mock::given(method("GET"))
2531            .and(path("/notify/queued"))
2532            .respond_with(ResponseTemplate::new(500).set_body_string("boom"))
2533            .mount(&server)
2534            .await;
2535
2536        let conn = Connection::new_for_test("t".into(), server.uri());
2537        let err = conn
2538            .has_pending_notifications()
2539            .await
2540            .expect_err("5xx → err");
2541        match err {
2542            super::super::Error::BadStatus { code, body, .. } => {
2543                assert_eq!(code.as_u16(), 500);
2544                assert_eq!(body, "boom");
2545            }
2546            other => panic!("expected BadStatus, got {other:?}"),
2547        }
2548    }
2549
2550}