Skip to main content

agent_client_protocol/
jsonrpc.rs

1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::v1::{
4    JsonRpcMessage as VersionedJsonRpcMessage, Notification as RpcNotification,
5    Request as RpcRequest, RequestId, Response as RpcResponse, SessionId,
6};
7
8// Types re-exported from crate root
9use serde::{Deserialize, Serialize};
10use std::any::TypeId;
11#[cfg(feature = "unstable_cancel_request")]
12use std::collections::HashMap;
13use std::fmt::Debug;
14use std::panic::Location;
15use std::pin::pin;
16use std::sync::Arc;
17#[cfg(feature = "unstable_cancel_request")]
18use std::sync::{
19    Mutex,
20    atomic::{AtomicBool, Ordering},
21};
22use uuid::Uuid;
23
24#[cfg(feature = "unstable_cancel_request")]
25use futures::FutureExt;
26use futures::channel::{mpsc, oneshot};
27use futures::future::{self, BoxFuture, Either};
28use futures::{AsyncRead, AsyncWrite, StreamExt};
29
30mod dynamic_handler;
31pub(crate) mod handlers;
32mod incoming_actor;
33mod outgoing_actor;
34mod protocol_compat;
35pub(crate) mod run;
36mod task_actor;
37mod transport_actor;
38
39use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
40pub use crate::jsonrpc::handlers::NullHandler;
41use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
42use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
43use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
44use crate::jsonrpc::protocol_compat::{ProtocolCompat, ProtocolMode};
45use crate::jsonrpc::run::SpawnedRun;
46use crate::jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo};
47use crate::jsonrpc::task_actor::{Task, TaskTx};
48use crate::mcp_server::McpServer;
49use crate::role::HasPeer;
50use crate::role::Role;
51use crate::{Agent, Client, ConnectTo, RoleId};
52
53/// Raw JSON-RPC message transported by [`Channel`].
54///
55/// This uses the JSON-RPC envelope types from `agent-client-protocol-schema`
56/// while keeping method params as raw, JSON-RPC-valid params at the transport boundary.
57#[derive(Debug, Clone)]
58pub enum RawJsonRpcMessage {
59    /// A JSON-RPC request with an id and expected response.
60    Request(RpcRequest<RawJsonRpcParams>),
61    /// A JSON-RPC notification without a response.
62    Notification(RpcNotification<RawJsonRpcParams>),
63    /// A JSON-RPC response to a prior request.
64    Response(RpcResponse<serde_json::Value>),
65}
66
67/// Raw JSON-RPC request or notification parameters.
68///
69/// JSON-RPC params, when present, must be either an array or an object.
70#[derive(Debug, Clone, PartialEq)]
71pub enum RawJsonRpcParams {
72    /// Positional JSON-RPC params.
73    Array(Vec<serde_json::Value>),
74    /// Named JSON-RPC params.
75    Object(serde_json::Map<String, serde_json::Value>),
76}
77
78impl RawJsonRpcParams {
79    /// Convert a JSON value into JSON-RPC params.
80    pub fn from_value(value: serde_json::Value) -> Result<Option<Self>, crate::Error> {
81        match value {
82            serde_json::Value::Null => Ok(None),
83            serde_json::Value::Array(array) => Ok(Some(Self::Array(array))),
84            serde_json::Value::Object(object) => Ok(Some(Self::Object(object))),
85            _ => {
86                Err(crate::Error::invalid_params()
87                    .data("JSON-RPC params must be an object or array"))
88            }
89        }
90    }
91
92    /// Convert params back into a JSON value.
93    #[must_use]
94    pub fn into_value(self) -> serde_json::Value {
95        match self {
96            Self::Array(array) => serde_json::Value::Array(array),
97            Self::Object(object) => serde_json::Value::Object(object),
98        }
99    }
100}
101
102impl Serialize for RawJsonRpcParams {
103    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
104    where
105        S: serde::Serializer,
106    {
107        match self {
108            Self::Array(array) => array.serialize(serializer),
109            Self::Object(object) => object.serialize(serializer),
110        }
111    }
112}
113
114impl<'de> Deserialize<'de> for RawJsonRpcParams {
115    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
116    where
117        D: serde::Deserializer<'de>,
118    {
119        let value = serde_json::Value::deserialize(deserializer)?;
120        match value {
121            serde_json::Value::Array(array) => Ok(Self::Array(array)),
122            serde_json::Value::Object(object) => Ok(Self::Object(object)),
123            _ => Err(serde::de::Error::custom(
124                "JSON-RPC params must be an object or array",
125            )),
126        }
127    }
128}
129
130impl RawJsonRpcMessage {
131    /// Build a raw JSON-RPC request message.
132    pub fn request(
133        method: String,
134        params: serde_json::Value,
135        id: RequestId,
136    ) -> Result<Self, crate::Error> {
137        Ok(Self::Request(RpcRequest {
138            id,
139            method: Arc::from(method),
140            params: RawJsonRpcParams::from_value(params)?,
141        }))
142    }
143
144    /// Build a raw JSON-RPC notification message.
145    pub fn notification(method: String, params: serde_json::Value) -> Result<Self, crate::Error> {
146        Ok(Self::Notification(RpcNotification {
147            method: Arc::from(method),
148            params: RawJsonRpcParams::from_value(params)?,
149        }))
150    }
151
152    /// Build a raw JSON-RPC response message.
153    #[must_use]
154    pub fn response(id: RequestId, response: Result<serde_json::Value, crate::Error>) -> Self {
155        Self::Response(RpcResponse::new(id, response))
156    }
157
158    /// The response id, if this is a response.
159    #[must_use]
160    pub fn response_id(&self) -> Option<&RequestId> {
161        match self {
162            Self::Response(RpcResponse::Result { id, .. } | RpcResponse::Error { id, .. }) => {
163                Some(id)
164            }
165            Self::Request(_) | Self::Notification(_) => None,
166        }
167    }
168}
169
170impl Serialize for RawJsonRpcMessage {
171    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
172    where
173        S: serde::Serializer,
174    {
175        match self {
176            Self::Request(request) => {
177                VersionedJsonRpcMessage::wrap(request.clone()).serialize(serializer)
178            }
179            Self::Notification(notification) => {
180                VersionedJsonRpcMessage::wrap(notification.clone()).serialize(serializer)
181            }
182            Self::Response(response) => {
183                VersionedJsonRpcMessage::wrap(response.clone()).serialize(serializer)
184            }
185        }
186    }
187}
188
189impl<'de> Deserialize<'de> for RawJsonRpcMessage {
190    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
191    where
192        D: serde::Deserializer<'de>,
193    {
194        let value = serde_json::Value::deserialize(deserializer)?;
195        if value.get("method").is_some() {
196            if value.get("id").is_some() {
197                let request = serde_json::from_value::<
198                    VersionedJsonRpcMessage<RpcRequest<RawJsonRpcParams>>,
199                >(value)
200                .map_err(serde::de::Error::custom)?
201                .into_inner();
202                Ok(Self::Request(request))
203            } else {
204                let notification = serde_json::from_value::<
205                    VersionedJsonRpcMessage<RpcNotification<RawJsonRpcParams>>,
206                >(value)
207                .map_err(serde::de::Error::custom)?
208                .into_inner();
209                Ok(Self::Notification(notification))
210            }
211        } else if value.get("result").is_some() || value.get("error").is_some() {
212            let response = serde_json::from_value::<
213                VersionedJsonRpcMessage<RpcResponse<serde_json::Value>>,
214            >(value)
215            .map_err(serde::de::Error::custom)?
216            .into_inner();
217            Ok(Self::Response(response))
218        } else {
219            Err(serde::de::Error::custom("invalid JSON-RPC message"))
220        }
221    }
222}
223
224fn params_from_transport(params: Option<RawJsonRpcParams>) -> serde_json::Value {
225    params.map_or(serde_json::Value::Null, RawJsonRpcParams::into_value)
226}
227
228/// Handlers process incoming JSON-RPC messages on a connection.
229///
230/// When messages arrive, they flow through a chain of handlers. Each handler can
231/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
232///
233/// # Message Flow
234///
235/// Messages flow through three layers of handlers in order:
236///
237/// ```text
238/// ┌─────────────────────────────────────────────────────────────────┐
239/// │                     Incoming Message                            │
240/// └─────────────────────────────────────────────────────────────────┘
241///                              │
242///                              ▼
243/// ┌─────────────────────────────────────────────────────────────────┐
244/// │  1. User Handlers (registered via on_receive_request, etc.)     │
245/// │     - Tried in registration order                               │
246/// │     - First handler to return Handled::Yes claims the message   │
247/// └─────────────────────────────────────────────────────────────────┘
248///                              │ Handled::No
249///                              ▼
250/// ┌─────────────────────────────────────────────────────────────────┐
251/// │  2. Dynamic Handlers (added at runtime)                         │
252/// │     - Used for session-specific message handling                │
253/// │     - Added via ConnectionTo::add_dynamic_handler             │
254/// └─────────────────────────────────────────────────────────────────┘
255///                              │ Handled::No
256///                              ▼
257/// ┌─────────────────────────────────────────────────────────────────┐
258/// │  3. Role Default Handler                                        │
259/// │     - Fallback based on the connection's Role                   │
260/// │     - Handles protocol-level messages (e.g., proxy forwarding)  │
261/// └─────────────────────────────────────────────────────────────────┘
262///                              │ Handled::No
263///                              ▼
264/// ┌─────────────────────────────────────────────────────────────────┐
265/// │  Unhandled: Error response sent (or queued if retry=true)       │
266/// └─────────────────────────────────────────────────────────────────┘
267/// ```
268///
269/// # The `Handled` Return Value
270///
271/// Each handler returns [`Handled`] to indicate whether it processed the message:
272///
273/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
274/// - **`Handled::No { message, retry }`** - Message was not handled. The message
275///   (possibly modified) is passed to the next handler in the chain.
276///
277/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
278///
279/// # The Retry Mechanism
280///
281/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
282///
283/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
284/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
285///
286/// This mechanism exists because of a timing issue with sessions: when a `session/new`
287/// response is being processed, the dynamic handler for that session hasn't been registered
288/// yet, but `session/update` notifications for that session may already be arriving.
289/// By setting `retry: true`, these early notifications are queued until the session's
290/// dynamic handler is added.
291///
292/// # Handler Registration
293///
294/// Most users register handlers using the builder methods on [`Builder`]:
295///
296/// ```
297/// # use agent_client_protocol::{Agent, Client, ConnectTo};
298/// # use agent_client_protocol::schema::v1::{AgentCapabilities, InitializeRequest, InitializeResponse};
299/// # use agent_client_protocol_test::StatusUpdate;
300/// # async fn example(transport: impl ConnectTo<Agent>) -> Result<(), agent_client_protocol::Error> {
301/// Agent.builder()
302///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
303///         responder.respond(
304///             InitializeResponse::new(req.protocol_version)
305///                 .agent_capabilities(AgentCapabilities::new()),
306///         )
307///     }, agent_client_protocol::on_receive_request!())
308///     .on_receive_notification(async |notif: StatusUpdate, cx| {
309///         // Process notification
310///         Ok(())
311///     }, agent_client_protocol::on_receive_notification!())
312///     .connect_to(transport)
313///     .await?;
314/// # Ok(())
315/// # }
316/// ```
317///
318/// The type parameter on the closure determines which messages are dispatched to it.
319/// Messages that don't match the type are automatically passed to the next handler.
320///
321/// # Implementing Custom Handlers
322///
323/// For advanced use cases, you can implement `HandleMessageAs` directly:
324///
325/// ```ignore
326/// struct MyHandler;
327///
328/// impl HandleMessageAs<Agent> for MyHandler {
329///
330///     async fn handle_dispatch(
331///         &mut self,
332///         message: Dispatch,
333///         cx: ConnectionTo<Self::Role>,
334///     ) -> Result<Handled<Dispatch>, Error> {
335///         if message.method() == "my/custom/method" {
336///             // Handle it
337///             Ok(Handled::Yes)
338///         } else {
339///             // Pass to next handler
340///             Ok(Handled::No { message, retry: false })
341///         }
342///     }
343///
344///     fn describe_chain(&self) -> impl std::fmt::Debug {
345///         "MyHandler"
346///     }
347/// }
348/// ```
349///
350/// # Important: Handlers Must Not Block
351///
352/// The connection processes messages on a single async task. While a handler is running,
353/// no other messages can be processed. For expensive operations, use [`ConnectionTo::spawn`]
354/// to run work concurrently:
355///
356/// ```
357/// # use agent_client_protocol::{Client, Agent, ConnectTo};
358/// # use agent_client_protocol_test::{expensive_operation, ProcessComplete};
359/// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
360/// # Client.builder().connect_with(transport, async |cx| {
361/// cx.spawn({
362///     let connection = cx.clone();
363///     async move {
364///         let result = expensive_operation("data").await?;
365///         connection.send_notification(ProcessComplete { result })?;
366///         Ok(())
367///     }
368/// })?;
369/// # Ok(())
370/// # }).await?;
371/// # Ok(())
372/// # }
373/// ```
374#[allow(async_fn_in_trait)]
375/// A handler for incoming JSON-RPC messages.
376///
377/// This trait is implemented by types that can process incoming messages on a connection.
378/// Handlers are registered with a [`Builder`] and are called in order until
379/// one claims the message.
380///
381/// The type parameter `R` is the role this handler plays - who I am.
382/// For an agent handler, `R = Agent` (I handle messages as an agent).
383/// For a client handler, `R = Client` (I handle messages as a client).
384pub trait HandleDispatchFrom<Counterpart: Role>: Send {
385    /// Attempt to claim an incoming message (request or notification).
386    ///
387    /// # Important: do not block
388    ///
389    /// The server will not process new messages until this handler returns.
390    /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
391    /// The recommended approach to manage expensive operations is to the [`ConnectionTo::spawn`] method available on the message context.
392    ///
393    /// # Parameters
394    ///
395    /// * `message` - The incoming message to handle.
396    /// * `connection` - The connection, used to send messages and access connection state.
397    ///
398    /// # Returns
399    ///
400    /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
401    /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
402    /// * `Err` if an internal error occurs (this will bring down the server).
403    fn handle_dispatch_from(
404        &mut self,
405        message: Dispatch,
406        connection: ConnectionTo<Counterpart>,
407    ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send;
408
409    /// Returns a debug description of the registered handlers for diagnostics.
410    fn describe_chain(&self) -> impl std::fmt::Debug;
411}
412
413impl<Counterpart: Role, H> HandleDispatchFrom<Counterpart> for &mut H
414where
415    H: HandleDispatchFrom<Counterpart>,
416{
417    fn handle_dispatch_from(
418        &mut self,
419        message: Dispatch,
420        cx: ConnectionTo<Counterpart>,
421    ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send {
422        H::handle_dispatch_from(self, message, cx)
423    }
424
425    fn describe_chain(&self) -> impl std::fmt::Debug {
426        H::describe_chain(self)
427    }
428}
429
430/// A JSON-RPC connection that can act as either a server, client, or both.
431///
432/// [`Builder`] provides a builder-style API for creating JSON-RPC servers and clients.
433/// You start by calling `Role.builder()` (e.g., `Client.builder()`), then add message
434/// handlers, and finally drive the connection with either [`connect_to`](Builder::connect_to)
435/// or [`connect_with`](Builder::connect_with), providing a component implementation
436/// (e.g., [`ByteStreams`] for byte streams).
437///
438/// # JSON-RPC Primer
439///
440/// JSON-RPC 2.0 has two fundamental message types:
441///
442/// * **Requests** - Messages that expect a response. They have an `id` field that gets
443///   echoed back in the response so the sender can correlate them.
444/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
445///   expect or receive a response.
446///
447/// # Type-Driven Message Dispatch
448///
449/// The handler registration methods use Rust's type system to determine which messages
450/// to handle. The type parameter you provide controls what gets dispatched to your handler:
451///
452/// ## Single Message Types
453///
454/// The simplest case - handle one specific message type:
455///
456/// ```no_run
457/// # use agent_client_protocol_test::*;
458/// # use agent_client_protocol::schema::v1::{InitializeRequest, InitializeResponse, SessionNotification};
459/// # async fn example() -> Result<(), agent_client_protocol::Error> {
460/// # let connection = mock_connection();
461/// connection
462///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
463///         // Handle only InitializeRequest messages
464///         responder.respond(InitializeResponse::make())
465///     }, agent_client_protocol::on_receive_request!())
466///     .on_receive_notification(async |notif: SessionNotification, cx| {
467///         // Handle only SessionUpdate notifications
468///         Ok(())
469///     }, agent_client_protocol::on_receive_notification!())
470/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
471/// # Ok(())
472/// # }
473/// ```
474///
475/// ## Enum Message Types
476///
477/// You can also handle multiple related messages with a single handler by defining an enum
478/// that implements the appropriate trait ([`JsonRpcRequest`] or [`JsonRpcNotification`]):
479///
480/// ```no_run
481/// # use agent_client_protocol_test::*;
482/// # use agent_client_protocol::{JsonRpcRequest, JsonRpcMessage, UntypedMessage};
483/// # use agent_client_protocol::schema::v1::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
484/// # async fn example() -> Result<(), agent_client_protocol::Error> {
485/// # let connection = mock_connection();
486/// // Define an enum for multiple request types
487/// #[derive(Debug, Clone)]
488/// enum MyRequests {
489///     Initialize(InitializeRequest),
490///     Prompt(PromptRequest),
491/// }
492///
493/// // Implement JsonRpcRequest for your enum
494/// # impl JsonRpcMessage for MyRequests {
495/// #     fn matches_method(_method: &str) -> bool { false }
496/// #     fn method(&self) -> &str { "myRequests" }
497/// #     fn to_untyped_message(&self) -> Result<UntypedMessage, agent_client_protocol::Error> { todo!() }
498/// #     fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Result<Self, agent_client_protocol::Error> { Err(agent_client_protocol::Error::method_not_found()) }
499/// # }
500/// impl JsonRpcRequest for MyRequests { type Response = serde_json::Value; }
501///
502/// // Handle all variants in one place
503/// connection.on_receive_request(async |req: MyRequests, responder, cx| {
504///     match req {
505///         MyRequests::Initialize(init) => { responder.respond(serde_json::json!({})) }
506///         MyRequests::Prompt(prompt) => { responder.respond(serde_json::json!({})) }
507///     }
508/// }, agent_client_protocol::on_receive_request!())
509/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
510/// # Ok(())
511/// # }
512/// ```
513///
514/// ## Mixed Message Types
515///
516/// For enums containing both requests AND notifications, use [`on_receive_dispatch`](Self::on_receive_dispatch):
517///
518/// ```no_run
519/// # use agent_client_protocol_test::*;
520/// # use agent_client_protocol::Dispatch;
521/// # use agent_client_protocol::schema::v1::{InitializeRequest, InitializeResponse, SessionNotification};
522/// # async fn example() -> Result<(), agent_client_protocol::Error> {
523/// # let connection = mock_connection();
524/// // on_receive_dispatch receives Dispatch which can be either a request or notification
525/// connection.on_receive_dispatch(async |msg: Dispatch<InitializeRequest, SessionNotification>, _cx| {
526///     match msg {
527///         Dispatch::Request(req, responder) => {
528///             responder.respond(InitializeResponse::make())
529///         }
530///         Dispatch::Notification(notif) => {
531///             Ok(())
532///         }
533///         Dispatch::Response(result, router) => {
534///             // Forward response to its destination
535///             router.respond_with_result(result)
536///         }
537///     }
538/// }, agent_client_protocol::on_receive_dispatch!())
539/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
540/// # Ok(())
541/// # }
542/// ```
543///
544/// # Handler Registration
545///
546/// Register handlers using these methods (listed from most common to most flexible):
547///
548/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
549/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
550/// * [`on_receive_dispatch`](Self::on_receive_dispatch) - Handle enums containing both requests and notifications
551/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
552///
553/// ## Handler Ordering
554///
555/// Handlers are tried in the order you register them. The first handler that claims a message
556/// (by matching its type) will process it. Subsequent handlers won't see that message:
557///
558/// ```no_run
559/// # use agent_client_protocol_test::*;
560/// # use agent_client_protocol::{Dispatch, UntypedMessage};
561/// # use agent_client_protocol::schema::v1::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
562/// # async fn example() -> Result<(), agent_client_protocol::Error> {
563/// # let connection = mock_connection();
564/// connection
565///     .on_receive_request(async |req: InitializeRequest, responder, cx| {
566///         // This runs first for InitializeRequest
567///         responder.respond(InitializeResponse::make())
568///     }, agent_client_protocol::on_receive_request!())
569///     .on_receive_request(async |req: PromptRequest, responder, cx| {
570///         // This runs first for PromptRequest
571///         responder.respond(PromptResponse::make())
572///     }, agent_client_protocol::on_receive_request!())
573///     .on_receive_dispatch(async |msg: Dispatch, cx| {
574///         // This runs for any message not handled above
575///         msg.respond_with_error(agent_client_protocol::util::internal_error("unknown method"), cx)
576///     }, agent_client_protocol::on_receive_dispatch!())
577/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
578/// # Ok(())
579/// # }
580/// ```
581///
582/// # Event Loop and Concurrency
583///
584/// Understanding the event loop is critical for writing correct handlers.
585///
586/// ## The Event Loop
587///
588/// [`Builder`] runs all handler callbacks on a single async task - the event loop.
589/// While a handler is running, **the server cannot receive new messages**. This means
590/// any blocking or expensive work in your handlers will stall the entire connection.
591///
592/// To avoid blocking the event loop, use [`ConnectionTo::spawn`] to offload serious
593/// work to concurrent tasks:
594///
595/// ```no_run
596/// # use agent_client_protocol_test::*;
597/// # async fn example() -> Result<(), agent_client_protocol::Error> {
598/// # let connection = mock_connection();
599/// connection.on_receive_request(async |req: AnalyzeRequest, responder, cx| {
600///     // Clone cx for the spawned task
601///     cx.spawn({
602///         let connection = cx.clone();
603///         async move {
604///             let result = expensive_analysis(&req.data).await?;
605///             connection.send_notification(AnalysisComplete { result })?;
606///             Ok(())
607///         }
608///     })?;
609///
610///     // Respond immediately without blocking
611///     responder.respond(AnalysisStarted { job_id: 42 })
612/// }, agent_client_protocol::on_receive_request!())
613/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
614/// # Ok(())
615/// # }
616/// ```
617///
618/// Note that the entire connection runs within one async task, so parallelism must be
619/// managed explicitly using [`spawn`](ConnectionTo::spawn).
620///
621/// ## The Connection Context
622///
623/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
624///
625/// * **For request handlers** - [`Responder<R>`] provides [`respond`](Responder::respond)
626///   to send the response, plus methods to send other messages
627/// * **For notification handlers** - [`ConnectionTo`] provides methods to send messages
628///   and spawn tasks
629///
630/// Both context types support:
631/// * [`send_request`](ConnectionTo::send_request) - Send requests to the other side
632/// * [`send_notification`](ConnectionTo::send_notification) - Send notifications
633/// * [`spawn`](ConnectionTo::spawn) - Run tasks concurrently without blocking the event loop
634///
635/// The [`SentRequest`] returned by `send_request` provides methods like
636/// [`on_receiving_result`](SentRequest::on_receiving_result) that help you
637/// avoid accidentally blocking the event loop while waiting for responses.
638///
639/// # Driving the Connection
640///
641/// After adding handlers, you must drive the connection using one of two modes:
642///
643/// ## Server Mode: `connect_to()`
644///
645/// Use [`connect_to`](Self::connect_to) when you only need to respond to incoming messages:
646///
647/// ```no_run
648/// # use agent_client_protocol_test::*;
649/// # async fn example() -> Result<(), agent_client_protocol::Error> {
650/// # let connection = mock_connection();
651/// connection
652///     .on_receive_request(async |req: MyRequest, responder, cx| {
653///         responder.respond(MyResponse { status: "ok".into() })
654///     }, agent_client_protocol::on_receive_request!())
655///     .connect_to(MockTransport)  // Runs until connection closes or error occurs
656///     .await?;
657/// # Ok(())
658/// # }
659/// ```
660///
661/// The connection will process incoming messages and invoke your handlers until the
662/// connection is closed or an error occurs.
663///
664/// ## Client Mode: `connect_with()`
665///
666/// Use [`connect_with`](Self::connect_with) when you need to both handle incoming messages
667/// AND send your own requests/notifications:
668///
669/// ```no_run
670/// # use agent_client_protocol_test::*;
671/// # use agent_client_protocol::schema::v1::InitializeRequest;
672/// # async fn example() -> Result<(), agent_client_protocol::Error> {
673/// # let connection = mock_connection();
674/// connection
675///     .on_receive_request(async |req: MyRequest, responder, cx| {
676///         responder.respond(MyResponse { status: "ok".into() })
677///     }, agent_client_protocol::on_receive_request!())
678///     .connect_with(MockTransport, async |cx| {
679///         // You can send requests to the other side
680///         let response = cx.send_request(InitializeRequest::make())
681///             .block_task()
682///             .await?;
683///
684///         // And send notifications
685///         cx.send_notification(StatusUpdate { message: "ready".into() })?;
686///
687///         Ok(())
688///     })
689///     .await?;
690/// # Ok(())
691/// # }
692/// ```
693///
694/// The connection will serve incoming messages in the background while your client closure
695/// runs. When the closure returns, the connection shuts down.
696///
697/// # Example: Complete Agent
698///
699/// ```no_run
700/// # use agent_client_protocol::UntypedRole;
701/// # use agent_client_protocol::{Builder};
702/// # use agent_client_protocol::Stdio;
703/// # use agent_client_protocol::schema::v1::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
704/// # async fn example() -> Result<(), agent_client_protocol::Error> {
705/// let transport = Stdio::new();
706///
707/// UntypedRole.builder()
708///     .name("my-agent")  // Optional: for debugging logs
709///     .on_receive_request(async |init: InitializeRequest, responder, cx| {
710///         let response: InitializeResponse = todo!();
711///         responder.respond(response)
712///     }, agent_client_protocol::on_receive_request!())
713///     .on_receive_request(async |prompt: PromptRequest, responder, cx| {
714///         // You can send notifications while processing a request
715///         let notif: SessionNotification = todo!();
716///         cx.send_notification(notif)?;
717///
718///         // Then respond to the request
719///         let response: PromptResponse = todo!();
720///         responder.respond(response)
721///     }, agent_client_protocol::on_receive_request!())
722///     .connect_to(transport)
723///     .await?;
724/// # Ok(())
725/// # }
726/// ```
727#[must_use]
728#[derive(Debug)]
729pub struct Builder<Host: Role, Handler = NullHandler, Runner = NullRun>
730where
731    Handler: HandleDispatchFrom<Host::Counterpart>,
732    Runner: RunWithConnectionTo<Host::Counterpart>,
733{
734    /// My role.
735    host: Host,
736
737    /// Name of the connection, used in tracing logs.
738    name: Option<String>,
739
740    /// Handler for incoming messages.
741    handler: Handler,
742
743    /// Responder for background tasks.
744    responder: Runner,
745
746    /// Protocol version mode for the public API and wire compatibility layer.
747    protocol_mode: ProtocolMode,
748}
749
750fn default_protocol_mode<Host: Role>() -> ProtocolMode {
751    let role = TypeId::of::<Host>();
752
753    if role == TypeId::of::<Agent>() {
754        ProtocolMode::v1_agent()
755    } else if role == TypeId::of::<Client>() {
756        ProtocolMode::v1_client()
757    } else {
758        ProtocolMode::disabled()
759    }
760}
761
762impl<Host: Role> Builder<Host, NullHandler, NullRun> {
763    /// Create a new connection builder for the given role.
764    /// This type follows a builder pattern; use other methods to configure and then invoke
765    /// [`Self::connect_to`] (to use as a server) or [`Self::connect_with`] to use as a client.
766    pub fn new(role: Host) -> Self {
767        Self {
768            host: role,
769            name: None,
770            handler: NullHandler,
771            responder: NullRun,
772            protocol_mode: default_protocol_mode::<Host>(),
773        }
774    }
775}
776
777impl<Host: Role, Handler> Builder<Host, Handler, NullRun>
778where
779    Handler: HandleDispatchFrom<Host::Counterpart>,
780{
781    /// Create a new connection builder with the given handler.
782    pub fn new_with(role: Host, handler: Handler) -> Self {
783        Self {
784            host: role,
785            name: None,
786            handler,
787            responder: NullRun,
788            protocol_mode: default_protocol_mode::<Host>(),
789        }
790    }
791}
792
793impl<
794    Host: Role,
795    Handler: HandleDispatchFrom<Host::Counterpart>,
796    Runner: RunWithConnectionTo<Host::Counterpart>,
797> Builder<Host, Handler, Runner>
798{
799    /// Set the "name" of this connection -- used only for debugging logs.
800    pub fn name(mut self, name: impl ToString) -> Self {
801        self.name = Some(name.to_string());
802        self
803    }
804
805    pub(crate) fn v1_agent(mut self) -> Self {
806        self.protocol_mode = ProtocolMode::v1_agent();
807        self
808    }
809
810    pub(crate) fn v1_client(mut self) -> Self {
811        self.protocol_mode = ProtocolMode::v1_client();
812        self
813    }
814
815    #[cfg(feature = "unstable_protocol_v2")]
816    pub(crate) fn v2_agent(mut self) -> Self {
817        self.protocol_mode = ProtocolMode::v2_agent();
818        self
819    }
820
821    #[cfg(feature = "unstable_protocol_v2")]
822    pub(crate) fn v2_client(mut self) -> Self {
823        self.protocol_mode = ProtocolMode::v2_client();
824        self
825    }
826
827    /// Merge another [`Builder`] into this one.
828    ///
829    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
830    /// This is a low-level method that is not intended for general use.
831    pub fn with_connection_builder(
832        self,
833        other: Builder<
834            Host,
835            impl HandleDispatchFrom<Host::Counterpart>,
836            impl RunWithConnectionTo<Host::Counterpart>,
837        >,
838    ) -> Builder<
839        Host,
840        impl HandleDispatchFrom<Host::Counterpart>,
841        impl RunWithConnectionTo<Host::Counterpart>,
842    > {
843        let Builder {
844            name: other_name,
845            handler: other_handler,
846            responder: other_responder,
847            protocol_mode: other_protocol_mode,
848            host: _,
849        } = other;
850        Builder {
851            host: self.host,
852            name: self.name,
853            handler: ChainedHandler::new(
854                self.handler,
855                NamedHandler::new(other_name, other_handler),
856            ),
857            responder: ChainRun::new(self.responder, other_responder),
858            protocol_mode: self.protocol_mode.merge(other_protocol_mode),
859        }
860    }
861
862    /// Add a new [`HandleDispatchFrom`] to the chain.
863    ///
864    /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
865    /// This is a low-level method that is not intended for general use.
866    pub fn with_handler(
867        self,
868        handler: impl HandleDispatchFrom<Host::Counterpart>,
869    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner> {
870        Builder {
871            host: self.host,
872            name: self.name,
873            handler: ChainedHandler::new(self.handler, handler),
874            responder: self.responder,
875            protocol_mode: self.protocol_mode,
876        }
877    }
878
879    /// Add a new [`RunWithConnectionTo`] to the chain.
880    pub fn with_responder<Run1>(
881        self,
882        responder: Run1,
883    ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
884    where
885        Run1: RunWithConnectionTo<Host::Counterpart>,
886    {
887        Builder {
888            host: self.host,
889            name: self.name,
890            handler: self.handler,
891            responder: ChainRun::new(self.responder, responder),
892            protocol_mode: self.protocol_mode,
893        }
894    }
895
896    /// Enqueue a task to run once the connection is actively serving traffic.
897    #[track_caller]
898    pub fn with_spawned<F, Fut>(
899        self,
900        task: F,
901    ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
902    where
903        F: FnOnce(ConnectionTo<Host::Counterpart>) -> Fut + Send,
904        Fut: Future<Output = Result<(), crate::Error>> + Send,
905    {
906        let location = Location::caller();
907        self.with_responder(SpawnedRun::new(location, task))
908    }
909
910    /// Register a handler for messages that can be either requests OR notifications.
911    ///
912    /// Use this when you want to handle an enum type that contains both request and
913    /// notification variants. Your handler receives a [`Dispatch<Req, Notif>`] which
914    /// is an enum with two variants:
915    ///
916    /// - `Dispatch::Request(request, responder)` - A request with its response context
917    /// - `Dispatch::Notification(notification)` - A notification
918    /// - `Dispatch::Response(result, router)` - A response to a request we sent
919    ///
920    /// # Example
921    ///
922    /// ```no_run
923    /// # use agent_client_protocol_test::*;
924    /// # use agent_client_protocol::Dispatch;
925    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
926    /// # let connection = mock_connection();
927    /// connection.on_receive_dispatch(async |message: Dispatch<MyRequest, StatusUpdate>, _cx| {
928    ///     match message {
929    ///         Dispatch::Request(req, responder) => {
930    ///             // Handle request and send response
931    ///             responder.respond(MyResponse { status: "ok".into() })
932    ///         }
933    ///         Dispatch::Notification(notif) => {
934    ///             // Handle notification (no response needed)
935    ///             Ok(())
936    ///         }
937    ///         Dispatch::Response(result, router) => {
938    ///             // Forward response to its destination
939    ///             router.respond_with_result(result)
940    ///         }
941    ///     }
942    /// }, agent_client_protocol::on_receive_dispatch!())
943    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
944    /// # Ok(())
945    /// # }
946    /// ```
947    ///
948    /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
949    /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
950    /// for handling requests or notifications separately.
951    ///
952    /// # Ordering
953    ///
954    /// This callback runs inside the dispatch loop and blocks further message processing
955    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
956    /// ordering guarantees and how to avoid deadlocks.
957    pub fn on_receive_dispatch<Req, Notif, F, T, ToFut>(
958        self,
959        op: F,
960        to_future_hack: ToFut,
961    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
962    where
963        Host::Counterpart: HasPeer<Host::Counterpart>,
964        Req: JsonRpcRequest,
965        Notif: JsonRpcNotification,
966        F: AsyncFnMut(
967                Dispatch<Req, Notif>,
968                ConnectionTo<Host::Counterpart>,
969            ) -> Result<T, crate::Error>
970            + Send,
971        T: IntoHandled<Dispatch<Req, Notif>>,
972        ToFut: Fn(
973                &mut F,
974                Dispatch<Req, Notif>,
975                ConnectionTo<Host::Counterpart>,
976            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
977            + Send
978            + Sync,
979    {
980        let handler = MessageHandler::new(
981            self.host.counterpart(),
982            self.host.counterpart(),
983            op,
984            to_future_hack,
985        );
986        self.with_handler(handler)
987    }
988
989    /// Register a handler for JSON-RPC requests of type `Req`.
990    ///
991    /// Your handler receives two arguments:
992    /// 1. The request (type `Req`)
993    /// 2. A [`Responder<R, Req::Response>`] for sending the response
994    ///
995    /// The request context allows you to:
996    /// - Send the response with [`Responder::respond`]
997    /// - Send notifications to the client with [`ConnectionTo::send_notification`]
998    /// - Send requests to the client with [`ConnectionTo::send_request`]
999    ///
1000    /// # Example
1001    ///
1002    /// ```ignore
1003    /// # use agent_client_protocol::UntypedRole;
1004    /// # use agent_client_protocol::{Builder};
1005    /// # use agent_client_protocol::schema::v1::{PromptRequest, PromptResponse, SessionNotification};
1006    /// # fn example<R: agent_client_protocol::Role>(connection: Builder<R, impl agent_client_protocol::HandleMessageAs<R>>) {
1007    /// connection.on_receive_request(async |request: PromptRequest, responder, cx| {
1008    ///     // Send a notification while processing
1009    ///     let notif: SessionNotification = todo!();
1010    ///     cx.send_notification(notif)?;
1011    ///
1012    ///     // Do some work...
1013    ///     let result = todo!("process the prompt");
1014    ///
1015    ///     // Send the response
1016    ///     let response: PromptResponse = todo!();
1017    ///     responder.respond(response)
1018    /// }, agent_client_protocol::on_receive_request!());
1019    /// # }
1020    /// ```
1021    ///
1022    /// # Type Parameter
1023    ///
1024    /// `Req` can be either a single request type or an enum of multiple request types.
1025    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
1026    ///
1027    /// # Ordering
1028    ///
1029    /// This callback runs inside the dispatch loop and blocks further message processing
1030    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1031    /// ordering guarantees and how to avoid deadlocks.
1032    pub fn on_receive_request<Req: JsonRpcRequest, F, T, ToFut>(
1033        self,
1034        op: F,
1035        to_future_hack: ToFut,
1036    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1037    where
1038        Host::Counterpart: HasPeer<Host::Counterpart>,
1039        F: AsyncFnMut(
1040                Req,
1041                Responder<Req::Response>,
1042                ConnectionTo<Host::Counterpart>,
1043            ) -> Result<T, crate::Error>
1044            + Send,
1045        T: IntoHandled<(Req, Responder<Req::Response>)>,
1046        ToFut: Fn(
1047                &mut F,
1048                Req,
1049                Responder<Req::Response>,
1050                ConnectionTo<Host::Counterpart>,
1051            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1052            + Send
1053            + Sync,
1054    {
1055        let handler = RequestHandler::new(
1056            self.host.counterpart(),
1057            self.host.counterpart(),
1058            op,
1059            to_future_hack,
1060        );
1061        self.with_handler(handler)
1062    }
1063
1064    /// Register a handler for JSON-RPC notifications of type `Notif`.
1065    ///
1066    /// Notifications are fire-and-forget messages that don't expect a response.
1067    /// Your handler receives:
1068    /// 1. The notification (type `Notif`)
1069    /// 2. A [`ConnectionTo<R>`] for sending messages to the other side
1070    ///
1071    /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
1072    /// but you can still send your own requests and notifications using the context.
1073    ///
1074    /// # Example
1075    ///
1076    /// ```no_run
1077    /// # use agent_client_protocol_test::*;
1078    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1079    /// # let connection = mock_connection();
1080    /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
1081    ///     // Process the notification
1082    ///     update_session_state(&notif)?;
1083    ///
1084    ///     // Optionally send a notification back
1085    ///     cx.send_notification(StatusUpdate {
1086    ///         message: "Acknowledged".into(),
1087    ///     })?;
1088    ///
1089    ///     Ok(())
1090    /// }, agent_client_protocol::on_receive_notification!())
1091    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1092    /// # Ok(())
1093    /// # }
1094    /// ```
1095    ///
1096    /// # Type Parameter
1097    ///
1098    /// `Notif` can be either a single notification type or an enum of multiple notification types.
1099    /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
1100    ///
1101    /// # Ordering
1102    ///
1103    /// This callback runs inside the dispatch loop and blocks further message processing
1104    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1105    /// ordering guarantees and how to avoid deadlocks.
1106    pub fn on_receive_notification<Notif, F, T, ToFut>(
1107        self,
1108        op: F,
1109        to_future_hack: ToFut,
1110    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1111    where
1112        Host::Counterpart: HasPeer<Host::Counterpart>,
1113        Notif: JsonRpcNotification,
1114        F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1115        T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1116        ToFut: Fn(
1117                &mut F,
1118                Notif,
1119                ConnectionTo<Host::Counterpart>,
1120            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1121            + Send
1122            + Sync,
1123    {
1124        let handler = NotificationHandler::new(
1125            self.host.counterpart(),
1126            self.host.counterpart(),
1127            op,
1128            to_future_hack,
1129        );
1130        self.with_handler(handler)
1131    }
1132
1133    /// Register a handler for messages from a specific peer.
1134    ///
1135    /// This is similar to [`on_receive_dispatch`](Self::on_receive_dispatch), but allows
1136    /// specifying the source peer explicitly. This is useful when receiving messages
1137    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
1138    /// envelopes when receiving from an agent via a proxy).
1139    ///
1140    /// For the common case of receiving from the default counterpart, use
1141    /// [`on_receive_dispatch`](Self::on_receive_dispatch) instead.
1142    ///
1143    /// # Ordering
1144    ///
1145    /// This callback runs inside the dispatch loop and blocks further message processing
1146    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1147    /// ordering guarantees and how to avoid deadlocks.
1148    pub fn on_receive_dispatch_from<
1149        Req: JsonRpcRequest,
1150        Notif: JsonRpcNotification,
1151        Peer: Role,
1152        F,
1153        T,
1154        ToFut,
1155    >(
1156        self,
1157        peer: Peer,
1158        op: F,
1159        to_future_hack: ToFut,
1160    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1161    where
1162        Host::Counterpart: HasPeer<Peer>,
1163        F: AsyncFnMut(
1164                Dispatch<Req, Notif>,
1165                ConnectionTo<Host::Counterpart>,
1166            ) -> Result<T, crate::Error>
1167            + Send,
1168        T: IntoHandled<Dispatch<Req, Notif>>,
1169        ToFut: Fn(
1170                &mut F,
1171                Dispatch<Req, Notif>,
1172                ConnectionTo<Host::Counterpart>,
1173            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1174            + Send
1175            + Sync,
1176    {
1177        let handler = MessageHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1178        self.with_handler(handler)
1179    }
1180
1181    /// Register a handler for JSON-RPC requests from a specific peer.
1182    ///
1183    /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
1184    /// specifying the source peer explicitly. This is useful when receiving messages
1185    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
1186    /// envelopes when receiving from an agent via a proxy).
1187    ///
1188    /// For the common case of receiving from the default counterpart, use
1189    /// [`on_receive_request`](Self::on_receive_request) instead.
1190    ///
1191    /// # Example
1192    ///
1193    /// ```ignore
1194    /// use agent_client_protocol::Agent;
1195    /// use agent_client_protocol::schema::v1::InitializeRequest;
1196    ///
1197    /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
1198    /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, responder, cx| {
1199    ///     // Handle the request
1200    ///     responder.respond(InitializeResponse::make())
1201    /// })
1202    /// ```
1203    ///
1204    /// # Ordering
1205    ///
1206    /// This callback runs inside the dispatch loop and blocks further message processing
1207    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1208    /// ordering guarantees and how to avoid deadlocks.
1209    pub fn on_receive_request_from<Req: JsonRpcRequest, Peer: Role, F, T, ToFut>(
1210        self,
1211        peer: Peer,
1212        op: F,
1213        to_future_hack: ToFut,
1214    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1215    where
1216        Host::Counterpart: HasPeer<Peer>,
1217        F: AsyncFnMut(
1218                Req,
1219                Responder<Req::Response>,
1220                ConnectionTo<Host::Counterpart>,
1221            ) -> Result<T, crate::Error>
1222            + Send,
1223        T: IntoHandled<(Req, Responder<Req::Response>)>,
1224        ToFut: Fn(
1225                &mut F,
1226                Req,
1227                Responder<Req::Response>,
1228                ConnectionTo<Host::Counterpart>,
1229            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1230            + Send
1231            + Sync,
1232    {
1233        let handler = RequestHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1234        self.with_handler(handler)
1235    }
1236
1237    /// Register a handler for JSON-RPC notifications from a specific peer.
1238    ///
1239    /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
1240    /// specifying the source peer explicitly. This is useful when receiving messages
1241    /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
1242    /// envelopes when receiving from an agent via a proxy).
1243    ///
1244    /// For the common case of receiving from the default counterpart, use
1245    /// [`on_receive_notification`](Self::on_receive_notification) instead.
1246    ///
1247    /// # Ordering
1248    ///
1249    /// This callback runs inside the dispatch loop and blocks further message processing
1250    /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1251    /// ordering guarantees and how to avoid deadlocks.
1252    pub fn on_receive_notification_from<Notif: JsonRpcNotification, Peer: Role, F, T, ToFut>(
1253        self,
1254        peer: Peer,
1255        op: F,
1256        to_future_hack: ToFut,
1257    ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1258    where
1259        Host::Counterpart: HasPeer<Peer>,
1260        F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1261        T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1262        ToFut: Fn(
1263                &mut F,
1264                Notif,
1265                ConnectionTo<Host::Counterpart>,
1266            ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1267            + Send
1268            + Sync,
1269    {
1270        let handler = NotificationHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1271        self.with_handler(handler)
1272    }
1273
1274    /// Add an MCP server that will be added to all new sessions that are proxied through this connection.
1275    ///
1276    /// Only applicable to proxies.
1277    pub fn with_mcp_server(
1278        self,
1279        mcp_server: McpServer<Host::Counterpart, impl RunWithConnectionTo<Host::Counterpart>>,
1280    ) -> Builder<
1281        Host,
1282        impl HandleDispatchFrom<Host::Counterpart>,
1283        impl RunWithConnectionTo<Host::Counterpart>,
1284    >
1285    where
1286        Host::Counterpart: HasPeer<Agent> + HasPeer<Client>,
1287    {
1288        let (handler, responder) = mcp_server.into_handler_and_responder();
1289        self.with_handler(handler).with_responder(responder)
1290    }
1291
1292    /// Run in server mode with the provided transport.
1293    ///
1294    /// This drives the connection by continuously processing messages from the transport
1295    /// and dispatching them to your registered handlers. The connection will run until:
1296    /// - The transport closes (e.g., EOF on byte streams)
1297    /// - An error occurs
1298    /// - One of your handlers returns an error
1299    ///
1300    /// The transport is responsible for serializing and deserializing [`RawJsonRpcMessage`]
1301    /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1302    ///
1303    /// Use this mode when you only need to respond to incoming messages and don't need
1304    /// to initiate your own requests. If you need to send requests to the other side,
1305    /// use [`connect_with`](Self::connect_with) instead.
1306    ///
1307    /// # Example: Byte Stream Transport
1308    ///
1309    /// ```no_run
1310    /// # use agent_client_protocol::UntypedRole;
1311    /// # use agent_client_protocol::{Builder};
1312    /// # use agent_client_protocol::Stdio;
1313    /// # use agent_client_protocol_test::*;
1314    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1315    /// let transport = Stdio::new();
1316    ///
1317    /// UntypedRole.builder()
1318    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
1319    ///         responder.respond(MyResponse { status: "ok".into() })
1320    ///     }, agent_client_protocol::on_receive_request!())
1321    ///     .connect_to(transport)
1322    ///     .await?;
1323    /// # Ok(())
1324    /// # }
1325    /// ```
1326    pub async fn connect_to(
1327        self,
1328        transport: impl ConnectTo<Host> + 'static,
1329    ) -> Result<(), crate::Error> {
1330        self.connect_with(transport, async move |_cx| future::pending().await)
1331            .await
1332    }
1333
1334    /// Run the connection until the provided closure completes.
1335    ///
1336    /// This drives the connection by:
1337    /// 1. Running your registered handlers in the background to process incoming messages
1338    /// 2. Executing your `main_fn` closure with a [`ConnectionTo<R>`] for sending requests/notifications
1339    ///
1340    /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1341    /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1342    ///
1343    /// Use this mode when you need to initiate communication (send requests/notifications)
1344    /// in addition to responding to incoming messages. For server-only mode where you just
1345    /// respond to messages, use [`connect_to`](Self::connect_to) instead.
1346    ///
1347    /// # Example
1348    ///
1349    /// ```no_run
1350    /// # use agent_client_protocol::UntypedRole;
1351    /// # use agent_client_protocol::{Builder};
1352    /// # use agent_client_protocol::ByteStreams;
1353    /// # use agent_client_protocol::schema::v1::InitializeRequest;
1354    /// # use agent_client_protocol::Stdio;
1355    /// # use agent_client_protocol_test::*;
1356    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1357    /// let transport = Stdio::new();
1358    ///
1359    /// UntypedRole.builder()
1360    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
1361    ///         // Handle incoming requests in the background
1362    ///         responder.respond(MyResponse { status: "ok".into() })
1363    ///     }, agent_client_protocol::on_receive_request!())
1364    ///     .connect_with(transport, async |cx| {
1365    ///         // Initialize the protocol
1366    ///         let init_response = cx.send_request(InitializeRequest::make())
1367    ///             .block_task()
1368    ///             .await?;
1369    ///
1370    ///         // Send more requests...
1371    ///         let result = cx.send_request(MyRequest {})
1372    ///             .block_task()
1373    ///             .await?;
1374    ///
1375    ///         // When this closure returns, the connection shuts down
1376    ///         Ok(())
1377    ///     })
1378    ///     .await?;
1379    /// # Ok(())
1380    /// # }
1381    /// ```
1382    ///
1383    /// # Parameters
1384    ///
1385    /// - `main_fn`: Your client logic. Receives a [`ConnectionTo<R>`] for sending messages.
1386    ///
1387    /// # Errors
1388    ///
1389    /// Returns an error if the connection closes before `main_fn` completes.
1390    pub async fn connect_with<R>(
1391        self,
1392        transport: impl ConnectTo<Host> + 'static,
1393        main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1394    ) -> Result<R, crate::Error> {
1395        let (_, future) = self.into_connection_and_future(transport, main_fn);
1396        future.await
1397    }
1398
1399    /// Helper that returns a [`ConnectionTo<R>`] and a future that runs this connection until `main_fn` returns.
1400    fn into_connection_and_future<R>(
1401        self,
1402        transport: impl ConnectTo<Host> + 'static,
1403        main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1404    ) -> (
1405        ConnectionTo<Host::Counterpart>,
1406        impl Future<Output = Result<R, crate::Error>>,
1407    ) {
1408        let Self {
1409            name,
1410            handler,
1411            responder,
1412            host: me,
1413            protocol_mode,
1414        } = self;
1415
1416        let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
1417        let (new_task_tx, new_task_rx) = mpsc::unbounded();
1418        let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
1419        let connection = ConnectionTo::new(
1420            me.counterpart(),
1421            outgoing_tx,
1422            new_task_tx,
1423            dynamic_handler_tx,
1424        );
1425
1426        // Convert transport into server - this returns a channel for us to use
1427        // and a future that runs the transport
1428        let transport_component = crate::DynConnectTo::new(transport);
1429        let (transport_channel, transport_future) = transport_component.into_channel_and_future();
1430        let spawn_result = connection.spawn(transport_future);
1431
1432        // Destructure the channel endpoints
1433        let Channel {
1434            rx: transport_incoming_rx,
1435            tx: transport_outgoing_tx,
1436        } = transport_channel;
1437
1438        let (reply_tx, reply_rx) = mpsc::unbounded();
1439        let protocol_compat = ProtocolCompat::new(protocol_mode);
1440
1441        let future = crate::util::instrument_with_connection_name(name, {
1442            let connection = connection.clone();
1443            async move {
1444                let () = spawn_result?;
1445
1446                let background = async {
1447                    futures::try_join!(
1448                        // Protocol layer: OutgoingMessage -> RawJsonRpcMessage
1449                        outgoing_actor::outgoing_protocol_actor(
1450                            outgoing_rx,
1451                            reply_tx.clone(),
1452                            transport_outgoing_tx,
1453                            protocol_compat.clone(),
1454                        ),
1455                        // Protocol layer: RawJsonRpcMessage -> handler/reply routing
1456                        incoming_actor::incoming_protocol_actor(
1457                            me.counterpart(),
1458                            &connection,
1459                            transport_incoming_rx,
1460                            dynamic_handler_rx,
1461                            reply_rx,
1462                            handler,
1463                            protocol_compat,
1464                        ),
1465                        task_actor::task_actor(new_task_rx, &connection),
1466                        responder.run_with_connection_to(connection.clone()),
1467                    )?;
1468                    Ok(())
1469                };
1470
1471                crate::util::run_until(Box::pin(background), Box::pin(main_fn(connection.clone())))
1472                    .await
1473            }
1474        });
1475
1476        (connection, future)
1477    }
1478}
1479
1480impl<R, H, Run> ConnectTo<R::Counterpart> for Builder<R, H, Run>
1481where
1482    R: Role,
1483    H: HandleDispatchFrom<R::Counterpart> + 'static,
1484    Run: RunWithConnectionTo<R::Counterpart> + 'static,
1485{
1486    async fn connect_to(self, client: impl ConnectTo<R>) -> Result<(), crate::Error> {
1487        Builder::connect_to(self, client).await
1488    }
1489}
1490
1491/// The payload sent through the response oneshot channel.
1492///
1493/// Includes the response value and an optional ack channel for dispatch loop
1494/// synchronization.
1495pub(crate) struct ResponsePayload {
1496    /// The response result - either the JSON value or an error.
1497    pub(crate) result: Result<serde_json::Value, crate::Error>,
1498
1499    /// Optional acknowledgment channel for dispatch loop synchronization.
1500    ///
1501    /// When present, the receiver must send on this channel to signal that
1502    /// response processing is complete, allowing the dispatch loop to continue
1503    /// to the next message.
1504    ///
1505    /// This is `None` for error paths where the response is sent directly
1506    /// (e.g., when the outgoing channel is broken) rather than through the
1507    /// normal dispatch loop flow.
1508    pub(crate) ack_tx: Option<oneshot::Sender<()>>,
1509}
1510
1511impl std::fmt::Debug for ResponsePayload {
1512    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1513        f.debug_struct("ResponsePayload")
1514            .field("result", &self.result)
1515            .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1516            .finish()
1517    }
1518}
1519
1520/// Message sent to the incoming actor for reply subscription management.
1521enum ReplyMessage {
1522    /// Subscribe to receive a response for the given request id.
1523    /// When a response with this id arrives, it will be sent through the oneshot
1524    /// along with an ack channel that must be signaled when processing is complete.
1525    /// The method name is stored to allow routing responses through typed handlers.
1526    Subscribe {
1527        id: RequestId,
1528
1529        /// id of the peer this request was sent to
1530        role_id: RoleId,
1531
1532        /// (original) method of the request -- the actual request may have been transformed
1533        /// to a successor method, but this will reflect the method of the wrapped request
1534        method: String,
1535
1536        sender: oneshot::Sender<ResponsePayload>,
1537
1538        #[cfg(feature = "unstable_cancel_request")]
1539        cancellation_disarm: SentRequestCancellationDisarm,
1540    },
1541}
1542
1543impl std::fmt::Debug for ReplyMessage {
1544    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1545        match self {
1546            ReplyMessage::Subscribe { id, method, .. } => f
1547                .debug_struct("Subscribe")
1548                .field("id", id)
1549                .field("method", method)
1550                .finish(),
1551        }
1552    }
1553}
1554
1555/// A request-local marker that is set when the peer asks to cancel the request.
1556///
1557/// Request handlers can get this handle from [`Responder::cancellation`] and
1558/// use it from spawned work to stop long-running request processing
1559/// cooperatively.
1560#[cfg(feature = "unstable_cancel_request")]
1561#[derive(Clone)]
1562pub struct RequestCancellation {
1563    state: Arc<RequestCancellationState>,
1564}
1565
1566#[cfg(feature = "unstable_cancel_request")]
1567struct RequestCancellationState {
1568    cancelled: AtomicBool,
1569    signal_tx: Mutex<Option<oneshot::Sender<()>>>,
1570    signal_rx: future::Shared<BoxFuture<'static, ()>>,
1571}
1572
1573#[cfg(feature = "unstable_cancel_request")]
1574impl RequestCancellation {
1575    fn new() -> Self {
1576        let (signal_tx, signal_rx) = oneshot::channel();
1577        let signal_rx = signal_rx.map(|_| ()).boxed().shared();
1578        Self {
1579            state: Arc::new(RequestCancellationState {
1580                cancelled: AtomicBool::new(false),
1581                signal_tx: Mutex::new(Some(signal_tx)),
1582                signal_rx,
1583            }),
1584        }
1585    }
1586
1587    /// Wait until the peer sends `$/cancel_request` for this request.
1588    ///
1589    /// If cancellation was already requested, this returns immediately.
1590    pub async fn cancelled(&self) {
1591        self.state.signal_rx.clone().await;
1592    }
1593
1594    /// Run request work until it completes or the peer asks to cancel it.
1595    ///
1596    /// If cancellation is requested first, this returns
1597    /// [`Error::request_cancelled`]. This is a convenience for request handlers
1598    /// that want to respond with the normal result or the standard
1599    /// cancellation error.
1600    ///
1601    /// When cancellation wins, `future` is dropped: work stops at its next
1602    /// await point, partial results are lost, and any cleanup must happen in
1603    /// `Drop` implementations. Handlers that need to flush partial results or
1604    /// run async cleanup should instead watch [`cancelled`](Self::cancelled)
1605    /// or poll [`is_cancelled`](Self::is_cancelled) from inside the work.
1606    ///
1607    /// [`Error::request_cancelled`]: crate::Error::request_cancelled
1608    pub async fn run_until_cancelled<T>(
1609        &self,
1610        future: impl std::future::Future<Output = Result<T, crate::Error>>,
1611    ) -> Result<T, crate::Error> {
1612        if self.is_cancelled() {
1613            return Err(crate::Error::request_cancelled());
1614        }
1615
1616        match future::select(pin!(future), pin!(self.cancelled())).await {
1617            Either::Left((result, _)) => result,
1618            Either::Right(((), _)) => Err(crate::Error::request_cancelled()),
1619        }
1620    }
1621
1622    /// Returns whether the peer has already requested cancellation.
1623    #[must_use]
1624    pub fn is_cancelled(&self) -> bool {
1625        self.state.cancelled.load(Ordering::Acquire)
1626    }
1627
1628    fn cancel(&self) {
1629        if self.state.cancelled.swap(true, Ordering::AcqRel) {
1630            return;
1631        }
1632
1633        let signal_tx = self
1634            .state
1635            .signal_tx
1636            .lock()
1637            .expect("request cancellation signal mutex poisoned")
1638            .take();
1639
1640        // Complete the oneshot outside the lock: it wakes waiters, and
1641        // arbitrary waker code must not observe the lock held.
1642        if let Some(signal_tx) = signal_tx {
1643            let _ = signal_tx.send(());
1644        }
1645    }
1646}
1647
1648#[cfg(feature = "unstable_cancel_request")]
1649impl Debug for RequestCancellation {
1650    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1651        formatter
1652            .debug_struct("RequestCancellation")
1653            .field("is_cancelled", &self.is_cancelled())
1654            .finish_non_exhaustive()
1655    }
1656}
1657
1658/// Per-request cancellation state tracked by [`RequestCancellationRegistry`].
1659///
1660/// The full [`RequestCancellation`] marker (with its wakeup machinery) is only
1661/// allocated once a handler asks for it via [`Responder::cancellation`]; until
1662/// then an incoming `$/cancel_request` just flips the entry to `Cancelled`.
1663/// This keeps the per-request cost of the registry to a single map entry.
1664#[cfg(feature = "unstable_cancel_request")]
1665#[derive(Debug)]
1666enum RequestCancellationEntry {
1667    /// The request is in flight; no marker handed out, no cancellation yet.
1668    Armed,
1669    /// `$/cancel_request` arrived before a marker was handed out.
1670    Cancelled,
1671    /// A marker was handed out via [`Responder::cancellation`].
1672    Marker(RequestCancellation),
1673}
1674
1675/// A registered request's cancellation state, tagged with the generation of
1676/// its registration.
1677///
1678/// The generation distinguishes a registration from earlier ones that used
1679/// the same request ID, so that when a (protocol-violating) peer reuses the
1680/// ID of a request that is still in flight, the stale request's responder can
1681/// neither remove nor observe the cancellation state of the newer request.
1682#[cfg(feature = "unstable_cancel_request")]
1683#[derive(Debug)]
1684struct RequestCancellationSlot {
1685    generation: u64,
1686    entry: RequestCancellationEntry,
1687}
1688
1689#[cfg(feature = "unstable_cancel_request")]
1690#[derive(Debug, Default)]
1691struct RequestCancellationRegistryInner {
1692    slots: HashMap<RequestId, RequestCancellationSlot>,
1693    next_generation: u64,
1694}
1695
1696#[cfg(feature = "unstable_cancel_request")]
1697#[derive(Clone, Debug, Default)]
1698struct RequestCancellationRegistry {
1699    inner: Arc<Mutex<RequestCancellationRegistryInner>>,
1700}
1701
1702#[cfg(not(feature = "unstable_cancel_request"))]
1703#[derive(Clone, Debug, Default)]
1704struct RequestCancellationRegistry;
1705
1706#[cfg(feature = "unstable_cancel_request")]
1707#[derive(Debug)]
1708struct ResponderCancellation {
1709    id: RequestId,
1710    generation: u64,
1711    registry: RequestCancellationRegistry,
1712}
1713
1714#[cfg(not(feature = "unstable_cancel_request"))]
1715#[derive(Debug)]
1716struct ResponderCancellation;
1717
1718#[cfg(feature = "unstable_cancel_request")]
1719impl RequestCancellationRegistry {
1720    fn new() -> Self {
1721        Self::default()
1722    }
1723
1724    fn register(&self, id: &RequestId) -> ResponderCancellation {
1725        let generation = {
1726            let mut inner = self
1727                .inner
1728                .lock()
1729                .expect("request cancellation registry mutex poisoned");
1730            let generation = inner.next_generation;
1731            inner.next_generation += 1;
1732            if inner
1733                .slots
1734                .insert(
1735                    id.clone(),
1736                    RequestCancellationSlot {
1737                        generation,
1738                        entry: RequestCancellationEntry::Armed,
1739                    },
1740                )
1741                .is_some()
1742            {
1743                tracing::debug!(
1744                    ?id,
1745                    "peer reused the ID of a request that is still in flight"
1746                );
1747            }
1748            generation
1749        };
1750        ResponderCancellation {
1751            id: id.clone(),
1752            generation,
1753            registry: self.clone(),
1754        }
1755    }
1756
1757    /// Get the cancellation marker for a registered request, creating it on
1758    /// first use. Repeated calls return markers that share the same state.
1759    ///
1760    /// Exception: when the registration is stale (a protocol-violating peer
1761    /// reused this request ID and the slot now belongs to a newer request, or
1762    /// was already removed by it), every call returns a fresh *detached*
1763    /// marker. Detached markers can never fire, and detached markers from
1764    /// repeated calls do not share state with each other.
1765    fn marker(&self, id: &RequestId, generation: u64) -> RequestCancellation {
1766        let mut inner = self
1767            .inner
1768            .lock()
1769            .expect("request cancellation registry mutex poisoned");
1770        let Some(slot) = inner.slots.get_mut(id) else {
1771            // The slot lives as long as the responder that owns it, so this
1772            // is only reachable if the peer reused this request ID and the
1773            // newer request's responder already removed the replacement slot.
1774            // Hand out a detached marker rather than panicking.
1775            return RequestCancellation::new();
1776        };
1777        if slot.generation != generation {
1778            // The peer reused this request ID while the request was still in
1779            // flight, and the slot now belongs to the newer request. Hand the
1780            // stale responder a detached marker instead of cross-wiring the
1781            // two requests' cancellation states.
1782            return RequestCancellation::new();
1783        }
1784        let entry = &mut slot.entry;
1785        match entry {
1786            RequestCancellationEntry::Marker(marker) => marker.clone(),
1787            RequestCancellationEntry::Armed => {
1788                let marker = RequestCancellation::new();
1789                *entry = RequestCancellationEntry::Marker(marker.clone());
1790                marker
1791            }
1792            RequestCancellationEntry::Cancelled => {
1793                // No one can be waiting on a marker that did not exist yet,
1794                // so firing it while holding the registry lock is fine.
1795                let marker = RequestCancellation::new();
1796                marker.cancel();
1797                *entry = RequestCancellationEntry::Marker(marker.clone());
1798                marker
1799            }
1800        }
1801    }
1802
1803    fn cancel_if_requested(&self, dispatch: &Dispatch) -> Result<bool, crate::Error> {
1804        let Some(request_id) = cancellation_request_id(dispatch)? else {
1805            return Ok(false);
1806        };
1807        Ok(self.cancel(&request_id))
1808    }
1809
1810    /// Mark whichever request currently owns `request_id` as cancelled.
1811    fn cancel(&self, request_id: &RequestId) -> bool {
1812        let marker = {
1813            let mut inner = self
1814                .inner
1815                .lock()
1816                .expect("request cancellation registry mutex poisoned");
1817            let Some(slot) = inner.slots.get_mut(request_id) else {
1818                return false;
1819            };
1820            let entry = &mut slot.entry;
1821            match entry {
1822                RequestCancellationEntry::Marker(marker) => marker.clone(),
1823                RequestCancellationEntry::Cancelled => return true,
1824                RequestCancellationEntry::Armed => {
1825                    *entry = RequestCancellationEntry::Cancelled;
1826                    return true;
1827                }
1828            }
1829        };
1830
1831        // Fire the marker outside the registry lock: waking waiters runs
1832        // arbitrary waker code that must not observe the lock held.
1833        marker.cancel();
1834        true
1835    }
1836
1837    /// Remove the slot for `request_id`, but only if it still belongs to the
1838    /// registration identified by `generation`.
1839    fn remove(&self, request_id: &RequestId, generation: u64) {
1840        let mut inner = self
1841            .inner
1842            .lock()
1843            .expect("request cancellation registry mutex poisoned");
1844        if inner
1845            .slots
1846            .get(request_id)
1847            .is_some_and(|slot| slot.generation == generation)
1848        {
1849            inner.slots.remove(request_id);
1850        }
1851    }
1852}
1853
1854#[cfg(not(feature = "unstable_cancel_request"))]
1855impl RequestCancellationRegistry {
1856    fn new() -> Self {
1857        Self
1858    }
1859
1860    #[expect(
1861        clippy::unused_self,
1862        reason = "feature-disabled stub mirrors the real registry API"
1863    )]
1864    fn register(&self, _id: &RequestId) -> ResponderCancellation {
1865        ResponderCancellation
1866    }
1867
1868    #[expect(
1869        clippy::unused_self,
1870        clippy::unnecessary_wraps,
1871        reason = "feature-disabled stub mirrors the real registry API"
1872    )]
1873    fn cancel_if_requested(&self, _dispatch: &Dispatch) -> Result<bool, crate::Error> {
1874        Ok(false)
1875    }
1876}
1877
1878#[cfg(feature = "unstable_cancel_request")]
1879impl ResponderCancellation {
1880    fn cancellation(&self) -> RequestCancellation {
1881        self.registry.marker(&self.id, self.generation)
1882    }
1883}
1884
1885#[cfg(feature = "unstable_cancel_request")]
1886impl Drop for ResponderCancellation {
1887    fn drop(&mut self) {
1888        self.registry.remove(&self.id, self.generation);
1889    }
1890}
1891
1892#[cfg(feature = "unstable_cancel_request")]
1893fn cancellation_request_id(dispatch: &Dispatch) -> Result<Option<RequestId>, crate::Error> {
1894    let Dispatch::Notification(message) = dispatch else {
1895        return Ok(None);
1896    };
1897    cancellation_request_id_from_message(message)
1898}
1899
1900#[cfg(feature = "unstable_cancel_request")]
1901fn cancellation_request_id_from_message(
1902    message: &UntypedMessage,
1903) -> Result<Option<RequestId>, crate::Error> {
1904    let (method, params) = peel_successor_envelopes(&message.method, &message.params);
1905    if !crate::schema::v1::CancelRequestNotification::matches_method(method) {
1906        return Ok(None);
1907    }
1908
1909    let notification = crate::schema::v1::CancelRequestNotification::parse_message(method, params)?;
1910    Ok(Some(notification.request_id))
1911}
1912
1913/// Peel any [`SuccessorMessage`] envelopes off a notification by reference,
1914/// returning the innermost method and params.
1915///
1916/// This only peeks at the envelope's `method`/`params` fields instead of
1917/// deserializing the envelope, for two reasons:
1918///
1919/// - It avoids deep-cloning the params of every wrapped notification on the
1920///   hot dispatch path just to inspect the inner method name.
1921/// - It is deliberately lenient: a malformed envelope is left as-is here and
1922///   flows on to the handler chain, which is responsible for reporting it.
1923///
1924/// [`SuccessorMessage`]: crate::schema::SuccessorMessage
1925fn peel_successor_envelopes<'message>(
1926    mut method: &'message str,
1927    mut params: &'message serde_json::Value,
1928) -> (&'message str, &'message serde_json::Value) {
1929    while crate::schema::SuccessorMessage::<UntypedMessage>::matches_method(method) {
1930        let Some(inner_method) = params.get("method").and_then(serde_json::Value::as_str) else {
1931            break;
1932        };
1933        method = inner_method;
1934        params = params.get("params").unwrap_or(&serde_json::Value::Null);
1935    }
1936    (method, params)
1937}
1938
1939/// Whether a notification is a `$/cancel_request`, even when it is still
1940/// wrapped in `_proxy/successor` envelopes.
1941///
1942/// `$/cancel_request` is connection-scoped: its `requestId` was allocated on
1943/// the connection the notification arrived over and means nothing on any
1944/// other connection. Generic forwarding code (such as
1945/// [`ConnectionTo::send_proxied_message_to`]) uses this check to drop the raw
1946/// notification instead of tunneling it across a hop; the cancellation still
1947/// propagates because [`forward_response_to`](SentRequest::forward_response_to)
1948/// re-issues it with the forwarded request's own ID.
1949///
1950/// Checking a notification whose method is not the successor envelope is a
1951/// plain method-name comparison. Only successor-wrapped notifications pay for
1952/// a serialization to peel the envelope.
1953#[cfg(feature = "unstable_cancel_request")]
1954#[must_use]
1955pub fn is_cancel_request_notification<N: JsonRpcNotification>(notification: &N) -> bool {
1956    let method = notification.method();
1957    if crate::schema::v1::CancelRequestNotification::matches_method(method) {
1958        return true;
1959    }
1960    if !crate::schema::SuccessorMessage::<UntypedMessage>::matches_method(method) {
1961        return false;
1962    }
1963
1964    match notification.to_untyped_message() {
1965        Ok(untyped) => {
1966            let (method, _params) = peel_successor_envelopes(&untyped.method, &untyped.params);
1967            crate::schema::v1::CancelRequestNotification::matches_method(method)
1968        }
1969        Err(error) => {
1970            tracing::debug!(
1971                ?error,
1972                "failed to inspect successor-wrapped notification for cancellation"
1973            );
1974            false
1975        }
1976    }
1977}
1978
1979/// Whether the dispatch is a protocol-level (`$/`-prefixed) notification,
1980/// possibly wrapped in a [`SuccessorMessage`] envelope.
1981///
1982/// Unhandled protocol-level notifications are ignored rather than rejected
1983/// with a method-not-found error. This is deliberately *not* feature-gated:
1984/// protocol-level notifications are optional by design, so a peer that sends
1985/// `$/cancel_request` must be able to interoperate with an SDK built without
1986/// `unstable_cancel_request` (which simply won't act on it).
1987///
1988/// A handler that explicitly declines with `retry: true` takes precedence
1989/// over this fallback: the notification is queued for newly registered
1990/// dynamic handlers like any other retried message.
1991///
1992/// [`SuccessorMessage`]: crate::schema::SuccessorMessage
1993fn is_protocol_level_notification(dispatch: &Dispatch) -> bool {
1994    let Dispatch::Notification(message) = dispatch else {
1995        return false;
1996    };
1997    let (method, _params) = peel_successor_envelopes(&message.method, &message.params);
1998    method.starts_with("$/")
1999}
2000
2001/// Messages send to be serialized over the transport.
2002#[derive(Debug)]
2003enum OutgoingMessage {
2004    /// Send a request to the server.
2005    Request {
2006        /// id assigned to this request (generated by sender)
2007        id: RequestId,
2008
2009        /// the original method
2010        method: String,
2011
2012        /// the peer we sent this to
2013        role_id: RoleId,
2014
2015        /// the message to send; this may have a distinct method
2016        /// depending on the peer
2017        untyped: UntypedMessage,
2018
2019        /// where to send the response when it arrives (includes ack channel)
2020        response_tx: oneshot::Sender<ResponsePayload>,
2021
2022        #[cfg(feature = "unstable_cancel_request")]
2023        cancellation_disarm: SentRequestCancellationDisarm,
2024    },
2025
2026    /// Send a notification to the server.
2027    Notification {
2028        /// the message to send; this may have a distinct method
2029        /// depending on the peer
2030        untyped: UntypedMessage,
2031    },
2032
2033    /// Send a response to a message from the server
2034    Response {
2035        id: RequestId,
2036
2037        /// Method of the incoming request this response completes.
2038        method: String,
2039
2040        response: Result<serde_json::Value, crate::Error>,
2041    },
2042
2043    /// Send a generalized error message
2044    Error { error: crate::Error },
2045}
2046
2047/// Return type from JrHandler; indicates whether the request was handled or not.
2048#[must_use]
2049#[derive(Debug)]
2050pub enum Handled<T> {
2051    /// The message was handled
2052    Yes,
2053
2054    /// The message was not handled; returns the original value.
2055    ///
2056    /// If `retry` is true,
2057    No {
2058        /// The message to be passed to subsequent handlers
2059        /// (typically the original message, but it may have been
2060        /// mutated.)
2061        message: T,
2062
2063        /// If true, request the message to be queued and retried with
2064        /// dynamic handlers as they are added.
2065        ///
2066        /// This is used for managing session updates since the dynamic
2067        /// handler for a session cannot be added until the response to the
2068        /// new session request has been processed and there may be updates
2069        /// that get processed at the same time.
2070        retry: bool,
2071    },
2072}
2073
2074/// Trait for converting handler return values into [`Handled`].
2075///
2076/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
2077/// or an explicit `Handled<T>` value for more control over handler propagation.
2078pub trait IntoHandled<T> {
2079    /// Convert this value into a `Handled<T>`.
2080    fn into_handled(self) -> Handled<T>;
2081}
2082
2083impl<T> IntoHandled<T> for () {
2084    fn into_handled(self) -> Handled<T> {
2085        Handled::Yes
2086    }
2087}
2088
2089impl<T> IntoHandled<T> for Handled<T> {
2090    fn into_handled(self) -> Handled<T> {
2091        self
2092    }
2093}
2094
2095/// Connection context for sending messages and spawning tasks.
2096///
2097/// This is the primary handle for interacting with the JSON-RPC connection from
2098/// within handler callbacks. You can use it to:
2099///
2100/// * Send requests and notifications to the other side
2101/// * Spawn concurrent tasks that run alongside the connection
2102/// * Respond to requests (via [`Responder`] which wraps this)
2103///
2104/// # Cloning
2105///
2106/// `ConnectionTo` is cheaply cloneable - all clones refer to the same underlying connection.
2107/// This makes it easy to share across async tasks.
2108///
2109/// # Event Loop and Concurrency
2110///
2111/// Handler callbacks run on the event loop, which means the connection cannot process new
2112/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
2113/// expensive or blocking work to concurrent tasks.
2114///
2115/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency) section
2116/// for more details.
2117#[derive(Clone, Debug)]
2118pub struct ConnectionTo<Counterpart: Role> {
2119    counterpart: Counterpart,
2120    message_tx: OutgoingMessageTx,
2121    task_tx: TaskTx,
2122    dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
2123}
2124
2125impl<Counterpart: Role> ConnectionTo<Counterpart> {
2126    fn new(
2127        counterpart: Counterpart,
2128        message_tx: mpsc::UnboundedSender<OutgoingMessage>,
2129        task_tx: mpsc::UnboundedSender<Task>,
2130        dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
2131    ) -> Self {
2132        Self {
2133            counterpart,
2134            message_tx,
2135            task_tx,
2136            dynamic_handler_tx,
2137        }
2138    }
2139
2140    /// Return the counterpart role this connection is talking to.
2141    pub fn counterpart(&self) -> Counterpart {
2142        self.counterpart.clone()
2143    }
2144
2145    /// Spawns a task that will run so long as the JSON-RPC connection is being served.
2146    ///
2147    /// This is the primary mechanism for offloading expensive work from handler callbacks
2148    /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
2149    /// allowing the server to continue processing messages.
2150    ///
2151    /// # Event Loop
2152    ///
2153    /// Handler callbacks run on the event loop, which cannot process new messages while
2154    /// your handler is running. Use `spawn` for any expensive operations:
2155    ///
2156    /// ```no_run
2157    /// # use agent_client_protocol_test::*;
2158    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2159    /// # let connection = mock_connection();
2160    /// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
2161    ///     // Clone cx for the spawned task
2162    ///     cx.spawn({
2163    ///         let connection = cx.clone();
2164    ///         async move {
2165    ///             let result = expensive_operation(&req.data).await?;
2166    ///             connection.send_notification(ProcessComplete { result })?;
2167    ///             Ok(())
2168    ///         }
2169    ///     })?;
2170    ///
2171    ///     // Respond immediately
2172    ///     responder.respond(ProcessResponse { result: "started".into() })
2173    /// }, agent_client_protocol::on_receive_request!())
2174    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2175    /// # Ok(())
2176    /// # }
2177    /// ```
2178    ///
2179    /// # Errors
2180    ///
2181    /// If the spawned task returns an error, the entire server will shut down.
2182    #[track_caller]
2183    pub fn spawn(
2184        &self,
2185        task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
2186    ) -> Result<(), crate::Error> {
2187        let location = std::panic::Location::caller();
2188        let task = task.into_future();
2189        Task::new(location, task).spawn(&self.task_tx)
2190    }
2191
2192    /// Spawn a JSON-RPC connection in the background and return a [`ConnectionTo`] for sending messages to it.
2193    ///
2194    /// This is useful for creating multiple connections that communicate with each other,
2195    /// such as implementing proxy patterns or connecting to multiple backend services.
2196    ///
2197    /// # Arguments
2198    ///
2199    /// - `builder`: The connection builder with handlers configured
2200    /// - `transport`: The transport component to connect to
2201    ///
2202    /// # Returns
2203    ///
2204    /// A `ConnectionTo` that you can use to send requests and notifications to the spawned connection.
2205    ///
2206    /// # Example: Proxying to a backend connection
2207    ///
2208    /// ```
2209    /// # use agent_client_protocol::UntypedRole;
2210    /// # use agent_client_protocol::{Builder, ConnectionTo};
2211    /// # use agent_client_protocol_test::*;
2212    /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2213    /// // Set up a backend connection builder
2214    /// let backend = UntypedRole.builder()
2215    ///     .on_receive_request(async |req: MyRequest, responder, _cx| {
2216    ///         responder.respond(MyResponse { status: "ok".into() })
2217    ///     }, agent_client_protocol::on_receive_request!());
2218    ///
2219    /// // Spawn it and get a context to send requests to it
2220    /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
2221    ///
2222    /// // Now you can forward requests to the backend
2223    /// let response = backend_connection.send_request(MyRequest {}).block_task().await?;
2224    /// # Ok(())
2225    /// # }
2226    /// ```
2227    #[track_caller]
2228    pub fn spawn_connection<R: Role>(
2229        &self,
2230        builder: Builder<
2231            R,
2232            impl HandleDispatchFrom<R::Counterpart> + 'static,
2233            impl RunWithConnectionTo<R::Counterpart> + 'static,
2234        >,
2235        transport: impl ConnectTo<R> + 'static,
2236    ) -> Result<ConnectionTo<R::Counterpart>, crate::Error> {
2237        let (connection, future) =
2238            builder.into_connection_and_future(transport, |_| std::future::pending());
2239        Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
2240        Ok(connection)
2241    }
2242
2243    /// Send a request/notification and forward the response appropriately.
2244    ///
2245    /// The request context's response type matches the request's response type,
2246    /// enabling type-safe message forwarding.
2247    pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
2248        &self,
2249        message: Dispatch<Req, Notif>,
2250    ) -> Result<(), crate::Error>
2251    where
2252        Counterpart: HasPeer<Counterpart>,
2253    {
2254        self.send_proxied_message_to(self.counterpart(), message)
2255    }
2256
2257    /// Send a request/notification and forward the response appropriately.
2258    ///
2259    /// The request context's response type matches the request's response type,
2260    /// enabling type-safe message forwarding.
2261    ///
2262    /// When the `unstable_cancel_request` feature is enabled, `$/cancel_request`
2263    /// notifications are *not* forwarded: their `requestId` refers to a request
2264    /// on the connection they arrived over and would be meaningless to `peer`.
2265    /// Cancellation instead propagates hop by hop, because the responders
2266    /// passed to [`forward_response_to`](SentRequest::forward_response_to)
2267    /// observe it and re-issue the cancellation with the forwarded request's
2268    /// own ID.
2269    pub fn send_proxied_message_to<
2270        Peer: Role,
2271        Req: JsonRpcRequest<Response: Send>,
2272        Notif: JsonRpcNotification,
2273    >(
2274        &self,
2275        peer: Peer,
2276        message: Dispatch<Req, Notif>,
2277    ) -> Result<(), crate::Error>
2278    where
2279        Counterpart: HasPeer<Peer>,
2280    {
2281        match message {
2282            Dispatch::Request(request, responder) => self
2283                .send_request_to(peer, request)
2284                .forward_response_to(responder),
2285            Dispatch::Notification(notification) => {
2286                // `$/cancel_request` is connection-scoped: its `requestId` was
2287                // allocated on the connection the notification arrived over
2288                // and means nothing to `peer`. The cancellation has already
2289                // been recorded on this connection's responder markers, and
2290                // `forward_response_to` re-issues it for the forwarded request
2291                // with the correct per-hop ID, so drop the raw notification
2292                // instead of tunneling a meaningless ID across the hop.
2293                #[cfg(feature = "unstable_cancel_request")]
2294                if is_cancel_request_notification(&notification) {
2295                    tracing::debug!(
2296                        "not forwarding hop-scoped `$/cancel_request` notification across proxy hop"
2297                    );
2298                    return Ok(());
2299                }
2300                self.send_notification_to(peer, notification)
2301            }
2302            Dispatch::Response(result, router) => {
2303                // Responses are forwarded directly to their destination
2304                router.respond_with_result(result)
2305            }
2306        }
2307    }
2308
2309    /// Send an outgoing request and return a [`SentRequest`] for handling the reply.
2310    ///
2311    /// The returned [`SentRequest`] provides methods for receiving the response without
2312    /// blocking the event loop:
2313    ///
2314    /// * [`on_receiving_result`](SentRequest::on_receiving_result) - Schedule
2315    ///   a callback to run when the response arrives (doesn't block the event loop)
2316    /// * [`block_task`](SentRequest::block_task) - Block the current task until the response
2317    ///   arrives (only safe in spawned tasks, not in handlers)
2318    ///
2319    /// # Anti-Footgun Design
2320    ///
2321    /// The API intentionally makes it difficult to block on the result directly to prevent
2322    /// the common mistake of blocking the event loop while waiting for a response:
2323    ///
2324    /// ```compile_fail
2325    /// # use agent_client_protocol_test::*;
2326    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2327    /// // ❌ This doesn't compile - prevents blocking the event loop
2328    /// let response = cx.send_request(MyRequest {}).await?;
2329    /// # Ok(())
2330    /// # }
2331    /// ```
2332    ///
2333    /// ```no_run
2334    /// # use agent_client_protocol_test::*;
2335    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2336    /// // ✅ Option 1: Schedule callback (safe in handlers)
2337    /// cx.send_request(MyRequest {})
2338    ///     .on_receiving_result(async |result| {
2339    ///         // Handle the response
2340    ///         Ok(())
2341    ///     })?;
2342    ///
2343    /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
2344    /// cx.spawn({
2345    ///     let cx = cx.clone();
2346    ///     async move {
2347    ///         let response = cx.send_request(MyRequest {})
2348    ///             .block_task()
2349    ///             .await?;
2350    ///         // Process response...
2351    ///         Ok(())
2352    ///     }
2353    /// })?;
2354    /// # Ok(())
2355    /// # }
2356    /// ```
2357    /// Send an outgoing request to the default counterpart peer.
2358    ///
2359    /// This is a convenience method that sends to the counterpart role `R`.
2360    /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
2361    pub fn send_request<Req: JsonRpcRequest>(&self, request: Req) -> SentRequest<Req::Response>
2362    where
2363        Counterpart: HasPeer<Counterpart>,
2364    {
2365        self.send_request_to(self.counterpart.clone(), request)
2366    }
2367
2368    /// Send an outgoing request to a specific peer.
2369    ///
2370    /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
2371    /// implementation before being sent.
2372    pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
2373        &self,
2374        peer: Peer,
2375        request: Req,
2376    ) -> SentRequest<Req::Response>
2377    where
2378        Counterpart: HasPeer<Peer>,
2379    {
2380        let method = request.method().to_string();
2381        let id = RequestId::Str(uuid::Uuid::new_v4().to_string());
2382        let (response_tx, response_rx) = oneshot::channel();
2383        let role_id = peer.role_id();
2384        let remote_style = self.counterpart.remote_style(peer);
2385        #[cfg(feature = "unstable_cancel_request")]
2386        let cancellation =
2387            SentRequestCancellation::new(self.message_tx.clone(), remote_style, id.clone());
2388        match remote_style.transform_outgoing_message(request) {
2389            Ok(untyped) => {
2390                // Transform the message for the target role
2391                let message = OutgoingMessage::Request {
2392                    id: id.clone(),
2393                    method: method.clone(),
2394                    role_id,
2395                    untyped,
2396                    response_tx,
2397                    #[cfg(feature = "unstable_cancel_request")]
2398                    cancellation_disarm: cancellation.disarm_handle(),
2399                };
2400
2401                match self.message_tx.unbounded_send(message) {
2402                    Ok(()) => (),
2403                    Err(error) => {
2404                        #[cfg(feature = "unstable_cancel_request")]
2405                        cancellation.disarm();
2406
2407                        let OutgoingMessage::Request {
2408                            method,
2409                            response_tx,
2410                            ..
2411                        } = error.into_inner()
2412                        else {
2413                            unreachable!();
2414                        };
2415
2416                        response_tx
2417                            .send(ResponsePayload {
2418                                result: Err(crate::util::internal_error(format!(
2419                                    "failed to send outgoing request `{method}"
2420                                ))),
2421                                ack_tx: None,
2422                            })
2423                            .unwrap();
2424                    }
2425                }
2426            }
2427
2428            Err(err) => {
2429                #[cfg(feature = "unstable_cancel_request")]
2430                cancellation.disarm();
2431
2432                response_tx
2433                    .send(ResponsePayload {
2434                        result: Err(crate::util::internal_error(format!(
2435                            "failed to create untyped request for `{method}`: {err}"
2436                        ))),
2437                        ack_tx: None,
2438                    })
2439                    .unwrap();
2440            }
2441        }
2442
2443        SentRequest::new(
2444            id,
2445            method.clone(),
2446            self.task_tx.clone(),
2447            response_rx,
2448            #[cfg(feature = "unstable_cancel_request")]
2449            cancellation,
2450        )
2451        .map(move |json| <Req::Response>::from_value(&method, json))
2452    }
2453
2454    /// Send an outgoing notification to the default counterpart peer (no reply expected).
2455    ///
2456    /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
2457    /// This method sends the notification immediately and returns.
2458    ///
2459    /// This is a convenience method that sends to the counterpart role `R`.
2460    /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
2461    ///
2462    /// ```no_run
2463    /// # use agent_client_protocol_test::*;
2464    /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::Agent>) -> Result<(), agent_client_protocol::Error> {
2465    /// cx.send_notification(StatusUpdate {
2466    ///     message: "Processing...".into(),
2467    /// })?;
2468    /// # Ok(())
2469    /// # }
2470    /// ```
2471    pub fn send_notification<N: JsonRpcNotification>(
2472        &self,
2473        notification: N,
2474    ) -> Result<(), crate::Error>
2475    where
2476        Counterpart: HasPeer<Counterpart>,
2477    {
2478        self.send_notification_to(self.counterpart.clone(), notification)
2479    }
2480
2481    /// Send an outgoing notification to a specific peer (no reply expected).
2482    ///
2483    /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
2484    /// implementation before being sent.
2485    pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
2486        &self,
2487        peer: Peer,
2488        notification: N,
2489    ) -> Result<(), crate::Error>
2490    where
2491        Counterpart: HasPeer<Peer>,
2492    {
2493        let remote_style = self.counterpart.remote_style(peer);
2494        tracing::debug!(
2495            role = std::any::type_name::<Counterpart>(),
2496            peer = std::any::type_name::<Peer>(),
2497            notification_type = std::any::type_name::<N>(),
2498            ?remote_style,
2499            original_method = notification.method(),
2500            "send_notification_to"
2501        );
2502        let transformed = remote_style.transform_outgoing_message(notification)?;
2503        tracing::debug!(
2504            transformed_method = %transformed.method,
2505            "send_notification_to transformed"
2506        );
2507        send_raw_message(
2508            &self.message_tx,
2509            OutgoingMessage::Notification {
2510                untyped: transformed,
2511            },
2512        )
2513    }
2514
2515    /// Send a `$/cancel_request` notification for an arbitrary request ID to
2516    /// the default counterpart peer.
2517    ///
2518    /// Prefer [`SentRequest::cancel`] when you have the request handle: it
2519    /// already knows the correct peer, request ID, and proxy wrapping. Use this
2520    /// low-level method only when implementing custom routing with a request ID
2521    /// that is valid on this connection.
2522    #[cfg(feature = "unstable_cancel_request")]
2523    pub fn send_cancel_request(
2524        &self,
2525        request_id: impl Into<crate::schema::v1::RequestId>,
2526    ) -> Result<(), crate::Error>
2527    where
2528        Counterpart: HasPeer<Counterpart>,
2529    {
2530        self.send_cancel_request_to(self.counterpart.clone(), request_id)
2531    }
2532
2533    /// Send a `$/cancel_request` notification for an arbitrary request ID to a
2534    /// specific peer.
2535    ///
2536    /// Prefer [`SentRequest::cancel`] when you have the request handle: it
2537    /// already knows the correct peer, request ID, and proxy wrapping. Use this
2538    /// low-level method only when implementing custom routing with a request ID
2539    /// that is valid on the target peer's connection.
2540    #[cfg(feature = "unstable_cancel_request")]
2541    pub fn send_cancel_request_to<Peer: Role>(
2542        &self,
2543        peer: Peer,
2544        request_id: impl Into<crate::schema::v1::RequestId>,
2545    ) -> Result<(), crate::Error>
2546    where
2547        Counterpart: HasPeer<Peer>,
2548    {
2549        self.send_notification_to(
2550            peer,
2551            crate::schema::v1::CancelRequestNotification::new(request_id),
2552        )
2553    }
2554
2555    /// Send an error notification (no reply expected).
2556    pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
2557        send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
2558    }
2559
2560    /// Register a dynamic message handler, used to intercept messages specific to a particular session
2561    /// or some similar modal thing.
2562    ///
2563    /// Dynamic message handlers are called first for every incoming message.
2564    ///
2565    /// If they decline to handle the message, then the message is passed to the regular registered handlers.
2566    ///
2567    /// The handler will stay registered until the returned registration guard is dropped.
2568    pub fn add_dynamic_handler(
2569        &self,
2570        handler: impl HandleDispatchFrom<Counterpart> + 'static,
2571    ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error> {
2572        let uuid = Uuid::new_v4();
2573        self.dynamic_handler_tx
2574            .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
2575                uuid,
2576                Box::new(handler),
2577            ))
2578            .map_err(crate::util::internal_error)?;
2579
2580        Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
2581    }
2582
2583    fn remove_dynamic_handler(&self, uuid: Uuid) {
2584        // Ignore errors
2585        drop(
2586            self.dynamic_handler_tx
2587                .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid)),
2588        );
2589    }
2590}
2591
2592#[derive(Clone, Debug)]
2593pub struct DynamicHandlerRegistration<R: Role> {
2594    uuid: Uuid,
2595    cx: ConnectionTo<R>,
2596}
2597
2598impl<R: Role> DynamicHandlerRegistration<R> {
2599    fn new(uuid: Uuid, cx: ConnectionTo<R>) -> Self {
2600        Self { uuid, cx }
2601    }
2602
2603    /// Prevents the dynamic handler from being removed when dropped.
2604    pub fn run_indefinitely(self) {
2605        std::mem::forget(self);
2606    }
2607}
2608
2609impl<R: Role> Drop for DynamicHandlerRegistration<R> {
2610    fn drop(&mut self) {
2611        self.cx.remove_dynamic_handler(self.uuid);
2612    }
2613}
2614
2615/// The context to respond to an incoming request.
2616///
2617/// This context is provided to request handlers and serves a dual role:
2618///
2619/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
2620///    [`respond_with_result`](Self::respond_with_result) to send the response
2621/// 2. **Send other messages** - Use the [`ConnectionTo`] parameter passed to your
2622///    handler, which provides [`send_request`](`ConnectionTo::send_request`),
2623///    [`send_notification`](`ConnectionTo::send_notification`), and
2624///    [`spawn`](`ConnectionTo::spawn`)
2625///
2626/// # Example
2627///
2628/// ```no_run
2629/// # use agent_client_protocol_test::*;
2630/// # async fn example() -> Result<(), agent_client_protocol::Error> {
2631/// # let connection = mock_connection();
2632/// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
2633///     // Send a notification while processing
2634///     cx.send_notification(StatusUpdate {
2635///         message: "processing".into(),
2636///     })?;
2637///
2638///     // Do some work...
2639///     let result = process(&req.data)?;
2640///
2641///     // Respond to the request
2642///     responder.respond(ProcessResponse { result })
2643/// }, agent_client_protocol::on_receive_request!())
2644/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2645/// # Ok(())
2646/// # }
2647/// ```
2648///
2649/// # Event Loop Considerations
2650///
2651/// Like all handlers, request handlers run on the event loop. Use
2652/// [`spawn`](ConnectionTo::spawn) for expensive operations to avoid blocking
2653/// the connection.
2654///
2655/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency)
2656/// section for more details.
2657#[must_use]
2658pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
2659    /// The method of the request.
2660    method: String,
2661
2662    /// The `id` of the message we are replying to.
2663    id: RequestId,
2664
2665    /// Request-local cancellation state.
2666    cancellation: ResponderCancellation,
2667
2668    /// Function to send the response to its destination.
2669    ///
2670    /// For incoming requests: serializes to JSON and sends over the wire.
2671    /// For incoming responses: sends to the waiting oneshot channel.
2672    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
2673}
2674
2675impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
2676    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2677        f.debug_struct("Responder")
2678            .field("method", &self.method)
2679            .field("id", &self.id)
2680            .field("response_type", &std::any::type_name::<T>())
2681            .finish_non_exhaustive()
2682    }
2683}
2684
2685impl Responder<serde_json::Value> {
2686    /// Create a new request context for an incoming request.
2687    ///
2688    /// The response will be serialized to JSON and sent over the wire.
2689    fn new(
2690        message_tx: OutgoingMessageTx,
2691        method: String,
2692        id: RequestId,
2693        cancellation_registry: &RequestCancellationRegistry,
2694    ) -> Self {
2695        let id_clone = id.clone();
2696        let method_clone = method.clone();
2697        let cancellation = cancellation_registry.register(&id);
2698        Self {
2699            method,
2700            id,
2701            cancellation,
2702            send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
2703                send_raw_message(
2704                    &message_tx,
2705                    OutgoingMessage::Response {
2706                        id: id_clone,
2707                        method: method_clone,
2708                        response,
2709                    },
2710                )
2711            }),
2712        }
2713    }
2714
2715    /// Cast this request context to a different response type.
2716    ///
2717    /// The provided type `T` will be serialized to JSON before sending.
2718    pub fn cast<T: JsonRpcResponse>(self) -> Responder<T> {
2719        self.wrap_params(move |method, value| match value {
2720            Ok(value) => T::into_json(value, method),
2721            Err(e) => Err(e),
2722        })
2723    }
2724}
2725
2726impl<T: JsonRpcResponse> Responder<T> {
2727    /// Method of the incoming request
2728    #[must_use]
2729    pub fn method(&self) -> &str {
2730        &self.method
2731    }
2732
2733    /// ID of the incoming request/response as a JSON value
2734    #[must_use]
2735    pub fn id(&self) -> serde_json::Value {
2736        crate::util::id_to_json(&self.id)
2737    }
2738
2739    /// Returns the cancellation marker for this request.
2740    ///
2741    /// The marker is set when the peer sends `$/cancel_request` for this
2742    /// request's JSON-RPC ID. Cancellation is cooperative: handlers should use
2743    /// the marker to stop long-running work and then decide whether to respond
2744    /// with [`Error::request_cancelled`] or partial data.
2745    ///
2746    /// [`Error::request_cancelled`]: crate::Error::request_cancelled
2747    #[cfg(feature = "unstable_cancel_request")]
2748    #[must_use]
2749    pub fn cancellation(&self) -> RequestCancellation {
2750        self.cancellation.cancellation()
2751    }
2752
2753    /// Convert to a `Responder` that expects a JSON value
2754    /// and which checks (dynamically) that the JSON value it receives
2755    /// can be converted to `T`.
2756    pub fn erase_to_json(self) -> Responder<serde_json::Value> {
2757        self.wrap_params(|method, value| T::from_value(method, value?))
2758    }
2759
2760    /// Return a new Responder with a different method name.
2761    pub fn wrap_method(self, method: String) -> Responder<T> {
2762        Responder {
2763            method,
2764            id: self.id,
2765            cancellation: self.cancellation,
2766            send_fn: self.send_fn,
2767        }
2768    }
2769
2770    /// Return a new Responder that expects a response of type U.
2771    ///
2772    /// `wrap_fn` will be invoked with the method name and the result to transform
2773    /// type `U` into type `T` before sending.
2774    pub fn wrap_params<U: JsonRpcResponse>(
2775        self,
2776        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2777    ) -> Responder<U> {
2778        let method = self.method.clone();
2779        Responder {
2780            method: self.method,
2781            id: self.id,
2782            cancellation: self.cancellation,
2783            send_fn: Box::new(move |input: Result<U, crate::Error>| {
2784                let t_value = wrap_fn(&method, input);
2785                (self.send_fn)(t_value)
2786            }),
2787        }
2788    }
2789
2790    /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
2791    pub fn respond_with_result(
2792        self,
2793        response: Result<T, crate::Error>,
2794    ) -> Result<(), crate::Error> {
2795        tracing::debug!(id = ?self.id, "respond called");
2796        (self.send_fn)(response)
2797    }
2798
2799    /// Respond to the JSON-RPC request with a value.
2800    pub fn respond(self, response: T) -> Result<(), crate::Error> {
2801        self.respond_with_result(Ok(response))
2802    }
2803
2804    /// Respond to the JSON-RPC request with an internal error containing a message.
2805    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2806        self.respond_with_error(crate::util::internal_error(message))
2807    }
2808
2809    /// Respond to the JSON-RPC request with an error.
2810    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2811        tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2812        self.respond_with_result(Err(error))
2813    }
2814}
2815
2816/// Context for handling an incoming JSON-RPC response.
2817///
2818/// This is the response-side counterpart to [`Responder`]. While `Responder` handles
2819/// incoming requests (where you send a response over the wire), `ResponseRouter` handles
2820/// incoming responses (where you route the response to a local task waiting for it).
2821///
2822/// Both are fundamentally "sinks" that push the message through a `send_fn`, but they
2823/// represent different points in the message lifecycle and carry different metadata.
2824///
2825/// # Drop Behavior
2826///
2827/// Dropping a `ResponseRouter` without responding (for example, from a
2828/// dispatch handler that claims a [`Dispatch::Response`]) discards the
2829/// response: the local awaiter observes the response as never received. The
2830/// request still counts as settled — when the `unstable_cancel_request`
2831/// feature is enabled, routing a response this far disarms the originating
2832/// [`SentRequest`]'s drop-time auto-cancellation even if the router is never
2833/// invoked, since the peer has already answered.
2834#[must_use]
2835pub struct ResponseRouter<T: JsonRpcResponse = serde_json::Value> {
2836    /// The method of the original request.
2837    method: String,
2838
2839    /// The `id` of the original request.
2840    id: RequestId,
2841
2842    /// The RoleId to which the original request was sent
2843    /// (and hence from which the reply is expected).
2844    role_id: RoleId,
2845
2846    /// Function to send the response to the waiting task.
2847    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
2848}
2849
2850impl<T: JsonRpcResponse> std::fmt::Debug for ResponseRouter<T> {
2851    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2852        f.debug_struct("ResponseRouter")
2853            .field("method", &self.method)
2854            .field("id", &self.id)
2855            .field("response_type", &std::any::type_name::<T>())
2856            .finish_non_exhaustive()
2857    }
2858}
2859
2860impl ResponseRouter<serde_json::Value> {
2861    /// Create a new response context for routing a response to a local awaiter.
2862    ///
2863    /// When `respond_with_result` is called, the response is sent through the oneshot
2864    /// channel to the code that originally sent the request. If that receiver was
2865    /// dropped, the response is discarded because there is no local awaiter left.
2866    pub(crate) fn new(
2867        method: String,
2868        id: RequestId,
2869        role_id: RoleId,
2870        sender: oneshot::Sender<ResponsePayload>,
2871        #[cfg(feature = "unstable_cancel_request")]
2872        cancellation_disarm: SentRequestCancellationDisarm,
2873    ) -> Self {
2874        let response_method = method.clone();
2875        let response_id = id.clone();
2876        // A response for the request reached this router, so the request is
2877        // settled from the peer's perspective and a `$/cancel_request` could
2878        // only ever be redundant. Disarm immediately so handlers may retain
2879        // the router without leaving auto-cancellation armed.
2880        #[cfg(feature = "unstable_cancel_request")]
2881        cancellation_disarm.disarm();
2882        Self {
2883            method,
2884            id,
2885            role_id,
2886            send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
2887                if sender
2888                    .send(ResponsePayload {
2889                        result: response,
2890                        ack_tx: None,
2891                    })
2892                    .is_err()
2893                {
2894                    tracing::debug!(
2895                        method = %response_method,
2896                        id = ?response_id,
2897                        "dropped response because local receiver was gone"
2898                    );
2899                }
2900                Ok(())
2901            }),
2902        }
2903    }
2904
2905    /// Cast this response context to a different response type.
2906    ///
2907    /// The provided type `T` will be serialized to JSON before sending.
2908    pub fn cast<T: JsonRpcResponse>(self) -> ResponseRouter<T> {
2909        self.wrap_params(move |method, value| match value {
2910            Ok(value) => T::into_json(value, method),
2911            Err(e) => Err(e),
2912        })
2913    }
2914}
2915
2916impl<T: JsonRpcResponse> ResponseRouter<T> {
2917    /// Method of the original request
2918    #[must_use]
2919    pub fn method(&self) -> &str {
2920        &self.method
2921    }
2922
2923    /// ID of the original request as a JSON value
2924    #[must_use]
2925    pub fn id(&self) -> serde_json::Value {
2926        crate::util::id_to_json(&self.id)
2927    }
2928
2929    /// The peer to which the original request was sent.
2930    ///
2931    /// This is the peer from which we expect to receive the response.
2932    #[must_use]
2933    pub fn role_id(&self) -> RoleId {
2934        self.role_id.clone()
2935    }
2936
2937    /// Convert to a `ResponseRouter` that expects a JSON value
2938    /// and which checks (dynamically) that the JSON value it receives
2939    /// can be converted to `T`.
2940    pub fn erase_to_json(self) -> ResponseRouter<serde_json::Value> {
2941        self.wrap_params(|method, value| T::from_value(method, value?))
2942    }
2943
2944    /// Return a new ResponseRouter that expects a response of type U.
2945    ///
2946    /// `wrap_fn` will be invoked with the method name and the result to transform
2947    /// type `U` into type `T` before sending.
2948    fn wrap_params<U: JsonRpcResponse>(
2949        self,
2950        wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2951    ) -> ResponseRouter<U> {
2952        let method = self.method.clone();
2953        ResponseRouter {
2954            method: self.method,
2955            id: self.id,
2956            role_id: self.role_id,
2957            send_fn: Box::new(move |input: Result<U, crate::Error>| {
2958                let t_value = wrap_fn(&method, input);
2959                (self.send_fn)(t_value)
2960            }),
2961        }
2962    }
2963
2964    /// Complete the response by sending the result to the waiting task.
2965    pub fn respond_with_result(
2966        self,
2967        response: Result<T, crate::Error>,
2968    ) -> Result<(), crate::Error> {
2969        tracing::debug!(id = ?self.id, "response routed to awaiter");
2970        (self.send_fn)(response)
2971    }
2972
2973    /// Complete the response by sending a value to the waiting task.
2974    pub fn respond(self, response: T) -> Result<(), crate::Error> {
2975        self.respond_with_result(Ok(response))
2976    }
2977
2978    /// Complete the response by sending an internal error to the waiting task.
2979    pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2980        self.respond_with_error(crate::util::internal_error(message))
2981    }
2982
2983    /// Complete the response by sending an error to the waiting task.
2984    pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2985        tracing::debug!(id = ?self.id, ?error, "error routed to awaiter");
2986        self.respond_with_result(Err(error))
2987    }
2988}
2989
2990/// Common bounds for any JSON-RPC message.
2991///
2992/// # Derive Macro
2993///
2994/// For simple message types, you can use the `JsonRpcRequest` or `JsonRpcNotification` derive macros
2995/// which will implement both `JsonRpcMessage` and the respective trait. See [`JsonRpcRequest`] and
2996/// [`JsonRpcNotification`] for examples.
2997pub trait JsonRpcMessage: 'static + Debug + Sized + Send + Clone {
2998    /// Check if this message type matches the given method name.
2999    fn matches_method(method: &str) -> bool;
3000
3001    /// The method name for the message.
3002    fn method(&self) -> &str;
3003
3004    /// Convert this message into an untyped message.
3005    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;
3006
3007    /// Parse this type from a method name and parameters.
3008    ///
3009    /// Returns an error if the method doesn't match or deserialization fails.
3010    /// Callers should use `matches_method` first to check if this type handles the method.
3011    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error>;
3012}
3013
3014/// Defines the "payload" of a successful response to a JSON-RPC request.
3015///
3016/// # Derive Macro
3017///
3018/// Use `#[derive(JsonRpcResponse)]` to automatically implement this trait:
3019///
3020/// ```ignore
3021/// use agent_client_protocol::JsonRpcResponse;
3022/// use serde::{Serialize, Deserialize};
3023///
3024/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
3025/// #[response(method = "_hello")]
3026/// struct HelloResponse {
3027///     greeting: String,
3028/// }
3029/// ```
3030pub trait JsonRpcResponse: 'static + Debug + Sized + Send + Clone {
3031    /// Convert this message into a JSON value.
3032    fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;
3033
3034    /// Parse a JSON value into the response type.
3035    fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
3036}
3037
3038impl JsonRpcResponse for serde_json::Value {
3039    fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
3040        Ok(value)
3041    }
3042
3043    fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
3044        Ok(self)
3045    }
3046}
3047
3048/// A struct that represents a notification (JSON-RPC message that does not expect a response).
3049///
3050/// # Derive Macro
3051///
3052/// Use `#[derive(JsonRpcNotification)]` to automatically implement both `JsonRpcMessage` and `JsonRpcNotification`:
3053///
3054/// ```ignore
3055/// use agent_client_protocol::JsonRpcNotification;
3056/// use serde::{Serialize, Deserialize};
3057///
3058/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcNotification)]
3059/// #[notification(method = "_ping")]
3060/// struct PingNotification {
3061///     timestamp: u64,
3062/// }
3063/// ```
3064pub trait JsonRpcNotification: JsonRpcMessage {}
3065
3066/// A struct that represents a request (JSON-RPC message expecting a response).
3067///
3068/// # Derive Macro
3069///
3070/// Use `#[derive(JsonRpcRequest)]` to automatically implement both `JsonRpcMessage` and `JsonRpcRequest`:
3071///
3072/// ```ignore
3073/// use agent_client_protocol::{JsonRpcRequest, JsonRpcResponse};
3074/// use serde::{Serialize, Deserialize};
3075///
3076/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcRequest)]
3077/// #[request(method = "_hello", response = HelloResponse)]
3078/// struct HelloRequest {
3079///     name: String,
3080/// }
3081///
3082/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
3083/// struct HelloResponse {
3084///     greeting: String,
3085/// }
3086/// ```
3087pub trait JsonRpcRequest: JsonRpcMessage {
3088    /// The type of data expected in response.
3089    type Response: JsonRpcResponse;
3090}
3091
3092/// An enum capturing an in-flight request or notification.
3093/// In the case of a request, also includes the context used to respond to the request.
3094///
3095/// Type parameters allow specifying the concrete request and notification types.
3096/// By default, both are `UntypedMessage` for dynamic dispatch.
3097/// The request context's response type matches the request's response type.
3098#[derive(Debug)]
3099pub enum Dispatch<Req: JsonRpcRequest = UntypedMessage, Notif: JsonRpcMessage = UntypedMessage> {
3100    /// Incoming request and the context where the response should be sent.
3101    Request(Req, Responder<Req::Response>),
3102
3103    /// Incoming notification.
3104    Notification(Notif),
3105
3106    /// Incoming response to a request we sent.
3107    ///
3108    /// The first field is the response result (success or error from the remote).
3109    /// The second field is the context for forwarding the response to its destination
3110    /// (typically a waiting oneshot channel).
3111    Response(
3112        Result<Req::Response, crate::Error>,
3113        ResponseRouter<Req::Response>,
3114    ),
3115}
3116
3117impl<Req: JsonRpcRequest, Notif: JsonRpcMessage> Dispatch<Req, Notif> {
3118    /// Map the request and notification types to new types.
3119    ///
3120    /// Note: Response variants are passed through unchanged since they don't
3121    /// contain a parseable message payload.
3122    pub fn map<Req1, Notif1>(
3123        self,
3124        map_request: impl FnOnce(Req, Responder<Req::Response>) -> (Req1, Responder<Req1::Response>),
3125        map_notification: impl FnOnce(Notif) -> Notif1,
3126    ) -> Dispatch<Req1, Notif1>
3127    where
3128        Req1: JsonRpcRequest<Response = Req::Response>,
3129        Notif1: JsonRpcMessage,
3130    {
3131        match self {
3132            Dispatch::Request(request, responder) => {
3133                let (new_request, new_responder) = map_request(request, responder);
3134                Dispatch::Request(new_request, new_responder)
3135            }
3136            Dispatch::Notification(notification) => {
3137                let new_notification = map_notification(notification);
3138                Dispatch::Notification(new_notification)
3139            }
3140            Dispatch::Response(result, router) => Dispatch::Response(result, router),
3141        }
3142    }
3143
3144    /// Respond to the message with an error.
3145    ///
3146    /// If this message is a request, this error becomes the reply to the request.
3147    ///
3148    /// If this message is a notification, the error is sent as a notification.
3149    ///
3150    /// If this message is a response, the error is forwarded to the waiting handler.
3151    pub fn respond_with_error<R: Role>(
3152        self,
3153        error: crate::Error,
3154        cx: ConnectionTo<R>,
3155    ) -> Result<(), crate::Error> {
3156        match self {
3157            Dispatch::Request(_, responder) => responder.respond_with_error(error),
3158            Dispatch::Notification(_) => cx.send_error_notification(error),
3159            Dispatch::Response(_, responder) => responder.respond_with_error(error),
3160        }
3161    }
3162
3163    /// Convert to a `Responder` that expects a JSON value
3164    /// and which checks (dynamically) that the JSON value it receives
3165    /// can be converted to `T`.
3166    ///
3167    /// Note: Response variants cannot be erased since their payload is already
3168    /// parsed. This returns an error for Response variants.
3169    pub fn erase_to_json(self) -> Result<Dispatch, crate::Error> {
3170        match self {
3171            Dispatch::Request(response, responder) => Ok(Dispatch::Request(
3172                response.to_untyped_message()?,
3173                responder.erase_to_json(),
3174            )),
3175            Dispatch::Notification(notification) => {
3176                Ok(Dispatch::Notification(notification.to_untyped_message()?))
3177            }
3178            Dispatch::Response(_, _) => Err(crate::util::internal_error(
3179                "cannot erase Response variant to JSON",
3180            )),
3181        }
3182    }
3183
3184    /// Convert the message in self to an untyped message.
3185    ///
3186    /// Note: Response variants don't have an untyped message representation.
3187    /// This returns an error for Response variants.
3188    pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
3189        match self {
3190            Dispatch::Request(request, _) => request.to_untyped_message(),
3191            Dispatch::Notification(notification) => notification.to_untyped_message(),
3192            Dispatch::Response(_, _) => Err(crate::util::internal_error(
3193                "Response variant has no untyped message representation",
3194            )),
3195        }
3196    }
3197
3198    /// Convert self to an untyped message context.
3199    ///
3200    /// Note: Response variants cannot be converted. This returns an error for Response variants.
3201    pub fn into_untyped_dispatch(self) -> Result<Dispatch, crate::Error> {
3202        match self {
3203            Dispatch::Request(request, responder) => Ok(Dispatch::Request(
3204                request.to_untyped_message()?,
3205                responder.erase_to_json(),
3206            )),
3207            Dispatch::Notification(notification) => {
3208                Ok(Dispatch::Notification(notification.to_untyped_message()?))
3209            }
3210            Dispatch::Response(_, _) => Err(crate::util::internal_error(
3211                "cannot convert Response variant to untyped message context",
3212            )),
3213        }
3214    }
3215
3216    /// Returns the request ID if this is a request or response, None if notification.
3217    pub fn id(&self) -> Option<serde_json::Value> {
3218        match self {
3219            Dispatch::Request(_, cx) => Some(cx.id()),
3220            Dispatch::Notification(_) => None,
3221            Dispatch::Response(_, cx) => Some(cx.id()),
3222        }
3223    }
3224
3225    /// Returns the method of the message.
3226    ///
3227    /// For requests and notifications, this is the method from the message payload.
3228    /// For responses, this is the method of the original request.
3229    pub fn method(&self) -> &str {
3230        match self {
3231            Dispatch::Request(msg, _) => msg.method(),
3232            Dispatch::Notification(msg) => msg.method(),
3233            Dispatch::Response(_, cx) => cx.method(),
3234        }
3235    }
3236}
3237
3238impl Dispatch {
3239    /// Attempts to parse `self` into a typed message context.
3240    ///
3241    /// # Returns
3242    ///
3243    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
3244    /// * `Ok(Err(self))` if not
3245    /// * `Err` if has the correct method for the given types but parsing fails
3246    #[tracing::instrument(skip(self), fields(Request = ?std::any::type_name::<Req>(), Notif = ?std::any::type_name::<Notif>()), level = "trace", ret)]
3247    pub(crate) fn into_typed_dispatch<Req: JsonRpcRequest, Notif: JsonRpcNotification>(
3248        self,
3249    ) -> Result<Result<Dispatch<Req, Notif>, Dispatch>, crate::Error> {
3250        tracing::debug!(
3251            message = ?self,
3252            "into_typed_dispatch"
3253        );
3254        match self {
3255            Dispatch::Request(message, responder) => {
3256                if Req::matches_method(&message.method) {
3257                    match Req::parse_message(&message.method, &message.params) {
3258                        Ok(req) => {
3259                            tracing::trace!(?req, "parsed ok");
3260                            Ok(Ok(Dispatch::Request(req, responder.cast())))
3261                        }
3262                        Err(err) => {
3263                            tracing::trace!(?err, "parse error");
3264                            Err(err)
3265                        }
3266                    }
3267                } else {
3268                    tracing::trace!("method doesn't match");
3269                    Ok(Err(Dispatch::Request(message, responder)))
3270                }
3271            }
3272
3273            Dispatch::Notification(message) => {
3274                if Notif::matches_method(&message.method) {
3275                    match Notif::parse_message(&message.method, &message.params) {
3276                        Ok(notif) => {
3277                            tracing::trace!(?notif, "parse ok");
3278                            Ok(Ok(Dispatch::Notification(notif)))
3279                        }
3280                        Err(err) => {
3281                            tracing::trace!(?err, "parse error");
3282                            Err(err)
3283                        }
3284                    }
3285                } else {
3286                    tracing::trace!("method doesn't match");
3287                    Ok(Err(Dispatch::Notification(message)))
3288                }
3289            }
3290
3291            Dispatch::Response(result, cx) => {
3292                let method = cx.method();
3293                if Req::matches_method(method) {
3294                    // Parse the response result
3295                    let typed_result = match result {
3296                        Ok(value) => {
3297                            match <Req::Response as JsonRpcResponse>::from_value(method, value) {
3298                                Ok(parsed) => {
3299                                    tracing::trace!(?parsed, "parse ok");
3300                                    Ok(parsed)
3301                                }
3302                                Err(err) => {
3303                                    tracing::trace!(?err, "parse error");
3304                                    return Err(err);
3305                                }
3306                            }
3307                        }
3308                        Err(err) => {
3309                            tracing::trace!("error, passthrough");
3310                            Err(err)
3311                        }
3312                    };
3313                    Ok(Ok(Dispatch::Response(typed_result, cx.cast())))
3314                } else {
3315                    tracing::trace!("method doesn't match");
3316                    Ok(Err(Dispatch::Response(result, cx)))
3317                }
3318            }
3319        }
3320    }
3321
3322    /// True if this message has a field with the given name.
3323    ///
3324    /// Returns `false` for Response variants.
3325    #[must_use]
3326    pub fn has_field(&self, field_name: &str) -> bool {
3327        self.message()
3328            .and_then(|m| m.params().get(field_name))
3329            .is_some()
3330    }
3331
3332    /// Returns true if this message has a session-id field.
3333    ///
3334    /// Returns `false` for Response variants.
3335    pub(crate) fn has_session_id(&self) -> bool {
3336        self.has_field("sessionId")
3337    }
3338
3339    /// Extract the ACP session-id from this message (if any).
3340    ///
3341    /// Returns `Ok(None)` for Response variants.
3342    pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
3343        let Some(message) = self.message() else {
3344            return Ok(None);
3345        };
3346        let Some(value) = message.params().get("sessionId") else {
3347            return Ok(None);
3348        };
3349        let session_id = serde_json::from_value(value.clone())?;
3350        Ok(Some(session_id))
3351    }
3352
3353    /// Try to parse this as a notification of the given type.
3354    ///
3355    /// # Returns
3356    ///
3357    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
3358    /// * `Ok(Err(self))` if not
3359    /// * `Err` if has the correct method for the given types but parsing fails
3360    pub fn into_notification<N: JsonRpcNotification>(
3361        self,
3362    ) -> Result<Result<N, Dispatch>, crate::Error> {
3363        match self {
3364            Dispatch::Notification(msg) => {
3365                if !N::matches_method(&msg.method) {
3366                    return Ok(Err(Dispatch::Notification(msg)));
3367                }
3368                match N::parse_message(&msg.method, &msg.params) {
3369                    Ok(n) => Ok(Ok(n)),
3370                    Err(err) => Err(err),
3371                }
3372            }
3373            Dispatch::Request(..) | Dispatch::Response(..) => Ok(Err(self)),
3374        }
3375    }
3376
3377    /// Try to parse this as a request of the given type.
3378    ///
3379    /// # Returns
3380    ///
3381    /// * `Ok(Ok(typed))` if this is a request/notification of the given types
3382    /// * `Ok(Err(self))` if not
3383    /// * `Err` if has the correct method for the given types but parsing fails
3384    pub fn into_request<Req: JsonRpcRequest>(
3385        self,
3386    ) -> Result<Result<(Req, Responder<Req::Response>), Dispatch>, crate::Error> {
3387        match self {
3388            Dispatch::Request(msg, responder) => {
3389                if !Req::matches_method(&msg.method) {
3390                    return Ok(Err(Dispatch::Request(msg, responder)));
3391                }
3392                match Req::parse_message(&msg.method, &msg.params) {
3393                    Ok(req) => Ok(Ok((req, responder.cast()))),
3394                    Err(err) => Err(err),
3395                }
3396            }
3397            Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Err(self)),
3398        }
3399    }
3400}
3401
3402impl<M: JsonRpcRequest + JsonRpcNotification> Dispatch<M, M> {
3403    /// Returns the message payload for requests and notifications.
3404    ///
3405    /// Returns `None` for Response variants since they don't contain a message payload.
3406    pub fn message(&self) -> Option<&M> {
3407        match self {
3408            Dispatch::Request(msg, _) | Dispatch::Notification(msg) => Some(msg),
3409            Dispatch::Response(_, _) => None,
3410        }
3411    }
3412
3413    /// Map the request/notification message.
3414    ///
3415    /// Response variants pass through unchanged.
3416    pub(crate) fn try_map_message(
3417        self,
3418        map_message: impl FnOnce(M) -> Result<M, crate::Error>,
3419    ) -> Result<Dispatch<M, M>, crate::Error> {
3420        match self {
3421            Dispatch::Request(request, cx) => Ok(Dispatch::Request(map_message(request)?, cx)),
3422            Dispatch::Notification(notification) => {
3423                Ok(Dispatch::<M, M>::Notification(map_message(notification)?))
3424            }
3425            Dispatch::Response(result, cx) => Ok(Dispatch::Response(result, cx)),
3426        }
3427    }
3428}
3429
3430/// An incoming JSON message without any typing. Can be a request or a notification.
3431#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
3432pub struct UntypedMessage {
3433    /// The JSON-RPC method name
3434    pub method: String,
3435    /// The JSON-RPC parameters as a raw JSON value
3436    pub params: serde_json::Value,
3437}
3438
3439impl UntypedMessage {
3440    /// Returns an untyped message with the given method and parameters.
3441    pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
3442        let params = serde_json::to_value(params)?;
3443        Ok(Self {
3444            method: method.to_string(),
3445            params,
3446        })
3447    }
3448
3449    /// Returns the method name
3450    #[must_use]
3451    pub fn method(&self) -> &str {
3452        &self.method
3453    }
3454
3455    /// Returns the parameters as a JSON value
3456    #[must_use]
3457    pub fn params(&self) -> &serde_json::Value {
3458        &self.params
3459    }
3460
3461    /// Consumes this message and returns the method and params
3462    #[must_use]
3463    pub fn into_parts(self) -> (String, serde_json::Value) {
3464        (self.method, self.params)
3465    }
3466
3467    /// Convert `self` to a raw JSON-RPC message.
3468    pub(crate) fn into_raw_jsonrpc_message(
3469        self,
3470        id: Option<RequestId>,
3471    ) -> Result<RawJsonRpcMessage, crate::Error> {
3472        let Self { method, params } = self;
3473        match id {
3474            Some(id) => RawJsonRpcMessage::request(method, params, id),
3475            None => RawJsonRpcMessage::notification(method, params),
3476        }
3477    }
3478}
3479
3480impl JsonRpcMessage for UntypedMessage {
3481    fn matches_method(_method: &str) -> bool {
3482        // UntypedMessage matches any method - it's the untyped fallback
3483        true
3484    }
3485
3486    fn method(&self) -> &str {
3487        &self.method
3488    }
3489
3490    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
3491        Ok(self.clone())
3492    }
3493
3494    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error> {
3495        UntypedMessage::new(method, params)
3496    }
3497}
3498
3499impl JsonRpcRequest for UntypedMessage {
3500    type Response = serde_json::Value;
3501}
3502
3503impl JsonRpcNotification for UntypedMessage {}
3504
3505/// Represents a pending response of type `R` from an outgoing request.
3506///
3507/// Returned by [`ConnectionTo::send_request`], this type provides methods for handling
3508/// the response without blocking the event loop. The API is intentionally designed to make
3509/// it difficult to accidentally block.
3510///
3511/// # Anti-Footgun Design
3512///
3513/// You cannot directly `.await` a `SentRequest`. Instead, you must choose how to handle
3514/// the response:
3515///
3516/// ## Option 1: Schedule a Callback (Safe in Handlers)
3517///
3518/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
3519/// that runs when the response arrives. This doesn't block the event loop:
3520///
3521/// ```no_run
3522/// # use agent_client_protocol_test::*;
3523/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
3524/// cx.send_request(MyRequest {})
3525///     .on_receiving_result(async |result| {
3526///         match result {
3527///             Ok(response) => {
3528///                 // Handle successful response
3529///                 Ok(())
3530///             }
3531///             Err(error) => {
3532///                 // Handle error
3533///                 Err(error)
3534///             }
3535///         }
3536///     })?;
3537/// # Ok(())
3538/// # }
3539/// ```
3540///
3541/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
3542///
3543/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
3544/// in a spawned task (never in a handler):
3545///
3546/// ```no_run
3547/// # use agent_client_protocol_test::*;
3548/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
3549/// // ✅ Safe: Spawned task runs concurrently
3550/// cx.spawn({
3551///     let cx = cx.clone();
3552///     async move {
3553///         let response = cx.send_request(MyRequest {})
3554///             .block_task()
3555///             .await?;
3556///         // Process response...
3557///         Ok(())
3558///     }
3559/// })?;
3560/// # Ok(())
3561/// # }
3562/// ```
3563///
3564/// ```no_run
3565/// # use agent_client_protocol_test::*;
3566/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3567/// # let connection = mock_connection();
3568/// // ❌ NEVER do this in a handler - blocks the event loop!
3569/// connection.on_receive_request(async |req: MyRequest, responder, cx| {
3570///     let response = cx.send_request(MyRequest {})
3571///         .block_task()  // This will deadlock!
3572///         .await?;
3573///     responder.respond(response)
3574/// }, agent_client_protocol::on_receive_request!())
3575/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3576/// # Ok(())
3577/// # }
3578/// ```
3579///
3580/// # Why This Design?
3581///
3582/// If you block the event loop while waiting for a response, the connection cannot process
3583/// the incoming response message, creating a deadlock. This API design prevents that footgun
3584/// by making blocking explicit and encouraging non-blocking patterns.
3585///
3586/// # Drop Behavior
3587///
3588/// By default, dropping a `SentRequest` without consuming it discards the
3589/// response when it arrives. When the `unstable_cancel_request` feature is
3590/// enabled, dropping a `SentRequest` before the SDK has received the response
3591/// additionally sends a `$/cancel_request` notification asking the peer to
3592/// cancel the request; requests whose eventual response should be ignored, but
3593/// which should keep running on the peer, should use [`detach`](Self::detach)
3594/// instead.
3595#[must_use = "dropping a SentRequest discards the response (and, with the \
3596              `unstable_cancel_request` feature, asks the peer to cancel the \
3597              request); consume it with `block_task`, `on_receiving_result`, \
3598              `forward_response_to`, or `detach`"]
3599pub struct SentRequest<T> {
3600    id: RequestId,
3601    method: String,
3602    task_tx: TaskTx,
3603    response_rx: oneshot::Receiver<ResponsePayload>,
3604    to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
3605    #[cfg(feature = "unstable_cancel_request")]
3606    cancellation: SentRequestCancellation,
3607    /// Cancellation markers of other (incoming) requests whose cancellation
3608    /// should be forwarded to this request. See
3609    /// [`forward_cancellation_from`](Self::forward_cancellation_from).
3610    #[cfg(feature = "unstable_cancel_request")]
3611    cancellation_sources: Vec<RequestCancellation>,
3612}
3613
3614#[cfg(feature = "unstable_cancel_request")]
3615#[derive(Clone, Debug)]
3616pub(crate) struct SentRequestCancellationDisarm {
3617    armed: Arc<AtomicBool>,
3618}
3619
3620#[cfg(feature = "unstable_cancel_request")]
3621impl SentRequestCancellationDisarm {
3622    fn new() -> Self {
3623        Self {
3624            armed: Arc::new(AtomicBool::new(true)),
3625        }
3626    }
3627
3628    fn disarm(&self) {
3629        self.armed.store(false, Ordering::Release);
3630    }
3631}
3632
3633#[cfg(feature = "unstable_cancel_request")]
3634struct SentRequestCancellation {
3635    message_tx: OutgoingMessageTx,
3636    remote_style: crate::role::RemoteStyle,
3637    request_id: RequestId,
3638    disarm: SentRequestCancellationDisarm,
3639}
3640
3641#[cfg(feature = "unstable_cancel_request")]
3642impl SentRequestCancellation {
3643    fn new(
3644        message_tx: OutgoingMessageTx,
3645        remote_style: crate::role::RemoteStyle,
3646        request_id: RequestId,
3647    ) -> Self {
3648        Self {
3649            message_tx,
3650            remote_style,
3651            request_id,
3652            disarm: SentRequestCancellationDisarm::new(),
3653        }
3654    }
3655
3656    fn disarm(&self) {
3657        self.disarm.disarm();
3658    }
3659
3660    fn disarm_handle(&self) -> SentRequestCancellationDisarm {
3661        self.disarm.clone()
3662    }
3663
3664    fn send(&self) -> Result<(), crate::Error> {
3665        if !self.disarm.armed.swap(false, Ordering::AcqRel) {
3666            return Ok(());
3667        }
3668
3669        // Build the notification lazily: most requests are never cancelled,
3670        // so this avoids serializing a notification per outgoing request.
3671        let untyped = self.remote_style.transform_outgoing_message(
3672            crate::schema::v1::CancelRequestNotification::new(self.request_id.clone()),
3673        )?;
3674
3675        send_raw_message(&self.message_tx, OutgoingMessage::Notification { untyped })
3676    }
3677}
3678
3679#[cfg(feature = "unstable_cancel_request")]
3680impl Drop for SentRequestCancellation {
3681    fn drop(&mut self) {
3682        if let Err(error) = self.send() {
3683            tracing::debug!(?error, "failed to auto-cancel dropped request");
3684        }
3685    }
3686}
3687
3688#[cfg(feature = "unstable_cancel_request")]
3689impl Debug for SentRequestCancellation {
3690    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3691        f.debug_struct("SentRequestCancellation")
3692            .field("request_id", &self.request_id)
3693            .field("remote_style", &self.remote_style)
3694            .field("armed", &self.disarm.armed.load(Ordering::Acquire))
3695            .finish_non_exhaustive()
3696    }
3697}
3698
3699/// Await the response payload for an outgoing request, watching `sources` for
3700/// cancellation of the upstream requests it was registered with.
3701///
3702/// When any source reports cancellation, a `$/cancel_request` is forwarded to
3703/// the outgoing request (at most once, shared with [`SentRequest::cancel`] and
3704/// drop-time auto-cancellation), and the response is *still* awaited: the peer
3705/// always answers, with normal data or a cancellation error.
3706///
3707/// Watching is deliberately bounded by response arrival so that completed
3708/// requests do not leak waiters on markers that will never fire.
3709#[cfg(feature = "unstable_cancel_request")]
3710async fn await_response_forwarding_cancellation(
3711    response_rx: oneshot::Receiver<ResponsePayload>,
3712    cancellation: &SentRequestCancellation,
3713    sources: &[RequestCancellation],
3714) -> Result<ResponsePayload, oneshot::Canceled> {
3715    // Failing to forward the cancellation must not abort the wait: the
3716    // response (normal data or a cancellation error) may still arrive and
3717    // must still be processed.
3718    let forward_cancellation = || {
3719        if let Err(error) = cancellation.send() {
3720            tracing::debug!(
3721                ?error,
3722                "failed to forward cancellation to downstream request"
3723            );
3724        }
3725    };
3726
3727    let response = if sources.is_empty() {
3728        response_rx.await
3729    } else if sources.iter().any(RequestCancellation::is_cancelled) {
3730        forward_cancellation();
3731        response_rx.await
3732    } else {
3733        let cancelled = sources.iter().map(|source| source.state.signal_rx.clone());
3734        match future::select(future::select_all(cancelled), response_rx).await {
3735            Either::Left((_, response_rx)) => {
3736                forward_cancellation();
3737                response_rx.await
3738            }
3739            Either::Right((response, _)) => response,
3740        }
3741    };
3742
3743    cancellation.disarm();
3744    response
3745}
3746
3747impl<T: Debug> Debug for SentRequest<T> {
3748    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3749        let mut debug = f.debug_struct("SentRequest");
3750        debug
3751            .field("id", &self.id)
3752            .field("method", &self.method)
3753            .field("task_tx", &self.task_tx)
3754            .field("response_rx", &self.response_rx);
3755        #[cfg(feature = "unstable_cancel_request")]
3756        debug
3757            .field("cancellation", &self.cancellation)
3758            .field("cancellation_sources", &self.cancellation_sources);
3759        debug.finish_non_exhaustive()
3760    }
3761}
3762
3763impl SentRequest<serde_json::Value> {
3764    fn new(
3765        id: RequestId,
3766        method: String,
3767        task_tx: mpsc::UnboundedSender<Task>,
3768        response_rx: oneshot::Receiver<ResponsePayload>,
3769        #[cfg(feature = "unstable_cancel_request")] cancellation: SentRequestCancellation,
3770    ) -> Self {
3771        Self {
3772            id,
3773            method,
3774            response_rx,
3775            task_tx,
3776            to_result: Box::new(Ok),
3777            #[cfg(feature = "unstable_cancel_request")]
3778            cancellation,
3779            #[cfg(feature = "unstable_cancel_request")]
3780            cancellation_sources: Vec::new(),
3781        }
3782    }
3783}
3784
3785impl<T> SentRequest<T> {
3786    /// Detach this request handle without waiting for its response.
3787    ///
3788    /// The response will be discarded when it arrives. When the
3789    /// `unstable_cancel_request` feature is enabled, this also disarms the
3790    /// drop-time automatic cancellation described in
3791    /// [Drop Behavior](Self#drop-behavior), so use it for requests whose
3792    /// eventual response should be ignored, but which should keep running on the
3793    /// peer. The peer is still expected to answer the JSON-RPC request
3794    /// eventually; use a notification instead when no response is expected at
3795    /// all.
3796    ///
3797    /// To ask the peer to stop the request, enable `unstable_cancel_request`
3798    /// and call `cancel` instead, or drop the handle while automatic
3799    /// cancellation is enabled.
3800    pub fn detach(self) {
3801        #[cfg(feature = "unstable_cancel_request")]
3802        self.cancellation.disarm();
3803    }
3804
3805    /// Send a `$/cancel_request` notification for this outgoing request.
3806    ///
3807    /// This uses the same peer and message wrapping that were used to send the
3808    /// original request, so it is the preferred way to cancel a [`SentRequest`]
3809    /// when the request handle is still available.
3810    ///
3811    /// At most one `$/cancel_request` is ever sent per request: the first
3812    /// `cancel` call sends it (and also prevents the drop-time automatic
3813    /// cancellation described in [Drop Behavior](Self#drop-behavior)), while
3814    /// later calls return `Ok(())` without sending anything. Likewise, once
3815    /// the SDK has routed the response to this handle, `cancel` becomes a
3816    /// no-op: there is nothing left to cancel.
3817    ///
3818    /// Errors are only reported by the call that attempts to send the
3819    /// notification.
3820    #[cfg(feature = "unstable_cancel_request")]
3821    pub fn cancel(&self) -> Result<(), crate::Error> {
3822        self.cancellation.send()
3823    }
3824
3825    /// Forward cancellation of another request to this one.
3826    ///
3827    /// When the request that `source` belongs to is cancelled by its peer,
3828    /// a `$/cancel_request` for *this* request is sent to its peer, using the
3829    /// same wrapping as the original request. The response is still awaited
3830    /// and delivered as usual (normal data or a cancellation error), so this
3831    /// composes with [`block_task`](Self::block_task) and
3832    /// [`on_receiving_result`](Self::on_receiving_result).
3833    ///
3834    /// This is the building block for proxies that forward a request with
3835    /// custom logic instead of [`forward_response_to`](Self::forward_response_to)
3836    /// (which wires this up automatically from its responder). Without it,
3837    /// custom forwarding *absorbs* cancellation: the upstream marker is still
3838    /// set, but nothing is sent downstream.
3839    ///
3840    /// ```
3841    /// # use agent_client_protocol::{ConnectionTo, Error, Responder, UntypedRole};
3842    /// # use agent_client_protocol_test::{MyRequest, MyResponse};
3843    /// # async fn example(request: MyRequest, responder: Responder<MyResponse>, backend: ConnectionTo<UntypedRole>) -> Result<(), Error> {
3844    /// backend
3845    ///     .send_request(request)
3846    ///     .forward_cancellation_from(responder.cancellation())
3847    ///     .on_receiving_result(async move |result| {
3848    ///         // Custom result handling, e.g. bookkeeping or rewriting.
3849    ///         responder.respond_with_result(result)
3850    ///     })?;
3851    /// # Ok(())
3852    /// # }
3853    /// ```
3854    ///
3855    /// May be called multiple times; cancellation of any registered source
3856    /// triggers the forwarding (at most one `$/cancel_request` is ever sent
3857    /// per request). Sources are observed while the response is being
3858    /// awaited — that is, once the handle is consumed with
3859    /// [`block_task`](Self::block_task),
3860    /// [`on_receiving_result`](Self::on_receiving_result), or
3861    /// [`forward_response_to`](Self::forward_response_to); a source that was
3862    /// already cancelled by then is honored immediately.
3863    #[cfg(feature = "unstable_cancel_request")]
3864    pub fn forward_cancellation_from(mut self, source: RequestCancellation) -> Self {
3865        self.cancellation_sources.push(source);
3866        self
3867    }
3868}
3869
3870impl<T: JsonRpcResponse> SentRequest<T> {
3871    /// The id of the outgoing request.
3872    #[must_use]
3873    pub fn id(&self) -> serde_json::Value {
3874        crate::util::id_to_json(&self.id)
3875    }
3876
3877    /// The method of the request this is in response to.
3878    #[must_use]
3879    pub fn method(&self) -> &str {
3880        &self.method
3881    }
3882
3883    /// Create a new response that maps the result of the response to a new type.
3884    pub fn map<U>(
3885        self,
3886        map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
3887    ) -> SentRequest<U> {
3888        SentRequest {
3889            id: self.id,
3890            method: self.method,
3891            response_rx: self.response_rx,
3892            task_tx: self.task_tx,
3893            to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
3894            #[cfg(feature = "unstable_cancel_request")]
3895            cancellation: self.cancellation,
3896            #[cfg(feature = "unstable_cancel_request")]
3897            cancellation_sources: self.cancellation_sources,
3898        }
3899    }
3900
3901    /// Forward the response (success or error) to a request context when it arrives.
3902    ///
3903    /// This is a convenience method for proxying messages between connections. When the
3904    /// response arrives, it will be automatically sent to the provided request context,
3905    /// whether it's a successful response or an error.
3906    ///
3907    /// # Example: Proxying requests
3908    ///
3909    /// ```
3910    /// # use agent_client_protocol::UntypedRole;
3911    /// # use agent_client_protocol::{Builder, ConnectionTo};
3912    /// # use agent_client_protocol_test::*;
3913    /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
3914    /// // Set up backend connection builder
3915    /// let backend = UntypedRole.builder()
3916    ///     .on_receive_request(async |req: MyRequest, responder, cx| {
3917    ///         responder.respond(MyResponse { status: "ok".into() })
3918    ///     }, agent_client_protocol::on_receive_request!());
3919    ///
3920    /// // Spawn backend and get a context to send to it
3921    /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
3922    ///
3923    /// // Set up proxy that forwards requests to backend
3924    /// UntypedRole.builder()
3925    ///     .on_receive_request({
3926    ///         let backend_connection = backend_connection.clone();
3927    ///         async move |req: MyRequest, responder, cx| {
3928    ///             // Forward the request to backend and proxy the response back
3929    ///             backend_connection.send_request(req)
3930    ///                 .forward_response_to(responder)?;
3931    ///             Ok(())
3932    ///         }
3933    ///     }, agent_client_protocol::on_receive_request!());
3934    /// # Ok(())
3935    /// # }
3936    /// ```
3937    ///
3938    /// # Type Safety
3939    ///
3940    /// The request context's response type must match the request's response type,
3941    /// ensuring type-safe message forwarding.
3942    ///
3943    /// # When to Use
3944    ///
3945    /// Use this when:
3946    /// - You're implementing a proxy or gateway pattern
3947    /// - You want to forward responses without processing them
3948    /// - The response types match between the outgoing request and incoming request
3949    ///
3950    /// This is equivalent to calling `on_receiving_result` and manually forwarding
3951    /// the result, with two proxy-specific additions:
3952    ///
3953    /// - If the pending response is dropped without ever being delivered (for
3954    ///   example, the downstream connection closed), the incoming request is
3955    ///   answered with an internal error instead of being left unanswered.
3956    /// - When the `unstable_cancel_request` feature is enabled and the peer
3957    ///   cancels the incoming request, the cancellation is forwarded to the
3958    ///   outgoing request, and the downstream response (normal data or a
3959    ///   cancellation error) is still forwarded back. This is equivalent to
3960    ///   registering the responder's marker with `forward_cancellation_from`.
3961    #[track_caller]
3962    pub fn forward_response_to(self, responder: Responder<T>) -> Result<(), crate::Error>
3963    where
3964        T: Send,
3965    {
3966        #[cfg(feature = "unstable_cancel_request")]
3967        let this = self.forward_cancellation_from(responder.cancellation());
3968        #[cfg(not(feature = "unstable_cancel_request"))]
3969        let this = self;
3970
3971        this.consume_with(async move |response| {
3972            // A response that was never delivered (outer `Err`, e.g. the
3973            // downstream connection closed) is forwarded as an error: the
3974            // incoming request must not be left unanswered.
3975            responder.respond_with_result(response.unwrap_or_else(Err))
3976        })
3977    }
3978
3979    /// Spawn the response-consumption task shared by
3980    /// [`on_receiving_result`](Self::on_receiving_result) and
3981    /// [`forward_response_to`](Self::forward_response_to).
3982    ///
3983    /// The task awaits the response (forwarding cancellation from registered
3984    /// sources while waiting, when the `unstable_cancel_request` feature is
3985    /// enabled), converts the payload, and invokes `handle` with the typed
3986    /// result (`Ok(Result<T, _>)`). The dispatch loop's ack, if any, is sent
3987    /// after `handle` completes.
3988    ///
3989    /// If the pending response is dropped without ever being delivered (for
3990    /// example, the connection closed), `handle` receives the outer `Err`
3991    /// describing the loss; there is no ack in that case.
3992    #[track_caller]
3993    fn consume_with<F>(
3994        self,
3995        handle: impl FnOnce(Result<Result<T, crate::Error>, crate::Error>) -> F + 'static + Send,
3996    ) -> Result<(), crate::Error>
3997    where
3998        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3999        T: Send,
4000    {
4001        let task_tx = self.task_tx.clone();
4002        let method = self.method;
4003        let response_rx = self.response_rx;
4004        let to_result = self.to_result;
4005        #[cfg(feature = "unstable_cancel_request")]
4006        let cancellation = self.cancellation;
4007        #[cfg(feature = "unstable_cancel_request")]
4008        let cancellation_sources = self.cancellation_sources;
4009        let location = Location::caller();
4010
4011        Task::new(location, async move {
4012            #[cfg(feature = "unstable_cancel_request")]
4013            let response = await_response_forwarding_cancellation(
4014                response_rx,
4015                &cancellation,
4016                &cancellation_sources,
4017            )
4018            .await;
4019            #[cfg(not(feature = "unstable_cancel_request"))]
4020            let response = response_rx.await;
4021
4022            match response {
4023                Ok(ResponsePayload { result, ack_tx }) => {
4024                    // Convert the result using to_result for Ok values
4025                    let typed_result = match result {
4026                        Ok(json_value) => to_result(json_value),
4027                        Err(err) => Err(err),
4028                    };
4029
4030                    let outcome = handle(Ok(typed_result)).await;
4031
4032                    // Ack AFTER the handler completes - this is the key
4033                    // difference from block_task. The dispatch loop waits for
4034                    // this ack.
4035                    if let Some(tx) = ack_tx {
4036                        let _ = tx.send(());
4037                    }
4038
4039                    outcome
4040                }
4041                Err(err) => {
4042                    handle(Err(crate::util::internal_error(format!(
4043                        "response to `{method}` never received: {err}"
4044                    ))))
4045                    .await
4046                }
4047            }
4048        })
4049        .spawn(&task_tx)
4050    }
4051
4052    /// Block the current task until the response is received.
4053    ///
4054    /// **Warning:** This method blocks the current async task. It is **only safe** to use
4055    /// in spawned tasks created with [`ConnectionTo::spawn`]. Using it directly in a
4056    /// handler callback will deadlock the connection.
4057    ///
4058    /// # Safe Usage (in spawned tasks)
4059    ///
4060    /// ```no_run
4061    /// # use agent_client_protocol_test::*;
4062    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
4063    /// # let connection = mock_connection();
4064    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
4065    ///     // Spawn a task to handle the request
4066    ///     cx.spawn({
4067    ///         let connection = cx.clone();
4068    ///         async move {
4069    ///             // Safe: We're in a spawned task, not blocking the event loop
4070    ///             let response = connection.send_request(OtherRequest {})
4071    ///                 .block_task()
4072    ///                 .await?;
4073    ///
4074    ///             // Process the response...
4075    ///             Ok(())
4076    ///         }
4077    ///     })?;
4078    ///
4079    ///     // Respond immediately
4080    ///     responder.respond(MyResponse { status: "ok".into() })
4081    /// }, agent_client_protocol::on_receive_request!())
4082    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
4083    /// # Ok(())
4084    /// # }
4085    /// ```
4086    ///
4087    /// # Unsafe Usage (in handlers - will deadlock!)
4088    ///
4089    /// ```no_run
4090    /// # use agent_client_protocol_test::*;
4091    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
4092    /// # let connection = mock_connection();
4093    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
4094    ///     // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
4095    ///     let response = cx.send_request(OtherRequest {})
4096    ///         .block_task()
4097    ///         .await?;
4098    ///
4099    ///     responder.respond(MyResponse { status: response.value })
4100    /// }, agent_client_protocol::on_receive_request!())
4101    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
4102    /// # Ok(())
4103    /// # }
4104    /// ```
4105    ///
4106    /// # When to Use
4107    ///
4108    /// Use this method when:
4109    /// - You're in a spawned task (via [`ConnectionTo::spawn`])
4110    /// - You need the response value to proceed with your logic
4111    /// - Linear control flow is more natural than callbacks
4112    ///
4113    /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
4114    pub async fn block_task(self) -> Result<T, crate::Error>
4115    where
4116        T: Send,
4117    {
4118        #[cfg(feature = "unstable_cancel_request")]
4119        let response = await_response_forwarding_cancellation(
4120            self.response_rx,
4121            &self.cancellation,
4122            &self.cancellation_sources,
4123        )
4124        .await;
4125        #[cfg(not(feature = "unstable_cancel_request"))]
4126        let response = self.response_rx.await;
4127
4128        match response {
4129            Ok(ResponsePayload {
4130                result: Ok(json_value),
4131                ack_tx,
4132            }) => {
4133                // Ack immediately - we're in a spawned task, so the dispatch loop
4134                // can continue while we process the value.
4135                if let Some(tx) = ack_tx {
4136                    let _ = tx.send(());
4137                }
4138                match (self.to_result)(json_value) {
4139                    Ok(value) => Ok(value),
4140                    Err(err) => Err(err),
4141                }
4142            }
4143            Ok(ResponsePayload {
4144                result: Err(err),
4145                ack_tx,
4146            }) => {
4147                if let Some(tx) = ack_tx {
4148                    let _ = tx.send(());
4149                }
4150                Err(err)
4151            }
4152            Err(err) => Err(crate::util::internal_error(format!(
4153                "response to `{}` never received: {}",
4154                self.method, err
4155            ))),
4156        }
4157    }
4158
4159    /// Schedule an async task to run when a successful response is received.
4160    ///
4161    /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
4162    /// for the common pattern of forwarding errors to a request context while only processing
4163    /// successful responses.
4164    ///
4165    /// # Behavior
4166    ///
4167    /// - If the response is `Ok(value)`, your task receives the value and the request context
4168    /// - If the response is `Err(error)`, the error is automatically sent to `responder`
4169    ///   and your task is not called
4170    ///
4171    /// # Example: Chaining requests
4172    ///
4173    /// ```no_run
4174    /// # use agent_client_protocol_test::*;
4175    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
4176    /// # let connection = mock_connection();
4177    /// connection.on_receive_request(async |req: ValidateRequest, responder, cx| {
4178    ///     // Send initial request
4179    ///     cx.send_request(ValidateRequest { data: req.data.clone() })
4180    ///         .on_receiving_ok_result(responder, async |validation, responder| {
4181    ///             // Only runs if validation succeeded
4182    ///             if validation.is_valid {
4183    ///                 // Respond to original request
4184    ///                 responder.respond(ValidateResponse { is_valid: true, error: None })
4185    ///             } else {
4186    ///                 responder.respond_with_error(agent_client_protocol::util::internal_error("validation failed"))
4187    ///             }
4188    ///         })?;
4189    ///
4190    ///     Ok(())
4191    /// }, agent_client_protocol::on_receive_request!())
4192    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
4193    /// # Ok(())
4194    /// # }
4195    /// ```
4196    ///
4197    /// # Ordering
4198    ///
4199    /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
4200    /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
4201    /// for details.
4202    ///
4203    /// # When to Use
4204    ///
4205    /// Use this when:
4206    /// - You need to respond to a request based on another request's result
4207    /// - You want errors to automatically propagate to the request context
4208    /// - You only care about the success case
4209    ///
4210    /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
4211    #[track_caller]
4212    pub fn on_receiving_ok_result<F>(
4213        self,
4214        responder: Responder<T>,
4215        task: impl FnOnce(T, Responder<T>) -> F + 'static + Send,
4216    ) -> Result<(), crate::Error>
4217    where
4218        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
4219        T: Send,
4220    {
4221        self.on_receiving_result(async move |result| match result {
4222            Ok(value) => task(value, responder).await,
4223            Err(err) => responder.respond_with_error(err),
4224        })
4225    }
4226
4227    /// Schedule an async task to run when the response is received.
4228    ///
4229    /// This is the recommended way to handle responses in handler callbacks, as it doesn't
4230    /// block the event loop. The task will be spawned automatically when the response arrives.
4231    ///
4232    /// # Example: Handle response in callback
4233    ///
4234    /// ```no_run
4235    /// # use agent_client_protocol_test::*;
4236    /// # async fn example() -> Result<(), agent_client_protocol::Error> {
4237    /// # let connection = mock_connection();
4238    /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
4239    ///     // Send a request and schedule a callback for the response
4240    ///     cx.send_request(QueryRequest { id: 22 })
4241    ///         .on_receiving_result({
4242    ///             let connection = cx.clone();
4243    ///             async move |result| {
4244    ///                 match result {
4245    ///                     Ok(response) => {
4246    ///                         println!("Got response: {:?}", response);
4247    ///                         // Can send more messages here
4248    ///                         connection.send_notification(QueryComplete {})?;
4249    ///                         Ok(())
4250    ///                 }
4251    ///                     Err(error) => {
4252    ///                         eprintln!("Request failed: {}", error);
4253    ///                         Err(error)
4254    ///                     }
4255    ///                 }
4256    ///             }
4257    ///         })?;
4258    ///
4259    ///     // Handler continues immediately without waiting
4260    ///     responder.respond(MyResponse { status: "processing".into() })
4261    /// }, agent_client_protocol::on_receive_request!())
4262    /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
4263    /// # Ok(())
4264    /// # }
4265    /// ```
4266    ///
4267    /// # Ordering
4268    ///
4269    /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
4270    /// before processing the next message. This gives you ordering guarantees: no other
4271    /// messages will be processed while your callback runs.
4272    ///
4273    /// This differs from [`block_task`](Self::block_task), which signals completion immediately
4274    /// upon receiving the response (before your code processes it).
4275    ///
4276    /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
4277    /// and how to avoid deadlocks.
4278    ///
4279    /// # Error Handling
4280    ///
4281    /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
4282    /// errors appropriately within your task.
4283    ///
4284    /// # When to Use
4285    ///
4286    /// Use this method when:
4287    /// - You're in a handler callback (not a spawned task)
4288    /// - You want ordering guarantees (no other messages processed during your callback)
4289    /// - You need to do async work before "releasing" control back to the dispatch loop
4290    ///
4291    /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
4292    #[track_caller]
4293    pub fn on_receiving_result<F>(
4294        self,
4295        task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
4296    ) -> Result<(), crate::Error>
4297    where
4298        F: Future<Output = Result<(), crate::Error>> + 'static + Send,
4299        T: Send,
4300    {
4301        self.consume_with(async move |response| {
4302            match response {
4303                // Run the user's callback on the peer's result.
4304                Ok(result) => task(result).await,
4305                // A response that was never delivered fails the consuming
4306                // task instead of invoking the callback.
4307                Err(err) => Err(err),
4308            }
4309        })
4310    }
4311}
4312
4313// ============================================================================
4314// IntoJrConnectionTransport Implementations
4315// ============================================================================
4316
4317/// A component that communicates over line streams.
4318///
4319/// `Lines` implements the [`ConnectTo`] trait for any pair of line-based streams
4320/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
4321/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
4322///
4323/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
4324/// transformation of individual lines before they are parsed or after they are serialized.
4325/// This is particularly useful for debugging, logging, or implementing custom line-based
4326/// protocols.
4327///
4328/// # Use Cases
4329///
4330/// - **Line-by-line logging**: Intercept and log each line before parsing
4331/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
4332/// - **Debugging**: Inspect raw message strings
4333/// - **Line filtering**: Skip or modify specific messages
4334///
4335/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
4336/// for byte-based I/O.
4337///
4338/// [`ConnectTo`]: crate::ConnectTo
4339#[derive(Debug)]
4340pub struct Lines<OutgoingSink, IncomingStream> {
4341    /// Outgoing line sink (where we write serialized JSON-RPC messages)
4342    pub outgoing: OutgoingSink,
4343    /// Incoming line stream (where we read and parse JSON-RPC messages)
4344    pub incoming: IncomingStream,
4345}
4346
4347impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
4348where
4349    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
4350    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
4351{
4352    /// Create a new line stream transport.
4353    pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
4354        Self { outgoing, incoming }
4355    }
4356}
4357
4358impl<OutgoingSink, IncomingStream, R: Role> ConnectTo<R> for Lines<OutgoingSink, IncomingStream>
4359where
4360    OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
4361    IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
4362{
4363    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
4364        let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
4365        match futures::future::select(Box::pin(client.connect_to(channel)), serve_self).await {
4366            Either::Left((result, _)) | Either::Right((result, _)) => result,
4367        }
4368    }
4369
4370    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
4371        let Self { outgoing, incoming } = self;
4372
4373        // Create a channel pair for the client to use
4374        let (channel_for_caller, channel_for_lines) = Channel::duplex();
4375
4376        // Create the server future that runs the line stream actors
4377        let server_future = Box::pin(async move {
4378            let Channel { rx, tx } = channel_for_lines;
4379
4380            // Run both actors concurrently
4381            let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
4382            let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
4383
4384            // Wait for both to complete
4385            futures::try_join!(outgoing_future, incoming_future)?;
4386
4387            Ok(())
4388        });
4389
4390        (channel_for_caller, server_future)
4391    }
4392}
4393
4394/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
4395///
4396/// `ByteStreams` implements the [`ConnectTo`] trait for any pair of `AsyncRead` and `AsyncWrite`
4397/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
4398/// This is the standard way to communicate with external processes or network connections.
4399///
4400/// # Use Cases
4401///
4402/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
4403/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
4404/// - **Named pipes**: Cross-process communication on the same machine
4405/// - **File I/O**: Reading from and writing to file descriptors
4406///
4407/// # Example
4408///
4409/// Connecting to an agent via stdio:
4410///
4411/// ```no_run
4412/// use agent_client_protocol::UntypedRole;
4413/// # use agent_client_protocol::{ByteStreams};
4414/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
4415///
4416/// # async fn example() -> Result<(), agent_client_protocol::Error> {
4417/// let component = ByteStreams::new(
4418///     tokio::io::stdout().compat_write(),
4419///     tokio::io::stdin().compat(),
4420/// );
4421///
4422/// // Use as a component in a connection
4423/// agent_client_protocol::UntypedRole.builder()
4424///     .name("my-client")
4425///     .connect_to(component)
4426///     .await?;
4427/// # Ok(())
4428/// # }
4429/// ```
4430///
4431/// [`ConnectTo`]: crate::ConnectTo
4432#[derive(Debug)]
4433pub struct ByteStreams<OB, IB> {
4434    /// Outgoing byte stream (where we write serialized messages)
4435    pub outgoing: OB,
4436    /// Incoming byte stream (where we read and parse messages)
4437    pub incoming: IB,
4438}
4439
4440impl<OB, IB> ByteStreams<OB, IB>
4441where
4442    OB: AsyncWrite + Send + 'static,
4443    IB: AsyncRead + Send + 'static,
4444{
4445    /// Create a new byte stream transport.
4446    pub fn new(outgoing: OB, incoming: IB) -> Self {
4447        Self { outgoing, incoming }
4448    }
4449}
4450
4451impl<OB, IB, R: Role> ConnectTo<R> for ByteStreams<OB, IB>
4452where
4453    OB: AsyncWrite + Send + 'static,
4454    IB: AsyncRead + Send + 'static,
4455{
4456    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
4457        let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
4458        match futures::future::select(pin!(client.connect_to(channel)), serve_self).await {
4459            Either::Left((result, _)) | Either::Right((result, _)) => result,
4460        }
4461    }
4462
4463    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
4464        use futures::AsyncBufReadExt;
4465        use futures::AsyncWriteExt;
4466        use futures::io::BufReader;
4467        let Self { outgoing, incoming } = self;
4468
4469        // Convert byte streams to line streams
4470        // Box both streams to satisfy Unpin requirements
4471        let incoming_lines = Box::pin(BufReader::new(incoming).lines());
4472
4473        // Create a sink that writes lines (with newlines) to the outgoing byte stream
4474        // We need to Box the writer since it may not be Unpin
4475        let outgoing_sink =
4476            futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
4477                let mut bytes = line.into_bytes();
4478                bytes.push(b'\n');
4479                writer.write_all(&bytes).await?;
4480                Ok::<_, std::io::Error>(writer)
4481            });
4482
4483        // Delegate to Lines component
4484        ConnectTo::<R>::into_channel_and_future(Lines::new(outgoing_sink, incoming_lines))
4485    }
4486}
4487
4488/// A channel endpoint representing one side of a bidirectional message channel.
4489///
4490/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
4491/// Each endpoint has:
4492/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
4493/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
4494///
4495/// # Example
4496///
4497/// ```no_run
4498/// # use agent_client_protocol::UntypedRole;
4499/// # use agent_client_protocol::{Channel, Builder};
4500/// # async fn example() -> Result<(), agent_client_protocol::Error> {
4501/// // Create a pair of connected channels
4502/// let (channel_a, channel_b) = Channel::duplex();
4503///
4504/// // Each channel can be used by a different component
4505/// UntypedRole.builder()
4506///     .name("connection-a")
4507///     .connect_to(channel_a)
4508///     .await?;
4509/// # Ok(())
4510/// # }
4511/// ```
4512#[derive(Debug)]
4513pub struct Channel {
4514    /// Receives messages (or errors) from the counterpart.
4515    pub rx: mpsc::UnboundedReceiver<Result<RawJsonRpcMessage, crate::Error>>,
4516    /// Sends messages (or errors) to the counterpart.
4517    pub tx: mpsc::UnboundedSender<Result<RawJsonRpcMessage, crate::Error>>,
4518}
4519
4520impl Channel {
4521    /// Create a pair of connected channel endpoints.
4522    ///
4523    /// Returns two `Channel` instances that are connected to each other:
4524    /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
4525    /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
4526    ///
4527    /// # Returns
4528    ///
4529    /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
4530    #[must_use]
4531    pub fn duplex() -> (Self, Self) {
4532        // Create channels: A sends Result<Message> which B receives as Message
4533        let (a_tx, b_rx) = mpsc::unbounded();
4534        let (b_tx, a_rx) = mpsc::unbounded();
4535
4536        let channel_a = Self { rx: a_rx, tx: a_tx };
4537        let channel_b = Self { rx: b_rx, tx: b_tx };
4538
4539        (channel_a, channel_b)
4540    }
4541
4542    /// Copy messages from `rx` to `tx`.
4543    ///
4544    /// # Returns
4545    ///
4546    /// A `Result` indicating success or failure.
4547    pub async fn copy(mut self) -> Result<(), crate::Error> {
4548        while let Some(msg) = self.rx.next().await {
4549            self.tx
4550                .unbounded_send(msg)
4551                .map_err(crate::util::internal_error)?;
4552        }
4553        Ok(())
4554    }
4555}
4556
4557impl<R: Role> ConnectTo<R> for Channel {
4558    async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
4559        let (client_channel, client_serve) = client.into_channel_and_future();
4560
4561        match futures::try_join!(
4562            Channel {
4563                rx: client_channel.rx,
4564                tx: self.tx
4565            }
4566            .copy(),
4567            Channel {
4568                rx: self.rx,
4569                tx: client_channel.tx
4570            }
4571            .copy(),
4572            client_serve
4573        ) {
4574            Ok(((), (), ())) => Ok(()),
4575            Err(err) => Err(err),
4576        }
4577    }
4578
4579    fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
4580        (self, Box::pin(future::ready(Ok(()))))
4581    }
4582}
4583
4584#[cfg(test)]
4585mod tests {
4586    use super::*;
4587
4588    #[test]
4589    fn peel_successor_envelopes_returns_plain_messages_unchanged() {
4590        let params = serde_json::json!({ "key": "value" });
4591        let (method, peeled) = peel_successor_envelopes("session/update", &params);
4592        assert_eq!(method, "session/update");
4593        assert_eq!(peeled, &params);
4594    }
4595
4596    #[test]
4597    fn peel_successor_envelopes_unwraps_nested_envelopes() {
4598        let params = serde_json::json!({
4599            "method": "_proxy/successor",
4600            "params": {
4601                "method": "$/cancel_request",
4602                "params": { "requestId": "req-1" }
4603            }
4604        });
4605        let (method, peeled) = peel_successor_envelopes("_proxy/successor", &params);
4606        assert_eq!(method, "$/cancel_request");
4607        assert_eq!(peeled, &serde_json::json!({ "requestId": "req-1" }));
4608    }
4609
4610    #[test]
4611    fn peel_successor_envelopes_leaves_malformed_envelopes_intact() {
4612        // No string `method` field: the envelope cannot be peeled, so the
4613        // message is returned as-is for the handler chain to deal with.
4614        let params = serde_json::json!({ "unexpected": true });
4615        let (method, peeled) = peel_successor_envelopes("_proxy/successor", &params);
4616        assert_eq!(method, "_proxy/successor");
4617        assert_eq!(peeled, &params);
4618    }
4619
4620    #[cfg(feature = "unstable_cancel_request")]
4621    mod cancel_request {
4622        use super::super::*;
4623
4624        fn notification(method: &str, params: serde_json::Value) -> UntypedMessage {
4625            UntypedMessage::new(method, params).expect("well-formed JSON")
4626        }
4627
4628        #[test]
4629        fn cancellation_request_id_is_extracted_from_wrapped_notifications() {
4630            let message = notification(
4631                "_proxy/successor",
4632                serde_json::json!({
4633                    "method": "$/cancel_request",
4634                    "params": { "requestId": "req-1" }
4635                }),
4636            );
4637            let request_id = cancellation_request_id_from_message(&message)
4638                .expect("wrapped cancel should parse");
4639            assert_eq!(request_id, Some(RequestId::Str("req-1".into())));
4640        }
4641
4642        #[test]
4643        fn malformed_successor_envelope_is_not_treated_as_cancellation() {
4644            // The envelope cannot be peeled; the message must flow on to the
4645            // handler chain instead of erroring the dispatch.
4646            let message = notification("_proxy/successor", serde_json::json!({ "bogus": true }));
4647            let request_id = cancellation_request_id_from_message(&message)
4648                .expect("malformed envelope should be left to the handler chain");
4649            assert_eq!(request_id, None);
4650        }
4651
4652        #[test]
4653        fn cancel_request_notifications_are_detected_even_when_wrapped() {
4654            let plain = notification("$/cancel_request", serde_json::json!({ "requestId": 1 }));
4655            assert!(is_cancel_request_notification(&plain));
4656
4657            let wrapped = notification(
4658                "_proxy/successor",
4659                serde_json::json!({
4660                    "method": "$/cancel_request",
4661                    "params": { "requestId": 1 }
4662                }),
4663            );
4664            assert!(is_cancel_request_notification(&wrapped));
4665
4666            let other_wrapped = notification(
4667                "_proxy/successor",
4668                serde_json::json!({
4669                    "method": "session/update",
4670                    "params": {}
4671                }),
4672            );
4673            assert!(!is_cancel_request_notification(&other_wrapped));
4674
4675            let malformed_envelope =
4676                notification("_proxy/successor", serde_json::json!({ "bogus": true }));
4677            assert!(!is_cancel_request_notification(&malformed_envelope));
4678        }
4679
4680        #[test]
4681        fn malformed_cancel_request_params_error() {
4682            let message = notification(
4683                "$/cancel_request",
4684                serde_json::json!({ "requestId": { "not": "an id" } }),
4685            );
4686            cancellation_request_id_from_message(&message)
4687                .expect_err("malformed cancel params should error");
4688        }
4689
4690        #[test]
4691        fn registry_marks_and_removes_requests() {
4692            let registry = RequestCancellationRegistry::new();
4693            let id = RequestId::Str("req-1".into());
4694
4695            let responder_cancellation = registry.register(&id);
4696            let marker = responder_cancellation.cancellation();
4697            assert!(!marker.is_cancelled());
4698
4699            assert!(registry.cancel(&id));
4700            assert!(marker.is_cancelled());
4701            assert!(responder_cancellation.cancellation().is_cancelled());
4702
4703            drop(responder_cancellation);
4704            assert!(!registry.cancel(&id), "slot should be removed on drop");
4705        }
4706
4707        #[test]
4708        fn reused_request_id_does_not_cross_wire_cancellation_state() {
4709            let registry = RequestCancellationRegistry::new();
4710            let id = RequestId::Str("dup".into());
4711
4712            // A protocol-violating peer reuses an in-flight request ID.
4713            let first = registry.register(&id);
4714            let first_marker = first.cancellation();
4715            let second = registry.register(&id);
4716            let second_marker = second.cancellation();
4717
4718            // A cancellation targets whichever request currently owns the ID.
4719            assert!(registry.cancel(&id));
4720            assert!(second_marker.is_cancelled());
4721            assert!(
4722                !first_marker.is_cancelled(),
4723                "the stale request must not observe the newer request's cancellation"
4724            );
4725
4726            // The stale responder must hand out detached markers, not the
4727            // newer request's marker.
4728            assert!(!first.cancellation().is_cancelled());
4729
4730            // Dropping the stale responder must not remove the newer
4731            // request's slot.
4732            drop(first);
4733            assert!(registry.cancel(&id), "newer slot should still be present");
4734
4735            drop(second);
4736            assert!(!registry.cancel(&id), "slot should be removed on drop");
4737        }
4738    }
4739}