agent_client_protocol/jsonrpc.rs
1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::SessionId;
4// Re-export jsonrpcmsg for use in public API
5pub use jsonrpcmsg;
6
7// Types re-exported from crate root
8use serde::{Deserialize, Serialize};
9use std::fmt::Debug;
10use std::panic::Location;
11use std::pin::pin;
12use uuid::Uuid;
13
14use boxfnonce::SendBoxFnOnce;
15use futures::channel::{mpsc, oneshot};
16use futures::future::{self, BoxFuture, Either};
17use futures::{AsyncRead, AsyncWrite, StreamExt};
18
19mod dynamic_handler;
20pub(crate) mod handlers;
21mod incoming_actor;
22mod outgoing_actor;
23pub(crate) mod run;
24mod task_actor;
25mod transport_actor;
26
27use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
28pub use crate::jsonrpc::handlers::NullHandler;
29use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
30use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
31use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
32use crate::jsonrpc::run::SpawnedRun;
33use crate::jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo};
34use crate::jsonrpc::task_actor::{Task, TaskTx};
35use crate::mcp_server::McpServer;
36use crate::role::HasPeer;
37use crate::role::Role;
38use crate::util::json_cast;
39use crate::{Agent, Client, ConnectTo, RoleId};
40
41/// Handlers process incoming JSON-RPC messages on a connection.
42///
43/// When messages arrive, they flow through a chain of handlers. Each handler can
44/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
45///
46/// # Message Flow
47///
48/// Messages flow through three layers of handlers in order:
49///
50/// ```text
51/// ┌─────────────────────────────────────────────────────────────────┐
52/// │ Incoming Message │
53/// └─────────────────────────────────────────────────────────────────┘
54/// │
55/// ▼
56/// ┌─────────────────────────────────────────────────────────────────┐
57/// │ 1. User Handlers (registered via on_receive_request, etc.) │
58/// │ - Tried in registration order │
59/// │ - First handler to return Handled::Yes claims the message │
60/// └─────────────────────────────────────────────────────────────────┘
61/// │ Handled::No
62/// ▼
63/// ┌─────────────────────────────────────────────────────────────────┐
64/// │ 2. Dynamic Handlers (added at runtime) │
65/// │ - Used for session-specific message handling │
66/// │ - Added via ConnectionTo::add_dynamic_handler │
67/// └─────────────────────────────────────────────────────────────────┘
68/// │ Handled::No
69/// ▼
70/// ┌─────────────────────────────────────────────────────────────────┐
71/// │ 3. Role Default Handler │
72/// │ - Fallback based on the connection's Role │
73/// │ - Handles protocol-level messages (e.g., proxy forwarding) │
74/// └─────────────────────────────────────────────────────────────────┘
75/// │ Handled::No
76/// ▼
77/// ┌─────────────────────────────────────────────────────────────────┐
78/// │ Unhandled: Error response sent (or queued if retry=true) │
79/// └─────────────────────────────────────────────────────────────────┘
80/// ```
81///
82/// # The `Handled` Return Value
83///
84/// Each handler returns [`Handled`] to indicate whether it processed the message:
85///
86/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
87/// - **`Handled::No { message, retry }`** - Message was not handled. The message
88/// (possibly modified) is passed to the next handler in the chain.
89///
90/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
91///
92/// # The Retry Mechanism
93///
94/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
95///
96/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
97/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
98///
99/// This mechanism exists because of a timing issue with sessions: when a `session/new`
100/// response is being processed, the dynamic handler for that session hasn't been registered
101/// yet, but `session/update` notifications for that session may already be arriving.
102/// By setting `retry: true`, these early notifications are queued until the session's
103/// dynamic handler is added.
104///
105/// # Handler Registration
106///
107/// Most users register handlers using the builder methods on [`Builder`]:
108///
109/// ```
110/// # use agent_client_protocol::{Agent, Client, ConnectTo};
111/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, AgentCapabilities};
112/// # use agent_client_protocol_test::StatusUpdate;
113/// # async fn example(transport: impl ConnectTo<Agent>) -> Result<(), agent_client_protocol::Error> {
114/// Agent.builder()
115/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
116/// responder.respond(
117/// InitializeResponse::new(req.protocol_version)
118/// .agent_capabilities(AgentCapabilities::new()),
119/// )
120/// }, agent_client_protocol::on_receive_request!())
121/// .on_receive_notification(async |notif: StatusUpdate, cx| {
122/// // Process notification
123/// Ok(())
124/// }, agent_client_protocol::on_receive_notification!())
125/// .connect_to(transport)
126/// .await?;
127/// # Ok(())
128/// # }
129/// ```
130///
131/// The type parameter on the closure determines which messages are dispatched to it.
132/// Messages that don't match the type are automatically passed to the next handler.
133///
134/// # Implementing Custom Handlers
135///
136 /// For advanced use cases, you can implement [`HandleDispatchFrom`] directly:
137 ///
138 /// ```ignore
139 /// struct MyHandler;
140 ///
141 /// impl HandleDispatchFrom<Client> for MyHandler {
142 ///
143 /// async fn handle_dispatch_from(
144 /// &mut self,
145 /// message: Dispatch,
146 /// connection: ConnectionTo<Client>,
147 /// ) -> Result<Handled<Dispatch>, Error> {
148/// if message.method() == "my/custom/method" {
149/// // Handle it
150/// Ok(Handled::Yes)
151/// } else {
152/// // Pass to next handler
153/// Ok(Handled::No { message, retry: false })
154/// }
155/// }
156///
157/// fn describe_chain(&self) -> impl std::fmt::Debug {
158/// "MyHandler"
159/// }
160/// }
161/// ```
162///
163/// # Important: Handlers Must Not Block
164///
165/// The connection processes messages on a single async task. While a handler is running,
166/// no other messages can be processed. For expensive operations, use [`ConnectionTo::spawn`]
167/// to run work concurrently:
168///
169/// ```
170/// # use agent_client_protocol::{Client, Agent, ConnectTo};
171/// # use agent_client_protocol_test::{expensive_operation, ProcessComplete};
172/// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
173/// # Client.builder().connect_with(transport, async |cx| {
174/// cx.spawn({
175/// let connection = cx.clone();
176/// async move {
177/// let result = expensive_operation("data").await?;
178/// connection.send_notification(ProcessComplete { result })?;
179/// Ok(())
180/// }
181/// })?;
182/// # Ok(())
183/// # }).await?;
184/// # Ok(())
185/// # }
186/// ```
187#[allow(async_fn_in_trait)]
188/// A handler for incoming JSON-RPC messages.
189///
190/// This trait is implemented by types that can process incoming messages on a connection.
191/// Handlers are registered with a [`Builder`] and are called in order until
192/// one claims the message.
193///
194/// The type parameter `R` is the role this handler plays - who I am.
195/// For an agent handler, `R = Agent` (I handle messages as an agent).
196/// For a client handler, `R = Client` (I handle messages as a client).
197pub trait HandleDispatchFrom<Counterpart: Role>: Send {
198 /// Attempt to claim an incoming message (request or notification).
199 ///
200 /// # Important: do not block
201 ///
202 /// The server will not process new messages until this handler returns.
203 /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
204 /// The recommended approach to manage expensive operations is to the [`ConnectionTo::spawn`] method available on the message context.
205 ///
206 /// # Parameters
207 ///
208 /// * `message` - The incoming message to handle.
209 /// * `connection` - The connection, used to send messages and access connection state.
210 ///
211 /// # Returns
212 ///
213 /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
214 /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
215 /// * `Err` if an internal error occurs (this will bring down the server).
216 fn handle_dispatch_from(
217 &mut self,
218 message: Dispatch,
219 connection: ConnectionTo<Counterpart>,
220 ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send;
221
222 /// Returns a debug description of the registered handlers for diagnostics.
223 fn describe_chain(&self) -> impl std::fmt::Debug;
224}
225
226impl<Counterpart: Role, H> HandleDispatchFrom<Counterpart> for &mut H
227where
228 H: HandleDispatchFrom<Counterpart>,
229{
230 fn handle_dispatch_from(
231 &mut self,
232 message: Dispatch,
233 cx: ConnectionTo<Counterpart>,
234 ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send {
235 H::handle_dispatch_from(self, message, cx)
236 }
237
238 fn describe_chain(&self) -> impl std::fmt::Debug {
239 H::describe_chain(self)
240 }
241}
242
243/// A JSON-RPC connection that can act as either a server, client, or both.
244///
245/// [`Builder`] provides a builder-style API for creating JSON-RPC servers and clients.
246/// You start by calling `Role.builder()` (e.g., `Client.builder()`), then add message
247/// handlers, and finally drive the connection with either [`connect_to`](Builder::connect_to)
248/// or [`connect_with`](Builder::connect_with), providing a component implementation
249/// (e.g., [`ByteStreams`] for byte streams).
250///
251/// # JSON-RPC Primer
252///
253/// JSON-RPC 2.0 has two fundamental message types:
254///
255/// * **Requests** - Messages that expect a response. They have an `id` field that gets
256/// echoed back in the response so the sender can correlate them.
257/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
258/// expect or receive a response.
259///
260/// # Type-Driven Message Dispatch
261///
262/// The handler registration methods use Rust's type system to determine which messages
263/// to handle. The type parameter you provide controls what gets dispatched to your handler:
264///
265/// ## Single Message Types
266///
267/// The simplest case - handle one specific message type:
268///
269/// ```no_run
270/// # use agent_client_protocol_test::*;
271/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
272/// # async fn example() -> Result<(), agent_client_protocol::Error> {
273/// # let connection = mock_connection();
274/// connection
275/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
276/// // Handle only InitializeRequest messages
277/// responder.respond(InitializeResponse::make())
278/// }, agent_client_protocol::on_receive_request!())
279/// .on_receive_notification(async |notif: SessionNotification, cx| {
280 /// // Handle only SessionNotification messages
281/// Ok(())
282/// }, agent_client_protocol::on_receive_notification!())
283/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
284/// # Ok(())
285/// # }
286/// ```
287///
288/// ## Enum Message Types
289///
290/// You can also handle multiple related messages with a single handler by defining an enum
291/// that implements the appropriate trait ([`JsonRpcRequest`] or [`JsonRpcNotification`]):
292///
293/// ```no_run
294/// # use agent_client_protocol_test::*;
295/// # use agent_client_protocol::{JsonRpcRequest, JsonRpcMessage, UntypedMessage};
296/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
297/// # async fn example() -> Result<(), agent_client_protocol::Error> {
298/// # let connection = mock_connection();
299/// // Define an enum for multiple request types
300/// #[derive(Debug, Clone)]
301/// enum MyRequests {
302/// Initialize(InitializeRequest),
303/// Prompt(PromptRequest),
304/// }
305///
306/// // Implement JsonRpcRequest for your enum
307/// # impl JsonRpcMessage for MyRequests {
308/// # fn matches_method(_method: &str) -> bool { false }
309/// # fn method(&self) -> &str { "myRequests" }
310/// # fn to_untyped_message(&self) -> Result<UntypedMessage, agent_client_protocol::Error> { todo!() }
311/// # fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Result<Self, agent_client_protocol::Error> { Err(agent_client_protocol::Error::method_not_found()) }
312/// # }
313/// impl JsonRpcRequest for MyRequests { type Response = serde_json::Value; }
314///
315/// // Handle all variants in one place
316/// connection.on_receive_request(async |req: MyRequests, responder, cx| {
317/// match req {
318/// MyRequests::Initialize(init) => { responder.respond(serde_json::json!({})) }
319/// MyRequests::Prompt(prompt) => { responder.respond(serde_json::json!({})) }
320/// }
321/// }, agent_client_protocol::on_receive_request!())
322/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
323/// # Ok(())
324/// # }
325/// ```
326///
327/// ## Mixed Message Types
328///
329/// For enums containing both requests AND notifications, use [`on_receive_dispatch`](Self::on_receive_dispatch):
330///
331/// ```no_run
332/// # use agent_client_protocol_test::*;
333/// # use agent_client_protocol::Dispatch;
334/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
335/// # async fn example() -> Result<(), agent_client_protocol::Error> {
336/// # let connection = mock_connection();
337/// // on_receive_dispatch receives Dispatch which can be either a request or notification
338/// connection.on_receive_dispatch(async |msg: Dispatch<InitializeRequest, SessionNotification>, _cx| {
339/// match msg {
340/// Dispatch::Request(req, responder) => {
341/// responder.respond(InitializeResponse::make())
342/// }
343/// Dispatch::Notification(notif) => {
344/// Ok(())
345/// }
346/// Dispatch::Response(result, router) => {
347/// // Forward response to its destination
348/// router.respond_with_result(result)
349/// }
350/// }
351/// }, agent_client_protocol::on_receive_dispatch!())
352/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
353/// # Ok(())
354/// # }
355/// ```
356///
357/// # Handler Registration
358///
359/// Register handlers using these methods (listed from most common to most flexible):
360///
361/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
362/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
363/// * [`on_receive_dispatch`](Self::on_receive_dispatch) - Handle enums containing both requests and notifications
364/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
365///
366/// ## Handler Ordering
367///
368/// Handlers are tried in the order you register them. The first handler that claims a message
369/// (by matching its type) will process it. Subsequent handlers won't see that message:
370///
371/// ```no_run
372/// # use agent_client_protocol_test::*;
373/// # use agent_client_protocol::{Dispatch, UntypedMessage};
374/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
375/// # async fn example() -> Result<(), agent_client_protocol::Error> {
376/// # let connection = mock_connection();
377/// connection
378/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
379/// // This runs first for InitializeRequest
380/// responder.respond(InitializeResponse::make())
381/// }, agent_client_protocol::on_receive_request!())
382/// .on_receive_request(async |req: PromptRequest, responder, cx| {
383/// // This runs first for PromptRequest
384/// responder.respond(PromptResponse::make())
385/// }, agent_client_protocol::on_receive_request!())
386/// .on_receive_dispatch(async |msg: Dispatch, cx| {
387/// // This runs for any message not handled above
388/// msg.respond_with_error(agent_client_protocol::util::internal_error("unknown method"), cx)
389/// }, agent_client_protocol::on_receive_dispatch!())
390/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
391/// # Ok(())
392/// # }
393/// ```
394///
395/// # Event Loop and Concurrency
396///
397/// Understanding the event loop is critical for writing correct handlers.
398///
399/// ## The Event Loop
400///
401/// [`Builder`] runs all handler callbacks on a single async task - the event loop.
402/// While a handler is running, **the server cannot receive new messages**. This means
403/// any blocking or expensive work in your handlers will stall the entire connection.
404///
405/// To avoid blocking the event loop, use [`ConnectionTo::spawn`] to offload serious
406/// work to concurrent tasks:
407///
408/// ```no_run
409/// # use agent_client_protocol_test::*;
410/// # async fn example() -> Result<(), agent_client_protocol::Error> {
411/// # let connection = mock_connection();
412/// connection.on_receive_request(async |req: AnalyzeRequest, responder, cx| {
413/// // Clone cx for the spawned task
414/// cx.spawn({
415/// let connection = cx.clone();
416/// async move {
417/// let result = expensive_analysis(&req.data).await?;
418/// connection.send_notification(AnalysisComplete { result })?;
419/// Ok(())
420/// }
421/// })?;
422///
423/// // Respond immediately without blocking
424/// responder.respond(AnalysisStarted { job_id: 42 })
425/// }, agent_client_protocol::on_receive_request!())
426/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
427/// # Ok(())
428/// # }
429/// ```
430///
431/// Note that the entire connection runs within one async task, so parallelism must be
432/// managed explicitly using [`spawn`](ConnectionTo::spawn).
433///
434/// ## The Connection Context
435///
436/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
437///
438/// * **For request handlers** - [`Responder<R>`] provides [`respond`](Responder::respond)
439/// to send the response, plus methods to send other messages
440/// * **For notification handlers** - [`ConnectionTo`] provides methods to send messages
441/// and spawn tasks
442///
443/// Both context types support:
444/// * [`send_request`](ConnectionTo::send_request) - Send requests to the other side
445/// * [`send_notification`](ConnectionTo::send_notification) - Send notifications
446/// * [`spawn`](ConnectionTo::spawn) - Run tasks concurrently without blocking the event loop
447///
448/// The [`SentRequest`] returned by `send_request` provides methods like
449/// [`on_receiving_result`](SentRequest::on_receiving_result) that help you
450/// avoid accidentally blocking the event loop while waiting for responses.
451///
452/// # Driving the Connection
453///
454/// After adding handlers, you must drive the connection using one of two modes:
455///
456/// ## Server Mode: `connect_to()`
457///
458/// Use [`connect_to`](Self::connect_to) when you only need to respond to incoming messages:
459///
460/// ```no_run
461/// # use agent_client_protocol_test::*;
462/// # async fn example() -> Result<(), agent_client_protocol::Error> {
463/// # let connection = mock_connection();
464/// connection
465/// .on_receive_request(async |req: MyRequest, responder, cx| {
466/// responder.respond(MyResponse { status: "ok".into() })
467/// }, agent_client_protocol::on_receive_request!())
468/// .connect_to(MockTransport) // Runs until connection closes or error occurs
469/// .await?;
470/// # Ok(())
471/// # }
472/// ```
473///
474/// The connection will process incoming messages and invoke your handlers until the
475/// connection is closed or an error occurs.
476///
477/// ## Client Mode: `connect_with()`
478///
479/// Use [`connect_with`](Self::connect_with) when you need to both handle incoming messages
480/// AND send your own requests/notifications:
481///
482/// ```no_run
483/// # use agent_client_protocol_test::*;
484/// # use agent_client_protocol::schema::InitializeRequest;
485/// # async fn example() -> Result<(), agent_client_protocol::Error> {
486/// # let connection = mock_connection();
487/// connection
488/// .on_receive_request(async |req: MyRequest, responder, cx| {
489/// responder.respond(MyResponse { status: "ok".into() })
490/// }, agent_client_protocol::on_receive_request!())
491/// .connect_with(MockTransport, async |cx| {
492/// // You can send requests to the other side
493/// let response = cx.send_request(InitializeRequest::make())
494/// .block_task()
495/// .await?;
496///
497/// // And send notifications
498/// cx.send_notification(StatusUpdate { message: "ready".into() })?;
499///
500/// Ok(())
501/// })
502/// .await?;
503/// # Ok(())
504/// # }
505/// ```
506///
507/// The connection will serve incoming messages in the background while your client closure
508/// runs. When the closure returns, the connection shuts down.
509///
510/// # Example: Complete Agent
511///
512/// ```no_run
513/// # use agent_client_protocol::UntypedRole;
514/// # use agent_client_protocol::{Builder};
515/// # use agent_client_protocol::ByteStreams;
516/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
517/// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
518/// # async fn example() -> Result<(), agent_client_protocol::Error> {
519/// let transport = ByteStreams::new(
520/// tokio::io::stdout().compat_write(),
521/// tokio::io::stdin().compat(),
522/// );
523///
524/// UntypedRole.builder()
525/// .name("my-agent") // Optional: for debugging logs
526/// .on_receive_request(async |init: InitializeRequest, responder, cx| {
527/// let response: InitializeResponse = todo!();
528/// responder.respond(response)
529/// }, agent_client_protocol::on_receive_request!())
530/// .on_receive_request(async |prompt: PromptRequest, responder, cx| {
531/// // You can send notifications while processing a request
532/// let notif: SessionNotification = todo!();
533/// cx.send_notification(notif)?;
534///
535/// // Then respond to the request
536/// let response: PromptResponse = todo!();
537/// responder.respond(response)
538/// }, agent_client_protocol::on_receive_request!())
539/// .connect_to(transport)
540/// .await?;
541/// # Ok(())
542/// # }
543/// ```
544#[must_use]
545#[derive(Debug)]
546pub struct Builder<Host: Role, Handler = NullHandler, Runner = NullRun>
547where
548 Handler: HandleDispatchFrom<Host::Counterpart>,
549 Runner: RunWithConnectionTo<Host::Counterpart>,
550{
551 /// My role.
552 host: Host,
553
554 /// Name of the connection, used in tracing logs.
555 name: Option<String>,
556
557 /// Handler for incoming messages.
558 handler: Handler,
559
560 /// Responder for background tasks.
561 responder: Runner,
562}
563
564impl<Host: Role> Builder<Host, NullHandler, NullRun> {
565 /// Create a new connection builder for the given role.
566 /// This type follows a builder pattern; use other methods to configure and then invoke
567 /// [`Self::connect_to`] (to use as a server) or [`Self::connect_with`] to use as a client.
568 pub fn new(role: Host) -> Self {
569 Self {
570 host: role,
571 name: None,
572 handler: NullHandler,
573 responder: NullRun,
574 }
575 }
576}
577
578impl<Host: Role, Handler> Builder<Host, Handler, NullRun>
579where
580 Handler: HandleDispatchFrom<Host::Counterpart>,
581{
582 /// Create a new connection builder with the given handler.
583 pub fn new_with(role: Host, handler: Handler) -> Self {
584 Self {
585 host: role,
586 name: None,
587 handler,
588 responder: NullRun,
589 }
590 }
591}
592
593impl<
594 Host: Role,
595 Handler: HandleDispatchFrom<Host::Counterpart>,
596 Runner: RunWithConnectionTo<Host::Counterpart>,
597> Builder<Host, Handler, Runner>
598{
599 /// Set the "name" of this connection -- used only for debugging logs.
600 pub fn name(mut self, name: impl ToString) -> Self {
601 self.name = Some(name.to_string());
602 self
603 }
604
605 /// Merge another [`Builder`] into this one.
606 ///
607 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
608 /// This is a low-level method that is not intended for general use.
609 pub fn with_connection_builder(
610 self,
611 other: Builder<
612 Host,
613 impl HandleDispatchFrom<Host::Counterpart>,
614 impl RunWithConnectionTo<Host::Counterpart>,
615 >,
616 ) -> Builder<
617 Host,
618 impl HandleDispatchFrom<Host::Counterpart>,
619 impl RunWithConnectionTo<Host::Counterpart>,
620 > {
621 Builder {
622 host: self.host,
623 name: self.name,
624 handler: ChainedHandler::new(
625 self.handler,
626 NamedHandler::new(other.name, other.handler),
627 ),
628 responder: ChainRun::new(self.responder, other.responder),
629 }
630 }
631
632 /// Add a new [`HandleDispatchFrom`] to the chain.
633 ///
634 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
635 /// This is a low-level method that is not intended for general use.
636 pub fn with_handler(
637 self,
638 handler: impl HandleDispatchFrom<Host::Counterpart>,
639 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner> {
640 Builder {
641 host: self.host,
642 name: self.name,
643 handler: ChainedHandler::new(self.handler, handler),
644 responder: self.responder,
645 }
646 }
647
648 /// Add a new [`RunWithConnectionTo`] to the chain.
649 pub fn with_responder<Run1>(
650 self,
651 responder: Run1,
652 ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
653 where
654 Run1: RunWithConnectionTo<Host::Counterpart>,
655 {
656 Builder {
657 host: self.host,
658 name: self.name,
659 handler: self.handler,
660 responder: ChainRun::new(self.responder, responder),
661 }
662 }
663
664 /// Enqueue a task to run once the connection is actively serving traffic.
665 #[track_caller]
666 pub fn with_spawned<F, Fut>(
667 self,
668 task: F,
669 ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
670 where
671 F: FnOnce(ConnectionTo<Host::Counterpart>) -> Fut + Send,
672 Fut: Future<Output = Result<(), crate::Error>> + Send,
673 {
674 let location = Location::caller();
675 self.with_responder(SpawnedRun::new(location, task))
676 }
677
678 /// Register a handler for messages that can be either requests OR notifications.
679 ///
680 /// Use this when you want to handle an enum type that contains both request and
681 /// notification variants. Your handler receives a [`Dispatch<Req, Notif>`] which
682 /// is an enum with two variants:
683 ///
684 /// - `Dispatch::Request(request, responder)` - A request with its response context
685 /// - `Dispatch::Notification(notification)` - A notification
686 /// - `Dispatch::Response(result, router)` - A response to a request we sent
687 ///
688 /// # Example
689 ///
690 /// ```no_run
691 /// # use agent_client_protocol_test::*;
692 /// # use agent_client_protocol::Dispatch;
693 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
694 /// # let connection = mock_connection();
695 /// connection.on_receive_dispatch(async |message: Dispatch<MyRequest, StatusUpdate>, _cx| {
696 /// match message {
697 /// Dispatch::Request(req, responder) => {
698 /// // Handle request and send response
699 /// responder.respond(MyResponse { status: "ok".into() })
700 /// }
701 /// Dispatch::Notification(notif) => {
702 /// // Handle notification (no response needed)
703 /// Ok(())
704 /// }
705 /// Dispatch::Response(result, router) => {
706 /// // Forward response to its destination
707 /// router.respond_with_result(result)
708 /// }
709 /// }
710 /// }, agent_client_protocol::on_receive_dispatch!())
711 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
712 /// # Ok(())
713 /// # }
714 /// ```
715 ///
716 /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
717 /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
718 /// for handling requests or notifications separately.
719 ///
720 /// # Ordering
721 ///
722 /// This callback runs inside the dispatch loop and blocks further message processing
723 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
724 /// ordering guarantees and how to avoid deadlocks.
725 pub fn on_receive_dispatch<Req, Notif, F, T, ToFut>(
726 self,
727 op: F,
728 to_future_hack: ToFut,
729 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
730 where
731 Host::Counterpart: HasPeer<Host::Counterpart>,
732 Req: JsonRpcRequest,
733 Notif: JsonRpcNotification,
734 F: AsyncFnMut(
735 Dispatch<Req, Notif>,
736 ConnectionTo<Host::Counterpart>,
737 ) -> Result<T, crate::Error>
738 + Send,
739 T: IntoHandled<Dispatch<Req, Notif>>,
740 ToFut: Fn(
741 &mut F,
742 Dispatch<Req, Notif>,
743 ConnectionTo<Host::Counterpart>,
744 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
745 + Send
746 + Sync,
747 {
748 let handler = MessageHandler::new(
749 self.host.counterpart(),
750 self.host.counterpart(),
751 op,
752 to_future_hack,
753 );
754 self.with_handler(handler)
755 }
756
757 /// Register a handler for JSON-RPC requests of type `Req`.
758 ///
759 /// Your handler receives two arguments:
760 /// 1. The request (type `Req`)
761 /// 2. A [`Responder<R, Req::Response>`] for sending the response
762 ///
763 /// The request context allows you to:
764 /// - Send the response with [`Responder::respond`]
765 /// - Send notifications to the client with [`ConnectionTo::send_notification`]
766 /// - Send requests to the client with [`ConnectionTo::send_request`]
767 ///
768 /// # Example
769 ///
770 /// ```ignore
771 /// # use agent_client_protocol::UntypedRole;
772 /// # use agent_client_protocol::{Builder};
773 /// # use agent_client_protocol::schema::{PromptRequest, PromptResponse, SessionNotification};
774 /// # fn example<R: agent_client_protocol::Role>(connection: Builder<R, impl agent_client_protocol::HandleMessageAs<R>>) {
775 /// connection.on_receive_request(async |request: PromptRequest, responder, cx| {
776 /// // Send a notification while processing
777 /// let notif: SessionNotification = todo!();
778 /// cx.send_notification(notif)?;
779 ///
780 /// // Do some work...
781 /// let result = todo!("process the prompt");
782 ///
783 /// // Send the response
784 /// let response: PromptResponse = todo!();
785 /// responder.respond(response)
786 /// }, agent_client_protocol::on_receive_request!());
787 /// # }
788 /// ```
789 ///
790 /// # Type Parameter
791 ///
792 /// `Req` can be either a single request type or an enum of multiple request types.
793 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
794 ///
795 /// # Ordering
796 ///
797 /// This callback runs inside the dispatch loop and blocks further message processing
798 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
799 /// ordering guarantees and how to avoid deadlocks.
800 pub fn on_receive_request<Req: JsonRpcRequest, F, T, ToFut>(
801 self,
802 op: F,
803 to_future_hack: ToFut,
804 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
805 where
806 Host::Counterpart: HasPeer<Host::Counterpart>,
807 F: AsyncFnMut(
808 Req,
809 Responder<Req::Response>,
810 ConnectionTo<Host::Counterpart>,
811 ) -> Result<T, crate::Error>
812 + Send,
813 T: IntoHandled<(Req, Responder<Req::Response>)>,
814 ToFut: Fn(
815 &mut F,
816 Req,
817 Responder<Req::Response>,
818 ConnectionTo<Host::Counterpart>,
819 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
820 + Send
821 + Sync,
822 {
823 let handler = RequestHandler::new(
824 self.host.counterpart(),
825 self.host.counterpart(),
826 op,
827 to_future_hack,
828 );
829 self.with_handler(handler)
830 }
831
832 /// Register a handler for JSON-RPC notifications of type `Notif`.
833 ///
834 /// Notifications are fire-and-forget messages that don't expect a response.
835 /// Your handler receives:
836 /// 1. The notification (type `Notif`)
837 /// 2. A [`ConnectionTo<R>`] for sending messages to the other side
838 ///
839 /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
840 /// but you can still send your own requests and notifications using the context.
841 ///
842 /// # Example
843 ///
844 /// ```no_run
845 /// # use agent_client_protocol_test::*;
846 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
847 /// # let connection = mock_connection();
848 /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
849 /// // Process the notification
850 /// update_session_state(&notif)?;
851 ///
852 /// // Optionally send a notification back
853 /// cx.send_notification(StatusUpdate {
854 /// message: "Acknowledged".into(),
855 /// })?;
856 ///
857 /// Ok(())
858 /// }, agent_client_protocol::on_receive_notification!())
859 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
860 /// # Ok(())
861 /// # }
862 /// ```
863 ///
864 /// # Type Parameter
865 ///
866 /// `Notif` can be either a single notification type or an enum of multiple notification types.
867 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
868 ///
869 /// # Ordering
870 ///
871 /// This callback runs inside the dispatch loop and blocks further message processing
872 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
873 /// ordering guarantees and how to avoid deadlocks.
874 pub fn on_receive_notification<Notif, F, T, ToFut>(
875 self,
876 op: F,
877 to_future_hack: ToFut,
878 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
879 where
880 Host::Counterpart: HasPeer<Host::Counterpart>,
881 Notif: JsonRpcNotification,
882 F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
883 T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
884 ToFut: Fn(
885 &mut F,
886 Notif,
887 ConnectionTo<Host::Counterpart>,
888 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
889 + Send
890 + Sync,
891 {
892 let handler = NotificationHandler::new(
893 self.host.counterpart(),
894 self.host.counterpart(),
895 op,
896 to_future_hack,
897 );
898 self.with_handler(handler)
899 }
900
901 /// Register a handler for messages from a specific peer.
902 ///
903 /// This is similar to [`on_receive_dispatch`](Self::on_receive_dispatch), but allows
904 /// specifying the source peer explicitly. This is useful when receiving messages
905 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
906 /// envelopes when receiving from an agent via a proxy).
907 ///
908 /// For the common case of receiving from the default counterpart, use
909 /// [`on_receive_dispatch`](Self::on_receive_dispatch) instead.
910 ///
911 /// # Ordering
912 ///
913 /// This callback runs inside the dispatch loop and blocks further message processing
914 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
915 /// ordering guarantees and how to avoid deadlocks.
916 pub fn on_receive_dispatch_from<
917 Req: JsonRpcRequest,
918 Notif: JsonRpcNotification,
919 Peer: Role,
920 F,
921 T,
922 ToFut,
923 >(
924 self,
925 peer: Peer,
926 op: F,
927 to_future_hack: ToFut,
928 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
929 where
930 Host::Counterpart: HasPeer<Peer>,
931 F: AsyncFnMut(
932 Dispatch<Req, Notif>,
933 ConnectionTo<Host::Counterpart>,
934 ) -> Result<T, crate::Error>
935 + Send,
936 T: IntoHandled<Dispatch<Req, Notif>>,
937 ToFut: Fn(
938 &mut F,
939 Dispatch<Req, Notif>,
940 ConnectionTo<Host::Counterpart>,
941 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
942 + Send
943 + Sync,
944 {
945 let handler = MessageHandler::new(self.host.counterpart(), peer, op, to_future_hack);
946 self.with_handler(handler)
947 }
948
949 /// Register a handler for JSON-RPC requests from a specific peer.
950 ///
951 /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
952 /// specifying the source peer explicitly. This is useful when receiving messages
953 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
954 /// envelopes when receiving from an agent via a proxy).
955 ///
956 /// For the common case of receiving from the default counterpart, use
957 /// [`on_receive_request`](Self::on_receive_request) instead.
958 ///
959 /// # Example
960 ///
961 /// ```ignore
962 /// use agent_client_protocol::Agent;
963 /// use agent_client_protocol::schema::InitializeRequest;
964 ///
965 /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
966 /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, responder, cx| {
967 /// // Handle the request
968 /// responder.respond(InitializeResponse::make())
969 /// }, agent_client_protocol::on_receive_request!())
970 /// ```
971 ///
972 /// # Ordering
973 ///
974 /// This callback runs inside the dispatch loop and blocks further message processing
975 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
976 /// ordering guarantees and how to avoid deadlocks.
977 pub fn on_receive_request_from<Req: JsonRpcRequest, Peer: Role, F, T, ToFut>(
978 self,
979 peer: Peer,
980 op: F,
981 to_future_hack: ToFut,
982 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
983 where
984 Host::Counterpart: HasPeer<Peer>,
985 F: AsyncFnMut(
986 Req,
987 Responder<Req::Response>,
988 ConnectionTo<Host::Counterpart>,
989 ) -> Result<T, crate::Error>
990 + Send,
991 T: IntoHandled<(Req, Responder<Req::Response>)>,
992 ToFut: Fn(
993 &mut F,
994 Req,
995 Responder<Req::Response>,
996 ConnectionTo<Host::Counterpart>,
997 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
998 + Send
999 + Sync,
1000 {
1001 let handler = RequestHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1002 self.with_handler(handler)
1003 }
1004
1005 /// Register a handler for JSON-RPC notifications from a specific peer.
1006 ///
1007 /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
1008 /// specifying the source peer explicitly. This is useful when receiving messages
1009 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
1010 /// envelopes when receiving from an agent via a proxy).
1011 ///
1012 /// For the common case of receiving from the default counterpart, use
1013 /// [`on_receive_notification`](Self::on_receive_notification) instead.
1014 ///
1015 /// # Ordering
1016 ///
1017 /// This callback runs inside the dispatch loop and blocks further message processing
1018 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1019 /// ordering guarantees and how to avoid deadlocks.
1020 pub fn on_receive_notification_from<Notif: JsonRpcNotification, Peer: Role, F, T, ToFut>(
1021 self,
1022 peer: Peer,
1023 op: F,
1024 to_future_hack: ToFut,
1025 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1026 where
1027 Host::Counterpart: HasPeer<Peer>,
1028 F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1029 T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1030 ToFut: Fn(
1031 &mut F,
1032 Notif,
1033 ConnectionTo<Host::Counterpart>,
1034 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1035 + Send
1036 + Sync,
1037 {
1038 let handler = NotificationHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1039 self.with_handler(handler)
1040 }
1041
1042 /// Add an MCP server that will be added to all new sessions that are proxied through this connection.
1043 ///
1044 /// Only applicable to proxies.
1045 pub fn with_mcp_server(
1046 self,
1047 mcp_server: McpServer<Host::Counterpart, impl RunWithConnectionTo<Host::Counterpart>>,
1048 ) -> Builder<
1049 Host,
1050 impl HandleDispatchFrom<Host::Counterpart>,
1051 impl RunWithConnectionTo<Host::Counterpart>,
1052 >
1053 where
1054 Host::Counterpart: HasPeer<Agent> + HasPeer<Client>,
1055 {
1056 let (handler, responder) = mcp_server.into_handler_and_responder();
1057 self.with_handler(handler).with_responder(responder)
1058 }
1059
1060 /// Run in server mode with the provided transport.
1061 ///
1062 /// This drives the connection by continuously processing messages from the transport
1063 /// and dispatching them to your registered handlers. The connection will run until:
1064 /// - The transport closes (e.g., EOF on byte streams)
1065 /// - An error occurs
1066 /// - One of your handlers returns an error
1067 ///
1068 /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
1069 /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1070 ///
1071 /// Use this mode when you only need to respond to incoming messages and don't need
1072 /// to initiate your own requests. If you need to send requests to the other side,
1073 /// use [`connect_with`](Self::connect_with) instead.
1074 ///
1075 /// # Example: Byte Stream Transport
1076 ///
1077 /// ```no_run
1078 /// # use agent_client_protocol::UntypedRole;
1079 /// # use agent_client_protocol::{Builder};
1080 /// # use agent_client_protocol::ByteStreams;
1081 /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1082 /// # use agent_client_protocol_test::*;
1083 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1084 /// let transport = ByteStreams::new(
1085 /// tokio::io::stdout().compat_write(),
1086 /// tokio::io::stdin().compat(),
1087 /// );
1088 ///
1089 /// UntypedRole.builder()
1090 /// .on_receive_request(async |req: MyRequest, responder, cx| {
1091 /// responder.respond(MyResponse { status: "ok".into() })
1092 /// }, agent_client_protocol::on_receive_request!())
1093 /// .connect_to(transport)
1094 /// .await?;
1095 /// # Ok(())
1096 /// # }
1097 /// ```
1098 pub async fn connect_to(
1099 self,
1100 transport: impl ConnectTo<Host> + 'static,
1101 ) -> Result<(), crate::Error> {
1102 self.connect_with(transport, async move |_cx| future::pending().await)
1103 .await
1104 }
1105
1106 /// Run the connection until the provided closure completes.
1107 ///
1108 /// This drives the connection by:
1109 /// 1. Running your registered handlers in the background to process incoming messages
1110 /// 2. Executing your `main_fn` closure with a [`ConnectionTo<R>`] for sending requests/notifications
1111 ///
1112 /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1113 /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1114 ///
1115 /// Use this mode when you need to initiate communication (send requests/notifications)
1116 /// in addition to responding to incoming messages. For server-only mode where you just
1117 /// respond to messages, use [`connect_to`](Self::connect_to) instead.
1118 ///
1119 /// # Example
1120 ///
1121 /// ```no_run
1122 /// # use agent_client_protocol::UntypedRole;
1123 /// # use agent_client_protocol::{Builder};
1124 /// # use agent_client_protocol::ByteStreams;
1125 /// # use agent_client_protocol::schema::InitializeRequest;
1126 /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1127 /// # use agent_client_protocol_test::*;
1128 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1129 /// let transport = ByteStreams::new(
1130 /// tokio::io::stdout().compat_write(),
1131 /// tokio::io::stdin().compat(),
1132 /// );
1133 ///
1134 /// UntypedRole.builder()
1135 /// .on_receive_request(async |req: MyRequest, responder, cx| {
1136 /// // Handle incoming requests in the background
1137 /// responder.respond(MyResponse { status: "ok".into() })
1138 /// }, agent_client_protocol::on_receive_request!())
1139 /// .connect_with(transport, async |cx| {
1140 /// // Initialize the protocol
1141 /// let init_response = cx.send_request(InitializeRequest::make())
1142 /// .block_task()
1143 /// .await?;
1144 ///
1145 /// // Send more requests...
1146 /// let result = cx.send_request(MyRequest {})
1147 /// .block_task()
1148 /// .await?;
1149 ///
1150 /// // When this closure returns, the connection shuts down
1151 /// Ok(())
1152 /// })
1153 /// .await?;
1154 /// # Ok(())
1155 /// # }
1156 /// ```
1157 ///
1158 /// # Parameters
1159 ///
1160 /// - `main_fn`: Your client logic. Receives a [`ConnectionTo<R>`] for sending messages.
1161 ///
1162 /// # Errors
1163 ///
1164 /// Returns an error if the connection closes before `main_fn` completes.
1165 pub async fn connect_with<R>(
1166 self,
1167 transport: impl ConnectTo<Host> + 'static,
1168 main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1169 ) -> Result<R, crate::Error> {
1170 let (_, future) = self.into_connection_and_future(transport, main_fn);
1171 future.await
1172 }
1173
1174 /// Helper that returns a [`ConnectionTo<R>`] and a future that runs this connection until `main_fn` returns.
1175 fn into_connection_and_future<R>(
1176 self,
1177 transport: impl ConnectTo<Host> + 'static,
1178 main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1179 ) -> (
1180 ConnectionTo<Host::Counterpart>,
1181 impl Future<Output = Result<R, crate::Error>>,
1182 ) {
1183 let Self {
1184 name,
1185 handler,
1186 responder,
1187 host: me,
1188 } = self;
1189
1190 let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
1191 let (new_task_tx, new_task_rx) = mpsc::unbounded();
1192 let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
1193 let connection = ConnectionTo::new(
1194 me.counterpart(),
1195 outgoing_tx,
1196 new_task_tx,
1197 dynamic_handler_tx,
1198 );
1199
1200 // Convert transport into server - this returns a channel for us to use
1201 // and a future that runs the transport
1202 let transport_component = crate::DynConnectTo::new(transport);
1203 let (transport_channel, transport_future) = transport_component.into_channel_and_future();
1204 let spawn_result = connection.spawn(transport_future);
1205
1206 // Destructure the channel endpoints
1207 let Channel {
1208 rx: transport_incoming_rx,
1209 tx: transport_outgoing_tx,
1210 } = transport_channel;
1211
1212 let (reply_tx, reply_rx) = mpsc::unbounded();
1213
1214 let future = crate::util::instrument_with_connection_name(name, {
1215 let connection = connection.clone();
1216 async move {
1217 let () = spawn_result?;
1218
1219 let background = async {
1220 futures::try_join!(
1221 // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
1222 outgoing_actor::outgoing_protocol_actor(
1223 outgoing_rx,
1224 reply_tx.clone(),
1225 transport_outgoing_tx,
1226 ),
1227 // Protocol layer: jsonrpcmsg::Message → handler/reply routing
1228 incoming_actor::incoming_protocol_actor(
1229 me.counterpart(),
1230 &connection,
1231 transport_incoming_rx,
1232 dynamic_handler_rx,
1233 reply_rx,
1234 handler,
1235 ),
1236 task_actor::task_actor(new_task_rx, &connection),
1237 responder.run_with_connection_to(connection.clone()),
1238 )?;
1239 Ok(())
1240 };
1241
1242 crate::util::run_until(Box::pin(background), Box::pin(main_fn(connection.clone())))
1243 .await
1244 }
1245 });
1246
1247 (connection, future)
1248 }
1249}
1250
1251impl<R, H, Run> ConnectTo<R::Counterpart> for Builder<R, H, Run>
1252where
1253 R: Role,
1254 H: HandleDispatchFrom<R::Counterpart> + 'static,
1255 Run: RunWithConnectionTo<R::Counterpart> + 'static,
1256{
1257 async fn connect_to(self, client: impl ConnectTo<R>) -> Result<(), crate::Error> {
1258 Builder::connect_to(self, client).await
1259 }
1260}
1261
1262/// The payload sent through the response oneshot channel.
1263///
1264/// Includes the response value and an optional ack channel for dispatch loop
1265/// synchronization.
1266pub(crate) struct ResponsePayload {
1267 /// The response result - either the JSON value or an error.
1268 pub(crate) result: Result<serde_json::Value, crate::Error>,
1269
1270 /// Optional acknowledgment channel for dispatch loop synchronization.
1271 ///
1272 /// When present, the receiver must send on this channel to signal that
1273 /// response processing is complete, allowing the dispatch loop to continue
1274 /// to the next message.
1275 ///
1276 /// This is `None` for error paths where the response is sent directly
1277 /// (e.g., when the outgoing channel is broken) rather than through the
1278 /// normal dispatch loop flow.
1279 pub(crate) ack_tx: Option<oneshot::Sender<()>>,
1280}
1281
1282impl std::fmt::Debug for ResponsePayload {
1283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1284 f.debug_struct("ResponsePayload")
1285 .field("result", &self.result)
1286 .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1287 .finish()
1288 }
1289}
1290
1291/// Message sent to the incoming actor for reply subscription management.
1292enum ReplyMessage {
1293 /// Subscribe to receive a response for the given request id.
1294 /// When a response with this id arrives, it will be sent through the oneshot
1295 /// along with an ack channel that must be signaled when processing is complete.
1296 /// The method name is stored to allow routing responses through typed handlers.
1297 Subscribe {
1298 id: jsonrpcmsg::Id,
1299
1300 /// id of the peer this request was sent to
1301 role_id: RoleId,
1302
1303 /// (original) method of the request -- the actual request may have been transformed
1304 /// to a successor method, but this will reflect the method of the wrapped request
1305 method: String,
1306
1307 sender: oneshot::Sender<ResponsePayload>,
1308 },
1309}
1310
1311impl std::fmt::Debug for ReplyMessage {
1312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1313 match self {
1314 ReplyMessage::Subscribe { id, method, .. } => f
1315 .debug_struct("Subscribe")
1316 .field("id", id)
1317 .field("method", method)
1318 .finish(),
1319 }
1320 }
1321}
1322
1323/// Messages send to be serialized over the transport.
1324#[derive(Debug)]
1325enum OutgoingMessage {
1326 /// Send a request to the server.
1327 Request {
1328 /// id assigned to this request (generated by sender)
1329 id: jsonrpcmsg::Id,
1330
1331 /// the original method
1332 method: String,
1333
1334 /// the peer we sent this to
1335 role_id: RoleId,
1336
1337 /// the message to send; this may have a distinct method
1338 /// depending on the peer
1339 untyped: UntypedMessage,
1340
1341 /// where to send the response when it arrives (includes ack channel)
1342 response_tx: oneshot::Sender<ResponsePayload>,
1343 },
1344
1345 /// Send a notification to the server.
1346 Notification {
1347 /// the message to send; this may have a distinct method
1348 /// depending on the peer
1349 untyped: UntypedMessage,
1350 },
1351
1352 /// Send a response to a message from the server
1353 Response {
1354 id: jsonrpcmsg::Id,
1355
1356 response: Result<serde_json::Value, crate::Error>,
1357 },
1358
1359 /// Send a generalized error message
1360 Error { error: crate::Error },
1361}
1362
/// Outcome reported by a message handler: whether the message was handled or not.
#[must_use]
#[derive(Debug)]
pub enum Handled<T> {
    /// The message was handled.
    Yes,

    /// The message was not handled; the message is handed back.
    ///
    /// If `retry` is true, the message is additionally requested to be queued
    /// and retried with dynamic handlers as they are added.
    No {
        /// The message to pass on to subsequent handlers. This is typically
        /// the original message, but it may have been mutated.
        message: T,

        /// If true, request the message to be queued and retried with
        /// dynamic handlers as they are added.
        ///
        /// This is used for managing session updates: the dynamic handler
        /// for a session cannot be added until the response to the new
        /// session request has been processed, and updates may be processed
        /// at the same time.
        retry: bool,
    },
}

/// Conversion from a handler's return value into a [`Handled`].
///
/// Handlers can return `()`, which becomes [`Handled::Yes`], or an explicit
/// `Handled<T>` when they need more control over handler propagation.
pub trait IntoHandled<T> {
    /// Turn this value into a `Handled<T>`.
    fn into_handled(self) -> Handled<T>;
}

/// Returning `()` from a handler means the message was handled.
impl<T> IntoHandled<T> for () {
    fn into_handled(self) -> Handled<T> {
        Handled::<T>::Yes
    }
}

/// An explicit [`Handled`] is passed through unchanged.
impl<T> IntoHandled<T> for Handled<T> {
    fn into_handled(self) -> Handled<T> {
        let unchanged = self;
        unchanged
    }
}
1410
1411/// Connection context for sending messages and spawning tasks.
1412///
1413/// This is the primary handle for interacting with the JSON-RPC connection from
1414/// within handler callbacks. You can use it to:
1415///
1416/// * Send requests and notifications to the other side
1417/// * Spawn concurrent tasks that run alongside the connection
1418/// * Respond to requests (via [`Responder`] which wraps this)
1419///
1420/// # Cloning
1421///
1422/// `ConnectionTo` is cheaply cloneable - all clones refer to the same underlying connection.
1423/// This makes it easy to share across async tasks.
1424///
1425/// # Event Loop and Concurrency
1426///
1427/// Handler callbacks run on the event loop, which means the connection cannot process new
1428/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
1429/// expensive or blocking work to concurrent tasks.
1430///
1431/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency) section
1432/// for more details.
1433#[derive(Clone, Debug)]
1434pub struct ConnectionTo<Counterpart: Role> {
1435 counterpart: Counterpart,
1436 message_tx: OutgoingMessageTx,
1437 task_tx: TaskTx,
1438 dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
1439}
1440
1441impl<Counterpart: Role> ConnectionTo<Counterpart> {
1442 fn new(
1443 counterpart: Counterpart,
1444 message_tx: mpsc::UnboundedSender<OutgoingMessage>,
1445 task_tx: mpsc::UnboundedSender<Task>,
1446 dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
1447 ) -> Self {
1448 Self {
1449 counterpart,
1450 message_tx,
1451 task_tx,
1452 dynamic_handler_tx,
1453 }
1454 }
1455
1456 /// Return the counterpart role this connection is talking to.
1457 pub fn counterpart(&self) -> Counterpart {
1458 self.counterpart.clone()
1459 }
1460
1461 /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1462 ///
1463 /// This is the primary mechanism for offloading expensive work from handler callbacks
1464 /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1465 /// allowing the server to continue processing messages.
1466 ///
1467 /// # Event Loop
1468 ///
1469 /// Handler callbacks run on the event loop, which cannot process new messages while
1470 /// your handler is running. Use `spawn` for any expensive operations:
1471 ///
1472 /// ```no_run
1473 /// # use agent_client_protocol_test::*;
1474 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1475 /// # let connection = mock_connection();
1476 /// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1477 /// // Clone cx for the spawned task
1478 /// cx.spawn({
1479 /// let connection = cx.clone();
1480 /// async move {
1481 /// let result = expensive_operation(&req.data).await?;
1482 /// connection.send_notification(ProcessComplete { result })?;
1483 /// Ok(())
1484 /// }
1485 /// })?;
1486 ///
1487 /// // Respond immediately
1488 /// responder.respond(ProcessResponse { result: "started".into() })
1489 /// }, agent_client_protocol::on_receive_request!())
1490 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1491 /// # Ok(())
1492 /// # }
1493 /// ```
1494 ///
1495 /// # Errors
1496 ///
1497 /// If the spawned task returns an error, the entire server will shut down.
1498 #[track_caller]
1499 pub fn spawn(
1500 &self,
1501 task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1502 ) -> Result<(), crate::Error> {
1503 let location = std::panic::Location::caller();
1504 let task = task.into_future();
1505 Task::new(location, task).spawn(&self.task_tx)
1506 }
1507
1508 /// Spawn a JSON-RPC connection in the background and return a [`ConnectionTo`] for sending messages to it.
1509 ///
1510 /// This is useful for creating multiple connections that communicate with each other,
1511 /// such as implementing proxy patterns or connecting to multiple backend services.
1512 ///
1513 /// # Arguments
1514 ///
1515 /// - `builder`: The connection builder with handlers configured
1516 /// - `transport`: The transport component to connect to
1517 ///
1518 /// # Returns
1519 ///
1520 /// A `ConnectionTo` that you can use to send requests and notifications to the spawned connection.
1521 ///
1522 /// # Example: Proxying to a backend connection
1523 ///
1524 /// ```
1525 /// # use agent_client_protocol::UntypedRole;
1526 /// # use agent_client_protocol::{Builder, ConnectionTo};
1527 /// # use agent_client_protocol_test::*;
1528 /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1529 /// // Set up a backend connection builder
1530 /// let backend = UntypedRole.builder()
1531 /// .on_receive_request(async |req: MyRequest, responder, _cx| {
1532 /// responder.respond(MyResponse { status: "ok".into() })
1533 /// }, agent_client_protocol::on_receive_request!());
1534 ///
1535 /// // Spawn it and get a context to send requests to it
1536 /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
1537 ///
1538 /// // Now you can forward requests to the backend
1539 /// let response = backend_connection.send_request(MyRequest {}).block_task().await?;
1540 /// # Ok(())
1541 /// # }
1542 /// ```
1543 #[track_caller]
1544 pub fn spawn_connection<R: Role>(
1545 &self,
1546 builder: Builder<
1547 R,
1548 impl HandleDispatchFrom<R::Counterpart> + 'static,
1549 impl RunWithConnectionTo<R::Counterpart> + 'static,
1550 >,
1551 transport: impl ConnectTo<R> + 'static,
1552 ) -> Result<ConnectionTo<R::Counterpart>, crate::Error> {
1553 let (connection, future) =
1554 builder.into_connection_and_future(transport, |_| std::future::pending());
1555 Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
1556 Ok(connection)
1557 }
1558
1559 /// Send a request/notification and forward the response appropriately.
1560 ///
1561 /// The request context's response type matches the request's response type,
1562 /// enabling type-safe message forwarding.
1563 pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
1564 &self,
1565 message: Dispatch<Req, Notif>,
1566 ) -> Result<(), crate::Error>
1567 where
1568 Counterpart: HasPeer<Counterpart>,
1569 {
1570 self.send_proxied_message_to(self.counterpart(), message)
1571 }
1572
1573 /// Send a request/notification and forward the response appropriately.
1574 ///
1575 /// The request context's response type matches the request's response type,
1576 /// enabling type-safe message forwarding.
1577 pub fn send_proxied_message_to<
1578 Peer: Role,
1579 Req: JsonRpcRequest<Response: Send>,
1580 Notif: JsonRpcNotification,
1581 >(
1582 &self,
1583 peer: Peer,
1584 message: Dispatch<Req, Notif>,
1585 ) -> Result<(), crate::Error>
1586 where
1587 Counterpart: HasPeer<Peer>,
1588 {
1589 match message {
1590 Dispatch::Request(request, responder) => self
1591 .send_request_to(peer, request)
1592 .forward_response_to(responder),
1593 Dispatch::Notification(notification) => self.send_notification_to(peer, notification),
1594 Dispatch::Response(result, router) => {
1595 // Responses are forwarded directly to their destination
1596 router.respond_with_result(result)
1597 }
1598 }
1599 }
1600
1601 /// Send an outgoing request and return a [`SentRequest`] for handling the reply.
1602 ///
1603 /// The returned [`SentRequest`] provides methods for receiving the response without
1604 /// blocking the event loop:
1605 ///
1606 /// * [`on_receiving_result`](SentRequest::on_receiving_result) - Schedule
1607 /// a callback to run when the response arrives (doesn't block the event loop)
1608 /// * [`block_task`](SentRequest::block_task) - Block the current task until the response
1609 /// arrives (only safe in spawned tasks, not in handlers)
1610 ///
1611 /// # Anti-Footgun Design
1612 ///
1613 /// The API intentionally makes it difficult to block on the result directly to prevent
1614 /// the common mistake of blocking the event loop while waiting for a response:
1615 ///
1616 /// ```compile_fail
1617 /// # use agent_client_protocol_test::*;
1618 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1619 /// // ❌ This doesn't compile - prevents blocking the event loop
1620 /// let response = cx.send_request(MyRequest {}).await?;
1621 /// # Ok(())
1622 /// # }
1623 /// ```
1624 ///
1625 /// ```no_run
1626 /// # use agent_client_protocol_test::*;
1627 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1628 /// // ✅ Option 1: Schedule callback (safe in handlers)
1629 /// cx.send_request(MyRequest {})
1630 /// .on_receiving_result(async |result| {
1631 /// // Handle the response
1632 /// Ok(())
1633 /// })?;
1634 ///
1635 /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1636 /// cx.spawn({
1637 /// let cx = cx.clone();
1638 /// async move {
1639 /// let response = cx.send_request(MyRequest {})
1640 /// .block_task()
1641 /// .await?;
1642 /// // Process response...
1643 /// Ok(())
1644 /// }
1645 /// })?;
1646 /// # Ok(())
1647 /// # }
1648 /// ```
1649 /// Send an outgoing request to the default counterpart peer.
1650 ///
1651 /// This is a convenience method that sends to the counterpart role `R`.
1652 /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
1653 pub fn send_request<Req: JsonRpcRequest>(&self, request: Req) -> SentRequest<Req::Response>
1654 where
1655 Counterpart: HasPeer<Counterpart>,
1656 {
1657 self.send_request_to(self.counterpart.clone(), request)
1658 }
1659
1660 /// Send an outgoing request to a specific peer.
1661 ///
1662 /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1663 /// implementation before being sent.
1664 pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
1665 &self,
1666 peer: Peer,
1667 request: Req,
1668 ) -> SentRequest<Req::Response>
1669 where
1670 Counterpart: HasPeer<Peer>,
1671 {
1672 let method = request.method().to_string();
1673 let id = jsonrpcmsg::Id::String(uuid::Uuid::new_v4().to_string());
1674 let (response_tx, response_rx) = oneshot::channel();
1675 let role_id = peer.role_id();
1676 let remote_style = self.counterpart.remote_style(peer);
1677 match remote_style.transform_outgoing_message(request) {
1678 Ok(untyped) => {
1679 // Transform the message for the target role
1680 let message = OutgoingMessage::Request {
1681 id: id.clone(),
1682 method: method.clone(),
1683 role_id,
1684 untyped,
1685 response_tx,
1686 };
1687
1688 match self.message_tx.unbounded_send(message) {
1689 Ok(()) => (),
1690 Err(error) => {
1691 let OutgoingMessage::Request {
1692 method,
1693 response_tx,
1694 ..
1695 } = error.into_inner()
1696 else {
1697 unreachable!();
1698 };
1699
1700 response_tx
1701 .send(ResponsePayload {
1702 result: Err(crate::util::internal_error(format!(
1703 "failed to send outgoing request `{method}"
1704 ))),
1705 ack_tx: None,
1706 })
1707 .unwrap();
1708 }
1709 }
1710 }
1711
1712 Err(err) => {
1713 response_tx
1714 .send(ResponsePayload {
1715 result: Err(crate::util::internal_error(format!(
1716 "failed to create untyped request for `{method}`: {err}"
1717 ))),
1718 ack_tx: None,
1719 })
1720 .unwrap();
1721 }
1722 }
1723
1724 SentRequest::new(id, method.clone(), self.task_tx.clone(), response_rx)
1725 .map(move |json| <Req::Response>::from_value(&method, json))
1726 }
1727
1728 /// Send an outgoing notification to the default counterpart peer (no reply expected).
1729 ///
1730 /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1731 /// This method sends the notification immediately and returns.
1732 ///
1733 /// This is a convenience method that sends to the counterpart role `R`.
1734 /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
1735 ///
1736 /// ```no_run
1737 /// # use agent_client_protocol_test::*;
1738 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::Agent>) -> Result<(), agent_client_protocol::Error> {
1739 /// cx.send_notification(StatusUpdate {
1740 /// message: "Processing...".into(),
1741 /// })?;
1742 /// # Ok(())
1743 /// # }
1744 /// ```
1745 pub fn send_notification<N: JsonRpcNotification>(
1746 &self,
1747 notification: N,
1748 ) -> Result<(), crate::Error>
1749 where
1750 Counterpart: HasPeer<Counterpart>,
1751 {
1752 self.send_notification_to(self.counterpart.clone(), notification)
1753 }
1754
1755 /// Send an outgoing notification to a specific peer (no reply expected).
1756 ///
1757 /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1758 /// implementation before being sent.
1759 pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
1760 &self,
1761 peer: Peer,
1762 notification: N,
1763 ) -> Result<(), crate::Error>
1764 where
1765 Counterpart: HasPeer<Peer>,
1766 {
1767 let remote_style = self.counterpart.remote_style(peer);
1768 tracing::debug!(
1769 role = std::any::type_name::<Counterpart>(),
1770 peer = std::any::type_name::<Peer>(),
1771 notification_type = std::any::type_name::<N>(),
1772 ?remote_style,
1773 original_method = notification.method(),
1774 "send_notification_to"
1775 );
1776 let transformed = remote_style.transform_outgoing_message(notification)?;
1777 tracing::debug!(
1778 transformed_method = %transformed.method,
1779 "send_notification_to transformed"
1780 );
1781 send_raw_message(
1782 &self.message_tx,
1783 OutgoingMessage::Notification {
1784 untyped: transformed,
1785 },
1786 )
1787 }
1788
1789 /// Send an error notification (no reply expected).
1790 pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1791 send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
1792 }
1793
1794 /// Register a dynamic message handler, used to intercept messages specific to a particular session
1795 /// or some similar modal thing.
1796 ///
1797 /// Dynamic message handlers are called first for every incoming message.
1798 ///
1799 /// If they decline to handle the message, then the message is passed to the regular registered handlers.
1800 ///
1801 /// The handler will stay registered until the returned registration guard is dropped.
1802 pub fn add_dynamic_handler(
1803 &self,
1804 handler: impl HandleDispatchFrom<Counterpart> + 'static,
1805 ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error> {
1806 let uuid = Uuid::new_v4();
1807 self.dynamic_handler_tx
1808 .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
1809 uuid,
1810 Box::new(handler),
1811 ))
1812 .map_err(crate::util::internal_error)?;
1813
1814 Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
1815 }
1816
1817 fn remove_dynamic_handler(&self, uuid: Uuid) {
1818 // Ignore errors
1819 drop(
1820 self.dynamic_handler_tx
1821 .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid)),
1822 );
1823 }
1824}
1825
1826#[derive(Clone, Debug)]
1827pub struct DynamicHandlerRegistration<R: Role> {
1828 uuid: Uuid,
1829 cx: ConnectionTo<R>,
1830}
1831
1832impl<R: Role> DynamicHandlerRegistration<R> {
1833 fn new(uuid: Uuid, cx: ConnectionTo<R>) -> Self {
1834 Self { uuid, cx }
1835 }
1836
1837 /// Prevents the dynamic handler from being removed when dropped.
1838 pub fn run_indefinitely(self) {
1839 std::mem::forget(self);
1840 }
1841}
1842
1843impl<R: Role> Drop for DynamicHandlerRegistration<R> {
1844 fn drop(&mut self) {
1845 self.cx.remove_dynamic_handler(self.uuid);
1846 }
1847}
1848
1849/// The context to respond to an incoming request.
1850///
1851/// This context is provided to request handlers and serves a dual role:
1852///
1853/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
1854/// [`respond_with_result`](Self::respond_with_result) to send the response
1855/// 2. **Send other messages** - Use the [`ConnectionTo`] parameter passed to your
1856/// handler, which provides [`send_request`](`ConnectionTo::send_request`),
1857/// [`send_notification`](`ConnectionTo::send_notification`), and
1858/// [`spawn`](`ConnectionTo::spawn`)
1859///
1860/// # Example
1861///
1862/// ```no_run
1863/// # use agent_client_protocol_test::*;
1864/// # async fn example() -> Result<(), agent_client_protocol::Error> {
1865/// # let connection = mock_connection();
1866/// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1867/// // Send a notification while processing
1868/// cx.send_notification(StatusUpdate {
1869/// message: "processing".into(),
1870/// })?;
1871///
1872/// // Do some work...
1873/// let result = process(&req.data)?;
1874///
1875/// // Respond to the request
1876/// responder.respond(ProcessResponse { result })
1877/// }, agent_client_protocol::on_receive_request!())
1878/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1879/// # Ok(())
1880/// # }
1881/// ```
1882///
1883/// # Event Loop Considerations
1884///
1885/// Like all handlers, request handlers run on the event loop. Use
1886/// [`spawn`](ConnectionTo::spawn) for expensive operations to avoid blocking
1887/// the connection.
1888///
1889/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency)
1890/// section for more details.
1891#[must_use]
1892pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
1893 /// The method of the request.
1894 method: String,
1895
1896 /// The `id` of the message we are replying to.
1897 id: jsonrpcmsg::Id,
1898
1899 /// Function to send the response to its destination.
1900 ///
1901 /// For incoming requests: serializes to JSON and sends over the wire.
1902 /// For incoming responses: sends to the waiting oneshot channel.
1903 send_fn: SendBoxFnOnce<'static, (Result<T, crate::Error>,), Result<(), crate::Error>>,
1904}
1905
1906impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
1907 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1908 f.debug_struct("Responder")
1909 .field("method", &self.method)
1910 .field("id", &self.id)
1911 .field("response_type", &std::any::type_name::<T>())
1912 .finish_non_exhaustive()
1913 }
1914}
1915
1916impl Responder<serde_json::Value> {
1917 /// Create a new request context for an incoming request.
1918 ///
1919 /// The response will be serialized to JSON and sent over the wire.
1920 fn new(message_tx: OutgoingMessageTx, method: String, id: jsonrpcmsg::Id) -> Self {
1921 let id_clone = id.clone();
1922 Self {
1923 method,
1924 id,
1925 send_fn: SendBoxFnOnce::new(
1926 move |response: Result<serde_json::Value, crate::Error>| {
1927 send_raw_message(
1928 &message_tx,
1929 OutgoingMessage::Response {
1930 id: id_clone,
1931 response,
1932 },
1933 )
1934 },
1935 ),
1936 }
1937 }
1938
1939 /// Cast this request context to a different response type.
1940 ///
1941 /// The provided type `T` will be serialized to JSON before sending.
1942 pub fn cast<T: JsonRpcResponse>(self) -> Responder<T> {
1943 self.wrap_params(move |method, value| match value {
1944 Ok(value) => T::into_json(value, method),
1945 Err(e) => Err(e),
1946 })
1947 }
1948}
1949
1950impl<T: JsonRpcResponse> Responder<T> {
1951 /// Method of the incoming request
1952 #[must_use]
1953 pub fn method(&self) -> &str {
1954 &self.method
1955 }
1956
1957 /// ID of the incoming request/response as a JSON value
1958 #[must_use]
1959 pub fn id(&self) -> serde_json::Value {
1960 crate::util::id_to_json(&self.id)
1961 }
1962
1963 /// Convert to a `Responder` that expects a JSON value
1964 /// and which checks (dynamically) that the JSON value it receives
1965 /// can be converted to `T`.
1966 pub fn erase_to_json(self) -> Responder<serde_json::Value> {
1967 self.wrap_params(|method, value| T::from_value(method, value?))
1968 }
1969
1970 /// Return a new Responder with a different method name.
1971 pub fn wrap_method(self, method: String) -> Responder<T> {
1972 Responder {
1973 method,
1974 id: self.id,
1975 send_fn: self.send_fn,
1976 }
1977 }
1978
1979 /// Return a new Responder that expects a response of type U.
1980 ///
1981 /// `wrap_fn` will be invoked with the method name and the result to transform
1982 /// type `U` into type `T` before sending.
1983 pub fn wrap_params<U: JsonRpcResponse>(
1984 self,
1985 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
1986 ) -> Responder<U> {
1987 let method = self.method.clone();
1988 Responder {
1989 method: self.method,
1990 id: self.id,
1991 send_fn: SendBoxFnOnce::new(move |input: Result<U, crate::Error>| {
1992 let t_value = wrap_fn(&method, input);
1993 self.send_fn.call(t_value)
1994 }),
1995 }
1996 }
1997
1998 /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
1999 pub fn respond_with_result(
2000 self,
2001 response: Result<T, crate::Error>,
2002 ) -> Result<(), crate::Error> {
2003 tracing::debug!(id = ?self.id, "respond called");
2004 self.send_fn.call(response)
2005 }
2006
2007 /// Respond to the JSON-RPC request with a value.
2008 pub fn respond(self, response: T) -> Result<(), crate::Error> {
2009 self.respond_with_result(Ok(response))
2010 }
2011
2012 /// Respond to the JSON-RPC request with an internal error containing a message.
2013 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2014 self.respond_with_error(crate::util::internal_error(message))
2015 }
2016
2017 /// Respond to the JSON-RPC request with an error.
2018 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2019 tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2020 self.respond_with_result(Err(error))
2021 }
2022}
2023
2024/// Context for handling an incoming JSON-RPC response.
2025///
2026/// This is the response-side counterpart to [`Responder`]. While `Responder` handles
2027/// incoming requests (where you send a response over the wire), `ResponseRouter` handles
2028/// incoming responses (where you route the response to a local task waiting for it).
2029///
2030/// Both are fundamentally "sinks" that push the message through a `send_fn`, but they
2031/// represent different points in the message lifecycle and carry different metadata.
2032#[must_use]
2033pub struct ResponseRouter<T: JsonRpcResponse = serde_json::Value> {
2034 /// The method of the original request.
2035 method: String,
2036
2037 /// The `id` of the original request.
2038 id: jsonrpcmsg::Id,
2039
2040 /// The RoleId to which the original request was sent
2041 /// (and hence from which the reply is expected).
2042 role_id: RoleId,
2043
2044 /// Function to send the response to the waiting task.
2045 send_fn: SendBoxFnOnce<'static, (Result<T, crate::Error>,), Result<(), crate::Error>>,
2046}
2047
2048impl<T: JsonRpcResponse> std::fmt::Debug for ResponseRouter<T> {
2049 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2050 f.debug_struct("ResponseRouter")
2051 .field("method", &self.method)
2052 .field("id", &self.id)
2053 .field("response_type", &std::any::type_name::<T>())
2054 .finish_non_exhaustive()
2055 }
2056}
2057
2058impl ResponseRouter<serde_json::Value> {
2059 /// Create a new response context for routing a response to a local awaiter.
2060 ///
2061 /// When `respond_with_result` is called, the response is sent through the oneshot
2062 /// channel to the code that originally sent the request.
2063 pub(crate) fn new(
2064 method: String,
2065 id: jsonrpcmsg::Id,
2066 role_id: RoleId,
2067 sender: oneshot::Sender<ResponsePayload>,
2068 ) -> Self {
2069 Self {
2070 method,
2071 id,
2072 role_id,
2073 send_fn: SendBoxFnOnce::new(
2074 move |response: Result<serde_json::Value, crate::Error>| {
2075 sender
2076 .send(ResponsePayload {
2077 result: response,
2078 ack_tx: None,
2079 })
2080 .map_err(|_| {
2081 crate::util::internal_error("failed to send response, receiver dropped")
2082 })
2083 },
2084 ),
2085 }
2086 }
2087
2088 /// Cast this response context to a different response type.
2089 ///
2090 /// The provided type `T` will be serialized to JSON before sending.
2091 pub fn cast<T: JsonRpcResponse>(self) -> ResponseRouter<T> {
2092 self.wrap_params(move |method, value| match value {
2093 Ok(value) => T::into_json(value, method),
2094 Err(e) => Err(e),
2095 })
2096 }
2097}
2098
2099impl<T: JsonRpcResponse> ResponseRouter<T> {
2100 /// Method of the original request
2101 #[must_use]
2102 pub fn method(&self) -> &str {
2103 &self.method
2104 }
2105
2106 /// ID of the original request as a JSON value
2107 #[must_use]
2108 pub fn id(&self) -> serde_json::Value {
2109 crate::util::id_to_json(&self.id)
2110 }
2111
2112 /// The peer to which the original request was sent.
2113 ///
2114 /// This is the peer from which we expect to receive the response.
2115 #[must_use]
2116 pub fn role_id(&self) -> RoleId {
2117 self.role_id.clone()
2118 }
2119
2120 /// Convert to a `ResponseRouter` that expects a JSON value
2121 /// and which checks (dynamically) that the JSON value it receives
2122 /// can be converted to `T`.
2123 pub fn erase_to_json(self) -> ResponseRouter<serde_json::Value> {
2124 self.wrap_params(|method, value| T::from_value(method, value?))
2125 }
2126
2127 /// Return a new ResponseRouter that expects a response of type U.
2128 ///
2129 /// `wrap_fn` will be invoked with the method name and the result to transform
2130 /// type `U` into type `T` before sending.
2131 fn wrap_params<U: JsonRpcResponse>(
2132 self,
2133 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2134 ) -> ResponseRouter<U> {
2135 let method = self.method.clone();
2136 ResponseRouter {
2137 method: self.method,
2138 id: self.id,
2139 role_id: self.role_id,
2140 send_fn: SendBoxFnOnce::new(move |input: Result<U, crate::Error>| {
2141 let t_value = wrap_fn(&method, input);
2142 self.send_fn.call(t_value)
2143 }),
2144 }
2145 }
2146
2147 /// Complete the response by sending the result to the waiting task.
2148 pub fn respond_with_result(
2149 self,
2150 response: Result<T, crate::Error>,
2151 ) -> Result<(), crate::Error> {
2152 tracing::debug!(id = ?self.id, "response routed to awaiter");
2153 self.send_fn.call(response)
2154 }
2155
2156 /// Complete the response by sending a value to the waiting task.
2157 pub fn respond(self, response: T) -> Result<(), crate::Error> {
2158 self.respond_with_result(Ok(response))
2159 }
2160
2161 /// Complete the response by sending an internal error to the waiting task.
2162 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2163 self.respond_with_error(crate::util::internal_error(message))
2164 }
2165
2166 /// Complete the response by sending an error to the waiting task.
2167 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2168 tracing::debug!(id = ?self.id, ?error, "error routed to awaiter");
2169 self.respond_with_result(Err(error))
2170 }
2171}
2172
2173/// Common bounds for any JSON-RPC message.
2174///
2175/// # Derive Macro
2176///
2177/// For simple message types, you can use the `JsonRpcRequest` or `JsonRpcNotification` derive macros
2178/// which will implement both `JsonRpcMessage` and the respective trait. See [`JsonRpcRequest`] and
2179/// [`JsonRpcNotification`] for examples.
2180pub trait JsonRpcMessage: 'static + Debug + Sized + Send + Clone {
2181 /// Check if this message type matches the given method name.
2182 fn matches_method(method: &str) -> bool;
2183
2184 /// The method name for the message.
2185 fn method(&self) -> &str;
2186
2187 /// Convert this message into an untyped message.
2188 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;
2189
2190 /// Parse this type from a method name and parameters.
2191 ///
2192 /// Returns an error if the method doesn't match or deserialization fails.
2193 /// Callers should use `matches_method` first to check if this type handles the method.
2194 fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error>;
2195}
2196
2197/// Defines the "payload" of a successful response to a JSON-RPC request.
2198///
2199/// # Derive Macro
2200///
2201/// Use `#[derive(JsonRpcResponse)]` to automatically implement this trait:
2202///
2203/// ```ignore
2204/// use agent_client_protocol::JsonRpcResponse;
2205/// use serde::{Serialize, Deserialize};
2206///
2207/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2208/// #[response(method = "_hello")]
2209/// struct HelloResponse {
2210/// greeting: String,
2211/// }
2212/// ```
2213pub trait JsonRpcResponse: 'static + Debug + Sized + Send + Clone {
2214 /// Convert this message into a JSON value.
2215 fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;
2216
2217 /// Parse a JSON value into the response type.
2218 fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
2219}
2220
2221impl JsonRpcResponse for serde_json::Value {
2222 fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
2223 Ok(value)
2224 }
2225
2226 fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
2227 Ok(self)
2228 }
2229}
2230
2231/// A struct that represents a notification (JSON-RPC message that does not expect a response).
2232///
2233/// # Derive Macro
2234///
2235/// Use `#[derive(JsonRpcNotification)]` to automatically implement both `JsonRpcMessage` and `JsonRpcNotification`:
2236///
2237/// ```ignore
2238/// use agent_client_protocol::JsonRpcNotification;
2239/// use serde::{Serialize, Deserialize};
2240///
2241/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcNotification)]
2242/// #[notification(method = "_ping")]
2243/// struct PingNotification {
2244/// timestamp: u64,
2245/// }
2246/// ```
2247pub trait JsonRpcNotification: JsonRpcMessage {}
2248
2249/// A struct that represents a request (JSON-RPC message expecting a response).
2250///
2251/// # Derive Macro
2252///
2253/// Use `#[derive(JsonRpcRequest)]` to automatically implement both `JsonRpcMessage` and `JsonRpcRequest`:
2254///
2255/// ```ignore
2256/// use agent_client_protocol::{JsonRpcRequest, JsonRpcResponse};
2257/// use serde::{Serialize, Deserialize};
2258///
2259/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcRequest)]
2260/// #[request(method = "_hello", response = HelloResponse)]
2261/// struct HelloRequest {
2262/// name: String,
2263/// }
2264///
2265/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2266/// struct HelloResponse {
2267/// greeting: String,
2268/// }
2269/// ```
2270pub trait JsonRpcRequest: JsonRpcMessage {
2271 /// The type of data expected in response.
2272 type Response: JsonRpcResponse;
2273}
2274
2275/// An enum capturing an in-flight request or notification.
2276/// In the case of a request, also includes the context used to respond to the request.
2277///
2278/// Type parameters allow specifying the concrete request and notification types.
2279/// By default, both are `UntypedMessage` for dynamic dispatch.
2280/// The request context's response type matches the request's response type.
2281#[derive(Debug)]
2282pub enum Dispatch<Req: JsonRpcRequest = UntypedMessage, Notif: JsonRpcMessage = UntypedMessage> {
2283 /// Incoming request and the context where the response should be sent.
2284 Request(Req, Responder<Req::Response>),
2285
2286 /// Incoming notification.
2287 Notification(Notif),
2288
2289 /// Incoming response to a request we sent.
2290 ///
2291 /// The first field is the response result (success or error from the remote).
2292 /// The second field is the context for forwarding the response to its destination
2293 /// (typically a waiting oneshot channel).
2294 Response(
2295 Result<Req::Response, crate::Error>,
2296 ResponseRouter<Req::Response>,
2297 ),
2298}
2299
2300impl<Req: JsonRpcRequest, Notif: JsonRpcMessage> Dispatch<Req, Notif> {
2301 /// Map the request and notification types to new types.
2302 ///
2303 /// Note: Response variants are passed through unchanged since they don't
2304 /// contain a parseable message payload.
2305 pub fn map<Req1, Notif1>(
2306 self,
2307 map_request: impl FnOnce(Req, Responder<Req::Response>) -> (Req1, Responder<Req1::Response>),
2308 map_notification: impl FnOnce(Notif) -> Notif1,
2309 ) -> Dispatch<Req1, Notif1>
2310 where
2311 Req1: JsonRpcRequest<Response = Req::Response>,
2312 Notif1: JsonRpcMessage,
2313 {
2314 match self {
2315 Dispatch::Request(request, responder) => {
2316 let (new_request, new_responder) = map_request(request, responder);
2317 Dispatch::Request(new_request, new_responder)
2318 }
2319 Dispatch::Notification(notification) => {
2320 let new_notification = map_notification(notification);
2321 Dispatch::Notification(new_notification)
2322 }
2323 Dispatch::Response(result, router) => Dispatch::Response(result, router),
2324 }
2325 }
2326
2327 /// Respond to the message with an error.
2328 ///
2329 /// If this message is a request, this error becomes the reply to the request.
2330 ///
2331 /// If this message is a notification, the error is sent as a notification.
2332 ///
2333 /// If this message is a response, the error is forwarded to the waiting handler.
2334 pub fn respond_with_error<R: Role>(
2335 self,
2336 error: crate::Error,
2337 cx: ConnectionTo<R>,
2338 ) -> Result<(), crate::Error> {
2339 match self {
2340 Dispatch::Request(_, responder) => responder.respond_with_error(error),
2341 Dispatch::Notification(_) => cx.send_error_notification(error),
2342 Dispatch::Response(_, responder) => responder.respond_with_error(error),
2343 }
2344 }
2345
2346 /// Convert to a `Responder` that expects a JSON value
2347 /// and which checks (dynamically) that the JSON value it receives
2348 /// can be converted to `T`.
2349 ///
2350 /// Note: Response variants cannot be erased since their payload is already
2351 /// parsed. This returns an error for Response variants.
2352 pub fn erase_to_json(self) -> Result<Dispatch, crate::Error> {
2353 match self {
2354 Dispatch::Request(response, responder) => Ok(Dispatch::Request(
2355 response.to_untyped_message()?,
2356 responder.erase_to_json(),
2357 )),
2358 Dispatch::Notification(notification) => {
2359 Ok(Dispatch::Notification(notification.to_untyped_message()?))
2360 }
2361 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2362 "cannot erase Response variant to JSON",
2363 )),
2364 }
2365 }
2366
2367 /// Convert the message in self to an untyped message.
2368 ///
2369 /// Note: Response variants don't have an untyped message representation.
2370 /// This returns an error for Response variants.
2371 pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2372 match self {
2373 Dispatch::Request(request, _) => request.to_untyped_message(),
2374 Dispatch::Notification(notification) => notification.to_untyped_message(),
2375 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2376 "Response variant has no untyped message representation",
2377 )),
2378 }
2379 }
2380
2381 /// Convert self to an untyped message context.
2382 ///
2383 /// Note: Response variants cannot be converted. This returns an error for Response variants.
2384 pub fn into_untyped_dispatch(self) -> Result<Dispatch, crate::Error> {
2385 match self {
2386 Dispatch::Request(request, responder) => Ok(Dispatch::Request(
2387 request.to_untyped_message()?,
2388 responder.erase_to_json(),
2389 )),
2390 Dispatch::Notification(notification) => {
2391 Ok(Dispatch::Notification(notification.to_untyped_message()?))
2392 }
2393 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2394 "cannot convert Response variant to untyped message context",
2395 )),
2396 }
2397 }
2398
2399 /// Returns the request ID if this is a request or response, None if notification.
2400 pub fn id(&self) -> Option<serde_json::Value> {
2401 match self {
2402 Dispatch::Request(_, cx) => Some(cx.id()),
2403 Dispatch::Notification(_) => None,
2404 Dispatch::Response(_, cx) => Some(cx.id()),
2405 }
2406 }
2407
2408 /// Returns the method of the message.
2409 ///
2410 /// For requests and notifications, this is the method from the message payload.
2411 /// For responses, this is the method of the original request.
2412 pub fn method(&self) -> &str {
2413 match self {
2414 Dispatch::Request(msg, _) => msg.method(),
2415 Dispatch::Notification(msg) => msg.method(),
2416 Dispatch::Response(_, cx) => cx.method(),
2417 }
2418 }
2419}
2420
2421impl Dispatch {
2422 /// Attempts to parse `self` into a typed message context.
2423 ///
2424 /// # Returns
2425 ///
2426 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2427 /// * `Ok(Err(self))` if not
2428 /// * `Err` if has the correct method for the given types but parsing fails
2429 #[tracing::instrument(skip(self), fields(Request = ?std::any::type_name::<Req>(), Notif = ?std::any::type_name::<Notif>()), level = "trace", ret)]
2430 pub(crate) fn into_typed_dispatch<Req: JsonRpcRequest, Notif: JsonRpcNotification>(
2431 self,
2432 ) -> Result<Result<Dispatch<Req, Notif>, Dispatch>, crate::Error> {
2433 tracing::debug!(
2434 message = ?self,
2435 "into_typed_dispatch"
2436 );
2437 match self {
2438 Dispatch::Request(message, responder) => {
2439 if Req::matches_method(&message.method) {
2440 match Req::parse_message(&message.method, &message.params) {
2441 Ok(req) => {
2442 tracing::trace!(?req, "parsed ok");
2443 Ok(Ok(Dispatch::Request(req, responder.cast())))
2444 }
2445 Err(err) => {
2446 tracing::trace!(?err, "parse error");
2447 Err(err)
2448 }
2449 }
2450 } else {
2451 tracing::trace!("method doesn't match");
2452 Ok(Err(Dispatch::Request(message, responder)))
2453 }
2454 }
2455
2456 Dispatch::Notification(message) => {
2457 if Notif::matches_method(&message.method) {
2458 match Notif::parse_message(&message.method, &message.params) {
2459 Ok(notif) => {
2460 tracing::trace!(?notif, "parse ok");
2461 Ok(Ok(Dispatch::Notification(notif)))
2462 }
2463 Err(err) => {
2464 tracing::trace!(?err, "parse error");
2465 Err(err)
2466 }
2467 }
2468 } else {
2469 tracing::trace!("method doesn't match");
2470 Ok(Err(Dispatch::Notification(message)))
2471 }
2472 }
2473
2474 Dispatch::Response(result, cx) => {
2475 let method = cx.method();
2476 if Req::matches_method(method) {
2477 // Parse the response result
2478 let typed_result = match result {
2479 Ok(value) => {
2480 match <Req::Response as JsonRpcResponse>::from_value(method, value) {
2481 Ok(parsed) => {
2482 tracing::trace!(?parsed, "parse ok");
2483 Ok(parsed)
2484 }
2485 Err(err) => {
2486 tracing::trace!(?err, "parse error");
2487 return Err(err);
2488 }
2489 }
2490 }
2491 Err(err) => {
2492 tracing::trace!("error, passthrough");
2493 Err(err)
2494 }
2495 };
2496 Ok(Ok(Dispatch::Response(typed_result, cx.cast())))
2497 } else {
2498 tracing::trace!("method doesn't match");
2499 Ok(Err(Dispatch::Response(result, cx)))
2500 }
2501 }
2502 }
2503 }
2504
2505 /// True if this message has a field with the given name.
2506 ///
2507 /// Returns `false` for Response variants.
2508 #[must_use]
2509 pub fn has_field(&self, field_name: &str) -> bool {
2510 self.message()
2511 .and_then(|m| m.params().get(field_name))
2512 .is_some()
2513 }
2514
2515 /// Returns true if this message has a session-id field.
2516 ///
2517 /// Returns `false` for Response variants.
2518 pub(crate) fn has_session_id(&self) -> bool {
2519 self.has_field("sessionId")
2520 }
2521
2522 /// Extract the ACP session-id from this message (if any).
2523 ///
2524 /// Returns `Ok(None)` for Response variants.
2525 pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
2526 let Some(message) = self.message() else {
2527 return Ok(None);
2528 };
2529 let Some(value) = message.params().get("sessionId") else {
2530 return Ok(None);
2531 };
2532 let session_id = serde_json::from_value(value.clone())?;
2533 Ok(Some(session_id))
2534 }
2535
2536 /// Try to parse this as a notification of the given type.
2537 ///
2538 /// # Returns
2539 ///
2540 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2541 /// * `Ok(Err(self))` if not
2542 /// * `Err` if has the correct method for the given types but parsing fails
2543 pub fn into_notification<N: JsonRpcNotification>(
2544 self,
2545 ) -> Result<Result<N, Dispatch>, crate::Error> {
2546 match self {
2547 Dispatch::Notification(msg) => {
2548 if !N::matches_method(&msg.method) {
2549 return Ok(Err(Dispatch::Notification(msg)));
2550 }
2551 match N::parse_message(&msg.method, &msg.params) {
2552 Ok(n) => Ok(Ok(n)),
2553 Err(err) => Err(err),
2554 }
2555 }
2556 Dispatch::Request(..) | Dispatch::Response(..) => Ok(Err(self)),
2557 }
2558 }
2559
2560 /// Try to parse this as a request of the given type.
2561 ///
2562 /// # Returns
2563 ///
2564 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2565 /// * `Ok(Err(self))` if not
2566 /// * `Err` if has the correct method for the given types but parsing fails
2567 pub fn into_request<Req: JsonRpcRequest>(
2568 self,
2569 ) -> Result<Result<(Req, Responder<Req::Response>), Dispatch>, crate::Error> {
2570 match self {
2571 Dispatch::Request(msg, responder) => {
2572 if !Req::matches_method(&msg.method) {
2573 return Ok(Err(Dispatch::Request(msg, responder)));
2574 }
2575 match Req::parse_message(&msg.method, &msg.params) {
2576 Ok(req) => Ok(Ok((req, responder.cast()))),
2577 Err(err) => Err(err),
2578 }
2579 }
2580 Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Err(self)),
2581 }
2582 }
2583}
2584
2585impl<M: JsonRpcRequest + JsonRpcNotification> Dispatch<M, M> {
2586 /// Returns the message payload for requests and notifications.
2587 ///
2588 /// Returns `None` for Response variants since they don't contain a message payload.
2589 pub fn message(&self) -> Option<&M> {
2590 match self {
2591 Dispatch::Request(msg, _) | Dispatch::Notification(msg) => Some(msg),
2592 Dispatch::Response(_, _) => None,
2593 }
2594 }
2595
2596 /// Map the request/notification message.
2597 ///
2598 /// Response variants pass through unchanged.
2599 pub(crate) fn try_map_message(
2600 self,
2601 map_message: impl FnOnce(M) -> Result<M, crate::Error>,
2602 ) -> Result<Dispatch<M, M>, crate::Error> {
2603 match self {
2604 Dispatch::Request(request, cx) => Ok(Dispatch::Request(map_message(request)?, cx)),
2605 Dispatch::Notification(notification) => {
2606 Ok(Dispatch::<M, M>::Notification(map_message(notification)?))
2607 }
2608 Dispatch::Response(result, cx) => Ok(Dispatch::Response(result, cx)),
2609 }
2610 }
2611}
2612
/// An incoming JSON message without any typing. Can be a request or a notification.
///
/// The message is kept as a method name plus raw JSON parameters; no schema is
/// applied to `params`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UntypedMessage {
    /// The JSON-RPC method name
    pub method: String,
    /// The JSON-RPC parameters as a raw JSON value
    pub params: serde_json::Value,
}
2621
2622impl UntypedMessage {
2623 /// Returns an untyped message with the given method and parameters.
2624 pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
2625 let params = serde_json::to_value(params)?;
2626 Ok(Self {
2627 method: method.to_string(),
2628 params,
2629 })
2630 }
2631
2632 /// Returns the method name
2633 #[must_use]
2634 pub fn method(&self) -> &str {
2635 &self.method
2636 }
2637
2638 /// Returns the parameters as a JSON value
2639 #[must_use]
2640 pub fn params(&self) -> &serde_json::Value {
2641 &self.params
2642 }
2643
2644 /// Consumes this message and returns the method and params
2645 #[must_use]
2646 pub fn into_parts(self) -> (String, serde_json::Value) {
2647 (self.method, self.params)
2648 }
2649
2650 /// Convert `self` to a JSON-RPC message.
2651 pub(crate) fn into_jsonrpc_msg(
2652 self,
2653 id: Option<jsonrpcmsg::Id>,
2654 ) -> Result<jsonrpcmsg::Request, crate::Error> {
2655 let Self { method, params } = self;
2656 Ok(jsonrpcmsg::Request::new_v2(method, json_cast(params)?, id))
2657 }
2658}
2659
2660impl JsonRpcMessage for UntypedMessage {
2661 fn matches_method(_method: &str) -> bool {
2662 // UntypedMessage matches any method - it's the untyped fallback
2663 true
2664 }
2665
2666 fn method(&self) -> &str {
2667 &self.method
2668 }
2669
2670 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2671 Ok(self.clone())
2672 }
2673
2674 fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error> {
2675 UntypedMessage::new(method, params)
2676 }
2677}
2678
// Used as a request, an `UntypedMessage` has a raw `serde_json::Value` as its
// response type.
impl JsonRpcRequest for UntypedMessage {
    type Response = serde_json::Value;
}

// An `UntypedMessage` may also be used as a notification; the impl adds nothing
// beyond what the trait itself provides.
impl JsonRpcNotification for UntypedMessage {}
2684
2685/// Represents a pending response of type `R` from an outgoing request.
2686///
2687/// Returned by [`ConnectionTo::send_request`], this type provides methods for handling
2688/// the response without blocking the event loop. The API is intentionally designed to make
2689/// it difficult to accidentally block.
2690///
2691/// # Anti-Footgun Design
2692///
2693/// You cannot directly `.await` a `SentRequest`. Instead, you must choose how to handle
2694/// the response:
2695///
2696/// ## Option 1: Schedule a Callback (Safe in Handlers)
2697///
2698/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
2699/// that runs when the response arrives. This doesn't block the event loop:
2700///
2701/// ```no_run
2702/// # use agent_client_protocol_test::*;
2703/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2704/// cx.send_request(MyRequest {})
2705/// .on_receiving_result(async |result| {
2706/// match result {
2707/// Ok(response) => {
2708/// // Handle successful response
2709/// Ok(())
2710/// }
2711/// Err(error) => {
2712/// // Handle error
2713/// Err(error)
2714/// }
2715/// }
2716/// })?;
2717/// # Ok(())
2718/// # }
2719/// ```
2720///
2721/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
2722///
2723/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
2724/// in a spawned task (never in a handler):
2725///
2726/// ```no_run
2727/// # use agent_client_protocol_test::*;
2728/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2729/// // ✅ Safe: Spawned task runs concurrently
2730/// cx.spawn({
2731/// let cx = cx.clone();
2732/// async move {
2733/// let response = cx.send_request(MyRequest {})
2734/// .block_task()
2735/// .await?;
2736/// // Process response...
2737/// Ok(())
2738/// }
2739/// })?;
2740/// # Ok(())
2741/// # }
2742/// ```
2743///
2744/// ```no_run
2745/// # use agent_client_protocol_test::*;
2746/// # async fn example() -> Result<(), agent_client_protocol::Error> {
2747/// # let connection = mock_connection();
2748/// // ❌ NEVER do this in a handler - blocks the event loop!
2749/// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2750/// let response = cx.send_request(MyRequest {})
2751/// .block_task() // This will deadlock!
2752/// .await?;
2753/// responder.respond(response)
2754/// }, agent_client_protocol::on_receive_request!())
2755/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2756/// # Ok(())
2757/// # }
2758/// ```
2759///
2760/// # Why This Design?
2761///
2762/// If you block the event loop while waiting for a response, the connection cannot process
2763/// the incoming response message, creating a deadlock. This API design prevents that footgun
2764/// by making blocking explicit and encouraging non-blocking patterns.
pub struct SentRequest<T> {
    /// Id of the outgoing request (exposed as JSON via `id()`).
    id: jsonrpcmsg::Id,
    /// Method name of the outgoing request; also used in the
    /// "response never received" error message.
    method: String,
    /// Sender used by `on_receiving_result` to spawn the task that awaits the response.
    task_tx: TaskTx,
    /// One-shot channel on which the response payload is received.
    response_rx: oneshot::Receiver<ResponsePayload>,
    /// Converts the raw JSON of a successful response into `T`; `map` composes
    /// further conversions on top of it.
    to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
}
2772
2773impl<T: Debug> Debug for SentRequest<T> {
2774 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2775 f.debug_struct("SentRequest")
2776 .field("id", &self.id)
2777 .field("method", &self.method)
2778 .field("task_tx", &self.task_tx)
2779 .field("response_rx", &self.response_rx)
2780 .finish_non_exhaustive()
2781 }
2782}
2783
2784impl SentRequest<serde_json::Value> {
2785 fn new(
2786 id: jsonrpcmsg::Id,
2787 method: String,
2788 task_tx: mpsc::UnboundedSender<Task>,
2789 response_rx: oneshot::Receiver<ResponsePayload>,
2790 ) -> Self {
2791 Self {
2792 id,
2793 method,
2794 response_rx,
2795 task_tx,
2796 to_result: Box::new(Ok),
2797 }
2798 }
2799}
2800
2801impl<T: JsonRpcResponse> SentRequest<T> {
2802 /// The id of the outgoing request.
2803 #[must_use]
2804 pub fn id(&self) -> serde_json::Value {
2805 crate::util::id_to_json(&self.id)
2806 }
2807
2808 /// The method of the request this is in response to.
2809 #[must_use]
2810 pub fn method(&self) -> &str {
2811 &self.method
2812 }
2813
2814 /// Create a new response that maps the result of the response to a new type.
2815 pub fn map<U>(
2816 self,
2817 map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
2818 ) -> SentRequest<U> {
2819 SentRequest {
2820 id: self.id,
2821 method: self.method,
2822 response_rx: self.response_rx,
2823 task_tx: self.task_tx,
2824 to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
2825 }
2826 }
2827
2828 /// Forward the response (success or error) to a request context when it arrives.
2829 ///
2830 /// This is a convenience method for proxying messages between connections. When the
2831 /// response arrives, it will be automatically sent to the provided request context,
2832 /// whether it's a successful response or an error.
2833 ///
2834 /// # Example: Proxying requests
2835 ///
2836 /// ```
2837 /// # use agent_client_protocol::UntypedRole;
2838 /// # use agent_client_protocol::{Builder, ConnectionTo};
2839 /// # use agent_client_protocol_test::*;
2840 /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2841 /// // Set up backend connection builder
2842 /// let backend = UntypedRole.builder()
2843 /// .on_receive_request(async |req: MyRequest, responder, cx| {
2844 /// responder.respond(MyResponse { status: "ok".into() })
2845 /// }, agent_client_protocol::on_receive_request!());
2846 ///
2847 /// // Spawn backend and get a context to send to it
2848 /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
2849 ///
2850 /// // Set up proxy that forwards requests to backend
2851 /// UntypedRole.builder()
2852 /// .on_receive_request({
2853 /// let backend_connection = backend_connection.clone();
2854 /// async move |req: MyRequest, responder, cx| {
2855 /// // Forward the request to backend and proxy the response back
2856 /// backend_connection.send_request(req)
2857 /// .forward_response_to(responder)?;
2858 /// Ok(())
2859 /// }
2860 /// }, agent_client_protocol::on_receive_request!());
2861 /// # Ok(())
2862 /// # }
2863 /// ```
2864 ///
2865 /// # Type Safety
2866 ///
2867 /// The request context's response type must match the request's response type,
2868 /// ensuring type-safe message forwarding.
2869 ///
2870 /// # When to Use
2871 ///
2872 /// Use this when:
2873 /// - You're implementing a proxy or gateway pattern
2874 /// - You want to forward responses without processing them
2875 /// - The response types match between the outgoing request and incoming request
2876 ///
2877 /// This is equivalent to calling `on_receiving_result` and manually forwarding
2878 /// the result, but more concise.
2879 pub fn forward_response_to(self, responder: Responder<T>) -> Result<(), crate::Error>
2880 where
2881 T: Send,
2882 {
2883 self.on_receiving_result(async move |result| responder.respond_with_result(result))
2884 }
2885
2886 /// Block the current task until the response is received.
2887 ///
2888 /// **Warning:** This method blocks the current async task. It is **only safe** to use
2889 /// in spawned tasks created with [`ConnectionTo::spawn`]. Using it directly in a
2890 /// handler callback will deadlock the connection.
2891 ///
2892 /// # Safe Usage (in spawned tasks)
2893 ///
2894 /// ```no_run
2895 /// # use agent_client_protocol_test::*;
2896 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2897 /// # let connection = mock_connection();
2898 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2899 /// // Spawn a task to handle the request
2900 /// cx.spawn({
2901 /// let connection = cx.clone();
2902 /// async move {
2903 /// // Safe: We're in a spawned task, not blocking the event loop
2904 /// let response = connection.send_request(OtherRequest {})
2905 /// .block_task()
2906 /// .await?;
2907 ///
2908 /// // Process the response...
2909 /// Ok(())
2910 /// }
2911 /// })?;
2912 ///
2913 /// // Respond immediately
2914 /// responder.respond(MyResponse { status: "ok".into() })
2915 /// }, agent_client_protocol::on_receive_request!())
2916 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2917 /// # Ok(())
2918 /// # }
2919 /// ```
2920 ///
2921 /// # Unsafe Usage (in handlers - will deadlock!)
2922 ///
2923 /// ```no_run
2924 /// # use agent_client_protocol_test::*;
2925 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2926 /// # let connection = mock_connection();
2927 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2928 /// // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
2929 /// let response = cx.send_request(OtherRequest {})
2930 /// .block_task()
2931 /// .await?;
2932 ///
2933 /// responder.respond(MyResponse { status: response.value })
2934 /// }, agent_client_protocol::on_receive_request!())
2935 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2936 /// # Ok(())
2937 /// # }
2938 /// ```
2939 ///
2940 /// # When to Use
2941 ///
2942 /// Use this method when:
2943 /// - You're in a spawned task (via [`ConnectionTo::spawn`])
2944 /// - You need the response value to proceed with your logic
2945 /// - Linear control flow is more natural than callbacks
2946 ///
2947 /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
2948 pub async fn block_task(self) -> Result<T, crate::Error>
2949 where
2950 T: Send,
2951 {
2952 match self.response_rx.await {
2953 Ok(ResponsePayload {
2954 result: Ok(json_value),
2955 ack_tx,
2956 }) => {
2957 // Ack immediately - we're in a spawned task, so the dispatch loop
2958 // can continue while we process the value.
2959 if let Some(tx) = ack_tx {
2960 let _ = tx.send(());
2961 }
2962 match (self.to_result)(json_value) {
2963 Ok(value) => Ok(value),
2964 Err(err) => Err(err),
2965 }
2966 }
2967 Ok(ResponsePayload {
2968 result: Err(err),
2969 ack_tx,
2970 }) => {
2971 if let Some(tx) = ack_tx {
2972 let _ = tx.send(());
2973 }
2974 Err(err)
2975 }
2976 Err(err) => Err(crate::util::internal_error(format!(
2977 "response to `{}` never received: {}",
2978 self.method, err
2979 ))),
2980 }
2981 }
2982
2983 /// Schedule an async task to run when a successful response is received.
2984 ///
2985 /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
2986 /// for the common pattern of forwarding errors to a request context while only processing
2987 /// successful responses.
2988 ///
2989 /// # Behavior
2990 ///
2991 /// - If the response is `Ok(value)`, your task receives the value and the request context
2992 /// - If the response is `Err(error)`, the error is automatically sent to `responder`
2993 /// and your task is not called
2994 ///
2995 /// # Example: Chaining requests
2996 ///
2997 /// ```no_run
2998 /// # use agent_client_protocol_test::*;
2999 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3000 /// # let connection = mock_connection();
3001 /// connection.on_receive_request(async |req: ValidateRequest, responder, cx| {
3002 /// // Send initial request
3003 /// cx.send_request(ValidateRequest { data: req.data.clone() })
3004 /// .on_receiving_ok_result(responder, async |validation, responder| {
3005 /// // Only runs if validation succeeded
3006 /// if validation.is_valid {
3007 /// // Respond to original request
3008 /// responder.respond(ValidateResponse { is_valid: true, error: None })
3009 /// } else {
3010 /// responder.respond_with_error(agent_client_protocol::util::internal_error("validation failed"))
3011 /// }
3012 /// })?;
3013 ///
3014 /// Ok(())
3015 /// }, agent_client_protocol::on_receive_request!())
3016 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3017 /// # Ok(())
3018 /// # }
3019 /// ```
3020 ///
3021 /// # Ordering
3022 ///
3023 /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
3024 /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
3025 /// for details.
3026 ///
3027 /// # When to Use
3028 ///
3029 /// Use this when:
3030 /// - You need to respond to a request based on another request's result
3031 /// - You want errors to automatically propagate to the request context
3032 /// - You only care about the success case
3033 ///
3034 /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
3035 #[track_caller]
3036 pub fn on_receiving_ok_result<F>(
3037 self,
3038 responder: Responder<T>,
3039 task: impl FnOnce(T, Responder<T>) -> F + 'static + Send,
3040 ) -> Result<(), crate::Error>
3041 where
3042 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3043 T: Send,
3044 {
3045 self.on_receiving_result(async move |result| match result {
3046 Ok(value) => task(value, responder).await,
3047 Err(err) => responder.respond_with_error(err),
3048 })
3049 }
3050
3051 /// Schedule an async task to run when the response is received.
3052 ///
3053 /// This is the recommended way to handle responses in handler callbacks, as it doesn't
3054 /// block the event loop. The task will be spawned automatically when the response arrives.
3055 ///
3056 /// # Example: Handle response in callback
3057 ///
3058 /// ```no_run
3059 /// # use agent_client_protocol_test::*;
3060 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3061 /// # let connection = mock_connection();
3062 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
3063 /// // Send a request and schedule a callback for the response
3064 /// cx.send_request(QueryRequest { id: 22 })
3065 /// .on_receiving_result({
3066 /// let connection = cx.clone();
3067 /// async move |result| {
3068 /// match result {
3069 /// Ok(response) => {
3070 /// println!("Got response: {:?}", response);
3071 /// // Can send more messages here
3072 /// connection.send_notification(QueryComplete {})?;
3073 /// Ok(())
3074 /// }
3075 /// Err(error) => {
3076 /// eprintln!("Request failed: {}", error);
3077 /// Err(error)
3078 /// }
3079 /// }
3080 /// }
3081 /// })?;
3082 ///
3083 /// // Handler continues immediately without waiting
3084 /// responder.respond(MyResponse { status: "processing".into() })
3085 /// }, agent_client_protocol::on_receive_request!())
3086 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3087 /// # Ok(())
3088 /// # }
3089 /// ```
3090 ///
3091 /// # Ordering
3092 ///
3093 /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
3094 /// before processing the next message. This gives you ordering guarantees: no other
3095 /// messages will be processed while your callback runs.
3096 ///
3097 /// This differs from [`block_task`](Self::block_task), which signals completion immediately
3098 /// upon receiving the response (before your code processes it).
3099 ///
3100 /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
3101 /// and how to avoid deadlocks.
3102 ///
3103 /// # Error Handling
3104 ///
3105 /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
3106 /// errors appropriately within your task.
3107 ///
3108 /// # When to Use
3109 ///
3110 /// Use this method when:
3111 /// - You're in a handler callback (not a spawned task)
3112 /// - You want ordering guarantees (no other messages processed during your callback)
3113 /// - You need to do async work before "releasing" control back to the dispatch loop
3114 ///
3115 /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
3116 #[track_caller]
3117 pub fn on_receiving_result<F>(
3118 self,
3119 task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
3120 ) -> Result<(), crate::Error>
3121 where
3122 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3123 T: Send,
3124 {
3125 let task_tx = self.task_tx.clone();
3126 let method = self.method;
3127 let response_rx = self.response_rx;
3128 let to_result = self.to_result;
3129 let location = Location::caller();
3130
3131 Task::new(location, async move {
3132 match response_rx.await {
3133 Ok(ResponsePayload { result, ack_tx }) => {
3134 // Convert the result using to_result for Ok values
3135 let typed_result = match result {
3136 Ok(json_value) => to_result(json_value),
3137 Err(err) => Err(err),
3138 };
3139
3140 // Run the user's callback
3141 let outcome = task(typed_result).await;
3142
3143 // Ack AFTER the callback completes - this is the key difference
3144 // from block_task. The dispatch loop waits for this ack.
3145 if let Some(tx) = ack_tx {
3146 let _ = tx.send(());
3147 }
3148
3149 outcome
3150 }
3151 Err(err) => Err(crate::util::internal_error(format!(
3152 "response to `{method}` never received: {err}"
3153 ))),
3154 }
3155 })
3156 .spawn(&task_tx)
3157 }
3158}
3159
3160// ============================================================================
// Transport (`ConnectTo`) Implementations
3162// ============================================================================
3163
3164/// A component that communicates over line streams.
3165///
3166/// `Lines` implements the [`ConnectTo`] trait for any pair of line-based streams
3167/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
3168/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3169///
3170/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
3171/// transformation of individual lines before they are parsed or after they are serialized.
3172/// This is particularly useful for debugging, logging, or implementing custom line-based
3173/// protocols.
3174///
3175/// # Use Cases
3176///
3177/// - **Line-by-line logging**: Intercept and log each line before parsing
3178/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
3179/// - **Debugging**: Inspect raw message strings
3180/// - **Line filtering**: Skip or modify specific messages
3181///
3182/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
3183/// for byte-based I/O.
3184///
3185/// [`ConnectTo`]: crate::ConnectTo
#[derive(Debug)]
pub struct Lines<OutgoingSink, IncomingStream> {
    /// Outgoing line sink (where we write serialized JSON-RPC messages)
    ///
    /// The constructor and the `ConnectTo` impl require a
    /// `Sink<String, Error = std::io::Error>`.
    pub outgoing: OutgoingSink,
    /// Incoming line stream (where we read and parse JSON-RPC messages)
    ///
    /// The constructor and the `ConnectTo` impl require a
    /// `Stream<Item = std::io::Result<String>>`.
    pub incoming: IncomingStream,
}
3193
3194impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
3195where
3196 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3197 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3198{
3199 /// Create a new line stream transport.
3200 pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
3201 Self { outgoing, incoming }
3202 }
3203}
3204
3205impl<OutgoingSink, IncomingStream, R: Role> ConnectTo<R> for Lines<OutgoingSink, IncomingStream>
3206where
3207 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3208 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3209{
3210 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3211 let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3212 match futures::future::select(Box::pin(client.connect_to(channel)), serve_self).await {
3213 Either::Left((result, _)) | Either::Right((result, _)) => result,
3214 }
3215 }
3216
3217 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3218 let Self { outgoing, incoming } = self;
3219
3220 // Create a channel pair for the client to use
3221 let (channel_for_caller, channel_for_lines) = Channel::duplex();
3222
3223 // Create the server future that runs the line stream actors
3224 let server_future = Box::pin(async move {
3225 let Channel { rx, tx } = channel_for_lines;
3226
3227 // Run both actors concurrently
3228 let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
3229 let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
3230
3231 // Wait for both to complete
3232 futures::try_join!(outgoing_future, incoming_future)?;
3233
3234 Ok(())
3235 });
3236
3237 (channel_for_caller, server_future)
3238 }
3239}
3240
3241/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
3242///
3243/// `ByteStreams` implements the [`ConnectTo`] trait for any pair of `AsyncRead` and `AsyncWrite`
3244/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3245/// This is the standard way to communicate with external processes or network connections.
3246///
3247/// # Use Cases
3248///
3249/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
3250/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
3251/// - **Named pipes**: Cross-process communication on the same machine
3252/// - **File I/O**: Reading from and writing to file descriptors
3253///
3254/// # Example
3255///
3256/// Connecting to an agent via stdio:
3257///
3258/// ```no_run
3259/// use agent_client_protocol::UntypedRole;
3260/// # use agent_client_protocol::{ByteStreams};
3261/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
3262///
3263/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3264/// let component = ByteStreams::new(
3265/// tokio::io::stdout().compat_write(),
3266/// tokio::io::stdin().compat(),
3267/// );
3268///
3269/// // Use as a component in a connection
3270/// agent_client_protocol::UntypedRole.builder()
3271/// .name("my-client")
3272/// .connect_to(component)
3273/// .await?;
3274/// # Ok(())
3275/// # }
3276/// ```
3277///
3278/// [`ConnectTo`]: crate::ConnectTo
#[derive(Debug)]
pub struct ByteStreams<OB, IB> {
    /// Outgoing byte stream (where we write serialized messages)
    ///
    /// The constructor and the `ConnectTo` impl require `AsyncWrite`.
    pub outgoing: OB,
    /// Incoming byte stream (where we read and parse messages)
    ///
    /// The constructor and the `ConnectTo` impl require `AsyncRead`.
    pub incoming: IB,
}
3286
3287impl<OB, IB> ByteStreams<OB, IB>
3288where
3289 OB: AsyncWrite + Send + 'static,
3290 IB: AsyncRead + Send + 'static,
3291{
3292 /// Create a new byte stream transport.
3293 pub fn new(outgoing: OB, incoming: IB) -> Self {
3294 Self { outgoing, incoming }
3295 }
3296}
3297
3298impl<OB, IB, R: Role> ConnectTo<R> for ByteStreams<OB, IB>
3299where
3300 OB: AsyncWrite + Send + 'static,
3301 IB: AsyncRead + Send + 'static,
3302{
3303 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3304 let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3305 match futures::future::select(pin!(client.connect_to(channel)), serve_self).await {
3306 Either::Left((result, _)) | Either::Right((result, _)) => result,
3307 }
3308 }
3309
3310 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3311 use futures::AsyncBufReadExt;
3312 use futures::AsyncWriteExt;
3313 use futures::io::BufReader;
3314 let Self { outgoing, incoming } = self;
3315
3316 // Convert byte streams to line streams
3317 // Box both streams to satisfy Unpin requirements
3318 let incoming_lines = Box::pin(BufReader::new(incoming).lines());
3319
3320 // Create a sink that writes lines (with newlines) to the outgoing byte stream
3321 // We need to Box the writer since it may not be Unpin
3322 let outgoing_sink =
3323 futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
3324 let mut bytes = line.into_bytes();
3325 bytes.push(b'\n');
3326 writer.write_all(&bytes).await?;
3327 Ok::<_, std::io::Error>(writer)
3328 });
3329
3330 // Delegate to Lines component
3331 ConnectTo::<R>::into_channel_and_future(Lines::new(outgoing_sink, incoming_lines))
3332 }
3333}
3334
3335/// A channel endpoint representing one side of a bidirectional message channel.
3336///
3337/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
3338/// Each endpoint has:
3339/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
3340/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
3341///
3342/// # Example
3343///
3344/// ```no_run
3345/// # use agent_client_protocol::UntypedRole;
3346/// # use agent_client_protocol::{Channel, Builder};
3347/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3348/// // Create a pair of connected channels
3349/// let (channel_a, channel_b) = Channel::duplex();
3350///
3351/// // Each channel can be used by a different component
3352/// UntypedRole.builder()
3353/// .name("connection-a")
3354/// .connect_to(channel_a)
3355/// .await?;
3356/// # Ok(())
3357/// # }
3358/// ```
#[derive(Debug)]
pub struct Channel {
    /// Receives messages (or errors) from the counterpart.
    ///
    /// Each item is a `Result`: either a JSON-RPC message or an error.
    pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
    /// Sends messages (or errors) to the counterpart.
    ///
    /// Unbounded, so sending never waits for capacity.
    pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
}
3366
3367impl Channel {
3368 /// Create a pair of connected channel endpoints.
3369 ///
3370 /// Returns two `Channel` instances that are connected to each other:
3371 /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
3372 /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
3373 ///
3374 /// # Returns
3375 ///
3376 /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
3377 #[must_use]
3378 pub fn duplex() -> (Self, Self) {
3379 // Create channels: A sends Result<Message> which B receives as Message
3380 let (a_tx, b_rx) = mpsc::unbounded();
3381 let (b_tx, a_rx) = mpsc::unbounded();
3382
3383 let channel_a = Self { rx: a_rx, tx: a_tx };
3384 let channel_b = Self { rx: b_rx, tx: b_tx };
3385
3386 (channel_a, channel_b)
3387 }
3388
3389 /// Copy messages from `rx` to `tx`.
3390 ///
3391 /// # Returns
3392 ///
3393 /// A `Result` indicating success or failure.
3394 pub async fn copy(mut self) -> Result<(), crate::Error> {
3395 while let Some(msg) = self.rx.next().await {
3396 self.tx
3397 .unbounded_send(msg)
3398 .map_err(crate::util::internal_error)?;
3399 }
3400 Ok(())
3401 }
3402}
3403
3404impl<R: Role> ConnectTo<R> for Channel {
3405 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3406 let (client_channel, client_serve) = client.into_channel_and_future();
3407
3408 match futures::try_join!(
3409 Channel {
3410 rx: client_channel.rx,
3411 tx: self.tx
3412 }
3413 .copy(),
3414 Channel {
3415 rx: self.rx,
3416 tx: client_channel.tx
3417 }
3418 .copy(),
3419 client_serve
3420 ) {
3421 Ok(((), (), ())) => Ok(()),
3422 Err(err) => Err(err),
3423 }
3424 }
3425
3426 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3427 (self, Box::pin(future::ready(Ok(()))))
3428 }
3429}