Struct ConnectionTo 

pub struct ConnectionTo<Counterpart: Role> { /* private fields */ }

Connection context for sending messages and spawning tasks.

This is the primary handle for interacting with the JSON-RPC connection from within handler callbacks. You can use it to:

  • Send requests and notifications to the other side
  • Spawn concurrent tasks that run alongside the connection
  • Respond to requests (via Responder which wraps this)

§Cloning

ConnectionTo is cheaply cloneable: all clones refer to the same underlying connection, which makes it easy to share across async tasks.
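
As an illustration of what "cheaply cloneable" usually means in Rust, here is a minimal sketch using only the standard library. The `Handle` type and its methods are hypothetical stand-ins, not the crate's real internals; the assumption is simply that a clone copies a reference-counted pointer rather than the connection state itself.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a connection handle (not the crate's real type).
#[derive(Clone)]
pub struct Handle {
    // Every clone points at the same shared state.
    shared: Arc<Mutex<Vec<String>>>,
}

impl Handle {
    pub fn new() -> Self {
        Handle { shared: Arc::new(Mutex::new(Vec::new())) }
    }

    /// Record an outgoing message on the shared state.
    pub fn send(&self, msg: &str) {
        self.shared.lock().unwrap().push(msg.to_string());
    }

    /// Snapshot of everything sent through any clone.
    pub fn sent(&self) -> Vec<String> {
        self.shared.lock().unwrap().clone()
    }
}

/// Send from a clone on another thread, then observe it through the original.
pub fn demo_shared_clones() -> Vec<String> {
    let original = Handle::new();
    let clone = original.clone(); // only bumps a reference count
    thread::spawn(move || clone.send("from clone"))
        .join()
        .unwrap();
    original.send("from original");
    original.sent()
}
```

Messages sent through the clone are visible through the original handle, because both refer to one underlying state.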

§Event Loop and Concurrency

Handler callbacks run on the event loop, which means the connection cannot process new messages while your handler is running. Use spawn to offload any expensive or blocking work to concurrent tasks.

See the Event Loop and Concurrency section for more details.
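
The effect of offloading can be modeled with a small standard-library sketch. Everything here is hypothetical (`run_loop`, the message names, and the use of OS threads in place of async tasks); it only shows that a loop which hands slow work to a background worker keeps handling later messages without waiting.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical model: handle `messages` one at a time on a single loop.
/// Slow work is handed to a background thread so the loop keeps moving.
pub fn run_loop(messages: &[&str]) -> Vec<String> {
    let (done_tx, done_rx) = mpsc::channel::<String>();
    let mut log = Vec::new();
    let mut workers = Vec::new();

    for &msg in messages {
        if msg == "slow" {
            // Offload: the handler returns immediately, the work runs elsewhere.
            let tx = done_tx.clone();
            workers.push(thread::spawn(move || {
                thread::sleep(Duration::from_millis(50)); // stand-in for expensive work
                tx.send("slow finished".to_string()).unwrap();
            }));
            log.push("slow started".to_string());
        } else {
            // Cheap handlers can run directly on the loop.
            log.push(format!("{msg} handled"));
        }
    }

    // The queue is drained; now collect results from the offloaded work.
    drop(done_tx);
    for worker in workers {
        worker.join().unwrap();
    }
    log.extend(done_rx.iter());
    log
}
```

In this model the message after "slow" is handled before the slow work finishes, which is the behavior spawn is meant to give real handlers.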

Implementations§


impl<Counterpart: Role> ConnectionTo<Counterpart>


pub fn counterpart(&self) -> Counterpart

Return the counterpart role this connection is talking to.


pub fn spawn( &self, task: impl IntoFuture<Output = Result<(), Error>, IntoFuture: Send + 'static>, ) -> Result<(), Error>

Spawns a task that will run so long as the JSON-RPC connection is being served.

This is the primary mechanism for offloading expensive work from handler callbacks to avoid blocking the event loop. Spawned tasks run concurrently with the connection, allowing the server to continue processing messages.

§Event Loop

Handler callbacks run on the event loop, which cannot process new messages while your handler is running. Use spawn for any expensive operations:

connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
    // Clone cx for the spawned task
    cx.spawn({
        let connection = cx.clone();
        async move {
            let result = expensive_operation(&req.data).await?;
            connection.send_notification(ProcessComplete { result })?;
            Ok(())
        }
    })?;

    // Respond immediately
    responder.respond(ProcessResponse { result: "started".into() })
}, agent_client_protocol::on_receive_request!())
§Errors

If the spawned task returns an error, the entire server will shut down.


pub fn spawn_connection<R: Role>( &self, builder: Builder<R, impl HandleDispatchFrom<R::Counterpart> + 'static, impl RunWithConnectionTo<R::Counterpart> + 'static>, transport: impl ConnectTo<R> + 'static, ) -> Result<ConnectionTo<R::Counterpart>, Error>

Spawn a JSON-RPC connection in the background and return a ConnectionTo for sending messages to it.

This is useful for creating multiple connections that communicate with each other, such as implementing proxy patterns or connecting to multiple backend services.

§Arguments
  • builder: The connection builder with handlers configured
  • transport: The transport component to connect to
§Returns

A ConnectionTo that you can use to send requests and notifications to the spawned connection.

§Example: Proxying to a backend connection
// Set up a backend connection builder
let backend = UntypedRole.builder()
    .on_receive_request(async |req: MyRequest, responder, _cx| {
        responder.respond(MyResponse { status: "ok".into() })
    }, agent_client_protocol::on_receive_request!());

// Spawn it and get a context to send requests to it
let backend_connection = cx.spawn_connection(backend, MockTransport)?;

// Now you can forward requests to the backend
let response = backend_connection.send_request(MyRequest {}).block_task().await?;

pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>( &self, message: Dispatch<Req, Notif>, ) -> Result<(), Error>
where Counterpart: HasPeer<Counterpart>,

Send a request/notification and forward the response appropriately.

The request context’s response type matches the request’s response type, enabling type-safe message forwarding.


pub fn send_proxied_message_to<Peer: Role, Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>( &self, peer: Peer, message: Dispatch<Req, Notif>, ) -> Result<(), Error>
where Counterpart: HasPeer<Peer>,

Send a request/notification and forward the response appropriately.

The request context’s response type matches the request’s response type, enabling type-safe message forwarding.


pub fn send_request<Req: JsonRpcRequest>( &self, request: Req, ) -> SentRequest<Req::Response>
where Counterpart: HasPeer<Counterpart>,

Send an outgoing request and return a SentRequest for handling the reply.

The returned SentRequest provides methods for receiving the response without blocking the event loop:

  • on_receiving_result - Schedule a callback to run when the response arrives (doesn’t block the event loop)
  • block_task - Block the current task until the response arrives (only safe in spawned tasks, not in handlers)
§Anti-Footgun Design

The API intentionally makes it difficult to block on the result directly to prevent the common mistake of blocking the event loop while waiting for a response:

// ❌ This doesn't compile - prevents blocking the event loop
let response = cx.send_request(MyRequest {}).await?;

// ✅ Option 1: Schedule callback (safe in handlers)
cx.send_request(MyRequest {})
    .on_receiving_result(async |result| {
        // Handle the response
        Ok(())
    })?;

// ✅ Option 2: Block in spawned task (safe because task is concurrent)
cx.spawn({
    let cx = cx.clone();
    async move {
        let response = cx.send_request(MyRequest {})
            .block_task()
            .await?;
        // Process response...
        Ok(())
    }
})?;
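
The same design idea, a reply handle with no implicit way to wait, can be sketched with the standard library alone. `PendingReply`, `on_result`, `block`, and `fake_request` are hypothetical names used for illustration, and OS threads stand in for async tasks; this is not how SentRequest is implemented.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical pending reply. It offers no implicit way to wait:
/// the caller has to choose one of the two consuming methods below.
pub struct PendingReply<T> {
    rx: Receiver<T>,
}

impl<T: Send + 'static> PendingReply<T> {
    /// Option 1: register a callback and return at once.
    /// The callback runs on a background thread when the reply arrives.
    pub fn on_result(self, callback: impl FnOnce(T) + Send + 'static) -> JoinHandle<()> {
        thread::spawn(move || {
            if let Ok(reply) = self.rx.recv() {
                callback(reply);
            }
        })
    }

    /// Option 2: block the calling thread until the reply arrives.
    /// Only appropriate on a thread that is allowed to block.
    pub fn block(self) -> Option<T> {
        self.rx.recv().ok()
    }
}

/// Pretend to send a request; a background thread plays the remote peer.
pub fn fake_request(reply: String) -> PendingReply<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(reply);
    });
    PendingReply { rx }
}
```

Because both methods take `self` by value, each pending reply can be consumed exactly once, and the choice between callback and blocking is always explicit at the call site.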

Send an outgoing request to the default counterpart peer.

This is a convenience method that sends to the counterpart role Counterpart. For explicit control over the target peer, use send_request_to.


pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>( &self, peer: Peer, request: Req, ) -> SentRequest<Req::Response>
where Counterpart: HasPeer<Peer>,

Send an outgoing request to a specific peer.

The message will be transformed according to the HasPeer implementation before being sent.


pub fn send_notification<N: JsonRpcNotification>( &self, notification: N, ) -> Result<(), Error>
where Counterpart: HasPeer<Counterpart>,

Send an outgoing notification to the default counterpart peer (no reply expected).

Notifications are fire-and-forget messages that don’t have IDs and don’t expect responses. This method sends the notification immediately and returns.

This is a convenience method that sends to the counterpart role Counterpart. For explicit control over the target peer, use send_notification_to.

cx.send_notification(StatusUpdate {
    message: "Processing...".into(),
})?;
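
To make "don't have IDs" concrete, the sketch below builds the two JSON-RPC 2.0 wire shapes by hand. The helper names and the method strings are hypothetical, the `params` argument is assumed to already be valid JSON, and no escaping is done; the crate's own serialization is not shown here.

```rust
/// Build a JSON-RPC 2.0 request: it carries an `id` so the reply can be matched.
/// `params` must already be valid JSON text (assumption; nothing is escaped).
pub fn request_json(id: u64, method: &str, params: &str) -> String {
    format!(r#"{{"jsonrpc":"2.0","id":{id},"method":"{method}","params":{params}}}"#)
}

/// Build a JSON-RPC 2.0 notification: the same shape without an `id`,
/// so the receiver has nothing to reply to.
pub fn notification_json(method: &str, params: &str) -> String {
    format!(r#"{{"jsonrpc":"2.0","method":"{method}","params":{params}}}"#)
}
```

For example, `notification_json("status/update", r#"{"message":"Processing..."}"#)` yields an object with `jsonrpc`, `method`, and `params` members only.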

pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>( &self, peer: Peer, notification: N, ) -> Result<(), Error>
where Counterpart: HasPeer<Peer>,

Send an outgoing notification to a specific peer (no reply expected).

The message will be transformed according to the HasPeer implementation before being sent.


pub fn send_error_notification(&self, error: Error) -> Result<(), Error>

Send an error notification (no reply expected).


pub fn add_dynamic_handler( &self, handler: impl HandleDispatchFrom<Counterpart> + 'static, ) -> Result<DynamicHandlerRegistration<Counterpart>, Error>

Register a dynamic message handler, used to intercept messages that belong to a particular session or to some similar modal state.

Dynamic message handlers are called first for every incoming message.

If they decline to handle the message, then the message is passed to the regular registered handlers.

The handler will stay registered until the returned registration guard is dropped.
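
The dispatch order and the guard-based lifetime described above can be modeled in a few lines of standard-library Rust. `Dispatcher`, `Registration`, and the string-based messages are hypothetical simplifications (single-threaded, no async), intended only to show "dynamic handlers first, regular handlers as fallback, unregister on drop".

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

/// A handler returns Some(reply) if it handled the message, None to decline.
type Handler = Box<dyn Fn(&str) -> Option<String>>;

#[derive(Default)]
struct Inner {
    next_id: u64,
    dynamic: BTreeMap<u64, Handler>,
}

/// Hypothetical dispatcher: dynamic handlers first, then a regular fallback.
#[derive(Clone, Default)]
pub struct Dispatcher {
    inner: Rc<RefCell<Inner>>,
}

/// Guard that unregisters its handler when dropped.
pub struct Registration {
    id: u64,
    inner: Rc<RefCell<Inner>>,
}

impl Drop for Registration {
    fn drop(&mut self) {
        self.inner.borrow_mut().dynamic.remove(&self.id);
    }
}

impl Dispatcher {
    /// Register a dynamic handler; it stays active while the guard is alive.
    pub fn add_dynamic(&self, handler: impl Fn(&str) -> Option<String> + 'static) -> Registration {
        let mut inner = self.inner.borrow_mut();
        let id = inner.next_id;
        inner.next_id += 1;
        inner.dynamic.insert(id, Box::new(handler));
        Registration { id, inner: Rc::clone(&self.inner) }
    }

    /// Offer the message to each dynamic handler, then fall back.
    pub fn dispatch(&self, msg: &str) -> String {
        let inner = self.inner.borrow();
        for handler in inner.dynamic.values() {
            if let Some(reply) = handler(msg) {
                return reply;
            }
        }
        format!("regular handler: {msg}")
    }
}
```

Holding the `Registration` in the same scope as the session it serves means the handler disappears automatically when that scope ends.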


impl<Counterpart> ConnectionTo<Counterpart>
where Counterpart: HasPeer<Agent> + Role,


pub fn build_session( &self, cwd: impl AsRef<Path>, ) -> SessionBuilder<Counterpart, NullRun>

Session builder for a new session request.


pub fn build_session_cwd( &self, ) -> Result<SessionBuilder<Counterpart, NullRun>, Error>

Session builder using the current working directory.

This is a convenience wrapper around build_session that uses std::env::current_dir to get the working directory.

Returns an error if the current directory cannot be determined.
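
The wrapper pattern can be sketched as follows. `SessionSketch` and the two free functions are hypothetical stand-ins for the real builder and methods, and the error type is simplified to `std::io::Error`; only `std::env::current_dir` is the real standard-library call.

```rust
use std::env;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for a session builder; it only records the directory.
pub struct SessionSketch {
    pub cwd: PathBuf,
}

/// Explicit form: the caller supplies the working directory.
pub fn build_session(cwd: impl AsRef<Path>) -> SessionSketch {
    SessionSketch { cwd: cwd.as_ref().to_path_buf() }
}

/// Convenience form: look up the process's current directory first.
pub fn build_session_cwd() -> io::Result<SessionSketch> {
    // current_dir fails if the directory no longer exists or is not accessible.
    let cwd = env::current_dir()?;
    Ok(build_session(cwd))
}
```

The fallible lookup is why the convenience form returns a Result while the explicit form does not.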


pub fn build_session_from( &self, request: NewSessionRequest, ) -> SessionBuilder<Counterpart, NullRun>

Session builder starting from an existing request.

Use this when you’ve intercepted a session/new request and want to modify it (e.g., inject MCP servers) before forwarding.


pub fn attach_session<'responder>( &self, response: NewSessionResponse, mcp_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>, ) -> Result<ActiveSession<'responder, Counterpart>, Error>

Given a session response received from the agent, attach a handler that processes messages related to this session and gives you access to them.

Normally you would not use this method directly but would instead use Self::build_session and then SessionBuilder::start_session.

The vector mcp_handler_registrations contains any dynamic handler registrations associated with this session (e.g., from MCP servers). You can simply pass Default::default() if not applicable.

Trait Implementations§

impl<Counterpart: Clone + Role> Clone for ConnectionTo<Counterpart>

fn clone(&self) -> ConnectionTo<Counterpart>

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<Counterpart: Debug + Role> Debug for ConnectionTo<Counterpart>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<Counterpart> Freeze for ConnectionTo<Counterpart>
where Counterpart: Freeze,

impl<Counterpart> !RefUnwindSafe for ConnectionTo<Counterpart>

impl<Counterpart> Send for ConnectionTo<Counterpart>

impl<Counterpart> Sync for ConnectionTo<Counterpart>

impl<Counterpart> Unpin for ConnectionTo<Counterpart>
where Counterpart: Unpin,

impl<Counterpart> UnsafeUnpin for ConnectionTo<Counterpart>
where Counterpart: UnsafeUnpin,

impl<Counterpart> !UnwindSafe for ConnectionTo<Counterpart>
