Skip to main content

agent_client_protocol/
jsonrpc.rs

1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::SessionId;
4// Re-export jsonrpcmsg for use in public API
5pub use jsonrpcmsg;
6
7// Types re-exported from crate root
8use serde::{Deserialize, Serialize};
9use std::fmt::Debug;
10use std::panic::Location;
11use std::pin::pin;
12use uuid::Uuid;
13
14use boxfnonce::SendBoxFnOnce;
15use futures::channel::{mpsc, oneshot};
16use futures::future::{self, BoxFuture, Either};
17use futures::{AsyncRead, AsyncWrite, StreamExt};
18
19mod dynamic_handler;
20pub(crate) mod handlers;
21mod incoming_actor;
22mod outgoing_actor;
23pub(crate) mod run;
24mod task_actor;
25mod transport_actor;
26
27use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
28pub use crate::jsonrpc::handlers::NullHandler;
29use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
30use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
31use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
32use crate::jsonrpc::run::SpawnedRun;
33use crate::jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo};
34use crate::jsonrpc::task_actor::{Task, TaskTx};
35use crate::mcp_server::McpServer;
36use crate::role::HasPeer;
37use crate::role::Role;
38use crate::util::json_cast;
39use crate::{Agent, Client, ConnectTo, RoleId};
40
41/// Handlers process incoming JSON-RPC messages on a connection.
42///
43/// When messages arrive, they flow through a chain of handlers. Each handler can
44/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
45///
46/// # Message Flow
47///
48/// Messages flow through three layers of handlers in order:
49///
50/// ```text
51/// ┌─────────────────────────────────────────────────────────────────┐
52/// │                     Incoming Message                            │
53/// └─────────────────────────────────────────────────────────────────┘
54///                              │
55///                              ▼
56/// ┌─────────────────────────────────────────────────────────────────┐
57/// │  1. User Handlers (registered via on_receive_request, etc.)     │
58/// │     - Tried in registration order                               │
59/// │     - First handler to return Handled::Yes claims the message   │
60/// └─────────────────────────────────────────────────────────────────┘
61///                              │ Handled::No
62///                              ▼
63/// ┌─────────────────────────────────────────────────────────────────┐
64/// │  2. Dynamic Handlers (added at runtime)                         │
65/// │     - Used for session-specific message handling                │
66/// │     - Added via ConnectionTo::add_dynamic_handler             │
67/// └─────────────────────────────────────────────────────────────────┘
68///                              │ Handled::No
69///                              ▼
70/// ┌─────────────────────────────────────────────────────────────────┐
71/// │  3. Role Default Handler                                        │
72/// │     - Fallback based on the connection's Role                   │
73/// │     - Handles protocol-level messages (e.g., proxy forwarding)  │
74/// └─────────────────────────────────────────────────────────────────┘
75///                              │ Handled::No
76///                              ▼
77/// ┌─────────────────────────────────────────────────────────────────┐
78/// │  Unhandled: Error response sent (or queued if retry=true)       │
79/// └─────────────────────────────────────────────────────────────────┘
80/// ```
81///
82/// # The `Handled` Return Value
83///
84/// Each handler returns [`Handled`] to indicate whether it processed the message:
85///
86/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
87/// - **`Handled::No { message, retry }`** - Message was not handled. The message
88///   (possibly modified) is passed to the next handler in the chain.
89///
90/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
91///
92/// # The Retry Mechanism
93///
94/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
95///
96/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
97/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
98///
99/// This mechanism exists because of a timing issue with sessions: when a `session/new`
100/// response is being processed, the dynamic handler for that session hasn't been registered
101/// yet, but `session/update` notifications for that session may already be arriving.
102/// By setting `retry: true`, these early notifications are queued until the session's
103/// dynamic handler is added.
104///
105/// # Handler Registration
106///
107/// Most users register handlers using the builder methods on [`Builder`]:
108///
109/// ```
110/// # use agent_client_protocol::{Agent, Client, ConnectTo};
111/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, AgentCapabilities};
112/// # use agent_client_protocol_test::StatusUpdate;
113/// # async fn example(transport: impl ConnectTo<Agent>) -> Result<(), agent_client_protocol::Error> {
114/// Agent.builder()
115///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
116///         responder.respond(
117///             InitializeResponse::new(req.protocol_version)
118///                 .agent_capabilities(AgentCapabilities::new()),
119///         )
120///     }, agent_client_protocol::on_receive_request!())
121///     .on_receive_notification(async |notif: StatusUpdate, cx| {
122///         // Process notification
123///         Ok(())
124///     }, agent_client_protocol::on_receive_notification!())
125///     .connect_to(transport)
126///     .await?;
127/// # Ok(())
128/// # }
129/// ```
130///
131/// The type parameter on the closure determines which messages are dispatched to it.
132/// Messages that don't match the type are automatically passed to the next handler.
133///
134/// # Implementing Custom Handlers
135///
136/// For advanced use cases, you can implement [`HandleDispatchFrom`] directly:
137///
138/// ```ignore
139/// struct MyHandler;
140///
141/// impl HandleDispatchFrom<Agent> for MyHandler {
142///
143///     async fn handle_dispatch_from(
144///         &mut self,
145///         message: Dispatch,
146///         cx: ConnectionTo<Agent>,
147///     ) -> Result<Handled<Dispatch>, Error> {
148///         if message.method() == "my/custom/method" {
149///             // Handle it
150///             Ok(Handled::Yes)
151///         } else {
152///             // Pass to next handler
153///             Ok(Handled::No { message, retry: false })
154///         }
155///     }
156///
157///     fn describe_chain(&self) -> impl std::fmt::Debug {
158///         "MyHandler"
159///     }
160/// }
161/// ```
162///
163/// # Important: Handlers Must Not Block
164///
165/// The connection processes messages on a single async task. While a handler is running,
166/// no other messages can be processed. For expensive operations, use [`ConnectionTo::spawn`]
167/// to run work concurrently:
168///
169/// ```
170/// # use agent_client_protocol::{Client, Agent, ConnectTo};
171/// # use agent_client_protocol_test::{expensive_operation, ProcessComplete};
172/// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
173/// # Client.builder().connect_with(transport, async |cx| {
174/// cx.spawn({
175///     let connection = cx.clone();
176///     async move {
177///         let result = expensive_operation("data").await?;
178///         connection.send_notification(ProcessComplete { result })?;
179///         Ok(())
180///     }
181/// })?;
182/// # Ok(())
183/// # }).await?;
184/// # Ok(())
185/// # }
186/// ```
187#[allow(async_fn_in_trait)]
188/// A handler for incoming JSON-RPC messages.
189///
190/// This trait is implemented by types that can process incoming messages on a connection.
191/// Handlers are registered with a [`Builder`] and are called in order until
192/// one claims the message.
193///
194/// The type parameter `Counterpart` is the role on the other end of the connection
195/// (handlers are handed a `ConnectionTo<Counterpart>`). A [`Builder`] for role `Host`
196/// requires its handler to implement `HandleDispatchFrom<Host::Counterpart>`.
197pub trait HandleDispatchFrom<Counterpart: Role>: Send {
198    /// Attempt to claim an incoming message (request or notification).
199    ///
200    /// # Important: do not block
201    ///
202    /// The server will not process new messages until this handler returns.
203    /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
204    /// The recommended approach to manage expensive operations is to use the [`ConnectionTo::spawn`] method available on the connection.
205    ///
206    /// # Parameters
207    ///
208    /// * `message` - The incoming message to handle.
209    /// * `connection` - The connection, used to send messages and access connection state.
210    ///
211    /// # Returns
212    ///
213    /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
214    /// * `Ok(Handled::No { message, retry })` if not; the (possibly changed) message will be passed to the remaining handlers.
215    /// * `Err` if an internal error occurs (this will bring down the server).
216    fn handle_dispatch_from(
217        &mut self,
218        message: Dispatch,
219        connection: ConnectionTo<Counterpart>,
220    ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send;
221
222    /// Returns a debug description of the registered handlers for diagnostics.
223    fn describe_chain(&self) -> impl std::fmt::Debug;
224}
225
/// Forwarding implementation: a `&mut H` is itself a handler whenever `H` is one.
/// Both methods simply delegate to the referenced handler.
226impl<Counterpart: Role, H> HandleDispatchFrom<Counterpart> for &mut H
227where
228    H: HandleDispatchFrom<Counterpart>,
229{
230    fn handle_dispatch_from(
231        &mut self,
232        message: Dispatch,
233        cx: ConnectionTo<Counterpart>,
234    ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send {
235        H::handle_dispatch_from(self, message, cx) // delegate to the referenced handler
236    }
237
238    fn describe_chain(&self) -> impl std::fmt::Debug {
239        H::describe_chain(self) // delegate to the referenced handler
240    }
241}
242
243/// A JSON-RPC connection that can act as either a server, client, or both.
244///
245/// [`Builder`] provides a builder-style API for creating JSON-RPC servers and clients.
246/// You start by calling `Role.builder()` (e.g., `Client.builder()`), then add message
247/// handlers, and finally drive the connection with either [`connect_to`](Builder::connect_to)
248/// or [`connect_with`](Builder::connect_with), providing a component implementation
249/// (e.g., [`ByteStreams`] for byte streams).
250///
251/// # JSON-RPC Primer
252///
253/// JSON-RPC 2.0 has two fundamental message types:
254///
255/// * **Requests** - Messages that expect a response. They have an `id` field that gets
256///   echoed back in the response so the sender can correlate them.
257/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
258///   expect or receive a response.
259///
260/// # Type-Driven Message Dispatch
261///
262/// The handler registration methods use Rust's type system to determine which messages
263/// to handle. The type parameter you provide controls what gets dispatched to your handler:
264///
265/// ## Single Message Types
266///
267/// The simplest case - handle one specific message type:
268///
269/// ```no_run
270/// # use agent_client_protocol_test::*;
271/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
272/// # async fn example() -> Result<(), agent_client_protocol::Error> {
273/// # let connection = mock_connection();
274/// connection
275///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
276///         // Handle only InitializeRequest messages
277///         responder.respond(InitializeResponse::make())
278///     }, agent_client_protocol::on_receive_request!())
279///     .on_receive_notification(async |notif: SessionNotification, cx| {
280///         // Handle only SessionUpdate notifications
281///         Ok(())
282///     }, agent_client_protocol::on_receive_notification!())
283/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
284/// # Ok(())
285/// # }
286/// ```
287///
288/// ## Enum Message Types
289///
290/// You can also handle multiple related messages with a single handler by defining an enum
291/// that implements the appropriate trait ([`JsonRpcRequest`] or [`JsonRpcNotification`]):
292///
293/// ```no_run
294/// # use agent_client_protocol_test::*;
295/// # use agent_client_protocol::{JsonRpcRequest, JsonRpcMessage, UntypedMessage};
296/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
297/// # async fn example() -> Result<(), agent_client_protocol::Error> {
298/// # let connection = mock_connection();
299/// // Define an enum for multiple request types
300/// #[derive(Debug, Clone)]
301/// enum MyRequests {
302///     Initialize(InitializeRequest),
303///     Prompt(PromptRequest),
304/// }
305///
306/// // Implement JsonRpcRequest for your enum
307/// # impl JsonRpcMessage for MyRequests {
308/// #     fn matches_method(_method: &str) -> bool { false }
309/// #     fn method(&self) -> &str { "myRequests" }
310/// #     fn to_untyped_message(&self) -> Result<UntypedMessage, agent_client_protocol::Error> { todo!() }
311/// #     fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Result<Self, agent_client_protocol::Error> { Err(agent_client_protocol::Error::method_not_found()) }
312/// # }
313/// impl JsonRpcRequest for MyRequests { type Response = serde_json::Value; }
314///
315/// // Handle all variants in one place
316/// connection.on_receive_request(async |req: MyRequests, responder, cx| {
317///     match req {
318///         MyRequests::Initialize(init) => { responder.respond(serde_json::json!({})) }
319///         MyRequests::Prompt(prompt) => { responder.respond(serde_json::json!({})) }
320///     }
321/// }, agent_client_protocol::on_receive_request!())
322/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
323/// # Ok(())
324/// # }
325/// ```
326///
327/// ## Mixed Message Types
328///
329/// For enums containing both requests AND notifications, use [`on_receive_dispatch`](Self::on_receive_dispatch):
330///
331/// ```no_run
332/// # use agent_client_protocol_test::*;
333/// # use agent_client_protocol::Dispatch;
334/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
335/// # async fn example() -> Result<(), agent_client_protocol::Error> {
336/// # let connection = mock_connection();
337/// // on_receive_dispatch receives Dispatch which can be either a request or notification
338/// connection.on_receive_dispatch(async |msg: Dispatch<InitializeRequest, SessionNotification>, _cx| {
339///     match msg {
340///         Dispatch::Request(req, responder) => {
341///             responder.respond(InitializeResponse::make())
342///         }
343///         Dispatch::Notification(notif) => {
344///             Ok(())
345///         }
346///         Dispatch::Response(result, router) => {
347///             // Forward response to its destination
348///             router.respond_with_result(result)
349///         }
350///     }
351/// }, agent_client_protocol::on_receive_dispatch!())
352/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
353/// # Ok(())
354/// # }
355/// ```
356///
357/// # Handler Registration
358///
359/// Register handlers using these methods (listed from most common to most flexible):
360///
361/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
362/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
363/// * [`on_receive_dispatch`](Self::on_receive_dispatch) - Handle enums containing both requests and notifications
364/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
365///
366/// ## Handler Ordering
367///
368/// Handlers are tried in the order you register them. The first handler that claims a message
369/// (by matching its type) will process it. Subsequent handlers won't see that message:
370///
371/// ```no_run
372/// # use agent_client_protocol_test::*;
373/// # use agent_client_protocol::{Dispatch, UntypedMessage};
374/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
375/// # async fn example() -> Result<(), agent_client_protocol::Error> {
376/// # let connection = mock_connection();
377/// connection
378///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
379///         // This runs first for InitializeRequest
380///         responder.respond(InitializeResponse::make())
381///     }, agent_client_protocol::on_receive_request!())
382///     .on_receive_request(async |req: PromptRequest, responder, cx| {
383///         // This runs first for PromptRequest
384///         responder.respond(PromptResponse::make())
385///     }, agent_client_protocol::on_receive_request!())
386///     .on_receive_dispatch(async |msg: Dispatch, cx| {
387///         // This runs for any message not handled above
388///         msg.respond_with_error(agent_client_protocol::util::internal_error("unknown method"), cx)
389///     }, agent_client_protocol::on_receive_dispatch!())
390/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
391/// # Ok(())
392/// # }
393/// ```
394///
395/// # Event Loop and Concurrency
396///
397/// Understanding the event loop is critical for writing correct handlers.
398///
399/// ## The Event Loop
400///
401/// [`Builder`] runs all handler callbacks on a single async task - the event loop.
402/// While a handler is running, **the server cannot receive new messages**. This means
403/// any blocking or expensive work in your handlers will stall the entire connection.
404///
405/// To avoid blocking the event loop, use [`ConnectionTo::spawn`] to offload serious
406/// work to concurrent tasks:
407///
408/// ```no_run
409/// # use agent_client_protocol_test::*;
410/// # async fn example() -> Result<(), agent_client_protocol::Error> {
411/// # let connection = mock_connection();
412/// connection.on_receive_request(async |req: AnalyzeRequest, responder, cx| {
413///     // Clone cx for the spawned task
414///     cx.spawn({
415///         let connection = cx.clone();
416///         async move {
417///             let result = expensive_analysis(&req.data).await?;
418///             connection.send_notification(AnalysisComplete { result })?;
419///             Ok(())
420///         }
421///     })?;
422///
423///     // Respond immediately without blocking
424///     responder.respond(AnalysisStarted { job_id: 42 })
425/// }, agent_client_protocol::on_receive_request!())
426/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
427/// # Ok(())
428/// # }
429/// ```
430///
431/// Note that the entire connection runs within one async task, so parallelism must be
432/// managed explicitly using [`spawn`](ConnectionTo::spawn).
433///
434/// ## The Connection Context
435///
436/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
437///
438/// * **For request handlers** - [`Responder<R>`] provides [`respond`](Responder::respond)
439///   to send the response, plus methods to send other messages
440/// * **For notification handlers** - [`ConnectionTo`] provides methods to send messages
441///   and spawn tasks
442///
443/// Both context types support:
444/// * [`send_request`](ConnectionTo::send_request) - Send requests to the other side
445/// * [`send_notification`](ConnectionTo::send_notification) - Send notifications
446/// * [`spawn`](ConnectionTo::spawn) - Run tasks concurrently without blocking the event loop
447///
448/// The [`SentRequest`] returned by `send_request` provides methods like
449/// [`on_receiving_result`](SentRequest::on_receiving_result) that help you
450/// avoid accidentally blocking the event loop while waiting for responses.
451///
452/// # Driving the Connection
453///
454/// After adding handlers, you must drive the connection using one of two modes:
455///
456/// ## Server Mode: `connect_to()`
457///
458/// Use [`connect_to`](Self::connect_to) when you only need to respond to incoming messages:
459///
460/// ```no_run
461/// # use agent_client_protocol_test::*;
462/// # async fn example() -> Result<(), agent_client_protocol::Error> {
463/// # let connection = mock_connection();
464/// connection
465///     .on_receive_request(async |req: MyRequest, responder, cx| {
466///         responder.respond(MyResponse { status: "ok".into() })
467///     }, agent_client_protocol::on_receive_request!())
468///     .connect_to(MockTransport)  // Runs until connection closes or error occurs
469///     .await?;
470/// # Ok(())
471/// # }
472/// ```
473///
474/// The connection will process incoming messages and invoke your handlers until the
475/// connection is closed or an error occurs.
476///
477/// ## Client Mode: `connect_with()`
478///
479/// Use [`connect_with`](Self::connect_with) when you need to both handle incoming messages
480/// AND send your own requests/notifications:
481///
482/// ```no_run
483/// # use agent_client_protocol_test::*;
484/// # use agent_client_protocol::schema::InitializeRequest;
485/// # async fn example() -> Result<(), agent_client_protocol::Error> {
486/// # let connection = mock_connection();
487/// connection
488///     .on_receive_request(async |req: MyRequest, responder, cx| {
489///         responder.respond(MyResponse { status: "ok".into() })
490///     }, agent_client_protocol::on_receive_request!())
491///     .connect_with(MockTransport, async |cx| {
492///         // You can send requests to the other side
493///         let response = cx.send_request(InitializeRequest::make())
494///             .block_task()
495///             .await?;
496///
497///         // And send notifications
498///         cx.send_notification(StatusUpdate { message: "ready".into() })?;
499///
500///         Ok(())
501///     })
502///     .await?;
503/// # Ok(())
504/// # }
505/// ```
506///
507/// The connection will serve incoming messages in the background while your client closure
508/// runs. When the closure returns, the connection shuts down.
509///
510/// # Example: Complete Agent
511///
512/// ```no_run
513/// # use agent_client_protocol::UntypedRole;
514/// # use agent_client_protocol::{Builder};
515/// # use agent_client_protocol::ByteStreams;
516/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
517/// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
518/// # async fn example() -> Result<(), agent_client_protocol::Error> {
519/// let transport = ByteStreams::new(
520///     tokio::io::stdout().compat_write(),
521///     tokio::io::stdin().compat(),
522/// );
523///
524/// UntypedRole.builder()
525///     .name("my-agent")  // Optional: for debugging logs
526///     .on_receive_request(async |init: InitializeRequest, responder, cx| {
527///         let response: InitializeResponse = todo!();
528///         responder.respond(response)
529///     }, agent_client_protocol::on_receive_request!())
530///     .on_receive_request(async |prompt: PromptRequest, responder, cx| {
531///         // You can send notifications while processing a request
532///         let notif: SessionNotification = todo!();
533///         cx.send_notification(notif)?;
534///
535///         // Then respond to the request
536///         let response: PromptResponse = todo!();
537///         responder.respond(response)
538///     }, agent_client_protocol::on_receive_request!())
539///     .connect_to(transport)
540///     .await?;
541/// # Ok(())
542/// # }
543/// ```
544#[must_use]
545#[derive(Debug)]
546pub struct Builder<Host: Role, Handler = NullHandler, Runner = NullRun>
547where
548    Handler: HandleDispatchFrom<Host::Counterpart>,
549    Runner: RunWithConnectionTo<Host::Counterpart>,
550{
551    /// The role this side of the connection plays; handler and runner are typed against `Host::Counterpart`.
552    host: Host,
553
554    /// Name of the connection, used in tracing logs. `None` until [`Builder::name`] is called.
555    name: Option<String>,
556
557    /// Handler for incoming messages. Starts as the handler given at construction; `with_handler` wraps it in a `ChainedHandler`.
558    handler: Handler,
559
560    /// Runner for background tasks. Starts as [`NullRun`]; `with_responder` / `with_spawned` extend it via `ChainRun`.
561    responder: Runner,
562}
563
564impl<Host: Role> Builder<Host, NullHandler, NullRun> {
565    /// Create a new, unnamed connection builder for the given role, starting from a [`NullHandler`]
566    /// and a [`NullRun`]. This type follows a builder pattern; use other methods to configure and then invoke
567    /// [`Self::connect_to`] (to use as a server) or [`Self::connect_with`] (to use as a client).
568    pub fn new(role: Host) -> Self {
569        Self {
570            host: role,
571            name: None,
572            handler: NullHandler,
573            responder: NullRun,
574        }
575    }
576}
577
578impl<Host: Role, Handler> Builder<Host, Handler, NullRun>
579where
580    Handler: HandleDispatchFrom<Host::Counterpart>,
581{
582    /// Create a new, unnamed connection builder for `role` that starts with the given handler and a [`NullRun`] runner.
583    pub fn new_with(role: Host, handler: Handler) -> Self {
584        Self {
585            host: role,
586            name: None,
587            handler,
588            responder: NullRun,
589        }
590    }
591}
592
593impl<
594    Host: Role,
595    Handler: HandleDispatchFrom<Host::Counterpart>,
596    Runner: RunWithConnectionTo<Host::Counterpart>,
597> Builder<Host, Handler, Runner>
598{
599    /// Set the "name" of this connection, replacing any earlier one -- used only for debugging logs.
600    pub fn name(mut self, name: impl ToString) -> Self {
601        self.name = Some(name.to_string());
602        self
603    }
604
605    /// Merge another [`Builder`] into this one: `other`'s handler (wrapped in a `NamedHandler` carrying `other`'s name) and runner are chained onto ours.
606    /// The result keeps this builder's host and name; `other`'s host is discarded.
607    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
608    /// This is a low-level method that is not intended for general use.
609    pub fn with_connection_builder(
610        self,
611        other: Builder<
612            Host,
613            impl HandleDispatchFrom<Host::Counterpart>,
614            impl RunWithConnectionTo<Host::Counterpart>,
615        >,
616    ) -> Builder<
617        Host,
618        impl HandleDispatchFrom<Host::Counterpart>,
619        impl RunWithConnectionTo<Host::Counterpart>,
620    > {
621        Builder {
622            host: self.host,
623            name: self.name,
624            handler: ChainedHandler::new(
625                self.handler,
626                NamedHandler::new(other.name, other.handler),
627            ),
628            responder: ChainRun::new(self.responder, other.responder),
629        }
630    }
631
632    /// Add a new [`HandleDispatchFrom`] to the chain by combining the current handler and `handler` in a `ChainedHandler`.
633    /// Host, name, and runner are carried over unchanged.
634    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
635    /// This is a low-level method that is not intended for general use.
636    pub fn with_handler(
637        self,
638        handler: impl HandleDispatchFrom<Host::Counterpart>,
639    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner> {
640        Builder {
641            host: self.host,
642            name: self.name,
643            handler: ChainedHandler::new(self.handler, handler),
644            responder: self.responder,
645        }
646    }
647
648    /// Add a new [`RunWithConnectionTo`] to the chain (combined with the current runner via `ChainRun`); host, name, and handler are unchanged.
649    pub fn with_responder<Run1>(
650        self,
651        responder: Run1,
652    ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
653    where
654        Run1: RunWithConnectionTo<Host::Counterpart>,
655    {
656        Builder {
657            host: self.host,
658            name: self.name,
659            handler: self.handler,
660            responder: ChainRun::new(self.responder, responder),
661        }
662    }
663
664    /// Enqueue a task to run once the connection is actively serving traffic; it is registered as a `SpawnedRun` through [`Self::with_responder`].
665    #[track_caller]
666    pub fn with_spawned<F, Fut>(
667        self,
668        task: F,
669    ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
670    where
671        F: FnOnce(ConnectionTo<Host::Counterpart>) -> Fut + Send,
672        Fut: Future<Output = Result<(), crate::Error>> + Send,
673    {
674        let location = Location::caller(); // call site of `with_spawned`, available because of #[track_caller]
675        self.with_responder(SpawnedRun::new(location, task))
676    }
677
678    /// Register a handler for messages that can be either requests OR notifications.
679    ///
680    /// Use this when you want to handle an enum type that contains both request and
681    /// notification variants. Your handler receives a [`Dispatch<Req, Notif>`] which
682    /// is an enum with three variants:
683    ///
684    /// - `Dispatch::Request(request, responder)` - A request with its response context
685    /// - `Dispatch::Notification(notification)` - A notification
686    /// - `Dispatch::Response(result, router)` - A response to a request we sent
687    ///
688    /// # Example
689    ///
690    /// ```no_run
691    /// # use agent_client_protocol_test::*;
692    /// # use agent_client_protocol::Dispatch;
693    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
694    /// # let connection = mock_connection();
695    /// connection.on_receive_dispatch(async |message: Dispatch<MyRequest, StatusUpdate>, _cx| {
696    ///     match message {
697    ///         Dispatch::Request(req, responder) => {
698    ///             // Handle request and send response
699    ///             responder.respond(MyResponse { status: "ok".into() })
700    ///         }
701    ///         Dispatch::Notification(notif) => {
702    ///             // Handle notification (no response needed)
703    ///             Ok(())
704    ///         }
705    ///         Dispatch::Response(result, router) => {
706    ///             // Forward response to its destination
707    ///             router.respond_with_result(result)
708    ///         }
709    ///     }
710    /// }, agent_client_protocol::on_receive_dispatch!())
711    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
712    /// # Ok(())
713    /// # }
714    /// ```
715    ///
716    /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
717    /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
718    /// for handling requests or notifications separately.
719    ///
720    /// # Ordering
721    ///
722    /// This callback runs inside the dispatch loop and blocks further message processing
723    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
724    /// ordering guarantees and how to avoid deadlocks.
725    pub fn on_receive_dispatch<Req, Notif, F, T, ToFut>(
726        self,
727        op: F,
728        to_future_hack: ToFut,
729    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
730    where
731        Host::Counterpart: HasPeer<Host::Counterpart>,
732        Req: JsonRpcRequest,
733        Notif: JsonRpcNotification,
734        F: AsyncFnMut(
735                Dispatch<Req, Notif>,
736                ConnectionTo<Host::Counterpart>,
737            ) -> Result<T, crate::Error>
738            + Send,
739        T: IntoHandled<Dispatch<Req, Notif>>,
740        ToFut: Fn(
741                &mut F,
742                Dispatch<Req, Notif>,
743                ConnectionTo<Host::Counterpart>,
744            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
745            + Send
746            + Sync,
747    {
748        let handler = MessageHandler::new(
749            self.host.counterpart(),
750            self.host.counterpart(), // NOTE(review): counterpart is passed for both leading arguments — presumably tied to the `HasPeer<Host::Counterpart>` bound; confirm against `MessageHandler::new`
751            op,
752            to_future_hack,
753        );
754        self.with_handler(handler)
755    }
756
757    /// Register a handler for JSON-RPC requests of type `Req`.
758    ///
759    /// Your handler receives two arguments:
760    /// 1. The request (type `Req`)
761    /// 2. A [`Responder<R, Req::Response>`] for sending the response
762    ///
763    /// The request context allows you to:
764    /// - Send the response with [`Responder::respond`]
765    /// - Send notifications to the client with [`ConnectionTo::send_notification`]
766    /// - Send requests to the client with [`ConnectionTo::send_request`]
767    ///
768    /// # Example
769    ///
770    /// ```ignore
771    /// # use agent_client_protocol::UntypedRole;
772    /// # use agent_client_protocol::{Builder};
773    /// # use agent_client_protocol::schema::{PromptRequest, PromptResponse, SessionNotification};
774    /// # fn example<R: agent_client_protocol::Role>(connection: Builder<R, impl agent_client_protocol::HandleMessageAs<R>>) {
775    /// connection.on_receive_request(async |request: PromptRequest, responder, cx| {
776    ///     // Send a notification while processing
777    ///     let notif: SessionNotification = todo!();
778    ///     cx.send_notification(notif)?;
779    ///
780    ///     // Do some work...
781    ///     let result = todo!("process the prompt");
782    ///
783    ///     // Send the response
784    ///     let response: PromptResponse = todo!();
785    ///     responder.respond(response)
786    /// }, agent_client_protocol::on_receive_request!());
787    /// # }
788    /// ```
789    ///
790    /// # Type Parameter
791    ///
792    /// `Req` can be either a single request type or an enum of multiple request types.
793    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
794    ///
795    /// # Ordering
796    ///
797    /// This callback runs inside the dispatch loop and blocks further message processing
798    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
799    /// ordering guarantees and how to avoid deadlocks.
800    pub fn on_receive_request<Req: JsonRpcRequest, F, T, ToFut>(
801        self,
802        op: F,
803        to_future_hack: ToFut,
804    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
805    where
806        Host::Counterpart: HasPeer<Host::Counterpart>,
807        F: AsyncFnMut(
808                Req,
809                Responder<Req::Response>,
810                ConnectionTo<Host::Counterpart>,
811            ) -> Result<T, crate::Error>
812            + Send,
813        T: IntoHandled<(Req, Responder<Req::Response>)>,
814        ToFut: Fn(
815                &mut F,
816                Req,
817                Responder<Req::Response>,
818                ConnectionTo<Host::Counterpart>,
819            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
820            + Send
821            + Sync,
822    {
823        let handler = RequestHandler::new(
824            self.host.counterpart(),
825            self.host.counterpart(),
826            op,
827            to_future_hack,
828        );
829        self.with_handler(handler)
830    }
831
832    /// Register a handler for JSON-RPC notifications of type `Notif`.
833    ///
834    /// Notifications are fire-and-forget messages that don't expect a response.
835    /// Your handler receives:
836    /// 1. The notification (type `Notif`)
837    /// 2. A [`ConnectionTo<R>`] for sending messages to the other side
838    ///
839    /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
840    /// but you can still send your own requests and notifications using the context.
841    ///
842    /// # Example
843    ///
844    /// ```no_run
845    /// # use agent_client_protocol_test::*;
846    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
847    /// # let connection = mock_connection();
848    /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
849    ///     // Process the notification
850    ///     update_session_state(&notif)?;
851    ///
852    ///     // Optionally send a notification back
853    ///     cx.send_notification(StatusUpdate {
854    ///         message: "Acknowledged".into(),
855    ///     })?;
856    ///
857    ///     Ok(())
858    /// }, agent_client_protocol::on_receive_notification!())
859    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
860    /// # Ok(())
861    /// # }
862    /// ```
863    ///
864    /// # Type Parameter
865    ///
866    /// `Notif` can be either a single notification type or an enum of multiple notification types.
867    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
868    ///
869    /// # Ordering
870    ///
871    /// This callback runs inside the dispatch loop and blocks further message processing
872    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
873    /// ordering guarantees and how to avoid deadlocks.
874    pub fn on_receive_notification<Notif, F, T, ToFut>(
875        self,
876        op: F,
877        to_future_hack: ToFut,
878    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
879    where
880        Host::Counterpart: HasPeer<Host::Counterpart>,
881        Notif: JsonRpcNotification,
882        F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
883        T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
884        ToFut: Fn(
885                &mut F,
886                Notif,
887                ConnectionTo<Host::Counterpart>,
888            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
889            + Send
890            + Sync,
891    {
892        let handler = NotificationHandler::new(
893            self.host.counterpart(),
894            self.host.counterpart(),
895            op,
896            to_future_hack,
897        );
898        self.with_handler(handler)
899    }
900
901    /// Register a handler for messages from a specific peer.
902    ///
903    /// This is similar to [`on_receive_dispatch`](Self::on_receive_dispatch), but allows
904    /// specifying the source peer explicitly. This is useful when receiving messages
905    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
906    /// envelopes when receiving from an agent via a proxy).
907    ///
908    /// For the common case of receiving from the default counterpart, use
909    /// [`on_receive_dispatch`](Self::on_receive_dispatch) instead.
910    ///
911    /// # Ordering
912    ///
913    /// This callback runs inside the dispatch loop and blocks further message processing
914    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
915    /// ordering guarantees and how to avoid deadlocks.
916    pub fn on_receive_dispatch_from<
917        Req: JsonRpcRequest,
918        Notif: JsonRpcNotification,
919        Peer: Role,
920        F,
921        T,
922        ToFut,
923    >(
924        self,
925        peer: Peer,
926        op: F,
927        to_future_hack: ToFut,
928    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
929    where
930        Host::Counterpart: HasPeer<Peer>,
931        F: AsyncFnMut(
932                Dispatch<Req, Notif>,
933                ConnectionTo<Host::Counterpart>,
934            ) -> Result<T, crate::Error>
935            + Send,
936        T: IntoHandled<Dispatch<Req, Notif>>,
937        ToFut: Fn(
938                &mut F,
939                Dispatch<Req, Notif>,
940                ConnectionTo<Host::Counterpart>,
941            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
942            + Send
943            + Sync,
944    {
945        let handler = MessageHandler::new(self.host.counterpart(), peer, op, to_future_hack);
946        self.with_handler(handler)
947    }
948
949    /// Register a handler for JSON-RPC requests from a specific peer.
950    ///
951    /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
952    /// specifying the source peer explicitly. This is useful when receiving messages
953    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
954    /// envelopes when receiving from an agent via a proxy).
955    ///
956    /// For the common case of receiving from the default counterpart, use
957    /// [`on_receive_request`](Self::on_receive_request) instead.
958    ///
959    /// # Example
960    ///
961    /// ```ignore
962    /// use agent_client_protocol::Agent;
963    /// use agent_client_protocol::schema::InitializeRequest;
964    ///
965    /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
966    /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, responder, cx| {
967    ///     // Handle the request
968    ///     responder.respond(InitializeResponse::make())
969    /// })
970    /// ```
971    ///
972    /// # Ordering
973    ///
974    /// This callback runs inside the dispatch loop and blocks further message processing
975    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
976    /// ordering guarantees and how to avoid deadlocks.
977    pub fn on_receive_request_from<Req: JsonRpcRequest, Peer: Role, F, T, ToFut>(
978        self,
979        peer: Peer,
980        op: F,
981        to_future_hack: ToFut,
982    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
983    where
984        Host::Counterpart: HasPeer<Peer>,
985        F: AsyncFnMut(
986                Req,
987                Responder<Req::Response>,
988                ConnectionTo<Host::Counterpart>,
989            ) -> Result<T, crate::Error>
990            + Send,
991        T: IntoHandled<(Req, Responder<Req::Response>)>,
992        ToFut: Fn(
993                &mut F,
994                Req,
995                Responder<Req::Response>,
996                ConnectionTo<Host::Counterpart>,
997            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
998            + Send
999            + Sync,
1000    {
1001        let handler = RequestHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1002        self.with_handler(handler)
1003    }
1004
1005    /// Register a handler for JSON-RPC notifications from a specific peer.
1006    ///
1007    /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
1008    /// specifying the source peer explicitly. This is useful when receiving messages
1009    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
1010    /// envelopes when receiving from an agent via a proxy).
1011    ///
1012    /// For the common case of receiving from the default counterpart, use
1013    /// [`on_receive_notification`](Self::on_receive_notification) instead.
1014    ///
1015    /// # Ordering
1016    ///
1017    /// This callback runs inside the dispatch loop and blocks further message processing
1018    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1019    /// ordering guarantees and how to avoid deadlocks.
1020    pub fn on_receive_notification_from<Notif: JsonRpcNotification, Peer: Role, F, T, ToFut>(
1021        self,
1022        peer: Peer,
1023        op: F,
1024        to_future_hack: ToFut,
1025    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1026    where
1027        Host::Counterpart: HasPeer<Peer>,
1028        F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1029        T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1030        ToFut: Fn(
1031                &mut F,
1032                Notif,
1033                ConnectionTo<Host::Counterpart>,
1034            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1035            + Send
1036            + Sync,
1037    {
1038        let handler = NotificationHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1039        self.with_handler(handler)
1040    }
1041
1042    /// Add an MCP server that will be added to all new sessions that are proxied through this connection.
1043    ///
1044    /// Only applicable to proxies.
1045    pub fn with_mcp_server(
1046        self,
1047        mcp_server: McpServer<Host::Counterpart, impl RunWithConnectionTo<Host::Counterpart>>,
1048    ) -> Builder<
1049        Host,
1050        impl HandleDispatchFrom<Host::Counterpart>,
1051        impl RunWithConnectionTo<Host::Counterpart>,
1052    >
1053    where
1054        Host::Counterpart: HasPeer<Agent> + HasPeer<Client>,
1055    {
1056        let (handler, responder) = mcp_server.into_handler_and_responder();
1057        self.with_handler(handler).with_responder(responder)
1058    }
1059
1060    /// Run in server mode with the provided transport.
1061    ///
1062    /// This drives the connection by continuously processing messages from the transport
1063    /// and dispatching them to your registered handlers. The connection will run until:
1064    /// - The transport closes (e.g., EOF on byte streams)
1065    /// - An error occurs
1066    /// - One of your handlers returns an error
1067    ///
1068    /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
1069    /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1070    ///
1071    /// Use this mode when you only need to respond to incoming messages and don't need
1072    /// to initiate your own requests. If you need to send requests to the other side,
1073    /// use [`connect_with`](Self::connect_with) instead.
1074    ///
1075    /// # Example: Byte Stream Transport
1076    ///
1077    /// ```no_run
1078    /// # use agent_client_protocol::UntypedRole;
1079    /// # use agent_client_protocol::{Builder};
1080    /// # use agent_client_protocol::ByteStreams;
1081    /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1082    /// # use agent_client_protocol_test::*;
1083    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1084    /// let transport = ByteStreams::new(
1085    ///     tokio::io::stdout().compat_write(),
1086    ///     tokio::io::stdin().compat(),
1087    /// );
1088    ///
1089    /// UntypedRole.builder()
1090    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
1091    ///         responder.respond(MyResponse { status: "ok".into() })
1092    ///     }, agent_client_protocol::on_receive_request!())
1093    ///     .connect_to(transport)
1094    ///     .await?;
1095    /// # Ok(())
1096    /// # }
1097    /// ```
1098    pub async fn connect_to(
1099        self,
1100        transport: impl ConnectTo<Host> + 'static,
1101    ) -> Result<(), crate::Error> {
1102        self.connect_with(transport, async move |_cx| future::pending().await)
1103            .await
1104    }
1105
1106    /// Run the connection until the provided closure completes.
1107    ///
1108    /// This drives the connection by:
1109    /// 1. Running your registered handlers in the background to process incoming messages
1110    /// 2. Executing your `main_fn` closure with a [`ConnectionTo<R>`] for sending requests/notifications
1111    ///
1112    /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1113    /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1114    ///
1115    /// Use this mode when you need to initiate communication (send requests/notifications)
1116    /// in addition to responding to incoming messages. For server-only mode where you just
1117    /// respond to messages, use [`connect_to`](Self::connect_to) instead.
1118    ///
1119    /// # Example
1120    ///
1121    /// ```no_run
1122    /// # use agent_client_protocol::UntypedRole;
1123    /// # use agent_client_protocol::{Builder};
1124    /// # use agent_client_protocol::ByteStreams;
1125    /// # use agent_client_protocol::schema::InitializeRequest;
1126    /// # use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
1127    /// # use agent_client_protocol_test::*;
1128    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1129    /// let transport = ByteStreams::new(
1130    ///     tokio::io::stdout().compat_write(),
1131    ///     tokio::io::stdin().compat(),
1132    /// );
1133    ///
1134    /// UntypedRole.builder()
1135    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
1136    ///         // Handle incoming requests in the background
1137    ///         responder.respond(MyResponse { status: "ok".into() })
1138    ///     }, agent_client_protocol::on_receive_request!())
1139    ///     .connect_with(transport, async |cx| {
1140    ///         // Initialize the protocol
1141    ///         let init_response = cx.send_request(InitializeRequest::make())
1142    ///             .block_task()
1143    ///             .await?;
1144    ///
1145    ///         // Send more requests...
1146    ///         let result = cx.send_request(MyRequest {})
1147    ///             .block_task()
1148    ///             .await?;
1149    ///
1150    ///         // When this closure returns, the connection shuts down
1151    ///         Ok(())
1152    ///     })
1153    ///     .await?;
1154    /// # Ok(())
1155    /// # }
1156    /// ```
1157    ///
1158    /// # Parameters
1159    ///
1160    /// - `main_fn`: Your client logic. Receives a [`ConnectionTo<R>`] for sending messages.
1161    ///
1162    /// # Errors
1163    ///
1164    /// Returns an error if the connection closes before `main_fn` completes.
1165    pub async fn connect_with<R>(
1166        self,
1167        transport: impl ConnectTo<Host> + 'static,
1168        main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1169    ) -> Result<R, crate::Error> {
1170        let (_, future) = self.into_connection_and_future(transport, main_fn);
1171        future.await
1172    }
1173
    /// Helper that returns a [`ConnectionTo`] and a future that runs this connection until `main_fn` returns.
    ///
    /// The returned handle can be used to send messages on the connection; nothing is
    /// processed until the returned future is polled. The future is wrapped by
    /// `instrument_with_connection_name` using this builder's `name`.
    ///
    /// # Errors
    ///
    /// The future resolves to an error if spawning the transport failed or if any of the
    /// background actors fails (they are combined with `try_join!`).
    fn into_connection_and_future<R>(
        self,
        transport: impl ConnectTo<Host> + 'static,
        main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
    ) -> (
        ConnectionTo<Host::Counterpart>,
        impl Future<Output = Result<R, crate::Error>>,
    ) {
        let Self {
            name,
            handler,
            responder,
            host: me,
        } = self;

        // One channel per background actor. The sending halves live in the
        // `ConnectionTo` handle; the receiving halves are handed to the actors below.
        let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
        let (new_task_tx, new_task_rx) = mpsc::unbounded();
        let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
        let connection = ConnectionTo::new(
            me.counterpart(),
            outgoing_tx,
            new_task_tx,
            dynamic_handler_tx,
        );

        // Convert transport into server - this returns a channel for us to use
        // and a future that runs the transport
        let transport_component = crate::DynConnectTo::new(transport);
        let (transport_channel, transport_future) = transport_component.into_channel_and_future();
        // Queue the transport future as a task on this connection. The result is only
        // inspected inside the returned future, so this function itself stays infallible.
        let spawn_result = connection.spawn(transport_future);

        // Destructure the channel endpoints
        let Channel {
            rx: transport_incoming_rx,
            tx: transport_outgoing_tx,
        } = transport_channel;

        // Links the outgoing actor (sender side) to the incoming actor (receiver side).
        // NOTE(review): presumably carries `ReplyMessage` subscriptions so responses can be
        // matched to pending requests — confirm against the actor modules.
        let (reply_tx, reply_rx) = mpsc::unbounded();

        let future = crate::util::instrument_with_connection_name(name, {
            // The future owns its own handle; the original is returned to the caller.
            let connection = connection.clone();
            async move {
                // Surface a failure to queue the transport task before starting anything else.
                let () = spawn_result?;

                // All four actors run concurrently; `try_join!` stops at the first error.
                let background = async {
                    futures::try_join!(
                        // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
                        outgoing_actor::outgoing_protocol_actor(
                            outgoing_rx,
                            reply_tx.clone(),
                            transport_outgoing_tx,
                        ),
                        // Protocol layer: jsonrpcmsg::Message → handler/reply routing
                        incoming_actor::incoming_protocol_actor(
                            me.counterpart(),
                            &connection,
                            transport_incoming_rx,
                            dynamic_handler_rx,
                            reply_rx,
                            handler,
                        ),
                        // Runs tasks queued through `ConnectionTo::spawn`.
                        task_actor::task_actor(new_task_rx, &connection),
                        // Runs the builder's responder with its own connection handle.
                        responder.run_with_connection_to(connection.clone()),
                    )?;
                    Ok(())
                };

                // NOTE(review): `run_until` is defined elsewhere; per the docs on
                // `connect_with` it drives `background` until `main_fn` returns — confirm.
                crate::util::run_until(Box::pin(background), Box::pin(main_fn(connection.clone())))
                    .await
            }
        });

        (connection, future)
    }
1249}
1250
/// Lets a configured [`Builder`] be used wherever a `ConnectTo<R::Counterpart>` is expected.
///
/// Connecting simply serves the builder's handlers over the given `client`, by
/// forwarding to `Builder::connect_to`.
impl<R, H, Run> ConnectTo<R::Counterpart> for Builder<R, H, Run>
where
    R: Role,
    H: HandleDispatchFrom<R::Counterpart> + 'static,
    Run: RunWithConnectionTo<R::Counterpart> + 'static,
{
    async fn connect_to(self, client: impl ConnectTo<R>) -> Result<(), crate::Error> {
        // NOTE(review): this relies on the path `Builder::connect_to` resolving to the
        // inherent method (inherent items take priority over trait items), not to this
        // trait method — confirm the inherent impl's bounds are satisfied here.
        Builder::connect_to(self, client).await
    }
}
1261
/// The payload sent through the response oneshot channel.
///
/// Bundles the response value with an optional ack channel used for dispatch loop
/// synchronization.
pub(crate) struct ResponsePayload {
    /// The response result - either the JSON value or an error.
    pub(crate) result: Result<serde_json::Value, crate::Error>,

    /// Optional acknowledgment channel for dispatch loop synchronization.
    ///
    /// When present, the receiver of this payload must send on this channel to signal
    /// that response processing is complete, allowing the dispatch loop to continue
    /// to the next message.
    ///
    /// This is `None` for error paths where the response is sent directly
    /// (e.g., when the outgoing channel is broken) rather than through the
    /// normal dispatch loop flow.
    pub(crate) ack_tx: Option<oneshot::Sender<()>>,
}
1281
1282impl std::fmt::Debug for ResponsePayload {
1283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1284        f.debug_struct("ResponsePayload")
1285            .field("result", &self.result)
1286            .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1287            .finish()
1288    }
1289}
1290
/// Message sent to the incoming actor for reply subscription management.
enum ReplyMessage {
    /// Subscribe to receive a response for the given request id.
    ///
    /// When a response with this id arrives, it will be sent through the oneshot
    /// along with an ack channel that must be signaled when processing is complete.
    /// The method name is stored to allow routing responses through typed handlers.
    Subscribe {
        /// id of the request whose response is awaited
        id: jsonrpcmsg::Id,

        /// id of the peer this request was sent to
        role_id: RoleId,

        /// (original) method of the request -- the actual request may have been transformed
        /// to a successor method, but this will reflect the method of the wrapped request
        method: String,

        /// where to deliver the response payload when it arrives
        sender: oneshot::Sender<ResponsePayload>,
    },
}
1310
1311impl std::fmt::Debug for ReplyMessage {
1312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1313        match self {
1314            ReplyMessage::Subscribe { id, method, .. } => f
1315                .debug_struct("Subscribe")
1316                .field("id", id)
1317                .field("method", method)
1318                .finish(),
1319        }
1320    }
1321}
1322
/// Messages sent to be serialized over the transport.
#[derive(Debug)]
enum OutgoingMessage {
    /// Send a request to the server.
    Request {
        /// id assigned to this request (generated by sender)
        id: jsonrpcmsg::Id,

        /// the original method
        method: String,

        /// the peer we sent this to
        role_id: RoleId,

        /// the message to send; this may have a distinct method
        /// depending on the peer
        untyped: UntypedMessage,

        /// where to send the response when it arrives (includes ack channel)
        response_tx: oneshot::Sender<ResponsePayload>,
    },

    /// Send a notification to the server.
    Notification {
        /// the message to send; this may have a distinct method
        /// depending on the peer
        untyped: UntypedMessage,
    },

    /// Send a response to a message from the server.
    Response {
        /// id of the request being answered
        id: jsonrpcmsg::Id,

        /// the outcome to report: either the JSON result value or an error
        response: Result<serde_json::Value, crate::Error>,
    },

    /// Send a generalized error message
    Error {
        /// the error to report
        error: crate::Error,
    },
}
1362
/// Return type from a message handler; indicates whether the message was handled or not.
///
/// Handlers can also return `()`, which is converted to [`Handled::Yes`] via [`IntoHandled`].
#[must_use]
#[derive(Debug)]
pub enum Handled<T> {
    /// The message was handled
    Yes,

    /// The message was not handled; returns the original value.
    ///
    /// If `retry` is true, the message is additionally requested to be queued and
    /// retried against dynamic handlers as they are added.
    No {
        /// The message to be passed to subsequent handlers
        /// (typically the original message, but it may have been
        /// mutated.)
        message: T,

        /// If true, request the message to be queued and retried with
        /// dynamic handlers as they are added.
        ///
        /// This is used for managing session updates since the dynamic
        /// handler for a session cannot be added until the response to the
        /// new session request has been processed and there may be updates
        /// that get processed at the same time.
        retry: bool,
    },
}
1389
/// Trait for converting handler return values into [`Handled`].
///
/// This trait allows handlers to return either `()` (which becomes [`Handled::Yes`])
/// or an explicit `Handled<T>` value for more control over handler propagation.
///
/// `T` is the message type that a [`Handled::No`] result hands back.
pub trait IntoHandled<T> {
    /// Convert this value into a `Handled<T>`.
    fn into_handled(self) -> Handled<T>;
}
1398
/// A handler that returns `()` is treated as having handled the message.
impl<T> IntoHandled<T> for () {
    /// Always yields [`Handled::Yes`].
    fn into_handled(self) -> Handled<T> {
        Handled::Yes
    }
}
1404
/// An explicit [`Handled`] value is passed through unchanged.
impl<T> IntoHandled<T> for Handled<T> {
    /// Returns `self` as-is.
    fn into_handled(self) -> Handled<T> {
        self
    }
}
1410
/// Connection context for sending messages and spawning tasks.
///
/// This is the primary handle for interacting with the JSON-RPC connection from
/// within handler callbacks. You can use it to:
///
/// * Send requests and notifications to the other side
/// * Spawn concurrent tasks that run alongside the connection
/// * Respond to requests (via [`Responder`] which wraps this)
///
/// # Cloning
///
/// `ConnectionTo` is cheaply cloneable - all clones refer to the same underlying connection.
/// This makes it easy to share across async tasks.
///
/// # Event Loop and Concurrency
///
/// Handler callbacks run on the event loop, which means the connection cannot process new
/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
/// expensive or blocking work to concurrent tasks.
///
/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency) section
/// for more details.
#[derive(Clone, Debug)]
pub struct ConnectionTo<Counterpart: Role> {
    // The role on the other side of this connection; returned by `counterpart()`.
    counterpart: Counterpart,
    // Sending half of the channel carrying outgoing messages.
    message_tx: OutgoingMessageTx,
    // Sending half of the channel that `spawn` uses to queue new tasks.
    task_tx: TaskTx,
    // Sending half of the channel carrying dynamic handler messages.
    dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
}
1440
1441impl<Counterpart: Role> ConnectionTo<Counterpart> {
    /// Build a connection handle from the counterpart role and the sending halves of
    /// the outgoing-message, task, and dynamic-handler channels.
    ///
    /// The handle only stores these values; the receiving halves are consumed elsewhere.
    fn new(
        counterpart: Counterpart,
        message_tx: mpsc::UnboundedSender<OutgoingMessage>,
        task_tx: mpsc::UnboundedSender<Task>,
        dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
    ) -> Self {
        Self {
            counterpart,
            message_tx,
            task_tx,
            dynamic_handler_tx,
        }
    }
1455
1456    /// Return the counterpart role this connection is talking to.
1457    pub fn counterpart(&self) -> Counterpart {
1458        self.counterpart.clone()
1459    }
1460
1461    /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1462    ///
1463    /// This is the primary mechanism for offloading expensive work from handler callbacks
1464    /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1465    /// allowing the server to continue processing messages.
1466    ///
1467    /// # Event Loop
1468    ///
1469    /// Handler callbacks run on the event loop, which cannot process new messages while
1470    /// your handler is running. Use `spawn` for any expensive operations:
1471    ///
1472    /// ```no_run
1473    /// # use agent_client_protocol_test::*;
1474    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1475    /// # let connection = mock_connection();
1476    /// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1477    ///     // Clone cx for the spawned task
1478    ///     cx.spawn({
1479    ///         let connection = cx.clone();
1480    ///         async move {
1481    ///             let result = expensive_operation(&req.data).await?;
1482    ///             connection.send_notification(ProcessComplete { result })?;
1483    ///             Ok(())
1484    ///         }
1485    ///     })?;
1486    ///
1487    ///     // Respond immediately
1488    ///     responder.respond(ProcessResponse { result: "started".into() })
1489    /// }, agent_client_protocol::on_receive_request!())
1490    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1491    /// # Ok(())
1492    /// # }
1493    /// ```
1494    ///
1495    /// # Errors
1496    ///
1497    /// If the spawned task returns an error, the entire server will shut down.
1498    #[track_caller]
1499    pub fn spawn(
1500        &self,
1501        task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1502    ) -> Result<(), crate::Error> {
1503        let location = std::panic::Location::caller();
1504        let task = task.into_future();
1505        Task::new(location, task).spawn(&self.task_tx)
1506    }
1507
1508    /// Spawn a JSON-RPC connection in the background and return a [`ConnectionTo`] for sending messages to it.
1509    ///
1510    /// This is useful for creating multiple connections that communicate with each other,
1511    /// such as implementing proxy patterns or connecting to multiple backend services.
1512    ///
1513    /// # Arguments
1514    ///
1515    /// - `builder`: The connection builder with handlers configured
1516    /// - `transport`: The transport component to connect to
1517    ///
1518    /// # Returns
1519    ///
1520    /// A `ConnectionTo` that you can use to send requests and notifications to the spawned connection.
1521    ///
1522    /// # Example: Proxying to a backend connection
1523    ///
1524    /// ```
1525    /// # use agent_client_protocol::UntypedRole;
1526    /// # use agent_client_protocol::{Builder, ConnectionTo};
1527    /// # use agent_client_protocol_test::*;
1528    /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1529    /// // Set up a backend connection builder
1530    /// let backend = UntypedRole.builder()
1531    ///     .on_receive_request(async |req: MyRequest, responder, _cx| {
1532    ///         responder.respond(MyResponse { status: "ok".into() })
1533    ///     }, agent_client_protocol::on_receive_request!());
1534    ///
1535    /// // Spawn it and get a context to send requests to it
1536    /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
1537    ///
1538    /// // Now you can forward requests to the backend
1539    /// let response = backend_connection.send_request(MyRequest {}).block_task().await?;
1540    /// # Ok(())
1541    /// # }
1542    /// ```
1543    #[track_caller]
1544    pub fn spawn_connection<R: Role>(
1545        &self,
1546        builder: Builder<
1547            R,
1548            impl HandleDispatchFrom<R::Counterpart> + 'static,
1549            impl RunWithConnectionTo<R::Counterpart> + 'static,
1550        >,
1551        transport: impl ConnectTo<R> + 'static,
1552    ) -> Result<ConnectionTo<R::Counterpart>, crate::Error> {
1553        let (connection, future) =
1554            builder.into_connection_and_future(transport, |_| std::future::pending());
1555        Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
1556        Ok(connection)
1557    }
1558
1559    /// Send a request/notification and forward the response appropriately.
1560    ///
1561    /// The request context's response type matches the request's response type,
1562    /// enabling type-safe message forwarding.
1563    pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
1564        &self,
1565        message: Dispatch<Req, Notif>,
1566    ) -> Result<(), crate::Error>
1567    where
1568        Counterpart: HasPeer<Counterpart>,
1569    {
1570        self.send_proxied_message_to(self.counterpart(), message)
1571    }
1572
1573    /// Send a request/notification and forward the response appropriately.
1574    ///
1575    /// The request context's response type matches the request's response type,
1576    /// enabling type-safe message forwarding.
1577    pub fn send_proxied_message_to<
1578        Peer: Role,
1579        Req: JsonRpcRequest<Response: Send>,
1580        Notif: JsonRpcNotification,
1581    >(
1582        &self,
1583        peer: Peer,
1584        message: Dispatch<Req, Notif>,
1585    ) -> Result<(), crate::Error>
1586    where
1587        Counterpart: HasPeer<Peer>,
1588    {
1589        match message {
1590            Dispatch::Request(request, responder) => self
1591                .send_request_to(peer, request)
1592                .forward_response_to(responder),
1593            Dispatch::Notification(notification) => self.send_notification_to(peer, notification),
1594            Dispatch::Response(result, router) => {
1595                // Responses are forwarded directly to their destination
1596                router.respond_with_result(result)
1597            }
1598        }
1599    }
1600
1601    /// Send an outgoing request and return a [`SentRequest`] for handling the reply.
1602    ///
1603    /// The returned [`SentRequest`] provides methods for receiving the response without
1604    /// blocking the event loop:
1605    ///
1606    /// * [`on_receiving_result`](SentRequest::on_receiving_result) - Schedule
1607    ///   a callback to run when the response arrives (doesn't block the event loop)
1608    /// * [`block_task`](SentRequest::block_task) - Block the current task until the response
1609    ///   arrives (only safe in spawned tasks, not in handlers)
1610    ///
1611    /// # Anti-Footgun Design
1612    ///
1613    /// The API intentionally makes it difficult to block on the result directly to prevent
1614    /// the common mistake of blocking the event loop while waiting for a response:
1615    ///
1616    /// ```compile_fail
1617    /// # use agent_client_protocol_test::*;
1618    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1619    /// // ❌ This doesn't compile - prevents blocking the event loop
1620    /// let response = cx.send_request(MyRequest {}).await?;
1621    /// # Ok(())
1622    /// # }
1623    /// ```
1624    ///
1625    /// ```no_run
1626    /// # use agent_client_protocol_test::*;
1627    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1628    /// // ✅ Option 1: Schedule callback (safe in handlers)
1629    /// cx.send_request(MyRequest {})
1630    ///     .on_receiving_result(async |result| {
1631    ///         // Handle the response
1632    ///         Ok(())
1633    ///     })?;
1634    ///
1635    /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1636    /// cx.spawn({
1637    ///     let cx = cx.clone();
1638    ///     async move {
1639    ///         let response = cx.send_request(MyRequest {})
1640    ///             .block_task()
1641    ///             .await?;
1642    ///         // Process response...
1643    ///         Ok(())
1644    ///     }
1645    /// })?;
1646    /// # Ok(())
1647    /// # }
1648    /// ```
1649    /// Send an outgoing request to the default counterpart peer.
1650    ///
1651    /// This is a convenience method that sends to the counterpart role `R`.
1652    /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
1653    pub fn send_request<Req: JsonRpcRequest>(&self, request: Req) -> SentRequest<Req::Response>
1654    where
1655        Counterpart: HasPeer<Counterpart>,
1656    {
1657        self.send_request_to(self.counterpart.clone(), request)
1658    }
1659
1660    /// Send an outgoing request to a specific peer.
1661    ///
1662    /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1663    /// implementation before being sent.
1664    pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
1665        &self,
1666        peer: Peer,
1667        request: Req,
1668    ) -> SentRequest<Req::Response>
1669    where
1670        Counterpart: HasPeer<Peer>,
1671    {
1672        let method = request.method().to_string();
1673        let id = jsonrpcmsg::Id::String(uuid::Uuid::new_v4().to_string());
1674        let (response_tx, response_rx) = oneshot::channel();
1675        let role_id = peer.role_id();
1676        let remote_style = self.counterpart.remote_style(peer);
1677        match remote_style.transform_outgoing_message(request) {
1678            Ok(untyped) => {
1679                // Transform the message for the target role
1680                let message = OutgoingMessage::Request {
1681                    id: id.clone(),
1682                    method: method.clone(),
1683                    role_id,
1684                    untyped,
1685                    response_tx,
1686                };
1687
1688                match self.message_tx.unbounded_send(message) {
1689                    Ok(()) => (),
1690                    Err(error) => {
1691                        let OutgoingMessage::Request {
1692                            method,
1693                            response_tx,
1694                            ..
1695                        } = error.into_inner()
1696                        else {
1697                            unreachable!();
1698                        };
1699
1700                        response_tx
1701                            .send(ResponsePayload {
1702                                result: Err(crate::util::internal_error(format!(
1703                                    "failed to send outgoing request `{method}"
1704                                ))),
1705                                ack_tx: None,
1706                            })
1707                            .unwrap();
1708                    }
1709                }
1710            }
1711
1712            Err(err) => {
1713                response_tx
1714                    .send(ResponsePayload {
1715                        result: Err(crate::util::internal_error(format!(
1716                            "failed to create untyped request for `{method}`: {err}"
1717                        ))),
1718                        ack_tx: None,
1719                    })
1720                    .unwrap();
1721            }
1722        }
1723
1724        SentRequest::new(id, method.clone(), self.task_tx.clone(), response_rx)
1725            .map(move |json| <Req::Response>::from_value(&method, json))
1726    }
1727
1728    /// Send an outgoing notification to the default counterpart peer (no reply expected).
1729    ///
1730    /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1731    /// This method sends the notification immediately and returns.
1732    ///
1733    /// This is a convenience method that sends to the counterpart role `R`.
1734    /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
1735    ///
1736    /// ```no_run
1737    /// # use agent_client_protocol_test::*;
1738    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::Agent>) -> Result<(), agent_client_protocol::Error> {
1739    /// cx.send_notification(StatusUpdate {
1740    ///     message: "Processing...".into(),
1741    /// })?;
1742    /// # Ok(())
1743    /// # }
1744    /// ```
1745    pub fn send_notification<N: JsonRpcNotification>(
1746        &self,
1747        notification: N,
1748    ) -> Result<(), crate::Error>
1749    where
1750        Counterpart: HasPeer<Counterpart>,
1751    {
1752        self.send_notification_to(self.counterpart.clone(), notification)
1753    }
1754
1755    /// Send an outgoing notification to a specific peer (no reply expected).
1756    ///
1757    /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1758    /// implementation before being sent.
1759    pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
1760        &self,
1761        peer: Peer,
1762        notification: N,
1763    ) -> Result<(), crate::Error>
1764    where
1765        Counterpart: HasPeer<Peer>,
1766    {
1767        let remote_style = self.counterpart.remote_style(peer);
1768        tracing::debug!(
1769            role = std::any::type_name::<Counterpart>(),
1770            peer = std::any::type_name::<Peer>(),
1771            notification_type = std::any::type_name::<N>(),
1772            ?remote_style,
1773            original_method = notification.method(),
1774            "send_notification_to"
1775        );
1776        let transformed = remote_style.transform_outgoing_message(notification)?;
1777        tracing::debug!(
1778            transformed_method = %transformed.method,
1779            "send_notification_to transformed"
1780        );
1781        send_raw_message(
1782            &self.message_tx,
1783            OutgoingMessage::Notification {
1784                untyped: transformed,
1785            },
1786        )
1787    }
1788
1789    /// Send an error notification (no reply expected).
1790    pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1791        send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
1792    }
1793
1794    /// Register a dynamic message handler, used to intercept messages specific to a particular session
1795    /// or some similar modal thing.
1796    ///
1797    /// Dynamic message handlers are called first for every incoming message.
1798    ///
1799    /// If they decline to handle the message, then the message is passed to the regular registered handlers.
1800    ///
1801    /// The handler will stay registered until the returned registration guard is dropped.
1802    pub fn add_dynamic_handler(
1803        &self,
1804        handler: impl HandleDispatchFrom<Counterpart> + 'static,
1805    ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error> {
1806        let uuid = Uuid::new_v4();
1807        self.dynamic_handler_tx
1808            .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
1809                uuid,
1810                Box::new(handler),
1811            ))
1812            .map_err(crate::util::internal_error)?;
1813
1814        Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
1815    }
1816
1817    fn remove_dynamic_handler(&self, uuid: Uuid) {
1818        // Ignore errors
1819        drop(
1820            self.dynamic_handler_tx
1821                .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid)),
1822        );
1823    }
1824}
1825
1826#[derive(Clone, Debug)]
1827pub struct DynamicHandlerRegistration<R: Role> {
1828    uuid: Uuid,
1829    cx: ConnectionTo<R>,
1830}
1831
1832impl<R: Role> DynamicHandlerRegistration<R> {
1833    fn new(uuid: Uuid, cx: ConnectionTo<R>) -> Self {
1834        Self { uuid, cx }
1835    }
1836
1837    /// Prevents the dynamic handler from being removed when dropped.
1838    pub fn run_indefinitely(self) {
1839        std::mem::forget(self);
1840    }
1841}
1842
1843impl<R: Role> Drop for DynamicHandlerRegistration<R> {
1844    fn drop(&mut self) {
1845        self.cx.remove_dynamic_handler(self.uuid);
1846    }
1847}
1848
1849/// The context to respond to an incoming request.
1850///
1851/// This context is provided to request handlers and serves a dual role:
1852///
1853/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
1854///    [`respond_with_result`](Self::respond_with_result) to send the response
1855/// 2. **Send other messages** - Use the [`ConnectionTo`] parameter passed to your
1856///    handler, which provides [`send_request`](`ConnectionTo::send_request`),
1857///    [`send_notification`](`ConnectionTo::send_notification`), and
1858///    [`spawn`](`ConnectionTo::spawn`)
1859///
1860/// # Example
1861///
1862/// ```no_run
1863/// # use agent_client_protocol_test::*;
1864/// # async fn example() -> Result<(), agent_client_protocol::Error> {
1865/// # let connection = mock_connection();
1866/// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1867///     // Send a notification while processing
1868///     cx.send_notification(StatusUpdate {
1869///         message: "processing".into(),
1870///     })?;
1871///
1872///     // Do some work...
1873///     let result = process(&req.data)?;
1874///
1875///     // Respond to the request
1876///     responder.respond(ProcessResponse { result })
1877/// }, agent_client_protocol::on_receive_request!())
1878/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1879/// # Ok(())
1880/// # }
1881/// ```
1882///
1883/// # Event Loop Considerations
1884///
1885/// Like all handlers, request handlers run on the event loop. Use
1886/// [`spawn`](ConnectionTo::spawn) for expensive operations to avoid blocking
1887/// the connection.
1888///
1889/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency)
1890/// section for more details.
1891#[must_use]
1892pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
1893    /// The method of the request.
1894    method: String,
1895
1896    /// The `id` of the message we are replying to.
1897    id: jsonrpcmsg::Id,
1898
1899    /// Function to send the response to its destination.
1900    ///
1901    /// For incoming requests: serializes to JSON and sends over the wire.
1902    /// For incoming responses: sends to the waiting oneshot channel.
1903    send_fn: SendBoxFnOnce<'static, (Result<T, crate::Error>,), Result<(), crate::Error>>,
1904}
1905
1906impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
1907    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1908        f.debug_struct("Responder")
1909            .field("method", &self.method)
1910            .field("id", &self.id)
1911            .field("response_type", &std::any::type_name::<T>())
1912            .finish_non_exhaustive()
1913    }
1914}
1915
1916impl Responder<serde_json::Value> {
1917    /// Create a new request context for an incoming request.
1918    ///
1919    /// The response will be serialized to JSON and sent over the wire.
1920    fn new(message_tx: OutgoingMessageTx, method: String, id: jsonrpcmsg::Id) -> Self {
1921        let id_clone = id.clone();
1922        Self {
1923            method,
1924            id,
1925            send_fn: SendBoxFnOnce::new(
1926                move |response: Result<serde_json::Value, crate::Error>| {
1927                    send_raw_message(
1928                        &message_tx,
1929                        OutgoingMessage::Response {
1930                            id: id_clone,
1931                            response,
1932                        },
1933                    )
1934                },
1935            ),
1936        }
1937    }
1938
1939    /// Cast this request context to a different response type.
1940    ///
1941    /// The provided type `T` will be serialized to JSON before sending.
1942    pub fn cast<T: JsonRpcResponse>(self) -> Responder<T> {
1943        self.wrap_params(move |method, value| match value {
1944            Ok(value) => T::into_json(value, method),
1945            Err(e) => Err(e),
1946        })
1947    }
1948}
1949
1950impl<T: JsonRpcResponse> Responder<T> {
1951    /// Method of the incoming request
1952    #[must_use]
1953    pub fn method(&self) -> &str {
1954        &self.method
1955    }
1956
1957    /// ID of the incoming request/response as a JSON value
1958    #[must_use]
1959    pub fn id(&self) -> serde_json::Value {
1960        crate::util::id_to_json(&self.id)
1961    }
1962
1963    /// Convert to a `Responder` that expects a JSON value
1964    /// and which checks (dynamically) that the JSON value it receives
1965    /// can be converted to `T`.
1966    pub fn erase_to_json(self) -> Responder<serde_json::Value> {
1967        self.wrap_params(|method, value| T::from_value(method, value?))
1968    }
1969
1970    /// Return a new Responder with a different method name.
1971    pub fn wrap_method(self, method: String) -> Responder<T> {
1972        Responder {
1973            method,
1974            id: self.id,
1975            send_fn: self.send_fn,
1976        }
1977    }
1978
1979    /// Return a new Responder that expects a response of type U.
1980    ///
1981    /// `wrap_fn` will be invoked with the method name and the result to transform
1982    /// type `U` into type `T` before sending.
1983    pub fn wrap_params<U: JsonRpcResponse>(
1984        self,
1985        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
1986    ) -> Responder<U> {
1987        let method = self.method.clone();
1988        Responder {
1989            method: self.method,
1990            id: self.id,
1991            send_fn: SendBoxFnOnce::new(move |input: Result<U, crate::Error>| {
1992                let t_value = wrap_fn(&method, input);
1993                self.send_fn.call(t_value)
1994            }),
1995        }
1996    }
1997
1998    /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
1999    pub fn respond_with_result(
2000        self,
2001        response: Result<T, crate::Error>,
2002    ) -> Result<(), crate::Error> {
2003        tracing::debug!(id = ?self.id, "respond called");
2004        self.send_fn.call(response)
2005    }
2006
2007    /// Respond to the JSON-RPC request with a value.
2008    pub fn respond(self, response: T) -> Result<(), crate::Error> {
2009        self.respond_with_result(Ok(response))
2010    }
2011
2012    /// Respond to the JSON-RPC request with an internal error containing a message.
2013    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2014        self.respond_with_error(crate::util::internal_error(message))
2015    }
2016
2017    /// Respond to the JSON-RPC request with an error.
2018    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2019        tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2020        self.respond_with_result(Err(error))
2021    }
2022}
2023
2024/// Context for handling an incoming JSON-RPC response.
2025///
2026/// This is the response-side counterpart to [`Responder`]. While `Responder` handles
2027/// incoming requests (where you send a response over the wire), `ResponseRouter` handles
2028/// incoming responses (where you route the response to a local task waiting for it).
2029///
2030/// Both are fundamentally "sinks" that push the message through a `send_fn`, but they
2031/// represent different points in the message lifecycle and carry different metadata.
2032#[must_use]
2033pub struct ResponseRouter<T: JsonRpcResponse = serde_json::Value> {
2034    /// The method of the original request.
2035    method: String,
2036
2037    /// The `id` of the original request.
2038    id: jsonrpcmsg::Id,
2039
2040    /// The RoleId to which the original request was sent
2041    /// (and hence from which the reply is expected).
2042    role_id: RoleId,
2043
2044    /// Function to send the response to the waiting task.
2045    send_fn: SendBoxFnOnce<'static, (Result<T, crate::Error>,), Result<(), crate::Error>>,
2046}
2047
2048impl<T: JsonRpcResponse> std::fmt::Debug for ResponseRouter<T> {
2049    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2050        f.debug_struct("ResponseRouter")
2051            .field("method", &self.method)
2052            .field("id", &self.id)
2053            .field("response_type", &std::any::type_name::<T>())
2054            .finish_non_exhaustive()
2055    }
2056}
2057
2058impl ResponseRouter<serde_json::Value> {
2059    /// Create a new response context for routing a response to a local awaiter.
2060    ///
2061    /// When `respond_with_result` is called, the response is sent through the oneshot
2062    /// channel to the code that originally sent the request.
2063    pub(crate) fn new(
2064        method: String,
2065        id: jsonrpcmsg::Id,
2066        role_id: RoleId,
2067        sender: oneshot::Sender<ResponsePayload>,
2068    ) -> Self {
2069        Self {
2070            method,
2071            id,
2072            role_id,
2073            send_fn: SendBoxFnOnce::new(
2074                move |response: Result<serde_json::Value, crate::Error>| {
2075                    sender
2076                        .send(ResponsePayload {
2077                            result: response,
2078                            ack_tx: None,
2079                        })
2080                        .map_err(|_| {
2081                            crate::util::internal_error("failed to send response, receiver dropped")
2082                        })
2083                },
2084            ),
2085        }
2086    }
2087
2088    /// Cast this response context to a different response type.
2089    ///
2090    /// The provided type `T` will be serialized to JSON before sending.
2091    pub fn cast<T: JsonRpcResponse>(self) -> ResponseRouter<T> {
2092        self.wrap_params(move |method, value| match value {
2093            Ok(value) => T::into_json(value, method),
2094            Err(e) => Err(e),
2095        })
2096    }
2097}
2098
2099impl<T: JsonRpcResponse> ResponseRouter<T> {
2100    /// Method of the original request
2101    #[must_use]
2102    pub fn method(&self) -> &str {
2103        &self.method
2104    }
2105
2106    /// ID of the original request as a JSON value
2107    #[must_use]
2108    pub fn id(&self) -> serde_json::Value {
2109        crate::util::id_to_json(&self.id)
2110    }
2111
2112    /// The peer to which the original request was sent.
2113    ///
2114    /// This is the peer from which we expect to receive the response.
2115    #[must_use]
2116    pub fn role_id(&self) -> RoleId {
2117        self.role_id.clone()
2118    }
2119
2120    /// Convert to a `ResponseRouter` that expects a JSON value
2121    /// and which checks (dynamically) that the JSON value it receives
2122    /// can be converted to `T`.
2123    pub fn erase_to_json(self) -> ResponseRouter<serde_json::Value> {
2124        self.wrap_params(|method, value| T::from_value(method, value?))
2125    }
2126
2127    /// Return a new ResponseRouter that expects a response of type U.
2128    ///
2129    /// `wrap_fn` will be invoked with the method name and the result to transform
2130    /// type `U` into type `T` before sending.
2131    fn wrap_params<U: JsonRpcResponse>(
2132        self,
2133        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2134    ) -> ResponseRouter<U> {
2135        let method = self.method.clone();
2136        ResponseRouter {
2137            method: self.method,
2138            id: self.id,
2139            role_id: self.role_id,
2140            send_fn: SendBoxFnOnce::new(move |input: Result<U, crate::Error>| {
2141                let t_value = wrap_fn(&method, input);
2142                self.send_fn.call(t_value)
2143            }),
2144        }
2145    }
2146
2147    /// Complete the response by sending the result to the waiting task.
2148    pub fn respond_with_result(
2149        self,
2150        response: Result<T, crate::Error>,
2151    ) -> Result<(), crate::Error> {
2152        tracing::debug!(id = ?self.id, "response routed to awaiter");
2153        self.send_fn.call(response)
2154    }
2155
2156    /// Complete the response by sending a value to the waiting task.
2157    pub fn respond(self, response: T) -> Result<(), crate::Error> {
2158        self.respond_with_result(Ok(response))
2159    }
2160
2161    /// Complete the response by sending an internal error to the waiting task.
2162    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2163        self.respond_with_error(crate::util::internal_error(message))
2164    }
2165
2166    /// Complete the response by sending an error to the waiting task.
2167    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2168        tracing::debug!(id = ?self.id, ?error, "error routed to awaiter");
2169        self.respond_with_result(Err(error))
2170    }
2171}
2172
/// Common bounds for any JSON-RPC message.
///
/// Implementors must be `'static`, `Debug`, `Sized`, `Send`, and `Clone`.
///
/// # Derive Macro
///
/// For simple message types, you can use the `JsonRpcRequest` or `JsonRpcNotification` derive macros
/// which will implement both `JsonRpcMessage` and the respective trait. See [`JsonRpcRequest`] and
/// [`JsonRpcNotification`] for examples.
pub trait JsonRpcMessage: 'static + Debug + Sized + Send + Clone {
    /// Check if this message type matches the given method name.
    fn matches_method(method: &str) -> bool;

    /// The method name for the message.
    fn method(&self) -> &str;

    /// Convert this message into an untyped message.
    ///
    /// # Errors
    ///
    /// Returns an error if the conversion fails.
    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;

    /// Parse this type from a method name and parameters.
    ///
    /// # Errors
    ///
    /// Returns an error if the method doesn't match or deserialization fails.
    /// Callers should use `matches_method` first to check if this type handles the method.
    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error>;
}
2196
/// Defines the "payload" of a successful response to a JSON-RPC request.
///
/// Implementors must be `'static`, `Debug`, `Sized`, `Send`, and `Clone`.
///
/// # Derive Macro
///
/// Use `#[derive(JsonRpcResponse)]` to automatically implement this trait:
///
/// ```ignore
/// use agent_client_protocol::JsonRpcResponse;
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcResponse)]
/// #[response(method = "_hello")]
/// struct HelloResponse {
///     greeting: String,
/// }
/// ```
pub trait JsonRpcResponse: 'static + Debug + Sized + Send + Clone {
    /// Convert this message into a JSON value.
    ///
    /// `method` is the method name of the request being answered.
    fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;

    /// Parse a JSON value into the response type.
    ///
    /// `method` is the method name of the request being answered.
    fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
}
2220
2221impl JsonRpcResponse for serde_json::Value {
2222    fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
2223        Ok(value)
2224    }
2225
2226    fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
2227        Ok(self)
2228    }
2229}
2230
/// A struct that represents a notification (JSON-RPC message that does not expect a response).
///
/// This is a marker trait: it adds no methods beyond those of [`JsonRpcMessage`].
///
/// # Derive Macro
///
/// Use `#[derive(JsonRpcNotification)]` to automatically implement both `JsonRpcMessage` and `JsonRpcNotification`:
///
/// ```ignore
/// use agent_client_protocol::JsonRpcNotification;
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcNotification)]
/// #[notification(method = "_ping")]
/// struct PingNotification {
///     timestamp: u64,
/// }
/// ```
pub trait JsonRpcNotification: JsonRpcMessage {}
2248
/// A struct that represents a request (JSON-RPC message expecting a response).
///
/// # Derive Macro
///
/// Use `#[derive(JsonRpcRequest)]` to automatically implement both `JsonRpcMessage` and `JsonRpcRequest`:
///
/// ```ignore
/// use agent_client_protocol::{JsonRpcRequest, JsonRpcResponse};
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcRequest)]
/// #[request(method = "_hello", response = HelloResponse)]
/// struct HelloRequest {
///     name: String,
/// }
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcResponse)]
/// struct HelloResponse {
///     greeting: String,
/// }
/// ```
pub trait JsonRpcRequest: JsonRpcMessage {
    /// The type of data expected in response.
    type Response: JsonRpcResponse;
}
2274
/// An enum capturing an in-flight request, notification, or response.
/// In the case of a request, also includes the context used to respond to the request;
/// in the case of a response, also includes the router used to deliver it.
///
/// Type parameters allow specifying the concrete request and notification types.
/// By default, both are `UntypedMessage` for dynamic dispatch.
/// The request context's response type matches the request's response type.
#[derive(Debug)]
pub enum Dispatch<Req: JsonRpcRequest = UntypedMessage, Notif: JsonRpcMessage = UntypedMessage> {
    /// Incoming request and the context where the response should be sent.
    Request(Req, Responder<Req::Response>),

    /// Incoming notification.
    Notification(Notif),

    /// Incoming response to a request we sent.
    ///
    /// The first field is the response result (success or error from the remote).
    /// The second field is the context for forwarding the response to its destination
    /// (typically a waiting oneshot channel).
    Response(
        Result<Req::Response, crate::Error>,
        ResponseRouter<Req::Response>,
    ),
}
2299
2300impl<Req: JsonRpcRequest, Notif: JsonRpcMessage> Dispatch<Req, Notif> {
2301    /// Map the request and notification types to new types.
2302    ///
2303    /// Note: Response variants are passed through unchanged since they don't
2304    /// contain a parseable message payload.
2305    pub fn map<Req1, Notif1>(
2306        self,
2307        map_request: impl FnOnce(Req, Responder<Req::Response>) -> (Req1, Responder<Req1::Response>),
2308        map_notification: impl FnOnce(Notif) -> Notif1,
2309    ) -> Dispatch<Req1, Notif1>
2310    where
2311        Req1: JsonRpcRequest<Response = Req::Response>,
2312        Notif1: JsonRpcMessage,
2313    {
2314        match self {
2315            Dispatch::Request(request, responder) => {
2316                let (new_request, new_responder) = map_request(request, responder);
2317                Dispatch::Request(new_request, new_responder)
2318            }
2319            Dispatch::Notification(notification) => {
2320                let new_notification = map_notification(notification);
2321                Dispatch::Notification(new_notification)
2322            }
2323            Dispatch::Response(result, router) => Dispatch::Response(result, router),
2324        }
2325    }
2326
2327    /// Respond to the message with an error.
2328    ///
2329    /// If this message is a request, this error becomes the reply to the request.
2330    ///
2331    /// If this message is a notification, the error is sent as a notification.
2332    ///
2333    /// If this message is a response, the error is forwarded to the waiting handler.
2334    pub fn respond_with_error<R: Role>(
2335        self,
2336        error: crate::Error,
2337        cx: ConnectionTo<R>,
2338    ) -> Result<(), crate::Error> {
2339        match self {
2340            Dispatch::Request(_, responder) => responder.respond_with_error(error),
2341            Dispatch::Notification(_) => cx.send_error_notification(error),
2342            Dispatch::Response(_, responder) => responder.respond_with_error(error),
2343        }
2344    }
2345
2346    /// Convert to a `Responder` that expects a JSON value
2347    /// and which checks (dynamically) that the JSON value it receives
2348    /// can be converted to `T`.
2349    ///
2350    /// Note: Response variants cannot be erased since their payload is already
2351    /// parsed. This returns an error for Response variants.
2352    pub fn erase_to_json(self) -> Result<Dispatch, crate::Error> {
2353        match self {
2354            Dispatch::Request(response, responder) => Ok(Dispatch::Request(
2355                response.to_untyped_message()?,
2356                responder.erase_to_json(),
2357            )),
2358            Dispatch::Notification(notification) => {
2359                Ok(Dispatch::Notification(notification.to_untyped_message()?))
2360            }
2361            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2362                "cannot erase Response variant to JSON",
2363            )),
2364        }
2365    }
2366
2367    /// Convert the message in self to an untyped message.
2368    ///
2369    /// Note: Response variants don't have an untyped message representation.
2370    /// This returns an error for Response variants.
2371    pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2372        match self {
2373            Dispatch::Request(request, _) => request.to_untyped_message(),
2374            Dispatch::Notification(notification) => notification.to_untyped_message(),
2375            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2376                "Response variant has no untyped message representation",
2377            )),
2378        }
2379    }
2380
2381    /// Convert self to an untyped message context.
2382    ///
2383    /// Note: Response variants cannot be converted. This returns an error for Response variants.
2384    pub fn into_untyped_dispatch(self) -> Result<Dispatch, crate::Error> {
2385        match self {
2386            Dispatch::Request(request, responder) => Ok(Dispatch::Request(
2387                request.to_untyped_message()?,
2388                responder.erase_to_json(),
2389            )),
2390            Dispatch::Notification(notification) => {
2391                Ok(Dispatch::Notification(notification.to_untyped_message()?))
2392            }
2393            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2394                "cannot convert Response variant to untyped message context",
2395            )),
2396        }
2397    }
2398
2399    /// Returns the request ID if this is a request or response, None if notification.
2400    pub fn id(&self) -> Option<serde_json::Value> {
2401        match self {
2402            Dispatch::Request(_, cx) => Some(cx.id()),
2403            Dispatch::Notification(_) => None,
2404            Dispatch::Response(_, cx) => Some(cx.id()),
2405        }
2406    }
2407
2408    /// Returns the method of the message.
2409    ///
2410    /// For requests and notifications, this is the method from the message payload.
2411    /// For responses, this is the method of the original request.
2412    pub fn method(&self) -> &str {
2413        match self {
2414            Dispatch::Request(msg, _) => msg.method(),
2415            Dispatch::Notification(msg) => msg.method(),
2416            Dispatch::Response(_, cx) => cx.method(),
2417        }
2418    }
2419}
2420
2421impl Dispatch {
2422    /// Attempts to parse `self` into a typed message context.
2423    ///
2424    /// # Returns
2425    ///
2426    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2427    /// * `Ok(Err(self))` if not
2428    /// * `Err` if has the correct method for the given types but parsing fails
2429    #[tracing::instrument(skip(self), fields(Request = ?std::any::type_name::<Req>(), Notif = ?std::any::type_name::<Notif>()), level = "trace", ret)]
2430    pub(crate) fn into_typed_dispatch<Req: JsonRpcRequest, Notif: JsonRpcNotification>(
2431        self,
2432    ) -> Result<Result<Dispatch<Req, Notif>, Dispatch>, crate::Error> {
2433        tracing::debug!(
2434            message = ?self,
2435            "into_typed_dispatch"
2436        );
2437        match self {
2438            Dispatch::Request(message, responder) => {
2439                if Req::matches_method(&message.method) {
2440                    match Req::parse_message(&message.method, &message.params) {
2441                        Ok(req) => {
2442                            tracing::trace!(?req, "parsed ok");
2443                            Ok(Ok(Dispatch::Request(req, responder.cast())))
2444                        }
2445                        Err(err) => {
2446                            tracing::trace!(?err, "parse error");
2447                            Err(err)
2448                        }
2449                    }
2450                } else {
2451                    tracing::trace!("method doesn't match");
2452                    Ok(Err(Dispatch::Request(message, responder)))
2453                }
2454            }
2455
2456            Dispatch::Notification(message) => {
2457                if Notif::matches_method(&message.method) {
2458                    match Notif::parse_message(&message.method, &message.params) {
2459                        Ok(notif) => {
2460                            tracing::trace!(?notif, "parse ok");
2461                            Ok(Ok(Dispatch::Notification(notif)))
2462                        }
2463                        Err(err) => {
2464                            tracing::trace!(?err, "parse error");
2465                            Err(err)
2466                        }
2467                    }
2468                } else {
2469                    tracing::trace!("method doesn't match");
2470                    Ok(Err(Dispatch::Notification(message)))
2471                }
2472            }
2473
2474            Dispatch::Response(result, cx) => {
2475                let method = cx.method();
2476                if Req::matches_method(method) {
2477                    // Parse the response result
2478                    let typed_result = match result {
2479                        Ok(value) => {
2480                            match <Req::Response as JsonRpcResponse>::from_value(method, value) {
2481                                Ok(parsed) => {
2482                                    tracing::trace!(?parsed, "parse ok");
2483                                    Ok(parsed)
2484                                }
2485                                Err(err) => {
2486                                    tracing::trace!(?err, "parse error");
2487                                    return Err(err);
2488                                }
2489                            }
2490                        }
2491                        Err(err) => {
2492                            tracing::trace!("error, passthrough");
2493                            Err(err)
2494                        }
2495                    };
2496                    Ok(Ok(Dispatch::Response(typed_result, cx.cast())))
2497                } else {
2498                    tracing::trace!("method doesn't match");
2499                    Ok(Err(Dispatch::Response(result, cx)))
2500                }
2501            }
2502        }
2503    }
2504
2505    /// True if this message has a field with the given name.
2506    ///
2507    /// Returns `false` for Response variants.
2508    #[must_use]
2509    pub fn has_field(&self, field_name: &str) -> bool {
2510        self.message()
2511            .and_then(|m| m.params().get(field_name))
2512            .is_some()
2513    }
2514
2515    /// Returns true if this message has a session-id field.
2516    ///
2517    /// Returns `false` for Response variants.
2518    pub(crate) fn has_session_id(&self) -> bool {
2519        self.has_field("sessionId")
2520    }
2521
2522    /// Extract the ACP session-id from this message (if any).
2523    ///
2524    /// Returns `Ok(None)` for Response variants.
2525    pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
2526        let Some(message) = self.message() else {
2527            return Ok(None);
2528        };
2529        let Some(value) = message.params().get("sessionId") else {
2530            return Ok(None);
2531        };
2532        let session_id = serde_json::from_value(value.clone())?;
2533        Ok(Some(session_id))
2534    }
2535
2536    /// Try to parse this as a notification of the given type.
2537    ///
2538    /// # Returns
2539    ///
2540    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2541    /// * `Ok(Err(self))` if not
2542    /// * `Err` if has the correct method for the given types but parsing fails
2543    pub fn into_notification<N: JsonRpcNotification>(
2544        self,
2545    ) -> Result<Result<N, Dispatch>, crate::Error> {
2546        match self {
2547            Dispatch::Notification(msg) => {
2548                if !N::matches_method(&msg.method) {
2549                    return Ok(Err(Dispatch::Notification(msg)));
2550                }
2551                match N::parse_message(&msg.method, &msg.params) {
2552                    Ok(n) => Ok(Ok(n)),
2553                    Err(err) => Err(err),
2554                }
2555            }
2556            Dispatch::Request(..) | Dispatch::Response(..) => Ok(Err(self)),
2557        }
2558    }
2559
2560    /// Try to parse this as a request of the given type.
2561    ///
2562    /// # Returns
2563    ///
2564    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2565    /// * `Ok(Err(self))` if not
2566    /// * `Err` if has the correct method for the given types but parsing fails
2567    pub fn into_request<Req: JsonRpcRequest>(
2568        self,
2569    ) -> Result<Result<(Req, Responder<Req::Response>), Dispatch>, crate::Error> {
2570        match self {
2571            Dispatch::Request(msg, responder) => {
2572                if !Req::matches_method(&msg.method) {
2573                    return Ok(Err(Dispatch::Request(msg, responder)));
2574                }
2575                match Req::parse_message(&msg.method, &msg.params) {
2576                    Ok(req) => Ok(Ok((req, responder.cast()))),
2577                    Err(err) => Err(err),
2578                }
2579            }
2580            Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Err(self)),
2581        }
2582    }
2583}
2584
2585impl<M: JsonRpcRequest + JsonRpcNotification> Dispatch<M, M> {
2586    /// Returns the message payload for requests and notifications.
2587    ///
2588    /// Returns `None` for Response variants since they don't contain a message payload.
2589    pub fn message(&self) -> Option<&M> {
2590        match self {
2591            Dispatch::Request(msg, _) | Dispatch::Notification(msg) => Some(msg),
2592            Dispatch::Response(_, _) => None,
2593        }
2594    }
2595
2596    /// Map the request/notification message.
2597    ///
2598    /// Response variants pass through unchanged.
2599    pub(crate) fn try_map_message(
2600        self,
2601        map_message: impl FnOnce(M) -> Result<M, crate::Error>,
2602    ) -> Result<Dispatch<M, M>, crate::Error> {
2603        match self {
2604            Dispatch::Request(request, cx) => Ok(Dispatch::Request(map_message(request)?, cx)),
2605            Dispatch::Notification(notification) => {
2606                Ok(Dispatch::<M, M>::Notification(map_message(notification)?))
2607            }
2608            Dispatch::Response(result, cx) => Ok(Dispatch::Response(result, cx)),
2609        }
2610    }
2611}
2612
/// An incoming JSON message without any typing. Can be a request or a notification.
///
/// Only the method name and the raw parameters are stored; no request id is kept here.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UntypedMessage {
    /// The JSON-RPC method name
    pub method: String,
    /// The JSON-RPC parameters as a raw JSON value
    pub params: serde_json::Value,
}
2621
2622impl UntypedMessage {
2623    /// Returns an untyped message with the given method and parameters.
2624    pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
2625        let params = serde_json::to_value(params)?;
2626        Ok(Self {
2627            method: method.to_string(),
2628            params,
2629        })
2630    }
2631
2632    /// Returns the method name
2633    #[must_use]
2634    pub fn method(&self) -> &str {
2635        &self.method
2636    }
2637
2638    /// Returns the parameters as a JSON value
2639    #[must_use]
2640    pub fn params(&self) -> &serde_json::Value {
2641        &self.params
2642    }
2643
2644    /// Consumes this message and returns the method and params
2645    #[must_use]
2646    pub fn into_parts(self) -> (String, serde_json::Value) {
2647        (self.method, self.params)
2648    }
2649
2650    /// Convert `self` to a JSON-RPC message.
2651    pub(crate) fn into_jsonrpc_msg(
2652        self,
2653        id: Option<jsonrpcmsg::Id>,
2654    ) -> Result<jsonrpcmsg::Request, crate::Error> {
2655        let Self { method, params } = self;
2656        Ok(jsonrpcmsg::Request::new_v2(method, json_cast(params)?, id))
2657    }
2658}
2659
2660impl JsonRpcMessage for UntypedMessage {
2661    fn matches_method(_method: &str) -> bool {
2662        // UntypedMessage matches any method - it's the untyped fallback
2663        true
2664    }
2665
2666    fn method(&self) -> &str {
2667        &self.method
2668    }
2669
2670    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2671        Ok(self.clone())
2672    }
2673
2674    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error> {
2675        UntypedMessage::new(method, params)
2676    }
2677}
2678
// An untyped request is answered with raw JSON: its response type is an
// unparsed `serde_json::Value`.
impl JsonRpcRequest for UntypedMessage {
    type Response = serde_json::Value;
}
2682
// `UntypedMessage` may also be used wherever a notification type is expected;
// the impl supplies no items of its own.
impl JsonRpcNotification for UntypedMessage {}
2684
2685/// Represents a pending response of type `R` from an outgoing request.
2686///
2687/// Returned by [`ConnectionTo::send_request`], this type provides methods for handling
2688/// the response without blocking the event loop. The API is intentionally designed to make
2689/// it difficult to accidentally block.
2690///
2691/// # Anti-Footgun Design
2692///
2693/// You cannot directly `.await` a `SentRequest`. Instead, you must choose how to handle
2694/// the response:
2695///
2696/// ## Option 1: Schedule a Callback (Safe in Handlers)
2697///
2698/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
2699/// that runs when the response arrives. This doesn't block the event loop:
2700///
2701/// ```no_run
2702/// # use agent_client_protocol_test::*;
2703/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2704/// cx.send_request(MyRequest {})
2705///     .on_receiving_result(async |result| {
2706///         match result {
2707///             Ok(response) => {
2708///                 // Handle successful response
2709///                 Ok(())
2710///             }
2711///             Err(error) => {
2712///                 // Handle error
2713///                 Err(error)
2714///             }
2715///         }
2716///     })?;
2717/// # Ok(())
2718/// # }
2719/// ```
2720///
2721/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
2722///
2723/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
2724/// in a spawned task (never in a handler):
2725///
2726/// ```no_run
2727/// # use agent_client_protocol_test::*;
2728/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2729/// // ✅ Safe: Spawned task runs concurrently
2730/// cx.spawn({
2731///     let cx = cx.clone();
2732///     async move {
2733///         let response = cx.send_request(MyRequest {})
2734///             .block_task()
2735///             .await?;
2736///         // Process response...
2737///         Ok(())
2738///     }
2739/// })?;
2740/// # Ok(())
2741/// # }
2742/// ```
2743///
2744/// ```no_run
2745/// # use agent_client_protocol_test::*;
2746/// # async fn example() -> Result<(), agent_client_protocol::Error> {
2747/// # let connection = mock_connection();
2748/// // ❌ NEVER do this in a handler - blocks the event loop!
2749/// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2750///     let response = cx.send_request(MyRequest {})
2751///         .block_task()  // This will deadlock!
2752///         .await?;
2753///     responder.respond(response)
2754/// }, agent_client_protocol::on_receive_request!())
2755/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2756/// # Ok(())
2757/// # }
2758/// ```
2759///
2760/// # Why This Design?
2761///
2762/// If you block the event loop while waiting for a response, the connection cannot process
2763/// the incoming response message, creating a deadlock. This API design prevents that footgun
2764/// by making blocking explicit and encouraging non-blocking patterns.
pub struct SentRequest<T> {
    /// Id of the outgoing request; exposed as JSON by `id()`.
    id: jsonrpcmsg::Id,
    /// Method name of the outgoing request; also quoted in the
    /// "never received" error produced by `block_task`.
    method: String,
    /// Sender for `Task`s.
    /// NOTE(review): presumably used to schedule the response callback — the code
    /// that consumes it is outside this section, confirm.
    task_tx: TaskTx,
    /// Receives the `ResponsePayload` for this request; awaited by `block_task`.
    response_rx: oneshot::Receiver<ResponsePayload>,
    /// Converts the raw JSON result into `T`; `map` wraps it to add further conversions.
    to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
}
2772
2773impl<T: Debug> Debug for SentRequest<T> {
2774    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2775        f.debug_struct("SentRequest")
2776            .field("id", &self.id)
2777            .field("method", &self.method)
2778            .field("task_tx", &self.task_tx)
2779            .field("response_rx", &self.response_rx)
2780            .finish_non_exhaustive()
2781    }
2782}
2783
2784impl SentRequest<serde_json::Value> {
2785    fn new(
2786        id: jsonrpcmsg::Id,
2787        method: String,
2788        task_tx: mpsc::UnboundedSender<Task>,
2789        response_rx: oneshot::Receiver<ResponsePayload>,
2790    ) -> Self {
2791        Self {
2792            id,
2793            method,
2794            response_rx,
2795            task_tx,
2796            to_result: Box::new(Ok),
2797        }
2798    }
2799}
2800
2801impl<T: JsonRpcResponse> SentRequest<T> {
    /// The id of the outgoing request, converted to a JSON value.
    #[must_use]
    pub fn id(&self) -> serde_json::Value {
        crate::util::id_to_json(&self.id)
    }
2807
    /// The method name of the outgoing request whose response is pending.
    #[must_use]
    pub fn method(&self) -> &str {
        &self.method
    }
2813
2814    /// Create a new response that maps the result of the response to a new type.
2815    pub fn map<U>(
2816        self,
2817        map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
2818    ) -> SentRequest<U> {
2819        SentRequest {
2820            id: self.id,
2821            method: self.method,
2822            response_rx: self.response_rx,
2823            task_tx: self.task_tx,
2824            to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
2825        }
2826    }
2827
2828    /// Forward the response (success or error) to a request context when it arrives.
2829    ///
2830    /// This is a convenience method for proxying messages between connections. When the
2831    /// response arrives, it will be automatically sent to the provided request context,
2832    /// whether it's a successful response or an error.
2833    ///
2834    /// # Example: Proxying requests
2835    ///
2836    /// ```
2837    /// # use agent_client_protocol::UntypedRole;
2838    /// # use agent_client_protocol::{Builder, ConnectionTo};
2839    /// # use agent_client_protocol_test::*;
2840    /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2841    /// // Set up backend connection builder
2842    /// let backend = UntypedRole.builder()
2843    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
2844    ///         responder.respond(MyResponse { status: "ok".into() })
2845    ///     }, agent_client_protocol::on_receive_request!());
2846    ///
2847    /// // Spawn backend and get a context to send to it
2848    /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
2849    ///
2850    /// // Set up proxy that forwards requests to backend
2851    /// UntypedRole.builder()
2852    ///     .on_receive_request({
2853    ///         let backend_connection = backend_connection.clone();
2854    ///         async move |req: MyRequest, responder, cx| {
2855    ///             // Forward the request to backend and proxy the response back
2856    ///             backend_connection.send_request(req)
2857    ///                 .forward_response_to(responder)?;
2858    ///             Ok(())
2859    ///         }
2860    ///     }, agent_client_protocol::on_receive_request!());
2861    /// # Ok(())
2862    /// # }
2863    /// ```
2864    ///
2865    /// # Type Safety
2866    ///
2867    /// The request context's response type must match the request's response type,
2868    /// ensuring type-safe message forwarding.
2869    ///
2870    /// # When to Use
2871    ///
2872    /// Use this when:
2873    /// - You're implementing a proxy or gateway pattern
2874    /// - You want to forward responses without processing them
2875    /// - The response types match between the outgoing request and incoming request
2876    ///
2877    /// This is equivalent to calling `on_receiving_result` and manually forwarding
2878    /// the result, but more concise.
2879    pub fn forward_response_to(self, responder: Responder<T>) -> Result<(), crate::Error>
2880    where
2881        T: Send,
2882    {
2883        self.on_receiving_result(async move |result| responder.respond_with_result(result))
2884    }
2885
2886    /// Block the current task until the response is received.
2887    ///
2888    /// **Warning:** This method blocks the current async task. It is **only safe** to use
2889    /// in spawned tasks created with [`ConnectionTo::spawn`]. Using it directly in a
2890    /// handler callback will deadlock the connection.
2891    ///
2892    /// # Safe Usage (in spawned tasks)
2893    ///
2894    /// ```no_run
2895    /// # use agent_client_protocol_test::*;
2896    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2897    /// # let connection = mock_connection();
2898    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2899    ///     // Spawn a task to handle the request
2900    ///     cx.spawn({
2901    ///         let connection = cx.clone();
2902    ///         async move {
2903    ///             // Safe: We're in a spawned task, not blocking the event loop
2904    ///             let response = connection.send_request(OtherRequest {})
2905    ///                 .block_task()
2906    ///                 .await?;
2907    ///
2908    ///             // Process the response...
2909    ///             Ok(())
2910    ///         }
2911    ///     })?;
2912    ///
2913    ///     // Respond immediately
2914    ///     responder.respond(MyResponse { status: "ok".into() })
2915    /// }, agent_client_protocol::on_receive_request!())
2916    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2917    /// # Ok(())
2918    /// # }
2919    /// ```
2920    ///
2921    /// # Unsafe Usage (in handlers - will deadlock!)
2922    ///
2923    /// ```no_run
2924    /// # use agent_client_protocol_test::*;
2925    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2926    /// # let connection = mock_connection();
2927    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2928    ///     // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
2929    ///     let response = cx.send_request(OtherRequest {})
2930    ///         .block_task()
2931    ///         .await?;
2932    ///
2933    ///     responder.respond(MyResponse { status: response.value })
2934    /// }, agent_client_protocol::on_receive_request!())
2935    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2936    /// # Ok(())
2937    /// # }
2938    /// ```
2939    ///
2940    /// # When to Use
2941    ///
2942    /// Use this method when:
2943    /// - You're in a spawned task (via [`ConnectionTo::spawn`])
2944    /// - You need the response value to proceed with your logic
2945    /// - Linear control flow is more natural than callbacks
2946    ///
2947    /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
2948    pub async fn block_task(self) -> Result<T, crate::Error>
2949    where
2950        T: Send,
2951    {
2952        match self.response_rx.await {
2953            Ok(ResponsePayload {
2954                result: Ok(json_value),
2955                ack_tx,
2956            }) => {
2957                // Ack immediately - we're in a spawned task, so the dispatch loop
2958                // can continue while we process the value.
2959                if let Some(tx) = ack_tx {
2960                    let _ = tx.send(());
2961                }
2962                match (self.to_result)(json_value) {
2963                    Ok(value) => Ok(value),
2964                    Err(err) => Err(err),
2965                }
2966            }
2967            Ok(ResponsePayload {
2968                result: Err(err),
2969                ack_tx,
2970            }) => {
2971                if let Some(tx) = ack_tx {
2972                    let _ = tx.send(());
2973                }
2974                Err(err)
2975            }
2976            Err(err) => Err(crate::util::internal_error(format!(
2977                "response to `{}` never received: {}",
2978                self.method, err
2979            ))),
2980        }
2981    }
2982
2983    /// Schedule an async task to run when a successful response is received.
2984    ///
2985    /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
2986    /// for the common pattern of forwarding errors to a request context while only processing
2987    /// successful responses.
2988    ///
2989    /// # Behavior
2990    ///
2991    /// - If the response is `Ok(value)`, your task receives the value and the request context
2992    /// - If the response is `Err(error)`, the error is automatically sent to `responder`
2993    ///   and your task is not called
2994    ///
2995    /// # Example: Chaining requests
2996    ///
2997    /// ```no_run
2998    /// # use agent_client_protocol_test::*;
2999    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3000    /// # let connection = mock_connection();
3001    /// connection.on_receive_request(async |req: ValidateRequest, responder, cx| {
3002    ///     // Send initial request
3003    ///     cx.send_request(ValidateRequest { data: req.data.clone() })
3004    ///         .on_receiving_ok_result(responder, async |validation, responder| {
3005    ///             // Only runs if validation succeeded
3006    ///             if validation.is_valid {
3007    ///                 // Respond to original request
3008    ///                 responder.respond(ValidateResponse { is_valid: true, error: None })
3009    ///             } else {
3010    ///                 responder.respond_with_error(agent_client_protocol::util::internal_error("validation failed"))
3011    ///             }
3012    ///         })?;
3013    ///
3014    ///     Ok(())
3015    /// }, agent_client_protocol::on_receive_request!())
3016    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3017    /// # Ok(())
3018    /// # }
3019    /// ```
3020    ///
3021    /// # Ordering
3022    ///
3023    /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
3024    /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
3025    /// for details.
3026    ///
3027    /// # When to Use
3028    ///
3029    /// Use this when:
3030    /// - You need to respond to a request based on another request's result
3031    /// - You want errors to automatically propagate to the request context
3032    /// - You only care about the success case
3033    ///
3034    /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
3035    #[track_caller]
3036    pub fn on_receiving_ok_result<F>(
3037        self,
3038        responder: Responder<T>,
3039        task: impl FnOnce(T, Responder<T>) -> F + 'static + Send,
3040    ) -> Result<(), crate::Error>
3041    where
3042        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3043        T: Send,
3044    {
3045        self.on_receiving_result(async move |result| match result {
3046            Ok(value) => task(value, responder).await,
3047            Err(err) => responder.respond_with_error(err),
3048        })
3049    }
3050
3051    /// Schedule an async task to run when the response is received.
3052    ///
3053    /// This is the recommended way to handle responses in handler callbacks, as it doesn't
3054    /// block the event loop. The task will be spawned automatically when the response arrives.
3055    ///
3056    /// # Example: Handle response in callback
3057    ///
3058    /// ```no_run
3059    /// # use agent_client_protocol_test::*;
3060    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3061    /// # let connection = mock_connection();
3062    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
3063    ///     // Send a request and schedule a callback for the response
3064    ///     cx.send_request(QueryRequest { id: 22 })
3065    ///         .on_receiving_result({
3066    ///             let connection = cx.clone();
3067    ///             async move |result| {
3068    ///                 match result {
3069    ///                     Ok(response) => {
3070    ///                         println!("Got response: {:?}", response);
3071    ///                         // Can send more messages here
3072    ///                         connection.send_notification(QueryComplete {})?;
3073    ///                         Ok(())
3074    ///                 }
3075    ///                     Err(error) => {
3076    ///                         eprintln!("Request failed: {}", error);
3077    ///                         Err(error)
3078    ///                     }
3079    ///                 }
3080    ///             }
3081    ///         })?;
3082    ///
3083    ///     // Handler continues immediately without waiting
3084    ///     responder.respond(MyResponse { status: "processing".into() })
3085    /// }, agent_client_protocol::on_receive_request!())
3086    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3087    /// # Ok(())
3088    /// # }
3089    /// ```
3090    ///
3091    /// # Ordering
3092    ///
3093    /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
3094    /// before processing the next message. This gives you ordering guarantees: no other
3095    /// messages will be processed while your callback runs.
3096    ///
3097    /// This differs from [`block_task`](Self::block_task), which signals completion immediately
3098    /// upon receiving the response (before your code processes it).
3099    ///
3100    /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
3101    /// and how to avoid deadlocks.
3102    ///
3103    /// # Error Handling
3104    ///
3105    /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
3106    /// errors appropriately within your task.
3107    ///
3108    /// # When to Use
3109    ///
3110    /// Use this method when:
3111    /// - You're in a handler callback (not a spawned task)
3112    /// - You want ordering guarantees (no other messages processed during your callback)
3113    /// - You need to do async work before "releasing" control back to the dispatch loop
3114    ///
3115    /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
3116    #[track_caller]
3117    pub fn on_receiving_result<F>(
3118        self,
3119        task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
3120    ) -> Result<(), crate::Error>
3121    where
3122        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3123        T: Send,
3124    {
3125        let task_tx = self.task_tx.clone();
3126        let method = self.method;
3127        let response_rx = self.response_rx;
3128        let to_result = self.to_result;
3129        let location = Location::caller();
3130
3131        Task::new(location, async move {
3132            match response_rx.await {
3133                Ok(ResponsePayload { result, ack_tx }) => {
3134                    // Convert the result using to_result for Ok values
3135                    let typed_result = match result {
3136                        Ok(json_value) => to_result(json_value),
3137                        Err(err) => Err(err),
3138                    };
3139
3140                    // Run the user's callback
3141                    let outcome = task(typed_result).await;
3142
3143                    // Ack AFTER the callback completes - this is the key difference
3144                    // from block_task. The dispatch loop waits for this ack.
3145                    if let Some(tx) = ack_tx {
3146                        let _ = tx.send(());
3147                    }
3148
3149                    outcome
3150                }
3151                Err(err) => Err(crate::util::internal_error(format!(
3152                    "response to `{method}` never received: {err}"
3153                ))),
3154            }
3155        })
3156        .spawn(&task_tx)
3157    }
3158}
3159
// ============================================================================
// Transport components (`ConnectTo` implementations)
// ============================================================================
3163
3164/// A component that communicates over line streams.
3165///
3166/// `Lines` implements the [`ConnectTo`] trait for any pair of line-based streams
3167/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
3168/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3169///
3170/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
3171/// transformation of individual lines before they are parsed or after they are serialized.
3172/// This is particularly useful for debugging, logging, or implementing custom line-based
3173/// protocols.
3174///
3175/// # Use Cases
3176///
3177/// - **Line-by-line logging**: Intercept and log each line before parsing
3178/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
3179/// - **Debugging**: Inspect raw message strings
3180/// - **Line filtering**: Skip or modify specific messages
3181///
3182/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
3183/// for byte-based I/O.
3184///
3185/// [`ConnectTo`]: crate::ConnectTo
3186#[derive(Debug)]
3187pub struct Lines<OutgoingSink, IncomingStream> {
3188    /// Outgoing line sink (where we write serialized JSON-RPC messages)
3189    pub outgoing: OutgoingSink,
3190    /// Incoming line stream (where we read and parse JSON-RPC messages)
3191    pub incoming: IncomingStream,
3192}
3193
3194impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
3195where
3196    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3197    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3198{
3199    /// Create a new line stream transport.
3200    pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
3201        Self { outgoing, incoming }
3202    }
3203}
3204
3205impl<OutgoingSink, IncomingStream, R: Role> ConnectTo<R> for Lines<OutgoingSink, IncomingStream>
3206where
3207    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3208    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3209{
3210    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3211        let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3212        match futures::future::select(Box::pin(client.connect_to(channel)), serve_self).await {
3213            Either::Left((result, _)) | Either::Right((result, _)) => result,
3214        }
3215    }
3216
3217    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3218        let Self { outgoing, incoming } = self;
3219
3220        // Create a channel pair for the client to use
3221        let (channel_for_caller, channel_for_lines) = Channel::duplex();
3222
3223        // Create the server future that runs the line stream actors
3224        let server_future = Box::pin(async move {
3225            let Channel { rx, tx } = channel_for_lines;
3226
3227            // Run both actors concurrently
3228            let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
3229            let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
3230
3231            // Wait for both to complete
3232            futures::try_join!(outgoing_future, incoming_future)?;
3233
3234            Ok(())
3235        });
3236
3237        (channel_for_caller, server_future)
3238    }
3239}
3240
3241/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
3242///
3243/// `ByteStreams` implements the [`ConnectTo`] trait for any pair of `AsyncRead` and `AsyncWrite`
3244/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3245/// This is the standard way to communicate with external processes or network connections.
3246///
3247/// # Use Cases
3248///
3249/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
3250/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
3251/// - **Named pipes**: Cross-process communication on the same machine
3252/// - **File I/O**: Reading from and writing to file descriptors
3253///
3254/// # Example
3255///
3256/// Connecting to an agent via stdio:
3257///
3258/// ```no_run
3259/// use agent_client_protocol::UntypedRole;
3260/// # use agent_client_protocol::{ByteStreams};
3261/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
3262///
3263/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3264/// let component = ByteStreams::new(
3265///     tokio::io::stdout().compat_write(),
3266///     tokio::io::stdin().compat(),
3267/// );
3268///
3269/// // Use as a component in a connection
3270/// agent_client_protocol::UntypedRole.builder()
3271///     .name("my-client")
3272///     .connect_to(component)
3273///     .await?;
3274/// # Ok(())
3275/// # }
3276/// ```
3277///
3278/// [`ConnectTo`]: crate::ConnectTo
3279#[derive(Debug)]
3280pub struct ByteStreams<OB, IB> {
3281    /// Outgoing byte stream (where we write serialized messages)
3282    pub outgoing: OB,
3283    /// Incoming byte stream (where we read and parse messages)
3284    pub incoming: IB,
3285}
3286
3287impl<OB, IB> ByteStreams<OB, IB>
3288where
3289    OB: AsyncWrite + Send + 'static,
3290    IB: AsyncRead + Send + 'static,
3291{
3292    /// Create a new byte stream transport.
3293    pub fn new(outgoing: OB, incoming: IB) -> Self {
3294        Self { outgoing, incoming }
3295    }
3296}
3297
3298impl<OB, IB, R: Role> ConnectTo<R> for ByteStreams<OB, IB>
3299where
3300    OB: AsyncWrite + Send + 'static,
3301    IB: AsyncRead + Send + 'static,
3302{
3303    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3304        let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3305        match futures::future::select(pin!(client.connect_to(channel)), serve_self).await {
3306            Either::Left((result, _)) | Either::Right((result, _)) => result,
3307        }
3308    }
3309
3310    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3311        use futures::AsyncBufReadExt;
3312        use futures::AsyncWriteExt;
3313        use futures::io::BufReader;
3314        let Self { outgoing, incoming } = self;
3315
3316        // Convert byte streams to line streams
3317        // Box both streams to satisfy Unpin requirements
3318        let incoming_lines = Box::pin(BufReader::new(incoming).lines());
3319
3320        // Create a sink that writes lines (with newlines) to the outgoing byte stream
3321        // We need to Box the writer since it may not be Unpin
3322        let outgoing_sink =
3323            futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
3324                let mut bytes = line.into_bytes();
3325                bytes.push(b'\n');
3326                writer.write_all(&bytes).await?;
3327                Ok::<_, std::io::Error>(writer)
3328            });
3329
3330        // Delegate to Lines component
3331        ConnectTo::<R>::into_channel_and_future(Lines::new(outgoing_sink, incoming_lines))
3332    }
3333}
3334
3335/// A channel endpoint representing one side of a bidirectional message channel.
3336///
3337/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
3338/// Each endpoint has:
3339/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
3340/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
3341///
3342/// # Example
3343///
3344/// ```no_run
3345/// # use agent_client_protocol::UntypedRole;
3346/// # use agent_client_protocol::{Channel, Builder};
3347/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3348/// // Create a pair of connected channels
3349/// let (channel_a, channel_b) = Channel::duplex();
3350///
3351/// // Each channel can be used by a different component
3352/// UntypedRole.builder()
3353///     .name("connection-a")
3354///     .connect_to(channel_a)
3355///     .await?;
3356/// # Ok(())
3357/// # }
3358/// ```
3359#[derive(Debug)]
3360pub struct Channel {
3361    /// Receives messages (or errors) from the counterpart.
3362    pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
3363    /// Sends messages (or errors) to the counterpart.
3364    pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
3365}
3366
3367impl Channel {
3368    /// Create a pair of connected channel endpoints.
3369    ///
3370    /// Returns two `Channel` instances that are connected to each other:
3371    /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
3372    /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
3373    ///
3374    /// # Returns
3375    ///
3376    /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
3377    #[must_use]
3378    pub fn duplex() -> (Self, Self) {
3379        // Create channels: A sends Result<Message> which B receives as Message
3380        let (a_tx, b_rx) = mpsc::unbounded();
3381        let (b_tx, a_rx) = mpsc::unbounded();
3382
3383        let channel_a = Self { rx: a_rx, tx: a_tx };
3384        let channel_b = Self { rx: b_rx, tx: b_tx };
3385
3386        (channel_a, channel_b)
3387    }
3388
3389    /// Copy messages from `rx` to `tx`.
3390    ///
3391    /// # Returns
3392    ///
3393    /// A `Result` indicating success or failure.
3394    pub async fn copy(mut self) -> Result<(), crate::Error> {
3395        while let Some(msg) = self.rx.next().await {
3396            self.tx
3397                .unbounded_send(msg)
3398                .map_err(crate::util::internal_error)?;
3399        }
3400        Ok(())
3401    }
3402}
3403
3404impl<R: Role> ConnectTo<R> for Channel {
3405    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3406        let (client_channel, client_serve) = client.into_channel_and_future();
3407
3408        match futures::try_join!(
3409            Channel {
3410                rx: client_channel.rx,
3411                tx: self.tx
3412            }
3413            .copy(),
3414            Channel {
3415                rx: self.rx,
3416                tx: client_channel.tx
3417            }
3418            .copy(),
3419            client_serve
3420        ) {
3421            Ok(((), (), ())) => Ok(()),
3422            Err(err) => Err(err),
3423        }
3424    }
3425
3426    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3427        (self, Box::pin(future::ready(Ok(()))))
3428    }
3429}