Skip to main content

objectiveai_sdk/mcp/
connection.rs

//! MCP connection for communicating with an MCP server.
//!
//! [`Connection`] is a cheaply-clonable handle around an internal
//! [`ConnectionInner`]. When the last clone is dropped, the inner `Arc`
//! drops the [`ConnectionInner`] and, with it, the
//! [`tokio_util::sync::DropGuard`] stored in `_listener_cancel_guard`.
//! Dropping that guard cancels the listener task's
//! [`tokio_util::sync::CancellationToken`], so the SSE listener abandons
//! any in-flight reconnect, backoff sleep, or read and exits immediately —
//! no zombie 401 retries against a now-dead proxy session.
10
11use std::ops::Deref;
12use std::sync::{Arc, RwLock as StdRwLock, Weak};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::Duration;
15
16use indexmap::IndexMap;
17use tokio::sync::{Notify, RwLock};
18use tokio_util::sync::{CancellationToken, DropGuard};
19
/// Callback fired by [`Connection`] when the upstream MCP server emits
/// `notifications/tools/list_changed` or `notifications/resources/list_changed`.
///
/// **Timing:** runs after the corresponding cache's write lock is taken
/// but *before* the network paginate that replaces it. That ordering
/// matches the moment the staleness window opens — anyone blocked on the
/// read lock won't return until the new list lands. The callback should
/// not call back into `list_tools` / `list_resources`: doing so would
/// re-take the lock the listener already holds and deadlock.
///
/// Stored behind an `Arc` so the listener task can cheaply clone it out
/// of the lock and call it without holding the read guard.
pub type ListChangedCallback = Arc<dyn Fn() + Send + Sync + 'static>;

/// A registered-or-not callback slot. Wrapper so [`ConnectionInner`] can
/// keep `#[derive(Debug)]` (a raw `dyn Fn` isn't `Debug`).
///
/// Lock poisoning is recovered from instead of propagated: the slot only
/// ever holds a complete `Option<ListChangedCallback>`, so a panic on
/// another thread can never leave it half-written, and the listener's
/// `get()` must not start panicking because of one.
struct CallbackSlot(StdRwLock<Option<ListChangedCallback>>);

impl CallbackSlot {
    /// Creates an empty slot (no callback registered).
    fn new() -> Self {
        Self(StdRwLock::new(None))
    }

    /// Installs `callback`, replacing any previously registered one.
    ///
    /// The replaced callback is dropped only *after* the write lock is
    /// released: dropping it may run arbitrary `Drop` code for whatever
    /// the closure captured, and neither a panic nor a re-entrant
    /// `set`/`get` from that code should happen while the lock is held.
    fn set(&self, callback: ListChangedCallback) {
        let mut slot = self
            .0
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let previous = slot.replace(callback);
        drop(slot);
        drop(previous);
    }

    /// Cheap clone-out of the current callback (if any). The `Arc` clone
    /// lets us release the read guard before invoking the callback.
    fn get(&self) -> Option<ListChangedCallback> {
        self.0
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }
}

impl std::fmt::Debug for CallbackSlot {
    /// Reports only whether a callback is registered; the closure itself
    /// has no `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let set = self
            .0
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .is_some();
        f.debug_struct("CallbackSlot").field("set", &set).finish()
    }
}
60
61/// An active connection to an MCP server using the Streamable HTTP transport.
62///
63/// Cheaply clonable (one `Arc` bump). When the last clone is dropped, the
64/// inner `Arc` ref count hits zero, [`ConnectionInner::Drop`] runs, the
65/// listener-cancel `DropGuard` is dropped, and the SSE listener task is
66/// cancelled — exiting any in-flight `lines.next_line()`, reconnect
67/// `send()`, or backoff `sleep` *immediately* without retrying against
68/// the now-dead proxy session.
69///
70/// Use the public methods (`list_tools`, `call_tool`, `list_resources`,
71/// `read_resource`, `call_tool_as_message`, `tool_key`) for the upstream
72/// MCP protocol surface. The inner state ([`ConnectionInner`]) is also
73/// reachable via `Deref` for read-only field access (e.g.
74/// `connection.url`, `connection.initialize_result.server_info.name`),
75/// but its methods are private — you must go through `Connection`.
76#[derive(Debug)]
77pub struct Connection {
78    inner: Arc<ConnectionInner>,
79}
80
81impl Clone for Connection {
82    fn clone(&self) -> Self {
83        Self { inner: Arc::clone(&self.inner) }
84    }
85}
86
// No `Drop` for `Connection`: cancellation happens deterministically
// when the last `Arc<ConnectionInner>` clone is dropped, which drops the
// `ConnectionInner` and, with it, the cancel-token `DropGuard`.
90
91impl Deref for Connection {
92    type Target = ConnectionInner;
93    fn deref(&self) -> &ConnectionInner {
94        &self.inner
95    }
96}
97
98impl Connection {
99    pub(super) async fn new(
100        http_client: reqwest::Client,
101        url: String,
102        session_id: String,
103        headers: IndexMap<String, String>,
104        backoff_current_interval: Duration,
105        backoff_initial_interval: Duration,
106        backoff_randomization_factor: f64,
107        backoff_multiplier: f64,
108        backoff_max_interval: Duration,
109        backoff_max_elapsed_time: Duration,
110        call_timeout: Duration,
111        initialize_result: super::initialize_result::InitializeResult,
112        initial_sse_lines: Option<super::LinesStream>,
113    ) -> Self {
114        let inner = ConnectionInner::new(
115            http_client,
116            url,
117            session_id,
118            headers,
119            backoff_current_interval,
120            backoff_initial_interval,
121            backoff_randomization_factor,
122            backoff_multiplier,
123            backoff_max_interval,
124            backoff_max_elapsed_time,
125            call_timeout,
126            initialize_result,
127            initial_sse_lines,
128        )
129        .await;
130        Self { inner }
131    }
132
133
134    pub(super) fn new_mock(url: String) -> Self {
135        Self { inner: ConnectionInner::new_mock(url) }
136    }
137
138    #[cfg(test)]
139    pub(crate) fn new_for_test(name: String, url: String) -> Self {
140        Self { inner: ConnectionInner::new_for_test(name, url) }
141    }
142
143    /// Send a JSON-RPC notification to the upstream. Used by `Client`
144    /// right after `initialize` to send `notifications/initialized`.
145    pub(super) async fn notify<P: serde::Serialize>(
146        &self,
147        method: &str,
148        params: &P,
149    ) -> Result<(), super::Error> {
150        self.inner.notify(method, params).await
151    }
152
153    /// Returns a key identifying this connection for tool namespacing.
154    pub fn tool_key(&self) -> String {
155        self.inner.tool_key()
156    }
157
158    /// Returns the session ID for this connection.
159    pub fn session_id(&self) -> &str {
160        self.inner.session_id()
161    }
162
163    /// Returns all tools from the upstream server.
164    pub async fn list_tools(
165        &self,
166    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
167        self.inner.list_tools().await
168    }
169
170    /// Calls a tool on the upstream server.
171    pub async fn call_tool(
172        &self,
173        params: &super::tool::CallToolRequestParams,
174    ) -> Result<super::tool::CallToolResult, super::Error> {
175        self.inner.call_tool(params).await
176    }
177
178    /// Calls a tool and converts the result into a [`ToolMessage`].
179    pub async fn call_tool_as_message(
180        &self,
181        params: &super::tool::CallToolRequestParams,
182        tool_call_id: String,
183    ) -> Result<
184        crate::agent::completions::message::ToolMessage,
185        super::Error,
186    > {
187        self.inner.call_tool_as_message(params, tool_call_id).await
188    }
189
190    /// Returns all resources from the upstream server.
191    pub async fn list_resources(
192        &self,
193    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
194        self.inner.list_resources().await
195    }
196
197    /// Returns the cached tool list as soon as it differs from `current`,
198    /// or waits up to `timeout` for the next `notifications/tools/list_changed`
199    /// from the upstream server before re-reading.
200    ///
201    /// Wakes the moment a refresh writer takes the cache write lock, so
202    /// the post-wake `read` is guaranteed to observe the new list rather
203    /// than racing against the install. Safe to call from any number of
204    /// tasks concurrently.
205    pub async fn subscribe_tools(
206        &self,
207        current: &[super::tool::Tool],
208        timeout: Duration,
209    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
210        self.inner.subscribe_tools(current, timeout).await
211    }
212
213    /// Resource counterpart of [`Connection::subscribe_tools`].
214    pub async fn subscribe_resources(
215        &self,
216        current: &[super::resource::Resource],
217        timeout: Duration,
218    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
219        self.inner.subscribe_resources(current, timeout).await
220    }
221
222    /// Atomically drain the proxy's `pending_notifications` queue for
223    /// this session via `GET /notify` and return the queued content
224    /// blocks. A second call returns `[]` until the next out-of-band
225    /// `POST /notify`.
226    ///
227    /// Intended for use at the start of an agent turn so notifications
228    /// queued between turns — when the prior turn ended without a tool
229    /// call, or the user is starting a fresh continuation — surface as
230    /// a user message instead of being lost. The proxy's existing
231    /// `tools/call` response path still drains in-flight notifications
232    /// arriving *during* a turn; this method covers the gap between
233    /// turns.
234    ///
235    /// A 404 from the proxy (session unknown — possible after a proxy
236    /// restart) is mapped to an empty `Vec` so callers do not need to
237    /// distinguish "no notifications" from "lost session" at the use
238    /// site; the next upstream call will surface the lost-session
239    /// condition through its own error path.
240    pub async fn drain_notifications(
241        &self,
242    ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
243        self.inner.drain_notifications().await
244    }
245
246    /// Reads a resource from the upstream server.
247    pub async fn read_resource(
248        &self,
249        uri: &str,
250    ) -> Result<super::resource::ReadResourceResult, super::Error> {
251        self.inner.read_resource(uri).await
252    }
253
254    /// Register a callback to fire whenever the upstream emits
255    /// `notifications/tools/list_changed`.
256    ///
257    /// **Timing:** the callback runs *after* the tool cache's write lock
258    /// is acquired but *before* the network paginate that replaces it.
259    /// That means readers blocked on the read lock won't return until the
260    /// new list is in place, and the callback observes the moment the
261    /// staleness window opens. The proxy uses this to emit its own
262    /// `notifications/tools/list_changed` to downstream clients at the
263    /// right instant.
264    ///
265    /// Replaces any previously-registered tools-list-changed callback.
266    /// All clones of this `Connection` share the same callback slot.
267    pub fn set_on_tools_list_changed<F>(&self, callback: F)
268    where
269        F: Fn() + Send + Sync + 'static,
270    {
271        self.inner.on_tools_list_changed.set(Arc::new(callback));
272    }
273
274    /// Register a callback to fire whenever the upstream emits
275    /// `notifications/resources/list_changed`. Same timing contract as
276    /// [`Connection::set_on_tools_list_changed`].
277    ///
278    /// Replaces any previously-registered resources-list-changed callback.
279    /// All clones of this `Connection` share the same callback slot.
280    pub fn set_on_resources_list_changed<F>(&self, callback: F)
281    where
282        F: Fn() + Send + Sync + 'static,
283    {
284        self.inner.on_resources_list_changed.set(Arc::new(callback));
285    }
286}
287
/// The actual connection state. Behind an `Arc` inside [`Connection`].
///
/// Fields are public for read-only access (callers reach them via
/// `Connection`'s `Deref`), but every method on this type is private —
/// the public surface lives on [`Connection`] and delegates through.
#[derive(Debug)]
pub struct ConnectionInner {
    /// HTTP client used to build every request this connection sends.
    pub http_client: reqwest::Client,
    /// Upstream MCP endpoint; JSON-RPC requests and notifications are
    /// POSTed here.
    pub url: String,
    /// Session id sent as the `Mcp-Session-Id` header on requests.
    pub session_id: String,
    /// All HTTP headers stamped on every POST / GET this connection
    /// makes — the same merged map (defaults + caller overrides) the
    /// `Client` built once during connect. `Mcp-Session-Id`,
    /// `Content-Type`, and `Accept` are still set by the request
    /// builders and override anything in `headers`.
    /// NOTE(review): reqwest's `RequestBuilder::header` appends rather
    /// than replaces, so "override" only holds where a request builder
    /// skips same-named entries from this map — confirm for each builder.
    pub headers: IndexMap<String, String>,

    // Exponential-backoff settings; `backoff()` copies them into a fresh
    // `backoff::ExponentialBackoff` for each retried request.
    pub backoff_current_interval: Duration,
    pub backoff_initial_interval: Duration,
    pub backoff_randomization_factor: f64,
    pub backoff_multiplier: f64,
    pub backoff_max_interval: Duration,
    pub backoff_max_elapsed_time: Duration,
    /// Timeout applied to each request built by `post()` and by
    /// `drain_notifications`.
    pub call_timeout: Duration,

    /// The server's capabilities and info from the initialize response.
    pub initialize_result: super::initialize_result::InitializeResult,

    /// If true, all RPC/notify calls are no-ops. Used for mock orchestrator URLs.
    /// `notify` and `drain_notifications` check this flag directly.
    /// NOTE(review): `rpc` itself does not — presumably its callers
    /// short-circuit for mock connections; confirm.
    mock: bool,

    /// Auto-incrementing request ID (starts at 2; 1 was used for initialize).
    next_id: AtomicU64,

    /// All tools from the server, populated by background pagination.
    /// Starts as `Ok` with an empty list; `new` waits until the refresh
    /// task holds the write lock, so early readers block until the first
    /// populate finishes instead of seeing the placeholder.
    tools: RwLock<Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>>>,
    /// All resources from the server, populated by background pagination.
    /// Same placeholder / lock-handoff behavior as `tools`.
    resources:
        RwLock<Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>>>,

    /// Cancellation token for the long-lived `listen_for_list_changes`
    /// task. The listener selects this against every blocking await
    /// (read, reconnect-send, backoff-sleep) and returns the instant it
    /// fires.
    ///
    /// Held inside the connection as a [`DropGuard`] so that the moment
    /// the last `Arc<ConnectionInner>` clone is dropped — i.e. the
    /// moment no external `Connection` handle remains — `Drop` runs on
    /// the guard, the token cancels, and the listener task tears down.
    /// The listener itself holds a sibling `CancellationToken` (clone),
    /// not the guard, so its task does not extend the connection's
    /// lifetime. `None` for mock and test connections, which spawn no
    /// listener.
    _listener_cancel_guard: Option<DropGuard>,

    /// Optional callback invoked when the upstream emits
    /// `notifications/tools/list_changed`. Per the [`ListChangedCallback`]
    /// timing contract it runs after the tool cache's write lock is taken
    /// but before the replacement list is fetched.
    /// NOTE(review): confirm this ordering in `refresh_tools_signaling`.
    /// Set via [`Connection::set_on_tools_list_changed`].
    on_tools_list_changed: CallbackSlot,

    /// Optional callback invoked when the upstream emits
    /// `notifications/resources/list_changed`; same timing contract as
    /// `on_tools_list_changed`.
    /// Set via [`Connection::set_on_resources_list_changed`].
    on_resources_list_changed: CallbackSlot,

    /// Wakes any task awaiting in [`Connection::subscribe_tools`]. Fired
    /// from inside `refresh_tools_signaling` the moment the writer
    /// acquires the cache write lock — *before* the new list is
    /// installed. A woken subscriber's next `read().await` blocks behind
    /// the writer's guard, so it always observes the post-swap state.
    tools_changed: Notify,

    /// Resource counterpart of [`Self::tools_changed`].
    resources_changed: Notify,
}
363
364impl ConnectionInner {
365    /// Creates a mock connection that never makes network requests.
366    /// All RPC calls return empty/default results.
367    fn new_mock(url: String) -> Arc<Self> {
368        Arc::new(Self {
369            http_client: reqwest::Client::new(),
370            url,
371            session_id: String::new(),
372            headers: IndexMap::new(),
373            backoff_current_interval: Duration::ZERO,
374            backoff_initial_interval: Duration::ZERO,
375            backoff_randomization_factor: 0.0,
376            backoff_multiplier: 1.0,
377            backoff_max_interval: Duration::ZERO,
378            backoff_max_elapsed_time: Duration::ZERO,
379            call_timeout: Duration::ZERO,
380            initialize_result: super::initialize_result::InitializeResult {
381                protocol_version: "2025-03-26".into(),
382                capabilities: super::initialize_result::ServerCapabilities {
383                    experimental: None,
384                    logging: None,
385                    completions: None,
386                    prompts: None,
387                    resources: None,
388                    tools: None,
389                    tasks: None,
390                },
391                server_info: super::initialize_result::Implementation {
392                    name: "mock".into(),
393                    title: None,
394                    version: "0.0.0".into(),
395                    website_url: None,
396                    description: None,
397                    icons: None,
398                },
399                instructions: None,
400                _meta: None,
401            },
402            mock: true,
403            next_id: AtomicU64::new(2),
404            tools: RwLock::new(Ok(Arc::new(Vec::new()))),
405            resources: RwLock::new(Ok(Arc::new(Vec::new()))),
406            _listener_cancel_guard: None,
407            on_tools_list_changed: CallbackSlot::new(),
408            on_resources_list_changed: CallbackSlot::new(),
409            tools_changed: Notify::new(),
410            resources_changed: Notify::new(),
411        })
412    }
413
414    /// Creates a minimal connection for unit testing.
415    #[cfg(test)]
416    fn new_for_test(name: String, url: String) -> Arc<Self> {
417        Arc::new(Self {
418            http_client: reqwest::Client::new(),
419            url,
420            session_id: String::new(),
421            headers: IndexMap::new(),
422            backoff_current_interval: Duration::from_millis(500),
423            backoff_initial_interval: Duration::from_millis(500),
424            backoff_randomization_factor: 0.5,
425            backoff_multiplier: 1.5,
426            backoff_max_interval: Duration::from_secs(60),
427            backoff_max_elapsed_time: Duration::from_secs(900),
428            call_timeout: Duration::from_secs(30),
429            initialize_result: super::initialize_result::InitializeResult {
430                protocol_version: "2025-03-26".into(),
431                capabilities:
432                    super::initialize_result::ServerCapabilities {
433                        experimental: None,
434                        logging: None,
435                        completions: None,
436                        prompts: None,
437                        resources: None,
438                        tools: None,
439                        tasks: None,
440                    },
441                server_info: super::initialize_result::Implementation {
442                    name,
443                    title: None,
444                    version: "0.0.0".into(),
445                    website_url: None,
446                    description: None,
447                    icons: None,
448                },
449                instructions: None,
450                _meta: None,
451            },
452            mock: false,
453            next_id: AtomicU64::new(2),
454            tools: RwLock::new(Ok(Arc::new(Vec::new()))),
455            resources: RwLock::new(Ok(Arc::new(Vec::new()))),
456            _listener_cancel_guard: None,
457            on_tools_list_changed: CallbackSlot::new(),
458            on_resources_list_changed: CallbackSlot::new(),
459            tools_changed: Notify::new(),
460            resources_changed: Notify::new(),
461        })
462    }
463
464    /// Creates a new connection and spawns background tasks to paginate
465    /// all tools and resources. Called internally by
466    /// [`Client::connect`](super::Client::connect) (via [`Connection::new`]).
467    ///
468    /// `initial_sse_lines`, if `Some`, is a pre-opened SSE line reader
469    /// that the list-changed listener will read from immediately on its
470    /// first iteration, instead of opening its own GET `/`. The caller
471    /// is responsible for arranging for one of these to exist whenever
472    /// the upstream advertises `tools.list_changed` or
473    /// `resources.list_changed` — see
474    /// [`Client::connect`](super::Client::connect).
475    async fn new(
476        http_client: reqwest::Client,
477        url: String,
478        session_id: String,
479        headers: IndexMap<String, String>,
480        backoff_current_interval: Duration,
481        backoff_initial_interval: Duration,
482        backoff_randomization_factor: f64,
483        backoff_multiplier: f64,
484        backoff_max_interval: Duration,
485        backoff_max_elapsed_time: Duration,
486        call_timeout: Duration,
487        initialize_result: super::initialize_result::InitializeResult,
488        initial_sse_lines: Option<super::LinesStream>,
489    ) -> Arc<Self> {
490        // Cancel-the-listener machinery: store the DropGuard inside the
491        // inner so the cancellation fires deterministically when the
492        // last external `Arc<ConnectionInner>` clone drops. Hand the
493        // listener task a sibling clone (no guard) — that way the
494        // listener task's lifetime does not extend the connection.
495        let listener_cancel = CancellationToken::new();
496        let listener_cancel_for_task = listener_cancel.clone();
497        let conn = Arc::new(Self {
498            http_client,
499            url,
500            session_id,
501            headers,
502            backoff_current_interval,
503            backoff_initial_interval,
504            backoff_randomization_factor,
505            backoff_multiplier,
506            backoff_max_interval,
507            backoff_max_elapsed_time,
508            call_timeout,
509            initialize_result,
510            mock: false,
511            next_id: AtomicU64::new(2),
512            tools: RwLock::new(Ok(Arc::new(Vec::new()))),
513            resources: RwLock::new(Ok(Arc::new(Vec::new()))),
514            _listener_cancel_guard: Some(listener_cancel.drop_guard()),
515            on_tools_list_changed: CallbackSlot::new(),
516            on_resources_list_changed: CallbackSlot::new(),
517            tools_changed: Notify::new(),
518            resources_changed: Notify::new(),
519        });
520
521        // Spawn background tool lister if the server supports tools.
522        //
523        // We don't return until the spawned task has acquired the write
524        // lock. Otherwise a caller that immediately reads `list_tools()`
525        // could race the writer — `tokio::spawn` only queues the task,
526        // and a fast reader can acquire the read lock before the writer
527        // has run its first instruction. The reader would then see the
528        // initial empty `Vec` and return that, even though a full
529        // populate is in flight.
530        //
531        // The `RwLockWriteGuard` itself isn't `Send`-friendly enough to
532        // pass back, so we use a oneshot to signal "I'm holding the
533        // lock now"; once we receive that, the cache is exclusively
534        // owned by the writer and any subsequent `read().await` from
535        // the caller is guaranteed to wait for the populate to finish.
536        if conn.initialize_result.capabilities.tools.is_some() {
537            let conn = Arc::clone(&conn);
538            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
539            tokio::spawn(async move {
540                conn.refresh_tools_signaling(lock_held_tx, None).await;
541            });
542            // Wait for the writer to hold the lock before returning.
543            let _ = lock_held_rx.await;
544        }
545
546        // Spawn background resource lister if the server supports
547        // resources. Same lock-handoff contract as tools above.
548        if conn.initialize_result.capabilities.resources.is_some() {
549            let conn = Arc::clone(&conn);
550            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
551            tokio::spawn(async move {
552                conn.refresh_resources_signaling(lock_held_tx, None).await;
553            });
554            let _ = lock_held_rx.await;
555        }
556
557        // Spawn the list-changed listener iff the caller handed us a
558        // pre-opened SSE stream. The connection is naive about
559        // `tools.list_changed` / `resources.list_changed` capabilities —
560        // [`Client::connect`](super::Client::connect) translates them
561        // into "did or didn't open a stream for us." If we get a stream,
562        // we listen on it; if we don't, there's nothing to listen for.
563        if let Some(initial_lines) = initial_sse_lines {
564            // Hand the listener a `Weak` so the spawned task itself does
565            // not keep the connection alive. `listener_cancel_for_task`
566            // is a sibling clone of the connection's own
567            // `_listener_cancel_guard` token — when the last external
568            // `Arc<ConnectionInner>` clone is dropped, the inner's Drop
569            // releases the guard and the listener wakes from any
570            // pending await (read, send, sleep) and exits immediately.
571            let weak = Arc::downgrade(&conn);
572            tokio::spawn(async move {
573                Self::listen_for_list_changes(
574                    weak,
575                    listener_cancel_for_task,
576                    initial_lines,
577                )
578                .await;
579            });
580        }
581
582        conn
583    }
584
585    /// Creates an exponential backoff configuration from the connection's fields.
586    fn backoff(&self) -> backoff::ExponentialBackoff {
587        backoff::ExponentialBackoff {
588            current_interval: self.backoff_current_interval,
589            initial_interval: self.backoff_initial_interval,
590            randomization_factor: self.backoff_randomization_factor,
591            multiplier: self.backoff_multiplier,
592            max_interval: self.backoff_max_interval,
593            start_time: std::time::Instant::now(),
594            max_elapsed_time: Some(self.backoff_max_elapsed_time),
595            clock: backoff::SystemClock::default(),
596        }
597    }
598
599    /// Builds a POST request with all required headers and the call timeout.
600    fn post(&self) -> reqwest::RequestBuilder {
601        let mut request = self
602            .http_client
603            .post(&self.url)
604            .timeout(self.call_timeout)
605            .header("Content-Type", "application/json")
606            .header("Accept", "application/json, text/event-stream");
607        for (name, value) in &self.headers {
608            request = request.header(name, value);
609        }
610        // Mcp-Session-Id is applied last so a same-named entry in
611        // `headers` (e.g. the proxy's encoded session id) can never
612        // override the connection's own session id.
613        request = request.header("Mcp-Session-Id", &self.session_id);
614        request
615    }
616
617    /// Sends a JSON-RPC request, retrying transient errors when
618    /// `idempotent` is `true`.
619    ///
620    /// Idempotent methods (`tools/list`, `resources/list`,
621    /// `resources/read`, etc.) retry every transient error — network,
622    /// HTTP status, malformed body, JSON-RPC error, session expiration —
623    /// until the backoff's `max_elapsed_time` is exceeded.
624    ///
625    /// Non-idempotent methods (`tools/call`) make exactly one attempt.
626    /// Retrying a `tools/call` is unsafe: a tool may have mutated remote
627    /// state during the first attempt before the response was lost, and
628    /// re-firing the call would mutate state again. Each retry of
629    /// `AppendTask` advances `state.tasks.len()` an extra step, so the
630    /// agent sees a different return value than expected and the
631    /// pid-derived mock seed at the next step diverges. See
632    /// `objectiveai-api/src/agent/completions/client.rs` (sequential
633    /// dispatch) and `mock/client.rs::mock.seed_derive` for the
634    /// downstream consequence.
635    async fn rpc<P: serde::Serialize, R: serde::de::DeserializeOwned>(
636        &self,
637        method: &str,
638        params: &P,
639        idempotent: bool,
640    ) -> Result<R, super::Error> {
641        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
642        let body = serde_json::json!({
643            "jsonrpc": "2.0",
644            "id": id,
645            "method": method,
646            "params": params,
647        });
648
649        let attempt_one = || async {
650            let url = self.url.clone();
651            let response = self.post().json(&body).send().await.map_err(|source| {
652                backoff::Error::transient(super::Error::Request {
653                    url: url.clone(),
654                    source,
655                })
656            })?;
657
658            if response.status() == reqwest::StatusCode::NOT_FOUND {
659                return Err(backoff::Error::transient(
660                    super::Error::SessionExpired { url: url.clone() },
661                ));
662            }
663            if !response.status().is_success() {
664                let code = response.status();
665                let body = response.text().await.unwrap_or_default();
666                return Err(backoff::Error::transient(
667                    super::Error::BadStatus { url: url.clone(), code, body },
668                ));
669            }
670
671            let rpc_response: super::JsonRpcResponse<R> =
672                super::parse_streamable_http_response(&url, response)
673                    .await
674                    .map_err(backoff::Error::transient)?;
675
676            match rpc_response {
677                super::JsonRpcResponse::Success { result, .. } => Ok(result),
678                super::JsonRpcResponse::Error { error, .. } => {
679                    Err(backoff::Error::transient(super::Error::JsonRpc {
680                        url: url.clone(),
681                        code: error.code,
682                        message: error.message,
683                        data: error.data,
684                    }))
685                }
686            }
687        };
688
689        if idempotent {
690            backoff::future::retry(self.backoff(), attempt_one).await
691        } else {
692            attempt_one().await.map_err(|e| match e {
693                backoff::Error::Permanent(err) | backoff::Error::Transient { err, .. } => err,
694            })
695        }
696    }
697
698    /// Sends a JSON-RPC notification (no response expected) with the
699    /// same exponential-backoff retry policy as [`Self::rpc`]. Every
700    /// error is transient; the loop gives up only when the backoff's
701    /// `max_elapsed_time` is exceeded.
702    async fn notify<P: serde::Serialize>(
703        &self,
704        method: &str,
705        params: &P,
706    ) -> Result<(), super::Error> {
707        if self.mock { return Ok(()); }
708        let body = serde_json::json!({
709            "jsonrpc": "2.0",
710            "method": method,
711            "params": params,
712        });
713
714        backoff::future::retry(self.backoff(), || async {
715            let url = self.url.clone();
716            let response = self.post().json(&body).send().await.map_err(|source| {
717                backoff::Error::transient(super::Error::Request {
718                    url: url.clone(),
719                    source,
720                })
721            })?;
722
723            if response.status() == reqwest::StatusCode::NOT_FOUND {
724                return Err(backoff::Error::transient(
725                    super::Error::SessionExpired { url: url.clone() },
726                ));
727            }
728            if !response.status().is_success() {
729                let code = response.status();
730                let body = response.text().await.unwrap_or_default();
731                return Err(backoff::Error::transient(
732                    super::Error::BadStatus { url: url.clone(), code, body },
733                ));
734            }
735
736            Ok(())
737        })
738        .await
739    }
740
741    /// `GET <self.url>/notify` against the ObjectiveAI MCP proxy.
742    /// Atomically drains the proxy's pending-notifications queue for
743    /// this session and returns the queued content blocks.
744    ///
745    /// Single-attempt — the proxy drain is destructive, so a retry
746    /// after a transient failure would risk silently dropping
747    /// notifications that the first attempt's response carried but
748    /// failed to deliver. Networks errors propagate to the caller; the
749    /// next turn's drain will pick up anything queued in the meantime.
750    /// A 404 (session unknown) is mapped to `Ok(vec![])` — see the
751    /// public method's doc on `Connection`.
752    async fn drain_notifications(
753        &self,
754    ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
755        if self.mock {
756            return Ok(Vec::new());
757        }
758
759        let url = format!("{}/notify", self.url.trim_end_matches('/'));
760        let mut request = self
761            .http_client
762            .get(&url)
763            .timeout(self.call_timeout)
764            .header("Accept", "application/json");
765        for (name, value) in &self.headers {
766            request = request.header(name, value);
767        }
768        // Mcp-Session-Id applied last so a same-named entry in `headers`
769        // can never override the connection's own session id — matches
770        // the invariant in `Self::post`.
771        request = request.header("Mcp-Session-Id", &self.session_id);
772
773        let response = request.send().await.map_err(|source| super::Error::Request {
774            url: url.clone(),
775            source,
776        })?;
777
778        if response.status() == reqwest::StatusCode::NOT_FOUND {
779            return Ok(Vec::new());
780        }
781        if !response.status().is_success() {
782            let code = response.status();
783            let body = response.text().await.unwrap_or_default();
784            return Err(super::Error::BadStatus { url, code, body });
785        }
786
787        response
788            .json::<Vec<super::tool::ContentBlock>>()
789            .await
790            .map_err(|source| super::Error::Request { url, source })
791    }
792
793    /// Returns a key identifying this connection for tool namespacing.
794    fn tool_key(&self) -> String {
795        format!("{}-{}", self.initialize_result.server_info.name, self.url)
796    }
797
798    /// Returns the session ID for this connection.
799    fn session_id(&self) -> &str {
800        &self.session_id
801    }
802
803    /// Sends a `tools/list` RPC call for a single page.
804    async fn rpc_list_tools(
805        &self,
806        cursor: Option<&str>,
807    ) -> Result<super::tool::ListToolsResult, super::Error> {
808        self.rpc(
809            "tools/list",
810            &super::tool::ListToolsRequest {
811                cursor: cursor.map(String::from),
812            },
813            true,
814        )
815        .await
816    }
817
818    /// Returns all tools from the server.
819    ///
820    /// Blocks until background pagination completes, then returns a
821    /// cheap `Arc` clone of the result.
822    async fn list_tools(
823        &self,
824    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
825        self.tools.read().await.clone()
826    }
827
828    /// Calls a tool on the MCP server.
829    async fn call_tool(
830        &self,
831        params: &super::tool::CallToolRequestParams,
832    ) -> Result<super::tool::CallToolResult, super::Error> {
833        if self.mock {
834            return Ok(super::tool::CallToolResult {
835                content: vec![super::tool::ContentBlock::Text(super::tool::TextContent {
836                    text: "mock".to_string(),
837                    annotations: None,
838                    _meta: None,
839                })],
840                structured_content: None,
841                is_error: None,
842                _meta: None,
843            });
844        }
845        self.rpc("tools/call", params, false).await
846    }
847
848    /// Calls a tool and converts the result into a [`ToolMessage`].
849    ///
850    /// Content blocks are mapped as follows:
851    /// - `text` → text part
852    /// - `image` → image_url part (data URL)
853    /// - `audio` → input_audio part
854    /// - `resource` (embedded text) → text part
855    /// - `resource` (embedded blob, image mime) → image_url part (data URL)
856    /// - `resource` (embedded blob, other mime) → file part
857    /// - `resource_link` → if the URI appears in `list_resources`, fetches
858    ///   via `read_resource` and inlines the content using the same
859    ///   text/blob rules; otherwise serializes the link as JSON text
860    ///
861    /// If `is_error` is set on the result, the content is prefixed with
862    /// an error indicator.
863    async fn call_tool_as_message(
864        &self,
865        params: &super::tool::CallToolRequestParams,
866        tool_call_id: String,
867    ) -> Result<
868        crate::agent::completions::message::ToolMessage,
869        super::Error,
870    > {
871        use crate::agent::completions::message::{
872            File, ImageUrl, InputAudio, RichContent, RichContentPart,
873            ToolMessage,
874        };
875        use super::shared::ResourceContentsUnion;
876        use super::tool::ContentBlock;
877
878        let result = self.call_tool(params).await?;
879
880        // Build the set of known resource URIs for resource_link resolution.
881        let known_resource_uris: std::collections::HashSet<String> =
882            match self.list_resources().await {
883                Ok(resources) => {
884                    resources.iter().map(|r| r.uri.clone()).collect()
885                }
886                Err(_) => std::collections::HashSet::new(),
887            };
888
889        /// Converts a `ResourceContentsUnion` into one or more rich content
890        /// parts. Text resources become text parts. Blob resources with an
891        /// image MIME type become image_url parts (data URL); all other blobs
892        /// become file parts.
893        fn resource_contents_to_part(
894            contents: &ResourceContentsUnion,
895        ) -> RichContentPart {
896            match contents {
897                ResourceContentsUnion::Text(text) => {
898                    RichContentPart::Text {
899                        text: text.text.clone(),
900                    }
901                }
902                ResourceContentsUnion::Blob(blob) => {
903                    let mime = blob
904                        .base
905                        .mime_type
906                        .as_deref()
907                        .unwrap_or("application/octet-stream");
908
909                    if mime.starts_with("image/") {
910                        RichContentPart::ImageUrl {
911                            image_url: ImageUrl {
912                                url: format!(
913                                    "data:{};base64,{}",
914                                    mime, blob.blob
915                                ),
916                                detail: None,
917                            },
918                        }
919                    } else {
920                        // Extract a filename from the URI path, if any.
921                        let filename = blob
922                            .base
923                            .uri
924                            .rsplit('/')
925                            .next()
926                            .filter(|s| !s.is_empty())
927                            .map(String::from);
928
929                        RichContentPart::File {
930                            file: File {
931                                file_data: Some(blob.blob.clone()),
932                                filename,
933                                file_id: None,
934                                file_url: None,
935                            },
936                        }
937                    }
938                }
939            }
940        }
941
942        let mut parts: Vec<RichContentPart> = Vec::new();
943
944        for block in &result.content {
945            match block {
946                ContentBlock::Text(text) => {
947                    parts.push(RichContentPart::Text {
948                        text: text.text.clone(),
949                    });
950                }
951                ContentBlock::Image(image) => {
952                    parts.push(RichContentPart::ImageUrl {
953                        image_url: ImageUrl {
954                            url: format!(
955                                "data:{};base64,{}",
956                                image.mime_type, image.data
957                            ),
958                            detail: None,
959                        },
960                    });
961                }
962                ContentBlock::Audio(audio) => {
963                    parts.push(RichContentPart::InputAudio {
964                        input_audio: InputAudio {
965                            data: audio.data.clone(),
966                            format: audio.mime_type.clone(),
967                        },
968                    });
969                }
970                ContentBlock::EmbeddedResource(embedded) => {
971                    parts.push(resource_contents_to_part(
972                        &embedded.resource,
973                    ));
974                }
975                ContentBlock::ResourceLink(link) => {
976                    if known_resource_uris.contains(&link.uri) {
977                        // Fetch the resource and inline its contents.
978                        let read_result =
979                            self.read_resource(&link.uri).await?;
980                        for contents in &read_result.contents {
981                            parts.push(
982                                resource_contents_to_part(contents),
983                            );
984                        }
985                    } else {
986                        // Not a known resource; serialize as JSON text.
987                        parts.push(RichContentPart::Text {
988                            text: serde_json::to_string(link)
989                                .unwrap_or_default(),
990                        });
991                    }
992                }
993            }
994        }
995
996        let content = match parts.len() {
997            0 => RichContent::Text(String::new()),
998            1 => match parts.remove(0) {
999                RichContentPart::Text { text } => RichContent::Text(text),
1000                other => RichContent::Parts(vec![other]),
1001            },
1002            _ => RichContent::Parts(parts),
1003        };
1004
1005        Ok(ToolMessage {
1006            content,
1007            tool_call_id,
1008        })
1009    }
1010
1011    /// Sends a `resources/list` RPC call for a single page.
1012    async fn rpc_list_resources(
1013        &self,
1014        cursor: Option<&str>,
1015    ) -> Result<super::resource::ListResourcesResult, super::Error> {
1016        self.rpc(
1017            "resources/list",
1018            &super::resource::ListResourcesRequest {
1019                cursor: cursor.map(String::from),
1020            },
1021            true,
1022        )
1023        .await
1024    }
1025
1026    /// Returns all resources from the server.
1027    ///
1028    /// Blocks until background pagination completes, then returns a
1029    /// cheap `Arc` clone of the result.
1030    async fn list_resources(
1031        &self,
1032    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1033        self.resources.read().await.clone()
1034    }
1035
1036    /// Returns the cached tool list as soon as it differs from `current`,
1037    /// or — if it equals `current` right now — waits up to `timeout` for
1038    /// the cache to change and then returns whatever it sees.
1039    ///
1040    /// An `Err` cache is treated as "different from any caller snapshot"
1041    /// and returned immediately.
1042    ///
1043    /// Concurrency-safe: any number of concurrent subscribers wait on
1044    /// independent `Notified` futures and read the cache through the
1045    /// shared `RwLock`. A timeout that fires alone is not an error — we
1046    /// re-read the cache and return whatever's there.
1047    async fn subscribe_tools(
1048        &self,
1049        current: &[super::tool::Tool],
1050        timeout: Duration,
1051    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1052        // Arm BEFORE reading. `enable()` registers the future in the
1053        // wait queue without polling, so a `notify_waiters` racing
1054        // between our read and our await still wakes us.
1055        let notified = self.tools_changed.notified();
1056        tokio::pin!(notified);
1057        notified.as_mut().enable();
1058
1059        let initial = self.tools.read().await.clone();
1060        match &initial {
1061            Ok(arc) if arc.as_slice() == current => {}
1062            _ => return initial,
1063        }
1064
1065        let _ = tokio::time::timeout(timeout, notified).await;
1066
1067        self.tools.read().await.clone()
1068    }
1069
1070    /// Resource counterpart of [`Self::subscribe_tools`].
1071    async fn subscribe_resources(
1072        &self,
1073        current: &[super::resource::Resource],
1074        timeout: Duration,
1075    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1076        let notified = self.resources_changed.notified();
1077        tokio::pin!(notified);
1078        notified.as_mut().enable();
1079
1080        let initial = self.resources.read().await.clone();
1081        match &initial {
1082            Ok(arc) if arc.as_slice() == current => {}
1083            _ => return initial,
1084        }
1085
1086        let _ = tokio::time::timeout(timeout, notified).await;
1087
1088        self.resources.read().await.clone()
1089    }
1090
1091    /// Reads a resource from the MCP server.
1092    async fn read_resource(
1093        &self,
1094        uri: &str,
1095    ) -> Result<super::resource::ReadResourceResult, super::Error> {
1096        self.rpc(
1097            "resources/read",
1098            &super::resource::ReadResourceRequestParams {
1099                uri: uri.to_string(),
1100            },
1101            true,
1102        )
1103        .await
1104    }
1105
1106    /// Re-fetches all tools from the server, replacing the cached list.
1107    ///
1108    /// Optionally fires `on_change` *after* the write lock is acquired but
1109    /// *before* the network paginate begins, so the callback observes the
1110    /// "list change is in flight" edge — readers blocked on the read lock
1111    /// won't return until the new list lands. The proxy uses this to
1112    /// re-emit `notifications/tools/list_changed` to its downstream client
1113    /// at the moment the staleness window opens.
1114    async fn refresh_tools(&self, on_change: Option<ListChangedCallback>) {
1115        // Listener-driven refresh. Visibility contract: any caller
1116        // that issues `list_tools()` after a `tools/list_changed`
1117        // notification has been observed must see the post-swap
1118        // value, not stale data — so the write lock has to gate
1119        // readers across the upstream paginate.
1120        //
1121        // Performance contract: don't serialise paginate *behind*
1122        // lock-acquisition latency. We start `tools.write()` and the
1123        // upstream paginate **concurrently** with `tokio::join!`. The
1124        // write-lock acquire blocks new `list_tools()` readers
1125        // immediately (preserving visibility) and runs in parallel
1126        // with whatever drain time the in-flight readers need; the
1127        // paginate runs alongside. Total wall-clock is
1128        // `max(drain_time, paginate_time)` instead of the sum.
1129        //
1130        // `notify_waiters` and `on_change` fire under the write
1131        // guard, *after* `*guard = result`, so anyone awoken by them
1132        // queues on the read lock, waits for the guard to drop, and
1133        // observes the post-swap state.
1134        let (mut guard, result) = tokio::join!(
1135            self.tools.write(),
1136            self.paginate_tools(),
1137        );
1138        *guard = result;
1139        self.tools_changed.notify_waiters();
1140        if let Some(cb) = on_change {
1141            cb();
1142        }
1143    }
1144
1145    /// Page-by-page fetch of the upstream tool list, no locks held.
1146    /// Shared between the `_signaling` (initial-populate, holds lock
1147    /// for the original "block fast readers" contract) and `refresh_*`
1148    /// (listener-driven, lock-only-around-install) variants.
1149    async fn paginate_tools(
1150        &self,
1151    ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1152        let mut all_tools = Vec::new();
1153        let mut cursor: Option<String> = None;
1154        loop {
1155            match self.rpc_list_tools(cursor.as_deref()).await {
1156                Ok(page) => {
1157                    all_tools.extend(page.tools);
1158                    cursor = page.next_cursor;
1159                    if cursor.is_none() {
1160                        return Ok(Arc::new(all_tools));
1161                    }
1162                }
1163                Err(e) => return Err(Arc::new(e)),
1164            }
1165        }
1166    }
1167
1168    /// Same as [`Self::refresh_tools`] but fires `lock_held` once the
1169    /// write lock has been acquired so the caller can synchronise on
1170    /// "writer is in possession of the cache" before returning. Used by
1171    /// `ConnectionInner::new` to prevent a fast reader from acquiring
1172    /// the read lock before this writer has even started.
1173    async fn refresh_tools_signaling(
1174        &self,
1175        lock_held: tokio::sync::oneshot::Sender<()>,
1176        on_change: Option<ListChangedCallback>,
1177    ) {
1178        let mut guard = self.tools.write().await;
1179        // Fire `tools_changed` while we hold the write lock and *before*
1180        // installing the new list. Any subscriber woken now must take a
1181        // read lock to observe the result, and that read lock is queued
1182        // behind this write guard — so they always see the post-swap
1183        // state, never mid-swap.
1184        self.tools_changed.notify_waiters();
1185        let _ = lock_held.send(());
1186        if let Some(cb) = on_change {
1187            cb();
1188        }
1189        let mut all_tools = Vec::new();
1190        let mut cursor: Option<String> = None;
1191        let result = loop {
1192            match self.rpc_list_tools(cursor.as_deref()).await {
1193                Ok(page) => {
1194                    all_tools.extend(page.tools);
1195                    cursor = page.next_cursor;
1196                    if cursor.is_none() {
1197                        break Ok(Arc::new(all_tools));
1198                    }
1199                }
1200                Err(e) => break Err(Arc::new(e)),
1201            }
1202        };
1203        *guard = result;
1204    }
1205
1206    /// Re-fetches all resources from the server, replacing the cached list.
1207    /// See [`ConnectionInner::refresh_tools`] for the callback timing
1208    /// contract.
1209    async fn refresh_resources(&self, on_change: Option<ListChangedCallback>) {
1210        // Same paginate-while-acquiring-the-write-lock pattern as
1211        // `refresh_tools` — see that comment for the visibility +
1212        // performance rationale.
1213        let (mut guard, result) = tokio::join!(
1214            self.resources.write(),
1215            self.paginate_resources(),
1216        );
1217        *guard = result;
1218        self.resources_changed.notify_waiters();
1219        if let Some(cb) = on_change {
1220            cb();
1221        }
1222    }
1223
1224    async fn paginate_resources(
1225        &self,
1226    ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1227        let mut all_resources = Vec::new();
1228        let mut cursor: Option<String> = None;
1229        loop {
1230            match self.rpc_list_resources(cursor.as_deref()).await {
1231                Ok(page) => {
1232                    all_resources.extend(page.resources);
1233                    cursor = page.next_cursor;
1234                    if cursor.is_none() {
1235                        return Ok(Arc::new(all_resources));
1236                    }
1237                }
1238                Err(e) => return Err(Arc::new(e)),
1239            }
1240        }
1241    }
1242
1243    /// Resource counterpart of [`Self::refresh_tools_signaling`].
1244    async fn refresh_resources_signaling(
1245        &self,
1246        lock_held: tokio::sync::oneshot::Sender<()>,
1247        on_change: Option<ListChangedCallback>,
1248    ) {
1249        let mut guard = self.resources.write().await;
1250        // See `refresh_tools_signaling` — fire under the write lock,
1251        // before install, so subscribers' next read sees the post-swap
1252        // state.
1253        self.resources_changed.notify_waiters();
1254        let _ = lock_held.send(());
1255        if let Some(cb) = on_change {
1256            cb();
1257        }
1258        let mut all_resources = Vec::new();
1259        let mut cursor: Option<String> = None;
1260        let result = loop {
1261            match self.rpc_list_resources(cursor.as_deref()).await {
1262                Ok(page) => {
1263                    all_resources.extend(page.resources);
1264                    cursor = page.next_cursor;
1265                    if cursor.is_none() {
1266                        break Ok(Arc::new(all_resources));
1267                    }
1268                }
1269                Err(e) => break Err(Arc::new(e)),
1270            }
1271        };
1272        *guard = result;
1273    }
1274
1275    /// Builds a GET request to the MCP endpoint for receiving server
1276    /// notifications via SSE.
1277    fn get(&self) -> reqwest::RequestBuilder {
1278        let mut request = self
1279            .http_client
1280            .get(&self.url)
1281            .header("Accept", "text/event-stream");
1282        for (name, value) in &self.headers {
1283            request = request.header(name, value);
1284        }
1285        // Mcp-Session-Id last so it always wins over `headers`.
1286        request = request.header("Mcp-Session-Id", &self.session_id);
1287        request
1288    }
1289
1290    /// Listens for `notifications/tools/list_changed` and
1291    /// `notifications/resources/list_changed` on an SSE stream. On each
1292    /// notification, write-locks and re-fetches the full list.
1293    ///
1294    /// `initial_lines` is the pre-opened SSE line reader handed in by
1295    /// [`Client::connect`](super::Client::connect) — that stream is
1296    /// consumed first. When it ends (or any later GET reconnect ends),
1297    /// we sleep `backoff_initial_interval` and open a fresh GET `/` SSE
1298    /// stream.
1299    ///
1300    /// Takes a [`Weak<Self>`] (not `Arc<Self>`) so the spawned task
1301    /// doesn't itself keep the [`Connection`] alive, and a
1302    /// [`CancellationToken`] sibling clone of the connection's
1303    /// [`DropGuard`] so the task tears down the instant the last
1304    /// external `Arc<ConnectionInner>` clone is dropped — every
1305    /// blocking await (line read, reconnect send, backoff sleep) is
1306    /// raced against `cancel.cancelled()` and exits without any zombie
1307    /// retries against a now-dead session.
1308    async fn listen_for_list_changes(
1309        weak: Weak<Self>,
1310        cancel: CancellationToken,
1311        initial_lines: super::LinesStream,
1312    ) {
1313        // First iteration: use the pre-opened SSE stream the client
1314        // handed us. After that, fall back to opening fresh GET / SSE
1315        // streams as the upstream connection cycles.
1316        let mut next_lines: Option<super::LinesStream> = Some(initial_lines);
1317        // One-shot guard for the catch-up refresh: false on the very
1318        // first iteration (the caller's pre-opened SSE stream — its
1319        // associated cache was just populated by `Client::connect`'s
1320        // initial pagination, so re-fetching there would just be a
1321        // wasted round-trip), true thereafter. Every stream end —
1322        // whether `Ok(None)` (clean close) or `Err(_)` (read failure)
1323        // — drops back here, which we treat as an implicit
1324        // list-changed notification: the upstream's broadcast (in
1325        // particular the proxy's per-session `tokio::broadcast`) is
1326        // lossy for moments when this listener has zero active
1327        // subscribers, so anything that fired during our disconnect
1328        // window may have been dropped.
1329        //
1330        // ORDER MATTERS. The refresh must run AFTER we've re-opened
1331        // the GET / SSE stream — i.e. after we're a subscriber again
1332        // — and BEFORE we enter the inner read loop. If we refreshed
1333        // before the resubscribe, a notification that fired between
1334        // our refresh-completion and our subscribe would be lost the
1335        // same way as the original disconnect-window drops; doing it
1336        // after means a notification fired DURING the refresh lands
1337        // in the new subscriber's buffer (broadcast::Sender::send
1338        // backs onto each receiver's channel-capacity slot) and gets
1339        // consumed by the inner loop on its next read.
1340        let mut is_reconnect = false;
1341
1342        loop {
1343            // The token cancels deterministically when the last
1344            // `Arc<ConnectionInner>` clone is dropped (see
1345            // `_listener_cancel_guard`). Check once per outer
1346            // iteration, but the real protection is the cancel arms in
1347            // every blocking await below — those exit immediately on
1348            // cancel.
1349            if cancel.is_cancelled() {
1350                return;
1351            }
1352            let Some(this) = weak.upgrade() else { return };
1353            let backoff_delay = this.backoff_initial_interval;
1354
1355            let mut lines = match next_lines.take() {
1356                Some(l) => l,
1357                None => {
1358                    // Race the upstream GET against cancellation — if
1359                    // the connection drops mid-reconnect, exit
1360                    // immediately rather than waiting for the request
1361                    // to complete or time out (otherwise produces a
1362                    // burst of 401 retries against a now-dead session
1363                    // under heavy churn).
1364                    let send_outcome = tokio::select! {
1365                        out = this.get().send() => out,
1366                        _ = cancel.cancelled() => {
1367                            drop(this);
1368                            return;
1369                        }
1370                    };
1371                    let response = match send_outcome {
1372                        Ok(r) if r.status().is_success() => r,
1373                        _ => {
1374                            drop(this);
1375                            // Sleep with cancel-arm: instant exit on
1376                            // drop, no zombie retries.
1377                            tokio::select! {
1378                                _ = tokio::time::sleep(backoff_delay) => {}
1379                                _ = cancel.cancelled() => return,
1380                            }
1381                            continue;
1382                        }
1383                    };
1384                    super::lines_from_response(response)
1385                }
1386            };
1387
1388            // Catch-up refresh on every reconnect — the implicit
1389            // list-changed treatment for the just-failed stream. See
1390            // the `is_reconnect` doc-comment above for the
1391            // refresh-AFTER-resubscribe rationale.
1392            if is_reconnect {
1393                // tools and resources are independent locks; run the
1394                // catch-up refreshes concurrently so disconnect
1395                // recovery isn't sequential.
1396                let _ = tokio::join!(
1397                    this.refresh_tools(this.on_tools_list_changed.get()),
1398                    this.refresh_resources(this.on_resources_list_changed.get()),
1399                );
1400            }
1401            is_reconnect = true;
1402
1403            'inner: loop {
1404                tokio::select! {
1405                    line_result = lines.next_line() => {
1406                        match line_result {
1407                            Ok(Some(line)) => {
1408                                // SSE data lines start with "data: ".
1409                                let Some(data) = line.strip_prefix("data: ") else {
1410                                    continue 'inner;
1411                                };
1412                                let method = match serde_json::from_str::<super::JsonRpcNotification>(data) {
1413                                    Ok(n) => n.method,
1414                                    Err(_) => continue 'inner,
1415                                };
1416                                match method.as_str() {
1417                                    "notifications/tools/list_changed" => {
1418                                        // refresh_tools fires the
1419                                        // callback after the cache is
1420                                        // installed, so the proxy's
1421                                        // downstream
1422                                        // notifications/tools/list_changed
1423                                        // emission lines up with the
1424                                        // staleness window opening.
1425                                        this.refresh_tools(
1426                                            this.on_tools_list_changed.get(),
1427                                        )
1428                                        .await;
1429                                    }
1430                                    "notifications/resources/list_changed" => {
1431                                        this.refresh_resources(
1432                                            this.on_resources_list_changed.get(),
1433                                        )
1434                                        .await;
1435                                    }
1436                                    _ => {}
1437                                }
1438                            }
1439                            // Stream ended cleanly or errored — break out
1440                            // to the outer loop so we either reconnect or,
1441                            // if everyone's gone, exit at the top.
1442                            _ => break 'inner,
1443                        }
1444                    }
1445                    // Cancellation: the connection's last clone has
1446                    // dropped. Tear down immediately.
1447                    _ = cancel.cancelled() => {
1448                        drop(this);
1449                        return;
1450                    }
1451                }
1452            }
1453
1454            // Stream ended — drop the strong ref before sleeping so the
1455            // next iteration's weak-upgrade can detect liveness honestly.
1456            drop(this);
1457            tokio::select! {
1458                _ = tokio::time::sleep(backoff_delay) => {}
1459                _ = cancel.cancelled() => return,
1460            }
1461        }
1462    }
1463}
1464
#[cfg(test)]
mod subscribe_tests {
    use super::*;
    use crate::mcp::tool::{Tool, ToolSchemaObject, ToolSchemaType};

    /// Builds a minimal tool called `name`: an empty object input schema
    /// and every optional field left unset.
    fn tool(name: &str) -> Tool {
        let input_schema = ToolSchemaObject {
            r#type: ToolSchemaType::Object,
            properties: None,
            required: None,
            extra: IndexMap::new(),
        };
        Tool {
            name: name.to_owned(),
            title: None,
            description: None,
            icons: None,
            input_schema,
            output_schema: None,
            annotations: None,
            execution: None,
            _meta: None,
        }
    }

    /// Builds a tool list containing one [`tool`] per entry of `names`,
    /// preserving order.
    fn listing(names: &[&str]) -> Vec<Tool> {
        names.iter().copied().map(tool).collect()
    }

    /// If the cached list already differs from the caller's snapshot, the
    /// call returns the cached list straight away without parking.
    #[tokio::test]
    async fn subscribe_tools_returns_immediately_when_cache_differs() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Ok(Arc::new(listing(&["a"])));

        let began = std::time::Instant::now();
        let observed = conn
            .subscribe_tools(&listing(&["b"]), Duration::from_secs(5))
            .await
            .unwrap();

        assert!(began.elapsed() < Duration::from_millis(100));
        assert_eq!(observed.as_slice(), listing(&["a"]).as_slice());
    }

    /// A cached error never equals a caller snapshot, so it is surfaced
    /// immediately instead of waiting for a change.
    #[tokio::test]
    async fn subscribe_tools_returns_err_immediately() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        let cached_err = super::super::Error::NoSessionId {
            url: "http://x".into(),
            body: String::new(),
        };
        *conn.inner.tools.write().await = Err(Arc::new(cached_err));

        let began = std::time::Instant::now();
        let outcome = conn.subscribe_tools(&[], Duration::from_secs(5)).await;

        assert!(began.elapsed() < Duration::from_millis(100));
        assert!(outcome.is_err());
    }

    /// With the cache equal to the snapshot, the subscriber parks. A writer
    /// then takes the write lock, wakes waiters while still holding it, and
    /// installs a new list. The subscriber's follow-up read has to queue
    /// behind that guard, so it must observe the replacement list.
    #[tokio::test]
    async fn subscribe_tools_wakes_on_change_and_reads_post_swap() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Ok(Arc::new(listing(&["a"])));

        let subscriber = {
            let conn = conn.clone();
            tokio::spawn(async move {
                conn.subscribe_tools(&listing(&["a"]), Duration::from_secs(5))
                    .await
                    .unwrap()
            })
        };

        // Let the subscriber finish its first read and park on the change
        // notification before the cache is touched.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Simulate a refresh.
        // 1. Acquire the write lock.
        // 2. Wake waiters while the lock is still held.
        // 3. Linger so the woken subscriber is contending for the read lock.
        // 4. Publish the replacement list and release.
        // This is the ordering the test assumes the real refresh path uses.
        {
            let mut slot = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            tokio::time::sleep(Duration::from_millis(20)).await;
            *slot = Ok(Arc::new(listing(&["b"])));
        }

        let observed = tokio::time::timeout(Duration::from_secs(2), subscriber)
            .await
            .expect("subscriber returned in time")
            .expect("subscriber didn't panic");
        assert_eq!(observed.as_slice(), listing(&["b"]).as_slice());
    }

    /// If the cache matches the snapshot and no change is signalled, the
    /// call waits out its timeout. It then hands back the unchanged list
    /// rather than an error.
    #[tokio::test]
    async fn subscribe_tools_times_out_and_returns_unchanged_list() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Ok(Arc::new(listing(&["a"])));

        let began = std::time::Instant::now();
        let observed = conn
            .subscribe_tools(&listing(&["a"]), Duration::from_millis(50))
            .await
            .unwrap();
        let elapsed = began.elapsed();

        assert!(elapsed >= Duration::from_millis(40), "elapsed: {elapsed:?}");
        assert!(elapsed < Duration::from_millis(500), "elapsed: {elapsed:?}");
        assert_eq!(observed.as_slice(), listing(&["a"]).as_slice());
    }

    /// A single `notify_waiters` wakes every parked subscriber, and each
    /// one reads the list installed under the same write guard.
    #[tokio::test]
    async fn subscribe_tools_supports_concurrent_subscribers() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Ok(Arc::new(listing(&["a"])));

        let subscribers: Vec<_> = (0..2)
            .map(|_| {
                let conn = conn.clone();
                tokio::spawn(async move {
                    conn.subscribe_tools(&listing(&["a"]), Duration::from_secs(5))
                        .await
                        .unwrap()
                })
            })
            .collect();

        tokio::time::sleep(Duration::from_millis(50)).await;

        {
            let mut slot = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            *slot = Ok(Arc::new(listing(&["c"])));
        }

        for handle in subscribers {
            let observed = handle.await.unwrap();
            assert_eq!(observed.as_slice(), listing(&["c"]).as_slice());
        }
    }
}
1615
#[cfg(test)]
mod drain_notifications_tests {
    use super::*;
    use crate::mcp::tool::{ContentBlock, TextContent};
    use serde_json::json;
    use wiremock::matchers::{header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Starts a mock proxy whose `GET /notify` route answers with `response`.
    ///
    /// When `session_id` is `Some`, the route additionally requires an
    /// `Mcp-Session-Id` header carrying exactly that value.
    async fn serve_notify(
        response: ResponseTemplate,
        session_id: Option<&'static str>,
    ) -> MockServer {
        let server = MockServer::start().await;
        let mut route = Mock::given(method("GET")).and(path("/notify"));
        if let Some(id) = session_id {
            route = route.and(header("Mcp-Session-Id", id));
        }
        route.respond_with(response).mount(&server).await;
        server
    }

    /// Asserts that `block` is a text content block whose text equals
    /// `expected`. Panics with a descriptive message for any other variant.
    fn assert_text(block: &ContentBlock, expected: &str) {
        match block {
            ContentBlock::Text(TextContent { text, .. }) => assert_eq!(text, expected),
            other => panic!("expected text, got {other:?}"),
        }
    }

    /// The proxy answers `[text, text]`. Both entries come back as
    /// `ContentBlock::Text`, in the order they were sent.
    #[tokio::test]
    async fn drain_notifications_parses_text_blocks_in_order() {
        let payload = json!([
            {"type": "text", "text": "first"},
            {"type": "text", "text": "second"},
        ]);
        let server = serve_notify(
            ResponseTemplate::new(200).set_body_json(payload),
            Some(""),
        )
        .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("drain ok");

        assert_eq!(blocks.len(), 2);
        for (block, expected) in blocks.iter().zip(["first", "second"]) {
            assert_text(block, expected);
        }
    }

    /// A 404 means the proxy no longer knows the session (for example,
    /// after a restart). That is reported as "nothing to drain", not an
    /// error. Surfacing the lost session is left to the next upstream call.
    #[tokio::test]
    async fn drain_notifications_404_returns_empty() {
        let server = serve_notify(ResponseTemplate::new(404), None).await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("404 → ok(empty)");
        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
    }

    /// The common case: the queue is empty, the proxy answers `[]`, and
    /// the drain yields no blocks.
    #[tokio::test]
    async fn drain_notifications_empty_queue_returns_empty() {
        let server = serve_notify(
            ResponseTemplate::new(200).set_body_json(json!([])),
            None,
        )
        .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("drain ok");
        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
    }

    /// Any status other than success or 404 comes back as `BadStatus`,
    /// carrying both the status code and the response body.
    #[tokio::test]
    async fn drain_notifications_5xx_returns_bad_status() {
        let server = serve_notify(
            ResponseTemplate::new(500).set_body_string("boom"),
            None,
        )
        .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let failure = conn.drain_notifications().await.expect_err("5xx → err");
        match failure {
            super::super::Error::BadStatus { code, body, .. } => {
                assert_eq!(code.as_u16(), 500);
                assert_eq!(body, "boom");
            }
            other => panic!("expected BadStatus, got {other:?}"),
        }
    }

    /// A mock connection returns an empty drain. The URL is deliberately
    /// meaningless and no mock server is started.
    #[tokio::test]
    async fn drain_notifications_mock_returns_empty() {
        let conn = Connection::new_mock("http://does-not-matter".into());
        let blocks = conn.drain_notifications().await.expect("mock ok");
        assert!(blocks.is_empty());
    }
}