Skip to main content

agent_client_protocol/
jsonrpc.rs

1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::SessionId;
4// Re-export jsonrpcmsg for use in public API
5pub use jsonrpcmsg;
6
7// Types re-exported from crate root
8use serde::{Deserialize, Serialize};
9use std::fmt::Debug;
10use std::panic::Location;
11use std::pin::pin;
12use uuid::Uuid;
13
14use futures::channel::{mpsc, oneshot};
15use futures::future::{self, BoxFuture, Either};
16use futures::{AsyncRead, AsyncWrite, StreamExt};
17
18mod dynamic_handler;
19pub(crate) mod handlers;
20mod incoming_actor;
21mod outgoing_actor;
22pub(crate) mod run;
23mod task_actor;
24mod transport_actor;
25
26use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
27pub use crate::jsonrpc::handlers::NullHandler;
28use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
29use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
30use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
31use crate::jsonrpc::run::SpawnedRun;
32use crate::jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo};
33use crate::jsonrpc::task_actor::{Task, TaskTx};
34use crate::mcp_server::McpServer;
35use crate::role::HasPeer;
36use crate::role::Role;
37use crate::util::json_cast;
38use crate::{Agent, Client, ConnectTo, RoleId};
39
40/// Handlers process incoming JSON-RPC messages on a connection.
41///
42/// When messages arrive, they flow through a chain of handlers. Each handler can
43/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
44///
45/// # Message Flow
46///
47/// Messages flow through three layers of handlers in order:
48///
49/// ```text
50/// ┌─────────────────────────────────────────────────────────────────┐
51/// │                     Incoming Message                            │
52/// └─────────────────────────────────────────────────────────────────┘
53///                              │
54///                              ▼
55/// ┌─────────────────────────────────────────────────────────────────┐
56/// │  1. User Handlers (registered via on_receive_request, etc.)     │
57/// │     - Tried in registration order                               │
58/// │     - First handler to return Handled::Yes claims the message   │
59/// └─────────────────────────────────────────────────────────────────┘
60///                              │ Handled::No
61///                              ▼
62/// ┌─────────────────────────────────────────────────────────────────┐
63/// │  2. Dynamic Handlers (added at runtime)                         │
64/// │     - Used for session-specific message handling                │
65/// │     - Added via ConnectionTo::add_dynamic_handler             │
66/// └─────────────────────────────────────────────────────────────────┘
67///                              │ Handled::No
68///                              ▼
69/// ┌─────────────────────────────────────────────────────────────────┐
70/// │  3. Role Default Handler                                        │
71/// │     - Fallback based on the connection's Role                   │
72/// │     - Handles protocol-level messages (e.g., proxy forwarding)  │
73/// └─────────────────────────────────────────────────────────────────┘
74///                              │ Handled::No
75///                              ▼
76/// ┌─────────────────────────────────────────────────────────────────┐
77/// │  Unhandled: Error response sent (or queued if retry=true)       │
78/// └─────────────────────────────────────────────────────────────────┘
79/// ```
80///
81/// # The `Handled` Return Value
82///
83/// Each handler returns [`Handled`] to indicate whether it processed the message:
84///
85/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
86/// - **`Handled::No { message, retry }`** - Message was not handled. The message
87///   (possibly modified) is passed to the next handler in the chain.
88///
89/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
90///
91/// # The Retry Mechanism
92///
93/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
94///
95/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
96/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
97///
98/// This mechanism exists because of a timing issue with sessions: when a `session/new`
99/// response is being processed, the dynamic handler for that session hasn't been registered
100/// yet, but `session/update` notifications for that session may already be arriving.
101/// By setting `retry: true`, these early notifications are queued until the session's
102/// dynamic handler is added.
103///
104/// # Handler Registration
105///
106/// Most users register handlers using the builder methods on [`Builder`]:
107///
108/// ```
109/// # use agent_client_protocol::{Agent, Client, ConnectTo};
110/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, AgentCapabilities};
111/// # use agent_client_protocol_test::StatusUpdate;
112/// # async fn example(transport: impl ConnectTo<Agent>) -> Result<(), agent_client_protocol::Error> {
113/// Agent.builder()
114///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
115///         responder.respond(
116///             InitializeResponse::new(req.protocol_version)
117///                 .agent_capabilities(AgentCapabilities::new()),
118///         )
119///     }, agent_client_protocol::on_receive_request!())
120///     .on_receive_notification(async |notif: StatusUpdate, cx| {
121///         // Process notification
122///         Ok(())
123///     }, agent_client_protocol::on_receive_notification!())
124///     .connect_to(transport)
125///     .await?;
126/// # Ok(())
127/// # }
128/// ```
129///
130/// The type parameter on the closure determines which messages are dispatched to it.
131/// Messages that don't match the type are automatically passed to the next handler.
132///
133/// # Implementing Custom Handlers
134///
/// For advanced use cases, you can implement [`HandleDispatchFrom`] directly:
///
/// ```ignore
/// struct MyHandler;
///
/// impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for MyHandler {
///
///     async fn handle_dispatch_from(
///         &mut self,
///         message: Dispatch,
///         cx: ConnectionTo<Counterpart>,
///     ) -> Result<Handled<Dispatch>, Error> {
147///         if message.method() == "my/custom/method" {
148///             // Handle it
149///             Ok(Handled::Yes)
150///         } else {
151///             // Pass to next handler
152///             Ok(Handled::No { message, retry: false })
153///         }
154///     }
155///
156///     fn describe_chain(&self) -> impl std::fmt::Debug {
157///         "MyHandler"
158///     }
159/// }
160/// ```
161///
162/// # Important: Handlers Must Not Block
163///
164/// The connection processes messages on a single async task. While a handler is running,
165/// no other messages can be processed. For expensive operations, use [`ConnectionTo::spawn`]
166/// to run work concurrently:
167///
168/// ```
169/// # use agent_client_protocol::{Client, Agent, ConnectTo};
170/// # use agent_client_protocol_test::{expensive_operation, ProcessComplete};
171/// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
172/// # Client.builder().connect_with(transport, async |cx| {
173/// cx.spawn({
174///     let connection = cx.clone();
175///     async move {
176///         let result = expensive_operation("data").await?;
177///         connection.send_notification(ProcessComplete { result })?;
178///         Ok(())
179///     }
180/// })?;
181/// # Ok(())
182/// # }).await?;
183/// # Ok(())
184/// # }
185/// ```
186#[allow(async_fn_in_trait)]
187/// A handler for incoming JSON-RPC messages.
188///
189/// This trait is implemented by types that can process incoming messages on a connection.
190/// Handlers are registered with a [`Builder`] and are called in order until
191/// one claims the message.
192///
193/// The type parameter `R` is the role this handler plays - who I am.
194/// For an agent handler, `R = Agent` (I handle messages as an agent).
195/// For a client handler, `R = Client` (I handle messages as a client).
196pub trait HandleDispatchFrom<Counterpart: Role>: Send {
197    /// Attempt to claim an incoming message (request or notification).
198    ///
199    /// # Important: do not block
200    ///
201    /// The server will not process new messages until this handler returns.
202    /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
203    /// The recommended approach to manage expensive operations is to the [`ConnectionTo::spawn`] method available on the message context.
204    ///
205    /// # Parameters
206    ///
207    /// * `message` - The incoming message to handle.
208    /// * `connection` - The connection, used to send messages and access connection state.
209    ///
210    /// # Returns
211    ///
212    /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
213    /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
214    /// * `Err` if an internal error occurs (this will bring down the server).
215    fn handle_dispatch_from(
216        &mut self,
217        message: Dispatch,
218        connection: ConnectionTo<Counterpart>,
219    ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send;
220
221    /// Returns a debug description of the registered handlers for diagnostics.
222    fn describe_chain(&self) -> impl std::fmt::Debug;
223}
224
225impl<Counterpart: Role, H> HandleDispatchFrom<Counterpart> for &mut H
226where
227    H: HandleDispatchFrom<Counterpart>,
228{
229    fn handle_dispatch_from(
230        &mut self,
231        message: Dispatch,
232        cx: ConnectionTo<Counterpart>,
233    ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send {
234        H::handle_dispatch_from(self, message, cx)
235    }
236
237    fn describe_chain(&self) -> impl std::fmt::Debug {
238        H::describe_chain(self)
239    }
240}
241
242/// A JSON-RPC connection that can act as either a server, client, or both.
243///
244/// [`Builder`] provides a builder-style API for creating JSON-RPC servers and clients.
245/// You start by calling `Role.builder()` (e.g., `Client.builder()`), then add message
246/// handlers, and finally drive the connection with either [`connect_to`](Builder::connect_to)
247/// or [`connect_with`](Builder::connect_with), providing a component implementation
248/// (e.g., [`ByteStreams`] for byte streams).
249///
250/// # JSON-RPC Primer
251///
252/// JSON-RPC 2.0 has two fundamental message types:
253///
254/// * **Requests** - Messages that expect a response. They have an `id` field that gets
255///   echoed back in the response so the sender can correlate them.
256/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
257///   expect or receive a response.
258///
259/// # Type-Driven Message Dispatch
260///
261/// The handler registration methods use Rust's type system to determine which messages
262/// to handle. The type parameter you provide controls what gets dispatched to your handler:
263///
264/// ## Single Message Types
265///
266/// The simplest case - handle one specific message type:
267///
268/// ```no_run
269/// # use agent_client_protocol_test::*;
270/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
271/// # async fn example() -> Result<(), agent_client_protocol::Error> {
272/// # let connection = mock_connection();
273/// connection
274///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
275///         // Handle only InitializeRequest messages
276///         responder.respond(InitializeResponse::make())
277///     }, agent_client_protocol::on_receive_request!())
278///     .on_receive_notification(async |notif: SessionNotification, cx| {
///         // Handle only SessionNotification notifications
280///         Ok(())
281///     }, agent_client_protocol::on_receive_notification!())
282/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
283/// # Ok(())
284/// # }
285/// ```
286///
287/// ## Enum Message Types
288///
289/// You can also handle multiple related messages with a single handler by defining an enum
290/// that implements the appropriate trait ([`JsonRpcRequest`] or [`JsonRpcNotification`]):
291///
292/// ```no_run
293/// # use agent_client_protocol_test::*;
294/// # use agent_client_protocol::{JsonRpcRequest, JsonRpcMessage, UntypedMessage};
295/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
296/// # async fn example() -> Result<(), agent_client_protocol::Error> {
297/// # let connection = mock_connection();
298/// // Define an enum for multiple request types
299/// #[derive(Debug, Clone)]
300/// enum MyRequests {
301///     Initialize(InitializeRequest),
302///     Prompt(PromptRequest),
303/// }
304///
305/// // Implement JsonRpcRequest for your enum
306/// # impl JsonRpcMessage for MyRequests {
307/// #     fn matches_method(_method: &str) -> bool { false }
308/// #     fn method(&self) -> &str { "myRequests" }
309/// #     fn to_untyped_message(&self) -> Result<UntypedMessage, agent_client_protocol::Error> { todo!() }
310/// #     fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Result<Self, agent_client_protocol::Error> { Err(agent_client_protocol::Error::method_not_found()) }
311/// # }
312/// impl JsonRpcRequest for MyRequests { type Response = serde_json::Value; }
313///
314/// // Handle all variants in one place
315/// connection.on_receive_request(async |req: MyRequests, responder, cx| {
316///     match req {
317///         MyRequests::Initialize(init) => { responder.respond(serde_json::json!({})) }
318///         MyRequests::Prompt(prompt) => { responder.respond(serde_json::json!({})) }
319///     }
320/// }, agent_client_protocol::on_receive_request!())
321/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
322/// # Ok(())
323/// # }
324/// ```
325///
326/// ## Mixed Message Types
327///
328/// For enums containing both requests AND notifications, use [`on_receive_dispatch`](Self::on_receive_dispatch):
329///
330/// ```no_run
331/// # use agent_client_protocol_test::*;
332/// # use agent_client_protocol::Dispatch;
333/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
334/// # async fn example() -> Result<(), agent_client_protocol::Error> {
335/// # let connection = mock_connection();
336/// // on_receive_dispatch receives Dispatch which can be either a request or notification
337/// connection.on_receive_dispatch(async |msg: Dispatch<InitializeRequest, SessionNotification>, _cx| {
338///     match msg {
339///         Dispatch::Request(req, responder) => {
340///             responder.respond(InitializeResponse::make())
341///         }
342///         Dispatch::Notification(notif) => {
343///             Ok(())
344///         }
345///         Dispatch::Response(result, router) => {
346///             // Forward response to its destination
347///             router.respond_with_result(result)
348///         }
349///     }
350/// }, agent_client_protocol::on_receive_dispatch!())
351/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
352/// # Ok(())
353/// # }
354/// ```
355///
356/// # Handler Registration
357///
358/// Register handlers using these methods (listed from most common to most flexible):
359///
360/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
361/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
362/// * [`on_receive_dispatch`](Self::on_receive_dispatch) - Handle enums containing both requests and notifications
363/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
364///
365/// ## Handler Ordering
366///
367/// Handlers are tried in the order you register them. The first handler that claims a message
368/// (by matching its type) will process it. Subsequent handlers won't see that message:
369///
370/// ```no_run
371/// # use agent_client_protocol_test::*;
372/// # use agent_client_protocol::{Dispatch, UntypedMessage};
373/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
374/// # async fn example() -> Result<(), agent_client_protocol::Error> {
375/// # let connection = mock_connection();
376/// connection
377///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
378///         // This runs first for InitializeRequest
379///         responder.respond(InitializeResponse::make())
380///     }, agent_client_protocol::on_receive_request!())
381///     .on_receive_request(async |req: PromptRequest, responder, cx| {
382///         // This runs first for PromptRequest
383///         responder.respond(PromptResponse::make())
384///     }, agent_client_protocol::on_receive_request!())
385///     .on_receive_dispatch(async |msg: Dispatch, cx| {
386///         // This runs for any message not handled above
387///         msg.respond_with_error(agent_client_protocol::util::internal_error("unknown method"), cx)
388///     }, agent_client_protocol::on_receive_dispatch!())
389/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
390/// # Ok(())
391/// # }
392/// ```
393///
394/// # Event Loop and Concurrency
395///
396/// Understanding the event loop is critical for writing correct handlers.
397///
398/// ## The Event Loop
399///
400/// [`Builder`] runs all handler callbacks on a single async task - the event loop.
401/// While a handler is running, **the server cannot receive new messages**. This means
402/// any blocking or expensive work in your handlers will stall the entire connection.
403///
404/// To avoid blocking the event loop, use [`ConnectionTo::spawn`] to offload serious
405/// work to concurrent tasks:
406///
407/// ```no_run
408/// # use agent_client_protocol_test::*;
409/// # async fn example() -> Result<(), agent_client_protocol::Error> {
410/// # let connection = mock_connection();
411/// connection.on_receive_request(async |req: AnalyzeRequest, responder, cx| {
412///     // Clone cx for the spawned task
413///     cx.spawn({
414///         let connection = cx.clone();
415///         async move {
416///             let result = expensive_analysis(&req.data).await?;
417///             connection.send_notification(AnalysisComplete { result })?;
418///             Ok(())
419///         }
420///     })?;
421///
422///     // Respond immediately without blocking
423///     responder.respond(AnalysisStarted { job_id: 42 })
424/// }, agent_client_protocol::on_receive_request!())
425/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
426/// # Ok(())
427/// # }
428/// ```
429///
430/// Note that the entire connection runs within one async task, so parallelism must be
431/// managed explicitly using [`spawn`](ConnectionTo::spawn).
432///
433/// ## The Connection Context
434///
435/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
436///
437/// * **For request handlers** - [`Responder<R>`] provides [`respond`](Responder::respond)
438///   to send the response, plus methods to send other messages
439/// * **For notification handlers** - [`ConnectionTo`] provides methods to send messages
440///   and spawn tasks
441///
442/// Both context types support:
443/// * [`send_request`](ConnectionTo::send_request) - Send requests to the other side
444/// * [`send_notification`](ConnectionTo::send_notification) - Send notifications
445/// * [`spawn`](ConnectionTo::spawn) - Run tasks concurrently without blocking the event loop
446///
447/// The [`SentRequest`] returned by `send_request` provides methods like
448/// [`on_receiving_result`](SentRequest::on_receiving_result) that help you
449/// avoid accidentally blocking the event loop while waiting for responses.
450///
451/// # Driving the Connection
452///
453/// After adding handlers, you must drive the connection using one of two modes:
454///
455/// ## Server Mode: `connect_to()`
456///
457/// Use [`connect_to`](Self::connect_to) when you only need to respond to incoming messages:
458///
459/// ```no_run
460/// # use agent_client_protocol_test::*;
461/// # async fn example() -> Result<(), agent_client_protocol::Error> {
462/// # let connection = mock_connection();
463/// connection
464///     .on_receive_request(async |req: MyRequest, responder, cx| {
465///         responder.respond(MyResponse { status: "ok".into() })
466///     }, agent_client_protocol::on_receive_request!())
467///     .connect_to(MockTransport)  // Runs until connection closes or error occurs
468///     .await?;
469/// # Ok(())
470/// # }
471/// ```
472///
473/// The connection will process incoming messages and invoke your handlers until the
474/// connection is closed or an error occurs.
475///
476/// ## Client Mode: `connect_with()`
477///
478/// Use [`connect_with`](Self::connect_with) when you need to both handle incoming messages
479/// AND send your own requests/notifications:
480///
481/// ```no_run
482/// # use agent_client_protocol_test::*;
483/// # use agent_client_protocol::schema::InitializeRequest;
484/// # async fn example() -> Result<(), agent_client_protocol::Error> {
485/// # let connection = mock_connection();
486/// connection
487///     .on_receive_request(async |req: MyRequest, responder, cx| {
488///         responder.respond(MyResponse { status: "ok".into() })
489///     }, agent_client_protocol::on_receive_request!())
490///     .connect_with(MockTransport, async |cx| {
491///         // You can send requests to the other side
492///         let response = cx.send_request(InitializeRequest::make())
493///             .block_task()
494///             .await?;
495///
496///         // And send notifications
497///         cx.send_notification(StatusUpdate { message: "ready".into() })?;
498///
499///         Ok(())
500///     })
501///     .await?;
502/// # Ok(())
503/// # }
504/// ```
505///
506/// The connection will serve incoming messages in the background while your client closure
507/// runs. When the closure returns, the connection shuts down.
508///
509/// # Example: Complete Agent
510///
511/// ```no_run
512/// # use agent_client_protocol::UntypedRole;
513/// # use agent_client_protocol::{Builder};
514/// # use agent_client_protocol::Stdio;
515/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
516/// # async fn example() -> Result<(), agent_client_protocol::Error> {
517/// let transport = Stdio::new();
518///
519/// UntypedRole.builder()
520///     .name("my-agent")  // Optional: for debugging logs
521///     .on_receive_request(async |init: InitializeRequest, responder, cx| {
522///         let response: InitializeResponse = todo!();
523///         responder.respond(response)
524///     }, agent_client_protocol::on_receive_request!())
525///     .on_receive_request(async |prompt: PromptRequest, responder, cx| {
526///         // You can send notifications while processing a request
527///         let notif: SessionNotification = todo!();
528///         cx.send_notification(notif)?;
529///
530///         // Then respond to the request
531///         let response: PromptResponse = todo!();
532///         responder.respond(response)
533///     }, agent_client_protocol::on_receive_request!())
534///     .connect_to(transport)
535///     .await?;
536/// # Ok(())
537/// # }
538/// ```
539#[must_use]
540#[derive(Debug)]
541pub struct Builder<Host: Role, Handler = NullHandler, Runner = NullRun>
542where
543    Handler: HandleDispatchFrom<Host::Counterpart>,
544    Runner: RunWithConnectionTo<Host::Counterpart>,
545{
546    /// My role.
547    host: Host,
548
549    /// Name of the connection, used in tracing logs.
550    name: Option<String>,
551
552    /// Handler for incoming messages.
553    handler: Handler,
554
555    /// Responder for background tasks.
556    responder: Runner,
557}
558
559impl<Host: Role> Builder<Host, NullHandler, NullRun> {
560    /// Create a new connection builder for the given role.
561    /// This type follows a builder pattern; use other methods to configure and then invoke
562    /// [`Self::connect_to`] (to use as a server) or [`Self::connect_with`] to use as a client.
563    pub fn new(role: Host) -> Self {
564        Self {
565            host: role,
566            name: None,
567            handler: NullHandler,
568            responder: NullRun,
569        }
570    }
571}
572
573impl<Host: Role, Handler> Builder<Host, Handler, NullRun>
574where
575    Handler: HandleDispatchFrom<Host::Counterpart>,
576{
577    /// Create a new connection builder with the given handler.
578    pub fn new_with(role: Host, handler: Handler) -> Self {
579        Self {
580            host: role,
581            name: None,
582            handler,
583            responder: NullRun,
584        }
585    }
586}
587
588impl<
589    Host: Role,
590    Handler: HandleDispatchFrom<Host::Counterpart>,
591    Runner: RunWithConnectionTo<Host::Counterpart>,
592> Builder<Host, Handler, Runner>
593{
594    /// Set the "name" of this connection -- used only for debugging logs.
595    pub fn name(mut self, name: impl ToString) -> Self {
596        self.name = Some(name.to_string());
597        self
598    }
599
600    /// Merge another [`Builder`] into this one.
601    ///
602    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
603    /// This is a low-level method that is not intended for general use.
604    pub fn with_connection_builder(
605        self,
606        other: Builder<
607            Host,
608            impl HandleDispatchFrom<Host::Counterpart>,
609            impl RunWithConnectionTo<Host::Counterpart>,
610        >,
611    ) -> Builder<
612        Host,
613        impl HandleDispatchFrom<Host::Counterpart>,
614        impl RunWithConnectionTo<Host::Counterpart>,
615    > {
616        Builder {
617            host: self.host,
618            name: self.name,
619            handler: ChainedHandler::new(
620                self.handler,
621                NamedHandler::new(other.name, other.handler),
622            ),
623            responder: ChainRun::new(self.responder, other.responder),
624        }
625    }
626
627    /// Add a new [`HandleDispatchFrom`] to the chain.
628    ///
629    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
630    /// This is a low-level method that is not intended for general use.
631    pub fn with_handler(
632        self,
633        handler: impl HandleDispatchFrom<Host::Counterpart>,
634    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner> {
635        Builder {
636            host: self.host,
637            name: self.name,
638            handler: ChainedHandler::new(self.handler, handler),
639            responder: self.responder,
640        }
641    }
642
643    /// Add a new [`RunWithConnectionTo`] to the chain.
644    pub fn with_responder<Run1>(
645        self,
646        responder: Run1,
647    ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
648    where
649        Run1: RunWithConnectionTo<Host::Counterpart>,
650    {
651        Builder {
652            host: self.host,
653            name: self.name,
654            handler: self.handler,
655            responder: ChainRun::new(self.responder, responder),
656        }
657    }
658
659    /// Enqueue a task to run once the connection is actively serving traffic.
660    #[track_caller]
661    pub fn with_spawned<F, Fut>(
662        self,
663        task: F,
664    ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
665    where
666        F: FnOnce(ConnectionTo<Host::Counterpart>) -> Fut + Send,
667        Fut: Future<Output = Result<(), crate::Error>> + Send,
668    {
669        let location = Location::caller();
670        self.with_responder(SpawnedRun::new(location, task))
671    }
672
673    /// Register a handler for messages that can be either requests OR notifications.
674    ///
675    /// Use this when you want to handle an enum type that contains both request and
676    /// notification variants. Your handler receives a [`Dispatch<Req, Notif>`] which
677    /// is an enum with two variants:
678    ///
679    /// - `Dispatch::Request(request, responder)` - A request with its response context
680    /// - `Dispatch::Notification(notification)` - A notification
681    /// - `Dispatch::Response(result, router)` - A response to a request we sent
682    ///
683    /// # Example
684    ///
685    /// ```no_run
686    /// # use agent_client_protocol_test::*;
687    /// # use agent_client_protocol::Dispatch;
688    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
689    /// # let connection = mock_connection();
690    /// connection.on_receive_dispatch(async |message: Dispatch<MyRequest, StatusUpdate>, _cx| {
691    ///     match message {
692    ///         Dispatch::Request(req, responder) => {
693    ///             // Handle request and send response
694    ///             responder.respond(MyResponse { status: "ok".into() })
695    ///         }
696    ///         Dispatch::Notification(notif) => {
697    ///             // Handle notification (no response needed)
698    ///             Ok(())
699    ///         }
700    ///         Dispatch::Response(result, router) => {
701    ///             // Forward response to its destination
702    ///             router.respond_with_result(result)
703    ///         }
704    ///     }
705    /// }, agent_client_protocol::on_receive_dispatch!())
706    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
707    /// # Ok(())
708    /// # }
709    /// ```
710    ///
711    /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
712    /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
713    /// for handling requests or notifications separately.
714    ///
715    /// # Ordering
716    ///
717    /// This callback runs inside the dispatch loop and blocks further message processing
718    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
719    /// ordering guarantees and how to avoid deadlocks.
720    pub fn on_receive_dispatch<Req, Notif, F, T, ToFut>(
721        self,
722        op: F,
723        to_future_hack: ToFut,
724    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
725    where
726        Host::Counterpart: HasPeer<Host::Counterpart>,
727        Req: JsonRpcRequest,
728        Notif: JsonRpcNotification,
729        F: AsyncFnMut(
730                Dispatch<Req, Notif>,
731                ConnectionTo<Host::Counterpart>,
732            ) -> Result<T, crate::Error>
733            + Send,
734        T: IntoHandled<Dispatch<Req, Notif>>,
735        ToFut: Fn(
736                &mut F,
737                Dispatch<Req, Notif>,
738                ConnectionTo<Host::Counterpart>,
739            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
740            + Send
741            + Sync,
742    {
743        let handler = MessageHandler::new(
744            self.host.counterpart(),
745            self.host.counterpart(),
746            op,
747            to_future_hack,
748        );
749        self.with_handler(handler)
750    }
751
752    /// Register a handler for JSON-RPC requests of type `Req`.
753    ///
754    /// Your handler receives two arguments:
755    /// 1. The request (type `Req`)
756    /// 2. A [`Responder<R, Req::Response>`] for sending the response
757    ///
758    /// The request context allows you to:
759    /// - Send the response with [`Responder::respond`]
760    /// - Send notifications to the client with [`ConnectionTo::send_notification`]
761    /// - Send requests to the client with [`ConnectionTo::send_request`]
762    ///
763    /// # Example
764    ///
765    /// ```ignore
766    /// # use agent_client_protocol::UntypedRole;
767    /// # use agent_client_protocol::{Builder};
768    /// # use agent_client_protocol::schema::{PromptRequest, PromptResponse, SessionNotification};
769    /// # fn example<R: agent_client_protocol::Role>(connection: Builder<R, impl agent_client_protocol::HandleMessageAs<R>>) {
770    /// connection.on_receive_request(async |request: PromptRequest, responder, cx| {
771    ///     // Send a notification while processing
772    ///     let notif: SessionNotification = todo!();
773    ///     cx.send_notification(notif)?;
774    ///
775    ///     // Do some work...
776    ///     let result = todo!("process the prompt");
777    ///
778    ///     // Send the response
779    ///     let response: PromptResponse = todo!();
780    ///     responder.respond(response)
781    /// }, agent_client_protocol::on_receive_request!());
782    /// # }
783    /// ```
784    ///
785    /// # Type Parameter
786    ///
787    /// `Req` can be either a single request type or an enum of multiple request types.
788    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
789    ///
790    /// # Ordering
791    ///
792    /// This callback runs inside the dispatch loop and blocks further message processing
793    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
794    /// ordering guarantees and how to avoid deadlocks.
795    pub fn on_receive_request<Req: JsonRpcRequest, F, T, ToFut>(
796        self,
797        op: F,
798        to_future_hack: ToFut,
799    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
800    where
801        Host::Counterpart: HasPeer<Host::Counterpart>,
802        F: AsyncFnMut(
803                Req,
804                Responder<Req::Response>,
805                ConnectionTo<Host::Counterpart>,
806            ) -> Result<T, crate::Error>
807            + Send,
808        T: IntoHandled<(Req, Responder<Req::Response>)>,
809        ToFut: Fn(
810                &mut F,
811                Req,
812                Responder<Req::Response>,
813                ConnectionTo<Host::Counterpart>,
814            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
815            + Send
816            + Sync,
817    {
818        let handler = RequestHandler::new(
819            self.host.counterpart(),
820            self.host.counterpart(),
821            op,
822            to_future_hack,
823        );
824        self.with_handler(handler)
825    }
826
827    /// Register a handler for JSON-RPC notifications of type `Notif`.
828    ///
829    /// Notifications are fire-and-forget messages that don't expect a response.
830    /// Your handler receives:
831    /// 1. The notification (type `Notif`)
832    /// 2. A [`ConnectionTo<R>`] for sending messages to the other side
833    ///
834    /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
835    /// but you can still send your own requests and notifications using the context.
836    ///
837    /// # Example
838    ///
839    /// ```no_run
840    /// # use agent_client_protocol_test::*;
841    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
842    /// # let connection = mock_connection();
843    /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
844    ///     // Process the notification
845    ///     update_session_state(&notif)?;
846    ///
847    ///     // Optionally send a notification back
848    ///     cx.send_notification(StatusUpdate {
849    ///         message: "Acknowledged".into(),
850    ///     })?;
851    ///
852    ///     Ok(())
853    /// }, agent_client_protocol::on_receive_notification!())
854    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
855    /// # Ok(())
856    /// # }
857    /// ```
858    ///
859    /// # Type Parameter
860    ///
861    /// `Notif` can be either a single notification type or an enum of multiple notification types.
862    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
863    ///
864    /// # Ordering
865    ///
866    /// This callback runs inside the dispatch loop and blocks further message processing
867    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
868    /// ordering guarantees and how to avoid deadlocks.
869    pub fn on_receive_notification<Notif, F, T, ToFut>(
870        self,
871        op: F,
872        to_future_hack: ToFut,
873    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
874    where
875        Host::Counterpart: HasPeer<Host::Counterpart>,
876        Notif: JsonRpcNotification,
877        F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
878        T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
879        ToFut: Fn(
880                &mut F,
881                Notif,
882                ConnectionTo<Host::Counterpart>,
883            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
884            + Send
885            + Sync,
886    {
887        let handler = NotificationHandler::new(
888            self.host.counterpart(),
889            self.host.counterpart(),
890            op,
891            to_future_hack,
892        );
893        self.with_handler(handler)
894    }
895
896    /// Register a handler for messages from a specific peer.
897    ///
898    /// This is similar to [`on_receive_dispatch`](Self::on_receive_dispatch), but allows
899    /// specifying the source peer explicitly. This is useful when receiving messages
900    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
901    /// envelopes when receiving from an agent via a proxy).
902    ///
903    /// For the common case of receiving from the default counterpart, use
904    /// [`on_receive_dispatch`](Self::on_receive_dispatch) instead.
905    ///
906    /// # Ordering
907    ///
908    /// This callback runs inside the dispatch loop and blocks further message processing
909    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
910    /// ordering guarantees and how to avoid deadlocks.
911    pub fn on_receive_dispatch_from<
912        Req: JsonRpcRequest,
913        Notif: JsonRpcNotification,
914        Peer: Role,
915        F,
916        T,
917        ToFut,
918    >(
919        self,
920        peer: Peer,
921        op: F,
922        to_future_hack: ToFut,
923    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
924    where
925        Host::Counterpart: HasPeer<Peer>,
926        F: AsyncFnMut(
927                Dispatch<Req, Notif>,
928                ConnectionTo<Host::Counterpart>,
929            ) -> Result<T, crate::Error>
930            + Send,
931        T: IntoHandled<Dispatch<Req, Notif>>,
932        ToFut: Fn(
933                &mut F,
934                Dispatch<Req, Notif>,
935                ConnectionTo<Host::Counterpart>,
936            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
937            + Send
938            + Sync,
939    {
940        let handler = MessageHandler::new(self.host.counterpart(), peer, op, to_future_hack);
941        self.with_handler(handler)
942    }
943
944    /// Register a handler for JSON-RPC requests from a specific peer.
945    ///
946    /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
947    /// specifying the source peer explicitly. This is useful when receiving messages
948    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
949    /// envelopes when receiving from an agent via a proxy).
950    ///
951    /// For the common case of receiving from the default counterpart, use
952    /// [`on_receive_request`](Self::on_receive_request) instead.
953    ///
954    /// # Example
955    ///
956    /// ```ignore
957    /// use agent_client_protocol::Agent;
958    /// use agent_client_protocol::schema::InitializeRequest;
959    ///
960    /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
961    /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, responder, cx| {
962    ///     // Handle the request
963    ///     responder.respond(InitializeResponse::make())
964    /// })
965    /// ```
966    ///
967    /// # Ordering
968    ///
969    /// This callback runs inside the dispatch loop and blocks further message processing
970    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
971    /// ordering guarantees and how to avoid deadlocks.
972    pub fn on_receive_request_from<Req: JsonRpcRequest, Peer: Role, F, T, ToFut>(
973        self,
974        peer: Peer,
975        op: F,
976        to_future_hack: ToFut,
977    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
978    where
979        Host::Counterpart: HasPeer<Peer>,
980        F: AsyncFnMut(
981                Req,
982                Responder<Req::Response>,
983                ConnectionTo<Host::Counterpart>,
984            ) -> Result<T, crate::Error>
985            + Send,
986        T: IntoHandled<(Req, Responder<Req::Response>)>,
987        ToFut: Fn(
988                &mut F,
989                Req,
990                Responder<Req::Response>,
991                ConnectionTo<Host::Counterpart>,
992            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
993            + Send
994            + Sync,
995    {
996        let handler = RequestHandler::new(self.host.counterpart(), peer, op, to_future_hack);
997        self.with_handler(handler)
998    }
999
1000    /// Register a handler for JSON-RPC notifications from a specific peer.
1001    ///
1002    /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
1003    /// specifying the source peer explicitly. This is useful when receiving messages
1004    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
1005    /// envelopes when receiving from an agent via a proxy).
1006    ///
1007    /// For the common case of receiving from the default counterpart, use
1008    /// [`on_receive_notification`](Self::on_receive_notification) instead.
1009    ///
1010    /// # Ordering
1011    ///
1012    /// This callback runs inside the dispatch loop and blocks further message processing
1013    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1014    /// ordering guarantees and how to avoid deadlocks.
1015    pub fn on_receive_notification_from<Notif: JsonRpcNotification, Peer: Role, F, T, ToFut>(
1016        self,
1017        peer: Peer,
1018        op: F,
1019        to_future_hack: ToFut,
1020    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1021    where
1022        Host::Counterpart: HasPeer<Peer>,
1023        F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1024        T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1025        ToFut: Fn(
1026                &mut F,
1027                Notif,
1028                ConnectionTo<Host::Counterpart>,
1029            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1030            + Send
1031            + Sync,
1032    {
1033        let handler = NotificationHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1034        self.with_handler(handler)
1035    }
1036
1037    /// Add an MCP server that will be added to all new sessions that are proxied through this connection.
1038    ///
1039    /// Only applicable to proxies.
1040    pub fn with_mcp_server(
1041        self,
1042        mcp_server: McpServer<Host::Counterpart, impl RunWithConnectionTo<Host::Counterpart>>,
1043    ) -> Builder<
1044        Host,
1045        impl HandleDispatchFrom<Host::Counterpart>,
1046        impl RunWithConnectionTo<Host::Counterpart>,
1047    >
1048    where
1049        Host::Counterpart: HasPeer<Agent> + HasPeer<Client>,
1050    {
1051        let (handler, responder) = mcp_server.into_handler_and_responder();
1052        self.with_handler(handler).with_responder(responder)
1053    }
1054
1055    /// Run in server mode with the provided transport.
1056    ///
1057    /// This drives the connection by continuously processing messages from the transport
1058    /// and dispatching them to your registered handlers. The connection will run until:
1059    /// - The transport closes (e.g., EOF on byte streams)
1060    /// - An error occurs
1061    /// - One of your handlers returns an error
1062    ///
1063    /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
1064    /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1065    ///
1066    /// Use this mode when you only need to respond to incoming messages and don't need
1067    /// to initiate your own requests. If you need to send requests to the other side,
1068    /// use [`connect_with`](Self::connect_with) instead.
1069    ///
1070    /// # Example: Byte Stream Transport
1071    ///
1072    /// ```no_run
1073    /// # use agent_client_protocol::UntypedRole;
1074    /// # use agent_client_protocol::{Builder};
1075    /// # use agent_client_protocol::Stdio;
1076    /// # use agent_client_protocol_test::*;
1077    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1078    /// let transport = Stdio::new();
1079    ///
1080    /// UntypedRole.builder()
1081    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
1082    ///         responder.respond(MyResponse { status: "ok".into() })
1083    ///     }, agent_client_protocol::on_receive_request!())
1084    ///     .connect_to(transport)
1085    ///     .await?;
1086    /// # Ok(())
1087    /// # }
1088    /// ```
1089    pub async fn connect_to(
1090        self,
1091        transport: impl ConnectTo<Host> + 'static,
1092    ) -> Result<(), crate::Error> {
1093        self.connect_with(transport, async move |_cx| future::pending().await)
1094            .await
1095    }
1096
1097    /// Run the connection until the provided closure completes.
1098    ///
1099    /// This drives the connection by:
1100    /// 1. Running your registered handlers in the background to process incoming messages
1101    /// 2. Executing your `main_fn` closure with a [`ConnectionTo<R>`] for sending requests/notifications
1102    ///
1103    /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1104    /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1105    ///
1106    /// Use this mode when you need to initiate communication (send requests/notifications)
1107    /// in addition to responding to incoming messages. For server-only mode where you just
1108    /// respond to messages, use [`connect_to`](Self::connect_to) instead.
1109    ///
1110    /// # Example
1111    ///
1112    /// ```no_run
1113    /// # use agent_client_protocol::UntypedRole;
1114    /// # use agent_client_protocol::{Builder};
1115    /// # use agent_client_protocol::ByteStreams;
1116    /// # use agent_client_protocol::schema::InitializeRequest;
1117    /// # use agent_client_protocol::Stdio;
1118    /// # use agent_client_protocol_test::*;
1119    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1120    /// let transport = Stdio::new();
1121    ///
1122    /// UntypedRole.builder()
1123    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
1124    ///         // Handle incoming requests in the background
1125    ///         responder.respond(MyResponse { status: "ok".into() })
1126    ///     }, agent_client_protocol::on_receive_request!())
1127    ///     .connect_with(transport, async |cx| {
1128    ///         // Initialize the protocol
1129    ///         let init_response = cx.send_request(InitializeRequest::make())
1130    ///             .block_task()
1131    ///             .await?;
1132    ///
1133    ///         // Send more requests...
1134    ///         let result = cx.send_request(MyRequest {})
1135    ///             .block_task()
1136    ///             .await?;
1137    ///
1138    ///         // When this closure returns, the connection shuts down
1139    ///         Ok(())
1140    ///     })
1141    ///     .await?;
1142    /// # Ok(())
1143    /// # }
1144    /// ```
1145    ///
1146    /// # Parameters
1147    ///
1148    /// - `main_fn`: Your client logic. Receives a [`ConnectionTo<R>`] for sending messages.
1149    ///
1150    /// # Errors
1151    ///
1152    /// Returns an error if the connection closes before `main_fn` completes.
1153    pub async fn connect_with<R>(
1154        self,
1155        transport: impl ConnectTo<Host> + 'static,
1156        main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1157    ) -> Result<R, crate::Error> {
1158        let (_, future) = self.into_connection_and_future(transport, main_fn);
1159        future.await
1160    }
1161
    /// Helper that returns a [`ConnectionTo`] handle and a future that runs this connection
    /// until `main_fn` returns.
    ///
    /// The handle is created up front, but nothing is driven until the returned future is
    /// polled. That future:
    ///
    /// 1. Reports any error from spawning the transport task.
    /// 2. Builds a background future joining the outgoing actor, the incoming actor,
    ///    the task actor and the builder's `responder`.
    /// 3. Passes that background future and `main_fn(connection)` to
    ///    `crate::util::run_until`.
    ///
    /// The whole future is wrapped by `crate::util::instrument_with_connection_name`
    /// using the builder's `name`.
    fn into_connection_and_future<R>(
        self,
        transport: impl ConnectTo<Host> + 'static,
        main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
    ) -> (
        ConnectionTo<Host::Counterpart>,
        impl Future<Output = Result<R, crate::Error>>,
    ) {
        let Self {
            name,
            handler,
            responder,
            host: me,
        } = self;

        // One unbounded channel per concern. The sending halves live in the
        // `ConnectionTo` handle; the receiving halves are consumed by the actors below.
        let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
        let (new_task_tx, new_task_rx) = mpsc::unbounded();
        let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
        let connection = ConnectionTo::new(
            me.counterpart(),
            outgoing_tx,
            new_task_tx,
            dynamic_handler_tx,
        );

        // Convert the transport into a channel for us to use and a future that
        // runs the transport. That future is queued as a task on this connection;
        // the spawn result is only inspected once the returned future is polled.
        let transport_component = crate::DynConnectTo::new(transport);
        let (transport_channel, transport_future) = transport_component.into_channel_and_future();
        let spawn_result = connection.spawn(transport_future);

        // Destructure the channel endpoints
        let Channel {
            rx: transport_incoming_rx,
            tx: transport_outgoing_tx,
        } = transport_channel;

        // Links the outgoing actor (sender) to the incoming actor (receiver).
        // NOTE(review): presumably carries `ReplyMessage` subscriptions so replies
        // can be routed back to the request that awaits them - confirm in the actors.
        let (reply_tx, reply_rx) = mpsc::unbounded();

        let future = crate::util::instrument_with_connection_name(name, {
            // The future owns its own clone; the original handle is returned to the caller.
            let connection = connection.clone();
            async move {
                // Surface a failure to queue the transport task before doing anything else.
                let () = spawn_result?;

                // Not `async move`: the actors borrow `connection` (and `reply_tx`)
                // from the enclosing block, which also needs `connection` afterwards.
                let background = async {
                    futures::try_join!(
                        // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
                        outgoing_actor::outgoing_protocol_actor(
                            outgoing_rx,
                            reply_tx.clone(),
                            transport_outgoing_tx,
                        ),
                        // Protocol layer: jsonrpcmsg::Message → handler/reply routing
                        incoming_actor::incoming_protocol_actor(
                            me.counterpart(),
                            &connection,
                            transport_incoming_rx,
                            dynamic_handler_rx,
                            reply_rx,
                            handler,
                        ),
                        // Receives the tasks queued through `ConnectionTo::spawn`.
                        task_actor::task_actor(new_task_rx, &connection),
                        // Background work registered on the builder (e.g. via `with_responder`).
                        responder.run_with_connection_to(connection.clone()),
                    )?;
                    Ok(())
                };

                // NOTE(review): `run_until` is defined elsewhere; presumably it drives
                // `background` only until `main_fn` resolves - confirm in `crate::util`.
                crate::util::run_until(Box::pin(background), Box::pin(main_fn(connection.clone())))
                    .await
            }
        });

        (connection, future)
    }
1237}
1238
/// Lets a configured [`Builder`] itself be used wherever a `ConnectTo` for the
/// counterpart role is expected.
impl<R, H, Run> ConnectTo<R::Counterpart> for Builder<R, H, Run>
where
    R: Role,
    H: HandleDispatchFrom<R::Counterpart> + 'static,
    Run: RunWithConnectionTo<R::Counterpart> + 'static,
{
    /// Forwards to `Builder::connect_to` with `client` as the transport.
    async fn connect_to(self, client: impl ConnectTo<R>) -> Result<(), crate::Error> {
        // Called through the `Builder::` path rather than method syntax.
        // NOTE(review): presumably to make clear the builder's own `connect_to`
        // is meant, not this trait method - confirm.
        Builder::connect_to(self, client).await
    }
}
1249
/// The payload sent through the response oneshot channel.
///
/// Includes the response value and an optional ack channel for dispatch loop
/// synchronization. This is the item type of the `oneshot::Sender` carried by
/// `ReplyMessage::Subscribe` and `OutgoingMessage::Request`.
pub(crate) struct ResponsePayload {
    /// The response result - either the JSON value or an error.
    pub(crate) result: Result<serde_json::Value, crate::Error>,

    /// Optional acknowledgment channel for dispatch loop synchronization.
    ///
    /// When present, the receiver must send on this channel to signal that
    /// response processing is complete, allowing the dispatch loop to continue
    /// to the next message.
    ///
    /// This is `None` for error paths where the response is sent directly
    /// (e.g., when the outgoing channel is broken) rather than through the
    /// normal dispatch loop flow.
    pub(crate) ack_tx: Option<oneshot::Sender<()>>,
}
1269
1270impl std::fmt::Debug for ResponsePayload {
1271    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1272        f.debug_struct("ResponsePayload")
1273            .field("result", &self.result)
1274            .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1275            .finish()
1276    }
1277}
1278
/// Message sent to the incoming actor for reply subscription management.
///
/// Currently has a single variant, [`ReplyMessage::Subscribe`].
enum ReplyMessage {
    /// Subscribe to receive a response for the given request id.
    /// When a response with this id arrives, it will be sent through the oneshot
    /// along with an ack channel that must be signaled when processing is complete.
    /// The method name is stored to allow routing responses through typed handlers.
    Subscribe {
        /// id of the request whose response is being awaited
        id: jsonrpcmsg::Id,

        /// id of the peer this request was sent to
        role_id: RoleId,

        /// (original) method of the request -- the actual request may have been transformed
        /// to a successor method, but this will reflect the method of the wrapped request
        method: String,

        /// where to deliver the response payload once it arrives
        sender: oneshot::Sender<ResponsePayload>,
    },
}
1298
1299impl std::fmt::Debug for ReplyMessage {
1300    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1301        match self {
1302            ReplyMessage::Subscribe { id, method, .. } => f
1303                .debug_struct("Subscribe")
1304                .field("id", id)
1305                .field("method", method)
1306                .finish(),
1307        }
1308    }
1309}
1310
/// Messages sent to be serialized over the transport.
#[derive(Debug)]
enum OutgoingMessage {
    /// Send a request to the server.
    Request {
        /// id assigned to this request (generated by sender)
        id: jsonrpcmsg::Id,

        /// the original method
        method: String,

        /// the peer we sent this to
        role_id: RoleId,

        /// the message to send; this may have a distinct method
        /// depending on the peer
        untyped: UntypedMessage,

        /// where to send the response when it arrives (includes ack channel)
        response_tx: oneshot::Sender<ResponsePayload>,
    },

    /// Send a notification to the server.
    Notification {
        /// the message to send; this may have a distinct method
        /// depending on the peer
        untyped: UntypedMessage,
    },

    /// Send a response to a message from the server
    Response {
        /// id of the request being answered
        id: jsonrpcmsg::Id,

        /// the outcome to report: a JSON value on success, or an error
        response: Result<serde_json::Value, crate::Error>,
    },

    /// Send a generalized error message
    Error {
        /// the error to send
        error: crate::Error,
    },
}
1350
/// Return type from a message handler; indicates whether the message was handled or not.
///
/// Handler callbacks produce this through [`IntoHandled`], so returning `()`
/// is equivalent to returning [`Handled::Yes`].
#[must_use]
#[derive(Debug)]
pub enum Handled<T> {
    /// The message was handled
    Yes,

    /// The message was not handled; returns the original value.
    ///
    /// If `retry` is true, the message is additionally queued and retried
    /// against dynamic handlers as they are added.
    No {
        /// The message to be passed to subsequent handlers
        /// (typically the original message, but it may have been
        /// mutated.)
        message: T,

        /// If true, request the message to be queued and retried with
        /// dynamic handlers as they are added.
        ///
        /// This is used for managing session updates since the dynamic
        /// handler for a session cannot be added until the response to the
        /// new session request has been processed and there may be updates
        /// that get processed at the same time.
        retry: bool,
    },
}
1377
/// Trait for converting handler return values into [`Handled`].
///
/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
/// or an explicit `Handled<T>` value for more control over handler propagation.
///
/// `T` is the message type handed back in [`Handled::No`] when the handler
/// declines the message.
pub trait IntoHandled<T> {
    /// Convert this value into a `Handled<T>`.
    fn into_handled(self) -> Handled<T>;
}
1386
/// A handler that returns `()` is treated as having handled the message.
impl<T> IntoHandled<T> for () {
    /// Always yields [`Handled::Yes`].
    fn into_handled(self) -> Handled<T> {
        Handled::Yes
    }
}
1392
/// A handler may return an explicit [`Handled`] value, which is used as-is.
impl<T> IntoHandled<T> for Handled<T> {
    /// Returns `self` unchanged.
    fn into_handled(self) -> Handled<T> {
        self
    }
}
1398
/// Connection context for sending messages and spawning tasks.
///
/// This is the primary handle for interacting with the JSON-RPC connection from
/// within handler callbacks. You can use it to:
///
/// * Send requests and notifications to the other side
/// * Spawn concurrent tasks that run alongside the connection
/// * Respond to requests (via [`Responder`] which wraps this)
///
/// # Cloning
///
/// `ConnectionTo` is cheaply cloneable - all clones refer to the same underlying connection.
/// This makes it easy to share across async tasks.
///
/// # Event Loop and Concurrency
///
/// Handler callbacks run on the event loop, which means the connection cannot process new
/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
/// expensive or blocking work to concurrent tasks.
///
/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency) section
/// for more details.
#[derive(Clone, Debug)]
pub struct ConnectionTo<Counterpart: Role> {
    // The role on the other end of this connection; returned by `counterpart()`.
    counterpart: Counterpart,
    // Sending half of the channel carrying `OutgoingMessage`s.
    message_tx: OutgoingMessageTx,
    // Sending half of the channel on which `spawn` queues new tasks.
    task_tx: TaskTx,
    // Sending half of the channel carrying `DynamicHandlerMessage`s.
    dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
}
1428
1429impl<Counterpart: Role> ConnectionTo<Counterpart> {
1430    fn new(
1431        counterpart: Counterpart,
1432        message_tx: mpsc::UnboundedSender<OutgoingMessage>,
1433        task_tx: mpsc::UnboundedSender<Task>,
1434        dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
1435    ) -> Self {
1436        Self {
1437            counterpart,
1438            message_tx,
1439            task_tx,
1440            dynamic_handler_tx,
1441        }
1442    }
1443
1444    /// Return the counterpart role this connection is talking to.
1445    pub fn counterpart(&self) -> Counterpart {
1446        self.counterpart.clone()
1447    }
1448
1449    /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1450    ///
1451    /// This is the primary mechanism for offloading expensive work from handler callbacks
1452    /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1453    /// allowing the server to continue processing messages.
1454    ///
1455    /// # Event Loop
1456    ///
1457    /// Handler callbacks run on the event loop, which cannot process new messages while
1458    /// your handler is running. Use `spawn` for any expensive operations:
1459    ///
1460    /// ```no_run
1461    /// # use agent_client_protocol_test::*;
1462    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1463    /// # let connection = mock_connection();
1464    /// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1465    ///     // Clone cx for the spawned task
1466    ///     cx.spawn({
1467    ///         let connection = cx.clone();
1468    ///         async move {
1469    ///             let result = expensive_operation(&req.data).await?;
1470    ///             connection.send_notification(ProcessComplete { result })?;
1471    ///             Ok(())
1472    ///         }
1473    ///     })?;
1474    ///
1475    ///     // Respond immediately
1476    ///     responder.respond(ProcessResponse { result: "started".into() })
1477    /// }, agent_client_protocol::on_receive_request!())
1478    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1479    /// # Ok(())
1480    /// # }
1481    /// ```
1482    ///
1483    /// # Errors
1484    ///
1485    /// If the spawned task returns an error, the entire server will shut down.
1486    #[track_caller]
1487    pub fn spawn(
1488        &self,
1489        task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1490    ) -> Result<(), crate::Error> {
1491        let location = std::panic::Location::caller();
1492        let task = task.into_future();
1493        Task::new(location, task).spawn(&self.task_tx)
1494    }
1495
1496    /// Spawn a JSON-RPC connection in the background and return a [`ConnectionTo`] for sending messages to it.
1497    ///
1498    /// This is useful for creating multiple connections that communicate with each other,
1499    /// such as implementing proxy patterns or connecting to multiple backend services.
1500    ///
1501    /// # Arguments
1502    ///
1503    /// - `builder`: The connection builder with handlers configured
1504    /// - `transport`: The transport component to connect to
1505    ///
1506    /// # Returns
1507    ///
1508    /// A `ConnectionTo` that you can use to send requests and notifications to the spawned connection.
1509    ///
1510    /// # Example: Proxying to a backend connection
1511    ///
1512    /// ```
1513    /// # use agent_client_protocol::UntypedRole;
1514    /// # use agent_client_protocol::{Builder, ConnectionTo};
1515    /// # use agent_client_protocol_test::*;
1516    /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1517    /// // Set up a backend connection builder
1518    /// let backend = UntypedRole.builder()
1519    ///     .on_receive_request(async |req: MyRequest, responder, _cx| {
1520    ///         responder.respond(MyResponse { status: "ok".into() })
1521    ///     }, agent_client_protocol::on_receive_request!());
1522    ///
1523    /// // Spawn it and get a context to send requests to it
1524    /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
1525    ///
1526    /// // Now you can forward requests to the backend
1527    /// let response = backend_connection.send_request(MyRequest {}).block_task().await?;
1528    /// # Ok(())
1529    /// # }
1530    /// ```
1531    #[track_caller]
1532    pub fn spawn_connection<R: Role>(
1533        &self,
1534        builder: Builder<
1535            R,
1536            impl HandleDispatchFrom<R::Counterpart> + 'static,
1537            impl RunWithConnectionTo<R::Counterpart> + 'static,
1538        >,
1539        transport: impl ConnectTo<R> + 'static,
1540    ) -> Result<ConnectionTo<R::Counterpart>, crate::Error> {
1541        let (connection, future) =
1542            builder.into_connection_and_future(transport, |_| std::future::pending());
1543        Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
1544        Ok(connection)
1545    }
1546
1547    /// Send a request/notification and forward the response appropriately.
1548    ///
1549    /// The request context's response type matches the request's response type,
1550    /// enabling type-safe message forwarding.
1551    pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
1552        &self,
1553        message: Dispatch<Req, Notif>,
1554    ) -> Result<(), crate::Error>
1555    where
1556        Counterpart: HasPeer<Counterpart>,
1557    {
1558        self.send_proxied_message_to(self.counterpart(), message)
1559    }
1560
1561    /// Send a request/notification and forward the response appropriately.
1562    ///
1563    /// The request context's response type matches the request's response type,
1564    /// enabling type-safe message forwarding.
1565    pub fn send_proxied_message_to<
1566        Peer: Role,
1567        Req: JsonRpcRequest<Response: Send>,
1568        Notif: JsonRpcNotification,
1569    >(
1570        &self,
1571        peer: Peer,
1572        message: Dispatch<Req, Notif>,
1573    ) -> Result<(), crate::Error>
1574    where
1575        Counterpart: HasPeer<Peer>,
1576    {
1577        match message {
1578            Dispatch::Request(request, responder) => self
1579                .send_request_to(peer, request)
1580                .forward_response_to(responder),
1581            Dispatch::Notification(notification) => self.send_notification_to(peer, notification),
1582            Dispatch::Response(result, router) => {
1583                // Responses are forwarded directly to their destination
1584                router.respond_with_result(result)
1585            }
1586        }
1587    }
1588
1589    /// Send an outgoing request and return a [`SentRequest`] for handling the reply.
1590    ///
1591    /// The returned [`SentRequest`] provides methods for receiving the response without
1592    /// blocking the event loop:
1593    ///
1594    /// * [`on_receiving_result`](SentRequest::on_receiving_result) - Schedule
1595    ///   a callback to run when the response arrives (doesn't block the event loop)
1596    /// * [`block_task`](SentRequest::block_task) - Block the current task until the response
1597    ///   arrives (only safe in spawned tasks, not in handlers)
1598    ///
1599    /// # Anti-Footgun Design
1600    ///
1601    /// The API intentionally makes it difficult to block on the result directly to prevent
1602    /// the common mistake of blocking the event loop while waiting for a response:
1603    ///
1604    /// ```compile_fail
1605    /// # use agent_client_protocol_test::*;
1606    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1607    /// // ❌ This doesn't compile - prevents blocking the event loop
1608    /// let response = cx.send_request(MyRequest {}).await?;
1609    /// # Ok(())
1610    /// # }
1611    /// ```
1612    ///
1613    /// ```no_run
1614    /// # use agent_client_protocol_test::*;
1615    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1616    /// // ✅ Option 1: Schedule callback (safe in handlers)
1617    /// cx.send_request(MyRequest {})
1618    ///     .on_receiving_result(async |result| {
1619    ///         // Handle the response
1620    ///         Ok(())
1621    ///     })?;
1622    ///
1623    /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1624    /// cx.spawn({
1625    ///     let cx = cx.clone();
1626    ///     async move {
1627    ///         let response = cx.send_request(MyRequest {})
1628    ///             .block_task()
1629    ///             .await?;
1630    ///         // Process response...
1631    ///         Ok(())
1632    ///     }
1633    /// })?;
1634    /// # Ok(())
1635    /// # }
1636    /// ```
1637    /// Send an outgoing request to the default counterpart peer.
1638    ///
1639    /// This is a convenience method that sends to the counterpart role `R`.
1640    /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
1641    pub fn send_request<Req: JsonRpcRequest>(&self, request: Req) -> SentRequest<Req::Response>
1642    where
1643        Counterpart: HasPeer<Counterpart>,
1644    {
1645        self.send_request_to(self.counterpart.clone(), request)
1646    }
1647
1648    /// Send an outgoing request to a specific peer.
1649    ///
1650    /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1651    /// implementation before being sent.
1652    pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
1653        &self,
1654        peer: Peer,
1655        request: Req,
1656    ) -> SentRequest<Req::Response>
1657    where
1658        Counterpart: HasPeer<Peer>,
1659    {
1660        let method = request.method().to_string();
1661        let id = jsonrpcmsg::Id::String(uuid::Uuid::new_v4().to_string());
1662        let (response_tx, response_rx) = oneshot::channel();
1663        let role_id = peer.role_id();
1664        let remote_style = self.counterpart.remote_style(peer);
1665        match remote_style.transform_outgoing_message(request) {
1666            Ok(untyped) => {
1667                // Transform the message for the target role
1668                let message = OutgoingMessage::Request {
1669                    id: id.clone(),
1670                    method: method.clone(),
1671                    role_id,
1672                    untyped,
1673                    response_tx,
1674                };
1675
1676                match self.message_tx.unbounded_send(message) {
1677                    Ok(()) => (),
1678                    Err(error) => {
1679                        let OutgoingMessage::Request {
1680                            method,
1681                            response_tx,
1682                            ..
1683                        } = error.into_inner()
1684                        else {
1685                            unreachable!();
1686                        };
1687
1688                        response_tx
1689                            .send(ResponsePayload {
1690                                result: Err(crate::util::internal_error(format!(
1691                                    "failed to send outgoing request `{method}"
1692                                ))),
1693                                ack_tx: None,
1694                            })
1695                            .unwrap();
1696                    }
1697                }
1698            }
1699
1700            Err(err) => {
1701                response_tx
1702                    .send(ResponsePayload {
1703                        result: Err(crate::util::internal_error(format!(
1704                            "failed to create untyped request for `{method}`: {err}"
1705                        ))),
1706                        ack_tx: None,
1707                    })
1708                    .unwrap();
1709            }
1710        }
1711
1712        SentRequest::new(id, method.clone(), self.task_tx.clone(), response_rx)
1713            .map(move |json| <Req::Response>::from_value(&method, json))
1714    }
1715
1716    /// Send an outgoing notification to the default counterpart peer (no reply expected).
1717    ///
1718    /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1719    /// This method sends the notification immediately and returns.
1720    ///
1721    /// This is a convenience method that sends to the counterpart role `R`.
1722    /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
1723    ///
1724    /// ```no_run
1725    /// # use agent_client_protocol_test::*;
1726    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::Agent>) -> Result<(), agent_client_protocol::Error> {
1727    /// cx.send_notification(StatusUpdate {
1728    ///     message: "Processing...".into(),
1729    /// })?;
1730    /// # Ok(())
1731    /// # }
1732    /// ```
1733    pub fn send_notification<N: JsonRpcNotification>(
1734        &self,
1735        notification: N,
1736    ) -> Result<(), crate::Error>
1737    where
1738        Counterpart: HasPeer<Counterpart>,
1739    {
1740        self.send_notification_to(self.counterpart.clone(), notification)
1741    }
1742
1743    /// Send an outgoing notification to a specific peer (no reply expected).
1744    ///
1745    /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1746    /// implementation before being sent.
1747    pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
1748        &self,
1749        peer: Peer,
1750        notification: N,
1751    ) -> Result<(), crate::Error>
1752    where
1753        Counterpart: HasPeer<Peer>,
1754    {
1755        let remote_style = self.counterpart.remote_style(peer);
1756        tracing::debug!(
1757            role = std::any::type_name::<Counterpart>(),
1758            peer = std::any::type_name::<Peer>(),
1759            notification_type = std::any::type_name::<N>(),
1760            ?remote_style,
1761            original_method = notification.method(),
1762            "send_notification_to"
1763        );
1764        let transformed = remote_style.transform_outgoing_message(notification)?;
1765        tracing::debug!(
1766            transformed_method = %transformed.method,
1767            "send_notification_to transformed"
1768        );
1769        send_raw_message(
1770            &self.message_tx,
1771            OutgoingMessage::Notification {
1772                untyped: transformed,
1773            },
1774        )
1775    }
1776
1777    /// Send an error notification (no reply expected).
1778    pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1779        send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
1780    }
1781
1782    /// Register a dynamic message handler, used to intercept messages specific to a particular session
1783    /// or some similar modal thing.
1784    ///
1785    /// Dynamic message handlers are called first for every incoming message.
1786    ///
1787    /// If they decline to handle the message, then the message is passed to the regular registered handlers.
1788    ///
1789    /// The handler will stay registered until the returned registration guard is dropped.
1790    pub fn add_dynamic_handler(
1791        &self,
1792        handler: impl HandleDispatchFrom<Counterpart> + 'static,
1793    ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error> {
1794        let uuid = Uuid::new_v4();
1795        self.dynamic_handler_tx
1796            .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
1797                uuid,
1798                Box::new(handler),
1799            ))
1800            .map_err(crate::util::internal_error)?;
1801
1802        Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
1803    }
1804
1805    fn remove_dynamic_handler(&self, uuid: Uuid) {
1806        // Ignore errors
1807        drop(
1808            self.dynamic_handler_tx
1809                .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid)),
1810        );
1811    }
1812}
1813
1814#[derive(Clone, Debug)]
1815pub struct DynamicHandlerRegistration<R: Role> {
1816    uuid: Uuid,
1817    cx: ConnectionTo<R>,
1818}
1819
1820impl<R: Role> DynamicHandlerRegistration<R> {
1821    fn new(uuid: Uuid, cx: ConnectionTo<R>) -> Self {
1822        Self { uuid, cx }
1823    }
1824
1825    /// Prevents the dynamic handler from being removed when dropped.
1826    pub fn run_indefinitely(self) {
1827        std::mem::forget(self);
1828    }
1829}
1830
1831impl<R: Role> Drop for DynamicHandlerRegistration<R> {
1832    fn drop(&mut self) {
1833        self.cx.remove_dynamic_handler(self.uuid);
1834    }
1835}
1836
1837/// The context to respond to an incoming request.
1838///
1839/// This context is provided to request handlers and serves a dual role:
1840///
1841/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
1842///    [`respond_with_result`](Self::respond_with_result) to send the response
1843/// 2. **Send other messages** - Use the [`ConnectionTo`] parameter passed to your
1844///    handler, which provides [`send_request`](`ConnectionTo::send_request`),
1845///    [`send_notification`](`ConnectionTo::send_notification`), and
1846///    [`spawn`](`ConnectionTo::spawn`)
1847///
1848/// # Example
1849///
1850/// ```no_run
1851/// # use agent_client_protocol_test::*;
1852/// # async fn example() -> Result<(), agent_client_protocol::Error> {
1853/// # let connection = mock_connection();
1854/// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1855///     // Send a notification while processing
1856///     cx.send_notification(StatusUpdate {
1857///         message: "processing".into(),
1858///     })?;
1859///
1860///     // Do some work...
1861///     let result = process(&req.data)?;
1862///
1863///     // Respond to the request
1864///     responder.respond(ProcessResponse { result })
1865/// }, agent_client_protocol::on_receive_request!())
1866/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1867/// # Ok(())
1868/// # }
1869/// ```
1870///
1871/// # Event Loop Considerations
1872///
1873/// Like all handlers, request handlers run on the event loop. Use
1874/// [`spawn`](ConnectionTo::spawn) for expensive operations to avoid blocking
1875/// the connection.
1876///
1877/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency)
1878/// section for more details.
1879#[must_use]
1880pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
1881    /// The method of the request.
1882    method: String,
1883
1884    /// The `id` of the message we are replying to.
1885    id: jsonrpcmsg::Id,
1886
1887    /// Function to send the response to its destination.
1888    ///
1889    /// For incoming requests: serializes to JSON and sends over the wire.
1890    /// For incoming responses: sends to the waiting oneshot channel.
1891    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
1892}
1893
1894impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
1895    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1896        f.debug_struct("Responder")
1897            .field("method", &self.method)
1898            .field("id", &self.id)
1899            .field("response_type", &std::any::type_name::<T>())
1900            .finish_non_exhaustive()
1901    }
1902}
1903
1904impl Responder<serde_json::Value> {
1905    /// Create a new request context for an incoming request.
1906    ///
1907    /// The response will be serialized to JSON and sent over the wire.
1908    fn new(message_tx: OutgoingMessageTx, method: String, id: jsonrpcmsg::Id) -> Self {
1909        let id_clone = id.clone();
1910        Self {
1911            method,
1912            id,
1913            send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
1914                send_raw_message(
1915                    &message_tx,
1916                    OutgoingMessage::Response {
1917                        id: id_clone,
1918                        response,
1919                    },
1920                )
1921            }),
1922        }
1923    }
1924
1925    /// Cast this request context to a different response type.
1926    ///
1927    /// The provided type `T` will be serialized to JSON before sending.
1928    pub fn cast<T: JsonRpcResponse>(self) -> Responder<T> {
1929        self.wrap_params(move |method, value| match value {
1930            Ok(value) => T::into_json(value, method),
1931            Err(e) => Err(e),
1932        })
1933    }
1934}
1935
1936impl<T: JsonRpcResponse> Responder<T> {
1937    /// Method of the incoming request
1938    #[must_use]
1939    pub fn method(&self) -> &str {
1940        &self.method
1941    }
1942
1943    /// ID of the incoming request/response as a JSON value
1944    #[must_use]
1945    pub fn id(&self) -> serde_json::Value {
1946        crate::util::id_to_json(&self.id)
1947    }
1948
1949    /// Convert to a `Responder` that expects a JSON value
1950    /// and which checks (dynamically) that the JSON value it receives
1951    /// can be converted to `T`.
1952    pub fn erase_to_json(self) -> Responder<serde_json::Value> {
1953        self.wrap_params(|method, value| T::from_value(method, value?))
1954    }
1955
1956    /// Return a new Responder with a different method name.
1957    pub fn wrap_method(self, method: String) -> Responder<T> {
1958        Responder {
1959            method,
1960            id: self.id,
1961            send_fn: self.send_fn,
1962        }
1963    }
1964
1965    /// Return a new Responder that expects a response of type U.
1966    ///
1967    /// `wrap_fn` will be invoked with the method name and the result to transform
1968    /// type `U` into type `T` before sending.
1969    pub fn wrap_params<U: JsonRpcResponse>(
1970        self,
1971        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
1972    ) -> Responder<U> {
1973        let method = self.method.clone();
1974        Responder {
1975            method: self.method,
1976            id: self.id,
1977            send_fn: Box::new(move |input: Result<U, crate::Error>| {
1978                let t_value = wrap_fn(&method, input);
1979                (self.send_fn)(t_value)
1980            }),
1981        }
1982    }
1983
1984    /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
1985    pub fn respond_with_result(
1986        self,
1987        response: Result<T, crate::Error>,
1988    ) -> Result<(), crate::Error> {
1989        tracing::debug!(id = ?self.id, "respond called");
1990        (self.send_fn)(response)
1991    }
1992
1993    /// Respond to the JSON-RPC request with a value.
1994    pub fn respond(self, response: T) -> Result<(), crate::Error> {
1995        self.respond_with_result(Ok(response))
1996    }
1997
1998    /// Respond to the JSON-RPC request with an internal error containing a message.
1999    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2000        self.respond_with_error(crate::util::internal_error(message))
2001    }
2002
2003    /// Respond to the JSON-RPC request with an error.
2004    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2005        tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2006        self.respond_with_result(Err(error))
2007    }
2008}
2009
2010/// Context for handling an incoming JSON-RPC response.
2011///
2012/// This is the response-side counterpart to [`Responder`]. While `Responder` handles
2013/// incoming requests (where you send a response over the wire), `ResponseRouter` handles
2014/// incoming responses (where you route the response to a local task waiting for it).
2015///
2016/// Both are fundamentally "sinks" that push the message through a `send_fn`, but they
2017/// represent different points in the message lifecycle and carry different metadata.
2018#[must_use]
2019pub struct ResponseRouter<T: JsonRpcResponse = serde_json::Value> {
2020    /// The method of the original request.
2021    method: String,
2022
2023    /// The `id` of the original request.
2024    id: jsonrpcmsg::Id,
2025
2026    /// The RoleId to which the original request was sent
2027    /// (and hence from which the reply is expected).
2028    role_id: RoleId,
2029
2030    /// Function to send the response to the waiting task.
2031    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
2032}
2033
2034impl<T: JsonRpcResponse> std::fmt::Debug for ResponseRouter<T> {
2035    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2036        f.debug_struct("ResponseRouter")
2037            .field("method", &self.method)
2038            .field("id", &self.id)
2039            .field("response_type", &std::any::type_name::<T>())
2040            .finish_non_exhaustive()
2041    }
2042}
2043
2044impl ResponseRouter<serde_json::Value> {
2045    /// Create a new response context for routing a response to a local awaiter.
2046    ///
2047    /// When `respond_with_result` is called, the response is sent through the oneshot
2048    /// channel to the code that originally sent the request.
2049    pub(crate) fn new(
2050        method: String,
2051        id: jsonrpcmsg::Id,
2052        role_id: RoleId,
2053        sender: oneshot::Sender<ResponsePayload>,
2054    ) -> Self {
2055        Self {
2056            method,
2057            id,
2058            role_id,
2059            send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
2060                sender
2061                    .send(ResponsePayload {
2062                        result: response,
2063                        ack_tx: None,
2064                    })
2065                    .map_err(|_| {
2066                        crate::util::internal_error("failed to send response, receiver dropped")
2067                    })
2068            }),
2069        }
2070    }
2071
2072    /// Cast this response context to a different response type.
2073    ///
2074    /// The provided type `T` will be serialized to JSON before sending.
2075    pub fn cast<T: JsonRpcResponse>(self) -> ResponseRouter<T> {
2076        self.wrap_params(move |method, value| match value {
2077            Ok(value) => T::into_json(value, method),
2078            Err(e) => Err(e),
2079        })
2080    }
2081}
2082
2083impl<T: JsonRpcResponse> ResponseRouter<T> {
2084    /// Method of the original request
2085    #[must_use]
2086    pub fn method(&self) -> &str {
2087        &self.method
2088    }
2089
2090    /// ID of the original request as a JSON value
2091    #[must_use]
2092    pub fn id(&self) -> serde_json::Value {
2093        crate::util::id_to_json(&self.id)
2094    }
2095
2096    /// The peer to which the original request was sent.
2097    ///
2098    /// This is the peer from which we expect to receive the response.
2099    #[must_use]
2100    pub fn role_id(&self) -> RoleId {
2101        self.role_id.clone()
2102    }
2103
2104    /// Convert to a `ResponseRouter` that expects a JSON value
2105    /// and which checks (dynamically) that the JSON value it receives
2106    /// can be converted to `T`.
2107    pub fn erase_to_json(self) -> ResponseRouter<serde_json::Value> {
2108        self.wrap_params(|method, value| T::from_value(method, value?))
2109    }
2110
2111    /// Return a new ResponseRouter that expects a response of type U.
2112    ///
2113    /// `wrap_fn` will be invoked with the method name and the result to transform
2114    /// type `U` into type `T` before sending.
2115    fn wrap_params<U: JsonRpcResponse>(
2116        self,
2117        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2118    ) -> ResponseRouter<U> {
2119        let method = self.method.clone();
2120        ResponseRouter {
2121            method: self.method,
2122            id: self.id,
2123            role_id: self.role_id,
2124            send_fn: Box::new(move |input: Result<U, crate::Error>| {
2125                let t_value = wrap_fn(&method, input);
2126                (self.send_fn)(t_value)
2127            }),
2128        }
2129    }
2130
2131    /// Complete the response by sending the result to the waiting task.
2132    pub fn respond_with_result(
2133        self,
2134        response: Result<T, crate::Error>,
2135    ) -> Result<(), crate::Error> {
2136        tracing::debug!(id = ?self.id, "response routed to awaiter");
2137        (self.send_fn)(response)
2138    }
2139
2140    /// Complete the response by sending a value to the waiting task.
2141    pub fn respond(self, response: T) -> Result<(), crate::Error> {
2142        self.respond_with_result(Ok(response))
2143    }
2144
2145    /// Complete the response by sending an internal error to the waiting task.
2146    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2147        self.respond_with_error(crate::util::internal_error(message))
2148    }
2149
2150    /// Complete the response by sending an error to the waiting task.
2151    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2152        tracing::debug!(id = ?self.id, ?error, "error routed to awaiter");
2153        self.respond_with_result(Err(error))
2154    }
2155}
2156
2157/// Common bounds for any JSON-RPC message.
2158///
2159/// # Derive Macro
2160///
2161/// For simple message types, you can use the `JsonRpcRequest` or `JsonRpcNotification` derive macros
2162/// which will implement both `JsonRpcMessage` and the respective trait. See [`JsonRpcRequest`] and
2163/// [`JsonRpcNotification`] for examples.
2164pub trait JsonRpcMessage: 'static + Debug + Sized + Send + Clone {
2165    /// Check if this message type matches the given method name.
2166    fn matches_method(method: &str) -> bool;
2167
2168    /// The method name for the message.
2169    fn method(&self) -> &str;
2170
2171    /// Convert this message into an untyped message.
2172    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;
2173
2174    /// Parse this type from a method name and parameters.
2175    ///
2176    /// Returns an error if the method doesn't match or deserialization fails.
2177    /// Callers should use `matches_method` first to check if this type handles the method.
2178    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error>;
2179}
2180
2181/// Defines the "payload" of a successful response to a JSON-RPC request.
2182///
2183/// # Derive Macro
2184///
2185/// Use `#[derive(JsonRpcResponse)]` to automatically implement this trait:
2186///
2187/// ```ignore
2188/// use agent_client_protocol::JsonRpcResponse;
2189/// use serde::{Serialize, Deserialize};
2190///
2191/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2192/// #[response(method = "_hello")]
2193/// struct HelloResponse {
2194///     greeting: String,
2195/// }
2196/// ```
2197pub trait JsonRpcResponse: 'static + Debug + Sized + Send + Clone {
2198    /// Convert this message into a JSON value.
2199    fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;
2200
2201    /// Parse a JSON value into the response type.
2202    fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
2203}
2204
2205impl JsonRpcResponse for serde_json::Value {
2206    fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
2207        Ok(value)
2208    }
2209
2210    fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
2211        Ok(self)
2212    }
2213}
2214
2215/// A struct that represents a notification (JSON-RPC message that does not expect a response).
2216///
2217/// # Derive Macro
2218///
2219/// Use `#[derive(JsonRpcNotification)]` to automatically implement both `JsonRpcMessage` and `JsonRpcNotification`:
2220///
2221/// ```ignore
2222/// use agent_client_protocol::JsonRpcNotification;
2223/// use serde::{Serialize, Deserialize};
2224///
2225/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcNotification)]
2226/// #[notification(method = "_ping")]
2227/// struct PingNotification {
2228///     timestamp: u64,
2229/// }
2230/// ```
2231pub trait JsonRpcNotification: JsonRpcMessage {}
2232
2233/// A struct that represents a request (JSON-RPC message expecting a response).
2234///
2235/// # Derive Macro
2236///
2237/// Use `#[derive(JsonRpcRequest)]` to automatically implement both `JsonRpcMessage` and `JsonRpcRequest`:
2238///
2239/// ```ignore
2240/// use agent_client_protocol::{JsonRpcRequest, JsonRpcResponse};
2241/// use serde::{Serialize, Deserialize};
2242///
2243/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcRequest)]
2244/// #[request(method = "_hello", response = HelloResponse)]
2245/// struct HelloRequest {
2246///     name: String,
2247/// }
2248///
2249/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2250/// struct HelloResponse {
2251///     greeting: String,
2252/// }
2253/// ```
2254pub trait JsonRpcRequest: JsonRpcMessage {
2255    /// The type of data expected in response.
2256    type Response: JsonRpcResponse;
2257}
2258
2259/// An enum capturing an in-flight request or notification.
2260/// In the case of a request, also includes the context used to respond to the request.
2261///
2262/// Type parameters allow specifying the concrete request and notification types.
2263/// By default, both are `UntypedMessage` for dynamic dispatch.
2264/// The request context's response type matches the request's response type.
2265#[derive(Debug)]
2266pub enum Dispatch<Req: JsonRpcRequest = UntypedMessage, Notif: JsonRpcMessage = UntypedMessage> {
2267    /// Incoming request and the context where the response should be sent.
2268    Request(Req, Responder<Req::Response>),
2269
2270    /// Incoming notification.
2271    Notification(Notif),
2272
2273    /// Incoming response to a request we sent.
2274    ///
2275    /// The first field is the response result (success or error from the remote).
2276    /// The second field is the context for forwarding the response to its destination
2277    /// (typically a waiting oneshot channel).
2278    Response(
2279        Result<Req::Response, crate::Error>,
2280        ResponseRouter<Req::Response>,
2281    ),
2282}
2283
2284impl<Req: JsonRpcRequest, Notif: JsonRpcMessage> Dispatch<Req, Notif> {
2285    /// Map the request and notification types to new types.
2286    ///
2287    /// Note: Response variants are passed through unchanged since they don't
2288    /// contain a parseable message payload.
2289    pub fn map<Req1, Notif1>(
2290        self,
2291        map_request: impl FnOnce(Req, Responder<Req::Response>) -> (Req1, Responder<Req1::Response>),
2292        map_notification: impl FnOnce(Notif) -> Notif1,
2293    ) -> Dispatch<Req1, Notif1>
2294    where
2295        Req1: JsonRpcRequest<Response = Req::Response>,
2296        Notif1: JsonRpcMessage,
2297    {
2298        match self {
2299            Dispatch::Request(request, responder) => {
2300                let (new_request, new_responder) = map_request(request, responder);
2301                Dispatch::Request(new_request, new_responder)
2302            }
2303            Dispatch::Notification(notification) => {
2304                let new_notification = map_notification(notification);
2305                Dispatch::Notification(new_notification)
2306            }
2307            Dispatch::Response(result, router) => Dispatch::Response(result, router),
2308        }
2309    }
2310
2311    /// Respond to the message with an error.
2312    ///
2313    /// If this message is a request, this error becomes the reply to the request.
2314    ///
2315    /// If this message is a notification, the error is sent as a notification.
2316    ///
2317    /// If this message is a response, the error is forwarded to the waiting handler.
2318    pub fn respond_with_error<R: Role>(
2319        self,
2320        error: crate::Error,
2321        cx: ConnectionTo<R>,
2322    ) -> Result<(), crate::Error> {
2323        match self {
2324            Dispatch::Request(_, responder) => responder.respond_with_error(error),
2325            Dispatch::Notification(_) => cx.send_error_notification(error),
2326            Dispatch::Response(_, responder) => responder.respond_with_error(error),
2327        }
2328    }
2329
2330    /// Convert to a `Responder` that expects a JSON value
2331    /// and which checks (dynamically) that the JSON value it receives
2332    /// can be converted to `T`.
2333    ///
2334    /// Note: Response variants cannot be erased since their payload is already
2335    /// parsed. This returns an error for Response variants.
2336    pub fn erase_to_json(self) -> Result<Dispatch, crate::Error> {
2337        match self {
2338            Dispatch::Request(response, responder) => Ok(Dispatch::Request(
2339                response.to_untyped_message()?,
2340                responder.erase_to_json(),
2341            )),
2342            Dispatch::Notification(notification) => {
2343                Ok(Dispatch::Notification(notification.to_untyped_message()?))
2344            }
2345            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2346                "cannot erase Response variant to JSON",
2347            )),
2348        }
2349    }
2350
2351    /// Convert the message in self to an untyped message.
2352    ///
2353    /// Note: Response variants don't have an untyped message representation.
2354    /// This returns an error for Response variants.
2355    pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2356        match self {
2357            Dispatch::Request(request, _) => request.to_untyped_message(),
2358            Dispatch::Notification(notification) => notification.to_untyped_message(),
2359            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2360                "Response variant has no untyped message representation",
2361            )),
2362        }
2363    }
2364
2365    /// Convert self to an untyped message context.
2366    ///
2367    /// Note: Response variants cannot be converted. This returns an error for Response variants.
2368    pub fn into_untyped_dispatch(self) -> Result<Dispatch, crate::Error> {
2369        match self {
2370            Dispatch::Request(request, responder) => Ok(Dispatch::Request(
2371                request.to_untyped_message()?,
2372                responder.erase_to_json(),
2373            )),
2374            Dispatch::Notification(notification) => {
2375                Ok(Dispatch::Notification(notification.to_untyped_message()?))
2376            }
2377            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2378                "cannot convert Response variant to untyped message context",
2379            )),
2380        }
2381    }
2382
2383    /// Returns the request ID if this is a request or response, None if notification.
2384    pub fn id(&self) -> Option<serde_json::Value> {
2385        match self {
2386            Dispatch::Request(_, cx) => Some(cx.id()),
2387            Dispatch::Notification(_) => None,
2388            Dispatch::Response(_, cx) => Some(cx.id()),
2389        }
2390    }
2391
2392    /// Returns the method of the message.
2393    ///
2394    /// For requests and notifications, this is the method from the message payload.
2395    /// For responses, this is the method of the original request.
2396    pub fn method(&self) -> &str {
2397        match self {
2398            Dispatch::Request(msg, _) => msg.method(),
2399            Dispatch::Notification(msg) => msg.method(),
2400            Dispatch::Response(_, cx) => cx.method(),
2401        }
2402    }
2403}
2404
2405impl Dispatch {
2406    /// Attempts to parse `self` into a typed message context.
2407    ///
2408    /// # Returns
2409    ///
2410    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2411    /// * `Ok(Err(self))` if not
2412    /// * `Err` if has the correct method for the given types but parsing fails
2413    #[tracing::instrument(skip(self), fields(Request = ?std::any::type_name::<Req>(), Notif = ?std::any::type_name::<Notif>()), level = "trace", ret)]
2414    pub(crate) fn into_typed_dispatch<Req: JsonRpcRequest, Notif: JsonRpcNotification>(
2415        self,
2416    ) -> Result<Result<Dispatch<Req, Notif>, Dispatch>, crate::Error> {
2417        tracing::debug!(
2418            message = ?self,
2419            "into_typed_dispatch"
2420        );
2421        match self {
2422            Dispatch::Request(message, responder) => {
2423                if Req::matches_method(&message.method) {
2424                    match Req::parse_message(&message.method, &message.params) {
2425                        Ok(req) => {
2426                            tracing::trace!(?req, "parsed ok");
2427                            Ok(Ok(Dispatch::Request(req, responder.cast())))
2428                        }
2429                        Err(err) => {
2430                            tracing::trace!(?err, "parse error");
2431                            Err(err)
2432                        }
2433                    }
2434                } else {
2435                    tracing::trace!("method doesn't match");
2436                    Ok(Err(Dispatch::Request(message, responder)))
2437                }
2438            }
2439
2440            Dispatch::Notification(message) => {
2441                if Notif::matches_method(&message.method) {
2442                    match Notif::parse_message(&message.method, &message.params) {
2443                        Ok(notif) => {
2444                            tracing::trace!(?notif, "parse ok");
2445                            Ok(Ok(Dispatch::Notification(notif)))
2446                        }
2447                        Err(err) => {
2448                            tracing::trace!(?err, "parse error");
2449                            Err(err)
2450                        }
2451                    }
2452                } else {
2453                    tracing::trace!("method doesn't match");
2454                    Ok(Err(Dispatch::Notification(message)))
2455                }
2456            }
2457
2458            Dispatch::Response(result, cx) => {
2459                let method = cx.method();
2460                if Req::matches_method(method) {
2461                    // Parse the response result
2462                    let typed_result = match result {
2463                        Ok(value) => {
2464                            match <Req::Response as JsonRpcResponse>::from_value(method, value) {
2465                                Ok(parsed) => {
2466                                    tracing::trace!(?parsed, "parse ok");
2467                                    Ok(parsed)
2468                                }
2469                                Err(err) => {
2470                                    tracing::trace!(?err, "parse error");
2471                                    return Err(err);
2472                                }
2473                            }
2474                        }
2475                        Err(err) => {
2476                            tracing::trace!("error, passthrough");
2477                            Err(err)
2478                        }
2479                    };
2480                    Ok(Ok(Dispatch::Response(typed_result, cx.cast())))
2481                } else {
2482                    tracing::trace!("method doesn't match");
2483                    Ok(Err(Dispatch::Response(result, cx)))
2484                }
2485            }
2486        }
2487    }
2488
2489    /// True if this message has a field with the given name.
2490    ///
2491    /// Returns `false` for Response variants.
2492    #[must_use]
2493    pub fn has_field(&self, field_name: &str) -> bool {
2494        self.message()
2495            .and_then(|m| m.params().get(field_name))
2496            .is_some()
2497    }
2498
2499    /// Returns true if this message has a session-id field.
2500    ///
2501    /// Returns `false` for Response variants.
2502    pub(crate) fn has_session_id(&self) -> bool {
2503        self.has_field("sessionId")
2504    }
2505
2506    /// Extract the ACP session-id from this message (if any).
2507    ///
2508    /// Returns `Ok(None)` for Response variants.
2509    pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
2510        let Some(message) = self.message() else {
2511            return Ok(None);
2512        };
2513        let Some(value) = message.params().get("sessionId") else {
2514            return Ok(None);
2515        };
2516        let session_id = serde_json::from_value(value.clone())?;
2517        Ok(Some(session_id))
2518    }
2519
2520    /// Try to parse this as a notification of the given type.
2521    ///
2522    /// # Returns
2523    ///
2524    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2525    /// * `Ok(Err(self))` if not
2526    /// * `Err` if has the correct method for the given types but parsing fails
2527    pub fn into_notification<N: JsonRpcNotification>(
2528        self,
2529    ) -> Result<Result<N, Dispatch>, crate::Error> {
2530        match self {
2531            Dispatch::Notification(msg) => {
2532                if !N::matches_method(&msg.method) {
2533                    return Ok(Err(Dispatch::Notification(msg)));
2534                }
2535                match N::parse_message(&msg.method, &msg.params) {
2536                    Ok(n) => Ok(Ok(n)),
2537                    Err(err) => Err(err),
2538                }
2539            }
2540            Dispatch::Request(..) | Dispatch::Response(..) => Ok(Err(self)),
2541        }
2542    }
2543
2544    /// Try to parse this as a request of the given type.
2545    ///
2546    /// # Returns
2547    ///
2548    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2549    /// * `Ok(Err(self))` if not
2550    /// * `Err` if has the correct method for the given types but parsing fails
2551    pub fn into_request<Req: JsonRpcRequest>(
2552        self,
2553    ) -> Result<Result<(Req, Responder<Req::Response>), Dispatch>, crate::Error> {
2554        match self {
2555            Dispatch::Request(msg, responder) => {
2556                if !Req::matches_method(&msg.method) {
2557                    return Ok(Err(Dispatch::Request(msg, responder)));
2558                }
2559                match Req::parse_message(&msg.method, &msg.params) {
2560                    Ok(req) => Ok(Ok((req, responder.cast()))),
2561                    Err(err) => Err(err),
2562                }
2563            }
2564            Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Err(self)),
2565        }
2566    }
2567}
2568
2569impl<M: JsonRpcRequest + JsonRpcNotification> Dispatch<M, M> {
2570    /// Returns the message payload for requests and notifications.
2571    ///
2572    /// Returns `None` for Response variants since they don't contain a message payload.
2573    pub fn message(&self) -> Option<&M> {
2574        match self {
2575            Dispatch::Request(msg, _) | Dispatch::Notification(msg) => Some(msg),
2576            Dispatch::Response(_, _) => None,
2577        }
2578    }
2579
2580    /// Map the request/notification message.
2581    ///
2582    /// Response variants pass through unchanged.
2583    pub(crate) fn try_map_message(
2584        self,
2585        map_message: impl FnOnce(M) -> Result<M, crate::Error>,
2586    ) -> Result<Dispatch<M, M>, crate::Error> {
2587        match self {
2588            Dispatch::Request(request, cx) => Ok(Dispatch::Request(map_message(request)?, cx)),
2589            Dispatch::Notification(notification) => {
2590                Ok(Dispatch::<M, M>::Notification(map_message(notification)?))
2591            }
2592            Dispatch::Response(result, cx) => Ok(Dispatch::Response(result, cx)),
2593        }
2594    }
2595}
2596
2597/// An incoming JSON message without any typing. Can be a request or a notification.
2598#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
2599pub struct UntypedMessage {
2600    /// The JSON-RPC method name
2601    pub method: String,
2602    /// The JSON-RPC parameters as a raw JSON value
2603    pub params: serde_json::Value,
2604}
2605
2606impl UntypedMessage {
2607    /// Returns an untyped message with the given method and parameters.
2608    pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
2609        let params = serde_json::to_value(params)?;
2610        Ok(Self {
2611            method: method.to_string(),
2612            params,
2613        })
2614    }
2615
2616    /// Returns the method name
2617    #[must_use]
2618    pub fn method(&self) -> &str {
2619        &self.method
2620    }
2621
2622    /// Returns the parameters as a JSON value
2623    #[must_use]
2624    pub fn params(&self) -> &serde_json::Value {
2625        &self.params
2626    }
2627
2628    /// Consumes this message and returns the method and params
2629    #[must_use]
2630    pub fn into_parts(self) -> (String, serde_json::Value) {
2631        (self.method, self.params)
2632    }
2633
2634    /// Convert `self` to a JSON-RPC message.
2635    pub(crate) fn into_jsonrpc_msg(
2636        self,
2637        id: Option<jsonrpcmsg::Id>,
2638    ) -> Result<jsonrpcmsg::Request, crate::Error> {
2639        let Self { method, params } = self;
2640        Ok(jsonrpcmsg::Request::new_v2(method, json_cast(params)?, id))
2641    }
2642}
2643
2644impl JsonRpcMessage for UntypedMessage {
2645    fn matches_method(_method: &str) -> bool {
2646        // UntypedMessage matches any method - it's the untyped fallback
2647        true
2648    }
2649
2650    fn method(&self) -> &str {
2651        &self.method
2652    }
2653
2654    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2655        Ok(self.clone())
2656    }
2657
2658    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error> {
2659        UntypedMessage::new(method, params)
2660    }
2661}
2662
2663impl JsonRpcRequest for UntypedMessage {
2664    type Response = serde_json::Value;
2665}
2666
2667impl JsonRpcNotification for UntypedMessage {}
2668
2669/// Represents a pending response of type `R` from an outgoing request.
2670///
2671/// Returned by [`ConnectionTo::send_request`], this type provides methods for handling
2672/// the response without blocking the event loop. The API is intentionally designed to make
2673/// it difficult to accidentally block.
2674///
2675/// # Anti-Footgun Design
2676///
2677/// You cannot directly `.await` a `SentRequest`. Instead, you must choose how to handle
2678/// the response:
2679///
2680/// ## Option 1: Schedule a Callback (Safe in Handlers)
2681///
2682/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
2683/// that runs when the response arrives. This doesn't block the event loop:
2684///
2685/// ```no_run
2686/// # use agent_client_protocol_test::*;
2687/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2688/// cx.send_request(MyRequest {})
2689///     .on_receiving_result(async |result| {
2690///         match result {
2691///             Ok(response) => {
2692///                 // Handle successful response
2693///                 Ok(())
2694///             }
2695///             Err(error) => {
2696///                 // Handle error
2697///                 Err(error)
2698///             }
2699///         }
2700///     })?;
2701/// # Ok(())
2702/// # }
2703/// ```
2704///
2705/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
2706///
2707/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
2708/// in a spawned task (never in a handler):
2709///
2710/// ```no_run
2711/// # use agent_client_protocol_test::*;
2712/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2713/// // ✅ Safe: Spawned task runs concurrently
2714/// cx.spawn({
2715///     let cx = cx.clone();
2716///     async move {
2717///         let response = cx.send_request(MyRequest {})
2718///             .block_task()
2719///             .await?;
2720///         // Process response...
2721///         Ok(())
2722///     }
2723/// })?;
2724/// # Ok(())
2725/// # }
2726/// ```
2727///
2728/// ```no_run
2729/// # use agent_client_protocol_test::*;
2730/// # async fn example() -> Result<(), agent_client_protocol::Error> {
2731/// # let connection = mock_connection();
2732/// // ❌ NEVER do this in a handler - blocks the event loop!
2733/// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2734///     let response = cx.send_request(MyRequest {})
2735///         .block_task()  // This will deadlock!
2736///         .await?;
2737///     responder.respond(response)
2738/// }, agent_client_protocol::on_receive_request!())
2739/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2740/// # Ok(())
2741/// # }
2742/// ```
2743///
2744/// # Why This Design?
2745///
2746/// If you block the event loop while waiting for a response, the connection cannot process
2747/// the incoming response message, creating a deadlock. This API design prevents that footgun
2748/// by making blocking explicit and encouraging non-blocking patterns.
2749pub struct SentRequest<T> {
2750    id: jsonrpcmsg::Id,
2751    method: String,
2752    task_tx: TaskTx,
2753    response_rx: oneshot::Receiver<ResponsePayload>,
2754    to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
2755}
2756
2757impl<T: Debug> Debug for SentRequest<T> {
2758    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2759        f.debug_struct("SentRequest")
2760            .field("id", &self.id)
2761            .field("method", &self.method)
2762            .field("task_tx", &self.task_tx)
2763            .field("response_rx", &self.response_rx)
2764            .finish_non_exhaustive()
2765    }
2766}
2767
2768impl SentRequest<serde_json::Value> {
2769    fn new(
2770        id: jsonrpcmsg::Id,
2771        method: String,
2772        task_tx: mpsc::UnboundedSender<Task>,
2773        response_rx: oneshot::Receiver<ResponsePayload>,
2774    ) -> Self {
2775        Self {
2776            id,
2777            method,
2778            response_rx,
2779            task_tx,
2780            to_result: Box::new(Ok),
2781        }
2782    }
2783}
2784
2785impl<T: JsonRpcResponse> SentRequest<T> {
2786    /// The id of the outgoing request.
2787    #[must_use]
2788    pub fn id(&self) -> serde_json::Value {
2789        crate::util::id_to_json(&self.id)
2790    }
2791
2792    /// The method of the request this is in response to.
2793    #[must_use]
2794    pub fn method(&self) -> &str {
2795        &self.method
2796    }
2797
2798    /// Create a new response that maps the result of the response to a new type.
2799    pub fn map<U>(
2800        self,
2801        map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
2802    ) -> SentRequest<U> {
2803        SentRequest {
2804            id: self.id,
2805            method: self.method,
2806            response_rx: self.response_rx,
2807            task_tx: self.task_tx,
2808            to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
2809        }
2810    }
2811
2812    /// Forward the response (success or error) to a request context when it arrives.
2813    ///
2814    /// This is a convenience method for proxying messages between connections. When the
2815    /// response arrives, it will be automatically sent to the provided request context,
2816    /// whether it's a successful response or an error.
2817    ///
2818    /// # Example: Proxying requests
2819    ///
2820    /// ```
2821    /// # use agent_client_protocol::UntypedRole;
2822    /// # use agent_client_protocol::{Builder, ConnectionTo};
2823    /// # use agent_client_protocol_test::*;
2824    /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2825    /// // Set up backend connection builder
2826    /// let backend = UntypedRole.builder()
2827    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
2828    ///         responder.respond(MyResponse { status: "ok".into() })
2829    ///     }, agent_client_protocol::on_receive_request!());
2830    ///
2831    /// // Spawn backend and get a context to send to it
2832    /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
2833    ///
2834    /// // Set up proxy that forwards requests to backend
2835    /// UntypedRole.builder()
2836    ///     .on_receive_request({
2837    ///         let backend_connection = backend_connection.clone();
2838    ///         async move |req: MyRequest, responder, cx| {
2839    ///             // Forward the request to backend and proxy the response back
2840    ///             backend_connection.send_request(req)
2841    ///                 .forward_response_to(responder)?;
2842    ///             Ok(())
2843    ///         }
2844    ///     }, agent_client_protocol::on_receive_request!());
2845    /// # Ok(())
2846    /// # }
2847    /// ```
2848    ///
2849    /// # Type Safety
2850    ///
2851    /// The request context's response type must match the request's response type,
2852    /// ensuring type-safe message forwarding.
2853    ///
2854    /// # When to Use
2855    ///
2856    /// Use this when:
2857    /// - You're implementing a proxy or gateway pattern
2858    /// - You want to forward responses without processing them
2859    /// - The response types match between the outgoing request and incoming request
2860    ///
2861    /// This is equivalent to calling `on_receiving_result` and manually forwarding
2862    /// the result, but more concise.
2863    pub fn forward_response_to(self, responder: Responder<T>) -> Result<(), crate::Error>
2864    where
2865        T: Send,
2866    {
2867        self.on_receiving_result(async move |result| responder.respond_with_result(result))
2868    }
2869
2870    /// Block the current task until the response is received.
2871    ///
2872    /// **Warning:** This method blocks the current async task. It is **only safe** to use
2873    /// in spawned tasks created with [`ConnectionTo::spawn`]. Using it directly in a
2874    /// handler callback will deadlock the connection.
2875    ///
2876    /// # Safe Usage (in spawned tasks)
2877    ///
2878    /// ```no_run
2879    /// # use agent_client_protocol_test::*;
2880    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2881    /// # let connection = mock_connection();
2882    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2883    ///     // Spawn a task to handle the request
2884    ///     cx.spawn({
2885    ///         let connection = cx.clone();
2886    ///         async move {
2887    ///             // Safe: We're in a spawned task, not blocking the event loop
2888    ///             let response = connection.send_request(OtherRequest {})
2889    ///                 .block_task()
2890    ///                 .await?;
2891    ///
2892    ///             // Process the response...
2893    ///             Ok(())
2894    ///         }
2895    ///     })?;
2896    ///
2897    ///     // Respond immediately
2898    ///     responder.respond(MyResponse { status: "ok".into() })
2899    /// }, agent_client_protocol::on_receive_request!())
2900    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2901    /// # Ok(())
2902    /// # }
2903    /// ```
2904    ///
2905    /// # Unsafe Usage (in handlers - will deadlock!)
2906    ///
2907    /// ```no_run
2908    /// # use agent_client_protocol_test::*;
2909    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2910    /// # let connection = mock_connection();
2911    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2912    ///     // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
2913    ///     let response = cx.send_request(OtherRequest {})
2914    ///         .block_task()
2915    ///         .await?;
2916    ///
2917    ///     responder.respond(MyResponse { status: response.value })
2918    /// }, agent_client_protocol::on_receive_request!())
2919    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2920    /// # Ok(())
2921    /// # }
2922    /// ```
2923    ///
2924    /// # When to Use
2925    ///
2926    /// Use this method when:
2927    /// - You're in a spawned task (via [`ConnectionTo::spawn`])
2928    /// - You need the response value to proceed with your logic
2929    /// - Linear control flow is more natural than callbacks
2930    ///
2931    /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
2932    pub async fn block_task(self) -> Result<T, crate::Error>
2933    where
2934        T: Send,
2935    {
2936        match self.response_rx.await {
2937            Ok(ResponsePayload {
2938                result: Ok(json_value),
2939                ack_tx,
2940            }) => {
2941                // Ack immediately - we're in a spawned task, so the dispatch loop
2942                // can continue while we process the value.
2943                if let Some(tx) = ack_tx {
2944                    let _ = tx.send(());
2945                }
2946                match (self.to_result)(json_value) {
2947                    Ok(value) => Ok(value),
2948                    Err(err) => Err(err),
2949                }
2950            }
2951            Ok(ResponsePayload {
2952                result: Err(err),
2953                ack_tx,
2954            }) => {
2955                if let Some(tx) = ack_tx {
2956                    let _ = tx.send(());
2957                }
2958                Err(err)
2959            }
2960            Err(err) => Err(crate::util::internal_error(format!(
2961                "response to `{}` never received: {}",
2962                self.method, err
2963            ))),
2964        }
2965    }
2966
2967    /// Schedule an async task to run when a successful response is received.
2968    ///
2969    /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
2970    /// for the common pattern of forwarding errors to a request context while only processing
2971    /// successful responses.
2972    ///
2973    /// # Behavior
2974    ///
2975    /// - If the response is `Ok(value)`, your task receives the value and the request context
2976    /// - If the response is `Err(error)`, the error is automatically sent to `responder`
2977    ///   and your task is not called
2978    ///
2979    /// # Example: Chaining requests
2980    ///
2981    /// ```no_run
2982    /// # use agent_client_protocol_test::*;
2983    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2984    /// # let connection = mock_connection();
2985    /// connection.on_receive_request(async |req: ValidateRequest, responder, cx| {
2986    ///     // Send initial request
2987    ///     cx.send_request(ValidateRequest { data: req.data.clone() })
2988    ///         .on_receiving_ok_result(responder, async |validation, responder| {
2989    ///             // Only runs if validation succeeded
2990    ///             if validation.is_valid {
2991    ///                 // Respond to original request
2992    ///                 responder.respond(ValidateResponse { is_valid: true, error: None })
2993    ///             } else {
2994    ///                 responder.respond_with_error(agent_client_protocol::util::internal_error("validation failed"))
2995    ///             }
2996    ///         })?;
2997    ///
2998    ///     Ok(())
2999    /// }, agent_client_protocol::on_receive_request!())
3000    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3001    /// # Ok(())
3002    /// # }
3003    /// ```
3004    ///
3005    /// # Ordering
3006    ///
3007    /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
3008    /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
3009    /// for details.
3010    ///
3011    /// # When to Use
3012    ///
3013    /// Use this when:
3014    /// - You need to respond to a request based on another request's result
3015    /// - You want errors to automatically propagate to the request context
3016    /// - You only care about the success case
3017    ///
3018    /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
3019    #[track_caller]
3020    pub fn on_receiving_ok_result<F>(
3021        self,
3022        responder: Responder<T>,
3023        task: impl FnOnce(T, Responder<T>) -> F + 'static + Send,
3024    ) -> Result<(), crate::Error>
3025    where
3026        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3027        T: Send,
3028    {
3029        self.on_receiving_result(async move |result| match result {
3030            Ok(value) => task(value, responder).await,
3031            Err(err) => responder.respond_with_error(err),
3032        })
3033    }
3034
3035    /// Schedule an async task to run when the response is received.
3036    ///
3037    /// This is the recommended way to handle responses in handler callbacks, as it doesn't
3038    /// block the event loop. The task will be spawned automatically when the response arrives.
3039    ///
3040    /// # Example: Handle response in callback
3041    ///
3042    /// ```no_run
3043    /// # use agent_client_protocol_test::*;
3044    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3045    /// # let connection = mock_connection();
3046    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
3047    ///     // Send a request and schedule a callback for the response
3048    ///     cx.send_request(QueryRequest { id: 22 })
3049    ///         .on_receiving_result({
3050    ///             let connection = cx.clone();
3051    ///             async move |result| {
3052    ///                 match result {
3053    ///                     Ok(response) => {
3054    ///                         println!("Got response: {:?}", response);
3055    ///                         // Can send more messages here
3056    ///                         connection.send_notification(QueryComplete {})?;
3057    ///                         Ok(())
3058    ///                 }
3059    ///                     Err(error) => {
3060    ///                         eprintln!("Request failed: {}", error);
3061    ///                         Err(error)
3062    ///                     }
3063    ///                 }
3064    ///             }
3065    ///         })?;
3066    ///
3067    ///     // Handler continues immediately without waiting
3068    ///     responder.respond(MyResponse { status: "processing".into() })
3069    /// }, agent_client_protocol::on_receive_request!())
3070    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3071    /// # Ok(())
3072    /// # }
3073    /// ```
3074    ///
3075    /// # Ordering
3076    ///
3077    /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
3078    /// before processing the next message. This gives you ordering guarantees: no other
3079    /// messages will be processed while your callback runs.
3080    ///
3081    /// This differs from [`block_task`](Self::block_task), which signals completion immediately
3082    /// upon receiving the response (before your code processes it).
3083    ///
3084    /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
3085    /// and how to avoid deadlocks.
3086    ///
3087    /// # Error Handling
3088    ///
3089    /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
3090    /// errors appropriately within your task.
3091    ///
3092    /// # When to Use
3093    ///
3094    /// Use this method when:
3095    /// - You're in a handler callback (not a spawned task)
3096    /// - You want ordering guarantees (no other messages processed during your callback)
3097    /// - You need to do async work before "releasing" control back to the dispatch loop
3098    ///
3099    /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
3100    #[track_caller]
3101    pub fn on_receiving_result<F>(
3102        self,
3103        task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
3104    ) -> Result<(), crate::Error>
3105    where
3106        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3107        T: Send,
3108    {
3109        let task_tx = self.task_tx.clone();
3110        let method = self.method;
3111        let response_rx = self.response_rx;
3112        let to_result = self.to_result;
3113        let location = Location::caller();
3114
3115        Task::new(location, async move {
3116            match response_rx.await {
3117                Ok(ResponsePayload { result, ack_tx }) => {
3118                    // Convert the result using to_result for Ok values
3119                    let typed_result = match result {
3120                        Ok(json_value) => to_result(json_value),
3121                        Err(err) => Err(err),
3122                    };
3123
3124                    // Run the user's callback
3125                    let outcome = task(typed_result).await;
3126
3127                    // Ack AFTER the callback completes - this is the key difference
3128                    // from block_task. The dispatch loop waits for this ack.
3129                    if let Some(tx) = ack_tx {
3130                        let _ = tx.send(());
3131                    }
3132
3133                    outcome
3134                }
3135                Err(err) => Err(crate::util::internal_error(format!(
3136                    "response to `{method}` never received: {err}"
3137                ))),
3138            }
3139        })
3140        .spawn(&task_tx)
3141    }
3142}
3143
// ============================================================================
// Transport Implementations (`ConnectTo`)
// ============================================================================
3147
/// A transport component built from a pair of line streams.
///
/// `Lines` implements [`ConnectTo`] for any pairing of a line source
/// (`Stream<Item = io::Result<String>>`) with a line destination (`Sink<String>`).
/// JSON-RPC messages are exchanged as newline-delimited JSON: each outgoing message is
/// serialized to one line, and each incoming line is parsed as one message.
///
/// It sits one level below [`ByteStreams`]. Because it works on whole lines, individual
/// lines can be intercepted or transformed before they are parsed or after they are
/// serialized, which is handy for debugging, logging, or custom line-based protocols.
///
/// # Use Cases
///
/// - **Line-by-line logging**: Intercept and log each line before parsing
/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
/// - **Debugging**: Inspect raw message strings
/// - **Line filtering**: Skip or modify specific messages
///
/// For plain byte-based I/O, prefer [`ByteStreams`]; it offers a simpler interface and is
/// the right choice for most users.
///
/// [`ConnectTo`]: crate::ConnectTo
#[derive(Debug)]
pub struct Lines<OutgoingSink, IncomingStream> {
    /// Sink that receives each serialized outgoing JSON-RPC message as one line.
    pub outgoing: OutgoingSink,
    /// Stream yielding incoming lines, each of which is parsed as a JSON-RPC message.
    pub incoming: IncomingStream,
}
3177
3178impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
3179where
3180    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3181    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3182{
3183    /// Create a new line stream transport.
3184    pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
3185        Self { outgoing, incoming }
3186    }
3187}
3188
3189impl<OutgoingSink, IncomingStream, R: Role> ConnectTo<R> for Lines<OutgoingSink, IncomingStream>
3190where
3191    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3192    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3193{
3194    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3195        let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3196        match futures::future::select(Box::pin(client.connect_to(channel)), serve_self).await {
3197            Either::Left((result, _)) | Either::Right((result, _)) => result,
3198        }
3199    }
3200
3201    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3202        let Self { outgoing, incoming } = self;
3203
3204        // Create a channel pair for the client to use
3205        let (channel_for_caller, channel_for_lines) = Channel::duplex();
3206
3207        // Create the server future that runs the line stream actors
3208        let server_future = Box::pin(async move {
3209            let Channel { rx, tx } = channel_for_lines;
3210
3211            // Run both actors concurrently
3212            let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
3213            let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
3214
3215            // Wait for both to complete
3216            futures::try_join!(outgoing_future, incoming_future)?;
3217
3218            Ok(())
3219        });
3220
3221        (channel_for_caller, server_future)
3222    }
3223}
3224
/// A transport component built from a pair of byte streams (stdin/stdout, sockets, pipes, etc.).
///
/// `ByteStreams` implements [`ConnectTo`] for any `AsyncWrite` / `AsyncRead` pairing.
/// JSON-RPC messages travel over the streams as newline-delimited JSON. This is the
/// standard way to talk to external processes or network connections.
///
/// # Use Cases
///
/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
/// - **Named pipes**: Cross-process communication on the same machine
/// - **File I/O**: Reading from and writing to file descriptors
///
/// # Example
///
/// Connecting to an agent via stdio:
///
/// ```no_run
/// use agent_client_protocol::UntypedRole;
/// # use agent_client_protocol::{ByteStreams};
/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
///
/// # async fn example() -> Result<(), agent_client_protocol::Error> {
/// let component = ByteStreams::new(
///     tokio::io::stdout().compat_write(),
///     tokio::io::stdin().compat(),
/// );
///
/// // Use as a component in a connection
/// agent_client_protocol::UntypedRole.builder()
///     .name("my-client")
///     .connect_to(component)
///     .await?;
/// # Ok(())
/// # }
/// ```
///
/// [`ConnectTo`]: crate::ConnectTo
#[derive(Debug)]
pub struct ByteStreams<OB, IB> {
    /// Byte stream that serialized outgoing messages are written to.
    pub outgoing: OB,
    /// Byte stream that incoming messages are read and parsed from.
    pub incoming: IB,
}
3270
3271impl<OB, IB> ByteStreams<OB, IB>
3272where
3273    OB: AsyncWrite + Send + 'static,
3274    IB: AsyncRead + Send + 'static,
3275{
3276    /// Create a new byte stream transport.
3277    pub fn new(outgoing: OB, incoming: IB) -> Self {
3278        Self { outgoing, incoming }
3279    }
3280}
3281
3282impl<OB, IB, R: Role> ConnectTo<R> for ByteStreams<OB, IB>
3283where
3284    OB: AsyncWrite + Send + 'static,
3285    IB: AsyncRead + Send + 'static,
3286{
3287    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3288        let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3289        match futures::future::select(pin!(client.connect_to(channel)), serve_self).await {
3290            Either::Left((result, _)) | Either::Right((result, _)) => result,
3291        }
3292    }
3293
3294    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3295        use futures::AsyncBufReadExt;
3296        use futures::AsyncWriteExt;
3297        use futures::io::BufReader;
3298        let Self { outgoing, incoming } = self;
3299
3300        // Convert byte streams to line streams
3301        // Box both streams to satisfy Unpin requirements
3302        let incoming_lines = Box::pin(BufReader::new(incoming).lines());
3303
3304        // Create a sink that writes lines (with newlines) to the outgoing byte stream
3305        // We need to Box the writer since it may not be Unpin
3306        let outgoing_sink =
3307            futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
3308                let mut bytes = line.into_bytes();
3309                bytes.push(b'\n');
3310                writer.write_all(&bytes).await?;
3311                Ok::<_, std::io::Error>(writer)
3312            });
3313
3314        // Delegate to Lines component
3315        ConnectTo::<R>::into_channel_and_future(Lines::new(outgoing_sink, incoming_lines))
3316    }
3317}
3318
3319/// A channel endpoint representing one side of a bidirectional message channel.
3320///
3321/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
3322/// Each endpoint has:
3323/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
3324/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
3325///
3326/// # Example
3327///
3328/// ```no_run
3329/// # use agent_client_protocol::UntypedRole;
3330/// # use agent_client_protocol::{Channel, Builder};
3331/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3332/// // Create a pair of connected channels
3333/// let (channel_a, channel_b) = Channel::duplex();
3334///
3335/// // Each channel can be used by a different component
3336/// UntypedRole.builder()
3337///     .name("connection-a")
3338///     .connect_to(channel_a)
3339///     .await?;
3340/// # Ok(())
3341/// # }
3342/// ```
3343#[derive(Debug)]
3344pub struct Channel {
3345    /// Receives messages (or errors) from the counterpart.
3346    pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
3347    /// Sends messages (or errors) to the counterpart.
3348    pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
3349}
3350
3351impl Channel {
3352    /// Create a pair of connected channel endpoints.
3353    ///
3354    /// Returns two `Channel` instances that are connected to each other:
3355    /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
3356    /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
3357    ///
3358    /// # Returns
3359    ///
3360    /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
3361    #[must_use]
3362    pub fn duplex() -> (Self, Self) {
3363        // Create channels: A sends Result<Message> which B receives as Message
3364        let (a_tx, b_rx) = mpsc::unbounded();
3365        let (b_tx, a_rx) = mpsc::unbounded();
3366
3367        let channel_a = Self { rx: a_rx, tx: a_tx };
3368        let channel_b = Self { rx: b_rx, tx: b_tx };
3369
3370        (channel_a, channel_b)
3371    }
3372
3373    /// Copy messages from `rx` to `tx`.
3374    ///
3375    /// # Returns
3376    ///
3377    /// A `Result` indicating success or failure.
3378    pub async fn copy(mut self) -> Result<(), crate::Error> {
3379        while let Some(msg) = self.rx.next().await {
3380            self.tx
3381                .unbounded_send(msg)
3382                .map_err(crate::util::internal_error)?;
3383        }
3384        Ok(())
3385    }
3386}
3387
3388impl<R: Role> ConnectTo<R> for Channel {
3389    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3390        let (client_channel, client_serve) = client.into_channel_and_future();
3391
3392        match futures::try_join!(
3393            Channel {
3394                rx: client_channel.rx,
3395                tx: self.tx
3396            }
3397            .copy(),
3398            Channel {
3399                rx: self.rx,
3400                tx: client_channel.tx
3401            }
3402            .copy(),
3403            client_serve
3404        ) {
3405            Ok(((), (), ())) => Ok(()),
3406            Err(err) => Err(err),
3407        }
3408    }
3409
3410    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3411        (self, Box::pin(future::ready(Ok(()))))
3412    }
3413}