Skip to main content

agent_client_protocol/
jsonrpc.rs

1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::SessionId;
4// Re-export jsonrpcmsg for use in public API
5pub use jsonrpcmsg;
6
7// Types re-exported from crate root
8use serde::{Deserialize, Serialize};
9use std::any::TypeId;
10use std::fmt::Debug;
11use std::panic::Location;
12use std::pin::pin;
13use uuid::Uuid;
14
15use futures::channel::{mpsc, oneshot};
16use futures::future::{self, BoxFuture, Either};
17use futures::{AsyncRead, AsyncWrite, StreamExt};
18
19mod dynamic_handler;
20pub(crate) mod handlers;
21mod incoming_actor;
22mod outgoing_actor;
23mod protocol_compat;
24pub(crate) mod run;
25mod task_actor;
26mod transport_actor;
27
28use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
29pub use crate::jsonrpc::handlers::NullHandler;
30use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
31use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
32use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
33use crate::jsonrpc::protocol_compat::{ProtocolCompat, ProtocolMode};
34use crate::jsonrpc::run::SpawnedRun;
35use crate::jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo};
36use crate::jsonrpc::task_actor::{Task, TaskTx};
37use crate::mcp_server::McpServer;
38use crate::role::HasPeer;
39use crate::role::Role;
40use crate::util::json_cast;
41use crate::{Agent, Client, ConnectTo, RoleId};
42
43/// Handlers process incoming JSON-RPC messages on a connection.
44///
45/// When messages arrive, they flow through a chain of handlers. Each handler can
46/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
47///
48/// # Message Flow
49///
50/// Messages flow through three layers of handlers in order:
51///
52/// ```text
53/// ┌─────────────────────────────────────────────────────────────────┐
54/// │                     Incoming Message                            │
55/// └─────────────────────────────────────────────────────────────────┘
56///                              │
57///                              ▼
58/// ┌─────────────────────────────────────────────────────────────────┐
59/// │  1. User Handlers (registered via on_receive_request, etc.)     │
60/// │     - Tried in registration order                               │
61/// │     - First handler to return Handled::Yes claims the message   │
62/// └─────────────────────────────────────────────────────────────────┘
63///                              │ Handled::No
64///                              ▼
65/// ┌─────────────────────────────────────────────────────────────────┐
66/// │  2. Dynamic Handlers (added at runtime)                         │
67/// │     - Used for session-specific message handling                │
68/// │     - Added via ConnectionTo::add_dynamic_handler             │
69/// └─────────────────────────────────────────────────────────────────┘
70///                              │ Handled::No
71///                              ▼
72/// ┌─────────────────────────────────────────────────────────────────┐
73/// │  3. Role Default Handler                                        │
74/// │     - Fallback based on the connection's Role                   │
75/// │     - Handles protocol-level messages (e.g., proxy forwarding)  │
76/// └─────────────────────────────────────────────────────────────────┘
77///                              │ Handled::No
78///                              ▼
79/// ┌─────────────────────────────────────────────────────────────────┐
80/// │  Unhandled: Error response sent (or queued if retry=true)       │
81/// └─────────────────────────────────────────────────────────────────┘
82/// ```
83///
84/// # The `Handled` Return Value
85///
86/// Each handler returns [`Handled`] to indicate whether it processed the message:
87///
88/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
89/// - **`Handled::No { message, retry }`** - Message was not handled. The message
90///   (possibly modified) is passed to the next handler in the chain.
91///
92/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
93///
94/// # The Retry Mechanism
95///
96/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
97///
98/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
99/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
100///
101/// This mechanism exists because of a timing issue with sessions: when a `session/new`
102/// response is being processed, the dynamic handler for that session hasn't been registered
103/// yet, but `session/update` notifications for that session may already be arriving.
104/// By setting `retry: true`, these early notifications are queued until the session's
105/// dynamic handler is added.
106///
107/// # Handler Registration
108///
109/// Most users register handlers using the builder methods on [`Builder`]:
110///
111/// ```
112/// # use agent_client_protocol::{Agent, Client, ConnectTo};
113/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, AgentCapabilities};
114/// # use agent_client_protocol_test::StatusUpdate;
115/// # async fn example(transport: impl ConnectTo<Agent>) -> Result<(), agent_client_protocol::Error> {
116/// Agent.builder()
117///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
118///         responder.respond(
119///             InitializeResponse::new(req.protocol_version)
120///                 .agent_capabilities(AgentCapabilities::new()),
121///         )
122///     }, agent_client_protocol::on_receive_request!())
123///     .on_receive_notification(async |notif: StatusUpdate, cx| {
124///         // Process notification
125///         Ok(())
126///     }, agent_client_protocol::on_receive_notification!())
127///     .connect_to(transport)
128///     .await?;
129/// # Ok(())
130/// # }
131/// ```
132///
133/// The type parameter on the closure determines which messages are dispatched to it.
134/// Messages that don't match the type are automatically passed to the next handler.
135///
136/// # Implementing Custom Handlers
137///
138/// For advanced use cases, you can implement `HandleMessageAs` directly:
139///
140/// ```ignore
141/// struct MyHandler;
142///
143/// impl HandleMessageAs<Agent> for MyHandler {
144///
145///     async fn handle_dispatch(
146///         &mut self,
147///         message: Dispatch,
148///         cx: ConnectionTo<Self::Role>,
149///     ) -> Result<Handled<Dispatch>, Error> {
150///         if message.method() == "my/custom/method" {
151///             // Handle it
152///             Ok(Handled::Yes)
153///         } else {
154///             // Pass to next handler
155///             Ok(Handled::No { message, retry: false })
156///         }
157///     }
158///
159///     fn describe_chain(&self) -> impl std::fmt::Debug {
160///         "MyHandler"
161///     }
162/// }
163/// ```
164///
165/// # Important: Handlers Must Not Block
166///
167/// The connection processes messages on a single async task. While a handler is running,
168/// no other messages can be processed. For expensive operations, use [`ConnectionTo::spawn`]
169/// to run work concurrently:
170///
171/// ```
172/// # use agent_client_protocol::{Client, Agent, ConnectTo};
173/// # use agent_client_protocol_test::{expensive_operation, ProcessComplete};
174/// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
175/// # Client.builder().connect_with(transport, async |cx| {
176/// cx.spawn({
177///     let connection = cx.clone();
178///     async move {
179///         let result = expensive_operation("data").await?;
180///         connection.send_notification(ProcessComplete { result })?;
181///         Ok(())
182///     }
183/// })?;
184/// # Ok(())
185/// # }).await?;
186/// # Ok(())
187/// # }
188/// ```
189#[allow(async_fn_in_trait)]
190/// A handler for incoming JSON-RPC messages.
191///
192/// This trait is implemented by types that can process incoming messages on a connection.
193/// Handlers are registered with a [`Builder`] and are called in order until
194/// one claims the message.
195///
196/// The type parameter `R` is the role this handler plays - who I am.
197/// For an agent handler, `R = Agent` (I handle messages as an agent).
198/// For a client handler, `R = Client` (I handle messages as a client).
199pub trait HandleDispatchFrom<Counterpart: Role>: Send {
200    /// Attempt to claim an incoming message (request or notification).
201    ///
202    /// # Important: do not block
203    ///
204    /// The server will not process new messages until this handler returns.
205    /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
206    /// The recommended approach to manage expensive operations is to the [`ConnectionTo::spawn`] method available on the message context.
207    ///
208    /// # Parameters
209    ///
210    /// * `message` - The incoming message to handle.
211    /// * `connection` - The connection, used to send messages and access connection state.
212    ///
213    /// # Returns
214    ///
215    /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
216    /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
217    /// * `Err` if an internal error occurs (this will bring down the server).
218    fn handle_dispatch_from(
219        &mut self,
220        message: Dispatch,
221        connection: ConnectionTo<Counterpart>,
222    ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send;
223
224    /// Returns a debug description of the registered handlers for diagnostics.
225    fn describe_chain(&self) -> impl std::fmt::Debug;
226}
227
228impl<Counterpart: Role, H> HandleDispatchFrom<Counterpart> for &mut H
229where
230    H: HandleDispatchFrom<Counterpart>,
231{
232    fn handle_dispatch_from(
233        &mut self,
234        message: Dispatch,
235        cx: ConnectionTo<Counterpart>,
236    ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send {
237        H::handle_dispatch_from(self, message, cx)
238    }
239
240    fn describe_chain(&self) -> impl std::fmt::Debug {
241        H::describe_chain(self)
242    }
243}
244
245/// A JSON-RPC connection that can act as either a server, client, or both.
246///
247/// [`Builder`] provides a builder-style API for creating JSON-RPC servers and clients.
248/// You start by calling `Role.builder()` (e.g., `Client.builder()`), then add message
249/// handlers, and finally drive the connection with either [`connect_to`](Builder::connect_to)
250/// or [`connect_with`](Builder::connect_with), providing a component implementation
251/// (e.g., [`ByteStreams`] for byte streams).
252///
253/// # JSON-RPC Primer
254///
255/// JSON-RPC 2.0 has two fundamental message types:
256///
257/// * **Requests** - Messages that expect a response. They have an `id` field that gets
258///   echoed back in the response so the sender can correlate them.
259/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
260///   expect or receive a response.
261///
262/// # Type-Driven Message Dispatch
263///
264/// The handler registration methods use Rust's type system to determine which messages
265/// to handle. The type parameter you provide controls what gets dispatched to your handler:
266///
267/// ## Single Message Types
268///
269/// The simplest case - handle one specific message type:
270///
271/// ```no_run
272/// # use agent_client_protocol_test::*;
273/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
274/// # async fn example() -> Result<(), agent_client_protocol::Error> {
275/// # let connection = mock_connection();
276/// connection
277///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
278///         // Handle only InitializeRequest messages
279///         responder.respond(InitializeResponse::make())
280///     }, agent_client_protocol::on_receive_request!())
281///     .on_receive_notification(async |notif: SessionNotification, cx| {
282///         // Handle only SessionUpdate notifications
283///         Ok(())
284///     }, agent_client_protocol::on_receive_notification!())
285/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
286/// # Ok(())
287/// # }
288/// ```
289///
290/// ## Enum Message Types
291///
292/// You can also handle multiple related messages with a single handler by defining an enum
293/// that implements the appropriate trait ([`JsonRpcRequest`] or [`JsonRpcNotification`]):
294///
295/// ```no_run
296/// # use agent_client_protocol_test::*;
297/// # use agent_client_protocol::{JsonRpcRequest, JsonRpcMessage, UntypedMessage};
298/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
299/// # async fn example() -> Result<(), agent_client_protocol::Error> {
300/// # let connection = mock_connection();
301/// // Define an enum for multiple request types
302/// #[derive(Debug, Clone)]
303/// enum MyRequests {
304///     Initialize(InitializeRequest),
305///     Prompt(PromptRequest),
306/// }
307///
308/// // Implement JsonRpcRequest for your enum
309/// # impl JsonRpcMessage for MyRequests {
310/// #     fn matches_method(_method: &str) -> bool { false }
311/// #     fn method(&self) -> &str { "myRequests" }
312/// #     fn to_untyped_message(&self) -> Result<UntypedMessage, agent_client_protocol::Error> { todo!() }
313/// #     fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Result<Self, agent_client_protocol::Error> { Err(agent_client_protocol::Error::method_not_found()) }
314/// # }
315/// impl JsonRpcRequest for MyRequests { type Response = serde_json::Value; }
316///
317/// // Handle all variants in one place
318/// connection.on_receive_request(async |req: MyRequests, responder, cx| {
319///     match req {
320///         MyRequests::Initialize(init) => { responder.respond(serde_json::json!({})) }
321///         MyRequests::Prompt(prompt) => { responder.respond(serde_json::json!({})) }
322///     }
323/// }, agent_client_protocol::on_receive_request!())
324/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
325/// # Ok(())
326/// # }
327/// ```
328///
329/// ## Mixed Message Types
330///
331/// For enums containing both requests AND notifications, use [`on_receive_dispatch`](Self::on_receive_dispatch):
332///
333/// ```no_run
334/// # use agent_client_protocol_test::*;
335/// # use agent_client_protocol::Dispatch;
336/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
337/// # async fn example() -> Result<(), agent_client_protocol::Error> {
338/// # let connection = mock_connection();
339/// // on_receive_dispatch receives Dispatch which can be either a request or notification
340/// connection.on_receive_dispatch(async |msg: Dispatch<InitializeRequest, SessionNotification>, _cx| {
341///     match msg {
342///         Dispatch::Request(req, responder) => {
343///             responder.respond(InitializeResponse::make())
344///         }
345///         Dispatch::Notification(notif) => {
346///             Ok(())
347///         }
348///         Dispatch::Response(result, router) => {
349///             // Forward response to its destination
350///             router.respond_with_result(result)
351///         }
352///     }
353/// }, agent_client_protocol::on_receive_dispatch!())
354/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
355/// # Ok(())
356/// # }
357/// ```
358///
359/// # Handler Registration
360///
361/// Register handlers using these methods (listed from most common to most flexible):
362///
363/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
364/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
365/// * [`on_receive_dispatch`](Self::on_receive_dispatch) - Handle enums containing both requests and notifications
366/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
367///
368/// ## Handler Ordering
369///
370/// Handlers are tried in the order you register them. The first handler that claims a message
371/// (by matching its type) will process it. Subsequent handlers won't see that message:
372///
373/// ```no_run
374/// # use agent_client_protocol_test::*;
375/// # use agent_client_protocol::{Dispatch, UntypedMessage};
376/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
377/// # async fn example() -> Result<(), agent_client_protocol::Error> {
378/// # let connection = mock_connection();
379/// connection
380///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
381///         // This runs first for InitializeRequest
382///         responder.respond(InitializeResponse::make())
383///     }, agent_client_protocol::on_receive_request!())
384///     .on_receive_request(async |req: PromptRequest, responder, cx| {
385///         // This runs first for PromptRequest
386///         responder.respond(PromptResponse::make())
387///     }, agent_client_protocol::on_receive_request!())
388///     .on_receive_dispatch(async |msg: Dispatch, cx| {
389///         // This runs for any message not handled above
390///         msg.respond_with_error(agent_client_protocol::util::internal_error("unknown method"), cx)
391///     }, agent_client_protocol::on_receive_dispatch!())
392/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
393/// # Ok(())
394/// # }
395/// ```
396///
397/// # Event Loop and Concurrency
398///
399/// Understanding the event loop is critical for writing correct handlers.
400///
401/// ## The Event Loop
402///
403/// [`Builder`] runs all handler callbacks on a single async task - the event loop.
404/// While a handler is running, **the server cannot receive new messages**. This means
405/// any blocking or expensive work in your handlers will stall the entire connection.
406///
407/// To avoid blocking the event loop, use [`ConnectionTo::spawn`] to offload serious
408/// work to concurrent tasks:
409///
410/// ```no_run
411/// # use agent_client_protocol_test::*;
412/// # async fn example() -> Result<(), agent_client_protocol::Error> {
413/// # let connection = mock_connection();
414/// connection.on_receive_request(async |req: AnalyzeRequest, responder, cx| {
415///     // Clone cx for the spawned task
416///     cx.spawn({
417///         let connection = cx.clone();
418///         async move {
419///             let result = expensive_analysis(&req.data).await?;
420///             connection.send_notification(AnalysisComplete { result })?;
421///             Ok(())
422///         }
423///     })?;
424///
425///     // Respond immediately without blocking
426///     responder.respond(AnalysisStarted { job_id: 42 })
427/// }, agent_client_protocol::on_receive_request!())
428/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
429/// # Ok(())
430/// # }
431/// ```
432///
433/// Note that the entire connection runs within one async task, so parallelism must be
434/// managed explicitly using [`spawn`](ConnectionTo::spawn).
435///
436/// ## The Connection Context
437///
438/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
439///
440/// * **For request handlers** - [`Responder<R>`] provides [`respond`](Responder::respond)
441///   to send the response, plus methods to send other messages
442/// * **For notification handlers** - [`ConnectionTo`] provides methods to send messages
443///   and spawn tasks
444///
445/// Both context types support:
446/// * [`send_request`](ConnectionTo::send_request) - Send requests to the other side
447/// * [`send_notification`](ConnectionTo::send_notification) - Send notifications
448/// * [`spawn`](ConnectionTo::spawn) - Run tasks concurrently without blocking the event loop
449///
450/// The [`SentRequest`] returned by `send_request` provides methods like
451/// [`on_receiving_result`](SentRequest::on_receiving_result) that help you
452/// avoid accidentally blocking the event loop while waiting for responses.
453///
454/// # Driving the Connection
455///
456/// After adding handlers, you must drive the connection using one of two modes:
457///
458/// ## Server Mode: `connect_to()`
459///
460/// Use [`connect_to`](Self::connect_to) when you only need to respond to incoming messages:
461///
462/// ```no_run
463/// # use agent_client_protocol_test::*;
464/// # async fn example() -> Result<(), agent_client_protocol::Error> {
465/// # let connection = mock_connection();
466/// connection
467///     .on_receive_request(async |req: MyRequest, responder, cx| {
468///         responder.respond(MyResponse { status: "ok".into() })
469///     }, agent_client_protocol::on_receive_request!())
470///     .connect_to(MockTransport)  // Runs until connection closes or error occurs
471///     .await?;
472/// # Ok(())
473/// # }
474/// ```
475///
476/// The connection will process incoming messages and invoke your handlers until the
477/// connection is closed or an error occurs.
478///
479/// ## Client Mode: `connect_with()`
480///
481/// Use [`connect_with`](Self::connect_with) when you need to both handle incoming messages
482/// AND send your own requests/notifications:
483///
484/// ```no_run
485/// # use agent_client_protocol_test::*;
486/// # use agent_client_protocol::schema::InitializeRequest;
487/// # async fn example() -> Result<(), agent_client_protocol::Error> {
488/// # let connection = mock_connection();
489/// connection
490///     .on_receive_request(async |req: MyRequest, responder, cx| {
491///         responder.respond(MyResponse { status: "ok".into() })
492///     }, agent_client_protocol::on_receive_request!())
493///     .connect_with(MockTransport, async |cx| {
494///         // You can send requests to the other side
495///         let response = cx.send_request(InitializeRequest::make())
496///             .block_task()
497///             .await?;
498///
499///         // And send notifications
500///         cx.send_notification(StatusUpdate { message: "ready".into() })?;
501///
502///         Ok(())
503///     })
504///     .await?;
505/// # Ok(())
506/// # }
507/// ```
508///
509/// The connection will serve incoming messages in the background while your client closure
510/// runs. When the closure returns, the connection shuts down.
511///
512/// # Example: Complete Agent
513///
514/// ```no_run
515/// # use agent_client_protocol::UntypedRole;
516/// # use agent_client_protocol::{Builder};
517/// # use agent_client_protocol::Stdio;
518/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
519/// # async fn example() -> Result<(), agent_client_protocol::Error> {
520/// let transport = Stdio::new();
521///
522/// UntypedRole.builder()
523///     .name("my-agent")  // Optional: for debugging logs
524///     .on_receive_request(async |init: InitializeRequest, responder, cx| {
525///         let response: InitializeResponse = todo!();
526///         responder.respond(response)
527///     }, agent_client_protocol::on_receive_request!())
528///     .on_receive_request(async |prompt: PromptRequest, responder, cx| {
529///         // You can send notifications while processing a request
530///         let notif: SessionNotification = todo!();
531///         cx.send_notification(notif)?;
532///
533///         // Then respond to the request
534///         let response: PromptResponse = todo!();
535///         responder.respond(response)
536///     }, agent_client_protocol::on_receive_request!())
537///     .connect_to(transport)
538///     .await?;
539/// # Ok(())
540/// # }
541/// ```
542#[must_use]
543#[derive(Debug)]
544pub struct Builder<Host: Role, Handler = NullHandler, Runner = NullRun>
545where
546    Handler: HandleDispatchFrom<Host::Counterpart>,
547    Runner: RunWithConnectionTo<Host::Counterpart>,
548{
549    /// My role.
550    host: Host,
551
552    /// Name of the connection, used in tracing logs.
553    name: Option<String>,
554
555    /// Handler for incoming messages.
556    handler: Handler,
557
558    /// Responder for background tasks.
559    responder: Runner,
560
561    /// Protocol version mode for the public API and wire compatibility layer.
562    protocol_mode: ProtocolMode,
563}
564
565fn default_protocol_mode<Host: Role>() -> ProtocolMode {
566    let role = TypeId::of::<Host>();
567
568    if role == TypeId::of::<Agent>() {
569        ProtocolMode::v1_agent()
570    } else if role == TypeId::of::<Client>() {
571        ProtocolMode::v1_client()
572    } else {
573        ProtocolMode::disabled()
574    }
575}
576
577impl<Host: Role> Builder<Host, NullHandler, NullRun> {
578    /// Create a new connection builder for the given role.
579    /// This type follows a builder pattern; use other methods to configure and then invoke
580    /// [`Self::connect_to`] (to use as a server) or [`Self::connect_with`] to use as a client.
581    pub fn new(role: Host) -> Self {
582        Self {
583            host: role,
584            name: None,
585            handler: NullHandler,
586            responder: NullRun,
587            protocol_mode: default_protocol_mode::<Host>(),
588        }
589    }
590}
591
592impl<Host: Role, Handler> Builder<Host, Handler, NullRun>
593where
594    Handler: HandleDispatchFrom<Host::Counterpart>,
595{
596    /// Create a new connection builder with the given handler.
597    pub fn new_with(role: Host, handler: Handler) -> Self {
598        Self {
599            host: role,
600            name: None,
601            handler,
602            responder: NullRun,
603            protocol_mode: default_protocol_mode::<Host>(),
604        }
605    }
606}
607
608impl<
609    Host: Role,
610    Handler: HandleDispatchFrom<Host::Counterpart>,
611    Runner: RunWithConnectionTo<Host::Counterpart>,
612> Builder<Host, Handler, Runner>
613{
614    /// Set the "name" of this connection -- used only for debugging logs.
615    pub fn name(mut self, name: impl ToString) -> Self {
616        self.name = Some(name.to_string());
617        self
618    }
619
620    pub(crate) fn v1_agent(mut self) -> Self {
621        self.protocol_mode = ProtocolMode::v1_agent();
622        self
623    }
624
625    pub(crate) fn v1_client(mut self) -> Self {
626        self.protocol_mode = ProtocolMode::v1_client();
627        self
628    }
629
630    #[cfg(feature = "unstable_protocol_v2")]
631    pub(crate) fn v2_agent(mut self) -> Self {
632        self.protocol_mode = ProtocolMode::v2_agent();
633        self
634    }
635
636    #[cfg(feature = "unstable_protocol_v2")]
637    pub(crate) fn v2_client(mut self) -> Self {
638        self.protocol_mode = ProtocolMode::v2_client();
639        self
640    }
641
642    /// Merge another [`Builder`] into this one.
643    ///
644    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
645    /// This is a low-level method that is not intended for general use.
646    pub fn with_connection_builder(
647        self,
648        other: Builder<
649            Host,
650            impl HandleDispatchFrom<Host::Counterpart>,
651            impl RunWithConnectionTo<Host::Counterpart>,
652        >,
653    ) -> Builder<
654        Host,
655        impl HandleDispatchFrom<Host::Counterpart>,
656        impl RunWithConnectionTo<Host::Counterpart>,
657    > {
658        let Builder {
659            name: other_name,
660            handler: other_handler,
661            responder: other_responder,
662            protocol_mode: other_protocol_mode,
663            host: _,
664        } = other;
665        Builder {
666            host: self.host,
667            name: self.name,
668            handler: ChainedHandler::new(
669                self.handler,
670                NamedHandler::new(other_name, other_handler),
671            ),
672            responder: ChainRun::new(self.responder, other_responder),
673            protocol_mode: self.protocol_mode.merge(other_protocol_mode),
674        }
675    }
676
677    /// Add a new [`HandleDispatchFrom`] to the chain.
678    ///
679    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
680    /// This is a low-level method that is not intended for general use.
681    pub fn with_handler(
682        self,
683        handler: impl HandleDispatchFrom<Host::Counterpart>,
684    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner> {
685        Builder {
686            host: self.host,
687            name: self.name,
688            handler: ChainedHandler::new(self.handler, handler),
689            responder: self.responder,
690            protocol_mode: self.protocol_mode,
691        }
692    }
693
694    /// Add a new [`RunWithConnectionTo`] to the chain.
695    pub fn with_responder<Run1>(
696        self,
697        responder: Run1,
698    ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
699    where
700        Run1: RunWithConnectionTo<Host::Counterpart>,
701    {
702        Builder {
703            host: self.host,
704            name: self.name,
705            handler: self.handler,
706            responder: ChainRun::new(self.responder, responder),
707            protocol_mode: self.protocol_mode,
708        }
709    }
710
711    /// Enqueue a task to run once the connection is actively serving traffic.
712    #[track_caller]
713    pub fn with_spawned<F, Fut>(
714        self,
715        task: F,
716    ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
717    where
718        F: FnOnce(ConnectionTo<Host::Counterpart>) -> Fut + Send,
719        Fut: Future<Output = Result<(), crate::Error>> + Send,
720    {
721        let location = Location::caller();
722        self.with_responder(SpawnedRun::new(location, task))
723    }
724
725    /// Register a handler for messages that can be either requests OR notifications.
726    ///
727    /// Use this when you want to handle an enum type that contains both request and
728    /// notification variants. Your handler receives a [`Dispatch<Req, Notif>`] which
729    /// is an enum with two variants:
730    ///
731    /// - `Dispatch::Request(request, responder)` - A request with its response context
732    /// - `Dispatch::Notification(notification)` - A notification
733    /// - `Dispatch::Response(result, router)` - A response to a request we sent
734    ///
735    /// # Example
736    ///
737    /// ```no_run
738    /// # use agent_client_protocol_test::*;
739    /// # use agent_client_protocol::Dispatch;
740    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
741    /// # let connection = mock_connection();
742    /// connection.on_receive_dispatch(async |message: Dispatch<MyRequest, StatusUpdate>, _cx| {
743    ///     match message {
744    ///         Dispatch::Request(req, responder) => {
745    ///             // Handle request and send response
746    ///             responder.respond(MyResponse { status: "ok".into() })
747    ///         }
748    ///         Dispatch::Notification(notif) => {
749    ///             // Handle notification (no response needed)
750    ///             Ok(())
751    ///         }
752    ///         Dispatch::Response(result, router) => {
753    ///             // Forward response to its destination
754    ///             router.respond_with_result(result)
755    ///         }
756    ///     }
757    /// }, agent_client_protocol::on_receive_dispatch!())
758    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
759    /// # Ok(())
760    /// # }
761    /// ```
762    ///
763    /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
764    /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
765    /// for handling requests or notifications separately.
766    ///
767    /// # Ordering
768    ///
769    /// This callback runs inside the dispatch loop and blocks further message processing
770    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
771    /// ordering guarantees and how to avoid deadlocks.
772    pub fn on_receive_dispatch<Req, Notif, F, T, ToFut>(
773        self,
774        op: F,
775        to_future_hack: ToFut,
776    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
777    where
778        Host::Counterpart: HasPeer<Host::Counterpart>,
779        Req: JsonRpcRequest,
780        Notif: JsonRpcNotification,
781        F: AsyncFnMut(
782                Dispatch<Req, Notif>,
783                ConnectionTo<Host::Counterpart>,
784            ) -> Result<T, crate::Error>
785            + Send,
786        T: IntoHandled<Dispatch<Req, Notif>>,
787        ToFut: Fn(
788                &mut F,
789                Dispatch<Req, Notif>,
790                ConnectionTo<Host::Counterpart>,
791            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
792            + Send
793            + Sync,
794    {
795        let handler = MessageHandler::new(
796            self.host.counterpart(),
797            self.host.counterpart(),
798            op,
799            to_future_hack,
800        );
801        self.with_handler(handler)
802    }
803
804    /// Register a handler for JSON-RPC requests of type `Req`.
805    ///
806    /// Your handler receives two arguments:
807    /// 1. The request (type `Req`)
808    /// 2. A [`Responder<R, Req::Response>`] for sending the response
809    ///
810    /// The request context allows you to:
811    /// - Send the response with [`Responder::respond`]
812    /// - Send notifications to the client with [`ConnectionTo::send_notification`]
813    /// - Send requests to the client with [`ConnectionTo::send_request`]
814    ///
815    /// # Example
816    ///
817    /// ```ignore
818    /// # use agent_client_protocol::UntypedRole;
819    /// # use agent_client_protocol::{Builder};
820    /// # use agent_client_protocol::schema::{PromptRequest, PromptResponse, SessionNotification};
821    /// # fn example<R: agent_client_protocol::Role>(connection: Builder<R, impl agent_client_protocol::HandleMessageAs<R>>) {
822    /// connection.on_receive_request(async |request: PromptRequest, responder, cx| {
823    ///     // Send a notification while processing
824    ///     let notif: SessionNotification = todo!();
825    ///     cx.send_notification(notif)?;
826    ///
827    ///     // Do some work...
828    ///     let result = todo!("process the prompt");
829    ///
830    ///     // Send the response
831    ///     let response: PromptResponse = todo!();
832    ///     responder.respond(response)
833    /// }, agent_client_protocol::on_receive_request!());
834    /// # }
835    /// ```
836    ///
837    /// # Type Parameter
838    ///
839    /// `Req` can be either a single request type or an enum of multiple request types.
840    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
841    ///
842    /// # Ordering
843    ///
844    /// This callback runs inside the dispatch loop and blocks further message processing
845    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
846    /// ordering guarantees and how to avoid deadlocks.
847    pub fn on_receive_request<Req: JsonRpcRequest, F, T, ToFut>(
848        self,
849        op: F,
850        to_future_hack: ToFut,
851    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
852    where
853        Host::Counterpart: HasPeer<Host::Counterpart>,
854        F: AsyncFnMut(
855                Req,
856                Responder<Req::Response>,
857                ConnectionTo<Host::Counterpart>,
858            ) -> Result<T, crate::Error>
859            + Send,
860        T: IntoHandled<(Req, Responder<Req::Response>)>,
861        ToFut: Fn(
862                &mut F,
863                Req,
864                Responder<Req::Response>,
865                ConnectionTo<Host::Counterpart>,
866            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
867            + Send
868            + Sync,
869    {
870        let handler = RequestHandler::new(
871            self.host.counterpart(),
872            self.host.counterpart(),
873            op,
874            to_future_hack,
875        );
876        self.with_handler(handler)
877    }
878
879    /// Register a handler for JSON-RPC notifications of type `Notif`.
880    ///
881    /// Notifications are fire-and-forget messages that don't expect a response.
882    /// Your handler receives:
883    /// 1. The notification (type `Notif`)
884    /// 2. A [`ConnectionTo<R>`] for sending messages to the other side
885    ///
886    /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
887    /// but you can still send your own requests and notifications using the context.
888    ///
889    /// # Example
890    ///
891    /// ```no_run
892    /// # use agent_client_protocol_test::*;
893    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
894    /// # let connection = mock_connection();
895    /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
896    ///     // Process the notification
897    ///     update_session_state(&notif)?;
898    ///
899    ///     // Optionally send a notification back
900    ///     cx.send_notification(StatusUpdate {
901    ///         message: "Acknowledged".into(),
902    ///     })?;
903    ///
904    ///     Ok(())
905    /// }, agent_client_protocol::on_receive_notification!())
906    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
907    /// # Ok(())
908    /// # }
909    /// ```
910    ///
911    /// # Type Parameter
912    ///
913    /// `Notif` can be either a single notification type or an enum of multiple notification types.
914    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
915    ///
916    /// # Ordering
917    ///
918    /// This callback runs inside the dispatch loop and blocks further message processing
919    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
920    /// ordering guarantees and how to avoid deadlocks.
921    pub fn on_receive_notification<Notif, F, T, ToFut>(
922        self,
923        op: F,
924        to_future_hack: ToFut,
925    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
926    where
927        Host::Counterpart: HasPeer<Host::Counterpart>,
928        Notif: JsonRpcNotification,
929        F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
930        T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
931        ToFut: Fn(
932                &mut F,
933                Notif,
934                ConnectionTo<Host::Counterpart>,
935            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
936            + Send
937            + Sync,
938    {
939        let handler = NotificationHandler::new(
940            self.host.counterpart(),
941            self.host.counterpart(),
942            op,
943            to_future_hack,
944        );
945        self.with_handler(handler)
946    }
947
948    /// Register a handler for messages from a specific peer.
949    ///
950    /// This is similar to [`on_receive_dispatch`](Self::on_receive_dispatch), but allows
951    /// specifying the source peer explicitly. This is useful when receiving messages
952    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
953    /// envelopes when receiving from an agent via a proxy).
954    ///
955    /// For the common case of receiving from the default counterpart, use
956    /// [`on_receive_dispatch`](Self::on_receive_dispatch) instead.
957    ///
958    /// # Ordering
959    ///
960    /// This callback runs inside the dispatch loop and blocks further message processing
961    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
962    /// ordering guarantees and how to avoid deadlocks.
963    pub fn on_receive_dispatch_from<
964        Req: JsonRpcRequest,
965        Notif: JsonRpcNotification,
966        Peer: Role,
967        F,
968        T,
969        ToFut,
970    >(
971        self,
972        peer: Peer,
973        op: F,
974        to_future_hack: ToFut,
975    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
976    where
977        Host::Counterpart: HasPeer<Peer>,
978        F: AsyncFnMut(
979                Dispatch<Req, Notif>,
980                ConnectionTo<Host::Counterpart>,
981            ) -> Result<T, crate::Error>
982            + Send,
983        T: IntoHandled<Dispatch<Req, Notif>>,
984        ToFut: Fn(
985                &mut F,
986                Dispatch<Req, Notif>,
987                ConnectionTo<Host::Counterpart>,
988            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
989            + Send
990            + Sync,
991    {
992        let handler = MessageHandler::new(self.host.counterpart(), peer, op, to_future_hack);
993        self.with_handler(handler)
994    }
995
996    /// Register a handler for JSON-RPC requests from a specific peer.
997    ///
998    /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
999    /// specifying the source peer explicitly. This is useful when receiving messages
1000    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
1001    /// envelopes when receiving from an agent via a proxy).
1002    ///
1003    /// For the common case of receiving from the default counterpart, use
1004    /// [`on_receive_request`](Self::on_receive_request) instead.
1005    ///
1006    /// # Example
1007    ///
1008    /// ```ignore
1009    /// use agent_client_protocol::Agent;
1010    /// use agent_client_protocol::schema::InitializeRequest;
1011    ///
1012    /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
1013    /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, responder, cx| {
1014    ///     // Handle the request
1015    ///     responder.respond(InitializeResponse::make())
1016    /// })
1017    /// ```
1018    ///
1019    /// # Ordering
1020    ///
1021    /// This callback runs inside the dispatch loop and blocks further message processing
1022    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1023    /// ordering guarantees and how to avoid deadlocks.
1024    pub fn on_receive_request_from<Req: JsonRpcRequest, Peer: Role, F, T, ToFut>(
1025        self,
1026        peer: Peer,
1027        op: F,
1028        to_future_hack: ToFut,
1029    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1030    where
1031        Host::Counterpart: HasPeer<Peer>,
1032        F: AsyncFnMut(
1033                Req,
1034                Responder<Req::Response>,
1035                ConnectionTo<Host::Counterpart>,
1036            ) -> Result<T, crate::Error>
1037            + Send,
1038        T: IntoHandled<(Req, Responder<Req::Response>)>,
1039        ToFut: Fn(
1040                &mut F,
1041                Req,
1042                Responder<Req::Response>,
1043                ConnectionTo<Host::Counterpart>,
1044            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1045            + Send
1046            + Sync,
1047    {
1048        let handler = RequestHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1049        self.with_handler(handler)
1050    }
1051
1052    /// Register a handler for JSON-RPC notifications from a specific peer.
1053    ///
1054    /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
1055    /// specifying the source peer explicitly. This is useful when receiving messages
1056    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
1057    /// envelopes when receiving from an agent via a proxy).
1058    ///
1059    /// For the common case of receiving from the default counterpart, use
1060    /// [`on_receive_notification`](Self::on_receive_notification) instead.
1061    ///
1062    /// # Ordering
1063    ///
1064    /// This callback runs inside the dispatch loop and blocks further message processing
1065    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1066    /// ordering guarantees and how to avoid deadlocks.
1067    pub fn on_receive_notification_from<Notif: JsonRpcNotification, Peer: Role, F, T, ToFut>(
1068        self,
1069        peer: Peer,
1070        op: F,
1071        to_future_hack: ToFut,
1072    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1073    where
1074        Host::Counterpart: HasPeer<Peer>,
1075        F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1076        T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1077        ToFut: Fn(
1078                &mut F,
1079                Notif,
1080                ConnectionTo<Host::Counterpart>,
1081            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1082            + Send
1083            + Sync,
1084    {
1085        let handler = NotificationHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1086        self.with_handler(handler)
1087    }
1088
1089    /// Add an MCP server that will be added to all new sessions that are proxied through this connection.
1090    ///
1091    /// Only applicable to proxies.
1092    pub fn with_mcp_server(
1093        self,
1094        mcp_server: McpServer<Host::Counterpart, impl RunWithConnectionTo<Host::Counterpart>>,
1095    ) -> Builder<
1096        Host,
1097        impl HandleDispatchFrom<Host::Counterpart>,
1098        impl RunWithConnectionTo<Host::Counterpart>,
1099    >
1100    where
1101        Host::Counterpart: HasPeer<Agent> + HasPeer<Client>,
1102    {
1103        let (handler, responder) = mcp_server.into_handler_and_responder();
1104        self.with_handler(handler).with_responder(responder)
1105    }
1106
1107    /// Run in server mode with the provided transport.
1108    ///
1109    /// This drives the connection by continuously processing messages from the transport
1110    /// and dispatching them to your registered handlers. The connection will run until:
1111    /// - The transport closes (e.g., EOF on byte streams)
1112    /// - An error occurs
1113    /// - One of your handlers returns an error
1114    ///
1115    /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
1116    /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1117    ///
1118    /// Use this mode when you only need to respond to incoming messages and don't need
1119    /// to initiate your own requests. If you need to send requests to the other side,
1120    /// use [`connect_with`](Self::connect_with) instead.
1121    ///
1122    /// # Example: Byte Stream Transport
1123    ///
1124    /// ```no_run
1125    /// # use agent_client_protocol::UntypedRole;
1126    /// # use agent_client_protocol::{Builder};
1127    /// # use agent_client_protocol::Stdio;
1128    /// # use agent_client_protocol_test::*;
1129    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1130    /// let transport = Stdio::new();
1131    ///
1132    /// UntypedRole.builder()
1133    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
1134    ///         responder.respond(MyResponse { status: "ok".into() })
1135    ///     }, agent_client_protocol::on_receive_request!())
1136    ///     .connect_to(transport)
1137    ///     .await?;
1138    /// # Ok(())
1139    /// # }
1140    /// ```
1141    pub async fn connect_to(
1142        self,
1143        transport: impl ConnectTo<Host> + 'static,
1144    ) -> Result<(), crate::Error> {
1145        self.connect_with(transport, async move |_cx| future::pending().await)
1146            .await
1147    }
1148
1149    /// Run the connection until the provided closure completes.
1150    ///
1151    /// This drives the connection by:
1152    /// 1. Running your registered handlers in the background to process incoming messages
1153    /// 2. Executing your `main_fn` closure with a [`ConnectionTo<R>`] for sending requests/notifications
1154    ///
1155    /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1156    /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1157    ///
1158    /// Use this mode when you need to initiate communication (send requests/notifications)
1159    /// in addition to responding to incoming messages. For server-only mode where you just
1160    /// respond to messages, use [`connect_to`](Self::connect_to) instead.
1161    ///
1162    /// # Example
1163    ///
1164    /// ```no_run
1165    /// # use agent_client_protocol::UntypedRole;
1166    /// # use agent_client_protocol::{Builder};
1167    /// # use agent_client_protocol::ByteStreams;
1168    /// # use agent_client_protocol::schema::InitializeRequest;
1169    /// # use agent_client_protocol::Stdio;
1170    /// # use agent_client_protocol_test::*;
1171    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1172    /// let transport = Stdio::new();
1173    ///
1174    /// UntypedRole.builder()
1175    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
1176    ///         // Handle incoming requests in the background
1177    ///         responder.respond(MyResponse { status: "ok".into() })
1178    ///     }, agent_client_protocol::on_receive_request!())
1179    ///     .connect_with(transport, async |cx| {
1180    ///         // Initialize the protocol
1181    ///         let init_response = cx.send_request(InitializeRequest::make())
1182    ///             .block_task()
1183    ///             .await?;
1184    ///
1185    ///         // Send more requests...
1186    ///         let result = cx.send_request(MyRequest {})
1187    ///             .block_task()
1188    ///             .await?;
1189    ///
1190    ///         // When this closure returns, the connection shuts down
1191    ///         Ok(())
1192    ///     })
1193    ///     .await?;
1194    /// # Ok(())
1195    /// # }
1196    /// ```
1197    ///
1198    /// # Parameters
1199    ///
1200    /// - `main_fn`: Your client logic. Receives a [`ConnectionTo<R>`] for sending messages.
1201    ///
1202    /// # Errors
1203    ///
1204    /// Returns an error if the connection closes before `main_fn` completes.
1205    pub async fn connect_with<R>(
1206        self,
1207        transport: impl ConnectTo<Host> + 'static,
1208        main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1209    ) -> Result<R, crate::Error> {
1210        let (_, future) = self.into_connection_and_future(transport, main_fn);
1211        future.await
1212    }
1213
1214    /// Helper that returns a [`ConnectionTo<R>`] and a future that runs this connection until `main_fn` returns.
1215    fn into_connection_and_future<R>(
1216        self,
1217        transport: impl ConnectTo<Host> + 'static,
1218        main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1219    ) -> (
1220        ConnectionTo<Host::Counterpart>,
1221        impl Future<Output = Result<R, crate::Error>>,
1222    ) {
1223        let Self {
1224            name,
1225            handler,
1226            responder,
1227            host: me,
1228            protocol_mode,
1229        } = self;
1230
1231        let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
1232        let (new_task_tx, new_task_rx) = mpsc::unbounded();
1233        let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
1234        let connection = ConnectionTo::new(
1235            me.counterpart(),
1236            outgoing_tx,
1237            new_task_tx,
1238            dynamic_handler_tx,
1239        );
1240
1241        // Convert transport into server - this returns a channel for us to use
1242        // and a future that runs the transport
1243        let transport_component = crate::DynConnectTo::new(transport);
1244        let (transport_channel, transport_future) = transport_component.into_channel_and_future();
1245        let spawn_result = connection.spawn(transport_future);
1246
1247        // Destructure the channel endpoints
1248        let Channel {
1249            rx: transport_incoming_rx,
1250            tx: transport_outgoing_tx,
1251        } = transport_channel;
1252
1253        let (reply_tx, reply_rx) = mpsc::unbounded();
1254        let protocol_compat = ProtocolCompat::new(protocol_mode);
1255
1256        let future = crate::util::instrument_with_connection_name(name, {
1257            let connection = connection.clone();
1258            async move {
1259                let () = spawn_result?;
1260
1261                let background = async {
1262                    futures::try_join!(
1263                        // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
1264                        outgoing_actor::outgoing_protocol_actor(
1265                            outgoing_rx,
1266                            reply_tx.clone(),
1267                            transport_outgoing_tx,
1268                            protocol_compat.clone(),
1269                        ),
1270                        // Protocol layer: jsonrpcmsg::Message → handler/reply routing
1271                        incoming_actor::incoming_protocol_actor(
1272                            me.counterpart(),
1273                            &connection,
1274                            transport_incoming_rx,
1275                            dynamic_handler_rx,
1276                            reply_rx,
1277                            handler,
1278                            protocol_compat,
1279                        ),
1280                        task_actor::task_actor(new_task_rx, &connection),
1281                        responder.run_with_connection_to(connection.clone()),
1282                    )?;
1283                    Ok(())
1284                };
1285
1286                crate::util::run_until(Box::pin(background), Box::pin(main_fn(connection.clone())))
1287                    .await
1288            }
1289        });
1290
1291        (connection, future)
1292    }
1293}
1294
1295impl<R, H, Run> ConnectTo<R::Counterpart> for Builder<R, H, Run>
1296where
1297    R: Role,
1298    H: HandleDispatchFrom<R::Counterpart> + 'static,
1299    Run: RunWithConnectionTo<R::Counterpart> + 'static,
1300{
1301    async fn connect_to(self, client: impl ConnectTo<R>) -> Result<(), crate::Error> {
1302        Builder::connect_to(self, client).await
1303    }
1304}
1305
1306/// The payload sent through the response oneshot channel.
1307///
1308/// Includes the response value and an optional ack channel for dispatch loop
1309/// synchronization.
1310pub(crate) struct ResponsePayload {
1311    /// The response result - either the JSON value or an error.
1312    pub(crate) result: Result<serde_json::Value, crate::Error>,
1313
1314    /// Optional acknowledgment channel for dispatch loop synchronization.
1315    ///
1316    /// When present, the receiver must send on this channel to signal that
1317    /// response processing is complete, allowing the dispatch loop to continue
1318    /// to the next message.
1319    ///
1320    /// This is `None` for error paths where the response is sent directly
1321    /// (e.g., when the outgoing channel is broken) rather than through the
1322    /// normal dispatch loop flow.
1323    pub(crate) ack_tx: Option<oneshot::Sender<()>>,
1324}
1325
1326impl std::fmt::Debug for ResponsePayload {
1327    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1328        f.debug_struct("ResponsePayload")
1329            .field("result", &self.result)
1330            .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1331            .finish()
1332    }
1333}
1334
1335/// Message sent to the incoming actor for reply subscription management.
1336enum ReplyMessage {
1337    /// Subscribe to receive a response for the given request id.
1338    /// When a response with this id arrives, it will be sent through the oneshot
1339    /// along with an ack channel that must be signaled when processing is complete.
1340    /// The method name is stored to allow routing responses through typed handlers.
1341    Subscribe {
1342        id: jsonrpcmsg::Id,
1343
1344        /// id of the peer this request was sent to
1345        role_id: RoleId,
1346
1347        /// (original) method of the request -- the actual request may have been transformed
1348        /// to a successor method, but this will reflect the method of the wrapped request
1349        method: String,
1350
1351        sender: oneshot::Sender<ResponsePayload>,
1352    },
1353}
1354
1355impl std::fmt::Debug for ReplyMessage {
1356    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1357        match self {
1358            ReplyMessage::Subscribe { id, method, .. } => f
1359                .debug_struct("Subscribe")
1360                .field("id", id)
1361                .field("method", method)
1362                .finish(),
1363        }
1364    }
1365}
1366
1367/// Messages send to be serialized over the transport.
1368#[derive(Debug)]
1369enum OutgoingMessage {
1370    /// Send a request to the server.
1371    Request {
1372        /// id assigned to this request (generated by sender)
1373        id: jsonrpcmsg::Id,
1374
1375        /// the original method
1376        method: String,
1377
1378        /// the peer we sent this to
1379        role_id: RoleId,
1380
1381        /// the message to send; this may have a distinct method
1382        /// depending on the peer
1383        untyped: UntypedMessage,
1384
1385        /// where to send the response when it arrives (includes ack channel)
1386        response_tx: oneshot::Sender<ResponsePayload>,
1387    },
1388
1389    /// Send a notification to the server.
1390    Notification {
1391        /// the message to send; this may have a distinct method
1392        /// depending on the peer
1393        untyped: UntypedMessage,
1394    },
1395
1396    /// Send a response to a message from the server
1397    Response {
1398        id: jsonrpcmsg::Id,
1399
1400        /// Method of the incoming request this response completes.
1401        method: String,
1402
1403        response: Result<serde_json::Value, crate::Error>,
1404    },
1405
1406    /// Send a generalized error message
1407    Error { error: crate::Error },
1408}
1409
1410/// Return type from JrHandler; indicates whether the request was handled or not.
1411#[must_use]
1412#[derive(Debug)]
1413pub enum Handled<T> {
1414    /// The message was handled
1415    Yes,
1416
1417    /// The message was not handled; returns the original value.
1418    ///
1419    /// If `retry` is true,
1420    No {
1421        /// The message to be passed to subsequent handlers
1422        /// (typically the original message, but it may have been
1423        /// mutated.)
1424        message: T,
1425
1426        /// If true, request the message to be queued and retried with
1427        /// dynamic handlers as they are added.
1428        ///
1429        /// This is used for managing session updates since the dynamic
1430        /// handler for a session cannot be added until the response to the
1431        /// new session request has been processed and there may be updates
1432        /// that get processed at the same time.
1433        retry: bool,
1434    },
1435}
1436
1437/// Trait for converting handler return values into [`Handled`].
1438///
1439/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
1440/// or an explicit `Handled<T>` value for more control over handler propagation.
1441pub trait IntoHandled<T> {
1442    /// Convert this value into a `Handled<T>`.
1443    fn into_handled(self) -> Handled<T>;
1444}
1445
1446impl<T> IntoHandled<T> for () {
1447    fn into_handled(self) -> Handled<T> {
1448        Handled::Yes
1449    }
1450}
1451
1452impl<T> IntoHandled<T> for Handled<T> {
1453    fn into_handled(self) -> Handled<T> {
1454        self
1455    }
1456}
1457
1458/// Connection context for sending messages and spawning tasks.
1459///
1460/// This is the primary handle for interacting with the JSON-RPC connection from
1461/// within handler callbacks. You can use it to:
1462///
1463/// * Send requests and notifications to the other side
1464/// * Spawn concurrent tasks that run alongside the connection
1465/// * Respond to requests (via [`Responder`] which wraps this)
1466///
1467/// # Cloning
1468///
1469/// `ConnectionTo` is cheaply cloneable - all clones refer to the same underlying connection.
1470/// This makes it easy to share across async tasks.
1471///
1472/// # Event Loop and Concurrency
1473///
1474/// Handler callbacks run on the event loop, which means the connection cannot process new
1475/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
1476/// expensive or blocking work to concurrent tasks.
1477///
1478/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency) section
1479/// for more details.
1480#[derive(Clone, Debug)]
1481pub struct ConnectionTo<Counterpart: Role> {
1482    counterpart: Counterpart,
1483    message_tx: OutgoingMessageTx,
1484    task_tx: TaskTx,
1485    dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
1486}
1487
1488impl<Counterpart: Role> ConnectionTo<Counterpart> {
1489    fn new(
1490        counterpart: Counterpart,
1491        message_tx: mpsc::UnboundedSender<OutgoingMessage>,
1492        task_tx: mpsc::UnboundedSender<Task>,
1493        dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
1494    ) -> Self {
1495        Self {
1496            counterpart,
1497            message_tx,
1498            task_tx,
1499            dynamic_handler_tx,
1500        }
1501    }
1502
1503    /// Return the counterpart role this connection is talking to.
1504    pub fn counterpart(&self) -> Counterpart {
1505        self.counterpart.clone()
1506    }
1507
1508    /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1509    ///
1510    /// This is the primary mechanism for offloading expensive work from handler callbacks
1511    /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1512    /// allowing the server to continue processing messages.
1513    ///
1514    /// # Event Loop
1515    ///
1516    /// Handler callbacks run on the event loop, which cannot process new messages while
1517    /// your handler is running. Use `spawn` for any expensive operations:
1518    ///
1519    /// ```no_run
1520    /// # use agent_client_protocol_test::*;
1521    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1522    /// # let connection = mock_connection();
1523    /// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1524    ///     // Clone cx for the spawned task
1525    ///     cx.spawn({
1526    ///         let connection = cx.clone();
1527    ///         async move {
1528    ///             let result = expensive_operation(&req.data).await?;
1529    ///             connection.send_notification(ProcessComplete { result })?;
1530    ///             Ok(())
1531    ///         }
1532    ///     })?;
1533    ///
1534    ///     // Respond immediately
1535    ///     responder.respond(ProcessResponse { result: "started".into() })
1536    /// }, agent_client_protocol::on_receive_request!())
1537    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1538    /// # Ok(())
1539    /// # }
1540    /// ```
1541    ///
1542    /// # Errors
1543    ///
1544    /// If the spawned task returns an error, the entire server will shut down.
1545    #[track_caller]
1546    pub fn spawn(
1547        &self,
1548        task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1549    ) -> Result<(), crate::Error> {
1550        let location = std::panic::Location::caller();
1551        let task = task.into_future();
1552        Task::new(location, task).spawn(&self.task_tx)
1553    }
1554
1555    /// Spawn a JSON-RPC connection in the background and return a [`ConnectionTo`] for sending messages to it.
1556    ///
1557    /// This is useful for creating multiple connections that communicate with each other,
1558    /// such as implementing proxy patterns or connecting to multiple backend services.
1559    ///
1560    /// # Arguments
1561    ///
1562    /// - `builder`: The connection builder with handlers configured
1563    /// - `transport`: The transport component to connect to
1564    ///
1565    /// # Returns
1566    ///
1567    /// A `ConnectionTo` that you can use to send requests and notifications to the spawned connection.
1568    ///
1569    /// # Example: Proxying to a backend connection
1570    ///
1571    /// ```
1572    /// # use agent_client_protocol::UntypedRole;
1573    /// # use agent_client_protocol::{Builder, ConnectionTo};
1574    /// # use agent_client_protocol_test::*;
1575    /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1576    /// // Set up a backend connection builder
1577    /// let backend = UntypedRole.builder()
1578    ///     .on_receive_request(async |req: MyRequest, responder, _cx| {
1579    ///         responder.respond(MyResponse { status: "ok".into() })
1580    ///     }, agent_client_protocol::on_receive_request!());
1581    ///
1582    /// // Spawn it and get a context to send requests to it
1583    /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
1584    ///
1585    /// // Now you can forward requests to the backend
1586    /// let response = backend_connection.send_request(MyRequest {}).block_task().await?;
1587    /// # Ok(())
1588    /// # }
1589    /// ```
1590    #[track_caller]
1591    pub fn spawn_connection<R: Role>(
1592        &self,
1593        builder: Builder<
1594            R,
1595            impl HandleDispatchFrom<R::Counterpart> + 'static,
1596            impl RunWithConnectionTo<R::Counterpart> + 'static,
1597        >,
1598        transport: impl ConnectTo<R> + 'static,
1599    ) -> Result<ConnectionTo<R::Counterpart>, crate::Error> {
1600        let (connection, future) =
1601            builder.into_connection_and_future(transport, |_| std::future::pending());
1602        Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
1603        Ok(connection)
1604    }
1605
1606    /// Send a request/notification and forward the response appropriately.
1607    ///
1608    /// The request context's response type matches the request's response type,
1609    /// enabling type-safe message forwarding.
1610    pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
1611        &self,
1612        message: Dispatch<Req, Notif>,
1613    ) -> Result<(), crate::Error>
1614    where
1615        Counterpart: HasPeer<Counterpart>,
1616    {
1617        self.send_proxied_message_to(self.counterpart(), message)
1618    }
1619
1620    /// Send a request/notification and forward the response appropriately.
1621    ///
1622    /// The request context's response type matches the request's response type,
1623    /// enabling type-safe message forwarding.
1624    pub fn send_proxied_message_to<
1625        Peer: Role,
1626        Req: JsonRpcRequest<Response: Send>,
1627        Notif: JsonRpcNotification,
1628    >(
1629        &self,
1630        peer: Peer,
1631        message: Dispatch<Req, Notif>,
1632    ) -> Result<(), crate::Error>
1633    where
1634        Counterpart: HasPeer<Peer>,
1635    {
1636        match message {
1637            Dispatch::Request(request, responder) => self
1638                .send_request_to(peer, request)
1639                .forward_response_to(responder),
1640            Dispatch::Notification(notification) => self.send_notification_to(peer, notification),
1641            Dispatch::Response(result, router) => {
1642                // Responses are forwarded directly to their destination
1643                router.respond_with_result(result)
1644            }
1645        }
1646    }
1647
1648    /// Send an outgoing request and return a [`SentRequest`] for handling the reply.
1649    ///
1650    /// The returned [`SentRequest`] provides methods for receiving the response without
1651    /// blocking the event loop:
1652    ///
1653    /// * [`on_receiving_result`](SentRequest::on_receiving_result) - Schedule
1654    ///   a callback to run when the response arrives (doesn't block the event loop)
1655    /// * [`block_task`](SentRequest::block_task) - Block the current task until the response
1656    ///   arrives (only safe in spawned tasks, not in handlers)
1657    ///
1658    /// # Anti-Footgun Design
1659    ///
1660    /// The API intentionally makes it difficult to block on the result directly to prevent
1661    /// the common mistake of blocking the event loop while waiting for a response:
1662    ///
1663    /// ```compile_fail
1664    /// # use agent_client_protocol_test::*;
1665    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1666    /// // ❌ This doesn't compile - prevents blocking the event loop
1667    /// let response = cx.send_request(MyRequest {}).await?;
1668    /// # Ok(())
1669    /// # }
1670    /// ```
1671    ///
1672    /// ```no_run
1673    /// # use agent_client_protocol_test::*;
1674    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1675    /// // ✅ Option 1: Schedule callback (safe in handlers)
1676    /// cx.send_request(MyRequest {})
1677    ///     .on_receiving_result(async |result| {
1678    ///         // Handle the response
1679    ///         Ok(())
1680    ///     })?;
1681    ///
1682    /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1683    /// cx.spawn({
1684    ///     let cx = cx.clone();
1685    ///     async move {
1686    ///         let response = cx.send_request(MyRequest {})
1687    ///             .block_task()
1688    ///             .await?;
1689    ///         // Process response...
1690    ///         Ok(())
1691    ///     }
1692    /// })?;
1693    /// # Ok(())
1694    /// # }
1695    /// ```
1696    /// Send an outgoing request to the default counterpart peer.
1697    ///
1698    /// This is a convenience method that sends to the counterpart role `R`.
1699    /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
1700    pub fn send_request<Req: JsonRpcRequest>(&self, request: Req) -> SentRequest<Req::Response>
1701    where
1702        Counterpart: HasPeer<Counterpart>,
1703    {
1704        self.send_request_to(self.counterpart.clone(), request)
1705    }
1706
1707    /// Send an outgoing request to a specific peer.
1708    ///
1709    /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1710    /// implementation before being sent.
1711    pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
1712        &self,
1713        peer: Peer,
1714        request: Req,
1715    ) -> SentRequest<Req::Response>
1716    where
1717        Counterpart: HasPeer<Peer>,
1718    {
1719        let method = request.method().to_string();
1720        let id = jsonrpcmsg::Id::String(uuid::Uuid::new_v4().to_string());
1721        let (response_tx, response_rx) = oneshot::channel();
1722        let role_id = peer.role_id();
1723        let remote_style = self.counterpart.remote_style(peer);
1724        match remote_style.transform_outgoing_message(request) {
1725            Ok(untyped) => {
1726                // Transform the message for the target role
1727                let message = OutgoingMessage::Request {
1728                    id: id.clone(),
1729                    method: method.clone(),
1730                    role_id,
1731                    untyped,
1732                    response_tx,
1733                };
1734
1735                match self.message_tx.unbounded_send(message) {
1736                    Ok(()) => (),
1737                    Err(error) => {
1738                        let OutgoingMessage::Request {
1739                            method,
1740                            response_tx,
1741                            ..
1742                        } = error.into_inner()
1743                        else {
1744                            unreachable!();
1745                        };
1746
1747                        response_tx
1748                            .send(ResponsePayload {
1749                                result: Err(crate::util::internal_error(format!(
1750                                    "failed to send outgoing request `{method}"
1751                                ))),
1752                                ack_tx: None,
1753                            })
1754                            .unwrap();
1755                    }
1756                }
1757            }
1758
1759            Err(err) => {
1760                response_tx
1761                    .send(ResponsePayload {
1762                        result: Err(crate::util::internal_error(format!(
1763                            "failed to create untyped request for `{method}`: {err}"
1764                        ))),
1765                        ack_tx: None,
1766                    })
1767                    .unwrap();
1768            }
1769        }
1770
1771        SentRequest::new(id, method.clone(), self.task_tx.clone(), response_rx)
1772            .map(move |json| <Req::Response>::from_value(&method, json))
1773    }
1774
1775    /// Send an outgoing notification to the default counterpart peer (no reply expected).
1776    ///
1777    /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1778    /// This method sends the notification immediately and returns.
1779    ///
1780    /// This is a convenience method that sends to the counterpart role `R`.
1781    /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
1782    ///
1783    /// ```no_run
1784    /// # use agent_client_protocol_test::*;
1785    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::Agent>) -> Result<(), agent_client_protocol::Error> {
1786    /// cx.send_notification(StatusUpdate {
1787    ///     message: "Processing...".into(),
1788    /// })?;
1789    /// # Ok(())
1790    /// # }
1791    /// ```
1792    pub fn send_notification<N: JsonRpcNotification>(
1793        &self,
1794        notification: N,
1795    ) -> Result<(), crate::Error>
1796    where
1797        Counterpart: HasPeer<Counterpart>,
1798    {
1799        self.send_notification_to(self.counterpart.clone(), notification)
1800    }
1801
1802    /// Send an outgoing notification to a specific peer (no reply expected).
1803    ///
1804    /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1805    /// implementation before being sent.
1806    pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
1807        &self,
1808        peer: Peer,
1809        notification: N,
1810    ) -> Result<(), crate::Error>
1811    where
1812        Counterpart: HasPeer<Peer>,
1813    {
1814        let remote_style = self.counterpart.remote_style(peer);
1815        tracing::debug!(
1816            role = std::any::type_name::<Counterpart>(),
1817            peer = std::any::type_name::<Peer>(),
1818            notification_type = std::any::type_name::<N>(),
1819            ?remote_style,
1820            original_method = notification.method(),
1821            "send_notification_to"
1822        );
1823        let transformed = remote_style.transform_outgoing_message(notification)?;
1824        tracing::debug!(
1825            transformed_method = %transformed.method,
1826            "send_notification_to transformed"
1827        );
1828        send_raw_message(
1829            &self.message_tx,
1830            OutgoingMessage::Notification {
1831                untyped: transformed,
1832            },
1833        )
1834    }
1835
1836    /// Send an error notification (no reply expected).
1837    pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1838        send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
1839    }
1840
1841    /// Register a dynamic message handler, used to intercept messages specific to a particular session
1842    /// or some similar modal thing.
1843    ///
1844    /// Dynamic message handlers are called first for every incoming message.
1845    ///
1846    /// If they decline to handle the message, then the message is passed to the regular registered handlers.
1847    ///
1848    /// The handler will stay registered until the returned registration guard is dropped.
1849    pub fn add_dynamic_handler(
1850        &self,
1851        handler: impl HandleDispatchFrom<Counterpart> + 'static,
1852    ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error> {
1853        let uuid = Uuid::new_v4();
1854        self.dynamic_handler_tx
1855            .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
1856                uuid,
1857                Box::new(handler),
1858            ))
1859            .map_err(crate::util::internal_error)?;
1860
1861        Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
1862    }
1863
1864    fn remove_dynamic_handler(&self, uuid: Uuid) {
1865        // Ignore errors
1866        drop(
1867            self.dynamic_handler_tx
1868                .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid)),
1869        );
1870    }
1871}
1872
1873#[derive(Clone, Debug)]
1874pub struct DynamicHandlerRegistration<R: Role> {
1875    uuid: Uuid,
1876    cx: ConnectionTo<R>,
1877}
1878
1879impl<R: Role> DynamicHandlerRegistration<R> {
1880    fn new(uuid: Uuid, cx: ConnectionTo<R>) -> Self {
1881        Self { uuid, cx }
1882    }
1883
1884    /// Prevents the dynamic handler from being removed when dropped.
1885    pub fn run_indefinitely(self) {
1886        std::mem::forget(self);
1887    }
1888}
1889
1890impl<R: Role> Drop for DynamicHandlerRegistration<R> {
1891    fn drop(&mut self) {
1892        self.cx.remove_dynamic_handler(self.uuid);
1893    }
1894}
1895
1896/// The context to respond to an incoming request.
1897///
1898/// This context is provided to request handlers and serves a dual role:
1899///
1900/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
1901///    [`respond_with_result`](Self::respond_with_result) to send the response
1902/// 2. **Send other messages** - Use the [`ConnectionTo`] parameter passed to your
1903///    handler, which provides [`send_request`](`ConnectionTo::send_request`),
1904///    [`send_notification`](`ConnectionTo::send_notification`), and
1905///    [`spawn`](`ConnectionTo::spawn`)
1906///
1907/// # Example
1908///
1909/// ```no_run
1910/// # use agent_client_protocol_test::*;
1911/// # async fn example() -> Result<(), agent_client_protocol::Error> {
1912/// # let connection = mock_connection();
1913/// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1914///     // Send a notification while processing
1915///     cx.send_notification(StatusUpdate {
1916///         message: "processing".into(),
1917///     })?;
1918///
1919///     // Do some work...
1920///     let result = process(&req.data)?;
1921///
1922///     // Respond to the request
1923///     responder.respond(ProcessResponse { result })
1924/// }, agent_client_protocol::on_receive_request!())
1925/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1926/// # Ok(())
1927/// # }
1928/// ```
1929///
1930/// # Event Loop Considerations
1931///
1932/// Like all handlers, request handlers run on the event loop. Use
1933/// [`spawn`](ConnectionTo::spawn) for expensive operations to avoid blocking
1934/// the connection.
1935///
1936/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency)
1937/// section for more details.
1938#[must_use]
1939pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
1940    /// The method of the request.
1941    method: String,
1942
1943    /// The `id` of the message we are replying to.
1944    id: jsonrpcmsg::Id,
1945
1946    /// Function to send the response to its destination.
1947    ///
1948    /// For incoming requests: serializes to JSON and sends over the wire.
1949    /// For incoming responses: sends to the waiting oneshot channel.
1950    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
1951}
1952
1953impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
1954    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1955        f.debug_struct("Responder")
1956            .field("method", &self.method)
1957            .field("id", &self.id)
1958            .field("response_type", &std::any::type_name::<T>())
1959            .finish_non_exhaustive()
1960    }
1961}
1962
1963impl Responder<serde_json::Value> {
1964    /// Create a new request context for an incoming request.
1965    ///
1966    /// The response will be serialized to JSON and sent over the wire.
1967    fn new(message_tx: OutgoingMessageTx, method: String, id: jsonrpcmsg::Id) -> Self {
1968        let id_clone = id.clone();
1969        let method_clone = method.clone();
1970        Self {
1971            method,
1972            id,
1973            send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
1974                send_raw_message(
1975                    &message_tx,
1976                    OutgoingMessage::Response {
1977                        id: id_clone,
1978                        method: method_clone,
1979                        response,
1980                    },
1981                )
1982            }),
1983        }
1984    }
1985
1986    /// Cast this request context to a different response type.
1987    ///
1988    /// The provided type `T` will be serialized to JSON before sending.
1989    pub fn cast<T: JsonRpcResponse>(self) -> Responder<T> {
1990        self.wrap_params(move |method, value| match value {
1991            Ok(value) => T::into_json(value, method),
1992            Err(e) => Err(e),
1993        })
1994    }
1995}
1996
1997impl<T: JsonRpcResponse> Responder<T> {
1998    /// Method of the incoming request
1999    #[must_use]
2000    pub fn method(&self) -> &str {
2001        &self.method
2002    }
2003
2004    /// ID of the incoming request/response as a JSON value
2005    #[must_use]
2006    pub fn id(&self) -> serde_json::Value {
2007        crate::util::id_to_json(&self.id)
2008    }
2009
2010    /// Convert to a `Responder` that expects a JSON value
2011    /// and which checks (dynamically) that the JSON value it receives
2012    /// can be converted to `T`.
2013    pub fn erase_to_json(self) -> Responder<serde_json::Value> {
2014        self.wrap_params(|method, value| T::from_value(method, value?))
2015    }
2016
2017    /// Return a new Responder with a different method name.
2018    pub fn wrap_method(self, method: String) -> Responder<T> {
2019        Responder {
2020            method,
2021            id: self.id,
2022            send_fn: self.send_fn,
2023        }
2024    }
2025
2026    /// Return a new Responder that expects a response of type U.
2027    ///
2028    /// `wrap_fn` will be invoked with the method name and the result to transform
2029    /// type `U` into type `T` before sending.
2030    pub fn wrap_params<U: JsonRpcResponse>(
2031        self,
2032        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2033    ) -> Responder<U> {
2034        let method = self.method.clone();
2035        Responder {
2036            method: self.method,
2037            id: self.id,
2038            send_fn: Box::new(move |input: Result<U, crate::Error>| {
2039                let t_value = wrap_fn(&method, input);
2040                (self.send_fn)(t_value)
2041            }),
2042        }
2043    }
2044
2045    /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
2046    pub fn respond_with_result(
2047        self,
2048        response: Result<T, crate::Error>,
2049    ) -> Result<(), crate::Error> {
2050        tracing::debug!(id = ?self.id, "respond called");
2051        (self.send_fn)(response)
2052    }
2053
2054    /// Respond to the JSON-RPC request with a value.
2055    pub fn respond(self, response: T) -> Result<(), crate::Error> {
2056        self.respond_with_result(Ok(response))
2057    }
2058
2059    /// Respond to the JSON-RPC request with an internal error containing a message.
2060    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2061        self.respond_with_error(crate::util::internal_error(message))
2062    }
2063
2064    /// Respond to the JSON-RPC request with an error.
2065    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2066        tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2067        self.respond_with_result(Err(error))
2068    }
2069}
2070
2071/// Context for handling an incoming JSON-RPC response.
2072///
2073/// This is the response-side counterpart to [`Responder`]. While `Responder` handles
2074/// incoming requests (where you send a response over the wire), `ResponseRouter` handles
2075/// incoming responses (where you route the response to a local task waiting for it).
2076///
2077/// Both are fundamentally "sinks" that push the message through a `send_fn`, but they
2078/// represent different points in the message lifecycle and carry different metadata.
2079#[must_use]
2080pub struct ResponseRouter<T: JsonRpcResponse = serde_json::Value> {
2081    /// The method of the original request.
2082    method: String,
2083
2084    /// The `id` of the original request.
2085    id: jsonrpcmsg::Id,
2086
2087    /// The RoleId to which the original request was sent
2088    /// (and hence from which the reply is expected).
2089    role_id: RoleId,
2090
2091    /// Function to send the response to the waiting task.
2092    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
2093}
2094
2095impl<T: JsonRpcResponse> std::fmt::Debug for ResponseRouter<T> {
2096    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2097        f.debug_struct("ResponseRouter")
2098            .field("method", &self.method)
2099            .field("id", &self.id)
2100            .field("response_type", &std::any::type_name::<T>())
2101            .finish_non_exhaustive()
2102    }
2103}
2104
2105impl ResponseRouter<serde_json::Value> {
2106    /// Create a new response context for routing a response to a local awaiter.
2107    ///
2108    /// When `respond_with_result` is called, the response is sent through the oneshot
2109    /// channel to the code that originally sent the request.
2110    pub(crate) fn new(
2111        method: String,
2112        id: jsonrpcmsg::Id,
2113        role_id: RoleId,
2114        sender: oneshot::Sender<ResponsePayload>,
2115    ) -> Self {
2116        Self {
2117            method,
2118            id,
2119            role_id,
2120            send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
2121                sender
2122                    .send(ResponsePayload {
2123                        result: response,
2124                        ack_tx: None,
2125                    })
2126                    .map_err(|_| {
2127                        crate::util::internal_error("failed to send response, receiver dropped")
2128                    })
2129            }),
2130        }
2131    }
2132
2133    /// Cast this response context to a different response type.
2134    ///
2135    /// The provided type `T` will be serialized to JSON before sending.
2136    pub fn cast<T: JsonRpcResponse>(self) -> ResponseRouter<T> {
2137        self.wrap_params(move |method, value| match value {
2138            Ok(value) => T::into_json(value, method),
2139            Err(e) => Err(e),
2140        })
2141    }
2142}
2143
2144impl<T: JsonRpcResponse> ResponseRouter<T> {
2145    /// Method of the original request
2146    #[must_use]
2147    pub fn method(&self) -> &str {
2148        &self.method
2149    }
2150
2151    /// ID of the original request as a JSON value
2152    #[must_use]
2153    pub fn id(&self) -> serde_json::Value {
2154        crate::util::id_to_json(&self.id)
2155    }
2156
2157    /// The peer to which the original request was sent.
2158    ///
2159    /// This is the peer from which we expect to receive the response.
2160    #[must_use]
2161    pub fn role_id(&self) -> RoleId {
2162        self.role_id.clone()
2163    }
2164
2165    /// Convert to a `ResponseRouter` that expects a JSON value
2166    /// and which checks (dynamically) that the JSON value it receives
2167    /// can be converted to `T`.
2168    pub fn erase_to_json(self) -> ResponseRouter<serde_json::Value> {
2169        self.wrap_params(|method, value| T::from_value(method, value?))
2170    }
2171
2172    /// Return a new ResponseRouter that expects a response of type U.
2173    ///
2174    /// `wrap_fn` will be invoked with the method name and the result to transform
2175    /// type `U` into type `T` before sending.
2176    fn wrap_params<U: JsonRpcResponse>(
2177        self,
2178        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2179    ) -> ResponseRouter<U> {
2180        let method = self.method.clone();
2181        ResponseRouter {
2182            method: self.method,
2183            id: self.id,
2184            role_id: self.role_id,
2185            send_fn: Box::new(move |input: Result<U, crate::Error>| {
2186                let t_value = wrap_fn(&method, input);
2187                (self.send_fn)(t_value)
2188            }),
2189        }
2190    }
2191
2192    /// Complete the response by sending the result to the waiting task.
2193    pub fn respond_with_result(
2194        self,
2195        response: Result<T, crate::Error>,
2196    ) -> Result<(), crate::Error> {
2197        tracing::debug!(id = ?self.id, "response routed to awaiter");
2198        (self.send_fn)(response)
2199    }
2200
2201    /// Complete the response by sending a value to the waiting task.
2202    pub fn respond(self, response: T) -> Result<(), crate::Error> {
2203        self.respond_with_result(Ok(response))
2204    }
2205
2206    /// Complete the response by sending an internal error to the waiting task.
2207    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2208        self.respond_with_error(crate::util::internal_error(message))
2209    }
2210
2211    /// Complete the response by sending an error to the waiting task.
2212    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2213        tracing::debug!(id = ?self.id, ?error, "error routed to awaiter");
2214        self.respond_with_result(Err(error))
2215    }
2216}
2217
2218/// Common bounds for any JSON-RPC message.
2219///
2220/// # Derive Macro
2221///
2222/// For simple message types, you can use the `JsonRpcRequest` or `JsonRpcNotification` derive macros
2223/// which will implement both `JsonRpcMessage` and the respective trait. See [`JsonRpcRequest`] and
2224/// [`JsonRpcNotification`] for examples.
2225pub trait JsonRpcMessage: 'static + Debug + Sized + Send + Clone {
2226    /// Check if this message type matches the given method name.
2227    fn matches_method(method: &str) -> bool;
2228
2229    /// The method name for the message.
2230    fn method(&self) -> &str;
2231
2232    /// Convert this message into an untyped message.
2233    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;
2234
2235    /// Parse this type from a method name and parameters.
2236    ///
2237    /// Returns an error if the method doesn't match or deserialization fails.
2238    /// Callers should use `matches_method` first to check if this type handles the method.
2239    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error>;
2240}
2241
2242/// Defines the "payload" of a successful response to a JSON-RPC request.
2243///
2244/// # Derive Macro
2245///
2246/// Use `#[derive(JsonRpcResponse)]` to automatically implement this trait:
2247///
2248/// ```ignore
2249/// use agent_client_protocol::JsonRpcResponse;
2250/// use serde::{Serialize, Deserialize};
2251///
2252/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2253/// #[response(method = "_hello")]
2254/// struct HelloResponse {
2255///     greeting: String,
2256/// }
2257/// ```
2258pub trait JsonRpcResponse: 'static + Debug + Sized + Send + Clone {
2259    /// Convert this message into a JSON value.
2260    fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;
2261
2262    /// Parse a JSON value into the response type.
2263    fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
2264}
2265
2266impl JsonRpcResponse for serde_json::Value {
2267    fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
2268        Ok(value)
2269    }
2270
2271    fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
2272        Ok(self)
2273    }
2274}
2275
2276/// A struct that represents a notification (JSON-RPC message that does not expect a response).
2277///
2278/// # Derive Macro
2279///
2280/// Use `#[derive(JsonRpcNotification)]` to automatically implement both `JsonRpcMessage` and `JsonRpcNotification`:
2281///
2282/// ```ignore
2283/// use agent_client_protocol::JsonRpcNotification;
2284/// use serde::{Serialize, Deserialize};
2285///
2286/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcNotification)]
2287/// #[notification(method = "_ping")]
2288/// struct PingNotification {
2289///     timestamp: u64,
2290/// }
2291/// ```
2292pub trait JsonRpcNotification: JsonRpcMessage {}
2293
2294/// A struct that represents a request (JSON-RPC message expecting a response).
2295///
2296/// # Derive Macro
2297///
2298/// Use `#[derive(JsonRpcRequest)]` to automatically implement both `JsonRpcMessage` and `JsonRpcRequest`:
2299///
2300/// ```ignore
2301/// use agent_client_protocol::{JsonRpcRequest, JsonRpcResponse};
2302/// use serde::{Serialize, Deserialize};
2303///
2304/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcRequest)]
2305/// #[request(method = "_hello", response = HelloResponse)]
2306/// struct HelloRequest {
2307///     name: String,
2308/// }
2309///
2310/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2311/// struct HelloResponse {
2312///     greeting: String,
2313/// }
2314/// ```
2315pub trait JsonRpcRequest: JsonRpcMessage {
2316    /// The type of data expected in response.
2317    type Response: JsonRpcResponse;
2318}
2319
2320/// An enum capturing an in-flight request or notification.
2321/// In the case of a request, also includes the context used to respond to the request.
2322///
2323/// Type parameters allow specifying the concrete request and notification types.
2324/// By default, both are `UntypedMessage` for dynamic dispatch.
2325/// The request context's response type matches the request's response type.
2326#[derive(Debug)]
2327pub enum Dispatch<Req: JsonRpcRequest = UntypedMessage, Notif: JsonRpcMessage = UntypedMessage> {
2328    /// Incoming request and the context where the response should be sent.
2329    Request(Req, Responder<Req::Response>),
2330
2331    /// Incoming notification.
2332    Notification(Notif),
2333
2334    /// Incoming response to a request we sent.
2335    ///
2336    /// The first field is the response result (success or error from the remote).
2337    /// The second field is the context for forwarding the response to its destination
2338    /// (typically a waiting oneshot channel).
2339    Response(
2340        Result<Req::Response, crate::Error>,
2341        ResponseRouter<Req::Response>,
2342    ),
2343}
2344
2345impl<Req: JsonRpcRequest, Notif: JsonRpcMessage> Dispatch<Req, Notif> {
2346    /// Map the request and notification types to new types.
2347    ///
2348    /// Note: Response variants are passed through unchanged since they don't
2349    /// contain a parseable message payload.
2350    pub fn map<Req1, Notif1>(
2351        self,
2352        map_request: impl FnOnce(Req, Responder<Req::Response>) -> (Req1, Responder<Req1::Response>),
2353        map_notification: impl FnOnce(Notif) -> Notif1,
2354    ) -> Dispatch<Req1, Notif1>
2355    where
2356        Req1: JsonRpcRequest<Response = Req::Response>,
2357        Notif1: JsonRpcMessage,
2358    {
2359        match self {
2360            Dispatch::Request(request, responder) => {
2361                let (new_request, new_responder) = map_request(request, responder);
2362                Dispatch::Request(new_request, new_responder)
2363            }
2364            Dispatch::Notification(notification) => {
2365                let new_notification = map_notification(notification);
2366                Dispatch::Notification(new_notification)
2367            }
2368            Dispatch::Response(result, router) => Dispatch::Response(result, router),
2369        }
2370    }
2371
2372    /// Respond to the message with an error.
2373    ///
2374    /// If this message is a request, this error becomes the reply to the request.
2375    ///
2376    /// If this message is a notification, the error is sent as a notification.
2377    ///
2378    /// If this message is a response, the error is forwarded to the waiting handler.
2379    pub fn respond_with_error<R: Role>(
2380        self,
2381        error: crate::Error,
2382        cx: ConnectionTo<R>,
2383    ) -> Result<(), crate::Error> {
2384        match self {
2385            Dispatch::Request(_, responder) => responder.respond_with_error(error),
2386            Dispatch::Notification(_) => cx.send_error_notification(error),
2387            Dispatch::Response(_, responder) => responder.respond_with_error(error),
2388        }
2389    }
2390
2391    /// Convert to a `Responder` that expects a JSON value
2392    /// and which checks (dynamically) that the JSON value it receives
2393    /// can be converted to `T`.
2394    ///
2395    /// Note: Response variants cannot be erased since their payload is already
2396    /// parsed. This returns an error for Response variants.
2397    pub fn erase_to_json(self) -> Result<Dispatch, crate::Error> {
2398        match self {
2399            Dispatch::Request(response, responder) => Ok(Dispatch::Request(
2400                response.to_untyped_message()?,
2401                responder.erase_to_json(),
2402            )),
2403            Dispatch::Notification(notification) => {
2404                Ok(Dispatch::Notification(notification.to_untyped_message()?))
2405            }
2406            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2407                "cannot erase Response variant to JSON",
2408            )),
2409        }
2410    }
2411
2412    /// Convert the message in self to an untyped message.
2413    ///
2414    /// Note: Response variants don't have an untyped message representation.
2415    /// This returns an error for Response variants.
2416    pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2417        match self {
2418            Dispatch::Request(request, _) => request.to_untyped_message(),
2419            Dispatch::Notification(notification) => notification.to_untyped_message(),
2420            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2421                "Response variant has no untyped message representation",
2422            )),
2423        }
2424    }
2425
2426    /// Convert self to an untyped message context.
2427    ///
2428    /// Note: Response variants cannot be converted. This returns an error for Response variants.
2429    pub fn into_untyped_dispatch(self) -> Result<Dispatch, crate::Error> {
2430        match self {
2431            Dispatch::Request(request, responder) => Ok(Dispatch::Request(
2432                request.to_untyped_message()?,
2433                responder.erase_to_json(),
2434            )),
2435            Dispatch::Notification(notification) => {
2436                Ok(Dispatch::Notification(notification.to_untyped_message()?))
2437            }
2438            Dispatch::Response(_, _) => Err(crate::util::internal_error(
2439                "cannot convert Response variant to untyped message context",
2440            )),
2441        }
2442    }
2443
2444    /// Returns the request ID if this is a request or response, None if notification.
2445    pub fn id(&self) -> Option<serde_json::Value> {
2446        match self {
2447            Dispatch::Request(_, cx) => Some(cx.id()),
2448            Dispatch::Notification(_) => None,
2449            Dispatch::Response(_, cx) => Some(cx.id()),
2450        }
2451    }
2452
2453    /// Returns the method of the message.
2454    ///
2455    /// For requests and notifications, this is the method from the message payload.
2456    /// For responses, this is the method of the original request.
2457    pub fn method(&self) -> &str {
2458        match self {
2459            Dispatch::Request(msg, _) => msg.method(),
2460            Dispatch::Notification(msg) => msg.method(),
2461            Dispatch::Response(_, cx) => cx.method(),
2462        }
2463    }
2464}
2465
2466impl Dispatch {
2467    /// Attempts to parse `self` into a typed message context.
2468    ///
2469    /// # Returns
2470    ///
2471    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2472    /// * `Ok(Err(self))` if not
2473    /// * `Err` if has the correct method for the given types but parsing fails
2474    #[tracing::instrument(skip(self), fields(Request = ?std::any::type_name::<Req>(), Notif = ?std::any::type_name::<Notif>()), level = "trace", ret)]
2475    pub(crate) fn into_typed_dispatch<Req: JsonRpcRequest, Notif: JsonRpcNotification>(
2476        self,
2477    ) -> Result<Result<Dispatch<Req, Notif>, Dispatch>, crate::Error> {
2478        tracing::debug!(
2479            message = ?self,
2480            "into_typed_dispatch"
2481        );
2482        match self {
2483            Dispatch::Request(message, responder) => {
2484                if Req::matches_method(&message.method) {
2485                    match Req::parse_message(&message.method, &message.params) {
2486                        Ok(req) => {
2487                            tracing::trace!(?req, "parsed ok");
2488                            Ok(Ok(Dispatch::Request(req, responder.cast())))
2489                        }
2490                        Err(err) => {
2491                            tracing::trace!(?err, "parse error");
2492                            Err(err)
2493                        }
2494                    }
2495                } else {
2496                    tracing::trace!("method doesn't match");
2497                    Ok(Err(Dispatch::Request(message, responder)))
2498                }
2499            }
2500
2501            Dispatch::Notification(message) => {
2502                if Notif::matches_method(&message.method) {
2503                    match Notif::parse_message(&message.method, &message.params) {
2504                        Ok(notif) => {
2505                            tracing::trace!(?notif, "parse ok");
2506                            Ok(Ok(Dispatch::Notification(notif)))
2507                        }
2508                        Err(err) => {
2509                            tracing::trace!(?err, "parse error");
2510                            Err(err)
2511                        }
2512                    }
2513                } else {
2514                    tracing::trace!("method doesn't match");
2515                    Ok(Err(Dispatch::Notification(message)))
2516                }
2517            }
2518
2519            Dispatch::Response(result, cx) => {
2520                let method = cx.method();
2521                if Req::matches_method(method) {
2522                    // Parse the response result
2523                    let typed_result = match result {
2524                        Ok(value) => {
2525                            match <Req::Response as JsonRpcResponse>::from_value(method, value) {
2526                                Ok(parsed) => {
2527                                    tracing::trace!(?parsed, "parse ok");
2528                                    Ok(parsed)
2529                                }
2530                                Err(err) => {
2531                                    tracing::trace!(?err, "parse error");
2532                                    return Err(err);
2533                                }
2534                            }
2535                        }
2536                        Err(err) => {
2537                            tracing::trace!("error, passthrough");
2538                            Err(err)
2539                        }
2540                    };
2541                    Ok(Ok(Dispatch::Response(typed_result, cx.cast())))
2542                } else {
2543                    tracing::trace!("method doesn't match");
2544                    Ok(Err(Dispatch::Response(result, cx)))
2545                }
2546            }
2547        }
2548    }
2549
2550    /// True if this message has a field with the given name.
2551    ///
2552    /// Returns `false` for Response variants.
2553    #[must_use]
2554    pub fn has_field(&self, field_name: &str) -> bool {
2555        self.message()
2556            .and_then(|m| m.params().get(field_name))
2557            .is_some()
2558    }
2559
2560    /// Returns true if this message has a session-id field.
2561    ///
2562    /// Returns `false` for Response variants.
2563    pub(crate) fn has_session_id(&self) -> bool {
2564        self.has_field("sessionId")
2565    }
2566
2567    /// Extract the ACP session-id from this message (if any).
2568    ///
2569    /// Returns `Ok(None)` for Response variants.
2570    pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
2571        let Some(message) = self.message() else {
2572            return Ok(None);
2573        };
2574        let Some(value) = message.params().get("sessionId") else {
2575            return Ok(None);
2576        };
2577        let session_id = serde_json::from_value(value.clone())?;
2578        Ok(Some(session_id))
2579    }
2580
2581    /// Try to parse this as a notification of the given type.
2582    ///
2583    /// # Returns
2584    ///
2585    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2586    /// * `Ok(Err(self))` if not
2587    /// * `Err` if has the correct method for the given types but parsing fails
2588    pub fn into_notification<N: JsonRpcNotification>(
2589        self,
2590    ) -> Result<Result<N, Dispatch>, crate::Error> {
2591        match self {
2592            Dispatch::Notification(msg) => {
2593                if !N::matches_method(&msg.method) {
2594                    return Ok(Err(Dispatch::Notification(msg)));
2595                }
2596                match N::parse_message(&msg.method, &msg.params) {
2597                    Ok(n) => Ok(Ok(n)),
2598                    Err(err) => Err(err),
2599                }
2600            }
2601            Dispatch::Request(..) | Dispatch::Response(..) => Ok(Err(self)),
2602        }
2603    }
2604
2605    /// Try to parse this as a request of the given type.
2606    ///
2607    /// # Returns
2608    ///
2609    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2610    /// * `Ok(Err(self))` if not
2611    /// * `Err` if has the correct method for the given types but parsing fails
2612    pub fn into_request<Req: JsonRpcRequest>(
2613        self,
2614    ) -> Result<Result<(Req, Responder<Req::Response>), Dispatch>, crate::Error> {
2615        match self {
2616            Dispatch::Request(msg, responder) => {
2617                if !Req::matches_method(&msg.method) {
2618                    return Ok(Err(Dispatch::Request(msg, responder)));
2619                }
2620                match Req::parse_message(&msg.method, &msg.params) {
2621                    Ok(req) => Ok(Ok((req, responder.cast()))),
2622                    Err(err) => Err(err),
2623                }
2624            }
2625            Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Err(self)),
2626        }
2627    }
2628}
2629
2630impl<M: JsonRpcRequest + JsonRpcNotification> Dispatch<M, M> {
2631    /// Returns the message payload for requests and notifications.
2632    ///
2633    /// Returns `None` for Response variants since they don't contain a message payload.
2634    pub fn message(&self) -> Option<&M> {
2635        match self {
2636            Dispatch::Request(msg, _) | Dispatch::Notification(msg) => Some(msg),
2637            Dispatch::Response(_, _) => None,
2638        }
2639    }
2640
2641    /// Map the request/notification message.
2642    ///
2643    /// Response variants pass through unchanged.
2644    pub(crate) fn try_map_message(
2645        self,
2646        map_message: impl FnOnce(M) -> Result<M, crate::Error>,
2647    ) -> Result<Dispatch<M, M>, crate::Error> {
2648        match self {
2649            Dispatch::Request(request, cx) => Ok(Dispatch::Request(map_message(request)?, cx)),
2650            Dispatch::Notification(notification) => {
2651                Ok(Dispatch::<M, M>::Notification(map_message(notification)?))
2652            }
2653            Dispatch::Response(result, cx) => Ok(Dispatch::Response(result, cx)),
2654        }
2655    }
2656}
2657
2658/// An incoming JSON message without any typing. Can be a request or a notification.
2659#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
2660pub struct UntypedMessage {
2661    /// The JSON-RPC method name
2662    pub method: String,
2663    /// The JSON-RPC parameters as a raw JSON value
2664    pub params: serde_json::Value,
2665}
2666
2667impl UntypedMessage {
2668    /// Returns an untyped message with the given method and parameters.
2669    pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
2670        let params = serde_json::to_value(params)?;
2671        Ok(Self {
2672            method: method.to_string(),
2673            params,
2674        })
2675    }
2676
2677    /// Returns the method name
2678    #[must_use]
2679    pub fn method(&self) -> &str {
2680        &self.method
2681    }
2682
2683    /// Returns the parameters as a JSON value
2684    #[must_use]
2685    pub fn params(&self) -> &serde_json::Value {
2686        &self.params
2687    }
2688
2689    /// Consumes this message and returns the method and params
2690    #[must_use]
2691    pub fn into_parts(self) -> (String, serde_json::Value) {
2692        (self.method, self.params)
2693    }
2694
2695    /// Convert `self` to a JSON-RPC message.
2696    pub(crate) fn into_jsonrpc_msg(
2697        self,
2698        id: Option<jsonrpcmsg::Id>,
2699    ) -> Result<jsonrpcmsg::Request, crate::Error> {
2700        let Self { method, params } = self;
2701        Ok(jsonrpcmsg::Request::new_v2(method, json_cast(params)?, id))
2702    }
2703}
2704
2705impl JsonRpcMessage for UntypedMessage {
2706    fn matches_method(_method: &str) -> bool {
2707        // UntypedMessage matches any method - it's the untyped fallback
2708        true
2709    }
2710
2711    fn method(&self) -> &str {
2712        &self.method
2713    }
2714
2715    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2716        Ok(self.clone())
2717    }
2718
2719    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error> {
2720        UntypedMessage::new(method, params)
2721    }
2722}
2723
2724impl JsonRpcRequest for UntypedMessage {
2725    type Response = serde_json::Value;
2726}
2727
2728impl JsonRpcNotification for UntypedMessage {}
2729
2730/// Represents a pending response of type `R` from an outgoing request.
2731///
2732/// Returned by [`ConnectionTo::send_request`], this type provides methods for handling
2733/// the response without blocking the event loop. The API is intentionally designed to make
2734/// it difficult to accidentally block.
2735///
2736/// # Anti-Footgun Design
2737///
2738/// You cannot directly `.await` a `SentRequest`. Instead, you must choose how to handle
2739/// the response:
2740///
2741/// ## Option 1: Schedule a Callback (Safe in Handlers)
2742///
2743/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
2744/// that runs when the response arrives. This doesn't block the event loop:
2745///
2746/// ```no_run
2747/// # use agent_client_protocol_test::*;
2748/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2749/// cx.send_request(MyRequest {})
2750///     .on_receiving_result(async |result| {
2751///         match result {
2752///             Ok(response) => {
2753///                 // Handle successful response
2754///                 Ok(())
2755///             }
2756///             Err(error) => {
2757///                 // Handle error
2758///                 Err(error)
2759///             }
2760///         }
2761///     })?;
2762/// # Ok(())
2763/// # }
2764/// ```
2765///
2766/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
2767///
2768/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
2769/// in a spawned task (never in a handler):
2770///
2771/// ```no_run
2772/// # use agent_client_protocol_test::*;
2773/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2774/// // ✅ Safe: Spawned task runs concurrently
2775/// cx.spawn({
2776///     let cx = cx.clone();
2777///     async move {
2778///         let response = cx.send_request(MyRequest {})
2779///             .block_task()
2780///             .await?;
2781///         // Process response...
2782///         Ok(())
2783///     }
2784/// })?;
2785/// # Ok(())
2786/// # }
2787/// ```
2788///
2789/// ```no_run
2790/// # use agent_client_protocol_test::*;
2791/// # async fn example() -> Result<(), agent_client_protocol::Error> {
2792/// # let connection = mock_connection();
2793/// // ❌ NEVER do this in a handler - blocks the event loop!
2794/// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2795///     let response = cx.send_request(MyRequest {})
2796///         .block_task()  // This will deadlock!
2797///         .await?;
2798///     responder.respond(response)
2799/// }, agent_client_protocol::on_receive_request!())
2800/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2801/// # Ok(())
2802/// # }
2803/// ```
2804///
2805/// # Why This Design?
2806///
2807/// If you block the event loop while waiting for a response, the connection cannot process
2808/// the incoming response message, creating a deadlock. This API design prevents that footgun
2809/// by making blocking explicit and encouraging non-blocking patterns.
2810pub struct SentRequest<T> {
2811    id: jsonrpcmsg::Id,
2812    method: String,
2813    task_tx: TaskTx,
2814    response_rx: oneshot::Receiver<ResponsePayload>,
2815    to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
2816}
2817
2818impl<T: Debug> Debug for SentRequest<T> {
2819    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2820        f.debug_struct("SentRequest")
2821            .field("id", &self.id)
2822            .field("method", &self.method)
2823            .field("task_tx", &self.task_tx)
2824            .field("response_rx", &self.response_rx)
2825            .finish_non_exhaustive()
2826    }
2827}
2828
2829impl SentRequest<serde_json::Value> {
2830    fn new(
2831        id: jsonrpcmsg::Id,
2832        method: String,
2833        task_tx: mpsc::UnboundedSender<Task>,
2834        response_rx: oneshot::Receiver<ResponsePayload>,
2835    ) -> Self {
2836        Self {
2837            id,
2838            method,
2839            response_rx,
2840            task_tx,
2841            to_result: Box::new(Ok),
2842        }
2843    }
2844}
2845
2846impl<T: JsonRpcResponse> SentRequest<T> {
2847    /// The id of the outgoing request.
2848    #[must_use]
2849    pub fn id(&self) -> serde_json::Value {
2850        crate::util::id_to_json(&self.id)
2851    }
2852
2853    /// The method of the request this is in response to.
2854    #[must_use]
2855    pub fn method(&self) -> &str {
2856        &self.method
2857    }
2858
2859    /// Create a new response that maps the result of the response to a new type.
2860    pub fn map<U>(
2861        self,
2862        map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
2863    ) -> SentRequest<U> {
2864        SentRequest {
2865            id: self.id,
2866            method: self.method,
2867            response_rx: self.response_rx,
2868            task_tx: self.task_tx,
2869            to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
2870        }
2871    }
2872
2873    /// Forward the response (success or error) to a request context when it arrives.
2874    ///
2875    /// This is a convenience method for proxying messages between connections. When the
2876    /// response arrives, it will be automatically sent to the provided request context,
2877    /// whether it's a successful response or an error.
2878    ///
2879    /// # Example: Proxying requests
2880    ///
2881    /// ```
2882    /// # use agent_client_protocol::UntypedRole;
2883    /// # use agent_client_protocol::{Builder, ConnectionTo};
2884    /// # use agent_client_protocol_test::*;
2885    /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2886    /// // Set up backend connection builder
2887    /// let backend = UntypedRole.builder()
2888    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
2889    ///         responder.respond(MyResponse { status: "ok".into() })
2890    ///     }, agent_client_protocol::on_receive_request!());
2891    ///
2892    /// // Spawn backend and get a context to send to it
2893    /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
2894    ///
2895    /// // Set up proxy that forwards requests to backend
2896    /// UntypedRole.builder()
2897    ///     .on_receive_request({
2898    ///         let backend_connection = backend_connection.clone();
2899    ///         async move |req: MyRequest, responder, cx| {
2900    ///             // Forward the request to backend and proxy the response back
2901    ///             backend_connection.send_request(req)
2902    ///                 .forward_response_to(responder)?;
2903    ///             Ok(())
2904    ///         }
2905    ///     }, agent_client_protocol::on_receive_request!());
2906    /// # Ok(())
2907    /// # }
2908    /// ```
2909    ///
2910    /// # Type Safety
2911    ///
2912    /// The request context's response type must match the request's response type,
2913    /// ensuring type-safe message forwarding.
2914    ///
2915    /// # When to Use
2916    ///
2917    /// Use this when:
2918    /// - You're implementing a proxy or gateway pattern
2919    /// - You want to forward responses without processing them
2920    /// - The response types match between the outgoing request and incoming request
2921    ///
2922    /// This is equivalent to calling `on_receiving_result` and manually forwarding
2923    /// the result, but more concise.
2924    pub fn forward_response_to(self, responder: Responder<T>) -> Result<(), crate::Error>
2925    where
2926        T: Send,
2927    {
2928        self.on_receiving_result(async move |result| responder.respond_with_result(result))
2929    }
2930
2931    /// Block the current task until the response is received.
2932    ///
2933    /// **Warning:** This method blocks the current async task. It is **only safe** to use
2934    /// in spawned tasks created with [`ConnectionTo::spawn`]. Using it directly in a
2935    /// handler callback will deadlock the connection.
2936    ///
2937    /// # Safe Usage (in spawned tasks)
2938    ///
2939    /// ```no_run
2940    /// # use agent_client_protocol_test::*;
2941    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2942    /// # let connection = mock_connection();
2943    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2944    ///     // Spawn a task to handle the request
2945    ///     cx.spawn({
2946    ///         let connection = cx.clone();
2947    ///         async move {
2948    ///             // Safe: We're in a spawned task, not blocking the event loop
2949    ///             let response = connection.send_request(OtherRequest {})
2950    ///                 .block_task()
2951    ///                 .await?;
2952    ///
2953    ///             // Process the response...
2954    ///             Ok(())
2955    ///         }
2956    ///     })?;
2957    ///
2958    ///     // Respond immediately
2959    ///     responder.respond(MyResponse { status: "ok".into() })
2960    /// }, agent_client_protocol::on_receive_request!())
2961    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2962    /// # Ok(())
2963    /// # }
2964    /// ```
2965    ///
2966    /// # Unsafe Usage (in handlers - will deadlock!)
2967    ///
2968    /// ```no_run
2969    /// # use agent_client_protocol_test::*;
2970    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2971    /// # let connection = mock_connection();
2972    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2973    ///     // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
2974    ///     let response = cx.send_request(OtherRequest {})
2975    ///         .block_task()
2976    ///         .await?;
2977    ///
2978    ///     responder.respond(MyResponse { status: response.value })
2979    /// }, agent_client_protocol::on_receive_request!())
2980    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2981    /// # Ok(())
2982    /// # }
2983    /// ```
2984    ///
2985    /// # When to Use
2986    ///
2987    /// Use this method when:
2988    /// - You're in a spawned task (via [`ConnectionTo::spawn`])
2989    /// - You need the response value to proceed with your logic
2990    /// - Linear control flow is more natural than callbacks
2991    ///
2992    /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
2993    pub async fn block_task(self) -> Result<T, crate::Error>
2994    where
2995        T: Send,
2996    {
2997        match self.response_rx.await {
2998            Ok(ResponsePayload {
2999                result: Ok(json_value),
3000                ack_tx,
3001            }) => {
3002                // Ack immediately - we're in a spawned task, so the dispatch loop
3003                // can continue while we process the value.
3004                if let Some(tx) = ack_tx {
3005                    let _ = tx.send(());
3006                }
3007                match (self.to_result)(json_value) {
3008                    Ok(value) => Ok(value),
3009                    Err(err) => Err(err),
3010                }
3011            }
3012            Ok(ResponsePayload {
3013                result: Err(err),
3014                ack_tx,
3015            }) => {
3016                if let Some(tx) = ack_tx {
3017                    let _ = tx.send(());
3018                }
3019                Err(err)
3020            }
3021            Err(err) => Err(crate::util::internal_error(format!(
3022                "response to `{}` never received: {}",
3023                self.method, err
3024            ))),
3025        }
3026    }
3027
3028    /// Schedule an async task to run when a successful response is received.
3029    ///
3030    /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
3031    /// for the common pattern of forwarding errors to a request context while only processing
3032    /// successful responses.
3033    ///
3034    /// # Behavior
3035    ///
3036    /// - If the response is `Ok(value)`, your task receives the value and the request context
3037    /// - If the response is `Err(error)`, the error is automatically sent to `responder`
3038    ///   and your task is not called
3039    ///
3040    /// # Example: Chaining requests
3041    ///
3042    /// ```no_run
3043    /// # use agent_client_protocol_test::*;
3044    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3045    /// # let connection = mock_connection();
3046    /// connection.on_receive_request(async |req: ValidateRequest, responder, cx| {
3047    ///     // Send initial request
3048    ///     cx.send_request(ValidateRequest { data: req.data.clone() })
3049    ///         .on_receiving_ok_result(responder, async |validation, responder| {
3050    ///             // Only runs if validation succeeded
3051    ///             if validation.is_valid {
3052    ///                 // Respond to original request
3053    ///                 responder.respond(ValidateResponse { is_valid: true, error: None })
3054    ///             } else {
3055    ///                 responder.respond_with_error(agent_client_protocol::util::internal_error("validation failed"))
3056    ///             }
3057    ///         })?;
3058    ///
3059    ///     Ok(())
3060    /// }, agent_client_protocol::on_receive_request!())
3061    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3062    /// # Ok(())
3063    /// # }
3064    /// ```
3065    ///
3066    /// # Ordering
3067    ///
3068    /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
3069    /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
3070    /// for details.
3071    ///
3072    /// # When to Use
3073    ///
3074    /// Use this when:
3075    /// - You need to respond to a request based on another request's result
3076    /// - You want errors to automatically propagate to the request context
3077    /// - You only care about the success case
3078    ///
3079    /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
3080    #[track_caller]
3081    pub fn on_receiving_ok_result<F>(
3082        self,
3083        responder: Responder<T>,
3084        task: impl FnOnce(T, Responder<T>) -> F + 'static + Send,
3085    ) -> Result<(), crate::Error>
3086    where
3087        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3088        T: Send,
3089    {
3090        self.on_receiving_result(async move |result| match result {
3091            Ok(value) => task(value, responder).await,
3092            Err(err) => responder.respond_with_error(err),
3093        })
3094    }
3095
3096    /// Schedule an async task to run when the response is received.
3097    ///
3098    /// This is the recommended way to handle responses in handler callbacks, as it doesn't
3099    /// block the event loop. The task will be spawned automatically when the response arrives.
3100    ///
3101    /// # Example: Handle response in callback
3102    ///
3103    /// ```no_run
3104    /// # use agent_client_protocol_test::*;
3105    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3106    /// # let connection = mock_connection();
3107    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
3108    ///     // Send a request and schedule a callback for the response
3109    ///     cx.send_request(QueryRequest { id: 22 })
3110    ///         .on_receiving_result({
3111    ///             let connection = cx.clone();
3112    ///             async move |result| {
3113    ///                 match result {
3114    ///                     Ok(response) => {
3115    ///                         println!("Got response: {:?}", response);
3116    ///                         // Can send more messages here
3117    ///                         connection.send_notification(QueryComplete {})?;
3118    ///                         Ok(())
3119    ///                 }
3120    ///                     Err(error) => {
3121    ///                         eprintln!("Request failed: {}", error);
3122    ///                         Err(error)
3123    ///                     }
3124    ///                 }
3125    ///             }
3126    ///         })?;
3127    ///
3128    ///     // Handler continues immediately without waiting
3129    ///     responder.respond(MyResponse { status: "processing".into() })
3130    /// }, agent_client_protocol::on_receive_request!())
3131    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3132    /// # Ok(())
3133    /// # }
3134    /// ```
3135    ///
3136    /// # Ordering
3137    ///
3138    /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
3139    /// before processing the next message. This gives you ordering guarantees: no other
3140    /// messages will be processed while your callback runs.
3141    ///
3142    /// This differs from [`block_task`](Self::block_task), which signals completion immediately
3143    /// upon receiving the response (before your code processes it).
3144    ///
3145    /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
3146    /// and how to avoid deadlocks.
3147    ///
3148    /// # Error Handling
3149    ///
3150    /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
3151    /// errors appropriately within your task.
3152    ///
3153    /// # When to Use
3154    ///
3155    /// Use this method when:
3156    /// - You're in a handler callback (not a spawned task)
3157    /// - You want ordering guarantees (no other messages processed during your callback)
3158    /// - You need to do async work before "releasing" control back to the dispatch loop
3159    ///
3160    /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
3161    #[track_caller]
3162    pub fn on_receiving_result<F>(
3163        self,
3164        task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
3165    ) -> Result<(), crate::Error>
3166    where
3167        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3168        T: Send,
3169    {
3170        let task_tx = self.task_tx.clone();
3171        let method = self.method;
3172        let response_rx = self.response_rx;
3173        let to_result = self.to_result;
3174        let location = Location::caller();
3175
3176        Task::new(location, async move {
3177            match response_rx.await {
3178                Ok(ResponsePayload { result, ack_tx }) => {
3179                    // Convert the result using to_result for Ok values
3180                    let typed_result = match result {
3181                        Ok(json_value) => to_result(json_value),
3182                        Err(err) => Err(err),
3183                    };
3184
3185                    // Run the user's callback
3186                    let outcome = task(typed_result).await;
3187
3188                    // Ack AFTER the callback completes - this is the key difference
3189                    // from block_task. The dispatch loop waits for this ack.
3190                    if let Some(tx) = ack_tx {
3191                        let _ = tx.send(());
3192                    }
3193
3194                    outcome
3195                }
3196                Err(err) => Err(crate::util::internal_error(format!(
3197                    "response to `{method}` never received: {err}"
3198                ))),
3199            }
3200        })
3201        .spawn(&task_tx)
3202    }
3203}
3204
3205// ============================================================================
3206// IntoJrConnectionTransport Implementations
3207// ============================================================================
3208
3209/// A component that communicates over line streams.
3210///
3211/// `Lines` implements the [`ConnectTo`] trait for any pair of line-based streams
3212/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
3213/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3214///
3215/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
3216/// transformation of individual lines before they are parsed or after they are serialized.
3217/// This is particularly useful for debugging, logging, or implementing custom line-based
3218/// protocols.
3219///
3220/// # Use Cases
3221///
3222/// - **Line-by-line logging**: Intercept and log each line before parsing
3223/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
3224/// - **Debugging**: Inspect raw message strings
3225/// - **Line filtering**: Skip or modify specific messages
3226///
3227/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
3228/// for byte-based I/O.
3229///
3230/// [`ConnectTo`]: crate::ConnectTo
3231#[derive(Debug)]
3232pub struct Lines<OutgoingSink, IncomingStream> {
3233    /// Outgoing line sink (where we write serialized JSON-RPC messages)
3234    pub outgoing: OutgoingSink,
3235    /// Incoming line stream (where we read and parse JSON-RPC messages)
3236    pub incoming: IncomingStream,
3237}
3238
3239impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
3240where
3241    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3242    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3243{
3244    /// Create a new line stream transport.
3245    pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
3246        Self { outgoing, incoming }
3247    }
3248}
3249
3250impl<OutgoingSink, IncomingStream, R: Role> ConnectTo<R> for Lines<OutgoingSink, IncomingStream>
3251where
3252    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3253    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3254{
3255    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3256        let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3257        match futures::future::select(Box::pin(client.connect_to(channel)), serve_self).await {
3258            Either::Left((result, _)) | Either::Right((result, _)) => result,
3259        }
3260    }
3261
3262    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3263        let Self { outgoing, incoming } = self;
3264
3265        // Create a channel pair for the client to use
3266        let (channel_for_caller, channel_for_lines) = Channel::duplex();
3267
3268        // Create the server future that runs the line stream actors
3269        let server_future = Box::pin(async move {
3270            let Channel { rx, tx } = channel_for_lines;
3271
3272            // Run both actors concurrently
3273            let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
3274            let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
3275
3276            // Wait for both to complete
3277            futures::try_join!(outgoing_future, incoming_future)?;
3278
3279            Ok(())
3280        });
3281
3282        (channel_for_caller, server_future)
3283    }
3284}
3285
3286/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
3287///
3288/// `ByteStreams` implements the [`ConnectTo`] trait for any pair of `AsyncRead` and `AsyncWrite`
3289/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3290/// This is the standard way to communicate with external processes or network connections.
3291///
3292/// # Use Cases
3293///
3294/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
3295/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
3296/// - **Named pipes**: Cross-process communication on the same machine
3297/// - **File I/O**: Reading from and writing to file descriptors
3298///
3299/// # Example
3300///
3301/// Connecting to an agent via stdio:
3302///
3303/// ```no_run
3304/// use agent_client_protocol::UntypedRole;
3305/// # use agent_client_protocol::{ByteStreams};
3306/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
3307///
3308/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3309/// let component = ByteStreams::new(
3310///     tokio::io::stdout().compat_write(),
3311///     tokio::io::stdin().compat(),
3312/// );
3313///
3314/// // Use as a component in a connection
3315/// agent_client_protocol::UntypedRole.builder()
3316///     .name("my-client")
3317///     .connect_to(component)
3318///     .await?;
3319/// # Ok(())
3320/// # }
3321/// ```
3322///
3323/// [`ConnectTo`]: crate::ConnectTo
3324#[derive(Debug)]
3325pub struct ByteStreams<OB, IB> {
3326    /// Outgoing byte stream (where we write serialized messages)
3327    pub outgoing: OB,
3328    /// Incoming byte stream (where we read and parse messages)
3329    pub incoming: IB,
3330}
3331
3332impl<OB, IB> ByteStreams<OB, IB>
3333where
3334    OB: AsyncWrite + Send + 'static,
3335    IB: AsyncRead + Send + 'static,
3336{
3337    /// Create a new byte stream transport.
3338    pub fn new(outgoing: OB, incoming: IB) -> Self {
3339        Self { outgoing, incoming }
3340    }
3341}
3342
3343impl<OB, IB, R: Role> ConnectTo<R> for ByteStreams<OB, IB>
3344where
3345    OB: AsyncWrite + Send + 'static,
3346    IB: AsyncRead + Send + 'static,
3347{
3348    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3349        let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3350        match futures::future::select(pin!(client.connect_to(channel)), serve_self).await {
3351            Either::Left((result, _)) | Either::Right((result, _)) => result,
3352        }
3353    }
3354
3355    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3356        use futures::AsyncBufReadExt;
3357        use futures::AsyncWriteExt;
3358        use futures::io::BufReader;
3359        let Self { outgoing, incoming } = self;
3360
3361        // Convert byte streams to line streams
3362        // Box both streams to satisfy Unpin requirements
3363        let incoming_lines = Box::pin(BufReader::new(incoming).lines());
3364
3365        // Create a sink that writes lines (with newlines) to the outgoing byte stream
3366        // We need to Box the writer since it may not be Unpin
3367        let outgoing_sink =
3368            futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
3369                let mut bytes = line.into_bytes();
3370                bytes.push(b'\n');
3371                writer.write_all(&bytes).await?;
3372                Ok::<_, std::io::Error>(writer)
3373            });
3374
3375        // Delegate to Lines component
3376        ConnectTo::<R>::into_channel_and_future(Lines::new(outgoing_sink, incoming_lines))
3377    }
3378}
3379
3380/// A channel endpoint representing one side of a bidirectional message channel.
3381///
3382/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
3383/// Each endpoint has:
3384/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
3385/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
3386///
3387/// # Example
3388///
3389/// ```no_run
3390/// # use agent_client_protocol::UntypedRole;
3391/// # use agent_client_protocol::{Channel, Builder};
3392/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3393/// // Create a pair of connected channels
3394/// let (channel_a, channel_b) = Channel::duplex();
3395///
3396/// // Each channel can be used by a different component
3397/// UntypedRole.builder()
3398///     .name("connection-a")
3399///     .connect_to(channel_a)
3400///     .await?;
3401/// # Ok(())
3402/// # }
3403/// ```
3404#[derive(Debug)]
3405pub struct Channel {
3406    /// Receives messages (or errors) from the counterpart.
3407    pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
3408    /// Sends messages (or errors) to the counterpart.
3409    pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
3410}
3411
3412impl Channel {
3413    /// Create a pair of connected channel endpoints.
3414    ///
3415    /// Returns two `Channel` instances that are connected to each other:
3416    /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
3417    /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
3418    ///
3419    /// # Returns
3420    ///
3421    /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
3422    #[must_use]
3423    pub fn duplex() -> (Self, Self) {
3424        // Create channels: A sends Result<Message> which B receives as Message
3425        let (a_tx, b_rx) = mpsc::unbounded();
3426        let (b_tx, a_rx) = mpsc::unbounded();
3427
3428        let channel_a = Self { rx: a_rx, tx: a_tx };
3429        let channel_b = Self { rx: b_rx, tx: b_tx };
3430
3431        (channel_a, channel_b)
3432    }
3433
3434    /// Copy messages from `rx` to `tx`.
3435    ///
3436    /// # Returns
3437    ///
3438    /// A `Result` indicating success or failure.
3439    pub async fn copy(mut self) -> Result<(), crate::Error> {
3440        while let Some(msg) = self.rx.next().await {
3441            self.tx
3442                .unbounded_send(msg)
3443                .map_err(crate::util::internal_error)?;
3444        }
3445        Ok(())
3446    }
3447}
3448
3449impl<R: Role> ConnectTo<R> for Channel {
3450    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3451        let (client_channel, client_serve) = client.into_channel_and_future();
3452
3453        match futures::try_join!(
3454            Channel {
3455                rx: client_channel.rx,
3456                tx: self.tx
3457            }
3458            .copy(),
3459            Channel {
3460                rx: self.rx,
3461                tx: client_channel.tx
3462            }
3463            .copy(),
3464            client_serve
3465        ) {
3466            Ok(((), (), ())) => Ok(()),
3467            Err(err) => Err(err),
3468        }
3469    }
3470
3471    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3472        (self, Box::pin(future::ready(Ok(()))))
3473    }
3474}