agent_client_protocol/jsonrpc.rs
1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::SessionId;
4// Re-export jsonrpcmsg for use in public API
5pub use jsonrpcmsg;
6
7// Types re-exported from crate root
8use serde::{Deserialize, Serialize};
9use std::fmt::Debug;
10use std::panic::Location;
11use std::pin::pin;
12use uuid::Uuid;
13
14use futures::channel::{mpsc, oneshot};
15use futures::future::{self, BoxFuture, Either};
16use futures::{AsyncRead, AsyncWrite, StreamExt};
17
18mod dynamic_handler;
19pub(crate) mod handlers;
20mod incoming_actor;
21mod outgoing_actor;
22pub(crate) mod run;
23mod task_actor;
24mod transport_actor;
25
26use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
27pub use crate::jsonrpc::handlers::NullHandler;
28use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
29use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
30use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
31use crate::jsonrpc::run::SpawnedRun;
32use crate::jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo};
33use crate::jsonrpc::task_actor::{Task, TaskTx};
34use crate::mcp_server::McpServer;
35use crate::role::HasPeer;
36use crate::role::Role;
37use crate::util::json_cast;
38use crate::{Agent, Client, ConnectTo, RoleId};
39
40/// Handlers process incoming JSON-RPC messages on a connection.
41///
42/// When messages arrive, they flow through a chain of handlers. Each handler can
43/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
44///
45/// # Message Flow
46///
47/// Messages flow through three layers of handlers in order:
48///
49/// ```text
50/// ┌─────────────────────────────────────────────────────────────────┐
51/// │ Incoming Message │
52/// └─────────────────────────────────────────────────────────────────┘
53/// │
54/// ▼
55/// ┌─────────────────────────────────────────────────────────────────┐
56/// │ 1. User Handlers (registered via on_receive_request, etc.) │
57/// │ - Tried in registration order │
58/// │ - First handler to return Handled::Yes claims the message │
59/// └─────────────────────────────────────────────────────────────────┘
60/// │ Handled::No
61/// ▼
62/// ┌─────────────────────────────────────────────────────────────────┐
63/// │ 2. Dynamic Handlers (added at runtime) │
64/// │ - Used for session-specific message handling │
65/// │ - Added via ConnectionTo::add_dynamic_handler │
66/// └─────────────────────────────────────────────────────────────────┘
67/// │ Handled::No
68/// ▼
69/// ┌─────────────────────────────────────────────────────────────────┐
70/// │ 3. Role Default Handler │
71/// │ - Fallback based on the connection's Role │
72/// │ - Handles protocol-level messages (e.g., proxy forwarding) │
73/// └─────────────────────────────────────────────────────────────────┘
74/// │ Handled::No
75/// ▼
76/// ┌─────────────────────────────────────────────────────────────────┐
77/// │ Unhandled: Error response sent (or queued if retry=true) │
78/// └─────────────────────────────────────────────────────────────────┘
79/// ```
80///
81/// # The `Handled` Return Value
82///
83/// Each handler returns [`Handled`] to indicate whether it processed the message:
84///
85/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
86/// - **`Handled::No { message, retry }`** - Message was not handled. The message
87/// (possibly modified) is passed to the next handler in the chain.
88///
89/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
90///
91/// # The Retry Mechanism
92///
93/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
94///
95/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
96/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
97///
98/// This mechanism exists because of a timing issue with sessions: when a `session/new`
99/// response is being processed, the dynamic handler for that session hasn't been registered
100/// yet, but `session/update` notifications for that session may already be arriving.
101/// By setting `retry: true`, these early notifications are queued until the session's
102/// dynamic handler is added.
103///
104/// # Handler Registration
105///
106/// Most users register handlers using the builder methods on [`Builder`]:
107///
108/// ```
109/// # use agent_client_protocol::{Agent, Client, ConnectTo};
110/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, AgentCapabilities};
111/// # use agent_client_protocol_test::StatusUpdate;
112/// # async fn example(transport: impl ConnectTo<Agent>) -> Result<(), agent_client_protocol::Error> {
113/// Agent.builder()
114/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
115/// responder.respond(
116/// InitializeResponse::new(req.protocol_version)
117/// .agent_capabilities(AgentCapabilities::new()),
118/// )
119/// }, agent_client_protocol::on_receive_request!())
120/// .on_receive_notification(async |notif: StatusUpdate, cx| {
121/// // Process notification
122/// Ok(())
123/// }, agent_client_protocol::on_receive_notification!())
124/// .connect_to(transport)
125/// .await?;
126/// # Ok(())
127/// # }
128/// ```
129///
130/// The type parameter on the closure determines which messages are dispatched to it.
131/// Messages that don't match the type are automatically passed to the next handler.
132///
133/// # Implementing Custom Handlers
134///
135/// For advanced use cases, you can implement `HandleDispatchFrom` directly:
136///
137/// ```ignore
138/// struct MyHandler;
139///
140/// impl HandleDispatchFrom<Agent> for MyHandler {
141///
142/// async fn handle_dispatch_from(
143/// &mut self,
144/// message: Dispatch,
145/// cx: ConnectionTo<Agent>,
146/// ) -> Result<Handled<Dispatch>, Error> {
147/// if message.method() == "my/custom/method" {
148/// // Handle it
149/// Ok(Handled::Yes)
150/// } else {
151/// // Pass to next handler
152/// Ok(Handled::No { message, retry: false })
153/// }
154/// }
155///
156/// fn describe_chain(&self) -> impl std::fmt::Debug {
157/// "MyHandler"
158/// }
159/// }
160/// ```
161///
162/// # Important: Handlers Must Not Block
163///
164/// The connection processes messages on a single async task. While a handler is running,
165/// no other messages can be processed. For expensive operations, use [`ConnectionTo::spawn`]
166/// to run work concurrently:
167///
168/// ```
169/// # use agent_client_protocol::{Client, Agent, ConnectTo};
170/// # use agent_client_protocol_test::{expensive_operation, ProcessComplete};
171/// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
172/// # Client.builder().connect_with(transport, async |cx| {
173/// cx.spawn({
174/// let connection = cx.clone();
175/// async move {
176/// let result = expensive_operation("data").await?;
177/// connection.send_notification(ProcessComplete { result })?;
178/// Ok(())
179/// }
180/// })?;
181/// # Ok(())
182/// # }).await?;
183/// # Ok(())
184/// # }
185/// ```
186#[allow(async_fn_in_trait)]
187/// A handler for incoming JSON-RPC messages.
188///
189/// This trait is implemented by types that can process incoming messages on a connection.
190/// Handlers are registered with a [`Builder`] and are called in order until
191/// one claims the message.
192///
193/// The type parameter `R` is the role this handler plays - who I am.
194/// For an agent handler, `R = Agent` (I handle messages as an agent).
195/// For a client handler, `R = Client` (I handle messages as a client).
196pub trait HandleDispatchFrom<Counterpart: Role>: Send {
197 /// Attempt to claim an incoming message (request or notification).
198 ///
199 /// # Important: do not block
200 ///
201 /// The server will not process new messages until this handler returns.
202 /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
203 /// The recommended approach to manage expensive operations is to the [`ConnectionTo::spawn`] method available on the message context.
204 ///
205 /// # Parameters
206 ///
207 /// * `message` - The incoming message to handle.
208 /// * `connection` - The connection, used to send messages and access connection state.
209 ///
210 /// # Returns
211 ///
212 /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
213 /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
214 /// * `Err` if an internal error occurs (this will bring down the server).
215 fn handle_dispatch_from(
216 &mut self,
217 message: Dispatch,
218 connection: ConnectionTo<Counterpart>,
219 ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send;
220
221 /// Returns a debug description of the registered handlers for diagnostics.
222 fn describe_chain(&self) -> impl std::fmt::Debug;
223}
224
225impl<Counterpart: Role, H> HandleDispatchFrom<Counterpart> for &mut H
226where
227 H: HandleDispatchFrom<Counterpart>,
228{
229 fn handle_dispatch_from(
230 &mut self,
231 message: Dispatch,
232 cx: ConnectionTo<Counterpart>,
233 ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send {
234 H::handle_dispatch_from(self, message, cx)
235 }
236
237 fn describe_chain(&self) -> impl std::fmt::Debug {
238 H::describe_chain(self)
239 }
240}
241
242/// A JSON-RPC connection that can act as either a server, client, or both.
243///
244/// [`Builder`] provides a builder-style API for creating JSON-RPC servers and clients.
245/// You start by calling `Role.builder()` (e.g., `Client.builder()`), then add message
246/// handlers, and finally drive the connection with either [`connect_to`](Builder::connect_to)
247/// or [`connect_with`](Builder::connect_with), providing a component implementation
248/// (e.g., [`ByteStreams`] for byte streams).
249///
250/// # JSON-RPC Primer
251///
252/// JSON-RPC 2.0 has two fundamental message types:
253///
254/// * **Requests** - Messages that expect a response. They have an `id` field that gets
255/// echoed back in the response so the sender can correlate them.
256/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
257/// expect or receive a response.
258///
259/// # Type-Driven Message Dispatch
260///
261/// The handler registration methods use Rust's type system to determine which messages
262/// to handle. The type parameter you provide controls what gets dispatched to your handler:
263///
264/// ## Single Message Types
265///
266/// The simplest case - handle one specific message type:
267///
268/// ```no_run
269/// # use agent_client_protocol_test::*;
270/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
271/// # async fn example() -> Result<(), agent_client_protocol::Error> {
272/// # let connection = mock_connection();
273/// connection
274/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
275/// // Handle only InitializeRequest messages
276/// responder.respond(InitializeResponse::make())
277/// }, agent_client_protocol::on_receive_request!())
278/// .on_receive_notification(async |notif: SessionNotification, cx| {
279/// // Handle only SessionNotification messages
280/// Ok(())
281/// }, agent_client_protocol::on_receive_notification!())
282/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
283/// # Ok(())
284/// # }
285/// ```
286///
287/// ## Enum Message Types
288///
289/// You can also handle multiple related messages with a single handler by defining an enum
290/// that implements the appropriate trait ([`JsonRpcRequest`] or [`JsonRpcNotification`]):
291///
292/// ```no_run
293/// # use agent_client_protocol_test::*;
294/// # use agent_client_protocol::{JsonRpcRequest, JsonRpcMessage, UntypedMessage};
295/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
296/// # async fn example() -> Result<(), agent_client_protocol::Error> {
297/// # let connection = mock_connection();
298/// // Define an enum for multiple request types
299/// #[derive(Debug, Clone)]
300/// enum MyRequests {
301/// Initialize(InitializeRequest),
302/// Prompt(PromptRequest),
303/// }
304///
305/// // Implement JsonRpcRequest for your enum
306/// # impl JsonRpcMessage for MyRequests {
307/// # fn matches_method(_method: &str) -> bool { false }
308/// # fn method(&self) -> &str { "myRequests" }
309/// # fn to_untyped_message(&self) -> Result<UntypedMessage, agent_client_protocol::Error> { todo!() }
310/// # fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Result<Self, agent_client_protocol::Error> { Err(agent_client_protocol::Error::method_not_found()) }
311/// # }
312/// impl JsonRpcRequest for MyRequests { type Response = serde_json::Value; }
313///
314/// // Handle all variants in one place
315/// connection.on_receive_request(async |req: MyRequests, responder, cx| {
316/// match req {
317/// MyRequests::Initialize(init) => { responder.respond(serde_json::json!({})) }
318/// MyRequests::Prompt(prompt) => { responder.respond(serde_json::json!({})) }
319/// }
320/// }, agent_client_protocol::on_receive_request!())
321/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
322/// # Ok(())
323/// # }
324/// ```
325///
326/// ## Mixed Message Types
327///
328/// For enums containing both requests AND notifications, use [`on_receive_dispatch`](Self::on_receive_dispatch):
329///
330/// ```no_run
331/// # use agent_client_protocol_test::*;
332/// # use agent_client_protocol::Dispatch;
333/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
334/// # async fn example() -> Result<(), agent_client_protocol::Error> {
335/// # let connection = mock_connection();
336/// // on_receive_dispatch receives Dispatch which can be either a request or notification
337/// connection.on_receive_dispatch(async |msg: Dispatch<InitializeRequest, SessionNotification>, _cx| {
338/// match msg {
339/// Dispatch::Request(req, responder) => {
340/// responder.respond(InitializeResponse::make())
341/// }
342/// Dispatch::Notification(notif) => {
343/// Ok(())
344/// }
345/// Dispatch::Response(result, router) => {
346/// // Forward response to its destination
347/// router.respond_with_result(result)
348/// }
349/// }
350/// }, agent_client_protocol::on_receive_dispatch!())
351/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
352/// # Ok(())
353/// # }
354/// ```
355///
356/// # Handler Registration
357///
358/// Register handlers using these methods (listed from most common to most flexible):
359///
360/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
361/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
362/// * [`on_receive_dispatch`](Self::on_receive_dispatch) - Handle enums containing both requests and notifications
363/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
364///
365/// ## Handler Ordering
366///
367/// Handlers are tried in the order you register them. The first handler that claims a message
368/// (by matching its type) will process it. Subsequent handlers won't see that message:
369///
370/// ```no_run
371/// # use agent_client_protocol_test::*;
372/// # use agent_client_protocol::{Dispatch, UntypedMessage};
373/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
374/// # async fn example() -> Result<(), agent_client_protocol::Error> {
375/// # let connection = mock_connection();
376/// connection
377/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
378/// // This runs first for InitializeRequest
379/// responder.respond(InitializeResponse::make())
380/// }, agent_client_protocol::on_receive_request!())
381/// .on_receive_request(async |req: PromptRequest, responder, cx| {
382/// // This runs first for PromptRequest
383/// responder.respond(PromptResponse::make())
384/// }, agent_client_protocol::on_receive_request!())
385/// .on_receive_dispatch(async |msg: Dispatch, cx| {
386/// // This runs for any message not handled above
387/// msg.respond_with_error(agent_client_protocol::util::internal_error("unknown method"), cx)
388/// }, agent_client_protocol::on_receive_dispatch!())
389/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
390/// # Ok(())
391/// # }
392/// ```
393///
394/// # Event Loop and Concurrency
395///
396/// Understanding the event loop is critical for writing correct handlers.
397///
398/// ## The Event Loop
399///
400/// [`Builder`] runs all handler callbacks on a single async task - the event loop.
401/// While a handler is running, **the server cannot receive new messages**. This means
402/// any blocking or expensive work in your handlers will stall the entire connection.
403///
404/// To avoid blocking the event loop, use [`ConnectionTo::spawn`] to offload serious
405/// work to concurrent tasks:
406///
407/// ```no_run
408/// # use agent_client_protocol_test::*;
409/// # async fn example() -> Result<(), agent_client_protocol::Error> {
410/// # let connection = mock_connection();
411/// connection.on_receive_request(async |req: AnalyzeRequest, responder, cx| {
412/// // Clone cx for the spawned task
413/// cx.spawn({
414/// let connection = cx.clone();
415/// async move {
416/// let result = expensive_analysis(&req.data).await?;
417/// connection.send_notification(AnalysisComplete { result })?;
418/// Ok(())
419/// }
420/// })?;
421///
422/// // Respond immediately without blocking
423/// responder.respond(AnalysisStarted { job_id: 42 })
424/// }, agent_client_protocol::on_receive_request!())
425/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
426/// # Ok(())
427/// # }
428/// ```
429///
430/// Note that the entire connection runs within one async task, so parallelism must be
431/// managed explicitly using [`spawn`](ConnectionTo::spawn).
432///
433/// ## The Connection Context
434///
435/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
436///
437/// * **For request handlers** - [`Responder<R>`] provides [`respond`](Responder::respond)
438/// to send the response, plus methods to send other messages
439/// * **For notification handlers** - [`ConnectionTo`] provides methods to send messages
440/// and spawn tasks
441///
442/// Both context types support:
443/// * [`send_request`](ConnectionTo::send_request) - Send requests to the other side
444/// * [`send_notification`](ConnectionTo::send_notification) - Send notifications
445/// * [`spawn`](ConnectionTo::spawn) - Run tasks concurrently without blocking the event loop
446///
447/// The [`SentRequest`] returned by `send_request` provides methods like
448/// [`on_receiving_result`](SentRequest::on_receiving_result) that help you
449/// avoid accidentally blocking the event loop while waiting for responses.
450///
451/// # Driving the Connection
452///
453/// After adding handlers, you must drive the connection using one of two modes:
454///
455/// ## Server Mode: `connect_to()`
456///
457/// Use [`connect_to`](Self::connect_to) when you only need to respond to incoming messages:
458///
459/// ```no_run
460/// # use agent_client_protocol_test::*;
461/// # async fn example() -> Result<(), agent_client_protocol::Error> {
462/// # let connection = mock_connection();
463/// connection
464/// .on_receive_request(async |req: MyRequest, responder, cx| {
465/// responder.respond(MyResponse { status: "ok".into() })
466/// }, agent_client_protocol::on_receive_request!())
467/// .connect_to(MockTransport) // Runs until connection closes or error occurs
468/// .await?;
469/// # Ok(())
470/// # }
471/// ```
472///
473/// The connection will process incoming messages and invoke your handlers until the
474/// connection is closed or an error occurs.
475///
476/// ## Client Mode: `connect_with()`
477///
478/// Use [`connect_with`](Self::connect_with) when you need to both handle incoming messages
479/// AND send your own requests/notifications:
480///
481/// ```no_run
482/// # use agent_client_protocol_test::*;
483/// # use agent_client_protocol::schema::InitializeRequest;
484/// # async fn example() -> Result<(), agent_client_protocol::Error> {
485/// # let connection = mock_connection();
486/// connection
487/// .on_receive_request(async |req: MyRequest, responder, cx| {
488/// responder.respond(MyResponse { status: "ok".into() })
489/// }, agent_client_protocol::on_receive_request!())
490/// .connect_with(MockTransport, async |cx| {
491/// // You can send requests to the other side
492/// let response = cx.send_request(InitializeRequest::make())
493/// .block_task()
494/// .await?;
495///
496/// // And send notifications
497/// cx.send_notification(StatusUpdate { message: "ready".into() })?;
498///
499/// Ok(())
500/// })
501/// .await?;
502/// # Ok(())
503/// # }
504/// ```
505///
506/// The connection will serve incoming messages in the background while your client closure
507/// runs. When the closure returns, the connection shuts down.
508///
509/// # Example: Complete Agent
510///
511/// ```no_run
512/// # use agent_client_protocol::UntypedRole;
513/// # use agent_client_protocol::{Builder};
514/// # use agent_client_protocol::ByteStreams;
515/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
516/// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
517/// # async fn example() -> Result<(), agent_client_protocol::Error> {
518/// let transport = ByteStreams::new(
519/// tokio::io::stdout().compat_write(),
520/// tokio::io::stdin().compat(),
521/// );
522///
523/// UntypedRole.builder()
524/// .name("my-agent") // Optional: for debugging logs
525/// .on_receive_request(async |init: InitializeRequest, responder, cx| {
526/// let response: InitializeResponse = todo!();
527/// responder.respond(response)
528/// }, agent_client_protocol::on_receive_request!())
529/// .on_receive_request(async |prompt: PromptRequest, responder, cx| {
530/// // You can send notifications while processing a request
531/// let notif: SessionNotification = todo!();
532/// cx.send_notification(notif)?;
533///
534/// // Then respond to the request
535/// let response: PromptResponse = todo!();
536/// responder.respond(response)
537/// }, agent_client_protocol::on_receive_request!())
538/// .connect_to(transport)
539/// .await?;
540/// # Ok(())
541/// # }
542/// ```
543#[must_use]
544#[derive(Debug)]
545pub struct Builder<Host: Role, Handler = NullHandler, Runner = NullRun>
546where
547 Handler: HandleDispatchFrom<Host::Counterpart>,
548 Runner: RunWithConnectionTo<Host::Counterpart>,
549{
550 /// My role.
551 host: Host,
552
553 /// Name of the connection, used in tracing logs.
554 name: Option<String>,
555
556 /// Handler for incoming messages.
557 handler: Handler,
558
559 /// Responder for background tasks.
560 responder: Runner,
561}
562
563impl<Host: Role> Builder<Host, NullHandler, NullRun> {
564 /// Create a new connection builder for the given role.
565 /// This type follows a builder pattern; use other methods to configure and then invoke
566 /// [`Self::connect_to`] (to use as a server) or [`Self::connect_with`] to use as a client.
567 pub fn new(role: Host) -> Self {
568 Self {
569 host: role,
570 name: None,
571 handler: NullHandler,
572 responder: NullRun,
573 }
574 }
575}
576
577impl<Host: Role, Handler> Builder<Host, Handler, NullRun>
578where
579 Handler: HandleDispatchFrom<Host::Counterpart>,
580{
581 /// Create a new connection builder with the given handler.
582 pub fn new_with(role: Host, handler: Handler) -> Self {
583 Self {
584 host: role,
585 name: None,
586 handler,
587 responder: NullRun,
588 }
589 }
590}
591
592impl<
593 Host: Role,
594 Handler: HandleDispatchFrom<Host::Counterpart>,
595 Runner: RunWithConnectionTo<Host::Counterpart>,
596> Builder<Host, Handler, Runner>
597{
598 /// Set the "name" of this connection -- used only for debugging logs.
599 pub fn name(mut self, name: impl ToString) -> Self {
600 self.name = Some(name.to_string());
601 self
602 }
603
604 /// Merge another [`Builder`] into this one.
605 ///
606 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
607 /// This is a low-level method that is not intended for general use.
608 pub fn with_connection_builder(
609 self,
610 other: Builder<
611 Host,
612 impl HandleDispatchFrom<Host::Counterpart>,
613 impl RunWithConnectionTo<Host::Counterpart>,
614 >,
615 ) -> Builder<
616 Host,
617 impl HandleDispatchFrom<Host::Counterpart>,
618 impl RunWithConnectionTo<Host::Counterpart>,
619 > {
620 Builder {
621 host: self.host,
622 name: self.name,
623 handler: ChainedHandler::new(
624 self.handler,
625 NamedHandler::new(other.name, other.handler),
626 ),
627 responder: ChainRun::new(self.responder, other.responder),
628 }
629 }
630
631 /// Add a new [`HandleDispatchFrom`] to the chain.
632 ///
633 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
634 /// This is a low-level method that is not intended for general use.
635 pub fn with_handler(
636 self,
637 handler: impl HandleDispatchFrom<Host::Counterpart>,
638 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner> {
639 Builder {
640 host: self.host,
641 name: self.name,
642 handler: ChainedHandler::new(self.handler, handler),
643 responder: self.responder,
644 }
645 }
646
647 /// Add a new [`RunWithConnectionTo`] to the chain.
648 pub fn with_responder<Run1>(
649 self,
650 responder: Run1,
651 ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
652 where
653 Run1: RunWithConnectionTo<Host::Counterpart>,
654 {
655 Builder {
656 host: self.host,
657 name: self.name,
658 handler: self.handler,
659 responder: ChainRun::new(self.responder, responder),
660 }
661 }
662
663 /// Enqueue a task to run once the connection is actively serving traffic.
664 #[track_caller]
665 pub fn with_spawned<F, Fut>(
666 self,
667 task: F,
668 ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
669 where
670 F: FnOnce(ConnectionTo<Host::Counterpart>) -> Fut + Send,
671 Fut: Future<Output = Result<(), crate::Error>> + Send,
672 {
673 let location = Location::caller();
674 self.with_responder(SpawnedRun::new(location, task))
675 }
676
677 /// Register a handler for messages that can be either requests OR notifications.
678 ///
679 /// Use this when you want to handle an enum type that contains both request and
680 /// notification variants. Your handler receives a [`Dispatch<Req, Notif>`] which
681 /// is an enum with two variants:
682 ///
683 /// - `Dispatch::Request(request, responder)` - A request with its response context
684 /// - `Dispatch::Notification(notification)` - A notification
685 /// - `Dispatch::Response(result, router)` - A response to a request we sent
686 ///
687 /// # Example
688 ///
689 /// ```no_run
690 /// # use agent_client_protocol_test::*;
691 /// # use agent_client_protocol::Dispatch;
692 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
693 /// # let connection = mock_connection();
694 /// connection.on_receive_dispatch(async |message: Dispatch<MyRequest, StatusUpdate>, _cx| {
695 /// match message {
696 /// Dispatch::Request(req, responder) => {
697 /// // Handle request and send response
698 /// responder.respond(MyResponse { status: "ok".into() })
699 /// }
700 /// Dispatch::Notification(notif) => {
701 /// // Handle notification (no response needed)
702 /// Ok(())
703 /// }
704 /// Dispatch::Response(result, router) => {
705 /// // Forward response to its destination
706 /// router.respond_with_result(result)
707 /// }
708 /// }
709 /// }, agent_client_protocol::on_receive_dispatch!())
710 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
711 /// # Ok(())
712 /// # }
713 /// ```
714 ///
715 /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
716 /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
717 /// for handling requests or notifications separately.
718 ///
719 /// # Ordering
720 ///
721 /// This callback runs inside the dispatch loop and blocks further message processing
722 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
723 /// ordering guarantees and how to avoid deadlocks.
724 pub fn on_receive_dispatch<Req, Notif, F, T, ToFut>(
725 self,
726 op: F,
727 to_future_hack: ToFut,
728 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
729 where
730 Host::Counterpart: HasPeer<Host::Counterpart>,
731 Req: JsonRpcRequest,
732 Notif: JsonRpcNotification,
733 F: AsyncFnMut(
734 Dispatch<Req, Notif>,
735 ConnectionTo<Host::Counterpart>,
736 ) -> Result<T, crate::Error>
737 + Send,
738 T: IntoHandled<Dispatch<Req, Notif>>,
739 ToFut: Fn(
740 &mut F,
741 Dispatch<Req, Notif>,
742 ConnectionTo<Host::Counterpart>,
743 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
744 + Send
745 + Sync,
746 {
747 let handler = MessageHandler::new(
748 self.host.counterpart(),
749 self.host.counterpart(),
750 op,
751 to_future_hack,
752 );
753 self.with_handler(handler)
754 }
755
756 /// Register a handler for JSON-RPC requests of type `Req`.
757 ///
758 /// Your handler receives two arguments:
759 /// 1. The request (type `Req`)
760 /// 2. A [`Responder<R, Req::Response>`] for sending the response
761 ///
762 /// The request context allows you to:
763 /// - Send the response with [`Responder::respond`]
764 /// - Send notifications to the client with [`ConnectionTo::send_notification`]
765 /// - Send requests to the client with [`ConnectionTo::send_request`]
766 ///
767 /// # Example
768 ///
769 /// ```ignore
770 /// # use agent_client_protocol::UntypedRole;
771 /// # use agent_client_protocol::{Builder};
772 /// # use agent_client_protocol::schema::{PromptRequest, PromptResponse, SessionNotification};
773 /// # fn example<R: agent_client_protocol::Role>(connection: Builder<R, impl agent_client_protocol::HandleMessageAs<R>>) {
774 /// connection.on_receive_request(async |request: PromptRequest, responder, cx| {
775 /// // Send a notification while processing
776 /// let notif: SessionNotification = todo!();
777 /// cx.send_notification(notif)?;
778 ///
779 /// // Do some work...
780 /// let result = todo!("process the prompt");
781 ///
782 /// // Send the response
783 /// let response: PromptResponse = todo!();
784 /// responder.respond(response)
785 /// }, agent_client_protocol::on_receive_request!());
786 /// # }
787 /// ```
788 ///
789 /// # Type Parameter
790 ///
791 /// `Req` can be either a single request type or an enum of multiple request types.
792 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
793 ///
794 /// # Ordering
795 ///
796 /// This callback runs inside the dispatch loop and blocks further message processing
797 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
798 /// ordering guarantees and how to avoid deadlocks.
799 pub fn on_receive_request<Req: JsonRpcRequest, F, T, ToFut>(
800 self,
801 op: F,
802 to_future_hack: ToFut,
803 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
804 where
805 Host::Counterpart: HasPeer<Host::Counterpart>,
806 F: AsyncFnMut(
807 Req,
808 Responder<Req::Response>,
809 ConnectionTo<Host::Counterpart>,
810 ) -> Result<T, crate::Error>
811 + Send,
812 T: IntoHandled<(Req, Responder<Req::Response>)>,
813 ToFut: Fn(
814 &mut F,
815 Req,
816 Responder<Req::Response>,
817 ConnectionTo<Host::Counterpart>,
818 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
819 + Send
820 + Sync,
821 {
822 let handler = RequestHandler::new(
823 self.host.counterpart(),
824 self.host.counterpart(),
825 op,
826 to_future_hack,
827 );
828 self.with_handler(handler)
829 }
830
831 /// Register a handler for JSON-RPC notifications of type `Notif`.
832 ///
833 /// Notifications are fire-and-forget messages that don't expect a response.
834 /// Your handler receives:
835 /// 1. The notification (type `Notif`)
836 /// 2. A [`ConnectionTo<R>`] for sending messages to the other side
837 ///
838 /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
839 /// but you can still send your own requests and notifications using the context.
840 ///
841 /// # Example
842 ///
843 /// ```no_run
844 /// # use agent_client_protocol_test::*;
845 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
846 /// # let connection = mock_connection();
847 /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
848 /// // Process the notification
///     update_session_state(&notif)?;
850 ///
851 /// // Optionally send a notification back
852 /// cx.send_notification(StatusUpdate {
853 /// message: "Acknowledged".into(),
854 /// })?;
855 ///
856 /// Ok(())
857 /// }, agent_client_protocol::on_receive_notification!())
858 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
859 /// # Ok(())
860 /// # }
861 /// ```
862 ///
863 /// # Type Parameter
864 ///
865 /// `Notif` can be either a single notification type or an enum of multiple notification types.
866 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
867 ///
868 /// # Ordering
869 ///
870 /// This callback runs inside the dispatch loop and blocks further message processing
871 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
872 /// ordering guarantees and how to avoid deadlocks.
873 pub fn on_receive_notification<Notif, F, T, ToFut>(
874 self,
875 op: F,
876 to_future_hack: ToFut,
877 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
878 where
879 Host::Counterpart: HasPeer<Host::Counterpart>,
880 Notif: JsonRpcNotification,
881 F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
882 T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
883 ToFut: Fn(
884 &mut F,
885 Notif,
886 ConnectionTo<Host::Counterpart>,
887 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
888 + Send
889 + Sync,
890 {
891 let handler = NotificationHandler::new(
892 self.host.counterpart(),
893 self.host.counterpart(),
894 op,
895 to_future_hack,
896 );
897 self.with_handler(handler)
898 }
899
900 /// Register a handler for messages from a specific peer.
901 ///
902 /// This is similar to [`on_receive_dispatch`](Self::on_receive_dispatch), but allows
903 /// specifying the source peer explicitly. This is useful when receiving messages
904 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
905 /// envelopes when receiving from an agent via a proxy).
906 ///
907 /// For the common case of receiving from the default counterpart, use
908 /// [`on_receive_dispatch`](Self::on_receive_dispatch) instead.
909 ///
910 /// # Ordering
911 ///
912 /// This callback runs inside the dispatch loop and blocks further message processing
913 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
914 /// ordering guarantees and how to avoid deadlocks.
915 pub fn on_receive_dispatch_from<
916 Req: JsonRpcRequest,
917 Notif: JsonRpcNotification,
918 Peer: Role,
919 F,
920 T,
921 ToFut,
922 >(
923 self,
924 peer: Peer,
925 op: F,
926 to_future_hack: ToFut,
927 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
928 where
929 Host::Counterpart: HasPeer<Peer>,
930 F: AsyncFnMut(
931 Dispatch<Req, Notif>,
932 ConnectionTo<Host::Counterpart>,
933 ) -> Result<T, crate::Error>
934 + Send,
935 T: IntoHandled<Dispatch<Req, Notif>>,
936 ToFut: Fn(
937 &mut F,
938 Dispatch<Req, Notif>,
939 ConnectionTo<Host::Counterpart>,
940 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
941 + Send
942 + Sync,
943 {
944 let handler = MessageHandler::new(self.host.counterpart(), peer, op, to_future_hack);
945 self.with_handler(handler)
946 }
947
948 /// Register a handler for JSON-RPC requests from a specific peer.
949 ///
950 /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
951 /// specifying the source peer explicitly. This is useful when receiving messages
952 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
953 /// envelopes when receiving from an agent via a proxy).
954 ///
955 /// For the common case of receiving from the default counterpart, use
956 /// [`on_receive_request`](Self::on_receive_request) instead.
957 ///
958 /// # Example
959 ///
960 /// ```ignore
961 /// use agent_client_protocol::Agent;
962 /// use agent_client_protocol::schema::InitializeRequest;
963 ///
964 /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
965 /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, responder, cx| {
966 /// // Handle the request
967 /// responder.respond(InitializeResponse::make())
/// }, agent_client_protocol::on_receive_request!())
969 /// ```
970 ///
971 /// # Ordering
972 ///
973 /// This callback runs inside the dispatch loop and blocks further message processing
974 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
975 /// ordering guarantees and how to avoid deadlocks.
976 pub fn on_receive_request_from<Req: JsonRpcRequest, Peer: Role, F, T, ToFut>(
977 self,
978 peer: Peer,
979 op: F,
980 to_future_hack: ToFut,
981 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
982 where
983 Host::Counterpart: HasPeer<Peer>,
984 F: AsyncFnMut(
985 Req,
986 Responder<Req::Response>,
987 ConnectionTo<Host::Counterpart>,
988 ) -> Result<T, crate::Error>
989 + Send,
990 T: IntoHandled<(Req, Responder<Req::Response>)>,
991 ToFut: Fn(
992 &mut F,
993 Req,
994 Responder<Req::Response>,
995 ConnectionTo<Host::Counterpart>,
996 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
997 + Send
998 + Sync,
999 {
1000 let handler = RequestHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1001 self.with_handler(handler)
1002 }
1003
1004 /// Register a handler for JSON-RPC notifications from a specific peer.
1005 ///
1006 /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
1007 /// specifying the source peer explicitly. This is useful when receiving messages
1008 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
1009 /// envelopes when receiving from an agent via a proxy).
1010 ///
1011 /// For the common case of receiving from the default counterpart, use
1012 /// [`on_receive_notification`](Self::on_receive_notification) instead.
1013 ///
1014 /// # Ordering
1015 ///
1016 /// This callback runs inside the dispatch loop and blocks further message processing
1017 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1018 /// ordering guarantees and how to avoid deadlocks.
1019 pub fn on_receive_notification_from<Notif: JsonRpcNotification, Peer: Role, F, T, ToFut>(
1020 self,
1021 peer: Peer,
1022 op: F,
1023 to_future_hack: ToFut,
1024 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1025 where
1026 Host::Counterpart: HasPeer<Peer>,
1027 F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1028 T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1029 ToFut: Fn(
1030 &mut F,
1031 Notif,
1032 ConnectionTo<Host::Counterpart>,
1033 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1034 + Send
1035 + Sync,
1036 {
1037 let handler = NotificationHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1038 self.with_handler(handler)
1039 }
1040
1041 /// Add an MCP server that will be added to all new sessions that are proxied through this connection.
1042 ///
1043 /// Only applicable to proxies.
1044 pub fn with_mcp_server(
1045 self,
1046 mcp_server: McpServer<Host::Counterpart, impl RunWithConnectionTo<Host::Counterpart>>,
1047 ) -> Builder<
1048 Host,
1049 impl HandleDispatchFrom<Host::Counterpart>,
1050 impl RunWithConnectionTo<Host::Counterpart>,
1051 >
1052 where
1053 Host::Counterpart: HasPeer<Agent> + HasPeer<Client>,
1054 {
1055 let (handler, responder) = mcp_server.into_handler_and_responder();
1056 self.with_handler(handler).with_responder(responder)
1057 }
1058
1059 /// Run in server mode with the provided transport.
1060 ///
1061 /// This drives the connection by continuously processing messages from the transport
1062 /// and dispatching them to your registered handlers. The connection will run until:
1063 /// - The transport closes (e.g., EOF on byte streams)
1064 /// - An error occurs
1065 /// - One of your handlers returns an error
1066 ///
1067 /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
1068 /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1069 ///
1070 /// Use this mode when you only need to respond to incoming messages and don't need
1071 /// to initiate your own requests. If you need to send requests to the other side,
1072 /// use [`connect_with`](Self::connect_with) instead.
1073 ///
1074 /// # Example: Byte Stream Transport
1075 ///
1076 /// ```no_run
1077 /// # use agent_client_protocol::UntypedRole;
1078 /// # use agent_client_protocol::{Builder};
1079 /// # use agent_client_protocol::ByteStreams;
1080 /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1081 /// # use agent_client_protocol_test::*;
1082 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1083 /// let transport = ByteStreams::new(
1084 /// tokio::io::stdout().compat_write(),
1085 /// tokio::io::stdin().compat(),
1086 /// );
1087 ///
1088 /// UntypedRole.builder()
1089 /// .on_receive_request(async |req: MyRequest, responder, cx| {
1090 /// responder.respond(MyResponse { status: "ok".into() })
1091 /// }, agent_client_protocol::on_receive_request!())
1092 /// .connect_to(transport)
1093 /// .await?;
1094 /// # Ok(())
1095 /// # }
1096 /// ```
1097 pub async fn connect_to(
1098 self,
1099 transport: impl ConnectTo<Host> + 'static,
1100 ) -> Result<(), crate::Error> {
1101 self.connect_with(transport, async move |_cx| future::pending().await)
1102 .await
1103 }
1104
1105 /// Run the connection until the provided closure completes.
1106 ///
1107 /// This drives the connection by:
1108 /// 1. Running your registered handlers in the background to process incoming messages
1109 /// 2. Executing your `main_fn` closure with a [`ConnectionTo<R>`] for sending requests/notifications
1110 ///
1111 /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1112 /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1113 ///
1114 /// Use this mode when you need to initiate communication (send requests/notifications)
1115 /// in addition to responding to incoming messages. For server-only mode where you just
1116 /// respond to messages, use [`connect_to`](Self::connect_to) instead.
1117 ///
1118 /// # Example
1119 ///
1120 /// ```no_run
1121 /// # use agent_client_protocol::UntypedRole;
1122 /// # use agent_client_protocol::{Builder};
1123 /// # use agent_client_protocol::ByteStreams;
1124 /// # use agent_client_protocol::schema::InitializeRequest;
1125 /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1126 /// # use agent_client_protocol_test::*;
1127 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1128 /// let transport = ByteStreams::new(
1129 /// tokio::io::stdout().compat_write(),
1130 /// tokio::io::stdin().compat(),
1131 /// );
1132 ///
1133 /// UntypedRole.builder()
1134 /// .on_receive_request(async |req: MyRequest, responder, cx| {
1135 /// // Handle incoming requests in the background
1136 /// responder.respond(MyResponse { status: "ok".into() })
1137 /// }, agent_client_protocol::on_receive_request!())
1138 /// .connect_with(transport, async |cx| {
1139 /// // Initialize the protocol
1140 /// let init_response = cx.send_request(InitializeRequest::make())
1141 /// .block_task()
1142 /// .await?;
1143 ///
1144 /// // Send more requests...
1145 /// let result = cx.send_request(MyRequest {})
1146 /// .block_task()
1147 /// .await?;
1148 ///
1149 /// // When this closure returns, the connection shuts down
1150 /// Ok(())
1151 /// })
1152 /// .await?;
1153 /// # Ok(())
1154 /// # }
1155 /// ```
1156 ///
1157 /// # Parameters
1158 ///
/// - `transport`: The component to connect to; it carries messages to and from the other side.
/// - `main_fn`: Your client logic. Receives a [`ConnectionTo<R>`] for sending messages.
1160 ///
1161 /// # Errors
1162 ///
1163 /// Returns an error if the connection closes before `main_fn` completes.
1164 pub async fn connect_with<R>(
1165 self,
1166 transport: impl ConnectTo<Host> + 'static,
1167 main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1168 ) -> Result<R, crate::Error> {
1169 let (_, future) = self.into_connection_and_future(transport, main_fn);
1170 future.await
1171 }
1172
1173 /// Helper that returns a [`ConnectionTo<R>`] and a future that runs this connection until `main_fn` returns.
1174 fn into_connection_and_future<R>(
1175 self,
1176 transport: impl ConnectTo<Host> + 'static,
1177 main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1178 ) -> (
1179 ConnectionTo<Host::Counterpart>,
1180 impl Future<Output = Result<R, crate::Error>>,
1181 ) {
1182 let Self {
1183 name,
1184 handler,
1185 responder,
1186 host: me,
1187 } = self;
1188
1189 let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
1190 let (new_task_tx, new_task_rx) = mpsc::unbounded();
1191 let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
1192 let connection = ConnectionTo::new(
1193 me.counterpart(),
1194 outgoing_tx,
1195 new_task_tx,
1196 dynamic_handler_tx,
1197 );
1198
1199 // Convert transport into server - this returns a channel for us to use
1200 // and a future that runs the transport
1201 let transport_component = crate::DynConnectTo::new(transport);
1202 let (transport_channel, transport_future) = transport_component.into_channel_and_future();
1203 let spawn_result = connection.spawn(transport_future);
1204
1205 // Destructure the channel endpoints
1206 let Channel {
1207 rx: transport_incoming_rx,
1208 tx: transport_outgoing_tx,
1209 } = transport_channel;
1210
1211 let (reply_tx, reply_rx) = mpsc::unbounded();
1212
1213 let future = crate::util::instrument_with_connection_name(name, {
1214 let connection = connection.clone();
1215 async move {
1216 let () = spawn_result?;
1217
1218 let background = async {
1219 futures::try_join!(
1220 // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
1221 outgoing_actor::outgoing_protocol_actor(
1222 outgoing_rx,
1223 reply_tx.clone(),
1224 transport_outgoing_tx,
1225 ),
1226 // Protocol layer: jsonrpcmsg::Message → handler/reply routing
1227 incoming_actor::incoming_protocol_actor(
1228 me.counterpart(),
1229 &connection,
1230 transport_incoming_rx,
1231 dynamic_handler_rx,
1232 reply_rx,
1233 handler,
1234 ),
1235 task_actor::task_actor(new_task_rx, &connection),
1236 responder.run_with_connection_to(connection.clone()),
1237 )?;
1238 Ok(())
1239 };
1240
1241 crate::util::run_until(Box::pin(background), Box::pin(main_fn(connection.clone())))
1242 .await
1243 }
1244 });
1245
1246 (connection, future)
1247 }
1248}
1249
1250impl<R, H, Run> ConnectTo<R::Counterpart> for Builder<R, H, Run>
1251where
1252 R: Role,
1253 H: HandleDispatchFrom<R::Counterpart> + 'static,
1254 Run: RunWithConnectionTo<R::Counterpart> + 'static,
1255{
1256 async fn connect_to(self, client: impl ConnectTo<R>) -> Result<(), crate::Error> {
1257 Builder::connect_to(self, client).await
1258 }
1259}
1260
1261/// The payload sent through the response oneshot channel.
1262///
1263/// Includes the response value and an optional ack channel for dispatch loop
1264/// synchronization.
1265pub(crate) struct ResponsePayload {
1266 /// The response result - either the JSON value or an error.
1267 pub(crate) result: Result<serde_json::Value, crate::Error>,
1268
1269 /// Optional acknowledgment channel for dispatch loop synchronization.
1270 ///
1271 /// When present, the receiver must send on this channel to signal that
1272 /// response processing is complete, allowing the dispatch loop to continue
1273 /// to the next message.
1274 ///
1275 /// This is `None` for error paths where the response is sent directly
1276 /// (e.g., when the outgoing channel is broken) rather than through the
1277 /// normal dispatch loop flow.
1278 pub(crate) ack_tx: Option<oneshot::Sender<()>>,
1279}
1280
1281impl std::fmt::Debug for ResponsePayload {
1282 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1283 f.debug_struct("ResponsePayload")
1284 .field("result", &self.result)
1285 .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1286 .finish()
1287 }
1288}
1289
1290/// Message sent to the incoming actor for reply subscription management.
1291enum ReplyMessage {
1292 /// Subscribe to receive a response for the given request id.
1293 /// When a response with this id arrives, it will be sent through the oneshot
1294 /// along with an ack channel that must be signaled when processing is complete.
1295 /// The method name is stored to allow routing responses through typed handlers.
1296 Subscribe {
1297 id: jsonrpcmsg::Id,
1298
1299 /// id of the peer this request was sent to
1300 role_id: RoleId,
1301
1302 /// (original) method of the request -- the actual request may have been transformed
1303 /// to a successor method, but this will reflect the method of the wrapped request
1304 method: String,
1305
1306 sender: oneshot::Sender<ResponsePayload>,
1307 },
1308}
1309
1310impl std::fmt::Debug for ReplyMessage {
1311 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1312 match self {
1313 ReplyMessage::Subscribe { id, method, .. } => f
1314 .debug_struct("Subscribe")
1315 .field("id", id)
1316 .field("method", method)
1317 .finish(),
1318 }
1319 }
1320}
1321
/// Messages sent to the outgoing actor to be serialized over the transport.
#[derive(Debug)]
enum OutgoingMessage {
    /// Send a request to the server.
    Request {
        /// id assigned to this request (generated by sender)
        id: jsonrpcmsg::Id,

        /// the original method
        method: String,

        /// the peer we sent this to
        role_id: RoleId,

        /// the message to send; this may have a distinct method
        /// depending on the peer
        untyped: UntypedMessage,

        /// where to send the response when it arrives (includes ack channel)
        response_tx: oneshot::Sender<ResponsePayload>,
    },

    /// Send a notification to the server.
    Notification {
        /// the message to send; this may have a distinct method
        /// depending on the peer
        untyped: UntypedMessage,
    },

    /// Send a response to a message from the server.
    Response {
        /// id of the request being answered
        id: jsonrpcmsg::Id,

        /// the outcome to report: a JSON result value or an error
        response: Result<serde_json::Value, crate::Error>,
    },

    /// Send a generalized error message
    Error {
        /// the error to report
        error: crate::Error,
    },
}
1361
/// Outcome reported by a message handler: the message was either handled or
/// handed back so that it can be offered to subsequent handlers.
#[must_use]
#[derive(Debug)]
pub enum Handled<T> {
    /// The message was handled
    Yes,

    /// The message was not handled; the message is returned to the caller.
    ///
    /// The `retry` flag controls whether the returned message should also be
    /// queued and offered to dynamic handlers that are added later.
    No {
        /// The message to pass on to subsequent handlers. This is typically
        /// the original message, but the handler may have mutated it.
        message: T,

        /// If true, request that the message be queued and retried with
        /// dynamic handlers as they are added.
        ///
        /// This exists for session updates: the dynamic handler for a session
        /// cannot be added until the response to the new-session request has
        /// been processed, and updates for that session may be processed at
        /// the same time.
        retry: bool,
    },
}

/// Conversion from a handler's return value into [`Handled`].
///
/// Thanks to this trait a handler may return `()` (treated as `Handled::Yes`)
/// or an explicit `Handled<T>` when it needs control over handler propagation.
pub trait IntoHandled<T> {
    /// Turn this value into a `Handled<T>`.
    fn into_handled(self) -> Handled<T>;
}

/// Returning `()` from a handler means the message was handled.
impl<T> IntoHandled<T> for () {
    fn into_handled(self) -> Handled<T> {
        Handled::Yes
    }
}

/// An explicit [`Handled`] value is passed through unchanged.
impl<T> IntoHandled<T> for Handled<T> {
    fn into_handled(self) -> Handled<T> {
        self
    }
}
1409
/// Connection context for sending messages and spawning tasks.
///
/// This is the primary handle for interacting with the JSON-RPC connection from
/// within handler callbacks. You can use it to:
///
/// * Send requests and notifications to the other side
/// * Spawn concurrent tasks that run alongside the connection
/// * Respond to requests (via [`Responder`] which wraps this)
///
/// # Cloning
///
/// `ConnectionTo` is cheaply cloneable - all clones refer to the same underlying connection.
/// This makes it easy to share across async tasks.
///
/// # Event Loop and Concurrency
///
/// Handler callbacks run on the event loop, which means the connection cannot process new
/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
/// expensive or blocking work to concurrent tasks.
///
/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency) section
/// for more details.
#[derive(Clone, Debug)]
pub struct ConnectionTo<Counterpart: Role> {
    // The role on the other side of this connection; returned by `counterpart()`.
    counterpart: Counterpart,
    // Sending half of the queue of outgoing messages (requests are pushed here
    // by `send_request_to`).
    message_tx: OutgoingMessageTx,
    // Sending half of the queue of tasks; `spawn` and `spawn_connection` push here.
    task_tx: TaskTx,
    // Sending half of the channel carrying dynamic-handler messages; the
    // receiving half is handed to the incoming actor when the connection is built.
    dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
}
1439
1440impl<Counterpart: Role> ConnectionTo<Counterpart> {
1441 fn new(
1442 counterpart: Counterpart,
1443 message_tx: mpsc::UnboundedSender<OutgoingMessage>,
1444 task_tx: mpsc::UnboundedSender<Task>,
1445 dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
1446 ) -> Self {
1447 Self {
1448 counterpart,
1449 message_tx,
1450 task_tx,
1451 dynamic_handler_tx,
1452 }
1453 }
1454
1455 /// Return the counterpart role this connection is talking to.
1456 pub fn counterpart(&self) -> Counterpart {
1457 self.counterpart.clone()
1458 }
1459
1460 /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1461 ///
1462 /// This is the primary mechanism for offloading expensive work from handler callbacks
1463 /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1464 /// allowing the server to continue processing messages.
1465 ///
1466 /// # Event Loop
1467 ///
1468 /// Handler callbacks run on the event loop, which cannot process new messages while
1469 /// your handler is running. Use `spawn` for any expensive operations:
1470 ///
1471 /// ```no_run
1472 /// # use agent_client_protocol_test::*;
1473 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1474 /// # let connection = mock_connection();
1475 /// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1476 /// // Clone cx for the spawned task
1477 /// cx.spawn({
1478 /// let connection = cx.clone();
1479 /// async move {
1480 /// let result = expensive_operation(&req.data).await?;
1481 /// connection.send_notification(ProcessComplete { result })?;
1482 /// Ok(())
1483 /// }
1484 /// })?;
1485 ///
1486 /// // Respond immediately
1487 /// responder.respond(ProcessResponse { result: "started".into() })
1488 /// }, agent_client_protocol::on_receive_request!())
1489 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1490 /// # Ok(())
1491 /// # }
1492 /// ```
1493 ///
1494 /// # Errors
1495 ///
1496 /// If the spawned task returns an error, the entire server will shut down.
1497 #[track_caller]
1498 pub fn spawn(
1499 &self,
1500 task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1501 ) -> Result<(), crate::Error> {
1502 let location = std::panic::Location::caller();
1503 let task = task.into_future();
1504 Task::new(location, task).spawn(&self.task_tx)
1505 }
1506
1507 /// Spawn a JSON-RPC connection in the background and return a [`ConnectionTo`] for sending messages to it.
1508 ///
1509 /// This is useful for creating multiple connections that communicate with each other,
1510 /// such as implementing proxy patterns or connecting to multiple backend services.
1511 ///
1512 /// # Arguments
1513 ///
1514 /// - `builder`: The connection builder with handlers configured
1515 /// - `transport`: The transport component to connect to
1516 ///
1517 /// # Returns
1518 ///
1519 /// A `ConnectionTo` that you can use to send requests and notifications to the spawned connection.
1520 ///
1521 /// # Example: Proxying to a backend connection
1522 ///
1523 /// ```
1524 /// # use agent_client_protocol::UntypedRole;
1525 /// # use agent_client_protocol::{Builder, ConnectionTo};
1526 /// # use agent_client_protocol_test::*;
1527 /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1528 /// // Set up a backend connection builder
1529 /// let backend = UntypedRole.builder()
1530 /// .on_receive_request(async |req: MyRequest, responder, _cx| {
1531 /// responder.respond(MyResponse { status: "ok".into() })
1532 /// }, agent_client_protocol::on_receive_request!());
1533 ///
1534 /// // Spawn it and get a context to send requests to it
1535 /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
1536 ///
1537 /// // Now you can forward requests to the backend
1538 /// let response = backend_connection.send_request(MyRequest {}).block_task().await?;
1539 /// # Ok(())
1540 /// # }
1541 /// ```
1542 #[track_caller]
1543 pub fn spawn_connection<R: Role>(
1544 &self,
1545 builder: Builder<
1546 R,
1547 impl HandleDispatchFrom<R::Counterpart> + 'static,
1548 impl RunWithConnectionTo<R::Counterpart> + 'static,
1549 >,
1550 transport: impl ConnectTo<R> + 'static,
1551 ) -> Result<ConnectionTo<R::Counterpart>, crate::Error> {
1552 let (connection, future) =
1553 builder.into_connection_and_future(transport, |_| std::future::pending());
1554 Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
1555 Ok(connection)
1556 }
1557
1558 /// Send a request/notification and forward the response appropriately.
1559 ///
1560 /// The request context's response type matches the request's response type,
1561 /// enabling type-safe message forwarding.
1562 pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
1563 &self,
1564 message: Dispatch<Req, Notif>,
1565 ) -> Result<(), crate::Error>
1566 where
1567 Counterpart: HasPeer<Counterpart>,
1568 {
1569 self.send_proxied_message_to(self.counterpart(), message)
1570 }
1571
1572 /// Send a request/notification and forward the response appropriately.
1573 ///
1574 /// The request context's response type matches the request's response type,
1575 /// enabling type-safe message forwarding.
1576 pub fn send_proxied_message_to<
1577 Peer: Role,
1578 Req: JsonRpcRequest<Response: Send>,
1579 Notif: JsonRpcNotification,
1580 >(
1581 &self,
1582 peer: Peer,
1583 message: Dispatch<Req, Notif>,
1584 ) -> Result<(), crate::Error>
1585 where
1586 Counterpart: HasPeer<Peer>,
1587 {
1588 match message {
1589 Dispatch::Request(request, responder) => self
1590 .send_request_to(peer, request)
1591 .forward_response_to(responder),
1592 Dispatch::Notification(notification) => self.send_notification_to(peer, notification),
1593 Dispatch::Response(result, router) => {
1594 // Responses are forwarded directly to their destination
1595 router.respond_with_result(result)
1596 }
1597 }
1598 }
1599
1600 /// Send an outgoing request and return a [`SentRequest`] for handling the reply.
1601 ///
1602 /// The returned [`SentRequest`] provides methods for receiving the response without
1603 /// blocking the event loop:
1604 ///
1605 /// * [`on_receiving_result`](SentRequest::on_receiving_result) - Schedule
1606 /// a callback to run when the response arrives (doesn't block the event loop)
1607 /// * [`block_task`](SentRequest::block_task) - Block the current task until the response
1608 /// arrives (only safe in spawned tasks, not in handlers)
1609 ///
1610 /// # Anti-Footgun Design
1611 ///
1612 /// The API intentionally makes it difficult to block on the result directly to prevent
1613 /// the common mistake of blocking the event loop while waiting for a response:
1614 ///
1615 /// ```compile_fail
1616 /// # use agent_client_protocol_test::*;
1617 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1618 /// // ❌ This doesn't compile - prevents blocking the event loop
1619 /// let response = cx.send_request(MyRequest {}).await?;
1620 /// # Ok(())
1621 /// # }
1622 /// ```
1623 ///
1624 /// ```no_run
1625 /// # use agent_client_protocol_test::*;
1626 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1627 /// // ✅ Option 1: Schedule callback (safe in handlers)
1628 /// cx.send_request(MyRequest {})
1629 /// .on_receiving_result(async |result| {
1630 /// // Handle the response
1631 /// Ok(())
1632 /// })?;
1633 ///
1634 /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1635 /// cx.spawn({
1636 /// let cx = cx.clone();
1637 /// async move {
1638 /// let response = cx.send_request(MyRequest {})
1639 /// .block_task()
1640 /// .await?;
1641 /// // Process response...
1642 /// Ok(())
1643 /// }
1644 /// })?;
1645 /// # Ok(())
1646 /// # }
1647 /// ```
///
/// # Default peer
///
/// This convenience method sends the outgoing request to the default counterpart peer.
/// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
1652 pub fn send_request<Req: JsonRpcRequest>(&self, request: Req) -> SentRequest<Req::Response>
1653 where
1654 Counterpart: HasPeer<Counterpart>,
1655 {
1656 self.send_request_to(self.counterpart.clone(), request)
1657 }
1658
1659 /// Send an outgoing request to a specific peer.
1660 ///
1661 /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1662 /// implementation before being sent.
1663 pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
1664 &self,
1665 peer: Peer,
1666 request: Req,
1667 ) -> SentRequest<Req::Response>
1668 where
1669 Counterpart: HasPeer<Peer>,
1670 {
1671 let method = request.method().to_string();
1672 let id = jsonrpcmsg::Id::String(uuid::Uuid::new_v4().to_string());
1673 let (response_tx, response_rx) = oneshot::channel();
1674 let role_id = peer.role_id();
1675 let remote_style = self.counterpart.remote_style(peer);
1676 match remote_style.transform_outgoing_message(request) {
1677 Ok(untyped) => {
1678 // Transform the message for the target role
1679 let message = OutgoingMessage::Request {
1680 id: id.clone(),
1681 method: method.clone(),
1682 role_id,
1683 untyped,
1684 response_tx,
1685 };
1686
1687 match self.message_tx.unbounded_send(message) {
1688 Ok(()) => (),
1689 Err(error) => {
1690 let OutgoingMessage::Request {
1691 method,
1692 response_tx,
1693 ..
1694 } = error.into_inner()
1695 else {
1696 unreachable!();
1697 };
1698
1699 response_tx
1700 .send(ResponsePayload {
1701 result: Err(crate::util::internal_error(format!(
1702 "failed to send outgoing request `{method}"
1703 ))),
1704 ack_tx: None,
1705 })
1706 .unwrap();
1707 }
1708 }
1709 }
1710
1711 Err(err) => {
1712 response_tx
1713 .send(ResponsePayload {
1714 result: Err(crate::util::internal_error(format!(
1715 "failed to create untyped request for `{method}`: {err}"
1716 ))),
1717 ack_tx: None,
1718 })
1719 .unwrap();
1720 }
1721 }
1722
1723 SentRequest::new(id, method.clone(), self.task_tx.clone(), response_rx)
1724 .map(move |json| <Req::Response>::from_value(&method, json))
1725 }
1726
1727 /// Send an outgoing notification to the default counterpart peer (no reply expected).
1728 ///
1729 /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1730 /// This method sends the notification immediately and returns.
1731 ///
1732 /// This is a convenience method that sends to the counterpart role `R`.
1733 /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
1734 ///
1735 /// ```no_run
1736 /// # use agent_client_protocol_test::*;
1737 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::Agent>) -> Result<(), agent_client_protocol::Error> {
1738 /// cx.send_notification(StatusUpdate {
1739 /// message: "Processing...".into(),
1740 /// })?;
1741 /// # Ok(())
1742 /// # }
1743 /// ```
1744 pub fn send_notification<N: JsonRpcNotification>(
1745 &self,
1746 notification: N,
1747 ) -> Result<(), crate::Error>
1748 where
1749 Counterpart: HasPeer<Counterpart>,
1750 {
1751 self.send_notification_to(self.counterpart.clone(), notification)
1752 }
1753
1754 /// Send an outgoing notification to a specific peer (no reply expected).
1755 ///
1756 /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1757 /// implementation before being sent.
1758 pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
1759 &self,
1760 peer: Peer,
1761 notification: N,
1762 ) -> Result<(), crate::Error>
1763 where
1764 Counterpart: HasPeer<Peer>,
1765 {
1766 let remote_style = self.counterpart.remote_style(peer);
1767 tracing::debug!(
1768 role = std::any::type_name::<Counterpart>(),
1769 peer = std::any::type_name::<Peer>(),
1770 notification_type = std::any::type_name::<N>(),
1771 ?remote_style,
1772 original_method = notification.method(),
1773 "send_notification_to"
1774 );
1775 let transformed = remote_style.transform_outgoing_message(notification)?;
1776 tracing::debug!(
1777 transformed_method = %transformed.method,
1778 "send_notification_to transformed"
1779 );
1780 send_raw_message(
1781 &self.message_tx,
1782 OutgoingMessage::Notification {
1783 untyped: transformed,
1784 },
1785 )
1786 }
1787
1788 /// Send an error notification (no reply expected).
1789 pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1790 send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
1791 }
1792
1793 /// Register a dynamic message handler, used to intercept messages specific to a particular session
1794 /// or some similar modal thing.
1795 ///
1796 /// Dynamic message handlers are called first for every incoming message.
1797 ///
1798 /// If they decline to handle the message, then the message is passed to the regular registered handlers.
1799 ///
1800 /// The handler will stay registered until the returned registration guard is dropped.
1801 pub fn add_dynamic_handler(
1802 &self,
1803 handler: impl HandleDispatchFrom<Counterpart> + 'static,
1804 ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error> {
1805 let uuid = Uuid::new_v4();
1806 self.dynamic_handler_tx
1807 .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
1808 uuid,
1809 Box::new(handler),
1810 ))
1811 .map_err(crate::util::internal_error)?;
1812
1813 Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
1814 }
1815
1816 fn remove_dynamic_handler(&self, uuid: Uuid) {
1817 // Ignore errors
1818 drop(
1819 self.dynamic_handler_tx
1820 .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid)),
1821 );
1822 }
1823}
1824
/// Registration guard returned by [`ConnectionTo::add_dynamic_handler`].
///
/// Dropping the guard asks the connection to remove the dynamic handler
/// registered under `uuid`. Call [`run_indefinitely`](Self::run_indefinitely)
/// to keep the handler registered instead.
///
/// NOTE(review): this type derives `Clone` and also removes the handler on
/// drop, so dropping any one clone appears to unregister the handler while
/// other clones are still alive — confirm this is intended.
#[derive(Clone, Debug)]
pub struct DynamicHandlerRegistration<R: Role> {
    /// Identifier under which the handler was registered.
    uuid: Uuid,
    /// Connection used to send the removal message when the guard is dropped.
    cx: ConnectionTo<R>,
}
1830
1831impl<R: Role> DynamicHandlerRegistration<R> {
1832 fn new(uuid: Uuid, cx: ConnectionTo<R>) -> Self {
1833 Self { uuid, cx }
1834 }
1835
1836 /// Prevents the dynamic handler from being removed when dropped.
1837 pub fn run_indefinitely(self) {
1838 std::mem::forget(self);
1839 }
1840}
1841
1842impl<R: Role> Drop for DynamicHandlerRegistration<R> {
1843 fn drop(&mut self) {
1844 self.cx.remove_dynamic_handler(self.uuid);
1845 }
1846}
1847
/// The context to respond to an incoming request.
///
/// This context is provided to request handlers and serves a dual role:
///
/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
///    [`respond_with_result`](Self::respond_with_result) to send the response
/// 2. **Send other messages** - Use the [`ConnectionTo`] parameter passed to your
///    handler, which provides [`send_request`](`ConnectionTo::send_request`),
///    [`send_notification`](`ConnectionTo::send_notification`), and
///    [`spawn`](`ConnectionTo::spawn`)
///
/// The type parameter `T` is the payload type accepted by
/// [`respond`](Self::respond); it defaults to `serde_json::Value`, i.e. an
/// untyped JSON response. Every `respond*` method consumes the `Responder`.
///
/// # Example
///
/// ```no_run
/// # use agent_client_protocol_test::*;
/// # async fn example() -> Result<(), agent_client_protocol::Error> {
/// # let connection = mock_connection();
/// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
///     // Send a notification while processing
///     cx.send_notification(StatusUpdate {
///         message: "processing".into(),
///     })?;
///
///     // Do some work...
///     let result = process(&req.data)?;
///
///     // Respond to the request
///     responder.respond(ProcessResponse { result })
/// }, agent_client_protocol::on_receive_request!())
/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
/// # Ok(())
/// # }
/// ```
///
/// # Event Loop Considerations
///
/// Like all handlers, request handlers run on the event loop. Use
/// [`spawn`](ConnectionTo::spawn) for expensive operations to avoid blocking
/// the connection.
///
/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency)
/// section for more details.
#[must_use]
pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
    /// The method of the request.
    method: String,

    /// The `id` of the message we are replying to.
    id: jsonrpcmsg::Id,

    /// Function to send the response to its destination.
    ///
    /// It is a boxed `FnOnce`, so it can be invoked at most once; the
    /// `respond*` methods take `self` by value for that reason.
    ///
    /// For incoming requests: serializes to JSON and sends over the wire.
    /// For incoming responses: sends to the waiting oneshot channel.
    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
}
1904
1905impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
1906 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1907 f.debug_struct("Responder")
1908 .field("method", &self.method)
1909 .field("id", &self.id)
1910 .field("response_type", &std::any::type_name::<T>())
1911 .finish_non_exhaustive()
1912 }
1913}
1914
1915impl Responder<serde_json::Value> {
1916 /// Create a new request context for an incoming request.
1917 ///
1918 /// The response will be serialized to JSON and sent over the wire.
1919 fn new(message_tx: OutgoingMessageTx, method: String, id: jsonrpcmsg::Id) -> Self {
1920 let id_clone = id.clone();
1921 Self {
1922 method,
1923 id,
1924 send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
1925 send_raw_message(
1926 &message_tx,
1927 OutgoingMessage::Response {
1928 id: id_clone,
1929 response,
1930 },
1931 )
1932 }),
1933 }
1934 }
1935
1936 /// Cast this request context to a different response type.
1937 ///
1938 /// The provided type `T` will be serialized to JSON before sending.
1939 pub fn cast<T: JsonRpcResponse>(self) -> Responder<T> {
1940 self.wrap_params(move |method, value| match value {
1941 Ok(value) => T::into_json(value, method),
1942 Err(e) => Err(e),
1943 })
1944 }
1945}
1946
1947impl<T: JsonRpcResponse> Responder<T> {
1948 /// Method of the incoming request
1949 #[must_use]
1950 pub fn method(&self) -> &str {
1951 &self.method
1952 }
1953
1954 /// ID of the incoming request/response as a JSON value
1955 #[must_use]
1956 pub fn id(&self) -> serde_json::Value {
1957 crate::util::id_to_json(&self.id)
1958 }
1959
1960 /// Convert to a `Responder` that expects a JSON value
1961 /// and which checks (dynamically) that the JSON value it receives
1962 /// can be converted to `T`.
1963 pub fn erase_to_json(self) -> Responder<serde_json::Value> {
1964 self.wrap_params(|method, value| T::from_value(method, value?))
1965 }
1966
1967 /// Return a new Responder with a different method name.
1968 pub fn wrap_method(self, method: String) -> Responder<T> {
1969 Responder {
1970 method,
1971 id: self.id,
1972 send_fn: self.send_fn,
1973 }
1974 }
1975
1976 /// Return a new Responder that expects a response of type U.
1977 ///
1978 /// `wrap_fn` will be invoked with the method name and the result to transform
1979 /// type `U` into type `T` before sending.
1980 pub fn wrap_params<U: JsonRpcResponse>(
1981 self,
1982 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
1983 ) -> Responder<U> {
1984 let method = self.method.clone();
1985 Responder {
1986 method: self.method,
1987 id: self.id,
1988 send_fn: Box::new(move |input: Result<U, crate::Error>| {
1989 let t_value = wrap_fn(&method, input);
1990 (self.send_fn)(t_value)
1991 }),
1992 }
1993 }
1994
1995 /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
1996 pub fn respond_with_result(
1997 self,
1998 response: Result<T, crate::Error>,
1999 ) -> Result<(), crate::Error> {
2000 tracing::debug!(id = ?self.id, "respond called");
2001 (self.send_fn)(response)
2002 }
2003
2004 /// Respond to the JSON-RPC request with a value.
2005 pub fn respond(self, response: T) -> Result<(), crate::Error> {
2006 self.respond_with_result(Ok(response))
2007 }
2008
2009 /// Respond to the JSON-RPC request with an internal error containing a message.
2010 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2011 self.respond_with_error(crate::util::internal_error(message))
2012 }
2013
2014 /// Respond to the JSON-RPC request with an error.
2015 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2016 tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2017 self.respond_with_result(Err(error))
2018 }
2019}
2020
/// Context for handling an incoming JSON-RPC response.
///
/// This is the response-side counterpart to [`Responder`]. While `Responder` handles
/// incoming requests (where you send a response over the wire), `ResponseRouter` handles
/// incoming responses (where you route the response to a local task waiting for it).
///
/// Both are fundamentally "sinks" that push the message through a `send_fn`, but they
/// represent different points in the message lifecycle and carry different metadata.
///
/// The type parameter `T` is the payload type accepted by
/// [`respond`](Self::respond); it defaults to `serde_json::Value`. Every
/// `respond*` method consumes the router.
#[must_use]
pub struct ResponseRouter<T: JsonRpcResponse = serde_json::Value> {
    /// The method of the original request.
    method: String,

    /// The `id` of the original request.
    id: jsonrpcmsg::Id,

    /// The RoleId to which the original request was sent
    /// (and hence from which the reply is expected).
    role_id: RoleId,

    /// Function to send the response to the waiting task.
    ///
    /// It is a boxed `FnOnce`, so it can be invoked at most once.
    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
}
2044
2045impl<T: JsonRpcResponse> std::fmt::Debug for ResponseRouter<T> {
2046 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2047 f.debug_struct("ResponseRouter")
2048 .field("method", &self.method)
2049 .field("id", &self.id)
2050 .field("response_type", &std::any::type_name::<T>())
2051 .finish_non_exhaustive()
2052 }
2053}
2054
2055impl ResponseRouter<serde_json::Value> {
2056 /// Create a new response context for routing a response to a local awaiter.
2057 ///
2058 /// When `respond_with_result` is called, the response is sent through the oneshot
2059 /// channel to the code that originally sent the request.
2060 pub(crate) fn new(
2061 method: String,
2062 id: jsonrpcmsg::Id,
2063 role_id: RoleId,
2064 sender: oneshot::Sender<ResponsePayload>,
2065 ) -> Self {
2066 Self {
2067 method,
2068 id,
2069 role_id,
2070 send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
2071 sender
2072 .send(ResponsePayload {
2073 result: response,
2074 ack_tx: None,
2075 })
2076 .map_err(|_| {
2077 crate::util::internal_error("failed to send response, receiver dropped")
2078 })
2079 }),
2080 }
2081 }
2082
2083 /// Cast this response context to a different response type.
2084 ///
2085 /// The provided type `T` will be serialized to JSON before sending.
2086 pub fn cast<T: JsonRpcResponse>(self) -> ResponseRouter<T> {
2087 self.wrap_params(move |method, value| match value {
2088 Ok(value) => T::into_json(value, method),
2089 Err(e) => Err(e),
2090 })
2091 }
2092}
2093
2094impl<T: JsonRpcResponse> ResponseRouter<T> {
2095 /// Method of the original request
2096 #[must_use]
2097 pub fn method(&self) -> &str {
2098 &self.method
2099 }
2100
2101 /// ID of the original request as a JSON value
2102 #[must_use]
2103 pub fn id(&self) -> serde_json::Value {
2104 crate::util::id_to_json(&self.id)
2105 }
2106
2107 /// The peer to which the original request was sent.
2108 ///
2109 /// This is the peer from which we expect to receive the response.
2110 #[must_use]
2111 pub fn role_id(&self) -> RoleId {
2112 self.role_id.clone()
2113 }
2114
2115 /// Convert to a `ResponseRouter` that expects a JSON value
2116 /// and which checks (dynamically) that the JSON value it receives
2117 /// can be converted to `T`.
2118 pub fn erase_to_json(self) -> ResponseRouter<serde_json::Value> {
2119 self.wrap_params(|method, value| T::from_value(method, value?))
2120 }
2121
2122 /// Return a new ResponseRouter that expects a response of type U.
2123 ///
2124 /// `wrap_fn` will be invoked with the method name and the result to transform
2125 /// type `U` into type `T` before sending.
2126 fn wrap_params<U: JsonRpcResponse>(
2127 self,
2128 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2129 ) -> ResponseRouter<U> {
2130 let method = self.method.clone();
2131 ResponseRouter {
2132 method: self.method,
2133 id: self.id,
2134 role_id: self.role_id,
2135 send_fn: Box::new(move |input: Result<U, crate::Error>| {
2136 let t_value = wrap_fn(&method, input);
2137 (self.send_fn)(t_value)
2138 }),
2139 }
2140 }
2141
2142 /// Complete the response by sending the result to the waiting task.
2143 pub fn respond_with_result(
2144 self,
2145 response: Result<T, crate::Error>,
2146 ) -> Result<(), crate::Error> {
2147 tracing::debug!(id = ?self.id, "response routed to awaiter");
2148 (self.send_fn)(response)
2149 }
2150
2151 /// Complete the response by sending a value to the waiting task.
2152 pub fn respond(self, response: T) -> Result<(), crate::Error> {
2153 self.respond_with_result(Ok(response))
2154 }
2155
2156 /// Complete the response by sending an internal error to the waiting task.
2157 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2158 self.respond_with_error(crate::util::internal_error(message))
2159 }
2160
2161 /// Complete the response by sending an error to the waiting task.
2162 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2163 tracing::debug!(id = ?self.id, ?error, "error routed to awaiter");
2164 self.respond_with_result(Err(error))
2165 }
2166}
2167
/// Common bounds for any JSON-RPC message.
///
/// Implementors know their own method name, can be converted into an
/// [`UntypedMessage`], and can be parsed back from a method name plus parameters.
///
/// # Derive Macro
///
/// For simple message types, you can use the `JsonRpcRequest` or `JsonRpcNotification` derive macros
/// which will implement both `JsonRpcMessage` and the respective trait. See [`JsonRpcRequest`] and
/// [`JsonRpcNotification`] for examples.
pub trait JsonRpcMessage: 'static + Debug + Sized + Send + Clone {
    /// Check if this message type matches the given method name.
    fn matches_method(method: &str) -> bool;

    /// The method name for the message.
    fn method(&self) -> &str;

    /// Convert this message into an untyped message.
    ///
    /// # Errors
    ///
    /// Returns an error if the conversion fails.
    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;

    /// Parse this type from a method name and parameters.
    ///
    /// Returns an error if the method doesn't match or deserialization fails.
    /// Callers should use `matches_method` first to check if this type handles the method.
    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error>;
}
2191
/// Defines the "payload" of a successful response to a JSON-RPC request.
///
/// # Derive Macro
///
/// Use `#[derive(JsonRpcResponse)]` to automatically implement this trait.
/// The trait requires `Debug` and `Clone`, so derive those as well:
///
/// ```ignore
/// use agent_client_protocol::JsonRpcResponse;
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcResponse)]
/// #[response(method = "_hello")]
/// struct HelloResponse {
///     greeting: String,
/// }
/// ```
pub trait JsonRpcResponse: 'static + Debug + Sized + Send + Clone {
    /// Convert this message into a JSON value.
    ///
    /// `method` is the name of the request method this response belongs to.
    fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;

    /// Parse a JSON value into the response type.
    ///
    /// `method` is the name of the request method this response belongs to.
    fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
}
2215
2216impl JsonRpcResponse for serde_json::Value {
2217 fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
2218 Ok(value)
2219 }
2220
2221 fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
2222 Ok(self)
2223 }
2224}
2225
/// A struct that represents a notification (JSON-RPC message that does not expect a response).
///
/// This is a marker trait: it adds no methods beyond those of [`JsonRpcMessage`].
///
/// # Derive Macro
///
/// Use `#[derive(JsonRpcNotification)]` to automatically implement both `JsonRpcMessage` and `JsonRpcNotification`:
///
/// ```ignore
/// use agent_client_protocol::JsonRpcNotification;
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcNotification)]
/// #[notification(method = "_ping")]
/// struct PingNotification {
///     timestamp: u64,
/// }
/// ```
pub trait JsonRpcNotification: JsonRpcMessage {}
2243
/// A struct that represents a request (JSON-RPC message expecting a response).
///
/// # Derive Macro
///
/// Use `#[derive(JsonRpcRequest)]` to automatically implement both `JsonRpcMessage` and `JsonRpcRequest`.
/// The response type must implement [`JsonRpcResponse`], which requires `Clone`:
///
/// ```ignore
/// use agent_client_protocol::{JsonRpcRequest, JsonRpcResponse};
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcRequest)]
/// #[request(method = "_hello", response = HelloResponse)]
/// struct HelloRequest {
///     name: String,
/// }
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcResponse)]
/// struct HelloResponse {
///     greeting: String,
/// }
/// ```
pub trait JsonRpcRequest: JsonRpcMessage {
    /// The type of data expected in response.
    type Response: JsonRpcResponse;
}
2269
/// An enum capturing an in-flight request, notification, or response to a
/// request we sent.
///
/// In the case of a request, also includes the context used to respond to the request.
/// In the case of a response, also includes the context used to route it onward.
///
/// Type parameters allow specifying the concrete request and notification types.
/// By default, both are `UntypedMessage` for dynamic dispatch.
/// The request context's response type matches the request's response type.
#[derive(Debug)]
pub enum Dispatch<Req: JsonRpcRequest = UntypedMessage, Notif: JsonRpcMessage = UntypedMessage> {
    /// Incoming request and the context where the response should be sent.
    Request(Req, Responder<Req::Response>),

    /// Incoming notification.
    Notification(Notif),

    /// Incoming response to a request we sent.
    ///
    /// The first field is the response result (success or error from the remote).
    /// The second field is the context for forwarding the response to its destination
    /// (typically a waiting oneshot channel).
    Response(
        Result<Req::Response, crate::Error>,
        ResponseRouter<Req::Response>,
    ),
}
2294
2295impl<Req: JsonRpcRequest, Notif: JsonRpcMessage> Dispatch<Req, Notif> {
2296 /// Map the request and notification types to new types.
2297 ///
2298 /// Note: Response variants are passed through unchanged since they don't
2299 /// contain a parseable message payload.
2300 pub fn map<Req1, Notif1>(
2301 self,
2302 map_request: impl FnOnce(Req, Responder<Req::Response>) -> (Req1, Responder<Req1::Response>),
2303 map_notification: impl FnOnce(Notif) -> Notif1,
2304 ) -> Dispatch<Req1, Notif1>
2305 where
2306 Req1: JsonRpcRequest<Response = Req::Response>,
2307 Notif1: JsonRpcMessage,
2308 {
2309 match self {
2310 Dispatch::Request(request, responder) => {
2311 let (new_request, new_responder) = map_request(request, responder);
2312 Dispatch::Request(new_request, new_responder)
2313 }
2314 Dispatch::Notification(notification) => {
2315 let new_notification = map_notification(notification);
2316 Dispatch::Notification(new_notification)
2317 }
2318 Dispatch::Response(result, router) => Dispatch::Response(result, router),
2319 }
2320 }
2321
2322 /// Respond to the message with an error.
2323 ///
2324 /// If this message is a request, this error becomes the reply to the request.
2325 ///
2326 /// If this message is a notification, the error is sent as a notification.
2327 ///
2328 /// If this message is a response, the error is forwarded to the waiting handler.
2329 pub fn respond_with_error<R: Role>(
2330 self,
2331 error: crate::Error,
2332 cx: ConnectionTo<R>,
2333 ) -> Result<(), crate::Error> {
2334 match self {
2335 Dispatch::Request(_, responder) => responder.respond_with_error(error),
2336 Dispatch::Notification(_) => cx.send_error_notification(error),
2337 Dispatch::Response(_, responder) => responder.respond_with_error(error),
2338 }
2339 }
2340
2341 /// Convert to a `Responder` that expects a JSON value
2342 /// and which checks (dynamically) that the JSON value it receives
2343 /// can be converted to `T`.
2344 ///
2345 /// Note: Response variants cannot be erased since their payload is already
2346 /// parsed. This returns an error for Response variants.
2347 pub fn erase_to_json(self) -> Result<Dispatch, crate::Error> {
2348 match self {
2349 Dispatch::Request(response, responder) => Ok(Dispatch::Request(
2350 response.to_untyped_message()?,
2351 responder.erase_to_json(),
2352 )),
2353 Dispatch::Notification(notification) => {
2354 Ok(Dispatch::Notification(notification.to_untyped_message()?))
2355 }
2356 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2357 "cannot erase Response variant to JSON",
2358 )),
2359 }
2360 }
2361
2362 /// Convert the message in self to an untyped message.
2363 ///
2364 /// Note: Response variants don't have an untyped message representation.
2365 /// This returns an error for Response variants.
2366 pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2367 match self {
2368 Dispatch::Request(request, _) => request.to_untyped_message(),
2369 Dispatch::Notification(notification) => notification.to_untyped_message(),
2370 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2371 "Response variant has no untyped message representation",
2372 )),
2373 }
2374 }
2375
2376 /// Convert self to an untyped message context.
2377 ///
2378 /// Note: Response variants cannot be converted. This returns an error for Response variants.
2379 pub fn into_untyped_dispatch(self) -> Result<Dispatch, crate::Error> {
2380 match self {
2381 Dispatch::Request(request, responder) => Ok(Dispatch::Request(
2382 request.to_untyped_message()?,
2383 responder.erase_to_json(),
2384 )),
2385 Dispatch::Notification(notification) => {
2386 Ok(Dispatch::Notification(notification.to_untyped_message()?))
2387 }
2388 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2389 "cannot convert Response variant to untyped message context",
2390 )),
2391 }
2392 }
2393
2394 /// Returns the request ID if this is a request or response, None if notification.
2395 pub fn id(&self) -> Option<serde_json::Value> {
2396 match self {
2397 Dispatch::Request(_, cx) => Some(cx.id()),
2398 Dispatch::Notification(_) => None,
2399 Dispatch::Response(_, cx) => Some(cx.id()),
2400 }
2401 }
2402
2403 /// Returns the method of the message.
2404 ///
2405 /// For requests and notifications, this is the method from the message payload.
2406 /// For responses, this is the method of the original request.
2407 pub fn method(&self) -> &str {
2408 match self {
2409 Dispatch::Request(msg, _) => msg.method(),
2410 Dispatch::Notification(msg) => msg.method(),
2411 Dispatch::Response(_, cx) => cx.method(),
2412 }
2413 }
2414}
2415
2416impl Dispatch {
2417 /// Attempts to parse `self` into a typed message context.
2418 ///
2419 /// # Returns
2420 ///
2421 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2422 /// * `Ok(Err(self))` if not
2423 /// * `Err` if has the correct method for the given types but parsing fails
2424 #[tracing::instrument(skip(self), fields(Request = ?std::any::type_name::<Req>(), Notif = ?std::any::type_name::<Notif>()), level = "trace", ret)]
2425 pub(crate) fn into_typed_dispatch<Req: JsonRpcRequest, Notif: JsonRpcNotification>(
2426 self,
2427 ) -> Result<Result<Dispatch<Req, Notif>, Dispatch>, crate::Error> {
2428 tracing::debug!(
2429 message = ?self,
2430 "into_typed_dispatch"
2431 );
2432 match self {
2433 Dispatch::Request(message, responder) => {
2434 if Req::matches_method(&message.method) {
2435 match Req::parse_message(&message.method, &message.params) {
2436 Ok(req) => {
2437 tracing::trace!(?req, "parsed ok");
2438 Ok(Ok(Dispatch::Request(req, responder.cast())))
2439 }
2440 Err(err) => {
2441 tracing::trace!(?err, "parse error");
2442 Err(err)
2443 }
2444 }
2445 } else {
2446 tracing::trace!("method doesn't match");
2447 Ok(Err(Dispatch::Request(message, responder)))
2448 }
2449 }
2450
2451 Dispatch::Notification(message) => {
2452 if Notif::matches_method(&message.method) {
2453 match Notif::parse_message(&message.method, &message.params) {
2454 Ok(notif) => {
2455 tracing::trace!(?notif, "parse ok");
2456 Ok(Ok(Dispatch::Notification(notif)))
2457 }
2458 Err(err) => {
2459 tracing::trace!(?err, "parse error");
2460 Err(err)
2461 }
2462 }
2463 } else {
2464 tracing::trace!("method doesn't match");
2465 Ok(Err(Dispatch::Notification(message)))
2466 }
2467 }
2468
2469 Dispatch::Response(result, cx) => {
2470 let method = cx.method();
2471 if Req::matches_method(method) {
2472 // Parse the response result
2473 let typed_result = match result {
2474 Ok(value) => {
2475 match <Req::Response as JsonRpcResponse>::from_value(method, value) {
2476 Ok(parsed) => {
2477 tracing::trace!(?parsed, "parse ok");
2478 Ok(parsed)
2479 }
2480 Err(err) => {
2481 tracing::trace!(?err, "parse error");
2482 return Err(err);
2483 }
2484 }
2485 }
2486 Err(err) => {
2487 tracing::trace!("error, passthrough");
2488 Err(err)
2489 }
2490 };
2491 Ok(Ok(Dispatch::Response(typed_result, cx.cast())))
2492 } else {
2493 tracing::trace!("method doesn't match");
2494 Ok(Err(Dispatch::Response(result, cx)))
2495 }
2496 }
2497 }
2498 }
2499
2500 /// True if this message has a field with the given name.
2501 ///
2502 /// Returns `false` for Response variants.
2503 #[must_use]
2504 pub fn has_field(&self, field_name: &str) -> bool {
2505 self.message()
2506 .and_then(|m| m.params().get(field_name))
2507 .is_some()
2508 }
2509
2510 /// Returns true if this message has a session-id field.
2511 ///
2512 /// Returns `false` for Response variants.
2513 pub(crate) fn has_session_id(&self) -> bool {
2514 self.has_field("sessionId")
2515 }
2516
2517 /// Extract the ACP session-id from this message (if any).
2518 ///
2519 /// Returns `Ok(None)` for Response variants.
2520 pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
2521 let Some(message) = self.message() else {
2522 return Ok(None);
2523 };
2524 let Some(value) = message.params().get("sessionId") else {
2525 return Ok(None);
2526 };
2527 let session_id = serde_json::from_value(value.clone())?;
2528 Ok(Some(session_id))
2529 }
2530
2531 /// Try to parse this as a notification of the given type.
2532 ///
2533 /// # Returns
2534 ///
2535 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2536 /// * `Ok(Err(self))` if not
2537 /// * `Err` if has the correct method for the given types but parsing fails
2538 pub fn into_notification<N: JsonRpcNotification>(
2539 self,
2540 ) -> Result<Result<N, Dispatch>, crate::Error> {
2541 match self {
2542 Dispatch::Notification(msg) => {
2543 if !N::matches_method(&msg.method) {
2544 return Ok(Err(Dispatch::Notification(msg)));
2545 }
2546 match N::parse_message(&msg.method, &msg.params) {
2547 Ok(n) => Ok(Ok(n)),
2548 Err(err) => Err(err),
2549 }
2550 }
2551 Dispatch::Request(..) | Dispatch::Response(..) => Ok(Err(self)),
2552 }
2553 }
2554
2555 /// Try to parse this as a request of the given type.
2556 ///
2557 /// # Returns
2558 ///
2559 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2560 /// * `Ok(Err(self))` if not
2561 /// * `Err` if has the correct method for the given types but parsing fails
2562 pub fn into_request<Req: JsonRpcRequest>(
2563 self,
2564 ) -> Result<Result<(Req, Responder<Req::Response>), Dispatch>, crate::Error> {
2565 match self {
2566 Dispatch::Request(msg, responder) => {
2567 if !Req::matches_method(&msg.method) {
2568 return Ok(Err(Dispatch::Request(msg, responder)));
2569 }
2570 match Req::parse_message(&msg.method, &msg.params) {
2571 Ok(req) => Ok(Ok((req, responder.cast()))),
2572 Err(err) => Err(err),
2573 }
2574 }
2575 Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Err(self)),
2576 }
2577 }
2578}
2579
2580impl<M: JsonRpcRequest + JsonRpcNotification> Dispatch<M, M> {
2581 /// Returns the message payload for requests and notifications.
2582 ///
2583 /// Returns `None` for Response variants since they don't contain a message payload.
2584 pub fn message(&self) -> Option<&M> {
2585 match self {
2586 Dispatch::Request(msg, _) | Dispatch::Notification(msg) => Some(msg),
2587 Dispatch::Response(_, _) => None,
2588 }
2589 }
2590
2591 /// Map the request/notification message.
2592 ///
2593 /// Response variants pass through unchanged.
2594 pub(crate) fn try_map_message(
2595 self,
2596 map_message: impl FnOnce(M) -> Result<M, crate::Error>,
2597 ) -> Result<Dispatch<M, M>, crate::Error> {
2598 match self {
2599 Dispatch::Request(request, cx) => Ok(Dispatch::Request(map_message(request)?, cx)),
2600 Dispatch::Notification(notification) => {
2601 Ok(Dispatch::<M, M>::Notification(map_message(notification)?))
2602 }
2603 Dispatch::Response(result, cx) => Ok(Dispatch::Response(result, cx)),
2604 }
2605 }
2606}
2607
/// An incoming JSON message without any typing. Can be a request or a notification.
///
/// It is just a method name paired with its parameters as raw JSON. Typed
/// messages convert into this form via [`JsonRpcMessage::to_untyped_message`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UntypedMessage {
    /// The JSON-RPC method name
    pub method: String,
    /// The JSON-RPC parameters as a raw JSON value
    pub params: serde_json::Value,
}
2616
2617impl UntypedMessage {
2618 /// Returns an untyped message with the given method and parameters.
2619 pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
2620 let params = serde_json::to_value(params)?;
2621 Ok(Self {
2622 method: method.to_string(),
2623 params,
2624 })
2625 }
2626
2627 /// Returns the method name
2628 #[must_use]
2629 pub fn method(&self) -> &str {
2630 &self.method
2631 }
2632
2633 /// Returns the parameters as a JSON value
2634 #[must_use]
2635 pub fn params(&self) -> &serde_json::Value {
2636 &self.params
2637 }
2638
2639 /// Consumes this message and returns the method and params
2640 #[must_use]
2641 pub fn into_parts(self) -> (String, serde_json::Value) {
2642 (self.method, self.params)
2643 }
2644
2645 /// Convert `self` to a JSON-RPC message.
2646 pub(crate) fn into_jsonrpc_msg(
2647 self,
2648 id: Option<jsonrpcmsg::Id>,
2649 ) -> Result<jsonrpcmsg::Request, crate::Error> {
2650 let Self { method, params } = self;
2651 Ok(jsonrpcmsg::Request::new_v2(method, json_cast(params)?, id))
2652 }
2653}
2654
2655impl JsonRpcMessage for UntypedMessage {
2656 fn matches_method(_method: &str) -> bool {
2657 // UntypedMessage matches any method - it's the untyped fallback
2658 true
2659 }
2660
2661 fn method(&self) -> &str {
2662 &self.method
2663 }
2664
2665 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2666 Ok(self.clone())
2667 }
2668
2669 fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error> {
2670 UntypedMessage::new(method, params)
2671 }
2672}
2673
// `UntypedMessage` has no typed schema, so it can be used as a request whose
// response is surfaced as a raw JSON value.
impl JsonRpcRequest for UntypedMessage {
    /// Responses to untyped requests are exposed as raw JSON.
    type Response = serde_json::Value;
}
2677
// `UntypedMessage` may also be used as a notification; the impl defines no items
// of its own here.
impl JsonRpcNotification for UntypedMessage {}
2679
2680/// Represents a pending response of type `R` from an outgoing request.
2681///
2682/// Returned by [`ConnectionTo::send_request`], this type provides methods for handling
2683/// the response without blocking the event loop. The API is intentionally designed to make
2684/// it difficult to accidentally block.
2685///
2686/// # Anti-Footgun Design
2687///
2688/// You cannot directly `.await` a `SentRequest`. Instead, you must choose how to handle
2689/// the response:
2690///
2691/// ## Option 1: Schedule a Callback (Safe in Handlers)
2692///
2693/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
2694/// that runs when the response arrives. This doesn't block the event loop:
2695///
2696/// ```no_run
2697/// # use agent_client_protocol_test::*;
2698/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2699/// cx.send_request(MyRequest {})
2700/// .on_receiving_result(async |result| {
2701/// match result {
2702/// Ok(response) => {
2703/// // Handle successful response
2704/// Ok(())
2705/// }
2706/// Err(error) => {
2707/// // Handle error
2708/// Err(error)
2709/// }
2710/// }
2711/// })?;
2712/// # Ok(())
2713/// # }
2714/// ```
2715///
2716/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
2717///
2718/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
2719/// in a spawned task (never in a handler):
2720///
2721/// ```no_run
2722/// # use agent_client_protocol_test::*;
2723/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2724/// // ✅ Safe: Spawned task runs concurrently
2725/// cx.spawn({
2726/// let cx = cx.clone();
2727/// async move {
2728/// let response = cx.send_request(MyRequest {})
2729/// .block_task()
2730/// .await?;
2731/// // Process response...
2732/// Ok(())
2733/// }
2734/// })?;
2735/// # Ok(())
2736/// # }
2737/// ```
2738///
2739/// ```no_run
2740/// # use agent_client_protocol_test::*;
2741/// # async fn example() -> Result<(), agent_client_protocol::Error> {
2742/// # let connection = mock_connection();
2743/// // ❌ NEVER do this in a handler - blocks the event loop!
2744/// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2745/// let response = cx.send_request(MyRequest {})
2746/// .block_task() // This will deadlock!
2747/// .await?;
2748/// responder.respond(response)
2749/// }, agent_client_protocol::on_receive_request!())
2750/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2751/// # Ok(())
2752/// # }
2753/// ```
2754///
2755/// # Why This Design?
2756///
2757/// If you block the event loop while waiting for a response, the connection cannot process
2758/// the incoming response message, creating a deadlock. This API design prevents that footgun
2759/// by making blocking explicit and encouraging non-blocking patterns.
pub struct SentRequest<T> {
    /// Id of the outgoing request; exposed as JSON through `id()`.
    id: jsonrpcmsg::Id,
    /// Method name of the outgoing request; also used in the "never received" error text.
    method: String,
    /// Sender on which `on_receiving_result` spawns the task that waits for the response.
    task_tx: TaskTx,
    /// One-shot receiver that yields the response payload (the result plus an optional ack sender).
    response_rx: oneshot::Receiver<ResponsePayload>,
    /// Converts the raw JSON result into `T`; `new` starts with `Ok` and `map` wraps it further.
    to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
}
2767
2768impl<T: Debug> Debug for SentRequest<T> {
2769 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2770 f.debug_struct("SentRequest")
2771 .field("id", &self.id)
2772 .field("method", &self.method)
2773 .field("task_tx", &self.task_tx)
2774 .field("response_rx", &self.response_rx)
2775 .finish_non_exhaustive()
2776 }
2777}
2778
2779impl SentRequest<serde_json::Value> {
2780 fn new(
2781 id: jsonrpcmsg::Id,
2782 method: String,
2783 task_tx: mpsc::UnboundedSender<Task>,
2784 response_rx: oneshot::Receiver<ResponsePayload>,
2785 ) -> Self {
2786 Self {
2787 id,
2788 method,
2789 response_rx,
2790 task_tx,
2791 to_result: Box::new(Ok),
2792 }
2793 }
2794}
2795
2796impl<T: JsonRpcResponse> SentRequest<T> {
2797 /// The id of the outgoing request.
2798 #[must_use]
2799 pub fn id(&self) -> serde_json::Value {
2800 crate::util::id_to_json(&self.id)
2801 }
2802
2803 /// The method of the request this is in response to.
2804 #[must_use]
2805 pub fn method(&self) -> &str {
2806 &self.method
2807 }
2808
2809 /// Create a new response that maps the result of the response to a new type.
2810 pub fn map<U>(
2811 self,
2812 map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
2813 ) -> SentRequest<U> {
2814 SentRequest {
2815 id: self.id,
2816 method: self.method,
2817 response_rx: self.response_rx,
2818 task_tx: self.task_tx,
2819 to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
2820 }
2821 }
2822
2823 /// Forward the response (success or error) to a request context when it arrives.
2824 ///
2825 /// This is a convenience method for proxying messages between connections. When the
2826 /// response arrives, it will be automatically sent to the provided request context,
2827 /// whether it's a successful response or an error.
2828 ///
2829 /// # Example: Proxying requests
2830 ///
2831 /// ```
2832 /// # use agent_client_protocol::UntypedRole;
2833 /// # use agent_client_protocol::{Builder, ConnectionTo};
2834 /// # use agent_client_protocol_test::*;
2835 /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2836 /// // Set up backend connection builder
2837 /// let backend = UntypedRole.builder()
2838 /// .on_receive_request(async |req: MyRequest, responder, cx| {
2839 /// responder.respond(MyResponse { status: "ok".into() })
2840 /// }, agent_client_protocol::on_receive_request!());
2841 ///
2842 /// // Spawn backend and get a context to send to it
2843 /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
2844 ///
2845 /// // Set up proxy that forwards requests to backend
2846 /// UntypedRole.builder()
2847 /// .on_receive_request({
2848 /// let backend_connection = backend_connection.clone();
2849 /// async move |req: MyRequest, responder, cx| {
2850 /// // Forward the request to backend and proxy the response back
2851 /// backend_connection.send_request(req)
2852 /// .forward_response_to(responder)?;
2853 /// Ok(())
2854 /// }
2855 /// }, agent_client_protocol::on_receive_request!());
2856 /// # Ok(())
2857 /// # }
2858 /// ```
2859 ///
2860 /// # Type Safety
2861 ///
2862 /// The request context's response type must match the request's response type,
2863 /// ensuring type-safe message forwarding.
2864 ///
2865 /// # When to Use
2866 ///
2867 /// Use this when:
2868 /// - You're implementing a proxy or gateway pattern
2869 /// - You want to forward responses without processing them
2870 /// - The response types match between the outgoing request and incoming request
2871 ///
2872 /// This is equivalent to calling `on_receiving_result` and manually forwarding
2873 /// the result, but more concise.
2874 pub fn forward_response_to(self, responder: Responder<T>) -> Result<(), crate::Error>
2875 where
2876 T: Send,
2877 {
2878 self.on_receiving_result(async move |result| responder.respond_with_result(result))
2879 }
2880
2881 /// Block the current task until the response is received.
2882 ///
2883 /// **Warning:** This method blocks the current async task. It is **only safe** to use
2884 /// in spawned tasks created with [`ConnectionTo::spawn`]. Using it directly in a
2885 /// handler callback will deadlock the connection.
2886 ///
2887 /// # Safe Usage (in spawned tasks)
2888 ///
2889 /// ```no_run
2890 /// # use agent_client_protocol_test::*;
2891 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2892 /// # let connection = mock_connection();
2893 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2894 /// // Spawn a task to handle the request
2895 /// cx.spawn({
2896 /// let connection = cx.clone();
2897 /// async move {
2898 /// // Safe: We're in a spawned task, not blocking the event loop
2899 /// let response = connection.send_request(OtherRequest {})
2900 /// .block_task()
2901 /// .await?;
2902 ///
2903 /// // Process the response...
2904 /// Ok(())
2905 /// }
2906 /// })?;
2907 ///
2908 /// // Respond immediately
2909 /// responder.respond(MyResponse { status: "ok".into() })
2910 /// }, agent_client_protocol::on_receive_request!())
2911 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2912 /// # Ok(())
2913 /// # }
2914 /// ```
2915 ///
2916 /// # Unsafe Usage (in handlers - will deadlock!)
2917 ///
2918 /// ```no_run
2919 /// # use agent_client_protocol_test::*;
2920 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2921 /// # let connection = mock_connection();
2922 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2923 /// // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
2924 /// let response = cx.send_request(OtherRequest {})
2925 /// .block_task()
2926 /// .await?;
2927 ///
2928 /// responder.respond(MyResponse { status: response.value })
2929 /// }, agent_client_protocol::on_receive_request!())
2930 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2931 /// # Ok(())
2932 /// # }
2933 /// ```
2934 ///
2935 /// # When to Use
2936 ///
2937 /// Use this method when:
2938 /// - You're in a spawned task (via [`ConnectionTo::spawn`])
2939 /// - You need the response value to proceed with your logic
2940 /// - Linear control flow is more natural than callbacks
2941 ///
2942 /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
2943 pub async fn block_task(self) -> Result<T, crate::Error>
2944 where
2945 T: Send,
2946 {
2947 match self.response_rx.await {
2948 Ok(ResponsePayload {
2949 result: Ok(json_value),
2950 ack_tx,
2951 }) => {
2952 // Ack immediately - we're in a spawned task, so the dispatch loop
2953 // can continue while we process the value.
2954 if let Some(tx) = ack_tx {
2955 let _ = tx.send(());
2956 }
2957 match (self.to_result)(json_value) {
2958 Ok(value) => Ok(value),
2959 Err(err) => Err(err),
2960 }
2961 }
2962 Ok(ResponsePayload {
2963 result: Err(err),
2964 ack_tx,
2965 }) => {
2966 if let Some(tx) = ack_tx {
2967 let _ = tx.send(());
2968 }
2969 Err(err)
2970 }
2971 Err(err) => Err(crate::util::internal_error(format!(
2972 "response to `{}` never received: {}",
2973 self.method, err
2974 ))),
2975 }
2976 }
2977
2978 /// Schedule an async task to run when a successful response is received.
2979 ///
2980 /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
2981 /// for the common pattern of forwarding errors to a request context while only processing
2982 /// successful responses.
2983 ///
2984 /// # Behavior
2985 ///
2986 /// - If the response is `Ok(value)`, your task receives the value and the request context
2987 /// - If the response is `Err(error)`, the error is automatically sent to `responder`
2988 /// and your task is not called
2989 ///
2990 /// # Example: Chaining requests
2991 ///
2992 /// ```no_run
2993 /// # use agent_client_protocol_test::*;
2994 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2995 /// # let connection = mock_connection();
2996 /// connection.on_receive_request(async |req: ValidateRequest, responder, cx| {
2997 /// // Send initial request
2998 /// cx.send_request(ValidateRequest { data: req.data.clone() })
2999 /// .on_receiving_ok_result(responder, async |validation, responder| {
3000 /// // Only runs if validation succeeded
3001 /// if validation.is_valid {
3002 /// // Respond to original request
3003 /// responder.respond(ValidateResponse { is_valid: true, error: None })
3004 /// } else {
3005 /// responder.respond_with_error(agent_client_protocol::util::internal_error("validation failed"))
3006 /// }
3007 /// })?;
3008 ///
3009 /// Ok(())
3010 /// }, agent_client_protocol::on_receive_request!())
3011 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3012 /// # Ok(())
3013 /// # }
3014 /// ```
3015 ///
3016 /// # Ordering
3017 ///
3018 /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
3019 /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
3020 /// for details.
3021 ///
3022 /// # When to Use
3023 ///
3024 /// Use this when:
3025 /// - You need to respond to a request based on another request's result
3026 /// - You want errors to automatically propagate to the request context
3027 /// - You only care about the success case
3028 ///
3029 /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
3030 #[track_caller]
3031 pub fn on_receiving_ok_result<F>(
3032 self,
3033 responder: Responder<T>,
3034 task: impl FnOnce(T, Responder<T>) -> F + 'static + Send,
3035 ) -> Result<(), crate::Error>
3036 where
3037 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3038 T: Send,
3039 {
3040 self.on_receiving_result(async move |result| match result {
3041 Ok(value) => task(value, responder).await,
3042 Err(err) => responder.respond_with_error(err),
3043 })
3044 }
3045
3046 /// Schedule an async task to run when the response is received.
3047 ///
3048 /// This is the recommended way to handle responses in handler callbacks, as it doesn't
3049 /// block the event loop. The task will be spawned automatically when the response arrives.
3050 ///
3051 /// # Example: Handle response in callback
3052 ///
3053 /// ```no_run
3054 /// # use agent_client_protocol_test::*;
3055 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3056 /// # let connection = mock_connection();
3057 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
3058 /// // Send a request and schedule a callback for the response
3059 /// cx.send_request(QueryRequest { id: 22 })
3060 /// .on_receiving_result({
3061 /// let connection = cx.clone();
3062 /// async move |result| {
3063 /// match result {
3064 /// Ok(response) => {
3065 /// println!("Got response: {:?}", response);
3066 /// // Can send more messages here
3067 /// connection.send_notification(QueryComplete {})?;
3068 /// Ok(())
3069 /// }
3070 /// Err(error) => {
3071 /// eprintln!("Request failed: {}", error);
3072 /// Err(error)
3073 /// }
3074 /// }
3075 /// }
3076 /// })?;
3077 ///
3078 /// // Handler continues immediately without waiting
3079 /// responder.respond(MyResponse { status: "processing".into() })
3080 /// }, agent_client_protocol::on_receive_request!())
3081 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3082 /// # Ok(())
3083 /// # }
3084 /// ```
3085 ///
3086 /// # Ordering
3087 ///
3088 /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
3089 /// before processing the next message. This gives you ordering guarantees: no other
3090 /// messages will be processed while your callback runs.
3091 ///
3092 /// This differs from [`block_task`](Self::block_task), which signals completion immediately
3093 /// upon receiving the response (before your code processes it).
3094 ///
3095 /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
3096 /// and how to avoid deadlocks.
3097 ///
3098 /// # Error Handling
3099 ///
3100 /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
3101 /// errors appropriately within your task.
3102 ///
3103 /// # When to Use
3104 ///
3105 /// Use this method when:
3106 /// - You're in a handler callback (not a spawned task)
3107 /// - You want ordering guarantees (no other messages processed during your callback)
3108 /// - You need to do async work before "releasing" control back to the dispatch loop
3109 ///
3110 /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
3111 #[track_caller]
3112 pub fn on_receiving_result<F>(
3113 self,
3114 task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
3115 ) -> Result<(), crate::Error>
3116 where
3117 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3118 T: Send,
3119 {
3120 let task_tx = self.task_tx.clone();
3121 let method = self.method;
3122 let response_rx = self.response_rx;
3123 let to_result = self.to_result;
3124 let location = Location::caller();
3125
3126 Task::new(location, async move {
3127 match response_rx.await {
3128 Ok(ResponsePayload { result, ack_tx }) => {
3129 // Convert the result using to_result for Ok values
3130 let typed_result = match result {
3131 Ok(json_value) => to_result(json_value),
3132 Err(err) => Err(err),
3133 };
3134
3135 // Run the user's callback
3136 let outcome = task(typed_result).await;
3137
3138 // Ack AFTER the callback completes - this is the key difference
3139 // from block_task. The dispatch loop waits for this ack.
3140 if let Some(tx) = ack_tx {
3141 let _ = tx.send(());
3142 }
3143
3144 outcome
3145 }
3146 Err(err) => Err(crate::util::internal_error(format!(
3147 "response to `{method}` never received: {err}"
3148 ))),
3149 }
3150 })
3151 .spawn(&task_tx)
3152 }
3153}
3154
// ============================================================================
// Transport components (`ConnectTo` implementations)
// ============================================================================
3158
3159/// A component that communicates over line streams.
3160///
3161/// `Lines` implements the [`ConnectTo`] trait for any pair of line-based streams
3162/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
3163/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3164///
3165/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
3166/// transformation of individual lines before they are parsed or after they are serialized.
3167/// This is particularly useful for debugging, logging, or implementing custom line-based
3168/// protocols.
3169///
3170/// # Use Cases
3171///
3172/// - **Line-by-line logging**: Intercept and log each line before parsing
3173/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
3174/// - **Debugging**: Inspect raw message strings
3175/// - **Line filtering**: Skip or modify specific messages
3176///
3177/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
3178/// for byte-based I/O.
3179///
3180/// [`ConnectTo`]: crate::ConnectTo
#[derive(Debug)]
pub struct Lines<OutgoingSink, IncomingStream> {
    /// Outgoing line sink (where we write serialized JSON-RPC messages).
    ///
    /// The `ConnectTo` implementation requires a
    /// `futures::Sink<String, Error = std::io::Error>`.
    pub outgoing: OutgoingSink,
    /// Incoming line stream (where we read and parse JSON-RPC messages).
    ///
    /// The `ConnectTo` implementation requires a
    /// `futures::Stream<Item = std::io::Result<String>>`.
    pub incoming: IncomingStream,
}
3188
3189impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
3190where
3191 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3192 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3193{
3194 /// Create a new line stream transport.
3195 pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
3196 Self { outgoing, incoming }
3197 }
3198}
3199
3200impl<OutgoingSink, IncomingStream, R: Role> ConnectTo<R> for Lines<OutgoingSink, IncomingStream>
3201where
3202 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3203 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3204{
3205 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3206 let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3207 match futures::future::select(Box::pin(client.connect_to(channel)), serve_self).await {
3208 Either::Left((result, _)) | Either::Right((result, _)) => result,
3209 }
3210 }
3211
3212 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3213 let Self { outgoing, incoming } = self;
3214
3215 // Create a channel pair for the client to use
3216 let (channel_for_caller, channel_for_lines) = Channel::duplex();
3217
3218 // Create the server future that runs the line stream actors
3219 let server_future = Box::pin(async move {
3220 let Channel { rx, tx } = channel_for_lines;
3221
3222 // Run both actors concurrently
3223 let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
3224 let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
3225
3226 // Wait for both to complete
3227 futures::try_join!(outgoing_future, incoming_future)?;
3228
3229 Ok(())
3230 });
3231
3232 (channel_for_caller, server_future)
3233 }
3234}
3235
3236/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
3237///
3238/// `ByteStreams` implements the [`ConnectTo`] trait for any pair of `AsyncRead` and `AsyncWrite`
3239/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3240/// This is the standard way to communicate with external processes or network connections.
3241///
3242/// # Use Cases
3243///
3244/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
3245/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
3246/// - **Named pipes**: Cross-process communication on the same machine
3247/// - **File I/O**: Reading from and writing to file descriptors
3248///
3249/// # Example
3250///
3251/// Connecting to an agent via stdio:
3252///
3253/// ```no_run
3254/// use agent_client_protocol::UntypedRole;
3255/// # use agent_client_protocol::{ByteStreams};
3256/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
3257///
3258/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3259/// let component = ByteStreams::new(
3260/// tokio::io::stdout().compat_write(),
3261/// tokio::io::stdin().compat(),
3262/// );
3263///
3264/// // Use as a component in a connection
3265/// agent_client_protocol::UntypedRole.builder()
3266/// .name("my-client")
3267/// .connect_to(component)
3268/// .await?;
3269/// # Ok(())
3270/// # }
3271/// ```
3272///
3273/// [`ConnectTo`]: crate::ConnectTo
#[derive(Debug)]
pub struct ByteStreams<OB, IB> {
    /// Outgoing byte stream (where we write serialized messages).
    ///
    /// The `ConnectTo` implementation requires `AsyncWrite` and writes each
    /// message followed by a `\n` byte.
    pub outgoing: OB,
    /// Incoming byte stream (where we read and parse messages).
    ///
    /// The `ConnectTo` implementation requires `AsyncRead` and reads it line by
    /// line through a `BufReader`.
    pub incoming: IB,
}
3281
3282impl<OB, IB> ByteStreams<OB, IB>
3283where
3284 OB: AsyncWrite + Send + 'static,
3285 IB: AsyncRead + Send + 'static,
3286{
3287 /// Create a new byte stream transport.
3288 pub fn new(outgoing: OB, incoming: IB) -> Self {
3289 Self { outgoing, incoming }
3290 }
3291}
3292
3293impl<OB, IB, R: Role> ConnectTo<R> for ByteStreams<OB, IB>
3294where
3295 OB: AsyncWrite + Send + 'static,
3296 IB: AsyncRead + Send + 'static,
3297{
3298 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3299 let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3300 match futures::future::select(pin!(client.connect_to(channel)), serve_self).await {
3301 Either::Left((result, _)) | Either::Right((result, _)) => result,
3302 }
3303 }
3304
3305 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3306 use futures::AsyncBufReadExt;
3307 use futures::AsyncWriteExt;
3308 use futures::io::BufReader;
3309 let Self { outgoing, incoming } = self;
3310
3311 // Convert byte streams to line streams
3312 // Box both streams to satisfy Unpin requirements
3313 let incoming_lines = Box::pin(BufReader::new(incoming).lines());
3314
3315 // Create a sink that writes lines (with newlines) to the outgoing byte stream
3316 // We need to Box the writer since it may not be Unpin
3317 let outgoing_sink =
3318 futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
3319 let mut bytes = line.into_bytes();
3320 bytes.push(b'\n');
3321 writer.write_all(&bytes).await?;
3322 Ok::<_, std::io::Error>(writer)
3323 });
3324
3325 // Delegate to Lines component
3326 ConnectTo::<R>::into_channel_and_future(Lines::new(outgoing_sink, incoming_lines))
3327 }
3328}
3329
3330/// A channel endpoint representing one side of a bidirectional message channel.
3331///
3332/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
3333/// Each endpoint has:
3334/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
3335/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
3336///
3337/// # Example
3338///
3339/// ```no_run
3340/// # use agent_client_protocol::UntypedRole;
3341/// # use agent_client_protocol::{Channel, Builder};
3342/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3343/// // Create a pair of connected channels
3344/// let (channel_a, channel_b) = Channel::duplex();
3345///
3346/// // Each channel can be used by a different component
3347/// UntypedRole.builder()
3348/// .name("connection-a")
3349/// .connect_to(channel_a)
3350/// .await?;
3351/// # Ok(())
3352/// # }
3353/// ```
#[derive(Debug)]
pub struct Channel {
    /// Receives messages (or errors) from the counterpart.
    ///
    /// This is the receiving half of an unbounded `mpsc` channel.
    pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
    /// Sends messages (or errors) to the counterpart.
    ///
    /// This is the sending half of an unbounded `mpsc` channel.
    pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
}
3361
3362impl Channel {
3363 /// Create a pair of connected channel endpoints.
3364 ///
3365 /// Returns two `Channel` instances that are connected to each other:
3366 /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
3367 /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
3368 ///
3369 /// # Returns
3370 ///
3371 /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
3372 #[must_use]
3373 pub fn duplex() -> (Self, Self) {
3374 // Create channels: A sends Result<Message> which B receives as Message
3375 let (a_tx, b_rx) = mpsc::unbounded();
3376 let (b_tx, a_rx) = mpsc::unbounded();
3377
3378 let channel_a = Self { rx: a_rx, tx: a_tx };
3379 let channel_b = Self { rx: b_rx, tx: b_tx };
3380
3381 (channel_a, channel_b)
3382 }
3383
3384 /// Copy messages from `rx` to `tx`.
3385 ///
3386 /// # Returns
3387 ///
3388 /// A `Result` indicating success or failure.
3389 pub async fn copy(mut self) -> Result<(), crate::Error> {
3390 while let Some(msg) = self.rx.next().await {
3391 self.tx
3392 .unbounded_send(msg)
3393 .map_err(crate::util::internal_error)?;
3394 }
3395 Ok(())
3396 }
3397}
3398
3399impl<R: Role> ConnectTo<R> for Channel {
3400 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3401 let (client_channel, client_serve) = client.into_channel_and_future();
3402
3403 match futures::try_join!(
3404 Channel {
3405 rx: client_channel.rx,
3406 tx: self.tx
3407 }
3408 .copy(),
3409 Channel {
3410 rx: self.rx,
3411 tx: client_channel.tx
3412 }
3413 .copy(),
3414 client_serve
3415 ) {
3416 Ok(((), (), ())) => Ok(()),
3417 Err(err) => Err(err),
3418 }
3419 }
3420
3421 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3422 (self, Box::pin(future::ready(Ok(()))))
3423 }
3424}