objectiveai_sdk/mcp/connection.rs
1//! MCP connection for communicating with an MCP server.
2//!
3//! [`Connection`] is a cheaply-clonable handle around an internal
4//! [`ConnectionInner`]. The last drop of the inner `Arc` runs
5//! [`ConnectionInner`]'s `Drop`, which cancels the listener task's
6//! [`tokio_util::sync::CancellationToken`] (held in `_listener_cancel_guard`
7//! as a [`tokio_util::sync::DropGuard`]) — the SSE listener exits the
8//! instant any in-flight reconnect, sleep, or read is cancelled, with no
9//! zombie 401 retries against a now-dead proxy session.
10
11use std::ops::Deref;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::{Arc, RwLock as StdRwLock, Weak};
14use std::time::Duration;
15
16use indexmap::IndexMap;
17use tokio::sync::{Notify, RwLock};
18use tokio_util::sync::{CancellationToken, DropGuard};
19
/// Callback fired by [`Connection`] when the upstream MCP server emits
/// `notifications/tools/list_changed` or `notifications/resources/list_changed`.
///
/// **Timing:** runs after the corresponding cache's write lock is taken
/// but *before* the network paginate that replaces it. That ordering
/// matches the moment the staleness window opens — anyone blocked on the
/// read lock won't return until the new list lands. The callback should
/// not call back into `list_tools` / `list_resources`: doing so would
/// re-take the lock the listener already holds and deadlock.
///
/// Stored behind an `Arc` so the listener task can cheaply clone it out
/// of the lock and call it without holding the read guard.
pub type ListChangedCallback = Arc<dyn Fn() + Send + Sync + 'static>;

/// A registered-or-not callback slot. Wrapper so [`ConnectionInner`] can
/// keep `#[derive(Debug)]` (a raw `dyn Fn` isn't `Debug`).
///
/// The lock guards a plain `Option<Arc<_>>`, which can never be observed
/// half-written, so a poisoned lock carries no torn state. Every accessor
/// therefore recovers the guard from a poison error instead of panicking:
/// one panic elsewhere must not turn each later `set` / `get` into a
/// panic of its own.
struct CallbackSlot(StdRwLock<Option<ListChangedCallback>>);

impl CallbackSlot {
    /// Creates an empty slot (no callback registered).
    fn new() -> Self {
        Self(StdRwLock::new(None))
    }

    /// Registers `callback`, replacing any previously registered one.
    fn set(&self, callback: ListChangedCallback) {
        // The write guard is a temporary that dies at the end of this
        // statement, so the displaced callback is dropped *after* the
        // lock is released. A captured value whose `Drop` panics can
        // then no longer poison the slot.
        let previous = self
            .0
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .replace(callback);
        drop(previous);
    }

    /// Cheap clone-out of the current callback (if any). The `Arc` clone
    /// lets us release the read guard before invoking the callback.
    fn get(&self) -> Option<ListChangedCallback> {
        self.0
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }
}

impl std::fmt::Debug for CallbackSlot {
    /// Prints only whether a callback is registered; the closure itself
    /// has no `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Report the real state even when the lock is poisoned, matching
        // what `get` would return.
        let set = self
            .0
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .is_some();
        f.debug_struct("CallbackSlot").field("set", &set).finish()
    }
}
60
/// An active connection to an MCP server using the Streamable HTTP transport.
///
/// Cheaply clonable (one `Arc` bump). When the last clone is dropped, the
/// inner `Arc` ref count hits zero, [`ConnectionInner::Drop`] runs, the
/// listener-cancel `DropGuard` is dropped, and the SSE listener task is
/// cancelled — exiting any in-flight `lines.next_line()`, reconnect
/// `send()`, or backoff `sleep` *immediately* without retrying against
/// the now-dead proxy session.
///
/// Use the public methods (`list_tools`, `call_tool`, `list_resources`,
/// `read_resource`, `call_tool_as_message`, `tool_key`) for the upstream
/// MCP protocol surface. The inner state ([`ConnectionInner`]) is also
/// reachable via `Deref` for read-only field access (e.g.
/// `connection.url`, `connection.initialize_result.server_info.name`),
/// but its methods are private — you must go through `Connection`.
#[derive(Debug)]
pub struct Connection {
    /// Shared connection state. Cloning the handle clones this `Arc`, so
    /// every clone refers to the same [`ConnectionInner`].
    inner: Arc<ConnectionInner>,
}
80
81impl Clone for Connection {
82 fn clone(&self) -> Self {
83 Self {
84 inner: Arc::clone(&self.inner),
85 }
86 }
87}
88
89// No `Drop` for `Connection`: cancellation happens deterministically
90// when the last `Arc<ConnectionInner>` clone is dropped, which runs
91// `ConnectionInner::drop` and releases the cancel-token DropGuard.
92
93impl Deref for Connection {
94 type Target = ConnectionInner;
95 fn deref(&self) -> &ConnectionInner {
96 &self.inner
97 }
98}
99
100impl Connection {
101 /// Tear this connection down explicitly.
102 ///
103 /// 1. Cancels the long-lived list-changed listener task immediately
104 /// (drops the [`DropGuard`] in
105 /// [`ConnectionInner::_listener_cancel_guard`]), so by the time
106 /// the HTTP DELETE goes out the listener isn't still holding an
107 /// SSE read open against the upstream we're about to close.
108 /// 2. Issues `DELETE /` to the upstream with this connection's
109 /// `Mcp-Session-Id` and the same merged header set every other
110 /// RPC stamps. Reuses [`ConnectionInner::call_timeout`].
111 /// 3. Treats `404 / 401 / 403` as success — the upstream is
112 /// unreachable from these credentials anyway, which is the
113 /// desired terminal state. Other non-2xx surfaces as
114 /// [`super::Error::BadStatus`].
115 ///
116 /// Takes `&self`: the listener cancel is in-place, and dropping
117 /// the surrounding `Arc<ConnectionInner>` (which closes the rest
118 /// of the connection's owned state) is the caller's responsibility
119 /// — usually by dropping the `Arc<Session>` holding it. Stateless
120 /// callers that don't hold a `Connection` should use
121 /// [`Client::delete`](super::Client::delete) instead.
122 ///
123 /// **In-flight RPC ordering.** This method does not block on
124 /// in-flight `call_tool` / `read_resource` / `list_tools` /
125 /// `list_resources` calls on the same connection. If one is
126 /// outstanding when `delete` lands, the upstream may see DELETE
127 /// before the RPC's reply makes it back; the in-flight call then
128 /// surfaces as a closed-connection error to whoever started it.
129 /// That's the spec-correct order (client said terminate) — drain
130 /// on the caller side first if you need different semantics.
131 pub async fn delete(&self) -> Result<(), super::Error> {
132 // 1. Mark the connection as "used" so the drop-time
133 // orphan-DELETE check (see `ConnectionInner::Drop`) skips
134 // its own fan-out. Without this, a fresh-mint connection
135 // that's explicitly torn down via `delete()` would race
136 // its own drop-time orphan into a duplicate upstream
137 // DELETE.
138 self.inner.any_calls.store(true, Ordering::Relaxed);
139
140 // 2. Drop the listener-cancel guard. Releasing the `DropGuard`
141 // cancels the sibling `CancellationToken` the listener task
142 // holds; the listener `tokio::select!`s against it on every
143 // blocking await and exits inside one scheduler tick.
144 if let Ok(mut guard) = self.inner._listener_cancel_guard.lock() {
145 let _ = guard.take();
146 }
147
148 // 3. Mock connections never opened an HTTP session, so the
149 // DELETE call would 404. Short-circuit to success.
150 if self.inner.mock {
151 return Ok(());
152 }
153
154 // 4. Build + send HTTP DELETE. Mirrors `Client::connect_once`'s
155 // request-stamp shape: header loop first, explicit
156 // `Mcp-Session-Id` always wins.
157 let request = self
158 .inner
159 .http_client
160 .delete(&self.inner.url)
161 .timeout(self.inner.call_timeout)
162 .headers(self.inner.build_request_headers(None, None).await);
163 let response = request.send().await.map_err(|source| {
164 super::Error::Request {
165 url: self.inner.url.clone(),
166 source,
167 }
168 })?;
169
170 // 5. 404 / 401 / 403 → success; other non-2xx → real error.
171 let status = response.status();
172 if matches!(
173 status,
174 reqwest::StatusCode::NOT_FOUND
175 | reqwest::StatusCode::UNAUTHORIZED
176 | reqwest::StatusCode::FORBIDDEN
177 ) {
178 return Ok(());
179 }
180 if !status.is_success() {
181 let body = response.text().await.unwrap_or_default();
182 return Err(super::Error::BadStatus {
183 url: self.inner.url.clone(),
184 code: status,
185 body: body.chars().take(800).collect(),
186 });
187 }
188 Ok(())
189 }
190
191 pub(super) async fn new(
192 http_client: reqwest::Client,
193 url: String,
194 session_id: String,
195 headers: IndexMap<String, String>,
196 backoff_current_interval: Duration,
197 backoff_initial_interval: Duration,
198 backoff_randomization_factor: f64,
199 backoff_multiplier: f64,
200 backoff_max_interval: Duration,
201 backoff_max_elapsed_time: Duration,
202 call_timeout: Duration,
203 initialize_result: super::initialize_result::InitializeResult,
204 initial_sse_lines: Option<super::LinesStream>,
205 is_reconnect: bool,
206 ) -> Self {
207 let inner = ConnectionInner::new(
208 http_client,
209 url,
210 session_id,
211 headers,
212 backoff_current_interval,
213 backoff_initial_interval,
214 backoff_randomization_factor,
215 backoff_multiplier,
216 backoff_max_interval,
217 backoff_max_elapsed_time,
218 call_timeout,
219 initialize_result,
220 initial_sse_lines,
221 is_reconnect,
222 )
223 .await;
224 Self { inner }
225 }
226
227 pub(super) fn new_mock(url: String) -> Self {
228 Self {
229 inner: ConnectionInner::new_mock(url),
230 }
231 }
232
233 #[cfg(test)]
234 pub(crate) fn new_for_test(name: String, url: String) -> Self {
235 Self {
236 inner: ConnectionInner::new_for_test(name, url),
237 }
238 }
239
240 /// Send a JSON-RPC notification to the upstream. Used by `Client`
241 /// right after `initialize` to send `notifications/initialized`.
242 pub(super) async fn notify<P: serde::Serialize>(
243 &self,
244 method: &str,
245 params: &P,
246 ) -> Result<(), super::Error> {
247 self.inner.notify(method, params).await
248 }
249
250 /// Returns a key identifying this connection for tool namespacing.
251 pub fn tool_key(&self) -> String {
252 self.inner.tool_key()
253 }
254
255 /// Returns the session ID for this connection.
256 pub fn session_id(&self) -> &str {
257 self.inner.session_id()
258 }
259
260 /// Returns all tools from the upstream server.
261 pub async fn list_tools(
262 &self,
263 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
264 self.inner.list_tools().await
265 }
266
267 /// Calls a tool on the upstream server.
268 pub async fn call_tool(
269 &self,
270 params: &super::tool::CallToolRequestParams,
271 ) -> Result<super::tool::CallToolResult, super::Error> {
272 self.inner.call_tool(params).await
273 }
274
275 /// Calls a tool and converts the result into a [`ToolMessage`].
276 pub async fn call_tool_as_message(
277 &self,
278 params: &super::tool::CallToolRequestParams,
279 tool_call_id: String,
280 ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
281 {
282 self.inner.call_tool_as_message(params, tool_call_id).await
283 }
284
285 /// Returns all resources from the upstream server.
286 pub async fn list_resources(
287 &self,
288 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
289 self.inner.list_resources().await
290 }
291
292 /// Returns the cached tool list as soon as it differs from `current`,
293 /// or waits up to `timeout` for the next `notifications/tools/list_changed`
294 /// from the upstream server before re-reading.
295 ///
296 /// Wakes the moment a refresh writer takes the cache write lock, so
297 /// the post-wake `read` is guaranteed to observe the new list rather
298 /// than racing against the install. Safe to call from any number of
299 /// tasks concurrently.
300 pub async fn subscribe_tools(
301 &self,
302 current: &[super::tool::Tool],
303 timeout: Duration,
304 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
305 self.inner.subscribe_tools(current, timeout).await
306 }
307
308 /// Resource counterpart of [`Connection::subscribe_tools`].
309 pub async fn subscribe_resources(
310 &self,
311 current: &[super::resource::Resource],
312 timeout: Duration,
313 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
314 self.inner.subscribe_resources(current, timeout).await
315 }
316
317 /// Atomically drain the proxy's `pending_notifications` queue for
318 /// this session via `GET /notify` and return the queued content
319 /// blocks. A second call returns `[]` until the next out-of-band
320 /// `POST /notify`.
321 ///
322 /// Intended for use at the start of an agent turn so notifications
323 /// queued between turns — when the prior turn ended without a tool
324 /// call, or the user is starting a fresh continuation — surface as
325 /// a user message instead of being lost. The proxy's existing
326 /// `tools/call` response path still drains in-flight notifications
327 /// arriving *during* a turn; this method covers the gap between
328 /// turns.
329 ///
330 /// A 404 from the proxy (session unknown — possible after a proxy
331 /// restart) is mapped to an empty `Vec` so callers do not need to
332 /// distinguish "no notifications" from "lost session" at the use
333 /// site; the next upstream call will surface the lost-session
334 /// condition through its own error path.
335 pub async fn drain_notifications(
336 &self,
337 ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
338 self.inner.drain_notifications().await
339 }
340
341 /// Non-draining peek at the proxy's `pending_notifications` queue
342 /// via `GET /notify/queued`. Returns `true` iff the queue holds at
343 /// least one block. Companion to [`Connection::drain_notifications`]
344 /// for callers that want to know whether queued blocks exist
345 /// without consuming them.
346 ///
347 /// A 404 from the proxy (session unknown — possible after a proxy
348 /// restart) is mapped to `Ok(false)` for the same reason as the
349 /// drain path: callers do not need to distinguish "no
350 /// notifications" from "lost session" at the use site.
351 pub async fn has_pending_notifications(
352 &self,
353 ) -> Result<bool, super::Error> {
354 self.inner.has_pending_notifications().await
355 }
356
357 /// `POST <self.url>/notify` against the ObjectiveAI MCP proxy.
358 /// Appends `blocks` to the proxy's pending-notifications queue for
359 /// this session; they surface as a user message on the next
360 /// `tools/call` response (wrapped in a `<system-reminder>` block)
361 /// or as the head of the next agent turn when drained between turns.
362 ///
363 /// Mirror of [`Connection::drain_notifications`] / [`Connection::has_pending_notifications`]
364 /// for the inbound side. A 404 from the proxy means the session is
365 /// gone — surfaced as `SessionExpired` so callers can distinguish
366 /// "session lost" from "delivery failed" at the use site.
367 pub async fn enqueue_notifications(
368 &self,
369 blocks: &[super::tool::ContentBlock],
370 ) -> Result<(), super::Error> {
371 self.inner.enqueue_notifications(blocks).await
372 }
373
374 /// Reads a resource from the upstream server.
375 pub async fn read_resource(
376 &self,
377 uri: &str,
378 ) -> Result<super::resource::ReadResourceResult, super::Error> {
379 self.inner.read_resource(uri).await
380 }
381
382 /// Register a callback to fire whenever the upstream emits
383 /// `notifications/tools/list_changed`.
384 ///
385 /// **Timing:** the callback runs *after* the tool cache's write lock
386 /// is acquired but *before* the network paginate that replaces it.
387 /// That means readers blocked on the read lock won't return until the
388 /// new list is in place, and the callback observes the moment the
389 /// staleness window opens. The proxy uses this to emit its own
390 /// `notifications/tools/list_changed` to downstream clients at the
391 /// right instant.
392 ///
393 /// Replaces any previously-registered tools-list-changed callback.
394 /// All clones of this `Connection` share the same callback slot.
395 pub fn set_on_tools_list_changed<F>(&self, callback: F)
396 where
397 F: Fn() + Send + Sync + 'static,
398 {
399 self.inner.on_tools_list_changed.set(Arc::new(callback));
400 }
401
402 /// Register a callback to fire whenever the upstream emits
403 /// `notifications/resources/list_changed`. Same timing contract as
404 /// [`Connection::set_on_tools_list_changed`].
405 ///
406 /// Replaces any previously-registered resources-list-changed callback.
407 /// All clones of this `Connection` share the same callback slot.
408 pub fn set_on_resources_list_changed<F>(&self, callback: F)
409 where
410 F: Fn() + Send + Sync + 'static,
411 {
412 self.inner.on_resources_list_changed.set(Arc::new(callback));
413 }
414
415 /// Atomically replace the connection's [`ConnectionInner::extra_headers`]
416 /// bag. Every subsequent outbound HTTP request from this connection
417 /// stamps the new map AFTER `headers`, with `HeaderMap::insert`
418 /// REPLACE semantics — keys in `extras` override the same key in
419 /// the per-URL `headers` bag set at `Client::connect`. Caller
420 /// supplies the FULL replacement map; missing keys are dropped
421 /// (no merge).
422 ///
423 /// Used by the proxy to inject session-global headers
424 /// (`X-OBJECTIVEAI-RESPONSE-ID`, `X-OBJECTIVEAI-RESPONSE-IDS`)
425 /// that re-set on every inbound `initialize`, without re-dialing
426 /// the upstream.
427 pub async fn set_extra_headers(
428 &self,
429 extras: IndexMap<String, String>,
430 ) {
431 *self.inner.extra_headers.write().await = extras;
432 }
433}
434
/// The actual connection state. Behind an `Arc` inside [`Connection`].
///
/// Fields are public for read-only access (callers reach them via
/// `Connection`'s `Deref`), but every method on this type is private —
/// the public surface lives on [`Connection`] and delegates through.
#[derive(Debug)]
pub struct ConnectionInner {
    /// HTTP client used to build every request this connection sends.
    pub http_client: reqwest::Client,
    /// URL the connection's POST and DELETE requests are issued against.
    pub url: String,
    /// Session ID of this connection (see the `Mcp-Session-Id` note on
    /// [`Self::headers`]).
    pub session_id: String,
    /// All HTTP headers stamped on every POST / GET this connection
    /// makes — the same merged map (defaults + caller overrides) the
    /// `Client` built once during connect. `Mcp-Session-Id`,
    /// `Content-Type`, and `Accept` are still set by the request
    /// builders and override anything in `headers`.
    pub headers: IndexMap<String, String>,
    /// Mutable per-request override layer stamped AFTER `headers` on
    /// every outbound HTTP request. The request-builder uses
    /// `reqwest::header::HeaderMap::insert` semantics so any key
    /// present in `extra_headers` REPLACES the same key in `headers`.
    /// Used by the proxy to inject session-global headers
    /// (`X-OBJECTIVEAI-RESPONSE-ID` etc.) that override per-URL
    /// values without re-dialing. Empty by default; set via
    /// [`Connection::set_extra_headers`].
    pub extra_headers: RwLock<IndexMap<String, String>>,

    // The six `backoff_*` values below are copied verbatim into a fresh
    // `backoff::ExponentialBackoff` each time `backoff()` is called.
    pub backoff_current_interval: Duration,
    pub backoff_initial_interval: Duration,
    pub backoff_randomization_factor: f64,
    pub backoff_multiplier: f64,
    pub backoff_max_interval: Duration,
    pub backoff_max_elapsed_time: Duration,
    /// Per-request timeout applied to the POST and DELETE request
    /// builders.
    pub call_timeout: Duration,

    /// The server's capabilities and info from the initialize response.
    pub initialize_result: super::initialize_result::InitializeResult,

    /// If true, all RPC/notify calls are no-ops. Used for mock orchestrator URLs.
    mock: bool,

    /// `true` iff this connection was opened by resuming an existing
    /// upstream session — i.e. [`Client::connect`](super::Client::connect)
    /// was called with `session_id: Some(...)`. Set once at
    /// construction; never mutated.
    ///
    /// Used by the drop-time orphan-DELETE gate in
    /// [`ConnectionInner::Drop`]: reconnects are **excluded** from
    /// orphan DELETE because their `any_calls == false` only means
    /// "this `Connection` instance didn't use it" — an earlier
    /// instance that opened the upstream session may have. Only
    /// freshly-minted connections (`is_reconnect == false`) that no
    /// one used get the drop-time orphan DELETE.
    is_reconnect: bool,

    /// `true` once any [`Self::call_tool`] or [`Self::read_resource`]
    /// has been issued through this connection. Listings
    /// ([`Self::list_tools`], [`Self::list_resources`]) and the
    /// proxy-side notification helpers do NOT flip this — only
    /// deliberate use of the upstream session counts.
    ///
    /// Stored atomically because the setters live behind `&self`-only
    /// call paths; `Ordering::Relaxed` is sufficient (we never
    /// synchronize anything else against this load/store).
    ///
    /// Also flipped to `true` at the top of [`super::Connection::delete`]
    /// so an explicit teardown of a fresh-mint connection can't race
    /// the drop-time orphan-DELETE into a double-fire.
    any_calls: AtomicBool,

    /// Auto-incrementing request ID (starts at 2; 1 was used for initialize).
    next_id: AtomicU64,

    /// All tools from the server, populated by background pagination.
    ///
    /// `None` = cache cleared — either pre-populate (between
    /// [`Self::new`] and the first `refresh_tools_signaling`) or
    /// post-drop (the listener empties this the moment its SSE
    /// stream ends so `list_tools` will re-paginate against the
    /// upstream rather than return stale state). `Some(_)` = last
    /// known result, `Ok` or `Err`.
    tools:
        RwLock<Option<Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>>>>,
    /// All resources from the server, populated by background pagination.
    /// Same `None`/`Some` semantics as [`Self::tools`].
    resources: RwLock<
        Option<Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>>>,
    >,

    /// Cancellation token for the long-lived `listen_for_list_changes`
    /// task. The listener selects this against every blocking await
    /// (read, reconnect-send, backoff-sleep) and returns the instant it
    /// fires.
    ///
    /// Held inside the connection as a [`DropGuard`] so that the moment
    /// the last `Arc<ConnectionInner>` clone is dropped — i.e. the
    /// moment no external `Connection` handle remains — `Drop` runs on
    /// the guard, the token cancels, and the listener task tears down.
    /// The listener itself holds a sibling `CancellationToken` (clone),
    /// not the guard, so its task does not extend the connection's
    /// lifetime.
    ///
    /// Wrapped in `Mutex<Option<_>>` so explicit teardown paths
    /// ([`Connection::delete`]) can drop the guard in place — firing
    /// the cancel token *before* the surrounding `Arc<ConnectionInner>`
    /// goes away. Regular drop still works: `Mutex<Option<DropGuard>>`
    /// drops its inner `DropGuard` automatically when the mutex itself
    /// drops, so the drop-time cancellation described above is
    /// unchanged. (`Connection` itself has no `Drop` impl.)
    _listener_cancel_guard: std::sync::Mutex<Option<DropGuard>>,

    /// Optional callback fired by the listener in response to an
    /// upstream `notifications/tools/list_changed`.
    /// Set via [`Connection::set_on_tools_list_changed`], whose docs
    /// (like those on [`ListChangedCallback`]) state the timing
    /// contract: cache write lock held, *before* the re-paginate.
    ///
    /// NOTE(review): this field's doc previously said the callback
    /// fires *after* the refresh. The firing site is not in view from
    /// here — confirm which ordering the listener implements.
    on_tools_list_changed: CallbackSlot,

    /// Optional callback fired by the listener in response to an
    /// upstream `notifications/resources/list_changed`.
    /// Set via [`Connection::set_on_resources_list_changed`]; same
    /// timing contract (and the same NOTE(review)) as
    /// [`Self::on_tools_list_changed`].
    on_resources_list_changed: CallbackSlot,

    /// Wakes any task awaiting in [`Connection::subscribe_tools`]. Fired
    /// from inside `refresh_tools_signaling` the moment the writer
    /// acquires the cache write lock — *before* the new list is
    /// installed. A woken subscriber's next `read().await` blocks behind
    /// the writer's guard, so it always observes the post-swap state.
    tools_changed: Notify,

    /// Resource counterpart of [`Self::tools_changed`].
    resources_changed: Notify,
}
565
566impl ConnectionInner {
567 /// Creates a mock connection that never makes network requests.
568 /// All RPC calls return empty/default results.
569 fn new_mock(url: String) -> Arc<Self> {
570 Arc::new(Self {
571 http_client: reqwest::Client::new(),
572 url,
573 session_id: String::new(),
574 headers: IndexMap::new(),
575 extra_headers: RwLock::new(IndexMap::new()),
576 backoff_current_interval: Duration::ZERO,
577 backoff_initial_interval: Duration::ZERO,
578 backoff_randomization_factor: 0.0,
579 backoff_multiplier: 1.0,
580 backoff_max_interval: Duration::ZERO,
581 backoff_max_elapsed_time: Duration::ZERO,
582 call_timeout: Duration::ZERO,
583 initialize_result: super::initialize_result::InitializeResult {
584 protocol_version: "2025-03-26".into(),
585 capabilities: super::initialize_result::ServerCapabilities {
586 experimental: None,
587 logging: None,
588 completions: None,
589 prompts: None,
590 resources: None,
591 tools: None,
592 tasks: None,
593 },
594 server_info: super::initialize_result::Implementation {
595 name: "mock".into(),
596 title: None,
597 version: "0.0.0".into(),
598 website_url: None,
599 description: None,
600 icons: None,
601 },
602 instructions: None,
603 _meta: None,
604 },
605 mock: true,
606 is_reconnect: false,
607 any_calls: AtomicBool::new(false),
608 next_id: AtomicU64::new(2),
609 // Mock has no listener and never refreshes; seed with an
610 // empty Ok so `list_tools` returns immediately instead of
611 // trying to paginate over a non-existent upstream.
612 tools: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
613 resources: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
614 _listener_cancel_guard: std::sync::Mutex::new(None),
615 on_tools_list_changed: CallbackSlot::new(),
616 on_resources_list_changed: CallbackSlot::new(),
617 tools_changed: Notify::new(),
618 resources_changed: Notify::new(),
619 })
620 }
621
622 /// Creates a minimal connection for unit testing.
623 #[cfg(test)]
624 fn new_for_test(name: String, url: String) -> Arc<Self> {
625 Arc::new(Self {
626 http_client: reqwest::Client::new(),
627 url,
628 session_id: String::new(),
629 headers: IndexMap::new(),
630 extra_headers: RwLock::new(IndexMap::new()),
631 backoff_current_interval: Duration::from_millis(500),
632 backoff_initial_interval: Duration::from_millis(500),
633 backoff_randomization_factor: 0.5,
634 backoff_multiplier: 1.5,
635 backoff_max_interval: Duration::from_secs(60),
636 backoff_max_elapsed_time: Duration::from_secs(900),
637 call_timeout: Duration::from_secs(30),
638 initialize_result: super::initialize_result::InitializeResult {
639 protocol_version: "2025-03-26".into(),
640 capabilities: super::initialize_result::ServerCapabilities {
641 experimental: None,
642 logging: None,
643 completions: None,
644 prompts: None,
645 resources: None,
646 tools: None,
647 tasks: None,
648 },
649 server_info: super::initialize_result::Implementation {
650 name,
651 title: None,
652 version: "0.0.0".into(),
653 website_url: None,
654 description: None,
655 icons: None,
656 },
657 instructions: None,
658 _meta: None,
659 },
660 mock: false,
661 is_reconnect: false,
662 any_calls: AtomicBool::new(false),
663 next_id: AtomicU64::new(2),
664 // Test connection has no listener and never refreshes; seed
665 // with an empty Ok so `list_tools` doesn't try to paginate.
666 tools: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
667 resources: RwLock::new(Some(Ok(Arc::new(Vec::new())))),
668 _listener_cancel_guard: std::sync::Mutex::new(None),
669 on_tools_list_changed: CallbackSlot::new(),
670 on_resources_list_changed: CallbackSlot::new(),
671 tools_changed: Notify::new(),
672 resources_changed: Notify::new(),
673 })
674 }
675
    /// Creates a new connection and spawns background tasks to paginate
    /// all tools and resources. Called internally by
    /// [`Client::connect`](super::Client::connect) (via [`Connection::new`]).
    ///
    /// A refresh task is spawned per cache only when the matching
    /// capability (`capabilities.tools` / `capabilities.resources`) is
    /// present in `initialize_result`; otherwise that cache stays `None`.
    ///
    /// `initial_sse_lines`, if `Some`, is a pre-opened SSE line reader
    /// that the list-changed listener will read from immediately on its
    /// first iteration, instead of opening its own GET `/`. The caller
    /// is responsible for arranging for one of these to exist whenever
    /// the upstream advertises `tools.list_changed` or
    /// `resources.list_changed` — see
    /// [`Client::connect`](super::Client::connect). With `None`, no
    /// listener task is spawned at all.
    ///
    /// `is_reconnect` is stored as-is; the remaining arguments populate
    /// the same-named fields.
    async fn new(
        http_client: reqwest::Client,
        url: String,
        session_id: String,
        headers: IndexMap<String, String>,
        backoff_current_interval: Duration,
        backoff_initial_interval: Duration,
        backoff_randomization_factor: f64,
        backoff_multiplier: f64,
        backoff_max_interval: Duration,
        backoff_max_elapsed_time: Duration,
        call_timeout: Duration,
        initialize_result: super::initialize_result::InitializeResult,
        initial_sse_lines: Option<super::LinesStream>,
        is_reconnect: bool,
    ) -> Arc<Self> {
        // Cancel-the-listener machinery: store the DropGuard inside the
        // inner so the cancellation fires deterministically when the
        // last external `Arc<ConnectionInner>` clone drops. Hand the
        // listener task a sibling clone (no guard) — that way the
        // listener task's lifetime does not extend the connection.
        let listener_cancel = CancellationToken::new();
        let listener_cancel_for_task = listener_cancel.clone();
        let conn = Arc::new(Self {
            http_client,
            url,
            session_id,
            headers,
            extra_headers: RwLock::new(IndexMap::new()),
            backoff_current_interval,
            backoff_initial_interval,
            backoff_randomization_factor,
            backoff_multiplier,
            backoff_max_interval,
            backoff_max_elapsed_time,
            call_timeout,
            initialize_result,
            mock: false,
            is_reconnect,
            any_calls: AtomicBool::new(false),
            next_id: AtomicU64::new(2),
            // Both caches start as `None`. When the matching capability
            // is advertised, the refresh task spawned below is the one
            // expected to install `Some(_)`; `new` does not wait for
            // that install, only for the task's lock-handoff signal.
            tools: RwLock::new(None),
            resources: RwLock::new(None),
            _listener_cancel_guard: std::sync::Mutex::new(Some(
                listener_cancel.drop_guard(),
            )),
            on_tools_list_changed: CallbackSlot::new(),
            on_resources_list_changed: CallbackSlot::new(),
            tools_changed: Notify::new(),
            resources_changed: Notify::new(),
        });

        // Spawn background tool lister if the server supports tools.
        //
        // We don't return until the spawned task has signalled over the
        // oneshot — which, per the handoff contract of
        // `refresh_tools_signaling` (not visible here; presumably sent
        // right after it takes the write lock), means the writer holds
        // the cache lock. Otherwise a caller that immediately reads
        // `list_tools()` could race the writer — `tokio::spawn` only
        // queues the task, and a fast reader can acquire the read lock
        // before the writer has run its first instruction. The reader
        // would then observe the still-unpopulated (`None`) cache even
        // though a full populate is in flight.
        //
        // The write guard is not handed back to us; the oneshot only
        // carries the "I'm holding the lock now" signal. Once we
        // receive it, any subsequent `read().await` from the caller
        // waits behind the writer until the populate finishes.
        if conn.initialize_result.capabilities.tools.is_some() {
            // The task owns a strong `Arc` clone, so the connection
            // state stays alive at least until this refresh completes.
            let conn = Arc::clone(&conn);
            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
            tokio::spawn(async move {
                conn.refresh_tools_signaling(lock_held_tx, None).await;
            });
            // Wait for the writer's signal before returning. A receive
            // error (sender dropped without signalling) is deliberately
            // ignored: we simply stop waiting.
            let _ = lock_held_rx.await;
        }

        // Spawn background resource lister if the server supports
        // resources. Same lock-handoff contract as tools above.
        if conn.initialize_result.capabilities.resources.is_some() {
            let conn = Arc::clone(&conn);
            let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
            tokio::spawn(async move {
                conn.refresh_resources_signaling(lock_held_tx, None).await;
            });
            let _ = lock_held_rx.await;
        }

        // Spawn the list-changed listener iff the caller handed us a
        // pre-opened SSE stream. The connection is naive about
        // `tools.list_changed` / `resources.list_changed` capabilities —
        // [`Client::connect`](super::Client::connect) translates them
        // into "did or didn't open a stream for us." If we get a stream,
        // we listen on it; if we don't, there's nothing to listen for.
        if let Some(initial_lines) = initial_sse_lines {
            // Hand the listener a `Weak` so the spawned task itself does
            // not keep the connection alive. `listener_cancel_for_task`
            // is a sibling clone of the connection's own
            // `_listener_cancel_guard` token — when the last external
            // `Arc<ConnectionInner>` clone is dropped, the inner's Drop
            // releases the guard and the listener wakes from any
            // pending await (read, send, sleep) and exits immediately.
            let weak = Arc::downgrade(&conn);
            tokio::spawn(async move {
                Self::listen_for_list_changes(
                    weak,
                    listener_cancel_for_task,
                    initial_lines,
                )
                .await;
            });
        }

        conn
    }
805
806 /// Creates an exponential backoff configuration from the connection's fields.
807 fn backoff(&self) -> backoff::ExponentialBackoff {
808 backoff::ExponentialBackoff {
809 current_interval: self.backoff_current_interval,
810 initial_interval: self.backoff_initial_interval,
811 randomization_factor: self.backoff_randomization_factor,
812 multiplier: self.backoff_multiplier,
813 max_interval: self.backoff_max_interval,
814 start_time: std::time::Instant::now(),
815 max_elapsed_time: Some(self.backoff_max_elapsed_time),
816 clock: backoff::SystemClock::default(),
817 }
818 }
819
820 /// Builds a POST request with all required headers and the call timeout.
821 async fn post(&self) -> reqwest::RequestBuilder {
822 self.http_client
823 .post(&self.url)
824 .timeout(self.call_timeout)
825 .headers(
826 self.build_request_headers(
827 Some("application/json"),
828 Some("application/json, text/event-stream"),
829 )
830 .await,
831 )
832 }
833
834 /// Build the `HeaderMap` stamped on every outbound request. Order
835 /// of insertion drives override semantics — `HeaderMap::insert`
836 /// REPLACES existing values for the same key:
837 ///
838 /// 1. Content-Type / Accept (when supplied by the caller).
839 /// 2. `self.headers` (the per-URL bag set at `Client::connect`).
840 /// 3. `self.extra_headers` (the mutable, session-global overrides
841 /// — proxies use this for `X-OBJECTIVEAI-RESPONSE-ID` etc).
842 /// 4. `Mcp-Session-Id` (the connection's own session id, always
843 /// last so it can never be shadowed).
844 async fn build_request_headers(
845 &self,
846 content_type: Option<&str>,
847 accept: Option<&str>,
848 ) -> reqwest::header::HeaderMap {
849 use reqwest::header::{
850 ACCEPT, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue,
851 };
852 let mut hmap = HeaderMap::new();
853 if let Some(ct) = content_type {
854 if let Ok(hv) = HeaderValue::from_str(ct) {
855 hmap.insert(CONTENT_TYPE, hv);
856 }
857 }
858 if let Some(a) = accept {
859 if let Ok(hv) = HeaderValue::from_str(a) {
860 hmap.insert(ACCEPT, hv);
861 }
862 }
863 for (k, v) in &self.headers {
864 if let (Ok(hn), Ok(hv)) = (
865 HeaderName::try_from(k.as_str()),
866 HeaderValue::from_str(v),
867 ) {
868 hmap.insert(hn, hv);
869 }
870 }
871 let extras = self.extra_headers.read().await;
872 for (k, v) in extras.iter() {
873 if let (Ok(hn), Ok(hv)) = (
874 HeaderName::try_from(k.as_str()),
875 HeaderValue::from_str(v),
876 ) {
877 hmap.insert(hn, hv);
878 }
879 }
880 drop(extras);
881 if let Ok(hv) = HeaderValue::from_str(&self.session_id) {
882 hmap.insert(HeaderName::from_static("mcp-session-id"), hv);
883 }
884 hmap
885 }
886
887 /// Sends a JSON-RPC request, retrying transient errors when
888 /// `idempotent` is `true`.
889 ///
890 /// Idempotent methods (`tools/list`, `resources/list`,
891 /// `resources/read`, etc.) retry every transient error — network,
892 /// HTTP status, malformed body, JSON-RPC error, session expiration —
893 /// until the backoff's `max_elapsed_time` is exceeded.
894 ///
895 /// Non-idempotent methods (`tools/call`) make exactly one attempt.
896 /// Retrying a `tools/call` is unsafe: a tool may have mutated remote
897 /// state during the first attempt before the response was lost, and
898 /// re-firing the call would mutate state again. Each retry of
899 /// `AppendTask` advances `state.tasks.len()` an extra step, so the
900 /// agent sees a different return value than expected and the
901 /// pid-derived mock seed at the next step diverges. See
902 /// `objectiveai-api/src/agent/completions/client.rs` (sequential
903 /// dispatch) and `mock/client.rs::mock.seed_derive` for the
904 /// downstream consequence.
905 async fn rpc<P: serde::Serialize, R: serde::de::DeserializeOwned>(
906 &self,
907 method: &str,
908 params: &P,
909 idempotent: bool,
910 ) -> Result<R, super::Error> {
911 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
912 let body = serde_json::json!({
913 "jsonrpc": "2.0",
914 "id": id,
915 "method": method,
916 "params": params,
917 });
918
919 let attempt_one = || async {
920 let url = self.url.clone();
921 let response =
922 self.post().await.json(&body).send().await.map_err(|source| {
923 backoff::Error::transient(super::Error::Request {
924 url: url.clone(),
925 source,
926 })
927 })?;
928
929 if response.status() == reqwest::StatusCode::NOT_FOUND {
930 return Err(backoff::Error::transient(
931 super::Error::SessionExpired { url: url.clone() },
932 ));
933 }
934 if !response.status().is_success() {
935 let code = response.status();
936 let body = response.text().await.unwrap_or_default();
937 return Err(backoff::Error::transient(
938 super::Error::BadStatus {
939 url: url.clone(),
940 code,
941 body,
942 },
943 ));
944 }
945
946 let rpc_response: super::JsonRpcResponse<R> =
947 super::parse_streamable_http_response(&url, response)
948 .await
949 .map_err(backoff::Error::transient)?;
950
951 match rpc_response {
952 super::JsonRpcResponse::Success { result, .. } => Ok(result),
953 super::JsonRpcResponse::Error { error, .. } => {
954 Err(backoff::Error::transient(super::Error::JsonRpc {
955 url: url.clone(),
956 code: error.code,
957 message: error.message,
958 data: error.data,
959 }))
960 }
961 }
962 };
963
964 if idempotent {
965 backoff::future::retry(self.backoff(), attempt_one).await
966 } else {
967 attempt_one().await.map_err(|e| match e {
968 backoff::Error::Permanent(err)
969 | backoff::Error::Transient { err, .. } => err,
970 })
971 }
972 }
973
974 /// Sends a JSON-RPC notification (no response expected) with the
975 /// same exponential-backoff retry policy as [`Self::rpc`]. Every
976 /// error is transient; the loop gives up only when the backoff's
977 /// `max_elapsed_time` is exceeded.
978 async fn notify<P: serde::Serialize>(
979 &self,
980 method: &str,
981 params: &P,
982 ) -> Result<(), super::Error> {
983 if self.mock {
984 return Ok(());
985 }
986 let body = serde_json::json!({
987 "jsonrpc": "2.0",
988 "method": method,
989 "params": params,
990 });
991
992 backoff::future::retry(self.backoff(), || async {
993 let url = self.url.clone();
994 let response =
995 self.post().await.json(&body).send().await.map_err(|source| {
996 backoff::Error::transient(super::Error::Request {
997 url: url.clone(),
998 source,
999 })
1000 })?;
1001
1002 if response.status() == reqwest::StatusCode::NOT_FOUND {
1003 return Err(backoff::Error::transient(
1004 super::Error::SessionExpired { url: url.clone() },
1005 ));
1006 }
1007 if !response.status().is_success() {
1008 let code = response.status();
1009 let body = response.text().await.unwrap_or_default();
1010 return Err(backoff::Error::transient(
1011 super::Error::BadStatus {
1012 url: url.clone(),
1013 code,
1014 body,
1015 },
1016 ));
1017 }
1018
1019 Ok(())
1020 })
1021 .await
1022 }
1023
1024 /// `GET <self.url>/notify` against the ObjectiveAI MCP proxy.
1025 /// Atomically drains the proxy's pending-notifications queue for
1026 /// this session and returns the queued content blocks.
1027 ///
1028 /// Single-attempt — the proxy drain is destructive, so a retry
1029 /// after a transient failure would risk silently dropping
1030 /// notifications that the first attempt's response carried but
1031 /// failed to deliver. Networks errors propagate to the caller; the
1032 /// next turn's drain will pick up anything queued in the meantime.
1033 /// A 404 (session unknown) is mapped to `Ok(vec![])` — see the
1034 /// public method's doc on `Connection`.
1035 async fn drain_notifications(
1036 &self,
1037 ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
1038 if self.mock {
1039 return Ok(Vec::new());
1040 }
1041
1042 let url = format!("{}/notify", self.url.trim_end_matches('/'));
1043 let request = self
1044 .http_client
1045 .get(&url)
1046 .timeout(self.call_timeout)
1047 .headers(
1048 self.build_request_headers(None, Some("application/json"))
1049 .await,
1050 );
1051
1052 let response =
1053 request
1054 .send()
1055 .await
1056 .map_err(|source| super::Error::Request {
1057 url: url.clone(),
1058 source,
1059 })?;
1060
1061 if response.status() == reqwest::StatusCode::NOT_FOUND {
1062 return Ok(Vec::new());
1063 }
1064 if !response.status().is_success() {
1065 let code = response.status();
1066 let body = response.text().await.unwrap_or_default();
1067 return Err(super::Error::BadStatus { url, code, body });
1068 }
1069
1070 response
1071 .json::<Vec<super::tool::ContentBlock>>()
1072 .await
1073 .map_err(|source| super::Error::Request { url, source })
1074 }
1075
1076 /// `POST <self.url>/notify` against the ObjectiveAI MCP proxy.
1077 /// Appends `blocks` to the proxy's pending-notifications queue for
1078 /// this session. Single-attempt — the caller decides whether to
1079 /// retry. A 404 (session unknown) surfaces as `SessionExpired`
1080 /// rather than `Ok(())` because the caller is asking for delivery
1081 /// and a lost session means delivery did not happen.
1082 async fn enqueue_notifications(
1083 &self,
1084 blocks: &[super::tool::ContentBlock],
1085 ) -> Result<(), super::Error> {
1086 if self.mock {
1087 return Ok(());
1088 }
1089
1090 let url = format!("{}/notify", self.url.trim_end_matches('/'));
1091 let request = self
1092 .http_client
1093 .post(&url)
1094 .timeout(self.call_timeout)
1095 .headers(
1096 self.build_request_headers(
1097 Some("application/json"),
1098 Some("application/json"),
1099 )
1100 .await,
1101 )
1102 .json(blocks);
1103
1104 let response =
1105 request
1106 .send()
1107 .await
1108 .map_err(|source| super::Error::Request {
1109 url: url.clone(),
1110 source,
1111 })?;
1112
1113 if response.status() == reqwest::StatusCode::NOT_FOUND {
1114 return Err(super::Error::SessionExpired { url });
1115 }
1116 if !response.status().is_success() {
1117 let code = response.status();
1118 let body = response.text().await.unwrap_or_default();
1119 return Err(super::Error::BadStatus { url, code, body });
1120 }
1121
1122 Ok(())
1123 }
1124
1125 /// `GET <self.url>/notify/queued` against the ObjectiveAI MCP proxy.
1126 /// Non-draining peek — returns `true` iff the proxy's
1127 /// pending-notifications queue for this session is non-empty.
1128 /// A 404 (session unknown) is mapped to `Ok(false)` to match the
1129 /// drain path's soft-fallback contract.
1130 async fn has_pending_notifications(&self) -> Result<bool, super::Error> {
1131 if self.mock {
1132 return Ok(false);
1133 }
1134
1135 let url = format!("{}/notify/queued", self.url.trim_end_matches('/'));
1136 let request = self
1137 .http_client
1138 .get(&url)
1139 .timeout(self.call_timeout)
1140 .headers(
1141 self.build_request_headers(None, Some("application/json"))
1142 .await,
1143 );
1144
1145 let response =
1146 request
1147 .send()
1148 .await
1149 .map_err(|source| super::Error::Request {
1150 url: url.clone(),
1151 source,
1152 })?;
1153
1154 if response.status() == reqwest::StatusCode::NOT_FOUND {
1155 return Ok(false);
1156 }
1157 if !response.status().is_success() {
1158 let code = response.status();
1159 let body = response.text().await.unwrap_or_default();
1160 return Err(super::Error::BadStatus { url, code, body });
1161 }
1162
1163 response
1164 .json::<bool>()
1165 .await
1166 .map_err(|source| super::Error::Request { url, source })
1167 }
1168
1169 /// Returns a key identifying this connection for tool namespacing.
1170 fn tool_key(&self) -> String {
1171 format!("{}-{}", self.initialize_result.server_info.name, self.url)
1172 }
1173
1174 /// Returns the session ID for this connection.
1175 fn session_id(&self) -> &str {
1176 &self.session_id
1177 }
1178
1179 /// Sends a `tools/list` RPC call for a single page.
1180 async fn rpc_list_tools(
1181 &self,
1182 cursor: Option<&str>,
1183 ) -> Result<super::tool::ListToolsResult, super::Error> {
1184 self.rpc(
1185 "tools/list",
1186 &super::tool::ListToolsRequest {
1187 cursor: cursor.map(String::from),
1188 },
1189 true,
1190 )
1191 .await
1192 }
1193
1194 /// Returns all tools from the server.
1195 ///
1196 /// Blocks until background pagination completes, then returns a
1197 /// cheap `Arc` clone of the result. If the cache is currently
1198 /// empty (e.g. because the listener detected its SSE stream drop
1199 /// and cleared it) this paginates inline against the upstream —
1200 /// the caller gets fresh data on the happy path, or the live
1201 /// upstream error if the connection is genuinely down, rather
1202 /// than stale pre-drop tools.
1203 async fn list_tools(
1204 &self,
1205 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1206 if let Some(cached) = self.tools.read().await.as_ref() {
1207 return cached.clone();
1208 }
1209 // Cache cleared; refresh inline. Concurrent callers may each
1210 // refresh — wasteful, but the proxy fans out across distinct
1211 // upstreams so a single Connection rarely sees concurrent
1212 // `list_tools` calls.
1213 self.refresh_tools(None).await;
1214 self.tools
1215 .read()
1216 .await
1217 .as_ref()
1218 .expect("refresh_tools installs Some")
1219 .clone()
1220 }
1221
1222 /// Calls a tool on the MCP server. The returned
1223 /// `CallToolResult.content` is **fully resolved**: every
1224 /// `ContentBlock::ResourceLink { uri }` whose URI appears in
1225 /// `list_resources` has already been replaced by one or more
1226 /// `ContentBlock::EmbeddedResource` blocks carrying the fetched
1227 /// contents (via `read_resource`). Unknown-URI links pass through
1228 /// untouched — the upstream server may have its own out-of-band
1229 /// resolution path the caller should preserve.
1230 ///
1231 /// Resolving inside `call_tool` (rather than downstream in
1232 /// `call_tool_as_message`) means every consumer of the result
1233 /// sees `EmbeddedResource` shapes uniformly; the stateless
1234 /// `From<ContentBlock>` impl is then enough to convert the whole
1235 /// result to `RichContent` with no further connection work.
1236 async fn call_tool(
1237 &self,
1238 params: &super::tool::CallToolRequestParams,
1239 ) -> Result<super::tool::CallToolResult, super::Error> {
1240 // Mark the connection as deliberately used. Flipped at the top
1241 // of the method (not after success) because even a failed
1242 // `tools/call` may have mutated upstream state — we don't want
1243 // the drop-time orphan-DELETE second-guessing that.
1244 self.any_calls.store(true, Ordering::Relaxed);
1245 if self.mock {
1246 return Ok(super::tool::CallToolResult {
1247 content: vec![super::tool::ContentBlock::Text(
1248 super::tool::TextContent {
1249 text: "mock".to_string(),
1250 annotations: None,
1251 _meta: None,
1252 },
1253 )],
1254 structured_content: None,
1255 is_error: None,
1256 _meta: None,
1257 });
1258 }
1259 let mut result: super::tool::CallToolResult =
1260 self.rpc("tools/call", params, false).await?;
1261
1262 // Build the known-resource URI set for ResourceLink
1263 // resolution. `list_resources` failure → empty set (same
1264 // safe fallback the resolution path used previously).
1265 let known_uris: std::collections::HashSet<String> =
1266 match self.list_resources().await {
1267 Ok(rs) => rs.iter().map(|r| r.uri.clone()).collect(),
1268 Err(_) => std::collections::HashSet::new(),
1269 };
1270
1271 // Walk the blocks, replacing each resolvable ResourceLink
1272 // with one EmbeddedResource per returned ResourceContentsUnion.
1273 // Everything else (Text, Image, Audio, EmbeddedResource,
1274 // unknown-URI ResourceLinks) passes through.
1275 let mut resolved: Vec<super::tool::ContentBlock> =
1276 Vec::with_capacity(result.content.len());
1277 for block in std::mem::take(&mut result.content) {
1278 match block {
1279 super::tool::ContentBlock::ResourceLink(link)
1280 if known_uris.contains(&link.uri) =>
1281 {
1282 let read = self.read_resource(&link.uri).await?;
1283 for contents in read.contents {
1284 resolved.push(
1285 super::tool::ContentBlock::EmbeddedResource(
1286 super::tool::EmbeddedResource {
1287 resource: contents,
1288 // ResourceLink's annotations
1289 // don't have a perfect home on
1290 // EmbeddedResource — both fields
1291 // exist but they describe different
1292 // shapes (the link vs the inlined
1293 // contents). Drop them on the way
1294 // in; the EmbeddedResource is now
1295 // a fresh authoritative block.
1296 annotations: None,
1297 _meta: None,
1298 },
1299 ),
1300 );
1301 }
1302 }
1303 other => resolved.push(other),
1304 }
1305 }
1306 result.content = resolved;
1307 Ok(result)
1308 }
1309
1310 /// Calls a tool and converts the (already-resolved) result into a
1311 /// [`ToolMessage`]. Resource resolution happens inside
1312 /// [`Self::call_tool`] — by the time we get the blocks here every
1313 /// resolvable `ResourceLink` has already been replaced with an
1314 /// `EmbeddedResource`, so the conversion is a pure stateless
1315 /// element-wise map through [`From<ContentBlock> for
1316 /// RichContentPart`](crate::agent::completions::message::RichContentPart).
1317 ///
1318 /// Content-block mapping (handled by the `From` impl):
1319 /// - `text` → text part
1320 /// - `image` → image_url part (data URL)
1321 /// - `audio` → input_audio part
1322 /// - `embedded_resource` (text) → text part
1323 /// - `embedded_resource` (blob, image mime) → image_url part
1324 /// - `embedded_resource` (blob, audio mime) → input_audio part
1325 /// - `embedded_resource` (blob, video mime) → input_video part
1326 /// - `embedded_resource` (blob, other mime) → file part
1327 /// - `resource_link` (unknown URI) → JSON-text fallback
1328 /// (resolvable URIs were already resolved upstream)
1329 async fn call_tool_as_message(
1330 &self,
1331 params: &super::tool::CallToolRequestParams,
1332 tool_call_id: String,
1333 ) -> Result<crate::agent::completions::message::ToolMessage, super::Error>
1334 {
1335 use crate::agent::completions::message::{
1336 RichContentPart, ToolMessage, ToolResponseMetadata,
1337 };
1338
1339 let result = self.call_tool(params).await?;
1340
1341 let parts: Vec<RichContentPart> =
1342 result.content.into_iter().map(Into::into).collect();
1343
1344 // Lossy-decode the MCP `_meta` extension bag into our typed
1345 // `ToolResponseMetadata`. Unknown keys (set by non-objectiveai
1346 // upstreams) are silently dropped. Decoding failure leaves
1347 // metadata as `None`.
1348 let metadata = result._meta.as_ref().and_then(|m| {
1349 serde_json::from_value::<ToolResponseMetadata>(
1350 serde_json::to_value(m).ok()?,
1351 )
1352 .ok()
1353 });
1354
1355 Ok(ToolMessage {
1356 content: parts.into(),
1357 tool_call_id,
1358 metadata,
1359 })
1360 }
1361
1362 /// Sends a `resources/list` RPC call for a single page.
1363 async fn rpc_list_resources(
1364 &self,
1365 cursor: Option<&str>,
1366 ) -> Result<super::resource::ListResourcesResult, super::Error> {
1367 self.rpc(
1368 "resources/list",
1369 &super::resource::ListResourcesRequest {
1370 cursor: cursor.map(String::from),
1371 },
1372 true,
1373 )
1374 .await
1375 }
1376
1377 /// Returns all resources from the server.
1378 ///
1379 /// Same cache-or-refresh semantics as [`Self::list_tools`]: a
1380 /// cleared cache (post-drop or pre-populate) triggers an inline
1381 /// paginate against the upstream.
1382 async fn list_resources(
1383 &self,
1384 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1385 if let Some(cached) = self.resources.read().await.as_ref() {
1386 return cached.clone();
1387 }
1388 self.refresh_resources(None).await;
1389 self.resources
1390 .read()
1391 .await
1392 .as_ref()
1393 .expect("refresh_resources installs Some")
1394 .clone()
1395 }
1396
1397 /// Returns the cached tool list as soon as it differs from `current`,
1398 /// or — if it equals `current` right now — waits up to `timeout` for
1399 /// the cache to change and then returns whatever it sees.
1400 ///
1401 /// An `Err` cache is treated as "different from any caller snapshot"
1402 /// and returned immediately.
1403 ///
1404 /// Concurrency-safe: any number of concurrent subscribers wait on
1405 /// independent `Notified` futures and read the cache through the
1406 /// shared `RwLock`. A timeout that fires alone is not an error — we
1407 /// re-read the cache and return whatever's there.
1408 async fn subscribe_tools(
1409 &self,
1410 current: &[super::tool::Tool],
1411 timeout: Duration,
1412 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1413 // Arm BEFORE reading. `enable()` registers the future in the
1414 // wait queue without polling, so a `notify_waiters` racing
1415 // between our read and our await still wakes us.
1416 let notified = self.tools_changed.notified();
1417 tokio::pin!(notified);
1418 notified.as_mut().enable();
1419
1420 // `list_tools` handles a cleared cache (post-drop) by
1421 // paginating inline. A `None` initial state can't be
1422 // compared to the caller's snapshot meaningfully — promote
1423 // to whatever the refresh installs.
1424 let initial = self.list_tools().await;
1425 match &initial {
1426 Ok(arc) if arc.as_slice() == current => {}
1427 _ => return initial,
1428 }
1429
1430 let _ = tokio::time::timeout(timeout, notified).await;
1431
1432 self.list_tools().await
1433 }
1434
1435 /// Resource counterpart of [`Self::subscribe_tools`].
1436 async fn subscribe_resources(
1437 &self,
1438 current: &[super::resource::Resource],
1439 timeout: Duration,
1440 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1441 let notified = self.resources_changed.notified();
1442 tokio::pin!(notified);
1443 notified.as_mut().enable();
1444
1445 let initial = self.list_resources().await;
1446 match &initial {
1447 Ok(arc) if arc.as_slice() == current => {}
1448 _ => return initial,
1449 }
1450
1451 let _ = tokio::time::timeout(timeout, notified).await;
1452
1453 self.list_resources().await
1454 }
1455
1456 /// Reads a resource from the MCP server.
1457 async fn read_resource(
1458 &self,
1459 uri: &str,
1460 ) -> Result<super::resource::ReadResourceResult, super::Error> {
1461 // Mark the connection as deliberately used (same reasoning as
1462 // `call_tool` — see the drop-time orphan-DELETE gate).
1463 self.any_calls.store(true, Ordering::Relaxed);
1464 self.rpc(
1465 "resources/read",
1466 &super::resource::ReadResourceRequestParams {
1467 uri: uri.to_string(),
1468 },
1469 true,
1470 )
1471 .await
1472 }
1473
1474 /// Re-fetches all tools from the server, replacing the cached list.
1475 ///
1476 /// Optionally fires `on_change` *after* the write lock is acquired but
1477 /// *before* the network paginate begins, so the callback observes the
1478 /// "list change is in flight" edge — readers blocked on the read lock
1479 /// won't return until the new list lands. The proxy uses this to
1480 /// re-emit `notifications/tools/list_changed` to its downstream client
1481 /// at the moment the staleness window opens.
1482 async fn refresh_tools(&self, on_change: Option<ListChangedCallback>) {
1483 // Listener-driven refresh. Visibility contract: any caller
1484 // that issues `list_tools()` after a `tools/list_changed`
1485 // notification has been observed must see the post-swap
1486 // value, not stale data — so the write lock has to gate
1487 // readers across the upstream paginate.
1488 //
1489 // Performance contract: don't serialise paginate *behind*
1490 // lock-acquisition latency. We start `tools.write()` and the
1491 // upstream paginate **concurrently** with `tokio::join!`. The
1492 // write-lock acquire blocks new `list_tools()` readers
1493 // immediately (preserving visibility) and runs in parallel
1494 // with whatever drain time the in-flight readers need; the
1495 // paginate runs alongside. Total wall-clock is
1496 // `max(drain_time, paginate_time)` instead of the sum.
1497 //
1498 // `notify_waiters` and `on_change` fire under the write
1499 // guard, *after* `*guard = result`, so anyone awoken by them
1500 // queues on the read lock, waits for the guard to drop, and
1501 // observes the post-swap state.
1502 let (mut guard, result) =
1503 tokio::join!(self.tools.write(), self.paginate_tools(),);
1504 *guard = Some(result);
1505 self.tools_changed.notify_waiters();
1506 if let Some(cb) = on_change {
1507 cb();
1508 }
1509 }
1510
1511 /// Page-by-page fetch of the upstream tool list, no locks held.
1512 /// Shared between the `_signaling` (initial-populate, holds lock
1513 /// for the original "block fast readers" contract) and `refresh_*`
1514 /// (listener-driven, lock-only-around-install) variants.
1515 async fn paginate_tools(
1516 &self,
1517 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1518 let mut all_tools = Vec::new();
1519 let mut cursor: Option<String> = None;
1520 loop {
1521 match self.rpc_list_tools(cursor.as_deref()).await {
1522 Ok(page) => {
1523 all_tools.extend(page.tools);
1524 cursor = page.next_cursor;
1525 if cursor.is_none() {
1526 return Ok(Arc::new(all_tools));
1527 }
1528 }
1529 Err(e) => return Err(Arc::new(e)),
1530 }
1531 }
1532 }
1533
1534 /// Same as [`Self::refresh_tools`] but fires `lock_held` once the
1535 /// write lock has been acquired so the caller can synchronise on
1536 /// "writer is in possession of the cache" before returning. Used by
1537 /// `ConnectionInner::new` to prevent a fast reader from acquiring
1538 /// the read lock before this writer has even started.
1539 async fn refresh_tools_signaling(
1540 &self,
1541 lock_held: tokio::sync::oneshot::Sender<()>,
1542 on_change: Option<ListChangedCallback>,
1543 ) {
1544 let mut guard = self.tools.write().await;
1545 // Fire `tools_changed` while we hold the write lock and *before*
1546 // installing the new list. Any subscriber woken now must take a
1547 // read lock to observe the result, and that read lock is queued
1548 // behind this write guard — so they always see the post-swap
1549 // state, never mid-swap.
1550 self.tools_changed.notify_waiters();
1551 let _ = lock_held.send(());
1552 if let Some(cb) = on_change {
1553 cb();
1554 }
1555 let mut all_tools = Vec::new();
1556 let mut cursor: Option<String> = None;
1557 let result = loop {
1558 match self.rpc_list_tools(cursor.as_deref()).await {
1559 Ok(page) => {
1560 all_tools.extend(page.tools);
1561 cursor = page.next_cursor;
1562 if cursor.is_none() {
1563 break Ok(Arc::new(all_tools));
1564 }
1565 }
1566 Err(e) => break Err(Arc::new(e)),
1567 }
1568 };
1569 *guard = Some(result);
1570 }
1571
1572 /// Re-fetches all resources from the server, replacing the cached list.
1573 /// See [`ConnectionInner::refresh_tools`] for the callback timing
1574 /// contract.
1575 async fn refresh_resources(&self, on_change: Option<ListChangedCallback>) {
1576 // Same paginate-while-acquiring-the-write-lock pattern as
1577 // `refresh_tools` — see that comment for the visibility +
1578 // performance rationale.
1579 let (mut guard, result) =
1580 tokio::join!(self.resources.write(), self.paginate_resources(),);
1581 *guard = Some(result);
1582 self.resources_changed.notify_waiters();
1583 if let Some(cb) = on_change {
1584 cb();
1585 }
1586 }
1587
1588 async fn paginate_resources(
1589 &self,
1590 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1591 let mut all_resources = Vec::new();
1592 let mut cursor: Option<String> = None;
1593 loop {
1594 match self.rpc_list_resources(cursor.as_deref()).await {
1595 Ok(page) => {
1596 all_resources.extend(page.resources);
1597 cursor = page.next_cursor;
1598 if cursor.is_none() {
1599 return Ok(Arc::new(all_resources));
1600 }
1601 }
1602 Err(e) => return Err(Arc::new(e)),
1603 }
1604 }
1605 }
1606
1607 /// Resource counterpart of [`Self::refresh_tools_signaling`].
1608 async fn refresh_resources_signaling(
1609 &self,
1610 lock_held: tokio::sync::oneshot::Sender<()>,
1611 on_change: Option<ListChangedCallback>,
1612 ) {
1613 let mut guard = self.resources.write().await;
1614 // See `refresh_tools_signaling` — fire under the write lock,
1615 // before install, so subscribers' next read sees the post-swap
1616 // state.
1617 self.resources_changed.notify_waiters();
1618 let _ = lock_held.send(());
1619 if let Some(cb) = on_change {
1620 cb();
1621 }
1622 let mut all_resources = Vec::new();
1623 let mut cursor: Option<String> = None;
1624 let result = loop {
1625 match self.rpc_list_resources(cursor.as_deref()).await {
1626 Ok(page) => {
1627 all_resources.extend(page.resources);
1628 cursor = page.next_cursor;
1629 if cursor.is_none() {
1630 break Ok(Arc::new(all_resources));
1631 }
1632 }
1633 Err(e) => break Err(Arc::new(e)),
1634 }
1635 };
1636 *guard = Some(result);
1637 }
1638
1639 /// Builds a GET request to the MCP endpoint for receiving server
1640 /// notifications via SSE.
1641 async fn get(&self) -> reqwest::RequestBuilder {
1642 self.http_client
1643 .get(&self.url)
1644 .headers(
1645 self.build_request_headers(None, Some("text/event-stream"))
1646 .await,
1647 )
1648 }
1649
1650 /// Listens for `notifications/tools/list_changed` and
1651 /// `notifications/resources/list_changed` on an SSE stream. On each
1652 /// notification, write-locks and re-fetches the full list.
1653 ///
1654 /// `initial_lines` is the pre-opened SSE line reader handed in by
1655 /// [`Client::connect`](super::Client::connect) — that stream is
1656 /// consumed first. When it ends (or any later GET reconnect ends),
1657 /// we sleep `backoff_initial_interval` and open a fresh GET `/` SSE
1658 /// stream.
1659 ///
1660 /// Takes a [`Weak<Self>`] (not `Arc<Self>`) so the spawned task
1661 /// doesn't itself keep the [`Connection`] alive, and a
1662 /// [`CancellationToken`] sibling clone of the connection's
1663 /// [`DropGuard`] so the task tears down the instant the last
1664 /// external `Arc<ConnectionInner>` clone is dropped — every
1665 /// blocking await (line read, reconnect send, backoff sleep) is
1666 /// raced against `cancel.cancelled()` and exits without any zombie
1667 /// retries against a now-dead session.
1668 async fn listen_for_list_changes(
1669 weak: Weak<Self>,
1670 cancel: CancellationToken,
1671 initial_lines: super::LinesStream,
1672 ) {
1673 // First iteration: use the pre-opened SSE stream the client
1674 // handed us. After that, fall back to opening fresh GET / SSE
1675 // streams as the upstream connection cycles.
1676 let mut next_lines: Option<super::LinesStream> = Some(initial_lines);
1677 // One-shot guard for the catch-up refresh: false on the very
1678 // first iteration (the caller's pre-opened SSE stream — its
1679 // associated cache was just populated by `Client::connect`'s
1680 // initial pagination, so re-fetching there would just be a
1681 // wasted round-trip), true thereafter. Every stream end —
1682 // whether `Ok(None)` (clean close) or `Err(_)` (read failure)
1683 // — drops back here, which we treat as an implicit
1684 // list-changed notification: the upstream's broadcast (in
1685 // particular the proxy's per-session `tokio::broadcast`) is
1686 // lossy for moments when this listener has zero active
1687 // subscribers, so anything that fired during our disconnect
1688 // window may have been dropped.
1689 //
1690 // ORDER MATTERS. The refresh must run AFTER we've re-opened
1691 // the GET / SSE stream — i.e. after we're a subscriber again
1692 // — and BEFORE we enter the inner read loop. If we refreshed
1693 // before the resubscribe, a notification that fired between
1694 // our refresh-completion and our subscribe would be lost the
1695 // same way as the original disconnect-window drops; doing it
1696 // after means a notification fired DURING the refresh lands
1697 // in the new subscriber's buffer (broadcast::Sender::send
1698 // backs onto each receiver's channel-capacity slot) and gets
1699 // consumed by the inner loop on its next read.
1700 let mut is_reconnect = false;
1701
1702 loop {
1703 // The token cancels deterministically when the last
1704 // `Arc<ConnectionInner>` clone is dropped (see
1705 // `_listener_cancel_guard`). Check once per outer
1706 // iteration, but the real protection is the cancel arms in
1707 // every blocking await below — those exit immediately on
1708 // cancel.
1709 if cancel.is_cancelled() {
1710 return;
1711 }
1712 let Some(this) = weak.upgrade() else { return };
1713 let backoff_delay = this.backoff_initial_interval;
1714
1715 let mut lines = match next_lines.take() {
1716 Some(l) => l,
1717 None => {
1718 // Race the upstream GET against cancellation — if
1719 // the connection drops mid-reconnect, exit
1720 // immediately rather than waiting for the request
1721 // to complete or time out (otherwise produces a
1722 // burst of 401 retries against a now-dead session
1723 // under heavy churn).
1724 let send_outcome = tokio::select! {
1725 out = async {
1726 this.get().await.send().await
1727 } => out,
1728 _ = cancel.cancelled() => {
1729 drop(this);
1730 return;
1731 }
1732 };
1733 let response = match send_outcome {
1734 Ok(r) if r.status().is_success() => r,
1735 _ => {
1736 drop(this);
1737 // Sleep with cancel-arm: instant exit on
1738 // drop, no zombie retries.
1739 tokio::select! {
1740 _ = tokio::time::sleep(backoff_delay) => {}
1741 _ = cancel.cancelled() => return,
1742 }
1743 continue;
1744 }
1745 };
1746 super::lines_from_response(response)
1747 }
1748 };
1749
1750 // Catch-up refresh on every reconnect — the implicit
1751 // list-changed treatment for the just-failed stream. See
1752 // the `is_reconnect` doc-comment above for the
1753 // refresh-AFTER-resubscribe rationale.
1754 if is_reconnect {
1755 // tools and resources are independent locks; run the
1756 // catch-up refreshes concurrently so disconnect
1757 // recovery isn't sequential.
1758 let _ = tokio::join!(
1759 this.refresh_tools(this.on_tools_list_changed.get()),
1760 this.refresh_resources(
1761 this.on_resources_list_changed.get()
1762 ),
1763 );
1764 }
1765 is_reconnect = true;
1766
1767 'inner: loop {
1768 tokio::select! {
1769 line_result = lines.next_line() => {
1770 match line_result {
1771 Ok(Some(line)) => {
1772 // SSE data lines start with "data: ".
1773 let Some(data) = line.strip_prefix("data: ") else {
1774 continue 'inner;
1775 };
1776 let method = match serde_json::from_str::<super::JsonRpcNotification>(data) {
1777 Ok(n) => n.method,
1778 Err(_) => continue 'inner,
1779 };
1780 match method.as_str() {
1781 "notifications/tools/list_changed" => {
1782 // refresh_tools fires the
1783 // callback after the cache is
1784 // installed, so the proxy's
1785 // downstream
1786 // notifications/tools/list_changed
1787 // emission lines up with the
1788 // staleness window opening.
1789 this.refresh_tools(
1790 this.on_tools_list_changed.get(),
1791 )
1792 .await;
1793 }
1794 "notifications/resources/list_changed" => {
1795 this.refresh_resources(
1796 this.on_resources_list_changed.get(),
1797 )
1798 .await;
1799 }
1800 _ => {}
1801 }
1802 }
1803 // Stream ended cleanly or errored — break out
1804 // to the outer loop so we either reconnect or,
1805 // if everyone's gone, exit at the top.
1806 _ => break 'inner,
1807 }
1808 }
1809 // Cancellation: the connection's last clone has
1810 // dropped. Tear down immediately.
1811 _ = cancel.cancelled() => {
1812 drop(this);
1813 return;
1814 }
1815 }
1816 }
1817
1818 // Stream dropped — empty the per-Connection caches so the
1819 // next `list_tools` / `list_resources` paginates inline
1820 // against the (possibly still-dead) upstream rather than
1821 // returning whatever was cached before the drop. The
1822 // `is_reconnect` catch-up at the top of the next outer
1823 // iteration will repopulate `Some(_)` if the reconnect
1824 // succeeds; if the reconnect keeps failing, the cache
1825 // stays `None` and `list_*` callers paginate themselves.
1826 *this.tools.write().await = None;
1827 *this.resources.write().await = None;
1828
1829 // Stream ended — drop the strong ref before sleeping so the
1830 // next iteration's weak-upgrade can detect liveness honestly.
1831 drop(this);
1832 tokio::select! {
1833 _ = tokio::time::sleep(backoff_delay) => {}
1834 _ = cancel.cancelled() => return,
1835 }
1836 }
1837 }
1838}
1839
1840impl Drop for ConnectionInner {
1841 /// Orphan-DELETE hook: when a **freshly-minted** connection (not a
1842 /// resume) is dropped without any deliberate use — no `call_tool`,
1843 /// no `read_resource`, no explicit `Connection::delete` — spawn a
1844 /// fire-and-forget HTTP DELETE so the upstream session we just
1845 /// opened doesn't sit there accruing per-session state for nothing.
1846 ///
1847 /// Reconnect-resumes are deliberately excluded: a reconnect's
1848 /// `any_calls == false` only means *this* `Connection` instance
1849 /// never called anything — the underlying upstream session may
1850 /// well have been used by an earlier `Connection` that opened it,
1851 /// did real work, and let us re-attach. Tearing it down here would
1852 /// kill a still-live session out from under whoever owns it.
1853 /// Reconnects rely on the proxy's explicit `Connection::delete`
1854 /// (or `Client::delete`) for upstream cleanup instead.
1855 ///
1856 /// Skip conditions (any one triggers a no-op):
1857 ///
1858 /// - `mock` is `true` — there was never an HTTP session to begin with.
1859 /// - `is_reconnect` is `true` — see above; the upstream session
1860 /// pre-existed this connection and isn't ours to tear down on drop.
1861 /// - `any_calls` is `true` — the connection was deliberately used,
1862 /// or an explicit `Connection::delete` already ran.
1863 /// - No tokio runtime is in scope — `tokio::spawn` would panic.
1864 /// Silently leak the upstream session in this case (sync
1865 /// teardown paths e.g. `cfg(test)` blocks not driven by tokio).
1866 ///
1867 /// The listener-cancel `DropGuard` inside `_listener_cancel_guard`
1868 /// fires automatically as part of this same `drop` call, so by the
1869 /// time the orphan DELETE goes out the listener task has already
1870 /// been told to cancel — no SSE/GET race with the upstream DELETE.
1871 fn drop(&mut self) {
1872 if self.mock {
1873 return;
1874 }
1875 if self.is_reconnect {
1876 return;
1877 }
1878 if self.any_calls.load(Ordering::Relaxed) {
1879 return;
1880 }
1881
1882 // Clone out the bits the orphan task needs. None of these are
1883 // big: `reqwest::Client` is itself an `Arc` bump,
1884 // `IndexMap<String, String>` is the per-connection header bag
1885 // (small), and the `String`s are the per-session id + URL.
1886 let http_client = self.http_client.clone();
1887 let url = self.url.clone();
1888 let session_id = self.session_id.clone();
1889 let headers = self.headers.clone();
1890 let timeout = self.call_timeout;
1891
1892 // Spawn only if a tokio runtime is in scope. `tokio::spawn`
1893 // panics outside one — sync teardown paths (e.g. a test that
1894 // builds a `Connection` and lets it drop on a non-async stack)
1895 // would crash without this guard.
1896 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1897 handle.spawn(orphan_delete(
1898 http_client,
1899 url,
1900 session_id,
1901 headers,
1902 timeout,
1903 ));
1904 }
1905 }
1906}
1907
1908/// Fire-and-forget HTTP `DELETE` used by [`ConnectionInner::drop`] to
1909/// release a resumed upstream session that was never used. Mirrors
1910/// [`super::Connection::delete`]'s wire shape (same `Mcp-Session-Id`
1911/// header behavior, same header-loop with the explicit session id
1912/// winning over any `Mcp-Session-Id` entry in `headers`) but never
1913/// surfaces errors — there's no caller left to surface them to. The
1914/// `timeout` (sourced from the originating connection's `call_timeout`)
1915/// caps the request so a hanging upstream can't keep the spawned task
1916/// alive forever.
1917async fn orphan_delete(
1918 http_client: reqwest::Client,
1919 url: String,
1920 session_id: String,
1921 headers: IndexMap<String, String>,
1922 timeout: Duration,
1923) {
1924 let mut request = http_client
1925 .delete(&url)
1926 .timeout(timeout)
1927 .header("Mcp-Session-Id", &session_id);
1928 for (name, value) in &headers {
1929 if name.eq_ignore_ascii_case("Mcp-Session-Id") {
1930 continue;
1931 }
1932 request = request.header(name, value);
1933 }
1934 // Errors silently swallowed: no caller, and the listener-cancel
1935 // guard has already fired (it runs as part of the regular
1936 // `_listener_cancel_guard` drop inside the same `drop` call).
1937 let _ = request.send().await;
1938}
1939
#[cfg(test)]
mod subscribe_tests {
    use super::*;
    use crate::mcp::tool::{Tool, ToolSchemaObject, ToolSchemaType};

    /// Minimal [`Tool`] fixture: only `name` varies, every optional
    /// field is `None` and the input schema is an empty object schema.
    fn tool(name: &str) -> Tool {
        let input_schema = ToolSchemaObject {
            r#type: ToolSchemaType::Object,
            properties: None,
            required: None,
            extra: IndexMap::new(),
        };
        Tool {
            name: name.to_string(),
            title: None,
            description: None,
            icons: None,
            input_schema,
            output_schema: None,
            annotations: None,
            execution: None,
            _meta: None,
        }
    }

    /// Test connection whose tools cache is pre-seeded with an `Ok`
    /// list containing one fixture tool per entry of `names`.
    async fn connection_with_tools(names: &[&str]) -> Connection {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        let seeded: Vec<Tool> = names.iter().copied().map(tool).collect();
        *conn.inner.tools.write().await = Some(Ok(Arc::new(seeded)));
        conn
    }

    /// The cached list already differs from the caller's snapshot, so
    /// the call must come back right away with the cached list.
    #[tokio::test]
    async fn subscribe_tools_returns_immediately_when_cache_differs() {
        let conn = connection_with_tools(&["a"]).await;

        let started = std::time::Instant::now();
        let snapshot = [tool("b")];
        let observed = conn
            .subscribe_tools(&snapshot, Duration::from_secs(5))
            .await
            .unwrap();
        assert!(started.elapsed() < Duration::from_millis(100));
        assert_eq!(observed.as_slice(), &[tool("a")]);
    }

    /// A cached `Err` counts as "different from any caller snapshot"
    /// and is returned without waiting.
    #[tokio::test]
    async fn subscribe_tools_returns_err_immediately() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        let cached_failure = super::super::Error::NoSessionId {
            url: "http://x".into(),
            body: String::new(),
        };
        *conn.inner.tools.write().await = Some(Err(Arc::new(cached_failure)));

        let started = std::time::Instant::now();
        let outcome = conn.subscribe_tools(&[], Duration::from_secs(5)).await;
        assert!(started.elapsed() < Duration::from_millis(100));
        assert!(outcome.is_err());
    }

    /// Cache equals the snapshot; a writer then fires the notify while
    /// holding the write lock and installs a new list. The subscriber
    /// wakes, its re-read queues behind the writer's guard, and it
    /// observes the new list.
    #[tokio::test]
    async fn subscribe_tools_wakes_on_change_and_reads_post_swap() {
        let conn = connection_with_tools(&["a"]).await;

        let waiter_conn = conn.clone();
        let waiter = tokio::spawn(async move {
            let snapshot = [tool("a")];
            waiter_conn
                .subscribe_tools(&snapshot, Duration::from_secs(5))
                .await
                .unwrap()
        });

        // Let the subscriber arm `notified()` and complete its first
        // read so it is parked on the timeout.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Writer side: take the write lock, fire `tools_changed`
        // *while holding* it, then install the new value before
        // releasing the lock.
        {
            let mut cache = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            // Keep the guard a little longer so the subscriber really
            // is contending for the read lock when we release.
            tokio::time::sleep(Duration::from_millis(20)).await;
            *cache = Some(Ok(Arc::new(vec![tool("b")])));
        }

        let observed = tokio::time::timeout(Duration::from_secs(2), waiter)
            .await
            .expect("subscriber returned in time")
            .expect("subscriber didn't panic");
        assert_eq!(observed.as_slice(), &[tool("b")]);
    }

    /// Cache equals the snapshot and nothing ever notifies: the call
    /// times out and hands back the unchanged list (not an error).
    #[tokio::test]
    async fn subscribe_tools_times_out_and_returns_unchanged_list() {
        let conn = connection_with_tools(&["a"]).await;

        let started = std::time::Instant::now();
        let snapshot = [tool("a")];
        let observed = conn
            .subscribe_tools(&snapshot, Duration::from_millis(50))
            .await
            .unwrap();
        let elapsed = started.elapsed();
        assert!(elapsed >= Duration::from_millis(40), "elapsed: {elapsed:?}");
        assert!(elapsed < Duration::from_millis(500), "elapsed: {elapsed:?}");
        assert_eq!(observed.as_slice(), &[tool("a")]);
    }

    /// A single `notify_waiters` wakes two concurrent subscribers and
    /// both of them see the post-swap list.
    #[tokio::test]
    async fn subscribe_tools_supports_concurrent_subscribers() {
        let conn = connection_with_tools(&["a"]).await;

        let [first, second] = [conn.clone(), conn.clone()].map(|handle| {
            tokio::spawn(async move {
                let snapshot = [tool("a")];
                handle
                    .subscribe_tools(&snapshot, Duration::from_secs(5))
                    .await
                    .unwrap()
            })
        });

        // Give both subscribers time to park.
        tokio::time::sleep(Duration::from_millis(50)).await;

        {
            let mut cache = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            *cache = Some(Ok(Arc::new(vec![tool("c")])));
        }

        let (first_done, second_done) = tokio::join!(first, second);
        let first_list = first_done.unwrap();
        let second_list = second_done.unwrap();
        assert_eq!(first_list.as_slice(), &[tool("c")]);
        assert_eq!(second_list.as_slice(), &[tool("c")]);
    }
}
2088
#[cfg(test)]
mod drain_notifications_tests {
    use super::*;
    use crate::mcp::tool::{ContentBlock, TextContent};
    use serde_json::json;
    use wiremock::matchers::{header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Starts a mock server that answers `GET /notify` with `reply`.
    async fn notify_server(reply: ResponseTemplate) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/notify"))
            .respond_with(reply)
            .mount(&server)
            .await;
        server
    }

    /// Happy path: the proxy answers with `[text, text]`; both entries
    /// parse as `ContentBlock::Text` and keep their order.
    #[tokio::test]
    async fn drain_notifications_parses_text_blocks_in_order() {
        let payload = json!([
            {"type": "text", "text": "first"},
            {"type": "text", "text": "second"},
        ]);
        let server = MockServer::start().await;
        // This mock additionally requires an empty `Mcp-Session-Id`
        // header on the request.
        Mock::given(method("GET"))
            .and(path("/notify"))
            .and(header("Mcp-Session-Id", ""))
            .respond_with(ResponseTemplate::new(200).set_body_json(payload))
            .mount(&server)
            .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("drain ok");
        assert_eq!(blocks.len(), 2);
        for (block, expected) in blocks.iter().zip(["first", "second"]) {
            let ContentBlock::Text(TextContent { text, .. }) = block else {
                panic!("expected text, got {block:?}");
            };
            assert_eq!(text, expected);
        }
    }

    /// A 404 (proxy lost the session, e.g. after a restart) yields an
    /// empty vec instead of an error. The next upstream call surfaces
    /// the session-lost condition through its own error path;
    /// init-time drain shouldn't be the one to abort the request.
    #[tokio::test]
    async fn drain_notifications_404_returns_empty() {
        let server = notify_server(ResponseTemplate::new(404)).await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("404 → ok(empty)");
        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
    }

    /// Empty queue → empty JSON array → empty vec. The most common case.
    #[tokio::test]
    async fn drain_notifications_empty_queue_returns_empty() {
        let server =
            notify_server(ResponseTemplate::new(200).set_body_json(json!([]))).await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("drain ok");
        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
    }

    /// Any status that is neither success nor 404 comes back as
    /// `BadStatus` carrying the status code and response body.
    #[tokio::test]
    async fn drain_notifications_5xx_returns_bad_status() {
        let server =
            notify_server(ResponseTemplate::new(500).set_body_string("boom")).await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let failure = conn.drain_notifications().await.expect_err("5xx → err");
        match failure {
            super::super::Error::BadStatus { code, body, .. } => {
                assert_eq!(code.as_u16(), 500);
                assert_eq!(body, "boom");
            }
            other => panic!("expected BadStatus, got {other:?}"),
        }
    }

    /// Mock connections never hit the network and always drain empty.
    #[tokio::test]
    async fn drain_notifications_mock_returns_empty() {
        let conn = Connection::new_mock("http://does-not-matter".into());
        let blocks = conn.drain_notifications().await.expect("mock ok");
        assert!(blocks.is_empty());
    }
}
2191
#[cfg(test)]
mod has_pending_notifications_tests {
    use super::*;
    use serde_json::json;
    use wiremock::matchers::{header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Starts a mock server that answers `GET /notify/queued` with
    /// `reply`.
    async fn queued_server(reply: ResponseTemplate) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/notify/queued"))
            .respond_with(reply)
            .mount(&server)
            .await;
        server
    }

    /// Happy path: the proxy answers `true` → `Ok(true)`.
    #[tokio::test]
    async fn has_pending_notifications_true() {
        let server = MockServer::start().await;
        // This mock additionally requires an empty `Mcp-Session-Id`
        // header on the request.
        Mock::given(method("GET"))
            .and(path("/notify/queued"))
            .and(header("Mcp-Session-Id", ""))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!(true)))
            .mount(&server)
            .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let pending = conn.has_pending_notifications().await.expect("peek ok");
        assert!(pending);
    }

    /// The proxy answers `false` → `Ok(false)`.
    #[tokio::test]
    async fn has_pending_notifications_false() {
        let server =
            queued_server(ResponseTemplate::new(200).set_body_json(json!(false))).await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let pending = conn.has_pending_notifications().await.expect("peek ok");
        assert!(!pending);
    }

    /// A 404 (proxy lost the session) → `Ok(false)`. Same soft-fallback
    /// contract as `drain_notifications` — peek must never abort a
    /// request over a missing-session race.
    #[tokio::test]
    async fn has_pending_notifications_404_returns_false() {
        let server = queued_server(ResponseTemplate::new(404)).await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let pending = conn
            .has_pending_notifications()
            .await
            .expect("404 → ok(false)");
        assert!(!pending);
    }

    /// Any status that is neither success nor 404 comes back as
    /// `BadStatus` carrying the status code and response body.
    #[tokio::test]
    async fn has_pending_notifications_5xx_returns_bad_status() {
        let server =
            queued_server(ResponseTemplate::new(500).set_body_string("boom")).await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let failure = conn
            .has_pending_notifications()
            .await
            .expect_err("5xx → err");
        match failure {
            super::super::Error::BadStatus { code, body, .. } => {
                assert_eq!(code.as_u16(), 500);
                assert_eq!(body, "boom");
            }
            other => panic!("expected BadStatus, got {other:?}"),
        }
    }

    /// Mock connections never hit the network and always report false.
    #[tokio::test]
    async fn has_pending_notifications_mock_returns_false() {
        let conn = Connection::new_mock("http://does-not-matter".into());
        let pending = conn.has_pending_notifications().await.expect("mock ok");
        assert!(!pending);
    }
}