agent_client_protocol/jsonrpc.rs
1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::SessionId;
4// Re-export jsonrpcmsg for use in public API
5pub use jsonrpcmsg;
6
7// Types re-exported from crate root
8use serde::{Deserialize, Serialize};
9use std::fmt::Debug;
10use std::panic::Location;
11use std::pin::pin;
12use uuid::Uuid;
13
14use futures::channel::{mpsc, oneshot};
15use futures::future::{self, BoxFuture, Either};
16use futures::{AsyncRead, AsyncWrite, StreamExt};
17
18mod dynamic_handler;
19pub(crate) mod handlers;
20mod incoming_actor;
21mod outgoing_actor;
22pub(crate) mod run;
23mod task_actor;
24mod transport_actor;
25
26use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
27pub use crate::jsonrpc::handlers::NullHandler;
28use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
29use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
30use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
31use crate::jsonrpc::run::SpawnedRun;
32use crate::jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo};
33use crate::jsonrpc::task_actor::{Task, TaskTx};
34use crate::mcp_server::McpServer;
35use crate::role::HasPeer;
36use crate::role::Role;
37use crate::util::json_cast;
38use crate::{Agent, Client, ConnectTo, RoleId};
39
40/// Handlers process incoming JSON-RPC messages on a connection.
41///
42/// When messages arrive, they flow through a chain of handlers. Each handler can
43/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
44///
45/// # Message Flow
46///
47/// Messages flow through three layers of handlers in order:
48///
49/// ```text
50/// ┌─────────────────────────────────────────────────────────────────┐
51/// │ Incoming Message │
52/// └─────────────────────────────────────────────────────────────────┘
53/// │
54/// ▼
55/// ┌─────────────────────────────────────────────────────────────────┐
56/// │ 1. User Handlers (registered via on_receive_request, etc.) │
57/// │ - Tried in registration order │
58/// │ - First handler to return Handled::Yes claims the message │
59/// └─────────────────────────────────────────────────────────────────┘
60/// │ Handled::No
61/// ▼
62/// ┌─────────────────────────────────────────────────────────────────┐
63/// │ 2. Dynamic Handlers (added at runtime) │
64/// │ - Used for session-specific message handling │
65/// │ - Added via ConnectionTo::add_dynamic_handler │
66/// └─────────────────────────────────────────────────────────────────┘
67/// │ Handled::No
68/// ▼
69/// ┌─────────────────────────────────────────────────────────────────┐
70/// │ 3. Role Default Handler │
71/// │ - Fallback based on the connection's Role │
72/// │ - Handles protocol-level messages (e.g., proxy forwarding) │
73/// └─────────────────────────────────────────────────────────────────┘
74/// │ Handled::No
75/// ▼
76/// ┌─────────────────────────────────────────────────────────────────┐
77/// │ Unhandled: Error response sent (or queued if retry=true) │
78/// └─────────────────────────────────────────────────────────────────┘
79/// ```
80///
81/// # The `Handled` Return Value
82///
83/// Each handler returns [`Handled`] to indicate whether it processed the message:
84///
85/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
86/// - **`Handled::No { message, retry }`** - Message was not handled. The message
87/// (possibly modified) is passed to the next handler in the chain.
88///
89/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
90///
91/// # The Retry Mechanism
92///
93/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
94///
95/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
96/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
97///
98/// This mechanism exists because of a timing issue with sessions: when a `session/new`
99/// response is being processed, the dynamic handler for that session hasn't been registered
100/// yet, but `session/update` notifications for that session may already be arriving.
101/// By setting `retry: true`, these early notifications are queued until the session's
102/// dynamic handler is added.
103///
104/// # Handler Registration
105///
106/// Most users register handlers using the builder methods on [`Builder`]:
107///
108/// ```
109/// # use agent_client_protocol::{Agent, Client, ConnectTo};
110/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, AgentCapabilities};
111/// # use agent_client_protocol_test::StatusUpdate;
112/// # async fn example(transport: impl ConnectTo<Agent>) -> Result<(), agent_client_protocol::Error> {
113/// Agent.builder()
114/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
115/// responder.respond(
116/// InitializeResponse::new(req.protocol_version)
117/// .agent_capabilities(AgentCapabilities::new()),
118/// )
119/// }, agent_client_protocol::on_receive_request!())
120/// .on_receive_notification(async |notif: StatusUpdate, cx| {
121/// // Process notification
122/// Ok(())
123/// }, agent_client_protocol::on_receive_notification!())
124/// .connect_to(transport)
125/// .await?;
126/// # Ok(())
127/// # }
128/// ```
129///
130/// The type parameter on the closure determines which messages are dispatched to it.
131/// Messages that don't match the type are automatically passed to the next handler.
132///
133/// # Implementing Custom Handlers
134///
135/// For advanced use cases, you can implement `HandleMessageAs` directly:
136///
137/// ```ignore
138/// struct MyHandler;
139///
140/// impl HandleMessageAs<Agent> for MyHandler {
141///
142/// async fn handle_dispatch(
143/// &mut self,
144/// message: Dispatch,
145/// cx: ConnectionTo<Self::Role>,
146/// ) -> Result<Handled<Dispatch>, Error> {
147/// if message.method() == "my/custom/method" {
148/// // Handle it
149/// Ok(Handled::Yes)
150/// } else {
151/// // Pass to next handler
152/// Ok(Handled::No { message, retry: false })
153/// }
154/// }
155///
156/// fn describe_chain(&self) -> impl std::fmt::Debug {
157/// "MyHandler"
158/// }
159/// }
160/// ```
161///
162/// # Important: Handlers Must Not Block
163///
164/// The connection processes messages on a single async task. While a handler is running,
165/// no other messages can be processed. For expensive operations, use [`ConnectionTo::spawn`]
166/// to run work concurrently:
167///
168/// ```
169/// # use agent_client_protocol::{Client, Agent, ConnectTo};
170/// # use agent_client_protocol_test::{expensive_operation, ProcessComplete};
171/// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
172/// # Client.builder().connect_with(transport, async |cx| {
173/// cx.spawn({
174/// let connection = cx.clone();
175/// async move {
176/// let result = expensive_operation("data").await?;
177/// connection.send_notification(ProcessComplete { result })?;
178/// Ok(())
179/// }
180/// })?;
181/// # Ok(())
182/// # }).await?;
183/// # Ok(())
184/// # }
185/// ```
186#[allow(async_fn_in_trait)]
187/// A handler for incoming JSON-RPC messages.
188///
189/// This trait is implemented by types that can process incoming messages on a connection.
190/// Handlers are registered with a [`Builder`] and are called in order until
191/// one claims the message.
192///
193/// The type parameter `R` is the role this handler plays - who I am.
194/// For an agent handler, `R = Agent` (I handle messages as an agent).
195/// For a client handler, `R = Client` (I handle messages as a client).
196pub trait HandleDispatchFrom<Counterpart: Role>: Send {
197 /// Attempt to claim an incoming message (request or notification).
198 ///
199 /// # Important: do not block
200 ///
201 /// The server will not process new messages until this handler returns.
202 /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
203 /// The recommended approach to manage expensive operations is to the [`ConnectionTo::spawn`] method available on the message context.
204 ///
205 /// # Parameters
206 ///
207 /// * `message` - The incoming message to handle.
208 /// * `connection` - The connection, used to send messages and access connection state.
209 ///
210 /// # Returns
211 ///
212 /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
213 /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
214 /// * `Err` if an internal error occurs (this will bring down the server).
215 fn handle_dispatch_from(
216 &mut self,
217 message: Dispatch,
218 connection: ConnectionTo<Counterpart>,
219 ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send;
220
221 /// Returns a debug description of the registered handlers for diagnostics.
222 fn describe_chain(&self) -> impl std::fmt::Debug;
223}
224
225impl<Counterpart: Role, H> HandleDispatchFrom<Counterpart> for &mut H
226where
227 H: HandleDispatchFrom<Counterpart>,
228{
229 fn handle_dispatch_from(
230 &mut self,
231 message: Dispatch,
232 cx: ConnectionTo<Counterpart>,
233 ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send {
234 H::handle_dispatch_from(self, message, cx)
235 }
236
237 fn describe_chain(&self) -> impl std::fmt::Debug {
238 H::describe_chain(self)
239 }
240}
241
242/// A JSON-RPC connection that can act as either a server, client, or both.
243///
244/// [`Builder`] provides a builder-style API for creating JSON-RPC servers and clients.
245/// You start by calling `Role.builder()` (e.g., `Client.builder()`), then add message
246/// handlers, and finally drive the connection with either [`connect_to`](Builder::connect_to)
247/// or [`connect_with`](Builder::connect_with), providing a component implementation
248/// (e.g., [`ByteStreams`] for byte streams).
249///
250/// # JSON-RPC Primer
251///
252/// JSON-RPC 2.0 has two fundamental message types:
253///
254/// * **Requests** - Messages that expect a response. They have an `id` field that gets
255/// echoed back in the response so the sender can correlate them.
256/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
257/// expect or receive a response.
258///
259/// # Type-Driven Message Dispatch
260///
261/// The handler registration methods use Rust's type system to determine which messages
262/// to handle. The type parameter you provide controls what gets dispatched to your handler:
263///
264/// ## Single Message Types
265///
266/// The simplest case - handle one specific message type:
267///
268/// ```no_run
269/// # use agent_client_protocol_test::*;
270/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
271/// # async fn example() -> Result<(), agent_client_protocol::Error> {
272/// # let connection = mock_connection();
273/// connection
274/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
275/// // Handle only InitializeRequest messages
276/// responder.respond(InitializeResponse::make())
277/// }, agent_client_protocol::on_receive_request!())
278/// .on_receive_notification(async |notif: SessionNotification, cx| {
279/// // Handle only SessionUpdate notifications
280/// Ok(())
281/// }, agent_client_protocol::on_receive_notification!())
282/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
283/// # Ok(())
284/// # }
285/// ```
286///
287/// ## Enum Message Types
288///
289/// You can also handle multiple related messages with a single handler by defining an enum
290/// that implements the appropriate trait ([`JsonRpcRequest`] or [`JsonRpcNotification`]):
291///
292/// ```no_run
293/// # use agent_client_protocol_test::*;
294/// # use agent_client_protocol::{JsonRpcRequest, JsonRpcMessage, UntypedMessage};
295/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
296/// # async fn example() -> Result<(), agent_client_protocol::Error> {
297/// # let connection = mock_connection();
298/// // Define an enum for multiple request types
299/// #[derive(Debug, Clone)]
300/// enum MyRequests {
301/// Initialize(InitializeRequest),
302/// Prompt(PromptRequest),
303/// }
304///
305/// // Implement JsonRpcRequest for your enum
306/// # impl JsonRpcMessage for MyRequests {
307/// # fn matches_method(_method: &str) -> bool { false }
308/// # fn method(&self) -> &str { "myRequests" }
309/// # fn to_untyped_message(&self) -> Result<UntypedMessage, agent_client_protocol::Error> { todo!() }
310/// # fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Result<Self, agent_client_protocol::Error> { Err(agent_client_protocol::Error::method_not_found()) }
311/// # }
312/// impl JsonRpcRequest for MyRequests { type Response = serde_json::Value; }
313///
314/// // Handle all variants in one place
315/// connection.on_receive_request(async |req: MyRequests, responder, cx| {
316/// match req {
317/// MyRequests::Initialize(init) => { responder.respond(serde_json::json!({})) }
318/// MyRequests::Prompt(prompt) => { responder.respond(serde_json::json!({})) }
319/// }
320/// }, agent_client_protocol::on_receive_request!())
321/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
322/// # Ok(())
323/// # }
324/// ```
325///
326/// ## Mixed Message Types
327///
328/// For enums containing both requests AND notifications, use [`on_receive_dispatch`](Self::on_receive_dispatch):
329///
330/// ```no_run
331/// # use agent_client_protocol_test::*;
332/// # use agent_client_protocol::Dispatch;
333/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
334/// # async fn example() -> Result<(), agent_client_protocol::Error> {
335/// # let connection = mock_connection();
336/// // on_receive_dispatch receives Dispatch which can be either a request or notification
337/// connection.on_receive_dispatch(async |msg: Dispatch<InitializeRequest, SessionNotification>, _cx| {
338/// match msg {
339/// Dispatch::Request(req, responder) => {
340/// responder.respond(InitializeResponse::make())
341/// }
342/// Dispatch::Notification(notif) => {
343/// Ok(())
344/// }
345/// Dispatch::Response(result, router) => {
346/// // Forward response to its destination
347/// router.respond_with_result(result)
348/// }
349/// }
350/// }, agent_client_protocol::on_receive_dispatch!())
351/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
352/// # Ok(())
353/// # }
354/// ```
355///
356/// # Handler Registration
357///
358/// Register handlers using these methods (listed from most common to most flexible):
359///
360/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
361/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
362/// * [`on_receive_dispatch`](Self::on_receive_dispatch) - Handle enums containing both requests and notifications
363/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
364///
365/// ## Handler Ordering
366///
367/// Handlers are tried in the order you register them. The first handler that claims a message
368/// (by matching its type) will process it. Subsequent handlers won't see that message:
369///
370/// ```no_run
371/// # use agent_client_protocol_test::*;
372/// # use agent_client_protocol::{Dispatch, UntypedMessage};
373/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
374/// # async fn example() -> Result<(), agent_client_protocol::Error> {
375/// # let connection = mock_connection();
376/// connection
377/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
378/// // This runs first for InitializeRequest
379/// responder.respond(InitializeResponse::make())
380/// }, agent_client_protocol::on_receive_request!())
381/// .on_receive_request(async |req: PromptRequest, responder, cx| {
382/// // This runs first for PromptRequest
383/// responder.respond(PromptResponse::make())
384/// }, agent_client_protocol::on_receive_request!())
385/// .on_receive_dispatch(async |msg: Dispatch, cx| {
386/// // This runs for any message not handled above
387/// msg.respond_with_error(agent_client_protocol::util::internal_error("unknown method"), cx)
388/// }, agent_client_protocol::on_receive_dispatch!())
389/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
390/// # Ok(())
391/// # }
392/// ```
393///
394/// # Event Loop and Concurrency
395///
396/// Understanding the event loop is critical for writing correct handlers.
397///
398/// ## The Event Loop
399///
400/// [`Builder`] runs all handler callbacks on a single async task - the event loop.
401/// While a handler is running, **the server cannot receive new messages**. This means
402/// any blocking or expensive work in your handlers will stall the entire connection.
403///
404/// To avoid blocking the event loop, use [`ConnectionTo::spawn`] to offload serious
405/// work to concurrent tasks:
406///
407/// ```no_run
408/// # use agent_client_protocol_test::*;
409/// # async fn example() -> Result<(), agent_client_protocol::Error> {
410/// # let connection = mock_connection();
411/// connection.on_receive_request(async |req: AnalyzeRequest, responder, cx| {
412/// // Clone cx for the spawned task
413/// cx.spawn({
414/// let connection = cx.clone();
415/// async move {
416/// let result = expensive_analysis(&req.data).await?;
417/// connection.send_notification(AnalysisComplete { result })?;
418/// Ok(())
419/// }
420/// })?;
421///
422/// // Respond immediately without blocking
423/// responder.respond(AnalysisStarted { job_id: 42 })
424/// }, agent_client_protocol::on_receive_request!())
425/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
426/// # Ok(())
427/// # }
428/// ```
429///
430/// Note that the entire connection runs within one async task, so parallelism must be
431/// managed explicitly using [`spawn`](ConnectionTo::spawn).
432///
433/// ## The Connection Context
434///
435/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
436///
437/// * **For request handlers** - [`Responder<R>`] provides [`respond`](Responder::respond)
438/// to send the response, plus methods to send other messages
439/// * **For notification handlers** - [`ConnectionTo`] provides methods to send messages
440/// and spawn tasks
441///
442/// Both context types support:
443/// * [`send_request`](ConnectionTo::send_request) - Send requests to the other side
444/// * [`send_notification`](ConnectionTo::send_notification) - Send notifications
445/// * [`spawn`](ConnectionTo::spawn) - Run tasks concurrently without blocking the event loop
446///
447/// The [`SentRequest`] returned by `send_request` provides methods like
448/// [`on_receiving_result`](SentRequest::on_receiving_result) that help you
449/// avoid accidentally blocking the event loop while waiting for responses.
450///
451/// # Driving the Connection
452///
453/// After adding handlers, you must drive the connection using one of two modes:
454///
455/// ## Server Mode: `connect_to()`
456///
457/// Use [`connect_to`](Self::connect_to) when you only need to respond to incoming messages:
458///
459/// ```no_run
460/// # use agent_client_protocol_test::*;
461/// # async fn example() -> Result<(), agent_client_protocol::Error> {
462/// # let connection = mock_connection();
463/// connection
464/// .on_receive_request(async |req: MyRequest, responder, cx| {
465/// responder.respond(MyResponse { status: "ok".into() })
466/// }, agent_client_protocol::on_receive_request!())
467/// .connect_to(MockTransport) // Runs until connection closes or error occurs
468/// .await?;
469/// # Ok(())
470/// # }
471/// ```
472///
473/// The connection will process incoming messages and invoke your handlers until the
474/// connection is closed or an error occurs.
475///
476/// ## Client Mode: `connect_with()`
477///
478/// Use [`connect_with`](Self::connect_with) when you need to both handle incoming messages
479/// AND send your own requests/notifications:
480///
481/// ```no_run
482/// # use agent_client_protocol_test::*;
483/// # use agent_client_protocol::schema::InitializeRequest;
484/// # async fn example() -> Result<(), agent_client_protocol::Error> {
485/// # let connection = mock_connection();
486/// connection
487/// .on_receive_request(async |req: MyRequest, responder, cx| {
488/// responder.respond(MyResponse { status: "ok".into() })
489/// }, agent_client_protocol::on_receive_request!())
490/// .connect_with(MockTransport, async |cx| {
491/// // You can send requests to the other side
492/// let response = cx.send_request(InitializeRequest::make())
493/// .block_task()
494/// .await?;
495///
496/// // And send notifications
497/// cx.send_notification(StatusUpdate { message: "ready".into() })?;
498///
499/// Ok(())
500/// })
501/// .await?;
502/// # Ok(())
503/// # }
504/// ```
505///
506/// The connection will serve incoming messages in the background while your client closure
507/// runs. When the closure returns, the connection shuts down.
508///
509/// # Example: Complete Agent
510///
511/// ```no_run
512/// # use agent_client_protocol::UntypedRole;
513/// # use agent_client_protocol::{Builder};
514/// # use agent_client_protocol::Stdio;
515/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
516/// # async fn example() -> Result<(), agent_client_protocol::Error> {
517/// let transport = Stdio::new();
518///
519/// UntypedRole.builder()
520/// .name("my-agent") // Optional: for debugging logs
521/// .on_receive_request(async |init: InitializeRequest, responder, cx| {
522/// let response: InitializeResponse = todo!();
523/// responder.respond(response)
524/// }, agent_client_protocol::on_receive_request!())
525/// .on_receive_request(async |prompt: PromptRequest, responder, cx| {
526/// // You can send notifications while processing a request
527/// let notif: SessionNotification = todo!();
528/// cx.send_notification(notif)?;
529///
530/// // Then respond to the request
531/// let response: PromptResponse = todo!();
532/// responder.respond(response)
533/// }, agent_client_protocol::on_receive_request!())
534/// .connect_to(transport)
535/// .await?;
536/// # Ok(())
537/// # }
538/// ```
539#[must_use]
540#[derive(Debug)]
541pub struct Builder<Host: Role, Handler = NullHandler, Runner = NullRun>
542where
543 Handler: HandleDispatchFrom<Host::Counterpart>,
544 Runner: RunWithConnectionTo<Host::Counterpart>,
545{
546 /// My role.
547 host: Host,
548
549 /// Name of the connection, used in tracing logs.
550 name: Option<String>,
551
552 /// Handler for incoming messages.
553 handler: Handler,
554
555 /// Responder for background tasks.
556 responder: Runner,
557}
558
559impl<Host: Role> Builder<Host, NullHandler, NullRun> {
560 /// Create a new connection builder for the given role.
561 /// This type follows a builder pattern; use other methods to configure and then invoke
562 /// [`Self::connect_to`] (to use as a server) or [`Self::connect_with`] to use as a client.
563 pub fn new(role: Host) -> Self {
564 Self {
565 host: role,
566 name: None,
567 handler: NullHandler,
568 responder: NullRun,
569 }
570 }
571}
572
573impl<Host: Role, Handler> Builder<Host, Handler, NullRun>
574where
575 Handler: HandleDispatchFrom<Host::Counterpart>,
576{
577 /// Create a new connection builder with the given handler.
578 pub fn new_with(role: Host, handler: Handler) -> Self {
579 Self {
580 host: role,
581 name: None,
582 handler,
583 responder: NullRun,
584 }
585 }
586}
587
588impl<
589 Host: Role,
590 Handler: HandleDispatchFrom<Host::Counterpart>,
591 Runner: RunWithConnectionTo<Host::Counterpart>,
592> Builder<Host, Handler, Runner>
593{
594 /// Set the "name" of this connection -- used only for debugging logs.
595 pub fn name(mut self, name: impl ToString) -> Self {
596 self.name = Some(name.to_string());
597 self
598 }
599
600 /// Merge another [`Builder`] into this one.
601 ///
602 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
603 /// This is a low-level method that is not intended for general use.
604 pub fn with_connection_builder(
605 self,
606 other: Builder<
607 Host,
608 impl HandleDispatchFrom<Host::Counterpart>,
609 impl RunWithConnectionTo<Host::Counterpart>,
610 >,
611 ) -> Builder<
612 Host,
613 impl HandleDispatchFrom<Host::Counterpart>,
614 impl RunWithConnectionTo<Host::Counterpart>,
615 > {
616 Builder {
617 host: self.host,
618 name: self.name,
619 handler: ChainedHandler::new(
620 self.handler,
621 NamedHandler::new(other.name, other.handler),
622 ),
623 responder: ChainRun::new(self.responder, other.responder),
624 }
625 }
626
627 /// Add a new [`HandleDispatchFrom`] to the chain.
628 ///
629 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
630 /// This is a low-level method that is not intended for general use.
631 pub fn with_handler(
632 self,
633 handler: impl HandleDispatchFrom<Host::Counterpart>,
634 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner> {
635 Builder {
636 host: self.host,
637 name: self.name,
638 handler: ChainedHandler::new(self.handler, handler),
639 responder: self.responder,
640 }
641 }
642
643 /// Add a new [`RunWithConnectionTo`] to the chain.
644 pub fn with_responder<Run1>(
645 self,
646 responder: Run1,
647 ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
648 where
649 Run1: RunWithConnectionTo<Host::Counterpart>,
650 {
651 Builder {
652 host: self.host,
653 name: self.name,
654 handler: self.handler,
655 responder: ChainRun::new(self.responder, responder),
656 }
657 }
658
659 /// Enqueue a task to run once the connection is actively serving traffic.
660 #[track_caller]
661 pub fn with_spawned<F, Fut>(
662 self,
663 task: F,
664 ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
665 where
666 F: FnOnce(ConnectionTo<Host::Counterpart>) -> Fut + Send,
667 Fut: Future<Output = Result<(), crate::Error>> + Send,
668 {
669 let location = Location::caller();
670 self.with_responder(SpawnedRun::new(location, task))
671 }
672
673 /// Register a handler for messages that can be either requests OR notifications.
674 ///
675 /// Use this when you want to handle an enum type that contains both request and
676 /// notification variants. Your handler receives a [`Dispatch<Req, Notif>`] which
677 /// is an enum with two variants:
678 ///
679 /// - `Dispatch::Request(request, responder)` - A request with its response context
680 /// - `Dispatch::Notification(notification)` - A notification
681 /// - `Dispatch::Response(result, router)` - A response to a request we sent
682 ///
683 /// # Example
684 ///
685 /// ```no_run
686 /// # use agent_client_protocol_test::*;
687 /// # use agent_client_protocol::Dispatch;
688 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
689 /// # let connection = mock_connection();
690 /// connection.on_receive_dispatch(async |message: Dispatch<MyRequest, StatusUpdate>, _cx| {
691 /// match message {
692 /// Dispatch::Request(req, responder) => {
693 /// // Handle request and send response
694 /// responder.respond(MyResponse { status: "ok".into() })
695 /// }
696 /// Dispatch::Notification(notif) => {
697 /// // Handle notification (no response needed)
698 /// Ok(())
699 /// }
700 /// Dispatch::Response(result, router) => {
701 /// // Forward response to its destination
702 /// router.respond_with_result(result)
703 /// }
704 /// }
705 /// }, agent_client_protocol::on_receive_dispatch!())
706 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
707 /// # Ok(())
708 /// # }
709 /// ```
710 ///
711 /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
712 /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
713 /// for handling requests or notifications separately.
714 ///
715 /// # Ordering
716 ///
717 /// This callback runs inside the dispatch loop and blocks further message processing
718 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
719 /// ordering guarantees and how to avoid deadlocks.
720 pub fn on_receive_dispatch<Req, Notif, F, T, ToFut>(
721 self,
722 op: F,
723 to_future_hack: ToFut,
724 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
725 where
726 Host::Counterpart: HasPeer<Host::Counterpart>,
727 Req: JsonRpcRequest,
728 Notif: JsonRpcNotification,
729 F: AsyncFnMut(
730 Dispatch<Req, Notif>,
731 ConnectionTo<Host::Counterpart>,
732 ) -> Result<T, crate::Error>
733 + Send,
734 T: IntoHandled<Dispatch<Req, Notif>>,
735 ToFut: Fn(
736 &mut F,
737 Dispatch<Req, Notif>,
738 ConnectionTo<Host::Counterpart>,
739 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
740 + Send
741 + Sync,
742 {
743 let handler = MessageHandler::new(
744 self.host.counterpart(),
745 self.host.counterpart(),
746 op,
747 to_future_hack,
748 );
749 self.with_handler(handler)
750 }
751
752 /// Register a handler for JSON-RPC requests of type `Req`.
753 ///
754 /// Your handler receives two arguments:
755 /// 1. The request (type `Req`)
756 /// 2. A [`Responder<R, Req::Response>`] for sending the response
757 ///
758 /// The request context allows you to:
759 /// - Send the response with [`Responder::respond`]
760 /// - Send notifications to the client with [`ConnectionTo::send_notification`]
761 /// - Send requests to the client with [`ConnectionTo::send_request`]
762 ///
763 /// # Example
764 ///
765 /// ```ignore
766 /// # use agent_client_protocol::UntypedRole;
767 /// # use agent_client_protocol::{Builder};
768 /// # use agent_client_protocol::schema::{PromptRequest, PromptResponse, SessionNotification};
769 /// # fn example<R: agent_client_protocol::Role>(connection: Builder<R, impl agent_client_protocol::HandleMessageAs<R>>) {
770 /// connection.on_receive_request(async |request: PromptRequest, responder, cx| {
771 /// // Send a notification while processing
772 /// let notif: SessionNotification = todo!();
773 /// cx.send_notification(notif)?;
774 ///
775 /// // Do some work...
776 /// let result = todo!("process the prompt");
777 ///
778 /// // Send the response
779 /// let response: PromptResponse = todo!();
780 /// responder.respond(response)
781 /// }, agent_client_protocol::on_receive_request!());
782 /// # }
783 /// ```
784 ///
785 /// # Type Parameter
786 ///
787 /// `Req` can be either a single request type or an enum of multiple request types.
788 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
789 ///
790 /// # Ordering
791 ///
792 /// This callback runs inside the dispatch loop and blocks further message processing
793 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
794 /// ordering guarantees and how to avoid deadlocks.
795 pub fn on_receive_request<Req: JsonRpcRequest, F, T, ToFut>(
796 self,
797 op: F,
798 to_future_hack: ToFut,
799 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
800 where
801 Host::Counterpart: HasPeer<Host::Counterpart>,
802 F: AsyncFnMut(
803 Req,
804 Responder<Req::Response>,
805 ConnectionTo<Host::Counterpart>,
806 ) -> Result<T, crate::Error>
807 + Send,
808 T: IntoHandled<(Req, Responder<Req::Response>)>,
809 ToFut: Fn(
810 &mut F,
811 Req,
812 Responder<Req::Response>,
813 ConnectionTo<Host::Counterpart>,
814 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
815 + Send
816 + Sync,
817 {
818 let handler = RequestHandler::new(
819 self.host.counterpart(),
820 self.host.counterpart(),
821 op,
822 to_future_hack,
823 );
824 self.with_handler(handler)
825 }
826
827 /// Register a handler for JSON-RPC notifications of type `Notif`.
828 ///
829 /// Notifications are fire-and-forget messages that don't expect a response.
830 /// Your handler receives:
831 /// 1. The notification (type `Notif`)
832 /// 2. A [`ConnectionTo<R>`] for sending messages to the other side
833 ///
834 /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
835 /// but you can still send your own requests and notifications using the context.
836 ///
837 /// # Example
838 ///
839 /// ```no_run
840 /// # use agent_client_protocol_test::*;
841 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
842 /// # let connection = mock_connection();
843 /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
844 /// // Process the notification
845 /// update_session_state(¬if)?;
846 ///
847 /// // Optionally send a notification back
848 /// cx.send_notification(StatusUpdate {
849 /// message: "Acknowledged".into(),
850 /// })?;
851 ///
852 /// Ok(())
853 /// }, agent_client_protocol::on_receive_notification!())
854 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
855 /// # Ok(())
856 /// # }
857 /// ```
858 ///
859 /// # Type Parameter
860 ///
861 /// `Notif` can be either a single notification type or an enum of multiple notification types.
862 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
863 ///
864 /// # Ordering
865 ///
866 /// This callback runs inside the dispatch loop and blocks further message processing
867 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
868 /// ordering guarantees and how to avoid deadlocks.
869 pub fn on_receive_notification<Notif, F, T, ToFut>(
870 self,
871 op: F,
872 to_future_hack: ToFut,
873 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
874 where
875 Host::Counterpart: HasPeer<Host::Counterpart>,
876 Notif: JsonRpcNotification,
877 F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
878 T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
879 ToFut: Fn(
880 &mut F,
881 Notif,
882 ConnectionTo<Host::Counterpart>,
883 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
884 + Send
885 + Sync,
886 {
887 let handler = NotificationHandler::new(
888 self.host.counterpart(),
889 self.host.counterpart(),
890 op,
891 to_future_hack,
892 );
893 self.with_handler(handler)
894 }
895
896 /// Register a handler for messages from a specific peer.
897 ///
898 /// This is similar to [`on_receive_dispatch`](Self::on_receive_dispatch), but allows
899 /// specifying the source peer explicitly. This is useful when receiving messages
900 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
901 /// envelopes when receiving from an agent via a proxy).
902 ///
903 /// For the common case of receiving from the default counterpart, use
904 /// [`on_receive_dispatch`](Self::on_receive_dispatch) instead.
905 ///
906 /// # Ordering
907 ///
908 /// This callback runs inside the dispatch loop and blocks further message processing
909 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
910 /// ordering guarantees and how to avoid deadlocks.
911 pub fn on_receive_dispatch_from<
912 Req: JsonRpcRequest,
913 Notif: JsonRpcNotification,
914 Peer: Role,
915 F,
916 T,
917 ToFut,
918 >(
919 self,
920 peer: Peer,
921 op: F,
922 to_future_hack: ToFut,
923 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
924 where
925 Host::Counterpart: HasPeer<Peer>,
926 F: AsyncFnMut(
927 Dispatch<Req, Notif>,
928 ConnectionTo<Host::Counterpart>,
929 ) -> Result<T, crate::Error>
930 + Send,
931 T: IntoHandled<Dispatch<Req, Notif>>,
932 ToFut: Fn(
933 &mut F,
934 Dispatch<Req, Notif>,
935 ConnectionTo<Host::Counterpart>,
936 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
937 + Send
938 + Sync,
939 {
940 let handler = MessageHandler::new(self.host.counterpart(), peer, op, to_future_hack);
941 self.with_handler(handler)
942 }
943
944 /// Register a handler for JSON-RPC requests from a specific peer.
945 ///
946 /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
947 /// specifying the source peer explicitly. This is useful when receiving messages
948 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
949 /// envelopes when receiving from an agent via a proxy).
950 ///
951 /// For the common case of receiving from the default counterpart, use
952 /// [`on_receive_request`](Self::on_receive_request) instead.
953 ///
954 /// # Example
955 ///
956 /// ```ignore
957 /// use agent_client_protocol::Agent;
958 /// use agent_client_protocol::schema::InitializeRequest;
959 ///
960 /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
961 /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, responder, cx| {
962 /// // Handle the request
963 /// responder.respond(InitializeResponse::make())
964 /// })
965 /// ```
966 ///
967 /// # Ordering
968 ///
969 /// This callback runs inside the dispatch loop and blocks further message processing
970 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
971 /// ordering guarantees and how to avoid deadlocks.
972 pub fn on_receive_request_from<Req: JsonRpcRequest, Peer: Role, F, T, ToFut>(
973 self,
974 peer: Peer,
975 op: F,
976 to_future_hack: ToFut,
977 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
978 where
979 Host::Counterpart: HasPeer<Peer>,
980 F: AsyncFnMut(
981 Req,
982 Responder<Req::Response>,
983 ConnectionTo<Host::Counterpart>,
984 ) -> Result<T, crate::Error>
985 + Send,
986 T: IntoHandled<(Req, Responder<Req::Response>)>,
987 ToFut: Fn(
988 &mut F,
989 Req,
990 Responder<Req::Response>,
991 ConnectionTo<Host::Counterpart>,
992 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
993 + Send
994 + Sync,
995 {
996 let handler = RequestHandler::new(self.host.counterpart(), peer, op, to_future_hack);
997 self.with_handler(handler)
998 }
999
1000 /// Register a handler for JSON-RPC notifications from a specific peer.
1001 ///
1002 /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
1003 /// specifying the source peer explicitly. This is useful when receiving messages
1004 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
1005 /// envelopes when receiving from an agent via a proxy).
1006 ///
1007 /// For the common case of receiving from the default counterpart, use
1008 /// [`on_receive_notification`](Self::on_receive_notification) instead.
1009 ///
1010 /// # Ordering
1011 ///
1012 /// This callback runs inside the dispatch loop and blocks further message processing
1013 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1014 /// ordering guarantees and how to avoid deadlocks.
1015 pub fn on_receive_notification_from<Notif: JsonRpcNotification, Peer: Role, F, T, ToFut>(
1016 self,
1017 peer: Peer,
1018 op: F,
1019 to_future_hack: ToFut,
1020 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1021 where
1022 Host::Counterpart: HasPeer<Peer>,
1023 F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1024 T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1025 ToFut: Fn(
1026 &mut F,
1027 Notif,
1028 ConnectionTo<Host::Counterpart>,
1029 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1030 + Send
1031 + Sync,
1032 {
1033 let handler = NotificationHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1034 self.with_handler(handler)
1035 }
1036
1037 /// Add an MCP server that will be added to all new sessions that are proxied through this connection.
1038 ///
1039 /// Only applicable to proxies.
1040 pub fn with_mcp_server(
1041 self,
1042 mcp_server: McpServer<Host::Counterpart, impl RunWithConnectionTo<Host::Counterpart>>,
1043 ) -> Builder<
1044 Host,
1045 impl HandleDispatchFrom<Host::Counterpart>,
1046 impl RunWithConnectionTo<Host::Counterpart>,
1047 >
1048 where
1049 Host::Counterpart: HasPeer<Agent> + HasPeer<Client>,
1050 {
1051 let (handler, responder) = mcp_server.into_handler_and_responder();
1052 self.with_handler(handler).with_responder(responder)
1053 }
1054
1055 /// Run in server mode with the provided transport.
1056 ///
1057 /// This drives the connection by continuously processing messages from the transport
1058 /// and dispatching them to your registered handlers. The connection will run until:
1059 /// - The transport closes (e.g., EOF on byte streams)
1060 /// - An error occurs
1061 /// - One of your handlers returns an error
1062 ///
1063 /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
1064 /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1065 ///
1066 /// Use this mode when you only need to respond to incoming messages and don't need
1067 /// to initiate your own requests. If you need to send requests to the other side,
1068 /// use [`connect_with`](Self::connect_with) instead.
1069 ///
1070 /// # Example: Byte Stream Transport
1071 ///
1072 /// ```no_run
1073 /// # use agent_client_protocol::UntypedRole;
1074 /// # use agent_client_protocol::{Builder};
1075 /// # use agent_client_protocol::Stdio;
1076 /// # use agent_client_protocol_test::*;
1077 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1078 /// let transport = Stdio::new();
1079 ///
1080 /// UntypedRole.builder()
1081 /// .on_receive_request(async |req: MyRequest, responder, cx| {
1082 /// responder.respond(MyResponse { status: "ok".into() })
1083 /// }, agent_client_protocol::on_receive_request!())
1084 /// .connect_to(transport)
1085 /// .await?;
1086 /// # Ok(())
1087 /// # }
1088 /// ```
1089 pub async fn connect_to(
1090 self,
1091 transport: impl ConnectTo<Host> + 'static,
1092 ) -> Result<(), crate::Error> {
1093 self.connect_with(transport, async move |_cx| future::pending().await)
1094 .await
1095 }
1096
1097 /// Run the connection until the provided closure completes.
1098 ///
1099 /// This drives the connection by:
1100 /// 1. Running your registered handlers in the background to process incoming messages
1101 /// 2. Executing your `main_fn` closure with a [`ConnectionTo<R>`] for sending requests/notifications
1102 ///
1103 /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1104 /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1105 ///
1106 /// Use this mode when you need to initiate communication (send requests/notifications)
1107 /// in addition to responding to incoming messages. For server-only mode where you just
1108 /// respond to messages, use [`connect_to`](Self::connect_to) instead.
1109 ///
1110 /// # Example
1111 ///
1112 /// ```no_run
1113 /// # use agent_client_protocol::UntypedRole;
1114 /// # use agent_client_protocol::{Builder};
1115 /// # use agent_client_protocol::ByteStreams;
1116 /// # use agent_client_protocol::schema::InitializeRequest;
1117 /// # use agent_client_protocol::Stdio;
1118 /// # use agent_client_protocol_test::*;
1119 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1120 /// let transport = Stdio::new();
1121 ///
1122 /// UntypedRole.builder()
1123 /// .on_receive_request(async |req: MyRequest, responder, cx| {
1124 /// // Handle incoming requests in the background
1125 /// responder.respond(MyResponse { status: "ok".into() })
1126 /// }, agent_client_protocol::on_receive_request!())
1127 /// .connect_with(transport, async |cx| {
1128 /// // Initialize the protocol
1129 /// let init_response = cx.send_request(InitializeRequest::make())
1130 /// .block_task()
1131 /// .await?;
1132 ///
1133 /// // Send more requests...
1134 /// let result = cx.send_request(MyRequest {})
1135 /// .block_task()
1136 /// .await?;
1137 ///
1138 /// // When this closure returns, the connection shuts down
1139 /// Ok(())
1140 /// })
1141 /// .await?;
1142 /// # Ok(())
1143 /// # }
1144 /// ```
1145 ///
1146 /// # Parameters
1147 ///
1148 /// - `main_fn`: Your client logic. Receives a [`ConnectionTo<R>`] for sending messages.
1149 ///
1150 /// # Errors
1151 ///
1152 /// Returns an error if the connection closes before `main_fn` completes.
1153 pub async fn connect_with<R>(
1154 self,
1155 transport: impl ConnectTo<Host> + 'static,
1156 main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1157 ) -> Result<R, crate::Error> {
1158 let (_, future) = self.into_connection_and_future(transport, main_fn);
1159 future.await
1160 }
1161
1162 /// Helper that returns a [`ConnectionTo<R>`] and a future that runs this connection until `main_fn` returns.
1163 fn into_connection_and_future<R>(
1164 self,
1165 transport: impl ConnectTo<Host> + 'static,
1166 main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1167 ) -> (
1168 ConnectionTo<Host::Counterpart>,
1169 impl Future<Output = Result<R, crate::Error>>,
1170 ) {
1171 let Self {
1172 name,
1173 handler,
1174 responder,
1175 host: me,
1176 } = self;
1177
1178 let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
1179 let (new_task_tx, new_task_rx) = mpsc::unbounded();
1180 let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
1181 let connection = ConnectionTo::new(
1182 me.counterpart(),
1183 outgoing_tx,
1184 new_task_tx,
1185 dynamic_handler_tx,
1186 );
1187
1188 // Convert transport into server - this returns a channel for us to use
1189 // and a future that runs the transport
1190 let transport_component = crate::DynConnectTo::new(transport);
1191 let (transport_channel, transport_future) = transport_component.into_channel_and_future();
1192 let spawn_result = connection.spawn(transport_future);
1193
1194 // Destructure the channel endpoints
1195 let Channel {
1196 rx: transport_incoming_rx,
1197 tx: transport_outgoing_tx,
1198 } = transport_channel;
1199
1200 let (reply_tx, reply_rx) = mpsc::unbounded();
1201
1202 let future = crate::util::instrument_with_connection_name(name, {
1203 let connection = connection.clone();
1204 async move {
1205 let () = spawn_result?;
1206
1207 let background = async {
1208 futures::try_join!(
1209 // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
1210 outgoing_actor::outgoing_protocol_actor(
1211 outgoing_rx,
1212 reply_tx.clone(),
1213 transport_outgoing_tx,
1214 ),
1215 // Protocol layer: jsonrpcmsg::Message → handler/reply routing
1216 incoming_actor::incoming_protocol_actor(
1217 me.counterpart(),
1218 &connection,
1219 transport_incoming_rx,
1220 dynamic_handler_rx,
1221 reply_rx,
1222 handler,
1223 ),
1224 task_actor::task_actor(new_task_rx, &connection),
1225 responder.run_with_connection_to(connection.clone()),
1226 )?;
1227 Ok(())
1228 };
1229
1230 crate::util::run_until(Box::pin(background), Box::pin(main_fn(connection.clone())))
1231 .await
1232 }
1233 });
1234
1235 (connection, future)
1236 }
1237}
1238
1239impl<R, H, Run> ConnectTo<R::Counterpart> for Builder<R, H, Run>
1240where
1241 R: Role,
1242 H: HandleDispatchFrom<R::Counterpart> + 'static,
1243 Run: RunWithConnectionTo<R::Counterpart> + 'static,
1244{
1245 async fn connect_to(self, client: impl ConnectTo<R>) -> Result<(), crate::Error> {
1246 Builder::connect_to(self, client).await
1247 }
1248}
1249
1250/// The payload sent through the response oneshot channel.
1251///
1252/// Includes the response value and an optional ack channel for dispatch loop
1253/// synchronization.
1254pub(crate) struct ResponsePayload {
1255 /// The response result - either the JSON value or an error.
1256 pub(crate) result: Result<serde_json::Value, crate::Error>,
1257
1258 /// Optional acknowledgment channel for dispatch loop synchronization.
1259 ///
1260 /// When present, the receiver must send on this channel to signal that
1261 /// response processing is complete, allowing the dispatch loop to continue
1262 /// to the next message.
1263 ///
1264 /// This is `None` for error paths where the response is sent directly
1265 /// (e.g., when the outgoing channel is broken) rather than through the
1266 /// normal dispatch loop flow.
1267 pub(crate) ack_tx: Option<oneshot::Sender<()>>,
1268}
1269
1270impl std::fmt::Debug for ResponsePayload {
1271 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1272 f.debug_struct("ResponsePayload")
1273 .field("result", &self.result)
1274 .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1275 .finish()
1276 }
1277}
1278
1279/// Message sent to the incoming actor for reply subscription management.
1280enum ReplyMessage {
1281 /// Subscribe to receive a response for the given request id.
1282 /// When a response with this id arrives, it will be sent through the oneshot
1283 /// along with an ack channel that must be signaled when processing is complete.
1284 /// The method name is stored to allow routing responses through typed handlers.
1285 Subscribe {
1286 id: jsonrpcmsg::Id,
1287
1288 /// id of the peer this request was sent to
1289 role_id: RoleId,
1290
1291 /// (original) method of the request -- the actual request may have been transformed
1292 /// to a successor method, but this will reflect the method of the wrapped request
1293 method: String,
1294
1295 sender: oneshot::Sender<ResponsePayload>,
1296 },
1297}
1298
1299impl std::fmt::Debug for ReplyMessage {
1300 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1301 match self {
1302 ReplyMessage::Subscribe { id, method, .. } => f
1303 .debug_struct("Subscribe")
1304 .field("id", id)
1305 .field("method", method)
1306 .finish(),
1307 }
1308 }
1309}
1310
1311/// Messages send to be serialized over the transport.
1312#[derive(Debug)]
1313enum OutgoingMessage {
1314 /// Send a request to the server.
1315 Request {
1316 /// id assigned to this request (generated by sender)
1317 id: jsonrpcmsg::Id,
1318
1319 /// the original method
1320 method: String,
1321
1322 /// the peer we sent this to
1323 role_id: RoleId,
1324
1325 /// the message to send; this may have a distinct method
1326 /// depending on the peer
1327 untyped: UntypedMessage,
1328
1329 /// where to send the response when it arrives (includes ack channel)
1330 response_tx: oneshot::Sender<ResponsePayload>,
1331 },
1332
1333 /// Send a notification to the server.
1334 Notification {
1335 /// the message to send; this may have a distinct method
1336 /// depending on the peer
1337 untyped: UntypedMessage,
1338 },
1339
1340 /// Send a response to a message from the server
1341 Response {
1342 id: jsonrpcmsg::Id,
1343
1344 response: Result<serde_json::Value, crate::Error>,
1345 },
1346
1347 /// Send a generalized error message
1348 Error { error: crate::Error },
1349}
1350
1351/// Return type from JrHandler; indicates whether the request was handled or not.
1352#[must_use]
1353#[derive(Debug)]
1354pub enum Handled<T> {
1355 /// The message was handled
1356 Yes,
1357
1358 /// The message was not handled; returns the original value.
1359 ///
1360 /// If `retry` is true,
1361 No {
1362 /// The message to be passed to subsequent handlers
1363 /// (typically the original message, but it may have been
1364 /// mutated.)
1365 message: T,
1366
1367 /// If true, request the message to be queued and retried with
1368 /// dynamic handlers as they are added.
1369 ///
1370 /// This is used for managing session updates since the dynamic
1371 /// handler for a session cannot be added until the response to the
1372 /// new session request has been processed and there may be updates
1373 /// that get processed at the same time.
1374 retry: bool,
1375 },
1376}
1377
1378/// Trait for converting handler return values into [`Handled`].
1379///
1380/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
1381/// or an explicit `Handled<T>` value for more control over handler propagation.
1382pub trait IntoHandled<T> {
1383 /// Convert this value into a `Handled<T>`.
1384 fn into_handled(self) -> Handled<T>;
1385}
1386
1387impl<T> IntoHandled<T> for () {
1388 fn into_handled(self) -> Handled<T> {
1389 Handled::Yes
1390 }
1391}
1392
1393impl<T> IntoHandled<T> for Handled<T> {
1394 fn into_handled(self) -> Handled<T> {
1395 self
1396 }
1397}
1398
1399/// Connection context for sending messages and spawning tasks.
1400///
1401/// This is the primary handle for interacting with the JSON-RPC connection from
1402/// within handler callbacks. You can use it to:
1403///
1404/// * Send requests and notifications to the other side
1405/// * Spawn concurrent tasks that run alongside the connection
1406/// * Respond to requests (via [`Responder`] which wraps this)
1407///
1408/// # Cloning
1409///
1410/// `ConnectionTo` is cheaply cloneable - all clones refer to the same underlying connection.
1411/// This makes it easy to share across async tasks.
1412///
1413/// # Event Loop and Concurrency
1414///
1415/// Handler callbacks run on the event loop, which means the connection cannot process new
1416/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
1417/// expensive or blocking work to concurrent tasks.
1418///
1419/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency) section
1420/// for more details.
1421#[derive(Clone, Debug)]
1422pub struct ConnectionTo<Counterpart: Role> {
1423 counterpart: Counterpart,
1424 message_tx: OutgoingMessageTx,
1425 task_tx: TaskTx,
1426 dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
1427}
1428
1429impl<Counterpart: Role> ConnectionTo<Counterpart> {
1430 fn new(
1431 counterpart: Counterpart,
1432 message_tx: mpsc::UnboundedSender<OutgoingMessage>,
1433 task_tx: mpsc::UnboundedSender<Task>,
1434 dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
1435 ) -> Self {
1436 Self {
1437 counterpart,
1438 message_tx,
1439 task_tx,
1440 dynamic_handler_tx,
1441 }
1442 }
1443
1444 /// Return the counterpart role this connection is talking to.
1445 pub fn counterpart(&self) -> Counterpart {
1446 self.counterpart.clone()
1447 }
1448
1449 /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1450 ///
1451 /// This is the primary mechanism for offloading expensive work from handler callbacks
1452 /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1453 /// allowing the server to continue processing messages.
1454 ///
1455 /// # Event Loop
1456 ///
1457 /// Handler callbacks run on the event loop, which cannot process new messages while
1458 /// your handler is running. Use `spawn` for any expensive operations:
1459 ///
1460 /// ```no_run
1461 /// # use agent_client_protocol_test::*;
1462 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1463 /// # let connection = mock_connection();
1464 /// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1465 /// // Clone cx for the spawned task
1466 /// cx.spawn({
1467 /// let connection = cx.clone();
1468 /// async move {
1469 /// let result = expensive_operation(&req.data).await?;
1470 /// connection.send_notification(ProcessComplete { result })?;
1471 /// Ok(())
1472 /// }
1473 /// })?;
1474 ///
1475 /// // Respond immediately
1476 /// responder.respond(ProcessResponse { result: "started".into() })
1477 /// }, agent_client_protocol::on_receive_request!())
1478 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1479 /// # Ok(())
1480 /// # }
1481 /// ```
1482 ///
1483 /// # Errors
1484 ///
1485 /// If the spawned task returns an error, the entire server will shut down.
1486 #[track_caller]
1487 pub fn spawn(
1488 &self,
1489 task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1490 ) -> Result<(), crate::Error> {
1491 let location = std::panic::Location::caller();
1492 let task = task.into_future();
1493 Task::new(location, task).spawn(&self.task_tx)
1494 }
1495
1496 /// Spawn a JSON-RPC connection in the background and return a [`ConnectionTo`] for sending messages to it.
1497 ///
1498 /// This is useful for creating multiple connections that communicate with each other,
1499 /// such as implementing proxy patterns or connecting to multiple backend services.
1500 ///
1501 /// # Arguments
1502 ///
1503 /// - `builder`: The connection builder with handlers configured
1504 /// - `transport`: The transport component to connect to
1505 ///
1506 /// # Returns
1507 ///
1508 /// A `ConnectionTo` that you can use to send requests and notifications to the spawned connection.
1509 ///
1510 /// # Example: Proxying to a backend connection
1511 ///
1512 /// ```
1513 /// # use agent_client_protocol::UntypedRole;
1514 /// # use agent_client_protocol::{Builder, ConnectionTo};
1515 /// # use agent_client_protocol_test::*;
1516 /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1517 /// // Set up a backend connection builder
1518 /// let backend = UntypedRole.builder()
1519 /// .on_receive_request(async |req: MyRequest, responder, _cx| {
1520 /// responder.respond(MyResponse { status: "ok".into() })
1521 /// }, agent_client_protocol::on_receive_request!());
1522 ///
1523 /// // Spawn it and get a context to send requests to it
1524 /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
1525 ///
1526 /// // Now you can forward requests to the backend
1527 /// let response = backend_connection.send_request(MyRequest {}).block_task().await?;
1528 /// # Ok(())
1529 /// # }
1530 /// ```
1531 #[track_caller]
1532 pub fn spawn_connection<R: Role>(
1533 &self,
1534 builder: Builder<
1535 R,
1536 impl HandleDispatchFrom<R::Counterpart> + 'static,
1537 impl RunWithConnectionTo<R::Counterpart> + 'static,
1538 >,
1539 transport: impl ConnectTo<R> + 'static,
1540 ) -> Result<ConnectionTo<R::Counterpart>, crate::Error> {
1541 let (connection, future) =
1542 builder.into_connection_and_future(transport, |_| std::future::pending());
1543 Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
1544 Ok(connection)
1545 }
1546
1547 /// Send a request/notification and forward the response appropriately.
1548 ///
1549 /// The request context's response type matches the request's response type,
1550 /// enabling type-safe message forwarding.
1551 pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
1552 &self,
1553 message: Dispatch<Req, Notif>,
1554 ) -> Result<(), crate::Error>
1555 where
1556 Counterpart: HasPeer<Counterpart>,
1557 {
1558 self.send_proxied_message_to(self.counterpart(), message)
1559 }
1560
1561 /// Send a request/notification and forward the response appropriately.
1562 ///
1563 /// The request context's response type matches the request's response type,
1564 /// enabling type-safe message forwarding.
1565 pub fn send_proxied_message_to<
1566 Peer: Role,
1567 Req: JsonRpcRequest<Response: Send>,
1568 Notif: JsonRpcNotification,
1569 >(
1570 &self,
1571 peer: Peer,
1572 message: Dispatch<Req, Notif>,
1573 ) -> Result<(), crate::Error>
1574 where
1575 Counterpart: HasPeer<Peer>,
1576 {
1577 match message {
1578 Dispatch::Request(request, responder) => self
1579 .send_request_to(peer, request)
1580 .forward_response_to(responder),
1581 Dispatch::Notification(notification) => self.send_notification_to(peer, notification),
1582 Dispatch::Response(result, router) => {
1583 // Responses are forwarded directly to their destination
1584 router.respond_with_result(result)
1585 }
1586 }
1587 }
1588
1589 /// Send an outgoing request and return a [`SentRequest`] for handling the reply.
1590 ///
1591 /// The returned [`SentRequest`] provides methods for receiving the response without
1592 /// blocking the event loop:
1593 ///
1594 /// * [`on_receiving_result`](SentRequest::on_receiving_result) - Schedule
1595 /// a callback to run when the response arrives (doesn't block the event loop)
1596 /// * [`block_task`](SentRequest::block_task) - Block the current task until the response
1597 /// arrives (only safe in spawned tasks, not in handlers)
1598 ///
1599 /// # Anti-Footgun Design
1600 ///
1601 /// The API intentionally makes it difficult to block on the result directly to prevent
1602 /// the common mistake of blocking the event loop while waiting for a response:
1603 ///
1604 /// ```compile_fail
1605 /// # use agent_client_protocol_test::*;
1606 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1607 /// // ❌ This doesn't compile - prevents blocking the event loop
1608 /// let response = cx.send_request(MyRequest {}).await?;
1609 /// # Ok(())
1610 /// # }
1611 /// ```
1612 ///
1613 /// ```no_run
1614 /// # use agent_client_protocol_test::*;
1615 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1616 /// // ✅ Option 1: Schedule callback (safe in handlers)
1617 /// cx.send_request(MyRequest {})
1618 /// .on_receiving_result(async |result| {
1619 /// // Handle the response
1620 /// Ok(())
1621 /// })?;
1622 ///
1623 /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1624 /// cx.spawn({
1625 /// let cx = cx.clone();
1626 /// async move {
1627 /// let response = cx.send_request(MyRequest {})
1628 /// .block_task()
1629 /// .await?;
1630 /// // Process response...
1631 /// Ok(())
1632 /// }
1633 /// })?;
1634 /// # Ok(())
1635 /// # }
1636 /// ```
1637 /// Send an outgoing request to the default counterpart peer.
1638 ///
1639 /// This is a convenience method that sends to the counterpart role `R`.
1640 /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
1641 pub fn send_request<Req: JsonRpcRequest>(&self, request: Req) -> SentRequest<Req::Response>
1642 where
1643 Counterpart: HasPeer<Counterpart>,
1644 {
1645 self.send_request_to(self.counterpart.clone(), request)
1646 }
1647
1648 /// Send an outgoing request to a specific peer.
1649 ///
1650 /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1651 /// implementation before being sent.
1652 pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
1653 &self,
1654 peer: Peer,
1655 request: Req,
1656 ) -> SentRequest<Req::Response>
1657 where
1658 Counterpart: HasPeer<Peer>,
1659 {
1660 let method = request.method().to_string();
1661 let id = jsonrpcmsg::Id::String(uuid::Uuid::new_v4().to_string());
1662 let (response_tx, response_rx) = oneshot::channel();
1663 let role_id = peer.role_id();
1664 let remote_style = self.counterpart.remote_style(peer);
1665 match remote_style.transform_outgoing_message(request) {
1666 Ok(untyped) => {
1667 // Transform the message for the target role
1668 let message = OutgoingMessage::Request {
1669 id: id.clone(),
1670 method: method.clone(),
1671 role_id,
1672 untyped,
1673 response_tx,
1674 };
1675
1676 match self.message_tx.unbounded_send(message) {
1677 Ok(()) => (),
1678 Err(error) => {
1679 let OutgoingMessage::Request {
1680 method,
1681 response_tx,
1682 ..
1683 } = error.into_inner()
1684 else {
1685 unreachable!();
1686 };
1687
1688 response_tx
1689 .send(ResponsePayload {
1690 result: Err(crate::util::internal_error(format!(
1691 "failed to send outgoing request `{method}"
1692 ))),
1693 ack_tx: None,
1694 })
1695 .unwrap();
1696 }
1697 }
1698 }
1699
1700 Err(err) => {
1701 response_tx
1702 .send(ResponsePayload {
1703 result: Err(crate::util::internal_error(format!(
1704 "failed to create untyped request for `{method}`: {err}"
1705 ))),
1706 ack_tx: None,
1707 })
1708 .unwrap();
1709 }
1710 }
1711
1712 SentRequest::new(id, method.clone(), self.task_tx.clone(), response_rx)
1713 .map(move |json| <Req::Response>::from_value(&method, json))
1714 }
1715
1716 /// Send an outgoing notification to the default counterpart peer (no reply expected).
1717 ///
1718 /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1719 /// This method sends the notification immediately and returns.
1720 ///
1721 /// This is a convenience method that sends to the counterpart role `R`.
1722 /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
1723 ///
1724 /// ```no_run
1725 /// # use agent_client_protocol_test::*;
1726 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::Agent>) -> Result<(), agent_client_protocol::Error> {
1727 /// cx.send_notification(StatusUpdate {
1728 /// message: "Processing...".into(),
1729 /// })?;
1730 /// # Ok(())
1731 /// # }
1732 /// ```
1733 pub fn send_notification<N: JsonRpcNotification>(
1734 &self,
1735 notification: N,
1736 ) -> Result<(), crate::Error>
1737 where
1738 Counterpart: HasPeer<Counterpart>,
1739 {
1740 self.send_notification_to(self.counterpart.clone(), notification)
1741 }
1742
1743 /// Send an outgoing notification to a specific peer (no reply expected).
1744 ///
1745 /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1746 /// implementation before being sent.
1747 pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
1748 &self,
1749 peer: Peer,
1750 notification: N,
1751 ) -> Result<(), crate::Error>
1752 where
1753 Counterpart: HasPeer<Peer>,
1754 {
1755 let remote_style = self.counterpart.remote_style(peer);
1756 tracing::debug!(
1757 role = std::any::type_name::<Counterpart>(),
1758 peer = std::any::type_name::<Peer>(),
1759 notification_type = std::any::type_name::<N>(),
1760 ?remote_style,
1761 original_method = notification.method(),
1762 "send_notification_to"
1763 );
1764 let transformed = remote_style.transform_outgoing_message(notification)?;
1765 tracing::debug!(
1766 transformed_method = %transformed.method,
1767 "send_notification_to transformed"
1768 );
1769 send_raw_message(
1770 &self.message_tx,
1771 OutgoingMessage::Notification {
1772 untyped: transformed,
1773 },
1774 )
1775 }
1776
1777 /// Send an error notification (no reply expected).
1778 pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1779 send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
1780 }
1781
1782 /// Register a dynamic message handler, used to intercept messages specific to a particular session
1783 /// or some similar modal thing.
1784 ///
1785 /// Dynamic message handlers are called first for every incoming message.
1786 ///
1787 /// If they decline to handle the message, then the message is passed to the regular registered handlers.
1788 ///
1789 /// The handler will stay registered until the returned registration guard is dropped.
1790 pub fn add_dynamic_handler(
1791 &self,
1792 handler: impl HandleDispatchFrom<Counterpart> + 'static,
1793 ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error> {
1794 let uuid = Uuid::new_v4();
1795 self.dynamic_handler_tx
1796 .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
1797 uuid,
1798 Box::new(handler),
1799 ))
1800 .map_err(crate::util::internal_error)?;
1801
1802 Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
1803 }
1804
1805 fn remove_dynamic_handler(&self, uuid: Uuid) {
1806 // Ignore errors
1807 drop(
1808 self.dynamic_handler_tx
1809 .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid)),
1810 );
1811 }
1812}
1813
1814#[derive(Clone, Debug)]
1815pub struct DynamicHandlerRegistration<R: Role> {
1816 uuid: Uuid,
1817 cx: ConnectionTo<R>,
1818}
1819
1820impl<R: Role> DynamicHandlerRegistration<R> {
1821 fn new(uuid: Uuid, cx: ConnectionTo<R>) -> Self {
1822 Self { uuid, cx }
1823 }
1824
1825 /// Prevents the dynamic handler from being removed when dropped.
1826 pub fn run_indefinitely(self) {
1827 std::mem::forget(self);
1828 }
1829}
1830
1831impl<R: Role> Drop for DynamicHandlerRegistration<R> {
1832 fn drop(&mut self) {
1833 self.cx.remove_dynamic_handler(self.uuid);
1834 }
1835}
1836
1837/// The context to respond to an incoming request.
1838///
1839/// This context is provided to request handlers and serves a dual role:
1840///
1841/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
1842/// [`respond_with_result`](Self::respond_with_result) to send the response
1843/// 2. **Send other messages** - Use the [`ConnectionTo`] parameter passed to your
1844/// handler, which provides [`send_request`](`ConnectionTo::send_request`),
1845/// [`send_notification`](`ConnectionTo::send_notification`), and
1846/// [`spawn`](`ConnectionTo::spawn`)
1847///
1848/// # Example
1849///
1850/// ```no_run
1851/// # use agent_client_protocol_test::*;
1852/// # async fn example() -> Result<(), agent_client_protocol::Error> {
1853/// # let connection = mock_connection();
1854/// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1855/// // Send a notification while processing
1856/// cx.send_notification(StatusUpdate {
1857/// message: "processing".into(),
1858/// })?;
1859///
1860/// // Do some work...
1861/// let result = process(&req.data)?;
1862///
1863/// // Respond to the request
1864/// responder.respond(ProcessResponse { result })
1865/// }, agent_client_protocol::on_receive_request!())
1866/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1867/// # Ok(())
1868/// # }
1869/// ```
1870///
1871/// # Event Loop Considerations
1872///
1873/// Like all handlers, request handlers run on the event loop. Use
1874/// [`spawn`](ConnectionTo::spawn) for expensive operations to avoid blocking
1875/// the connection.
1876///
1877/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency)
1878/// section for more details.
1879#[must_use]
1880pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
1881 /// The method of the request.
1882 method: String,
1883
1884 /// The `id` of the message we are replying to.
1885 id: jsonrpcmsg::Id,
1886
1887 /// Function to send the response to its destination.
1888 ///
1889 /// For incoming requests: serializes to JSON and sends over the wire.
1890 /// For incoming responses: sends to the waiting oneshot channel.
1891 send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
1892}
1893
1894impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
1895 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1896 f.debug_struct("Responder")
1897 .field("method", &self.method)
1898 .field("id", &self.id)
1899 .field("response_type", &std::any::type_name::<T>())
1900 .finish_non_exhaustive()
1901 }
1902}
1903
1904impl Responder<serde_json::Value> {
1905 /// Create a new request context for an incoming request.
1906 ///
1907 /// The response will be serialized to JSON and sent over the wire.
1908 fn new(message_tx: OutgoingMessageTx, method: String, id: jsonrpcmsg::Id) -> Self {
1909 let id_clone = id.clone();
1910 Self {
1911 method,
1912 id,
1913 send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
1914 send_raw_message(
1915 &message_tx,
1916 OutgoingMessage::Response {
1917 id: id_clone,
1918 response,
1919 },
1920 )
1921 }),
1922 }
1923 }
1924
1925 /// Cast this request context to a different response type.
1926 ///
1927 /// The provided type `T` will be serialized to JSON before sending.
1928 pub fn cast<T: JsonRpcResponse>(self) -> Responder<T> {
1929 self.wrap_params(move |method, value| match value {
1930 Ok(value) => T::into_json(value, method),
1931 Err(e) => Err(e),
1932 })
1933 }
1934}
1935
1936impl<T: JsonRpcResponse> Responder<T> {
1937 /// Method of the incoming request
1938 #[must_use]
1939 pub fn method(&self) -> &str {
1940 &self.method
1941 }
1942
1943 /// ID of the incoming request/response as a JSON value
1944 #[must_use]
1945 pub fn id(&self) -> serde_json::Value {
1946 crate::util::id_to_json(&self.id)
1947 }
1948
1949 /// Convert to a `Responder` that expects a JSON value
1950 /// and which checks (dynamically) that the JSON value it receives
1951 /// can be converted to `T`.
1952 pub fn erase_to_json(self) -> Responder<serde_json::Value> {
1953 self.wrap_params(|method, value| T::from_value(method, value?))
1954 }
1955
1956 /// Return a new Responder with a different method name.
1957 pub fn wrap_method(self, method: String) -> Responder<T> {
1958 Responder {
1959 method,
1960 id: self.id,
1961 send_fn: self.send_fn,
1962 }
1963 }
1964
1965 /// Return a new Responder that expects a response of type U.
1966 ///
1967 /// `wrap_fn` will be invoked with the method name and the result to transform
1968 /// type `U` into type `T` before sending.
1969 pub fn wrap_params<U: JsonRpcResponse>(
1970 self,
1971 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
1972 ) -> Responder<U> {
1973 let method = self.method.clone();
1974 Responder {
1975 method: self.method,
1976 id: self.id,
1977 send_fn: Box::new(move |input: Result<U, crate::Error>| {
1978 let t_value = wrap_fn(&method, input);
1979 (self.send_fn)(t_value)
1980 }),
1981 }
1982 }
1983
1984 /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
1985 pub fn respond_with_result(
1986 self,
1987 response: Result<T, crate::Error>,
1988 ) -> Result<(), crate::Error> {
1989 tracing::debug!(id = ?self.id, "respond called");
1990 (self.send_fn)(response)
1991 }
1992
1993 /// Respond to the JSON-RPC request with a value.
1994 pub fn respond(self, response: T) -> Result<(), crate::Error> {
1995 self.respond_with_result(Ok(response))
1996 }
1997
1998 /// Respond to the JSON-RPC request with an internal error containing a message.
1999 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2000 self.respond_with_error(crate::util::internal_error(message))
2001 }
2002
2003 /// Respond to the JSON-RPC request with an error.
2004 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2005 tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2006 self.respond_with_result(Err(error))
2007 }
2008}
2009
2010/// Context for handling an incoming JSON-RPC response.
2011///
2012/// This is the response-side counterpart to [`Responder`]. While `Responder` handles
2013/// incoming requests (where you send a response over the wire), `ResponseRouter` handles
2014/// incoming responses (where you route the response to a local task waiting for it).
2015///
2016/// Both are fundamentally "sinks" that push the message through a `send_fn`, but they
2017/// represent different points in the message lifecycle and carry different metadata.
2018#[must_use]
2019pub struct ResponseRouter<T: JsonRpcResponse = serde_json::Value> {
2020 /// The method of the original request.
2021 method: String,
2022
2023 /// The `id` of the original request.
2024 id: jsonrpcmsg::Id,
2025
2026 /// The RoleId to which the original request was sent
2027 /// (and hence from which the reply is expected).
2028 role_id: RoleId,
2029
2030 /// Function to send the response to the waiting task.
2031 send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
2032}
2033
2034impl<T: JsonRpcResponse> std::fmt::Debug for ResponseRouter<T> {
2035 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2036 f.debug_struct("ResponseRouter")
2037 .field("method", &self.method)
2038 .field("id", &self.id)
2039 .field("response_type", &std::any::type_name::<T>())
2040 .finish_non_exhaustive()
2041 }
2042}
2043
2044impl ResponseRouter<serde_json::Value> {
2045 /// Create a new response context for routing a response to a local awaiter.
2046 ///
2047 /// When `respond_with_result` is called, the response is sent through the oneshot
2048 /// channel to the code that originally sent the request.
2049 pub(crate) fn new(
2050 method: String,
2051 id: jsonrpcmsg::Id,
2052 role_id: RoleId,
2053 sender: oneshot::Sender<ResponsePayload>,
2054 ) -> Self {
2055 Self {
2056 method,
2057 id,
2058 role_id,
2059 send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
2060 sender
2061 .send(ResponsePayload {
2062 result: response,
2063 ack_tx: None,
2064 })
2065 .map_err(|_| {
2066 crate::util::internal_error("failed to send response, receiver dropped")
2067 })
2068 }),
2069 }
2070 }
2071
2072 /// Cast this response context to a different response type.
2073 ///
2074 /// The provided type `T` will be serialized to JSON before sending.
2075 pub fn cast<T: JsonRpcResponse>(self) -> ResponseRouter<T> {
2076 self.wrap_params(move |method, value| match value {
2077 Ok(value) => T::into_json(value, method),
2078 Err(e) => Err(e),
2079 })
2080 }
2081}
2082
2083impl<T: JsonRpcResponse> ResponseRouter<T> {
2084 /// Method of the original request
2085 #[must_use]
2086 pub fn method(&self) -> &str {
2087 &self.method
2088 }
2089
2090 /// ID of the original request as a JSON value
2091 #[must_use]
2092 pub fn id(&self) -> serde_json::Value {
2093 crate::util::id_to_json(&self.id)
2094 }
2095
2096 /// The peer to which the original request was sent.
2097 ///
2098 /// This is the peer from which we expect to receive the response.
2099 #[must_use]
2100 pub fn role_id(&self) -> RoleId {
2101 self.role_id.clone()
2102 }
2103
2104 /// Convert to a `ResponseRouter` that expects a JSON value
2105 /// and which checks (dynamically) that the JSON value it receives
2106 /// can be converted to `T`.
2107 pub fn erase_to_json(self) -> ResponseRouter<serde_json::Value> {
2108 self.wrap_params(|method, value| T::from_value(method, value?))
2109 }
2110
2111 /// Return a new ResponseRouter that expects a response of type U.
2112 ///
2113 /// `wrap_fn` will be invoked with the method name and the result to transform
2114 /// type `U` into type `T` before sending.
2115 fn wrap_params<U: JsonRpcResponse>(
2116 self,
2117 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2118 ) -> ResponseRouter<U> {
2119 let method = self.method.clone();
2120 ResponseRouter {
2121 method: self.method,
2122 id: self.id,
2123 role_id: self.role_id,
2124 send_fn: Box::new(move |input: Result<U, crate::Error>| {
2125 let t_value = wrap_fn(&method, input);
2126 (self.send_fn)(t_value)
2127 }),
2128 }
2129 }
2130
2131 /// Complete the response by sending the result to the waiting task.
2132 pub fn respond_with_result(
2133 self,
2134 response: Result<T, crate::Error>,
2135 ) -> Result<(), crate::Error> {
2136 tracing::debug!(id = ?self.id, "response routed to awaiter");
2137 (self.send_fn)(response)
2138 }
2139
2140 /// Complete the response by sending a value to the waiting task.
2141 pub fn respond(self, response: T) -> Result<(), crate::Error> {
2142 self.respond_with_result(Ok(response))
2143 }
2144
2145 /// Complete the response by sending an internal error to the waiting task.
2146 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2147 self.respond_with_error(crate::util::internal_error(message))
2148 }
2149
2150 /// Complete the response by sending an error to the waiting task.
2151 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2152 tracing::debug!(id = ?self.id, ?error, "error routed to awaiter");
2153 self.respond_with_result(Err(error))
2154 }
2155}
2156
2157/// Common bounds for any JSON-RPC message.
2158///
2159/// # Derive Macro
2160///
2161/// For simple message types, you can use the `JsonRpcRequest` or `JsonRpcNotification` derive macros
2162/// which will implement both `JsonRpcMessage` and the respective trait. See [`JsonRpcRequest`] and
2163/// [`JsonRpcNotification`] for examples.
2164pub trait JsonRpcMessage: 'static + Debug + Sized + Send + Clone {
2165 /// Check if this message type matches the given method name.
2166 fn matches_method(method: &str) -> bool;
2167
2168 /// The method name for the message.
2169 fn method(&self) -> &str;
2170
2171 /// Convert this message into an untyped message.
2172 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;
2173
2174 /// Parse this type from a method name and parameters.
2175 ///
2176 /// Returns an error if the method doesn't match or deserialization fails.
2177 /// Callers should use `matches_method` first to check if this type handles the method.
2178 fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error>;
2179}
2180
2181/// Defines the "payload" of a successful response to a JSON-RPC request.
2182///
2183/// # Derive Macro
2184///
2185/// Use `#[derive(JsonRpcResponse)]` to automatically implement this trait:
2186///
2187/// ```ignore
2188/// use agent_client_protocol::JsonRpcResponse;
2189/// use serde::{Serialize, Deserialize};
2190///
2191/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2192/// #[response(method = "_hello")]
2193/// struct HelloResponse {
2194/// greeting: String,
2195/// }
2196/// ```
2197pub trait JsonRpcResponse: 'static + Debug + Sized + Send + Clone {
2198 /// Convert this message into a JSON value.
2199 fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;
2200
2201 /// Parse a JSON value into the response type.
2202 fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
2203}
2204
2205impl JsonRpcResponse for serde_json::Value {
2206 fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
2207 Ok(value)
2208 }
2209
2210 fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
2211 Ok(self)
2212 }
2213}
2214
2215/// A struct that represents a notification (JSON-RPC message that does not expect a response).
2216///
2217/// # Derive Macro
2218///
2219/// Use `#[derive(JsonRpcNotification)]` to automatically implement both `JsonRpcMessage` and `JsonRpcNotification`:
2220///
2221/// ```ignore
2222/// use agent_client_protocol::JsonRpcNotification;
2223/// use serde::{Serialize, Deserialize};
2224///
2225/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcNotification)]
2226/// #[notification(method = "_ping")]
2227/// struct PingNotification {
2228/// timestamp: u64,
2229/// }
2230/// ```
2231pub trait JsonRpcNotification: JsonRpcMessage {}
2232
2233/// A struct that represents a request (JSON-RPC message expecting a response).
2234///
2235/// # Derive Macro
2236///
2237/// Use `#[derive(JsonRpcRequest)]` to automatically implement both `JsonRpcMessage` and `JsonRpcRequest`:
2238///
2239/// ```ignore
2240/// use agent_client_protocol::{JsonRpcRequest, JsonRpcResponse};
2241/// use serde::{Serialize, Deserialize};
2242///
2243/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcRequest)]
2244/// #[request(method = "_hello", response = HelloResponse)]
2245/// struct HelloRequest {
2246/// name: String,
2247/// }
2248///
2249/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2250/// struct HelloResponse {
2251/// greeting: String,
2252/// }
2253/// ```
2254pub trait JsonRpcRequest: JsonRpcMessage {
2255 /// The type of data expected in response.
2256 type Response: JsonRpcResponse;
2257}
2258
2259/// An enum capturing an in-flight request or notification.
2260/// In the case of a request, also includes the context used to respond to the request.
2261///
2262/// Type parameters allow specifying the concrete request and notification types.
2263/// By default, both are `UntypedMessage` for dynamic dispatch.
2264/// The request context's response type matches the request's response type.
2265#[derive(Debug)]
2266pub enum Dispatch<Req: JsonRpcRequest = UntypedMessage, Notif: JsonRpcMessage = UntypedMessage> {
2267 /// Incoming request and the context where the response should be sent.
2268 Request(Req, Responder<Req::Response>),
2269
2270 /// Incoming notification.
2271 Notification(Notif),
2272
2273 /// Incoming response to a request we sent.
2274 ///
2275 /// The first field is the response result (success or error from the remote).
2276 /// The second field is the context for forwarding the response to its destination
2277 /// (typically a waiting oneshot channel).
2278 Response(
2279 Result<Req::Response, crate::Error>,
2280 ResponseRouter<Req::Response>,
2281 ),
2282}
2283
2284impl<Req: JsonRpcRequest, Notif: JsonRpcMessage> Dispatch<Req, Notif> {
2285 /// Map the request and notification types to new types.
2286 ///
2287 /// Note: Response variants are passed through unchanged since they don't
2288 /// contain a parseable message payload.
2289 pub fn map<Req1, Notif1>(
2290 self,
2291 map_request: impl FnOnce(Req, Responder<Req::Response>) -> (Req1, Responder<Req1::Response>),
2292 map_notification: impl FnOnce(Notif) -> Notif1,
2293 ) -> Dispatch<Req1, Notif1>
2294 where
2295 Req1: JsonRpcRequest<Response = Req::Response>,
2296 Notif1: JsonRpcMessage,
2297 {
2298 match self {
2299 Dispatch::Request(request, responder) => {
2300 let (new_request, new_responder) = map_request(request, responder);
2301 Dispatch::Request(new_request, new_responder)
2302 }
2303 Dispatch::Notification(notification) => {
2304 let new_notification = map_notification(notification);
2305 Dispatch::Notification(new_notification)
2306 }
2307 Dispatch::Response(result, router) => Dispatch::Response(result, router),
2308 }
2309 }
2310
2311 /// Respond to the message with an error.
2312 ///
2313 /// If this message is a request, this error becomes the reply to the request.
2314 ///
2315 /// If this message is a notification, the error is sent as a notification.
2316 ///
2317 /// If this message is a response, the error is forwarded to the waiting handler.
2318 pub fn respond_with_error<R: Role>(
2319 self,
2320 error: crate::Error,
2321 cx: ConnectionTo<R>,
2322 ) -> Result<(), crate::Error> {
2323 match self {
2324 Dispatch::Request(_, responder) => responder.respond_with_error(error),
2325 Dispatch::Notification(_) => cx.send_error_notification(error),
2326 Dispatch::Response(_, responder) => responder.respond_with_error(error),
2327 }
2328 }
2329
2330 /// Convert to a `Responder` that expects a JSON value
2331 /// and which checks (dynamically) that the JSON value it receives
2332 /// can be converted to `T`.
2333 ///
2334 /// Note: Response variants cannot be erased since their payload is already
2335 /// parsed. This returns an error for Response variants.
2336 pub fn erase_to_json(self) -> Result<Dispatch, crate::Error> {
2337 match self {
2338 Dispatch::Request(response, responder) => Ok(Dispatch::Request(
2339 response.to_untyped_message()?,
2340 responder.erase_to_json(),
2341 )),
2342 Dispatch::Notification(notification) => {
2343 Ok(Dispatch::Notification(notification.to_untyped_message()?))
2344 }
2345 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2346 "cannot erase Response variant to JSON",
2347 )),
2348 }
2349 }
2350
2351 /// Convert the message in self to an untyped message.
2352 ///
2353 /// Note: Response variants don't have an untyped message representation.
2354 /// This returns an error for Response variants.
2355 pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2356 match self {
2357 Dispatch::Request(request, _) => request.to_untyped_message(),
2358 Dispatch::Notification(notification) => notification.to_untyped_message(),
2359 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2360 "Response variant has no untyped message representation",
2361 )),
2362 }
2363 }
2364
2365 /// Convert self to an untyped message context.
2366 ///
2367 /// Note: Response variants cannot be converted. This returns an error for Response variants.
2368 pub fn into_untyped_dispatch(self) -> Result<Dispatch, crate::Error> {
2369 match self {
2370 Dispatch::Request(request, responder) => Ok(Dispatch::Request(
2371 request.to_untyped_message()?,
2372 responder.erase_to_json(),
2373 )),
2374 Dispatch::Notification(notification) => {
2375 Ok(Dispatch::Notification(notification.to_untyped_message()?))
2376 }
2377 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2378 "cannot convert Response variant to untyped message context",
2379 )),
2380 }
2381 }
2382
2383 /// Returns the request ID if this is a request or response, None if notification.
2384 pub fn id(&self) -> Option<serde_json::Value> {
2385 match self {
2386 Dispatch::Request(_, cx) => Some(cx.id()),
2387 Dispatch::Notification(_) => None,
2388 Dispatch::Response(_, cx) => Some(cx.id()),
2389 }
2390 }
2391
2392 /// Returns the method of the message.
2393 ///
2394 /// For requests and notifications, this is the method from the message payload.
2395 /// For responses, this is the method of the original request.
2396 pub fn method(&self) -> &str {
2397 match self {
2398 Dispatch::Request(msg, _) => msg.method(),
2399 Dispatch::Notification(msg) => msg.method(),
2400 Dispatch::Response(_, cx) => cx.method(),
2401 }
2402 }
2403}
2404
2405impl Dispatch {
2406 /// Attempts to parse `self` into a typed message context.
2407 ///
2408 /// # Returns
2409 ///
2410 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2411 /// * `Ok(Err(self))` if not
2412 /// * `Err` if has the correct method for the given types but parsing fails
2413 #[tracing::instrument(skip(self), fields(Request = ?std::any::type_name::<Req>(), Notif = ?std::any::type_name::<Notif>()), level = "trace", ret)]
2414 pub(crate) fn into_typed_dispatch<Req: JsonRpcRequest, Notif: JsonRpcNotification>(
2415 self,
2416 ) -> Result<Result<Dispatch<Req, Notif>, Dispatch>, crate::Error> {
2417 tracing::debug!(
2418 message = ?self,
2419 "into_typed_dispatch"
2420 );
2421 match self {
2422 Dispatch::Request(message, responder) => {
2423 if Req::matches_method(&message.method) {
2424 match Req::parse_message(&message.method, &message.params) {
2425 Ok(req) => {
2426 tracing::trace!(?req, "parsed ok");
2427 Ok(Ok(Dispatch::Request(req, responder.cast())))
2428 }
2429 Err(err) => {
2430 tracing::trace!(?err, "parse error");
2431 Err(err)
2432 }
2433 }
2434 } else {
2435 tracing::trace!("method doesn't match");
2436 Ok(Err(Dispatch::Request(message, responder)))
2437 }
2438 }
2439
2440 Dispatch::Notification(message) => {
2441 if Notif::matches_method(&message.method) {
2442 match Notif::parse_message(&message.method, &message.params) {
2443 Ok(notif) => {
2444 tracing::trace!(?notif, "parse ok");
2445 Ok(Ok(Dispatch::Notification(notif)))
2446 }
2447 Err(err) => {
2448 tracing::trace!(?err, "parse error");
2449 Err(err)
2450 }
2451 }
2452 } else {
2453 tracing::trace!("method doesn't match");
2454 Ok(Err(Dispatch::Notification(message)))
2455 }
2456 }
2457
2458 Dispatch::Response(result, cx) => {
2459 let method = cx.method();
2460 if Req::matches_method(method) {
2461 // Parse the response result
2462 let typed_result = match result {
2463 Ok(value) => {
2464 match <Req::Response as JsonRpcResponse>::from_value(method, value) {
2465 Ok(parsed) => {
2466 tracing::trace!(?parsed, "parse ok");
2467 Ok(parsed)
2468 }
2469 Err(err) => {
2470 tracing::trace!(?err, "parse error");
2471 return Err(err);
2472 }
2473 }
2474 }
2475 Err(err) => {
2476 tracing::trace!("error, passthrough");
2477 Err(err)
2478 }
2479 };
2480 Ok(Ok(Dispatch::Response(typed_result, cx.cast())))
2481 } else {
2482 tracing::trace!("method doesn't match");
2483 Ok(Err(Dispatch::Response(result, cx)))
2484 }
2485 }
2486 }
2487 }
2488
2489 /// True if this message has a field with the given name.
2490 ///
2491 /// Returns `false` for Response variants.
2492 #[must_use]
2493 pub fn has_field(&self, field_name: &str) -> bool {
2494 self.message()
2495 .and_then(|m| m.params().get(field_name))
2496 .is_some()
2497 }
2498
2499 /// Returns true if this message has a session-id field.
2500 ///
2501 /// Returns `false` for Response variants.
2502 pub(crate) fn has_session_id(&self) -> bool {
2503 self.has_field("sessionId")
2504 }
2505
2506 /// Extract the ACP session-id from this message (if any).
2507 ///
2508 /// Returns `Ok(None)` for Response variants.
2509 pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
2510 let Some(message) = self.message() else {
2511 return Ok(None);
2512 };
2513 let Some(value) = message.params().get("sessionId") else {
2514 return Ok(None);
2515 };
2516 let session_id = serde_json::from_value(value.clone())?;
2517 Ok(Some(session_id))
2518 }
2519
2520 /// Try to parse this as a notification of the given type.
2521 ///
2522 /// # Returns
2523 ///
2524 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2525 /// * `Ok(Err(self))` if not
2526 /// * `Err` if has the correct method for the given types but parsing fails
2527 pub fn into_notification<N: JsonRpcNotification>(
2528 self,
2529 ) -> Result<Result<N, Dispatch>, crate::Error> {
2530 match self {
2531 Dispatch::Notification(msg) => {
2532 if !N::matches_method(&msg.method) {
2533 return Ok(Err(Dispatch::Notification(msg)));
2534 }
2535 match N::parse_message(&msg.method, &msg.params) {
2536 Ok(n) => Ok(Ok(n)),
2537 Err(err) => Err(err),
2538 }
2539 }
2540 Dispatch::Request(..) | Dispatch::Response(..) => Ok(Err(self)),
2541 }
2542 }
2543
2544 /// Try to parse this as a request of the given type.
2545 ///
2546 /// # Returns
2547 ///
2548 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2549 /// * `Ok(Err(self))` if not
2550 /// * `Err` if has the correct method for the given types but parsing fails
2551 pub fn into_request<Req: JsonRpcRequest>(
2552 self,
2553 ) -> Result<Result<(Req, Responder<Req::Response>), Dispatch>, crate::Error> {
2554 match self {
2555 Dispatch::Request(msg, responder) => {
2556 if !Req::matches_method(&msg.method) {
2557 return Ok(Err(Dispatch::Request(msg, responder)));
2558 }
2559 match Req::parse_message(&msg.method, &msg.params) {
2560 Ok(req) => Ok(Ok((req, responder.cast()))),
2561 Err(err) => Err(err),
2562 }
2563 }
2564 Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Err(self)),
2565 }
2566 }
2567}
2568
2569impl<M: JsonRpcRequest + JsonRpcNotification> Dispatch<M, M> {
2570 /// Returns the message payload for requests and notifications.
2571 ///
2572 /// Returns `None` for Response variants since they don't contain a message payload.
2573 pub fn message(&self) -> Option<&M> {
2574 match self {
2575 Dispatch::Request(msg, _) | Dispatch::Notification(msg) => Some(msg),
2576 Dispatch::Response(_, _) => None,
2577 }
2578 }
2579
2580 /// Map the request/notification message.
2581 ///
2582 /// Response variants pass through unchanged.
2583 pub(crate) fn try_map_message(
2584 self,
2585 map_message: impl FnOnce(M) -> Result<M, crate::Error>,
2586 ) -> Result<Dispatch<M, M>, crate::Error> {
2587 match self {
2588 Dispatch::Request(request, cx) => Ok(Dispatch::Request(map_message(request)?, cx)),
2589 Dispatch::Notification(notification) => {
2590 Ok(Dispatch::<M, M>::Notification(map_message(notification)?))
2591 }
2592 Dispatch::Response(result, cx) => Ok(Dispatch::Response(result, cx)),
2593 }
2594 }
2595}
2596
2597/// An incoming JSON message without any typing. Can be a request or a notification.
2598#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
2599pub struct UntypedMessage {
2600 /// The JSON-RPC method name
2601 pub method: String,
2602 /// The JSON-RPC parameters as a raw JSON value
2603 pub params: serde_json::Value,
2604}
2605
2606impl UntypedMessage {
2607 /// Returns an untyped message with the given method and parameters.
2608 pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
2609 let params = serde_json::to_value(params)?;
2610 Ok(Self {
2611 method: method.to_string(),
2612 params,
2613 })
2614 }
2615
2616 /// Returns the method name
2617 #[must_use]
2618 pub fn method(&self) -> &str {
2619 &self.method
2620 }
2621
2622 /// Returns the parameters as a JSON value
2623 #[must_use]
2624 pub fn params(&self) -> &serde_json::Value {
2625 &self.params
2626 }
2627
2628 /// Consumes this message and returns the method and params
2629 #[must_use]
2630 pub fn into_parts(self) -> (String, serde_json::Value) {
2631 (self.method, self.params)
2632 }
2633
2634 /// Convert `self` to a JSON-RPC message.
2635 pub(crate) fn into_jsonrpc_msg(
2636 self,
2637 id: Option<jsonrpcmsg::Id>,
2638 ) -> Result<jsonrpcmsg::Request, crate::Error> {
2639 let Self { method, params } = self;
2640 Ok(jsonrpcmsg::Request::new_v2(method, json_cast(params)?, id))
2641 }
2642}
2643
2644impl JsonRpcMessage for UntypedMessage {
2645 fn matches_method(_method: &str) -> bool {
2646 // UntypedMessage matches any method - it's the untyped fallback
2647 true
2648 }
2649
2650 fn method(&self) -> &str {
2651 &self.method
2652 }
2653
2654 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2655 Ok(self.clone())
2656 }
2657
2658 fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error> {
2659 UntypedMessage::new(method, params)
2660 }
2661}
2662
2663impl JsonRpcRequest for UntypedMessage {
2664 type Response = serde_json::Value;
2665}
2666
2667impl JsonRpcNotification for UntypedMessage {}
2668
2669/// Represents a pending response of type `R` from an outgoing request.
2670///
2671/// Returned by [`ConnectionTo::send_request`], this type provides methods for handling
2672/// the response without blocking the event loop. The API is intentionally designed to make
2673/// it difficult to accidentally block.
2674///
2675/// # Anti-Footgun Design
2676///
2677/// You cannot directly `.await` a `SentRequest`. Instead, you must choose how to handle
2678/// the response:
2679///
2680/// ## Option 1: Schedule a Callback (Safe in Handlers)
2681///
2682/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
2683/// that runs when the response arrives. This doesn't block the event loop:
2684///
2685/// ```no_run
2686/// # use agent_client_protocol_test::*;
2687/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2688/// cx.send_request(MyRequest {})
2689/// .on_receiving_result(async |result| {
2690/// match result {
2691/// Ok(response) => {
2692/// // Handle successful response
2693/// Ok(())
2694/// }
2695/// Err(error) => {
2696/// // Handle error
2697/// Err(error)
2698/// }
2699/// }
2700/// })?;
2701/// # Ok(())
2702/// # }
2703/// ```
2704///
2705/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
2706///
2707/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
2708/// in a spawned task (never in a handler):
2709///
2710/// ```no_run
2711/// # use agent_client_protocol_test::*;
2712/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2713/// // ✅ Safe: Spawned task runs concurrently
2714/// cx.spawn({
2715/// let cx = cx.clone();
2716/// async move {
2717/// let response = cx.send_request(MyRequest {})
2718/// .block_task()
2719/// .await?;
2720/// // Process response...
2721/// Ok(())
2722/// }
2723/// })?;
2724/// # Ok(())
2725/// # }
2726/// ```
2727///
2728/// ```no_run
2729/// # use agent_client_protocol_test::*;
2730/// # async fn example() -> Result<(), agent_client_protocol::Error> {
2731/// # let connection = mock_connection();
2732/// // ❌ NEVER do this in a handler - blocks the event loop!
2733/// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2734/// let response = cx.send_request(MyRequest {})
2735/// .block_task() // This will deadlock!
2736/// .await?;
2737/// responder.respond(response)
2738/// }, agent_client_protocol::on_receive_request!())
2739/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2740/// # Ok(())
2741/// # }
2742/// ```
2743///
2744/// # Why This Design?
2745///
2746/// If you block the event loop while waiting for a response, the connection cannot process
2747/// the incoming response message, creating a deadlock. This API design prevents that footgun
2748/// by making blocking explicit and encouraging non-blocking patterns.
2749pub struct SentRequest<T> {
2750 id: jsonrpcmsg::Id,
2751 method: String,
2752 task_tx: TaskTx,
2753 response_rx: oneshot::Receiver<ResponsePayload>,
2754 to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
2755}
2756
2757impl<T: Debug> Debug for SentRequest<T> {
2758 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2759 f.debug_struct("SentRequest")
2760 .field("id", &self.id)
2761 .field("method", &self.method)
2762 .field("task_tx", &self.task_tx)
2763 .field("response_rx", &self.response_rx)
2764 .finish_non_exhaustive()
2765 }
2766}
2767
2768impl SentRequest<serde_json::Value> {
2769 fn new(
2770 id: jsonrpcmsg::Id,
2771 method: String,
2772 task_tx: mpsc::UnboundedSender<Task>,
2773 response_rx: oneshot::Receiver<ResponsePayload>,
2774 ) -> Self {
2775 Self {
2776 id,
2777 method,
2778 response_rx,
2779 task_tx,
2780 to_result: Box::new(Ok),
2781 }
2782 }
2783}
2784
2785impl<T: JsonRpcResponse> SentRequest<T> {
2786 /// The id of the outgoing request.
2787 #[must_use]
2788 pub fn id(&self) -> serde_json::Value {
2789 crate::util::id_to_json(&self.id)
2790 }
2791
2792 /// The method of the request this is in response to.
2793 #[must_use]
2794 pub fn method(&self) -> &str {
2795 &self.method
2796 }
2797
2798 /// Create a new response that maps the result of the response to a new type.
2799 pub fn map<U>(
2800 self,
2801 map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
2802 ) -> SentRequest<U> {
2803 SentRequest {
2804 id: self.id,
2805 method: self.method,
2806 response_rx: self.response_rx,
2807 task_tx: self.task_tx,
2808 to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
2809 }
2810 }
2811
2812 /// Forward the response (success or error) to a request context when it arrives.
2813 ///
2814 /// This is a convenience method for proxying messages between connections. When the
2815 /// response arrives, it will be automatically sent to the provided request context,
2816 /// whether it's a successful response or an error.
2817 ///
2818 /// # Example: Proxying requests
2819 ///
2820 /// ```
2821 /// # use agent_client_protocol::UntypedRole;
2822 /// # use agent_client_protocol::{Builder, ConnectionTo};
2823 /// # use agent_client_protocol_test::*;
2824 /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2825 /// // Set up backend connection builder
2826 /// let backend = UntypedRole.builder()
2827 /// .on_receive_request(async |req: MyRequest, responder, cx| {
2828 /// responder.respond(MyResponse { status: "ok".into() })
2829 /// }, agent_client_protocol::on_receive_request!());
2830 ///
2831 /// // Spawn backend and get a context to send to it
2832 /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
2833 ///
2834 /// // Set up proxy that forwards requests to backend
2835 /// UntypedRole.builder()
2836 /// .on_receive_request({
2837 /// let backend_connection = backend_connection.clone();
2838 /// async move |req: MyRequest, responder, cx| {
2839 /// // Forward the request to backend and proxy the response back
2840 /// backend_connection.send_request(req)
2841 /// .forward_response_to(responder)?;
2842 /// Ok(())
2843 /// }
2844 /// }, agent_client_protocol::on_receive_request!());
2845 /// # Ok(())
2846 /// # }
2847 /// ```
2848 ///
2849 /// # Type Safety
2850 ///
2851 /// The request context's response type must match the request's response type,
2852 /// ensuring type-safe message forwarding.
2853 ///
2854 /// # When to Use
2855 ///
2856 /// Use this when:
2857 /// - You're implementing a proxy or gateway pattern
2858 /// - You want to forward responses without processing them
2859 /// - The response types match between the outgoing request and incoming request
2860 ///
2861 /// This is equivalent to calling `on_receiving_result` and manually forwarding
2862 /// the result, but more concise.
2863 pub fn forward_response_to(self, responder: Responder<T>) -> Result<(), crate::Error>
2864 where
2865 T: Send,
2866 {
2867 self.on_receiving_result(async move |result| responder.respond_with_result(result))
2868 }
2869
2870 /// Block the current task until the response is received.
2871 ///
2872 /// **Warning:** This method blocks the current async task. It is **only safe** to use
2873 /// in spawned tasks created with [`ConnectionTo::spawn`]. Using it directly in a
2874 /// handler callback will deadlock the connection.
2875 ///
2876 /// # Safe Usage (in spawned tasks)
2877 ///
2878 /// ```no_run
2879 /// # use agent_client_protocol_test::*;
2880 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2881 /// # let connection = mock_connection();
2882 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2883 /// // Spawn a task to handle the request
2884 /// cx.spawn({
2885 /// let connection = cx.clone();
2886 /// async move {
2887 /// // Safe: We're in a spawned task, not blocking the event loop
2888 /// let response = connection.send_request(OtherRequest {})
2889 /// .block_task()
2890 /// .await?;
2891 ///
2892 /// // Process the response...
2893 /// Ok(())
2894 /// }
2895 /// })?;
2896 ///
2897 /// // Respond immediately
2898 /// responder.respond(MyResponse { status: "ok".into() })
2899 /// }, agent_client_protocol::on_receive_request!())
2900 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2901 /// # Ok(())
2902 /// # }
2903 /// ```
2904 ///
2905 /// # Unsafe Usage (in handlers - will deadlock!)
2906 ///
2907 /// ```no_run
2908 /// # use agent_client_protocol_test::*;
2909 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2910 /// # let connection = mock_connection();
2911 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2912 /// // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
2913 /// let response = cx.send_request(OtherRequest {})
2914 /// .block_task()
2915 /// .await?;
2916 ///
2917 /// responder.respond(MyResponse { status: response.value })
2918 /// }, agent_client_protocol::on_receive_request!())
2919 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2920 /// # Ok(())
2921 /// # }
2922 /// ```
2923 ///
2924 /// # When to Use
2925 ///
2926 /// Use this method when:
2927 /// - You're in a spawned task (via [`ConnectionTo::spawn`])
2928 /// - You need the response value to proceed with your logic
2929 /// - Linear control flow is more natural than callbacks
2930 ///
2931 /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
2932 pub async fn block_task(self) -> Result<T, crate::Error>
2933 where
2934 T: Send,
2935 {
2936 match self.response_rx.await {
2937 Ok(ResponsePayload {
2938 result: Ok(json_value),
2939 ack_tx,
2940 }) => {
2941 // Ack immediately - we're in a spawned task, so the dispatch loop
2942 // can continue while we process the value.
2943 if let Some(tx) = ack_tx {
2944 let _ = tx.send(());
2945 }
2946 match (self.to_result)(json_value) {
2947 Ok(value) => Ok(value),
2948 Err(err) => Err(err),
2949 }
2950 }
2951 Ok(ResponsePayload {
2952 result: Err(err),
2953 ack_tx,
2954 }) => {
2955 if let Some(tx) = ack_tx {
2956 let _ = tx.send(());
2957 }
2958 Err(err)
2959 }
2960 Err(err) => Err(crate::util::internal_error(format!(
2961 "response to `{}` never received: {}",
2962 self.method, err
2963 ))),
2964 }
2965 }
2966
2967 /// Schedule an async task to run when a successful response is received.
2968 ///
2969 /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
2970 /// for the common pattern of forwarding errors to a request context while only processing
2971 /// successful responses.
2972 ///
2973 /// # Behavior
2974 ///
2975 /// - If the response is `Ok(value)`, your task receives the value and the request context
2976 /// - If the response is `Err(error)`, the error is automatically sent to `responder`
2977 /// and your task is not called
2978 ///
2979 /// # Example: Chaining requests
2980 ///
2981 /// ```no_run
2982 /// # use agent_client_protocol_test::*;
2983 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2984 /// # let connection = mock_connection();
2985 /// connection.on_receive_request(async |req: ValidateRequest, responder, cx| {
2986 /// // Send initial request
2987 /// cx.send_request(ValidateRequest { data: req.data.clone() })
2988 /// .on_receiving_ok_result(responder, async |validation, responder| {
2989 /// // Only runs if validation succeeded
2990 /// if validation.is_valid {
2991 /// // Respond to original request
2992 /// responder.respond(ValidateResponse { is_valid: true, error: None })
2993 /// } else {
2994 /// responder.respond_with_error(agent_client_protocol::util::internal_error("validation failed"))
2995 /// }
2996 /// })?;
2997 ///
2998 /// Ok(())
2999 /// }, agent_client_protocol::on_receive_request!())
3000 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3001 /// # Ok(())
3002 /// # }
3003 /// ```
3004 ///
3005 /// # Ordering
3006 ///
3007 /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
3008 /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
3009 /// for details.
3010 ///
3011 /// # When to Use
3012 ///
3013 /// Use this when:
3014 /// - You need to respond to a request based on another request's result
3015 /// - You want errors to automatically propagate to the request context
3016 /// - You only care about the success case
3017 ///
3018 /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
3019 #[track_caller]
3020 pub fn on_receiving_ok_result<F>(
3021 self,
3022 responder: Responder<T>,
3023 task: impl FnOnce(T, Responder<T>) -> F + 'static + Send,
3024 ) -> Result<(), crate::Error>
3025 where
3026 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3027 T: Send,
3028 {
3029 self.on_receiving_result(async move |result| match result {
3030 Ok(value) => task(value, responder).await,
3031 Err(err) => responder.respond_with_error(err),
3032 })
3033 }
3034
3035 /// Schedule an async task to run when the response is received.
3036 ///
3037 /// This is the recommended way to handle responses in handler callbacks, as it doesn't
3038 /// block the event loop. The task will be spawned automatically when the response arrives.
3039 ///
3040 /// # Example: Handle response in callback
3041 ///
3042 /// ```no_run
3043 /// # use agent_client_protocol_test::*;
3044 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3045 /// # let connection = mock_connection();
3046 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
3047 /// // Send a request and schedule a callback for the response
3048 /// cx.send_request(QueryRequest { id: 22 })
3049 /// .on_receiving_result({
3050 /// let connection = cx.clone();
3051 /// async move |result| {
3052 /// match result {
3053 /// Ok(response) => {
3054 /// println!("Got response: {:?}", response);
3055 /// // Can send more messages here
3056 /// connection.send_notification(QueryComplete {})?;
3057 /// Ok(())
3058 /// }
3059 /// Err(error) => {
3060 /// eprintln!("Request failed: {}", error);
3061 /// Err(error)
3062 /// }
3063 /// }
3064 /// }
3065 /// })?;
3066 ///
3067 /// // Handler continues immediately without waiting
3068 /// responder.respond(MyResponse { status: "processing".into() })
3069 /// }, agent_client_protocol::on_receive_request!())
3070 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3071 /// # Ok(())
3072 /// # }
3073 /// ```
3074 ///
3075 /// # Ordering
3076 ///
3077 /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
3078 /// before processing the next message. This gives you ordering guarantees: no other
3079 /// messages will be processed while your callback runs.
3080 ///
3081 /// This differs from [`block_task`](Self::block_task), which signals completion immediately
3082 /// upon receiving the response (before your code processes it).
3083 ///
3084 /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
3085 /// and how to avoid deadlocks.
3086 ///
3087 /// # Error Handling
3088 ///
3089 /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
3090 /// errors appropriately within your task.
3091 ///
3092 /// # When to Use
3093 ///
3094 /// Use this method when:
3095 /// - You're in a handler callback (not a spawned task)
3096 /// - You want ordering guarantees (no other messages processed during your callback)
3097 /// - You need to do async work before "releasing" control back to the dispatch loop
3098 ///
3099 /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
3100 #[track_caller]
3101 pub fn on_receiving_result<F>(
3102 self,
3103 task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
3104 ) -> Result<(), crate::Error>
3105 where
3106 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3107 T: Send,
3108 {
3109 let task_tx = self.task_tx.clone();
3110 let method = self.method;
3111 let response_rx = self.response_rx;
3112 let to_result = self.to_result;
3113 let location = Location::caller();
3114
3115 Task::new(location, async move {
3116 match response_rx.await {
3117 Ok(ResponsePayload { result, ack_tx }) => {
3118 // Convert the result using to_result for Ok values
3119 let typed_result = match result {
3120 Ok(json_value) => to_result(json_value),
3121 Err(err) => Err(err),
3122 };
3123
3124 // Run the user's callback
3125 let outcome = task(typed_result).await;
3126
3127 // Ack AFTER the callback completes - this is the key difference
3128 // from block_task. The dispatch loop waits for this ack.
3129 if let Some(tx) = ack_tx {
3130 let _ = tx.send(());
3131 }
3132
3133 outcome
3134 }
3135 Err(err) => Err(crate::util::internal_error(format!(
3136 "response to `{method}` never received: {err}"
3137 ))),
3138 }
3139 })
3140 .spawn(&task_tx)
3141 }
3142}
3143
3144// ============================================================================
3145// IntoJrConnectionTransport Implementations
3146// ============================================================================
3147
3148/// A component that communicates over line streams.
3149///
3150/// `Lines` implements the [`ConnectTo`] trait for any pair of line-based streams
3151/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
3152/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3153///
3154/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
3155/// transformation of individual lines before they are parsed or after they are serialized.
3156/// This is particularly useful for debugging, logging, or implementing custom line-based
3157/// protocols.
3158///
3159/// # Use Cases
3160///
3161/// - **Line-by-line logging**: Intercept and log each line before parsing
3162/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
3163/// - **Debugging**: Inspect raw message strings
3164/// - **Line filtering**: Skip or modify specific messages
3165///
3166/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
3167/// for byte-based I/O.
3168///
3169/// [`ConnectTo`]: crate::ConnectTo
3170#[derive(Debug)]
3171pub struct Lines<OutgoingSink, IncomingStream> {
3172 /// Outgoing line sink (where we write serialized JSON-RPC messages)
3173 pub outgoing: OutgoingSink,
3174 /// Incoming line stream (where we read and parse JSON-RPC messages)
3175 pub incoming: IncomingStream,
3176}
3177
3178impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
3179where
3180 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3181 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3182{
3183 /// Create a new line stream transport.
3184 pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
3185 Self { outgoing, incoming }
3186 }
3187}
3188
3189impl<OutgoingSink, IncomingStream, R: Role> ConnectTo<R> for Lines<OutgoingSink, IncomingStream>
3190where
3191 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3192 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3193{
3194 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3195 let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3196 match futures::future::select(Box::pin(client.connect_to(channel)), serve_self).await {
3197 Either::Left((result, _)) | Either::Right((result, _)) => result,
3198 }
3199 }
3200
3201 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3202 let Self { outgoing, incoming } = self;
3203
3204 // Create a channel pair for the client to use
3205 let (channel_for_caller, channel_for_lines) = Channel::duplex();
3206
3207 // Create the server future that runs the line stream actors
3208 let server_future = Box::pin(async move {
3209 let Channel { rx, tx } = channel_for_lines;
3210
3211 // Run both actors concurrently
3212 let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
3213 let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
3214
3215 // Wait for both to complete
3216 futures::try_join!(outgoing_future, incoming_future)?;
3217
3218 Ok(())
3219 });
3220
3221 (channel_for_caller, server_future)
3222 }
3223}
3224
3225/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
3226///
3227/// `ByteStreams` implements the [`ConnectTo`] trait for any pair of `AsyncRead` and `AsyncWrite`
3228/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3229/// This is the standard way to communicate with external processes or network connections.
3230///
3231/// # Use Cases
3232///
3233/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
3234/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
3235/// - **Named pipes**: Cross-process communication on the same machine
3236/// - **File I/O**: Reading from and writing to file descriptors
3237///
3238/// # Example
3239///
3240/// Connecting to an agent via stdio:
3241///
3242/// ```no_run
3243/// use agent_client_protocol::UntypedRole;
3244/// # use agent_client_protocol::{ByteStreams};
3245/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
3246///
3247/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3248/// let component = ByteStreams::new(
3249/// tokio::io::stdout().compat_write(),
3250/// tokio::io::stdin().compat(),
3251/// );
3252///
3253/// // Use as a component in a connection
3254/// agent_client_protocol::UntypedRole.builder()
3255/// .name("my-client")
3256/// .connect_to(component)
3257/// .await?;
3258/// # Ok(())
3259/// # }
3260/// ```
3261///
3262/// [`ConnectTo`]: crate::ConnectTo
3263#[derive(Debug)]
3264pub struct ByteStreams<OB, IB> {
3265 /// Outgoing byte stream (where we write serialized messages)
3266 pub outgoing: OB,
3267 /// Incoming byte stream (where we read and parse messages)
3268 pub incoming: IB,
3269}
3270
3271impl<OB, IB> ByteStreams<OB, IB>
3272where
3273 OB: AsyncWrite + Send + 'static,
3274 IB: AsyncRead + Send + 'static,
3275{
3276 /// Create a new byte stream transport.
3277 pub fn new(outgoing: OB, incoming: IB) -> Self {
3278 Self { outgoing, incoming }
3279 }
3280}
3281
3282impl<OB, IB, R: Role> ConnectTo<R> for ByteStreams<OB, IB>
3283where
3284 OB: AsyncWrite + Send + 'static,
3285 IB: AsyncRead + Send + 'static,
3286{
3287 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3288 let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3289 match futures::future::select(pin!(client.connect_to(channel)), serve_self).await {
3290 Either::Left((result, _)) | Either::Right((result, _)) => result,
3291 }
3292 }
3293
3294 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3295 use futures::AsyncBufReadExt;
3296 use futures::AsyncWriteExt;
3297 use futures::io::BufReader;
3298 let Self { outgoing, incoming } = self;
3299
3300 // Convert byte streams to line streams
3301 // Box both streams to satisfy Unpin requirements
3302 let incoming_lines = Box::pin(BufReader::new(incoming).lines());
3303
3304 // Create a sink that writes lines (with newlines) to the outgoing byte stream
3305 // We need to Box the writer since it may not be Unpin
3306 let outgoing_sink =
3307 futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
3308 let mut bytes = line.into_bytes();
3309 bytes.push(b'\n');
3310 writer.write_all(&bytes).await?;
3311 Ok::<_, std::io::Error>(writer)
3312 });
3313
3314 // Delegate to Lines component
3315 ConnectTo::<R>::into_channel_and_future(Lines::new(outgoing_sink, incoming_lines))
3316 }
3317}
3318
3319/// A channel endpoint representing one side of a bidirectional message channel.
3320///
3321/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
3322/// Each endpoint has:
3323/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
3324/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
3325///
3326/// # Example
3327///
3328/// ```no_run
3329/// # use agent_client_protocol::UntypedRole;
3330/// # use agent_client_protocol::{Channel, Builder};
3331/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3332/// // Create a pair of connected channels
3333/// let (channel_a, channel_b) = Channel::duplex();
3334///
3335/// // Each channel can be used by a different component
3336/// UntypedRole.builder()
3337/// .name("connection-a")
3338/// .connect_to(channel_a)
3339/// .await?;
3340/// # Ok(())
3341/// # }
3342/// ```
3343#[derive(Debug)]
3344pub struct Channel {
3345 /// Receives messages (or errors) from the counterpart.
3346 pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
3347 /// Sends messages (or errors) to the counterpart.
3348 pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
3349}
3350
3351impl Channel {
3352 /// Create a pair of connected channel endpoints.
3353 ///
3354 /// Returns two `Channel` instances that are connected to each other:
3355 /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
3356 /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
3357 ///
3358 /// # Returns
3359 ///
3360 /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
3361 #[must_use]
3362 pub fn duplex() -> (Self, Self) {
3363 // Create channels: A sends Result<Message> which B receives as Message
3364 let (a_tx, b_rx) = mpsc::unbounded();
3365 let (b_tx, a_rx) = mpsc::unbounded();
3366
3367 let channel_a = Self { rx: a_rx, tx: a_tx };
3368 let channel_b = Self { rx: b_rx, tx: b_tx };
3369
3370 (channel_a, channel_b)
3371 }
3372
3373 /// Copy messages from `rx` to `tx`.
3374 ///
3375 /// # Returns
3376 ///
3377 /// A `Result` indicating success or failure.
3378 pub async fn copy(mut self) -> Result<(), crate::Error> {
3379 while let Some(msg) = self.rx.next().await {
3380 self.tx
3381 .unbounded_send(msg)
3382 .map_err(crate::util::internal_error)?;
3383 }
3384 Ok(())
3385 }
3386}
3387
3388impl<R: Role> ConnectTo<R> for Channel {
3389 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3390 let (client_channel, client_serve) = client.into_channel_and_future();
3391
3392 match futures::try_join!(
3393 Channel {
3394 rx: client_channel.rx,
3395 tx: self.tx
3396 }
3397 .copy(),
3398 Channel {
3399 rx: self.rx,
3400 tx: client_channel.tx
3401 }
3402 .copy(),
3403 client_serve
3404 ) {
3405 Ok(((), (), ())) => Ok(()),
3406 Err(err) => Err(err),
3407 }
3408 }
3409
3410 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3411 (self, Box::pin(future::ready(Ok(()))))
3412 }
3413}