objectiveai_sdk/mcp/connection.rs
1//! MCP connection for communicating with an MCP server.
2//!
3//! [`Connection`] is a cheaply-clonable handle around an internal
4//! [`ConnectionInner`]. The last drop of the inner `Arc` runs
5//! [`ConnectionInner`]'s `Drop`, which cancels the listener task's
6//! [`tokio_util::sync::CancellationToken`] (held in `_listener_cancel_guard`
7//! as a [`tokio_util::sync::DropGuard`]) — the SSE listener exits the
8//! instant any in-flight reconnect, sleep, or read is cancelled, with no
9//! zombie 401 retries against a now-dead proxy session.
10
11use std::ops::Deref;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::{Arc, RwLock as StdRwLock, Weak};
14use std::time::Duration;
15
16use indexmap::IndexMap;
17use tokio::sync::{Notify, RwLock};
18use tokio_util::sync::{CancellationToken, DropGuard};
19
/// Callback fired by [`Connection`] when the upstream MCP server emits
/// `notifications/tools/list_changed` or `notifications/resources/list_changed`.
///
/// The callback takes no arguments and returns nothing. It is required
/// to be `Send + Sync + 'static` so a single registration can be shared
/// with the listener task.
///
/// **Timing:** runs after the corresponding cache's write lock is taken
/// but *before* the network paginate that replaces it. That ordering
/// matches the moment the staleness window opens — anyone blocked on the
/// read lock won't return until the new list lands.
///
/// **Deadlock hazard:** the callback should not call back into
/// `list_tools` / `list_resources`: doing so would re-take the lock the
/// listener already holds and deadlock.
///
/// Stored behind an `Arc` so the listener task can cheaply clone it out
/// of the lock and call it without holding the read guard.
pub type ListChangedCallback = Arc<dyn Fn() + Send + Sync + 'static>;
33
34/// A registered-or-not callback slot. Wrapper so [`ConnectionInner`] can
35/// keep `#[derive(Debug)]` (a raw `dyn Fn` isn't `Debug`).
36struct CallbackSlot(StdRwLock<Option<ListChangedCallback>>);
37
38impl CallbackSlot {
39 fn new() -> Self {
40 Self(StdRwLock::new(None))
41 }
42
43 fn set(&self, callback: ListChangedCallback) {
44 *self.0.write().unwrap() = Some(callback);
45 }
46
47 /// Cheap clone-out of the current callback (if any). The `Arc` clone
48 /// lets us release the read guard before invoking the callback.
49 fn get(&self) -> Option<ListChangedCallback> {
50 self.0.read().unwrap().clone()
51 }
52}
53
54impl std::fmt::Debug for CallbackSlot {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 let set = self.0.read().map(|g| g.is_some()).unwrap_or(false);
57 f.debug_struct("CallbackSlot").field("set", &set).finish()
58 }
59}
60
/// An active connection to an MCP server using the Streamable HTTP transport.
///
/// This type is a thin handle: its only field is an
/// `Arc<ConnectionInner>`, so cloning costs one `Arc` bump and every
/// clone refers to the same underlying state.
///
/// When the last clone is dropped, the inner `Arc` ref count hits zero,
/// [`ConnectionInner::Drop`] runs, the listener-cancel `DropGuard` is
/// dropped, and the SSE listener task is cancelled — exiting any
/// in-flight `lines.next_line()`, reconnect `send()`, or backoff `sleep`
/// *immediately* without retrying against the now-dead proxy session.
///
/// Use the public methods (`list_tools`, `call_tool`, `list_resources`,
/// `read_resource`, `call_tool_as_message`, `tool_key`) for the upstream
/// MCP protocol surface. The inner state ([`ConnectionInner`]) is also
/// reachable via `Deref` for read-only field access (e.g.
/// `connection.url`, `connection.initialize_result.server_info.name`),
/// but its methods are private — you must go through `Connection`.
#[derive(Debug)]
pub struct Connection {
    /// Shared connection state; every clone of the handle points here.
    inner: Arc<ConnectionInner>,
}
80
81impl Clone for Connection {
82 fn clone(&self) -> Self {
83 Self {
84 inner: Arc::clone(&self.inner),
85 }
86 }
87}
88
89// No `Drop` for `Connection`: cancellation happens deterministically
90// when the last `Arc<ConnectionInner>` clone is dropped, which runs
91// `ConnectionInner::drop` and releases the cancel-token DropGuard.
92
93impl Deref for Connection {
94 type Target = ConnectionInner;
95 fn deref(&self) -> &ConnectionInner {
96 &self.inner
97 }
98}
99
100impl Connection {
101 /// Tear this connection down explicitly.
102 ///
103 /// 1. Cancels the long-lived list-changed listener task immediately
104 /// (drops the [`DropGuard`] in
105 /// [`ConnectionInner::_listener_cancel_guard`]), so by the time
106 /// the HTTP DELETE goes out the listener isn't still holding an
107 /// SSE read open against the upstream we're about to close.
108 /// 2. Issues `DELETE /` to the upstream with this connection's
109 /// `Mcp-Session-Id` and the same merged header set every other
110 /// RPC stamps. Reuses [`ConnectionInner::call_timeout`].
111 /// 3. Treats `404 / 401 / 403` as success — the upstream is
112 /// unreachable from these credentials anyway, which is the
113 /// desired terminal state. Other non-2xx surfaces as
114 /// [`super::Error::BadStatus`].
115 ///
116 /// Takes `&self`: the listener cancel is in-place, and dropping
117 /// the surrounding `Arc<ConnectionInner>` (which closes the rest
118 /// of the connection's owned state) is the caller's responsibility
119 /// — usually by dropping the `Arc<Session>` holding it. Stateless
120 /// callers that don't hold a `Connection` should use
121 /// [`Client::delete`](super::Client::delete) instead.
122 ///
123 /// **In-flight RPC ordering.** This method does not block on
124 /// in-flight `call_tool` / `read_resource` / `list_tools` /
125 /// `list_resources` calls on the same connection. If one is
126 /// outstanding when `delete` lands, the upstream may see DELETE
127 /// before the RPC's reply makes it back; the in-flight call then
128 /// surfaces as a closed-connection error to whoever started it.
129 /// That's the spec-correct order (client said terminate) — drain
130 /// on the caller side first if you need different semantics.
131 pub async fn delete(&self) -> Result<(), super::Error> {
132 // 1. Mark the connection as "used" so the drop-time
133 // orphan-DELETE check (see `ConnectionInner::Drop`) skips
134 // its own fan-out. Without this, a fresh-mint connection
135 // that's explicitly torn down via `delete()` would race
136 // its own drop-time orphan into a duplicate upstream
137 // DELETE.
138 self.inner.any_calls.store(true, Ordering::Relaxed);
139
140 // 2. Drop the listener-cancel guard. Releasing the `DropGuard`
141 // cancels the sibling `CancellationToken` the listener task
142 // holds; the listener `tokio::select!`s against it on every
143 // blocking await and exits inside one scheduler tick.
144 if let Ok(mut guard) = self.inner._listener_cancel_guard.lock() {
145 let _ = guard.take();
146 }
147
148 // 3. Build + send HTTP DELETE. Mirrors `Client::connect_once`'s
149 // request-stamp shape: header loop first, explicit
150 // `Mcp-Session-Id` always wins.
151 let request = self
152 .inner
153 .http_client
154 .delete(&self.inner.url)
155 .timeout(self.inner.call_timeout)
156 .headers(self.inner.build_request_headers(None, None).await);
157 let response = request.send().await.map_err(|source| {
158 super::Error::Request {
159 url: self.inner.url.clone(),
160 source,
161 }
162 })?;
163
164 // 4. 404 / 401 / 403 → success; other non-2xx → real error.
165 let status = response.status();
166 if matches!(
167 status,
168 reqwest::StatusCode::NOT_FOUND
169 | reqwest::StatusCode::UNAUTHORIZED
170 | reqwest::StatusCode::FORBIDDEN
171 ) {
172 return Ok(());
173 }
174 if !status.is_success() {
175 let body = response.text().await.unwrap_or_default();
176 return Err(super::Error::BadStatus {
177 url: self.inner.url.clone(),
178 code: status,
179 body: body.chars().take(800).collect(),
180 });
181 }
182 Ok(())
183 }
184
185 pub(super) async fn new(
186 http_client: reqwest::Client,
187 url: String,
188 session_id: String,
189 headers: IndexMap<String, String>,
190 backoff_current_interval: Duration,
191 backoff_initial_interval: Duration,
192 backoff_randomization_factor: f64,
193 backoff_multiplier: f64,
194 backoff_max_interval: Duration,
195 backoff_max_elapsed_time: Duration,
196 call_timeout: Duration,
197 initialize_result: super::initialize_result::InitializeResult,
198 initial_sse_lines: Option<super::LinesStream>,
199 is_reconnect: bool,
200 ) -> Self {
201 let inner = ConnectionInner::new(
202 http_client,
203 url,
204 session_id,
205 headers,
206 backoff_current_interval,
207 backoff_initial_interval,
208 backoff_randomization_factor,
209 backoff_multiplier,
210 backoff_max_interval,
211 backoff_max_elapsed_time,
212 call_timeout,
213 initialize_result,
214 initial_sse_lines,
215 is_reconnect,
216 )
217 .await;
218 Self { inner }
219 }
220
221 #[cfg(test)]
222 pub(crate) fn new_for_test(name: String, url: String) -> Self {
223 Self {
224 inner: ConnectionInner::new_for_test(name, url),
225 }
226 }
227
228 #[cfg(test)]
229 pub(crate) fn new_for_test_with_caps(
230 name: String,
231 url: String,
232 capabilities: super::initialize_result::ServerCapabilities,
233 ) -> Self {
234 Self {
235 inner: ConnectionInner::new_for_test_with_caps(
236 name,
237 url,
238 capabilities,
239 ),
240 }
241 }
242
243 /// Send a JSON-RPC notification to the upstream. Used by `Client`
244 /// right after `initialize` to send `notifications/initialized`.
245 pub(super) async fn notify<P: serde::Serialize>(
246 &self,
247 method: &str,
248 params: &P,
249 ) -> Result<(), super::Error> {
250 self.inner.notify(method, params).await
251 }
252
253 /// Returns a key identifying this connection for tool namespacing.
254 pub fn tool_key(&self) -> String {
255 self.inner.tool_key()
256 }
257
258 /// Returns the session ID for this connection.
259 pub fn session_id(&self) -> &str {
260 self.inner.session_id()
261 }
262
263 /// Returns all tools from the upstream server.
264 pub async fn list_tools(
265 &self,
266 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
267 self.inner.list_tools().await
268 }
269
270 /// Calls a tool on the upstream server.
271 pub async fn call_tool(
272 &self,
273 params: &super::tool::CallToolRequestParams,
274 ) -> Result<super::tool::CallToolResult, super::Error> {
275 self.inner.call_tool(params).await
276 }
277
278 /// Calls a tool and converts the result into a [`ToolMessage`].
279 pub async fn call_tool_as_message(
280 &self,
281 params: &super::tool::CallToolRequestParams,
282 tool_call_id: String,
283 ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
284 {
285 self.inner.call_tool_as_message(params, tool_call_id).await
286 }
287
288 /// Returns all resources from the upstream server.
289 pub async fn list_resources(
290 &self,
291 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
292 self.inner.list_resources().await
293 }
294
295 /// Returns the cached tool list as soon as it differs from `current`,
296 /// or waits up to `timeout` for the next `notifications/tools/list_changed`
297 /// from the upstream server before re-reading.
298 ///
299 /// Wakes the moment a refresh writer takes the cache write lock, so
300 /// the post-wake `read` is guaranteed to observe the new list rather
301 /// than racing against the install. Safe to call from any number of
302 /// tasks concurrently.
303 pub async fn subscribe_tools(
304 &self,
305 current: &[super::tool::Tool],
306 timeout: Duration,
307 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
308 self.inner.subscribe_tools(current, timeout).await
309 }
310
311 /// Resource counterpart of [`Connection::subscribe_tools`].
312 pub async fn subscribe_resources(
313 &self,
314 current: &[super::resource::Resource],
315 timeout: Duration,
316 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
317 self.inner.subscribe_resources(current, timeout).await
318 }
319
320 /// Atomically drain the proxy's `pending_notifications` queue for
321 /// this session via `GET /notify` and return the queued content
322 /// blocks. A second call returns `[]` until the next out-of-band
323 /// `POST /notify`.
324 ///
325 /// Intended for use at the start of an agent turn so notifications
326 /// queued between turns — when the prior turn ended without a tool
327 /// call, or the user is starting a fresh continuation — surface as
328 /// a user message instead of being lost. The proxy's existing
329 /// `tools/call` response path still drains in-flight notifications
330 /// arriving *during* a turn; this method covers the gap between
331 /// turns.
332 ///
333 /// A 404 from the proxy (session unknown — possible after a proxy
334 /// restart) is mapped to an empty `Vec` so callers do not need to
335 /// distinguish "no notifications" from "lost session" at the use
336 /// site; the next upstream call will surface the lost-session
337 /// condition through its own error path.
338 pub async fn drain_notifications(
339 &self,
340 ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
341 self.inner.drain_notifications().await
342 }
343
344 /// Non-draining peek at the proxy's `pending_notifications` queue
345 /// via `GET /notify/queued`. Returns `true` iff the queue holds at
346 /// least one block. Companion to [`Connection::drain_notifications`]
347 /// for callers that want to know whether queued blocks exist
348 /// without consuming them.
349 ///
350 /// A 404 from the proxy (session unknown — possible after a proxy
351 /// restart) is mapped to `Ok(false)` for the same reason as the
352 /// drain path: callers do not need to distinguish "no
353 /// notifications" from "lost session" at the use site.
354 pub async fn has_pending_notifications(
355 &self,
356 ) -> Result<bool, super::Error> {
357 self.inner.has_pending_notifications().await
358 }
359
360 /// `POST <self.url>/notify` against the ObjectiveAI MCP proxy.
361 /// Appends `blocks` to the proxy's pending-notifications queue for
362 /// this session; they surface as a user message on the next
363 /// `tools/call` response (wrapped in a `<system-reminder>` block)
364 /// or as the head of the next agent turn when drained between turns.
365 ///
366 /// Mirror of [`Connection::drain_notifications`] / [`Connection::has_pending_notifications`]
367 /// for the inbound side. A 404 from the proxy means the session is
368 /// gone — surfaced as `SessionExpired` so callers can distinguish
369 /// "session lost" from "delivery failed" at the use site.
370 pub async fn enqueue_notifications(
371 &self,
372 blocks: &[super::tool::ContentBlock],
373 ) -> Result<(), super::Error> {
374 self.inner.enqueue_notifications(blocks).await
375 }
376
377 /// Reads a resource from the upstream server.
378 pub async fn read_resource(
379 &self,
380 uri: &str,
381 ) -> Result<super::resource::ReadResourceResult, super::Error> {
382 self.inner.read_resource(uri).await
383 }
384
385 /// Register a callback to fire whenever the upstream emits
386 /// `notifications/tools/list_changed`.
387 ///
388 /// **Timing:** the callback runs *after* the tool cache's write lock
389 /// is acquired but *before* the network paginate that replaces it.
390 /// That means readers blocked on the read lock won't return until the
391 /// new list is in place, and the callback observes the moment the
392 /// staleness window opens. The proxy uses this to emit its own
393 /// `notifications/tools/list_changed` to downstream clients at the
394 /// right instant.
395 ///
396 /// Replaces any previously-registered tools-list-changed callback.
397 /// All clones of this `Connection` share the same callback slot.
398 pub fn set_on_tools_list_changed<F>(&self, callback: F)
399 where
400 F: Fn() + Send + Sync + 'static,
401 {
402 self.inner.on_tools_list_changed.set(Arc::new(callback));
403 }
404
405 /// Register a callback to fire whenever the upstream emits
406 /// `notifications/resources/list_changed`. Same timing contract as
407 /// [`Connection::set_on_tools_list_changed`].
408 ///
409 /// Replaces any previously-registered resources-list-changed callback.
410 /// All clones of this `Connection` share the same callback slot.
411 pub fn set_on_resources_list_changed<F>(&self, callback: F)
412 where
413 F: Fn() + Send + Sync + 'static,
414 {
415 self.inner.on_resources_list_changed.set(Arc::new(callback));
416 }
417
418 /// Atomically replace the connection's [`ConnectionInner::extra_headers`]
419 /// bag. Every subsequent outbound HTTP request from this connection
420 /// stamps the new map AFTER `headers`, with `HeaderMap::insert`
421 /// REPLACE semantics — keys in `extras` override the same key in
422 /// the per-URL `headers` bag set at `Client::connect`. Caller
423 /// supplies the FULL replacement map; missing keys are dropped
424 /// (no merge).
425 ///
426 /// Used by the proxy to inject session-global headers
427 /// (`X-OBJECTIVEAI-RESPONSE-ID`, `X-OBJECTIVEAI-RESPONSE-IDS`)
428 /// that re-set on every inbound `initialize`, without re-dialing
429 /// the upstream.
430 pub async fn set_extra_headers(
431 &self,
432 extras: IndexMap<String, String>,
433 ) {
434 *self.inner.extra_headers.write().await = extras;
435 }
436}
437
/// The actual connection state. Behind an `Arc` inside [`Connection`].
///
/// Fields are public for read-only access (callers reach them via
/// `Connection`'s `Deref`), but every method on this type is private —
/// the public surface lives on [`Connection`] and delegates through.
#[derive(Debug)]
pub struct ConnectionInner {
    /// HTTP client used for every request this connection sends.
    pub http_client: reqwest::Client,
    /// URL of the upstream MCP endpoint.
    pub url: String,
    /// Session ID of this connection (sent as `Mcp-Session-Id`).
    pub session_id: String,
    /// All HTTP headers stamped on every POST / GET this connection
    /// makes — the same merged map (defaults + caller overrides) the
    /// `Client` built once during connect. `Mcp-Session-Id`,
    /// `Content-Type`, and `Accept` are still set by the request
    /// builders and override anything in `headers`.
    pub headers: IndexMap<String, String>,
    /// Mutable per-request override layer stamped AFTER `headers` on
    /// every outbound HTTP request. The request-builder uses
    /// `reqwest::header::HeaderMap::insert` semantics so any key
    /// present in `extra_headers` REPLACES the same key in `headers`.
    /// Used by the proxy to inject session-global headers
    /// (`X-OBJECTIVEAI-RESPONSE-ID` etc.) that override per-URL
    /// values without re-dialing. Empty by default; set via
    /// [`Connection::set_extra_headers`].
    pub extra_headers: RwLock<IndexMap<String, String>>,

    // Retry policy inputs. These are copied field-for-field into a
    // `backoff::ExponentialBackoff` by `Self::backoff`.
    pub backoff_current_interval: Duration,
    pub backoff_initial_interval: Duration,
    pub backoff_randomization_factor: f64,
    pub backoff_multiplier: f64,
    pub backoff_max_interval: Duration,
    pub backoff_max_elapsed_time: Duration,
    /// Per-request timeout applied to outbound HTTP calls (see
    /// `Self::post` and [`Connection::delete`]).
    pub call_timeout: Duration,

    /// The server's capabilities and info from the initialize response.
    pub initialize_result: super::initialize_result::InitializeResult,

    /// `true` iff this connection was opened by resuming an existing
    /// upstream session — i.e. [`Client::connect`](super::Client::connect)
    /// was called with `session_id: Some(...)`. Set once at
    /// construction; never mutated.
    ///
    /// Used by the drop-time orphan-DELETE gate in
    /// [`ConnectionInner::Drop`]: reconnects are **excluded** from
    /// orphan DELETE because their `any_calls == false` only means
    /// "this `Connection` instance didn't use it" — an earlier
    /// instance that opened the upstream session may have. Only
    /// freshly-minted connections (`is_reconnect == false`) that no
    /// one used get the drop-time orphan DELETE.
    is_reconnect: bool,

    /// `true` once any [`Self::call_tool`] or [`Self::read_resource`]
    /// has been issued through this connection. Listings
    /// ([`Self::list_tools`], [`Self::list_resources`]) and the
    /// proxy-side notification helpers do NOT flip this — only
    /// deliberate use of the upstream session counts.
    ///
    /// Stored atomically because the setters live behind `&self`-only
    /// call paths; `Ordering::Relaxed` is sufficient (we never
    /// synchronize anything else against this load/store).
    ///
    /// Also flipped to `true` at the top of [`super::Connection::delete`]
    /// so an explicit teardown of a fresh-mint connection can't race
    /// the drop-time orphan-DELETE into a double-fire.
    any_calls: AtomicBool,

    /// Auto-incrementing request ID (starts at 2; 1 was used for initialize).
    next_id: AtomicU64,

    /// All tools from the server, populated by background pagination.
    ///
    /// `None` = cache cleared — either pre-populate (between
    /// [`Self::new`] and the first `refresh_tools_signaling`) or
    /// post-drop (the listener empties this the moment its SSE
    /// stream ends so `list_tools` will re-paginate against the
    /// upstream rather than return stale state). `Some(_)` = last
    /// known result, `Ok` or `Err`.
    tools:
        RwLock<Option<Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>>>>,
    /// All resources from the server, populated by background pagination.
    /// Same `None`/`Some` semantics as [`Self::tools`].
    resources: RwLock<
        Option<Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>>>,
    >,

    /// Cancellation token for the long-lived `listen_for_list_changes`
    /// task. The listener selects this against every blocking await
    /// (read, reconnect-send, backoff-sleep) and returns the instant it
    /// fires.
    ///
    /// Held inside the connection as a [`DropGuard`] so that the moment
    /// the last `Arc<ConnectionInner>` clone is dropped — i.e. the
    /// moment no external `Connection` handle remains — `Drop` runs on
    /// the guard, the token cancels, and the listener task tears down.
    /// The listener itself holds a sibling `CancellationToken` (clone),
    /// not the guard, so its task does not extend the connection's
    /// lifetime.
    ///
    /// Wrapped in `Mutex<Option<_>>` so explicit teardown paths
    /// ([`Connection::delete`]) can drop the guard in place — firing
    /// the cancel token *before* the surrounding `Arc<ConnectionInner>`
    /// goes away. Regular drop still works: `Mutex<Option<DropGuard>>`
    /// drops its inner `DropGuard` automatically when the mutex itself
    /// drops, so the drop-time cancellation described above is
    /// unchanged.
    ///
    /// NOTE(review): this comment previously said "existing
    /// `Connection::Drop` semantics"; `Connection` has no `Drop` impl,
    /// so it presumably meant the drop of `ConnectionInner` — confirm.
    _listener_cancel_guard: std::sync::Mutex<Option<DropGuard>>,

    /// Optional callback invoked by the listener in response to an
    /// upstream `notifications/tools/list_changed`.
    /// Set via [`Connection::set_on_tools_list_changed`].
    ///
    /// NOTE(review): this comment previously said the callback fires
    /// *after* the tool cache has been refreshed, whereas
    /// [`ListChangedCallback`] and
    /// [`Connection::set_on_tools_list_changed`] both document it as
    /// firing after the cache write lock is taken but *before* the
    /// paginate — confirm against `refresh_tools_signaling`.
    on_tools_list_changed: CallbackSlot,

    /// Optional callback invoked by the listener in response to an
    /// upstream `notifications/resources/list_changed`.
    /// Set via [`Connection::set_on_resources_list_changed`].
    ///
    /// NOTE(review): same timing question as
    /// [`Self::on_tools_list_changed`] — the setter documents "before
    /// the paginate", this field used to say "after the refresh";
    /// confirm against `refresh_resources_signaling`.
    on_resources_list_changed: CallbackSlot,

    /// Wakes any task awaiting in [`Connection::subscribe_tools`]. Fired
    /// from inside `refresh_tools_signaling` the moment the writer
    /// acquires the cache write lock — *before* the new list is
    /// installed. A woken subscriber's next `read().await` blocks behind
    /// the writer's guard, so it always observes the post-swap state.
    tools_changed: Notify,

    /// Resource counterpart of [`Self::tools_changed`].
    resources_changed: Notify,
}
565
566impl ConnectionInner {
567 /// Creates a minimal connection for unit testing. Declares both
568 /// `tools` and `resources` capabilities with `list_changed:
569 /// Some(true)` so callers exercise the present-cap +
570 /// list_changed-enabled paths in `list_*`, `refresh_*`, and
571 /// `subscribe_*`. For other capability shapes use
572 /// `new_for_test_with_caps`.
573 #[cfg(test)]
574 fn new_for_test(name: String, url: String) -> Arc<Self> {
575 Self::new_for_test_with_caps(
576 name,
577 url,
578 super::initialize_result::ServerCapabilities {
579 experimental: None,
580 logging: None,
581 completions: None,
582 prompts: None,
583 resources: Some(
584 super::initialize_result::ResourcesCapability {
585 subscribe: None,
586 list_changed: Some(true),
587 },
588 ),
589 tools: Some(super::initialize_result::ToolsCapability {
590 list_changed: Some(true),
591 }),
592 tasks: None,
593 },
594 )
595 }
596
597 /// Creates a minimal connection for unit testing with an explicit
598 /// `ServerCapabilities`. Used by the capability-gating tests to
599 /// drive each gate's absent-cap branch.
600 #[cfg(test)]
601 fn new_for_test_with_caps(
602 name: String,
603 url: String,
604 capabilities: super::initialize_result::ServerCapabilities,
605 ) -> Arc<Self> {
606 Arc::new(Self {
607 http_client: reqwest::Client::new(),
608 url,
609 session_id: String::new(),
610 headers: IndexMap::new(),
611 extra_headers: RwLock::new(IndexMap::new()),
612 backoff_current_interval: Duration::from_millis(500),
613 backoff_initial_interval: Duration::from_millis(500),
614 backoff_randomization_factor: 0.5,
615 backoff_multiplier: 1.5,
616 backoff_max_interval: Duration::from_secs(60),
617 backoff_max_elapsed_time: Duration::from_secs(900),
618 call_timeout: Duration::from_secs(30),
619 initialize_result: super::initialize_result::InitializeResult {
620 protocol_version: "2025-03-26".into(),
621 capabilities,
622 server_info: super::initialize_result::Implementation {
623 name,
624 title: None,
625 version: "0.0.0".into(),
626 website_url: None,
627 description: None,
628 icons: None,
629 },
630 instructions: None,
631 _meta: None,
632 },
633 is_reconnect: false,
634 any_calls: AtomicBool::new(false),
635 next_id: AtomicU64::new(2),
636 // Test connection has no listener and never refreshes; seed
637 // with an empty Ok so `list_tools` doesn't try to paginate.
638 tools: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
639 resources: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
640 _listener_cancel_guard: std::sync::Mutex::new(None),
641 on_tools_list_changed: CallbackSlot::new(),
642 on_resources_list_changed: CallbackSlot::new(),
643 tools_changed: Notify::new(),
644 resources_changed: Notify::new(),
645 })
646 }
647
    /// Creates a new connection and spawns background tasks to paginate
    /// all tools and resources. Called internally by
    /// [`Client::connect`](super::Client::connect) (via [`Connection::new`]).
    ///
    /// Construction proceeds in four stages:
    ///
    /// 1. Build the shared state with empty (`None`) caches and a fresh
    ///    listener cancellation token whose `DropGuard` is stored in
    ///    `_listener_cancel_guard`.
    /// 2. If the server declared a `tools` capability, spawn the initial
    ///    tool refresh and wait for its lock-held signal.
    /// 3. Same for `resources`.
    /// 4. If `initial_sse_lines` is `Some`, spawn the list-changed
    ///    listener on that stream.
    ///
    /// `initial_sse_lines`, if `Some`, is a pre-opened SSE line reader
    /// that the list-changed listener will read from immediately on its
    /// first iteration, instead of opening its own GET `/`. The caller
    /// is responsible for arranging for one of these to exist whenever
    /// the upstream advertises `tools.list_changed` or
    /// `resources.list_changed` — see
    /// [`Client::connect`](super::Client::connect).
    ///
    /// `is_reconnect` is stored as-is; see the field of the same name
    /// for how it is used at drop time.
    async fn new(
        http_client: reqwest::Client,
        url: String,
        session_id: String,
        headers: IndexMap<String, String>,
        backoff_current_interval: Duration,
        backoff_initial_interval: Duration,
        backoff_randomization_factor: f64,
        backoff_multiplier: f64,
        backoff_max_interval: Duration,
        backoff_max_elapsed_time: Duration,
        call_timeout: Duration,
        initialize_result: super::initialize_result::InitializeResult,
        initial_sse_lines: Option<super::LinesStream>,
        is_reconnect: bool,
    ) -> Arc<Self> {
        // Cancel-the-listener machinery: store the DropGuard inside the
        // inner so the cancellation fires deterministically when the
        // last external `Arc<ConnectionInner>` clone drops. Hand the
        // listener task a sibling clone (no guard) — that way the
        // listener task's lifetime does not extend the connection.
        let listener_cancel = CancellationToken::new();
        let listener_cancel_for_task = listener_cancel.clone();
        let conn = Arc::new(Self {
            http_client,
            url,
            session_id,
            headers,
            extra_headers: RwLock::new(IndexMap::new()),
            backoff_current_interval,
            backoff_initial_interval,
            backoff_randomization_factor,
            backoff_multiplier,
            backoff_max_interval,
            backoff_max_elapsed_time,
            call_timeout,
            initialize_result,
            is_reconnect,
            any_calls: AtomicBool::new(false),
            next_id: AtomicU64::new(2),
            // Both caches start empty (`None`). When the matching
            // capability is declared, the handoff below keeps `new`
            // from returning until the refresh task has signalled that
            // it holds the write lock, so later readers queue behind
            // the populate instead of seeing `None`.
            tools: RwLock::new(None),
            resources: RwLock::new(None),
            _listener_cancel_guard: std::sync::Mutex::new(Some(
                listener_cancel.drop_guard(),
            )),
            on_tools_list_changed: CallbackSlot::new(),
            on_resources_list_changed: CallbackSlot::new(),
            tools_changed: Notify::new(),
            resources_changed: Notify::new(),
        });

        // Spawn background tool lister if the server supports tools.
        //
        // We don't return until the spawned task has acquired the write
        // lock. Otherwise a caller that immediately reads `list_tools()`
        // could race the writer — `tokio::spawn` only queues the task,
        // and a fast reader can acquire the read lock before the writer
        // has run its first instruction. The reader would then observe
        // the still-unpopulated (`None`) cache even though a full
        // populate is in flight.
        //
        // The write guard is not handed back to this function; instead
        // a oneshot signals "I'm holding the lock now". Once we receive
        // that, the cache is exclusively owned by the writer and any
        // subsequent `read().await` from the caller is guaranteed to
        // wait for the populate to finish.
        if conn.initialize_result.capabilities.tools.is_some() {
            // The refresh task owns a strong `Arc` clone, so the
            // connection stays alive for as long as that task runs.
            let conn = Arc::clone(&conn);
            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
            tokio::spawn(async move {
                conn.refresh_tools_signaling(lock_held_tx, None).await;
            });
            // Wait for the writer to hold the lock before returning. A
            // receive error (sender dropped without sending) is
            // deliberately ignored so construction still completes.
            let _ = lock_held_rx.await;
        }

        // Spawn background resource lister if the server supports
        // resources. Same lock-handoff contract as tools above.
        if conn.initialize_result.capabilities.resources.is_some() {
            let conn = Arc::clone(&conn);
            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
            tokio::spawn(async move {
                conn.refresh_resources_signaling(lock_held_tx, None).await;
            });
            let _ = lock_held_rx.await;
        }

        // Spawn the list-changed listener iff the caller handed us a
        // pre-opened SSE stream. The connection is naive about
        // `tools.list_changed` / `resources.list_changed` capabilities —
        // [`Client::connect`](super::Client::connect) translates them
        // into "did or didn't open a stream for us." If we get a stream,
        // we listen on it; if we don't, there's nothing to listen for.
        if let Some(initial_lines) = initial_sse_lines {
            // Hand the listener a `Weak` so the spawned task itself does
            // not keep the connection alive. `listener_cancel_for_task`
            // is a sibling clone of the connection's own
            // `_listener_cancel_guard` token — when the last external
            // `Arc<ConnectionInner>` clone is dropped, the inner's Drop
            // releases the guard and the listener wakes from any
            // pending await (read, send, sleep) and exits immediately.
            let weak = Arc::downgrade(&conn);
            tokio::spawn(async move {
                Self::listen_for_list_changes(
                    weak,
                    listener_cancel_for_task,
                    initial_lines,
                )
                .await;
            });
        }

        conn
    }
776
777 /// Server declared a `tools` capability in its `InitializeResult`.
778 /// Gates `list_tools`, `call_tool`, `refresh_tools`. When `false` the
779 /// upstream cannot service `tools/*` RPCs at all and any attempt would
780 /// either hang in idempotent backoff (`tools/list`) or fail with
781 /// `SessionExpired` (`tools/call`).
782 fn has_tools_cap(&self) -> bool {
783 self.initialize_result.capabilities.tools.is_some()
784 }
785
786 /// Server declared a `resources` capability in its `InitializeResult`.
787 /// Gates `list_resources`, `read_resource`, `refresh_resources`, and
788 /// the post-`call_tool` ResourceLink resolution. Same hang-or-fail
789 /// shape as `has_tools_cap` when absent.
790 fn has_resources_cap(&self) -> bool {
791 self.initialize_result.capabilities.resources.is_some()
792 }
793
794 /// Server declared `tools.list_changed: true`. Gates
795 /// `subscribe_tools`'s wait-for-notify branch: when `false` the
796 /// upstream will never push `notifications/tools/list_changed`, so
797 /// awaiting the notify is unreachable and `subscribe_tools` returns
798 /// the current cache immediately.
799 fn has_tools_list_changed(&self) -> bool {
800 matches!(
801 self.initialize_result.capabilities.tools,
802 Some(super::initialize_result::ToolsCapability {
803 list_changed: Some(true),
804 })
805 )
806 }
807
808 /// Server declared `resources.list_changed: true`. Symmetric to
809 /// `has_tools_list_changed`; gates `subscribe_resources`'s
810 /// wait-for-notify branch.
811 fn has_resources_list_changed(&self) -> bool {
812 matches!(
813 self.initialize_result.capabilities.resources,
814 Some(super::initialize_result::ResourcesCapability {
815 list_changed: Some(true),
816 ..
817 })
818 )
819 }
820
821 /// Creates an exponential backoff configuration from the connection's fields.
822 fn backoff(&self) -> backoff::ExponentialBackoff {
823 backoff::ExponentialBackoff {
824 current_interval: self.backoff_current_interval,
825 initial_interval: self.backoff_initial_interval,
826 randomization_factor: self.backoff_randomization_factor,
827 multiplier: self.backoff_multiplier,
828 max_interval: self.backoff_max_interval,
829 start_time: std::time::Instant::now(),
830 max_elapsed_time: Some(self.backoff_max_elapsed_time),
831 clock: backoff::SystemClock::default(),
832 }
833 }
834
835 /// Builds a POST request with all required headers and the call timeout.
836 async fn post(&self) -> reqwest::RequestBuilder {
837 self.http_client
838 .post(&self.url)
839 .timeout(self.call_timeout)
840 .headers(
841 self.build_request_headers(
842 Some("application/json"),
843 Some("application/json, text/event-stream"),
844 )
845 .await,
846 )
847 }
848
849 /// Build the `HeaderMap` stamped on every outbound request. Order
850 /// of insertion drives override semantics — `HeaderMap::insert`
851 /// REPLACES existing values for the same key:
852 ///
853 /// 1. Content-Type / Accept (when supplied by the caller).
854 /// 2. `self.headers` (the per-URL bag set at `Client::connect`).
855 /// 3. `self.extra_headers` (the mutable, session-global overrides
856 /// — proxies use this for `X-OBJECTIVEAI-RESPONSE-ID` etc).
857 /// 4. `Mcp-Session-Id` (the connection's own session id, always
858 /// last so it can never be shadowed).
859 async fn build_request_headers(
860 &self,
861 content_type: Option<&str>,
862 accept: Option<&str>,
863 ) -> reqwest::header::HeaderMap {
864 use reqwest::header::{
865 ACCEPT, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue,
866 };
867 let mut hmap = HeaderMap::new();
868 if let Some(ct) = content_type {
869 if let Ok(hv) = HeaderValue::from_str(ct) {
870 hmap.insert(CONTENT_TYPE, hv);
871 }
872 }
873 if let Some(a) = accept {
874 if let Ok(hv) = HeaderValue::from_str(a) {
875 hmap.insert(ACCEPT, hv);
876 }
877 }
878 for (k, v) in &self.headers {
879 if let (Ok(hn), Ok(hv)) = (
880 HeaderName::try_from(k.as_str()),
881 HeaderValue::from_str(v),
882 ) {
883 hmap.insert(hn, hv);
884 }
885 }
886 let extras = self.extra_headers.read().await;
887 for (k, v) in extras.iter() {
888 if let (Ok(hn), Ok(hv)) = (
889 HeaderName::try_from(k.as_str()),
890 HeaderValue::from_str(v),
891 ) {
892 hmap.insert(hn, hv);
893 }
894 }
895 drop(extras);
896 if let Ok(hv) = HeaderValue::from_str(&self.session_id) {
897 hmap.insert(HeaderName::from_static("mcp-session-id"), hv);
898 }
899 hmap
900 }
901
902 /// Sends a JSON-RPC request, retrying transient errors when
903 /// `idempotent` is `true`.
904 ///
905 /// Idempotent methods (`tools/list`, `resources/list`,
906 /// `resources/read`, etc.) retry every transient error — network,
907 /// HTTP status, malformed body, JSON-RPC error, session expiration —
908 /// until the backoff's `max_elapsed_time` is exceeded.
909 ///
910 /// Non-idempotent methods (`tools/call`) make exactly one attempt.
911 /// Retrying a `tools/call` is unsafe: a tool may have mutated remote
912 /// state during the first attempt before the response was lost, and
913 /// re-firing the call would mutate state again. Each retry of
914 /// `AppendTask` advances `state.tasks.len()` an extra step, so the
915 /// agent sees a different return value than expected and the
916 /// pid-derived mock seed at the next step diverges. See
917 /// `objectiveai-api/src/agent/completions/client.rs` (sequential
918 /// dispatch) and `mock/client.rs::mock.seed_derive` for the
919 /// downstream consequence.
920 async fn rpc<P: serde::Serialize, R: serde::de::DeserializeOwned>(
921 &self,
922 method: &str,
923 params: &P,
924 idempotent: bool,
925 ) -> Result<R, super::Error> {
926 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
927 let body = serde_json::json!({
928 "jsonrpc": "2.0",
929 "id": id,
930 "method": method,
931 "params": params,
932 });
933
934 let attempt_one = || async {
935 let url = self.url.clone();
936 let response =
937 self.post().await.json(&body).send().await.map_err(|source| {
938 backoff::Error::transient(super::Error::Request {
939 url: url.clone(),
940 source,
941 })
942 })?;
943
944 if response.status() == reqwest::StatusCode::NOT_FOUND {
945 return Err(backoff::Error::transient(
946 super::Error::SessionExpired { url: url.clone() },
947 ));
948 }
949 if !response.status().is_success() {
950 let code = response.status();
951 let body = response.text().await.unwrap_or_default();
952 return Err(backoff::Error::transient(
953 super::Error::BadStatus {
954 url: url.clone(),
955 code,
956 body,
957 },
958 ));
959 }
960
961 let rpc_response: super::JsonRpcResponse<R> =
962 super::parse_streamable_http_response(&url, response)
963 .await
964 .map_err(backoff::Error::transient)?;
965
966 match rpc_response {
967 super::JsonRpcResponse::Success { result, .. } => Ok(result),
968 super::JsonRpcResponse::Error { error, .. } => {
969 Err(backoff::Error::transient(super::Error::JsonRpc {
970 url: url.clone(),
971 code: error.code,
972 message: error.message,
973 data: error.data,
974 }))
975 }
976 }
977 };
978
979 if idempotent {
980 backoff::future::retry(self.backoff(), attempt_one).await
981 } else {
982 attempt_one().await.map_err(|e| match e {
983 backoff::Error::Permanent(err)
984 | backoff::Error::Transient { err, .. } => err,
985 })
986 }
987 }
988
989 /// Sends a JSON-RPC notification (no response expected) with the
990 /// same exponential-backoff retry policy as [`Self::rpc`]. Every
991 /// error is transient; the loop gives up only when the backoff's
992 /// `max_elapsed_time` is exceeded.
993 async fn notify<P: serde::Serialize>(
994 &self,
995 method: &str,
996 params: &P,
997 ) -> Result<(), super::Error> {
998 let body = serde_json::json!({
999 "jsonrpc": "2.0",
1000 "method": method,
1001 "params": params,
1002 });
1003
1004 backoff::future::retry(self.backoff(), || async {
1005 let url = self.url.clone();
1006 let response =
1007 self.post().await.json(&body).send().await.map_err(|source| {
1008 backoff::Error::transient(super::Error::Request {
1009 url: url.clone(),
1010 source,
1011 })
1012 })?;
1013
1014 if response.status() == reqwest::StatusCode::NOT_FOUND {
1015 return Err(backoff::Error::transient(
1016 super::Error::SessionExpired { url: url.clone() },
1017 ));
1018 }
1019 if !response.status().is_success() {
1020 let code = response.status();
1021 let body = response.text().await.unwrap_or_default();
1022 return Err(backoff::Error::transient(
1023 super::Error::BadStatus {
1024 url: url.clone(),
1025 code,
1026 body,
1027 },
1028 ));
1029 }
1030
1031 Ok(())
1032 })
1033 .await
1034 }
1035
1036 /// `GET <self.url>/notify` against the ObjectiveAI MCP proxy.
1037 /// Atomically drains the proxy's pending-notifications queue for
1038 /// this session and returns the queued content blocks.
1039 ///
1040 /// Single-attempt — the proxy drain is destructive, so a retry
1041 /// after a transient failure would risk silently dropping
1042 /// notifications that the first attempt's response carried but
1043 /// failed to deliver. Networks errors propagate to the caller; the
1044 /// next turn's drain will pick up anything queued in the meantime.
1045 /// A 404 (session unknown) is mapped to `Ok(vec![])` — see the
1046 /// public method's doc on `Connection`.
1047 async fn drain_notifications(
1048 &self,
1049 ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
1050 let url = format!("{}/notify", self.url.trim_end_matches('/'));
1051 let request = self
1052 .http_client
1053 .get(&url)
1054 .timeout(self.call_timeout)
1055 .headers(
1056 self.build_request_headers(None, Some("application/json"))
1057 .await,
1058 );
1059
1060 let response =
1061 request
1062 .send()
1063 .await
1064 .map_err(|source| super::Error::Request {
1065 url: url.clone(),
1066 source,
1067 })?;
1068
1069 if response.status() == reqwest::StatusCode::NOT_FOUND {
1070 return Ok(Vec::new());
1071 }
1072 if !response.status().is_success() {
1073 let code = response.status();
1074 let body = response.text().await.unwrap_or_default();
1075 return Err(super::Error::BadStatus { url, code, body });
1076 }
1077
1078 response
1079 .json::<Vec<super::tool::ContentBlock>>()
1080 .await
1081 .map_err(|source| super::Error::Request { url, source })
1082 }
1083
1084 /// `POST <self.url>/notify` against the ObjectiveAI MCP proxy.
1085 /// Appends `blocks` to the proxy's pending-notifications queue for
1086 /// this session. Single-attempt — the caller decides whether to
1087 /// retry. A 404 (session unknown) surfaces as `SessionExpired`
1088 /// rather than `Ok(())` because the caller is asking for delivery
1089 /// and a lost session means delivery did not happen.
1090 async fn enqueue_notifications(
1091 &self,
1092 blocks: &[super::tool::ContentBlock],
1093 ) -> Result<(), super::Error> {
1094 let url = format!("{}/notify", self.url.trim_end_matches('/'));
1095 let request = self
1096 .http_client
1097 .post(&url)
1098 .timeout(self.call_timeout)
1099 .headers(
1100 self.build_request_headers(
1101 Some("application/json"),
1102 Some("application/json"),
1103 )
1104 .await,
1105 )
1106 .json(blocks);
1107
1108 let response =
1109 request
1110 .send()
1111 .await
1112 .map_err(|source| super::Error::Request {
1113 url: url.clone(),
1114 source,
1115 })?;
1116
1117 if response.status() == reqwest::StatusCode::NOT_FOUND {
1118 return Err(super::Error::SessionExpired { url });
1119 }
1120 if !response.status().is_success() {
1121 let code = response.status();
1122 let body = response.text().await.unwrap_or_default();
1123 return Err(super::Error::BadStatus { url, code, body });
1124 }
1125
1126 Ok(())
1127 }
1128
1129 /// `GET <self.url>/notify/queued` against the ObjectiveAI MCP proxy.
1130 /// Non-draining peek — returns `true` iff the proxy's
1131 /// pending-notifications queue for this session is non-empty.
1132 /// A 404 (session unknown) is mapped to `Ok(false)` to match the
1133 /// drain path's soft-fallback contract.
1134 async fn has_pending_notifications(&self) -> Result<bool, super::Error> {
1135 let url = format!("{}/notify/queued", self.url.trim_end_matches('/'));
1136 let request = self
1137 .http_client
1138 .get(&url)
1139 .timeout(self.call_timeout)
1140 .headers(
1141 self.build_request_headers(None, Some("application/json"))
1142 .await,
1143 );
1144
1145 let response =
1146 request
1147 .send()
1148 .await
1149 .map_err(|source| super::Error::Request {
1150 url: url.clone(),
1151 source,
1152 })?;
1153
1154 if response.status() == reqwest::StatusCode::NOT_FOUND {
1155 return Ok(false);
1156 }
1157 if !response.status().is_success() {
1158 let code = response.status();
1159 let body = response.text().await.unwrap_or_default();
1160 return Err(super::Error::BadStatus { url, code, body });
1161 }
1162
1163 response
1164 .json::<bool>()
1165 .await
1166 .map_err(|source| super::Error::Request { url, source })
1167 }
1168
1169 /// Returns a key identifying this connection for tool namespacing.
1170 fn tool_key(&self) -> String {
1171 format!("{}-{}", self.initialize_result.server_info.name, self.url)
1172 }
1173
1174 /// Returns the session ID for this connection.
1175 fn session_id(&self) -> &str {
1176 &self.session_id
1177 }
1178
1179 /// Sends a `tools/list` RPC call for a single page.
1180 async fn rpc_list_tools(
1181 &self,
1182 cursor: Option<&str>,
1183 ) -> Result<super::tool::ListToolsResult, super::Error> {
1184 self.rpc(
1185 "tools/list",
1186 &super::tool::ListToolsRequest {
1187 cursor: cursor.map(String::from),
1188 },
1189 true,
1190 )
1191 .await
1192 }
1193
1194 /// Returns all tools from the server.
1195 ///
1196 /// Blocks until background pagination completes, then returns a
1197 /// cheap `Arc` clone of the result. If the cache is currently
1198 /// empty (e.g. because the listener detected its SSE stream drop
1199 /// and cleared it) this paginates inline against the upstream —
1200 /// the caller gets fresh data on the happy path, or the live
1201 /// upstream error if the connection is genuinely down, rather
1202 /// than stale pre-drop tools.
1203 async fn list_tools(
1204 &self,
1205 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1206 if !self.has_tools_cap() {
1207 return Ok(Arc::new(Vec::new()));
1208 }
1209 if let Some(cached) = self.tools.read().await.as_ref() {
1210 return cached.clone();
1211 }
1212 // Cache cleared; refresh inline. Concurrent callers may each
1213 // refresh — wasteful, but the proxy fans out across distinct
1214 // upstreams so a single Connection rarely sees concurrent
1215 // `list_tools` calls.
1216 self.refresh_tools(None).await;
1217 self.tools
1218 .read()
1219 .await
1220 .as_ref()
1221 .expect("refresh_tools installs Some")
1222 .clone()
1223 }
1224
1225 /// Calls a tool on the MCP server. The returned
1226 /// `CallToolResult.content` is **fully resolved**: every
1227 /// `ContentBlock::ResourceLink { uri }` whose URI appears in
1228 /// `list_resources` has already been replaced by one or more
1229 /// `ContentBlock::EmbeddedResource` blocks carrying the fetched
1230 /// contents (via `read_resource`). Unknown-URI links pass through
1231 /// untouched — the upstream server may have its own out-of-band
1232 /// resolution path the caller should preserve.
1233 ///
1234 /// Resolving inside `call_tool` (rather than downstream in
1235 /// `call_tool_as_message`) means every consumer of the result
1236 /// sees `EmbeddedResource` shapes uniformly; the stateless
1237 /// `From<ContentBlock>` impl is then enough to convert the whole
1238 /// result to `RichContent` with no further connection work.
1239 async fn call_tool(
1240 &self,
1241 params: &super::tool::CallToolRequestParams,
1242 ) -> Result<super::tool::CallToolResult, super::Error> {
1243 if !self.has_tools_cap() {
1244 return Err(super::Error::UnsupportedCapability {
1245 capability: "tools",
1246 });
1247 }
1248 // Mark the connection as deliberately used. Flipped at the top
1249 // of the method (not after success) because even a failed
1250 // `tools/call` may have mutated upstream state — we don't want
1251 // the drop-time orphan-DELETE second-guessing that.
1252 self.any_calls.store(true, Ordering::Relaxed);
1253 let mut result: super::tool::CallToolResult =
1254 self.rpc("tools/call", params, false).await?;
1255
1256 // Build the known-resource URI set for ResourceLink
1257 // resolution. `list_resources` failure → empty set (same
1258 // safe fallback the resolution path used previously).
1259 let known_uris: std::collections::HashSet<String> =
1260 match self.list_resources().await {
1261 Ok(rs) => rs.iter().map(|r| r.uri.clone()).collect(),
1262 Err(_) => std::collections::HashSet::new(),
1263 };
1264
1265 // Walk the blocks, replacing each resolvable ResourceLink
1266 // with one EmbeddedResource per returned ResourceContentsUnion.
1267 // Everything else (Text, Image, Audio, EmbeddedResource,
1268 // unknown-URI ResourceLinks) passes through.
1269 let mut resolved: Vec<super::tool::ContentBlock> =
1270 Vec::with_capacity(result.content.len());
1271 for block in std::mem::take(&mut result.content) {
1272 match block {
1273 super::tool::ContentBlock::ResourceLink(link)
1274 if known_uris.contains(&link.uri) =>
1275 {
1276 let read = self.read_resource(&link.uri).await?;
1277 for contents in read.contents {
1278 resolved.push(
1279 super::tool::ContentBlock::EmbeddedResource(
1280 super::tool::EmbeddedResource {
1281 resource: contents,
1282 // ResourceLink's annotations
1283 // don't have a perfect home on
1284 // EmbeddedResource — both fields
1285 // exist but they describe different
1286 // shapes (the link vs the inlined
1287 // contents). Drop them on the way
1288 // in; the EmbeddedResource is now
1289 // a fresh authoritative block.
1290 annotations: None,
1291 _meta: None,
1292 },
1293 ),
1294 );
1295 }
1296 }
1297 other => resolved.push(other),
1298 }
1299 }
1300 result.content = resolved;
1301 Ok(result)
1302 }
1303
1304 /// Calls a tool and converts the (already-resolved) result into a
1305 /// [`ToolMessage`]. Resource resolution happens inside
1306 /// [`Self::call_tool`] — by the time we get the blocks here every
1307 /// resolvable `ResourceLink` has already been replaced with an
1308 /// `EmbeddedResource`, so the conversion is a pure stateless
1309 /// element-wise map through [`From<ContentBlock> for
1310 /// RichContentPart`](crate::agent::completions::message::RichContentPart).
1311 ///
1312 /// Content-block mapping (handled by the `From` impl):
1313 /// - `text` → text part
1314 /// - `image` → image_url part (data URL)
1315 /// - `audio` → input_audio part
1316 /// - `embedded_resource` (text) → text part
1317 /// - `embedded_resource` (blob, image mime) → image_url part
1318 /// - `embedded_resource` (blob, audio mime) → input_audio part
1319 /// - `embedded_resource` (blob, video mime) → input_video part
1320 /// - `embedded_resource` (blob, other mime) → file part
1321 /// - `resource_link` (unknown URI) → JSON-text fallback
1322 /// (resolvable URIs were already resolved upstream)
1323 async fn call_tool_as_message(
1324 &self,
1325 params: &super::tool::CallToolRequestParams,
1326 tool_call_id: String,
1327 ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
1328 {
1329 use crate::agent::completions::message::{
1330 RichContentPart, ToolMessage, ToolResponseMetadata,
1331 };
1332
1333 let result = self.call_tool(params).await?;
1334
1335 let parts: Vec<RichContentPart> =
1336 result.content.into_iter().map(Into::into).collect();
1337
1338 // Lossy-decode the MCP `_meta` extension bag into our typed
1339 // `ToolResponseMetadata`. Unknown keys (set by non-objectiveai
1340 // upstreams) are silently dropped. Decoding failure leaves
1341 // metadata as `None`.
1342 let metadata = result._meta.as_ref().and_then(|m| {
1343 serde_json::from_value::<ToolResponseMetadata>(
1344 serde_json::to_value(m).ok()?,
1345 )
1346 .ok()
1347 });
1348
1349 Ok(ToolMessage {
1350 content: parts.into(),
1351 tool_call_id,
1352 metadata,
1353 })
1354 }
1355
1356 /// Sends a `resources/list` RPC call for a single page.
1357 async fn rpc_list_resources(
1358 &self,
1359 cursor: Option<&str>,
1360 ) -> Result<super::resource::ListResourcesResult, super::Error> {
1361 self.rpc(
1362 "resources/list",
1363 &super::resource::ListResourcesRequest {
1364 cursor: cursor.map(String::from),
1365 },
1366 true,
1367 )
1368 .await
1369 }
1370
1371 /// Returns all resources from the server.
1372 ///
1373 /// Same cache-or-refresh semantics as [`Self::list_tools`]: a
1374 /// cleared cache (post-drop or pre-populate) triggers an inline
1375 /// paginate against the upstream.
1376 async fn list_resources(
1377 &self,
1378 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1379 if !self.has_resources_cap() {
1380 return Ok(Arc::new(Vec::new()));
1381 }
1382 if let Some(cached) = self.resources.read().await.as_ref() {
1383 return cached.clone();
1384 }
1385 self.refresh_resources(None).await;
1386 self.resources
1387 .read()
1388 .await
1389 .as_ref()
1390 .expect("refresh_resources installs Some")
1391 .clone()
1392 }
1393
1394 /// Returns the cached tool list as soon as it differs from `current`,
1395 /// or — if it equals `current` right now — waits up to `timeout` for
1396 /// the cache to change and then returns whatever it sees.
1397 ///
1398 /// An `Err` cache is treated as "different from any caller snapshot"
1399 /// and returned immediately.
1400 ///
1401 /// Concurrency-safe: any number of concurrent subscribers wait on
1402 /// independent `Notified` futures and read the cache through the
1403 /// shared `RwLock`. A timeout that fires alone is not an error — we
1404 /// re-read the cache and return whatever's there.
1405 async fn subscribe_tools(
1406 &self,
1407 current: &[super::tool::Tool],
1408 timeout: Duration,
1409 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1410 if !self.has_tools_list_changed() {
1411 // Server can't push `notifications/tools/list_changed` —
1412 // awaiting the notify is unreachable. Return whatever's
1413 // there right now (`Ok(empty)` on a tools-cap-absent
1414 // server via `list_tools`'s own gate; the real cache
1415 // otherwise).
1416 return self.list_tools().await;
1417 }
1418 // Arm BEFORE reading. `enable()` registers the future in the
1419 // wait queue without polling, so a `notify_waiters` racing
1420 // between our read and our await still wakes us.
1421 let notified = self.tools_changed.notified();
1422 tokio::pin!(notified);
1423 notified.as_mut().enable();
1424
1425 // `list_tools` handles a cleared cache (post-drop) by
1426 // paginating inline. A `None` initial state can't be
1427 // compared to the caller's snapshot meaningfully — promote
1428 // to whatever the refresh installs.
1429 let initial = self.list_tools().await;
1430 match &initial {
1431 Ok(arc) if arc.as_slice() == current => {}
1432 _ => return initial,
1433 }
1434
1435 let _ = tokio::time::timeout(timeout, notified).await;
1436
1437 self.list_tools().await
1438 }
1439
1440 /// Resource counterpart of [`Self::subscribe_tools`].
1441 async fn subscribe_resources(
1442 &self,
1443 current: &[super::resource::Resource],
1444 timeout: Duration,
1445 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1446 if !self.has_resources_list_changed() {
1447 // Symmetric to `subscribe_tools` — see that gate.
1448 return self.list_resources().await;
1449 }
1450 let notified = self.resources_changed.notified();
1451 tokio::pin!(notified);
1452 notified.as_mut().enable();
1453
1454 let initial = self.list_resources().await;
1455 match &initial {
1456 Ok(arc) if arc.as_slice() == current => {}
1457 _ => return initial,
1458 }
1459
1460 let _ = tokio::time::timeout(timeout, notified).await;
1461
1462 self.list_resources().await
1463 }
1464
1465 /// Reads a resource from the MCP server.
1466 async fn read_resource(
1467 &self,
1468 uri: &str,
1469 ) -> Result<super::resource::ReadResourceResult, super::Error> {
1470 if !self.has_resources_cap() {
1471 return Err(super::Error::UnsupportedCapability {
1472 capability: "resources",
1473 });
1474 }
1475 // Mark the connection as deliberately used (same reasoning as
1476 // `call_tool` — see the drop-time orphan-DELETE gate).
1477 self.any_calls.store(true, Ordering::Relaxed);
1478 self.rpc(
1479 "resources/read",
1480 &super::resource::ReadResourceRequestParams {
1481 uri: uri.to_string(),
1482 },
1483 true,
1484 )
1485 .await
1486 }
1487
1488 /// Re-fetches all tools from the server, replacing the cached list.
1489 ///
1490 /// Optionally fires `on_change` *after* the write lock is acquired but
1491 /// *before* the network paginate begins, so the callback observes the
1492 /// "list change is in flight" edge — readers blocked on the read lock
1493 /// won't return until the new list lands. The proxy uses this to
1494 /// re-emit `notifications/tools/list_changed` to its downstream client
1495 /// at the moment the staleness window opens.
1496 async fn refresh_tools(&self, on_change: Option<ListChangedCallback>) {
1497 if !self.has_tools_cap() {
1498 // No tools capability — install an empty Vec so the cache
1499 // contract holds (`list_tools`'s `.expect("refresh_tools
1500 // installs Some")` etc.) and return without paginating or
1501 // signalling. No `notify_waiters` and no `on_change` —
1502 // nothing real changed.
1503 let mut guard = self.tools.write().await;
1504 *guard = Some(Ok(Arc::new(Vec::new())));
1505 return;
1506 }
1507 // Listener-driven refresh. Visibility contract: any caller
1508 // that issues `list_tools()` after a `tools/list_changed`
1509 // notification has been observed must see the post-swap
1510 // value, not stale data — so the write lock has to gate
1511 // readers across the upstream paginate.
1512 //
1513 // Performance contract: don't serialise paginate *behind*
1514 // lock-acquisition latency. We start `tools.write()` and the
1515 // upstream paginate **concurrently** with `tokio::join!`. The
1516 // write-lock acquire blocks new `list_tools()` readers
1517 // immediately (preserving visibility) and runs in parallel
1518 // with whatever drain time the in-flight readers need; the
1519 // paginate runs alongside. Total wall-clock is
1520 // `max(drain_time, paginate_time)` instead of the sum.
1521 //
1522 // `notify_waiters` and `on_change` fire under the write
1523 // guard, *after* `*guard = result`, so anyone awoken by them
1524 // queues on the read lock, waits for the guard to drop, and
1525 // observes the post-swap state.
1526 let (mut guard, result) =
1527 tokio::join!(self.tools.write(), self.paginate_tools(),);
1528 *guard = Some(result);
1529 self.tools_changed.notify_waiters();
1530 if let Some(cb) = on_change {
1531 cb();
1532 }
1533 }
1534
1535 /// Page-by-page fetch of the upstream tool list, no locks held.
1536 /// Shared between the `_signaling` (initial-populate, holds lock
1537 /// for the original "block fast readers" contract) and `refresh_*`
1538 /// (listener-driven, lock-only-around-install) variants.
1539 async fn paginate_tools(
1540 &self,
1541 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1542 let mut all_tools = Vec::new();
1543 let mut cursor: Option<String> = None;
1544 loop {
1545 match self.rpc_list_tools(cursor.as_deref()).await {
1546 Ok(page) => {
1547 all_tools.extend(page.tools);
1548 cursor = page.next_cursor;
1549 if cursor.is_none() {
1550 return Ok(Arc::new(all_tools));
1551 }
1552 }
1553 Err(e) => return Err(Arc::new(e)),
1554 }
1555 }
1556 }
1557
1558 /// Same as [`Self::refresh_tools`] but fires `lock_held` once the
1559 /// write lock has been acquired so the caller can synchronise on
1560 /// "writer is in possession of the cache" before returning. Used by
1561 /// `ConnectionInner::new` to prevent a fast reader from acquiring
1562 /// the read lock before this writer has even started.
1563 ///
1564 /// **Caller invariant:** the spawn site in `ConnectionInner::new`
1565 /// must gate this call on `capabilities.tools.is_some()`. This
1566 /// method assumes the tools capability is present and issues
1567 /// `tools/list` RPCs unconditionally — running it against a
1568 /// no-tools server triggers the 15-min idempotent-backoff storm.
1569 async fn refresh_tools_signaling(
1570 &self,
1571 lock_held: tokio::sync::oneshot::Sender<()>,
1572 on_change: Option<ListChangedCallback>,
1573 ) {
1574 let mut guard = self.tools.write().await;
1575 // Fire `tools_changed` while we hold the write lock and *before*
1576 // installing the new list. Any subscriber woken now must take a
1577 // read lock to observe the result, and that read lock is queued
1578 // behind this write guard — so they always see the post-swap
1579 // state, never mid-swap.
1580 self.tools_changed.notify_waiters();
1581 let _ = lock_held.send(());
1582 if let Some(cb) = on_change {
1583 cb();
1584 }
1585 let mut all_tools = Vec::new();
1586 let mut cursor: Option<String> = None;
1587 let result = loop {
1588 match self.rpc_list_tools(cursor.as_deref()).await {
1589 Ok(page) => {
1590 all_tools.extend(page.tools);
1591 cursor = page.next_cursor;
1592 if cursor.is_none() {
1593 break Ok(Arc::new(all_tools));
1594 }
1595 }
1596 Err(e) => break Err(Arc::new(e)),
1597 }
1598 };
1599 *guard = Some(result);
1600 }
1601
1602 /// Re-fetches all resources from the server, replacing the cached list.
1603 /// See [`ConnectionInner::refresh_tools`] for the callback timing
1604 /// contract.
1605 async fn refresh_resources(&self, on_change: Option<ListChangedCallback>) {
1606 if !self.has_resources_cap() {
1607 // Symmetric to `refresh_tools` — see that gate.
1608 let mut guard = self.resources.write().await;
1609 *guard = Some(Ok(Arc::new(Vec::new())));
1610 return;
1611 }
1612 // Same paginate-while-acquiring-the-write-lock pattern as
1613 // `refresh_tools` — see that comment for the visibility +
1614 // performance rationale.
1615 let (mut guard, result) =
1616 tokio::join!(self.resources.write(), self.paginate_resources(),);
1617 *guard = Some(result);
1618 self.resources_changed.notify_waiters();
1619 if let Some(cb) = on_change {
1620 cb();
1621 }
1622 }
1623
1624 async fn paginate_resources(
1625 &self,
1626 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1627 let mut all_resources = Vec::new();
1628 let mut cursor: Option<String> = None;
1629 loop {
1630 match self.rpc_list_resources(cursor.as_deref()).await {
1631 Ok(page) => {
1632 all_resources.extend(page.resources);
1633 cursor = page.next_cursor;
1634 if cursor.is_none() {
1635 return Ok(Arc::new(all_resources));
1636 }
1637 }
1638 Err(e) => return Err(Arc::new(e)),
1639 }
1640 }
1641 }
1642
1643 /// Resource counterpart of [`Self::refresh_tools_signaling`]. The
1644 /// same spawn-site-gate invariant applies: the caller must gate
1645 /// the spawn on `capabilities.resources.is_some()`.
1646 async fn refresh_resources_signaling(
1647 &self,
1648 lock_held: tokio::sync::oneshot::Sender<()>,
1649 on_change: Option<ListChangedCallback>,
1650 ) {
1651 let mut guard = self.resources.write().await;
1652 // See `refresh_tools_signaling` — fire under the write lock,
1653 // before install, so subscribers' next read sees the post-swap
1654 // state.
1655 self.resources_changed.notify_waiters();
1656 let _ = lock_held.send(());
1657 if let Some(cb) = on_change {
1658 cb();
1659 }
1660 let mut all_resources = Vec::new();
1661 let mut cursor: Option<String> = None;
1662 let result = loop {
1663 match self.rpc_list_resources(cursor.as_deref()).await {
1664 Ok(page) => {
1665 all_resources.extend(page.resources);
1666 cursor = page.next_cursor;
1667 if cursor.is_none() {
1668 break Ok(Arc::new(all_resources));
1669 }
1670 }
1671 Err(e) => break Err(Arc::new(e)),
1672 }
1673 };
1674 *guard = Some(result);
1675 }
1676
1677 /// Builds a GET request to the MCP endpoint for receiving server
1678 /// notifications via SSE.
1679 async fn get(&self) -> reqwest::RequestBuilder {
1680 self.http_client
1681 .get(&self.url)
1682 .headers(
1683 self.build_request_headers(None, Some("text/event-stream"))
1684 .await,
1685 )
1686 }
1687
1688 /// Listens for `notifications/tools/list_changed` and
1689 /// `notifications/resources/list_changed` on an SSE stream. On each
1690 /// notification, write-locks and re-fetches the full list.
1691 ///
1692 /// `initial_lines` is the pre-opened SSE line reader handed in by
1693 /// [`Client::connect`](super::Client::connect) — that stream is
1694 /// consumed first. When it ends (or any later GET reconnect ends),
1695 /// we sleep `backoff_initial_interval` and open a fresh GET `/` SSE
1696 /// stream.
1697 ///
1698 /// Takes a [`Weak<Self>`] (not `Arc<Self>`) so the spawned task
1699 /// doesn't itself keep the [`Connection`] alive, and a
1700 /// [`CancellationToken`] sibling clone of the connection's
1701 /// [`DropGuard`] so the task tears down the instant the last
1702 /// external `Arc<ConnectionInner>` clone is dropped — every
1703 /// blocking await (line read, reconnect send, backoff sleep) is
1704 /// raced against `cancel.cancelled()` and exits without any zombie
1705 /// retries against a now-dead session.
1706 async fn listen_for_list_changes(
1707 weak: Weak<Self>,
1708 cancel: CancellationToken,
1709 initial_lines: super::LinesStream,
1710 ) {
1711 // First iteration: use the pre-opened SSE stream the client
1712 // handed us. After that, fall back to opening fresh GET / SSE
1713 // streams as the upstream connection cycles.
1714 let mut next_lines: Option<super::LinesStream> = Some(initial_lines);
1715 // One-shot guard for the catch-up refresh: false on the very
1716 // first iteration (the caller's pre-opened SSE stream — its
1717 // associated cache was just populated by `Client::connect`'s
1718 // initial pagination, so re-fetching there would just be a
1719 // wasted round-trip), true thereafter. Every stream end —
1720 // whether `Ok(None)` (clean close) or `Err(_)` (read failure)
1721 // — drops back here, which we treat as an implicit
1722 // list-changed notification: the upstream's broadcast (in
1723 // particular the proxy's per-session `tokio::broadcast`) is
1724 // lossy for moments when this listener has zero active
1725 // subscribers, so anything that fired during our disconnect
1726 // window may have been dropped.
1727 //
1728 // ORDER MATTERS. The refresh must run AFTER we've re-opened
1729 // the GET / SSE stream — i.e. after we're a subscriber again
1730 // — and BEFORE we enter the inner read loop. If we refreshed
1731 // before the resubscribe, a notification that fired between
1732 // our refresh-completion and our subscribe would be lost the
1733 // same way as the original disconnect-window drops; doing it
1734 // after means a notification fired DURING the refresh lands
1735 // in the new subscriber's buffer (broadcast::Sender::send
1736 // backs onto each receiver's channel-capacity slot) and gets
1737 // consumed by the inner loop on its next read.
1738 let mut is_reconnect = false;
1739
1740 loop {
1741 // The token cancels deterministically when the last
1742 // `Arc<ConnectionInner>` clone is dropped (see
1743 // `_listener_cancel_guard`). Check once per outer
1744 // iteration, but the real protection is the cancel arms in
1745 // every blocking await below — those exit immediately on
1746 // cancel.
1747 if cancel.is_cancelled() {
1748 return;
1749 }
1750 let Some(this) = weak.upgrade() else { return };
1751 let backoff_delay = this.backoff_initial_interval;
1752
1753 let mut lines = match next_lines.take() {
1754 Some(l) => l,
1755 None => {
1756 // Race the upstream GET against cancellation — if
1757 // the connection drops mid-reconnect, exit
1758 // immediately rather than waiting for the request
1759 // to complete or time out (otherwise produces a
1760 // burst of 401 retries against a now-dead session
1761 // under heavy churn).
1762 let send_outcome = tokio::select! {
1763 out = async {
1764 this.get().await.send().await
1765 } => out,
1766 _ = cancel.cancelled() => {
1767 drop(this);
1768 return;
1769 }
1770 };
1771 let response = match send_outcome {
1772 Ok(r) if r.status().is_success() => r,
1773 _ => {
1774 drop(this);
1775 // Sleep with cancel-arm: instant exit on
1776 // drop, no zombie retries.
1777 tokio::select! {
1778 _ = tokio::time::sleep(backoff_delay) => {}
1779 _ = cancel.cancelled() => return,
1780 }
1781 continue;
1782 }
1783 };
1784 super::lines_from_response(response)
1785 }
1786 };
1787
1788 // Catch-up refresh on every reconnect — the implicit
1789 // list-changed treatment for the just-failed stream. See
1790 // the `is_reconnect` doc-comment above for the
1791 // refresh-AFTER-resubscribe rationale.
1792 if is_reconnect {
1793 // tools and resources are independent locks; run the
1794 // catch-up refreshes concurrently so disconnect
1795 // recovery isn't sequential.
1796 let _ = tokio::join!(
1797 this.refresh_tools(this.on_tools_list_changed.get()),
1798 this.refresh_resources(
1799 this.on_resources_list_changed.get()
1800 ),
1801 );
1802 }
1803 is_reconnect = true;
1804
1805 'inner: loop {
1806 tokio::select! {
1807 line_result = lines.next_line() => {
1808 match line_result {
1809 Ok(Some(line)) => {
1810 // SSE data lines start with "data: ".
1811 let Some(data) = line.strip_prefix("data: ") else {
1812 continue 'inner;
1813 };
1814 let method = match serde_json::from_str::<super::JsonRpcNotification>(data) {
1815 Ok(n) => n.method,
1816 Err(_) => continue 'inner,
1817 };
1818 match method.as_str() {
1819 "notifications/tools/list_changed" => {
1820 // refresh_tools fires the
1821 // callback after the cache is
1822 // installed, so the proxy's
1823 // downstream
1824 // notifications/tools/list_changed
1825 // emission lines up with the
1826 // staleness window opening.
1827 this.refresh_tools(
1828 this.on_tools_list_changed.get(),
1829 )
1830 .await;
1831 }
1832 "notifications/resources/list_changed" => {
1833 this.refresh_resources(
1834 this.on_resources_list_changed.get(),
1835 )
1836 .await;
1837 }
1838 _ => {}
1839 }
1840 }
1841 // Stream ended cleanly or errored — break out
1842 // to the outer loop so we either reconnect or,
1843 // if everyone's gone, exit at the top.
1844 _ => break 'inner,
1845 }
1846 }
1847 // Cancellation: the connection's last clone has
1848 // dropped. Tear down immediately.
1849 _ = cancel.cancelled() => {
1850 drop(this);
1851 return;
1852 }
1853 }
1854 }
1855
1856 // Stream dropped — empty the per-Connection caches so the
1857 // next `list_tools` / `list_resources` paginates inline
1858 // against the (possibly still-dead) upstream rather than
1859 // returning whatever was cached before the drop. The
1860 // `is_reconnect` catch-up at the top of the next outer
1861 // iteration will repopulate `Some(_)` if the reconnect
1862 // succeeds; if the reconnect keeps failing, the cache
1863 // stays `None` and `list_*` callers paginate themselves.
1864 *this.tools.write().await = None;
1865 *this.resources.write().await = None;
1866
1867 // Stream ended — drop the strong ref before sleeping so the
1868 // next iteration's weak-upgrade can detect liveness honestly.
1869 drop(this);
1870 tokio::select! {
1871 _ = tokio::time::sleep(backoff_delay) => {}
1872 _ = cancel.cancelled() => return,
1873 }
1874 }
1875 }
1876}
1877
1878impl Drop for ConnectionInner {
1879 /// Orphan-DELETE hook: when a **freshly-minted** connection (not a
1880 /// resume) is dropped without any deliberate use — no `call_tool`,
1881 /// no `read_resource`, no explicit `Connection::delete` — spawn a
1882 /// fire-and-forget HTTP DELETE so the upstream session we just
1883 /// opened doesn't sit there accruing per-session state for nothing.
1884 ///
1885 /// Reconnect-resumes are deliberately excluded: a reconnect's
1886 /// `any_calls == false` only means *this* `Connection` instance
1887 /// never called anything — the underlying upstream session may
1888 /// well have been used by an earlier `Connection` that opened it,
1889 /// did real work, and let us re-attach. Tearing it down here would
1890 /// kill a still-live session out from under whoever owns it.
1891 /// Reconnects rely on the proxy's explicit `Connection::delete`
1892 /// (or `Client::delete`) for upstream cleanup instead.
1893 ///
1894 /// Skip conditions (any one triggers a no-op):
1895 ///
1896 /// - `mock` is `true` — there was never an HTTP session to begin with.
1897 /// - `is_reconnect` is `true` — see above; the upstream session
1898 /// pre-existed this connection and isn't ours to tear down on drop.
1899 /// - `any_calls` is `true` — the connection was deliberately used,
1900 /// or an explicit `Connection::delete` already ran.
1901 /// - No tokio runtime is in scope — `tokio::spawn` would panic.
1902 /// Silently leak the upstream session in this case (sync
1903 /// teardown paths e.g. `cfg(test)` blocks not driven by tokio).
1904 ///
1905 /// The listener-cancel `DropGuard` inside `_listener_cancel_guard`
1906 /// fires automatically as part of this same `drop` call, so by the
1907 /// time the orphan DELETE goes out the listener task has already
1908 /// been told to cancel — no SSE/GET race with the upstream DELETE.
1909 fn drop(&mut self) {
1910 if self.is_reconnect {
1911 return;
1912 }
1913 if self.any_calls.load(Ordering::Relaxed) {
1914 return;
1915 }
1916
1917 // Clone out the bits the orphan task needs. None of these are
1918 // big: `reqwest::Client` is itself an `Arc` bump,
1919 // `IndexMap<String, String>` is the per-connection header bag
1920 // (small), and the `String`s are the per-session id + URL.
1921 let http_client = self.http_client.clone();
1922 let url = self.url.clone();
1923 let session_id = self.session_id.clone();
1924 let headers = self.headers.clone();
1925 let timeout = self.call_timeout;
1926
1927 // Spawn only if a tokio runtime is in scope. `tokio::spawn`
1928 // panics outside one — sync teardown paths (e.g. a test that
1929 // builds a `Connection` and lets it drop on a non-async stack)
1930 // would crash without this guard.
1931 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1932 handle.spawn(orphan_delete(
1933 http_client,
1934 url,
1935 session_id,
1936 headers,
1937 timeout,
1938 ));
1939 }
1940 }
1941}
1942
1943/// Fire-and-forget HTTP `DELETE` used by [`ConnectionInner::drop`] to
1944/// release a resumed upstream session that was never used. Mirrors
1945/// [`super::Connection::delete`]'s wire shape (same `Mcp-Session-Id`
1946/// header behavior, same header-loop with the explicit session id
1947/// winning over any `Mcp-Session-Id` entry in `headers`) but never
1948/// surfaces errors — there's no caller left to surface them to. The
1949/// `timeout` (sourced from the originating connection's `call_timeout`)
1950/// caps the request so a hanging upstream can't keep the spawned task
1951/// alive forever.
1952async fn orphan_delete(
1953 http_client: reqwest::Client,
1954 url: String,
1955 session_id: String,
1956 headers: IndexMap<String, String>,
1957 timeout: Duration,
1958) {
1959 let mut request = http_client
1960 .delete(&url)
1961 .timeout(timeout)
1962 .header("Mcp-Session-Id", &session_id);
1963 for (name, value) in &headers {
1964 if name.eq_ignore_ascii_case("Mcp-Session-Id") {
1965 continue;
1966 }
1967 request = request.header(name, value);
1968 }
1969 // Errors silently swallowed: no caller, and the listener-cancel
1970 // guard has already fired (it runs as part of the regular
1971 // `_listener_cancel_guard` drop inside the same `drop` call).
1972 let _ = request.send().await;
1973}
1974
/// Tests for the capability gates: each public entry point must honour
/// the capabilities the server declared (or failed to declare) without
/// consulting the cache or the network first.
#[cfg(test)]
mod capability_gate_tests {
    use super::*;
    use crate::mcp::initialize_result::{
        ResourcesCapability, ServerCapabilities, ToolsCapability,
    };
    use crate::mcp::tool::{
        CallToolRequestParams, Tool, ToolSchemaObject, ToolSchemaType,
    };

    /// Builds a `ServerCapabilities` with the given `tools` / `resources`
    /// shapes and every other capability set to `None`. Each gate test
    /// passes its own combination to exercise a specific cap-absent
    /// branch.
    fn caps(
        tools: Option<ToolsCapability>,
        resources: Option<ResourcesCapability>,
    ) -> ServerCapabilities {
        ServerCapabilities {
            experimental: None,
            logging: None,
            completions: None,
            prompts: None,
            resources,
            tools,
            tasks: None,
        }
    }

    /// Minimal `Tool` fixture: only `name` varies, every optional field
    /// is `None`, and the input schema is an empty object schema.
    fn tool(name: &str) -> Tool {
        Tool {
            name: name.to_string(),
            title: None,
            description: None,
            icons: None,
            input_schema: ToolSchemaObject {
                r#type: ToolSchemaType::Object,
                properties: None,
                required: None,
                extra: IndexMap::new(),
            },
            output_schema: None,
            annotations: None,
            execution: None,
            _meta: None,
        }
    }

    /// 3.1 — `list_tools` short-circuits to `Ok(empty)` when the server
    /// declared no `tools` capability, *before* the cache is consulted.
    /// We poison the cache with `Err` to prove the gate fires first.
    #[tokio::test]
    async fn list_tools_returns_empty_when_tools_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        let err = super::super::Error::NoSessionId {
            url: "http://x".into(),
            body: String::new(),
        };
        *conn.inner.tools.write().await = Some(Err(Arc::new(err)));

        let got = conn.list_tools().await.unwrap();
        assert!(got.is_empty());
    }

    /// 3.2 — symmetric to 3.1 for `list_resources`.
    #[tokio::test]
    async fn list_resources_returns_empty_when_resources_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        let err = super::super::Error::NoSessionId {
            url: "http://x".into(),
            body: String::new(),
        };
        *conn.inner.resources.write().await = Some(Err(Arc::new(err)));

        let got = conn.list_resources().await.unwrap();
        assert!(got.is_empty());
    }

    /// 3.3 — `read_resource` errors with `UnsupportedCapability` when
    /// the server declared no `resources` capability, without hitting
    /// the network.
    #[tokio::test]
    async fn read_resource_errors_when_resources_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        let got = conn.read_resource("file://nope").await;
        assert!(matches!(
            got,
            Err(super::super::Error::UnsupportedCapability {
                capability: "resources"
            })
        ));
    }

    /// 3.4 — `call_tool` errors with `UnsupportedCapability` when the
    /// server declared no `tools` capability.
    #[tokio::test]
    async fn call_tool_errors_when_tools_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        let params = CallToolRequestParams {
            name: "any".into(),
            arguments: None,
            _meta: None,
            task: None,
        };
        // Pass the params by reference.
        let got = conn.call_tool(&params).await;
        assert!(matches!(
            got,
            Err(super::super::Error::UnsupportedCapability {
                capability: "tools"
            })
        ));
    }

    /// 3.5 — `refresh_tools` installs `Some(Ok(empty))` and returns
    /// without paginating when the server declared no `tools`
    /// capability. Clearing the cache first proves the install ran.
    #[tokio::test]
    async fn refresh_tools_installs_empty_when_tools_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        *conn.inner.tools.write().await = None;

        conn.inner.refresh_tools(None).await;

        let guard = conn.inner.tools.read().await;
        let v = guard
            .as_ref()
            .expect("refresh installed Some")
            .as_ref()
            .expect("refresh installed Ok");
        assert!(v.is_empty());
    }

    /// 3.6 — symmetric to 3.5 for `refresh_resources`.
    #[tokio::test]
    async fn refresh_resources_installs_empty_when_resources_cap_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(None, None),
        );
        *conn.inner.resources.write().await = None;

        conn.inner.refresh_resources(None).await;

        let guard = conn.inner.resources.read().await;
        let v = guard
            .as_ref()
            .expect("refresh installed Some")
            .as_ref()
            .expect("refresh installed Ok");
        assert!(v.is_empty());
    }

    /// 3.7 — `subscribe_tools` returns immediately (no wait-for-notify)
    /// when the server declared `tools` but not
    /// `tools.list_changed: Some(true)`. We populate the cache with a
    /// non-empty list, pass a long timeout, and expect a fast return
    /// with the cache contents.
    #[tokio::test]
    async fn subscribe_tools_short_circuits_when_list_changed_absent() {
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(
                Some(ToolsCapability { list_changed: None }),
                None,
            ),
        );
        *conn.inner.tools.write().await =
            Some(Ok(Arc::new(vec![tool("a")])));

        let start = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[tool("a")], Duration::from_secs(5))
            .await
            .unwrap();
        // Sample the clock once so the assertion and its message agree.
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_millis(100),
            "elapsed: {elapsed:?}"
        );
        assert_eq!(got.as_slice(), &[tool("a")]);
    }

    /// 3.8 — symmetric to 3.7 for `subscribe_resources`.
    #[tokio::test]
    async fn subscribe_resources_short_circuits_when_list_changed_absent() {
        use crate::mcp::resource::Resource;
        let conn = Connection::new_for_test_with_caps(
            "t".into(),
            "http://x".into(),
            caps(
                None,
                Some(ResourcesCapability {
                    subscribe: None,
                    list_changed: None,
                }),
            ),
        );
        let res = Resource {
            uri: "file://a".into(),
            name: "a".into(),
            title: None,
            description: None,
            mime_type: None,
            annotations: None,
            icons: None,
            _meta: None,
        };
        *conn.inner.resources.write().await =
            Some(Ok(Arc::new(vec![res.clone()])));

        let start = std::time::Instant::now();
        let got = conn
            .subscribe_resources(&[res.clone()], Duration::from_secs(5))
            .await
            .unwrap();
        // Sample the clock once so the assertion and its message agree.
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_millis(100),
            "elapsed: {elapsed:?}"
        );
        assert_eq!(got.as_slice(), &[res]);
    }
}
2220
/// Tests for `subscribe_tools`: immediate return when the cache already
/// differs from the caller's snapshot, wake-up on change, and timeout
/// behaviour.
#[cfg(test)]
mod subscribe_tests {
    use super::*;
    use crate::mcp::tool::{Tool, ToolSchemaObject, ToolSchemaType};

    /// Minimal `Tool` fixture: only `name` varies, every optional field
    /// is `None`, and the input schema is an empty object schema.
    fn tool(name: &str) -> Tool {
        Tool {
            name: name.to_string(),
            title: None,
            description: None,
            icons: None,
            input_schema: ToolSchemaObject {
                r#type: ToolSchemaType::Object,
                properties: None,
                required: None,
                extra: IndexMap::new(),
            },
            output_schema: None,
            annotations: None,
            execution: None,
            _meta: None,
        }
    }

    /// First read shows a different list — return immediately, never wait.
    #[tokio::test]
    async fn subscribe_tools_returns_immediately_when_cache_differs() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Some(Ok(Arc::new(vec![tool("a")])));

        let start = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[tool("b")], Duration::from_secs(5))
            .await
            .unwrap();
        assert!(start.elapsed() < Duration::from_millis(100));
        assert_eq!(got.as_slice(), &[tool("a")]);
    }

    /// Cached `Err` is treated as "different from any caller snapshot."
    #[tokio::test]
    async fn subscribe_tools_returns_err_immediately() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        let err = super::super::Error::NoSessionId {
            url: "http://x".into(),
            body: String::new(),
        };
        *conn.inner.tools.write().await = Some(Err(Arc::new(err)));

        let start = std::time::Instant::now();
        let got = conn.subscribe_tools(&[], Duration::from_secs(5)).await;
        assert!(start.elapsed() < Duration::from_millis(100));
        assert!(got.is_err());
    }

    /// Cache equals snapshot, then a writer fires the notify under the
    /// write lock and installs a new list. The subscriber wakes, then its
    /// re-read blocks behind the writer's guard, observes the new list.
    #[tokio::test]
    async fn subscribe_tools_wakes_on_change_and_reads_post_swap() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Some(Ok(Arc::new(vec![tool("a")])));

        let conn_for_subscriber = conn.clone();
        let subscriber = tokio::spawn(async move {
            conn_for_subscriber
                .subscribe_tools(&[tool("a")], Duration::from_secs(5))
                .await
                .unwrap()
        });

        // Give the subscriber a moment to arm `notified()` and finish
        // its first read so it's parked on the timeout.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Mimic the refresh path's ordering: take the write lock, fire
        // `tools_changed` *while holding* the write lock, then install
        // the new value before releasing.
        {
            let mut guard = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            // Hold briefly to make absolutely sure the subscriber is
            // racing the read lock against our drop.
            tokio::time::sleep(Duration::from_millis(20)).await;
            *guard = Some(Ok(Arc::new(vec![tool("b")])));
        }

        // The 2 s bound is well under the subscriber's 5 s timeout, so a
        // pass proves the notify woke it rather than the timeout.
        let got = tokio::time::timeout(Duration::from_secs(2), subscriber)
            .await
            .expect("subscriber returned in time")
            .expect("subscriber didn't panic");
        assert_eq!(got.as_slice(), &[tool("b")]);
    }

    /// Cache equals snapshot, no notification arrives — timeout, return
    /// the still-equal list (not an error).
    #[tokio::test]
    async fn subscribe_tools_times_out_and_returns_unchanged_list() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Some(Ok(Arc::new(vec![tool("a")])));

        let start = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[tool("a")], Duration::from_millis(50))
            .await
            .unwrap();
        let elapsed = start.elapsed();
        assert!(elapsed >= Duration::from_millis(40), "elapsed: {elapsed:?}");
        assert!(elapsed < Duration::from_millis(500), "elapsed: {elapsed:?}");
        assert_eq!(got.as_slice(), &[tool("a")]);
    }

    /// Two concurrent subscribers both wake on a single notify_waiters
    /// and both observe the post-swap list.
    #[tokio::test]
    async fn subscribe_tools_supports_concurrent_subscribers() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        *conn.inner.tools.write().await = Some(Ok(Arc::new(vec![tool("a")])));

        let c1 = conn.clone();
        let c2 = conn.clone();
        let s1 = tokio::spawn(async move {
            c1.subscribe_tools(&[tool("a")], Duration::from_secs(5))
                .await
                .unwrap()
        });
        let s2 = tokio::spawn(async move {
            c2.subscribe_tools(&[tool("a")], Duration::from_secs(5))
                .await
                .unwrap()
        });

        // Let both subscribers park before the writer runs.
        tokio::time::sleep(Duration::from_millis(50)).await;

        {
            let mut guard = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            *guard = Some(Ok(Arc::new(vec![tool("c")])));
        }

        // Bound the join well below the subscribers' 5 s timeout:
        // without this, a lost wake-up would not be caught promptly —
        // the subscribers would simply return when their own timeout
        // expired.
        let (r1, r2) = tokio::time::timeout(Duration::from_secs(2), async {
            tokio::join!(s1, s2)
        })
        .await
        .expect("both subscribers returned in time");
        let r1 = r1.unwrap();
        let r2 = r2.unwrap();
        assert_eq!(r1.as_slice(), &[tool("c")]);
        assert_eq!(r2.as_slice(), &[tool("c")]);
    }
}
2369
/// Tests for `drain_notifications` against a wiremock stand-in for the
/// proxy's `GET /notify` endpoint.
#[cfg(test)]
mod drain_notifications_tests {
    use super::*;
    use crate::mcp::tool::{ContentBlock, TextContent};
    use serde_json::json;
    use wiremock::matchers::{header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Starts a mock server whose `GET /notify` always answers with
    /// `response`.
    async fn notify_server(response: ResponseTemplate) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/notify"))
            .respond_with(response)
            .mount(&server)
            .await;
        server
    }

    /// Asserts that `block` is a text block carrying exactly `expected`.
    fn expect_text(block: &ContentBlock, expected: &str) {
        match block {
            ContentBlock::Text(TextContent { text, .. }) => {
                assert_eq!(text, expected)
            }
            other => panic!("expected text, got {other:?}"),
        }
    }

    /// Happy path: the proxy answers with two text items and we get back
    /// two `ContentBlock::Text` values in the same order. This mock also
    /// requires the `Mcp-Session-Id` header to be present with an empty
    /// value.
    #[tokio::test]
    async fn drain_notifications_parses_text_blocks_in_order() {
        let body = json!([
            {"type": "text", "text": "first"},
            {"type": "text", "text": "second"},
        ]);
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/notify"))
            .and(header("Mcp-Session-Id", ""))
            .respond_with(ResponseTemplate::new(200).set_body_json(body))
            .mount(&server)
            .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("drain ok");
        assert_eq!(blocks.len(), 2);
        expect_text(&blocks[0], "first");
        expect_text(&blocks[1], "second");
    }

    /// A 404 (the proxy no longer knows the session, e.g. after a
    /// restart) yields an empty vec instead of an error; surfacing the
    /// lost session is left to whichever upstream call comes next.
    #[tokio::test]
    async fn drain_notifications_404_returns_empty() {
        let server = notify_server(ResponseTemplate::new(404)).await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("404 → ok(empty)");
        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
    }

    /// An empty JSON array from the proxy maps to an empty vec.
    #[tokio::test]
    async fn drain_notifications_empty_queue_returns_empty() {
        let server =
            notify_server(ResponseTemplate::new(200).set_body_json(json!([])))
                .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("drain ok");
        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
    }

    /// Any other non-success status is reported as `BadStatus`, carrying
    /// the status code and the response body.
    #[tokio::test]
    async fn drain_notifications_5xx_returns_bad_status() {
        let server =
            notify_server(ResponseTemplate::new(500).set_body_string("boom"))
                .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let err = conn.drain_notifications().await.expect_err("5xx → err");
        match err {
            super::super::Error::BadStatus { code, body, .. } => {
                assert_eq!(code.as_u16(), 500);
                assert_eq!(body, "boom");
            }
            other => panic!("expected BadStatus, got {other:?}"),
        }
    }
}
2465
/// Tests for `has_pending_notifications` against a wiremock stand-in for
/// the proxy's `GET /notify/queued` endpoint.
#[cfg(test)]
mod has_pending_notifications_tests {
    use super::*;
    use serde_json::json;
    use wiremock::matchers::{header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Starts a mock server whose `GET /notify/queued` always answers
    /// with `response`.
    async fn queued_server(response: ResponseTemplate) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/notify/queued"))
            .respond_with(response)
            .mount(&server)
            .await;
        server
    }

    /// Happy path: a JSON `true` body becomes `Ok(true)`. This mock also
    /// requires the `Mcp-Session-Id` header to be present with an empty
    /// value.
    #[tokio::test]
    async fn has_pending_notifications_true() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/notify/queued"))
            .and(header("Mcp-Session-Id", ""))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!(true)))
            .mount(&server)
            .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let pending = conn.has_pending_notifications().await.expect("peek ok");
        assert!(pending);
    }

    /// A JSON `false` body becomes `Ok(false)`.
    #[tokio::test]
    async fn has_pending_notifications_false() {
        let server = queued_server(
            ResponseTemplate::new(200).set_body_json(json!(false)),
        )
        .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let pending = conn.has_pending_notifications().await.expect("peek ok");
        assert!(!pending);
    }

    /// A 404 (the proxy no longer knows the session) is softened to
    /// `Ok(false)`, so a peek never fails a request over a
    /// missing-session race.
    #[tokio::test]
    async fn has_pending_notifications_404_returns_false() {
        let server = queued_server(ResponseTemplate::new(404)).await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let pending = conn
            .has_pending_notifications()
            .await
            .expect("404 → ok(false)");
        assert!(!pending);
    }

    /// Any other non-success status is reported as `BadStatus`, carrying
    /// the status code and the response body.
    #[tokio::test]
    async fn has_pending_notifications_5xx_returns_bad_status() {
        let server =
            queued_server(ResponseTemplate::new(500).set_body_string("boom"))
                .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let err = conn
            .has_pending_notifications()
            .await
            .expect_err("5xx → err");
        match err {
            super::super::Error::BadStatus { code, body, .. } => {
                assert_eq!(code.as_u16(), 500);
                assert_eq!(body, "boom");
            }
            other => panic!("expected BadStatus, got {other:?}"),
        }
    }
}
2550}