pub struct JrConnectionBuilder<H: JrMessageHandler> { /* private fields */ }
A JSON-RPC connection that can act as a server, a client, or both.
JrConnectionBuilder provides a builder-style API for creating JSON-RPC servers and clients.
You start by calling Role::builder() (e.g., ClientToAgent::builder()), then add message
handlers, and finally drive the connection with either serve
or with_client, providing a component implementation
(e.g., ByteStreams for byte streams).
§JSON-RPC Primer
JSON-RPC 2.0 has two fundamental message types:
- Requests - Messages that expect a response. They have an id field that gets echoed back in the response so the sender can correlate them.
- Notifications - Fire-and-forget messages with no id field. The sender doesn’t expect or receive a response.
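The distinction above can be sketched with plain Rust types. This is a minimal illustration, not the crate's own representation: RawMessage, Kind, and classify are hypothetical names, and the id is simplified to a u64 even though JSON-RPC 2.0 also permits string ids.

```rust
/// Hypothetical, simplified model of an incoming JSON-RPC 2.0 message.
#[derive(Debug, Clone, PartialEq)]
pub struct RawMessage {
    /// Present on requests, absent on notifications.
    /// (Assumption: numeric ids only; the spec also allows strings.)
    pub id: Option<u64>,
    pub method: String,
}

/// The two fundamental message kinds.
#[derive(Debug, Clone, PartialEq)]
pub enum Kind {
    /// Expects a response carrying the same id.
    Request { id: u64 },
    /// Fire-and-forget: there is no id to echo back.
    Notification,
}

/// Classifies a message purely by whether it carries an id.
pub fn classify(message: &RawMessage) -> Kind {
    match message.id {
        Some(id) => Kind::Request { id },
        None => Kind::Notification,
    }
}
```

A message with id 7 classifies as Kind::Request { id: 7 }, and the same message without an id classifies as Kind::Notification.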
§Type-Driven Message Dispatch
The handler registration methods use Rust’s type system to determine which messages to handle. The type parameter you provide controls what gets dispatched to your handler:
§Single Message Types
The simplest case - handle one specific message type:
connection
    .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
        // Handle only InitializeRequest messages
        request_cx.respond(InitializeResponse::make())
    })
    .on_receive_notification(async |notif: SessionNotification, cx| {
        // Handle only SessionNotification messages
        Ok(())
    })
§Enum Message Types
You can also handle multiple related messages with a single handler by defining an enum
that implements the appropriate trait (JrRequest or JrNotification):
// Define an enum for multiple request types
#[derive(Debug, Clone)]
enum MyRequests {
    Initialize(InitializeRequest),
    Prompt(PromptRequest),
}
// Implement JrRequest for your enum
impl JrRequest for MyRequests { type Response = serde_json::Value; }
// Handle all variants in one place
connection.on_receive_request(async |req: MyRequests, request_cx, cx| {
    match req {
        MyRequests::Initialize(init) => { request_cx.respond(serde_json::json!({})) }
        MyRequests::Prompt(prompt) => { request_cx.respond(serde_json::json!({})) }
    }
})
§Mixed Message Types
For enums containing both requests AND notifications, use on_receive_message:
// on_receive_message receives MessageCx which can be either a request or notification
connection.on_receive_message(async |msg: MessageCx<InitializeRequest, SessionNotification>, _cx| {
    match msg {
        MessageCx::Request(req, request_cx) => {
            request_cx.respond(InitializeResponse::make())
        }
        MessageCx::Notification(notif) => {
            Ok(())
        }
    }
})
§Handler Registration
Register handlers using these methods (listed from most common to most flexible):
- on_receive_request - Handle JSON-RPC requests (messages expecting responses)
- on_receive_notification - Handle JSON-RPC notifications (fire-and-forget)
- on_receive_message - Handle enums containing both requests and notifications
- with_handler - Low-level primitive for maximum flexibility
§Handler Ordering
Handlers are tried in the order you register them. The first handler that claims a message (by matching its type) will process it. Subsequent handlers won’t see that message:
connection
    .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
        // This runs first for InitializeRequest
        request_cx.respond(InitializeResponse::make())
    })
    .on_receive_request(async |req: PromptRequest, request_cx, cx| {
        // This runs first for PromptRequest
        request_cx.respond(PromptResponse::make())
    })
    .on_receive_message(async |msg: MessageCx, cx| {
        // This runs for any message not handled above
        msg.respond_with_error(sacp::util::internal_error("unknown method"), cx)
    })
§Event Loop and Concurrency
Understanding the event loop is critical for writing correct handlers.
§The Event Loop
JrConnection runs all handler callbacks on a single async task - the event loop.
While a handler is running, the server cannot receive new messages. This means
any blocking or expensive work in your handlers will stall the entire connection.
To avoid blocking the event loop, use JrConnectionCx::spawn to offload expensive
work to concurrent tasks:
connection.on_receive_request(async |req: AnalyzeRequest, request_cx, cx| {
    // Clone cx for the spawned task
    cx.spawn({
        let connection_cx = cx.clone();
        async move {
            let result = expensive_analysis(&req.data).await?;
            connection_cx.send_notification(AnalysisComplete { result })?;
            Ok(())
        }
    })?;
    // Respond immediately without blocking
    request_cx.respond(AnalysisStarted { job_id: 42 })
})
Note that the entire connection runs within one async task, so parallelism must be
managed explicitly using spawn.
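The same "acknowledge now, finish later" shape can be tried without the crate. The sketch below is an analogy only: it uses OS threads and a std channel in place of async tasks and JrConnectionCx::spawn, and run_event_loop and expensive_analysis are hypothetical names introduced for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for expensive work: sums 1..=n.
fn expensive_analysis(n: u64) -> u64 {
    (1..=n).sum()
}

/// Processes `jobs` on a single loop. Each job is acknowledged immediately,
/// while its expensive part runs on a worker thread that reports back over a
/// channel. Returns the acknowledgements and the results ordered by job id.
pub fn run_event_loop(jobs: Vec<u64>) -> (Vec<String>, Vec<u64>) {
    let (done_tx, done_rx) = mpsc::channel();
    let mut acks = Vec::new();
    let mut workers = Vec::new();

    for (job_id, input) in jobs.into_iter().enumerate() {
        let done_tx = done_tx.clone();
        // Offload: the loop does not wait for this thread to finish.
        workers.push(thread::spawn(move || {
            let result = expensive_analysis(input);
            done_tx.send((job_id, result)).expect("receiver is alive");
        }));
        // Respond right away, before the analysis has completed.
        acks.push(format!("started job {job_id}"));
    }

    // Drop the original sender so the receiver's iterator ends once every
    // worker has finished and dropped its clone.
    drop(done_tx);
    for worker in workers {
        worker.join().expect("worker panicked");
    }
    let mut results: Vec<(usize, u64)> = done_rx.iter().collect();
    results.sort();
    (acks, results.into_iter().map(|(_, result)| result).collect())
}
```

The loop body never calls expensive_analysis itself, which is the property the handler above relies on: the acknowledgement is produced without waiting for the result.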
§The Connection Context
Handler callbacks receive a context object (cx) for interacting with the connection:
- For request handlers - JrRequestCx<R> provides respond to send the response, plus methods to send other messages
- For notification handlers - JrConnectionCx provides methods to send messages and spawn tasks
Both context types support:
- send_request - Send requests to the other side
- send_notification - Send notifications
- spawn - Run tasks concurrently without blocking the event loop
The JrResponse returned by send_request provides methods like
await_when_result_received that help you
avoid accidentally blocking the event loop while waiting for responses.
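One way to picture waiting for a response without blocking is a table of callbacks keyed by request id. The sketch below assumes nothing about how JrResponse is implemented; PendingRequests, send_request, and deliver are hypothetical names that only illustrate the id-correlation idea from the primer.

```rust
use std::collections::HashMap;

/// Callback invoked with the response payload once it arrives.
type OnResponse = Box<dyn FnOnce(String)>;

/// Hypothetical table of in-flight requests, keyed by request id.
#[derive(Default)]
pub struct PendingRequests {
    next_id: u64,
    waiting: HashMap<u64, OnResponse>,
}

impl PendingRequests {
    /// Registers a callback and returns the id to attach to the outgoing
    /// request. Nothing blocks here; the caller returns to the loop.
    pub fn send_request(&mut self, on_response: impl FnOnce(String) + 'static) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.waiting.insert(id, Box::new(on_response));
        id
    }

    /// Called by the event loop when a response arrives. Returns false if no
    /// request with that id is pending (for example, a duplicate response).
    pub fn deliver(&mut self, id: u64, payload: String) -> bool {
        match self.waiting.remove(&id) {
            Some(callback) => {
                callback(payload);
                true
            }
            None => false,
        }
    }
}
```

Because the callback is stored rather than awaited, the loop stays free to read the very message that carries the response.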
§Driving the Connection
After adding handlers, you must drive the connection using one of two modes:
§Server Mode: serve()
Use serve when you only need to respond to incoming messages:
connection
    .on_receive_request(async |req: MyRequest, request_cx, cx| {
        request_cx.respond(MyResponse { status: "ok".into() })
    })
    .serve(MockTransport) // Runs until connection closes or error occurs
    .await?;
The connection will process incoming messages and invoke your handlers until the connection is closed or an error occurs.
§Client Mode: with_client()
Use with_client when you need to both handle incoming messages
AND send your own requests/notifications:
connection
    .on_receive_request(async |req: MyRequest, request_cx, cx| {
        request_cx.respond(MyResponse { status: "ok".into() })
    })
    .with_client(MockTransport, async |cx| {
        // You can send requests to the other side
        let response = cx.send_request(InitializeRequest::make())
            .block_task()
            .await?;
        // And send notifications
        cx.send_notification(StatusUpdate { message: "ready".into() })?;
        Ok(())
    })
    .await?;
The connection will serve incoming messages in the background while your client closure runs. When the closure returns, the connection shuts down.
§Example: Complete Agent
let transport = ByteStreams::new(
    tokio::io::stdout().compat_write(),
    tokio::io::stdin().compat(),
);
UntypedRole::builder()
    .name("my-agent") // Optional: for debugging logs
    .on_receive_request(async |init: InitializeRequest, request_cx, cx| {
        let response: InitializeResponse = todo!();
        request_cx.respond(response)
    })
    .on_receive_request(async |prompt: PromptRequest, request_cx, cx| {
        // You can send notifications while processing a request
        let notif: SessionNotification = todo!();
        cx.send_notification(notif)?;
        // Then respond to the request
        let response: PromptResponse = todo!();
        request_cx.respond(response)
    })
    .serve(transport)
    .await?;
Implementations
impl<H: JrMessageHandler> JrConnectionBuilder<H>
pub fn new_with(handler: H) -> Self
Create a new handler chain with the given handler and roles.
pub fn name(self, name: impl ToString) -> Self
Set the “name” of this connection – used only for debugging logs.
pub fn with_handler_chain<H1>(
    self,
    handler_chain: JrConnectionBuilder<H1>,
) -> JrConnectionBuilder<ChainedHandler<H, NamedHandler<H1>>>
where
    H1: JrMessageHandler<Role = H::Role>,
Add the handlers of another JrConnectionBuilder to the chain.
Prefer Self::on_receive_request or Self::on_receive_notification.
This is a low-level method that is not intended for general use.
pub fn with_handler<H1>(
    self,
    handler: H1,
) -> JrConnectionBuilder<ChainedHandler<H, H1>>
where
    H1: JrMessageHandler<Role = H::Role>,
Add a new JrMessageHandler to the chain.
Prefer Self::on_receive_request or Self::on_receive_notification.
This is a low-level method that is not intended for general use.
pub fn with_spawned<F>(
    self,
    task: impl FnOnce(JrConnectionCx<H::Role>) -> F + Send + 'static,
) -> Self
Enqueue a task to run once the connection is actively serving traffic.
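The "enqueue now, run once serving" behavior can be illustrated with a toy builder. This is a sketch under stated assumptions, not the crate's implementation: Cx, Builder, queued_len, and this serve are hypothetical, and the queued tasks run one after another here, whereas the real method presumably spawns them concurrently.

```rust
/// Hypothetical stand-in for the connection context handed to queued tasks.
#[derive(Default)]
pub struct Cx {
    pub log: Vec<String>,
}

/// A queued task, boxed so closures of different types fit in one Vec.
type Task = Box<dyn FnOnce(&mut Cx)>;

/// Toy builder that only records tasks until `serve` is called.
#[derive(Default)]
pub struct Builder {
    queued: Vec<Task>,
}

impl Builder {
    /// Enqueues `task` without running it.
    pub fn with_spawned(mut self, task: impl FnOnce(&mut Cx) + 'static) -> Self {
        self.queued.push(Box::new(task));
        self
    }

    /// Number of tasks waiting for the connection to start.
    pub fn queued_len(&self) -> usize {
        self.queued.len()
    }

    /// "Starts serving": only now do the queued tasks run, in the order
    /// they were registered (sequentially, as a simplification).
    pub fn serve(self) -> Cx {
        let mut cx = Cx::default();
        cx.log.push("serving".to_string());
        for task in self.queued {
            task(&mut cx);
        }
        cx
    }
}
```

Registering two tasks leaves both pending until serve is called; afterwards the log reads "serving", then the first task's entry, then the second's.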
pub fn on_receive_message<Req, Notif, F, T>(
    self,
    op: F,
) -> JrConnectionBuilder<ChainedHandler<H, MessageHandler<H::Role, <H::Role as JrRole>::HandlerEndpoint, Req, Notif, F>>>
where
    H::Role: HasEndpoint<<H::Role as JrRole>::HandlerEndpoint>,
    Req: JrRequest,
    Notif: JrNotification,
    F: AsyncFnMut(MessageCx<Req, Notif>, JrConnectionCx<H::Role>) -> Result<T, Error> + Send,
    T: IntoHandled<MessageCx<Req, Notif>>,
Register a handler for messages that can be either requests OR notifications.
Use this when you want to handle an enum type that contains both request and
notification variants. Your handler receives a MessageCx<Req, Notif> which
is an enum with two variants:
- MessageCx::Request(request, request_cx) - A request with its response context
- MessageCx::Notification(notification) - A notification
§Example
connection.on_receive_message(async |message: MessageCx<MyRequest, StatusUpdate>, _cx| {
    match message {
        MessageCx::Request(req, request_cx) => {
            // Handle request and send response
            request_cx.respond(MyResponse { status: "ok".into() })
        }
        MessageCx::Notification(notif) => {
            // Handle notification (no response needed)
            Ok(())
        }
    }
})
For most use cases, prefer on_receive_request or
on_receive_notification which provide cleaner APIs
for handling requests or notifications separately.
pub fn on_receive_request<Req: JrRequest, F, T>(
    self,
    op: F,
) -> JrConnectionBuilder<ChainedHandler<H, RequestHandler<H::Role, <H::Role as JrRole>::HandlerEndpoint, Req, F>>>
where
    H::Role: HasEndpoint<<H::Role as JrRole>::HandlerEndpoint>,
    F: AsyncFnMut(Req, JrRequestCx<Req::Response>, JrConnectionCx<H::Role>) -> Result<T, Error> + Send,
    T: IntoHandled<(Req, JrRequestCx<Req::Response>)>,
Register a handler for JSON-RPC requests of type Req.
Your handler receives three arguments:
- The request (type Req)
- A JrRequestCx<Req::Response> for sending the response
- A JrConnectionCx<H::Role> for sending other messages to the other side
Together, these contexts allow you to:
- Send the response with JrRequestCx::respond
- Send notifications to the client with JrConnectionCx::send_notification
- Send requests to the client with JrConnectionCx::send_request
§Example
connection.on_receive_request(async |request: PromptRequest, request_cx, cx| {
    // Send a notification while processing
    let notif: SessionNotification = todo!();
    cx.send_notification(notif)?;
    // Do some work...
    let result = todo!("process the prompt");
    // Send the response
    let response: PromptResponse = todo!();
    request_cx.respond(response)
});
§Type Parameter
Req can be either a single request type or an enum of multiple request types.
See the type-driven dispatch section for details.
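To see why a single type or an enum can both serve as Req, it may help to model dispatch as "try to build the type from the incoming method name". The trait and types below are hypothetical and are not the crate's JrRequest; the method names "initialize" and "session/prompt" are used purely as illustrative strings.

```rust
/// Hypothetical trait: a message type decides whether it accepts a method.
pub trait FromMethod: Sized {
    /// Returns Some if this type handles `method`, or None so that a later
    /// handler can try instead.
    fn from_method(method: &str, params: &str) -> Option<Self>;
}

#[derive(Debug, PartialEq)]
pub struct Initialize {
    pub params: String,
}

#[derive(Debug, PartialEq)]
pub struct Prompt {
    pub params: String,
}

impl FromMethod for Initialize {
    fn from_method(method: &str, params: &str) -> Option<Self> {
        (method == "initialize").then(|| Initialize { params: params.to_string() })
    }
}

impl FromMethod for Prompt {
    fn from_method(method: &str, params: &str) -> Option<Self> {
        (method == "session/prompt").then(|| Prompt { params: params.to_string() })
    }
}

/// An enum accepts a message if any of its variants does.
#[derive(Debug, PartialEq)]
pub enum MyRequests {
    Initialize(Initialize),
    Prompt(Prompt),
}

impl FromMethod for MyRequests {
    fn from_method(method: &str, params: &str) -> Option<Self> {
        Initialize::from_method(method, params)
            .map(MyRequests::Initialize)
            .or_else(|| Prompt::from_method(method, params).map(MyRequests::Prompt))
    }
}

/// The handler's parameter type `M` alone decides what gets dispatched to it.
/// Returns true if the message was claimed.
pub fn dispatch<M: FromMethod>(method: &str, params: &str, handler: impl FnOnce(M)) -> bool {
    match M::from_method(method, params) {
        Some(message) => {
            handler(message);
            true
        }
        None => false,
    }
}
```

With this shape, a handler typed as MyRequests claims both methods, while a handler typed as Initialize declines "session/prompt" and leaves it for the next handler.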
pub fn on_receive_notification<Notif, F, T>(
    self,
    op: F,
) -> JrConnectionBuilder<ChainedHandler<H, NotificationHandler<H::Role, <H::Role as JrRole>::HandlerEndpoint, Notif, F>>>
where
    H::Role: HasEndpoint<<H::Role as JrRole>::HandlerEndpoint>,
    Notif: JrNotification,
    F: AsyncFnMut(Notif, JrConnectionCx<H::Role>) -> Result<T, Error> + Send,
    T: IntoHandled<(Notif, JrConnectionCx<H::Role>)>,
Register a handler for JSON-RPC notifications of type Notif.
Notifications are fire-and-forget messages that don’t expect a response. Your handler receives:
- The notification (type Notif)
- A JrConnectionCx<R> for sending messages to the other side
Unlike request handlers, you cannot send a response (notifications don’t have IDs), but you can still send your own requests and notifications using the context.
§Example
connection.on_receive_notification(async |notif: SessionUpdate, cx| {
    // Process the notification
    update_session_state(&notif)?;
    // Optionally send a notification back
    cx.send_notification(StatusUpdate {
        message: "Acknowledged".into(),
    })?;
    Ok(())
})
§Type Parameter
Notif can be either a single notification type or an enum of multiple notification types.
See the type-driven dispatch section for details.
pub fn on_receive_message_from<Req: JrRequest, Notif: JrNotification, End: JrEndpoint, F, T>(
    self,
    endpoint: End,
    op: F,
) -> JrConnectionBuilder<ChainedHandler<H, MessageHandler<H::Role, End, Req, Notif, F>>>
where
    H::Role: HasEndpoint<End>,
    F: AsyncFnMut(MessageCx<Req, Notif>, JrConnectionCx<H::Role>) -> Result<T, Error> + Send,
    T: IntoHandled<MessageCx<Req, Notif>>,
Register a handler for messages from a specific endpoint.
This is similar to on_receive_message, but allows
specifying the source endpoint explicitly. This is useful when receiving messages
from an endpoint that requires message transformation (e.g., unwrapping SuccessorMessage
envelopes when receiving from an agent via a proxy).
For the common case of receiving from the default counterpart, use
on_receive_message instead.
pub fn on_receive_request_from<Req: JrRequest, End: JrEndpoint, F, T>(
    self,
    endpoint: End,
    op: F,
) -> JrConnectionBuilder<ChainedHandler<H, RequestHandler<H::Role, End, Req, F>>>
where
    H::Role: HasEndpoint<End>,
    F: AsyncFnMut(Req, JrRequestCx<Req::Response>, JrConnectionCx<H::Role>) -> Result<T, Error> + Send,
    T: IntoHandled<(Req, JrRequestCx<Req::Response>)>,
Register a handler for JSON-RPC requests from a specific endpoint.
This is similar to on_receive_request, but allows
specifying the source endpoint explicitly. This is useful when receiving messages
from an endpoint that requires message transformation (e.g., unwrapping SuccessorRequest
envelopes when receiving from an agent via a proxy).
For the common case of receiving from the default counterpart, use
on_receive_request instead.
§Example
use sacp::Agent;
use sacp::schema::InitializeRequest;
// Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
connection.on_receive_request_from(Agent, async |req: InitializeRequest, request_cx, cx| {
    // Handle the request
    request_cx.respond(InitializeResponse::make())
})
pub fn on_receive_notification_from<Notif: JrNotification, End: JrEndpoint, F, T>(
    self,
    endpoint: End,
    op: F,
) -> JrConnectionBuilder<ChainedHandler<H, NotificationHandler<H::Role, End, Notif, F>>>
where
    H::Role: HasEndpoint<End>,
    F: AsyncFnMut(Notif, JrConnectionCx<H::Role>) -> Result<T, Error> + Send,
    T: IntoHandled<(Notif, JrConnectionCx<H::Role>)>,
Register a handler for JSON-RPC notifications from a specific endpoint.
This is similar to on_receive_notification, but allows
specifying the source endpoint explicitly. This is useful when receiving messages
from an endpoint that requires message transformation (e.g., unwrapping SuccessorNotification
envelopes when receiving from an agent via a proxy).
For the common case of receiving from the default counterpart, use
on_receive_notification instead.
pub fn provide_mcp<Role>(
    self,
    registry: McpServiceRegistry<Role>,
) -> JrConnectionBuilder<ChainedHandler<H, McpServiceRegistry<Role>>>
Provide MCP servers to downstream successors.
This adds a handler that intercepts session/new requests to include the
registered MCP servers, and handles MCP-over-ACP communication.
§Example
use sacp::mcp_server::McpServiceRegistry;
use sacp::ProxyToConductor;
ProxyToConductor::builder()
    .name("my-proxy")
    .provide_mcp(McpServiceRegistry::default().with_mcp_server("example", my_server)?)
    .serve(connection)
    .await?;
pub fn connect_to(
    self,
    transport: impl Component + 'static,
) -> Result<JrConnection<H>, Error>
Connect these handlers to a transport layer. The resulting connection must then be either served or used as a client.
pub async fn apply(
    &mut self,
    message: MessageCx,
    cx: JrConnectionCx<H::Role>,
) -> Result<Handled<MessageCx>, Error>
Apply the handler chain to a single message.
This method processes one message through the entire handler chain, attempting to match it against each registered handler in order. This is useful when implementing custom message handling logic or when you need fine-grained control over message processing.
§Returns
- Ok(Handled::Claimed) - A handler claimed and processed the message
- Ok(Handled::Unclaimed(message)) - No handler matched the message
- Err(_) - A handler encountered an error while processing
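The three outcomes above follow naturally from a chain that tries handlers in registration order. The sketch below is a simplified, synchronous model with plain strings as messages; this Handled enum, the Handler alias, and claim_only are hypothetical stand-ins, not the crate's types.

```rust
/// Hypothetical mirror of the Claimed/Unclaimed outcome described above.
#[derive(Debug, PartialEq)]
pub enum Handled<M> {
    Claimed,
    Unclaimed(M),
}

/// A handler either claims the message or hands it back untouched.
pub type Handler = Box<dyn FnMut(String) -> Result<Handled<String>, String>>;

/// Tries each handler in registration order and stops at the first claim or
/// the first error. If nobody claims the message, it is returned to the caller.
pub fn apply(handlers: &mut [Handler], message: String) -> Result<Handled<String>, String> {
    let mut current = message;
    for handler in handlers.iter_mut() {
        match (*handler)(current)? {
            Handled::Claimed => return Ok(Handled::Claimed),
            Handled::Unclaimed(returned) => current = returned,
        }
    }
    Ok(Handled::Unclaimed(current))
}

/// Builds a handler that claims exactly one method name.
pub fn claim_only(method: &'static str) -> Handler {
    Box::new(move |message: String| {
        if message == method {
            Ok(Handled::Claimed)
        } else {
            Ok(Handled::Unclaimed(message))
        }
    })
}
```

Passing ownership of the message into each handler and getting it back on Unclaimed is what lets the next handler see the original message unchanged.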
§Borrow Checker Considerations
You may find that MatchMessage is a better choice than this method
for implementing custom handlers. It offers a very similar API to
JrConnectionBuilder but is structured to apply each test one at a time
(sequentially) instead of setting them all up at once. This sequential approach
often interacts better with the borrow checker, at the cost of requiring .await
calls between each handler and only working for processing a single message.
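The borrow-checker benefit of the sequential approach can be reproduced with ordinary closures. In this minimal sketch (handle_sequentially is a hypothetical function, and synchronous closures stand in for async handlers), each closure borrows the state mutably, but only one of them is alive at any point.

```rust
/// Tests `method` against two handlers one at a time. Returns true if either
/// handler claimed it, updating `state` along the way.
pub fn handle_sequentially(method: &str, state: &mut String) -> bool {
    // First test: this closure holds the only mutable borrow of `state`.
    let mut on_initialize = |m: &str| {
        if m == "initialize" {
            state.push_str(" - initialized");
            true
        } else {
            false
        }
    };
    if on_initialize(method) {
        return true;
    }

    // `on_initialize` is never used again, so its borrow has ended and a
    // second closure may now borrow `state` mutably.
    let mut on_prompt = |m: &str| {
        if m == "session/prompt" {
            state.push_str(" - prompted");
            true
        } else {
            false
        }
    };
    on_prompt(method)
}
```

Creating both closures up front and calling them afterwards would be rejected, because two mutable borrows of state would then be live at once.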
§Example: Borrow Checker Challenges
When building a handler chain with non-move async closures, you might encounter
borrow checker errors if multiple handlers need access to the same mutable state:
let mut state = String::from("initial");
// This fails to compile because both handlers borrow `state` mutably,
// and the closures are set up at the same time (even though only one will run)
let chain = UntypedRole::builder()
    .on_receive_request(async |req: InitializeRequest, request_cx, _cx| {
        state.push_str(" - initialized"); // First mutable borrow
        request_cx.respond(InitializeResponse::make())
    })
    .on_receive_request(async |req: PromptRequest, request_cx, _cx| {
        state.push_str(" - prompted"); // Second mutable borrow - ERROR!
        request_cx.respond(PromptResponse { content: vec![], stopReason: None })
    });
You can work around this by using apply() to process messages one at a time,
or use MatchMessage, which provides a similar API but applies handlers sequentially:
use sacp::{MessageCx, Handled};
use sacp::util::MatchMessage;
async fn handle_with_state(
    message: MessageCx,
    state: &mut String,
) -> Result<Handled<MessageCx>, sacp::Error> {
    MatchMessage::new(message)
        .on_request(async |req: InitializeRequest, request_cx| {
            state.push_str(" - initialized"); // Sequential - OK!
            request_cx.respond(InitializeResponse::make())
        })
        .on_request(async |req: PromptRequest, request_cx| {
            state.push_str(" - prompted"); // Sequential - OK!
            request_cx.respond(PromptResponse { content: vec![], stopReason: None })
        })
        .otherwise(async |msg| Ok(Handled::Unclaimed(msg)))
        .await
}
pub async fn serve(
    self,
    transport: impl Component + 'static,
) -> Result<(), Error>
Convenience method to connect to a transport and serve.
This is equivalent to:
handler_chain.connect_to(transport)?.serve().await
pub async fn with_client(
    self,
    transport: impl Component + 'static,
    main_fn: impl AsyncFnOnce(JrConnectionCx<H::Role>) -> Result<(), Error>,
) -> Result<(), Error>
Convenience method to connect to a transport and run a client function.
This is equivalent to:
handler_chain.connect_to(transport)?.with_client(main_fn).await