Skip to main content

agent_client_protocol/
jsonrpc.rs

1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::SessionId;
4// Re-export jsonrpcmsg for use in public API
5pub use jsonrpcmsg;
6
7// Types re-exported from crate root
8use serde::{Deserialize, Serialize};
9use std::fmt::Debug;
10use std::panic::Location;
11use std::pin::pin;
12use uuid::Uuid;
13
14use futures::channel::{mpsc, oneshot};
15use futures::future::{self, BoxFuture, Either};
16use futures::{AsyncRead, AsyncWrite, StreamExt};
17
18mod dynamic_handler;
19pub(crate) mod handlers;
20mod incoming_actor;
21mod outgoing_actor;
22pub(crate) mod run;
23mod task_actor;
24mod transport_actor;
25
26use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
27pub use crate::jsonrpc::handlers::NullHandler;
28use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
29use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
30use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
31use crate::jsonrpc::run::SpawnedRun;
32use crate::jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo};
33use crate::jsonrpc::task_actor::{Task, TaskTx};
34use crate::mcp_server::McpServer;
35use crate::role::HasPeer;
36use crate::role::Role;
37use crate::util::json_cast;
38use crate::{Agent, Client, ConnectTo, RoleId};
39
40/// Handlers process incoming JSON-RPC messages on a connection.
41///
42/// When messages arrive, they flow through a chain of handlers. Each handler can
43/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
44///
45/// # Message Flow
46///
47/// Messages flow through three layers of handlers in order:
48///
49/// ```text
50/// ┌─────────────────────────────────────────────────────────────────┐
51/// │                     Incoming Message                            │
52/// └─────────────────────────────────────────────────────────────────┘
53///                              │
54///                              ▼
55/// ┌─────────────────────────────────────────────────────────────────┐
56/// │  1. User Handlers (registered via on_receive_request, etc.)     │
57/// │     - Tried in registration order                               │
58/// │     - First handler to return Handled::Yes claims the message   │
59/// └─────────────────────────────────────────────────────────────────┘
60///                              │ Handled::No
61///                              ▼
62/// ┌─────────────────────────────────────────────────────────────────┐
63/// │  2. Dynamic Handlers (added at runtime)                         │
64/// │     - Used for session-specific message handling                │
65/// │     - Added via ConnectionTo::add_dynamic_handler             │
66/// └─────────────────────────────────────────────────────────────────┘
67///                              │ Handled::No
68///                              ▼
69/// ┌─────────────────────────────────────────────────────────────────┐
70/// │  3. Role Default Handler                                        │
71/// │     - Fallback based on the connection's Role                   │
72/// │     - Handles protocol-level messages (e.g., proxy forwarding)  │
73/// └─────────────────────────────────────────────────────────────────┘
74///                              │ Handled::No
75///                              ▼
76/// ┌─────────────────────────────────────────────────────────────────┐
77/// │  Unhandled: Error response sent (or queued if retry=true)       │
78/// └─────────────────────────────────────────────────────────────────┘
79/// ```
80///
81/// # The `Handled` Return Value
82///
83/// Each handler returns [`Handled`] to indicate whether it processed the message:
84///
85/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
86/// - **`Handled::No { message, retry }`** - Message was not handled. The message
87///   (possibly modified) is passed to the next handler in the chain.
88///
89/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
90///
91/// # The Retry Mechanism
92///
93/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
94///
95/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
96/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
97///
98/// This mechanism exists because of a timing issue with sessions: when a `session/new`
99/// response is being processed, the dynamic handler for that session hasn't been registered
100/// yet, but `session/update` notifications for that session may already be arriving.
101/// By setting `retry: true`, these early notifications are queued until the session's
102/// dynamic handler is added.
103///
104/// # Handler Registration
105///
106/// Most users register handlers using the builder methods on [`Builder`]:
107///
108/// ```
109/// # use agent_client_protocol::{Agent, Client, ConnectTo};
110/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, AgentCapabilities};
111/// # use agent_client_protocol_test::StatusUpdate;
112/// # async fn example(transport: impl ConnectTo<Agent>) -> Result<(), agent_client_protocol::Error> {
113/// Agent.builder()
114///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
115///         responder.respond(
116///             InitializeResponse::new(req.protocol_version)
117///                 .agent_capabilities(AgentCapabilities::new()),
118///         )
119///     }, agent_client_protocol::on_receive_request!())
120///     .on_receive_notification(async |notif: StatusUpdate, cx| {
121///         // Process notification
122///         Ok(())
123///     }, agent_client_protocol::on_receive_notification!())
124///     .connect_to(transport)
125///     .await?;
126/// # Ok(())
127/// # }
128/// ```
129///
130/// The type parameter on the closure determines which messages are dispatched to it.
131/// Messages that don't match the type are automatically passed to the next handler.
132///
133/// # Implementing Custom Handlers
134///
/// For advanced use cases, you can implement [`HandleDispatchFrom`] directly:
///
/// ```ignore
/// struct MyHandler;
///
/// impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for MyHandler {
///     async fn handle_dispatch_from(
///         &mut self,
///         message: Dispatch,
///         connection: ConnectionTo<Counterpart>,
///     ) -> Result<Handled<Dispatch>, Error> {
///         if message.method() == "my/custom/method" {
///             // Handle it
///             Ok(Handled::Yes)
///         } else {
///             // Pass to next handler
///             Ok(Handled::No { message, retry: false })
///         }
///     }
///
///     fn describe_chain(&self) -> impl std::fmt::Debug {
///         "MyHandler"
///     }
/// }
/// ```
161///
162/// # Important: Handlers Must Not Block
163///
164/// The connection processes messages on a single async task. While a handler is running,
165/// no other messages can be processed. For expensive operations, use [`ConnectionTo::spawn`]
166/// to run work concurrently:
167///
168/// ```
169/// # use agent_client_protocol::{Client, Agent, ConnectTo};
170/// # use agent_client_protocol_test::{expensive_operation, ProcessComplete};
171/// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
172/// # Client.builder().connect_with(transport, async |cx| {
173/// cx.spawn({
174///     let connection = cx.clone();
175///     async move {
176///         let result = expensive_operation("data").await?;
177///         connection.send_notification(ProcessComplete { result })?;
178///         Ok(())
179///     }
180/// })?;
181/// # Ok(())
182/// # }).await?;
183/// # Ok(())
184/// # }
185/// ```
186#[allow(async_fn_in_trait)]
187/// A handler for incoming JSON-RPC messages.
188///
189/// This trait is implemented by types that can process incoming messages on a connection.
190/// Handlers are registered with a [`Builder`] and are called in order until
191/// one claims the message.
192///
193/// The type parameter `R` is the role this handler plays - who I am.
194/// For an agent handler, `R = Agent` (I handle messages as an agent).
195/// For a client handler, `R = Client` (I handle messages as a client).
196pub trait HandleDispatchFrom<Counterpart: Role>: Send {
197    /// Attempt to claim an incoming message (request or notification).
198    ///
199    /// # Important: do not block
200    ///
201    /// The server will not process new messages until this handler returns.
202    /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
203    /// The recommended approach to manage expensive operations is to the [`ConnectionTo::spawn`] method available on the message context.
204    ///
205    /// # Parameters
206    ///
207    /// * `message` - The incoming message to handle.
208    /// * `connection` - The connection, used to send messages and access connection state.
209    ///
210    /// # Returns
211    ///
212    /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
213    /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
214    /// * `Err` if an internal error occurs (this will bring down the server).
215    fn handle_dispatch_from(
216        &mut self,
217        message: Dispatch,
218        connection: ConnectionTo<Counterpart>,
219    ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send;
220
221    /// Returns a debug description of the registered handlers for diagnostics.
222    fn describe_chain(&self) -> impl std::fmt::Debug;
223}
224
225impl<Counterpart: Role, H> HandleDispatchFrom<Counterpart> for &mut H
226where
227    H: HandleDispatchFrom<Counterpart>,
228{
229    fn handle_dispatch_from(
230        &mut self,
231        message: Dispatch,
232        cx: ConnectionTo<Counterpart>,
233    ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send {
234        H::handle_dispatch_from(self, message, cx)
235    }
236
237    fn describe_chain(&self) -> impl std::fmt::Debug {
238        H::describe_chain(self)
239    }
240}
241
242/// A JSON-RPC connection that can act as either a server, client, or both.
243///
244/// [`Builder`] provides a builder-style API for creating JSON-RPC servers and clients.
245/// You start by calling `Role.builder()` (e.g., `Client.builder()`), then add message
246/// handlers, and finally drive the connection with either [`connect_to`](Builder::connect_to)
247/// or [`connect_with`](Builder::connect_with), providing a component implementation
248/// (e.g., [`ByteStreams`] for byte streams).
249///
250/// # JSON-RPC Primer
251///
252/// JSON-RPC 2.0 has two fundamental message types:
253///
254/// * **Requests** - Messages that expect a response. They have an `id` field that gets
255///   echoed back in the response so the sender can correlate them.
256/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
257///   expect or receive a response.
258///
259/// # Type-Driven Message Dispatch
260///
261/// The handler registration methods use Rust's type system to determine which messages
262/// to handle. The type parameter you provide controls what gets dispatched to your handler:
263///
264/// ## Single Message Types
265///
266/// The simplest case - handle one specific message type:
267///
268/// ```no_run
269/// # use agent_client_protocol_test::*;
270/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
271/// # async fn example() -> Result<(), agent_client_protocol::Error> {
272/// # let connection = mock_connection();
273/// connection
274///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
275///         // Handle only InitializeRequest messages
276///         responder.respond(InitializeResponse::make())
277///     }, agent_client_protocol::on_receive_request!())
278///     .on_receive_notification(async |notif: SessionNotification, cx| {
279///         // Handle only SessionUpdate notifications
280///         Ok(())
281///     }, agent_client_protocol::on_receive_notification!())
282/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
283/// # Ok(())
284/// # }
285/// ```
286///
287/// ## Enum Message Types
288///
289/// You can also handle multiple related messages with a single handler by defining an enum
290/// that implements the appropriate trait ([`JsonRpcRequest`] or [`JsonRpcNotification`]):
291///
292/// ```no_run
293/// # use agent_client_protocol_test::*;
294/// # use agent_client_protocol::{JsonRpcRequest, JsonRpcMessage, UntypedMessage};
295/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
296/// # async fn example() -> Result<(), agent_client_protocol::Error> {
297/// # let connection = mock_connection();
298/// // Define an enum for multiple request types
299/// #[derive(Debug, Clone)]
300/// enum MyRequests {
301///     Initialize(InitializeRequest),
302///     Prompt(PromptRequest),
303/// }
304///
305/// // Implement JsonRpcRequest for your enum
306/// # impl JsonRpcMessage for MyRequests {
307/// #     fn matches_method(_method: &str) -> bool { false }
308/// #     fn method(&self) -> &str { "myRequests" }
309/// #     fn to_untyped_message(&self) -> Result<UntypedMessage, agent_client_protocol::Error> { todo!() }
310/// #     fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Result<Self, agent_client_protocol::Error> { Err(agent_client_protocol::Error::method_not_found()) }
311/// # }
312/// impl JsonRpcRequest for MyRequests { type Response = serde_json::Value; }
313///
314/// // Handle all variants in one place
315/// connection.on_receive_request(async |req: MyRequests, responder, cx| {
316///     match req {
317///         MyRequests::Initialize(init) => { responder.respond(serde_json::json!({})) }
318///         MyRequests::Prompt(prompt) => { responder.respond(serde_json::json!({})) }
319///     }
320/// }, agent_client_protocol::on_receive_request!())
321/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
322/// # Ok(())
323/// # }
324/// ```
325///
326/// ## Mixed Message Types
327///
328/// For enums containing both requests AND notifications, use [`on_receive_dispatch`](Self::on_receive_dispatch):
329///
330/// ```no_run
331/// # use agent_client_protocol_test::*;
332/// # use agent_client_protocol::Dispatch;
333/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
334/// # async fn example() -> Result<(), agent_client_protocol::Error> {
335/// # let connection = mock_connection();
336/// // on_receive_dispatch receives Dispatch which can be either a request or notification
337/// connection.on_receive_dispatch(async |msg: Dispatch<InitializeRequest, SessionNotification>, _cx| {
338///     match msg {
339///         Dispatch::Request(req, responder) => {
340///             responder.respond(InitializeResponse::make())
341///         }
342///         Dispatch::Notification(notif) => {
343///             Ok(())
344///         }
345///         Dispatch::Response(result, router) => {
346///             // Forward response to its destination
347///             router.respond_with_result(result)
348///         }
349///     }
350/// }, agent_client_protocol::on_receive_dispatch!())
351/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
352/// # Ok(())
353/// # }
354/// ```
355///
356/// # Handler Registration
357///
358/// Register handlers using these methods (listed from most common to most flexible):
359///
360/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
361/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
362/// * [`on_receive_dispatch`](Self::on_receive_dispatch) - Handle enums containing both requests and notifications
363/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
364///
365/// ## Handler Ordering
366///
367/// Handlers are tried in the order you register them. The first handler that claims a message
368/// (by matching its type) will process it. Subsequent handlers won't see that message:
369///
370/// ```no_run
371/// # use agent_client_protocol_test::*;
372/// # use agent_client_protocol::{Dispatch, UntypedMessage};
373/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
374/// # async fn example() -> Result<(), agent_client_protocol::Error> {
375/// # let connection = mock_connection();
376/// connection
377///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
378///         // This runs first for InitializeRequest
379///         responder.respond(InitializeResponse::make())
380///     }, agent_client_protocol::on_receive_request!())
381///     .on_receive_request(async |req: PromptRequest, responder, cx| {
382///         // This runs first for PromptRequest
383///         responder.respond(PromptResponse::make())
384///     }, agent_client_protocol::on_receive_request!())
385///     .on_receive_dispatch(async |msg: Dispatch, cx| {
386///         // This runs for any message not handled above
387///         msg.respond_with_error(agent_client_protocol::util::internal_error("unknown method"), cx)
388///     }, agent_client_protocol::on_receive_dispatch!())
389/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
390/// # Ok(())
391/// # }
392/// ```
393///
394/// # Event Loop and Concurrency
395///
396/// Understanding the event loop is critical for writing correct handlers.
397///
398/// ## The Event Loop
399///
400/// [`Builder`] runs all handler callbacks on a single async task - the event loop.
401/// While a handler is running, **the server cannot receive new messages**. This means
402/// any blocking or expensive work in your handlers will stall the entire connection.
403///
404/// To avoid blocking the event loop, use [`ConnectionTo::spawn`] to offload serious
405/// work to concurrent tasks:
406///
407/// ```no_run
408/// # use agent_client_protocol_test::*;
409/// # async fn example() -> Result<(), agent_client_protocol::Error> {
410/// # let connection = mock_connection();
411/// connection.on_receive_request(async |req: AnalyzeRequest, responder, cx| {
412///     // Clone cx for the spawned task
413///     cx.spawn({
414///         let connection = cx.clone();
415///         async move {
416///             let result = expensive_analysis(&req.data).await?;
417///             connection.send_notification(AnalysisComplete { result })?;
418///             Ok(())
419///         }
420///     })?;
421///
422///     // Respond immediately without blocking
423///     responder.respond(AnalysisStarted { job_id: 42 })
424/// }, agent_client_protocol::on_receive_request!())
425/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
426/// # Ok(())
427/// # }
428/// ```
429///
430/// Note that the entire connection runs within one async task, so parallelism must be
431/// managed explicitly using [`spawn`](ConnectionTo::spawn).
432///
433/// ## The Connection Context
434///
435/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
436///
437/// * **For request handlers** - [`Responder<R>`] provides [`respond`](Responder::respond)
438///   to send the response, plus methods to send other messages
439/// * **For notification handlers** - [`ConnectionTo`] provides methods to send messages
440///   and spawn tasks
441///
442/// Both context types support:
443/// * [`send_request`](ConnectionTo::send_request) - Send requests to the other side
444/// * [`send_notification`](ConnectionTo::send_notification) - Send notifications
445/// * [`spawn`](ConnectionTo::spawn) - Run tasks concurrently without blocking the event loop
446///
447/// The [`SentRequest`] returned by `send_request` provides methods like
448/// [`on_receiving_result`](SentRequest::on_receiving_result) that help you
449/// avoid accidentally blocking the event loop while waiting for responses.
450///
451/// # Driving the Connection
452///
453/// After adding handlers, you must drive the connection using one of two modes:
454///
455/// ## Server Mode: `connect_to()`
456///
457/// Use [`connect_to`](Self::connect_to) when you only need to respond to incoming messages:
458///
459/// ```no_run
460/// # use agent_client_protocol_test::*;
461/// # async fn example() -> Result<(), agent_client_protocol::Error> {
462/// # let connection = mock_connection();
463/// connection
464///     .on_receive_request(async |req: MyRequest, responder, cx| {
465///         responder.respond(MyResponse { status: "ok".into() })
466///     }, agent_client_protocol::on_receive_request!())
467///     .connect_to(MockTransport)  // Runs until connection closes or error occurs
468///     .await?;
469/// # Ok(())
470/// # }
471/// ```
472///
473/// The connection will process incoming messages and invoke your handlers until the
474/// connection is closed or an error occurs.
475///
476/// ## Client Mode: `connect_with()`
477///
478/// Use [`connect_with`](Self::connect_with) when you need to both handle incoming messages
479/// AND send your own requests/notifications:
480///
481/// ```no_run
482/// # use agent_client_protocol_test::*;
483/// # use agent_client_protocol::schema::InitializeRequest;
484/// # async fn example() -> Result<(), agent_client_protocol::Error> {
485/// # let connection = mock_connection();
486/// connection
487///     .on_receive_request(async |req: MyRequest, responder, cx| {
488///         responder.respond(MyResponse { status: "ok".into() })
489///     }, agent_client_protocol::on_receive_request!())
490///     .connect_with(MockTransport, async |cx| {
491///         // You can send requests to the other side
492///         let response = cx.send_request(InitializeRequest::make())
493///             .block_task()
494///             .await?;
495///
496///         // And send notifications
497///         cx.send_notification(StatusUpdate { message: "ready".into() })?;
498///
499///         Ok(())
500///     })
501///     .await?;
502/// # Ok(())
503/// # }
504/// ```
505///
506/// The connection will serve incoming messages in the background while your client closure
507/// runs. When the closure returns, the connection shuts down.
508///
509/// # Example: Complete Agent
510///
511/// ```no_run
512/// # use agent_client_protocol::UntypedRole;
513/// # use agent_client_protocol::{Builder};
514/// # use agent_client_protocol::ByteStreams;
515/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
516/// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
517/// # async fn example() -> Result<(), agent_client_protocol::Error> {
518/// let transport = ByteStreams::new(
519///     tokio::io::stdout().compat_write(),
520///     tokio::io::stdin().compat(),
521/// );
522///
523/// UntypedRole.builder()
524///     .name("my-agent")  // Optional: for debugging logs
525///     .on_receive_request(async |init: InitializeRequest, responder, cx| {
526///         let response: InitializeResponse = todo!();
527///         responder.respond(response)
528///     }, agent_client_protocol::on_receive_request!())
529///     .on_receive_request(async |prompt: PromptRequest, responder, cx| {
530///         // You can send notifications while processing a request
531///         let notif: SessionNotification = todo!();
532///         cx.send_notification(notif)?;
533///
534///         // Then respond to the request
535///         let response: PromptResponse = todo!();
536///         responder.respond(response)
537///     }, agent_client_protocol::on_receive_request!())
538///     .connect_to(transport)
539///     .await?;
540/// # Ok(())
541/// # }
542/// ```
543#[must_use]
544#[derive(Debug)]
545pub struct Builder<Host: Role, Handler = NullHandler, Runner = NullRun>
546where
547    Handler: HandleDispatchFrom<Host::Counterpart>,
548    Runner: RunWithConnectionTo<Host::Counterpart>,
549{
550    /// My role.
551    host: Host,
552
553    /// Name of the connection, used in tracing logs.
554    name: Option<String>,
555
556    /// Handler for incoming messages.
557    handler: Handler,
558
559    /// Responder for background tasks.
560    responder: Runner,
561}
562
563impl<Host: Role> Builder<Host, NullHandler, NullRun> {
564    /// Create a new connection builder for the given role.
565    /// This type follows a builder pattern; use other methods to configure and then invoke
566    /// [`Self::connect_to`] (to use as a server) or [`Self::connect_with`] to use as a client.
567    pub fn new(role: Host) -> Self {
568        Self {
569            host: role,
570            name: None,
571            handler: NullHandler,
572            responder: NullRun,
573        }
574    }
575}
576
577impl<Host: Role, Handler> Builder<Host, Handler, NullRun>
578where
579    Handler: HandleDispatchFrom<Host::Counterpart>,
580{
581    /// Create a new connection builder with the given handler.
582    pub fn new_with(role: Host, handler: Handler) -> Self {
583        Self {
584            host: role,
585            name: None,
586            handler,
587            responder: NullRun,
588        }
589    }
590}
591
592impl<
593    Host: Role,
594    Handler: HandleDispatchFrom<Host::Counterpart>,
595    Runner: RunWithConnectionTo<Host::Counterpart>,
596> Builder<Host, Handler, Runner>
597{
598    /// Set the "name" of this connection -- used only for debugging logs.
599    pub fn name(mut self, name: impl ToString) -> Self {
600        self.name = Some(name.to_string());
601        self
602    }
603
604    /// Merge another [`Builder`] into this one.
605    ///
606    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
607    /// This is a low-level method that is not intended for general use.
608    pub fn with_connection_builder(
609        self,
610        other: Builder<
611            Host,
612            impl HandleDispatchFrom<Host::Counterpart>,
613            impl RunWithConnectionTo<Host::Counterpart>,
614        >,
615    ) -> Builder<
616        Host,
617        impl HandleDispatchFrom<Host::Counterpart>,
618        impl RunWithConnectionTo<Host::Counterpart>,
619    > {
620        Builder {
621            host: self.host,
622            name: self.name,
623            handler: ChainedHandler::new(
624                self.handler,
625                NamedHandler::new(other.name, other.handler),
626            ),
627            responder: ChainRun::new(self.responder, other.responder),
628        }
629    }
630
631    /// Add a new [`HandleDispatchFrom`] to the chain.
632    ///
633    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
634    /// This is a low-level method that is not intended for general use.
635    pub fn with_handler(
636        self,
637        handler: impl HandleDispatchFrom<Host::Counterpart>,
638    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner> {
639        Builder {
640            host: self.host,
641            name: self.name,
642            handler: ChainedHandler::new(self.handler, handler),
643            responder: self.responder,
644        }
645    }
646
647    /// Add a new [`RunWithConnectionTo`] to the chain.
648    pub fn with_responder<Run1>(
649        self,
650        responder: Run1,
651    ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
652    where
653        Run1: RunWithConnectionTo<Host::Counterpart>,
654    {
655        Builder {
656            host: self.host,
657            name: self.name,
658            handler: self.handler,
659            responder: ChainRun::new(self.responder, responder),
660        }
661    }
662
663    /// Enqueue a task to run once the connection is actively serving traffic.
664    #[track_caller]
665    pub fn with_spawned<F, Fut>(
666        self,
667        task: F,
668    ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
669    where
670        F: FnOnce(ConnectionTo<Host::Counterpart>) -> Fut + Send,
671        Fut: Future<Output = Result<(), crate::Error>> + Send,
672    {
673        let location = Location::caller();
674        self.with_responder(SpawnedRun::new(location, task))
675    }
676
677    /// Register a handler for messages that can be either requests OR notifications.
678    ///
679    /// Use this when you want to handle an enum type that contains both request and
680    /// notification variants. Your handler receives a [`Dispatch<Req, Notif>`] which
681    /// is an enum with two variants:
682    ///
683    /// - `Dispatch::Request(request, responder)` - A request with its response context
684    /// - `Dispatch::Notification(notification)` - A notification
685    /// - `Dispatch::Response(result, router)` - A response to a request we sent
686    ///
687    /// # Example
688    ///
689    /// ```no_run
690    /// # use agent_client_protocol_test::*;
691    /// # use agent_client_protocol::Dispatch;
692    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
693    /// # let connection = mock_connection();
694    /// connection.on_receive_dispatch(async |message: Dispatch<MyRequest, StatusUpdate>, _cx| {
695    ///     match message {
696    ///         Dispatch::Request(req, responder) => {
697    ///             // Handle request and send response
698    ///             responder.respond(MyResponse { status: "ok".into() })
699    ///         }
700    ///         Dispatch::Notification(notif) => {
701    ///             // Handle notification (no response needed)
702    ///             Ok(())
703    ///         }
704    ///         Dispatch::Response(result, router) => {
705    ///             // Forward response to its destination
706    ///             router.respond_with_result(result)
707    ///         }
708    ///     }
709    /// }, agent_client_protocol::on_receive_dispatch!())
710    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
711    /// # Ok(())
712    /// # }
713    /// ```
714    ///
715    /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
716    /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
717    /// for handling requests or notifications separately.
718    ///
719    /// # Ordering
720    ///
721    /// This callback runs inside the dispatch loop and blocks further message processing
722    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
723    /// ordering guarantees and how to avoid deadlocks.
724    pub fn on_receive_dispatch<Req, Notif, F, T, ToFut>(
725        self,
726        op: F,
727        to_future_hack: ToFut,
728    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
729    where
730        Host::Counterpart: HasPeer<Host::Counterpart>,
731        Req: JsonRpcRequest,
732        Notif: JsonRpcNotification,
733        F: AsyncFnMut(
734                Dispatch<Req, Notif>,
735                ConnectionTo<Host::Counterpart>,
736            ) -> Result<T, crate::Error>
737            + Send,
738        T: IntoHandled<Dispatch<Req, Notif>>,
739        ToFut: Fn(
740                &mut F,
741                Dispatch<Req, Notif>,
742                ConnectionTo<Host::Counterpart>,
743            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
744            + Send
745            + Sync,
746    {
747        let handler = MessageHandler::new(
748            self.host.counterpart(),
749            self.host.counterpart(),
750            op,
751            to_future_hack,
752        );
753        self.with_handler(handler)
754    }
755
756    /// Register a handler for JSON-RPC requests of type `Req`.
757    ///
758    /// Your handler receives two arguments:
759    /// 1. The request (type `Req`)
760    /// 2. A [`Responder<R, Req::Response>`] for sending the response
761    ///
762    /// The request context allows you to:
763    /// - Send the response with [`Responder::respond`]
764    /// - Send notifications to the client with [`ConnectionTo::send_notification`]
765    /// - Send requests to the client with [`ConnectionTo::send_request`]
766    ///
767    /// # Example
768    ///
769    /// ```ignore
770    /// # use agent_client_protocol::UntypedRole;
771    /// # use agent_client_protocol::{Builder};
772    /// # use agent_client_protocol::schema::{PromptRequest, PromptResponse, SessionNotification};
773    /// # fn example<R: agent_client_protocol::Role>(connection: Builder<R, impl agent_client_protocol::HandleMessageAs<R>>) {
774    /// connection.on_receive_request(async |request: PromptRequest, responder, cx| {
775    ///     // Send a notification while processing
776    ///     let notif: SessionNotification = todo!();
777    ///     cx.send_notification(notif)?;
778    ///
779    ///     // Do some work...
780    ///     let result = todo!("process the prompt");
781    ///
782    ///     // Send the response
783    ///     let response: PromptResponse = todo!();
784    ///     responder.respond(response)
785    /// }, agent_client_protocol::on_receive_request!());
786    /// # }
787    /// ```
788    ///
789    /// # Type Parameter
790    ///
791    /// `Req` can be either a single request type or an enum of multiple request types.
792    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
793    ///
794    /// # Ordering
795    ///
796    /// This callback runs inside the dispatch loop and blocks further message processing
797    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
798    /// ordering guarantees and how to avoid deadlocks.
799    pub fn on_receive_request<Req: JsonRpcRequest, F, T, ToFut>(
800        self,
801        op: F,
802        to_future_hack: ToFut,
803    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
804    where
805        Host::Counterpart: HasPeer<Host::Counterpart>,
806        F: AsyncFnMut(
807                Req,
808                Responder<Req::Response>,
809                ConnectionTo<Host::Counterpart>,
810            ) -> Result<T, crate::Error>
811            + Send,
812        T: IntoHandled<(Req, Responder<Req::Response>)>,
813        ToFut: Fn(
814                &mut F,
815                Req,
816                Responder<Req::Response>,
817                ConnectionTo<Host::Counterpart>,
818            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
819            + Send
820            + Sync,
821    {
822        let handler = RequestHandler::new(
823            self.host.counterpart(),
824            self.host.counterpart(),
825            op,
826            to_future_hack,
827        );
828        self.with_handler(handler)
829    }
830
831    /// Register a handler for JSON-RPC notifications of type `Notif`.
832    ///
833    /// Notifications are fire-and-forget messages that don't expect a response.
834    /// Your handler receives:
835    /// 1. The notification (type `Notif`)
836    /// 2. A [`ConnectionTo<R>`] for sending messages to the other side
837    ///
838    /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
839    /// but you can still send your own requests and notifications using the context.
840    ///
841    /// # Example
842    ///
843    /// ```no_run
844    /// # use agent_client_protocol_test::*;
845    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
846    /// # let connection = mock_connection();
847    /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
848    ///     // Process the notification
849    ///     update_session_state(&notif)?;
850    ///
851    ///     // Optionally send a notification back
852    ///     cx.send_notification(StatusUpdate {
853    ///         message: "Acknowledged".into(),
854    ///     })?;
855    ///
856    ///     Ok(())
857    /// }, agent_client_protocol::on_receive_notification!())
858    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
859    /// # Ok(())
860    /// # }
861    /// ```
862    ///
863    /// # Type Parameter
864    ///
865    /// `Notif` can be either a single notification type or an enum of multiple notification types.
866    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
867    ///
868    /// # Ordering
869    ///
870    /// This callback runs inside the dispatch loop and blocks further message processing
871    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
872    /// ordering guarantees and how to avoid deadlocks.
873    pub fn on_receive_notification<Notif, F, T, ToFut>(
874        self,
875        op: F,
876        to_future_hack: ToFut,
877    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
878    where
879        Host::Counterpart: HasPeer<Host::Counterpart>,
880        Notif: JsonRpcNotification,
881        F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
882        T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
883        ToFut: Fn(
884                &mut F,
885                Notif,
886                ConnectionTo<Host::Counterpart>,
887            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
888            + Send
889            + Sync,
890    {
891        let handler = NotificationHandler::new(
892            self.host.counterpart(),
893            self.host.counterpart(),
894            op,
895            to_future_hack,
896        );
897        self.with_handler(handler)
898    }
899
900    /// Register a handler for messages from a specific peer.
901    ///
902    /// This is similar to [`on_receive_dispatch`](Self::on_receive_dispatch), but allows
903    /// specifying the source peer explicitly. This is useful when receiving messages
904    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
905    /// envelopes when receiving from an agent via a proxy).
906    ///
907    /// For the common case of receiving from the default counterpart, use
908    /// [`on_receive_dispatch`](Self::on_receive_dispatch) instead.
909    ///
910    /// # Ordering
911    ///
912    /// This callback runs inside the dispatch loop and blocks further message processing
913    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
914    /// ordering guarantees and how to avoid deadlocks.
915    pub fn on_receive_dispatch_from<
916        Req: JsonRpcRequest,
917        Notif: JsonRpcNotification,
918        Peer: Role,
919        F,
920        T,
921        ToFut,
922    >(
923        self,
924        peer: Peer,
925        op: F,
926        to_future_hack: ToFut,
927    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
928    where
929        Host::Counterpart: HasPeer<Peer>,
930        F: AsyncFnMut(
931                Dispatch<Req, Notif>,
932                ConnectionTo<Host::Counterpart>,
933            ) -> Result<T, crate::Error>
934            + Send,
935        T: IntoHandled<Dispatch<Req, Notif>>,
936        ToFut: Fn(
937                &mut F,
938                Dispatch<Req, Notif>,
939                ConnectionTo<Host::Counterpart>,
940            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
941            + Send
942            + Sync,
943    {
944        let handler = MessageHandler::new(self.host.counterpart(), peer, op, to_future_hack);
945        self.with_handler(handler)
946    }
947
948    /// Register a handler for JSON-RPC requests from a specific peer.
949    ///
950    /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
951    /// specifying the source peer explicitly. This is useful when receiving messages
952    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
953    /// envelopes when receiving from an agent via a proxy).
954    ///
955    /// For the common case of receiving from the default counterpart, use
956    /// [`on_receive_request`](Self::on_receive_request) instead.
957    ///
958    /// # Example
959    ///
960    /// ```ignore
961    /// use agent_client_protocol::Agent;
962    /// use agent_client_protocol::schema::InitializeRequest;
963    ///
964    /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
965    /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, responder, cx| {
966    ///     // Handle the request
967    ///     responder.respond(InitializeResponse::make())
968    /// })
969    /// ```
970    ///
971    /// # Ordering
972    ///
973    /// This callback runs inside the dispatch loop and blocks further message processing
974    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
975    /// ordering guarantees and how to avoid deadlocks.
976    pub fn on_receive_request_from<Req: JsonRpcRequest, Peer: Role, F, T, ToFut>(
977        self,
978        peer: Peer,
979        op: F,
980        to_future_hack: ToFut,
981    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
982    where
983        Host::Counterpart: HasPeer<Peer>,
984        F: AsyncFnMut(
985                Req,
986                Responder<Req::Response>,
987                ConnectionTo<Host::Counterpart>,
988            ) -> Result<T, crate::Error>
989            + Send,
990        T: IntoHandled<(Req, Responder<Req::Response>)>,
991        ToFut: Fn(
992                &mut F,
993                Req,
994                Responder<Req::Response>,
995                ConnectionTo<Host::Counterpart>,
996            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
997            + Send
998            + Sync,
999    {
1000        let handler = RequestHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1001        self.with_handler(handler)
1002    }
1003
1004    /// Register a handler for JSON-RPC notifications from a specific peer.
1005    ///
1006    /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
1007    /// specifying the source peer explicitly. This is useful when receiving messages
1008    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
1009    /// envelopes when receiving from an agent via a proxy).
1010    ///
1011    /// For the common case of receiving from the default counterpart, use
1012    /// [`on_receive_notification`](Self::on_receive_notification) instead.
1013    ///
1014    /// # Ordering
1015    ///
1016    /// This callback runs inside the dispatch loop and blocks further message processing
1017    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1018    /// ordering guarantees and how to avoid deadlocks.
1019    pub fn on_receive_notification_from<Notif: JsonRpcNotification, Peer: Role, F, T, ToFut>(
1020        self,
1021        peer: Peer,
1022        op: F,
1023        to_future_hack: ToFut,
1024    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1025    where
1026        Host::Counterpart: HasPeer<Peer>,
1027        F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1028        T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1029        ToFut: Fn(
1030                &mut F,
1031                Notif,
1032                ConnectionTo<Host::Counterpart>,
1033            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1034            + Send
1035            + Sync,
1036    {
1037        let handler = NotificationHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1038        self.with_handler(handler)
1039    }
1040
1041    /// Add an MCP server that will be added to all new sessions that are proxied through this connection.
1042    ///
1043    /// Only applicable to proxies.
1044    pub fn with_mcp_server(
1045        self,
1046        mcp_server: McpServer<Host::Counterpart, impl RunWithConnectionTo<Host::Counterpart>>,
1047    ) -> Builder<
1048        Host,
1049        impl HandleDispatchFrom<Host::Counterpart>,
1050        impl RunWithConnectionTo<Host::Counterpart>,
1051    >
1052    where
1053        Host::Counterpart: HasPeer<Agent> + HasPeer<Client>,
1054    {
1055        let (handler, responder) = mcp_server.into_handler_and_responder();
1056        self.with_handler(handler).with_responder(responder)
1057    }
1058
1059    /// Run in server mode with the provided transport.
1060    ///
1061    /// This drives the connection by continuously processing messages from the transport
1062    /// and dispatching them to your registered handlers. The connection will run until:
1063    /// - The transport closes (e.g., EOF on byte streams)
1064    /// - An error occurs
1065    /// - One of your handlers returns an error
1066    ///
1067    /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
1068    /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1069    ///
1070    /// Use this mode when you only need to respond to incoming messages and don't need
1071    /// to initiate your own requests. If you need to send requests to the other side,
1072    /// use [`connect_with`](Self::connect_with) instead.
1073    ///
1074    /// # Example: Byte Stream Transport
1075    ///
1076    /// ```no_run
1077    /// # use agent_client_protocol::UntypedRole;
1078    /// # use agent_client_protocol::{Builder};
1079    /// # use agent_client_protocol::ByteStreams;
1080    /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1081    /// # use agent_client_protocol_test::*;
1082    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1083    /// let transport = ByteStreams::new(
1084    ///     tokio::io::stdout().compat_write(),
1085    ///     tokio::io::stdin().compat(),
1086    /// );
1087    ///
1088    /// UntypedRole.builder()
1089    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
1090    ///         responder.respond(MyResponse { status: "ok".into() })
1091    ///     }, agent_client_protocol::on_receive_request!())
1092    ///     .connect_to(transport)
1093    ///     .await?;
1094    /// # Ok(())
1095    /// # }
1096    /// ```
1097    pub async fn connect_to(
1098        self,
1099        transport: impl ConnectTo<Host> + 'static,
1100    ) -> Result<(), crate::Error> {
1101        self.connect_with(transport, async move |_cx| future::pending().await)
1102            .await
1103    }
1104
1105    /// Run the connection until the provided closure completes.
1106    ///
1107    /// This drives the connection by:
1108    /// 1. Running your registered handlers in the background to process incoming messages
1109    /// 2. Executing your `main_fn` closure with a [`ConnectionTo<R>`] for sending requests/notifications
1110    ///
1111    /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1112    /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1113    ///
1114    /// Use this mode when you need to initiate communication (send requests/notifications)
1115    /// in addition to responding to incoming messages. For server-only mode where you just
1116    /// respond to messages, use [`connect_to`](Self::connect_to) instead.
1117    ///
1118    /// # Example
1119    ///
1120    /// ```no_run
1121    /// # use agent_client_protocol::UntypedRole;
1122    /// # use agent_client_protocol::{Builder};
1123    /// # use agent_client_protocol::ByteStreams;
1124    /// # use agent_client_protocol::schema::InitializeRequest;
1125    /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1126    /// # use agent_client_protocol_test::*;
1127    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1128    /// let transport = ByteStreams::new(
1129    ///     tokio::io::stdout().compat_write(),
1130    ///     tokio::io::stdin().compat(),
1131    /// );
1132    ///
1133    /// UntypedRole.builder()
1134    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
1135    ///         // Handle incoming requests in the background
1136    ///         responder.respond(MyResponse { status: "ok".into() })
1137    ///     }, agent_client_protocol::on_receive_request!())
1138    ///     .connect_with(transport, async |cx| {
1139    ///         // Initialize the protocol
1140    ///         let init_response = cx.send_request(InitializeRequest::make())
1141    ///             .block_task()
1142    ///             .await?;
1143    ///
1144    ///         // Send more requests...
1145    ///         let result = cx.send_request(MyRequest {})
1146    ///             .block_task()
1147    ///             .await?;
1148    ///
1149    ///         // When this closure returns, the connection shuts down
1150    ///         Ok(())
1151    ///     })
1152    ///     .await?;
1153    /// # Ok(())
1154    /// # }
1155    /// ```
1156    ///
1157    /// # Parameters
1158    ///
1159    /// - `main_fn`: Your client logic. Receives a [`ConnectionTo<R>`] for sending messages.
1160    ///
1161    /// # Errors
1162    ///
1163    /// Returns an error if the connection closes before `main_fn` completes.
1164    pub async fn connect_with<R>(
1165        self,
1166        transport: impl ConnectTo<Host> + 'static,
1167        main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1168    ) -> Result<R, crate::Error> {
1169        let (_, future) = self.into_connection_and_future(transport, main_fn);
1170        future.await
1171    }
1172
1173    /// Helper that returns a [`ConnectionTo<R>`] and a future that runs this connection until `main_fn` returns.
1174    fn into_connection_and_future<R>(
1175        self,
1176        transport: impl ConnectTo<Host> + 'static,
1177        main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1178    ) -> (
1179        ConnectionTo<Host::Counterpart>,
1180        impl Future<Output = Result<R, crate::Error>>,
1181    ) {
1182        let Self {
1183            name,
1184            handler,
1185            responder,
1186            host: me,
1187        } = self;
1188
1189        let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
1190        let (new_task_tx, new_task_rx) = mpsc::unbounded();
1191        let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
1192        let connection = ConnectionTo::new(
1193            me.counterpart(),
1194            outgoing_tx,
1195            new_task_tx,
1196            dynamic_handler_tx,
1197        );
1198
1199        // Convert transport into server - this returns a channel for us to use
1200        // and a future that runs the transport
1201        let transport_component = crate::DynConnectTo::new(transport);
1202        let (transport_channel, transport_future) = transport_component.into_channel_and_future();
1203        let spawn_result = connection.spawn(transport_future);
1204
1205        // Destructure the channel endpoints
1206        let Channel {
1207            rx: transport_incoming_rx,
1208            tx: transport_outgoing_tx,
1209        } = transport_channel;
1210
1211        let (reply_tx, reply_rx) = mpsc::unbounded();
1212
1213        let future = crate::util::instrument_with_connection_name(name, {
1214            let connection = connection.clone();
1215            async move {
1216                let () = spawn_result?;
1217
1218                let background = async {
1219                    futures::try_join!(
1220                        // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
1221                        outgoing_actor::outgoing_protocol_actor(
1222                            outgoing_rx,
1223                            reply_tx.clone(),
1224                            transport_outgoing_tx,
1225                        ),
1226                        // Protocol layer: jsonrpcmsg::Message → handler/reply routing
1227                        incoming_actor::incoming_protocol_actor(
1228                            me.counterpart(),
1229                            &connection,
1230                            transport_incoming_rx,
1231                            dynamic_handler_rx,
1232                            reply_rx,
1233                            handler,
1234                        ),
1235                        task_actor::task_actor(new_task_rx, &connection),
1236                        responder.run_with_connection_to(connection.clone()),
1237                    )?;
1238                    Ok(())
1239                };
1240
1241                crate::util::run_until(Box::pin(background), Box::pin(main_fn(connection.clone())))
1242                    .await
1243            }
1244        });
1245
1246        (connection, future)
1247    }
1248}
1249
1250impl<R, H, Run> ConnectTo<R::Counterpart> for Builder<R, H, Run>
1251where
1252    R: Role,
1253    H: HandleDispatchFrom<R::Counterpart> + 'static,
1254    Run: RunWithConnectionTo<R::Counterpart> + 'static,
1255{
1256    async fn connect_to(self, client: impl ConnectTo<R>) -> Result<(), crate::Error> {
1257        Builder::connect_to(self, client).await
1258    }
1259}
1260
/// The payload sent through the response oneshot channel.
///
/// Carries the outcome of a request together with an optional ack channel used for
/// dispatch loop synchronization.
pub(crate) struct ResponsePayload {
    /// The response result - either the JSON value or an error.
    pub(crate) result: Result<serde_json::Value, crate::Error>,

    /// Optional acknowledgment channel for dispatch loop synchronization.
    ///
    /// When present, whoever receives this payload must send on this channel to signal
    /// that response processing is complete, allowing the dispatch loop to continue
    /// to the next message.
    ///
    /// This is `None` for error paths where the response is sent directly
    /// (e.g., when the outgoing channel is broken) rather than through the
    /// normal dispatch loop flow.
    pub(crate) ack_tx: Option<oneshot::Sender<()>>,
}
1280
1281impl std::fmt::Debug for ResponsePayload {
1282    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1283        f.debug_struct("ResponsePayload")
1284            .field("result", &self.result)
1285            .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1286            .finish()
1287    }
1288}
1289
/// Message sent to the incoming actor for reply subscription management.
enum ReplyMessage {
    /// Subscribe to receive the response for the given request id.
    ///
    /// When a response with this id arrives, it will be sent through the oneshot
    /// along with an ack channel that must be signaled when processing is complete.
    /// The method name is stored to allow routing responses through typed handlers.
    Subscribe {
        /// id of the request whose response is being awaited
        id: jsonrpcmsg::Id,

        /// id of the peer this request was sent to
        role_id: RoleId,

        /// (original) method of the request -- the actual request may have been transformed
        /// to a successor method, but this will reflect the method of the wrapped request
        method: String,

        /// oneshot on which the response payload is delivered to the subscriber
        sender: oneshot::Sender<ResponsePayload>,
    },
}
1309
1310impl std::fmt::Debug for ReplyMessage {
1311    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1312        match self {
1313            ReplyMessage::Subscribe { id, method, .. } => f
1314                .debug_struct("Subscribe")
1315                .field("id", id)
1316                .field("method", method)
1317                .finish(),
1318        }
1319    }
1320}
1321
/// Messages sent to the outgoing side of the connection to be serialized over the transport.
#[derive(Debug)]
enum OutgoingMessage {
    /// Send a request to a peer.
    Request {
        /// id assigned to this request (generated by sender)
        id: jsonrpcmsg::Id,

        /// the original method
        method: String,

        /// the peer we sent this to
        role_id: RoleId,

        /// the message to send; this may have a distinct method
        /// depending on the peer
        untyped: UntypedMessage,

        /// where to send the response when it arrives (includes ack channel)
        response_tx: oneshot::Sender<ResponsePayload>,
    },

    /// Send a notification to a peer.
    Notification {
        /// the message to send; this may have a distinct method
        /// depending on the peer
        untyped: UntypedMessage,
    },

    /// Send a response to a request previously received from the other side
    Response {
        /// id of the request being answered
        id: jsonrpcmsg::Id,

        /// the outcome to report: the JSON result value or an error
        response: Result<serde_json::Value, crate::Error>,
    },

    /// Send a generalized error message
    Error { error: crate::Error },
}
1361
/// Return type of a message handler; indicates whether the message was handled or not.
#[must_use]
#[derive(Debug)]
pub enum Handled<T> {
    /// The message was handled
    Yes,

    /// The message was not handled; returns the original value.
    ///
    /// If `retry` is true, the message is additionally requested to be queued and
    /// retried with dynamic handlers as they are added.
    No {
        /// The message to be passed to subsequent handlers
        /// (typically the original message, but it may have been
        /// mutated.)
        message: T,

        /// If true, request the message to be queued and retried with
        /// dynamic handlers as they are added.
        ///
        /// This is used for managing session updates since the dynamic
        /// handler for a session cannot be added until the response to the
        /// new session request has been processed and there may be updates
        /// that get processed at the same time.
        retry: bool,
    },
}
1388
/// Trait for converting handler return values into [`Handled`].
///
/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
/// or an explicit `Handled<T>` value for more control over handler propagation.
///
/// `T` is the message type that is handed back when the handler declines.
pub trait IntoHandled<T> {
    /// Convert this value into a `Handled<T>`.
    fn into_handled(self) -> Handled<T>;
}
1397
/// A handler that returns `()` is treated as having handled the message.
impl<T> IntoHandled<T> for () {
    /// Always yields `Handled::Yes`.
    fn into_handled(self) -> Handled<T> {
        Handled::Yes
    }
}
1403
/// An explicit [`Handled`] value is passed through unchanged.
impl<T> IntoHandled<T> for Handled<T> {
    /// Returns `self` as-is.
    fn into_handled(self) -> Handled<T> {
        self
    }
}
1409
/// Connection context for sending messages and spawning tasks.
///
/// This is the primary handle for interacting with the JSON-RPC connection from
/// within handler callbacks. You can use it to:
///
/// * Send requests and notifications to the other side
/// * Spawn concurrent tasks that run alongside the connection
/// * Respond to requests (via [`Responder`] which wraps this)
///
/// # Cloning
///
/// `ConnectionTo` is cheaply cloneable - all clones refer to the same underlying connection.
/// This makes it easy to share across async tasks.
///
/// # Event Loop and Concurrency
///
/// Handler callbacks run on the event loop, which means the connection cannot process new
/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
/// expensive or blocking work to concurrent tasks.
///
/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency) section
/// for more details.
#[derive(Clone, Debug)]
pub struct ConnectionTo<Counterpart: Role> {
    /// The role on the other side of this connection.
    counterpart: Counterpart,
    /// Sending half of the channel carrying outgoing messages.
    message_tx: OutgoingMessageTx,
    /// Sending half of the channel that spawned tasks are handed to.
    task_tx: TaskTx,
    /// Sending half of the channel carrying dynamic-handler messages.
    dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
}
1439
1440impl<Counterpart: Role> ConnectionTo<Counterpart> {
    /// Assemble a connection handle from the counterpart role and the sending halves of
    /// the outgoing-message, task, and dynamic-handler channels.
    fn new(
        counterpart: Counterpart,
        message_tx: mpsc::UnboundedSender<OutgoingMessage>,
        task_tx: mpsc::UnboundedSender<Task>,
        dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
    ) -> Self {
        Self {
            counterpart,
            message_tx,
            task_tx,
            dynamic_handler_tx,
        }
    }
1454
    /// Return the counterpart role this connection is talking to.
    ///
    /// The role value is cloned out of the handle, so the caller gets an owned copy.
    pub fn counterpart(&self) -> Counterpart {
        self.counterpart.clone()
    }
1459
1460    /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1461    ///
1462    /// This is the primary mechanism for offloading expensive work from handler callbacks
1463    /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1464    /// allowing the server to continue processing messages.
1465    ///
1466    /// # Event Loop
1467    ///
1468    /// Handler callbacks run on the event loop, which cannot process new messages while
1469    /// your handler is running. Use `spawn` for any expensive operations:
1470    ///
1471    /// ```no_run
1472    /// # use agent_client_protocol_test::*;
1473    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1474    /// # let connection = mock_connection();
1475    /// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1476    ///     // Clone cx for the spawned task
1477    ///     cx.spawn({
1478    ///         let connection = cx.clone();
1479    ///         async move {
1480    ///             let result = expensive_operation(&req.data).await?;
1481    ///             connection.send_notification(ProcessComplete { result })?;
1482    ///             Ok(())
1483    ///         }
1484    ///     })?;
1485    ///
1486    ///     // Respond immediately
1487    ///     responder.respond(ProcessResponse { result: "started".into() })
1488    /// }, agent_client_protocol::on_receive_request!())
1489    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1490    /// # Ok(())
1491    /// # }
1492    /// ```
1493    ///
1494    /// # Errors
1495    ///
1496    /// If the spawned task returns an error, the entire server will shut down.
1497    #[track_caller]
1498    pub fn spawn(
1499        &self,
1500        task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1501    ) -> Result<(), crate::Error> {
1502        let location = std::panic::Location::caller();
1503        let task = task.into_future();
1504        Task::new(location, task).spawn(&self.task_tx)
1505    }
1506
1507    /// Spawn a JSON-RPC connection in the background and return a [`ConnectionTo`] for sending messages to it.
1508    ///
1509    /// This is useful for creating multiple connections that communicate with each other,
1510    /// such as implementing proxy patterns or connecting to multiple backend services.
1511    ///
1512    /// # Arguments
1513    ///
1514    /// - `builder`: The connection builder with handlers configured
1515    /// - `transport`: The transport component to connect to
1516    ///
1517    /// # Returns
1518    ///
1519    /// A `ConnectionTo` that you can use to send requests and notifications to the spawned connection.
1520    ///
1521    /// # Example: Proxying to a backend connection
1522    ///
1523    /// ```
1524    /// # use agent_client_protocol::UntypedRole;
1525    /// # use agent_client_protocol::{Builder, ConnectionTo};
1526    /// # use agent_client_protocol_test::*;
1527    /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1528    /// // Set up a backend connection builder
1529    /// let backend = UntypedRole.builder()
1530    ///     .on_receive_request(async |req: MyRequest, responder, _cx| {
1531    ///         responder.respond(MyResponse { status: "ok".into() })
1532    ///     }, agent_client_protocol::on_receive_request!());
1533    ///
1534    /// // Spawn it and get a context to send requests to it
1535    /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
1536    ///
1537    /// // Now you can forward requests to the backend
1538    /// let response = backend_connection.send_request(MyRequest {}).block_task().await?;
1539    /// # Ok(())
1540    /// # }
1541    /// ```
1542    #[track_caller]
1543    pub fn spawn_connection<R: Role>(
1544        &self,
1545        builder: Builder<
1546            R,
1547            impl HandleDispatchFrom<R::Counterpart> + 'static,
1548            impl RunWithConnectionTo<R::Counterpart> + 'static,
1549        >,
1550        transport: impl ConnectTo<R> + 'static,
1551    ) -> Result<ConnectionTo<R::Counterpart>, crate::Error> {
1552        let (connection, future) =
1553            builder.into_connection_and_future(transport, |_| std::future::pending());
1554        Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
1555        Ok(connection)
1556    }
1557
1558    /// Send a request/notification and forward the response appropriately.
1559    ///
1560    /// The request context's response type matches the request's response type,
1561    /// enabling type-safe message forwarding.
1562    pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
1563        &self,
1564        message: Dispatch<Req, Notif>,
1565    ) -> Result<(), crate::Error>
1566    where
1567        Counterpart: HasPeer<Counterpart>,
1568    {
1569        self.send_proxied_message_to(self.counterpart(), message)
1570    }
1571
1572    /// Send a request/notification and forward the response appropriately.
1573    ///
1574    /// The request context's response type matches the request's response type,
1575    /// enabling type-safe message forwarding.
1576    pub fn send_proxied_message_to<
1577        Peer: Role,
1578        Req: JsonRpcRequest<Response: Send>,
1579        Notif: JsonRpcNotification,
1580    >(
1581        &self,
1582        peer: Peer,
1583        message: Dispatch<Req, Notif>,
1584    ) -> Result<(), crate::Error>
1585    where
1586        Counterpart: HasPeer<Peer>,
1587    {
1588        match message {
1589            Dispatch::Request(request, responder) => self
1590                .send_request_to(peer, request)
1591                .forward_response_to(responder),
1592            Dispatch::Notification(notification) => self.send_notification_to(peer, notification),
1593            Dispatch::Response(result, router) => {
1594                // Responses are forwarded directly to their destination
1595                router.respond_with_result(result)
1596            }
1597        }
1598    }
1599
1600    /// Send an outgoing request and return a [`SentRequest`] for handling the reply.
1601    ///
1602    /// The returned [`SentRequest`] provides methods for receiving the response without
1603    /// blocking the event loop:
1604    ///
1605    /// * [`on_receiving_result`](SentRequest::on_receiving_result) - Schedule
1606    ///   a callback to run when the response arrives (doesn't block the event loop)
1607    /// * [`block_task`](SentRequest::block_task) - Block the current task until the response
1608    ///   arrives (only safe in spawned tasks, not in handlers)
1609    ///
1610    /// # Anti-Footgun Design
1611    ///
1612    /// The API intentionally makes it difficult to block on the result directly to prevent
1613    /// the common mistake of blocking the event loop while waiting for a response:
1614    ///
1615    /// ```compile_fail
1616    /// # use agent_client_protocol_test::*;
1617    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1618    /// // ❌ This doesn't compile - prevents blocking the event loop
1619    /// let response = cx.send_request(MyRequest {}).await?;
1620    /// # Ok(())
1621    /// # }
1622    /// ```
1623    ///
1624    /// ```no_run
1625    /// # use agent_client_protocol_test::*;
1626    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1627    /// // ✅ Option 1: Schedule callback (safe in handlers)
1628    /// cx.send_request(MyRequest {})
1629    ///     .on_receiving_result(async |result| {
1630    ///         // Handle the response
1631    ///         Ok(())
1632    ///     })?;
1633    ///
1634    /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1635    /// cx.spawn({
1636    ///     let cx = cx.clone();
1637    ///     async move {
1638    ///         let response = cx.send_request(MyRequest {})
1639    ///             .block_task()
1640    ///             .await?;
1641    ///         // Process response...
1642    ///         Ok(())
1643    ///     }
1644    /// })?;
1645    /// # Ok(())
1646    /// # }
1647    /// ```
1648    /// Send an outgoing request to the default counterpart peer.
1649    ///
1650    /// This is a convenience method that sends to the counterpart role `R`.
1651    /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
1652    pub fn send_request<Req: JsonRpcRequest>(&self, request: Req) -> SentRequest<Req::Response>
1653    where
1654        Counterpart: HasPeer<Counterpart>,
1655    {
1656        self.send_request_to(self.counterpart.clone(), request)
1657    }
1658
1659    /// Send an outgoing request to a specific peer.
1660    ///
1661    /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1662    /// implementation before being sent.
1663    pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
1664        &self,
1665        peer: Peer,
1666        request: Req,
1667    ) -> SentRequest<Req::Response>
1668    where
1669        Counterpart: HasPeer<Peer>,
1670    {
1671        let method = request.method().to_string();
1672        let id = jsonrpcmsg::Id::String(uuid::Uuid::new_v4().to_string());
1673        let (response_tx, response_rx) = oneshot::channel();
1674        let role_id = peer.role_id();
1675        let remote_style = self.counterpart.remote_style(peer);
1676        match remote_style.transform_outgoing_message(request) {
1677            Ok(untyped) => {
1678                // Transform the message for the target role
1679                let message = OutgoingMessage::Request {
1680                    id: id.clone(),
1681                    method: method.clone(),
1682                    role_id,
1683                    untyped,
1684                    response_tx,
1685                };
1686
1687                match self.message_tx.unbounded_send(message) {
1688                    Ok(()) => (),
1689                    Err(error) => {
1690                        let OutgoingMessage::Request {
1691                            method,
1692                            response_tx,
1693                            ..
1694                        } = error.into_inner()
1695                        else {
1696                            unreachable!();
1697                        };
1698
1699                        response_tx
1700                            .send(ResponsePayload {
1701                                result: Err(crate::util::internal_error(format!(
1702                                    "failed to send outgoing request `{method}"
1703                                ))),
1704                                ack_tx: None,
1705                            })
1706                            .unwrap();
1707                    }
1708                }
1709            }
1710
1711            Err(err) => {
1712                response_tx
1713                    .send(ResponsePayload {
1714                        result: Err(crate::util::internal_error(format!(
1715                            "failed to create untyped request for `{method}`: {err}"
1716                        ))),
1717                        ack_tx: None,
1718                    })
1719                    .unwrap();
1720            }
1721        }
1722
1723        SentRequest::new(id, method.clone(), self.task_tx.clone(), response_rx)
1724            .map(move |json| <Req::Response>::from_value(&method, json))
1725    }
1726
1727    /// Send an outgoing notification to the default counterpart peer (no reply expected).
1728    ///
1729    /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1730    /// This method sends the notification immediately and returns.
1731    ///
1732    /// This is a convenience method that sends to the counterpart role `R`.
1733    /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
1734    ///
1735    /// ```no_run
1736    /// # use agent_client_protocol_test::*;
1737    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::Agent>) -> Result<(), agent_client_protocol::Error> {
1738    /// cx.send_notification(StatusUpdate {
1739    ///     message: "Processing...".into(),
1740    /// })?;
1741    /// # Ok(())
1742    /// # }
1743    /// ```
1744    pub fn send_notification<N: JsonRpcNotification>(
1745        &self,
1746        notification: N,
1747    ) -> Result<(), crate::Error>
1748    where
1749        Counterpart: HasPeer<Counterpart>,
1750    {
1751        self.send_notification_to(self.counterpart.clone(), notification)
1752    }
1753
1754    /// Send an outgoing notification to a specific peer (no reply expected).
1755    ///
1756    /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1757    /// implementation before being sent.
1758    pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
1759        &self,
1760        peer: Peer,
1761        notification: N,
1762    ) -> Result<(), crate::Error>
1763    where
1764        Counterpart: HasPeer<Peer>,
1765    {
1766        let remote_style = self.counterpart.remote_style(peer);
1767        tracing::debug!(
1768            role = std::any::type_name::<Counterpart>(),
1769            peer = std::any::type_name::<Peer>(),
1770            notification_type = std::any::type_name::<N>(),
1771            ?remote_style,
1772            original_method = notification.method(),
1773            "send_notification_to"
1774        );
1775        let transformed = remote_style.transform_outgoing_message(notification)?;
1776        tracing::debug!(
1777            transformed_method = %transformed.method,
1778            "send_notification_to transformed"
1779        );
1780        send_raw_message(
1781            &self.message_tx,
1782            OutgoingMessage::Notification {
1783                untyped: transformed,
1784            },
1785        )
1786    }
1787
1788    /// Send an error notification (no reply expected).
1789    pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1790        send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
1791    }
1792
1793    /// Register a dynamic message handler, used to intercept messages specific to a particular session
1794    /// or some similar modal thing.
1795    ///
1796    /// Dynamic message handlers are called first for every incoming message.
1797    ///
1798    /// If they decline to handle the message, then the message is passed to the regular registered handlers.
1799    ///
1800    /// The handler will stay registered until the returned registration guard is dropped.
1801    pub fn add_dynamic_handler(
1802        &self,
1803        handler: impl HandleDispatchFrom<Counterpart> + 'static,
1804    ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error> {
1805        let uuid = Uuid::new_v4();
1806        self.dynamic_handler_tx
1807            .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
1808                uuid,
1809                Box::new(handler),
1810            ))
1811            .map_err(crate::util::internal_error)?;
1812
1813        Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
1814    }
1815
1816    fn remove_dynamic_handler(&self, uuid: Uuid) {
1817        // Ignore errors
1818        drop(
1819            self.dynamic_handler_tx
1820                .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid)),
1821        );
1822    }
1823}
1824
1825#[derive(Clone, Debug)]
1826pub struct DynamicHandlerRegistration<R: Role> {
1827    uuid: Uuid,
1828    cx: ConnectionTo<R>,
1829}
1830
1831impl<R: Role> DynamicHandlerRegistration<R> {
1832    fn new(uuid: Uuid, cx: ConnectionTo<R>) -> Self {
1833        Self { uuid, cx }
1834    }
1835
1836    /// Prevents the dynamic handler from being removed when dropped.
1837    pub fn run_indefinitely(self) {
1838        std::mem::forget(self);
1839    }
1840}
1841
1842impl<R: Role> Drop for DynamicHandlerRegistration<R> {
1843    fn drop(&mut self) {
1844        self.cx.remove_dynamic_handler(self.uuid);
1845    }
1846}
1847
1848/// The context to respond to an incoming request.
1849///
1850/// This context is provided to request handlers and serves a dual role:
1851///
1852/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
1853///    [`respond_with_result`](Self::respond_with_result) to send the response
1854/// 2. **Send other messages** - Use the [`ConnectionTo`] parameter passed to your
1855///    handler, which provides [`send_request`](`ConnectionTo::send_request`),
1856///    [`send_notification`](`ConnectionTo::send_notification`), and
1857///    [`spawn`](`ConnectionTo::spawn`)
1858///
1859/// # Example
1860///
1861/// ```no_run
1862/// # use agent_client_protocol_test::*;
1863/// # async fn example() -> Result<(), agent_client_protocol::Error> {
1864/// # let connection = mock_connection();
1865/// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1866///     // Send a notification while processing
1867///     cx.send_notification(StatusUpdate {
1868///         message: "processing".into(),
1869///     })?;
1870///
1871///     // Do some work...
1872///     let result = process(&req.data)?;
1873///
1874///     // Respond to the request
1875///     responder.respond(ProcessResponse { result })
1876/// }, agent_client_protocol::on_receive_request!())
1877/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1878/// # Ok(())
1879/// # }
1880/// ```
1881///
1882/// # Event Loop Considerations
1883///
1884/// Like all handlers, request handlers run on the event loop. Use
1885/// [`spawn`](ConnectionTo::spawn) for expensive operations to avoid blocking
1886/// the connection.
1887///
1888/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency)
1889/// section for more details.
1890#[must_use]
1891pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
1892    /// The method of the request.
1893    method: String,
1894
1895    /// The `id` of the message we are replying to.
1896    id: jsonrpcmsg::Id,
1897
1898    /// Function to send the response to its destination.
1899    ///
1900    /// For incoming requests: serializes to JSON and sends over the wire.
1901    /// For incoming responses: sends to the waiting oneshot channel.
1902    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
1903}
1904
1905impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
1906    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1907        f.debug_struct("Responder")
1908            .field("method", &self.method)
1909            .field("id", &self.id)
1910            .field("response_type", &std::any::type_name::<T>())
1911            .finish_non_exhaustive()
1912    }
1913}
1914
1915impl Responder<serde_json::Value> {
1916    /// Create a new request context for an incoming request.
1917    ///
1918    /// The response will be serialized to JSON and sent over the wire.
1919    fn new(message_tx: OutgoingMessageTx, method: String, id: jsonrpcmsg::Id) -> Self {
1920        let id_clone = id.clone();
1921        Self {
1922            method,
1923            id,
1924            send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
1925                send_raw_message(
1926                    &message_tx,
1927                    OutgoingMessage::Response {
1928                        id: id_clone,
1929                        response,
1930                    },
1931                )
1932            }),
1933        }
1934    }
1935
1936    /// Cast this request context to a different response type.
1937    ///
1938    /// The provided type `T` will be serialized to JSON before sending.
1939    pub fn cast<T: JsonRpcResponse>(self) -> Responder<T> {
1940        self.wrap_params(move |method, value| match value {
1941            Ok(value) => T::into_json(value, method),
1942            Err(e) => Err(e),
1943        })
1944    }
1945}
1946
1947impl<T: JsonRpcResponse> Responder<T> {
1948    /// Method of the incoming request
1949    #[must_use]
1950    pub fn method(&self) -> &str {
1951        &self.method
1952    }
1953
1954    /// ID of the incoming request/response as a JSON value
1955    #[must_use]
1956    pub fn id(&self) -> serde_json::Value {
1957        crate::util::id_to_json(&self.id)
1958    }
1959
1960    /// Convert to a `Responder` that expects a JSON value
1961    /// and which checks (dynamically) that the JSON value it receives
1962    /// can be converted to `T`.
1963    pub fn erase_to_json(self) -> Responder<serde_json::Value> {
1964        self.wrap_params(|method, value| T::from_value(method, value?))
1965    }
1966
1967    /// Return a new Responder with a different method name.
1968    pub fn wrap_method(self, method: String) -> Responder<T> {
1969        Responder {
1970            method,
1971            id: self.id,
1972            send_fn: self.send_fn,
1973        }
1974    }
1975
1976    /// Return a new Responder that expects a response of type U.
1977    ///
1978    /// `wrap_fn` will be invoked with the method name and the result to transform
1979    /// type `U` into type `T` before sending.
1980    pub fn wrap_params<U: JsonRpcResponse>(
1981        self,
1982        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
1983    ) -> Responder<U> {
1984        let method = self.method.clone();
1985        Responder {
1986            method: self.method,
1987            id: self.id,
1988            send_fn: Box::new(move |input: Result<U, crate::Error>| {
1989                let t_value = wrap_fn(&method, input);
1990                (self.send_fn)(t_value)
1991            }),
1992        }
1993    }
1994
1995    /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
1996    pub fn respond_with_result(
1997        self,
1998        response: Result<T, crate::Error>,
1999    ) -> Result<(), crate::Error> {
2000        tracing::debug!(id = ?self.id, "respond called");
2001        (self.send_fn)(response)
2002    }
2003
2004    /// Respond to the JSON-RPC request with a value.
2005    pub fn respond(self, response: T) -> Result<(), crate::Error> {
2006        self.respond_with_result(Ok(response))
2007    }
2008
2009    /// Respond to the JSON-RPC request with an internal error containing a message.
2010    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2011        self.respond_with_error(crate::util::internal_error(message))
2012    }
2013
2014    /// Respond to the JSON-RPC request with an error.
2015    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2016        tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2017        self.respond_with_result(Err(error))
2018    }
2019}
2020
2021/// Context for handling an incoming JSON-RPC response.
2022///
2023/// This is the response-side counterpart to [`Responder`]. While `Responder` handles
2024/// incoming requests (where you send a response over the wire), `ResponseRouter` handles
2025/// incoming responses (where you route the response to a local task waiting for it).
2026///
2027/// Both are fundamentally "sinks" that push the message through a `send_fn`, but they
2028/// represent different points in the message lifecycle and carry different metadata.
2029#[must_use]
2030pub struct ResponseRouter<T: JsonRpcResponse = serde_json::Value> {
2031    /// The method of the original request.
2032    method: String,
2033
2034    /// The `id` of the original request.
2035    id: jsonrpcmsg::Id,
2036
2037    /// The RoleId to which the original request was sent
2038    /// (and hence from which the reply is expected).
2039    role_id: RoleId,
2040
2041    /// Function to send the response to the waiting task.
2042    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
2043}
2044
2045impl<T: JsonRpcResponse> std::fmt::Debug for ResponseRouter<T> {
2046    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2047        f.debug_struct("ResponseRouter")
2048            .field("method", &self.method)
2049            .field("id", &self.id)
2050            .field("response_type", &std::any::type_name::<T>())
2051            .finish_non_exhaustive()
2052    }
2053}
2054
2055impl ResponseRouter<serde_json::Value> {
2056    /// Create a new response context for routing a response to a local awaiter.
2057    ///
2058    /// When `respond_with_result` is called, the response is sent through the oneshot
2059    /// channel to the code that originally sent the request.
2060    pub(crate) fn new(
2061        method: String,
2062        id: jsonrpcmsg::Id,
2063        role_id: RoleId,
2064        sender: oneshot::Sender<ResponsePayload>,
2065    ) -> Self {
2066        Self {
2067            method,
2068            id,
2069            role_id,
2070            send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
2071                sender
2072                    .send(ResponsePayload {
2073                        result: response,
2074                        ack_tx: None,
2075                    })
2076                    .map_err(|_| {
2077                        crate::util::internal_error("failed to send response, receiver dropped")
2078                    })
2079            }),
2080        }
2081    }
2082
2083    /// Cast this response context to a different response type.
2084    ///
2085    /// The provided type `T` will be serialized to JSON before sending.
2086    pub fn cast<T: JsonRpcResponse>(self) -> ResponseRouter<T> {
2087        self.wrap_params(move |method, value| match value {
2088            Ok(value) => T::into_json(value, method),
2089            Err(e) => Err(e),
2090        })
2091    }
2092}
2093
2094impl<T: JsonRpcResponse> ResponseRouter<T> {
2095    /// Method of the original request
2096    #[must_use]
2097    pub fn method(&self) -> &str {
2098        &self.method
2099    }
2100
2101    /// ID of the original request as a JSON value
2102    #[must_use]
2103    pub fn id(&self) -> serde_json::Value {
2104        crate::util::id_to_json(&self.id)
2105    }
2106
2107    /// The peer to which the original request was sent.
2108    ///
2109    /// This is the peer from which we expect to receive the response.
2110    #[must_use]
2111    pub fn role_id(&self) -> RoleId {
2112        self.role_id.clone()
2113    }
2114
2115    /// Convert to a `ResponseRouter` that expects a JSON value
2116    /// and which checks (dynamically) that the JSON value it receives
2117    /// can be converted to `T`.
2118    pub fn erase_to_json(self) -> ResponseRouter<serde_json::Value> {
2119        self.wrap_params(|method, value| T::from_value(method, value?))
2120    }
2121
2122    /// Return a new ResponseRouter that expects a response of type U.
2123    ///
2124    /// `wrap_fn` will be invoked with the method name and the result to transform
2125    /// type `U` into type `T` before sending.
2126    fn wrap_params<U: JsonRpcResponse>(
2127        self,
2128        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2129    ) -> ResponseRouter<U> {
2130        let method = self.method.clone();
2131        ResponseRouter {
2132            method: self.method,
2133            id: self.id,
2134            role_id: self.role_id,
2135            send_fn: Box::new(move |input: Result<U, crate::Error>| {
2136                let t_value = wrap_fn(&method, input);
2137                (self.send_fn)(t_value)
2138            }),
2139        }
2140    }
2141
2142    /// Complete the response by sending the result to the waiting task.
2143    pub fn respond_with_result(
2144        self,
2145        response: Result<T, crate::Error>,
2146    ) -> Result<(), crate::Error> {
2147        tracing::debug!(id = ?self.id, "response routed to awaiter");
2148        (self.send_fn)(response)
2149    }
2150
2151    /// Complete the response by sending a value to the waiting task.
2152    pub fn respond(self, response: T) -> Result<(), crate::Error> {
2153        self.respond_with_result(Ok(response))
2154    }
2155
2156    /// Complete the response by sending an internal error to the waiting task.
2157    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2158        self.respond_with_error(crate::util::internal_error(message))
2159    }
2160
2161    /// Complete the response by sending an error to the waiting task.
2162    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2163        tracing::debug!(id = ?self.id, ?error, "error routed to awaiter");
2164        self.respond_with_result(Err(error))
2165    }
2166}
2167
2168/// Common bounds for any JSON-RPC message.
2169///
2170/// # Derive Macro
2171///
2172/// For simple message types, you can use the `JsonRpcRequest` or `JsonRpcNotification` derive macros
2173/// which will implement both `JsonRpcMessage` and the respective trait. See [`JsonRpcRequest`] and
2174/// [`JsonRpcNotification`] for examples.
2175pub trait JsonRpcMessage: 'static + Debug + Sized + Send + Clone {
2176    /// Check if this message type matches the given method name.
2177    fn matches_method(method: &str) -> bool;
2178
2179    /// The method name for the message.
2180    fn method(&self) -> &str;
2181
2182    /// Convert this message into an untyped message.
2183    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;
2184
2185    /// Parse this type from a method name and parameters.
2186    ///
2187    /// Returns an error if the method doesn't match or deserialization fails.
2188    /// Callers should use `matches_method` first to check if this type handles the method.
2189    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error>;
2190}
2191
2192/// Defines the "payload" of a successful response to a JSON-RPC request.
2193///
2194/// # Derive Macro
2195///
2196/// Use `#[derive(JsonRpcResponse)]` to automatically implement this trait:
2197///
2198/// ```ignore
2199/// use agent_client_protocol::JsonRpcResponse;
2200/// use serde::{Serialize, Deserialize};
2201///
2202/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2203/// #[response(method = "_hello")]
2204/// struct HelloResponse {
2205///     greeting: String,
2206/// }
2207/// ```
2208pub trait JsonRpcResponse: 'static + Debug + Sized + Send + Clone {
2209    /// Convert this message into a JSON value.
2210    fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;
2211
2212    /// Parse a JSON value into the response type.
2213    fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
2214}
2215
2216impl JsonRpcResponse for serde_json::Value {
2217    fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
2218        Ok(value)
2219    }
2220
2221    fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
2222        Ok(self)
2223    }
2224}
2225
2226/// A struct that represents a notification (JSON-RPC message that does not expect a response).
2227///
2228/// # Derive Macro
2229///
2230/// Use `#[derive(JsonRpcNotification)]` to automatically implement both `JsonRpcMessage` and `JsonRpcNotification`:
2231///
2232/// ```ignore
2233/// use agent_client_protocol::JsonRpcNotification;
2234/// use serde::{Serialize, Deserialize};
2235///
2236/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcNotification)]
2237/// #[notification(method = "_ping")]
2238/// struct PingNotification {
2239///     timestamp: u64,
2240/// }
2241/// ```
2242pub trait JsonRpcNotification: JsonRpcMessage {}
2243
2244/// A struct that represents a request (JSON-RPC message expecting a response).
2245///
2246/// # Derive Macro
2247///
2248/// Use `#[derive(JsonRpcRequest)]` to automatically implement both `JsonRpcMessage` and `JsonRpcRequest`:
2249///
2250/// ```ignore
2251/// use agent_client_protocol::{JsonRpcRequest, JsonRpcResponse};
2252/// use serde::{Serialize, Deserialize};
2253///
2254/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcRequest)]
2255/// #[request(method = "_hello", response = HelloResponse)]
2256/// struct HelloRequest {
2257///     name: String,
2258/// }
2259///
2260/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2261/// struct HelloResponse {
2262///     greeting: String,
2263/// }
2264/// ```
2265pub trait JsonRpcRequest: JsonRpcMessage {
2266    /// The type of data expected in response.
2267    type Response: JsonRpcResponse;
2268}
2269
2270/// An enum capturing an in-flight request or notification.
2271/// In the case of a request, also includes the context used to respond to the request.
2272///
2273/// Type parameters allow specifying the concrete request and notification types.
2274/// By default, both are `UntypedMessage` for dynamic dispatch.
2275/// The request context's response type matches the request's response type.
2276#[derive(Debug)]
2277pub enum Dispatch<Req: JsonRpcRequest = UntypedMessage, Notif: JsonRpcMessage = UntypedMessage> {
2278    /// Incoming request and the context where the response should be sent.
2279    Request(Req, Responder<Req::Response>),
2280
2281    /// Incoming notification.
2282    Notification(Notif),
2283
2284    /// Incoming response to a request we sent.
2285    ///
2286    /// The first field is the response result (success or error from the remote).
2287    /// The second field is the context for forwarding the response to its destination
2288    /// (typically a waiting oneshot channel).
2289    Response(
2290        Result<Req::Response, crate::Error>,
2291        ResponseRouter<Req::Response>,
2292    ),
2293}
2294
2295impl<Req: JsonRpcRequest, Notif: JsonRpcMessage> Dispatch<Req, Notif> {
2296    /// Map the request and notification types to new types.
2297    ///
2298    /// Note: Response variants are passed through unchanged since they don't
2299    /// contain a parseable message payload.
2300    pub fn map<Req1, Notif1>(
2301        self,
2302        map_request: impl FnOnce(Req, Responder<Req::Response>) -> (Req1, Responder<Req1::Response>),
2303        map_notification: impl FnOnce(Notif) -> Notif1,
2304    ) -> Dispatch<Req1, Notif1>
2305    where
2306        Req1: JsonRpcRequest<Response = Req::Response>,
2307        Notif1: JsonRpcMessage,
2308    {
2309        match self {
2310            Dispatch::Request(request, responder) => {
2311                let (new_request, new_responder) = map_request(request, responder);
2312                Dispatch::Request(new_request, new_responder)
2313            }
2314            Dispatch::Notification(notification) => {
2315                let new_notification = map_notification(notification);
2316                Dispatch::Notification(new_notification)
2317            }
2318            Dispatch::Response(result, router) => Dispatch::Response(result, router),
2319        }
2320    }
2321
2322    /// Respond to the message with an error.
2323    ///
2324    /// If this message is a request, this error becomes the reply to the request.
2325    ///
2326    /// If this message is a notification, the error is sent as a notification.
2327    ///
2328    /// If this message is a response, the error is forwarded to the waiting handler.
2329    pub fn respond_with_error<R: Role>(
2330        self,
2331        error: crate::Error,
2332        cx: ConnectionTo<R>,
2333    ) -> Result<(), crate::Error> {
2334        match self {
2335            Dispatch::Request(_, responder) => responder.respond_with_error(error),
2336            Dispatch::Notification(_) => cx.send_error_notification(error),
2337            Dispatch::Response(_, responder) => responder.respond_with_error(error),
2338        }
2339    }
2340
2341    /// Convert to a `Responder` that expects a JSON value
2342    /// and which checks (dynamically) that the JSON value it receives
2343    /// can be converted to `T`.
2344    ///
2345    /// Note: Response variants cannot be erased since their payload is already
2346    /// parsed. This returns an error for Response variants.
2347    pub fn erase_to_json(self) -> Result<Dispatch, crate::Error> {
2348        match self {
2349            Dispatch::Request(response, responder) => Ok(Dispatch::Request(
2350                response.to_untyped_message()?,
2351                responder.erase_to_json(),
2352            )),
2353            Dispatch::Notification(notification) => {
2354                Ok(Dispatch::Notification(notification.to_untyped_message()?))
2355            }
2356            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2357                "cannot erase Response variant to JSON",
2358            )),
2359        }
2360    }
2361
2362    /// Convert the message in self to an untyped message.
2363    ///
2364    /// Note: Response variants don't have an untyped message representation.
2365    /// This returns an error for Response variants.
2366    pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2367        match self {
2368            Dispatch::Request(request, _) => request.to_untyped_message(),
2369            Dispatch::Notification(notification) => notification.to_untyped_message(),
2370            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2371                "Response variant has no untyped message representation",
2372            )),
2373        }
2374    }
2375
2376    /// Convert self to an untyped message context.
2377    ///
2378    /// Note: Response variants cannot be converted. This returns an error for Response variants.
2379    pub fn into_untyped_dispatch(self) -> Result<Dispatch, crate::Error> {
2380        match self {
2381            Dispatch::Request(request, responder) => Ok(Dispatch::Request(
2382                request.to_untyped_message()?,
2383                responder.erase_to_json(),
2384            )),
2385            Dispatch::Notification(notification) => {
2386                Ok(Dispatch::Notification(notification.to_untyped_message()?))
2387            }
2388            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2389                "cannot convert Response variant to untyped message context",
2390            )),
2391        }
2392    }
2393
2394    /// Returns the request ID if this is a request or response, None if notification.
2395    pub fn id(&self) -> Option<serde_json::Value> {
2396        match self {
2397            Dispatch::Request(_, cx) => Some(cx.id()),
2398            Dispatch::Notification(_) => None,
2399            Dispatch::Response(_, cx) => Some(cx.id()),
2400        }
2401    }
2402
2403    /// Returns the method of the message.
2404    ///
2405    /// For requests and notifications, this is the method from the message payload.
2406    /// For responses, this is the method of the original request.
2407    pub fn method(&self) -> &str {
2408        match self {
2409            Dispatch::Request(msg, _) => msg.method(),
2410            Dispatch::Notification(msg) => msg.method(),
2411            Dispatch::Response(_, cx) => cx.method(),
2412        }
2413    }
2414}
2415
2416impl Dispatch {
2417    /// Attempts to parse `self` into a typed message context.
2418    ///
2419    /// # Returns
2420    ///
2421    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2422    /// * `Ok(Err(self))` if not
2423    /// * `Err` if has the correct method for the given types but parsing fails
2424    #[tracing::instrument(skip(self), fields(Request = ?std::any::type_name::<Req>(), Notif = ?std::any::type_name::<Notif>()), level = "trace", ret)]
2425    pub(crate) fn into_typed_dispatch<Req: JsonRpcRequest, Notif: JsonRpcNotification>(
2426        self,
2427    ) -> Result<Result<Dispatch<Req, Notif>, Dispatch>, crate::Error> {
2428        tracing::debug!(
2429            message = ?self,
2430            "into_typed_dispatch"
2431        );
2432        match self {
2433            Dispatch::Request(message, responder) => {
2434                if Req::matches_method(&message.method) {
2435                    match Req::parse_message(&message.method, &message.params) {
2436                        Ok(req) => {
2437                            tracing::trace!(?req, "parsed ok");
2438                            Ok(Ok(Dispatch::Request(req, responder.cast())))
2439                        }
2440                        Err(err) => {
2441                            tracing::trace!(?err, "parse error");
2442                            Err(err)
2443                        }
2444                    }
2445                } else {
2446                    tracing::trace!("method doesn't match");
2447                    Ok(Err(Dispatch::Request(message, responder)))
2448                }
2449            }
2450
2451            Dispatch::Notification(message) => {
2452                if Notif::matches_method(&message.method) {
2453                    match Notif::parse_message(&message.method, &message.params) {
2454                        Ok(notif) => {
2455                            tracing::trace!(?notif, "parse ok");
2456                            Ok(Ok(Dispatch::Notification(notif)))
2457                        }
2458                        Err(err) => {
2459                            tracing::trace!(?err, "parse error");
2460                            Err(err)
2461                        }
2462                    }
2463                } else {
2464                    tracing::trace!("method doesn't match");
2465                    Ok(Err(Dispatch::Notification(message)))
2466                }
2467            }
2468
2469            Dispatch::Response(result, cx) => {
2470                let method = cx.method();
2471                if Req::matches_method(method) {
2472                    // Parse the response result
2473                    let typed_result = match result {
2474                        Ok(value) => {
2475                            match <Req::Response as JsonRpcResponse>::from_value(method, value) {
2476                                Ok(parsed) => {
2477                                    tracing::trace!(?parsed, "parse ok");
2478                                    Ok(parsed)
2479                                }
2480                                Err(err) => {
2481                                    tracing::trace!(?err, "parse error");
2482                                    return Err(err);
2483                                }
2484                            }
2485                        }
2486                        Err(err) => {
2487                            tracing::trace!("error, passthrough");
2488                            Err(err)
2489                        }
2490                    };
2491                    Ok(Ok(Dispatch::Response(typed_result, cx.cast())))
2492                } else {
2493                    tracing::trace!("method doesn't match");
2494                    Ok(Err(Dispatch::Response(result, cx)))
2495                }
2496            }
2497        }
2498    }
2499
2500    /// True if this message has a field with the given name.
2501    ///
2502    /// Returns `false` for Response variants.
2503    #[must_use]
2504    pub fn has_field(&self, field_name: &str) -> bool {
2505        self.message()
2506            .and_then(|m| m.params().get(field_name))
2507            .is_some()
2508    }
2509
2510    /// Returns true if this message has a session-id field.
2511    ///
2512    /// Returns `false` for Response variants.
2513    pub(crate) fn has_session_id(&self) -> bool {
2514        self.has_field("sessionId")
2515    }
2516
2517    /// Extract the ACP session-id from this message (if any).
2518    ///
2519    /// Returns `Ok(None)` for Response variants.
2520    pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
2521        let Some(message) = self.message() else {
2522            return Ok(None);
2523        };
2524        let Some(value) = message.params().get("sessionId") else {
2525            return Ok(None);
2526        };
2527        let session_id = serde_json::from_value(value.clone())?;
2528        Ok(Some(session_id))
2529    }
2530
2531    /// Try to parse this as a notification of the given type.
2532    ///
2533    /// # Returns
2534    ///
2535    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2536    /// * `Ok(Err(self))` if not
2537    /// * `Err` if has the correct method for the given types but parsing fails
2538    pub fn into_notification<N: JsonRpcNotification>(
2539        self,
2540    ) -> Result<Result<N, Dispatch>, crate::Error> {
2541        match self {
2542            Dispatch::Notification(msg) => {
2543                if !N::matches_method(&msg.method) {
2544                    return Ok(Err(Dispatch::Notification(msg)));
2545                }
2546                match N::parse_message(&msg.method, &msg.params) {
2547                    Ok(n) => Ok(Ok(n)),
2548                    Err(err) => Err(err),
2549                }
2550            }
2551            Dispatch::Request(..) | Dispatch::Response(..) => Ok(Err(self)),
2552        }
2553    }
2554
2555    /// Try to parse this as a request of the given type.
2556    ///
2557    /// # Returns
2558    ///
2559    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2560    /// * `Ok(Err(self))` if not
2561    /// * `Err` if has the correct method for the given types but parsing fails
2562    pub fn into_request<Req: JsonRpcRequest>(
2563        self,
2564    ) -> Result<Result<(Req, Responder<Req::Response>), Dispatch>, crate::Error> {
2565        match self {
2566            Dispatch::Request(msg, responder) => {
2567                if !Req::matches_method(&msg.method) {
2568                    return Ok(Err(Dispatch::Request(msg, responder)));
2569                }
2570                match Req::parse_message(&msg.method, &msg.params) {
2571                    Ok(req) => Ok(Ok((req, responder.cast()))),
2572                    Err(err) => Err(err),
2573                }
2574            }
2575            Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Err(self)),
2576        }
2577    }
2578}
2579
2580impl<M: JsonRpcRequest + JsonRpcNotification> Dispatch<M, M> {
2581    /// Returns the message payload for requests and notifications.
2582    ///
2583    /// Returns `None` for Response variants since they don't contain a message payload.
2584    pub fn message(&self) -> Option<&M> {
2585        match self {
2586            Dispatch::Request(msg, _) | Dispatch::Notification(msg) => Some(msg),
2587            Dispatch::Response(_, _) => None,
2588        }
2589    }
2590
2591    /// Map the request/notification message.
2592    ///
2593    /// Response variants pass through unchanged.
2594    pub(crate) fn try_map_message(
2595        self,
2596        map_message: impl FnOnce(M) -> Result<M, crate::Error>,
2597    ) -> Result<Dispatch<M, M>, crate::Error> {
2598        match self {
2599            Dispatch::Request(request, cx) => Ok(Dispatch::Request(map_message(request)?, cx)),
2600            Dispatch::Notification(notification) => {
2601                Ok(Dispatch::<M, M>::Notification(map_message(notification)?))
2602            }
2603            Dispatch::Response(result, cx) => Ok(Dispatch::Response(result, cx)),
2604        }
2605    }
2606}
2607
/// An incoming JSON message without any typing. Can be a request or a notification.
///
/// It stores only the method name and the raw JSON parameters. It is the default
/// type parameter for both the request and notification slots of [`Dispatch`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UntypedMessage {
    /// The JSON-RPC method name
    pub method: String,
    /// The JSON-RPC parameters as a raw JSON value
    pub params: serde_json::Value,
}
2616
2617impl UntypedMessage {
2618    /// Returns an untyped message with the given method and parameters.
2619    pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
2620        let params = serde_json::to_value(params)?;
2621        Ok(Self {
2622            method: method.to_string(),
2623            params,
2624        })
2625    }
2626
2627    /// Returns the method name
2628    #[must_use]
2629    pub fn method(&self) -> &str {
2630        &self.method
2631    }
2632
2633    /// Returns the parameters as a JSON value
2634    #[must_use]
2635    pub fn params(&self) -> &serde_json::Value {
2636        &self.params
2637    }
2638
2639    /// Consumes this message and returns the method and params
2640    #[must_use]
2641    pub fn into_parts(self) -> (String, serde_json::Value) {
2642        (self.method, self.params)
2643    }
2644
2645    /// Convert `self` to a JSON-RPC message.
2646    pub(crate) fn into_jsonrpc_msg(
2647        self,
2648        id: Option<jsonrpcmsg::Id>,
2649    ) -> Result<jsonrpcmsg::Request, crate::Error> {
2650        let Self { method, params } = self;
2651        Ok(jsonrpcmsg::Request::new_v2(method, json_cast(params)?, id))
2652    }
2653}
2654
2655impl JsonRpcMessage for UntypedMessage {
2656    fn matches_method(_method: &str) -> bool {
2657        // UntypedMessage matches any method - it's the untyped fallback
2658        true
2659    }
2660
2661    fn method(&self) -> &str {
2662        &self.method
2663    }
2664
2665    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2666        Ok(self.clone())
2667    }
2668
2669    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error> {
2670        UntypedMessage::new(method, params)
2671    }
2672}
2673
/// An untyped message can stand in for any request.
impl JsonRpcRequest for UntypedMessage {
    /// The response is left as a raw JSON value.
    type Response = serde_json::Value;
}
2677
/// An untyped message can stand in for any notification; the impl overrides nothing.
impl JsonRpcNotification for UntypedMessage {}
2679
2680/// Represents a pending response of type `R` from an outgoing request.
2681///
2682/// Returned by [`ConnectionTo::send_request`], this type provides methods for handling
2683/// the response without blocking the event loop. The API is intentionally designed to make
2684/// it difficult to accidentally block.
2685///
2686/// # Anti-Footgun Design
2687///
2688/// You cannot directly `.await` a `SentRequest`. Instead, you must choose how to handle
2689/// the response:
2690///
2691/// ## Option 1: Schedule a Callback (Safe in Handlers)
2692///
2693/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
2694/// that runs when the response arrives. This doesn't block the event loop:
2695///
2696/// ```no_run
2697/// # use agent_client_protocol_test::*;
2698/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2699/// cx.send_request(MyRequest {})
2700///     .on_receiving_result(async |result| {
2701///         match result {
2702///             Ok(response) => {
2703///                 // Handle successful response
2704///                 Ok(())
2705///             }
2706///             Err(error) => {
2707///                 // Handle error
2708///                 Err(error)
2709///             }
2710///         }
2711///     })?;
2712/// # Ok(())
2713/// # }
2714/// ```
2715///
2716/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
2717///
2718/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
2719/// in a spawned task (never in a handler):
2720///
2721/// ```no_run
2722/// # use agent_client_protocol_test::*;
2723/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2724/// // ✅ Safe: Spawned task runs concurrently
2725/// cx.spawn({
2726///     let cx = cx.clone();
2727///     async move {
2728///         let response = cx.send_request(MyRequest {})
2729///             .block_task()
2730///             .await?;
2731///         // Process response...
2732///         Ok(())
2733///     }
2734/// })?;
2735/// # Ok(())
2736/// # }
2737/// ```
2738///
2739/// ```no_run
2740/// # use agent_client_protocol_test::*;
2741/// # async fn example() -> Result<(), agent_client_protocol::Error> {
2742/// # let connection = mock_connection();
2743/// // ❌ NEVER do this in a handler - blocks the event loop!
2744/// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2745///     let response = cx.send_request(MyRequest {})
2746///         .block_task()  // This will deadlock!
2747///         .await?;
2748///     responder.respond(response)
2749/// }, agent_client_protocol::on_receive_request!())
2750/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2751/// # Ok(())
2752/// # }
2753/// ```
2754///
2755/// # Why This Design?
2756///
2757/// If you block the event loop while waiting for a response, the connection cannot process
2758/// the incoming response message, creating a deadlock. This API design prevents that footgun
2759/// by making blocking explicit and encouraging non-blocking patterns.
pub struct SentRequest<T> {
    // Id of the outgoing request; exposed as JSON by `id()`.
    id: jsonrpcmsg::Id,
    // Method name of the outgoing request; also quoted in the error produced by
    // `block_task` when no response is ever received.
    method: String,
    // Sender of `Task`s.
    // NOTE(review): presumably used to schedule response callbacks; the usage is
    // not visible in this part of the file, so confirm.
    task_tx: TaskTx,
    // Receives the `ResponsePayload` for this request; awaited by `block_task`.
    response_rx: oneshot::Receiver<ResponsePayload>,
    // Converts the raw JSON result into `T`. Starts as the identity (`Ok`) in
    // `new` and is wrapped by each call to `map`.
    to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
}
2767
2768impl<T: Debug> Debug for SentRequest<T> {
2769    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2770        f.debug_struct("SentRequest")
2771            .field("id", &self.id)
2772            .field("method", &self.method)
2773            .field("task_tx", &self.task_tx)
2774            .field("response_rx", &self.response_rx)
2775            .finish_non_exhaustive()
2776    }
2777}
2778
2779impl SentRequest<serde_json::Value> {
2780    fn new(
2781        id: jsonrpcmsg::Id,
2782        method: String,
2783        task_tx: mpsc::UnboundedSender<Task>,
2784        response_rx: oneshot::Receiver<ResponsePayload>,
2785    ) -> Self {
2786        Self {
2787            id,
2788            method,
2789            response_rx,
2790            task_tx,
2791            to_result: Box::new(Ok),
2792        }
2793    }
2794}
2795
2796impl<T: JsonRpcResponse> SentRequest<T> {
2797    /// The id of the outgoing request.
2798    #[must_use]
2799    pub fn id(&self) -> serde_json::Value {
2800        crate::util::id_to_json(&self.id)
2801    }
2802
2803    /// The method of the request this is in response to.
2804    #[must_use]
2805    pub fn method(&self) -> &str {
2806        &self.method
2807    }
2808
2809    /// Create a new response that maps the result of the response to a new type.
2810    pub fn map<U>(
2811        self,
2812        map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
2813    ) -> SentRequest<U> {
2814        SentRequest {
2815            id: self.id,
2816            method: self.method,
2817            response_rx: self.response_rx,
2818            task_tx: self.task_tx,
2819            to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
2820        }
2821    }
2822
2823    /// Forward the response (success or error) to a request context when it arrives.
2824    ///
2825    /// This is a convenience method for proxying messages between connections. When the
2826    /// response arrives, it will be automatically sent to the provided request context,
2827    /// whether it's a successful response or an error.
2828    ///
2829    /// # Example: Proxying requests
2830    ///
2831    /// ```
2832    /// # use agent_client_protocol::UntypedRole;
2833    /// # use agent_client_protocol::{Builder, ConnectionTo};
2834    /// # use agent_client_protocol_test::*;
2835    /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2836    /// // Set up backend connection builder
2837    /// let backend = UntypedRole.builder()
2838    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
2839    ///         responder.respond(MyResponse { status: "ok".into() })
2840    ///     }, agent_client_protocol::on_receive_request!());
2841    ///
2842    /// // Spawn backend and get a context to send to it
2843    /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
2844    ///
2845    /// // Set up proxy that forwards requests to backend
2846    /// UntypedRole.builder()
2847    ///     .on_receive_request({
2848    ///         let backend_connection = backend_connection.clone();
2849    ///         async move |req: MyRequest, responder, cx| {
2850    ///             // Forward the request to backend and proxy the response back
2851    ///             backend_connection.send_request(req)
2852    ///                 .forward_response_to(responder)?;
2853    ///             Ok(())
2854    ///         }
2855    ///     }, agent_client_protocol::on_receive_request!());
2856    /// # Ok(())
2857    /// # }
2858    /// ```
2859    ///
2860    /// # Type Safety
2861    ///
2862    /// The request context's response type must match the request's response type,
2863    /// ensuring type-safe message forwarding.
2864    ///
2865    /// # When to Use
2866    ///
2867    /// Use this when:
2868    /// - You're implementing a proxy or gateway pattern
2869    /// - You want to forward responses without processing them
2870    /// - The response types match between the outgoing request and incoming request
2871    ///
2872    /// This is equivalent to calling `on_receiving_result` and manually forwarding
2873    /// the result, but more concise.
2874    pub fn forward_response_to(self, responder: Responder<T>) -> Result<(), crate::Error>
2875    where
2876        T: Send,
2877    {
2878        self.on_receiving_result(async move |result| responder.respond_with_result(result))
2879    }
2880
2881    /// Block the current task until the response is received.
2882    ///
2883    /// **Warning:** This method blocks the current async task. It is **only safe** to use
2884    /// in spawned tasks created with [`ConnectionTo::spawn`]. Using it directly in a
2885    /// handler callback will deadlock the connection.
2886    ///
2887    /// # Safe Usage (in spawned tasks)
2888    ///
2889    /// ```no_run
2890    /// # use agent_client_protocol_test::*;
2891    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2892    /// # let connection = mock_connection();
2893    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2894    ///     // Spawn a task to handle the request
2895    ///     cx.spawn({
2896    ///         let connection = cx.clone();
2897    ///         async move {
2898    ///             // Safe: We're in a spawned task, not blocking the event loop
2899    ///             let response = connection.send_request(OtherRequest {})
2900    ///                 .block_task()
2901    ///                 .await?;
2902    ///
2903    ///             // Process the response...
2904    ///             Ok(())
2905    ///         }
2906    ///     })?;
2907    ///
2908    ///     // Respond immediately
2909    ///     responder.respond(MyResponse { status: "ok".into() })
2910    /// }, agent_client_protocol::on_receive_request!())
2911    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2912    /// # Ok(())
2913    /// # }
2914    /// ```
2915    ///
2916    /// # Unsafe Usage (in handlers - will deadlock!)
2917    ///
2918    /// ```no_run
2919    /// # use agent_client_protocol_test::*;
2920    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2921    /// # let connection = mock_connection();
2922    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2923    ///     // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
2924    ///     let response = cx.send_request(OtherRequest {})
2925    ///         .block_task()
2926    ///         .await?;
2927    ///
2928    ///     responder.respond(MyResponse { status: response.value })
2929    /// }, agent_client_protocol::on_receive_request!())
2930    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2931    /// # Ok(())
2932    /// # }
2933    /// ```
2934    ///
2935    /// # When to Use
2936    ///
2937    /// Use this method when:
2938    /// - You're in a spawned task (via [`ConnectionTo::spawn`])
2939    /// - You need the response value to proceed with your logic
2940    /// - Linear control flow is more natural than callbacks
2941    ///
2942    /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
2943    pub async fn block_task(self) -> Result<T, crate::Error>
2944    where
2945        T: Send,
2946    {
2947        match self.response_rx.await {
2948            Ok(ResponsePayload {
2949                result: Ok(json_value),
2950                ack_tx,
2951            }) => {
2952                // Ack immediately - we're in a spawned task, so the dispatch loop
2953                // can continue while we process the value.
2954                if let Some(tx) = ack_tx {
2955                    let _ = tx.send(());
2956                }
2957                match (self.to_result)(json_value) {
2958                    Ok(value) => Ok(value),
2959                    Err(err) => Err(err),
2960                }
2961            }
2962            Ok(ResponsePayload {
2963                result: Err(err),
2964                ack_tx,
2965            }) => {
2966                if let Some(tx) = ack_tx {
2967                    let _ = tx.send(());
2968                }
2969                Err(err)
2970            }
2971            Err(err) => Err(crate::util::internal_error(format!(
2972                "response to `{}` never received: {}",
2973                self.method, err
2974            ))),
2975        }
2976    }
2977
2978    /// Schedule an async task to run when a successful response is received.
2979    ///
2980    /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
2981    /// for the common pattern of forwarding errors to a request context while only processing
2982    /// successful responses.
2983    ///
2984    /// # Behavior
2985    ///
2986    /// - If the response is `Ok(value)`, your task receives the value and the request context
2987    /// - If the response is `Err(error)`, the error is automatically sent to `responder`
2988    ///   and your task is not called
2989    ///
2990    /// # Example: Chaining requests
2991    ///
2992    /// ```no_run
2993    /// # use agent_client_protocol_test::*;
2994    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2995    /// # let connection = mock_connection();
2996    /// connection.on_receive_request(async |req: ValidateRequest, responder, cx| {
2997    ///     // Send initial request
2998    ///     cx.send_request(ValidateRequest { data: req.data.clone() })
2999    ///         .on_receiving_ok_result(responder, async |validation, responder| {
3000    ///             // Only runs if validation succeeded
3001    ///             if validation.is_valid {
3002    ///                 // Respond to original request
3003    ///                 responder.respond(ValidateResponse { is_valid: true, error: None })
3004    ///             } else {
3005    ///                 responder.respond_with_error(agent_client_protocol::util::internal_error("validation failed"))
3006    ///             }
3007    ///         })?;
3008    ///
3009    ///     Ok(())
3010    /// }, agent_client_protocol::on_receive_request!())
3011    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3012    /// # Ok(())
3013    /// # }
3014    /// ```
3015    ///
3016    /// # Ordering
3017    ///
3018    /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
3019    /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
3020    /// for details.
3021    ///
3022    /// # When to Use
3023    ///
3024    /// Use this when:
3025    /// - You need to respond to a request based on another request's result
3026    /// - You want errors to automatically propagate to the request context
3027    /// - You only care about the success case
3028    ///
3029    /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
3030    #[track_caller]
3031    pub fn on_receiving_ok_result<F>(
3032        self,
3033        responder: Responder<T>,
3034        task: impl FnOnce(T, Responder<T>) -> F + 'static + Send,
3035    ) -> Result<(), crate::Error>
3036    where
3037        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3038        T: Send,
3039    {
3040        self.on_receiving_result(async move |result| match result {
3041            Ok(value) => task(value, responder).await,
3042            Err(err) => responder.respond_with_error(err),
3043        })
3044    }
3045
3046    /// Schedule an async task to run when the response is received.
3047    ///
3048    /// This is the recommended way to handle responses in handler callbacks, as it doesn't
3049    /// block the event loop. The task will be spawned automatically when the response arrives.
3050    ///
3051    /// # Example: Handle response in callback
3052    ///
3053    /// ```no_run
3054    /// # use agent_client_protocol_test::*;
3055    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3056    /// # let connection = mock_connection();
3057    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
3058    ///     // Send a request and schedule a callback for the response
3059    ///     cx.send_request(QueryRequest { id: 22 })
3060    ///         .on_receiving_result({
3061    ///             let connection = cx.clone();
3062    ///             async move |result| {
3063    ///                 match result {
3064    ///                     Ok(response) => {
3065    ///                         println!("Got response: {:?}", response);
3066    ///                         // Can send more messages here
3067    ///                         connection.send_notification(QueryComplete {})?;
3068    ///                         Ok(())
3069    ///                 }
3070    ///                     Err(error) => {
3071    ///                         eprintln!("Request failed: {}", error);
3072    ///                         Err(error)
3073    ///                     }
3074    ///                 }
3075    ///             }
3076    ///         })?;
3077    ///
3078    ///     // Handler continues immediately without waiting
3079    ///     responder.respond(MyResponse { status: "processing".into() })
3080    /// }, agent_client_protocol::on_receive_request!())
3081    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3082    /// # Ok(())
3083    /// # }
3084    /// ```
3085    ///
3086    /// # Ordering
3087    ///
3088    /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
3089    /// before processing the next message. This gives you ordering guarantees: no other
3090    /// messages will be processed while your callback runs.
3091    ///
3092    /// This differs from [`block_task`](Self::block_task), which signals completion immediately
3093    /// upon receiving the response (before your code processes it).
3094    ///
3095    /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
3096    /// and how to avoid deadlocks.
3097    ///
3098    /// # Error Handling
3099    ///
3100    /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
3101    /// errors appropriately within your task.
3102    ///
3103    /// # When to Use
3104    ///
3105    /// Use this method when:
3106    /// - You're in a handler callback (not a spawned task)
3107    /// - You want ordering guarantees (no other messages processed during your callback)
3108    /// - You need to do async work before "releasing" control back to the dispatch loop
3109    ///
3110    /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
3111    #[track_caller]
3112    pub fn on_receiving_result<F>(
3113        self,
3114        task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
3115    ) -> Result<(), crate::Error>
3116    where
3117        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3118        T: Send,
3119    {
3120        let task_tx = self.task_tx.clone();
3121        let method = self.method;
3122        let response_rx = self.response_rx;
3123        let to_result = self.to_result;
3124        let location = Location::caller();
3125
3126        Task::new(location, async move {
3127            match response_rx.await {
3128                Ok(ResponsePayload { result, ack_tx }) => {
3129                    // Convert the result using to_result for Ok values
3130                    let typed_result = match result {
3131                        Ok(json_value) => to_result(json_value),
3132                        Err(err) => Err(err),
3133                    };
3134
3135                    // Run the user's callback
3136                    let outcome = task(typed_result).await;
3137
3138                    // Ack AFTER the callback completes - this is the key difference
3139                    // from block_task. The dispatch loop waits for this ack.
3140                    if let Some(tx) = ack_tx {
3141                        let _ = tx.send(());
3142                    }
3143
3144                    outcome
3145                }
3146                Err(err) => Err(crate::util::internal_error(format!(
3147                    "response to `{method}` never received: {err}"
3148                ))),
3149            }
3150        })
3151        .spawn(&task_tx)
3152    }
3153}
3154
3155// ============================================================================
// Transport Implementations (`ConnectTo`)
3157// ============================================================================
3158
/// A component that communicates over line streams.
///
/// `Lines` implements the [`ConnectTo`] trait for any pair of line-based streams
/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
///
/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
/// transformation of individual lines before they are parsed or after they are serialized.
/// This is particularly useful for debugging, logging, or implementing custom line-based
/// protocols.
///
/// # Use Cases
///
/// - **Line-by-line logging**: Intercept and log each line before parsing
/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
/// - **Debugging**: Inspect raw message strings
/// - **Line filtering**: Skip or modify specific messages
///
/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
/// for byte-based I/O.
///
/// # Type Requirements
///
/// The struct itself places no bounds on its type parameters. The constructor and the
/// [`ConnectTo`] implementation require:
///
/// - `OutgoingSink: Sink<String, Error = std::io::Error> + Send + 'static`
/// - `IncomingStream: Stream<Item = std::io::Result<String>> + Send + 'static`
///
/// [`ConnectTo`]: crate::ConnectTo
#[derive(Debug)]
pub struct Lines<OutgoingSink, IncomingStream> {
    /// Outgoing line sink (where we write serialized JSON-RPC messages).
    ///
    /// When this sink is built by [`ByteStreams`], a `\n` is appended to each string
    /// before it is written to the underlying byte stream.
    pub outgoing: OutgoingSink,
    /// Incoming line stream (where we read and parse JSON-RPC messages).
    ///
    /// Each item is either one line of text or the I/O error hit while reading it.
    pub incoming: IncomingStream,
}
3188
3189impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
3190where
3191    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3192    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3193{
3194    /// Create a new line stream transport.
3195    pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
3196        Self { outgoing, incoming }
3197    }
3198}
3199
3200impl<OutgoingSink, IncomingStream, R: Role> ConnectTo<R> for Lines<OutgoingSink, IncomingStream>
3201where
3202    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3203    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3204{
3205    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3206        let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3207        match futures::future::select(Box::pin(client.connect_to(channel)), serve_self).await {
3208            Either::Left((result, _)) | Either::Right((result, _)) => result,
3209        }
3210    }
3211
3212    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3213        let Self { outgoing, incoming } = self;
3214
3215        // Create a channel pair for the client to use
3216        let (channel_for_caller, channel_for_lines) = Channel::duplex();
3217
3218        // Create the server future that runs the line stream actors
3219        let server_future = Box::pin(async move {
3220            let Channel { rx, tx } = channel_for_lines;
3221
3222            // Run both actors concurrently
3223            let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
3224            let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
3225
3226            // Wait for both to complete
3227            futures::try_join!(outgoing_future, incoming_future)?;
3228
3229            Ok(())
3230        });
3231
3232        (channel_for_caller, server_future)
3233    }
3234}
3235
/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
///
/// `ByteStreams` implements the [`ConnectTo`] trait for any pair of `AsyncRead` and `AsyncWrite`
/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
/// This is the standard way to communicate with external processes or network connections.
///
/// Internally the byte streams are adapted into line streams and handed to [`Lines`]:
/// incoming bytes are split into lines through a buffered reader, and every outgoing
/// message is written followed by a single `\n`.
///
/// # Use Cases
///
/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
/// - **Named pipes**: Cross-process communication on the same machine
/// - **File I/O**: Reading from and writing to file descriptors
///
/// # Example
///
/// Connecting to an agent via stdio:
///
/// ```no_run
/// use agent_client_protocol::UntypedRole;
/// # use agent_client_protocol::{ByteStreams};
/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
///
/// # async fn example() -> Result<(), agent_client_protocol::Error> {
/// let component = ByteStreams::new(
///     tokio::io::stdout().compat_write(),
///     tokio::io::stdin().compat(),
/// );
///
/// // Use as a component in a connection
/// agent_client_protocol::UntypedRole.builder()
///     .name("my-client")
///     .connect_to(component)
///     .await?;
/// # Ok(())
/// # }
/// ```
///
/// [`ConnectTo`]: crate::ConnectTo
#[derive(Debug)]
pub struct ByteStreams<OB, IB> {
    /// Outgoing byte stream (where we write serialized messages).
    ///
    /// Must be `AsyncWrite + Send + 'static` to construct or connect the transport.
    pub outgoing: OB,
    /// Incoming byte stream (where we read and parse messages).
    ///
    /// Must be `AsyncRead + Send + 'static` to construct or connect the transport.
    pub incoming: IB,
}
3281
3282impl<OB, IB> ByteStreams<OB, IB>
3283where
3284    OB: AsyncWrite + Send + 'static,
3285    IB: AsyncRead + Send + 'static,
3286{
3287    /// Create a new byte stream transport.
3288    pub fn new(outgoing: OB, incoming: IB) -> Self {
3289        Self { outgoing, incoming }
3290    }
3291}
3292
3293impl<OB, IB, R: Role> ConnectTo<R> for ByteStreams<OB, IB>
3294where
3295    OB: AsyncWrite + Send + 'static,
3296    IB: AsyncRead + Send + 'static,
3297{
3298    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3299        let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3300        match futures::future::select(pin!(client.connect_to(channel)), serve_self).await {
3301            Either::Left((result, _)) | Either::Right((result, _)) => result,
3302        }
3303    }
3304
3305    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3306        use futures::AsyncBufReadExt;
3307        use futures::AsyncWriteExt;
3308        use futures::io::BufReader;
3309        let Self { outgoing, incoming } = self;
3310
3311        // Convert byte streams to line streams
3312        // Box both streams to satisfy Unpin requirements
3313        let incoming_lines = Box::pin(BufReader::new(incoming).lines());
3314
3315        // Create a sink that writes lines (with newlines) to the outgoing byte stream
3316        // We need to Box the writer since it may not be Unpin
3317        let outgoing_sink =
3318            futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
3319                let mut bytes = line.into_bytes();
3320                bytes.push(b'\n');
3321                writer.write_all(&bytes).await?;
3322                Ok::<_, std::io::Error>(writer)
3323            });
3324
3325        // Delegate to Lines component
3326        ConnectTo::<R>::into_channel_and_future(Lines::new(outgoing_sink, incoming_lines))
3327    }
3328}
3329
/// A channel endpoint representing one side of a bidirectional message channel.
///
/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
/// Each endpoint has:
/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
///
/// Both halves are unbounded `mpsc` channels carrying
/// `Result<jsonrpcmsg::Message, crate::Error>` items, so either side can forward an
/// error to its counterpart in place of a message.
///
/// # Example
///
/// ```no_run
/// # use agent_client_protocol::UntypedRole;
/// # use agent_client_protocol::{Channel, Builder};
/// # async fn example() -> Result<(), agent_client_protocol::Error> {
/// // Create a pair of connected channels
/// let (channel_a, channel_b) = Channel::duplex();
///
/// // Each channel can be used by a different component
/// UntypedRole.builder()
///     .name("connection-a")
///     .connect_to(channel_a)
///     .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Channel {
    /// Receives messages (or errors) from the counterpart.
    pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
    /// Sends messages (or errors) to the counterpart.
    pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
}
3361
3362impl Channel {
3363    /// Create a pair of connected channel endpoints.
3364    ///
3365    /// Returns two `Channel` instances that are connected to each other:
3366    /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
3367    /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
3368    ///
3369    /// # Returns
3370    ///
3371    /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
3372    #[must_use]
3373    pub fn duplex() -> (Self, Self) {
3374        // Create channels: A sends Result<Message> which B receives as Message
3375        let (a_tx, b_rx) = mpsc::unbounded();
3376        let (b_tx, a_rx) = mpsc::unbounded();
3377
3378        let channel_a = Self { rx: a_rx, tx: a_tx };
3379        let channel_b = Self { rx: b_rx, tx: b_tx };
3380
3381        (channel_a, channel_b)
3382    }
3383
3384    /// Copy messages from `rx` to `tx`.
3385    ///
3386    /// # Returns
3387    ///
3388    /// A `Result` indicating success or failure.
3389    pub async fn copy(mut self) -> Result<(), crate::Error> {
3390        while let Some(msg) = self.rx.next().await {
3391            self.tx
3392                .unbounded_send(msg)
3393                .map_err(crate::util::internal_error)?;
3394        }
3395        Ok(())
3396    }
3397}
3398
3399impl<R: Role> ConnectTo<R> for Channel {
3400    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3401        let (client_channel, client_serve) = client.into_channel_and_future();
3402
3403        match futures::try_join!(
3404            Channel {
3405                rx: client_channel.rx,
3406                tx: self.tx
3407            }
3408            .copy(),
3409            Channel {
3410                rx: self.rx,
3411                tx: client_channel.tx
3412            }
3413            .copy(),
3414            client_serve
3415        ) {
3416            Ok(((), (), ())) => Ok(()),
3417            Err(err) => Err(err),
3418        }
3419    }
3420
3421    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3422        (self, Box::pin(future::ready(Ok(()))))
3423    }
3424}