objectiveai_sdk/mcp/connection.rs
1//! MCP connection for communicating with an MCP server.
2//!
3//! [`Connection`] is a cheaply-clonable handle around an internal
4//! [`ConnectionInner`]. The last drop of the inner `Arc` runs
5//! [`ConnectionInner`]'s `Drop`, which cancels the listener task's
6//! [`tokio_util::sync::CancellationToken`] (held in `_listener_cancel_guard`
7//! as a [`tokio_util::sync::DropGuard`]) — the SSE listener exits the
8//! instant any in-flight reconnect, sleep, or read is cancelled, with no
9//! zombie 401 retries against a now-dead proxy session.
10
11use std::ops::Deref;
12use std::sync::{Arc, RwLock as StdRwLock, Weak};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::Duration;
15
16use indexmap::IndexMap;
17use tokio::sync::{Notify, RwLock};
18use tokio_util::sync::{CancellationToken, DropGuard};
19
/// Callback fired by [`Connection`] when the upstream MCP server emits
/// `notifications/tools/list_changed` or `notifications/resources/list_changed`.
///
/// **Timing:** runs after the corresponding cache's write lock is taken
/// but *before* the network paginate that replaces it. That ordering
/// matches the moment the staleness window opens — anyone blocked on the
/// read lock won't return until the new list lands. The callback should
/// not call back into `list_tools` / `list_resources`: doing so would
/// re-take the lock the listener already holds and deadlock.
///
/// Stored behind an `Arc` so the listener task can cheaply clone it out
/// of the lock and call it without holding the read guard.
pub type ListChangedCallback = Arc<dyn Fn() + Send + Sync + 'static>;

/// A registered-or-not callback slot. Wrapper so [`ConnectionInner`] can
/// keep `#[derive(Debug)]` (a raw `dyn Fn` isn't `Debug`).
///
/// The slot never panics on a poisoned lock: the protected value is a
/// plain `Option<Arc<..>>` that is always left in a valid state, so every
/// accessor recovers the guard from the `PoisonError` instead of unwrapping.
struct CallbackSlot(StdRwLock<Option<ListChangedCallback>>);

impl CallbackSlot {
    /// Creates an empty slot (no callback registered).
    fn new() -> Self {
        Self(StdRwLock::new(None))
    }

    /// Installs `callback`, replacing any previously registered one.
    ///
    /// The replaced callback is dropped only *after* the write guard has
    /// been released. Dropping it may run arbitrary `Drop` code of the
    /// closure's captured state; doing that under the lock could poison
    /// the slot (if it panics) or deadlock (if it touches the slot again).
    fn set(&self, callback: ListChangedCallback) {
        let mut slot = self
            .0
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let previous = slot.replace(callback);
        drop(slot);
        drop(previous);
    }

    /// Cheap clone-out of the current callback (if any). The `Arc` clone
    /// lets us release the read guard before invoking the callback.
    fn get(&self) -> Option<ListChangedCallback> {
        self.0
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clone()
    }
}

impl std::fmt::Debug for CallbackSlot {
    /// Prints only whether a callback is registered; the closure itself
    /// has no useful `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let set = self
            .0
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .is_some();
        f.debug_struct("CallbackSlot").field("set", &set).finish()
    }
}
60
61/// An active connection to an MCP server using the Streamable HTTP transport.
62///
63/// Cheaply clonable (one `Arc` bump). When the last clone is dropped, the
64/// inner `Arc` ref count hits zero, [`ConnectionInner::Drop`] runs, the
65/// listener-cancel `DropGuard` is dropped, and the SSE listener task is
66/// cancelled — exiting any in-flight `lines.next_line()`, reconnect
67/// `send()`, or backoff `sleep` *immediately* without retrying against
68/// the now-dead proxy session.
69///
70/// Use the public methods (`list_tools`, `call_tool`, `list_resources`,
71/// `read_resource`, `call_tool_as_message`, `tool_key`) for the upstream
72/// MCP protocol surface. The inner state ([`ConnectionInner`]) is also
73/// reachable via `Deref` for read-only field access (e.g.
74/// `connection.url`, `connection.initialize_result.server_info.name`),
75/// but its methods are private — you must go through `Connection`.
76#[derive(Debug)]
77pub struct Connection {
78 inner: Arc<ConnectionInner>,
79}
80
81impl Clone for Connection {
82 fn clone(&self) -> Self {
83 Self { inner: Arc::clone(&self.inner) }
84 }
85}
86
87// No `Drop` for `Connection`: cancellation happens deterministically
88// when the last `Arc<ConnectionInner>` clone is dropped, which runs
89// `ConnectionInner::drop` and releases the cancel-token DropGuard.
90
91impl Deref for Connection {
92 type Target = ConnectionInner;
93 fn deref(&self) -> &ConnectionInner {
94 &self.inner
95 }
96}
97
98impl Connection {
99 pub(super) async fn new(
100 http_client: reqwest::Client,
101 url: String,
102 session_id: String,
103 headers: IndexMap<String, String>,
104 backoff_current_interval: Duration,
105 backoff_initial_interval: Duration,
106 backoff_randomization_factor: f64,
107 backoff_multiplier: f64,
108 backoff_max_interval: Duration,
109 backoff_max_elapsed_time: Duration,
110 call_timeout: Duration,
111 initialize_result: super::initialize_result::InitializeResult,
112 initial_sse_lines: Option<super::LinesStream>,
113 ) -> Self {
114 let inner = ConnectionInner::new(
115 http_client,
116 url,
117 session_id,
118 headers,
119 backoff_current_interval,
120 backoff_initial_interval,
121 backoff_randomization_factor,
122 backoff_multiplier,
123 backoff_max_interval,
124 backoff_max_elapsed_time,
125 call_timeout,
126 initialize_result,
127 initial_sse_lines,
128 )
129 .await;
130 Self { inner }
131 }
132
133
134 pub(super) fn new_mock(url: String) -> Self {
135 Self { inner: ConnectionInner::new_mock(url) }
136 }
137
138 #[cfg(test)]
139 pub(crate) fn new_for_test(name: String, url: String) -> Self {
140 Self { inner: ConnectionInner::new_for_test(name, url) }
141 }
142
143 /// Send a JSON-RPC notification to the upstream. Used by `Client`
144 /// right after `initialize` to send `notifications/initialized`.
145 pub(super) async fn notify<P: serde::Serialize>(
146 &self,
147 method: &str,
148 params: &P,
149 ) -> Result<(), super::Error> {
150 self.inner.notify(method, params).await
151 }
152
153 /// Returns a key identifying this connection for tool namespacing.
154 pub fn tool_key(&self) -> String {
155 self.inner.tool_key()
156 }
157
158 /// Returns the session ID for this connection.
159 pub fn session_id(&self) -> &str {
160 self.inner.session_id()
161 }
162
163 /// Returns all tools from the upstream server.
164 pub async fn list_tools(
165 &self,
166 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
167 self.inner.list_tools().await
168 }
169
170 /// Calls a tool on the upstream server.
171 pub async fn call_tool(
172 &self,
173 params: &super::tool::CallToolRequestParams,
174 ) -> Result<super::tool::CallToolResult, super::Error> {
175 self.inner.call_tool(params).await
176 }
177
178 /// Calls a tool and converts the result into a [`ToolMessage`].
179 pub async fn call_tool_as_message(
180 &self,
181 params: &super::tool::CallToolRequestParams,
182 tool_call_id: String,
183 ) -> Result<
184 crate::agent::completions::message::ToolMessage,
185 super::Error,
186 > {
187 self.inner.call_tool_as_message(params, tool_call_id).await
188 }
189
190 /// Returns all resources from the upstream server.
191 pub async fn list_resources(
192 &self,
193 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
194 self.inner.list_resources().await
195 }
196
197 /// Returns the cached tool list as soon as it differs from `current`,
198 /// or waits up to `timeout` for the next `notifications/tools/list_changed`
199 /// from the upstream server before re-reading.
200 ///
201 /// Wakes the moment a refresh writer takes the cache write lock, so
202 /// the post-wake `read` is guaranteed to observe the new list rather
203 /// than racing against the install. Safe to call from any number of
204 /// tasks concurrently.
205 pub async fn subscribe_tools(
206 &self,
207 current: &[super::tool::Tool],
208 timeout: Duration,
209 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
210 self.inner.subscribe_tools(current, timeout).await
211 }
212
213 /// Resource counterpart of [`Connection::subscribe_tools`].
214 pub async fn subscribe_resources(
215 &self,
216 current: &[super::resource::Resource],
217 timeout: Duration,
218 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
219 self.inner.subscribe_resources(current, timeout).await
220 }
221
222 /// Atomically drain the proxy's `pending_notifications` queue for
223 /// this session via `GET /notify` and return the queued content
224 /// blocks. A second call returns `[]` until the next out-of-band
225 /// `POST /notify`.
226 ///
227 /// Intended for use at the start of an agent turn so notifications
228 /// queued between turns — when the prior turn ended without a tool
229 /// call, or the user is starting a fresh continuation — surface as
230 /// a user message instead of being lost. The proxy's existing
231 /// `tools/call` response path still drains in-flight notifications
232 /// arriving *during* a turn; this method covers the gap between
233 /// turns.
234 ///
235 /// A 404 from the proxy (session unknown — possible after a proxy
236 /// restart) is mapped to an empty `Vec` so callers do not need to
237 /// distinguish "no notifications" from "lost session" at the use
238 /// site; the next upstream call will surface the lost-session
239 /// condition through its own error path.
240 pub async fn drain_notifications(
241 &self,
242 ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
243 self.inner.drain_notifications().await
244 }
245
246 /// Reads a resource from the upstream server.
247 pub async fn read_resource(
248 &self,
249 uri: &str,
250 ) -> Result<super::resource::ReadResourceResult, super::Error> {
251 self.inner.read_resource(uri).await
252 }
253
254 /// Register a callback to fire whenever the upstream emits
255 /// `notifications/tools/list_changed`.
256 ///
257 /// **Timing:** the callback runs *after* the tool cache's write lock
258 /// is acquired but *before* the network paginate that replaces it.
259 /// That means readers blocked on the read lock won't return until the
260 /// new list is in place, and the callback observes the moment the
261 /// staleness window opens. The proxy uses this to emit its own
262 /// `notifications/tools/list_changed` to downstream clients at the
263 /// right instant.
264 ///
265 /// Replaces any previously-registered tools-list-changed callback.
266 /// All clones of this `Connection` share the same callback slot.
267 pub fn set_on_tools_list_changed<F>(&self, callback: F)
268 where
269 F: Fn() + Send + Sync + 'static,
270 {
271 self.inner.on_tools_list_changed.set(Arc::new(callback));
272 }
273
274 /// Register a callback to fire whenever the upstream emits
275 /// `notifications/resources/list_changed`. Same timing contract as
276 /// [`Connection::set_on_tools_list_changed`].
277 ///
278 /// Replaces any previously-registered resources-list-changed callback.
279 /// All clones of this `Connection` share the same callback slot.
280 pub fn set_on_resources_list_changed<F>(&self, callback: F)
281 where
282 F: Fn() + Send + Sync + 'static,
283 {
284 self.inner.on_resources_list_changed.set(Arc::new(callback));
285 }
286}
287
/// The actual connection state. Behind an `Arc` inside [`Connection`].
///
/// Fields are public for read-only access (callers reach them via
/// `Connection`'s `Deref`), but every method on this type is private —
/// the public surface lives on [`Connection`] and delegates through.
///
/// Field order is significant only in the usual Rust sense (fields drop
/// in declaration order); nothing here reorders them.
#[derive(Debug)]
pub struct ConnectionInner {
    /// HTTP client used for every POST and for the `GET /notify` drain.
    pub http_client: reqwest::Client,
    /// Endpoint URL. JSON-RPC traffic is POSTed here; the notification
    /// drain targets `<url>/notify` (trailing `/` trimmed first).
    pub url: String,
    /// Session identifier, sent as the `Mcp-Session-Id` header on every
    /// request this connection builds.
    pub session_id: String,
    /// All HTTP headers stamped on every POST / GET this connection
    /// makes — the same merged map (defaults + caller overrides) the
    /// `Client` built once during connect. The request builders also set
    /// `Mcp-Session-Id`, `Content-Type`, and `Accept` themselves; those
    /// are intended to win over same-named entries in this map.
    /// NOTE(review): whether they actually win depends on how the request
    /// builder combines duplicate header names — confirm.
    pub headers: IndexMap<String, String>,

    // The `backoff_*` fields below are copied verbatim into a fresh
    // `backoff::ExponentialBackoff` each time a retry loop starts.
    /// Interval the retry policy starts from.
    pub backoff_current_interval: Duration,
    /// Initial interval of the retry policy.
    pub backoff_initial_interval: Duration,
    /// Randomization (jitter) factor of the retry policy.
    pub backoff_randomization_factor: f64,
    /// Multiplier applied to the interval between attempts.
    pub backoff_multiplier: f64,
    /// Upper bound on a single retry interval.
    pub backoff_max_interval: Duration,
    /// Total time budget after which the retry loop gives up.
    pub backoff_max_elapsed_time: Duration,
    /// Per-request timeout applied to POSTs and to the `GET /notify` drain.
    pub call_timeout: Duration,

    /// The server's capabilities and info from the initialize response.
    pub initialize_result: super::initialize_result::InitializeResult,

    /// If true, notify / drain / tool-call paths return canned results
    /// without touching the network. Used for mock orchestrator URLs.
    mock: bool,

    /// Auto-incrementing request ID (starts at 2; 1 was used for initialize).
    next_id: AtomicU64,

    /// All tools from the server, populated by background pagination.
    /// Holds either the full list or the error of the last populate.
    tools: RwLock<Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>>>,
    /// All resources from the server, populated by background pagination.
    /// Holds either the full list or the error of the last populate.
    resources:
        RwLock<Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>>>,

    /// Cancellation token for the long-lived `listen_for_list_changes`
    /// task. The listener selects this against every blocking await
    /// (read, reconnect-send, backoff-sleep) and returns the instant it
    /// fires.
    ///
    /// Held inside the connection as a [`DropGuard`] so that the moment
    /// the last `Arc<ConnectionInner>` clone is dropped — i.e. the
    /// moment no external `Connection` handle remains — `Drop` runs on
    /// the guard, the token cancels, and the listener task tears down.
    /// The listener itself holds a sibling `CancellationToken` (clone),
    /// not the guard, so its task does not extend the connection's
    /// lifetime.
    ///
    /// `None` for mock and test connections, which never spawn a listener.
    _listener_cancel_guard: Option<DropGuard>,

    /// Optional callback fired in response to an upstream
    /// `notifications/tools/list_changed`.
    /// Set via [`Connection::set_on_tools_list_changed`].
    ///
    /// NOTE(review): this comment used to say the callback fires *after*
    /// the cache has been refreshed, whereas the docs on
    /// [`ListChangedCallback`] and on the setter say it fires after the
    /// write lock is taken but *before* the paginate. The refresh code
    /// decides which is true — confirm there and keep all three in sync.
    on_tools_list_changed: CallbackSlot,

    /// Optional callback fired in response to an upstream
    /// `notifications/resources/list_changed`.
    /// Set via [`Connection::set_on_resources_list_changed`].
    ///
    /// NOTE(review): same timing question as
    /// [`Self::on_tools_list_changed`] — "after refresh" vs. "after lock,
    /// before paginate"; confirm against the refresh code.
    on_resources_list_changed: CallbackSlot,

    /// Wakes any task awaiting in [`Connection::subscribe_tools`]. Fired
    /// from inside `refresh_tools_signaling` the moment the writer
    /// acquires the cache write lock — *before* the new list is
    /// installed. A woken subscriber's next `read().await` blocks behind
    /// the writer's guard, so it always observes the post-swap state.
    tools_changed: Notify,

    /// Resource counterpart of [`Self::tools_changed`].
    resources_changed: Notify,
}
363
364impl ConnectionInner {
365 /// Creates a mock connection that never makes network requests.
366 /// All RPC calls return empty/default results.
367 fn new_mock(url: String) -> Arc<Self> {
368 Arc::new(Self {
369 http_client: reqwest::Client::new(),
370 url,
371 session_id: String::new(),
372 headers: IndexMap::new(),
373 backoff_current_interval: Duration::ZERO,
374 backoff_initial_interval: Duration::ZERO,
375 backoff_randomization_factor: 0.0,
376 backoff_multiplier: 1.0,
377 backoff_max_interval: Duration::ZERO,
378 backoff_max_elapsed_time: Duration::ZERO,
379 call_timeout: Duration::ZERO,
380 initialize_result: super::initialize_result::InitializeResult {
381 protocol_version: "2025-03-26".into(),
382 capabilities: super::initialize_result::ServerCapabilities {
383 experimental: None,
384 logging: None,
385 completions: None,
386 prompts: None,
387 resources: None,
388 tools: None,
389 tasks: None,
390 },
391 server_info: super::initialize_result::Implementation {
392 name: "mock".into(),
393 title: None,
394 version: "0.0.0".into(),
395 website_url: None,
396 description: None,
397 icons: None,
398 },
399 instructions: None,
400 _meta: None,
401 },
402 mock: true,
403 next_id: AtomicU64::new(2),
404 tools: RwLock::new(Ok(Arc::new(Vec::new()))),
405 resources: RwLock::new(Ok(Arc::new(Vec::new()))),
406 _listener_cancel_guard: None,
407 on_tools_list_changed: CallbackSlot::new(),
408 on_resources_list_changed: CallbackSlot::new(),
409 tools_changed: Notify::new(),
410 resources_changed: Notify::new(),
411 })
412 }
413
414 /// Creates a minimal connection for unit testing.
415 #[cfg(test)]
416 fn new_for_test(name: String, url: String) -> Arc<Self> {
417 Arc::new(Self {
418 http_client: reqwest::Client::new(),
419 url,
420 session_id: String::new(),
421 headers: IndexMap::new(),
422 backoff_current_interval: Duration::from_millis(500),
423 backoff_initial_interval: Duration::from_millis(500),
424 backoff_randomization_factor: 0.5,
425 backoff_multiplier: 1.5,
426 backoff_max_interval: Duration::from_secs(60),
427 backoff_max_elapsed_time: Duration::from_secs(900),
428 call_timeout: Duration::from_secs(30),
429 initialize_result: super::initialize_result::InitializeResult {
430 protocol_version: "2025-03-26".into(),
431 capabilities:
432 super::initialize_result::ServerCapabilities {
433 experimental: None,
434 logging: None,
435 completions: None,
436 prompts: None,
437 resources: None,
438 tools: None,
439 tasks: None,
440 },
441 server_info: super::initialize_result::Implementation {
442 name,
443 title: None,
444 version: "0.0.0".into(),
445 website_url: None,
446 description: None,
447 icons: None,
448 },
449 instructions: None,
450 _meta: None,
451 },
452 mock: false,
453 next_id: AtomicU64::new(2),
454 tools: RwLock::new(Ok(Arc::new(Vec::new()))),
455 resources: RwLock::new(Ok(Arc::new(Vec::new()))),
456 _listener_cancel_guard: None,
457 on_tools_list_changed: CallbackSlot::new(),
458 on_resources_list_changed: CallbackSlot::new(),
459 tools_changed: Notify::new(),
460 resources_changed: Notify::new(),
461 })
462 }
463
464 /// Creates a new connection and spawns background tasks to paginate
465 /// all tools and resources. Called internally by
466 /// [`Client::connect`](super::Client::connect) (via [`Connection::new`]).
467 ///
468 /// `initial_sse_lines`, if `Some`, is a pre-opened SSE line reader
469 /// that the list-changed listener will read from immediately on its
470 /// first iteration, instead of opening its own GET `/`. The caller
471 /// is responsible for arranging for one of these to exist whenever
472 /// the upstream advertises `tools.list_changed` or
473 /// `resources.list_changed` — see
474 /// [`Client::connect`](super::Client::connect).
475 async fn new(
476 http_client: reqwest::Client,
477 url: String,
478 session_id: String,
479 headers: IndexMap<String, String>,
480 backoff_current_interval: Duration,
481 backoff_initial_interval: Duration,
482 backoff_randomization_factor: f64,
483 backoff_multiplier: f64,
484 backoff_max_interval: Duration,
485 backoff_max_elapsed_time: Duration,
486 call_timeout: Duration,
487 initialize_result: super::initialize_result::InitializeResult,
488 initial_sse_lines: Option<super::LinesStream>,
489 ) -> Arc<Self> {
490 // Cancel-the-listener machinery: store the DropGuard inside the
491 // inner so the cancellation fires deterministically when the
492 // last external `Arc<ConnectionInner>` clone drops. Hand the
493 // listener task a sibling clone (no guard) — that way the
494 // listener task's lifetime does not extend the connection.
495 let listener_cancel = CancellationToken::new();
496 let listener_cancel_for_task = listener_cancel.clone();
497 let conn = Arc::new(Self {
498 http_client,
499 url,
500 session_id,
501 headers,
502 backoff_current_interval,
503 backoff_initial_interval,
504 backoff_randomization_factor,
505 backoff_multiplier,
506 backoff_max_interval,
507 backoff_max_elapsed_time,
508 call_timeout,
509 initialize_result,
510 mock: false,
511 next_id: AtomicU64::new(2),
512 tools: RwLock::new(Ok(Arc::new(Vec::new()))),
513 resources: RwLock::new(Ok(Arc::new(Vec::new()))),
514 _listener_cancel_guard: Some(listener_cancel.drop_guard()),
515 on_tools_list_changed: CallbackSlot::new(),
516 on_resources_list_changed: CallbackSlot::new(),
517 tools_changed: Notify::new(),
518 resources_changed: Notify::new(),
519 });
520
521 // Spawn background tool lister if the server supports tools.
522 //
523 // We don't return until the spawned task has acquired the write
524 // lock. Otherwise a caller that immediately reads `list_tools()`
525 // could race the writer — `tokio::spawn` only queues the task,
526 // and a fast reader can acquire the read lock before the writer
527 // has run its first instruction. The reader would then see the
528 // initial empty `Vec` and return that, even though a full
529 // populate is in flight.
530 //
531 // The `RwLockWriteGuard` itself isn't `Send`-friendly enough to
532 // pass back, so we use a oneshot to signal "I'm holding the
533 // lock now"; once we receive that, the cache is exclusively
534 // owned by the writer and any subsequent `read().await` from
535 // the caller is guaranteed to wait for the populate to finish.
536 if conn.initialize_result.capabilities.tools.is_some() {
537 let conn = Arc::clone(&conn);
538 let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
539 tokio::spawn(async move {
540 conn.refresh_tools_signaling(lock_held_tx, None).await;
541 });
542 // Wait for the writer to hold the lock before returning.
543 let _ = lock_held_rx.await;
544 }
545
546 // Spawn background resource lister if the server supports
547 // resources. Same lock-handoff contract as tools above.
548 if conn.initialize_result.capabilities.resources.is_some() {
549 let conn = Arc::clone(&conn);
550 let (lock_held_tx, lock_held_rx) = tokio::sync::oneshot::channel();
551 tokio::spawn(async move {
552 conn.refresh_resources_signaling(lock_held_tx, None).await;
553 });
554 let _ = lock_held_rx.await;
555 }
556
557 // Spawn the list-changed listener iff the caller handed us a
558 // pre-opened SSE stream. The connection is naive about
559 // `tools.list_changed` / `resources.list_changed` capabilities —
560 // [`Client::connect`](super::Client::connect) translates them
561 // into "did or didn't open a stream for us." If we get a stream,
562 // we listen on it; if we don't, there's nothing to listen for.
563 if let Some(initial_lines) = initial_sse_lines {
564 // Hand the listener a `Weak` so the spawned task itself does
565 // not keep the connection alive. `listener_cancel_for_task`
566 // is a sibling clone of the connection's own
567 // `_listener_cancel_guard` token — when the last external
568 // `Arc<ConnectionInner>` clone is dropped, the inner's Drop
569 // releases the guard and the listener wakes from any
570 // pending await (read, send, sleep) and exits immediately.
571 let weak = Arc::downgrade(&conn);
572 tokio::spawn(async move {
573 Self::listen_for_list_changes(
574 weak,
575 listener_cancel_for_task,
576 initial_lines,
577 )
578 .await;
579 });
580 }
581
582 conn
583 }
584
585 /// Creates an exponential backoff configuration from the connection's fields.
586 fn backoff(&self) -> backoff::ExponentialBackoff {
587 backoff::ExponentialBackoff {
588 current_interval: self.backoff_current_interval,
589 initial_interval: self.backoff_initial_interval,
590 randomization_factor: self.backoff_randomization_factor,
591 multiplier: self.backoff_multiplier,
592 max_interval: self.backoff_max_interval,
593 start_time: std::time::Instant::now(),
594 max_elapsed_time: Some(self.backoff_max_elapsed_time),
595 clock: backoff::SystemClock::default(),
596 }
597 }
598
599 /// Builds a POST request with all required headers and the call timeout.
600 fn post(&self) -> reqwest::RequestBuilder {
601 let mut request = self
602 .http_client
603 .post(&self.url)
604 .timeout(self.call_timeout)
605 .header("Content-Type", "application/json")
606 .header("Accept", "application/json, text/event-stream");
607 for (name, value) in &self.headers {
608 request = request.header(name, value);
609 }
610 // Mcp-Session-Id is applied last so a same-named entry in
611 // `headers` (e.g. the proxy's encoded session id) can never
612 // override the connection's own session id.
613 request = request.header("Mcp-Session-Id", &self.session_id);
614 request
615 }
616
617 /// Sends a JSON-RPC request, retrying transient errors when
618 /// `idempotent` is `true`.
619 ///
620 /// Idempotent methods (`tools/list`, `resources/list`,
621 /// `resources/read`, etc.) retry every transient error — network,
622 /// HTTP status, malformed body, JSON-RPC error, session expiration —
623 /// until the backoff's `max_elapsed_time` is exceeded.
624 ///
625 /// Non-idempotent methods (`tools/call`) make exactly one attempt.
626 /// Retrying a `tools/call` is unsafe: a tool may have mutated remote
627 /// state during the first attempt before the response was lost, and
628 /// re-firing the call would mutate state again. Each retry of
629 /// `AppendTask` advances `state.tasks.len()` an extra step, so the
630 /// agent sees a different return value than expected and the
631 /// pid-derived mock seed at the next step diverges. See
632 /// `objectiveai-api/src/agent/completions/client.rs` (sequential
633 /// dispatch) and `mock/client.rs::mock.seed_derive` for the
634 /// downstream consequence.
635 async fn rpc<P: serde::Serialize, R: serde::de::DeserializeOwned>(
636 &self,
637 method: &str,
638 params: &P,
639 idempotent: bool,
640 ) -> Result<R, super::Error> {
641 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
642 let body = serde_json::json!({
643 "jsonrpc": "2.0",
644 "id": id,
645 "method": method,
646 "params": params,
647 });
648
649 let attempt_one = || async {
650 let url = self.url.clone();
651 let response = self.post().json(&body).send().await.map_err(|source| {
652 backoff::Error::transient(super::Error::Request {
653 url: url.clone(),
654 source,
655 })
656 })?;
657
658 if response.status() == reqwest::StatusCode::NOT_FOUND {
659 return Err(backoff::Error::transient(
660 super::Error::SessionExpired { url: url.clone() },
661 ));
662 }
663 if !response.status().is_success() {
664 let code = response.status();
665 let body = response.text().await.unwrap_or_default();
666 return Err(backoff::Error::transient(
667 super::Error::BadStatus { url: url.clone(), code, body },
668 ));
669 }
670
671 let rpc_response: super::JsonRpcResponse<R> =
672 super::parse_streamable_http_response(&url, response)
673 .await
674 .map_err(backoff::Error::transient)?;
675
676 match rpc_response {
677 super::JsonRpcResponse::Success { result, .. } => Ok(result),
678 super::JsonRpcResponse::Error { error, .. } => {
679 Err(backoff::Error::transient(super::Error::JsonRpc {
680 url: url.clone(),
681 code: error.code,
682 message: error.message,
683 data: error.data,
684 }))
685 }
686 }
687 };
688
689 if idempotent {
690 backoff::future::retry(self.backoff(), attempt_one).await
691 } else {
692 attempt_one().await.map_err(|e| match e {
693 backoff::Error::Permanent(err) | backoff::Error::Transient { err, .. } => err,
694 })
695 }
696 }
697
698 /// Sends a JSON-RPC notification (no response expected) with the
699 /// same exponential-backoff retry policy as [`Self::rpc`]. Every
700 /// error is transient; the loop gives up only when the backoff's
701 /// `max_elapsed_time` is exceeded.
702 async fn notify<P: serde::Serialize>(
703 &self,
704 method: &str,
705 params: &P,
706 ) -> Result<(), super::Error> {
707 if self.mock { return Ok(()); }
708 let body = serde_json::json!({
709 "jsonrpc": "2.0",
710 "method": method,
711 "params": params,
712 });
713
714 backoff::future::retry(self.backoff(), || async {
715 let url = self.url.clone();
716 let response = self.post().json(&body).send().await.map_err(|source| {
717 backoff::Error::transient(super::Error::Request {
718 url: url.clone(),
719 source,
720 })
721 })?;
722
723 if response.status() == reqwest::StatusCode::NOT_FOUND {
724 return Err(backoff::Error::transient(
725 super::Error::SessionExpired { url: url.clone() },
726 ));
727 }
728 if !response.status().is_success() {
729 let code = response.status();
730 let body = response.text().await.unwrap_or_default();
731 return Err(backoff::Error::transient(
732 super::Error::BadStatus { url: url.clone(), code, body },
733 ));
734 }
735
736 Ok(())
737 })
738 .await
739 }
740
741 /// `GET <self.url>/notify` against the ObjectiveAI MCP proxy.
742 /// Atomically drains the proxy's pending-notifications queue for
743 /// this session and returns the queued content blocks.
744 ///
745 /// Single-attempt — the proxy drain is destructive, so a retry
746 /// after a transient failure would risk silently dropping
747 /// notifications that the first attempt's response carried but
748 /// failed to deliver. Networks errors propagate to the caller; the
749 /// next turn's drain will pick up anything queued in the meantime.
750 /// A 404 (session unknown) is mapped to `Ok(vec![])` — see the
751 /// public method's doc on `Connection`.
752 async fn drain_notifications(
753 &self,
754 ) -> Result<Vec<super::tool::ContentBlock>, super::Error> {
755 if self.mock {
756 return Ok(Vec::new());
757 }
758
759 let url = format!("{}/notify", self.url.trim_end_matches('/'));
760 let mut request = self
761 .http_client
762 .get(&url)
763 .timeout(self.call_timeout)
764 .header("Accept", "application/json");
765 for (name, value) in &self.headers {
766 request = request.header(name, value);
767 }
768 // Mcp-Session-Id applied last so a same-named entry in `headers`
769 // can never override the connection's own session id — matches
770 // the invariant in `Self::post`.
771 request = request.header("Mcp-Session-Id", &self.session_id);
772
773 let response = request.send().await.map_err(|source| super::Error::Request {
774 url: url.clone(),
775 source,
776 })?;
777
778 if response.status() == reqwest::StatusCode::NOT_FOUND {
779 return Ok(Vec::new());
780 }
781 if !response.status().is_success() {
782 let code = response.status();
783 let body = response.text().await.unwrap_or_default();
784 return Err(super::Error::BadStatus { url, code, body });
785 }
786
787 response
788 .json::<Vec<super::tool::ContentBlock>>()
789 .await
790 .map_err(|source| super::Error::Request { url, source })
791 }
792
793 /// Returns a key identifying this connection for tool namespacing.
794 fn tool_key(&self) -> String {
795 format!("{}-{}", self.initialize_result.server_info.name, self.url)
796 }
797
798 /// Returns the session ID for this connection.
799 fn session_id(&self) -> &str {
800 &self.session_id
801 }
802
803 /// Sends a `tools/list` RPC call for a single page.
804 async fn rpc_list_tools(
805 &self,
806 cursor: Option<&str>,
807 ) -> Result<super::tool::ListToolsResult, super::Error> {
808 self.rpc(
809 "tools/list",
810 &super::tool::ListToolsRequest {
811 cursor: cursor.map(String::from),
812 },
813 true,
814 )
815 .await
816 }
817
818 /// Returns all tools from the server.
819 ///
820 /// Blocks until background pagination completes, then returns a
821 /// cheap `Arc` clone of the result.
822 async fn list_tools(
823 &self,
824 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
825 self.tools.read().await.clone()
826 }
827
828 /// Calls a tool on the MCP server.
829 async fn call_tool(
830 &self,
831 params: &super::tool::CallToolRequestParams,
832 ) -> Result<super::tool::CallToolResult, super::Error> {
833 if self.mock {
834 return Ok(super::tool::CallToolResult {
835 content: vec![super::tool::ContentBlock::Text(super::tool::TextContent {
836 text: "mock".to_string(),
837 annotations: None,
838 _meta: None,
839 })],
840 structured_content: None,
841 is_error: None,
842 _meta: None,
843 });
844 }
845 self.rpc("tools/call", params, false).await
846 }
847
848 /// Calls a tool and converts the result into a [`ToolMessage`].
849 ///
850 /// Content blocks are mapped as follows:
851 /// - `text` → text part
852 /// - `image` → image_url part (data URL)
853 /// - `audio` → input_audio part
854 /// - `resource` (embedded text) → text part
855 /// - `resource` (embedded blob, image mime) → image_url part (data URL)
856 /// - `resource` (embedded blob, other mime) → file part
857 /// - `resource_link` → if the URI appears in `list_resources`, fetches
858 /// via `read_resource` and inlines the content using the same
859 /// text/blob rules; otherwise serializes the link as JSON text
860 ///
861 /// If `is_error` is set on the result, the content is prefixed with
862 /// an error indicator.
863 async fn call_tool_as_message(
864 &self,
865 params: &super::tool::CallToolRequestParams,
866 tool_call_id: String,
867 ) -> Result<
868 crate::agent::completions::message::ToolMessage,
869 super::Error,
870 > {
871 use crate::agent::completions::message::{
872 File, ImageUrl, InputAudio, RichContent, RichContentPart,
873 ToolMessage,
874 };
875 use super::shared::ResourceContentsUnion;
876 use super::tool::ContentBlock;
877
878 let result = self.call_tool(params).await?;
879
880 // Build the set of known resource URIs for resource_link resolution.
881 let known_resource_uris: std::collections::HashSet<String> =
882 match self.list_resources().await {
883 Ok(resources) => {
884 resources.iter().map(|r| r.uri.clone()).collect()
885 }
886 Err(_) => std::collections::HashSet::new(),
887 };
888
889 /// Converts a `ResourceContentsUnion` into one or more rich content
890 /// parts. Text resources become text parts. Blob resources with an
891 /// image MIME type become image_url parts (data URL); all other blobs
892 /// become file parts.
893 fn resource_contents_to_part(
894 contents: &ResourceContentsUnion,
895 ) -> RichContentPart {
896 match contents {
897 ResourceContentsUnion::Text(text) => {
898 RichContentPart::Text {
899 text: text.text.clone(),
900 }
901 }
902 ResourceContentsUnion::Blob(blob) => {
903 let mime = blob
904 .base
905 .mime_type
906 .as_deref()
907 .unwrap_or("application/octet-stream");
908
909 if mime.starts_with("image/") {
910 RichContentPart::ImageUrl {
911 image_url: ImageUrl {
912 url: format!(
913 "data:{};base64,{}",
914 mime, blob.blob
915 ),
916 detail: None,
917 },
918 }
919 } else {
920 // Extract a filename from the URI path, if any.
921 let filename = blob
922 .base
923 .uri
924 .rsplit('/')
925 .next()
926 .filter(|s| !s.is_empty())
927 .map(String::from);
928
929 RichContentPart::File {
930 file: File {
931 file_data: Some(blob.blob.clone()),
932 filename,
933 file_id: None,
934 file_url: None,
935 },
936 }
937 }
938 }
939 }
940 }
941
942 let mut parts: Vec<RichContentPart> = Vec::new();
943
944 for block in &result.content {
945 match block {
946 ContentBlock::Text(text) => {
947 parts.push(RichContentPart::Text {
948 text: text.text.clone(),
949 });
950 }
951 ContentBlock::Image(image) => {
952 parts.push(RichContentPart::ImageUrl {
953 image_url: ImageUrl {
954 url: format!(
955 "data:{};base64,{}",
956 image.mime_type, image.data
957 ),
958 detail: None,
959 },
960 });
961 }
962 ContentBlock::Audio(audio) => {
963 parts.push(RichContentPart::InputAudio {
964 input_audio: InputAudio {
965 data: audio.data.clone(),
966 format: audio.mime_type.clone(),
967 },
968 });
969 }
970 ContentBlock::EmbeddedResource(embedded) => {
971 parts.push(resource_contents_to_part(
972 &embedded.resource,
973 ));
974 }
975 ContentBlock::ResourceLink(link) => {
976 if known_resource_uris.contains(&link.uri) {
977 // Fetch the resource and inline its contents.
978 let read_result =
979 self.read_resource(&link.uri).await?;
980 for contents in &read_result.contents {
981 parts.push(
982 resource_contents_to_part(contents),
983 );
984 }
985 } else {
986 // Not a known resource; serialize as JSON text.
987 parts.push(RichContentPart::Text {
988 text: serde_json::to_string(link)
989 .unwrap_or_default(),
990 });
991 }
992 }
993 }
994 }
995
996 let content = match parts.len() {
997 0 => RichContent::Text(String::new()),
998 1 => match parts.remove(0) {
999 RichContentPart::Text { text } => RichContent::Text(text),
1000 other => RichContent::Parts(vec![other]),
1001 },
1002 _ => RichContent::Parts(parts),
1003 };
1004
1005 Ok(ToolMessage {
1006 content,
1007 tool_call_id,
1008 })
1009 }
1010
1011 /// Sends a `resources/list` RPC call for a single page.
1012 async fn rpc_list_resources(
1013 &self,
1014 cursor: Option<&str>,
1015 ) -> Result<super::resource::ListResourcesResult, super::Error> {
1016 self.rpc(
1017 "resources/list",
1018 &super::resource::ListResourcesRequest {
1019 cursor: cursor.map(String::from),
1020 },
1021 true,
1022 )
1023 .await
1024 }
1025
1026 /// Returns all resources from the server.
1027 ///
1028 /// Blocks until background pagination completes, then returns a
1029 /// cheap `Arc` clone of the result.
1030 async fn list_resources(
1031 &self,
1032 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1033 self.resources.read().await.clone()
1034 }
1035
1036 /// Returns the cached tool list as soon as it differs from `current`,
1037 /// or — if it equals `current` right now — waits up to `timeout` for
1038 /// the cache to change and then returns whatever it sees.
1039 ///
1040 /// An `Err` cache is treated as "different from any caller snapshot"
1041 /// and returned immediately.
1042 ///
1043 /// Concurrency-safe: any number of concurrent subscribers wait on
1044 /// independent `Notified` futures and read the cache through the
1045 /// shared `RwLock`. A timeout that fires alone is not an error — we
1046 /// re-read the cache and return whatever's there.
1047 async fn subscribe_tools(
1048 &self,
1049 current: &[super::tool::Tool],
1050 timeout: Duration,
1051 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1052 // Arm BEFORE reading. `enable()` registers the future in the
1053 // wait queue without polling, so a `notify_waiters` racing
1054 // between our read and our await still wakes us.
1055 let notified = self.tools_changed.notified();
1056 tokio::pin!(notified);
1057 notified.as_mut().enable();
1058
1059 let initial = self.tools.read().await.clone();
1060 match &initial {
1061 Ok(arc) if arc.as_slice() == current => {}
1062 _ => return initial,
1063 }
1064
1065 let _ = tokio::time::timeout(timeout, notified).await;
1066
1067 self.tools.read().await.clone()
1068 }
1069
1070 /// Resource counterpart of [`Self::subscribe_tools`].
1071 async fn subscribe_resources(
1072 &self,
1073 current: &[super::resource::Resource],
1074 timeout: Duration,
1075 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1076 let notified = self.resources_changed.notified();
1077 tokio::pin!(notified);
1078 notified.as_mut().enable();
1079
1080 let initial = self.resources.read().await.clone();
1081 match &initial {
1082 Ok(arc) if arc.as_slice() == current => {}
1083 _ => return initial,
1084 }
1085
1086 let _ = tokio::time::timeout(timeout, notified).await;
1087
1088 self.resources.read().await.clone()
1089 }
1090
1091 /// Reads a resource from the MCP server.
1092 async fn read_resource(
1093 &self,
1094 uri: &str,
1095 ) -> Result<super::resource::ReadResourceResult, super::Error> {
1096 self.rpc(
1097 "resources/read",
1098 &super::resource::ReadResourceRequestParams {
1099 uri: uri.to_string(),
1100 },
1101 true,
1102 )
1103 .await
1104 }
1105
1106 /// Re-fetches all tools from the server, replacing the cached list.
1107 ///
1108 /// Optionally fires `on_change` *after* the write lock is acquired but
1109 /// *before* the network paginate begins, so the callback observes the
1110 /// "list change is in flight" edge — readers blocked on the read lock
1111 /// won't return until the new list lands. The proxy uses this to
1112 /// re-emit `notifications/tools/list_changed` to its downstream client
1113 /// at the moment the staleness window opens.
1114 async fn refresh_tools(&self, on_change: Option<ListChangedCallback>) {
1115 // Listener-driven refresh. Visibility contract: any caller
1116 // that issues `list_tools()` after a `tools/list_changed`
1117 // notification has been observed must see the post-swap
1118 // value, not stale data — so the write lock has to gate
1119 // readers across the upstream paginate.
1120 //
1121 // Performance contract: don't serialise paginate *behind*
1122 // lock-acquisition latency. We start `tools.write()` and the
1123 // upstream paginate **concurrently** with `tokio::join!`. The
1124 // write-lock acquire blocks new `list_tools()` readers
1125 // immediately (preserving visibility) and runs in parallel
1126 // with whatever drain time the in-flight readers need; the
1127 // paginate runs alongside. Total wall-clock is
1128 // `max(drain_time, paginate_time)` instead of the sum.
1129 //
1130 // `notify_waiters` and `on_change` fire under the write
1131 // guard, *after* `*guard = result`, so anyone awoken by them
1132 // queues on the read lock, waits for the guard to drop, and
1133 // observes the post-swap state.
1134 let (mut guard, result) = tokio::join!(
1135 self.tools.write(),
1136 self.paginate_tools(),
1137 );
1138 *guard = result;
1139 self.tools_changed.notify_waiters();
1140 if let Some(cb) = on_change {
1141 cb();
1142 }
1143 }
1144
1145 /// Page-by-page fetch of the upstream tool list, no locks held.
1146 /// Shared between the `_signaling` (initial-populate, holds lock
1147 /// for the original "block fast readers" contract) and `refresh_*`
1148 /// (listener-driven, lock-only-around-install) variants.
1149 async fn paginate_tools(
1150 &self,
1151 ) -> Result<Arc<Vec<super::tool::Tool>>, Arc<super::Error>> {
1152 let mut all_tools = Vec::new();
1153 let mut cursor: Option<String> = None;
1154 loop {
1155 match self.rpc_list_tools(cursor.as_deref()).await {
1156 Ok(page) => {
1157 all_tools.extend(page.tools);
1158 cursor = page.next_cursor;
1159 if cursor.is_none() {
1160 return Ok(Arc::new(all_tools));
1161 }
1162 }
1163 Err(e) => return Err(Arc::new(e)),
1164 }
1165 }
1166 }
1167
1168 /// Same as [`Self::refresh_tools`] but fires `lock_held` once the
1169 /// write lock has been acquired so the caller can synchronise on
1170 /// "writer is in possession of the cache" before returning. Used by
1171 /// `ConnectionInner::new` to prevent a fast reader from acquiring
1172 /// the read lock before this writer has even started.
1173 async fn refresh_tools_signaling(
1174 &self,
1175 lock_held: tokio::sync::oneshot::Sender<()>,
1176 on_change: Option<ListChangedCallback>,
1177 ) {
1178 let mut guard = self.tools.write().await;
1179 // Fire `tools_changed` while we hold the write lock and *before*
1180 // installing the new list. Any subscriber woken now must take a
1181 // read lock to observe the result, and that read lock is queued
1182 // behind this write guard — so they always see the post-swap
1183 // state, never mid-swap.
1184 self.tools_changed.notify_waiters();
1185 let _ = lock_held.send(());
1186 if let Some(cb) = on_change {
1187 cb();
1188 }
1189 let mut all_tools = Vec::new();
1190 let mut cursor: Option<String> = None;
1191 let result = loop {
1192 match self.rpc_list_tools(cursor.as_deref()).await {
1193 Ok(page) => {
1194 all_tools.extend(page.tools);
1195 cursor = page.next_cursor;
1196 if cursor.is_none() {
1197 break Ok(Arc::new(all_tools));
1198 }
1199 }
1200 Err(e) => break Err(Arc::new(e)),
1201 }
1202 };
1203 *guard = result;
1204 }
1205
1206 /// Re-fetches all resources from the server, replacing the cached list.
1207 /// See [`ConnectionInner::refresh_tools`] for the callback timing
1208 /// contract.
1209 async fn refresh_resources(&self, on_change: Option<ListChangedCallback>) {
1210 // Same paginate-while-acquiring-the-write-lock pattern as
1211 // `refresh_tools` — see that comment for the visibility +
1212 // performance rationale.
1213 let (mut guard, result) = tokio::join!(
1214 self.resources.write(),
1215 self.paginate_resources(),
1216 );
1217 *guard = result;
1218 self.resources_changed.notify_waiters();
1219 if let Some(cb) = on_change {
1220 cb();
1221 }
1222 }
1223
1224 async fn paginate_resources(
1225 &self,
1226 ) -> Result<Arc<Vec<super::resource::Resource>>, Arc<super::Error>> {
1227 let mut all_resources = Vec::new();
1228 let mut cursor: Option<String> = None;
1229 loop {
1230 match self.rpc_list_resources(cursor.as_deref()).await {
1231 Ok(page) => {
1232 all_resources.extend(page.resources);
1233 cursor = page.next_cursor;
1234 if cursor.is_none() {
1235 return Ok(Arc::new(all_resources));
1236 }
1237 }
1238 Err(e) => return Err(Arc::new(e)),
1239 }
1240 }
1241 }
1242
1243 /// Resource counterpart of [`Self::refresh_tools_signaling`].
1244 async fn refresh_resources_signaling(
1245 &self,
1246 lock_held: tokio::sync::oneshot::Sender<()>,
1247 on_change: Option<ListChangedCallback>,
1248 ) {
1249 let mut guard = self.resources.write().await;
1250 // See `refresh_tools_signaling` — fire under the write lock,
1251 // before install, so subscribers' next read sees the post-swap
1252 // state.
1253 self.resources_changed.notify_waiters();
1254 let _ = lock_held.send(());
1255 if let Some(cb) = on_change {
1256 cb();
1257 }
1258 let mut all_resources = Vec::new();
1259 let mut cursor: Option<String> = None;
1260 let result = loop {
1261 match self.rpc_list_resources(cursor.as_deref()).await {
1262 Ok(page) => {
1263 all_resources.extend(page.resources);
1264 cursor = page.next_cursor;
1265 if cursor.is_none() {
1266 break Ok(Arc::new(all_resources));
1267 }
1268 }
1269 Err(e) => break Err(Arc::new(e)),
1270 }
1271 };
1272 *guard = result;
1273 }
1274
1275 /// Builds a GET request to the MCP endpoint for receiving server
1276 /// notifications via SSE.
1277 fn get(&self) -> reqwest::RequestBuilder {
1278 let mut request = self
1279 .http_client
1280 .get(&self.url)
1281 .header("Accept", "text/event-stream");
1282 for (name, value) in &self.headers {
1283 request = request.header(name, value);
1284 }
1285 // Mcp-Session-Id last so it always wins over `headers`.
1286 request = request.header("Mcp-Session-Id", &self.session_id);
1287 request
1288 }
1289
1290 /// Listens for `notifications/tools/list_changed` and
1291 /// `notifications/resources/list_changed` on an SSE stream. On each
1292 /// notification, write-locks and re-fetches the full list.
1293 ///
1294 /// `initial_lines` is the pre-opened SSE line reader handed in by
1295 /// [`Client::connect`](super::Client::connect) — that stream is
1296 /// consumed first. When it ends (or any later GET reconnect ends),
1297 /// we sleep `backoff_initial_interval` and open a fresh GET `/` SSE
1298 /// stream.
1299 ///
1300 /// Takes a [`Weak<Self>`] (not `Arc<Self>`) so the spawned task
1301 /// doesn't itself keep the [`Connection`] alive, and a
1302 /// [`CancellationToken`] sibling clone of the connection's
1303 /// [`DropGuard`] so the task tears down the instant the last
1304 /// external `Arc<ConnectionInner>` clone is dropped — every
1305 /// blocking await (line read, reconnect send, backoff sleep) is
1306 /// raced against `cancel.cancelled()` and exits without any zombie
1307 /// retries against a now-dead session.
1308 async fn listen_for_list_changes(
1309 weak: Weak<Self>,
1310 cancel: CancellationToken,
1311 initial_lines: super::LinesStream,
1312 ) {
1313 // First iteration: use the pre-opened SSE stream the client
1314 // handed us. After that, fall back to opening fresh GET / SSE
1315 // streams as the upstream connection cycles.
1316 let mut next_lines: Option<super::LinesStream> = Some(initial_lines);
1317 // One-shot guard for the catch-up refresh: false on the very
1318 // first iteration (the caller's pre-opened SSE stream — its
1319 // associated cache was just populated by `Client::connect`'s
1320 // initial pagination, so re-fetching there would just be a
1321 // wasted round-trip), true thereafter. Every stream end —
1322 // whether `Ok(None)` (clean close) or `Err(_)` (read failure)
1323 // — drops back here, which we treat as an implicit
1324 // list-changed notification: the upstream's broadcast (in
1325 // particular the proxy's per-session `tokio::broadcast`) is
1326 // lossy for moments when this listener has zero active
1327 // subscribers, so anything that fired during our disconnect
1328 // window may have been dropped.
1329 //
1330 // ORDER MATTERS. The refresh must run AFTER we've re-opened
1331 // the GET / SSE stream — i.e. after we're a subscriber again
1332 // — and BEFORE we enter the inner read loop. If we refreshed
1333 // before the resubscribe, a notification that fired between
1334 // our refresh-completion and our subscribe would be lost the
1335 // same way as the original disconnect-window drops; doing it
1336 // after means a notification fired DURING the refresh lands
1337 // in the new subscriber's buffer (broadcast::Sender::send
1338 // backs onto each receiver's channel-capacity slot) and gets
1339 // consumed by the inner loop on its next read.
1340 let mut is_reconnect = false;
1341
1342 loop {
1343 // The token cancels deterministically when the last
1344 // `Arc<ConnectionInner>` clone is dropped (see
1345 // `_listener_cancel_guard`). Check once per outer
1346 // iteration, but the real protection is the cancel arms in
1347 // every blocking await below — those exit immediately on
1348 // cancel.
1349 if cancel.is_cancelled() {
1350 return;
1351 }
1352 let Some(this) = weak.upgrade() else { return };
1353 let backoff_delay = this.backoff_initial_interval;
1354
1355 let mut lines = match next_lines.take() {
1356 Some(l) => l,
1357 None => {
1358 // Race the upstream GET against cancellation — if
1359 // the connection drops mid-reconnect, exit
1360 // immediately rather than waiting for the request
1361 // to complete or time out (otherwise produces a
1362 // burst of 401 retries against a now-dead session
1363 // under heavy churn).
1364 let send_outcome = tokio::select! {
1365 out = this.get().send() => out,
1366 _ = cancel.cancelled() => {
1367 drop(this);
1368 return;
1369 }
1370 };
1371 let response = match send_outcome {
1372 Ok(r) if r.status().is_success() => r,
1373 _ => {
1374 drop(this);
1375 // Sleep with cancel-arm: instant exit on
1376 // drop, no zombie retries.
1377 tokio::select! {
1378 _ = tokio::time::sleep(backoff_delay) => {}
1379 _ = cancel.cancelled() => return,
1380 }
1381 continue;
1382 }
1383 };
1384 super::lines_from_response(response)
1385 }
1386 };
1387
1388 // Catch-up refresh on every reconnect — the implicit
1389 // list-changed treatment for the just-failed stream. See
1390 // the `is_reconnect` doc-comment above for the
1391 // refresh-AFTER-resubscribe rationale.
1392 if is_reconnect {
1393 // tools and resources are independent locks; run the
1394 // catch-up refreshes concurrently so disconnect
1395 // recovery isn't sequential.
1396 let _ = tokio::join!(
1397 this.refresh_tools(this.on_tools_list_changed.get()),
1398 this.refresh_resources(this.on_resources_list_changed.get()),
1399 );
1400 }
1401 is_reconnect = true;
1402
1403 'inner: loop {
1404 tokio::select! {
1405 line_result = lines.next_line() => {
1406 match line_result {
1407 Ok(Some(line)) => {
1408 // SSE data lines start with "data: ".
1409 let Some(data) = line.strip_prefix("data: ") else {
1410 continue 'inner;
1411 };
1412 let method = match serde_json::from_str::<super::JsonRpcNotification>(data) {
1413 Ok(n) => n.method,
1414 Err(_) => continue 'inner,
1415 };
1416 match method.as_str() {
1417 "notifications/tools/list_changed" => {
1418 // refresh_tools fires the
1419 // callback after the cache is
1420 // installed, so the proxy's
1421 // downstream
1422 // notifications/tools/list_changed
1423 // emission lines up with the
1424 // staleness window opening.
1425 this.refresh_tools(
1426 this.on_tools_list_changed.get(),
1427 )
1428 .await;
1429 }
1430 "notifications/resources/list_changed" => {
1431 this.refresh_resources(
1432 this.on_resources_list_changed.get(),
1433 )
1434 .await;
1435 }
1436 _ => {}
1437 }
1438 }
1439 // Stream ended cleanly or errored — break out
1440 // to the outer loop so we either reconnect or,
1441 // if everyone's gone, exit at the top.
1442 _ => break 'inner,
1443 }
1444 }
1445 // Cancellation: the connection's last clone has
1446 // dropped. Tear down immediately.
1447 _ = cancel.cancelled() => {
1448 drop(this);
1449 return;
1450 }
1451 }
1452 }
1453
1454 // Stream ended — drop the strong ref before sleeping so the
1455 // next iteration's weak-upgrade can detect liveness honestly.
1456 drop(this);
1457 tokio::select! {
1458 _ = tokio::time::sleep(backoff_delay) => {}
1459 _ = cancel.cancelled() => return,
1460 }
1461 }
1462 }
1463}
1464
#[cfg(test)]
mod subscribe_tests {
    use super::*;
    use crate::mcp::tool::{Tool, ToolSchemaObject, ToolSchemaType};

    /// Builds a minimal tool named `name` with an empty object schema.
    fn tool(name: &str) -> Tool {
        let input_schema = ToolSchemaObject {
            r#type: ToolSchemaType::Object,
            properties: None,
            required: None,
            extra: IndexMap::new(),
        };
        Tool {
            name: name.to_string(),
            title: None,
            description: None,
            icons: None,
            input_schema,
            output_schema: None,
            annotations: None,
            execution: None,
            _meta: None,
        }
    }

    /// Test connection whose tool cache already holds tools with the
    /// given names.
    async fn seeded(names: &[&str]) -> Connection {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        let tools: Vec<Tool> = names.iter().copied().map(tool).collect();
        *conn.inner.tools.write().await = Ok(Arc::new(tools));
        conn
    }

    /// The cache already differs from the caller's snapshot, so the
    /// call returns at once instead of waiting out the timeout.
    #[tokio::test]
    async fn subscribe_tools_returns_immediately_when_cache_differs() {
        let conn = seeded(&["a"]).await;

        let started = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[tool("b")], Duration::from_secs(5))
            .await
            .unwrap();
        assert!(started.elapsed() < Duration::from_millis(100));
        assert_eq!(got.as_slice(), &[tool("a")]);
    }

    /// A cached `Err` counts as "different from any caller snapshot".
    #[tokio::test]
    async fn subscribe_tools_returns_err_immediately() {
        let conn = Connection::new_for_test("t".into(), "http://x".into());
        let cached_err = super::super::Error::NoSessionId {
            url: "http://x".into(),
            body: String::new(),
        };
        *conn.inner.tools.write().await = Err(Arc::new(cached_err));

        let started = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[], Duration::from_secs(5))
            .await;
        assert!(started.elapsed() < Duration::from_millis(100));
        assert!(got.is_err());
    }

    /// The cache equals the snapshot; a writer then notifies under the
    /// write lock and installs a new list. The woken subscriber's
    /// re-read blocks behind the writer's guard and sees the new list.
    #[tokio::test]
    async fn subscribe_tools_wakes_on_change_and_reads_post_swap() {
        let conn = seeded(&["a"]).await;

        let subscriber_conn = conn.clone();
        let subscriber = tokio::spawn(async move {
            subscriber_conn
                .subscribe_tools(&[tool("a")], Duration::from_secs(5))
                .await
                .unwrap()
        });

        // Let the subscriber arm `notified()` and finish its first
        // read, so it is parked on the timeout.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Mirror `refresh_tools_signaling`: take the write lock, fire
        // `tools_changed` while holding it, then install the new value
        // before releasing.
        {
            let mut cache = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            // Hold the guard briefly so the subscriber is definitely
            // racing its read lock against our drop.
            tokio::time::sleep(Duration::from_millis(20)).await;
            *cache = Ok(Arc::new(vec![tool("b")]));
        }

        let got = tokio::time::timeout(Duration::from_secs(2), subscriber)
            .await
            .expect("subscriber returned in time")
            .expect("subscriber didn't panic");
        assert_eq!(got.as_slice(), &[tool("b")]);
    }

    /// The cache equals the snapshot and nothing changes: the call
    /// times out and returns the still-equal list, not an error.
    #[tokio::test]
    async fn subscribe_tools_times_out_and_returns_unchanged_list() {
        let conn = seeded(&["a"]).await;

        let started = std::time::Instant::now();
        let got = conn
            .subscribe_tools(&[tool("a")], Duration::from_millis(50))
            .await
            .unwrap();
        let elapsed = started.elapsed();
        assert!(elapsed >= Duration::from_millis(40), "elapsed: {elapsed:?}");
        assert!(elapsed < Duration::from_millis(500), "elapsed: {elapsed:?}");
        assert_eq!(got.as_slice(), &[tool("a")]);
    }

    /// Two concurrent subscribers are both woken by one
    /// `notify_waiters` and both observe the post-swap list.
    #[tokio::test]
    async fn subscribe_tools_supports_concurrent_subscribers() {
        let conn = seeded(&["a"]).await;

        let first_conn = conn.clone();
        let second_conn = conn.clone();
        let first = tokio::spawn(async move {
            first_conn
                .subscribe_tools(&[tool("a")], Duration::from_secs(5))
                .await
                .unwrap()
        });
        let second = tokio::spawn(async move {
            second_conn
                .subscribe_tools(&[tool("a")], Duration::from_secs(5))
                .await
                .unwrap()
        });

        // Give both subscribers time to park.
        tokio::time::sleep(Duration::from_millis(50)).await;

        {
            let mut cache = conn.inner.tools.write().await;
            conn.inner.tools_changed.notify_waiters();
            *cache = Ok(Arc::new(vec![tool("c")]));
        }

        let (first_outcome, second_outcome) = tokio::join!(first, second);
        let first_list = first_outcome.unwrap();
        let second_list = second_outcome.unwrap();
        assert_eq!(first_list.as_slice(), &[tool("c")]);
        assert_eq!(second_list.as_slice(), &[tool("c")]);
    }
}
1615
#[cfg(test)]
mod drain_notifications_tests {
    use super::*;
    use crate::mcp::tool::{ContentBlock, TextContent};
    use serde_json::json;
    use wiremock::matchers::{header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Starts a mock server that answers every `GET /notify` with
    /// `template`.
    async fn notify_server(template: ResponseTemplate) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/notify"))
            .respond_with(template)
            .mount(&server)
            .await;
        server
    }

    /// Happy path: the server returns `[text, text]`; both are parsed
    /// as `ContentBlock::Text` and returned in order. The mock only
    /// matches requests carrying an empty `Mcp-Session-Id` header.
    #[tokio::test]
    async fn drain_notifications_parses_text_blocks_in_order() {
        let server = MockServer::start().await;
        let body = json!([
            {"type": "text", "text": "first"},
            {"type": "text", "text": "second"},
        ]);
        Mock::given(method("GET"))
            .and(path("/notify"))
            .and(header("Mcp-Session-Id", ""))
            .respond_with(ResponseTemplate::new(200).set_body_json(body))
            .mount(&server)
            .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("drain ok");
        assert_eq!(blocks.len(), 2);
        let texts: Vec<&str> = blocks
            .iter()
            .map(|block| match block {
                ContentBlock::Text(TextContent { text, .. }) => text.as_str(),
                other => panic!("expected text, got {other:?}"),
            })
            .collect();
        assert_eq!(texts, ["first", "second"]);
    }

    /// 404 (e.g. the proxy lost the session after a restart) yields an
    /// empty vec rather than an error, so the drain itself does not
    /// abort the request.
    #[tokio::test]
    async fn drain_notifications_404_returns_empty() {
        let server = notify_server(ResponseTemplate::new(404)).await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("404 → ok(empty)");
        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
    }

    /// Empty queue → empty JSON array → empty vec.
    #[tokio::test]
    async fn drain_notifications_empty_queue_returns_empty() {
        let server = notify_server(
            ResponseTemplate::new(200).set_body_json(json!([])),
        )
        .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let blocks = conn.drain_notifications().await.expect("drain ok");
        assert!(blocks.is_empty(), "expected empty vec, got {blocks:?}");
    }

    /// A non-success, non-404 status is surfaced as `BadStatus` with
    /// the status code and response body.
    #[tokio::test]
    async fn drain_notifications_5xx_returns_bad_status() {
        let server = notify_server(
            ResponseTemplate::new(500).set_body_string("boom"),
        )
        .await;

        let conn = Connection::new_for_test("t".into(), server.uri());
        let failure = conn
            .drain_notifications()
            .await
            .expect_err("5xx → err");
        match failure {
            super::super::Error::BadStatus { code, body, .. } => {
                assert_eq!(code.as_u16(), 500);
                assert_eq!(body, "boom");
            }
            other => panic!("expected BadStatus, got {other:?}"),
        }
    }

    /// A mock connection returns an empty vec; the URL it is given is
    /// never expected to be reachable.
    #[tokio::test]
    async fn drain_notifications_mock_returns_empty() {
        let conn = Connection::new_mock("http://does-not-matter".into());
        let blocks = conn.drain_notifications().await.expect("mock ok");
        assert!(blocks.is_empty());
    }
}