agent_client_protocol/jsonrpc.rs
1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::v1::{
4 JsonRpcMessage as VersionedJsonRpcMessage, Notification as RpcNotification,
5 Request as RpcRequest, RequestId, Response as RpcResponse, SessionId,
6};
7
8// Types re-exported from crate root
9use serde::{Deserialize, Serialize};
10use std::any::TypeId;
11#[cfg(feature = "unstable_cancel_request")]
12use std::collections::HashMap;
13use std::fmt::Debug;
14use std::panic::Location;
15use std::pin::pin;
16use std::sync::Arc;
17#[cfg(feature = "unstable_cancel_request")]
18use std::sync::{
19 Mutex,
20 atomic::{AtomicBool, Ordering},
21};
22use uuid::Uuid;
23
24#[cfg(feature = "unstable_cancel_request")]
25use futures::FutureExt;
26use futures::channel::{mpsc, oneshot};
27use futures::future::{self, BoxFuture, Either};
28use futures::{AsyncRead, AsyncWrite, StreamExt};
29
30mod dynamic_handler;
31pub(crate) mod handlers;
32mod incoming_actor;
33mod outgoing_actor;
34mod protocol_compat;
35pub(crate) mod run;
36mod task_actor;
37mod transport_actor;
38
39use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
40pub use crate::jsonrpc::handlers::NullHandler;
41use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
42use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
43use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
44use crate::jsonrpc::protocol_compat::{ProtocolCompat, ProtocolMode};
45use crate::jsonrpc::run::SpawnedRun;
46use crate::jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo};
47use crate::jsonrpc::task_actor::{Task, TaskTx};
48use crate::mcp_server::McpServer;
49use crate::role::HasPeer;
50use crate::role::Role;
51use crate::{Agent, Client, ConnectTo, RoleId};
52
/// Raw JSON-RPC message transported by [`Channel`].
///
/// This uses the JSON-RPC envelope types from `agent-client-protocol-schema`
/// while keeping method params as raw, JSON-RPC-valid params at the transport boundary.
///
/// Each variant wraps the matching schema envelope type. Request and
/// notification params are carried as [`RawJsonRpcParams`]; response payloads
/// are carried as an untyped [`serde_json::Value`].
#[derive(Debug, Clone)]
pub enum RawJsonRpcMessage {
    /// A JSON-RPC request with an id and expected response.
    Request(RpcRequest<RawJsonRpcParams>),
    /// A JSON-RPC notification without a response.
    Notification(RpcNotification<RawJsonRpcParams>),
    /// A JSON-RPC response to a prior request.
    Response(RpcResponse<serde_json::Value>),
}
66
/// Raw JSON-RPC request or notification parameters.
///
/// JSON-RPC params, when present, must be either an array or an object.
///
/// This type only models params that are present. "No params" is expressed
/// by the caller as `None`; see [`RawJsonRpcParams::from_value`], which maps
/// JSON `null` to `Ok(None)`.
#[derive(Debug, Clone, PartialEq)]
pub enum RawJsonRpcParams {
    /// Positional JSON-RPC params.
    Array(Vec<serde_json::Value>),
    /// Named JSON-RPC params.
    Object(serde_json::Map<String, serde_json::Value>),
}
77
78impl RawJsonRpcParams {
79 /// Convert a JSON value into JSON-RPC params.
80 pub fn from_value(value: serde_json::Value) -> Result<Option<Self>, crate::Error> {
81 match value {
82 serde_json::Value::Null => Ok(None),
83 serde_json::Value::Array(array) => Ok(Some(Self::Array(array))),
84 serde_json::Value::Object(object) => Ok(Some(Self::Object(object))),
85 _ => {
86 Err(crate::Error::invalid_params()
87 .data("JSON-RPC params must be an object or array"))
88 }
89 }
90 }
91
92 /// Convert params back into a JSON value.
93 #[must_use]
94 pub fn into_value(self) -> serde_json::Value {
95 match self {
96 Self::Array(array) => serde_json::Value::Array(array),
97 Self::Object(object) => serde_json::Value::Object(object),
98 }
99 }
100}
101
102impl Serialize for RawJsonRpcParams {
103 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
104 where
105 S: serde::Serializer,
106 {
107 match self {
108 Self::Array(array) => array.serialize(serializer),
109 Self::Object(object) => object.serialize(serializer),
110 }
111 }
112}
113
114impl<'de> Deserialize<'de> for RawJsonRpcParams {
115 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
116 where
117 D: serde::Deserializer<'de>,
118 {
119 let value = serde_json::Value::deserialize(deserializer)?;
120 match value {
121 serde_json::Value::Array(array) => Ok(Self::Array(array)),
122 serde_json::Value::Object(object) => Ok(Self::Object(object)),
123 _ => Err(serde::de::Error::custom(
124 "JSON-RPC params must be an object or array",
125 )),
126 }
127 }
128}
129
130impl RawJsonRpcMessage {
131 /// Build a raw JSON-RPC request message.
132 pub fn request(
133 method: String,
134 params: serde_json::Value,
135 id: RequestId,
136 ) -> Result<Self, crate::Error> {
137 Ok(Self::Request(RpcRequest {
138 id,
139 method: Arc::from(method),
140 params: RawJsonRpcParams::from_value(params)?,
141 }))
142 }
143
144 /// Build a raw JSON-RPC notification message.
145 pub fn notification(method: String, params: serde_json::Value) -> Result<Self, crate::Error> {
146 Ok(Self::Notification(RpcNotification {
147 method: Arc::from(method),
148 params: RawJsonRpcParams::from_value(params)?,
149 }))
150 }
151
152 /// Build a raw JSON-RPC response message.
153 #[must_use]
154 pub fn response(id: RequestId, response: Result<serde_json::Value, crate::Error>) -> Self {
155 Self::Response(RpcResponse::new(id, response))
156 }
157
158 /// The response id, if this is a response.
159 #[must_use]
160 pub fn response_id(&self) -> Option<&RequestId> {
161 match self {
162 Self::Response(RpcResponse::Result { id, .. } | RpcResponse::Error { id, .. }) => {
163 Some(id)
164 }
165 Self::Request(_) | Self::Notification(_) => None,
166 }
167 }
168}
169
170impl Serialize for RawJsonRpcMessage {
171 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
172 where
173 S: serde::Serializer,
174 {
175 match self {
176 Self::Request(request) => {
177 VersionedJsonRpcMessage::wrap(request.clone()).serialize(serializer)
178 }
179 Self::Notification(notification) => {
180 VersionedJsonRpcMessage::wrap(notification.clone()).serialize(serializer)
181 }
182 Self::Response(response) => {
183 VersionedJsonRpcMessage::wrap(response.clone()).serialize(serializer)
184 }
185 }
186 }
187}
188
189impl<'de> Deserialize<'de> for RawJsonRpcMessage {
190 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
191 where
192 D: serde::Deserializer<'de>,
193 {
194 let value = serde_json::Value::deserialize(deserializer)?;
195 if value.get("method").is_some() {
196 if value.get("id").is_some() {
197 let request = serde_json::from_value::<
198 VersionedJsonRpcMessage<RpcRequest<RawJsonRpcParams>>,
199 >(value)
200 .map_err(serde::de::Error::custom)?
201 .into_inner();
202 Ok(Self::Request(request))
203 } else {
204 let notification = serde_json::from_value::<
205 VersionedJsonRpcMessage<RpcNotification<RawJsonRpcParams>>,
206 >(value)
207 .map_err(serde::de::Error::custom)?
208 .into_inner();
209 Ok(Self::Notification(notification))
210 }
211 } else if value.get("result").is_some() || value.get("error").is_some() {
212 let response = serde_json::from_value::<
213 VersionedJsonRpcMessage<RpcResponse<serde_json::Value>>,
214 >(value)
215 .map_err(serde::de::Error::custom)?
216 .into_inner();
217 Ok(Self::Response(response))
218 } else {
219 Err(serde::de::Error::custom("invalid JSON-RPC message"))
220 }
221 }
222}
223
224fn params_from_transport(params: Option<RawJsonRpcParams>) -> serde_json::Value {
225 params.map_or(serde_json::Value::Null, RawJsonRpcParams::into_value)
226}
227
228/// Handlers process incoming JSON-RPC messages on a connection.
229///
230/// When messages arrive, they flow through a chain of handlers. Each handler can
231/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
232///
233/// # Message Flow
234///
235/// Messages flow through three layers of handlers in order:
236///
237/// ```text
238/// ┌─────────────────────────────────────────────────────────────────┐
239/// │ Incoming Message │
240/// └─────────────────────────────────────────────────────────────────┘
241/// │
242/// ▼
243/// ┌─────────────────────────────────────────────────────────────────┐
244/// │ 1. User Handlers (registered via on_receive_request, etc.) │
245/// │ - Tried in registration order │
246/// │ - First handler to return Handled::Yes claims the message │
247/// └─────────────────────────────────────────────────────────────────┘
248/// │ Handled::No
249/// ▼
250/// ┌─────────────────────────────────────────────────────────────────┐
251/// │ 2. Dynamic Handlers (added at runtime) │
252/// │ - Used for session-specific message handling │
253/// │ - Added via ConnectionTo::add_dynamic_handler │
254/// └─────────────────────────────────────────────────────────────────┘
255/// │ Handled::No
256/// ▼
257/// ┌─────────────────────────────────────────────────────────────────┐
258/// │ 3. Role Default Handler │
259/// │ - Fallback based on the connection's Role │
260/// │ - Handles protocol-level messages (e.g., proxy forwarding) │
261/// └─────────────────────────────────────────────────────────────────┘
262/// │ Handled::No
263/// ▼
264/// ┌─────────────────────────────────────────────────────────────────┐
265/// │ Unhandled: Error response sent (or queued if retry=true) │
266/// └─────────────────────────────────────────────────────────────────┘
267/// ```
268///
269/// # The `Handled` Return Value
270///
271/// Each handler returns [`Handled`] to indicate whether it processed the message:
272///
273/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
274/// - **`Handled::No { message, retry }`** - Message was not handled. The message
275/// (possibly modified) is passed to the next handler in the chain.
276///
277/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
278///
279/// # The Retry Mechanism
280///
281/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
282///
283/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
284/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
285///
286/// This mechanism exists because of a timing issue with sessions: when a `session/new`
287/// response is being processed, the dynamic handler for that session hasn't been registered
288/// yet, but `session/update` notifications for that session may already be arriving.
289/// By setting `retry: true`, these early notifications are queued until the session's
290/// dynamic handler is added.
291///
292/// # Handler Registration
293///
294/// Most users register handlers using the builder methods on [`Builder`]:
295///
296/// ```
297/// # use agent_client_protocol::{Agent, Client, ConnectTo};
298/// # use agent_client_protocol::schema::v1::{AgentCapabilities, InitializeRequest, InitializeResponse};
299/// # use agent_client_protocol_test::StatusUpdate;
300/// # async fn example(transport: impl ConnectTo<Agent>) -> Result<(), agent_client_protocol::Error> {
301/// Agent.builder()
302/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
303/// responder.respond(
304/// InitializeResponse::new(req.protocol_version)
305/// .agent_capabilities(AgentCapabilities::new()),
306/// )
307/// }, agent_client_protocol::on_receive_request!())
308/// .on_receive_notification(async |notif: StatusUpdate, cx| {
309/// // Process notification
310/// Ok(())
311/// }, agent_client_protocol::on_receive_notification!())
312/// .connect_to(transport)
313/// .await?;
314/// # Ok(())
315/// # }
316/// ```
317///
318/// The type parameter on the closure determines which messages are dispatched to it.
319/// Messages that don't match the type are automatically passed to the next handler.
320///
321/// # Implementing Custom Handlers
322///
/// For advanced use cases, you can implement [`HandleDispatchFrom`] directly:
///
/// ```ignore
/// struct MyHandler;
///
/// impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for MyHandler {
///     async fn handle_dispatch_from(
///         &mut self,
///         message: Dispatch,
///         cx: ConnectionTo<Counterpart>,
///     ) -> Result<Handled<Dispatch>, Error> {
///         if message.method() == "my/custom/method" {
///             // Handle it
///             Ok(Handled::Yes)
///         } else {
///             // Pass to next handler
///             Ok(Handled::No { message, retry: false })
///         }
///     }
///
///     fn describe_chain(&self) -> impl std::fmt::Debug {
///         "MyHandler"
///     }
/// }
/// ```
349///
350/// # Important: Handlers Must Not Block
351///
352/// The connection processes messages on a single async task. While a handler is running,
353/// no other messages can be processed. For expensive operations, use [`ConnectionTo::spawn`]
354/// to run work concurrently:
355///
356/// ```
357/// # use agent_client_protocol::{Client, Agent, ConnectTo};
358/// # use agent_client_protocol_test::{expensive_operation, ProcessComplete};
359/// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
360/// # Client.builder().connect_with(transport, async |cx| {
361/// cx.spawn({
362/// let connection = cx.clone();
363/// async move {
364/// let result = expensive_operation("data").await?;
365/// connection.send_notification(ProcessComplete { result })?;
366/// Ok(())
367/// }
368/// })?;
369/// # Ok(())
370/// # }).await?;
371/// # Ok(())
372/// # }
373/// ```
374#[allow(async_fn_in_trait)]
375/// A handler for incoming JSON-RPC messages.
376///
377/// This trait is implemented by types that can process incoming messages on a connection.
378/// Handlers are registered with a [`Builder`] and are called in order until
379/// one claims the message.
380///
381/// The type parameter `R` is the role this handler plays - who I am.
382/// For an agent handler, `R = Agent` (I handle messages as an agent).
383/// For a client handler, `R = Client` (I handle messages as a client).
384pub trait HandleDispatchFrom<Counterpart: Role>: Send {
385 /// Attempt to claim an incoming message (request or notification).
386 ///
387 /// # Important: do not block
388 ///
389 /// The server will not process new messages until this handler returns.
390 /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
391 /// The recommended approach to manage expensive operations is to the [`ConnectionTo::spawn`] method available on the message context.
392 ///
393 /// # Parameters
394 ///
395 /// * `message` - The incoming message to handle.
396 /// * `connection` - The connection, used to send messages and access connection state.
397 ///
398 /// # Returns
399 ///
400 /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
401 /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
402 /// * `Err` if an internal error occurs (this will bring down the server).
403 fn handle_dispatch_from(
404 &mut self,
405 message: Dispatch,
406 connection: ConnectionTo<Counterpart>,
407 ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send;
408
409 /// Returns a debug description of the registered handlers for diagnostics.
410 fn describe_chain(&self) -> impl std::fmt::Debug;
411}
412
413impl<Counterpart: Role, H> HandleDispatchFrom<Counterpart> for &mut H
414where
415 H: HandleDispatchFrom<Counterpart>,
416{
417 fn handle_dispatch_from(
418 &mut self,
419 message: Dispatch,
420 cx: ConnectionTo<Counterpart>,
421 ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send {
422 H::handle_dispatch_from(self, message, cx)
423 }
424
425 fn describe_chain(&self) -> impl std::fmt::Debug {
426 H::describe_chain(self)
427 }
428}
429
/// A builder for a JSON-RPC connection that can act as a server, a client, or both.
431///
432/// [`Builder`] provides a builder-style API for creating JSON-RPC servers and clients.
433/// You start by calling `Role.builder()` (e.g., `Client.builder()`), then add message
434/// handlers, and finally drive the connection with either [`connect_to`](Builder::connect_to)
435/// or [`connect_with`](Builder::connect_with), providing a component implementation
436/// (e.g., [`ByteStreams`] for byte streams).
437///
438/// # JSON-RPC Primer
439///
440/// JSON-RPC 2.0 has two fundamental message types:
441///
442/// * **Requests** - Messages that expect a response. They have an `id` field that gets
443/// echoed back in the response so the sender can correlate them.
444/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
445/// expect or receive a response.
446///
447/// # Type-Driven Message Dispatch
448///
449/// The handler registration methods use Rust's type system to determine which messages
450/// to handle. The type parameter you provide controls what gets dispatched to your handler:
451///
452/// ## Single Message Types
453///
454/// The simplest case - handle one specific message type:
455///
456/// ```no_run
457/// # use agent_client_protocol_test::*;
458/// # use agent_client_protocol::schema::v1::{InitializeRequest, InitializeResponse, SessionNotification};
459/// # async fn example() -> Result<(), agent_client_protocol::Error> {
460/// # let connection = mock_connection();
461/// connection
462/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
463/// // Handle only InitializeRequest messages
464/// responder.respond(InitializeResponse::make())
465/// }, agent_client_protocol::on_receive_request!())
466/// .on_receive_notification(async |notif: SessionNotification, cx| {
///         // Handle only SessionNotification messages
468/// Ok(())
469/// }, agent_client_protocol::on_receive_notification!())
470/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
471/// # Ok(())
472/// # }
473/// ```
474///
475/// ## Enum Message Types
476///
477/// You can also handle multiple related messages with a single handler by defining an enum
478/// that implements the appropriate trait ([`JsonRpcRequest`] or [`JsonRpcNotification`]):
479///
480/// ```no_run
481/// # use agent_client_protocol_test::*;
482/// # use agent_client_protocol::{JsonRpcRequest, JsonRpcMessage, UntypedMessage};
483/// # use agent_client_protocol::schema::v1::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
484/// # async fn example() -> Result<(), agent_client_protocol::Error> {
485/// # let connection = mock_connection();
486/// // Define an enum for multiple request types
487/// #[derive(Debug, Clone)]
488/// enum MyRequests {
489/// Initialize(InitializeRequest),
490/// Prompt(PromptRequest),
491/// }
492///
493/// // Implement JsonRpcRequest for your enum
494/// # impl JsonRpcMessage for MyRequests {
495/// # fn matches_method(_method: &str) -> bool { false }
496/// # fn method(&self) -> &str { "myRequests" }
497/// # fn to_untyped_message(&self) -> Result<UntypedMessage, agent_client_protocol::Error> { todo!() }
498/// # fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Result<Self, agent_client_protocol::Error> { Err(agent_client_protocol::Error::method_not_found()) }
499/// # }
500/// impl JsonRpcRequest for MyRequests { type Response = serde_json::Value; }
501///
502/// // Handle all variants in one place
503/// connection.on_receive_request(async |req: MyRequests, responder, cx| {
504/// match req {
505/// MyRequests::Initialize(init) => { responder.respond(serde_json::json!({})) }
506/// MyRequests::Prompt(prompt) => { responder.respond(serde_json::json!({})) }
507/// }
508/// }, agent_client_protocol::on_receive_request!())
509/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
510/// # Ok(())
511/// # }
512/// ```
513///
514/// ## Mixed Message Types
515///
516/// For enums containing both requests AND notifications, use [`on_receive_dispatch`](Self::on_receive_dispatch):
517///
518/// ```no_run
519/// # use agent_client_protocol_test::*;
520/// # use agent_client_protocol::Dispatch;
521/// # use agent_client_protocol::schema::v1::{InitializeRequest, InitializeResponse, SessionNotification};
522/// # async fn example() -> Result<(), agent_client_protocol::Error> {
523/// # let connection = mock_connection();
524/// // on_receive_dispatch receives Dispatch which can be either a request or notification
525/// connection.on_receive_dispatch(async |msg: Dispatch<InitializeRequest, SessionNotification>, _cx| {
526/// match msg {
527/// Dispatch::Request(req, responder) => {
528/// responder.respond(InitializeResponse::make())
529/// }
530/// Dispatch::Notification(notif) => {
531/// Ok(())
532/// }
533/// Dispatch::Response(result, router) => {
534/// // Forward response to its destination
535/// router.respond_with_result(result)
536/// }
537/// }
538/// }, agent_client_protocol::on_receive_dispatch!())
539/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
540/// # Ok(())
541/// # }
542/// ```
543///
544/// # Handler Registration
545///
546/// Register handlers using these methods (listed from most common to most flexible):
547///
548/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
549/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
550/// * [`on_receive_dispatch`](Self::on_receive_dispatch) - Handle enums containing both requests and notifications
551/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
552///
553/// ## Handler Ordering
554///
555/// Handlers are tried in the order you register them. The first handler that claims a message
556/// (by matching its type) will process it. Subsequent handlers won't see that message:
557///
558/// ```no_run
559/// # use agent_client_protocol_test::*;
560/// # use agent_client_protocol::{Dispatch, UntypedMessage};
561/// # use agent_client_protocol::schema::v1::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
562/// # async fn example() -> Result<(), agent_client_protocol::Error> {
563/// # let connection = mock_connection();
564/// connection
565/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
566/// // This runs first for InitializeRequest
567/// responder.respond(InitializeResponse::make())
568/// }, agent_client_protocol::on_receive_request!())
569/// .on_receive_request(async |req: PromptRequest, responder, cx| {
570/// // This runs first for PromptRequest
571/// responder.respond(PromptResponse::make())
572/// }, agent_client_protocol::on_receive_request!())
573/// .on_receive_dispatch(async |msg: Dispatch, cx| {
574/// // This runs for any message not handled above
575/// msg.respond_with_error(agent_client_protocol::util::internal_error("unknown method"), cx)
576/// }, agent_client_protocol::on_receive_dispatch!())
577/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
578/// # Ok(())
579/// # }
580/// ```
581///
582/// # Event Loop and Concurrency
583///
584/// Understanding the event loop is critical for writing correct handlers.
585///
586/// ## The Event Loop
587///
588/// [`Builder`] runs all handler callbacks on a single async task - the event loop.
589/// While a handler is running, **the server cannot receive new messages**. This means
590/// any blocking or expensive work in your handlers will stall the entire connection.
591///
592/// To avoid blocking the event loop, use [`ConnectionTo::spawn`] to offload serious
593/// work to concurrent tasks:
594///
595/// ```no_run
596/// # use agent_client_protocol_test::*;
597/// # async fn example() -> Result<(), agent_client_protocol::Error> {
598/// # let connection = mock_connection();
599/// connection.on_receive_request(async |req: AnalyzeRequest, responder, cx| {
600/// // Clone cx for the spawned task
601/// cx.spawn({
602/// let connection = cx.clone();
603/// async move {
604/// let result = expensive_analysis(&req.data).await?;
605/// connection.send_notification(AnalysisComplete { result })?;
606/// Ok(())
607/// }
608/// })?;
609///
610/// // Respond immediately without blocking
611/// responder.respond(AnalysisStarted { job_id: 42 })
612/// }, agent_client_protocol::on_receive_request!())
613/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
614/// # Ok(())
615/// # }
616/// ```
617///
618/// Note that the entire connection runs within one async task, so parallelism must be
619/// managed explicitly using [`spawn`](ConnectionTo::spawn).
620///
621/// ## The Connection Context
622///
623/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
624///
625/// * **For request handlers** - [`Responder<R>`] provides [`respond`](Responder::respond)
626/// to send the response, plus methods to send other messages
627/// * **For notification handlers** - [`ConnectionTo`] provides methods to send messages
628/// and spawn tasks
629///
630/// Both context types support:
631/// * [`send_request`](ConnectionTo::send_request) - Send requests to the other side
632/// * [`send_notification`](ConnectionTo::send_notification) - Send notifications
633/// * [`spawn`](ConnectionTo::spawn) - Run tasks concurrently without blocking the event loop
634///
635/// The [`SentRequest`] returned by `send_request` provides methods like
636/// [`on_receiving_result`](SentRequest::on_receiving_result) that help you
637/// avoid accidentally blocking the event loop while waiting for responses.
638///
639/// # Driving the Connection
640///
641/// After adding handlers, you must drive the connection using one of two modes:
642///
643/// ## Server Mode: `connect_to()`
644///
645/// Use [`connect_to`](Self::connect_to) when you only need to respond to incoming messages:
646///
647/// ```no_run
648/// # use agent_client_protocol_test::*;
649/// # async fn example() -> Result<(), agent_client_protocol::Error> {
650/// # let connection = mock_connection();
651/// connection
652/// .on_receive_request(async |req: MyRequest, responder, cx| {
653/// responder.respond(MyResponse { status: "ok".into() })
654/// }, agent_client_protocol::on_receive_request!())
655/// .connect_to(MockTransport) // Runs until connection closes or error occurs
656/// .await?;
657/// # Ok(())
658/// # }
659/// ```
660///
661/// The connection will process incoming messages and invoke your handlers until the
662/// connection is closed or an error occurs.
663///
664/// ## Client Mode: `connect_with()`
665///
666/// Use [`connect_with`](Self::connect_with) when you need to both handle incoming messages
667/// AND send your own requests/notifications:
668///
669/// ```no_run
670/// # use agent_client_protocol_test::*;
671/// # use agent_client_protocol::schema::v1::InitializeRequest;
672/// # async fn example() -> Result<(), agent_client_protocol::Error> {
673/// # let connection = mock_connection();
674/// connection
675/// .on_receive_request(async |req: MyRequest, responder, cx| {
676/// responder.respond(MyResponse { status: "ok".into() })
677/// }, agent_client_protocol::on_receive_request!())
678/// .connect_with(MockTransport, async |cx| {
679/// // You can send requests to the other side
680/// let response = cx.send_request(InitializeRequest::make())
681/// .block_task()
682/// .await?;
683///
684/// // And send notifications
685/// cx.send_notification(StatusUpdate { message: "ready".into() })?;
686///
687/// Ok(())
688/// })
689/// .await?;
690/// # Ok(())
691/// # }
692/// ```
693///
694/// The connection will serve incoming messages in the background while your client closure
695/// runs. When the closure returns, the connection shuts down.
696///
697/// # Example: Complete Agent
698///
699/// ```no_run
700/// # use agent_client_protocol::UntypedRole;
701/// # use agent_client_protocol::{Builder};
702/// # use agent_client_protocol::Stdio;
703/// # use agent_client_protocol::schema::v1::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
704/// # async fn example() -> Result<(), agent_client_protocol::Error> {
705/// let transport = Stdio::new();
706///
707/// UntypedRole.builder()
708/// .name("my-agent") // Optional: for debugging logs
709/// .on_receive_request(async |init: InitializeRequest, responder, cx| {
710/// let response: InitializeResponse = todo!();
711/// responder.respond(response)
712/// }, agent_client_protocol::on_receive_request!())
713/// .on_receive_request(async |prompt: PromptRequest, responder, cx| {
714/// // You can send notifications while processing a request
715/// let notif: SessionNotification = todo!();
716/// cx.send_notification(notif)?;
717///
718/// // Then respond to the request
719/// let response: PromptResponse = todo!();
720/// responder.respond(response)
721/// }, agent_client_protocol::on_receive_request!())
722/// .connect_to(transport)
723/// .await?;
724/// # Ok(())
725/// # }
726/// ```
#[must_use]
#[derive(Debug)]
pub struct Builder<Host: Role, Handler = NullHandler, Runner = NullRun>
where
    Handler: HandleDispatchFrom<Host::Counterpart>,
    Runner: RunWithConnectionTo<Host::Counterpart>,
{
    /// My role.
    host: Host,

    /// Name of the connection, used in tracing logs.
    name: Option<String>,

    /// Handler for incoming messages.
    ///
    /// Each `with_handler` / `with_connection_builder` call wraps the previous
    /// value in a `ChainedHandler`.
    handler: Handler,

    /// Runner for background tasks.
    ///
    /// Despite the field name, this holds a [`RunWithConnectionTo`]; it is
    /// extended through `ChainRun` by `with_responder` and `with_spawned`.
    responder: Runner,

    /// Protocol version mode for the public API and wire compatibility layer.
    ///
    /// Initialised from the host role type by `default_protocol_mode`.
    protocol_mode: ProtocolMode,
}
749
750fn default_protocol_mode<Host: Role>() -> ProtocolMode {
751 let role = TypeId::of::<Host>();
752
753 if role == TypeId::of::<Agent>() {
754 ProtocolMode::v1_agent()
755 } else if role == TypeId::of::<Client>() {
756 ProtocolMode::v1_client()
757 } else {
758 ProtocolMode::disabled()
759 }
760}
761
762impl<Host: Role> Builder<Host, NullHandler, NullRun> {
763 /// Create a new connection builder for the given role.
764 /// This type follows a builder pattern; use other methods to configure and then invoke
765 /// [`Self::connect_to`] (to use as a server) or [`Self::connect_with`] to use as a client.
766 pub fn new(role: Host) -> Self {
767 Self {
768 host: role,
769 name: None,
770 handler: NullHandler,
771 responder: NullRun,
772 protocol_mode: default_protocol_mode::<Host>(),
773 }
774 }
775}
776
777impl<Host: Role, Handler> Builder<Host, Handler, NullRun>
778where
779 Handler: HandleDispatchFrom<Host::Counterpart>,
780{
781 /// Create a new connection builder with the given handler.
782 pub fn new_with(role: Host, handler: Handler) -> Self {
783 Self {
784 host: role,
785 name: None,
786 handler,
787 responder: NullRun,
788 protocol_mode: default_protocol_mode::<Host>(),
789 }
790 }
791}
792
793impl<
794 Host: Role,
795 Handler: HandleDispatchFrom<Host::Counterpart>,
796 Runner: RunWithConnectionTo<Host::Counterpart>,
797> Builder<Host, Handler, Runner>
798{
799 /// Set the "name" of this connection -- used only for debugging logs.
800 pub fn name(mut self, name: impl ToString) -> Self {
801 self.name = Some(name.to_string());
802 self
803 }
804
805 pub(crate) fn v1_agent(mut self) -> Self {
806 self.protocol_mode = ProtocolMode::v1_agent();
807 self
808 }
809
810 pub(crate) fn v1_client(mut self) -> Self {
811 self.protocol_mode = ProtocolMode::v1_client();
812 self
813 }
814
/// Switch this builder to the v2 agent protocol mode.
#[cfg(feature = "unstable_protocol_v2")]
pub(crate) fn v2_agent(self) -> Self {
    Self {
        protocol_mode: ProtocolMode::v2_agent(),
        ..self
    }
}
820
/// Switch this builder to the v2 client protocol mode.
#[cfg(feature = "unstable_protocol_v2")]
pub(crate) fn v2_client(self) -> Self {
    Self {
        protocol_mode: ProtocolMode::v2_client(),
        ..self
    }
}
826
827 /// Merge another [`Builder`] into this one.
828 ///
829 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
830 /// This is a low-level method that is not intended for general use.
831 pub fn with_connection_builder(
832 self,
833 other: Builder<
834 Host,
835 impl HandleDispatchFrom<Host::Counterpart>,
836 impl RunWithConnectionTo<Host::Counterpart>,
837 >,
838 ) -> Builder<
839 Host,
840 impl HandleDispatchFrom<Host::Counterpart>,
841 impl RunWithConnectionTo<Host::Counterpart>,
842 > {
843 let Builder {
844 name: other_name,
845 handler: other_handler,
846 responder: other_responder,
847 protocol_mode: other_protocol_mode,
848 host: _,
849 } = other;
850 Builder {
851 host: self.host,
852 name: self.name,
853 handler: ChainedHandler::new(
854 self.handler,
855 NamedHandler::new(other_name, other_handler),
856 ),
857 responder: ChainRun::new(self.responder, other_responder),
858 protocol_mode: self.protocol_mode.merge(other_protocol_mode),
859 }
860 }
861
862 /// Add a new [`HandleDispatchFrom`] to the chain.
863 ///
864 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
865 /// This is a low-level method that is not intended for general use.
866 pub fn with_handler(
867 self,
868 handler: impl HandleDispatchFrom<Host::Counterpart>,
869 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner> {
870 Builder {
871 host: self.host,
872 name: self.name,
873 handler: ChainedHandler::new(self.handler, handler),
874 responder: self.responder,
875 protocol_mode: self.protocol_mode,
876 }
877 }
878
879 /// Add a new [`RunWithConnectionTo`] to the chain.
880 pub fn with_responder<Run1>(
881 self,
882 responder: Run1,
883 ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
884 where
885 Run1: RunWithConnectionTo<Host::Counterpart>,
886 {
887 Builder {
888 host: self.host,
889 name: self.name,
890 handler: self.handler,
891 responder: ChainRun::new(self.responder, responder),
892 protocol_mode: self.protocol_mode,
893 }
894 }
895
896 /// Enqueue a task to run once the connection is actively serving traffic.
897 #[track_caller]
898 pub fn with_spawned<F, Fut>(
899 self,
900 task: F,
901 ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
902 where
903 F: FnOnce(ConnectionTo<Host::Counterpart>) -> Fut + Send,
904 Fut: Future<Output = Result<(), crate::Error>> + Send,
905 {
906 let location = Location::caller();
907 self.with_responder(SpawnedRun::new(location, task))
908 }
909
910 /// Register a handler for messages that can be either requests OR notifications.
911 ///
912 /// Use this when you want to handle an enum type that contains both request and
913 /// notification variants. Your handler receives a [`Dispatch<Req, Notif>`] which
914 /// is an enum with two variants:
915 ///
916 /// - `Dispatch::Request(request, responder)` - A request with its response context
917 /// - `Dispatch::Notification(notification)` - A notification
918 /// - `Dispatch::Response(result, router)` - A response to a request we sent
919 ///
920 /// # Example
921 ///
922 /// ```no_run
923 /// # use agent_client_protocol_test::*;
924 /// # use agent_client_protocol::Dispatch;
925 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
926 /// # let connection = mock_connection();
927 /// connection.on_receive_dispatch(async |message: Dispatch<MyRequest, StatusUpdate>, _cx| {
928 /// match message {
929 /// Dispatch::Request(req, responder) => {
930 /// // Handle request and send response
931 /// responder.respond(MyResponse { status: "ok".into() })
932 /// }
933 /// Dispatch::Notification(notif) => {
934 /// // Handle notification (no response needed)
935 /// Ok(())
936 /// }
937 /// Dispatch::Response(result, router) => {
938 /// // Forward response to its destination
939 /// router.respond_with_result(result)
940 /// }
941 /// }
942 /// }, agent_client_protocol::on_receive_dispatch!())
943 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
944 /// # Ok(())
945 /// # }
946 /// ```
947 ///
948 /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
949 /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
950 /// for handling requests or notifications separately.
951 ///
952 /// # Ordering
953 ///
954 /// This callback runs inside the dispatch loop and blocks further message processing
955 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
956 /// ordering guarantees and how to avoid deadlocks.
957 pub fn on_receive_dispatch<Req, Notif, F, T, ToFut>(
958 self,
959 op: F,
960 to_future_hack: ToFut,
961 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
962 where
963 Host::Counterpart: HasPeer<Host::Counterpart>,
964 Req: JsonRpcRequest,
965 Notif: JsonRpcNotification,
966 F: AsyncFnMut(
967 Dispatch<Req, Notif>,
968 ConnectionTo<Host::Counterpart>,
969 ) -> Result<T, crate::Error>
970 + Send,
971 T: IntoHandled<Dispatch<Req, Notif>>,
972 ToFut: Fn(
973 &mut F,
974 Dispatch<Req, Notif>,
975 ConnectionTo<Host::Counterpart>,
976 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
977 + Send
978 + Sync,
979 {
980 let handler = MessageHandler::new(
981 self.host.counterpart(),
982 self.host.counterpart(),
983 op,
984 to_future_hack,
985 );
986 self.with_handler(handler)
987 }
988
989 /// Register a handler for JSON-RPC requests of type `Req`.
990 ///
991 /// Your handler receives two arguments:
992 /// 1. The request (type `Req`)
993 /// 2. A [`Responder<R, Req::Response>`] for sending the response
994 ///
995 /// The request context allows you to:
996 /// - Send the response with [`Responder::respond`]
997 /// - Send notifications to the client with [`ConnectionTo::send_notification`]
998 /// - Send requests to the client with [`ConnectionTo::send_request`]
999 ///
1000 /// # Example
1001 ///
1002 /// ```ignore
1003 /// # use agent_client_protocol::UntypedRole;
1004 /// # use agent_client_protocol::{Builder};
1005 /// # use agent_client_protocol::schema::v1::{PromptRequest, PromptResponse, SessionNotification};
1006 /// # fn example<R: agent_client_protocol::Role>(connection: Builder<R, impl agent_client_protocol::HandleMessageAs<R>>) {
1007 /// connection.on_receive_request(async |request: PromptRequest, responder, cx| {
1008 /// // Send a notification while processing
1009 /// let notif: SessionNotification = todo!();
1010 /// cx.send_notification(notif)?;
1011 ///
1012 /// // Do some work...
1013 /// let result = todo!("process the prompt");
1014 ///
1015 /// // Send the response
1016 /// let response: PromptResponse = todo!();
1017 /// responder.respond(response)
1018 /// }, agent_client_protocol::on_receive_request!());
1019 /// # }
1020 /// ```
1021 ///
1022 /// # Type Parameter
1023 ///
1024 /// `Req` can be either a single request type or an enum of multiple request types.
1025 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
1026 ///
1027 /// # Ordering
1028 ///
1029 /// This callback runs inside the dispatch loop and blocks further message processing
1030 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1031 /// ordering guarantees and how to avoid deadlocks.
1032 pub fn on_receive_request<Req: JsonRpcRequest, F, T, ToFut>(
1033 self,
1034 op: F,
1035 to_future_hack: ToFut,
1036 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1037 where
1038 Host::Counterpart: HasPeer<Host::Counterpart>,
1039 F: AsyncFnMut(
1040 Req,
1041 Responder<Req::Response>,
1042 ConnectionTo<Host::Counterpart>,
1043 ) -> Result<T, crate::Error>
1044 + Send,
1045 T: IntoHandled<(Req, Responder<Req::Response>)>,
1046 ToFut: Fn(
1047 &mut F,
1048 Req,
1049 Responder<Req::Response>,
1050 ConnectionTo<Host::Counterpart>,
1051 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1052 + Send
1053 + Sync,
1054 {
1055 let handler = RequestHandler::new(
1056 self.host.counterpart(),
1057 self.host.counterpart(),
1058 op,
1059 to_future_hack,
1060 );
1061 self.with_handler(handler)
1062 }
1063
1064 /// Register a handler for JSON-RPC notifications of type `Notif`.
1065 ///
1066 /// Notifications are fire-and-forget messages that don't expect a response.
1067 /// Your handler receives:
1068 /// 1. The notification (type `Notif`)
1069 /// 2. A [`ConnectionTo<R>`] for sending messages to the other side
1070 ///
1071 /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
1072 /// but you can still send your own requests and notifications using the context.
1073 ///
1074 /// # Example
1075 ///
1076 /// ```no_run
1077 /// # use agent_client_protocol_test::*;
1078 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1079 /// # let connection = mock_connection();
1080 /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
1081 /// // Process the notification
1082 /// update_session_state(¬if)?;
1083 ///
1084 /// // Optionally send a notification back
1085 /// cx.send_notification(StatusUpdate {
1086 /// message: "Acknowledged".into(),
1087 /// })?;
1088 ///
1089 /// Ok(())
1090 /// }, agent_client_protocol::on_receive_notification!())
1091 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1092 /// # Ok(())
1093 /// # }
1094 /// ```
1095 ///
1096 /// # Type Parameter
1097 ///
1098 /// `Notif` can be either a single notification type or an enum of multiple notification types.
1099 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
1100 ///
1101 /// # Ordering
1102 ///
1103 /// This callback runs inside the dispatch loop and blocks further message processing
1104 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1105 /// ordering guarantees and how to avoid deadlocks.
1106 pub fn on_receive_notification<Notif, F, T, ToFut>(
1107 self,
1108 op: F,
1109 to_future_hack: ToFut,
1110 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1111 where
1112 Host::Counterpart: HasPeer<Host::Counterpart>,
1113 Notif: JsonRpcNotification,
1114 F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1115 T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1116 ToFut: Fn(
1117 &mut F,
1118 Notif,
1119 ConnectionTo<Host::Counterpart>,
1120 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1121 + Send
1122 + Sync,
1123 {
1124 let handler = NotificationHandler::new(
1125 self.host.counterpart(),
1126 self.host.counterpart(),
1127 op,
1128 to_future_hack,
1129 );
1130 self.with_handler(handler)
1131 }
1132
1133 /// Register a handler for messages from a specific peer.
1134 ///
1135 /// This is similar to [`on_receive_dispatch`](Self::on_receive_dispatch), but allows
1136 /// specifying the source peer explicitly. This is useful when receiving messages
1137 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
1138 /// envelopes when receiving from an agent via a proxy).
1139 ///
1140 /// For the common case of receiving from the default counterpart, use
1141 /// [`on_receive_dispatch`](Self::on_receive_dispatch) instead.
1142 ///
1143 /// # Ordering
1144 ///
1145 /// This callback runs inside the dispatch loop and blocks further message processing
1146 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1147 /// ordering guarantees and how to avoid deadlocks.
1148 pub fn on_receive_dispatch_from<
1149 Req: JsonRpcRequest,
1150 Notif: JsonRpcNotification,
1151 Peer: Role,
1152 F,
1153 T,
1154 ToFut,
1155 >(
1156 self,
1157 peer: Peer,
1158 op: F,
1159 to_future_hack: ToFut,
1160 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1161 where
1162 Host::Counterpart: HasPeer<Peer>,
1163 F: AsyncFnMut(
1164 Dispatch<Req, Notif>,
1165 ConnectionTo<Host::Counterpart>,
1166 ) -> Result<T, crate::Error>
1167 + Send,
1168 T: IntoHandled<Dispatch<Req, Notif>>,
1169 ToFut: Fn(
1170 &mut F,
1171 Dispatch<Req, Notif>,
1172 ConnectionTo<Host::Counterpart>,
1173 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1174 + Send
1175 + Sync,
1176 {
1177 let handler = MessageHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1178 self.with_handler(handler)
1179 }
1180
1181 /// Register a handler for JSON-RPC requests from a specific peer.
1182 ///
1183 /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
1184 /// specifying the source peer explicitly. This is useful when receiving messages
1185 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
1186 /// envelopes when receiving from an agent via a proxy).
1187 ///
1188 /// For the common case of receiving from the default counterpart, use
1189 /// [`on_receive_request`](Self::on_receive_request) instead.
1190 ///
1191 /// # Example
1192 ///
1193 /// ```ignore
1194 /// use agent_client_protocol::Agent;
1195 /// use agent_client_protocol::schema::v1::InitializeRequest;
1196 ///
1197 /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
1198 /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, responder, cx| {
1199 /// // Handle the request
1200 /// responder.respond(InitializeResponse::make())
1201 /// })
1202 /// ```
1203 ///
1204 /// # Ordering
1205 ///
1206 /// This callback runs inside the dispatch loop and blocks further message processing
1207 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1208 /// ordering guarantees and how to avoid deadlocks.
1209 pub fn on_receive_request_from<Req: JsonRpcRequest, Peer: Role, F, T, ToFut>(
1210 self,
1211 peer: Peer,
1212 op: F,
1213 to_future_hack: ToFut,
1214 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1215 where
1216 Host::Counterpart: HasPeer<Peer>,
1217 F: AsyncFnMut(
1218 Req,
1219 Responder<Req::Response>,
1220 ConnectionTo<Host::Counterpart>,
1221 ) -> Result<T, crate::Error>
1222 + Send,
1223 T: IntoHandled<(Req, Responder<Req::Response>)>,
1224 ToFut: Fn(
1225 &mut F,
1226 Req,
1227 Responder<Req::Response>,
1228 ConnectionTo<Host::Counterpart>,
1229 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1230 + Send
1231 + Sync,
1232 {
1233 let handler = RequestHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1234 self.with_handler(handler)
1235 }
1236
1237 /// Register a handler for JSON-RPC notifications from a specific peer.
1238 ///
1239 /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
1240 /// specifying the source peer explicitly. This is useful when receiving messages
1241 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
1242 /// envelopes when receiving from an agent via a proxy).
1243 ///
1244 /// For the common case of receiving from the default counterpart, use
1245 /// [`on_receive_notification`](Self::on_receive_notification) instead.
1246 ///
1247 /// # Ordering
1248 ///
1249 /// This callback runs inside the dispatch loop and blocks further message processing
1250 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1251 /// ordering guarantees and how to avoid deadlocks.
1252 pub fn on_receive_notification_from<Notif: JsonRpcNotification, Peer: Role, F, T, ToFut>(
1253 self,
1254 peer: Peer,
1255 op: F,
1256 to_future_hack: ToFut,
1257 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1258 where
1259 Host::Counterpart: HasPeer<Peer>,
1260 F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1261 T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1262 ToFut: Fn(
1263 &mut F,
1264 Notif,
1265 ConnectionTo<Host::Counterpart>,
1266 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1267 + Send
1268 + Sync,
1269 {
1270 let handler = NotificationHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1271 self.with_handler(handler)
1272 }
1273
1274 /// Add an MCP server that will be added to all new sessions that are proxied through this connection.
1275 ///
1276 /// Only applicable to proxies.
1277 pub fn with_mcp_server(
1278 self,
1279 mcp_server: McpServer<Host::Counterpart, impl RunWithConnectionTo<Host::Counterpart>>,
1280 ) -> Builder<
1281 Host,
1282 impl HandleDispatchFrom<Host::Counterpart>,
1283 impl RunWithConnectionTo<Host::Counterpart>,
1284 >
1285 where
1286 Host::Counterpart: HasPeer<Agent> + HasPeer<Client>,
1287 {
1288 let (handler, responder) = mcp_server.into_handler_and_responder();
1289 self.with_handler(handler).with_responder(responder)
1290 }
1291
1292 /// Run in server mode with the provided transport.
1293 ///
1294 /// This drives the connection by continuously processing messages from the transport
1295 /// and dispatching them to your registered handlers. The connection will run until:
1296 /// - The transport closes (e.g., EOF on byte streams)
1297 /// - An error occurs
1298 /// - One of your handlers returns an error
1299 ///
1300 /// The transport is responsible for serializing and deserializing [`RawJsonRpcMessage`]
1301 /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1302 ///
1303 /// Use this mode when you only need to respond to incoming messages and don't need
1304 /// to initiate your own requests. If you need to send requests to the other side,
1305 /// use [`connect_with`](Self::connect_with) instead.
1306 ///
1307 /// # Example: Byte Stream Transport
1308 ///
1309 /// ```no_run
1310 /// # use agent_client_protocol::UntypedRole;
1311 /// # use agent_client_protocol::{Builder};
1312 /// # use agent_client_protocol::Stdio;
1313 /// # use agent_client_protocol_test::*;
1314 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1315 /// let transport = Stdio::new();
1316 ///
1317 /// UntypedRole.builder()
1318 /// .on_receive_request(async |req: MyRequest, responder, cx| {
1319 /// responder.respond(MyResponse { status: "ok".into() })
1320 /// }, agent_client_protocol::on_receive_request!())
1321 /// .connect_to(transport)
1322 /// .await?;
1323 /// # Ok(())
1324 /// # }
1325 /// ```
1326 pub async fn connect_to(
1327 self,
1328 transport: impl ConnectTo<Host> + 'static,
1329 ) -> Result<(), crate::Error> {
1330 self.connect_with(transport, async move |_cx| future::pending().await)
1331 .await
1332 }
1333
1334 /// Run the connection until the provided closure completes.
1335 ///
1336 /// This drives the connection by:
1337 /// 1. Running your registered handlers in the background to process incoming messages
1338 /// 2. Executing your `main_fn` closure with a [`ConnectionTo<R>`] for sending requests/notifications
1339 ///
1340 /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1341 /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1342 ///
1343 /// Use this mode when you need to initiate communication (send requests/notifications)
1344 /// in addition to responding to incoming messages. For server-only mode where you just
1345 /// respond to messages, use [`connect_to`](Self::connect_to) instead.
1346 ///
1347 /// # Example
1348 ///
1349 /// ```no_run
1350 /// # use agent_client_protocol::UntypedRole;
1351 /// # use agent_client_protocol::{Builder};
1352 /// # use agent_client_protocol::ByteStreams;
1353 /// # use agent_client_protocol::schema::v1::InitializeRequest;
1354 /// # use agent_client_protocol::Stdio;
1355 /// # use agent_client_protocol_test::*;
1356 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1357 /// let transport = Stdio::new();
1358 ///
1359 /// UntypedRole.builder()
1360 /// .on_receive_request(async |req: MyRequest, responder, cx| {
1361 /// // Handle incoming requests in the background
1362 /// responder.respond(MyResponse { status: "ok".into() })
1363 /// }, agent_client_protocol::on_receive_request!())
1364 /// .connect_with(transport, async |cx| {
1365 /// // Initialize the protocol
1366 /// let init_response = cx.send_request(InitializeRequest::make())
1367 /// .block_task()
1368 /// .await?;
1369 ///
1370 /// // Send more requests...
1371 /// let result = cx.send_request(MyRequest {})
1372 /// .block_task()
1373 /// .await?;
1374 ///
1375 /// // When this closure returns, the connection shuts down
1376 /// Ok(())
1377 /// })
1378 /// .await?;
1379 /// # Ok(())
1380 /// # }
1381 /// ```
1382 ///
1383 /// # Parameters
1384 ///
1385 /// - `main_fn`: Your client logic. Receives a [`ConnectionTo<R>`] for sending messages.
1386 ///
1387 /// # Errors
1388 ///
1389 /// Returns an error if the connection closes before `main_fn` completes.
1390 pub async fn connect_with<R>(
1391 self,
1392 transport: impl ConnectTo<Host> + 'static,
1393 main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1394 ) -> Result<R, crate::Error> {
1395 let (_, future) = self.into_connection_and_future(transport, main_fn);
1396 future.await
1397 }
1398
/// Helper that returns a [`ConnectionTo<R>`] and a future that runs this connection until `main_fn` returns.
///
/// The returned [`ConnectionTo`] is a handle to the same connection that is
/// passed to `main_fn`. The transport's future is handed to
/// `connection.spawn` right away, but the outcome of that call is only
/// checked (and propagated as an error) when the returned future is first polled.
///
/// While it runs, the returned future drives four pieces together via
/// `futures::try_join!`: the outgoing protocol actor, the incoming protocol
/// actor (which owns the registered `handler`), the task actor, and the
/// registered `responder`. That combined background future and `main_fn`
/// are then handed to `crate::util::run_until`.
fn into_connection_and_future<R>(
    self,
    transport: impl ConnectTo<Host> + 'static,
    main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
) -> (
    ConnectionTo<Host::Counterpart>,
    impl Future<Output = Result<R, crate::Error>>,
) {
    let Self {
        name,
        handler,
        responder,
        host: me,
        protocol_mode,
    } = self;

    // Channels whose sending halves live in the `ConnectionTo` handle and
    // whose receiving halves are consumed by the actors started below.
    let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
    let (new_task_tx, new_task_rx) = mpsc::unbounded();
    let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
    let connection = ConnectionTo::new(
        me.counterpart(),
        outgoing_tx,
        new_task_tx,
        dynamic_handler_tx,
    );

    // Convert transport into server - this returns a channel for us to use
    // and a future that runs the transport
    let transport_component = crate::DynConnectTo::new(transport);
    let (transport_channel, transport_future) = transport_component.into_channel_and_future();
    // Called eagerly; the result is inspected inside the returned future.
    let spawn_result = connection.spawn(transport_future);

    // Destructure the channel endpoints
    let Channel {
        rx: transport_incoming_rx,
        tx: transport_outgoing_tx,
    } = transport_channel;

    // Links the outgoing actor (given `reply_tx`) to the incoming actor (given `reply_rx`).
    let (reply_tx, reply_rx) = mpsc::unbounded();
    let protocol_compat = ProtocolCompat::new(protocol_mode);

    let future = crate::util::instrument_with_connection_name(name, {
        // This clone moves into the async block; the original is returned to the caller.
        let connection = connection.clone();
        async move {
            // Surface a failure to spawn the transport future before doing anything else.
            let () = spawn_result?;

            let background = async {
                futures::try_join!(
                    // Protocol layer: OutgoingMessage -> RawJsonRpcMessage
                    outgoing_actor::outgoing_protocol_actor(
                        outgoing_rx,
                        // A clone is passed, so the original `reply_tx` stays owned by this future.
                        reply_tx.clone(),
                        transport_outgoing_tx,
                        protocol_compat.clone(),
                    ),
                    // Protocol layer: RawJsonRpcMessage -> handler/reply routing
                    incoming_actor::incoming_protocol_actor(
                        me.counterpart(),
                        &connection,
                        transport_incoming_rx,
                        dynamic_handler_rx,
                        reply_rx,
                        handler,
                        protocol_compat,
                    ),
                    task_actor::task_actor(new_task_rx, &connection),
                    responder.run_with_connection_to(connection.clone()),
                )?;
                Ok(())
            };

            // NOTE(review): `run_until` is defined elsewhere; per this function's
            // contract it presumably drives `background` until `main_fn` finishes — confirm.
            crate::util::run_until(Box::pin(background), Box::pin(main_fn(connection.clone())))
                .await
        }
    });

    (connection, future)
}
1478}
1479
/// Lets a fully configured [`Builder`] be used wherever a
/// `ConnectTo<R::Counterpart>` is expected.
impl<R, H, Run> ConnectTo<R::Counterpart> for Builder<R, H, Run>
where
    R: Role,
    H: HandleDispatchFrom<R::Counterpart> + 'static,
    Run: RunWithConnectionTo<R::Counterpart> + 'static,
{
    /// Connects this builder to `client` by delegating to the builder's own
    /// `connect_to`, treating `client` as the transport.
    async fn connect_to(self, client: impl ConnectTo<R>) -> Result<(), crate::Error> {
        // NOTE(review): the `Builder::connect_to` path is expected to pick the
        // inherent method rather than this trait method (which would recurse);
        // keep the call in this form unless that is re-verified.
        Builder::connect_to(self, client).await
    }
}
1490
/// The payload sent through the response oneshot channel.
///
/// Includes the response value and an optional ack channel for dispatch loop
/// synchronization.
///
/// `Debug` is implemented by hand: the ack channel is printed only as a
/// placeholder indicating whether it is present.
pub(crate) struct ResponsePayload {
    /// The response result - either the JSON value or an error.
    pub(crate) result: Result<serde_json::Value, crate::Error>,

    /// Optional acknowledgment channel for dispatch loop synchronization.
    ///
    /// When present, the receiver must send on this channel to signal that
    /// response processing is complete, allowing the dispatch loop to continue
    /// to the next message.
    ///
    /// This is `None` for error paths where the response is sent directly
    /// (e.g., when the outgoing channel is broken) rather than through the
    /// normal dispatch loop flow.
    pub(crate) ack_tx: Option<oneshot::Sender<()>>,
}
1510
1511impl std::fmt::Debug for ResponsePayload {
1512 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1513 f.debug_struct("ResponsePayload")
1514 .field("result", &self.result)
1515 .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1516 .finish()
1517 }
1518}
1519
/// Message sent to the incoming actor for reply subscription management.
///
/// `Debug` is implemented by hand and prints only the `id` and `method` of a
/// subscription.
enum ReplyMessage {
    /// Subscribe to receive a response for the given request id.
    /// When a response with this id arrives, it will be sent through the oneshot
    /// along with an ack channel that must be signaled when processing is complete.
    /// The method name is stored to allow routing responses through typed handlers.
    Subscribe {
        /// id of the request whose response is being awaited
        id: RequestId,

        /// id of the peer this request was sent to
        role_id: RoleId,

        /// (original) method of the request -- the actual request may have been transformed
        /// to a successor method, but this will reflect the method of the wrapped request
        method: String,

        /// oneshot through which the [`ResponsePayload`] is delivered to the subscriber
        sender: oneshot::Sender<ResponsePayload>,

        // NOTE(review): presumably disarms cancellation of the sent request once
        // its response is delivered; the type is defined outside this section — confirm.
        #[cfg(feature = "unstable_cancel_request")]
        cancellation_disarm: SentRequestCancellationDisarm,
    },
}
1542
1543impl std::fmt::Debug for ReplyMessage {
1544 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1545 match self {
1546 ReplyMessage::Subscribe { id, method, .. } => f
1547 .debug_struct("Subscribe")
1548 .field("id", id)
1549 .field("method", method)
1550 .finish(),
1551 }
1552 }
1553}
1554
/// A request-local marker that is set when the peer asks to cancel the request.
///
/// Request handlers can get this handle from [`Responder::cancellation`] and
/// use it from spawned work to stop long-running request processing
/// cooperatively.
///
/// Clones share the same underlying state through an [`Arc`], so a
/// cancellation is visible through every clone of the handle.
#[cfg(feature = "unstable_cancel_request")]
#[derive(Clone)]
pub struct RequestCancellation {
    /// State shared by all clones of this handle.
    state: Arc<RequestCancellationState>,
}
1565
/// Shared state behind a [`RequestCancellation`] handle.
#[cfg(feature = "unstable_cancel_request")]
struct RequestCancellationState {
    /// Set to `true` by the first `cancel` call; read by `is_cancelled`.
    cancelled: AtomicBool,
    /// Sending half of the wake-up oneshot. The first `cancel` call takes it
    /// out (leaving `None`) and sends on it.
    signal_tx: Mutex<Option<oneshot::Sender<()>>>,
    /// Shareable receiving side of the wake-up oneshot; `cancelled` clones
    /// and awaits it.
    signal_rx: future::Shared<BoxFuture<'static, ()>>,
}
1572
#[cfg(feature = "unstable_cancel_request")]
impl RequestCancellation {
    /// Create a marker that has not been cancelled yet.
    fn new() -> Self {
        let (signal_tx, signal_rx) = oneshot::channel();
        let state = RequestCancellationState {
            cancelled: AtomicBool::new(false),
            signal_tx: Mutex::new(Some(signal_tx)),
            signal_rx: signal_rx.map(|_| ()).boxed().shared(),
        };
        Self {
            state: Arc::new(state),
        }
    }

    /// Wait until the peer sends `$/cancel_request` for this request.
    ///
    /// If cancellation was already requested, this returns immediately.
    pub async fn cancelled(&self) {
        let signal = self.state.signal_rx.clone();
        signal.await;
    }

    /// Run request work until it completes or the peer asks to cancel it.
    ///
    /// If cancellation is requested first, this returns
    /// [`Error::request_cancelled`]. This is a convenience for request handlers
    /// that want to respond with the normal result or the standard
    /// cancellation error.
    ///
    /// When cancellation wins, `future` is dropped: work stops at its next
    /// await point, partial results are lost, and any cleanup must happen in
    /// `Drop` implementations. Handlers that need to flush partial results or
    /// run async cleanup should instead watch [`cancelled`](Self::cancelled)
    /// or poll [`is_cancelled`](Self::is_cancelled) from inside the work.
    ///
    /// [`Error::request_cancelled`]: crate::Error::request_cancelled
    pub async fn run_until_cancelled<T>(
        &self,
        future: impl std::future::Future<Output = Result<T, crate::Error>>,
    ) -> Result<T, crate::Error> {
        // Already cancelled: fail before the work is ever polled.
        if self.is_cancelled() {
            return Err(crate::Error::request_cancelled());
        }

        let work = pin!(future);
        let cancel_signal = pin!(self.cancelled());
        match future::select(work, cancel_signal).await {
            Either::Left((outcome, _)) => outcome,
            Either::Right(((), _)) => Err(crate::Error::request_cancelled()),
        }
    }

    /// Returns whether the peer has already requested cancellation.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        let flag = &self.state.cancelled;
        flag.load(Ordering::Acquire)
    }

    /// Mark the request as cancelled and wake everything waiting on it.
    ///
    /// Only the first call has an effect; later calls return right away.
    fn cancel(&self) {
        let already_cancelled = self.state.cancelled.swap(true, Ordering::AcqRel);
        if already_cancelled {
            return;
        }

        let pending_signal = {
            let mut guard = self
                .state
                .signal_tx
                .lock()
                .expect("request cancellation signal mutex poisoned");
            guard.take()
        };

        // Complete the oneshot outside the lock: it wakes waiters, and
        // arbitrary waker code must not observe the lock held.
        if let Some(tx) = pending_signal {
            let _ = tx.send(());
        }
    }
}
1647
#[cfg(feature = "unstable_cancel_request")]
impl Debug for RequestCancellation {
    /// Prints only the current `is_cancelled` flag; the output is marked
    /// non-exhaustive because the signalling machinery is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let is_cancelled = self.is_cancelled();
        let mut builder = f.debug_struct("RequestCancellation");
        builder.field("is_cancelled", &is_cancelled);
        builder.finish_non_exhaustive()
    }
}
1657
/// Per-request cancellation state tracked by [`RequestCancellationRegistry`].
///
/// The full [`RequestCancellation`] marker (with its wakeup machinery) is only
/// allocated once a handler asks for it via [`Responder::cancellation`]; until
/// then an incoming `$/cancel_request` just flips the entry to `Cancelled`.
/// This keeps the per-request cost of the registry to a single map entry.
///
/// A newly registered request starts out as `Armed`.
#[cfg(feature = "unstable_cancel_request")]
#[derive(Debug)]
enum RequestCancellationEntry {
    /// The request is in flight; no marker handed out, no cancellation yet.
    Armed,
    /// `$/cancel_request` arrived before a marker was handed out.
    Cancelled,
    /// A marker was handed out via [`Responder::cancellation`].
    Marker(RequestCancellation),
}
1674
/// A registered request's cancellation state, tagged with the generation of
/// its registration.
///
/// The generation distinguishes a registration from earlier ones that used
/// the same request ID, so that when a (protocol-violating) peer reuses the
/// ID of a request that is still in flight, the stale request's responder can
/// neither remove nor observe the cancellation state of the newer request.
#[cfg(feature = "unstable_cancel_request")]
#[derive(Debug)]
struct RequestCancellationSlot {
    /// Value of the registry's generation counter when this slot was registered.
    generation: u64,
    /// Current cancellation state of the request occupying this slot.
    entry: RequestCancellationEntry,
}
1688
/// Mutex-protected contents of a [`RequestCancellationRegistry`].
#[cfg(feature = "unstable_cancel_request")]
#[derive(Debug, Default)]
struct RequestCancellationRegistryInner {
    /// One slot per registered request, keyed by request ID. Registering an
    /// ID that is already present replaces the existing slot.
    slots: HashMap<RequestId, RequestCancellationSlot>,
    /// Generation to assign to the next registration; incremented on every
    /// `register` call.
    next_generation: u64,
}
1695
/// Cloneable handle to the cancellation state of registered requests.
///
/// Clones share the same mutex-protected state through an [`Arc`].
#[cfg(feature = "unstable_cancel_request")]
#[derive(Clone, Debug, Default)]
struct RequestCancellationRegistry {
    /// Shared, lock-protected registry contents.
    inner: Arc<Mutex<RequestCancellationRegistryInner>>,
}
1701
/// Field-less stand-in for the cancellation registry, compiled when the
/// `unstable_cancel_request` feature is disabled.
#[cfg(not(feature = "unstable_cancel_request"))]
#[derive(Clone, Debug, Default)]
struct RequestCancellationRegistry;
1705
/// Identifies one registration in a [`RequestCancellationRegistry`].
///
/// Produced by `RequestCancellationRegistry::register`.
#[cfg(feature = "unstable_cancel_request")]
#[derive(Debug)]
struct ResponderCancellation {
    /// ID of the registered request.
    id: RequestId,
    /// Generation assigned to this registration; tells it apart from other
    /// registrations that used the same ID.
    generation: u64,
    /// Handle to the registry the request was registered in.
    registry: RequestCancellationRegistry,
}
1713
/// Field-less stand-in for the responder's cancellation handle, compiled
/// when the `unstable_cancel_request` feature is disabled.
#[cfg(not(feature = "unstable_cancel_request"))]
#[derive(Debug)]
struct ResponderCancellation;
1717
1718#[cfg(feature = "unstable_cancel_request")]
1719impl RequestCancellationRegistry {
1720 fn new() -> Self {
1721 Self::default()
1722 }
1723
1724 fn register(&self, id: &RequestId) -> ResponderCancellation {
1725 let generation = {
1726 let mut inner = self
1727 .inner
1728 .lock()
1729 .expect("request cancellation registry mutex poisoned");
1730 let generation = inner.next_generation;
1731 inner.next_generation += 1;
1732 if inner
1733 .slots
1734 .insert(
1735 id.clone(),
1736 RequestCancellationSlot {
1737 generation,
1738 entry: RequestCancellationEntry::Armed,
1739 },
1740 )
1741 .is_some()
1742 {
1743 tracing::debug!(
1744 ?id,
1745 "peer reused the ID of a request that is still in flight"
1746 );
1747 }
1748 generation
1749 };
1750 ResponderCancellation {
1751 id: id.clone(),
1752 generation,
1753 registry: self.clone(),
1754 }
1755 }
1756
1757 /// Get the cancellation marker for a registered request, creating it on
1758 /// first use. Repeated calls return markers that share the same state.
1759 ///
1760 /// Exception: when the registration is stale (a protocol-violating peer
1761 /// reused this request ID and the slot now belongs to a newer request, or
1762 /// was already removed by it), every call returns a fresh *detached*
1763 /// marker. Detached markers can never fire, and detached markers from
1764 /// repeated calls do not share state with each other.
1765 fn marker(&self, id: &RequestId, generation: u64) -> RequestCancellation {
1766 let mut inner = self
1767 .inner
1768 .lock()
1769 .expect("request cancellation registry mutex poisoned");
1770 let Some(slot) = inner.slots.get_mut(id) else {
1771 // The slot lives as long as the responder that owns it, so this
1772 // is only reachable if the peer reused this request ID and the
1773 // newer request's responder already removed the replacement slot.
1774 // Hand out a detached marker rather than panicking.
1775 return RequestCancellation::new();
1776 };
1777 if slot.generation != generation {
1778 // The peer reused this request ID while the request was still in
1779 // flight, and the slot now belongs to the newer request. Hand the
1780 // stale responder a detached marker instead of cross-wiring the
1781 // two requests' cancellation states.
1782 return RequestCancellation::new();
1783 }
1784 let entry = &mut slot.entry;
1785 match entry {
1786 RequestCancellationEntry::Marker(marker) => marker.clone(),
1787 RequestCancellationEntry::Armed => {
1788 let marker = RequestCancellation::new();
1789 *entry = RequestCancellationEntry::Marker(marker.clone());
1790 marker
1791 }
1792 RequestCancellationEntry::Cancelled => {
1793 // No one can be waiting on a marker that did not exist yet,
1794 // so firing it while holding the registry lock is fine.
1795 let marker = RequestCancellation::new();
1796 marker.cancel();
1797 *entry = RequestCancellationEntry::Marker(marker.clone());
1798 marker
1799 }
1800 }
1801 }
1802
1803 fn cancel_if_requested(&self, dispatch: &Dispatch) -> Result<bool, crate::Error> {
1804 let Some(request_id) = cancellation_request_id(dispatch)? else {
1805 return Ok(false);
1806 };
1807 Ok(self.cancel(&request_id))
1808 }
1809
1810 /// Mark whichever request currently owns `request_id` as cancelled.
1811 fn cancel(&self, request_id: &RequestId) -> bool {
1812 let marker = {
1813 let mut inner = self
1814 .inner
1815 .lock()
1816 .expect("request cancellation registry mutex poisoned");
1817 let Some(slot) = inner.slots.get_mut(request_id) else {
1818 return false;
1819 };
1820 let entry = &mut slot.entry;
1821 match entry {
1822 RequestCancellationEntry::Marker(marker) => marker.clone(),
1823 RequestCancellationEntry::Cancelled => return true,
1824 RequestCancellationEntry::Armed => {
1825 *entry = RequestCancellationEntry::Cancelled;
1826 return true;
1827 }
1828 }
1829 };
1830
1831 // Fire the marker outside the registry lock: waking waiters runs
1832 // arbitrary waker code that must not observe the lock held.
1833 marker.cancel();
1834 true
1835 }
1836
1837 /// Remove the slot for `request_id`, but only if it still belongs to the
1838 /// registration identified by `generation`.
1839 fn remove(&self, request_id: &RequestId, generation: u64) {
1840 let mut inner = self
1841 .inner
1842 .lock()
1843 .expect("request cancellation registry mutex poisoned");
1844 if inner
1845 .slots
1846 .get(request_id)
1847 .is_some_and(|slot| slot.generation == generation)
1848 {
1849 inner.slots.remove(request_id);
1850 }
1851 }
1852}
1853
// Feature-disabled stubs: they keep the registry API available to callers
// while tracking nothing.
#[cfg(not(feature = "unstable_cancel_request"))]
impl RequestCancellationRegistry {
    /// Create the stateless stub registry.
    fn new() -> Self {
        Self
    }

    /// Return the empty stub handle; the request ID is ignored.
    #[expect(
        clippy::unused_self,
        reason = "feature-disabled stub mirrors the real registry API"
    )]
    fn register(&self, _id: &RequestId) -> ResponderCancellation {
        ResponderCancellation
    }

    /// Always returns `Ok(false)`: without the feature, no dispatch is
    /// treated as a cancellation request.
    #[expect(
        clippy::unused_self,
        clippy::unnecessary_wraps,
        reason = "feature-disabled stub mirrors the real registry API"
    )]
    fn cancel_if_requested(&self, _dispatch: &Dispatch) -> Result<bool, crate::Error> {
        Ok(false)
    }
}
1877
#[cfg(feature = "unstable_cancel_request")]
impl ResponderCancellation {
    /// Look up (or lazily create) the cancellation marker for this
    /// registration in the registry it came from.
    fn cancellation(&self) -> RequestCancellation {
        let Self {
            id,
            generation,
            registry,
        } = self;
        registry.marker(id, *generation)
    }
}
1884
#[cfg(feature = "unstable_cancel_request")]
impl Drop for ResponderCancellation {
    /// Ask the registry to remove this registration's slot; the registry
    /// only does so if the slot still carries this handle's generation.
    fn drop(&mut self) {
        let Self {
            id,
            generation,
            registry,
        } = &*self;
        registry.remove(id, *generation);
    }
}
1891
/// Extract the target request ID when `dispatch` is a cancellation
/// notification.
///
/// Anything other than a notification yields `Ok(None)`; notifications are
/// inspected by `cancellation_request_id_from_message`.
#[cfg(feature = "unstable_cancel_request")]
fn cancellation_request_id(dispatch: &Dispatch) -> Result<Option<RequestId>, crate::Error> {
    if let Dispatch::Notification(message) = dispatch {
        cancellation_request_id_from_message(message)
    } else {
        Ok(None)
    }
}
1899
/// Extract the target request ID from a notification if, after peeling any
/// successor envelopes, its method is the cancel-request method.
///
/// Returns `Ok(None)` for any other method, and an error when the method
/// matches but the notification cannot be parsed.
#[cfg(feature = "unstable_cancel_request")]
fn cancellation_request_id_from_message(
    message: &UntypedMessage,
) -> Result<Option<RequestId>, crate::Error> {
    use crate::schema::v1::CancelRequestNotification as CancelRequest;

    let (innermost_method, innermost_params) =
        peel_successor_envelopes(&message.method, &message.params);
    if !CancelRequest::matches_method(innermost_method) {
        return Ok(None);
    }

    let request_id = CancelRequest::parse_message(innermost_method, innermost_params)?.request_id;
    Ok(Some(request_id))
}
1912
1913/// Peel any [`SuccessorMessage`] envelopes off a notification by reference,
1914/// returning the innermost method and params.
1915///
1916/// This only peeks at the envelope's `method`/`params` fields instead of
1917/// deserializing the envelope, for two reasons:
1918///
1919/// - It avoids deep-cloning the params of every wrapped notification on the
1920/// hot dispatch path just to inspect the inner method name.
1921/// - It is deliberately lenient: a malformed envelope is left as-is here and
1922/// flows on to the handler chain, which is responsible for reporting it.
1923///
1924/// [`SuccessorMessage`]: crate::schema::SuccessorMessage
1925fn peel_successor_envelopes<'message>(
1926 mut method: &'message str,
1927 mut params: &'message serde_json::Value,
1928) -> (&'message str, &'message serde_json::Value) {
1929 while crate::schema::SuccessorMessage::<UntypedMessage>::matches_method(method) {
1930 let Some(inner_method) = params.get("method").and_then(serde_json::Value::as_str) else {
1931 break;
1932 };
1933 method = inner_method;
1934 params = params.get("params").unwrap_or(&serde_json::Value::Null);
1935 }
1936 (method, params)
1937}
1938
/// Whether a notification is a `$/cancel_request`, even when it is still
/// wrapped in `_proxy/successor` envelopes.
///
/// `$/cancel_request` is connection-scoped: its `requestId` was allocated on
/// the connection the notification arrived over and means nothing on any
/// other connection. Generic forwarding code (such as
/// [`ConnectionTo::send_proxied_message_to`]) uses this check to drop the raw
/// notification instead of tunneling it across a hop; the cancellation still
/// propagates because [`forward_response_to`](SentRequest::forward_response_to)
/// re-issues it with the forwarded request's own ID.
///
/// A notification whose method is not the successor envelope costs only a
/// method-name comparison. Successor-wrapped notifications are converted to
/// an untyped message so the envelope can be peeled; if that conversion
/// fails, the failure is logged at debug level and `false` is returned.
#[cfg(feature = "unstable_cancel_request")]
#[must_use]
pub fn is_cancel_request_notification<N: JsonRpcNotification>(notification: &N) -> bool {
    use crate::schema::v1::CancelRequestNotification as CancelRequest;

    let outer_method = notification.method();
    if CancelRequest::matches_method(outer_method) {
        return true;
    }

    let is_wrapped = crate::schema::SuccessorMessage::<UntypedMessage>::matches_method(outer_method);
    if !is_wrapped {
        return false;
    }

    let untyped = match notification.to_untyped_message() {
        Ok(untyped) => untyped,
        Err(error) => {
            tracing::debug!(
                ?error,
                "failed to inspect successor-wrapped notification for cancellation"
            );
            return false;
        }
    };
    let (innermost_method, _) = peel_successor_envelopes(&untyped.method, &untyped.params);
    CancelRequest::matches_method(innermost_method)
}
1978
1979/// Whether the dispatch is a protocol-level (`$/`-prefixed) notification,
1980/// possibly wrapped in a [`SuccessorMessage`] envelope.
1981///
1982/// Unhandled protocol-level notifications are ignored rather than rejected
1983/// with a method-not-found error. This is deliberately *not* feature-gated:
1984/// protocol-level notifications are optional by design, so a peer that sends
1985/// `$/cancel_request` must be able to interoperate with an SDK built without
1986/// `unstable_cancel_request` (which simply won't act on it).
1987///
1988/// A handler that explicitly declines with `retry: true` takes precedence
1989/// over this fallback: the notification is queued for newly registered
1990/// dynamic handlers like any other retried message.
1991///
1992/// [`SuccessorMessage`]: crate::schema::SuccessorMessage
1993fn is_protocol_level_notification(dispatch: &Dispatch) -> bool {
1994 let Dispatch::Notification(message) = dispatch else {
1995 return false;
1996 };
1997 let (method, _params) = peel_successor_envelopes(&message.method, &message.params);
1998 method.starts_with("$/")
1999}
2000
/// Messages sent to be serialized over the transport.
#[derive(Debug)]
enum OutgoingMessage {
    /// Send a request to the server.
    Request {
        /// ID assigned to this request (generated by the sender).
        id: RequestId,

        /// The original method.
        method: String,

        /// The peer we sent this to.
        role_id: RoleId,

        /// The message to send; this may have a distinct method
        /// depending on the peer.
        untyped: UntypedMessage,

        /// Where to send the response when it arrives (includes ack channel).
        response_tx: oneshot::Sender<ResponsePayload>,

        /// Disarm handle taken from the request's `SentRequestCancellation`.
        #[cfg(feature = "unstable_cancel_request")]
        cancellation_disarm: SentRequestCancellationDisarm,
    },

    /// Send a notification to the server.
    Notification {
        /// The message to send; this may have a distinct method
        /// depending on the peer.
        untyped: UntypedMessage,
    },

    /// Send a response to a message from the server.
    Response {
        /// ID of the incoming request this response answers.
        id: RequestId,

        /// Method of the incoming request this response completes.
        method: String,

        /// The outcome to send back: a result value or an error.
        response: Result<serde_json::Value, crate::Error>,
    },

    /// Send a generalized error message.
    Error {
        /// The error to send.
        error: crate::Error,
    },
}
2046
/// Return type from JrHandler; indicates whether the request was handled or not.
#[must_use]
#[derive(Debug)]
pub enum Handled<T> {
    /// The message was handled.
    Yes,

    /// The message was not handled; returns the original value.
    ///
    /// If `retry` is true, the message is additionally requested to be
    /// queued and retried with dynamic handlers as they are added.
    No {
        /// The message to be passed to subsequent handlers
        /// (typically the original message, but it may have been
        /// mutated).
        message: T,

        /// If true, request the message to be queued and retried with
        /// dynamic handlers as they are added.
        ///
        /// This is used for managing session updates since the dynamic
        /// handler for a session cannot be added until the response to the
        /// new session request has been processed and there may be updates
        /// that get processed at the same time.
        retry: bool,
    },
}
2073
/// Trait for converting handler return values into [`Handled`].
///
/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
/// or an explicit `Handled<T>` value for more control over handler propagation.
pub trait IntoHandled<T> {
    /// Convert this value into a `Handled<T>`, consuming it.
    fn into_handled(self) -> Handled<T>;
}
2082
/// A handler that returns `()` is treated as having handled the message.
impl<T> IntoHandled<T> for () {
    /// Always yields `Handled::Yes`.
    fn into_handled(self) -> Handled<T> {
        Handled::Yes
    }
}
2088
/// An explicit `Handled<T>` is passed through unchanged.
impl<T> IntoHandled<T> for Handled<T> {
    /// Returns `self` as-is.
    fn into_handled(self) -> Handled<T> {
        self
    }
}
2094
/// Connection context for sending messages and spawning tasks.
///
/// This is the primary handle for interacting with the JSON-RPC connection from
/// within handler callbacks. You can use it to:
///
/// * Send requests and notifications to the other side
/// * Spawn concurrent tasks that run alongside the connection
/// * Respond to requests (via [`Responder`] which wraps this)
///
/// # Cloning
///
/// `ConnectionTo` is cheaply cloneable - all clones refer to the same underlying connection.
/// This makes it easy to share across async tasks.
///
/// # Event Loop and Concurrency
///
/// Handler callbacks run on the event loop, which means the connection cannot process new
/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
/// expensive or blocking work to concurrent tasks.
///
/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency) section
/// for more details.
#[derive(Clone, Debug)]
pub struct ConnectionTo<Counterpart: Role> {
    /// The role on the other side of this connection; returned (cloned) by
    /// `counterpart()` and used as the default peer.
    counterpart: Counterpart,
    /// Sender for outgoing messages (requests, notifications, errors).
    message_tx: OutgoingMessageTx,
    /// Sender used to submit tasks created by `spawn` / `spawn_connection`.
    task_tx: TaskTx,
    /// Sender used to add and remove dynamic handlers.
    dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
}
2124
2125impl<Counterpart: Role> ConnectionTo<Counterpart> {
    /// Assemble a connection handle from the counterpart role and the three
    /// channel senders (outgoing messages, tasks, dynamic-handler updates).
    fn new(
        counterpart: Counterpart,
        message_tx: mpsc::UnboundedSender<OutgoingMessage>,
        task_tx: mpsc::UnboundedSender<Task>,
        dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
    ) -> Self {
        Self {
            counterpart,
            message_tx,
            task_tx,
            dynamic_handler_tx,
        }
    }
2139
    /// Return the counterpart role this connection is talking to.
    ///
    /// The role is returned by value as a clone of the stored one.
    pub fn counterpart(&self) -> Counterpart {
        self.counterpart.clone()
    }
2144
2145 /// Spawns a task that will run so long as the JSON-RPC connection is being served.
2146 ///
2147 /// This is the primary mechanism for offloading expensive work from handler callbacks
2148 /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
2149 /// allowing the server to continue processing messages.
2150 ///
2151 /// # Event Loop
2152 ///
2153 /// Handler callbacks run on the event loop, which cannot process new messages while
2154 /// your handler is running. Use `spawn` for any expensive operations:
2155 ///
2156 /// ```no_run
2157 /// # use agent_client_protocol_test::*;
2158 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2159 /// # let connection = mock_connection();
2160 /// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
2161 /// // Clone cx for the spawned task
2162 /// cx.spawn({
2163 /// let connection = cx.clone();
2164 /// async move {
2165 /// let result = expensive_operation(&req.data).await?;
2166 /// connection.send_notification(ProcessComplete { result })?;
2167 /// Ok(())
2168 /// }
2169 /// })?;
2170 ///
2171 /// // Respond immediately
2172 /// responder.respond(ProcessResponse { result: "started".into() })
2173 /// }, agent_client_protocol::on_receive_request!())
2174 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2175 /// # Ok(())
2176 /// # }
2177 /// ```
2178 ///
2179 /// # Errors
2180 ///
2181 /// If the spawned task returns an error, the entire server will shut down.
2182 #[track_caller]
2183 pub fn spawn(
2184 &self,
2185 task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
2186 ) -> Result<(), crate::Error> {
2187 let location = std::panic::Location::caller();
2188 let task = task.into_future();
2189 Task::new(location, task).spawn(&self.task_tx)
2190 }
2191
2192 /// Spawn a JSON-RPC connection in the background and return a [`ConnectionTo`] for sending messages to it.
2193 ///
2194 /// This is useful for creating multiple connections that communicate with each other,
2195 /// such as implementing proxy patterns or connecting to multiple backend services.
2196 ///
2197 /// # Arguments
2198 ///
2199 /// - `builder`: The connection builder with handlers configured
2200 /// - `transport`: The transport component to connect to
2201 ///
2202 /// # Returns
2203 ///
2204 /// A `ConnectionTo` that you can use to send requests and notifications to the spawned connection.
2205 ///
2206 /// # Example: Proxying to a backend connection
2207 ///
2208 /// ```
2209 /// # use agent_client_protocol::UntypedRole;
2210 /// # use agent_client_protocol::{Builder, ConnectionTo};
2211 /// # use agent_client_protocol_test::*;
2212 /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2213 /// // Set up a backend connection builder
2214 /// let backend = UntypedRole.builder()
2215 /// .on_receive_request(async |req: MyRequest, responder, _cx| {
2216 /// responder.respond(MyResponse { status: "ok".into() })
2217 /// }, agent_client_protocol::on_receive_request!());
2218 ///
2219 /// // Spawn it and get a context to send requests to it
2220 /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
2221 ///
2222 /// // Now you can forward requests to the backend
2223 /// let response = backend_connection.send_request(MyRequest {}).block_task().await?;
2224 /// # Ok(())
2225 /// # }
2226 /// ```
2227 #[track_caller]
2228 pub fn spawn_connection<R: Role>(
2229 &self,
2230 builder: Builder<
2231 R,
2232 impl HandleDispatchFrom<R::Counterpart> + 'static,
2233 impl RunWithConnectionTo<R::Counterpart> + 'static,
2234 >,
2235 transport: impl ConnectTo<R> + 'static,
2236 ) -> Result<ConnectionTo<R::Counterpart>, crate::Error> {
2237 let (connection, future) =
2238 builder.into_connection_and_future(transport, |_| std::future::pending());
2239 Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
2240 Ok(connection)
2241 }
2242
2243 /// Send a request/notification and forward the response appropriately.
2244 ///
2245 /// The request context's response type matches the request's response type,
2246 /// enabling type-safe message forwarding.
2247 pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
2248 &self,
2249 message: Dispatch<Req, Notif>,
2250 ) -> Result<(), crate::Error>
2251 where
2252 Counterpart: HasPeer<Counterpart>,
2253 {
2254 self.send_proxied_message_to(self.counterpart(), message)
2255 }
2256
2257 /// Send a request/notification and forward the response appropriately.
2258 ///
2259 /// The request context's response type matches the request's response type,
2260 /// enabling type-safe message forwarding.
2261 ///
2262 /// When the `unstable_cancel_request` feature is enabled, `$/cancel_request`
2263 /// notifications are *not* forwarded: their `requestId` refers to a request
2264 /// on the connection they arrived over and would be meaningless to `peer`.
2265 /// Cancellation instead propagates hop by hop, because the responders
2266 /// passed to [`forward_response_to`](SentRequest::forward_response_to)
2267 /// observe it and re-issue the cancellation with the forwarded request's
2268 /// own ID.
2269 pub fn send_proxied_message_to<
2270 Peer: Role,
2271 Req: JsonRpcRequest<Response: Send>,
2272 Notif: JsonRpcNotification,
2273 >(
2274 &self,
2275 peer: Peer,
2276 message: Dispatch<Req, Notif>,
2277 ) -> Result<(), crate::Error>
2278 where
2279 Counterpart: HasPeer<Peer>,
2280 {
2281 match message {
2282 Dispatch::Request(request, responder) => self
2283 .send_request_to(peer, request)
2284 .forward_response_to(responder),
2285 Dispatch::Notification(notification) => {
2286 // `$/cancel_request` is connection-scoped: its `requestId` was
2287 // allocated on the connection the notification arrived over
2288 // and means nothing to `peer`. The cancellation has already
2289 // been recorded on this connection's responder markers, and
2290 // `forward_response_to` re-issues it for the forwarded request
2291 // with the correct per-hop ID, so drop the raw notification
2292 // instead of tunneling a meaningless ID across the hop.
2293 #[cfg(feature = "unstable_cancel_request")]
2294 if is_cancel_request_notification(¬ification) {
2295 tracing::debug!(
2296 "not forwarding hop-scoped `$/cancel_request` notification across proxy hop"
2297 );
2298 return Ok(());
2299 }
2300 self.send_notification_to(peer, notification)
2301 }
2302 Dispatch::Response(result, router) => {
2303 // Responses are forwarded directly to their destination
2304 router.respond_with_result(result)
2305 }
2306 }
2307 }
2308
2309 /// Send an outgoing request and return a [`SentRequest`] for handling the reply.
2310 ///
2311 /// The returned [`SentRequest`] provides methods for receiving the response without
2312 /// blocking the event loop:
2313 ///
2314 /// * [`on_receiving_result`](SentRequest::on_receiving_result) - Schedule
2315 /// a callback to run when the response arrives (doesn't block the event loop)
2316 /// * [`block_task`](SentRequest::block_task) - Block the current task until the response
2317 /// arrives (only safe in spawned tasks, not in handlers)
2318 ///
2319 /// # Anti-Footgun Design
2320 ///
2321 /// The API intentionally makes it difficult to block on the result directly to prevent
2322 /// the common mistake of blocking the event loop while waiting for a response:
2323 ///
2324 /// ```compile_fail
2325 /// # use agent_client_protocol_test::*;
2326 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2327 /// // ❌ This doesn't compile - prevents blocking the event loop
2328 /// let response = cx.send_request(MyRequest {}).await?;
2329 /// # Ok(())
2330 /// # }
2331 /// ```
2332 ///
2333 /// ```no_run
2334 /// # use agent_client_protocol_test::*;
2335 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2336 /// // ✅ Option 1: Schedule callback (safe in handlers)
2337 /// cx.send_request(MyRequest {})
2338 /// .on_receiving_result(async |result| {
2339 /// // Handle the response
2340 /// Ok(())
2341 /// })?;
2342 ///
2343 /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
2344 /// cx.spawn({
2345 /// let cx = cx.clone();
2346 /// async move {
2347 /// let response = cx.send_request(MyRequest {})
2348 /// .block_task()
2349 /// .await?;
2350 /// // Process response...
2351 /// Ok(())
2352 /// }
2353 /// })?;
2354 /// # Ok(())
2355 /// # }
2356 /// ```
2357 /// Send an outgoing request to the default counterpart peer.
2358 ///
2359 /// This is a convenience method that sends to the counterpart role `R`.
2360 /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
2361 pub fn send_request<Req: JsonRpcRequest>(&self, request: Req) -> SentRequest<Req::Response>
2362 where
2363 Counterpart: HasPeer<Counterpart>,
2364 {
2365 self.send_request_to(self.counterpart.clone(), request)
2366 }
2367
2368 /// Send an outgoing request to a specific peer.
2369 ///
2370 /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
2371 /// implementation before being sent.
2372 pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
2373 &self,
2374 peer: Peer,
2375 request: Req,
2376 ) -> SentRequest<Req::Response>
2377 where
2378 Counterpart: HasPeer<Peer>,
2379 {
2380 let method = request.method().to_string();
2381 let id = RequestId::Str(uuid::Uuid::new_v4().to_string());
2382 let (response_tx, response_rx) = oneshot::channel();
2383 let role_id = peer.role_id();
2384 let remote_style = self.counterpart.remote_style(peer);
2385 #[cfg(feature = "unstable_cancel_request")]
2386 let cancellation =
2387 SentRequestCancellation::new(self.message_tx.clone(), remote_style, id.clone());
2388 match remote_style.transform_outgoing_message(request) {
2389 Ok(untyped) => {
2390 // Transform the message for the target role
2391 let message = OutgoingMessage::Request {
2392 id: id.clone(),
2393 method: method.clone(),
2394 role_id,
2395 untyped,
2396 response_tx,
2397 #[cfg(feature = "unstable_cancel_request")]
2398 cancellation_disarm: cancellation.disarm_handle(),
2399 };
2400
2401 match self.message_tx.unbounded_send(message) {
2402 Ok(()) => (),
2403 Err(error) => {
2404 #[cfg(feature = "unstable_cancel_request")]
2405 cancellation.disarm();
2406
2407 let OutgoingMessage::Request {
2408 method,
2409 response_tx,
2410 ..
2411 } = error.into_inner()
2412 else {
2413 unreachable!();
2414 };
2415
2416 response_tx
2417 .send(ResponsePayload {
2418 result: Err(crate::util::internal_error(format!(
2419 "failed to send outgoing request `{method}"
2420 ))),
2421 ack_tx: None,
2422 })
2423 .unwrap();
2424 }
2425 }
2426 }
2427
2428 Err(err) => {
2429 #[cfg(feature = "unstable_cancel_request")]
2430 cancellation.disarm();
2431
2432 response_tx
2433 .send(ResponsePayload {
2434 result: Err(crate::util::internal_error(format!(
2435 "failed to create untyped request for `{method}`: {err}"
2436 ))),
2437 ack_tx: None,
2438 })
2439 .unwrap();
2440 }
2441 }
2442
2443 SentRequest::new(
2444 id,
2445 method.clone(),
2446 self.task_tx.clone(),
2447 response_rx,
2448 #[cfg(feature = "unstable_cancel_request")]
2449 cancellation,
2450 )
2451 .map(move |json| <Req::Response>::from_value(&method, json))
2452 }
2453
2454 /// Send an outgoing notification to the default counterpart peer (no reply expected).
2455 ///
2456 /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
2457 /// This method sends the notification immediately and returns.
2458 ///
2459 /// This is a convenience method that sends to the counterpart role `R`.
2460 /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
2461 ///
2462 /// ```no_run
2463 /// # use agent_client_protocol_test::*;
2464 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::Agent>) -> Result<(), agent_client_protocol::Error> {
2465 /// cx.send_notification(StatusUpdate {
2466 /// message: "Processing...".into(),
2467 /// })?;
2468 /// # Ok(())
2469 /// # }
2470 /// ```
2471 pub fn send_notification<N: JsonRpcNotification>(
2472 &self,
2473 notification: N,
2474 ) -> Result<(), crate::Error>
2475 where
2476 Counterpart: HasPeer<Counterpart>,
2477 {
2478 self.send_notification_to(self.counterpart.clone(), notification)
2479 }
2480
2481 /// Send an outgoing notification to a specific peer (no reply expected).
2482 ///
2483 /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
2484 /// implementation before being sent.
2485 pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
2486 &self,
2487 peer: Peer,
2488 notification: N,
2489 ) -> Result<(), crate::Error>
2490 where
2491 Counterpart: HasPeer<Peer>,
2492 {
2493 let remote_style = self.counterpart.remote_style(peer);
2494 tracing::debug!(
2495 role = std::any::type_name::<Counterpart>(),
2496 peer = std::any::type_name::<Peer>(),
2497 notification_type = std::any::type_name::<N>(),
2498 ?remote_style,
2499 original_method = notification.method(),
2500 "send_notification_to"
2501 );
2502 let transformed = remote_style.transform_outgoing_message(notification)?;
2503 tracing::debug!(
2504 transformed_method = %transformed.method,
2505 "send_notification_to transformed"
2506 );
2507 send_raw_message(
2508 &self.message_tx,
2509 OutgoingMessage::Notification {
2510 untyped: transformed,
2511 },
2512 )
2513 }
2514
2515 /// Send a `$/cancel_request` notification for an arbitrary request ID to
2516 /// the default counterpart peer.
2517 ///
2518 /// Prefer [`SentRequest::cancel`] when you have the request handle: it
2519 /// already knows the correct peer, request ID, and proxy wrapping. Use this
2520 /// low-level method only when implementing custom routing with a request ID
2521 /// that is valid on this connection.
2522 #[cfg(feature = "unstable_cancel_request")]
2523 pub fn send_cancel_request(
2524 &self,
2525 request_id: impl Into<crate::schema::v1::RequestId>,
2526 ) -> Result<(), crate::Error>
2527 where
2528 Counterpart: HasPeer<Counterpart>,
2529 {
2530 self.send_cancel_request_to(self.counterpart.clone(), request_id)
2531 }
2532
2533 /// Send a `$/cancel_request` notification for an arbitrary request ID to a
2534 /// specific peer.
2535 ///
2536 /// Prefer [`SentRequest::cancel`] when you have the request handle: it
2537 /// already knows the correct peer, request ID, and proxy wrapping. Use this
2538 /// low-level method only when implementing custom routing with a request ID
2539 /// that is valid on the target peer's connection.
2540 #[cfg(feature = "unstable_cancel_request")]
2541 pub fn send_cancel_request_to<Peer: Role>(
2542 &self,
2543 peer: Peer,
2544 request_id: impl Into<crate::schema::v1::RequestId>,
2545 ) -> Result<(), crate::Error>
2546 where
2547 Counterpart: HasPeer<Peer>,
2548 {
2549 self.send_notification_to(
2550 peer,
2551 crate::schema::v1::CancelRequestNotification::new(request_id),
2552 )
2553 }
2554
2555 /// Send an error notification (no reply expected).
2556 pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
2557 send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
2558 }
2559
2560 /// Register a dynamic message handler, used to intercept messages specific to a particular session
2561 /// or some similar modal thing.
2562 ///
2563 /// Dynamic message handlers are called first for every incoming message.
2564 ///
2565 /// If they decline to handle the message, then the message is passed to the regular registered handlers.
2566 ///
2567 /// The handler will stay registered until the returned registration guard is dropped.
2568 pub fn add_dynamic_handler(
2569 &self,
2570 handler: impl HandleDispatchFrom<Counterpart> + 'static,
2571 ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error> {
2572 let uuid = Uuid::new_v4();
2573 self.dynamic_handler_tx
2574 .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
2575 uuid,
2576 Box::new(handler),
2577 ))
2578 .map_err(crate::util::internal_error)?;
2579
2580 Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
2581 }
2582
2583 fn remove_dynamic_handler(&self, uuid: Uuid) {
2584 // Ignore errors
2585 drop(
2586 self.dynamic_handler_tx
2587 .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid)),
2588 );
2589 }
2590}
2591
/// Registration guard returned by [`ConnectionTo::add_dynamic_handler`].
///
/// Dropping the guard requests removal of the dynamic handler it was created
/// for; call [`run_indefinitely`](Self::run_indefinitely) to keep the handler
/// registered instead.
///
/// NOTE(review): the type is `Clone`, and every clone requests removal of the
/// same handler when dropped, so dropping any one clone unregisters the
/// handler even if other clones are still alive — confirm this is intended.
#[derive(Clone, Debug)]
pub struct DynamicHandlerRegistration<R: Role> {
    /// Identifier of the registered dynamic handler.
    uuid: Uuid,
    /// Connection used to send the removal request on drop.
    cx: ConnectionTo<R>,
}
2597
2598impl<R: Role> DynamicHandlerRegistration<R> {
2599 fn new(uuid: Uuid, cx: ConnectionTo<R>) -> Self {
2600 Self { uuid, cx }
2601 }
2602
2603 /// Prevents the dynamic handler from being removed when dropped.
2604 pub fn run_indefinitely(self) {
2605 std::mem::forget(self);
2606 }
2607}
2608
impl<R: Role> Drop for DynamicHandlerRegistration<R> {
    /// Request removal of the dynamic handler this guard was created for.
    fn drop(&mut self) {
        self.cx.remove_dynamic_handler(self.uuid);
    }
}
2614
/// The context to respond to an incoming request.
///
/// This context is provided to request handlers and serves a dual role:
///
/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
///    [`respond_with_result`](Self::respond_with_result) to send the response
/// 2. **Send other messages** - Use the [`ConnectionTo`] parameter passed to your
///    handler, which provides [`send_request`](`ConnectionTo::send_request`),
///    [`send_notification`](`ConnectionTo::send_notification`), and
///    [`spawn`](`ConnectionTo::spawn`)
///
/// The type parameter `T` is the payload type accepted by the `respond*`
/// methods; it defaults to [`serde_json::Value`]. Every `respond*` method
/// consumes the responder, so a given responder can answer at most once.
///
/// # Example
///
/// ```no_run
/// # use agent_client_protocol_test::*;
/// # async fn example() -> Result<(), agent_client_protocol::Error> {
/// # let connection = mock_connection();
/// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
///     // Send a notification while processing
///     cx.send_notification(StatusUpdate {
///         message: "processing".into(),
///     })?;
///
///     // Do some work...
///     let result = process(&req.data)?;
///
///     // Respond to the request
///     responder.respond(ProcessResponse { result })
/// }, agent_client_protocol::on_receive_request!())
/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
/// # Ok(())
/// # }
/// ```
///
/// # Event Loop Considerations
///
/// Like all handlers, request handlers run on the event loop. Use
/// [`spawn`](ConnectionTo::spawn) for expensive operations to avoid blocking
/// the connection.
///
/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency)
/// section for more details.
#[must_use]
pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
    /// The method of the request.
    method: String,

    /// The `id` of the message we are replying to.
    id: RequestId,

    /// Request-local cancellation state, obtained by registering the request
    /// `id` with the cancellation registry when the responder is created.
    cancellation: ResponderCancellation,

    /// Function to send the response to its destination.
    ///
    /// The base closure (built in `Responder::new`) wraps the result in an
    /// `OutgoingMessage::Response` and passes it to `send_raw_message`.
    /// `wrap_params` layers conversion closures on top of it, so a typed
    /// responder converts its payload before the base closure runs.
    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
}
2674
2675impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
2676 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2677 f.debug_struct("Responder")
2678 .field("method", &self.method)
2679 .field("id", &self.id)
2680 .field("response_type", &std::any::type_name::<T>())
2681 .finish_non_exhaustive()
2682 }
2683}
2684
2685impl Responder<serde_json::Value> {
2686 /// Create a new request context for an incoming request.
2687 ///
2688 /// The response will be serialized to JSON and sent over the wire.
2689 fn new(
2690 message_tx: OutgoingMessageTx,
2691 method: String,
2692 id: RequestId,
2693 cancellation_registry: &RequestCancellationRegistry,
2694 ) -> Self {
2695 let id_clone = id.clone();
2696 let method_clone = method.clone();
2697 let cancellation = cancellation_registry.register(&id);
2698 Self {
2699 method,
2700 id,
2701 cancellation,
2702 send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
2703 send_raw_message(
2704 &message_tx,
2705 OutgoingMessage::Response {
2706 id: id_clone,
2707 method: method_clone,
2708 response,
2709 },
2710 )
2711 }),
2712 }
2713 }
2714
2715 /// Cast this request context to a different response type.
2716 ///
2717 /// The provided type `T` will be serialized to JSON before sending.
2718 pub fn cast<T: JsonRpcResponse>(self) -> Responder<T> {
2719 self.wrap_params(move |method, value| match value {
2720 Ok(value) => T::into_json(value, method),
2721 Err(e) => Err(e),
2722 })
2723 }
2724}
2725
2726impl<T: JsonRpcResponse> Responder<T> {
2727 /// Method of the incoming request
2728 #[must_use]
2729 pub fn method(&self) -> &str {
2730 &self.method
2731 }
2732
2733 /// ID of the incoming request/response as a JSON value
2734 #[must_use]
2735 pub fn id(&self) -> serde_json::Value {
2736 crate::util::id_to_json(&self.id)
2737 }
2738
2739 /// Returns the cancellation marker for this request.
2740 ///
2741 /// The marker is set when the peer sends `$/cancel_request` for this
2742 /// request's JSON-RPC ID. Cancellation is cooperative: handlers should use
2743 /// the marker to stop long-running work and then decide whether to respond
2744 /// with [`Error::request_cancelled`] or partial data.
2745 ///
2746 /// [`Error::request_cancelled`]: crate::Error::request_cancelled
2747 #[cfg(feature = "unstable_cancel_request")]
2748 #[must_use]
2749 pub fn cancellation(&self) -> RequestCancellation {
2750 self.cancellation.cancellation()
2751 }
2752
2753 /// Convert to a `Responder` that expects a JSON value
2754 /// and which checks (dynamically) that the JSON value it receives
2755 /// can be converted to `T`.
2756 pub fn erase_to_json(self) -> Responder<serde_json::Value> {
2757 self.wrap_params(|method, value| T::from_value(method, value?))
2758 }
2759
2760 /// Return a new Responder with a different method name.
2761 pub fn wrap_method(self, method: String) -> Responder<T> {
2762 Responder {
2763 method,
2764 id: self.id,
2765 cancellation: self.cancellation,
2766 send_fn: self.send_fn,
2767 }
2768 }
2769
2770 /// Return a new Responder that expects a response of type U.
2771 ///
2772 /// `wrap_fn` will be invoked with the method name and the result to transform
2773 /// type `U` into type `T` before sending.
2774 pub fn wrap_params<U: JsonRpcResponse>(
2775 self,
2776 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2777 ) -> Responder<U> {
2778 let method = self.method.clone();
2779 Responder {
2780 method: self.method,
2781 id: self.id,
2782 cancellation: self.cancellation,
2783 send_fn: Box::new(move |input: Result<U, crate::Error>| {
2784 let t_value = wrap_fn(&method, input);
2785 (self.send_fn)(t_value)
2786 }),
2787 }
2788 }
2789
2790 /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
2791 pub fn respond_with_result(
2792 self,
2793 response: Result<T, crate::Error>,
2794 ) -> Result<(), crate::Error> {
2795 tracing::debug!(id = ?self.id, "respond called");
2796 (self.send_fn)(response)
2797 }
2798
2799 /// Respond to the JSON-RPC request with a value.
2800 pub fn respond(self, response: T) -> Result<(), crate::Error> {
2801 self.respond_with_result(Ok(response))
2802 }
2803
2804 /// Respond to the JSON-RPC request with an internal error containing a message.
2805 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2806 self.respond_with_error(crate::util::internal_error(message))
2807 }
2808
2809 /// Respond to the JSON-RPC request with an error.
2810 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2811 tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2812 self.respond_with_result(Err(error))
2813 }
2814}
2815
/// Context for handling an incoming JSON-RPC response.
///
/// This is the response-side counterpart to [`Responder`]. While `Responder` handles
/// incoming requests (where you send a response over the wire), `ResponseRouter` handles
/// incoming responses (where you route the response to a local task waiting for it).
///
/// Both are fundamentally "sinks" that push the message through a `send_fn`, but they
/// represent different points in the message lifecycle and carry different metadata.
///
/// The type parameter `T` is the payload type accepted by the `respond*`
/// methods; it defaults to [`serde_json::Value`].
///
/// # Drop Behavior
///
/// Dropping a `ResponseRouter` without responding (for example, from a
/// dispatch handler that claims a [`Dispatch::Response`]) discards the
/// response: the local awaiter observes the response as never received. The
/// request still counts as settled — when the `unstable_cancel_request`
/// feature is enabled, routing a response this far disarms the originating
/// [`SentRequest`]'s drop-time auto-cancellation even if the router is never
/// invoked, since the peer has already answered.
#[must_use]
pub struct ResponseRouter<T: JsonRpcResponse = serde_json::Value> {
    /// The method of the original request.
    method: String,

    /// The `id` of the original request.
    id: RequestId,

    /// The RoleId to which the original request was sent
    /// (and hence from which the reply is expected).
    role_id: RoleId,

    /// Function to send the response to the waiting task.
    ///
    /// The base closure (built in `ResponseRouter::new`) pushes the result
    /// into a oneshot channel; `wrap_params` layers conversion closures on
    /// top of it.
    send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
}
2849
2850impl<T: JsonRpcResponse> std::fmt::Debug for ResponseRouter<T> {
2851 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2852 f.debug_struct("ResponseRouter")
2853 .field("method", &self.method)
2854 .field("id", &self.id)
2855 .field("response_type", &std::any::type_name::<T>())
2856 .finish_non_exhaustive()
2857 }
2858}
2859
2860impl ResponseRouter<serde_json::Value> {
2861 /// Create a new response context for routing a response to a local awaiter.
2862 ///
2863 /// When `respond_with_result` is called, the response is sent through the oneshot
2864 /// channel to the code that originally sent the request. If that receiver was
2865 /// dropped, the response is discarded because there is no local awaiter left.
2866 pub(crate) fn new(
2867 method: String,
2868 id: RequestId,
2869 role_id: RoleId,
2870 sender: oneshot::Sender<ResponsePayload>,
2871 #[cfg(feature = "unstable_cancel_request")]
2872 cancellation_disarm: SentRequestCancellationDisarm,
2873 ) -> Self {
2874 let response_method = method.clone();
2875 let response_id = id.clone();
2876 // A response for the request reached this router, so the request is
2877 // settled from the peer's perspective and a `$/cancel_request` could
2878 // only ever be redundant. Disarm immediately so handlers may retain
2879 // the router without leaving auto-cancellation armed.
2880 #[cfg(feature = "unstable_cancel_request")]
2881 cancellation_disarm.disarm();
2882 Self {
2883 method,
2884 id,
2885 role_id,
2886 send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
2887 if sender
2888 .send(ResponsePayload {
2889 result: response,
2890 ack_tx: None,
2891 })
2892 .is_err()
2893 {
2894 tracing::debug!(
2895 method = %response_method,
2896 id = ?response_id,
2897 "dropped response because local receiver was gone"
2898 );
2899 }
2900 Ok(())
2901 }),
2902 }
2903 }
2904
2905 /// Cast this response context to a different response type.
2906 ///
2907 /// The provided type `T` will be serialized to JSON before sending.
2908 pub fn cast<T: JsonRpcResponse>(self) -> ResponseRouter<T> {
2909 self.wrap_params(move |method, value| match value {
2910 Ok(value) => T::into_json(value, method),
2911 Err(e) => Err(e),
2912 })
2913 }
2914}
2915
2916impl<T: JsonRpcResponse> ResponseRouter<T> {
2917 /// Method of the original request
2918 #[must_use]
2919 pub fn method(&self) -> &str {
2920 &self.method
2921 }
2922
2923 /// ID of the original request as a JSON value
2924 #[must_use]
2925 pub fn id(&self) -> serde_json::Value {
2926 crate::util::id_to_json(&self.id)
2927 }
2928
2929 /// The peer to which the original request was sent.
2930 ///
2931 /// This is the peer from which we expect to receive the response.
2932 #[must_use]
2933 pub fn role_id(&self) -> RoleId {
2934 self.role_id.clone()
2935 }
2936
2937 /// Convert to a `ResponseRouter` that expects a JSON value
2938 /// and which checks (dynamically) that the JSON value it receives
2939 /// can be converted to `T`.
2940 pub fn erase_to_json(self) -> ResponseRouter<serde_json::Value> {
2941 self.wrap_params(|method, value| T::from_value(method, value?))
2942 }
2943
2944 /// Return a new ResponseRouter that expects a response of type U.
2945 ///
2946 /// `wrap_fn` will be invoked with the method name and the result to transform
2947 /// type `U` into type `T` before sending.
2948 fn wrap_params<U: JsonRpcResponse>(
2949 self,
2950 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2951 ) -> ResponseRouter<U> {
2952 let method = self.method.clone();
2953 ResponseRouter {
2954 method: self.method,
2955 id: self.id,
2956 role_id: self.role_id,
2957 send_fn: Box::new(move |input: Result<U, crate::Error>| {
2958 let t_value = wrap_fn(&method, input);
2959 (self.send_fn)(t_value)
2960 }),
2961 }
2962 }
2963
2964 /// Complete the response by sending the result to the waiting task.
2965 pub fn respond_with_result(
2966 self,
2967 response: Result<T, crate::Error>,
2968 ) -> Result<(), crate::Error> {
2969 tracing::debug!(id = ?self.id, "response routed to awaiter");
2970 (self.send_fn)(response)
2971 }
2972
2973 /// Complete the response by sending a value to the waiting task.
2974 pub fn respond(self, response: T) -> Result<(), crate::Error> {
2975 self.respond_with_result(Ok(response))
2976 }
2977
2978 /// Complete the response by sending an internal error to the waiting task.
2979 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2980 self.respond_with_error(crate::util::internal_error(message))
2981 }
2982
2983 /// Complete the response by sending an error to the waiting task.
2984 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2985 tracing::debug!(id = ?self.id, ?error, "error routed to awaiter");
2986 self.respond_with_result(Err(error))
2987 }
2988}
2989
/// Common bounds for any JSON-RPC message.
///
/// Implementors must also be `Debug`, `Clone`, `Send` and `'static`, as
/// required by the supertrait bounds.
///
/// # Derive Macro
///
/// For simple message types, you can use the `JsonRpcRequest` or `JsonRpcNotification` derive macros
/// which will implement both `JsonRpcMessage` and the respective trait. See [`JsonRpcRequest`] and
/// [`JsonRpcNotification`] for examples.
pub trait JsonRpcMessage: 'static + Debug + Sized + Send + Clone {
    /// Check if this message type matches the given method name.
    ///
    /// This is an associated function (no `self`), so a type can be tested
    /// against a method name before any parsing is attempted.
    fn matches_method(method: &str) -> bool;

    /// The method name for the message.
    fn method(&self) -> &str;

    /// Convert this message into an untyped message.
    ///
    /// # Errors
    ///
    /// Returns an error if the message cannot be converted.
    fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;

    /// Parse this type from a method name and parameters.
    ///
    /// Returns an error if the method doesn't match or deserialization fails.
    /// Callers should use `matches_method` first to check if this type handles the method.
    fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error>;
}
3013
/// Defines the "payload" of a successful response to a JSON-RPC request.
///
/// Implementors must also be `Debug`, `Clone`, `Send` and `'static`, as
/// required by the supertrait bounds.
///
/// # Derive Macro
///
/// Use `#[derive(JsonRpcResponse)]` to automatically implement this trait.
/// The type must also implement `Clone`, because `Clone` is a supertrait:
///
/// ```ignore
/// use agent_client_protocol::JsonRpcResponse;
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcResponse)]
/// #[response(method = "_hello")]
/// struct HelloResponse {
///     greeting: String,
/// }
/// ```
pub trait JsonRpcResponse: 'static + Debug + Sized + Send + Clone {
    /// Convert this message into a JSON value.
    ///
    /// `method` is the name of the request method this response belongs to.
    fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;

    /// Parse a JSON value into the response type.
    ///
    /// `method` is the name of the request method this response belongs to.
    fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
}
3037
3038impl JsonRpcResponse for serde_json::Value {
3039 fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
3040 Ok(value)
3041 }
3042
3043 fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
3044 Ok(self)
3045 }
3046}
3047
/// A struct that represents a notification (JSON-RPC message that does not expect a response).
///
/// This is a marker trait: it declares no items of its own beyond those of
/// [`JsonRpcMessage`].
///
/// # Derive Macro
///
/// Use `#[derive(JsonRpcNotification)]` to automatically implement both `JsonRpcMessage` and `JsonRpcNotification`:
///
/// ```ignore
/// use agent_client_protocol::JsonRpcNotification;
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcNotification)]
/// #[notification(method = "_ping")]
/// struct PingNotification {
///     timestamp: u64,
/// }
/// ```
pub trait JsonRpcNotification: JsonRpcMessage {}
3065
/// A struct that represents a request (JSON-RPC message expecting a response).
///
/// # Derive Macro
///
/// Use `#[derive(JsonRpcRequest)]` to automatically implement both `JsonRpcMessage` and `JsonRpcRequest`.
/// The response type must also implement `Clone`, because `Clone` is a
/// supertrait of [`JsonRpcResponse`]:
///
/// ```ignore
/// use agent_client_protocol::{JsonRpcRequest, JsonRpcResponse};
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcRequest)]
/// #[request(method = "_hello", response = HelloResponse)]
/// struct HelloRequest {
///     name: String,
/// }
///
/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcResponse)]
/// struct HelloResponse {
///     greeting: String,
/// }
/// ```
pub trait JsonRpcRequest: JsonRpcMessage {
    /// The type of data expected in response.
    type Response: JsonRpcResponse;
}
3091
/// An enum capturing an in-flight request or notification, or an incoming
/// response to a request we sent.
/// In the case of a request, also includes the context used to respond to the request;
/// in the case of a response, also includes the router used to forward it.
///
/// Type parameters allow specifying the concrete request and notification types.
/// By default, both are `UntypedMessage` for dynamic dispatch.
/// The request context's response type matches the request's response type.
#[derive(Debug)]
pub enum Dispatch<Req: JsonRpcRequest = UntypedMessage, Notif: JsonRpcMessage = UntypedMessage> {
    /// Incoming request and the context where the response should be sent.
    Request(Req, Responder<Req::Response>),

    /// Incoming notification.
    Notification(Notif),

    /// Incoming response to a request we sent.
    ///
    /// The first field is the response result (success or error from the remote).
    /// The second field is the context for forwarding the response to its destination
    /// (typically a waiting oneshot channel).
    Response(
        Result<Req::Response, crate::Error>,
        ResponseRouter<Req::Response>,
    ),
}
3116
3117impl<Req: JsonRpcRequest, Notif: JsonRpcMessage> Dispatch<Req, Notif> {
3118 /// Map the request and notification types to new types.
3119 ///
3120 /// Note: Response variants are passed through unchanged since they don't
3121 /// contain a parseable message payload.
3122 pub fn map<Req1, Notif1>(
3123 self,
3124 map_request: impl FnOnce(Req, Responder<Req::Response>) -> (Req1, Responder<Req1::Response>),
3125 map_notification: impl FnOnce(Notif) -> Notif1,
3126 ) -> Dispatch<Req1, Notif1>
3127 where
3128 Req1: JsonRpcRequest<Response = Req::Response>,
3129 Notif1: JsonRpcMessage,
3130 {
3131 match self {
3132 Dispatch::Request(request, responder) => {
3133 let (new_request, new_responder) = map_request(request, responder);
3134 Dispatch::Request(new_request, new_responder)
3135 }
3136 Dispatch::Notification(notification) => {
3137 let new_notification = map_notification(notification);
3138 Dispatch::Notification(new_notification)
3139 }
3140 Dispatch::Response(result, router) => Dispatch::Response(result, router),
3141 }
3142 }
3143
3144 /// Respond to the message with an error.
3145 ///
3146 /// If this message is a request, this error becomes the reply to the request.
3147 ///
3148 /// If this message is a notification, the error is sent as a notification.
3149 ///
3150 /// If this message is a response, the error is forwarded to the waiting handler.
3151 pub fn respond_with_error<R: Role>(
3152 self,
3153 error: crate::Error,
3154 cx: ConnectionTo<R>,
3155 ) -> Result<(), crate::Error> {
3156 match self {
3157 Dispatch::Request(_, responder) => responder.respond_with_error(error),
3158 Dispatch::Notification(_) => cx.send_error_notification(error),
3159 Dispatch::Response(_, responder) => responder.respond_with_error(error),
3160 }
3161 }
3162
3163 /// Convert to a `Responder` that expects a JSON value
3164 /// and which checks (dynamically) that the JSON value it receives
3165 /// can be converted to `T`.
3166 ///
3167 /// Note: Response variants cannot be erased since their payload is already
3168 /// parsed. This returns an error for Response variants.
3169 pub fn erase_to_json(self) -> Result<Dispatch, crate::Error> {
3170 match self {
3171 Dispatch::Request(response, responder) => Ok(Dispatch::Request(
3172 response.to_untyped_message()?,
3173 responder.erase_to_json(),
3174 )),
3175 Dispatch::Notification(notification) => {
3176 Ok(Dispatch::Notification(notification.to_untyped_message()?))
3177 }
3178 Dispatch::Response(_, _) => Err(crate::util::internal_error(
3179 "cannot erase Response variant to JSON",
3180 )),
3181 }
3182 }
3183
3184 /// Convert the message in self to an untyped message.
3185 ///
3186 /// Note: Response variants don't have an untyped message representation.
3187 /// This returns an error for Response variants.
3188 pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
3189 match self {
3190 Dispatch::Request(request, _) => request.to_untyped_message(),
3191 Dispatch::Notification(notification) => notification.to_untyped_message(),
3192 Dispatch::Response(_, _) => Err(crate::util::internal_error(
3193 "Response variant has no untyped message representation",
3194 )),
3195 }
3196 }
3197
3198 /// Convert self to an untyped message context.
3199 ///
3200 /// Note: Response variants cannot be converted. This returns an error for Response variants.
3201 pub fn into_untyped_dispatch(self) -> Result<Dispatch, crate::Error> {
3202 match self {
3203 Dispatch::Request(request, responder) => Ok(Dispatch::Request(
3204 request.to_untyped_message()?,
3205 responder.erase_to_json(),
3206 )),
3207 Dispatch::Notification(notification) => {
3208 Ok(Dispatch::Notification(notification.to_untyped_message()?))
3209 }
3210 Dispatch::Response(_, _) => Err(crate::util::internal_error(
3211 "cannot convert Response variant to untyped message context",
3212 )),
3213 }
3214 }
3215
3216 /// Returns the request ID if this is a request or response, None if notification.
3217 pub fn id(&self) -> Option<serde_json::Value> {
3218 match self {
3219 Dispatch::Request(_, cx) => Some(cx.id()),
3220 Dispatch::Notification(_) => None,
3221 Dispatch::Response(_, cx) => Some(cx.id()),
3222 }
3223 }
3224
3225 /// Returns the method of the message.
3226 ///
3227 /// For requests and notifications, this is the method from the message payload.
3228 /// For responses, this is the method of the original request.
3229 pub fn method(&self) -> &str {
3230 match self {
3231 Dispatch::Request(msg, _) => msg.method(),
3232 Dispatch::Notification(msg) => msg.method(),
3233 Dispatch::Response(_, cx) => cx.method(),
3234 }
3235 }
3236}
3237
3238impl Dispatch {
3239 /// Attempts to parse `self` into a typed message context.
3240 ///
3241 /// # Returns
3242 ///
3243 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
3244 /// * `Ok(Err(self))` if not
3245 /// * `Err` if has the correct method for the given types but parsing fails
3246 #[tracing::instrument(skip(self), fields(Request = ?std::any::type_name::<Req>(), Notif = ?std::any::type_name::<Notif>()), level = "trace", ret)]
3247 pub(crate) fn into_typed_dispatch<Req: JsonRpcRequest, Notif: JsonRpcNotification>(
3248 self,
3249 ) -> Result<Result<Dispatch<Req, Notif>, Dispatch>, crate::Error> {
3250 tracing::debug!(
3251 message = ?self,
3252 "into_typed_dispatch"
3253 );
3254 match self {
3255 Dispatch::Request(message, responder) => {
3256 if Req::matches_method(&message.method) {
3257 match Req::parse_message(&message.method, &message.params) {
3258 Ok(req) => {
3259 tracing::trace!(?req, "parsed ok");
3260 Ok(Ok(Dispatch::Request(req, responder.cast())))
3261 }
3262 Err(err) => {
3263 tracing::trace!(?err, "parse error");
3264 Err(err)
3265 }
3266 }
3267 } else {
3268 tracing::trace!("method doesn't match");
3269 Ok(Err(Dispatch::Request(message, responder)))
3270 }
3271 }
3272
3273 Dispatch::Notification(message) => {
3274 if Notif::matches_method(&message.method) {
3275 match Notif::parse_message(&message.method, &message.params) {
3276 Ok(notif) => {
3277 tracing::trace!(?notif, "parse ok");
3278 Ok(Ok(Dispatch::Notification(notif)))
3279 }
3280 Err(err) => {
3281 tracing::trace!(?err, "parse error");
3282 Err(err)
3283 }
3284 }
3285 } else {
3286 tracing::trace!("method doesn't match");
3287 Ok(Err(Dispatch::Notification(message)))
3288 }
3289 }
3290
3291 Dispatch::Response(result, cx) => {
3292 let method = cx.method();
3293 if Req::matches_method(method) {
3294 // Parse the response result
3295 let typed_result = match result {
3296 Ok(value) => {
3297 match <Req::Response as JsonRpcResponse>::from_value(method, value) {
3298 Ok(parsed) => {
3299 tracing::trace!(?parsed, "parse ok");
3300 Ok(parsed)
3301 }
3302 Err(err) => {
3303 tracing::trace!(?err, "parse error");
3304 return Err(err);
3305 }
3306 }
3307 }
3308 Err(err) => {
3309 tracing::trace!("error, passthrough");
3310 Err(err)
3311 }
3312 };
3313 Ok(Ok(Dispatch::Response(typed_result, cx.cast())))
3314 } else {
3315 tracing::trace!("method doesn't match");
3316 Ok(Err(Dispatch::Response(result, cx)))
3317 }
3318 }
3319 }
3320 }
3321
3322 /// True if this message has a field with the given name.
3323 ///
3324 /// Returns `false` for Response variants.
3325 #[must_use]
3326 pub fn has_field(&self, field_name: &str) -> bool {
3327 self.message()
3328 .and_then(|m| m.params().get(field_name))
3329 .is_some()
3330 }
3331
3332 /// Returns true if this message has a session-id field.
3333 ///
3334 /// Returns `false` for Response variants.
3335 pub(crate) fn has_session_id(&self) -> bool {
3336 self.has_field("sessionId")
3337 }
3338
3339 /// Extract the ACP session-id from this message (if any).
3340 ///
3341 /// Returns `Ok(None)` for Response variants.
3342 pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
3343 let Some(message) = self.message() else {
3344 return Ok(None);
3345 };
3346 let Some(value) = message.params().get("sessionId") else {
3347 return Ok(None);
3348 };
3349 let session_id = serde_json::from_value(value.clone())?;
3350 Ok(Some(session_id))
3351 }
3352
3353 /// Try to parse this as a notification of the given type.
3354 ///
3355 /// # Returns
3356 ///
3357 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
3358 /// * `Ok(Err(self))` if not
3359 /// * `Err` if has the correct method for the given types but parsing fails
3360 pub fn into_notification<N: JsonRpcNotification>(
3361 self,
3362 ) -> Result<Result<N, Dispatch>, crate::Error> {
3363 match self {
3364 Dispatch::Notification(msg) => {
3365 if !N::matches_method(&msg.method) {
3366 return Ok(Err(Dispatch::Notification(msg)));
3367 }
3368 match N::parse_message(&msg.method, &msg.params) {
3369 Ok(n) => Ok(Ok(n)),
3370 Err(err) => Err(err),
3371 }
3372 }
3373 Dispatch::Request(..) | Dispatch::Response(..) => Ok(Err(self)),
3374 }
3375 }
3376
3377 /// Try to parse this as a request of the given type.
3378 ///
3379 /// # Returns
3380 ///
3381 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
3382 /// * `Ok(Err(self))` if not
3383 /// * `Err` if has the correct method for the given types but parsing fails
3384 pub fn into_request<Req: JsonRpcRequest>(
3385 self,
3386 ) -> Result<Result<(Req, Responder<Req::Response>), Dispatch>, crate::Error> {
3387 match self {
3388 Dispatch::Request(msg, responder) => {
3389 if !Req::matches_method(&msg.method) {
3390 return Ok(Err(Dispatch::Request(msg, responder)));
3391 }
3392 match Req::parse_message(&msg.method, &msg.params) {
3393 Ok(req) => Ok(Ok((req, responder.cast()))),
3394 Err(err) => Err(err),
3395 }
3396 }
3397 Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Err(self)),
3398 }
3399 }
3400}
3401
3402impl<M: JsonRpcRequest + JsonRpcNotification> Dispatch<M, M> {
3403 /// Returns the message payload for requests and notifications.
3404 ///
3405 /// Returns `None` for Response variants since they don't contain a message payload.
3406 pub fn message(&self) -> Option<&M> {
3407 match self {
3408 Dispatch::Request(msg, _) | Dispatch::Notification(msg) => Some(msg),
3409 Dispatch::Response(_, _) => None,
3410 }
3411 }
3412
3413 /// Map the request/notification message.
3414 ///
3415 /// Response variants pass through unchanged.
3416 pub(crate) fn try_map_message(
3417 self,
3418 map_message: impl FnOnce(M) -> Result<M, crate::Error>,
3419 ) -> Result<Dispatch<M, M>, crate::Error> {
3420 match self {
3421 Dispatch::Request(request, cx) => Ok(Dispatch::Request(map_message(request)?, cx)),
3422 Dispatch::Notification(notification) => {
3423 Ok(Dispatch::<M, M>::Notification(map_message(notification)?))
3424 }
3425 Dispatch::Response(result, cx) => Ok(Dispatch::Response(result, cx)),
3426 }
3427 }
3428}
3429
/// An incoming JSON message without any typing. Can be a request or a notification.
///
/// The message is just a method name plus its parameters as a raw JSON
/// value; the struct itself does not record whether it came from a request
/// or a notification.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UntypedMessage {
    /// The JSON-RPC method name
    pub method: String,
    /// The JSON-RPC parameters as a raw JSON value
    pub params: serde_json::Value,
}
3438
3439impl UntypedMessage {
3440 /// Returns an untyped message with the given method and parameters.
3441 pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
3442 let params = serde_json::to_value(params)?;
3443 Ok(Self {
3444 method: method.to_string(),
3445 params,
3446 })
3447 }
3448
3449 /// Returns the method name
3450 #[must_use]
3451 pub fn method(&self) -> &str {
3452 &self.method
3453 }
3454
3455 /// Returns the parameters as a JSON value
3456 #[must_use]
3457 pub fn params(&self) -> &serde_json::Value {
3458 &self.params
3459 }
3460
3461 /// Consumes this message and returns the method and params
3462 #[must_use]
3463 pub fn into_parts(self) -> (String, serde_json::Value) {
3464 (self.method, self.params)
3465 }
3466
3467 /// Convert `self` to a raw JSON-RPC message.
3468 pub(crate) fn into_raw_jsonrpc_message(
3469 self,
3470 id: Option<RequestId>,
3471 ) -> Result<RawJsonRpcMessage, crate::Error> {
3472 let Self { method, params } = self;
3473 match id {
3474 Some(id) => RawJsonRpcMessage::request(method, params, id),
3475 None => RawJsonRpcMessage::notification(method, params),
3476 }
3477 }
3478}
3479
3480impl JsonRpcMessage for UntypedMessage {
3481 fn matches_method(_method: &str) -> bool {
3482 // UntypedMessage matches any method - it's the untyped fallback
3483 true
3484 }
3485
3486 fn method(&self) -> &str {
3487 &self.method
3488 }
3489
3490 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
3491 Ok(self.clone())
3492 }
3493
3494 fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error> {
3495 UntypedMessage::new(method, params)
3496 }
3497}
3498
// An untyped request is answered with a raw JSON value, which is also the
// default payload type of `Responder` and `ResponseRouter`.
impl JsonRpcRequest for UntypedMessage {
    type Response = serde_json::Value;
}
3502
// Lets `UntypedMessage` fill the notification slot too, so it can be used as
// both type parameters of `Dispatch`.
impl JsonRpcNotification for UntypedMessage {}
3504
3505/// Represents a pending response of type `R` from an outgoing request.
3506///
3507/// Returned by [`ConnectionTo::send_request`], this type provides methods for handling
3508/// the response without blocking the event loop. The API is intentionally designed to make
3509/// it difficult to accidentally block.
3510///
3511/// # Anti-Footgun Design
3512///
3513/// You cannot directly `.await` a `SentRequest`. Instead, you must choose how to handle
3514/// the response:
3515///
3516/// ## Option 1: Schedule a Callback (Safe in Handlers)
3517///
3518/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
3519/// that runs when the response arrives. This doesn't block the event loop:
3520///
3521/// ```no_run
3522/// # use agent_client_protocol_test::*;
3523/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
3524/// cx.send_request(MyRequest {})
3525/// .on_receiving_result(async |result| {
3526/// match result {
3527/// Ok(response) => {
3528/// // Handle successful response
3529/// Ok(())
3530/// }
3531/// Err(error) => {
3532/// // Handle error
3533/// Err(error)
3534/// }
3535/// }
3536/// })?;
3537/// # Ok(())
3538/// # }
3539/// ```
3540///
3541/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
3542///
3543/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
3544/// in a spawned task (never in a handler):
3545///
3546/// ```no_run
3547/// # use agent_client_protocol_test::*;
3548/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
3549/// // ✅ Safe: Spawned task runs concurrently
3550/// cx.spawn({
3551/// let cx = cx.clone();
3552/// async move {
3553/// let response = cx.send_request(MyRequest {})
3554/// .block_task()
3555/// .await?;
3556/// // Process response...
3557/// Ok(())
3558/// }
3559/// })?;
3560/// # Ok(())
3561/// # }
3562/// ```
///
/// By contrast, calling `block_task` directly inside a handler deadlocks:
///
3564/// ```no_run
3565/// # use agent_client_protocol_test::*;
3566/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3567/// # let connection = mock_connection();
3568/// // ❌ NEVER do this in a handler - blocks the event loop!
3569/// connection.on_receive_request(async |req: MyRequest, responder, cx| {
3570/// let response = cx.send_request(MyRequest {})
3571/// .block_task() // This will deadlock!
3572/// .await?;
3573/// responder.respond(response)
3574/// }, agent_client_protocol::on_receive_request!())
3575/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3576/// # Ok(())
3577/// # }
3578/// ```
3579///
3580/// # Why This Design?
3581///
3582/// If you block the event loop while waiting for a response, the connection cannot process
3583/// the incoming response message, creating a deadlock. This API design prevents that footgun
3584/// by making blocking explicit and encouraging non-blocking patterns.
3585///
3586/// # Drop Behavior
3587///
3588/// By default, dropping a `SentRequest` without consuming it discards the
3589/// response when it arrives. When the `unstable_cancel_request` feature is
3590/// enabled, dropping a `SentRequest` before the SDK has received the response
3591/// additionally sends a `$/cancel_request` notification asking the peer to
3592/// cancel the request; requests whose eventual response should be ignored, but
3593/// which should keep running on the peer, should use [`detach`](Self::detach)
3594/// instead.
3595#[must_use = "dropping a SentRequest discards the response (and, with the \
3596 `unstable_cancel_request` feature, asks the peer to cancel the \
3597 request); consume it with `block_task`, `on_receiving_result`, \
3598 `forward_response_to`, or `detach`"]
pub struct SentRequest<T> {
    /// Id of the outgoing request; exposed (as a JSON value) through `id()`.
    id: RequestId,
    /// Method name of the outgoing request; also used in the
    /// "never received" error text.
    method: String,
    /// Task channel on which the response-consumption task is spawned.
    task_tx: TaskTx,
    /// Receives the [`ResponsePayload`] for this request. Awaiting it yields
    /// `oneshot::Canceled` if the sending side is dropped first.
    response_rx: oneshot::Receiver<ResponsePayload>,
    /// Converts the raw JSON result into `T`. `map` wraps this closure to
    /// build the conversion for the mapped type.
    to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
    /// Guard that sends `$/cancel_request` for this request at most once:
    /// via `cancel`, when forwarded from a source, or on drop unless it has
    /// been disarmed.
    #[cfg(feature = "unstable_cancel_request")]
    cancellation: SentRequestCancellation,
    /// Cancellation markers of other (incoming) requests whose cancellation
    /// should be forwarded to this request. See
    /// [`forward_cancellation_from`](Self::forward_cancellation_from).
    #[cfg(feature = "unstable_cancel_request")]
    cancellation_sources: Vec<RequestCancellation>,
}
3613
/// Cloneable handle to the "armed" flag of a `SentRequestCancellation`.
///
/// The flag starts out `true`. While it is `true` the owning
/// `SentRequestCancellation` may still send a `$/cancel_request`; once it is
/// `false` (after `disarm`, or after the single send attempt) nothing more
/// is sent. Clones share the same flag.
#[cfg(feature = "unstable_cancel_request")]
#[derive(Clone, Debug)]
pub(crate) struct SentRequestCancellationDisarm {
    /// `true` while a cancellation may still be sent.
    armed: Arc<AtomicBool>,
}
3619
#[cfg(feature = "unstable_cancel_request")]
impl SentRequestCancellationDisarm {
    /// Creates a handle whose flag is initially armed (`true`).
    fn new() -> Self {
        let armed = Arc::new(AtomicBool::new(true));
        Self { armed }
    }

    /// Clears the armed flag so that no cancellation is sent afterwards.
    ///
    /// Calling this more than once is harmless: it only stores `false`.
    fn disarm(&self) {
        let Self { armed } = self;
        armed.store(false, Ordering::Release);
    }
}
3632
/// Drop guard that sends `$/cancel_request` for one outgoing request.
///
/// A notification is sent at most once (by an explicit `send` or from
/// `Drop`) and only while the shared `disarm` flag is still armed.
#[cfg(feature = "unstable_cancel_request")]
struct SentRequestCancellation {
    /// Outgoing channel the cancellation notification is sent on.
    message_tx: OutgoingMessageTx,
    /// Applied to the notification before it is sent
    /// (`transform_outgoing_message`).
    remote_style: crate::role::RemoteStyle,
    /// Id of the request that the notification asks the peer to cancel.
    request_id: RequestId,
    /// Armed flag; shared with every handle returned by `disarm_handle`.
    disarm: SentRequestCancellationDisarm,
}
3640
#[cfg(feature = "unstable_cancel_request")]
impl SentRequestCancellation {
    /// Creates an armed cancellation guard for the request `request_id`.
    ///
    /// `message_tx` and `remote_style` are only stored so that a later
    /// `$/cancel_request` can be transformed and sent; nothing is sent here.
    fn new(
        message_tx: OutgoingMessageTx,
        remote_style: crate::role::RemoteStyle,
        request_id: RequestId,
    ) -> Self {
        let disarm = SentRequestCancellationDisarm::new();
        Self {
            request_id,
            remote_style,
            message_tx,
            disarm,
        }
    }

    /// Prevents this guard from sending a `$/cancel_request` from now on.
    fn disarm(&self) {
        let Self { disarm, .. } = self;
        disarm.disarm();
    }

    /// Returns a handle that shares this guard's armed flag.
    fn disarm_handle(&self) -> SentRequestCancellationDisarm {
        Clone::clone(&self.disarm)
    }

    /// Sends the `$/cancel_request` notification if the guard is still armed.
    ///
    /// The armed flag is cleared *before* the send is attempted, so at most
    /// one call ever tries to send; every later call returns `Ok(())`
    /// immediately, even if that one attempt failed.
    fn send(&self) -> Result<(), crate::Error> {
        let was_armed = self.disarm.armed.swap(false, Ordering::AcqRel);
        if !was_armed {
            return Ok(());
        }

        // Build the notification lazily: most requests are never cancelled,
        // so this avoids serializing a notification per outgoing request.
        let notification =
            crate::schema::v1::CancelRequestNotification::new(self.request_id.clone());
        let untyped = self.remote_style.transform_outgoing_message(notification)?;
        let message = OutgoingMessage::Notification { untyped };

        send_raw_message(&self.message_tx, message)
    }
}
3678
#[cfg(feature = "unstable_cancel_request")]
impl Drop for SentRequestCancellation {
    /// Best-effort auto-cancellation: tries to send the notification (a
    /// no-op when already disarmed) and only logs a failure, since `drop`
    /// cannot report one.
    fn drop(&mut self) {
        let Err(error) = self.send() else {
            return;
        };
        tracing::debug!(?error, "failed to auto-cancel dropped request");
    }
}
3687
#[cfg(feature = "unstable_cancel_request")]
impl Debug for SentRequestCancellation {
    /// Prints the request id, the remote style and the current armed state;
    /// the outgoing message channel is left out (non-exhaustive output).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SentRequestCancellation");
        out.field("request_id", &self.request_id);
        out.field("remote_style", &self.remote_style);
        out.field("armed", &self.disarm.armed.load(Ordering::Acquire));
        out.finish_non_exhaustive()
    }
}
3698
/// Await the response payload for an outgoing request, watching `sources` for
/// cancellation of the upstream requests it was registered with.
///
/// When any source reports cancellation, a `$/cancel_request` is forwarded to
/// the outgoing request (at most once, shared with [`SentRequest::cancel`] and
/// drop-time auto-cancellation), and the response is *still* awaited: the peer
/// always answers, with normal data or a cancellation error.
///
/// Watching is deliberately bounded by response arrival so that completed
/// requests do not leak waiters on markers that will never fire.
///
/// Returns whatever awaiting `response_rx` yields: the payload, or
/// `oneshot::Canceled` if the sending side was dropped. In both cases
/// `cancellation` is disarmed before returning.
#[cfg(feature = "unstable_cancel_request")]
async fn await_response_forwarding_cancellation(
    response_rx: oneshot::Receiver<ResponsePayload>,
    cancellation: &SentRequestCancellation,
    sources: &[RequestCancellation],
) -> Result<ResponsePayload, oneshot::Canceled> {
    // Failing to forward the cancellation must not abort the wait: the
    // response (normal data or a cancellation error) may still arrive and
    // must still be processed.
    let forward_cancellation = || {
        if let Err(error) = cancellation.send() {
            tracing::debug!(
                ?error,
                "failed to forward cancellation to downstream request"
            );
        }
    };

    let response = if sources.is_empty() {
        // Nothing to watch: plain wait. This branch also keeps an empty
        // iterator away from `select_all` below, which panics on one.
        response_rx.await
    } else if sources.iter().any(RequestCancellation::is_cancelled) {
        // A source was cancelled before we started waiting: forward right
        // away, then wait for the peer's answer.
        forward_cancellation();
        response_rx.await
    } else {
        // Race the response against the sources' cancellation signals.
        // NOTE(review): `signal_rx` is presumably a cloneable future that
        // resolves when the source is cancelled; confirm against
        // `RequestCancellation`.
        let cancelled = sources.iter().map(|source| source.state.signal_rx.clone());
        match future::select(future::select_all(cancelled), response_rx).await {
            // A source fired first; `select` hands back the still-pending
            // receiver so the response can be awaited after forwarding.
            Either::Left((_, response_rx)) => {
                forward_cancellation();
                response_rx.await
            }
            // The response won the race; the cancellation waiters are dropped.
            Either::Right((response, _)) => response,
        }
    };

    // The wait is over, so dropping `cancellation` afterwards must not send
    // a `$/cancel_request` (its `Drop` impl calls `send`).
    cancellation.disarm();
    response
}
3746
3747impl<T: Debug> Debug for SentRequest<T> {
3748 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3749 let mut debug = f.debug_struct("SentRequest");
3750 debug
3751 .field("id", &self.id)
3752 .field("method", &self.method)
3753 .field("task_tx", &self.task_tx)
3754 .field("response_rx", &self.response_rx);
3755 #[cfg(feature = "unstable_cancel_request")]
3756 debug
3757 .field("cancellation", &self.cancellation)
3758 .field("cancellation_sources", &self.cancellation_sources);
3759 debug.finish_non_exhaustive()
3760 }
3761}
3762
3763impl SentRequest<serde_json::Value> {
3764 fn new(
3765 id: RequestId,
3766 method: String,
3767 task_tx: mpsc::UnboundedSender<Task>,
3768 response_rx: oneshot::Receiver<ResponsePayload>,
3769 #[cfg(feature = "unstable_cancel_request")] cancellation: SentRequestCancellation,
3770 ) -> Self {
3771 Self {
3772 id,
3773 method,
3774 response_rx,
3775 task_tx,
3776 to_result: Box::new(Ok),
3777 #[cfg(feature = "unstable_cancel_request")]
3778 cancellation,
3779 #[cfg(feature = "unstable_cancel_request")]
3780 cancellation_sources: Vec::new(),
3781 }
3782 }
3783}
3784
impl<T> SentRequest<T> {
    /// Detach this request handle without waiting for its response.
    ///
    /// The response will be discarded when it arrives. When the
    /// `unstable_cancel_request` feature is enabled, this also disarms the
    /// drop-time automatic cancellation described in
    /// [Drop Behavior](Self#drop-behavior), so use it for requests whose
    /// eventual response should be ignored, but which should keep running on the
    /// peer. The peer is still expected to answer the JSON-RPC request
    /// eventually; use a notification instead when no response is expected at
    /// all.
    ///
    /// To ask the peer to stop the request, enable `unstable_cancel_request`
    /// and call `cancel` instead, or drop the handle while automatic
    /// cancellation is enabled.
    pub fn detach(self) {
        // `self` is dropped on return. With the flag cleared first, the
        // drop-time cancellation becomes a no-op; without the feature there
        // is nothing to disarm and the handle is simply dropped.
        #[cfg(feature = "unstable_cancel_request")]
        self.cancellation.disarm();
    }

    /// Send a `$/cancel_request` notification for this outgoing request.
    ///
    /// This uses the same peer and message wrapping that were used to send the
    /// original request, so it is the preferred way to cancel a [`SentRequest`]
    /// when the request handle is still available.
    ///
    /// At most one `$/cancel_request` is ever sent per request: the first
    /// `cancel` call sends it (and also prevents the drop-time automatic
    /// cancellation described in [Drop Behavior](Self#drop-behavior)), while
    /// later calls return `Ok(())` without sending anything. Likewise, once
    /// the SDK has routed the response to this handle, `cancel` becomes a
    /// no-op: there is nothing left to cancel.
    ///
    /// # Errors
    ///
    /// Errors are only reported by the call that attempts to send the
    /// notification: it fails if the notification cannot be transformed for
    /// the peer or cannot be handed to the outgoing message channel.
    #[cfg(feature = "unstable_cancel_request")]
    pub fn cancel(&self) -> Result<(), crate::Error> {
        self.cancellation.send()
    }

    /// Forward cancellation of another request to this one.
    ///
    /// When the request that `source` belongs to is cancelled by its peer,
    /// a `$/cancel_request` for *this* request is sent to its peer, using the
    /// same wrapping as the original request. The response is still awaited
    /// and delivered as usual (normal data or a cancellation error), so this
    /// composes with [`block_task`](Self::block_task) and
    /// [`on_receiving_result`](Self::on_receiving_result).
    ///
    /// This is the building block for proxies that forward a request with
    /// custom logic instead of [`forward_response_to`](Self::forward_response_to)
    /// (which wires this up automatically from its responder). Without it,
    /// custom forwarding *absorbs* cancellation: the upstream marker is still
    /// set, but nothing is sent downstream.
    ///
    /// ```
    /// # use agent_client_protocol::{ConnectionTo, Error, Responder, UntypedRole};
    /// # use agent_client_protocol_test::{MyRequest, MyResponse};
    /// # async fn example(request: MyRequest, responder: Responder<MyResponse>, backend: ConnectionTo<UntypedRole>) -> Result<(), Error> {
    /// backend
    ///     .send_request(request)
    ///     .forward_cancellation_from(responder.cancellation())
    ///     .on_receiving_result(async move |result| {
    ///         // Custom result handling, e.g. bookkeeping or rewriting.
    ///         responder.respond_with_result(result)
    ///     })?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// May be called multiple times; cancellation of any registered source
    /// triggers the forwarding (at most one `$/cancel_request` is ever sent
    /// per request). Sources are observed while the response is being
    /// awaited — that is, once the handle is consumed with
    /// [`block_task`](Self::block_task),
    /// [`on_receiving_result`](Self::on_receiving_result), or
    /// [`forward_response_to`](Self::forward_response_to); a source that was
    /// already cancelled by then is honored immediately.
    #[cfg(feature = "unstable_cancel_request")]
    pub fn forward_cancellation_from(mut self, source: RequestCancellation) -> Self {
        // Only records the source; nothing is watched until the handle is
        // consumed and the response is awaited.
        self.cancellation_sources.push(source);
        self
    }
}
3869
3870impl<T: JsonRpcResponse> SentRequest<T> {
3871 /// The id of the outgoing request.
3872 #[must_use]
3873 pub fn id(&self) -> serde_json::Value {
3874 crate::util::id_to_json(&self.id)
3875 }
3876
3877 /// The method of the request this is in response to.
3878 #[must_use]
3879 pub fn method(&self) -> &str {
3880 &self.method
3881 }
3882
3883 /// Create a new response that maps the result of the response to a new type.
3884 pub fn map<U>(
3885 self,
3886 map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
3887 ) -> SentRequest<U> {
3888 SentRequest {
3889 id: self.id,
3890 method: self.method,
3891 response_rx: self.response_rx,
3892 task_tx: self.task_tx,
3893 to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
3894 #[cfg(feature = "unstable_cancel_request")]
3895 cancellation: self.cancellation,
3896 #[cfg(feature = "unstable_cancel_request")]
3897 cancellation_sources: self.cancellation_sources,
3898 }
3899 }
3900
3901 /// Forward the response (success or error) to a request context when it arrives.
3902 ///
3903 /// This is a convenience method for proxying messages between connections. When the
3904 /// response arrives, it will be automatically sent to the provided request context,
3905 /// whether it's a successful response or an error.
3906 ///
3907 /// # Example: Proxying requests
3908 ///
3909 /// ```
3910 /// # use agent_client_protocol::UntypedRole;
3911 /// # use agent_client_protocol::{Builder, ConnectionTo};
3912 /// # use agent_client_protocol_test::*;
3913 /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
3914 /// // Set up backend connection builder
3915 /// let backend = UntypedRole.builder()
3916 /// .on_receive_request(async |req: MyRequest, responder, cx| {
3917 /// responder.respond(MyResponse { status: "ok".into() })
3918 /// }, agent_client_protocol::on_receive_request!());
3919 ///
3920 /// // Spawn backend and get a context to send to it
3921 /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
3922 ///
3923 /// // Set up proxy that forwards requests to backend
3924 /// UntypedRole.builder()
3925 /// .on_receive_request({
3926 /// let backend_connection = backend_connection.clone();
3927 /// async move |req: MyRequest, responder, cx| {
3928 /// // Forward the request to backend and proxy the response back
3929 /// backend_connection.send_request(req)
3930 /// .forward_response_to(responder)?;
3931 /// Ok(())
3932 /// }
3933 /// }, agent_client_protocol::on_receive_request!());
3934 /// # Ok(())
3935 /// # }
3936 /// ```
3937 ///
3938 /// # Type Safety
3939 ///
3940 /// The request context's response type must match the request's response type,
3941 /// ensuring type-safe message forwarding.
3942 ///
3943 /// # When to Use
3944 ///
3945 /// Use this when:
3946 /// - You're implementing a proxy or gateway pattern
3947 /// - You want to forward responses without processing them
3948 /// - The response types match between the outgoing request and incoming request
3949 ///
3950 /// This is equivalent to calling `on_receiving_result` and manually forwarding
3951 /// the result, with two proxy-specific additions:
3952 ///
3953 /// - If the pending response is dropped without ever being delivered (for
3954 /// example, the downstream connection closed), the incoming request is
3955 /// answered with an internal error instead of being left unanswered.
3956 /// - When the `unstable_cancel_request` feature is enabled and the peer
3957 /// cancels the incoming request, the cancellation is forwarded to the
3958 /// outgoing request, and the downstream response (normal data or a
3959 /// cancellation error) is still forwarded back. This is equivalent to
3960 /// registering the responder's marker with `forward_cancellation_from`.
3961 #[track_caller]
3962 pub fn forward_response_to(self, responder: Responder<T>) -> Result<(), crate::Error>
3963 where
3964 T: Send,
3965 {
3966 #[cfg(feature = "unstable_cancel_request")]
3967 let this = self.forward_cancellation_from(responder.cancellation());
3968 #[cfg(not(feature = "unstable_cancel_request"))]
3969 let this = self;
3970
3971 this.consume_with(async move |response| {
3972 // A response that was never delivered (outer `Err`, e.g. the
3973 // downstream connection closed) is forwarded as an error: the
3974 // incoming request must not be left unanswered.
3975 responder.respond_with_result(response.unwrap_or_else(Err))
3976 })
3977 }
3978
3979 /// Spawn the response-consumption task shared by
3980 /// [`on_receiving_result`](Self::on_receiving_result) and
3981 /// [`forward_response_to`](Self::forward_response_to).
3982 ///
3983 /// The task awaits the response (forwarding cancellation from registered
3984 /// sources while waiting, when the `unstable_cancel_request` feature is
3985 /// enabled), converts the payload, and invokes `handle` with the typed
3986 /// result (`Ok(Result<T, _>)`). The dispatch loop's ack, if any, is sent
3987 /// after `handle` completes.
3988 ///
3989 /// If the pending response is dropped without ever being delivered (for
3990 /// example, the connection closed), `handle` receives the outer `Err`
3991 /// describing the loss; there is no ack in that case.
3992 #[track_caller]
3993 fn consume_with<F>(
3994 self,
3995 handle: impl FnOnce(Result<Result<T, crate::Error>, crate::Error>) -> F + 'static + Send,
3996 ) -> Result<(), crate::Error>
3997 where
3998 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3999 T: Send,
4000 {
4001 let task_tx = self.task_tx.clone();
4002 let method = self.method;
4003 let response_rx = self.response_rx;
4004 let to_result = self.to_result;
4005 #[cfg(feature = "unstable_cancel_request")]
4006 let cancellation = self.cancellation;
4007 #[cfg(feature = "unstable_cancel_request")]
4008 let cancellation_sources = self.cancellation_sources;
4009 let location = Location::caller();
4010
4011 Task::new(location, async move {
4012 #[cfg(feature = "unstable_cancel_request")]
4013 let response = await_response_forwarding_cancellation(
4014 response_rx,
4015 &cancellation,
4016 &cancellation_sources,
4017 )
4018 .await;
4019 #[cfg(not(feature = "unstable_cancel_request"))]
4020 let response = response_rx.await;
4021
4022 match response {
4023 Ok(ResponsePayload { result, ack_tx }) => {
4024 // Convert the result using to_result for Ok values
4025 let typed_result = match result {
4026 Ok(json_value) => to_result(json_value),
4027 Err(err) => Err(err),
4028 };
4029
4030 let outcome = handle(Ok(typed_result)).await;
4031
4032 // Ack AFTER the handler completes - this is the key
4033 // difference from block_task. The dispatch loop waits for
4034 // this ack.
4035 if let Some(tx) = ack_tx {
4036 let _ = tx.send(());
4037 }
4038
4039 outcome
4040 }
4041 Err(err) => {
4042 handle(Err(crate::util::internal_error(format!(
4043 "response to `{method}` never received: {err}"
4044 ))))
4045 .await
4046 }
4047 }
4048 })
4049 .spawn(&task_tx)
4050 }
4051
4052 /// Block the current task until the response is received.
4053 ///
4054 /// **Warning:** This method blocks the current async task. It is **only safe** to use
4055 /// in spawned tasks created with [`ConnectionTo::spawn`]. Using it directly in a
4056 /// handler callback will deadlock the connection.
4057 ///
4058 /// # Safe Usage (in spawned tasks)
4059 ///
4060 /// ```no_run
4061 /// # use agent_client_protocol_test::*;
4062 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
4063 /// # let connection = mock_connection();
4064 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
4065 /// // Spawn a task to handle the request
4066 /// cx.spawn({
4067 /// let connection = cx.clone();
4068 /// async move {
4069 /// // Safe: We're in a spawned task, not blocking the event loop
4070 /// let response = connection.send_request(OtherRequest {})
4071 /// .block_task()
4072 /// .await?;
4073 ///
4074 /// // Process the response...
4075 /// Ok(())
4076 /// }
4077 /// })?;
4078 ///
4079 /// // Respond immediately
4080 /// responder.respond(MyResponse { status: "ok".into() })
4081 /// }, agent_client_protocol::on_receive_request!())
4082 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
4083 /// # Ok(())
4084 /// # }
4085 /// ```
4086 ///
4087 /// # Unsafe Usage (in handlers - will deadlock!)
4088 ///
4089 /// ```no_run
4090 /// # use agent_client_protocol_test::*;
4091 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
4092 /// # let connection = mock_connection();
4093 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
4094 /// // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
4095 /// let response = cx.send_request(OtherRequest {})
4096 /// .block_task()
4097 /// .await?;
4098 ///
4099 /// responder.respond(MyResponse { status: response.value })
4100 /// }, agent_client_protocol::on_receive_request!())
4101 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
4102 /// # Ok(())
4103 /// # }
4104 /// ```
4105 ///
4106 /// # When to Use
4107 ///
4108 /// Use this method when:
4109 /// - You're in a spawned task (via [`ConnectionTo::spawn`])
4110 /// - You need the response value to proceed with your logic
4111 /// - Linear control flow is more natural than callbacks
4112 ///
4113 /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
4114 pub async fn block_task(self) -> Result<T, crate::Error>
4115 where
4116 T: Send,
4117 {
4118 #[cfg(feature = "unstable_cancel_request")]
4119 let response = await_response_forwarding_cancellation(
4120 self.response_rx,
4121 &self.cancellation,
4122 &self.cancellation_sources,
4123 )
4124 .await;
4125 #[cfg(not(feature = "unstable_cancel_request"))]
4126 let response = self.response_rx.await;
4127
4128 match response {
4129 Ok(ResponsePayload {
4130 result: Ok(json_value),
4131 ack_tx,
4132 }) => {
4133 // Ack immediately - we're in a spawned task, so the dispatch loop
4134 // can continue while we process the value.
4135 if let Some(tx) = ack_tx {
4136 let _ = tx.send(());
4137 }
4138 match (self.to_result)(json_value) {
4139 Ok(value) => Ok(value),
4140 Err(err) => Err(err),
4141 }
4142 }
4143 Ok(ResponsePayload {
4144 result: Err(err),
4145 ack_tx,
4146 }) => {
4147 if let Some(tx) = ack_tx {
4148 let _ = tx.send(());
4149 }
4150 Err(err)
4151 }
4152 Err(err) => Err(crate::util::internal_error(format!(
4153 "response to `{}` never received: {}",
4154 self.method, err
4155 ))),
4156 }
4157 }
4158
4159 /// Schedule an async task to run when a successful response is received.
4160 ///
4161 /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
4162 /// for the common pattern of forwarding errors to a request context while only processing
4163 /// successful responses.
4164 ///
4165 /// # Behavior
4166 ///
4167 /// - If the response is `Ok(value)`, your task receives the value and the request context
4168 /// - If the response is `Err(error)`, the error is automatically sent to `responder`
4169 /// and your task is not called
4170 ///
4171 /// # Example: Chaining requests
4172 ///
4173 /// ```no_run
4174 /// # use agent_client_protocol_test::*;
4175 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
4176 /// # let connection = mock_connection();
4177 /// connection.on_receive_request(async |req: ValidateRequest, responder, cx| {
4178 /// // Send initial request
4179 /// cx.send_request(ValidateRequest { data: req.data.clone() })
4180 /// .on_receiving_ok_result(responder, async |validation, responder| {
4181 /// // Only runs if validation succeeded
4182 /// if validation.is_valid {
4183 /// // Respond to original request
4184 /// responder.respond(ValidateResponse { is_valid: true, error: None })
4185 /// } else {
4186 /// responder.respond_with_error(agent_client_protocol::util::internal_error("validation failed"))
4187 /// }
4188 /// })?;
4189 ///
4190 /// Ok(())
4191 /// }, agent_client_protocol::on_receive_request!())
4192 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
4193 /// # Ok(())
4194 /// # }
4195 /// ```
4196 ///
4197 /// # Ordering
4198 ///
4199 /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
4200 /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
4201 /// for details.
4202 ///
4203 /// # When to Use
4204 ///
4205 /// Use this when:
4206 /// - You need to respond to a request based on another request's result
4207 /// - You want errors to automatically propagate to the request context
4208 /// - You only care about the success case
4209 ///
4210 /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
4211 #[track_caller]
4212 pub fn on_receiving_ok_result<F>(
4213 self,
4214 responder: Responder<T>,
4215 task: impl FnOnce(T, Responder<T>) -> F + 'static + Send,
4216 ) -> Result<(), crate::Error>
4217 where
4218 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
4219 T: Send,
4220 {
4221 self.on_receiving_result(async move |result| match result {
4222 Ok(value) => task(value, responder).await,
4223 Err(err) => responder.respond_with_error(err),
4224 })
4225 }
4226
4227 /// Schedule an async task to run when the response is received.
4228 ///
4229 /// This is the recommended way to handle responses in handler callbacks, as it doesn't
4230 /// block the event loop. The task will be spawned automatically when the response arrives.
4231 ///
4232 /// # Example: Handle response in callback
4233 ///
4234 /// ```no_run
4235 /// # use agent_client_protocol_test::*;
4236 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
4237 /// # let connection = mock_connection();
4238 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
4239 /// // Send a request and schedule a callback for the response
4240 /// cx.send_request(QueryRequest { id: 22 })
4241 /// .on_receiving_result({
4242 /// let connection = cx.clone();
4243 /// async move |result| {
4244 /// match result {
4245 /// Ok(response) => {
4246 /// println!("Got response: {:?}", response);
4247 /// // Can send more messages here
4248 /// connection.send_notification(QueryComplete {})?;
4249 /// Ok(())
4250 /// }
4251 /// Err(error) => {
4252 /// eprintln!("Request failed: {}", error);
4253 /// Err(error)
4254 /// }
4255 /// }
4256 /// }
4257 /// })?;
4258 ///
4259 /// // Handler continues immediately without waiting
4260 /// responder.respond(MyResponse { status: "processing".into() })
4261 /// }, agent_client_protocol::on_receive_request!())
4262 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
4263 /// # Ok(())
4264 /// # }
4265 /// ```
4266 ///
4267 /// # Ordering
4268 ///
4269 /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
4270 /// before processing the next message. This gives you ordering guarantees: no other
4271 /// messages will be processed while your callback runs.
4272 ///
4273 /// This differs from [`block_task`](Self::block_task), which signals completion immediately
4274 /// upon receiving the response (before your code processes it).
4275 ///
4276 /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
4277 /// and how to avoid deadlocks.
4278 ///
4279 /// # Error Handling
4280 ///
4281 /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
4282 /// errors appropriately within your task.
4283 ///
4284 /// # When to Use
4285 ///
4286 /// Use this method when:
4287 /// - You're in a handler callback (not a spawned task)
4288 /// - You want ordering guarantees (no other messages processed during your callback)
4289 /// - You need to do async work before "releasing" control back to the dispatch loop
4290 ///
4291 /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
4292 #[track_caller]
4293 pub fn on_receiving_result<F>(
4294 self,
4295 task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
4296 ) -> Result<(), crate::Error>
4297 where
4298 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
4299 T: Send,
4300 {
4301 self.consume_with(async move |response| {
4302 match response {
4303 // Run the user's callback on the peer's result.
4304 Ok(result) => task(result).await,
4305 // A response that was never delivered fails the consuming
4306 // task instead of invoking the callback.
4307 Err(err) => Err(err),
4308 }
4309 })
4310 }
4311}
4312
4313// ============================================================================
// Transport implementations of `ConnectTo` (`Lines`, `ByteStreams`)
4315// ============================================================================
4316
4317/// A component that communicates over line streams.
4318///
4319/// `Lines` implements the [`ConnectTo`] trait for any pair of line-based streams
4320/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
4321/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
4322///
4323/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
4324/// transformation of individual lines before they are parsed or after they are serialized.
4325/// This is particularly useful for debugging, logging, or implementing custom line-based
4326/// protocols.
4327///
4328/// # Use Cases
4329///
4330/// - **Line-by-line logging**: Intercept and log each line before parsing
4331/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
4332/// - **Debugging**: Inspect raw message strings
4333/// - **Line filtering**: Skip or modify specific messages
4334///
4335/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
4336/// for byte-based I/O.
4337///
4338/// [`ConnectTo`]: crate::ConnectTo
#[derive(Debug)]
pub struct Lines<OutgoingSink, IncomingStream> {
    /// Outgoing line sink (where we write serialized JSON-RPC messages).
    ///
    /// The struct itself places no bounds on this type; `Lines::new` and the
    /// `ConnectTo` impl require
    /// `futures::Sink<String, Error = std::io::Error> + Send + 'static`.
    pub outgoing: OutgoingSink,
    /// Incoming line stream (where we read and parse JSON-RPC messages).
    ///
    /// `Lines::new` and the `ConnectTo` impl require
    /// `futures::Stream<Item = std::io::Result<String>> + Send + 'static`.
    pub incoming: IncomingStream,
}
4346
4347impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
4348where
4349 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
4350 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
4351{
4352 /// Create a new line stream transport.
4353 pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
4354 Self { outgoing, incoming }
4355 }
4356}
4357
4358impl<OutgoingSink, IncomingStream, R: Role> ConnectTo<R> for Lines<OutgoingSink, IncomingStream>
4359where
4360 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
4361 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
4362{
4363 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
4364 let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
4365 match futures::future::select(Box::pin(client.connect_to(channel)), serve_self).await {
4366 Either::Left((result, _)) | Either::Right((result, _)) => result,
4367 }
4368 }
4369
4370 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
4371 let Self { outgoing, incoming } = self;
4372
4373 // Create a channel pair for the client to use
4374 let (channel_for_caller, channel_for_lines) = Channel::duplex();
4375
4376 // Create the server future that runs the line stream actors
4377 let server_future = Box::pin(async move {
4378 let Channel { rx, tx } = channel_for_lines;
4379
4380 // Run both actors concurrently
4381 let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
4382 let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
4383
4384 // Wait for both to complete
4385 futures::try_join!(outgoing_future, incoming_future)?;
4386
4387 Ok(())
4388 });
4389
4390 (channel_for_caller, server_future)
4391 }
4392}
4393
/// A transport component built from a pair of byte streams
/// (stdin/stdout, sockets, pipes, etc.).
///
/// `ByteStreams` implements the [`ConnectTo`] trait for any `AsyncWrite` /
/// `AsyncRead` pair, handling serialization of JSON-RPC messages to/from
/// newline-delimited JSON. It is the standard way to talk to external
/// processes or network connections.
///
/// # Use Cases
///
/// - **Stdio communication**: talk to agents or proxies over stdin/stdout
/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
/// - **Named pipes**: cross-process communication on the same machine
/// - **File I/O**: reading from and writing to file descriptors
///
/// # Example
///
/// Connecting to an agent via stdio:
///
/// ```no_run
/// use agent_client_protocol::UntypedRole;
/// # use agent_client_protocol::{ByteStreams};
/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
///
/// # async fn example() -> Result<(), agent_client_protocol::Error> {
/// let component = ByteStreams::new(
///     tokio::io::stdout().compat_write(),
///     tokio::io::stdin().compat(),
/// );
///
/// // Use as a component in a connection
/// agent_client_protocol::UntypedRole.builder()
///     .name("my-client")
///     .connect_to(component)
///     .await?;
/// # Ok(())
/// # }
/// ```
///
/// [`ConnectTo`]: crate::ConnectTo
#[derive(Debug)]
pub struct ByteStreams<Writer, Reader> {
    /// Byte stream that serialized messages are written to.
    pub outgoing: Writer,
    /// Byte stream that messages are read and parsed from.
    pub incoming: Reader,
}
4439
4440impl<OB, IB> ByteStreams<OB, IB>
4441where
4442 OB: AsyncWrite + Send + 'static,
4443 IB: AsyncRead + Send + 'static,
4444{
4445 /// Create a new byte stream transport.
4446 pub fn new(outgoing: OB, incoming: IB) -> Self {
4447 Self { outgoing, incoming }
4448 }
4449}
4450
4451impl<OB, IB, R: Role> ConnectTo<R> for ByteStreams<OB, IB>
4452where
4453 OB: AsyncWrite + Send + 'static,
4454 IB: AsyncRead + Send + 'static,
4455{
4456 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
4457 let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
4458 match futures::future::select(pin!(client.connect_to(channel)), serve_self).await {
4459 Either::Left((result, _)) | Either::Right((result, _)) => result,
4460 }
4461 }
4462
4463 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
4464 use futures::AsyncBufReadExt;
4465 use futures::AsyncWriteExt;
4466 use futures::io::BufReader;
4467 let Self { outgoing, incoming } = self;
4468
4469 // Convert byte streams to line streams
4470 // Box both streams to satisfy Unpin requirements
4471 let incoming_lines = Box::pin(BufReader::new(incoming).lines());
4472
4473 // Create a sink that writes lines (with newlines) to the outgoing byte stream
4474 // We need to Box the writer since it may not be Unpin
4475 let outgoing_sink =
4476 futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
4477 let mut bytes = line.into_bytes();
4478 bytes.push(b'\n');
4479 writer.write_all(&bytes).await?;
4480 Ok::<_, std::io::Error>(writer)
4481 });
4482
4483 // Delegate to Lines component
4484 ConnectTo::<R>::into_channel_and_future(Lines::new(outgoing_sink, incoming_lines))
4485 }
4486}
4487
/// A channel endpoint representing one side of a bidirectional message channel.
///
/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
/// Each endpoint has:
/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
///
/// Both directions are unbounded queues whose items are
/// `Result<RawJsonRpcMessage, crate::Error>`, so an endpoint can pass along
/// either a message or an error. Use [`Channel::duplex`] to create two
/// endpoints that are wired to each other.
///
/// # Example
///
/// ```no_run
/// # use agent_client_protocol::UntypedRole;
/// # use agent_client_protocol::{Channel, Builder};
/// # async fn example() -> Result<(), agent_client_protocol::Error> {
/// // Create a pair of connected channels
/// let (channel_a, channel_b) = Channel::duplex();
///
/// // Each channel can be used by a different component
/// UntypedRole.builder()
///     .name("connection-a")
///     .connect_to(channel_a)
///     .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Channel {
    /// Receives messages (or errors) from the counterpart.
    pub rx: mpsc::UnboundedReceiver<Result<RawJsonRpcMessage, crate::Error>>,
    /// Sends messages (or errors) to the counterpart.
    pub tx: mpsc::UnboundedSender<Result<RawJsonRpcMessage, crate::Error>>,
}
4519
4520impl Channel {
4521 /// Create a pair of connected channel endpoints.
4522 ///
4523 /// Returns two `Channel` instances that are connected to each other:
4524 /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
4525 /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
4526 ///
4527 /// # Returns
4528 ///
4529 /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
4530 #[must_use]
4531 pub fn duplex() -> (Self, Self) {
4532 // Create channels: A sends Result<Message> which B receives as Message
4533 let (a_tx, b_rx) = mpsc::unbounded();
4534 let (b_tx, a_rx) = mpsc::unbounded();
4535
4536 let channel_a = Self { rx: a_rx, tx: a_tx };
4537 let channel_b = Self { rx: b_rx, tx: b_tx };
4538
4539 (channel_a, channel_b)
4540 }
4541
4542 /// Copy messages from `rx` to `tx`.
4543 ///
4544 /// # Returns
4545 ///
4546 /// A `Result` indicating success or failure.
4547 pub async fn copy(mut self) -> Result<(), crate::Error> {
4548 while let Some(msg) = self.rx.next().await {
4549 self.tx
4550 .unbounded_send(msg)
4551 .map_err(crate::util::internal_error)?;
4552 }
4553 Ok(())
4554 }
4555}
4556
4557impl<R: Role> ConnectTo<R> for Channel {
4558 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
4559 let (client_channel, client_serve) = client.into_channel_and_future();
4560
4561 match futures::try_join!(
4562 Channel {
4563 rx: client_channel.rx,
4564 tx: self.tx
4565 }
4566 .copy(),
4567 Channel {
4568 rx: self.rx,
4569 tx: client_channel.tx
4570 }
4571 .copy(),
4572 client_serve
4573 ) {
4574 Ok(((), (), ())) => Ok(()),
4575 Err(err) => Err(err),
4576 }
4577 }
4578
4579 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
4580 (self, Box::pin(future::ready(Ok(()))))
4581 }
4582}
4583
/// Unit tests for successor-envelope peeling and, behind the
/// `unstable_cancel_request` feature, request-cancellation handling.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn peel_successor_envelopes_returns_plain_messages_unchanged() {
        let params = serde_json::json!({ "key": "value" });
        let (method, peeled) = peel_successor_envelopes("session/update", &params);
        assert_eq!(method, "session/update");
        assert_eq!(peeled, &params);
    }

    #[test]
    fn peel_successor_envelopes_unwraps_nested_envelopes() {
        let params = serde_json::json!({
            "method": "_proxy/successor",
            "params": {
                "method": "$/cancel_request",
                "params": { "requestId": "req-1" }
            }
        });
        let (method, peeled) = peel_successor_envelopes("_proxy/successor", &params);
        assert_eq!(method, "$/cancel_request");
        assert_eq!(peeled, &serde_json::json!({ "requestId": "req-1" }));
    }

    #[test]
    fn peel_successor_envelopes_leaves_malformed_envelopes_intact() {
        // No string `method` field: the envelope cannot be peeled, so the
        // message is returned as-is for the handler chain to deal with.
        let params = serde_json::json!({ "unexpected": true });
        let (method, peeled) = peel_successor_envelopes("_proxy/successor", &params);
        assert_eq!(method, "_proxy/successor");
        assert_eq!(peeled, &params);
    }

    /// Tests that only apply when request cancellation support is compiled in.
    #[cfg(feature = "unstable_cancel_request")]
    mod cancel_request {
        use super::super::*;

        /// Build an untyped notification with the given method and params.
        fn notification(method: &str, params: serde_json::Value) -> UntypedMessage {
            UntypedMessage::new(method, params).expect("well-formed JSON")
        }

        #[test]
        fn cancellation_request_id_is_extracted_from_wrapped_notifications() {
            let message = notification(
                "_proxy/successor",
                serde_json::json!({
                    "method": "$/cancel_request",
                    "params": { "requestId": "req-1" }
                }),
            );
            let request_id = cancellation_request_id_from_message(&message)
                .expect("wrapped cancel should parse");
            assert_eq!(request_id, Some(RequestId::Str("req-1".into())));
        }

        #[test]
        fn malformed_successor_envelope_is_not_treated_as_cancellation() {
            // The envelope cannot be peeled; the message must flow on to the
            // handler chain instead of erroring the dispatch.
            let message = notification("_proxy/successor", serde_json::json!({ "bogus": true }));
            let request_id = cancellation_request_id_from_message(&message)
                .expect("malformed envelope should be left to the handler chain");
            assert_eq!(request_id, None);
        }

        #[test]
        fn cancel_request_notifications_are_detected_even_when_wrapped() {
            let plain = notification("$/cancel_request", serde_json::json!({ "requestId": 1 }));
            assert!(is_cancel_request_notification(&plain));

            let wrapped = notification(
                "_proxy/successor",
                serde_json::json!({
                    "method": "$/cancel_request",
                    "params": { "requestId": 1 }
                }),
            );
            assert!(is_cancel_request_notification(&wrapped));

            let other_wrapped = notification(
                "_proxy/successor",
                serde_json::json!({
                    "method": "session/update",
                    "params": {}
                }),
            );
            assert!(!is_cancel_request_notification(&other_wrapped));

            let malformed_envelope =
                notification("_proxy/successor", serde_json::json!({ "bogus": true }));
            assert!(!is_cancel_request_notification(&malformed_envelope));
        }

        #[test]
        fn malformed_cancel_request_params_error() {
            let message = notification(
                "$/cancel_request",
                serde_json::json!({ "requestId": { "not": "an id" } }),
            );
            cancellation_request_id_from_message(&message)
                .expect_err("malformed cancel params should error");
        }

        #[test]
        fn registry_marks_and_removes_requests() {
            let registry = RequestCancellationRegistry::new();
            let id = RequestId::Str("req-1".into());

            let responder_cancellation = registry.register(&id);
            let marker = responder_cancellation.cancellation();
            assert!(!marker.is_cancelled());

            assert!(registry.cancel(&id));
            assert!(marker.is_cancelled());
            assert!(responder_cancellation.cancellation().is_cancelled());

            drop(responder_cancellation);
            assert!(!registry.cancel(&id), "slot should be removed on drop");
        }

        #[test]
        fn reused_request_id_does_not_cross_wire_cancellation_state() {
            let registry = RequestCancellationRegistry::new();
            let id = RequestId::Str("dup".into());

            // A protocol-violating peer reuses an in-flight request ID.
            let first = registry.register(&id);
            let first_marker = first.cancellation();
            let second = registry.register(&id);
            let second_marker = second.cancellation();

            // A cancellation targets whichever request currently owns the ID.
            assert!(registry.cancel(&id));
            assert!(second_marker.is_cancelled());
            assert!(
                !first_marker.is_cancelled(),
                "the stale request must not observe the newer request's cancellation"
            );

            // The stale responder must hand out detached markers, not the
            // newer request's marker.
            assert!(!first.cancellation().is_cancelled());

            // Dropping the stale responder must not remove the newer
            // request's slot.
            drop(first);
            assert!(registry.cancel(&id), "newer slot should still be present");

            drop(second);
            assert!(!registry.cancel(&id), "slot should be removed on drop");
        }
    }
}