Skip to main content

objectiveai_sdk/mcp/
connection.rs

1//! MCP connection for communicating with an MCP server.
2//!
3//! [`Connection`] is a cheaply-clonable handle around an internal
4//! [`ConnectionInner`]. The last drop of the inner `Arc` runs
5//! [`ConnectionInner`]'s `Drop`, which cancels the listener task's
6//! [`tokio_util::sync::CancellationToken`] (held in `_listener_cancel_guard`
7//! as a [`tokio_util::sync::DropGuard`]) — the SSE listener exits the
8//! instant any in-flight reconnect, sleep, or read is cancelled, with no
9//! zombie 401 retries against a now-dead proxy session.
10
11use std::ops::Deref;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::{Arc, RwLock as StdRwLock, Weak};
14use std::time::Duration;
15
16use indexmap::IndexMap;
17use tokio::sync::{Notify, RwLock};
18use tokio_util::sync::{CancellationToken, DropGuard};
19
/// Callback fired by [`Connection`] when the upstream MCP server emits
/// `notifications/tools/list_changed` or `notifications/resources/list_changed`.
///
/// The callback takes no arguments and returns nothing; it must be
/// `Send + Sync + 'static` so it can be shared with the listener task.
///
/// **Timing:** runs after the corresponding cache's write lock is taken
/// but *before* the network paginate that replaces it. That ordering
/// matches the moment the staleness window opens — anyone blocked on the
/// read lock won't return until the new list lands.
///
/// **Re-entrancy:** the callback should not call back into
/// `list_tools` / `list_resources`: doing so would re-take the lock the
/// listener already holds and deadlock.
///
/// Stored behind an `Arc` so the listener task can cheaply clone it out
/// of the lock and call it without holding the read guard.
pub type ListChangedCallback = Arc<dyn Fn() + Send + Sync + 'static>;
33
/// A registered-or-not callback slot: `None` until a callback is
/// installed, `Some(callback)` afterwards.
///
/// Exists as a newtype so [`ConnectionInner`] can keep
/// `#[derive(Debug)]` (a raw `dyn Fn` isn't `Debug`); the manual `Debug`
/// impl only reports whether a callback is set. The slot is guarded by a
/// synchronous `std::sync::RwLock` (imported as `StdRwLock`), not the
/// tokio `RwLock`, so it can be read and written from non-async code.
struct CallbackSlot(StdRwLock<Option<ListChangedCallback>>);
37
38impl CallbackSlot {
39    fn new() -> Self {
40        Self(StdRwLock::new(None))
41    }
42
43    fn set(&self, callback: ListChangedCallback) {
44        *self.0.write().unwrap() = Some(callback);
45    }
46
47    /// Cheap clone-out of the current callback (if any). The `Arc` clone
48    /// lets us release the read guard before invoking the callback.
49    fn get(&self) -> Option<ListChangedCallback> {
50        self.0.read().unwrap().clone()
51    }
52}
53
54impl std::fmt::Debug for CallbackSlot {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        let set = self.0.read().map(|g| g.is_some()).unwrap_or(false);
57        f.debug_struct("CallbackSlot").field("set", &set).finish()
58    }
59}
60
/// An active connection to an MCP server using the Streamable HTTP transport.
///
/// This is a thin handle: its only field is an `Arc<ConnectionInner>`,
/// so cloning is one `Arc` bump and all clones share the same state.
/// When the last clone is dropped, the inner `Arc` ref count hits zero,
/// [`ConnectionInner::Drop`] runs, the listener-cancel `DropGuard` is
/// dropped, and the SSE listener task is cancelled — exiting any
/// in-flight `lines.next_line()`, reconnect `send()`, or backoff `sleep`
/// *immediately* without retrying against the now-dead proxy session.
///
/// Use the public methods (`list_tools`, `call_tool`, `list_resources`,
/// `read_resource`, `call_tool_as_message`, `tool_key`) for the upstream
/// MCP protocol surface. The inner state ([`ConnectionInner`]) is also
/// reachable via `Deref` for read-only field access (e.g.
/// `connection.url`, `connection.initialize_result.server_info.name`),
/// but its methods are private — you must go through `Connection`.
#[derive(Debug)]
pub struct Connection {
    /// Shared connection state; every clone of this handle points at the
    /// same `ConnectionInner`.
    inner: Arc<ConnectionInner>,
}
80
81impl Clone for Connection {
82    fn clone(&self) -> Self {
83        Self {
84            inner: Arc::clone(&self.inner),
85        }
86    }
87}
88
89// No `Drop` for `Connection`: cancellation happens deterministically
90// when the last `Arc<ConnectionInner>` clone is dropped, which runs
91// `ConnectionInner::drop` and releases the cancel-token DropGuard.
92
93impl Deref for Connection {
94    type Target = ConnectionInner;
95    fn deref(&self) -> &ConnectionInner {
96        &self.inner
97    }
98}
99
100impl Connection {
101    /// Tear this connection down explicitly.
102    ///
103    /// 1. Cancels the long-lived list-changed listener task immediately
104    ///    (drops the [`DropGuard`] in
105    ///    [`ConnectionInner::_listener_cancel_guard`]), so by the time
106    ///    the HTTP DELETE goes out the listener isn't still holding an
107    ///    SSE read open against the upstream we're about to close.
108    /// 2. Issues `DELETE /` to the upstream with this connection's
109    ///    `Mcp-Session-Id` and the same merged header set every other
110    ///    RPC stamps. Reuses [`ConnectionInner::call_timeout`].
111    /// 3. Treats `404 / 401 / 403` as success — the upstream is
112    ///    unreachable from these credentials anyway, which is the
113    ///    desired terminal state. Other non-2xx surfaces as
114    ///    [`super::Error::BadStatus`].
115    ///
116    /// Takes `&self`: the listener cancel is in-place, and dropping
117    /// the surrounding `Arc<ConnectionInner>` (which closes the rest
118    /// of the connection's owned state) is the caller's responsibility
119    /// — usually by dropping the `Arc<Session>` holding it. Stateless
120    /// callers that don't hold a `Connection` should use
121    /// [`Client::delete`](super::Client::delete) instead.
122    ///
123    /// **In-flight RPC ordering.** This method does not block on
124    /// in-flight `call_tool` / `read_resource` / `list_tools` /
125    /// `list_resources` calls on the same connection. If one is
126    /// outstanding when `delete` lands, the upstream may see DELETE
127    /// before the RPC's reply makes it back; the in-flight call then
128    /// surfaces as a closed-connection error to whoever started it.
129    /// That's the spec-correct order (client said terminate) — drain
130    /// on the caller side first if you need different semantics.
131    pub async fn delete(&self) -> Result<(), super::Error> {
132        // 1. Mark the connection as "used" so the drop-time
133        //    orphan-DELETE check (see `ConnectionInner::Drop`) skips
134        //    its own fan-out. Without this, a fresh-mint connection
135        //    that's explicitly torn down via `delete()` would race
136        //    its own drop-time orphan into a duplicate upstream
137        //    DELETE.
138        self.inner.any_calls.store(true, Ordering::Relaxed);
139
140        // 2. Drop the listener-cancel guard. Releasing the `DropGuard`
141        //    cancels the sibling `CancellationToken` the listener task
142        //    holds; the listener `tokio::select!`s against it on every
143        //    blocking await and exits inside one scheduler tick.
144        if let Ok(mut guard) = self.inner._listener_cancel_guard.lock() {
145            let _ = guard.take();
146        }
147
148        // 3. Mock connections never opened an HTTP session, so the
149        //    DELETE call would 404. Short-circuit to success.
150        if self.inner.mock {
151            return Ok(());
152        }
153
154        // 4. Build + send HTTP DELETE. Mirrors `Client::connect_once`'s
155        //    request-stamp shape: header loop first, explicit
156        //    `Mcp-Session-Id` always wins.
157        let request = self
158            .inner
159            .http_client
160            .delete(&self.inner.url)
161            .timeout(self.inner.call_timeout)
162            .headers(self.inner.build_request_headers(None, None).await);
163        let response = request.send().await.map_err(|source| {
164            super::Error::Request {
165                url: self.inner.url.clone(),
166                source,
167            }
168        })?;
169
170        // 5. 404 / 401 / 403 → success; other non-2xx → real error.
171        let status = response.status();
172        if matches!(
173            status,
174            reqwest::StatusCode::NOT_FOUND
175                | reqwest::StatusCode::UNAUTHORIZED
176                | reqwest::StatusCode::FORBIDDEN
177        ) {
178            return Ok(());
179        }
180        if !status.is_success() {
181            let body = response.text().await.unwrap_or_default();
182            return Err(super::Error::BadStatus {
183                url: self.inner.url.clone(),
184                code: status,
185                body: body.chars().take(800).collect(),
186            });
187        }
188        Ok(())
189    }
190
191    pub(super) async fn new(
192        http_client: reqwest::Client,
193        url: String,
194        session_id: String,
195        headers: IndexMap<String, String>,
196        backoff_current_interval: Duration,
197        backoff_initial_interval: Duration,
198        backoff_randomization_factor: f64,
199        backoff_multiplier: f64,
200        backoff_max_interval: Duration,
201        backoff_max_elapsed_time: Duration,
202        call_timeout: Duration,
203        initialize_result: super::initialize_result::InitializeResult,
204        initial_sse_lines: Option<super::LinesStream>,
205        is_reconnect: bool,
206    ) -> Self {
207        let inner = ConnectionInner::new(
208            http_client,
209            url,
210            session_id,
211            headers,
212            backoff_current_interval,
213            backoff_initial_interval,
214            backoff_randomization_factor,
215            backoff_multiplier,
216            backoff_max_interval,
217            backoff_max_elapsed_time,
218            call_timeout,
219            initialize_result,
220            initial_sse_lines,
221            is_reconnect,
222        )
223        .await;
224        Self { inner }
225    }
226
227    pub(super) fn new_mock(url: String) -> Self {
228        Self {
229            inner: ConnectionInner::new_mock(url),
230        }
231    }
232
233    #[cfg(test)]
234    pub(crate) fn new_for_test(name: String, url: String) -> Self {
235        Self {
236            inner: ConnectionInner::new_for_test(name, url),
237        }
238    }
239
240    /// Send a JSON-RPC notification to the upstream. Used by `Client`
241    /// right after `initialize` to send `notifications/initialized`.
242    pub(super) async fn notify<P: serde::Serialize>(
243        &self,
244        method: &str,
245        params: &P,
246    ) -> Result<(), super::Error> {
247        self.inner.notify(method, params).await
248    }
249
250    /// Returns a key identifying this connection for tool namespacing.
251    pub fn tool_key(&self) -> String {
252        self.inner.tool_key()
253    }
254
255    /// Returns the session ID for this connection.
256    pub fn session_id(&self) -> &str {
257        self.inner.session_id()
258    }
259
260    /// Returns all tools from the upstream server.
261    pub async fn list_tools(
262        &self,
263    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
264        self.inner.list_tools().await
265    }
266
267    /// Calls a tool on the upstream server.
268    pub async fn call_tool(
269        &self,
270        params: &super::tool::CallToolRequestParams,
271    ) -> Result<super::tool::CallToolResult, super::Error> {
272        self.inner.call_tool(params).await
273    }
274
275    /// Calls a tool and converts the result into a [`ToolMessage`].
276    pub async fn call_tool_as_message(
277        &self,
278        params: &super::tool::CallToolRequestParams,
279        tool_call_id: String,
280    ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
281    {
282        self.inner.call_tool_as_message(params, tool_call_id).await
283    }
284
285    /// Returns all resources from the upstream server.
286    pub async fn list_resources(
287        &self,
288    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
289        self.inner.list_resources().await
290    }
291
292    /// Returns the cached tool list as soon as it differs from `current`,
293    /// or waits up to `timeout` for the next `notifications/tools/list_changed`
294    /// from the upstream server before re-reading.
295    ///
296    /// Wakes the moment a refresh writer takes the cache write lock, so
297    /// the post-wake `read` is guaranteed to observe the new list rather
298    /// than racing against the install. Safe to call from any number of
299    /// tasks concurrently.
300    pub async fn subscribe_tools(
301        &self,
302        current: &[super::tool::Tool],
303        timeout: Duration,
304    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
305        self.inner.subscribe_tools(current, timeout).await
306    }
307
308    /// Resource counterpart of [`Connection::subscribe_tools`].
309    pub async fn subscribe_resources(
310        &self,
311        current: &[super::resource::Resource],
312        timeout: Duration,
313    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
314        self.inner.subscribe_resources(current, timeout).await
315    }
316
317    /// Atomically drain the proxy's `pending_notifications` queue for
318    /// this session via `GET /notify` and return the queued content
319    /// blocks. A second call returns `[]` until the next out-of-band
320    /// `POST /notify`.
321    ///
322    /// Intended for use at the start of an agent turn so notifications
323    /// queued between turns — when the prior turn ended without a tool
324    /// call, or the user is starting a fresh continuation — surface as
325    /// a user message instead of being lost. The proxy's existing
326    /// `tools/call` response path still drains in-flight notifications
327    /// arriving *during* a turn; this method covers the gap between
328    /// turns.
329    ///
330    /// A 404 from the proxy (session unknown — possible after a proxy
331    /// restart) is mapped to an empty `Vec` so callers do not need to
332    /// distinguish "no notifications" from "lost session" at the use
333    /// site; the next upstream call will surface the lost-session
334    /// condition through its own error path.
335    pub async fn drain_notifications(
336        &self,
337    ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
338        self.inner.drain_notifications().await
339    }
340
341    /// Non-draining peek at the proxy's `pending_notifications` queue
342    /// via `GET /notify/queued`. Returns `true` iff the queue holds at
343    /// least one block. Companion to [`Connection::drain_notifications`]
344    /// for callers that want to know whether queued blocks exist
345    /// without consuming them.
346    ///
347    /// A 404 from the proxy (session unknown — possible after a proxy
348    /// restart) is mapped to `Ok(false)` for the same reason as the
349    /// drain path: callers do not need to distinguish "no
350    /// notifications" from "lost session" at the use site.
351    pub async fn has_pending_notifications(
352        &self,
353    ) -> Result<bool, super::Error> {
354        self.inner.has_pending_notifications().await
355    }
356
357    /// `POST <self.url>/notify` against the ObjectiveAI MCP proxy.
358    /// Appends `blocks` to the proxy's pending-notifications queue for
359    /// this session; they surface as a user message on the next
360    /// `tools/call` response (wrapped in a `<system-reminder>` block)
361    /// or as the head of the next agent turn when drained between turns.
362    ///
363    /// Mirror of [`Connection::drain_notifications`] / [`Connection::has_pending_notifications`]
364    /// for the inbound side. A 404 from the proxy means the session is
365    /// gone — surfaced as `SessionExpired` so callers can distinguish
366    /// "session lost" from "delivery failed" at the use site.
367    pub async fn enqueue_notifications(
368        &self,
369        blocks: &[super::tool::ContentBlock],
370    ) -> Result<(), super::Error> {
371        self.inner.enqueue_notifications(blocks).await
372    }
373
374    /// Reads a resource from the upstream server.
375    pub async fn read_resource(
376        &self,
377        uri: &str,
378    ) -> Result<super::resource::ReadResourceResult, super::Error> {
379        self.inner.read_resource(uri).await
380    }
381
382    /// Register a callback to fire whenever the upstream emits
383    /// `notifications/tools/list_changed`.
384    ///
385    /// **Timing:** the callback runs *after* the tool cache's write lock
386    /// is acquired but *before* the network paginate that replaces it.
387    /// That means readers blocked on the read lock won't return until the
388    /// new list is in place, and the callback observes the moment the
389    /// staleness window opens. The proxy uses this to emit its own
390    /// `notifications/tools/list_changed` to downstream clients at the
391    /// right instant.
392    ///
393    /// Replaces any previously-registered tools-list-changed callback.
394    /// All clones of this `Connection` share the same callback slot.
395    pub fn set_on_tools_list_changed<F>(&self, callback: F)
396    where
397        F: Fn() + Send + Sync + 'static,
398    {
399        self.inner.on_tools_list_changed.set(Arc::new(callback));
400    }
401
402    /// Register a callback to fire whenever the upstream emits
403    /// `notifications/resources/list_changed`. Same timing contract as
404    /// [`Connection::set_on_tools_list_changed`].
405    ///
406    /// Replaces any previously-registered resources-list-changed callback.
407    /// All clones of this `Connection` share the same callback slot.
408    pub fn set_on_resources_list_changed<F>(&self, callback: F)
409    where
410        F: Fn() + Send + Sync + 'static,
411    {
412        self.inner.on_resources_list_changed.set(Arc::new(callback));
413    }
414
415    /// Atomically replace the connection's [`ConnectionInner::extra_headers`]
416    /// bag. Every subsequent outbound HTTP request from this connection
417    /// stamps the new map AFTER `headers`, with `HeaderMap::insert`
418    /// REPLACE semantics — keys in `extras` override the same key in
419    /// the per-URL `headers` bag set at `Client::connect`. Caller
420    /// supplies the FULL replacement map; missing keys are dropped
421    /// (no merge).
422    ///
423    /// Used by the proxy to inject session-global headers
424    /// (`X-OBJECTIVEAI-RESPONSE-ID`, `X-OBJECTIVEAI-RESPONSE-IDS`)
425    /// that re-set on every inbound `initialize`, without re-dialing
426    /// the upstream.
427    pub async fn set_extra_headers(
428        &self,
429        extras: IndexMap<String, String>,
430    ) {
431        *self.inner.extra_headers.write().await = extras;
432    }
433}
434
/// The actual connection state. Behind an `Arc` inside [`Connection`].
///
/// Fields are public for read-only access (callers reach them via
/// `Connection`'s `Deref`), but every method on this type is private —
/// the public surface lives on [`Connection`] and delegates through.
#[derive(Debug)]
pub struct ConnectionInner {
    /// HTTP client used for this connection's outbound requests
    /// (e.g. the DELETE issued by [`Connection::delete`]).
    pub http_client: reqwest::Client,
    /// Upstream endpoint URL; also echoed into request / bad-status
    /// errors.
    pub url: String,
    /// Session identifier for this connection (returned by
    /// [`Connection::session_id`]). Empty for mock and test connections.
    pub session_id: String,
    /// All HTTP headers stamped on every POST / GET this connection
    /// makes — the same merged map (defaults + caller overrides) the
    /// `Client` built once during connect. `Mcp-Session-Id`,
    /// `Content-Type`, and `Accept` are still set by the request
    /// builders and override anything in `headers`.
    pub headers: IndexMap<String, String>,
    /// Mutable per-request override layer stamped AFTER `headers` on
    /// every outbound HTTP request. The request-builder uses
    /// `reqwest::header::HeaderMap::insert` semantics so any key
    /// present in `extra_headers` REPLACES the same key in `headers`.
    /// Used by the proxy to inject session-global headers
    /// (`X-OBJECTIVEAI-RESPONSE-ID` etc.) that override per-URL
    /// values without re-dialing. Empty by default; set via
    /// [`Connection::set_extra_headers`].
    pub extra_headers: RwLock<IndexMap<String, String>>,

    // Backoff configuration, stored exactly as passed to the
    // constructor. NOTE(review): presumably consumed by the listener's
    // reconnect loop — the consumer is not in this part of the file;
    // confirm before relying on the exact semantics of each knob.
    pub backoff_current_interval: Duration,
    pub backoff_initial_interval: Duration,
    pub backoff_randomization_factor: f64,
    pub backoff_multiplier: f64,
    pub backoff_max_interval: Duration,
    pub backoff_max_elapsed_time: Duration,
    /// Per-request timeout; applied to the DELETE in
    /// [`Connection::delete`].
    pub call_timeout: Duration,

    /// The server's capabilities and info from the initialize response.
    pub initialize_result: super::initialize_result::InitializeResult,

    /// If true, all RPC/notify calls are no-ops. Used for mock orchestrator URLs.
    mock: bool,

    /// `true` iff this connection was opened by resuming an existing
    /// upstream session — i.e. [`Client::connect`](super::Client::connect)
    /// was called with `session_id: Some(...)`. Set once at
    /// construction; never mutated.
    ///
    /// Used by the drop-time orphan-DELETE gate in
    /// [`ConnectionInner::Drop`]: reconnects are **excluded** from
    /// orphan DELETE because their `any_calls == false` only means
    /// "this `Connection` instance didn't use it" — an earlier
    /// instance that opened the upstream session may have. Only
    /// freshly-minted connections (`is_reconnect == false`) that no
    /// one used get the drop-time orphan DELETE.
    is_reconnect: bool,

    /// `true` once any [`Self::call_tool`] or [`Self::read_resource`]
    /// has been issued through this connection. Listings
    /// ([`Self::list_tools`], [`Self::list_resources`]) and the
    /// proxy-side notification helpers do NOT flip this — only
    /// deliberate use of the upstream session counts.
    ///
    /// Stored atomically because the setters live behind `&self`-only
    /// call paths; `Ordering::Relaxed` is sufficient (we never
    /// synchronize anything else against this load/store).
    ///
    /// Also flipped to `true` at the top of [`super::Connection::delete`]
    /// so an explicit teardown of a fresh-mint connection can't race
    /// the drop-time orphan-DELETE into a double-fire.
    any_calls: AtomicBool,

    /// Auto-incrementing request ID (starts at 2; 1 was used for initialize).
    next_id: AtomicU64,

    /// All tools from the server, populated by background pagination.
    ///
    /// `None` = cache cleared — either pre-populate (between
    /// [`Self::new`] and the first `refresh_tools_signaling`) or
    /// post-drop (the listener empties this the moment its SSE
    /// stream ends so `list_tools` will re-paginate against the
    /// upstream rather than return stale state). `Some(_)` = last
    /// known result, `Ok` or `Err`.
    tools:
        RwLock<Option<Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>>>>,
    /// All resources from the server, populated by background pagination.
    /// Same `None`/`Some` semantics as [`Self::tools`].
    resources: RwLock<
        Option<Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>>>,
    >,

    /// Cancellation token for the long-lived `listen_for_list_changes`
    /// task. The listener selects this against every blocking await
    /// (read, reconnect-send, backoff-sleep) and returns the instant it
    /// fires.
    ///
    /// Held inside the connection as a [`DropGuard`] so that the moment
    /// the last `Arc<ConnectionInner>` clone is dropped — i.e. the
    /// moment no external `Connection` handle remains — `Drop` runs on
    /// the guard, the token cancels, and the listener task tears down.
    /// The listener itself holds a sibling `CancellationToken` (clone),
    /// not the guard, so its task does not extend the connection's
    /// lifetime.
    ///
    /// Wrapped in `Mutex<Option<_>>` so explicit teardown paths
    /// ([`Connection::delete`]) can drop the guard in place — firing
    /// the cancel token *before* the surrounding `Arc<ConnectionInner>`
    /// goes away. Regular drop still works: `Mutex<Option<DropGuard>>`
    /// drops its inner `DropGuard` automatically when the mutex itself
    /// drops, so the cancel-on-last-drop behaviour is unchanged
    /// (`Connection` itself has no `Drop` impl; the guard is released
    /// when this struct is dropped).
    ///
    /// `None` for mock and test connections, which have no listener.
    _listener_cancel_guard: std::sync::Mutex<Option<DropGuard>>,

    /// Optional callback fired by the listener in response to an
    /// upstream `notifications/tools/list_changed`.
    /// Set via [`Connection::set_on_tools_list_changed`].
    ///
    /// NOTE(review): the [`ListChangedCallback`] and setter docs state
    /// the callback runs after the cache write lock is taken but
    /// *before* the re-paginate, whereas this field was previously
    /// documented as firing *after* the refresh. The listener body is
    /// not visible in this part of the file — confirm which is true.
    on_tools_list_changed: CallbackSlot,

    /// Optional callback fired by the listener in response to an
    /// upstream `notifications/resources/list_changed`.
    /// Set via [`Connection::set_on_resources_list_changed`].
    ///
    /// NOTE(review): same before-vs-after-refresh timing question as
    /// [`Self::on_tools_list_changed`] — confirm against the listener.
    on_resources_list_changed: CallbackSlot,

    /// Wakes any task awaiting in [`Connection::subscribe_tools`]. Fired
    /// from inside `refresh_tools_signaling` the moment the writer
    /// acquires the cache write lock — *before* the new list is
    /// installed. A woken subscriber's next `read().await` blocks behind
    /// the writer's guard, so it always observes the post-swap state.
    tools_changed: Notify,

    /// Resource counterpart of [`Self::tools_changed`].
    resources_changed: Notify,
}
565
566impl ConnectionInner {
567    /// Creates a mock connection that never makes network requests.
568    /// All RPC calls return empty/default results.
569    fn new_mock(url: String) -> Arc<Self> {
570        Arc::new(Self {
571            http_client: reqwest::Client::new(),
572            url,
573            session_id: String::new(),
574            headers: IndexMap::new(),
575            extra_headers: RwLock::new(IndexMap::new()),
576            backoff_current_interval: Duration::ZERO,
577            backoff_initial_interval: Duration::ZERO,
578            backoff_randomization_factor: 0.0,
579            backoff_multiplier: 1.0,
580            backoff_max_interval: Duration::ZERO,
581            backoff_max_elapsed_time: Duration::ZERO,
582            call_timeout: Duration::ZERO,
583            initialize_result: super::initialize_result::InitializeResult {
584                protocol_version: "2025-03-26".into(),
585                capabilities: super::initialize_result::ServerCapabilities {
586                    experimental: None,
587                    logging: None,
588                    completions: None,
589                    prompts: None,
590                    resources: None,
591                    tools: None,
592                    tasks: None,
593                },
594                server_info: super::initialize_result::Implementation {
595                    name: "mock".into(),
596                    title: None,
597                    version: "0.0.0".into(),
598                    website_url: None,
599                    description: None,
600                    icons: None,
601                },
602                instructions: None,
603                _meta: None,
604            },
605            mock: true,
606            is_reconnect: false,
607            any_calls: AtomicBool::new(false),
608            next_id: AtomicU64::new(2),
609            // Mock has no listener and never refreshes; seed with an
610            // empty Ok so `list_tools` returns immediately instead of
611            // trying to paginate over a non-existent upstream.
612            tools: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
613            resources: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
614            _listener_cancel_guard: std::sync::Mutex::new(None),
615            on_tools_list_changed: CallbackSlot::new(),
616            on_resources_list_changed: CallbackSlot::new(),
617            tools_changed: Notify::new(),
618            resources_changed: Notify::new(),
619        })
620    }
621
622    /// Creates a minimal connection for unit testing.
623    #[cfg(test)]
624    fn new_for_test(name: String, url: String) -> Arc<Self> {
625        Arc::new(Self {
626            http_client: reqwest::Client::new(),
627            url,
628            session_id: String::new(),
629            headers: IndexMap::new(),
630            extra_headers: RwLock::new(IndexMap::new()),
631            backoff_current_interval: Duration::from_millis(500),
632            backoff_initial_interval: Duration::from_millis(500),
633            backoff_randomization_factor: 0.5,
634            backoff_multiplier: 1.5,
635            backoff_max_interval: Duration::from_secs(60),
636            backoff_max_elapsed_time: Duration::from_secs(900),
637            call_timeout: Duration::from_secs(30),
638            initialize_result: super::initialize_result::InitializeResult {
639                protocol_version: "2025-03-26".into(),
640                capabilities: super::initialize_result::ServerCapabilities {
641                    experimental: None,
642                    logging: None,
643                    completions: None,
644                    prompts: None,
645                    resources: None,
646                    tools: None,
647                    tasks: None,
648                },
649                server_info: super::initialize_result::Implementation {
650                    name,
651                    title: None,
652                    version: "0.0.0".into(),
653                    website_url: None,
654                    description: None,
655                    icons: None,
656                },
657                instructions: None,
658                _meta: None,
659            },
660            mock: false,
661            is_reconnect: false,
662            any_calls: AtomicBool::new(false),
663            next_id: AtomicU64::new(2),
664            // Test connection has no listener and never refreshes; seed
665            // with an empty Ok so `list_tools` doesn't try to paginate.
666            tools: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
667            resources: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
668            _listener_cancel_guard: std::sync::Mutex::new(None),
669            on_tools_list_changed: CallbackSlot::new(),
670            on_resources_list_changed: CallbackSlot::new(),
671            tools_changed: Notify::new(),
672            resources_changed: Notify::new(),
673        })
674    }
675
676    /// Creates a new connection and spawns background tasks to paginate
677    /// all tools and resources. Called internally by
678    /// [`Client::connect`](super::Client::connect) (via [`Connection::new`]).
679    ///
680    /// `initial_sse_lines`, if `Some`, is a pre-opened SSE line reader
681    /// that the list-changed listener will read from immediately on its
682    /// first iteration, instead of opening its own GET `/`. The caller
683    /// is responsible for arranging for one of these to exist whenever
684    /// the upstream advertises `tools.list_changed` or
685    /// `resources.list_changed` — see
686    /// [`Client::connect`](super::Client::connect).
687    async fn new(
688        http_client: reqwest::Client,
689        url: String,
690        session_id: String,
691        headers: IndexMap<String, String>,
692        backoff_current_interval: Duration,
693        backoff_initial_interval: Duration,
694        backoff_randomization_factor: f64,
695        backoff_multiplier: f64,
696        backoff_max_interval: Duration,
697        backoff_max_elapsed_time: Duration,
698        call_timeout: Duration,
699        initialize_result: super::initialize_result::InitializeResult,
700        initial_sse_lines: Option<super::LinesStream>,
701        is_reconnect: bool,
702    ) -> Arc<Self> {
703        // Cancel-the-listener machinery: store the DropGuard inside the
704        // inner so the cancellation fires deterministically when the
705        // last external `Arc<ConnectionInner>` clone drops. Hand the
706        // listener task a sibling clone (no guard) — that way the
707        // listener task's lifetime does not extend the connection.
708        let listener_cancel = CancellationToken::new();
709        let listener_cancel_for_task = listener_cancel.clone();
710        let conn = Arc::new(Self {
711            http_client,
712            url,
713            session_id,
714            headers,
715            extra_headers: RwLock::new(IndexMap::new()),
716            backoff_current_interval,
717            backoff_initial_interval,
718            backoff_randomization_factor,
719            backoff_multiplier,
720            backoff_max_interval,
721            backoff_max_elapsed_time,
722            call_timeout,
723            initialize_result,
724            mock: false,
725            is_reconnect,
726            any_calls: AtomicBool::new(false),
727            next_id: AtomicU64::new(2),
728            // Start empty; `refresh_tools_signaling` below installs
729            // `Some(_)` before `new` returns (the lock-handoff oneshot
730            // gates the return on the writer holding the lock).
731            tools: RwLock::new(None),
732            resources: RwLock::new(None),
733            _listener_cancel_guard: std::sync::Mutex::new(Some(
734                listener_cancel.drop_guard(),
735            )),
736            on_tools_list_changed: CallbackSlot::new(),
737            on_resources_list_changed: CallbackSlot::new(),
738            tools_changed: Notify::new(),
739            resources_changed: Notify::new(),
740        });
741
742        // Spawn background tool lister if the server supports tools.
743        //
744        // We don't return until the spawned task has acquired the write
745        // lock. Otherwise a caller that immediately reads `list_tools()`
746        // could race the writer — `tokio::spawn` only queues the task,
747        // and a fast reader can acquire the read lock before the writer
748        // has run its first instruction. The reader would then see the
749        // initial empty `Vec` and return that, even though a full
750        // populate is in flight.
751        //
752        // The `RwLockWriteGuard` itself isn't `Send`-friendly enough to
753        // pass back, so we use a oneshot to signal "I'm holding the
754        // lock now"; once we receive that, the cache is exclusively
755        // owned by the writer and any subsequent `read().await` from
756        // the caller is guaranteed to wait for the populate to finish.
757        if conn.initialize_result.capabilities.tools.is_some() {
758            let conn = Arc::clone(&conn);
759            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
760            tokio::spawn(async move {
761                conn.refresh_tools_signaling(lock_held_tx, None).await;
762            });
763            // Wait for the writer to hold the lock before returning.
764            let _ = lock_held_rx.await;
765        }
766
767        // Spawn background resource lister if the server supports
768        // resources. Same lock-handoff contract as tools above.
769        if conn.initialize_result.capabilities.resources.is_some() {
770            let conn = Arc::clone(&conn);
771            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
772            tokio::spawn(async move {
773                conn.refresh_resources_signaling(lock_held_tx, None).await;
774            });
775            let _ = lock_held_rx.await;
776        }
777
778        // Spawn the list-changed listener iff the caller handed us a
779        // pre-opened SSE stream. The connection is naive about
780        // `tools.list_changed` / `resources.list_changed` capabilities —
781        // [`Client::connect`](super::Client::connect) translates them
782        // into "did or didn't open a stream for us." If we get a stream,
783        // we listen on it; if we don't, there's nothing to listen for.
784        if let Some(initial_lines) = initial_sse_lines {
785            // Hand the listener a `Weak` so the spawned task itself does
786            // not keep the connection alive. `listener_cancel_for_task`
787            // is a sibling clone of the connection's own
788            // `_listener_cancel_guard` token — when the last external
789            // `Arc<ConnectionInner>` clone is dropped, the inner's Drop
790            // releases the guard and the listener wakes from any
791            // pending await (read, send, sleep) and exits immediately.
792            let weak = Arc::downgrade(&conn);
793            tokio::spawn(async move {
794                Self::listen_for_list_changes(
795                    weak,
796                    listener_cancel_for_task,
797                    initial_lines,
798                )
799                .await;
800            });
801        }
802
803        conn
804    }
805
806    /// Creates an exponential backoff configuration from the connection's fields.
807    fn backoff(&self) -> backoff::ExponentialBackoff {
808        backoff::ExponentialBackoff {
809            current_interval: self.backoff_current_interval,
810            initial_interval: self.backoff_initial_interval,
811            randomization_factor: self.backoff_randomization_factor,
812            multiplier: self.backoff_multiplier,
813            max_interval: self.backoff_max_interval,
814            start_time: std::time::Instant::now(),
815            max_elapsed_time: Some(self.backoff_max_elapsed_time),
816            clock: backoff::SystemClock::default(),
817        }
818    }
819
820    /// Builds a POST request with all required headers and the call timeout.
821    async fn post(&self) -> reqwest::RequestBuilder {
822        self.http_client
823            .post(&self.url)
824            .timeout(self.call_timeout)
825            .headers(
826                self.build_request_headers(
827                    Some("application/json"),
828                    Some("application/json, text/event-stream"),
829                )
830                .await,
831            )
832    }
833
834    /// Build the `HeaderMap` stamped on every outbound request. Order
835    /// of insertion drives override semantics — `HeaderMap::insert`
836    /// REPLACES existing values for the same key:
837    ///
838    /// 1. Content-Type / Accept (when supplied by the caller).
839    /// 2. `self.headers` (the per-URL bag set at `Client::connect`).
840    /// 3. `self.extra_headers` (the mutable, session-global overrides
841    ///    — proxies use this for `X-OBJECTIVEAI-RESPONSE-ID` etc).
842    /// 4. `Mcp-Session-Id` (the connection's own session id, always
843    ///    last so it can never be shadowed).
844    async fn build_request_headers(
845        &self,
846        content_type: Option<&str>,
847        accept: Option<&str>,
848    ) -> reqwest::header::HeaderMap {
849        use reqwest::header::{
850            ACCEPT, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue,
851        };
852        let mut hmap = HeaderMap::new();
853        if let Some(ct) = content_type {
854            if let Ok(hv) = HeaderValue::from_str(ct) {
855                hmap.insert(CONTENT_TYPE, hv);
856            }
857        }
858        if let Some(a) = accept {
859            if let Ok(hv) = HeaderValue::from_str(a) {
860                hmap.insert(ACCEPT, hv);
861            }
862        }
863        for (k, v) in &self.headers {
864            if let (Ok(hn), Ok(hv)) = (
865                HeaderName::try_from(k.as_str()),
866                HeaderValue::from_str(v),
867            ) {
868                hmap.insert(hn, hv);
869            }
870        }
871        let extras = self.extra_headers.read().await;
872        for (k, v) in extras.iter() {
873            if let (Ok(hn), Ok(hv)) = (
874                HeaderName::try_from(k.as_str()),
875                HeaderValue::from_str(v),
876            ) {
877                hmap.insert(hn, hv);
878            }
879        }
880        drop(extras);
881        if let Ok(hv) = HeaderValue::from_str(&self.session_id) {
882            hmap.insert(HeaderName::from_static("mcp-session-id"), hv);
883        }
884        hmap
885    }
886
887    /// Sends a JSON-RPC request, retrying transient errors when
888    /// `idempotent` is `true`.
889    ///
890    /// Idempotent methods (`tools/list`, `resources/list`,
891    /// `resources/read`, etc.) retry every transient error — network,
892    /// HTTP status, malformed body, JSON-RPC error, session expiration —
893    /// until the backoff's `max_elapsed_time` is exceeded.
894    ///
895    /// Non-idempotent methods (`tools/call`) make exactly one attempt.
896    /// Retrying a `tools/call` is unsafe: a tool may have mutated remote
897    /// state during the first attempt before the response was lost, and
898    /// re-firing the call would mutate state again. Each retry of
899    /// `AppendTask` advances `state.tasks.len()` an extra step, so the
900    /// agent sees a different return value than expected and the
901    /// pid-derived mock seed at the next step diverges. See
902    /// `objectiveai-api/src/agent/completions/client.rs` (sequential
903    /// dispatch) and `mock/client.rs::mock.seed_derive` for the
904    /// downstream consequence.
905    async fn rpc<P: serde::Serialize, R: serde::de::DeserializeOwned>(
906        &self,
907        method: &str,
908        params: &P,
909        idempotent: bool,
910    ) -> Result<R, super::Error> {
911        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
912        let body = serde_json::json!({
913            "jsonrpc": "2.0",
914            "id": id,
915            "method": method,
916            "params": params,
917        });
918
919        let attempt_one = || async {
920            let url = self.url.clone();
921            let response =
922                self.post().await.json(&body).send().await.map_err(|source| {
923                    backoff::Error::transient(super::Error::Request {
924                        url: url.clone(),
925                        source,
926                    })
927                })?;
928
929            if response.status() == reqwest::StatusCode::NOT_FOUND {
930                return Err(backoff::Error::transient(
931                    super::Error::SessionExpired { url: url.clone() },
932                ));
933            }
934            if !response.status().is_success() {
935                let code = response.status();
936                let body = response.text().await.unwrap_or_default();
937                return Err(backoff::Error::transient(
938                    super::Error::BadStatus {
939                        url: url.clone(),
940                        code,
941                        body,
942                    },
943                ));
944            }
945
946            let rpc_response: super::JsonRpcResponse<R> =
947                super::parse_streamable_http_response(&url, response)
948                    .await
949                    .map_err(backoff::Error::transient)?;
950
951            match rpc_response {
952                super::JsonRpcResponse::Success { result, .. } => Ok(result),
953                super::JsonRpcResponse::Error { error, .. } => {
954                    Err(backoff::Error::transient(super::Error::JsonRpc {
955                        url: url.clone(),
956                        code: error.code,
957                        message: error.message,
958                        data: error.data,
959                    }))
960                }
961            }
962        };
963
964        if idempotent {
965            backoff::future::retry(self.backoff(), attempt_one).await
966        } else {
967            attempt_one().await.map_err(|e| match e {
968                backoff::Error::Permanent(err)
969                | backoff::Error::Transient { err, .. } => err,
970            })
971        }
972    }
973
974    /// Sends a JSON-RPC notification (no response expected) with the
975    /// same exponential-backoff retry policy as [`Self::rpc`]. Every
976    /// error is transient; the loop gives up only when the backoff's
977    /// `max_elapsed_time` is exceeded.
978    async fn notify<P: serde::Serialize>(
979        &self,
980        method: &str,
981        params: &P,
982    ) -> Result<(), super::Error> {
983        if self.mock {
984            return Ok(());
985        }
986        let body = serde_json::json!({
987            "jsonrpc": "2.0",
988            "method": method,
989            "params": params,
990        });
991
992        backoff::future::retry(self.backoff(), || async {
993            let url = self.url.clone();
994            let response =
995                self.post().await.json(&body).send().await.map_err(|source| {
996                    backoff::Error::transient(super::Error::Request {
997                        url: url.clone(),
998                        source,
999                    })
1000                })?;
1001
1002            if response.status() == reqwest::StatusCode::NOT_FOUND {
1003                return Err(backoff::Error::transient(
1004                    super::Error::SessionExpired { url: url.clone() },
1005                ));
1006            }
1007            if !response.status().is_success() {
1008                let code = response.status();
1009                let body = response.text().await.unwrap_or_default();
1010                return Err(backoff::Error::transient(
1011                    super::Error::BadStatus {
1012                        url: url.clone(),
1013                        code,
1014                        body,
1015                    },
1016                ));
1017            }
1018
1019            Ok(())
1020        })
1021        .await
1022    }
1023
1024    /// `GET <self.url>/notify` against the ObjectiveAI MCP proxy.
1025    /// Atomically drains the proxy's pending-notifications queue for
1026    /// this session and returns the queued content blocks.
1027    ///
1028    /// Single-attempt — the proxy drain is destructive, so a retry
1029    /// after a transient failure would risk silently dropping
1030    /// notifications that the first attempt's response carried but
1031    /// failed to deliver. Networks errors propagate to the caller; the
1032    /// next turn's drain will pick up anything queued in the meantime.
1033    /// A 404 (session unknown) is mapped to `Ok(vec![])` — see the
1034    /// public method's doc on `Connection`.
1035    async fn drain_notifications(
1036        &self,
1037    ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
1038        if self.mock {
1039            return Ok(Vec::new());
1040        }
1041
1042        let url = format!("{}/notify", self.url.trim_end_matches('/'));
1043        let request = self
1044            .http_client
1045            .get(&url)
1046            .timeout(self.call_timeout)
1047            .headers(
1048                self.build_request_headers(None, Some("application/json"))
1049                    .await,
1050            );
1051
1052        let response =
1053            request
1054                .send()
1055                .await
1056                .map_err(|source| super::Error::Request {
1057                    url: url.clone(),
1058                    source,
1059                })?;
1060
1061        if response.status() == reqwest::StatusCode::NOT_FOUND {
1062            return Ok(Vec::new());
1063        }
1064        if !response.status().is_success() {
1065            let code = response.status();
1066            let body = response.text().await.unwrap_or_default();
1067            return Err(super::Error::BadStatus { url, code, body });
1068        }
1069
1070        response
1071            .json::<Vec<super::tool::ContentBlock>>()
1072            .await
1073            .map_err(|source| super::Error::Request { url, source })
1074    }
1075
1076    /// `POST <self.url>/notify` against the ObjectiveAI MCP proxy.
1077    /// Appends `blocks` to the proxy's pending-notifications queue for
1078    /// this session. Single-attempt — the caller decides whether to
1079    /// retry. A 404 (session unknown) surfaces as `SessionExpired`
1080    /// rather than `Ok(())` because the caller is asking for delivery
1081    /// and a lost session means delivery did not happen.
1082    async fn enqueue_notifications(
1083        &self,
1084        blocks: &[super::tool::ContentBlock],
1085    ) -> Result<(), super::Error> {
1086        if self.mock {
1087            return Ok(());
1088        }
1089
1090        let url = format!("{}/notify", self.url.trim_end_matches('/'));
1091        let request = self
1092            .http_client
1093            .post(&url)
1094            .timeout(self.call_timeout)
1095            .headers(
1096                self.build_request_headers(
1097                    Some("application/json"),
1098                    Some("application/json"),
1099                )
1100                .await,
1101            )
1102            .json(blocks);
1103
1104        let response =
1105            request
1106                .send()
1107                .await
1108                .map_err(|source| super::Error::Request {
1109                    url: url.clone(),
1110                    source,
1111                })?;
1112
1113        if response.status() == reqwest::StatusCode::NOT_FOUND {
1114            return Err(super::Error::SessionExpired { url });
1115        }
1116        if !response.status().is_success() {
1117            let code = response.status();
1118            let body = response.text().await.unwrap_or_default();
1119            return Err(super::Error::BadStatus { url, code, body });
1120        }
1121
1122        Ok(())
1123    }
1124
1125    /// `GET <self.url>/notify/queued` against the ObjectiveAI MCP proxy.
1126    /// Non-draining peek — returns `true` iff the proxy's
1127    /// pending-notifications queue for this session is non-empty.
1128    /// A 404 (session unknown) is mapped to `Ok(false)` to match the
1129    /// drain path's soft-fallback contract.
1130    async fn has_pending_notifications(&self) -> Result<bool, super::Error> {
1131        if self.mock {
1132            return Ok(false);
1133        }
1134
1135        let url = format!("{}/notify/queued", self.url.trim_end_matches('/'));
1136        let request = self
1137            .http_client
1138            .get(&url)
1139            .timeout(self.call_timeout)
1140            .headers(
1141                self.build_request_headers(None, Some("application/json"))
1142                    .await,
1143            );
1144
1145        let response =
1146            request
1147                .send()
1148                .await
1149                .map_err(|source| super::Error::Request {
1150                    url: url.clone(),
1151                    source,
1152                })?;
1153
1154        if response.status() == reqwest::StatusCode::NOT_FOUND {
1155            return Ok(false);
1156        }
1157        if !response.status().is_success() {
1158            let code = response.status();
1159            let body = response.text().await.unwrap_or_default();
1160            return Err(super::Error::BadStatus { url, code, body });
1161        }
1162
1163        response
1164            .json::<bool>()
1165            .await
1166            .map_err(|source| super::Error::Request { url, source })
1167    }
1168
1169    /// Returns a key identifying this connection for tool namespacing.
1170    fn tool_key(&self) -> String {
1171        format!("{}-{}", self.initialize_result.server_info.name, self.url)
1172    }
1173
1174    /// Returns the session ID for this connection.
1175    fn session_id(&self) -> &str {
1176        &self.session_id
1177    }
1178
1179    /// Sends a `tools/list` RPC call for a single page.
1180    async fn rpc_list_tools(
1181        &self,
1182        cursor: Option<&str>,
1183    ) -> Result<super::tool::ListToolsResult, super::Error> {
1184        self.rpc(
1185            "tools/list",
1186            &super::tool::ListToolsRequest {
1187                cursor: cursor.map(String::from),
1188            },
1189            true,
1190        )
1191        .await
1192    }
1193
1194    /// Returns all tools from the server.
1195    ///
1196    /// Blocks until background pagination completes, then returns a
1197    /// cheap `Arc` clone of the result. If the cache is currently
1198    /// empty (e.g. because the listener detected its SSE stream drop
1199    /// and cleared it) this paginates inline against the upstream —
1200    /// the caller gets fresh data on the happy path, or the live
1201    /// upstream error if the connection is genuinely down, rather
1202    /// than stale pre-drop tools.
1203    async fn list_tools(
1204        &self,
1205    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1206        if let Some(cached) = self.tools.read().await.as_ref() {
1207            return cached.clone();
1208        }
1209        // Cache cleared; refresh inline. Concurrent callers may each
1210        // refresh — wasteful, but the proxy fans out across distinct
1211        // upstreams so a single Connection rarely sees concurrent
1212        // `list_tools` calls.
1213        self.refresh_tools(None).await;
1214        self.tools
1215            .read()
1216            .await
1217            .as_ref()
1218            .expect("refresh_tools installs Some")
1219            .clone()
1220    }
1221
1222    /// Calls a tool on the MCP server. The returned
1223    /// `CallToolResult.content` is **fully resolved**: every
1224    /// `ContentBlock::ResourceLink { uri }` whose URI appears in
1225    /// `list_resources` has already been replaced by one or more
1226    /// `ContentBlock::EmbeddedResource` blocks carrying the fetched
1227    /// contents (via `read_resource`). Unknown-URI links pass through
1228    /// untouched — the upstream server may have its own out-of-band
1229    /// resolution path the caller should preserve.
1230    ///
1231    /// Resolving inside `call_tool` (rather than downstream in
1232    /// `call_tool_as_message`) means every consumer of the result
1233    /// sees `EmbeddedResource` shapes uniformly; the stateless
1234    /// `From<ContentBlock>` impl is then enough to convert the whole
1235    /// result to `RichContent` with no further connection work.
1236    async fn call_tool(
1237        &self,
1238        params: &super::tool::CallToolRequestParams,
1239    ) -> Result<super::tool::CallToolResult, super::Error> {
1240        // Mark the connection as deliberately used. Flipped at the top
1241        // of the method (not after success) because even a failed
1242        // `tools/call` may have mutated upstream state — we don't want
1243        // the drop-time orphan-DELETE second-guessing that.
1244        self.any_calls.store(true, Ordering::Relaxed);
1245        if self.mock {
1246            return Ok(super::tool::CallToolResult {
1247                content: vec![super::tool::ContentBlock::Text(
1248                    super::tool::TextContent {
1249                        text: "mock".to_string(),
1250                        annotations: None,
1251                        _meta: None,
1252                    },
1253                )],
1254                structured_content: None,
1255                is_error: None,
1256                _meta: None,
1257            });
1258        }
1259        let mut result: super::tool::CallToolResult =
1260            self.rpc("tools/call", params, false).await?;
1261
1262        // Build the known-resource URI set for ResourceLink
1263        // resolution. `list_resources` failure → empty set (same
1264        // safe fallback the resolution path used previously).
1265        let known_uris: std::collections::HashSet<String> =
1266            match self.list_resources().await {
1267                Ok(rs) => rs.iter().map(|r| r.uri.clone()).collect(),
1268                Err(_) => std::collections::HashSet::new(),
1269            };
1270
1271        // Walk the blocks, replacing each resolvable ResourceLink
1272        // with one EmbeddedResource per returned ResourceContentsUnion.
1273        // Everything else (Text, Image, Audio, EmbeddedResource,
1274        // unknown-URI ResourceLinks) passes through.
1275        let mut resolved: Vec<super::tool::ContentBlock> =
1276            Vec::with_capacity(result.content.len());
1277        for block in std::mem::take(&mut result.content) {
1278            match block {
1279                super::tool::ContentBlock::ResourceLink(link)
1280                    if known_uris.contains(&link.uri) =>
1281                {
1282                    let read = self.read_resource(&link.uri).await?;
1283                    for contents in read.contents {
1284                        resolved.push(
1285                            super::tool::ContentBlock::EmbeddedResource(
1286                                super::tool::EmbeddedResource {
1287                                    resource: contents,
1288                                    // ResourceLink's annotations
1289                                    // don't have a perfect home on
1290                                    // EmbeddedResource — both fields
1291                                    // exist but they describe different
1292                                    // shapes (the link vs the inlined
1293                                    // contents). Drop them on the way
1294                                    // in; the EmbeddedResource is now
1295                                    // a fresh authoritative block.
1296                                    annotations: None,
1297                                    _meta: None,
1298                                },
1299                            ),
1300                        );
1301                    }
1302                }
1303                other => resolved.push(other),
1304            }
1305        }
1306        result.content = resolved;
1307        Ok(result)
1308    }
1309
1310    /// Calls a tool and converts the (already-resolved) result into a
1311    /// [`ToolMessage`]. Resource resolution happens inside
1312    /// [`Self::call_tool`] — by the time we get the blocks here every
1313    /// resolvable `ResourceLink` has already been replaced with an
1314    /// `EmbeddedResource`, so the conversion is a pure stateless
1315    /// element-wise map through [`From<ContentBlock> for
1316    /// RichContentPart`](crate::agent::completions::message::RichContentPart).
1317    ///
1318    /// Content-block mapping (handled by the `From` impl):
1319    /// - `text` → text part
1320    /// - `image` → image_url part (data URL)
1321    /// - `audio` → input_audio part
1322    /// - `embedded_resource` (text) → text part
1323    /// - `embedded_resource` (blob, image mime) → image_url part
1324    /// - `embedded_resource` (blob, audio mime) → input_audio part
1325    /// - `embedded_resource` (blob, video mime) → input_video part
1326    /// - `embedded_resource` (blob, other mime) → file part
1327    /// - `resource_link` (unknown URI) → JSON-text fallback
1328    ///   (resolvable URIs were already resolved upstream)
1329    async fn call_tool_as_message(
1330        &self,
1331        params: &super::tool::CallToolRequestParams,
1332        tool_call_id: String,
1333    ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
1334    {
1335        use crate::agent::completions::message::{
1336            RichContentPart, ToolMessage, ToolResponseMetadata,
1337        };
1338
1339        let result = self.call_tool(params).await?;
1340
1341        let parts: Vec<RichContentPart> =
1342            result.content.into_iter().map(Into::into).collect();
1343
1344        // Lossy-decode the MCP `_meta` extension bag into our typed
1345        // `ToolResponseMetadata`. Unknown keys (set by non-objectiveai
1346        // upstreams) are silently dropped. Decoding failure leaves
1347        // metadata as `None`.
1348        let metadata = result._meta.as_ref().and_then(|m| {
1349            serde_json::from_value::<ToolResponseMetadata>(
1350                serde_json::to_value(m).ok()?,
1351            )
1352            .ok()
1353        });
1354
1355        Ok(ToolMessage {
1356            content: parts.into(),
1357            tool_call_id,
1358            metadata,
1359        })
1360    }
1361
1362    /// Sends a `resources/list` RPC call for a single page.
1363    async fn rpc_list_resources(
1364        &self,
1365        cursor: Option<&str>,
1366    ) -> Result<super::resource::ListResourcesResult, super::Error> {
1367        self.rpc(
1368            "resources/list",
1369            &super::resource::ListResourcesRequest {
1370                cursor: cursor.map(String::from),
1371            },
1372            true,
1373        )
1374        .await
1375    }
1376
1377    /// Returns all resources from the server.
1378    ///
1379    /// Same cache-or-refresh semantics as [`Self::list_tools`]: a
1380    /// cleared cache (post-drop or pre-populate) triggers an inline
1381    /// paginate against the upstream.
1382    async fn list_resources(
1383        &self,
1384    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1385        if let Some(cached) = self.resources.read().await.as_ref() {
1386            return cached.clone();
1387        }
1388        self.refresh_resources(None).await;
1389        self.resources
1390            .read()
1391            .await
1392            .as_ref()
1393            .expect("refresh_resources installs Some")
1394            .clone()
1395    }
1396
1397    /// Returns the cached tool list as soon as it differs from `current`,
1398    /// or — if it equals `current` right now — waits up to `timeout` for
1399    /// the cache to change and then returns whatever it sees.
1400    ///
1401    /// An `Err` cache is treated as "different from any caller snapshot"
1402    /// and returned immediately.
1403    ///
1404    /// Concurrency-safe: any number of concurrent subscribers wait on
1405    /// independent `Notified` futures and read the cache through the
1406    /// shared `RwLock`. A timeout that fires alone is not an error — we
1407    /// re-read the cache and return whatever's there.
1408    async fn subscribe_tools(
1409        &self,
1410        current: &[super::tool::Tool],
1411        timeout: Duration,
1412    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1413        // Arm BEFORE reading. `enable()` registers the future in the
1414        // wait queue without polling, so a `notify_waiters` racing
1415        // between our read and our await still wakes us.
1416        let notified = self.tools_changed.notified();
1417        tokio::pin!(notified);
1418        notified.as_mut().enable();
1419
1420        // `list_tools` handles a cleared cache (post-drop) by
1421        // paginating inline. A `None` initial state can't be
1422        // compared to the caller's snapshot meaningfully — promote
1423        // to whatever the refresh installs.
1424        let initial = self.list_tools().await;
1425        match &initial {
1426            Ok(arc) if arc.as_slice() == current => {}
1427            _ => return initial,
1428        }
1429
1430        let _ = tokio::time::timeout(timeout, notified).await;
1431
1432        self.list_tools().await
1433    }
1434
1435    /// Resource counterpart of [`Self::subscribe_tools`].
1436    async fn subscribe_resources(
1437        &self,
1438        current: &[super::resource::Resource],
1439        timeout: Duration,
1440    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1441        let notified = self.resources_changed.notified();
1442        tokio::pin!(notified);
1443        notified.as_mut().enable();
1444
1445        let initial = self.list_resources().await;
1446        match &initial {
1447            Ok(arc) if arc.as_slice() == current => {}
1448            _ => return initial,
1449        }
1450
1451        let _ = tokio::time::timeout(timeout, notified).await;
1452
1453        self.list_resources().await
1454    }
1455
1456    /// Reads a resource from the MCP server.
1457    async fn read_resource(
1458        &self,
1459        uri: &str,
1460    ) -> Result<super::resource::ReadResourceResult, super::Error> {
1461        // Mark the connection as deliberately used (same reasoning as
1462        // `call_tool` — see the drop-time orphan-DELETE gate).
1463        self.any_calls.store(true, Ordering::Relaxed);
1464        self.rpc(
1465            "resources/read",
1466            &super::resource::ReadResourceRequestParams {
1467                uri: uri.to_string(),
1468            },
1469            true,
1470        )
1471        .await
1472    }
1473
1474    /// Re-fetches all tools from the server, replacing the cached list.
1475    ///
1476    /// Optionally fires `on_change` *after* the write lock is acquired but
1477    /// *before* the network paginate begins, so the callback observes the
1478    /// "list change is in flight" edge — readers blocked on the read lock
1479    /// won't return until the new list lands. The proxy uses this to
1480    /// re-emit `notifications/tools/list_changed` to its downstream client
1481    /// at the moment the staleness window opens.
1482    async fn refresh_tools(&self, on_change: Option<ListChangedCallback>) {
1483        // Listener-driven refresh. Visibility contract: any caller
1484        // that issues `list_tools()` after a `tools/list_changed`
1485        // notification has been observed must see the post-swap
1486        // value, not stale data — so the write lock has to gate
1487        // readers across the upstream paginate.
1488        //
1489        // Performance contract: don't serialise paginate *behind*
1490        // lock-acquisition latency. We start `tools.write()` and the
1491        // upstream paginate **concurrently** with `tokio::join!`. The
1492        // write-lock acquire blocks new `list_tools()` readers
1493        // immediately (preserving visibility) and runs in parallel
1494        // with whatever drain time the in-flight readers need; the
1495        // paginate runs alongside. Total wall-clock is
1496        // `max(drain_time, paginate_time)` instead of the sum.
1497        //
1498        // `notify_waiters` and `on_change` fire under the write
1499        // guard, *after* `*guard = result`, so anyone awoken by them
1500        // queues on the read lock, waits for the guard to drop, and
1501        // observes the post-swap state.
1502        let (mut guard, result) =
1503            tokio::join!(self.tools.write(), self.paginate_tools(),);
1504        *guard = Some(result);
1505        self.tools_changed.notify_waiters();
1506        if let Some(cb) = on_change {
1507            cb();
1508        }
1509    }
1510
1511    /// Page-by-page fetch of the upstream tool list, no locks held.
1512    /// Shared between the `_signaling` (initial-populate, holds lock
1513    /// for the original "block fast readers" contract) and `refresh_*`
1514    /// (listener-driven, lock-only-around-install) variants.
1515    async fn paginate_tools(
1516        &self,
1517    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1518        let mut all_tools = Vec::new();
1519        let mut cursor: Option<String> = None;
1520        loop {
1521            match self.rpc_list_tools(cursor.as_deref()).await {
1522                Ok(page) => {
1523                    all_tools.extend(page.tools);
1524                    cursor = page.next_cursor;
1525                    if cursor.is_none() {
1526                        return Ok(Arc::new(all_tools));
1527                    }
1528                }
1529                Err(e) => return Err(Arc::new(e)),
1530            }
1531        }
1532    }
1533
1534    /// Same as [`Self::refresh_tools`] but fires `lock_held` once the
1535    /// write lock has been acquired so the caller can synchronise on
1536    /// "writer is in possession of the cache" before returning. Used by
1537    /// `ConnectionInner::new` to prevent a fast reader from acquiring
1538    /// the read lock before this writer has even started.
1539    async fn refresh_tools_signaling(
1540        &self,
1541        lock_held: tokio::sync::oneshot::Sender<()>,
1542        on_change: Option<ListChangedCallback>,
1543    ) {
1544        let mut guard = self.tools.write().await;
1545        // Fire `tools_changed` while we hold the write lock and *before*
1546        // installing the new list. Any subscriber woken now must take a
1547        // read lock to observe the result, and that read lock is queued
1548        // behind this write guard — so they always see the post-swap
1549        // state, never mid-swap.
1550        self.tools_changed.notify_waiters();
1551        let _ = lock_held.send(());
1552        if let Some(cb) = on_change {
1553            cb();
1554        }
1555        let mut all_tools = Vec::new();
1556        let mut cursor: Option<String> = None;
1557        let result = loop {
1558            match self.rpc_list_tools(cursor.as_deref()).await {
1559                Ok(page) => {
1560                    all_tools.extend(page.tools);
1561                    cursor = page.next_cursor;
1562                    if cursor.is_none() {
1563                        break Ok(Arc::new(all_tools));
1564                    }
1565                }
1566                Err(e) => break Err(Arc::new(e)),
1567            }
1568        };
1569        *guard = Some(result);
1570    }
1571
1572    /// Re-fetches all resources from the server, replacing the cached list.
1573    /// See [`ConnectionInner::refresh_tools`] for the callback timing
1574    /// contract.
1575    async fn refresh_resources(&self, on_change: Option<ListChangedCallback>) {
1576        // Same paginate-while-acquiring-the-write-lock pattern as
1577        // `refresh_tools` — see that comment for the visibility +
1578        // performance rationale.
1579        let (mut guard, result) =
1580            tokio::join!(self.resources.write(), self.paginate_resources(),);
1581        *guard = Some(result);
1582        self.resources_changed.notify_waiters();
1583        if let Some(cb) = on_change {
1584            cb();
1585        }
1586    }
1587
1588    async fn paginate_resources(
1589        &self,
1590    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1591        let mut all_resources = Vec::new();
1592        let mut cursor: Option<String> = None;
1593        loop {
1594            match self.rpc_list_resources(cursor.as_deref()).await {
1595                Ok(page) => {
1596                    all_resources.extend(page.resources);
1597                    cursor = page.next_cursor;
1598                    if cursor.is_none() {
1599                        return Ok(Arc::new(all_resources));
1600                    }
1601                }
1602                Err(e) => return Err(Arc::new(e)),
1603            }
1604        }
1605    }
1606
1607    /// Resource counterpart of [`Self::refresh_tools_signaling`].
1608    async fn refresh_resources_signaling(
1609        &self,
1610        lock_held: tokio::sync::oneshot::Sender<()>,
1611        on_change: Option<ListChangedCallback>,
1612    ) {
1613        let mut guard = self.resources.write().await;
1614        // See `refresh_tools_signaling` — fire under the write lock,
1615        // before install, so subscribers' next read sees the post-swap
1616        // state.
1617        self.resources_changed.notify_waiters();
1618        let _ = lock_held.send(());
1619        if let Some(cb) = on_change {
1620            cb();
1621        }
1622        let mut all_resources = Vec::new();
1623        let mut cursor: Option<String> = None;
1624        let result = loop {
1625            match self.rpc_list_resources(cursor.as_deref()).await {
1626                Ok(page) => {
1627                    all_resources.extend(page.resources);
1628                    cursor = page.next_cursor;
1629                    if cursor.is_none() {
1630                        break Ok(Arc::new(all_resources));
1631                    }
1632                }
1633                Err(e) => break Err(Arc::new(e)),
1634            }
1635        };
1636        *guard = Some(result);
1637    }
1638
1639    /// Builds a GET request to the MCP endpoint for receiving server
1640    /// notifications via SSE.
1641    async fn get(&self) -> reqwest::RequestBuilder {
1642        self.http_client
1643            .get(&self.url)
1644            .headers(
1645                self.build_request_headers(None, Some("text/event-stream"))
1646                    .await,
1647            )
1648    }
1649
1650    /// Listens for `notifications/tools/list_changed` and
1651    /// `notifications/resources/list_changed` on an SSE stream. On each
1652    /// notification, write-locks and re-fetches the full list.
1653    ///
1654    /// `initial_lines` is the pre-opened SSE line reader handed in by
1655    /// [`Client::connect`](super::Client::connect) — that stream is
1656    /// consumed first. When it ends (or any later GET reconnect ends),
1657    /// we sleep `backoff_initial_interval` and open a fresh GET `/` SSE
1658    /// stream.
1659    ///
1660    /// Takes a [`Weak<Self>`] (not `Arc<Self>`) so the spawned task
1661    /// doesn't itself keep the [`Connection`] alive, and a
1662    /// [`CancellationToken`] sibling clone of the connection's
1663    /// [`DropGuard`] so the task tears down the instant the last
1664    /// external `Arc<ConnectionInner>` clone is dropped — every
1665    /// blocking await (line read, reconnect send, backoff sleep) is
1666    /// raced against `cancel.cancelled()` and exits without any zombie
1667    /// retries against a now-dead session.
1668    async fn listen_for_list_changes(
1669        weak: Weak<Self>,
1670        cancel: CancellationToken,
1671        initial_lines: super::LinesStream,
1672    ) {
1673        // First iteration: use the pre-opened SSE stream the client
1674        // handed us. After that, fall back to opening fresh GET / SSE
1675        // streams as the upstream connection cycles.
1676        let mut next_lines: Option<super::LinesStream> = Some(initial_lines);
1677        // One-shot guard for the catch-up refresh: false on the very
1678        // first iteration (the caller's pre-opened SSE stream — its
1679        // associated cache was just populated by `Client::connect`'s
1680        // initial pagination, so re-fetching there would just be a
1681        // wasted round-trip), true thereafter. Every stream end —
1682        // whether `Ok(None)` (clean close) or `Err(_)` (read failure)
1683        // — drops back here, which we treat as an implicit
1684        // list-changed notification: the upstream's broadcast (in
1685        // particular the proxy's per-session `tokio::broadcast`) is
1686        // lossy for moments when this listener has zero active
1687        // subscribers, so anything that fired during our disconnect
1688        // window may have been dropped.
1689        //
1690        // ORDER MATTERS. The refresh must run AFTER we've re-opened
1691        // the GET / SSE stream — i.e. after we're a subscriber again
1692        // — and BEFORE we enter the inner read loop. If we refreshed
1693        // before the resubscribe, a notification that fired between
1694        // our refresh-completion and our subscribe would be lost the
1695        // same way as the original disconnect-window drops; doing it
1696        // after means a notification fired DURING the refresh lands
1697        // in the new subscriber's buffer (broadcast::Sender::send
1698        // backs onto each receiver's channel-capacity slot) and gets
1699        // consumed by the inner loop on its next read.
1700        let mut is_reconnect = false;
1701
1702        loop {
1703            // The token cancels deterministically when the last
1704            // `Arc<ConnectionInner>` clone is dropped (see
1705            // `_listener_cancel_guard`). Check once per outer
1706            // iteration, but the real protection is the cancel arms in
1707            // every blocking await below — those exit immediately on
1708            // cancel.
1709            if cancel.is_cancelled() {
1710                return;
1711            }
1712            let Some(this) = weak.upgrade() else { return };
1713            let backoff_delay = this.backoff_initial_interval;
1714
1715            let mut lines = match next_lines.take() {
1716                Some(l) => l,
1717                None => {
1718                    // Race the upstream GET against cancellation — if
1719                    // the connection drops mid-reconnect, exit
1720                    // immediately rather than waiting for the request
1721                    // to complete or time out (otherwise produces a
1722                    // burst of 401 retries against a now-dead session
1723                    // under heavy churn).
1724                    let send_outcome = tokio::select! {
1725                        out = async {
1726                            this.get().await.send().await
1727                        } => out,
1728                        _ = cancel.cancelled() => {
1729                            drop(this);
1730                            return;
1731                        }
1732                    };
1733                    let response = match send_outcome {
1734                        Ok(r) if r.status().is_success() => r,
1735                        _ => {
1736                            drop(this);
1737                            // Sleep with cancel-arm: instant exit on
1738                            // drop, no zombie retries.
1739                            tokio::select! {
1740                                _ = tokio::time::sleep(backoff_delay) => {}
1741                                _ = cancel.cancelled() => return,
1742                            }
1743                            continue;
1744                        }
1745                    };
1746                    super::lines_from_response(response)
1747                }
1748            };
1749
1750            // Catch-up refresh on every reconnect — the implicit
1751            // list-changed treatment for the just-failed stream. See
1752            // the `is_reconnect` doc-comment above for the
1753            // refresh-AFTER-resubscribe rationale.
1754            if is_reconnect {
1755                // tools and resources are independent locks; run the
1756                // catch-up refreshes concurrently so disconnect
1757                // recovery isn't sequential.
1758                let _ = tokio::join!(
1759                    this.refresh_tools(this.on_tools_list_changed.get()),
1760                    this.refresh_resources(
1761                        this.on_resources_list_changed.get()
1762                    ),
1763                );
1764            }
1765            is_reconnect = true;
1766
1767            'inner: loop {
1768                tokio::select! {
1769                    line_result = lines.next_line() => {
1770                        match line_result {
1771                            Ok(Some(line)) => {
1772                                // SSE data lines start with "data: ".
1773                                let Some(data) = line.strip_prefix("data: ") else {
1774                                    continue 'inner;
1775                                };
1776                                let method = match serde_json::from_str::<super::JsonRpcNotification>(data) {
1777                                    Ok(n) => n.method,
1778                                    Err(_) => continue 'inner,
1779                                };
1780                                match method.as_str() {
1781                                    "notifications/tools/list_changed" => {
1782                                        // refresh_tools fires the
1783                                        // callback after the cache is
1784                                        // installed, so the proxy's
1785                                        // downstream
1786                                        // notifications/tools/list_changed
1787                                        // emission lines up with the
1788                                        // staleness window opening.
1789                                        this.refresh_tools(
1790                                            this.on_tools_list_changed.get(),
1791                                        )
1792                                        .await;
1793                                    }
1794                                    "notifications/resources/list_changed" => {
1795                                        this.refresh_resources(
1796                                            this.on_resources_list_changed.get(),
1797                                        )
1798                                        .await;
1799                                    }
1800                                    _ => {}
1801                                }
1802                            }
1803                            // Stream ended cleanly or errored — break out
1804                            // to the outer loop so we either reconnect or,
1805                            // if everyone's gone, exit at the top.
1806                            _ => break 'inner,
1807                        }
1808                    }
1809                    // Cancellation: the connection's last clone has
1810                    // dropped. Tear down immediately.
1811                    _ = cancel.cancelled() => {
1812                        drop(this);
1813                        return;
1814                    }
1815                }
1816            }
1817
1818            // Stream dropped — empty the per-Connection caches so the
1819            // next `list_tools` / `list_resources` paginates inline
1820            // against the (possibly still-dead) upstream rather than
1821            // returning whatever was cached before the drop. The
1822            // `is_reconnect` catch-up at the top of the next outer
1823            // iteration will repopulate `Some(_)` if the reconnect
1824            // succeeds; if the reconnect keeps failing, the cache
1825            // stays `None` and `list_*` callers paginate themselves.
1826            *this.tools.write().await = None;
1827            *this.resources.write().await = None;
1828
1829            // Stream ended — drop the strong ref before sleeping so the
1830            // next iteration's weak-upgrade can detect liveness honestly.
1831            drop(this);
1832            tokio::select! {
1833                _ = tokio::time::sleep(backoff_delay) => {}
1834                _ = cancel.cancelled() => return,
1835            }
1836        }
1837    }
1838}
1839
1840impl Drop for ConnectionInner {
1841    /// Orphan-DELETE hook: when a **freshly-minted** connection (not a
1842    /// resume) is dropped without any deliberate use — no `call_tool`,
1843    /// no `read_resource`, no explicit `Connection::delete` — spawn a
1844    /// fire-and-forget HTTP DELETE so the upstream session we just
1845    /// opened doesn't sit there accruing per-session state for nothing.
1846    ///
1847    /// Reconnect-resumes are deliberately excluded: a reconnect's
1848    /// `any_calls == false` only means *this* `Connection` instance
1849    /// never called anything — the underlying upstream session may
1850    /// well have been used by an earlier `Connection` that opened it,
1851    /// did real work, and let us re-attach. Tearing it down here would
1852    /// kill a still-live session out from under whoever owns it.
1853    /// Reconnects rely on the proxy's explicit `Connection::delete`
1854    /// (or `Client::delete`) for upstream cleanup instead.
1855    ///
1856    /// Skip conditions (any one triggers a no-op):
1857    ///
1858    /// - `mock` is `true` — there was never an HTTP session to begin with.
1859    /// - `is_reconnect` is `true` — see above; the upstream session
1860    ///   pre-existed this connection and isn't ours to tear down on drop.
1861    /// - `any_calls` is `true` — the connection was deliberately used,
1862    ///   or an explicit `Connection::delete` already ran.
1863    /// - No tokio runtime is in scope — `tokio::spawn` would panic.
1864    ///   Silently leak the upstream session in this case (sync
1865    ///   teardown paths e.g. `cfg(test)` blocks not driven by tokio).
1866    ///
1867    /// The listener-cancel `DropGuard` inside `_listener_cancel_guard`
1868    /// fires automatically as part of this same `drop` call, so by the
1869    /// time the orphan DELETE goes out the listener task has already
1870    /// been told to cancel — no SSE/GET race with the upstream DELETE.
1871    fn drop(&mut self) {
1872        if self.mock {
1873            return;
1874        }
1875        if self.is_reconnect {
1876            return;
1877        }
1878        if self.any_calls.load(Ordering::Relaxed) {
1879            return;
1880        }
1881
1882        // Clone out the bits the orphan task needs. None of these are
1883        // big: `reqwest::Client` is itself an `Arc` bump,
1884        // `IndexMap<String, String>` is the per-connection header bag
1885        // (small), and the `String`s are the per-session id + URL.
1886        let http_client = self.http_client.clone();
1887        let url = self.url.clone();
1888        let session_id = self.session_id.clone();
1889        let headers = self.headers.clone();
1890        let timeout = self.call_timeout;
1891
1892        // Spawn only if a tokio runtime is in scope. `tokio::spawn`
1893        // panics outside one — sync teardown paths (e.g. a test that
1894        // builds a `Connection` and lets it drop on a non-async stack)
1895        // would crash without this guard.
1896        if let Ok(handle) = tokio::runtime::Handle::try_current() {
1897            handle.spawn(orphan_delete(
1898                http_client,
1899                url,
1900                session_id,
1901                headers,
1902                timeout,
1903            ));
1904        }
1905    }
1906}
1907
1908/// Fire-and-forget HTTP `DELETE` used by [`ConnectionInner::drop`] to
1909/// release a resumed upstream session that was never used. Mirrors
1910/// [`super::Connection::delete`]'s wire shape (same `Mcp-Session-Id`
1911/// header behavior, same header-loop with the explicit session id
1912/// winning over any `Mcp-Session-Id` entry in `headers`) but never
1913/// surfaces errors — there's no caller left to surface them to. The
1914/// `timeout` (sourced from the originating connection's `call_timeout`)
1915/// caps the request so a hanging upstream can't keep the spawned task
1916/// alive forever.
1917async fn orphan_delete(
1918    http_client: reqwest::Client,
1919    url: String,
1920    session_id: String,
1921    headers: IndexMap<String, String>,
1922    timeout: Duration,
1923) {
1924    let mut request = http_client
1925        .delete(&url)
1926        .timeout(timeout)
1927        .header("Mcp-Session-Id", &session_id);
1928    for (name, value) in &headers {
1929        if name.eq_ignore_ascii_case("Mcp-Session-Id") {
1930            continue;
1931        }
1932        request = request.header(name, value);
1933    }
1934    // Errors silently swallowed: no caller, and the listener-cancel
1935    // guard has already fired (it runs as part of the regular
1936    // `_listener_cancel_guard` drop inside the same `drop` call).
1937    let _ = request.send().await;
1938}
1939
#[cfg(test)]
mod subscribe_tests {
    use super::*;
    use crate::mcp::tool::{Tool, ToolSchemaObject, ToolSchemaType};

    /// Minimal `Tool` fixture: only `name` varies, every optional field
    /// is `None` and the input schema is an empty object schema.
    fn tool(name: &str) -> Tool {
        let input_schema = ToolSchemaObject {
            r#type: ToolSchemaType::Object,
            properties: None,
            required: None,
            extra: IndexMap::new(),
        };
        Tool {
            name: name.to_string(),
            title: None,
            description: None,
            icons: None,
            input_schema,
            output_schema: None,
            annotations: None,
            execution: None,
            _meta: None,
        }
    }

    /// Test connection whose tool cache already holds an `Ok` list made
    /// of one fixture tool per entry in `names`.
    async fn seeded_connection(names: &[&str]) -> Connection {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        let seeded: Vec<Tool> =
            names.iter().copied().map(|name| tool(name)).collect();
        *conn.inner.tools.write().await = Some(Ok(Arc::new(seeded)));
        conn
    }

    /// The cache already differs from the caller's snapshot, so the
    /// call comes back right away with the cached list.
    #[tokio::test]
    async fn subscribe_tools_returns_immediately_when_cache_differs() {
        let conn = seeded_connection(&["a"]).await;

        let started = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[tool("b")], Duration::from_secs(5))
            .await
            .unwrap();
        assert!(started.elapsed() < Duration::from_millis(100));
        assert_eq!(got.as_slice(), &[tool("a")]);
    }

    /// A cached `Err` never equals a snapshot: returned without waiting.
    #[tokio::test]
    async fn subscribe_tools_returns_err_immediately() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        let cached_err = super::super::Error::NoSessionId {
            url: "http://x".into(),
            body: String::new(),
        };
        *conn.inner.tools.write().await = Some(Err(Arc::new(cached_err)));

        let started = std::time::Instant::now();
        let got = conn.subscribe_tools(&[], Duration::from_secs(5)).await;
        assert!(started.elapsed() < Duration::from_millis(100));
        assert!(got.is_err());
    }

    /// Snapshot matches the cache; a writer then notifies while holding
    /// the write lock and installs a new list before releasing it. The
    /// woken subscriber's re-read queues behind the writer and must see
    /// the new list.
    #[tokio::test]
    async fn subscribe_tools_wakes_on_change_and_reads_post_swap() {
        let conn = seeded_connection(&["a"]).await;

        let waiter = tokio::spawn({
            let conn = conn.clone();
            async move {
                conn.subscribe_tools(&[tool("a")], Duration::from_secs(5))
                    .await
                    .unwrap()
            }
        });

        // Let the subscriber arm its waiter and complete the first
        // read, so it is parked on the timeout by the time we write.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Writer side, in the notify-then-install order: lock, notify
        // under the lock, install, release.
        {
            let mut cache = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            // Keep the guard a little longer so the subscriber's read
            // is genuinely contending with our release.
            tokio::time::sleep(Duration::from_millis(20)).await;
            *cache = Some(Ok(Arc::new(vec![tool("b")])));
        }

        let got = tokio::time::timeout(Duration::from_secs(2), waiter)
            .await
            .expect("subscriber returned in time")
            .expect("subscriber didn't panic");
        assert_eq!(got.as_slice(), &[tool("b")]);
    }

    /// Snapshot matches and nothing changes: the call waits out the
    /// timeout and returns the same list as a success.
    #[tokio::test]
    async fn subscribe_tools_times_out_and_returns_unchanged_list() {
        let conn = seeded_connection(&["a"]).await;

        let started = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[tool("a")], Duration::from_millis(50))
            .await
            .unwrap();
        let elapsed = started.elapsed();
        assert!(elapsed >= Duration::from_millis(40), "elapsed: {elapsed:?}");
        assert!(elapsed < Duration::from_millis(500), "elapsed: {elapsed:?}");
        assert_eq!(got.as_slice(), &[tool("a")]);
    }

    /// One `notify_waiters` wakes two parked subscribers, and each of
    /// them reads the newly installed list.
    #[tokio::test]
    async fn subscribe_tools_supports_concurrent_subscribers() {
        let conn = seeded_connection(&["a"]).await;

        let first = tokio::spawn({
            let conn = conn.clone();
            async move {
                conn.subscribe_tools(&[tool("a")], Duration::from_secs(5))
                    .await
                    .unwrap()
            }
        });
        let second = tokio::spawn({
            let conn = conn.clone();
            async move {
                conn.subscribe_tools(&[tool("a")], Duration::from_secs(5))
                    .await
                    .unwrap()
            }
        });

        // Give both subscribers time to park before writing.
        tokio::time::sleep(Duration::from_millis(50)).await;

        {
            let mut cache = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            *cache = Some(Ok(Arc::new(vec![tool("c")])));
        }

        let (first_result, second_result) = tokio::join!(first, second);
        let first_list = first_result.unwrap();
        let second_list = second_result.unwrap();
        assert_eq!(first_list.as_slice(), &[tool("c")]);
        assert_eq!(second_list.as_slice(), &[tool("c")]);
    }
}
2088
#[cfg(test)]
mod drain_notifications_tests {
    use super::*;
    use crate::mcp::tool::{ContentBlock, TextContent};
    use serde_json::json;
    use wiremock::matchers::{header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Boots a mock proxy whose `GET /notify` endpoint answers with
    /// `reply`, and hands the running server back to the caller so it
    /// stays alive for the duration of the test.
    async fn proxy_replying(reply: ResponseTemplate) -> MockServer {
        let proxy = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/notify"))
            .respond_with(reply)
            .mount(&proxy)
            .await;
        proxy
    }

    /// Happy path: the proxy answers with a two-element text array and
    /// the drain yields two `ContentBlock::Text` values in that order.
    /// The mock additionally requires an empty `Mcp-Session-Id` header.
    #[tokio::test]
    async fn drain_notifications_parses_text_blocks_in_order() {
        let payload = json!([
            {"type": "text", "text": "first"},
            {"type": "text", "text": "second"},
        ]);
        let proxy = MockServer::start().await;
        let endpoint = Mock::given(method("GET"))
            .and(path("/notify"))
            .and(header("Mcp-Session-Id", ""));
        endpoint
            .respond_with(ResponseTemplate::new(200).set_body_json(payload))
            .mount(&proxy)
            .await;

        let connection = Connection::new_for_test("t".into(), proxy.uri());
        let blocks = connection.drain_notifications().await.expect("drain ok");

        // Length is pinned first so the zip below cannot silently skip
        // a missing or extra block.
        assert_eq!(blocks.len(), 2);
        for (block, wanted) in blocks.iter().zip(["first", "second"]) {
            match block {
                ContentBlock::Text(TextContent { text, .. }) => {
                    assert_eq!(text, wanted)
                }
                other => panic!("expected text, got {other:?}"),
            }
        }
    }

    /// A 404 (proxy lost the session, e.g. after a restart) is reported
    /// as an empty vec instead of an error. The next upstream call is
    /// the one expected to surface the session-lost condition; the
    /// init-time drain should not be what aborts the request.
    #[tokio::test]
    async fn drain_notifications_404_returns_empty() {
        let proxy = proxy_replying(ResponseTemplate::new(404)).await;

        let connection = Connection::new_for_test("t".into(), proxy.uri());
        let blocks = connection
            .drain_notifications()
            .await
            .expect("404 → ok(empty)");
        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
    }

    /// The common case: nothing queued, so the proxy sends `[]` and the
    /// drain returns an empty vec.
    #[tokio::test]
    async fn drain_notifications_empty_queue_returns_empty() {
        let empty_array = ResponseTemplate::new(200).set_body_json(json!([]));
        let proxy = proxy_replying(empty_array).await;

        let connection = Connection::new_for_test("t".into(), proxy.uri());
        let blocks = connection.drain_notifications().await.expect("drain ok");
        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
    }

    /// Any status that is neither a success nor a 404 comes back as
    /// `BadStatus`, carrying the status code and the response body.
    #[tokio::test]
    async fn drain_notifications_5xx_returns_bad_status() {
        let server_error = ResponseTemplate::new(500).set_body_string("boom");
        let proxy = proxy_replying(server_error).await;

        let connection = Connection::new_for_test("t".into(), proxy.uri());
        let failure = connection
            .drain_notifications()
            .await
            .expect_err("5xx → err");
        match failure {
            super::super::Error::BadStatus { code, body, .. } => {
                assert_eq!(code.as_u16(), 500);
                assert_eq!(body, "boom");
            }
            other => panic!("expected BadStatus, got {other:?}"),
        }
    }

    /// A mock connection is drained without any server being started
    /// and must report an empty result.
    #[tokio::test]
    async fn drain_notifications_mock_returns_empty() {
        let connection = Connection::new_mock("http://does-not-matter".into());
        let blocks = connection.drain_notifications().await.expect("mock ok");
        assert!(blocks.is_empty());
    }
}
2191
#[cfg(test)]
mod has_pending_notifications_tests {
    use super::*;
    use serde_json::json;
    use wiremock::matchers::{header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Boots a mock proxy whose `GET /notify/queued` endpoint answers
    /// with `reply`, returning the running server so the caller keeps
    /// it alive for the rest of the test.
    async fn proxy_replying(reply: ResponseTemplate) -> MockServer {
        let proxy = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/notify/queued"))
            .respond_with(reply)
            .mount(&proxy)
            .await;
        proxy
    }

    /// Happy path: a JSON `true` body becomes `Ok(true)`. The mock
    /// additionally requires an empty `Mcp-Session-Id` header.
    #[tokio::test]
    async fn has_pending_notifications_true() {
        let proxy = MockServer::start().await;
        let endpoint = Mock::given(method("GET"))
            .and(path("/notify/queued"))
            .and(header("Mcp-Session-Id", ""));
        endpoint
            .respond_with(ResponseTemplate::new(200).set_body_json(json!(true)))
            .mount(&proxy)
            .await;

        let connection = Connection::new_for_test("t".into(), proxy.uri());
        let pending = connection
            .has_pending_notifications()
            .await
            .expect("peek ok");
        assert!(pending);
    }

    /// A JSON `false` body becomes `Ok(false)`.
    #[tokio::test]
    async fn has_pending_notifications_false() {
        let nothing_queued =
            ResponseTemplate::new(200).set_body_json(json!(false));
        let proxy = proxy_replying(nothing_queued).await;

        let connection = Connection::new_for_test("t".into(), proxy.uri());
        let pending = connection
            .has_pending_notifications()
            .await
            .expect("peek ok");
        assert!(!pending);
    }

    /// A 404 (proxy lost the session) maps to `Ok(false)`. This is the
    /// same soft-fallback contract as `drain_notifications`: a peek
    /// must never abort a request over a missing-session race.
    #[tokio::test]
    async fn has_pending_notifications_404_returns_false() {
        let proxy = proxy_replying(ResponseTemplate::new(404)).await;

        let connection = Connection::new_for_test("t".into(), proxy.uri());
        let pending = connection
            .has_pending_notifications()
            .await
            .expect("404 → ok(false)");
        assert!(!pending);
    }

    /// Any status that is neither a success nor a 404 comes back as
    /// `BadStatus`, carrying the status code and the response body.
    #[tokio::test]
    async fn has_pending_notifications_5xx_returns_bad_status() {
        let server_error = ResponseTemplate::new(500).set_body_string("boom");
        let proxy = proxy_replying(server_error).await;

        let connection = Connection::new_for_test("t".into(), proxy.uri());
        let failure = connection
            .has_pending_notifications()
            .await
            .expect_err("5xx → err");
        match failure {
            super::super::Error::BadStatus { code, body, .. } => {
                assert_eq!(code.as_u16(), 500);
                assert_eq!(body, "boom");
            }
            other => panic!("expected BadStatus, got {other:?}"),
        }
    }

    /// A mock connection is peeked without any server being started
    /// and must report that nothing is pending.
    #[tokio::test]
    async fn has_pending_notifications_mock_returns_false() {
        let connection = Connection::new_mock("http://does-not-matter".into());
        let pending = connection
            .has_pending_notifications()
            .await
            .expect("mock ok");
        assert!(!pending);
    }
}