pub struct ConnectionTo<Counterpart: Role> { /* private fields */ }
Connection context for sending messages and spawning tasks.
This is the primary handle for interacting with the JSON-RPC connection from within handler callbacks. You can use it to:
- Send requests and notifications to the other side
- Spawn concurrent tasks that run alongside the connection
- Respond to requests (via Responder, which wraps this)
§Cloning
ConnectionTo is cheaply cloneable - all clones refer to the same underlying connection.
This makes it easy to share across async tasks.
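The sharing behaviour can be pictured with a small std-only model. `Handle`, `send`, `sent_count`, and `send_from_workers` are hypothetical names, not part of this crate, and the sketch assumes the cheap clone is backed by reference-counted shared state, which the text above implies but does not state.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a connection handle: all clones share one outbox.
#[derive(Clone, Default)]
pub struct Handle {
    outbox: Arc<Mutex<Vec<String>>>,
}

impl Handle {
    /// Queue a message on the shared connection state.
    pub fn send(&self, msg: &str) {
        self.outbox.lock().unwrap().push(msg.to_string());
    }

    /// Number of messages queued through any clone of this handle.
    pub fn sent_count(&self) -> usize {
        self.outbox.lock().unwrap().len()
    }
}

/// Sends one message from each of `n` worker threads, each holding its own clone.
pub fn send_from_workers(handle: &Handle, n: usize) {
    let workers: Vec<_> = (0..n)
        .map(|i| {
            let h = handle.clone(); // copies a pointer, not the outbox
            thread::spawn(move || h.send(&format!("worker {i}")))
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
}
```

Every clone observes the same queue, so a message sent through one clone is visible through all the others.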
§Event Loop and Concurrency
Handler callbacks run on the event loop, which means the connection cannot process new
messages while your handler is running. Use spawn to offload any
expensive or blocking work to concurrent tasks.
See the Event Loop and Concurrency section for more details.
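To make the cost of a slow handler concrete, here is a std-only model of a single-threaded message loop. It is only a sketch: it uses OS threads and a channel where the real connection uses async tasks, and `run_loop` and `expensive` are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for expensive work done by a handler.
fn expensive() -> u64 {
    (1..=1_000u64).sum()
}

/// Processes `messages` in order and returns the event log.
/// When `offload` is true, "slow" messages hand their work to a worker thread
/// and the loop moves on; otherwise the work runs inline and delays the loop.
pub fn run_loop(messages: &[&str], offload: bool) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let mut log = Vec::new();
    let mut workers = Vec::new();

    for &msg in messages {
        if msg == "slow" && offload {
            let tx = tx.clone();
            workers.push(thread::spawn(move || tx.send(expensive()).unwrap()));
            log.push("slow: started".to_string());
        } else if msg == "slow" {
            log.push(format!("slow: {}", expensive()));
        } else {
            log.push(format!("{msg}: handled"));
        }
    }

    // Collect offloaded results only after the loop has drained its messages.
    drop(tx);
    for w in workers {
        w.join().unwrap();
    }
    for result in rx {
        log.push(format!("slow: {result}"));
    }
    log
}
```

With `offload` set to false the "fast" message is logged only after the slow work has finished; with `offload` set to true it is handled first and the slow result arrives afterwards.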
Implementations§
impl<Counterpart: Role> ConnectionTo<Counterpart>
pub fn counterpart(&self) -> Counterpart
Return the counterpart role this connection is talking to.
pub fn spawn( &self, task: impl IntoFuture<Output = Result<(), Error>, IntoFuture: Send + 'static>, ) -> Result<(), Error>
Spawns a task that will run so long as the JSON-RPC connection is being served.
This is the primary mechanism for offloading expensive work from handler callbacks to avoid blocking the event loop. Spawned tasks run concurrently with the connection, allowing the server to continue processing messages.
§Event Loop
Handler callbacks run on the event loop, which cannot process new messages while
your handler is running. Use spawn for any expensive operations:
connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
    // Clone cx for the spawned task
    cx.spawn({
        let connection = cx.clone();
        async move {
            let result = expensive_operation(&req.data).await?;
            connection.send_notification(ProcessComplete { result })?;
            Ok(())
        }
    })?;
    // Respond immediately
    responder.respond(ProcessResponse { result: "started".into() })
}, agent_client_protocol::on_receive_request!())
§Errors
If the spawned task returns an error, the entire server will shut down.
pub fn spawn_connection<R: Role>( &self, builder: Builder<R, impl HandleDispatchFrom<R::Counterpart> + 'static, impl RunWithConnectionTo<R::Counterpart> + 'static>, transport: impl ConnectTo<R> + 'static, ) -> Result<ConnectionTo<R::Counterpart>, Error>
Spawn a JSON-RPC connection in the background and return a ConnectionTo for sending messages to it.
This is useful for creating multiple connections that communicate with each other, such as implementing proxy patterns or connecting to multiple backend services.
§Arguments
- builder: The connection builder with handlers configured
- transport: The transport component to connect to
§Returns
A ConnectionTo that you can use to send requests and notifications to the spawned connection.
§Example: Proxying to a backend connection
// Set up a backend connection builder
let backend = UntypedRole.builder()
    .on_receive_request(async |req: MyRequest, responder, _cx| {
        responder.respond(MyResponse { status: "ok".into() })
    }, agent_client_protocol::on_receive_request!());
// Spawn it and get a context to send requests to it
let backend_connection = cx.spawn_connection(backend, MockTransport)?;
// Now you can forward requests to the backend
let response = backend_connection.send_request(MyRequest {}).block_task().await?;
pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
    &self,
    message: Dispatch<Req, Notif>,
) -> Result<(), Error>
where
    Counterpart: HasPeer<Counterpart>,
Send a request/notification and forward the response appropriately.
The request context’s response type matches the request’s response type, enabling type-safe message forwarding.
pub fn send_proxied_message_to<Peer: Role, Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
    &self,
    peer: Peer,
    message: Dispatch<Req, Notif>,
) -> Result<(), Error>
where
    Counterpart: HasPeer<Peer>,
Send a request/notification and forward the response appropriately.
The request context’s response type matches the request’s response type, enabling type-safe message forwarding.
pub fn send_request<Req: JsonRpcRequest>(
    &self,
    request: Req,
) -> SentRequest<Req::Response>
where
    Counterpart: HasPeer<Counterpart>,
Send an outgoing request and return a SentRequest for handling the reply.
The returned SentRequest provides methods for receiving the response without
blocking the event loop:
- on_receiving_result - Schedule a callback to run when the response arrives (doesn’t block the event loop)
- block_task - Block the current task until the response arrives (only safe in spawned tasks, not in handlers)
§Anti-Footgun Design
The API intentionally makes it difficult to block on the result directly to prevent the common mistake of blocking the event loop while waiting for a response:
// ❌ This doesn't compile - prevents blocking the event loop
let response = cx.send_request(MyRequest {}).await?;

// ✅ Option 1: Schedule callback (safe in handlers)
cx.send_request(MyRequest {})
    .on_receiving_result(async |result| {
        // Handle the response
        Ok(())
    })?;
// ✅ Option 2: Block in spawned task (safe because task is concurrent)
cx.spawn({
    let cx = cx.clone();
    async move {
        let response = cx.send_request(MyRequest {})
            .block_task()
            .await?;
        // Process response...
        Ok(())
    }
})?;
Send an outgoing request to the default counterpart peer.
This is a convenience method that sends to the connection's counterpart role (the Counterpart type parameter).
For explicit control over the target peer, use send_request_to.
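The anti-footgun idea can be sketched without the crate. `PendingReply` below is a hypothetical stand-in for SentRequest: it implements neither Future nor IntoFuture, so the only ways to reach the reply are the two explicit methods. Unlike the real block_task, which is awaited inside an async task, this model blocks an OS thread, and the doubled payload is an invented reply.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical pending reply. Because it is not a future, `.await` on it
/// does not compile; callers must pick one of the methods below.
pub struct PendingReply<T> {
    rx: Receiver<T>,
}

impl<T: Send + 'static> PendingReply<T> {
    /// Returns immediately; `callback` runs on a background thread once the
    /// reply arrives, so the caller is never blocked.
    pub fn on_receiving_result(
        self,
        callback: impl FnOnce(T) + Send + 'static,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            if let Ok(value) = self.rx.recv() {
                callback(value);
            }
        })
    }

    /// Blocks the calling thread until the reply arrives (None if the peer
    /// went away). Only appropriate off the event loop.
    pub fn block_task(self) -> Option<T> {
        self.rx.recv().ok()
    }
}

/// Simulates sending a request; a separate thread plays the remote peer and
/// replies with `payload * 2`.
pub fn send_request(payload: u32) -> PendingReply<u32> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(payload * 2);
    });
    PendingReply { rx }
}
```

Making the blocking path an explicit, named method means a reader can spot at a glance where a task waits on the peer.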
pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
    &self,
    peer: Peer,
    request: Req,
) -> SentRequest<Req::Response>
where
    Counterpart: HasPeer<Peer>,
Send an outgoing request to a specific peer.
The message will be transformed according to the HasPeer
implementation before being sent.
pub fn send_notification<N: JsonRpcNotification>(
    &self,
    notification: N,
) -> Result<(), Error>
where
    Counterpart: HasPeer<Counterpart>,
Send an outgoing notification to the default counterpart peer (no reply expected).
Notifications are fire-and-forget messages that don’t have IDs and don’t expect responses. This method sends the notification immediately and returns.
This is a convenience method that sends to the connection's counterpart role (the Counterpart type parameter).
For explicit control over the target peer, use send_notification_to.
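On the wire, the difference between a request and a notification in JSON-RPC 2.0 is the presence of the `id` member. The sketch below builds both shapes by hand to show this; `request_json` and `notification_json` are hypothetical helpers, not this crate's serialization code. They assume `params` is already valid JSON and that `method` needs no escaping.

```rust
/// Builds a JSON-RPC 2.0 request: it carries an `id`, so the peer must reply.
pub fn request_json(id: u64, method: &str, params: &str) -> String {
    format!(r#"{{"jsonrpc":"2.0","id":{id},"method":"{method}","params":{params}}}"#)
}

/// Builds a JSON-RPC 2.0 notification: the same shape without `id`, so the
/// peer sends no reply.
pub fn notification_json(method: &str, params: &str) -> String {
    format!(r#"{{"jsonrpc":"2.0","method":"{method}","params":{params}}}"#)
}
```

Because there is no `id` to correlate a reply with, a notification send can return as soon as the message is queued.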
cx.send_notification(StatusUpdate {
    message: "Processing...".into(),
})?;
pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
    &self,
    peer: Peer,
    notification: N,
) -> Result<(), Error>
where
    Counterpart: HasPeer<Peer>,
Send an outgoing notification to a specific peer (no reply expected).
The message will be transformed according to the HasPeer
implementation before being sent.
pub fn send_error_notification(&self, error: Error) -> Result<(), Error>
Send an error notification (no reply expected).
pub fn add_dynamic_handler( &self, handler: impl HandleDispatchFrom<Counterpart> + 'static, ) -> Result<DynamicHandlerRegistration<Counterpart>, Error>
Register a dynamic message handler, used to intercept messages specific to a particular session or a similar modal context.
Dynamic message handlers are called first for every incoming message.
If they decline to handle the message, then the message is passed to the regular registered handlers.
The handler will stay registered until the returned registration guard is dropped.
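The dispatch order and the guard's lifetime can be modelled in a few lines. This is a synchronous, string-based sketch with hypothetical types (`Dispatcher`, `Registration`); the real handlers are async and operate on typed messages. A handler returning `None` stands in for declining a message.

```rust
use std::cell::{Cell, RefCell};
use std::collections::BTreeMap;
use std::rc::Rc;

type Handler = Box<dyn Fn(&str) -> Option<String>>;

/// Hypothetical dispatcher: dynamic handlers are consulted before the
/// regular (fallback) handler.
#[derive(Default)]
pub struct Dispatcher {
    dynamic: Rc<RefCell<BTreeMap<u64, Handler>>>,
    next_id: Cell<u64>,
}

/// Guard returned on registration; dropping it unregisters the handler.
pub struct Registration {
    id: u64,
    dynamic: Rc<RefCell<BTreeMap<u64, Handler>>>,
}

impl Drop for Registration {
    fn drop(&mut self) {
        self.dynamic.borrow_mut().remove(&self.id);
    }
}

impl Dispatcher {
    /// Registers `handler` and returns the guard that keeps it alive.
    pub fn add_dynamic_handler(
        &self,
        handler: impl Fn(&str) -> Option<String> + 'static,
    ) -> Registration {
        let id = self.next_id.get();
        self.next_id.set(id + 1);
        self.dynamic.borrow_mut().insert(id, Box::new(handler));
        Registration { id, dynamic: Rc::clone(&self.dynamic) }
    }

    /// Tries dynamic handlers in registration order; if all decline, the
    /// message falls through to the regular handler.
    pub fn dispatch(&self, msg: &str) -> String {
        self.dynamic
            .borrow()
            .values()
            .find_map(|handler| handler(msg))
            .unwrap_or_else(|| format!("regular handler got {msg}"))
    }
}
```

Tying unregistration to `Drop` means a session's handler disappears automatically when the session object that owns the guard goes out of scope.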
impl<Counterpart> ConnectionTo<Counterpart>
pub fn build_session( &self, cwd: impl AsRef<Path>, ) -> SessionBuilder<Counterpart, NullRun>
Session builder for a new session request.
pub fn build_session_cwd( &self, ) -> Result<SessionBuilder<Counterpart, NullRun>, Error>
Session builder using the current working directory.
This is a convenience wrapper around build_session
that uses std::env::current_dir to get the working directory.
Returns an error if the current directory cannot be determined.
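The fallible part of this wrapper is the call to std::env::current_dir, which returns an io::Result. A minimal sketch of the same pattern, using a hypothetical helper `session_root` that is not part of this crate:

```rust
use std::env;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical helper: use `explicit` when given, otherwise fall back to the
/// process's current directory, which can fail (for example if the directory
/// was removed or permissions are insufficient).
pub fn session_root(explicit: Option<&Path>) -> io::Result<PathBuf> {
    match explicit {
        Some(path) => Ok(path.to_path_buf()),
        None => env::current_dir(),
    }
}
```

Callers that already know the working directory can skip the fallible path entirely by passing it explicitly, which is what build_session does.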
pub fn build_session_from( &self, request: NewSessionRequest, ) -> SessionBuilder<Counterpart, NullRun>
Session builder starting from an existing request.
Use this when you’ve intercepted a session/new request (a NewSessionRequest) and want to
modify it (e.g., inject MCP servers) before forwarding.
pub fn attach_session<'responder>( &self, response: NewSessionResponse, mcp_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>, ) -> Result<ActiveSession<'responder, Counterpart>, Error>
Given a session response received from the agent, attach a handler to process messages related to this session and let you access them.
Normally you would not use this method directly but would
instead use Self::build_session and then SessionBuilder::start_session.
The vector mcp_handler_registrations contains any dynamic
handler registrations associated with this session (e.g., from MCP servers).
You can simply pass Default::default() if not applicable.
Trait Implementations§
impl<Counterpart: Clone + Role> Clone for ConnectionTo<Counterpart>
fn clone(&self) -> ConnectionTo<Counterpart>
1.0.0 · fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.