pub struct Connection { /* private fields */ }
An active connection to an MCP server using the Streamable HTTP transport.
Cheaply clonable (one Arc bump). When the last clone is dropped, the
inner Arc ref count hits zero, [ConnectionInner::Drop] runs, the
listener-cancel DropGuard is dropped, and the SSE listener task is
cancelled — exiting any in-flight lines.next_line(), reconnect
send(), or backoff sleep immediately without retrying against
the now-dead proxy session.
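The drop behaviour described above can be modelled with standard-library types alone. This is a minimal sketch, not the crate's implementation: `Inner`, `CancelOnDrop`, and `new_inner` are hypothetical stand-ins, an `Arc<Inner>` plays the role of a `Connection` clone, and an `AtomicBool` stands in for whatever cancellation signal the real listener task observes.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a drop guard: flips a shared flag when dropped.
struct CancelOnDrop(Arc<AtomicBool>);

impl Drop for CancelOnDrop {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for the inner state; it owns the guard, so the
/// guard is dropped exactly when the last `Arc<Inner>` goes away.
struct Inner {
    _listener_cancel_guard: CancelOnDrop,
}

/// Builds the shared inner state plus the flag a listener would poll.
fn new_inner() -> (Arc<Inner>, Arc<AtomicBool>) {
    let cancelled = Arc::new(AtomicBool::new(false));
    let inner = Inner {
        _listener_cancel_guard: CancelOnDrop(Arc::clone(&cancelled)),
    };
    (Arc::new(inner), cancelled)
}

/// Returns the flag after dropping one clone, then after dropping the last.
fn cancelled_after_each_drop() -> (bool, bool) {
    let (first, cancelled) = new_inner();
    let second = Arc::clone(&first); // cheap: one reference-count bump
    drop(first);
    let after_one = cancelled.load(Ordering::SeqCst);
    drop(second);
    let after_all = cancelled.load(Ordering::SeqCst);
    (after_one, after_all)
}
```

In this model the flag stays unset while any clone is alive and is set only when the final clone is dropped.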
Use the public methods (list_tools, call_tool, list_resources,
read_resource, call_tool_as_message, tool_key) for the upstream
MCP protocol surface. The inner state (ConnectionInner) is also
reachable via Deref for read-only field access (e.g.
connection.url, connection.initialize_result.server_info.name),
but its methods are private — you must go through Connection.
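The split between readable fields and private methods can be sketched as follows. Everything here is illustrative: `Handle`, `Inner`, `server_name`, and `describe` are hypothetical names, and Rust privacy is per module, so the restriction only applies to code outside the module that defines `Inner`.

```rust
use std::ops::Deref;
use std::sync::Arc;

/// Hypothetical inner state: public fields, private methods.
pub struct Inner {
    pub url: String,
    pub server_name: String,
}

impl Inner {
    // Private: code outside this module cannot call it directly.
    fn describe(&self) -> String {
        format!("{} at {}", self.server_name, self.url)
    }
}

/// Hypothetical handle wrapping the shared inner state.
#[derive(Clone)]
pub struct Handle(Arc<Inner>);

impl Deref for Handle {
    type Target = Inner;

    fn deref(&self) -> &Inner {
        &self.0
    }
}

impl Handle {
    pub fn new(url: &str, server_name: &str) -> Self {
        Handle(Arc::new(Inner {
            url: url.to_string(),
            server_name: server_name.to_string(),
        }))
    }

    /// Public wrapper: the only route to the private inner method.
    pub fn describe(&self) -> String {
        self.0.describe()
    }
}
```

Field reads such as `handle.url` resolve through `Deref`, while behaviour stays behind the handle's own public methods.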
Implementations§
impl Connection
pub async fn delete(&self) -> Result<(), Error>
Tear this connection down explicitly.
- Cancels the long-lived list-changed listener task immediately (drops the DropGuard in ConnectionInner::_listener_cancel_guard), so by the time the HTTP DELETE goes out the listener isn’t still holding an SSE read open against the upstream we’re about to close.
- Issues DELETE / to the upstream with this connection’s Mcp-Session-Id and the same merged header set every other RPC stamps. Reuses ConnectionInner::call_timeout.
- Treats 404 / 401 / 403 as success: the upstream is unreachable from these credentials anyway, which is the desired terminal state. Any other non-2xx status surfaces as super::Error::BadStatus.
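The status handling in the last step can be sketched as a small pure function. This is an illustration under assumptions: `DeleteError` and `classify_delete_status` are hypothetical names, and the real `BadStatus` variant may carry different data.

```rust
/// Hypothetical error type mirroring the `BadStatus` variant named above.
#[derive(Debug, PartialEq)]
pub enum DeleteError {
    BadStatus(u16),
}

/// Classifies the HTTP status of the DELETE response.
pub fn classify_delete_status(status: u16) -> Result<(), DeleteError> {
    match status {
        // Normal success.
        200..=299 => Ok(()),
        // The session is already unreachable with these credentials,
        // which is the state the caller wanted to reach.
        401 | 403 | 404 => Ok(()),
        // Everything else is reported to the caller.
        other => Err(DeleteError::BadStatus(other)),
    }
}
```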
Takes &self: the listener cancel is in-place, and dropping
the surrounding Arc<ConnectionInner> (which closes the rest
of the connection’s owned state) is the caller’s responsibility
— usually by dropping the Arc<Session> holding it. Stateless
callers that don’t hold a Connection should use
Client::delete instead.
In-flight RPC ordering. This method does not block on
in-flight call_tool / read_resource / list_tools /
list_resources calls on the same connection. If one is
outstanding when delete lands, the upstream may see DELETE
before the RPC’s reply makes it back; the in-flight call then
surfaces as a closed-connection error to whoever started it.
That’s the spec-correct order (client said terminate) — drain
on the caller side first if you need different semantics.
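One way to drain on the caller side is to count outstanding RPCs and wait for the count to reach zero before calling delete. The sketch below uses blocking standard-library primitives to stay self-contained; `InFlight` and `RpcGuard` are hypothetical helpers, not part of this crate, and async code would want an async-aware equivalent instead of a blocking `Condvar`.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical caller-side tracker for outstanding RPCs.
#[derive(Default)]
pub struct InFlight {
    count: Mutex<usize>,
    idle: Condvar,
}

/// Decrements the counter when the RPC finishes, even on early return.
pub struct RpcGuard(Arc<InFlight>);

impl InFlight {
    /// Call before starting an RPC; keep the guard until the reply arrives.
    pub fn begin(this: &Arc<InFlight>) -> RpcGuard {
        *this.count.lock().unwrap() += 1;
        RpcGuard(Arc::clone(this))
    }

    /// Blocks until no RPC is outstanding; call this before `delete`.
    pub fn wait_idle(&self) {
        let mut count = self.count.lock().unwrap();
        while *count > 0 {
            count = self.idle.wait(count).unwrap();
        }
    }

    /// Number of RPCs currently outstanding.
    pub fn outstanding(&self) -> usize {
        *self.count.lock().unwrap()
    }
}

impl Drop for RpcGuard {
    fn drop(&mut self) {
        let mut count = self.0.count.lock().unwrap();
        *count -= 1;
        if *count == 0 {
            // Wake anyone blocked in `wait_idle`.
            self.0.idle.notify_all();
        }
    }
}
```

A caller would wrap each call_tool or read_resource in a guard, then run `wait_idle()` before issuing delete.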
pub fn tool_key(&self) -> String
Returns a key identifying this connection for tool namespacing.
pub fn session_id(&self) -> &str
Returns the session ID for this connection.
pub async fn list_tools(&self) -> Result<Arc<Vec<Tool>>, Arc<Error>>
Returns all tools from the upstream server.
pub async fn call_tool(
    &self,
    params: &CallToolRequestParams,
) -> Result<CallToolResult, Error>
Calls a tool on the upstream server.
pub async fn call_tool_as_message(
    &self,
    params: &CallToolRequestParams,
    tool_call_id: String,
) -> Result<ToolMessage, Error>
Calls a tool and converts the result into a [ToolMessage].
pub async fn list_resources(&self) -> Result<Arc<Vec<Resource>>, Arc<Error>>
Returns all resources from the upstream server.
pub async fn subscribe_tools(
    &self,
    current: &[Tool],
    timeout: Duration,
) -> Result<Arc<Vec<Tool>>, Arc<Error>>
Returns the cached tool list as soon as it differs from current,
or waits up to timeout for the next notifications/tools/list_changed
from the upstream server before re-reading.
Wakes the moment a refresh writer takes the cache write lock, so
the post-wake read is guaranteed to observe the new list rather
than racing against the install. Safe to call from any number of
tasks concurrently.
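The "return at once if different, otherwise wait up to the timeout" contract can be sketched with a `Mutex` and a `Condvar`. This is a simplified blocking model, not the crate's code: `WatchedTools` is a hypothetical type, tool names are plain strings, errors are omitted, and waiters here wake after the new list is installed rather than at the moment the write lock is taken.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical cache of tool names with a change signal.
#[derive(Default)]
pub struct WatchedTools {
    tools: Mutex<Vec<String>>,
    changed: Condvar,
}

impl WatchedTools {
    /// Installs a new list and wakes every waiting subscriber.
    pub fn install(&self, tools: Vec<String>) {
        *self.tools.lock().unwrap() = tools;
        self.changed.notify_all();
    }

    /// Returns at once if the cache differs from `current`; otherwise waits
    /// up to `timeout` for a change, then returns whatever is cached.
    pub fn subscribe(&self, current: &[String], timeout: Duration) -> Vec<String> {
        let guard = self.tools.lock().unwrap();
        // The predicate is checked before sleeping, so a cache that already
        // differs from `current` returns without waiting.
        let (guard, _timed_out) = self
            .changed
            .wait_timeout_while(guard, timeout, |cached| cached.as_slice() == current)
            .unwrap();
        guard.clone()
    }
}
```

Because the predicate is re-evaluated under the lock after every wake-up, any number of threads can call `subscribe` concurrently in this model.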
pub async fn subscribe_resources(
    &self,
    current: &[Resource],
    timeout: Duration,
) -> Result<Arc<Vec<Resource>>, Arc<Error>>
Resource counterpart of Connection::subscribe_tools.
pub async fn drain_notifications(&self) -> Result<Vec<ContentBlock>, Error>
Atomically drain the proxy’s pending_notifications queue for
this session via GET /notify and return the queued content
blocks. A second call returns [] until the next out-of-band
POST /notify.
Intended for use at the start of an agent turn so notifications
queued between turns — when the prior turn ended without a tool
call, or the user is starting a fresh continuation — surface as
a user message instead of being lost. The proxy’s existing
tools/call response path still drains in-flight notifications
arriving during a turn; this method covers the gap between
turns.
A 404 from the proxy (session unknown — possible after a proxy
restart) is mapped to an empty Vec so callers do not need to
distinguish “no notifications” from “lost session” at the use
site; the next upstream call will surface the lost-session
condition through its own error path.
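The drain semantics (everything queued comes back in one step, and a second call finds nothing) can be modelled in memory. This is only a sketch of the behaviour described above: `PendingQueue` is a hypothetical type, and content blocks are represented as plain strings.

```rust
/// Hypothetical in-memory model of one session's pending-notifications queue.
#[derive(Default)]
pub struct PendingQueue {
    blocks: Vec<String>,
}

impl PendingQueue {
    /// Appends blocks, as an out-of-band `POST /notify` would.
    pub fn enqueue(&mut self, blocks: &[String]) {
        self.blocks.extend_from_slice(blocks);
    }

    /// Takes every queued block in one step, leaving the queue empty.
    pub fn drain(&mut self) -> Vec<String> {
        std::mem::take(&mut self.blocks)
    }
}
```

Calling `drain` at the start of a turn yields whatever accumulated since the previous drain; calling it again immediately yields an empty `Vec`.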
pub async fn has_pending_notifications(&self) -> Result<bool, Error>
Non-draining peek at the proxy’s pending_notifications queue
via GET /notify/queued. Returns true iff the queue holds at
least one block. Companion to Connection::drain_notifications
for callers that want to know whether queued blocks exist
without consuming them.
A 404 from the proxy (session unknown — possible after a proxy
restart) is mapped to Ok(false) for the same reason as the
drain path: callers do not need to distinguish “no
notifications” from “lost session” at the use site.
pub async fn enqueue_notifications(
    &self,
    blocks: &[ContentBlock],
) -> Result<(), Error>
POST <self.url>/notify against the ObjectiveAI MCP proxy.
Appends blocks to the proxy’s pending-notifications queue for
this session; they surface as a user message on the next
tools/call response (wrapped in a <system-reminder> block)
or as the head of the next agent turn when drained between turns.
Mirror of Connection::drain_notifications / Connection::has_pending_notifications
for the inbound side. A 404 from the proxy means the session is
gone — surfaced as SessionExpired so callers can distinguish
“session lost” from “delivery failed” at the use site.
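The three notification calls treat a 404 differently, which the following sketch puts side by side. The names `NotifyError`, `map_drain`, `map_peek`, and `map_enqueue` are hypothetical, content blocks are plain strings, and mapping every other non-2xx status to `BadStatus` is an assumption rather than documented behaviour.

```rust
/// Hypothetical error type for the notification endpoints.
#[derive(Debug, PartialEq)]
pub enum NotifyError {
    /// The proxy no longer knows this session.
    SessionExpired,
    /// Any other unexpected status (assumed handling).
    BadStatus(u16),
}

/// Drain path: a 404 reads as "nothing queued".
pub fn map_drain(status: u16, blocks: Vec<String>) -> Result<Vec<String>, NotifyError> {
    match status {
        200..=299 => Ok(blocks),
        404 => Ok(Vec::new()),
        other => Err(NotifyError::BadStatus(other)),
    }
}

/// Peek path: a 404 reads as "queue is empty".
pub fn map_peek(status: u16, queued: bool) -> Result<bool, NotifyError> {
    match status {
        200..=299 => Ok(queued),
        404 => Ok(false),
        other => Err(NotifyError::BadStatus(other)),
    }
}

/// Enqueue path: a 404 is surfaced, because the blocks were not delivered.
pub fn map_enqueue(status: u16) -> Result<(), NotifyError> {
    match status {
        200..=299 => Ok(()),
        404 => Err(NotifyError::SessionExpired),
        other => Err(NotifyError::BadStatus(other)),
    }
}
```

The asymmetry follows from what is at stake: a read against a lost session loses nothing, while a write against a lost session silently drops the caller's blocks unless it is reported.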
pub async fn read_resource(
    &self,
    uri: &str,
) -> Result<ReadResourceResult, Error>
Reads a resource from the upstream server.
pub fn set_on_tools_list_changed<F>(&self, callback: F)
Register a callback to fire whenever the upstream emits
notifications/tools/list_changed.
Timing: the callback runs after the tool cache’s write lock
is acquired but before the network paginate that replaces it.
That means readers blocked on the read lock won’t return until the
new list is in place, and the callback observes the moment the
staleness window opens. The proxy uses this to emit its own
notifications/tools/list_changed to downstream clients at the
right instant.
Replaces any previously-registered tools-list-changed callback.
All clones of this Connection share the same callback slot.
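The timing contract (write lock first, callback second, fetch last) can be sketched with synchronous standard-library locks. This is a model under assumptions, not the crate's code: `CallbackCache`, `set_on_changed`, and `refresh` are hypothetical names, and the `fetch` closure stands in for the paginated network read.

```rust
use std::sync::{Mutex, RwLock};

type Callback = Box<dyn Fn() + Send + Sync>;

/// Hypothetical cache with a single replaceable callback slot.
#[derive(Default)]
pub struct CallbackCache {
    tools: RwLock<Vec<String>>,
    on_changed: Mutex<Option<Callback>>,
}

impl CallbackCache {
    /// Replaces any previously registered callback.
    pub fn set_on_changed<F>(&self, callback: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        *self.on_changed.lock().unwrap() = Some(Box::new(callback));
    }

    /// Handles one list-changed notification.
    pub fn refresh<F>(&self, fetch: F)
    where
        F: FnOnce() -> Vec<String>,
    {
        // 1. Take the write lock: readers now block until the new list is in.
        let mut tools = self.tools.write().unwrap();
        // 2. Fire the callback while the lock is held, before fetching.
        if let Some(callback) = self.on_changed.lock().unwrap().as_ref() {
            callback();
        }
        // 3. Fetch and install the replacement list, then release the lock.
        *tools = fetch();
    }

    /// Reads the current list; blocks while a refresh is in progress.
    pub fn tools(&self) -> Vec<String> {
        self.tools.read().unwrap().clone()
    }
}
```

In this sketch the callback slot stays locked while the callback runs, so a callback that re-registers itself would deadlock; a real implementation may handle that differently.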
pub fn set_on_resources_list_changed<F>(&self, callback: F)
Register a callback to fire whenever the upstream emits
notifications/resources/list_changed. Same timing contract as
Connection::set_on_tools_list_changed.
Replaces any previously-registered resources-list-changed callback.
All clones of this Connection share the same callback slot.
pub async fn set_extra_headers(&self, extras: IndexMap<String, String>)
Atomically replace the connection’s ConnectionInner::extra_headers
bag. Every subsequent outbound HTTP request from this connection
stamps the new map AFTER headers, with HeaderMap::insert
REPLACE semantics — keys in extras override the same key in
the per-URL headers bag set at Client::connect. Caller
supplies the FULL replacement map; missing keys are dropped
(no merge).
Used by the proxy to inject session-global headers
(X-OBJECTIVEAI-RESPONSE-ID, X-OBJECTIVEAI-RESPONSE-IDS)
that re-set on every inbound initialize, without re-dialing
the upstream.
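The two rules above (extras override per-URL headers, and each call replaces the whole extras map) can be sketched with ordinary maps. This is an illustration, not the crate's code: `HeaderState` and `outbound` are hypothetical, a `BTreeMap` stands in for `IndexMap` and `HeaderMap`, and header names are assumed to be already lower-cased.

```rust
use std::collections::BTreeMap;

type Headers = BTreeMap<String, String>;

/// Hypothetical holder for the per-URL headers and the replaceable extras.
pub struct HeaderState {
    headers: Headers,
    extra_headers: Headers,
}

impl HeaderState {
    pub fn new(headers: Headers) -> Self {
        HeaderState { headers, extra_headers: Headers::new() }
    }

    /// Full replacement: keys absent from `extras` are dropped, not merged.
    pub fn set_extra_headers(&mut self, extras: Headers) {
        self.extra_headers = extras;
    }

    /// Header set for the next request: per-URL headers first, then extras
    /// inserted on top so a key present in both takes the extras value.
    pub fn outbound(&self) -> Headers {
        let mut out = self.headers.clone();
        for (name, value) in &self.extra_headers {
            out.insert(name.clone(), value.clone());
        }
        out
    }
}
```

After a second `set_extra_headers` call, a key that appeared only in the first extras map is gone, and a per-URL header that had been overridden reverts to its original value.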
Trait Implementations§
impl Clone for Connection
impl Debug for Connection
impl Deref for Connection
type Target = ConnectionInner
fn deref(&self) -> &ConnectionInner
Auto Trait Implementations§
impl !RefUnwindSafe for Connection
impl !UnwindSafe for Connection
impl Freeze for Connection
impl Send for Connection
impl Sync for Connection
impl Unpin for Connection
impl UnsafeUnpin for Connection
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T where T: Clone
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.