agent_client_protocol/jsonrpc.rs
1//! Core JSON-RPC server support.
2
3use agent_client_protocol_schema::SessionId;
4// Re-export jsonrpcmsg for use in public API
5pub use jsonrpcmsg;
6
7// Types re-exported from crate root
8use serde::{Deserialize, Serialize};
9use std::any::TypeId;
10use std::fmt::Debug;
11use std::panic::Location;
12use std::pin::pin;
13use uuid::Uuid;
14
15use futures::channel::{mpsc, oneshot};
16use futures::future::{self, BoxFuture, Either};
17use futures::{AsyncRead, AsyncWrite, StreamExt};
18
19mod dynamic_handler;
20pub(crate) mod handlers;
21mod incoming_actor;
22mod outgoing_actor;
23mod protocol_compat;
24pub(crate) mod run;
25mod task_actor;
26mod transport_actor;
27
28use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
29pub use crate::jsonrpc::handlers::NullHandler;
30use crate::jsonrpc::handlers::{ChainedHandler, NamedHandler};
31use crate::jsonrpc::handlers::{MessageHandler, NotificationHandler, RequestHandler};
32use crate::jsonrpc::outgoing_actor::{OutgoingMessageTx, send_raw_message};
33use crate::jsonrpc::protocol_compat::{ProtocolCompat, ProtocolMode};
34use crate::jsonrpc::run::SpawnedRun;
35use crate::jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo};
36use crate::jsonrpc::task_actor::{Task, TaskTx};
37use crate::mcp_server::McpServer;
38use crate::role::HasPeer;
39use crate::role::Role;
40use crate::util::json_cast;
41use crate::{Agent, Client, ConnectTo, RoleId};
42
43/// Handlers process incoming JSON-RPC messages on a connection.
44///
45/// When messages arrive, they flow through a chain of handlers. Each handler can
46/// either **claim** the message (handle it) or **decline** it (pass to the next handler).
47///
48/// # Message Flow
49///
50/// Messages flow through three layers of handlers in order:
51///
52/// ```text
53/// ┌─────────────────────────────────────────────────────────────────┐
54/// │ Incoming Message │
55/// └─────────────────────────────────────────────────────────────────┘
56/// │
57/// ▼
58/// ┌─────────────────────────────────────────────────────────────────┐
59/// │ 1. User Handlers (registered via on_receive_request, etc.) │
60/// │ - Tried in registration order │
61/// │ - First handler to return Handled::Yes claims the message │
62/// └─────────────────────────────────────────────────────────────────┘
63/// │ Handled::No
64/// ▼
65/// ┌─────────────────────────────────────────────────────────────────┐
66/// │ 2. Dynamic Handlers (added at runtime) │
67/// │ - Used for session-specific message handling │
68/// │ - Added via ConnectionTo::add_dynamic_handler │
69/// └─────────────────────────────────────────────────────────────────┘
70/// │ Handled::No
71/// ▼
72/// ┌─────────────────────────────────────────────────────────────────┐
73/// │ 3. Role Default Handler │
74/// │ - Fallback based on the connection's Role │
75/// │ - Handles protocol-level messages (e.g., proxy forwarding) │
76/// └─────────────────────────────────────────────────────────────────┘
77/// │ Handled::No
78/// ▼
79/// ┌─────────────────────────────────────────────────────────────────┐
80/// │ Unhandled: Error response sent (or queued if retry=true) │
81/// └─────────────────────────────────────────────────────────────────┘
82/// ```
83///
84/// # The `Handled` Return Value
85///
86/// Each handler returns [`Handled`] to indicate whether it processed the message:
87///
88/// - **`Handled::Yes`** - Message was handled. No further handlers are invoked.
89/// - **`Handled::No { message, retry }`** - Message was not handled. The message
90/// (possibly modified) is passed to the next handler in the chain.
91///
92/// For convenience, handlers can return `()` which is equivalent to `Handled::Yes`.
93///
94/// # The Retry Mechanism
95///
96/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
97///
98/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
99/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
100///
101/// This mechanism exists because of a timing issue with sessions: when a `session/new`
102/// response is being processed, the dynamic handler for that session hasn't been registered
103/// yet, but `session/update` notifications for that session may already be arriving.
104/// By setting `retry: true`, these early notifications are queued until the session's
105/// dynamic handler is added.
106///
107/// # Handler Registration
108///
109/// Most users register handlers using the builder methods on [`Builder`]:
110///
111/// ```
112/// # use agent_client_protocol::{Agent, Client, ConnectTo};
113/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, AgentCapabilities};
114/// # use agent_client_protocol_test::StatusUpdate;
115/// # async fn example(transport: impl ConnectTo<Agent>) -> Result<(), agent_client_protocol::Error> {
116/// Agent.builder()
117/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
118/// responder.respond(
119/// InitializeResponse::new(req.protocol_version)
120/// .agent_capabilities(AgentCapabilities::new()),
121/// )
122/// }, agent_client_protocol::on_receive_request!())
123/// .on_receive_notification(async |notif: StatusUpdate, cx| {
124/// // Process notification
125/// Ok(())
126/// }, agent_client_protocol::on_receive_notification!())
127/// .connect_to(transport)
128/// .await?;
129/// # Ok(())
130/// # }
131/// ```
132///
133/// The type parameter on the closure determines which messages are dispatched to it.
134/// Messages that don't match the type are automatically passed to the next handler.
135///
136/// # Implementing Custom Handlers
137///
138/// For advanced use cases, you can implement `HandleMessageAs` directly:
139///
140/// ```ignore
141/// struct MyHandler;
142///
143/// impl HandleMessageAs<Agent> for MyHandler {
144///
145/// async fn handle_dispatch(
146/// &mut self,
147/// message: Dispatch,
148/// cx: ConnectionTo<Self::Role>,
149/// ) -> Result<Handled<Dispatch>, Error> {
150/// if message.method() == "my/custom/method" {
151/// // Handle it
152/// Ok(Handled::Yes)
153/// } else {
154/// // Pass to next handler
155/// Ok(Handled::No { message, retry: false })
156/// }
157/// }
158///
159/// fn describe_chain(&self) -> impl std::fmt::Debug {
160/// "MyHandler"
161/// }
162/// }
163/// ```
164///
165/// # Important: Handlers Must Not Block
166///
167/// The connection processes messages on a single async task. While a handler is running,
168/// no other messages can be processed. For expensive operations, use [`ConnectionTo::spawn`]
169/// to run work concurrently:
170///
171/// ```
172/// # use agent_client_protocol::{Client, Agent, ConnectTo};
173/// # use agent_client_protocol_test::{expensive_operation, ProcessComplete};
174/// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
175/// # Client.builder().connect_with(transport, async |cx| {
176/// cx.spawn({
177/// let connection = cx.clone();
178/// async move {
179/// let result = expensive_operation("data").await?;
180/// connection.send_notification(ProcessComplete { result })?;
181/// Ok(())
182/// }
183/// })?;
184/// # Ok(())
185/// # }).await?;
186/// # Ok(())
187/// # }
188/// ```
189#[allow(async_fn_in_trait)]
190/// A handler for incoming JSON-RPC messages.
191///
192/// This trait is implemented by types that can process incoming messages on a connection.
193/// Handlers are registered with a [`Builder`] and are called in order until
194/// one claims the message.
195///
196/// The type parameter `R` is the role this handler plays - who I am.
197/// For an agent handler, `R = Agent` (I handle messages as an agent).
198/// For a client handler, `R = Client` (I handle messages as a client).
199pub trait HandleDispatchFrom<Counterpart: Role>: Send {
200 /// Attempt to claim an incoming message (request or notification).
201 ///
202 /// # Important: do not block
203 ///
204 /// The server will not process new messages until this handler returns.
205 /// You should avoid blocking in this callback unless you wish to block the server (e.g., for rate limiting).
206 /// The recommended approach to manage expensive operations is to the [`ConnectionTo::spawn`] method available on the message context.
207 ///
208 /// # Parameters
209 ///
210 /// * `message` - The incoming message to handle.
211 /// * `connection` - The connection, used to send messages and access connection state.
212 ///
213 /// # Returns
214 ///
215 /// * `Ok(Handled::Yes)` if the message was claimed. It will not be propagated further.
216 /// * `Ok(Handled::No(message))` if not; the (possibly changed) message will be passed to the remaining handlers.
217 /// * `Err` if an internal error occurs (this will bring down the server).
218 fn handle_dispatch_from(
219 &mut self,
220 message: Dispatch,
221 connection: ConnectionTo<Counterpart>,
222 ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send;
223
224 /// Returns a debug description of the registered handlers for diagnostics.
225 fn describe_chain(&self) -> impl std::fmt::Debug;
226}
227
228impl<Counterpart: Role, H> HandleDispatchFrom<Counterpart> for &mut H
229where
230 H: HandleDispatchFrom<Counterpart>,
231{
232 fn handle_dispatch_from(
233 &mut self,
234 message: Dispatch,
235 cx: ConnectionTo<Counterpart>,
236 ) -> impl Future<Output = Result<Handled<Dispatch>, crate::Error>> + Send {
237 H::handle_dispatch_from(self, message, cx)
238 }
239
240 fn describe_chain(&self) -> impl std::fmt::Debug {
241 H::describe_chain(self)
242 }
243}
244
245/// A JSON-RPC connection that can act as either a server, client, or both.
246///
247/// [`Builder`] provides a builder-style API for creating JSON-RPC servers and clients.
248/// You start by calling `Role.builder()` (e.g., `Client.builder()`), then add message
249/// handlers, and finally drive the connection with either [`connect_to`](Builder::connect_to)
250/// or [`connect_with`](Builder::connect_with), providing a component implementation
251/// (e.g., [`ByteStreams`] for byte streams).
252///
253/// # JSON-RPC Primer
254///
255/// JSON-RPC 2.0 has two fundamental message types:
256///
257/// * **Requests** - Messages that expect a response. They have an `id` field that gets
258/// echoed back in the response so the sender can correlate them.
259/// * **Notifications** - Fire-and-forget messages with no `id` field. The sender doesn't
260/// expect or receive a response.
261///
262/// # Type-Driven Message Dispatch
263///
264/// The handler registration methods use Rust's type system to determine which messages
265/// to handle. The type parameter you provide controls what gets dispatched to your handler:
266///
267/// ## Single Message Types
268///
269/// The simplest case - handle one specific message type:
270///
271/// ```no_run
272/// # use agent_client_protocol_test::*;
273/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
274/// # async fn example() -> Result<(), agent_client_protocol::Error> {
275/// # let connection = mock_connection();
276/// connection
277/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
278/// // Handle only InitializeRequest messages
279/// responder.respond(InitializeResponse::make())
280/// }, agent_client_protocol::on_receive_request!())
281/// .on_receive_notification(async |notif: SessionNotification, cx| {
282/// // Handle only SessionUpdate notifications
283/// Ok(())
284/// }, agent_client_protocol::on_receive_notification!())
285/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
286/// # Ok(())
287/// # }
288/// ```
289///
290/// ## Enum Message Types
291///
292/// You can also handle multiple related messages with a single handler by defining an enum
293/// that implements the appropriate trait ([`JsonRpcRequest`] or [`JsonRpcNotification`]):
294///
295/// ```no_run
296/// # use agent_client_protocol_test::*;
297/// # use agent_client_protocol::{JsonRpcRequest, JsonRpcMessage, UntypedMessage};
298/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
299/// # async fn example() -> Result<(), agent_client_protocol::Error> {
300/// # let connection = mock_connection();
301/// // Define an enum for multiple request types
302/// #[derive(Debug, Clone)]
303/// enum MyRequests {
304/// Initialize(InitializeRequest),
305/// Prompt(PromptRequest),
306/// }
307///
308/// // Implement JsonRpcRequest for your enum
309/// # impl JsonRpcMessage for MyRequests {
310/// # fn matches_method(_method: &str) -> bool { false }
311/// # fn method(&self) -> &str { "myRequests" }
312/// # fn to_untyped_message(&self) -> Result<UntypedMessage, agent_client_protocol::Error> { todo!() }
313/// # fn parse_message(_method: &str, _params: &impl serde::Serialize) -> Result<Self, agent_client_protocol::Error> { Err(agent_client_protocol::Error::method_not_found()) }
314/// # }
315/// impl JsonRpcRequest for MyRequests { type Response = serde_json::Value; }
316///
317/// // Handle all variants in one place
318/// connection.on_receive_request(async |req: MyRequests, responder, cx| {
319/// match req {
320/// MyRequests::Initialize(init) => { responder.respond(serde_json::json!({})) }
321/// MyRequests::Prompt(prompt) => { responder.respond(serde_json::json!({})) }
322/// }
323/// }, agent_client_protocol::on_receive_request!())
324/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
325/// # Ok(())
326/// # }
327/// ```
328///
329/// ## Mixed Message Types
330///
331/// For enums containing both requests AND notifications, use [`on_receive_dispatch`](Self::on_receive_dispatch):
332///
333/// ```no_run
334/// # use agent_client_protocol_test::*;
335/// # use agent_client_protocol::Dispatch;
336/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, SessionNotification};
337/// # async fn example() -> Result<(), agent_client_protocol::Error> {
338/// # let connection = mock_connection();
339/// // on_receive_dispatch receives Dispatch which can be either a request or notification
340/// connection.on_receive_dispatch(async |msg: Dispatch<InitializeRequest, SessionNotification>, _cx| {
341/// match msg {
342/// Dispatch::Request(req, responder) => {
343/// responder.respond(InitializeResponse::make())
344/// }
345/// Dispatch::Notification(notif) => {
346/// Ok(())
347/// }
348/// Dispatch::Response(result, router) => {
349/// // Forward response to its destination
350/// router.respond_with_result(result)
351/// }
352/// }
353/// }, agent_client_protocol::on_receive_dispatch!())
354/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
355/// # Ok(())
356/// # }
357/// ```
358///
359/// # Handler Registration
360///
361/// Register handlers using these methods (listed from most common to most flexible):
362///
363/// * [`on_receive_request`](Self::on_receive_request) - Handle JSON-RPC requests (messages expecting responses)
364/// * [`on_receive_notification`](Self::on_receive_notification) - Handle JSON-RPC notifications (fire-and-forget)
365/// * [`on_receive_dispatch`](Self::on_receive_dispatch) - Handle enums containing both requests and notifications
366/// * [`with_handler`](Self::with_handler) - Low-level primitive for maximum flexibility
367///
368/// ## Handler Ordering
369///
370/// Handlers are tried in the order you register them. The first handler that claims a message
371/// (by matching its type) will process it. Subsequent handlers won't see that message:
372///
373/// ```no_run
374/// # use agent_client_protocol_test::*;
375/// # use agent_client_protocol::{Dispatch, UntypedMessage};
376/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse};
377/// # async fn example() -> Result<(), agent_client_protocol::Error> {
378/// # let connection = mock_connection();
379/// connection
380/// .on_receive_request(async |req: InitializeRequest, responder, cx| {
381/// // This runs first for InitializeRequest
382/// responder.respond(InitializeResponse::make())
383/// }, agent_client_protocol::on_receive_request!())
384/// .on_receive_request(async |req: PromptRequest, responder, cx| {
385/// // This runs first for PromptRequest
386/// responder.respond(PromptResponse::make())
387/// }, agent_client_protocol::on_receive_request!())
388/// .on_receive_dispatch(async |msg: Dispatch, cx| {
389/// // This runs for any message not handled above
390/// msg.respond_with_error(agent_client_protocol::util::internal_error("unknown method"), cx)
391/// }, agent_client_protocol::on_receive_dispatch!())
392/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
393/// # Ok(())
394/// # }
395/// ```
396///
397/// # Event Loop and Concurrency
398///
399/// Understanding the event loop is critical for writing correct handlers.
400///
401/// ## The Event Loop
402///
403/// [`Builder`] runs all handler callbacks on a single async task - the event loop.
404/// While a handler is running, **the server cannot receive new messages**. This means
405/// any blocking or expensive work in your handlers will stall the entire connection.
406///
407/// To avoid blocking the event loop, use [`ConnectionTo::spawn`] to offload serious
408/// work to concurrent tasks:
409///
410/// ```no_run
411/// # use agent_client_protocol_test::*;
412/// # async fn example() -> Result<(), agent_client_protocol::Error> {
413/// # let connection = mock_connection();
414/// connection.on_receive_request(async |req: AnalyzeRequest, responder, cx| {
415/// // Clone cx for the spawned task
416/// cx.spawn({
417/// let connection = cx.clone();
418/// async move {
419/// let result = expensive_analysis(&req.data).await?;
420/// connection.send_notification(AnalysisComplete { result })?;
421/// Ok(())
422/// }
423/// })?;
424///
425/// // Respond immediately without blocking
426/// responder.respond(AnalysisStarted { job_id: 42 })
427/// }, agent_client_protocol::on_receive_request!())
428/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
429/// # Ok(())
430/// # }
431/// ```
432///
433/// Note that the entire connection runs within one async task, so parallelism must be
434/// managed explicitly using [`spawn`](ConnectionTo::spawn).
435///
436/// ## The Connection Context
437///
438/// Handler callbacks receive a context object (`cx`) for interacting with the connection:
439///
440/// * **For request handlers** - [`Responder<R>`] provides [`respond`](Responder::respond)
441/// to send the response, plus methods to send other messages
442/// * **For notification handlers** - [`ConnectionTo`] provides methods to send messages
443/// and spawn tasks
444///
445/// Both context types support:
446/// * [`send_request`](ConnectionTo::send_request) - Send requests to the other side
447/// * [`send_notification`](ConnectionTo::send_notification) - Send notifications
448/// * [`spawn`](ConnectionTo::spawn) - Run tasks concurrently without blocking the event loop
449///
450/// The [`SentRequest`] returned by `send_request` provides methods like
451/// [`on_receiving_result`](SentRequest::on_receiving_result) that help you
452/// avoid accidentally blocking the event loop while waiting for responses.
453///
454/// # Driving the Connection
455///
456/// After adding handlers, you must drive the connection using one of two modes:
457///
458/// ## Server Mode: `connect_to()`
459///
460/// Use [`connect_to`](Self::connect_to) when you only need to respond to incoming messages:
461///
462/// ```no_run
463/// # use agent_client_protocol_test::*;
464/// # async fn example() -> Result<(), agent_client_protocol::Error> {
465/// # let connection = mock_connection();
466/// connection
467/// .on_receive_request(async |req: MyRequest, responder, cx| {
468/// responder.respond(MyResponse { status: "ok".into() })
469/// }, agent_client_protocol::on_receive_request!())
470/// .connect_to(MockTransport) // Runs until connection closes or error occurs
471/// .await?;
472/// # Ok(())
473/// # }
474/// ```
475///
476/// The connection will process incoming messages and invoke your handlers until the
477/// connection is closed or an error occurs.
478///
479/// ## Client Mode: `connect_with()`
480///
481/// Use [`connect_with`](Self::connect_with) when you need to both handle incoming messages
482/// AND send your own requests/notifications:
483///
484/// ```no_run
485/// # use agent_client_protocol_test::*;
486/// # use agent_client_protocol::schema::InitializeRequest;
487/// # async fn example() -> Result<(), agent_client_protocol::Error> {
488/// # let connection = mock_connection();
489/// connection
490/// .on_receive_request(async |req: MyRequest, responder, cx| {
491/// responder.respond(MyResponse { status: "ok".into() })
492/// }, agent_client_protocol::on_receive_request!())
493/// .connect_with(MockTransport, async |cx| {
494/// // You can send requests to the other side
495/// let response = cx.send_request(InitializeRequest::make())
496/// .block_task()
497/// .await?;
498///
499/// // And send notifications
500/// cx.send_notification(StatusUpdate { message: "ready".into() })?;
501///
502/// Ok(())
503/// })
504/// .await?;
505/// # Ok(())
506/// # }
507/// ```
508///
509/// The connection will serve incoming messages in the background while your client closure
510/// runs. When the closure returns, the connection shuts down.
511///
512/// # Example: Complete Agent
513///
514/// ```no_run
515/// # use agent_client_protocol::UntypedRole;
516/// # use agent_client_protocol::{Builder};
517/// # use agent_client_protocol::Stdio;
518/// # use agent_client_protocol::schema::{InitializeRequest, InitializeResponse, PromptRequest, PromptResponse, SessionNotification};
519/// # async fn example() -> Result<(), agent_client_protocol::Error> {
520/// let transport = Stdio::new();
521///
522/// UntypedRole.builder()
523/// .name("my-agent") // Optional: for debugging logs
524/// .on_receive_request(async |init: InitializeRequest, responder, cx| {
525/// let response: InitializeResponse = todo!();
526/// responder.respond(response)
527/// }, agent_client_protocol::on_receive_request!())
528/// .on_receive_request(async |prompt: PromptRequest, responder, cx| {
529/// // You can send notifications while processing a request
530/// let notif: SessionNotification = todo!();
531/// cx.send_notification(notif)?;
532///
533/// // Then respond to the request
534/// let response: PromptResponse = todo!();
535/// responder.respond(response)
536/// }, agent_client_protocol::on_receive_request!())
537/// .connect_to(transport)
538/// .await?;
539/// # Ok(())
540/// # }
541/// ```
542#[must_use]
543#[derive(Debug)]
544pub struct Builder<Host: Role, Handler = NullHandler, Runner = NullRun>
545where
546 Handler: HandleDispatchFrom<Host::Counterpart>,
547 Runner: RunWithConnectionTo<Host::Counterpart>,
548{
549 /// My role.
550 host: Host,
551
552 /// Name of the connection, used in tracing logs.
553 name: Option<String>,
554
555 /// Handler for incoming messages.
556 handler: Handler,
557
558 /// Responder for background tasks.
559 responder: Runner,
560
561 /// Protocol version mode for the public API and wire compatibility layer.
562 protocol_mode: ProtocolMode,
563}
564
565fn default_protocol_mode<Host: Role>() -> ProtocolMode {
566 let role = TypeId::of::<Host>();
567
568 if role == TypeId::of::<Agent>() {
569 ProtocolMode::v1_agent()
570 } else if role == TypeId::of::<Client>() {
571 ProtocolMode::v1_client()
572 } else {
573 ProtocolMode::disabled()
574 }
575}
576
577impl<Host: Role> Builder<Host, NullHandler, NullRun> {
578 /// Create a new connection builder for the given role.
579 /// This type follows a builder pattern; use other methods to configure and then invoke
580 /// [`Self::connect_to`] (to use as a server) or [`Self::connect_with`] to use as a client.
581 pub fn new(role: Host) -> Self {
582 Self {
583 host: role,
584 name: None,
585 handler: NullHandler,
586 responder: NullRun,
587 protocol_mode: default_protocol_mode::<Host>(),
588 }
589 }
590}
591
592impl<Host: Role, Handler> Builder<Host, Handler, NullRun>
593where
594 Handler: HandleDispatchFrom<Host::Counterpart>,
595{
596 /// Create a new connection builder with the given handler.
597 pub fn new_with(role: Host, handler: Handler) -> Self {
598 Self {
599 host: role,
600 name: None,
601 handler,
602 responder: NullRun,
603 protocol_mode: default_protocol_mode::<Host>(),
604 }
605 }
606}
607
608impl<
609 Host: Role,
610 Handler: HandleDispatchFrom<Host::Counterpart>,
611 Runner: RunWithConnectionTo<Host::Counterpart>,
612> Builder<Host, Handler, Runner>
613{
614 /// Set the "name" of this connection -- used only for debugging logs.
615 pub fn name(mut self, name: impl ToString) -> Self {
616 self.name = Some(name.to_string());
617 self
618 }
619
620 pub(crate) fn v1_agent(mut self) -> Self {
621 self.protocol_mode = ProtocolMode::v1_agent();
622 self
623 }
624
625 pub(crate) fn v1_client(mut self) -> Self {
626 self.protocol_mode = ProtocolMode::v1_client();
627 self
628 }
629
630 #[cfg(feature = "unstable_protocol_v2")]
631 pub(crate) fn v2_agent(mut self) -> Self {
632 self.protocol_mode = ProtocolMode::v2_agent();
633 self
634 }
635
636 #[cfg(feature = "unstable_protocol_v2")]
637 pub(crate) fn v2_client(mut self) -> Self {
638 self.protocol_mode = ProtocolMode::v2_client();
639 self
640 }
641
642 /// Merge another [`Builder`] into this one.
643 ///
644 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
645 /// This is a low-level method that is not intended for general use.
646 pub fn with_connection_builder(
647 self,
648 other: Builder<
649 Host,
650 impl HandleDispatchFrom<Host::Counterpart>,
651 impl RunWithConnectionTo<Host::Counterpart>,
652 >,
653 ) -> Builder<
654 Host,
655 impl HandleDispatchFrom<Host::Counterpart>,
656 impl RunWithConnectionTo<Host::Counterpart>,
657 > {
658 let Builder {
659 name: other_name,
660 handler: other_handler,
661 responder: other_responder,
662 protocol_mode: other_protocol_mode,
663 host: _,
664 } = other;
665 Builder {
666 host: self.host,
667 name: self.name,
668 handler: ChainedHandler::new(
669 self.handler,
670 NamedHandler::new(other_name, other_handler),
671 ),
672 responder: ChainRun::new(self.responder, other_responder),
673 protocol_mode: self.protocol_mode.merge(other_protocol_mode),
674 }
675 }
676
677 /// Add a new [`HandleDispatchFrom`] to the chain.
678 ///
679 /// Prefer [`Self::on_receive_request`] or [`Self::on_receive_notification`].
680 /// This is a low-level method that is not intended for general use.
681 pub fn with_handler(
682 self,
683 handler: impl HandleDispatchFrom<Host::Counterpart>,
684 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner> {
685 Builder {
686 host: self.host,
687 name: self.name,
688 handler: ChainedHandler::new(self.handler, handler),
689 responder: self.responder,
690 protocol_mode: self.protocol_mode,
691 }
692 }
693
694 /// Add a new [`RunWithConnectionTo`] to the chain.
695 pub fn with_responder<Run1>(
696 self,
697 responder: Run1,
698 ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
699 where
700 Run1: RunWithConnectionTo<Host::Counterpart>,
701 {
702 Builder {
703 host: self.host,
704 name: self.name,
705 handler: self.handler,
706 responder: ChainRun::new(self.responder, responder),
707 protocol_mode: self.protocol_mode,
708 }
709 }
710
711 /// Enqueue a task to run once the connection is actively serving traffic.
712 #[track_caller]
713 pub fn with_spawned<F, Fut>(
714 self,
715 task: F,
716 ) -> Builder<Host, Handler, impl RunWithConnectionTo<Host::Counterpart>>
717 where
718 F: FnOnce(ConnectionTo<Host::Counterpart>) -> Fut + Send,
719 Fut: Future<Output = Result<(), crate::Error>> + Send,
720 {
721 let location = Location::caller();
722 self.with_responder(SpawnedRun::new(location, task))
723 }
724
725 /// Register a handler for messages that can be either requests OR notifications.
726 ///
727 /// Use this when you want to handle an enum type that contains both request and
728 /// notification variants. Your handler receives a [`Dispatch<Req, Notif>`] which
729 /// is an enum with two variants:
730 ///
731 /// - `Dispatch::Request(request, responder)` - A request with its response context
732 /// - `Dispatch::Notification(notification)` - A notification
733 /// - `Dispatch::Response(result, router)` - A response to a request we sent
734 ///
735 /// # Example
736 ///
737 /// ```no_run
738 /// # use agent_client_protocol_test::*;
739 /// # use agent_client_protocol::Dispatch;
740 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
741 /// # let connection = mock_connection();
742 /// connection.on_receive_dispatch(async |message: Dispatch<MyRequest, StatusUpdate>, _cx| {
743 /// match message {
744 /// Dispatch::Request(req, responder) => {
745 /// // Handle request and send response
746 /// responder.respond(MyResponse { status: "ok".into() })
747 /// }
748 /// Dispatch::Notification(notif) => {
749 /// // Handle notification (no response needed)
750 /// Ok(())
751 /// }
752 /// Dispatch::Response(result, router) => {
753 /// // Forward response to its destination
754 /// router.respond_with_result(result)
755 /// }
756 /// }
757 /// }, agent_client_protocol::on_receive_dispatch!())
758 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
759 /// # Ok(())
760 /// # }
761 /// ```
762 ///
763 /// For most use cases, prefer [`on_receive_request`](Self::on_receive_request) or
764 /// [`on_receive_notification`](Self::on_receive_notification) which provide cleaner APIs
765 /// for handling requests or notifications separately.
766 ///
767 /// # Ordering
768 ///
769 /// This callback runs inside the dispatch loop and blocks further message processing
770 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
771 /// ordering guarantees and how to avoid deadlocks.
772 pub fn on_receive_dispatch<Req, Notif, F, T, ToFut>(
773 self,
774 op: F,
775 to_future_hack: ToFut,
776 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
777 where
778 Host::Counterpart: HasPeer<Host::Counterpart>,
779 Req: JsonRpcRequest,
780 Notif: JsonRpcNotification,
781 F: AsyncFnMut(
782 Dispatch<Req, Notif>,
783 ConnectionTo<Host::Counterpart>,
784 ) -> Result<T, crate::Error>
785 + Send,
786 T: IntoHandled<Dispatch<Req, Notif>>,
787 ToFut: Fn(
788 &mut F,
789 Dispatch<Req, Notif>,
790 ConnectionTo<Host::Counterpart>,
791 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
792 + Send
793 + Sync,
794 {
795 let handler = MessageHandler::new(
796 self.host.counterpart(),
797 self.host.counterpart(),
798 op,
799 to_future_hack,
800 );
801 self.with_handler(handler)
802 }
803
804 /// Register a handler for JSON-RPC requests of type `Req`.
805 ///
806 /// Your handler receives two arguments:
807 /// 1. The request (type `Req`)
808 /// 2. A [`Responder<R, Req::Response>`] for sending the response
809 ///
810 /// The request context allows you to:
811 /// - Send the response with [`Responder::respond`]
812 /// - Send notifications to the client with [`ConnectionTo::send_notification`]
813 /// - Send requests to the client with [`ConnectionTo::send_request`]
814 ///
815 /// # Example
816 ///
817 /// ```ignore
818 /// # use agent_client_protocol::UntypedRole;
819 /// # use agent_client_protocol::{Builder};
820 /// # use agent_client_protocol::schema::{PromptRequest, PromptResponse, SessionNotification};
821 /// # fn example<R: agent_client_protocol::Role>(connection: Builder<R, impl agent_client_protocol::HandleMessageAs<R>>) {
822 /// connection.on_receive_request(async |request: PromptRequest, responder, cx| {
823 /// // Send a notification while processing
824 /// let notif: SessionNotification = todo!();
825 /// cx.send_notification(notif)?;
826 ///
827 /// // Do some work...
828 /// let result = todo!("process the prompt");
829 ///
830 /// // Send the response
831 /// let response: PromptResponse = todo!();
832 /// responder.respond(response)
833 /// }, agent_client_protocol::on_receive_request!());
834 /// # }
835 /// ```
836 ///
837 /// # Type Parameter
838 ///
839 /// `Req` can be either a single request type or an enum of multiple request types.
840 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
841 ///
842 /// # Ordering
843 ///
844 /// This callback runs inside the dispatch loop and blocks further message processing
845 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
846 /// ordering guarantees and how to avoid deadlocks.
847 pub fn on_receive_request<Req: JsonRpcRequest, F, T, ToFut>(
848 self,
849 op: F,
850 to_future_hack: ToFut,
851 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
852 where
853 Host::Counterpart: HasPeer<Host::Counterpart>,
854 F: AsyncFnMut(
855 Req,
856 Responder<Req::Response>,
857 ConnectionTo<Host::Counterpart>,
858 ) -> Result<T, crate::Error>
859 + Send,
860 T: IntoHandled<(Req, Responder<Req::Response>)>,
861 ToFut: Fn(
862 &mut F,
863 Req,
864 Responder<Req::Response>,
865 ConnectionTo<Host::Counterpart>,
866 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
867 + Send
868 + Sync,
869 {
870 let handler = RequestHandler::new(
871 self.host.counterpart(),
872 self.host.counterpart(),
873 op,
874 to_future_hack,
875 );
876 self.with_handler(handler)
877 }
878
879 /// Register a handler for JSON-RPC notifications of type `Notif`.
880 ///
881 /// Notifications are fire-and-forget messages that don't expect a response.
882 /// Your handler receives:
883 /// 1. The notification (type `Notif`)
884 /// 2. A [`ConnectionTo<R>`] for sending messages to the other side
885 ///
886 /// Unlike request handlers, you cannot send a response (notifications don't have IDs),
887 /// but you can still send your own requests and notifications using the context.
888 ///
889 /// # Example
890 ///
891 /// ```no_run
892 /// # use agent_client_protocol_test::*;
893 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
894 /// # let connection = mock_connection();
895 /// connection.on_receive_notification(async |notif: SessionUpdate, cx| {
896 /// // Process the notification
897 /// update_session_state(¬if)?;
898 ///
899 /// // Optionally send a notification back
900 /// cx.send_notification(StatusUpdate {
901 /// message: "Acknowledged".into(),
902 /// })?;
903 ///
904 /// Ok(())
905 /// }, agent_client_protocol::on_receive_notification!())
906 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
907 /// # Ok(())
908 /// # }
909 /// ```
910 ///
911 /// # Type Parameter
912 ///
913 /// `Notif` can be either a single notification type or an enum of multiple notification types.
914 /// See the [type-driven dispatch](Self#type-driven-message-dispatch) section for details.
915 ///
916 /// # Ordering
917 ///
918 /// This callback runs inside the dispatch loop and blocks further message processing
919 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
920 /// ordering guarantees and how to avoid deadlocks.
921 pub fn on_receive_notification<Notif, F, T, ToFut>(
922 self,
923 op: F,
924 to_future_hack: ToFut,
925 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
926 where
927 Host::Counterpart: HasPeer<Host::Counterpart>,
928 Notif: JsonRpcNotification,
929 F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
930 T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
931 ToFut: Fn(
932 &mut F,
933 Notif,
934 ConnectionTo<Host::Counterpart>,
935 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
936 + Send
937 + Sync,
938 {
939 let handler = NotificationHandler::new(
940 self.host.counterpart(),
941 self.host.counterpart(),
942 op,
943 to_future_hack,
944 );
945 self.with_handler(handler)
946 }
947
948 /// Register a handler for messages from a specific peer.
949 ///
950 /// This is similar to [`on_receive_dispatch`](Self::on_receive_dispatch), but allows
951 /// specifying the source peer explicitly. This is useful when receiving messages
952 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorMessage`
953 /// envelopes when receiving from an agent via a proxy).
954 ///
955 /// For the common case of receiving from the default counterpart, use
956 /// [`on_receive_dispatch`](Self::on_receive_dispatch) instead.
957 ///
958 /// # Ordering
959 ///
960 /// This callback runs inside the dispatch loop and blocks further message processing
961 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
962 /// ordering guarantees and how to avoid deadlocks.
963 pub fn on_receive_dispatch_from<
964 Req: JsonRpcRequest,
965 Notif: JsonRpcNotification,
966 Peer: Role,
967 F,
968 T,
969 ToFut,
970 >(
971 self,
972 peer: Peer,
973 op: F,
974 to_future_hack: ToFut,
975 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
976 where
977 Host::Counterpart: HasPeer<Peer>,
978 F: AsyncFnMut(
979 Dispatch<Req, Notif>,
980 ConnectionTo<Host::Counterpart>,
981 ) -> Result<T, crate::Error>
982 + Send,
983 T: IntoHandled<Dispatch<Req, Notif>>,
984 ToFut: Fn(
985 &mut F,
986 Dispatch<Req, Notif>,
987 ConnectionTo<Host::Counterpart>,
988 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
989 + Send
990 + Sync,
991 {
992 let handler = MessageHandler::new(self.host.counterpart(), peer, op, to_future_hack);
993 self.with_handler(handler)
994 }
995
996 /// Register a handler for JSON-RPC requests from a specific peer.
997 ///
998 /// This is similar to [`on_receive_request`](Self::on_receive_request), but allows
999 /// specifying the source peer explicitly. This is useful when receiving messages
1000 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorRequest`
1001 /// envelopes when receiving from an agent via a proxy).
1002 ///
1003 /// For the common case of receiving from the default counterpart, use
1004 /// [`on_receive_request`](Self::on_receive_request) instead.
1005 ///
1006 /// # Example
1007 ///
1008 /// ```ignore
1009 /// use agent_client_protocol::Agent;
1010 /// use agent_client_protocol::schema::InitializeRequest;
1011 ///
1012 /// // Conductor receiving from agent direction - messages will be unwrapped from SuccessorMessage
1013 /// connection.on_receive_request_from(Agent, async |req: InitializeRequest, responder, cx| {
1014 /// // Handle the request
1015 /// responder.respond(InitializeResponse::make())
1016 /// })
1017 /// ```
1018 ///
1019 /// # Ordering
1020 ///
1021 /// This callback runs inside the dispatch loop and blocks further message processing
1022 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1023 /// ordering guarantees and how to avoid deadlocks.
1024 pub fn on_receive_request_from<Req: JsonRpcRequest, Peer: Role, F, T, ToFut>(
1025 self,
1026 peer: Peer,
1027 op: F,
1028 to_future_hack: ToFut,
1029 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1030 where
1031 Host::Counterpart: HasPeer<Peer>,
1032 F: AsyncFnMut(
1033 Req,
1034 Responder<Req::Response>,
1035 ConnectionTo<Host::Counterpart>,
1036 ) -> Result<T, crate::Error>
1037 + Send,
1038 T: IntoHandled<(Req, Responder<Req::Response>)>,
1039 ToFut: Fn(
1040 &mut F,
1041 Req,
1042 Responder<Req::Response>,
1043 ConnectionTo<Host::Counterpart>,
1044 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1045 + Send
1046 + Sync,
1047 {
1048 let handler = RequestHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1049 self.with_handler(handler)
1050 }
1051
1052 /// Register a handler for JSON-RPC notifications from a specific peer.
1053 ///
1054 /// This is similar to [`on_receive_notification`](Self::on_receive_notification), but allows
1055 /// specifying the source peer explicitly. This is useful when receiving messages
1056 /// from a peer that requires message transformation (e.g., unwrapping `SuccessorNotification`
1057 /// envelopes when receiving from an agent via a proxy).
1058 ///
1059 /// For the common case of receiving from the default counterpart, use
1060 /// [`on_receive_notification`](Self::on_receive_notification) instead.
1061 ///
1062 /// # Ordering
1063 ///
1064 /// This callback runs inside the dispatch loop and blocks further message processing
1065 /// until it completes. See the [`ordering`](crate::concepts::ordering) module for details on
1066 /// ordering guarantees and how to avoid deadlocks.
1067 pub fn on_receive_notification_from<Notif: JsonRpcNotification, Peer: Role, F, T, ToFut>(
1068 self,
1069 peer: Peer,
1070 op: F,
1071 to_future_hack: ToFut,
1072 ) -> Builder<Host, impl HandleDispatchFrom<Host::Counterpart>, Runner>
1073 where
1074 Host::Counterpart: HasPeer<Peer>,
1075 F: AsyncFnMut(Notif, ConnectionTo<Host::Counterpart>) -> Result<T, crate::Error> + Send,
1076 T: IntoHandled<(Notif, ConnectionTo<Host::Counterpart>)>,
1077 ToFut: Fn(
1078 &mut F,
1079 Notif,
1080 ConnectionTo<Host::Counterpart>,
1081 ) -> crate::BoxFuture<'_, Result<T, crate::Error>>
1082 + Send
1083 + Sync,
1084 {
1085 let handler = NotificationHandler::new(self.host.counterpart(), peer, op, to_future_hack);
1086 self.with_handler(handler)
1087 }
1088
1089 /// Add an MCP server that will be added to all new sessions that are proxied through this connection.
1090 ///
1091 /// Only applicable to proxies.
1092 pub fn with_mcp_server(
1093 self,
1094 mcp_server: McpServer<Host::Counterpart, impl RunWithConnectionTo<Host::Counterpart>>,
1095 ) -> Builder<
1096 Host,
1097 impl HandleDispatchFrom<Host::Counterpart>,
1098 impl RunWithConnectionTo<Host::Counterpart>,
1099 >
1100 where
1101 Host::Counterpart: HasPeer<Agent> + HasPeer<Client>,
1102 {
1103 let (handler, responder) = mcp_server.into_handler_and_responder();
1104 self.with_handler(handler).with_responder(responder)
1105 }
1106
1107 /// Run in server mode with the provided transport.
1108 ///
1109 /// This drives the connection by continuously processing messages from the transport
1110 /// and dispatching them to your registered handlers. The connection will run until:
1111 /// - The transport closes (e.g., EOF on byte streams)
1112 /// - An error occurs
1113 /// - One of your handlers returns an error
1114 ///
1115 /// The transport is responsible for serializing and deserializing `jsonrpcmsg::Message`
1116 /// values to/from the underlying I/O mechanism (byte streams, channels, etc.).
1117 ///
1118 /// Use this mode when you only need to respond to incoming messages and don't need
1119 /// to initiate your own requests. If you need to send requests to the other side,
1120 /// use [`connect_with`](Self::connect_with) instead.
1121 ///
1122 /// # Example: Byte Stream Transport
1123 ///
1124 /// ```no_run
1125 /// # use agent_client_protocol::UntypedRole;
1126 /// # use agent_client_protocol::{Builder};
1127 /// # use agent_client_protocol::Stdio;
1128 /// # use agent_client_protocol_test::*;
1129 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1130 /// let transport = Stdio::new();
1131 ///
1132 /// UntypedRole.builder()
1133 /// .on_receive_request(async |req: MyRequest, responder, cx| {
1134 /// responder.respond(MyResponse { status: "ok".into() })
1135 /// }, agent_client_protocol::on_receive_request!())
1136 /// .connect_to(transport)
1137 /// .await?;
1138 /// # Ok(())
1139 /// # }
1140 /// ```
1141 pub async fn connect_to(
1142 self,
1143 transport: impl ConnectTo<Host> + 'static,
1144 ) -> Result<(), crate::Error> {
1145 self.connect_with(transport, async move |_cx| future::pending().await)
1146 .await
1147 }
1148
1149 /// Run the connection until the provided closure completes.
1150 ///
1151 /// This drives the connection by:
1152 /// 1. Running your registered handlers in the background to process incoming messages
1153 /// 2. Executing your `main_fn` closure with a [`ConnectionTo<R>`] for sending requests/notifications
1154 ///
1155 /// The connection stays active until your `main_fn` returns, then shuts down gracefully.
1156 /// If the connection closes unexpectedly before `main_fn` completes, this returns an error.
1157 ///
1158 /// Use this mode when you need to initiate communication (send requests/notifications)
1159 /// in addition to responding to incoming messages. For server-only mode where you just
1160 /// respond to messages, use [`connect_to`](Self::connect_to) instead.
1161 ///
1162 /// # Example
1163 ///
1164 /// ```no_run
1165 /// # use agent_client_protocol::UntypedRole;
1166 /// # use agent_client_protocol::{Builder};
1167 /// # use agent_client_protocol::ByteStreams;
1168 /// # use agent_client_protocol::schema::InitializeRequest;
1169 /// # use agent_client_protocol::Stdio;
1170 /// # use agent_client_protocol_test::*;
1171 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1172 /// let transport = Stdio::new();
1173 ///
1174 /// UntypedRole.builder()
1175 /// .on_receive_request(async |req: MyRequest, responder, cx| {
1176 /// // Handle incoming requests in the background
1177 /// responder.respond(MyResponse { status: "ok".into() })
1178 /// }, agent_client_protocol::on_receive_request!())
1179 /// .connect_with(transport, async |cx| {
1180 /// // Initialize the protocol
1181 /// let init_response = cx.send_request(InitializeRequest::make())
1182 /// .block_task()
1183 /// .await?;
1184 ///
1185 /// // Send more requests...
1186 /// let result = cx.send_request(MyRequest {})
1187 /// .block_task()
1188 /// .await?;
1189 ///
1190 /// // When this closure returns, the connection shuts down
1191 /// Ok(())
1192 /// })
1193 /// .await?;
1194 /// # Ok(())
1195 /// # }
1196 /// ```
1197 ///
1198 /// # Parameters
1199 ///
1200 /// - `main_fn`: Your client logic. Receives a [`ConnectionTo<R>`] for sending messages.
1201 ///
1202 /// # Errors
1203 ///
1204 /// Returns an error if the connection closes before `main_fn` completes.
1205 pub async fn connect_with<R>(
1206 self,
1207 transport: impl ConnectTo<Host> + 'static,
1208 main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1209 ) -> Result<R, crate::Error> {
1210 let (_, future) = self.into_connection_and_future(transport, main_fn);
1211 future.await
1212 }
1213
1214 /// Helper that returns a [`ConnectionTo<R>`] and a future that runs this connection until `main_fn` returns.
1215 fn into_connection_and_future<R>(
1216 self,
1217 transport: impl ConnectTo<Host> + 'static,
1218 main_fn: impl AsyncFnOnce(ConnectionTo<Host::Counterpart>) -> Result<R, crate::Error>,
1219 ) -> (
1220 ConnectionTo<Host::Counterpart>,
1221 impl Future<Output = Result<R, crate::Error>>,
1222 ) {
1223 let Self {
1224 name,
1225 handler,
1226 responder,
1227 host: me,
1228 protocol_mode,
1229 } = self;
1230
1231 let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
1232 let (new_task_tx, new_task_rx) = mpsc::unbounded();
1233 let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
1234 let connection = ConnectionTo::new(
1235 me.counterpart(),
1236 outgoing_tx,
1237 new_task_tx,
1238 dynamic_handler_tx,
1239 );
1240
1241 // Convert transport into server - this returns a channel for us to use
1242 // and a future that runs the transport
1243 let transport_component = crate::DynConnectTo::new(transport);
1244 let (transport_channel, transport_future) = transport_component.into_channel_and_future();
1245 let spawn_result = connection.spawn(transport_future);
1246
1247 // Destructure the channel endpoints
1248 let Channel {
1249 rx: transport_incoming_rx,
1250 tx: transport_outgoing_tx,
1251 } = transport_channel;
1252
1253 let (reply_tx, reply_rx) = mpsc::unbounded();
1254 let protocol_compat = ProtocolCompat::new(protocol_mode);
1255
1256 let future = crate::util::instrument_with_connection_name(name, {
1257 let connection = connection.clone();
1258 async move {
1259 let () = spawn_result?;
1260
1261 let background = async {
1262 futures::try_join!(
1263 // Protocol layer: OutgoingMessage → jsonrpcmsg::Message
1264 outgoing_actor::outgoing_protocol_actor(
1265 outgoing_rx,
1266 reply_tx.clone(),
1267 transport_outgoing_tx,
1268 protocol_compat.clone(),
1269 ),
1270 // Protocol layer: jsonrpcmsg::Message → handler/reply routing
1271 incoming_actor::incoming_protocol_actor(
1272 me.counterpart(),
1273 &connection,
1274 transport_incoming_rx,
1275 dynamic_handler_rx,
1276 reply_rx,
1277 handler,
1278 protocol_compat,
1279 ),
1280 task_actor::task_actor(new_task_rx, &connection),
1281 responder.run_with_connection_to(connection.clone()),
1282 )?;
1283 Ok(())
1284 };
1285
1286 crate::util::run_until(Box::pin(background), Box::pin(main_fn(connection.clone())))
1287 .await
1288 }
1289 });
1290
1291 (connection, future)
1292 }
1293}
1294
1295impl<R, H, Run> ConnectTo<R::Counterpart> for Builder<R, H, Run>
1296where
1297 R: Role,
1298 H: HandleDispatchFrom<R::Counterpart> + 'static,
1299 Run: RunWithConnectionTo<R::Counterpart> + 'static,
1300{
1301 async fn connect_to(self, client: impl ConnectTo<R>) -> Result<(), crate::Error> {
1302 Builder::connect_to(self, client).await
1303 }
1304}
1305
1306/// The payload sent through the response oneshot channel.
1307///
1308/// Includes the response value and an optional ack channel for dispatch loop
1309/// synchronization.
1310pub(crate) struct ResponsePayload {
1311 /// The response result - either the JSON value or an error.
1312 pub(crate) result: Result<serde_json::Value, crate::Error>,
1313
1314 /// Optional acknowledgment channel for dispatch loop synchronization.
1315 ///
1316 /// When present, the receiver must send on this channel to signal that
1317 /// response processing is complete, allowing the dispatch loop to continue
1318 /// to the next message.
1319 ///
1320 /// This is `None` for error paths where the response is sent directly
1321 /// (e.g., when the outgoing channel is broken) rather than through the
1322 /// normal dispatch loop flow.
1323 pub(crate) ack_tx: Option<oneshot::Sender<()>>,
1324}
1325
1326impl std::fmt::Debug for ResponsePayload {
1327 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1328 f.debug_struct("ResponsePayload")
1329 .field("result", &self.result)
1330 .field("ack_tx", &self.ack_tx.as_ref().map(|_| "..."))
1331 .finish()
1332 }
1333}
1334
1335/// Message sent to the incoming actor for reply subscription management.
1336enum ReplyMessage {
1337 /// Subscribe to receive a response for the given request id.
1338 /// When a response with this id arrives, it will be sent through the oneshot
1339 /// along with an ack channel that must be signaled when processing is complete.
1340 /// The method name is stored to allow routing responses through typed handlers.
1341 Subscribe {
1342 id: jsonrpcmsg::Id,
1343
1344 /// id of the peer this request was sent to
1345 role_id: RoleId,
1346
1347 /// (original) method of the request -- the actual request may have been transformed
1348 /// to a successor method, but this will reflect the method of the wrapped request
1349 method: String,
1350
1351 sender: oneshot::Sender<ResponsePayload>,
1352 },
1353}
1354
1355impl std::fmt::Debug for ReplyMessage {
1356 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1357 match self {
1358 ReplyMessage::Subscribe { id, method, .. } => f
1359 .debug_struct("Subscribe")
1360 .field("id", id)
1361 .field("method", method)
1362 .finish(),
1363 }
1364 }
1365}
1366
1367/// Messages send to be serialized over the transport.
1368#[derive(Debug)]
1369enum OutgoingMessage {
1370 /// Send a request to the server.
1371 Request {
1372 /// id assigned to this request (generated by sender)
1373 id: jsonrpcmsg::Id,
1374
1375 /// the original method
1376 method: String,
1377
1378 /// the peer we sent this to
1379 role_id: RoleId,
1380
1381 /// the message to send; this may have a distinct method
1382 /// depending on the peer
1383 untyped: UntypedMessage,
1384
1385 /// where to send the response when it arrives (includes ack channel)
1386 response_tx: oneshot::Sender<ResponsePayload>,
1387 },
1388
1389 /// Send a notification to the server.
1390 Notification {
1391 /// the message to send; this may have a distinct method
1392 /// depending on the peer
1393 untyped: UntypedMessage,
1394 },
1395
1396 /// Send a response to a message from the server
1397 Response {
1398 id: jsonrpcmsg::Id,
1399
1400 /// Method of the incoming request this response completes.
1401 method: String,
1402
1403 response: Result<serde_json::Value, crate::Error>,
1404 },
1405
1406 /// Send a generalized error message
1407 Error { error: crate::Error },
1408}
1409
1410/// Return type from JrHandler; indicates whether the request was handled or not.
1411#[must_use]
1412#[derive(Debug)]
1413pub enum Handled<T> {
1414 /// The message was handled
1415 Yes,
1416
1417 /// The message was not handled; returns the original value.
1418 ///
1419 /// If `retry` is true,
1420 No {
1421 /// The message to be passed to subsequent handlers
1422 /// (typically the original message, but it may have been
1423 /// mutated.)
1424 message: T,
1425
1426 /// If true, request the message to be queued and retried with
1427 /// dynamic handlers as they are added.
1428 ///
1429 /// This is used for managing session updates since the dynamic
1430 /// handler for a session cannot be added until the response to the
1431 /// new session request has been processed and there may be updates
1432 /// that get processed at the same time.
1433 retry: bool,
1434 },
1435}
1436
1437/// Trait for converting handler return values into [`Handled`].
1438///
1439/// This trait allows handlers to return either `()` (which becomes `Handled::Yes`)
1440/// or an explicit `Handled<T>` value for more control over handler propagation.
1441pub trait IntoHandled<T> {
1442 /// Convert this value into a `Handled<T>`.
1443 fn into_handled(self) -> Handled<T>;
1444}
1445
1446impl<T> IntoHandled<T> for () {
1447 fn into_handled(self) -> Handled<T> {
1448 Handled::Yes
1449 }
1450}
1451
1452impl<T> IntoHandled<T> for Handled<T> {
1453 fn into_handled(self) -> Handled<T> {
1454 self
1455 }
1456}
1457
1458/// Connection context for sending messages and spawning tasks.
1459///
1460/// This is the primary handle for interacting with the JSON-RPC connection from
1461/// within handler callbacks. You can use it to:
1462///
1463/// * Send requests and notifications to the other side
1464/// * Spawn concurrent tasks that run alongside the connection
1465/// * Respond to requests (via [`Responder`] which wraps this)
1466///
1467/// # Cloning
1468///
1469/// `ConnectionTo` is cheaply cloneable - all clones refer to the same underlying connection.
1470/// This makes it easy to share across async tasks.
1471///
1472/// # Event Loop and Concurrency
1473///
1474/// Handler callbacks run on the event loop, which means the connection cannot process new
1475/// messages while your handler is running. Use [`spawn`](Self::spawn) to offload any
1476/// expensive or blocking work to concurrent tasks.
1477///
1478/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency) section
1479/// for more details.
1480#[derive(Clone, Debug)]
1481pub struct ConnectionTo<Counterpart: Role> {
1482 counterpart: Counterpart,
1483 message_tx: OutgoingMessageTx,
1484 task_tx: TaskTx,
1485 dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
1486}
1487
1488impl<Counterpart: Role> ConnectionTo<Counterpart> {
1489 fn new(
1490 counterpart: Counterpart,
1491 message_tx: mpsc::UnboundedSender<OutgoingMessage>,
1492 task_tx: mpsc::UnboundedSender<Task>,
1493 dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
1494 ) -> Self {
1495 Self {
1496 counterpart,
1497 message_tx,
1498 task_tx,
1499 dynamic_handler_tx,
1500 }
1501 }
1502
1503 /// Return the counterpart role this connection is talking to.
1504 pub fn counterpart(&self) -> Counterpart {
1505 self.counterpart.clone()
1506 }
1507
1508 /// Spawns a task that will run so long as the JSON-RPC connection is being served.
1509 ///
1510 /// This is the primary mechanism for offloading expensive work from handler callbacks
1511 /// to avoid blocking the event loop. Spawned tasks run concurrently with the connection,
1512 /// allowing the server to continue processing messages.
1513 ///
1514 /// # Event Loop
1515 ///
1516 /// Handler callbacks run on the event loop, which cannot process new messages while
1517 /// your handler is running. Use `spawn` for any expensive operations:
1518 ///
1519 /// ```no_run
1520 /// # use agent_client_protocol_test::*;
1521 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
1522 /// # let connection = mock_connection();
1523 /// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1524 /// // Clone cx for the spawned task
1525 /// cx.spawn({
1526 /// let connection = cx.clone();
1527 /// async move {
1528 /// let result = expensive_operation(&req.data).await?;
1529 /// connection.send_notification(ProcessComplete { result })?;
1530 /// Ok(())
1531 /// }
1532 /// })?;
1533 ///
1534 /// // Respond immediately
1535 /// responder.respond(ProcessResponse { result: "started".into() })
1536 /// }, agent_client_protocol::on_receive_request!())
1537 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1538 /// # Ok(())
1539 /// # }
1540 /// ```
1541 ///
1542 /// # Errors
1543 ///
1544 /// If the spawned task returns an error, the entire server will shut down.
1545 #[track_caller]
1546 pub fn spawn(
1547 &self,
1548 task: impl IntoFuture<Output = Result<(), crate::Error>, IntoFuture: Send + 'static>,
1549 ) -> Result<(), crate::Error> {
1550 let location = std::panic::Location::caller();
1551 let task = task.into_future();
1552 Task::new(location, task).spawn(&self.task_tx)
1553 }
1554
1555 /// Spawn a JSON-RPC connection in the background and return a [`ConnectionTo`] for sending messages to it.
1556 ///
1557 /// This is useful for creating multiple connections that communicate with each other,
1558 /// such as implementing proxy patterns or connecting to multiple backend services.
1559 ///
1560 /// # Arguments
1561 ///
1562 /// - `builder`: The connection builder with handlers configured
1563 /// - `transport`: The transport component to connect to
1564 ///
1565 /// # Returns
1566 ///
1567 /// A `ConnectionTo` that you can use to send requests and notifications to the spawned connection.
1568 ///
1569 /// # Example: Proxying to a backend connection
1570 ///
1571 /// ```
1572 /// # use agent_client_protocol::UntypedRole;
1573 /// # use agent_client_protocol::{Builder, ConnectionTo};
1574 /// # use agent_client_protocol_test::*;
1575 /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1576 /// // Set up a backend connection builder
1577 /// let backend = UntypedRole.builder()
1578 /// .on_receive_request(async |req: MyRequest, responder, _cx| {
1579 /// responder.respond(MyResponse { status: "ok".into() })
1580 /// }, agent_client_protocol::on_receive_request!());
1581 ///
1582 /// // Spawn it and get a context to send requests to it
1583 /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
1584 ///
1585 /// // Now you can forward requests to the backend
1586 /// let response = backend_connection.send_request(MyRequest {}).block_task().await?;
1587 /// # Ok(())
1588 /// # }
1589 /// ```
1590 #[track_caller]
1591 pub fn spawn_connection<R: Role>(
1592 &self,
1593 builder: Builder<
1594 R,
1595 impl HandleDispatchFrom<R::Counterpart> + 'static,
1596 impl RunWithConnectionTo<R::Counterpart> + 'static,
1597 >,
1598 transport: impl ConnectTo<R> + 'static,
1599 ) -> Result<ConnectionTo<R::Counterpart>, crate::Error> {
1600 let (connection, future) =
1601 builder.into_connection_and_future(transport, |_| std::future::pending());
1602 Task::new(std::panic::Location::caller(), future).spawn(&self.task_tx)?;
1603 Ok(connection)
1604 }
1605
1606 /// Send a request/notification and forward the response appropriately.
1607 ///
1608 /// The request context's response type matches the request's response type,
1609 /// enabling type-safe message forwarding.
1610 pub fn send_proxied_message<Req: JsonRpcRequest<Response: Send>, Notif: JsonRpcNotification>(
1611 &self,
1612 message: Dispatch<Req, Notif>,
1613 ) -> Result<(), crate::Error>
1614 where
1615 Counterpart: HasPeer<Counterpart>,
1616 {
1617 self.send_proxied_message_to(self.counterpart(), message)
1618 }
1619
1620 /// Send a request/notification and forward the response appropriately.
1621 ///
1622 /// The request context's response type matches the request's response type,
1623 /// enabling type-safe message forwarding.
1624 pub fn send_proxied_message_to<
1625 Peer: Role,
1626 Req: JsonRpcRequest<Response: Send>,
1627 Notif: JsonRpcNotification,
1628 >(
1629 &self,
1630 peer: Peer,
1631 message: Dispatch<Req, Notif>,
1632 ) -> Result<(), crate::Error>
1633 where
1634 Counterpart: HasPeer<Peer>,
1635 {
1636 match message {
1637 Dispatch::Request(request, responder) => self
1638 .send_request_to(peer, request)
1639 .forward_response_to(responder),
1640 Dispatch::Notification(notification) => self.send_notification_to(peer, notification),
1641 Dispatch::Response(result, router) => {
1642 // Responses are forwarded directly to their destination
1643 router.respond_with_result(result)
1644 }
1645 }
1646 }
1647
1648 /// Send an outgoing request and return a [`SentRequest`] for handling the reply.
1649 ///
1650 /// The returned [`SentRequest`] provides methods for receiving the response without
1651 /// blocking the event loop:
1652 ///
1653 /// * [`on_receiving_result`](SentRequest::on_receiving_result) - Schedule
1654 /// a callback to run when the response arrives (doesn't block the event loop)
1655 /// * [`block_task`](SentRequest::block_task) - Block the current task until the response
1656 /// arrives (only safe in spawned tasks, not in handlers)
1657 ///
1658 /// # Anti-Footgun Design
1659 ///
1660 /// The API intentionally makes it difficult to block on the result directly to prevent
1661 /// the common mistake of blocking the event loop while waiting for a response:
1662 ///
1663 /// ```compile_fail
1664 /// # use agent_client_protocol_test::*;
1665 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1666 /// // ❌ This doesn't compile - prevents blocking the event loop
1667 /// let response = cx.send_request(MyRequest {}).await?;
1668 /// # Ok(())
1669 /// # }
1670 /// ```
1671 ///
1672 /// ```no_run
1673 /// # use agent_client_protocol_test::*;
1674 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
1675 /// // ✅ Option 1: Schedule callback (safe in handlers)
1676 /// cx.send_request(MyRequest {})
1677 /// .on_receiving_result(async |result| {
1678 /// // Handle the response
1679 /// Ok(())
1680 /// })?;
1681 ///
1682 /// // ✅ Option 2: Block in spawned task (safe because task is concurrent)
1683 /// cx.spawn({
1684 /// let cx = cx.clone();
1685 /// async move {
1686 /// let response = cx.send_request(MyRequest {})
1687 /// .block_task()
1688 /// .await?;
1689 /// // Process response...
1690 /// Ok(())
1691 /// }
1692 /// })?;
1693 /// # Ok(())
1694 /// # }
1695 /// ```
1696 /// Send an outgoing request to the default counterpart peer.
1697 ///
1698 /// This is a convenience method that sends to the counterpart role `R`.
1699 /// For explicit control over the target peer, use [`send_request_to`](Self::send_request_to).
1700 pub fn send_request<Req: JsonRpcRequest>(&self, request: Req) -> SentRequest<Req::Response>
1701 where
1702 Counterpart: HasPeer<Counterpart>,
1703 {
1704 self.send_request_to(self.counterpart.clone(), request)
1705 }
1706
1707 /// Send an outgoing request to a specific peer.
1708 ///
1709 /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1710 /// implementation before being sent.
1711 pub fn send_request_to<Peer: Role, Req: JsonRpcRequest>(
1712 &self,
1713 peer: Peer,
1714 request: Req,
1715 ) -> SentRequest<Req::Response>
1716 where
1717 Counterpart: HasPeer<Peer>,
1718 {
1719 let method = request.method().to_string();
1720 let id = jsonrpcmsg::Id::String(uuid::Uuid::new_v4().to_string());
1721 let (response_tx, response_rx) = oneshot::channel();
1722 let role_id = peer.role_id();
1723 let remote_style = self.counterpart.remote_style(peer);
1724 match remote_style.transform_outgoing_message(request) {
1725 Ok(untyped) => {
1726 // Transform the message for the target role
1727 let message = OutgoingMessage::Request {
1728 id: id.clone(),
1729 method: method.clone(),
1730 role_id,
1731 untyped,
1732 response_tx,
1733 };
1734
1735 match self.message_tx.unbounded_send(message) {
1736 Ok(()) => (),
1737 Err(error) => {
1738 let OutgoingMessage::Request {
1739 method,
1740 response_tx,
1741 ..
1742 } = error.into_inner()
1743 else {
1744 unreachable!();
1745 };
1746
1747 response_tx
1748 .send(ResponsePayload {
1749 result: Err(crate::util::internal_error(format!(
1750 "failed to send outgoing request `{method}"
1751 ))),
1752 ack_tx: None,
1753 })
1754 .unwrap();
1755 }
1756 }
1757 }
1758
1759 Err(err) => {
1760 response_tx
1761 .send(ResponsePayload {
1762 result: Err(crate::util::internal_error(format!(
1763 "failed to create untyped request for `{method}`: {err}"
1764 ))),
1765 ack_tx: None,
1766 })
1767 .unwrap();
1768 }
1769 }
1770
1771 SentRequest::new(id, method.clone(), self.task_tx.clone(), response_rx)
1772 .map(move |json| <Req::Response>::from_value(&method, json))
1773 }
1774
1775 /// Send an outgoing notification to the default counterpart peer (no reply expected).
1776 ///
1777 /// Notifications are fire-and-forget messages that don't have IDs and don't expect responses.
1778 /// This method sends the notification immediately and returns.
1779 ///
1780 /// This is a convenience method that sends to the counterpart role `R`.
1781 /// For explicit control over the target peer, use [`send_notification_to`](Self::send_notification_to).
1782 ///
1783 /// ```no_run
1784 /// # use agent_client_protocol_test::*;
1785 /// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::Agent>) -> Result<(), agent_client_protocol::Error> {
1786 /// cx.send_notification(StatusUpdate {
1787 /// message: "Processing...".into(),
1788 /// })?;
1789 /// # Ok(())
1790 /// # }
1791 /// ```
1792 pub fn send_notification<N: JsonRpcNotification>(
1793 &self,
1794 notification: N,
1795 ) -> Result<(), crate::Error>
1796 where
1797 Counterpart: HasPeer<Counterpart>,
1798 {
1799 self.send_notification_to(self.counterpart.clone(), notification)
1800 }
1801
1802 /// Send an outgoing notification to a specific peer (no reply expected).
1803 ///
1804 /// The message will be transformed according to the [`HasPeer`](crate::role::HasPeer)
1805 /// implementation before being sent.
1806 pub fn send_notification_to<Peer: Role, N: JsonRpcNotification>(
1807 &self,
1808 peer: Peer,
1809 notification: N,
1810 ) -> Result<(), crate::Error>
1811 where
1812 Counterpart: HasPeer<Peer>,
1813 {
1814 let remote_style = self.counterpart.remote_style(peer);
1815 tracing::debug!(
1816 role = std::any::type_name::<Counterpart>(),
1817 peer = std::any::type_name::<Peer>(),
1818 notification_type = std::any::type_name::<N>(),
1819 ?remote_style,
1820 original_method = notification.method(),
1821 "send_notification_to"
1822 );
1823 let transformed = remote_style.transform_outgoing_message(notification)?;
1824 tracing::debug!(
1825 transformed_method = %transformed.method,
1826 "send_notification_to transformed"
1827 );
1828 send_raw_message(
1829 &self.message_tx,
1830 OutgoingMessage::Notification {
1831 untyped: transformed,
1832 },
1833 )
1834 }
1835
1836 /// Send an error notification (no reply expected).
1837 pub fn send_error_notification(&self, error: crate::Error) -> Result<(), crate::Error> {
1838 send_raw_message(&self.message_tx, OutgoingMessage::Error { error })
1839 }
1840
1841 /// Register a dynamic message handler, used to intercept messages specific to a particular session
1842 /// or some similar modal thing.
1843 ///
1844 /// Dynamic message handlers are called first for every incoming message.
1845 ///
1846 /// If they decline to handle the message, then the message is passed to the regular registered handlers.
1847 ///
1848 /// The handler will stay registered until the returned registration guard is dropped.
1849 pub fn add_dynamic_handler(
1850 &self,
1851 handler: impl HandleDispatchFrom<Counterpart> + 'static,
1852 ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error> {
1853 let uuid = Uuid::new_v4();
1854 self.dynamic_handler_tx
1855 .unbounded_send(DynamicHandlerMessage::AddDynamicHandler(
1856 uuid,
1857 Box::new(handler),
1858 ))
1859 .map_err(crate::util::internal_error)?;
1860
1861 Ok(DynamicHandlerRegistration::new(uuid, self.clone()))
1862 }
1863
1864 fn remove_dynamic_handler(&self, uuid: Uuid) {
1865 // Ignore errors
1866 drop(
1867 self.dynamic_handler_tx
1868 .unbounded_send(DynamicHandlerMessage::RemoveDynamicHandler(uuid)),
1869 );
1870 }
1871}
1872
1873#[derive(Clone, Debug)]
1874pub struct DynamicHandlerRegistration<R: Role> {
1875 uuid: Uuid,
1876 cx: ConnectionTo<R>,
1877}
1878
1879impl<R: Role> DynamicHandlerRegistration<R> {
1880 fn new(uuid: Uuid, cx: ConnectionTo<R>) -> Self {
1881 Self { uuid, cx }
1882 }
1883
1884 /// Prevents the dynamic handler from being removed when dropped.
1885 pub fn run_indefinitely(self) {
1886 std::mem::forget(self);
1887 }
1888}
1889
1890impl<R: Role> Drop for DynamicHandlerRegistration<R> {
1891 fn drop(&mut self) {
1892 self.cx.remove_dynamic_handler(self.uuid);
1893 }
1894}
1895
1896/// The context to respond to an incoming request.
1897///
1898/// This context is provided to request handlers and serves a dual role:
1899///
1900/// 1. **Respond to the request** - Use [`respond`](Self::respond) or
1901/// [`respond_with_result`](Self::respond_with_result) to send the response
1902/// 2. **Send other messages** - Use the [`ConnectionTo`] parameter passed to your
1903/// handler, which provides [`send_request`](`ConnectionTo::send_request`),
1904/// [`send_notification`](`ConnectionTo::send_notification`), and
1905/// [`spawn`](`ConnectionTo::spawn`)
1906///
1907/// # Example
1908///
1909/// ```no_run
1910/// # use agent_client_protocol_test::*;
1911/// # async fn example() -> Result<(), agent_client_protocol::Error> {
1912/// # let connection = mock_connection();
1913/// connection.on_receive_request(async |req: ProcessRequest, responder, cx| {
1914/// // Send a notification while processing
1915/// cx.send_notification(StatusUpdate {
1916/// message: "processing".into(),
1917/// })?;
1918///
1919/// // Do some work...
1920/// let result = process(&req.data)?;
1921///
1922/// // Respond to the request
1923/// responder.respond(ProcessResponse { result })
1924/// }, agent_client_protocol::on_receive_request!())
1925/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
1926/// # Ok(())
1927/// # }
1928/// ```
1929///
1930/// # Event Loop Considerations
1931///
1932/// Like all handlers, request handlers run on the event loop. Use
1933/// [`spawn`](ConnectionTo::spawn) for expensive operations to avoid blocking
1934/// the connection.
1935///
1936/// See the [Event Loop and Concurrency](Builder#event-loop-and-concurrency)
1937/// section for more details.
1938#[must_use]
1939pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
1940 /// The method of the request.
1941 method: String,
1942
1943 /// The `id` of the message we are replying to.
1944 id: jsonrpcmsg::Id,
1945
1946 /// Function to send the response to its destination.
1947 ///
1948 /// For incoming requests: serializes to JSON and sends over the wire.
1949 /// For incoming responses: sends to the waiting oneshot channel.
1950 send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
1951}
1952
1953impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
1954 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1955 f.debug_struct("Responder")
1956 .field("method", &self.method)
1957 .field("id", &self.id)
1958 .field("response_type", &std::any::type_name::<T>())
1959 .finish_non_exhaustive()
1960 }
1961}
1962
1963impl Responder<serde_json::Value> {
1964 /// Create a new request context for an incoming request.
1965 ///
1966 /// The response will be serialized to JSON and sent over the wire.
1967 fn new(message_tx: OutgoingMessageTx, method: String, id: jsonrpcmsg::Id) -> Self {
1968 let id_clone = id.clone();
1969 let method_clone = method.clone();
1970 Self {
1971 method,
1972 id,
1973 send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
1974 send_raw_message(
1975 &message_tx,
1976 OutgoingMessage::Response {
1977 id: id_clone,
1978 method: method_clone,
1979 response,
1980 },
1981 )
1982 }),
1983 }
1984 }
1985
1986 /// Cast this request context to a different response type.
1987 ///
1988 /// The provided type `T` will be serialized to JSON before sending.
1989 pub fn cast<T: JsonRpcResponse>(self) -> Responder<T> {
1990 self.wrap_params(move |method, value| match value {
1991 Ok(value) => T::into_json(value, method),
1992 Err(e) => Err(e),
1993 })
1994 }
1995}
1996
1997impl<T: JsonRpcResponse> Responder<T> {
1998 /// Method of the incoming request
1999 #[must_use]
2000 pub fn method(&self) -> &str {
2001 &self.method
2002 }
2003
2004 /// ID of the incoming request/response as a JSON value
2005 #[must_use]
2006 pub fn id(&self) -> serde_json::Value {
2007 crate::util::id_to_json(&self.id)
2008 }
2009
2010 /// Convert to a `Responder` that expects a JSON value
2011 /// and which checks (dynamically) that the JSON value it receives
2012 /// can be converted to `T`.
2013 pub fn erase_to_json(self) -> Responder<serde_json::Value> {
2014 self.wrap_params(|method, value| T::from_value(method, value?))
2015 }
2016
2017 /// Return a new Responder with a different method name.
2018 pub fn wrap_method(self, method: String) -> Responder<T> {
2019 Responder {
2020 method,
2021 id: self.id,
2022 send_fn: self.send_fn,
2023 }
2024 }
2025
2026 /// Return a new Responder that expects a response of type U.
2027 ///
2028 /// `wrap_fn` will be invoked with the method name and the result to transform
2029 /// type `U` into type `T` before sending.
2030 pub fn wrap_params<U: JsonRpcResponse>(
2031 self,
2032 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2033 ) -> Responder<U> {
2034 let method = self.method.clone();
2035 Responder {
2036 method: self.method,
2037 id: self.id,
2038 send_fn: Box::new(move |input: Result<U, crate::Error>| {
2039 let t_value = wrap_fn(&method, input);
2040 (self.send_fn)(t_value)
2041 }),
2042 }
2043 }
2044
2045 /// Respond to the JSON-RPC request with either a value (`Ok`) or an error (`Err`).
2046 pub fn respond_with_result(
2047 self,
2048 response: Result<T, crate::Error>,
2049 ) -> Result<(), crate::Error> {
2050 tracing::debug!(id = ?self.id, "respond called");
2051 (self.send_fn)(response)
2052 }
2053
2054 /// Respond to the JSON-RPC request with a value.
2055 pub fn respond(self, response: T) -> Result<(), crate::Error> {
2056 self.respond_with_result(Ok(response))
2057 }
2058
2059 /// Respond to the JSON-RPC request with an internal error containing a message.
2060 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2061 self.respond_with_error(crate::util::internal_error(message))
2062 }
2063
2064 /// Respond to the JSON-RPC request with an error.
2065 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2066 tracing::debug!(id = ?self.id, ?error, "respond_with_error called");
2067 self.respond_with_result(Err(error))
2068 }
2069}
2070
2071/// Context for handling an incoming JSON-RPC response.
2072///
2073/// This is the response-side counterpart to [`Responder`]. While `Responder` handles
2074/// incoming requests (where you send a response over the wire), `ResponseRouter` handles
2075/// incoming responses (where you route the response to a local task waiting for it).
2076///
2077/// Both are fundamentally "sinks" that push the message through a `send_fn`, but they
2078/// represent different points in the message lifecycle and carry different metadata.
2079#[must_use]
2080pub struct ResponseRouter<T: JsonRpcResponse = serde_json::Value> {
2081 /// The method of the original request.
2082 method: String,
2083
2084 /// The `id` of the original request.
2085 id: jsonrpcmsg::Id,
2086
2087 /// The RoleId to which the original request was sent
2088 /// (and hence from which the reply is expected).
2089 role_id: RoleId,
2090
2091 /// Function to send the response to the waiting task.
2092 send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
2093}
2094
2095impl<T: JsonRpcResponse> std::fmt::Debug for ResponseRouter<T> {
2096 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2097 f.debug_struct("ResponseRouter")
2098 .field("method", &self.method)
2099 .field("id", &self.id)
2100 .field("response_type", &std::any::type_name::<T>())
2101 .finish_non_exhaustive()
2102 }
2103}
2104
2105impl ResponseRouter<serde_json::Value> {
2106 /// Create a new response context for routing a response to a local awaiter.
2107 ///
2108 /// When `respond_with_result` is called, the response is sent through the oneshot
2109 /// channel to the code that originally sent the request.
2110 pub(crate) fn new(
2111 method: String,
2112 id: jsonrpcmsg::Id,
2113 role_id: RoleId,
2114 sender: oneshot::Sender<ResponsePayload>,
2115 ) -> Self {
2116 Self {
2117 method,
2118 id,
2119 role_id,
2120 send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
2121 sender
2122 .send(ResponsePayload {
2123 result: response,
2124 ack_tx: None,
2125 })
2126 .map_err(|_| {
2127 crate::util::internal_error("failed to send response, receiver dropped")
2128 })
2129 }),
2130 }
2131 }
2132
2133 /// Cast this response context to a different response type.
2134 ///
2135 /// The provided type `T` will be serialized to JSON before sending.
2136 pub fn cast<T: JsonRpcResponse>(self) -> ResponseRouter<T> {
2137 self.wrap_params(move |method, value| match value {
2138 Ok(value) => T::into_json(value, method),
2139 Err(e) => Err(e),
2140 })
2141 }
2142}
2143
2144impl<T: JsonRpcResponse> ResponseRouter<T> {
2145 /// Method of the original request
2146 #[must_use]
2147 pub fn method(&self) -> &str {
2148 &self.method
2149 }
2150
2151 /// ID of the original request as a JSON value
2152 #[must_use]
2153 pub fn id(&self) -> serde_json::Value {
2154 crate::util::id_to_json(&self.id)
2155 }
2156
2157 /// The peer to which the original request was sent.
2158 ///
2159 /// This is the peer from which we expect to receive the response.
2160 #[must_use]
2161 pub fn role_id(&self) -> RoleId {
2162 self.role_id.clone()
2163 }
2164
2165 /// Convert to a `ResponseRouter` that expects a JSON value
2166 /// and which checks (dynamically) that the JSON value it receives
2167 /// can be converted to `T`.
2168 pub fn erase_to_json(self) -> ResponseRouter<serde_json::Value> {
2169 self.wrap_params(|method, value| T::from_value(method, value?))
2170 }
2171
2172 /// Return a new ResponseRouter that expects a response of type U.
2173 ///
2174 /// `wrap_fn` will be invoked with the method name and the result to transform
2175 /// type `U` into type `T` before sending.
2176 fn wrap_params<U: JsonRpcResponse>(
2177 self,
2178 wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
2179 ) -> ResponseRouter<U> {
2180 let method = self.method.clone();
2181 ResponseRouter {
2182 method: self.method,
2183 id: self.id,
2184 role_id: self.role_id,
2185 send_fn: Box::new(move |input: Result<U, crate::Error>| {
2186 let t_value = wrap_fn(&method, input);
2187 (self.send_fn)(t_value)
2188 }),
2189 }
2190 }
2191
2192 /// Complete the response by sending the result to the waiting task.
2193 pub fn respond_with_result(
2194 self,
2195 response: Result<T, crate::Error>,
2196 ) -> Result<(), crate::Error> {
2197 tracing::debug!(id = ?self.id, "response routed to awaiter");
2198 (self.send_fn)(response)
2199 }
2200
2201 /// Complete the response by sending a value to the waiting task.
2202 pub fn respond(self, response: T) -> Result<(), crate::Error> {
2203 self.respond_with_result(Ok(response))
2204 }
2205
2206 /// Complete the response by sending an internal error to the waiting task.
2207 pub fn respond_with_internal_error(self, message: impl ToString) -> Result<(), crate::Error> {
2208 self.respond_with_error(crate::util::internal_error(message))
2209 }
2210
2211 /// Complete the response by sending an error to the waiting task.
2212 pub fn respond_with_error(self, error: crate::Error) -> Result<(), crate::Error> {
2213 tracing::debug!(id = ?self.id, ?error, "error routed to awaiter");
2214 self.respond_with_result(Err(error))
2215 }
2216}
2217
2218/// Common bounds for any JSON-RPC message.
2219///
2220/// # Derive Macro
2221///
2222/// For simple message types, you can use the `JsonRpcRequest` or `JsonRpcNotification` derive macros
2223/// which will implement both `JsonRpcMessage` and the respective trait. See [`JsonRpcRequest`] and
2224/// [`JsonRpcNotification`] for examples.
2225pub trait JsonRpcMessage: 'static + Debug + Sized + Send + Clone {
2226 /// Check if this message type matches the given method name.
2227 fn matches_method(method: &str) -> bool;
2228
2229 /// The method name for the message.
2230 fn method(&self) -> &str;
2231
2232 /// Convert this message into an untyped message.
2233 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error>;
2234
2235 /// Parse this type from a method name and parameters.
2236 ///
2237 /// Returns an error if the method doesn't match or deserialization fails.
2238 /// Callers should use `matches_method` first to check if this type handles the method.
2239 fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error>;
2240}
2241
2242/// Defines the "payload" of a successful response to a JSON-RPC request.
2243///
2244/// # Derive Macro
2245///
2246/// Use `#[derive(JsonRpcResponse)]` to automatically implement this trait:
2247///
2248/// ```ignore
2249/// use agent_client_protocol::JsonRpcResponse;
2250/// use serde::{Serialize, Deserialize};
2251///
2252/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2253/// #[response(method = "_hello")]
2254/// struct HelloResponse {
2255/// greeting: String,
2256/// }
2257/// ```
2258pub trait JsonRpcResponse: 'static + Debug + Sized + Send + Clone {
2259 /// Convert this message into a JSON value.
2260 fn into_json(self, method: &str) -> Result<serde_json::Value, crate::Error>;
2261
2262 /// Parse a JSON value into the response type.
2263 fn from_value(method: &str, value: serde_json::Value) -> Result<Self, crate::Error>;
2264}
2265
2266impl JsonRpcResponse for serde_json::Value {
2267 fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, crate::Error> {
2268 Ok(value)
2269 }
2270
2271 fn into_json(self, _method: &str) -> Result<serde_json::Value, crate::Error> {
2272 Ok(self)
2273 }
2274}
2275
2276/// A struct that represents a notification (JSON-RPC message that does not expect a response).
2277///
2278/// # Derive Macro
2279///
2280/// Use `#[derive(JsonRpcNotification)]` to automatically implement both `JsonRpcMessage` and `JsonRpcNotification`:
2281///
2282/// ```ignore
2283/// use agent_client_protocol::JsonRpcNotification;
2284/// use serde::{Serialize, Deserialize};
2285///
2286/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcNotification)]
2287/// #[notification(method = "_ping")]
2288/// struct PingNotification {
2289/// timestamp: u64,
2290/// }
2291/// ```
2292pub trait JsonRpcNotification: JsonRpcMessage {}
2293
2294/// A struct that represents a request (JSON-RPC message expecting a response).
2295///
2296/// # Derive Macro
2297///
2298/// Use `#[derive(JsonRpcRequest)]` to automatically implement both `JsonRpcMessage` and `JsonRpcRequest`:
2299///
2300/// ```ignore
2301/// use agent_client_protocol::{JsonRpcRequest, JsonRpcResponse};
2302/// use serde::{Serialize, Deserialize};
2303///
2304/// #[derive(Debug, Clone, Serialize, Deserialize, JsonRpcRequest)]
2305/// #[request(method = "_hello", response = HelloResponse)]
2306/// struct HelloRequest {
2307/// name: String,
2308/// }
2309///
2310/// #[derive(Debug, Serialize, Deserialize, JsonRpcResponse)]
2311/// struct HelloResponse {
2312/// greeting: String,
2313/// }
2314/// ```
2315pub trait JsonRpcRequest: JsonRpcMessage {
2316 /// The type of data expected in response.
2317 type Response: JsonRpcResponse;
2318}
2319
2320/// An enum capturing an in-flight request or notification.
2321/// In the case of a request, also includes the context used to respond to the request.
2322///
2323/// Type parameters allow specifying the concrete request and notification types.
2324/// By default, both are `UntypedMessage` for dynamic dispatch.
2325/// The request context's response type matches the request's response type.
2326#[derive(Debug)]
2327pub enum Dispatch<Req: JsonRpcRequest = UntypedMessage, Notif: JsonRpcMessage = UntypedMessage> {
2328 /// Incoming request and the context where the response should be sent.
2329 Request(Req, Responder<Req::Response>),
2330
2331 /// Incoming notification.
2332 Notification(Notif),
2333
2334 /// Incoming response to a request we sent.
2335 ///
2336 /// The first field is the response result (success or error from the remote).
2337 /// The second field is the context for forwarding the response to its destination
2338 /// (typically a waiting oneshot channel).
2339 Response(
2340 Result<Req::Response, crate::Error>,
2341 ResponseRouter<Req::Response>,
2342 ),
2343}
2344
2345impl<Req: JsonRpcRequest, Notif: JsonRpcMessage> Dispatch<Req, Notif> {
2346 /// Map the request and notification types to new types.
2347 ///
2348 /// Note: Response variants are passed through unchanged since they don't
2349 /// contain a parseable message payload.
2350 pub fn map<Req1, Notif1>(
2351 self,
2352 map_request: impl FnOnce(Req, Responder<Req::Response>) -> (Req1, Responder<Req1::Response>),
2353 map_notification: impl FnOnce(Notif) -> Notif1,
2354 ) -> Dispatch<Req1, Notif1>
2355 where
2356 Req1: JsonRpcRequest<Response = Req::Response>,
2357 Notif1: JsonRpcMessage,
2358 {
2359 match self {
2360 Dispatch::Request(request, responder) => {
2361 let (new_request, new_responder) = map_request(request, responder);
2362 Dispatch::Request(new_request, new_responder)
2363 }
2364 Dispatch::Notification(notification) => {
2365 let new_notification = map_notification(notification);
2366 Dispatch::Notification(new_notification)
2367 }
2368 Dispatch::Response(result, router) => Dispatch::Response(result, router),
2369 }
2370 }
2371
2372 /// Respond to the message with an error.
2373 ///
2374 /// If this message is a request, this error becomes the reply to the request.
2375 ///
2376 /// If this message is a notification, the error is sent as a notification.
2377 ///
2378 /// If this message is a response, the error is forwarded to the waiting handler.
2379 pub fn respond_with_error<R: Role>(
2380 self,
2381 error: crate::Error,
2382 cx: ConnectionTo<R>,
2383 ) -> Result<(), crate::Error> {
2384 match self {
2385 Dispatch::Request(_, responder) => responder.respond_with_error(error),
2386 Dispatch::Notification(_) => cx.send_error_notification(error),
2387 Dispatch::Response(_, responder) => responder.respond_with_error(error),
2388 }
2389 }
2390
2391 /// Convert to a `Responder` that expects a JSON value
2392 /// and which checks (dynamically) that the JSON value it receives
2393 /// can be converted to `T`.
2394 ///
2395 /// Note: Response variants cannot be erased since their payload is already
2396 /// parsed. This returns an error for Response variants.
2397 pub fn erase_to_json(self) -> Result<Dispatch, crate::Error> {
2398 match self {
2399 Dispatch::Request(response, responder) => Ok(Dispatch::Request(
2400 response.to_untyped_message()?,
2401 responder.erase_to_json(),
2402 )),
2403 Dispatch::Notification(notification) => {
2404 Ok(Dispatch::Notification(notification.to_untyped_message()?))
2405 }
2406 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2407 "cannot erase Response variant to JSON",
2408 )),
2409 }
2410 }
2411
2412 /// Convert the message in self to an untyped message.
2413 ///
2414 /// Note: Response variants don't have an untyped message representation.
2415 /// This returns an error for Response variants.
2416 pub fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2417 match self {
2418 Dispatch::Request(request, _) => request.to_untyped_message(),
2419 Dispatch::Notification(notification) => notification.to_untyped_message(),
2420 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2421 "Response variant has no untyped message representation",
2422 )),
2423 }
2424 }
2425
2426 /// Convert self to an untyped message context.
2427 ///
2428 /// Note: Response variants cannot be converted. This returns an error for Response variants.
2429 pub fn into_untyped_dispatch(self) -> Result<Dispatch, crate::Error> {
2430 match self {
2431 Dispatch::Request(request, responder) => Ok(Dispatch::Request(
2432 request.to_untyped_message()?,
2433 responder.erase_to_json(),
2434 )),
2435 Dispatch::Notification(notification) => {
2436 Ok(Dispatch::Notification(notification.to_untyped_message()?))
2437 }
2438 Dispatch::Response(_, _) => Err(crate::util::internal_error(
2439 "cannot convert Response variant to untyped message context",
2440 )),
2441 }
2442 }
2443
2444 /// Returns the request ID if this is a request or response, None if notification.
2445 pub fn id(&self) -> Option<serde_json::Value> {
2446 match self {
2447 Dispatch::Request(_, cx) => Some(cx.id()),
2448 Dispatch::Notification(_) => None,
2449 Dispatch::Response(_, cx) => Some(cx.id()),
2450 }
2451 }
2452
2453 /// Returns the method of the message.
2454 ///
2455 /// For requests and notifications, this is the method from the message payload.
2456 /// For responses, this is the method of the original request.
2457 pub fn method(&self) -> &str {
2458 match self {
2459 Dispatch::Request(msg, _) => msg.method(),
2460 Dispatch::Notification(msg) => msg.method(),
2461 Dispatch::Response(_, cx) => cx.method(),
2462 }
2463 }
2464}
2465
2466impl Dispatch {
2467 /// Attempts to parse `self` into a typed message context.
2468 ///
2469 /// # Returns
2470 ///
2471 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2472 /// * `Ok(Err(self))` if not
2473 /// * `Err` if has the correct method for the given types but parsing fails
2474 #[tracing::instrument(skip(self), fields(Request = ?std::any::type_name::<Req>(), Notif = ?std::any::type_name::<Notif>()), level = "trace", ret)]
2475 pub(crate) fn into_typed_dispatch<Req: JsonRpcRequest, Notif: JsonRpcNotification>(
2476 self,
2477 ) -> Result<Result<Dispatch<Req, Notif>, Dispatch>, crate::Error> {
2478 tracing::debug!(
2479 message = ?self,
2480 "into_typed_dispatch"
2481 );
2482 match self {
2483 Dispatch::Request(message, responder) => {
2484 if Req::matches_method(&message.method) {
2485 match Req::parse_message(&message.method, &message.params) {
2486 Ok(req) => {
2487 tracing::trace!(?req, "parsed ok");
2488 Ok(Ok(Dispatch::Request(req, responder.cast())))
2489 }
2490 Err(err) => {
2491 tracing::trace!(?err, "parse error");
2492 Err(err)
2493 }
2494 }
2495 } else {
2496 tracing::trace!("method doesn't match");
2497 Ok(Err(Dispatch::Request(message, responder)))
2498 }
2499 }
2500
2501 Dispatch::Notification(message) => {
2502 if Notif::matches_method(&message.method) {
2503 match Notif::parse_message(&message.method, &message.params) {
2504 Ok(notif) => {
2505 tracing::trace!(?notif, "parse ok");
2506 Ok(Ok(Dispatch::Notification(notif)))
2507 }
2508 Err(err) => {
2509 tracing::trace!(?err, "parse error");
2510 Err(err)
2511 }
2512 }
2513 } else {
2514 tracing::trace!("method doesn't match");
2515 Ok(Err(Dispatch::Notification(message)))
2516 }
2517 }
2518
2519 Dispatch::Response(result, cx) => {
2520 let method = cx.method();
2521 if Req::matches_method(method) {
2522 // Parse the response result
2523 let typed_result = match result {
2524 Ok(value) => {
2525 match <Req::Response as JsonRpcResponse>::from_value(method, value) {
2526 Ok(parsed) => {
2527 tracing::trace!(?parsed, "parse ok");
2528 Ok(parsed)
2529 }
2530 Err(err) => {
2531 tracing::trace!(?err, "parse error");
2532 return Err(err);
2533 }
2534 }
2535 }
2536 Err(err) => {
2537 tracing::trace!("error, passthrough");
2538 Err(err)
2539 }
2540 };
2541 Ok(Ok(Dispatch::Response(typed_result, cx.cast())))
2542 } else {
2543 tracing::trace!("method doesn't match");
2544 Ok(Err(Dispatch::Response(result, cx)))
2545 }
2546 }
2547 }
2548 }
2549
2550 /// True if this message has a field with the given name.
2551 ///
2552 /// Returns `false` for Response variants.
2553 #[must_use]
2554 pub fn has_field(&self, field_name: &str) -> bool {
2555 self.message()
2556 .and_then(|m| m.params().get(field_name))
2557 .is_some()
2558 }
2559
2560 /// Returns true if this message has a session-id field.
2561 ///
2562 /// Returns `false` for Response variants.
2563 pub(crate) fn has_session_id(&self) -> bool {
2564 self.has_field("sessionId")
2565 }
2566
2567 /// Extract the ACP session-id from this message (if any).
2568 ///
2569 /// Returns `Ok(None)` for Response variants.
2570 pub(crate) fn get_session_id(&self) -> Result<Option<SessionId>, crate::Error> {
2571 let Some(message) = self.message() else {
2572 return Ok(None);
2573 };
2574 let Some(value) = message.params().get("sessionId") else {
2575 return Ok(None);
2576 };
2577 let session_id = serde_json::from_value(value.clone())?;
2578 Ok(Some(session_id))
2579 }
2580
2581 /// Try to parse this as a notification of the given type.
2582 ///
2583 /// # Returns
2584 ///
2585 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2586 /// * `Ok(Err(self))` if not
2587 /// * `Err` if has the correct method for the given types but parsing fails
2588 pub fn into_notification<N: JsonRpcNotification>(
2589 self,
2590 ) -> Result<Result<N, Dispatch>, crate::Error> {
2591 match self {
2592 Dispatch::Notification(msg) => {
2593 if !N::matches_method(&msg.method) {
2594 return Ok(Err(Dispatch::Notification(msg)));
2595 }
2596 match N::parse_message(&msg.method, &msg.params) {
2597 Ok(n) => Ok(Ok(n)),
2598 Err(err) => Err(err),
2599 }
2600 }
2601 Dispatch::Request(..) | Dispatch::Response(..) => Ok(Err(self)),
2602 }
2603 }
2604
2605 /// Try to parse this as a request of the given type.
2606 ///
2607 /// # Returns
2608 ///
2609 /// * `Ok(Ok(typed))` if this is a request/notification of the given types
2610 /// * `Ok(Err(self))` if not
2611 /// * `Err` if has the correct method for the given types but parsing fails
2612 pub fn into_request<Req: JsonRpcRequest>(
2613 self,
2614 ) -> Result<Result<(Req, Responder<Req::Response>), Dispatch>, crate::Error> {
2615 match self {
2616 Dispatch::Request(msg, responder) => {
2617 if !Req::matches_method(&msg.method) {
2618 return Ok(Err(Dispatch::Request(msg, responder)));
2619 }
2620 match Req::parse_message(&msg.method, &msg.params) {
2621 Ok(req) => Ok(Ok((req, responder.cast()))),
2622 Err(err) => Err(err),
2623 }
2624 }
2625 Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Err(self)),
2626 }
2627 }
2628}
2629
2630impl<M: JsonRpcRequest + JsonRpcNotification> Dispatch<M, M> {
2631 /// Returns the message payload for requests and notifications.
2632 ///
2633 /// Returns `None` for Response variants since they don't contain a message payload.
2634 pub fn message(&self) -> Option<&M> {
2635 match self {
2636 Dispatch::Request(msg, _) | Dispatch::Notification(msg) => Some(msg),
2637 Dispatch::Response(_, _) => None,
2638 }
2639 }
2640
2641 /// Map the request/notification message.
2642 ///
2643 /// Response variants pass through unchanged.
2644 pub(crate) fn try_map_message(
2645 self,
2646 map_message: impl FnOnce(M) -> Result<M, crate::Error>,
2647 ) -> Result<Dispatch<M, M>, crate::Error> {
2648 match self {
2649 Dispatch::Request(request, cx) => Ok(Dispatch::Request(map_message(request)?, cx)),
2650 Dispatch::Notification(notification) => {
2651 Ok(Dispatch::<M, M>::Notification(map_message(notification)?))
2652 }
2653 Dispatch::Response(result, cx) => Ok(Dispatch::Response(result, cx)),
2654 }
2655 }
2656}
2657
2658/// An incoming JSON message without any typing. Can be a request or a notification.
2659#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
2660pub struct UntypedMessage {
2661 /// The JSON-RPC method name
2662 pub method: String,
2663 /// The JSON-RPC parameters as a raw JSON value
2664 pub params: serde_json::Value,
2665}
2666
2667impl UntypedMessage {
2668 /// Returns an untyped message with the given method and parameters.
2669 pub fn new(method: &str, params: impl Serialize) -> Result<Self, crate::Error> {
2670 let params = serde_json::to_value(params)?;
2671 Ok(Self {
2672 method: method.to_string(),
2673 params,
2674 })
2675 }
2676
2677 /// Returns the method name
2678 #[must_use]
2679 pub fn method(&self) -> &str {
2680 &self.method
2681 }
2682
2683 /// Returns the parameters as a JSON value
2684 #[must_use]
2685 pub fn params(&self) -> &serde_json::Value {
2686 &self.params
2687 }
2688
2689 /// Consumes this message and returns the method and params
2690 #[must_use]
2691 pub fn into_parts(self) -> (String, serde_json::Value) {
2692 (self.method, self.params)
2693 }
2694
2695 /// Convert `self` to a JSON-RPC message.
2696 pub(crate) fn into_jsonrpc_msg(
2697 self,
2698 id: Option<jsonrpcmsg::Id>,
2699 ) -> Result<jsonrpcmsg::Request, crate::Error> {
2700 let Self { method, params } = self;
2701 Ok(jsonrpcmsg::Request::new_v2(method, json_cast(params)?, id))
2702 }
2703}
2704
2705impl JsonRpcMessage for UntypedMessage {
2706 fn matches_method(_method: &str) -> bool {
2707 // UntypedMessage matches any method - it's the untyped fallback
2708 true
2709 }
2710
2711 fn method(&self) -> &str {
2712 &self.method
2713 }
2714
2715 fn to_untyped_message(&self) -> Result<UntypedMessage, crate::Error> {
2716 Ok(self.clone())
2717 }
2718
2719 fn parse_message(method: &str, params: &impl Serialize) -> Result<Self, crate::Error> {
2720 UntypedMessage::new(method, params)
2721 }
2722}
2723
2724impl JsonRpcRequest for UntypedMessage {
2725 type Response = serde_json::Value;
2726}
2727
2728impl JsonRpcNotification for UntypedMessage {}
2729
2730/// Represents a pending response of type `R` from an outgoing request.
2731///
2732/// Returned by [`ConnectionTo::send_request`], this type provides methods for handling
2733/// the response without blocking the event loop. The API is intentionally designed to make
2734/// it difficult to accidentally block.
2735///
2736/// # Anti-Footgun Design
2737///
2738/// You cannot directly `.await` a `SentRequest`. Instead, you must choose how to handle
2739/// the response:
2740///
2741/// ## Option 1: Schedule a Callback (Safe in Handlers)
2742///
2743/// Use [`on_receiving_result`](Self::on_receiving_result) to schedule a task
2744/// that runs when the response arrives. This doesn't block the event loop:
2745///
2746/// ```no_run
2747/// # use agent_client_protocol_test::*;
2748/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2749/// cx.send_request(MyRequest {})
2750/// .on_receiving_result(async |result| {
2751/// match result {
2752/// Ok(response) => {
2753/// // Handle successful response
2754/// Ok(())
2755/// }
2756/// Err(error) => {
2757/// // Handle error
2758/// Err(error)
2759/// }
2760/// }
2761/// })?;
2762/// # Ok(())
2763/// # }
2764/// ```
2765///
2766/// ## Option 2: Block in a Spawned Task (Safe Only in `spawn`)
2767///
2768/// Use [`block_task`](Self::block_task) to block until the response arrives, but **only**
2769/// in a spawned task (never in a handler):
2770///
2771/// ```no_run
2772/// # use agent_client_protocol_test::*;
2773/// # async fn example(cx: agent_client_protocol::ConnectionTo<agent_client_protocol::UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2774/// // ✅ Safe: Spawned task runs concurrently
2775/// cx.spawn({
2776/// let cx = cx.clone();
2777/// async move {
2778/// let response = cx.send_request(MyRequest {})
2779/// .block_task()
2780/// .await?;
2781/// // Process response...
2782/// Ok(())
2783/// }
2784/// })?;
2785/// # Ok(())
2786/// # }
2787/// ```
2788///
2789/// ```no_run
2790/// # use agent_client_protocol_test::*;
2791/// # async fn example() -> Result<(), agent_client_protocol::Error> {
2792/// # let connection = mock_connection();
2793/// // ❌ NEVER do this in a handler - blocks the event loop!
2794/// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2795/// let response = cx.send_request(MyRequest {})
2796/// .block_task() // This will deadlock!
2797/// .await?;
2798/// responder.respond(response)
2799/// }, agent_client_protocol::on_receive_request!())
2800/// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2801/// # Ok(())
2802/// # }
2803/// ```
2804///
2805/// # Why This Design?
2806///
2807/// If you block the event loop while waiting for a response, the connection cannot process
2808/// the incoming response message, creating a deadlock. This API design prevents that footgun
2809/// by making blocking explicit and encouraging non-blocking patterns.
2810pub struct SentRequest<T> {
2811 id: jsonrpcmsg::Id,
2812 method: String,
2813 task_tx: TaskTx,
2814 response_rx: oneshot::Receiver<ResponsePayload>,
2815 to_result: Box<dyn Fn(serde_json::Value) -> Result<T, crate::Error> + Send>,
2816}
2817
2818impl<T: Debug> Debug for SentRequest<T> {
2819 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2820 f.debug_struct("SentRequest")
2821 .field("id", &self.id)
2822 .field("method", &self.method)
2823 .field("task_tx", &self.task_tx)
2824 .field("response_rx", &self.response_rx)
2825 .finish_non_exhaustive()
2826 }
2827}
2828
2829impl SentRequest<serde_json::Value> {
2830 fn new(
2831 id: jsonrpcmsg::Id,
2832 method: String,
2833 task_tx: mpsc::UnboundedSender<Task>,
2834 response_rx: oneshot::Receiver<ResponsePayload>,
2835 ) -> Self {
2836 Self {
2837 id,
2838 method,
2839 response_rx,
2840 task_tx,
2841 to_result: Box::new(Ok),
2842 }
2843 }
2844}
2845
2846impl<T: JsonRpcResponse> SentRequest<T> {
2847 /// The id of the outgoing request.
2848 #[must_use]
2849 pub fn id(&self) -> serde_json::Value {
2850 crate::util::id_to_json(&self.id)
2851 }
2852
2853 /// The method of the request this is in response to.
2854 #[must_use]
2855 pub fn method(&self) -> &str {
2856 &self.method
2857 }
2858
2859 /// Create a new response that maps the result of the response to a new type.
2860 pub fn map<U>(
2861 self,
2862 map_fn: impl Fn(T) -> Result<U, crate::Error> + 'static + Send,
2863 ) -> SentRequest<U> {
2864 SentRequest {
2865 id: self.id,
2866 method: self.method,
2867 response_rx: self.response_rx,
2868 task_tx: self.task_tx,
2869 to_result: Box::new(move |value| map_fn((self.to_result)(value)?)),
2870 }
2871 }
2872
2873 /// Forward the response (success or error) to a request context when it arrives.
2874 ///
2875 /// This is a convenience method for proxying messages between connections. When the
2876 /// response arrives, it will be automatically sent to the provided request context,
2877 /// whether it's a successful response or an error.
2878 ///
2879 /// # Example: Proxying requests
2880 ///
2881 /// ```
2882 /// # use agent_client_protocol::UntypedRole;
2883 /// # use agent_client_protocol::{Builder, ConnectionTo};
2884 /// # use agent_client_protocol_test::*;
2885 /// # async fn example(cx: ConnectionTo<UntypedRole>) -> Result<(), agent_client_protocol::Error> {
2886 /// // Set up backend connection builder
2887 /// let backend = UntypedRole.builder()
2888 /// .on_receive_request(async |req: MyRequest, responder, cx| {
2889 /// responder.respond(MyResponse { status: "ok".into() })
2890 /// }, agent_client_protocol::on_receive_request!());
2891 ///
2892 /// // Spawn backend and get a context to send to it
2893 /// let backend_connection = cx.spawn_connection(backend, MockTransport)?;
2894 ///
2895 /// // Set up proxy that forwards requests to backend
2896 /// UntypedRole.builder()
2897 /// .on_receive_request({
2898 /// let backend_connection = backend_connection.clone();
2899 /// async move |req: MyRequest, responder, cx| {
2900 /// // Forward the request to backend and proxy the response back
2901 /// backend_connection.send_request(req)
2902 /// .forward_response_to(responder)?;
2903 /// Ok(())
2904 /// }
2905 /// }, agent_client_protocol::on_receive_request!());
2906 /// # Ok(())
2907 /// # }
2908 /// ```
2909 ///
2910 /// # Type Safety
2911 ///
2912 /// The request context's response type must match the request's response type,
2913 /// ensuring type-safe message forwarding.
2914 ///
2915 /// # When to Use
2916 ///
2917 /// Use this when:
2918 /// - You're implementing a proxy or gateway pattern
2919 /// - You want to forward responses without processing them
2920 /// - The response types match between the outgoing request and incoming request
2921 ///
2922 /// This is equivalent to calling `on_receiving_result` and manually forwarding
2923 /// the result, but more concise.
2924 pub fn forward_response_to(self, responder: Responder<T>) -> Result<(), crate::Error>
2925 where
2926 T: Send,
2927 {
2928 self.on_receiving_result(async move |result| responder.respond_with_result(result))
2929 }
2930
2931 /// Block the current task until the response is received.
2932 ///
2933 /// **Warning:** This method blocks the current async task. It is **only safe** to use
2934 /// in spawned tasks created with [`ConnectionTo::spawn`]. Using it directly in a
2935 /// handler callback will deadlock the connection.
2936 ///
2937 /// # Safe Usage (in spawned tasks)
2938 ///
2939 /// ```no_run
2940 /// # use agent_client_protocol_test::*;
2941 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2942 /// # let connection = mock_connection();
2943 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2944 /// // Spawn a task to handle the request
2945 /// cx.spawn({
2946 /// let connection = cx.clone();
2947 /// async move {
2948 /// // Safe: We're in a spawned task, not blocking the event loop
2949 /// let response = connection.send_request(OtherRequest {})
2950 /// .block_task()
2951 /// .await?;
2952 ///
2953 /// // Process the response...
2954 /// Ok(())
2955 /// }
2956 /// })?;
2957 ///
2958 /// // Respond immediately
2959 /// responder.respond(MyResponse { status: "ok".into() })
2960 /// }, agent_client_protocol::on_receive_request!())
2961 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2962 /// # Ok(())
2963 /// # }
2964 /// ```
2965 ///
2966 /// # Unsafe Usage (in handlers - will deadlock!)
2967 ///
2968 /// ```no_run
2969 /// # use agent_client_protocol_test::*;
2970 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
2971 /// # let connection = mock_connection();
2972 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
2973 /// // ❌ DEADLOCK: Handler blocks event loop, which can't process the response
2974 /// let response = cx.send_request(OtherRequest {})
2975 /// .block_task()
2976 /// .await?;
2977 ///
2978 /// responder.respond(MyResponse { status: response.value })
2979 /// }, agent_client_protocol::on_receive_request!())
2980 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
2981 /// # Ok(())
2982 /// # }
2983 /// ```
2984 ///
2985 /// # When to Use
2986 ///
2987 /// Use this method when:
2988 /// - You're in a spawned task (via [`ConnectionTo::spawn`])
2989 /// - You need the response value to proceed with your logic
2990 /// - Linear control flow is more natural than callbacks
2991 ///
2992 /// For handler callbacks, use [`on_receiving_result`](Self::on_receiving_result) instead.
2993 pub async fn block_task(self) -> Result<T, crate::Error>
2994 where
2995 T: Send,
2996 {
2997 match self.response_rx.await {
2998 Ok(ResponsePayload {
2999 result: Ok(json_value),
3000 ack_tx,
3001 }) => {
3002 // Ack immediately - we're in a spawned task, so the dispatch loop
3003 // can continue while we process the value.
3004 if let Some(tx) = ack_tx {
3005 let _ = tx.send(());
3006 }
3007 match (self.to_result)(json_value) {
3008 Ok(value) => Ok(value),
3009 Err(err) => Err(err),
3010 }
3011 }
3012 Ok(ResponsePayload {
3013 result: Err(err),
3014 ack_tx,
3015 }) => {
3016 if let Some(tx) = ack_tx {
3017 let _ = tx.send(());
3018 }
3019 Err(err)
3020 }
3021 Err(err) => Err(crate::util::internal_error(format!(
3022 "response to `{}` never received: {}",
3023 self.method, err
3024 ))),
3025 }
3026 }
3027
3028 /// Schedule an async task to run when a successful response is received.
3029 ///
3030 /// This is a convenience wrapper around [`on_receiving_result`](Self::on_receiving_result)
3031 /// for the common pattern of forwarding errors to a request context while only processing
3032 /// successful responses.
3033 ///
3034 /// # Behavior
3035 ///
3036 /// - If the response is `Ok(value)`, your task receives the value and the request context
3037 /// - If the response is `Err(error)`, the error is automatically sent to `responder`
3038 /// and your task is not called
3039 ///
3040 /// # Example: Chaining requests
3041 ///
3042 /// ```no_run
3043 /// # use agent_client_protocol_test::*;
3044 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3045 /// # let connection = mock_connection();
3046 /// connection.on_receive_request(async |req: ValidateRequest, responder, cx| {
3047 /// // Send initial request
3048 /// cx.send_request(ValidateRequest { data: req.data.clone() })
3049 /// .on_receiving_ok_result(responder, async |validation, responder| {
3050 /// // Only runs if validation succeeded
3051 /// if validation.is_valid {
3052 /// // Respond to original request
3053 /// responder.respond(ValidateResponse { is_valid: true, error: None })
3054 /// } else {
3055 /// responder.respond_with_error(agent_client_protocol::util::internal_error("validation failed"))
3056 /// }
3057 /// })?;
3058 ///
3059 /// Ok(())
3060 /// }, agent_client_protocol::on_receive_request!())
3061 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3062 /// # Ok(())
3063 /// # }
3064 /// ```
3065 ///
3066 /// # Ordering
3067 ///
3068 /// Like [`on_receiving_result`](Self::on_receiving_result), the callback blocks the
3069 /// dispatch loop until it completes. See the [`ordering`](crate::concepts::ordering) module
3070 /// for details.
3071 ///
3072 /// # When to Use
3073 ///
3074 /// Use this when:
3075 /// - You need to respond to a request based on another request's result
3076 /// - You want errors to automatically propagate to the request context
3077 /// - You only care about the success case
3078 ///
3079 /// For more control over error handling, use [`on_receiving_result`](Self::on_receiving_result).
3080 #[track_caller]
3081 pub fn on_receiving_ok_result<F>(
3082 self,
3083 responder: Responder<T>,
3084 task: impl FnOnce(T, Responder<T>) -> F + 'static + Send,
3085 ) -> Result<(), crate::Error>
3086 where
3087 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3088 T: Send,
3089 {
3090 self.on_receiving_result(async move |result| match result {
3091 Ok(value) => task(value, responder).await,
3092 Err(err) => responder.respond_with_error(err),
3093 })
3094 }
3095
3096 /// Schedule an async task to run when the response is received.
3097 ///
3098 /// This is the recommended way to handle responses in handler callbacks, as it doesn't
3099 /// block the event loop. The task will be spawned automatically when the response arrives.
3100 ///
3101 /// # Example: Handle response in callback
3102 ///
3103 /// ```no_run
3104 /// # use agent_client_protocol_test::*;
3105 /// # async fn example() -> Result<(), agent_client_protocol::Error> {
3106 /// # let connection = mock_connection();
3107 /// connection.on_receive_request(async |req: MyRequest, responder, cx| {
3108 /// // Send a request and schedule a callback for the response
3109 /// cx.send_request(QueryRequest { id: 22 })
3110 /// .on_receiving_result({
3111 /// let connection = cx.clone();
3112 /// async move |result| {
3113 /// match result {
3114 /// Ok(response) => {
3115 /// println!("Got response: {:?}", response);
3116 /// // Can send more messages here
3117 /// connection.send_notification(QueryComplete {})?;
3118 /// Ok(())
3119 /// }
3120 /// Err(error) => {
3121 /// eprintln!("Request failed: {}", error);
3122 /// Err(error)
3123 /// }
3124 /// }
3125 /// }
3126 /// })?;
3127 ///
3128 /// // Handler continues immediately without waiting
3129 /// responder.respond(MyResponse { status: "processing".into() })
3130 /// }, agent_client_protocol::on_receive_request!())
3131 /// # .connect_to(agent_client_protocol_test::MockTransport).await?;
3132 /// # Ok(())
3133 /// # }
3134 /// ```
3135 ///
3136 /// # Ordering
3137 ///
3138 /// The callback runs as a spawned task, but the dispatch loop waits for it to complete
3139 /// before processing the next message. This gives you ordering guarantees: no other
3140 /// messages will be processed while your callback runs.
3141 ///
3142 /// This differs from [`block_task`](Self::block_task), which signals completion immediately
3143 /// upon receiving the response (before your code processes it).
3144 ///
3145 /// See the [`ordering`](crate::concepts::ordering) module for details on ordering guarantees
3146 /// and how to avoid deadlocks.
3147 ///
3148 /// # Error Handling
3149 ///
3150 /// If the scheduled task returns `Err`, the entire server will shut down. Make sure to handle
3151 /// errors appropriately within your task.
3152 ///
3153 /// # When to Use
3154 ///
3155 /// Use this method when:
3156 /// - You're in a handler callback (not a spawned task)
3157 /// - You want ordering guarantees (no other messages processed during your callback)
3158 /// - You need to do async work before "releasing" control back to the dispatch loop
3159 ///
3160 /// For spawned tasks where you don't need ordering guarantees, consider [`block_task`](Self::block_task).
3161 #[track_caller]
3162 pub fn on_receiving_result<F>(
3163 self,
3164 task: impl FnOnce(Result<T, crate::Error>) -> F + 'static + Send,
3165 ) -> Result<(), crate::Error>
3166 where
3167 F: Future<Output = Result<(), crate::Error>> + 'static + Send,
3168 T: Send,
3169 {
3170 let task_tx = self.task_tx.clone();
3171 let method = self.method;
3172 let response_rx = self.response_rx;
3173 let to_result = self.to_result;
3174 let location = Location::caller();
3175
3176 Task::new(location, async move {
3177 match response_rx.await {
3178 Ok(ResponsePayload { result, ack_tx }) => {
3179 // Convert the result using to_result for Ok values
3180 let typed_result = match result {
3181 Ok(json_value) => to_result(json_value),
3182 Err(err) => Err(err),
3183 };
3184
3185 // Run the user's callback
3186 let outcome = task(typed_result).await;
3187
3188 // Ack AFTER the callback completes - this is the key difference
3189 // from block_task. The dispatch loop waits for this ack.
3190 if let Some(tx) = ack_tx {
3191 let _ = tx.send(());
3192 }
3193
3194 outcome
3195 }
3196 Err(err) => Err(crate::util::internal_error(format!(
3197 "response to `{method}` never received: {err}"
3198 ))),
3199 }
3200 })
3201 .spawn(&task_tx)
3202 }
3203}
3204
3205// ============================================================================
3206// IntoJrConnectionTransport Implementations
3207// ============================================================================
3208
3209/// A component that communicates over line streams.
3210///
3211/// `Lines` implements the [`ConnectTo`] trait for any pair of line-based streams
3212/// (a `Stream<Item = io::Result<String>>` for incoming and a `Sink<String>` for outgoing),
3213/// handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3214///
3215/// This is a lower-level primitive than [`ByteStreams`] that enables interception and
3216/// transformation of individual lines before they are parsed or after they are serialized.
3217/// This is particularly useful for debugging, logging, or implementing custom line-based
3218/// protocols.
3219///
3220/// # Use Cases
3221///
3222/// - **Line-by-line logging**: Intercept and log each line before parsing
3223/// - **Custom protocols**: Transform lines before/after JSON-RPC processing
3224/// - **Debugging**: Inspect raw message strings
3225/// - **Line filtering**: Skip or modify specific messages
3226///
3227/// Most users should use [`ByteStreams`] instead, which provides a simpler interface
3228/// for byte-based I/O.
3229///
3230/// [`ConnectTo`]: crate::ConnectTo
3231#[derive(Debug)]
3232pub struct Lines<OutgoingSink, IncomingStream> {
3233 /// Outgoing line sink (where we write serialized JSON-RPC messages)
3234 pub outgoing: OutgoingSink,
3235 /// Incoming line stream (where we read and parse JSON-RPC messages)
3236 pub incoming: IncomingStream,
3237}
3238
3239impl<OutgoingSink, IncomingStream> Lines<OutgoingSink, IncomingStream>
3240where
3241 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3242 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3243{
3244 /// Create a new line stream transport.
3245 pub fn new(outgoing: OutgoingSink, incoming: IncomingStream) -> Self {
3246 Self { outgoing, incoming }
3247 }
3248}
3249
3250impl<OutgoingSink, IncomingStream, R: Role> ConnectTo<R> for Lines<OutgoingSink, IncomingStream>
3251where
3252 OutgoingSink: futures::Sink<String, Error = std::io::Error> + Send + 'static,
3253 IncomingStream: futures::Stream<Item = std::io::Result<String>> + Send + 'static,
3254{
3255 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3256 let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3257 match futures::future::select(Box::pin(client.connect_to(channel)), serve_self).await {
3258 Either::Left((result, _)) | Either::Right((result, _)) => result,
3259 }
3260 }
3261
3262 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3263 let Self { outgoing, incoming } = self;
3264
3265 // Create a channel pair for the client to use
3266 let (channel_for_caller, channel_for_lines) = Channel::duplex();
3267
3268 // Create the server future that runs the line stream actors
3269 let server_future = Box::pin(async move {
3270 let Channel { rx, tx } = channel_for_lines;
3271
3272 // Run both actors concurrently
3273 let outgoing_future = transport_actor::transport_outgoing_lines_actor(rx, outgoing);
3274 let incoming_future = transport_actor::transport_incoming_lines_actor(incoming, tx);
3275
3276 // Wait for both to complete
3277 futures::try_join!(outgoing_future, incoming_future)?;
3278
3279 Ok(())
3280 });
3281
3282 (channel_for_caller, server_future)
3283 }
3284}
3285
3286/// A component that communicates over byte streams (stdin/stdout, sockets, pipes, etc.).
3287///
3288/// `ByteStreams` implements the [`ConnectTo`] trait for any pair of `AsyncRead` and `AsyncWrite`
3289/// streams, handling serialization of JSON-RPC messages to/from newline-delimited JSON.
3290/// This is the standard way to communicate with external processes or network connections.
3291///
3292/// # Use Cases
3293///
3294/// - **Stdio communication**: Connect to agents or proxies via stdin/stdout
3295/// - **Network sockets**: TCP, Unix domain sockets, or other stream-based protocols
3296/// - **Named pipes**: Cross-process communication on the same machine
3297/// - **File I/O**: Reading from and writing to file descriptors
3298///
3299/// # Example
3300///
3301/// Connecting to an agent via stdio:
3302///
3303/// ```no_run
3304/// use agent_client_protocol::UntypedRole;
3305/// # use agent_client_protocol::{ByteStreams};
3306/// use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
3307///
3308/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3309/// let component = ByteStreams::new(
3310/// tokio::io::stdout().compat_write(),
3311/// tokio::io::stdin().compat(),
3312/// );
3313///
3314/// // Use as a component in a connection
3315/// agent_client_protocol::UntypedRole.builder()
3316/// .name("my-client")
3317/// .connect_to(component)
3318/// .await?;
3319/// # Ok(())
3320/// # }
3321/// ```
3322///
3323/// [`ConnectTo`]: crate::ConnectTo
3324#[derive(Debug)]
3325pub struct ByteStreams<OB, IB> {
3326 /// Outgoing byte stream (where we write serialized messages)
3327 pub outgoing: OB,
3328 /// Incoming byte stream (where we read and parse messages)
3329 pub incoming: IB,
3330}
3331
3332impl<OB, IB> ByteStreams<OB, IB>
3333where
3334 OB: AsyncWrite + Send + 'static,
3335 IB: AsyncRead + Send + 'static,
3336{
3337 /// Create a new byte stream transport.
3338 pub fn new(outgoing: OB, incoming: IB) -> Self {
3339 Self { outgoing, incoming }
3340 }
3341}
3342
3343impl<OB, IB, R: Role> ConnectTo<R> for ByteStreams<OB, IB>
3344where
3345 OB: AsyncWrite + Send + 'static,
3346 IB: AsyncRead + Send + 'static,
3347{
3348 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3349 let (channel, serve_self) = ConnectTo::<R>::into_channel_and_future(self);
3350 match futures::future::select(pin!(client.connect_to(channel)), serve_self).await {
3351 Either::Left((result, _)) | Either::Right((result, _)) => result,
3352 }
3353 }
3354
3355 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3356 use futures::AsyncBufReadExt;
3357 use futures::AsyncWriteExt;
3358 use futures::io::BufReader;
3359 let Self { outgoing, incoming } = self;
3360
3361 // Convert byte streams to line streams
3362 // Box both streams to satisfy Unpin requirements
3363 let incoming_lines = Box::pin(BufReader::new(incoming).lines());
3364
3365 // Create a sink that writes lines (with newlines) to the outgoing byte stream
3366 // We need to Box the writer since it may not be Unpin
3367 let outgoing_sink =
3368 futures::sink::unfold(Box::pin(outgoing), async move |mut writer, line: String| {
3369 let mut bytes = line.into_bytes();
3370 bytes.push(b'\n');
3371 writer.write_all(&bytes).await?;
3372 Ok::<_, std::io::Error>(writer)
3373 });
3374
3375 // Delegate to Lines component
3376 ConnectTo::<R>::into_channel_and_future(Lines::new(outgoing_sink, incoming_lines))
3377 }
3378}
3379
3380/// A channel endpoint representing one side of a bidirectional message channel.
3381///
3382/// `Channel` represents a single endpoint's view of a bidirectional communication channel.
3383/// Each endpoint has:
3384/// - `rx`: A receiver for incoming messages (or errors) from the counterpart
3385/// - `tx`: A sender for outgoing messages (or errors) to the counterpart
3386///
3387/// # Example
3388///
3389/// ```no_run
3390/// # use agent_client_protocol::UntypedRole;
3391/// # use agent_client_protocol::{Channel, Builder};
3392/// # async fn example() -> Result<(), agent_client_protocol::Error> {
3393/// // Create a pair of connected channels
3394/// let (channel_a, channel_b) = Channel::duplex();
3395///
3396/// // Each channel can be used by a different component
3397/// UntypedRole.builder()
3398/// .name("connection-a")
3399/// .connect_to(channel_a)
3400/// .await?;
3401/// # Ok(())
3402/// # }
3403/// ```
3404#[derive(Debug)]
3405pub struct Channel {
3406 /// Receives messages (or errors) from the counterpart.
3407 pub rx: mpsc::UnboundedReceiver<Result<jsonrpcmsg::Message, crate::Error>>,
3408 /// Sends messages (or errors) to the counterpart.
3409 pub tx: mpsc::UnboundedSender<Result<jsonrpcmsg::Message, crate::Error>>,
3410}
3411
3412impl Channel {
3413 /// Create a pair of connected channel endpoints.
3414 ///
3415 /// Returns two `Channel` instances that are connected to each other:
3416 /// - Messages sent via `channel_a.tx` are received on `channel_b.rx`
3417 /// - Messages sent via `channel_b.tx` are received on `channel_a.rx`
3418 ///
3419 /// # Returns
3420 ///
3421 /// A tuple `(channel_a, channel_b)` of connected channel endpoints.
3422 #[must_use]
3423 pub fn duplex() -> (Self, Self) {
3424 // Create channels: A sends Result<Message> which B receives as Message
3425 let (a_tx, b_rx) = mpsc::unbounded();
3426 let (b_tx, a_rx) = mpsc::unbounded();
3427
3428 let channel_a = Self { rx: a_rx, tx: a_tx };
3429 let channel_b = Self { rx: b_rx, tx: b_tx };
3430
3431 (channel_a, channel_b)
3432 }
3433
3434 /// Copy messages from `rx` to `tx`.
3435 ///
3436 /// # Returns
3437 ///
3438 /// A `Result` indicating success or failure.
3439 pub async fn copy(mut self) -> Result<(), crate::Error> {
3440 while let Some(msg) = self.rx.next().await {
3441 self.tx
3442 .unbounded_send(msg)
3443 .map_err(crate::util::internal_error)?;
3444 }
3445 Ok(())
3446 }
3447}
3448
3449impl<R: Role> ConnectTo<R> for Channel {
3450 async fn connect_to(self, client: impl ConnectTo<R::Counterpart>) -> Result<(), crate::Error> {
3451 let (client_channel, client_serve) = client.into_channel_and_future();
3452
3453 match futures::try_join!(
3454 Channel {
3455 rx: client_channel.rx,
3456 tx: self.tx
3457 }
3458 .copy(),
3459 Channel {
3460 rx: self.rx,
3461 tx: client_channel.tx
3462 }
3463 .copy(),
3464 client_serve
3465 ) {
3466 Ok(((), (), ())) => Ok(()),
3467 Err(err) => Err(err),
3468 }
3469 }
3470
3471 fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), crate::Error>>) {
3472 (self, Box::pin(future::ready(Ok(()))))
3473 }
3474}