JrConnectionBuilder

Struct JrConnectionBuilder 

Source
pub struct JrConnectionBuilder<H: JrMessageHandler> { /* private fields */ }

A builder for a JSON-RPC connection that can act as a server, a client, or both.

JrConnectionBuilder provides a builder-style API for creating JSON-RPC servers and clients. You start by calling Role::builder() (e.g., ClientToAgent::builder()), then add message handlers, and finally drive the connection with either serve or with_client, passing a transport component (e.g., ByteStreams for byte streams).

§JSON-RPC Primer

JSON-RPC 2.0 has two fundamental message types:

  • Requests - Messages that expect a response. They have an id field that gets echoed back in the response so the sender can correlate them.
  • Notifications - Fire-and-forget messages with no id field. The sender doesn’t expect or receive a response.
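The distinction above can be sketched with plain Rust types. This is a minimal illustration, not the crate's representation: RawMessage, MessageKind, classify, and response_for are hypothetical names, and the id is simplified to a number even though JSON-RPC also allows string ids.

```rust
/// A decoded JSON-RPC 2.0 message, reduced to the fields that matter here.
/// (Hypothetical type for illustration; not part of the crate.)
#[derive(Debug, Clone, PartialEq)]
pub struct RawMessage {
    /// Present on requests, absent on notifications.
    pub id: Option<u64>,
    pub method: String,
}

#[derive(Debug, PartialEq)]
pub enum MessageKind {
    /// Expects a response carrying the same id.
    Request { id: u64 },
    /// Fire-and-forget: no id, no response.
    Notification,
}

/// Decide the message kind purely from the presence of an id.
pub fn classify(msg: &RawMessage) -> MessageKind {
    match msg.id {
        Some(id) => MessageKind::Request { id },
        None => MessageKind::Notification,
    }
}

/// Build the response envelope for a request, echoing its id.
/// Returns None for notifications, which must not be answered.
pub fn response_for(msg: &RawMessage, result: &str) -> Option<String> {
    msg.id
        .map(|id| format!(r#"{{"jsonrpc":"2.0","id":{id},"result":{result}}}"#))
}
```

Because the id is echoed back, the sender can match a response to the request that caused it even when several requests are in flight.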

§Type-Driven Message Dispatch

The handler registration methods use Rust’s type system to determine which messages to handle. The type parameter you provide controls what gets dispatched to your handler:

§Single Message Types

The simplest case - handle one specific message type:

connection
    .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
        // Handle only InitializeRequest messages
        request_cx.respond(InitializeResponse::make())
    })
    .on_receive_notification(async |notif: SessionNotification, cx| {
        // Handle only SessionNotification messages
        Ok(())
    })

§Enum Message Types

You can also handle multiple related messages with a single handler by defining an enum that implements the appropriate trait (JrRequest or JrNotification):

// Define an enum for multiple request types
#[derive(Debug, Clone)]
enum MyRequests {
    Initialize(InitializeRequest),
    Prompt(PromptRequest),
}

// Implement JrRequest for your enum
impl JrRequest for MyRequests { type Response = serde_json::Value; }

// Handle all variants in one place
connection.on_receive_request(async |req: MyRequests, request_cx, cx| {
    match req {
        MyRequests::Initialize(init) => { request_cx.respond(serde_json::json!({})) }
        MyRequests::Prompt(prompt) => { request_cx.respond(serde_json::json!({})) }
    }
})
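One way to picture how an enum can stand for several request types is a parser that tries each variant in turn and declines anything else. The sketch below is a std-only model under stated assumptions: FromMethod is a hypothetical trait (it is not JrRequest), the params are kept as raw text, and the method names are only illustrative.

```rust
/// Hypothetical stand-in for a typed request: a type that knows which
/// method names it can be parsed from.
pub trait FromMethod: Sized {
    /// Returns None when `method` does not belong to this type, so a
    /// dispatcher could offer the message to the next handler.
    fn from_method(method: &str, params: &str) -> Option<Self>;
}

#[derive(Debug, PartialEq)]
pub struct Initialize {
    pub params: String,
}

#[derive(Debug, PartialEq)]
pub struct Prompt {
    pub params: String,
}

impl FromMethod for Initialize {
    fn from_method(method: &str, params: &str) -> Option<Self> {
        (method == "initialize").then(|| Initialize { params: params.to_string() })
    }
}

impl FromMethod for Prompt {
    fn from_method(method: &str, params: &str) -> Option<Self> {
        (method == "session/prompt").then(|| Prompt { params: params.to_string() })
    }
}

/// An enum claims a message if any of its variants can parse it.
#[derive(Debug, PartialEq)]
pub enum MyRequests {
    Initialize(Initialize),
    Prompt(Prompt),
}

impl FromMethod for MyRequests {
    fn from_method(method: &str, params: &str) -> Option<Self> {
        Initialize::from_method(method, params)
            .map(MyRequests::Initialize)
            .or_else(|| Prompt::from_method(method, params).map(MyRequests::Prompt))
    }
}
```

In this model the enum handler sees exactly the union of the messages its variants would accept, and everything else stays available to later handlers.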

§Mixed Message Types

For enums containing both requests AND notifications, use on_receive_message:

// on_receive_message receives MessageCx which can be either a request or notification
connection.on_receive_message(async |msg: MessageCx<InitializeRequest, SessionNotification>, _cx| {
    match msg {
        MessageCx::Request(req, request_cx) => {
            request_cx.respond(InitializeResponse::make())
        }
        MessageCx::Notification(notif) => {
            Ok(())
        }
    }
})

§Handler Registration

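Register handlers using these methods (listed from most common to most flexible):

  • on_receive_request - Handle JSON-RPC requests of a given type
  • on_receive_notification - Handle JSON-RPC notifications of a given type
  • on_receive_message - Handle messages that can be either requests or notifications
  • with_handler - Low-level method for adding a custom JrMessageHandler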

§Handler Ordering

Handlers are tried in the order you register them. The first handler that claims a message (by matching its type) will process it. Subsequent handlers won’t see that message:

connection
    .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
        // This runs first for InitializeRequest
        request_cx.respond(InitializeResponse::make())
    })
    .on_receive_request(async |req: PromptRequest, request_cx, cx| {
        // This runs first for PromptRequest
        request_cx.respond(PromptResponse::make())
    })
    .on_receive_message(async |msg: MessageCx, cx| {
        // This runs for any message not handled above
        msg.respond_with_error(sacp::util::internal_error("unknown method"), cx)
    })
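The first-match rule can be modeled without the crate. The following is a simplified sketch, assuming messages are just method-name strings; Chain, on, on_any, and dispatch are hypothetical names, and Handled only mirrors the Claimed/Unclaimed outcome documented for apply.

```rust
/// Outcome of offering a message to one handler.
pub enum Handled<M> {
    Claimed,
    Unclaimed(M),
}

type Handler = Box<dyn FnMut(String) -> Handled<String>>;

/// Handlers paired with a label, kept in registration order.
pub struct Chain {
    handlers: Vec<(&'static str, Handler)>,
}

impl Chain {
    pub fn new() -> Self {
        Chain { handlers: Vec::new() }
    }

    /// Register a handler that claims only messages with this method name.
    pub fn on(mut self, name: &'static str, method: &'static str) -> Self {
        let handler: Handler = Box::new(move |msg: String| {
            if msg == method {
                Handled::Claimed
            } else {
                Handled::Unclaimed(msg) // hand the message back untouched
            }
        });
        self.handlers.push((name, handler));
        self
    }

    /// Register a catch-all handler that claims every message it sees.
    pub fn on_any(mut self, name: &'static str) -> Self {
        let handler: Handler = Box::new(|_msg: String| Handled::Claimed);
        self.handlers.push((name, handler));
        self
    }

    /// Offer the message to each handler in order; return the label of the
    /// first one that claims it, or None if nobody does.
    pub fn dispatch(&mut self, mut msg: String) -> Option<&'static str> {
        for (name, handler) in self.handlers.iter_mut() {
            match handler(msg) {
                Handled::Claimed => return Some(*name),
                Handled::Unclaimed(returned) => msg = returned,
            }
        }
        None
    }
}
```

A consequence of this ordering is that a catch-all handler registered first shadows every handler after it, which is why the fallback in the example above is registered last.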

§Event Loop and Concurrency

Understanding the event loop is critical for writing correct handlers.

§The Event Loop

JrConnection runs all handler callbacks on a single async task - the event loop. While a handler is running, the connection cannot process any other incoming message. This means any blocking or expensive work in your handlers will stall the entire connection.

To avoid blocking the event loop, use JrConnectionCx::spawn to offload expensive work to concurrent tasks:

connection.on_receive_request(async |req: AnalyzeRequest, request_cx, cx| {
    // Clone cx for the spawned task
    cx.spawn({
        let connection_cx = cx.clone();
        async move {
            let result = expensive_analysis(&req.data).await?;
            connection_cx.send_notification(AnalysisComplete { result })?;
            Ok(())
        }
    })?;

    // Respond immediately without blocking
    request_cx.respond(AnalysisStarted { job_id: 42 })
})

Note that the entire connection runs within one async task, so parallelism must be managed explicitly using spawn.
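The same offloading pattern can be shown with the standard library alone. This is an analogy rather than the crate's mechanism: OS threads stand in for spawned async tasks, a channel stands in for the later notification, and run_event_loop and expensive_analysis are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for expensive work.
fn expensive_analysis(input: u64) -> u64 {
    (1..=input).sum()
}

/// Processes jobs on a single loop. Each job is acknowledged immediately
/// and the heavy work is moved to a background thread. Returns the
/// acknowledgements in arrival order and the results sorted by job id.
pub fn run_event_loop(jobs: Vec<u64>) -> (Vec<String>, Vec<(usize, u64)>) {
    let (done_tx, done_rx) = mpsc::channel();
    let mut acks = Vec::new();
    let mut workers = Vec::new();

    for (job_id, input) in jobs.into_iter().enumerate() {
        let done_tx = done_tx.clone();
        // Offload: the loop does not wait for this thread.
        workers.push(thread::spawn(move || {
            let result = expensive_analysis(input);
            // Comparable to sending a completion notification later.
            done_tx.send((job_id, result)).expect("receiver alive");
        }));
        // Respond right away so the next message can be processed.
        acks.push(format!("started job {job_id}"));
    }
    // Drop the loop's own sender so the channel closes once workers finish.
    drop(done_tx);

    for worker in workers {
        worker.join().expect("worker panicked");
    }
    let mut results: Vec<(usize, u64)> = done_rx.iter().collect();
    results.sort();
    (acks, results)
}
```

The loop body itself never performs the expensive call, so its time per message stays small no matter how long each job runs.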

§The Connection Context

Handler callbacks receive a context object (cx) for interacting with the connection:

  • For request handlers - JrRequestCx<R> provides respond to send the response, plus methods to send other messages
  • For notification handlers - JrConnectionCx provides methods to send messages and spawn tasks

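Both context types support:

  • send_request - Send a request to the other side
  • send_notification - Send a notification to the other side
  • spawn - Run a task concurrently without blocking the event loop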

The JrResponse returned by send_request provides methods like await_when_result_received that help you avoid accidentally blocking the event loop while waiting for responses.
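The idea behind callback-style response handling can be sketched as a table of pending callbacks keyed by request id. This is a hypothetical model, not the crate's implementation: Pending, send_request, and deliver are illustrative names, and the response payload is plain text.

```rust
use std::collections::HashMap;

/// Hypothetical bookkeeping for outstanding requests: one callback per id.
pub struct Pending {
    next_id: u64,
    callbacks: HashMap<u64, Box<dyn FnOnce(&str)>>,
}

impl Pending {
    pub fn new() -> Self {
        Pending { next_id: 0, callbacks: HashMap::new() }
    }

    /// Register a callback and return immediately with the allocated id.
    /// Nothing here waits for the response.
    pub fn send_request(&mut self, on_result: impl FnOnce(&str) + 'static) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.callbacks.insert(id, Box::new(on_result));
        id
    }

    /// Called when a response arrives. Runs the matching callback once and
    /// returns false if no request with that id is outstanding.
    pub fn deliver(&mut self, id: u64, result: &str) -> bool {
        match self.callbacks.remove(&id) {
            Some(callback) => {
                callback(result);
                true
            }
            None => false,
        }
    }
}
```

Since send_request returns before any response exists, the caller can go back to processing other messages, and responses may arrive in any order.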

§Driving the Connection

After adding handlers, you must drive the connection using one of two modes:

§Server Mode: serve()

Use serve when you only need to respond to incoming messages:

connection
    .on_receive_request(async |req: MyRequest, request_cx, cx| {
        request_cx.respond(MyResponse { status: "ok".into() })
    })
    .serve(MockTransport)  // Runs until connection closes or error occurs
    .await?;

The connection will process incoming messages and invoke your handlers until the connection is closed or an error occurs.

§Client Mode: with_client()

Use with_client when you need to both handle incoming messages AND send your own requests/notifications:

connection
    .on_receive_request(async |req: MyRequest, request_cx, cx| {
        request_cx.respond(MyResponse { status: "ok".into() })
    })
    .with_client(MockTransport, async |cx| {
        // You can send requests to the other side
        let response = cx.send_request(InitializeRequest::make())
            .block_task()
            .await?;

        // And send notifications
        cx.send_notification(StatusUpdate { message: "ready".into() })?;

        Ok(())
    })
    .await?;

The connection will serve incoming messages in the background while your client closure runs. When the closure returns, the connection shuts down.
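The lifetime described here (serve in the background, stop when the closure returns) can be modeled with a thread and a channel. This is only an analogy under stated assumptions: the function below is a hypothetical std-only with_client, not the crate's method, and messages sent through the channel stand in for traffic on the connection.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Serve messages on a background thread while `main_fn` runs, then shut
/// down when `main_fn` returns. Returns the closure's value and the number
/// of messages the background loop served.
pub fn with_client<T>(main_fn: impl FnOnce(&Sender<String>) -> T) -> (T, usize) {
    let (tx, rx) = mpsc::channel::<String>();

    // Background "serve" loop: runs until every sender is dropped.
    let server = thread::spawn(move || {
        let mut served = 0usize;
        for _msg in rx {
            served += 1;
        }
        served
    });

    let out = main_fn(&tx);
    // The closure has returned: dropping the sender closes the channel,
    // which ends the background loop after it drains queued messages.
    drop(tx);
    let served = server.join().expect("server thread panicked");
    (out, served)
}
```

A usage sketch: `with_client(|tx| { tx.send("ping".to_string()).unwrap(); "done" })` returns the closure's value together with a served count of 1.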

§Example: Complete Agent

let transport = ByteStreams::new(
    tokio::io::stdout().compat_write(),
    tokio::io::stdin().compat(),
);

UntypedRole::builder()
    .name("my-agent")  // Optional: for debugging logs
    .on_receive_request(async |init: InitializeRequest, request_cx, cx| {
        let response: InitializeResponse = todo!();
        request_cx.respond(response)
    })
    .on_receive_request(async |prompt: PromptRequest, request_cx, cx| {
        // You can send notifications while processing a request
        let notif: SessionNotification = todo!();
        cx.send_notification(notif)?;

        // Then respond to the request
        let response: PromptResponse = todo!();
        request_cx.respond(response)
    })
    .serve(transport)
    .await?;

Implementations§

Source§

impl<H: JrMessageHandler> JrConnectionBuilder<H>

Source

pub fn new_with(handler: H) -> Self

Create a new handler chain with the given handler.

Source

pub fn name(self, name: impl ToString) -> Self

Set the “name” of this connection – used only for debugging logs.

Source

pub fn with_handler_chain<H1>( self, handler_chain: JrConnectionBuilder<H1>, ) -> JrConnectionBuilder<ChainedHandler<H, NamedHandler<H1>>>
where H1: JrMessageHandler<Role = H::Role>,

Add the handlers of another JrConnectionBuilder to the chain.

Prefer on_receive_request or on_receive_notification. This is a low-level method that is not intended for general use.

Source

pub fn with_handler<H1>( self, handler: H1, ) -> JrConnectionBuilder<ChainedHandler<H, H1>>
where H1: JrMessageHandler<Role = H::Role>,

Add a new JrMessageHandler to the chain.

Prefer on_receive_request or on_receive_notification. This is a low-level method that is not intended for general use.

Source

pub fn with_spawned<F>( self, task: impl FnOnce(JrConnectionCx<H::Role>) -> F + Send + 'static, ) -> Self
where F: Future<Output = Result<(), Error>> + Send + 'static,

Enqueue a task to run once the connection is actively serving traffic.

Source

pub fn on_receive_message<Req, Notif, F, T>( self, op: F, ) -> JrConnectionBuilder<ChainedHandler<H, MessageHandler<H::Role, <H::Role as JrRole>::HandlerEndpoint, Req, Notif, F>>>
where H::Role: HasEndpoint<<H::Role as JrRole>::HandlerEndpoint>, Req: JrRequest, Notif: JrNotification, F: AsyncFnMut(MessageCx<Req, Notif>, JrConnectionCx<H::Role>) -> Result<T, Error> + Send, T: IntoHandled<MessageCx<Req, Notif>>,

Register a handler for messages that can be either requests OR notifications.

Use this when you want to handle an enum type that contains both request and notification variants. Your handler receives a MessageCx<Req, Notif> which is an enum with two variants:

  • MessageCx::Request(request, request_cx) - A request with its response context
  • MessageCx::Notification(notification) - A notification
§Example
connection.on_receive_message(async |message: MessageCx<MyRequest, StatusUpdate>, _cx| {
    match message {
        MessageCx::Request(req, request_cx) => {
            // Handle request and send response
            request_cx.respond(MyResponse { status: "ok".into() })
        }
        MessageCx::Notification(notif) => {
            // Handle notification (no response needed)
            Ok(())
        }
    }
})

For most use cases, prefer on_receive_request or on_receive_notification which provide cleaner APIs for handling requests or notifications separately.

Source

pub fn on_receive_request<Req: JrRequest, F, T>( self, op: F, ) -> JrConnectionBuilder<ChainedHandler<H, RequestHandler<H::Role, <H::Role as JrRole>::HandlerEndpoint, Req, F>>>

Register a handler for JSON-RPC requests of type Req.

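Your handler receives three arguments:

  1. The request (type Req)
  2. A JrRequestCx<Req::Response> for sending the response
  3. A JrConnectionCx<H::Role> for sending other messages and spawning tasks

The request context allows you to send the response with respond. Use the connection context to send your own notifications or requests while processing.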

§Example
connection.on_receive_request(async |request: PromptRequest, request_cx, cx| {
    // Send a notification while processing
    let notif: SessionNotification = todo!();
    cx.send_notification(notif)?;

    // Do some work...
    let result = todo!("process the prompt");

    // Send the response
    let response: PromptResponse = todo!();
    request_cx.respond(response)
});
§Type Parameter

Req can be either a single request type or an enum of multiple request types. See the type-driven dispatch section for details.

Source

pub fn on_receive_notification<Notif, F, T>( self, op: F, ) -> JrConnectionBuilder<ChainedHandler<H, NotificationHandler<H::Role, <H::Role as JrRole>::HandlerEndpoint, Notif, F>>>
where H::Role: HasEndpoint<<H::Role as JrRole>::HandlerEndpoint>, Notif: JrNotification, F: AsyncFnMut(Notif, JrConnectionCx<H::Role>) -> Result<T, Error> + Send, T: IntoHandled<(Notif, JrConnectionCx<H::Role>)>,

Register a handler for JSON-RPC notifications of type Notif.

Notifications are fire-and-forget messages that don’t expect a response. Your handler receives:

  1. The notification (type Notif)
  2. A JrConnectionCx<R> for sending messages to the other side

Unlike request handlers, you cannot send a response (notifications don’t have IDs), but you can still send your own requests and notifications using the context.

§Example
connection.on_receive_notification(async |notif: SessionUpdate, cx| {
    // Process the notification
    update_session_state(&notif)?;

    // Optionally send a notification back
    cx.send_notification(StatusUpdate {
        message: "Acknowledged".into(),
    })?;

    Ok(())
})
§Type Parameter

Notif can be either a single notification type or an enum of multiple notification types. See the type-driven dispatch section for details.

Source

pub fn on_receive_message_from<Req: JrRequest, Notif: JrNotification, End: JrEndpoint, F, T>( self, endpoint: End, op: F, ) -> JrConnectionBuilder<ChainedHandler<H, MessageHandler<H::Role, End, Req, Notif, F>>>
where H::Role: HasEndpoint<End>, F: AsyncFnMut(MessageCx<Req, Notif>, JrConnectionCx<H::Role>) -> Result<T, Error> + Send, T: IntoHandled<MessageCx<Req, Notif>>,

Register a handler for messages from a specific endpoint.

This is similar to on_receive_message, but allows specifying the source endpoint explicitly. This is useful when receiving messages from an endpoint that requires message transformation (e.g., unwrapping SuccessorMessage envelopes when receiving from an agent via a proxy).

For the common case of receiving from the default counterpart, use on_receive_message instead.

Source

pub fn on_receive_request_from<Req: JrRequest, End: JrEndpoint, F, T>( self, endpoint: End, op: F, ) -> JrConnectionBuilder<ChainedHandler<H, RequestHandler<H::Role, End, Req, F>>>
where H::Role: HasEndpoint<End>, F: AsyncFnMut(Req, JrRequestCx<Req::Response>, JrConnectionCx<H::Role>) -> Result<T, Error> + Send, T: IntoHandled<(Req, JrRequestCx<Req::Response>)>,

Register a handler for JSON-RPC requests from a specific endpoint.

This is similar to on_receive_request, but allows specifying the source endpoint explicitly. This is useful when receiving messages from an endpoint that requires message transformation (e.g., unwrapping SuccessorRequest envelopes when receiving from an agent via a proxy).

For the common case of receiving from the default counterpart, use on_receive_request instead.

§Example
use sacp::Agent;
use sacp::schema::InitializeRequest;

// Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
connection.on_receive_request_from(Agent, async |req: InitializeRequest, request_cx, cx| {
    // Handle the request
    request_cx.respond(InitializeResponse::make())
})
Source

pub fn on_receive_notification_from<Notif: JrNotification, End: JrEndpoint, F, T>( self, endpoint: End, op: F, ) -> JrConnectionBuilder<ChainedHandler<H, NotificationHandler<H::Role, End, Notif, F>>>
where H::Role: HasEndpoint<End>, F: AsyncFnMut(Notif, JrConnectionCx<H::Role>) -> Result<T, Error> + Send, T: IntoHandled<(Notif, JrConnectionCx<H::Role>)>,

Register a handler for JSON-RPC notifications from a specific endpoint.

This is similar to on_receive_notification, but allows specifying the source endpoint explicitly. This is useful when receiving messages from an endpoint that requires message transformation (e.g., unwrapping SuccessorNotification envelopes when receiving from an agent via a proxy).

For the common case of receiving from the default counterpart, use on_receive_notification instead.

Source

pub fn provide_mcp<Role>( self, registry: McpServiceRegistry<Role>, ) -> JrConnectionBuilder<ChainedHandler<H, McpServiceRegistry<Role>>>
where H: JrMessageHandler<Role = Role>, Role: HasEndpoint<Agent> + JrRole,

Provide MCP servers to downstream successors.

This adds a handler that intercepts session/new requests to include the registered MCP servers, and handles MCP-over-ACP communication.

§Example
use sacp::mcp_server::McpServiceRegistry;
use sacp::ProxyToConductor;

ProxyToConductor::builder()
    .name("my-proxy")
    .provide_mcp(McpServiceRegistry::default().with_mcp_server("example", my_server)?)
    .serve(connection)
    .await?;
Source

pub fn connect_to( self, transport: impl Component + 'static, ) -> Result<JrConnection<H>, Error>

Connect these handlers to a transport layer. The resulting connection must then be either served or used as a client.

Source

pub async fn apply( &mut self, message: MessageCx, cx: JrConnectionCx<H::Role>, ) -> Result<Handled<MessageCx>, Error>

Apply the handler chain to a single message.

This method processes one message through the entire handler chain, attempting to match it against each registered handler in order. This is useful when implementing custom message handling logic or when you need fine-grained control over message processing.

§Returns
  • Ok(Handled::Claimed) - A handler claimed and processed the message
  • Ok(Handled::Unclaimed(message)) - No handler matched the message
  • Err(_) - A handler encountered an error while processing
§Borrow Checker Considerations

You may find that MatchMessage is a better choice than this method for implementing custom handlers. It offers a very similar API to JrConnectionBuilder but is structured to apply each test one at a time (sequentially) instead of setting them all up at once. This sequential approach often interacts better with the borrow checker, at the cost of requiring .await calls between each handler and only working for processing a single message.

§Example: Borrow Checker Challenges

When building a handler chain with async {} blocks (non-move), you might encounter borrow checker errors if multiple handlers need access to the same mutable state:

let mut state = String::from("initial");

// This fails to compile because both handlers borrow `state` mutably,
// and the futures are set up at the same time (even though only one will run)
let chain = UntypedRole::builder()
    .on_receive_request(async |req: InitializeRequest, request_cx, cx| {
        state.push_str(" - initialized");  // First mutable borrow
        request_cx.respond(InitializeResponse::make())
    })
    .on_receive_request(async |req: PromptRequest, request_cx, cx| {
        state.push_str(" - prompted");  // Second mutable borrow - ERROR!
        request_cx.respond(PromptResponse { content: vec![], stopReason: None })
    });

You can work around this by using apply() to process messages one at a time, or use MatchMessage, which provides a similar API but applies handlers sequentially:

use sacp::{MessageCx, Handled};
use sacp::util::MatchMessage;

async fn handle_with_state(
    message: MessageCx,
    state: &mut String,
) -> Result<Handled<MessageCx>, sacp::Error> {
    MatchMessage::new(message)
        .on_request(async |req: InitializeRequest, request_cx| {
            state.push_str(" - initialized");  // Sequential - OK!
            request_cx.respond(InitializeResponse::make())
        })
        .on_request(async |req: PromptRequest, request_cx| {
            state.push_str(" - prompted");  // Sequential - OK!
            request_cx.respond(PromptResponse { content: vec![], stopReason: None })
        })
        .otherwise(async |msg| Ok(Handled::Unclaimed(msg)))
        .await
}
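Why the sequential form satisfies the borrow checker can be seen in a synchronous miniature. This is a hypothetical std-only sketch (Matcher, on, and claimed are illustrative names, not the MatchMessage API): each closure is consumed inside the call that receives it, so its mutable borrow of the state ends before the next closure is created.

```rust
/// Hypothetical miniature of a sequential matcher.
pub struct Matcher<'m> {
    /// The method still waiting for a handler; None once claimed.
    method: Option<&'m str>,
}

impl<'m> Matcher<'m> {
    pub fn new(method: &'m str) -> Self {
        Matcher { method: Some(method) }
    }

    /// Runs `f` right away if `method` matches. `f` is consumed inside this
    /// call and is not stored in the returned value, so whatever it
    /// borrowed is released before the next `.on(...)` in the chain.
    pub fn on(mut self, method: &str, f: impl FnOnce()) -> Self {
        if self.method == Some(method) {
            f();
            self.method = None;
        }
        self
    }

    /// True if some handler claimed the message.
    pub fn claimed(&self) -> bool {
        self.method.is_none()
    }
}

/// Both closures mutate `state`, yet only one mutable borrow is alive at a time.
pub fn handle_with_state(method: &str, state: &mut String) -> bool {
    Matcher::new(method)
        .on("initialize", || state.push_str(" - initialized"))
        .on("session/prompt", || state.push_str(" - prompted"))
        .claimed()
}
```

A builder that stored both closures for later use would have to hold both mutable borrows at once, which is the situation the failing example above runs into.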
Source

pub async fn serve( self, transport: impl Component + 'static, ) -> Result<(), Error>

Convenience method to connect to a transport and serve.

This is equivalent to:

handler_chain.connect_to(transport)?.serve().await
Source

pub async fn with_client( self, transport: impl Component + 'static, main_fn: impl AsyncFnOnce(JrConnectionCx<H::Role>) -> Result<(), Error>, ) -> Result<(), Error>

Convenience method to connect to a transport and run a client function.

This is equivalent to:

handler_chain.connect_to(transport)?.with_client(main_fn).await

Auto Trait Implementations§

§

impl<H> Freeze for JrConnectionBuilder<H>
where H: Freeze,

§

impl<H> !RefUnwindSafe for JrConnectionBuilder<H>

§

impl<H> Send for JrConnectionBuilder<H>
where H: Send,

§

impl<H> !Sync for JrConnectionBuilder<H>

§

impl<H> Unpin for JrConnectionBuilder<H>
where H: Unpin,

§

impl<H> !UnwindSafe for JrConnectionBuilder<H>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.